Skip to content
This repository has been archived by the owner on Nov 1, 2022. It is now read-only.

Commit

Permalink
Add timeout to git command (#719)
Browse files Browse the repository at this point in the history
* Add timeout to git command

To mitigate against any low level execution problems, add a context timeout to git commands so that if they fail, it won't cause subsequent requests to back up.

Fixes #714

* Move context up to the callers of the git command
  • Loading branch information
philwinder authored Aug 29, 2017
1 parent 3445aa5 commit 987c94c
Show file tree
Hide file tree
Showing 11 changed files with 173 additions and 126 deletions.
5 changes: 4 additions & 1 deletion cmd/fluxd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
k8sclient "k8s.io/client-go/1.5/kubernetes"
"k8s.io/client-go/1.5/rest"

"context"
"github.com/weaveworks/flux"
"github.com/weaveworks/flux/cluster"
"github.com/weaveworks/flux/cluster/kubernetes"
Expand Down Expand Up @@ -341,7 +342,9 @@ func main() {
}

for checkout == nil {
working, err := repo.Clone(gitConfig)
ctx, cancel := context.WithTimeout(context.Background(), git.DefaultCloneTimeout)
working, err := repo.Clone(ctx, gitConfig)
cancel()
if err != nil {
if checker == nil {
checker = checkForUpdates(clusterVersion, "false", updateCheckLogger)
Expand Down
44 changes: 30 additions & 14 deletions daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/go-kit/kit/log"
"github.com/pkg/errors"

"context"
"github.com/weaveworks/flux"
"github.com/weaveworks/flux/cluster"
"github.com/weaveworks/flux/git"
Expand All @@ -22,6 +23,10 @@ import (
"github.com/weaveworks/flux/update"
)

const (
defaultDaemonTimeout = 10 * time.Second
)

// Combine these things to form Devasta^Wan implementation of
// Platform.
type Daemon struct {
Expand Down Expand Up @@ -121,16 +126,18 @@ func (d *Daemon) ListImages(spec update.ServiceSpec) ([]flux.ImageStatus, error)
// run), leave the revision field empty.
type DaemonJobFunc func(jobID job.ID, working *git.Checkout, logger log.Logger) (*history.CommitEventMetadata, error)

func (d *Daemon) queueJob(do DaemonJobFunc) job.ID {
// Must cancel the context once this job is complete
func (d *Daemon) queueJob(ctx context.Context, cancel context.CancelFunc, do DaemonJobFunc) job.ID {
id := job.ID(guid.New())
d.Jobs.Enqueue(&job.Job{
ID: id,
Do: func(logger log.Logger) error {
defer cancel()
started := time.Now().UTC()
d.JobStatusCache.SetStatus(id, job.Status{StatusString: job.StatusRunning})
// make a working clone so we don't mess with files we
// will be reading from elsewhere
working, err := d.Checkout.WorkingClone()
working, err := d.Checkout.WorkingClone(ctx)
if err != nil {
d.JobStatusCache.SetStatus(id, job.Status{StatusString: job.StatusFailed, Err: err.Error()})
return err
Expand Down Expand Up @@ -168,21 +175,22 @@ func (d *Daemon) queueJob(do DaemonJobFunc) job.ID {

// Apply the desired changes to the config files
func (d *Daemon) UpdateManifests(spec update.Spec) (job.ID, error) {
ctx, cancel := newDaemonContext()
var id job.ID
if spec.Type == "" {
return id, errors.New("no type in update spec")
}
switch s := spec.Spec.(type) {
case release.Changes:
return d.queueJob(d.release(spec, s)), nil
return d.queueJob(ctx, cancel, d.release(ctx, spec, s)), nil
case policy.Updates:
return d.queueJob(d.updatePolicy(spec, s)), nil
return d.queueJob(ctx, cancel, d.updatePolicy(ctx, spec, s)), nil
default:
return id, fmt.Errorf(`unknown update type "%s"`, spec.Type)
}
}

func (d *Daemon) updatePolicy(spec update.Spec, updates policy.Updates) DaemonJobFunc {
func (d *Daemon) updatePolicy(ctx context.Context, spec update.Spec, updates policy.Updates) DaemonJobFunc {
return func(jobID job.ID, working *git.Checkout, logger log.Logger) (*history.CommitEventMetadata, error) {
// For each update
var serviceIDs []flux.ServiceID
Expand Down Expand Up @@ -238,7 +246,7 @@ func (d *Daemon) updatePolicy(spec update.Spec, updates policy.Updates) DaemonJo
return metadata, nil
}

if err := working.CommitAndPush(policyCommitMessage(updates, spec.Cause), &git.Note{JobID: jobID, Spec: spec}); err != nil {
if err := working.CommitAndPush(ctx, policyCommitMessage(updates, spec.Cause), &git.Note{JobID: jobID, Spec: spec}); err != nil {
// On the chance pushing failed because it was not
// possible to fast-forward, ask for a sync so the
// next attempt is more likely to succeed.
Expand All @@ -250,15 +258,15 @@ func (d *Daemon) updatePolicy(spec update.Spec, updates policy.Updates) DaemonJo
}

var err error
metadata.Revision, err = working.HeadRevision()
metadata.Revision, err = working.HeadRevision(ctx)
if err != nil {
return nil, err
}
return metadata, nil
}
}

func (d *Daemon) release(spec update.Spec, c release.Changes) DaemonJobFunc {
func (d *Daemon) release(ctx context.Context, spec update.Spec, c release.Changes) DaemonJobFunc {
return func(jobID job.ID, working *git.Checkout, logger log.Logger) (*history.CommitEventMetadata, error) {
rc := release.NewReleaseContext(d.Cluster, d.Manifests, d.Registry, working)
result, err := release.Release(rc, c, logger)
Expand All @@ -272,14 +280,14 @@ func (d *Daemon) release(spec update.Spec, c release.Changes) DaemonJobFunc {
if commitMsg == "" {
commitMsg = c.CommitMessage()
}
if err := working.CommitAndPush(commitMsg, &git.Note{JobID: jobID, Spec: spec, Result: result}); err != nil {
if err := working.CommitAndPush(ctx, commitMsg, &git.Note{JobID: jobID, Spec: spec, Result: result}); err != nil {
// On the chance pushing failed because it was not
// possible to fast-forward, ask for a sync so the
// next attempt is more likely to succeed.
d.askForSync()
return nil, err
}
revision, err = working.HeadRevision()
revision, err = working.HeadRevision(ctx)
if err != nil {
return nil, err
}
Expand All @@ -304,6 +312,8 @@ func (d *Daemon) SyncNotify() error {
// Ask the daemon how far it's got committing things; in particular, is the job
// queued? running? committed? If it is done, the commit ref is returned.
func (d *Daemon) JobStatus(jobID job.ID) (job.Status, error) {
ctx, cancel := newDaemonContext()
defer cancel()
// Is the job queued, running, or recently finished?
status, ok := d.JobStatusCache.Status(jobID)
if ok {
Expand All @@ -313,15 +323,15 @@ func (d *Daemon) JobStatus(jobID job.ID) (job.Status, error) {
// Look through the commits for a note referencing this job. This
// means that even if fluxd restarts, we will at least remember
// jobs which have pushed a commit.
if err := d.Checkout.Pull(); err != nil {
if err := d.Checkout.Pull(ctx); err != nil {
return job.Status{}, errors.Wrap(err, "updating repo for status")
}
commits, err := d.Checkout.CommitsBefore("HEAD")
commits, err := d.Checkout.CommitsBefore(ctx, "HEAD")
if err != nil {
return job.Status{}, errors.Wrap(err, "checking revisions for status")
}
for _, commit := range commits {
note, _ := d.Checkout.GetNote(commit.Revision)
note, _ := d.Checkout.GetNote(ctx, commit.Revision)
if note != nil && note.JobID == jobID {
return job.Status{
StatusString: job.StatusSucceeded,
Expand All @@ -343,7 +353,9 @@ func (d *Daemon) JobStatus(jobID job.ID) (job.Status, error) {
// you'll get all the commits yet to be applied. If you send a hash
// and it's applied _past_ it, you'll get an empty list.
func (d *Daemon) SyncStatus(commitRef string) ([]string, error) {
commits, err := d.Checkout.CommitsBetween(d.Checkout.SyncTag, commitRef)
ctx, cancel := newDaemonContext()
defer cancel()
commits, err := d.Checkout.CommitsBetween(ctx, d.Checkout.SyncTag, commitRef)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -489,3 +501,7 @@ func policyEventTypes(u policy.Update) []string {
sort.Strings(result)
return result
}

func newDaemonContext() (context.Context, context.CancelFunc) {
return context.WithTimeout(context.Background(), defaultDaemonTimeout)
}
3 changes: 2 additions & 1 deletion daemon/daemon_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"testing"
"time"

"context"
"github.com/go-kit/kit/log"
"github.com/weaveworks/flux"
"github.com/weaveworks/flux/cluster"
Expand Down Expand Up @@ -338,7 +339,7 @@ func mockDaemon(t *testing.T) (*Daemon, func(), *cluster.Mock, history.EventRead
SyncTag: "flux-test",
NotesRef: "fluxtest",
}
checkout, err := repo.Clone(params)
checkout, err := repo.Clone(context.Background(), params)
if err != nil {
t.Fatal(err)
}
Expand Down
33 changes: 21 additions & 12 deletions daemon/loop.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

"sync"

"context"
"github.com/weaveworks/flux"
"github.com/weaveworks/flux/git"
"github.com/weaveworks/flux/history"
Expand All @@ -17,6 +18,10 @@ import (
"github.com/weaveworks/flux/update"
)

const (
defaultSyncTimeout = 30 * time.Second // Max time allowed for syncs
)

type LoopVars struct {
GitPollInterval time.Duration
RegistryPollInterval time.Duration
Expand All @@ -43,7 +48,9 @@ func (d *Daemon) GitPollLoop(stop chan struct{}, wg *sync.WaitGroup, logger log.
gitPollTimer.Stop()
gitPollTimer = time.NewTimer(d.GitPollInterval)
}()
if err := d.Checkout.Pull(); err != nil {
ctx, cancel := context.WithTimeout(context.Background(), defaultSyncTimeout)
defer cancel()
if err := d.Checkout.Pull(ctx); err != nil {
logger.Log("operation", "pull", "err", err)
return
}
Expand Down Expand Up @@ -109,10 +116,12 @@ func (d *LoopVars) askForImagePoll() {
// -- extra bits the loop needs

func (d *Daemon) doSync(logger log.Logger) {
ctx, cancel := context.WithTimeout(context.Background(), defaultSyncTimeout)
defer cancel()
started := time.Now().UTC()

// checkout a working clone so we can mess around with tags later
working, err := d.Checkout.WorkingClone()
working, err := d.Checkout.WorkingClone(ctx)
if err != nil {
logger.Log("err", err)
return
Expand All @@ -134,11 +143,11 @@ func (d *Daemon) doSync(logger log.Logger) {

var initialSync bool
// update notes and emit events for applied commits
commits, err := working.CommitsBetween(working.SyncTag, "HEAD")
commits, err := working.CommitsBetween(ctx, working.SyncTag, "HEAD")
if isUnknownRevision(err) {
// No sync tag, grab all revisions
initialSync = true
commits, err = working.CommitsBefore("HEAD")
commits, err = working.CommitsBefore(ctx, "HEAD")
}
if err != nil {
logger.Log("err", err)
Expand All @@ -151,7 +160,7 @@ func (d *Daemon) doSync(logger log.Logger) {
// no synctag, We are syncing everything from scratch
changedResources = allResources
} else {
changedFiles, err := working.ChangedFiles(working.SyncTag)
changedFiles, err := working.ChangedFiles(ctx, working.SyncTag)
if err == nil {
// We had some changed files, we're syncing a diff
changedResources, err = d.Manifests.LoadManifests(changedFiles...)
Expand All @@ -178,7 +187,7 @@ func (d *Daemon) doSync(logger log.Logger) {

// Find notes in revisions.
for i := len(commits) - 1; i >= 0; i-- {
n, err := working.GetNote(commits[i].Revision)
n, err := working.GetNote(ctx, commits[i].Revision)
if err != nil {
logger.Log("err", errors.Wrap(err, "loading notes from repo; possibly no notes"))
// TODO: We're ignoring all errors here, not just the "no notes" error. Parse error to report proper errors.
Expand Down Expand Up @@ -271,30 +280,30 @@ func (d *Daemon) doSync(logger log.Logger) {
}

// Move the tag and push it so we know how far we've gotten.
if err := working.MoveTagAndPush("HEAD", "Sync pointer"); err != nil {
if err := working.MoveTagAndPush(ctx, "HEAD", "Sync pointer"); err != nil {
logger.Log("err", err)
}

// Pull the tag if it has changed
if err := d.updateTagRev(working, logger); err != nil {
if err := d.updateTagRev(ctx, working, logger); err != nil {
logger.Log("err", errors.Wrap(err, "updating tag"))
}
}

func (d *Daemon) updateTagRev(working *git.Checkout, logger log.Logger) error {
oldTagRev, err := d.Checkout.TagRevision(d.Checkout.SyncTag)
func (d *Daemon) updateTagRev(ctx context.Context, working *git.Checkout, logger log.Logger) error {
oldTagRev, err := d.Checkout.TagRevision(ctx, d.Checkout.SyncTag)
if err != nil && !strings.Contains(err.Error(), "unknown revision or path not in the working tree") {
return err
}
newTagRev, err := working.TagRevision(working.SyncTag)
newTagRev, err := working.TagRevision(ctx, working.SyncTag)
if err != nil {
return err
}

if oldTagRev != newTagRev {
logger.Log("tag", d.Checkout.SyncTag, "old", oldTagRev, "new", newTagRev)

if err := d.Checkout.Pull(); err != nil {
if err := d.Checkout.Pull(ctx); err != nil {
return err
}
}
Expand Down
27 changes: 14 additions & 13 deletions daemon/loop_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

"github.com/go-kit/kit/log"

"context"
"github.com/weaveworks/flux"
"github.com/weaveworks/flux/cluster"
"github.com/weaveworks/flux/cluster/kubernetes"
Expand All @@ -37,7 +38,7 @@ var (

func daemon(t *testing.T) (*Daemon, func()) {
repo, repoCleanup := gittest.Repo(t)
working, err := repo.Clone(git.Config{
working, err := repo.Clone(context.Background(), git.Config{
SyncTag: gitSyncTag,
NotesRef: gitNotesRef,
UserName: gitUser,
Expand Down Expand Up @@ -125,9 +126,9 @@ func TestPullAndSync_InitialSync(t *testing.T) {
}
}
// It creates the tag at HEAD
if err := d.Checkout.Pull(); err != nil {
if err := d.Checkout.Pull(context.Background()); err != nil {
t.Errorf("pulling sync tag: %v", err)
} else if revs, err := d.Checkout.CommitsBefore(gitSyncTag); err != nil {
} else if revs, err := d.Checkout.CommitsBefore(context.Background(), gitSyncTag); err != nil {
t.Errorf("finding revisions before sync tag: %v", err)
} else if len(revs) <= 0 {
t.Errorf("Found no revisions before the sync tag")
Expand All @@ -138,7 +139,7 @@ func TestDoSync_NoNewCommits(t *testing.T) {
// Tag exists
d, cleanup := daemon(t)
defer cleanup()
if err := d.Checkout.MoveTagAndPush("HEAD", "Sync pointer"); err != nil {
if err := d.Checkout.MoveTagAndPush(context.Background(), "HEAD", "Sync pointer"); err != nil {
t.Fatal(err)
}

Expand Down Expand Up @@ -173,13 +174,13 @@ func TestDoSync_NoNewCommits(t *testing.T) {
}

// It doesn't move the tag
oldRevs, err := d.Checkout.CommitsBefore(gitSyncTag)
oldRevs, err := d.Checkout.CommitsBefore(context.Background(), gitSyncTag)
if err != nil {
t.Fatal(err)
}
if err := d.Checkout.Pull(); err != nil {
if err := d.Checkout.Pull(context.Background()); err != nil {
t.Errorf("pulling sync tag: %v", err)
} else if revs, err := d.Checkout.CommitsBefore(gitSyncTag); err != nil {
} else if revs, err := d.Checkout.CommitsBefore(context.Background(), gitSyncTag); err != nil {
t.Errorf("finding revisions before sync tag: %v", err)
} else if !reflect.DeepEqual(revs, oldRevs) {
t.Errorf("Should have kept the sync tag at HEAD")
Expand All @@ -191,10 +192,10 @@ func TestDoSync_WithNewCommit(t *testing.T) {
d, cleanup := daemon(t)
defer cleanup()
// Set the sync tag to head
if err := d.Checkout.MoveTagAndPush("HEAD", "Sync pointer"); err != nil {
if err := d.Checkout.MoveTagAndPush(context.Background(), "HEAD", "Sync pointer"); err != nil {
t.Fatal(err)
}
oldRevision, err := d.Checkout.HeadRevision()
oldRevision, err := d.Checkout.HeadRevision(context.Background())
if err != nil {
t.Fatal(err)
}
Expand All @@ -205,10 +206,10 @@ func TestDoSync_WithNewCommit(t *testing.T) {
}); err != nil {
t.Fatal(err)
}
if err := d.Checkout.CommitAndPush("test commit", nil); err != nil {
if err := d.Checkout.CommitAndPush(context.Background(), "test commit", nil); err != nil {
t.Fatal(err)
}
newRevision, err := d.Checkout.HeadRevision()
newRevision, err := d.Checkout.HeadRevision(context.Background())
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -252,9 +253,9 @@ func TestDoSync_WithNewCommit(t *testing.T) {
}
}
// It moves the tag
if err := d.Checkout.Pull(); err != nil {
if err := d.Checkout.Pull(context.Background()); err != nil {
t.Errorf("pulling sync tag: %v", err)
} else if revs, err := d.Checkout.CommitsBetween(oldRevision, gitSyncTag); err != nil {
} else if revs, err := d.Checkout.CommitsBetween(context.Background(), oldRevision, gitSyncTag); err != nil {
t.Errorf("finding revisions before sync tag: %v", err)
} else if len(revs) <= 0 {
t.Errorf("Should have moved sync tag forward")
Expand Down
Loading

0 comments on commit 987c94c

Please sign in to comment.