Skip to content

Commit

Permalink
[Heartbeat] Fix Continuation Dispatch / mode: all pings (#12458)
Browse files Browse the repository at this point in the history
Currently mode: all pings are broken. The root cause is that the scheduler dispatch of continuations in the heartbeat task model is broken. Since the only current use of this is `mode:all` pings that is what is affected. The issue was incorrectly aliasing variables when dispatching future work.

This issue is well described on this wiki page https://github.com/golang/go/wiki/CommonMistakes
  • Loading branch information
andrewvc authored Jun 7, 2019
1 parent 8502c79 commit e491e9a
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 11 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d

- Fix NPE on some monitor configuration errors. {pull}11910[11910]
- Fix NPEs / resource leaks when executing config checks. {pull}11165[11165]
- Fix duplicated IPs on `mode: all` monitors. {pull}12458[12458]

*Journalbeat*

Expand Down
21 changes: 13 additions & 8 deletions heartbeat/monitors/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,12 +138,12 @@ func runPublishJob(job jobs.Job, client beat.Client) []scheduler.TaskFunc {
Fields: common.MapStr{},
}

next, err := job(event)
conts, err := job(event)
if err != nil {
logp.Err("Job %v failed with: ", err)
}

hasContinuations := len(next) > 0
hasContinuations := len(conts) > 0

if event.Fields != nil && !eventext.IsEventCancelled(event) {
// If continuations are present we defensively publish a clone of the event
Expand All @@ -162,15 +162,20 @@ func runPublishJob(job jobs.Job, client beat.Client) []scheduler.TaskFunc {
}
}

if len(next) == 0 {
if !hasContinuations {
return nil
}

continuations := make([]scheduler.TaskFunc, len(next))
for i, n := range next {
continuations[i] = func() []scheduler.TaskFunc {
return runPublishJob(n, client)
contTasks := make([]scheduler.TaskFunc, len(conts))
for i, cont := range conts {
// Move the continuation into the local block scope
// This is important since execution is deferred
// Without this only the last continuation will be executed len(conts) times
localCont := cont

contTasks[i] = func() []scheduler.TaskFunc {
return runPublishJob(localCont, client)
}
}
return continuations
return contTasks
}
24 changes: 21 additions & 3 deletions heartbeat/monitors/task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,13 @@ import (
)

func Test_runPublishJob(t *testing.T) {
simpleJob := func(event *beat.Event) (j []jobs.Job, e error) {
eventext.MergeEventFields(event, common.MapStr{"foo": "bar"})
return nil, nil
defineJob := func(fields common.MapStr) func(event *beat.Event) (j []jobs.Job, e error) {
return func(event *beat.Event) (j []jobs.Job, e error) {
eventext.MergeEventFields(event, fields)
return nil, nil
}
}
simpleJob := defineJob(common.MapStr{"foo": "bar"})

testCases := []struct {
name string
Expand All @@ -58,6 +61,21 @@ func Test_runPublishJob(t *testing.T) {
mapval.MustCompile(mapval.Map{"foo": "bar"}),
},
},
{
"multiple conts",
func(event *beat.Event) (j []jobs.Job, e error) {
simpleJob(event)
return []jobs.Job{
defineJob(common.MapStr{"baz": "bot"}),
defineJob(common.MapStr{"blah": "blargh"}),
}, nil
},
[]mapval.Validator{
mapval.MustCompile(mapval.Map{"foo": "bar"}),
mapval.MustCompile(mapval.Map{"baz": "bot"}),
mapval.MustCompile(mapval.Map{"blah": "blargh"}),
},
},
{
"cancelled cont",
func(event *beat.Event) (j []jobs.Job, e error) {
Expand Down

0 comments on commit e491e9a

Please sign in to comment.