Skip to content
This repository has been archived by the owner on Mar 6, 2023. It is now read-only.

Add option for limiting the number of logs in a Span #39

Merged
merged 1 commit into from
Sep 29, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 64 additions & 6 deletions span.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ type spanImpl struct {
event func(SpanEvent)
sync.Mutex // protects the fields below
raw RawSpan
// The number of logs dropped because of MaxLogsPerSpan.
numDroppedLogs int
}

var spanPool = &sync.Pool{New: func() interface{} {
Expand Down Expand Up @@ -98,6 +100,21 @@ func (s *spanImpl) LogKV(keyValues ...interface{}) {
s.LogFields(fields...)
}

// appendLog records lr on the span while enforcing the tracer's
// MaxLogsPerSpan limit. Once the limit is reached, the oldest half of
// the logs is frozen in place and the newest half is maintained as a
// circular buffer, so the span keeps both its earliest and its most
// recent log records. Callers must hold s.Mutex.
func (s *spanImpl) appendLog(lr opentracing.LogRecord) {
	limit := s.tracer.options.MaxLogsPerSpan
	if limit == 0 || len(s.raw.Logs) < limit {
		// No cap configured, or still under it: plain append.
		s.raw.Logs = append(s.raw.Logs, lr)
		return
	}

	// At capacity. Keep the first `keepOld` logs untouched and cycle
	// new records through the remaining slots, each time overwriting
	// the oldest of the "new" logs.
	keepOld := (limit - 1) / 2
	cycle := limit - keepOld
	s.raw.Logs[keepOld+s.numDroppedLogs%cycle] = lr
	s.numDroppedLogs++
}

func (s *spanImpl) LogFields(fields ...log.Field) {
lr := opentracing.LogRecord{
Fields: fields,
Expand All @@ -111,7 +128,7 @@ func (s *spanImpl) LogFields(fields ...log.Field) {
if lr.Timestamp.IsZero() {
lr.Timestamp = time.Now()
}
s.raw.Logs = append(s.raw.Logs, lr)
s.appendLog(lr)
}

func (s *spanImpl) LogEvent(event string) {
Expand Down Expand Up @@ -139,13 +156,30 @@ func (s *spanImpl) Log(ld opentracing.LogData) {
ld.Timestamp = time.Now()
}

s.raw.Logs = append(s.raw.Logs, ld.ToLogRecord())
s.appendLog(ld.ToLogRecord())
}

// Finish implements opentracing.Span; it is equivalent to calling
// FinishWithOptions with zero-valued options.
func (s *spanImpl) Finish() {
	var opts opentracing.FinishOptions
	s.FinishWithOptions(opts)
}

// rotateLogBuffer performs pos circular left shifts on buf: records 0
// through pos-1 move to the end while the relative order of all records
// is preserved. A rotation by 0 or by len(buf) is a no-op.
func rotateLogBuffer(buf []opentracing.LogRecord, pos int) {
	// Guard the degenerate cases explicitly: the cycle-following loop
	// below would index past the end of buf for pos == len(buf) (and
	// panic for any out-of-range pos), even though such a rotation is
	// logically the identity.
	if pos <= 0 || pos >= len(buf) {
		return
	}
	// Cycle-following rotation; the algorithm is described in:
	// http://www.cplusplus.com/reference/algorithm/rotate
	for first, middle, next := 0, pos, pos; first != middle; {
		buf[first], buf[next] = buf[next], buf[first]
		first++
		next++
		if next == len(buf) {
			next = middle
		} else if first == middle {
			middle = next
		}
	}
}

func (s *spanImpl) FinishWithOptions(opts opentracing.FinishOptions) {
finishTime := opts.FinishTime
if finishTime.IsZero() {
Expand All @@ -155,18 +189,42 @@ func (s *spanImpl) FinishWithOptions(opts opentracing.FinishOptions) {

s.Lock()
defer s.Unlock()
if opts.LogRecords != nil {
s.raw.Logs = append(s.raw.Logs, opts.LogRecords...)

for _, lr := range opts.LogRecords {
s.appendLog(lr)
}
for _, ld := range opts.BulkLogData {
s.raw.Logs = append(s.raw.Logs, ld.ToLogRecord())
s.appendLog(ld.ToLogRecord())
}

if s.numDroppedLogs > 0 {
// We dropped some log events, which means that we used part of Logs as a
// circular buffer (see appendLog). De-circularize it.
numOld := (len(s.raw.Logs) - 1) / 2
numNew := len(s.raw.Logs) - numOld
rotateLogBuffer(s.raw.Logs[numOld:], s.numDroppedLogs%numNew)

// Replace the log in the middle (the oldest "new" log) with information
// about the dropped logs. This means that we are effectively dropping one
// more "new" log.
numDropped := s.numDroppedLogs + 1
s.raw.Logs[numOld] = opentracing.LogRecord{
// Keep the timestamp of the last dropped event.
Timestamp: s.raw.Logs[numOld].Timestamp,
Fields: []log.Field{
log.String("event", "dropped Span logs"),
log.Int("dropped_log_count", numDropped),
log.String("component", "basictracer"),
},
}
}

s.raw.Duration = duration

s.onFinish(s.raw)
s.tracer.options.Recorder.RecordSpan(s.raw)

// Last chance to get options before the span is possbily reset.
// Last chance to get options before the span is possibly reset.
poolEnabled := s.tracer.options.EnableSpanPool
if s.tracer.options.DebugAssertUseAfterFinish {
// This makes it much more likely to catch a panic on any subsequent
Expand Down
56 changes: 56 additions & 0 deletions span_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package basictracer

import (
"reflect"
"strconv"
"testing"

opentracing "github.com/opentracing/opentracing-go"
Expand Down Expand Up @@ -166,3 +167,58 @@ func TestSpan_DropAllLogs(t *testing.T) {
// Only logs are dropped
assert.Equal(t, 0, len(spans[0].Logs))
}

// TestSpan_MaxLogsPerSpan verifies that when a span emits more logs
// than MaxLogsPerSpan allows, the recorded span retains roughly the
// oldest half and the newest half of the logs, with a synthetic
// "dropped Span logs" record in the middle describing how many were
// discarded. (Fixes the typo in the former name TestSpan_MaxLogSperSpan.)
func TestSpan_MaxLogsPerSpan(t *testing.T) {
	for _, limit := range []int{1, 2, 3, 5, 10, 15, 20} {
		for _, numLogs := range []int{1, 2, 3, 5, 10, 15, 20, 30, 40, 50} {
			recorder := NewInMemoryRecorder()
			// Tracer that only retains the last <limit> logs.
			tracer := NewWithOptions(Options{
				Recorder:       recorder,
				ShouldSample:   func(traceID uint64) bool { return true }, // always sample
				MaxLogsPerSpan: limit,
			})

			span := tracer.StartSpan("x")
			for i := 0; i < numLogs; i++ {
				span.LogKV("eventIdx", i)
			}
			span.Finish()

			spans := recorder.GetSpans()
			assert.Equal(t, 1, len(spans))
			assert.Equal(t, "x", spans[0].Operation)

			logs := spans[0].Logs
			var firstLogs, lastLogs []opentracing.LogRecord
			if numLogs <= limit {
				// Under the cap: every log is kept verbatim.
				assert.Equal(t, numLogs, len(logs))
				firstLogs = logs
			} else {
				// Over the cap: exactly `limit` records survive, split
				// around the drop marker in the middle.
				assert.Equal(t, limit, len(logs))
				if len(logs) > 0 {
					numOld := (len(logs) - 1) / 2
					firstLogs = logs[:numOld]
					lastLogs = logs[numOld+1:]

					fv := NewLogFieldValidator(t, logs[numOld].Fields)
					fv = fv.ExpectNextFieldEquals("event", reflect.String, "dropped Span logs")
					fv = fv.ExpectNextFieldEquals(
						// One extra log is dropped to make room for the marker.
						"dropped_log_count", reflect.Int, strconv.Itoa(numLogs-limit+1),
					)
					fv.ExpectNextFieldEquals("component", reflect.String, "basictracer")
				}
			}

			// The oldest retained logs must be the first ones emitted...
			for i, lr := range firstLogs {
				fv := NewLogFieldValidator(t, lr.Fields)
				fv.ExpectNextFieldEquals("eventIdx", reflect.Int, strconv.Itoa(i))
			}

			// ...and the newest retained logs must be the last ones emitted.
			for i, lr := range lastLogs {
				fv := NewLogFieldValidator(t, lr.Fields)
				fv.ExpectNextFieldEquals("eventIdx", reflect.Int, strconv.Itoa(numLogs-len(lastLogs)+i))
			}
		}
	}
}
12 changes: 8 additions & 4 deletions testutil_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package basictracer
import (
"fmt"
"reflect"
"runtime"
"testing"

"github.com/opentracing/opentracing-go/log"
Expand Down Expand Up @@ -41,7 +42,8 @@ func NewLogFieldValidator(t *testing.T, fields []log.Field) *LogFieldValidator {
// []Field slices.
func (fv *LogFieldValidator) ExpectNextFieldEquals(key string, kind reflect.Kind, valAsString string) *LogFieldValidator {
if len(fv.fields) < fv.fieldIdx {
fv.t.Errorf("Expecting more than the %v Fields we have", len(fv.fields))
_, file, line, _ := runtime.Caller(1)
fv.t.Errorf("%s:%d Expecting more than the %v Fields we have", file, line, len(fv.fields))
}
fv.nextKey = key
fv.nextKind = kind
Expand Down Expand Up @@ -107,15 +109,17 @@ func (fv *LogFieldValidator) EmitLazyLogger(value log.LazyLogger) {
}

// validateNextField compares the field just emitted against the
// expectations recorded by the preceding ExpectNextFieldEquals call,
// reporting any mismatch as a test error.
func (fv *LogFieldValidator) validateNextField(key string, actualKind reflect.Kind, value interface{}) {
	// Attribute failures to the ExpectNextField caller (4 stack frames
	// up) rather than to this helper.
	_, file, line, _ := runtime.Caller(4)
	if key != fv.nextKey {
		fv.t.Errorf("%s:%d Bad key: expected %q, found %q", file, line, fv.nextKey, key)
	}
	if actualKind != fv.nextKind {
		fv.t.Errorf("%s:%d Bad reflect.Kind: expected %v, found %v", file, line, fv.nextKind, actualKind)
		return
	}
	if got := fmt.Sprint(value); got != fv.nextValAsString {
		fv.t.Errorf("%s:%d Bad value: expected %q, found %q", file, line, fv.nextValAsString, got)
	}
	// All good.
}
13 changes: 12 additions & 1 deletion tracer.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,16 @@ type Options struct {
// DropAllLogs turns log events on all Spans into no-ops.
// If NewSpanEventListener is set, the callbacks will still fire.
DropAllLogs bool
// MaxLogsPerSpan limits the number of Logs in a span (if set to a nonzero
// value). If a span has more logs than this value, logs are dropped as
// necessary (and replaced with a log describing how many were dropped).
//
// About half of the MaxLogsPerSpan logs kept are the oldest logs, and about
// half are the newest logs.
//
// If NewSpanEventListener is set, the callbacks will still fire for all log
// events. This value is ignored if DropAllLogs is true.
MaxLogsPerSpan int
// DebugAssertSingleGoroutine internally records the ID of the goroutine
// creating each Span and verifies that no operation is carried out on
// it on a different goroutine.
Expand Down Expand Up @@ -87,7 +97,8 @@ type Options struct {
// returned object with a Tracer.
func DefaultOptions() Options {
return Options{
ShouldSample: func(traceID uint64) bool { return traceID%64 == 0 },
ShouldSample: func(traceID uint64) bool { return traceID%64 == 0 },
MaxLogsPerSpan: 100,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@RaduBerinde you changed your mind about default being infinite?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, as per the discussion in #38

}
}

Expand Down