Skip to content
This repository has been archived by the owner on Sep 9, 2020. It is now read-only.

Commit

Permalink
Fix bug where meta policy engine would not process job updates. (#16)
Browse files Browse the repository at this point in the history
The meta policy watcher was doing an incorrect comparison when
checking whether the modify index on a job specification returned
from the API. Previously it was comparing against the maxfound
which was being set to the Jobs().List() returned meta. This meant
the job modify index would always be <= as it could never be
greater than the value of the API returned meta index.

In addition to fixing the above mentioned bug; this commit adds
some contextual logging around the meta watcher to help identify
any future issues and make operating in this mode easier.

Closes #15
  • Loading branch information
jrasell authored Aug 1, 2019
1 parent 9f40720 commit 6b2a930
Show file tree
Hide file tree
Showing 3 changed files with 68 additions and 7 deletions.
2 changes: 2 additions & 0 deletions pkg/policy/watcher/meta_keys.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
)

func (m *MetaWatcher) readJobMeta(jobID string) {
m.logger.Debug().Str("job", jobID).Msg("reading job group meta stanzas")

info, _, err := m.nomad.Jobs().Info(jobID, nil)
if err != nil {
m.logger.Error().Err(err).Msg("failed to call Nomad API for job information")
Expand Down
37 changes: 30 additions & 7 deletions pkg/policy/watcher/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,11 @@ func NewMetaWatcher(l zerolog.Logger, nomad *api.Client, p backend.PolicyBackend
func (m *MetaWatcher) Run() {
m.logger.Info().Msg("starting Sherpa Nomad meta policy engine")

var maxFound uint64

q := &api.QueryOptions{WaitTime: 5 * time.Minute}

for {
var maxFound uint64

jobs, meta, err := m.nomad.Jobs().List(q)
if err != nil {
Expand All @@ -42,21 +43,36 @@ func (m *MetaWatcher) Run() {
m.logger.Debug().Msg("meta watcher last index has not changed")
continue
}
m.logger.Debug().
Uint64("old", q.WaitIndex).
Uint64("new", meta.LastIndex).
Msg("meta watcher last index has changed")

m.logger.Debug().Msg("meta watcher last index has changed")
maxFound = meta.LastIndex

// Iterate over all the returned jobs.
for i := range jobs {
if !m.indexHasChange(jobs[i].ModifyIndex, maxFound) {

// If the change index on the job is not newer than the previously recorded last index
// we should continue to the next job. It is important here to use the lastChangeIndex
// from the MetaWatcher as we want to process all jobs which have updated past this
// index.
if !m.indexHasChange(jobs[i].ModifyIndex, m.lastChangeIndex) {
continue
}

maxFound = jobs[i].ModifyIndex
m.logger.Debug().
Uint64("old", m.lastChangeIndex).
Uint64("new", jobs[i].ModifyIndex).
Str("job", jobs[i].ID).
Msg("job modify index has changed is greater than last recorded")

maxFound = m.maxFound(jobs[i].ModifyIndex, maxFound)
go m.readJobMeta(jobs[i].ID)
}

q.WaitIndex = maxFound
// Update the Nomad API wait index to start long polling from the correct point and update
// our recorded lastChangeIndex so we have the correct point to use during the next API
// return.
q.WaitIndex = meta.LastIndex
m.lastChangeIndex = maxFound
}
}
Expand All @@ -67,3 +83,10 @@ func (m *MetaWatcher) indexHasChange(new, old uint64) bool {
}
return true
}

func (m *MetaWatcher) maxFound(new, old uint64) uint64 {
if new <= old {
return old
}
return new
}
36 changes: 36 additions & 0 deletions pkg/policy/watcher/watcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,3 +37,39 @@ func TestMetaWatcher_indexHasChange(t *testing.T) {
assert.Equal(t, tc.expectedReturn, res)
}
}

func TestMetaWatcher_maxFound(t *testing.T) {
watcher := NewMetaWatcher(zerolog.Logger{}, nil, nil)

testCases := []struct {
newValue uint64
oldValue uint64
expectedReturn uint64
}{
{
newValue: 13,
oldValue: 7,
expectedReturn: 13,
},
{
newValue: 13696,
oldValue: 13696,
expectedReturn: 13696,
},
{
newValue: 7,
oldValue: 13,
expectedReturn: 13,
},
{
newValue: 1,
oldValue: 0,
expectedReturn: 1,
},
}

for _, tc := range testCases {
res := watcher.maxFound(tc.newValue, tc.oldValue)
assert.Equal(t, tc.expectedReturn, res)
}
}

0 comments on commit 6b2a930

Please sign in to comment.