diff --git a/cmd/lotus-worker/main.go b/cmd/lotus-worker/main.go index d23b3860a0a..b15e728b1a2 100644 --- a/cmd/lotus-worker/main.go +++ b/cmd/lotus-worker/main.go @@ -37,6 +37,7 @@ import ( "github.com/filecoin-project/lotus/node/repo" "github.com/filecoin-project/lotus/storage/paths" "github.com/filecoin-project/lotus/storage/sealer" + "github.com/filecoin-project/lotus/storage/sealer/ffiwrapper" "github.com/filecoin-project/lotus/storage/sealer/sealtasks" "github.com/filecoin-project/lotus/storage/sealer/storiface" ) @@ -282,7 +283,36 @@ var runCmd = &cli.Command{ Value: true, DefaultText: "inherits --addpiece", }, + &cli.StringFlag{ + Name: "external-pc2", + Usage: "command for computing PC2 externally", + }, }, + Description: `Run lotus-worker. + +--external-pc2 can be used to compute the PreCommit2 inputs externally. +The flag behaves similarly to the related lotus-worker flag, using it in +lotus-bench may be useful for testing if the external PreCommit2 command is +invoked correctly. + +The command will be called with a number of environment variables set: +* EXTSEAL_PC2_SECTOR_NUM: the sector number +* EXTSEAL_PC2_SECTOR_MINER: the miner id +* EXTSEAL_PC2_PROOF_TYPE: the proof type +* EXTSEAL_PC2_SECTOR_SIZE: the sector size in bytes +* EXTSEAL_PC2_CACHE: the path to the cache directory +* EXTSEAL_PC2_SEALED: the path to the sealed sector file (initialized with unsealed data by the caller) +* EXTSEAL_PC2_PC1OUT: output from rust-fil-proofs precommit1 phase (base64 encoded json) + +The command is expected to: +* Create cache sc-02-data-tree-r* files +* Create cache sc-02-data-tree-c* files +* Create cache p_aux / t_aux files +* Transform the sealed file in place + +Example invocation of lotus-bench as external executor: +'./lotus-bench simple precommit2 --sector-size $EXTSEAL_PC2_SECTOR_SIZE $EXTSEAL_PC2_SEALED $EXTSEAL_PC2_CACHE $EXTSEAL_PC2_PC1OUT' +`, Before: func(cctx *cli.Context) error { if cctx.IsSet("address") { log.Warnf("The '--address' flag is deprecated, it has been replaced by '--listen'") @@ -587,18 +617,32 @@ var runCmd = &cli.Command{ fh.ServeHTTP(w, r) } + // Parse ffi executor flags + + var ffiOpts []ffiwrapper.FFIWrapperOpt + + if cctx.IsSet("external-pc2") { + extSeal := ffiwrapper.ExternalSealer{ + PreCommit2: ffiwrapper.MakeExternPrecommit2(cctx.String("external-pc2")), + } + + ffiOpts = append(ffiOpts, ffiwrapper.WithExternalSealCalls(extSeal)) + } + // Create / expose the worker wsts := statestore.New(namespace.Wrap(ds, modules.WorkerCallsPrefix)) workerApi := &sealworker.Worker{ - LocalWorker: sealer.NewLocalWorker(sealer.WorkerConfig{ - TaskTypes: taskTypes, - NoSwap: cctx.Bool("no-swap"), - MaxParallelChallengeReads: cctx.Int("post-parallel-reads"), - ChallengeReadTimeout: cctx.Duration("post-read-timeout"), - Name: cctx.String("name"), - }, remote, localStore, nodeApi, nodeApi, wsts), + LocalWorker: sealer.NewLocalWorkerWithExecutor( + sealer.FFIExec(ffiOpts...), + sealer.WorkerConfig{ + TaskTypes: taskTypes, + NoSwap: cctx.Bool("no-swap"), + MaxParallelChallengeReads: cctx.Int("post-parallel-reads"), + ChallengeReadTimeout: cctx.Duration("post-read-timeout"), + Name: cctx.String("name"), + }, os.LookupEnv, remote, localStore, nodeApi, nodeApi, wsts), LocalStore: localStore, Storage: lr, } diff --git a/documentation/en/cli-lotus-worker.md b/documentation/en/cli-lotus-worker.md index deff5957124..e20f0da4ccf 100644 --- a/documentation/en/cli-lotus-worker.md +++ b/documentation/en/cli-lotus-worker.md @@ -34,6 +34,33 @@ NAME: USAGE: lotus-worker run [command options] [arguments...] +DESCRIPTION: + Run lotus-worker. + + --external-pc2 can be used to compute the PreCommit2 inputs externally. + The flag behaves similarly to the related lotus-worker flag, using it in + lotus-bench may be useful for testing if the external PreCommit2 command is + invoked correctly. + + The command will be called with a number of environment variables set: + * EXTSEAL_PC2_SECTOR_NUM: the sector number + * EXTSEAL_PC2_SECTOR_MINER: the miner id + * EXTSEAL_PC2_PROOF_TYPE: the proof type + * EXTSEAL_PC2_SECTOR_SIZE: the sector size in bytes + * EXTSEAL_PC2_CACHE: the path to the cache directory + * EXTSEAL_PC2_SEALED: the path to the sealed sector file (initialized with unsealed data by the caller) + * EXTSEAL_PC2_PC1OUT: output from rust-fil-proofs precommit1 phase (base64 encoded json) + + The command is expected to: + * Create cache sc-02-data-tree-r* files + * Create cache sc-02-data-tree-c* files + * Create cache p_aux / t_aux files + * Transform the sealed file in place + + Example invocation of lotus-bench as external executor: + './lotus-bench simple precommit2 --sector-size $EXTSEAL_PC2_SECTOR_SIZE $EXTSEAL_PC2_SEALED $EXTSEAL_PC2_CACHE $EXTSEAL_PC2_PC1OUT' + + OPTIONS: --listen value host address and port the worker api will listen on (default: "0.0.0.0:3456") [$LOTUS_WORKER_LISTEN] --no-local-storage don't use storageminer repo for sector storage (default: false) [$LOTUS_WORKER_NO_LOCAL_STORAGE] @@ -57,6 +84,7 @@ OPTIONS: --timeout value used when 'listen' is unspecified. must be a valid duration recognized by golang's time.ParseDuration function (default: "30m") [$LOTUS_WORKER_TIMEOUT] --http-server-timeout value (default: "30s") --data-cid Run the data-cid task. true|false (default: inherits --addpiece) + --external-pc2 value command for computing PC2 externally --help, -h show help ``` diff --git a/storage/sealer/ffiwrapper/extern_pc2.go b/storage/sealer/ffiwrapper/extern_pc2.go index fc6873188cf..de9246efcb0 100644 --- a/storage/sealer/ffiwrapper/extern_pc2.go +++ b/storage/sealer/ffiwrapper/extern_pc2.go @@ -4,14 +4,15 @@ import ( "context" "encoding/base64" "fmt" - commcid "github.com/filecoin-project/go-fil-commcid" - "github.com/filecoin-project/lotus/storage/sealer/commitment" - "golang.org/x/xerrors" "os" "os/exec" "github.com/ipfs/go-cid" + "golang.org/x/xerrors" + commcid "github.com/filecoin-project/go-fil-commcid" + + "github.com/filecoin-project/lotus/storage/sealer/commitment" "github.com/filecoin-project/lotus/storage/sealer/storiface" ) diff --git a/storage/sealer/ffiwrapper/sealer.go b/storage/sealer/ffiwrapper/sealer.go index 87311e1fe88..00374ddf5d7 100644 --- a/storage/sealer/ffiwrapper/sealer.go +++ b/storage/sealer/ffiwrapper/sealer.go @@ -2,9 +2,11 @@ package ffiwrapper import ( "context" - "github.com/filecoin-project/lotus/storage/sealer/storiface" + "github.com/ipfs/go-cid" logging "github.com/ipfs/go-log/v2" + + "github.com/filecoin-project/lotus/storage/sealer/storiface" ) var log = logging.Logger("ffiwrapper") diff --git a/storage/sealer/manager_test.go b/storage/sealer/manager_test.go index 8acd474a391..d234d75f4db 100644 --- a/storage/sealer/manager_test.go +++ b/storage/sealer/manager_test.go @@ -635,7 +635,7 @@ func TestRestartWorker(t *testing.T) { wds := syncds.MutexWrap(datastore.NewMapDatastore()) arch := make(chan chan apres) - w := newLocalWorker(func() (storiface.Storage, error) { + w := NewLocalWorkerWithExecutor(func(_ *LocalWorker) (storiface.Storage, error) { return &testExec{apch: arch}, nil }, WorkerConfig{ TaskTypes: localTasks, @@ -672,7 +672,7 @@ func TestRestartWorker(t *testing.T) { } // restart the worker - w = newLocalWorker(func() (storiface.Storage, error) { + w = NewLocalWorkerWithExecutor(func(_ *LocalWorker) (storiface.Storage, error) { return &testExec{apch: arch}, nil }, WorkerConfig{ TaskTypes: localTasks, @@ -708,7 +708,7 @@ func TestReenableWorker(t *testing.T) { wds := datastore.NewMapDatastore() arch := make(chan chan apres) - w := newLocalWorker(func() (storiface.Storage, error) { + w := NewLocalWorkerWithExecutor(func(_ *LocalWorker) (storiface.Storage, error) { return &testExec{apch: arch}, nil }, WorkerConfig{ TaskTypes: localTasks, @@ -781,7 +781,7 @@ func TestResUse(t *testing.T) { wds := syncds.MutexWrap(datastore.NewMapDatastore()) arch := make(chan chan apres) - w := newLocalWorker(func() (storiface.Storage, error) { + w := NewLocalWorkerWithExecutor(func(_ *LocalWorker) (storiface.Storage, error) { return &testExec{apch: arch}, nil }, WorkerConfig{ TaskTypes: localTasks, @@ -839,7 +839,7 @@ func TestResOverride(t *testing.T) { wds := syncds.MutexWrap(datastore.NewMapDatastore()) arch := make(chan chan apres) - w := newLocalWorker(func() (storiface.Storage, error) { + w := NewLocalWorkerWithExecutor(func(_ *LocalWorker) (storiface.Storage, error) { return &testExec{apch: arch}, nil }, WorkerConfig{ TaskTypes: localTasks, diff --git a/storage/sealer/piece_provider_test.go b/storage/sealer/piece_provider_test.go index 4cbc79a93ed..588ccea9e57 100644 --- a/storage/sealer/piece_provider_test.go +++ b/storage/sealer/piece_provider_test.go @@ -286,7 +286,7 @@ func (p *pieceProviderTestHarness) addRemoteWorker(t *testing.T, tasks []sealtas dstore := ds_sync.MutexWrap(datastore.NewMapDatastore()) csts := statestore.New(namespace.Wrap(dstore, datastore.NewKey("/stmgr/calls"))) - worker := newLocalWorker(nil, WorkerConfig{ + worker := NewLocalWorkerWithExecutor(nil, WorkerConfig{ TaskTypes: tasks, }, os.LookupEnv, remote, localStore, p.index, p.mgr, csts) diff --git a/storage/sealer/worker_local.go b/storage/sealer/worker_local.go index 24b9ff2478e..2e1a9679de4 100644 --- a/storage/sealer/worker_local.go +++ b/storage/sealer/worker_local.go @@ -47,7 +47,7 @@ type WorkerConfig struct { } // used do provide custom proofs impl (mostly used in testing) -type ExecutorFunc func() (storiface.Storage, error) +type ExecutorFunc func(w *LocalWorker) (storiface.Storage, error) type EnvFunc func(string) (string, bool) type LocalWorker struct { @@ -77,7 +77,7 @@ type LocalWorker struct { closing chan struct{} } -func newLocalWorker(executor ExecutorFunc, wcfg WorkerConfig, envLookup EnvFunc, store paths.Store, local *paths.Local, sindex paths.SectorIndex, ret storiface.WorkerReturn, cst *statestore.StateStore) *LocalWorker { +func NewLocalWorkerWithExecutor(executor ExecutorFunc, wcfg WorkerConfig, envLookup EnvFunc, store paths.Store, local *paths.Local, sindex paths.SectorIndex, ret storiface.WorkerReturn, cst *statestore.StateStore) *LocalWorker { acceptTasks := map[sealtasks.TaskType]struct{}{} for _, taskType := range wcfg.TaskTypes { acceptTasks[taskType] = struct{}{} @@ -116,7 +116,7 @@ func newLocalWorker(executor ExecutorFunc, wcfg WorkerConfig, envLookup EnvFunc, } if w.executor == nil { - w.executor = w.ffiExec + w.executor = FFIExec() } unfinished, err := w.ct.unfinished() @@ -143,7 +143,7 @@ func newLocalWorker(executor ExecutorFunc, wcfg WorkerConfig, envLookup EnvFunc, } func NewLocalWorker(wcfg WorkerConfig, store paths.Store, local *paths.Local, sindex paths.SectorIndex, ret storiface.WorkerReturn, cst *statestore.StateStore) *LocalWorker { - return newLocalWorker(nil, wcfg, os.LookupEnv, store, local, sindex, ret, cst) + return NewLocalWorkerWithExecutor(nil, wcfg, os.LookupEnv, store, local, sindex, ret, cst) } type localWorkerPathProvider struct { @@ -180,8 +180,10 @@ func (l *localWorkerPathProvider) AcquireSector(ctx context.Context, sector stor }, nil } -func (l *LocalWorker) ffiExec() (storiface.Storage, error) { - return ffiwrapper.New(&localWorkerPathProvider{w: l}) +func FFIExec(opts ...ffiwrapper.FFIWrapperOpt) func(l *LocalWorker) (storiface.Storage, error) { + return func(l *LocalWorker) (storiface.Storage, error) { + return ffiwrapper.New(&localWorkerPathProvider{w: l}, opts...) + } } type ReturnType string @@ -339,7 +341,7 @@ func doReturn(ctx context.Context, rt ReturnType, ci storiface.CallID, ret stori } func (l *LocalWorker) NewSector(ctx context.Context, sector storiface.SectorRef) error { - sb, err := l.executor() + sb, err := l.executor(l) if err != nil { return err } @@ -348,7 +350,7 @@ func (l *LocalWorker) NewSector(ctx context.Context, sector storiface.SectorRef) } func (l *LocalWorker) DataCid(ctx context.Context, pieceSize abi.UnpaddedPieceSize, pieceData storiface.Data) (storiface.CallID, error) { - sb, err := l.executor() + sb, err := l.executor(l) if err != nil { return storiface.UndefCall, err } @@ -359,7 +361,7 @@ func (l *LocalWorker) DataCid(ctx context.Context, pieceSize abi.UnpaddedPieceSi } func (l *LocalWorker) AddPiece(ctx context.Context, sector storiface.SectorRef, epcs []abi.UnpaddedPieceSize, sz abi.UnpaddedPieceSize, r io.Reader) (storiface.CallID, error) { - sb, err := l.executor() + sb, err := l.executor(l) if err != nil { return storiface.UndefCall, err } @@ -394,7 +396,7 @@ func (l *LocalWorker) SealPreCommit1(ctx context.Context, sector storiface.Secto } } - sb, err := l.executor() + sb, err := l.executor(l) if err != nil { return nil, err } @@ -404,7 +406,7 @@ func (l *LocalWorker) SealPreCommit1(ctx context.Context, sector storiface.Secto } func (l *LocalWorker) SealPreCommit2(ctx context.Context, sector storiface.SectorRef, phase1Out storiface.PreCommit1Out) (storiface.CallID, error) { - sb, err := l.executor() + sb, err := l.executor(l) if err != nil { return storiface.UndefCall, err } @@ -415,7 +417,7 @@ func (l *LocalWorker) SealPreCommit2(ctx context.Context, sector storiface.Secto } func (l *LocalWorker) SealCommit1(ctx context.Context, sector storiface.SectorRef, ticket abi.SealRandomness, seed abi.InteractiveSealRandomness, pieces []abi.PieceInfo, cids storiface.SectorCids) (storiface.CallID, error) { - sb, err := l.executor() + sb, err := l.executor(l) if err != nil { return storiface.UndefCall, err } @@ -426,7 +428,7 @@ func (l *LocalWorker) SealCommit1(ctx context.Context, sector storiface.SectorRe } func (l *LocalWorker) SealCommit2(ctx context.Context, sector storiface.SectorRef, phase1Out storiface.Commit1Out) (storiface.CallID, error) { - sb, err := l.executor() + sb, err := l.executor(l) if err != nil { return storiface.UndefCall, err } @@ -437,7 +439,7 @@ func (l *LocalWorker) SealCommit2(ctx context.Context, sector storiface.SectorRe } func (l *LocalWorker) ReplicaUpdate(ctx context.Context, sector storiface.SectorRef, pieces []abi.PieceInfo) (storiface.CallID, error) { - sb, err := l.executor() + sb, err := l.executor(l) if err != nil { return storiface.UndefCall, err } @@ -449,7 +451,7 @@ func (l *LocalWorker) ReplicaUpdate(ctx context.Context, sector storiface.Sector } func (l *LocalWorker) ProveReplicaUpdate1(ctx context.Context, sector storiface.SectorRef, sectorKey, newSealed, newUnsealed cid.Cid) (storiface.CallID, error) { - sb, err := l.executor() + sb, err := l.executor(l) if err != nil { return storiface.UndefCall, err } @@ -460,7 +462,7 @@ func (l *LocalWorker) ProveReplicaUpdate1(ctx context.Context, sector storiface. } func (l *LocalWorker) ProveReplicaUpdate2(ctx context.Context, sector storiface.SectorRef, sectorKey, newSealed, newUnsealed cid.Cid, vanillaProofs storiface.ReplicaVanillaProofs) (storiface.CallID, error) { - sb, err := l.executor() + sb, err := l.executor(l) if err != nil { return storiface.UndefCall, err } @@ -471,7 +473,7 @@ func (l *LocalWorker) ProveReplicaUpdate2(ctx context.Context, sector storiface. } func (l *LocalWorker) GenerateSectorKeyFromData(ctx context.Context, sector storiface.SectorRef, commD cid.Cid) (storiface.CallID, error) { - sb, err := l.executor() + sb, err := l.executor(l) if err != nil { return storiface.UndefCall, err } @@ -482,7 +484,7 @@ func (l *LocalWorker) GenerateSectorKeyFromData(ctx context.Context, sector stor } func (l *LocalWorker) FinalizeSector(ctx context.Context, sector storiface.SectorRef) (storiface.CallID, error) { - sb, err := l.executor() + sb, err := l.executor(l) if err != nil { return storiface.UndefCall, err } @@ -493,7 +495,7 @@ func (l *LocalWorker) FinalizeSector(ctx context.Context, sector storiface.Secto } func (l *LocalWorker) FinalizeReplicaUpdate(ctx context.Context, sector storiface.SectorRef) (storiface.CallID, error) { - sb, err := l.executor() + sb, err := l.executor(l) if err != nil { return storiface.UndefCall, err } @@ -504,7 +506,7 @@ func (l *LocalWorker) FinalizeReplicaUpdate(ctx context.Context, sector storifac } func (l *LocalWorker) ReleaseUnsealed(ctx context.Context, sector storiface.SectorRef, keepUnsealed []storiface.Range) (storiface.CallID, error) { - sb, err := l.executor() + sb, err := l.executor(l) if err != nil { return storiface.UndefCall, err } @@ -560,7 +562,7 @@ func (l *LocalWorker) MoveStorage(ctx context.Context, sector storiface.SectorRe } func (l *LocalWorker) UnsealPiece(ctx context.Context, sector storiface.SectorRef, index storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, randomness abi.SealRandomness, cid cid.Cid) (storiface.CallID, error) { - sb, err := l.executor() + sb, err := l.executor(l) if err != nil { return storiface.UndefCall, err } @@ -586,7 +588,7 @@ func (l *LocalWorker) UnsealPiece(ctx context.Context, sector storiface.SectorRe } func (l *LocalWorker) DownloadSectorData(ctx context.Context, sector storiface.SectorRef, finalized bool, src map[storiface.SectorFileType]storiface.SectorLocation) (storiface.CallID, error) { - sb, err := l.executor() + sb, err := l.executor(l) if err != nil { return storiface.UndefCall, err } @@ -597,7 +599,7 @@ func (l *LocalWorker) DownloadSectorData(ctx context.Context, sector storiface.S } func (l *LocalWorker) GenerateWinningPoSt(ctx context.Context, ppt abi.RegisteredPoStProof, mid abi.ActorID, sectors []storiface.PostSectorChallenge, randomness abi.PoStRandomness) ([]proof.PoStProof, error) { - sb, err := l.executor() + sb, err := l.executor(l) if err != nil { return nil, err } @@ -642,7 +644,7 @@ func (l *LocalWorker) GenerateWinningPoSt(ctx context.Context, ppt abi.Registere } func (l *LocalWorker) GenerateWindowPoSt(ctx context.Context, ppt abi.RegisteredPoStProof, mid abi.ActorID, sectors []storiface.PostSectorChallenge, partitionIdx int, randomness abi.PoStRandomness) (storiface.WindowPoStResult, error) { - sb, err := l.executor() + sb, err := l.executor(l) if err != nil { return storiface.WindowPoStResult{}, err }