Skip to content

Commit

Permalink
Moves the telemetry DetachedContext to util (#2060)
Browse files Browse the repository at this point in the history
The DetachedContext is currently defined in telemetry, but has wider
potential use in long-running goroutines that we don't want to fail when
their parent is cancelled. For this reason this PR moves it into the
util pkg so that it can be used for the notify* functions of the
scheduler.
  • Loading branch information
rossjones authored Feb 27, 2023
1 parent ebcd259 commit e82bd9e
Show file tree
Hide file tree
Showing 4 changed files with 10 additions and 9 deletions.
8 changes: 4 additions & 4 deletions pkg/executor/docker/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (
"github.com/bacalhau-project/bacalhau/pkg/storage"
"github.com/bacalhau-project/bacalhau/pkg/storage/util"
"github.com/bacalhau-project/bacalhau/pkg/system"
"github.com/bacalhau-project/bacalhau/pkg/telemetry"
pkgUtil "github.com/bacalhau-project/bacalhau/pkg/util"
dockertypes "github.com/docker/docker/api/types"
"github.com/docker/docker/api/types/container"
"github.com/docker/docker/api/types/mount"
Expand Down Expand Up @@ -288,7 +288,7 @@ func (e *Executor) RunShard(
}

// Can't use the original context as it may have already been timed out
detachedContext, cancel := context.WithTimeout(telemetry.NewDetachedContext(ctx), 3*time.Second)
detachedContext, cancel := context.WithTimeout(pkgUtil.NewDetachedContext(ctx), 3*time.Second)
defer cancel()
stdoutPipe, stderrPipe, logsErr := e.client.FollowLogs(detachedContext, jobContainer.ID)
log.Ctx(detachedContext).Debug().Err(logsErr).Msg("Captured stdout/stderr for container")
Expand All @@ -304,7 +304,7 @@ func (e *Executor) RunShard(

func (e *Executor) cleanupJob(ctx context.Context, shard model.JobShard) {
// Use a detached context in case the current one has already been canceled
separateCtx, cancel := context.WithTimeout(telemetry.NewDetachedContext(ctx), 1*time.Minute)
separateCtx, cancel := context.WithTimeout(pkgUtil.NewDetachedContext(ctx), 1*time.Minute)
defer cancel()
if config.ShouldKeepStack() || !e.client.IsInstalled(separateCtx) {
return
Expand All @@ -318,7 +318,7 @@ func (e *Executor) cleanupJob(ctx context.Context, shard model.JobShard) {
func (e *Executor) cleanupAll(ctx context.Context) error {
// We have to use a detached context, rather than the one passed in to `NewExecutor`, as it may have already been
// canceled and so would prevent us from performing any cleanup work.
safeCtx := telemetry.NewDetachedContext(ctx)
safeCtx := pkgUtil.NewDetachedContext(ctx)
if config.ShouldKeepStack() || !e.client.IsInstalled(safeCtx) {
return nil
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/system/cleanup.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
realsync "sync"
"time"

"github.com/bacalhau-project/bacalhau/pkg/telemetry"
"github.com/bacalhau-project/bacalhau/pkg/util"
sync "github.com/bacalhau-project/golang-mutex-tracer"
"github.com/rs/zerolog/log"
)
Expand Down Expand Up @@ -66,7 +66,7 @@ func (cm *CleanupManager) Cleanup(ctx context.Context) {
var wg realsync.WaitGroup
wg.Add(len(cm.fns))

detachedContext := telemetry.NewDetachedContext(ctx)
detachedContext := util.NewDetachedContext(ctx)

for i := 0; i < len(cm.fns); i++ {
go func(fn any) {
Expand Down
2 changes: 1 addition & 1 deletion pkg/telemetry/context.go → pkg/util/context.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package telemetry
package util

import (
"context"
Expand Down
5 changes: 3 additions & 2 deletions pkg/telemetry/context_test.go → pkg/util/context_test.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
package telemetry
package util

import (
"context"
"github.com/stretchr/testify/assert"
"testing"

"github.com/stretchr/testify/assert"
)

func TestDetachedContext_Value_valuesPassedThrough(t *testing.T) {
Expand Down

0 comments on commit e82bd9e

Please sign in to comment.