diff --git a/lib/harmony/harmonytask/harmonytask.go b/lib/harmony/harmonytask/harmonytask.go index 31b632975ce..7577c5cf564 100644 --- a/lib/harmony/harmonytask/harmonytask.go +++ b/lib/harmony/harmonytask/harmonytask.go @@ -176,7 +176,7 @@ func New( continue // not really fatal, but not great } } - if !h.considerWork("recovered", []TaskID{TaskID(w.ID)}) { + if !h.considerWork(workSourceRecover, []TaskID{TaskID(w.ID)}) { log.Error("Strange: Unable to accept previously owned task: ", w.ID, w.Name) } } @@ -285,7 +285,7 @@ func (e *TaskEngine) pollerTryAllWork() { continue } if len(unownedTasks) > 0 { - accepted := v.considerWork("poller", unownedTasks) + accepted := v.considerWork(workSourcePoller, unownedTasks) if accepted { return // accept new work slowly and in priority order } diff --git a/lib/harmony/harmonytask/task_type_handler.go b/lib/harmony/harmonytask/task_type_handler.go index 79a156fef12..34f7a5c3e31 100644 --- a/lib/harmony/harmonytask/task_type_handler.go +++ b/lib/harmony/harmonytask/task_type_handler.go @@ -49,6 +49,11 @@ func (h *taskTypeHandler) AddTask(extra func(TaskID, *harmonydb.Tx) (bool, error } } +const ( + workSourcePoller = "poller" + workSourceRecover = "recovered" +) + // considerWork is called to attempt to start work on a task-id of this task type. // It presumes single-threaded calling, so there should not be a multi-threaded re-entry. // The only caller should be the one work poller thread. This does spin off other threads, @@ -87,22 +92,25 @@ top: return false } - // 4. Can we claim the work for our hostname? - ct, err := h.TaskEngine.db.Exec(h.TaskEngine.ctx, "UPDATE harmony_task SET owner_id=$1 WHERE id=$2 AND owner_id IS NULL", h.TaskEngine.ownerID, *tID) - if err != nil { - log.Error(err) - return false - } - if ct == 0 { - log.Infow("did not accept task", "task_id", strconv.Itoa(int(*tID)), "reason", "already Taken", "name", h.Name) - var tryAgain = make([]TaskID, 0, len(ids)-1) - for _, id := range ids { - if id != *tID { - tryAgain = append(tryAgain, id) + // if recovering we don't need to try to claim anything because those tasks are already claimed by us + if from != workSourceRecover { + // 4. Can we claim the work for our hostname? + ct, err := h.TaskEngine.db.Exec(h.TaskEngine.ctx, "UPDATE harmony_task SET owner_id=$1 WHERE id=$2 AND owner_id IS NULL", h.TaskEngine.ownerID, *tID) + if err != nil { + log.Error(err) + return false + } + if ct == 0 { + log.Infow("did not accept task", "task_id", strconv.Itoa(int(*tID)), "reason", "already Taken", "name", h.Name) + var tryAgain = make([]TaskID, 0, len(ids)-1) + for _, id := range ids { + if id != *tID { + tryAgain = append(tryAgain, id) + } } + ids = tryAgain + goto top } - ids = tryAgain - goto top } h.Count.Add(1)