diff --git a/filebeat/input/v2/input-cursor/input.go b/filebeat/input/v2/input-cursor/input.go index 94faeeaadda8..a768823d9f6c 100644 --- a/filebeat/input/v2/input-cursor/input.go +++ b/filebeat/input/v2/input-cursor/input.go @@ -18,11 +18,19 @@ package cursor import ( + "context" "fmt" + "runtime/debug" "time" + "github.com/urso/sderr" + + "github.com/elastic/go-concert/ctxtool" + "github.com/elastic/go-concert/unison" + input "github.com/elastic/beats/v7/filebeat/input/v2" "github.com/elastic/beats/v7/libbeat/beat" + "github.com/elastic/beats/v7/libbeat/logp" ) // Input interface for cursor based inputs. This interface must be implemented @@ -62,7 +70,29 @@ func (inp *managedInput) Name() string { return inp.input.Name() } // Test runs the Test method for each configured source. func (inp *managedInput) Test(ctx input.TestContext) error { - panic("TODO: implement me") + var grp unison.MultiErrGroup + for _, source := range inp.sources { + source := source + grp.Go(func() (err error) { + return inp.testSource(ctx, source) + }) + } + + errs := grp.Wait() + if len(errs) > 0 { + return sderr.WrapAll(errs, "input tests failed") + } + return nil +} + +func (inp *managedInput) testSource(ctx input.TestContext, source Source) (err error) { + defer func() { + if v := recover(); v != nil { + err = fmt.Errorf("input panic with: %+v\n%s", v, debug.Stack()) + ctx.Logger.Errorf("Input crashed with: %+v", err) + } + }() + return inp.input.Test(source, ctx) } // Run creates a go-routine per source, waiting until all go-routines have @@ -73,7 +103,68 @@ func (inp *managedInput) Run( ctx input.Context, pipeline beat.PipelineConnector, ) (err error) { - panic("TODO: implement me") + // Setup cancellation using a custom cancel context. All workers will be + // stopped if one failed badly by returning an error. + cancelCtx, cancel := context.WithCancel(ctxtool.FromCanceller(ctx.Cancelation)) + defer cancel() + ctx.Cancelation = cancelCtx + + var grp unison.MultiErrGroup + for _, source := range inp.sources { + source := source + grp.Go(func() (err error) { + // refine per worker context + inpCtx := ctx + inpCtx.ID = ctx.ID + "::" + source.Name() + inpCtx.Logger = ctx.Logger.With("source", source.Name()) + + if err = inp.runSource(inpCtx, inp.manager.store, source, pipeline); err != nil { + cancel() + } + return err + }) + } + + if errs := grp.Wait(); len(errs) > 0 { + return sderr.WrapAll(errs, "input %{id} failed", ctx.ID) + } + return nil +} + +func (inp *managedInput) runSource( + ctx input.Context, + store *store, + source Source, + pipeline beat.PipelineConnector, +) (err error) { + defer func() { + if v := recover(); v != nil { + err = fmt.Errorf("input panic with: %+v\n%s", v, debug.Stack()) + ctx.Logger.Errorf("Input crashed with: %+v", err) + } + }() + + client, err := pipeline.ConnectWith(beat.ClientConfig{ + CloseRef: ctx.Cancelation, + ACKEvents: newInputACKHandler(ctx.Logger), + }) + if err != nil { + return err + } + defer client.Close() + + resourceKey := inp.createSourceID(source) + resource, err := inp.manager.lock(ctx, resourceKey) + if err != nil { + return err + } + defer releaseResource(resource) + + store.UpdateTTL(resource, inp.cleanTimeout) + + cursor := makeCursor(store, resource) + publisher := &cursorPublisher{canceler: ctx.Cancelation, client: client, cursor: &cursor} + return inp.input.Run(ctx, source, cursor, publisher) } func (inp *managedInput) createSourceID(s Source) string { @@ -82,3 +173,28 @@ func (inp *managedInput) createSourceID(s Source) string { } return fmt.Sprintf("%v::%v", inp.manager.Type, s.Name()) } + +func newInputACKHandler(log *logp.Logger) func([]interface{}) { + return func(private []interface{}) { + var n uint + var last int + for i := 0; i < len(private); i++ { + current := private[i] + if current == nil { + continue + } + + if _, ok := current.(*updateOp); !ok { + continue + } + + n++ + last = i + } + + if n == 0 { + return + } + private[last].(*updateOp).Execute(n) + } +} diff --git a/filebeat/input/v2/input-cursor/manager.go b/filebeat/input/v2/input-cursor/manager.go index ee1b2bc7939b..2a4310dc778e 100644 --- a/filebeat/input/v2/input-cursor/manager.go +++ b/filebeat/input/v2/input-cursor/manager.go @@ -18,15 +18,19 @@ package cursor import ( + "errors" + "sync" "time" + "github.com/urso/sderr" + + "github.com/elastic/go-concert/unison" + input "github.com/elastic/beats/v7/filebeat/input/v2" v2 "github.com/elastic/beats/v7/filebeat/input/v2" "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/logp" "github.com/elastic/beats/v7/libbeat/statestore" - - "github.com/elastic/go-concert/unison" ) // InputManager is used to create, manage, and coordinate stateful inputs and @@ -60,7 +64,9 @@ type InputManager struct { // that will be used to collect events from each source. Configure func(cfg *common.Config) ([]Source, Input, error) - store *store + initOnce sync.Once + initErr error + store *store } // Source describe a source the input can collect data from. @@ -70,22 +76,118 @@ type Source interface { Name() string } +var errNoSourceConfigured = errors.New("no source has been configured") +var errNoInputRunner = errors.New("no input runner available") + // StateStore interface and configurations used to give the Manager access to the persistent store. type StateStore interface { Access() (*statestore.Store, error) CleanupInterval() time.Duration } +func (cim *InputManager) init() error { + cim.initOnce.Do(func() { + if cim.DefaultCleanTimeout <= 0 { + cim.DefaultCleanTimeout = 30 * time.Minute + } + + log := cim.Logger.With("input_type", cim.Type) + var store *store + store, cim.initErr = openStore(log, cim.StateStore, cim.Type) + if cim.initErr != nil { + return + } + + cim.store = store + }) + + return cim.initErr +} + // Init starts background processes for deleting old entries from the // persistent store if mode is ModeRun. func (cim *InputManager) Init(group unison.Group, mode v2.Mode) error { - panic("TODO: implement me") + if mode != v2.ModeRun { + return nil + } + + if err := cim.init(); err != nil { + return err + } + + log := cim.Logger.With("input_type", cim.Type) + + store := cim.store + cleaner := &cleaner{log: log} + store.Retain() + err := group.Go(func(canceler unison.Canceler) error { + defer cim.shutdown() + defer store.Release() + interval := cim.StateStore.CleanupInterval() + if interval <= 0 { + interval = 5 * time.Minute + } + cleaner.run(canceler, store, interval) + return nil + }) + if err != nil { + store.Release() + cim.shutdown() + return sderr.Wrap(err, "Can not start registry cleanup process") + } + + return nil +} + +func (cim *InputManager) shutdown() { + cim.store.Release() } // Create builds a new v2.Input using the provided Configure function. // The Input will run a go-routine per source that has been configured. func (cim *InputManager) Create(config *common.Config) (input.Input, error) { - panic("TODO: implement me") + if err := cim.init(); err != nil { + return nil, err + } + + settings := struct { + ID string `config:"id"` + CleanTimeout time.Duration `config:"clean_timeout"` + }{ID: "", CleanTimeout: cim.DefaultCleanTimeout} + if err := config.Unpack(&settings); err != nil { + return nil, err + } + + sources, inp, err := cim.Configure(config) + if err != nil { + return nil, err + } + if len(sources) == 0 { + return nil, errNoSourceConfigured + } + if inp == nil { + return nil, errNoInputRunner + } + + return &managedInput{ + manager: cim, + userID: settings.ID, + sources: sources, + input: inp, + cleanTimeout: settings.CleanTimeout, + }, nil +} + +// Lock locks a key for exclusive access and returns an resource that can be used to modify +// the cursor state and unlock the key. +func (cim *InputManager) lock(ctx input.Context, key string) (*resource, error) { + resource := cim.store.Get(key) + err := lockResource(ctx.Logger, resource, ctx.Cancelation) + if err != nil { + resource.Release() + return nil, err + } + return resource, nil } func lockResource(log *logp.Logger, resource *resource, canceler input.Canceler) error {