From 9b984425bf12b416415e77ef2f3c10f6c483a593 Mon Sep 17 00:00:00 2001 From: "Jonathan A. Sternberg" Date: Fri, 8 Dec 2023 09:46:21 -0600 Subject: [PATCH] cache: capture metrics related to cache records and pruning Signed-off-by: Jonathan A. Sternberg --- cache/manager.go | 32 ++++++++-- cache/metrics.go | 85 +++++++++++++++++++++++++ cmd/buildkitd/main.go | 13 ++-- cmd/buildkitd/main_containerd_worker.go | 1 + cmd/buildkitd/main_oci_worker.go | 1 + util/bklog/log.go | 6 ++ worker/base/worker.go | 3 + 7 files changed, 130 insertions(+), 11 deletions(-) create mode 100644 cache/metrics.go diff --git a/cache/manager.go b/cache/manager.go index c09ada3b9919..aee39fd36109 100644 --- a/cache/manager.go +++ b/cache/manager.go @@ -28,6 +28,8 @@ import ( imagespecidentity "github.com/opencontainers/image-spec/identity" ocispecs "github.com/opencontainers/image-spec/specs-go/v1" "github.com/pkg/errors" + "go.opentelemetry.io/otel/metric" + "go.opentelemetry.io/otel/metric/noop" "golang.org/x/sync/errgroup" ) @@ -49,6 +51,7 @@ type ManagerOpt struct { Differ diff.Comparer MetadataStore *metadata.Store MountPoolRoot string + MeterProvider metric.MeterProvider } type Accessor interface { @@ -97,6 +100,7 @@ type cacheManager struct { muPrune sync.Mutex // make sure parallel prune is not allowed so there will not be inconsistent results unlazyG flightcontrol.Group[struct{}] + metrics *metrics } func NewManager(opt ManagerOpt) (Manager, error) { @@ -124,6 +128,12 @@ func NewManager(opt ManagerOpt) (Manager, error) { // cm.scheduleGC(5 * time.Minute) + mp := opt.MeterProvider + if mp == nil { + mp = noop.NewMeterProvider() + } + cm.metrics = newMetrics(cm, mp) + return cm, nil } @@ -339,6 +349,7 @@ func (cm *cacheManager) IdentityMapping() *idtools.IdentityMapping { // method should be called after Close. 
func (cm *cacheManager) Close() error { // TODO: allocate internal context and cancel it here + _ = cm.metrics.Close() return cm.MetadataStore.Close() } @@ -1000,17 +1011,24 @@ func (cm *cacheManager) createDiffRef(ctx context.Context, parents parentRefs, d } func (cm *cacheManager) Prune(ctx context.Context, ch chan client.UsageInfo, opts ...client.PruneInfo) error { - cm.muPrune.Lock() + if err := func() error { + record := cm.metrics.MeasurePrune() + defer record(ctx) - for _, opt := range opts { - if err := cm.pruneOnce(ctx, ch, opt); err != nil { - cm.muPrune.Unlock() - return err + cm.muPrune.Lock() + // The deferred Unlock releases muPrune on every return path; a manual Unlock here would unlock twice and crash. + defer cm.muPrune.Unlock() + + for _, opt := range opts { + if err := cm.pruneOnce(ctx, ch, opt); err != nil { + return err + } } + return nil + }(); err != nil { + return err } - cm.muPrune.Unlock() - if cm.GarbageCollect != nil { if _, err := cm.GarbageCollect(ctx); err != nil { return err diff --git a/cache/metrics.go b/cache/metrics.go new file mode 100644 index 000000000000..234a3ed77df2 --- /dev/null +++ b/cache/metrics.go @@ -0,0 +1,85 @@ +package cache + +import ( + "context" + "time" + + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/metric" +) + +const ( + instrumentationName = "github.com/moby/buildkit/cache" + metricCacheRecords = "cache.records.count" + metricCachePruneDuration = "cache.prune.duration" +) + +type metrics struct { + CacheRecords metric.Int64ObservableGauge + CachePruneDuration metric.Int64Histogram + meter metric.Meter + regs []metric.Registration +} + +func newMetrics(cm *cacheManager, mp metric.MeterProvider) *metrics { + m := &metrics{} + + var err error + m.meter = mp.Meter(instrumentationName) + + m.CacheRecords, err = m.meter.Int64ObservableGauge(metricCacheRecords, + metric.WithDescription("Number of cache records."), + ) + if err != nil { + otel.Handle(err) + } + + m.CachePruneDuration, err = m.meter.Int64Histogram(metricCachePruneDuration, + metric.WithDescription("Measures the duration of cache 
prune operations."), + metric.WithUnit("ms"), + ) + if err != nil { + otel.Handle(err) + } + + reg, err := m.meter.RegisterCallback(cm.collectMetrics, m.CacheRecords) + if err != nil { + otel.Handle(err) + } + m.regs = append(m.regs, reg) + + return m +} + +func (m *metrics) MeasurePrune() (record func(ctx context.Context)) { + start := time.Now() + return func(ctx context.Context) { + dur := int64(time.Since(start) / time.Millisecond) + m.CachePruneDuration.Record(ctx, dur) + } +} + +func (m *metrics) Close() error { + for _, reg := range m.regs { + _ = reg.Unregister() + } + return nil +} + +type cacheStats struct { + NumRecords int64 +} + +func (cm *cacheManager) readStats() (stats cacheStats) { + cm.mu.Lock() + defer cm.mu.Unlock() + + stats.NumRecords = int64(len(cm.records)) + return stats +} + +func (cm *cacheManager) collectMetrics(ctx context.Context, o metric.Observer) error { + stats := cm.readStats() + o.ObserveInt64(cm.metrics.CacheRecords, stats.NumRecords) + return nil +} diff --git a/cmd/buildkitd/main.go b/cmd/buildkitd/main.go index d7d637d5a16b..6fafa45824bd 100644 --- a/cmd/buildkitd/main.go +++ b/cmd/buildkitd/main.go @@ -62,6 +62,7 @@ import ( "github.com/urfave/cli" "go.etcd.io/bbolt" "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" + "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/metric" "go.opentelemetry.io/otel/propagation" sdktrace "go.opentelemetry.io/otel/sdk/trace" @@ -86,6 +87,9 @@ func init() { // enable in memory recording for buildkitd traces detect.Recorder = detect.NewTraceRecorder() + + // register alternative handler for otel + otel.SetErrorHandler(bklog.OTELErrorHandler{}) } var propagators = propagation.NewCompositeTextMapPropagator(propagation.TraceContext{}, propagation.Baggage{}) @@ -94,6 +98,7 @@ type workerInitializerOpt struct { config *config.Config sessionManager *session.Manager traceSocket string + meterProvider metric.MeterProvider } type workerInitializer struct { @@ -316,7 
+321,7 @@ func main() { os.RemoveAll(lockPath) }() - controller, err := newController(c, &cfg) + controller, err := newController(c, &cfg, mp) if err != nil { return err } @@ -711,7 +716,7 @@ func serverCredentials(cfg config.TLSConfig) (*tls.Config, error) { return tlsConf, nil } -func newController(c *cli.Context, cfg *config.Config) (*control.Controller, error) { +func newController(c *cli.Context, cfg *config.Config, mp metric.MeterProvider) (*control.Controller, error) { sessionManager, err := session.NewManager() if err != nil { return nil, err @@ -734,6 +739,7 @@ func newController(c *cli.Context, cfg *config.Config) (*control.Controller, err config: cfg, sessionManager: sessionManager, traceSocket: traceSocket, + meterProvider: mp, }) if err != nil { return nil, err @@ -922,8 +928,7 @@ type traceCollector struct { } func (t *traceCollector) Export(ctx context.Context, req *tracev1.ExportTraceServiceRequest) (*tracev1.ExportTraceServiceResponse, error) { - err := t.exporter.ExportSpans(ctx, transform.Spans(req.GetResourceSpans())) - if err != nil { + if err := t.exporter.ExportSpans(ctx, transform.Spans(req.GetResourceSpans())); err != nil { return nil, err } return &tracev1.ExportTraceServiceResponse{}, nil diff --git a/cmd/buildkitd/main_containerd_worker.go b/cmd/buildkitd/main_containerd_worker.go index f77ee95d226b..48b21c1bac58 100644 --- a/cmd/buildkitd/main_containerd_worker.go +++ b/cmd/buildkitd/main_containerd_worker.go @@ -333,6 +333,7 @@ func containerdWorkerInitializer(c *cli.Context, common workerInitializerOpt) ([ opt.GCPolicy = getGCPolicy(cfg.GCConfig, common.config.Root) opt.BuildkitVersion = getBuildkitVersion() opt.RegistryHosts = resolverFunc(common.config) + opt.MeterProvider = common.meterProvider if platformsStr := cfg.Platforms; len(platformsStr) != 0 { platforms, err := parsePlatforms(platformsStr) diff --git a/cmd/buildkitd/main_oci_worker.go b/cmd/buildkitd/main_oci_worker.go index 72a8309ec7df..e51f1e0d51b7 100644 --- 
a/cmd/buildkitd/main_oci_worker.go +++ b/cmd/buildkitd/main_oci_worker.go @@ -316,6 +316,7 @@ func ociWorkerInitializer(c *cli.Context, common workerInitializerOpt) ([]worker opt.GCPolicy = getGCPolicy(cfg.GCConfig, common.config.Root) opt.BuildkitVersion = getBuildkitVersion() opt.RegistryHosts = hosts + opt.MeterProvider = common.meterProvider if platformsStr := cfg.Platforms; len(platformsStr) != 0 { platforms, err := parsePlatforms(platformsStr) diff --git a/util/bklog/log.go b/util/bklog/log.go index cf6630de19af..e1ddd1587c9d 100644 --- a/util/bklog/log.go +++ b/util/bklog/log.go @@ -73,3 +73,9 @@ func TraceLevelOnlyStack() string { } return "" } + +type OTELErrorHandler struct{} + +func (o OTELErrorHandler) Handle(err error) { + G(context.Background()).Error(err) +} diff --git a/worker/base/worker.go b/worker/base/worker.go index dcea020cb1f8..43fc31d44a08 100644 --- a/worker/base/worker.go +++ b/worker/base/worker.go @@ -50,6 +50,7 @@ import ( digest "github.com/opencontainers/go-digest" ocispecs "github.com/opencontainers/image-spec/specs-go/v1" "github.com/pkg/errors" + "go.opentelemetry.io/otel/metric" "golang.org/x/sync/errgroup" "golang.org/x/sync/semaphore" ) @@ -81,6 +82,7 @@ type WorkerOpt struct { MetadataStore *metadata.Store MountPoolRoot string ResourceMonitor *resources.Monitor + MeterProvider metric.MeterProvider } // Worker is a local worker instance with dedicated snapshotter, cache, and so on. @@ -111,6 +113,7 @@ func NewWorker(ctx context.Context, opt WorkerOpt) (*Worker, error) { Differ: opt.Differ, MetadataStore: opt.MetadataStore, MountPoolRoot: opt.MountPoolRoot, + MeterProvider: opt.MeterProvider, }) if err != nil { return nil, err