From 5367a39fd6a1c4a7abafcb31834ced8cfdd6fd12 Mon Sep 17 00:00:00 2001 From: urso Date: Tue, 30 Jun 2020 23:16:44 +0200 Subject: [PATCH] Add publisher implementation for stateful inputs --- filebeat/input/v2/input-cursor/publish.go | 53 +++++- .../input/v2/input-cursor/publish_test.go | 158 ++++++++++++++++++ 2 files changed, 207 insertions(+), 4 deletions(-) create mode 100644 filebeat/input/v2/input-cursor/publish_test.go diff --git a/filebeat/input/v2/input-cursor/publish.go b/filebeat/input/v2/input-cursor/publish.go index b5d33a7fce4..64096951132 100644 --- a/filebeat/input/v2/input-cursor/publish.go +++ b/filebeat/input/v2/input-cursor/publish.go @@ -23,6 +23,7 @@ import ( input "github.com/elastic/beats/v7/filebeat/input/v2" "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/common/transform/typeconv" + "github.com/elastic/beats/v7/libbeat/statestore" ) // Publisher is used to publish an event and update the cursor in a single call to Publish. @@ -64,12 +65,25 @@ type updateOp struct { // The ACK ordering in the publisher pipeline guarantees that update operations // will be ACKed and executed in the correct order. func (c *cursorPublisher) Publish(event beat.Event, cursorUpdate interface{}) error { - panic("TODO: implement me") + if cursorUpdate == nil { + return c.forward(event) + } + + op, err := createUpdateOp(c.cursor.store, c.cursor.resource, cursorUpdate) + if err != nil { + return err + } + + event.Private = op + return c.forward(event) } -// Execute updates the persistent store with the scheduled changes and releases the resource. -func (op *updateOp) Execute(numEvents uint) { - panic("TODO: implement me") +func (c *cursorPublisher) forward(event beat.Event) error { + c.client.Publish(event) + if c.canceler == nil { + return nil + } + return c.canceler.Err() } func createUpdateOp(store *store, resource *resource, updates interface{}) (*updateOp, error) { @@ -106,3 +120,34 @@ func (op *updateOp) done(n uint) { op.resource = nil *op = updateOp{} } + +// Execute updates the persistent store with the scheduled changes and releases the resource. +func (op *updateOp) Execute(n uint) { + resource := op.resource + defer op.done(n) + + resource.stateMutex.Lock() + defer resource.stateMutex.Unlock() + + resource.activeCursorOperations -= n + if resource.activeCursorOperations == 0 { + resource.cursor = resource.pendingCursor + resource.pendingCursor = nil + } else { + typeconv.Convert(&resource.cursor, op.delta) + } + + if resource.internalState.Updated.Before(op.timestamp) { + resource.internalState.Updated = op.timestamp + } + + err := op.store.persistentStore.Set(resource.key, resource.inSyncStateSnapshot()) + if err != nil { + if !statestore.IsClosed(err) { + op.store.log.Errorf("Failed to update state in the registry for '%v'", resource.key) + } + } else { + resource.internalInSync = true + resource.stored = true + } +} diff --git a/filebeat/input/v2/input-cursor/publish_test.go b/filebeat/input/v2/input-cursor/publish_test.go new file mode 100644 index 00000000000..28c274baf94 --- /dev/null +++ b/filebeat/input/v2/input-cursor/publish_test.go @@ -0,0 +1,158 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package cursor + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/elastic/beats/v7/libbeat/beat" + pubtest "github.com/elastic/beats/v7/libbeat/publisher/testing" +) + +func TestPublish(t *testing.T) { + t.Run("event with cursor state creates update operation", func(t *testing.T) { + store := testOpenStore(t, "test", createSampleStore(t, nil)) + defer store.Release() + cursor := makeCursor(store, store.Get("test::key")) + + var actual beat.Event + client := &pubtest.FakeClient{ + PublishFunc: func(event beat.Event) { actual = event }, + } + publisher := cursorPublisher{nil, client, &cursor} + publisher.Publish(beat.Event{}, "test") + + require.NotNil(t, actual.Private) + }) + + t.Run("event without cursor creates no update operation", func(t *testing.T) { + store := testOpenStore(t, "test", createSampleStore(t, nil)) + defer store.Release() + cursor := makeCursor(store, store.Get("test::key")) + + var actual beat.Event + client := &pubtest.FakeClient{ + PublishFunc: func(event beat.Event) { actual = event }, + } + publisher := cursorPublisher{nil, client, &cursor} + publisher.Publish(beat.Event{}, nil) + require.Nil(t, actual.Private) + }) + + t.Run("publish returns error if context has been cancelled", func(t *testing.T) { + ctx, cancel := context.WithCancel(context.TODO()) + cancel() + + store := testOpenStore(t, "test", createSampleStore(t, nil)) + defer store.Release() + cursor := makeCursor(store, store.Get("test::key")) + + publisher := cursorPublisher{ctx, &pubtest.FakeClient{}, &cursor} + err := publisher.Publish(beat.Event{}, nil) + require.Equal(t, context.Canceled, err) + }) +} + +func TestOp_Execute(t *testing.T) { + t.Run("applying final op marks the key as finished", func(t *testing.T) { + store := testOpenStore(t, "test", createSampleStore(t, nil)) + defer store.Release() + res := store.Get("test::key") + + // create op and release resource. The 'resource' must still be active + op := mustCreateUpdateOp(t, store, res, "test-updated-cursor-state") + res.Release() + require.False(t, res.Finished()) + + // this was the last op, the resource should become inactive + op.Execute(1) + require.True(t, res.Finished()) + + // validate state: + inSyncCursor := storeInSyncSnapshot(store)["test::key"].Cursor + inMemCursor := storeMemorySnapshot(store)["test::key"].Cursor + want := "test-updated-cursor-state" + assert.Equal(t, want, inSyncCursor) + assert.Equal(t, want, inMemCursor) + }) + + t.Run("acking multiple ops applies the latest update and marks key as finished", func(t *testing.T) { + // when acking N events, intermediate updates are dropped in favor of the latest update operation. + // This test checks that the resource is correctly marked as finished. + + store := testOpenStore(t, "test", createSampleStore(t, nil)) + defer store.Release() + res := store.Get("test::key") + + // create update operations and release resource. The 'resource' must still be active + mustCreateUpdateOp(t, store, res, "test-updated-cursor-state-dropped") + op := mustCreateUpdateOp(t, store, res, "test-updated-cursor-state-final") + res.Release() + require.False(t, res.Finished()) + + // this was the last op, the resource should become inactive + op.Execute(2) + require.True(t, res.Finished()) + + // validate state: + inSyncCursor := storeInSyncSnapshot(store)["test::key"].Cursor + inMemCursor := storeMemorySnapshot(store)["test::key"].Cursor + want := "test-updated-cursor-state-final" + assert.Equal(t, want, inSyncCursor) + assert.Equal(t, want, inMemCursor) + }) + + t.Run("ACK only subset of pending ops will only update up to ACKed state", func(t *testing.T) { + // when acking N events, intermediate updates are dropped in favor of the latest update operation. + // This test checks that the resource is correctly marked as finished. + + store := testOpenStore(t, "test", createSampleStore(t, nil)) + defer store.Release() + res := store.Get("test::key") + + // create update operations and release resource. The 'resource' must still be active + op1 := mustCreateUpdateOp(t, store, res, "test-updated-cursor-state-intermediate") + op2 := mustCreateUpdateOp(t, store, res, "test-updated-cursor-state-final") + res.Release() + require.False(t, res.Finished()) + + defer op2.done(1) // cleanup after test + + // this was the intermediate op, the resource should still be active + op1.Execute(1) + require.False(t, res.Finished()) + + // validate state (in memory state is always up to data to most recent update): + inSyncCursor := storeInSyncSnapshot(store)["test::key"].Cursor + inMemCursor := storeMemorySnapshot(store)["test::key"].Cursor + assert.Equal(t, "test-updated-cursor-state-intermediate", inSyncCursor) + assert.Equal(t, "test-updated-cursor-state-final", inMemCursor) + }) +} + +func mustCreateUpdateOp(t *testing.T, store *store, resource *resource, updates interface{}) *updateOp { + op, err := createUpdateOp(store, resource, updates) + if err != nil { + t.Fatalf("Failed to create update op: %v", err) + } + return op +}