diff --git a/api/api_storage.go b/api/api_storage.go index 0411af537df..5785ede1fdd 100644 --- a/api/api_storage.go +++ b/api/api_storage.go @@ -54,6 +54,8 @@ type StorageMiner interface { ComputeWindowPoSt(ctx context.Context, dlIdx uint64, tsk types.TipSetKey) ([]miner.SubmitWindowedPoStParams, error) //perm:admin + ComputeDataCid(ctx context.Context, pieceSize abi.UnpaddedPieceSize, pieceData storage.Data) (abi.PieceInfo, error) //perm:admin + // Temp api for testing PledgeSector(context.Context) (abi.SectorID, error) //perm:write @@ -124,6 +126,7 @@ type StorageMiner interface { WorkerJobs(context.Context) (map[uuid.UUID][]storiface.WorkerJob, error) //perm:admin //storiface.WorkerReturn + ReturnDataCid(ctx context.Context, callID storiface.CallID, pi abi.PieceInfo, err *storiface.CallError) error //perm:admin retry:true ReturnAddPiece(ctx context.Context, callID storiface.CallID, pi abi.PieceInfo, err *storiface.CallError) error //perm:admin retry:true ReturnSealPreCommit1(ctx context.Context, callID storiface.CallID, p1o storage.PreCommit1Out, err *storiface.CallError) error //perm:admin retry:true ReturnSealPreCommit2(ctx context.Context, callID storiface.CallID, sealed storage.SectorCids, err *storiface.CallError) error //perm:admin retry:true diff --git a/api/api_worker.go b/api/api_worker.go index 0c4fb3d14b1..cd4cde15136 100644 --- a/api/api_worker.go +++ b/api/api_worker.go @@ -34,6 +34,7 @@ type Worker interface { Info(context.Context) (storiface.WorkerInfo, error) //perm:admin // storiface.WorkerCalls + DataCid(ctx context.Context, pieceSize abi.UnpaddedPieceSize, pieceData storage.Data) (storiface.CallID, error) //perm:admin AddPiece(ctx context.Context, sector storage.SectorRef, pieceSizes []abi.UnpaddedPieceSize, newPieceSize abi.UnpaddedPieceSize, pieceData storage.Data) (storiface.CallID, error) //perm:admin SealPreCommit1(ctx context.Context, sector storage.SectorRef, ticket abi.SealRandomness, pieces []abi.PieceInfo) (storiface.CallID, error) //perm:admin SealPreCommit2(ctx context.Context, sector storage.SectorRef, pc1o storage.PreCommit1Out) (storiface.CallID, error) //perm:admin diff --git a/api/proxy_gen.go b/api/proxy_gen.go index b5bc36ade2d..300e6755528 100644 --- a/api/proxy_gen.go +++ b/api/proxy_gen.go @@ -641,6 +641,8 @@ type StorageMinerStruct struct { CheckProvable func(p0 context.Context, p1 abi.RegisteredPoStProof, p2 []storage.SectorRef, p3 bool) (map[abi.SectorNumber]string, error) `perm:"admin"` + ComputeDataCid func(p0 context.Context, p1 abi.UnpaddedPieceSize, p2 storage.Data) (abi.PieceInfo, error) `perm:"admin"` + ComputeProof func(p0 context.Context, p1 []builtin.ExtendedSectorInfo, p2 abi.PoStRandomness, p3 abi.ChainEpoch, p4 abinetwork.Version) ([]builtin.PoStProof, error) `perm:"read"` ComputeWindowPoSt func(p0 context.Context, p1 uint64, p2 types.TipSetKey) ([]miner.SubmitWindowedPoStParams, error) `perm:"admin"` @@ -743,6 +745,8 @@ type StorageMinerStruct struct { ReturnAddPiece func(p0 context.Context, p1 storiface.CallID, p2 abi.PieceInfo, p3 *storiface.CallError) error `perm:"admin"` + ReturnDataCid func(p0 context.Context, p1 storiface.CallID, p2 abi.PieceInfo, p3 *storiface.CallError) error `perm:"admin"` + ReturnFetch func(p0 context.Context, p1 storiface.CallID, p2 *storiface.CallError) error `perm:"admin"` ReturnFinalizeReplicaUpdate func(p0 context.Context, p1 storiface.CallID, p2 *storiface.CallError) error `perm:"admin"` @@ -894,6 +898,8 @@ type WorkerStruct struct { Internal struct { AddPiece func(p0 context.Context, p1 storage.SectorRef, p2 []abi.UnpaddedPieceSize, p3 abi.UnpaddedPieceSize, p4 storage.Data) (storiface.CallID, error) `perm:"admin"` + DataCid func(p0 context.Context, p1 abi.UnpaddedPieceSize, p2 storage.Data) (storiface.CallID, error) `perm:"admin"` + Enabled func(p0 context.Context) (bool, error) `perm:"admin"` Fetch func(p0 context.Context, p1 storage.SectorRef, p2 storiface.SectorFileType, p3 storiface.PathType, p4 storiface.AcquireMode) (storiface.CallID, error) `perm:"admin"` @@ -3861,6 +3867,17 @@ func (s *StorageMinerStub) CheckProvable(p0 context.Context, p1 abi.RegisteredPo return *new(map[abi.SectorNumber]string), ErrNotSupported } +func (s *StorageMinerStruct) ComputeDataCid(p0 context.Context, p1 abi.UnpaddedPieceSize, p2 storage.Data) (abi.PieceInfo, error) { + if s.Internal.ComputeDataCid == nil { + return *new(abi.PieceInfo), ErrNotSupported + } + return s.Internal.ComputeDataCid(p0, p1, p2) +} + +func (s *StorageMinerStub) ComputeDataCid(p0 context.Context, p1 abi.UnpaddedPieceSize, p2 storage.Data) (abi.PieceInfo, error) { + return *new(abi.PieceInfo), ErrNotSupported +} + func (s *StorageMinerStruct) ComputeProof(p0 context.Context, p1 []builtin.ExtendedSectorInfo, p2 abi.PoStRandomness, p3 abi.ChainEpoch, p4 abinetwork.Version) ([]builtin.PoStProof, error) { if s.Internal.ComputeProof == nil { return *new([]builtin.PoStProof), ErrNotSupported @@ -4422,6 +4439,17 @@ func (s *StorageMinerStub) ReturnAddPiece(p0 context.Context, p1 storiface.CallI return ErrNotSupported } +func (s *StorageMinerStruct) ReturnDataCid(p0 context.Context, p1 storiface.CallID, p2 abi.PieceInfo, p3 *storiface.CallError) error { + if s.Internal.ReturnDataCid == nil { + return ErrNotSupported + } + return s.Internal.ReturnDataCid(p0, p1, p2, p3) +} + +func (s *StorageMinerStub) ReturnDataCid(p0 context.Context, p1 storiface.CallID, p2 abi.PieceInfo, p3 *storiface.CallError) error { + return ErrNotSupported +} + func (s *StorageMinerStruct) ReturnFetch(p0 context.Context, p1 storiface.CallID, p2 *storiface.CallError) error { if s.Internal.ReturnFetch == nil { return ErrNotSupported @@ -5159,6 +5187,17 @@ func (s *WorkerStub) AddPiece(p0 context.Context, p1 storage.SectorRef, p2 []abi return *new(storiface.CallID), ErrNotSupported } +func (s *WorkerStruct) DataCid(p0 context.Context, p1 abi.UnpaddedPieceSize, p2 storage.Data) (storiface.CallID, error) { + if s.Internal.DataCid == nil { + return *new(storiface.CallID), ErrNotSupported + } + return s.Internal.DataCid(p0, p1, p2) +} + +func (s *WorkerStub) DataCid(p0 context.Context, p1 abi.UnpaddedPieceSize, p2 storage.Data) (storiface.CallID, error) { + return *new(storiface.CallID), ErrNotSupported +} + func (s *WorkerStruct) Enabled(p0 context.Context) (bool, error) { if s.Internal.Enabled == nil { return false, ErrNotSupported diff --git a/build/builtin-actors/builtin-actors-v7.car b/build/builtin-actors/builtin-actors-v7.car new file mode 100644 index 00000000000..e69de29bb2d diff --git a/build/builtin-actors/builtin-actors-v8.car b/build/builtin-actors/builtin-actors-v8.car new file mode 100644 index 00000000000..1193b93b358 Binary files /dev/null and b/build/builtin-actors/builtin-actors-v8.car differ diff --git a/build/openrpc/full.json.gz b/build/openrpc/full.json.gz index 1eaaf6a849b..c27a0fc28a1 100644 Binary files a/build/openrpc/full.json.gz and b/build/openrpc/full.json.gz differ diff --git a/build/openrpc/miner.json.gz b/build/openrpc/miner.json.gz index 7b1191138e7..952c771ca2d 100644 Binary files a/build/openrpc/miner.json.gz and b/build/openrpc/miner.json.gz differ diff --git a/build/openrpc/worker.json.gz b/build/openrpc/worker.json.gz index 554082fd6e4..f1372a6602a 100644 Binary files a/build/openrpc/worker.json.gz and b/build/openrpc/worker.json.gz differ diff --git a/cmd/lotus-worker/main.go b/cmd/lotus-worker/main.go index 12c5f8dc880..c341f52c26f 100644 --- a/cmd/lotus-worker/main.go +++ b/cmd/lotus-worker/main.go @@ -313,7 +313,7 @@ var runCmd = &cli.Command{ } if (workerType == sealtasks.WorkerSealing || cctx.IsSet("addpiece")) && cctx.Bool("addpiece") { - taskTypes = append(taskTypes, sealtasks.TTAddPiece) + taskTypes = append(taskTypes, sealtasks.TTAddPiece, sealtasks.TTDataCid) } if (workerType == sealtasks.WorkerSealing || cctx.IsSet("precommit1")) && cctx.Bool("precommit1") { taskTypes = append(taskTypes, sealtasks.TTPreCommit1) diff --git a/cmd/lotus-worker/tasks.go b/cmd/lotus-worker/tasks.go index 52133d09d16..880381fd259 100644 --- a/cmd/lotus-worker/tasks.go +++ b/cmd/lotus-worker/tasks.go @@ -23,6 +23,7 @@ var tasksCmd = &cli.Command{ var allowSetting = map[sealtasks.TaskType]struct{}{ sealtasks.TTAddPiece: {}, + sealtasks.TTDataCid: {}, sealtasks.TTPreCommit1: {}, sealtasks.TTPreCommit2: {}, sealtasks.TTCommit2: {}, diff --git a/documentation/en/api-v0-methods-miner.md b/documentation/en/api-v0-methods-miner.md index 64d09971b9f..7642efbee5a 100644 --- a/documentation/en/api-v0-methods-miner.md +++ b/documentation/en/api-v0-methods-miner.md @@ -15,6 +15,7 @@ * [Check](#Check) * [CheckProvable](#CheckProvable) * [Compute](#Compute) + * [ComputeDataCid](#ComputeDataCid) * [ComputeProof](#ComputeProof) * [ComputeWindowPoSt](#ComputeWindowPoSt) * [Create](#Create) @@ -105,6 +106,7 @@ * [PledgeSector](#PledgeSector) * [Return](#Return) * [ReturnAddPiece](#ReturnAddPiece) + * [ReturnDataCid](#ReturnDataCid) * [ReturnFetch](#ReturnFetch) * [ReturnFinalizeReplicaUpdate](#ReturnFinalizeReplicaUpdate) * [ReturnFinalizeSector](#ReturnFinalizeSector) @@ -361,6 +363,29 @@ Response: ## Compute +### ComputeDataCid + + +Perms: admin + +Inputs: +```json +[ + 1024, + {} +] +``` + +Response: +```json +{ + "Size": 1032, + "PieceCID": { + "/": "bafy2bzacea3wsdh6y3a36tb3skempjoxqpuyompjbmfeyf34fi3uy6uue42v4" + } +} +``` + ### ComputeProof @@ -2195,6 +2220,36 @@ Response: ### ReturnAddPiece + + +Perms: admin + +Inputs: +```json +[ + { + "Sector": { + "Miner": 1000, + "Number": 9 + }, + "ID": "07070707-0707-0707-0707-070707070707" + }, + { + "Size": 1032, + "PieceCID": { + "/": "bafy2bzacea3wsdh6y3a36tb3skempjoxqpuyompjbmfeyf34fi3uy6uue42v4" + } + }, + { + "Code": 0, + "Message": "string value" + } +] +``` + +Response: `{}` + +### ReturnDataCid storiface.WorkerReturn @@ -4020,6 +4075,88 @@ Response: "BaseMinMemory": 68719476736 } }, + "seal/v0/datacid": { + "0": { + "MinMemory": 2048, + "MaxMemory": 2048, + "GPUUtilization": 0, + "MaxParallelism": 1, + "MaxParallelismGPU": 0, + "BaseMinMemory": 2048 + }, + "1": { + "MinMemory": 8388608, + "MaxMemory": 8388608, + "GPUUtilization": 0, + "MaxParallelism": 1, + "MaxParallelismGPU": 0, + "BaseMinMemory": 8388608 + }, + "2": { + "MinMemory": 1073741824, + "MaxMemory": 1073741824, + "GPUUtilization": 0, + "MaxParallelism": 1, + "MaxParallelismGPU": 0, + "BaseMinMemory": 1073741824 + }, + "3": { + "MinMemory": 4294967296, + "MaxMemory": 4294967296, + "GPUUtilization": 0, + "MaxParallelism": 1, + "MaxParallelismGPU": 0, + "BaseMinMemory": 1073741824 + }, + "4": { + "MinMemory": 8589934592, + "MaxMemory": 8589934592, + "GPUUtilization": 0, + "MaxParallelism": 1, + "MaxParallelismGPU": 0, + "BaseMinMemory": 1073741824 + }, + "5": { + "MinMemory": 2048, + "MaxMemory": 2048, + "GPUUtilization": 0, + "MaxParallelism": 1, + "MaxParallelismGPU": 0, + "BaseMinMemory": 2048 + }, + "6": { + "MinMemory": 8388608, + "MaxMemory": 8388608, + "GPUUtilization": 0, + "MaxParallelism": 1, + "MaxParallelismGPU": 0, + "BaseMinMemory": 8388608 + }, + "7": { + "MinMemory": 1073741824, + "MaxMemory": 1073741824, + "GPUUtilization": 0, + "MaxParallelism": 1, + "MaxParallelismGPU": 0, + "BaseMinMemory": 1073741824 + }, + "8": { + "MinMemory": 4294967296, + "MaxMemory": 4294967296, + "GPUUtilization": 0, + "MaxParallelism": 1, + "MaxParallelismGPU": 0, + "BaseMinMemory": 1073741824 + }, + "9": { + "MinMemory": 8589934592, + "MaxMemory": 8589934592, + "GPUUtilization": 0, + "MaxParallelism": 1, + "MaxParallelismGPU": 0, + "BaseMinMemory": 1073741824 + } + }, "seal/v0/fetch": { "0": { "MinMemory": 1048576, diff --git a/documentation/en/api-v0-methods-worker.md b/documentation/en/api-v0-methods-worker.md index 4a09e530132..57a371008bd 100644 --- a/documentation/en/api-v0-methods-worker.md +++ b/documentation/en/api-v0-methods-worker.md @@ -9,6 +9,8 @@ * [Version](#Version) * [Add](#Add) * [AddPiece](#AddPiece) +* [Data](#Data) + * [DataCid](#DataCid) * [Finalize](#Finalize) * [FinalizeReplicaUpdate](#FinalizeReplicaUpdate) * [FinalizeSector](#FinalizeSector) @@ -520,6 +522,88 @@ Response: "BaseMinMemory": 68719476736 } }, + "seal/v0/datacid": { + "0": { + "MinMemory": 2048, + "MaxMemory": 2048, + "GPUUtilization": 0, + "MaxParallelism": 1, + "MaxParallelismGPU": 0, + "BaseMinMemory": 2048 + }, + "1": { + "MinMemory": 8388608, + "MaxMemory": 8388608, + "GPUUtilization": 0, + "MaxParallelism": 1, + "MaxParallelismGPU": 0, + "BaseMinMemory": 8388608 + }, + "2": { + "MinMemory": 1073741824, + "MaxMemory": 1073741824, + "GPUUtilization": 0, + "MaxParallelism": 1, + "MaxParallelismGPU": 0, + "BaseMinMemory": 1073741824 + }, + "3": { + "MinMemory": 4294967296, + "MaxMemory": 4294967296, + "GPUUtilization": 0, + "MaxParallelism": 1, + "MaxParallelismGPU": 0, + "BaseMinMemory": 1073741824 + }, + "4": { + "MinMemory": 8589934592, + "MaxMemory": 8589934592, + "GPUUtilization": 0, + "MaxParallelism": 1, + "MaxParallelismGPU": 0, + "BaseMinMemory": 1073741824 + }, + "5": { + "MinMemory": 2048, + "MaxMemory": 2048, + "GPUUtilization": 0, + "MaxParallelism": 1, + "MaxParallelismGPU": 0, + "BaseMinMemory": 2048 + }, + "6": { + "MinMemory": 8388608, + "MaxMemory": 8388608, + "GPUUtilization": 0, + "MaxParallelism": 1, + "MaxParallelismGPU": 0, + "BaseMinMemory": 8388608 + }, + "7": { + "MinMemory": 1073741824, + "MaxMemory": 1073741824, + "GPUUtilization": 0, + "MaxParallelism": 1, + "MaxParallelismGPU": 0, + "BaseMinMemory": 1073741824 + }, + "8": { + "MinMemory": 4294967296, + "MaxMemory": 4294967296, + "GPUUtilization": 0, + "MaxParallelism": 1, + "MaxParallelismGPU": 0, + "BaseMinMemory": 1073741824 + }, + "9": { + "MinMemory": 8589934592, + "MaxMemory": 8589934592, + "GPUUtilization": 0, + "MaxParallelism": 1, + "MaxParallelismGPU": 0, + "BaseMinMemory": 1073741824 + } + }, "seal/v0/fetch": { "0": { "MinMemory": 1048576, @@ -1242,7 +1326,6 @@ Response: `131584` ### AddPiece -storiface.WorkerCalls Perms: admin @@ -1276,6 +1359,34 @@ Response: } ``` +## Data + + +### DataCid +storiface.WorkerCalls + + +Perms: admin + +Inputs: +```json +[ + 1024, + {} +] +``` + +Response: +```json +{ + "Sector": { + "Miner": 1000, + "Number": 9 + }, + "ID": "07070707-0707-0707-0707-070707070707" +} +``` + ## Finalize diff --git a/documentation/en/cli-lotus-worker.md b/documentation/en/cli-lotus-worker.md index 9212d59cf79..68d0d45c248 100644 --- a/documentation/en/cli-lotus-worker.md +++ b/documentation/en/cli-lotus-worker.md @@ -173,7 +173,7 @@ NAME: lotus-worker tasks enable - Enable a task type USAGE: - lotus-worker tasks enable [command options] [UNS|C2|PC2|PC1|PR2|RU|AP|GSK] + lotus-worker tasks enable [command options] [UNS|C2|PC2|PC1|PR2|RU|AP|DC|GSK] OPTIONS: --help, -h show help (default: false) @@ -186,7 +186,7 @@ NAME: lotus-worker tasks disable - Disable a task type USAGE: - lotus-worker tasks disable [command options] [UNS|C2|PC2|PC1|PR2|RU|AP|GSK] + lotus-worker tasks disable [command options] [UNS|C2|PC2|PC1|PR2|RU|AP|DC|GSK] OPTIONS: --help, -h show help (default: false) diff --git a/extern/sector-storage/ffiwrapper/sealer_cgo.go b/extern/sector-storage/ffiwrapper/sealer_cgo.go index 3f596d250a4..400b672117f 100644 --- a/extern/sector-storage/ffiwrapper/sealer_cgo.go +++ b/extern/sector-storage/ffiwrapper/sealer_cgo.go @@ -51,6 +51,120 @@ func (sb *Sealer) NewSector(ctx context.Context, sector storage.SectorRef) error return nil } +func (sb *Sealer) DataCid(ctx context.Context, pieceSize abi.UnpaddedPieceSize, pieceData storage.Data) (abi.PieceInfo, error) { + // TODO: allow tuning those: + chunk := abi.PaddedPieceSize(4 << 20) + parallel := runtime.NumCPU() + + maxSizeSpt := abi.RegisteredSealProof_StackedDrg64GiBV1_1 + + throttle := make(chan []byte, parallel) + piecePromises := make([]func() (abi.PieceInfo, error), 0) + + buf := make([]byte, chunk.Unpadded()) + for i := 0; i < parallel; i++ { + if abi.UnpaddedPieceSize(i)*chunk.Unpadded() >= pieceSize { + break // won't use this many buffers + } + throttle <- make([]byte, chunk.Unpadded()) + } + + for { + var read int + for rbuf := buf; len(rbuf) > 0; { + n, err := pieceData.Read(rbuf) + if err != nil && err != io.EOF { + return abi.PieceInfo{}, xerrors.Errorf("pr read error: %w", err) + } + + rbuf = rbuf[n:] + read += n + + if err == io.EOF { + break + } + } + if read == 0 { + break + } + + done := make(chan struct { + cid.Cid + error + }, 1) + pbuf := <-throttle + copy(pbuf, buf[:read]) + + go func(read int) { + defer func() { + throttle <- pbuf + }() + + c, err := sb.pieceCid(maxSizeSpt, pbuf[:read]) + done <- struct { + cid.Cid + error + }{c, err} + }(read) + + piecePromises = append(piecePromises, func() (abi.PieceInfo, error) { + select { + case e := <-done: + if e.error != nil { + return abi.PieceInfo{}, e.error + } + + return abi.PieceInfo{ + Size: abi.UnpaddedPieceSize(read).Padded(), + PieceCID: e.Cid, + }, nil + case <-ctx.Done(): + return abi.PieceInfo{}, ctx.Err() + } + }) + } + + if len(piecePromises) == 1 { + return piecePromises[0]() + } + + var payloadRoundedBytes abi.PaddedPieceSize + pieceCids := make([]abi.PieceInfo, len(piecePromises)) + for i, promise := range piecePromises { + pinfo, err := promise() + if err != nil { + return abi.PieceInfo{}, err + } + + pieceCids[i] = pinfo + payloadRoundedBytes += pinfo.Size + } + + pieceCID, err := ffi.GenerateUnsealedCID(maxSizeSpt, pieceCids) + if err != nil { + return abi.PieceInfo{}, xerrors.Errorf("generate unsealed CID: %w", err) + } + + // validate that the pieceCID was properly formed + if _, err := commcid.CIDToPieceCommitmentV1(pieceCID); err != nil { + return abi.PieceInfo{}, err + } + + if payloadRoundedBytes < pieceSize.Padded() { + paddedCid, err := commpffi.ZeroPadPieceCommitment(pieceCID, payloadRoundedBytes.Unpadded(), pieceSize) + if err != nil { + return abi.PieceInfo{}, xerrors.Errorf("failed to pad data: %w", err) + } + + pieceCID = paddedCid + } + + return abi.PieceInfo{ + Size: pieceSize.Padded(), + PieceCID: pieceCID, + }, nil +} + func (sb *Sealer) AddPiece(ctx context.Context, sector storage.SectorRef, existingPieceSizes []abi.UnpaddedPieceSize, pieceSize abi.UnpaddedPieceSize, file storage.Data) (abi.PieceInfo, error) { // TODO: allow tuning those: chunk := abi.PaddedPieceSize(4 << 20) diff --git a/extern/sector-storage/manager.go b/extern/sector-storage/manager.go index 8bea96cca73..4b52f9a1d7c 100644 --- a/extern/sector-storage/manager.go +++ b/extern/sector-storage/manager.go @@ -165,7 +165,7 @@ func New(ctx context.Context, lstor *stores.Local, stor stores.Store, ls stores. sealtasks.TTCommit1, sealtasks.TTProveReplicaUpdate1, sealtasks.TTFinalize, sealtasks.TTFetch, sealtasks.TTFinalizeReplicaUpdate, } if sc.AllowAddPiece { - localTasks = append(localTasks, sealtasks.TTAddPiece) + localTasks = append(localTasks, sealtasks.TTAddPiece, sealtasks.TTDataCid) } if sc.AllowPreCommit1 { localTasks = append(localTasks, sealtasks.TTPreCommit1) @@ -327,6 +327,27 @@ func (m *Manager) NewSector(ctx context.Context, sector storage.SectorRef) error return nil } +func (m *Manager) DataCid(ctx context.Context, pieceSize abi.UnpaddedPieceSize, pieceData storage.Data) (abi.PieceInfo, error) { + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + selector := newTaskSelector() + + var out abi.PieceInfo + err := m.sched.Schedule(ctx, storage.NoSectorRef, sealtasks.TTDataCid, selector, schedNop, func(ctx context.Context, w Worker) error { + p, err := m.waitSimpleCall(ctx)(w.DataCid(ctx, pieceSize, pieceData)) + if err != nil { + return err + } + if p != nil { + out = p.(abi.PieceInfo) + } + return nil + }) + + return out, err +} + func (m *Manager) AddPiece(ctx context.Context, sector storage.SectorRef, existingPieces []abi.UnpaddedPieceSize, sz abi.UnpaddedPieceSize, r io.Reader) (abi.PieceInfo, error) { ctx, cancel := context.WithCancel(ctx) defer cancel() @@ -975,6 +996,10 @@ func (m *Manager) ProveReplicaUpdate2(ctx context.Context, sector storage.Sector return out, waitErr } +func (m *Manager) ReturnDataCid(ctx context.Context, callID storiface.CallID, pi abi.PieceInfo, err *storiface.CallError) error { + return m.returnResult(ctx, callID, pi, err) +} + func (m *Manager) ReturnAddPiece(ctx context.Context, callID storiface.CallID, pi abi.PieceInfo, err *storiface.CallError) error { return m.returnResult(ctx, callID, pi, err) } diff --git a/extern/sector-storage/mock/mock.go b/extern/sector-storage/mock/mock.go index 37d8af00e4b..ecb06da0b63 100644 --- a/extern/sector-storage/mock/mock.go +++ b/extern/sector-storage/mock/mock.go @@ -80,6 +80,10 @@ func (mgr *SectorMgr) SectorsUnsealPiece(ctx context.Context, sector storage.Sec panic("SectorMgr: unsealing piece: implement me") } +func (mgr *SectorMgr) DataCid(ctx context.Context, size abi.UnpaddedPieceSize, r io.Reader) (abi.PieceInfo, error) { + panic("todo") +} + func (mgr *SectorMgr) AddPiece(ctx context.Context, sectorID storage.SectorRef, existingPieces []abi.UnpaddedPieceSize, size abi.UnpaddedPieceSize, r io.Reader) (abi.PieceInfo, error) { log.Warn("Add piece: ", sectorID, size, sectorID.ProofType) @@ -537,6 +541,10 @@ func (mgr *SectorMgr) CheckProvable(ctx context.Context, pp abi.RegisteredPoStPr var _ storiface.WorkerReturn = &SectorMgr{} +func (mgr *SectorMgr) ReturnDataCid(ctx context.Context, callID storiface.CallID, pi abi.PieceInfo, err *storiface.CallError) error { + panic("not supported") +} + func (mgr *SectorMgr) ReturnAddPiece(ctx context.Context, callID storiface.CallID, pi abi.PieceInfo, err *storiface.CallError) error { panic("not supported") } diff --git a/extern/sector-storage/sched_test.go b/extern/sector-storage/sched_test.go index 2245c8a3f46..77e67479345 100644 --- a/extern/sector-storage/sched_test.go +++ b/extern/sector-storage/sched_test.go @@ -68,6 +68,10 @@ type schedTestWorker struct { ignoreResources bool } +func (s *schedTestWorker) DataCid(ctx context.Context, pieceSize abi.UnpaddedPieceSize, pieceData storage.Data) (storiface.CallID, error) { + panic("implement me") +} + func (s *schedTestWorker) SealPreCommit1(ctx context.Context, sector storage.SectorRef, ticket abi.SealRandomness, pieces []abi.PieceInfo) (storiface.CallID, error) { panic("implement me") } diff --git a/extern/sector-storage/sealtasks/task.go b/extern/sector-storage/sealtasks/task.go index 1d3d3c1b5f0..e8a156291b7 100644 --- a/extern/sector-storage/sealtasks/task.go +++ b/extern/sector-storage/sealtasks/task.go @@ -3,6 +3,7 @@ package sealtasks type TaskType string const ( + TTDataCid TaskType = "seal/v0/datacid" TTAddPiece TaskType = "seal/v0/addpiece" TTPreCommit1 TaskType = "seal/v0/precommit/1" TTPreCommit2 TaskType = "seal/v0/precommit/2" @@ -25,7 +26,8 @@ const ( ) var order = map[TaskType]int{ - TTRegenSectorKey: 10, // least priority + TTRegenSectorKey: 11, // least priority + TTDataCid: 10, TTAddPiece: 9, TTReplicaUpdate: 8, TTProveReplicaUpdate2: 7, @@ -44,6 +46,7 @@ var order = map[TaskType]int{ } var shortNames = map[TaskType]string{ + TTDataCid: "DC", TTAddPiece: "AP", TTPreCommit1: "PC1", diff --git a/extern/sector-storage/storiface/resources.go b/extern/sector-storage/storiface/resources.go index ce533e2c00d..71fd9e30c62 100644 --- a/extern/sector-storage/storiface/resources.go +++ b/extern/sector-storage/storiface/resources.go @@ -569,6 +569,7 @@ var ResourceTable = map[sealtasks.TaskType]map[abi.RegisteredSealProof]Resources func init() { ResourceTable[sealtasks.TTUnseal] = ResourceTable[sealtasks.TTPreCommit1] // TODO: measure accurately ResourceTable[sealtasks.TTRegenSectorKey] = ResourceTable[sealtasks.TTReplicaUpdate] + ResourceTable[sealtasks.TTDataCid] = ResourceTable[sealtasks.TTAddPiece] // V1_1 is the same as V1 for _, m := range ResourceTable { diff --git a/extern/sector-storage/storiface/worker.go b/extern/sector-storage/storiface/worker.go index e37df31b5a6..5b4fabf0248 100644 --- a/extern/sector-storage/storiface/worker.go +++ b/extern/sector-storage/storiface/worker.go @@ -117,6 +117,7 @@ var UndefCall CallID type WorkerCalls interface { // async + DataCid(ctx context.Context, pieceSize abi.UnpaddedPieceSize, pieceData storage.Data) (CallID, error) AddPiece(ctx context.Context, sector storage.SectorRef, pieceSizes []abi.UnpaddedPieceSize, newPieceSize abi.UnpaddedPieceSize, pieceData storage.Data) (CallID, error) SealPreCommit1(ctx context.Context, sector storage.SectorRef, ticket abi.SealRandomness, pieces []abi.PieceInfo) (CallID, error) SealPreCommit2(ctx context.Context, sector storage.SectorRef, pc1o storage.PreCommit1Out) (CallID, error) @@ -197,6 +198,7 @@ func Err(code ErrorCode, sub error) *CallError { } type WorkerReturn interface { + ReturnDataCid(ctx context.Context, callID CallID, pi abi.PieceInfo, err *CallError) error ReturnAddPiece(ctx context.Context, callID CallID, pi abi.PieceInfo, err *CallError) error ReturnSealPreCommit1(ctx context.Context, callID CallID, p1o storage.PreCommit1Out, err *CallError) error ReturnSealPreCommit2(ctx context.Context, callID CallID, sealed storage.SectorCids, err *CallError) error diff --git a/extern/sector-storage/teststorage_test.go b/extern/sector-storage/teststorage_test.go index c825542ea8c..0c15fbf7bab 100644 --- a/extern/sector-storage/teststorage_test.go +++ b/extern/sector-storage/teststorage_test.go @@ -23,6 +23,10 @@ type testExec struct { apch chan chan apres } +func (t *testExec) DataCid(ctx context.Context, pieceSize abi.UnpaddedPieceSize, pieceData storage.Data) (abi.PieceInfo, error) { + panic("implement me") +} + func (t *testExec) GenerateWinningPoSt(ctx context.Context, minerID abi.ActorID, sectorInfo []proof.ExtendedSectorInfo, randomness abi.PoStRandomness) ([]proof.PoStProof, error) { panic("implement me") } diff --git a/extern/sector-storage/worker_local.go b/extern/sector-storage/worker_local.go index 46464caf6fa..9a14e42b5f1 100644 --- a/extern/sector-storage/worker_local.go +++ b/extern/sector-storage/worker_local.go @@ -180,6 +180,7 @@ func (l *LocalWorker) ffiExec() (ffiwrapper.Storage, error) { type ReturnType string const ( + DataCid ReturnType = "DataCid" AddPiece ReturnType = "AddPiece" SealPreCommit1 ReturnType = "SealPreCommit1" SealPreCommit2 ReturnType = "SealPreCommit2" @@ -232,6 +233,7 @@ func rfunc(in interface{}) func(context.Context, storiface.CallID, storiface.Wor } var returnFunc = map[ReturnType]func(context.Context, storiface.CallID, storiface.WorkerReturn, interface{}, *storiface.CallError) error{ + DataCid: rfunc(storiface.WorkerReturn.ReturnDataCid), AddPiece: rfunc(storiface.WorkerReturn.ReturnAddPiece), SealPreCommit1: rfunc(storiface.WorkerReturn.ReturnSealPreCommit1), SealPreCommit2: rfunc(storiface.WorkerReturn.ReturnSealPreCommit2), @@ -341,6 +343,17 @@ func (l *LocalWorker) NewSector(ctx context.Context, sector storage.SectorRef) e return sb.NewSector(ctx, sector) } +func (l *LocalWorker) DataCid(ctx context.Context, pieceSize abi.UnpaddedPieceSize, pieceData storage.Data) (storiface.CallID, error) { + sb, err := l.executor() + if err != nil { + return storiface.UndefCall, err + } + + return l.asyncCall(ctx, storage.NoSectorRef, DataCid, func(ctx context.Context, ci storiface.CallID) (interface{}, error) { + return sb.DataCid(ctx, pieceSize, pieceData) + }) +} + func (l *LocalWorker) AddPiece(ctx context.Context, sector storage.SectorRef, epcs []abi.UnpaddedPieceSize, sz abi.UnpaddedPieceSize, r io.Reader) (storiface.CallID, error) { sb, err := l.executor() if err != nil { diff --git a/extern/sector-storage/worker_tracked.go b/extern/sector-storage/worker_tracked.go index 1d92579a5e4..e3ce0a46a45 100644 --- a/extern/sector-storage/worker_tracked.go +++ b/extern/sector-storage/worker_tracked.go @@ -186,6 +186,12 @@ func (t *trackedWorker) FinalizeSector(ctx context.Context, sector storage.Secto return t.tracker.track(ctx, t.execute, t.wid, t.workerInfo, sector, sealtasks.TTFinalize, func() (storiface.CallID, error) { return t.Worker.FinalizeSector(ctx, sector, keepUnsealed) }) } +func (t *trackedWorker) DataCid(ctx context.Context, pieceSize abi.UnpaddedPieceSize, pieceData storage.Data) (storiface.CallID, error) { + return t.tracker.track(ctx, t.execute, t.wid, t.workerInfo, storage.NoSectorRef, sealtasks.TTDataCid, func() (storiface.CallID, error) { + return t.Worker.DataCid(ctx, pieceSize, pieceData) + }) +} + func (t *trackedWorker) AddPiece(ctx context.Context, sector storage.SectorRef, pieceSizes []abi.UnpaddedPieceSize, newPieceSize abi.UnpaddedPieceSize, pieceData storage.Data) (storiface.CallID, error) { return t.tracker.track(ctx, t.execute, t.wid, t.workerInfo, sector, sealtasks.TTAddPiece, func() (storiface.CallID, error) { return t.Worker.AddPiece(ctx, sector, pieceSizes, newPieceSize, pieceData) diff --git a/go.mod b/go.mod index f5b141d0977..8cc4f32d1e0 100644 --- a/go.mod +++ b/go.mod @@ -53,7 +53,7 @@ require ( github.com/filecoin-project/specs-actors/v5 v5.0.4 github.com/filecoin-project/specs-actors/v6 v6.0.1 github.com/filecoin-project/specs-actors/v7 v7.0.0 - github.com/filecoin-project/specs-storage v0.2.2 + github.com/filecoin-project/specs-storage v0.2.3-0.20220426183226-1a0a63c5990f github.com/filecoin-project/test-vectors/schema v0.0.5 github.com/gbrlsnchs/jwt/v3 v3.0.1 github.com/gdamore/tcell/v2 v2.2.0 diff --git a/go.sum b/go.sum index 5cc8b469380..9ec39ebc399 100644 --- a/go.sum +++ b/go.sum @@ -394,8 +394,8 @@ github.com/filecoin-project/specs-actors/v6 v6.0.1/go.mod h1:V1AYfi5GkHXipx1mnVi github.com/filecoin-project/specs-actors/v7 v7.0.0-rc1.0.20220118005651-2470cb39827e/go.mod h1:TA5FwCna+Yi36POaT7SLKXsgEDvJwc0V/L6ZsO19B9M= github.com/filecoin-project/specs-actors/v7 v7.0.0 h1:FQN7tjt3o68hfb3qLFSJBoLMuOFY0REkFVLO/zXj8RU= github.com/filecoin-project/specs-actors/v7 v7.0.0/go.mod h1:TA5FwCna+Yi36POaT7SLKXsgEDvJwc0V/L6ZsO19B9M= -github.com/filecoin-project/specs-storage v0.2.2 h1:6ugbtKQ6LTcTEnEIX9HkeCtTp1PCYO497P/bokF5tF4= -github.com/filecoin-project/specs-storage v0.2.2/go.mod h1:6cc/lncmAxMUocPi0z1EPCX63beIX7F7UnlmUZ3hLQo= +github.com/filecoin-project/specs-storage v0.2.3-0.20220426183226-1a0a63c5990f h1:+suJFu4RJt7aZRXvE+Innrpacap+Z8N87y6a1Cgkuqc= +github.com/filecoin-project/specs-storage v0.2.3-0.20220426183226-1a0a63c5990f/go.mod h1:6cc/lncmAxMUocPi0z1EPCX63beIX7F7UnlmUZ3hLQo= github.com/filecoin-project/storetheindex v0.3.5 h1:KoS9TvjPm6zIZfUH8atAHJbVHOO7GTP1MdTG+v0eE+Q= github.com/filecoin-project/storetheindex v0.3.5/go.mod h1:0r3d0kSpK63O6AvLr1CjAINLi+nWD49clzcnKV+GLpI= github.com/filecoin-project/test-vectors/schema v0.0.5 h1:w3zHQhzM4pYxJDl21avXjOKBLF8egrvwUwjpT8TquDg= diff --git a/itests/worker_test.go b/itests/worker_test.go index 03b8cceba5d..c1fba26007f 100644 --- a/itests/worker_test.go +++ b/itests/worker_test.go @@ -2,6 +2,7 @@ package itests import ( "context" + "strings" "sync/atomic" "testing" "time" @@ -40,6 +41,27 @@ func TestWorkerPledge(t *testing.T) { miner.PledgeSectors(ctx, 1, 0, nil) } +func TestWorkerDataCid(t *testing.T) { + ctx := context.Background() + _, miner, worker, _ := kit.EnsembleWorker(t, kit.WithAllSubsystems(), kit.ThroughRPC(), kit.WithNoLocalSealing(true), + kit.WithTaskTypes([]sealtasks.TaskType{sealtasks.TTFetch, sealtasks.TTCommit1, sealtasks.TTFinalize, sealtasks.TTDataCid, sealtasks.TTAddPiece, sealtasks.TTPreCommit1, sealtasks.TTPreCommit2, sealtasks.TTCommit2, sealtasks.TTUnseal})) // no mock proofs + + e, err := worker.Enabled(ctx) + require.NoError(t, err) + require.True(t, e) + /* + pi, err := miner.ComputeDataCid(ctx, 1016, strings.NewReader(strings.Repeat("a", 1016))) + require.NoError(t, err) + require.Equal(t, abi.PaddedPieceSize(1024), pi.Size) + require.Equal(t, "baga6ea4seaqlhznlutptgfwhffupyer6txswamerq5fc2jlwf2lys2mm5jtiaeq", pi.PieceCID.String()) + */ + bigPiece := abi.PaddedPieceSize(16 << 20).Unpadded() + pi, err := miner.ComputeDataCid(ctx, bigPiece, strings.NewReader(strings.Repeat("a", int(bigPiece)))) + require.NoError(t, err) + require.Equal(t, bigPiece.Padded(), pi.Size) + require.Equal(t, "baga6ea4seaqmhoxl2ybw5m2wyd3pt3h4zmp7j52yumzu2rar26twns3uocq7yfa", pi.PieceCID.String()) +} + func TestWinningPostWorker(t *testing.T) { prevIns := build.InsecurePoStValidation build.InsecurePoStValidation = false diff --git a/lib/rpcenc/reader.go b/lib/rpcenc/reader.go index 6693dc83d3d..61ae80ba938 100644 --- a/lib/rpcenc/reader.go +++ b/lib/rpcenc/reader.go @@ -211,6 +211,7 @@ type RpcReader struct { postBody io.ReadCloser // nil on initial head request next chan *RpcReader // on head will get us the postBody after sending resStart mustRedirect bool + eof bool res chan readRes beginOnce *sync.Once @@ -266,6 +267,10 @@ func (w *RpcReader) Read(p []byte) (int, error) { w.beginPost() }) + if w.eof { + return 0, io.EOF + } + if w.mustRedirect { return 0, ErrMustRedirect } @@ -276,6 +281,9 @@ func (w *RpcReader) Read(p []byte) (int, error) { n, err := w.postBody.Read(p) if err != nil { + if err == io.EOF { + w.eof = true + } w.closeOnce.Do(func() { close(w.res) }) diff --git a/node/impl/storminer.go b/node/impl/storminer.go index 86e0ba0fcd0..f34761d891a 100644 --- a/node/impl/storminer.go +++ b/node/impl/storminer.go @@ -425,6 +425,10 @@ func (sm *StorageMinerAPI) ComputeWindowPoSt(ctx context.Context, dlIdx uint64, return sm.WdPoSt.ComputePoSt(ctx, dlIdx, ts) } +func (sm *StorageMinerAPI) ComputeDataCid(ctx context.Context, pieceSize abi.UnpaddedPieceSize, pieceData sto.Data) (abi.PieceInfo, error) { + return sm.StorageMgr.DataCid(ctx, pieceSize, pieceData) +} + func (sm *StorageMinerAPI) WorkerConnect(ctx context.Context, url string) error { w, err := connectRemoteWorker(ctx, sm, url) if err != nil {