From c93932c0779cbbe77d1e05d33430ff649c84d9cb Mon Sep 17 00:00:00 2001 From: Teo Gebhard Date: Mon, 9 Sep 2024 11:43:36 +0300 Subject: [PATCH 1/3] maxInspectionCount --- .../src/plugins/operator/OperatorPlugin.ts | 7 ++++++- .../src/plugins/operator/config.schema.json | 20 +++++++++++++++++++ .../src/plugins/operator/inspectOverTime.ts | 14 ++++++------- .../src/plugins/operator/inspectRandomNode.ts | 3 ++- .../src/plugins/operator/reviewSuspectNode.ts | 6 +++--- 5 files changed, 38 insertions(+), 12 deletions(-) diff --git a/packages/node/src/plugins/operator/OperatorPlugin.ts b/packages/node/src/plugins/operator/OperatorPlugin.ts index 7c4ae64f8c..771e90f132 100644 --- a/packages/node/src/plugins/operator/OperatorPlugin.ts +++ b/packages/node/src/plugins/operator/OperatorPlugin.ts @@ -57,6 +57,10 @@ export interface OperatorPluginConfig { } inspectRandomNode: { intervalInMs: number + maxInspectionCount: number + } + reviewSuspectNode: { + maxInspectionCount: number } closeExpiredFlags: { intervalInMs: number @@ -201,6 +205,7 @@ export class OperatorPlugin extends Plugin { streamPartAssignments, streamrClient, this.pluginConfig.heartbeatTimeoutInMs, + this.pluginConfig.inspectRandomNode.maxInspectionCount, async (targetOperatorContractAddress) => { return streamrClient.getOperator(targetOperatorContractAddress).fetchRedundancyFactor() }, @@ -247,7 +252,7 @@ export class OperatorPlugin extends Plugin { endTime: event.votingPeriodEndTimestamp }, inspectionIntervalInMs: 8 * 60 * 1000, - maxInspections: 10, + maxInspectionCount: this.pluginConfig.reviewSuspectNode.maxInspectionCount, abortSignal: this.abortController.signal }) } diff --git a/packages/node/src/plugins/operator/config.schema.json b/packages/node/src/plugins/operator/config.schema.json index d1e4ffc33b..7ac42582bf 100644 --- a/packages/node/src/plugins/operator/config.schema.json +++ b/packages/node/src/plugins/operator/config.schema.json @@ -125,6 +125,26 @@ "description": "How often to run (in milliseconds)", "minimum": 0, "default": 900000 + }, + "maxInspectionCount": { + "type": "integer", + "description": "How many rounds of analysis are performed at most", + "minimum": 1, + "default": 10 + } + } + }, + "reviewSuspectNode": { + "type": "object", + "description": "Review suspect node settings", + "additionalProperties": false, + "default": {}, + "properties": { + "maxInspectionCount": { + "type": "integer", + "description": "How many rounds of analysis are performed at most", + "minimum": 1, + "default": 10 } } }, diff --git a/packages/node/src/plugins/operator/inspectOverTime.ts b/packages/node/src/plugins/operator/inspectOverTime.ts index 54fd77f08c..17d530715f 100644 --- a/packages/node/src/plugins/operator/inspectOverTime.ts +++ b/packages/node/src/plugins/operator/inspectOverTime.ts @@ -19,7 +19,7 @@ interface InspectOverTimeOpts { sleepTimeInMsBeforeFirstInspection: number heartbeatTimeoutInMs: number inspectionIntervalInMs: number - maxInspections: number + maxInspectionCount: number waitUntilPassOrDone: boolean abortSignal: AbortSignal traceId: string @@ -47,7 +47,7 @@ class InspectionOverTimeTask { private readonly sleepTimeInMsBeforeFirstInspection: number private readonly heartbeatTimeoutInMs: number private readonly inspectionIntervalInMs: number - private readonly maxInspections: number + private readonly maxInspectionCount: number private readonly abortSignal: AbortSignal private readonly findNodesForTargetGivenFleetStateFn: FindNodesForTargetGivenFleetStateFn private readonly inspectTargetFn: InspectTargetFn @@ -67,7 +67,7 @@ class InspectionOverTimeTask { sleepTimeInMsBeforeFirstInspection, heartbeatTimeoutInMs, inspectionIntervalInMs, - maxInspections, + maxInspectionCount, abortSignal: userAbortSignal, traceId, findNodesForTargetGivenFleetStateFn = findNodesForTargetGivenFleetState, @@ -80,7 +80,7 @@ class InspectionOverTimeTask { this.sleepTimeInMsBeforeFirstInspection = sleepTimeInMsBeforeFirstInspection this.heartbeatTimeoutInMs = heartbeatTimeoutInMs this.inspectionIntervalInMs = inspectionIntervalInMs - this.maxInspections = maxInspections + this.maxInspectionCount = maxInspectionCount this.abortSignal = composeAbortSignals(userAbortSignal, this.abortController.signal) this.findNodesForTargetGivenFleetStateFn = findNodesForTargetGivenFleetStateFn this.inspectTargetFn = inspectTargetFn @@ -124,7 +124,7 @@ class InspectionOverTimeTask { target: this.target, heartbeatTimeoutInMs: this.heartbeatTimeoutInMs, inspectionIntervalInMs: this.inspectionIntervalInMs, - maxInspections: this.maxInspections + maxInspectionCount: this.maxInspectionCount }) await this.initializeNewOperatorFleetState() @@ -132,7 +132,7 @@ class InspectionOverTimeTask { this.logger.info('Sleep', { timeInMs: this.sleepTimeInMsBeforeFirstInspection }) await wait(this.sleepTimeInMsBeforeFirstInspection, this.abortSignal) - for (const attemptNo of range(1, this.maxInspections + 1)) { + for (const attemptNo of range(1, this.maxInspectionCount + 1)) { const startTime = Date.now() this.logger.info('Inspecting target', { attemptNo, target: this.target }) @@ -162,7 +162,7 @@ class InspectionOverTimeTask { target: this.target }) - if (attemptNo !== this.maxInspections) { + if (attemptNo !== this.maxInspectionCount) { // TODO: remove when NET-1169 landed; // workaround subscribe bug in @streamr/sdk (sometimes messages don't come thru to heartbeat stream) if (this.fleetState?.getNodeIds().length === 0) { diff --git a/packages/node/src/plugins/operator/inspectRandomNode.ts b/packages/node/src/plugins/operator/inspectRandomNode.ts index 9cecffb828..729c7db169 100644 --- a/packages/node/src/plugins/operator/inspectRandomNode.ts +++ b/packages/node/src/plugins/operator/inspectRandomNode.ts @@ -11,6 +11,7 @@ export async function inspectRandomNode( assignments: StreamPartAssignments, streamrClient: StreamrClient, heartbeatTimeoutInMs: number, + maxInspectionCount: number, getRedundancyFactor: (operatorContractAddress: EthereumAddress) => Promise, createOperatorFleetState: CreateOperatorFleetStateFn, abortSignal: AbortSignal, @@ -34,7 +35,7 @@ export async function inspectRandomNode( sleepTimeInMsBeforeFirstInspection: 0, heartbeatTimeoutInMs, inspectionIntervalInMs: 8 * 60 * 1000, - maxInspections: 10, + maxInspectionCount, waitUntilPassOrDone: true, abortSignal, traceId diff --git a/packages/node/src/plugins/operator/reviewSuspectNode.ts b/packages/node/src/plugins/operator/reviewSuspectNode.ts index 8da510b80f..46a714acb4 100644 --- a/packages/node/src/plugins/operator/reviewSuspectNode.ts +++ b/packages/node/src/plugins/operator/reviewSuspectNode.ts @@ -21,7 +21,7 @@ export interface ReviewProcessOpts { endTime: number } inspectionIntervalInMs: number - maxInspections: number + maxInspectionCount: number abortSignal: AbortSignal } @@ -37,7 +37,7 @@ export const reviewSuspectNode = async ({ heartbeatTimeoutInMs, votingPeriod, inspectionIntervalInMs, - maxInspections, + maxInspectionCount, abortSignal }: ReviewProcessOpts): Promise => { if (Date.now() + maxSleepTime > votingPeriod.startTime) { @@ -58,7 +58,7 @@ export const reviewSuspectNode = async ({ sleepTimeInMsBeforeFirstInspection, heartbeatTimeoutInMs, inspectionIntervalInMs, - maxInspections, + maxInspectionCount, waitUntilPassOrDone: false, abortSignal, traceId: randomString(6) From 5b5c207d3423ae4760512f09cd770294bc3411a8 Mon Sep 17 00:00:00 2001 From: Teo Gebhard Date: Mon, 9 Sep 2024 11:43:36 +0300 Subject: [PATCH 2/3] maxDelayBeforeFirstInspectionInMs --- .../node/src/plugins/operator/OperatorPlugin.ts | 3 ++- .../node/src/plugins/operator/config.schema.json | 6 ++++++ .../node/src/plugins/operator/inspectOverTime.ts | 12 ++++++------ .../node/src/plugins/operator/inspectRandomNode.ts | 2 +- .../node/src/plugins/operator/reviewSuspectNode.ts | 14 +++++++------- 5 files changed, 22 insertions(+), 15 deletions(-) diff --git a/packages/node/src/plugins/operator/OperatorPlugin.ts b/packages/node/src/plugins/operator/OperatorPlugin.ts index 771e90f132..31ce020f4c 100644 --- a/packages/node/src/plugins/operator/OperatorPlugin.ts +++ b/packages/node/src/plugins/operator/OperatorPlugin.ts @@ -61,6 +61,7 @@ export interface OperatorPluginConfig { } reviewSuspectNode: { maxInspectionCount: number + maxDelayBeforeFirstInspectionInMs: number } closeExpiredFlags: { intervalInMs: number @@ -245,7 +246,7 @@ export class OperatorPlugin extends Plugin { getRedundancyFactor: async (targetOperatorContractAddress) => { return streamrClient.getOperator(targetOperatorContractAddress).fetchRedundancyFactor() }, - maxSleepTime: 5 * 60 * 1000, + maxDelayBeforeFirstInspectionInMs: this.pluginConfig.reviewSuspectNode.maxDelayBeforeFirstInspectionInMs, heartbeatTimeoutInMs: this.pluginConfig.heartbeatTimeoutInMs, votingPeriod: { startTime: event.votingPeriodStartTimestamp, diff --git a/packages/node/src/plugins/operator/config.schema.json b/packages/node/src/plugins/operator/config.schema.json index 7ac42582bf..6ab3eefcce 100644 --- a/packages/node/src/plugins/operator/config.schema.json +++ b/packages/node/src/plugins/operator/config.schema.json @@ -145,6 +145,12 @@ "description": "How many rounds of analysis are performed at most", "minimum": 1, "default": 10 + }, + "maxDelayBeforeFirstInspectionInMs": { + "type": "integer", + "description": "The maximum time (in milliseconds) to wait before the 1st inspection round", + "minimum": 0, + "default": 300000 } } }, diff --git a/packages/node/src/plugins/operator/inspectOverTime.ts b/packages/node/src/plugins/operator/inspectOverTime.ts index 17d530715f..6aee5f6793 100644 --- a/packages/node/src/plugins/operator/inspectOverTime.ts +++ b/packages/node/src/plugins/operator/inspectOverTime.ts @@ -16,7 +16,7 @@ interface InspectOverTimeOpts { streamrClient: StreamrClient createOperatorFleetState: CreateOperatorFleetStateFn getRedundancyFactor: (operatorContractAddress: EthereumAddress) => Promise - sleepTimeInMsBeforeFirstInspection: number + delayBeforeFirstInspectionInMs: number heartbeatTimeoutInMs: number inspectionIntervalInMs: number maxInspectionCount: number @@ -44,7 +44,7 @@ class InspectionOverTimeTask { private readonly streamrClient: StreamrClient private readonly createOperatorFleetState: CreateOperatorFleetStateFn private readonly getRedundancyFactor: (operatorContractAddress: EthereumAddress) => Promise - private readonly sleepTimeInMsBeforeFirstInspection: number + private readonly delayBeforeFirstInspectionInMs: number private readonly heartbeatTimeoutInMs: number private readonly inspectionIntervalInMs: number private readonly maxInspectionCount: number @@ -64,7 +64,7 @@ class InspectionOverTimeTask { streamrClient, createOperatorFleetState, getRedundancyFactor, - sleepTimeInMsBeforeFirstInspection, + delayBeforeFirstInspectionInMs, heartbeatTimeoutInMs, inspectionIntervalInMs, maxInspectionCount, @@ -77,7 +77,7 @@ class InspectionOverTimeTask { this.streamrClient = streamrClient this.createOperatorFleetState = createOperatorFleetState this.getRedundancyFactor = getRedundancyFactor - this.sleepTimeInMsBeforeFirstInspection = sleepTimeInMsBeforeFirstInspection + this.delayBeforeFirstInspectionInMs = delayBeforeFirstInspectionInMs this.heartbeatTimeoutInMs = heartbeatTimeoutInMs this.inspectionIntervalInMs = inspectionIntervalInMs this.maxInspectionCount = maxInspectionCount @@ -129,8 +129,8 @@ class InspectionOverTimeTask { await this.initializeNewOperatorFleetState() - this.logger.info('Sleep', { timeInMs: this.sleepTimeInMsBeforeFirstInspection }) - await wait(this.sleepTimeInMsBeforeFirstInspection, this.abortSignal) + this.logger.info('Sleep', { timeInMs: this.delayBeforeFirstInspectionInMs }) + await wait(this.delayBeforeFirstInspectionInMs, this.abortSignal) for (const attemptNo of range(1, this.maxInspectionCount + 1)) { const startTime = Date.now() diff --git a/packages/node/src/plugins/operator/inspectRandomNode.ts b/packages/node/src/plugins/operator/inspectRandomNode.ts index 729c7db169..9ce7096ccb 100644 --- a/packages/node/src/plugins/operator/inspectRandomNode.ts +++ b/packages/node/src/plugins/operator/inspectRandomNode.ts @@ -32,7 +32,7 @@ export async function inspectRandomNode( streamrClient, createOperatorFleetState, getRedundancyFactor, - sleepTimeInMsBeforeFirstInspection: 0, + delayBeforeFirstInspectionInMs: 0, heartbeatTimeoutInMs, inspectionIntervalInMs: 8 * 60 * 1000, maxInspectionCount, diff --git a/packages/node/src/plugins/operator/reviewSuspectNode.ts b/packages/node/src/plugins/operator/reviewSuspectNode.ts index 46a714acb4..4ec8857e6f 100644 --- a/packages/node/src/plugins/operator/reviewSuspectNode.ts +++ b/packages/node/src/plugins/operator/reviewSuspectNode.ts @@ -14,7 +14,7 @@ export interface ReviewProcessOpts { streamrClient: StreamrClient createOperatorFleetState: CreateOperatorFleetStateFn getRedundancyFactor: (operatorContractAddress: EthereumAddress) => Promise - maxSleepTime: number + maxDelayBeforeFirstInspectionInMs: number heartbeatTimeoutInMs: number votingPeriod: { startTime: number @@ -33,19 +33,19 @@ export const reviewSuspectNode = async ({ streamrClient, createOperatorFleetState, getRedundancyFactor, - maxSleepTime, + maxDelayBeforeFirstInspectionInMs, heartbeatTimeoutInMs, votingPeriod, inspectionIntervalInMs, maxInspectionCount, abortSignal }: ReviewProcessOpts): Promise => { - if (Date.now() + maxSleepTime > votingPeriod.startTime) { - throw new Error('Max sleep time overlaps with voting period') + if (Date.now() + maxDelayBeforeFirstInspectionInMs > votingPeriod.startTime) { + throw new Error('Max delay time overlaps with voting period') } const streamId = await myOperator.getStreamId(sponsorshipAddress) - // random sleep time to make sure multiple instances of voters don't all inspect at the same time - const sleepTimeInMsBeforeFirstInspection = random(maxSleepTime) + // random wait time to make sure multiple instances of voters don't all inspect at the same time + const delayBeforeFirstInspectionInMs = random(maxDelayBeforeFirstInspectionInMs) const consumeResults = inspectOverTime({ target: { sponsorshipAddress: sponsorshipAddress, @@ -55,7 +55,7 @@ export const reviewSuspectNode = async ({ streamrClient, createOperatorFleetState, getRedundancyFactor, - sleepTimeInMsBeforeFirstInspection, + delayBeforeFirstInspectionInMs, heartbeatTimeoutInMs, inspectionIntervalInMs, maxInspectionCount, From c5cf77c10e00c99c00520bc81798b9424648933e Mon Sep 17 00:00:00 2001 From: Teo Gebhard Date: Mon, 9 Sep 2024 11:43:36 +0300 Subject: [PATCH 3/3] change log level --- packages/node/src/plugins/operator/inspectOverTime.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/packages/node/src/plugins/operator/inspectOverTime.ts b/packages/node/src/plugins/operator/inspectOverTime.ts index 6aee5f6793..e88b7c9557 100644 --- a/packages/node/src/plugins/operator/inspectOverTime.ts +++ b/packages/node/src/plugins/operator/inspectOverTime.ts @@ -129,7 +129,7 @@ class InspectionOverTimeTask { await this.initializeNewOperatorFleetState() - this.logger.info('Sleep', { timeInMs: this.delayBeforeFirstInspectionInMs }) + this.logger.debug('Sleep', { timeInMs: this.delayBeforeFirstInspectionInMs }) await wait(this.delayBeforeFirstInspectionInMs, this.abortSignal) for (const attemptNo of range(1, this.maxInspectionCount + 1)) { @@ -176,7 +176,7 @@ class InspectionOverTimeTask { } const sleepTime = Math.max(this.inspectionIntervalInMs - timeElapsedInMs, 0) - this.logger.info('Sleep', { timeInMs: sleepTime }) + this.logger.debug('Sleep', { timeInMs: sleepTime }) await wait(sleepTime, this.abortSignal) } }