Skip to content
This repository has been archived by the owner on Nov 1, 2022. It is now read-only.

Commit

Permalink
Merge pull request #726 from weaveworks/timeout-jobs-etc
Browse files Browse the repository at this point in the history
Push context deadlines inside jobs and sync ops
  • Loading branch information
squaremo authored Sep 1, 2017
2 parents f9a4dea + ead8413 commit b08d2a5
Show file tree
Hide file tree
Showing 5 changed files with 117 additions and 70 deletions.
65 changes: 35 additions & 30 deletions daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,12 @@ import (
)

const (
// Timeout applied to the contexts returned by newDaemonContext.
defaultDaemonTimeout = 10 * time.Second
// Deadline for the contexts created by the JobStatus and SyncStatus
// handlers.
// This is set to be in sympathy with the request / RPC timeout (i.e., empirically)
defaultHandlerTimeout = 10 * time.Second
// Deadline for the context that queueJob hands to each job.
// A job can take an arbitrary amount of time but we want to have
// a (generous) threshold for considering a job stuck and
// abandoning it
defaultJobTimeout = 60 * time.Second
)

// Combine these things to form Devasta^Wan implementation of
Expand Down Expand Up @@ -125,16 +130,17 @@ func (d *Daemon) ListImages(spec update.ServiceSpec) ([]flux.ImageStatus, error)
// DaemonJobFunc is the signature of the work that queueJob runs for a
// queued job; it is given the job's ID, a working clone of the repo and
// a logger (and, in the context-taking form, the context created by
// queueJob).
//
// Let's use the CommitEventMetadata as a convenient transport for the
// results of a job; if no commit was made (e.g., if it was a dry
// run), leave the revision field empty.
type DaemonJobFunc func(jobID job.ID, working *git.Checkout, logger log.Logger) (*history.CommitEventMetadata, error)
type DaemonJobFunc func(ctx context.Context, jobID job.ID, working *git.Checkout, logger log.Logger) (*history.CommitEventMetadata, error)

// Must cancel the context once this job is complete
func (d *Daemon) queueJob(ctx context.Context, cancel context.CancelFunc, do DaemonJobFunc) job.ID {
func (d *Daemon) queueJob(do DaemonJobFunc) job.ID {
id := job.ID(guid.New())
d.Jobs.Enqueue(&job.Job{
ID: id,
Do: func(logger log.Logger) error {
defer cancel()
started := time.Now().UTC()
ctx, cancel := context.WithTimeout(context.Background(), defaultJobTimeout)
defer cancel()
d.JobStatusCache.SetStatus(id, job.Status{StatusString: job.StatusRunning})
// make a working clone so we don't mess with files we
// will be reading from elsewhere
Expand All @@ -144,7 +150,7 @@ func (d *Daemon) queueJob(ctx context.Context, cancel context.CancelFunc, do Dae
return err
}
defer working.Clean()
metadata, err := do(id, working, logger)
metadata, err := do(ctx, id, working, logger)
if err != nil {
d.JobStatusCache.SetStatus(id, job.Status{StatusString: job.StatusFailed, Err: err.Error()})
return err
Expand Down Expand Up @@ -176,23 +182,22 @@ func (d *Daemon) queueJob(ctx context.Context, cancel context.CancelFunc, do Dae

// UpdateManifests queues a job for the update described by spec (the
// desired changes to the config files) and returns the ID of that job.
// Nothing is queued, and an error is returned alongside a zero job ID,
// when spec.Type is empty or when spec.Spec is neither release.Changes
// nor policy.Updates.
func (d *Daemon) UpdateManifests(spec update.Spec) (job.ID, error) {
ctx, cancel := newDaemonContext()
var id job.ID
if spec.Type == "" {
return id, errors.New("no type in update spec")
}
// Dispatch on the concrete type of the spec payload to pick the job
// function that gets queued.
switch s := spec.Spec.(type) {
case release.Changes:
return d.queueJob(ctx, cancel, d.release(ctx, spec, s)), nil
return d.queueJob(d.release(spec, s)), nil
case policy.Updates:
return d.queueJob(ctx, cancel, d.updatePolicy(ctx, spec, s)), nil
return d.queueJob(d.updatePolicy(spec, s)), nil
default:
return id, fmt.Errorf(`unknown update type "%s"`, spec.Type)
}
}

func (d *Daemon) updatePolicy(ctx context.Context, spec update.Spec, updates policy.Updates) DaemonJobFunc {
return func(jobID job.ID, working *git.Checkout, logger log.Logger) (*history.CommitEventMetadata, error) {
func (d *Daemon) updatePolicy(spec update.Spec, updates policy.Updates) DaemonJobFunc {
return func(ctx context.Context, jobID job.ID, working *git.Checkout, logger log.Logger) (*history.CommitEventMetadata, error) {
// For each update
var serviceIDs []flux.ServiceID
metadata := &history.CommitEventMetadata{
Expand Down Expand Up @@ -267,8 +272,8 @@ func (d *Daemon) updatePolicy(ctx context.Context, spec update.Spec, updates pol
}
}

func (d *Daemon) release(ctx context.Context, spec update.Spec, c release.Changes) DaemonJobFunc {
return func(jobID job.ID, working *git.Checkout, logger log.Logger) (*history.CommitEventMetadata, error) {
func (d *Daemon) release(spec update.Spec, c release.Changes) DaemonJobFunc {
return func(ctx context.Context, jobID job.ID, working *git.Checkout, logger log.Logger) (*history.CommitEventMetadata, error) {
rc := release.NewReleaseContext(d.Cluster, d.Manifests, d.Registry, working)
result, err := release.Release(rc, c, logger)
if err != nil {
Expand Down Expand Up @@ -313,7 +318,7 @@ func (d *Daemon) SyncNotify() error {
// Ask the daemon how far it's got committing things; in particular, is the job
// queued? running? committed? If it is done, the commit ref is returned.
func (d *Daemon) JobStatus(jobID job.ID) (job.Status, error) {
ctx, cancel := newDaemonContext()
ctx, cancel := context.WithTimeout(context.Background(), defaultHandlerTimeout)
defer cancel()
// Is the job queued, running, or recently finished?
status, ok := d.JobStatusCache.Status(jobID)
Expand All @@ -324,24 +329,28 @@ func (d *Daemon) JobStatus(jobID job.ID) (job.Status, error) {
// Look through the commits for a note referencing this job. This
// means that even if fluxd restarts, we will at least remember
// jobs which have pushed a commit.
if err := d.Checkout.Pull(ctx); err != nil {
return job.Status{}, errors.Wrap(err, "updating repo for status")
notes, err := d.Checkout.NoteRevList(ctx)
if err != nil {
return job.Status{}, errors.Wrap(err, "enumerating commit notes")
}
commits, err := d.Checkout.CommitsBefore(ctx, "HEAD")
if err != nil {
return job.Status{}, errors.Wrap(err, "checking revisions for status")
}

for _, commit := range commits {
note, _ := d.Checkout.GetNote(ctx, commit.Revision)
if note != nil && note.JobID == jobID {
return job.Status{
StatusString: job.StatusSucceeded,
Result: history.CommitEventMetadata{
Revision: commit.Revision,
Spec: &note.Spec,
Result: note.Result,
},
}, nil
if _, ok := notes[commit.Revision]; ok {
note, _ := d.Checkout.GetNote(ctx, commit.Revision)
if note != nil && note.JobID == jobID {
return job.Status{
StatusString: job.StatusSucceeded,
Result: history.CommitEventMetadata{
Revision: commit.Revision,
Spec: &note.Spec,
Result: note.Result,
},
}, nil
}
}
}

Expand All @@ -354,7 +363,7 @@ func (d *Daemon) JobStatus(jobID job.ID) (job.Status, error) {
// you'll get all the commits yet to be applied. If you send a hash
// and it's applied _past_ it, you'll get an empty list.
func (d *Daemon) SyncStatus(commitRef string) ([]string, error) {
ctx, cancel := newDaemonContext()
ctx, cancel := context.WithTimeout(context.Background(), defaultHandlerTimeout)
defer cancel()
commits, err := d.Checkout.CommitsBetween(ctx, d.Checkout.SyncTag, commitRef)
if err != nil {
Expand Down Expand Up @@ -502,7 +511,3 @@ func policyEventTypes(u policy.Update) []string {
sort.Strings(result)
return result
}

// newDaemonContext returns a context derived from context.Background()
// that expires after defaultDaemonTimeout, together with the function
// that cancels it.
func newDaemonContext() (context.Context, context.CancelFunc) {
	ctx, cancel := context.WithTimeout(context.Background(), defaultDaemonTimeout)
	return ctx, cancel
}
6 changes: 4 additions & 2 deletions daemon/daemon_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package daemon

import (
"bufio"
"context"
"fmt"
"os"
"path/filepath"
Expand All @@ -10,8 +11,8 @@ import (
"testing"
"time"

"context"
"github.com/go-kit/kit/log"

"github.com/weaveworks/flux"
"github.com/weaveworks/flux/cluster"
"github.com/weaveworks/flux/cluster/kubernetes"
Expand Down Expand Up @@ -304,7 +305,7 @@ func TestDaemon_JobStatusWithNoCache(t *testing.T) {
}

func mockDaemon(t *testing.T) (*Daemon, func(), *cluster.Mock, history.EventReadWriter) {
logger := log.NewLogfmtLogger(os.Stdout)
logger := log.NewNopLogger()

singleService := cluster.Service{
ID: flux.ServiceID(svc),
Expand Down Expand Up @@ -500,6 +501,7 @@ func updatePolicy(t *testing.T, d *Daemon) job.ID {
},
})
}

func updateManifest(t *testing.T, d *Daemon, spec update.Spec) job.ID {
id, err := d.UpdateManifests(spec)
if err != nil {
Expand Down
104 changes: 72 additions & 32 deletions daemon/loop.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ import (
)

const (
defaultSyncTimeout = 30 * time.Second // Max time allowed for syncs
// Timeout for git operations we're prepared to abandon. It is applied
// to each such operation individually (e.g., the pull in GitPollLoop
// and the separate git steps in doSync), not to a sync as a whole.
gitOpTimeout = 15 * time.Second
)

type LoopVars struct {
Expand Down Expand Up @@ -48,7 +49,7 @@ func (d *Daemon) GitPollLoop(stop chan struct{}, wg *sync.WaitGroup, logger log.
gitPollTimer.Stop()
gitPollTimer = time.NewTimer(d.GitPollInterval)
}()
ctx, cancel := context.WithTimeout(context.Background(), defaultSyncTimeout)
ctx, cancel := context.WithTimeout(context.Background(), gitOpTimeout)
defer cancel()
if err := d.Checkout.Pull(ctx); err != nil {
logger.Log("operation", "pull", "err", err)
Expand Down Expand Up @@ -116,17 +117,25 @@ func (d *LoopVars) askForImagePoll() {
// -- extra bits the loop needs

func (d *Daemon) doSync(logger log.Logger) {
ctx, cancel := context.WithTimeout(context.Background(), defaultSyncTimeout)
defer cancel()
started := time.Now().UTC()
// We don't care how long this takes overall, only about not
// getting bogged down in certain operations, so use an
// undeadlined context in general.
ctx := context.Background()

// checkout a working clone so we can mess around with tags later
working, err := d.Checkout.WorkingClone(ctx)
if err != nil {
logger.Log("err", err)
return
var working *git.Checkout
{
var err error
ctx, cancel := context.WithTimeout(ctx, gitOpTimeout)
defer cancel()
working, err = d.Checkout.WorkingClone(ctx)
if err != nil {
logger.Log("err", err)
return
}
defer working.Clean()
}
defer working.Clean()

// TODO logging, metrics?
// Get a map of all resources defined in the repo
Expand All @@ -139,18 +148,33 @@ func (d *Daemon) doSync(logger log.Logger) {
// TODO supply deletes argument from somewhere (command-line?)
if err := fluxsync.Sync(d.Manifests, allResources, d.Cluster, false, logger); err != nil {
logger.Log("err", err)
// TODO(michael): we should distinguish between "fully mostly
// succeeded" and "failed utterly", since we want to abandon
// this and not move the tag (and send a SyncFail event
// upstream?), if the latter. For now, it's presumed that any
// error returned is at worst a minor, partial failure (e.g.,
// a small number of resources failed to sync, for unimportant
// reasons)
}

var initialSync bool
// update notes and emit events for applied commits
commits, err := working.CommitsBetween(ctx, working.SyncTag, "HEAD")
if isUnknownRevision(err) {
// No sync tag, grab all revisions
initialSync = true
commits, err = working.CommitsBefore(ctx, "HEAD")
}
if err != nil {
logger.Log("err", err)

var initialSync bool
var commits []git.Commit
{
var err error
ctx, cancel := context.WithTimeout(ctx, gitOpTimeout)
commits, err = working.CommitsBetween(ctx, working.SyncTag, "HEAD")
if isUnknownRevision(err) {
// No sync tag, grab all revisions
initialSync = true
commits, err = working.CommitsBefore(ctx, "HEAD")
}
cancel()
if err != nil {
logger.Log("err", err)
return
}
}

// Figure out which service IDs changed in this release
Expand All @@ -160,11 +184,13 @@ func (d *Daemon) doSync(logger log.Logger) {
// no synctag, We are syncing everything from scratch
changedResources = allResources
} else {
ctx, cancel := context.WithTimeout(ctx, gitOpTimeout)
changedFiles, err := working.ChangedFiles(ctx, working.SyncTag)
if err == nil {
// We had some changed files, we're syncing a diff
changedResources, err = d.Manifests.LoadManifests(changedFiles...)
}
cancel()
if err != nil {
logger.Log("err", errors.Wrap(err, "loading resources from repo"))
return
Expand All @@ -176,10 +202,15 @@ func (d *Daemon) doSync(logger log.Logger) {
serviceIDs.Add(r.ServiceIDs(allResources))
}

notes, err := working.NoteRevList(ctx)
if err != nil {
logger.Log("err", errors.Wrap(err, "loading notes from repo"))
return
var notes map[string]struct{}
{
ctx, cancel := context.WithTimeout(ctx, gitOpTimeout)
notes, err = working.NoteRevList(ctx)
cancel()
if err != nil {
logger.Log("err", errors.Wrap(err, "loading notes from repo"))
return
}
}

// Collect any events that come from notes attached to the commits
Expand All @@ -193,16 +224,16 @@ func (d *Daemon) doSync(logger log.Logger) {

// Find notes in revisions.
for i := len(commits) - 1; i >= 0; i-- {
if ok := notes[commits[i].Revision]; !ok {
if _, ok := notes[commits[i].Revision]; !ok {
includes[history.NoneOfTheAbove] = true
continue
}
ctx, cancel := context.WithTimeout(ctx, gitOpTimeout)
n, err := working.GetNote(ctx, commits[i].Revision)
cancel()
if err != nil {
logger.Log("err", errors.Wrap(err, "loading notes from repo; possibly no notes"))
// TODO: We're ignoring all errors here, not just the "no notes" error. Parse error to report proper errors.
includes[history.NoneOfTheAbove] = true
continue
return
}
if n == nil {
includes[history.NoneOfTheAbove] = true
Expand Down Expand Up @@ -300,17 +331,27 @@ func (d *Daemon) doSync(logger log.Logger) {
}

// Move the tag and push it so we know how far we've gotten.
if err := working.MoveTagAndPush(ctx, "HEAD", "Sync pointer"); err != nil {
logger.Log("err", err)
{
ctx, cancel := context.WithTimeout(ctx, gitOpTimeout)
err := working.MoveTagAndPush(ctx, "HEAD", "Sync pointer")
cancel()
if err != nil {
logger.Log("err", err)
return
}
}

// Pull the tag if it has changed
if err := d.updateTagRev(ctx, working, logger); err != nil {
logger.Log("err", errors.Wrap(err, "updating tag"))
{
ctx, cancel := context.WithTimeout(ctx, gitOpTimeout)
if err := d.pullIfTagMoved(ctx, working, logger); err != nil {
logger.Log("err", errors.Wrap(err, "updating tag"))
}
cancel()
}
}

func (d *Daemon) updateTagRev(ctx context.Context, working *git.Checkout, logger log.Logger) error {
func (d *Daemon) pullIfTagMoved(ctx context.Context, working *git.Checkout, logger log.Logger) error {
oldTagRev, err := d.Checkout.TagRevision(ctx, d.Checkout.SyncTag)
if err != nil && !strings.Contains(err.Error(), "unknown revision or path not in the working tree") {
return err
Expand All @@ -322,7 +363,6 @@ func (d *Daemon) updateTagRev(ctx context.Context, working *git.Checkout, logger

if oldTagRev != newTagRev {
logger.Log("tag", d.Checkout.SyncTag, "old", oldTagRev, "new", newTagRev)

if err := d.Checkout.Pull(ctx); err != nil {
return err
}
Expand Down
6 changes: 3 additions & 3 deletions git/operations.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,17 +123,17 @@ func getNote(ctx context.Context, workingDir, notesRef, rev string) (*Note, erro
// Get all revisions with a note (NB: DO NOT RELY ON THE ORDERING)
// It appears to be ordered by ascending git object ref, not by time.
// Return a map to make it easier to do "if in" type queries.
func noteRevList(ctx context.Context, workingDir, notesRef string) (map[string]bool, error) {
func noteRevList(ctx context.Context, workingDir, notesRef string) (map[string]struct{}, error) {
out := &bytes.Buffer{}
if err := execGitCmd(ctx, workingDir, nil, out, "notes", "--ref", notesRef, "list"); err != nil {
return nil, err
}
noteList := splitList(out.String())
result := make(map[string]bool, len(noteList))
result := make(map[string]struct{}, len(noteList))
// Record the ref named on each line of the notes listing.
for _, l := range noteList {
split := strings.Fields(l)
// NOTE(review): this guard only guarantees that split[0] exists, yet
// the body indexes split[1]; a line with exactly one field would panic
// with an index out of range. Presumably the check should be
// len(split) > 1 -- confirm against the output format of `git notes list`.
if len(split) > 0 {
result[split[1]] = true // split[1] is the second field; presumably the annotated object ref (commit id in our case) -- TODO confirm
result[split[1]] = struct{}{} // split[1] is the second field; presumably the annotated object ref (commit id in our case) -- TODO confirm
}
}
return result, nil
Expand Down
Loading

0 comments on commit b08d2a5

Please sign in to comment.