Skip to content

Commit

Permalink
Implement storage handling for cursor inputs (elastic#19518)
Browse files Browse the repository at this point in the history
This change provide the store implementation that is used by the cursor
input to track ephemeral and persistent state.

The full list of changes will include:
- Introduce v2 API interfaces
- Introduce [compatibility layer](https://github.com/urso/beats/tree/fb-input-v2-combined/filebeat/input/v2/compat) to integrate API with existing functionality
- Introduce helpers for writing [stateless](https://github.com/urso/beats/blob/fb-input-v2-combined/filebeat/input/v2/input-stateless/stateless.go) inputs.
- Introduce helpers for writing [inputs that store a state](https://github.com/urso/beats/tree/fb-input-v2-combined/filebeat/input/v2/input-cursor) between restarts.
- Integrate new API with [existing inputs and modules](https://github.com/urso/beats/blob/fb-input-v2-combined/filebeat/beater/filebeat.go#L301) in filebeat.

(cherry picked from commit 0800ab1)
  • Loading branch information
Steffen Siering committed Jul 8, 2020
1 parent 9c79938 commit 623201a
Show file tree
Hide file tree
Showing 3 changed files with 513 additions and 5 deletions.
36 changes: 36 additions & 0 deletions filebeat/input/v2/input-cursor/publish.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (

input "github.com/elastic/beats/v7/filebeat/input/v2"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/common/transform/typeconv"
)

// Publisher is used to publish an event and update the cursor in a single call to Publish.
Expand Down Expand Up @@ -70,3 +71,38 @@ func (c *cursorPublisher) Publish(event beat.Event, cursorUpdate interface{}) er
func (op *updateOp) Execute(numEvents uint) {
panic("TODO: implement me")
}

func createUpdateOp(store *store, resource *resource, updates interface{}) (*updateOp, error) {
ts := time.Now()

resource.stateMutex.Lock()
defer resource.stateMutex.Unlock()

cursor := resource.pendingCursor
if resource.activeCursorOperations == 0 {
var tmp interface{}
typeconv.Convert(&tmp, cursor)
resource.pendingCursor = tmp
cursor = tmp
}
if err := typeconv.Convert(&cursor, updates); err != nil {
return nil, err
}
resource.pendingCursor = cursor

resource.Retain()
resource.activeCursorOperations++
return &updateOp{
resource: resource,
store: store,
timestamp: ts,
delta: updates,
}, nil
}

// done releases resources held by the last N updateOps.
func (op *updateOp) done(n uint) {
op.resource.UpdatesReleaseN(n)
op.resource = nil
*op = updateOp{}
}
131 changes: 126 additions & 5 deletions filebeat/input/v2/input-cursor/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,12 @@
package cursor

import (
"strings"
"sync"
"time"

"github.com/elastic/beats/v7/libbeat/common/atomic"
"github.com/elastic/beats/v7/libbeat/common/cleanup"
"github.com/elastic/beats/v7/libbeat/common/transform/typeconv"
"github.com/elastic/beats/v7/libbeat/logp"
"github.com/elastic/beats/v7/libbeat/statestore"
Expand Down Expand Up @@ -126,7 +128,25 @@ type (
var closeStore = (*store).close

func openStore(log *logp.Logger, statestore StateStore, prefix string) (*store, error) {
panic("TODO: implement me")
ok := false

persistentStore, err := statestore.Access()
if err != nil {
return nil, err
}
defer cleanup.IfNot(&ok, func() { persistentStore.Close() })

states, err := readStates(log, persistentStore, prefix)
if err != nil {
return nil, err
}

ok = true
return &store{
log: log,
persistentStore: persistentStore,
ephemeralStore: states,
}, nil
}

func (s *store) Retain() { s.refCount.Retain() }
Expand All @@ -137,7 +157,9 @@ func (s *store) Release() {
}

func (s *store) close() {
panic("TODO: implement me")
if err := s.persistentStore.Close(); err != nil {
s.log.Errorf("Closing registry store did report an error: %+v", err)
}
}

// Get returns the resource for the key.
Expand All @@ -152,18 +174,62 @@ func (s *store) Get(key string) *resource {
// On update the resource its `cursor` state is used, to keep the cursor state in sync with the current known
// on disk store state.
func (s *store) UpdateTTL(resource *resource, ttl time.Duration) {
panic("TODO: implement me")
resource.stateMutex.Lock()
defer resource.stateMutex.Unlock()
if resource.stored && resource.internalState.TTL == ttl {
return
}

resource.internalState.TTL = ttl
if resource.internalState.Updated.IsZero() {
resource.internalState.Updated = time.Now()
}

err := s.persistentStore.Set(resource.key, state{
TTL: resource.internalState.TTL,
Updated: resource.internalState.Updated,
Cursor: resource.cursor,
})
if err != nil {
s.log.Errorf("Failed to update resource management fields for '%v'", resource.key)
resource.internalInSync = false
} else {
resource.stored = true
resource.internalInSync = true
}
}

// Find returns the resource for a given key. If the key is unknown and create is set to false nil will be returned.
// The resource returned by Find is marked as active. (*resource).Release must be called to mark the resource as inactive again.
func (s *states) Find(key string, create bool) *resource {
panic("TODO: implement me")
s.mu.Lock()
defer s.mu.Unlock()

if resource := s.table[key]; resource != nil {
resource.Retain()
return resource
}

if !create {
return nil
}

// resource is owned by table(session) and input that uses the resource.
resource := &resource{
stored: false,
key: key,
lock: unison.MakeMutex(),
}
s.table[key] = resource
resource.Retain()
return resource
}

// IsNew returns true if we have no state recorded for the current resource.
func (r *resource) IsNew() bool {
panic("TODO: implement me")
r.stateMutex.Lock()
defer r.stateMutex.Unlock()
return r.pendingCursor == nil && r.cursor == nil
}

// Retain is used to indicate that 'resource' gets an additional 'owner'.
Expand Down Expand Up @@ -201,3 +267,58 @@ func (r *resource) inSyncStateSnapshot() state {
Cursor: r.cursor,
}
}

// stateSnapshot returns the current in memory state, that already contains state updates
// not yet ACKed.
func (r *resource) stateSnapshot() state {
cursor := r.pendingCursor
if r.activeCursorOperations == 0 {
cursor = r.cursor
}

return state{
TTL: r.internalState.TTL,
Updated: r.internalState.Updated,
Cursor: cursor,
}
}

func readStates(log *logp.Logger, store *statestore.Store, prefix string) (*states, error) {
keyPrefix := prefix + "::"
states := &states{
table: map[string]*resource{},
}

err := store.Each(func(key string, dec statestore.ValueDecoder) (bool, error) {
if !strings.HasPrefix(string(key), keyPrefix) {
return true, nil
}

var st state
if err := dec.Decode(&st); err != nil {
log.Errorf("Failed to read regisry state for '%v', cursor state will be ignored. Error was: %+v",
key, err)
return true, nil
}

resource := &resource{
key: key,
stored: true,
lock: unison.MakeMutex(),
internalInSync: true,
internalState: stateInternal{
TTL: st.TTL,
Updated: st.Updated,
},
cursor: st.Cursor,
}
states.table[resource.key] = resource

return true, nil
})

if err != nil {
return nil, err
}
return states, nil
}
Loading

0 comments on commit 623201a

Please sign in to comment.