Skip to content

Commit

Permalink
use batchperresourceattr
Browse files Browse the repository at this point in the history
  • Loading branch information
zjanc committed Feb 17, 2023
1 parent 708f791 commit d8e68e2
Show file tree
Hide file tree
Showing 6 changed files with 148 additions and 124 deletions.
2 changes: 1 addition & 1 deletion exporter/loadbalancingexporter/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func createDefaultConfig() component.Config {
}

func createTracesExporter(_ context.Context, params exporter.CreateSettings, cfg component.Config) (exporter.Traces, error) {
return newTracesExporter(params, cfg)
return newTracesExporter(params, cfg, newLoadBalancer)
}

func createLogsExporter(_ context.Context, params exporter.CreateSettings, cfg component.Config) (exporter.Logs, error) {
Expand Down
1 change: 1 addition & 0 deletions exporter/loadbalancingexporter/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ require (
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/mostynb/go-grpc-compression v1.1.17 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/batchperresourceattr v0.71.0
github.com/pelletier/go-toml v1.9.4 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c // indirect
Expand Down
2 changes: 2 additions & 0 deletions exporter/loadbalancingexporter/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion exporter/loadbalancingexporter/loadbalancer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ func TestStartFailureStaticResolver(t *testing.T) {
func TestLoadBalancerShutdown(t *testing.T) {
// prepare
cfg := simpleConfig()
p, err := newTracesExporter(exportertest.NewNopCreateSettings(), cfg)
p, err := newTracesExporter(exportertest.NewNopCreateSettings(), cfg, newLoadBalancer)
require.NotNil(t, p)
require.NoError(t, err)

Expand Down
124 changes: 41 additions & 83 deletions exporter/loadbalancingexporter/trace_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"sync"
"time"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/batchperresourceattr"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/batchpersignal"
"go.opencensus.io/stats"
"go.opencensus.io/tag"
Expand All @@ -32,7 +33,11 @@ import (
"go.uber.org/multierr"
)

var _ exporter.Traces = (*traceExporterImp)(nil)
type baseTracesExporter struct {
component.Component
consumer.Traces
routingKey routingKey
}

type traceExporterImp struct {
loadBalancer loadBalancer
Expand All @@ -43,11 +48,13 @@ type traceExporterImp struct {
shutdownWg sync.WaitGroup
}

type LoadBalancerGenerator func(params exporter.CreateSettings, cfg component.Config, factory componentFactory) (*loadBalancerImp, error)

// Create new traces exporter
func newTracesExporter(params exporter.CreateSettings, cfg component.Config) (*traceExporterImp, error) {
func newTracesExporter(params exporter.CreateSettings, cfg component.Config, lbf LoadBalancerGenerator) (exporter.Traces, error) {
exporterFactory := otlpexporter.NewFactory()

lb, err := newLoadBalancer(params, cfg, func(ctx context.Context, endpoint string) (component.Component, error) {
lb, err := lbf(params, cfg, func(ctx context.Context, endpoint string) (component.Component, error) {
oCfg := buildExporterConfig(cfg.(*Config), endpoint)
return exporterFactory.CreateTracesExporter(ctx, params, &oCfg)
})
Expand All @@ -59,10 +66,21 @@ func newTracesExporter(params exporter.CreateSettings, cfg component.Config) (*t

switch cfg.(*Config).RoutingKey {
case "service":
traceExporter.routingKey = svcRouting
traceExporter.routingKey = resourceAttrRouting
traceExporter.resourceAttrKey = "service.name"
return &baseTracesExporter{
Component: &traceExporter,
Traces: batchperresourceattr.NewBatchPerResourceTraces(traceExporter.resourceAttrKey, &traceExporter),
routingKey: svcRouting,
}, nil
case "resourceAttr":
traceExporter.routingKey = resourceAttrRouting
traceExporter.resourceAttrKey = cfg.(*Config).ResourceAttrKey
return &baseTracesExporter{
Component: &traceExporter,
Traces: batchperresourceattr.NewBatchPerResourceTraces(traceExporter.resourceAttrKey, &traceExporter),
routingKey: resourceAttrRouting,
}, nil
case "traceID", "":
default:
return nil, fmt.Errorf("unsupported routing_key: %s", cfg.(*Config).RoutingKey)
Expand All @@ -76,57 +94,6 @@ func buildExporterConfig(cfg *Config, endpoint string) otlpexporter.Config {
return oCfg
}

func SplitTracesByResourceAttr(batch ptrace.Traces, attrKey string) []ptrace.Traces {
var result []ptrace.Traces
var strKey string

for i := 0; i < batch.ResourceSpans().Len(); i++ {
rs := batch.ResourceSpans().At(i)
for j := 0; j < rs.ScopeSpans().Len(); j++ {
// the batches for this ILS
batches := map[string]ptrace.ResourceSpans{}

key, ok := rs.Resource().Attributes().Get(attrKey)

ils := rs.ScopeSpans().At(j)
for k := 0; k < ils.Spans().Len(); k++ {
span := ils.Spans().At(k)

if !ok {
strKey = span.TraceID().String()
} else {
strKey = key.Str()
}

// for the first traceID in the ILS, initialize the map entry
// and add the singleTraceBatch to the result list
if _, ok := batches[strKey]; !ok {
trace := ptrace.NewTraces()
newRS := trace.ResourceSpans().AppendEmpty()
// currently, the ResourceSpans implementation has only a Resource and an ILS. We'll copy the Resource
// and set our own ILS
rs.Resource().CopyTo(newRS.Resource())
newRS.SetSchemaUrl(rs.SchemaUrl())
newILS := newRS.ScopeSpans().AppendEmpty()
// currently, the ILS implementation has only an InstrumentationLibrary and spans. We'll copy the library
// and set our own spans
ils.Scope().CopyTo(newILS.Scope())
newILS.SetSchemaUrl(ils.SchemaUrl())
batches[strKey] = newRS

result = append(result, trace)
}

// there is only one instrumentation library per batch
tgt := batches[strKey].ScopeSpans().At(0).Spans().AppendEmpty()
span.CopyTo(tgt)
}
}
}

return result
}

func (e *traceExporterImp) Capabilities() consumer.Capabilities {
return consumer.Capabilities{MutatesData: false}
}
Expand All @@ -143,12 +110,7 @@ func (e *traceExporterImp) Shutdown(context.Context) error {

func (e *traceExporterImp) ConsumeTraces(ctx context.Context, td ptrace.Traces) error {
var errs error
var batches []ptrace.Traces
if e.routingKey == resourceAttrRouting {
batches = SplitTracesByResourceAttr(td, e.resourceAttrKey)
} else {
batches = batchpersignal.SplitTraces(td)
}
batches := batchpersignal.SplitTraces(td)
for _, batch := range batches {
errs = multierr.Append(errs, e.consumeTrace(ctx, batch))
}
Expand All @@ -159,10 +121,10 @@ func (e *traceExporterImp) ConsumeTraces(ctx context.Context, td ptrace.Traces)
func (e *traceExporterImp) consumeTrace(ctx context.Context, td ptrace.Traces) error {
var exp component.Component
var attrKey string
if e.routingKey == svcRouting || e.routingKey == traceIDRouting {
attrKey = ""
} else {
if e.routingKey == svcRouting || e.routingKey == resourceAttrRouting {
attrKey = e.resourceAttrKey
} else {
attrKey = ""
}
routingIds, err := routingIdentifiersFromTraces(td, e.routingKey, attrKey)
if err != nil {
Expand All @@ -173,7 +135,6 @@ func (e *traceExporterImp) consumeTrace(ctx context.Context, td ptrace.Traces) e
rid = key
break
}
// for rid := range routingIds {
endpoint := e.loadBalancer.Endpoint([]byte(rid))
exp, err = e.loadBalancer.Exporter(endpoint)
if err != nil {
Expand All @@ -200,12 +161,12 @@ func (e *traceExporterImp) consumeTrace(ctx context.Context, td ptrace.Traces) e
[]tag.Mutator{tag.Upsert(endpointTagKey, endpoint), successFalseMutator},
mBackendLatency.M(duration.Milliseconds()))
}
// }
return err
}

// This function should receive a Trace with a single unique value for the routingKey
func routingIdentifiersFromTraces(td ptrace.Traces, routing routingKey, routeKey string) (map[string][]int, error) {
ids := make(map[string][]int)
keys := make(map[string][]int)
rs := td.ResourceSpans()
if rs.Len() == 0 {
return nil, errors.New("empty resource spans")
Expand All @@ -222,30 +183,27 @@ func routingIdentifiersFromTraces(td ptrace.Traces, routing routingKey, routeKey
}

if routing == svcRouting || routing == resourceAttrRouting {
var attrKey string
if routing == svcRouting {
attrKey = "service.name"
} else {
attrKey = routeKey
}
for i := 0; i < rs.Len(); i++ {
attr, ok := rs.At(i).Resource().Attributes().Get(attrKey)
attr, ok := rs.At(i).Resource().Attributes().Get(routeKey)
if !ok {
return nil, errors.New("unable to get attribute")
return nil, fmt.Errorf("unable to get routing attribute: %s, %d", routeKey, routing)
}
_, exists := ids[attr.Str()]
_, exists := keys[attr.Str()]
if exists {
ids[attr.Str()] = []int{i}
keys[attr.Str()] = []int{i}
} else {
ids[attr.Str()] = append(ids[attr.Str()], i)
keys[attr.Str()] = append(keys[attr.Str()], i)
}
}
return ids, nil
if len(keys) != 1 {
return nil, errors.New("batch of traces include multiple values of the routing attribute")
}
return keys, nil
}
tid := spans.At(0).TraceID()
ids[string(tid[:])] = []int{}
tid := spans.At(0).TraceID().String()
keys[tid] = []int{}
for i := 0; i < rs.Len(); i++ {
ids[string(tid[:])] = append(ids[string(tid[:])], i)
keys[tid] = append(keys[tid], i)
}
return ids, nil
return keys, nil
}
Loading

0 comments on commit d8e68e2

Please sign in to comment.