diff --git a/propagation_test.go b/propagation_test.go index efcadd95..b9298c8a 100644 --- a/propagation_test.go +++ b/propagation_test.go @@ -65,7 +65,6 @@ func TestSpanPropagator(t *testing.T) { child.Finish() } sp.Finish() - closer.Close() otSpans := reporter.GetSpans() require.Equal(t, len(tests)+1, len(otSpans), "unexpected number of spans reporter") @@ -75,6 +74,8 @@ func TestSpanPropagator(t *testing.T) { spans[i] = s.(*Span) } + closer.Close() + // The last span is the original one. exp, spans := spans[len(spans)-1], spans[:len(spans)-1] exp.duration = time.Duration(123) diff --git a/reporter.go b/reporter.go index fe6288c4..27163ebe 100644 --- a/reporter.go +++ b/reporter.go @@ -93,13 +93,14 @@ func NewInMemoryReporter() *InMemoryReporter { // Report implements Report() method of Reporter by storing the span in the buffer. func (r *InMemoryReporter) Report(span *Span) { r.lock.Lock() - r.spans = append(r.spans, span) + // Need to retain the span otherwise it will be released + r.spans = append(r.spans, span.Retain()) r.lock.Unlock() } -// Close implements Close() method of Reporter by doing nothing. +// Close implements Close() method of Reporter by releasing the buffered spans. func (r *InMemoryReporter) Close() { - // no-op + r.Reset() } // SpansSubmitted returns the number of spans accumulated in the buffer. @@ -122,7 +123,12 @@ func (r *InMemoryReporter) GetSpans() []opentracing.Span { func (r *InMemoryReporter) Reset() { r.lock.Lock() defer r.lock.Unlock() - r.spans = nil + + // Before resetting the collection, release the Span memory + for _, span := range r.spans { + span.(*Span).Release() + } + r.spans = r.spans[:0] } // ------------------------------ @@ -218,7 +224,8 @@ func NewRemoteReporter(sender Transport, opts ...ReporterOption) Reporter { // because some of them may still be successfully added to the queue. 
func (r *remoteReporter) Report(span *Span) { select { - case r.queue <- reporterQueueItem{itemType: reporterQueueItemSpan, span: span}: + // Need to retain the span otherwise it will be released + case r.queue <- reporterQueueItem{itemType: reporterQueueItemSpan, span: span.Retain()}: atomic.AddInt64(&r.queueLength, 1) default: r.metrics.ReporterDropped.Inc(1) @@ -278,6 +285,7 @@ func (r *remoteReporter) processQueue() { // to reduce the number of gauge stats, we only emit queue length on flush r.metrics.ReporterQueueLength.Update(atomic.LoadInt64(&r.queueLength)) } + span.Release() case reporterQueueItemClose: timer.Stop() flush() diff --git a/reporter_test.go b/reporter_test.go index cd465e5a..29da6ee2 100644 --- a/reporter_test.go +++ b/reporter_test.go @@ -143,6 +143,17 @@ func TestRemoteReporterFailedFlushViaAppend(t *testing.T) { s.assertLogs(t, "ERROR: error reporting span \"sp2\": flush error\nERROR: error when flushing the buffer: flush error\n") } +func TestRemoteReporterAppendWithPoolAllocator(t *testing.T) { + s := makeReporterSuiteWithSender(t, &fakeSender{bufferSize: 100}, ReporterOptions.BufferFlushInterval(time.Millisecond*10)) + TracerOptions.PoolSpans(true)(s.tracer.(*Tracer)) + for i := 0; i < 100; i++ { + s.tracer.StartSpan("sp").Finish() + } + time.Sleep(time.Second) + s.sender.assertFlushedSpans(t, 100) + s.close() // causes explicit flush that also fails with the same error +} + func TestRemoteReporterDroppedSpans(t *testing.T) { s := makeReporterSuite(t, ReporterOptions.QueueSize(1)) defer s.close() diff --git a/span.go b/span.go index f0b497a9..89195f14 100644 --- a/span.go +++ b/span.go @@ -16,6 +16,7 @@ package jaeger import ( "sync" + "sync/atomic" "time" "github.com/opentracing/opentracing-go" @@ -25,6 +26,10 @@ import ( // Span implements opentracing.Span type Span struct { + // referenceCounter used to increase the lifetime of + // the object before return it into the pool. 
+ referenceCounter int32 + sync.RWMutex tracer *Tracer @@ -174,6 +179,8 @@ func (s *Span) BaggageItem(key string) string { } // Finish implements opentracing.Span API +// After finishing, the Span object is returned to the allocator unless the reporter retains it again, +// so after that, the Span object should no longer be used because it won't be valid anymore. func (s *Span) Finish() { s.FinishWithOptions(opentracing.FinishOptions{}) } @@ -197,6 +204,7 @@ func (s *Span) FinishWithOptions(options opentracing.FinishOptions) { } s.Unlock() // call reportSpan even for non-sampled traces, to return span to the pool + // and update metrics counter s.tracer.reportSpan(s) } @@ -225,18 +233,49 @@ func (s *Span) OperationName() string { return s.operationName } +// Retain increases object counter to increase the lifetime of the object +func (s *Span) Retain() *Span { + atomic.AddInt32(&s.referenceCounter, 1) + return s +} + +// Release decrements object counter and returns the span to the +// allocator manager when the counter drops below zero +func (s *Span) Release() { + if atomic.AddInt32(&s.referenceCounter, -1) == -1 { + s.tracer.spanAllocator.Put(s) + } +} + +// reset span state and release unused data +func (s *Span) reset() { + s.firstInProcess = false + s.context = emptyContext + s.operationName = "" + s.tracer = nil + s.startTime = time.Time{} + s.duration = 0 + s.observer = nil + atomic.StoreInt32(&s.referenceCounter, 0) + + // Note: to reuse memory, keep the backing arrays and only truncate the slices + s.tags = s.tags[:0] + s.logs = s.logs[:0] + s.references = s.references[:0] +} + func (s *Span) serviceName() string { return s.tracer.serviceName } // setSamplingPriority returns true if the flag was updated successfully, false otherwise. 
func setSamplingPriority(s *Span, value interface{}) bool { - s.Lock() - defer s.Unlock() val, ok := value.(uint16) if !ok { return false } + s.Lock() + defer s.Unlock() if val == 0 { s.context.flags = s.context.flags & (^flagSampled) return true diff --git a/span_allocator.go b/span_allocator.go new file mode 100644 index 00000000..6fe0cd0c --- /dev/null +++ b/span_allocator.go @@ -0,0 +1,56 @@ +// Copyright (c) 2019 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package jaeger + +import "sync" + +// SpanAllocator is an abstraction for managing span allocations +type SpanAllocator interface { + Get() *Span + Put(*Span) +} + +type syncPollSpanAllocator struct { + spanPool sync.Pool +} + +func newSyncPollSpanAllocator() SpanAllocator { + return &syncPollSpanAllocator{ + spanPool: sync.Pool{New: func() interface{} { + return &Span{} + }}, + } +} + +func (pool *syncPollSpanAllocator) Get() *Span { + return pool.spanPool.Get().(*Span) +} + +func (pool *syncPollSpanAllocator) Put(span *Span) { + span.reset() + pool.spanPool.Put(span) +} + +type simpleSpanAllocator struct{} + +func (pool simpleSpanAllocator) Get() *Span { + return &Span{} +} + +func (pool simpleSpanAllocator) Put(span *Span) { + // @comment https://github.com/jaegertracing/jaeger-client-go/pull/381#issuecomment-475904351 + // since finished spans are not reused, no need to reset them + // span.reset() +} diff --git a/span_allocator_test.go b/span_allocator_test.go new file mode 100644 index 00000000..23595dc8 --- /dev/null +++ b/span_allocator_test.go @@ -0,0 +1,48 @@ +// Copyright (c) 2019 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package jaeger + +import "testing" + +func BenchmarkSpanAllocator(b *testing.B) { + b.ResetTimer() + b.ReportAllocs() + + b.Run("SyncPool", func(b *testing.B) { + benchSpanAllocator(newSyncPollSpanAllocator(), b) + }) + + b.Run("Simple", func(b *testing.B) { + benchSpanAllocator(simpleSpanAllocator{}, b) + }) +} + +func benchSpanAllocator(allocator SpanAllocator, b *testing.B) { + b.RunParallel(func(pb *testing.PB) { + queue := make(chan *Span, 1000) + cancel := make(chan bool, 1) + go func() { + for span := range queue { + allocator.Put(span) + } + cancel <- true + }() + for pb.Next() { + queue <- allocator.Get() + } + close(queue) + <-cancel + }) +} diff --git a/span_test.go b/span_test.go index d44cb1dc..89d6c83d 100644 --- a/span_test.go +++ b/span_test.go @@ -16,9 +16,10 @@ package jaeger import ( "sync" + "sync/atomic" "testing" - "github.com/opentracing/opentracing-go" + opentracing "github.com/opentracing/opentracing-go" "github.com/opentracing/opentracing-go/ext" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -146,3 +147,23 @@ func TestBaggageContextRace(t *testing.T) { startWg.Done() endWg.Wait() } + +func TestSpanLifecycle(t *testing.T) { + service := "DOOP" + tracer, closer := NewTracer(service, NewConstSampler(true), NewNullReporter(), TracerOptions.PoolSpans(true)) + // After closing all contexts will be released + defer closer.Close() + + sp1 := tracer.StartSpan("s1").(*Span) + sp1.LogEvent("foo") + assert.True(t, sp1.tracer != nil, "invalid span initalisation") + + sp1.Retain() // After this we are responsible for the span releasing + assert.Equal(t, int32(1), atomic.LoadInt32(&sp1.referenceCounter)) + + sp1.Finish() // The span is still alive + assert.True(t, sp1.tracer != nil, "invalid span finishing") + + sp1.Release() // Now we will kill the object and return it in the pool + assert.True(t, sp1.tracer == nil, "span must be released") +} diff --git a/tracer.go b/tracer.go index d87fb10b..a457a741 100644 --- 
a/tracer.go +++ b/tracer.go @@ -47,15 +47,14 @@ type Tracer struct { randomNumber func() uint64 options struct { - poolSpans bool gen128Bit bool // whether to generate 128bit trace IDs zipkinSharedRPCSpan bool highTraceIDGenerator func() uint64 // custom high trace ID generator maxTagValueLength int // more options to come } - // pool for Span objects - spanPool sync.Pool + // allocator of Span objects + spanAllocator SpanAllocator injectors map[interface{}]Injector extractors map[interface{}]Extractor @@ -81,15 +80,13 @@ func NewTracer( options ...TracerOption, ) (opentracing.Tracer, io.Closer) { t := &Tracer{ - serviceName: serviceName, - sampler: sampler, - reporter: reporter, - injectors: make(map[interface{}]Injector), - extractors: make(map[interface{}]Extractor), - metrics: *NewNullMetrics(), - spanPool: sync.Pool{New: func() interface{} { - return &Span{} - }}, + serviceName: serviceName, + sampler: sampler, + reporter: reporter, + injectors: make(map[interface{}]Injector), + extractors: make(map[interface{}]Extractor), + metrics: *NewNullMetrics(), + spanAllocator: simpleSpanAllocator{}, } for _, option := range options { @@ -353,15 +350,7 @@ func (t *Tracer) Tags() []opentracing.Tag { // newSpan returns an instance of a clean Span object. // If options.PoolSpans is true, the spans are retrieved from an object pool. 
func (t *Tracer) newSpan() *Span { - if !t.options.poolSpans { - return &Span{} - } - sp := t.spanPool.Get().(*Span) - sp.context = emptyContext - sp.tracer = nil - sp.tags = nil - sp.logs = nil - return sp + return t.spanAllocator.Get() } func (t *Tracer) startSpanInternal( @@ -416,12 +405,15 @@ func (t *Tracer) startSpanInternal( func (t *Tracer) reportSpan(sp *Span) { t.metrics.SpansFinished.Inc(1) + + // Note: if the reporter processes the Span asynchronously it must Retain() it, + // otherwise a race condition may overwrite the span data before it is sent. + // * To release the object use method span.Release() if sp.context.IsSampled() { t.reporter.Report(sp) } - if t.options.poolSpans { - t.spanPool.Put(sp) - } + + sp.Release() } // randomID generates a random trace/span ID, using tracer.random() generator. diff --git a/tracer_options.go b/tracer_options.go index b4176cc7..ecb17276 100644 --- a/tracer_options.go +++ b/tracer_options.go @@ -81,7 +81,11 @@ func (tracerOptions) RandomNumber(randomNumber func() uint64) TracerOption { // that can access parent spans after those spans have been finished. 
func (tracerOptions) PoolSpans(poolSpans bool) TracerOption { return func(tracer *Tracer) { - tracer.options.poolSpans = poolSpans + if poolSpans { + tracer.spanAllocator = newSyncPollSpanAllocator() + } else { + tracer.spanAllocator = simpleSpanAllocator{} + } } } diff --git a/tracer_test.go b/tracer_test.go index b1e9ed53..ab877509 100644 --- a/tracer_test.go +++ b/tracer_test.go @@ -50,6 +50,7 @@ func (s *tracerSuite) SetupTest() { TracerOptions.Metrics(metrics), TracerOptions.ZipkinSharedRPCSpan(true), TracerOptions.BaggageRestrictionManager(baggage.NewDefaultRestrictionManager(0)), + TracerOptions.PoolSpans(false), ) s.NotNil(s.tracer) } @@ -246,6 +247,10 @@ func TestTracerOptions(t *testing.T) { rnd := func() uint64 { return 1 } + isPoolAllocator := func(allocator SpanAllocator) bool { + _, ok := allocator.(*syncPollSpanAllocator) + return ok + } openTracer, closer := NewTracer("DOOP", // respect the classics, man! NewConstSampler(true), @@ -264,7 +269,7 @@ func TestTracerOptions(t *testing.T) { assert.Equal(t, uint64(1), tracer.randomNumber()) assert.Equal(t, uint64(1), tracer.randomNumber()) assert.Equal(t, uint64(1), tracer.randomNumber()) // always 1 - assert.Equal(t, true, tracer.options.poolSpans) + assert.Equal(t, true, isPoolAllocator(tracer.spanAllocator)) assert.Equal(t, opentracing.Tag{Key: "tag_key", Value: "tag_value"}, tracer.Tags()[0]) }