From 708f791053e78429bb3135f0863224692275cef2 Mon Sep 17 00:00:00 2001 From: Zjan Turla Date: Wed, 15 Feb 2023 04:10:33 +1100 Subject: [PATCH] Add support for load balancing via arbitrary resource attribute --- exporter/loadbalancingexporter/config.go | 8 +- .../loadbalancingexporter/trace_exporter.go | 160 +++++++++++++----- .../trace_exporter_test.go | 12 +- 3 files changed, 133 insertions(+), 47 deletions(-) diff --git a/exporter/loadbalancingexporter/config.go b/exporter/loadbalancingexporter/config.go index e3914023e0d2..00bd6693f8e9 100644 --- a/exporter/loadbalancingexporter/config.go +++ b/exporter/loadbalancingexporter/config.go @@ -25,13 +25,15 @@ type routingKey int const ( traceIDRouting routingKey = iota svcRouting + resourceAttrRouting ) // Config defines configuration for the exporter. type Config struct { - Protocol Protocol `mapstructure:"protocol"` - Resolver ResolverSettings `mapstructure:"resolver"` - RoutingKey string `mapstructure:"routing_key"` + Protocol Protocol `mapstructure:"protocol"` + Resolver ResolverSettings `mapstructure:"resolver"` + RoutingKey string `mapstructure:"routing_key"` + ResourceAttrKey string `mapstructure:"resource_attr_key"` } // Protocol holds the individual protocol-specific settings. Only OTLP is supported at the moment. diff --git a/exporter/loadbalancingexporter/trace_exporter.go b/exporter/loadbalancingexporter/trace_exporter.go index 327bb6ca8f18..349b05d7b766 100644 --- a/exporter/loadbalancingexporter/trace_exporter.go +++ b/exporter/loadbalancingexporter/trace_exporter.go @@ -21,6 +21,7 @@ import ( "sync" "time" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/batchpersignal" "go.opencensus.io/stats" "go.opencensus.io/tag" "go.opentelemetry.io/collector/component" @@ -29,15 +30,14 @@ import ( "go.opentelemetry.io/collector/exporter/otlpexporter" "go.opentelemetry.io/collector/pdata/ptrace" "go.uber.org/multierr" - - "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/batchpersignal" ) var _ exporter.Traces = (*traceExporterImp)(nil) type traceExporterImp struct { - loadBalancer loadBalancer - routingKey routingKey + loadBalancer loadBalancer + routingKey routingKey + resourceAttrKey string stopped bool shutdownWg sync.WaitGroup @@ -60,6 +60,9 @@ func newTracesExporter(params exporter.CreateSettings, cfg component.Config) (*t switch cfg.(*Config).RoutingKey { case "service": traceExporter.routingKey = svcRouting + case "resourceAttr": + traceExporter.routingKey = resourceAttrRouting + traceExporter.resourceAttrKey = cfg.(*Config).ResourceAttrKey case "traceID", "": default: return nil, fmt.Errorf("unsupported routing_key: %s", cfg.(*Config).RoutingKey) @@ -73,6 +76,57 @@ func buildExporterConfig(cfg *Config, endpoint string) otlpexporter.Config { return oCfg } +func SplitTracesByResourceAttr(batch ptrace.Traces, attrKey string) []ptrace.Traces { + var result []ptrace.Traces + var strKey string + + for i := 0; i < batch.ResourceSpans().Len(); i++ { + rs := batch.ResourceSpans().At(i) + for j := 0; j < rs.ScopeSpans().Len(); j++ { + // the batches for this ILS + batches := map[string]ptrace.ResourceSpans{} + + key, ok := rs.Resource().Attributes().Get(attrKey) + + ils := rs.ScopeSpans().At(j) + for k := 0; k < ils.Spans().Len(); k++ { + span := ils.Spans().At(k) + + if !ok { + strKey = span.TraceID().String() + } else { + strKey = key.Str() + } + + // for the first traceID in the ILS, initialize the map entry + // and add the singleTraceBatch to the result list + if _, ok := batches[strKey]; !ok { + trace := ptrace.NewTraces() + newRS := trace.ResourceSpans().AppendEmpty() + // currently, the ResourceSpans implementation has only a Resource and an ILS. We'll copy the Resource + // and set our own ILS + rs.Resource().CopyTo(newRS.Resource()) + newRS.SetSchemaUrl(rs.SchemaUrl()) + newILS := newRS.ScopeSpans().AppendEmpty() + // currently, the ILS implementation has only an InstrumentationLibrary and spans. We'll copy the library + // and set our own spans + ils.Scope().CopyTo(newILS.Scope()) + newILS.SetSchemaUrl(ils.SchemaUrl()) + batches[strKey] = newRS + + result = append(result, trace) + } + + // there is only one instrumentation library per batch + tgt := batches[strKey].ScopeSpans().At(0).Spans().AppendEmpty() + span.CopyTo(tgt) + } + } + } + + return result +} + func (e *traceExporterImp) Capabilities() consumer.Capabilities { return consumer.Capabilities{MutatesData: false} } @@ -89,7 +143,12 @@ func (e *traceExporterImp) Shutdown(context.Context) error { func (e *traceExporterImp) ConsumeTraces(ctx context.Context, td ptrace.Traces) error { var errs error - batches := batchpersignal.SplitTraces(td) + var batches []ptrace.Traces + if e.routingKey == resourceAttrRouting { + batches = SplitTracesByResourceAttr(td, e.resourceAttrKey) + } else { + batches = batchpersignal.SplitTraces(td) + } for _, batch := range batches { errs = multierr.Append(errs, e.consumeTrace(ctx, batch)) } @@ -99,43 +158,54 @@ func (e *traceExporterImp) ConsumeTraces(ctx context.Context, td ptrace.Traces) func (e *traceExporterImp) consumeTrace(ctx context.Context, td ptrace.Traces) error { var exp component.Component - routingIds, err := routingIdentifiersFromTraces(td, e.routingKey) + var attrKey string + if e.routingKey == svcRouting || e.routingKey == traceIDRouting { + attrKey = "" + } else { + attrKey = e.resourceAttrKey + } + routingIds, err := routingIdentifiersFromTraces(td, e.routingKey, attrKey) + if err != nil { + return err + } + var rid string + for key := range routingIds { + rid = key + break + } + // for rid := range routingIds { + endpoint := e.loadBalancer.Endpoint([]byte(rid)) + exp, err = e.loadBalancer.Exporter(endpoint) if err != nil { return err } - for rid := range routingIds { - endpoint := e.loadBalancer.Endpoint([]byte(rid)) - exp, err = e.loadBalancer.Exporter(endpoint) - if err != nil { - return err - } - - te, ok := exp.(exporter.Traces) - if !ok { - return fmt.Errorf("unable to export traces, unexpected exporter type: expected exporter.Traces but got %T", exp) - } - start := time.Now() - err = te.ConsumeTraces(ctx, td) - duration := time.Since(start) + te, ok := exp.(exporter.Traces) + if !ok { + return fmt.Errorf("unable to export traces, unexpected exporter type: expected exporter.Traces but got %T", exp) + } - if err == nil { - _ = stats.RecordWithTags( - ctx, - []tag.Mutator{tag.Upsert(endpointTagKey, endpoint), successTrueMutator}, - mBackendLatency.M(duration.Milliseconds())) - } else { - _ = stats.RecordWithTags( - ctx, - []tag.Mutator{tag.Upsert(endpointTagKey, endpoint), successFalseMutator}, - mBackendLatency.M(duration.Milliseconds())) - } + start := time.Now() + err = te.ConsumeTraces(ctx, td) + duration := time.Since(start) + + if err == nil { + _ = stats.RecordWithTags( + ctx, + []tag.Mutator{tag.Upsert(endpointTagKey, endpoint), successTrueMutator}, + mBackendLatency.M(duration.Milliseconds())) + } else { + _ = stats.RecordWithTags( + ctx, + []tag.Mutator{tag.Upsert(endpointTagKey, endpoint), successFalseMutator}, + mBackendLatency.M(duration.Milliseconds())) } + // } return err } -func routingIdentifiersFromTraces(td ptrace.Traces, key routingKey) (map[string]bool, error) { - ids := make(map[string]bool) +func routingIdentifiersFromTraces(td ptrace.Traces, routing routingKey, routeKey string) (map[string][]int, error) { + ids := make(map[string][]int) rs := td.ResourceSpans() if rs.Len() == 0 { return nil, errors.New("empty resource spans") @@ -151,17 +221,31 @@ func routingIdentifiersFromTraces(td ptrace.Traces, key routingKey) (map[string] return nil, errors.New("empty spans") } - if key == svcRouting { + if routing == svcRouting || routing == resourceAttrRouting { + var attrKey string + if routing == svcRouting { + attrKey = "service.name" + } else { + attrKey = routeKey + } for i := 0; i < rs.Len(); i++ { - svc, ok := rs.At(i).Resource().Attributes().Get("service.name") + attr, ok := rs.At(i).Resource().Attributes().Get(attrKey) if !ok { - return nil, errors.New("unable to get service name") + return nil, errors.New("unable to get attribute") + } + _, exists := ids[attr.Str()] + if exists { + ids[attr.Str()] = []int{i} + } else { + ids[attr.Str()] = append(ids[attr.Str()], i) } - ids[svc.Str()] = true } return ids, nil } tid := spans.At(0).TraceID() - ids[string(tid[:])] = true + ids[string(tid[:])] = []int{} + for i := 0; i < rs.Len(); i++ { + ids[string(tid[:])] = append(ids[string(tid[:])], i) + } return ids, nil } diff --git a/exporter/loadbalancingexporter/trace_exporter_test.go b/exporter/loadbalancingexporter/trace_exporter_test.go index d386b6ec0e2c..4b467be07a8e 100644 --- a/exporter/loadbalancingexporter/trace_exporter_test.go +++ b/exporter/loadbalancingexporter/trace_exporter_test.go @@ -206,23 +206,23 @@ func TestServiceBasedRoutingForSameTraceId(t *testing.T) { desc string batch ptrace.Traces routingKey routingKey - res map[string]bool + res map[string][]int }{ { "same trace id and different services - service based routing", twoServicesWithSameTraceID(), svcRouting, - map[string]bool{"ad-service-1": true, "get-recommendations-7": true}, + map[string][]int{"ad-service-1": {0}, "get-recommendations-7": {1}}, }, { "same trace id and different services - trace id routing", twoServicesWithSameTraceID(), traceIDRouting, - map[string]bool{string(b[:]): true}, + map[string][]int{string(b[:]): {0, 1}}, }, } { t.Run(tt.desc, func(t *testing.T) { - res, err := routingIdentifiersFromTraces(tt.batch, tt.routingKey) + res, err := routingIdentifiersFromTraces(tt.batch, tt.routingKey, "") assert.Equal(t, err, nil) assert.Equal(t, res, tt.res) }) @@ -392,9 +392,9 @@ func TestNoTracesInBatch(t *testing.T) { }, } { t.Run(tt.desc, func(t *testing.T) { - res, err := routingIdentifiersFromTraces(tt.batch, tt.routingKey) + res, err := routingIdentifiersFromTraces(tt.batch, tt.routingKey, "") assert.Equal(t, err, tt.err) - assert.Equal(t, res, map[string]bool(nil)) + assert.Equal(t, res, map[string][]int(nil)) }) } }