Skip to content

Commit

Permalink
[connector/routing] Supports matching the statement only once. (#28888)
Browse files Browse the repository at this point in the history
**Description:** <Describe what has changed.>
<!--Ex. Fixing a bug - Describe the bug and how this fixes the issue.
Ex. Adding a feature - Explain what this achieves.-->
Adding a feature: routing connector supports matching the statement only
once

**Link to tracking Issue:** <Issue number if applicable> #26353

**Testing:** <Describe what testing was performed and which tests were
added.>
Basic tests included for the functionality.

**Documentation:** <Describe the documentation added.>

---------

Co-authored-by: Daniel Jaglowski <jaglows3@gmail.com>
  • Loading branch information
huange7 and djaglowski authored Dec 14, 2023
1 parent ed85d14 commit b4563f3
Show file tree
Hide file tree
Showing 10 changed files with 509 additions and 5 deletions.
27 changes: 27 additions & 0 deletions .chloggen/feat_routing_match_once.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: routingconnector

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: routingconnector supports matching the statement only once

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [26353]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
12 changes: 12 additions & 0 deletions connector/routingconnector/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ The following settings are available:
- `table.pipelines (required)`: the list of pipelines to use when the routing condition is met.
- `default_pipelines (optional)`: contains the list of pipelines to use when a record does not meet any of specified conditions.
- `error_mode (optional)`: determines how errors returned from OTTL statements are handled. Valid values are `propagate`, `ignore` and `silent`. If `ignored` or `silent` is used and a statement's condition has an error then the payload will be routed to the default pipelines. When `silent` is used the error is not logged. If not supplied, `propagate` is used.
- `match_once (optional, default: false)`: determines whether the connector matches multiple statements or not. If enabled, the payload will be routed to the first pipeline in the `table` whose routing condition is met.

Example:

Expand All @@ -55,12 +56,23 @@ connectors:
routing:
default_pipelines: [traces/jaeger]
error_mode: ignore
match_once: false
table:
- statement: route() where attributes["X-Tenant"] == "acme"
pipelines: [traces/jaeger-acme]
- statement: delete_key(attributes, "X-Tenant") where IsMatch(attributes["X-Tenant"], ".*corp")
pipelines: [traces/jaeger-ecorp]

routing/match_once:
default_pipelines: [traces/jaeger]
error_mode: ignore
match_once: true
table:
- statement: route() where attributes["X-Tenant"] == "acme"
pipelines: [traces/jaeger-acme]
- statement: route() where attributes["X-Tenant"] == ".*acme"
pipelines: [traces/jaeger-ecorp]

service:
pipelines:
traces/in:
Expand Down
4 changes: 4 additions & 0 deletions connector/routingconnector/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@ type Config struct {
// Table contains the routing table for this processor.
// Required.
Table []RoutingTableItem `mapstructure:"table"`

// MatchOnce determines whether the connector matches multiple statements.
// Optional.
MatchOnce bool `mapstructure:"match_once"`
}

// Validate checks if the processor configuration is valid.
Expand Down
5 changes: 4 additions & 1 deletion connector/routingconnector/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func (c *logsConnector) ConsumeLogs(ctx context.Context, ld plog.Logs) error {
rtx := ottlresource.NewTransformContext(rlogs.Resource())

noRoutesMatch := true
for _, route := range c.router.routes {
for _, route := range c.router.routeSlice {
_, isMatch, err := route.statement.Execute(ctx, rtx)
if err != nil {
if c.config.ErrorMode == ottl.PropagateError {
Expand All @@ -84,6 +84,9 @@ func (c *logsConnector) ConsumeLogs(ctx context.Context, ld plog.Logs) error {
if isMatch {
noRoutesMatch = false
c.group(groups, route.consumer, rlogs)
if c.config.MatchOnce {
break
}
}

}
Expand Down
152 changes: 152 additions & 0 deletions connector/routingconnector/logs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,158 @@ func TestLogsAreCorrectlySplitPerResourceAttributeWithOTTL(t *testing.T) {
})
}

func TestLogsAreCorrectlyMatchOnceWithOTTL(t *testing.T) {
logsDefault := component.NewIDWithName(component.DataTypeLogs, "default")
logs0 := component.NewIDWithName(component.DataTypeLogs, "0")
logs1 := component.NewIDWithName(component.DataTypeLogs, "1")

cfg := &Config{
DefaultPipelines: []component.ID{logsDefault},
Table: []RoutingTableItem{
{
Statement: `route() where IsMatch(attributes["X-Tenant"], ".*acme") == true`,
Pipelines: []component.ID{logs0},
},
{
Statement: `route() where IsMatch(attributes["X-Tenant"], "_acme") == true`,
Pipelines: []component.ID{logs1},
},
{
Statement: `route() where attributes["X-Tenant"] == "ecorp"`,
Pipelines: []component.ID{logsDefault, logs0},
},
},
MatchOnce: true,
}

var defaultSink, sink0, sink1 consumertest.LogsSink

router := connectortest.NewLogsRouter(
connectortest.WithLogsSink(logsDefault, &defaultSink),
connectortest.WithLogsSink(logs0, &sink0),
connectortest.WithLogsSink(logs1, &sink1),
)

resetSinks := func() {
defaultSink.Reset()
sink0.Reset()
sink1.Reset()
}

factory := NewFactory()
conn, err := factory.CreateLogsToLogs(
context.Background(),
connectortest.NewNopCreateSettings(),
cfg,
router.(consumer.Logs),
)

require.NoError(t, err)
require.NotNil(t, conn)
require.NoError(t, conn.Start(context.Background(), componenttest.NewNopHost()))
defer func() {
assert.NoError(t, conn.Shutdown(context.Background()))
}()

t.Run("logs matched by no expressions", func(t *testing.T) {
resetSinks()

l := plog.NewLogs()
rl := l.ResourceLogs().AppendEmpty()
rl.Resource().Attributes().PutStr("X-Tenant", "something-else")
rl.ScopeLogs().AppendEmpty().LogRecords().AppendEmpty()

require.NoError(t, conn.ConsumeLogs(context.Background(), l))

assert.Len(t, defaultSink.AllLogs(), 1)
assert.Len(t, sink0.AllLogs(), 0)
assert.Len(t, sink1.AllLogs(), 0)
})

t.Run("logs matched one expression", func(t *testing.T) {
resetSinks()

l := plog.NewLogs()

rl := l.ResourceLogs().AppendEmpty()
rl.Resource().Attributes().PutStr("X-Tenant", "xacme")
rl.ScopeLogs().AppendEmpty().LogRecords().AppendEmpty()

require.NoError(t, conn.ConsumeLogs(context.Background(), l))

assert.Len(t, defaultSink.AllLogs(), 0)
assert.Len(t, sink0.AllLogs(), 1)
assert.Len(t, sink1.AllLogs(), 0)
})

t.Run("logs matched by two expressions, but sinks to one", func(t *testing.T) {
resetSinks()

l := plog.NewLogs()

rl := l.ResourceLogs().AppendEmpty()
rl.Resource().Attributes().PutStr("X-Tenant", "x_acme")
rl.ScopeLogs().AppendEmpty().LogRecords().AppendEmpty()

rl = l.ResourceLogs().AppendEmpty()
rl.Resource().Attributes().PutStr("X-Tenant", "_acme")
rl.ScopeLogs().AppendEmpty().LogRecords().AppendEmpty()

require.NoError(t, conn.ConsumeLogs(context.Background(), l))

assert.Len(t, defaultSink.AllLogs(), 0)
assert.Len(t, sink0.AllLogs(), 1)
assert.Len(t, sink1.AllLogs(), 0)

assert.Equal(t, sink0.AllLogs()[0].LogRecordCount(), 2)
})

t.Run("one log matched by multiple expressions, other matched none", func(t *testing.T) {
resetSinks()

l := plog.NewLogs()

rl := l.ResourceLogs().AppendEmpty()
rl.Resource().Attributes().PutStr("X-Tenant", "_acme")
rl.ScopeLogs().AppendEmpty().LogRecords().AppendEmpty()

rl = l.ResourceLogs().AppendEmpty()
rl.Resource().Attributes().PutStr("X-Tenant", "something-else")
rl.ScopeLogs().AppendEmpty().LogRecords().AppendEmpty()

require.NoError(t, conn.ConsumeLogs(context.Background(), l))

assert.Len(t, defaultSink.AllLogs(), 1)
assert.Len(t, sink0.AllLogs(), 1)
assert.Len(t, sink1.AllLogs(), 0)

rlog := defaultSink.AllLogs()[0].ResourceLogs().At(0)
attr, ok := rlog.Resource().Attributes().Get("X-Tenant")
assert.True(t, ok, "routing attribute must exists")
assert.Equal(t, attr.AsString(), "something-else")
})

t.Run("logs matched by one expression, multiple pipelines", func(t *testing.T) {
resetSinks()

l := plog.NewLogs()

rl := l.ResourceLogs().AppendEmpty()
rl.Resource().Attributes().PutStr("X-Tenant", "ecorp")
rl.ScopeLogs().AppendEmpty().LogRecords().AppendEmpty()

require.NoError(t, conn.ConsumeLogs(context.Background(), l))

assert.Len(t, defaultSink.AllLogs(), 1)
assert.Len(t, sink0.AllLogs(), 1)
assert.Len(t, sink1.AllLogs(), 0)

assert.Equal(t, defaultSink.AllLogs()[0].LogRecordCount(), 1)
assert.Equal(t, sink0.AllLogs()[0].LogRecordCount(), 1)
assert.Equal(t, defaultSink.AllLogs(), sink0.AllLogs())
})
}

func TestLogsResourceAttributeDroppedByOTTL(t *testing.T) {
logsDefault := component.NewIDWithName(component.DataTypeLogs, "default")
logsOther := component.NewIDWithName(component.DataTypeLogs, "other")
Expand Down
5 changes: 4 additions & 1 deletion connector/routingconnector/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func (c *metricsConnector) ConsumeMetrics(ctx context.Context, md pmetric.Metric
rtx := ottlresource.NewTransformContext(rmetrics.Resource())

noRoutesMatch := true
for _, route := range c.router.routes {
for _, route := range c.router.routeSlice {
_, isMatch, err := route.statement.Execute(ctx, rtx)
if err != nil {
if c.config.ErrorMode == ottl.PropagateError {
Expand All @@ -84,6 +84,9 @@ func (c *metricsConnector) ConsumeMetrics(ctx context.Context, md pmetric.Metric
if isMatch {
noRoutesMatch = false
c.group(groups, route.consumer, rmetrics)
if c.config.MatchOnce {
break
}
}

}
Expand Down
Loading

0 comments on commit b4563f3

Please sign in to comment.