From 4f79185a44173c2bf8a44ec22073d04622cf2336 Mon Sep 17 00:00:00 2001 From: "Jonathan A. Sternberg" Date: Wed, 25 May 2022 15:55:07 -0500 Subject: [PATCH] feat(execute): refactor the operator profile to be in the statistics The operator profile is now always computed and it is part of the `flux.Statistics` struct that is returned by the query. The operator profile now uses that to return the profile instead of using Go channels and the execution dependency injected into the context. This allows the results of the operator profile to be present without it being tied directly to the profiler return. It has also been refactored to remove the use of channels for sending span data and aggregating in a goroutine to instead just aggregate the results as part of the source/transport code. This should simplify and speed up those calculations as the overall time should be equivalent but without the overhead of Go channels and only requiring a short-lived lock for appending the source profiles in the executor. --- execute/executor.go | 59 ++++++++++----- execute/profiler.go | 150 ++++----------------------------------- execute/profiler_test.go | 56 +++++++++------ execute/transport.go | 45 ++++++------ lang/compiler.go | 10 +-- lang/query_test.go | 5 ++ mock/executor.go | 17 +++-- query.go | 86 ++++++++++++++++++++++ 8 files changed, 220 insertions(+), 208 deletions(-) diff --git a/execute/executor.go b/execute/executor.go index 3c34fe7c8c..3570f46c34 100644 --- a/execute/executor.go +++ b/execute/executor.go @@ -13,7 +13,6 @@ import ( "github.com/influxdata/flux/codes" "github.com/influxdata/flux/internal/errors" "github.com/influxdata/flux/memory" - "github.com/influxdata/flux/metadata" "github.com/influxdata/flux/plan" "github.com/opentracing/opentracing-go" "go.uber.org/zap" @@ -26,7 +25,7 @@ type Executor interface { // may return zero or more values. The returned channel must not require itself to // be read so the executor must allocate enough space in the channel so if the channel // is unread that it will not block. - Execute(ctx context.Context, p *plan.Spec, a memory.Allocator) (map[string]flux.Result, <-chan metadata.Metadata, error) + Execute(ctx context.Context, p *plan.Spec, a memory.Allocator) (map[string]flux.Result, <-chan flux.Statistics, error) } type executor struct { @@ -62,7 +61,7 @@ type executionState struct { results map[string]flux.Result sources []Source - metaCh chan metadata.Metadata + statsCh chan flux.Statistics transports []AsyncTransport @@ -70,13 +69,13 @@ type executionState struct { logger *zap.Logger } -func (e *executor) Execute(ctx context.Context, p *plan.Spec, a memory.Allocator) (map[string]flux.Result, <-chan metadata.Metadata, error) { +func (e *executor) Execute(ctx context.Context, p *plan.Spec, a memory.Allocator) (map[string]flux.Result, <-chan flux.Statistics, error) { es, err := e.createExecutionState(ctx, p, a) if err != nil { return nil, nil, errors.Wrap(err, codes.Inherit, "failed to initialize execute state") } es.do() - return es.results, es.metaCh, nil + return es.results, es.statsCh, nil } func (e *executor) createExecutionState(ctx context.Context, p *plan.Spec, a memory.Allocator) (*executionState, error) { @@ -101,10 +100,9 @@ func (e *executor) createExecutionState(ctx context.Context, p *plan.Spec, a mem return nil, err } - // Only sources can be a MetadataNode at the moment so allocate enough - // space for all of them to report metadata. Not all of them will necessarily - // report metadata. - es.metaCh = make(chan metadata.Metadata, len(es.sources)) + // Only one statistics struct will be sent. Allocate space for it + // so we don't block on its creation. + es.statsCh = make(chan flux.Statistics, 1) // Choose some default resource limits based on execution options, if necessary. es.chooseDefaultResources(ctx, p) @@ -424,7 +422,18 @@ func (es *executionState) abort(err error) { } func (es *executionState) do() { - var wg sync.WaitGroup + var ( + wg sync.WaitGroup + stats flux.Statistics + statsMu sync.Mutex + ) + + updateStats := func(fn func(stats *flux.Statistics)) { + statsMu.Lock() + defer statsMu.Unlock() + fn(&stats) + } + for _, src := range es.sources { wg.Add(1) go func(src Source) { @@ -432,9 +441,11 @@ func (es *executionState) do() { opName := reflect.TypeOf(src).String() // If operator profiling is enabled for this execution, begin profiling - if opState := NewOperatorProfilingState(ctx, opName, src.Label()); opState != nil { - defer opState.Finish() + profile := flux.TransportProfile{ + NodeType: opName, + Label: src.Label(), } + profileSpan := profile.StartSpan() if span, spanCtx := opentracing.StartSpanFromContext(ctx, opName, opentracing.Tag{Key: "label", Value: src.Label()}); span != nil { ctx = spanCtx @@ -446,15 +457,23 @@ func (es *executionState) do() { // Setup panic handling on the source goroutines defer es.recover() src.Run(ctx) + profileSpan.Finish() - if mdn, ok := src.(MetadataNode); ok { - es.metaCh <- mdn.Metadata() - } + updateStats(func(stats *flux.Statistics) { + stats.Profiles = append(stats.Profiles, profile) + if mdn, ok := src.(MetadataNode); ok { + stats.Metadata.AddAll(mdn.Metadata()) + } + }) }(src) } wg.Add(1) es.dispatcher.Start(es.resources.ConcurrencyQuota, es.ctx) + + // Keep the transport profiles in a separate array from the source profiles. + // This ensures that sources are before transformations. + profiles := make([]flux.TransportProfile, 0, len(es.transports)) go func() { defer wg.Done() @@ -462,6 +481,8 @@ func (es *executionState) do() { for _, t := range es.transports { select { case <-t.Finished(): + tp := t.TransportProfile() + profiles = append(profiles, tp) case <-es.ctx.Done(): es.abort(es.ctx.Err()) case err := <-es.dispatcher.Err(): @@ -478,8 +499,14 @@ func (es *executionState) do() { }() go func() { - defer close(es.metaCh) + defer close(es.statsCh) wg.Wait() + + // Merge the transport profiles in with the ones already filled + // by the sources. + stats.Profiles = append(stats.Profiles, profiles...) + + es.statsCh <- stats }() } diff --git a/execute/profiler.go b/execute/profiler.go index 17825af227..d927089bd3 100644 --- a/execute/profiler.go +++ b/execute/profiler.go @@ -1,10 +1,8 @@ package execute import ( - "context" "fmt" "strings" - "time" "github.com/influxdata/flux" "github.com/influxdata/flux/memory" @@ -35,111 +33,19 @@ func init() { ) } -type OperatorProfilingResult struct { - Type string - // Those labels are actually their operation name. See flux/internal/spec.buildSpec. - // Some examples are: - // merged_fromRemote_range1_filter2_filter3_filter4, window5, window8, generated_yield, etc. - Label string - Start time.Time - Stop time.Time -} - -type OperatorProfilingState struct { - profiler *OperatorProfiler - Result OperatorProfilingResult -} - -func (t *OperatorProfilingState) Finish() { - t.FinishWithTime(time.Now()) -} - -func (t *OperatorProfilingState) FinishWithTime(finishTime time.Time) { - t.Result.Stop = finishTime - if t.profiler != nil && t.profiler.chIn != nil { - t.profiler.chIn <- t.Result - } -} - -type operatorProfilingResultAggregate struct { - operationType string - label string - resultCount int64 - resultMin int64 - resultMax int64 - resultSum int64 - resultMean float64 -} - -type operatorProfilerLabelGroup = map[string]*operatorProfilingResultAggregate -type operatorProfilerTypeGroup = map[string]operatorProfilerLabelGroup - -type OperatorProfiler struct { - // Receive the profiling results from the operator states. - chIn chan OperatorProfilingResult - chOut chan operatorProfilingResultAggregate -} +type OperatorProfiler struct{} func createOperatorProfiler() Profiler { - p := &OperatorProfiler{ - chIn: make(chan OperatorProfilingResult), - chOut: make(chan operatorProfilingResultAggregate), - } - go func(p *OperatorProfiler) { - aggs := make(operatorProfilerTypeGroup) - for result := range p.chIn { - _, ok := aggs[result.Type] - if !ok { - aggs[result.Type] = make(operatorProfilerLabelGroup) - } - _, ok = aggs[result.Type][result.Label] - if !ok { - aggs[result.Type][result.Label] = &operatorProfilingResultAggregate{} - } - a := aggs[result.Type][result.Label] - - // Aggregate the results - a.resultCount++ - duration := result.Stop.Sub(result.Start).Nanoseconds() - if duration > a.resultMax { - a.resultMax = duration - } - if duration < a.resultMin || a.resultMin == 0 { - a.resultMin = duration - } - a.resultSum += duration - } - - // Write the aggregated results to chOut, where they'll be - // converted into rows and appended to the final table - for typ, labels := range aggs { - for label, agg := range labels { - agg.resultMean = float64(agg.resultSum) / float64(agg.resultCount) - agg.operationType = typ - agg.label = label - p.chOut <- *agg - } - } - close(p.chOut) - }(p) - - return p + return &OperatorProfiler{} } func (o *OperatorProfiler) Name() string { return "operator" } -func (o *OperatorProfiler) closeIncomingChannel() { - if o.chIn != nil { - close(o.chIn) - o.chIn = nil - } -} - func (o *OperatorProfiler) GetResult(q flux.Query, alloc memory.Allocator) (flux.Table, error) { - o.closeIncomingChannel() - b, err := o.getTableBuilder(alloc) + stats := q.Statistics() + b, err := o.getTableBuilder(stats, alloc) if err != nil { return nil, err } @@ -154,8 +60,8 @@ func (o *OperatorProfiler) GetResult(q flux.Query, alloc memory.Allocator) (flux // on the ColListTableBuilder to make testing easier. // sortKeys and desc are passed directly into the Sort() call func (o *OperatorProfiler) GetSortedResult(q flux.Query, alloc memory.Allocator, desc bool, sortKeys ...string) (flux.Table, error) { - o.closeIncomingChannel() - b, err := o.getTableBuilder(alloc) + stats := q.Statistics() + b, err := o.getTableBuilder(stats, alloc) if err != nil { return nil, err } @@ -167,7 +73,7 @@ func (o *OperatorProfiler) GetSortedResult(q flux.Query, alloc memory.Allocator, return tbl, nil } -func (o *OperatorProfiler) getTableBuilder(alloc memory.Allocator) (*ColListTableBuilder, error) { +func (o *OperatorProfiler) getTableBuilder(stats flux.Statistics, alloc memory.Allocator) (*ColListTableBuilder, error) { groupKey := NewGroupKey( []flux.ColMeta{ { @@ -220,45 +126,19 @@ func (o *OperatorProfiler) getTableBuilder(alloc memory.Allocator) (*ColListTabl } } - for agg := range o.chOut { + for _, profile := range stats.Profiles { b.AppendString(0, "profiler/operator") - b.AppendString(1, agg.operationType) - b.AppendString(2, agg.label) - b.AppendInt(3, agg.resultCount) - b.AppendInt(4, agg.resultMin) - b.AppendInt(5, agg.resultMax) - b.AppendInt(6, agg.resultSum) - b.AppendFloat(7, agg.resultMean) + b.AppendString(1, profile.NodeType) + b.AppendString(2, profile.Label) + b.AppendInt(3, profile.Count) + b.AppendInt(4, profile.Min) + b.AppendInt(5, profile.Max) + b.AppendInt(6, profile.Sum) + b.AppendFloat(7, profile.Mean) } return b, nil } -// NewOperatorProfilingState creates a new state instance for the given operation name -// and label, provided an operator profiler exists in the execution options. -// If there is no operator profiler, this function returns nil. -func NewOperatorProfilingState(ctx context.Context, operationName string, label string, start ...time.Time) *OperatorProfilingState { - opStart := time.Now() - if len(start) > 0 { - opStart = start[0] - } - var state *OperatorProfilingState - if HaveExecutionDependencies(ctx) { - deps := GetExecutionDependencies(ctx) - if deps.ExecutionOptions.OperatorProfiler != nil { - tfp := deps.ExecutionOptions.OperatorProfiler - state = &OperatorProfilingState{ - profiler: tfp, - Result: OperatorProfilingResult{ - Type: operationName, - Label: label, - Start: opStart, - }, - } - } - } - return state -} - type QueryProfiler struct{} func createQueryProfiler() Profiler { diff --git a/execute/profiler_test.go b/execute/profiler_test.go index 4d42096e23..fee5e5905d 100644 --- a/execute/profiler_test.go +++ b/execute/profiler_test.go @@ -6,7 +6,6 @@ import ( "fmt" "io/ioutil" "strings" - "sync" "testing" "time" @@ -48,36 +47,49 @@ func TestOperatorProfiler_GetResult(t *testing.T) { #default,_profiler,,,,,,,,, ,result,table,_measurement,Type,Label,Count,MinDuration,MaxDuration,DurationSum,MeanDuration `) - wantStr.WriteString(fmt.Sprintf(",,0,profiler/operator,%s,%s,%d,%d,%d,%d,%f\n", + fmt.Fprintf(&wantStr, ",,0,profiler/operator,%s,%s,%d,%d,%d,%d,%f\n", "type0", "lab0", 4, 1000, 1606, 5212, 1303.0, - )) - wantStr.WriteString(fmt.Sprintf(",,0,profiler/operator,%s,%s,%d,%d,%d,%d,%f\n", + ) + fmt.Fprintf(&wantStr, ",,0,profiler/operator,%s,%s,%d,%d,%d,%d,%f\n", "type1", "lab0", 4, 1101, 1707, 5616, 1404.0, - )) - wantStr.WriteString(fmt.Sprintf(",,0,profiler/operator,%s,%s,%d,%d,%d,%d,%f\n", + ) + fmt.Fprintf(&wantStr, ",,0,profiler/operator,%s,%s,%d,%d,%d,%d,%f\n", "type0", "lab1", 4, 1808, 2414, 8444, 2111.0, - )) - wantStr.WriteString(fmt.Sprintf(",,0,profiler/operator,%s,%s,%d,%d,%d,%d,%f\n", + ) + fmt.Fprintf(&wantStr, ",,0,profiler/operator,%s,%s,%d,%d,%d,%d,%f\n", "type1", "lab1", 4, 1909, 2515, 8848, 2212.0, - )) + ) count := 16 - wg := sync.WaitGroup{} - wg.Add(count) - fn := func(opType string, label string, ctx context.Context, offset int) { + + stats := flux.Statistics{ + Profiles: make([]flux.TransportProfile, 0, 4), + } + for i := 0; i < 2; i++ { + for j := 0; j < 2; j++ { + stats.Profiles = append(stats.Profiles, flux.TransportProfile{ + NodeType: fmt.Sprintf("type%d", i), + Label: fmt.Sprintf("lab%d", j), + }) + } + } + + fn := func(profile *flux.TransportProfile, offset int) { st := time.Date(2020, 10, 14, 12, 30, 0, 0, time.UTC) - state := execute.NewOperatorProfilingState(ctx, opType, label, st) - // Finish() will write the data to the profiler - // In Flux runtime, this is called when an execution node finishes execution - state.FinishWithTime(time.Date(2020, 10, 14, 12, 30, 0, 1000+offset, time.UTC)) - wg.Done() + span := profile.StartSpan(st) + span.FinishWithTime(time.Date(2020, 10, 14, 12, 30, 0, 1000+offset, time.UTC)) } + + // Write profiles for the various different transports. for i := 0; i < count; i++ { - typ := fmt.Sprintf("type%d", i%2) - label := fmt.Sprintf("lab%d", i/8) - go fn(typ, label, ctx, 100*i+i) + profile := &stats.Profiles[i%2*2+i/8] + fn(profile, 100*i+i) } - wg.Wait() - tbl, err := p.GetSortedResult(nil, &memory.ResourceAllocator{}, false, "MeanDuration") + + q := &mock.Query{} + q.SetStatistics(stats) + q.Done() + + tbl, err := p.GetSortedResult(q, &memory.ResourceAllocator{}, false, "MeanDuration") if err != nil { t.Error(err) } diff --git a/execute/transport.go b/execute/transport.go index f70d9ca9a2..80f3ce446d 100644 --- a/execute/transport.go +++ b/execute/transport.go @@ -32,6 +32,9 @@ type AsyncTransport interface { Transport // Finished reports when the AsyncTransport has completed and there is no more work to do. Finished() <-chan struct{} + // TransportProfile returns the profile for this transport. + // This is only valid after the channel returned by Finished is closed. + TransportProfile() flux.TransportProfile } var _ Transformation = (*consecutiveTransport)(nil) @@ -42,10 +45,10 @@ type consecutiveTransport struct { dispatcher Dispatcher logger *zap.Logger - t Transport - messages MessageQueue - op, label string - stack []interpreter.StackEntry + t Transport + messages MessageQueue + stack []interpreter.StackEntry + profile flux.TransportProfile finished chan struct{} errMu sync.Mutex @@ -67,8 +70,10 @@ func newConsecutiveTransport(ctx context.Context, dispatcher Dispatcher, t Trans t: WrapTransformationInTransport(t, mem), // TODO(nathanielc): Have planner specify message queue initial buffer size. messages: newMessageQueue(64), - op: OperationType(t), - label: string(n.ID()), + profile: flux.TransportProfile{ + NodeType: OperationType(t), + Label: string(n.ID()), + }, stack: n.CallStack(), finished: make(chan struct{}), } @@ -116,6 +121,10 @@ func (t *consecutiveTransport) Finished() <-chan struct{} { return t.finished } +func (t *consecutiveTransport) TransportProfile() flux.TransportProfile { + return t.profile +} + func (t *consecutiveTransport) RetractTable(id DatasetID, key flux.GroupKey) error { select { case <-t.finished: @@ -215,17 +224,10 @@ func (t *consecutiveTransport) transition(new int32) { atomic.StoreInt32(&t.schedulerState, new) } -func (t *consecutiveTransport) contextWithSpan(ctx context.Context) context.Context { - didInit := false +func (t *consecutiveTransport) initSpan(ctx context.Context) { t.initSpanOnce.Do(func() { - t.span, ctx = opentracing.StartSpanFromContext(ctx, t.op, opentracing.Tag{Key: "label", Value: t.label}) - didInit = true + t.span, _ = opentracing.StartSpanFromContext(ctx, t.profile.NodeType, opentracing.Tag{Key: "label", Value: t.profile.Label}) }) - if didInit { - return ctx - } - - return opentracing.ContextWithSpan(ctx, t.span) } func (t *consecutiveTransport) finishSpan(err error) { @@ -234,13 +236,14 @@ func (t *consecutiveTransport) finishSpan(err error) { } func (t *consecutiveTransport) processMessages(ctx context.Context, throughput int) { - ctx = t.contextWithSpan(ctx) + t.initSpan(ctx) + PROCESS: i := 0 for m := t.messages.Pop(); m != nil; m = t.messages.Pop() { atomic.AddInt32(&t.inflight, -1) atomic.AddInt32(&t.totalMsgs, 1) - if f, err := t.processMessage(ctx, m); err != nil || f { + if f, err := t.processMessage(m); err != nil || f { // Set the error if there was any t.setErr(err) @@ -282,10 +285,10 @@ PROCESS: // processMessage processes the message on t. // The return value is true if the message was a FinishMsg. -func (t *consecutiveTransport) processMessage(ctx context.Context, m Message) (finished bool, err error) { - if opState := NewOperatorProfilingState(ctx, t.op, t.label); opState != nil { - defer opState.Finish() - } +func (t *consecutiveTransport) processMessage(m Message) (finished bool, err error) { + span := t.profile.StartSpan() + defer span.Finish() + if err := t.t.ProcessMessage(m); err != nil { return false, err } diff --git a/lang/compiler.go b/lang/compiler.go index d1c1059d4b..f255c1a64b 100644 --- a/lang/compiler.go +++ b/lang/compiler.go @@ -316,7 +316,7 @@ func (p *Program) Start(ctx context.Context, alloc memory.Allocator) (flux.Query fmt.Sprintf("%v", plan.Formatted(p.PlanSpec, plan.WithDetails()))) e := execute.NewExecutor(p.Logger) - resultMap, md, err := e.Execute(ctx, p.PlanSpec, q.alloc) + resultMap, statsCh, err := e.Execute(ctx, p.PlanSpec, q.alloc) if err != nil { s.Finish() return nil, err @@ -328,7 +328,7 @@ func (p *Program) Start(ctx context.Context, alloc memory.Allocator) (flux.Query // Begin reading from the metadata channel. q.wg.Add(1) - go p.readMetadata(q, md) + go p.readStatistics(q, statsCh) return q, nil } @@ -347,10 +347,10 @@ func (p *Program) processResults(ctx context.Context, q *query, resultMap map[st } } -func (p *Program) readMetadata(q *query, metaCh <-chan metadata.Metadata) { +func (p *Program) readStatistics(q *query, statsCh <-chan flux.Statistics) { defer q.wg.Done() - for md := range metaCh { - q.stats.Metadata.AddAll(md) + for stats := range statsCh { + q.stats.Merge(stats) } } diff --git a/lang/query_test.go b/lang/query_test.go index e9269a87f6..5cf8786c54 100644 --- a/lang/query_test.go +++ b/lang/query_test.go @@ -164,6 +164,7 @@ func TestQuery_Stats(t *testing.T) { t.Fatalf("unexpected error while iterating over tables: %s", err) } } + q.Done() stats := q.Statistics() if stats.TotalDuration <= 0 { @@ -395,6 +396,10 @@ data if q, close, err := runQuery(context.Background(), prelude+tc.script); err != nil { t.Error(err) } else { + // Mark the query as done as we won't read it. + q.Done() + + // Access the statistics. got := fmt.Sprintf("%v", q.Statistics().Metadata["flux/query-plan"]) if !cmp.Equal(tc.want, got) { t.Errorf("unexpected value -want/+got\n%s", cmp.Diff(tc.want, got)) diff --git a/mock/executor.go b/mock/executor.go index f6a34a01bc..a3ec6a915f 100644 --- a/mock/executor.go +++ b/mock/executor.go @@ -6,34 +6,33 @@ import ( "github.com/influxdata/flux" "github.com/influxdata/flux/execute" "github.com/influxdata/flux/memory" - "github.com/influxdata/flux/metadata" "github.com/influxdata/flux/plan" ) var _ execute.Executor = (*Executor)(nil) -var NoMetadata <-chan metadata.Metadata +var EmptyStatistics <-chan flux.Statistics // Executor is a mock implementation of an execute.Executor. type Executor struct { - ExecuteFn func(ctx context.Context, p *plan.Spec, a memory.Allocator) (map[string]flux.Result, <-chan metadata.Metadata, error) + ExecuteFn func(ctx context.Context, p *plan.Spec, a memory.Allocator) (map[string]flux.Result, <-chan flux.Statistics, error) } // NewExecutor returns a mock Executor where its methods will return zero values. func NewExecutor() *Executor { return &Executor{ - ExecuteFn: func(context.Context, *plan.Spec, memory.Allocator) (map[string]flux.Result, <-chan metadata.Metadata, error) { - return nil, NoMetadata, nil + ExecuteFn: func(context.Context, *plan.Spec, memory.Allocator) (map[string]flux.Result, <-chan flux.Statistics, error) { + return nil, EmptyStatistics, nil }, } } -func (e *Executor) Execute(ctx context.Context, p *plan.Spec, a memory.Allocator) (map[string]flux.Result, <-chan metadata.Metadata, error) { +func (e *Executor) Execute(ctx context.Context, p *plan.Spec, a memory.Allocator) (map[string]flux.Result, <-chan flux.Statistics, error) { return e.ExecuteFn(ctx, p, a) } func init() { - noMetaCh := make(chan metadata.Metadata) - close(noMetaCh) - NoMetadata = noMetaCh + emptyStatsCh := make(chan flux.Statistics) + close(emptyStatsCh) + EmptyStatistics = emptyStatsCh } diff --git a/query.go b/query.go index 4c22770972..3aab3a13c5 100644 --- a/query.go +++ b/query.go @@ -56,6 +56,9 @@ type Statistics struct { // The number includes memory that was freed and then used again. TotalAllocated int64 `json:"total_allocated"` + // Profiles holds the profiles for each transport (source/transformation) in this query. + Profiles []TransportProfile + // RuntimeErrors contains error messages that happened during the execution of the query. RuntimeErrors []string `json:"runtime_errors"` @@ -71,6 +74,9 @@ func (s Statistics) Add(other Statistics) Statistics { md := make(metadata.Metadata) md.AddAll(s.Metadata) md.AddAll(other.Metadata) + profiles := make([]TransportProfile, 0, len(s.Profiles)+len(other.Profiles)) + profiles = append(profiles, s.Profiles...) + profiles = append(profiles, other.Profiles...) return Statistics{ TotalDuration: s.TotalDuration + other.TotalDuration, CompileDuration: s.CompileDuration + other.CompileDuration, @@ -81,7 +87,87 @@ func (s Statistics) Add(other Statistics) Statistics { Concurrency: s.Concurrency + other.Concurrency, MaxAllocated: s.MaxAllocated + other.MaxAllocated, TotalAllocated: s.TotalAllocated + other.TotalAllocated, + Profiles: profiles, RuntimeErrors: errs, Metadata: md, } } + +// Merge copies the values from other into s. +func (s *Statistics) Merge(other Statistics) { + s.TotalDuration += other.TotalDuration + s.CompileDuration += other.CompileDuration + s.QueueDuration += other.QueueDuration + s.PlanDuration += other.PlanDuration + s.RequeueDuration += other.RequeueDuration + s.ExecuteDuration += other.ExecuteDuration + s.Concurrency += other.Concurrency + s.MaxAllocated += other.MaxAllocated + s.TotalAllocated += other.TotalAllocated + s.Profiles = append(s.Profiles, other.Profiles...) + s.RuntimeErrors = append(s.RuntimeErrors, other.RuntimeErrors...) + s.Metadata.AddAll(other.Metadata) +} + +// TransportProfile holds the profile for transport statistics. +type TransportProfile struct { + // NodeType holds the node type which is a string representation + // of the underlying transformation. + NodeType string + + // Label holds the plan node label. + Label string + + // Count holds the number of spans in this profile. + Count int64 + + // Min holds the minimum span time of this profile. + Min int64 + + // Max holds the maximum span time of this profile. + Max int64 + + // Sum holds the sum of all span times for this profile. + Sum int64 + + // Mean is the mean span time of this profile. + Mean float64 +} + +// StartSpan will start a profile span to be recorded. +func (p *TransportProfile) StartSpan(now ...time.Time) TransportProfileSpan { + var start time.Time + if len(now) > 0 { + start = now[0] + } else { + start = time.Now() + } + return TransportProfileSpan{ + p: p, + start: start, + } +} + +// TransportProfileSpan is a span that tracks the lifetime of a transport operation. +type TransportProfileSpan struct { + p *TransportProfile + start time.Time +} + +// Finish finishes the span and records the metrics for that operation. +func (span *TransportProfileSpan) Finish() { + span.FinishWithTime(time.Now()) +} + +func (span *TransportProfileSpan) FinishWithTime(now time.Time) { + d := now.Sub(span.start).Nanoseconds() + if d < span.p.Min || span.p.Count == 0 { + span.p.Min = d + } + if d > span.p.Max { + span.p.Max = d + } + span.p.Count++ + span.p.Sum += d + span.p.Mean = float64(span.p.Sum) / float64(span.p.Count) +}