Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

test(node): OperatorPlugin inspection config options #2742

Merged
merged 3 commits into from
Sep 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions packages/node/src/plugins/operator/OperatorPlugin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,11 @@ export interface OperatorPluginConfig {
}
inspectRandomNode: {
intervalInMs: number
maxInspectionCount: number
}
reviewSuspectNode: {
maxInspectionCount: number
maxDelayBeforeFirstInspectionInMs: number
}
closeExpiredFlags: {
intervalInMs: number
Expand Down Expand Up @@ -201,6 +206,7 @@ export class OperatorPlugin extends Plugin<OperatorPluginConfig> {
streamPartAssignments,
streamrClient,
this.pluginConfig.heartbeatTimeoutInMs,
this.pluginConfig.inspectRandomNode.maxInspectionCount,
async (targetOperatorContractAddress) => {
return streamrClient.getOperator(targetOperatorContractAddress).fetchRedundancyFactor()
},
Expand Down Expand Up @@ -240,14 +246,14 @@ export class OperatorPlugin extends Plugin<OperatorPluginConfig> {
getRedundancyFactor: async (targetOperatorContractAddress) => {
return streamrClient.getOperator(targetOperatorContractAddress).fetchRedundancyFactor()
},
maxSleepTime: 5 * 60 * 1000,
maxDelayBeforeFirstInspectionInMs: this.pluginConfig.reviewSuspectNode.maxDelayBeforeFirstInspectionInMs,
heartbeatTimeoutInMs: this.pluginConfig.heartbeatTimeoutInMs,
votingPeriod: {
startTime: event.votingPeriodStartTimestamp,
endTime: event.votingPeriodEndTimestamp
},
inspectionIntervalInMs: 8 * 60 * 1000,
maxInspections: 10,
maxInspectionCount: this.pluginConfig.reviewSuspectNode.maxInspectionCount,
abortSignal: this.abortController.signal
})
}
Expand Down
26 changes: 26 additions & 0 deletions packages/node/src/plugins/operator/config.schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,32 @@
"description": "How often to run (in milliseconds)",
"minimum": 0,
"default": 900000
},
"maxInspectionCount": {
"type": "integer",
"description": "How many rounds of analysis are performed at most",
"minimum": 1,
"default": 10
}
}
},
"reviewSuspectNode": {
"type": "object",
"description": "Review suspect node settings",
"additionalProperties": false,
"default": {},
"properties": {
"maxInspectionCount": {
"type": "integer",
"description": "How many rounds of analysis are performed at most",
"minimum": 1,
"default": 10
},
"maxDelayBeforeFirstInspectionInMs": {
"type": "integer",
"description": "The maximum time (in milliseconds) to wait before the 1st inspection round",
"minimum": 0,
"default": 300000
}
}
},
Expand Down
28 changes: 14 additions & 14 deletions packages/node/src/plugins/operator/inspectOverTime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@ interface InspectOverTimeOpts {
streamrClient: StreamrClient
createOperatorFleetState: CreateOperatorFleetStateFn
getRedundancyFactor: (operatorContractAddress: EthereumAddress) => Promise<number | undefined>
sleepTimeInMsBeforeFirstInspection: number
delayBeforeFirstInspectionInMs: number
heartbeatTimeoutInMs: number
inspectionIntervalInMs: number
maxInspections: number
maxInspectionCount: number
waitUntilPassOrDone: boolean
abortSignal: AbortSignal
traceId: string
Expand All @@ -44,10 +44,10 @@ class InspectionOverTimeTask {
private readonly streamrClient: StreamrClient
private readonly createOperatorFleetState: CreateOperatorFleetStateFn
private readonly getRedundancyFactor: (operatorContractAddress: EthereumAddress) => Promise<number | undefined>
private readonly sleepTimeInMsBeforeFirstInspection: number
private readonly delayBeforeFirstInspectionInMs: number
private readonly heartbeatTimeoutInMs: number
private readonly inspectionIntervalInMs: number
private readonly maxInspections: number
private readonly maxInspectionCount: number
private readonly abortSignal: AbortSignal
private readonly findNodesForTargetGivenFleetStateFn: FindNodesForTargetGivenFleetStateFn
private readonly inspectTargetFn: InspectTargetFn
Expand All @@ -64,10 +64,10 @@ class InspectionOverTimeTask {
streamrClient,
createOperatorFleetState,
getRedundancyFactor,
sleepTimeInMsBeforeFirstInspection,
delayBeforeFirstInspectionInMs,
heartbeatTimeoutInMs,
inspectionIntervalInMs,
maxInspections,
maxInspectionCount,
abortSignal: userAbortSignal,
traceId,
findNodesForTargetGivenFleetStateFn = findNodesForTargetGivenFleetState,
Expand All @@ -77,10 +77,10 @@ class InspectionOverTimeTask {
this.streamrClient = streamrClient
this.createOperatorFleetState = createOperatorFleetState
this.getRedundancyFactor = getRedundancyFactor
this.sleepTimeInMsBeforeFirstInspection = sleepTimeInMsBeforeFirstInspection
this.delayBeforeFirstInspectionInMs = delayBeforeFirstInspectionInMs
this.heartbeatTimeoutInMs = heartbeatTimeoutInMs
this.inspectionIntervalInMs = inspectionIntervalInMs
this.maxInspections = maxInspections
this.maxInspectionCount = maxInspectionCount
this.abortSignal = composeAbortSignals(userAbortSignal, this.abortController.signal)
this.findNodesForTargetGivenFleetStateFn = findNodesForTargetGivenFleetStateFn
this.inspectTargetFn = inspectTargetFn
Expand Down Expand Up @@ -124,15 +124,15 @@ class InspectionOverTimeTask {
target: this.target,
heartbeatTimeoutInMs: this.heartbeatTimeoutInMs,
inspectionIntervalInMs: this.inspectionIntervalInMs,
maxInspections: this.maxInspections
maxInspectionCount: this.maxInspectionCount
})

await this.initializeNewOperatorFleetState()

this.logger.info('Sleep', { timeInMs: this.sleepTimeInMsBeforeFirstInspection })
await wait(this.sleepTimeInMsBeforeFirstInspection, this.abortSignal)
this.logger.debug('Sleep', { timeInMs: this.delayBeforeFirstInspectionInMs })
await wait(this.delayBeforeFirstInspectionInMs, this.abortSignal)

for (const attemptNo of range(1, this.maxInspections + 1)) {
for (const attemptNo of range(1, this.maxInspectionCount + 1)) {
const startTime = Date.now()
this.logger.info('Inspecting target', { attemptNo, target: this.target })

Expand Down Expand Up @@ -162,7 +162,7 @@ class InspectionOverTimeTask {
target: this.target
})

if (attemptNo !== this.maxInspections) {
if (attemptNo !== this.maxInspectionCount) {
// TODO: remove when NET-1169 landed;
// workaround subscribe bug in @streamr/sdk (sometimes messages don't come thru to heartbeat stream)
if (this.fleetState?.getNodeIds().length === 0) {
Expand All @@ -176,7 +176,7 @@ class InspectionOverTimeTask {
}

const sleepTime = Math.max(this.inspectionIntervalInMs - timeElapsedInMs, 0)
this.logger.info('Sleep', { timeInMs: sleepTime })
this.logger.debug('Sleep', { timeInMs: sleepTime })
await wait(sleepTime, this.abortSignal)
}
}
Expand Down
5 changes: 3 additions & 2 deletions packages/node/src/plugins/operator/inspectRandomNode.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ export async function inspectRandomNode(
assignments: StreamPartAssignments,
streamrClient: StreamrClient,
heartbeatTimeoutInMs: number,
maxInspectionCount: number,
getRedundancyFactor: (operatorContractAddress: EthereumAddress) => Promise<number | undefined>,
createOperatorFleetState: CreateOperatorFleetStateFn,
abortSignal: AbortSignal,
Expand All @@ -31,10 +32,10 @@ export async function inspectRandomNode(
streamrClient,
createOperatorFleetState,
getRedundancyFactor,
sleepTimeInMsBeforeFirstInspection: 0,
delayBeforeFirstInspectionInMs: 0,
heartbeatTimeoutInMs,
inspectionIntervalInMs: 8 * 60 * 1000,
maxInspections: 10,
maxInspectionCount,
waitUntilPassOrDone: true,
abortSignal,
traceId
Expand Down
20 changes: 10 additions & 10 deletions packages/node/src/plugins/operator/reviewSuspectNode.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,14 @@ export interface ReviewProcessOpts {
streamrClient: StreamrClient
createOperatorFleetState: CreateOperatorFleetStateFn
getRedundancyFactor: (operatorContractAddress: EthereumAddress) => Promise<number | undefined>
maxSleepTime: number
maxDelayBeforeFirstInspectionInMs: number
heartbeatTimeoutInMs: number
votingPeriod: {
startTime: number
endTime: number
}
inspectionIntervalInMs: number
maxInspections: number
maxInspectionCount: number
abortSignal: AbortSignal
}

Expand All @@ -33,19 +33,19 @@ export const reviewSuspectNode = async ({
streamrClient,
createOperatorFleetState,
getRedundancyFactor,
maxSleepTime,
maxDelayBeforeFirstInspectionInMs,
heartbeatTimeoutInMs,
votingPeriod,
inspectionIntervalInMs,
maxInspections,
maxInspectionCount,
abortSignal
}: ReviewProcessOpts): Promise<void> => {
if (Date.now() + maxSleepTime > votingPeriod.startTime) {
throw new Error('Max sleep time overlaps with voting period')
if (Date.now() + maxDelayBeforeFirstInspectionInMs > votingPeriod.startTime) {
throw new Error('Max delay time overlaps with voting period')
}
const streamId = await myOperator.getStreamId(sponsorshipAddress)
// random sleep time to make sure multiple instances of voters don't all inspect at the same time
const sleepTimeInMsBeforeFirstInspection = random(maxSleepTime)
// random wait time to make sure multiple instances of voters don't all inspect at the same time
const delayBeforeFirstInspectionInMs = random(maxDelayBeforeFirstInspectionInMs)
const consumeResults = inspectOverTime({
target: {
sponsorshipAddress: sponsorshipAddress,
Expand All @@ -55,10 +55,10 @@ export const reviewSuspectNode = async ({
streamrClient,
createOperatorFleetState,
getRedundancyFactor,
sleepTimeInMsBeforeFirstInspection,
delayBeforeFirstInspectionInMs,
heartbeatTimeoutInMs,
inspectionIntervalInMs,
maxInspections,
maxInspectionCount,
waitUntilPassOrDone: false,
abortSignal,
traceId: randomString(6)
Expand Down
Loading