From 8b9fa3af7eb870336816386e792f7a78f93aa176 Mon Sep 17 00:00:00 2001 From: urso Date: Thu, 2 Jul 2020 14:37:46 +0200 Subject: [PATCH] review --- filebeat/input/v2/input-cursor/input.go | 5 ----- 1 file changed, 5 deletions(-) diff --git a/filebeat/input/v2/input-cursor/input.go b/filebeat/input/v2/input-cursor/input.go index a5399fe4c260..a768823d9f6c 100644 --- a/filebeat/input/v2/input-cursor/input.go +++ b/filebeat/input/v2/input-cursor/input.go @@ -137,7 +137,6 @@ func (inp *managedInput) runSource( source Source, pipeline beat.PipelineConnector, ) (err error) { - // Setup error recovery/reporting defer func() { if v := recover(); v != nil { err = fmt.Errorf("input panic with: %+v\n%s", v, debug.Stack()) @@ -145,7 +144,6 @@ func (inp *managedInput) runSource( } }() - // connect to libbeat publisher pipeline client, err := pipeline.ConnectWith(beat.ClientConfig{ CloseRef: ctx.Cancelation, ACKEvents: newInputACKHandler(ctx.Logger), @@ -155,7 +153,6 @@ func (inp *managedInput) runSource( } defer client.Close() - // lock resource for exclusive access and create cursor resourceKey := inp.createSourceID(source) resource, err := inp.manager.lock(ctx, resourceKey) if err != nil { @@ -163,10 +160,8 @@ func (inp *managedInput) runSource( } defer releaseResource(resource) - // Ensure we use the correct TTL by updating it now. If the resource is 'new' we will insert it into the registry now. store.UpdateTTL(resource, inp.cleanTimeout) - // start the collection cursor := makeCursor(store, resource) publisher := &cursorPublisher{canceler: ctx.Cancelation, client: client, cursor: &cursor} return inp.input.Run(ctx, source, cursor, publisher)