Skip to content

Commit

Permalink
Simple mechanism to catch a runaway container that is not making prog…
Browse files Browse the repository at this point in the history
…ress (#9243)

Part of #9023 as this behavior is always observed when we hit the socket.io payload size limit and the container enters an endless reconnect loop.
  • Loading branch information
andre4i authored Mar 1, 2022
1 parent 85bf932 commit df53d66
Show file tree
Hide file tree
Showing 2 changed files with 254 additions and 15 deletions.
63 changes: 62 additions & 1 deletion packages/runtime/container-runtime/src/containerRuntime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@
* Copyright (c) Microsoft Corporation and contributors. All rights reserved.
* Licensed under the MIT License.
*/

// See #9219
/* eslint-disable max-lines */
import { EventEmitter } from "events";
import { ITelemetryBaseLogger, ITelemetryGenericEvent, ITelemetryLogger } from "@fluidframework/common-definitions";
import {
Expand Down Expand Up @@ -338,6 +339,7 @@ interface OldContainerContextWithLogger extends Omit<IContainerContext, "taggedL
// Local storage key to set the default flush mode to TurnBased
const turnBasedFlushModeKey = "Fluid.ContainerRuntime.FlushModeTurnBased";
const useDataStoreAliasingKey = "Fluid.ContainerRuntime.UseDataStoreAliasing";
const maxConsecutiveReconnectsKey = "Fluid.ContainerRuntime.MaxConsecutiveReconnects";

export enum RuntimeMessage {
FluidDataStoreOp = "component",
Expand Down Expand Up @@ -895,6 +897,9 @@ export class ContainerRuntime extends TypedEventEmitter<IContainerRuntimeEvents>
private readonly summarizerNode: IRootSummarizerNodeWithGC;
private readonly _aliasingEnabled: boolean;

private readonly maxConsecutiveReconnects: number;
private readonly defaultMaxConsecutiveReconnects = 15;

private _orderSequentiallyCalls: number = 0;
private _flushMode: FlushMode;
private needsFlush = false;
Expand All @@ -904,6 +909,8 @@ export class ContainerRuntime extends TypedEventEmitter<IContainerRuntimeEvents>

private paused: boolean = false;

private consecutiveReconnects = 0;

public get connected(): boolean {
return this._connected;
}
Expand Down Expand Up @@ -1024,6 +1031,9 @@ export class ContainerRuntime extends TypedEventEmitter<IContainerRuntimeEvents>
(this.mc.config.getBoolean(useDataStoreAliasingKey) ?? false) ||
(runtimeOptions.useDataStoreAliasing ?? false);

this.maxConsecutiveReconnects =
this.mc.config.getNumber(maxConsecutiveReconnectsKey) ?? this.defaultMaxConsecutiveReconnects;

this.garbageCollector = GarbageCollector.create(
this,
this.runtimeOptions.gcOptions,
Expand Down Expand Up @@ -1451,6 +1461,42 @@ export class ContainerRuntime extends TypedEventEmitter<IContainerRuntimeEvents>
}
}

// Track how many times the container tries to reconnect with pending messages.
// This happens when the connection state is changed and we reset the counter
// when we are able to process a local op or when there are no pending messages.
// If this counter reaches a max, it's a good indicator that the container
// is not making progress and it is stuck in a retry loop.
private shouldContinueReconnecting(): boolean {
if (this.maxConsecutiveReconnects <= 0) {
// Feature disabled, we never stop reconnecting
return true;
}

if (!this.pendingStateManager.hasPendingMessages()) {
// If there are no pending messages, we can always reconnect
this.resetReconnectCount();
return true;
}

this.consecutiveReconnects++;
if (this.consecutiveReconnects === Math.floor(this.maxConsecutiveReconnects / 2)) {
// If we're halfway through the max reconnects, send an event in order
// to better identify false positives, if any. If the rate of this event
// matches `MaxReconnectsWithNoProgress`, we can safely cut down
// maxConsecutiveReconnects to half.
this.mc.logger.sendTelemetryEvent({
eventName: "ReconnectsWithNoProgress",
attempts: this.consecutiveReconnects,
});
}

return this.consecutiveReconnects < this.maxConsecutiveReconnects;
}

private resetReconnectCount() {
this.consecutiveReconnects = 0;
}

private replayPendingStates() {
// We need to be able to send ops to replay states
if (!this.canSendOps()) { return; }
Expand Down Expand Up @@ -1531,6 +1577,14 @@ export class ContainerRuntime extends TypedEventEmitter<IContainerRuntimeEvents>
if (changeOfState) {
this.deltaManager.off("op", this.onOp);
this.context.pendingLocalState = undefined;
if (!this.shouldContinueReconnecting()) {
this.closeFn(new GenericError(
"MaxReconnectsWithNoProgress",
undefined, // error
{ attempts: this.consecutiveReconnects }));
return;
}

this.replayPendingStates();
}

Expand Down Expand Up @@ -1594,6 +1648,13 @@ export class ContainerRuntime extends TypedEventEmitter<IContainerRuntimeEvents>

this.emit("op", message);
this.scheduleManager.afterOpProcessing(undefined, message);

if (local) {
// If we have processed a local op, this means that the container is
// making progress and we can reset the counter for how many times
// we have consecutively replayed the pending states
this.resetReconnectCount();
}
} catch (e) {
this.scheduleManager.afterOpProcessing(e, message);
throw e;
Expand Down
206 changes: 192 additions & 14 deletions packages/runtime/container-runtime/src/test/containerRuntime.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ import { FlushMode } from "@fluidframework/runtime-definitions";
import { DebugLogger, MockLogger } from "@fluidframework/telemetry-utils";
import { MockDeltaManager, MockQuorum } from "@fluidframework/test-runtime-utils";
import { ContainerRuntime, ScheduleManager } from "../containerRuntime";
import { PendingStateManager } from "../pendingStateManager";
import { DataStores } from "../dataStores";

describe("Runtime", () => {
describe("Container Runtime", () => {
Expand All @@ -35,7 +37,7 @@ describe("Runtime", () => {
containerErrors.push(error);
}
},
updateDirtyContainerState: (dirty: boolean) => {},
updateDirtyContainerState: (dirty: boolean) => { },
};
});

Expand Down Expand Up @@ -127,21 +129,21 @@ describe("Runtime", () => {
const sandbox = createSandbox();
const createMockContext =
(attachState: AttachState, addPendingMsg: boolean): Partial<IContainerContext> => {
const pendingMessage = {
type: "message",
content: {},
};
const pendingMessage = {
type: "message",
content: {},
};

return {
deltaManager: new MockDeltaManager(),
quorum: new MockQuorum(),
logger: new MockLogger(),
clientDetails: { capabilities: { interactive: true } },
updateDirtyContainerState: (dirty: boolean) => {},
attachState,
pendingLocalState: addPendingMsg ? {pendingStates: [pendingMessage]} : undefined,
return {
deltaManager: new MockDeltaManager(),
quorum: new MockQuorum(),
logger: new MockLogger(),
clientDetails: { capabilities: { interactive: true } },
updateDirtyContainerState: (dirty: boolean) => { },
attachState,
pendingLocalState: addPendingMsg ? { pendingStates: [pendingMessage] } : undefined,
};
};
};

it("should NOT be set to dirty if context is attached with no pending ops", async () => {
const mockContext = createMockContext(AttachState.Attached, false);
Expand Down Expand Up @@ -527,5 +529,181 @@ describe("Runtime", () => {
testWrongBatches();
});
});
describe("Pending state progress tracking", () => {
const maxReconnects = 15;

let containerRuntime: ContainerRuntime;
const mockLogger = new MockLogger();
const containerErrors: ICriticalContainerError[] = [];
const getMockContext = (): Partial<IContainerContext> => {
return {
clientId: "fakeClientId",
deltaManager: new MockDeltaManager(),
quorum: new MockQuorum(),
logger: mockLogger,
clientDetails: { capabilities: { interactive: true } },
closeFn: (error?: ICriticalContainerError): void => {
if (error !== undefined) {
containerErrors.push(error);
}
},
updateDirtyContainerState: (dirty: boolean) => { },
};
};
const getMockPendingStateManager = (hasPendingMessages: boolean): PendingStateManager => {
return {
replayPendingStates: () => { },
hasPendingMessages: () => hasPendingMessages,
processMessage: (_message: ISequencedDocumentMessage, _local: boolean) => {
return { localAck: false, localOpMetadata: undefined };
},
} as PendingStateManager;
}
const getMockDataStores = (): DataStores => {
return {
processFluidDataStoreOp:
(_message: ISequencedDocumentMessage,
_local: boolean,
_localMessageMetadata: unknown) => { },
setConnectionState: (_connected: boolean, _clientId?: string) => { },
} as DataStores;
}

const getFirstContainerError = (): ICriticalContainerError => {
assert.ok(containerErrors.length > 0, "Container should have errors");
return containerErrors[0];
};

beforeEach(async () => {
containerErrors.length = 0;
containerRuntime = await ContainerRuntime.load(
getMockContext() as IContainerContext,
[],
undefined, // requestHandler
{
summaryOptions: {
disableSummaries: true,
},
},
);
});

function patchRuntime(
pendingStateManager: PendingStateManager,
maxReconnects: number | undefined = undefined
) {
const runtime = containerRuntime as any;
runtime.pendingStateManager = pendingStateManager;
runtime.dataStores = getMockDataStores();
runtime.maxConsecutiveReconnects = maxReconnects ?? runtime.maxConsecutiveReconnects;
return runtime as ContainerRuntime;
}

it(`No progress for ${maxReconnects} connection state changes and pending state will ` +
"close the container", async () => {
patchRuntime(getMockPendingStateManager(true /* always has pending messages */));

for (let i = 0; i < maxReconnects; i++) {
containerRuntime.setConnectionState(!containerRuntime.connected);
}

const error = getFirstContainerError();
assert.ok(error instanceof GenericError);
assert.strictEqual(error.fluidErrorCode, "MaxReconnectsWithNoProgress");
assert.strictEqual(error.getTelemetryProperties().attempts, maxReconnects);
mockLogger.assertMatchAny([{
eventName: "ContainerRuntime:ReconnectsWithNoProgress",
attempts: 7,
}]);
});

it(`No progress for ${maxReconnects} / 2 connection state changes and pending state will ` +
"not close the container", async () => {
patchRuntime(getMockPendingStateManager(true /* always has pending messages */));

for (let i = 0; i < maxReconnects / 2; i++) {
containerRuntime.setConnectionState(!containerRuntime.connected);
}

assert.equal(containerErrors.length, 0);
mockLogger.assertMatchAny([{
eventName: "ContainerRuntime:ReconnectsWithNoProgress",
attempts: 7,
}]);
});

it(`No progress for ${maxReconnects} connection state changes and pending state with` +
"feature disabled will not close the container", async () => {
patchRuntime(
getMockPendingStateManager(true /* always has pending messages */),
-1 /* maxConsecutiveReplays */);

for (let i = 0; i < maxReconnects; i++) {
containerRuntime.setConnectionState(!containerRuntime.connected);
}

assert.equal(containerErrors.length, 0);
mockLogger.assertMatch([]);
});

it(`No progress for ${maxReconnects} connection state changes and no pending state will ` +
"not close the container", async () => {
patchRuntime(getMockPendingStateManager(false /* always has no pending messages */));

for (let i = 0; i < maxReconnects; i++) {
containerRuntime.setConnectionState(!containerRuntime.connected);
}

assert.equal(containerErrors.length, 0);
mockLogger.assertMatch([]);
});

it(`No progress for ${maxReconnects} connection state changes and pending state but successfully ` +
"processing local op will not close the container", async () => {
patchRuntime(getMockPendingStateManager(true /* always has pending messages */));

for (let i = 0; i < maxReconnects; i++) {
containerRuntime.setConnectionState(!containerRuntime.connected);
containerRuntime.process({
type: "op",
clientId: "clientId",
sequenceNumber: 0,
contents: {
address: "address",
},
} as any as ISequencedDocumentMessage, true /* local */);
}

assert.equal(containerErrors.length, 0);
mockLogger.assertMatch([]);
});

it(`No progress for ${maxReconnects} connection state changes and pending state but successfully ` +
"processing remote op will close the container", async () => {
patchRuntime(getMockPendingStateManager(true /* always has pending messages */));

for (let i = 0; i < maxReconnects; i++) {
containerRuntime.setConnectionState(false);
containerRuntime.setConnectionState(true);
containerRuntime.process({
type: "op",
clientId: "clientId",
sequenceNumber: 0,
contents: {
address: "address",
},
} as any as ISequencedDocumentMessage, false /* local */);
}

const error = getFirstContainerError();
assert.ok(error instanceof GenericError);
assert.strictEqual(error.fluidErrorCode, "MaxReconnectsWithNoProgress");
assert.strictEqual(error.getTelemetryProperties().attempts, maxReconnects);
mockLogger.assertMatchAny([{
eventName: "ContainerRuntime:ReconnectsWithNoProgress",
attempts: 7,
}]);
});
});
});
});

0 comments on commit df53d66

Please sign in to comment.