Skip to content

Commit

Permalink
split apart task hydration+execution from watcher/config package with…
Browse files Browse the repository at this point in the history
… a simple channel

resolves #27
  • Loading branch information
Southclaws committed Mar 19, 2020
1 parent b61ceac commit b3657a1
Show file tree
Hide file tree
Showing 6 changed files with 95 additions and 35 deletions.
53 changes: 53 additions & 0 deletions service/executor/cmd.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package executor

import (
"github.com/pkg/errors"
"go.uber.org/zap"

"github.com/picostack/pico/service/secret"
"github.com/picostack/pico/service/task"
)

var _ Executor = &CommandExecutor{}

// CommandExecutor handles command invocation targets
type CommandExecutor struct {
secrets secret.Store
}

// NewCommandExecutor creates a new CommandExecutor
func NewCommandExecutor(secrets secret.Store) CommandExecutor {
return CommandExecutor{
secrets: secrets,
}
}

// Subscribe implements executor.Executor
func (e *CommandExecutor) Subscribe(bus chan task.ExecutionTask) error {
for t := range bus {
if err := e.execute(t.Target, t.Path, t.Shutdown); err != nil {
return err
}
}
return nil
}

func (e *CommandExecutor) execute(
target task.Target,
path string,
shutdown bool,
) (err error) {
env, err := e.secrets.GetSecretsForTarget(target.Name)
if err != nil {
return errors.Wrap(err, "failed to get secrets for target")
}

zap.L().Debug("executing with secrets",
zap.String("target", target.Name),
zap.Strings("cmd", target.Up),
zap.String("url", target.RepoURL),
zap.String("dir", path),
zap.Int("secrets", len(env)))

return target.Execute(path, env, shutdown)
}
9 changes: 9 additions & 0 deletions service/executor/executor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package executor

import "github.com/picostack/pico/service/task"

// Executor describes a type that can handle events and react to them. An
// executor is also responsible for hydrating a target with secrets.
type Executor interface {
Subscribe(chan task.ExecutionTask) error
}
8 changes: 7 additions & 1 deletion service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,11 @@ import (
"gopkg.in/src-d/go-git.v4/plumbing/transport"
"gopkg.in/src-d/go-git.v4/plumbing/transport/ssh"

"github.com/picostack/pico/service/executor"
"github.com/picostack/pico/service/secret"
"github.com/picostack/pico/service/secret/memory"
"github.com/picostack/pico/service/secret/vault"
"github.com/picostack/pico/service/task"
"github.com/picostack/pico/service/watcher"
)

Expand Down Expand Up @@ -71,8 +73,12 @@ func Initialise(c Config) (app *App, err error) {

app.secrets = secretStore

bus := make(chan task.ExecutionTask, 100)
ce := executor.NewCommandExecutor(secretStore)
ce.Subscribe(bus)

app.watcher = watcher.New(
secretStore,
bus,
c.Hostname,
c.Directory,
c.Target,
Expand Down
7 changes: 7 additions & 0 deletions service/task/target.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,13 @@ import (
"go.uber.org/zap"
)

// ExecutionTask encodes a Target with additional execution-time information.
type ExecutionTask struct {
Target Target
Path string
Shutdown bool
}

// Targets is just a list of target objects, to implement the Sort interface
type Targets []Target

Expand Down
40 changes: 12 additions & 28 deletions service/watcher/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,23 +20,16 @@ func (w *Watcher) handle(e gitwatch.Event) (err error) {
zap.String("target", target.Name),
zap.String("url", target.RepoURL),
zap.Time("timestamp", e.Timestamp))
return w.executeWithSecrets(target, e.Path, false)
w.send(target, e.Path, false)
return nil
}

func (w *Watcher) executeWithSecrets(target task.Target, path string, shutdown bool) (err error) {
env, err := w.secrets.GetSecretsForTarget(target.Name)
if err != nil {
return errors.Wrap(err, "failed to get secrets for target")
func (w Watcher) executeTargets(targets []task.Target, shutdown bool) {
zap.L().Debug("executing all targets", zap.Bool("shutdown", shutdown))
for _, t := range targets {
w.send(t, filepath.Join(w.directory, t.Name), shutdown)
}

zap.L().Debug("executing with secrets",
zap.String("target", target.Name),
zap.Strings("cmd", target.Up),
zap.String("url", target.RepoURL),
zap.String("dir", path),
zap.Int("secrets", len(env)))

return target.Execute(path, env, shutdown)
return
}

func (w Watcher) getTarget(url string) (target task.Target, exists bool) {
Expand All @@ -48,19 +41,10 @@ func (w Watcher) getTarget(url string) (target task.Target, exists bool) {
return
}

func (w Watcher) executeTargets(targets []task.Target, shutdown bool) {
zap.L().Debug("executing all targets", zap.Bool("shutdown", shutdown))
for _, t := range targets {
err := w.executeWithSecrets(
t,
filepath.Join(w.directory, t.Name),
shutdown,
)
if err != nil {
zap.L().Error("failed to execute task after reconfigure",
zap.Error(errors.Cause(err)))
continue
}
func (w Watcher) send(target task.Target, path string, shutdown bool) {
w.bus <- task.ExecutionTask{
Target: target,
Path: path,
Shutdown: shutdown,
}
return
}
13 changes: 7 additions & 6 deletions service/watcher/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,16 @@ import (
"time"

"github.com/Southclaws/gitwatch"
"github.com/picostack/pico/service/config"
"github.com/picostack/pico/service/secret"
"github.com/picostack/pico/service/task"
"go.uber.org/zap"
"gopkg.in/src-d/go-git.v4/plumbing/transport"

"github.com/picostack/pico/service/config"
"github.com/picostack/pico/service/task"
)

// Watcher handles Git events and dispatches config or task execution events.
type Watcher struct {
secrets secret.Store
bus chan task.ExecutionTask
hostname string
directory string
configRepo string
Expand All @@ -32,15 +32,15 @@ type Watcher struct {

// New creates a new watcher with all necessary parameters
func New(
secrets secret.Store,
bus chan task.ExecutionTask,
hostname string,
directory string,
configRepo string,
checkInterval time.Duration,
ssh transport.AuthMethod,
) *Watcher {
return &Watcher{
secrets: secrets,
bus: bus,
hostname: hostname,
directory: directory,
configRepo: configRepo,
Expand All @@ -51,6 +51,7 @@ func New(
}
}

// Start runs the watcher and blocks until a fatal error occurs
func (w *Watcher) Start() error {
if err := w.reconfigure(); err != nil {
return err
Expand Down

0 comments on commit b3657a1

Please sign in to comment.