Skip to content

Commit

Permalink
support external pc2 in lotus-worker
Browse files Browse the repository at this point in the history
  • Loading branch information
magik6k committed Aug 21, 2023
1 parent 7d8eb93 commit 8b9c17a
Show file tree
Hide file tree
Showing 7 changed files with 118 additions and 41 deletions.
58 changes: 51 additions & 7 deletions cmd/lotus-worker/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
"github.com/filecoin-project/lotus/node/repo"
"github.com/filecoin-project/lotus/storage/paths"
"github.com/filecoin-project/lotus/storage/sealer"
"github.com/filecoin-project/lotus/storage/sealer/ffiwrapper"
"github.com/filecoin-project/lotus/storage/sealer/sealtasks"
"github.com/filecoin-project/lotus/storage/sealer/storiface"
)
Expand Down Expand Up @@ -282,7 +283,36 @@ var runCmd = &cli.Command{
Value: true,
DefaultText: "inherits --addpiece",
},
&cli.StringFlag{
Name: "external-pc2",
Usage: "command for computing PC2 externally",
},
},
Description: `Run lotus-worker.
--external-pc2 can be used to compute the PreCommit2 inputs externally.
The flag behaves similarly to the related lotus-worker flag, using it in
lotus-bench may be useful for testing if the external PreCommit2 command is
invoked correctly.
The command will be called with a number of environment variables set:
* EXTSEAL_PC2_SECTOR_NUM: the sector number
* EXTSEAL_PC2_SECTOR_MINER: the miner id
* EXTSEAL_PC2_PROOF_TYPE: the proof type
* EXTSEAL_PC2_SECTOR_SIZE: the sector size in bytes
* EXTSEAL_PC2_CACHE: the path to the cache directory
* EXTSEAL_PC2_SEALED: the path to the sealed sector file (initialized with unsealed data by the caller)
* EXTSEAL_PC2_PC1OUT: output from rust-fil-proofs precommit1 phase (base64 encoded json)
The command is expected to:
* Create cache sc-02-data-tree-r* files
* Create cache sc-02-data-tree-c* files
* Create cache p_aux / t_aux files
* Transform the sealed file in place
Example invocation of lotus-bench as external executor:
'./lotus-bench simple precommit2 --sector-size $EXTSEAL_PC2_SECTOR_SIZE $EXTSEAL_PC2_SEALED $EXTSEAL_PC2_CACHE $EXTSEAL_PC2_PC1OUT'
`,
Before: func(cctx *cli.Context) error {
if cctx.IsSet("address") {
log.Warnf("The '--address' flag is deprecated, it has been replaced by '--listen'")
Expand Down Expand Up @@ -587,18 +617,32 @@ var runCmd = &cli.Command{
fh.ServeHTTP(w, r)
}

// Parse ffi executor flags

var ffiOpts []ffiwrapper.FFIWrapperOpt

if cctx.IsSet("external-pc2") {
extSeal := ffiwrapper.ExternalSealer{
PreCommit2: ffiwrapper.MakeExternPrecommit2(cctx.String("external-pc2")),
}

ffiOpts = append(ffiOpts, ffiwrapper.WithExternalSealCalls(extSeal))
}

// Create / expose the worker

wsts := statestore.New(namespace.Wrap(ds, modules.WorkerCallsPrefix))

workerApi := &sealworker.Worker{
LocalWorker: sealer.NewLocalWorker(sealer.WorkerConfig{
TaskTypes: taskTypes,
NoSwap: cctx.Bool("no-swap"),
MaxParallelChallengeReads: cctx.Int("post-parallel-reads"),
ChallengeReadTimeout: cctx.Duration("post-read-timeout"),
Name: cctx.String("name"),
}, remote, localStore, nodeApi, nodeApi, wsts),
LocalWorker: sealer.NewLocalWorkerWithExecutor(
sealer.FFIExec(ffiOpts...),
sealer.WorkerConfig{
TaskTypes: taskTypes,
NoSwap: cctx.Bool("no-swap"),
MaxParallelChallengeReads: cctx.Int("post-parallel-reads"),
ChallengeReadTimeout: cctx.Duration("post-read-timeout"),
Name: cctx.String("name"),
}, os.LookupEnv, remote, localStore, nodeApi, nodeApi, wsts),
LocalStore: localStore,
Storage: lr,
}
Expand Down
28 changes: 28 additions & 0 deletions documentation/en/cli-lotus-worker.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,33 @@ NAME:
USAGE:
lotus-worker run [command options] [arguments...]
DESCRIPTION:
Run lotus-worker.
--external-pc2 can be used to compute the PreCommit2 inputs externally.
The flag behaves similarly to the related lotus-worker flag, using it in
lotus-bench may be useful for testing if the external PreCommit2 command is
invoked correctly.
The command will be called with a number of environment variables set:
* EXTSEAL_PC2_SECTOR_NUM: the sector number
* EXTSEAL_PC2_SECTOR_MINER: the miner id
* EXTSEAL_PC2_PROOF_TYPE: the proof type
* EXTSEAL_PC2_SECTOR_SIZE: the sector size in bytes
* EXTSEAL_PC2_CACHE: the path to the cache directory
* EXTSEAL_PC2_SEALED: the path to the sealed sector file (initialized with unsealed data by the caller)
* EXTSEAL_PC2_PC1OUT: output from rust-fil-proofs precommit1 phase (base64 encoded json)
The command is expected to:
* Create cache sc-02-data-tree-r* files
* Create cache sc-02-data-tree-c* files
* Create cache p_aux / t_aux files
* Transform the sealed file in place
Example invocation of lotus-bench as external executor:
'./lotus-bench simple precommit2 --sector-size $EXTSEAL_PC2_SECTOR_SIZE $EXTSEAL_PC2_SEALED $EXTSEAL_PC2_CACHE $EXTSEAL_PC2_PC1OUT'
OPTIONS:
--listen value host address and port the worker api will listen on (default: "0.0.0.0:3456") [$LOTUS_WORKER_LISTEN]
--no-local-storage don't use storageminer repo for sector storage (default: false) [$LOTUS_WORKER_NO_LOCAL_STORAGE]
Expand All @@ -57,6 +84,7 @@ OPTIONS:
--timeout value used when 'listen' is unspecified. must be a valid duration recognized by golang's time.ParseDuration function (default: "30m") [$LOTUS_WORKER_TIMEOUT]
--http-server-timeout value (default: "30s")
--data-cid Run the data-cid task. true|false (default: inherits --addpiece)
--external-pc2 value command for computing PC2 externally
--help, -h show help
```

Expand Down
7 changes: 4 additions & 3 deletions storage/sealer/ffiwrapper/extern_pc2.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,15 @@ import (
"context"
"encoding/base64"
"fmt"
commcid "github.com/filecoin-project/go-fil-commcid"
"github.com/filecoin-project/lotus/storage/sealer/commitment"
"golang.org/x/xerrors"
"os"
"os/exec"

"github.com/ipfs/go-cid"
"golang.org/x/xerrors"

commcid "github.com/filecoin-project/go-fil-commcid"

"github.com/filecoin-project/lotus/storage/sealer/commitment"
"github.com/filecoin-project/lotus/storage/sealer/storiface"
)

Expand Down
4 changes: 3 additions & 1 deletion storage/sealer/ffiwrapper/sealer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@ package ffiwrapper

import (
"context"
"github.com/filecoin-project/lotus/storage/sealer/storiface"

"github.com/ipfs/go-cid"
logging "github.com/ipfs/go-log/v2"

"github.com/filecoin-project/lotus/storage/sealer/storiface"
)

var log = logging.Logger("ffiwrapper")
Expand Down
10 changes: 5 additions & 5 deletions storage/sealer/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -635,7 +635,7 @@ func TestRestartWorker(t *testing.T) {
wds := syncds.MutexWrap(datastore.NewMapDatastore())

arch := make(chan chan apres)
w := newLocalWorker(func() (storiface.Storage, error) {
w := NewLocalWorkerWithExecutor(func(_ *LocalWorker) (storiface.Storage, error) {
return &testExec{apch: arch}, nil
}, WorkerConfig{
TaskTypes: localTasks,
Expand Down Expand Up @@ -672,7 +672,7 @@ func TestRestartWorker(t *testing.T) {
}

// restart the worker
w = newLocalWorker(func() (storiface.Storage, error) {
w = NewLocalWorkerWithExecutor(func(_ *LocalWorker) (storiface.Storage, error) {
return &testExec{apch: arch}, nil
}, WorkerConfig{
TaskTypes: localTasks,
Expand Down Expand Up @@ -708,7 +708,7 @@ func TestReenableWorker(t *testing.T) {
wds := datastore.NewMapDatastore()

arch := make(chan chan apres)
w := newLocalWorker(func() (storiface.Storage, error) {
w := NewLocalWorkerWithExecutor(func(_ *LocalWorker) (storiface.Storage, error) {
return &testExec{apch: arch}, nil
}, WorkerConfig{
TaskTypes: localTasks,
Expand Down Expand Up @@ -781,7 +781,7 @@ func TestResUse(t *testing.T) {
wds := syncds.MutexWrap(datastore.NewMapDatastore())

arch := make(chan chan apres)
w := newLocalWorker(func() (storiface.Storage, error) {
w := NewLocalWorkerWithExecutor(func(_ *LocalWorker) (storiface.Storage, error) {
return &testExec{apch: arch}, nil
}, WorkerConfig{
TaskTypes: localTasks,
Expand Down Expand Up @@ -839,7 +839,7 @@ func TestResOverride(t *testing.T) {
wds := syncds.MutexWrap(datastore.NewMapDatastore())

arch := make(chan chan apres)
w := newLocalWorker(func() (storiface.Storage, error) {
w := NewLocalWorkerWithExecutor(func(_ *LocalWorker) (storiface.Storage, error) {
return &testExec{apch: arch}, nil
}, WorkerConfig{
TaskTypes: localTasks,
Expand Down
2 changes: 1 addition & 1 deletion storage/sealer/piece_provider_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,7 @@ func (p *pieceProviderTestHarness) addRemoteWorker(t *testing.T, tasks []sealtas
dstore := ds_sync.MutexWrap(datastore.NewMapDatastore())
csts := statestore.New(namespace.Wrap(dstore, datastore.NewKey("/stmgr/calls")))

worker := newLocalWorker(nil, WorkerConfig{
worker := NewLocalWorkerWithExecutor(nil, WorkerConfig{
TaskTypes: tasks,
}, os.LookupEnv, remote, localStore, p.index, p.mgr, csts)

Expand Down
Loading

0 comments on commit 8b9c17a

Please sign in to comment.