diff --git a/internal/orchestrator/orchestrator.go b/internal/orchestrator/orchestrator.go
index cc3768f6..54a53485 100644
--- a/internal/orchestrator/orchestrator.go
+++ b/internal/orchestrator/orchestrator.go
@@ -282,6 +282,43 @@ func (o *Orchestrator) cancelHelper(op *v1.Operation, status v1.OperationStatus)
 func (o *Orchestrator) Run(ctx context.Context) {
 	zap.L().Info("starting orchestrator loop")
 
+	go func() {
+		// Watchdog: detects wall-clock jumps and reschedules all tasks when the
+		// clock has drifted from the ticker by more than the grace period.
+		interval := 10 * time.Second
+		grace := 5 * time.Second
+		ticker := time.NewTicker(interval)
+		defer ticker.Stop()
+		lastTickTime := time.Now()
+
+		for {
+			select {
+			case <-ctx.Done():
+				return
+			case <-ticker.C:
+				now := time.Now()
+				// UnixMilli reads the wall clock, so this measures how far the
+				// wall clock is from where the previous tick predicted it would be.
+				deltaMs := lastTickTime.Add(interval).UnixMilli() - now.UnixMilli()
+				// Advance the reference on every tick. Updating it only after a
+				// detected jump leaves it one interval stale after each quiet tick,
+				// so the following tick is misreported as a clock jump.
+				lastTickTime = now
+				if deltaMs < 0 {
+					deltaMs = -deltaMs
+				}
+				if deltaMs < grace.Milliseconds() {
+					continue
+				}
+				zap.S().Warnf("detected a clock jump, watchdog timer is off from realtime by %dms, rescheduling all tasks", deltaMs)
+
+				if err := o.ScheduleDefaultTasks(o.config); err != nil {
+					zap.S().Errorf("failed to schedule default tasks: %v", err)
+				}
+			}
+		}
+	}()
+
 	for {
 		if ctx.Err() != nil {
 			zap.L().Info("shutting down orchestrator loop, context cancelled.")