diff --git a/api-report/runtime-definitions.api.md b/api-report/runtime-definitions.api.md index 8df6e0ac0d04..bc5667002be6 100644 --- a/api-report/runtime-definitions.api.md +++ b/api-report/runtime-definitions.api.md @@ -100,6 +100,11 @@ export enum FlushMode { TurnBased = 1 } +// @public (undocumented) +export enum FlushModeExperimental { + Async = 2 +} + // @public export const gcBlobPrefix = "__gc"; diff --git a/packages/runtime/container-runtime/src/containerRuntime.ts b/packages/runtime/container-runtime/src/containerRuntime.ts index 1c108384a85f..b309cbb19892 100644 --- a/packages/runtime/container-runtime/src/containerRuntime.ts +++ b/packages/runtime/container-runtime/src/containerRuntime.ts @@ -68,6 +68,7 @@ import { } from "@fluidframework/protocol-definitions"; import { FlushMode, + FlushModeExperimental, gcTreeKey, InboundAttachMessage, IFluidDataStoreContextDetached, @@ -908,7 +909,7 @@ export class ContainerRuntime private _orderSequentiallyCalls: number = 0; private readonly _flushMode: FlushMode; - private flushMicroTaskExists = false; + private flushTaskExists = false; private _connected: boolean; @@ -2115,7 +2116,7 @@ export class ContainerRuntime * Are we in the middle of batching ops together? */ private currentlyBatching() { - return this.flushMode === FlushMode.TurnBased || this._orderSequentiallyCalls !== 0; + return this.flushMode !== FlushMode.Immediate || this._orderSequentiallyCalls !== 0; } public getQuorum(): IQuorumClients { @@ -2879,17 +2880,8 @@ export class ContainerRuntime if (!this.currentlyBatching()) { this.flush(); - } else if (!this.flushMicroTaskExists) { - this.flushMicroTaskExists = true; - // Queue a microtask to detect the end of the turn and force a flush. - Promise.resolve() - .then(() => { - this.flushMicroTaskExists = false; - this.flush(); - }) - .catch((error) => { - this.closeFn(error as GenericError); - }); + } else { + this.scheduleFlush(); } } catch (error) { this.closeFn(error as GenericError); @@ -2901,6 +2893,46 @@ export class ContainerRuntime } } + private scheduleFlush() { + if (this.flushTaskExists) { + return; + } + + this.flushTaskExists = true; + const flush = () => { + this.flushTaskExists = false; + try { + this.flush(); + } catch (error) { + this.closeFn(error as GenericError); + } + }; + + switch (this.flushMode) { + case FlushMode.TurnBased: + // When in TurnBased flush mode the runtime will buffer operations in the current turn and send them as a single + // batch at the end of the turn + // eslint-disable-next-line @typescript-eslint/no-floating-promises + Promise.resolve().then(flush); + break; + + // FlushModeExperimental is experimental and not exposed directly in the runtime APIs + case FlushModeExperimental.Async as unknown as FlushMode: + // When in Async flush mode, the runtime will accumulate all operations across JS turns and send them as a single + // batch when all micro-tasks are complete. + // Compared to TurnBased, this flush mode will capture more ops into the same batch. + setTimeout(flush, 0); + break; + + default: + assert( + this._orderSequentiallyCalls > 0, + "Unreachable unless running under orderSequentially", + ); + break; + } + } + private submitSummaryMessage(contents: ISummaryContent) { this.verifyNotClosed(); assert( diff --git a/packages/runtime/runtime-definitions/src/dataStoreContext.ts b/packages/runtime/runtime-definitions/src/dataStoreContext.ts index d56cd373d1d1..09520b29b5a5 100644 --- a/packages/runtime/runtime-definitions/src/dataStoreContext.ts +++ b/packages/runtime/runtime-definitions/src/dataStoreContext.ts @@ -59,6 +59,16 @@ export enum FlushMode { TurnBased, } +export enum FlushModeExperimental { + /** + * When in Async flush mode, the runtime will accumulate all operations across JS turns and send them as a single + * batch when all micro-tasks are complete. + * + * @experimental - Not ready for use + */ + Async = 2, +} + /** * This tells the visibility state of a Fluid object. It basically tracks whether the object is not visible, visible * locally within the container only or visible globally to all clients. diff --git a/packages/runtime/runtime-definitions/src/index.ts b/packages/runtime/runtime-definitions/src/index.ts index ffe164fb9537..1141340b1471 100644 --- a/packages/runtime/runtime-definitions/src/index.ts +++ b/packages/runtime/runtime-definitions/src/index.ts @@ -14,6 +14,7 @@ export { BindState, CreateChildSummarizerNodeFn, FlushMode, + FlushModeExperimental, IContainerRuntimeBase, IContainerRuntimeBaseEvents, IDataStore, diff --git a/packages/test/test-end-to-end-tests/src/test/fewerBatches.spec.ts b/packages/test/test-end-to-end-tests/src/test/fewerBatches.spec.ts new file mode 100644 index 000000000000..53f4e057b68b --- /dev/null +++ b/packages/test/test-end-to-end-tests/src/test/fewerBatches.spec.ts @@ -0,0 +1,122 @@ +/*! + * Copyright (c) Microsoft Corporation and contributors. All rights reserved. + * Licensed under the MIT License. + */ + +import { strict as assert } from "assert"; +import { IContainer } from "@fluidframework/container-definitions"; +import { SharedMap } from "@fluidframework/map"; +import { IDocumentMessage } from "@fluidframework/protocol-definitions"; +import { requestFluidObject } from "@fluidframework/runtime-utils"; +import { + ChannelFactoryRegistry, + DataObjectFactoryType, + ITestContainerConfig, + ITestFluidObject, + ITestObjectProvider, + waitForContainerConnection, +} from "@fluidframework/test-utils"; +import { describeNoCompat } from "@fluidframework/test-version-utils"; +import { FlushMode, FlushModeExperimental } from "@fluidframework/runtime-definitions"; + +describeNoCompat("Less batches", (getTestObjectProvider) => { + const mapId = "mapId"; + const registry: ChannelFactoryRegistry = [[mapId, SharedMap.getFactory()]]; + const testContainerConfig: ITestContainerConfig = { + fluidDataObjectType: DataObjectFactoryType.Test, + registry, + }; + + let provider: ITestObjectProvider; + const capturedBatches: IDocumentMessage[][] = []; + + beforeEach(() => { + provider = getTestObjectProvider(); + capturedBatches.splice(0); + }); + afterEach(async () => provider.reset()); + + let localContainer: IContainer; + let remoteContainer: IContainer; + let dataObject1: ITestFluidObject; + let dataObject2: ITestFluidObject; + let dataObject1map: SharedMap; + let dataObject2map: SharedMap; + + const setupContainers = async (containerConfig: ITestContainerConfig) => { + // Create a Container for the first client. + localContainer = await provider.makeTestContainer(containerConfig); + dataObject1 = await requestFluidObject(localContainer, "default"); + dataObject1map = await dataObject1.getSharedObject(mapId); + + // Load the Container that was created by the first client. + remoteContainer = await provider.loadTestContainer(containerConfig); + dataObject2 = await requestFluidObject(remoteContainer, "default"); + dataObject2map = await dataObject2.getSharedObject(mapId); + await waitForContainerConnection(localContainer, true); + await waitForContainerConnection(remoteContainer, true); + + localContainer.deltaManager.outbound.on("op", (batch: IDocumentMessage[]) => { + capturedBatches.push(batch); + }); + await provider.ensureSynchronized(); + }; + + [ + { + flushMode: FlushMode.TurnBased, + batchCount: 5, + }, + { + flushMode: FlushMode.Immediate, + batchCount: 5, + }, + { + flushMode: FlushModeExperimental.Async as unknown as FlushMode, + batchCount: 1, + }, + ].forEach((test) => { + it(`With runtime flushMode=FlushMode.${ + FlushMode[test.flushMode] + }, ops across JS turns produce ${test.batchCount} batches`, async () => { + await setupContainers({ + ...testContainerConfig, + runtimeOptions: { + flushMode: test.flushMode, + }, + }); + + // Force the container into write-mode + dataObject1map.set("key0", "0"); + await provider.ensureSynchronized(); + + // Ignore the batch we just sent + capturedBatches.splice(0); + + const opCount = 5; + dataObject1map.set("key1", "1"); + + await Promise.resolve().then(async () => { + dataObject1map.set("key2", "2"); + }); + await Promise.resolve().then(async () => { + dataObject1map.set("key3", "3"); + }); + await Promise.resolve().then(async () => { + dataObject1map.set("key4", "4"); + await Promise.resolve().then(async () => { + dataObject1map.set("key5", "5"); + }); + }); + + await provider.ensureSynchronized(); + + assert.strictEqual(capturedBatches.length, test.batchCount); + + for (let i = 1; i <= opCount; i++) { + const value = dataObject2map.get(`key${i}`); + assert.strictEqual(value, `${i}`, `Wrong value for key${i}`); + } + }); + }); +});