From 010ad65d9b36138de0222239cdb3a96ea29236fa Mon Sep 17 00:00:00 2001 From: Sebastian Tiedtke Date: Tue, 16 Jul 2024 11:58:40 -0700 Subject: [PATCH] ExecInfo prototype --- internal/command/exec_info.go | 17 +++++++++++++ internal/owl/store.go | 18 +++++++++++--- internal/runner/command.go | 12 ++++++---- internal/runner/command_test.go | 16 +++++++++++++ internal/runner/service.go | 22 +++++++++++------ internal/runner/session.go | 42 ++++++++++++++++----------------- internal/runner/shell.go | 1 + internal/runner/shellraw.go | 1 + internal/runner/tempfile.go | 1 + 9 files changed, 94 insertions(+), 36 deletions(-) create mode 100644 internal/command/exec_info.go diff --git a/internal/command/exec_info.go b/internal/command/exec_info.go new file mode 100644 index 000000000..55b064ffb --- /dev/null +++ b/internal/command/exec_info.go @@ -0,0 +1,17 @@ +package command + +import "context" + +type runnerContextKey struct{} + +var ExecutionInfoKey = &runnerContextKey{} + +type ExecutionInfo struct { + RunID string + KnownName string + KnownID string +} + +func ContextWithExecutionInfo(ctx context.Context, execInfo *ExecutionInfo) context.Context { + return context.WithValue(ctx, ExecutionInfoKey, execInfo) +} diff --git a/internal/owl/store.go b/internal/owl/store.go index f8cd6a142..d40806016 100644 --- a/internal/owl/store.go +++ b/internal/owl/store.go @@ -2,6 +2,7 @@ package owl import ( "bytes" + "context" "encoding/json" "fmt" "slices" @@ -13,6 +14,7 @@ import ( "github.com/graphql-go/graphql" "github.com/stateful/godotenv" + commandpkg "github.com/stateful/runme/v3/internal/command" "go.uber.org/zap" ) @@ -521,17 +523,27 @@ func (s *Store) SensitiveKeys() ([]string, error) { return keys, nil } -func (s *Store) Update(newOrUpdated, deleted []string) error { +func (s *Store) Update(context context.Context, newOrUpdated, deleted []string) error { s.mu.Lock() defer s.mu.Unlock() + execInfo, ok := context.Value(commandpkg.ExecutionInfoKey).(*commandpkg.ExecutionInfo) + if !ok { + return errors.New("execution info not found in context") + } + + execRef := fmt.Sprintf("#%s", execInfo.KnownID) + if execInfo.KnownName != "" { + execRef = fmt.Sprintf("#%s", execInfo.KnownName) + } + if len(newOrUpdated) > 0 { updateOpSet, err := NewOperationSet(WithOperation(UpdateSetOperation), WithSpecs(false)) if err != nil { return err } - err = updateOpSet.addEnvs("[execution]", newOrUpdated...) + err = updateOpSet.addEnvs(execRef, newOrUpdated...) if err != nil { return err } @@ -545,7 +557,7 @@ func (s *Store) Update(newOrUpdated, deleted []string) error { return err } - err = deleteOpSet.addEnvs("[execution]", deleted...) + err = deleteOpSet.addEnvs(execRef, deleted...) if err != nil { return err } diff --git a/internal/runner/command.go b/internal/runner/command.go index 169e3c387..62762dd74 100644 --- a/internal/runner/command.go +++ b/internal/runner/command.go @@ -51,9 +51,10 @@ type command struct { tempScriptFile string - wg sync.WaitGroup - mu sync.Mutex - err error + context context.Context + wg sync.WaitGroup + mu sync.Mutex + err error logger *zap.Logger } @@ -103,7 +104,7 @@ type commandConfig struct { Logger *zap.Logger } -func newCommand(cfg *commandConfig) (*command, error) { +func newCommand(context context.Context, cfg *commandConfig) (*command, error) { var pathEnv string // If PATH is set in the session, use it in the system @@ -244,6 +245,7 @@ func newCommand(cfg *commandConfig) (*command, error) { args = append(args, cfg.Args...) cmd := &command{ + context: context, ProgramPath: programPath, Args: append(args, extraArgs...), Directory: directory, @@ -501,7 +503,7 @@ func (c *command) collectEnvs() { newEnvStore(endEnvs...), ) - err = c.Session.UpdateStore(c.cmd.Env, newOrUpdated, deleted) + err = c.Session.UpdateStore(c.context, c.cmd.Env, newOrUpdated, deleted) c.seterr(err) } diff --git a/internal/runner/command_test.go b/internal/runner/command_test.go index 2c30d3159..e5cdfad36 100644 --- a/internal/runner/command_test.go +++ b/internal/runner/command_test.go @@ -32,6 +32,7 @@ func Test_command(t *testing.T) { stderr := new(bytes.Buffer) cmd, err := newCommand( + context.Background(), &commandConfig{ ProgramName: "bash", Stdout: stdout, @@ -59,6 +60,7 @@ func Test_command(t *testing.T) { stderr := new(bytes.Buffer) cmd, err := newCommand( + context.Background(), &commandConfig{ ProgramName: "bash", Stdout: stdout, @@ -86,6 +88,7 @@ func Test_command(t *testing.T) { stderr := new(bytes.Buffer) cmd, err := newCommand( + context.Background(), &commandConfig{ ProgramName: "", LanguageID: "shellscript", @@ -114,6 +117,7 @@ func Test_command(t *testing.T) { stderr := new(bytes.Buffer) cmd, err := newCommand( + context.Background(), &commandConfig{ ProgramName: "", LanguageID: "js", @@ -142,6 +146,7 @@ func Test_command(t *testing.T) { stderr := new(bytes.Buffer) cmd, err := newCommand( + context.Background(), &commandConfig{ ProgramName: "/usr/bin/env node", LanguageID: "js", @@ -170,6 +175,7 @@ func Test_command(t *testing.T) { stderr := new(bytes.Buffer) cmd, err := newCommand( + context.Background(), &commandConfig{ ProgramName: "", LanguageID: "sql", @@ -200,6 +206,7 @@ func Test_command(t *testing.T) { stderr := new(bytes.Buffer) cmd, err := newCommand( + context.Background(), &commandConfig{ ProgramName: "bash", Tty: true, @@ -231,6 +238,7 @@ func Test_command(t *testing.T) { stderr := new(bytes.Buffer) cmd, err := newCommand( + context.Background(), &commandConfig{ ProgramName: "bash", Tty: true, @@ -266,6 +274,7 @@ func Test_command(t *testing.T) { _, _ = stdin.WriteString("hello") cmd, err := newCommand( + context.Background(), &commandConfig{ ProgramName: "bash", Stdin: stdin, @@ -295,6 +304,7 @@ func Test_command(t *testing.T) { stderr := new(bytes.Buffer) cmd, err := newCommand( + context.Background(), &commandConfig{ ProgramName: "bash", Tty: true, @@ -337,6 +347,7 @@ func Test_command(t *testing.T) { _, _ = stdin.WriteString("hello") cmd, err := newCommand( + context.Background(), &commandConfig{ ProgramName: "bash", Stdin: stdin, @@ -371,6 +382,7 @@ func Test_command(t *testing.T) { stderr := new(bytes.Buffer) cmd, err := newCommand( + context.Background(), &commandConfig{ ProgramName: "bash", Tty: true, @@ -427,6 +439,7 @@ func Test_command(t *testing.T) { require.NoError(t, err) cmd, err := newCommand( + context.Background(), &commandConfig{ ProgramName: "bash", Session: session, @@ -473,6 +486,7 @@ func Test_command(t *testing.T) { stdin := new(bytes.Buffer) cmd, err := newCommand( + context.Background(), &commandConfig{ ProgramName: "bash", Stdout: stdout, @@ -502,6 +516,7 @@ func Test_command_Stop(t *testing.T) { t.Parallel() cmd, err := newCommand( + context.Background(), &commandConfig{ ProgramName: "bash", Stdin: bytes.NewBuffer(nil), @@ -529,6 +544,7 @@ func Test_command_Stop(t *testing.T) { func Test_exitCodeFromErr(t *testing.T) { cmd, err := newCommand( + context.Background(), &commandConfig{ ProgramName: "bash", Tty: true, diff --git a/internal/runner/service.go b/internal/runner/service.go index cafa1237e..727fa2581 100644 --- a/internal/runner/service.go +++ b/internal/runner/service.go @@ -206,7 +206,8 @@ func ConvertRunnerProject(runnerProj *runnerv1.Project) (*project.Project, error } func (r *runnerService) Execute(srv runnerv1.RunnerService_ExecuteServer) error { - logger := r.logger.With(zap.String("_id", ulid.GenerateID())) + _id := ulid.GenerateID() + logger := r.logger.With(zap.String("_id", _id)) logger.Info("running Execute in runnerService") @@ -221,6 +222,13 @@ func (r *runnerService) Execute(srv runnerv1.RunnerService_ExecuteServer) error return errors.WithStack(err) } + execInfo := &commandpkg.ExecutionInfo{ + RunID: _id, + KnownName: req.GetKnownName(), + KnownID: req.GetKnownId(), + } + ctx := commandpkg.ContextWithExecutionInfo(srv.Context(), execInfo) + // We want to always log the request because it is used for AI training. // see: https://github.com/stateful/runme/issues/574 if req.KnownId != "" { @@ -255,7 +263,7 @@ func (r *runnerService) Execute(srv runnerv1.RunnerService_ExecuteServer) error } if len(req.Envs) > 0 { - err := sess.AddEnvs(req.Envs) + err := sess.AddEnvs(ctx, req.Envs) if err != nil { return err } @@ -304,7 +312,7 @@ func (r *runnerService) Execute(srv runnerv1.RunnerService_ExecuteServer) error } logger.Debug("command config", zap.Any("cfg", cfg)) - cmd, err := newCommand(cfg) + cmd, err := newCommand(ctx, cfg) if err != nil { var errInvalidLanguage ErrInvalidLanguage if errors.As(err, &errInvalidLanguage) { @@ -343,10 +351,10 @@ func (r *runnerService) Execute(srv runnerv1.RunnerService_ExecuteServer) error return err } - cmdCtx := srv.Context() + cmdCtx := ctx if req.Background { - cmdCtx = context.Background() + cmdCtx = commandpkg.ContextWithExecutionInfo(context.Background(), execInfo) } if err := cmd.StartWithOpts(cmdCtx, &startOpts{}); err != nil { @@ -518,14 +526,14 @@ func (r *runnerService) Execute(srv runnerv1.RunnerService_ExecuteServer) error } if storeStdout { - err := sess.SetEnv("__", string(stdoutMem)) + err := sess.SetEnv(ctx, "__", string(stdoutMem)) if err != nil { logger.Sugar().Errorf("%v", err) } knownName := req.GetKnownName() if knownName != "" && runnerConformsOpinionatedEnvVarNaming(knownName) { - err = sess.SetEnv(knownName, string(stdoutMem)) + err = sess.SetEnv(ctx, knownName, string(stdoutMem)) if err != nil { logger.Warn("failed to set env", zap.Error(err)) } diff --git a/internal/runner/session.go b/internal/runner/session.go index cae63338c..ddcac7214 100644 --- a/internal/runner/session.go +++ b/internal/runner/session.go @@ -18,9 +18,9 @@ type envStorer interface { getEnv(string) (string, error) envs() ([]string, error) sensitiveEnvKeys() ([]string, error) - addEnvs(envs []string) error - updateStore(envs []string, newOrUpdated []string, deleted []string) error - setEnv(k string, v string) error + addEnvs(context context.Context, envs []string) error + updateStore(context context.Context, envs []string, newOrUpdated []string, deleted []string) error + setEnv(context context.Context, k string, v string) error subscribe(ctx context.Context, snapshotc chan<- owl.SetVarItems) error complete() } @@ -65,20 +65,20 @@ func NewSessionWithStore(envs []string, proj *project.Project, owlStore bool, lo return s, nil } -func (s *Session) UpdateStore(envs []string, newOrUpdated []string, deleted []string) error { - return s.envStorer.updateStore(envs, newOrUpdated, deleted) +func (s *Session) UpdateStore(context context.Context, envs []string, newOrUpdated []string, deleted []string) error { + return s.envStorer.updateStore(context, envs, newOrUpdated, deleted) } -func (s *Session) AddEnvs(envs []string) error { - return s.envStorer.addEnvs(envs) +func (s *Session) AddEnvs(context context.Context, envs []string) error { + return s.envStorer.addEnvs(context, envs) } func (s *Session) SensitiveEnvKeys() ([]string, error) { return s.envStorer.sensitiveEnvKeys() } -func (s *Session) SetEnv(k string, v string) error { - return s.envStorer.setEnv(k, v) +func (s *Session) SetEnv(context context.Context, k string, v string) error { + return s.envStorer.setEnv(context, k, v) } func (s *Session) Envs() ([]string, error) { @@ -121,7 +121,7 @@ func newRunnerStorer(sessionEnvs ...string) *runnerEnvStorer { } } -func (es *runnerEnvStorer) subscribe(ctx context.Context, snapshotc chan<- owl.SetVarItems) error { +func (es *runnerEnvStorer) subscribe(_ context.Context, snapshotc chan<- owl.SetVarItems) error { defer close(snapshotc) return fmt.Errorf("not available for runner env store") } @@ -130,7 +130,7 @@ func (es *runnerEnvStorer) complete() { // noop } -func (es *runnerEnvStorer) addEnvs(envs []string) error { +func (es *runnerEnvStorer) addEnvs(_ context.Context, envs []string) error { es.envStore.Add(envs...) return nil } @@ -152,12 +152,12 @@ func (es *runnerEnvStorer) envs() ([]string, error) { return envs, nil } -func (es *runnerEnvStorer) setEnv(k string, v string) error { +func (es *runnerEnvStorer) setEnv(_ context.Context, k string, v string) error { _, err := es.envStore.Set(k, v) return err } -func (es *runnerEnvStorer) updateStore(envs []string, newOrUpdated []string, deleted []string) error { +func (es *runnerEnvStorer) updateStore(_ context.Context, envs []string, newOrUpdated []string, deleted []string) error { es.envStore = newEnvStore(envs...).Add(newOrUpdated...).Delete(deleted...) return nil } @@ -227,7 +227,7 @@ func newOwlStorer(envs []string, proj *project.Project, logger *zap.Logger) (*ow }, nil } -func (es *owlEnvStorer) subscribe(ctx context.Context, snapshotc chan<- owl.SetVarItems) error { +func (es *owlEnvStorer) subscribe(context context.Context, snapshotc chan<- owl.SetVarItems) error { defer es.mu.Unlock() es.mu.Lock() es.logger.Debug("subscribed to owl store") @@ -235,7 +235,7 @@ func (es *owlEnvStorer) subscribe(ctx context.Context, snapshotc chan<- owl.SetV es.subscribers = append(es.subscribers, snapshotc) go func() { - <-ctx.Done() + <-context.Done() err := es.unsubscribe(snapshotc) if err != nil { es.logger.Error("unsubscribe from owl store failed", zap.Error(err)) @@ -298,16 +298,16 @@ func (es *owlEnvStorer) notifySubscribers() { } } -func (es *owlEnvStorer) updateStore(envs []string, newOrUpdated []string, deleted []string) error { - if err := es.owlStore.Update(newOrUpdated, deleted); err != nil { +func (es *owlEnvStorer) updateStore(context context.Context, envs []string, newOrUpdated []string, deleted []string) error { + if err := es.owlStore.Update(context, newOrUpdated, deleted); err != nil { return err } es.notifySubscribers() return nil } -func (es *owlEnvStorer) addEnvs(envs []string) error { - if err := es.owlStore.Update(envs, nil); err != nil { +func (es *owlEnvStorer) addEnvs(context context.Context, envs []string) error { + if err := es.owlStore.Update(context, envs, nil); err != nil { return err } es.notifySubscribers() @@ -326,9 +326,9 @@ func (es *owlEnvStorer) sensitiveEnvKeys() ([]string, error) { return vals, nil } -func (es *owlEnvStorer) setEnv(k string, v string) error { +func (es *owlEnvStorer) setEnv(context context.Context, k string, v string) error { // todo(sebastian): add checking env length inside Update - err := es.owlStore.Update([]string{fmt.Sprintf("%s=%s", k, v)}, nil) + err := es.owlStore.Update(context, []string{fmt.Sprintf("%s=%s", k, v)}, nil) if err != nil { return err } diff --git a/internal/runner/shell.go b/internal/runner/shell.go index 130a9cbc7..76c7a8866 100644 --- a/internal/runner/shell.go +++ b/internal/runner/shell.go @@ -51,6 +51,7 @@ func (s Shell) DryRun(ctx context.Context, w io.Writer) { func (s *Shell) Run(ctx context.Context) error { cmd, err := newCommand( + ctx, &commandConfig{ ProgramName: s.ProgramPath(), Directory: s.Dir, diff --git a/internal/runner/shellraw.go b/internal/runner/shellraw.go index a6ed882c5..d01cd7b40 100644 --- a/internal/runner/shellraw.go +++ b/internal/runner/shellraw.go @@ -30,6 +30,7 @@ func (s ShellRaw) DryRun(ctx context.Context, w io.Writer) { func (s ShellRaw) Run(ctx context.Context) error { cmd, err := newCommand( + ctx, &commandConfig{ ProgramName: s.ProgramPath(), Directory: s.Dir, diff --git a/internal/runner/tempfile.go b/internal/runner/tempfile.go index d83f8d2bd..b0a1c03d2 100644 --- a/internal/runner/tempfile.go +++ b/internal/runner/tempfile.go @@ -32,6 +32,7 @@ func (s TempFile) DryRun(ctx context.Context, w io.Writer) { func (s *TempFile) Run(ctx context.Context) error { cmd, err := newCommand( + ctx, &commandConfig{ ProgramName: s.ProgramName, LanguageID: s.LanguageID,