diff --git a/filebeat/input/filestream/internal/input-logfile/harvester.go b/filebeat/input/filestream/internal/input-logfile/harvester.go index 72635c194f47..5f926386aa79 100644 --- a/filebeat/input/filestream/internal/input-logfile/harvester.go +++ b/filebeat/input/filestream/internal/input-logfile/harvester.go @@ -19,6 +19,7 @@ package input_logfile import ( "context" + "errors" "fmt" "runtime/debug" "sync" @@ -32,6 +33,10 @@ import ( "github.com/elastic/go-concert/unison" ) +var ( + ErrHarvesterAlreadyRunning = errors.New("harvester is already running for file") +) + // Harvester is the reader which collects the lines from // the configured source. type Harvester interface { @@ -66,7 +71,7 @@ func (r *readerGroup) newContext(id string, cancelation v2.Canceler) (context.Co defer r.mu.Unlock() if _, ok := r.table[id]; ok { - return nil, nil, fmt.Errorf("harvester is already running for file") + return nil, nil, ErrHarvesterAlreadyRunning } ctx, cancel := context.WithCancel(ctxtool.FromCanceller(cancelation)) @@ -88,6 +93,14 @@ func (r *readerGroup) remove(id string) { delete(r.table, id) } +func (r *readerGroup) hasID(id string) bool { + r.mu.Lock() + defer r.mu.Unlock() + + _, ok := r.table[id] + return ok +} + // HarvesterGroup is responsible for running the // Harvesters started by the Prospector. type HarvesterGroup interface { diff --git a/filebeat/input/filestream/internal/input-logfile/harvester_test.go b/filebeat/input/filestream/internal/input-logfile/harvester_test.go new file mode 100644 index 000000000000..6bc6f2f72e60 --- /dev/null +++ b/filebeat/input/filestream/internal/input-logfile/harvester_test.go @@ -0,0 +1,325 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package input_logfile + +import ( + "context" + "fmt" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/require" + + input "github.com/elastic/beats/v7/filebeat/input/v2" + "github.com/elastic/beats/v7/libbeat/logp" + "github.com/elastic/beats/v7/libbeat/tests/resources" + "github.com/elastic/beats/v7/x-pack/dockerlogbeat/pipelinemock" + "github.com/elastic/go-concert/unison" +) + +func TestReaderGroup(t *testing.T) { + requireGroupSuccess := func(t *testing.T, ctx context.Context, cf context.CancelFunc, err error) { + require.NotNil(t, ctx) + require.NotNil(t, cf) + require.Nil(t, err) + } + + requireGroupError := func(t *testing.T, ctx context.Context, cf context.CancelFunc, err error) { + require.Nil(t, ctx) + require.Nil(t, cf) + require.Error(t, err) + } + + t.Run("assert new group is empty", func(t *testing.T) { + rg := newReaderGroup() + require.Equal(t, 0, len(rg.table)) + }) + + t.Run("assert non existent key can be removed", func(t *testing.T) { + rg := newReaderGroup() + require.Equal(t, 0, len(rg.table)) + rg.remove("no such id") + require.Equal(t, 0, len(rg.table)) + }) + + t.Run("assert inserting existing key returns error", func(t *testing.T) { + rg := newReaderGroup() + ctx, cf, err := rg.newContext("test-id", context.Background()) + requireGroupSuccess(t, ctx, cf, err) + require.Equal(t, 1, len(rg.table)) + + newCtx, newCf, err := rg.newContext("test-id", context.Background()) + requireGroupError(t, newCtx, newCf, err) + }) + + t.Run("assert new key is added, can be removed and its context is cancelled", func(t *testing.T) { + rg := newReaderGroup() + ctx, cf, err := rg.newContext("test-id", context.Background()) + requireGroupSuccess(t, ctx, cf, err) + require.Equal(t, 1, len(rg.table)) + + require.Nil(t, ctx.Err()) + rg.remove("test-id") + + require.Equal(t, 0, len(rg.table)) + require.Error(t, ctx.Err(), context.Canceled) + + newCtx, newCf, err := rg.newContext("test-id", context.Background()) + requireGroupSuccess(t, newCtx, newCf, err) + require.Equal(t, 1, len(rg.table)) + require.Nil(t, newCtx.Err()) + }) +} + +func TestDefaultHarvesterGroup(t *testing.T) { + source := &testSource{"/path/to/test"} + + requireSourceAddedToBookkeeper := func(t *testing.T, hg *defaultHarvesterGroup, s Source) { + require.True(t, hg.readers.hasID(s.Name())) + } + + requireSourceRemovedFromBookkeeper := func(t *testing.T, hg *defaultHarvesterGroup, s Source) { + require.False(t, hg.readers.hasID(s.Name())) + } + + t.Run("assert a harvester is started in a goroutine", func(t *testing.T) { + var wg sync.WaitGroup + mockHarvester := &mockHarvester{onRun: correctOnRun, wg: &wg} + hg := testDefaultHarvesterGroup(t, mockHarvester) + + gorountineChecker := resources.NewGoroutinesChecker() + defer gorountineChecker.WaitUntilOriginalCount() + + wg.Add(1) + hg.Start(input.Context{Logger: logp.L(), Cancelation: context.Background()}, source) + + // wait until harvester.Run is done + wg.Wait() + // wait until goroutine that started `harvester.Run` is finished + gorountineChecker.WaitUntilOriginalCount() + + require.Equal(t, 1, mockHarvester.getRunCount()) + + requireSourceRemovedFromBookkeeper(t, hg, source) + // stopped source can be stopped + require.Nil(t, hg.StopGroup()) + }) + + t.Run("assert a harvester can be stopped and removed from bookkeeper", func(t *testing.T) { + mockHarvester := &mockHarvester{onRun: blockUntilCancelOnRun} + hg := testDefaultHarvesterGroup(t, mockHarvester) + + gorountineChecker := resources.NewGoroutinesChecker() + + hg.Start(input.Context{Logger: logp.L(), Cancelation: context.Background()}, source) + + gorountineChecker.WaitUntilIncreased(1) + // wait until harvester is started + if mockHarvester.getRunCount() == 1 { + requireSourceAddedToBookkeeper(t, hg, source) + // after started, stop it + hg.Stop(source) + gorountineChecker.WaitUntilOriginalCount() + } + + requireSourceRemovedFromBookkeeper(t, hg, source) + }) + + t.Run("assert a harvester for same source cannot be started", func(t *testing.T) { + mockHarvester := &mockHarvester{onRun: blockUntilCancelOnRun} + hg := testDefaultHarvesterGroup(t, mockHarvester) + inputCtx := input.Context{Logger: logp.L(), Cancelation: context.Background()} + + gorountineChecker := resources.NewGoroutinesChecker() + defer gorountineChecker.WaitUntilOriginalCount() + + hg.Start(inputCtx, source) + hg.Start(inputCtx, source) + + gorountineChecker.WaitUntilIncreased(2) + // error is expected as a harvester group was expected to start twice for the same source + for !hg.readers.hasID(source.Name()) { + } + time.Sleep(3 * time.Millisecond) + + hg.Stop(source) + + err := hg.StopGroup() + require.Error(t, err) + + require.Equal(t, 1, mockHarvester.getRunCount()) + }) + + t.Run("assert a harvester panic is handled", func(t *testing.T) { + mockHarvester := &mockHarvester{onRun: panicOnRun} + hg := testDefaultHarvesterGroup(t, mockHarvester) + defer func() { + if v := recover(); v != nil { + t.Errorf("did not recover from harvester panic in defaultHarvesterGroup") + } + }() + + gorountineChecker := resources.NewGoroutinesChecker() + + hg.Start(input.Context{Logger: logp.L(), Cancelation: context.Background()}, source) + + // wait until harvester is stopped + gorountineChecker.WaitUntilOriginalCount() + + // make sure harvester had run once + require.Equal(t, 1, mockHarvester.getRunCount()) + requireSourceRemovedFromBookkeeper(t, hg, source) + + require.Nil(t, hg.StopGroup()) + }) + + t.Run("assert a harvester error is handled", func(t *testing.T) { + mockHarvester := &mockHarvester{onRun: errorOnRun} + hg := testDefaultHarvesterGroup(t, mockHarvester) + + gorountineChecker := resources.NewGoroutinesChecker() + defer gorountineChecker.WaitUntilOriginalCount() + + hg.Start(input.Context{Logger: logp.L(), Cancelation: context.Background()}, source) + + gorountineChecker.WaitUntilOriginalCount() + + requireSourceRemovedFromBookkeeper(t, hg, source) + + err := hg.StopGroup() + require.Error(t, err) + }) + + t.Run("assert already locked resource has to wait", func(t *testing.T) { + var wg sync.WaitGroup + mockHarvester := &mockHarvester{onRun: correctOnRun, wg: &wg} + hg := testDefaultHarvesterGroup(t, mockHarvester) + inputCtx := input.Context{Logger: logp.L(), Cancelation: context.Background()} + + r, err := lock(inputCtx, hg.store, source.Name()) + if err != nil { + t.Fatalf("cannot lock source") + } + + gorountineChecker := resources.NewGoroutinesChecker() + + wg.Add(1) + hg.Start(inputCtx, source) + + gorountineChecker.WaitUntilIncreased(1) + ok := false + for !ok { + // wait until harvester is added to the bookeeper + ok = hg.readers.hasID(source.Name()) + if ok { + releaseResource(r) + } + } + + // wait until harvester.Run is done + wg.Wait() + // wait until goroutine that started `harvester.Run` is finished + gorountineChecker.WaitUntilOriginalCount() + require.Equal(t, 1, mockHarvester.getRunCount()) + require.Nil(t, hg.StopGroup()) + }) + + t.Run("assert already locked resource has no problem when harvestergroup is cancelled", func(t *testing.T) { + mockHarvester := &mockHarvester{onRun: correctOnRun} + hg := testDefaultHarvesterGroup(t, mockHarvester) + inputCtx := input.Context{Logger: logp.L(), Cancelation: context.Background()} + + gorountineChecker := resources.NewGoroutinesChecker() + defer gorountineChecker.WaitUntilOriginalCount() + + r, err := lock(inputCtx, hg.store, source.Name()) + if err != nil { + t.Fatalf("cannot lock source") + } + defer releaseResource(r) + + hg.Start(inputCtx, source) + + gorountineChecker.WaitUntilIncreased(1) + require.Error(t, hg.StopGroup()) + + require.Equal(t, 0, mockHarvester.getRunCount()) + }) +} + +func testDefaultHarvesterGroup(t *testing.T, mockHarvester Harvester) *defaultHarvesterGroup { + return &defaultHarvesterGroup{ + readers: newReaderGroup(), + pipeline: &pipelinemock.MockPipelineConnector{}, + harvester: mockHarvester, + store: testOpenStore(t, "test", nil), + tg: unison.TaskGroup{}, + } +} + +type mockHarvester struct { + mu sync.Mutex + runCount int + + wg *sync.WaitGroup + onRun func(input.Context, Source, Cursor, Publisher) error +} + +func (m *mockHarvester) Run(ctx input.Context, s Source, c Cursor, p Publisher) error { + if m.wg != nil { + defer m.wg.Done() + } + + m.mu.Lock() + m.runCount += 1 + m.mu.Unlock() + + if m.onRun != nil { + return m.onRun(ctx, s, c, p) + } + return nil +} + +func (m *mockHarvester) getRunCount() int { + m.mu.Lock() + defer m.mu.Unlock() + + return m.runCount +} + +func (m *mockHarvester) Test(_ Source, _ input.TestContext) error { return nil } + +func (m *mockHarvester) Name() string { return "mock" } + +func correctOnRun(_ input.Context, _ Source, _ Cursor, _ Publisher) error { + return nil +} + +func blockUntilCancelOnRun(c input.Context, _ Source, _ Cursor, _ Publisher) error { + <-c.Cancelation.Done() + return nil +} + +func errorOnRun(_ input.Context, _ Source, _ Cursor, _ Publisher) error { + return fmt.Errorf("harvester error") +} + +func panicOnRun(_ input.Context, _ Source, _ Cursor, _ Publisher) error { + panic("don't panic") +} diff --git a/libbeat/tests/resources/goroutines.go b/libbeat/tests/resources/goroutines.go index 708ce4c201ba..3351c7b23772 100644 --- a/libbeat/tests/resources/goroutines.go +++ b/libbeat/tests/resources/goroutines.go @@ -56,15 +56,11 @@ func (c GoroutinesChecker) Check(t testing.TB) { } func (c GoroutinesChecker) check(t testing.TB) error { - timeout := time.Now().Add(c.FinalizationTimeout) - var after int - for time.Now().Before(timeout) { - after = runtime.NumGoroutine() - if after <= c.before { - return nil - } - time.Sleep(10 * time.Millisecond) + after := c.WaitUntilOriginalCount() + if after == 0 { + return nil } + profile := pprof.Lookup("goroutine") profile.WriteTo(os.Stdout, 2) return fmt.Errorf("Possible goroutines leak, before: %d, after: %d", c.before, after) @@ -78,3 +74,24 @@ func CallAndCheckGoroutines(t testing.TB, f func()) { f() c.Check(t) } + +// WaitUntilOriginalCount waits until the original number of goroutines are +// present before we has created the resource checker. +func (c GoroutinesChecker) WaitUntilOriginalCount() int { + timeout := time.Now().Add(c.FinalizationTimeout) + var after int + for time.Now().Before(timeout) { + after = runtime.NumGoroutine() + if after <= c.before { + return 0 + } + time.Sleep(10 * time.Millisecond) + } + return after +} + +func (c *GoroutinesChecker) WaitUntilIncreased(n int) { + for runtime.NumGoroutine() < c.before+n { + time.Sleep(10 * time.Millisecond) + } +}