From b3657a115752b77b72ceb70c55e26c20169bb5b7 Mon Sep 17 00:00:00 2001 From: Barnaby Keene Date: Thu, 19 Mar 2020 18:15:32 +0000 Subject: [PATCH] split apart task hydration+execution from watcher/config package with a simple channel resolves #27 --- service/executor/cmd.go | 53 ++++++++++++++++++++++++++++++++++++ service/executor/executor.go | 9 ++++++ service/service.go | 8 +++++- service/task/target.go | 7 +++++ service/watcher/handler.go | 40 ++++++++------------------- service/watcher/watcher.go | 13 +++++---- 6 files changed, 95 insertions(+), 35 deletions(-) create mode 100644 service/executor/cmd.go create mode 100644 service/executor/executor.go diff --git a/service/executor/cmd.go b/service/executor/cmd.go new file mode 100644 index 0000000..f096171 --- /dev/null +++ b/service/executor/cmd.go @@ -0,0 +1,53 @@ +package executor + +import ( + "github.com/pkg/errors" + "go.uber.org/zap" + + "github.com/picostack/pico/service/secret" + "github.com/picostack/pico/service/task" +) + +var _ Executor = &CommandExecutor{} + +// CommandExecutor handles command invocation targets +type CommandExecutor struct { + secrets secret.Store +} + +// NewCommandExecutor creates a new CommandExecutor +func NewCommandExecutor(secrets secret.Store) CommandExecutor { + return CommandExecutor{ + secrets: secrets, + } +} + +// Subscribe implements executor.Executor +func (e *CommandExecutor) Subscribe(bus chan task.ExecutionTask) error { + for t := range bus { + if err := e.execute(t.Target, t.Path, t.Shutdown); err != nil { + return err + } + } + return nil +} + +func (e *CommandExecutor) execute( + target task.Target, + path string, + shutdown bool, +) (err error) { + env, err := e.secrets.GetSecretsForTarget(target.Name) + if err != nil { + return errors.Wrap(err, "failed to get secrets for target") + } + + zap.L().Debug("executing with secrets", + zap.String("target", target.Name), + zap.Strings("cmd", target.Up), + zap.String("url", target.RepoURL), + zap.String("dir", path), + zap.Int("secrets", len(env))) + + return target.Execute(path, env, shutdown) +} diff --git a/service/executor/executor.go b/service/executor/executor.go new file mode 100644 index 0000000..063fcea --- /dev/null +++ b/service/executor/executor.go @@ -0,0 +1,9 @@ +package executor + +import "github.com/picostack/pico/service/task" + +// Executor describes a type that can handle events and react to them. An +// executor is also responsible for hydrating a target with secrets. +type Executor interface { + Subscribe(chan task.ExecutionTask) error +} diff --git a/service/service.go b/service/service.go index 88fa4ee..aceb406 100644 --- a/service/service.go +++ b/service/service.go @@ -11,9 +11,11 @@ import ( "gopkg.in/src-d/go-git.v4/plumbing/transport" "gopkg.in/src-d/go-git.v4/plumbing/transport/ssh" + "github.com/picostack/pico/service/executor" "github.com/picostack/pico/service/secret" "github.com/picostack/pico/service/secret/memory" "github.com/picostack/pico/service/secret/vault" + "github.com/picostack/pico/service/task" "github.com/picostack/pico/service/watcher" ) @@ -71,8 +73,12 @@ func Initialise(c Config) (app *App, err error) { app.secrets = secretStore + bus := make(chan task.ExecutionTask, 100) + ce := executor.NewCommandExecutor(secretStore) + ce.Subscribe(bus) + app.watcher = watcher.New( - secretStore, + bus, c.Hostname, c.Directory, c.Target, diff --git a/service/task/target.go b/service/task/target.go index 2f19ce1..61a6588 100644 --- a/service/task/target.go +++ b/service/task/target.go @@ -9,6 +9,13 @@ import ( "go.uber.org/zap" ) +// ExecutionTask encodes a Target with additional execution-time information. +type ExecutionTask struct { + Target Target + Path string + Shutdown bool +} + // Targets is just a list of target objects, to implement the Sort interface type Targets []Target diff --git a/service/watcher/handler.go b/service/watcher/handler.go index 8a29892..3e5f82f 100644 --- a/service/watcher/handler.go +++ b/service/watcher/handler.go @@ -20,23 +20,16 @@ func (w *Watcher) handle(e gitwatch.Event) (err error) { zap.String("target", target.Name), zap.String("url", target.RepoURL), zap.Time("timestamp", e.Timestamp)) - return w.executeWithSecrets(target, e.Path, false) + w.send(target, e.Path, false) + return nil } -func (w *Watcher) executeWithSecrets(target task.Target, path string, shutdown bool) (err error) { - env, err := w.secrets.GetSecretsForTarget(target.Name) - if err != nil { - return errors.Wrap(err, "failed to get secrets for target") +func (w Watcher) executeTargets(targets []task.Target, shutdown bool) { + zap.L().Debug("executing all targets", zap.Bool("shutdown", shutdown)) + for _, t := range targets { + w.send(t, filepath.Join(w.directory, t.Name), shutdown) } - - zap.L().Debug("executing with secrets", - zap.String("target", target.Name), - zap.Strings("cmd", target.Up), - zap.String("url", target.RepoURL), - zap.String("dir", path), - zap.Int("secrets", len(env))) - - return target.Execute(path, env, shutdown) + return } func (w Watcher) getTarget(url string) (target task.Target, exists bool) { @@ -48,19 +41,10 @@ func (w Watcher) getTarget(url string) (target task.Target, exists bool) { return } -func (w Watcher) executeTargets(targets []task.Target, shutdown bool) { - zap.L().Debug("executing all targets", zap.Bool("shutdown", shutdown)) - for _, t := range targets { - err := w.executeWithSecrets( - t, - filepath.Join(w.directory, t.Name), - shutdown, - ) - if err != nil { - zap.L().Error("failed to execute task after reconfigure", - zap.Error(errors.Cause(err))) - continue - } +func (w Watcher) send(target task.Target, path string, shutdown bool) { + w.bus <- task.ExecutionTask{ + Target: target, + Path: path, + Shutdown: shutdown, } - return } diff --git a/service/watcher/watcher.go b/service/watcher/watcher.go index 7125dad..715fc63 100644 --- a/service/watcher/watcher.go +++ b/service/watcher/watcher.go @@ -5,16 +5,16 @@ import ( "time" "github.com/Southclaws/gitwatch" - "github.com/picostack/pico/service/config" - "github.com/picostack/pico/service/secret" - "github.com/picostack/pico/service/task" "go.uber.org/zap" "gopkg.in/src-d/go-git.v4/plumbing/transport" + + "github.com/picostack/pico/service/config" + "github.com/picostack/pico/service/task" ) // Watcher handles Git events and dispatches config or task execution events. type Watcher struct { - secrets secret.Store + bus chan task.ExecutionTask hostname string directory string configRepo string @@ -32,7 +32,7 @@ type Watcher struct { // New creates a new watcher with all necessary parameters func New( - secrets secret.Store, + bus chan task.ExecutionTask, hostname string, directory string, configRepo string, @@ -40,7 +40,7 @@ func New( ssh transport.AuthMethod, ) *Watcher { return &Watcher{ - secrets: secrets, + bus: bus, hostname: hostname, directory: directory, configRepo: configRepo, @@ -51,6 +51,7 @@ func New( } } +// Start runs the watcher and blocks until a fatal error occurs func (w *Watcher) Start() error { if err := w.reconfigure(); err != nil { return err