Skip to content

Commit

Permalink
Merge pull request #22 from sdevkc/main
Browse files Browse the repository at this point in the history
Check progress events for inactivity
  • Loading branch information
DenisCarriere authored Feb 12, 2024
2 parents ac116cf + 5a2d7ca commit b9444ff
Showing 1 changed file with 27 additions and 2 deletions.
29 changes: 27 additions & 2 deletions src/inactivitySeconds.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import type { BlockEmitter } from "@substreams/node";
import { setTimeout } from "timers/promises";
import { logger } from "./logger.js";
import { substreams_sink_progress_message } from "./prometheus.js";

Check failure on line 4 in src/inactivitySeconds.ts

View workflow job for this annotation

GitHub Actions / build

Module '"./prometheus.js"' has no exported member 'substreams_sink_progress_message'.

Check failure on line 4 in src/inactivitySeconds.ts

View workflow job for this annotation

GitHub Actions / build

Module '"./prometheus.js"' has no exported member 'substreams_sink_progress_message'.

const CHECK_INACTIVITY_INTERVAL = 1000;

Expand All @@ -18,13 +19,37 @@ export function onInactivitySeconds(emitter: BlockEmitter, inactivitySeconds: nu
checkInactivity();
}

// Check for inactivity after starting
emitter.on("anyMessage", (message, cursor, clock) => {
// Check clock events for inactivity after starting
emitter.on("clock", (clock) => {
lastUpdate = now();
if (hasStopBlock && clock.number >= emitter.request.stopBlockNum - 1n) {
isFinished = true;
}
});

emitter.on("close", error => {
if ( error ) {
console.error(error);
process.exit(1); // force quit
}
lastUpdate = now();
isFinished = true;
});

emitter.on("fatalError", error => {
console.error(error);
process.exit(1); // force quit
});

// Check progress events for inactivity after starting
emitter.on("progress", (progress) => {
const totalBytesRead = Number(progress.processedBytes?.totalBytesRead ?? 0);
if (totalBytesRead > 0) {
lastUpdate = now();
substreams_sink_progress_message?.inc(totalBytesRead);
}
});

checkInactivity();
}

Expand Down

0 comments on commit b9444ff

Please sign in to comment.