Skip to content

Commit

Permalink
abstract away document collection (microsoft#14498)
Browse files Browse the repository at this point in the history
## Description

Abstract away `document` collection usage by replace with
`documentRepository`, which would used for customization override

## Breaking Changes


## Reviewer Guidance

Please have a double eye on the refactored code to make sure there is no
wrong changes, thank you for the helps!

---------

Co-authored-by: Xin Zhang <zhangxin@microsoft.com>
  • Loading branch information
zhangxin511 and Xin Zhang committed Mar 23, 2023
1 parent 8fbf79e commit f0be703
Show file tree
Hide file tree
Showing 51 changed files with 485 additions and 219 deletions.
2 changes: 2 additions & 0 deletions server/routerlicious/packages/lambdas/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -87,13 +87,15 @@
"@types/mocha": "^10.0.1",
"@types/nconf": "^0.10.2",
"@types/node": "^16.18.16",
"@types/sinon": "^9.0.9",
"@types/uuid": "^8.3.0",
"concurrently": "^7.6.0",
"eslint": "~8.27.0",
"mocha": "^10.2.0",
"nyc": "^15.1.0",
"prettier": "~2.6.2",
"rimraf": "^4.4.0",
"sinon": "^9.2.3",
"source-map-loader": "^0.2.4",
"ts-loader": "^9.3.0",
"typescript": "~4.5.5",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,8 @@
*/

import {
ICollection,
IDeliState,
IDocument,
IDocumentRepository,
IQueuedMessage,
} from "@fluidframework/server-services-core";
import { CheckpointReason } from "../utils";
Expand Down Expand Up @@ -46,31 +45,23 @@ export interface ICheckpointParams {
export function createDeliCheckpointManagerFromCollection(
tenantId: string,
documentId: string,
collection: ICollection<IDocument>,
documentRepository: IDocumentRepository,
): IDeliCheckpointManager {
const checkpointManager = {
writeCheckpoint: async (checkpoint: IDeliState) => {
return collection.update(
{
documentId,
tenantId,
},
await documentRepository.updateOne(
{ tenantId, documentId },
{
deli: JSON.stringify(checkpoint),
},
null,
);
},
deleteCheckpoint: async () => {
return collection.update(
{
documentId,
tenantId,
},
await documentRepository.updateOne(
{ tenantId, documentId },
{
deli: "",
},
null,
);
},
};
Expand Down
19 changes: 9 additions & 10 deletions server/routerlicious/packages/lambdas/src/deli/lambdaFactory.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@ import { toUtf8 } from "@fluidframework/common-utils";
import { ICreateCommitParams, ICreateTreeEntry } from "@fluidframework/gitresources";
import {
IClientManager,
ICollection,
IContext,
IDeliState,
IDocument,
IDocumentRepository,
ILogger,
IPartitionLambda,
IPartitionLambdaConfig,
Expand Down Expand Up @@ -60,7 +60,7 @@ const getDefaultCheckpooint = (epoch: number): IDeliState => {
export class DeliLambdaFactory extends EventEmitter implements IPartitionLambdaFactory {
constructor(
private readonly operationsDbMongoManager: MongoManager,
private readonly collection: ICollection<IDocument>,
private readonly documentRepository: IDocumentRepository,
private readonly tenantManager: ITenantManager,
private readonly clientManager: IClientManager | undefined,
private readonly forwardProducer: IProducer,
Expand Down Expand Up @@ -100,7 +100,7 @@ export class DeliLambdaFactory extends EventEmitter implements IPartitionLambdaF
try {
// Lookup the last sequence number stored
// TODO - is this storage specific to the orderer in place? Or can I generalize the output context?
document = await this.collection.findOne({ documentId, tenantId });
document = await this.documentRepository.readOne({ documentId, tenantId });

// Check if the document was deleted prior.
if (!isDocumentValid(document)) {
Expand Down Expand Up @@ -213,7 +213,7 @@ export class DeliLambdaFactory extends EventEmitter implements IPartitionLambdaF
const checkpointManager = createDeliCheckpointManagerFromCollection(
tenantId,
documentId,
this.collection,
this.documentRepository,
);

// Should the lambda reaize that term has flipped to send a no-op message at the beginning?
Expand All @@ -239,13 +239,13 @@ export class DeliLambdaFactory extends EventEmitter implements IPartitionLambdaF
closeType === LambdaCloseType.ActivityTimeout ||
closeType === LambdaCloseType.Error
) {
const query = { documentId, tenantId, session: { $exists: true } };
const filter = { documentId, tenantId, session: { $exists: true } };
const data = {
"session.isSessionAlive": false,
"session.isSessionActive": false,
"lastAccessTime": Date.now(),
};
await this.collection.update(query, data, null);
await this.documentRepository.updateOne(filter, data, undefined);
const message = `Marked session alive and active as false for closeType:
${JSON.stringify(closeType)}`;
context.log?.info(message, { messageMetaData });
Expand All @@ -264,14 +264,13 @@ export class DeliLambdaFactory extends EventEmitter implements IPartitionLambdaF
context.log?.info(`Deli Lambda is marking session as alive and active as true.`, {
messageMetaData,
});
this.collection
.update(
{ documentId, tenantId },
this.documentRepository
.updateOne(
{ tenantId, documentId },
{
"session.isSessionAlive": true,
"session.isSessionActive": true,
},
null,
)
.catch((error) => {
const errMsg = "Deli Lambda failed to mark session as active.";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,12 @@ import { delay } from "@fluidframework/common-utils";
import {
ICollection,
IContext,
IDocument,
isRetryEnabled,
IScribe,
ISequencedOperationMessage,
runWithRetry,
IDeltaService,
IDocumentRepository,
} from "@fluidframework/server-services-core";
import { getLumberBaseProperties, Lumberjack } from "@fluidframework/server-services-telemetry";
import { ICheckpointManager } from "./interfaces";
Expand All @@ -26,7 +26,7 @@ export class CheckpointManager implements ICheckpointManager {
protected readonly context: IContext,
private readonly tenantId: string,
private readonly documentId: string,
private readonly documentCollection: ICollection<IDocument>,
private readonly documentRepository: IDocumentRepository,
private readonly opCollection: ICollection<ISequencedOperationMessage>,
private readonly deltaService: IDeltaService,
private readonly getDeltasViaAlfred: boolean,
Expand Down Expand Up @@ -137,7 +137,7 @@ export class CheckpointManager implements ICheckpointManager {
}

private async writeScribeCheckpointState(checkpoint: IScribe) {
await this.documentCollection.update(
await this.documentRepository.updateOne(
{
documentId: this.documentId,
tenantId: this.tenantId,
Expand All @@ -156,7 +156,7 @@ export class CheckpointManager implements ICheckpointManager {
*/
public async delete(sequenceNumber: number, lte: boolean) {
// Clears the checkpoint information from mongodb.
await this.documentCollection.update(
await this.documentRepository.updateOne(
{
documentId: this.documentId,
tenantId: this.tenantId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import {
IControlMessage,
IDeltaService,
IDocument,
IDocumentRepository,
ILambdaStartControlMessageContents,
IPartitionLambda,
IPartitionLambdaConfig,
Expand Down Expand Up @@ -63,7 +64,7 @@ const DefaultScribe: IScribe = {
export class ScribeLambdaFactory extends EventEmitter implements IPartitionLambdaFactory {
constructor(
private readonly mongoManager: MongoManager,
private readonly documentCollection: ICollection<IDocument>,
private readonly documentRepository: IDocumentRepository,
private readonly messageCollection: ICollection<ISequencedOperationMessage>,
private readonly producer: IProducer,
private readonly deltaManager: IDeltaService,
Expand Down Expand Up @@ -100,7 +101,7 @@ export class ScribeLambdaFactory extends EventEmitter implements IPartitionLambd
);

try {
document = await this.documentCollection.findOne({ documentId, tenantId });
document = await this.documentRepository.readOne({ documentId, tenantId });

if (!isDocumentValid(document)) {
// Document sessions can be joined (via Alfred) after a document is functionally deleted.
Expand Down Expand Up @@ -251,7 +252,7 @@ export class ScribeLambdaFactory extends EventEmitter implements IPartitionLambd
context,
tenantId,
documentId,
this.documentCollection,
this.documentRepository,
this.messageCollection,
this.deltaManager,
this.getDeltasViaAlfred,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,15 @@ import {
ICheckpointParams,
} from "../../deli/checkpointManager";
import { CheckpointReason } from "../../utils";
import Sinon from "sinon";

describe("Routerlicious", () => {
describe("Deli", () => {
describe("CheckpointContext", () => {
const testId = "test";
const testTenant = "test";
let testCheckpointContext: CheckpointContext;
let testCollection: testUtils.TestCollection;
let testDocumentRepository: testUtils.TestNotImplementedDocumentRepository;
let testContext: testUtils.TestContext;

function createCheckpoint(
Expand Down Expand Up @@ -55,14 +56,12 @@ describe("Routerlicious", () => {

beforeEach(() => {
testContext = new testUtils.TestContext();
testCollection = new testUtils.TestCollection([
{ documentId: testId, tenantId: testTenant },
]);

testDocumentRepository = new testUtils.TestNotImplementedDocumentRepository();
Sinon.replace(testDocumentRepository, "updateOne", Sinon.fake());
const checkpointManager = createDeliCheckpointManagerFromCollection(
testTenant,
testId,
testCollection,
testDocumentRepository,
);
testCheckpointContext = new CheckpointContext(
testTenant,
Expand Down
19 changes: 12 additions & 7 deletions server/routerlicious/packages/lambdas/src/test/deli/lambda.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import { MessageType } from "@fluidframework/protocol-definitions";
import { defaultHash, getNextHash } from "@fluidframework/server-services-client";
import {
DefaultServiceConfiguration,
ICollection,
IPartitionLambda,
IProducer,
ISequencedOperationMessage,
Expand All @@ -22,11 +21,13 @@ import {
MessageFactory,
TestContext,
TestDbFactory,
TestNotImplementedDocumentRepository,
TestKafka,
TestTenantManager,
} from "@fluidframework/server-test-utils";
import { strict as assert } from "assert";
import * as _ from "lodash";
import Sinon from "sinon";
import { DeliLambdaFactory } from "../../deli/lambdaFactory";

const MinSequenceNumberWindow = 2000;
Expand All @@ -46,7 +47,6 @@ describe("Routerlicious", () => {
},
];

let testCollection: ICollection<any>;
let testTenantManager: TestTenantManager;
let testKafka: TestKafka;
let testForwardProducer: IProducer;
Expand Down Expand Up @@ -118,8 +118,13 @@ describe("Routerlicious", () => {
beforeEach(async () => {
const dbFactory = new TestDbFactory(_.cloneDeep({ documents: testData }));
const mongoManager = new MongoManager(dbFactory);
const database = await mongoManager.getDatabase();
testCollection = database.collection("documents");
const documentRepository = new TestNotImplementedDocumentRepository();
Sinon.replace(
documentRepository,
"readOne",
Sinon.fake.resolves(_.cloneDeep(testData[0])),
);
Sinon.replace(documentRepository, "updateOne", Sinon.fake.resolves(undefined));

testKafka = new TestKafka();
testForwardProducer = testKafka.createProducer();
Expand All @@ -133,7 +138,7 @@ describe("Routerlicious", () => {

factory = new DeliLambdaFactory(
mongoManager,
testCollection,
documentRepository,
testTenantManager,
undefined,
testForwardProducer,
Expand All @@ -151,7 +156,7 @@ describe("Routerlicious", () => {

factoryWithSignals = new DeliLambdaFactory(
mongoManager,
testCollection,
documentRepository,
testTenantManager,
undefined,
testForwardProducer,
Expand All @@ -172,7 +177,7 @@ describe("Routerlicious", () => {

factoryWithBatching = new DeliLambdaFactory(
mongoManager,
testCollection,
documentRepository,
testTenantManager,
undefined,
testForwardProducer,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@ import { ICreateTreeEntry, ICreateTreeParams, ITree } from "@fluidframework/gitr
import { GitManager } from "@fluidframework/server-services-client";
import {
DefaultServiceConfiguration,
ICollection,
IDocument,
IProducer,
ITenantManager,
MongoManager,
Expand All @@ -21,11 +19,13 @@ import {
TestContext,
TestDbFactory,
TestDeltaManager,
TestNotImplementedDocumentRepository,
TestKafka,
TestTenantManager,
} from "@fluidframework/server-test-utils";
import { strict as assert } from "assert";
import _ from "lodash";
import Sinon from "sinon";
import { ScribeLambda } from "../../scribe/lambda";
import { ScribeLambdaFactory } from "../../scribe/lambdaFactory";

Expand All @@ -37,7 +37,7 @@ describe("Routerlicious", () => {
const testDocumentId = "test";

let testMongoManager: MongoManager;
let testDocumentCollection: ICollection<IDocument>;
let testDocumentRepository: TestNotImplementedDocumentRepository;
let testMessageCollection: TestCollection;
let testProducer: IProducer;
let testContext: TestContext;
Expand Down Expand Up @@ -83,8 +83,13 @@ describe("Routerlicious", () => {
];
const dbFactory = new TestDbFactory(_.cloneDeep({ documents: testData }));
testMongoManager = new MongoManager(dbFactory);
const database = await testMongoManager.getDatabase();
testDocumentCollection = database.collection("documents");
testDocumentRepository = new TestNotImplementedDocumentRepository();
Sinon.replace(
testDocumentRepository,
"readOne",
Sinon.fake.resolves(_.cloneDeep(testData[0])),
);
Sinon.replace(testDocumentRepository, "updateOne", Sinon.fake.resolves(undefined));
testMessageCollection = new TestCollection([]);
testKafka = new TestKafka();
testProducer = testKafka.createProducer();
Expand All @@ -103,7 +108,7 @@ describe("Routerlicious", () => {

let factory = new ScribeLambdaFactory(
testMongoManager,
testDocumentCollection,
testDocumentRepository,
testMessageCollection,
testProducer,
testDeltaManager,
Expand Down
2 changes: 2 additions & 0 deletions server/routerlicious/packages/local-server/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@
"@types/mocha": "^10.0.1",
"@types/nock": "^9.3.0",
"@types/node": "^16.18.16",
"@types/sinon": "^9.0.9",
"concurrently": "^7.6.0",
"copyfiles": "^2.4.1",
"eslint": "~8.27.0",
Expand All @@ -86,6 +87,7 @@
"nyc": "^15.1.0",
"prettier": "~2.6.2",
"rimraf": "^4.4.0",
"sinon": "^9.2.3",
"ts-loader": "^9.3.0",
"typescript": "~4.5.5",
"webpack": "^5.76.2",
Expand Down
Loading

0 comments on commit f0be703

Please sign in to comment.