Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Print step output into a single buffer #619

Merged
merged 3 commits into from
Sep 17, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ All notable changes to `src-cli` are documented in this file.

### Changed

- For internal use only: when `src batch [preview|apply|exec]` are executed in `-text-only` mode, command output on stdout/stderr will be logged in the same message, with each line prefixed accordingly. [#619](https://github.com/sourcegraph/src-cli/pull/619)

### Fixed

### Removed
Expand Down
10 changes: 4 additions & 6 deletions internal/batches/executor/run_steps.go
Original file line number Diff line number Diff line change
Expand Up @@ -315,16 +315,14 @@ func executeSingleStep(

writerCtx, writerCancel := context.WithCancel(ctx)
defer writerCancel()
uiStdoutWriter := opts.ui.StepStdoutWriter(writerCtx, opts.task, i)
uiStderrWriter := opts.ui.StepStderrWriter(writerCtx, opts.task, i)
outputWriter := opts.ui.StepOutputWriter(writerCtx, opts.task, i)
defer func() {
uiStdoutWriter.Close()
uiStderrWriter.Close()
outputWriter.Close()
}()

var stdoutBuffer, stderrBuffer bytes.Buffer
stdout := io.MultiWriter(&stdoutBuffer, uiStdoutWriter, opts.logger.PrefixWriter("stdout"))
stderr := io.MultiWriter(&stderrBuffer, uiStderrWriter, opts.logger.PrefixWriter("stderr"))
stdout := io.MultiWriter(&stdoutBuffer, outputWriter.StdoutWriter(), opts.logger.PrefixWriter("stdout"))
stderr := io.MultiWriter(&stderrBuffer, outputWriter.StderrWriter(), opts.logger.PrefixWriter("stderr"))

// Setup readers that pipe the output into the given buffers
wg, err := process.PipeOutput(ctx, cmd, stdout, stderr)
Expand Down
24 changes: 13 additions & 11 deletions internal/batches/executor/ui.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,12 @@ type TaskExecutionUI interface {
StepsExecutionUI(*Task) StepsExecutionUI
}

type StepOutputWriter interface {
StdoutWriter() io.Writer
StderrWriter() io.Writer
Close() error
}

type StepsExecutionUI interface {
ArchiveDownloadStarted()
ArchiveDownloadFinished()
Expand All @@ -34,8 +40,7 @@ type StepsExecutionUI interface {
StepPreparing(int)
StepStarted(int, string)

StepStdoutWriter(context.Context, *Task, int) io.WriteCloser
StepStderrWriter(context.Context, *Task, int) io.WriteCloser
StepOutputWriter(context.Context, *Task, int) StepOutputWriter

CalculatingDiffStarted()
CalculatingDiffFinished()
Expand All @@ -54,19 +59,16 @@ func (noop NoopStepsExecUI) SkippingStepsUpto(startStep int) {}
func (noop NoopStepsExecUI) StepSkipped(step int) {}
func (noop NoopStepsExecUI) StepPreparing(step int) {}
func (noop NoopStepsExecUI) StepStarted(step int, runScript string) {}
func (noop NoopStepsExecUI) StepStdoutWriter(ctx context.Context, task *Task, step int) io.WriteCloser {
return discardCloser{io.Discard}
}
func (noop NoopStepsExecUI) StepStderrWriter(ctx context.Context, task *Task, step int) io.WriteCloser {
return discardCloser{io.Discard}
func (noop NoopStepsExecUI) StepOutputWriter(ctx context.Context, task *Task, step int) StepOutputWriter {
return NoopStepOutputWriter{}
}
func (noop NoopStepsExecUI) CalculatingDiffStarted() {}
func (noop NoopStepsExecUI) CalculatingDiffFinished() {}
func (noop NoopStepsExecUI) StepFinished(idx int, diff []byte, changes *git.Changes, outputs map[string]interface{}) {
}

type discardCloser struct {
io.Writer
}
type NoopStepOutputWriter struct{}

func (discardCloser) Close() error { return nil }
func (noop NoopStepOutputWriter) StdoutWriter() io.Writer { return io.Discard }
func (noop NoopStepOutputWriter) StderrWriter() io.Writer { return io.Discard }
func (noop NoopStepOutputWriter) Close() error { return nil }
71 changes: 48 additions & 23 deletions internal/batches/ui/interval_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,15 @@ package ui
import (
"bytes"
"context"
"io"
"time"

"github.com/derision-test/glock"
)

// IntervalWriter is a io.Writer that flushes to the given sink on the given
// interval.
type IntervalWriter struct {
// IntervalProcessWriter accepts stdout/stderr writes from processes, prefixed
// them accordingly, and flushes to the given sink on the given interval.
type IntervalProcessWriter struct {
sink func(string)

ticker glock.Ticker
Expand All @@ -25,8 +26,8 @@ type IntervalWriter struct {
done chan struct{}
}

func newIntervalWriter(ctx context.Context, ticker glock.Ticker, sink func(string)) *IntervalWriter {
l := &IntervalWriter{
func newIntervalProcessWriter(ctx context.Context, ticker glock.Ticker, sink func(string)) *IntervalProcessWriter {
l := &IntervalProcessWriter{
sink: sink,
ticker: ticker,

Expand All @@ -44,37 +45,42 @@ func newIntervalWriter(ctx context.Context, ticker glock.Ticker, sink func(strin
return l
}

// NewLogger returns a new Logger instance and spawns a goroutine in the
// background that regularily flushed the logged output to the given sink.
// NewIntervalProcessWriter returns a new IntervalProcessWriter instance and
// spawns a goroutine in the background that regularily flushed the logged
// output to the given sink.
//
// If the passed in ctx is canceled the goroutine will exit.
func NewIntervalWriter(ctx context.Context, interval time.Duration, sink func(string)) *IntervalWriter {
return newIntervalWriter(ctx, glock.NewRealTicker(interval), sink)
func NewIntervalProcessWriter(ctx context.Context, interval time.Duration, sink func(string)) *IntervalProcessWriter {
return newIntervalProcessWriter(ctx, glock.NewRealTicker(interval), sink)
}

func (l *IntervalWriter) flush() {
if l.buf.Len() == 0 {
return
}
l.sink(l.buf.String())
l.buf.Reset()
// StdoutWriter returns an io.Writer that prefixes every line with "stdout: "
func (l *IntervalProcessWriter) StdoutWriter() io.Writer {
return &prefixedWriter{writes: l.writes, writesDone: l.writesDone, prefix: "stdout: "}
}

// Close flushes the
func (l *IntervalWriter) Close() error {
// SterrWriter returns an io.Writer that prefixes every line with "stderr: "
func (l *IntervalProcessWriter) StderrWriter() io.Writer {
return &prefixedWriter{writes: l.writes, writesDone: l.writesDone, prefix: "stderr: "}
}

// Close blocks until all pending writes have been flushed to the buffer. It
// then causes the underlying goroutine to exit.
func (l *IntervalProcessWriter) Close() error {
l.closed <- struct{}{}
<-l.done
return nil
}

// Write handler of IntervalWriter.
func (l *IntervalWriter) Write(p []byte) (int, error) {
l.writes <- p
<-l.writesDone
return len(p), nil
func (l *IntervalProcessWriter) flush() {
if l.buf.Len() == 0 {
return
}
l.sink(l.buf.String())
l.buf.Reset()
}

func (l *IntervalWriter) writeLines(ctx context.Context) {
func (l *IntervalProcessWriter) writeLines(ctx context.Context) {
defer func() {
l.flush()
l.ticker.Stop()
Expand Down Expand Up @@ -103,3 +109,22 @@ func (l *IntervalWriter) writeLines(ctx context.Context) {
}
}
}

type prefixedWriter struct {
writes chan []byte
writesDone chan struct{}
prefix string
}

func (w *prefixedWriter) Write(p []byte) (int, error) {
var prefixedLines []byte
for _, line := range bytes.Split(p, []byte("\n")) {
prefixedLine := append([]byte(w.prefix), line...)
prefixedLine = append(prefixedLine, []byte("\n")...)

prefixedLines = append(prefixedLines, prefixedLine...)
}
w.writes <- prefixedLines
<-w.writesDone
return len(p), nil
}
33 changes: 23 additions & 10 deletions internal/batches/ui/interval_writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,12 @@ func TestIntervalWriter(t *testing.T) {
}

ticker := glock.NewMockTicker(1 * time.Second)
writer := newIntervalWriter(ctx, ticker, sink)
writer := newIntervalProcessWriter(ctx, ticker, sink)

writer.Write([]byte("1"))
stdoutWriter := writer.StdoutWriter()
stderrWriter := writer.StderrWriter()
stdoutWriter.Write([]byte("1"))
stderrWriter.Write([]byte("1"))
select {
case <-ch:
t.Fatalf("ch has data")
Expand All @@ -31,17 +34,22 @@ func TestIntervalWriter(t *testing.T) {

select {
case d := <-ch:
if d != "1" {
t.Fatalf("wrong data in sink")
want := "stdout: 1\nstderr: 1\n"
if d != want {
t.Fatalf("wrong data in sink. want=%q, have=%q", want, d)
}
case <-time.After(1 * time.Second):
t.Fatalf("ch has NO data")
}

writer.Write([]byte("2"))
writer.Write([]byte("3"))
writer.Write([]byte("4"))
writer.Write([]byte("5"))
stdoutWriter.Write([]byte("2"))
stderrWriter.Write([]byte("2"))
stdoutWriter.Write([]byte("3"))
stderrWriter.Write([]byte("3"))
stdoutWriter.Write([]byte("4"))
stderrWriter.Write([]byte("4"))
stdoutWriter.Write([]byte("5"))
stderrWriter.Write([]byte("5"))

select {
case <-ch:
Expand All @@ -54,8 +62,13 @@ func TestIntervalWriter(t *testing.T) {

select {
case d := <-ch:
if d != "2345" {
t.Fatalf("wrong data in sink")
want := "stdout: 2\nstderr: 2\n" +
"stdout: 3\nstderr: 3\n" +
"stdout: 4\nstderr: 4\n" +
"stdout: 5\nstderr: 5\n"

if d != want {
t.Fatalf("wrong data in sink. want")
}
case <-time.After(1 * time.Second):
t.Fatalf("ch has NO data")
Expand Down
28 changes: 5 additions & 23 deletions internal/batches/ui/json_lines.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"encoding/json"
"fmt"
"io"
"math/rand"
"os"
"strconv"
Expand Down Expand Up @@ -325,35 +324,18 @@ func (ui *stepsExecutionJSONLines) StepStarted(step int, runScript string) {
logOperationStart(batcheslib.LogEventOperationTaskStep, map[string]interface{}{"taskID": ui.linesTask.ID, "step": step, "runScript": runScript})
}

func (ui *stepsExecutionJSONLines) StepStdoutWriter(ctx context.Context, task *executor.Task, step int) io.WriteCloser {
func (ui *stepsExecutionJSONLines) StepOutputWriter(ctx context.Context, task *executor.Task, step int) executor.StepOutputWriter {
sink := func(data string) {
logOperationProgress(
batcheslib.LogEventOperationTaskStep,
map[string]interface{}{
"taskID": ui.linesTask.ID,
"step": step,
"out": data,
"output_type": "stdout",
"taskID": ui.linesTask.ID,
"step": step,
"out": data,
},
)
}
return NewIntervalWriter(ctx, stepFlushDuration, sink)
}

func (ui *stepsExecutionJSONLines) StepStderrWriter(ctx context.Context, task *executor.Task, step int) io.WriteCloser {
sink := func(data string) {
logOperationProgress(
batcheslib.LogEventOperationTaskStep,
map[string]interface{}{
"taskID": ui.linesTask.ID,
"step": step,
"out": data,
"output_type": "stderr",
},
)
}

return NewIntervalWriter(ctx, stepFlushDuration, sink)
return NewIntervalProcessWriter(ctx, stepFlushDuration, sink)
}

func (ui *stepsExecutionJSONLines) StepFinished(step int, diff []byte, changes *git.Changes, outputs map[string]interface{}) {
Expand Down
13 changes: 3 additions & 10 deletions internal/batches/ui/task_exec_tui.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package ui
import (
"context"
"fmt"
"io"
"sort"
"strings"
"sync"
Expand Down Expand Up @@ -458,12 +457,10 @@ func (ui stepsExecTUI) StepStarted(step int, runScript string) {
ui.updateStatusBar(runScript)
}

func (ui stepsExecTUI) StepStdoutWriter(ctx context.Context, task *executor.Task, step int) io.WriteCloser {
return discardCloser{io.Discard}
}
func (ui stepsExecTUI) StepStderrWriter(ctx context.Context, task *executor.Task, step int) io.WriteCloser {
return discardCloser{io.Discard}
func (ui stepsExecTUI) StepOutputWriter(ctx context.Context, task *executor.Task, step int) executor.StepOutputWriter {
return executor.NoopStepOutputWriter{}
}

func (ui stepsExecTUI) CalculatingDiffStarted() {
ui.updateStatusBar("Calculating diff")
}
Expand All @@ -473,7 +470,3 @@ func (ui stepsExecTUI) CalculatingDiffFinished() {
func (ui stepsExecTUI) StepFinished(idx int, diff []byte, changes *git.Changes, outputs map[string]interface{}) {
// noop right now
}

type discardCloser struct{ io.Writer }

func (discardCloser) Close() error { return nil }