Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[OTE-1506] Loadbalancer exporter: Add resource_keys routing for the traces #35970

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions exporter/loadbalancingexporter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,15 @@ const (
svcRouting
metricNameRouting
resourceRouting
resourceKeysRouting
)

// Config defines configuration for the exporter.
type Config struct {
Protocol Protocol `mapstructure:"protocol"`
Resolver ResolverSettings `mapstructure:"resolver"`
RoutingKey string `mapstructure:"routing_key"`
Protocol Protocol `mapstructure:"protocol"`
Resolver ResolverSettings `mapstructure:"resolver"`
RoutingKey string `mapstructure:"routing_key"`
ResourceKeys []string `mapstructure:"resource_keys"`
}

// Protocol holds the individual protocol-specific settings. Only OTLP is supported at the moment.
Expand Down
42 changes: 29 additions & 13 deletions exporter/loadbalancingexporter/trace_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,9 @@ var _ exporter.Traces = (*traceExporterImp)(nil)
type exporterTraces map[*wrappedExporter]ptrace.Traces

type traceExporterImp struct {
loadBalancer *loadBalancer
routingKey routingKey
loadBalancer *loadBalancer
routingKey routingKey
routingResourceKeys []string

stopped bool
shutdownWg sync.WaitGroup
Expand All @@ -38,8 +39,9 @@ type traceExporterImp struct {
func newTracesExporter(params exporter.CreateSettings, cfg component.Config) (*traceExporterImp, error) {
exporterFactory := otlpexporter.NewFactory()

eCfg := cfg.(*Config)
lb, err := newLoadBalancer(params, cfg, func(ctx context.Context, endpoint string) (component.Component, error) {
oCfg := buildExporterConfig(cfg.(*Config), endpoint)
oCfg := buildExporterConfig(eCfg, endpoint)
return exporterFactory.CreateTracesExporter(ctx, params, &oCfg)
})
if err != nil {
Expand All @@ -48,9 +50,13 @@ func newTracesExporter(params exporter.CreateSettings, cfg component.Config) (*t

traceExporter := traceExporterImp{loadBalancer: lb, routingKey: traceIDRouting}

switch cfg.(*Config).RoutingKey {
switch eCfg.RoutingKey {
case "service":
traceExporter.routingKey = svcRouting
traceExporter.routingKey = resourceKeysRouting
traceExporter.routingResourceKeys = []string{"service.name"}
case "resource":
traceExporter.routingKey = resourceKeysRouting
traceExporter.routingResourceKeys = eCfg.ResourceKeys
case "traceID", "":
default:
return nil, fmt.Errorf("unsupported routing_key: %s", cfg.(*Config).RoutingKey)
Expand Down Expand Up @@ -85,7 +91,7 @@ func (e *traceExporterImp) ConsumeTraces(ctx context.Context, td ptrace.Traces)
exporterSegregatedTraces := make(exporterTraces)
endpoints := make(map[*wrappedExporter]string)
for _, batch := range batches {
routingID, err := routingIdentifiersFromTraces(batch, e.routingKey)
routingID, err := e.routingIdentifiersFromTraces(batch)
if err != nil {
return err
}
Expand Down Expand Up @@ -132,7 +138,7 @@ func (e *traceExporterImp) ConsumeTraces(ctx context.Context, td ptrace.Traces)
return errs
}

func routingIdentifiersFromTraces(td ptrace.Traces, key routingKey) (map[string]bool, error) {
func (e *traceExporterImp) routingIdentifiersFromTraces(td ptrace.Traces) (map[string]bool, error) {
ids := make(map[string]bool)
rs := td.ResourceSpans()
if rs.Len() == 0 {
Expand All @@ -149,15 +155,25 @@ func routingIdentifiersFromTraces(td ptrace.Traces, key routingKey) (map[string]
return nil, errors.New("empty spans")
}

if key == svcRouting {
if e.routingKey == resourceKeysRouting {
var missingResourceKey bool
for i := 0; i < rs.Len(); i++ {
svc, ok := rs.At(i).Resource().Attributes().Get("service.name")
if !ok {
return nil, errors.New("unable to get service name")
var resourceKeyFound bool
rsi := rs.At(i)
for _, attrKey := range e.routingResourceKeys {
if v, ok := rsi.Resource().Attributes().Get(attrKey); ok {
ids[v.AsString()] = true
resourceKeyFound = true
break
}
}
if !resourceKeyFound {
missingResourceKey = true
}
ids[svc.Str()] = true
}
return ids, nil
if !missingResourceKey {
return ids, nil
}
}
tid := spans.At(0).TraceID()
ids[string(tid[:])] = true
Expand Down
189 changes: 169 additions & 20 deletions exporter/loadbalancingexporter/trace_exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,27 +35,52 @@ import (

func TestNewTracesExporter(t *testing.T) {
for _, tt := range []struct {
desc string
config *Config
err error
desc string
config *Config
wantRoutingKey routingKey
wantRoutingResourceKeys []string
err error
}{
{
"simple",
simpleConfig(),
traceIDRouting,
nil,
nil,
},
{
"service",
serviceBasedRoutingConfig(),
resourceKeysRouting,
[]string{conventions.AttributeServiceName},
nil,
},
{
"resource_keys",
resourceKeysBasedRoutingConfig(),
resourceKeysRouting,
[]string{"resource.key_1", "resource.key_2"},
nil,
},
{
"empty",
&Config{},
0,
nil,
errNoResolver,
},
} {
t.Run(tt.desc, func(t *testing.T) {
// test
_, err := newTracesExporter(exportertest.NewNopCreateSettings(), tt.config)
te, err := newTracesExporter(exportertest.NewNopCreateSettings(), tt.config)

// verify
require.Equal(t, tt.err, err)
if err != nil {
return
}
require.Equal(t, tt.wantRoutingKey, te.routingKey)
require.Equal(t, tt.wantRoutingResourceKeys, te.routingResourceKeys)
})
}
}
Expand Down Expand Up @@ -219,7 +244,8 @@ func TestConsumeTracesServiceBased(t *testing.T) {
p, err := newTracesExporter(exportertest.NewNopCreateSettings(), serviceBasedRoutingConfig())
require.NotNil(t, p)
require.NoError(t, err)
assert.Equal(t, p.routingKey, svcRouting)
assert.Equal(t, p.routingKey, resourceKeysRouting)
assert.Equal(t, p.routingResourceKeys, []string{"service.name"})

// pre-load an exporter here, so that we don't use the actual OTLP exporter
lb.addMissingExporters(context.Background(), []string{"endpoint-1"})
Expand All @@ -245,29 +271,113 @@ func TestConsumeTracesServiceBased(t *testing.T) {
assert.Nil(t, res)
}

func TestConsumeTracesResourceKeysBased(t *testing.T) {
componentFactory := func(_ context.Context, _ string) (component.Component, error) {
return newNopMockTracesExporter(), nil
}
lb, err := newLoadBalancer(exportertest.NewNopCreateSettings(), resourceKeysBasedRoutingConfig(), componentFactory)
require.NotNil(t, lb)
require.NoError(t, err)

p, err := newTracesExporter(exportertest.NewNopCreateSettings(), resourceKeysBasedRoutingConfig())
require.NotNil(t, p)
require.NoError(t, err)
assert.Equal(t, p.routingKey, resourceKeysRouting)
assert.Equal(t, p.routingResourceKeys, []string{"resource.key_1", "resource.key_2"})

// pre-load an exporter here, so that we don't use the actual OTLP exporter
lb.addMissingExporters(context.Background(), []string{"endpoint-1"})
lb.addMissingExporters(context.Background(), []string{"endpoint-2"})
lb.res = &mockResolver{
triggerCallbacks: true,
onResolve: func(_ context.Context) ([]string, error) {
return []string{"endpoint-1", "endpoint-2"}, nil
},
}
p.loadBalancer = lb

err = p.Start(context.Background(), componenttest.NewNopHost())
require.NoError(t, err)
defer func() {
require.NoError(t, p.Shutdown(context.Background()))
}()

// test
res := p.ConsumeTraces(context.Background(), simpleTracesWithResourceKeys())

// verify
assert.Nil(t, res)
}

func TestServiceBasedRoutingForSameTraceId(t *testing.T) {
b := pcommon.TraceID([16]byte{1, 2, 3, 4})
for _, tt := range []struct {
desc string
batch ptrace.Traces
routingKey routingKey
res map[string]bool
te *traceExporterImp
desc string
batch ptrace.Traces
res map[string]bool
}{
{
&traceExporterImp{
routingKey: resourceKeysRouting,
routingResourceKeys: []string{"service.name"},
},
"same trace id and different services - service based routing",
twoServicesWithSameTraceID(),
svcRouting,
map[string]bool{"ad-service-1": true, "get-recommendations-7": true},
},
{
&traceExporterImp{
routingKey: traceIDRouting,
},
"same trace id and different services - trace id routing",
twoServicesWithSameTraceID(),
traceIDRouting,
map[string]bool{string(b[:]): true},
},
} {
t.Run(tt.desc, func(t *testing.T) {
res, err := routingIdentifiersFromTraces(tt.batch, tt.routingKey)
res, err := tt.te.routingIdentifiersFromTraces(tt.batch)
assert.Equal(t, err, nil)
assert.Equal(t, res, tt.res)
})
}
}

func TestResourceKeysBasedRoutingIdentifiers(t *testing.T) {
b := pcommon.TraceID([16]byte{1, 2, 3, 4})
for _, tt := range []struct {
te *traceExporterImp
desc string
batch ptrace.Traces
res map[string]bool
}{
{
&traceExporterImp{
routingKey: resourceKeysRouting,
routingResourceKeys: []string{"resource.key_1", "resource.key_2"},
},
"two resource_keys values",
simpleTracesWithResourceKeys(),
map[string]bool{
"val-1": true,
"val-2": true,
},
},
{
&traceExporterImp{
routingKey: resourceKeysRouting,
routingResourceKeys: []string{"resource.key_1"},
},
"single resource_keys value with trace ID as default",
simpleTracesWithResourceKeys(),
map[string]bool{
"val-1": true,
string(b[:]): true,
},
},
} {
t.Run(tt.desc, func(t *testing.T) {
res, err := tt.te.routingIdentifiersFromTraces(tt.batch)
assert.Equal(t, err, nil)
assert.Equal(t, res, tt.res)
})
Expand Down Expand Up @@ -405,40 +515,46 @@ func TestBatchWithTwoTraces(t *testing.T) {

func TestNoTracesInBatch(t *testing.T) {
for _, tt := range []struct {
desc string
batch ptrace.Traces
routingKey routingKey
err error
te *traceExporterImp
desc string
batch ptrace.Traces
err error
}{
{
&traceExporterImp{
routingKey: svcRouting,
},
"no resource spans",
ptrace.NewTraces(),
traceIDRouting,
errors.New("empty resource spans"),
},
{
&traceExporterImp{
routingKey: traceIDRouting,
},
"no instrumentation library spans",
func() ptrace.Traces {
batch := ptrace.NewTraces()
batch.ResourceSpans().AppendEmpty()
return batch
}(),
traceIDRouting,
errors.New("empty scope spans"),
},
{
&traceExporterImp{
routingKey: svcRouting,
},
"no spans",
func() ptrace.Traces {
batch := ptrace.NewTraces()
batch.ResourceSpans().AppendEmpty().ScopeSpans().AppendEmpty()
return batch
}(),
svcRouting,
errors.New("empty spans"),
},
} {
t.Run(tt.desc, func(t *testing.T) {
res, err := routingIdentifiersFromTraces(tt.batch, tt.routingKey)
res, err := tt.te.routingIdentifiersFromTraces(tt.batch)
assert.Equal(t, err, tt.err)
assert.Equal(t, res, map[string]bool(nil))
})
Expand Down Expand Up @@ -684,6 +800,29 @@ func simpleTraces() ptrace.Traces {
return traces
}

func simpleTracesWithResourceKeys() ptrace.Traces {
traces := ptrace.NewTraces()
traces.ResourceSpans().EnsureCapacity(1)

rSpans := traces.ResourceSpans().AppendEmpty()
rAttrs := rSpans.Resource().Attributes()
rAttrs.PutStr("resource.key_1", "val-1")
rSpans.ScopeSpans().AppendEmpty().Spans().AppendEmpty().SetTraceID([16]byte{1, 2, 3, 4})

rSpans = traces.ResourceSpans().AppendEmpty()
rAttrs = rSpans.Resource().Attributes()
rAttrs.PutStr("resource.key_2", "val-2")
rSpans.ScopeSpans().AppendEmpty().Spans().AppendEmpty().SetTraceID([16]byte{1, 2, 3, 4})

rSpans = traces.ResourceSpans().AppendEmpty()
rAttrs = rSpans.Resource().Attributes()
rAttrs.PutStr("resource.key_1", "val-1")
rAttrs.PutStr("resource.key_2", "val-2")
rSpans.ScopeSpans().AppendEmpty().Spans().AppendEmpty().SetTraceID([16]byte{1, 2, 3, 4})

return traces
}

func simpleTracesWithServiceName() ptrace.Traces {
traces := ptrace.NewTraces()
traces.ResourceSpans().EnsureCapacity(1)
Expand Down Expand Up @@ -736,6 +875,16 @@ func serviceBasedRoutingConfig() *Config {
}
}

func resourceKeysBasedRoutingConfig() *Config {
return &Config{
Resolver: ResolverSettings{
Static: &StaticResolver{Hostnames: []string{"endpoint-1", "endpoint-2"}},
},
RoutingKey: "resource_keys",
ResourceKeys: []string{"resource.key_1", "resource.key_2"},
}
}

type mockTracesExporter struct {
component.Component
ConsumeTracesFn func(ctx context.Context, td ptrace.Traces) error
Expand Down