Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Block large payloads inside DocumentDeltaConnection, size due to the 1MB Kafka limit #7987

Closed
wants to merge 25 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
391382b
Track message size - first draft
andre4i Oct 23, 2021
0b43fc1
Fix test infra, add more tests
andre4i Oct 25, 2021
a18376f
Simplify threshold counter tests, add metadata to performance event f…
andre4i Oct 25, 2021
1cec55b
Small renames and refactorings
andre4i Oct 25, 2021
75f6505
Merge branch 'main' into track-message-size
andre4i Oct 25, 2021
6873e82
update package version
andre4i Oct 25, 2021
100890d
Fix package.json
andre4i Oct 27, 2021
77887cf
Change message size validation to be relative to both the message and…
andre4i Nov 1, 2021
90dc125
Merge branch 'main' into track-message-size
andre4i Nov 1, 2021
6acfd6c
Make feature gate private
andre4i Nov 2, 2021
9a135a9
Small comment
andre4i Nov 2, 2021
964587e
Formatting
andre4i Nov 2, 2021
0dca9ae
Update comment
andre4i Nov 2, 2021
38409c6
update comment
andre4i Nov 3, 2021
1870140
Merge branch 'main' into track-message-size
andre4i Nov 9, 2021
e1b3fd8
update package version
andre4i Nov 9, 2021
4be520c
Merge branch 'track-message-size' of github.com:andre4i/FluidFramewor…
andre4i Nov 9, 2021
030a302
Merge branch 'main' into track-message-size
andre4i Dec 7, 2021
ab79725
mock localstorage
andre4i Dec 8, 2021
9039f1d
Close container on errors from localdocumentdeltaconn
andre4i Dec 8, 2021
b96af42
Merge branch 'main' into track-message-size
andre4i Dec 8, 2021
8ce1e41
Add some comments
andre4i Dec 8, 2021
a601e54
Fix some tests
andre4i Dec 9, 2021
d12ec3b
Merge branch 'main' into track-message-size
andre4i Dec 9, 2021
b66f6e0
Fix package version
andre4i Dec 9, 2021
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions api-report/telemetry-utils.api.md
Original file line number Diff line number Diff line change
Expand Up @@ -247,8 +247,8 @@ export class TelemetryUTLogger implements ITelemetryLogger {
// @public
export class ThresholdCounter {
constructor(threshold: number, logger: ITelemetryLogger, thresholdMultiple?: number);
send(eventName: string, value: number): void;
sendIfMultiple(eventName: string, value: number): void;
send(eventName: string, value: number, metadata?: any): boolean;
sendIfMultiple(eventName: string, value: number, metadata?: any): boolean;
}

// @public
Expand Down
3 changes: 3 additions & 0 deletions packages/drivers/driver-base/.eslintrc.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ module.exports = {
"extends": [
"@fluidframework/eslint-config-fluid/eslint7"
],
"parserOptions": {
"project": ["./tsconfig.json", "./src/test/tsconfig.json"]
},
"rules": {
"@typescript-eslint/strict-boolean-expressions": "off"
}
Expand Down
12 changes: 11 additions & 1 deletion packages/drivers/driver-base/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -12,18 +12,24 @@
"types": "dist/index.d.ts",
"scripts": {
"build": "npm run build:genver && concurrently npm:build:compile npm:lint && npm run build:docs",
"build:compile": "concurrently npm:tsc npm:build:esnext",
"build:commonjs": "npm run tsc && npm run build:test",
"build:compile": "concurrently npm:build:commonjs npm:build:esnext",
"build:docs": "api-extractor run --local --typescript-compiler-folder ../../../node_modules/typescript && copyfiles -u 1 ./_api-extractor-temp/doc-models/* ../../../_api-extractor-temp/",
"build:esnext": "tsc --project ./tsconfig.esnext.json",
"build:full": "npm run build",
"build:full:compile": "npm run build:compile",
"build:genver": "gen-version",
"build:test": "tsc --project ./src/test/tsconfig.json",
"ci:build:docs": "api-extractor run --typescript-compiler-folder ../../../node_modules/typescript && copyfiles -u 1 ./_api-extractor-temp/* ../../../_api-extractor-temp/",
"clean": "rimraf dist lib *.tsbuildinfo *.build.log",
"eslint": "eslint --format stylish src",
"eslint:fix": "eslint --format stylish src --fix",
"lint": "npm run eslint",
"lint:fix": "npm run eslint:fix",
"test": "npm run test:mocha",
"test:coverage": "nyc npm test -- --reporter xunit --reporter-option output=nyc/junit-report.xml",
"test:mocha": "mocha --recursive dist/test -r node_modules/@fluidframework/mocha-test-setup --unhandled-rejections=strict",
"test:mocha:verbose": "cross-env FLUID_TEST_VERBOSE=1 npm run test:mocha",
"tsc": "tsc",
"tsfmt": "tsfmt --verify",
"tsfmt:fix": "tsfmt --replace"
Expand All @@ -39,7 +45,9 @@
"devDependencies": {
"@fluidframework/build-common": "^0.23.0",
"@fluidframework/eslint-config-fluid": "^0.24.0",
"@fluidframework/mocha-test-setup": "^0.54.0",
"@microsoft/api-extractor": "^7.16.1",
"@types/mocha": "^8.2.2",
"@types/node": "^12.19.0",
"@types/socket.io-client": "^1.4.32",
"@typescript-eslint/eslint-plugin": "~4.14.0",
Expand All @@ -53,6 +61,8 @@
"eslint-plugin-prefer-arrow": "~1.2.2",
"eslint-plugin-react": "~7.22.0",
"eslint-plugin-unicorn": "~26.0.1",
"mocha": "^8.4.0",
"nyc": "^15.0.0",
"rimraf": "^2.6.2",
"typescript": "~4.1.3",
"typescript-formatter": "7.1.0"
Expand Down
27 changes: 23 additions & 4 deletions packages/drivers/driver-base/src/documentDeltaConnection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import {
IDocumentDeltaConnectionEvents,
DriverError,
} from "@fluidframework/driver-definitions";
import { createGenericNetworkError } from "@fluidframework/driver-utils";
import { createGenericNetworkError, createWriteError } from "@fluidframework/driver-utils";
import {
ConnectionMode,
IClientConfiguration,
Expand All @@ -24,9 +24,12 @@ import {
} from "@fluidframework/protocol-definitions";
import { IDisposable, ITelemetryLogger } from "@fluidframework/common-definitions";
import { ChildLogger } from "@fluidframework/telemetry-utils";
import { MessageSizeValidator } from "./messageSizeValidator";

// Local storage key to disable the BatchManager
const batchManagerDisabledKey = "FluidDisableBatchManager";
// Local storage key to enable message size tracking
const limitPayloadSizeKey = "FluidEnablePayloadSizeLimit";

// See #8129.
// Need to move to common-utils (tracked as #8165)
Expand Down Expand Up @@ -105,6 +108,10 @@ export class DocumentDeltaConnection
protected _disposed: boolean = false;
protected readonly logger: ITelemetryLogger;
protected readonly isBatchManagerDisabled: boolean = false;
private messageSizeValidator: MessageSizeValidator | undefined;
// Close to 1MB with a small buffer to allow for redundant string escaping during transfer
private readonly maxPayloadSizeInBytes = 900000;
private readonly shouldLimitPayloadSize: boolean = false;

public get details(): IConnected {
if (!this._details) {
Expand Down Expand Up @@ -157,7 +164,8 @@ export class DocumentDeltaConnection
}
});

this.isBatchManagerDisabled = DocumentDeltaConnection.disabledBatchManagerFeatureGate;
this.isBatchManagerDisabled = DocumentDeltaConnection.booleanFeature(batchManagerDisabledKey);
this.shouldLimitPayloadSize = DocumentDeltaConnection.booleanFeature(limitPayloadSizeKey);
}

/**
Expand Down Expand Up @@ -280,6 +288,11 @@ export class DocumentDeltaConnection
}

protected emitMessages(type: string, messages: IDocumentMessage[][]) {
if (this.messageSizeValidator && !this.messageSizeValidator.isPayloadValid(messages)) {
this.emit("error", createWriteError("Payload too large"));
return;
}

andre4i marked this conversation as resolved.
Show resolved Hide resolved
// Although the implementation here disconnects the socket and does not reuse it, other subclasses
// (e.g. OdspDocumentDeltaConnection) may reuse the socket. In these cases, we need to avoid emitting
// on the still-live socket.
Expand All @@ -288,11 +301,11 @@ export class DocumentDeltaConnection
}
}

private static get disabledBatchManagerFeatureGate() {
private static booleanFeature(key: string): boolean {
try {
return localStorage !== undefined
&& typeof localStorage === "object"
&& localStorage.getItem(batchManagerDisabledKey) === "1";
&& localStorage.getItem(key) === "1";
} catch (e) { }
return false;
}
Expand Down Expand Up @@ -470,6 +483,12 @@ export class DocumentDeltaConnection
}, timeout + 2000);
});

if (this.shouldLimitPayloadSize) {
this.messageSizeValidator = new MessageSizeValidator(
this.maxMessageSize + 1,
this.maxPayloadSizeInBytes,
ChildLogger.create(this.logger, "MessageSizeValidator"));
}
assert(!this.disposed, 0x246 /* "checking consistency of socket & _disposed flags" */);
}

Expand Down
75 changes: 75 additions & 0 deletions packages/drivers/driver-base/src/messageSizeValidator.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*!
* Copyright (c) Microsoft Corporation and contributors. All rights reserved.
* Licensed under the MIT License.
*/

import { ITelemetryLogger } from "@fluidframework/common-definitions";
import { IDocumentMessage } from "@fluidframework/protocol-definitions";
import { ThresholdCounter } from "@fluidframework/telemetry-utils";

export class MessageSizeValidator {
private readonly payloadSizeCountersWithEvents = [
// The order here matters, in order to save telemetry quota.
// The first counter to exceed its limit will short-circuit
// event publishing.
{
counter: new ThresholdCounter(this.maxPayloadSizeInBytes, this.logger),
eventName: "OpsPayloadSizeLimitExceeded",
},
{
counter: new ThresholdCounter(this.maxPayloadSizeInBytes / 2, this.logger),
eventName: "OpsPayloadSize50PcOfMax",
},
{
counter: new ThresholdCounter(this.maxPayloadSizeInBytes / 4, this.logger),
eventName: "OpsPayloadSize25PcOfMax",
},
];

private readonly messageSizeCounter = new ThresholdCounter(this.maxMessageSizeInBytes, this.logger);
private readonly messageSizeEvent = "OpSizeLimitExceeded";

constructor(
private readonly maxMessageSizeInBytes: number,
private readonly maxPayloadSizeInBytes: number,
private readonly logger: ITelemetryLogger,
) {
}

private trackPayload(payloadSizeInBytes: number) {
for (const x of this.payloadSizeCountersWithEvents) {
if (x.counter.send(x.eventName, payloadSizeInBytes, { max: this.maxPayloadSizeInBytes })) {
break;
}
}
}

private trackMessage(messageSizeInBytes: number) {
this.messageSizeCounter.send(this.messageSizeEvent, messageSizeInBytes, { max: this.maxMessageSizeInBytes });
}

public isPayloadValid(messages: IDocumentMessage[][]): boolean {
let payloadSizeInBytes = 0;
let allMessagesUnderLimit = true;

for (const inner of messages) {
for (const message of inner) {
const messageSize = MessageSizeValidator.sizeInBytes(message);
allMessagesUnderLimit &&= messageSize < this.maxMessageSizeInBytes;
payloadSizeInBytes = payloadSizeInBytes + messageSize;
this.trackMessage(messageSize);
}
}

this.trackPayload(payloadSizeInBytes);
return allMessagesUnderLimit && payloadSizeInBytes < this.maxPayloadSizeInBytes;
}

public static sizeInBytes(message: IDocumentMessage): number {
const { contents, ...restOfObject } = message;
// `contents` is already stringified. Re-stringifying the whole message will
// lead to additional escape characters which will increase the size artificially.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this comment suggest that what we measure is not what actually gets counted by socket.io. I.e. if socket.io strigifies payload, then it will add all these escape characters and they will go against the limit, right?

return new TextEncoder().encode(message.contents).length
andre4i marked this conversation as resolved.
Show resolved Hide resolved
+ new TextEncoder().encode(JSON.stringify(restOfObject)).length;
}
}
102 changes: 102 additions & 0 deletions packages/drivers/driver-base/src/test/messageSizeValidator.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
/*!
* Copyright (c) Microsoft Corporation and contributors. All rights reserved.
* Licensed under the MIT License.
*/

import { strict as assert } from "assert";
import { IDocumentMessage } from "@fluidframework/protocol-definitions";
import { MockLogger } from "@fluidframework/telemetry-utils";
import { MessageSizeValidator } from "../messageSizeValidator";

const generateStringOfSize = (sizeInBytes: number): string => new Array(sizeInBytes + 1).join("0");
const generateMessageOfSize = (sizeInBytes: number): IDocumentMessage => {
const envelope = {
clientSequenceNumber: 1,
metadata: {
meta: "data",
other: "data",
},
referenceSequenceNumber: 0,
type: "test",
};

const message = (envelope as IDocumentMessage);
message.contents = generateStringOfSize(sizeInBytes - new TextEncoder().encode(JSON.stringify(message)).length);
return message;
};

describe("Message size validation", () => {
let logger: MockLogger;
let validator: MessageSizeValidator;
const maxMessageSizeInBytes = 10 * 1000;
const maxPayloadSizeInBytes = 100 * maxMessageSizeInBytes;

beforeEach(() => {
logger = new MockLogger();
validator = new MessageSizeValidator(maxMessageSizeInBytes, maxPayloadSizeInBytes, logger);
});

it("Should fail when message size is max", () => {
assert(!validator.isPayloadValid([[generateMessageOfSize(maxMessageSizeInBytes)]]));
assert(logger.matchEvents([{
eventName: "OpSizeLimitExceeded",
category: "performance",
value: maxMessageSizeInBytes,
max: maxMessageSizeInBytes,
}]));
});

it("Should succeed when payload is lower than 25% of max", () => {
assert(validator.isPayloadValid([[generateMessageOfSize(maxMessageSizeInBytes - 1)]]));
assert(logger.matchEvents([]));
});

it("Should succeed when payload is lower than 50% of max", async () => {
const size = 26 * (maxMessageSizeInBytes - 1);
assert(validator.isPayloadValid([Array(26).fill(generateMessageOfSize(maxMessageSizeInBytes - 1))]));
assert(logger.matchEvents([{
eventName: "OpsPayloadSize25PcOfMax",
category: "performance",
value: size,
max: maxPayloadSizeInBytes,
}]));
});

it("Should succeed when payload is between 50% and 100% of max", () => {
const size = 51 * (maxMessageSizeInBytes - 1);
assert(validator.isPayloadValid([Array(51).fill(generateMessageOfSize(maxMessageSizeInBytes - 1))]));
assert(logger.matchEvents([{
eventName: "OpsPayloadSize50PcOfMax",
category: "performance",
value: size,
max: maxPayloadSizeInBytes,
}]));
});

it("Should fail when payload size is higher than max", () => {
const size = 101 * (maxMessageSizeInBytes - 1);
assert(!validator.isPayloadValid([Array(101).fill(generateMessageOfSize(maxMessageSizeInBytes - 1))]));
assert(logger.matchEvents([{
eventName: "OpsPayloadSizeLimitExceeded",
category: "performance",
value: size,
max: maxPayloadSizeInBytes,
}]));
});

it("Should fail when payload size is higher than max and message is larger than max", () => {
const size = 100 * (maxMessageSizeInBytes);
assert(!validator.isPayloadValid([Array(100).fill(generateMessageOfSize(maxMessageSizeInBytes))]));
assert(logger.matchEvents([{
eventName: "OpSizeLimitExceeded",
category: "performance",
value: maxMessageSizeInBytes,
max: maxMessageSizeInBytes,
}, {
eventName: "OpsPayloadSizeLimitExceeded",
category: "performance",
value: size,
max: maxPayloadSizeInBytes,
}]));
});
});
21 changes: 21 additions & 0 deletions packages/drivers/driver-base/src/test/tsconfig.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
{
"extends": "@fluidframework/build-common/ts-common-config.json",
"compilerOptions": {
"rootDir": "./",
"outDir": "../../dist/test",
"types": [
"node",
"mocha"
],
"declaration": false,
"declarationMap": false
},
"include": [
"./**/*"
],
"references": [
{
"path": "../.."
}
]
}
8 changes: 5 additions & 3 deletions packages/drivers/driver-base/tsconfig.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,18 @@
"extends": "@fluidframework/build-common/ts-common-config.json",
"exclude": [
"dist",
"node_modules"
"node_modules",
"src/test/**/*"
],
"compilerOptions": {
"rootDir": "./src",
"outDir": "./dist",
"types": [
"@types/socket.io-client"
]
],
"composite": true
},
"include": [
"src/**/*"
]
}
}
2 changes: 1 addition & 1 deletion packages/drivers/local-driver/src/test/tsconfig.json
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,4 @@
"path": "../.."
}
]
}
}
6 changes: 5 additions & 1 deletion packages/loader/container-loader/src/connectionManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,11 @@ function getNackReconnectInfo(nackContent: INackContent) {
const createReconnectError = (fluidErrorCode: string, err: any) =>
wrapError(
err,
(errorMessage: string) => new GenericNetworkError(fluidErrorCode, errorMessage, true /* canRetry */),
(errorMessage: string) => new GenericNetworkError(
fluidErrorCode,
errorMessage,
err?.canRetry === true || err?.canRetry === undefined, // unless explicitly specified, this will retry
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

^^ We would always retry, regardless of the type of error. Not sure if this has always been intentional or not..

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here is the story :)
In ODSP, we definitely want to reconnect always, because we may get disconnect with 403 due to token expiration. 403 is critical error (i.e., in general it's a game over event), but across layers we always do one retry with refreshed token to ensure host has a chance to provide new token.

With that said, I think that's the wrong layer to participate in this game. I.e. ODSP has to project such errors as recoverable. I'd need to look at the code to say if it's the case. And not sure about FRS.

Also, worth noting that for long period of time we consider errors without canRetry to be recoverable. We changed that (maybe 6 months back) - any exception, whether it's a bug in our code, or in host code (like token callback) is catastrophic.

So, to summarize, this code is likely wrong. But I do not think you are making it any more right :)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't believe there is a viable way right now for propagating non-retriable errors from the driver to the container. Maybe a TTL on the error itself? What are your thoughts?

),
);

/**
Expand Down
Loading