From b8ed96c549258ab1d887de081a5f27509a978f76 Mon Sep 17 00:00:00 2001 From: Peter Deng Date: Thu, 3 Nov 2022 21:32:12 +0800 Subject: [PATCH] [pkg/stanza] improve performance (#16027) * [pkg/stanza] improve performance --- .chloggen/pkg-stanza-perf.yaml | 16 ++++++++++++++++ pkg/stanza/fileconsumer/reader_test.go | 4 +--- pkg/stanza/fileconsumer/util_test.go | 12 +++++++++--- pkg/stanza/operator/helper/encoding.go | 21 +++++++++++---------- pkg/stanza/operator/input/file/config.go | 4 +++- 5 files changed, 40 insertions(+), 17 deletions(-) create mode 100644 .chloggen/pkg-stanza-perf.yaml diff --git a/.chloggen/pkg-stanza-perf.yaml b/.chloggen/pkg-stanza-perf.yaml new file mode 100644 index 000000000000..33b32789701e --- /dev/null +++ b/.chloggen/pkg-stanza-perf.yaml @@ -0,0 +1,16 @@ +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: pkg/stanza + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: "improve performance by reusing the decoder and decode buffer across Decode calls" + +# One or more tracking issues related to the change +issues: [16028] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. 
+subtext: diff --git a/pkg/stanza/fileconsumer/reader_test.go b/pkg/stanza/fileconsumer/reader_test.go index fb6034658518..f42529c9d843 100644 --- a/pkg/stanza/fileconsumer/reader_test.go +++ b/pkg/stanza/fileconsumer/reader_test.go @@ -105,9 +105,7 @@ func testReaderFactory(t *testing.T) (*readerFactory, chan *emitParams) { readerConfig: &readerConfig{ fingerprintSize: DefaultFingerprintSize, maxLogSize: defaultMaxLogSize, - emit: func(_ context.Context, attrs *FileAttributes, token []byte) { - emitChan <- &emitParams{attrs, token} - }, + emit: testEmitFunc(emitChan), }, fromBeginning: true, splitterFactory: newMultilineSplitterFactory( diff --git a/pkg/stanza/fileconsumer/util_test.go b/pkg/stanza/fileconsumer/util_test.go index 9a4255672323..d10a51087f82 100644 --- a/pkg/stanza/fileconsumer/util_test.go +++ b/pkg/stanza/fileconsumer/util_test.go @@ -33,6 +33,14 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/testutil" ) +func testEmitFunc(emitChan chan *emitParams) EmitFunc { + return func(_ context.Context, attrs *FileAttributes, token []byte) { + copied := make([]byte, len(token)) + copy(copied, token) + emitChan <- &emitParams{attrs, copied} + } +} + // includeDir is a builder-like helper for quickly setting up a test config func (c *Config) includeDir(dir string) *Config { c.Include = append(c.Include, fmt.Sprintf("%s/*", dir)) @@ -56,9 +64,7 @@ func buildTestManager(t *testing.T, cfg *Config) (*Manager, chan *emitParams) { } func buildTestManagerWithEmit(t *testing.T, cfg *Config, emitChan chan *emitParams) *Manager { - input, err := cfg.Build(testutil.Logger(t), func(_ context.Context, attrs *FileAttributes, token []byte) { - emitChan <- &emitParams{attrs, token} - }) + input, err := cfg.Build(testutil.Logger(t), testEmitFunc(emitChan)) require.NoError(t, err) return input } diff --git a/pkg/stanza/operator/helper/encoding.go b/pkg/stanza/operator/helper/encoding.go index fe51c09fa161..4bac27c9454d 100644 --- 
a/pkg/stanza/operator/helper/encoding.go +++ b/pkg/stanza/operator/helper/encoding.go @@ -45,27 +45,28 @@ func (c EncodingConfig) Build() (Encoding, error) { } return Encoding{ - Encoding: enc, + Encoding: enc, + decodeBuffer: make([]byte, 1<<12), + decoder: enc.NewDecoder(), }, nil } type Encoding struct { - Encoding encoding.Encoding + Encoding encoding.Encoding + decoder *encoding.Decoder + decodeBuffer []byte } -// decode converts the bytes in msgBuf to utf-8 from the configured encoding +// Decode converts the bytes in msgBuf to utf-8 from the configured encoding. The returned slice is backed by an internal buffer that the next call to Decode overwrites, so callers must copy it if they need to retain it. func (e *Encoding) Decode(msgBuf []byte) ([]byte, error) { - decodeBuffer := make([]byte, 1<<12) - decoder := e.Encoding.NewDecoder() - for { - decoder.Reset() - nDst, _, err := decoder.Transform(decodeBuffer, msgBuf, true) + e.decoder.Reset() + nDst, _, err := e.decoder.Transform(e.decodeBuffer, msgBuf, true) if err == nil { - return decodeBuffer[:nDst], nil + return e.decodeBuffer[:nDst], nil } if errors.Is(err, transform.ErrShortDst) { - decodeBuffer = make([]byte, len(decodeBuffer)*2) + e.decodeBuffer = make([]byte, len(e.decodeBuffer)*2) continue } return nil, fmt.Errorf("transform encoding: %w", err) diff --git a/pkg/stanza/operator/input/file/config.go b/pkg/stanza/operator/input/file/config.go index 3cbba06f7b5b..0ed45eeda4cc 100644 --- a/pkg/stanza/operator/input/file/config.go +++ b/pkg/stanza/operator/input/file/config.go @@ -73,7 +73,9 @@ func (c Config) Build(logger *zap.SugaredLogger) (operator.Operator, error) { } if helper.IsNop(c.Config.Splitter.EncodingConfig.Encoding) { toBody = func(token []byte) interface{} { - return token + copied := make([]byte, len(token)) + copy(copied, token) + return copied } }