From 708f791053e78429bb3135f0863224692275cef2 Mon Sep 17 00:00:00 2001 From: Zjan Turla Date: Wed, 15 Feb 2023 04:10:33 +1100 Subject: [PATCH 01/11] Add support for load balancing via arbitrary resource attribute --- exporter/loadbalancingexporter/config.go | 8 +- .../loadbalancingexporter/trace_exporter.go | 160 +++++++++++++----- .../trace_exporter_test.go | 12 +- 3 files changed, 133 insertions(+), 47 deletions(-) diff --git a/exporter/loadbalancingexporter/config.go b/exporter/loadbalancingexporter/config.go index e3914023e0d2..00bd6693f8e9 100644 --- a/exporter/loadbalancingexporter/config.go +++ b/exporter/loadbalancingexporter/config.go @@ -25,13 +25,15 @@ type routingKey int const ( traceIDRouting routingKey = iota svcRouting + resourceAttrRouting ) // Config defines configuration for the exporter. type Config struct { - Protocol Protocol `mapstructure:"protocol"` - Resolver ResolverSettings `mapstructure:"resolver"` - RoutingKey string `mapstructure:"routing_key"` + Protocol Protocol `mapstructure:"protocol"` + Resolver ResolverSettings `mapstructure:"resolver"` + RoutingKey string `mapstructure:"routing_key"` + ResourceAttrKey string `mapstructure:"resource_attr_key"` } // Protocol holds the individual protocol-specific settings. Only OTLP is supported at the moment. diff --git a/exporter/loadbalancingexporter/trace_exporter.go b/exporter/loadbalancingexporter/trace_exporter.go index 327bb6ca8f18..349b05d7b766 100644 --- a/exporter/loadbalancingexporter/trace_exporter.go +++ b/exporter/loadbalancingexporter/trace_exporter.go @@ -21,6 +21,7 @@ import ( "sync" "time" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/batchpersignal" "go.opencensus.io/stats" "go.opencensus.io/tag" "go.opentelemetry.io/collector/component" @@ -29,15 +30,14 @@ import ( "go.opentelemetry.io/collector/exporter/otlpexporter" "go.opentelemetry.io/collector/pdata/ptrace" "go.uber.org/multierr" - - "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/batchpersignal" ) var _ exporter.Traces = (*traceExporterImp)(nil) type traceExporterImp struct { - loadBalancer loadBalancer - routingKey routingKey + loadBalancer loadBalancer + routingKey routingKey + resourceAttrKey string stopped bool shutdownWg sync.WaitGroup @@ -60,6 +60,9 @@ func newTracesExporter(params exporter.CreateSettings, cfg component.Config) (*t switch cfg.(*Config).RoutingKey { case "service": traceExporter.routingKey = svcRouting + case "resourceAttr": + traceExporter.routingKey = resourceAttrRouting + traceExporter.resourceAttrKey = cfg.(*Config).ResourceAttrKey case "traceID", "": default: return nil, fmt.Errorf("unsupported routing_key: %s", cfg.(*Config).RoutingKey) @@ -73,6 +76,57 @@ func buildExporterConfig(cfg *Config, endpoint string) otlpexporter.Config { return oCfg } +func SplitTracesByResourceAttr(batch ptrace.Traces, attrKey string) []ptrace.Traces { + var result []ptrace.Traces + var strKey string + + for i := 0; i < batch.ResourceSpans().Len(); i++ { + rs := batch.ResourceSpans().At(i) + for j := 0; j < rs.ScopeSpans().Len(); j++ { + // the batches for this ILS + batches := map[string]ptrace.ResourceSpans{} + + key, ok := rs.Resource().Attributes().Get(attrKey) + + ils := rs.ScopeSpans().At(j) + for k := 0; k < ils.Spans().Len(); k++ { + span := ils.Spans().At(k) + + if !ok { + strKey = span.TraceID().String() + } else { + strKey = key.Str() + } + + // for the first traceID in the ILS, initialize the map entry + // and add the singleTraceBatch to the result list + if _, ok := batches[strKey]; !ok { + trace := ptrace.NewTraces() + newRS := trace.ResourceSpans().AppendEmpty() + // currently, the ResourceSpans implementation has only a Resource and an ILS. We'll copy the Resource + // and set our own ILS + rs.Resource().CopyTo(newRS.Resource()) + newRS.SetSchemaUrl(rs.SchemaUrl()) + newILS := newRS.ScopeSpans().AppendEmpty() + // currently, the ILS implementation has only an InstrumentationLibrary and spans. We'll copy the library + // and set our own spans + ils.Scope().CopyTo(newILS.Scope()) + newILS.SetSchemaUrl(ils.SchemaUrl()) + batches[strKey] = newRS + + result = append(result, trace) + } + + // there is only one instrumentation library per batch + tgt := batches[strKey].ScopeSpans().At(0).Spans().AppendEmpty() + span.CopyTo(tgt) + } + } + } + + return result +} + func (e *traceExporterImp) Capabilities() consumer.Capabilities { return consumer.Capabilities{MutatesData: false} } @@ -89,7 +143,12 @@ func (e *traceExporterImp) Shutdown(context.Context) error { func (e *traceExporterImp) ConsumeTraces(ctx context.Context, td ptrace.Traces) error { var errs error - batches := batchpersignal.SplitTraces(td) + var batches []ptrace.Traces + if e.routingKey == resourceAttrRouting { + batches = SplitTracesByResourceAttr(td, e.resourceAttrKey) + } else { + batches = batchpersignal.SplitTraces(td) + } for _, batch := range batches { errs = multierr.Append(errs, e.consumeTrace(ctx, batch)) } @@ -99,43 +158,54 @@ func (e *traceExporterImp) ConsumeTraces(ctx context.Context, td ptrace.Traces) func (e *traceExporterImp) consumeTrace(ctx context.Context, td ptrace.Traces) error { var exp component.Component - routingIds, err := routingIdentifiersFromTraces(td, e.routingKey) + var attrKey string + if e.routingKey == svcRouting || e.routingKey == traceIDRouting { + attrKey = "" + } else { + attrKey = e.resourceAttrKey + } + routingIds, err := routingIdentifiersFromTraces(td, e.routingKey, attrKey) + if err != nil { + return err + } + var rid string + for key := range routingIds { + rid = key + break + } + // for rid := range routingIds { + endpoint := e.loadBalancer.Endpoint([]byte(rid)) + exp, err = e.loadBalancer.Exporter(endpoint) if err != nil { return err } - for rid := range routingIds { - endpoint := e.loadBalancer.Endpoint([]byte(rid)) - exp, err = e.loadBalancer.Exporter(endpoint) - if err != nil { - return err - } - - te, ok := exp.(exporter.Traces) - if !ok { - return fmt.Errorf("unable to export traces, unexpected exporter type: expected exporter.Traces but got %T", exp) - } - start := time.Now() - err = te.ConsumeTraces(ctx, td) - duration := time.Since(start) + te, ok := exp.(exporter.Traces) + if !ok { + return fmt.Errorf("unable to export traces, unexpected exporter type: expected exporter.Traces but got %T", exp) + } - if err == nil { - _ = stats.RecordWithTags( - ctx, - []tag.Mutator{tag.Upsert(endpointTagKey, endpoint), successTrueMutator}, - mBackendLatency.M(duration.Milliseconds())) - } else { - _ = stats.RecordWithTags( - ctx, - []tag.Mutator{tag.Upsert(endpointTagKey, endpoint), successFalseMutator}, - mBackendLatency.M(duration.Milliseconds())) - } + start := time.Now() + err = te.ConsumeTraces(ctx, td) + duration := time.Since(start) + + if err == nil { + _ = stats.RecordWithTags( + ctx, + []tag.Mutator{tag.Upsert(endpointTagKey, endpoint), successTrueMutator}, + mBackendLatency.M(duration.Milliseconds())) + } else { + _ = stats.RecordWithTags( + ctx, + []tag.Mutator{tag.Upsert(endpointTagKey, endpoint), successFalseMutator}, + mBackendLatency.M(duration.Milliseconds())) } + // } return err } -func routingIdentifiersFromTraces(td ptrace.Traces, key routingKey) (map[string]bool, error) { - ids := make(map[string]bool) +func routingIdentifiersFromTraces(td ptrace.Traces, routing routingKey, routeKey string) (map[string][]int, error) { + ids := make(map[string][]int) rs := td.ResourceSpans() if rs.Len() == 0 { return nil, errors.New("empty resource spans") @@ -151,17 +221,31 @@ func routingIdentifiersFromTraces(td ptrace.Traces, key routingKey) (map[string] return nil, errors.New("empty spans") } - if key == svcRouting { + if routing == svcRouting || routing == resourceAttrRouting { + var attrKey string + if routing == svcRouting { + attrKey = "service.name" + } else { + attrKey = routeKey + } for i := 0; i < rs.Len(); i++ { - svc, ok := rs.At(i).Resource().Attributes().Get("service.name") + attr, ok := rs.At(i).Resource().Attributes().Get(attrKey) if !ok { - return nil, errors.New("unable to get service name") + return nil, errors.New("unable to get attribute") + } + _, exists := ids[attr.Str()] + if exists { + ids[attr.Str()] = []int{i} + } else { + ids[attr.Str()] = append(ids[attr.Str()], i) } - ids[svc.Str()] = true } return ids, nil } tid := spans.At(0).TraceID() - ids[string(tid[:])] = true + ids[string(tid[:])] = []int{} + for i := 0; i < rs.Len(); i++ { + ids[string(tid[:])] = append(ids[string(tid[:])], i) + } return ids, nil } diff --git a/exporter/loadbalancingexporter/trace_exporter_test.go b/exporter/loadbalancingexporter/trace_exporter_test.go index d386b6ec0e2c..4b467be07a8e 100644 --- a/exporter/loadbalancingexporter/trace_exporter_test.go +++ b/exporter/loadbalancingexporter/trace_exporter_test.go @@ -206,23 +206,23 @@ func TestServiceBasedRoutingForSameTraceId(t *testing.T) { desc string batch ptrace.Traces routingKey routingKey - res map[string]bool + res map[string][]int }{ { "same trace id and different services - service based routing", twoServicesWithSameTraceID(), svcRouting, - map[string]bool{"ad-service-1": true, "get-recommendations-7": true}, + map[string][]int{"ad-service-1": {0}, "get-recommendations-7": {1}}, }, { "same trace id and different services - trace id routing", twoServicesWithSameTraceID(), traceIDRouting, - map[string]bool{string(b[:]): true}, + map[string][]int{string(b[:]): {0, 1}}, }, } { t.Run(tt.desc, func(t *testing.T) { - res, err := routingIdentifiersFromTraces(tt.batch, tt.routingKey) + res, err := routingIdentifiersFromTraces(tt.batch, tt.routingKey, "") assert.Equal(t, err, nil) assert.Equal(t, res, tt.res) }) @@ -392,9 +392,9 @@ func TestNoTracesInBatch(t *testing.T) { }, } { t.Run(tt.desc, func(t *testing.T) { - res, err := routingIdentifiersFromTraces(tt.batch, tt.routingKey) + res, err := routingIdentifiersFromTraces(tt.batch, tt.routingKey, "") assert.Equal(t, err, tt.err) - assert.Equal(t, res, map[string]bool(nil)) + assert.Equal(t, res, map[string][]int(nil)) }) } } From d8e68e25a8879c808b89ba0c413c446258818b98 Mon Sep 17 00:00:00 2001 From: Zjan Turla Date: Thu, 16 Feb 2023 17:35:32 +1100 Subject: [PATCH 02/11] use batchperresourceattr --- exporter/loadbalancingexporter/factory.go | 2 +- exporter/loadbalancingexporter/go.mod | 1 + exporter/loadbalancingexporter/go.sum | 2 + .../loadbalancer_test.go | 2 +- .../loadbalancingexporter/trace_exporter.go | 124 +++++---------- .../trace_exporter_test.go | 141 +++++++++++++----- 6 files changed, 148 insertions(+), 124 deletions(-) diff --git a/exporter/loadbalancingexporter/factory.go b/exporter/loadbalancingexporter/factory.go index e8655a891685..76bf2ee7f5ad 100644 --- a/exporter/loadbalancingexporter/factory.go +++ b/exporter/loadbalancingexporter/factory.go @@ -54,7 +54,7 @@ func createDefaultConfig() component.Config { } func createTracesExporter(_ context.Context, params exporter.CreateSettings, cfg component.Config) (exporter.Traces, error) { - return newTracesExporter(params, cfg) + return newTracesExporter(params, cfg, newLoadBalancer) } func createLogsExporter(_ context.Context, params exporter.CreateSettings, cfg component.Config) (exporter.Logs, error) { diff --git a/exporter/loadbalancingexporter/go.mod b/exporter/loadbalancingexporter/go.mod index 40da46f82cd1..5e2cfec7e034 100644 --- a/exporter/loadbalancingexporter/go.mod +++ b/exporter/loadbalancingexporter/go.mod @@ -48,6 +48,7 @@ require ( github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect github.com/mostynb/go-grpc-compression v1.1.17 // indirect + github.com/open-telemetry/opentelemetry-collector-contrib/pkg/batchperresourceattr v0.71.0 github.com/pelletier/go-toml v1.9.4 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c // indirect diff --git a/exporter/loadbalancingexporter/go.sum b/exporter/loadbalancingexporter/go.sum index 052f891a17f7..6d65d1fc620b 100644 --- a/exporter/loadbalancingexporter/go.sum +++ b/exporter/loadbalancingexporter/go.sum @@ -317,6 +317,8 @@ github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRW github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= github.com/npillmayer/nestext v0.1.3/go.mod h1:h2lrijH8jpicr25dFY+oAJLyzlya6jhnuG+zWp9L0Uk= github.com/oklog/run v1.0.0/go.mod h1:dlhp/R75TPv97u0XWUtDeV/lRKWPKSdTuV0TZvrmrQA= +github.com/open-telemetry/opentelemetry-collector-contrib/pkg/batchperresourceattr v0.71.0 h1:gLH2mphRSaTSYmcsiqgxTVmCgN5+EXFVemrfIfoa/qw= +github.com/open-telemetry/opentelemetry-collector-contrib/pkg/batchperresourceattr v0.71.0/go.mod h1:JJWlUP/iZ7lHjSBu7aJSuSb32reSUX77wzfcnAooqzU= github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc= github.com/pascaldekloe/goe v0.1.0/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc= github.com/pelletier/go-toml v1.7.0/go.mod h1:vwGMzjaWMwyfHwgIBhI2YUM4fB6nL6lVAvS1LBMMhTE= diff --git a/exporter/loadbalancingexporter/loadbalancer_test.go b/exporter/loadbalancingexporter/loadbalancer_test.go index 75cc18d29701..669575e4f0c3 100644 --- a/exporter/loadbalancingexporter/loadbalancer_test.go +++ b/exporter/loadbalancingexporter/loadbalancer_test.go @@ -178,7 +178,7 @@ func TestStartFailureStaticResolver(t *testing.T) { func TestLoadBalancerShutdown(t *testing.T) { // prepare cfg := simpleConfig() - p, err := newTracesExporter(exportertest.NewNopCreateSettings(), cfg) + p, err := newTracesExporter(exportertest.NewNopCreateSettings(), cfg, newLoadBalancer) require.NotNil(t, p) require.NoError(t, err) diff --git a/exporter/loadbalancingexporter/trace_exporter.go b/exporter/loadbalancingexporter/trace_exporter.go index 349b05d7b766..45da392cc99e 100644 --- a/exporter/loadbalancingexporter/trace_exporter.go +++ b/exporter/loadbalancingexporter/trace_exporter.go @@ -21,6 +21,7 @@ import ( "sync" "time" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/batchperresourceattr" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/batchpersignal" "go.opencensus.io/stats" "go.opencensus.io/tag" @@ -32,7 +33,11 @@ import ( "go.uber.org/multierr" ) -var _ exporter.Traces = (*traceExporterImp)(nil) +type baseTracesExporter struct { + component.Component + consumer.Traces + routingKey routingKey +} type traceExporterImp struct { loadBalancer loadBalancer @@ -43,11 +48,13 @@ type traceExporterImp struct { shutdownWg sync.WaitGroup } +type LoadBalancerGenerator func(params exporter.CreateSettings, cfg component.Config, factory componentFactory) (*loadBalancerImp, error) + // Create new traces exporter -func newTracesExporter(params exporter.CreateSettings, cfg component.Config) (*traceExporterImp, error) { +func newTracesExporter(params exporter.CreateSettings, cfg component.Config, lbf LoadBalancerGenerator) (exporter.Traces, error) { exporterFactory := otlpexporter.NewFactory() - lb, err := newLoadBalancer(params, cfg, func(ctx context.Context, endpoint string) (component.Component, error) { + lb, err := lbf(params, cfg, func(ctx context.Context, endpoint string) (component.Component, error) { oCfg := buildExporterConfig(cfg.(*Config), endpoint) return exporterFactory.CreateTracesExporter(ctx, params, &oCfg) }) @@ -59,10 +66,21 @@ func newTracesExporter(params exporter.CreateSettings, cfg component.Config) (*t switch cfg.(*Config).RoutingKey { case "service": - traceExporter.routingKey = svcRouting + traceExporter.routingKey = resourceAttrRouting + traceExporter.resourceAttrKey = "service.name" + return &baseTracesExporter{ + Component: &traceExporter, + Traces: batchperresourceattr.NewBatchPerResourceTraces(traceExporter.resourceAttrKey, &traceExporter), + routingKey: svcRouting, + }, nil case "resourceAttr": traceExporter.routingKey = resourceAttrRouting traceExporter.resourceAttrKey = cfg.(*Config).ResourceAttrKey + return &baseTracesExporter{ + Component: &traceExporter, + Traces: batchperresourceattr.NewBatchPerResourceTraces(traceExporter.resourceAttrKey, &traceExporter), + routingKey: resourceAttrRouting, + }, nil case "traceID", "": default: return nil, fmt.Errorf("unsupported routing_key: %s", cfg.(*Config).RoutingKey) @@ -76,57 +94,6 @@ func buildExporterConfig(cfg *Config, endpoint string) otlpexporter.Config { return oCfg } -func SplitTracesByResourceAttr(batch ptrace.Traces, attrKey string) []ptrace.Traces { - var result []ptrace.Traces - var strKey string - - for i := 0; i < batch.ResourceSpans().Len(); i++ { - rs := batch.ResourceSpans().At(i) - for j := 0; j < rs.ScopeSpans().Len(); j++ { - // the batches for this ILS - batches := map[string]ptrace.ResourceSpans{} - - key, ok := rs.Resource().Attributes().Get(attrKey) - - ils := rs.ScopeSpans().At(j) - for k := 0; k < ils.Spans().Len(); k++ { - span := ils.Spans().At(k) - - if !ok { - strKey = span.TraceID().String() - } else { - strKey = key.Str() - } - - // for the first traceID in the ILS, initialize the map entry - // and add the singleTraceBatch to the result list - if _, ok := batches[strKey]; !ok { - trace := ptrace.NewTraces() - newRS := trace.ResourceSpans().AppendEmpty() - // currently, the ResourceSpans implementation has only a Resource and an ILS. We'll copy the Resource - // and set our own ILS - rs.Resource().CopyTo(newRS.Resource()) - newRS.SetSchemaUrl(rs.SchemaUrl()) - newILS := newRS.ScopeSpans().AppendEmpty() - // currently, the ILS implementation has only an InstrumentationLibrary and spans. We'll copy the library - // and set our own spans - ils.Scope().CopyTo(newILS.Scope()) - newILS.SetSchemaUrl(ils.SchemaUrl()) - batches[strKey] = newRS - - result = append(result, trace) - } - - // there is only one instrumentation library per batch - tgt := batches[strKey].ScopeSpans().At(0).Spans().AppendEmpty() - span.CopyTo(tgt) - } - } - } - - return result -} - func (e *traceExporterImp) Capabilities() consumer.Capabilities { return consumer.Capabilities{MutatesData: false} } @@ -143,12 +110,7 @@ func (e *traceExporterImp) Shutdown(context.Context) error { func (e *traceExporterImp) ConsumeTraces(ctx context.Context, td ptrace.Traces) error { var errs error - var batches []ptrace.Traces - if e.routingKey == resourceAttrRouting { - batches = SplitTracesByResourceAttr(td, e.resourceAttrKey) - } else { - batches = batchpersignal.SplitTraces(td) - } + batches := batchpersignal.SplitTraces(td) for _, batch := range batches { errs = multierr.Append(errs, e.consumeTrace(ctx, batch)) } @@ -159,10 +121,10 @@ func (e *traceExporterImp) ConsumeTraces(ctx context.Context, td ptrace.Traces) func (e *traceExporterImp) consumeTrace(ctx context.Context, td ptrace.Traces) error { var exp component.Component var attrKey string - if e.routingKey == svcRouting || e.routingKey == traceIDRouting { - attrKey = "" - } else { + if e.routingKey == svcRouting || e.routingKey == resourceAttrRouting { attrKey = e.resourceAttrKey + } else { + attrKey = "" } routingIds, err := routingIdentifiersFromTraces(td, e.routingKey, attrKey) if err != nil { @@ -173,7 +135,6 @@ func (e *traceExporterImp) consumeTrace(ctx context.Context, td ptrace.Traces) e rid = key break } - // for rid := range routingIds { endpoint := e.loadBalancer.Endpoint([]byte(rid)) exp, err = e.loadBalancer.Exporter(endpoint) if err != nil { @@ -200,12 +161,12 @@ func (e *traceExporterImp) consumeTrace(ctx context.Context, td ptrace.Traces) e []tag.Mutator{tag.Upsert(endpointTagKey, endpoint), successFalseMutator}, mBackendLatency.M(duration.Milliseconds())) } - // } return err } +// This function should receive a Trace with a single unique value for the routingKey func routingIdentifiersFromTraces(td ptrace.Traces, routing routingKey, routeKey string) (map[string][]int, error) { - ids := make(map[string][]int) + keys := make(map[string][]int) rs := td.ResourceSpans() if rs.Len() == 0 { return nil, errors.New("empty resource spans") @@ -222,30 +183,27 @@ func routingIdentifiersFromTraces(td ptrace.Traces, routing routingKey, routeKey } if routing == svcRouting || routing == resourceAttrRouting { - var attrKey string - if routing == svcRouting { - attrKey = "service.name" - } else { - attrKey = routeKey - } for i := 0; i < rs.Len(); i++ { - attr, ok := rs.At(i).Resource().Attributes().Get(attrKey) + attr, ok := rs.At(i).Resource().Attributes().Get(routeKey) if !ok { - return nil, errors.New("unable to get attribute") + return nil, fmt.Errorf("unable to get routing attribute: %s, %d", routeKey, routing) } - _, exists := ids[attr.Str()] + _, exists := keys[attr.Str()] if exists { - ids[attr.Str()] = []int{i} + keys[attr.Str()] = []int{i} } else { - ids[attr.Str()] = append(ids[attr.Str()], i) + keys[attr.Str()] = append(keys[attr.Str()], i) } } - return ids, nil + if len(keys) != 1 { + return nil, errors.New("batch of traces include multiple values of the routing attribute") + } + return keys, nil } - tid := spans.At(0).TraceID() - ids[string(tid[:])] = []int{} + tid := spans.At(0).TraceID().String() + keys[tid] = []int{} for i := 0; i < rs.Len(); i++ { - ids[string(tid[:])] = append(ids[string(tid[:])], i) + keys[tid] = append(keys[tid], i) } - return ids, nil + return keys, nil } diff --git a/exporter/loadbalancingexporter/trace_exporter_test.go b/exporter/loadbalancingexporter/trace_exporter_test.go index 4b467be07a8e..124ccaa91059 100644 --- a/exporter/loadbalancingexporter/trace_exporter_test.go +++ b/exporter/loadbalancingexporter/trace_exporter_test.go @@ -61,7 +61,7 @@ func TestNewTracesExporter(t *testing.T) { } { t.Run(tt.desc, func(t *testing.T) { // test - _, err := newTracesExporter(exportertest.NewNopCreateSettings(), tt.config) + _, err := newTracesExporter(exportertest.NewNopCreateSettings(), tt.config, newLoadBalancer) // verify require.Equal(t, tt.err, err) @@ -72,29 +72,28 @@ func TestNewTracesExporter(t *testing.T) { func TestTracesExporterStart(t *testing.T) { for _, tt := range []struct { desc string - te *traceExporterImp + te exporter.Traces err error }{ { "ok", - func() *traceExporterImp { - p, _ := newTracesExporter(exportertest.NewNopCreateSettings(), simpleConfig()) + func() exporter.Traces { + p, _ := newTracesExporter(exportertest.NewNopCreateSettings(), simpleConfig(), newLoadBalancer) return p }(), nil, }, { "error", - func() *traceExporterImp { + func() exporter.Traces { lb, _ := newLoadBalancer(exportertest.NewNopCreateSettings(), simpleConfig(), nil) - p, _ := newTracesExporter(exportertest.NewNopCreateSettings(), simpleConfig()) - lb.res = &mockResolver{ onStart: func(context.Context) error { return errors.New("some expected err") }, } - p.loadBalancer = lb + var lbf = mockedLBGenerator(lb) + p, _ := newTracesExporter(exportertest.NewNopCreateSettings(), simpleConfig(), lbf) return p }(), @@ -117,7 +116,7 @@ func TestTracesExporterStart(t *testing.T) { } func TestTracesExporterShutdown(t *testing.T) { - p, err := newTracesExporter(exportertest.NewNopCreateSettings(), simpleConfig()) + p, err := newTracesExporter(exportertest.NewNopCreateSettings(), simpleConfig(), newLoadBalancer) require.NotNil(t, p) require.NoError(t, err) @@ -136,11 +135,6 @@ func TestConsumeTraces(t *testing.T) { require.NotNil(t, lb) require.NoError(t, err) - p, err := newTracesExporter(exportertest.NewNopCreateSettings(), simpleConfig()) - require.NotNil(t, p) - require.NoError(t, err) - assert.Equal(t, p.routingKey, traceIDRouting) - // pre-load an exporter here, so that we don't use the actual OTLP exporter lb.addMissingExporters(context.Background(), []string{"endpoint-1"}) lb.res = &mockResolver{ @@ -149,7 +143,14 @@ func TestConsumeTraces(t *testing.T) { return []string{"endpoint-1"}, nil }, } - p.loadBalancer = lb + lbf := mockedLBGenerator(lb) + + p, err := newTracesExporter(exportertest.NewNopCreateSettings(), simpleConfig(), lbf) + require.NotNil(t, p) + require.NoError(t, err) + + var te *traceExporterImp = p.(*traceExporterImp) + assert.Equal(t, te.routingKey, traceIDRouting) err = p.Start(context.Background(), componenttest.NewNopHost()) require.NoError(t, err) @@ -172,10 +173,43 @@ func TestConsumeTracesServiceBased(t *testing.T) { require.NotNil(t, lb) require.NoError(t, err) - p, err := newTracesExporter(exportertest.NewNopCreateSettings(), serviceBasedRoutingConfig()) + // pre-load an exporter here, so that we don't use the actual OTLP exporter + lb.addMissingExporters(context.Background(), []string{"endpoint-1"}) + lb.res = &mockResolver{ + triggerCallbacks: true, + onResolve: func(ctx context.Context) ([]string, error) { + return []string{"endpoint-1"}, nil + }, + } + lbf := mockedLBGenerator(lb) + + p, err := newTracesExporter(exportertest.NewNopCreateSettings(), serviceBasedRoutingConfig(), lbf) require.NotNil(t, p) require.NoError(t, err) - assert.Equal(t, p.routingKey, svcRouting) + + var te *baseTracesExporter = p.(*baseTracesExporter) + assert.Equal(t, te.routingKey, svcRouting) + + err = p.Start(context.Background(), componenttest.NewNopHost()) + require.NoError(t, err) + defer func() { + require.NoError(t, p.Shutdown(context.Background())) + }() + + // test + res := p.ConsumeTraces(context.Background(), simpleTracesWithServiceName()) + + // verify + assert.Nil(t, res) +} + +func TestConsumeTracesAttrBased(t *testing.T) { + componentFactory := func(ctx context.Context, endpoint string) (component.Component, error) { + return newNopMockTracesExporter(), nil + } + lb, err := newLoadBalancer(exportertest.NewNopCreateSettings(), attrBasedRoutingConfig("service.name"), componentFactory) + require.NotNil(t, lb) + require.NoError(t, err) // pre-load an exporter here, so that we don't use the actual OTLP exporter lb.addMissingExporters(context.Background(), []string{"endpoint-1"}) @@ -185,7 +219,14 @@ func TestConsumeTracesServiceBased(t *testing.T) { return []string{"endpoint-1"}, nil }, } - p.loadBalancer = lb + lbf := mockedLBGenerator(lb) + + p, err := newTracesExporter(exportertest.NewNopCreateSettings(), attrBasedRoutingConfig("service.name"), lbf) + require.NotNil(t, p) + require.NoError(t, err) + + var te *baseTracesExporter = p.(*baseTracesExporter) + assert.Equal(t, te.routingKey, resourceAttrRouting) err = p.Start(context.Background(), componenttest.NewNopHost()) require.NoError(t, err) @@ -207,24 +248,27 @@ func TestServiceBasedRoutingForSameTraceId(t *testing.T) { batch ptrace.Traces routingKey routingKey res map[string][]int + err error }{ { "same trace id and different services - service based routing", twoServicesWithSameTraceID(), svcRouting, - map[string][]int{"ad-service-1": {0}, "get-recommendations-7": {1}}, + nil, + errors.New("batch of traces include multiple values of the routing attribute"), }, { "same trace id and different services - trace id routing", twoServicesWithSameTraceID(), traceIDRouting, - map[string][]int{string(b[:]): {0, 1}}, + map[string][]int{string(b.String()): {0, 1}}, + nil, }, } { t.Run(tt.desc, func(t *testing.T) { - res, err := routingIdentifiersFromTraces(tt.batch, tt.routingKey, "") - assert.Equal(t, err, nil) - assert.Equal(t, res, tt.res) + res, err := routingIdentifiersFromTraces(tt.batch, tt.routingKey, "service.name") + assert.Equal(t, tt.err, err) + assert.Equal(t, tt.res, res) }) } } @@ -237,17 +281,17 @@ func TestConsumeTracesExporterNoEndpoint(t *testing.T) { require.NotNil(t, lb) require.NoError(t, err) - p, err := newTracesExporter(exportertest.NewNopCreateSettings(), simpleConfig()) - require.NotNil(t, p) - require.NoError(t, err) - lb.res = &mockResolver{ triggerCallbacks: true, onResolve: func(ctx context.Context) ([]string, error) { return nil, nil }, } - p.loadBalancer = lb + lbf := mockedLBGenerator(lb) + + p, err := newTracesExporter(exportertest.NewNopCreateSettings(), simpleConfig(), lbf) + require.NotNil(t, p) + require.NoError(t, err) err = p.Start(context.Background(), componenttest.NewNopHost()) require.NoError(t, err) @@ -271,10 +315,6 @@ func TestConsumeTracesUnexpectedExporterType(t *testing.T) { require.NotNil(t, lb) require.NoError(t, err) - p, err := newTracesExporter(exportertest.NewNopCreateSettings(), simpleConfig()) - require.NotNil(t, p) - require.NoError(t, err) - // pre-load an exporter here, so that we don't use the actual OTLP exporter lb.addMissingExporters(context.Background(), []string{"endpoint-1"}) lb.res = &mockResolver{ @@ -283,7 +323,11 @@ func TestConsumeTracesUnexpectedExporterType(t *testing.T) { return []string{"endpoint-1"}, nil }, } - p.loadBalancer = lb + lbf := mockedLBGenerator(lb) + + p, err := newTracesExporter(exportertest.NewNopCreateSettings(), simpleConfig(), lbf) + require.NotNil(t, p) + require.NoError(t, err) err = p.Start(context.Background(), componenttest.NewNopHost()) require.NoError(t, err) @@ -335,12 +379,12 @@ func TestBatchWithTwoTraces(t *testing.T) { lb, err := newLoadBalancer(exportertest.NewNopCreateSettings(), simpleConfig(), componentFactory) require.NotNil(t, lb) require.NoError(t, err) + lbf := mockedLBGenerator(lb) - p, err := newTracesExporter(exportertest.NewNopCreateSettings(), simpleConfig()) + p, err := newTracesExporter(exportertest.NewNopCreateSettings(), simpleConfig(), lbf) require.NotNil(t, p) require.NoError(t, err) - p.loadBalancer = lb err = p.Start(context.Background(), componenttest.NewNopHost()) require.NoError(t, err) @@ -362,12 +406,14 @@ func TestNoTracesInBatch(t *testing.T) { desc string batch ptrace.Traces routingKey routingKey + attrKey string err error }{ { "no resource spans", ptrace.NewTraces(), traceIDRouting, + "", errors.New("empty resource spans"), }, { @@ -378,6 +424,7 @@ func TestNoTracesInBatch(t *testing.T) { return batch }(), traceIDRouting, + "", errors.New("empty scope spans"), }, { @@ -388,6 +435,7 @@ func TestNoTracesInBatch(t *testing.T) { return batch }(), svcRouting, + "", errors.New("empty spans"), }, } { @@ -462,14 +510,13 @@ func TestRollingUpdatesWhenConsumeTraces(t *testing.T) { lb, err := newLoadBalancer(exportertest.NewNopCreateSettings(), cfg, componentFactory) require.NotNil(t, lb) require.NoError(t, err) + lb.res = res + lbf := mockedLBGenerator(lb) - p, err := newTracesExporter(exportertest.NewNopCreateSettings(), cfg) + p, err := newTracesExporter(exportertest.NewNopCreateSettings(), cfg, lbf) require.NotNil(t, p) require.NoError(t, err) - lb.res = res - p.loadBalancer = lb - counter1 := atomic.NewInt64(0) counter2 := atomic.NewInt64(0) defaultExporters := map[string]component.Component{ @@ -582,6 +629,12 @@ func appendSimpleTraceWithID(dest ptrace.ResourceSpans, id pcommon.TraceID) { dest.ScopeSpans().AppendEmpty().Spans().AppendEmpty().SetTraceID(id) } +func mockedLBGenerator(lb *loadBalancerImp) LoadBalancerGenerator { + return func(params exporter.CreateSettings, cfg component.Config, factory componentFactory) (*loadBalancerImp, error) { + return lb, nil + } +} + func simpleConfig() *Config { return &Config{ Resolver: ResolverSettings{ @@ -599,6 +652,16 @@ func serviceBasedRoutingConfig() *Config { } } +func attrBasedRoutingConfig(attrKey string) *Config { + return &Config{ + Resolver: ResolverSettings{ + Static: &StaticResolver{Hostnames: []string{"endpoint-1"}}, + }, + RoutingKey: "resourceAttr", + ResourceAttrKey: attrKey, + } +} + type mockTracesExporter struct { component.Component ConsumeTracesFn func(ctx context.Context, td ptrace.Traces) error From a8447f8faa580304c156fa8a65e125e57b4a58e4 Mon Sep 17 00:00:00 2001 From: Zjan Turla Date: Fri, 17 Feb 2023 11:03:01 +1100 Subject: [PATCH 03/11] Revert "use batchperresourceattr" This reverts commit d8e68e25a8879c808b89ba0c413c446258818b98. --- exporter/loadbalancingexporter/factory.go | 2 +- exporter/loadbalancingexporter/go.mod | 1 - exporter/loadbalancingexporter/go.sum | 2 - .../loadbalancer_test.go | 2 +- .../loadbalancingexporter/trace_exporter.go | 124 ++++++++++----- .../trace_exporter_test.go | 141 +++++------------- 6 files changed, 124 insertions(+), 148 deletions(-) diff --git a/exporter/loadbalancingexporter/factory.go b/exporter/loadbalancingexporter/factory.go index 76bf2ee7f5ad..e8655a891685 100644 --- a/exporter/loadbalancingexporter/factory.go +++ b/exporter/loadbalancingexporter/factory.go @@ -54,7 +54,7 @@ func createDefaultConfig() component.Config { } func createTracesExporter(_ context.Context, params exporter.CreateSettings, cfg component.Config) (exporter.Traces, error) { - return newTracesExporter(params, cfg, newLoadBalancer) + return newTracesExporter(params, cfg) } func createLogsExporter(_ context.Context, params exporter.CreateSettings, cfg component.Config) (exporter.Logs, error) { diff --git a/exporter/loadbalancingexporter/go.mod b/exporter/loadbalancingexporter/go.mod index 5e2cfec7e034..40da46f82cd1 100644 --- a/exporter/loadbalancingexporter/go.mod +++ b/exporter/loadbalancingexporter/go.mod @@ -48,7 +48,6 @@ require ( github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect github.com/mostynb/go-grpc-compression v1.1.17 // indirect - github.com/open-telemetry/opentelemetry-collector-contrib/pkg/batchperresourceattr v0.71.0 github.com/pelletier/go-toml v1.9.4 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c // indirect diff --git a/exporter/loadbalancingexporter/go.sum b/exporter/loadbalancingexporter/go.sum index 6d65d1fc620b..052f891a17f7 100644 --- a/exporter/loadbalancingexporter/go.sum +++ b/exporter/loadbalancingexporter/go.sum @@ -317,8 +317,6 @@ github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRW github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= github.com/npillmayer/nestext v0.1.3/go.mod h1:h2lrijH8jpicr25dFY+oAJLyzlya6jhnuG+zWp9L0Uk= github.com/oklog/run v1.0.0/go.mod h1:dlhp/R75TPv97u0XWUtDeV/lRKWPKSdTuV0TZvrmrQA= -github.com/open-telemetry/opentelemetry-collector-contrib/pkg/batchperresourceattr v0.71.0 h1:gLH2mphRSaTSYmcsiqgxTVmCgN5+EXFVemrfIfoa/qw= -github.com/open-telemetry/opentelemetry-collector-contrib/pkg/batchperresourceattr v0.71.0/go.mod h1:JJWlUP/iZ7lHjSBu7aJSuSb32reSUX77wzfcnAooqzU= github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc= github.com/pascaldekloe/goe v0.1.0/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc= github.com/pelletier/go-toml v1.7.0/go.mod h1:vwGMzjaWMwyfHwgIBhI2YUM4fB6nL6lVAvS1LBMMhTE= diff --git a/exporter/loadbalancingexporter/loadbalancer_test.go b/exporter/loadbalancingexporter/loadbalancer_test.go index 669575e4f0c3..75cc18d29701 100644 --- a/exporter/loadbalancingexporter/loadbalancer_test.go +++ b/exporter/loadbalancingexporter/loadbalancer_test.go @@ -178,7 +178,7 @@ func TestStartFailureStaticResolver(t *testing.T) { func TestLoadBalancerShutdown(t *testing.T) { // prepare cfg := simpleConfig() - p, err := newTracesExporter(exportertest.NewNopCreateSettings(), cfg, newLoadBalancer) + p, err := newTracesExporter(exportertest.NewNopCreateSettings(), cfg) require.NotNil(t, p) require.NoError(t, err) diff --git a/exporter/loadbalancingexporter/trace_exporter.go b/exporter/loadbalancingexporter/trace_exporter.go index 45da392cc99e..349b05d7b766 100644 --- a/exporter/loadbalancingexporter/trace_exporter.go +++ b/exporter/loadbalancingexporter/trace_exporter.go @@ -21,7 +21,6 @@ import ( "sync" "time" - "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/batchperresourceattr" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/batchpersignal" "go.opencensus.io/stats" "go.opencensus.io/tag" @@ -33,11 +32,7 @@ import ( "go.uber.org/multierr" ) -type baseTracesExporter struct { - component.Component - consumer.Traces - routingKey routingKey -} +var _ exporter.Traces = (*traceExporterImp)(nil) type traceExporterImp struct { loadBalancer loadBalancer @@ -48,13 +43,11 @@ type traceExporterImp struct { shutdownWg sync.WaitGroup } -type LoadBalancerGenerator func(params exporter.CreateSettings, cfg component.Config, factory componentFactory) (*loadBalancerImp, error) - // Create new traces exporter -func newTracesExporter(params exporter.CreateSettings, cfg component.Config, lbf LoadBalancerGenerator) (exporter.Traces, error) { +func newTracesExporter(params exporter.CreateSettings, cfg component.Config) (*traceExporterImp, error) { exporterFactory := otlpexporter.NewFactory() - lb, err := lbf(params, cfg, func(ctx context.Context, endpoint string) (component.Component, error) { + lb, err := newLoadBalancer(params, cfg, func(ctx context.Context, endpoint string) (component.Component, error) { oCfg := buildExporterConfig(cfg.(*Config), endpoint) return exporterFactory.CreateTracesExporter(ctx, params, &oCfg) }) @@ -66,21 +59,10 @@ func newTracesExporter(params exporter.CreateSettings, cfg component.Config, lbf switch cfg.(*Config).RoutingKey { case "service": - traceExporter.routingKey = resourceAttrRouting - traceExporter.resourceAttrKey = "service.name" - return &baseTracesExporter{ - Component: &traceExporter, - Traces: batchperresourceattr.NewBatchPerResourceTraces(traceExporter.resourceAttrKey, &traceExporter), - routingKey: svcRouting, - }, nil + traceExporter.routingKey = svcRouting case "resourceAttr": traceExporter.routingKey = resourceAttrRouting traceExporter.resourceAttrKey = cfg.(*Config).ResourceAttrKey - return &baseTracesExporter{ - Component: &traceExporter, - Traces: batchperresourceattr.NewBatchPerResourceTraces(traceExporter.resourceAttrKey, &traceExporter), - routingKey: resourceAttrRouting, - }, nil case "traceID", "": default: return nil, fmt.Errorf("unsupported routing_key: %s", cfg.(*Config).RoutingKey) @@ -94,6 +76,57 @@ func buildExporterConfig(cfg *Config, endpoint string) otlpexporter.Config { return oCfg } +func SplitTracesByResourceAttr(batch ptrace.Traces, attrKey string) []ptrace.Traces { + var result []ptrace.Traces + var strKey string + + for i := 0; i < batch.ResourceSpans().Len(); i++ { + rs := batch.ResourceSpans().At(i) + for j := 0; j < rs.ScopeSpans().Len(); j++ { + // the batches for this ILS + batches := map[string]ptrace.ResourceSpans{} + + key, ok := rs.Resource().Attributes().Get(attrKey) + + ils := rs.ScopeSpans().At(j) + for k := 0; k < ils.Spans().Len(); k++ { + span := ils.Spans().At(k) + + if !ok { + strKey = span.TraceID().String() + } else { + strKey = key.Str() + } + + // for the first traceID in the ILS, initialize the map entry + // and add the singleTraceBatch to the result list + if _, ok := batches[strKey]; !ok { + trace := ptrace.NewTraces() + newRS := trace.ResourceSpans().AppendEmpty() + // currently, the ResourceSpans implementation has only a Resource and an ILS. We'll copy the Resource + // and set our own ILS + rs.Resource().CopyTo(newRS.Resource()) + newRS.SetSchemaUrl(rs.SchemaUrl()) + newILS := newRS.ScopeSpans().AppendEmpty() + // currently, the ILS implementation has only an InstrumentationLibrary and spans. We'll copy the library + // and set our own spans + ils.Scope().CopyTo(newILS.Scope()) + newILS.SetSchemaUrl(ils.SchemaUrl()) + batches[strKey] = newRS + + result = append(result, trace) + } + + // there is only one instrumentation library per batch + tgt := batches[strKey].ScopeSpans().At(0).Spans().AppendEmpty() + span.CopyTo(tgt) + } + } + } + + return result +} + func (e *traceExporterImp) Capabilities() consumer.Capabilities { return consumer.Capabilities{MutatesData: false} } @@ -110,7 +143,12 @@ func (e *traceExporterImp) Shutdown(context.Context) error { func (e *traceExporterImp) ConsumeTraces(ctx context.Context, td ptrace.Traces) error { var errs error - batches := batchpersignal.SplitTraces(td) + var batches []ptrace.Traces + if e.routingKey == resourceAttrRouting { + batches = SplitTracesByResourceAttr(td, e.resourceAttrKey) + } else { + batches = batchpersignal.SplitTraces(td) + } for _, batch := range batches { errs = multierr.Append(errs, e.consumeTrace(ctx, batch)) } @@ -121,10 +159,10 @@ func (e *traceExporterImp) ConsumeTraces(ctx context.Context, td ptrace.Traces) func (e *traceExporterImp) consumeTrace(ctx context.Context, td ptrace.Traces) error { var exp component.Component var attrKey string - if e.routingKey == svcRouting || e.routingKey == resourceAttrRouting { - attrKey = e.resourceAttrKey - } else { + if e.routingKey == svcRouting || e.routingKey == traceIDRouting { attrKey = "" + } else { + attrKey = e.resourceAttrKey } routingIds, err := routingIdentifiersFromTraces(td, e.routingKey, attrKey) if err != nil { @@ -135,6 +173,7 @@ func (e *traceExporterImp) consumeTrace(ctx context.Context, td ptrace.Traces) e rid = key break } + // for rid := range routingIds { endpoint := e.loadBalancer.Endpoint([]byte(rid)) exp, err = e.loadBalancer.Exporter(endpoint) if err != nil { @@ -161,12 +200,12 @@ func (e *traceExporterImp) consumeTrace(ctx context.Context, td ptrace.Traces) e []tag.Mutator{tag.Upsert(endpointTagKey, endpoint), successFalseMutator}, mBackendLatency.M(duration.Milliseconds())) } + // } return err } -// This function should receive a Trace with a single unique value for the routingKey func routingIdentifiersFromTraces(td ptrace.Traces, routing routingKey, routeKey string) (map[string][]int, error) { - keys := make(map[string][]int) + ids := make(map[string][]int) rs := td.ResourceSpans() if rs.Len() == 0 { return nil, errors.New("empty resource spans") @@ -183,27 +222,30 @@ func routingIdentifiersFromTraces(td ptrace.Traces, routing routingKey, routeKey } if routing == svcRouting || routing == resourceAttrRouting { + var attrKey string + if routing == svcRouting { + attrKey = "service.name" + } else { + attrKey = routeKey + } for i := 0; i < rs.Len(); i++ { - attr, ok := rs.At(i).Resource().Attributes().Get(routeKey) + attr, ok := rs.At(i).Resource().Attributes().Get(attrKey) if !ok { - return nil, fmt.Errorf("unable to get routing attribute: %s, %d", routeKey, routing) + return nil, errors.New("unable to get attribute") } - _, exists := keys[attr.Str()] + _, exists := ids[attr.Str()] if exists { - keys[attr.Str()] = []int{i} + ids[attr.Str()] = []int{i} } else { - keys[attr.Str()] = append(keys[attr.Str()], i) + ids[attr.Str()] = append(ids[attr.Str()], i) } } - if len(keys) != 1 { - return nil, errors.New("batch of traces include multiple values of the routing attribute") - } - return keys, nil + return ids, nil } - tid := spans.At(0).TraceID().String() - keys[tid] = []int{} + tid := spans.At(0).TraceID() + ids[string(tid[:])] = []int{} for i := 0; i < rs.Len(); i++ { - keys[tid] = append(keys[tid], i) + ids[string(tid[:])] = append(ids[string(tid[:])], i) } - return keys, nil + return ids, nil } diff --git a/exporter/loadbalancingexporter/trace_exporter_test.go b/exporter/loadbalancingexporter/trace_exporter_test.go index 124ccaa91059..4b467be07a8e 100644 --- a/exporter/loadbalancingexporter/trace_exporter_test.go +++ b/exporter/loadbalancingexporter/trace_exporter_test.go @@ -61,7 +61,7 @@ func TestNewTracesExporter(t *testing.T) { } { t.Run(tt.desc, func(t *testing.T) { // test - _, err := newTracesExporter(exportertest.NewNopCreateSettings(), tt.config, newLoadBalancer) + _, err := newTracesExporter(exportertest.NewNopCreateSettings(), tt.config) // verify require.Equal(t, tt.err, err) @@ -72,28 +72,29 @@ func TestNewTracesExporter(t *testing.T) { func TestTracesExporterStart(t *testing.T) { for _, tt := range []struct { desc string - te exporter.Traces + te *traceExporterImp err error }{ { "ok", - func() exporter.Traces { - p, _ := newTracesExporter(exportertest.NewNopCreateSettings(), simpleConfig(), newLoadBalancer) + func() *traceExporterImp { + p, _ := newTracesExporter(exportertest.NewNopCreateSettings(), simpleConfig()) return p }(), nil, }, { "error", - func() exporter.Traces { + func() *traceExporterImp { lb, _ := newLoadBalancer(exportertest.NewNopCreateSettings(), simpleConfig(), nil) + p, _ := newTracesExporter(exportertest.NewNopCreateSettings(), simpleConfig()) + lb.res = &mockResolver{ onStart: func(context.Context) error { return errors.New("some expected err") }, } - var lbf = mockedLBGenerator(lb) - p, _ := newTracesExporter(exportertest.NewNopCreateSettings(), simpleConfig(), lbf) + p.loadBalancer = lb return p }(), @@ -116,7 +117,7 @@ func TestTracesExporterStart(t *testing.T) { } func TestTracesExporterShutdown(t *testing.T) { - p, err := newTracesExporter(exportertest.NewNopCreateSettings(), simpleConfig(), newLoadBalancer) + p, err := newTracesExporter(exportertest.NewNopCreateSettings(), simpleConfig()) require.NotNil(t, p) require.NoError(t, err) @@ -135,6 +136,11 @@ func TestConsumeTraces(t *testing.T) { require.NotNil(t, lb) require.NoError(t, err) + p, err := newTracesExporter(exportertest.NewNopCreateSettings(), simpleConfig()) + require.NotNil(t, p) + require.NoError(t, err) + assert.Equal(t, p.routingKey, traceIDRouting) + // pre-load an exporter here, so that we don't use the actual OTLP exporter lb.addMissingExporters(context.Background(), []string{"endpoint-1"}) lb.res = &mockResolver{ @@ -143,14 +149,7 @@ func TestConsumeTraces(t *testing.T) { return []string{"endpoint-1"}, nil }, } - lbf := mockedLBGenerator(lb) - - p, err := newTracesExporter(exportertest.NewNopCreateSettings(), simpleConfig(), lbf) - require.NotNil(t, p) - require.NoError(t, err) - - var te *traceExporterImp = p.(*traceExporterImp) - assert.Equal(t, te.routingKey, traceIDRouting) + p.loadBalancer = lb err = p.Start(context.Background(), componenttest.NewNopHost()) require.NoError(t, err) @@ -173,43 +172,10 @@ func TestConsumeTracesServiceBased(t *testing.T) { require.NotNil(t, lb) require.NoError(t, err) - // pre-load an exporter here, so that we don't use the actual OTLP exporter - lb.addMissingExporters(context.Background(), []string{"endpoint-1"}) - lb.res = &mockResolver{ - triggerCallbacks: true, - onResolve: func(ctx context.Context) ([]string, error) { - return []string{"endpoint-1"}, nil - }, - } - lbf := mockedLBGenerator(lb) - - p, err := newTracesExporter(exportertest.NewNopCreateSettings(), serviceBasedRoutingConfig(), lbf) + p, err := newTracesExporter(exportertest.NewNopCreateSettings(), serviceBasedRoutingConfig()) require.NotNil(t, p) require.NoError(t, err) - - var te *baseTracesExporter = p.(*baseTracesExporter) - assert.Equal(t, te.routingKey, svcRouting) - - err = p.Start(context.Background(), componenttest.NewNopHost()) - require.NoError(t, err) - defer func() { - require.NoError(t, p.Shutdown(context.Background())) - }() - - // test - res := p.ConsumeTraces(context.Background(), simpleTracesWithServiceName()) - - // verify - assert.Nil(t, res) -} - -func TestConsumeTracesAttrBased(t *testing.T) { - componentFactory := func(ctx context.Context, endpoint string) (component.Component, error) { - return newNopMockTracesExporter(), nil - } - lb, err := newLoadBalancer(exportertest.NewNopCreateSettings(), attrBasedRoutingConfig("service.name"), componentFactory) - require.NotNil(t, lb) - require.NoError(t, err) + assert.Equal(t, p.routingKey, svcRouting) // pre-load an exporter here, so that we don't use the actual OTLP exporter lb.addMissingExporters(context.Background(), []string{"endpoint-1"}) @@ -219,14 +185,7 @@ func TestConsumeTracesAttrBased(t *testing.T) { return []string{"endpoint-1"}, nil }, } - lbf := mockedLBGenerator(lb) - - p, err := newTracesExporter(exportertest.NewNopCreateSettings(), attrBasedRoutingConfig("service.name"), lbf) - require.NotNil(t, p) - require.NoError(t, err) - - var te *baseTracesExporter = p.(*baseTracesExporter) - assert.Equal(t, te.routingKey, resourceAttrRouting) + p.loadBalancer = lb err = p.Start(context.Background(), componenttest.NewNopHost()) require.NoError(t, err) @@ -248,27 +207,24 @@ func TestServiceBasedRoutingForSameTraceId(t *testing.T) { batch ptrace.Traces routingKey routingKey res map[string][]int - err error }{ { "same trace id and different services - service based routing", twoServicesWithSameTraceID(), svcRouting, - nil, - errors.New("batch of traces include multiple values of the routing attribute"), + map[string][]int{"ad-service-1": {0}, "get-recommendations-7": {1}}, }, { "same trace id and different services - trace id routing", twoServicesWithSameTraceID(), traceIDRouting, - map[string][]int{string(b.String()): {0, 1}}, - nil, + map[string][]int{string(b[:]): {0, 1}}, }, } { t.Run(tt.desc, func(t *testing.T) { - res, err := routingIdentifiersFromTraces(tt.batch, tt.routingKey, "service.name") - assert.Equal(t, tt.err, err) - assert.Equal(t, tt.res, res) + res, err := routingIdentifiersFromTraces(tt.batch, tt.routingKey, "") + assert.Equal(t, err, nil) + assert.Equal(t, res, tt.res) }) } } @@ -281,17 +237,17 @@ func TestConsumeTracesExporterNoEndpoint(t *testing.T) { require.NotNil(t, lb) require.NoError(t, err) + p, err := newTracesExporter(exportertest.NewNopCreateSettings(), simpleConfig()) + require.NotNil(t, p) + require.NoError(t, err) + lb.res = &mockResolver{ triggerCallbacks: true, onResolve: func(ctx context.Context) ([]string, error) { return nil, nil }, } - lbf := mockedLBGenerator(lb) - - p, err := newTracesExporter(exportertest.NewNopCreateSettings(), simpleConfig(), lbf) - require.NotNil(t, p) - require.NoError(t, err) + p.loadBalancer = lb err = p.Start(context.Background(), componenttest.NewNopHost()) require.NoError(t, err) @@ -315,6 +271,10 @@ func TestConsumeTracesUnexpectedExporterType(t *testing.T) { require.NotNil(t, lb) require.NoError(t, err) + p, err := newTracesExporter(exportertest.NewNopCreateSettings(), simpleConfig()) + require.NotNil(t, p) + require.NoError(t, err) + // pre-load an exporter here, so that we don't use the actual OTLP exporter lb.addMissingExporters(context.Background(), []string{"endpoint-1"}) lb.res = &mockResolver{ @@ -323,11 +283,7 @@ func TestConsumeTracesUnexpectedExporterType(t *testing.T) { return []string{"endpoint-1"}, nil }, } - lbf := mockedLBGenerator(lb) - - p, err := newTracesExporter(exportertest.NewNopCreateSettings(), simpleConfig(), lbf) - require.NotNil(t, p) - require.NoError(t, err) + p.loadBalancer = lb err = p.Start(context.Background(), componenttest.NewNopHost()) require.NoError(t, err) @@ -379,12 +335,12 @@ func TestBatchWithTwoTraces(t *testing.T) { lb, err := newLoadBalancer(exportertest.NewNopCreateSettings(), simpleConfig(), componentFactory) require.NotNil(t, lb) require.NoError(t, err) - lbf := mockedLBGenerator(lb) - p, err := newTracesExporter(exportertest.NewNopCreateSettings(), simpleConfig(), lbf) + p, err := newTracesExporter(exportertest.NewNopCreateSettings(), simpleConfig()) require.NotNil(t, p) require.NoError(t, err) + p.loadBalancer = lb err = p.Start(context.Background(), componenttest.NewNopHost()) require.NoError(t, err) @@ -406,14 +362,12 @@ func TestNoTracesInBatch(t *testing.T) { desc string batch ptrace.Traces routingKey routingKey - attrKey string err error }{ { "no resource spans", ptrace.NewTraces(), traceIDRouting, - "", errors.New("empty resource spans"), }, { @@ -424,7 +378,6 @@ func TestNoTracesInBatch(t *testing.T) { return batch }(), traceIDRouting, - "", errors.New("empty scope spans"), }, { @@ -435,7 +388,6 @@ func TestNoTracesInBatch(t *testing.T) { return batch }(), svcRouting, - "", errors.New("empty spans"), }, } { @@ -510,13 +462,14 @@ func TestRollingUpdatesWhenConsumeTraces(t *testing.T) { lb, err := newLoadBalancer(exportertest.NewNopCreateSettings(), cfg, componentFactory) require.NotNil(t, lb) require.NoError(t, err) - lb.res = res - lbf := mockedLBGenerator(lb) - p, err := newTracesExporter(exportertest.NewNopCreateSettings(), cfg, lbf) + p, err := newTracesExporter(exportertest.NewNopCreateSettings(), cfg) require.NotNil(t, p) require.NoError(t, err) + lb.res = res + p.loadBalancer = lb + counter1 := atomic.NewInt64(0) counter2 := atomic.NewInt64(0) defaultExporters := map[string]component.Component{ @@ -629,12 +582,6 @@ func appendSimpleTraceWithID(dest ptrace.ResourceSpans, id pcommon.TraceID) { dest.ScopeSpans().AppendEmpty().Spans().AppendEmpty().SetTraceID(id) } -func mockedLBGenerator(lb *loadBalancerImp) LoadBalancerGenerator { - return func(params exporter.CreateSettings, cfg component.Config, factory componentFactory) (*loadBalancerImp, error) { - return lb, nil - } -} - func simpleConfig() *Config { return &Config{ Resolver: ResolverSettings{ @@ -652,16 +599,6 @@ func serviceBasedRoutingConfig() *Config { } } -func attrBasedRoutingConfig(attrKey string) *Config { - return &Config{ - Resolver: ResolverSettings{ - Static: &StaticResolver{Hostnames: []string{"endpoint-1"}}, - }, - RoutingKey: "resourceAttr", - ResourceAttrKey: attrKey, - } -} - type mockTracesExporter struct { component.Component ConsumeTracesFn func(ctx context.Context, td ptrace.Traces) error From c3a8230f1a6d91e6153ccc8e6eec4d2ff916fed5 Mon Sep 17 00:00:00 2001 From: Zjan Turla Date: Fri, 17 Feb 2023 12:38:50 +1100 Subject: [PATCH 04/11] add support for routing by multiple attrs and falling back on trace id --- exporter/loadbalancingexporter/config.go | 8 +- .../loadbalancingexporter/trace_exporter.go | 205 ++++++++++-------- 2 files changed, 120 insertions(+), 93 deletions(-) diff --git a/exporter/loadbalancingexporter/config.go b/exporter/loadbalancingexporter/config.go index 00bd6693f8e9..b6f08227daf8 100644 --- a/exporter/loadbalancingexporter/config.go +++ b/exporter/loadbalancingexporter/config.go @@ -30,10 +30,10 @@ const ( // Config defines configuration for the exporter. type Config struct { - Protocol Protocol `mapstructure:"protocol"` - Resolver ResolverSettings `mapstructure:"resolver"` - RoutingKey string `mapstructure:"routing_key"` - ResourceAttrKey string `mapstructure:"resource_attr_key"` + Protocol Protocol `mapstructure:"protocol"` + Resolver ResolverSettings `mapstructure:"resolver"` + RoutingKey string `mapstructure:"routing_key"` + ResourceAttrKeys []string `mapstructure:"resource_attr_key"` } // Protocol holds the individual protocol-specific settings. Only OTLP is supported at the moment. diff --git a/exporter/loadbalancingexporter/trace_exporter.go b/exporter/loadbalancingexporter/trace_exporter.go index 349b05d7b766..24683fb155be 100644 --- a/exporter/loadbalancingexporter/trace_exporter.go +++ b/exporter/loadbalancingexporter/trace_exporter.go @@ -35,14 +35,16 @@ import ( var _ exporter.Traces = (*traceExporterImp)(nil) type traceExporterImp struct { - loadBalancer loadBalancer - routingKey routingKey - resourceAttrKey string + loadBalancer loadBalancer + routingKey routingKey + resourceAttrKeys []string stopped bool shutdownWg sync.WaitGroup } +type routingFunction func(ptrace.Traces) (map[string][]int, error) + // Create new traces exporter func newTracesExporter(params exporter.CreateSettings, cfg component.Config) (*traceExporterImp, error) { exporterFactory := otlpexporter.NewFactory() @@ -62,7 +64,7 @@ func newTracesExporter(params exporter.CreateSettings, cfg component.Config) (*t traceExporter.routingKey = svcRouting case "resourceAttr": traceExporter.routingKey = resourceAttrRouting - traceExporter.resourceAttrKey = cfg.(*Config).ResourceAttrKey + traceExporter.resourceAttrKeys = cfg.(*Config).ResourceAttrKeys case "traceID", "": default: return nil, fmt.Errorf("unsupported routing_key: %s", cfg.(*Config).RoutingKey) @@ -76,51 +78,59 @@ func buildExporterConfig(cfg *Config, endpoint string) otlpexporter.Config { return oCfg } -func SplitTracesByResourceAttr(batch ptrace.Traces, attrKey string) []ptrace.Traces { - var result []ptrace.Traces - var strKey string - - for i := 0; i < batch.ResourceSpans().Len(); i++ { - rs := batch.ResourceSpans().At(i) - for j := 0; j < rs.ScopeSpans().Len(); j++ { - // the batches for this ILS - batches := map[string]ptrace.ResourceSpans{} - - key, ok := rs.Resource().Attributes().Get(attrKey) - - ils := rs.ScopeSpans().At(j) - for k := 0; k < ils.Spans().Len(); k++ { - span := ils.Spans().At(k) - - if !ok { - strKey = span.TraceID().String() - } else { - strKey = key.Str() - } - - // for the first traceID in the ILS, initialize the map entry - // and add the singleTraceBatch to the result list - if _, ok := batches[strKey]; !ok { - trace := ptrace.NewTraces() - newRS := trace.ResourceSpans().AppendEmpty() - // currently, the ResourceSpans implementation has only a Resource and an ILS. We'll copy the Resource - // and set our own ILS - rs.Resource().CopyTo(newRS.Resource()) - newRS.SetSchemaUrl(rs.SchemaUrl()) - newILS := newRS.ScopeSpans().AppendEmpty() - // currently, the ILS implementation has only an InstrumentationLibrary and spans. We'll copy the library - // and set our own spans - ils.Scope().CopyTo(newILS.Scope()) - newILS.SetSchemaUrl(ils.SchemaUrl()) - batches[strKey] = newRS - - result = append(result, trace) - } - - // there is only one instrumentation library per batch - tgt := batches[strKey].ScopeSpans().At(0).Spans().AppendEmpty() - span.CopyTo(tgt) +func SplitTracesByResourceAttr(batches ptrace.Traces, attrKeys []string) map[string][]ptrace.Traces { + // This code is based on the ConsumeTraces function of the batchperresourceattr + // modified to support multiple routing keys + fallback on the traceId when routing attr does not exist + result := make(map[string][]ptrace.Traces) + + rss := batches.ResourceSpans() + lenRss := rss.Len() + + if lenRss <= 1 { + return map[string][]ptrace.Traces{attrKeys[0]: {batches}} + } + + indicesByAttr := make(map[string]map[string][]int) + var fallbackIndices []int + var attrFound bool + for i := 0; i < lenRss; i++ { + rs := rss.At(i) + attrFound = false + for _, attrKey := range attrKeys { + var keyValue string + if attributeValue, ok := rs.Resource().Attributes().Get(attrKey); ok { + keyValue = attributeValue.Str() + indicesByAttr[attrKey][keyValue] = append(indicesByAttr[attrKey][keyValue], i) + attrFound = true + break + } + } + if !attrFound { + // These will be processed further to be split per traceID + fallbackIndices = append(fallbackIndices, i) + } + } + + if len(indicesByAttr) <= 1 && len(fallbackIndices) == 0 { + return map[string][]ptrace.Traces{"traceId": {batches}} + } + + for j := 0; j < len(fallbackIndices); j++ { + t := ptrace.NewTraces() + rs := rss.At(j) + rs.CopyTo(t.ResourceSpans().AppendEmpty()) + nt := batchpersignal.SplitTraces(t) + result["traceId"] = append(result["traceId"], nt...) + } + + for routeKey, routeKeyAttrs := range indicesByAttr { + for _, indices := range routeKeyAttrs { + tracesForAttr := ptrace.NewTraces() + for _, i := range indices { + rs := rss.At(i) + rs.CopyTo((tracesForAttr.ResourceSpans().AppendEmpty())) } + result[routeKey] = append(result[routeKey], tracesForAttr) } } @@ -143,37 +153,44 @@ func (e *traceExporterImp) Shutdown(context.Context) error { func (e *traceExporterImp) ConsumeTraces(ctx context.Context, td ptrace.Traces) error { var errs error - var batches []ptrace.Traces + var batches = make(map[string][]ptrace.Traces) if e.routingKey == resourceAttrRouting { - batches = SplitTracesByResourceAttr(td, e.resourceAttrKey) + batches = SplitTracesByResourceAttr(td, e.resourceAttrKeys) } else { - batches = batchpersignal.SplitTraces(td) + batches["traceId"] = batchpersignal.SplitTraces(td) } - for _, batch := range batches { - errs = multierr.Append(errs, e.consumeTrace(ctx, batch)) + rfs := make(map[string]routingFunction) + for key, _ := range batches { + if key == "traceId" { + rfs[key] = func(x ptrace.Traces) (map[string][]int, error) { + return routeByTraceId(x) + } + } else { + rfs[key] = func(x ptrace.Traces) (map[string][]int, error) { + return routeByResourceAttr(x, key) + } + } + } + for key, tb := range batches { + for _, t := range tb { + errs = multierr.Append(errs, e.consumeTrace(ctx, t, rfs[key])) + } } return errs } -func (e *traceExporterImp) consumeTrace(ctx context.Context, td ptrace.Traces) error { +func (e *traceExporterImp) consumeTrace(ctx context.Context, td ptrace.Traces, rf routingFunction) error { var exp component.Component - var attrKey string - if e.routingKey == svcRouting || e.routingKey == traceIDRouting { - attrKey = "" - } else { - attrKey = e.resourceAttrKey - } - routingIds, err := routingIdentifiersFromTraces(td, e.routingKey, attrKey) + routingIds, err := rf(td) if err != nil { return err } var rid string - for key := range routingIds { + for key, _ := range routingIds { rid = key break } - // for rid := range routingIds { endpoint := e.loadBalancer.Endpoint([]byte(rid)) exp, err = e.loadBalancer.Exporter(endpoint) if err != nil { @@ -200,48 +217,39 @@ func (e *traceExporterImp) consumeTrace(ctx context.Context, td ptrace.Traces) e []tag.Mutator{tag.Upsert(endpointTagKey, endpoint), successFalseMutator}, mBackendLatency.M(duration.Milliseconds())) } - // } return err } -func routingIdentifiersFromTraces(td ptrace.Traces, routing routingKey, routeKey string) (map[string][]int, error) { +func validateNotEmpty(td ptrace.Traces) error { + rs := td.ResourceSpans() + if rs.Len() == 0 { + return errors.New("empty resource spans") + } + ils := rs.At(0).ScopeSpans() + if ils.Len() == 0 { + return errors.New("empty scope spans") + } + spans := ils.At(0).Spans() + if spans.Len() == 0 { + return errors.New("empty spans") + } + return nil +} + +func routeByTraceId(td ptrace.Traces) (map[string][]int, error) { ids := make(map[string][]int) rs := td.ResourceSpans() if rs.Len() == 0 { return nil, errors.New("empty resource spans") } - ils := rs.At(0).ScopeSpans() if ils.Len() == 0 { return nil, errors.New("empty scope spans") } - spans := ils.At(0).Spans() if spans.Len() == 0 { return nil, errors.New("empty spans") } - - if routing == svcRouting || routing == resourceAttrRouting { - var attrKey string - if routing == svcRouting { - attrKey = "service.name" - } else { - attrKey = routeKey - } - for i := 0; i < rs.Len(); i++ { - attr, ok := rs.At(i).Resource().Attributes().Get(attrKey) - if !ok { - return nil, errors.New("unable to get attribute") - } - _, exists := ids[attr.Str()] - if exists { - ids[attr.Str()] = []int{i} - } else { - ids[attr.Str()] = append(ids[attr.Str()], i) - } - } - return ids, nil - } tid := spans.At(0).TraceID() ids[string(tid[:])] = []int{} for i := 0; i < rs.Len(); i++ { @@ -249,3 +257,22 @@ func routingIdentifiersFromTraces(td ptrace.Traces, routing routingKey, routeKey } return ids, nil } + +func routeByResourceAttr(td ptrace.Traces, routeKey string) (map[string][]int, error) { + ids := make(map[string][]int) + err := validateNotEmpty(td) + if err != nil { + return nil, err + } + + rs := td.ResourceSpans() + for i := 0; i < rs.Len(); i++ { + attr, ok := rs.At(i).Resource().Attributes().Get(routeKey) + if !ok { + // If resource attribute is not found, falls back to Trace ID routing + return routeByTraceId(td) + } + ids[attr.Str()] = append(ids[attr.Str()], i) + } + return ids, nil +} From 0ed2fe152005914da0eff484c7a1b9b9aabd3271 Mon Sep 17 00:00:00 2001 From: Zjan Turla Date: Fri, 17 Feb 2023 13:13:33 +1100 Subject: [PATCH 05/11] fix tests --- .../testdata/config.yaml | 29 +++++++++++++-- .../trace_exporter_test.go | 36 +++++++++++-------- 2 files changed, 48 insertions(+), 17 deletions(-) diff --git a/exporter/loadbalancingexporter/testdata/config.yaml b/exporter/loadbalancingexporter/testdata/config.yaml index a1ec52ed8d85..73cd1d9b17e7 100644 --- a/exporter/loadbalancingexporter/testdata/config.yaml +++ b/exporter/loadbalancingexporter/testdata/config.yaml @@ -8,8 +8,8 @@ loadbalancing: resolver: static: hostnames: - - endpoint-1 # assumes 4317 as the default port - - endpoint-2:55678 + - endpoint-1 # assumes 4317 as the default port + - endpoint-2:55678 loadbalancing/2: protocol: otlp: @@ -27,3 +27,28 @@ loadbalancing/3: dns: hostname: service-1 port: 55690 +loadbalancing/4: + protocol: + otlp: + resolver: + dns: + hostname: service-1 + port: 55690 + routing_key: traceID +loadbalancing/5: + protocol: + otlp: + resolver: + dns: + hostname: service-1 + port: 55690 + routing_key: service +loadbalancing/6: + protocol: + otlp: + resolver: + dns: + hostname: service-1 + port: 55690 + routing_key: resourceAttr + resource_attr_key: "resource.attribute" diff --git a/exporter/loadbalancingexporter/trace_exporter_test.go b/exporter/loadbalancingexporter/trace_exporter_test.go index 4b467be07a8e..129ffd3cd17b 100644 --- a/exporter/loadbalancingexporter/trace_exporter_test.go +++ b/exporter/loadbalancingexporter/trace_exporter_test.go @@ -203,26 +203,28 @@ func TestConsumeTracesServiceBased(t *testing.T) { func TestServiceBasedRoutingForSameTraceId(t *testing.T) { b := pcommon.TraceID([16]byte{1, 2, 3, 4}) for _, tt := range []struct { - desc string - batch ptrace.Traces - routingKey routingKey - res map[string][]int + desc string + batch ptrace.Traces + rf routingFunction + res map[string][]int }{ { "same trace id and different services - service based routing", twoServicesWithSameTraceID(), - svcRouting, + func(t ptrace.Traces) (map[string][]int, error) { + return routeByResourceAttr(t, "service.name") + }, map[string][]int{"ad-service-1": {0}, "get-recommendations-7": {1}}, }, { "same trace id and different services - trace id routing", twoServicesWithSameTraceID(), - traceIDRouting, + routeByTraceId, map[string][]int{string(b[:]): {0, 1}}, }, } { t.Run(tt.desc, func(t *testing.T) { - res, err := routingIdentifiersFromTraces(tt.batch, tt.routingKey, "") + res, err := tt.rf(tt.batch) assert.Equal(t, err, nil) assert.Equal(t, res, tt.res) }) @@ -359,15 +361,16 @@ func TestBatchWithTwoTraces(t *testing.T) { func TestNoTracesInBatch(t *testing.T) { for _, tt := range []struct { - desc string - batch ptrace.Traces - routingKey routingKey - err error + desc string + batch ptrace.Traces + rf routingFunction + err error }{ + // Trace ID routing { "no resource spans", ptrace.NewTraces(), - traceIDRouting, + routeByTraceId, errors.New("empty resource spans"), }, { @@ -377,9 +380,10 @@ func TestNoTracesInBatch(t *testing.T) { batch.ResourceSpans().AppendEmpty() return batch }(), - traceIDRouting, + routeByTraceId, errors.New("empty scope spans"), }, + // Service / Resource Attribute routing { "no spans", func() ptrace.Traces { @@ -387,12 +391,14 @@ func TestNoTracesInBatch(t *testing.T) { batch.ResourceSpans().AppendEmpty().ScopeSpans().AppendEmpty() return batch }(), - svcRouting, + func(t ptrace.Traces) (map[string][]int, error) { + return routeByResourceAttr(t, "service.name") + }, errors.New("empty spans"), }, } { t.Run(tt.desc, func(t *testing.T) { - res, err := routingIdentifiersFromTraces(tt.batch, tt.routingKey, "") + res, err := tt.rf(tt.batch) assert.Equal(t, err, tt.err) assert.Equal(t, res, map[string][]int(nil)) }) From e46badc1967a0482cab139869cb7810a71f46ee4 Mon Sep 17 00:00:00 2001 From: Zjan Turla Date: Fri, 17 Feb 2023 14:23:33 +1100 Subject: [PATCH 06/11] address comments --- exporter/loadbalancingexporter/config.go | 2 +- exporter/loadbalancingexporter/testdata/config.yaml | 2 +- exporter/loadbalancingexporter/trace_exporter.go | 3 +++ 3 files changed, 5 insertions(+), 2 deletions(-) diff --git a/exporter/loadbalancingexporter/config.go b/exporter/loadbalancingexporter/config.go index b6f08227daf8..203b97ea184b 100644 --- a/exporter/loadbalancingexporter/config.go +++ b/exporter/loadbalancingexporter/config.go @@ -33,7 +33,7 @@ type Config struct { Protocol Protocol `mapstructure:"protocol"` Resolver ResolverSettings `mapstructure:"resolver"` RoutingKey string `mapstructure:"routing_key"` - ResourceAttrKeys []string `mapstructure:"resource_attr_key"` + ResourceAttrKeys []string `mapstructure:"resource_attr_keys"` } // Protocol holds the individual protocol-specific settings. Only OTLP is supported at the moment. diff --git a/exporter/loadbalancingexporter/testdata/config.yaml b/exporter/loadbalancingexporter/testdata/config.yaml index 73cd1d9b17e7..48435f34a92b 100644 --- a/exporter/loadbalancingexporter/testdata/config.yaml +++ b/exporter/loadbalancingexporter/testdata/config.yaml @@ -51,4 +51,4 @@ loadbalancing/6: hostname: service-1 port: 55690 routing_key: resourceAttr - resource_attr_key: "resource.attribute" + resource_attr_keys: ["resource.attribute", "service.name"] diff --git a/exporter/loadbalancingexporter/trace_exporter.go b/exporter/loadbalancingexporter/trace_exporter.go index 24683fb155be..81d34eebcc11 100644 --- a/exporter/loadbalancingexporter/trace_exporter.go +++ b/exporter/loadbalancingexporter/trace_exporter.go @@ -98,6 +98,9 @@ func SplitTracesByResourceAttr(batches ptrace.Traces, attrKeys []string) map[str attrFound = false for _, attrKey := range attrKeys { var keyValue string + if _, ok := indicesByAttr[attrKey]; !ok { + indicesByAttr[attrKey] = make(map[string][]int) + } if attributeValue, ok := rs.Resource().Attributes().Get(attrKey); ok { keyValue = attributeValue.Str() indicesByAttr[attrKey][keyValue] = append(indicesByAttr[attrKey][keyValue], i) From a636d87f91ec53f8de4a1a945c1cd82ae71795eb Mon Sep 17 00:00:00 2001 From: Zjan Turla Date: Fri, 17 Feb 2023 14:27:03 +1100 Subject: [PATCH 07/11] more cleanup --- exporter/loadbalancingexporter/trace_exporter.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/exporter/loadbalancingexporter/trace_exporter.go b/exporter/loadbalancingexporter/trace_exporter.go index 81d34eebcc11..fb3d0c14387d 100644 --- a/exporter/loadbalancingexporter/trace_exporter.go +++ b/exporter/loadbalancingexporter/trace_exporter.go @@ -163,7 +163,7 @@ func (e *traceExporterImp) ConsumeTraces(ctx context.Context, td ptrace.Traces) batches["traceId"] = batchpersignal.SplitTraces(td) } rfs := make(map[string]routingFunction) - for key, _ := range batches { + for key := range batches { if key == "traceId" { rfs[key] = func(x ptrace.Traces) (map[string][]int, error) { return routeByTraceId(x) @@ -190,7 +190,7 @@ func (e *traceExporterImp) consumeTrace(ctx context.Context, td ptrace.Traces, r return err } var rid string - for key, _ := range routingIds { + for key := range routingIds { rid = key break } From e368114ab7ca8cbfd194cbfcefb1cee70f241f79 Mon Sep 17 00:00:00 2001 From: Zjan Turla Date: Fri, 17 Feb 2023 19:23:41 +1100 Subject: [PATCH 08/11] fix issue that occurs when splitting a single trace (defaults to traceid) and add logger support --- exporter/loadbalancingexporter/factory.go | 3 +- .../loadbalancer_test.go | 3 +- .../loadbalancingexporter/trace_exporter.go | 25 +++--- .../trace_exporter_test.go | 81 ++++++++++++++++--- 4 files changed, 87 insertions(+), 25 deletions(-) diff --git a/exporter/loadbalancingexporter/factory.go b/exporter/loadbalancingexporter/factory.go index e8655a891685..661352025362 100644 --- a/exporter/loadbalancingexporter/factory.go +++ b/exporter/loadbalancingexporter/factory.go @@ -21,6 +21,7 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/exporter" "go.opentelemetry.io/collector/exporter/otlpexporter" + "go.uber.org/zap" ) const ( @@ -54,7 +55,7 @@ func createDefaultConfig() component.Config { } func createTracesExporter(_ context.Context, params exporter.CreateSettings, cfg component.Config) (exporter.Traces, error) { - return newTracesExporter(params, cfg) + return newTracesExporter(params, cfg, zap.NewNop()) } func createLogsExporter(_ context.Context, params exporter.CreateSettings, cfg component.Config) (exporter.Logs, error) { diff --git a/exporter/loadbalancingexporter/loadbalancer_test.go b/exporter/loadbalancingexporter/loadbalancer_test.go index 75cc18d29701..c3c54484e353 100644 --- a/exporter/loadbalancingexporter/loadbalancer_test.go +++ b/exporter/loadbalancingexporter/loadbalancer_test.go @@ -26,6 +26,7 @@ import ( "go.opentelemetry.io/collector/exporter" "go.opentelemetry.io/collector/exporter/exportertest" "go.opentelemetry.io/collector/exporter/otlpexporter" + "go.uber.org/zap" ) func TestNewLoadBalancerNoResolver(t *testing.T) { @@ -178,7 +179,7 @@ func TestStartFailureStaticResolver(t *testing.T) { func TestLoadBalancerShutdown(t *testing.T) { // prepare cfg := simpleConfig() - p, err := newTracesExporter(exportertest.NewNopCreateSettings(), cfg) + p, err := newTracesExporter(exportertest.NewNopCreateSettings(), cfg, zap.NewNop()) require.NotNil(t, p) require.NoError(t, err) diff --git a/exporter/loadbalancingexporter/trace_exporter.go b/exporter/loadbalancingexporter/trace_exporter.go index fb3d0c14387d..6fc57f342566 100644 --- a/exporter/loadbalancingexporter/trace_exporter.go +++ b/exporter/loadbalancingexporter/trace_exporter.go @@ -30,6 +30,7 @@ import ( "go.opentelemetry.io/collector/exporter/otlpexporter" "go.opentelemetry.io/collector/pdata/ptrace" "go.uber.org/multierr" + "go.uber.org/zap" ) var _ exporter.Traces = (*traceExporterImp)(nil) @@ -41,12 +42,13 @@ type traceExporterImp struct { stopped bool shutdownWg sync.WaitGroup + logger *zap.Logger } type routingFunction func(ptrace.Traces) (map[string][]int, error) // Create new traces exporter -func newTracesExporter(params exporter.CreateSettings, cfg component.Config) (*traceExporterImp, error) { +func newTracesExporter(params exporter.CreateSettings, cfg component.Config, logger *zap.Logger) (*traceExporterImp, error) { exporterFactory := otlpexporter.NewFactory() lb, err := newLoadBalancer(params, cfg, func(ctx context.Context, endpoint string) (component.Component, error) { @@ -57,7 +59,7 @@ func newTracesExporter(params exporter.CreateSettings, cfg component.Config) (*t return nil, err } - traceExporter := traceExporterImp{loadBalancer: lb, routingKey: traceIDRouting} + traceExporter := traceExporterImp{loadBalancer: lb, routingKey: traceIDRouting, logger: logger} switch cfg.(*Config).RoutingKey { case "service": @@ -78,7 +80,7 @@ func buildExporterConfig(cfg *Config, endpoint string) otlpexporter.Config { return oCfg } -func SplitTracesByResourceAttr(batches ptrace.Traces, attrKeys []string) map[string][]ptrace.Traces { +func SplitTracesByResourceAttr(batches ptrace.Traces, attrKeys []string) (map[string][]ptrace.Traces, error) { // This code is based on the ConsumeTraces function of the batchperresourceattr // modified to support multiple routing keys + fallback on the traceId when routing attr does not exist result := make(map[string][]ptrace.Traces) @@ -87,7 +89,7 @@ func SplitTracesByResourceAttr(batches ptrace.Traces, attrKeys []string) map[str lenRss := rss.Len() if lenRss <= 1 { - return map[string][]ptrace.Traces{attrKeys[0]: {batches}} + return map[string][]ptrace.Traces{attrKeys[0]: {batches}}, nil } indicesByAttr := make(map[string]map[string][]int) @@ -114,10 +116,6 @@ func SplitTracesByResourceAttr(batches ptrace.Traces, attrKeys []string) map[str } } - if len(indicesByAttr) <= 1 && len(fallbackIndices) == 0 { - return map[string][]ptrace.Traces{"traceId": {batches}} - } - for j := 0; j < len(fallbackIndices); j++ { t := ptrace.NewTraces() rs := rss.At(j) @@ -137,7 +135,7 @@ func SplitTracesByResourceAttr(batches ptrace.Traces, attrKeys []string) map[str } } - return result + return result, nil } func (e *traceExporterImp) Capabilities() consumer.Capabilities { @@ -155,10 +153,14 @@ func (e *traceExporterImp) Shutdown(context.Context) error { } func (e *traceExporterImp) ConsumeTraces(ctx context.Context, td ptrace.Traces) error { + var err error var errs error var batches = make(map[string][]ptrace.Traces) if e.routingKey == resourceAttrRouting { - batches = SplitTracesByResourceAttr(td, e.resourceAttrKeys) + batches, err = SplitTracesByResourceAttr(td, e.resourceAttrKeys) + if err != nil { + return err + } } else { batches["traceId"] = batchpersignal.SplitTraces(td) } @@ -277,5 +279,8 @@ func routeByResourceAttr(td ptrace.Traces, routeKey string) (map[string][]int, e } ids[attr.Str()] = append(ids[attr.Str()], i) } + if len(ids) > 1 { + return nil, errors.New("received traces were not split by resource attr") + } return ids, nil } diff --git a/exporter/loadbalancingexporter/trace_exporter_test.go b/exporter/loadbalancingexporter/trace_exporter_test.go index 129ffd3cd17b..2bbb34828313 100644 --- a/exporter/loadbalancingexporter/trace_exporter_test.go +++ b/exporter/loadbalancingexporter/trace_exporter_test.go @@ -61,7 +61,7 @@ func TestNewTracesExporter(t *testing.T) { } { t.Run(tt.desc, func(t *testing.T) { // test - _, err := newTracesExporter(exportertest.NewNopCreateSettings(), tt.config) + _, err := newTracesExporter(exportertest.NewNopCreateSettings(), tt.config, zap.NewNop()) // verify require.Equal(t, tt.err, err) @@ -78,7 +78,7 @@ func TestTracesExporterStart(t *testing.T) { { "ok", func() *traceExporterImp { - p, _ := newTracesExporter(exportertest.NewNopCreateSettings(), simpleConfig()) + p, _ := newTracesExporter(exportertest.NewNopCreateSettings(), simpleConfig(), zap.NewNop()) return p }(), nil, @@ -87,7 +87,7 @@ func TestTracesExporterStart(t *testing.T) { "error", func() *traceExporterImp { lb, _ := newLoadBalancer(exportertest.NewNopCreateSettings(), simpleConfig(), nil) - p, _ := newTracesExporter(exportertest.NewNopCreateSettings(), simpleConfig()) + p, _ := newTracesExporter(exportertest.NewNopCreateSettings(), simpleConfig(), zap.NewNop()) lb.res = &mockResolver{ onStart: func(context.Context) error { @@ -117,7 +117,7 @@ func TestTracesExporterStart(t *testing.T) { } func TestTracesExporterShutdown(t *testing.T) { - p, err := newTracesExporter(exportertest.NewNopCreateSettings(), simpleConfig()) + p, err := newTracesExporter(exportertest.NewNopCreateSettings(), simpleConfig(), zap.NewNop()) require.NotNil(t, p) require.NoError(t, err) @@ -136,7 +136,7 @@ func TestConsumeTraces(t *testing.T) { require.NotNil(t, lb) require.NoError(t, err) - p, err := newTracesExporter(exportertest.NewNopCreateSettings(), simpleConfig()) + p, err := newTracesExporter(exportertest.NewNopCreateSettings(), simpleConfig(), zap.NewNop()) require.NotNil(t, p) require.NoError(t, err) assert.Equal(t, p.routingKey, traceIDRouting) @@ -172,7 +172,7 @@ func TestConsumeTracesServiceBased(t *testing.T) { require.NotNil(t, lb) require.NoError(t, err) - p, err := newTracesExporter(exportertest.NewNopCreateSettings(), serviceBasedRoutingConfig()) + p, err := newTracesExporter(exportertest.NewNopCreateSettings(), serviceBasedRoutingConfig(), zap.NewNop()) require.NotNil(t, p) require.NoError(t, err) assert.Equal(t, p.routingKey, svcRouting) @@ -207,6 +207,7 @@ func TestServiceBasedRoutingForSameTraceId(t *testing.T) { batch ptrace.Traces rf routingFunction res map[string][]int + err error }{ { "same trace id and different services - service based routing", @@ -214,23 +215,67 @@ func TestServiceBasedRoutingForSameTraceId(t *testing.T) { func(t ptrace.Traces) (map[string][]int, error) { return routeByResourceAttr(t, "service.name") }, - map[string][]int{"ad-service-1": {0}, "get-recommendations-7": {1}}, + nil, + errors.New("received traces were not split by resource attr"), }, { "same trace id and different services - trace id routing", twoServicesWithSameTraceID(), routeByTraceId, map[string][]int{string(b[:]): {0, 1}}, + nil, }, } { t.Run(tt.desc, func(t *testing.T) { res, err := tt.rf(tt.batch) - assert.Equal(t, err, nil) - assert.Equal(t, res, tt.res) + assert.Equal(t, tt.err, err) + assert.Equal(t, tt.res, res) }) } } +func TestAttrBasedRouting(t *testing.T) { + sink := new(consumertest.TracesSink) + componentFactory := func(ctx context.Context, endpoint string) (component.Component, error) { + return newMockTracesExporter(sink.ConsumeTraces), nil + } + lb, err := newLoadBalancer(exportertest.NewNopCreateSettings(), attrBasedRoutingConfig(), componentFactory) + require.NotNil(t, lb) + require.NoError(t, err) + + p, err := newTracesExporter(exportertest.NewNopCreateSettings(), attrBasedRoutingConfig(), zap.NewNop()) + require.NotNil(t, p) + require.NoError(t, err) + assert.Equal(t, p.routingKey, resourceAttrRouting) + + // pre-load an exporter here, so that we don't use the actual OTLP exporter + lb.addMissingExporters(context.Background(), []string{"endpoint-1"}) + lb.addMissingExporters(context.Background(), []string{"endpoint-2"}) + lb.addMissingExporters(context.Background(), []string{"endpoint-3"}) + lb.res = &mockResolver{ + triggerCallbacks: true, + onResolve: func(ctx context.Context) ([]string, error) { + return []string{"endpoint-1", "endpoint-2", "endpoint-3"}, nil + }, + } + p.loadBalancer = lb + + err = p.Start(context.Background(), componenttest.NewNopHost()) + require.NoError(t, err) + defer func() { + require.NoError(t, p.Shutdown(context.Background())) + }() + + // test + res := p.ConsumeTraces(context.Background(), twoServicesWithSameTraceID()) + + // verify + assert.Nil(t, res) + + // Verify that the single Trace was split into two based on service name + assert.Len(t, sink.AllTraces(), 2) +} + func TestConsumeTracesExporterNoEndpoint(t *testing.T) { componentFactory := func(ctx context.Context, endpoint string) (component.Component, error) { return newNopMockTracesExporter(), nil @@ -239,7 +284,7 @@ func TestConsumeTracesExporterNoEndpoint(t *testing.T) { require.NotNil(t, lb) require.NoError(t, err) - p, err := newTracesExporter(exportertest.NewNopCreateSettings(), simpleConfig()) + p, err := newTracesExporter(exportertest.NewNopCreateSettings(), simpleConfig(), zap.NewNop()) require.NotNil(t, p) require.NoError(t, err) @@ -273,7 +318,7 @@ func TestConsumeTracesUnexpectedExporterType(t *testing.T) { require.NotNil(t, lb) require.NoError(t, err) - p, err := newTracesExporter(exportertest.NewNopCreateSettings(), simpleConfig()) + p, err := newTracesExporter(exportertest.NewNopCreateSettings(), simpleConfig(), zap.NewNop()) require.NotNil(t, p) require.NoError(t, err) @@ -338,7 +383,7 @@ func TestBatchWithTwoTraces(t *testing.T) { require.NotNil(t, lb) require.NoError(t, err) - p, err := newTracesExporter(exportertest.NewNopCreateSettings(), simpleConfig()) + p, err := newTracesExporter(exportertest.NewNopCreateSettings(), simpleConfig(), zap.NewNop()) require.NotNil(t, p) require.NoError(t, err) @@ -469,7 +514,7 @@ func TestRollingUpdatesWhenConsumeTraces(t *testing.T) { require.NotNil(t, lb) require.NoError(t, err) - p, err := newTracesExporter(exportertest.NewNopCreateSettings(), cfg) + p, err := newTracesExporter(exportertest.NewNopCreateSettings(), cfg, zap.NewNop()) require.NotNil(t, p) require.NoError(t, err) @@ -605,6 +650,16 @@ func serviceBasedRoutingConfig() *Config { } } +func attrBasedRoutingConfig() *Config { + return &Config{ + Resolver: ResolverSettings{ + Static: &StaticResolver{Hostnames: []string{"endpoint-1", "endpoint-2", "endpoint-3"}}, + }, + RoutingKey: "resourceAttr", + ResourceAttrKeys: []string{"service.name"}, + } +} + type mockTracesExporter struct { component.Component ConsumeTracesFn func(ctx context.Context, td ptrace.Traces) error From aa77217a4f0f1b2916a3e64412fb7760c02d3712 Mon Sep 17 00:00:00 2001 From: Zjan Turla Date: Fri, 24 Feb 2023 14:33:00 +1100 Subject: [PATCH 09/11] fix issues when processing single resource span traces or falling back to trace ID balancing --- exporter/loadbalancingexporter/trace_exporter.go | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/exporter/loadbalancingexporter/trace_exporter.go b/exporter/loadbalancingexporter/trace_exporter.go index 6fc57f342566..3d2436c3944c 100644 --- a/exporter/loadbalancingexporter/trace_exporter.go +++ b/exporter/loadbalancingexporter/trace_exporter.go @@ -88,10 +88,6 @@ func SplitTracesByResourceAttr(batches ptrace.Traces, attrKeys []string) (map[st rss := batches.ResourceSpans() lenRss := rss.Len() - if lenRss <= 1 { - return map[string][]ptrace.Traces{attrKeys[0]: {batches}}, nil - } - indicesByAttr := make(map[string]map[string][]int) var fallbackIndices []int var attrFound bool @@ -118,7 +114,7 @@ func SplitTracesByResourceAttr(batches ptrace.Traces, attrKeys []string) (map[st for j := 0; j < len(fallbackIndices); j++ { t := ptrace.NewTraces() - rs := rss.At(j) + rs := rss.At(fallbackIndices[j]) rs.CopyTo(t.ResourceSpans().AppendEmpty()) nt := batchpersignal.SplitTraces(t) result["traceId"] = append(result["traceId"], nt...) From 7429ae0c9a4c1b2ff7d58b18162a7e0faed277a4 Mon Sep 17 00:00:00 2001 From: Zjan Turla Date: Thu, 16 Mar 2023 01:05:40 +1100 Subject: [PATCH 10/11] refactor and address comments --- exporter/loadbalancingexporter/config.go | 8 +- .../testdata/config.yaml | 4 +- .../loadbalancingexporter/trace_exporter.go | 249 ++++++++---------- .../trace_exporter_test.go | 195 ++++++++++++-- 4 files changed, 280 insertions(+), 176 deletions(-) diff --git a/exporter/loadbalancingexporter/config.go b/exporter/loadbalancingexporter/config.go index 203b97ea184b..936f2f1de46a 100644 --- a/exporter/loadbalancingexporter/config.go +++ b/exporter/loadbalancingexporter/config.go @@ -30,10 +30,10 @@ const ( // Config defines configuration for the exporter. type Config struct { - Protocol Protocol `mapstructure:"protocol"` - Resolver ResolverSettings `mapstructure:"resolver"` - RoutingKey string `mapstructure:"routing_key"` - ResourceAttrKeys []string `mapstructure:"resource_attr_keys"` + Protocol Protocol `mapstructure:"protocol"` + Resolver ResolverSettings `mapstructure:"resolver"` + RoutingKey string `mapstructure:"routing_key"` + ResourceKeys []string `mapstructure:"resource_keys"` } // Protocol holds the individual protocol-specific settings. Only OTLP is supported at the moment. diff --git a/exporter/loadbalancingexporter/testdata/config.yaml b/exporter/loadbalancingexporter/testdata/config.yaml index 48435f34a92b..bef4f299574e 100644 --- a/exporter/loadbalancingexporter/testdata/config.yaml +++ b/exporter/loadbalancingexporter/testdata/config.yaml @@ -50,5 +50,5 @@ loadbalancing/6: dns: hostname: service-1 port: 55690 - routing_key: resourceAttr - resource_attr_keys: ["resource.attribute", "service.name"] + routing_key: resource + resource_keys: ["resource.attribute", "service.name"] diff --git a/exporter/loadbalancingexporter/trace_exporter.go b/exporter/loadbalancingexporter/trace_exporter.go index 3d2436c3944c..716407d0e380 100644 --- a/exporter/loadbalancingexporter/trace_exporter.go +++ b/exporter/loadbalancingexporter/trace_exporter.go @@ -36,16 +36,22 @@ import ( var _ exporter.Traces = (*traceExporterImp)(nil) type traceExporterImp struct { - loadBalancer loadBalancer - routingKey routingKey - resourceAttrKeys []string + loadBalancer loadBalancer + resourceKeys []string - stopped bool - shutdownWg sync.WaitGroup - logger *zap.Logger + traceConsumer traceConsumer + stopped bool + shutdownWg sync.WaitGroup + logger *zap.Logger } -type routingFunction func(ptrace.Traces) (map[string][]int, error) +type routingEntry struct { + routingKey routingKey + keyValue string + trace ptrace.Traces +} + +type traceConsumer func(ctx context.Context, td ptrace.Traces) error // Create new traces exporter func newTracesExporter(params exporter.CreateSettings, cfg component.Config, logger *zap.Logger) (*traceExporterImp, error) { @@ -59,15 +65,17 @@ func newTracesExporter(params exporter.CreateSettings, cfg component.Config, log return nil, err } - traceExporter := traceExporterImp{loadBalancer: lb, routingKey: traceIDRouting, logger: logger} + traceExporter := traceExporterImp{loadBalancer: lb, logger: logger} switch cfg.(*Config).RoutingKey { case "service": - traceExporter.routingKey = svcRouting - case "resourceAttr": - traceExporter.routingKey = resourceAttrRouting - traceExporter.resourceAttrKeys = cfg.(*Config).ResourceAttrKeys + traceExporter.traceConsumer = traceExporter.consumeTracesByResource + traceExporter.resourceKeys = []string{"service.name"} + case "resource": + traceExporter.traceConsumer = traceExporter.consumeTracesByResource + traceExporter.resourceKeys = cfg.(*Config).ResourceKeys case "traceID", "": + traceExporter.traceConsumer = traceExporter.consumeTracesById default: return nil, fmt.Errorf("unsupported routing_key: %s", cfg.(*Config).RoutingKey) } @@ -80,60 +88,6 @@ func buildExporterConfig(cfg *Config, endpoint string) otlpexporter.Config { return oCfg } -func SplitTracesByResourceAttr(batches ptrace.Traces, attrKeys []string) (map[string][]ptrace.Traces, error) { - // This code is based on the ConsumeTraces function of the batchperresourceattr - // modified to support multiple routing keys + fallback on the traceId when routing attr does not exist - result := make(map[string][]ptrace.Traces) - - rss := batches.ResourceSpans() - lenRss := rss.Len() - - indicesByAttr := make(map[string]map[string][]int) - var fallbackIndices []int - var attrFound bool - for i := 0; i < lenRss; i++ { - rs := rss.At(i) - attrFound = false - for _, attrKey := range attrKeys { - var keyValue string - if _, ok := indicesByAttr[attrKey]; !ok { - indicesByAttr[attrKey] = make(map[string][]int) - } - if attributeValue, ok := rs.Resource().Attributes().Get(attrKey); ok { - keyValue = attributeValue.Str() - indicesByAttr[attrKey][keyValue] = append(indicesByAttr[attrKey][keyValue], i) - attrFound = true - break - } - } - if !attrFound { - // These will be processed further to be split per traceID - fallbackIndices = append(fallbackIndices, i) - } - } - - for j := 0; j < len(fallbackIndices); j++ { - t := ptrace.NewTraces() - rs := rss.At(fallbackIndices[j]) - rs.CopyTo(t.ResourceSpans().AppendEmpty()) - nt := batchpersignal.SplitTraces(t) - result["traceId"] = append(result["traceId"], nt...) - } - - for routeKey, routeKeyAttrs := range indicesByAttr { - for _, indices := range routeKeyAttrs { - tracesForAttr := ptrace.NewTraces() - for _, i := range indices { - rs := rss.At(i) - rs.CopyTo((tracesForAttr.ResourceSpans().AppendEmpty())) - } - result[routeKey] = append(result[routeKey], tracesForAttr) - } - } - - return result, nil -} - func (e *traceExporterImp) Capabilities() consumer.Capabilities { return consumer.Capabilities{MutatesData: false} } @@ -149,51 +103,45 @@ func (e *traceExporterImp) Shutdown(context.Context) error { } func (e *traceExporterImp) ConsumeTraces(ctx context.Context, td ptrace.Traces) error { - var err error + return e.traceConsumer(ctx, td) +} + +func (e *traceExporterImp) consumeTracesById(ctx context.Context, td ptrace.Traces) error { var errs error - var batches = make(map[string][]ptrace.Traces) - if e.routingKey == resourceAttrRouting { - batches, err = SplitTracesByResourceAttr(td, e.resourceAttrKeys) - if err != nil { - return err - } - } else { - batches["traceId"] = batchpersignal.SplitTraces(td) - } - rfs := make(map[string]routingFunction) - for key := range batches { - if key == "traceId" { - rfs[key] = func(x ptrace.Traces) (map[string][]int, error) { - return routeByTraceId(x) - } + batches := batchpersignal.SplitTraces(td) + + for _, t := range batches { + if tid, err := routeByTraceId(t); err == nil { + errs = multierr.Append(errs, e.consumeTrace(ctx, t, tid)) } else { - rfs[key] = func(x ptrace.Traces) (map[string][]int, error) { - return routeByResourceAttr(x, key) - } - } - } - for key, tb := range batches { - for _, t := range tb { - errs = multierr.Append(errs, e.consumeTrace(ctx, t, rfs[key])) + return err } } - return errs } -func (e *traceExporterImp) consumeTrace(ctx context.Context, td ptrace.Traces, rf routingFunction) error { - var exp component.Component - routingIds, err := rf(td) +func (e *traceExporterImp) consumeTracesByResource(ctx context.Context, td ptrace.Traces) error { + var errs error + routeBatches, err := splitTracesByResourceAttr(td, e.resourceKeys) if err != nil { return err } - var rid string - for key := range routingIds { - rid = key - break + for _, batch := range routeBatches { + switch batch.routingKey { + case resourceAttrRouting: + errs = multierr.Append(errs, e.consumeTrace(ctx, batch.trace, batch.keyValue)) + case traceIDRouting: + errs = multierr.Append(errs, e.consumeTracesById(ctx, batch.trace)) + } } + return errs + +} + +func (e *traceExporterImp) consumeTrace(ctx context.Context, td ptrace.Traces, rid string) error { + // Routes a single trace via a given routing ID endpoint := e.loadBalancer.Endpoint([]byte(rid)) - exp, err = e.loadBalancer.Exporter(endpoint) + exp, err := e.loadBalancer.Exporter(endpoint) if err != nil { return err } @@ -221,62 +169,79 @@ func (e *traceExporterImp) consumeTrace(ctx context.Context, td ptrace.Traces, r return err } -func validateNotEmpty(td ptrace.Traces) error { - rs := td.ResourceSpans() - if rs.Len() == 0 { - return errors.New("empty resource spans") +func getResourceAttrValue(rs ptrace.ResourceSpans, resourceKeys []string) (string, bool) { + for _, attrKey := range resourceKeys { + if attributeValue, ok := rs.Resource().Attributes().Get(attrKey); ok { + return attributeValue.Str(), true + } } - ils := rs.At(0).ScopeSpans() - if ils.Len() == 0 { - return errors.New("empty scope spans") + return "", false +} + +func splitTracesByResourceAttr(batches ptrace.Traces, resourceKeys []string) ([]routingEntry, error) { + // This function batches all the ResourceSpans with the same routing resource attribute value into a single ptrace.Trace + // This returns a list of routing entries which consists of the routing key, routing key value and the trace + // There should be a 1:1 mapping between key value <-> trace + // This is because we group all Resource Spans with the same key value under a single trace + result := []routingEntry{} + rss := batches.ResourceSpans() + + // This is a mapping between the resource attribute values found and the constructed trace + routeMap := make(map[string]ptrace.Traces) + + for i := 0; i < rss.Len(); i++ { + rs := rss.At(i) + if keyValue, ok := getResourceAttrValue(rs, resourceKeys); ok { + // Check if this keyValue has previously been seen + // if not it constructs an empty ptrace.Trace + if _, ok := routeMap[keyValue]; !ok { + routeMap[keyValue] = ptrace.NewTraces() + } + rs.CopyTo(routeMap[keyValue].ResourceSpans().AppendEmpty()) + } else { + // If none of the resource attributes have been found + // We fallback to routing the given Resource Span by Trace ID + t := ptrace.NewTraces() + rs.CopyTo(t.ResourceSpans().AppendEmpty()) + // We can't route this whole Resource Span by a single trace ID + // because it's possible for the spans under the RS to have different trace IDs + result = append(result, routingEntry{ + routingKey: traceIDRouting, + trace: t, + }) + } } - spans := ils.At(0).Spans() - if spans.Len() == 0 { - return errors.New("empty spans") + + // We convert the attr value:trace mapping into a list of routingEntries + for key, trace := range routeMap { + result = append(result, routingEntry{ + routingKey: resourceAttrRouting, + keyValue: key, + trace: trace, + }) } - return nil + + return result, nil } -func routeByTraceId(td ptrace.Traces) (map[string][]int, error) { - ids := make(map[string][]int) +func routeByTraceId(td ptrace.Traces) (string, error) { + // This function assumes that you are receiving a single trace i.e. single traceId + // returns the traceId as the routing key rs := td.ResourceSpans() if rs.Len() == 0 { - return nil, errors.New("empty resource spans") + return "", errors.New("empty resource spans") + } + if rs.Len() > 1 { + return "", errors.New("routeByTraceId must receive a ptrace.Traces with a single ResourceSpan") } ils := rs.At(0).ScopeSpans() if ils.Len() == 0 { - return nil, errors.New("empty scope spans") + return "", errors.New("empty scope spans") } spans := ils.At(0).Spans() if spans.Len() == 0 { - return nil, errors.New("empty spans") + return "", errors.New("empty spans") } tid := spans.At(0).TraceID() - ids[string(tid[:])] = []int{} - for i := 0; i < rs.Len(); i++ { - ids[string(tid[:])] = append(ids[string(tid[:])], i) - } - return ids, nil -} - -func routeByResourceAttr(td ptrace.Traces, routeKey string) (map[string][]int, error) { - ids := make(map[string][]int) - err := validateNotEmpty(td) - if err != nil { - return nil, err - } - - rs := td.ResourceSpans() - for i := 0; i < rs.Len(); i++ { - attr, ok := rs.At(i).Resource().Attributes().Get(routeKey) - if !ok { - // If resource attribute is not found, falls back to Trace ID routing - return routeByTraceId(td) - } - ids[attr.Str()] = append(ids[attr.Str()], i) - } - if len(ids) > 1 { - return nil, errors.New("received traces were not split by resource attr") - } - return ids, nil + return string(tid[:]), nil } diff --git a/exporter/loadbalancingexporter/trace_exporter_test.go b/exporter/loadbalancingexporter/trace_exporter_test.go index 2bbb34828313..031f9a93c2b0 100644 --- a/exporter/loadbalancingexporter/trace_exporter_test.go +++ b/exporter/loadbalancingexporter/trace_exporter_test.go @@ -139,8 +139,6 @@ func TestConsumeTraces(t *testing.T) { p, err := newTracesExporter(exportertest.NewNopCreateSettings(), simpleConfig(), zap.NewNop()) require.NotNil(t, p) require.NoError(t, err) - assert.Equal(t, p.routingKey, traceIDRouting) - // pre-load an exporter here, so that we don't use the actual OTLP exporter lb.addMissingExporters(context.Background(), []string{"endpoint-1"}) lb.res = &mockResolver{ @@ -175,7 +173,7 @@ func TestConsumeTracesServiceBased(t *testing.T) { p, err := newTracesExporter(exportertest.NewNopCreateSettings(), serviceBasedRoutingConfig(), zap.NewNop()) require.NotNil(t, p) require.NoError(t, err) - assert.Equal(t, p.routingKey, svcRouting) + assert.Equal(t, p.resourceKeys, []string{"service.name"}) // pre-load an exporter here, so that we don't use the actual OTLP exporter lb.addMissingExporters(context.Background(), []string{"endpoint-1"}) @@ -201,39 +199,156 @@ func TestConsumeTracesServiceBased(t *testing.T) { } func TestServiceBasedRoutingForSameTraceId(t *testing.T) { - b := pcommon.TraceID([16]byte{1, 2, 3, 4}) for _, tt := range []struct { desc string batch ptrace.Traces - rf routingFunction - res map[string][]int + res []routingEntry err error }{ { "same trace id and different services - service based routing", twoServicesWithSameTraceID(), - func(t ptrace.Traces) (map[string][]int, error) { - return routeByResourceAttr(t, "service.name") + []routingEntry{ + { + routingKey: resourceAttrRouting, + keyValue: "ad-service-1", + }, + { + routingKey: resourceAttrRouting, + keyValue: "get-recommendations-7", + }, }, nil, - errors.New("received traces were not split by resource attr"), }, + } { + t.Run(tt.desc, func(t *testing.T) { + res, err := splitTracesByResourceAttr(tt.batch, []string{"service.name"}) + for i, r := range res { + assert.Equal(t, tt.res[i].routingKey, r.routingKey) + assert.Equal(t, tt.res[i].keyValue, r.keyValue) + + var sn string + if v, ok := r.trace.ResourceSpans().At(0).Resource().Attributes().Get("service.name"); ok { + sn = v.Str() + } else { + sn = "" + } + assert.Equal(t, tt.res[i].keyValue, sn) + } + assert.Equal(t, tt.err, err) + }) + } +} + +func TestIdBasedRoutingForSameTraceId(t *testing.T) { + for _, tt := range []struct { + desc string + batch ptrace.Traces + res string + err error + }{ { "same trace id and different services - trace id routing", twoServicesWithSameTraceID(), - routeByTraceId, - map[string][]int{string(b[:]): {0, 1}}, - nil, + "", + errors.New("routeByTraceId must receive a ptrace.Traces with a single ResourceSpan"), }, } { t.Run(tt.desc, func(t *testing.T) { - res, err := tt.rf(tt.batch) + res, err := routeByTraceId(tt.batch) assert.Equal(t, tt.err, err) assert.Equal(t, tt.res, res) }) } } +func TestIdBasedRouting(t *testing.T) { + sink := new(consumertest.TracesSink) + componentFactory := func(ctx context.Context, endpoint string) (component.Component, error) { + return newMockTracesExporter(sink.ConsumeTraces), nil + } + lb, err := newLoadBalancer(exportertest.NewNopCreateSettings(), simpleConfig(), componentFactory) + require.NotNil(t, lb) + require.NoError(t, err) + + p, err := newTracesExporter(exportertest.NewNopCreateSettings(), simpleConfig(), zap.NewNop()) + require.NoError(t, err) + require.NotNil(t, p) + + // pre-load an exporter here, so that we don't use the actual OTLP exporter + lb.addMissingExporters(context.Background(), []string{"endpoint-1"}) + lb.addMissingExporters(context.Background(), []string{"endpoint-2"}) + lb.addMissingExporters(context.Background(), []string{"endpoint-3"}) + lb.res = &mockResolver{ + triggerCallbacks: true, + onResolve: func(ctx context.Context) ([]string, error) { + return []string{"endpoint-1", "endpoint-2", "endpoint-3"}, nil + }, + } + p.loadBalancer = lb + + err = p.Start(context.Background(), componenttest.NewNopHost()) + require.NoError(t, err) + defer func() { + require.NoError(t, p.Shutdown(context.Background())) + }() + + // test + trace := twoServicesWithSameTraceID() + appendSimpleTraceWithID(trace.ResourceSpans().At(0), [16]byte{1, 2, 3, 4}) + trace.ResourceSpans().At(0).ScopeSpans().At(0).Spans().AppendEmpty().SetTraceID([16]byte{1, 2, 3, 4}) + trace.ResourceSpans().At(0).ScopeSpans().At(0).Spans().AppendEmpty().SetTraceID([16]byte{2, 3, 4, 5}) + res := p.ConsumeTraces(context.Background(), trace) + + // Resulting Trace + // { + // ptrace.Traces: [ + // { + // ResourceSpans: [{ + // service.name: "ad-service-1" + // ScopeSpans: [{ + // Spans: [{ + // TraceID: "1234" + // }] + // Spans: [{ + // TraceID: "1234" + // }] + // Spans: [{ + // TraceID: "2345" + // }] + // }] + // ScopeSpans: [{ + // Spans: [{ + // TraceID: "1234" + // }] + // }] + // }] + // }, + // { + // ResourceSpans: [{ + // service.name: "get-recommendation-7" + // ScopeSpans: [{ + // Spans: [{ + // TraceID: "1234" + // }] + // }] + // }] + // } + // ] + // } + + // verify + assert.Nil(t, res) + + // This will be split into four because of the behavior of batchpersignal.SplitTraces + // The ad-service-1 trace is split into 3 + // - 1 trace containing the two "1234" spans + // - 1 trace containing the "2345" span + // - 1 trace containing the span in a different ILS (despite having the same trace ID as the two "1234" spans) + // - 1 trace containing the span in a different RS (despite having the same trace ID as the thre "1234" spans) + assert.Len(t, sink.AllTraces(), 4) +} + func TestAttrBasedRouting(t *testing.T) { sink := new(consumertest.TracesSink) componentFactory := func(ctx context.Context, endpoint string) (component.Component, error) { @@ -244,9 +359,9 @@ func TestAttrBasedRouting(t *testing.T) { require.NoError(t, err) p, err := newTracesExporter(exportertest.NewNopCreateSettings(), attrBasedRoutingConfig(), zap.NewNop()) - require.NotNil(t, p) require.NoError(t, err) - assert.Equal(t, p.routingKey, resourceAttrRouting) + require.NotNil(t, p) + assert.Equal(t, p.resourceKeys, []string{"service.name"}) // pre-load an exporter here, so that we don't use the actual OTLP exporter lb.addMissingExporters(context.Background(), []string{"endpoint-1"}) @@ -267,12 +382,17 @@ func TestAttrBasedRouting(t *testing.T) { }() // test - res := p.ConsumeTraces(context.Background(), twoServicesWithSameTraceID()) + trace := twoServicesWithSameTraceID() + rs := trace.ResourceSpans().AppendEmpty() + rs.Resource().Attributes().PutStr(conventions.AttributeServiceName, "ad-service-1") + appendSimpleTraceWithID(rs, [16]byte{1, 2, 3, 4}) + res := p.ConsumeTraces(context.Background(), trace) // verify assert.Nil(t, res) // Verify that the single Trace was split into two based on service name + // With the two `ad-service-1` RS being grouped into a single trace assert.Len(t, sink.AllTraces(), 2) } @@ -404,18 +524,16 @@ func TestBatchWithTwoTraces(t *testing.T) { assert.Len(t, sink.AllTraces(), 2) } -func TestNoTracesInBatch(t *testing.T) { +func TestNoTracesInBatchTraceIdRouting(t *testing.T) { for _, tt := range []struct { desc string batch ptrace.Traces - rf routingFunction err error }{ // Trace ID routing { "no resource spans", ptrace.NewTraces(), - routeByTraceId, errors.New("empty resource spans"), }, { @@ -425,9 +543,24 @@ func TestNoTracesInBatch(t *testing.T) { batch.ResourceSpans().AppendEmpty() return batch }(), - routeByTraceId, errors.New("empty scope spans"), }, + } { + t.Run(tt.desc, func(t *testing.T) { + res, err := routeByTraceId(tt.batch) + assert.Equal(t, err, tt.err) + assert.Equal(t, res, "") + }) + } +} + +func TestNoTracesInBatchResourceRouting(t *testing.T) { + for _, tt := range []struct { + desc string + batch ptrace.Traces + res []routingEntry + err error + }{ // Service / Resource Attribute routing { "no spans", @@ -436,16 +569,22 @@ func TestNoTracesInBatch(t *testing.T) { batch.ResourceSpans().AppendEmpty().ScopeSpans().AppendEmpty() return batch }(), - func(t ptrace.Traces) (map[string][]int, error) { - return routeByResourceAttr(t, "service.name") + []routingEntry{ + { + routingKey: traceIDRouting, + keyValue: "", + }, }, - errors.New("empty spans"), + nil, }, } { t.Run(tt.desc, func(t *testing.T) { - res, err := tt.rf(tt.batch) - assert.Equal(t, err, tt.err) - assert.Equal(t, res, map[string][]int(nil)) + res, err := splitTracesByResourceAttr(tt.batch, []string{"service.name"}) + assert.Equal(t, tt.err, err) + for i, _ := range res { + assert.Equal(t, tt.res[i].routingKey, res[i].routingKey) + assert.Equal(t, tt.res[i].keyValue, res[i].keyValue) + } }) } } @@ -655,8 +794,8 @@ func attrBasedRoutingConfig() *Config { Resolver: ResolverSettings{ Static: &StaticResolver{Hostnames: []string{"endpoint-1", "endpoint-2", "endpoint-3"}}, }, - RoutingKey: "resourceAttr", - ResourceAttrKeys: []string{"service.name"}, + RoutingKey: "resource", + ResourceKeys: []string{"service.name"}, } } From c09c59540e1f71c8b52e5ffd265cb7d80e6773b0 Mon Sep 17 00:00:00 2001 From: Zjan Turla Date: Thu, 16 Mar 2023 15:17:15 +1100 Subject: [PATCH 11/11] add style guide adherence changes + optimizations --- .../loadbalancingexporter/trace_exporter.go | 23 +++++++++++++------ 1 file changed, 16 insertions(+), 7 deletions(-) diff --git a/exporter/loadbalancingexporter/trace_exporter.go b/exporter/loadbalancingexporter/trace_exporter.go index 716407d0e380..8f4b65887232 100644 --- a/exporter/loadbalancingexporter/trace_exporter.go +++ b/exporter/loadbalancingexporter/trace_exporter.go @@ -28,6 +28,7 @@ import ( "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/exporter" "go.opentelemetry.io/collector/exporter/otlpexporter" + "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/ptrace" "go.uber.org/multierr" "go.uber.org/zap" @@ -170,12 +171,20 @@ func (e *traceExporterImp) consumeTrace(ctx context.Context, td ptrace.Traces, r } func getResourceAttrValue(rs ptrace.ResourceSpans, resourceKeys []string) (string, bool) { - for _, attrKey := range resourceKeys { - if attributeValue, ok := rs.Resource().Attributes().Get(attrKey); ok { - return attributeValue.Str(), true - } - } - return "", false + res := "" + found := false + rs.Resource().Attributes().Range( + func(k string, v pcommon.Value) bool { + for _, attrKey := range resourceKeys { + if k == attrKey { + res = v.Str() + found = true + return false + } + } + return true + }) + return res, found } func splitTracesByResourceAttr(batches ptrace.Traces, resourceKeys []string) ([]routingEntry, error) { @@ -183,7 +192,7 @@ func splitTracesByResourceAttr(batches ptrace.Traces, resourceKeys []string) ([] // This returns a list of routing entries which consists of the routing key, routing key value and the trace // There should be a 1:1 mapping between key value <-> trace // This is because we group all Resource Spans with the same key value under a single trace - result := []routingEntry{} + var result []routingEntry rss := batches.ResourceSpans() // This is a mapping between the resource attribute values found and the constructed trace