Skip to content
This repository has been archived by the owner on Sep 21, 2023. It is now read-only.

Commit

Permalink
Activate event ID reporting in the shipper (#95)
Browse files Browse the repository at this point in the history
  • Loading branch information
faec authored Aug 17, 2022
1 parent 34571c4 commit 5b427f7
Show file tree
Hide file tree
Showing 9 changed files with 115 additions and 93 deletions.
46 changes: 38 additions & 8 deletions NOTICE.txt
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,11 @@ Third party libraries used by the Elastic Agent Shipper:

--------------------------------------------------------------------------------
Dependency : github.com/elastic/beats/v7
Version: v7.0.0-alpha2.0.20220722175030-7cb39607b349
Version: v7.0.0-alpha2.0.20220810153818-dd118efed5a5
Licence type (autodetected): Elastic
--------------------------------------------------------------------------------

Contents of probable licence file $GOMODCACHE/github.com/elastic/beats/v7@v7.0.0-alpha2.0.20220722175030-7cb39607b349/LICENSE.txt:
Contents of probable licence file $GOMODCACHE/github.com/elastic/beats/v7@v7.0.0-alpha2.0.20220810153818-dd118efed5a5/LICENSE.txt:

Source code in this repository is variously licensed under the Apache License
Version 2.0, an Apache compatible license, or the Elastic License. Outside of
Expand Down Expand Up @@ -19944,11 +19944,11 @@ Contents of probable licence file $GOMODCACHE/github.com/elastic/elastic-agent-a

--------------------------------------------------------------------------------
Dependency : github.com/elastic/elastic-agent-system-metrics
Version: v0.4.3
Version: v0.4.4
Licence type (autodetected): Apache-2.0
--------------------------------------------------------------------------------

Contents of probable licence file $GOMODCACHE/github.com/elastic/elastic-agent-system-metrics@v0.4.3/LICENSE.txt:
Contents of probable licence file $GOMODCACHE/github.com/elastic/elastic-agent-system-metrics@v0.4.4/LICENSE.txt:

Apache License
Version 2.0, January 2004
Expand Down Expand Up @@ -20788,11 +20788,11 @@ Contents of probable licence file $GOMODCACHE/github.com/elastic/go-elasticsearc

--------------------------------------------------------------------------------
Dependency : github.com/elastic/go-libaudit/v2
Version: v2.3.1
Version: v2.3.2-0.20220729123722-f8f7d5c19e6b
Licence type (autodetected): Apache-2.0
--------------------------------------------------------------------------------

Contents of probable licence file $GOMODCACHE/github.com/elastic/go-libaudit/v2@v2.3.1/LICENSE.txt:
Contents of probable licence file $GOMODCACHE/github.com/elastic/go-libaudit/v2@v2.3.2-0.20220729123722-f8f7d5c19e6b/LICENSE.txt:


Apache License
Expand Down Expand Up @@ -21695,11 +21695,11 @@ Contents of probable licence file $GOMODCACHE/github.com/elastic/go-seccomp-bpf@

--------------------------------------------------------------------------------
Dependency : github.com/elastic/go-structform
Version: v0.0.9
Version: v0.0.10
Licence type (autodetected): Apache-2.0
--------------------------------------------------------------------------------

Contents of probable licence file $GOMODCACHE/github.com/elastic/go-structform@v0.0.9/LICENSE:
Contents of probable licence file $GOMODCACHE/github.com/elastic/go-structform@v0.0.10/LICENSE:

Apache License
Version 2.0, January 2004
Expand Down Expand Up @@ -50851,6 +50851,36 @@ Contents of probable licence file $GOMODCACHE/gopkg.in/jcmturner/rpc.v1@v1.1.0/L
limitations under the License.


--------------------------------------------------------------------------------
Dependency : gopkg.in/natefinch/lumberjack.v2
Version: v2.0.0
Licence type (autodetected): MIT
--------------------------------------------------------------------------------

Contents of probable licence file $GOMODCACHE/gopkg.in/natefinch/lumberjack.v2@v2.0.0/LICENSE:

The MIT License (MIT)

Copyright (c) 2014 Nate Finch

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

--------------------------------------------------------------------------------
Dependency : gopkg.in/yaml.v1
Version: v1.0.0-20140924161607-9f9df34309c0
Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ require (
)

require (
github.com/elastic/beats/v7 v7.0.0-alpha2.0.20220722175030-7cb39607b349
github.com/elastic/beats/v7 v7.0.0-alpha2.0.20220810153818-dd118efed5a5
github.com/elastic/elastic-agent-client/v7 v7.0.0-20220804181728-b0328d2fe484
github.com/elastic/elastic-agent-shipper-client v0.4.0
github.com/elastic/go-ucfg v0.8.6
Expand All @@ -28,7 +28,7 @@ require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/dustin/go-humanize v1.0.0 // indirect
github.com/elastic/go-licenser v0.4.1 // indirect
github.com/elastic/go-structform v0.0.9 // indirect
github.com/elastic/go-structform v0.0.10 // indirect
github.com/elastic/go-sysinfo v1.8.1 // indirect
github.com/elastic/go-windows v1.0.1 // indirect
github.com/fatih/color v1.13.0 // indirect
Expand Down
12 changes: 6 additions & 6 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -489,8 +489,8 @@ github.com/eclipse/paho.mqtt.golang v1.2.0/go.mod h1:H9keYFcgq3Qr5OUJm/JZI/i6U7j
github.com/eclipse/paho.mqtt.golang v1.3.5/go.mod h1:eTzb4gxwwyWpqBUHGQZ4ABAV7+Jgm1PklsYT/eo8Hcc=
github.com/edsrzf/mmap-go v1.0.0/go.mod h1:YO35OhQPt3KJa3ryjFM5Bs14WD66h8eGKpfaBNrHW5M=
github.com/elastic/bayeux v1.0.5/go.mod h1:CSI4iP7qeo5MMlkznGvYKftp8M7qqP/3nzmVZoXHY68=
github.com/elastic/beats/v7 v7.0.0-alpha2.0.20220722175030-7cb39607b349 h1:YtK5738FoqTiStMjK0oK94kYeUKaV5+9PHOGBY1XkNM=
github.com/elastic/beats/v7 v7.0.0-alpha2.0.20220722175030-7cb39607b349/go.mod h1:w9b9yyXtqmZIYrMzDxPmPlRvAco6pVLmHvqEr3PuoO8=
github.com/elastic/beats/v7 v7.0.0-alpha2.0.20220810153818-dd118efed5a5 h1:DQFUQvUtrZ9sWlv+p304kxh+1EqHgDP17DK0frLhs9o=
github.com/elastic/beats/v7 v7.0.0-alpha2.0.20220810153818-dd118efed5a5/go.mod h1:qmsQeDT6sq+bptuYmTBhJxTC0YoBoCh8yNE7ddAVbl4=
github.com/elastic/elastic-agent-autodiscover v0.2.1/go.mod h1:gPnzzfdYNdgznAb+iG9eyyXaQXBbAMHa+Y6Z8hXfcGY=
github.com/elastic/elastic-agent-client/v7 v7.0.0-20210727140539-f0905d9377f6/go.mod h1:uh/Gj9a0XEbYoM4NYz4LvaBVARz3QXLmlNjsrKY9fTc=
github.com/elastic/elastic-agent-client/v7 v7.0.0-20220804181728-b0328d2fe484 h1:uJIMfLgCenJvxsVmEjBjYGxt0JddCgw2IxgoNfcIXOk=
Expand All @@ -504,20 +504,21 @@ github.com/elastic/elastic-agent-libs v0.2.11/go.mod h1:chO3rtcLyGlKi9S0iGVZhYCz
github.com/elastic/elastic-agent-shipper-client v0.2.0/go.mod h1:OyI2W+Mv3JxlkEF3OeT7K0dbuxvwew8ke2Cf4HpLa9Q=
github.com/elastic/elastic-agent-shipper-client v0.4.0 h1:nsTJF9oo4RHLl+zxFUZqNHaE86C6Ba5aImfegcEf6Sk=
github.com/elastic/elastic-agent-shipper-client v0.4.0/go.mod h1:OyI2W+Mv3JxlkEF3OeT7K0dbuxvwew8ke2Cf4HpLa9Q=
github.com/elastic/elastic-agent-system-metrics v0.4.3/go.mod h1:tF/f9Off38nfzTZHIVQ++FkXrDm9keFhFpJ+3pQ00iI=
github.com/elastic/elastic-agent-system-metrics v0.4.4/go.mod h1:tF/f9Off38nfzTZHIVQ++FkXrDm9keFhFpJ+3pQ00iI=
github.com/elastic/elastic-transport-go/v8 v8.1.0/go.mod h1:87Tcz8IVNe6rVSLdBux1o/PEItLtyabHU3naC7IoqKI=
github.com/elastic/go-concert v0.2.0/go.mod h1:HWjpO3IAEJUxOeaJOWXWEp7imKd27foxz9V5vegC/38=
github.com/elastic/go-elasticsearch/v8 v8.2.0/go.mod h1:yY52i2Vj0unLz+N3Nwx1gM5LXwoj3h2dgptNGBYkMLA=
github.com/elastic/go-libaudit/v2 v2.3.1/go.mod h1:+ZE0czqmbqtnRkl0fNgpI+HvVVRo/ZMJdcXv/PaKcOo=
github.com/elastic/go-libaudit/v2 v2.3.2-0.20220729123722-f8f7d5c19e6b/go.mod h1:+ZE0czqmbqtnRkl0fNgpI+HvVVRo/ZMJdcXv/PaKcOo=
github.com/elastic/go-licenser v0.4.0/go.mod h1:V56wHMpmdURfibNBggaSBfqgPxyT1Tldns1i87iTEvU=
github.com/elastic/go-licenser v0.4.1 h1:1xDURsc8pL5zYT9R29425J3vkHdt4RT5TNEMeRN48x4=
github.com/elastic/go-licenser v0.4.1/go.mod h1:V56wHMpmdURfibNBggaSBfqgPxyT1Tldns1i87iTEvU=
github.com/elastic/go-lookslike v0.3.0/go.mod h1:AhH+rdJux5RlVjs+6ej4jkvYyoNRkj2crxmqeHlj3hA=
github.com/elastic/go-lumber v0.1.0/go.mod h1:8YvjMIRYypWuPvpxx7WoijBYdbB7XIh/9FqSYQZTtxQ=
github.com/elastic/go-perf v0.0.0-20191212140718-9c656876f595/go.mod h1:s09U1b4P1ZxnKx2OsqY7KlHdCesqZWIhyq0Gs/QC/Us=
github.com/elastic/go-seccomp-bpf v1.2.0/go.mod h1:l+89Vy5BzjVcaX8USZRMOwmwwDScE+vxCFzzvQwN7T8=
github.com/elastic/go-structform v0.0.9 h1:HpcS7xljL4kSyUfDJ8cXTJC6rU5ChL1wYb6cx3HLD+o=
github.com/elastic/go-structform v0.0.9/go.mod h1:CZWf9aIRYY5SuKSmOhtXScE5uQiLZNqAFnwKR4OrIM4=
github.com/elastic/go-structform v0.0.10 h1:oy08o/Ih2hHTkNcRY/1HhaYvIp5z6t8si8gnCJPDo1w=
github.com/elastic/go-structform v0.0.10/go.mod h1:CZWf9aIRYY5SuKSmOhtXScE5uQiLZNqAFnwKR4OrIM4=
github.com/elastic/go-sysinfo v1.7.1/go.mod h1:i1ZYdU10oLNfRzq4vq62BEwD2fH8KaWh6eh0ikPT9F0=
github.com/elastic/go-sysinfo v1.8.1 h1:4Yhj+HdV6WjbCRgGdZpPJ8lZQlXZLKDAeIkmQ/VRvi4=
github.com/elastic/go-sysinfo v1.8.1/go.mod h1:JfllUnzoQV/JRYymbH3dO1yggI3mV2oTKSXsDHM+uIM=
Expand Down Expand Up @@ -1927,7 +1928,6 @@ golang.org/x/sys v0.0.0-20220405052023-b1e9470b6e64/go.mod h1:oPkhp1MJrh7nUepCBc
golang.org/x/sys v0.0.0-20220412211240-33da011f77ad/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220422013727-9388b58f7150/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220702020025-31831981b65f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220728004956-3c1f35247d10/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab h1:2QkjZIsXupsJbJIdSjjUOgWK3aEtzyuh2mPt3l/CkeU=
Expand Down
2 changes: 1 addition & 1 deletion output/console.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func (out *ConsoleOutput) Start() {
break
}
for i := 0; i < batch.Count(); i++ {
if event, ok := batch.Event(i).(*messages.Event); ok {
if event, ok := batch.Entry(i).(*messages.Event); ok {
out.send(event)
}
}
Expand Down
4 changes: 4 additions & 0 deletions queue/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,3 +34,7 @@ func (c *Config) Validate() error {
}
return nil
}

func (c Config) useDiskQueue() bool {
return c.DiskSettings != nil
}
45 changes: 26 additions & 19 deletions queue/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import (
// features as the libbeat queue evolves and we decide what we want
// to support in the shipper.
type Queue struct {
config Config

eventQueue beatsqueue.Queue

producer beatsqueue.Producer
Expand All @@ -34,7 +36,7 @@ type Metrics beatsqueue.Metrics
// EntryID is a unique ascending id assigned to each entry that goes in the
// queue, to handle acknowledgments within the shipper and report progress
// to the client.
type EntryID uint64
type EntryID beatsqueue.EntryID

// metricsSource is a wrapper around the libbeat queue interface, exposing only
// the callback to query the current metrics. It is used to pass queue metrics
Expand All @@ -51,7 +53,7 @@ var (
func New(c Config) (*Queue, error) {
var eventQueue beatsqueue.Queue
// If both Disk & Mem settings exist, go with Disk
if c.DiskSettings != nil {
if c.useDiskQueue() {
var err error
eventQueue, err = diskqueue.NewQueue(logp.L(), *c.DiskSettings)
if err != nil {
Expand All @@ -61,23 +63,24 @@ func New(c Config) (*Queue, error) {
eventQueue = memqueue.NewQueue(logp.L(), *c.MemSettings)
}
producer := eventQueue.Producer(beatsqueue.ProducerConfig{})
return &Queue{eventQueue: eventQueue, producer: producer}, nil
return &Queue{config: c, eventQueue: eventQueue, producer: producer}, nil
}

func (queue *Queue) Publish(ctx context.Context, event *messages.Event) (EntryID, error) {
_ = ctx.Done()
// TODO pass the real channel once libbeat supports it
if !queue.producer.Publish(event /*, cancelCh*/) {
id, published := queue.producer.Publish(event /*, ctx.Done()*/)
if !published {
return EntryID(0), ErrQueueIsClosed
}
return EntryID(0), nil
return EntryID(id), nil
}

func (queue *Queue) TryPublish(event *messages.Event) (EntryID, error) {
if !queue.producer.TryPublish(event) {
id, published := queue.producer.TryPublish(event)
if !published {
return EntryID(0), ErrQueueIsFull
}
return EntryID(0), nil
return EntryID(id), nil
}

func (queue *Queue) Metrics() (Metrics, error) {
Expand All @@ -94,15 +97,19 @@ func (queue *Queue) Close() error {
return queue.eventQueue.Close()
}

func (queue *Queue) AcceptedIndex() EntryID {
return EntryID(0)
}

func (queue *Queue) PersistedIndex() EntryID {
// This function needs to be implemented differently depending on the queue
// type. For the memory queue, it should return the most recent sequential
// entry id that has been published and acknowledged by the outputs.
// For the disk queue, it should return the most recent sequential entry id
// that has been written to disk.
return EntryID(0)
func (queue *Queue) PersistedIndex() (EntryID, error) {
if queue.config.useDiskQueue() {
// TODO (https://github.com/elastic/elastic-agent-shipper/issues/27):
// Once the disk queue supports entry IDs, this should return the
// ID of the oldest entry that has not yet been written to disk.
return EntryID(0), nil
} else {
metrics, err := queue.eventQueue.Metrics()
if err != nil {
return EntryID(0), err
}
// When a memory queue event is persisted, it is removed from the queue,
// so we return the oldest remaining entry ID.
return EntryID(metrics.OldestEntryID), nil
}
}
4 changes: 2 additions & 2 deletions queue/queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func TestMemoryQueueSimpleBatch(t *testing.T) {

assert.Equal(t, batch.Count(), eventCount)
for i := 0; i < eventCount; i++ {
event, ok := batch.Event(i).(*messages.Event)
event, ok := batch.Entry(i).(*messages.Event)
assert.True(t, ok, "queue output should have the same concrete type as its input")
// Need to use assert.True since assert.Equal* uses value comparison
// for unequal pointers.
Expand Down Expand Up @@ -153,7 +153,7 @@ func TestQueueTypes(t *testing.T) {
assert.NoError(t, err, "couldn't get queue batch")
for i := 0; i < batch.Count(); i++ {
//get each event and mark the index as received
event, ok := batch.Event(i).(*messages.Event)
event, ok := batch.Entry(i).(*messages.Event)
require.True(t, ok)
data := event.GetFields().GetData()
testField, prs := data["message"]
Expand Down
Loading

0 comments on commit 5b427f7

Please sign in to comment.