Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[routingprocessor] Make exporters registration more generic. #13529

Merged
merged 3 commits into from
Aug 24, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 38 additions & 10 deletions processor/routingprocessor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,10 @@ var (

type processorImp struct {
logger *zap.Logger
router *router

metricsRouter router[component.MetricsExporter]
logsRouter router[component.LogsExporter]
tracesRouter router[component.TracesExporter]
}

// newProcessor creates new processor
Expand All @@ -55,25 +58,50 @@ func newProcessor(logger *zap.Logger, cfg config.Processor) *processorImp {
oCfg := cfg.(*Config)

return &processorImp{
logger: logger,
router: newRouter(*oCfg, logger),
logger: logger,
metricsRouter: newRouter[component.MetricsExporter](*oCfg, logger),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This might be the first usage of generics in this code base. Are you able to find other uses? If this is the first, we might want to give a heads up to other maintainers.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I couldn't find any other usage of generics in contrib repository. Is there an easy way to give heads up to maintainers?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@open-telemetry/collector-contrib-maintainer , heads up: we are about to introduce the first usage of generics in the code base. Does anyone have anything against this?

Copy link
Member

@TylerHelmuth TylerHelmuth Aug 23, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The TQL already has some due to the participle package requiring them. It is about to have some more via #13320.

logsRouter: newRouter[component.LogsExporter](*oCfg, logger),
tracesRouter: newRouter[component.TracesExporter](*oCfg, logger),
}
}

func (e *processorImp) Start(_ context.Context, host component.Host) error {
return e.router.registerExporters(host.GetExporters())
exporters := host.GetExporters()

err := e.metricsRouter.registerExportersForType(exporters, config.MetricsDataType)
if err != nil {
return err
}
err = e.logsRouter.registerExportersForType(exporters, config.LogsDataType)
if err != nil {
return err
}
err = e.tracesRouter.registerExportersForType(exporters, config.TracesDataType)
if err != nil {
return err
}
if len(e.tracesRouter.exporters) == 0 &&
len(e.tracesRouter.defaultExporters) == 0 &&
len(e.metricsRouter.exporters) == 0 &&
len(e.metricsRouter.defaultExporters) == 0 &&
len(e.logsRouter.exporters) == 0 &&
len(e.logsRouter.defaultExporters) == 0 {
return errNoExportersAfterRegistration
}

return nil
}

func (e *processorImp) Shutdown(context.Context) error {
return nil
}

func (e *processorImp) ConsumeTraces(ctx context.Context, td ptrace.Traces) error {
routedTraces := e.router.RouteTraces(ctx, td)
routedTraces := e.tracesRouter.RouteTraces(ctx, td)
for _, rt := range routedTraces {
for _, exp := range rt.exporters {
// TODO: determine the proper action when errors happen
if err := exp.ConsumeTraces(ctx, rt.traces); err != nil {
if err := exp.ConsumeTraces(ctx, rt.signal); err != nil {
return err
}
}
Expand All @@ -83,11 +111,11 @@ func (e *processorImp) ConsumeTraces(ctx context.Context, td ptrace.Traces) erro
}

func (e *processorImp) ConsumeMetrics(ctx context.Context, tm pmetric.Metrics) error {
routedMetrics := e.router.RouteMetrics(ctx, tm)
routedMetrics := e.metricsRouter.RouteMetrics(ctx, tm)
for _, rm := range routedMetrics {
for _, exp := range rm.exporters {
// TODO: determine the proper action when errors happen
if err := exp.ConsumeMetrics(ctx, rm.metrics); err != nil {
if err := exp.ConsumeMetrics(ctx, rm.signal); err != nil {
return err
}
}
Expand All @@ -97,11 +125,11 @@ func (e *processorImp) ConsumeMetrics(ctx context.Context, tm pmetric.Metrics) e
}

func (e *processorImp) ConsumeLogs(ctx context.Context, tl plog.Logs) error {
routedLogs := e.router.RouteLogs(ctx, tl)
routedLogs := e.logsRouter.RouteLogs(ctx, tl)
for _, rl := range routedLogs {
for _, exp := range rl.exporters {
// TODO: determine the proper action when errors happen
if err := exp.ConsumeLogs(ctx, rl.logs); err != nil {
if err := exp.ConsumeLogs(ctx, rl.signal); err != nil {
return err
}
}
Expand Down
2 changes: 1 addition & 1 deletion processor/routingprocessor/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func TestTraces_RegisterExportersForValidRoute(t *testing.T) {
require.NoError(t, exp.Start(context.Background(), host))

// verify
assert.Contains(t, exp.router.tracesExporters["acme"], otlpExp)
assert.Contains(t, exp.tracesRouter.exporters["acme"], otlpExp)
}

func TestTraces_ErrorRequestedExporterNotFoundForRoute(t *testing.T) {
Expand Down
Loading