otel: add test for filebeatreceiver delivery guarantees during a collector restart (#6228)

* otel: add test for filebeatreceiver delivery guarantees for a collector restart

This commit adds an integration test that restarts the collector a couple
of times during log ingestion and confirms that the data was fully written
to Elasticsearch.

* check that input lines increase after each restart

* use json for docs, use json parser

* add debug exporter, remove fake component from fixture

* fix typo in json input

* add newlines to input file, fixes for recent fingerprint changes

* assert unique log lines

* fix restart counter logic

* update test to restart collector only once

* remove some leftover code
mauri870 authored Feb 10, 2025
1 parent 059721b commit 346a2fe
Showing 1 changed file with 230 additions and 0 deletions.
230 changes: 230 additions & 0 deletions testing/integration/otel_test.go
@@ -16,6 +16,7 @@ import (
"path/filepath"
"strings"
"sync"
"sync/atomic"
"testing"
"text/template"
"time"
@@ -1099,3 +1100,232 @@ service:
	fixtureWg.Wait()
	require.True(t, err == nil || errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded), "Retrieved unexpected error: %v", err)
}

func TestFBOtelRestartE2E(t *testing.T) {
	// This test ensures that filebeatreceiver is able to deliver logs even
	// in the event of a collector restart.
	// The input is a file that is appended to repeatedly during the test.
	// The test starts a filebeat receiver, waits for some logs, and then stops it.
	// It then restarts the collector for the remainder of the test.
	// At the end it asserts that the number of unique log lines in ES equals
	// the number of lines in the input file. Duplicates are likely due to the restart.
	info := define.Require(t, define.Requirements{
		Group: Default,
		Local: true,
		OS: []define.OS{
			{Type: define.Windows},
			{Type: define.Linux},
			{Type: define.Darwin},
		},
		Stack: &define.Stack{},
	})
	tmpDir := t.TempDir()

	inputFile, err := os.CreateTemp(tmpDir, "input.txt")
	require.NoError(t, err, "failed to create temp file to hold data to ingest")
	inputFilePath := inputFile.Name()

	// Create the otel configuration file
	type otelConfigOptions struct {
		InputPath  string
		HomeDir    string
		ESEndpoint string
		ESApiKey   string
		Index      string
	}
	esEndpoint, err := getESHost()
	require.NoError(t, err, "error getting elasticsearch endpoint")
	esApiKey, err := createESApiKey(info.ESClient)
	require.NoError(t, err, "error creating API key")
	require.True(t, len(esApiKey.Encoded) > 1, "api key is invalid %q", esApiKey)
	index := "logs-integration-default"
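	// The pipeline under test: a filestream input inside filebeatreceiver
	// tails the NDJSON file, the ndjson parser sets each document's ID from
	// the line's "id" field, and the elasticsearch/log exporter writes the
	// events to the target index. Fingerprinting is disabled in favor of
	// native file identity so the input can resume from its persisted state
	// after a restart.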
	otelConfigTemplate := `receivers:
  filebeatreceiver:
    filebeat:
      inputs:
        - type: filestream
          id: filestream-end-to-end
          enabled: true
          paths:
            - {{.InputPath}}
          parsers:
            - ndjson:
                document_id: "id"
          prospector.scanner.fingerprint.enabled: false
          file_identity.native: ~
    output:
      otelconsumer:
    logging:
      level: info
      selectors:
        - '*'
    path.home: {{.HomeDir}}
    path.logs: {{.HomeDir}}
    queue.mem.flush.timeout: 0s
exporters:
  debug:
    use_internal_logger: false
    verbosity: detailed
  elasticsearch/log:
    endpoints:
      - {{.ESEndpoint}}
    api_key: {{.ESApiKey}}
    logs_index: {{.Index}}
    batcher:
      enabled: true
      flush_timeout: 1s
    mapping:
      mode: bodymap
service:
  pipelines:
    logs:
      receivers:
        - filebeatreceiver
      exporters:
        - elasticsearch/log
        #- debug
`
	otelConfigPath := filepath.Join(tmpDir, "otel.yml")
	var otelConfigBuffer bytes.Buffer
	require.NoError(t,
		template.Must(template.New("otelConfig").Parse(otelConfigTemplate)).Execute(&otelConfigBuffer,
			otelConfigOptions{
				InputPath:  inputFilePath,
				HomeDir:    tmpDir,
				ESEndpoint: esEndpoint,
				ESApiKey:   esApiKey.Encoded,
				Index:      index,
			}))
	require.NoError(t, os.WriteFile(otelConfigPath, otelConfigBuffer.Bytes(), 0o600))
	t.Cleanup(func() {
		if t.Failed() {
			contents, err := os.ReadFile(otelConfigPath)
			if err != nil {
				t.Logf("No otel configuration file at %s", otelConfigPath)
				return
			}
			t.Logf("Contents of otel config file:\n%s\n", string(contents))
		}
	})

	fixture, err := define.NewFixtureFromLocalBuild(t, define.Version(), aTesting.WithAdditionalArgs([]string{"--config", otelConfigPath}))
	require.NoError(t, err)

	ctx, cancel := testcontext.WithDeadline(t, context.Background(), time.Now().Add(5*time.Minute))
	defer cancel()
	err = fixture.Prepare(ctx)
	require.NoError(t, err)

	// Write logs to input file
	var inputLinesCounter atomic.Int64
	var stopInputWriter atomic.Bool
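	// The writer below appends one NDJSON line every 100ms until told to
	// stop, so new data keeps arriving while the collector is down.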
	go func() {
		for i := 0; ; i++ {
			if stopInputWriter.Load() {
				break
			}

			_, err := inputFile.Write([]byte(fmt.Sprintf(`{"id": "%d", "message": "%d"}`, i, i)))
			require.NoErrorf(t, err, "failed to write line %d to temp file", i)
			_, err = inputFile.Write([]byte("\n"))
			require.NoErrorf(t, err, "failed to write newline to temp file")
			inputLinesCounter.Add(1)
			time.Sleep(100 * time.Millisecond)
		}
		err := inputFile.Close()
		require.NoError(t, err, "failed to close input file")
	}()

	t.Cleanup(func() {
		if t.Failed() {
			contents, err := os.ReadFile(inputFilePath)
			if err != nil {
				t.Logf("no data file to import at %s", inputFilePath)
				return
			}
			t.Logf("contents of import file:\n%s\n", string(contents))
		}
	})

	// Start the collector, ingest some logs and then stop it
	stoppedCh := make(chan int, 1)
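	// stoppedCh is closed once RunOtelWithClient returns, so the select
	// below can wait for the first collector run to shut down cleanly.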
	fCtx, cancel := context.WithDeadline(ctx, time.Now().Add(1*time.Minute))
	go func() {
		err = fixture.RunOtelWithClient(fCtx)
		cancel()
		require.True(t, errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled), "unexpected error: %v", err)
		close(stoppedCh)
	}()

	// Make sure we ingested at least 10 logs before stopping the collector
	var hits int
	require.Eventually(t, func() bool {
		findCtx, findCancel := context.WithTimeout(context.Background(), 10*time.Second)
		defer findCancel()

		docs, err := estools.GetLogsForIndexWithContext(findCtx, info.ESClient, ".ds-"+index+"*", map[string]interface{}{
			"log.file.path": inputFilePath,
		})
		require.NoError(t, err)
		hits = int(docs.Hits.Total.Value)
		return hits >= 10
	}, 1*time.Minute, 1*time.Second, "Expected to ingest at least 10 logs")
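	// Cancelling the first run's context stops the collector, simulating a
	// restart in the middle of ingestion.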
	cancel()

	select {
	case <-stoppedCh:
	case <-time.After(30 * time.Second):
		require.Fail(t, "expected the collector to have stopped")
	}

	// Stop generating input data
	stopInputWriter.Store(true)
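	// Once the writer observes the stop flag, inputLinesCounter is stable
	// and becomes the expected unique-document count for the final assertion.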

	// Start the collector again for the remainder of the test
	var fixtureWg sync.WaitGroup
	fixtureWg.Add(1)
	fCtx, cancel = context.WithDeadline(ctx, time.Now().Add(5*time.Minute))
	go func() {
		defer fixtureWg.Done()
		err = fixture.RunOtelWithClient(fCtx)
	}()

	// Make sure all the logs are ingested
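	// Delivery is at-least-once: duplicates across the restart are
	// tolerated, so the assertion compares the number of unique messages
	// against the number of lines written to the input file.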
	actualHits := &struct {
		Hits       int
		UniqueHits int
	}{}
	require.Eventually(t,
		func() bool {
			findCtx, findCancel := context.WithTimeout(context.Background(), 10*time.Second)
			defer findCancel()

			docs, err := estools.GetLogsForIndexWithContext(findCtx, info.ESClient, ".ds-"+index+"*", map[string]interface{}{
				"log.file.path": inputFilePath,
			})
			require.NoError(t, err)

			actualHits.Hits = docs.Hits.Total.Value

			uniqueIngestedLogs := make(map[string]struct{})
			for _, hit := range docs.Hits.Hits {
				t.Log("Hit: ", hit.Source["message"])
				message, found := hit.Source["message"]
				require.True(t, found, "expected message field in document %q", hit.Source)
				msg, ok := message.(string)
				require.True(t, ok, "expected message field to be a string, got %T", message)
				if _, found := uniqueIngestedLogs[msg]; found {
					t.Logf("log line %q was ingested more than once", msg)
				}
				uniqueIngestedLogs[msg] = struct{}{}
			}
			actualHits.UniqueHits = len(uniqueIngestedLogs)
			return actualHits.UniqueHits == int(inputLinesCounter.Load())
		},
		20*time.Second, 1*time.Second,
		"Expected %d logs, got %v", int(inputLinesCounter.Load()), actualHits)

	cancel()
	fixtureWg.Wait()
	require.True(t, err == nil || errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded), "Retrieved unexpected error: %v", err)
}
