Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Convert attributor to IFluidDataStoreChannel #22120

Merged
merged 12 commits into from
Aug 16, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
pr sugg
  • Loading branch information
Jatin Garg authored and Jatin Garg committed Aug 14, 2024
commit 425734b9e881383aeaa9a70cdcbbdccc0a1280d4
8 changes: 2 additions & 6 deletions packages/framework/attributor/src/mixinAttributor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import {
enableOnNewFileKey,
type IRuntimeAttributor,
} from "./attributorContracts.js";
import type { RuntimeAttributorDataStoreChannel } from "./runtimeAttributorDataStoreChannel.js";
import { RuntimeAttributorFactory } from "./runtimeAttributorDataStoreFactory.js";

/**
Expand All @@ -34,9 +33,7 @@ export async function getRuntimeAttributor(
runtime: IContainerRuntime,
): Promise<IRuntimeAttributor | undefined> {
const entryPoint = await runtime.getAliasedDataStoreEntryPoint(attributorDataStoreAlias);
const runtimeAttributor = (await entryPoint?.get()) as
| RuntimeAttributorDataStoreChannel
| undefined;
const runtimeAttributor = (await entryPoint?.get()) as IRuntimeAttributor | undefined;
return runtimeAttributor;
}

Expand Down Expand Up @@ -112,8 +109,7 @@ export const mixinAttributor = (
const datastore = await runtime.createDataStore(RuntimeAttributorFactory.type);
const result = await datastore.trySetAlias(attributorDataStoreAlias);
assert(result === "Success", "Failed to set alias for attributor data store");
runtimeAttributor =
(await datastore.entryPoint.get()) as RuntimeAttributorDataStoreChannel;
runtimeAttributor = (await datastore.entryPoint.get()) as IRuntimeAttributor;
assert(runtimeAttributor !== undefined, "Attributor should be defined");
}
}
Expand Down
111 changes: 111 additions & 0 deletions packages/framework/attributor/src/runtimeAttributor.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
/*!
* Copyright (c) Microsoft Corporation and contributors. All rights reserved.
* Licensed under the MIT License.
*/

import { bufferToString } from "@fluid-internal/client-utils";
import { IDeltaManager } from "@fluidframework/container-definitions/internal";
import { assert, unreachableCase } from "@fluidframework/core-utils/internal";
import {
IDocumentMessage,
type ISnapshotTree,
ISequencedDocumentMessage,
IQuorumClients,
} from "@fluidframework/driver-definitions/internal";
import {
type AttributionInfo,
type AttributionKey,
type ISummaryTreeWithStats,
} from "@fluidframework/runtime-definitions/internal";
import { SummaryTreeBuilder } from "@fluidframework/runtime-utils/internal";

import { OpStreamAttributor, type IAttributor } from "./attributor.js";
import { opBlobName, type IRuntimeAttributor } from "./attributorContracts.js";
import { AttributorSerializer, chain, deltaEncoder, type Encoder } from "./encoders.js";
import { makeLZ4Encoder } from "./lz4Encoder.js";

export class RuntimeAttributor implements IRuntimeAttributor {
public get IRuntimeAttributor(): IRuntimeAttributor {
return this;
}

public get(key: AttributionKey): AttributionInfo {
assert(
this.opAttributor !== undefined,
0x509 /* RuntimeAttributor must be initialized before getAttributionInfo can be called */,
);

if (key.type === "detached") {
throw new Error("Attribution of detached keys is not yet supported.");
}

if (key.type === "local") {
// Note: we can *almost* orchestrate this correctly with internal-only changes by looking up the current
// client id in the audience. However, for read->write client transition, the container might have not yet
// received a client id. This is left as a TODO as it might be more easily solved once the detached case
// is settled (e.g. if it's reasonable for the host to know the current user information at container
// creation time, we could just use that here as well).
throw new Error("Attribution of local keys is not yet supported.");
}

return this.opAttributor.getAttributionInfo(key.seq);
}

public has(key: AttributionKey): boolean {
if (key.type === "detached") {
return false;
}

if (key.type === "local") {
return false;
}

return this.opAttributor?.tryGetAttributionInfo(key.seq) !== undefined;
}

private encoder: Encoder<IAttributor, string> = {
encode: unreachableCase,
decode: unreachableCase,
};

private opAttributor: IAttributor | undefined;
public isEnabled = false;

public async initialize(
deltaManager: IDeltaManager<ISequencedDocumentMessage, IDocumentMessage>,
quorum: IQuorumClients,
baseSnapshotForAttributorTree: ISnapshotTree | undefined,
readBlob: (id: string) => Promise<ArrayBufferLike>,
): Promise<void> {
this.encoder = chain(
new AttributorSerializer(
(entries) => new OpStreamAttributor(deltaManager, quorum, entries),
deltaEncoder,
),
makeLZ4Encoder(),
);

if (baseSnapshotForAttributorTree === undefined) {
this.opAttributor = new OpStreamAttributor(deltaManager, quorum);
} else {
const id = baseSnapshotForAttributorTree.blobs[opBlobName];
assert(
id !== undefined,
0x50a /* Attributor tree should have op attributor summary blob. */,
);
const blobContents = await readBlob(id);
const attributorSnapshot = bufferToString(blobContents, "utf8");
this.opAttributor = this.encoder.decode(attributorSnapshot);
}
}

public summarizeOpAttributor(): ISummaryTreeWithStats {
assert(
this.opAttributor !== undefined,
"RuntimeAttributor should be initialized before summarization",
);
const builder = new SummaryTreeBuilder();
builder.addBlob(opBlobName, this.encoder.encode(this.opAttributor));
return builder.getSummaryTree();
}
}
104 changes: 18 additions & 86 deletions packages/framework/attributor/src/runtimeAttributorDataStoreChannel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
* Licensed under the MIT License.
*/

import { bufferToString, TypedEventEmitter } from "@fluid-internal/client-utils";
import { TypedEventEmitter } from "@fluid-internal/client-utils";
import { AttachState, IDeltaManager } from "@fluidframework/container-definitions/internal";
import {
FluidObject,
Expand All @@ -29,34 +29,33 @@ import {
VisibilityState,
type ISummaryTreeWithStats,
type ITelemetryContext,
type AttributionInfo,
type AttributionKey,
} from "@fluidframework/runtime-definitions/internal";
import { SummaryTreeBuilder } from "@fluidframework/runtime-utils/internal";
import {
ITelemetryLoggerExt,
MonitoringContext,
raiseConnectedEvent,
createChildMonitoringContext,
} from "@fluidframework/telemetry-utils/internal";

import { OpStreamAttributor, type IAttributor } from "./attributor.js";
import { opBlobName, type IRuntimeAttributor } from "./attributorContracts.js";
import { AttributorSerializer, chain, deltaEncoder, type Encoder } from "./encoders.js";
import { makeLZ4Encoder } from "./lz4Encoder.js";
import {
type IProvideRuntimeAttributor,
type IRuntimeAttributor,
} from "./attributorContracts.js";
import { RuntimeAttributor } from "./runtimeAttributor.js";

/**
* Data store channel for the runtime attributor. This channel is responsible for storing and managing the
*/
export class RuntimeAttributorDataStoreChannel
extends TypedEventEmitter<IFluidDataStoreRuntimeEvents>
implements IFluidDataStoreChannel, IRuntimeAttributor
implements IFluidDataStoreChannel, IProvideRuntimeAttributor
jatgarg marked this conversation as resolved.
Show resolved Hide resolved
{
public constructor(
public readonly dataStoreContext: IFluidDataStoreContext,
existing: boolean,
) {
super();
this.runtimeAttributor = new RuntimeAttributor();
this.mc = createChildMonitoringContext({
logger: dataStoreContext.baseLogger,
namespace: "Attributor",
Expand All @@ -75,7 +74,7 @@ export class RuntimeAttributorDataStoreChannel
this.deferredAttached.resolve();
}
this.entryPoint = new FluidObjectHandle<FluidObject>(
jatgarg marked this conversation as resolved.
Show resolved Hide resolved
this,
this.runtimeAttributor,
"",
dataStoreContext.IFluidHandleContext,
);
Expand All @@ -86,48 +85,9 @@ export class RuntimeAttributorDataStoreChannel
}

public get IRuntimeAttributor(): IRuntimeAttributor {
return this;
}

public get(key: AttributionKey): AttributionInfo {
assert(
this.opAttributor !== undefined,
0x509 /* RuntimeAttributor must be initialized before getAttributionInfo can be called */,
);

if (key.type === "detached") {
throw new Error("Attribution of detached keys is not yet supported.");
}

if (key.type === "local") {
// Note: we can *almost* orchestrate this correctly with internal-only changes by looking up the current
// client id in the audience. However, for read->write client transition, the container might have not yet
// received a client id. This is left as a TODO as it might be more easily solved once the detached case
// is settled (e.g. if it's reasonable for the host to know the current user information at container
// creation time, we could just use that here as well).
throw new Error("Attribution of local keys is not yet supported.");
}

return this.opAttributor.getAttributionInfo(key.seq);
return this.runtimeAttributor;
}

public has(key: AttributionKey): boolean {
if (key.type === "detached") {
return false;
}

if (key.type === "local") {
return false;
}

return this.opAttributor?.tryGetAttributionInfo(key.seq) !== undefined;
}

private encoder: Encoder<IAttributor, string> = {
encode: unreachableCase,
decode: unreachableCase,
};

private _disposed = false;
public get disposed(): boolean {
return this._disposed;
Expand All @@ -137,7 +97,7 @@ export class RuntimeAttributorDataStoreChannel
this._disposed = true;
}

private opAttributor: IAttributor | undefined;
private readonly runtimeAttributor: RuntimeAttributor;
public isEnabled = true;
public _attachState: AttachState;
jatgarg marked this conversation as resolved.
Show resolved Hide resolved
public visibilityState: VisibilityState;
Expand All @@ -153,26 +113,12 @@ export class RuntimeAttributorDataStoreChannel
baseSnapshotForAttributorTree: ISnapshotTree | undefined,
readBlob: (id: string) => Promise<ArrayBufferLike>,
): Promise<void> {
this.encoder = chain(
new AttributorSerializer(
(entries) => new OpStreamAttributor(deltaManager, quorum, entries),
deltaEncoder,
),
makeLZ4Encoder(),
await this.runtimeAttributor.initialize(
deltaManager,
quorum,
baseSnapshotForAttributorTree,
readBlob,
);

if (baseSnapshotForAttributorTree === undefined) {
this.opAttributor = new OpStreamAttributor(deltaManager, quorum);
} else {
const id = baseSnapshotForAttributorTree.blobs[opBlobName];
assert(
id !== undefined,
0x50a /* Attributor tree should have op attributor summary blob. */,
);
const blobContents = await readBlob(id);
const attributorSnapshot = bufferToString(blobContents, "utf8");
this.opAttributor = this.encoder.decode(attributorSnapshot);
}
}

public attachGraph(): void {
Expand All @@ -199,13 +145,7 @@ export class RuntimeAttributorDataStoreChannel
* {@inheritdoc IFluidDataStoreChannel.getAttachSummary}
*/
public getAttachSummary(telemetryContext?: ITelemetryContext): ISummaryTreeWithStats {
assert(
this.opAttributor !== undefined,
0x50b /* RuntimeAttributor should be initialized before summarization */,
);
const builder = new SummaryTreeBuilder();
builder.addBlob(opBlobName, this.encoder.encode(this.opAttributor));
return builder.getSummaryTree();
return this.runtimeAttributor.summarizeOpAttributor();
}

/**
Expand Down Expand Up @@ -241,13 +181,7 @@ export class RuntimeAttributorDataStoreChannel
trackState?: boolean,
telemetryContext?: ITelemetryContext,
): Promise<ISummaryTreeWithStats> {
assert(
this.opAttributor !== undefined,
"RuntimeAttributor should be initialized before summarization",
);
const builder = new SummaryTreeBuilder();
builder.addBlob(opBlobName, this.encoder.encode(this.opAttributor));
return builder.getSummaryTree();
return this.runtimeAttributor.summarizeOpAttributor();
}

/**
Expand Down Expand Up @@ -320,8 +254,6 @@ export class RuntimeAttributorDataStoreChannel
public setAttachState(attachState: AttachState.Attaching | AttachState.Attached): void {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wish you didn't have to write this code here... I know this area has been bug-prone before, and I'm not sure anyone will know to come fix any issues here if they are discovered in the similar code in the main datastore runtime implementation.

No suggestion here, just wanting to point it out.

switch (attachState) {
case AttachState.Attaching: {
// this.attachGraph();

this._attachState = AttachState.Attaching;

assert(
Expand Down
Loading