Skip to content

Commit

Permalink
Allow defining multiple conditions
Browse files Browse the repository at this point in the history
  • Loading branch information
TylerHelmuth committed Nov 21, 2022
1 parent 1db4d19 commit 241591b
Show file tree
Hide file tree
Showing 12 changed files with 275 additions and 149 deletions.
33 changes: 17 additions & 16 deletions processor/filterprocessor/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,7 @@ processors:
## OTTL
The [OpenTelemetry Transformation Language](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/pkg/ottl/README.md) is a language for interacting with telemetry within the collector in generic ways.
The filterprocessor can be configured to use OTTL conditions to determine when to drop telemetry.
If any condition is met, the telemetry is dropped.
Each configuration option corresponds with a different type of telemetry and OTTL Context.
See the table below for details on each context and the fields it exposes.

Expand Down Expand Up @@ -317,24 +318,24 @@ If all datapoints for a metric are dropped, the metric will also be dropped.
processors:
filter/spans-and-spanevents:
spans:
span: '
attributes["container.name"] == "app_container_1" or
resource.attributes["host.name"] == "localhost" or
name == "app_3'
spanevent: '
attributes["grpc"] == true or
IsMatch(name, ".*grpc.*") == true'
span_conditions:
- 'attributes["container.name"] == "app_container_1"'
- 'resource.attributes["host.name"] == "localhost"'
- 'name == "app_3"'
spanevent_conditions:
- 'attributes["grpc"] == true'
- 'IsMatch(name, ".*grpc.*") == true'
metrics:
metric: '
(name == "my.metric" and attributes["my_label"] == "abc123") or
(type == METRIC_DATA_TYPE_HISTOGRAM)'
datapoint: '
metric.type == METRIC_DATA_TYPE_SUMMARY and
resource.attributes["service.name"] == "my_service_name"'
metric_conditions:
- 'name == "my.metric" and attributes["my_label"] == "abc123"'
- 'type == METRIC_DATA_TYPE_HISTOGRAM'
datapoint_conditions:
- 'metric.type == METRIC_DATA_TYPE_SUMMARY'
- 'resource.attributes["service.name"] == "my_service_name"'
logs:
log: '
IsMatch(body, ".*password.*") == true or
severity_number < SEVERITY_NUMBER_WARN'
log_conditions:
- 'IsMatch(body, ".*password.*") == true'
- 'severity_number < SEVERITY_NUMBER_WARN'
```

[alpha]:https://github.com/open-telemetry/opentelemetry-collector#alpha
Expand Down
60 changes: 30 additions & 30 deletions processor/filterprocessor/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,15 +63,15 @@ type MetricFilters struct {
// RegexpConfig specifies options for the Regexp match type
RegexpConfig *regexp.Config `mapstructure:"regexp"`

// Metric is an OTTL condition for an ottlmetric context.
// If the condition resolves to true, the metric will be dropped.
// MetricConditions is a list of OTTL conditions for an ottlmetric context.
// If any condition resolves to true, the metric will be dropped.
// Supports `and`, `or`, and `()`
Metric string `mapstructure:"metric"`
MetricConditions []string `mapstructure:"metric_conditions"`

// DataPoint is an OTTL condition for an ottldatapoint context.
// If the condition resolves to true, the datapoint will be dropped.
// DataPointConditions is a list of OTTL conditions for an ottldatapoint context.
// If any condition resolves to true, the datapoint will be dropped.
// Supports `and`, `or`, and `()`
DataPoint string `mapstructure:"datapoint"`
DataPointConditions []string `mapstructure:"datapoint_conditions"`
}

// SpanFilters filters by Span attributes and various other fields, Regexp config is per matcher
Expand All @@ -86,15 +86,15 @@ type SpanFilters struct {
// If both Include and Exclude are specified, Include filtering occurs first.
Exclude *filterconfig.MatchProperties `mapstructure:"exclude"`

// Span is an OTTL condition for an ottlspan context.
// If the condition resolves to true, the span will be dropped.
// SpanConditions is a list of OTTL conditions for an ottlspan context.
// If any condition resolves to true, the span will be dropped.
// Supports `and`, `or`, and `()`
Span string `mapstructure:"span"`
SpanConditions []string `mapstructure:"span_conditions"`

// SpanEvent is an OTTL condition for an ottlspanevent context.
// If the condition resolves to true, the span event will be dropped.
// SpanEventConditions is a list of OTTL conditions for an ottlspanevent context.
// If any condition resolves to true, the span event will be dropped.
// Supports `and`, `or`, and `()`
SpanEvent string `mapstructure:"spanevent"`
SpanEventConditions []string `mapstructure:"spanevent_conditions"`
}

// LogFilters filters by Log properties.
Expand All @@ -108,10 +108,10 @@ type LogFilters struct {
// If both Include and Exclude are specified, Include filtering occurs first.
Exclude *LogMatchProperties `mapstructure:"exclude"`

// Log is an OTTL condition for an ottllog context.
// If the condition resolves to true, the log event will be dropped.
// LogConditions is a list of OTTL conditions for an ottllog context.
// If any condition resolves to true, the log event will be dropped.
// Supports `and`, `or`, and `()`
Log string `mapstructure:"log"`
LogConditions []string `mapstructure:"log_conditions"`
}

// LogMatchType specifies the strategy for matching against `plog.Log`s.
Expand Down Expand Up @@ -286,53 +286,53 @@ var _ component.ProcessorConfig = (*Config)(nil)

// Validate checks if the processor configuration is valid
func (cfg *Config) Validate() error {
if (cfg.Spans.Span != "" || cfg.Spans.SpanEvent != "") && (cfg.Spans.Include != nil || cfg.Spans.Exclude != nil) {
if (cfg.Spans.SpanConditions != nil || cfg.Spans.SpanEventConditions != nil) && (cfg.Spans.Include != nil || cfg.Spans.Exclude != nil) {
return fmt.Errorf("cannot use ottl conditions and include/exclude for spans at the same time")
}
if (cfg.Metrics.Metric != "" || cfg.Metrics.DataPoint != "") && (cfg.Metrics.Include != nil || cfg.Metrics.Exclude != nil) {
if (cfg.Metrics.MetricConditions != nil || cfg.Metrics.DataPointConditions != nil) && (cfg.Metrics.Include != nil || cfg.Metrics.Exclude != nil) {
return fmt.Errorf("cannot use ottl conditions and include/exclude for metrics at the same time")
}
if cfg.Logs.Log != "" && (cfg.Logs.Include != nil || cfg.Logs.Exclude != nil) {
if cfg.Logs.LogConditions != nil && (cfg.Logs.Include != nil || cfg.Logs.Exclude != nil) {
return fmt.Errorf("cannot use ottl conditions and include/exclude for logs at the same time")
}

var errors error

if cfg.Spans.Span != "" {
if cfg.Spans.SpanConditions != nil {
spanp := ottlspan.NewParser(common.Functions[ottlspan.TransformContext](), component.TelemetrySettings{Logger: zap.NewNop()})
_, err := spanp.ParseStatements(common.PrepareConditionForParsing(cfg.Spans.Span))
_, err := spanp.ParseStatements(common.PrepareConditionForParsing(cfg.Spans.SpanConditions))
errors = multierr.Append(errors, err)
}

if cfg.Spans.SpanEvent != "" {
if cfg.Spans.SpanEventConditions != nil {
spaneventp := ottlspanevent.NewParser(common.Functions[ottlspanevent.TransformContext](), component.TelemetrySettings{Logger: zap.NewNop()})
_, err := spaneventp.ParseStatements(common.PrepareConditionForParsing(cfg.Spans.SpanEvent))
_, err := spaneventp.ParseStatements(common.PrepareConditionForParsing(cfg.Spans.SpanEventConditions))
errors = multierr.Append(errors, err)
}

if cfg.Metrics.Metric != "" {
if cfg.Metrics.MetricConditions != nil {
metricp := ottlmetric.NewParser(common.Functions[ottlmetric.TransformContext](), component.TelemetrySettings{Logger: zap.NewNop()})
_, err := metricp.ParseStatements(common.PrepareConditionForParsing(cfg.Metrics.Metric))
_, err := metricp.ParseStatements(common.PrepareConditionForParsing(cfg.Metrics.MetricConditions))
errors = multierr.Append(errors, err)
}

if cfg.Metrics.DataPoint != "" {
if cfg.Metrics.DataPointConditions != nil {
datapointp := ottldatapoint.NewParser(common.Functions[ottldatapoint.TransformContext](), component.TelemetrySettings{Logger: zap.NewNop()})
_, err := datapointp.ParseStatements(common.PrepareConditionForParsing(cfg.Metrics.DataPoint))
_, err := datapointp.ParseStatements(common.PrepareConditionForParsing(cfg.Metrics.DataPointConditions))
errors = multierr.Append(errors, err)
}

if cfg.Logs.Log != "" {
if cfg.Logs.LogConditions != nil {
logp := ottllog.NewParser(common.Functions[ottllog.TransformContext](), component.TelemetrySettings{Logger: zap.NewNop()})
_, err := logp.ParseStatements(common.PrepareConditionForParsing(cfg.Logs.Log))
_, err := logp.ParseStatements(common.PrepareConditionForParsing(cfg.Logs.LogConditions))
errors = multierr.Append(errors, err)
}

if cfg.Logs.Log != "" && cfg.Logs.Include != nil {
if cfg.Logs.LogConditions != nil && cfg.Logs.Include != nil {
errors = multierr.Append(errors, cfg.Logs.Include.validate())
}

if cfg.Logs.Log != "" && cfg.Logs.Exclude != nil {
if cfg.Logs.LogConditions != nil && cfg.Logs.Exclude != nil {
errors = multierr.Append(errors, cfg.Logs.Exclude.validate())
}

Expand Down
25 changes: 19 additions & 6 deletions processor/filterprocessor/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -857,15 +857,25 @@ func TestLoadingConfigOTTL(t *testing.T) {
expected: &Config{
ProcessorSettings: config.NewProcessorSettings(component.NewID(typeStr)),
Spans: SpanFilters{
Span: `attributes["test"] == "pass"`,
SpanEvent: `attributes["test"] == "pass"`,
SpanConditions: []string{
`attributes["test"] == "pass"`,
},
SpanEventConditions: []string{
`attributes["test"] == "pass"`,
},
},
Metrics: MetricFilters{
Metric: `name == "pass"`,
DataPoint: `attributes["test"] == "pass"`,
MetricConditions: []string{
`name == "pass"`,
},
DataPointConditions: []string{
`attributes["test"] == "pass"`,
},
},
Logs: LogFilters{
Log: `attributes["test"] == "pass"`,
LogConditions: []string{
`attributes["test"] == "pass"`,
},
},
},
},
Expand All @@ -874,7 +884,10 @@ func TestLoadingConfigOTTL(t *testing.T) {
expected: &Config{
ProcessorSettings: config.NewProcessorSettings(component.NewID(typeStr)),
Spans: SpanFilters{
Span: ` attributes["test"] == "pass" or attributes["test"] == "also pass"`,
SpanConditions: []string{
`attributes["test"] == "pass"`,
`attributes["test"] == "also pass"`,
},
},
},
},
Expand Down
34 changes: 34 additions & 0 deletions processor/filterprocessor/internal/common/matcher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package common // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/filterprocessor/internal/common"

import (
"context"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl"
)

func CheckConditions[K any](ctx context.Context, tCtx K, statements []*ottl.Statement[K]) (bool, error) {
for _, statement := range statements {
_, metCondition, err := statement.Execute(ctx, tCtx)
if err != nil {
return false, err
}
if metCondition {
return true, nil
}
}
return false, nil
}
8 changes: 5 additions & 3 deletions processor/filterprocessor/internal/common/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@ import "fmt"

const functionWithCondition = "drop() where %v"

func PrepareConditionForParsing(condition string) []string {
return []string{
fmt.Sprintf(functionWithCondition, condition),
func PrepareConditionForParsing(conditions []string) []string {
validStatements := make([]string, len(conditions))
for i, condition := range conditions {
validStatements[i] = fmt.Sprintf(functionWithCondition, condition)
}
return validStatements
}
18 changes: 9 additions & 9 deletions processor/filterprocessor/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,21 +35,21 @@ type filterLogProcessor struct {
excludeMatcher filterlog.Matcher
includeMatcher filterlog.Matcher
logger *zap.Logger
logCondition *ottl.Statement[ottllog.TransformContext]
logConditions []*ottl.Statement[ottllog.TransformContext]
}

func newFilterLogsProcessor(logger *zap.Logger, cfg *Config) (*filterLogProcessor, error) {
if cfg.Logs.Log != "" {
if cfg.Logs.LogConditions != nil {
logp := ottllog.NewParser(common.Functions[ottllog.TransformContext](), component.TelemetrySettings{Logger: zap.NewNop()})
statements, err := logp.ParseStatements(common.PrepareConditionForParsing(cfg.Logs.Log))
statements, err := logp.ParseStatements(common.PrepareConditionForParsing(cfg.Logs.LogConditions))
if err != nil {
return nil, err
}

return &filterLogProcessor{
cfg: cfg,
logger: logger,
logCondition: statements[0],
cfg: cfg,
logger: logger,
logConditions: statements,
}, nil
}

Expand Down Expand Up @@ -81,19 +81,19 @@ func newFilterLogsProcessor(logger *zap.Logger, cfg *Config) (*filterLogProcesso
}

func (flp *filterLogProcessor) processLogs(ctx context.Context, logs plog.Logs) (plog.Logs, error) {
filteringLogs := flp.logCondition != nil
filteringLogs := flp.logConditions != nil

if filteringLogs {
var errors error
logs.ResourceLogs().RemoveIf(func(rlogs plog.ResourceLogs) bool {
rlogs.ScopeLogs().RemoveIf(func(slogs plog.ScopeLogs) bool {
slogs.LogRecords().RemoveIf(func(log plog.LogRecord) bool {
tCtx := ottllog.NewTransformContext(log, slogs.Scope(), rlogs.Resource())
_, conditionMet, err := flp.logCondition.Execute(ctx, tCtx)
metCondition, err := common.CheckConditions(ctx, tCtx, flp.logConditions)
if err != nil {
errors = multierr.Append(errors, err)
}
return conditionMet
return metCondition
})
return slogs.LogRecords().Len() == 0
})
Expand Down
24 changes: 18 additions & 6 deletions processor/filterprocessor/logs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -713,13 +713,15 @@ var (
func TestFilterLogProcessorWithOTTL(t *testing.T) {
tests := []struct {
name string
conditions string
conditions []string
filterEverything bool
want func(ld plog.Logs)
}{
{
name: "drop logs",
conditions: `body == "operationA"`,
name: "drop logs",
conditions: []string{
`body == "operationA"`,
},
want: func(ld plog.Logs) {
ld.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().RemoveIf(func(log plog.LogRecord) bool {
return log.Body().AsString() == "operationA"
Expand All @@ -730,14 +732,24 @@ func TestFilterLogProcessorWithOTTL(t *testing.T) {
},
},
{
name: "drop everything by dropping all logs",
conditions: `IsMatch(body, "operation.*") == true`,
name: "drop everything by dropping all logs",
conditions: []string{
`IsMatch(body, "operation.*") == true`,
},
filterEverything: true,
},
{
name: "multiple conditions",
conditions: []string{
`IsMatch(body, "wrong name") == true`,
`IsMatch(body, "operation.*") == true`,
},
filterEverything: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
processor, err := newFilterLogsProcessor(zap.NewNop(), &Config{Logs: LogFilters{Log: tt.conditions}})
processor, err := newFilterLogsProcessor(zap.NewNop(), &Config{Logs: LogFilters{LogConditions: tt.conditions}})
assert.NoError(t, err)

got, err := processor.processLogs(context.Background(), constructLogs())
Expand Down
Loading

0 comments on commit 241591b

Please sign in to comment.