Skip to content

Commit

Permalink
sdk/log: Add FilterProcessor and EnabledParameters (#6317)
Browse files Browse the repository at this point in the history
Per
#6271 (comment)

> We agreed that we can move `FilterProcessor` directly to `sdk/log` as
Logs SDK does not look to be stabilized soon.

- Add the possibility to filter based on the resource and scope which is
available for the SDK. The scope information is the most important as it
gives the possibility to e.g. filter out logs emitted for a given
logger. Thus e.g.
open-telemetry/opentelemetry-specification#4364
is not necessary. See
open-telemetry/opentelemetry-specification#4290 (comment)
for more context.
- It is going be an example for
open-telemetry/opentelemetry-specification#4363

There is a little overhead (IMO totally acceptable) because of data
transformation. Most importantly, there is no new heap allocation.

```
goos: linux
goarch: amd64
pkg: go.opentelemetry.io/otel/sdk/log
cpu: 13th Gen Intel(R) Core(TM) i7-13800H
                 │   old.txt   │                 new.txt                  │
                 │   sec/op    │     sec/op      vs base                  │
LoggerEnabled-20   4.589n ± 1%   319.750n ± 16%  +6867.75% (p=0.000 n=10)

                 │   old.txt    │             new.txt             │
                 │     B/op     │     B/op       vs base          │
LoggerEnabled-20   0.000Ki ± 0%   1.093Ki ± 13%  ? (p=0.000 n=10)

                 │  old.txt   │            new.txt             │
                 │ allocs/op  │ allocs/op   vs base            │
LoggerEnabled-20   0.000 ± 0%   0.000 ± 0%  ~ (p=1.000 n=10) ¹
¹ all samples are equal
```

`Logger.Enabled` is still more efficient than `Logger.Emit` (benchmarks
from #6315).

```
goos: linux
goarch: amd64
pkg: go.opentelemetry.io/otel/sdk/log
cpu: 13th Gen Intel(R) Core(TM) i7-13800H
BenchmarkLoggerEmit/5_attributes-20               559934              2391 ns/op           39088 B/op          1 allocs/op
BenchmarkLoggerEmit/10_attributes-20             1000000              5910 ns/op           49483 B/op          5 allocs/op
BenchmarkLoggerEnabled-20                        1605697               968.7 ns/op          1272 B/op          0 allocs/op
PASS
ok      go.opentelemetry.io/otel/sdk/log        10.789s
```

Prior art:
- #6271
- #6286

I also created for tracking purposes:
- #6328
  • Loading branch information
pellared authored Feb 18, 2025
1 parent b80639c commit 1ee7c79
Show file tree
Hide file tree
Showing 12 changed files with 145 additions and 127 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ The next release will require at least [Go 1.23].
- Document the pitfalls of using `Resource` as a comparable type.
`Resource.Equal` and `Resource.Equivalent` should be used instead. (#6272)
- Support [Go 1.24]. (#6304)
- Add `FilterProcessor` and `EnabledParameters` in `go.opentelemetry.io/otel/sdk/log`.
It replaces `go.opentelemetry.io/otel/sdk/log/internal/x.FilterProcessor`.
Compared to previous version it additionally gives the possibility to filter by resource and instrumentation scope. (#6317)

### Changed

Expand Down
12 changes: 8 additions & 4 deletions log/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,13 @@ type Logger interface {
// Enabled returns whether the Logger emits for the given context and
// param.
//
// The passed param is likely to be a partial record with only the
// bridge-relevant information being provided (e.g a param with only the
// Severity set). If a Logger needs more information than is provided, it
// This is useful for users that want to know if a [Record]
// will be processed or dropped before they perform complex operations to
// construct the [Record].
//
// The passed param is likely to be a partial record information being
// provided (e.g a param with only the Severity set).
// If a Logger needs more information than is provided, it
// is said to be in an indeterminate state (see below).
//
// The returned value will be true when the Logger will emit for the
Expand All @@ -46,7 +50,7 @@ type Logger interface {
// exist (e.g. performance, correctness).
//
// The param should not be held by the implementation. A copy should be
// made if the record needs to be held after the call returns.
// made if the param needs to be held after the call returns.
//
// Implementations of this method need to be safe for a user to call
// concurrently.
Expand Down
3 changes: 0 additions & 3 deletions sdk/log/doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,5 @@ at a single endpoint their origin is decipherable.
See [go.opentelemetry.io/otel/log] for more information about
the OpenTelemetry Logs Bridge API.
See [go.opentelemetry.io/otel/sdk/log/internal/x] for information about the
experimental features.
*/
package log // import "go.opentelemetry.io/otel/sdk/log"
18 changes: 8 additions & 10 deletions sdk/log/example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func Example() {
}

// Use a processor that filters out records based on the provided context.
func ExampleProcessor_filtering() {
func ExampleFilterProcessor() {
// Existing processor that emits telemetry.
var processor log.Processor = log.NewBatchProcessor(nil)

Expand Down Expand Up @@ -84,14 +84,12 @@ type ContextFilterProcessor struct {
log.Processor

lazyFilter sync.Once
// Use the experimental FilterProcessor interface
// (go.opentelemetry.io/otel/sdk/log/internal/x).
filter filter
// Support the FilterProcessor interface for the embedded processor.
filter log.FilterProcessor
}

type filter interface {
Enabled(ctx context.Context, param logapi.EnabledParameters) bool
}
// Compile time check.
var _ log.FilterProcessor = (*ContextFilterProcessor)(nil)

func (p *ContextFilterProcessor) OnEmit(ctx context.Context, record *log.Record) error {
if ignoreLogs(ctx) {
Expand All @@ -100,9 +98,9 @@ func (p *ContextFilterProcessor) OnEmit(ctx context.Context, record *log.Record)
return p.Processor.OnEmit(ctx, record)
}

func (p *ContextFilterProcessor) Enabled(ctx context.Context, param logapi.EnabledParameters) bool {
func (p *ContextFilterProcessor) Enabled(ctx context.Context, param log.EnabledParameters) bool {
p.lazyFilter.Do(func() {
if f, ok := p.Processor.(filter); ok {
if f, ok := p.Processor.(log.FilterProcessor); ok {
p.filter = f
}
})
Expand All @@ -115,7 +113,7 @@ func ignoreLogs(ctx context.Context) bool {
}

// Use a processor which redacts sensitive data from some attributes.
func ExampleProcessor_redact() {
func ExampleProcessor() {
// Existing processor that emits telemetry.
var processor log.Processor = log.NewBatchProcessor(nil)

Expand Down
62 changes: 62 additions & 0 deletions sdk/log/filter_processor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package log // import "go.opentelemetry.io/otel/sdk/log"

import (
"context"

"go.opentelemetry.io/otel/log"
"go.opentelemetry.io/otel/sdk/instrumentation"
"go.opentelemetry.io/otel/sdk/resource"
)

// FilterProcessor is a [Processor] that knows, and can identify, what [Record]
// it will process or drop when it is passed to [Processor.OnEmit].
//
// This is useful for users that want to know if a [log.Record]
// will be processed or dropped before they perform complex operations to
// construct the [log.Record].
//
// The SDK's Logger.Enabled returns false
// if all the registered Processors implement FilterProcessor
// and they all return false.
//
// Processor implementations that choose to support this by satisfying this
// interface are expected to re-evaluate the [Record] passed to [Processor.OnEmit],
// it is not expected that the caller to OnEmit will use the functionality
// from this interface prior to calling OnEmit.
//
// See the [go.opentelemetry.io/contrib/processors/minsev] for an example use-case.
// It provides a Processor used to filter out [Record]
// that has a [log.Severity] below a threshold.
type FilterProcessor interface {
// Enabled returns whether the Processor will process for the given context
// and param.
//
// The passed param is likely to be a partial record information being
// provided (e.g a param with only the Severity set).
// If a Processor needs more information than is provided, it
// is said to be in an indeterminate state (see below).
//
// The returned value will be true when the Processor will process for the
// provided context and param, and will be false if the Logger will not
// emit. The returned value may be true or false in an indeterminate state.
// An implementation should default to returning true for an indeterminate
// state, but may return false if valid reasons in particular circumstances
// exist (e.g. performance, correctness).
//
// The param should not be held by the implementation. A copy should be
// made if the param needs to be held after the call returns.
//
// Implementations of this method need to be safe for a user to call
// concurrently.
Enabled(ctx context.Context, param EnabledParameters) bool
}

// EnabledParameters represents payload for [FilterProcessor]'s Enabled method.
type EnabledParameters struct {
Resource resource.Resource
InstrumentationScope instrumentation.Scope
Severity log.Severity
}
35 changes: 0 additions & 35 deletions sdk/log/internal/x/README.md

This file was deleted.

47 changes: 0 additions & 47 deletions sdk/log/internal/x/x.go

This file was deleted.

17 changes: 12 additions & 5 deletions sdk/log/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"go.opentelemetry.io/otel/log"
"go.opentelemetry.io/otel/log/embedded"
"go.opentelemetry.io/otel/sdk/instrumentation"
"go.opentelemetry.io/otel/sdk/log/internal/x"
"go.opentelemetry.io/otel/trace"
)

Expand Down Expand Up @@ -50,14 +49,22 @@ func (l *logger) Emit(ctx context.Context, r log.Record) {
// processed, true will be returned by default. A value of false will only be
// returned if it can be positively verified that no Processor will process.
func (l *logger) Enabled(ctx context.Context, param log.EnabledParameters) bool {
// If there are more Processors than FilterProcessors we cannot be sure
// that all Processors will drop the record. Therefore, return true.
p := EnabledParameters{
Resource: *l.provider.resource,
InstrumentationScope: l.instrumentationScope,
Severity: param.Severity,
}

// If there are more Processors than FilterProcessors,
// which means not all Processors are FilterProcessors,
// we cannot be sure that all Processors will drop the record.
// Therefore, return true.
//
// If all Processors are FilterProcessors, check if any is enabled.
return len(l.provider.processors) > len(l.provider.fltrProcessors) || anyEnabled(ctx, param, l.provider.fltrProcessors)
return len(l.provider.processors) > len(l.provider.fltrProcessors) || anyEnabled(ctx, p, l.provider.fltrProcessors)
}

func anyEnabled(ctx context.Context, param log.EnabledParameters, fltrs []x.FilterProcessor) bool {
func anyEnabled(ctx context.Context, param EnabledParameters, fltrs []FilterProcessor) bool {
for _, f := range fltrs {
if f.Enabled(ctx, param) {
// At least one Processor will process the Record.
Expand Down
50 changes: 39 additions & 11 deletions sdk/log/logger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,11 +224,17 @@ func TestLoggerEnabled(t *testing.T) {
p1 := newFltrProcessor("1", true)
p2WithDisabled := newFltrProcessor("2", false)

emptyResource := resource.Empty()
res := resource.NewSchemaless(attribute.String("key", "value"))

testCases := []struct {
name string
logger *logger
ctx context.Context
expected bool
name string
logger *logger
ctx context.Context
expected bool
expectedP0Params []EnabledParameters
expectedP1Params []EnabledParameters
expectedP2Params []EnabledParameters
}{
{
name: "NoProcessors",
Expand All @@ -241,41 +247,63 @@ func TestLoggerEnabled(t *testing.T) {
logger: newLogger(NewLoggerProvider(
WithProcessor(p0),
WithProcessor(p1),
), instrumentation.Scope{}),
WithResource(res),
), instrumentation.Scope{Name: "scope"}),
ctx: context.Background(),
expected: true,
expectedP0Params: []EnabledParameters{{
Resource: *res,
InstrumentationScope: instrumentation.Scope{Name: "scope"},
}},
expectedP1Params: nil,
},
{
name: "WithDisabledProcessors",
logger: newLogger(NewLoggerProvider(
WithProcessor(p2WithDisabled),
WithResource(emptyResource),
), instrumentation.Scope{}),
ctx: context.Background(),
expected: false,
ctx: context.Background(),
expected: false,
expectedP2Params: []EnabledParameters{{}},
},
{
name: "ContainsDisabledProcessor",
logger: newLogger(NewLoggerProvider(
WithProcessor(p2WithDisabled),
WithProcessor(p0),
WithResource(emptyResource),
), instrumentation.Scope{}),
ctx: context.Background(),
expected: true,
ctx: context.Background(),
expected: true,
expectedP2Params: []EnabledParameters{{}},
expectedP0Params: []EnabledParameters{{}},
},
{
name: "WithNilContext",
logger: newLogger(NewLoggerProvider(
WithProcessor(p0),
WithProcessor(p1),
WithResource(emptyResource),
), instrumentation.Scope{}),
ctx: nil,
expected: true,
ctx: nil,
expected: true,
expectedP0Params: []EnabledParameters{{}},
expectedP1Params: nil,
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
// Clean up the records before the test.
p0.params = nil
p1.params = nil
p2WithDisabled.params = nil

assert.Equal(t, tc.expected, tc.logger.Enabled(tc.ctx, log.EnabledParameters{}))
assert.Equal(t, tc.expectedP0Params, p0.params)
assert.Equal(t, tc.expectedP1Params, p1.params)
assert.Equal(t, tc.expectedP2Params, p2WithDisabled.params)
})
}
}
9 changes: 4 additions & 5 deletions sdk/log/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,7 @@ import (
// or with other methods. It is the responsibility of the Processor to manage
// this concurrency.
//
// See [go.opentelemetry.io/otel/sdk/log/internal/x] for information about how
// a Processor can be extended to support experimental features.
// See [FilterProcessor] for information about how a Processor can support filtering.
type Processor interface {
// OnEmit is called when a Record is emitted.
//
Expand All @@ -30,11 +29,11 @@ type Processor interface {
// Handler.
//
// The SDK invokes the processors sequentially in the same order as
// they were registered using [WithProcessor].
// they were registered using WithProcessor.
// Implementations may synchronously modify the record so that the changes
// are visible in the next registered processor.
// Notice that [Record] is not concurrent safe. Therefore, asynchronous
// processing may cause race conditions. Use [Record.Clone]
// Notice that Record is not concurrent safe. Therefore, asynchronous
// processing may cause race conditions. Use Record.Clone
// to create a copy that shares no state with the original.
OnEmit(ctx context.Context, record *Record) error

Expand Down
Loading

0 comments on commit 1ee7c79

Please sign in to comment.