Skip to content

Commit

Permalink
Allow for disabling both compression and chunking via feature gates (m…
Browse files Browse the repository at this point in the history
…icrosoft#13578)

Even if the features are enabled using the container runtime config
(through code), this change adds the option of disabling the features
(in case of emergency) without code changes.

**Regardless of configs or feature gates, compression/chunking is not
disabled on the receiving end. Any client will be able to process
compressed and or chunked ops. The configs/feature gates are relative to
the producer of the ops, not the consumer.**
  • Loading branch information
andre4i committed Jan 13, 2023
1 parent c1dd970 commit 2cce364
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 6 deletions.
11 changes: 9 additions & 2 deletions packages/runtime/container-runtime/src/containerRuntime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1038,7 +1038,8 @@ export class ContainerRuntime extends TypedEventEmitter<IContainerRuntimeEvents>
const opSplitter = new OpSplitter(
chunks,
this.context.submitBatchFn,
runtimeOptions.chunkSizeInBytes,
this.mc.config.getBoolean("Fluid.ContainerRuntime.DisableCompressionChunking") === true ?
Number.POSITIVE_INFINITY : runtimeOptions.chunkSizeInBytes,
runtimeOptions.maxBatchSizeInBytes,
this.mc.logger);
this.remoteMessageProcessor = new RemoteMessageProcessor(opSplitter, new OpDecompressor());
Expand Down Expand Up @@ -1184,14 +1185,20 @@ export class ContainerRuntime extends TypedEventEmitter<IContainerRuntimeEvents>
},
pendingRuntimeState?.pending);

const compressionOptions = this.mc.config.getBoolean("Fluid.ContainerRuntime.DisableCompression") === true ?
{
minimumBatchSizeInBytes: Number.POSITIVE_INFINITY,
compressionAlgorithm: CompressionAlgorithms.lz4
} : runtimeOptions.compressionOptions;

this.outbox = new Outbox({
shouldSend: () => this.canSendOps(),
pendingStateManager: this.pendingStateManager,
containerContext: this.context,
compressor: new OpCompressor(this.mc.logger),
splitter: opSplitter,
config: {
compressionOptions: runtimeOptions.compressionOptions,
compressionOptions,
maxBatchSizeInBytes: runtimeOptions.maxBatchSizeInBytes,
enableOpReentryCheck: this.enableOpReentryCheck,
},
Expand Down
57 changes: 53 additions & 4 deletions packages/test/test-end-to-end-tests/src/test/messageSize.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import { GenericError } from "@fluidframework/container-utils";
import { FlushMode } from "@fluidframework/runtime-definitions";
import { CompressionAlgorithms, ContainerMessageType } from "@fluidframework/container-runtime";
import { IDocumentMessage, ISequencedDocumentMessage, MessageType } from "@fluidframework/protocol-definitions";
import { ConfigTypes, IConfigProviderBase } from "@fluidframework/telemetry-utils";

describeNoCompat("Message size", (getTestObjectProvider) => {
const mapId = "mapId";
Expand All @@ -45,15 +46,28 @@ describeNoCompat("Message size", (getTestObjectProvider) => {
let dataObject1map: SharedMap;
let dataObject2map: SharedMap;

const setupContainers = async (containerConfig: ITestContainerConfig) => {
// Create a Container for the first client.
const configProvider = ((settings: Record<string, ConfigTypes>): IConfigProviderBase => {
return {
getRawConfig: (name: string): ConfigTypes => settings[name],
};
});

localContainer = await provider.makeTestContainer(containerConfig);
const setupContainers = async (
containerConfig: ITestContainerConfig,
featureGates: Record<string, ConfigTypes> = {},
) => {
const configWithFeatureGates = {
...containerConfig,
loaderProps: { configProvider: configProvider(featureGates) },
};

// Create a Container for the first client.
localContainer = await provider.makeTestContainer(configWithFeatureGates);
dataObject1 = await requestFluidObject<ITestFluidObject>(localContainer, "default");
dataObject1map = await dataObject1.getSharedObject<SharedMap>(mapId);

// Load the Container that was created by the first client.
remoteContainer = await provider.loadTestContainer(containerConfig);
remoteContainer = await provider.loadTestContainer(configWithFeatureGates);
dataObject2 = await requestFluidObject<ITestFluidObject>(remoteContainer, "default");
dataObject2map = await dataObject2.getSharedObject<SharedMap>(mapId);
await new Promise<void>((resolve) => localContainer.once("connected", () => resolve()));
Expand Down Expand Up @@ -184,6 +198,25 @@ describeNoCompat("Message size", (getTestObjectProvider) => {
await provider.ensureSynchronized();
});

itExpects("Large ops fail when compression is disabled by feature gate and the content is over max op size", [
{ eventName: "fluid:telemetry:Container:ContainerClose", error: "BatchTooLarge" },
], async function() {
const maxMessageSizeInBytes = 5 * 1024 * 1024; // 5MB
await setupContainers({
...testContainerConfig,
runtimeOptions: {
compressionOptions: { minimumBatchSizeInBytes: 1, compressionAlgorithm: CompressionAlgorithms.lz4 },
},
}, {
"Fluid.ContainerRuntime.DisableCompression": true,
});

const largeString = generateStringOfSize(500000);
const messageCount = 10;
assert.throws(() => setMapKeys(dataObject1map, messageCount, largeString));
await provider.ensureSynchronized();
});

itExpects("Large ops fail when compression enabled and compressed content is over max op size", [
{ eventName: "fluid:telemetry:Container:ContainerClose", error: "BatchTooLarge" },
], async function() {
Expand Down Expand Up @@ -237,6 +270,22 @@ describeNoCompat("Message size", (getTestObjectProvider) => {
}).timeout(chunkingBatchesTimeoutMs);
}));

itExpects("Large ops fail when compression chunking is disabled by feature gate", [
{ eventName: "fluid:telemetry:Container:ContainerClose", error: "BatchTooLarge" },
], async function() {
const maxMessageSizeInBytes = 5 * 1024 * 1024; // 5MB
await setupContainers(
chunkingBatchesConfig,
{
"Fluid.ContainerRuntime.DisableCompressionChunking": true,
});

const largeString = generateRandomStringOfSize(maxMessageSizeInBytes);
const messageCount = 3; // Will result in a 15 MB payload
setMapKeys(dataObject1map, messageCount, largeString);
await provider.ensureSynchronized();
});

describe("Resiliency", () => {
const messageSize = 5 * 1024 * 1024;
const messagesInBatch = 3;
Expand Down

0 comments on commit 2cce364

Please sign in to comment.