Add limit on log records per span (jaegertracing#483)
The MaxLogsPerSpan option limits the number of log records in a Span. Excess
records are dropped as necessary; a record in the finished span indicates how
many were dropped.

fixes jaegertracing#46

Almost a copy of opentracing/basictracer-go@723bb40

Signed-off-by: Sokolov Yura <funny.falcon@gmail.com>
funny-falcon authored and yurishkuro committed Jan 13, 2020
1 parent 7cedac8 commit 8aaaade
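
A minimal usage sketch of the new option (illustration only, not part of this commit; it relies on the package's existing NewTracer, NewConstSampler, and NewNullReporter constructors, and the service and operation names are placeholders):

package main

import (
	"github.com/opentracing/opentracing-go/log"
	jaeger "github.com/uber/jaeger-client-go"
)

func main() {
	// Keep at most 5 log records per span; excess records are summarized
	// by a single "dropped Span logs" record when the span finishes.
	tracer, closer := jaeger.NewTracer(
		"example-service",
		jaeger.NewConstSampler(true),
		jaeger.NewNullReporter(),
		jaeger.TracerOptions.MaxLogsPerSpan(5),
	)
	defer closer.Close()

	span := tracer.StartSpan("noisy-operation")
	for i := 0; i < 20; i++ {
		span.LogFields(log.Int("iteration", i))
	}
	span.Finish()
}

The summarization itself is implemented in span.go below.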
Showing 4 changed files with 133 additions and 5 deletions.
77 changes: 72 additions & 5 deletions span.go
@@ -59,6 +59,9 @@ type Span struct {
	// The span's "micro-log"
	logs []opentracing.LogRecord

+	// The number of logs dropped because of MaxLogsPerSpan.
+	numDroppedLogs int
+
	// references for this span
	references []Reference

@@ -152,7 +155,12 @@ func (s *Span) Logs() []opentracing.LogRecord {
	s.Lock()
	defer s.Unlock()

-	return append([]opentracing.LogRecord(nil), s.logs...)
+	logs := append([]opentracing.LogRecord(nil), s.logs...)
+	if s.numDroppedLogs != 0 {
+		fixLogs(logs, s.numDroppedLogs)
+	}
+
+	return logs
}

// References returns references for this span
@@ -234,8 +242,65 @@ func (s *Span) Log(ld opentracing.LogData) {

// this function should only be called while holding a Write lock
func (s *Span) appendLogNoLocking(lr opentracing.LogRecord) {
-	// TODO add logic to limit number of logs per span (issue #46)
-	s.logs = append(s.logs, lr)
+	maxLogs := s.tracer.options.maxLogsPerSpan
+	if maxLogs == 0 || len(s.logs) < maxLogs {
+		s.logs = append(s.logs, lr)
+		return
+	}
+
+	// We have too many logs. We don't touch the first numOld logs; we treat the
+	// rest as a circular buffer and overwrite the oldest log among those.
+	numOld := (maxLogs - 1) / 2
+	numNew := maxLogs - numOld
+	s.logs[numOld+s.numDroppedLogs%numNew] = lr
+	s.numDroppedLogs++
+}
+
+// rotateLogBuffer rotates the records in the buffer: records 0 to pos-1 move
+// to the end (i.e. pos circular left shifts).
+func rotateLogBuffer(buf []opentracing.LogRecord, pos int) {
+	// This algorithm is described in:
+	// http://www.cplusplus.com/reference/algorithm/rotate
+	for first, middle, next := 0, pos, pos; first != middle; {
+		buf[first], buf[next] = buf[next], buf[first]
+		first++
+		next++
+		if next == len(buf) {
+			next = middle
+		} else if first == middle {
+			middle = next
+		}
+	}
+}
+
+func fixLogs(logs []opentracing.LogRecord, numDroppedLogs int) {
+	// We dropped some log events, which means that we used part of Logs as a
+	// circular buffer (see appendLog). De-circularize it.
+	numOld := (len(logs) - 1) / 2
+	numNew := len(logs) - numOld
+	rotateLogBuffer(logs[numOld:], numDroppedLogs%numNew)
+
+	// Replace the log in the middle (the oldest "new" log) with information
+	// about the dropped logs. This means that we are effectively dropping one
+	// more "new" log.
+	numDropped := numDroppedLogs + 1
+	logs[numOld] = opentracing.LogRecord{
+		// Keep the timestamp of the last dropped event.
+		Timestamp: logs[numOld].Timestamp,
+		Fields: []log.Field{
+			log.String("event", "dropped Span logs"),
+			log.Int("dropped_log_count", numDropped),
+			log.String("component", "jaeger-client"),
+		},
+	}
+}
+
+func (s *Span) fixLogsIfDropped() {
+	if s.numDroppedLogs == 0 {
+		return
+	}
+	fixLogs(s.logs, s.numDroppedLogs)
+	s.numDroppedLogs = 0
}

// SetBaggageItem implements SetBaggageItem() of opentracing.SpanContext
@@ -274,17 +339,18 @@ func (s *Span) FinishWithOptions(options opentracing.FinishOptions) {
		s.applySamplingDecision(decision, true)
	}
	if s.context.IsSampled() {
+		s.Lock()
+		s.fixLogsIfDropped()
		if len(options.LogRecords) > 0 || len(options.BulkLogData) > 0 {
-			s.Lock()
			// Note: bulk logs are not subject to maxLogsPerSpan limit
			if options.LogRecords != nil {
				s.logs = append(s.logs, options.LogRecords...)
			}
			for _, ld := range options.BulkLogData {
				s.logs = append(s.logs, ld.ToLogRecord())
			}
-			s.Unlock()
		}
+		s.Unlock()
	}
	// call reportSpan even for non-sampled traces, to return span to the pool
	// and update metrics counter
@@ -344,6 +410,7 @@ func (s *Span) reset() {
	// Note: To reuse memory we can save the pointers on the heap
	s.tags = s.tags[:0]
	s.logs = s.logs[:0]
+	s.numDroppedLogs = 0
	s.references = s.references[:0]
}

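To make the circular-buffer arithmetic above concrete, here is a standalone sketch (not part of the diff) that mirrors appendLogNoLocking and rotateLogBuffer with plain ints. With maxLogs = 5, numOld = 2 and numNew = 3; appending the values 1 through 9 keeps 1 and 2 untouched, de-circularizes the newer slots back to 7, 8, 9, and counts 5 dropped records, the slot holding 7 being the one fixLogs would then overwrite with the marker record.

package main

import "fmt"

// rotate performs pos circular left shifts on buf, mirroring rotateLogBuffer above.
func rotate(buf []int, pos int) {
	for first, middle, next := 0, pos, pos; first != middle; {
		buf[first], buf[next] = buf[next], buf[first]
		first++
		next++
		if next == len(buf) {
			next = middle
		} else if first == middle {
			middle = next
		}
	}
}

func main() {
	const maxLogs = 5
	numOld := (maxLogs - 1) / 2 // the 2 oldest records are never overwritten
	numNew := maxLogs - numOld  // the 3 newest slots act as a circular buffer

	var logs []int
	dropped := 0
	for v := 1; v <= 9; v++ {
		if len(logs) < maxLogs {
			logs = append(logs, v)
			continue
		}
		logs[numOld+dropped%numNew] = v
		dropped++
	}

	// De-circularize the "new" part, as fixLogs does before reporting.
	rotate(logs[numOld:], dropped%numNew)
	fmt.Println(logs, "dropped:", dropped+1) // prints: [1 2 7 8 9] dropped: 5
}
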
48 changes: 48 additions & 0 deletions span_test.go
@@ -15,9 +15,11 @@
package jaeger

import (
+	"fmt"
	"sync"
	"sync/atomic"
	"testing"
+	"time"

	"github.com/opentracing/opentracing-go"
	"github.com/opentracing/opentracing-go/ext"
@@ -286,6 +288,52 @@ func TestSpan_Logs(t *testing.T) {
	}
}

+func TestSpan_LogsLimit(t *testing.T) {
+	for limit := 1; limit < 10; limit += 1 + limit/2 {
+		tracer := &Tracer{}
+		TracerOptions.MaxLogsPerSpan(limit)(tracer)
+
+		for numLogs := 1; numLogs < limit*3; numLogs += 1 + numLogs/3 {
+			t.Run(fmt.Sprintf("Limit_%d_Num_%d", limit, numLogs), func(t *testing.T) {
+				span := &Span{tracer: tracer}
+
+				var epo time.Time
+				for ts := 1; ts <= numLogs; ts++ {
+					span.appendLogNoLocking(opentracing.LogRecord{Timestamp: epo.Add(time.Duration(ts))})
+				}
+
+				logs := span.Logs()
+
+				if numLogs <= limit {
+					assert.Len(t, logs, numLogs)
+					for i, r := range logs {
+						assert.Equal(t, r.Timestamp, epo.Add(time.Duration(i+1)))
+					}
+					return
+				}
+
+				assert.Len(t, logs, limit)
+				signalRecord := (limit - 1) / 2
+				for ts := 1; ts <= signalRecord; ts++ {
+					assert.Equal(t, logs[ts-1].Timestamp, epo.Add(time.Duration(ts)))
+				}
+				numNew := limit - signalRecord
+				ts := numLogs - numNew + 1
+
+				assert.Equal(t, "event", logs[signalRecord].Fields[0].Key())
+				assert.Equal(t, "dropped Span logs", logs[signalRecord].Fields[0].Value())
+				assert.Equal(t, "dropped_log_count", logs[signalRecord].Fields[1].Key())
+				assert.Equal(t, numLogs-limit+1, logs[signalRecord].Fields[1].Value())
+
+				pos := signalRecord
+				for ; pos < limit; ts, pos = ts+1, pos+1 {
+					assert.Equal(t, epo.Add(time.Duration(ts)), logs[pos].Timestamp)
+				}
+			})
+		}
+	}
+}
+
func TestSpan_References(t *testing.T) {
	tests := []struct {
		name string
1 change: 1 addition & 0 deletions tracer.go
@@ -52,6 +52,7 @@ type Tracer struct {
		highTraceIDGenerator func() uint64 // custom high trace ID generator
		maxTagValueLength int
		noDebugFlagOnForcedSampling bool
+		maxLogsPerSpan int
		// more options to come
	}
	// allocator of Span objects
12 changes: 12 additions & 0 deletions tracer_options.go
@@ -144,6 +144,18 @@ func (tracerOptions) MaxTagValueLength(maxTagValueLength int) TracerOption {
	}
}

+// MaxLogsPerSpan limits the number of Logs in a span (if set to a nonzero
+// value). If a span has more logs than this value, logs are dropped as
+// necessary (and replaced with a log describing how many were dropped).
+//
+// About half of the MaxLogsPerSpan logs kept are the oldest logs, and about
+// half are the newest logs.
+func (tracerOptions) MaxLogsPerSpan(maxLogsPerSpan int) TracerOption {
+	return func(tracer *Tracer) {
+		tracer.options.maxLogsPerSpan = maxLogsPerSpan
+	}
+}
+
func (tracerOptions) ZipkinSharedRPCSpan(zipkinSharedRPCSpan bool) TracerOption {
	return func(tracer *Tracer) {
		tracer.options.zipkinSharedRPCSpan = zipkinSharedRPCSpan
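A sketch of how calling code might surface the marker record on a finished span (not part of the commit). It assumes the span came from a jaeger.Tracer so the type assertion to *jaeger.Span succeeds; the "event" and "dropped Span logs" field values are the ones written by fixLogs in span.go above, and the package and function names here are hypothetical.

package spanlogs

import (
	"fmt"

	opentracing "github.com/opentracing/opentracing-go"
	jaeger "github.com/uber/jaeger-client-go"
)

// ReportDroppedLogs scans a span's log records for the "dropped Span logs"
// marker and prints how it appears to a consumer of (*Span).Logs().
func ReportDroppedLogs(sp opentracing.Span) {
	jSpan, ok := sp.(*jaeger.Span)
	if !ok {
		return // not a jaeger span; nothing to inspect
	}
	for _, rec := range jSpan.Logs() {
		for _, f := range rec.Fields {
			if f.Key() == "event" && f.Value() == "dropped Span logs" {
				fmt.Printf("dropped-log marker at %v: %v\n", rec.Timestamp, rec.Fields)
			}
		}
	}
}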
