Skip to content

Commit

Permalink
Honor kube event resysncs to handle missed watch events (#22668) (#23219
Browse files Browse the repository at this point in the history
)

(cherry picked from commit 6678a66)

Co-authored-by: Vijay Samuel <vjsamuel@ebay.com>
  • Loading branch information
ChrsMark and vjsamuel authored Jan 4, 2021
1 parent 2d98527 commit 57bb61f
Show file tree
Hide file tree
Showing 11 changed files with 269 additions and 58 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -386,6 +386,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Added new `rate_limit` processor for enforcing rate limits on event throughput. {pull}22883[22883]
- Allow node/namespace metadata to be disabled on kubernetes metagen and ensure add_kubernetes_metadata honors host {pull}23012[23012]
- Improve equals check. {pull}22778[22778]
- Honor kube event resysncs to handle missed watch events {pull}22668[22668]

*Auditbeat*

Expand Down
33 changes: 25 additions & 8 deletions libbeat/autodiscover/autodiscover.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,8 +169,6 @@ func (a *Autodiscover) worker() {
}

func (a *Autodiscover) handleStart(event bus.Event) bool {
var updated bool

a.logger.Debugf("Got a start event: %v", event)

eventID := getID(event)
Expand All @@ -181,7 +179,7 @@ func (a *Autodiscover) handleStart(event bus.Event) bool {

// Ensure configs list exists for this instance
if _, ok := a.configs[eventID]; !ok {
a.configs[eventID] = map[uint64]*reload.ConfigWithMeta{}
a.configs[eventID] = make(map[uint64]*reload.ConfigWithMeta)
}

configs, err := a.configurer.CreateConfig(event)
Expand All @@ -196,6 +194,11 @@ func (a *Autodiscover) handleStart(event bus.Event) bool {
}
}

var (
updated bool
newCfg = make(map[uint64]*reload.ConfigWithMeta)
)

meta := a.getMeta(event)
for _, config := range configs {
hash, err := cfgfile.HashConfig(config)
Expand All @@ -215,18 +218,32 @@ func (a *Autodiscover) handleStart(event bus.Event) bool {
// Update meta no matter what
dynFields := a.meta.Store(hash, meta)

if a.configs[eventID][hash] != nil {
if cfg, ok := a.configs[eventID][hash]; ok {
a.logger.Debugf("Config %v is already running", common.DebugString(config, true))
newCfg[hash] = cfg
continue
} else {
newCfg[hash] = &reload.ConfigWithMeta{
Config: config,
Meta: &dynFields,
}
}

a.configs[eventID][hash] = &reload.ConfigWithMeta{
Config: config,
Meta: &dynFields,
}
updated = true
}

// If the new add event has lesser configs than the previous stable configuration then it means that there were
// configs that were removed in something like a resync event.
if len(newCfg) < len(a.configs[eventID]) {
updated = true
}

// By replacing the config's for eventID we make sure that all old configs that are no longer in use
// are stopped correctly. This will ensure that a resync event is handled correctly.
if updated {
a.configs[eventID] = newCfg
}

return updated
}

Expand Down
131 changes: 129 additions & 2 deletions libbeat/autodiscover/autodiscover_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,10 @@ type mockAdapter struct {
}

// CreateConfig generates a valid list of configs from the given event, the received event will have all keys defined by `StartFilter`
func (m *mockAdapter) CreateConfig(bus.Event) ([]*common.Config, error) {
func (m *mockAdapter) CreateConfig(event bus.Event) ([]*common.Config, error) {
if cfgs, ok := event["config"]; ok {
return cfgs.([]*common.Config), nil
}
return m.configs, nil
}

Expand All @@ -84,7 +87,6 @@ func (m *mockAdapter) CheckConfig(c *common.Config) error {
c.Unpack(&config)

if config.Broken {
fmt.Println("broken")
return fmt.Errorf("Broken config")
}

Expand Down Expand Up @@ -375,6 +377,131 @@ func TestAutodiscoverWithConfigCheckFailures(t *testing.T) {
assert.Equal(t, 1, len(autodiscover.configs["mock:foo"]))
}

func TestAutodiscoverWithMutlipleEntries(t *testing.T) {
goroutines := resources.NewGoroutinesChecker()
defer goroutines.Check(t)

// Register mock autodiscover provider
busChan := make(chan bus.Bus, 1)
Registry = NewRegistry()
Registry.AddProvider("mock", func(beatName string, b bus.Bus, uuid uuid.UUID, c *common.Config, k keystore.Keystore) (Provider, error) {
// intercept bus to mock events
busChan <- b

return &mockProvider{}, nil
})

// Create a mock adapter
runnerConfig, _ := common.NewConfigFrom(map[string]string{
"runner": "1",
})
adapter := mockAdapter{
configs: []*common.Config{runnerConfig},
}

// and settings:
providerConfig, _ := common.NewConfigFrom(map[string]string{
"type": "mock",
})
config := Config{
Providers: []*common.Config{providerConfig},
}
k, _ := keystore.NewFileKeystore("test")
// Create autodiscover manager
autodiscover, err := NewAutodiscover("test", nil, &adapter, &adapter, &config, k)
if err != nil {
t.Fatal(err)
}

// Start it
autodiscover.Start()
defer autodiscover.Stop()
eventBus := <-busChan

// Test start event
eventBus.Publish(bus.Event{
"id": "foo",
"provider": "mock",
"start": true,
"meta": common.MapStr{
"foo": "bar",
},
"config": []*common.Config{
common.MustNewConfigFrom(map[string]interface{}{
"a": "b",
}),
common.MustNewConfigFrom(map[string]interface{}{
"x": "y",
}),
},
})
wait(t, func() bool { return len(adapter.Runners()) == 2 })

runners := adapter.Runners()
assert.Equal(t, len(runners), 2)
assert.Equal(t, len(autodiscover.configs["mock:foo"]), 2)
assert.True(t, runners[0].started)
assert.False(t, runners[0].stopped)
assert.True(t, runners[1].started)
assert.False(t, runners[1].stopped)
assert.Equal(t, runners[1].config, common.MustNewConfigFrom(map[string]interface{}{"x": "y"}))

// Test start event with changed configurations
eventBus.Publish(bus.Event{
"id": "foo",
"provider": "mock",
"start": true,
"meta": common.MapStr{
"foo": "bar",
},
"config": []*common.Config{
common.MustNewConfigFrom(map[string]interface{}{
"a": "b",
}),
common.MustNewConfigFrom(map[string]interface{}{
"x": "c",
}),
},
})
wait(t, func() bool { return len(adapter.Runners()) == 3 })
runners = adapter.Runners()
// Ensure the first config is the same as before
assert.Equal(t, runners[0].config, common.MustNewConfigFrom(map[string]interface{}{"a": "b"}))
assert.Equal(t, len(runners), 3)
assert.Equal(t, len(autodiscover.configs["mock:foo"]), 2)
assert.True(t, runners[0].started)
assert.False(t, runners[0].stopped)
// Ensure that the runner for the stale config is stopped
wait(t, func() bool { return adapter.Runners()[1].stopped == true })

// Ensure that the new runner is started
assert.False(t, runners[2].stopped)
assert.True(t, runners[2].started)
assert.Equal(t, runners[2].config, common.MustNewConfigFrom(map[string]interface{}{"x": "c"}))

// Stop all the configs
eventBus.Publish(bus.Event{
"id": "foo",
"provider": "mock",
"stop": true,
"meta": common.MapStr{
"foo": "bar",
},
"config": []*common.Config{
common.MustNewConfigFrom(map[string]interface{}{
"a": "b",
}),
common.MustNewConfigFrom(map[string]interface{}{
"x": "c",
}),
},
})

wait(t, func() bool { return adapter.Runners()[2].stopped == true })
runners = adapter.Runners()
assert.True(t, runners[0].stopped)
}

func wait(t *testing.T, test func() bool) {
sleep := 20 * time.Millisecond
ready := test()
Expand Down
38 changes: 26 additions & 12 deletions libbeat/autodiscover/providers/docker/docker.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,7 @@ func (d *Provider) emitContainer(container *docker.Container, meta *dockerMetada
host = container.IPAddresses[0]
}

var events []bus.Event
// Without this check there would be overlapping configurations with and without ports.
if len(container.Ports) == 0 {
event := bus.Event{
Expand All @@ -289,7 +290,7 @@ func (d *Provider) emitContainer(container *docker.Container, meta *dockerMetada
"meta": meta.Metadata,
}

d.publish(event)
events = append(events, event)
}

// Emit container container and port information
Expand All @@ -304,25 +305,38 @@ func (d *Provider) emitContainer(container *docker.Container, meta *dockerMetada
"container": meta.Container,
"meta": meta.Metadata,
}

d.publish(event)
events = append(events, event)
}
d.publish(events)
}

func (d *Provider) publish(event bus.Event) {
// Try to match a config
if config := d.templates.GetConfig(event); config != nil {
event["config"] = config
} else {
// If no template matches, try builders:
if config := d.builders.GetConfig(d.generateHints(event)); config != nil {
event["config"] = config
func (d *Provider) publish(events []bus.Event) {
if len(events) == 0 {
return
}

configs := make([]*common.Config, 0)
for _, event := range events {
// Try to match a config
if config := d.templates.GetConfig(event); config != nil {
configs = append(configs, config...)
} else {
// If there isn't a default template then attempt to use builders
e := d.generateHints(event)
if config := d.builders.GetConfig(e); config != nil {
configs = append(configs, config...)
}
}
}

// Since all the events belong to the same event ID pick on and add in all the configs
event := bus.Event(common.MapStr(events[0]).Clone())
// Remove the port to avoid ambiguity during debugging
delete(event, "port")
event["config"] = configs

// Call all appenders to append any extra configuration
d.appenders.Append(event)

d.bus.Publish(event)
}

Expand Down
39 changes: 29 additions & 10 deletions libbeat/autodiscover/providers/kubernetes/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,18 +165,37 @@ func (p *Provider) String() string {
return "kubernetes"
}

func (p *Provider) publish(event bus.Event) {
// Try to match a config
if config := p.templates.GetConfig(event); config != nil {
event["config"] = config
} else {
// If there isn't a default template then attempt to use builders
e := p.eventManager.GenerateHints(event)
if config := p.builders.GetConfig(e); config != nil {
event["config"] = config
func (p *Provider) publish(events []bus.Event) {
if len(events) == 0 {
return
}

configs := make([]*common.Config, 0)
id, _ := events[0]["id"]
for _, event := range events {
// Ensure that all events have the same ID. If not panic
if event["id"] != id {
panic("events from Kubernetes can't have different id fields")
}

// Try to match a config
if config := p.templates.GetConfig(event); config != nil {
configs = append(configs, config...)
} else {
// If there isn't a default template then attempt to use builders
e := p.eventManager.GenerateHints(event)
if config := p.builders.GetConfig(e); config != nil {
configs = append(configs, config...)
}
}
}

// Since all the events belong to the same event ID pick on and add in all the configs
event := bus.Event(common.MapStr(events[0]).Clone())
// Remove the port to avoid ambiguity during debugging
delete(event, "port")
event["config"] = configs

// Call all appenders to append any extra configuration
p.appenders.Append(event)
p.bus.Publish(event)
Expand Down Expand Up @@ -213,7 +232,7 @@ func NewEventerManager(
c *common.Config,
cfg *Config,
client k8s.Interface,
publish func(event bus.Event),
publish func(event []bus.Event),
) (EventManager, error) {
var err error
em := &eventerManager{}
Expand Down
13 changes: 7 additions & 6 deletions libbeat/autodiscover/providers/kubernetes/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,12 @@ type node struct {
config *Config
metagen metadata.MetaGen
logger *logp.Logger
publish func(bus.Event)
publish func([]bus.Event)
watcher kubernetes.Watcher
}

// NewNodeEventer creates an eventer that can discover and process node objects
func NewNodeEventer(uuid uuid.UUID, cfg *common.Config, client k8s.Interface, publish func(event bus.Event)) (Eventer, error) {
func NewNodeEventer(uuid uuid.UUID, cfg *common.Config, client k8s.Interface, publish func(event []bus.Event)) (Eventer, error) {
logger := logp.NewLogger("autodiscover.node")

config := defaultConfig()
Expand All @@ -65,9 +65,10 @@ func NewNodeEventer(uuid uuid.UUID, cfg *common.Config, client k8s.Interface, pu
logger.Debugf("Initializing a new Kubernetes watcher using node: %v", config.Node)

watcher, err := kubernetes.NewWatcher(client, &kubernetes.Node{}, kubernetes.WatchOptions{
SyncTimeout: config.SyncPeriod,
Node: config.Node,
IsUpdated: isUpdated,
SyncTimeout: config.SyncPeriod,
Node: config.Node,
IsUpdated: isUpdated,
HonorReSyncs: true,
}, nil)

if err != nil {
Expand Down Expand Up @@ -193,7 +194,7 @@ func (n *node) emit(node *kubernetes.Node, flag string) {
"kubernetes": meta,
},
}
n.publish(event)
n.publish([]bus.Event{event})
}

func isUpdated(o, n interface{}) bool {
Expand Down
Loading

0 comments on commit 57bb61f

Please sign in to comment.