Skip to content

Commit

Permalink
refactor: fix totalSampled formula + housekeeping per vlad and rene
Browse files Browse the repository at this point in the history
  • Loading branch information
derrandz committed Jan 27, 2023
1 parent a73e16c commit 54caa2e
Show file tree
Hide file tree
Showing 6 changed files with 24 additions and 25 deletions.
15 changes: 5 additions & 10 deletions das/checkpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,16 +49,11 @@ func (c checkpoint) String() string {
return str
}

// WorkersTotalSampled returns the total amount of sampled headers
func (c checkpoint) WorkersTotalSampled() int {
var total int
// totalSampled returns the total amount of sampled headers
func (c checkpoint) totalSampled() int {
totalInProgress := 0
for _, w := range c.Workers {
total += int(w.To - w.From)
totalInProgress += int(w.To-w.From) + 1
}
return total
}

// TotalSampled returns the total amount of sampled headers
func (c checkpoint) TotalSampled() int {
return int(c.SampleFrom) - 1 + c.WorkersTotalSampled() - len(c.Failed)
return int(c.SampleFrom) - totalInProgress - len(c.Failed)
}
2 changes: 1 addition & 1 deletion das/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func (sc *samplingCoordinator) run(ctx context.Context, cp checkpoint) {
sc.state.resumeFromCheckpoint(cp)

// the amount of sampled headers from the last checkpoint
totalSampledFromCheckpoint := int64(cp.TotalSampled())
totalSampledFromCheckpoint := int64(cp.totalSampled())
sc.metrics.recordTotalSampled(ctx, totalSampledFromCheckpoint)

// resume workers
Expand Down
15 changes: 12 additions & 3 deletions das/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ type metrics struct {
newHead syncint64.Counter
totalSampled asyncint64.Gauge

lastSampledTS int64
lastSampledTS int64
totalSampledInt int64
}

func (d *DASer) InitMetrics() error {
Expand Down Expand Up @@ -99,7 +100,11 @@ func (d *DASer) InitMetrics() error {

err = meter.RegisterCallback(
[]instrument.Asynchronous{
lastSampledTS, busyWorkers, networkHead, sampledChainHead,
lastSampledTS,
busyWorkers,
networkHead,
sampledChainHead,
totalSampled,
},
func(ctx context.Context) {
stats, err := d.sampler.stats(ctx)
Expand All @@ -114,6 +119,9 @@ func (d *DASer) InitMetrics() error {
if ts := atomic.LoadInt64(&d.sampler.metrics.lastSampledTS); ts != 0 {
lastSampledTS.Observe(ctx, ts)
}

totalSampledInt := atomic.LoadInt64(&d.sampler.metrics.totalSampledInt)
totalSampled.Observe(ctx, totalSampledInt)
},
)

Expand Down Expand Up @@ -164,5 +172,6 @@ func (m *metrics) recordTotalSampled(ctx context.Context, n int64) {
if m == nil {
return
}
m.totalSampled.Observe(ctx, n)
totalSampledInt := atomic.LoadInt64(&m.totalSampledInt)
atomic.StoreInt64(&m.totalSampledInt, totalSampledInt+n)
}
5 changes: 0 additions & 5 deletions nodebuilder/constants/constants.go

This file was deleted.

7 changes: 5 additions & 2 deletions nodebuilder/node/uptime.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,15 @@ import (
"context"
"time"

"github.com/celestiaorg/celestia-node/nodebuilder/constants"
"go.opentelemetry.io/otel/metric/global"
"go.opentelemetry.io/otel/metric/instrument"
"go.opentelemetry.io/otel/metric/instrument/asyncfloat64"
)

// OptlCollectPeriodInSeconds is the metrics collection
// period for the OpenTelemetry callbacks in seconds.
const OptlCollectPeriodInSeconds = 2

// UptimeMetrics is a struct that records
//
// 1. node start time: the timestamp when the node was started
Expand Down Expand Up @@ -69,7 +72,7 @@ func NewUptimeMetrics() (*UptimeMetrics, error) {
totalNodeRunTime,
},
func(ctx context.Context) {
m.totalNodeUpTimeTicks = m.totalNodeUpTimeTicks + constants.OptlCollectPeriodInSeconds
m.totalNodeUpTimeTicks = m.totalNodeUpTimeTicks + OptlCollectPeriodInSeconds
totalNodeRunTime.Observe(ctx, float64(m.totalNodeUpTimeTicks))
},
)
Expand Down
5 changes: 1 addition & 4 deletions nodebuilder/settings.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,6 @@ import (
"github.com/celestiaorg/celestia-node/state"
)

// The re-metering period for optl callbacks
const optlCollectPeriodInSeconds = 2

// WithNetwork specifies the Network to which the Node should connect to.
// WARNING: Use this option with caution and never run the Node with different networks over the
// same persisted Store.
Expand Down Expand Up @@ -95,7 +92,7 @@ func initializeMetrics(
exp,
),
controller.WithExporter(exp),
controller.WithCollectPeriod(optlCollectPeriodInSeconds*time.Second),
controller.WithCollectPeriod(node.OptlCollectPeriodInSeconds*time.Second),
controller.WithResource(resource.NewWithAttributes(
semconv.SchemaURL,
semconv.ServiceNameKey.String(fmt.Sprintf("Celestia-%s", nodeType.String())),
Expand Down

0 comments on commit 54caa2e

Please sign in to comment.