Skip to content

Commit

Permalink
add support for routing by multiple attrs and falling back on trace id
Browse files Browse the repository at this point in the history
  • Loading branch information
zjanc committed Feb 17, 2023
1 parent a8447f8 commit c3a8230
Show file tree
Hide file tree
Showing 2 changed files with 120 additions and 93 deletions.
8 changes: 4 additions & 4 deletions exporter/loadbalancingexporter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,10 @@ const (

// Config defines configuration for the exporter.
type Config struct {
Protocol Protocol `mapstructure:"protocol"`
Resolver ResolverSettings `mapstructure:"resolver"`
RoutingKey string `mapstructure:"routing_key"`
ResourceAttrKey string `mapstructure:"resource_attr_key"`
Protocol Protocol `mapstructure:"protocol"`
Resolver ResolverSettings `mapstructure:"resolver"`
RoutingKey string `mapstructure:"routing_key"`
ResourceAttrKeys []string `mapstructure:"resource_attr_key"`
}

// Protocol holds the individual protocol-specific settings. Only OTLP is supported at the moment.
Expand Down
205 changes: 116 additions & 89 deletions exporter/loadbalancingexporter/trace_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,16 @@ import (
var _ exporter.Traces = (*traceExporterImp)(nil)

type traceExporterImp struct {
loadBalancer loadBalancer
routingKey routingKey
resourceAttrKey string
loadBalancer loadBalancer
routingKey routingKey
resourceAttrKeys []string

stopped bool
shutdownWg sync.WaitGroup
}

type routingFunction func(ptrace.Traces) (map[string][]int, error)

// Create new traces exporter
func newTracesExporter(params exporter.CreateSettings, cfg component.Config) (*traceExporterImp, error) {
exporterFactory := otlpexporter.NewFactory()
Expand All @@ -62,7 +64,7 @@ func newTracesExporter(params exporter.CreateSettings, cfg component.Config) (*t
traceExporter.routingKey = svcRouting
case "resourceAttr":
traceExporter.routingKey = resourceAttrRouting
traceExporter.resourceAttrKey = cfg.(*Config).ResourceAttrKey
traceExporter.resourceAttrKeys = cfg.(*Config).ResourceAttrKeys
case "traceID", "":
default:
return nil, fmt.Errorf("unsupported routing_key: %s", cfg.(*Config).RoutingKey)
Expand All @@ -76,51 +78,59 @@ func buildExporterConfig(cfg *Config, endpoint string) otlpexporter.Config {
return oCfg
}

func SplitTracesByResourceAttr(batch ptrace.Traces, attrKey string) []ptrace.Traces {
var result []ptrace.Traces
var strKey string

for i := 0; i < batch.ResourceSpans().Len(); i++ {
rs := batch.ResourceSpans().At(i)
for j := 0; j < rs.ScopeSpans().Len(); j++ {
// the batches for this ILS
batches := map[string]ptrace.ResourceSpans{}

key, ok := rs.Resource().Attributes().Get(attrKey)

ils := rs.ScopeSpans().At(j)
for k := 0; k < ils.Spans().Len(); k++ {
span := ils.Spans().At(k)

if !ok {
strKey = span.TraceID().String()
} else {
strKey = key.Str()
}

// for the first traceID in the ILS, initialize the map entry
// and add the singleTraceBatch to the result list
if _, ok := batches[strKey]; !ok {
trace := ptrace.NewTraces()
newRS := trace.ResourceSpans().AppendEmpty()
// currently, the ResourceSpans implementation has only a Resource and an ILS. We'll copy the Resource
// and set our own ILS
rs.Resource().CopyTo(newRS.Resource())
newRS.SetSchemaUrl(rs.SchemaUrl())
newILS := newRS.ScopeSpans().AppendEmpty()
// currently, the ILS implementation has only an InstrumentationLibrary and spans. We'll copy the library
// and set our own spans
ils.Scope().CopyTo(newILS.Scope())
newILS.SetSchemaUrl(ils.SchemaUrl())
batches[strKey] = newRS

result = append(result, trace)
}

// there is only one instrumentation library per batch
tgt := batches[strKey].ScopeSpans().At(0).Spans().AppendEmpty()
span.CopyTo(tgt)
func SplitTracesByResourceAttr(batches ptrace.Traces, attrKeys []string) map[string][]ptrace.Traces {
// This code is based on the ConsumeTraces function of the batchperresourceattr
// modified to support multiple routing keys + fallback on the traceId when routing attr does not exist
result := make(map[string][]ptrace.Traces)

rss := batches.ResourceSpans()
lenRss := rss.Len()

if lenRss <= 1 {
return map[string][]ptrace.Traces{attrKeys[0]: {batches}}
}

indicesByAttr := make(map[string]map[string][]int)
var fallbackIndices []int
var attrFound bool
for i := 0; i < lenRss; i++ {
rs := rss.At(i)
attrFound = false
for _, attrKey := range attrKeys {
var keyValue string
if attributeValue, ok := rs.Resource().Attributes().Get(attrKey); ok {
keyValue = attributeValue.Str()
indicesByAttr[attrKey][keyValue] = append(indicesByAttr[attrKey][keyValue], i)
attrFound = true
break
}
}
if !attrFound {
// These will be processed further to be split per traceID
fallbackIndices = append(fallbackIndices, i)
}
}

if len(indicesByAttr) <= 1 && len(fallbackIndices) == 0 {
return map[string][]ptrace.Traces{"traceId": {batches}}
}

for j := 0; j < len(fallbackIndices); j++ {
t := ptrace.NewTraces()
rs := rss.At(j)
rs.CopyTo(t.ResourceSpans().AppendEmpty())
nt := batchpersignal.SplitTraces(t)
result["traceId"] = append(result["traceId"], nt...)
}

for routeKey, routeKeyAttrs := range indicesByAttr {
for _, indices := range routeKeyAttrs {
tracesForAttr := ptrace.NewTraces()
for _, i := range indices {
rs := rss.At(i)
rs.CopyTo((tracesForAttr.ResourceSpans().AppendEmpty()))
}
result[routeKey] = append(result[routeKey], tracesForAttr)
}
}

Expand All @@ -143,37 +153,44 @@ func (e *traceExporterImp) Shutdown(context.Context) error {

func (e *traceExporterImp) ConsumeTraces(ctx context.Context, td ptrace.Traces) error {
var errs error
var batches []ptrace.Traces
var batches = make(map[string][]ptrace.Traces)
if e.routingKey == resourceAttrRouting {
batches = SplitTracesByResourceAttr(td, e.resourceAttrKey)
batches = SplitTracesByResourceAttr(td, e.resourceAttrKeys)
} else {
batches = batchpersignal.SplitTraces(td)
batches["traceId"] = batchpersignal.SplitTraces(td)
}
for _, batch := range batches {
errs = multierr.Append(errs, e.consumeTrace(ctx, batch))
rfs := make(map[string]routingFunction)
for key, _ := range batches {
if key == "traceId" {
rfs[key] = func(x ptrace.Traces) (map[string][]int, error) {
return routeByTraceId(x)
}
} else {
rfs[key] = func(x ptrace.Traces) (map[string][]int, error) {
return routeByResourceAttr(x, key)
}
}
}
for key, tb := range batches {
for _, t := range tb {
errs = multierr.Append(errs, e.consumeTrace(ctx, t, rfs[key]))
}
}

return errs
}

func (e *traceExporterImp) consumeTrace(ctx context.Context, td ptrace.Traces) error {
func (e *traceExporterImp) consumeTrace(ctx context.Context, td ptrace.Traces, rf routingFunction) error {
var exp component.Component
var attrKey string
if e.routingKey == svcRouting || e.routingKey == traceIDRouting {
attrKey = ""
} else {
attrKey = e.resourceAttrKey
}
routingIds, err := routingIdentifiersFromTraces(td, e.routingKey, attrKey)
routingIds, err := rf(td)
if err != nil {
return err
}
var rid string
for key := range routingIds {
for key, _ := range routingIds {
rid = key
break
}
// for rid := range routingIds {
endpoint := e.loadBalancer.Endpoint([]byte(rid))
exp, err = e.loadBalancer.Exporter(endpoint)
if err != nil {
Expand All @@ -200,52 +217,62 @@ func (e *traceExporterImp) consumeTrace(ctx context.Context, td ptrace.Traces) e
[]tag.Mutator{tag.Upsert(endpointTagKey, endpoint), successFalseMutator},
mBackendLatency.M(duration.Milliseconds()))
}
// }
return err
}

func routingIdentifiersFromTraces(td ptrace.Traces, routing routingKey, routeKey string) (map[string][]int, error) {
func validateNotEmpty(td ptrace.Traces) error {
rs := td.ResourceSpans()
if rs.Len() == 0 {
return errors.New("empty resource spans")
}
ils := rs.At(0).ScopeSpans()
if ils.Len() == 0 {
return errors.New("empty scope spans")
}
spans := ils.At(0).Spans()
if spans.Len() == 0 {
return errors.New("empty spans")
}
return nil
}

func routeByTraceId(td ptrace.Traces) (map[string][]int, error) {
ids := make(map[string][]int)
rs := td.ResourceSpans()
if rs.Len() == 0 {
return nil, errors.New("empty resource spans")
}

ils := rs.At(0).ScopeSpans()
if ils.Len() == 0 {
return nil, errors.New("empty scope spans")
}

spans := ils.At(0).Spans()
if spans.Len() == 0 {
return nil, errors.New("empty spans")
}

if routing == svcRouting || routing == resourceAttrRouting {
var attrKey string
if routing == svcRouting {
attrKey = "service.name"
} else {
attrKey = routeKey
}
for i := 0; i < rs.Len(); i++ {
attr, ok := rs.At(i).Resource().Attributes().Get(attrKey)
if !ok {
return nil, errors.New("unable to get attribute")
}
_, exists := ids[attr.Str()]
if exists {
ids[attr.Str()] = []int{i}
} else {
ids[attr.Str()] = append(ids[attr.Str()], i)
}
}
return ids, nil
}
tid := spans.At(0).TraceID()
ids[string(tid[:])] = []int{}
for i := 0; i < rs.Len(); i++ {
ids[string(tid[:])] = append(ids[string(tid[:])], i)
}
return ids, nil
}

func routeByResourceAttr(td ptrace.Traces, routeKey string) (map[string][]int, error) {
ids := make(map[string][]int)
err := validateNotEmpty(td)
if err != nil {
return nil, err
}

rs := td.ResourceSpans()
for i := 0; i < rs.Len(); i++ {
attr, ok := rs.At(i).Resource().Attributes().Get(routeKey)
if !ok {
// If resource attribute is not found, falls back to Trace ID routing
return routeByTraceId(td)
}
ids[attr.Str()] = append(ids[attr.Str()], i)
}
return ids, nil
}

0 comments on commit c3a8230

Please sign in to comment.