-
Notifications
You must be signed in to change notification settings - Fork 31
cmd/trace-agent: improvements to trace/transaction writing pipeline. #430
Conversation
writer/trace_writer.go
Outdated
@@ -17,14 +17,19 @@ import ( | |||
"github.com/golang/protobuf/proto" | |||
) | |||
|
|||
// TraceWriterPayload represents the result of a trace sample operation that should be written by a TraceWriter. | |||
type TraceWriterPayload struct { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Might be a confusing name (ex: with TracePayload
) as we used "payload" as something flushed to the API.
Any other idea? Maybe Trace(Writer?)Sample, SampledTrace, ...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah good call. Went with TraceSample
.
writer/trace_writer.go
Outdated
truncatedTrace := (*trace)[:splitIndex] | ||
trace = &truncatedTrace | ||
// If current number of spans in buffer reached the limit, flush | ||
w.flushDueToMaxSpansPerPayload() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we receive a large trace with len(trace) > MaxSpansPerPayload
when the buffer is empty, we will call flushDueToMaxSpansPerPayload
for nothing.
Technically it will do nothing (as flush
test the buffer size) but we will get a wrong w.stats.FlushMaxSpans
stat.
Actually a non-interesting nitpick?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice catch
writer/trace_writer.go
Outdated
if spansToAdd > w.conf.MaxSpansPerPayload { | ||
// If what we just added already goes over the limit, report this but lets carry on and flush | ||
atomic.AddInt64(&w.stats.SingleMaxSpans, 1) | ||
log.Warnf("Found single trace with more spans than limit per payload of %d spans", w.conf.MaxSpansPerPayload) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This could become noisy while not being actionable nor technically bad. Should we remove it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Removed, we can rely on the metric to notice these.
cmd/trace-agent/agent.go
Outdated
a.TransactionSampler.Extract(pt, a.analyzedTransactionChan) | ||
writerPayload.Transactions = a.TransactionSampler.Extract(pt) | ||
|
||
a.traceChan <- &writerPayload |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If the trace is not sampled and no transactions are analyzed, writerPayload will be empty. We might just as well not send it in the channel
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, lets save some channel contention for sure 😄
writer/trace_writer.go
Outdated
@@ -274,6 +298,8 @@ func (w *TraceWriter) updateInfo() { | |||
w.statsClient.Count("datadog.trace_agent.trace_writer.bytes", int64(twInfo.Bytes), nil, 1) | |||
w.statsClient.Count("datadog.trace_agent.trace_writer.retries", int64(twInfo.Retries), nil, 1) | |||
w.statsClient.Count("datadog.trace_agent.trace_writer.errors", int64(twInfo.Errors), nil, 1) | |||
w.statsClient.Count("datadog.trace_agent.trace_writer.flush_max_spans", int64(twInfo.FlushMaxSpans), nil, 1) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How relevant / useful are these new metrics? If we don't picture good use-cases ; it'd save some resources to not report them.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I added them because I missed them while debugging the recent issues (try to understand just how many splits were being done). But yeah now that we fixed splitting, flush_max_spans
probably won't be that useful.
single_max_spans
might still be useful though. Having a non-0 value is a hint that something is wrong as such traces are probably not very useful. And when trace payloads get rejected by the API it can give us an idea if traces just have too many spans (this would be non-0) or if they have few but very heavy spans (this would be 0). Don't feel strongly about it though.
e33e4f0
to
458d9bc
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @AlexJF! This is looking good and it seems like things got cleaned up a bit in the process. Just a couple of nits.
cmd/trace-agent/agent.go
Outdated
@@ -295,12 +292,19 @@ func (a *Agent) Process(t model.Trace) { | |||
sampled = s.Add(pt) || sampled | |||
} | |||
|
|||
traceSample := writer.TraceSample{} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's make this:
var traceSample writer.TraceSample
or
traceSample := writer.TraceSample{
Transactions: a.TransactionSampler.Extract(pt),
}
Whichever you prefer.
cmd/trace-agent/agent.go
Outdated
traceSample.Transactions = a.TransactionSampler.Extract(pt) | ||
|
||
// Only send if there's something interesting | ||
if traceSample.Trace != nil || traceSample.Transactions != nil { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wonder if it would be nicer to have a method on TraceSample
called Empty
and making this:
if !traceSample.Empty() {
// ...
}
type TransactionSampler interface { | ||
// Extracts extracts matching spans from the given trace and returns them via the `out` channel. | ||
Extract(t processedTrace, out chan<- *model.Span) | ||
// Extracts extracts matching spans from the given trace and returns them via a slice |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There is a typo here (s/Extracts/Extract). I think this should suffice:
// Extract extracts matching spans from the given trace and returns them.
writer/trace_writer.go
Outdated
@@ -17,14 +17,19 @@ import ( | |||
"github.com/golang/protobuf/proto" | |||
) | |||
|
|||
// TraceSample represents the result of a trace sample operation. | |||
type TraceSample struct { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This name makes me think that this structure holds only a sample (part) of a trace and could be misleading. I did see @LotharSee's comment about the naming, but I'm thinking maybe SampledTrace
would be a more specific name. What do you think?
// A SampledTrace holds the result of a trace sampling operation.
type SampledTrace struct {
// Trace specifies the trace that was sampled.
Trace *model.Trace
// Transactions specifies the spans which were extracted by the TransactionSampler.
Transactions []*model.Span
}
writer/trace_writer.go
Outdated
hostName string | ||
env string | ||
conf writerconfig.TraceWriterConfig | ||
InSamples <-chan *TraceSample |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd tend to think the same here, the word "sample" makes me think of only a part of the whole. This is the main input channel of the writer and it should be more than sufficient to call it in
. It also doesn't need to be an exported field since it isn't used anywhere outside this package. I've heard the expression "the 'in' channel" used many times, and since we only have one I think it would be clear enough.
writer/trace_writer.go
Outdated
// Should never happen due to overflow detection above but just in case | ||
panic("Number of spans in buffer went over the limit") | ||
func (w *TraceWriter) handleTransactions(transactions []*model.Span) { | ||
if transactions == nil { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if len(transactions) == 0 {
Will have the same effect.
But then again, you can remove the entire if
statement because the for
loop will not iterate and the addition will be with 0
.
writer/trace_writer.go
Outdated
if w.spansInBuffer > 0 { | ||
spanOverflow := w.spansInBuffer + spansToAdd - w.conf.MaxSpansPerPayload | ||
|
||
if spanOverflow > 0 { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is a bit too "smart", it would be much clearer to do:
if w.spansInBuffer + spansToAdd > w.conf.MaxSpansPerPayload {
Better keep it simple.
writer/trace_writer.go
Outdated
spansToAdd += len(transactions) | ||
} | ||
|
||
// If we have stuff pending and adding the new data would overflow max spans per payload, force a flush |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please put this comment inside the if
statement body. It would also be nice if we wouldn't use the word "stuff" 😄 Something like:
// check if the new data would overflow
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
But stuff
is the best word in the English dictionary! So many possibilities! 😂
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry for being so picky, I normally don't care so much. It's just because it's open source.
writer/trace_writer.go
Outdated
func (w *TraceWriter) flushDueToMaxSpansPerPayload() { | ||
atomic.AddInt64(&w.stats.FlushMaxSpans, 1) | ||
log.Debugf("Flushing because we reached max per payload") | ||
// If current number of spans in buffer reached the limit, flush |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this comment might be leftover from previous code, can you please remove it?
writer/trace_writer.go
Outdated
} | ||
} | ||
|
||
w.handleTrace(sample.Trace) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I know you didn't name this but handleTrace
is not descriptive enough. What if we would call them appendTrace
and appendTransactions
, since that's what they're doing?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks Alex for addressing part of my comments. I find it a bit hard to figure out which comments you have addressed and which you haven't because you haven't replied to any of them (at least the ones not addressed would be great to write a reply too).
I've written another round of feedback. Please bare with me for a couple more iterations and we will soon merge this.
writer/trace_writer.go
Outdated
} | ||
|
||
spanOverflow := w.spansInBuffer + spansToAdd - w.conf.MaxSpansPerPayload | ||
if w.spansInBuffer > 0 && spanOverflow > 0 { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there any reason why you prefer this overflow version? I get the logic but I find it a bit harder to reason about, and it introduces a new variable too. Plus, I feel that the word "overflow" sounds like something bad has happened, whereas here we simply want to see if we've gone past the threshold. Would it not be more obvious as:
if w.spansInBuffer > 0 && w.spansInBuffer + n > w.conf.MaxSpansPerPayload {
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah you're right. The overflow variable was a relic from the previous code where it was actually used for index calculation. I still like to use variables as "documentation" so you reduce the parsing complexity inside the condition but fine this is not too complex anyway.
writer/trace_writer.go
Outdated
|
||
func (w *TraceWriter) flushDueToMaxSpansPerPayload() { | ||
log.Debugf("Flushing because we reached max per payload") | ||
// If current number of spans in buffer reached the limit, flush |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we really need this comment? I think the function name is self-explanatory. Also, there is no condition here as the comment would indicate.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
writer/trace_writer.go
Outdated
|
||
// Empty returns true if this TraceSample has no data. | ||
func (s *SampledTrace) Empty() bool { | ||
return s == nil || (s.Trace == nil && len(s.Transactions) == 0) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When would s
ever be nil
? Also, this method needs not be exported. I think that if anyone declares a pointer to SampledTrace
and accesses this method while it is not initialized it's ok to permit a panic. This happens in all other cases using all types. Unless there is a very strong reason to have it here, please remove it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Empty
is used in agent.go
so it has to be exported. As for the nil check, I actually thought this was a very cool thing of Go where you don't have to constantly check everywhere if an object is nil before using it as you can have behaviour associated to the nil object itself (in Java the equivalent syntax would require you to create a NullImplementation
). But I'm also fine with standardizing a panic on nil, no strong preference. It does mean I have to move the check down to handleSampledTrace
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You're right! This is interesting and I have seen this used in our Go tracer as well (in the old version). Although, I have never encountered this out there in the wild. I would assume the reason is probably that if you ever end up passing a nil
pointer, it's probably a sign of bad design. I mean other than some struct's defaults, why would you? If it ever has to be nil
then I think an explicit check is better than this. That's my assumption anyway, but I'm happy to be proven wrong.
As for handleSampledTrace
, I think it should never receive nil
and you could definitely remove the check, but I don't mind keeping it if you will . You already check for that in the Process
method of the agent. If someone in the future passes nil
as a value, they might have malicious intentions and a panic
is ok I'd say 😈
cmd/trace-agent/agent.go
Outdated
if sampled { | ||
a.sampledTraceChan <- &pt.Trace | ||
sampledTrace.Trace = &pt.Trace | ||
} | ||
|
||
// Transactions extraction |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we please remove this comment? I don't feel it adds anything.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
cmd/trace-agent/agent.go
Outdated
a.TransactionSampler.Extract(pt, a.analyzedTransactionChan) | ||
sampledTrace.Transactions = a.TransactionSampler.Extract(pt) | ||
|
||
// Only send if there's something interesting |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would you mind removing this comment? It's pretty clear what happens. The trace is sent if it's not empty. I feel interesting is not the right word.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
writer/trace_writer.go
Outdated
trace := sampledTrace.Trace | ||
transactions := sampledTrace.Transactions | ||
|
||
var spansToAdd int |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would you mind renaming this to count
or n
? It's pretty obvious what we are counting here, given the context. I know you don't like short names but I feel that in general in Go it is preferred like this and I would love it if we were to stay consistent with not only the rest of the project but the Go community in general, as much as possible.
I think n
is the generally used variable for counting items, both in computer science and in mathematics and it will be clear to everyone.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok
writer/trace_writer_test.go
Outdated
// When sending those 6 traces | ||
var testTransactions []model.Span | ||
|
||
// When sending those 7 traces, all with transactions for the root span |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think the grammar here might be a bit off or I'm too tired. I don't understand what happens when sending "those 7 traces".
Rather than making a story that has to be read throughout the test and then pieced together, I'd tend to think it's more preferred and easy to make sense of it we describe the action that happens, as opposed to giving only a piece of the story which is out of context without the rest, i.e.:
// send the test traces down the trace channel with the root span attached as a transaction
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I got this practice from BDD-driven Java/Scala testing frameworks but sure don't mind doing it less story like.
writer/trace_writer_test.go
Outdated
var expectedNumPayloads int | ||
var pendingSpans int | ||
|
||
for _, trace := range testTraces { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This test is super complicated. I'm not exactly sure what's going on here. It's pretty hard to understand. You are counting the number of spans in a trace and adding them to the pendingSpans
variable, which looks completely incorrect because of the len(trace) + 1
. Why + 1
? When looking closer at the code, it's perhaps because of transaction you add in the root above? Either way, it's very confusing and it's already a pretty big test.
I think that we need to spend a bit more time here to simplify this.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would like to suggest to merge both these tests into one single test (let's say TestTraceWriter
) and use sub-tests for each case, one called "normal" and one called "large", or whatever you prefer. I'd also like to suggest removing all these for
loops and instead adding a new function to the fixtures
package which generates SampledTrace
s for you. We could call it RandomSampledTrace
to make it analogue to the already existing RandomTrace
.
Additionally, you could declare another method on the SampledTrace
type that returns the total length (spans). Since this method won't be used in the code, you can just declare it in the test file as it will work just fine.
The goal here is to considerably shorten these tests and remove unnecessary duplication.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Simplified a little bit. Can't do much further than this without starting to lose coverage though 😞
writer/trace_writer_test.go
Outdated
@@ -25,7 +25,7 @@ func TestTraceWriter_TraceHandling(t *testing.T) { | |||
assert := assert.New(t) | |||
|
|||
// Given a trace writer, its incoming channel and the endpoint that receives the payloads | |||
traceWriter, traceChannel, testEndpoint, _ := testTraceWriter() | |||
traceWriter, payloadChannel, testEndpoint, _ := testTraceWriter() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is called a traceChannel
in the other test. Please choose one, whichever you prefer.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
writer/trace_writer_test.go
Outdated
expectedNumBytes := int64(0) | ||
expectedNumErrors := int64(0) | ||
expectedMinNumRetries := int64(0) | ||
var expectedNumPayloads int64 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's group these please, like elsewhere in the project:
var (
expectedNumPayloads int64
expectedNumSpans int64
expectedNumTraces int64
expectedNumBytes int64
expectedNumErrors int64
expectedMinNumRetries int64
)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you Alex, I really like how this turned out!
Please use "Squash & Merge" when merging and prefix the commit message with the affected package, in this case cmd/trace-agent
.
* Traces are no longer arbitrarily split when writing. All spans for the same trace are written together. * Traces and associated transactions are processed "atomically" so that a payload is guaranteed to have a trace and all its associated transactions. Before these might have been split between different payloads as they were processed asynchronously.
ddb0c67
to
045f454
Compare
Squashed and rebased on master |
This PR reworks the logic around trace and transaction writing to fix some identified issues.
Trace splitting
Traces are no longer arbitrarily split when writing. This introduced some issues later on in the server side processing pipeline, specially around priority sampling.
Instead,
MaxSpansPerPayload
is now a soft-limit. Whenever possible, payloads will respect said limit. In case a single trace + its transactions have more spans than that limit, then we still attempt to send that trace (on its own) to Datadog's API. There's a chance that the API will reject it if it goes over the input size limit but, if that happens, that means the trace is several MB big which is not normal anyway so the issue should be handled at the client side.Traces and transactions are written atomically
Before, traces and transactions were written asynchronously which could result in scenarios where a trace would be flushed to the API without some of its transactions which would only be flushed later on in a consecutive payload. This introduced some issues during server-side processing where we might mistakenly not tag some transactions as having been sampled because we did not know about its associated trace.
With this change, traces and their respective transactions are treated "atomically" so that it can never happen that such a split occurs.