diff --git a/cmd/lotus-provider/deps/deps.go b/cmd/lotus-provider/deps/deps.go index 31e3058c1d9..cad7602bb1d 100644 --- a/cmd/lotus-provider/deps/deps.go +++ b/cmd/lotus-provider/deps/deps.go @@ -253,6 +253,7 @@ Get it with: jq .PrivateKey ~/.lotus-miner/keystore/MF2XI2BNNJ3XILLQOJUXMYLUMU`, } } } + return nil } diff --git a/itests/harmonytask_test.go b/itests/harmonytask_test.go index ab54cbef42f..463f131d8fc 100644 --- a/itests/harmonytask_test.go +++ b/itests/harmonytask_test.go @@ -37,13 +37,13 @@ func withDbSetup(t *testing.T, f func(*kit.TestMiner)) { f(miner) } -func (t *task1) Do(tID harmonytask.TaskID, stillOwned func() bool) (done bool, err error) { +func (t *task1) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done bool, err error) { if !stillOwned() { return false, errors.New("Why not still owned?") } t.myPersonalTableLock.Lock() defer t.myPersonalTableLock.Unlock() - t.WorkCompleted = append(t.WorkCompleted, fmt.Sprintf("taskResult%d", t.myPersonalTable[tID])) + t.WorkCompleted = append(t.WorkCompleted, fmt.Sprintf("taskResult%d", t.myPersonalTable[taskID])) return true, nil } func (t *task1) CanAccept(list []harmonytask.TaskID, e *harmonytask.TaskEngine) (*harmonytask.TaskID, error) { @@ -104,8 +104,8 @@ type passthru struct { adder func(add harmonytask.AddTaskFunc) } -func (t *passthru) Do(tID harmonytask.TaskID, stillOwned func() bool) (done bool, err error) { - return t.do(tID, stillOwned) +func (t *passthru) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done bool, err error) { + return t.do(taskID, stillOwned) } func (t *passthru) CanAccept(list []harmonytask.TaskID, e *harmonytask.TaskEngine) (*harmonytask.TaskID, error) { return t.canAccept(list, e) diff --git a/lib/harmony/harmonytask/task_type_handler.go b/lib/harmony/harmonytask/task_type_handler.go index 5702b31423b..2b252194942 100644 --- a/lib/harmony/harmonytask/task_type_handler.go +++ b/lib/harmony/harmonytask/task_type_handler.go @@ -95,16 +95,34 @@ top: return false } + releaseStorage := func() { + } + if h.TaskTypeDetails.Cost.Storage != nil { + if err = h.TaskTypeDetails.Cost.Storage.Claim(int(*tID)); err != nil { + log.Infow("did not accept task", "task_id", strconv.Itoa(int(*tID)), "reason", "storage claim failed", "name", h.Name, "error", err) + return false + } + releaseStorage = func() { + if err := h.TaskTypeDetails.Cost.Storage.MarkComplete(int(*tID)); err != nil { + log.Errorw("Could not release storage", "error", err) + } + } + } + // if recovering we don't need to try to claim anything because those tasks are already claimed by us if from != workSourceRecover { // 4. Can we claim the work for our hostname? ct, err := h.TaskEngine.db.Exec(h.TaskEngine.ctx, "UPDATE harmony_task SET owner_id=$1 WHERE id=$2 AND owner_id IS NULL", h.TaskEngine.ownerID, *tID) if err != nil { log.Error(err) + + releaseStorage() return false } if ct == 0 { log.Infow("did not accept task", "task_id", strconv.Itoa(int(*tID)), "reason", "already Taken", "name", h.Name) + releaseStorage() + var tryAgain = make([]TaskID, 0, len(ids)-1) for _, id := range ids { if id != *tID { @@ -134,6 +152,7 @@ top: } h.Count.Add(-1) + releaseStorage() h.recordCompletion(*tID, workStart, done, doErr) if done { for _, fs := range h.TaskEngine.follows[h.Name] { // Do we know of any follows for this task type? @@ -247,5 +266,11 @@ func (h *taskTypeHandler) AssertMachineHasCapacity() error { if r.Gpu-h.Cost.Gpu < 0 { return errors.New("Did not accept " + h.Name + " task: out of available GPU") } + + if h.TaskTypeDetails.Cost.Storage != nil { + if !h.TaskTypeDetails.Cost.Storage.HasCapacity() { + return errors.New("Did not accept " + h.Name + " task: out of available Storage") + } + } return nil } diff --git a/lib/harmony/resources/resources.go b/lib/harmony/resources/resources.go index 406e71da326..4f67a26aff7 100644 --- a/lib/harmony/resources/resources.go +++ b/lib/harmony/resources/resources.go @@ -24,6 +24,19 @@ type Resources struct { Gpu float64 Ram uint64 MachineID int + Storage +} + +// Optional Storage management. +type Storage interface { + HasCapacity() bool + + // This allows some other system to claim space for this task. + Claim(taskID int) error + + // This allows some other system to consider the task done. + // It's up to the caller to remove the data, if that applies. + MarkComplete(taskID int) error } type Reg struct { Resources