Skip to content

Commit

Permalink
Improve logging in DeltaScheduler (microsoft#9788)
Browse files Browse the repository at this point in the history
* Improve logging in DeltaScheduler
  • Loading branch information
NicholasCouri committed Apr 11, 2022
1 parent d53ff4a commit 7a281b8
Show file tree
Hide file tree
Showing 3 changed files with 71 additions and 45 deletions.
6 changes: 3 additions & 3 deletions api-report/container-runtime.api.md
Original file line number Diff line number Diff line change
Expand Up @@ -197,11 +197,11 @@ export interface ContainerRuntimeMessage {
export class DeltaScheduler {
constructor(deltaManager: IDeltaManager<ISequencedDocumentMessage, IDocumentMessage>, logger: ITelemetryLogger);
// (undocumented)
batchBegin(): void;
batchBegin(message: ISequencedDocumentMessage): void;
// (undocumented)
batchEnd(): void;
batchEnd(message: ISequencedDocumentMessage): void;
// (undocumented)
static readonly processingTime = 20;
static readonly processingTime = 50;
}

// @public (undocumented)
Expand Down
6 changes: 3 additions & 3 deletions packages/runtime/container-runtime/src/containerRuntime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -636,7 +636,7 @@ export class ScheduleManager {

// This could be the beginning of a new batch or an individual message.
this.emitter.emit("batchBegin", message);
this.deltaScheduler.batchBegin();
this.deltaScheduler.batchBegin(message);

const batch = (message?.metadata as IRuntimeMessageMetadata)?.batch;
if (batch) {
Expand All @@ -657,7 +657,7 @@ export class ScheduleManager {
this.hitError = true;
this.batchClientId = undefined;
this.emitter.emit("batchEnd", error, message);
this.deltaScheduler.batchEnd();
this.deltaScheduler.batchEnd(message);
return;
}

Expand All @@ -667,7 +667,7 @@ export class ScheduleManager {
if (this.batchClientId === undefined || batch === false) {
this.batchClientId = undefined;
this.emitter.emit("batchEnd", undefined, message);
this.deltaScheduler.batchEnd();
this.deltaScheduler.batchEnd(message);
return;
}
}
Expand Down
104 changes: 65 additions & 39 deletions packages/runtime/container-runtime/src/deltaScheduler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@ import {
ISequencedDocumentMessage,
} from "@fluidframework/protocol-definitions";

import {
TelemetryLogger,
} from "@fluidframework/telemetry-utils";
/**
* DeltaScheduler is responsible for the scheduling of inbound delta queue in cases where there
* is more than one op in a particular run of the queue. It does not schedule if there is just one
Expand All @@ -25,28 +28,27 @@ import {
export class DeltaScheduler {
private readonly deltaManager: IDeltaManager<ISequencedDocumentMessage, IDocumentMessage>;
// The time for processing ops in a single turn.
public static readonly processingTime = 20;
public static readonly processingTime = 50;

// The increase in time for processing ops after each turn.
private readonly processingTimeIncrement = 10;

private processingStartTime: number | undefined;
private totalProcessingTime: number = DeltaScheduler.processingTime;

// This keeps track of whether the delta scheduler is scheduling a particular run of the
// inbound delta queue. Basically, every time the delta queue starts processing with
// more than one op, this will be set to true until the run completes.
private isScheduling: boolean = false;
private currentAllowedProcessingTimeForTurn: number = DeltaScheduler.processingTime;

// This keeps track of the number of times inbound queue has been scheduled. After a particular
// count, we log telemetry for the number of ops processed, the time and number of turns it took
// to process the ops.
private schedulingCount: number = 0;

private schedulingLog: {
numberOfOps: number;
opsRemainingToProcess: number;
totalProcessingTime: number;
numberOfTurns: number;
numberOfBatchesProcessed: number;
lastSequenceNumber: number;
firstSequenceNumber: number;
startTime: number;
} | undefined;

constructor(
Expand All @@ -57,50 +59,72 @@ export class DeltaScheduler {
this.deltaManager.inbound.on("idle", () => { this.inboundQueueIdle(); });
}

public batchBegin() {
public batchBegin(message: ISequencedDocumentMessage) {
if (!this.processingStartTime) {
this.processingStartTime = performance.now();
}
if (this.schedulingLog === undefined && this.schedulingCount % 500 === 0) {
// Every 500th time we are scheduling the inbound queue, we log telemetry for the
// number of ops processed, the time and number of turns it took to process the ops.
this.schedulingLog = {
opsRemainingToProcess: 0,
numberOfTurns: 1,
totalProcessingTime: 0,
numberOfBatchesProcessed: 0,
firstSequenceNumber: message.sequenceNumber,
lastSequenceNumber: message.sequenceNumber,
startTime: performance.now(),
};
}
}

public batchEnd() {
if (this.shouldRunScheduler()) {
if (!this.isScheduling) {
this.isScheduling = true;
// Every 2000th time we are scheduling the inbound queue, we log telemetry for the
// number of ops processed, the time and number of turns it took to process the ops.
if (this.schedulingCount % 2000 === 0) {
this.schedulingLog = {
numberOfOps: this.deltaManager.inbound.length,
numberOfTurns: 1,
totalProcessingTime: 0,
};
}
}
public batchEnd(message: ISequencedDocumentMessage) {
if (this.schedulingLog) {
this.schedulingLog.numberOfBatchesProcessed++;
this.schedulingLog.lastSequenceNumber = message.sequenceNumber;
this.schedulingLog.opsRemainingToProcess = this.deltaManager.inbound.length;
}

if (this.shouldRunScheduler()) {
const currentTime = performance.now();
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
const elapsedTime = performance.now() - this.processingStartTime!;
if (elapsedTime > this.totalProcessingTime) {
const elapsedTime = currentTime - this.processingStartTime!;
if (elapsedTime > this.currentAllowedProcessingTimeForTurn) {
// We have processed ops for longer than the processing time allowed for this turn. So, pause the
// queue, yield the thread and schedule a resume.

// eslint-disable-next-line @typescript-eslint/no-floating-promises
this.deltaManager.inbound.pause();
setTimeout(() => {
this.deltaManager.inbound.resume();
});

this.processingStartTime = undefined;
// Increase the total processing time. Keep doing this after each turn until all the ops have
// Increase the total processing time. Keep doing this after each turn until all the ops have
// been processed. This way we keep the responsiveness at the beginning while also making sure
// that all the ops process fairly quickly.
this.totalProcessingTime += this.processingTimeIncrement;
this.currentAllowedProcessingTimeForTurn += this.processingTimeIncrement;

// If we are logging the telemetry this time, update the telemetry log object.
if (this.schedulingLog) {
this.schedulingLog.numberOfTurns++;
this.schedulingLog.totalProcessingTime += elapsedTime;
}

setTimeout(() => {
if (this.schedulingLog) {
this.logger.sendTelemetryEvent({
eventName: "InboundOpsPartialProcessingTime",
duration: TelemetryLogger.formatTick(elapsedTime),
opsProcessed: this.schedulingLog.lastSequenceNumber -
this.schedulingLog.firstSequenceNumber + 1,
opsRemainingToProcess: this.deltaManager.inbound.length,
processingTime: TelemetryLogger.formatTick(this.schedulingLog.totalProcessingTime),
numberOfTurns: this.schedulingLog.numberOfTurns,
batchesProcessed: this.schedulingLog.numberOfBatchesProcessed,
timeToResume: TelemetryLogger.formatTick(performance.now() - currentTime),
});
}
this.deltaManager.inbound.resume();
});

this.processingStartTime = undefined;
}
}
}
Expand All @@ -109,29 +133,31 @@ export class DeltaScheduler {
if (this.schedulingLog) {
// Add the time taken for processing the final ops to the total processing time in the
// telemetry log object.
const currentTime = performance.now();
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
this.schedulingLog.totalProcessingTime += performance.now() - this.processingStartTime!;
this.schedulingLog.totalProcessingTime += currentTime - this.processingStartTime!;

this.logger.sendTelemetryEvent({
eventName: "InboundOpsProcessingTime",
numberOfOps: this.schedulingLog.numberOfOps,
opsRemainingToProcess: this.schedulingLog.opsRemainingToProcess,
numberOfTurns: this.schedulingLog.numberOfTurns,
processingTime: this.schedulingLog.totalProcessingTime,
processingTime: TelemetryLogger.formatTick(this.schedulingLog.totalProcessingTime),
opsProcessed: this.schedulingLog.lastSequenceNumber - this.schedulingLog.firstSequenceNumber + 1,
batchesProcessed: this.schedulingLog.numberOfBatchesProcessed,
duration: TelemetryLogger.formatTick(currentTime - this.schedulingLog.startTime),
schedulingCount: this.schedulingCount,
});

this.schedulingLog = undefined;
}

// If we scheduled this batch of the inbound queue, increment the counter that tracks the
// number of times we have done this.
if (this.isScheduling) {
this.isScheduling = false;
this.schedulingCount++;
}
this.schedulingCount++;

// Reset the processing times.
this.processingStartTime = undefined;
this.totalProcessingTime = DeltaScheduler.processingTime;
this.currentAllowedProcessingTimeForTurn = DeltaScheduler.processingTime;
}

/**
Expand Down

0 comments on commit 7a281b8

Please sign in to comment.