Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Wait for tracer to be done before finishing workflow #4068

Merged
merged 1 commit into from
Aug 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 9 additions & 9 deletions agent/rpc/client_grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,13 +72,13 @@
}

// Next returns the next workflow in the queue.
func (c *client) Next(ctx context.Context, f rpc.Filter) (*rpc.Workflow, error) {
func (c *client) Next(ctx context.Context, filter rpc.Filter) (*rpc.Workflow, error) {

Check warning on line 75 in agent/rpc/client_grpc.go

View check run for this annotation

Codecov / codecov/patch

agent/rpc/client_grpc.go#L75

Added line #L75 was not covered by tests
var res *proto.NextResponse
var err error
retry := c.newBackOff()
req := new(proto.NextRequest)
req.Filter = new(proto.Filter)
req.Filter.Labels = f.Labels
req.Filter.Labels = filter.Labels

Check warning on line 81 in agent/rpc/client_grpc.go

View check run for this annotation

Codecov / codecov/patch

agent/rpc/client_grpc.go#L81

Added line #L81 was not covered by tests
for {
res, err = c.client.Next(ctx, req)
if err == nil {
Expand Down Expand Up @@ -135,10 +135,10 @@
}

// Wait blocks until the workflow is complete.
func (c *client) Wait(ctx context.Context, id string) (err error) {
func (c *client) Wait(ctx context.Context, workflowID string) (err error) {

Check warning on line 138 in agent/rpc/client_grpc.go

View check run for this annotation

Codecov / codecov/patch

agent/rpc/client_grpc.go#L138

Added line #L138 was not covered by tests
retry := c.newBackOff()
req := new(proto.WaitRequest)
req.Id = id
req.Id = workflowID

Check warning on line 141 in agent/rpc/client_grpc.go

View check run for this annotation

Codecov / codecov/patch

agent/rpc/client_grpc.go#L141

Added line #L141 was not covered by tests
for {
_, err = c.client.Wait(ctx, req)
if err == nil {
Expand Down Expand Up @@ -273,10 +273,10 @@
}

// Extend extends the workflow deadline.
func (c *client) Extend(ctx context.Context, id string) (err error) {
func (c *client) Extend(ctx context.Context, workflowID string) (err error) {

Check warning on line 276 in agent/rpc/client_grpc.go

View check run for this annotation

Codecov / codecov/patch

agent/rpc/client_grpc.go#L276

Added line #L276 was not covered by tests
retry := c.newBackOff()
req := new(proto.ExtendRequest)
req.Id = id
req.Id = workflowID

Check warning on line 279 in agent/rpc/client_grpc.go

View check run for this annotation

Codecov / codecov/patch

agent/rpc/client_grpc.go#L279

Added line #L279 was not covered by tests
for {
_, err = c.client.Extend(ctx, req)
if err == nil {
Expand Down Expand Up @@ -317,10 +317,10 @@
}

// Update updates the workflow state.
func (c *client) Update(ctx context.Context, id string, state rpc.StepState) (err error) {
func (c *client) Update(ctx context.Context, workflowID string, state rpc.StepState) (err error) {

Check warning on line 320 in agent/rpc/client_grpc.go

View check run for this annotation

Codecov / codecov/patch

agent/rpc/client_grpc.go#L320

Added line #L320 was not covered by tests
retry := c.newBackOff()
req := new(proto.UpdateRequest)
req.Id = id
req.Id = workflowID

Check warning on line 323 in agent/rpc/client_grpc.go

View check run for this annotation

Codecov / codecov/patch

agent/rpc/client_grpc.go#L323

Added line #L323 was not covered by tests
req.State = new(proto.StepState)
req.State.StepUuid = state.StepUUID
req.State.Started = state.Started
Expand Down Expand Up @@ -367,7 +367,7 @@
return nil
}

// Log writes the workflow log entry.
// Log writes the step log entry.
func (c *client) Log(ctx context.Context, logEntry *rpc.LogEntry) (err error) {
retry := c.newBackOff()
req := new(proto.LogRequest)
Expand Down
10 changes: 4 additions & 6 deletions agent/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,6 @@
if err := r.client.Wait(workflowCtx, workflow.ID); err != nil {
canceled = true
logger.Warn().Err(err).Msg("cancel signal received")

cancel()
} else {
logger.Debug().Msg("done listening for cancel signal")
Expand All @@ -117,11 +116,10 @@
select {
case <-workflowCtx.Done():
logger.Debug().Msg("pipeline done")

return

case <-time.After(time.Minute):
logger.Debug().Msg("pipeline lease renewed")

if err := r.client.Extend(workflowCtx, workflow.ID); err != nil {
log.Error().Err(err).Msg("extending pipeline deadline failed")
}
Expand All @@ -144,7 +142,7 @@
pipeline.WithContext(workflowCtx),
pipeline.WithTaskUUID(fmt.Sprint(workflow.ID)),
pipeline.WithLogger(r.createLogger(logger, &uploads, workflow)),
pipeline.WithTracer(r.createTracer(ctxMeta, logger, workflow)),
pipeline.WithTracer(r.createTracer(ctxMeta, &uploads, logger, workflow)),

Check warning on line 145 in agent/runner.go

View check run for this annotation

Codecov / codecov/patch

agent/runner.go#L145

Added line #L145 was not covered by tests
pipeline.WithBackend(*r.backend),
pipeline.WithDescription(map[string]string{
"workflow_id": workflow.ID,
Expand All @@ -170,9 +168,9 @@
Bool("canceled", canceled).
Msg("workflow finished")

logger.Debug().Msg("uploading logs ...")
logger.Debug().Msg("uploading logs and traces / states ...")

Check warning on line 171 in agent/runner.go

View check run for this annotation

Codecov / codecov/patch

agent/runner.go#L171

Added line #L171 was not covered by tests
uploads.Wait()
logger.Debug().Msg("uploaded logs")
logger.Debug().Msg("uploaded logs and traces / states")

Check warning on line 173 in agent/runner.go

View check run for this annotation

Codecov / codecov/patch

agent/runner.go#L173

Added line #L173 was not covered by tests

logger.Debug().
Str("error", state.Error).
Expand Down
8 changes: 6 additions & 2 deletions agent/tracer.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
"context"
"runtime"
"strconv"
"sync"
"time"

"github.com/rs/zerolog"
Expand All @@ -26,11 +27,13 @@
"go.woodpecker-ci.org/woodpecker/v2/pipeline/rpc"
)

func (r *Runner) createTracer(ctxMeta context.Context, logger zerolog.Logger, workflow *rpc.Workflow) pipeline.TraceFunc {
func (r *Runner) createTracer(ctxMeta context.Context, uploads *sync.WaitGroup, logger zerolog.Logger, workflow *rpc.Workflow) pipeline.TraceFunc {

Check warning on line 30 in agent/tracer.go

View check run for this annotation

Codecov / codecov/patch

agent/tracer.go#L30

Added line #L30 was not covered by tests
return func(state *pipeline.State) error {
uploads.Add(1)

Check warning on line 33 in agent/tracer.go

View check run for this annotation

Codecov / codecov/patch

agent/tracer.go#L32-L33

Added lines #L32 - L33 were not covered by tests
stepLogger := logger.With().
Str("image", state.Pipeline.Step.Image).
Str("workflowID", workflow.ID).
Str("workflow_id", workflow.ID).

Check warning on line 36 in agent/tracer.go

View check run for this annotation

Codecov / codecov/patch

agent/tracer.go#L36

Added line #L36 was not covered by tests
Err(state.Process.Error).
Int("exit_code", state.Process.ExitCode).
Bool("exited", state.Process.Exited).
Expand All @@ -57,6 +60,7 @@
}

stepLogger.Debug().Msg("update step status complete")
uploads.Done()

Check warning on line 63 in agent/tracer.go

View check run for this annotation

Codecov / codecov/patch

agent/tracer.go#L63

Added line #L63 was not covered by tests
}()
if state.Process.Exited {
return nil
Expand Down
4 changes: 2 additions & 2 deletions pipeline/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,9 +258,9 @@
return nil, nil
}

// Some pipeline backends, such as local, will close the pipe from Tail on Wait,
// so first make sure all reading has finished.
// We wait until all data was logged. (Needed for some backends like local as WaitStep kills the log stream)
wg.Wait()

Check warning on line 263 in pipeline/pipeline.go

View check run for this annotation

Codecov / codecov/patch

pipeline/pipeline.go#L263

Added line #L263 was not covered by tests
waitState, err := r.engine.WaitStep(r.ctx, step, r.taskUUID)
if err != nil {
if errors.Is(err, context.Canceled) {
Expand Down