diff --git a/client/driver/executor/executor.go b/client/driver/executor/executor.go
index 455ed109fdd..240c6aacf4b 100644
--- a/client/driver/executor/executor.go
+++ b/client/driver/executor/executor.go
@@ -276,12 +276,12 @@ func (e *UniversalExecutor) Launch(command *ExecCommand) (*ProcessState, error)
 	e.childCmd.Stderr = stderr
 
 	// Look up the binary path and make it executable
-	absPath, err := e.lookupBin(command.Cmd)
+	absPath, err := lookupBin(command.TaskDir, command.Cmd)
 	if err != nil {
 		return nil, err
 	}
 
-	if err := e.makeExecutable(absPath); err != nil {
+	if err := makeExecutable(absPath); err != nil {
 		return nil, err
 	}
 
@@ -476,18 +476,42 @@ func (e *UniversalExecutor) Shutdown(signal string, grace time.Duration) error {
 	return nil
 }
 
+// Signal sends the passed signal to the task
+func (e *UniversalExecutor) Signal(s os.Signal) error {
+	if e.childCmd.Process == nil {
+		return fmt.Errorf("Task not yet run")
+	}
+
+	e.logger.Debug("sending signal to PID", "signal", s, "pid", e.childCmd.Process.Pid)
+	err := e.childCmd.Process.Signal(s)
+	if err != nil {
+		e.logger.Error("sending signal failed", "signal", s, "error", err)
+		return err
+	}
+
+	return nil
+}
+
+func (e *UniversalExecutor) Stats() (*cstructs.TaskResourceUsage, error) {
+	pidStats, err := e.pidCollector.pidStats()
+	if err != nil {
+		return nil, err
+	}
+	return aggregatedResourceUsage(e.systemCpuStats, pidStats), nil
+}
+
 // lookupBin looks for path to the binary to run by looking for the binary in
 // the following locations, in-order: task/local/, task/, based on host $PATH.
 // The return path is absolute.
-func (e *UniversalExecutor) lookupBin(bin string) (string, error) {
+func lookupBin(taskDir string, bin string) (string, error) {
 	// Check in the local directory
-	local := filepath.Join(e.commandCfg.TaskDir, allocdir.TaskLocal, bin)
+	local := filepath.Join(taskDir, allocdir.TaskLocal, bin)
 	if _, err := os.Stat(local); err == nil {
 		return local, nil
 	}
 
 	// Check at the root of the task's directory
-	root := filepath.Join(e.commandCfg.TaskDir, bin)
+	root := filepath.Join(taskDir, bin)
 	if _, err := os.Stat(root); err == nil {
 		return root, nil
 	}
@@ -501,7 +525,7 @@ func (e *UniversalExecutor) lookupBin(bin string) (string, error) {
 }
 
 // makeExecutable makes the given file executable for root,group,others.
-func (e *UniversalExecutor) makeExecutable(binPath string) error {
+func makeExecutable(binPath string) error {
 	if runtime.GOOS == "windows" {
 		return nil
 	}
@@ -524,27 +548,3 @@ func (e *UniversalExecutor) makeExecutable(binPath string) error {
 	}
 	return nil
 }
-
-// Signal sends the passed signal to the task
-func (e *UniversalExecutor) Signal(s os.Signal) error {
-	if e.childCmd.Process == nil {
-		return fmt.Errorf("Task not yet run")
-	}
-
-	e.logger.Debug("sending signal to PID", "signal", s, "pid", e.childCmd.Process.Pid)
-	err := e.childCmd.Process.Signal(s)
-	if err != nil {
-		e.logger.Error("sending signal failed", "signal", s, "error", err)
-		return err
-	}
-
-	return nil
-}
-
-func (e *UniversalExecutor) Stats() (*cstructs.TaskResourceUsage, error) {
-	pidStats, err := e.pidCollector.pidStats()
-	if err != nil {
-		return nil, err
-	}
-	return aggregatedResourceUsage(e.systemCpuStats, pidStats), nil
-}
diff --git a/client/driver/executor/executor_linux.go b/client/driver/executor/executor_linux.go
index dfc2db0105d..be1f76f6dc8 100644
--- a/client/driver/executor/executor_linux.go
+++ b/client/driver/executor/executor_linux.go
@@ -101,6 +101,10 @@ func (l *LibcontainerExecutor) Launch(command *ExecCommand) (*ProcessState, erro
 		return nil, fmt.Errorf("unable to find the nomad binary: %v", err)
 	}
 
+	if command.Resources == nil {
+		command.Resources = &Resources{}
+	}
+
 	l.command = command
 
 	// Move to the root cgroup until process is started
@@ -129,7 +133,26 @@ func (l *LibcontainerExecutor) Launch(command *ExecCommand) (*ProcessState, erro
 	}
 	l.container = container
 
-	combined := append([]string{command.Cmd}, command.Args...)
+	// Look up the binary path and make it executable
+	absPath, err := lookupBin(command.TaskDir, command.Cmd)
+	if err != nil {
+		return nil, err
+	}
+
+	if err := makeExecutable(absPath); err != nil {
+		return nil, err
+	}
+
+	path := absPath
+
+	// Determine the path to run as it may have to be relative to the chroot.
+	rel, err := filepath.Rel(command.TaskDir, path)
+	if err != nil {
+		return nil, fmt.Errorf("failed to determine relative path base=%q target=%q: %v", command.TaskDir, path, err)
+	}
+	path = rel
+
+	combined := append([]string{path}, command.Args...)
 	stdout, err := command.Stdout()
 	if err != nil {
 		return nil, err
diff --git a/drivers/exec/driver.go b/drivers/exec/driver.go
new file mode 100644
index 00000000000..231382a3233
--- /dev/null
+++ b/drivers/exec/driver.go
@@ -0,0 +1,440 @@
+package exec
+
+import (
+	"fmt"
+	"os"
+	"path/filepath"
+	"strconv"
+	"time"
+
+	"github.com/hashicorp/consul-template/signals"
+	hclog "github.com/hashicorp/go-hclog"
+	plugin "github.com/hashicorp/go-plugin"
+	"github.com/hashicorp/nomad/client/driver/executor"
+	dstructs "github.com/hashicorp/nomad/client/driver/structs"
+	cstructs "github.com/hashicorp/nomad/client/structs"
+	"github.com/hashicorp/nomad/drivers/shared/eventer"
+	"github.com/hashicorp/nomad/plugins/base"
+	"github.com/hashicorp/nomad/plugins/drivers"
+	"github.com/hashicorp/nomad/plugins/drivers/utils"
+	"github.com/hashicorp/nomad/plugins/shared/hclspec"
+	"golang.org/x/net/context"
+)
+
+const (
+	// pluginName is the name of the plugin
+	pluginName = "exec"
+
+	// fingerprintPeriod is the interval at which the driver will send fingerprint responses
+	fingerprintPeriod = 30 * time.Second
+)
+
+var (
+	// pluginInfo is the response returned for the PluginInfo RPC
+	pluginInfo = &base.PluginInfoResponse{
+		Type:             base.PluginTypeDriver,
+		PluginApiVersion: "0.0.1",
+		PluginVersion:    "0.1.0",
+		Name:             pluginName,
+	}
+
+	// configSpec is the hcl specification returned by the ConfigSchema RPC
+	configSpec = hclspec.NewObject(map[string]*hclspec.Spec{
+		"enabled": hclspec.NewDefault(
+			hclspec.NewAttr("enabled", "bool", false),
+			hclspec.NewLiteral("true"),
+		),
+	})
+
+	// taskConfigSpec is the hcl specification for the driver config section of
+	// a task within a job. It is returned in the TaskConfigSchema RPC
+	taskConfigSpec = hclspec.NewObject(map[string]*hclspec.Spec{
+		"command": hclspec.NewAttr("command", "string", true),
+		"args":    hclspec.NewAttr("args", "list(string)", false),
+	})
+
+	// capabilities is returned by the Capabilities RPC and indicates what
+	// optional features this driver supports
+	capabilities = &drivers.Capabilities{
+		SendSignals: true,
+		Exec:        true,
+		FSIsolation: cstructs.FSIsolationChroot,
+	}
+)
+
+// ExecDriver fork/execs tasks using many of the underlying OS's isolation
+// features where configured.
+type ExecDriver struct {
+	// eventer is used to handle multiplexing of TaskEvents calls such that an
+	// event can be broadcast to all callers
+	eventer *eventer.Eventer
+
+	// config is the driver configuration set by the SetConfig RPC
+	config *Config
+
+	// tasks is the in memory datastore mapping taskIDs to execDriverHandles
+	tasks *taskStore
+
+	// ctx is the context for the driver. It is passed to other subsystems to
+	// coordinate shutdown
+	ctx context.Context
+
+	// signalShutdown is called when the driver is shutting down and cancels the
+	// ctx passed to any subsystems
+	signalShutdown context.CancelFunc
+
+	// logger will log to the plugin output which is usually an 'executor.out'
+	// file located in the root of the TaskDir
+	logger hclog.Logger
+}
+
+// Config is the driver configuration set by the SetConfig RPC call
+type Config struct {
+	// Enabled is set to true to enable the driver
+	Enabled bool `cty:"enabled"`
+}
+
+// TaskConfig is the driver configuration of a task within a job
+type TaskConfig struct {
+	Command string   `cty:"command"`
+	Args    []string `cty:"args"`
+}
+
+// TaskState is the state which is encoded in the handle returned in
+// StartTask. This information is needed to rebuild the task state and handler
+// during recovery.
+type TaskState struct {
+	ReattachConfig *utils.ReattachConfig
+	TaskConfig     *drivers.TaskConfig
+	Pid            int
+	StartedAt      time.Time
+}
+
+// NewExecDriver returns a new DriverPlugin implementation
+func NewExecDriver(logger hclog.Logger) drivers.DriverPlugin {
+	ctx, cancel := context.WithCancel(context.Background())
+	logger = logger.Named(pluginName)
+	return &ExecDriver{
+		eventer:        eventer.NewEventer(ctx, logger),
+		config:         &Config{},
+		tasks:          newTaskStore(),
+		ctx:            ctx,
+		signalShutdown: cancel,
+		logger:         logger,
+	}
+}
+
+func (*ExecDriver) PluginInfo() (*base.PluginInfoResponse, error) {
+	return pluginInfo, nil
+}
+
+func (*ExecDriver) ConfigSchema() (*hclspec.Spec, error) {
+	return configSpec, nil
+}
+
+func (d *ExecDriver) SetConfig(data []byte) error {
+	var config Config
+	if err := base.MsgPackDecode(data, &config); err != nil {
+		return err
+	}
+
+	d.config = &config
+	return nil
+}
+
+func (d *ExecDriver) Shutdown(ctx context.Context) error {
+	d.signalShutdown()
+	return nil
+}
+
+func (d *ExecDriver) TaskConfigSchema() (*hclspec.Spec, error) {
+	return taskConfigSpec, nil
+}
+
+func (d *ExecDriver) Capabilities() (*drivers.Capabilities, error) {
+	return capabilities, nil
+}
+
+func (d *ExecDriver) Fingerprint(ctx context.Context) (<-chan *drivers.Fingerprint, error) {
+	ch := make(chan *drivers.Fingerprint)
+	go d.handleFingerprint(ctx, ch)
+	return ch, nil
+}
+
+func (d *ExecDriver) handleFingerprint(ctx context.Context, ch chan *drivers.Fingerprint) {
+	defer close(ch)
+	ticker := time.NewTimer(0)
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		case <-d.ctx.Done():
+			return
+		case <-ticker.C:
+			ticker.Reset(fingerprintPeriod)
+			ch <- d.buildFingerprint()
+		}
+	}
+}
+
+func (d *ExecDriver) RecoverTask(handle *drivers.TaskHandle) error {
+	var taskState TaskState
+	logger := d.logger.With("task_id", handle.Config.ID)
+
+	err := handle.GetDriverState(&taskState)
+	if err != nil {
+		logger.Error("failed to decode driver state during task recovery", "error", err)
+		return fmt.Errorf("failed to decode state: %v", err)
+	}
+
+	plugRC, err := utils.ReattachConfigToGoPlugin(taskState.ReattachConfig)
+	if err != nil {
+		logger.Error("failed to build reattach config during task recovery", "error", err)
+		return fmt.Errorf("failed to build reattach config: %v", err)
+	}
+
+	pluginConfig := &plugin.ClientConfig{
+		Reattach: plugRC,
+	}
+
+	exec, pluginClient, err := utils.CreateExecutorWithConfig(pluginConfig, os.Stderr)
+	if err != nil {
+		logger.Error("failed to build executor during task recovery", "error", err)
+		return fmt.Errorf("failed to build executor: %v", err)
+	}
+
+	h := &execTaskHandle{
+		exec:         exec,
+		pid:          taskState.Pid,
+		pluginClient: pluginClient,
+		task:         taskState.TaskConfig,
+		procState:    drivers.TaskStateRunning,
+		startedAt:    taskState.StartedAt,
+		exitCh:       make(chan struct{}),
+	}
+
+	d.tasks.Set(taskState.TaskConfig.ID, h)
+
+	go h.run()
+	return nil
+}
+
+func (d *ExecDriver) StartTask(cfg *drivers.TaskConfig) (*drivers.TaskHandle, *cstructs.DriverNetwork, error) {
+	if _, ok := d.tasks.Get(cfg.ID); ok {
+		return nil, nil, fmt.Errorf("task with ID '%s' already started", cfg.ID)
+	}
+
+	var driverConfig TaskConfig
+	if err := cfg.DecodeDriverConfig(&driverConfig); err != nil {
+		return nil, nil, fmt.Errorf("failed to decode driver config: %v", err)
+	}
+
+	handle := drivers.NewTaskHandle(pluginName)
+	handle.Config = cfg
+
+	pluginLogFile := filepath.Join(cfg.TaskDir().Dir, "executor.out")
+	executorConfig := &dstructs.ExecutorConfig{
+		LogFile:     pluginLogFile,
+		LogLevel:    "debug",
+		FSIsolation: true,
+	}
+
+	// TODO: best way to pass port ranges in from client config
+	exec, pluginClient, err := utils.CreateExecutor(os.Stderr, hclog.Debug, 14000, 14512, executorConfig)
+	if err != nil {
+		return nil, nil, err
+	}
+
+	execCmd := &executor.ExecCommand{
+		Cmd:            driverConfig.Command,
+		Args:           driverConfig.Args,
+		Env:            cfg.EnvList(),
+		User:           cfg.User,
+		ResourceLimits: true,
+		TaskDir:        cfg.TaskDir().Dir,
+		StdoutPath:     cfg.StdoutPath,
+		StderrPath:     cfg.StderrPath,
+	}
+
+	ps, err := exec.Launch(execCmd)
+	if err != nil {
+		pluginClient.Kill()
+		return nil, nil, err
+	}
+
+	h := &execTaskHandle{
+		exec:         exec,
+		pid:          ps.Pid,
+		pluginClient: pluginClient,
+		task:         cfg,
+		procState:    drivers.TaskStateRunning,
+		startedAt:    time.Now().Round(time.Millisecond),
+		logger:       d.logger,
+		exitCh:       make(chan struct{}),
+	}
+
+	driverState := &TaskState{
+		ReattachConfig: utils.ReattachConfigFromGoPlugin(pluginClient.ReattachConfig()),
+		Pid:            ps.Pid,
+		TaskConfig:     cfg,
+		StartedAt:      h.startedAt,
+	}
+
+	if err := handle.SetDriverState(&driverState); err != nil {
+		d.logger.Error("failed to start task, error setting driver state", "error", err)
+		exec.Shutdown("", 0)
+		pluginClient.Kill()
+		return nil, nil, err
+	}
+
+	d.tasks.Set(cfg.ID, h)
+	go h.run()
+	return handle, nil, nil
+}
+
+func (d *ExecDriver) WaitTask(ctx context.Context, taskID string) (<-chan *drivers.ExitResult, error) {
+	ch := make(chan *drivers.ExitResult)
+	handle, ok := d.tasks.Get(taskID)
+	if !ok {
+		return nil, drivers.ErrTaskNotFound
+	}
+	go d.handleWait(ctx, handle, ch)
+
+	return ch, nil
+}
+
+func (d *ExecDriver) handleWait(ctx context.Context, handle *execTaskHandle, ch chan *drivers.ExitResult) {
+	defer close(ch)
+
+	select {
+	case <-ctx.Done():
+		return
+	case <-d.ctx.Done():
+		return
+	case <-handle.exitCh:
+		ch <- handle.exitResult
+	}
+}
+
+func (d *ExecDriver) StopTask(taskID string, timeout time.Duration, signal string) error {
+	handle, ok := d.tasks.Get(taskID)
+	if !ok {
+		return drivers.ErrTaskNotFound
+	}
+
+	if err := handle.exec.Shutdown(signal, timeout); err != nil {
+		if handle.pluginClient.Exited() {
+			return nil
+		}
+		return fmt.Errorf("executor Shutdown failed: %v", err)
+	}
+
+	return nil
+}
+
+func (d *ExecDriver) DestroyTask(taskID string, force bool) error {
+	handle, ok := d.tasks.Get(taskID)
+	if !ok {
+		return drivers.ErrTaskNotFound
+	}
+
+	if handle.IsRunning() && !force {
+		return fmt.Errorf("cannot destroy running task")
+	}
+
+	if !handle.pluginClient.Exited() {
+		if handle.IsRunning() {
+			if err := handle.exec.Shutdown("", 0); err != nil {
+				handle.logger.Error("destroying executor failed", "err", err)
+			}
+		}
+
+		handle.pluginClient.Kill()
+	}
+
+	d.tasks.Delete(taskID)
+	return nil
+}
+
+func (d *ExecDriver) InspectTask(taskID string) (*drivers.TaskStatus, error) {
+	handle, ok := d.tasks.Get(taskID)
+	if !ok {
+		return nil, drivers.ErrTaskNotFound
+	}
+
+	handle.stateLock.RLock()
+	defer handle.stateLock.RUnlock()
+
+	status := &drivers.TaskStatus{
+		ID:          handle.task.ID,
+		Name:        handle.task.Name,
+		State:       handle.procState,
+		StartedAt:   handle.startedAt,
+		CompletedAt: handle.completedAt,
+		ExitResult:  handle.exitResult,
+		DriverAttributes: map[string]string{
+			"pid": strconv.Itoa(handle.pid),
+		},
+	}
+
+	return status, nil
+}
+
+func (d *ExecDriver) TaskStats(taskID string) (*cstructs.TaskResourceUsage, error) {
+	handle, ok := d.tasks.Get(taskID)
+	if !ok {
+		return nil, drivers.ErrTaskNotFound
+	}
+
+	stats, err := handle.exec.Stats()
+	if err != nil {
+		return nil, fmt.Errorf("failed to retrieve stats from executor: %v", err)
+	}
+
+	return stats, nil
+}
+
+func (d *ExecDriver) TaskEvents(ctx context.Context) (<-chan *drivers.TaskEvent, error) {
+	return d.eventer.TaskEvents(ctx)
+}
+
+func (d *ExecDriver) SignalTask(taskID string, signal string) error {
+	handle, ok := d.tasks.Get(taskID)
+	if !ok {
+		return drivers.ErrTaskNotFound
+	}
+
+	sig := os.Interrupt
+	if s, ok := signals.SignalLookup[signal]; ok {
+		sig = s
+	} else {
+		d.logger.Warn("signal to send to task unknown, using SIGINT", "signal", signal, "task_id", handle.task.ID)
+	}
+	return handle.exec.Signal(sig)
+}
+
+func (d *ExecDriver) ExecTask(taskID string, cmd []string, timeout time.Duration) (*drivers.ExecTaskResult, error) {
+	if len(cmd) == 0 {
+		return nil, fmt.Errorf("error cmd must have at least one value")
+	}
+	handle, ok := d.tasks.Get(taskID)
+	if !ok {
+		return nil, drivers.ErrTaskNotFound
+	}
+
+	args := []string{}
+	if len(cmd) > 1 {
+		args = cmd[1:]
+	}
+
+	out, exitCode, err := handle.exec.Exec(time.Now().Add(timeout), cmd[0], args)
+	if err != nil {
+		return nil, err
+	}
+
+	return &drivers.ExecTaskResult{
+		Stdout: out,
+		ExitResult: &drivers.ExitResult{
+			ExitCode: exitCode,
+		},
+	}, nil
+}
diff --git a/drivers/exec/driver_default.go b/drivers/exec/driver_default.go
new file mode 100644
index 00000000000..f8c8051d6b7
--- /dev/null
+++ b/drivers/exec/driver_default.go
@@ -0,0 +1,12 @@
+//+build !linux
+
+package exec
+
+import "github.com/hashicorp/nomad/plugins/drivers"
+
+func (d *ExecDriver) buildFingerprint() *drivers.Fingerprint {
+	return &drivers.Fingerprint{
+		Health:            drivers.HealthStateUndetected,
+		HealthDescription: "exec driver unsupported on client OS",
+	}
+}
diff --git a/drivers/exec/driver_linux.go b/drivers/exec/driver_linux.go
new file mode 100644
index 00000000000..411d33dd747
--- /dev/null
+++ b/drivers/exec/driver_linux.go
@@ -0,0 +1,38 @@
+package exec
+
+import (
+	"github.com/hashicorp/nomad/client/fingerprint"
+	"github.com/hashicorp/nomad/plugins/drivers"
+	"golang.org/x/sys/unix"
+)
+
+func (d *ExecDriver) buildFingerprint() *drivers.Fingerprint {
+	fp := &drivers.Fingerprint{
+		Attributes:        map[string]string{},
+		Health:            drivers.HealthStateHealthy,
+		HealthDescription: "healthy",
+	}
+
+	mount, err := fingerprint.FindCgroupMountpointDir()
+	if err != nil {
+		fp.Health = drivers.HealthStateUnhealthy
+		fp.HealthDescription = "failed to discover cgroup mount point"
+		d.logger.Warn(fp.HealthDescription, "error", err)
+		return fp
+	}
+
+	if mount == "" {
+		fp.Health = drivers.HealthStateUnhealthy
+		fp.HealthDescription = "cgroups are unavailable"
+		return fp
+	}
+
+	if unix.Geteuid() != 0 {
+		fp.Health = drivers.HealthStateUnhealthy
+		fp.HealthDescription = "exec driver must run as root"
+		return fp
+	}
+
+	fp.Attributes["driver.exec"] = "1"
+	return fp
+}
diff --git a/drivers/exec/driver_test.go b/drivers/exec/driver_test.go
new file mode 100644
index 00000000000..131154f009a
--- /dev/null
+++ b/drivers/exec/driver_test.go
@@ -0,0 +1,442 @@
+package exec
+
+import (
+	"bytes"
+	"context"
+	"fmt"
+	"io/ioutil"
+	"os"
+	"path/filepath"
+	"runtime"
+	"strings"
+	"sync"
+	"testing"
+	"time"
+
+	"github.com/hashicorp/hcl2/hcl"
+	ctestutils "github.com/hashicorp/nomad/client/testutil"
+	"github.com/hashicorp/nomad/helper/testlog"
+	"github.com/hashicorp/nomad/helper/testtask"
+	"github.com/hashicorp/nomad/helper/uuid"
+	"github.com/hashicorp/nomad/plugins/drivers"
+	"github.com/hashicorp/nomad/plugins/shared"
+	"github.com/hashicorp/nomad/plugins/shared/hclspec"
+	"github.com/hashicorp/nomad/testutil"
+	"github.com/stretchr/testify/require"
+)
+
+func TestMain(m *testing.M) {
+	if !testtask.Run() {
+		os.Exit(m.Run())
+	}
+}
+
+func TestExecDriver_Fingerprint_NonLinux(t *testing.T) {
+	if !testutil.IsTravis() {
+		t.Parallel()
+	}
+	require := require.New(t)
+	if runtime.GOOS == "linux" {
+		t.Skip("Test only runs on non-Linux platforms")
+	}
+
+	d := NewExecDriver(testlog.HCLogger(t))
+	harness := drivers.NewDriverHarness(t, d)
+
+	fingerCh, err := harness.Fingerprint(context.Background())
+	require.NoError(err)
+	select {
+	case finger := <-fingerCh:
+		require.Equal(drivers.HealthStateUndetected, finger.Health)
+	case <-time.After(time.Duration(testutil.TestMultiplier()*5) * time.Second):
+		require.Fail("timeout receiving fingerprint")
+	}
+}
+
+func TestExecDriver_Fingerprint(t *testing.T) {
+	t.Parallel()
+	require := require.New(t)
+
+	ctestutils.ExecCompatible(t)
+
+	d := NewExecDriver(testlog.HCLogger(t))
+	harness := drivers.NewDriverHarness(t, d)
+
+	fingerCh, err := harness.Fingerprint(context.Background())
+	require.NoError(err)
+	select {
+	case finger := <-fingerCh:
+		require.Equal(drivers.HealthStateHealthy, finger.Health)
+		require.Equal("1", finger.Attributes["driver.exec"])
+	case <-time.After(time.Duration(testutil.TestMultiplier()*5) * time.Second):
+		require.Fail("timeout receiving fingerprint")
+	}
+}
+
+func TestExecDriver_StartWait(t *testing.T) {
+	t.Parallel()
+	require := require.New(t)
+	ctestutils.ExecCompatible(t)
+
+	d := NewExecDriver(testlog.HCLogger(t))
+	harness := drivers.NewDriverHarness(t, d)
+	task := &drivers.TaskConfig{
+		ID:   uuid.Generate(),
+		Name: "test",
+	}
+
+	taskConfig := map[string]interface{}{
+		"command": "cat",
+		"args":    []string{"/proc/self/cgroup"},
+	}
+	encodeDriverHelper(require, task, taskConfig)
+
+	cleanup := harness.MkAllocDir(task, false)
+	defer cleanup()
+	fmt.Println(task.AllocDir)
+
+	handle, _, err := harness.StartTask(task)
+	require.NoError(err)
+
+	ch, err := harness.WaitTask(context.Background(), handle.Config.ID)
+	require.NoError(err)
+	result := <-ch
+	require.Zero(result.ExitCode)
+	require.NoError(harness.DestroyTask(task.ID, true))
+}
+
+func TestExecDriver_StartWaitStop(t *testing.T) {
+	t.Parallel()
+	require := require.New(t)
+	ctestutils.ExecCompatible(t)
+
+	d := NewExecDriver(testlog.HCLogger(t))
+	harness := drivers.NewDriverHarness(t, d)
+	task := &drivers.TaskConfig{
+		ID:   uuid.Generate(),
+		Name: "test",
+	}
+
+	taskConfig := map[string]interface{}{
+		"command": "/bin/sleep",
+		"args":    []string{"5"},
+	}
+	encodeDriverHelper(require, task, taskConfig)
+
+	cleanup := harness.MkAllocDir(task, false)
+	defer cleanup()
+
+	handle, _, err := harness.StartTask(task)
+	require.NoError(err)
+
+	ch, err := harness.WaitTask(context.Background(), handle.Config.ID)
+	require.NoError(err)
+
+	var wg sync.WaitGroup
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+		result := <-ch
+		require.Equal(2, result.Signal)
+	}()
+
+	require.NoError(harness.WaitUntilStarted(task.ID, 1*time.Second))
+
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+		err := harness.StopTask(task.ID, 2*time.Second, "SIGINT")
+		require.NoError(err)
+	}()
+
+	waitCh := make(chan struct{})
+	go func() {
+		defer close(waitCh)
+		wg.Wait()
+	}()
+
+	select {
+	case <-waitCh:
+		status, err := harness.InspectTask(task.ID)
+		require.NoError(err)
+		require.Equal(drivers.TaskStateExited, status.State)
+	case <-time.After(1 * time.Second):
+		require.Fail("timeout waiting for task to shutdown")
+	}
+
+	require.NoError(harness.DestroyTask(task.ID, true))
+}
+
+func TestExecDriver_StartWaitRecover(t *testing.T) {
+	t.Parallel()
+	require := require.New(t)
+	ctestutils.ExecCompatible(t)
+
+	d := NewExecDriver(testlog.HCLogger(t))
+	harness := drivers.NewDriverHarness(t, d)
+	task := &drivers.TaskConfig{
+		ID:   uuid.Generate(),
+		Name: "test",
+	}
+
+	taskConfig := map[string]interface{}{
+		"command": "/bin/sleep",
+		"args":    []string{"5"},
+	}
+	encodeDriverHelper(require, task, taskConfig)
+
+	cleanup := harness.MkAllocDir(task, false)
+	defer cleanup()
+
+	handle, _, err := harness.StartTask(task)
+	require.NoError(err)
+
+	ctx, cancel := context.WithCancel(context.Background())
+
+	ch, err := harness.WaitTask(ctx, handle.Config.ID)
+	require.NoError(err)
+
+	var wg sync.WaitGroup
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+		result := <-ch
+		require.Error(result.Err)
+	}()
+
+	require.NoError(harness.WaitUntilStarted(task.ID, 1*time.Second))
+	cancel()
+
+	waitCh := make(chan struct{})
+	go func() {
+		defer close(waitCh)
+		wg.Wait()
+	}()
+
+	select {
+	case <-waitCh:
+		status, err := harness.InspectTask(task.ID)
+		require.NoError(err)
+		require.Equal(drivers.TaskStateRunning, status.State)
+	case <-time.After(1 * time.Second):
+		require.Fail("timeout waiting for task wait to cancel")
+	}
+
+	// Lose the task
+	d.(*ExecDriver).tasks.Delete(task.ID)
+	_, err = harness.InspectTask(task.ID)
+	require.Error(err)
+
+	require.NoError(harness.RecoverTask(handle))
+	status, err := harness.InspectTask(task.ID)
+	require.NoError(err)
+	require.Equal(drivers.TaskStateRunning, status.State)
+
+	require.NoError(harness.StopTask(task.ID, 0, ""))
+	require.NoError(harness.DestroyTask(task.ID, true))
+}
+
+func TestExecDriver_Stats(t *testing.T) {
+	t.Parallel()
+	require := require.New(t)
+	ctestutils.ExecCompatible(t)
+
+	d := NewExecDriver(testlog.HCLogger(t))
+	harness := drivers.NewDriverHarness(t, d)
+	task := &drivers.TaskConfig{
+		ID:   uuid.Generate(),
+		Name: "test",
+	}
+
+	taskConfig := map[string]interface{}{
+		"command": "/bin/sleep",
+		"args":    []string{"5"},
+	}
+	encodeDriverHelper(require, task, taskConfig)
+
+	cleanup := harness.MkAllocDir(task, false)
+	defer cleanup()
+
+	handle, _, err := harness.StartTask(task)
+	require.NoError(err)
+	require.NotNil(handle)
+
+	require.NoError(harness.WaitUntilStarted(task.ID, 1*time.Second))
+	stats, err := harness.TaskStats(task.ID)
+	require.NoError(err)
+	require.NotZero(stats.ResourceUsage.MemoryStats.RSS)
+
+	require.NoError(harness.DestroyTask(task.ID, true))
+}
+
+func TestExecDriver_Start_Wait_AllocDir(t *testing.T) {
+	t.Parallel()
+	require := require.New(t)
+	ctestutils.ExecCompatible(t)
+
+	d := NewExecDriver(testlog.HCLogger(t))
+	harness := drivers.NewDriverHarness(t, d)
+	task := &drivers.TaskConfig{
+		ID:   uuid.Generate(),
+		Name: "sleep",
+	}
+	cleanup := harness.MkAllocDir(task, false)
+	defer cleanup()
+
+	exp := []byte{'w', 'i', 'n'}
+	file := "output.txt"
+	taskConfig := map[string]interface{}{
+		"command": "/bin/bash",
+		"args": []string{
+			"-c",
+			fmt.Sprintf(`sleep 1; echo -n %s > /alloc/%s`, string(exp), file),
+		},
+	}
+	encodeDriverHelper(require, task, taskConfig)
+
+	handle, _, err := harness.StartTask(task)
+	require.NoError(err)
+	require.NotNil(handle)
+
+	// Task should terminate quickly
+	waitCh, err := harness.WaitTask(context.Background(), task.ID)
+	require.NoError(err)
+	select {
+	case res := <-waitCh:
+		require.True(res.Successful(), "task should have exited successfully: %v", res)
+	case <-time.After(time.Duration(testutil.TestMultiplier()*5) * time.Second):
+		require.Fail("timeout waiting for task")
+	}
+
+	// Check that data was written to the shared alloc directory.
+	outputFile := filepath.Join(task.TaskDir().SharedAllocDir, file)
+	act, err := ioutil.ReadFile(outputFile)
+	require.NoError(err)
+	require.Exactly(exp, act)
+
+	require.NoError(harness.DestroyTask(task.ID, true))
+}
+
+func TestExecDriver_User(t *testing.T) {
+	t.Parallel()
+	require := require.New(t)
+	ctestutils.ExecCompatible(t)
+
+	d := NewExecDriver(testlog.HCLogger(t))
+	harness := drivers.NewDriverHarness(t, d)
+	task := &drivers.TaskConfig{
+		ID:   uuid.Generate(),
+		Name: "sleep",
+		User: "alice",
+	}
+	cleanup := harness.MkAllocDir(task, false)
+	defer cleanup()
+
+	taskConfig := map[string]interface{}{
+		"command": "/bin/sleep",
+		"args":    []string{"100"},
+	}
+	encodeDriverHelper(require, task, taskConfig)
+
+	handle, _, err := harness.StartTask(task)
+	require.Error(err)
+	require.Nil(handle)
+
+	msg := "user alice"
+	if !strings.Contains(err.Error(), msg) {
+		t.Fatalf("Expecting '%v' in '%v'", msg, err)
+	}
+}
+
+// TestExecDriver_HandlerExec ensures the exec driver's handle properly
+// executes commands inside the container.
+func TestExecDriver_HandlerExec(t *testing.T) {
+	t.Parallel()
+	require := require.New(t)
+	ctestutils.ExecCompatible(t)
+
+	d := NewExecDriver(testlog.HCLogger(t))
+	harness := drivers.NewDriverHarness(t, d)
+	task := &drivers.TaskConfig{
+		ID:   uuid.Generate(),
+		Name: "sleep",
+	}
+	cleanup := harness.MkAllocDir(task, false)
+	defer cleanup()
+
+	taskConfig := map[string]interface{}{
+		"command": "/bin/sleep",
+		"args":    []string{"9000"},
+	}
+	encodeDriverHelper(require, task, taskConfig)
+
+	handle, _, err := harness.StartTask(task)
+	require.NoError(err)
+	require.NotNil(handle)
+
+	// Exec a command that should work and dump the environment
+	// TODO: enable section when exec env is fully loaded
+	/*res, err := harness.ExecTask(task.ID, []string{"/bin/sh", "-c", "env | grep ^NOMAD"}, time.Second)
+	require.NoError(err)
+	require.True(res.ExitResult.Successful())
+
+	// Assert exec'd commands are run in a task-like environment
+	scriptEnv := make(map[string]string)
+	for _, line := range strings.Split(string(res.Stdout), "\n") {
+		if line == "" {
+			continue
+		}
+		parts := strings.SplitN(string(line), "=", 2)
+		if len(parts) != 2 {
+			t.Fatalf("Invalid env var: %q", line)
+		}
+		scriptEnv[parts[0]] = parts[1]
+	}
+	if v, ok := scriptEnv["NOMAD_SECRETS_DIR"]; !ok || v != "/secrets" {
+		t.Errorf("Expected NOMAD_SECRETS_DIR=/secrets but found=%t value=%q", ok, v)
+	}*/
+
+	// Assert cgroup membership
+	res, err := harness.ExecTask(task.ID, []string{"/bin/cat", "/proc/self/cgroup"}, time.Second)
+	require.NoError(err)
+	require.True(res.ExitResult.Successful())
+	found := false
+	for _, line := range strings.Split(string(res.Stdout), "\n") {
+		// Every cgroup entry should be /nomad/$ALLOC_ID
+		if line == "" {
+			continue
+		}
+		// Skip systemd cgroup
+		if strings.HasPrefix(line, "1:name=systemd") {
+			continue
+		}
+		if !strings.Contains(line, ":/nomad/") {
+			t.Errorf("Not a member of the alloc's cgroup: expected=...:/nomad/... -- found=%q", line)
+			continue
+		}
+		found = true
+	}
+	require.True(found, "exec'd command isn't in the task's cgroup")
+
+	// Exec a command that should fail
+	res, err = harness.ExecTask(task.ID, []string{"/usr/bin/stat", "lkjhdsaflkjshowaisxmcvnlia"}, time.Second)
+	require.NoError(err)
+	require.False(res.ExitResult.Successful())
+	if expected := "No such file or directory"; !bytes.Contains(res.Stdout, []byte(expected)) {
+		t.Fatalf("expected output to contain %q but found: %q", expected, res.Stdout)
+	}
+
+	require.NoError(harness.DestroyTask(task.ID, true))
+}
+
+func encodeDriverHelper(require *require.Assertions, task *drivers.TaskConfig, taskConfig map[string]interface{}) {
+	evalCtx := &hcl.EvalContext{
+		Functions: shared.GetStdlibFuncs(),
+	}
+	spec, diag := hclspec.Convert(taskConfigSpec)
+	require.False(diag.HasErrors())
+	taskConfigCtyVal, diag := shared.ParseHclInterface(taskConfig, spec, evalCtx)
+	require.False(diag.HasErrors())
+	err := task.EncodeDriverConfig(taskConfigCtyVal)
+	require.Nil(err)
+}
diff --git a/drivers/exec/handle.go b/drivers/exec/handle.go
new file mode 100644
index 00000000000..2a8c90dc91f
--- /dev/null
+++ b/drivers/exec/handle.go
@@ -0,0 +1,52 @@
+package exec
+
+import (
+	"sync"
+	"time"
+
+	hclog "github.com/hashicorp/go-hclog"
+	plugin "github.com/hashicorp/go-plugin"
+	"github.com/hashicorp/nomad/client/driver/executor"
+	"github.com/hashicorp/nomad/plugins/drivers"
+)
+
+type execTaskHandle struct {
+	exec         executor.Executor
+	pid          int
+	pluginClient *plugin.Client
+	logger       hclog.Logger
+
+	// stateLock syncs access to all fields below
+	stateLock sync.RWMutex
+
+	task        *drivers.TaskConfig
+	procState   drivers.TaskState
+	startedAt   time.Time
+	completedAt time.Time
+	exitResult  *drivers.ExitResult
+	exitCh      chan struct{}
+}
+
+func (h *execTaskHandle) IsRunning() bool {
+	return h.procState == drivers.TaskStateRunning
+}
+
+func (h *execTaskHandle) run() {
+	defer close(h.exitCh)
+	if h.exitResult == nil {
+		h.exitResult = &drivers.ExitResult{}
+	}
+
+	ps, err := h.exec.Wait()
+	if err != nil {
+		h.exitResult.Err = err
+		h.procState = drivers.TaskStateUnknown
+		h.completedAt = time.Now()
+		return
+	}
+	h.procState = drivers.TaskStateExited
+	h.exitResult.ExitCode = ps.ExitCode
+	h.exitResult.Signal = ps.Signal
+	h.completedAt = ps.Time
+	// TODO: plumb OOM bool
+}
diff --git a/drivers/exec/state.go b/drivers/exec/state.go
new file mode 100644
index 00000000000..ca3b1cd2c3d
--- /dev/null
+++ b/drivers/exec/state.go
@@ -0,0 +1,33 @@
+package exec
+
+import (
+	"sync"
+)
+
+type taskStore struct {
+	store map[string]*execTaskHandle
+	lock  sync.RWMutex
+}
+
+func newTaskStore() *taskStore {
+	return &taskStore{store: map[string]*execTaskHandle{}}
+}
+
+func (ts *taskStore) Set(id string, handle *execTaskHandle) {
+	ts.lock.Lock()
+	defer ts.lock.Unlock()
+	ts.store[id] = handle
+}
+
+func (ts *taskStore) Get(id string) (*execTaskHandle, bool) {
+	ts.lock.RLock()
+	defer ts.lock.RUnlock()
+	t, ok := ts.store[id]
+	return t, ok
+}
+
+func (ts *taskStore) Delete(id string) {
+	ts.lock.Lock()
+	defer ts.lock.Unlock()
+	delete(ts.store, id)
+}
diff --git a/plugins/drivers/server.go b/plugins/drivers/server.go
index 80e232d06ae..4ad385e24e9 100644
--- a/plugins/drivers/server.go
+++ b/plugins/drivers/server.go
@@ -1,6 +1,7 @@
 package drivers
 
 import (
+	"fmt"
 	"io"
 
 	"golang.org/x/net/context"
@@ -125,7 +126,19 @@ func (b *driverPluginServer) WaitTask(ctx context.Context, req *proto.WaitTaskRe
 		return nil, err
 	}
 
-	result := <-ch
+	var ok bool
+	var result *ExitResult
+	select {
+	case <-ctx.Done():
+		return nil, ctx.Err()
+	case result, ok = <-ch:
+		if !ok {
+			return &proto.WaitTaskResponse{
+				Err: "channel closed",
+			}, nil
+		}
+	}
+
 	var errStr string
 	if result.Err != nil {
 		errStr = result.Err.Error()
@@ -206,7 +219,7 @@ func (b *driverPluginServer) TaskStats(ctx context.Context, req *proto.TaskStats
 
 	pb, err := taskStatsToProto(stats)
 	if err != nil {
-		return nil, err
+		return nil, fmt.Errorf("failed to encode task stats: %v", err)
 	}
 
 	resp := &proto.TaskStatsResponse{
diff --git a/plugins/drivers/testing.go b/plugins/drivers/testing.go
index 53eecb64214..000c81b92db 100644
--- a/plugins/drivers/testing.go
+++ b/plugins/drivers/testing.go
@@ -14,6 +14,7 @@ import (
 	hclog "github.com/hashicorp/go-hclog"
 	plugin "github.com/hashicorp/go-plugin"
 	"github.com/hashicorp/nomad/client/allocdir"
+	"github.com/hashicorp/nomad/client/config"
 	"github.com/hashicorp/nomad/client/logmon"
 	cstructs "github.com/hashicorp/nomad/client/structs"
 	"github.com/hashicorp/nomad/helper/testlog"
@@ -55,6 +56,7 @@ func NewDriverHarness(t testing.T, d DriverPlugin) *DriverHarness {
 		server:       server,
 		DriverPlugin: dClient,
 		logger:       logger,
+		t:            t,
 	}
 
 	raw, err = client.Dispense("logmon")
@@ -84,7 +86,16 @@ func (h *DriverHarness) MkAllocDir(t *TaskConfig, enableLogs bool) func() {
 	allocDir := allocdir.NewAllocDir(h.logger, dir)
 	require.NoError(h.t, allocDir.Build())
 	taskDir := allocDir.NewTaskDir(t.Name)
-	require.NoError(h.t, taskDir.Build(false, nil, 0))
+
+	caps, err := h.Capabilities()
+	require.NoError(h.t, err)
+
+	var entries map[string]string
+	fsi := caps.FSIsolation
+	if fsi == cstructs.FSIsolationChroot {
+		entries = config.DefaultChrootEnv
+	}
+	require.NoError(h.t, taskDir.Build(false, entries, fsi))
 
 	//logmon
 	if enableLogs {
diff --git a/plugins/drivers/utils.go b/plugins/drivers/utils.go
index 293e14c4553..db37561879b 100644
--- a/plugins/drivers/utils.go
+++ b/plugins/drivers/utils.go
@@ -263,7 +263,7 @@ func taskStatusFromProto(pb *proto.TaskStatus) (*TaskStatus, error) {
 }
 
 func taskStatsToProto(stats *cstructs.TaskResourceUsage) (*proto.TaskStats, error) {
-	timestamp, err := ptypes.TimestampProto(time.Unix(stats.Timestamp, 0))
+	timestamp, err := ptypes.TimestampProto(time.Unix(0, stats.Timestamp))
 	if err != nil {
 		return nil, err
 	}