Skip to content

Commit

Permalink
Cherry-pick #19632 to 7.x: Composable ACKer (#19742)
Browse files Browse the repository at this point in the history
This change replaces the ACK handler functions with a single interface
that makes it easier to combine ACK handlers.
The global ACK handler is removed from the pipeline, requiring Beats to
wrap and compose per input ACK handlers with their own ones.

Although the PR is quite big, the main difference is that the `ACKCount`, `ACKEvents`, and `ACKLastEvents` handlers have been replaced by a single interface (`beat.ACKer`). The original ACKer implementations from `libbeat/publisher/pipeline/acker.go` and `libbeat/publisher/pipeline/client_acker.go` have been moved `libbeat/common/acker`. The former private implementation is now exposed as Helpers for writing and combining ACK handlers. Support for global ACK handlers has been removed. The `acker.Combine` and `acker.ConnectionOnly` are the only new additions to the code base.

The global ACK handler support was introduced for filebeat, that did require some support for combine events from multiple inputs before applying state updates. With the introduction of the v2 input API this requirement will go away, as per input type managers are responsible for handling state update and ACKs.

In order to run old and new architecture in parallel, we need to combine ACK handling from input managers, existing input, custom registrar ACKer in filebeat, and event counting support (also via ACK handling) for shutdown. Exposing the interface and providing combinators (acker.Combine) for merging ACK handlers into one helps with the integration.

The v2 Input API gives implementors more flexibility in how to handle event publishing, coordination, and state handling shall be implemented. With the original ACK support the callbacks have been deregistered the moment inputs are stopped automatically. But for cursor based inputs we need to continue handling ACKs, even after the input is gone. The interface and helpers provide greater control over ACK handling after shutdown, which is required for the journald, winlog, and file/log inputs.

(cherry picked from commit bb89344)
  • Loading branch information
Steffen Siering authored Jul 8, 2020
1 parent 29ef3bd commit a36910a
Show file tree
Hide file tree
Showing 34 changed files with 955 additions and 1,480 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG-developer.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ The list below covers the major changes between 7.0.0-rc2 and master only.
- `management.ConfigManager` has been renamed to `management.Manager`. {pull}19114[19114]
- `UpdateStatus` has been added to the `management.Manager` interface. {pull}19114[19114]
- Remove `common.MapStrPointer` parameter from `cfgfile.Runnerfactory` interface. {pull}19135[19135]
- Replace `ACKCount`, `ACKEvents`, and `ACKLastEvent` callbacks with `ACKHandler` and interface in `beat.ClientConfig`. {pull}19632[19632]
- Remove global ACK handler support via `SetACKHandler` from publisher pipeline. {pull}19632[19632]

==== Bugfixes

Expand Down
64 changes: 30 additions & 34 deletions filebeat/beater/acker.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,11 @@ package beater

import (
"github.com/elastic/beats/v7/filebeat/input/file"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/common/acker"
"github.com/elastic/beats/v7/libbeat/logp"
)

// eventAcker handles publisher pipeline ACKs and forwards
// them to the registrar or directly to the stateless logger.
type eventACKer struct {
stateful statefulLogger
stateless statelessLogger
log *logp.Logger
}

type statefulLogger interface {
Published(states []file.State)
}
Expand All @@ -38,35 +32,37 @@ type statelessLogger interface {
Published(c int) bool
}

func newEventACKer(stateless statelessLogger, stateful statefulLogger) *eventACKer {
return &eventACKer{stateless: stateless, stateful: stateful, log: logp.NewLogger("acker")}
}
// eventAcker handles publisher pipeline ACKs and forwards
// them to the registrar or directly to the stateless logger.
func eventACKer(statelessOut statelessLogger, statefulOut statefulLogger) beat.ACKer {
log := logp.NewLogger("acker")

func (a *eventACKer) ackEvents(data []interface{}) {
stateless := 0
states := make([]file.State, 0, len(data))
for _, datum := range data {
if datum == nil {
stateless++
continue
}
return acker.EventPrivateReporter(func(_ int, data []interface{}) {
stateless := 0
states := make([]file.State, 0, len(data))
for _, datum := range data {
if datum == nil {
stateless++
continue
}

st, ok := datum.(file.State)
if !ok {
stateless++
continue
}
st, ok := datum.(file.State)
if !ok {
stateless++
continue
}

states = append(states, st)
}
states = append(states, st)
}

if len(states) > 0 {
a.log.Debugw("stateful ack", "count", len(states))
a.stateful.Published(states)
}
if len(states) > 0 {
log.Debugw("stateful ack", "count", len(states))
statefulOut.Published(states)
}

if stateless > 0 {
a.log.Debugw("stateless ack", "count", stateless)
a.stateless.Published(stateless)
}
if stateless > 0 {
log.Debugw("stateless ack", "count", stateless)
statelessOut.Published(stateless)
}
})
}
9 changes: 7 additions & 2 deletions filebeat/beater/acker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/stretchr/testify/assert"

"github.com/elastic/beats/v7/filebeat/input/file"
"github.com/elastic/beats/v7/libbeat/beat"
)

type mockStatefulLogger struct {
Expand Down Expand Up @@ -78,9 +79,13 @@ func TestACKer(t *testing.T) {
sl := &mockStatelessLogger{}
sf := &mockStatefulLogger{}

h := newEventACKer(sl, sf)
h := eventACKer(sl, sf)

h.ackEvents(test.data)
for _, datum := range test.data {
h.AddEvent(beat.Event{Private: datum}, true)
}

h.ACKEvents(len(test.data))
assert.Equal(t, test.stateless, sl.count)
assert.Equal(t, test.stateful, sf.states)
})
Expand Down
24 changes: 14 additions & 10 deletions filebeat/beater/filebeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -322,16 +322,20 @@ func (fb *Filebeat) Run(b *beat.Beat) error {
// Make sure all events that were published in
registrarChannel := newRegistrarLogger(registrar)

err = b.Publisher.SetACKHandler(beat.PipelineACKHandler{
ACKEvents: newEventACKer(finishedLogger, registrarChannel).ackEvents,
})
if err != nil {
logp.Err("Failed to install the registry with the publisher pipeline: %v", err)
return err
}

fb.pipeline = pipetool.WithDefaultGuarantees(b.Publisher, beat.GuaranteedSend)
fb.pipeline = withPipelineEventCounter(fb.pipeline, wgEvents)
// setup event counting for startup and a global common ACKer, such that all events will be
// routed to the reigstrar after they've been ACKed.
// Events with Private==nil or the type of private != file.State are directly
// forwarded to `finishedLogger`. Events from the `logs` input will first be forwarded
// to the registrar via `registrarChannel`, which finally forwards the events to finishedLogger as well.
// The finishedLogger decrements the counters in wgEvents after all events have been securely processed
// by the registry.
fb.pipeline = withPipelineEventCounter(b.Publisher, wgEvents)
fb.pipeline = pipetool.WithACKer(fb.pipeline, eventACKer(finishedLogger, registrarChannel))

// Filebeat by default required infinite retry. Let's configure this for all
// inputs by default. Inputs (and InputController) can overwrite the sending
// guarantees explicitly when connecting with the pipeline.
fb.pipeline = pipetool.WithDefaultGuarantees(fb.pipeline, beat.GuaranteedSend)

outDone := make(chan struct{}) // outDone closes down all active pipeline connections
pipelineConnector := channel.NewOutletFactory(outDone).Create
Expand Down
15 changes: 9 additions & 6 deletions filebeat/input/kafka/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/elastic/beats/v7/filebeat/input"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/common/acker"
"github.com/elastic/beats/v7/libbeat/common/backoff"
"github.com/elastic/beats/v7/libbeat/common/kafka"
"github.com/elastic/beats/v7/libbeat/logp"
Expand Down Expand Up @@ -69,13 +70,15 @@ func NewInput(
}

out, err := connector.ConnectWith(cfg, beat.ClientConfig{
ACKEvents: func(events []interface{}) {
for _, event := range events {
if meta, ok := event.(eventMeta); ok {
meta.handler.ack(meta.message)
ACKHandler: acker.ConnectionOnly(
acker.EventPrivateReporter(func(_ int, events []interface{}) {
for _, event := range events {
if meta, ok := event.(eventMeta); ok {
meta.handler.ack(meta.message)
}
}
}
},
}),
),
CloseRef: doneChannelContext(inputContext.Done),
WaitClose: config.WaitClose,
})
Expand Down
11 changes: 6 additions & 5 deletions filebeat/input/v2/input-cursor/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (

input "github.com/elastic/beats/v7/filebeat/input/v2"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/common/acker"
"github.com/elastic/beats/v7/libbeat/logp"
)

Expand Down Expand Up @@ -145,8 +146,8 @@ func (inp *managedInput) runSource(
}()

client, err := pipeline.ConnectWith(beat.ClientConfig{
CloseRef: ctx.Cancelation,
ACKEvents: newInputACKHandler(ctx.Logger),
CloseRef: ctx.Cancelation,
ACKHandler: newInputACKHandler(ctx.Logger),
})
if err != nil {
return err
Expand Down Expand Up @@ -174,8 +175,8 @@ func (inp *managedInput) createSourceID(s Source) string {
return fmt.Sprintf("%v::%v", inp.manager.Type, s.Name())
}

func newInputACKHandler(log *logp.Logger) func([]interface{}) {
return func(private []interface{}) {
func newInputACKHandler(log *logp.Logger) beat.ACKer {
return acker.EventPrivateReporter(func(acked int, private []interface{}) {
var n uint
var last int
for i := 0; i < len(private); i++ {
Expand All @@ -196,5 +197,5 @@ func newInputACKHandler(log *logp.Logger) func([]interface{}) {
return
}
private[last].(*updateOp).Execute(n)
}
})
}
22 changes: 7 additions & 15 deletions filebeat/input/v2/input-cursor/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -434,24 +434,16 @@ func TestManager_InputsRun(t *testing.T) {
defer cancel()

// setup publishing pipeline and capture ACKer, so we can simulate progress in the Output
var acker func([]interface{})
var activeEventPrivate []interface{}

ackEvents := func(n int) {
data, rest := activeEventPrivate[:n], activeEventPrivate[n:]
activeEventPrivate = rest
acker(data)
}

var acker beat.ACKer
var wgACKer sync.WaitGroup
wgACKer.Add(1)
pipeline := &pubtest.FakeConnector{
ConnectFunc: func(cfg beat.ClientConfig) (beat.Client, error) {
defer wgACKer.Done()
acker = cfg.ACKEvents
acker = cfg.ACKHandler
return &pubtest.FakeClient{
PublishFunc: func(event beat.Event) {
activeEventPrivate = append(activeEventPrivate, event.Private)
acker.AddEvent(event, true)
},
}, nil
},
Expand All @@ -478,19 +470,19 @@ func TestManager_InputsRun(t *testing.T) {
require.Equal(t, nil, store.snapshot()["test::key"].Cursor)

// ACK first 2 events and check snapshot state
ackEvents(2)
acker.ACKEvents(2)
require.Equal(t, "test-cursor-state2", store.snapshot()["test::key"].Cursor)

// ACK 1 events and check snapshot state (3 events published)
ackEvents(1)
acker.ACKEvents(1)
require.Equal(t, "test-cursor-state3", store.snapshot()["test::key"].Cursor)

// ACK event without cursor update and check snapshot state not modified
ackEvents(1)
acker.ACKEvents(1)
require.Equal(t, "test-cursor-state3", store.snapshot()["test::key"].Cursor)

// ACK rest
ackEvents(3)
acker.ACKEvents(3)
require.Equal(t, "test-cursor-state6", store.snapshot()["test::key"].Cursor)
})
}
Expand Down
28 changes: 12 additions & 16 deletions journalbeat/beater/journalbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,10 @@ import (
"github.com/elastic/beats/v7/journalbeat/input"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/common/acker"
"github.com/elastic/beats/v7/libbeat/common/cfgwarn"
"github.com/elastic/beats/v7/libbeat/logp"
"github.com/elastic/beats/v7/libbeat/publisher/pipetool"

"github.com/elastic/beats/v7/journalbeat/config"
_ "github.com/elastic/beats/v7/journalbeat/include"
Expand Down Expand Up @@ -67,7 +69,7 @@ func New(b *beat.Beat, cfg *common.Config) (beat.Beater, error) {

var inputs []*input.Input
for _, c := range config.Inputs {
i, err := input.New(c, b, done, cp.States())
i, err := input.New(c, b.Info, done, cp.States())
if err != nil {
return nil, err
}
Expand All @@ -91,34 +93,28 @@ func (bt *Journalbeat) Run(b *beat.Beat) error {
bt.logger.Info("journalbeat is running! Hit CTRL-C to stop it.")
defer bt.logger.Info("journalbeat is stopping")

err := bt.pipeline.SetACKHandler(beat.PipelineACKHandler{
ACKLastEvents: func(data []interface{}) {
for _, datum := range data {
if st, ok := datum.(checkpoint.JournalState); ok {
bt.checkpoint.PersistState(st)
}
}
},
})
if err != nil {
return err
}
defer bt.checkpoint.Shutdown()

pipeline := pipetool.WithACKer(b.Publisher, acker.LastEventPrivateReporter(func(_ int, private interface{}) {
if st, ok := private.(checkpoint.JournalState); ok {
bt.checkpoint.PersistState(st)
}
}))

var wg sync.WaitGroup
for _, i := range bt.inputs {
wg.Add(1)
go bt.runInput(i, &wg)
go bt.runInput(i, &wg, pipeline)
}

wg.Wait()

return nil
}

func (bt *Journalbeat) runInput(i *input.Input, wg *sync.WaitGroup) {
func (bt *Journalbeat) runInput(i *input.Input, wg *sync.WaitGroup, pipeline beat.Pipeline) {
defer wg.Done()
i.Run()
i.Run(pipeline)
}

// Stop stops the beat and its inputs.
Expand Down
15 changes: 7 additions & 8 deletions journalbeat/input/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (

"github.com/elastic/beats/v7/libbeat/processors/add_formatted_index"

"github.com/elastic/beats/v7/libbeat/common/acker"
"github.com/elastic/beats/v7/libbeat/common/fmtstr"

"github.com/elastic/beats/v7/journalbeat/checkpoint"
Expand All @@ -38,7 +39,6 @@ type Input struct {
readers []*reader.Reader
done chan struct{}
config Config
pipeline beat.Pipeline
client beat.Client
states map[string]checkpoint.JournalState
logger *logp.Logger
Expand All @@ -49,7 +49,7 @@ type Input struct {
// New returns a new Inout
func New(
c *common.Config,
b *beat.Beat,
info beat.Info,
done chan struct{},
states map[string]checkpoint.JournalState,
) (*Input, error) {
Expand Down Expand Up @@ -104,7 +104,7 @@ func New(
readers = append(readers, r)
}

inputProcessors, err := processorsForInput(b.Info, config)
inputProcessors, err := processorsForInput(info, config)
if err != nil {
return nil, err
}
Expand All @@ -115,7 +115,6 @@ func New(
readers: readers,
done: done,
config: config,
pipeline: b.Publisher,
states: states,
logger: logger,
eventMeta: config.EventMetadata,
Expand All @@ -125,18 +124,18 @@ func New(

// Run connects to the output, collects entries from the readers
// and then publishes the events.
func (i *Input) Run() {
func (i *Input) Run(pipeline beat.Pipeline) {
var err error
i.client, err = i.pipeline.ConnectWith(beat.ClientConfig{
i.client, err = pipeline.ConnectWith(beat.ClientConfig{
PublishMode: beat.GuaranteedSend,
Processing: beat.ProcessingConfig{
EventMetadata: i.eventMeta,
Meta: nil,
Processor: i.processors,
},
ACKCount: func(n int) {
ACKHandler: acker.Counting(func(n int) {
i.logger.Debugw("journalbeat successfully published events", "event.count", n)
},
}),
})
if err != nil {
i.logger.Error("Error connecting to output: %v", err)
Expand Down
Loading

0 comments on commit a36910a

Please sign in to comment.