Skip to content

Commit

Permalink
Introduce a new FlushMode in the interest of not creating too many ba…
Browse files Browse the repository at this point in the history
…tches (microsoft#14106)

## Description

When the client uses async code, there is a risk of creating too many
batches (enough batches to trigger server-side throttling) as with
FlushMode.TurnBased as flush is scheduled as a microtask. This change
allows for flushing to happen in a scheduled macrotask, capturing all
the ops from all microtask and bundling them as a single batch.

**Note this ability is not part of the public API. The runtime does not
accept this new flush mode without explicit casting**

This change is microsoft#14060
but for `main`
  • Loading branch information
andre4i committed Feb 10, 2023
1 parent 1e7e1ec commit bb38412
Show file tree
Hide file tree
Showing 5 changed files with 183 additions and 13 deletions.
5 changes: 5 additions & 0 deletions api-report/runtime-definitions.api.md
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,11 @@ export enum FlushMode {
TurnBased = 1
}

// @public (undocumented)
export enum FlushModeExperimental {
Async = 2
}

// @public
export const gcBlobPrefix = "__gc";

Expand Down
58 changes: 45 additions & 13 deletions packages/runtime/container-runtime/src/containerRuntime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ import {
} from "@fluidframework/protocol-definitions";
import {
FlushMode,
FlushModeExperimental,
gcTreeKey,
InboundAttachMessage,
IFluidDataStoreContextDetached,
Expand Down Expand Up @@ -908,7 +909,7 @@ export class ContainerRuntime

private _orderSequentiallyCalls: number = 0;
private readonly _flushMode: FlushMode;
private flushMicroTaskExists = false;
private flushTaskExists = false;

private _connected: boolean;

Expand Down Expand Up @@ -2115,7 +2116,7 @@ export class ContainerRuntime
* Are we in the middle of batching ops together?
*/
private currentlyBatching() {
return this.flushMode === FlushMode.TurnBased || this._orderSequentiallyCalls !== 0;
return this.flushMode !== FlushMode.Immediate || this._orderSequentiallyCalls !== 0;
}

public getQuorum(): IQuorumClients {
Expand Down Expand Up @@ -2879,17 +2880,8 @@ export class ContainerRuntime

if (!this.currentlyBatching()) {
this.flush();
} else if (!this.flushMicroTaskExists) {
this.flushMicroTaskExists = true;
// Queue a microtask to detect the end of the turn and force a flush.
Promise.resolve()
.then(() => {
this.flushMicroTaskExists = false;
this.flush();
})
.catch((error) => {
this.closeFn(error as GenericError);
});
} else {
this.scheduleFlush();
}
} catch (error) {
this.closeFn(error as GenericError);
Expand All @@ -2901,6 +2893,46 @@ export class ContainerRuntime
}
}

private scheduleFlush() {
if (this.flushTaskExists) {
return;
}

this.flushTaskExists = true;
const flush = () => {
this.flushTaskExists = false;
try {
this.flush();
} catch (error) {
this.closeFn(error as GenericError);
}
};

switch (this.flushMode) {
case FlushMode.TurnBased:
// When in TurnBased flush mode the runtime will buffer operations in the current turn and send them as a single
// batch at the end of the turn
// eslint-disable-next-line @typescript-eslint/no-floating-promises
Promise.resolve().then(flush);
break;

// FlushModeExperimental is experimental and not exposed directly in the runtime APIs
case FlushModeExperimental.Async as unknown as FlushMode:
// When in Async flush mode, the runtime will accumulate all operations across JS turns and send them as a single
// batch when all micro-tasks are complete.
// Compared to TurnBased, this flush mode will capture more ops into the same batch.
setTimeout(flush, 0);
break;

default:
assert(
this._orderSequentiallyCalls > 0,
"Unreachable unless running under orderSequentially",
);
break;
}
}

private submitSummaryMessage(contents: ISummaryContent) {
this.verifyNotClosed();
assert(
Expand Down
10 changes: 10 additions & 0 deletions packages/runtime/runtime-definitions/src/dataStoreContext.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,16 @@ export enum FlushMode {
TurnBased,
}

export enum FlushModeExperimental {
/**
* When in Async flush mode, the runtime will accumulate all operations across JS turns and send them as a single
* batch when all micro-tasks are complete.
*
* @experimental - Not ready for use
*/
Async = 2,
}

/**
* This tells the visibility state of a Fluid object. It basically tracks whether the object is not visible, visible
* locally within the container only or visible globally to all clients.
Expand Down
1 change: 1 addition & 0 deletions packages/runtime/runtime-definitions/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ export {
BindState,
CreateChildSummarizerNodeFn,
FlushMode,
FlushModeExperimental,
IContainerRuntimeBase,
IContainerRuntimeBaseEvents,
IDataStore,
Expand Down
122 changes: 122 additions & 0 deletions packages/test/test-end-to-end-tests/src/test/fewerBatches.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
/*!
* Copyright (c) Microsoft Corporation and contributors. All rights reserved.
* Licensed under the MIT License.
*/

import { strict as assert } from "assert";
import { IContainer } from "@fluidframework/container-definitions";
import { SharedMap } from "@fluidframework/map";
import { IDocumentMessage } from "@fluidframework/protocol-definitions";
import { requestFluidObject } from "@fluidframework/runtime-utils";
import {
ChannelFactoryRegistry,
DataObjectFactoryType,
ITestContainerConfig,
ITestFluidObject,
ITestObjectProvider,
waitForContainerConnection,
} from "@fluidframework/test-utils";
import { describeNoCompat } from "@fluidframework/test-version-utils";
import { FlushMode, FlushModeExperimental } from "@fluidframework/runtime-definitions";

describeNoCompat("Less batches", (getTestObjectProvider) => {
const mapId = "mapId";
const registry: ChannelFactoryRegistry = [[mapId, SharedMap.getFactory()]];
const testContainerConfig: ITestContainerConfig = {
fluidDataObjectType: DataObjectFactoryType.Test,
registry,
};

let provider: ITestObjectProvider;
const capturedBatches: IDocumentMessage[][] = [];

beforeEach(() => {
provider = getTestObjectProvider();
capturedBatches.splice(0);
});
afterEach(async () => provider.reset());

let localContainer: IContainer;
let remoteContainer: IContainer;
let dataObject1: ITestFluidObject;
let dataObject2: ITestFluidObject;
let dataObject1map: SharedMap;
let dataObject2map: SharedMap;

const setupContainers = async (containerConfig: ITestContainerConfig) => {
// Create a Container for the first client.
localContainer = await provider.makeTestContainer(containerConfig);
dataObject1 = await requestFluidObject<ITestFluidObject>(localContainer, "default");
dataObject1map = await dataObject1.getSharedObject<SharedMap>(mapId);

// Load the Container that was created by the first client.
remoteContainer = await provider.loadTestContainer(containerConfig);
dataObject2 = await requestFluidObject<ITestFluidObject>(remoteContainer, "default");
dataObject2map = await dataObject2.getSharedObject<SharedMap>(mapId);
await waitForContainerConnection(localContainer, true);
await waitForContainerConnection(remoteContainer, true);

localContainer.deltaManager.outbound.on("op", (batch: IDocumentMessage[]) => {
capturedBatches.push(batch);
});
await provider.ensureSynchronized();
};

[
{
flushMode: FlushMode.TurnBased,
batchCount: 5,
},
{
flushMode: FlushMode.Immediate,
batchCount: 5,
},
{
flushMode: FlushModeExperimental.Async as unknown as FlushMode,
batchCount: 1,
},
].forEach((test) => {
it(`With runtime flushMode=FlushMode.${
FlushMode[test.flushMode]
}, ops across JS turns produce ${test.batchCount} batches`, async () => {
await setupContainers({
...testContainerConfig,
runtimeOptions: {
flushMode: test.flushMode,
},
});

// Force the container into write-mode
dataObject1map.set("key0", "0");
await provider.ensureSynchronized();

// Ignore the batch we just sent
capturedBatches.splice(0);

const opCount = 5;
dataObject1map.set("key1", "1");

await Promise.resolve().then(async () => {
dataObject1map.set("key2", "2");
});
await Promise.resolve().then(async () => {
dataObject1map.set("key3", "3");
});
await Promise.resolve().then(async () => {
dataObject1map.set("key4", "4");
await Promise.resolve().then(async () => {
dataObject1map.set("key5", "5");
});
});

await provider.ensureSynchronized();

assert.strictEqual(capturedBatches.length, test.batchCount);

for (let i = 1; i <= opCount; i++) {
const value = dataObject2map.get(`key${i}`);
assert.strictEqual(value, `${i}`, `Wrong value for key${i}`);
}
});
});
});

0 comments on commit bb38412

Please sign in to comment.