Skip to content
This repository has been archived by the owner on Nov 1, 2022. It is now read-only.

Commit

Permalink
Push context deadlines inside jobs and sync ops
Browse files Browse the repository at this point in the history
Synchronising with the cluster can easily take tens of seconds, so it
is better to have timeouts for individual operations or small batches
of operations, rather than the whole thing.

Similarly, jobs can wait in the job queue for e.g., syncs to happen,
so it is necessary to time them from when they _start_, not when they
are requested.

Lastly, fix and use `git.Repo.NoteRevList` when answering JobStatus
from commit notes, since that is exactly the expensive batch of git
operations that motivated it.
  • Loading branch information
squaremo committed Aug 31, 2017
1 parent f2b2c6a commit ead8413
Show file tree
Hide file tree
Showing 5 changed files with 117 additions and 70 deletions.
65 changes: 35 additions & 30 deletions daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,12 @@ import (
)

const (
defaultDaemonTimeout = 10 * time.Second
// This is set to be in sympathy with the request / RPC timeout (i.e., empirically)
defaultHandlerTimeout = 10 * time.Second
// A job can take an arbitrary amount of time but we want to have
// a (generous) threshold for considering a job stuck and
// abandoning it
defaultJobTimeout = 60 * time.Second
)

// Combine these things to form Devasta^Wan implementation of
Expand Down Expand Up @@ -125,16 +130,17 @@ func (d *Daemon) ListImages(spec update.ServiceSpec) ([]flux.ImageStatus, error)
// Let's use the CommitEventMetadata as a convenient transport for the
// results of a job; if no commit was made (e.g., if it was a dry
// run), leave the revision field empty.
type DaemonJobFunc func(jobID job.ID, working *git.Checkout, logger log.Logger) (*history.CommitEventMetadata, error)
type DaemonJobFunc func(ctx context.Context, jobID job.ID, working *git.Checkout, logger log.Logger) (*history.CommitEventMetadata, error)

// Must cancel the context once this job is complete
func (d *Daemon) queueJob(ctx context.Context, cancel context.CancelFunc, do DaemonJobFunc) job.ID {
func (d *Daemon) queueJob(do DaemonJobFunc) job.ID {
id := job.ID(guid.New())
d.Jobs.Enqueue(&job.Job{
ID: id,
Do: func(logger log.Logger) error {
defer cancel()
started := time.Now().UTC()
ctx, cancel := context.WithTimeout(context.Background(), defaultJobTimeout)
defer cancel()
d.JobStatusCache.SetStatus(id, job.Status{StatusString: job.StatusRunning})
// make a working clone so we don't mess with files we
// will be reading from elsewhere
Expand All @@ -144,7 +150,7 @@ func (d *Daemon) queueJob(ctx context.Context, cancel context.CancelFunc, do Dae
return err
}
defer working.Clean()
metadata, err := do(id, working, logger)
metadata, err := do(ctx, id, working, logger)
if err != nil {
d.JobStatusCache.SetStatus(id, job.Status{StatusString: job.StatusFailed, Err: err.Error()})
return err
Expand Down Expand Up @@ -176,23 +182,22 @@ func (d *Daemon) queueJob(ctx context.Context, cancel context.CancelFunc, do Dae

// Apply the desired changes to the config files
func (d *Daemon) UpdateManifests(spec update.Spec) (job.ID, error) {
ctx, cancel := newDaemonContext()
var id job.ID
if spec.Type == "" {
return id, errors.New("no type in update spec")
}
switch s := spec.Spec.(type) {
case release.Changes:
return d.queueJob(ctx, cancel, d.release(ctx, spec, s)), nil
return d.queueJob(d.release(spec, s)), nil
case policy.Updates:
return d.queueJob(ctx, cancel, d.updatePolicy(ctx, spec, s)), nil
return d.queueJob(d.updatePolicy(spec, s)), nil
default:
return id, fmt.Errorf(`unknown update type "%s"`, spec.Type)
}
}

func (d *Daemon) updatePolicy(ctx context.Context, spec update.Spec, updates policy.Updates) DaemonJobFunc {
return func(jobID job.ID, working *git.Checkout, logger log.Logger) (*history.CommitEventMetadata, error) {
func (d *Daemon) updatePolicy(spec update.Spec, updates policy.Updates) DaemonJobFunc {
return func(ctx context.Context, jobID job.ID, working *git.Checkout, logger log.Logger) (*history.CommitEventMetadata, error) {
// For each update
var serviceIDs []flux.ServiceID
metadata := &history.CommitEventMetadata{
Expand Down Expand Up @@ -267,8 +272,8 @@ func (d *Daemon) updatePolicy(ctx context.Context, spec update.Spec, updates pol
}
}

func (d *Daemon) release(ctx context.Context, spec update.Spec, c release.Changes) DaemonJobFunc {
return func(jobID job.ID, working *git.Checkout, logger log.Logger) (*history.CommitEventMetadata, error) {
func (d *Daemon) release(spec update.Spec, c release.Changes) DaemonJobFunc {
return func(ctx context.Context, jobID job.ID, working *git.Checkout, logger log.Logger) (*history.CommitEventMetadata, error) {
rc := release.NewReleaseContext(d.Cluster, d.Manifests, d.Registry, working)
result, err := release.Release(rc, c, logger)
if err != nil {
Expand Down Expand Up @@ -313,7 +318,7 @@ func (d *Daemon) SyncNotify() error {
// Ask the daemon how far it's got committing things; in particular, is the job
// queued? running? committed? If it is done, the commit ref is returned.
func (d *Daemon) JobStatus(jobID job.ID) (job.Status, error) {
ctx, cancel := newDaemonContext()
ctx, cancel := context.WithTimeout(context.Background(), defaultHandlerTimeout)
defer cancel()
// Is the job queued, running, or recently finished?
status, ok := d.JobStatusCache.Status(jobID)
Expand All @@ -324,24 +329,28 @@ func (d *Daemon) JobStatus(jobID job.ID) (job.Status, error) {
// Look through the commits for a note referencing this job. This
// means that even if fluxd restarts, we will at least remember
// jobs which have pushed a commit.
if err := d.Checkout.Pull(ctx); err != nil {
return job.Status{}, errors.Wrap(err, "updating repo for status")
notes, err := d.Checkout.NoteRevList(ctx)
if err != nil {
return job.Status{}, errors.Wrap(err, "enumerating commit notes")
}
commits, err := d.Checkout.CommitsBefore(ctx, "HEAD")
if err != nil {
return job.Status{}, errors.Wrap(err, "checking revisions for status")
}

for _, commit := range commits {
note, _ := d.Checkout.GetNote(ctx, commit.Revision)
if note != nil && note.JobID == jobID {
return job.Status{
StatusString: job.StatusSucceeded,
Result: history.CommitEventMetadata{
Revision: commit.Revision,
Spec: &note.Spec,
Result: note.Result,
},
}, nil
if _, ok := notes[commit.Revision]; ok {
note, _ := d.Checkout.GetNote(ctx, commit.Revision)
if note != nil && note.JobID == jobID {
return job.Status{
StatusString: job.StatusSucceeded,
Result: history.CommitEventMetadata{
Revision: commit.Revision,
Spec: &note.Spec,
Result: note.Result,
},
}, nil
}
}
}

Expand All @@ -354,7 +363,7 @@ func (d *Daemon) JobStatus(jobID job.ID) (job.Status, error) {
// you'll get all the commits yet to be applied. If you send a hash
// and it's applied _past_ it, you'll get an empty list.
func (d *Daemon) SyncStatus(commitRef string) ([]string, error) {
ctx, cancel := newDaemonContext()
ctx, cancel := context.WithTimeout(context.Background(), defaultHandlerTimeout)
defer cancel()
commits, err := d.Checkout.CommitsBetween(ctx, d.Checkout.SyncTag, commitRef)
if err != nil {
Expand Down Expand Up @@ -502,7 +511,3 @@ func policyEventTypes(u policy.Update) []string {
sort.Strings(result)
return result
}

func newDaemonContext() (context.Context, context.CancelFunc) {
return context.WithTimeout(context.Background(), defaultDaemonTimeout)
}
6 changes: 4 additions & 2 deletions daemon/daemon_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package daemon

import (
"bufio"
"context"
"fmt"
"os"
"path/filepath"
Expand All @@ -10,8 +11,8 @@ import (
"testing"
"time"

"context"
"github.com/go-kit/kit/log"

"github.com/weaveworks/flux"
"github.com/weaveworks/flux/cluster"
"github.com/weaveworks/flux/cluster/kubernetes"
Expand Down Expand Up @@ -304,7 +305,7 @@ func TestDaemon_JobStatusWithNoCache(t *testing.T) {
}

func mockDaemon(t *testing.T) (*Daemon, func(), *cluster.Mock, history.EventReadWriter) {
logger := log.NewLogfmtLogger(os.Stdout)
logger := log.NewNopLogger()

singleService := cluster.Service{
ID: flux.ServiceID(svc),
Expand Down Expand Up @@ -500,6 +501,7 @@ func updatePolicy(t *testing.T, d *Daemon) job.ID {
},
})
}

func updateManifest(t *testing.T, d *Daemon, spec update.Spec) job.ID {
id, err := d.UpdateManifests(spec)
if err != nil {
Expand Down
104 changes: 72 additions & 32 deletions daemon/loop.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ import (
)

const (
defaultSyncTimeout = 30 * time.Second // Max time allowed for syncs
// Timeout for git operations we're prepared to abandon
gitOpTimeout = 15 * time.Second
)

type LoopVars struct {
Expand Down Expand Up @@ -48,7 +49,7 @@ func (d *Daemon) GitPollLoop(stop chan struct{}, wg *sync.WaitGroup, logger log.
gitPollTimer.Stop()
gitPollTimer = time.NewTimer(d.GitPollInterval)
}()
ctx, cancel := context.WithTimeout(context.Background(), defaultSyncTimeout)
ctx, cancel := context.WithTimeout(context.Background(), gitOpTimeout)
defer cancel()
if err := d.Checkout.Pull(ctx); err != nil {
logger.Log("operation", "pull", "err", err)
Expand Down Expand Up @@ -116,17 +117,25 @@ func (d *LoopVars) askForImagePoll() {
// -- extra bits the loop needs

func (d *Daemon) doSync(logger log.Logger) {
ctx, cancel := context.WithTimeout(context.Background(), defaultSyncTimeout)
defer cancel()
started := time.Now().UTC()
// We don't care how long this takes overall, only about not
// getting bogged down in certain operations, so use an
// undeadlined context in general.
ctx := context.Background()

// checkout a working clone so we can mess around with tags later
working, err := d.Checkout.WorkingClone(ctx)
if err != nil {
logger.Log("err", err)
return
var working *git.Checkout
{
var err error
ctx, cancel := context.WithTimeout(ctx, gitOpTimeout)
defer cancel()
working, err = d.Checkout.WorkingClone(ctx)
if err != nil {
logger.Log("err", err)
return
}
defer working.Clean()
}
defer working.Clean()

// TODO logging, metrics?
// Get a map of all resources defined in the repo
Expand All @@ -139,18 +148,33 @@ func (d *Daemon) doSync(logger log.Logger) {
// TODO supply deletes argument from somewhere (command-line?)
if err := fluxsync.Sync(d.Manifests, allResources, d.Cluster, false, logger); err != nil {
logger.Log("err", err)
// TODO(michael): we should distinguish between "fully mostly
// succeeded" and "failed utterly", since we want to abandon
// this and not move the tag (and send a SyncFail event
// upstream?), if the latter. For now, it's presumed that any
// error returned is at worst a minor, partial failure (e.g.,
// a small number of resources failed to sync, for unimportant
// reasons)
}

var initialSync bool
// update notes and emit events for applied commits
commits, err := working.CommitsBetween(ctx, working.SyncTag, "HEAD")
if isUnknownRevision(err) {
// No sync tag, grab all revisions
initialSync = true
commits, err = working.CommitsBefore(ctx, "HEAD")
}
if err != nil {
logger.Log("err", err)

var initialSync bool
var commits []git.Commit
{
var err error
ctx, cancel := context.WithTimeout(ctx, gitOpTimeout)
commits, err = working.CommitsBetween(ctx, working.SyncTag, "HEAD")
if isUnknownRevision(err) {
// No sync tag, grab all revisions
initialSync = true
commits, err = working.CommitsBefore(ctx, "HEAD")
}
cancel()
if err != nil {
logger.Log("err", err)
return
}
}

// Figure out which service IDs changed in this release
Expand All @@ -160,11 +184,13 @@ func (d *Daemon) doSync(logger log.Logger) {
// no synctag, We are syncing everything from scratch
changedResources = allResources
} else {
ctx, cancel := context.WithTimeout(ctx, gitOpTimeout)
changedFiles, err := working.ChangedFiles(ctx, working.SyncTag)
if err == nil {
// We had some changed files, we're syncing a diff
changedResources, err = d.Manifests.LoadManifests(changedFiles...)
}
cancel()
if err != nil {
logger.Log("err", errors.Wrap(err, "loading resources from repo"))
return
Expand All @@ -176,10 +202,15 @@ func (d *Daemon) doSync(logger log.Logger) {
serviceIDs.Add(r.ServiceIDs(allResources))
}

notes, err := working.NoteRevList(ctx)
if err != nil {
logger.Log("err", errors.Wrap(err, "loading notes from repo"))
return
var notes map[string]struct{}
{
ctx, cancel := context.WithTimeout(ctx, gitOpTimeout)
notes, err = working.NoteRevList(ctx)
cancel()
if err != nil {
logger.Log("err", errors.Wrap(err, "loading notes from repo"))
return
}
}

// Collect any events that come from notes attached to the commits
Expand All @@ -193,16 +224,16 @@ func (d *Daemon) doSync(logger log.Logger) {

// Find notes in revisions.
for i := len(commits) - 1; i >= 0; i-- {
if ok := notes[commits[i].Revision]; !ok {
if _, ok := notes[commits[i].Revision]; !ok {
includes[history.NoneOfTheAbove] = true
continue
}
ctx, cancel := context.WithTimeout(ctx, gitOpTimeout)
n, err := working.GetNote(ctx, commits[i].Revision)
cancel()
if err != nil {
logger.Log("err", errors.Wrap(err, "loading notes from repo; possibly no notes"))
// TODO: We're ignoring all errors here, not just the "no notes" error. Parse error to report proper errors.
includes[history.NoneOfTheAbove] = true
continue
return
}
if n == nil {
includes[history.NoneOfTheAbove] = true
Expand Down Expand Up @@ -300,17 +331,27 @@ func (d *Daemon) doSync(logger log.Logger) {
}

// Move the tag and push it so we know how far we've gotten.
if err := working.MoveTagAndPush(ctx, "HEAD", "Sync pointer"); err != nil {
logger.Log("err", err)
{
ctx, cancel := context.WithTimeout(ctx, gitOpTimeout)
err := working.MoveTagAndPush(ctx, "HEAD", "Sync pointer")
cancel()
if err != nil {
logger.Log("err", err)
return
}
}

// Pull the tag if it has changed
if err := d.updateTagRev(ctx, working, logger); err != nil {
logger.Log("err", errors.Wrap(err, "updating tag"))
{
ctx, cancel := context.WithTimeout(ctx, gitOpTimeout)
if err := d.pullIfTagMoved(ctx, working, logger); err != nil {
logger.Log("err", errors.Wrap(err, "updating tag"))
}
cancel()
}
}

func (d *Daemon) updateTagRev(ctx context.Context, working *git.Checkout, logger log.Logger) error {
func (d *Daemon) pullIfTagMoved(ctx context.Context, working *git.Checkout, logger log.Logger) error {
oldTagRev, err := d.Checkout.TagRevision(ctx, d.Checkout.SyncTag)
if err != nil && !strings.Contains(err.Error(), "unknown revision or path not in the working tree") {
return err
Expand All @@ -322,7 +363,6 @@ func (d *Daemon) updateTagRev(ctx context.Context, working *git.Checkout, logger

if oldTagRev != newTagRev {
logger.Log("tag", d.Checkout.SyncTag, "old", oldTagRev, "new", newTagRev)

if err := d.Checkout.Pull(ctx); err != nil {
return err
}
Expand Down
6 changes: 3 additions & 3 deletions git/operations.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,17 +123,17 @@ func getNote(ctx context.Context, workingDir, notesRef, rev string) (*Note, erro
// Get all revisions with a note (NB: DO NOT RELY ON THE ORDERING)
// It appears to be ordered by ascending git object ref, not by time.
// Return a map to make it easier to do "if in" type queries.
func noteRevList(ctx context.Context, workingDir, notesRef string) (map[string]bool, error) {
func noteRevList(ctx context.Context, workingDir, notesRef string) (map[string]struct{}, error) {
out := &bytes.Buffer{}
if err := execGitCmd(ctx, workingDir, nil, out, "notes", "--ref", notesRef, "list"); err != nil {
return nil, err
}
noteList := splitList(out.String())
result := make(map[string]bool, len(noteList))
result := make(map[string]struct{}, len(noteList))
for _, l := range noteList {
split := strings.Fields(l)
if len(split) > 0 {
result[split[1]] = true // First field contains the object ref (commit id in our case)
result[split[1]] = struct{}{} // First field contains the object ref (commit id in our case)
}
}
return result, nil
Expand Down
Loading

0 comments on commit ead8413

Please sign in to comment.