Skip to content

Commit

Permalink
support external pc2 in lotus-worker
Browse files Browse the repository at this point in the history
  • Loading branch information
magik6k committed Nov 14, 2023
1 parent fac122c commit 4dd5886
Show file tree
Hide file tree
Showing 9 changed files with 120 additions and 49 deletions.
58 changes: 51 additions & 7 deletions cmd/lotus-worker/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
"github.com/filecoin-project/lotus/node/repo"
"github.com/filecoin-project/lotus/storage/paths"
"github.com/filecoin-project/lotus/storage/sealer"
"github.com/filecoin-project/lotus/storage/sealer/ffiwrapper"
"github.com/filecoin-project/lotus/storage/sealer/sealtasks"
"github.com/filecoin-project/lotus/storage/sealer/storiface"
)
Expand Down Expand Up @@ -284,7 +285,36 @@ var runCmd = &cli.Command{
Value: true,
DefaultText: "inherits --addpiece",
},
&cli.StringFlag{
Name: "external-pc2",
Usage: "command for computing PC2 externally",
},
},
Description: `Run lotus-worker.
--external-pc2 can be used to compute the PreCommit2 inputs externally.
The flag behaves similarly to the related lotus-worker flag, using it in
lotus-bench may be useful for testing if the external PreCommit2 command is
invoked correctly.
The command will be called with a number of environment variables set:
* EXTSEAL_PC2_SECTOR_NUM: the sector number
* EXTSEAL_PC2_SECTOR_MINER: the miner id
* EXTSEAL_PC2_PROOF_TYPE: the proof type
* EXTSEAL_PC2_SECTOR_SIZE: the sector size in bytes
* EXTSEAL_PC2_CACHE: the path to the cache directory
* EXTSEAL_PC2_SEALED: the path to the sealed sector file (initialized with unsealed data by the caller)
* EXTSEAL_PC2_PC1OUT: output from rust-fil-proofs precommit1 phase (base64 encoded json)
The command is expected to:
* Create cache sc-02-data-tree-r* files
* Create cache sc-02-data-tree-c* files
* Create cache p_aux / t_aux files
* Transform the sealed file in place
Example invocation of lotus-bench as external executor:
'./lotus-bench simple precommit2 --sector-size $EXTSEAL_PC2_SECTOR_SIZE $EXTSEAL_PC2_SEALED $EXTSEAL_PC2_CACHE $EXTSEAL_PC2_PC1OUT'
`,
Before: func(cctx *cli.Context) error {
if cctx.IsSet("address") {
log.Warnf("The '--address' flag is deprecated, it has been replaced by '--listen'")
Expand Down Expand Up @@ -623,18 +653,32 @@ var runCmd = &cli.Command{
fh.ServeHTTP(w, r)
}

// Parse ffi executor flags

var ffiOpts []ffiwrapper.FFIWrapperOpt

if cctx.IsSet("external-pc2") {
extSeal := ffiwrapper.ExternalSealer{
PreCommit2: ffiwrapper.MakeExternPrecommit2(cctx.String("external-pc2")),
}

ffiOpts = append(ffiOpts, ffiwrapper.WithExternalSealCalls(extSeal))
}

// Create / expose the worker

wsts := statestore.New(namespace.Wrap(ds, modules.WorkerCallsPrefix))

workerApi := &sealworker.Worker{
LocalWorker: sealer.NewLocalWorker(sealer.WorkerConfig{
TaskTypes: taskTypes,
NoSwap: cctx.Bool("no-swap"),
MaxParallelChallengeReads: cctx.Int("post-parallel-reads"),
ChallengeReadTimeout: cctx.Duration("post-read-timeout"),
Name: cctx.String("name"),
}, remote, localStore, nodeApi, nodeApi, wsts),
LocalWorker: sealer.NewLocalWorkerWithExecutor(
sealer.FFIExec(ffiOpts...),
sealer.WorkerConfig{
TaskTypes: taskTypes,
NoSwap: cctx.Bool("no-swap"),
MaxParallelChallengeReads: cctx.Int("post-parallel-reads"),
ChallengeReadTimeout: cctx.Duration("post-read-timeout"),
Name: cctx.String("name"),
}, os.LookupEnv, remote, localStore, nodeApi, nodeApi, wsts),
LocalStore: localStore,
Storage: lr,
}
Expand Down
28 changes: 28 additions & 0 deletions documentation/en/cli-lotus-worker.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,33 @@ NAME:
USAGE:
lotus-worker run [command options] [arguments...]
DESCRIPTION:
Run lotus-worker.
--external-pc2 can be used to compute the PreCommit2 inputs externally.
The flag behaves similarly to the related lotus-worker flag, using it in
lotus-bench may be useful for testing if the external PreCommit2 command is
invoked correctly.
The command will be called with a number of environment variables set:
* EXTSEAL_PC2_SECTOR_NUM: the sector number
* EXTSEAL_PC2_SECTOR_MINER: the miner id
* EXTSEAL_PC2_PROOF_TYPE: the proof type
* EXTSEAL_PC2_SECTOR_SIZE: the sector size in bytes
* EXTSEAL_PC2_CACHE: the path to the cache directory
* EXTSEAL_PC2_SEALED: the path to the sealed sector file (initialized with unsealed data by the caller)
* EXTSEAL_PC2_PC1OUT: output from rust-fil-proofs precommit1 phase (base64 encoded json)
The command is expected to:
* Create cache sc-02-data-tree-r* files
* Create cache sc-02-data-tree-c* files
* Create cache p_aux / t_aux files
* Transform the sealed file in place
Example invocation of lotus-bench as external executor:
'./lotus-bench simple precommit2 --sector-size $EXTSEAL_PC2_SECTOR_SIZE $EXTSEAL_PC2_SEALED $EXTSEAL_PC2_CACHE $EXTSEAL_PC2_PC1OUT'
OPTIONS:
--listen value host address and port the worker api will listen on (default: "0.0.0.0:3456") [$LOTUS_WORKER_LISTEN]
--no-local-storage don't use storageminer repo for sector storage (default: false) [$LOTUS_WORKER_NO_LOCAL_STORAGE]
Expand All @@ -57,6 +84,7 @@ OPTIONS:
--timeout value used when 'listen' is unspecified. must be a valid duration recognized by golang's time.ParseDuration function (default: "30m") [$LOTUS_WORKER_TIMEOUT]
--http-server-timeout value (default: "30s")
--data-cid Run the data-cid task. true|false (default: inherits --addpiece)
--external-pc2 value command for computing PC2 externally
--help, -h show help
```

Expand Down
3 changes: 0 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,6 @@ require (
github.com/ipni/index-provider v0.12.0
github.com/kelseyhightower/envconfig v1.4.0
github.com/koalacxr/quantile v0.0.1
github.com/kr/pretty v0.3.1
github.com/libp2p/go-buffer-pool v0.1.0
github.com/libp2p/go-libp2p v0.31.0
github.com/libp2p/go-libp2p-consensus v0.0.1
Expand Down Expand Up @@ -260,7 +259,6 @@ require (
github.com/klauspost/compress v1.16.7 // indirect
github.com/klauspost/cpuid/v2 v2.2.5 // indirect
github.com/koron/go-ssdp v0.0.4 // indirect
github.com/kr/text v0.2.0 // indirect
github.com/libp2p/go-cidranger v1.1.0 // indirect
github.com/libp2p/go-flow-metrics v0.1.0 // indirect
github.com/libp2p/go-libp2p-asn-util v0.3.0 // indirect
Expand Down Expand Up @@ -303,7 +301,6 @@ require (
github.com/quic-go/quic-go v0.38.1 // indirect
github.com/quic-go/webtransport-go v0.5.3 // indirect
github.com/rivo/uniseg v0.1.0 // indirect
github.com/rogpeppe/go-internal v1.9.0 // indirect
github.com/rs/cors v1.7.0 // indirect
github.com/russross/blackfriday/v2 v2.1.0 // indirect
github.com/shirou/gopsutil v2.18.12+incompatible // indirect
Expand Down
3 changes: 0 additions & 3 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -937,7 +937,6 @@ github.com/kr/pretty v0.2.0/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfn
github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI=
github.com/kr/pretty v0.3.0/go.mod h1:640gp4NfQd8pI5XOwp5fnNeVWj67G7CFk/SaSQn7NBk=
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/pty v1.1.3/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/pty v1.1.5/go.mod h1:9r2w37qlBe7rQ6e1fg1S/9xpWHSnaqNdHD3WcMdbPDA=
Expand Down Expand Up @@ -1388,7 +1387,6 @@ github.com/petar/GoLLRB v0.0.0-20210522233825-ae3b015fd3e9 h1:1/WtZae0yGtPq+TI6+
github.com/petar/GoLLRB v0.0.0-20210522233825-ae3b015fd3e9/go.mod h1:x3N5drFsm2uilKKuuYo6LdyD8vZAW55sH/9w+pbo1sw=
github.com/pierrec/lz4 v1.0.2-0.20190131084431-473cd7ce01a1/go.mod h1:3/3N9NVKO0jef7pBehbT1qWhCMrIgbYNnFAZCqQ5LRc=
github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY=
github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
Expand Down Expand Up @@ -1478,7 +1476,6 @@ github.com/rogpeppe/go-internal v1.1.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFR
github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc=
github.com/rogpeppe/go-internal v1.9.0 h1:73kH8U+JUqXU8lRuOHeVHaa/SZPifC7BkcraZVejAe8=
github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs=
github.com/rs/cors v1.7.0 h1:+88SsELBHx5r+hZ8TCkggzSstaWNbDvThkVK8H6f9ik=
github.com/rs/cors v1.7.0/go.mod h1:gFx+x8UowdsKA9AchylcLynDq+nNFfI8FkUZdN/jGCU=
github.com/rs/xid v1.2.1/go.mod h1:+uKXf+4Djp6Md1KODXJxgGQPKngRmWyn10oCKFzNHOQ=
Expand Down
7 changes: 4 additions & 3 deletions storage/sealer/ffiwrapper/extern_pc2.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,15 @@ import (
"context"
"encoding/base64"
"fmt"
commcid "github.com/filecoin-project/go-fil-commcid"
"github.com/filecoin-project/lotus/storage/sealer/commitment"
"golang.org/x/xerrors"
"os"
"os/exec"

"github.com/ipfs/go-cid"
"golang.org/x/xerrors"

commcid "github.com/filecoin-project/go-fil-commcid"

"github.com/filecoin-project/lotus/storage/sealer/commitment"
"github.com/filecoin-project/lotus/storage/sealer/storiface"
)

Expand Down
4 changes: 3 additions & 1 deletion storage/sealer/ffiwrapper/sealer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@ package ffiwrapper

import (
"context"
"github.com/filecoin-project/lotus/storage/sealer/storiface"

"github.com/ipfs/go-cid"
logging "github.com/ipfs/go-log/v2"

"github.com/filecoin-project/lotus/storage/sealer/storiface"
)

var log = logging.Logger("ffiwrapper")
Expand Down
10 changes: 5 additions & 5 deletions storage/sealer/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -648,7 +648,7 @@ func TestRestartWorker(t *testing.T) {
wds := syncds.MutexWrap(datastore.NewMapDatastore())

arch := make(chan chan apres)
w := newLocalWorker(func() (storiface.Storage, error) {
w := NewLocalWorkerWithExecutor(func(_ *LocalWorker) (storiface.Storage, error) {
return &testExec{apch: arch}, nil
}, WorkerConfig{
TaskTypes: localTasks,
Expand Down Expand Up @@ -685,7 +685,7 @@ func TestRestartWorker(t *testing.T) {
}

// restart the worker
w = newLocalWorker(func() (storiface.Storage, error) {
w = NewLocalWorkerWithExecutor(func(_ *LocalWorker) (storiface.Storage, error) {
return &testExec{apch: arch}, nil
}, WorkerConfig{
TaskTypes: localTasks,
Expand Down Expand Up @@ -721,7 +721,7 @@ func TestReenableWorker(t *testing.T) {
wds := datastore.NewMapDatastore()

arch := make(chan chan apres)
w := newLocalWorker(func() (storiface.Storage, error) {
w := NewLocalWorkerWithExecutor(func(_ *LocalWorker) (storiface.Storage, error) {
return &testExec{apch: arch}, nil
}, WorkerConfig{
TaskTypes: localTasks,
Expand Down Expand Up @@ -794,7 +794,7 @@ func TestResUse(t *testing.T) {
wds := syncds.MutexWrap(datastore.NewMapDatastore())

arch := make(chan chan apres)
w := newLocalWorker(func() (storiface.Storage, error) {
w := NewLocalWorkerWithExecutor(func(_ *LocalWorker) (storiface.Storage, error) {
return &testExec{apch: arch}, nil
}, WorkerConfig{
TaskTypes: localTasks,
Expand Down Expand Up @@ -852,7 +852,7 @@ func TestResOverride(t *testing.T) {
wds := syncds.MutexWrap(datastore.NewMapDatastore())

arch := make(chan chan apres)
w := newLocalWorker(func() (storiface.Storage, error) {
w := NewLocalWorkerWithExecutor(func(_ *LocalWorker) (storiface.Storage, error) {
return &testExec{apch: arch}, nil
}, WorkerConfig{
TaskTypes: localTasks,
Expand Down
2 changes: 1 addition & 1 deletion storage/sealer/piece_provider_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,7 @@ func (p *pieceProviderTestHarness) addRemoteWorker(t *testing.T, tasks []sealtas
dstore := ds_sync.MutexWrap(datastore.NewMapDatastore())
csts := statestore.New(namespace.Wrap(dstore, datastore.NewKey("/stmgr/calls")))

worker := newLocalWorker(nil, WorkerConfig{
worker := NewLocalWorkerWithExecutor(nil, WorkerConfig{
TaskTypes: tasks,
}, os.LookupEnv, remote, localStore, p.index, p.mgr, csts)

Expand Down
Loading

0 comments on commit 4dd5886

Please sign in to comment.