Skip to content

Commit

Permalink
Move log.Processor.Enabled to independent FilterProcessor interfa…
Browse files Browse the repository at this point in the history
…ced type (#5692)

Closes #5425 

Our current log `Processor` interface contains more functionality than
the [OTel
spec](https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/logs/sdk.md#logrecordprocessor-operations).
The additional functionality allows processors to report back to the API
if a Record should be constructed and emitted or not, which is quite
helpful[^1][^2][^3][^4][^5].

This removes the `Enabled` method from the `Processor` type. It adds
this functionality a new optional and experimental `FilterProcessor`
interface type. The logger and provider are updated to check for this
optional interface to be implemented with the configured processors and
uses them to back the `Logger.Enabled` method, preserving existing
functionality.

By making this change:

- The `Processor` interface is now compliant with the OTel spec and does
not contain any additional unspecified behavior.
- All `Processor` implementations are no longer required to implement an
`Enabled` method. The default, when they do not implement this method,
is to assume they are enabled.

### Benchmark

```terminal
goos: linux
goarch: amd64
pkg: go.opentelemetry.io/otel/sdk/log
cpu: Intel(R) Core(TM) i7-8550U CPU @ 1.80GHz
                │   old.txt    │              new7.txt               │
                │    sec/op    │   sec/op     vs base                │
LoggerEnabled-8   133.30n ± 3%   32.36n ± 3%  -75.72% (p=0.000 n=10)

                │  old.txt   │            new7.txt            │
                │    B/op    │    B/op     vs base            │
LoggerEnabled-8   0.000 ± 0%   0.000 ± 0%  ~ (p=1.000 n=10) ¹
¹ all samples are equal

                │  old.txt   │            new7.txt            │
                │ allocs/op  │ allocs/op   vs base            │
LoggerEnabled-8   0.000 ± 0%   0.000 ± 0%  ~ (p=1.000 n=10) ¹
¹ all samples are equal
```

This is a significant performance improvement due to the `Record` no
longer being converted from the API version to the SDK version.

[^1]: https://pkg.go.dev/go.opentelemetry.io/contrib/processors/minsev
[^2]:
https://pkg.go.dev/go.opentelemetry.io/otel/sdk/log#BatchProcessor.Enabled
[^3]:
https://pkg.go.dev/go.opentelemetry.io/otel/sdk/log#SimpleProcessor.Enabled
[^4]:
https://github.com/open-telemetry/opentelemetry-go-contrib/blob/af75717ac4fb3ba13eaea83b88301723122060cf/bridges/otelslog/handler.go#L206-L211
[^5]:
https://github.com/open-telemetry/opentelemetry-go-contrib/blob/d0309ddd8c5714af1cd9dbe8b39b7e8f10485679/bridges/otelzap/core.go#L142-L146

---------

Co-authored-by: Robert Pająk <pellared@hotmail.com>
Co-authored-by: Sam Xie <sam@samxie.me>
  • Loading branch information
3 people authored Aug 22, 2024
1 parent fe6c67e commit 002c0a4
Show file tree
Hide file tree
Showing 14 changed files with 197 additions and 62 deletions.
11 changes: 11 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ The next release will require at least [Go 1.22].
- Zero value of `SimpleProcessor` in `go.opentelemetry.io/otel/sdk/log` no longer panics. (#5665)
- Add `Walk` function to `TraceState` in `go.opentelemetry.io/otel/trace` to iterate all the key-value pairs. (#5651)
- Bridge the trace state in `go.opentelemetry.io/otel/bridge/opencensus`. (#5651)
- The `FilterProcessor` interface type is added in `go.opentelemetry.io/otel/sdk/log/internal/x`.
This is an optional and experimental interface that log `Processor`s can implement to instruct the `Logger` if a `Record` will be processed or not.
It replaces the existing `Enabled` method that is removed from the `Processor` interface itself.
It does not fall within the scope of the OpenTelemetry Go versioning and stability [policy](./VERSIONING.md) and it may be changed in backwards incompatible ways or removed in feature releases. (#5692)
- Support [Go 1.23]. (#5720)

### Changed
Expand All @@ -30,6 +34,8 @@ The next release will require at least [Go 1.22].
- `SimpleProcessor.Enabled` in `go.opentelemetry.io/otel/sdk/log` now returns `false` if the exporter is `nil`. (#5665)
- Update the concurrency requirements of `Exporter` in `go.opentelemetry.io/otel/sdk/log`. (#5666)
- `SimpleProcessor` in `go.opentelemetry.io/otel/sdk/log` synchronizes `OnEmit` calls. (#5666)
- The `Processor` interface in `go.opentelemetry.io/otel/sdk/log` no longer includes the `Enabled` method.
See the `FilterProcessor` interface type added in `go.opentelemetry.io/otel/sdk/log/internal/x` to continue providing this functionality. (#5692)
- The `SimpleProcessor` type in `go.opentelemetry.io/otel/sdk/log` is no longer comparable. (#5693)
- The `BatchProcessor` type in `go.opentelemetry.io/otel/sdk/log` is no longer comparable. (#5693)
- `NewMemberRaw`, `NewKeyProperty` and `NewKeyValuePropertyRaw` in `go.opentelemetry.io/otel/baggage` allow UTF-8 string in key. (#5132)
Expand All @@ -50,6 +56,11 @@ The next release will require at least [Go 1.22].
- Correct comments for the priority of the `WithEndpoint` and `WithEndpointURL` options and their corresponding environment variables in `go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc` and `go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp`. (#5641)
- Correct comments for the priority of the `WithEndpoint` and `WithEndpointURL` options and their corresponding environment variables in `go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploghttp`. (#5650)

### Removed

- The `Enabled` method of the `SimpleProcessor` in `go.opentelemetry.io/otel/sdk/log` is removed. (#5692)
- The `Enabled` method of the `BatchProcessor` in `go.opentelemetry.io/otel/sdk/log` is removed. (#5692)

<!-- Released section -->
<!-- Don't change this section unless doing release -->

Expand Down
5 changes: 0 additions & 5 deletions sdk/log/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,11 +196,6 @@ func (b *BatchProcessor) OnEmit(_ context.Context, r *Record) error {
return nil
}

// Enabled returns if b is enabled.
func (b *BatchProcessor) Enabled(context.Context, Record) bool {
return !b.stopped.Load() && b.q != nil
}

// Shutdown flushes queued log records and shuts down the decorated exporter.
func (b *BatchProcessor) Shutdown(ctx context.Context) error {
if b.stopped.Swap(true) || b.q == nil {
Expand Down
9 changes: 0 additions & 9 deletions sdk/log/batch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ func TestEmptyBatchConfig(t *testing.T) {
ctx := context.Background()
record := new(Record)
assert.NoError(t, bp.OnEmit(ctx, record), "OnEmit")
assert.False(t, bp.Enabled(ctx, *record), "Enabled")
assert.NoError(t, bp.ForceFlush(ctx), "ForceFlush")
assert.NoError(t, bp.Shutdown(ctx), "Shutdown")
})
Expand Down Expand Up @@ -270,14 +269,6 @@ func TestBatchProcessor(t *testing.T) {
assert.Equal(t, 3, e.ExportN())
})

t.Run("Enabled", func(t *testing.T) {
b := NewBatchProcessor(defaultNoopExporter)
assert.True(t, b.Enabled(ctx, Record{}))

_ = b.Shutdown(ctx)
assert.False(t, b.Enabled(ctx, Record{}))
})

t.Run("Shutdown", func(t *testing.T) {
t.Run("Error", func(t *testing.T) {
e := newTestExporter(assert.AnError)
Expand Down
3 changes: 3 additions & 0 deletions sdk/log/doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,5 +32,8 @@ at a single endpoint their origin is decipherable.
See [go.opentelemetry.io/otel/log] for more information about
the OpenTelemetry Logs Bridge API.
See [go.opentelemetry.io/otel/sdk/log/internal/x] for information about the
experimental features.
*/
package log // import "go.opentelemetry.io/otel/sdk/log"
19 changes: 17 additions & 2 deletions sdk/log/example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"context"
"fmt"
"strings"
"sync"

logapi "go.opentelemetry.io/otel/log"
"go.opentelemetry.io/otel/log/global"
Expand Down Expand Up @@ -58,7 +59,7 @@ func ExampleProcessor_filtering() {
// Wrap the processor so that it ignores processing log records
// when a context deriving from WithIgnoreLogs is passed
// to the logging methods.
processor = &ContextFilterProcessor{processor}
processor = &ContextFilterProcessor{Processor: processor}

// The created processor can then be registered with
// the OpenTelemetry Logs SDK using the WithProcessor option.
Expand All @@ -81,6 +82,15 @@ func WithIgnoreLogs(ctx context.Context) context.Context {
// [WithIgnoreLogs] is passed to its methods.
type ContextFilterProcessor struct {
log.Processor

lazyFilter sync.Once
// Use the experimental FilterProcessor interface
// (go.opentelemetry.io/otel/sdk/log/internal/x).
filter filter
}

type filter interface {
Enabled(ctx context.Context, record log.Record) bool
}

func (p *ContextFilterProcessor) OnEmit(ctx context.Context, record *log.Record) error {
Expand All @@ -91,7 +101,12 @@ func (p *ContextFilterProcessor) OnEmit(ctx context.Context, record *log.Record)
}

func (p *ContextFilterProcessor) Enabled(ctx context.Context, record log.Record) bool {
return !ignoreLogs(ctx) && p.Processor.Enabled(ctx, record)
p.lazyFilter.Do(func() {
if f, ok := p.Processor.(filter); ok {
p.filter = f
}
})
return !ignoreLogs(ctx) && (p.filter == nil || p.filter.Enabled(ctx, record))
}

func ignoreLogs(ctx context.Context) bool {
Expand Down
35 changes: 35 additions & 0 deletions sdk/log/internal/x/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
# Experimental Features

The Logs SDK contains features that have not yet stabilized.
These features are added to the OpenTelemetry Go Logs SDK prior to stabilization so that users can start experimenting with them and provide feedback.

These feature may change in backwards incompatible ways as feedback is applied.
See the [Compatibility and Stability](#compatibility-and-stability) section for more information.

## Features

- [Filter Processors](#filter-processor)

### Filter Processor

Users of logging libraries often want to know if a log `Record` will be processed or dropped before they perform complex operations to construct the `Record`.
The [`Logger`] in the Logs Bridge API provides the `Enabled` method for just this use-case.
In order for the Logs Bridge SDK to effectively implement this API, it needs to be known if the registered [`Processor`]s are enabled for the `Record` within a context.
A [`Processor`] that knows, and can identify, what `Record` it will process or drop when it is passed to `OnEmit` can communicate this to the SDK `Logger` by implementing the `FilterProcessor`.

By default, the SDK `Logger.Enabled` will return true when called.
Only if all the registered [`Processor`]s implement `FilterProcessor` and they all return `false` will `Logger.Enabled` return `false`.

See the [`minsev`] [`Processor`] for an example use-case.
It is used to filter `Record`s out that a have a `Severity` below a threshold.

[`Logger`]: https://pkg.go.dev/go.opentelemetry.io/otel/log#Logger
[`Processor`]: https://pkg.go.dev/go.opentelemetry.io/otel/sdk/log#Processor
[`minsev`]: https://pkg.go.dev/go.opentelemetry.io/contrib/processors/minsev

## Compatibility and Stability

Experimental features do not fall within the scope of the OpenTelemetry Go versioning and stability [policy](../../../../VERSIONING.md).
These features may be removed or modified in successive version releases, including patch versions.

When an experimental feature is promoted to a stable feature, a migration path will be included in the changelog entry of the release.
46 changes: 46 additions & 0 deletions sdk/log/internal/x/x.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

// Package x contains support for Logs SDK experimental features.
package x // import "go.opentelemetry.io/otel/sdk/log/internal/x"

import (
"context"

"go.opentelemetry.io/otel/log"
)

// FilterProcessor is a [Processor] that knows, and can identify, what
// [log.Record] it will process or drop when it is passed to OnEmit.
//
// This is useful for users of logging libraries that want to know if a [log.Record]
// will be processed or dropped before they perform complex operations to
// construct the [log.Record].
//
// [Processor] implementations that choose to support this by satisfying this
// interface are expected to re-evaluate the [log.Record]s passed to OnEmit, it is
// not expected that the caller to OnEmit will use the functionality from this
// interface prior to calling OnEmit.
//
// This should only be implemented for [Processor]s that can make reliable
// enough determination of this prior to processing a [log.Record] and where
// the result is dynamic.
//
// [Processor]: https://pkg.go.dev/go.opentelemetry.io/otel/sdk/log#Processor
type FilterProcessor interface {
// Enabled returns whether the Processor will process for the given context
// and record.
//
// The passed record is likely to be a partial record with only the
// bridge-relevant information being provided (e.g a record with only the
// Severity set). If a Logger needs more information than is provided, it
// is said to be in an indeterminate state (see below).
//
// The returned value will be true when the Processor will process for the
// provided context and record, and will be false if the Processor will not
// process. An implementation should default to returning true for an
// indeterminate state.
//
// Implementations should not modify the record.
Enabled(ctx context.Context, record log.Record) bool
}
26 changes: 22 additions & 4 deletions sdk/log/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"go.opentelemetry.io/otel/log"
"go.opentelemetry.io/otel/log/embedded"
"go.opentelemetry.io/otel/sdk/instrumentation"
"go.opentelemetry.io/otel/sdk/log/internal/x"
"go.opentelemetry.io/otel/trace"
)

Expand Down Expand Up @@ -42,13 +43,30 @@ func (l *logger) Emit(ctx context.Context, r log.Record) {
}
}

func (l *logger) Enabled(ctx context.Context, r log.Record) bool {
newRecord := l.newRecord(ctx, r)
for _, p := range l.provider.processors {
if enabled := p.Enabled(ctx, newRecord); enabled {
// Enabled returns true if at least one Processor held by the LoggerProvider
// that created the logger will process the record for the provided context.
//
// If it is not possible to definitively determine the record will be
// processed, true will be returned by default. A value of false will only be
// returned if it can be positively verified that no Processor will process the
// record.
func (l *logger) Enabled(ctx context.Context, record log.Record) bool {
fltrs := l.provider.filterProcessors()
// If there are more Processors than FilterProcessors we cannot be sure
// that all Processors will drop the record. Therefore, return true.
//
// If all Processors are FilterProcessors, check if any is enabled.
return len(l.provider.processors) > len(fltrs) || anyEnabled(ctx, record, fltrs)
}

func anyEnabled(ctx context.Context, r log.Record, fltrs []x.FilterProcessor) bool {
for _, f := range fltrs {
if f.Enabled(ctx, r) {
// At least one Processor will process the Record.
return true
}
}
// No Processor will process the record
return false
}

Expand Down
26 changes: 24 additions & 2 deletions sdk/log/logger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,8 +215,9 @@ func TestLoggerEmit(t *testing.T) {
}

func TestLoggerEnabled(t *testing.T) {
p0, p1, p2WithDisabled := newProcessor("0"), newProcessor("1"), newProcessor("2")
p2WithDisabled.enabled = false
p0 := newFltrProcessor("0", true)
p1 := newFltrProcessor("1", true)
p2WithDisabled := newFltrProcessor("2", false)

testCases := []struct {
name string
Expand Down Expand Up @@ -273,3 +274,24 @@ func TestLoggerEnabled(t *testing.T) {
})
}
}

func BenchmarkLoggerEnabled(b *testing.B) {
provider := NewLoggerProvider(
WithProcessor(newFltrProcessor("0", false)),
WithProcessor(newFltrProcessor("1", true)),
)
logger := provider.Logger("BenchmarkLoggerEnabled")
ctx, r := context.Background(), log.Record{}
r.SetSeverityText("test")

var enabled bool

b.ReportAllocs()
b.ResetTimer()

for n := 0; n < b.N; n++ {
enabled = logger.Enabled(ctx, r)
}

_ = enabled
}
24 changes: 3 additions & 21 deletions sdk/log/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ import (
// Any of the Processor's methods may be called concurrently with itself
// or with other methods. It is the responsibility of the Processor to manage
// this concurrency.
//
// See [go.opentelemetry.io/otel/sdk/log/internal/x] for information about how
// a Processor can be extended to support experimental features.
type Processor interface {
// OnEmit is called when a Record is emitted.
//
Expand All @@ -35,27 +38,6 @@ type Processor interface {
// to create a copy that shares no state with the original.
OnEmit(ctx context.Context, record *Record) error

// Enabled returns whether the Processor will process for the given context
// and record.
//
// The passed record is likely to be a partial record with only the
// bridge-relevant information being provided (e.g a record with only the
// Severity set). If a Logger needs more information than is provided, it
// is said to be in an indeterminate state (see below).
//
// The returned value will be true when the Processor will process for the
// provided context and record, and will be false if the Processor will not
// process. The returned value may be true or false in an indeterminate
// state. An implementation should default to returning true for an
// indeterminate state, but may return false if valid reasons in particular
// circumstances exist (e.g. performance, correctness).
//
// The SDK invokes the processors sequentially in the same order as
// they were registered using [WithProcessor] until any processor returns true.
//
// Implementations should not modify the record.
Enabled(ctx context.Context, record Record) bool

// Shutdown is called when the SDK shuts down. Any cleanup or release of
// resources held by the exporter should be done in this call.
//
Expand Down
15 changes: 15 additions & 0 deletions sdk/log/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"go.opentelemetry.io/otel/log/embedded"
"go.opentelemetry.io/otel/log/noop"
"go.opentelemetry.io/otel/sdk/instrumentation"
"go.opentelemetry.io/otel/sdk/log/internal/x"
"go.opentelemetry.io/otel/sdk/resource"
)

Expand Down Expand Up @@ -65,6 +66,9 @@ type LoggerProvider struct {
attributeCountLimit int
attributeValueLengthLimit int

fltrProcessorsOnce sync.Once
fltrProcessors []x.FilterProcessor

loggersMu sync.Mutex
loggers map[instrumentation.Scope]*logger

Expand Down Expand Up @@ -92,6 +96,17 @@ func NewLoggerProvider(opts ...LoggerProviderOption) *LoggerProvider {
}
}

func (p *LoggerProvider) filterProcessors() []x.FilterProcessor {
p.fltrProcessorsOnce.Do(func() {
for _, proc := range p.processors {
if f, ok := proc.(x.FilterProcessor); ok {
p.fltrProcessors = append(p.fltrProcessors, f)
}
}
})
return p.fltrProcessors
}

// Logger returns a new [log.Logger] with the provided name and configuration.
//
// If p is shut down, a [noop.Logger] instance is returned.
Expand Down
Loading

0 comments on commit 002c0a4

Please sign in to comment.