Skip to content

Commit

Permalink
PendingStateManager: More efficient validation of local op roundtrip (#…
Browse files Browse the repository at this point in the history
…22139)

We have 2 ways to check data coherency for local ops as they round-trip
through the ordering service:

1. compare the contents of each incoming message v. pending message
2. compare the batch info for the incoming batch and the next pending
message

This PR adds an additional check to (2) to make it more robust - it
checks the batch length in addition to batch start CSN. This makes it
pretty equivalent to the old validation (year+ ago) where we only looked
at each message's CSN (Before GroupedBatching changed CSN semantics).
  • Loading branch information
markfields committed Aug 15, 2024
1 parent 56989e4 commit 83c8b37
Show file tree
Hide file tree
Showing 6 changed files with 62 additions and 61 deletions.
4 changes: 2 additions & 2 deletions packages/runtime/container-runtime/src/metadata.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@
import type { BatchId } from "./opLifecycle/index.js";

/** Syntactic sugar for casting */
export function asBatchMetadata(metadata: unknown): IBatchMetadata | undefined {
return metadata as IBatchMetadata | undefined;
export function asBatchMetadata(metadata: unknown): Partial<IBatchMetadata> | undefined {
return metadata as Partial<IBatchMetadata> | undefined;
}

/** Syntactic sugar for casting */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ export class RemoteMessageProcessor {
}

if (isGroupedBatch(message)) {
// We should be awaiting a new batch (batchStartCsn undefined)
// We should be awaiting a new batch (batchInProgress undefined)
assert(
this.batchInProgress === undefined,
0x9d3 /* Grouped batch interrupting another batch */,
Expand Down
62 changes: 35 additions & 27 deletions packages/runtime/container-runtime/src/pendingStateManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,22 +35,24 @@ export interface IPendingMessage {
localOpMetadata: unknown;
opMetadata: Record<string, unknown> | undefined;
sequenceNumber?: number;
/** Info needed to compute the batchId on reconnect */
batchIdContext: {
/** Info about the batch this pending message belongs to, for validation and for computing the batchId on reconnect */
batchInfo: {
/** The Batch's original clientId, from when it was first flushed to be submitted */
clientId: string;
/**
* The Batch's original clientSequenceNumber, from when it was first flushed to be submitted
* @remarks A negative value means it was not yet submitted when queued here (e.g. disconnected right before flush fired)
*/
batchStartCsn: number;
/** length of the batch (how many runtime messages here) */
length: number;
};
}

type Patch<T, U> = U & Omit<T, keyof U>;

/** First version of the type (pre-dates batchIdContext) */
type IPendingMessageV0 = Patch<IPendingMessage, { batchIdContext?: undefined }>;
/** First version of the type (pre-dates batchInfo) */
type IPendingMessageV0 = Patch<IPendingMessage, { batchInfo?: undefined }>;

/**
* Union of all supported schemas for when applying stashed ops
Expand Down Expand Up @@ -133,10 +135,7 @@ function withoutLocalOpMetadata(message: IPendingMessage): IPendingMessage {
function getEffectiveBatchId(pendingMessage: IPendingMessage): string {
return (
asBatchMetadata(pendingMessage.opMetadata)?.batchId ??
generateBatchId(
pendingMessage.batchIdContext.clientId,
pendingMessage.batchIdContext.batchStartCsn,
)
generateBatchId(pendingMessage.batchInfo.clientId, pendingMessage.batchInfo.batchStartCsn)
);
}

Expand Down Expand Up @@ -273,8 +272,8 @@ export class PendingStateManager implements IDisposable {
content,
localOpMetadata,
opMetadata,
// Note: We only need this on the first message.
batchIdContext: { clientId, batchStartCsn },
// Note: We only will read this off the first message, but put it on all for simplicity
batchInfo: { clientId, batchStartCsn, length: batch.length },
};
this.pendingMessages.push(pendingMessage);
}
Expand Down Expand Up @@ -304,7 +303,7 @@ export class PendingStateManager implements IDisposable {
try {
if (isEmptyBatchPendingMessage(nextMessage)) {
nextMessage.localOpMetadata = { emptyBatch: true }; // equivalent to applyStashedOp for empty batch
patchBatchIdContext(nextMessage); // Back compat
patchbatchInfo(nextMessage); // Back compat
this.pendingMessages.push(nextMessage);
continue;
}
Expand All @@ -317,7 +316,7 @@ export class PendingStateManager implements IDisposable {
} else {
nextMessage.localOpMetadata = localOpMetadata;
// then we push onto pendingMessages which will cause PendingStateManager to resubmit when we connect
patchBatchIdContext(nextMessage); // Back compat
patchbatchInfo(nextMessage); // Back compat
this.pendingMessages.push(nextMessage);
}
} catch (error) {
Expand Down Expand Up @@ -374,9 +373,9 @@ export class PendingStateManager implements IDisposable {
asEmptyBatchLocalOpMetadata(localOpMetadata)?.emptyBatch === true,
"Expected empty batch marker",
);
return [];
}

// Note this will correctly return empty array for an empty batch
return batch.messages.map((message) => ({
message,
localOpMetadata: this.processNextPendingMessage(message.sequenceNumber, message),
Expand Down Expand Up @@ -447,7 +446,7 @@ export class PendingStateManager implements IDisposable {
}

/**
* Do some bookkeeping for the new batch
* Check if the incoming batch matches the batch info for the next pending message.
*/
private onLocalBatchBegin(batch: InboundBatch) {
// Get the next message from the pending queue. Verify a message exists.
Expand All @@ -462,17 +461,26 @@ export class PendingStateManager implements IDisposable {
// Empty batches became empty on Resubmit, and submit them and track them in case
// a different fork of this container also submitted the same batch (and it may not be empty for that fork).
const firstMessage = batch.messages.length > 0 ? batch.messages[0] : undefined;

if (pendingMessage.batchIdContext.batchStartCsn !== batch.batchStartCsn) {
const expectedPendingBatchLength = batch.messages.length === 0 ? 1 : batch.messages.length;

// We expect the incoming batch to be of the same length, starting at the same clientSequenceNumber,
// as the batch we originally submitted.
// We have another later check to compare the message contents, which we'd expect to fail if this check does,
// so we don't throw here, merely log. In a later release this check may replace that one.
if (
pendingMessage.batchInfo.batchStartCsn !== batch.batchStartCsn ||
(pendingMessage.batchInfo.length >= 0 && // -1 length is back compat and isn't suitable for this check
pendingMessage.batchInfo.length !== expectedPendingBatchLength)
) {
this.logger?.sendErrorEvent({
eventName: "BatchIdOrCsnMismatch",
eventName: "BatchInfoMismatch",
details: {
pendingBatchCsn: pendingMessage.batchIdContext.batchStartCsn,
pendingBatchCsn: pendingMessage.batchInfo.batchStartCsn,
batchStartCsn: batch.batchStartCsn,
inboundBatchIdComputed: batch.batchId === undefined,
messageBatchMetadata: firstMessage && (firstMessage.metadata as any)?.batch,
pendingMessageBatchMetadata: (pendingMessage.opMetadata as any)?.batch,
emptyBatch: firstMessage === undefined,
pendingBatchLength: pendingMessage.batchInfo.length,
batchLength: batch.messages.length,
pendingMessageBatchMetadata: asBatchMetadata(pendingMessage.opMetadata)?.batch,
messageBatchMetadata: asBatchMetadata(firstMessage?.metadata)?.batch,
},
messageDetails: firstMessage && extractSafePropertiesFromMessage(firstMessage),
});
Expand Down Expand Up @@ -594,13 +602,13 @@ export class PendingStateManager implements IDisposable {
}
}

/** For back-compat if trying to apply stashed ops that pre-date batchIdContext */
function patchBatchIdContext(
/** For back-compat if trying to apply stashed ops that pre-date batchInfo */
function patchbatchInfo(
message: IPendingMessageFromStash,
): asserts message is IPendingMessage {
const batchIdContext: IPendingMessageFromStash["batchIdContext"] = message.batchIdContext;
if (batchIdContext === undefined) {
const batchInfo: IPendingMessageFromStash["batchInfo"] = message.batchInfo;
if (batchInfo === undefined) {
// Using uuid guarantees uniqueness, retaining existing behavior
message.batchIdContext = { clientId: uuid(), batchStartCsn: -1 };
message.batchInfo = { clientId: uuid(), batchStartCsn: -1, length: -1 };
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1928,7 +1928,7 @@ describe("Runtime", () => {
referenceSequenceNumber: 0,
localOpMetadata: undefined,
opMetadata: undefined,
batchIdContext: { clientId: "CLIENT_ID", batchStartCsn: 1 },
batchInfo: { clientId: "CLIENT_ID", batchStartCsn: 1, length: 5 },
}));
const mockPendingStateManager = new Proxy<PendingStateManager>({} as any, {
get: (_t, p: keyof PendingStateManager, _r) => {
Expand Down Expand Up @@ -1970,7 +1970,7 @@ describe("Runtime", () => {
referenceSequenceNumber: 0,
localOpMetadata: undefined,
opMetadata: undefined,
batchIdContext: { clientId: "CLIENT_ID", batchStartCsn: 1 },
batchInfo: { clientId: "CLIENT_ID", batchStartCsn: 1, length: 5 },
}));
const mockPendingStateManager = new Proxy<PendingStateManager>({} as any, {
get: (_t, p: keyof PendingStateManager, _r) => {
Expand Down Expand Up @@ -2039,7 +2039,7 @@ describe("Runtime", () => {
referenceSequenceNumber: 0,
localOpMetadata: undefined,
opMetadata: undefined,
batchIdContext: { clientId: "CLIENT_ID", batchStartCsn: 1 },
batchInfo: { clientId: "CLIENT_ID", batchStartCsn: 1, length: 5 },
}));
const mockPendingStateManager = new Proxy<PendingStateManager>({} as any, {
get: (_t, p: keyof PendingStateManager, _r) => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import {
type BatchMessage,
ensureContentsDeserialized,
type IBatch,
type InboundBatch,
OpCompressor,
OpDecompressor,
OpGroupingManager,
Expand Down Expand Up @@ -159,10 +160,13 @@ describe("RemoteMessageProcessor", () => {
outboundMessages.push(...batch.messages);

const messageProcessor = getMessageProcessor();
const actual: ISequencedDocumentMessage[] = [];
let actualBatch: InboundBatch | undefined;
let seqNum = 1;
let actualBatchStartCsn: number | undefined;
for (const message of outboundMessages) {
assert(
actualBatch === undefined,
"actualBatch only should be set when we're done looping",
);
// eslint-disable-next-line @typescript-eslint/consistent-type-assertions
const inboundMessage = {
type: MessageType.Operation,
Expand All @@ -175,23 +179,8 @@ describe("RemoteMessageProcessor", () => {
} as ISequencedDocumentMessage;

ensureContentsDeserialized(inboundMessage, true, () => {});
const processResult = messageProcessor.process(inboundMessage, () => {});

// It'll be undefined for the first n-1 chunks if chunking is enabled
if (processResult === undefined) {
continue;
}

actual.push(...processResult.messages);

if (actualBatchStartCsn === undefined) {
actualBatchStartCsn = processResult.batchStartCsn;
} else {
assert(
actualBatchStartCsn === processResult.batchStartCsn,
"batchStartCsn shouldn't change while processing a single batch",
);
}
// actualBatch will remain undefined every time except the last time through the loop
actualBatch = messageProcessor.process(inboundMessage, () => {});
}

const expected = option.grouping
Expand All @@ -210,8 +199,12 @@ describe("RemoteMessageProcessor", () => {
getProcessedMessage("e", startSeqNum, startSeqNum, false),
];

assert.deepStrictEqual(actual, expected, "unexpected output");
assert.equal(actualBatchStartCsn, leadingChunkCount + 1, "unexpected batchStartCsn");
assert.deepStrictEqual(actualBatch?.messages, expected, "unexpected output");
assert.equal(
actualBatch?.batchStartCsn,
leadingChunkCount + 1,
"unexpected batchStartCsn",
);
});
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -640,15 +640,15 @@ describe("Pending State Manager", () => {
referenceSequenceNumber: 10,
localOpMetadata: undefined,
opMetadata: undefined,
batchIdContext: { clientId: "CLIENT_ID", batchStartCsn: 1 },
batchInfo: { clientId: "CLIENT_ID", batchStartCsn: 1, length: 1 },
},
{
type: "message",
content: '{"type": "component", "contents": {"prop1": "value"}}',
referenceSequenceNumber: 11,
localOpMetadata: undefined,
opMetadata: undefined,
batchIdContext: { clientId: "CLIENT_ID", batchStartCsn: 2 },
batchInfo: { clientId: "CLIENT_ID", batchStartCsn: 2, length: 1 },
},
];

Expand Down Expand Up @@ -678,7 +678,7 @@ describe("Pending State Manager", () => {
referenceSequenceNumber: 10,
opMetadata: undefined,
localOpMetadata: { emptyBatch: true },
batchIdContext: { clientId: "CLIENT_ID", batchStartCsn: 1 },
batchInfo: { clientId: "CLIENT_ID", batchStartCsn: 1, length: 1 },
},
];

Expand Down Expand Up @@ -815,31 +815,31 @@ describe("Pending State Manager", () => {
referenceSequenceNumber: 10,
localOpMetadata: undefined,
opMetadata: undefined,
batchIdContext: { clientId: "CLIENT_ID", batchStartCsn: 1 },
batchInfo: { clientId: "CLIENT_ID", batchStartCsn: 1, length: 1 },
},
{
type: "message",
content: '{"type": "component", "contents": {"prop1": "value"}}',
referenceSequenceNumber: 11,
localOpMetadata: undefined,
opMetadata: undefined,
batchIdContext: { clientId: "CLIENT_ID", batchStartCsn: 2 },
batchInfo: { clientId: "CLIENT_ID", batchStartCsn: 2, length: 1 },
},
{
type: "message",
content: '{"type": "component", "contents": {"prop2": "value"}}',
referenceSequenceNumber: 12,
localOpMetadata: undefined,
opMetadata: undefined,
batchIdContext: { clientId: "CLIENT_ID", batchStartCsn: 3 },
batchInfo: { clientId: "CLIENT_ID", batchStartCsn: 3, length: 1 },
},
{
type: "message",
content: '{"type": "component", "contents": {"prop3": "value"}}',
referenceSequenceNumber: 12,
localOpMetadata: undefined,
opMetadata: undefined,
batchIdContext: { clientId: "CLIENT_ID", batchStartCsn: 3 },
batchInfo: { clientId: "CLIENT_ID", batchStartCsn: 4, length: 1 },
},
];

Expand Down

0 comments on commit 83c8b37

Please sign in to comment.