From de76575b84fc6a2f55d2a1cf7e64b90cf7815704 Mon Sep 17 00:00:00 2001 From: Kostiantyn Masliuk <1pkg@protonmail.com> Date: Thu, 19 Dec 2024 15:48:23 -0800 Subject: [PATCH] spacetime: add buffering span for ts --- appender.go | 18 +++++++++++++++++- bulk_indexer.go | 19 ++++++++++++++----- linked_trace_context.go | 14 ++++++++++---- 3 files changed, 41 insertions(+), 10 deletions(-) diff --git a/appender.go b/appender.go index 2bab48d..7bb0fbe 100644 --- a/appender.go +++ b/appender.go @@ -271,7 +271,7 @@ func (a *Appender) Add(ctx context.Context, index string, document io.WriterTo) return errMissingBody } - var linkedTraceContext linkedTraceContext + var linkedTraceContext *linkedTraceContext if a.tracingEnabled() { linkedTraceContext = newLinkedTraceContextFromAPM(apm.TransactionFromContext(ctx).TraceContext()) } else if a.otelTracingEnabled() { @@ -326,11 +326,19 @@ func (a *Appender) flush(ctx context.Context, bulkIndexer *BulkIndexer) error { defer a.addCount(1, &a.bulkRequests, a.metrics.bulkRequests) logger := a.config.Logger + var duration *time.Duration var span trace.Span if a.tracingEnabled() { tx := a.config.Tracer.StartTransaction("docappender.flush", "output") tx.Context.SetLabel("documents", n) defer tx.End() + defer func() { + if duration != nil { + span := tx.StartSpan("docappender.buffering", "", nil) + span.Duration = *duration + span.End() + } + }() ctx = apm.ContextWithTransaction(ctx, tx) linkedTraces := bulkIndexer.uniqueLinkedTraces() @@ -346,6 +354,13 @@ func (a *Appender) flush(ctx context.Context, bulkIndexer *BulkIndexer) error { attribute.Int("documents", n), )) defer span.End() + defer func() { + if duration != nil { + ts := time.Now().Add(*duration) + _, span := a.tracer.Start(ctx, "docappender.buffering") + span.End(trace.WithTimestamp(ts)) + } + }() linkedTraces := bulkIndexer.uniqueLinkedTraces() for _, c := range linkedTraces { @@ -428,6 +443,7 @@ func (a *Appender) flush(ctx context.Context, bulkIndexer *BulkIndexer) error { clientFailed, // failed after document retries (if it applies) and final status is 400s excluding 429 serverFailed int64 // failed after document retries (if it applies) and final status is 500s ) + duration = &resp.BufferingDuration docsIndexed = resp.Indexed var failedCount map[BulkIndexerResponseItem]int if len(resp.FailedDocs) > 0 { diff --git a/bulk_indexer.go b/bulk_indexer.go index 1136975..59d98e7 100644 --- a/bulk_indexer.go +++ b/bulk_indexer.go @@ -26,6 +26,7 @@ import ( "net/http" "slices" "strings" + "time" "unsafe" "github.com/klauspost/compress/gzip" @@ -98,12 +99,14 @@ type BulkIndexer struct { retryCounts map[int]int linkedTraces map[int]linkedTraceContext requireDataStream bool + bufferingTimestamp int64 } type BulkIndexerResponseStat struct { - Indexed int64 - RetriedDocs int64 - FailedDocs []BulkIndexerResponseItem + Indexed int64 + RetriedDocs int64 + FailedDocs []BulkIndexerResponseItem + BufferingDuration time.Duration } // BulkIndexerResponseItem represents the Elasticsearch response item. @@ -288,7 +291,7 @@ type BulkIndexerItem struct { DocumentID string Body io.WriterTo DynamicTemplates map[string]string - linkedTraceContext linkedTraceContext + linkedTraceContext *linkedTraceContext } // Add encodes an item in the buffer. @@ -300,7 +303,12 @@ func (b *BulkIndexer) Add(item BulkIndexerItem) error { if _, err := b.writer.Write([]byte("\n")); err != nil { return fmt.Errorf("failed to write newline: %w", err) } - b.linkedTraces[b.itemsAdded] = item.linkedTraceContext + if b.itemsAdded == 0 { + b.bufferingTimestamp = time.Now().UnixMilli() + } + if item.linkedTraceContext != nil { + b.linkedTraces[b.itemsAdded] = *item.linkedTraceContext + } b.itemsAdded++ return nil } @@ -404,6 +412,7 @@ func (b *BulkIndexer) Flush(ctx context.Context) (BulkIndexerResponseStat, error b.bytesFlushed = bytesFlushed b.bytesUncompFlushed = bytesUncompFlushed var resp BulkIndexerResponseStat + resp.BufferingDuration = time.Since(time.UnixMilli(b.bufferingTimestamp)) if res.IsError() { e := errorFlushFailed{resp: res.String(), statusCode: res.StatusCode} switch { diff --git a/linked_trace_context.go b/linked_trace_context.go index 0c84a0e..2d23b70 100644 --- a/linked_trace_context.go +++ b/linked_trace_context.go @@ -38,15 +38,21 @@ func (c linkedTraceContext) OTELLink() trace.Link { })} } -func newLinkedTraceContextFromAPM(ctx apm.TraceContext) linkedTraceContext { - return linkedTraceContext{ +func newLinkedTraceContextFromAPM(ctx apm.TraceContext) *linkedTraceContext { + if err := ctx.Trace.Validate(); err != nil { + return nil + } + return &linkedTraceContext{ TraceID: ctx.Trace, SpanID: ctx.Span, } } -func newLinkedTraceIDFromOTEL(ctx trace.SpanContext) linkedTraceContext { - return linkedTraceContext{ +func newLinkedTraceIDFromOTEL(ctx trace.SpanContext) *linkedTraceContext { + if !ctx.HasTraceID() || !ctx.HasSpanID() { + return nil + } + return &linkedTraceContext{ TraceID: ctx.TraceID(), SpanID: ctx.SpanID(), }