Skip to content
This repository has been archived by the owner on May 23, 2024. It is now read-only.

Commit

Permalink
Fix the span allocation in the pool
Browse files Browse the repository at this point in the history
This feature didn't work properly what spawned errors and data overwriting before spans was sent.

```
WARNING: DATA RACE
Write at 0x00c4202223d8 by goroutine 18:
  github.com/uber/jaeger-client-go.(*Tracer).newSpan()
      github.com/uber/jaeger-client-go/tracer.go:357 +0xff
  github.com/uber/jaeger-client-go.(*Tracer).startSpanWithOptions()
      github.com/uber/jaeger-client-go/tracer.go:289 +0xbdd
  github.com/uber/jaeger-client-go.(*Tracer).StartSpan()
      github.com/uber/jaeger-client-go/tracer.go:200 +0x17d
  github.com/uber/jaeger-client-go.TestRemoteReporterAppendWithPollAllocator()
```

Signed-off-by: Dmitry Ponomarev <demdxx@gmail.com>
  • Loading branch information
demdxx committed Mar 16, 2019
1 parent f4d58ba commit e06d0b2
Show file tree
Hide file tree
Showing 14 changed files with 330 additions and 85 deletions.
33 changes: 26 additions & 7 deletions jaeger_thrift_span_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,11 +52,17 @@ func TestBuildJaegerThrift(t *testing.T) {
ext.PeerService.Set(sp1, "svc")
sp2 := tracer.StartSpan("sp2", opentracing.ChildOf(sp1.Context())).(*Span)
ext.SpanKindRPCClient.Set(sp2)
sp2.Finish()
sp1.Finish()

// NOTE: method Finish releas the memory or return span into the pool
sp2.Retain().Finish()
sp1.Retain().Finish()

jaegerSpan1 := BuildJaegerThrift(sp1)
jaegerSpan2 := BuildJaegerThrift(sp2)

sp2.Release()
sp1.Release()

assert.Equal(t, "sp1", jaegerSpan1.OperationName)
assert.Equal(t, "sp2", jaegerSpan2.OperationName)
assert.EqualValues(t, 0, jaegerSpan1.ParentSpanId)
Expand Down Expand Up @@ -86,7 +92,8 @@ func TestBuildJaegerProcessThrift(t *testing.T) {
defer closer.Close()

sp := tracer.StartSpan("sp1").(*Span)
sp.Finish()
sp.Retain().Finish() // Need to release the memory after the converting into the Thrift object
defer sp.Release()

process := BuildJaegerProcessThrift(sp)
assert.Equal(t, process.ServiceName, "DOOP")
Expand Down Expand Up @@ -239,7 +246,17 @@ func TestBuildLogs(t *testing.T) {
}
for i, test := range tests {
testName := fmt.Sprintf("test-%02d", i)
sp := tracer.StartSpan(testName, opentracing.ChildOf(root.Context()))
sp := tracer.StartSpan(testName, opentracing.ChildOf(root.Context())).(*Span)

// Note: some of the tests run span Finish function which releases the allocated
// span object So we have to retain the object do not destroy it in memory prematurely
sp.Retain(1)

// Before release reduce the counter
defer func() {
sp.Retain(-1).Release()
}()

if test.disableSampling {
ext.SamplingPriority.Set(sp, 0)
}
Expand All @@ -248,7 +265,7 @@ func TestBuildLogs(t *testing.T) {
} else if test.field != (log.Field{}) {
sp.LogFields(test.field)
}
jaegerSpan := BuildJaegerThrift(sp.(*Span))
jaegerSpan := BuildJaegerThrift(sp)
if test.disableSampling {
assert.Equal(t, 0, len(jaegerSpan.Logs), testName)
continue
Expand Down Expand Up @@ -318,7 +335,8 @@ func TestJaegerSpanBaggageLogs(t *testing.T) {
sp := tracer.StartSpan("s1").(*Span)
sp.SetBaggageItem("auth.token", "token")
ext.SpanKindRPCServer.Set(sp)
sp.Finish()
sp.Retain().Finish()
defer sp.Release()

jaegerSpan := BuildJaegerThrift(sp)
require.Len(t, jaegerSpan.Logs, 1)
Expand Down Expand Up @@ -350,7 +368,8 @@ func TestJaegerMaxTagValueLength(t *testing.T) {
sp := tracer.StartSpan("s1").(*Span)
sp.SetTag("tag.string", string(test.value))
sp.SetTag("tag.bytes", test.value)
sp.Finish()
sp.Retain().Finish()
defer sp.Release()
thriftSpan := BuildJaegerThrift(sp)
stringTag := findTag(thriftSpan, "tag.string")
assert.Equal(t, j.TagType_STRING, stringTag.VType)
Expand Down
13 changes: 11 additions & 2 deletions propagation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,17 @@ func TestSpanPropagator(t *testing.T) {
const op = "test"
reporter := NewInMemoryReporter()
metricsFactory, metrics := initMetrics()
tracer, closer := NewTracer("x", NewConstSampler(true), reporter, TracerOptions.Metrics(metrics), TracerOptions.ZipkinSharedRPCSpan(true))
tracer, closer := NewTracer("x",
NewConstSampler(true),
reporter,
TracerOptions.Metrics(metrics),
TracerOptions.ZipkinSharedRPCSpan(true),
)

// Note: the closing of the main tracer object Close also and
// related containers as reporter, sampler, logger & etc.
// So need to close it after related containers no longer used
defer closer.Close()

mapc := opentracing.TextMapCarrier(make(map[string]string))
httpc := opentracing.HTTPHeadersCarrier(http.Header{})
Expand Down Expand Up @@ -65,7 +75,6 @@ func TestSpanPropagator(t *testing.T) {
child.Finish()
}
sp.Finish()
closer.Close()

otSpans := reporter.GetSpans()
require.Equal(t, len(tests)+1, len(otSpans), "unexpected number of spans reporter")
Expand Down
20 changes: 15 additions & 5 deletions reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,13 +93,14 @@ func NewInMemoryReporter() *InMemoryReporter {
// Report implements Report() method of Reporter by storing the span in the buffer.
func (r *InMemoryReporter) Report(span *Span) {
r.lock.Lock()
r.spans = append(r.spans, span)
// Need to retain the span otherwise it will be released
r.spans = append(r.spans, span.Retain())
r.lock.Unlock()
}

// Close implements Close() method of Reporter by doing nothing.
// Close implements Close() method of Reporter
func (r *InMemoryReporter) Close() {
// no-op
r.Reset()
}

// SpansSubmitted returns the number of spans accumulated in the buffer.
Expand All @@ -122,7 +123,14 @@ func (r *InMemoryReporter) GetSpans() []opentracing.Span {
func (r *InMemoryReporter) Reset() {
r.lock.Lock()
defer r.lock.Unlock()
r.spans = nil

if len(r.spans) > 0 {
// Before reset the collection need to release Span memory
for _, span := range r.spans {
span.(*Span).Release()
}
r.spans = r.spans[:0]
}
}

// ------------------------------
Expand Down Expand Up @@ -218,7 +226,8 @@ func NewRemoteReporter(sender Transport, opts ...ReporterOption) Reporter {
// because some of them may still be successfully added to the queue.
func (r *remoteReporter) Report(span *Span) {
select {
case r.queue <- reporterQueueItem{itemType: reporterQueueItemSpan, span: span}:
// Need to retain the span otherwise it will be released
case r.queue <- reporterQueueItem{itemType: reporterQueueItemSpan, span: span.Retain()}:
atomic.AddInt64(&r.queueLength, 1)
default:
r.metrics.ReporterDropped.Inc(1)
Expand Down Expand Up @@ -278,6 +287,7 @@ func (r *remoteReporter) processQueue() {
// to reduce the number of gauge stats, we only emit queue length on flush
r.metrics.ReporterQueueLength.Update(atomic.LoadInt64(&r.queueLength))
}
span.Release()
case reporterQueueItemClose:
timer.Stop()
flush()
Expand Down
15 changes: 15 additions & 0 deletions reporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,17 @@ func TestRemoteReporterFailedFlushViaAppend(t *testing.T) {
s.assertLogs(t, "ERROR: error reporting span \"sp2\": flush error\nERROR: error when flushing the buffer: flush error\n")
}

func TestRemoteReporterAppendWithPollAllocator(t *testing.T) {
s := makeReporterSuiteWithSender(t, &fakeSender{bufferSize: 10}, ReporterOptions.BufferFlushInterval(time.Millisecond*10))
TracerOptions.PoolSpans(true)(s.tracer.(*Tracer))
for i := 0; i < 1000; i++ {
s.tracer.StartSpan("sp").Finish()
}
time.Sleep(time.Second)
s.sender.assertFlushedSpans(t, 1000)
s.close() // causes explicit flush that also fails with the same error
}

func TestRemoteReporterDroppedSpans(t *testing.T) {
s := makeReporterSuite(t, ReporterOptions.QueueSize(1))
defer s.close()
Expand Down Expand Up @@ -301,6 +312,10 @@ func (s *fakeSender) Append(span *Span) (int, error) {
s.mutex.Lock()
defer s.mutex.Unlock()

// Validation of span
if span.tracer == nil {
return 0, s.appendErr
}
s.spans = append(s.spans, span)
if n := len(s.spans); n == s.bufferSize {
return s.flushNoLock()
Expand Down
57 changes: 57 additions & 0 deletions span.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package jaeger

import (
"sync"
"sync/atomic"
"time"

"github.com/opentracing/opentracing-go"
Expand Down Expand Up @@ -57,6 +58,10 @@ type Span struct {
references []Reference

observer ContribSpanObserver

// retainCounter used to increase the lifetime of
// the object before return it into the pool.
retainCounter int32
}

// Tag is a simple key value wrapper.
Expand Down Expand Up @@ -174,6 +179,9 @@ func (s *Span) BaggageItem(key string) string {
}

// Finish implements opentracing.Span API
// After finishing of the Span object it returns back to the allocator
// so after that, the Span object should no longer be used because
// it won't be valid anymore.
func (s *Span) Finish() {
s.FinishWithOptions(opentracing.FinishOptions{})
}
Expand All @@ -197,6 +205,7 @@ func (s *Span) FinishWithOptions(options opentracing.FinishOptions) {
}
s.Unlock()
// call reportSpan even for non-sampled traces, to return span to the pool
// and update metrics counter
s.tracer.reportSpan(s)
}

Expand Down Expand Up @@ -225,6 +234,54 @@ func (s *Span) OperationName() string {
return s.operationName
}

// Retain increases object counter to increase the lifetime of the object
func (s *Span) Retain(count ...int32) *Span {
counter := int32(1)
if len(count) > 0 && count[0] != 0 {
counter = count[0]
}
atomic.AddInt32(&s.retainCounter, counter)
return s
}

// Release decrements object counter and return to the
// allocator manager when counter will below zero
func (s *Span) Release(count ...int32) {
counter := int32(-1)
if len(count) > 0 && count[0] != 0 {
counter = -count[0]
}

if atomic.AddInt32(&s.retainCounter, counter) < 0 {
if tr := s.tracer; tr != nil {
tr.spanAllocator.Put(s)
}
}
}

// reset span state and release unused data
func (s *Span) reset() {
s.firstInProcess = false
s.context = emptyContext
s.operationName = ""
s.tracer = nil
s.startTime = time.Time{}
s.duration = 0
s.observer = nil
atomic.StoreInt32(&s.retainCounter, 0)

// Note: To reuse memory we can save the pointers on the heap
if len(s.tags) > 0 {
s.tags = s.tags[:0]
}
if len(s.logs) > 0 {
s.logs = s.logs[:0]
}
if len(s.references) > 0 {
s.references = s.references[:0]
}
}

func (s *Span) serviceName() string {
return s.tracer.serviceName
}
Expand Down
63 changes: 63 additions & 0 deletions span_allocator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
// Copyright (c) 2017 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package jaeger

import "sync"

// SpanAllocator abstraction of managign span allocations
type SpanAllocator interface {
Get() *Span
Put(*Span)
IsPool() bool
}

type spanSyncPool struct {
spanPool sync.Pool
}

func newSpanSyncPool() SpanAllocator {
return &spanSyncPool{
spanPool: sync.Pool{New: func() interface{} {
return &Span{}
}},
}
}

func (pool *spanSyncPool) Get() *Span {
return pool.spanPool.Get().(*Span)
}

func (pool *spanSyncPool) Put(span *Span) {
span.reset()
pool.spanPool.Put(span)
}

func (pool *spanSyncPool) IsPool() bool {
return true
}

type spanSimpleAllocator struct{}

func (pool spanSimpleAllocator) Get() *Span {
return &Span{}
}

func (pool spanSimpleAllocator) Put(span *Span) {
span.reset()
}

func (pool spanSimpleAllocator) IsPool() bool {
return false
}
48 changes: 48 additions & 0 deletions span_allocator_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
// Copyright (c) 2017 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package jaeger

import "testing"

func BenchmarkSpanAllocator(b *testing.B) {
b.ResetTimer()
b.ReportAllocs()

b.Run("SyncPool", func(b *testing.B) {
benchSpanAllocator(newSpanSyncPool(), b)
})

b.Run("Simple", func(b *testing.B) {
benchSpanAllocator(spanSimpleAllocator{}, b)
})
}

func benchSpanAllocator(allocator SpanAllocator, b *testing.B) {
b.RunParallel(func(pb *testing.PB) {
queue := make(chan *Span, 1000)
cancel := make(chan bool, 1)
go func() {
for span := range queue {
allocator.Put(span)
}
cancel <- true
}()
for pb.Next() {
queue <- allocator.Get()
}
close(queue)
<-cancel
})
}
2 changes: 1 addition & 1 deletion span_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func TestSpanOperationName(t *testing.T) {

sp1 := tracer.StartSpan("s1").(*Span)
sp1.SetOperationName("s2")
sp1.Finish()
defer sp1.Finish()

assert.Equal(t, "s2", sp1.OperationName())
}
Expand Down
Loading

0 comments on commit e06d0b2

Please sign in to comment.