Skip to content

Commit

Permalink
ResourceSpans might contain spans from multiple traces
Browse files Browse the repository at this point in the history
Signed-off-by: Juraci Paixão Kröhling <juraci@kroehling.de>
  • Loading branch information
jpkrohling committed Jul 31, 2020
1 parent 400ca2c commit 6c2ac09
Show file tree
Hide file tree
Showing 7 changed files with 323 additions and 192 deletions.
4 changes: 2 additions & 2 deletions processor/groupbytraceprocessor/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ type eventMachine struct {

onTraceReceived func(pdata.Traces) error
onTraceExpired func(pdata.TraceID) error
onTraceReleased func(pdata.ResourceSpans) error
onTraceReleased func([]pdata.ResourceSpans) error
onTraceRemoved func(pdata.TraceID) error

onError func(event)
Expand Down Expand Up @@ -114,7 +114,7 @@ func (em *eventMachine) start() {
em.callOnError(e)
continue
}
payload, ok := e.payload.(pdata.ResourceSpans)
payload, ok := e.payload.([]pdata.ResourceSpans)
if !ok {
// the payload had an unexpected type!
em.callOnError(e)
Expand Down
6 changes: 3 additions & 3 deletions processor/groupbytraceprocessor/event_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,9 @@ func TestEventCallback(t *testing.T) {
{
casename: "onTraceReleased",
typ: traceReleased,
payload: pdata.NewResourceSpans(),
payload: []pdata.ResourceSpans{},
registerCallback: func(em *eventMachine, wg *sync.WaitGroup) {
em.onTraceReleased = func(expired pdata.ResourceSpans) error {
em.onTraceReleased = func(expired []pdata.ResourceSpans) error {
wg.Done()
return nil
}
Expand Down Expand Up @@ -180,7 +180,7 @@ func TestEventInvalidPayload(t *testing.T) {
casename: "onTraceReleased",
typ: traceReleased,
registerCallback: func(em *eventMachine, wg *sync.WaitGroup) {
em.onTraceReleased = func(expired pdata.ResourceSpans) error {
em.onTraceReleased = func(expired []pdata.ResourceSpans) error {
return nil
}
},
Expand Down
161 changes: 100 additions & 61 deletions processor/groupbytraceprocessor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,7 @@ import (
)

var (
errNoInstrumentationLibrarySpans = errors.New("trace didn't contain any instrumentation library spans")
errNoSpans = errors.New("instrumentation library spans didn't contain any spans")
errNilTrace = errors.New("invalid trace (nil)")
errNilTraceID = errors.New("invalid trace ID (nil)")
errNilResourceSpans = errors.New("invalid resource spans (nil)")
)

// groupByTraceProcessor is a processor that keeps traces in memory for a given duration, with the expectation
Expand Down Expand Up @@ -116,69 +113,64 @@ func (sp *groupByTraceProcessor) Shutdown(ctx context.Context) error {

func (sp *groupByTraceProcessor) onTraceReceived(batch pdata.Traces) error {
for i := 0; i < batch.ResourceSpans().Len(); i++ {
if err := sp.processTrace(batch.ResourceSpans().At(i)); err != nil {
if err := sp.processResourceSpans(batch.ResourceSpans().At(i)); err != nil {
sp.logger.Info("failed to process trace", zap.Error(err))
}
}

return nil
}

func (sp *groupByTraceProcessor) processTrace(trace pdata.ResourceSpans) error {
if trace.IsNil() {
func (sp *groupByTraceProcessor) processResourceSpans(rs pdata.ResourceSpans) error {
if rs.IsNil() {
// should not happen with the current code
return errNilTrace
return errNilResourceSpans
}

traceID, err := extractTraceID(trace)
if err != nil {
return fmt.Errorf("couldn't extract trace ID from trace: %w", err)
}
for _, batch := range splitByTrace(rs) {
traceID := batch.traceID
if sp.ringBuffer.contains(traceID) {
// it exists in memory already, just append the spans to the trace in the storage
if err := sp.addSpans(traceID, batch.rs); err != nil {
return fmt.Errorf("couldn't add spans to existing trace: %w", err)
}

if traceID == nil {
return errNilTraceID
}

if sp.ringBuffer.contains(traceID) {
// it exists in memory already, just append the spans to the trace in the storage
if err := sp.addSpans(traceID, trace); err != nil {
return fmt.Errorf("couldn't add spans to existing trace: %w", err)
// we are done with this trace, move on
return nil
}

// we are done with this trace, move on
return nil
}

// at this point, we determined that we haven't seen the trace yet, so, record the
// traceID in the map and the spans to the storage
// at this point, we determined that we haven't seen the trace yet, so, record the
// traceID in the map and the spans to the storage

// place the trace ID in the buffer, and check if an item had to be evicted
evicted := sp.ringBuffer.put(traceID)
if evicted != nil {
// delete from the storage
sp.eventMachine.fire(event{
typ: traceRemoved,
payload: evicted,
})
// place the trace ID in the buffer, and check if an item had to be evicted
evicted := sp.ringBuffer.put(traceID)
if evicted != nil {
// delete from the storage
sp.eventMachine.fire(event{
typ: traceRemoved,
payload: evicted,
})

// TODO: do we want another channel that receives evicted items? record a metric perhaps?
sp.logger.Info("trace evicted: in order to avoid this in the future, adjust the wait duration and/or number of traces to keep in memory", zap.Stringer("traceID", evicted))
}
// TODO: do we want another channel that receives evicted items? record a metric perhaps?
sp.logger.Info("trace evicted: in order to avoid this in the future, adjust the wait duration and/or number of traces to keep in memory", zap.Stringer("traceID", evicted))
}

// we have the traceID in the memory, place the spans in the storage too
if err := sp.addSpans(traceID, trace); err != nil {
return fmt.Errorf("couldn't add spans to new trace: %w", err)
}
// we have the traceID in the memory, place the spans in the storage too
if err := sp.addSpans(traceID, batch.rs); err != nil {
return fmt.Errorf("couldn't add spans to new trace: %w", err)
}

sp.logger.Debug("scheduled to release trace", zap.Duration("duration", sp.config.WaitDuration))
sp.logger.Debug("scheduled to release trace", zap.Duration("duration", sp.config.WaitDuration))

time.AfterFunc(sp.config.WaitDuration, func() {
// if the event machine has stopped, it will just discard the event
sp.eventMachine.fire(event{
typ: traceExpired,
payload: traceID,
time.AfterFunc(sp.config.WaitDuration, func() {
// if the event machine has stopped, it will just discard the event
sp.eventMachine.fire(event{
typ: traceExpired,
payload: traceID,
})
})
})

}

return nil
}
Expand Down Expand Up @@ -210,7 +202,7 @@ func (sp *groupByTraceProcessor) markAsReleased(traceID pdata.TraceID) error {
return fmt.Errorf("couldn't retrieve trace %q from the storage: %w", traceID, err)
}

if trace.IsNil() {
if trace == nil {
return fmt.Errorf("the trace %q couldn't be found at the storage", traceID)
}

Expand All @@ -229,9 +221,12 @@ func (sp *groupByTraceProcessor) markAsReleased(traceID pdata.TraceID) error {
return nil
}

func (sp *groupByTraceProcessor) onTraceReleased(rs pdata.ResourceSpans) error {
func (sp *groupByTraceProcessor) onTraceReleased(rss []pdata.ResourceSpans) error {
trace := pdata.NewTraces()
trace.ResourceSpans().Append(&rs)
for _, rs := range rss {
l := rs
trace.ResourceSpans().Append(&l)
}
return sp.nextConsumer.ConsumeTraces(context.Background(), trace)
}

Expand All @@ -241,7 +236,7 @@ func (sp *groupByTraceProcessor) onTraceRemoved(traceID pdata.TraceID) error {
return fmt.Errorf("couldn't delete trace %q from the storage: %w", traceID.String(), err)
}

if trace.IsNil() {
if trace == nil {
return fmt.Errorf("trace %q not found at the storage", traceID.String())
}

Expand All @@ -253,16 +248,60 @@ func (sp *groupByTraceProcessor) addSpans(traceID pdata.TraceID, trace pdata.Res
return sp.st.createOrAppend(traceID, trace)
}

func extractTraceID(trace pdata.ResourceSpans) (pdata.TraceID, error) {
ilSpans := trace.InstrumentationLibrarySpans()
if ilSpans.Len() <= 0 {
return nil, errNoInstrumentationLibrarySpans
}
type singleTraceBatch struct {
traceID pdata.TraceID
rs pdata.ResourceSpans
}

spans := ilSpans.At(0).Spans()
if spans.Len() <= 0 {
return nil, errNoSpans
func splitByTrace(rs pdata.ResourceSpans) []*singleTraceBatch {
// for each span in the resource spans, we group them into batches of rs/ils/traceID.
// if the same traceID exists in different ils, they land in different batches.
result := []*singleTraceBatch{}

for i := 0; i < rs.InstrumentationLibrarySpans().Len(); i++ {
// the batches for this ILS
batches := map[string]*singleTraceBatch{}

ils := rs.InstrumentationLibrarySpans().At(i)
for j := 0; j < ils.Spans().Len(); j++ {
span := ils.Spans().At(j)
if span.TraceID() == nil {
// this should have already been caught before our processor, but let's
// protect ourselves against bad clients
continue
}

sTraceID := span.TraceID().String()

// for the first traceID in the ILS, initialize the map entry
// and add the singleTraceBatch to the result list
if _, ok := batches[sTraceID]; !ok {
newRS := pdata.NewResourceSpans()
newRS.InitEmpty()
// currently, the ResourceSpans implementation has only a Resource and an ILS. We'll copy the Resource
// and set our own ILS
rs.Resource().CopyTo(newRS.Resource())

newILS := pdata.NewInstrumentationLibrarySpans()
newILS.InitEmpty()
newRS.InstrumentationLibrarySpans().Append(&newILS)

// currently, the ILS implementation has only an InstrumentationLibrary and spans. We'll copy the library
// and set our own spans
ils.InstrumentationLibrary().CopyTo(newILS.InstrumentationLibrary())

batch := &singleTraceBatch{
traceID: span.TraceID(),
rs: newRS,
}
batches[sTraceID] = batch
result = append(result, batch)
}

// there is only one instrumentation library per batch
batches[sTraceID].rs.InstrumentationLibrarySpans().At(0).Spans().Append(&span)
}
}

return spans.At(0).TraceID(), nil
return result
}
Loading

0 comments on commit 6c2ac09

Please sign in to comment.