From df53d666319b3a2f9e2d6dc4a4a1d6f20da3f623 Mon Sep 17 00:00:00 2001
From: Andrei Iacob <84357545+andre4i@users.noreply.github.com>
Date: Tue, 1 Mar 2022 10:29:18 -0800
Subject: [PATCH] Simple mechanism to catch a runaway container that is not
 making progress (#9243)

Part of #9023 as this behavior is always observed when we hit the socket.io
payload size limit and the container enters an endless reconnect loop.
---
 .../container-runtime/src/containerRuntime.ts |  63 +++++-
 .../src/test/containerRuntime.spec.ts         | 206 ++++++++++++++++--
 2 files changed, 254 insertions(+), 15 deletions(-)

diff --git a/packages/runtime/container-runtime/src/containerRuntime.ts b/packages/runtime/container-runtime/src/containerRuntime.ts
index 0465befc39b9..5f904180b26a 100644
--- a/packages/runtime/container-runtime/src/containerRuntime.ts
+++ b/packages/runtime/container-runtime/src/containerRuntime.ts
@@ -2,7 +2,8 @@
  * Copyright (c) Microsoft Corporation and contributors. All rights reserved.
  * Licensed under the MIT License.
  */
-
+// See #9219
+/* eslint-disable max-lines */
 import { EventEmitter } from "events";
 import { ITelemetryBaseLogger, ITelemetryGenericEvent, ITelemetryLogger } from "@fluidframework/common-definitions";
 import {
@@ -338,6 +339,7 @@ interface OldContainerContextWithLogger extends Omit
     private readonly summarizerNode: IRootSummarizerNodeWithGC;
     private readonly _aliasingEnabled: boolean;
+    private readonly maxConsecutiveReconnects: number;
+    private readonly defaultMaxConsecutiveReconnects = 15;
+
     private _orderSequentiallyCalls: number = 0;
     private _flushMode: FlushMode;
     private needsFlush = false;
@@ -904,6 +909,8 @@ export class ContainerRuntime extends TypedEventEmitter
 
     private paused: boolean = false;
 
+    private consecutiveReconnects = 0;
+
     public get connected(): boolean {
         return this._connected;
     }
@@ -1024,6 +1031,9 @@ export class ContainerRuntime extends TypedEventEmitter
             (this.mc.config.getBoolean(useDataStoreAliasingKey) ?? false) ||
             (runtimeOptions.useDataStoreAliasing ?? false);
 
+        this.maxConsecutiveReconnects =
+            this.mc.config.getNumber(maxConsecutiveReconnectsKey) ?? this.defaultMaxConsecutiveReconnects;
+
         this.garbageCollector = GarbageCollector.create(
             this,
             this.runtimeOptions.gcOptions,
@@ -1451,6 +1461,42 @@ export class ContainerRuntime extends TypedEventEmitter
         }
     }
 
+    // Track how many times the container tries to reconnect with pending messages.
+    // This happens when the connection state is changed and we reset the counter
+    // when we are able to process a local op or when there are no pending messages.
+    // If this counter reaches a max, it's a good indicator that the container
+    // is not making progress and it is stuck in a retry loop.
+    private shouldContinueReconnecting(): boolean {
+        if (this.maxConsecutiveReconnects <= 0) {
+            // Feature disabled, we never stop reconnecting
+            return true;
+        }
+
+        if (!this.pendingStateManager.hasPendingMessages()) {
+            // If there are no pending messages, we can always reconnect
+            this.resetReconnectCount();
+            return true;
+        }
+
+        this.consecutiveReconnects++;
+        if (this.consecutiveReconnects === Math.floor(this.maxConsecutiveReconnects / 2)) {
+            // If we're halfway through the max reconnects, send an event in order
+            // to better identify false positives, if any. If the rate of this event
+            // matches `MaxReconnectsWithNoProgress`, we can safely cut down
+            // maxConsecutiveReconnects to half.
+            this.mc.logger.sendTelemetryEvent({
+                eventName: "ReconnectsWithNoProgress",
+                attempts: this.consecutiveReconnects,
+            });
+        }
+
+        return this.consecutiveReconnects < this.maxConsecutiveReconnects;
+    }
+
+    private resetReconnectCount() {
+        this.consecutiveReconnects = 0;
+    }
+
     private replayPendingStates() {
         // We need to be able to send ops to replay states
         if (!this.canSendOps()) { return; }
@@ -1531,6 +1577,14 @@ export class ContainerRuntime extends TypedEventEmitter
         if (changeOfState) {
             this.deltaManager.off("op", this.onOp);
             this.context.pendingLocalState = undefined;
+            if (!this.shouldContinueReconnecting()) {
+                this.closeFn(new GenericError(
+                    "MaxReconnectsWithNoProgress",
+                    undefined, // error
+                    { attempts: this.consecutiveReconnects }));
+                return;
+            }
+
             this.replayPendingStates();
         }
 
@@ -1594,6 +1648,13 @@ export class ContainerRuntime extends TypedEventEmitter
             this.emit("op", message);
 
             this.scheduleManager.afterOpProcessing(undefined, message);
+
+            if (local) {
+                // If we have processed a local op, this means that the container is
+                // making progress and we can reset the counter for how many times
+                // we have consecutively replayed the pending states
+                this.resetReconnectCount();
+            }
         } catch (e) {
             this.scheduleManager.afterOpProcessing(e, message);
             throw e;
diff --git a/packages/runtime/container-runtime/src/test/containerRuntime.spec.ts b/packages/runtime/container-runtime/src/test/containerRuntime.spec.ts
index 3a04a1c099e3..4feeace7fe3c 100644
--- a/packages/runtime/container-runtime/src/test/containerRuntime.spec.ts
+++ b/packages/runtime/container-runtime/src/test/containerRuntime.spec.ts
@@ -16,6 +16,8 @@ import { FlushMode } from "@fluidframework/runtime-definitions";
 import { DebugLogger, MockLogger } from "@fluidframework/telemetry-utils";
 import { MockDeltaManager, MockQuorum } from "@fluidframework/test-runtime-utils";
 import { ContainerRuntime, ScheduleManager } from "../containerRuntime";
+import { PendingStateManager } from "../pendingStateManager";
+import { DataStores } from "../dataStores";
 
 describe("Runtime", () => {
     describe("Container Runtime", () => {
@@ -35,7 +37,7 @@
                     containerErrors.push(error);
                 }
             },
-            updateDirtyContainerState: (dirty: boolean) => {},
+            updateDirtyContainerState: (dirty: boolean) => { },
         };
     });
 
@@ -127,21 +129,21 @@
         const sandbox = createSandbox();
         const createMockContext = (attachState: AttachState, addPendingMsg: boolean): Partial<IContainerContext> => {
-        const pendingMessage = {
-            type: "message",
-            content: {},
-        };
+            const pendingMessage = {
+                type: "message",
+                content: {},
+            };
 
-        return {
-            deltaManager: new MockDeltaManager(),
-            quorum: new MockQuorum(),
-            logger: new MockLogger(),
-            clientDetails: { capabilities: { interactive: true } },
-            updateDirtyContainerState: (dirty: boolean) => {},
-            attachState,
-            pendingLocalState: addPendingMsg ? {pendingStates: [pendingMessage]} : undefined,
+            return {
+                deltaManager: new MockDeltaManager(),
+                quorum: new MockQuorum(),
+                logger: new MockLogger(),
+                clientDetails: { capabilities: { interactive: true } },
+                updateDirtyContainerState: (dirty: boolean) => { },
+                attachState,
+                pendingLocalState: addPendingMsg ? { pendingStates: [pendingMessage] } : undefined,
+            };
         };
-        };
 
         it("should NOT be set to dirty if context is attached with no pending ops", async () => {
             const mockContext = createMockContext(AttachState.Attached, false);
@@ -527,5 +529,181 @@
                 testWrongBatches();
             });
         });
+
+        describe("Pending state progress tracking", () => {
+            const maxReconnects = 15;
+
+            let containerRuntime: ContainerRuntime;
+            const mockLogger = new MockLogger();
+            const containerErrors: ICriticalContainerError[] = [];
+            const getMockContext = (): Partial<IContainerContext> => {
+                return {
+                    clientId: "fakeClientId",
+                    deltaManager: new MockDeltaManager(),
+                    quorum: new MockQuorum(),
+                    logger: mockLogger,
+                    clientDetails: { capabilities: { interactive: true } },
+                    closeFn: (error?: ICriticalContainerError): void => {
+                        if (error !== undefined) {
+                            containerErrors.push(error);
+                        }
+                    },
+                    updateDirtyContainerState: (dirty: boolean) => { },
+                };
+            };
+            const getMockPendingStateManager = (hasPendingMessages: boolean): PendingStateManager => {
+                return {
+                    replayPendingStates: () => { },
+                    hasPendingMessages: () => hasPendingMessages,
+                    processMessage: (_message: ISequencedDocumentMessage, _local: boolean) => {
+                        return { localAck: false, localOpMetadata: undefined };
+                    },
+                } as PendingStateManager;
+            };
+            const getMockDataStores = (): DataStores => {
+                return {
+                    processFluidDataStoreOp:
+                        (_message: ISequencedDocumentMessage,
+                            _local: boolean,
+                            _localMessageMetadata: unknown) => { },
+                    setConnectionState: (_connected: boolean, _clientId?: string) => { },
+                } as DataStores;
+            };
+
+            const getFirstContainerError = (): ICriticalContainerError => {
+                assert.ok(containerErrors.length > 0, "Container should have errors");
+                return containerErrors[0];
+            };
+
+            beforeEach(async () => {
+                containerErrors.length = 0;
+                containerRuntime = await ContainerRuntime.load(
+                    getMockContext() as IContainerContext,
+                    [],
+                    undefined, // requestHandler
+                    {
+                        summaryOptions: {
+                            disableSummaries: true,
+                        },
+                    },
+                );
+            });
+
+            function patchRuntime(
+                pendingStateManager: PendingStateManager,
+                maxReconnects: number | undefined = undefined,
+            ) {
+                const runtime = containerRuntime as any;
+                runtime.pendingStateManager = pendingStateManager;
+                runtime.dataStores = getMockDataStores();
+                runtime.maxConsecutiveReconnects = maxReconnects ?? runtime.maxConsecutiveReconnects;
+                return runtime as ContainerRuntime;
+            }
+
+            it(`No progress for ${maxReconnects} connection state changes and pending state will ` +
+                "close the container", async () => {
+                patchRuntime(getMockPendingStateManager(true /* always has pending messages */));
+
+                for (let i = 0; i < maxReconnects; i++) {
+                    containerRuntime.setConnectionState(!containerRuntime.connected);
+                }
+
+                const error = getFirstContainerError();
+                assert.ok(error instanceof GenericError);
+                assert.strictEqual(error.fluidErrorCode, "MaxReconnectsWithNoProgress");
+                assert.strictEqual(error.getTelemetryProperties().attempts, maxReconnects);
+                mockLogger.assertMatchAny([{
+                    eventName: "ContainerRuntime:ReconnectsWithNoProgress",
+                    attempts: 7,
+                }]);
+            });
+
+            it(`No progress for ${maxReconnects} / 2 connection state changes and pending state will ` +
+                "not close the container", async () => {
+                patchRuntime(getMockPendingStateManager(true /* always has pending messages */));
+
+                for (let i = 0; i < maxReconnects / 2; i++) {
+                    containerRuntime.setConnectionState(!containerRuntime.connected);
+                }
+
+                assert.equal(containerErrors.length, 0);
+                mockLogger.assertMatchAny([{
+                    eventName: "ContainerRuntime:ReconnectsWithNoProgress",
+                    attempts: 7,
+                }]);
+            });
+
+            it(`No progress for ${maxReconnects} connection state changes and pending state with ` +
+                "feature disabled will not close the container", async () => {
+                patchRuntime(
+                    getMockPendingStateManager(true /* always has pending messages */),
+                    -1 /* maxConsecutiveReplays */);
+
+                for (let i = 0; i < maxReconnects; i++) {
+                    containerRuntime.setConnectionState(!containerRuntime.connected);
+                }
+
+                assert.equal(containerErrors.length, 0);
+                mockLogger.assertMatch([]);
+            });
+
+            it(`No progress for ${maxReconnects} connection state changes and no pending state will ` +
+                "not close the container", async () => {
+                patchRuntime(getMockPendingStateManager(false /* always has no pending messages */));
+
+                for (let i = 0; i < maxReconnects; i++) {
+                    containerRuntime.setConnectionState(!containerRuntime.connected);
+                }
+
+                assert.equal(containerErrors.length, 0);
+                mockLogger.assertMatch([]);
+            });
+
+            it(`No progress for ${maxReconnects} connection state changes and pending state but successfully ` +
+                "processing local op will not close the container", async () => {
+                patchRuntime(getMockPendingStateManager(true /* always has pending messages */));
+
+                for (let i = 0; i < maxReconnects; i++) {
+                    containerRuntime.setConnectionState(!containerRuntime.connected);
+                    containerRuntime.process({
+                        type: "op",
+                        clientId: "clientId",
+                        sequenceNumber: 0,
+                        contents: {
+                            address: "address",
+                        },
+                    } as any as ISequencedDocumentMessage, true /* local */);
+                }
+
+                assert.equal(containerErrors.length, 0);
+                mockLogger.assertMatch([]);
+            });
+
+            it(`No progress for ${maxReconnects} connection state changes and pending state but successfully ` +
+                "processing remote op will close the container", async () => {
+                patchRuntime(getMockPendingStateManager(true /* always has pending messages */));
+
+                for (let i = 0; i < maxReconnects; i++) {
+                    containerRuntime.setConnectionState(false);
+                    containerRuntime.setConnectionState(true);
+                    containerRuntime.process({
+                        type: "op",
+                        clientId: "clientId",
+                        sequenceNumber: 0,
+                        contents: {
+                            address: "address",
+                        },
+                    } as any as ISequencedDocumentMessage, false /* local */);
+                }
+
+                const error = getFirstContainerError();
+                assert.ok(error instanceof GenericError);
+                assert.strictEqual(error.fluidErrorCode, "MaxReconnectsWithNoProgress");
"MaxReconnectsWithNoProgress"); + assert.strictEqual(error.getTelemetryProperties().attempts, maxReconnects); + mockLogger.assertMatchAny([{ + eventName: "ContainerRuntime:ReconnectsWithNoProgress", + attempts: 7, + }]); + }); + }); }); });