From 64b951c946f6797b9755c28677faf472015d280c Mon Sep 17 00:00:00 2001 From: Dan Jaglowski Date: Thu, 10 Oct 2024 12:21:13 -0400 Subject: [PATCH] [connector/routing] Add support for OTTL conditions --- .chloggen/routing-connector-conditions.yaml | 28 ++++++++++++ connector/routingconnector/README.md | 22 +++++---- connector/routingconnector/config.go | 23 +++++++--- connector/routingconnector/config_test.go | 49 ++++++++++++++++++++- connector/routingconnector/logs_test.go | 6 +-- connector/routingconnector/metrics_test.go | 6 +-- connector/routingconnector/router.go | 3 ++ connector/routingconnector/traces_test.go | 6 +-- 8 files changed, 113 insertions(+), 30 deletions(-) create mode 100644 .chloggen/routing-connector-conditions.yaml diff --git a/.chloggen/routing-connector-conditions.yaml b/.chloggen/routing-connector-conditions.yaml new file mode 100644 index 000000000000..ca2cff6ad0b9 --- /dev/null +++ b/.chloggen/routing-connector-conditions.yaml @@ -0,0 +1,28 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: routingconnector + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Allow routing based on OTTL Conditions + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [35731] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: | + Each route must contain either a statement or a condition. + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [] diff --git a/connector/routingconnector/README.md b/connector/routingconnector/README.md index 7ce1b0dd9937..91e38baf5e71 100644 --- a/connector/routingconnector/README.md +++ b/connector/routingconnector/README.md @@ -32,7 +32,8 @@ If you are not already familiar with connectors, you may find it helpful to firs The following settings are available: - `table (required)`: the routing table for this connector. -- `table.statement (required)`: the routing condition provided as the [OTTL] statement. +- `table.statement`: the routing condition provided as the [OTTL] statement. Required if `table.condition` is not provided. +- `table.condition`: the routing condition provided as the [OTTL] condition. Required if `table.statement` is not provided. - `table.pipelines (required)`: the list of pipelines to use when the routing condition is met. - `default_pipelines (optional)`: contains the list of pipelines to use when a record does not meet any of specified conditions. - `error_mode (optional)`: determines how errors returned from OTTL statements are handled. Valid values are `propagate`, `ignore` and `silent`. If `ignore` or `silent` is used and a statement's condition has an error then the payload will be routed to the default pipelines. When `silent` is used the error is not logged. If not supplied, `propagate` is used. @@ -97,23 +98,20 @@ Respectively, if none of the routing conditions met, then a signal is routed to - The connector will only route using [OTTL] statements which can only be applied to resource attributes. It does not support matching on context values at this time. - The connector routes to pipelines, not exporters as the processor does. -### OTTL Limitations -- Currently, it is not possible to specify boolean statements without function invocation as the routing condition. It is required to provide the NOOP `route()` or any other supported function as part of the routing statement, see [#13545](https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/13545) for more information. -- Supported [OTTL] functions: - - [IsMatch](../../pkg/ottl/ottlfuncs/README.md#IsMatch) - - [delete_key](../../pkg/ottl/ottlfuncs/README.md#delete_key) - - [delete_matching_keys](../../pkg/ottl/ottlfuncs/README.md#delete_matching_keys) +### Supported [OTTL] functions + +- [IsMatch](../../pkg/ottl/ottlfuncs/README.md#IsMatch) +- [delete_key](../../pkg/ottl/ottlfuncs/README.md#delete_key) +- [delete_matching_keys](../../pkg/ottl/ottlfuncs/README.md#delete_matching_keys) ## Additional Settings + The full list of settings exposed for this connector are documented [here](./config.go) with detailed sample configuration files: - [logs](./testdata/config_logs.yaml) - [metrics](./testdata/config_metrics.yaml) - [traces](./testdata/config_traces.yaml) -[in development]:https://github.com/open-telemetry/opentelemetry-collector#in-development [Connectors README]:https://github.com/open-telemetry/opentelemetry-collector/blob/main/connector/README.md -[Exporter Pipeline Type]:https://github.com/open-telemetry/opentelemetry-collector/blob/main/connector/README.md#exporter-pipeline-type -[Receiver Pipeline Type]:https://github.com/open-telemetry/opentelemetry-collector/blob/main/connector/README.md#receiver-pipeline-type -[contrib]:https://github.com/open-telemetry/opentelemetry-collector-releases/tree/main/distributions/otelcol-contrib -[OTTL]: https://github.com/open-telemetry/opentelemetry-collector/blob/main/docs/processing.md#telemetry-query-language + +[OTTL]: https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/pkg/ottl/README.md diff --git a/connector/routingconnector/config.go b/connector/routingconnector/config.go index 2288f4d262b4..8c868e152a00 100644 --- a/connector/routingconnector/config.go +++ b/connector/routingconnector/config.go @@ -12,10 +12,11 @@ import ( ) var ( - errEmptyRoute = errors.New("invalid route: no statement provided") - errNoPipelines = errors.New("invalid route: no pipelines defined") - errUnexpectedConsumer = errors.New("expected consumer to be a connector router") - errNoTableItems = errors.New("invalid routing table: the routing table is empty") + errNoConditionOrStatement = errors.New("invalid route: no condition or statement provided") + errConditionAndStatement = errors.New("invalid route: both condition and statement provided") + errNoPipelines = errors.New("invalid route: no pipelines defined") + errUnexpectedConsumer = errors.New("expected consumer to be a connector router") + errNoTableItems = errors.New("invalid routing table: the routing table is empty") ) // Config defines configuration for the Routing processor. @@ -55,8 +56,12 @@ func (c *Config) Validate() error { // validate that every route has a value for the routing attribute and has // at least one pipeline for _, item := range c.Table { - if len(item.Statement) == 0 { - return errEmptyRoute + if item.Statement == "" && item.Condition == "" { + return errNoConditionOrStatement + } + + if item.Statement != "" && item.Condition != "" { + return errConditionAndStatement } if len(item.Pipelines) == 0 { @@ -70,9 +75,13 @@ func (c *Config) Validate() error { // RoutingTableItem specifies how data should be routed to the different pipelines type RoutingTableItem struct { // Statement is a OTTL statement used for making a routing decision. - // Required when 'Value' isn't provided. + // One of 'Statement' or 'Condition' must be provided. Statement string `mapstructure:"statement"` + // Condition is an OTTL condition used for making a routing decision. + // One of 'Statement' or 'Condition' must be provided. + Condition string `mapstructure:"condition"` + // Pipelines contains the list of pipelines to use when the value from the FromAttribute field // matches this table item. When no pipelines are specified, the ones specified under // DefaultPipelines are used, if any. diff --git a/connector/routingconnector/config_test.go b/connector/routingconnector/config_test.go index bd25c054eb00..1beb6bd44638 100644 --- a/connector/routingconnector/config_test.go +++ b/connector/routingconnector/config_test.go @@ -135,7 +135,7 @@ func TestValidateConfig(t *testing.T) { }, }, }, - error: "invalid route: no statement provided", + error: "invalid route: no condition or statement provided", }, { name: "no pipeline provided", @@ -162,11 +162,56 @@ func TestValidateConfig(t *testing.T) { config: &Config{}, error: "invalid routing table: the routing table is empty", }, + { + name: "condition provided", + config: &Config{ + Table: []RoutingTableItem{ + { + Condition: `attributes["attr"] == "acme"`, + Pipelines: []pipeline.ID{ + pipeline.NewIDWithName(pipeline.SignalTraces, "otlp"), + }, + }, + }, + }, + }, + { + name: "statement provided", + config: &Config{ + Table: []RoutingTableItem{ + { + Statement: `route() where attributes["attr"] == "acme"`, + Pipelines: []pipeline.ID{ + pipeline.NewIDWithName(pipeline.SignalTraces, "otlp"), + }, + }, + }, + }, + }, + { + name: "both condition and statement provided", + config: &Config{ + Table: []RoutingTableItem{ + { + Condition: `attributes["attr"] == "acme"`, + Statement: `route() where attributes["attr"] == "acme"`, + Pipelines: []pipeline.ID{ + pipeline.NewIDWithName(pipeline.SignalTraces, "otlp"), + }, + }, + }, + }, + error: "invalid route: both condition and statement provided", + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - assert.EqualError(t, component.ValidateConfig(tt.config), tt.error) + if tt.error == "" { + assert.NoError(t, component.ValidateConfig(tt.config)) + } else { + assert.EqualError(t, component.ValidateConfig(tt.config), tt.error) + } }) } } diff --git a/connector/routingconnector/logs_test.go b/connector/routingconnector/logs_test.go index 2881f34b6bad..ee450a17e4a9 100644 --- a/connector/routingconnector/logs_test.go +++ b/connector/routingconnector/logs_test.go @@ -31,7 +31,7 @@ func TestLogsRegisterConsumersForValidRoute(t *testing.T) { Pipelines: []pipeline.ID{logs0}, }, { - Statement: `route() where attributes["X-Tenant"] == "*"`, + Condition: `attributes["X-Tenant"] == "*"`, Pipelines: []pipeline.ID{logs0, logs1}, }, }, @@ -84,7 +84,7 @@ func TestLogsAreCorrectlySplitPerResourceAttributeWithOTTL(t *testing.T) { DefaultPipelines: []pipeline.ID{logsDefault}, Table: []RoutingTableItem{ { - Statement: `route() where IsMatch(attributes["X-Tenant"], ".*acme") == true`, + Condition: `IsMatch(attributes["X-Tenant"], ".*acme") == true`, Pipelines: []pipeline.ID{logs0}, }, { @@ -247,7 +247,7 @@ func TestLogsAreCorrectlyMatchOnceWithOTTL(t *testing.T) { Pipelines: []pipeline.ID{logs1}, }, { - Statement: `route() where attributes["X-Tenant"] == "ecorp"`, + Condition: `attributes["X-Tenant"] == "ecorp"`, Pipelines: []pipeline.ID{logsDefault, logs0}, }, }, diff --git a/connector/routingconnector/metrics_test.go b/connector/routingconnector/metrics_test.go index 53e2fb57962a..ffd609abc9b1 100644 --- a/connector/routingconnector/metrics_test.go +++ b/connector/routingconnector/metrics_test.go @@ -31,7 +31,7 @@ func TestMetricsRegisterConsumersForValidRoute(t *testing.T) { Pipelines: []pipeline.ID{metrics0}, }, { - Statement: `route() where attributes["X-Tenant"] == "*"`, + Condition: `attributes["X-Tenant"] == "*"`, Pipelines: []pipeline.ID{metrics0, metrics1}, }, }, @@ -84,7 +84,7 @@ func TestMetricsAreCorrectlySplitPerResourceAttributeWithOTTL(t *testing.T) { DefaultPipelines: []pipeline.ID{metricsDefault}, Table: []RoutingTableItem{ { - Statement: `route() where attributes["value"] > 2.5`, + Condition: `attributes["value"] > 2.5`, Pipelines: []pipeline.ID{metrics0}, }, { @@ -262,7 +262,7 @@ func TestMetricsAreCorrectlyMatchOnceWithOTTL(t *testing.T) { Pipelines: []pipeline.ID{metrics1}, }, { - Statement: `route() where attributes["value"] == 1.0`, + Condition: `attributes["value"] == 1.0`, Pipelines: []pipeline.ID{metricsDefault, metrics0}, }, }, diff --git a/connector/routingconnector/router.go b/connector/routingconnector/router.go index a2f711efad06..aef71a743e39 100644 --- a/connector/routingconnector/router.go +++ b/connector/routingconnector/router.go @@ -149,6 +149,9 @@ func (r *router[C]) registerRouteConsumers() error { // does not contain a valid OTTL statement then nil is returned. func (r *router[C]) getStatementFrom(item RoutingTableItem) (*ottl.Statement[ottlresource.TransformContext], error) { var statement *ottl.Statement[ottlresource.TransformContext] + if item.Condition != "" { + item.Statement = fmt.Sprintf("route() where %s", item.Condition) + } if item.Statement != "" { var err error statement, err = r.parser.ParseStatement(item.Statement) diff --git a/connector/routingconnector/traces_test.go b/connector/routingconnector/traces_test.go index 200a670ff292..fb42892ae00e 100644 --- a/connector/routingconnector/traces_test.go +++ b/connector/routingconnector/traces_test.go @@ -31,7 +31,7 @@ func TestTracesRegisterConsumersForValidRoute(t *testing.T) { Pipelines: []pipeline.ID{traces0}, }, { - Statement: `route() where attributes["X-Tenant"] == "*"`, + Condition: `attributes["X-Tenant"] == "*"`, Pipelines: []pipeline.ID{traces0, traces1}, }, }, @@ -84,7 +84,7 @@ func TestTracesCorrectlySplitPerResourceAttributeWithOTTL(t *testing.T) { DefaultPipelines: []pipeline.ID{tracesDefault}, Table: []RoutingTableItem{ { - Statement: `route() where attributes["value"] > 0 and attributes["value"] < 4`, + Condition: `attributes["value"] > 0 and attributes["value"] < 4`, Pipelines: []pipeline.ID{traces0}, }, { @@ -223,7 +223,7 @@ func TestTracesCorrectlyMatchOnceWithOTTL(t *testing.T) { Pipelines: []pipeline.ID{traces1}, }, { - Statement: `route() where attributes["value"] == 5`, + Condition: `attributes["value"] == 5`, Pipelines: []pipeline.ID{tracesDefault, traces0}, }, },