Skip to content

Commit

Permalink
Fix the span allocation in the pool (jaegertracing#381)
Browse files Browse the repository at this point in the history
*  Fix the span allocation in the pool

This feature didn't work properly what spawned errors and data overwriting before spans was sent.

```
WARNING: DATA RACE
Write at 0x00c4202223d8 by goroutine 18:
  github.com/uber/jaeger-client-go.(*Tracer).newSpan()
      github.com/uber/jaeger-client-go/tracer.go:357 +0xff
  github.com/uber/jaeger-client-go.(*Tracer).startSpanWithOptions()
      github.com/uber/jaeger-client-go/tracer.go:289 +0xbdd
  github.com/uber/jaeger-client-go.(*Tracer).StartSpan()
      github.com/uber/jaeger-client-go/tracer.go:200 +0x17d
  github.com/uber/jaeger-client-go.TestRemoteReporterAppendWithPollAllocator()
```

Signed-off-by: Dmitry Ponomarev <demdxx@gmail.com>

* Several commits related to pool span allocation and test

* TestRemoteReporterAppendWithPollAllocator => TestRemoteReporterAppendWithPoolAllocator
* Revert tests related to span allocation Retain/Release functions
* See the pull comments jaegertracing#381 (comment)
* Revert the tests to make them work by original logic
* Revert transport and tracer tests
* Fixes after code review
  jaegertracing#381 (review)

Signed-off-by: Dmitry Ponomarev <demdxx@gmail.com>
  • Loading branch information
demdxx authored and yurishkuro committed Apr 23, 2019
1 parent 4a1cede commit 896f2ab
Show file tree
Hide file tree
Showing 10 changed files with 220 additions and 35 deletions.
3 changes: 2 additions & 1 deletion propagation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@ func TestSpanPropagator(t *testing.T) {
child.Finish()
}
sp.Finish()
closer.Close()

otSpans := reporter.GetSpans()
require.Equal(t, len(tests)+1, len(otSpans), "unexpected number of spans reporter")
Expand All @@ -75,6 +74,8 @@ func TestSpanPropagator(t *testing.T) {
spans[i] = s.(*Span)
}

closer.Close()

// The last span is the original one.
exp, spans := spans[len(spans)-1], spans[:len(spans)-1]
exp.duration = time.Duration(123)
Expand Down
18 changes: 13 additions & 5 deletions reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,13 +93,14 @@ func NewInMemoryReporter() *InMemoryReporter {
// Report implements Report() method of Reporter by storing the span in the buffer.
func (r *InMemoryReporter) Report(span *Span) {
r.lock.Lock()
r.spans = append(r.spans, span)
// Need to retain the span otherwise it will be released
r.spans = append(r.spans, span.Retain())
r.lock.Unlock()
}

// Close implements Close() method of Reporter by doing nothing.
// Close implements Close() method of Reporter
func (r *InMemoryReporter) Close() {
// no-op
r.Reset()
}

// SpansSubmitted returns the number of spans accumulated in the buffer.
Expand All @@ -122,7 +123,12 @@ func (r *InMemoryReporter) GetSpans() []opentracing.Span {
func (r *InMemoryReporter) Reset() {
r.lock.Lock()
defer r.lock.Unlock()
r.spans = nil

// Before reset the collection need to release Span memory
for _, span := range r.spans {
span.(*Span).Release()
}
r.spans = r.spans[:0]
}

// ------------------------------
Expand Down Expand Up @@ -218,7 +224,8 @@ func NewRemoteReporter(sender Transport, opts ...ReporterOption) Reporter {
// because some of them may still be successfully added to the queue.
func (r *remoteReporter) Report(span *Span) {
select {
case r.queue <- reporterQueueItem{itemType: reporterQueueItemSpan, span: span}:
// Need to retain the span otherwise it will be released
case r.queue <- reporterQueueItem{itemType: reporterQueueItemSpan, span: span.Retain()}:
atomic.AddInt64(&r.queueLength, 1)
default:
r.metrics.ReporterDropped.Inc(1)
Expand Down Expand Up @@ -278,6 +285,7 @@ func (r *remoteReporter) processQueue() {
// to reduce the number of gauge stats, we only emit queue length on flush
r.metrics.ReporterQueueLength.Update(atomic.LoadInt64(&r.queueLength))
}
span.Release()
case reporterQueueItemClose:
timer.Stop()
flush()
Expand Down
11 changes: 11 additions & 0 deletions reporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,17 @@ func TestRemoteReporterFailedFlushViaAppend(t *testing.T) {
s.assertLogs(t, "ERROR: error reporting span \"sp2\": flush error\nERROR: error when flushing the buffer: flush error\n")
}

func TestRemoteReporterAppendWithPoolAllocator(t *testing.T) {
s := makeReporterSuiteWithSender(t, &fakeSender{bufferSize: 100}, ReporterOptions.BufferFlushInterval(time.Millisecond*10))
TracerOptions.PoolSpans(true)(s.tracer.(*Tracer))
for i := 0; i < 100; i++ {
s.tracer.StartSpan("sp").Finish()
}
time.Sleep(time.Second)
s.sender.assertFlushedSpans(t, 100)
s.close() // causes explicit flush that also fails with the same error
}

func TestRemoteReporterDroppedSpans(t *testing.T) {
s := makeReporterSuite(t, ReporterOptions.QueueSize(1))
defer s.close()
Expand Down
43 changes: 41 additions & 2 deletions span.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package jaeger

import (
"sync"
"sync/atomic"
"time"

"github.com/opentracing/opentracing-go"
Expand All @@ -25,6 +26,10 @@ import (

// Span implements opentracing.Span
type Span struct {
// referenceCounter used to increase the lifetime of
// the object before return it into the pool.
referenceCounter int32

sync.RWMutex

tracer *Tracer
Expand Down Expand Up @@ -174,6 +179,8 @@ func (s *Span) BaggageItem(key string) string {
}

// Finish implements opentracing.Span API
// After finishing the Span object it returns back to the allocator unless the reporter retains it again,
// so after that, the Span object should no longer be used because it won't be valid anymore.
func (s *Span) Finish() {
s.FinishWithOptions(opentracing.FinishOptions{})
}
Expand All @@ -197,6 +204,7 @@ func (s *Span) FinishWithOptions(options opentracing.FinishOptions) {
}
s.Unlock()
// call reportSpan even for non-sampled traces, to return span to the pool
// and update metrics counter
s.tracer.reportSpan(s)
}

Expand Down Expand Up @@ -225,18 +233,49 @@ func (s *Span) OperationName() string {
return s.operationName
}

// Retain increases object counter to increase the lifetime of the object
func (s *Span) Retain() *Span {
atomic.AddInt32(&s.referenceCounter, 1)
return s
}

// Release decrements object counter and return to the
// allocator manager when counter will below zero
func (s *Span) Release() {
if atomic.AddInt32(&s.referenceCounter, -1) == -1 {
s.tracer.spanAllocator.Put(s)
}
}

// reset span state and release unused data
func (s *Span) reset() {
s.firstInProcess = false
s.context = emptyContext
s.operationName = ""
s.tracer = nil
s.startTime = time.Time{}
s.duration = 0
s.observer = nil
atomic.StoreInt32(&s.referenceCounter, 0)

// Note: To reuse memory we can save the pointers on the heap
s.tags = s.tags[:0]
s.logs = s.logs[:0]
s.references = s.references[:0]
}

func (s *Span) serviceName() string {
return s.tracer.serviceName
}

// setSamplingPriority returns true if the flag was updated successfully, false otherwise.
func setSamplingPriority(s *Span, value interface{}) bool {
s.Lock()
defer s.Unlock()
val, ok := value.(uint16)
if !ok {
return false
}
s.Lock()
defer s.Unlock()
if val == 0 {
s.context.flags = s.context.flags & (^flagSampled)
return true
Expand Down
56 changes: 56 additions & 0 deletions span_allocator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
// Copyright (c) 2019 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package jaeger

import "sync"

// SpanAllocator abstraction of managign span allocations
type SpanAllocator interface {
Get() *Span
Put(*Span)
}

type syncPollSpanAllocator struct {
spanPool sync.Pool
}

func newSyncPollSpanAllocator() SpanAllocator {
return &syncPollSpanAllocator{
spanPool: sync.Pool{New: func() interface{} {
return &Span{}
}},
}
}

func (pool *syncPollSpanAllocator) Get() *Span {
return pool.spanPool.Get().(*Span)
}

func (pool *syncPollSpanAllocator) Put(span *Span) {
span.reset()
pool.spanPool.Put(span)
}

type simpleSpanAllocator struct{}

func (pool simpleSpanAllocator) Get() *Span {
return &Span{}
}

func (pool simpleSpanAllocator) Put(span *Span) {
// @comment https://github.com/jaegertracing/jaeger-client-go/pull/381#issuecomment-475904351
// since finished spans are not reused, no need to reset them
// span.reset()
}
48 changes: 48 additions & 0 deletions span_allocator_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
// Copyright (c) 2019 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package jaeger

import "testing"

func BenchmarkSpanAllocator(b *testing.B) {
b.ResetTimer()
b.ReportAllocs()

b.Run("SyncPool", func(b *testing.B) {
benchSpanAllocator(newSyncPollSpanAllocator(), b)
})

b.Run("Simple", func(b *testing.B) {
benchSpanAllocator(simpleSpanAllocator{}, b)
})
}

func benchSpanAllocator(allocator SpanAllocator, b *testing.B) {
b.RunParallel(func(pb *testing.PB) {
queue := make(chan *Span, 1000)
cancel := make(chan bool, 1)
go func() {
for span := range queue {
allocator.Put(span)
}
cancel <- true
}()
for pb.Next() {
queue <- allocator.Get()
}
close(queue)
<-cancel
})
}
23 changes: 22 additions & 1 deletion span_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,10 @@ package jaeger

import (
"sync"
"sync/atomic"
"testing"

"github.com/opentracing/opentracing-go"
opentracing "github.com/opentracing/opentracing-go"
"github.com/opentracing/opentracing-go/ext"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -146,3 +147,23 @@ func TestBaggageContextRace(t *testing.T) {
startWg.Done()
endWg.Wait()
}

func TestSpanLifecycle(t *testing.T) {
service := "DOOP"
tracer, closer := NewTracer(service, NewConstSampler(true), NewNullReporter(), TracerOptions.PoolSpans(true))
// After closing all contexts will be released
defer closer.Close()

sp1 := tracer.StartSpan("s1").(*Span)
sp1.LogEvent("foo")
assert.True(t, sp1.tracer != nil, "invalid span initalisation")

sp1.Retain() // After this we are responsible for the span releasing
assert.Equal(t, int32(1), atomic.LoadInt32(&sp1.referenceCounter))

sp1.Finish() // The span is still alive
assert.True(t, sp1.tracer != nil, "invalid span finishing")

sp1.Release() // Now we will kill the object and return it in the pool
assert.True(t, sp1.tracer == nil, "span must be released")
}
40 changes: 16 additions & 24 deletions tracer.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,15 +47,14 @@ type Tracer struct {
randomNumber func() uint64

options struct {
poolSpans bool
gen128Bit bool // whether to generate 128bit trace IDs
zipkinSharedRPCSpan bool
highTraceIDGenerator func() uint64 // custom high trace ID generator
maxTagValueLength int
// more options to come
}
// pool for Span objects
spanPool sync.Pool
// allocator of Span objects
spanAllocator SpanAllocator

injectors map[interface{}]Injector
extractors map[interface{}]Extractor
Expand All @@ -81,15 +80,13 @@ func NewTracer(
options ...TracerOption,
) (opentracing.Tracer, io.Closer) {
t := &Tracer{
serviceName: serviceName,
sampler: sampler,
reporter: reporter,
injectors: make(map[interface{}]Injector),
extractors: make(map[interface{}]Extractor),
metrics: *NewNullMetrics(),
spanPool: sync.Pool{New: func() interface{} {
return &Span{}
}},
serviceName: serviceName,
sampler: sampler,
reporter: reporter,
injectors: make(map[interface{}]Injector),
extractors: make(map[interface{}]Extractor),
metrics: *NewNullMetrics(),
spanAllocator: simpleSpanAllocator{},
}

for _, option := range options {
Expand Down Expand Up @@ -353,15 +350,7 @@ func (t *Tracer) Tags() []opentracing.Tag {
// newSpan returns an instance of a clean Span object.
// If options.PoolSpans is true, the spans are retrieved from an object pool.
func (t *Tracer) newSpan() *Span {
if !t.options.poolSpans {
return &Span{}
}
sp := t.spanPool.Get().(*Span)
sp.context = emptyContext
sp.tracer = nil
sp.tags = nil
sp.logs = nil
return sp
return t.spanAllocator.Get()
}

func (t *Tracer) startSpanInternal(
Expand Down Expand Up @@ -416,12 +405,15 @@ func (t *Tracer) startSpanInternal(

func (t *Tracer) reportSpan(sp *Span) {
t.metrics.SpansFinished.Inc(1)

// Note: if the reporter is processing Span asynchronously need to Retain() it
// otherwise, in the racing condition will be rewritten span data before it will be sent
// * To remove object use method span.Release()
if sp.context.IsSampled() {
t.reporter.Report(sp)
}
if t.options.poolSpans {
t.spanPool.Put(sp)
}

sp.Release()
}

// randomID generates a random trace/span ID, using tracer.random() generator.
Expand Down
Loading

0 comments on commit 896f2ab

Please sign in to comment.