diff --git a/exporter/loadbalancingexporter/config.go b/exporter/loadbalancingexporter/config.go index 00bd6693f8e9..b6f08227daf8 100644 --- a/exporter/loadbalancingexporter/config.go +++ b/exporter/loadbalancingexporter/config.go @@ -30,10 +30,10 @@ const ( // Config defines configuration for the exporter. type Config struct { - Protocol Protocol `mapstructure:"protocol"` - Resolver ResolverSettings `mapstructure:"resolver"` - RoutingKey string `mapstructure:"routing_key"` - ResourceAttrKey string `mapstructure:"resource_attr_key"` + Protocol Protocol `mapstructure:"protocol"` + Resolver ResolverSettings `mapstructure:"resolver"` + RoutingKey string `mapstructure:"routing_key"` + ResourceAttrKeys []string `mapstructure:"resource_attr_key"` } // Protocol holds the individual protocol-specific settings. Only OTLP is supported at the moment. diff --git a/exporter/loadbalancingexporter/trace_exporter.go b/exporter/loadbalancingexporter/trace_exporter.go index 349b05d7b766..24683fb155be 100644 --- a/exporter/loadbalancingexporter/trace_exporter.go +++ b/exporter/loadbalancingexporter/trace_exporter.go @@ -35,14 +35,16 @@ import ( var _ exporter.Traces = (*traceExporterImp)(nil) type traceExporterImp struct { - loadBalancer loadBalancer - routingKey routingKey - resourceAttrKey string + loadBalancer loadBalancer + routingKey routingKey + resourceAttrKeys []string stopped bool shutdownWg sync.WaitGroup } +type routingFunction func(ptrace.Traces) (map[string][]int, error) + // Create new traces exporter func newTracesExporter(params exporter.CreateSettings, cfg component.Config) (*traceExporterImp, error) { exporterFactory := otlpexporter.NewFactory() @@ -62,7 +64,7 @@ func newTracesExporter(params exporter.CreateSettings, cfg component.Config) (*t traceExporter.routingKey = svcRouting case "resourceAttr": traceExporter.routingKey = resourceAttrRouting - traceExporter.resourceAttrKey = cfg.(*Config).ResourceAttrKey + traceExporter.resourceAttrKeys = cfg.(*Config).ResourceAttrKeys case "traceID", "": default: return nil, fmt.Errorf("unsupported routing_key: %s", cfg.(*Config).RoutingKey) @@ -76,51 +78,59 @@ func buildExporterConfig(cfg *Config, endpoint string) otlpexporter.Config { return oCfg } -func SplitTracesByResourceAttr(batch ptrace.Traces, attrKey string) []ptrace.Traces { - var result []ptrace.Traces - var strKey string - - for i := 0; i < batch.ResourceSpans().Len(); i++ { - rs := batch.ResourceSpans().At(i) - for j := 0; j < rs.ScopeSpans().Len(); j++ { - // the batches for this ILS - batches := map[string]ptrace.ResourceSpans{} - - key, ok := rs.Resource().Attributes().Get(attrKey) - - ils := rs.ScopeSpans().At(j) - for k := 0; k < ils.Spans().Len(); k++ { - span := ils.Spans().At(k) - - if !ok { - strKey = span.TraceID().String() - } else { - strKey = key.Str() - } - - // for the first traceID in the ILS, initialize the map entry - // and add the singleTraceBatch to the result list - if _, ok := batches[strKey]; !ok { - trace := ptrace.NewTraces() - newRS := trace.ResourceSpans().AppendEmpty() - // currently, the ResourceSpans implementation has only a Resource and an ILS. We'll copy the Resource - // and set our own ILS - rs.Resource().CopyTo(newRS.Resource()) - newRS.SetSchemaUrl(rs.SchemaUrl()) - newILS := newRS.ScopeSpans().AppendEmpty() - // currently, the ILS implementation has only an InstrumentationLibrary and spans. We'll copy the library - // and set our own spans - ils.Scope().CopyTo(newILS.Scope()) - newILS.SetSchemaUrl(ils.SchemaUrl()) - batches[strKey] = newRS - - result = append(result, trace) - } - - // there is only one instrumentation library per batch - tgt := batches[strKey].ScopeSpans().At(0).Spans().AppendEmpty() - span.CopyTo(tgt) +func SplitTracesByResourceAttr(batches ptrace.Traces, attrKeys []string) map[string][]ptrace.Traces { + // This code is based on the ConsumeTraces function of the batchperresourceattr + // modified to support multiple routing keys + fallback on the traceId when routing attr does not exist + result := make(map[string][]ptrace.Traces) + + rss := batches.ResourceSpans() + lenRss := rss.Len() + + if lenRss <= 1 { + return map[string][]ptrace.Traces{attrKeys[0]: {batches}} + } + + indicesByAttr := make(map[string]map[string][]int) + var fallbackIndices []int + var attrFound bool + for i := 0; i < lenRss; i++ { + rs := rss.At(i) + attrFound = false + for _, attrKey := range attrKeys { + var keyValue string + if attributeValue, ok := rs.Resource().Attributes().Get(attrKey); ok { + keyValue = attributeValue.Str() + indicesByAttr[attrKey][keyValue] = append(indicesByAttr[attrKey][keyValue], i) + attrFound = true + break + } + } + if !attrFound { + // These will be processed further to be split per traceID + fallbackIndices = append(fallbackIndices, i) + } + } + + if len(indicesByAttr) <= 1 && len(fallbackIndices) == 0 { + return map[string][]ptrace.Traces{"traceId": {batches}} + } + + for j := 0; j < len(fallbackIndices); j++ { + t := ptrace.NewTraces() + rs := rss.At(j) + rs.CopyTo(t.ResourceSpans().AppendEmpty()) + nt := batchpersignal.SplitTraces(t) + result["traceId"] = append(result["traceId"], nt...) + } + + for routeKey, routeKeyAttrs := range indicesByAttr { + for _, indices := range routeKeyAttrs { + tracesForAttr := ptrace.NewTraces() + for _, i := range indices { + rs := rss.At(i) + rs.CopyTo((tracesForAttr.ResourceSpans().AppendEmpty())) } + result[routeKey] = append(result[routeKey], tracesForAttr) } } @@ -143,37 +153,44 @@ func (e *traceExporterImp) Shutdown(context.Context) error { func (e *traceExporterImp) ConsumeTraces(ctx context.Context, td ptrace.Traces) error { var errs error - var batches []ptrace.Traces + var batches = make(map[string][]ptrace.Traces) if e.routingKey == resourceAttrRouting { - batches = SplitTracesByResourceAttr(td, e.resourceAttrKey) + batches = SplitTracesByResourceAttr(td, e.resourceAttrKeys) } else { - batches = batchpersignal.SplitTraces(td) + batches["traceId"] = batchpersignal.SplitTraces(td) } - for _, batch := range batches { - errs = multierr.Append(errs, e.consumeTrace(ctx, batch)) + rfs := make(map[string]routingFunction) + for key, _ := range batches { + if key == "traceId" { + rfs[key] = func(x ptrace.Traces) (map[string][]int, error) { + return routeByTraceId(x) + } + } else { + rfs[key] = func(x ptrace.Traces) (map[string][]int, error) { + return routeByResourceAttr(x, key) + } + } + } + for key, tb := range batches { + for _, t := range tb { + errs = multierr.Append(errs, e.consumeTrace(ctx, t, rfs[key])) + } } return errs } -func (e *traceExporterImp) consumeTrace(ctx context.Context, td ptrace.Traces) error { +func (e *traceExporterImp) consumeTrace(ctx context.Context, td ptrace.Traces, rf routingFunction) error { var exp component.Component - var attrKey string - if e.routingKey == svcRouting || e.routingKey == traceIDRouting { - attrKey = "" - } else { - attrKey = e.resourceAttrKey - } - routingIds, err := routingIdentifiersFromTraces(td, e.routingKey, attrKey) + routingIds, err := rf(td) if err != nil { return err } var rid string - for key := range routingIds { + for key, _ := range routingIds { rid = key break } - // for rid := range routingIds { endpoint := e.loadBalancer.Endpoint([]byte(rid)) exp, err = e.loadBalancer.Exporter(endpoint) if err != nil { @@ -200,48 +217,39 @@ func (e *traceExporterImp) consumeTrace(ctx context.Context, td ptrace.Traces) e []tag.Mutator{tag.Upsert(endpointTagKey, endpoint), successFalseMutator}, mBackendLatency.M(duration.Milliseconds())) } - // } return err } -func routingIdentifiersFromTraces(td ptrace.Traces, routing routingKey, routeKey string) (map[string][]int, error) { +func validateNotEmpty(td ptrace.Traces) error { + rs := td.ResourceSpans() + if rs.Len() == 0 { + return errors.New("empty resource spans") + } + ils := rs.At(0).ScopeSpans() + if ils.Len() == 0 { + return errors.New("empty scope spans") + } + spans := ils.At(0).Spans() + if spans.Len() == 0 { + return errors.New("empty spans") + } + return nil +} + +func routeByTraceId(td ptrace.Traces) (map[string][]int, error) { ids := make(map[string][]int) rs := td.ResourceSpans() if rs.Len() == 0 { return nil, errors.New("empty resource spans") } - ils := rs.At(0).ScopeSpans() if ils.Len() == 0 { return nil, errors.New("empty scope spans") } - spans := ils.At(0).Spans() if spans.Len() == 0 { return nil, errors.New("empty spans") } - - if routing == svcRouting || routing == resourceAttrRouting { - var attrKey string - if routing == svcRouting { - attrKey = "service.name" - } else { - attrKey = routeKey - } - for i := 0; i < rs.Len(); i++ { - attr, ok := rs.At(i).Resource().Attributes().Get(attrKey) - if !ok { - return nil, errors.New("unable to get attribute") - } - _, exists := ids[attr.Str()] - if exists { - ids[attr.Str()] = []int{i} - } else { - ids[attr.Str()] = append(ids[attr.Str()], i) - } - } - return ids, nil - } tid := spans.At(0).TraceID() ids[string(tid[:])] = []int{} for i := 0; i < rs.Len(); i++ { @@ -249,3 +257,22 @@ func routingIdentifiersFromTraces(td ptrace.Traces, routing routingKey, routeKey } return ids, nil } + +func routeByResourceAttr(td ptrace.Traces, routeKey string) (map[string][]int, error) { + ids := make(map[string][]int) + err := validateNotEmpty(td) + if err != nil { + return nil, err + } + + rs := td.ResourceSpans() + for i := 0; i < rs.Len(); i++ { + attr, ok := rs.At(i).Resource().Attributes().Get(routeKey) + if !ok { + // If resource attribute is not found, falls back to Trace ID routing + return routeByTraceId(td) + } + ids[attr.Str()] = append(ids[attr.Str()], i) + } + return ids, nil +}