Skip to content

Commit

Permalink
spacetime: add buffering span for ts
Browse files Browse the repository at this point in the history
  • Loading branch information
1pkg committed Dec 19, 2024
1 parent be50f20 commit de76575
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 10 deletions.
18 changes: 17 additions & 1 deletion appender.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ func (a *Appender) Add(ctx context.Context, index string, document io.WriterTo)
return errMissingBody
}

var linkedTraceContext linkedTraceContext
var linkedTraceContext *linkedTraceContext
if a.tracingEnabled() {
linkedTraceContext = newLinkedTraceContextFromAPM(apm.TransactionFromContext(ctx).TraceContext())
} else if a.otelTracingEnabled() {
Expand Down Expand Up @@ -326,11 +326,19 @@ func (a *Appender) flush(ctx context.Context, bulkIndexer *BulkIndexer) error {
defer a.addCount(1, &a.bulkRequests, a.metrics.bulkRequests)

logger := a.config.Logger
var duration *time.Duration
var span trace.Span
if a.tracingEnabled() {
tx := a.config.Tracer.StartTransaction("docappender.flush", "output")
tx.Context.SetLabel("documents", n)
defer tx.End()
defer func() {
if duration != nil {
span := tx.StartSpan("docappender.buffering", "", nil)
span.Duration = *duration
span.End()
}
}()
ctx = apm.ContextWithTransaction(ctx, tx)

linkedTraces := bulkIndexer.uniqueLinkedTraces()
Expand All @@ -346,6 +354,13 @@ func (a *Appender) flush(ctx context.Context, bulkIndexer *BulkIndexer) error {
attribute.Int("documents", n),
))
defer span.End()
defer func() {
if duration != nil {
ts := time.Now().Add(*duration)
_, span := a.tracer.Start(ctx, "docappender.buffering")
span.End(trace.WithTimestamp(ts))
}
}()

linkedTraces := bulkIndexer.uniqueLinkedTraces()
for _, c := range linkedTraces {
Expand Down Expand Up @@ -428,6 +443,7 @@ func (a *Appender) flush(ctx context.Context, bulkIndexer *BulkIndexer) error {
clientFailed, // failed after document retries (if it applies) and final status is 400s excluding 429
serverFailed int64 // failed after document retries (if it applies) and final status is 500s
)
duration = &resp.BufferingDuration
docsIndexed = resp.Indexed
var failedCount map[BulkIndexerResponseItem]int
if len(resp.FailedDocs) > 0 {
Expand Down
19 changes: 14 additions & 5 deletions bulk_indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"net/http"
"slices"
"strings"
"time"
"unsafe"

"github.com/klauspost/compress/gzip"
Expand Down Expand Up @@ -98,12 +99,14 @@ type BulkIndexer struct {
retryCounts map[int]int
linkedTraces map[int]linkedTraceContext
requireDataStream bool
bufferingTimestamp int64
}

type BulkIndexerResponseStat struct {
Indexed int64
RetriedDocs int64
FailedDocs []BulkIndexerResponseItem
Indexed int64
RetriedDocs int64
FailedDocs []BulkIndexerResponseItem
BufferingDuration time.Duration
}

// BulkIndexerResponseItem represents the Elasticsearch response item.
Expand Down Expand Up @@ -288,7 +291,7 @@ type BulkIndexerItem struct {
DocumentID string
Body io.WriterTo
DynamicTemplates map[string]string
linkedTraceContext linkedTraceContext
linkedTraceContext *linkedTraceContext
}

// Add encodes an item in the buffer.
Expand All @@ -300,7 +303,12 @@ func (b *BulkIndexer) Add(item BulkIndexerItem) error {
if _, err := b.writer.Write([]byte("\n")); err != nil {
return fmt.Errorf("failed to write newline: %w", err)
}
b.linkedTraces[b.itemsAdded] = item.linkedTraceContext
if b.itemsAdded == 0 {
b.bufferingTimestamp = time.Now().UnixMilli()
}
if item.linkedTraceContext != nil {
b.linkedTraces[b.itemsAdded] = *item.linkedTraceContext
}
b.itemsAdded++
return nil
}
Expand Down Expand Up @@ -404,6 +412,7 @@ func (b *BulkIndexer) Flush(ctx context.Context) (BulkIndexerResponseStat, error
b.bytesFlushed = bytesFlushed
b.bytesUncompFlushed = bytesUncompFlushed
var resp BulkIndexerResponseStat
resp.BufferingDuration = time.Since(time.UnixMilli(b.bufferingTimestamp))
if res.IsError() {
e := errorFlushFailed{resp: res.String(), statusCode: res.StatusCode}
switch {
Expand Down
14 changes: 10 additions & 4 deletions linked_trace_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,15 +38,21 @@ func (c linkedTraceContext) OTELLink() trace.Link {
})}
}

func newLinkedTraceContextFromAPM(ctx apm.TraceContext) linkedTraceContext {
return linkedTraceContext{
func newLinkedTraceContextFromAPM(ctx apm.TraceContext) *linkedTraceContext {
if err := ctx.Trace.Validate(); err != nil {
return nil
}
return &linkedTraceContext{
TraceID: ctx.Trace,
SpanID: ctx.Span,
}
}

func newLinkedTraceIDFromOTEL(ctx trace.SpanContext) linkedTraceContext {
return linkedTraceContext{
func newLinkedTraceIDFromOTEL(ctx trace.SpanContext) *linkedTraceContext {
if !ctx.HasTraceID() || !ctx.HasSpanID() {
return nil
}
return &linkedTraceContext{
TraceID: ctx.TraceID(),
SpanID: ctx.SpanID(),
}
Expand Down

0 comments on commit de76575

Please sign in to comment.