# Add extra op when chunking (microsoft#14199)
## Description

When chunking a large, compressed payload, and:

- the chunk size is large (close to the 1MB limit),
- the payload is a batch with many small ops, and
- the last chunk is large as well (all chunks have the same size, except for the last one),

then, because the empty ops (the placeholder messages for the rest of the compressed batch) are bundled with the last chunk, there is a risk that their extra overhead pushes the last batch sent by the splitter over the 1MB limit and makes it fail.

A best-effort mitigation is to always add an extra, empty chunk and send it along with the empty ops, so the final batch stays small regardless of how large the content chunks are.

I've also adjusted the end-to-end test to make sure we cover the
scenario of having many small ops.
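
For illustration only, here is a minimal sketch of the splitting scheme described above. `ChunkSketch`, `splitWithTrailingEmptyChunk`, and the sizes in the example are made up for this description; the real shape is `IChunkedOp` and the real logic lives in `splitOp` in `opSplitter.ts`, shown in the diff below.

```typescript
// Illustrative sketch, not the production splitter.
interface ChunkSketch {
    chunkId: number;
    totalChunks: number;
    contents: string;
    originalMetadata?: unknown;
}

function splitWithTrailingEmptyChunk(contents: string, chunkSizeInBytes: number): ChunkSketch[] {
    // One chunk per chunkSizeInBytes slice of content, plus one extra, empty chunk.
    const totalChunks = Math.floor((contents.length - 1) / chunkSizeInBytes) + 2;
    const chunks: ChunkSketch[] = [];
    for (let i = 1; i < totalChunks; i++) {
        const offset = (i - 1) * chunkSizeInBytes;
        chunks.push({
            chunkId: i,
            totalChunks,
            contents: contents.slice(offset, offset + chunkSizeInBytes),
        });
    }
    // The trailing chunk carries no content, so the last batch (this chunk plus the
    // empty placeholder ops) stays small even when every content chunk is near 1MB.
    chunks.push({ chunkId: totalChunks, totalChunks, contents: "", originalMetadata: {} });
    return chunks;
}

// Example with illustrative numbers: ~3MB of compressed content, 800KB chunks.
const example = splitWithTrailingEmptyChunk("x".repeat(3 * 1024 * 1024), 800 * 1024);
console.log(example.length); // 5: four content chunks plus the empty trailing chunk
```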
andre4i committed Feb 16, 2023
1 parent 1fc3cbc commit a240e06
Showing 3 changed files with 59 additions and 27 deletions.
### packages/runtime/container-runtime/src/opLifecycle/opSplitter.ts (17 additions & 11 deletions)
@@ -212,27 +212,33 @@ export const splitOp = (op: BatchMessage, chunkSizeInBytes: number): IChunkedOp[
     );
 
     const contentLength = op.contents.length;
-    const chunkN = Math.floor((contentLength - 1) / chunkSizeInBytes) + 1;
+    const chunkCount = Math.floor((contentLength - 1) / chunkSizeInBytes) + 2;
     let offset = 0;
-    for (let i = 1; i <= chunkN; i++) {
+    for (let i = 1; i < chunkCount; i++) {
         const chunk: IChunkedOp = {
             chunkId: i,
             contents: op.contents.substr(offset, chunkSizeInBytes),
             originalType: op.deserializedContent.type,
-            totalChunks: chunkN,
+            totalChunks: chunkCount,
         };
 
-        if (i === chunkN) {
-            // We don't need to port these to all the chunks,
-            // as we rebuild the original op when we process the
-            // last chunk, therefore it is the only one that needs it.
-            chunk.originalMetadata = op.metadata;
-            chunk.originalCompression = op.compression;
-        }
-
         chunks.push(chunk);
         offset += chunkSizeInBytes;
+        assert(i === chunkCount - 1 || offset <= contentLength, "Content offset within bounds");
     }
 
+    assert(offset >= contentLength, "Content offset equal or larger than content length");
+    // The last chunk has empty contents, to minimize the risk of the
+    // resulting payload exceeding 1MB due to the overhead from the empty ops
+    // which will be bundled with this op.
+    chunks.push({
+        chunkId: chunkCount,
+        contents: "",
+        originalType: op.deserializedContent.type,
+        totalChunks: chunkCount,
+        originalMetadata: op.metadata,
+        originalCompression: op.compression,
+    });
+
     return chunks;
 };
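
As a quick sanity check of the new chunk count, here is the arithmetic with illustrative numbers (a 100-byte payload split into 20-byte chunks); it is plain arithmetic, not a call into the production code, and it mirrors why the expected counts in the unit tests below each go up by exactly one.

```typescript
// Chunk-count arithmetic only; no Fluid APIs involved.
const chunkSizeInBytes = 20;
const contentLength = 100;

// Before this change: one chunk per 20-byte slice of content.
const oldChunkCount = Math.floor((contentLength - 1) / chunkSizeInBytes) + 1; // 5

// After this change: the same content chunks plus one trailing, empty chunk.
const newChunkCount = Math.floor((contentLength - 1) / chunkSizeInBytes) + 2; // 6

console.log(oldChunkCount, newChunkCount); // 5 6
```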
@@ -48,15 +48,17 @@ describe("OpSplitter", () => {
         assert.equal(opSplitter.processRemoteMessage(chunks1[1]).state, "Accepted");
         assert.equal(opSplitter.processRemoteMessage(chunks2[1]).state, "Accepted");
 
-        const chunks1LastResult = opSplitter.processRemoteMessage(chunks1[2]);
+        assert.equal(opSplitter.processRemoteMessage(chunks1[2]).state, "Accepted");
+        const chunks1LastResult = opSplitter.processRemoteMessage(chunks1[3]);
         // The last chunk will reconstruct the original message
         assert.equal(chunks1LastResult.state, "Processed");
         assertSameMessage(chunks1LastResult.message, op1);
         assert.equal(opSplitter.chunks.size, 1);
 
         assert.equal(opSplitter.processRemoteMessage(chunks2[2]).state, "Accepted");
+        assert.equal(opSplitter.processRemoteMessage(chunks2[3]).state, "Accepted");
 
-        const chunks2LastResult = opSplitter.processRemoteMessage(chunks2[3]);
+        const chunks2LastResult = opSplitter.processRemoteMessage(chunks2[4]);
         // The last chunk will reconstruct the original message
         assert.equal(chunks2LastResult.state, "Processed");
         assertSameMessage(chunks2LastResult.message, op2);
@@ -76,6 +78,7 @@ describe("OpSplitter", () => {
         );
         opSplitter.processRemoteMessage(chunks[0]);
         opSplitter.processRemoteMessage(chunks[1]);
+        opSplitter.processRemoteMessage(chunks[2]);
 
         const otherOpSplitter = new OpSplitter(
             Array.from(opSplitter.chunks),
@@ -86,8 +89,8 @@ describe("OpSplitter", () => {
         );
         opSplitter.clearPartialChunks("testClient");
 
-        otherOpSplitter.processRemoteMessage(chunks[2]);
-        assertSameMessage(otherOpSplitter.processRemoteMessage(chunks[3]).message, op);
+        otherOpSplitter.processRemoteMessage(chunks[3]);
+        assertSameMessage(otherOpSplitter.processRemoteMessage(chunks[4]).message, op);
     });
 
     it("Clear chunks", () => {
@@ -269,7 +272,7 @@ describe("OpSplitter", () => {
             contentSizeInBytes: largeMessage.contents?.length ?? 0,
         });
 
-        assert.equal(batchesSubmitted.length, 5);
+        assert.equal(batchesSubmitted.length, 6);
         for (const batch of batchesSubmitted) {
             assert.equal(batch.length, 1);
             assert.equal(
@@ -296,7 +299,7 @@ describe("OpSplitter", () => {
                 {
                     eventName: "OpSplitter:Chunked compressed batch",
                     length: result.content.length,
-                    chunks: 100 / 20 + 1,
+                    chunks: 100 / 20 + 2,
                     chunkSizeInBytes: 20,
                 },
             ]),
@@ -318,7 +321,7 @@ describe("OpSplitter", () => {
             contentSizeInBytes: largeMessage.contents?.length ?? 0,
         });
 
-        assert.equal(batchesSubmitted.length, 5);
+        assert.equal(batchesSubmitted.length, 6);
         for (const batch of batchesSubmitted) {
             assert.equal(batch.length, 1);
             assert.equal(
@@ -344,7 +347,7 @@ describe("OpSplitter", () => {
                 {
                     eventName: "OpSplitter:Chunked compressed batch",
                     length: result.content.length,
-                    chunks: 100 / 20 + 1,
+                    chunks: 100 / 20 + 2,
                     chunkSizeInBytes: 20,
                 },
             ]),
### packages/test/test-end-to-end-tests/src/test/messageSize.spec.ts (31 additions & 8 deletions)
@@ -262,7 +262,7 @@ describeNoCompat("Message size", (getTestObjectProvider) => {
             },
         );
 
-        const largeString = generateStringOfSize(500000);
+        const largeString = generateStringOfSize(maxMessageSizeInBytes);
         const messageCount = 10;
         assert.throws(() => setMapKeys(dataObject1map, messageCount, largeString));
         await provider.ensureSynchronized();
@@ -302,7 +302,7 @@ describeNoCompat("Message size", (getTestObjectProvider) => {
             minimumBatchSizeInBytes: 1024 * 1024,
             compressionAlgorithm: CompressionAlgorithms.lz4,
         },
-        chunkSizeInBytes: 600 * 1024,
+        chunkSizeInBytes: 800 * 1024,
         summaryOptions: { summaryConfigOverrides: { state: "disabled" } },
     },
 };
@@ -312,12 +312,12 @@ describeNoCompat("Message size", (getTestObjectProvider) => {
 [
     { messagesInBatch: 1, messageSize: 5 * 1024 * 1024 }, // One large message
     { messagesInBatch: 3, messageSize: 5 * 1024 * 1024 }, // Three large messages
-    { messagesInBatch: 50, messageSize: 215 * 1024 }, // Many small messages
+    { messagesInBatch: 1500, messageSize: 4 * 1024 }, // Many small messages
 ].forEach((config) => {
     it(
         "Large payloads pass when compression enabled, " +
             "compressed content is over max op size and chunking enabled. " +
-            `${config.messagesInBatch} messages of ${config.messageSize}b == ` +
+            `${config.messagesInBatch.toLocaleString()} messages of ${config.messageSize.toLocaleString()} bytes == ` +
             `${((config.messagesInBatch * config.messageSize) / (1024 * 1024)).toFixed(
                 2,
             )} MB`,
@@ -332,12 +332,35 @@ describeNoCompat("Message size", (getTestObjectProvider) => {
             }
 
             await setupContainers(chunkingBatchesConfig);
-            const largeString = generateRandomStringOfSize(config.messageSize);
-            setMapKeys(dataObject1map, config.messagesInBatch, largeString);
+
+            // Force the container into write-mode by sending a small op,
+            // so that it won't resend the whole large batch at reconnection
+            dataObject1map.set("test", "test");
+            await provider.ensureSynchronized();
+
+            const generated: string[] = [];
+            for (let i = 0; i < config.messagesInBatch; i++) {
+                // Ensure that the contents don't get compressed properly, by
+                // generating a random string for each map value instead of repeating it
+                const content = generateRandomStringOfSize(config.messageSize);
+                generated.push(content);
+                dataObject1map.set(`key${i}`, content);
+            }
+
             await provider.ensureSynchronized();
 
-            assertMapValues(dataObject2map, config.messagesInBatch, largeString);
-            assertMapValues(dataObject1map, config.messagesInBatch, largeString);
+            for (let i = 0; i < config.messagesInBatch; i++) {
+                assert.strictEqual(
+                    dataObject1map.get(`key${i}`),
+                    generated[i],
+                    `Wrong value for key${i} in local map`,
+                );
+                assert.strictEqual(
+                    dataObject2map.get(`key${i}`),
+                    generated[i],
+                    `Wrong value for key${i} in remote map`,
+                );
+            }
         },
     ).timeout(chunkingBatchesTimeoutMs);
 }));
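
The loop in the adjusted test above deliberately writes a different random string into every map key so that LZ4 has little to compress and the chunking path is genuinely exercised. As a rough idea of what such a helper might look like, here is a hypothetical sketch; the actual `generateRandomStringOfSize` in the test utilities may be implemented differently.

```typescript
// Hypothetical sketch of a random-string helper; the real test utility may differ.
// Random characters leave LZ4 with few repeated sequences to exploit, so the
// compressed batch stays close to its raw size and still needs to be chunked.
const generateRandomStringOfSizeSketch = (sizeInBytes: number): string =>
    Array.from({ length: sizeInBytes }, () =>
        String.fromCharCode(97 + Math.floor(Math.random() * 26)),
    ).join("");

console.log(generateRandomStringOfSizeSketch(8).length); // 8
```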
