Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[shim] Revamp logging and CLI #2176

Merged
merged 1 commit into from
Jan 9, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion runner/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ Here's the steps to build `dstack-shim` and `dstack-runner` and run `dstack` wit
3. Start the shim:

```shell
./shim --home $RUNNER_DIR --runner-binary-path $COMPILED_RUNNER_PATH docker --ssh-key $DSTACK_PUBLIC_KEY
./shim --shim-home $RUNNER_DIR --runner-binary-path $COMPILED_RUNNER_PATH --ssh-key $DSTACK_PUBLIC_KEY
```

Notes:
Expand Down
242 changes: 138 additions & 104 deletions runner/cmd/shim/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,18 @@ import (
"context"
"errors"
"fmt"
"log"
"io"
"net/http"
"os"
"path"
"path/filepath"
"time"

"github.com/dstackai/dstack/runner/consts"
"github.com/dstackai/dstack/runner/internal/log"
"github.com/dstackai/dstack/runner/internal/shim"
"github.com/dstackai/dstack/runner/internal/shim/api"
"github.com/sirupsen/logrus"
"github.com/urfave/cli/v2"
)

Expand All @@ -24,26 +27,54 @@ func main() {
args.Docker.SSHPort = 10022
var serviceMode bool

const defaultLogLevel = int(logrus.InfoLevel)

ctx := context.Background()

log.DefaultEntry.Logger.SetLevel(logrus.Level(defaultLogLevel))
log.DefaultEntry.Logger.SetOutput(os.Stderr)

app := &cli.App{
Name: "dstack-shim",
Usage: "Starts dstack-runner or docker container.",
Version: Version,
Flags: []cli.Flag{
/* Shim Parameters */
&cli.PathFlag{
Name: "home",
Usage: "Dstack home directory",
Name: "shim-home",
Usage: "Set shim's home directory",
Destination: &args.Shim.HomeDir,
EnvVars: []string{"DSTACK_HOME"},
DefaultText: path.Join("~", consts.DstackDirPath),
EnvVars: []string{"DSTACK_SHIM_HOME"},
},
&cli.IntFlag{
Name: "shim-http-port",
Usage: "Set's shim's http port",
Usage: "Set shim's http port",
Value: 10998,
Destination: &args.Shim.HTTPPort,
EnvVars: []string{"DSTACK_SHIM_HTTP_PORT"},
},
&cli.IntFlag{
Name: "shim-log-level",
Usage: "Set shim's log level",
Value: defaultLogLevel,
Destination: &args.Shim.LogLevel,
EnvVars: []string{"DSTACK_SHIM_LOG_LEVEL"},
},
/* Runner Parameters */
&cli.StringFlag{
Name: "runner-download-url",
Usage: "Set runner's download URL",
Destination: &args.Runner.DownloadURL,
Required: true,
EnvVars: []string{"DSTACK_RUNNER_DOWNLOAD_URL"},
},
&cli.PathFlag{
Name: "runner-binary-path",
Usage: "Path to runner's binary",
Destination: &args.Runner.BinaryPath,
EnvVars: []string{"DSTACK_RUNNER_BINARY_PATH"},
},
&cli.IntFlag{
Name: "runner-http-port",
Usage: "Set runner's http port",
Expand All @@ -54,115 +85,118 @@ func main() {
&cli.IntFlag{
Name: "runner-log-level",
Usage: "Set runner's log level",
Value: 4,
Value: defaultLogLevel,
Destination: &args.Runner.LogLevel,
EnvVars: []string{"DSTACK_RUNNER_LOG_LEVEL"},
},
/* Docker Parameters */
&cli.BoolFlag{
Name: "privileged",
Usage: "Give extended privileges to the container",
Destination: &args.Docker.Privileged,
EnvVars: []string{"DSTACK_DOCKER_PRIVILEGED"},
},
&cli.StringFlag{
Name: "runner-download-url",
Usage: "Set runner's download URL",
Destination: &args.Runner.DownloadURL,
EnvVars: []string{"DSTACK_RUNNER_DOWNLOAD_URL"},
Name: "ssh-key",
Usage: "Public SSH key",
Required: true,
Destination: &args.Docker.ConcatinatedPublicSSHKeys,
EnvVars: []string{"DSTACK_PUBLIC_SSH_KEY"},
},
&cli.PathFlag{
Name: "runner-binary-path",
Usage: "Path to runner's binary",
Destination: &args.Runner.BinaryPath,
EnvVars: []string{"DSTACK_RUNNER_BINARY_PATH"},
&cli.StringFlag{
Name: "pjrt-device",
Usage: "Set the PJRT_DEVICE environment variable (e.g., TPU, GPU)",
Destination: &args.Docker.PJRTDevice,
EnvVars: []string{"PJRT_DEVICE"},
},
},
Commands: []*cli.Command{
{
Name: "docker",
Usage: "Starts docker container and modifies entrypoint",
Flags: []cli.Flag{
/* Docker Parameters */
&cli.BoolFlag{
Name: "privileged",
Usage: "Give extended privileges to the container",
Destination: &args.Docker.Privileged,
EnvVars: []string{"DSTACK_DOCKER_PRIVILEGED"},
},
&cli.StringFlag{
Name: "ssh-key",
Usage: "Public SSH key",
Required: true,
Destination: &args.Docker.ConcatinatedPublicSSHKeys,
EnvVars: []string{"DSTACK_PUBLIC_SSH_KEY"},
},
&cli.StringFlag{
Name: "pjrt-device",
Usage: "Set the PJRT_DEVICE environment variable (e.g., TPU, GPU)",
Destination: &args.Docker.PJRTDevice,
EnvVars: []string{"PJRT_DEVICE"},
},
&cli.BoolFlag{
Name: "service",
Usage: "Start as a service",
Destination: &serviceMode,
EnvVars: []string{"DSTACK_SERVICE_MODE"},
},
},
Action: func(c *cli.Context) error {
if args.Runner.BinaryPath == "" {
if err := args.DownloadRunner(); err != nil {
return cli.Exit(err, 1)
}
}

args.Runner.HomeDir = "/root"
args.Runner.WorkingDir = "/workflow"

var err error

shimHomeDir := args.Shim.HomeDir
if shimHomeDir == "" {
home, err := os.UserHomeDir()
if err != nil {
return cli.Exit(err, 1)
}
shimHomeDir = filepath.Join(home, consts.DstackDirPath)
args.Shim.HomeDir = shimHomeDir
}
log.Printf("Config Shim: %+v\n", args.Shim)
log.Printf("Config Runner: %+v\n", args.Runner)
log.Printf("Config Docker: %+v\n", args.Docker)

dockerRunner, err := shim.NewDockerRunner(&args)
if err != nil {
return cli.Exit(err, 1)
}

address := fmt.Sprintf(":%d", args.Shim.HTTPPort)
shimServer := api.NewShimServer(address, dockerRunner, Version)

defer func() {
shutdownCtx, cancelShutdown := context.WithTimeout(context.Background(), 5*time.Second)
defer cancelShutdown()
_ = shimServer.HttpServer.Shutdown(shutdownCtx)
}()

if serviceMode {
if err := shim.WriteHostInfo(shimHomeDir, dockerRunner.Resources()); err != nil {
if errors.Is(err, os.ErrExist) {
log.Println("cannot write host info: file already exists")
} else {
return cli.Exit(err, 1)
}
}
}

if err := shimServer.HttpServer.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
return cli.Exit(err, 1)
}

return nil
},
/* Misc Parameters */
&cli.BoolFlag{
Name: "service",
Usage: "Start as a service",
Destination: &serviceMode,
EnvVars: []string{"DSTACK_SERVICE_MODE"},
},
},
Action: func(c *cli.Context) error {
return start(ctx, args, serviceMode)
},
}

if err := app.Run(os.Args); err != nil {
log.Fatal(err)
log.Fatal(ctx, err.Error())
}
}

// start runs the shim: it configures logging, makes sure a runner binary is
// available, builds the Docker runner and serves the shim HTTP API.
//
// Steps, in order:
//   - the default logger's level is set from args.Shim.LogLevel;
//   - if args.Shim.HomeDir is empty, it defaults to the user's home directory
//     joined with consts.DstackDirPath;
//   - the shim log file (consts.ShimLogFileName inside the home directory) is
//     opened, and the context logger is replaced by one that writes both to
//     the original output and to that file;
//   - if args.Runner.BinaryPath is empty, args.DownloadRunner is called;
//   - in service mode, host info is written to the home directory; an
//     os.ErrExist error is only logged, any other error aborts.
//
// The named result err is inspected by a deferred function so that any error
// returned after the log file is set up is also written to the shim log.
// http.ErrServerClosed from ListenAndServe is treated as a clean exit.
func start(ctx context.Context, args shim.CLIArgs, serviceMode bool) (err error) {
	log.DefaultEntry.Logger.SetLevel(logrus.Level(args.Shim.LogLevel))

	// Resolve the home directory, falling back to a path under the user's home.
	if args.Shim.HomeDir == "" {
		userHome, homeErr := os.UserHomeDir()
		if homeErr != nil {
			return homeErr
		}
		args.Shim.HomeDir = filepath.Join(userHome, consts.DstackDirPath)
	}
	homeDir := args.Shim.HomeDir

	logFile, err := log.CreateAppendFile(filepath.Join(homeDir, consts.ShimLogFileName))
	if err != nil {
		return fmt.Errorf("failed to create shim log file: %w", err)
	}
	defer func() {
		_ = logFile.Close()
	}()

	// Duplicate everything the current logger emits into the log file.
	baseLogger := log.GetLogger(ctx)
	teeOut := io.MultiWriter(baseLogger.Logger.Out, logFile)
	teeLevel := int(baseLogger.Logger.GetLevel())
	ctx = log.WithLogger(ctx, log.NewEntry(teeOut, teeLevel))

	// Registered after the Close defer above, so it runs before the log file
	// is closed. Errors returned before this point are still reported to
	// stderr by the main function.
	defer func() {
		if err == nil {
			return
		}
		log.Error(ctx, err.Error())
	}()

	if args.Runner.BinaryPath == "" {
		if downloadErr := args.DownloadRunner(ctx); downloadErr != nil {
			return downloadErr
		}
	}

	log.Debug(ctx, "Shim", "args", args.Shim)
	log.Debug(ctx, "Runner", "args", args.Runner)
	log.Debug(ctx, "Docker", "args", args.Docker)

	dockerRunner, err := shim.NewDockerRunner(ctx, &args)
	if err != nil {
		return err
	}

	listenAddr := fmt.Sprintf(":%d", args.Shim.HTTPPort)
	shimServer := api.NewShimServer(ctx, listenAddr, dockerRunner, Version)

	// Give the HTTP server up to five seconds to shut down on the way out.
	defer func() {
		stopCtx, stopCancel := context.WithTimeout(ctx, 5*time.Second)
		defer stopCancel()
		_ = shimServer.HttpServer.Shutdown(stopCtx)
	}()

	if serviceMode {
		hostInfoErr := shim.WriteHostInfo(homeDir, dockerRunner.Resources(ctx))
		switch {
		case hostInfoErr == nil:
			// Host info written; nothing else to do.
		case errors.Is(hostInfoErr, os.ErrExist):
			// An existing host info file is not fatal.
			log.Error(ctx, "cannot write host info: file already exists")
		default:
			return hostInfoErr
		}
	}

	serveErr := shimServer.HttpServer.ListenAndServe()
	if serveErr == nil || errors.Is(serveErr, http.ErrServerClosed) {
		return nil
	}
	return serveErr
}
17 changes: 15 additions & 2 deletions runner/consts/consts.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,18 @@ const (
// Error-containing messages will be identified by this signature
const ExecutorFailedSignature = "Executor failed"

// A directory inside the container where runner stores its files (logs, etc.)
const RunnerDir = "/tmp/runner"
// All the following are directories inside the container
const (
// RunnerTempDir is a directory where runner stores its files (logs, etc.)
// NOTE: RunnerRuntimeDir would be a more appropriate name, but it's called tempDir
// throughout runner's codebase
RunnerTempDir = "/tmp/runner"
// RunnerHomeDir is currently a directory where authorized_keys, git credentials, etc. are placed
// The current user's homedir (as of 2024-12-28, it's always root) should be used
// instead of the hardcoded value
RunnerHomeDir = "/root"
// RunnerWorkingDir is a repo directory and a default working directory for the job
RunnerWorkingDir = "/workflow"
)

// ShimLogFileName is the file name of the shim's log file; the shim joins it
// with its home directory to build the log file path.
const ShimLogFileName = "shim.log"
5 changes: 5 additions & 0 deletions runner/internal/log/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,11 @@ func NewEntry(out io.Writer, level int) *logrus.Entry {

// DefaultEntry is a package-level log entry created by NewEntry with os.Stderr
// as the output and logrus.InfoLevel as the level.
var DefaultEntry = NewEntry(os.Stderr, int(logrus.InfoLevel))

// Fatal logs msg via the Fatal method of the logger obtained from ctx with
// GetLogger, after attaching args to it with AppendArgs.
// NOTE(review): logrus's Fatal exits the process after logging; this presumes
// AppendArgs returns a logrus entry — confirm.
func Fatal(ctx context.Context, msg string, args ...interface{}) {
	AppendArgs(GetLogger(ctx), args...).Fatal(msg)
}

func Error(ctx context.Context, msg string, args ...interface{}) {
logger := AppendArgs(GetLogger(ctx), args...)
logger.Error(msg)
Expand Down
2 changes: 1 addition & 1 deletion runner/internal/shim/api/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func (ds *DummyRunner) TaskInfo(taskID string) shim.TaskInfo {
return shim.TaskInfo{}
}

func (ds *DummyRunner) Resources() shim.Resources {
// Resources is the dummy runner's Resources method: it ignores the context
// argument and always returns a zero-value shim.Resources.
func (ds *DummyRunner) Resources(context.Context) shim.Resources {
	var empty shim.Resources
	return empty
}

Expand Down
Loading
Loading