Skip to content

Commit

Permalink
[chore][pkg/stanza] Extract checkpointing logic into internal package (
Browse files Browse the repository at this point in the history
…open-telemetry#27775)

This PR moves checkpointing logic into a dedicated package. Since we
only actually save and load `reader.Metadata`, the `fileconsumer`
package is still responsible for pulling `Metadata` out of each `Reader`
prior to saving, and wrapping `Metadata` into a `Reader` when loading.
  • Loading branch information
djaglowski authored Oct 18, 2023
1 parent 6d5205c commit 02c7646
Show file tree
Hide file tree
Showing 4 changed files with 278 additions and 84 deletions.
104 changes: 21 additions & 83 deletions pkg/stanza/fileconsumer/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,15 @@
package fileconsumer // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer"

import (
"bytes"
"context"
"encoding/json"
"fmt"
"os"
"sync"
"time"

"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/checkpoint"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/fingerprint"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/reader"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/matcher"
Expand All @@ -28,9 +27,9 @@ type Manager struct {
readerFactory reader.Factory
fileMatcher *matcher.Matcher
roller roller
persister operator.Persister

pollInterval time.Duration
persister operator.Persister
maxBatches int
maxBatchFiles int

Expand All @@ -43,12 +42,20 @@ type Manager struct {
func (m *Manager) Start(persister operator.Persister) error {
ctx, cancel := context.WithCancel(context.Background())
m.cancel = cancel

m.persister = persister

// Load offsets from disk
if err := m.loadLastPollFiles(ctx); err != nil {
offsets, err := checkpoint.Load(ctx, m.persister)
if err != nil {
return fmt.Errorf("read known files from database: %w", err)
}
if len(offsets) > 0 {
m.Infow("Resuming from previously known offset(s). 'start_at' setting is not applicable.")
m.readerFactory.FromBeginning = true
for _, offset := range offsets {
m.knownFiles = append(m.knownFiles, &reader.Reader{Metadata: offset})
}
}

if _, err := m.fileMatcher.MatchFiles(); err != nil {
m.Warnf("finding files: %v", err)
Expand Down Expand Up @@ -150,7 +157,15 @@ func (m *Manager) consume(ctx context.Context, paths []string) {

m.roller.roll(ctx, readers)
m.saveCurrent(readers)
m.syncLastPollFiles(ctx)

rmds := make([]*reader.Metadata, 0, len(readers))
for _, r := range readers {
rmds = append(rmds, r.Metadata)
}
if err := checkpoint.Save(ctx, m.persister, rmds); err != nil {
m.Errorw("save offsets", zap.Error(err))
}

m.clearCurrentFingerprints()
}

Expand Down Expand Up @@ -263,80 +278,3 @@ func (m *Manager) findFingerprintMatch(fp *fingerprint.Fingerprint) (*reader.Rea
}
return nil, false
}

const knownFilesKey = "knownFiles"

// syncLastPollFiles syncs the most recent set of files to the database
func (m *Manager) syncLastPollFiles(ctx context.Context) {
var buf bytes.Buffer
enc := json.NewEncoder(&buf)

// Encode the number of known files
if err := enc.Encode(len(m.knownFiles)); err != nil {
m.Errorw("Failed to encode known files", zap.Error(err))
return
}

// Encode each known file
for _, fileReader := range m.knownFiles {
if err := enc.Encode(fileReader.Metadata); err != nil {
m.Errorw("Failed to encode known files", zap.Error(err))
}
}

if err := m.persister.Set(ctx, knownFilesKey, buf.Bytes()); err != nil {
m.Errorw("Failed to sync to database", zap.Error(err))
}
}

// syncLastPollFiles loads the most recent set of files to the database
func (m *Manager) loadLastPollFiles(ctx context.Context) error {
encoded, err := m.persister.Get(ctx, knownFilesKey)
if err != nil {
return err
}

if encoded == nil {
return nil
}

dec := json.NewDecoder(bytes.NewReader(encoded))

// Decode the number of entries
var knownFileCount int
if err = dec.Decode(&knownFileCount); err != nil {
return fmt.Errorf("decoding file count: %w", err)
}

if knownFileCount > 0 {
m.Infow("Resuming from previously known offset(s). 'start_at' setting is not applicable.")
m.readerFactory.FromBeginning = true
}

// Decode each of the known files
for i := 0; i < knownFileCount; i++ {
rmd := new(reader.Metadata)
if err = dec.Decode(rmd); err != nil {
return err
}

// Migrate readers that used FileAttributes.HeaderAttributes
// This block can be removed in a future release, tentatively v0.90.0
if ha, ok := rmd.FileAttributes["HeaderAttributes"]; ok {
switch hat := ha.(type) {
case map[string]any:
for k, v := range hat {
rmd.FileAttributes[k] = v
}
delete(rmd.FileAttributes, "HeaderAttributes")
default:
m.Errorw("migrate header attributes: unexpected format")
}
}

// This reader won't be used for anything other than metadata reference, so just wrap the metadata
m.knownFiles = append(m.knownFiles, &reader.Reader{Metadata: rmd})
}

return nil
}
91 changes: 91 additions & 0 deletions pkg/stanza/fileconsumer/internal/checkpoint/checkpoint.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package checkpoint // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/checkpoint"

import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/reader"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator"
)

const knownFilesKey = "knownFiles"

// Save syncs the most recent set of files to the database
func Save(ctx context.Context, persister operator.Persister, rmds []*reader.Metadata) error {
var buf bytes.Buffer
enc := json.NewEncoder(&buf)

// Encode the number of known files
if err := enc.Encode(len(rmds)); err != nil {
return fmt.Errorf("encode num files: %w", err)
}

var errs []error
// Encode each known file
for _, rmd := range rmds {
if err := enc.Encode(rmd); err != nil {
errs = append(errs, fmt.Errorf("encode metadata: %w", err))
}
}

if err := persister.Set(ctx, knownFilesKey, buf.Bytes()); err != nil {
errs = append(errs, fmt.Errorf("persist known files: %w", err))
}

return errors.Join(errs...)
}

// Load loads the most recent set of files to the database
func Load(ctx context.Context, persister operator.Persister) ([]*reader.Metadata, error) {
encoded, err := persister.Get(ctx, knownFilesKey)
if err != nil {
return nil, err
}

if encoded == nil {
return []*reader.Metadata{}, nil
}

dec := json.NewDecoder(bytes.NewReader(encoded))

// Decode the number of entries
var knownFileCount int
if err = dec.Decode(&knownFileCount); err != nil {
return nil, fmt.Errorf("decoding file count: %w", err)
}

// Decode each of the known files
var errs []error
rmds := make([]*reader.Metadata, 0, knownFileCount)
for i := 0; i < knownFileCount; i++ {
rmd := new(reader.Metadata)
if err = dec.Decode(rmd); err != nil {
return nil, err
}

// Migrate readers that used FileAttributes.HeaderAttributes
// This block can be removed in a future release, tentatively v0.90.0
if ha, ok := rmd.FileAttributes["HeaderAttributes"]; ok {
switch hat := ha.(type) {
case map[string]any:
for k, v := range hat {
rmd.FileAttributes[k] = v
}
delete(rmd.FileAttributes, "HeaderAttributes")
default:
errs = append(errs, errors.New("migrate header attributes: unexpected format"))
}
}

// This reader won't be used for anything other than metadata reference, so just wrap the metadata
rmds = append(rmds, rmd)
}

return rmds, errors.Join(errs...)
}
149 changes: 149 additions & 0 deletions pkg/stanza/fileconsumer/internal/checkpoint/checkpoint_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package checkpoint

import (
"bytes"
"context"
"encoding/json"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/fingerprint"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/reader"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/testutil"
)

func TestLoadNothing(t *testing.T) {
reloaded, err := Load(context.Background(), testutil.NewUnscopedMockPersister())
assert.NoError(t, err)
assert.Equal(t, []*reader.Metadata{}, reloaded)
}

func TestSaveErr(t *testing.T) {
assert.Error(t, Save(context.Background(),
testutil.NewErrPersister(map[string]error{
"knownFiles": assert.AnError,
}), []*reader.Metadata{}))
}

func TestLoadErr(t *testing.T) {
_, err := Load(context.Background(),
testutil.NewErrPersister(map[string]error{
"knownFiles": assert.AnError,
}))
assert.Error(t, err)
}

func TestNopEncodingDifferentLogSizes(t *testing.T) {
testCases := []struct {
name string
rmds []*reader.Metadata
}{
{
"empty",
[]*reader.Metadata{},
},
{
"one",
[]*reader.Metadata{
{
Fingerprint: &fingerprint.Fingerprint{FirstBytes: []byte("foo")},
Offset: 3,
},
},
},
{
"two",
[]*reader.Metadata{
{
Fingerprint: &fingerprint.Fingerprint{FirstBytes: []byte("foo")},
Offset: 3,
},
{
Fingerprint: &fingerprint.Fingerprint{FirstBytes: []byte("barrrr")},
Offset: 6,
},
},
},
{
"other_fields",
[]*reader.Metadata{
{
Fingerprint: &fingerprint.Fingerprint{FirstBytes: []byte("foo")},
Offset: 3,
FileAttributes: map[string]interface{}{
"hello": "world",
},
},
{
Fingerprint: &fingerprint.Fingerprint{FirstBytes: []byte("barrrr")},
Offset: 6,
HeaderFinalized: true,
},
{
Fingerprint: &fingerprint.Fingerprint{FirstBytes: []byte("ab")},
Offset: 2,
FileAttributes: map[string]interface{}{
"hello2": "world2",
},
HeaderFinalized: true,
},
},
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
p := testutil.NewUnscopedMockPersister()
assert.NoError(t, Save(context.Background(), p, tc.rmds))
reloaded, err := Load(context.Background(), p)
assert.NoError(t, err)
assert.Equal(t, tc.rmds, reloaded)
})
}
}

type deprecatedMetadata struct {
reader.Metadata
HeaderAttributes map[string]any
}

func TestMigrateHeaderAttributes(t *testing.T) {
p := testutil.NewUnscopedMockPersister()
saveDeprecated(t, p, &deprecatedMetadata{
Metadata: reader.Metadata{
Fingerprint: &fingerprint.Fingerprint{FirstBytes: []byte("foo")},
Offset: 3,
FileAttributes: map[string]any{
"HeaderAttributes": map[string]any{
"hello": "world",
},
},
},
})
reloaded, err := Load(context.Background(), p)
assert.NoError(t, err)
assert.Equal(t, []*reader.Metadata{
{
Fingerprint: &fingerprint.Fingerprint{FirstBytes: []byte("foo")},
Offset: 3,
FileAttributes: map[string]interface{}{
"hello": "world",
},
},
}, reloaded)

}

func saveDeprecated(t *testing.T, persister operator.Persister, dep *deprecatedMetadata) {
var buf bytes.Buffer
enc := json.NewEncoder(&buf)
require.NoError(t, enc.Encode(1))
require.NoError(t, enc.Encode(dep))
require.NoError(t, persister.Set(context.Background(), knownFilesKey, buf.Bytes()))
}
Loading

0 comments on commit 02c7646

Please sign in to comment.