From 8aaaadec439b9367478cd5f9e7c369f3712c9dfc Mon Sep 17 00:00:00 2001
From: Sokolov Yura
Date: Mon, 13 Jan 2020 20:40:44 +0300
Subject: [PATCH] Add limit on log records per span (#483)

The MaxLogsPerSpan option limits the number of logs in a Span. Events are
dropped as necessary; a log in the finished span indicates how many were
dropped.

fixes #46

Almost a copy of
https://github.com/opentracing/basictracer-go/pull/39/commits/723bb40d4f3bf4fcc8d01a1805bde4bd19430502

Signed-off-by: Sokolov Yura
---
 span.go           | 77 ++++++++++++++++++++++++++++++++++++++++++++---
 span_test.go      | 48 +++++++++++++++++++++++++++++
 tracer.go         |  1 +
 tracer_options.go | 12 ++++++++
 4 files changed, 133 insertions(+), 5 deletions(-)

diff --git a/span.go b/span.go
index bbf6fb06..42c9112c 100644
--- a/span.go
+++ b/span.go
@@ -59,6 +59,9 @@ type Span struct {
 	// The span's "micro-log"
 	logs []opentracing.LogRecord
 
+	// The number of logs dropped because of MaxLogsPerSpan.
+	numDroppedLogs int
+
 	// references for this span
 	references []Reference
 
@@ -152,7 +155,12 @@ func (s *Span) Logs() []opentracing.LogRecord {
 	s.Lock()
 	defer s.Unlock()
 
-	return append([]opentracing.LogRecord(nil), s.logs...)
+	logs := append([]opentracing.LogRecord(nil), s.logs...)
+	if s.numDroppedLogs != 0 {
+		fixLogs(logs, s.numDroppedLogs)
+	}
+
+	return logs
 }
 
 // References returns references for this span
@@ -234,8 +242,65 @@ func (s *Span) Log(ld opentracing.LogData) {
 
 // this function should only be called while holding a Write lock
 func (s *Span) appendLogNoLocking(lr opentracing.LogRecord) {
-	// TODO add logic to limit number of logs per span (issue #46)
-	s.logs = append(s.logs, lr)
+	maxLogs := s.tracer.options.maxLogsPerSpan
+	if maxLogs == 0 || len(s.logs) < maxLogs {
+		s.logs = append(s.logs, lr)
+		return
+	}
+
+	// We have too many logs. We don't touch the first numOld logs; we treat the
+	// rest as a circular buffer and overwrite the oldest log among those.
+	numOld := (maxLogs - 1) / 2
+	numNew := maxLogs - numOld
+	s.logs[numOld+s.numDroppedLogs%numNew] = lr
+	s.numDroppedLogs++
+}
+
+// rotateLogBuffer rotates the records in the buffer: records 0 to pos-1 move
+// to the end (i.e. pos circular left shifts).
+func rotateLogBuffer(buf []opentracing.LogRecord, pos int) {
+	// This algorithm is described in:
+	//    http://www.cplusplus.com/reference/algorithm/rotate
+	for first, middle, next := 0, pos, pos; first != middle; {
+		buf[first], buf[next] = buf[next], buf[first]
+		first++
+		next++
+		if next == len(buf) {
+			next = middle
+		} else if first == middle {
+			middle = next
+		}
+	}
+}
+
+func fixLogs(logs []opentracing.LogRecord, numDroppedLogs int) {
+	// We dropped some log events, which means that we used part of Logs as a
+	// circular buffer (see appendLogNoLocking). De-circularize it.
+	numOld := (len(logs) - 1) / 2
+	numNew := len(logs) - numOld
+	rotateLogBuffer(logs[numOld:], numDroppedLogs%numNew)
+
+	// Replace the log in the middle (the oldest "new" log) with information
+	// about the dropped logs. This means that we are effectively dropping one
+	// more "new" log.
+	numDropped := numDroppedLogs + 1
+	logs[numOld] = opentracing.LogRecord{
+		// Keep the timestamp of the last dropped event.
+		Timestamp: logs[numOld].Timestamp,
+		Fields: []log.Field{
+			log.String("event", "dropped Span logs"),
+			log.Int("dropped_log_count", numDropped),
+			log.String("component", "jaeger-client"),
+		},
+	}
+}
+
+func (s *Span) fixLogsIfDropped() {
+	if s.numDroppedLogs == 0 {
+		return
+	}
+	fixLogs(s.logs, s.numDroppedLogs)
+	s.numDroppedLogs = 0
 }
 
 // SetBaggageItem implements SetBaggageItem() of opentracing.SpanContext
@@ -274,8 +339,9 @@ func (s *Span) FinishWithOptions(options opentracing.FinishOptions) {
 		s.applySamplingDecision(decision, true)
 	}
 	if s.context.IsSampled() {
+		s.Lock()
+		s.fixLogsIfDropped()
 		if len(options.LogRecords) > 0 || len(options.BulkLogData) > 0 {
-			s.Lock()
 			// Note: bulk logs are not subject to maxLogsPerSpan limit
 			if options.LogRecords != nil {
 				s.logs = append(s.logs, options.LogRecords...)
@@ -283,8 +349,8 @@ func (s *Span) FinishWithOptions(options opentracing.FinishOptions) {
 			for _, ld := range options.BulkLogData {
 				s.logs = append(s.logs, ld.ToLogRecord())
 			}
-			s.Unlock()
 		}
+		s.Unlock()
 	}
 	// call reportSpan even for non-sampled traces, to return span to the pool
 	// and update metrics counter
@@ -344,6 +410,7 @@ func (s *Span) reset() {
 	// Note: To reuse memory we can save the pointers on the heap
 	s.tags = s.tags[:0]
 	s.logs = s.logs[:0]
+	s.numDroppedLogs = 0
 	s.references = s.references[:0]
 }
 
diff --git a/span_test.go b/span_test.go
index 15d59c10..9f08c355 100644
--- a/span_test.go
+++ b/span_test.go
@@ -15,9 +15,11 @@
 package jaeger
 
 import (
+	"fmt"
 	"sync"
 	"sync/atomic"
 	"testing"
+	"time"
 
 	"github.com/opentracing/opentracing-go"
 	"github.com/opentracing/opentracing-go/ext"
@@ -286,6 +288,52 @@ func TestSpan_Logs(t *testing.T) {
 	}
 }
 
+func TestSpan_LogsLimit(t *testing.T) {
+	for limit := 1; limit < 10; limit += 1 + limit/2 {
+		tracer := &Tracer{}
+		TracerOptions.MaxLogsPerSpan(limit)(tracer)
+
+		for numLogs := 1; numLogs < limit*3; numLogs += 1 + numLogs/3 {
+			t.Run(fmt.Sprintf("Limit_%d_Num_%d", limit, numLogs), func(t *testing.T) {
+				span := &Span{tracer: tracer}
+
+				var epo time.Time
+				for ts := 1; ts <= numLogs; ts++ {
+					span.appendLogNoLocking(opentracing.LogRecord{Timestamp: epo.Add(time.Duration(ts))})
+				}
+
+				logs := span.Logs()
+
+				if numLogs <= limit {
+					assert.Len(t, logs, numLogs)
+					for i, r := range logs {
+						assert.Equal(t, r.Timestamp, epo.Add(time.Duration(i+1)))
+					}
+					return
+				}
+
+				assert.Len(t, logs, limit)
+				signalRecord := (limit - 1) / 2
+				for ts := 1; ts <= signalRecord; ts++ {
+					assert.Equal(t, logs[ts-1].Timestamp, epo.Add(time.Duration(ts)))
+				}
+				numNew := limit - signalRecord
+				ts := numLogs - numNew + 1
+
+				assert.Equal(t, "event", logs[signalRecord].Fields[0].Key())
+				assert.Equal(t, "dropped Span logs", logs[signalRecord].Fields[0].Value())
+				assert.Equal(t, "dropped_log_count", logs[signalRecord].Fields[1].Key())
+				assert.Equal(t, numLogs-limit+1, logs[signalRecord].Fields[1].Value())
+
+				pos := signalRecord
+				for ; pos < limit; ts, pos = ts+1, pos+1 {
+					assert.Equal(t, epo.Add(time.Duration(ts)), logs[pos].Timestamp)
+				}
+			})
+		}
+	}
+}
+
 func TestSpan_References(t *testing.T) {
 	tests := []struct {
 		name string
diff --git a/tracer.go b/tracer.go
index f03372dc..da43ec6d 100644
--- a/tracer.go
+++ b/tracer.go
@@ -52,6 +52,7 @@ type Tracer struct {
 		highTraceIDGenerator        func() uint64 // custom high trace ID generator
 		maxTagValueLength           int
 		noDebugFlagOnForcedSampling bool
+		maxLogsPerSpan              int
 		// more options to come
 	}
 	// allocator of Span objects
diff --git a/tracer_options.go b/tracer_options.go
index 469685bb..f016484b 100644
--- a/tracer_options.go
+++ b/tracer_options.go
@@ -144,6 +144,18 @@ func (tracerOptions) MaxTagValueLength(maxTagValueLength int) TracerOption {
 	}
 }
 
+// MaxLogsPerSpan limits the number of Logs in a span (if set to a nonzero
+// value). If a span has more logs than this value, logs are dropped as
+// necessary (and replaced with a log describing how many were dropped).
+//
+// About half of the MaxLogsPerSpan logs kept are the oldest logs, and about
+// half are the newest logs.
+func (tracerOptions) MaxLogsPerSpan(maxLogsPerSpan int) TracerOption {
+	return func(tracer *Tracer) {
+		tracer.options.maxLogsPerSpan = maxLogsPerSpan
+	}
+}
+
 func (tracerOptions) ZipkinSharedRPCSpan(zipkinSharedRPCSpan bool) TracerOption {
 	return func(tracer *Tracer) {
 		tracer.options.zipkinSharedRPCSpan = zipkinSharedRPCSpan
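
Illustrative usage (not part of the patch): a minimal sketch of how a caller
could enable the new limit once this change lands, assuming the existing
public jaeger-client-go API (NewTracer, NewConstSampler, NewNullReporter) and
the opentracing-go log package. The service name, the limit of 20, and the
logging loop are arbitrary examples, not code from this change.

package main

import (
	"github.com/opentracing/opentracing-go/log"
	jaeger "github.com/uber/jaeger-client-go"
)

func main() {
	tracer, closer := jaeger.NewTracer(
		"example-service",
		jaeger.NewConstSampler(true),
		jaeger.NewNullReporter(),
		// Keep at most 20 log records per span: roughly the 9 oldest and the
		// 10 newest, plus one record reporting how many logs were dropped.
		jaeger.TracerOptions.MaxLogsPerSpan(20),
	)
	defer closer.Close()

	span := tracer.StartSpan("noisy-operation")
	for i := 0; i < 1000; i++ {
		// Only a bounded subset of these log records is retained on the span.
		span.LogFields(log.Int("iteration", i))
	}
	span.Finish()
}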