Skip to content

Commit

Permalink
Print step output into a single buffer (#619)
Browse files Browse the repository at this point in the history
This is a side-quest to EM2
https://github.com/sourcegraph/sourcegraph/issues/24421 because we
realised that we'll make things harder for ourselves if we separate
stdout/stderr into separate buffers.

We'd have to zip them up together
in the UI but that's hard without having additional timing information.

But timing information is also a bit overkill (at least for now) so we
thought we'd use a single buffer.

That's what this PR here contains.

Stdout and stderr are logged into a single buffer, each line prefixed with
`stdout: ` and `stderr: ` respectively.

That works because the `process.PipeOutput` function only writes lines
(not chunks) to the passed in writers. So it works even if commands print half a
line on stdout, then half a line on stderr, and only then the rest.
  • Loading branch information
mrnugget authored Sep 17, 2021
1 parent 6bffe0f commit 157a86d
Show file tree
Hide file tree
Showing 7 changed files with 98 additions and 83 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ All notable changes to `src-cli` are documented in this file.

### Changed

- For internal use only: when `src batch [preview|apply|exec]` are executed in `-text-only` mode, command output on stdout/stderr will be logged in the same message, with each line prefixed accordingly. [#619](https://github.com/sourcegraph/src-cli/pull/619)

### Fixed

### Removed
Expand Down
10 changes: 4 additions & 6 deletions internal/batches/executor/run_steps.go
Original file line number Diff line number Diff line change
Expand Up @@ -315,16 +315,14 @@ func executeSingleStep(

writerCtx, writerCancel := context.WithCancel(ctx)
defer writerCancel()
uiStdoutWriter := opts.ui.StepStdoutWriter(writerCtx, opts.task, i)
uiStderrWriter := opts.ui.StepStderrWriter(writerCtx, opts.task, i)
outputWriter := opts.ui.StepOutputWriter(writerCtx, opts.task, i)
defer func() {
uiStdoutWriter.Close()
uiStderrWriter.Close()
outputWriter.Close()
}()

var stdoutBuffer, stderrBuffer bytes.Buffer
stdout := io.MultiWriter(&stdoutBuffer, uiStdoutWriter, opts.logger.PrefixWriter("stdout"))
stderr := io.MultiWriter(&stderrBuffer, uiStderrWriter, opts.logger.PrefixWriter("stderr"))
stdout := io.MultiWriter(&stdoutBuffer, outputWriter.StdoutWriter(), opts.logger.PrefixWriter("stdout"))
stderr := io.MultiWriter(&stderrBuffer, outputWriter.StderrWriter(), opts.logger.PrefixWriter("stderr"))

// Setup readers that pipe the output into the given buffers
wg, err := process.PipeOutput(ctx, cmd, stdout, stderr)
Expand Down
24 changes: 13 additions & 11 deletions internal/batches/executor/ui.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,12 @@ type TaskExecutionUI interface {
StepsExecutionUI(*Task) StepsExecutionUI
}

type StepOutputWriter interface {
StdoutWriter() io.Writer
StderrWriter() io.Writer
Close() error
}

type StepsExecutionUI interface {
ArchiveDownloadStarted()
ArchiveDownloadFinished()
Expand All @@ -34,8 +40,7 @@ type StepsExecutionUI interface {
StepPreparing(int)
StepStarted(int, string)

StepStdoutWriter(context.Context, *Task, int) io.WriteCloser
StepStderrWriter(context.Context, *Task, int) io.WriteCloser
StepOutputWriter(context.Context, *Task, int) StepOutputWriter

CalculatingDiffStarted()
CalculatingDiffFinished()
Expand All @@ -54,19 +59,16 @@ func (noop NoopStepsExecUI) SkippingStepsUpto(startStep int) {}
func (noop NoopStepsExecUI) StepSkipped(step int) {}
func (noop NoopStepsExecUI) StepPreparing(step int) {}
func (noop NoopStepsExecUI) StepStarted(step int, runScript string) {}
func (noop NoopStepsExecUI) StepStdoutWriter(ctx context.Context, task *Task, step int) io.WriteCloser {
return discardCloser{io.Discard}
}
func (noop NoopStepsExecUI) StepStderrWriter(ctx context.Context, task *Task, step int) io.WriteCloser {
return discardCloser{io.Discard}
func (noop NoopStepsExecUI) StepOutputWriter(ctx context.Context, task *Task, step int) StepOutputWriter {
return NoopStepOutputWriter{}
}
func (noop NoopStepsExecUI) CalculatingDiffStarted() {}
func (noop NoopStepsExecUI) CalculatingDiffFinished() {}
func (noop NoopStepsExecUI) StepFinished(idx int, diff []byte, changes *git.Changes, outputs map[string]interface{}) {
}

type discardCloser struct {
io.Writer
}
type NoopStepOutputWriter struct{}

func (discardCloser) Close() error { return nil }
func (noop NoopStepOutputWriter) StdoutWriter() io.Writer { return io.Discard }
func (noop NoopStepOutputWriter) StderrWriter() io.Writer { return io.Discard }
func (noop NoopStepOutputWriter) Close() error { return nil }
71 changes: 48 additions & 23 deletions internal/batches/ui/interval_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,15 @@ package ui
import (
"bytes"
"context"
"io"
"time"

"github.com/derision-test/glock"
)

// IntervalWriter is a io.Writer that flushes to the given sink on the given
// interval.
type IntervalWriter struct {
// IntervalProcessWriter accepts stdout/stderr writes from processes, prefixed
// them accordingly, and flushes to the given sink on the given interval.
type IntervalProcessWriter struct {
sink func(string)

ticker glock.Ticker
Expand All @@ -25,8 +26,8 @@ type IntervalWriter struct {
done chan struct{}
}

func newIntervalWriter(ctx context.Context, ticker glock.Ticker, sink func(string)) *IntervalWriter {
l := &IntervalWriter{
func newIntervalProcessWriter(ctx context.Context, ticker glock.Ticker, sink func(string)) *IntervalProcessWriter {
l := &IntervalProcessWriter{
sink: sink,
ticker: ticker,

Expand All @@ -44,37 +45,42 @@ func newIntervalWriter(ctx context.Context, ticker glock.Ticker, sink func(strin
return l
}

// NewLogger returns a new Logger instance and spawns a goroutine in the
// background that regularily flushed the logged output to the given sink.
// NewIntervalProcessWriter returns a new IntervalProcessWriter instance and
// spawns a goroutine in the background that regularily flushed the logged
// output to the given sink.
//
// If the passed in ctx is canceled the goroutine will exit.
func NewIntervalWriter(ctx context.Context, interval time.Duration, sink func(string)) *IntervalWriter {
return newIntervalWriter(ctx, glock.NewRealTicker(interval), sink)
func NewIntervalProcessWriter(ctx context.Context, interval time.Duration, sink func(string)) *IntervalProcessWriter {
return newIntervalProcessWriter(ctx, glock.NewRealTicker(interval), sink)
}

func (l *IntervalWriter) flush() {
if l.buf.Len() == 0 {
return
}
l.sink(l.buf.String())
l.buf.Reset()
// StdoutWriter returns an io.Writer that prefixes every line with "stdout: "
func (l *IntervalProcessWriter) StdoutWriter() io.Writer {
return &prefixedWriter{writes: l.writes, writesDone: l.writesDone, prefix: "stdout: "}
}

// Close flushes the
func (l *IntervalWriter) Close() error {
// SterrWriter returns an io.Writer that prefixes every line with "stderr: "
func (l *IntervalProcessWriter) StderrWriter() io.Writer {
return &prefixedWriter{writes: l.writes, writesDone: l.writesDone, prefix: "stderr: "}
}

// Close blocks until all pending writes have been flushed to the buffer. It
// then causes the underlying goroutine to exit.
func (l *IntervalProcessWriter) Close() error {
l.closed <- struct{}{}
<-l.done
return nil
}

// Write handler of IntervalWriter.
func (l *IntervalWriter) Write(p []byte) (int, error) {
l.writes <- p
<-l.writesDone
return len(p), nil
func (l *IntervalProcessWriter) flush() {
if l.buf.Len() == 0 {
return
}
l.sink(l.buf.String())
l.buf.Reset()
}

func (l *IntervalWriter) writeLines(ctx context.Context) {
func (l *IntervalProcessWriter) writeLines(ctx context.Context) {
defer func() {
l.flush()
l.ticker.Stop()
Expand Down Expand Up @@ -103,3 +109,22 @@ func (l *IntervalWriter) writeLines(ctx context.Context) {
}
}
}

type prefixedWriter struct {
writes chan []byte
writesDone chan struct{}
prefix string
}

func (w *prefixedWriter) Write(p []byte) (int, error) {
var prefixedLines []byte
for _, line := range bytes.Split(p, []byte("\n")) {
prefixedLine := append([]byte(w.prefix), line...)
prefixedLine = append(prefixedLine, []byte("\n")...)

prefixedLines = append(prefixedLines, prefixedLine...)
}
w.writes <- prefixedLines
<-w.writesDone
return len(p), nil
}
33 changes: 23 additions & 10 deletions internal/batches/ui/interval_writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,12 @@ func TestIntervalWriter(t *testing.T) {
}

ticker := glock.NewMockTicker(1 * time.Second)
writer := newIntervalWriter(ctx, ticker, sink)
writer := newIntervalProcessWriter(ctx, ticker, sink)

writer.Write([]byte("1"))
stdoutWriter := writer.StdoutWriter()
stderrWriter := writer.StderrWriter()
stdoutWriter.Write([]byte("1"))
stderrWriter.Write([]byte("1"))
select {
case <-ch:
t.Fatalf("ch has data")
Expand All @@ -31,17 +34,22 @@ func TestIntervalWriter(t *testing.T) {

select {
case d := <-ch:
if d != "1" {
t.Fatalf("wrong data in sink")
want := "stdout: 1\nstderr: 1\n"
if d != want {
t.Fatalf("wrong data in sink. want=%q, have=%q", want, d)
}
case <-time.After(1 * time.Second):
t.Fatalf("ch has NO data")
}

writer.Write([]byte("2"))
writer.Write([]byte("3"))
writer.Write([]byte("4"))
writer.Write([]byte("5"))
stdoutWriter.Write([]byte("2"))
stderrWriter.Write([]byte("2"))
stdoutWriter.Write([]byte("3"))
stderrWriter.Write([]byte("3"))
stdoutWriter.Write([]byte("4"))
stderrWriter.Write([]byte("4"))
stdoutWriter.Write([]byte("5"))
stderrWriter.Write([]byte("5"))

select {
case <-ch:
Expand All @@ -54,8 +62,13 @@ func TestIntervalWriter(t *testing.T) {

select {
case d := <-ch:
if d != "2345" {
t.Fatalf("wrong data in sink")
want := "stdout: 2\nstderr: 2\n" +
"stdout: 3\nstderr: 3\n" +
"stdout: 4\nstderr: 4\n" +
"stdout: 5\nstderr: 5\n"

if d != want {
t.Fatalf("wrong data in sink. want")
}
case <-time.After(1 * time.Second):
t.Fatalf("ch has NO data")
Expand Down
28 changes: 5 additions & 23 deletions internal/batches/ui/json_lines.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"encoding/json"
"fmt"
"io"
"math/rand"
"os"
"strconv"
Expand Down Expand Up @@ -325,35 +324,18 @@ func (ui *stepsExecutionJSONLines) StepStarted(step int, runScript string) {
logOperationStart(batcheslib.LogEventOperationTaskStep, map[string]interface{}{"taskID": ui.linesTask.ID, "step": step, "runScript": runScript})
}

func (ui *stepsExecutionJSONLines) StepStdoutWriter(ctx context.Context, task *executor.Task, step int) io.WriteCloser {
func (ui *stepsExecutionJSONLines) StepOutputWriter(ctx context.Context, task *executor.Task, step int) executor.StepOutputWriter {
sink := func(data string) {
logOperationProgress(
batcheslib.LogEventOperationTaskStep,
map[string]interface{}{
"taskID": ui.linesTask.ID,
"step": step,
"out": data,
"output_type": "stdout",
"taskID": ui.linesTask.ID,
"step": step,
"out": data,
},
)
}
return NewIntervalWriter(ctx, stepFlushDuration, sink)
}

func (ui *stepsExecutionJSONLines) StepStderrWriter(ctx context.Context, task *executor.Task, step int) io.WriteCloser {
sink := func(data string) {
logOperationProgress(
batcheslib.LogEventOperationTaskStep,
map[string]interface{}{
"taskID": ui.linesTask.ID,
"step": step,
"out": data,
"output_type": "stderr",
},
)
}

return NewIntervalWriter(ctx, stepFlushDuration, sink)
return NewIntervalProcessWriter(ctx, stepFlushDuration, sink)
}

func (ui *stepsExecutionJSONLines) StepFinished(step int, diff []byte, changes *git.Changes, outputs map[string]interface{}) {
Expand Down
13 changes: 3 additions & 10 deletions internal/batches/ui/task_exec_tui.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package ui
import (
"context"
"fmt"
"io"
"sort"
"strings"
"sync"
Expand Down Expand Up @@ -458,12 +457,10 @@ func (ui stepsExecTUI) StepStarted(step int, runScript string) {
ui.updateStatusBar(runScript)
}

func (ui stepsExecTUI) StepStdoutWriter(ctx context.Context, task *executor.Task, step int) io.WriteCloser {
return discardCloser{io.Discard}
}
func (ui stepsExecTUI) StepStderrWriter(ctx context.Context, task *executor.Task, step int) io.WriteCloser {
return discardCloser{io.Discard}
func (ui stepsExecTUI) StepOutputWriter(ctx context.Context, task *executor.Task, step int) executor.StepOutputWriter {
return executor.NoopStepOutputWriter{}
}

func (ui stepsExecTUI) CalculatingDiffStarted() {
ui.updateStatusBar("Calculating diff")
}
Expand All @@ -473,7 +470,3 @@ func (ui stepsExecTUI) CalculatingDiffFinished() {
func (ui stepsExecTUI) StepFinished(idx int, diff []byte, changes *git.Changes, outputs map[string]interface{}) {
// noop right now
}

type discardCloser struct{ io.Writer }

func (discardCloser) Close() error { return nil }

0 comments on commit 157a86d

Please sign in to comment.