Skip to content

Commit

Permalink
review
Browse files Browse the repository at this point in the history
  • Loading branch information
urso committed Jul 2, 2020
1 parent 760fb81 commit 8b9fa3a
Showing 1 changed file with 0 additions and 5 deletions.
5 changes: 0 additions & 5 deletions filebeat/input/v2/input-cursor/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,15 +137,13 @@ func (inp *managedInput) runSource(
source Source,
pipeline beat.PipelineConnector,
) (err error) {
// Setup error recovery/reporting
defer func() {
if v := recover(); v != nil {
err = fmt.Errorf("input panic with: %+v\n%s", v, debug.Stack())
ctx.Logger.Errorf("Input crashed with: %+v", err)
}
}()

// connect to libbeat publisher pipeline
client, err := pipeline.ConnectWith(beat.ClientConfig{
CloseRef: ctx.Cancelation,
ACKEvents: newInputACKHandler(ctx.Logger),
Expand All @@ -155,18 +153,15 @@ func (inp *managedInput) runSource(
}
defer client.Close()

// lock resource for exclusive access and create cursor
resourceKey := inp.createSourceID(source)
resource, err := inp.manager.lock(ctx, resourceKey)
if err != nil {
return err
}
defer releaseResource(resource)

// Ensure we use the correct TTL by updating it now. If the resource is 'new' we will insert it into the registry now.
store.UpdateTTL(resource, inp.cleanTimeout)

// start the collection
cursor := makeCursor(store, resource)
publisher := &cursorPublisher{canceler: ctx.Cancelation, client: client, cursor: &cursor}
return inp.input.Run(ctx, source, cursor, publisher)
Expand Down

0 comments on commit 8b9fa3a

Please sign in to comment.