Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[8.15](backport #40075) [filebeat][log] Enable status reporter for log input #40302

Merged
merged 3 commits into from
Aug 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,11 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- Removed deprecated Cylance from Beats. See <<migrate-from-deprecated-module>> for migration options. {pull}38037[38037]
- Removed deprecated Bluecoat from Beats. See <<migrate-from-deprecated-module>> for migration options. {pull}38037[38037]
- Introduce input/netmetrics and refactor netflow input metrics {pull}38055[38055]
- Update Salesforce module to use new Salesforce input. {pull}37509[37509]
- Tag events that come from a filestream in "take over" mode. {pull}39828[39828]
- Fix high IO and handling of a corrupted registry log file. {pull}35893[35893]
- Enable file ingestion to report detailed status to Elastic Agent {pull}40075[40075]
- Fix filestream's registry GC: registry entries will never be removed if clean_inactive is set to "-1". {pull}40258[40258]

*Heartbeat*

Expand Down
31 changes: 21 additions & 10 deletions filebeat/input/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (

"github.com/elastic/beats/v7/filebeat/channel"
"github.com/elastic/beats/v7/filebeat/input/file"
"github.com/elastic/beats/v7/libbeat/management/status"
conf "github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/monitoring"
Expand All @@ -48,12 +49,13 @@ type Input interface {

// Runner encapsulates the lifecycle of the input.
type Runner struct {
config inputConfig
input Input
done chan struct{}
wg *sync.WaitGroup
Once bool
beatDone chan struct{}
config inputConfig
input Input
done chan struct{}
wg *sync.WaitGroup
Once bool
beatDone chan struct{}
// statusReporter is written by SetStatusReporter and read by GetStatusReporter.
statusReporter status.StatusReporter
}

// New instantiates a new Runner
Expand Down Expand Up @@ -83,10 +85,11 @@ func New(
}

context := Context{
States: states,
Done: input.done,
BeatDone: input.beatDone,
Meta: nil,
States: states,
Done: input.done,
BeatDone: input.beatDone,
Meta: nil,
GetStatusReporter: input.GetStatusReporter,
}
var ipt Input
ipt, err = f(conf, connector, context)
Expand Down Expand Up @@ -164,3 +167,11 @@ func (p *Runner) stop() {
// String returns a short description of the runner that includes its
// configured input type.
func (p *Runner) String() string {
return fmt.Sprintf("input [type=%s]", p.config.Type)
}

// SetStatusReporter stores statusReporter on the runner, replacing any
// previously stored reporter. The stored value is what GetStatusReporter
// returns.
func (p *Runner) SetStatusReporter(statusReporter status.StatusReporter) {
p.statusReporter = statusReporter
}

// GetStatusReporter returns the reporter currently stored on the runner.
// New hands this method to the input through Context.GetStatusReporter.
func (p *Runner) GetStatusReporter() status.StatusReporter {
return p.statusReporter
}
20 changes: 20 additions & 0 deletions filebeat/input/log/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/common/atomic"
"github.com/elastic/beats/v7/libbeat/common/cfgwarn"
"github.com/elastic/beats/v7/libbeat/management/status"
conf "github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/monitoring"
Expand Down Expand Up @@ -78,6 +79,7 @@ type Input struct {
meta map[string]string
stopOnce sync.Once
fileStateIdentifier file.StateIdentifier
getStatusReporter input.GetStatusReporter
}

// NewInput instantiates a new Log
Expand Down Expand Up @@ -157,8 +159,11 @@ func NewInput(
done: context.Done,
meta: meta,
fileStateIdentifier: identifier,
getStatusReporter: context.GetStatusReporter,
}

p.updateStatus(status.Starting, "starting the log input")

// Create empty harvester to check if configs are fine
// TODO: Do config validation instead
_, err = p.createHarvester(logger, file.State{}, nil)
Expand Down Expand Up @@ -224,6 +229,9 @@ func (p *Input) loadStates(states []file.State) error {

// Run runs the input
func (p *Input) Run() {
// Mark it Running for now.
// Any errors encountered in this loop will change state to degraded
p.updateStatus(status.Running, "")
logger := p.logger
logger.Debug("Start next scan")

Expand Down Expand Up @@ -558,6 +566,7 @@ func (p *Input) scan() {
continue
}
if err != nil {
p.updateStatus(status.Degraded, fmt.Sprintf(harvesterErrMsg, newState.Source, err))
logger.Errorf(harvesterErrMsg, newState.Source, err)
}
} else {
Expand All @@ -583,6 +592,7 @@ func (p *Input) harvestExistingFile(logger *logp.Logger, newState file.State, ol
logger.Debugf("Resuming harvesting of file: %s, offset: %d, new size: %d", newState.Source, oldState.Offset, newState.Fileinfo.Size())
err := p.startHarvester(logger, newState, oldState.Offset)
if err != nil {
p.updateStatus(status.Degraded, fmt.Sprintf("Harvester could not be started on existing file: %s, Err: %s", newState.Source, err))
logger.Errorf("Harvester could not be started on existing file: %s, Err: %s", newState.Source, err)
}
return
Expand All @@ -593,6 +603,7 @@ func (p *Input) harvestExistingFile(logger *logp.Logger, newState file.State, ol
logger.Debugf("Old file was truncated. Starting from the beginning: %s, offset: %d, new size: %d ", newState.Source, newState.Offset, newState.Fileinfo.Size())
err := p.startHarvester(logger, newState, 0)
if err != nil {
p.updateStatus(status.Degraded, fmt.Sprintf("Harvester could not be started on truncated file: %s, Err: %s", newState.Source, err))
logger.Errorf("Harvester could not be started on truncated file: %s, Err: %s", newState.Source, err)
}

Expand Down Expand Up @@ -833,3 +844,12 @@ func (p *Input) stopWhenDone() {

p.Wait()
}

// updateStatus forwards the given state and message to the status reporter,
// if one is available. It does nothing when the input was created without a
// reporter getter, or when the getter currently returns nil.
//
// The reporter is looked up through the getter on every call rather than
// being cached on the input.
func (p *Input) updateStatus(s status.Status, msg string) {
	if p.getStatusReporter == nil {
		return
	}
	if reporter := p.getStatusReporter(); reporter != nil {
		reporter.UpdateStatus(s, msg)
	}
}
12 changes: 8 additions & 4 deletions filebeat/input/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,19 @@ import (

"github.com/elastic/beats/v7/filebeat/channel"
"github.com/elastic/beats/v7/filebeat/input/file"
"github.com/elastic/beats/v7/libbeat/management/status"
conf "github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/logp"
)

// GetStatusReporter is a function that returns a status.StatusReporter.
// The log input treats both a nil function and a nil result as "no reporter
// available" and skips the status update in that case.
type GetStatusReporter func() status.StatusReporter

// Context bundles the values that New passes to an input factory when it
// creates an input.
type Context struct {
States []file.State
Done chan struct{}
BeatDone chan struct{}
Meta map[string]string
States []file.State
Done chan struct{}
BeatDone chan struct{}
Meta map[string]string
// GetStatusReporter, when non-nil, is called by the log input to obtain the
// reporter it sends status updates to.
GetStatusReporter GetStatusReporter
}

// Factory is used to register functions creating new Input instances.
Expand Down
205 changes: 205 additions & 0 deletions x-pack/filebeat/tests/integration/status_reporter_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,205 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

//go:build integration

package integration

import (
"fmt"
"os"
"path/filepath"
"runtime"
"testing"
"time"

"github.com/stretchr/testify/require"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"

"github.com/elastic/elastic-agent-client/v7/pkg/client"
"github.com/elastic/elastic-agent-client/v7/pkg/client/mock"
"github.com/elastic/elastic-agent-client/v7/pkg/proto"

"github.com/elastic/beats/v7/libbeat/common/reload"
lbmanagement "github.com/elastic/beats/v7/libbeat/management"
"github.com/elastic/beats/v7/x-pack/filebeat/cmd"
"github.com/elastic/beats/v7/x-pack/libbeat/management"
"github.com/elastic/beats/v7/x-pack/libbeat/management/tests"

conf "github.com/elastic/elastic-agent-libs/config"
)

// TestLogStatusReporter runs Filebeat against a mock Elastic Agent V2 server
// and checks the state the log input unit reports on check-in.
//
// The test walks through a list of scenarios. For each one it keeps answering
// check-ins with that scenario's input unit until the observed state of the
// input unit equals the scenario's expected state, then moves on. The sequence
// is: healthy (while pointing the input at an irregular file), degraded (then
// switching to a valid log file), and healthy twice more. The output unit is
// required to be healthy every time a scenario matches. Finally the test waits
// until at least one event shows up in the file output.
func TestLogStatusReporter(t *testing.T) {
	unitOneID := mock.NewID()
	unitOutID := mock.NewID()
	token := mock.NewID()

	tests.InitBeatsForTest(t, cmd.Filebeat())
	tmpDir := t.TempDir()
	filename := fmt.Sprintf("test-%d", time.Now().Unix())
	outPath := filepath.Join(tmpDir, filename)
	t.Logf("writing output to file %s", outPath)
	err := os.Mkdir(outPath, 0775)
	require.NoError(t, err)

	// Valid input stream, shouldn't raise any error.
	inputStream := getInputStream(unitOneID, filepath.Join(tmpDir, "*.log"), 2)
	require.NoError(t, os.WriteFile(filepath.Join(tmpDir, "test.log"), []byte("Line1\nLine2\nLine3\n"), 0777))

	// Try to open an irregular file.
	// This should throw "Tried to open non regular file:" and set the status
	// to degraded.
	nullDeviceFile := "/dev/null"
	if runtime.GOOS == "windows" {
		nullDeviceFile = "NUL"
	}
	inputStreamIrregular := getInputStream(unitOneID, nullDeviceFile, 1)

	outputExpectedStream := proto.UnitExpected{
		Id:             unitOutID,
		Type:           proto.UnitType_OUTPUT,
		ConfigStateIdx: 1,
		State:          proto.State_HEALTHY,
		Config: &proto.UnitExpectedConfig{
			Type: "file",
			Source: tests.RequireNewStruct(map[string]interface{}{
				"type":            "file",
				"enabled":         true,
				"path":            outPath,
				"filename":        "beat-out",
				"number_of_files": 7,
			}),
		},
	}

	observedStates := make(chan *proto.CheckinObserved)
	expectedUnits := make(chan []*proto.UnitExpected)
	done := make(chan struct{})
	// V2 mock server
	server := &mock.StubServerV2{
		CheckinV2Impl: func(observed *proto.CheckinObserved) *proto.CheckinExpected {
			select {
			case observedStates <- observed:
				return &proto.CheckinExpected{
					Units: <-expectedUnits,
				}
			case <-done:
				return nil
			}
		},
		ActionImpl: func(response *proto.ActionResponse) error {
			return nil
		},
	}
	require.NoError(t, server.Start())
	defer server.Stop()
	// Deferred calls run last-in-first-out, so done is closed before the
	// server is stopped. That releases any check-in handler still blocked on
	// observedStates once the test no longer reads from it.
	defer close(done)

	// Start the client. The variable is not named "client" so that it does
	// not shadow the imported client package.
	agentClient := client.NewV2(fmt.Sprintf(":%d", server.Port), token, client.VersionInfo{
		Name: "program",
	}, client.WithGRPCDialOptions(grpc.WithTransportCredentials(insecure.NewCredentials())))

	lbmanagement.SetManagerFactory(func(cfg *conf.C, registry *reload.Registry) (lbmanagement.Manager, error) {
		c := management.DefaultConfig()
		if err := cfg.Unpack(&c); err != nil {
			return nil, err
		}
		return management.NewV2AgentManagerWithClient(c, registry, agentClient, management.WithStopOnEmptyUnits)
	})

	go func() {
		t.Logf("Running beats...")
		// FailNow (and therefore require.*) must only be called from the
		// goroutine running the test, so report the failure with Errorf.
		if err := cmd.Filebeat().Execute(); err != nil {
			t.Errorf("filebeat exited with error: %v", err)
		}
	}()

	scenarios := []struct {
		expectedStatus proto.State
		nextInputunit  *proto.UnitExpected
	}{
		{
			proto.State_HEALTHY,
			&inputStreamIrregular,
		},
		{
			proto.State_DEGRADED,
			&inputStream,
		},
		{
			proto.State_HEALTHY,
			&inputStream,
		},
		// wait for one more checkin, just to be sure it's healthy
		{
			proto.State_HEALTHY,
			&inputStream,
		},
	}

	timer := time.NewTimer(2 * time.Minute)
	defer timer.Stop()
	id := 0
	for id < len(scenarios) {
		// Block until either a check-in arrives or the timeout fires; there
		// is deliberately no default case, which would make this loop spin.
		select {
		case observed := <-observedStates:
			state := extractState(observed.GetUnits(), unitOneID)
			// The handler is waiting for a reply, so always answer before
			// deciding whether this check-in satisfies the scenario.
			expectedUnits <- []*proto.UnitExpected{
				scenarios[id].nextInputunit,
				&outputExpectedStream,
			}
			if state != scenarios[id].expectedStatus {
				continue
			}
			// always ensure that output is healthy
			outputState := extractState(observed.GetUnits(), unitOutID)
			require.Equal(t, proto.State_HEALTHY, outputState)

			timer.Reset(2 * time.Minute)
			id++
		case <-timer.C:
			t.Fatal("timeout waiting for checkin")
		}
	}
	require.Eventually(t, func() bool {
		events := tests.ReadLogLines(t, outPath)
		return events > 0 // wait until we see one output event
	}, 15*time.Second, 1*time.Second)
}

// extractState scans units for the first entry whose Id equals idx and
// returns that unit's state. If no entry matches, it returns -1 as a
// not-found sentinel.
func extractState(units []*proto.UnitObserved, idx string) proto.State {
	for i := range units {
		if candidate := units[i]; candidate.Id == idx {
			return candidate.GetState()
		}
	}
	return -1
}

// getInputStream builds the expected input unit for a "log" input with a
// single stream that reads from path.
//
// id becomes the unit ID and stateIdx the unit's config state index. The
// stream is enabled, follows symlinks, and scans every 500ms; the unit is
// expected to be healthy.
func getInputStream(id string, path string, stateIdx int) proto.UnitExpected {
	streamSource := tests.RequireNewStruct(map[string]interface{}{
		"enabled":        true,
		"symlinks":       true,
		"type":           "log",
		"paths":          []interface{}{path},
		"scan_frequency": "500ms",
	})

	logStream := &proto.Stream{
		Id:     "filebeat/log-default-system",
		Source: streamSource,
	}

	unitConfig := &proto.UnitExpectedConfig{
		Streams:  []*proto.Stream{logStream},
		Type:     "log",
		Id:       "log-input-test",
		Name:     "log-1",
		Revision: 1,
	}

	return proto.UnitExpected{
		Id:             id,
		Type:           proto.UnitType_INPUT,
		ConfigStateIdx: uint64(stateIdx),
		State:          proto.State_HEALTHY,
		Config:         unitConfig,
	}
}
Loading