From 16aed76144af195ca0cd5b4effcf7e9999607458 Mon Sep 17 00:00:00 2001 From: Thorsten Ball Date: Fri, 17 Sep 2021 11:30:16 +0200 Subject: [PATCH 1/3] Print step output into a single buffer This is a side-quest to EM2 https://github.com/sourcegraph/sourcegraph/issues/24421 because we realised that we'll make things harder for ourselves if we separate stdout/stderr into separate buffers. We'd have to zip them up together in the UI but that's hard without having additional timing information. But timing information is also a bit overkill (at least for now) so we thought we'd use a single buffer. That's what this PR here contains. Stdout and stderr are logged into a single buffer, each line prefixed with `stdout: ` and `stderr: ` respectively. That works because the `process.PipeOutput` function only writes lines (not chunks) to the passed in writers. So it works even if commands print half a line on stdout, then half a line on stderr, and only then the rest. --- internal/batches/executor/run_steps.go | 10 ++--- internal/batches/executor/ui.go | 25 ++++++----- internal/batches/ui/interval_writer.go | 47 +++++++++++++++------ internal/batches/ui/interval_writer_test.go | 31 ++++++++++---- internal/batches/ui/json_lines.go | 39 ++++++++--------- internal/batches/ui/task_exec_tui.go | 13 ++---- 6 files changed, 95 insertions(+), 70 deletions(-) diff --git a/internal/batches/executor/run_steps.go b/internal/batches/executor/run_steps.go index cdbe92d70f..e996e84308 100644 --- a/internal/batches/executor/run_steps.go +++ b/internal/batches/executor/run_steps.go @@ -315,16 +315,14 @@ func executeSingleStep( writerCtx, writerCancel := context.WithCancel(ctx) defer writerCancel() - uiStdoutWriter := opts.ui.StepStdoutWriter(writerCtx, opts.task, i) - uiStderrWriter := opts.ui.StepStderrWriter(writerCtx, opts.task, i) + outputWriter := opts.ui.StepOutputWriter(writerCtx, opts.task, i) defer func() { - uiStdoutWriter.Close() - uiStderrWriter.Close() + outputWriter.Close() }() var stdoutBuffer, stderrBuffer bytes.Buffer - stdout := io.MultiWriter(&stdoutBuffer, uiStdoutWriter, opts.logger.PrefixWriter("stdout")) - stderr := io.MultiWriter(&stderrBuffer, uiStderrWriter, opts.logger.PrefixWriter("stderr")) + stdout := io.MultiWriter(&stdoutBuffer, outputWriter.StdoutWriter(), opts.logger.PrefixWriter("stdout")) + stderr := io.MultiWriter(&stderrBuffer, outputWriter.StderrWriter(), opts.logger.PrefixWriter("stderr")) // Setup readers that pipe the output into the given buffers wg, err := process.PipeOutput(ctx, cmd, stdout, stderr) diff --git a/internal/batches/executor/ui.go b/internal/batches/executor/ui.go index 4bc5c4954e..2eedc04f83 100644 --- a/internal/batches/executor/ui.go +++ b/internal/batches/executor/ui.go @@ -20,6 +20,12 @@ type TaskExecutionUI interface { StepsExecutionUI(*Task) StepsExecutionUI } +type StepOutputWriter interface { + StdoutWriter() io.Writer + StderrWriter() io.Writer + Close() error +} + type StepsExecutionUI interface { ArchiveDownloadStarted() ArchiveDownloadFinished() @@ -34,8 +40,8 @@ type StepsExecutionUI interface { StepPreparing(int) StepStarted(int, string) - StepStdoutWriter(context.Context, *Task, int) io.WriteCloser - StepStderrWriter(context.Context, *Task, int) io.WriteCloser + StepOutputWriter(context.Context, *Task, int) StepOutputWriter + // StepStderrWriter(context.Context, *Task, int) io.WriteCloser CalculatingDiffStarted() CalculatingDiffFinished() @@ -54,19 +60,16 @@ func (noop NoopStepsExecUI) SkippingStepsUpto(startStep int) {} func (noop NoopStepsExecUI) StepSkipped(step int) {} func (noop NoopStepsExecUI) StepPreparing(step int) {} func (noop NoopStepsExecUI) StepStarted(step int, runScript string) {} -func (noop NoopStepsExecUI) StepStdoutWriter(ctx context.Context, task *Task, step int) io.WriteCloser { - return discardCloser{io.Discard} -} -func (noop NoopStepsExecUI) StepStderrWriter(ctx context.Context, task *Task, step int) io.WriteCloser { - return discardCloser{io.Discard} +func (noop NoopStepsExecUI) StepOutputWriter(ctx context.Context, task *Task, step int) StepOutputWriter { + return NoopStepOutputWriter{} } func (noop NoopStepsExecUI) CalculatingDiffStarted() {} func (noop NoopStepsExecUI) CalculatingDiffFinished() {} func (noop NoopStepsExecUI) StepFinished(idx int, diff []byte, changes *git.Changes, outputs map[string]interface{}) { } -type discardCloser struct { - io.Writer -} +type NoopStepOutputWriter struct{} -func (discardCloser) Close() error { return nil } +func (noop NoopStepOutputWriter) StdoutWriter() io.Writer { return io.Discard } +func (noop NoopStepOutputWriter) StderrWriter() io.Writer { return io.Discard } +func (noop NoopStepOutputWriter) Close() error { return nil } diff --git a/internal/batches/ui/interval_writer.go b/internal/batches/ui/interval_writer.go index 71380b7518..5bcc9d9e5d 100644 --- a/internal/batches/ui/interval_writer.go +++ b/internal/batches/ui/interval_writer.go @@ -3,6 +3,7 @@ package ui import ( "bytes" "context" + "io" "time" "github.com/derision-test/glock" @@ -52,14 +53,6 @@ func NewIntervalWriter(ctx context.Context, interval time.Duration, sink func(st return newIntervalWriter(ctx, glock.NewRealTicker(interval), sink) } -func (l *IntervalWriter) flush() { - if l.buf.Len() == 0 { - return - } - l.sink(l.buf.String()) - l.buf.Reset() -} - // Close flushes the func (l *IntervalWriter) Close() error { l.closed <- struct{}{} @@ -67,11 +60,20 @@ func (l *IntervalWriter) Close() error { return nil } -// Write handler of IntervalWriter. -func (l *IntervalWriter) Write(p []byte) (int, error) { - l.writes <- p - <-l.writesDone - return len(p), nil +func (l *IntervalWriter) StdoutWriter() io.Writer { + return &prefixedWriter{writes: l.writes, writesDone: l.writesDone, prefix: "stdout: "} +} + +func (l *IntervalWriter) StderrWriter() io.Writer { + return &prefixedWriter{writes: l.writes, writesDone: l.writesDone, prefix: "stderr: "} +} + +func (l *IntervalWriter) flush() { + if l.buf.Len() == 0 { + return + } + l.sink(l.buf.String()) + l.buf.Reset() } func (l *IntervalWriter) writeLines(ctx context.Context) { @@ -103,3 +105,22 @@ func (l *IntervalWriter) writeLines(ctx context.Context) { } } } + +type prefixedWriter struct { + writes chan []byte + writesDone chan struct{} + prefix string +} + +func (w *prefixedWriter) Write(p []byte) (int, error) { + var prefixedLines []byte + for _, line := range bytes.Split(p, []byte("\n")) { + prefixedLine := append([]byte(w.prefix), line...) + prefixedLine = append(prefixedLine, []byte("\n")...) + + prefixedLines = append(prefixedLines, prefixedLine...) + } + w.writes <- prefixedLines + <-w.writesDone + return len(p), nil +} diff --git a/internal/batches/ui/interval_writer_test.go b/internal/batches/ui/interval_writer_test.go index ec5bacdd40..28d676f465 100644 --- a/internal/batches/ui/interval_writer_test.go +++ b/internal/batches/ui/interval_writer_test.go @@ -20,7 +20,10 @@ func TestIntervalWriter(t *testing.T) { ticker := glock.NewMockTicker(1 * time.Second) writer := newIntervalWriter(ctx, ticker, sink) - writer.Write([]byte("1")) + stdoutWriter := writer.StdoutWriter() + stderrWriter := writer.StderrWriter() + stdoutWriter.Write([]byte("1")) + stderrWriter.Write([]byte("1")) select { case <-ch: t.Fatalf("ch has data") @@ -31,17 +34,22 @@ func TestIntervalWriter(t *testing.T) { select { case d := <-ch: - if d != "1" { - t.Fatalf("wrong data in sink") + want := "stdout: 1\nstderr: 1\n" + if d != want { + t.Fatalf("wrong data in sink. want=%q, have=%q", want, d) } case <-time.After(1 * time.Second): t.Fatalf("ch has NO data") } - writer.Write([]byte("2")) - writer.Write([]byte("3")) - writer.Write([]byte("4")) - writer.Write([]byte("5")) + stdoutWriter.Write([]byte("2")) + stderrWriter.Write([]byte("2")) + stdoutWriter.Write([]byte("3")) + stderrWriter.Write([]byte("3")) + stdoutWriter.Write([]byte("4")) + stderrWriter.Write([]byte("4")) + stdoutWriter.Write([]byte("5")) + stderrWriter.Write([]byte("5")) select { case <-ch: @@ -54,8 +62,13 @@ func TestIntervalWriter(t *testing.T) { select { case d := <-ch: - if d != "2345" { - t.Fatalf("wrong data in sink") + want := "stdout: 2\nstderr: 2\n" + + "stdout: 3\nstderr: 3\n" + + "stdout: 4\nstderr: 4\n" + + "stdout: 5\nstderr: 5\n" + + if d != want { + t.Fatalf("wrong data in sink. want") } case <-time.After(1 * time.Second): t.Fatalf("ch has NO data") diff --git a/internal/batches/ui/json_lines.go b/internal/batches/ui/json_lines.go index 528358127b..cbd5124306 100644 --- a/internal/batches/ui/json_lines.go +++ b/internal/batches/ui/json_lines.go @@ -4,7 +4,6 @@ import ( "context" "encoding/json" "fmt" - "io" "math/rand" "os" "strconv" @@ -325,36 +324,34 @@ func (ui *stepsExecutionJSONLines) StepStarted(step int, runScript string) { logOperationStart(batcheslib.LogEventOperationTaskStep, map[string]interface{}{"taskID": ui.linesTask.ID, "step": step, "runScript": runScript}) } -func (ui *stepsExecutionJSONLines) StepStdoutWriter(ctx context.Context, task *executor.Task, step int) io.WriteCloser { +func (ui *stepsExecutionJSONLines) StepOutputWriter(ctx context.Context, task *executor.Task, step int) executor.StepOutputWriter { sink := func(data string) { logOperationProgress( batcheslib.LogEventOperationTaskStep, map[string]interface{}{ - "taskID": ui.linesTask.ID, - "step": step, - "out": data, - "output_type": "stdout", + "taskID": ui.linesTask.ID, + "step": step, + "out": data, }, ) } return NewIntervalWriter(ctx, stepFlushDuration, sink) } -func (ui *stepsExecutionJSONLines) StepStderrWriter(ctx context.Context, task *executor.Task, step int) io.WriteCloser { - sink := func(data string) { - logOperationProgress( - batcheslib.LogEventOperationTaskStep, - map[string]interface{}{ - "taskID": ui.linesTask.ID, - "step": step, - "out": data, - "output_type": "stderr", - }, - ) - } - - return NewIntervalWriter(ctx, stepFlushDuration, sink) -} +// func (ui *stepsExecutionJSONLines) StepStderrWriter(ctx context.Context, task *executor.Task, step int) io.WriteCloser { +// sink := func(data string) { +// logOperationProgress( +// batcheslib.LogEventOperationTaskStep, +// map[string]interface{}{ +// "taskID": ui.linesTask.ID, +// "step": step, +// "out": data, +// }, +// ) +// } + +// return NewIntervalWriter(ctx, stepFlushDuration, sink) +// } func (ui *stepsExecutionJSONLines) StepFinished(step int, diff []byte, changes *git.Changes, outputs map[string]interface{}) { logOperationSuccess( diff --git a/internal/batches/ui/task_exec_tui.go b/internal/batches/ui/task_exec_tui.go index 6f41cd4e59..a19d9d40a1 100644 --- a/internal/batches/ui/task_exec_tui.go +++ b/internal/batches/ui/task_exec_tui.go @@ -3,7 +3,6 @@ package ui import ( "context" "fmt" - "io" "sort" "strings" "sync" @@ -458,12 +457,10 @@ func (ui stepsExecTUI) StepStarted(step int, runScript string) { ui.updateStatusBar(runScript) } -func (ui stepsExecTUI) StepStdoutWriter(ctx context.Context, task *executor.Task, step int) io.WriteCloser { - return discardCloser{io.Discard} -} -func (ui stepsExecTUI) StepStderrWriter(ctx context.Context, task *executor.Task, step int) io.WriteCloser { - return discardCloser{io.Discard} +func (ui stepsExecTUI) StepOutputWriter(ctx context.Context, task *executor.Task, step int) executor.StepOutputWriter { + return executor.NoopStepOutputWriter{} } + func (ui stepsExecTUI) CalculatingDiffStarted() { ui.updateStatusBar("Calculating diff") } @@ -473,7 +470,3 @@ func (ui stepsExecTUI) CalculatingDiffFinished() { func (ui stepsExecTUI) StepFinished(idx int, diff []byte, changes *git.Changes, outputs map[string]interface{}) { // noop right now } - -type discardCloser struct{ io.Writer } - -func (discardCloser) Close() error { return nil } From 6db3480e97d23034a9a334ad35cafde6d542dad8 Mon Sep 17 00:00:00 2001 From: Thorsten Ball Date: Fri, 17 Sep 2021 13:45:41 +0200 Subject: [PATCH 2/3] Clean up --- internal/batches/executor/ui.go | 1 - internal/batches/ui/interval_writer.go | 44 +++++++++++---------- internal/batches/ui/interval_writer_test.go | 2 +- internal/batches/ui/json_lines.go | 19 +-------- 4 files changed, 27 insertions(+), 39 deletions(-) diff --git a/internal/batches/executor/ui.go b/internal/batches/executor/ui.go index 2eedc04f83..3a0b56f83b 100644 --- a/internal/batches/executor/ui.go +++ b/internal/batches/executor/ui.go @@ -41,7 +41,6 @@ type StepsExecutionUI interface { StepStarted(int, string) StepOutputWriter(context.Context, *Task, int) StepOutputWriter - // StepStderrWriter(context.Context, *Task, int) io.WriteCloser CalculatingDiffStarted() CalculatingDiffFinished() diff --git a/internal/batches/ui/interval_writer.go b/internal/batches/ui/interval_writer.go index 5bcc9d9e5d..1cc285446f 100644 --- a/internal/batches/ui/interval_writer.go +++ b/internal/batches/ui/interval_writer.go @@ -9,9 +9,9 @@ import ( "github.com/derision-test/glock" ) -// IntervalWriter is a io.Writer that flushes to the given sink on the given -// interval. -type IntervalWriter struct { +// IntervalProcessWriter accepts stdout/stderr writes from processes, prefixed +// them accordingly, and flushes to the given sink on the given interval. +type IntervalProcessWriter struct { sink func(string) ticker glock.Ticker @@ -26,8 +26,8 @@ type IntervalWriter struct { done chan struct{} } -func newIntervalWriter(ctx context.Context, ticker glock.Ticker, sink func(string)) *IntervalWriter { - l := &IntervalWriter{ +func newIntervalProcessWriter(ctx context.Context, ticker glock.Ticker, sink func(string)) *IntervalProcessWriter { + l := &IntervalProcessWriter{ sink: sink, ticker: ticker, @@ -45,30 +45,34 @@ func newIntervalWriter(ctx context.Context, ticker glock.Ticker, sink func(strin return l } -// NewLogger returns a new Logger instance and spawns a goroutine in the -// background that regularily flushed the logged output to the given sink. +// NewIntervalProcessWriter returns a new IntervalProcessWriter instance and +// spawns a goroutine in the background that regularily flushed the logged +// output to the given sink. // // If the passed in ctx is canceled the goroutine will exit. -func NewIntervalWriter(ctx context.Context, interval time.Duration, sink func(string)) *IntervalWriter { - return newIntervalWriter(ctx, glock.NewRealTicker(interval), sink) +func NewIntervalProcessWriter(ctx context.Context, interval time.Duration, sink func(string)) *IntervalProcessWriter { + return newIntervalProcessWriter(ctx, glock.NewRealTicker(interval), sink) } -// Close flushes the -func (l *IntervalWriter) Close() error { - l.closed <- struct{}{} - <-l.done - return nil -} - -func (l *IntervalWriter) StdoutWriter() io.Writer { +// StdoutWriter returns an io.Writer that prefixes every line with "stdout: " +func (l *IntervalProcessWriter) StdoutWriter() io.Writer { return &prefixedWriter{writes: l.writes, writesDone: l.writesDone, prefix: "stdout: "} } -func (l *IntervalWriter) StderrWriter() io.Writer { +// SterrWriter returns an io.Writer that prefixes every line with "stderr: " +func (l *IntervalProcessWriter) StderrWriter() io.Writer { return &prefixedWriter{writes: l.writes, writesDone: l.writesDone, prefix: "stderr: "} } -func (l *IntervalWriter) flush() { +// Close blocks until all pending writes have been flushed to the buffer. It +// then causes the underlying goroutine to exit. +func (l *IntervalProcessWriter) Close() error { + l.closed <- struct{}{} + <-l.done + return nil +} + +func (l *IntervalProcessWriter) flush() { if l.buf.Len() == 0 { return } @@ -76,7 +80,7 @@ func (l *IntervalWriter) flush() { l.buf.Reset() } -func (l *IntervalWriter) writeLines(ctx context.Context) { +func (l *IntervalProcessWriter) writeLines(ctx context.Context) { defer func() { l.flush() l.ticker.Stop() diff --git a/internal/batches/ui/interval_writer_test.go b/internal/batches/ui/interval_writer_test.go index 28d676f465..1a56eb4288 100644 --- a/internal/batches/ui/interval_writer_test.go +++ b/internal/batches/ui/interval_writer_test.go @@ -18,7 +18,7 @@ func TestIntervalWriter(t *testing.T) { } ticker := glock.NewMockTicker(1 * time.Second) - writer := newIntervalWriter(ctx, ticker, sink) + writer := newIntervalProcessWriter(ctx, ticker, sink) stdoutWriter := writer.StdoutWriter() stderrWriter := writer.StderrWriter() diff --git a/internal/batches/ui/json_lines.go b/internal/batches/ui/json_lines.go index cbd5124306..c1876a130c 100644 --- a/internal/batches/ui/json_lines.go +++ b/internal/batches/ui/json_lines.go @@ -335,23 +335,8 @@ func (ui *stepsExecutionJSONLines) StepOutputWriter(ctx context.Context, task *e }, ) } - return NewIntervalWriter(ctx, stepFlushDuration, sink) -} - -// func (ui *stepsExecutionJSONLines) StepStderrWriter(ctx context.Context, task *executor.Task, step int) io.WriteCloser { -// sink := func(data string) { -// logOperationProgress( -// batcheslib.LogEventOperationTaskStep, -// map[string]interface{}{ -// "taskID": ui.linesTask.ID, -// "step": step, -// "out": data, -// }, -// ) -// } - -// return NewIntervalWriter(ctx, stepFlushDuration, sink) -// } + return NewIntervalProcessWriter(ctx, stepFlushDuration, sink) +} func (ui *stepsExecutionJSONLines) StepFinished(step int, diff []byte, changes *git.Changes, outputs map[string]interface{}) { logOperationSuccess( From 6a3c7f382e146f286e979058bad7f579f79db245 Mon Sep 17 00:00:00 2001 From: Thorsten Ball Date: Fri, 17 Sep 2021 13:46:55 +0200 Subject: [PATCH 3/3] Add changelog entry --- CHANGELOG.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 733f199fd3..3a01bb28ca 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -17,6 +17,8 @@ All notable changes to `src-cli` are documented in this file. ### Changed +- For internal use only: when `src batch [preview|apply|exec]` are executed in `-text-only` mode, command output on stdout/stderr will be logged in the same message, with each line prefixed accordingly. [#619](https://github.com/sourcegraph/src-cli/pull/619) + ### Fixed ### Removed