diff --git a/das/checkpoint.go b/das/checkpoint.go index f712aa1ad8..4a3d4f3989 100644 --- a/das/checkpoint.go +++ b/das/checkpoint.go @@ -49,16 +49,11 @@ func (c checkpoint) String() string { return str } -// WorkersTotalSampled returns the total amount of sampled headers -func (c checkpoint) WorkersTotalSampled() int { - var total int +// totalSampled returns the total amount of sampled headers +func (c checkpoint) totalSampled() int { + totalInProgress := 0 for _, w := range c.Workers { - total += int(w.To - w.From) + totalInProgress += int(w.To-w.From) + 1 } - return total -} - -// TotalSampled returns the total amount of sampled headers -func (c checkpoint) TotalSampled() int { - return int(c.SampleFrom) - 1 + c.WorkersTotalSampled() - len(c.Failed) + return int(c.SampleFrom) - totalInProgress - len(c.Failed) } diff --git a/das/coordinator.go b/das/coordinator.go index 21b71f208a..d623b0e86e 100644 --- a/das/coordinator.go +++ b/das/coordinator.go @@ -57,7 +57,7 @@ func (sc *samplingCoordinator) run(ctx context.Context, cp checkpoint) { sc.state.resumeFromCheckpoint(cp) // the amount of sampled headers from the last checkpoint - totalSampledFromCheckpoint := int64(cp.TotalSampled()) + totalSampledFromCheckpoint := int64(cp.totalSampled()) sc.metrics.recordTotalSampled(ctx, totalSampledFromCheckpoint) // resume workers diff --git a/das/metrics.go b/das/metrics.go index 092fecd51f..9115718059 100644 --- a/das/metrics.go +++ b/das/metrics.go @@ -27,7 +27,8 @@ type metrics struct { newHead syncint64.Counter totalSampled asyncint64.Gauge - lastSampledTS int64 + lastSampledTS int64 + totalSampledInt int64 } func (d *DASer) InitMetrics() error { @@ -99,7 +100,11 @@ func (d *DASer) InitMetrics() error { err = meter.RegisterCallback( []instrument.Asynchronous{ - lastSampledTS, busyWorkers, networkHead, sampledChainHead, + lastSampledTS, + busyWorkers, + networkHead, + sampledChainHead, + totalSampled, }, func(ctx context.Context) { stats, err := d.sampler.stats(ctx) @@ -114,6 +119,9 @@ func (d *DASer) InitMetrics() error { if ts := atomic.LoadInt64(&d.sampler.metrics.lastSampledTS); ts != 0 { lastSampledTS.Observe(ctx, ts) } + + totalSampledInt := atomic.LoadInt64(&d.sampler.metrics.totalSampledInt) + totalSampled.Observe(ctx, totalSampledInt) }, ) @@ -164,5 +172,6 @@ func (m *metrics) recordTotalSampled(ctx context.Context, n int64) { if m == nil { return } - m.totalSampled.Observe(ctx, n) + totalSampledInt := atomic.LoadInt64(&m.totalSampledInt) + atomic.StoreInt64(&m.totalSampledInt, totalSampledInt+n) } diff --git a/nodebuilder/constants/constants.go b/nodebuilder/constants/constants.go deleted file mode 100644 index e62ff35314..0000000000 --- a/nodebuilder/constants/constants.go +++ /dev/null @@ -1,5 +0,0 @@ -package constants - -// OptlCollectPeriodInSeconds is the metrics collection -// period for the OpenTelemetry callbacks in seconds. -const OptlCollectPeriodInSeconds = 2 diff --git a/nodebuilder/node/uptime.go b/nodebuilder/node/uptime.go index 580fc85e40..4782d4bdb7 100644 --- a/nodebuilder/node/uptime.go +++ b/nodebuilder/node/uptime.go @@ -4,12 +4,15 @@ import ( "context" "time" - "github.com/celestiaorg/celestia-node/nodebuilder/constants" "go.opentelemetry.io/otel/metric/global" "go.opentelemetry.io/otel/metric/instrument" "go.opentelemetry.io/otel/metric/instrument/asyncfloat64" ) +// OptlCollectPeriodInSeconds is the metrics collection +// period for the OpenTelemetry callbacks in seconds. +const OptlCollectPeriodInSeconds = 2 + // UptimeMetrics is a struct that records // // 1. node start time: the timestamp when the node was started @@ -69,7 +72,7 @@ func NewUptimeMetrics() (*UptimeMetrics, error) { totalNodeRunTime, }, func(ctx context.Context) { - m.totalNodeUpTimeTicks = m.totalNodeUpTimeTicks + constants.OptlCollectPeriodInSeconds + m.totalNodeUpTimeTicks = m.totalNodeUpTimeTicks + OptlCollectPeriodInSeconds totalNodeRunTime.Observe(ctx, float64(m.totalNodeUpTimeTicks)) }, ) diff --git a/nodebuilder/settings.go b/nodebuilder/settings.go index 9a201f167a..a0f7d89233 100644 --- a/nodebuilder/settings.go +++ b/nodebuilder/settings.go @@ -23,9 +23,6 @@ import ( "github.com/celestiaorg/celestia-node/state" ) -// The re-metering period for optl callbacks -const optlCollectPeriodInSeconds = 2 - // WithNetwork specifies the Network to which the Node should connect to. // WARNING: Use this option with caution and never run the Node with different networks over the // same persisted Store. @@ -95,7 +92,7 @@ func initializeMetrics( exp, ), controller.WithExporter(exp), - controller.WithCollectPeriod(optlCollectPeriodInSeconds*time.Second), + controller.WithCollectPeriod(node.OptlCollectPeriodInSeconds*time.Second), controller.WithResource(resource.NewWithAttributes( semconv.SchemaURL, semconv.ServiceNameKey.String(fmt.Sprintf("Celestia-%s", nodeType.String())),