Skip to content

Commit

Permalink
[chore][fileconsumer/archive] - add signature methods for archive fea…
Browse files Browse the repository at this point in the history
…ture (#35098)

Description:

This PR introduces the `archive` method to store files older than 3 poll
cycles.
The core logic for reading will be introduced in a follow up PR.


Link to tracking Issue:
#32727

Testing: To be included in future PRs

Documentation: To be included in future PRs
  • Loading branch information
VihasMakwana authored Oct 9, 2024
1 parent 003780f commit 19ddd21
Show file tree
Hide file tree
Showing 5 changed files with 81 additions and 16 deletions.
11 changes: 2 additions & 9 deletions pkg/stanza/fileconsumer/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/metadata"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/reader"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/scanner"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/tracker"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/matcher"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/helper"
Expand Down Expand Up @@ -87,6 +86,7 @@ type Config struct {
DeleteAfterRead bool `mapstructure:"delete_after_read,omitempty"`
IncludeFileRecordNumber bool `mapstructure:"include_file_record_number,omitempty"`
Compression string `mapstructure:"compression,omitempty"`
PollsToArchive int `mapstructure:"-"` // TODO: activate this config once archiving is set up
AcquireFSLock bool `mapstructure:"acquire_fs_lock,omitempty"`
}

Expand Down Expand Up @@ -174,13 +174,6 @@ func (c Config) Build(set component.TelemetrySettings, emit emit.Callback, opts
AcquireFSLock: c.AcquireFSLock,
}

var t tracker.Tracker
if o.noTracking {
t = tracker.NewNoStateTracker(set, c.MaxConcurrentFiles/2)
} else {
t = tracker.NewFileTracker(set, c.MaxConcurrentFiles/2)
}

telemetryBuilder, err := metadata.NewTelemetryBuilder(set)
if err != nil {
return nil, err
Expand All @@ -192,8 +185,8 @@ func (c Config) Build(set component.TelemetrySettings, emit emit.Callback, opts
pollInterval: c.PollInterval,
maxBatchFiles: c.MaxConcurrentFiles / 2,
maxBatches: c.MaxBatches,
tracker: t,
telemetryBuilder: telemetryBuilder,
noTracking: o.noTracking,
}, nil
}

Expand Down
29 changes: 24 additions & 5 deletions pkg/stanza/fileconsumer/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,13 @@ type Manager struct {
readerFactory reader.Factory
fileMatcher *matcher.Matcher
tracker tracker.Tracker
noTracking bool

pollInterval time.Duration
persister operator.Persister
maxBatches int
maxBatchFiles int
pollInterval time.Duration
persister operator.Persister
maxBatches int
maxBatchFiles int
pollsToArchive int

telemetryBuilder *metadata.TelemetryBuilder
}
Expand All @@ -47,6 +49,9 @@ func (m *Manager) Start(persister operator.Persister) error {
m.set.Logger.Warn("finding files", zap.Error(err))
}

// instantiate the tracker
m.instantiateTracker(persister)

if persister != nil {
m.persister = persister
offsets, err := checkpoint.Load(ctx, m.persister)
Expand All @@ -58,6 +63,8 @@ func (m *Manager) Start(persister operator.Persister) error {
m.readerFactory.FromBeginning = true
m.tracker.LoadMetadata(offsets)
}
} else if m.pollsToArchive > 0 {
m.set.Logger.Error("archiving is not supported in memory, please use a storage extension")
}

// Start polling goroutine
Expand All @@ -73,7 +80,9 @@ func (m *Manager) Stop() error {
m.cancel = nil
}
m.wg.Wait()
m.telemetryBuilder.FileconsumerOpenFiles.Add(context.TODO(), int64(0-m.tracker.ClosePreviousFiles()))
if m.tracker != nil {
m.telemetryBuilder.FileconsumerOpenFiles.Add(context.TODO(), int64(0-m.tracker.ClosePreviousFiles()))
}
if m.persister != nil {
if err := checkpoint.Save(context.Background(), m.persister, m.tracker.GetMetadata()); err != nil {
m.set.Logger.Error("save offsets", zap.Error(err))
Expand Down Expand Up @@ -261,3 +270,13 @@ func (m *Manager) newReader(ctx context.Context, file *os.File, fp *fingerprint.
m.telemetryBuilder.FileconsumerOpenFiles.Add(ctx, 1)
return r, nil
}

func (m *Manager) instantiateTracker(persister operator.Persister) {
var t tracker.Tracker
if m.noTracking {
t = tracker.NewNoStateTracker(m.set, m.maxBatchFiles)
} else {
t = tracker.NewFileTracker(m.set, m.maxBatchFiles, m.pollsToArchive, persister)
}
m.tracker = t
}
6 changes: 5 additions & 1 deletion pkg/stanza/fileconsumer/internal/checkpoint/checkpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@ const knownFilesKey = "knownFiles"

// Save syncs the most recent set of files to the database
func Save(ctx context.Context, persister operator.Persister, rmds []*reader.Metadata) error {
return SaveKey(ctx, persister, rmds, knownFilesKey)
}

func SaveKey(ctx context.Context, persister operator.Persister, rmds []*reader.Metadata, key string) error {
var buf bytes.Buffer
enc := json.NewEncoder(&buf)

Expand All @@ -34,7 +38,7 @@ func Save(ctx context.Context, persister operator.Persister, rmds []*reader.Meta
}
}

if err := persister.Set(ctx, knownFilesKey, buf.Bytes()); err != nil {
if err := persister.Set(ctx, key, buf.Bytes()); err != nil {
errs = append(errs, fmt.Errorf("persist known files: %w", err))
}

Expand Down
48 changes: 47 additions & 1 deletion pkg/stanza/fileconsumer/internal/tracker/tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,17 @@
package tracker // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/tracker"

import (
"context"
"fmt"

"go.opentelemetry.io/collector/component"
"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/checkpoint"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/fileset"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/fingerprint"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/reader"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator"
)

// Interface for tracking files that are being consumed.
Expand Down Expand Up @@ -37,9 +42,16 @@ type fileTracker struct {
currentPollFiles *fileset.Fileset[*reader.Reader]
previousPollFiles *fileset.Fileset[*reader.Reader]
knownFiles []*fileset.Fileset[*reader.Metadata]

// persister is to be used to store offsets older than 3 poll cycles.
// These offsets will be stored on disk
persister operator.Persister

pollsToArchive int
archiveIndex int
}

func NewFileTracker(set component.TelemetrySettings, maxBatchFiles int) Tracker {
func NewFileTracker(set component.TelemetrySettings, maxBatchFiles int, pollsToArchive int, persister operator.Persister) Tracker {
knownFiles := make([]*fileset.Fileset[*reader.Metadata], 3)
for i := 0; i < len(knownFiles); i++ {
knownFiles[i] = fileset.New[*reader.Metadata](maxBatchFiles)
Expand All @@ -51,6 +63,9 @@ func NewFileTracker(set component.TelemetrySettings, maxBatchFiles int) Tracker
currentPollFiles: fileset.New[*reader.Reader](maxBatchFiles),
previousPollFiles: fileset.New[*reader.Reader](maxBatchFiles),
knownFiles: knownFiles,
pollsToArchive: pollsToArchive,
persister: persister,
archiveIndex: 0,
}
}

Expand Down Expand Up @@ -113,6 +128,9 @@ func (t *fileTracker) ClosePreviousFiles() (filesClosed int) {
func (t *fileTracker) EndPoll() {
// shift the filesets at end of every poll() call
// t.knownFiles[0] -> t.knownFiles[1] -> t.knownFiles[2]

// Instead of throwing it away, archive it.
t.archive(t.knownFiles[2])
copy(t.knownFiles[1:], t.knownFiles)
t.knownFiles[0] = fileset.New[*reader.Metadata](t.maxBatchFiles)
}
Expand All @@ -125,6 +143,34 @@ func (t *fileTracker) TotalReaders() int {
return total
}

func (t *fileTracker) archive(metadata *fileset.Fileset[*reader.Metadata]) {
// We make use of a ring buffer, where each set of files is stored under a specific index.
// Instead of discarding knownFiles[2], write it to the next index and eventually roll over.
// Separate storage keys knownFilesArchive0, knownFilesArchive1, ..., knownFilesArchiveN, roll over back to knownFilesArchive0

// Archiving: ┌─────────────────────on-disk archive─────────────────────────┐
// | ┌───┐ ┌───┐ ┌──────────────────┐ |
// index | ▶ │ 0 │ ▶ │ 1 │ ▶ ... ▶ │ polls_to_archive │ |
// | ▲ └───┘ └───┘ └──────────────────┘ |
// | ▲ ▲ ▼ |
// | ▲ │ Roll over overriting older offsets, if any ◀ |
// └──────│──────────────────────────────────────────────────────┘
// │
// │
// │
// start
// index

if t.pollsToArchive <= 0 || t.persister == nil {
return
}
key := fmt.Sprintf("knownFiles%d", t.archiveIndex)
if err := checkpoint.SaveKey(context.Background(), t.persister, metadata.Get(), key); err != nil {
t.set.Logger.Error("error faced while saving to the archive", zap.Error(err))
}
t.archiveIndex = (t.archiveIndex + 1) % t.pollsToArchive // increment the index
}

// noStateTracker only tracks the current polled files. Once the poll is
// complete and telemetry is consumed, the tracked files are closed. The next
// poll will create fresh readers with no previously tracked offsets.
Expand Down
3 changes: 3 additions & 0 deletions pkg/stanza/fileconsumer/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import (
"go.opentelemetry.io/collector/component/componenttest"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/emittest"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/tracker"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/testutil"
)

func testManager(t *testing.T, cfg *Config, opts ...Option) (*Manager, *emittest.Sink) {
Expand All @@ -20,6 +22,7 @@ func testManager(t *testing.T, cfg *Config, opts ...Option) (*Manager, *emittest
func testManagerWithSink(t *testing.T, cfg *Config, sink *emittest.Sink, opts ...Option) *Manager {
set := componenttest.NewNopTelemetrySettings()
input, err := cfg.Build(set, sink.Callback, opts...)
input.tracker = tracker.NewFileTracker(set, cfg.MaxBatches, cfg.PollsToArchive, testutil.NewUnscopedMockPersister())
require.NoError(t, err)
t.Cleanup(func() { input.tracker.ClosePreviousFiles() })
return input
Expand Down

0 comments on commit 19ddd21

Please sign in to comment.