diff --git a/solver/llbsolver/solver.go b/solver/llbsolver/solver.go index d63b604c0d2e..f564c812454a 100644 --- a/solver/llbsolver/solver.go +++ b/solver/llbsolver/solver.go @@ -38,8 +38,6 @@ import ( "github.com/moby/buildkit/worker" digest "github.com/opencontainers/go-digest" "github.com/pkg/errors" - "go.opentelemetry.io/otel/sdk/trace/tracetest" - "go.opentelemetry.io/otel/trace" "golang.org/x/sync/errgroup" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -158,14 +156,9 @@ func (s *Solver) Bridge(b solver.Builder) frontend.FrontendLLBBridge { } func (s *Solver) recordBuildHistory(ctx context.Context, id string, req frontend.SolveRequest, exp ExporterRequest, j *solver.Job, usage *resources.SysSampler) (func(*Result, []exporter.DescriptorReference, error) error, error) { - var stopTrace func() []tracetest.SpanStub - - if s := trace.SpanFromContext(ctx); s.SpanContext().IsValid() { - if exp, _, err := detect.Exporter(); err == nil { - if rec, ok := exp.(*detect.TraceRecorder); ok { - stopTrace = rec.Record(s.SpanContext().TraceID()) - } - } + stopTrace, err := detect.Recorder.Record(ctx) + if err != nil { + return nil, err } st := time.Now() diff --git a/util/tracing/detect/detect.go b/util/tracing/detect/detect.go index 9be07eb6dca2..5d1fc0360b55 100644 --- a/util/tracing/detect/detect.go +++ b/util/tracing/detect/detect.go @@ -32,7 +32,6 @@ type detector struct { } var ServiceName string -var Recorder *TraceRecorder var detectors map[string]detector var once sync.Once @@ -116,45 +115,32 @@ func detectExporter[T any](envVar string, fn func(d ExporterDetector) (T, bool, return exp, nil } -func getExporters() (sdktrace.SpanExporter, sdkmetric.Exporter, error) { - texp, mexp, err := detectExporters() - if err != nil { - return nil, nil, err - } - - if Recorder != nil { - Recorder.SpanExporter = texp - texp = Recorder - } - - return texp, mexp, nil -} - func detect() error { tp = noop.NewTracerProvider() mp = sdkmetric.NewMeterProvider() - texp, mexp, err := getExporters() + texp, mexp, err := detectExporters() if err != nil || (texp == nil && mexp == nil) { return err } res := Resource() - // enable log with traceID when valid exporter - if texp != nil { + if texp != nil || Recorder != nil { + // enable log with traceID when a valid exporter is used bklog.EnableLogWithTraceID(true) - sp := sdktrace.NewBatchSpanProcessor(texp) - + sdktpopts := []sdktrace.TracerProviderOption{ + sdktrace.WithResource(res), + } + if texp != nil { + sdktpopts = append(sdktpopts, sdktrace.WithBatcher(texp)) + } if Recorder != nil { - Recorder.flush = sp.ForceFlush + sp := sdktrace.NewSimpleSpanProcessor(Recorder) + sdktpopts = append(sdktpopts, sdktrace.WithSpanProcessor(sp)) } - - sdktp := sdktrace.NewTracerProvider( - sdktrace.WithSpanProcessor(sp), - sdktrace.WithResource(res), - ) + sdktp := sdktrace.NewTracerProvider(sdktpopts...) closers = append(closers, sdktp.Shutdown) exporter.SpanExporter = texp diff --git a/util/tracing/detect/recorder.go b/util/tracing/detect/recorder.go index 8ff7f1dcef38..f11bb5e927cf 100644 --- a/util/tracing/detect/recorder.go +++ b/util/tracing/detect/recorder.go @@ -5,18 +5,31 @@ import ( "sync" "time" + "github.com/pkg/errors" sdktrace "go.opentelemetry.io/otel/sdk/trace" "go.opentelemetry.io/otel/sdk/trace/tracetest" "go.opentelemetry.io/otel/trace" + "golang.org/x/sync/semaphore" ) +var Recorder *TraceRecorder + type TraceRecorder struct { - sdktrace.SpanExporter + // sem is a binary semaphore for this struct. + // This is used instead of sync.Mutex because it allows + // for context cancellation to work properly. + sem *semaphore.Weighted + + // shutdown function for the gc. + shutdownGC func(err error) + + // done channel that marks when background goroutines + // are closed. + done chan struct{} - mu sync.Mutex + // track traces and listeners for traces. m map[trace.TraceID]*stubs listeners map[trace.TraceID]int - flush func(context.Context) error } type stubs struct { @@ -26,37 +39,52 @@ type stubs struct { func NewTraceRecorder() *TraceRecorder { tr := &TraceRecorder{ + sem: semaphore.NewWeighted(1), + done: make(chan struct{}), m: map[trace.TraceID]*stubs{}, listeners: map[trace.TraceID]int{}, } - go func() { - t := time.NewTimer(60 * time.Second) - for { - <-t.C - tr.gc() - t.Reset(50 * time.Second) - } - }() + ctx, cancel := context.WithCancelCause(context.Background()) + go tr.gcLoop(ctx) + tr.shutdownGC = cancel return tr } -func (r *TraceRecorder) Record(traceID trace.TraceID) func() []tracetest.SpanStub { - r.mu.Lock() - defer r.mu.Unlock() +// Record signals to the TraceRecorder that it should track spans associated with the current +// trace and returns a function that will return these spans. +// +// If the TraceRecorder is nil or there is no valid active span, the returned function +// will be nil to signal that the trace cannot be recorded. +func (r *TraceRecorder) Record(ctx context.Context) (func() []tracetest.SpanStub, error) { + if r == nil { + return nil, nil + } + + spanCtx := trace.SpanContextFromContext(ctx) + if !spanCtx.IsValid() { + return nil, nil + } + + if err := r.sem.Acquire(ctx, 1); err != nil { + return nil, err + } + defer r.sem.Release(1) + traceID := spanCtx.TraceID() r.listeners[traceID]++ - var once sync.Once - var spans []tracetest.SpanStub + + var ( + once sync.Once + spans []tracetest.SpanStub + ) return func() []tracetest.SpanStub { once.Do(func() { - if r.flush != nil { - r.flush(context.TODO()) + if err := r.sem.Acquire(context.Background(), 1); err != nil { + return } - - r.mu.Lock() - defer r.mu.Unlock() + defer r.sem.Release(1) if v, ok := r.m[traceID]; ok { spans = v.spans @@ -67,26 +95,46 @@ func (r *TraceRecorder) Record(traceID trace.TraceID) func() []tracetest.SpanStu } }) return spans + }, nil +} + +func (r *TraceRecorder) gcLoop(ctx context.Context) { + defer close(r.done) + + ticker := time.NewTicker(time.Minute) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return + case now := <-ticker.C: + r.gc(ctx, now) + } } } -func (r *TraceRecorder) gc() { - r.mu.Lock() - defer r.mu.Unlock() +func (r *TraceRecorder) gc(ctx context.Context, now time.Time) { + if err := r.sem.Acquire(ctx, 1); err != nil { + return + } + defer r.sem.Release(1) - now := time.Now() for k, s := range r.m { if _, ok := r.listeners[k]; ok { continue } - if now.Sub(s.last) > 60*time.Second { + if now.Sub(s.last) > time.Minute { delete(r.m, k) } } } func (r *TraceRecorder) ExportSpans(ctx context.Context, spans []sdktrace.ReadOnlySpan) error { - r.mu.Lock() + if err := r.sem.Acquire(ctx, 1); err != nil { + return err + } + defer r.sem.Release(1) now := time.Now() for _, s := range spans { @@ -99,17 +147,18 @@ func (r *TraceRecorder) ExportSpans(ctx context.Context, spans []sdktrace.ReadOn v.last = now v.spans = append(v.spans, ss) } - r.mu.Unlock() - - if r.SpanExporter == nil { - return nil - } - return r.SpanExporter.ExportSpans(ctx, spans) + return nil } func (r *TraceRecorder) Shutdown(ctx context.Context) error { - if r.SpanExporter == nil { + // Initiate the shutdown of the gc loop. + r.shutdownGC(errors.WithStack(context.Canceled)) + + // Wait for it to be done or the context is canceled. + select { + case <-r.done: return nil + case <-ctx.Done(): + return context.Cause(ctx) } - return r.SpanExporter.Shutdown(ctx) }