[pkg/stanza] Add monitoring metrics for open and harvested files in fileconsumer (#31544)

Blocked on #31618

**Description:**

This PR adds support for the filelog receiver to emit observable metrics
about its current state: how many files are open, and how many are currently being harvested (read).
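
The bookkeeping is simple: the manager increments an up/down counter whenever it opens a reader, brackets each `ReadToEnd` call with a +1/-1 on a second counter, and subtracts however many readers get closed at the end of a poll cycle. A minimal stdlib-only sketch of that pattern (using `sync/atomic` as a stand-in for OTel's `Int64UpDownCounter`; all names here are illustrative, not the PR's actual types):

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// gauge stands in for metric.Int64UpDownCounter in this sketch.
type gauge struct{ v atomic.Int64 }

func (g *gauge) Add(delta int64) { g.v.Add(delta) }
func (g *gauge) Load() int64     { return g.v.Load() }

// manager loosely mirrors the bookkeeping this PR adds to fileconsumer.Manager.
type manager struct {
	openFiles    gauge // files currently held open
	readingFiles gauge // files currently inside ReadToEnd
}

// consume sketches one poll cycle: every discovered path opens a reader
// (+1 open), each read is bracketed with +1/-1 reading, and closing the
// readers at the end of the cycle subtracts however many were closed.
func (m *manager) consume(paths []string) {
	var wg sync.WaitGroup
	for range paths {
		m.openFiles.Add(1) // a reader was created for this path
		wg.Add(1)
		go func() {
			defer wg.Done()
			m.readingFiles.Add(1)
			// r.ReadToEnd(ctx) would run here in the real Manager.
			m.readingFiles.Add(-1)
		}()
	}
	wg.Wait()
	// In the PR, EndConsume returns the number of readers it closed.
	filesClosed := int64(len(paths))
	m.openFiles.Add(-filesClosed)
}

func main() {
	m := &manager{}
	m.consume([]string{"1.log", "2.log"})
	fmt.Println(m.openFiles.Load(), m.readingFiles.Load()) // prints "0 0"
}
```

Both gauges return to zero once the cycle completes; in between, an external scrape observes the in-flight counts.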

**Link to tracking Issue:**
#31256

**Testing:**

#### How to test this manually

1. Use the following collector config:
```yaml
receivers:
  filelog:
    start_at: end
    include:
    - /var/log/busybox/monitoring/*.log
exporters:
  debug:
    verbosity: detailed
service:
  telemetry:
    metrics:
      level: detailed
      address: ":8888"
  pipelines:
    logs:
      receivers: [filelog]
      exporters: [debug]
      processors: []
```

2. Build and run the collector:
```console
make otelcontribcol && ./bin/otelcontribcol_linux_amd64 --config ~/otelcol/monitoring_telemetry/config.yaml
```
3. Produce some logs:
```console
echo 'some line' >> /var/log/busybox/monitoring/1.log
while true; do echo -e "This is a log line" >> /var/log/busybox/monitoring/2.log; done
```
4. Verify that metrics are produced:
```console
curl 0.0.0.0:8888/metrics | grep _files
# HELP otelcol_fileconsumer_open_files Number of open files
# TYPE otelcol_fileconsumer_open_files gauge
otelcol_fileconsumer_open_files{service_instance_id="72b4899d-6ce3-41de-a25b-8f0370e22ec1",service_name="otelcontribcol",service_version="0.99.0-dev"} 2
# HELP otelcol_fileconsumer_reading_files Number of open files that are being read
# TYPE otelcol_fileconsumer_reading_files gauge
otelcol_fileconsumer_reading_files{service_instance_id="72b4899d-6ce3-41de-a25b-8f0370e22ec1",service_name="otelcontribcol",service_version="0.99.0-dev"} 1
```
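
Since the endpoint serves plain Prometheus exposition text, the gauge values can also be pulled out of a saved scrape with standard tools. A small sketch (the scrape below is inlined sample data with illustrative values, not live output):

```shell
# Save a sample scrape (in practice: curl -s 0.0.0.0:8888/metrics > scrape.txt).
cat > scrape.txt <<'EOF'
# HELP otelcol_fileconsumer_open_files Number of open files
# TYPE otelcol_fileconsumer_open_files gauge
otelcol_fileconsumer_open_files{service_name="otelcontribcol"} 2
# HELP otelcol_fileconsumer_reading_files Number of open files that are being read
# TYPE otelcol_fileconsumer_reading_files gauge
otelcol_fileconsumer_reading_files{service_name="otelcontribcol"} 1
EOF

# Print "metric value" for each fileconsumer gauge, dropping the label set.
awk '/^otelcol_fileconsumer/ { metric=$1; sub(/\{.*/, "", metric); print metric, $NF }' scrape.txt
```

With the sample above this prints `otelcol_fileconsumer_open_files 2` and `otelcol_fileconsumer_reading_files 1`.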

**Documentation:**
Added a corresponding section to the filelog receiver's docs.

---------

Signed-off-by: ChrsMark <chrismarkou92@gmail.com>
ChrsMark authored May 22, 2024
1 parent 483a201 commit 4ba2c7c
Showing 9 changed files with 100 additions and 20 deletions.
27 changes: 27 additions & 0 deletions .chloggen/add_filelog_metrics.yaml
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: stanza

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add monitoring metrics for open and harvested files in fileconsumer

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [31256]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
24 changes: 24 additions & 0 deletions pkg/stanza/fileconsumer/config.go
@@ -12,6 +12,7 @@ import (

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/featuregate"
"go.opentelemetry.io/otel/metric"
"go.uber.org/zap"
"golang.org/x/text/encoding"

@@ -34,6 +35,8 @@ const (
defaultMaxConcurrentFiles = 1024
defaultEncoding = "utf-8"
defaultPollInterval = 200 * time.Millisecond
openFilesMetric = "fileconsumer/open_files"
readingFilesMetric = "fileconsumer/reading_files"
)

var allowFileDeletion = featuregate.GlobalRegistry().MustRegister(
@@ -172,6 +175,25 @@ func (c Config) Build(set component.TelemetrySettings, emit emit.Callback, opts
t = tracker.NewFileTracker(set, c.MaxConcurrentFiles/2)
}
set.Logger = set.Logger.With(zap.String("component", "fileconsumer"))

meter := set.MeterProvider.Meter("otelcol/fileconsumer")

openFiles, err := meter.Int64UpDownCounter(
openFilesMetric,
metric.WithDescription("Number of open files"),
metric.WithUnit("1"),
)
if err != nil {
return nil, err
}
readingFiles, err := meter.Int64UpDownCounter(
readingFilesMetric,
metric.WithDescription("Number of open files that are being read"),
metric.WithUnit("1"),
)
if err != nil {
return nil, err
}
return &Manager{
set: set,
readerFactory: readerFactory,
@@ -180,6 +202,8 @@
maxBatchFiles: c.MaxConcurrentFiles / 2,
maxBatches: c.MaxBatches,
tracker: t,
openFiles: openFiles,
readingFiles: readingFiles,
}, nil
}

32 changes: 24 additions & 8 deletions pkg/stanza/fileconsumer/file.go
@@ -11,6 +11,7 @@ import (
"time"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/otel/metric"
"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/checkpoint"
@@ -37,6 +38,9 @@ type Manager struct {
persister operator.Persister
maxBatches int
maxBatchFiles int

openFiles metric.Int64UpDownCounter
readingFiles metric.Int64UpDownCounter
}

func (m *Manager) Start(persister operator.Persister) error {
@@ -73,7 +77,7 @@ func (m *Manager) Stop() error {
m.cancel = nil
}
m.wg.Wait()
m.tracker.ClosePreviousFiles()
m.openFiles.Add(context.TODO(), int64(0-m.tracker.ClosePreviousFiles()))
if m.persister != nil {
if err := checkpoint.Save(context.Background(), m.persister, m.tracker.GetMetadata()); err != nil {
m.set.Logger.Error("save offsets", zap.Error(err))
@@ -146,7 +150,7 @@ func (m *Manager) poll(ctx context.Context) {

func (m *Manager) consume(ctx context.Context, paths []string) {
m.set.Logger.Debug("Consuming files", zap.Strings("paths", paths))
m.makeReaders(paths)
m.makeReaders(ctx, paths)

m.readLostFiles(ctx)

@@ -156,12 +160,14 @@ func (m *Manager) consume(ctx context.Context, paths []string) {
wg.Add(1)
go func(r *reader.Reader) {
defer wg.Done()
m.readingFiles.Add(ctx, 1)
r.ReadToEnd(ctx)
m.readingFiles.Add(ctx, -1)
}(r)
}
wg.Wait()

m.tracker.EndConsume()
m.openFiles.Add(ctx, int64(0-m.tracker.EndConsume()))
}

func (m *Manager) makeFingerprint(path string) (*fingerprint.Fingerprint, *os.File) {
@@ -192,7 +198,7 @@ func (m *Manager) makeFingerprint(path string) (*fingerprint.Fingerprint, *os.Fi
// makeReader take a file path, then creates reader,
// discarding any that have a duplicate fingerprint to other files that have already
// been read this polling interval
func (m *Manager) makeReaders(paths []string) {
func (m *Manager) makeReaders(ctx context.Context, paths []string) {
for _, path := range paths {
fp, file := m.makeFingerprint(path)
if fp == nil {
@@ -210,7 +216,7 @@ func (m *Manager) makeReaders(paths []string) {
continue
}

r, err := m.newReader(file, fp)
r, err := m.newReader(ctx, file, fp)
if err != nil {
m.set.Logger.Error("Failed to create reader", zap.Error(err))
continue
@@ -220,18 +226,28 @@
}
}

func (m *Manager) newReader(file *os.File, fp *fingerprint.Fingerprint) (*reader.Reader, error) {
func (m *Manager) newReader(ctx context.Context, file *os.File, fp *fingerprint.Fingerprint) (*reader.Reader, error) {
// Check previous poll cycle for match
if oldReader := m.tracker.GetOpenFile(fp); oldReader != nil {
return m.readerFactory.NewReaderFromMetadata(file, oldReader.Close())
}

// Check for closed files for match
if oldMetadata := m.tracker.GetClosedFile(fp); oldMetadata != nil {
return m.readerFactory.NewReaderFromMetadata(file, oldMetadata)
r, err := m.readerFactory.NewReaderFromMetadata(file, oldMetadata)
if err != nil {
return nil, err
}
m.openFiles.Add(ctx, 1)
return r, nil
}

// If we don't match any previously known files, create a new reader from scratch
m.set.Logger.Info("Started watching file", zap.String("path", file.Name()))
return m.readerFactory.NewReader(file, fp)
r, err := m.readerFactory.NewReader(file, fp)
if err != nil {
return nil, err
}
m.openFiles.Add(ctx, 1)
return r, nil
}
2 changes: 2 additions & 0 deletions pkg/stanza/fileconsumer/file_other.go
@@ -50,7 +50,9 @@ OUTER:
lostWG.Add(1)
go func(r *reader.Reader) {
defer lostWG.Done()
m.readingFiles.Add(ctx, 1)
r.ReadToEnd(ctx)
m.readingFiles.Add(ctx, -1)
}(lostReader)
}
lostWG.Wait()
15 changes: 9 additions & 6 deletions pkg/stanza/fileconsumer/internal/tracker/tracker.go
@@ -22,9 +22,9 @@ type Tracker interface {
LoadMetadata(metadata []*reader.Metadata)
CurrentPollFiles() []*reader.Reader
PreviousPollFiles() []*reader.Reader
ClosePreviousFiles()
ClosePreviousFiles() int
EndPoll()
EndConsume()
EndConsume() int
TotalReaders() int
}

@@ -101,12 +101,13 @@ func (t *fileTracker) PreviousPollFiles() []*reader.Reader {
return t.previousPollFiles.Get()
}

func (t *fileTracker) ClosePreviousFiles() {
func (t *fileTracker) ClosePreviousFiles() (filesClosed int) {
// t.previousPollFiles -> t.knownFiles[0]

for r, _ := t.previousPollFiles.Pop(); r != nil; r, _ = t.previousPollFiles.Pop() {
t.knownFiles[0].Add(r.Close())
filesClosed++
}
return
}

func (t *fileTracker) EndPoll() {
@@ -155,10 +156,12 @@ func (t *noStateTracker) GetCurrentFile(fp *fingerprint.Fingerprint) *reader.Rea
return t.currentPollFiles.Match(fp, fileset.Equal)
}

func (t *noStateTracker) EndConsume() {
func (t *noStateTracker) EndConsume() (filesClosed int) {
for r, _ := t.currentPollFiles.Pop(); r != nil; r, _ = t.currentPollFiles.Pop() {
r.Close()
filesClosed++
}
return
}

func (t *noStateTracker) GetOpenFile(_ *fingerprint.Fingerprint) *reader.Reader { return nil }
@@ -171,7 +174,7 @@ func (t *noStateTracker) LoadMetadata(_ []*reader.Metadata) {}

func (t *noStateTracker) PreviousPollFiles() []*reader.Reader { return nil }

func (t *noStateTracker) ClosePreviousFiles() {}
func (t *noStateTracker) ClosePreviousFiles() int { return 0 }

func (t *noStateTracker) EndPoll() {}

5 changes: 3 additions & 2 deletions pkg/stanza/fileconsumer/internal/tracker/tracker_other.go
@@ -12,10 +12,11 @@ import (

// On non-windows platforms, we keep files open between poll cycles so that we can detect
// and read "lost" files, which have been moved out of the matching pattern.
func (t *fileTracker) EndConsume() {
t.ClosePreviousFiles()
func (t *fileTracker) EndConsume() (filesClosed int) {
filesClosed = t.ClosePreviousFiles()

// t.currentPollFiles -> t.previousPollFiles
t.previousPollFiles = t.currentPollFiles
t.currentPollFiles = fileset.New[*reader.Reader](t.maxBatchFiles)
return
}
5 changes: 3 additions & 2 deletions pkg/stanza/fileconsumer/internal/tracker/tracker_windows.go
@@ -12,9 +12,10 @@ import (
)

// On windows, we close files immediately after reading because they cannot be moved while open.
func (t *fileTracker) EndConsume() {
func (t *fileTracker) EndConsume() (filesClosed int) {
// t.currentPollFiles -> t.previousPollFiles
t.previousPollFiles = t.currentPollFiles
t.ClosePreviousFiles()
filesClosed = t.ClosePreviousFiles()
t.currentPollFiles = fileset.New[*reader.Reader](t.maxBatchFiles)
return
}
2 changes: 1 addition & 1 deletion pkg/stanza/go.mod
@@ -23,6 +23,7 @@ require (
go.opentelemetry.io/collector/featuregate v1.8.0
go.opentelemetry.io/collector/pdata v1.8.0
go.opentelemetry.io/collector/receiver v0.101.0
go.opentelemetry.io/otel/metric v1.26.0
go.uber.org/goleak v1.3.0
go.uber.org/multierr v1.11.0
go.uber.org/zap v1.27.0
@@ -62,7 +63,6 @@ require (
go.opentelemetry.io/collector/config/configtelemetry v0.101.0 // indirect
go.opentelemetry.io/otel v1.26.0 // indirect
go.opentelemetry.io/otel/exporters/prometheus v0.48.0 // indirect
go.opentelemetry.io/otel/metric v1.26.0 // indirect
go.opentelemetry.io/otel/sdk v1.26.0 // indirect
go.opentelemetry.io/otel/sdk/metric v1.26.0 // indirect
go.opentelemetry.io/otel/trace v1.26.0 // indirect
8 changes: 7 additions & 1 deletion receiver/filelogreceiver/README.md
@@ -199,4 +199,10 @@ Exactly how this information is serialized depends on the type of storage being
### Tracking symlinked files
If the receiver is being used to track a symlinked file and the symlink target is expected to change frequently, make sure
to set the value of the `poll_interval` setting to something lower than the symlink update frequency.

### Telemetry metrics
Enabling [Collector metrics](https://github.com/open-telemetry/opentelemetry-collector/blob/main/docs/troubleshooting.md#metrics)
will also provide telemetry metrics for the state of the receiver's file consumption.
Specifically, the `otelcol_fileconsumer_open_files` and `otelcol_fileconsumer_reading_files` metrics
are provided.
