Skip to content

Commit

Permalink
Add support for load balancing via arbitrary resource attribute
Browse files Browse the repository at this point in the history
  • Loading branch information
zjanc committed Feb 16, 2023
1 parent eedb371 commit 708f791
Show file tree
Hide file tree
Showing 3 changed files with 133 additions and 47 deletions.
8 changes: 5 additions & 3 deletions exporter/loadbalancingexporter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,15 @@ type routingKey int
// Routing strategies understood by the load-balancing exporter. The value is
// chosen from the "routing_key" configuration option when the exporter is
// created.
const (
// traceIDRouting routes a batch by the trace ID of its first span. It is the
// first constant in the group (iota 0) and therefore the zero value of
// routingKey.
traceIDRouting routingKey = iota
// svcRouting routes a batch by the "service.name" resource attribute; it is
// selected with routing_key "service".
svcRouting
// resourceAttrRouting routes a batch by a resource attribute whose name is
// supplied through the "resource_attr_key" option; it is selected with
// routing_key "resourceAttr".
resourceAttrRouting
)

// Config defines configuration for the exporter.
type Config struct {
Protocol Protocol `mapstructure:"protocol"`
Resolver ResolverSettings `mapstructure:"resolver"`
RoutingKey string `mapstructure:"routing_key"`
Protocol Protocol `mapstructure:"protocol"`
Resolver ResolverSettings `mapstructure:"resolver"`
RoutingKey string `mapstructure:"routing_key"`
ResourceAttrKey string `mapstructure:"resource_attr_key"`
}

// Protocol holds the individual protocol-specific settings. Only OTLP is supported at the moment.
Expand Down
160 changes: 122 additions & 38 deletions exporter/loadbalancingexporter/trace_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"sync"
"time"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/batchpersignal"
"go.opencensus.io/stats"
"go.opencensus.io/tag"
"go.opentelemetry.io/collector/component"
Expand All @@ -29,15 +30,14 @@ import (
"go.opentelemetry.io/collector/exporter/otlpexporter"
"go.opentelemetry.io/collector/pdata/ptrace"
"go.uber.org/multierr"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/batchpersignal"
)

var _ exporter.Traces = (*traceExporterImp)(nil)

type traceExporterImp struct {
loadBalancer loadBalancer
routingKey routingKey
loadBalancer loadBalancer
routingKey routingKey
resourceAttrKey string

stopped bool
shutdownWg sync.WaitGroup
Expand All @@ -60,6 +60,9 @@ func newTracesExporter(params exporter.CreateSettings, cfg component.Config) (*t
switch cfg.(*Config).RoutingKey {
case "service":
traceExporter.routingKey = svcRouting
case "resourceAttr":
traceExporter.routingKey = resourceAttrRouting
traceExporter.resourceAttrKey = cfg.(*Config).ResourceAttrKey
case "traceID", "":
default:
return nil, fmt.Errorf("unsupported routing_key: %s", cfg.(*Config).RoutingKey)
Expand All @@ -73,6 +76,57 @@ func buildExporterConfig(cfg *Config, endpoint string) otlpexporter.Config {
return oCfg
}

// SplitTracesByResourceAttr splits batch into smaller ptrace.Traces values so
// that each returned element can be routed on its own.
//
// Every ScopeSpans of every ResourceSpans is processed independently:
//
//   - when the resource carries the attribute attrKey, all spans of that
//     ScopeSpans are placed into a single output element;
//   - when the attribute is missing, the spans of that ScopeSpans are grouped
//     by trace ID, producing one output element per distinct trace ID.
//
// Each output element contains exactly one ResourceSpans holding one
// ScopeSpans. The resource, the scope and both schema URLs are copied from
// the input; the input batch itself is only read. Output elements appear in
// the order in which their first span is encountered.
//
// A nil slice is returned when batch contains no spans.
func SplitTracesByResourceAttr(batch ptrace.Traces, attrKey string) []ptrace.Traces {
	var result []ptrace.Traces

	resourceSpans := batch.ResourceSpans()
	for i := 0; i < resourceSpans.Len(); i++ {
		rs := resourceSpans.At(i)

		// The attribute lives on the resource, so it is identical for every
		// scope and span underneath it: resolve it once per resource instead
		// of once per scope.
		var attrValue string
		attr, hasAttr := rs.Resource().Attributes().Get(attrKey)
		if hasAttr {
			attrValue = attr.Str()
		}

		scopeSpans := rs.ScopeSpans()
		for j := 0; j < scopeSpans.Len(); j++ {
			ils := scopeSpans.At(j)

			// Output ScopeSpans created for this input ScopeSpans, indexed by
			// grouping key. The map is scoped to a single input ScopeSpans, so
			// spans from different scopes never share an output element.
			groups := map[string]ptrace.ScopeSpans{}

			spans := ils.Spans()
			for k := 0; k < spans.Len(); k++ {
				span := spans.At(k)

				// Fall back to the trace ID when the resource has no such
				// attribute.
				groupKey := attrValue
				if !hasAttr {
					groupKey = span.TraceID().String()
				}

				target, found := groups[groupKey]
				if !found {
					// First span for this key: build a fresh Traces carrying a
					// copy of the resource and scope, and register it in the
					// result list.
					trace := ptrace.NewTraces()

					newRS := trace.ResourceSpans().AppendEmpty()
					rs.Resource().CopyTo(newRS.Resource())
					newRS.SetSchemaUrl(rs.SchemaUrl())

					target = newRS.ScopeSpans().AppendEmpty()
					ils.Scope().CopyTo(target.Scope())
					target.SetSchemaUrl(ils.SchemaUrl())

					groups[groupKey] = target
					result = append(result, trace)
				}

				span.CopyTo(target.Spans().AppendEmpty())
			}
		}
	}

	return result
}

// Capabilities returns the consumer capabilities of the trace exporter. The
// returned value has MutatesData set to false.
func (e *traceExporterImp) Capabilities() consumer.Capabilities {
	caps := consumer.Capabilities{}
	caps.MutatesData = false
	return caps
}
Expand All @@ -89,7 +143,12 @@ func (e *traceExporterImp) Shutdown(context.Context) error {

func (e *traceExporterImp) ConsumeTraces(ctx context.Context, td ptrace.Traces) error {
var errs error
batches := batchpersignal.SplitTraces(td)
var batches []ptrace.Traces
if e.routingKey == resourceAttrRouting {
batches = SplitTracesByResourceAttr(td, e.resourceAttrKey)
} else {
batches = batchpersignal.SplitTraces(td)
}
for _, batch := range batches {
errs = multierr.Append(errs, e.consumeTrace(ctx, batch))
}
Expand All @@ -99,43 +158,54 @@ func (e *traceExporterImp) ConsumeTraces(ctx context.Context, td ptrace.Traces)

func (e *traceExporterImp) consumeTrace(ctx context.Context, td ptrace.Traces) error {
var exp component.Component
routingIds, err := routingIdentifiersFromTraces(td, e.routingKey)
var attrKey string
if e.routingKey == svcRouting || e.routingKey == traceIDRouting {
attrKey = ""
} else {
attrKey = e.resourceAttrKey
}
routingIds, err := routingIdentifiersFromTraces(td, e.routingKey, attrKey)
if err != nil {
return err
}
var rid string
for key := range routingIds {
rid = key
break
}
// for rid := range routingIds {
endpoint := e.loadBalancer.Endpoint([]byte(rid))
exp, err = e.loadBalancer.Exporter(endpoint)
if err != nil {
return err
}
for rid := range routingIds {
endpoint := e.loadBalancer.Endpoint([]byte(rid))
exp, err = e.loadBalancer.Exporter(endpoint)
if err != nil {
return err
}

te, ok := exp.(exporter.Traces)
if !ok {
return fmt.Errorf("unable to export traces, unexpected exporter type: expected exporter.Traces but got %T", exp)
}

start := time.Now()
err = te.ConsumeTraces(ctx, td)
duration := time.Since(start)
te, ok := exp.(exporter.Traces)
if !ok {
return fmt.Errorf("unable to export traces, unexpected exporter type: expected exporter.Traces but got %T", exp)
}

if err == nil {
_ = stats.RecordWithTags(
ctx,
[]tag.Mutator{tag.Upsert(endpointTagKey, endpoint), successTrueMutator},
mBackendLatency.M(duration.Milliseconds()))
} else {
_ = stats.RecordWithTags(
ctx,
[]tag.Mutator{tag.Upsert(endpointTagKey, endpoint), successFalseMutator},
mBackendLatency.M(duration.Milliseconds()))
}
start := time.Now()
err = te.ConsumeTraces(ctx, td)
duration := time.Since(start)

if err == nil {
_ = stats.RecordWithTags(
ctx,
[]tag.Mutator{tag.Upsert(endpointTagKey, endpoint), successTrueMutator},
mBackendLatency.M(duration.Milliseconds()))
} else {
_ = stats.RecordWithTags(
ctx,
[]tag.Mutator{tag.Upsert(endpointTagKey, endpoint), successFalseMutator},
mBackendLatency.M(duration.Milliseconds()))
}
// }
return err
}

func routingIdentifiersFromTraces(td ptrace.Traces, key routingKey) (map[string]bool, error) {
ids := make(map[string]bool)
func routingIdentifiersFromTraces(td ptrace.Traces, routing routingKey, routeKey string) (map[string][]int, error) {
ids := make(map[string][]int)
rs := td.ResourceSpans()
if rs.Len() == 0 {
return nil, errors.New("empty resource spans")
Expand All @@ -151,17 +221,31 @@ func routingIdentifiersFromTraces(td ptrace.Traces, key routingKey) (map[string]
return nil, errors.New("empty spans")
}

if key == svcRouting {
if routing == svcRouting || routing == resourceAttrRouting {
var attrKey string
if routing == svcRouting {
attrKey = "service.name"
} else {
attrKey = routeKey
}
for i := 0; i < rs.Len(); i++ {
svc, ok := rs.At(i).Resource().Attributes().Get("service.name")
attr, ok := rs.At(i).Resource().Attributes().Get(attrKey)
if !ok {
return nil, errors.New("unable to get service name")
return nil, errors.New("unable to get attribute")
}
_, exists := ids[attr.Str()]
if exists {
ids[attr.Str()] = []int{i}
} else {
ids[attr.Str()] = append(ids[attr.Str()], i)
}
ids[svc.Str()] = true
}
return ids, nil
}
tid := spans.At(0).TraceID()
ids[string(tid[:])] = true
ids[string(tid[:])] = []int{}
for i := 0; i < rs.Len(); i++ {
ids[string(tid[:])] = append(ids[string(tid[:])], i)
}
return ids, nil
}
12 changes: 6 additions & 6 deletions exporter/loadbalancingexporter/trace_exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,23 +206,23 @@ func TestServiceBasedRoutingForSameTraceId(t *testing.T) {
desc string
batch ptrace.Traces
routingKey routingKey
res map[string]bool
res map[string][]int
}{
{
"same trace id and different services - service based routing",
twoServicesWithSameTraceID(),
svcRouting,
map[string]bool{"ad-service-1": true, "get-recommendations-7": true},
map[string][]int{"ad-service-1": {0}, "get-recommendations-7": {1}},
},
{
"same trace id and different services - trace id routing",
twoServicesWithSameTraceID(),
traceIDRouting,
map[string]bool{string(b[:]): true},
map[string][]int{string(b[:]): {0, 1}},
},
} {
t.Run(tt.desc, func(t *testing.T) {
res, err := routingIdentifiersFromTraces(tt.batch, tt.routingKey)
res, err := routingIdentifiersFromTraces(tt.batch, tt.routingKey, "")
assert.Equal(t, err, nil)
assert.Equal(t, res, tt.res)
})
Expand Down Expand Up @@ -392,9 +392,9 @@ func TestNoTracesInBatch(t *testing.T) {
},
} {
t.Run(tt.desc, func(t *testing.T) {
res, err := routingIdentifiersFromTraces(tt.batch, tt.routingKey)
res, err := routingIdentifiersFromTraces(tt.batch, tt.routingKey, "")
assert.Equal(t, err, tt.err)
assert.Equal(t, res, map[string]bool(nil))
assert.Equal(t, res, map[string][]int(nil))
})
}
}
Expand Down

0 comments on commit 708f791

Please sign in to comment.