Skip to content
This repository has been archived by the owner on May 23, 2024. It is now read-only.

Fix the span allocation in the pool #381

Merged
merged 2 commits into from
Apr 23, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion propagation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@ func TestSpanPropagator(t *testing.T) {
child.Finish()
}
sp.Finish()
closer.Close()

otSpans := reporter.GetSpans()
require.Equal(t, len(tests)+1, len(otSpans), "unexpected number of spans reporter")
Expand All @@ -75,6 +74,8 @@ func TestSpanPropagator(t *testing.T) {
spans[i] = s.(*Span)
}

closer.Close()

// The last span is the original one.
exp, spans := spans[len(spans)-1], spans[:len(spans)-1]
exp.duration = time.Duration(123)
Expand Down
18 changes: 13 additions & 5 deletions reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,13 +93,14 @@ func NewInMemoryReporter() *InMemoryReporter {
// Report implements Report() method of Reporter by storing the span in the buffer.
func (r *InMemoryReporter) Report(span *Span) {
r.lock.Lock()
r.spans = append(r.spans, span)
// Need to retain the span otherwise it will be released
r.spans = append(r.spans, span.Retain())
r.lock.Unlock()
}

// Close implements Close() method of Reporter by doing nothing.
// Close implements Close() method of Reporter
func (r *InMemoryReporter) Close() {
// no-op
r.Reset()
}

// SpansSubmitted returns the number of spans accumulated in the buffer.
Expand All @@ -122,7 +123,12 @@ func (r *InMemoryReporter) GetSpans() []opentracing.Span {
func (r *InMemoryReporter) Reset() {
r.lock.Lock()
defer r.lock.Unlock()
r.spans = nil

// Before reset the collection need to release Span memory
for _, span := range r.spans {
span.(*Span).Release()
}
r.spans = r.spans[:0]
}

// ------------------------------
Expand Down Expand Up @@ -218,7 +224,8 @@ func NewRemoteReporter(sender Transport, opts ...ReporterOption) Reporter {
// because some of them may still be successfully added to the queue.
func (r *remoteReporter) Report(span *Span) {
select {
case r.queue <- reporterQueueItem{itemType: reporterQueueItemSpan, span: span}:
// Need to retain the span otherwise it will be released
case r.queue <- reporterQueueItem{itemType: reporterQueueItemSpan, span: span.Retain()}:
atomic.AddInt64(&r.queueLength, 1)
default:
r.metrics.ReporterDropped.Inc(1)
Expand Down Expand Up @@ -278,6 +285,7 @@ func (r *remoteReporter) processQueue() {
// to reduce the number of gauge stats, we only emit queue length on flush
r.metrics.ReporterQueueLength.Update(atomic.LoadInt64(&r.queueLength))
}
span.Release()
case reporterQueueItemClose:
timer.Stop()
flush()
Expand Down
11 changes: 11 additions & 0 deletions reporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,17 @@ func TestRemoteReporterFailedFlushViaAppend(t *testing.T) {
s.assertLogs(t, "ERROR: error reporting span \"sp2\": flush error\nERROR: error when flushing the buffer: flush error\n")
}

func TestRemoteReporterAppendWithPoolAllocator(t *testing.T) {
s := makeReporterSuiteWithSender(t, &fakeSender{bufferSize: 100}, ReporterOptions.BufferFlushInterval(time.Millisecond*10))
TracerOptions.PoolSpans(true)(s.tracer.(*Tracer))
for i := 0; i < 100; i++ {
s.tracer.StartSpan("sp").Finish()
}
time.Sleep(time.Second)
s.sender.assertFlushedSpans(t, 100)
s.close() // causes explicit flush that also fails with the same error
}

func TestRemoteReporterDroppedSpans(t *testing.T) {
s := makeReporterSuite(t, ReporterOptions.QueueSize(1))
defer s.close()
Expand Down
43 changes: 41 additions & 2 deletions span.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package jaeger

import (
"sync"
"sync/atomic"
"time"

"github.com/opentracing/opentracing-go"
Expand All @@ -25,6 +26,10 @@ import (

// Span implements opentracing.Span
type Span struct {
// referenceCounter used to increase the lifetime of
// the object before return it into the pool.
referenceCounter int32

sync.RWMutex

tracer *Tracer
Expand Down Expand Up @@ -174,6 +179,8 @@ func (s *Span) BaggageItem(key string) string {
}

// Finish implements opentracing.Span API
// After finishing the Span object it returns back to the allocator unless the reporter retains it again,
// so after that, the Span object should no longer be used because it won't be valid anymore.
func (s *Span) Finish() {
s.FinishWithOptions(opentracing.FinishOptions{})
}
Expand All @@ -197,6 +204,7 @@ func (s *Span) FinishWithOptions(options opentracing.FinishOptions) {
}
s.Unlock()
// call reportSpan even for non-sampled traces, to return span to the pool
// and update metrics counter
s.tracer.reportSpan(s)
}

Expand Down Expand Up @@ -225,18 +233,49 @@ func (s *Span) OperationName() string {
return s.operationName
}

// Retain increases object counter to increase the lifetime of the object
func (s *Span) Retain() *Span {
atomic.AddInt32(&s.referenceCounter, 1)
return s
}

// Release decrements object counter and return to the
// allocator manager when counter will below zero
func (s *Span) Release() {
if atomic.AddInt32(&s.referenceCounter, -1) == -1 {
s.tracer.spanAllocator.Put(s)
}
}

// reset span state and release unused data
func (s *Span) reset() {
s.firstInProcess = false
s.context = emptyContext
s.operationName = ""
s.tracer = nil
s.startTime = time.Time{}
s.duration = 0
s.observer = nil
atomic.StoreInt32(&s.referenceCounter, 0)

// Note: To reuse memory we can save the pointers on the heap
s.tags = s.tags[:0]
s.logs = s.logs[:0]
s.references = s.references[:0]
}

func (s *Span) serviceName() string {
return s.tracer.serviceName
}

// setSamplingPriority returns true if the flag was updated successfully, false otherwise.
func setSamplingPriority(s *Span, value interface{}) bool {
s.Lock()
defer s.Unlock()
val, ok := value.(uint16)
if !ok {
return false
}
s.Lock()
defer s.Unlock()
if val == 0 {
s.context.flags = s.context.flags & (^flagSampled)
return true
Expand Down
56 changes: 56 additions & 0 deletions span_allocator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
// Copyright (c) 2019 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package jaeger

import "sync"

// SpanAllocator abstraction of managign span allocations
type SpanAllocator interface {
Get() *Span
Put(*Span)
}

type syncPollSpanAllocator struct {
spanPool sync.Pool
}

func newSyncPollSpanAllocator() SpanAllocator {
return &syncPollSpanAllocator{
spanPool: sync.Pool{New: func() interface{} {
return &Span{}
}},
}
}

func (pool *syncPollSpanAllocator) Get() *Span {
return pool.spanPool.Get().(*Span)
}

func (pool *syncPollSpanAllocator) Put(span *Span) {
span.reset()
pool.spanPool.Put(span)
}

type simpleSpanAllocator struct{}

func (pool simpleSpanAllocator) Get() *Span {
return &Span{}
}

func (pool simpleSpanAllocator) Put(span *Span) {
// @comment https://github.com/jaegertracing/jaeger-client-go/pull/381#issuecomment-475904351
// since finished spans are not reused, no need to reset them
// span.reset()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not sure what the link is trying to say. I would add a comment // since finished spans are not reused, no need to reset them

}
48 changes: 48 additions & 0 deletions span_allocator_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
// Copyright (c) 2019 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package jaeger

import "testing"

func BenchmarkSpanAllocator(b *testing.B) {
b.ResetTimer()
b.ReportAllocs()

b.Run("SyncPool", func(b *testing.B) {
benchSpanAllocator(newSyncPollSpanAllocator(), b)
})

b.Run("Simple", func(b *testing.B) {
benchSpanAllocator(simpleSpanAllocator{}, b)
})
}

func benchSpanAllocator(allocator SpanAllocator, b *testing.B) {
b.RunParallel(func(pb *testing.PB) {
queue := make(chan *Span, 1000)
cancel := make(chan bool, 1)
go func() {
for span := range queue {
allocator.Put(span)
}
cancel <- true
}()
for pb.Next() {
queue <- allocator.Get()
}
close(queue)
<-cancel
})
}
23 changes: 22 additions & 1 deletion span_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,10 @@ package jaeger

import (
"sync"
"sync/atomic"
"testing"

"github.com/opentracing/opentracing-go"
opentracing "github.com/opentracing/opentracing-go"
"github.com/opentracing/opentracing-go/ext"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -146,3 +147,23 @@ func TestBaggageContextRace(t *testing.T) {
startWg.Done()
endWg.Wait()
}

func TestSpanLifecycle(t *testing.T) {
service := "DOOP"
tracer, closer := NewTracer(service, NewConstSampler(true), NewNullReporter(), TracerOptions.PoolSpans(true))
// After closing all contexts will be released
defer closer.Close()

sp1 := tracer.StartSpan("s1").(*Span)
sp1.LogEvent("foo")
assert.True(t, sp1.tracer != nil, "invalid span initalisation")

sp1.Retain() // After this we are responsible for the span releasing
assert.Equal(t, int32(1), atomic.LoadInt32(&sp1.referenceCounter))

sp1.Finish() // The span is still alive
assert.True(t, sp1.tracer != nil, "invalid span finishing")

sp1.Release() // Now we will kill the object and return it in the pool
assert.True(t, sp1.tracer == nil, "span must be released")
}
40 changes: 16 additions & 24 deletions tracer.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,15 +47,14 @@ type Tracer struct {
randomNumber func() uint64

options struct {
poolSpans bool
gen128Bit bool // whether to generate 128bit trace IDs
zipkinSharedRPCSpan bool
highTraceIDGenerator func() uint64 // custom high trace ID generator
maxTagValueLength int
// more options to come
}
// pool for Span objects
spanPool sync.Pool
// allocator of Span objects
spanAllocator SpanAllocator

injectors map[interface{}]Injector
extractors map[interface{}]Extractor
Expand All @@ -81,15 +80,13 @@ func NewTracer(
options ...TracerOption,
) (opentracing.Tracer, io.Closer) {
t := &Tracer{
serviceName: serviceName,
sampler: sampler,
reporter: reporter,
injectors: make(map[interface{}]Injector),
extractors: make(map[interface{}]Extractor),
metrics: *NewNullMetrics(),
spanPool: sync.Pool{New: func() interface{} {
return &Span{}
}},
serviceName: serviceName,
sampler: sampler,
reporter: reporter,
injectors: make(map[interface{}]Injector),
extractors: make(map[interface{}]Extractor),
metrics: *NewNullMetrics(),
spanAllocator: simpleSpanAllocator{},
}

for _, option := range options {
Expand Down Expand Up @@ -349,15 +346,7 @@ func (t *Tracer) Tags() []opentracing.Tag {
// newSpan returns an instance of a clean Span object.
// If options.PoolSpans is true, the spans are retrieved from an object pool.
func (t *Tracer) newSpan() *Span {
if !t.options.poolSpans {
return &Span{}
}
sp := t.spanPool.Get().(*Span)
sp.context = emptyContext
sp.tracer = nil
sp.tags = nil
sp.logs = nil
return sp
return t.spanAllocator.Get()
}

func (t *Tracer) startSpanInternal(
Expand Down Expand Up @@ -412,12 +401,15 @@ func (t *Tracer) startSpanInternal(

func (t *Tracer) reportSpan(sp *Span) {
t.metrics.SpansFinished.Inc(1)

// Note: if the reporter is processing Span asynchronously need to Retain() it
// otherwise, in the racing condition will be rewritten span data before it will be sent
// * To remove object use method span.Release()
if sp.context.IsSampled() {
t.reporter.Report(sp)
}
if t.options.poolSpans {
t.spanPool.Put(sp)
}

sp.Release()
}

// randomID generates a random trace/span ID, using tracer.random() generator.
Expand Down
Loading