diff --git a/api-report/telemetry-utils.api.md b/api-report/telemetry-utils.api.md index cb694cf4851f..ecca97695bfe 100644 --- a/api-report/telemetry-utils.api.md +++ b/api-report/telemetry-utils.api.md @@ -247,8 +247,8 @@ export class TelemetryUTLogger implements ITelemetryLogger { // @public export class ThresholdCounter { constructor(threshold: number, logger: ITelemetryLogger, thresholdMultiple?: number); - send(eventName: string, value: number): void; - sendIfMultiple(eventName: string, value: number): void; + send(eventName: string, value: number, metadata?: any): boolean; + sendIfMultiple(eventName: string, value: number, metadata?: any): boolean; } // @public diff --git a/packages/drivers/driver-base/.eslintrc.js b/packages/drivers/driver-base/.eslintrc.js index 056233f255da..0a69573e8709 100644 --- a/packages/drivers/driver-base/.eslintrc.js +++ b/packages/drivers/driver-base/.eslintrc.js @@ -7,6 +7,9 @@ module.exports = { "extends": [ "@fluidframework/eslint-config-fluid/eslint7" ], + "parserOptions": { + "project": ["./tsconfig.json", "./src/test/tsconfig.json"] + }, "rules": { "@typescript-eslint/strict-boolean-expressions": "off" } diff --git a/packages/drivers/driver-base/package.json b/packages/drivers/driver-base/package.json index f446e96b4adc..7a31ed05cf62 100644 --- a/packages/drivers/driver-base/package.json +++ b/packages/drivers/driver-base/package.json @@ -12,18 +12,24 @@ "types": "dist/index.d.ts", "scripts": { "build": "npm run build:genver && concurrently npm:build:compile npm:lint && npm run build:docs", - "build:compile": "concurrently npm:tsc npm:build:esnext", + "build:commonjs": "npm run tsc && npm run build:test", + "build:compile": "concurrently npm:build:commonjs npm:build:esnext", "build:docs": "api-extractor run --local --typescript-compiler-folder ../../../node_modules/typescript && copyfiles -u 1 ./_api-extractor-temp/doc-models/* ../../../_api-extractor-temp/", "build:esnext": "tsc --project ./tsconfig.esnext.json", "build:full": "npm run build", "build:full:compile": "npm run build:compile", "build:genver": "gen-version", + "build:test": "tsc --project ./src/test/tsconfig.json", "ci:build:docs": "api-extractor run --typescript-compiler-folder ../../../node_modules/typescript && copyfiles -u 1 ./_api-extractor-temp/* ../../../_api-extractor-temp/", "clean": "rimraf dist lib *.tsbuildinfo *.build.log", "eslint": "eslint --format stylish src", "eslint:fix": "eslint --format stylish src --fix", "lint": "npm run eslint", "lint:fix": "npm run eslint:fix", + "test": "npm run test:mocha", + "test:coverage": "nyc npm test -- --reporter xunit --reporter-option output=nyc/junit-report.xml", + "test:mocha": "mocha --recursive dist/test -r node_modules/@fluidframework/mocha-test-setup --unhandled-rejections=strict", + "test:mocha:verbose": "cross-env FLUID_TEST_VERBOSE=1 npm run test:mocha", "tsc": "tsc", "tsfmt": "tsfmt --verify", "tsfmt:fix": "tsfmt --replace" @@ -39,7 +45,9 @@ "devDependencies": { "@fluidframework/build-common": "^0.23.0", "@fluidframework/eslint-config-fluid": "^0.24.0", + "@fluidframework/mocha-test-setup": "^0.54.0", "@microsoft/api-extractor": "^7.16.1", + "@types/mocha": "^8.2.2", "@types/node": "^12.19.0", "@types/socket.io-client": "^1.4.32", "@typescript-eslint/eslint-plugin": "~4.14.0", @@ -53,6 +61,8 @@ "eslint-plugin-prefer-arrow": "~1.2.2", "eslint-plugin-react": "~7.22.0", "eslint-plugin-unicorn": "~26.0.1", + "mocha": "^8.4.0", + "nyc": "^15.0.0", "rimraf": "^2.6.2", "typescript": "~4.1.3", "typescript-formatter": "7.1.0" diff --git a/packages/drivers/driver-base/src/documentDeltaConnection.ts b/packages/drivers/driver-base/src/documentDeltaConnection.ts index ce9167e191ab..eaabe3923ff4 100644 --- a/packages/drivers/driver-base/src/documentDeltaConnection.ts +++ b/packages/drivers/driver-base/src/documentDeltaConnection.ts @@ -9,7 +9,7 @@ import { IDocumentDeltaConnectionEvents, DriverError, } from "@fluidframework/driver-definitions"; -import { createGenericNetworkError } from "@fluidframework/driver-utils"; +import { createGenericNetworkError, createWriteError } from "@fluidframework/driver-utils"; import { ConnectionMode, IClientConfiguration, @@ -24,9 +24,12 @@ import { } from "@fluidframework/protocol-definitions"; import { IDisposable, ITelemetryLogger } from "@fluidframework/common-definitions"; import { ChildLogger } from "@fluidframework/telemetry-utils"; +import { MessageSizeValidator } from "./messageSizeValidator"; // Local storage key to disable the BatchManager const batchManagerDisabledKey = "FluidDisableBatchManager"; +// Local storage key to enable message size tracking +const limitPayloadSizeKey = "FluidEnablePayloadSizeLimit"; // See #8129. // Need to move to common-utils (tracked as #8165) @@ -105,6 +108,10 @@ export class DocumentDeltaConnection protected _disposed: boolean = false; protected readonly logger: ITelemetryLogger; protected readonly isBatchManagerDisabled: boolean = false; + private messageSizeValidator: MessageSizeValidator | undefined; + // Close to 1MB with a small buffer to allow for redundant string escaping during transfer + private readonly maxPayloadSizeInBytes = 900000; + private readonly shouldLimitPayloadSize: boolean = false; public get details(): IConnected { if (!this._details) { @@ -157,7 +164,8 @@ export class DocumentDeltaConnection } }); - this.isBatchManagerDisabled = DocumentDeltaConnection.disabledBatchManagerFeatureGate; + this.isBatchManagerDisabled = DocumentDeltaConnection.booleanFeature(batchManagerDisabledKey); + this.shouldLimitPayloadSize = DocumentDeltaConnection.booleanFeature(limitPayloadSizeKey); } /** @@ -280,6 +288,11 @@ export class DocumentDeltaConnection } protected emitMessages(type: string, messages: IDocumentMessage[][]) { + if (this.messageSizeValidator && !this.messageSizeValidator.isPayloadValid(messages)) { + this.emit("error", createWriteError("Payload too large")); + return; + } + // Although the implementation here disconnects the socket and does not reuse it, other subclasses // (e.g. OdspDocumentDeltaConnection) may reuse the socket. In these cases, we need to avoid emitting // on the still-live socket. @@ -288,11 +301,11 @@ export class DocumentDeltaConnection } } - private static get disabledBatchManagerFeatureGate() { + private static booleanFeature(key: string): boolean { try { return localStorage !== undefined && typeof localStorage === "object" - && localStorage.getItem(batchManagerDisabledKey) === "1"; + && localStorage.getItem(key) === "1"; } catch (e) { } return false; } @@ -470,6 +483,12 @@ export class DocumentDeltaConnection }, timeout + 2000); }); + if (this.shouldLimitPayloadSize) { + this.messageSizeValidator = new MessageSizeValidator( + this.maxMessageSize + 1, + this.maxPayloadSizeInBytes, + ChildLogger.create(this.logger, "MessageSizeValidator")); + } assert(!this.disposed, 0x246 /* "checking consistency of socket & _disposed flags" */); } diff --git a/packages/drivers/driver-base/src/messageSizeValidator.ts b/packages/drivers/driver-base/src/messageSizeValidator.ts new file mode 100644 index 000000000000..4f23ecdcb079 --- /dev/null +++ b/packages/drivers/driver-base/src/messageSizeValidator.ts @@ -0,0 +1,75 @@ +/*! + * Copyright (c) Microsoft Corporation and contributors. All rights reserved. + * Licensed under the MIT License. + */ + +import { ITelemetryLogger } from "@fluidframework/common-definitions"; +import { IDocumentMessage } from "@fluidframework/protocol-definitions"; +import { ThresholdCounter } from "@fluidframework/telemetry-utils"; + +export class MessageSizeValidator { + private readonly payloadSizeCountersWithEvents = [ + // The order here matters, in order to save telemetry quota. + // The first counter to exceed its limit will short-circuit + // event publishing. + { + counter: new ThresholdCounter(this.maxPayloadSizeInBytes, this.logger), + eventName: "OpsPayloadSizeLimitExceeded", + }, + { + counter: new ThresholdCounter(this.maxPayloadSizeInBytes / 2, this.logger), + eventName: "OpsPayloadSize50PcOfMax", + }, + { + counter: new ThresholdCounter(this.maxPayloadSizeInBytes / 4, this.logger), + eventName: "OpsPayloadSize25PcOfMax", + }, + ]; + + private readonly messageSizeCounter = new ThresholdCounter(this.maxMessageSizeInBytes, this.logger); + private readonly messageSizeEvent = "OpSizeLimitExceeded"; + + constructor( + private readonly maxMessageSizeInBytes: number, + private readonly maxPayloadSizeInBytes: number, + private readonly logger: ITelemetryLogger, + ) { + } + + private trackPayload(payloadSizeInBytes: number) { + for (const x of this.payloadSizeCountersWithEvents) { + if (x.counter.send(x.eventName, payloadSizeInBytes, { max: this.maxPayloadSizeInBytes })) { + break; + } + } + } + + private trackMessage(messageSizeInBytes: number) { + this.messageSizeCounter.send(this.messageSizeEvent, messageSizeInBytes, { max: this.maxMessageSizeInBytes }); + } + + public isPayloadValid(messages: IDocumentMessage[][]): boolean { + let payloadSizeInBytes = 0; + let allMessagesUnderLimit = true; + + for (const inner of messages) { + for (const message of inner) { + const messageSize = MessageSizeValidator.sizeInBytes(message); + allMessagesUnderLimit &&= messageSize < this.maxMessageSizeInBytes; + payloadSizeInBytes = payloadSizeInBytes + messageSize; + this.trackMessage(messageSize); + } + } + + this.trackPayload(payloadSizeInBytes); + return allMessagesUnderLimit && payloadSizeInBytes < this.maxPayloadSizeInBytes; + } + + public static sizeInBytes(message: IDocumentMessage): number { + const { contents, ...restOfObject } = message; + // `contents` is already stringified. Re-stringifying the whole message will + // lead to additional escape characters which will increase the size artificially. + return new TextEncoder().encode(message.contents).length + + new TextEncoder().encode(JSON.stringify(restOfObject)).length; + } +} diff --git a/packages/drivers/driver-base/src/test/messageSizeValidator.spec.ts b/packages/drivers/driver-base/src/test/messageSizeValidator.spec.ts new file mode 100644 index 000000000000..3e9cb415feb0 --- /dev/null +++ b/packages/drivers/driver-base/src/test/messageSizeValidator.spec.ts @@ -0,0 +1,102 @@ +/*! + * Copyright (c) Microsoft Corporation and contributors. All rights reserved. + * Licensed under the MIT License. + */ + +import { strict as assert } from "assert"; +import { IDocumentMessage } from "@fluidframework/protocol-definitions"; +import { MockLogger } from "@fluidframework/telemetry-utils"; +import { MessageSizeValidator } from "../messageSizeValidator"; + +const generateStringOfSize = (sizeInBytes: number): string => new Array(sizeInBytes + 1).join("0"); +const generateMessageOfSize = (sizeInBytes: number): IDocumentMessage => { + const envelope = { + clientSequenceNumber: 1, + metadata: { + meta: "data", + other: "data", + }, + referenceSequenceNumber: 0, + type: "test", + }; + + const message = (envelope as IDocumentMessage); + message.contents = generateStringOfSize(sizeInBytes - new TextEncoder().encode(JSON.stringify(message)).length); + return message; +}; + +describe("Message size validation", () => { + let logger: MockLogger; + let validator: MessageSizeValidator; + const maxMessageSizeInBytes = 10 * 1000; + const maxPayloadSizeInBytes = 100 * maxMessageSizeInBytes; + + beforeEach(() => { + logger = new MockLogger(); + validator = new MessageSizeValidator(maxMessageSizeInBytes, maxPayloadSizeInBytes, logger); + }); + + it("Should fail when message size is max", () => { + assert(!validator.isPayloadValid([[generateMessageOfSize(maxMessageSizeInBytes)]])); + assert(logger.matchEvents([{ + eventName: "OpSizeLimitExceeded", + category: "performance", + value: maxMessageSizeInBytes, + max: maxMessageSizeInBytes, + }])); + }); + + it("Should succeed when payload is lower than 25% of max", () => { + assert(validator.isPayloadValid([[generateMessageOfSize(maxMessageSizeInBytes - 1)]])); + assert(logger.matchEvents([])); + }); + + it("Should succeed when payload is lower than 50% of max", async () => { + const size = 26 * (maxMessageSizeInBytes - 1); + assert(validator.isPayloadValid([Array(26).fill(generateMessageOfSize(maxMessageSizeInBytes - 1))])); + assert(logger.matchEvents([{ + eventName: "OpsPayloadSize25PcOfMax", + category: "performance", + value: size, + max: maxPayloadSizeInBytes, + }])); + }); + + it("Should succeed when payload is between 50% and 100% of max", () => { + const size = 51 * (maxMessageSizeInBytes - 1); + assert(validator.isPayloadValid([Array(51).fill(generateMessageOfSize(maxMessageSizeInBytes - 1))])); + assert(logger.matchEvents([{ + eventName: "OpsPayloadSize50PcOfMax", + category: "performance", + value: size, + max: maxPayloadSizeInBytes, + }])); + }); + + it("Should fail when payload size is higher than max", () => { + const size = 101 * (maxMessageSizeInBytes - 1); + assert(!validator.isPayloadValid([Array(101).fill(generateMessageOfSize(maxMessageSizeInBytes - 1))])); + assert(logger.matchEvents([{ + eventName: "OpsPayloadSizeLimitExceeded", + category: "performance", + value: size, + max: maxPayloadSizeInBytes, + }])); + }); + + it("Should fail when payload size is higher than max and message is larger than max", () => { + const size = 100 * (maxMessageSizeInBytes); + assert(!validator.isPayloadValid([Array(100).fill(generateMessageOfSize(maxMessageSizeInBytes))])); + assert(logger.matchEvents([{ + eventName: "OpSizeLimitExceeded", + category: "performance", + value: maxMessageSizeInBytes, + max: maxMessageSizeInBytes, + }, { + eventName: "OpsPayloadSizeLimitExceeded", + category: "performance", + value: size, + max: maxPayloadSizeInBytes, + }])); + }); +}); diff --git a/packages/drivers/driver-base/src/test/tsconfig.json b/packages/drivers/driver-base/src/test/tsconfig.json new file mode 100644 index 000000000000..dc68725a1b07 --- /dev/null +++ b/packages/drivers/driver-base/src/test/tsconfig.json @@ -0,0 +1,21 @@ +{ + "extends": "@fluidframework/build-common/ts-common-config.json", + "compilerOptions": { + "rootDir": "./", + "outDir": "../../dist/test", + "types": [ + "node", + "mocha" + ], + "declaration": false, + "declarationMap": false + }, + "include": [ + "./**/*" + ], + "references": [ + { + "path": "../.." + } + ] +} \ No newline at end of file diff --git a/packages/drivers/driver-base/tsconfig.json b/packages/drivers/driver-base/tsconfig.json index f579caf4a3a6..6b88d6acb741 100644 --- a/packages/drivers/driver-base/tsconfig.json +++ b/packages/drivers/driver-base/tsconfig.json @@ -2,16 +2,18 @@ "extends": "@fluidframework/build-common/ts-common-config.json", "exclude": [ "dist", - "node_modules" + "node_modules", + "src/test/**/*" ], "compilerOptions": { "rootDir": "./src", "outDir": "./dist", "types": [ "@types/socket.io-client" - ] + ], + "composite": true }, "include": [ "src/**/*" ] -} \ No newline at end of file +} diff --git a/packages/drivers/local-driver/src/test/tsconfig.json b/packages/drivers/local-driver/src/test/tsconfig.json index dc68725a1b07..bf93a7a5c0e9 100644 --- a/packages/drivers/local-driver/src/test/tsconfig.json +++ b/packages/drivers/local-driver/src/test/tsconfig.json @@ -18,4 +18,4 @@ "path": "../.." } ] -} \ No newline at end of file +} diff --git a/packages/loader/container-loader/src/connectionManager.ts b/packages/loader/container-loader/src/connectionManager.ts index 3700cc2573c8..b43324d69048 100644 --- a/packages/loader/container-loader/src/connectionManager.ts +++ b/packages/loader/container-loader/src/connectionManager.ts @@ -78,7 +78,11 @@ function getNackReconnectInfo(nackContent: INackContent) { const createReconnectError = (fluidErrorCode: string, err: any) => wrapError( err, - (errorMessage: string) => new GenericNetworkError(fluidErrorCode, errorMessage, true /* canRetry */), + (errorMessage: string) => new GenericNetworkError( + fluidErrorCode, + errorMessage, + err?.canRetry === true || err?.canRetry === undefined, // unless explicitly specified, this will retry + ), ); /** diff --git a/packages/test/test-end-to-end-tests/src/test/container.spec.ts b/packages/test/test-end-to-end-tests/src/test/container.spec.ts index b3d5266c698f..7fcbc1142996 100644 --- a/packages/test/test-end-to-end-tests/src/test/container.spec.ts +++ b/packages/test/test-end-to-end-tests/src/test/container.spec.ts @@ -19,6 +19,7 @@ import { waitContainerToCatchUp, } from "@fluidframework/container-loader"; import { + IDocumentDeltaConnection, IDocumentServiceFactory, IFluidResolvedUrl, } from "@fluidframework/driver-definitions"; @@ -39,6 +40,7 @@ import { TestDataObjectType, describeNoCompat, } from "@fluidframework/test-version-utils"; +import { IClient } from "@fluidframework/protocol-definitions"; const id = "fluid-test://localhost/containerTest"; const testRequest: IRequest = { url: id }; @@ -96,6 +98,22 @@ describeNoCompat("Container", (getTestObjectProvider) => { ); } + const mockDocumentServiceFactory = ( + connectToDeltaStream: (client: IClient) => Promise, + ): IDocumentServiceFactory => { + const documentServiceFactory = provider.documentServiceFactory; + const mockFactory = Object.create(documentServiceFactory) as IDocumentServiceFactory; + // Issue typescript-eslint/typescript-eslint #1256 + mockFactory.createDocumentService = async (resolvedUrl) => { + const service = await documentServiceFactory.createDocumentService(resolvedUrl); + // Issue typescript-eslint/typescript-eslint #1256 + service.connectToDeltaStream = connectToDeltaStream; + return service; + }; + + return mockFactory; + }; + it("Load container successfully", async () => { const container = await loadContainer(); assert.strictEqual(container.id, "containerTest", "Container's id should be set"); @@ -106,17 +124,10 @@ describeNoCompat("Container", (getTestObjectProvider) => { it("Load container unsuccessfully", async () => { let success: boolean = true; try { - const documentServiceFactory = provider.documentServiceFactory; - const mockFactory = Object.create(documentServiceFactory) as IDocumentServiceFactory; - // Issue typescript-eslint/typescript-eslint #1256 - mockFactory.createDocumentService = async (resolvedUrl) => { - const service = await documentServiceFactory.createDocumentService(resolvedUrl); - // Issue typescript-eslint/typescript-eslint #1256 - service.connectToStorage = async () => Promise.reject(new Error("expectedFailure")); - return service; - }; - - await loadContainer({ documentServiceFactory: mockFactory }); + await loadContainer({ + documentServiceFactory: mockDocumentServiceFactory( + async () => Promise.reject(new Error("expectedFailure"))), + }); assert.fail("Error expected"); } catch (error) { assert.strictEqual(error.errorType, ContainerErrorType.genericError, "Error should be a general error"); @@ -130,16 +141,10 @@ describeNoCompat("Container", (getTestObjectProvider) => { it("Load container with error", async () => { let success: boolean = true; try { - const documentServiceFactory = provider.documentServiceFactory; - const mockFactory = Object.create(documentServiceFactory) as IDocumentServiceFactory; - // Issue typescript-eslint/typescript-eslint #1256 - mockFactory.createDocumentService = async (resolvedUrl) => { - const service = await documentServiceFactory.createDocumentService(resolvedUrl); - // Issue typescript-eslint/typescript-eslint #1256 - service.connectToDeltaStorage = async () => Promise.reject(new Error("expectedFailure")); - return service; - }; - const container2 = await loadContainer({ documentServiceFactory: mockFactory }); + const container2 = await loadContainer({ + documentServiceFactory: mockDocumentServiceFactory( + async () => Promise.reject(new Error("expectedFailure"))), + }); await waitContainerToCatchUp(container2); assert.fail("Error expected"); } catch (error) { @@ -155,17 +160,9 @@ describeNoCompat("Container", (getTestObjectProvider) => { const deltaConnection = new MockDocumentDeltaConnection( "test", ); - const documentServiceFactory = provider.documentServiceFactory; - const mockFactory = Object.create(documentServiceFactory) as IDocumentServiceFactory; - // Issue typescript-eslint/typescript-eslint #1256 - mockFactory.createDocumentService = async (resolvedUrl) => { - const service = await documentServiceFactory.createDocumentService(resolvedUrl); - // Issue typescript-eslint/typescript-eslint #1256 - service.connectToDeltaStream = async () => deltaConnection; - return service; - }; - - const container = await loadContainer({ documentServiceFactory: mockFactory }); + const container = await loadContainer({ + documentServiceFactory: mockDocumentServiceFactory(async () => deltaConnection), + }); assert.strictEqual(container.connectionState, ConnectionState.Connecting, "Container should be in Connecting state"); // Note: this will create infinite loop of reconnects as every reconnect would bring closed connection. @@ -181,26 +178,18 @@ describeNoCompat("Container", (getTestObjectProvider) => { await Promise.resolve(); assert.deepEqual(disconnectedEventArgs, []); } finally { - deltaConnection.removeAllListeners(); container.close(); } }); - it("Raise connection error event", async () => { + it("Raise retriable connection error event", async () => { const deltaConnection = new MockDocumentDeltaConnection( "test", ); - const documentServiceFactory = provider.documentServiceFactory; - const mockFactory = Object.create(documentServiceFactory) as IDocumentServiceFactory; - // Issue typescript-eslint/typescript-eslint #1256 - mockFactory.createDocumentService = async (resolvedUrl) => { - const service = await documentServiceFactory.createDocumentService(resolvedUrl); - // Issue typescript-eslint/typescript-eslint #1256 - service.connectToDeltaStream = async () => deltaConnection; - return service; - }; + const container = await loadContainer({ + documentServiceFactory: mockDocumentServiceFactory(async () => deltaConnection), + }); let errorRaised = false; - const container = await loadContainer({ documentServiceFactory: mockFactory }); container.on("error", () => { errorRaised = true; }); @@ -208,7 +197,7 @@ describeNoCompat("Container", (getTestObjectProvider) => { "Container should be in Connecting state"); const err = { message: "Test error", - canRetry: false, + canRetry: true, }; // Note: this will create infinite loop of reconnects as every reconnect would bring closed connection. // Only closing container will break that cycle. @@ -216,7 +205,6 @@ describeNoCompat("Container", (getTestObjectProvider) => { try { assert.strictEqual(container.connectionState, ConnectionState.Disconnected, "Container should be in Disconnected state"); - // All errors on socket are not critical! assert.strictEqual(container.closed, false, "Container should not be closed"); assert.strictEqual(errorRaised, false, "Error event should not be raised."); } finally { @@ -225,20 +213,42 @@ describeNoCompat("Container", (getTestObjectProvider) => { } }); - it("Close called on container", async () => { + it("Raise non-retriable connection error event", async () => { const deltaConnection = new MockDocumentDeltaConnection( "test", ); - const documentServiceFactory = provider.documentServiceFactory; - const mockFactory = Object.create(documentServiceFactory) as IDocumentServiceFactory; - // Issue typescript-eslint/typescript-eslint #1256 - mockFactory.createDocumentService = async (resolvedUrl) => { - const service = await documentServiceFactory.createDocumentService(resolvedUrl); - // Issue typescript-eslint/typescript-eslint #1256 - service.connectToDeltaStream = async () => deltaConnection; - return service; + const container = await loadContainer({ + documentServiceFactory: mockDocumentServiceFactory(async () => deltaConnection), + }); + let errorRaised = false; + container.on("error", () => { + errorRaised = true; + }); + assert.strictEqual(container.connectionState, ConnectionState.Connecting, + "Container should be in Connecting state"); + const err = { + message: "Test error", + canRetry: false, }; - const container = await loadContainer({ documentServiceFactory: mockFactory }); + deltaConnection.emitError(err); + try { + assert.strictEqual(container.connectionState, ConnectionState.Disconnected, + "Container should be in Disconnected state"); + assert(container.closed, "Container should be closed"); + assert(!errorRaised, "Error event should be raised."); + } finally { + deltaConnection.removeAllListeners(); + container.close(); + } + }); + + it("Close called on container", async () => { + const deltaConnection = new MockDocumentDeltaConnection( + "test", + ); + const container = await loadContainer({ + documentServiceFactory: mockDocumentServiceFactory(async () => deltaConnection), + }); container.on("error", () => { assert.ok(false, "Error event should not be raised."); }); @@ -248,7 +258,6 @@ describeNoCompat("Container", (getTestObjectProvider) => { assert.strictEqual(container.connectionState, ConnectionState.Disconnected, "Container should be in Disconnected state"); assert.strictEqual(container.closed, true, "Container should be closed"); - deltaConnection.removeAllListeners(); }); it("Delta manager receives readonly event when calling container.forceReadonly()", async () => { diff --git a/packages/test/test-end-to-end-tests/src/test/payloadSize.spec.ts b/packages/test/test-end-to-end-tests/src/test/payloadSize.spec.ts new file mode 100644 index 000000000000..2595036266f3 --- /dev/null +++ b/packages/test/test-end-to-end-tests/src/test/payloadSize.spec.ts @@ -0,0 +1,147 @@ +/*! + * Copyright (c) Microsoft Corporation and contributors. All rights reserved. + * Licensed under the MIT License. + */ + +import { strict as assert } from "assert"; +import { IContainerRuntime } from "@fluidframework/container-runtime-definitions"; +import { SharedMap } from "@fluidframework/map"; +import { FlushMode, IContainerRuntimeBase } from "@fluidframework/runtime-definitions"; +import { requestFluidObject } from "@fluidframework/runtime-utils"; +import { + ITestFluidObject, + ChannelFactoryRegistry, + timeoutPromise, + ITestObjectProvider, + ITestContainerConfig, + DataObjectFactoryType, +} from "@fluidframework/test-utils"; +import { describeNoCompat } from "@fluidframework/test-version-utils"; +import { IContainer } from "@fluidframework/container-definitions"; +import { GenericNetworkError } from "@fluidframework/driver-utils"; + +const map1Id = "map1Key"; +const registry: ChannelFactoryRegistry = [ + [map1Id, SharedMap.getFactory()], +]; +const testContainerConfig: ITestContainerConfig = { + fluidDataObjectType: DataObjectFactoryType.Test, + registry, +}; + +const getMockStore = ((store: Record): Storage => { + return { + getItem: (key: string): string | null => store[key], + length: Object.keys(store).length, + clear: () => { }, + // eslint-disable-next-line no-null/no-null + key: (_index: number): string | null => null, + removeItem: (_key: string) => { }, + setItem: (_key: string, _value: string) => { }, + }; +}); + +const settings: Record = {}; +global.localStorage = getMockStore(settings); + +// This test, ran against real services, should serve as a canary for socket.io +// or other communication level limitations between clients with regards +// to the op size or total payload size +describeNoCompat("Payload size", (getTestObjectProvider) => { + let provider: ITestObjectProvider; + beforeEach(() => { + provider = getTestObjectProvider(); + }); + + let container1: IContainer; + let dataObject1: ITestFluidObject; + let dataObject2: ITestFluidObject; + let dataObject1map1: SharedMap; + let dataObject2map1: SharedMap; + + // eslint-disable-next-line prefer-arrow/prefer-arrow-functions + async function waitForCleanContainers(...dataStores: ITestFluidObject[]) { + return Promise.all(dataStores.map(async (dataStore) => { + const runtime = dataStore.context.containerRuntime as IContainerRuntime; + while (runtime.isDirty) { + await timeoutPromise((resolve) => runtime.once("batchEnd", resolve)); + } + })); + } + + beforeEach(async () => { + // Create a Container for the first client. + container1 = await provider.makeTestContainer(testContainerConfig); + dataObject1 = await requestFluidObject(container1, "default"); + dataObject1.context.containerRuntime.setFlushMode(FlushMode.TurnBased); + dataObject1map1 = await dataObject1.getSharedObject(map1Id); + + // Load the Container that was created by the first client. + const container2 = await provider.loadTestContainer(testContainerConfig); + dataObject2 = await requestFluidObject(container2, "default"); + dataObject2.context.containerRuntime.setFlushMode(FlushMode.TurnBased); + dataObject2map1 = await dataObject2.getSharedObject(map1Id); + + await waitForCleanContainers(dataObject1, dataObject2); + await provider.ensureSynchronized(); + }); + + const generateStringOfSize = (sizeInBytes: number): string => new Array(sizeInBytes + 1).join("0"); + const setMapKeys = (containerRuntime: IContainerRuntimeBase, count: number, item: string): void => { + containerRuntime.orderSequentially(() => { + for (let i = 0; i < count; i++) { + dataObject1map1.set(`key${i}`, item); + } + }); + }; + + it("Can send 60 messages of 16k", async () => { + // Total payload size: 16 * 1000 * 60 = 960000 + const largeString = generateStringOfSize(16 * 1000); + const messageCount = 60; + // The limit is from socket.io seems to be 1MB + // as experimentally, a payload of 979774 bytes pass, while a + // a payload of 996103 bytes does not. Which is also an argument + // that the message is stringified again. This will also explain + // why the size varies slightly based on the string content + // of the message. + setMapKeys(dataObject1.context.containerRuntime, messageCount, largeString); + + // Wait for the ops to get processed by both the containers. + await provider.ensureSynchronized(); + + for (let i = 0; i < messageCount; i++) { + const value = dataObject2map1.get(`key${i}`); + assert.strictEqual(value, largeString, `Wrong value for key${i}`); + } + }); + + it("Cannot send large batches with feature gate enabled", async () => { + settings.FluidEnablePayloadSizeLimit = "1"; + // Total payload size: 16 * 1000 * 57 = 912000, limit is 900000 + const largeString = generateStringOfSize(16 * 1000); + const messageCount = 57; + const containerClosedByLargePayload = new Promise((res) => container1.once("closed", (error) => { + res(error instanceof GenericNetworkError && + error.fluidErrorCode === "dmDocumentDeltaConnectionError"); + })); + setMapKeys(dataObject1.context.containerRuntime, messageCount, largeString); + + assert(await containerClosedByLargePayload); + }); + + it("Can send large batches with feature gate disabled", async () => { + settings.FluidEnablePayloadSizeLimit = ""; + const largeString = generateStringOfSize(16 * 1000); + const messageCount = 100; + setMapKeys(dataObject1.context.containerRuntime, messageCount, largeString); + + // Wait for the ops to get processed by both the containers. + await provider.ensureSynchronized(); + + for (let i = 0; i < messageCount; i++) { + const value = dataObject2map1.get(`key${i}`); + assert.strictEqual(value, largeString, `Wrong value for key${i}`); + } + }); +}); diff --git a/packages/utils/telemetry-utils/src/test/thresholdCounter.spec.ts b/packages/utils/telemetry-utils/src/test/thresholdCounter.spec.ts index 7cdc625b80e8..c2883d231c00 100644 --- a/packages/utils/telemetry-utils/src/test/thresholdCounter.spec.ts +++ b/packages/utils/telemetry-utils/src/test/thresholdCounter.spec.ts @@ -4,64 +4,56 @@ */ import assert from "assert"; -import { - ITelemetryBaseEvent, - ITelemetryErrorEvent, - ITelemetryPerformanceEvent, - ITelemetryGenericEvent, - ITelemetryLogger, -} from "@fluidframework/common-definitions"; +import { MockLogger } from "../mockLogger"; import { ThresholdCounter } from "../thresholdCounter"; -class FakeTelemetryLogger implements ITelemetryLogger { - public events: ITelemetryGenericEvent[] = []; - - public send(_event: ITelemetryBaseEvent): void { - assert.fail("Should not be called"); - } - - public sendTelemetryEvent(_event: ITelemetryGenericEvent, _error?: any) { - assert.fail("Should not be called"); - } - - public sendErrorEvent(_event: ITelemetryErrorEvent, _error?: any) { - assert.fail("Should not be called"); - } - - public sendPerformanceEvent(event: ITelemetryPerformanceEvent, _error?: any): void { - this.events.push(event); - } -} - describe("ThresholdCounter", () => { - let logger: FakeTelemetryLogger; + let logger: MockLogger; let sender: ThresholdCounter; const threshold = 100; beforeEach(() => { - logger = new FakeTelemetryLogger(); + logger = new MockLogger(); sender = new ThresholdCounter(threshold, logger); }); it("Send only if it passes threshold", () => { - sender.send("event_1", threshold); - sender.send("event_2", threshold + 1); - sender.send("event_3", threshold - 1); - sender.send("event_4", 0); + assert(sender.send("event_1", threshold, { extra: threshold })); + assert(sender.send("event_2", threshold + 1, { extra: threshold })); + assert(!sender.send("event_3", threshold - 1)); + assert(!sender.send("event_4", 0)); assert.strictEqual(logger.events.length, 2); - assert.deepStrictEqual(logger.events[0], { eventName: "event_1", value: threshold }); - assert.deepStrictEqual(logger.events[1], { eventName: "event_2", value: threshold + 1 }); + assert(logger.matchEvents([{ + eventName: "event_1", + category: "performance", + value: threshold, + extra: threshold, + }, { + eventName: "event_2", + category: "performance", + value: threshold + 1, + extra: threshold, + }])); }); it("Send only if value is multiple", () => { - sender.sendIfMultiple("event_1", threshold); - sender.sendIfMultiple("event_2", threshold * 2); - sender.sendIfMultiple("event_3", threshold - 1); - sender.sendIfMultiple("event_4", 0); + assert(sender.sendIfMultiple("event_1", threshold, { extra: threshold })); + assert(sender.sendIfMultiple("event_2", threshold * 2, { extra: threshold })); + assert(!sender.sendIfMultiple("event_3", threshold - 1)); + assert(!sender.sendIfMultiple("event_4", 0)); assert.strictEqual(logger.events.length, 2); - assert.deepStrictEqual(logger.events[0], { eventName: "event_1", value: threshold }); - assert.deepStrictEqual(logger.events[1], { eventName: "event_2", value: threshold * 2 }); + assert(logger.matchEvents([{ + eventName: "event_1", + category: "performance", + value: threshold, + extra: threshold, + }, { + eventName: "event_2", + category: "performance", + value: threshold * 2, + extra: threshold, + }])); }); }); diff --git a/packages/utils/telemetry-utils/src/thresholdCounter.ts b/packages/utils/telemetry-utils/src/thresholdCounter.ts index 008576e09d7a..e93edfd346c5 100644 --- a/packages/utils/telemetry-utils/src/thresholdCounter.ts +++ b/packages/utils/telemetry-utils/src/thresholdCounter.ts @@ -17,33 +17,41 @@ export class ThresholdCounter { ) {} /** - * Sends the value if it's above the treshold. + * Sends the value if it's above the threshold. */ - public send(eventName: string, value: number) { + public send(eventName: string, value: number, metadata?: any): boolean { if (value < this.threshold) { - return; + return false; } + this.logger.sendPerformanceEvent({ eventName, value, + ...metadata, }); + + return true; } /** - * Sends the value if it's above the treshold + * Sends the value if it's above the threshold * and a multiple of the threshold. * * To be used in scenarios where we'd like to record a * threshold violation while reducing telemetry noise. */ - public sendIfMultiple(eventName: string, value: number) { + public sendIfMultiple(eventName: string, value: number, metadata?: any): boolean { if (value === this.thresholdMultiple) { this.logger.sendPerformanceEvent({ eventName, value, + ...metadata, }); // reduce number of "multiple" events. this.thresholdMultiple = this.thresholdMultiple * 2; + return true; } + + return false; } }