Skip to content

Commit

Permalink
Cursor input and manager implementation (elastic#19571)
Browse files Browse the repository at this point in the history
This change finally implements the input manager and actual input execution for stateful
inputs.
  • Loading branch information
Steffen Siering committed Jul 8, 2020
1 parent 11617f3 commit e62dde2
Show file tree
Hide file tree
Showing 2 changed files with 225 additions and 7 deletions.
120 changes: 118 additions & 2 deletions filebeat/input/v2/input-cursor/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,19 @@
package cursor

import (
"context"
"fmt"
"runtime/debug"
"time"

"github.com/urso/sderr"

"github.com/elastic/go-concert/ctxtool"
"github.com/elastic/go-concert/unison"

input "github.com/elastic/beats/v7/filebeat/input/v2"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/logp"
)

// Input interface for cursor based inputs. This interface must be implemented
Expand Down Expand Up @@ -62,7 +70,29 @@ func (inp *managedInput) Name() string { return inp.input.Name() }

// Test runs the Test method for each configured source.
func (inp *managedInput) Test(ctx input.TestContext) error {
panic("TODO: implement me")
var grp unison.MultiErrGroup
for _, source := range inp.sources {
source := source
grp.Go(func() (err error) {
return inp.testSource(ctx, source)
})
}

errs := grp.Wait()
if len(errs) > 0 {
return sderr.WrapAll(errs, "input tests failed")
}
return nil
}

func (inp *managedInput) testSource(ctx input.TestContext, source Source) (err error) {
defer func() {
if v := recover(); v != nil {
err = fmt.Errorf("input panic with: %+v\n%s", v, debug.Stack())
ctx.Logger.Errorf("Input crashed with: %+v", err)
}
}()
return inp.input.Test(source, ctx)
}

// Run creates a go-routine per source, waiting until all go-routines have
Expand All @@ -73,7 +103,68 @@ func (inp *managedInput) Run(
ctx input.Context,
pipeline beat.PipelineConnector,
) (err error) {
panic("TODO: implement me")
// Setup cancellation using a custom cancel context. All workers will be
// stopped if one failed badly by returning an error.
cancelCtx, cancel := context.WithCancel(ctxtool.FromCanceller(ctx.Cancelation))
defer cancel()
ctx.Cancelation = cancelCtx

var grp unison.MultiErrGroup
for _, source := range inp.sources {
source := source
grp.Go(func() (err error) {
// refine per worker context
inpCtx := ctx
inpCtx.ID = ctx.ID + "::" + source.Name()
inpCtx.Logger = ctx.Logger.With("source", source.Name())

if err = inp.runSource(inpCtx, inp.manager.store, source, pipeline); err != nil {
cancel()
}
return err
})
}

if errs := grp.Wait(); len(errs) > 0 {
return sderr.WrapAll(errs, "input %{id} failed", ctx.ID)
}
return nil
}

func (inp *managedInput) runSource(
ctx input.Context,
store *store,
source Source,
pipeline beat.PipelineConnector,
) (err error) {
defer func() {
if v := recover(); v != nil {
err = fmt.Errorf("input panic with: %+v\n%s", v, debug.Stack())
ctx.Logger.Errorf("Input crashed with: %+v", err)
}
}()

client, err := pipeline.ConnectWith(beat.ClientConfig{
CloseRef: ctx.Cancelation,
ACKEvents: newInputACKHandler(ctx.Logger),
})
if err != nil {
return err
}
defer client.Close()

resourceKey := inp.createSourceID(source)
resource, err := inp.manager.lock(ctx, resourceKey)
if err != nil {
return err
}
defer releaseResource(resource)

store.UpdateTTL(resource, inp.cleanTimeout)

cursor := makeCursor(store, resource)
publisher := &cursorPublisher{canceler: ctx.Cancelation, client: client, cursor: &cursor}
return inp.input.Run(ctx, source, cursor, publisher)
}

func (inp *managedInput) createSourceID(s Source) string {
Expand All @@ -82,3 +173,28 @@ func (inp *managedInput) createSourceID(s Source) string {
}
return fmt.Sprintf("%v::%v", inp.manager.Type, s.Name())
}

func newInputACKHandler(log *logp.Logger) func([]interface{}) {
return func(private []interface{}) {
var n uint
var last int
for i := 0; i < len(private); i++ {
current := private[i]
if current == nil {
continue
}

if _, ok := current.(*updateOp); !ok {
continue
}

n++
last = i
}

if n == 0 {
return
}
private[last].(*updateOp).Execute(n)
}
}
112 changes: 107 additions & 5 deletions filebeat/input/v2/input-cursor/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,19 @@
package cursor

import (
"errors"
"sync"
"time"

"github.com/urso/sderr"

"github.com/elastic/go-concert/unison"

input "github.com/elastic/beats/v7/filebeat/input/v2"
v2 "github.com/elastic/beats/v7/filebeat/input/v2"
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/logp"
"github.com/elastic/beats/v7/libbeat/statestore"

"github.com/elastic/go-concert/unison"
)

// InputManager is used to create, manage, and coordinate stateful inputs and
Expand Down Expand Up @@ -60,7 +64,9 @@ type InputManager struct {
// that will be used to collect events from each source.
Configure func(cfg *common.Config) ([]Source, Input, error)

store *store
initOnce sync.Once
initErr error
store *store
}

// Source describe a source the input can collect data from.
Expand All @@ -70,22 +76,118 @@ type Source interface {
Name() string
}

var errNoSourceConfigured = errors.New("no source has been configured")
var errNoInputRunner = errors.New("no input runner available")

// StateStore interface and configurations used to give the Manager access to the persistent store.
type StateStore interface {
Access() (*statestore.Store, error)
CleanupInterval() time.Duration
}

func (cim *InputManager) init() error {
cim.initOnce.Do(func() {
if cim.DefaultCleanTimeout <= 0 {
cim.DefaultCleanTimeout = 30 * time.Minute
}

log := cim.Logger.With("input_type", cim.Type)
var store *store
store, cim.initErr = openStore(log, cim.StateStore, cim.Type)
if cim.initErr != nil {
return
}

cim.store = store
})

return cim.initErr
}

// Init starts background processes for deleting old entries from the
// persistent store if mode is ModeRun.
func (cim *InputManager) Init(group unison.Group, mode v2.Mode) error {
panic("TODO: implement me")
if mode != v2.ModeRun {
return nil
}

if err := cim.init(); err != nil {
return err
}

log := cim.Logger.With("input_type", cim.Type)

store := cim.store
cleaner := &cleaner{log: log}
store.Retain()
err := group.Go(func(canceler unison.Canceler) error {
defer cim.shutdown()
defer store.Release()
interval := cim.StateStore.CleanupInterval()
if interval <= 0 {
interval = 5 * time.Minute
}
cleaner.run(canceler, store, interval)
return nil
})
if err != nil {
store.Release()
cim.shutdown()
return sderr.Wrap(err, "Can not start registry cleanup process")
}

return nil
}

func (cim *InputManager) shutdown() {
cim.store.Release()
}

// Create builds a new v2.Input using the provided Configure function.
// The Input will run a go-routine per source that has been configured.
func (cim *InputManager) Create(config *common.Config) (input.Input, error) {
panic("TODO: implement me")
if err := cim.init(); err != nil {
return nil, err
}

settings := struct {
ID string `config:"id"`
CleanTimeout time.Duration `config:"clean_timeout"`
}{ID: "", CleanTimeout: cim.DefaultCleanTimeout}
if err := config.Unpack(&settings); err != nil {
return nil, err
}

sources, inp, err := cim.Configure(config)
if err != nil {
return nil, err
}
if len(sources) == 0 {
return nil, errNoSourceConfigured
}
if inp == nil {
return nil, errNoInputRunner
}

return &managedInput{
manager: cim,
userID: settings.ID,
sources: sources,
input: inp,
cleanTimeout: settings.CleanTimeout,
}, nil
}

// Lock locks a key for exclusive access and returns an resource that can be used to modify
// the cursor state and unlock the key.
func (cim *InputManager) lock(ctx input.Context, key string) (*resource, error) {
resource := cim.store.Get(key)
err := lockResource(ctx.Logger, resource, ctx.Cancelation)
if err != nil {
resource.Release()
return nil, err
}
return resource, nil
}

func lockResource(log *logp.Logger, resource *resource, canceler input.Canceler) error {
Expand Down

0 comments on commit e62dde2

Please sign in to comment.