Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Add support for loadbalancing with any resource attribute #18769

8 changes: 5 additions & 3 deletions exporter/loadbalancingexporter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,15 @@ type routingKey int
const (
traceIDRouting routingKey = iota
svcRouting
resourceAttrRouting
)

// Config defines configuration for the exporter.
type Config struct {
Protocol Protocol `mapstructure:"protocol"`
Resolver ResolverSettings `mapstructure:"resolver"`
RoutingKey string `mapstructure:"routing_key"`
Protocol Protocol `mapstructure:"protocol"`
Resolver ResolverSettings `mapstructure:"resolver"`
RoutingKey string `mapstructure:"routing_key"`
ResourceKeys []string `mapstructure:"resource_keys"`
}

// Protocol holds the individual protocol-specific settings. Only OTLP is supported at the moment.
Expand Down
3 changes: 2 additions & 1 deletion exporter/loadbalancingexporter/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/exporter/otlpexporter"
"go.uber.org/zap"
)

const (
Expand Down Expand Up @@ -54,7 +55,7 @@ func createDefaultConfig() component.Config {
}

func createTracesExporter(_ context.Context, params exporter.CreateSettings, cfg component.Config) (exporter.Traces, error) {
return newTracesExporter(params, cfg)
return newTracesExporter(params, cfg, zap.NewNop())
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can actually get a logger from params.

}

func createLogsExporter(_ context.Context, params exporter.CreateSettings, cfg component.Config) (exporter.Logs, error) {
Expand Down
3 changes: 2 additions & 1 deletion exporter/loadbalancingexporter/loadbalancer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/exporter/exportertest"
"go.opentelemetry.io/collector/exporter/otlpexporter"
"go.uber.org/zap"
)

func TestNewLoadBalancerNoResolver(t *testing.T) {
Expand Down Expand Up @@ -178,7 +179,7 @@ func TestStartFailureStaticResolver(t *testing.T) {
func TestLoadBalancerShutdown(t *testing.T) {
// prepare
cfg := simpleConfig()
p, err := newTracesExporter(exportertest.NewNopCreateSettings(), cfg)
p, err := newTracesExporter(exportertest.NewNopCreateSettings(), cfg, zap.NewNop())
require.NotNil(t, p)
require.NoError(t, err)

Expand Down
29 changes: 27 additions & 2 deletions exporter/loadbalancingexporter/testdata/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ loadbalancing:
resolver:
static:
hostnames:
- endpoint-1 # assumes 4317 as the default port
- endpoint-2:55678
- endpoint-1 # assumes 4317 as the default port
- endpoint-2:55678
loadbalancing/2:
protocol:
otlp:
Expand All @@ -27,3 +27,28 @@ loadbalancing/3:
dns:
hostname: service-1
port: 55690
loadbalancing/4:
protocol:
otlp:
resolver:
dns:
hostname: service-1
port: 55690
routing_key: traceID
loadbalancing/5:
protocol:
otlp:
resolver:
dns:
hostname: service-1
port: 55690
routing_key: service
loadbalancing/6:
protocol:
otlp:
resolver:
dns:
hostname: service-1
port: 55690
routing_key: resource
resource_keys: ["resource.attribute", "service.name"]
201 changes: 145 additions & 56 deletions exporter/loadbalancingexporter/trace_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,30 +21,41 @@ import (
"sync"
"time"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/batchpersignal"
"go.opencensus.io/stats"
"go.opencensus.io/tag"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/exporter/otlpexporter"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/ptrace"
"go.uber.org/multierr"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/batchpersignal"
"go.uber.org/zap"
)

var _ exporter.Traces = (*traceExporterImp)(nil)

type traceExporterImp struct {
loadBalancer loadBalancer
routingKey routingKey
resourceKeys []string

stopped bool
shutdownWg sync.WaitGroup
traceConsumer traceConsumer
stopped bool
shutdownWg sync.WaitGroup
logger *zap.Logger
}

type routingEntry struct {
routingKey routingKey
keyValue string
trace ptrace.Traces
}

type traceConsumer func(ctx context.Context, td ptrace.Traces) error

// Create new traces exporter
func newTracesExporter(params exporter.CreateSettings, cfg component.Config) (*traceExporterImp, error) {
func newTracesExporter(params exporter.CreateSettings, cfg component.Config, logger *zap.Logger) (*traceExporterImp, error) {
exporterFactory := otlpexporter.NewFactory()

lb, err := newLoadBalancer(params, cfg, func(ctx context.Context, endpoint string) (component.Component, error) {
Expand All @@ -55,12 +66,17 @@ func newTracesExporter(params exporter.CreateSettings, cfg component.Config) (*t
return nil, err
}

traceExporter := traceExporterImp{loadBalancer: lb, routingKey: traceIDRouting}
traceExporter := traceExporterImp{loadBalancer: lb, logger: logger}

switch cfg.(*Config).RoutingKey {
case "service":
traceExporter.routingKey = svcRouting
traceExporter.traceConsumer = traceExporter.consumeTracesByResource
traceExporter.resourceKeys = []string{"service.name"}
case "resource":
traceExporter.traceConsumer = traceExporter.consumeTracesByResource
traceExporter.resourceKeys = cfg.(*Config).ResourceKeys
case "traceID", "":
traceExporter.traceConsumer = traceExporter.consumeTracesById
default:
return nil, fmt.Errorf("unsupported routing_key: %s", cfg.(*Config).RoutingKey)
}
Expand Down Expand Up @@ -88,80 +104,153 @@ func (e *traceExporterImp) Shutdown(context.Context) error {
}

func (e *traceExporterImp) ConsumeTraces(ctx context.Context, td ptrace.Traces) error {
return e.traceConsumer(ctx, td)
}

func (e *traceExporterImp) consumeTracesById(ctx context.Context, td ptrace.Traces) error {
var errs error
batches := batchpersignal.SplitTraces(td)
for _, batch := range batches {
errs = multierr.Append(errs, e.consumeTrace(ctx, batch))
}

for _, t := range batches {
if tid, err := routeByTraceId(t); err == nil {
errs = multierr.Append(errs, e.consumeTrace(ctx, t, tid))
} else {
return err
}
}
return errs
}

func (e *traceExporterImp) consumeTrace(ctx context.Context, td ptrace.Traces) error {
var exp component.Component
routingIds, err := routingIdentifiersFromTraces(td, e.routingKey)
func (e *traceExporterImp) consumeTracesByResource(ctx context.Context, td ptrace.Traces) error {
var errs error
routeBatches, err := splitTracesByResourceAttr(td, e.resourceKeys)
if err != nil {
return err
}
for rid := range routingIds {
endpoint := e.loadBalancer.Endpoint([]byte(rid))
exp, err = e.loadBalancer.Exporter(endpoint)
if err != nil {
return err
for _, batch := range routeBatches {
switch batch.routingKey {
case resourceAttrRouting:
errs = multierr.Append(errs, e.consumeTrace(ctx, batch.trace, batch.keyValue))
case traceIDRouting:
errs = multierr.Append(errs, e.consumeTracesById(ctx, batch.trace))
}
}
return errs

te, ok := exp.(exporter.Traces)
if !ok {
return fmt.Errorf("unable to export traces, unexpected exporter type: expected exporter.Traces but got %T", exp)
}
}

func (e *traceExporterImp) consumeTrace(ctx context.Context, td ptrace.Traces, rid string) error {
// Routes a single trace via a given routing ID
endpoint := e.loadBalancer.Endpoint([]byte(rid))
exp, err := e.loadBalancer.Exporter(endpoint)
if err != nil {
return err
}

start := time.Now()
err = te.ConsumeTraces(ctx, td)
duration := time.Since(start)
te, ok := exp.(exporter.Traces)
if !ok {
return fmt.Errorf("unable to export traces, unexpected exporter type: expected exporter.Traces but got %T", exp)
}

start := time.Now()
err = te.ConsumeTraces(ctx, td)
duration := time.Since(start)

if err == nil {
_ = stats.RecordWithTags(
ctx,
[]tag.Mutator{tag.Upsert(endpointTagKey, endpoint), successTrueMutator},
mBackendLatency.M(duration.Milliseconds()))
} else {
_ = stats.RecordWithTags(
ctx,
[]tag.Mutator{tag.Upsert(endpointTagKey, endpoint), successFalseMutator},
mBackendLatency.M(duration.Milliseconds()))
}
return err
}

if err == nil {
_ = stats.RecordWithTags(
ctx,
[]tag.Mutator{tag.Upsert(endpointTagKey, endpoint), successTrueMutator},
mBackendLatency.M(duration.Milliseconds()))
func getResourceAttrValue(rs ptrace.ResourceSpans, resourceKeys []string) (string, bool) {
res := ""
found := false
rs.Resource().Attributes().Range(
func(k string, v pcommon.Value) bool {
for _, attrKey := range resourceKeys {
if k == attrKey {
res = v.Str()
found = true
return false
}
}
return true
})
return res, found
}

func splitTracesByResourceAttr(batches ptrace.Traces, resourceKeys []string) ([]routingEntry, error) {
// This function batches all the ResourceSpans with the same routing resource attribute value into a single ptrace.Trace
// This returns a list of routing entries which consists of the routing key, routing key value and the trace
// There should be a 1:1 mapping between key value <-> trace
// This is because we group all Resource Spans with the same key value under a single trace
var result []routingEntry
rss := batches.ResourceSpans()

// This is a mapping between the resource attribute values found and the constructed trace
routeMap := make(map[string]ptrace.Traces)

for i := 0; i < rss.Len(); i++ {
rs := rss.At(i)
if keyValue, ok := getResourceAttrValue(rs, resourceKeys); ok {
// Check if this keyValue has previously been seen
// if not it constructs an empty ptrace.Trace
if _, ok := routeMap[keyValue]; !ok {
routeMap[keyValue] = ptrace.NewTraces()
}
rs.CopyTo(routeMap[keyValue].ResourceSpans().AppendEmpty())
} else {
_ = stats.RecordWithTags(
ctx,
[]tag.Mutator{tag.Upsert(endpointTagKey, endpoint), successFalseMutator},
mBackendLatency.M(duration.Milliseconds()))
// If none of the resource attributes have been found
// We fallback to routing the given Resource Span by Trace ID
t := ptrace.NewTraces()
rs.CopyTo(t.ResourceSpans().AppendEmpty())
// We can't route this whole Resource Span by a single trace ID
// because it's possible for the spans under the RS to have different trace IDs
result = append(result, routingEntry{
routingKey: traceIDRouting,
trace: t,
})
}
}
return err

// We convert the attr value:trace mapping into a list of routingEntries
for key, trace := range routeMap {
result = append(result, routingEntry{
routingKey: resourceAttrRouting,
keyValue: key,
trace: trace,
})
}

return result, nil
}

func routingIdentifiersFromTraces(td ptrace.Traces, key routingKey) (map[string]bool, error) {
ids := make(map[string]bool)
func routeByTraceId(td ptrace.Traces) (string, error) {
// This function assumes that you are receiving a single trace i.e. single traceId
// returns the traceId as the routing key
rs := td.ResourceSpans()
if rs.Len() == 0 {
return nil, errors.New("empty resource spans")
return "", errors.New("empty resource spans")
}
if rs.Len() > 1 {
return "", errors.New("routeByTraceId must receive a ptrace.Traces with a single ResourceSpan")
}

ils := rs.At(0).ScopeSpans()
if ils.Len() == 0 {
return nil, errors.New("empty scope spans")
return "", errors.New("empty scope spans")
}

spans := ils.At(0).Spans()
if spans.Len() == 0 {
return nil, errors.New("empty spans")
}

if key == svcRouting {
for i := 0; i < rs.Len(); i++ {
svc, ok := rs.At(i).Resource().Attributes().Get("service.name")
if !ok {
return nil, errors.New("unable to get service name")
}
ids[svc.Str()] = true
}
return ids, nil
return "", errors.New("empty spans")
}
tid := spans.At(0).TraceID()
ids[string(tid[:])] = true
return ids, nil
return string(tid[:]), nil
}
Loading