diff --git a/engine/api/event/event.go b/engine/api/event/event.go index 5a5fb23979..97c7dec9b6 100644 --- a/engine/api/event/event.go +++ b/engine/api/event/event.go @@ -159,17 +159,18 @@ func Subscribe(ch chan<- sdk.Event) { // DequeueEvent runs in a goroutine and dequeue event from cache func DequeueEvent(ctx context.Context, db *gorp.DbMap) { for { + if err := ctx.Err(); err != nil { + ctx := sdk.ContextWithStacktrace(ctx, err) + log.Error(ctx, "Exiting event.DequeueEvent : %v", err) + return + } + e := sdk.Event{} if err := store.DequeueWithContext(ctx, "events", 250*time.Millisecond, &e); err != nil { ctx := sdk.ContextWithStacktrace(ctx, err) log.Error(ctx, "Event.DequeueEvent> store.DequeueWithContext err: %v", err) continue } - if err := ctx.Err(); err != nil { - ctx := sdk.ContextWithStacktrace(ctx, err) - log.Error(ctx, "Exiting event.DequeueEvent : %v", err) - return - } // Filter "EventJobSummary" for globalKafka Broker if e.EventType != "sdk.EventJobSummary" { diff --git a/engine/api/event_v2/event.go b/engine/api/event_v2/event.go index dbf3d5c3fd..e52447e5e4 100644 --- a/engine/api/event_v2/event.go +++ b/engine/api/event_v2/event.go @@ -27,12 +27,17 @@ func publish(ctx context.Context, store cache.Store, event sdk.EventV2) { log.Error(ctx, "EventV2.publish: %s", err) return } - return } // Dequeue runs in a goroutine and dequeue event from cache func Dequeue(ctx context.Context, db *gorp.DbMap, store cache.Store, goroutines *sdk.GoRoutines) { for { + if err := ctx.Err(); err != nil { + ctx := sdk.ContextWithStacktrace(ctx, err) + log.Error(ctx, "EventV2.DequeueEvent> Exiting: %v", err) + return + } + e := sdk.EventV2{} if err := store.DequeueWithContext(ctx, eventQueue, 50*time.Millisecond, &e); err != nil { ctx := sdk.ContextWithStacktrace(ctx, err) @@ -75,12 +80,6 @@ func Dequeue(ctx context.Context, db *gorp.DbMap, store cache.Store, goroutines }) wg.Wait() - - if err := ctx.Err(); err != nil { - ctx := sdk.ContextWithStacktrace(ctx, err) - log.Error(ctx, "EventV2.DequeueEvent> Exiting : %v", err) - continue - } } } diff --git a/engine/api/repositoriesmanager/events.go b/engine/api/repositoriesmanager/events.go index efc1665e5c..1253b0a06b 100644 --- a/engine/api/repositoriesmanager/events.go +++ b/engine/api/repositoriesmanager/events.go @@ -13,18 +13,19 @@ import ( "github.com/ovh/cds/sdk" ) -//ReceiveEvents has to be launched as a goroutine. +// ReceiveEvents has to be launched as a goroutine. func ReceiveEvents(ctx context.Context, DBFunc func() *gorp.DbMap, store cache.Store) { for { + if err := ctx.Err(); err != nil { + log.Error(ctx, "repositoriesmanager.ReceiveEvents> exiting: %v", err) + return + } + e := sdk.Event{} if err := store.DequeueWithContext(ctx, "events_repositoriesmanager", 250*time.Millisecond, &e); err != nil { log.Error(ctx, "repositoriesmanager.ReceiveEvents > store.DequeueWithContext err: %v", err) continue } - if err := ctx.Err(); err != nil { - log.Error(ctx, "Exiting repositoriesmanager.ReceiveEvents: %v", err) - return - } db := DBFunc() if db != nil { @@ -50,7 +51,7 @@ func ReceiveEvents(ctx context.Context, DBFunc func() *gorp.DbMap, store cache.S } } -//RetryEvent retries the events +// RetryEvent retries the events func RetryEvent(e *sdk.Event, err error, store cache.Store) error { e.Attempts++ if e.Attempts > 2 { diff --git a/engine/api/v2_workflow_run_engine.go b/engine/api/v2_workflow_run_engine.go index b03b09e230..3f5fccbf56 100644 --- a/engine/api/v2_workflow_run_engine.go +++ b/engine/api/v2_workflow_run_engine.go @@ -70,20 +70,20 @@ func (api *API) V2WorkflowRunEngineChan(ctx context.Context) { func (api *API) V2WorkflowRunEngineDequeue(ctx context.Context) { for { + if err := ctx.Err(); err != nil { + ctx := sdk.ContextWithStacktrace(ctx, err) + log.Error(ctx, "V2WorkflowRunEngine> Exiting: %v", err) + return + } + var wrEnqueue sdk.V2WorkflowRunEnqueue if err := api.Cache.DequeueWithContext(ctx, workflow_v2.WorkflowEngineKey, 250*time.Millisecond, &wrEnqueue); err != nil { - log.Error(ctx, "V2WorkflowRunEngine > DequeueWithContext err: %v", err) + log.Error(ctx, "V2WorkflowRunEngine> DequeueWithContext err: %v", err) continue } if err := api.workflowRunV2Trigger(ctx, wrEnqueue); err != nil { log.ErrorWithStackTrace(ctx, err) } - if ctx.Err() != nil { - if ctx.Err() != nil { - log.Error(ctx, "%v", ctx.Err()) - } - return - } } } @@ -115,10 +115,10 @@ func (api *API) workflowRunV2Trigger(ctx context.Context, wrEnqueue sdk.V2Workfl // Load run by id run, err := workflow_v2.LoadRunByID(ctx, api.mustDB(), wrEnqueue.RunID, workflow_v2.WithRunResults) - if sdk.ErrorIs(err, sdk.ErrNotFound) { - return nil - } if err != nil { + if sdk.ErrorIs(err, sdk.ErrNotFound) { + return nil + } return sdk.WrapError(err, "unable to load workflow run %s", wrEnqueue.RunID) } diff --git a/engine/cache/redis.go b/engine/cache/redis.go index efdaf69b01..23cf36870c 100644 --- a/engine/cache/redis.go +++ b/engine/cache/redis.go @@ -276,7 +276,7 @@ func (s *RedisStore) DequeueWithContext(c context.Context, queueName string, wai break } case <-c.Done(): - return nil + return c.Err() } } if elem != "" { diff --git a/engine/hatchery/serve.go b/engine/hatchery/serve.go index c36032d376..d19ce0ae5d 100644 --- a/engine/hatchery/serve.go +++ b/engine/hatchery/serve.go @@ -223,6 +223,9 @@ func (c *Common) Init(ctx context.Context, h hatchery.Interface) error { var cfg sdk.CDNConfig var err error for { + if err := ctx.Err(); err != nil { + return err + } cfg, err = c.Client.ConfigCDN() if err == nil { break diff --git a/engine/repositories/processor.go b/engine/repositories/processor.go index f07e6e9ddb..c9fe988a32 100644 --- a/engine/repositories/processor.go +++ b/engine/repositories/processor.go @@ -13,6 +13,9 @@ import ( func (s *Service) processor(ctx context.Context) error { for { + if ctx.Err() != nil { + return ctx.Err() + } var uuid string if err := s.dao.store.DequeueWithContext(ctx, processorKey, 250*time.Millisecond, &uuid); err != nil { log.Error(ctx, "repositories > processor > store.DequeueWithContext err: %v", err) @@ -36,9 +39,6 @@ func (s *Service) processor(ctx context.Context) error { } } } - if ctx.Err() != nil { - return ctx.Err() - } } }