diff --git a/.circleci/config.yml b/.circleci/config.yml index 1614daf8e6a..f0262e5b6ea 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -850,6 +850,11 @@ workflows: suite: itest-get_messages_in_ts target: "./itests/get_messages_in_ts_test.go" + - test: + name: test-itest-mempool + suite: itest-mempool + target: "./itests/mempool_test.go" + - test: name: test-itest-multisig suite: itest-multisig diff --git a/CHANGELOG.md b/CHANGELOG.md index a420421de89..170387046e0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,77 @@ # Lotus changelog +# 1.14.1 / 2022-02-18 + +This is an **optional** release of lotus, that fixes the incorrect *comment* of network v15 OhSnap upgrade **date**. Note the actual upgrade epoch in [v1.14.0](https://github.com/filecoin-project/lotus/releases/tag/v1.14.0) was correct. + +# 1.14.0 / 2022-02-17 + +This is a MANDATORY release of Lotus that introduces [Filecoin network v15, +codenamed the OhSnap upgrade](https://github.com/filecoin-project/community/discussions/74?sort=new#discussioncomment-1922550). + +The network is scheduled to upgrade to v15 on March 1st at 2022-03-01T15:00:00Z. All node operators, including storage providers, must upgrade to this release (or a later release) before that time. Storage providers must update their daemons, miners, and worker(s). + +The OhSnap upgrade introduces the following FIPs, delivered in [actors v7](https://github.com/filecoin-project/specs-actors/releases/tag/v7.0.0): +- [FIP-0019 Snap Deals](https://github.com/filecoin-project/FIPs/blob/master/FIPS/fip-0019.md) +- [FIP-0028 Remove Datacap from Verified clients](https://github.com/filecoin-project/FIPs/pull/226) + +It is recommended that storage providers download the new params before updating their node, miner, and workers. To do so: + +- Download Lotus v1.14.0 or later +- run `make lotus-shed` +- run `./lotus-shed fetch-params` with the appropriate `proving-params` flag +- Upgrade the Lotus daemon and miner **when the previous step is complete** + +All node operators, including storage providers, should be aware that a pre-migration will begin at 2022-03-01T13:30:00Z (150 minutes before the real upgrade). The pre-migration will take between 20 and 50 minutes, depending on hardware specs. During this time, expect slower block validation times, increased CPU and memory usage, and longer delays for API queries. + +## New Features and Changes +- Integrate actor v7-rc1: + - Integrate v7 actors ([#7617](https://github.com/filecoin-project/lotus/pull/7617)) + - feat: state: Fast migration for v15 ([#7933](https://github.com/filecoin-project/lotus/pull/7933)) + - fix: blockstore: Add missing locks to autobatch::Get() [#7939](https://github.com/filecoin-project/lotus/pull/7939)) + - correctness fixes for the autobatch blockstore ([#7940](https://github.com/filecoin-project/lotus/pull/7940)) +- Implement and support [FIP-0019 Snap Deals](https://github.com/filecoin-project/FIPs/blob/master/FIPS/fip-0019.md) + - chore: deps: Integrate proof v11.0.0 ([#7923](https://github.com/filecoin-project/lotus/pull/7923)) + - Snap Deals Lotus Integration: FSM Posting and integration test ([#7810](https://github.com/filecoin-project/lotus/pull/7810)) + - Feat/sector storage unseal ([#7730](https://github.com/filecoin-project/lotus/pull/7730)) + - Feat/snap deals storage ([#7615](https://github.com/filecoin-project/lotus/pull/7615)) + - fix: sealing: Add more deal expiration checks during PRU pipeline ([#7871](https://github.com/filecoin-project/lotus/pull/7871)) + - chore: deps: Update go-paramfetch ([#7917](https://github.com/filecoin-project/lotus/pull/7917)) + - feat: #7880 gas: add gas charge for VerifyReplicaUpdate ([#7897](https://github.com/filecoin-project/lotus/pull/7897)) + - enhancement: sectors: disable existing cc upgrade path 2 days before the upgrade epoch ([#7900](https://github.com/filecoin-project/lotus/pull/7900)) + +## Improvements +- updating to new datastore/blockstore code with contexts ([#7646](https://github.com/filecoin-project/lotus/pull/7646)) +- reorder transfer checks so as to ensure sending 2B FIL to yourself fails if you don't have that amount ([#7637](https://github.com/filecoin-project/lotus/pull/7637)) +- VM: Circ supply should be constant per epoch ([#7811](https://github.com/filecoin-project/lotus/pull/7811)) + +## Bug Fixes +- Fix: state: circsuypply calc around null blocks ([#7890](https://github.com/filecoin-project/lotus/pull/7890)) +- Mempool msg selection should respect block message limits ([#7321](https://github.com/filecoin-project/lotus/pull/7321)) + SplitStore: supress compaction near upgrades ([#7734](https://github.com/filecoin-project/lotus/pull/7734)) + +## Others +- chore: create pull_request_template.md ([#7726](https://github.com/filecoin-project/lotus/pull/7726)) + +## Contributors + +| Contributor | Commits | Lines ± | Files Changed | +|-------------|---------|---------|---------------| +| Aayush Rajasekaran | 41 | +5538/-1205 | 189 | +| zenground0 | 11 | +3316/-524 | 124 | +| Jennifer Wang | 29 | +714/-599 | 68 | +| ZenGround0 | 3 | +263/-25 | 11 | +| c r | 2 | +198/-30 | 6 | +| vyzo | 4 | +189/-7 | 7 | +| Aayush | 11 | +146/-48 | 49 | +| web3-bot | 10 | +99/-17 | 10 | +| Steven Allen | 1 | +55/-37 | 1 | +| Jiaying Wang | 5 | +30/-8 | 5 | +| Jakub Sztandera | 2 | +8/-3 | 3 | +| Łukasz Magiera | 1 | +3/-3 | 2 | +| Travis Person | 1 | +2/-2 | 2 | +| Rod Vagg | 1 | +2/-2 | 2 | + # v1.13.2 / 2022-01-09 Lotus v1.13.2 is a *highly recommended* feature release with remarkable retrieval improvements, new features like diff --git a/Makefile b/Makefile index f7b13cc188c..f91e74e3356 100644 --- a/Makefile +++ b/Makefile @@ -345,6 +345,8 @@ gen: actors-gen type-gen method-gen cfgdoc-gen docsgen api-gen circleci @echo ">>> IF YOU'VE MODIFIED THE CLI OR CONFIG, REMEMBER TO ALSO MAKE docsgen-cli" .PHONY: gen +jen: gen + snap: lotus lotus-miner lotus-worker snapcraft # snapcraft upload ./lotus_*.snap diff --git a/api/api_gateway.go b/api/api_gateway.go index fbe2e0cd66e..be4b3b83cb4 100644 --- a/api/api_gateway.go +++ b/api/api_gateway.go @@ -45,8 +45,9 @@ type Gateway interface { GasEstimateMessageGas(ctx context.Context, msg *types.Message, spec *MessageSendSpec, tsk types.TipSetKey) (*types.Message, error) MpoolPush(ctx context.Context, sm *types.SignedMessage) (cid.Cid, error) MsigGetAvailableBalance(ctx context.Context, addr address.Address, tsk types.TipSetKey) (types.BigInt, error) - MsigGetVested(ctx context.Context, addr address.Address, start types.TipSetKey, end types.TipSetKey) (types.BigInt, error) MsigGetPending(context.Context, address.Address, types.TipSetKey) ([]*MsigTransaction, error) + MsigGetVested(ctx context.Context, addr address.Address, start types.TipSetKey, end types.TipSetKey) (types.BigInt, error) + MsigGetVestingSchedule(ctx context.Context, addr address.Address, tsk types.TipSetKey) (MsigVesting, error) StateAccountKey(ctx context.Context, addr address.Address, tsk types.TipSetKey) (address.Address, error) StateDealProviderCollateralBounds(ctx context.Context, size abi.PaddedPieceSize, verified bool, tsk types.TipSetKey) (DealCollateralBounds, error) StateGetActor(ctx context.Context, actor address.Address, ts types.TipSetKey) (*types.Actor, error) diff --git a/api/proxy_gen.go b/api/proxy_gen.go index 15866e3e50b..73aa2c774e5 100644 --- a/api/proxy_gen.go +++ b/api/proxy_gen.go @@ -516,6 +516,8 @@ type GatewayStruct struct { MsigGetVested func(p0 context.Context, p1 address.Address, p2 types.TipSetKey, p3 types.TipSetKey) (types.BigInt, error) `` + MsigGetVestingSchedule func(p0 context.Context, p1 address.Address, p2 types.TipSetKey) (MsigVesting, error) `` + StateAccountKey func(p0 context.Context, p1 address.Address, p2 types.TipSetKey) (address.Address, error) `` StateDealProviderCollateralBounds func(p0 context.Context, p1 abi.PaddedPieceSize, p2 bool, p3 types.TipSetKey) (DealCollateralBounds, error) `` @@ -3285,6 +3287,17 @@ func (s *GatewayStub) MsigGetVested(p0 context.Context, p1 address.Address, p2 t return *new(types.BigInt), ErrNotSupported } +func (s *GatewayStruct) MsigGetVestingSchedule(p0 context.Context, p1 address.Address, p2 types.TipSetKey) (MsigVesting, error) { + if s.Internal.MsigGetVestingSchedule == nil { + return *new(MsigVesting), ErrNotSupported + } + return s.Internal.MsigGetVestingSchedule(p0, p1, p2) +} + +func (s *GatewayStub) MsigGetVestingSchedule(p0 context.Context, p1 address.Address, p2 types.TipSetKey) (MsigVesting, error) { + return *new(MsigVesting), ErrNotSupported +} + func (s *GatewayStruct) StateAccountKey(p0 context.Context, p1 address.Address, p2 types.TipSetKey) (address.Address, error) { if s.Internal.StateAccountKey == nil { return *new(address.Address), ErrNotSupported diff --git a/blockstore/context.go b/blockstore/context.go new file mode 100644 index 00000000000..ebb6fafe337 --- /dev/null +++ b/blockstore/context.go @@ -0,0 +1,21 @@ +package blockstore + +import ( + "context" +) + +type hotViewKey struct{} + +var hotView = hotViewKey{} + +// WithHotView constructs a new context with an option that provides a hint to the blockstore +// (e.g. the splitstore) that the object (and its ipld references) should be kept hot. +func WithHotView(ctx context.Context) context.Context { + return context.WithValue(ctx, hotView, struct{}{}) +} + +// IsHotView returns true if the hot view option is set in the context +func IsHotView(ctx context.Context) bool { + v := ctx.Value(hotView) + return v != nil +} diff --git a/blockstore/splitstore/splitstore.go b/blockstore/splitstore/splitstore.go index 6a65e01df36..a351df76a46 100644 --- a/blockstore/splitstore/splitstore.go +++ b/blockstore/splitstore/splitstore.go @@ -161,6 +161,13 @@ type SplitStore struct { txnSyncCond sync.Cond txnSync bool + // background cold object reification + reifyWorkers sync.WaitGroup + reifyMx sync.Mutex + reifyCond sync.Cond + reifyPend map[cid.Cid]struct{} + reifyInProgress map[cid.Cid]struct{} + // registered protectors protectors []func(func(cid.Cid) error) error } @@ -202,6 +209,10 @@ func Open(path string, ds dstore.Datastore, hot, cold bstore.Blockstore, cfg *Co ss.txnSyncCond.L = &ss.txnSyncMx ss.ctx, ss.cancel = context.WithCancel(context.Background()) + ss.reifyCond.L = &ss.reifyMx + ss.reifyPend = make(map[cid.Cid]struct{}) + ss.reifyInProgress = make(map[cid.Cid]struct{}) + if enableDebugLog { ss.debug, err = openDebugLog(path) if err != nil { @@ -264,7 +275,13 @@ func (s *SplitStore) Has(ctx context.Context, cid cid.Cid) (bool, error) { return true, nil } - return s.cold.Has(ctx, cid) + has, err = s.cold.Has(ctx, cid) + if has && bstore.IsHotView(ctx) { + s.reifyColdObject(cid) + } + + return has, err + } func (s *SplitStore) Get(ctx context.Context, cid cid.Cid) (blocks.Block, error) { @@ -308,8 +325,11 @@ func (s *SplitStore) Get(ctx context.Context, cid cid.Cid) (blocks.Block, error) blk, err = s.cold.Get(ctx, cid) if err == nil { - stats.Record(s.ctx, metrics.SplitstoreMiss.M(1)) + if bstore.IsHotView(ctx) { + s.reifyColdObject(cid) + } + stats.Record(s.ctx, metrics.SplitstoreMiss.M(1)) } return blk, err @@ -359,6 +379,10 @@ func (s *SplitStore) GetSize(ctx context.Context, cid cid.Cid) (int, error) { size, err = s.cold.GetSize(ctx, cid) if err == nil { + if bstore.IsHotView(ctx) { + s.reifyColdObject(cid) + } + stats.Record(s.ctx, metrics.SplitstoreMiss.M(1)) } return size, err @@ -536,6 +560,10 @@ func (s *SplitStore) View(ctx context.Context, cid cid.Cid, cb func([]byte) erro err = s.cold.View(ctx, cid, cb) if err == nil { + if bstore.IsHotView(ctx) { + s.reifyColdObject(cid) + } + stats.Record(s.ctx, metrics.SplitstoreMiss.M(1)) } return err @@ -645,6 +673,9 @@ func (s *SplitStore) Start(chain ChainAccessor, us stmgr.UpgradeSchedule) error } } + // spawn the reifier + go s.reifyOrchestrator() + // watch the chain chain.SubscribeHeadChanges(s.HeadChange) @@ -676,6 +707,8 @@ func (s *SplitStore) Close() error { } } + s.reifyCond.Broadcast() + s.reifyWorkers.Wait() s.cancel() return multierr.Combine(s.markSetEnv.Close(), s.debug.Close()) } diff --git a/blockstore/splitstore/splitstore_reify.go b/blockstore/splitstore/splitstore_reify.go new file mode 100644 index 00000000000..85c4fa2894e --- /dev/null +++ b/blockstore/splitstore/splitstore_reify.go @@ -0,0 +1,214 @@ +package splitstore + +import ( + "errors" + "runtime" + "sync/atomic" + + "golang.org/x/xerrors" + + blocks "github.com/ipfs/go-block-format" + cid "github.com/ipfs/go-cid" +) + +var ( + errReifyLimit = errors.New("reification limit reached") + ReifyLimit = 16384 +) + +func (s *SplitStore) reifyColdObject(c cid.Cid) { + if !s.isWarm() { + return + } + + if isUnitaryObject(c) { + return + } + + s.reifyMx.Lock() + defer s.reifyMx.Unlock() + + _, ok := s.reifyInProgress[c] + if ok { + return + } + + s.reifyPend[c] = struct{}{} + s.reifyCond.Broadcast() +} + +func (s *SplitStore) reifyOrchestrator() { + workers := runtime.NumCPU() / 4 + if workers < 2 { + workers = 2 + } + + workch := make(chan cid.Cid, workers) + defer close(workch) + + for i := 0; i < workers; i++ { + s.reifyWorkers.Add(1) + go s.reifyWorker(workch) + } + + for { + s.reifyMx.Lock() + for len(s.reifyPend) == 0 && atomic.LoadInt32(&s.closing) == 0 { + s.reifyCond.Wait() + } + + if atomic.LoadInt32(&s.closing) != 0 { + s.reifyMx.Unlock() + return + } + + reifyPend := s.reifyPend + s.reifyPend = make(map[cid.Cid]struct{}) + s.reifyMx.Unlock() + + for c := range reifyPend { + select { + case workch <- c: + case <-s.ctx.Done(): + return + } + } + } +} + +func (s *SplitStore) reifyWorker(workch chan cid.Cid) { + defer s.reifyWorkers.Done() + for c := range workch { + s.doReify(c) + } +} + +func (s *SplitStore) doReify(c cid.Cid) { + var toreify, totrack, toforget []cid.Cid + + defer func() { + s.reifyMx.Lock() + defer s.reifyMx.Unlock() + + for _, c := range toreify { + delete(s.reifyInProgress, c) + } + for _, c := range totrack { + delete(s.reifyInProgress, c) + } + for _, c := range toforget { + delete(s.reifyInProgress, c) + } + }() + + s.txnLk.RLock() + defer s.txnLk.RUnlock() + + count := 0 + err := s.walkObjectIncomplete(c, newTmpVisitor(), + func(c cid.Cid) error { + if isUnitaryObject(c) { + return errStopWalk + } + + count++ + if count > ReifyLimit { + return errReifyLimit + } + + s.reifyMx.Lock() + _, inProgress := s.reifyInProgress[c] + if !inProgress { + s.reifyInProgress[c] = struct{}{} + } + s.reifyMx.Unlock() + + if inProgress { + return errStopWalk + } + + has, err := s.hot.Has(s.ctx, c) + if err != nil { + return xerrors.Errorf("error checking hotstore: %w", err) + } + + if has { + if s.txnMarkSet != nil { + hasMark, err := s.txnMarkSet.Has(c) + if err != nil { + log.Warnf("error checking markset: %s", err) + } else if hasMark { + toforget = append(toforget, c) + return errStopWalk + } + } else { + totrack = append(totrack, c) + return errStopWalk + } + } + + toreify = append(toreify, c) + return nil + }, + func(missing cid.Cid) error { + log.Warnf("missing reference while reifying %s: %s", c, missing) + return errStopWalk + }) + + if err != nil { + if xerrors.Is(err, errReifyLimit) { + log.Debug("reification aborted; reify limit reached") + return + } + + log.Warnf("error walking cold object for reification (cid: %s): %s", c, err) + return + } + + log.Debugf("reifying %d objects rooted at %s", len(toreify), c) + + // this should not get too big, maybe some 100s of objects. + batch := make([]blocks.Block, 0, len(toreify)) + for _, c := range toreify { + blk, err := s.cold.Get(s.ctx, c) + if err != nil { + log.Warnf("error retrieving cold object for reification (cid: %s): %s", c, err) + continue + } + + if err := s.checkClosing(); err != nil { + return + } + + batch = append(batch, blk) + } + + if len(batch) > 0 { + err = s.hot.PutMany(s.ctx, batch) + if err != nil { + log.Warnf("error reifying cold object (cid: %s): %s", c, err) + return + } + } + + if s.txnMarkSet != nil { + if len(toreify) > 0 { + if err := s.txnMarkSet.MarkMany(toreify); err != nil { + log.Warnf("error marking reified objects: %s", err) + } + } + if len(totrack) > 0 { + if err := s.txnMarkSet.MarkMany(totrack); err != nil { + log.Warnf("error marking tracked objects: %s", err) + } + } + } else { + // if txnActive is false these are noops + if len(toreify) > 0 { + s.trackTxnRefMany(toreify) + } + if len(totrack) > 0 { + s.trackTxnRefMany(totrack) + } + } +} diff --git a/blockstore/splitstore/splitstore_test.go b/blockstore/splitstore/splitstore_test.go index 27d58bf1043..ee30400a4b6 100644 --- a/blockstore/splitstore/splitstore_test.go +++ b/blockstore/splitstore/splitstore_test.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "io/ioutil" + "math/rand" "os" "sync" "sync/atomic" @@ -387,6 +388,235 @@ func TestSplitStoreSuppressCompactionNearUpgrade(t *testing.T) { } } +func testSplitStoreReification(t *testing.T, f func(context.Context, blockstore.Blockstore, cid.Cid) error) { + ds := dssync.MutexWrap(datastore.NewMapDatastore()) + hot := newMockStore() + cold := newMockStore() + + mkRandomBlock := func() blocks.Block { + data := make([]byte, 128) + _, err := rand.Read(data) + if err != nil { + t.Fatal(err) + } + + return blocks.NewBlock(data) + } + + block1 := mkRandomBlock() + block2 := mkRandomBlock() + block3 := mkRandomBlock() + + hdr := mock.MkBlock(nil, 0, 0) + hdr.Messages = block1.Cid() + hdr.ParentMessageReceipts = block2.Cid() + hdr.ParentStateRoot = block3.Cid() + block4, err := hdr.ToStorageBlock() + if err != nil { + t.Fatal(err) + } + + allBlocks := []blocks.Block{block1, block2, block3, block4} + for _, blk := range allBlocks { + err := cold.Put(context.Background(), blk) + if err != nil { + t.Fatal(err) + } + } + + path, err := ioutil.TempDir("", "splitstore.*") + if err != nil { + t.Fatal(err) + } + + t.Cleanup(func() { + _ = os.RemoveAll(path) + }) + + ss, err := Open(path, ds, hot, cold, &Config{MarkSetType: "map"}) + if err != nil { + t.Fatal(err) + } + defer ss.Close() //nolint + + ss.warmupEpoch = 1 + go ss.reifyOrchestrator() + + waitForReification := func() { + for { + ss.reifyMx.Lock() + ready := len(ss.reifyPend) == 0 && len(ss.reifyInProgress) == 0 + ss.reifyMx.Unlock() + + if ready { + return + } + + time.Sleep(time.Millisecond) + } + } + + // first access using the standard view + err = f(context.Background(), ss, block4.Cid()) + if err != nil { + t.Fatal(err) + } + + // nothing should be reified + waitForReification() + for _, blk := range allBlocks { + has, err := hot.Has(context.Background(), blk.Cid()) + if err != nil { + t.Fatal(err) + } + + if has { + t.Fatal("block unexpectedly reified") + } + } + + // now make the hot/reifying view and ensure access reifies + err = f(blockstore.WithHotView(context.Background()), ss, block4.Cid()) + if err != nil { + t.Fatal(err) + } + + // everything should be reified + waitForReification() + for i, blk := range allBlocks { + has, err := hot.Has(context.Background(), blk.Cid()) + if err != nil { + t.Fatal(err) + } + + if !has { + t.Fatalf("block%d was not reified", i+1) + } + } +} + +func testSplitStoreReificationLimit(t *testing.T, f func(context.Context, blockstore.Blockstore, cid.Cid) error) { + ds := dssync.MutexWrap(datastore.NewMapDatastore()) + hot := newMockStore() + cold := newMockStore() + + mkRandomBlock := func() blocks.Block { + data := make([]byte, 128) + _, err := rand.Read(data) + if err != nil { + t.Fatal(err) + } + + return blocks.NewBlock(data) + } + + block1 := mkRandomBlock() + block2 := mkRandomBlock() + block3 := mkRandomBlock() + + hdr := mock.MkBlock(nil, 0, 0) + hdr.Messages = block1.Cid() + hdr.ParentMessageReceipts = block2.Cid() + hdr.ParentStateRoot = block3.Cid() + block4, err := hdr.ToStorageBlock() + if err != nil { + t.Fatal(err) + } + + allBlocks := []blocks.Block{block1, block2, block3, block4} + for _, blk := range allBlocks { + err := cold.Put(context.Background(), blk) + if err != nil { + t.Fatal(err) + } + } + + path, err := ioutil.TempDir("", "splitstore.*") + if err != nil { + t.Fatal(err) + } + + t.Cleanup(func() { + _ = os.RemoveAll(path) + }) + + ss, err := Open(path, ds, hot, cold, &Config{MarkSetType: "map"}) + if err != nil { + t.Fatal(err) + } + defer ss.Close() //nolint + + ss.warmupEpoch = 1 + go ss.reifyOrchestrator() + + waitForReification := func() { + for { + ss.reifyMx.Lock() + ready := len(ss.reifyPend) == 0 && len(ss.reifyInProgress) == 0 + ss.reifyMx.Unlock() + + if ready { + return + } + + time.Sleep(time.Millisecond) + } + } + + // do a hot access -- nothing should be reified as the limit should be exceeded + oldReifyLimit := ReifyLimit + ReifyLimit = 2 + t.Cleanup(func() { + ReifyLimit = oldReifyLimit + }) + + err = f(blockstore.WithHotView(context.Background()), ss, block4.Cid()) + if err != nil { + t.Fatal(err) + } + + waitForReification() + + for _, blk := range allBlocks { + has, err := hot.Has(context.Background(), blk.Cid()) + if err != nil { + t.Fatal(err) + } + + if has { + t.Fatal("block unexpectedly reified") + } + } + +} + +func TestSplitStoreReification(t *testing.T) { + t.Log("test reification with Has") + testSplitStoreReification(t, func(ctx context.Context, s blockstore.Blockstore, c cid.Cid) error { + _, err := s.Has(ctx, c) + return err + }) + t.Log("test reification with Get") + testSplitStoreReification(t, func(ctx context.Context, s blockstore.Blockstore, c cid.Cid) error { + _, err := s.Get(ctx, c) + return err + }) + t.Log("test reification with GetSize") + testSplitStoreReification(t, func(ctx context.Context, s blockstore.Blockstore, c cid.Cid) error { + _, err := s.GetSize(ctx, c) + return err + }) + t.Log("test reification with View") + testSplitStoreReification(t, func(ctx context.Context, s blockstore.Blockstore, c cid.Cid) error { + return s.View(ctx, c, func(_ []byte) error { return nil }) + }) + t.Log("test reification limit") + testSplitStoreReificationLimit(t, func(ctx context.Context, s blockstore.Blockstore, c cid.Cid) error { + _, err := s.Has(ctx, c) + return err + }) +} + type mockChain struct { t testing.TB diff --git a/blockstore/splitstore/visitor.go b/blockstore/splitstore/visitor.go index 9dfbb78e704..4a78f1db1b5 100644 --- a/blockstore/splitstore/visitor.go +++ b/blockstore/splitstore/visitor.go @@ -26,6 +26,10 @@ type tmpVisitor struct { var _ ObjectVisitor = (*tmpVisitor)(nil) func (v *tmpVisitor) Visit(c cid.Cid) (bool, error) { + if isUnitaryObject(c) { + return false, nil + } + return v.set.Visit(c), nil } @@ -45,6 +49,10 @@ func newConcurrentVisitor() *concurrentVisitor { } func (v *concurrentVisitor) Visit(c cid.Cid) (bool, error) { + if isUnitaryObject(c) { + return false, nil + } + v.mx.Lock() defer v.mx.Unlock() diff --git a/build/openrpc/full.json.gz b/build/openrpc/full.json.gz index 5c79a80e3e9..b5d1d4e4464 100644 Binary files a/build/openrpc/full.json.gz and b/build/openrpc/full.json.gz differ diff --git a/build/openrpc/miner.json.gz b/build/openrpc/miner.json.gz index 89b05d01582..4021b573d7d 100644 Binary files a/build/openrpc/miner.json.gz and b/build/openrpc/miner.json.gz differ diff --git a/build/openrpc/worker.json.gz b/build/openrpc/worker.json.gz index f1bda11fa9f..397ee7423a8 100644 Binary files a/build/openrpc/worker.json.gz and b/build/openrpc/worker.json.gz differ diff --git a/build/params_mainnet.go b/build/params_mainnet.go index 6efc6d62ff4..0a9f6e77552 100644 --- a/build/params_mainnet.go +++ b/build/params_mainnet.go @@ -67,7 +67,8 @@ const UpgradeHyperdriveHeight = 892800 // 2021-10-26T13:30:00Z const UpgradeChocolateHeight = 1231620 -var UpgradeOhSnapHeight = abi.ChainEpoch(999999999999) +// 2022-03-01T15:00:00Z +var UpgradeOhSnapHeight = abi.ChainEpoch(1594680) func init() { if os.Getenv("LOTUS_USE_TEST_ADDRESSES") != "1" { diff --git a/chain/consensus/filcns/compute_state.go b/chain/consensus/filcns/compute_state.go index f7f6284d0a0..44b79285420 100644 --- a/chain/consensus/filcns/compute_state.go +++ b/chain/consensus/filcns/compute_state.go @@ -32,6 +32,7 @@ import ( /* inline-gen end */ + "github.com/filecoin-project/lotus/blockstore" "github.com/filecoin-project/lotus/build" "github.com/filecoin-project/lotus/chain/actors" "github.com/filecoin-project/lotus/chain/actors/builtin" @@ -92,6 +93,7 @@ func (t *TipSetExecutor) ApplyBlocks(ctx context.Context, sm *stmgr.StateManager partDone() }() + ctx = blockstore.WithHotView(ctx) makeVmWithBaseStateAndEpoch := func(base cid.Cid, e abi.ChainEpoch) (*vm.VM, error) { vmopt := &vm.VMOpts{ StateBase: base, diff --git a/chain/consensus/filcns/upgrades.go b/chain/consensus/filcns/upgrades.go index 2fa020d3db4..116684b9f44 100644 --- a/chain/consensus/filcns/upgrades.go +++ b/chain/consensus/filcns/upgrades.go @@ -165,13 +165,8 @@ func DefaultUpgradeSchedule() stmgr.UpgradeSchedule { Migration: UpgradeActorsV7, PreMigrations: []stmgr.PreMigration{{ PreMigration: PreUpgradeActorsV7, - StartWithin: 120, + StartWithin: 180, DontStartWithin: 60, - StopWithin: 35, - }, { - PreMigration: PreUpgradeActorsV7, - StartWithin: 30, - DontStartWithin: 15, StopWithin: 5, }}, Expensive: true, @@ -1264,7 +1259,7 @@ func upgradeActorsV7Common( root cid.Cid, epoch abi.ChainEpoch, ts *types.TipSet, config nv15.Config, ) (cid.Cid, error) { - writeStore := blockstore.NewAutobatch(ctx, sm.ChainStore().StateBlockstore(), units.GiB) + writeStore := blockstore.NewAutobatch(ctx, sm.ChainStore().StateBlockstore(), units.GiB/4) // TODO: pretty sure we'd achieve nothing by doing this, confirm in review //buf := blockstore.NewTieredBstore(sm.ChainStore().StateBlockstore(), writeStore) store := store.ActorStore(ctx, writeStore) diff --git a/chain/messagepool/check_test.go b/chain/messagepool/check_test.go new file mode 100644 index 00000000000..ffcac74e5d0 --- /dev/null +++ b/chain/messagepool/check_test.go @@ -0,0 +1,224 @@ +//stm: #unit +package messagepool + +import ( + "context" + "fmt" + "testing" + + "github.com/ipfs/go-datastore" + logging "github.com/ipfs/go-log/v2" + "github.com/stretchr/testify/assert" + + "github.com/filecoin-project/lotus/api" + "github.com/filecoin-project/lotus/chain/consensus/filcns" + "github.com/filecoin-project/lotus/chain/types" + "github.com/filecoin-project/lotus/chain/types/mock" + "github.com/filecoin-project/lotus/chain/wallet" + _ "github.com/filecoin-project/lotus/lib/sigs/bls" + _ "github.com/filecoin-project/lotus/lib/sigs/secp" +) + +func init() { + _ = logging.SetLogLevel("*", "INFO") +} + +func getCheckMessageStatus(statusCode api.CheckStatusCode, msgStatuses []api.MessageCheckStatus) (*api.MessageCheckStatus, error) { + for i := 0; i < len(msgStatuses); i++ { + iMsgStatuses := msgStatuses[i] + if iMsgStatuses.CheckStatus.Code == statusCode { + return &iMsgStatuses, nil + } + } + return nil, fmt.Errorf("Could not find CheckStatusCode %s", statusCode) +} + +func TestCheckMessages(t *testing.T) { + //stm: @CHAIN_MEMPOOL_CHECK_MESSAGES_001 + tma := newTestMpoolAPI() + + w, err := wallet.NewWallet(wallet.NewMemKeyStore()) + if err != nil { + t.Fatal(err) + } + + ds := datastore.NewMapDatastore() + + mp, err := New(context.Background(), tma, ds, filcns.DefaultUpgradeSchedule(), "mptest", nil) + if err != nil { + t.Fatal(err) + } + + sender, err := w.WalletNew(context.Background(), types.KTSecp256k1) + if err != nil { + t.Fatal(err) + } + + tma.setBalance(sender, 1000e15) + target := mock.Address(1001) + + var protos []*api.MessagePrototype + for i := 0; i < 5; i++ { + msg := &types.Message{ + To: target, + From: sender, + Value: types.NewInt(1), + Nonce: uint64(i), + GasLimit: 50000000, + GasFeeCap: types.NewInt(minimumBaseFee.Uint64()), + GasPremium: types.NewInt(1), + Params: make([]byte, 2<<10), + } + proto := &api.MessagePrototype{ + Message: *msg, + ValidNonce: true, + } + protos = append(protos, proto) + } + + messageStatuses, err := mp.CheckMessages(context.TODO(), protos) + assert.NoError(t, err) + for i := 0; i < len(messageStatuses); i++ { + iMsgStatuses := messageStatuses[i] + for j := 0; j < len(iMsgStatuses); j++ { + jStatus := iMsgStatuses[i] + assert.True(t, jStatus.OK) + } + } +} + +func TestCheckPendingMessages(t *testing.T) { + //stm: @CHAIN_MEMPOOL_CHECK_PENDING_MESSAGES_001 + tma := newTestMpoolAPI() + + w, err := wallet.NewWallet(wallet.NewMemKeyStore()) + if err != nil { + t.Fatal(err) + } + + ds := datastore.NewMapDatastore() + + mp, err := New(context.Background(), tma, ds, filcns.DefaultUpgradeSchedule(), "mptest", nil) + if err != nil { + t.Fatal(err) + } + + sender, err := w.WalletNew(context.Background(), types.KTSecp256k1) + if err != nil { + t.Fatal(err) + } + + tma.setBalance(sender, 1000e15) + target := mock.Address(1001) + + // add a valid message to the pool + msg := &types.Message{ + To: target, + From: sender, + Value: types.NewInt(1), + Nonce: 0, + GasLimit: 50000000, + GasFeeCap: types.NewInt(minimumBaseFee.Uint64()), + GasPremium: types.NewInt(1), + Params: make([]byte, 2<<10), + } + + sig, err := w.WalletSign(context.TODO(), sender, msg.Cid().Bytes(), api.MsgMeta{}) + if err != nil { + panic(err) + } + sm := &types.SignedMessage{ + Message: *msg, + Signature: *sig, + } + mustAdd(t, mp, sm) + + messageStatuses, err := mp.CheckPendingMessages(context.TODO(), sender) + assert.NoError(t, err) + for i := 0; i < len(messageStatuses); i++ { + iMsgStatuses := messageStatuses[i] + for j := 0; j < len(iMsgStatuses); j++ { + jStatus := iMsgStatuses[i] + assert.True(t, jStatus.OK) + } + } +} + +func TestCheckReplaceMessages(t *testing.T) { + //stm: @CHAIN_MEMPOOL_CHECK_REPLACE_MESSAGES_001 + tma := newTestMpoolAPI() + + w, err := wallet.NewWallet(wallet.NewMemKeyStore()) + if err != nil { + t.Fatal(err) + } + + ds := datastore.NewMapDatastore() + + mp, err := New(context.Background(), tma, ds, filcns.DefaultUpgradeSchedule(), "mptest", nil) + if err != nil { + t.Fatal(err) + } + + sender, err := w.WalletNew(context.Background(), types.KTSecp256k1) + if err != nil { + t.Fatal(err) + } + + tma.setBalance(sender, 1000e15) + target := mock.Address(1001) + + // add a valid message to the pool + msg := &types.Message{ + To: target, + From: sender, + Value: types.NewInt(1), + Nonce: 0, + GasLimit: 50000000, + GasFeeCap: types.NewInt(minimumBaseFee.Uint64()), + GasPremium: types.NewInt(1), + Params: make([]byte, 2<<10), + } + + sig, err := w.WalletSign(context.TODO(), sender, msg.Cid().Bytes(), api.MsgMeta{}) + if err != nil { + panic(err) + } + sm := &types.SignedMessage{ + Message: *msg, + Signature: *sig, + } + mustAdd(t, mp, sm) + + // create a new message with the same data, except that it is too big + var msgs []*types.Message + invalidmsg := &types.Message{ + To: target, + From: sender, + Value: types.NewInt(1), + Nonce: 0, + GasLimit: 50000000, + GasFeeCap: types.NewInt(minimumBaseFee.Uint64()), + GasPremium: types.NewInt(1), + Params: make([]byte, 128<<10), + } + msgs = append(msgs, invalidmsg) + + { + messageStatuses, err := mp.CheckReplaceMessages(context.TODO(), msgs) + if err != nil { + t.Fatal(err) + } + for i := 0; i < len(messageStatuses); i++ { + iMsgStatuses := messageStatuses[i] + + status, err := getCheckMessageStatus(api.CheckStatusMessageSize, iMsgStatuses) + if err != nil { + t.Fatal(err) + } + // the replacement message should cause a status error + assert.False(t, status.OK) + } + } + +} diff --git a/chain/messagepool/messagepool_test.go b/chain/messagepool/messagepool_test.go index 6bd60da34b1..d7f075aabc7 100644 --- a/chain/messagepool/messagepool_test.go +++ b/chain/messagepool/messagepool_test.go @@ -9,6 +9,7 @@ import ( "github.com/filecoin-project/go-address" "github.com/filecoin-project/go-state-types/abi" + "github.com/filecoin-project/go-state-types/crypto" "github.com/ipfs/go-cid" "github.com/ipfs/go-datastore" logging "github.com/ipfs/go-log/v2" @@ -226,6 +227,8 @@ func mustAdd(t *testing.T, mp *MessagePool, msg *types.SignedMessage) { } func TestMessagePool(t *testing.T) { + //stm: @CHAIN_MEMPOOL_GET_NONCE_001 + tma := newTestMpoolAPI() w, err := wallet.NewWallet(wallet.NewMemKeyStore()) @@ -327,6 +330,7 @@ func TestCheckMessageBig(t *testing.T) { Message: *msg, Signature: *sig, } + //stm: @CHAIN_MEMPOOL_PUSH_001 err = mp.Add(context.TODO(), sm) assert.ErrorIs(t, err, ErrMessageTooBig) } @@ -760,3 +764,302 @@ func TestUpdates(t *testing.T) { t.Fatal("expected closed channel, but got an update instead") } } + +func TestMessageBelowMinGasFee(t *testing.T) { + //stm: @CHAIN_MEMPOOL_PUSH_001 + tma := newTestMpoolAPI() + + w, err := wallet.NewWallet(wallet.NewMemKeyStore()) + assert.NoError(t, err) + + from, err := w.WalletNew(context.Background(), types.KTBLS) + assert.NoError(t, err) + + tma.setBalance(from, 1000e9) + + ds := datastore.NewMapDatastore() + + mp, err := New(context.Background(), tma, ds, filcns.DefaultUpgradeSchedule(), "mptest", nil) + assert.NoError(t, err) + + to := mock.Address(1001) + + // fee is just below minimum gas fee + fee := minimumBaseFee.Uint64() - 1 + { + msg := &types.Message{ + To: to, + From: from, + Value: types.NewInt(1), + Nonce: 0, + GasLimit: 50000000, + GasFeeCap: types.NewInt(fee), + GasPremium: types.NewInt(1), + Params: make([]byte, 32<<10), + } + + sig, err := w.WalletSign(context.TODO(), from, msg.Cid().Bytes(), api.MsgMeta{}) + if err != nil { + panic(err) + } + sm := &types.SignedMessage{ + Message: *msg, + Signature: *sig, + } + err = mp.Add(context.TODO(), sm) + assert.ErrorIs(t, err, ErrGasFeeCapTooLow) + } +} + +func TestMessageValueTooHigh(t *testing.T) { + //stm: @CHAIN_MEMPOOL_PUSH_001 + tma := newTestMpoolAPI() + + w, err := wallet.NewWallet(wallet.NewMemKeyStore()) + assert.NoError(t, err) + + from, err := w.WalletNew(context.Background(), types.KTBLS) + assert.NoError(t, err) + + tma.setBalance(from, 1000e9) + + ds := datastore.NewMapDatastore() + + mp, err := New(context.Background(), tma, ds, filcns.DefaultUpgradeSchedule(), "mptest", nil) + assert.NoError(t, err) + + to := mock.Address(1001) + + totalFil := types.TotalFilecoinInt + extra := types.NewInt(1) + + value := types.BigAdd(totalFil, extra) + { + msg := &types.Message{ + To: to, + From: from, + Value: value, + Nonce: 0, + GasLimit: 50000000, + GasFeeCap: types.NewInt(minimumBaseFee.Uint64()), + GasPremium: types.NewInt(1), + Params: make([]byte, 32<<10), + } + + sig, err := w.WalletSign(context.TODO(), from, msg.Cid().Bytes(), api.MsgMeta{}) + if err != nil { + panic(err) + } + sm := &types.SignedMessage{ + Message: *msg, + Signature: *sig, + } + + err = mp.Add(context.TODO(), sm) + assert.Error(t, err) + } +} + +func TestMessageSignatureInvalid(t *testing.T) { + //stm: @CHAIN_MEMPOOL_PUSH_001 + tma := newTestMpoolAPI() + + w, err := wallet.NewWallet(wallet.NewMemKeyStore()) + assert.NoError(t, err) + + from, err := w.WalletNew(context.Background(), types.KTBLS) + assert.NoError(t, err) + + tma.setBalance(from, 1000e9) + + ds := datastore.NewMapDatastore() + + mp, err := New(context.Background(), tma, ds, filcns.DefaultUpgradeSchedule(), "mptest", nil) + assert.NoError(t, err) + + to := mock.Address(1001) + + { + msg := &types.Message{ + To: to, + From: from, + Value: types.NewInt(1), + Nonce: 0, + GasLimit: 50000000, + GasFeeCap: types.NewInt(minimumBaseFee.Uint64()), + GasPremium: types.NewInt(1), + Params: make([]byte, 32<<10), + } + + badSig := &crypto.Signature{ + Type: crypto.SigTypeSecp256k1, + Data: make([]byte, 0), + } + sm := &types.SignedMessage{ + Message: *msg, + Signature: *badSig, + } + err = mp.Add(context.TODO(), sm) + assert.Error(t, err) + // assert.Contains(t, err.Error(), "invalid signature length") + assert.Error(t, err) + } +} + +func TestAddMessageTwice(t *testing.T) { + //stm: @CHAIN_MEMPOOL_PUSH_001 + tma := newTestMpoolAPI() + + w, err := wallet.NewWallet(wallet.NewMemKeyStore()) + assert.NoError(t, err) + + from, err := w.WalletNew(context.Background(), types.KTBLS) + assert.NoError(t, err) + + tma.setBalance(from, 1000e9) + + ds := datastore.NewMapDatastore() + + mp, err := New(context.Background(), tma, ds, filcns.DefaultUpgradeSchedule(), "mptest", nil) + assert.NoError(t, err) + + to := mock.Address(1001) + + { + // create a valid messages + sm := makeTestMessage(w, from, to, 0, 50_000_000, minimumBaseFee.Uint64()) + mustAdd(t, mp, sm) + + // try to add it twice + err = mp.Add(context.TODO(), sm) + // assert.Contains(t, err.Error(), "with nonce 0 already in mpool") + assert.Error(t, err) + } +} + +func TestAddMessageTwiceNonceGap(t *testing.T) { + //stm: @CHAIN_MEMPOOL_PUSH_001 + tma := newTestMpoolAPI() + + w, err := wallet.NewWallet(wallet.NewMemKeyStore()) + assert.NoError(t, err) + + from, err := w.WalletNew(context.Background(), types.KTBLS) + assert.NoError(t, err) + + tma.setBalance(from, 1000e9) + + ds := datastore.NewMapDatastore() + + mp, err := New(context.Background(), tma, ds, filcns.DefaultUpgradeSchedule(), "mptest", nil) + assert.NoError(t, err) + + to := mock.Address(1001) + + { + // create message with invalid nonce (1) + sm := makeTestMessage(w, from, to, 1, 50_000_000, minimumBaseFee.Uint64()) + mustAdd(t, mp, sm) + + // then try to add message again + err = mp.Add(context.TODO(), sm) + // assert.Contains(t, err.Error(), "unfulfilled nonce gap") + assert.Error(t, err) + } +} + +func TestAddMessageTwiceCidDiff(t *testing.T) { + tma := newTestMpoolAPI() + + w, err := wallet.NewWallet(wallet.NewMemKeyStore()) + assert.NoError(t, err) + + from, err := w.WalletNew(context.Background(), types.KTBLS) + assert.NoError(t, err) + + tma.setBalance(from, 1000e9) + + ds := datastore.NewMapDatastore() + + mp, err := New(context.Background(), tma, ds, filcns.DefaultUpgradeSchedule(), "mptest", nil) + assert.NoError(t, err) + + to := mock.Address(1001) + + { + sm := makeTestMessage(w, from, to, 0, 50_000_000, minimumBaseFee.Uint64()) + mustAdd(t, mp, sm) + + // Create message with different data, so CID is different + sm2 := makeTestMessage(w, from, to, 0, 50_000_001, minimumBaseFee.Uint64()) + + //stm: @CHAIN_MEMPOOL_PUSH_001 + // then try to add message again + err = mp.Add(context.TODO(), sm2) + // assert.Contains(t, err.Error(), "replace by fee has too low GasPremium") + assert.Error(t, err) + } +} + +func TestAddMessageTwiceCidDiffReplaced(t *testing.T) { + //stm: @CHAIN_MEMPOOL_PUSH_001 + tma := newTestMpoolAPI() + + w, err := wallet.NewWallet(wallet.NewMemKeyStore()) + assert.NoError(t, err) + + from, err := w.WalletNew(context.Background(), types.KTBLS) + assert.NoError(t, err) + + tma.setBalance(from, 1000e9) + + ds := datastore.NewMapDatastore() + + mp, err := New(context.Background(), tma, ds, filcns.DefaultUpgradeSchedule(), "mptest", nil) + assert.NoError(t, err) + + to := mock.Address(1001) + + { + sm := makeTestMessage(w, from, to, 0, 50_000_000, minimumBaseFee.Uint64()) + mustAdd(t, mp, sm) + + // Create message with different data, so CID is different + sm2 := makeTestMessage(w, from, to, 0, 50_000_000, minimumBaseFee.Uint64()*2) + mustAdd(t, mp, sm2) + } +} + +func TestRemoveMessage(t *testing.T) { + //stm: @CHAIN_MEMPOOL_PUSH_001 + tma := newTestMpoolAPI() + + w, err := wallet.NewWallet(wallet.NewMemKeyStore()) + assert.NoError(t, err) + + from, err := w.WalletNew(context.Background(), types.KTBLS) + assert.NoError(t, err) + + tma.setBalance(from, 1000e9) + + ds := datastore.NewMapDatastore() + + mp, err := New(context.Background(), tma, ds, filcns.DefaultUpgradeSchedule(), "mptest", nil) + assert.NoError(t, err) + + to := mock.Address(1001) + + { + sm := makeTestMessage(w, from, to, 0, 50_000_000, minimumBaseFee.Uint64()) + mustAdd(t, mp, sm) + + //stm: @CHAIN_MEMPOOL_REMOVE_001 + // remove message for sender + mp.Remove(context.TODO(), from, sm.Message.Nonce, true) + + //stm: @CHAIN_MEMPOOL_PENDING_FOR_001 + // check messages in pool: should be none present + msgs := mp.pendingFor(context.TODO(), from) + assert.Len(t, msgs, 0) + } +} diff --git a/chain/messagepool/repub_test.go b/chain/messagepool/repub_test.go index de32eaa6bd4..18a75d88181 100644 --- a/chain/messagepool/repub_test.go +++ b/chain/messagepool/repub_test.go @@ -1,3 +1,4 @@ +//stm: #unit package messagepool import ( @@ -16,6 +17,7 @@ import ( ) func TestRepubMessages(t *testing.T) { + //stm: @TOKEN_WALLET_NEW_001 oldRepublishBatchDelay := RepublishBatchDelay RepublishBatchDelay = time.Microsecond defer func() { @@ -57,6 +59,7 @@ func TestRepubMessages(t *testing.T) { for i := 0; i < 10; i++ { m := makeTestMessage(w1, a1, a2, uint64(i), gasLimit, uint64(i+1)) + //stm: @CHAIN_MEMPOOL_PUSH_001 _, err := mp.Push(context.TODO(), m) if err != nil { t.Fatal(err) diff --git a/chain/messagepool/selection_test.go b/chain/messagepool/selection_test.go index 2ae99cd779f..e97d5208ef1 100644 --- a/chain/messagepool/selection_test.go +++ b/chain/messagepool/selection_test.go @@ -1,3 +1,4 @@ +//stm: #unit package messagepool import ( @@ -74,6 +75,8 @@ func makeTestMpool() (*MessagePool, *testMpoolAPI) { } func TestMessageChains(t *testing.T) { + //stm: @TOKEN_WALLET_NEW_001 + //stm: @CHAIN_MEMPOOL_CREATE_MSG_CHAINS_001 mp, tma := makeTestMpool() // the actors @@ -310,6 +313,8 @@ func TestMessageChains(t *testing.T) { } func TestMessageChainSkipping(t *testing.T) { + //stm: @TOKEN_WALLET_NEW_001, @CHAIN_MEMPOOL_CREATE_MSG_CHAINS_001 + // regression test for chain skip bug mp, tma := makeTestMpool() @@ -382,6 +387,7 @@ func TestMessageChainSkipping(t *testing.T) { } func TestBasicMessageSelection(t *testing.T) { + //stm: @TOKEN_WALLET_NEW_001, @CHAIN_MEMPOOL_SELECT_001 oldMaxNonceGap := MaxNonceGap MaxNonceGap = 1000 defer func() { @@ -532,6 +538,7 @@ func TestBasicMessageSelection(t *testing.T) { } func TestMessageSelectionTrimmingGas(t *testing.T) { + //stm: @TOKEN_WALLET_NEW_001, @CHAIN_MEMPOOL_SELECT_001 mp, tma := makeTestMpool() // the actors @@ -595,6 +602,7 @@ func TestMessageSelectionTrimmingGas(t *testing.T) { } func TestMessageSelectionTrimmingMsgsBasic(t *testing.T) { + //stm: @TOKEN_WALLET_NEW_001, @CHAIN_MEMPOOL_SELECT_001 mp, tma := makeTestMpool() // the actors @@ -641,6 +649,7 @@ func TestMessageSelectionTrimmingMsgsBasic(t *testing.T) { } func TestMessageSelectionTrimmingMsgsTwoSendersBasic(t *testing.T) { + //stm: @TOKEN_WALLET_NEW_001, @CHAIN_MEMPOOL_SELECT_001 mp, tma := makeTestMpool() // the actors @@ -707,6 +716,7 @@ func TestMessageSelectionTrimmingMsgsTwoSendersBasic(t *testing.T) { } func TestMessageSelectionTrimmingMsgsTwoSendersAdvanced(t *testing.T) { + //stm: @TOKEN_WALLET_NEW_001, @CHAIN_MEMPOOL_SELECT_001 mp, tma := makeTestMpool() // the actors @@ -788,6 +798,7 @@ func TestMessageSelectionTrimmingMsgsTwoSendersAdvanced(t *testing.T) { } func TestPriorityMessageSelection(t *testing.T) { + //stm: @TOKEN_WALLET_NEW_001, @CHAIN_MEMPOOL_SELECT_001 mp, tma := makeTestMpool() // the actors @@ -867,6 +878,7 @@ func TestPriorityMessageSelection(t *testing.T) { } func TestPriorityMessageSelection2(t *testing.T) { + //stm: @TOKEN_WALLET_NEW_001, @CHAIN_MEMPOOL_SELECT_001 mp, tma := makeTestMpool() // the actors @@ -934,6 +946,7 @@ func TestPriorityMessageSelection2(t *testing.T) { } func TestPriorityMessageSelection3(t *testing.T) { + //stm: @TOKEN_WALLET_NEW_001, @CHAIN_MEMPOOL_SELECT_001 mp, tma := makeTestMpool() // the actors @@ -1028,6 +1041,8 @@ func TestPriorityMessageSelection3(t *testing.T) { } func TestOptimalMessageSelection1(t *testing.T) { + //stm: @TOKEN_WALLET_NEW_001, @CHAIN_MEMPOOL_SELECT_001 + // this test uses just a single actor sending messages with a low tq // the chain depenent merging algorithm should pick messages from the actor // from the start @@ -1094,6 +1109,8 @@ func TestOptimalMessageSelection1(t *testing.T) { } func TestOptimalMessageSelection2(t *testing.T) { + //stm: @TOKEN_WALLET_NEW_001, @CHAIN_MEMPOOL_SELECT_001 + // this test uses two actors sending messages to each other, with the first // actor paying (much) higher gas premium than the second. // We select with a low ticket quality; the chain depenent merging algorithm should pick @@ -1173,6 +1190,8 @@ func TestOptimalMessageSelection2(t *testing.T) { } func TestOptimalMessageSelection3(t *testing.T) { + //stm: @TOKEN_WALLET_NEW_001, @CHAIN_MEMPOOL_SELECT_001 + // this test uses 10 actors sending a block of messages to each other, with the the first // actors paying higher gas premium than the subsequent actors. // We select with a low ticket quality; the chain dependent merging algorithm should pick @@ -1416,6 +1435,8 @@ func makeZipfPremiumDistribution(rng *rand.Rand) func() uint64 { } func TestCompetitiveMessageSelectionExp(t *testing.T) { + //stm: @TOKEN_WALLET_NEW_001, @CHAIN_MEMPOOL_SELECT_001 + if testing.Short() { t.Skip("skipping in short mode") } @@ -1439,6 +1460,8 @@ func TestCompetitiveMessageSelectionExp(t *testing.T) { } func TestCompetitiveMessageSelectionZipf(t *testing.T) { + //stm: @TOKEN_WALLET_NEW_001, @CHAIN_MEMPOOL_SELECT_001 + if testing.Short() { t.Skip("skipping in short mode") } @@ -1462,6 +1485,7 @@ func TestCompetitiveMessageSelectionZipf(t *testing.T) { } func TestGasReward(t *testing.T) { + //stm: @CHAIN_MEMPOOL_GET_GAS_REWARD_001 tests := []struct { Premium uint64 FeeCap uint64 @@ -1494,6 +1518,8 @@ func TestGasReward(t *testing.T) { } func TestRealWorldSelection(t *testing.T) { + //stm: @TOKEN_WALLET_NEW_001, @TOKEN_WALLET_SIGN_001, @CHAIN_MEMPOOL_SELECT_001 + // load test-messages.json.gz and rewrite the messages so that // 1) we map each real actor to a test actor so that we can sign the messages // 2) adjust the nonces so that they start from 0 diff --git a/chain/types/mock/chain.go b/chain/types/mock/chain.go index c69f7f56fad..9a911c98738 100644 --- a/chain/types/mock/chain.go +++ b/chain/types/mock/chain.go @@ -2,8 +2,8 @@ package mock import ( "context" - "crypto/rand" "fmt" + "math/rand" "github.com/filecoin-project/go-address" "github.com/filecoin-project/go-state-types/abi" @@ -90,19 +90,24 @@ func TipSet(blks ...*types.BlockHeader) *types.TipSet { return ts } -func RandomActorAddress() (*address.Address, error) { - bytes := make([]byte, 32) - _, err := rand.Read(bytes) - if err != nil { - return nil, err +// Generates count new addresses using the provided seed, and returns them +func RandomActorAddresses(seed int64, count int) ([]*address.Address, error) { + randAddrs := make([]*address.Address, count) + source := rand.New(rand.NewSource(seed)) + for i := 0; i < count; i++ { + bytes := make([]byte, 32) + _, err := source.Read(bytes) + if err != nil { + return nil, err + } + + addr, err := address.NewActorAddress(bytes) + if err != nil { + return nil, err + } + randAddrs[i] = &addr } - - addr, err := address.NewActorAddress(bytes) - if err != nil { - return nil, err - } - - return &addr, nil + return randAddrs, nil } func UnsignedMessage(from, to address.Address, nonce uint64) *types.Message { diff --git a/cli/chain_test.go b/cli/chain_test.go index f3da80197a7..0b3cce728ff 100644 --- a/cli/chain_test.go +++ b/cli/chain_test.go @@ -1,3 +1,4 @@ +//stm: #cli package cli import ( @@ -32,6 +33,7 @@ func TestChainHead(t *testing.T) { mockApi.EXPECT().ChainHead(ctx).Return(ts, nil), ) + //stm: @CLI_CHAIN_HEAD_001 err := app.Run([]string{"chain", "head"}) assert.NoError(t, err) @@ -55,6 +57,7 @@ func TestGetBlock(t *testing.T) { mockApi.EXPECT().ChainGetParentReceipts(ctx, block.Cid()).Return([]*types.MessageReceipt{}, nil), ) + //stm: @CLI_CHAIN_GET_BLOCK_001 err := app.Run([]string{"chain", "getblock", block.Cid().String()}) assert.NoError(t, err) @@ -89,6 +92,7 @@ func TestReadOjb(t *testing.T) { mockApi.EXPECT().ChainReadObj(ctx, block.Cid()).Return(obj.Bytes(), nil), ) + //stm: @CLI_CHAIN_READ_OBJECT_001 err = app.Run([]string{"chain", "read-obj", block.Cid().String()}) assert.NoError(t, err) @@ -104,6 +108,7 @@ func TestChainDeleteObj(t *testing.T) { app, _, _, done := NewMockAppWithFullAPI(t, cmd) defer done() + //stm: @CLI_CHAIN_DELETE_OBJECT_002 err := app.Run([]string{"chain", "delete-obj", block.Cid().String()}) assert.Error(t, err) }) @@ -120,6 +125,7 @@ func TestChainDeleteObj(t *testing.T) { mockApi.EXPECT().ChainDeleteObj(ctx, block.Cid()).Return(nil), ) + //stm: @CLI_CHAIN_DELETE_OBJECT_001 err := app.Run([]string{"chain", "delete-obj", "--really-do-it=true", block.Cid().String()}) assert.NoError(t, err) @@ -152,6 +158,7 @@ func TestChainStatObj(t *testing.T) { mockApi.EXPECT().ChainStatObj(ctx, block.Cid(), cid.Undef).Return(stat, nil), ) + //stm: @CLI_CHAIN_STAT_OBJECT_001 err := app.Run([]string{"chain", "stat-obj", block.Cid().String()}) assert.NoError(t, err) @@ -170,6 +177,7 @@ func TestChainStatObj(t *testing.T) { mockApi.EXPECT().ChainStatObj(ctx, block.Cid(), block.Cid()).Return(stat, nil), ) + //stm: @CLI_CHAIN_STAT_OBJECT_002 err := app.Run([]string{"chain", "stat-obj", fmt.Sprintf("-base=%s", block.Cid().String()), block.Cid().String()}) assert.NoError(t, err) @@ -181,11 +189,11 @@ func TestChainGetMsg(t *testing.T) { app, mockApi, buf, done := NewMockAppWithFullAPI(t, WithCategory("chain", ChainGetMsgCmd)) defer done() - from, err := mock.RandomActorAddress() + addrs, err := mock.RandomActorAddresses(12345, 2) assert.NoError(t, err) - to, err := mock.RandomActorAddress() - assert.NoError(t, err) + from := addrs[0] + to := addrs[1] msg := mock.UnsignedMessage(*from, *to, 0) @@ -200,6 +208,7 @@ func TestChainGetMsg(t *testing.T) { mockApi.EXPECT().ChainReadObj(ctx, msg.Cid()).Return(obj.Bytes(), nil), ) + //stm: @CLI_CHAIN_GET_MESSAGE_001 err = app.Run([]string{"chain", "getmessage", msg.Cid().String()}) assert.NoError(t, err) @@ -229,6 +238,7 @@ func TestSetHead(t *testing.T) { mockApi.EXPECT().ChainSetHead(ctx, genesis.Key()).Return(nil), ) + //stm: @CLI_CHAIN_SET_HEAD_003 err := app.Run([]string{"chain", "sethead", "-genesis=true", ts.Key().String()}) assert.NoError(t, err) }) @@ -246,6 +256,7 @@ func TestSetHead(t *testing.T) { mockApi.EXPECT().ChainSetHead(ctx, genesis.Key()).Return(nil), ) + //stm: @CLI_CHAIN_SET_HEAD_002 err := app.Run([]string{"chain", "sethead", fmt.Sprintf("-epoch=%s", epoch), ts.Key().String()}) assert.NoError(t, err) }) @@ -263,8 +274,7 @@ func TestSetHead(t *testing.T) { mockApi.EXPECT().ChainSetHead(ctx, ts.Key()).Return(nil), ) - // ts.Key should be passed as an array of arguments (CIDs) - // since we have only one CID in the key, this is ok + //stm: @CLI_CHAIN_SET_HEAD_001 err := app.Run([]string{"chain", "sethead", ts.Key().Cids()[0].String()}) assert.NoError(t, err) }) @@ -274,11 +284,11 @@ func TestInspectUsage(t *testing.T) { cmd := WithCategory("chain", ChainInspectUsage) ts := mock.TipSet(mock.MkBlock(nil, 0, 0)) - from, err := mock.RandomActorAddress() + addrs, err := mock.RandomActorAddresses(12345, 2) assert.NoError(t, err) - to, err := mock.RandomActorAddress() - assert.NoError(t, err) + from := addrs[0] + to := addrs[1] msg := mock.UnsignedMessage(*from, *to, 0) msgs := []api.Message{{Cid: msg.Cid(), Message: msg}} @@ -303,6 +313,7 @@ func TestInspectUsage(t *testing.T) { mockApi.EXPECT().StateGetActor(ctx, *to, ts.Key()).Return(actor, nil), ) + //stm: @CLI_CHAIN_INSPECT_USAGE_001 err := app.Run([]string{"chain", "inspect-usage"}) assert.NoError(t, err) @@ -311,7 +322,10 @@ func TestInspectUsage(t *testing.T) { // output is plaintext, had to do string matching assert.Contains(t, out, from.String()) assert.Contains(t, out, to.String()) - assert.Contains(t, out, "Send") + // check for gas by sender + assert.Contains(t, out, "By Sender") + // check for gas by method + assert.Contains(t, out, "By Method:\nSend") }) } @@ -322,13 +336,11 @@ func TestChainList(t *testing.T) { blk.Height = 1 head := mock.TipSet(blk) - head.Height() - - from, err := mock.RandomActorAddress() + addrs, err := mock.RandomActorAddresses(12345, 2) assert.NoError(t, err) - to, err := mock.RandomActorAddress() - assert.NoError(t, err) + from := addrs[0] + to := addrs[1] msg := mock.UnsignedMessage(*from, *to, 0) msgs := []api.Message{{Cid: msg.Cid(), Message: msg}} @@ -352,6 +364,7 @@ func TestChainList(t *testing.T) { mockApi.EXPECT().ChainGetBlockMessages(ctx, head.Blocks()[0].Cid()).Return(blockMsgs, nil), ) + //stm: CLI_CHAIN_LIST_001 err := app.Run([]string{"chain", "love", "--gas-stats=true"}) // chain is love ❤️ assert.NoError(t, err) @@ -382,6 +395,7 @@ func TestChainGet(t *testing.T) { mockApi.EXPECT().ChainGetNode(ctx, path).Return(&api.IpldObject{Cid: blk.Cid(), Obj: blk}, nil), ) + //stm: @CLI_CHAIN_GET_001 err := app.Run([]string{"chain", "get", path}) assert.NoError(t, err) @@ -407,6 +421,7 @@ func TestChainGet(t *testing.T) { mockApi.EXPECT().ChainGetNode(ctx, p2).Return(&api.IpldObject{Cid: blk.Cid(), Obj: blk}, nil), ) + //stm: @CLI_CHAIN_GET_002 err := app.Run([]string{"chain", "get", p1}) assert.NoError(t, err) @@ -430,6 +445,7 @@ func TestChainGet(t *testing.T) { mockApi.EXPECT().ChainGetNode(ctx, path).Return(&api.IpldObject{Cid: blk.Cid(), Obj: blk}, nil), ) + //stm: @CLI_CHAIN_GET_004 err := app.Run([]string{"chain", "get", "-as-type=foo", path}) assert.Error(t, err) }) @@ -465,6 +481,7 @@ func TestChainBisect(t *testing.T) { mockApi.EXPECT().ChainGetNode(ctx, path).Return(&api.IpldObject{Cid: blk2.Cid(), Obj: blk2}, nil), ) + //stm: @CLI_CHAIN_BISECT_001 err := app.Run([]string{"chain", "bisect", fmt.Sprintf("%d", minHeight), fmt.Sprintf("%d", maxHeight), subpath, shell}) assert.NoError(t, err) @@ -497,6 +514,7 @@ func TestChainExport(t *testing.T) { mockApi.EXPECT().ChainExport(ctx, abi.ChainEpoch(0), false, ts.Key()).Return(export, nil), ) + //stm: @CLI_CHAIN_EXPORT_001 err := app.Run([]string{"chain", "export", "whatever.car"}) assert.NoError(t, err) @@ -522,6 +540,7 @@ func TestChainGasPrice(t *testing.T) { calls++ }) + //stm: @CLI_CHAIN_GAS_PRICE_001 err := app.Run([]string{"chain", "gas-price"}) assert.NoError(t, err) diff --git a/cli/client.go b/cli/client.go index 634bd18e5b4..64114fa171b 100644 --- a/cli/client.go +++ b/cli/client.go @@ -667,6 +667,8 @@ uiLoop: state = "miner" case "miner": + maddrs = maddrs[:0] + ask = ask[:0] afmt.Print("Miner Addresses (f0.. f0..), none to find: ") _maddrsStr, _, err := rl.ReadLine() @@ -802,7 +804,8 @@ uiLoop: dealCount, err = strconv.ParseInt(string(dealcStr), 10, 64) if err != nil { - return err + printErr(xerrors.Errorf("reading deal count: invalid number")) + continue } color.Blue(".. Picking miners") @@ -859,12 +862,13 @@ uiLoop: a, err := api.ClientQueryAsk(ctx, *mi.PeerId, maddr) if err != nil { - printErr(xerrors.Errorf("failed to query ask: %w", err)) + printErr(xerrors.Errorf("failed to query ask for miner %s: %w", maddr.String(), err)) state = "miner" continue uiLoop } ask = append(ask, *a) + } // TODO: run more validation diff --git a/cmd/lotus-miner/info.go b/cmd/lotus-miner/info.go index 1133908cab9..46070ca966c 100644 --- a/cmd/lotus-miner/info.go +++ b/cmd/lotus-miner/info.go @@ -126,7 +126,7 @@ func infoCmdAct(cctx *cli.Context) error { alerts, err := minerApi.LogAlerts(ctx) if err != nil { - return xerrors.Errorf("getting alerts: %w", err) + fmt.Printf("ERROR: getting alerts: %s\n", err) } activeAlerts := make([]alerting.Alert, 0) diff --git a/cmd/lotus-miner/info_all.go b/cmd/lotus-miner/info_all.go index bd1147b2263..fa5a413e704 100644 --- a/cmd/lotus-miner/info_all.go +++ b/cmd/lotus-miner/info_all.go @@ -96,6 +96,11 @@ var infoAllCmd = &cli.Command{ fmt.Println("ERROR: ", err) } + fmt.Println("\n#: Storage Locks") + if err := storageLocks.Action(cctx); err != nil { + fmt.Println("ERROR: ", err) + } + fmt.Println("\n#: Sched Diag") if err := sealingSchedDiagCmd.Action(cctx); err != nil { fmt.Println("ERROR: ", err) @@ -192,6 +197,11 @@ var infoAllCmd = &cli.Command{ fmt.Println("ERROR: ", err) } + fmt.Println("\n#: Storage Sector List") + if err := storageListSectorsCmd.Action(cctx); err != nil { + fmt.Println("ERROR: ", err) + } + fmt.Println("\n#: Expired Sectors") if err := sectorsExpiredCmd.Action(cctx); err != nil { fmt.Println("ERROR: ", err) diff --git a/cmd/lotus-miner/init.go b/cmd/lotus-miner/init.go index ae742c66301..59ea75b10c3 100644 --- a/cmd/lotus-miner/init.go +++ b/cmd/lotus-miner/init.go @@ -467,12 +467,15 @@ func storageMinerInit(ctx context.Context, cctx *cli.Context, api v1api.FullNode stor := stores.NewRemote(lstor, si, http.Header(sa), 10, &stores.DefaultPartialFileHandler{}) smgr, err := sectorstorage.New(ctx, lstor, stor, lr, si, sectorstorage.SealerConfig{ - ParallelFetchLimit: 10, - AllowAddPiece: true, - AllowPreCommit1: true, - AllowPreCommit2: true, - AllowCommit: true, - AllowUnseal: true, + ParallelFetchLimit: 10, + AllowAddPiece: true, + AllowPreCommit1: true, + AllowPreCommit2: true, + AllowCommit: true, + AllowUnseal: true, + AllowReplicaUpdate: true, + AllowProveReplicaUpdate2: true, + AllowRegenSectorKey: true, }, wsts, smsts) if err != nil { return err diff --git a/cmd/lotus-miner/sectors.go b/cmd/lotus-miner/sectors.go index c779f5a8ba1..d8c3e9c7cc2 100644 --- a/cmd/lotus-miner/sectors.go +++ b/cmd/lotus-miner/sectors.go @@ -161,7 +161,7 @@ var sectorsStatusCmd = &cli.Command{ fmt.Printf("Expiration:\t\t%v\n", status.Expiration) fmt.Printf("DealWeight:\t\t%v\n", status.DealWeight) fmt.Printf("VerifiedDealWeight:\t\t%v\n", status.VerifiedDealWeight) - fmt.Printf("InitialPledge:\t\t%v\n", status.InitialPledge) + fmt.Printf("InitialPledge:\t\t%v\n", types.FIL(status.InitialPledge)) fmt.Printf("\nExpiration Info\n") fmt.Printf("OnTime:\t\t%v\n", status.OnTime) fmt.Printf("Early:\t\t%v\n", status.Early) @@ -294,8 +294,14 @@ var sectorsListCmd = &cli.Command{ Aliases: []string{"e"}, }, &cli.BoolFlag{ - Name: "seal-time", - Usage: "display how long it took for the sector to be sealed", + Name: "initial-pledge", + Usage: "display initial pledge", + Aliases: []string{"p"}, + }, + &cli.BoolFlag{ + Name: "seal-time", + Usage: "display how long it took for the sector to be sealed", + Aliases: []string{"t"}, }, &cli.StringFlag{ Name: "states", @@ -405,6 +411,7 @@ var sectorsListCmd = &cli.Command{ tablewriter.Col("Deals"), tablewriter.Col("DealWeight"), tablewriter.Col("VerifiedPower"), + tablewriter.Col("Pledge"), tablewriter.NewLineCol("Error"), tablewriter.NewLineCol("RecoveryTimeout")) @@ -483,6 +490,9 @@ var sectorsListCmd = &cli.Command{ m["RecoveryTimeout"] = color.YellowString(lcli.EpochTime(head.Height(), st.Early)) } } + if inSSet && cctx.Bool("initial-pledge") { + m["Pledge"] = types.FIL(st.InitialPledge).Short() + } } if !fast && deals > 0 { diff --git a/cmd/lotus-miner/storage.go b/cmd/lotus-miner/storage.go index 6f7a627f60e..0fea2a3a578 100644 --- a/cmd/lotus-miner/storage.go +++ b/cmd/lotus-miner/storage.go @@ -368,6 +368,7 @@ type storedSector struct { store stores.SectorStorageInfo unsealed, sealed, cache bool + update, updatecache bool } var storageFindCmd = &cli.Command{ @@ -421,6 +422,16 @@ var storageFindCmd = &cli.Command{ return xerrors.Errorf("finding cache: %w", err) } + us, err := nodeApi.StorageFindSector(ctx, sid, storiface.FTUpdate, 0, false) + if err != nil { + return xerrors.Errorf("finding sealed: %w", err) + } + + uc, err := nodeApi.StorageFindSector(ctx, sid, storiface.FTUpdateCache, 0, false) + if err != nil { + return xerrors.Errorf("finding cache: %w", err) + } + byId := map[stores.ID]*storedSector{} for _, info := range u { sts, ok := byId[info.ID] @@ -455,6 +466,28 @@ var storageFindCmd = &cli.Command{ } sts.cache = true } + for _, info := range us { + sts, ok := byId[info.ID] + if !ok { + sts = &storedSector{ + id: info.ID, + store: info, + } + byId[info.ID] = sts + } + sts.update = true + } + for _, info := range uc { + sts, ok := byId[info.ID] + if !ok { + sts = &storedSector{ + id: info.ID, + store: info, + } + byId[info.ID] = sts + } + sts.updatecache = true + } local, err := nodeApi.StorageLocal(ctx) if err != nil { @@ -480,6 +513,12 @@ var storageFindCmd = &cli.Command{ if info.cache { types += "Cache, " } + if info.update { + types += "Update, " + } + if info.updatecache { + types += "UpdateCache, " + } fmt.Printf("In %s (%s)\n", info.id, types[:len(types)-2]) fmt.Printf("\tSealing: %t; Storage: %t\n", info.store.CanSeal, info.store.CanStore) diff --git a/cmd/lotus-seal-worker/main.go b/cmd/lotus-seal-worker/main.go index 84ff1ccddc5..9e6843dbf42 100644 --- a/cmd/lotus-seal-worker/main.go +++ b/cmd/lotus-seal-worker/main.go @@ -173,6 +173,11 @@ var runCmd = &cli.Command{ Usage: "enable prove replica update 2", Value: true, }, + &cli.BoolFlag{ + Name: "regen-sector-key", + Usage: "enable regen sector key", + Value: true, + }, &cli.IntFlag{ Name: "parallel-fetch-limit", Usage: "maximum fetch operations to run in parallel", @@ -278,12 +283,15 @@ var runCmd = &cli.Command{ if cctx.Bool("commit") { taskTypes = append(taskTypes, sealtasks.TTCommit2) } - if cctx.Bool("replicaupdate") { + if cctx.Bool("replica-update") { taskTypes = append(taskTypes, sealtasks.TTReplicaUpdate) } if cctx.Bool("prove-replica-update2") { taskTypes = append(taskTypes, sealtasks.TTProveReplicaUpdate2) } + if cctx.Bool("regen-sector-key") { + taskTypes = append(taskTypes, sealtasks.TTRegenSectorKey) + } if len(taskTypes) == 0 { return xerrors.Errorf("no task types specified") diff --git a/cmd/lotus-seal-worker/tasks.go b/cmd/lotus-seal-worker/tasks.go index 02e5d6cfd8e..52133d09d16 100644 --- a/cmd/lotus-seal-worker/tasks.go +++ b/cmd/lotus-seal-worker/tasks.go @@ -22,11 +22,14 @@ var tasksCmd = &cli.Command{ } var allowSetting = map[sealtasks.TaskType]struct{}{ - sealtasks.TTAddPiece: {}, - sealtasks.TTPreCommit1: {}, - sealtasks.TTPreCommit2: {}, - sealtasks.TTCommit2: {}, - sealtasks.TTUnseal: {}, + sealtasks.TTAddPiece: {}, + sealtasks.TTPreCommit1: {}, + sealtasks.TTPreCommit2: {}, + sealtasks.TTCommit2: {}, + sealtasks.TTUnseal: {}, + sealtasks.TTReplicaUpdate: {}, + sealtasks.TTProveReplicaUpdate2: {}, + sealtasks.TTRegenSectorKey: {}, } var settableStr = func() string { diff --git a/cmd/lotus-seed/genesis.go b/cmd/lotus-seed/genesis.go index a27cc0a2f7c..89feec33a6d 100644 --- a/cmd/lotus-seed/genesis.go +++ b/cmd/lotus-seed/genesis.go @@ -508,12 +508,19 @@ var genesisSetRemainderCmd = &cli.Command{ } var genesisSetActorVersionCmd = &cli.Command{ - Name: "set-network-version", - Usage: "Set the version that this network will start from", - ArgsUsage: " ", + Name: "set-network-version", + Usage: "Set the version that this network will start from", + Flags: []cli.Flag{ + &cli.IntFlag{ + Name: "network-version", + Usage: "network version to start genesis with", + Value: int(build.GenesisNetworkVersion), + }, + }, + ArgsUsage: "", Action: func(cctx *cli.Context) error { - if cctx.Args().Len() != 2 { - return fmt.Errorf("must specify genesis file and network version (e.g. '0'") + if cctx.Args().Len() != 1 { + return fmt.Errorf("must specify genesis file") } genf, err := homedir.Expand(cctx.Args().First()) @@ -531,16 +538,12 @@ var genesisSetActorVersionCmd = &cli.Command{ return xerrors.Errorf("unmarshal genesis template: %w", err) } - nv, err := strconv.ParseUint(cctx.Args().Get(1), 10, 64) - if err != nil { - return xerrors.Errorf("parsing network version: %w", err) - } - - if nv > uint64(build.NewestNetworkVersion) { + nv := network.Version(cctx.Int("network-version")) + if nv > build.NewestNetworkVersion { return xerrors.Errorf("invalid network version: %d", nv) } - template.NetworkVersion = network.Version(nv) + template.NetworkVersion = nv b, err = json.MarshalIndent(&template, "", " ") if err != nil { diff --git a/cmd/lotus-shed/diff.go b/cmd/lotus-shed/diff.go new file mode 100644 index 00000000000..bcaa041227e --- /dev/null +++ b/cmd/lotus-shed/diff.go @@ -0,0 +1,104 @@ +package main + +import ( + "fmt" + + "github.com/urfave/cli/v2" + "golang.org/x/xerrors" + + "github.com/ipfs/go-cid" + + "github.com/filecoin-project/lotus/chain/types" + lcli "github.com/filecoin-project/lotus/cli" +) + +var diffCmd = &cli.Command{ + Name: "diff", + Usage: "diff state objects", + Subcommands: []*cli.Command{diffStateTrees}, +} + +var diffStateTrees = &cli.Command{ + Name: "state-trees", + Usage: "diff two state-trees", + ArgsUsage: " ", + Action: func(cctx *cli.Context) error { + api, closer, err := lcli.GetFullNodeAPI(cctx) + if err != nil { + return err + } + + defer closer() + ctx := lcli.ReqContext(cctx) + + if cctx.NArg() != 2 { + return xerrors.Errorf("expected two state-tree roots") + } + + argA := cctx.Args().Get(1) + rootA, err := cid.Parse(argA) + if err != nil { + return xerrors.Errorf("first state-tree root (%q) is not a CID: %w", argA, err) + } + argB := cctx.Args().Get(1) + rootB, err := cid.Parse(argB) + if err != nil { + return xerrors.Errorf("second state-tree root (%q) is not a CID: %w", argB, err) + } + + if rootA == rootB { + fmt.Println("state trees do not differ") + return nil + } + + changedB, err := api.StateChangedActors(ctx, rootA, rootB) + if err != nil { + return err + } + changedA, err := api.StateChangedActors(ctx, rootB, rootA) + if err != nil { + return err + } + + diff := func(stateA, stateB types.Actor) { + if stateB.Code != stateA.Code { + fmt.Printf(" code: %s != %s\n", stateA.Code, stateB.Code) + } + if stateB.Head != stateA.Head { + fmt.Printf(" state: %s != %s\n", stateA.Head, stateB.Head) + } + if stateB.Nonce != stateA.Nonce { + fmt.Printf(" nonce: %d != %d\n", stateA.Nonce, stateB.Nonce) + } + if !stateB.Balance.Equals(stateA.Balance) { + fmt.Printf(" balance: %s != %s\n", stateA.Balance, stateB.Balance) + } + } + + fmt.Printf("state differences between %s (first) and %s (second):\n\n", rootA, rootB) + for addr, stateA := range changedA { + fmt.Println(addr) + stateB, ok := changedB[addr] + if ok { + diff(stateA, stateB) + continue + } else { + fmt.Printf(" actor does not exist in second state-tree (%s)\n", rootB) + } + fmt.Println() + delete(changedB, addr) + } + for addr, stateB := range changedB { + fmt.Println(addr) + stateA, ok := changedA[addr] + if ok { + diff(stateA, stateB) + continue + } else { + fmt.Printf(" actor does not exist in first state-tree (%s)\n", rootA) + } + fmt.Println() + } + return nil + }, +} diff --git a/cmd/lotus-shed/main.go b/cmd/lotus-shed/main.go index 9bcea72240e..45fd24e18a6 100644 --- a/cmd/lotus-shed/main.go +++ b/cmd/lotus-shed/main.go @@ -68,6 +68,7 @@ func main() { sendCsvCmd, terminationsCmd, migrationsCmd, + diffCmd, } app := &cli.App{ diff --git a/documentation/en/cli-lotus-miner.md b/documentation/en/cli-lotus-miner.md index 1fd3e91a305..848a9c8649a 100644 --- a/documentation/en/cli-lotus-miner.md +++ b/documentation/en/cli-lotus-miner.md @@ -1621,14 +1621,15 @@ USAGE: lotus-miner sectors list [command options] [arguments...] OPTIONS: - --show-removed, -r show removed sectors (default: false) - --color, -c use color in display output (default: depends on output being a TTY) - --fast, -f don't show on-chain info for better performance (default: false) - --events, -e display number of events the sector has received (default: false) - --seal-time display how long it took for the sector to be sealed (default: false) - --states value filter sectors by a comma-separated list of states - --unproven, -u only show sectors which aren't in the 'Proving' state (default: false) - --help, -h show help (default: false) + --show-removed, -r show removed sectors (default: false) + --color, -c use color in display output (default: depends on output being a TTY) + --fast, -f don't show on-chain info for better performance (default: false) + --events, -e display number of events the sector has received (default: false) + --initial-pledge, -p display initial pledge (default: false) + --seal-time, -t display how long it took for the sector to be sealed (default: false) + --states value filter sectors by a comma-separated list of states + --unproven, -u only show sectors which aren't in the 'Proving' state (default: false) + --help, -h show help (default: false) ``` diff --git a/documentation/en/cli-lotus-worker.md b/documentation/en/cli-lotus-worker.md index c74d8e8dad1..e610cff6206 100644 --- a/documentation/en/cli-lotus-worker.md +++ b/documentation/en/cli-lotus-worker.md @@ -46,6 +46,7 @@ OPTIONS: --commit enable commit (32G sectors: all cores or GPUs, 128GiB Memory + 64GiB swap) (default: true) --replica-update enable replica update (default: true) --prove-replica-update2 enable prove replica update 2 (default: true) + --regen-sector-key enable regen sector key (default: true) --parallel-fetch-limit value maximum fetch operations to run in parallel (default: 5) --timeout value used when 'listen' is unspecified. must be a valid duration recognized by golang's time.ParseDuration function (default: "30m") --help, -h show help (default: false) @@ -170,7 +171,7 @@ NAME: lotus-worker tasks enable - Enable a task type USAGE: - lotus-worker tasks enable [command options] [UNS|C2|PC2|PC1|AP] + lotus-worker tasks enable [command options] [UNS|C2|PC2|PC1|PR2|RU|AP|GSK] OPTIONS: --help, -h show help (default: false) @@ -183,7 +184,7 @@ NAME: lotus-worker tasks disable - Disable a task type USAGE: - lotus-worker tasks disable [command options] [UNS|C2|PC2|PC1|AP] + lotus-worker tasks disable [command options] [UNS|C2|PC2|PC1|PR2|RU|AP|GSK] OPTIONS: --help, -h show help (default: false) diff --git a/documentation/en/default-lotus-miner-config.toml b/documentation/en/default-lotus-miner-config.toml index d8c774c7589..818f0b73c8f 100644 --- a/documentation/en/default-lotus-miner-config.toml +++ b/documentation/en/default-lotus-miner-config.toml @@ -438,6 +438,9 @@ # env var: LOTUS_STORAGE_ALLOWPROVEREPLICAUPDATE2 #AllowProveReplicaUpdate2 = true + # env var: LOTUS_STORAGE_ALLOWREGENSECTORKEY + #AllowRegenSectorKey = true + # env var: LOTUS_STORAGE_RESOURCEFILTERING #ResourceFiltering = "hardware" diff --git a/extern/sector-storage/manager.go b/extern/sector-storage/manager.go index fcbfa2e69f7..897ba4f0611 100644 --- a/extern/sector-storage/manager.go +++ b/extern/sector-storage/manager.go @@ -105,6 +105,7 @@ type SealerConfig struct { AllowUnseal bool AllowReplicaUpdate bool AllowProveReplicaUpdate2 bool + AllowRegenSectorKey bool // ResourceFiltering instructs the system which resource filtering strategy // to use when evaluating tasks against this worker. An empty value defaults @@ -169,6 +170,9 @@ func New(ctx context.Context, lstor *stores.Local, stor *stores.Remote, ls store if sc.AllowProveReplicaUpdate2 { localTasks = append(localTasks, sealtasks.TTProveReplicaUpdate2) } + if sc.AllowRegenSectorKey { + localTasks = append(localTasks, sealtasks.TTRegenSectorKey) + } wcfg := WorkerConfig{ IgnoreResourceFiltering: sc.ResourceFiltering == ResourceFilteringDisabled, diff --git a/extern/storage-sealing/checks.go b/extern/storage-sealing/checks.go index 56b0677c468..dc045ded216 100644 --- a/extern/storage-sealing/checks.go +++ b/extern/storage-sealing/checks.go @@ -214,8 +214,13 @@ func checkReplicaUpdate(ctx context.Context, maddr address.Address, si SectorInf if err != nil { return &ErrApi{xerrors.Errorf("calling StateComputeDataCommitment: %w", err)} } - if si.UpdateUnsealed == nil || !commD.Equals(*si.UpdateUnsealed) { - return &ErrBadRU{xerrors.Errorf("on chain CommD differs from sector: %s != %s", commD, si.CommD)} + + if si.UpdateUnsealed == nil { + return &ErrBadRU{xerrors.New("nil UpdateUnsealed cid after replica update")} + } + + if !commD.Equals(*si.UpdateUnsealed) { + return &ErrBadRU{xerrors.Errorf("calculated CommD differs from updated replica: %s != %s", commD, *si.UpdateUnsealed)} } if si.UpdateSealed == nil { diff --git a/extern/storage-sealing/upgrade_queue.go b/extern/storage-sealing/upgrade_queue.go index 86083930d6a..1e5bef67c80 100644 --- a/extern/storage-sealing/upgrade_queue.go +++ b/extern/storage-sealing/upgrade_queue.go @@ -108,7 +108,8 @@ func sectorActive(ctx context.Context, api SealingAPI, maddr address.Address, to if err != nil { return false, xerrors.Errorf("failed to check active sectors: %w", err) } - // Check if sector is among active sectors + + // Ensure the upgraded sector is active var found bool for _, si := range active { if si.SectorNumber == sector { diff --git a/gateway/node.go b/gateway/node.go index a0c120d39e1..87a8551b1f4 100644 --- a/gateway/node.go +++ b/gateway/node.go @@ -51,6 +51,7 @@ type TargetAPI interface { MpoolPushUntrusted(ctx context.Context, sm *types.SignedMessage) (cid.Cid, error) MsigGetAvailableBalance(ctx context.Context, addr address.Address, tsk types.TipSetKey) (types.BigInt, error) MsigGetVested(ctx context.Context, addr address.Address, start types.TipSetKey, end types.TipSetKey) (types.BigInt, error) + MsigGetVestingSchedule(context.Context, address.Address, types.TipSetKey) (api.MsigVesting, error) MsigGetPending(ctx context.Context, addr address.Address, ts types.TipSetKey) ([]*api.MsigTransaction, error) StateAccountKey(ctx context.Context, addr address.Address, tsk types.TipSetKey) (address.Address, error) StateDealProviderCollateralBounds(ctx context.Context, size abi.PaddedPieceSize, verified bool, tsk types.TipSetKey) (api.DealCollateralBounds, error) @@ -282,6 +283,13 @@ func (gw *Node) MsigGetVested(ctx context.Context, addr address.Address, start t return gw.target.MsigGetVested(ctx, addr, start, end) } +func (gw *Node) MsigGetVestingSchedule(ctx context.Context, addr address.Address, tsk types.TipSetKey) (api.MsigVesting, error) { + if err := gw.checkTipsetKey(ctx, tsk); err != nil { + return api.MsigVesting{}, err + } + return gw.target.MsigGetVestingSchedule(ctx, addr, tsk) +} + func (gw *Node) MsigGetPending(ctx context.Context, addr address.Address, tsk types.TipSetKey) ([]*api.MsigTransaction, error) { if err := gw.checkTipsetKey(ctx, tsk); err != nil { return nil, err diff --git a/go.mod b/go.mod index f22400ded5a..0fae1713ce2 100644 --- a/go.mod +++ b/go.mod @@ -2,6 +2,8 @@ module github.com/filecoin-project/lotus go 1.16 +retract v1.14.0 // Accidentally force-pushed tag, use v1.14.1+ instead. + require ( contrib.go.opencensus.io/exporter/prometheus v0.4.0 github.com/BurntSushi/toml v0.4.1 @@ -108,7 +110,7 @@ require ( github.com/kelseyhightower/envconfig v1.4.0 github.com/libp2p/go-buffer-pool v0.0.2 github.com/libp2p/go-eventbus v0.2.1 - github.com/libp2p/go-libp2p v0.18.0-rc4 + github.com/libp2p/go-libp2p v0.18.0-rc5 github.com/libp2p/go-libp2p-connmgr v0.3.1 // indirect github.com/libp2p/go-libp2p-core v0.14.0 github.com/libp2p/go-libp2p-discovery v0.6.0 @@ -118,9 +120,9 @@ require ( github.com/libp2p/go-libp2p-pubsub v0.6.1 github.com/libp2p/go-libp2p-quic-transport v0.16.1 github.com/libp2p/go-libp2p-record v0.1.3 - github.com/libp2p/go-libp2p-resource-manager v0.1.3 + github.com/libp2p/go-libp2p-resource-manager v0.1.4 github.com/libp2p/go-libp2p-routing-helpers v0.2.3 - github.com/libp2p/go-libp2p-swarm v0.10.1 + github.com/libp2p/go-libp2p-swarm v0.10.2 github.com/libp2p/go-libp2p-tls v0.3.1 github.com/libp2p/go-libp2p-yamux v0.8.2 github.com/libp2p/go-maddr-filter v0.1.0 diff --git a/go.sum b/go.sum index 6b725b5df74..417267e376f 100644 --- a/go.sum +++ b/go.sum @@ -995,8 +995,8 @@ github.com/libp2p/go-libp2p v0.14.4/go.mod h1:EIRU0Of4J5S8rkockZM7eJp2S0UrCyi55m github.com/libp2p/go-libp2p v0.16.0/go.mod h1:ump42BsirwAWxKzsCiFnTtN1Yc+DuPu76fyMX364/O4= github.com/libp2p/go-libp2p v0.17.0/go.mod h1:Fkin50rsGdv5mm5BshBUtPRZknt9esfmYXBOYcwOTgw= github.com/libp2p/go-libp2p v0.18.0-rc1/go.mod h1:RgYlH7IIWHXREimC92bw5Lg1V2R5XmSzuLHb5fTnr+8= -github.com/libp2p/go-libp2p v0.18.0-rc4 h1:OUsSbeu7q+Ck/bV9wHDxFzb08ORqBupHhpCmRBhWrJ8= -github.com/libp2p/go-libp2p v0.18.0-rc4/go.mod h1:wzmsk1ioOq9FGQys2BN5BIw4nugP6+R+CyW3JbPEbbs= +github.com/libp2p/go-libp2p v0.18.0-rc5 h1:88wWDHb9nNo0vBNCupLde3OTnFAkugOCNkrDfl3ivK4= +github.com/libp2p/go-libp2p v0.18.0-rc5/go.mod h1:aZPS5l84bDvCvP4jkyEUT/J6YOpUq33Fgqrs3K59mpI= github.com/libp2p/go-libp2p-asn-util v0.0.0-20200825225859-85005c6cf052/go.mod h1:nRMRTab+kZuk0LnKZpxhOVH/ndsdr2Nr//Zltc/vwgo= github.com/libp2p/go-libp2p-asn-util v0.1.0 h1:rABPCO77SjdbJ/eJ/ynIo8vWICy1VEnL5JAxJbQLo1E= github.com/libp2p/go-libp2p-asn-util v0.1.0/go.mod h1:wu+AnM9Ii2KgO5jMmS1rz9dvzTdj8BXqsPR9HR0XB7I= @@ -1158,8 +1158,9 @@ github.com/libp2p/go-libp2p-record v0.1.2/go.mod h1:pal0eNcT5nqZaTV7UGhqeGqxFgGd github.com/libp2p/go-libp2p-record v0.1.3 h1:R27hoScIhQf/A8XJZ8lYpnqh9LatJ5YbHs28kCIfql0= github.com/libp2p/go-libp2p-record v0.1.3/go.mod h1:yNUff/adKIfPnYQXgp6FQmNu3gLJ6EMg7+/vv2+9pY4= github.com/libp2p/go-libp2p-resource-manager v0.1.0/go.mod h1:wJPNjeE4XQlxeidwqVY5G6DLOKqFK33u2n8blpl0I6Y= -github.com/libp2p/go-libp2p-resource-manager v0.1.3 h1:Umf0tW6WNXSb6Uoma0YT56azB5iikL/aeGAP7s7+f5o= github.com/libp2p/go-libp2p-resource-manager v0.1.3/go.mod h1:wJPNjeE4XQlxeidwqVY5G6DLOKqFK33u2n8blpl0I6Y= +github.com/libp2p/go-libp2p-resource-manager v0.1.4 h1:RcxMD0pytOUimx3BqTVs6IqItb3H5Qg44SD7XyT68lw= +github.com/libp2p/go-libp2p-resource-manager v0.1.4/go.mod h1:wJPNjeE4XQlxeidwqVY5G6DLOKqFK33u2n8blpl0I6Y= github.com/libp2p/go-libp2p-routing v0.0.1/go.mod h1:N51q3yTr4Zdr7V8Jt2JIktVU+3xBBylx1MZeVA6t1Ys= github.com/libp2p/go-libp2p-routing v0.1.0/go.mod h1:zfLhI1RI8RLEzmEaaPwzonRvXeeSHddONWkcTcB54nE= github.com/libp2p/go-libp2p-routing-helpers v0.2.3 h1:xY61alxJ6PurSi+MXbywZpelvuU4U4p/gPTxjqCqTzY= @@ -1181,8 +1182,8 @@ github.com/libp2p/go-libp2p-swarm v0.5.3/go.mod h1:NBn7eNW2lu568L7Ns9wdFrOhgRlkR github.com/libp2p/go-libp2p-swarm v0.8.0/go.mod h1:sOMp6dPuqco0r0GHTzfVheVBh6UEL0L1lXUZ5ot2Fvc= github.com/libp2p/go-libp2p-swarm v0.9.0/go.mod h1:2f8d8uxTJmpeqHF/1ujjdXZp+98nNIbujVOMEZxCbZ8= github.com/libp2p/go-libp2p-swarm v0.10.0/go.mod h1:71ceMcV6Rg/0rIQ97rsZWMzto1l9LnNquef+efcRbmA= -github.com/libp2p/go-libp2p-swarm v0.10.1 h1:lXW3pgGt+BVmkzcFX61erX7l6Lt+WAamNhwa2Kf3eJM= -github.com/libp2p/go-libp2p-swarm v0.10.1/go.mod h1:Pdkq0QU5a+qu+oyqIV3bknMsnzk9lnNyKvB9acJ5aZs= +github.com/libp2p/go-libp2p-swarm v0.10.2 h1:UaXf+CTq6Ns1N2V1EgqJ9Q3xaRsiN7ImVlDMpirMAWw= +github.com/libp2p/go-libp2p-swarm v0.10.2/go.mod h1:Pdkq0QU5a+qu+oyqIV3bknMsnzk9lnNyKvB9acJ5aZs= github.com/libp2p/go-libp2p-testing v0.0.1/go.mod h1:gvchhf3FQOtBdr+eFUABet5a4MBLK8jM3V4Zghvmi+E= github.com/libp2p/go-libp2p-testing v0.0.2/go.mod h1:gvchhf3FQOtBdr+eFUABet5a4MBLK8jM3V4Zghvmi+E= github.com/libp2p/go-libp2p-testing v0.0.3/go.mod h1:gvchhf3FQOtBdr+eFUABet5a4MBLK8jM3V4Zghvmi+E= @@ -1298,8 +1299,9 @@ github.com/libp2p/go-tcp-transport v0.2.3/go.mod h1:9dvr03yqrPyYGIEN6Dy5UvdJZjyP github.com/libp2p/go-tcp-transport v0.2.4/go.mod h1:9dvr03yqrPyYGIEN6Dy5UvdJZjyPFvl1S/igQ5QD1SU= github.com/libp2p/go-tcp-transport v0.2.7/go.mod h1:lue9p1b3VmZj1MhhEGB/etmvF/nBQ0X9CW2DutBT3MM= github.com/libp2p/go-tcp-transport v0.4.0/go.mod h1:0y52Rwrn4076xdJYu/51/qJIdxz+EWDAOG2S45sV3VI= -github.com/libp2p/go-tcp-transport v0.5.0 h1:3ZPW8HAuyRAuFzyabE0hSrCXKKSWzROnZZX7DtcIatY= github.com/libp2p/go-tcp-transport v0.5.0/go.mod h1:UPPL0DIjQqiWRwVAb+CEQlaAG0rp/mCqJfIhFcLHc4Y= +github.com/libp2p/go-tcp-transport v0.5.1 h1:edOOs688VLZAozWC7Kj5/6HHXKNwi9M6wgRmmLa8M6Q= +github.com/libp2p/go-tcp-transport v0.5.1/go.mod h1:UPPL0DIjQqiWRwVAb+CEQlaAG0rp/mCqJfIhFcLHc4Y= github.com/libp2p/go-testutil v0.0.1/go.mod h1:iAcJc/DKJQanJ5ws2V+u5ywdL2n12X1WbbEG+Jjy69I= github.com/libp2p/go-testutil v0.1.0/go.mod h1:81b2n5HypcVyrCg/MJx4Wgfp/VHojytjVe/gLzZ2Ehc= github.com/libp2p/go-ws-transport v0.0.5/go.mod h1:Qbl4BxPfXXhhd/o0wcrgoaItHqA9tnZjoFZnxykuaXU= diff --git a/itests/kit/blockminer.go b/itests/kit/blockminer.go index 91ddc2e26e0..a232d82e026 100644 --- a/itests/kit/blockminer.go +++ b/itests/kit/blockminer.go @@ -3,6 +3,7 @@ package kit import ( "bytes" "context" + "fmt" "sync" "sync/atomic" "testing" @@ -10,6 +11,7 @@ import ( "github.com/filecoin-project/go-bitfield" "github.com/filecoin-project/go-state-types/abi" + "github.com/filecoin-project/go-state-types/dline" "github.com/filecoin-project/lotus/api" aminer "github.com/filecoin-project/lotus/chain/actors/builtin/miner" "github.com/filecoin-project/lotus/chain/types" @@ -63,11 +65,10 @@ func (p *partitionTracker) done(t *testing.T) bool { return uint64(len(p.partitions)) == p.count(t) } -func (p *partitionTracker) recordIfPost(t *testing.T, bm *BlockMiner, smsg *types.SignedMessage) (ret bool) { +func (p *partitionTracker) recordIfPost(t *testing.T, bm *BlockMiner, msg *types.Message) (ret bool) { defer func() { ret = p.done(t) }() - msg := smsg.Message if !(msg.To == bm.miner.ActorAddr) { return } @@ -82,11 +83,69 @@ func (p *partitionTracker) recordIfPost(t *testing.T, bm *BlockMiner, smsg *type return } +func (bm *BlockMiner) forcePoSt(ctx context.Context, ts *types.TipSet, dlinfo *dline.Info) { + + tracker := newPartitionTracker(ctx, dlinfo.Index, bm) + if !tracker.done(bm.t) { // need to wait for post + bm.t.Logf("expect %d partitions proved but only see %d", len(tracker.partitions), tracker.count(bm.t)) + poolEvts, err := bm.miner.FullNode.MpoolSub(ctx) //subscribe before checking pending so we don't miss any events + require.NoError(bm.t, err) + + // First check pending messages we'll mine this epoch + msgs, err := bm.miner.FullNode.MpoolPending(ctx, types.EmptyTSK) + require.NoError(bm.t, err) + for _, msg := range msgs { + if tracker.recordIfPost(bm.t, bm, &msg.Message) { + fmt.Printf("found post in mempool pending\n") + } + } + + // Account for included but not yet executed messages + for _, bc := range ts.Cids() { + msgs, err := bm.miner.FullNode.ChainGetBlockMessages(ctx, bc) + require.NoError(bm.t, err) + for _, msg := range msgs.BlsMessages { + if tracker.recordIfPost(bm.t, bm, msg) { + fmt.Printf("found post in message of prev tipset\n") + } + + } + for _, msg := range msgs.SecpkMessages { + if tracker.recordIfPost(bm.t, bm, &msg.Message) { + fmt.Printf("found post in message of prev tipset\n") + } + } + } + + // post not yet in mpool, wait for it + if !tracker.done(bm.t) { + bm.t.Logf("post missing from mpool, block mining suspended until it arrives") + POOL: + for { + bm.t.Logf("mpool event wait loop at block height %d, ts: %s", ts.Height(), ts.Key()) + select { + case <-ctx.Done(): + return + case evt := <-poolEvts: + bm.t.Logf("pool event: %d", evt.Type) + if evt.Type == api.MpoolAdd { + bm.t.Logf("incoming message %v", evt.Message) + if tracker.recordIfPost(bm.t, bm, &evt.Message.Message) { + fmt.Printf("found post in mempool evt\n") + break POOL + } + } + } + } + bm.t.Logf("done waiting on mpool") + } + } +} + // Like MineBlocks but refuses to mine until the window post scheduler has wdpost messages in the mempool // and everything shuts down if a post fails. It also enforces that every block mined succeeds func (bm *BlockMiner) MineBlocksMustPost(ctx context.Context, blocktime time.Duration) { - - time.Sleep(3 * time.Second) + time.Sleep(time.Second) // wrap context in a cancellable context. ctx, bm.cancel = context.WithCancel(ctx) @@ -94,8 +153,6 @@ func (bm *BlockMiner) MineBlocksMustPost(ctx context.Context, blocktime time.Dur go func() { defer bm.wg.Done() - activeDeadlines := make(map[int]struct{}) - _ = activeDeadlines ts, err := bm.miner.FullNode.ChainHead(ctx) require.NoError(bm.t, err) wait := make(chan bool) @@ -103,7 +160,7 @@ func (bm *BlockMiner) MineBlocksMustPost(ctx context.Context, blocktime time.Dur require.NoError(bm.t, err) // read current out curr := <-chg - require.Equal(bm.t, ts.Height(), curr[0].Val.Height()) + require.Equal(bm.t, ts.Height(), curr[0].Val.Height(), "failed sanity check: are multiple miners mining with must post?") for { select { case <-time.After(blocktime): @@ -111,52 +168,15 @@ func (bm *BlockMiner) MineBlocksMustPost(ctx context.Context, blocktime time.Dur return } nulls := atomic.SwapInt64(&bm.nextNulls, 0) - require.Equal(bm.t, int64(0), nulls, "Injecting > 0 null blocks while `MustPost` mining is currently unsupported") // Wake up and figure out if we are at the end of an active deadline ts, err := bm.miner.FullNode.ChainHead(ctx) require.NoError(bm.t, err) - tsk := ts.Key() - dlinfo, err := bm.miner.FullNode.StateMinerProvingDeadline(ctx, bm.miner.ActorAddr, tsk) + dlinfo, err := bm.miner.FullNode.StateMinerProvingDeadline(ctx, bm.miner.ActorAddr, ts.Key()) require.NoError(bm.t, err) - if ts.Height()+1 == dlinfo.Last() { // Last epoch in dline, we need to check that miner has posted - - tracker := newPartitionTracker(ctx, dlinfo.Index, bm) - if !tracker.done(bm.t) { // need to wait for post - bm.t.Logf("expect %d partitions proved but only see %d", len(tracker.partitions), tracker.count(bm.t)) - poolEvts, err := bm.miner.FullNode.MpoolSub(ctx) - require.NoError(bm.t, err) - - // First check pending messages we'll mine this epoch - msgs, err := bm.miner.FullNode.MpoolPending(ctx, types.EmptyTSK) - require.NoError(bm.t, err) - for _, msg := range msgs { - tracker.recordIfPost(bm.t, bm, msg) - } - - // post not yet in mpool, wait for it - if !tracker.done(bm.t) { - bm.t.Logf("post missing from mpool, block mining suspended until it arrives") - POOL: - for { - bm.t.Logf("mpool event wait loop at block height %d, ts: %s", ts.Height(), ts.Key()) - select { - case <-ctx.Done(): - return - case evt := <-poolEvts: - bm.t.Logf("pool event: %d", evt.Type) - if evt.Type == api.MpoolAdd { - bm.t.Logf("incoming message %v", evt.Message) - if tracker.recordIfPost(bm.t, bm, evt.Message) { - break POOL - } - } - } - } - bm.t.Logf("done waiting on mpool") - } - } + if ts.Height()+1+abi.ChainEpoch(nulls) >= dlinfo.Last() { // Next block brings us past the last epoch in dline, we need to wait for miner to post + bm.forcePoSt(ctx, ts, dlinfo) } var target abi.ChainEpoch @@ -173,6 +193,12 @@ func (bm *BlockMiner) MineBlocksMustPost(ctx context.Context, blocktime time.Dur Done: reportSuccessFn, }) success = <-wait + if !success { + // if we are mining a new null block and it brings us past deadline boundary we need to wait for miner to post + if ts.Height()+1+abi.ChainEpoch(nulls+i) >= dlinfo.Last() { + bm.forcePoSt(ctx, ts, dlinfo) + } + } } // Wait until it shows up on the given full nodes ChainHead diff --git a/itests/kit/circuit.go b/itests/kit/circuit.go new file mode 100644 index 00000000000..d2857010e31 --- /dev/null +++ b/itests/kit/circuit.go @@ -0,0 +1,34 @@ +package kit + +import ( + "fmt" + "testing" + "time" +) + +/* +CircuitBreaker implements a simple time-based circuit breaker used for waiting for async operations to finish. + +This is how it works: + - It runs the `cb` function until it returns true, + - waiting for `throttle` duration between each iteration, + - or at most `timeout` duration until it breaks test execution. + +You can use it if t.Deadline() is not "granular" enough, and you want to know which specific piece of code timed out, +or you need to set different deadlines in the same test. +*/ +func CircuitBreaker(t *testing.T, label string, throttle, timeout time.Duration, cb func() bool) { + tmo := time.After(timeout) + for { + if cb() { + break + } + select { + case <-tmo: + t.Fatal("timeout: ", label) + default: + fmt.Printf("waiting: %s\n", label) + time.Sleep(throttle) + } + } +} diff --git a/itests/kit/ensemble.go b/itests/kit/ensemble.go index 0227ee81e70..6a0158429aa 100644 --- a/itests/kit/ensemble.go +++ b/itests/kit/ensemble.go @@ -151,6 +151,11 @@ func NewEnsemble(t *testing.T, opts ...EnsembleOpt) *Ensemble { return n } +// Mocknet returns the underlying mocknet. +func (n *Ensemble) Mocknet() mocknet.Mocknet { + return n.mn +} + // FullNode enrolls a new full node. func (n *Ensemble) FullNode(full *TestFullNode, opts ...NodeOpt) *Ensemble { options := DefaultNodeOpts diff --git a/itests/mempool_test.go b/itests/mempool_test.go new file mode 100644 index 00000000000..a1c2a330e50 --- /dev/null +++ b/itests/mempool_test.go @@ -0,0 +1,521 @@ +//stm: #integration +package itests + +import ( + "context" + "testing" + "time" + + "github.com/filecoin-project/go-state-types/big" + "github.com/filecoin-project/lotus/api" + "github.com/filecoin-project/lotus/chain/types" + "github.com/filecoin-project/lotus/itests/kit" + "github.com/stretchr/testify/require" +) + +const mPoolThrottle = time.Millisecond * 100 +const mPoolTimeout = time.Second * 10 + +func TestMemPoolPushSingleNode(t *testing.T) { + //stm: @CHAIN_MEMPOOL_CREATE_MSG_CHAINS_001, @CHAIN_MEMPOOL_SELECT_001 + //stm: @CHAIN_MEMPOOL_PENDING_001, @CHAIN_STATE_WAIT_MSG_001, @CHAIN_MEMPOOL_CAP_GAS_FEE_001 + //stm: @CHAIN_MEMPOOL_PUSH_002 + ctx := context.Background() + const blockTime = 100 * time.Millisecond + firstNode, _, _, ens := kit.EnsembleTwoOne(t, kit.MockProofs()) + ens.InterconnectAll() + kit.QuietMiningLogs() + + sender := firstNode.DefaultKey.Address + + addr, err := firstNode.WalletNew(ctx, types.KTBLS) + require.NoError(t, err) + + const totalMessages = 10 + + bal, err := firstNode.WalletBalance(ctx, sender) + require.NoError(t, err) + toSend := big.Div(bal, big.NewInt(10)) + each := big.Div(toSend, big.NewInt(totalMessages)) + + // add messages to be mined/published + var sms []*types.SignedMessage + for i := 0; i < totalMessages; i++ { + msg := &types.Message{ + From: sender, + To: addr, + Value: each, + } + + sm, err := firstNode.MpoolPushMessage(ctx, msg, nil) + require.NoError(t, err) + require.EqualValues(t, i, sm.Message.Nonce) + + sms = append(sms, sm) + } + + // check pending messages for address + kit.CircuitBreaker(t, "push messages", mPoolThrottle, mPoolTimeout, func() bool { + msgStatuses, _ := firstNode.MpoolCheckPendingMessages(ctx, sender) + if len(msgStatuses) == totalMessages { + for _, msgStatusList := range msgStatuses { + for _, status := range msgStatusList { + require.True(t, status.OK) + } + } + return true + } + return false + }) + + // verify messages should be the ones included in the next block + selected, _ := firstNode.MpoolSelect(ctx, types.EmptyTSK, 0) + for _, msg := range sms { + found := false + for _, selectedMsg := range selected { + if selectedMsg.Cid() == msg.Cid() { + found = true + break + } + } + require.True(t, found) + } + + ens.BeginMining(blockTime) + + kit.CircuitBreaker(t, "mine messages", mPoolThrottle, mPoolTimeout, func() bool { + // pool pending list should be empty + pending, err := firstNode.MpoolPending(context.TODO(), types.EmptyTSK) + require.NoError(t, err) + + if len(pending) == 0 { + // all messages should be added to the chain + for _, lookMsg := range sms { + msgLookup, err := firstNode.StateWaitMsg(ctx, lookMsg.Cid(), 3, api.LookbackNoLimit, true) + require.NoError(t, err) + require.NotNil(t, msgLookup) + } + return true + } + return false + }) +} + +func TestMemPoolPushTwoNodes(t *testing.T) { + //stm: @CHAIN_MEMPOOL_CREATE_MSG_CHAINS_001, @CHAIN_MEMPOOL_SELECT_001 + //stm: @CHAIN_MEMPOOL_PENDING_001, @CHAIN_STATE_WAIT_MSG_001, @CHAIN_MEMPOOL_CAP_GAS_FEE_001 + //stm: @CHAIN_MEMPOOL_PUSH_002 + ctx := context.Background() + const blockTime = 100 * time.Millisecond + firstNode, secondNode, _, ens := kit.EnsembleTwoOne(t, kit.MockProofs()) + ens.InterconnectAll() + kit.QuietMiningLogs() + + sender := firstNode.DefaultKey.Address + sender2 := secondNode.DefaultKey.Address + + addr, _ := firstNode.WalletNew(ctx, types.KTBLS) + addr2, _ := secondNode.WalletNew(ctx, types.KTBLS) + + bal, err := firstNode.WalletBalance(ctx, sender) + require.NoError(t, err) + + const totalMessages = 10 + + toSend := big.Div(bal, big.NewInt(10)) + each := big.Div(toSend, big.NewInt(totalMessages)) + + var sms []*types.SignedMessage + // push messages to message pools of both nodes + for i := 0; i < totalMessages; i++ { + // first + msg1 := &types.Message{ + From: sender, + To: addr, + Value: each, + } + + sm1, err := firstNode.MpoolPushMessage(ctx, msg1, nil) + require.NoError(t, err) + require.EqualValues(t, i, sm1.Message.Nonce) + sms = append(sms, sm1) + + // second + msg2 := &types.Message{ + From: sender2, + To: addr2, + Value: each, + } + + sm2, err := secondNode.MpoolPushMessage(ctx, msg2, nil) + require.NoError(t, err) + require.EqualValues(t, i, sm2.Message.Nonce) + sms = append(sms, sm2) + } + + ens.BeginMining(blockTime) + + kit.CircuitBreaker(t, "push & mine messages", mPoolThrottle, mPoolTimeout, func() bool { + pending1, err := firstNode.MpoolPending(context.TODO(), types.EmptyTSK) + require.NoError(t, err) + + pending2, err := secondNode.MpoolPending(context.TODO(), types.EmptyTSK) + require.NoError(t, err) + + if len(pending1) == 0 && len(pending2) == 0 { + // Check messages on both nodes + for _, lookMsg := range sms { + msgLookup1, err := firstNode.StateWaitMsg(ctx, lookMsg.Cid(), 3, api.LookbackNoLimit, true) + require.NoError(t, err) + require.NotNil(t, msgLookup1) + + msgLookup2, err := secondNode.StateWaitMsg(ctx, lookMsg.Cid(), 3, api.LookbackNoLimit, true) + require.NoError(t, err) + require.NotNil(t, msgLookup2) + } + return true + } + return false + }) +} + +func TestMemPoolClearPending(t *testing.T) { + //stm: @CHAIN_MEMPOOL_PUSH_001, @CHAIN_MEMPOOL_PENDING_001 + //stm: @CHAIN_STATE_WAIT_MSG_001, @CHAIN_MEMPOOL_CLEAR_001, @CHAIN_MEMPOOL_CAP_GAS_FEE_001 + ctx := context.Background() + const blockTime = 100 * time.Millisecond + firstNode, _, _, ens := kit.EnsembleTwoOne(t, kit.MockProofs()) + ens.InterconnectAll() + kit.QuietMiningLogs() + + sender := firstNode.DefaultKey.Address + + addr, _ := firstNode.WalletNew(ctx, types.KTBLS) + + const totalMessages = 10 + + bal, err := firstNode.WalletBalance(ctx, sender) + require.NoError(t, err) + toSend := big.Div(bal, big.NewInt(10)) + each := big.Div(toSend, big.NewInt(totalMessages)) + + // Add single message, then clear the pool + msg := &types.Message{ + From: sender, + To: addr, + Value: each, + } + _, err = firstNode.MpoolPushMessage(ctx, msg, nil) + require.NoError(t, err) + + // message should be in the mempool + kit.CircuitBreaker(t, "push message", mPoolThrottle, mPoolTimeout, func() bool { + pending, err := firstNode.MpoolPending(context.TODO(), types.EmptyTSK) + require.NoError(t, err) + + return len(pending) == 1 + }) + + err = firstNode.MpoolClear(ctx, true) + require.NoError(t, err) + + // pool should be empty now + kit.CircuitBreaker(t, "clear mempool", mPoolThrottle, mPoolTimeout, func() bool { + pending, err := firstNode.MpoolPending(context.TODO(), types.EmptyTSK) + require.NoError(t, err) + + return len(pending) == 0 + }) + + // mine a couple of blocks + ens.BeginMining(blockTime) + time.Sleep(5 * blockTime) + + // make sure that the cleared message wasn't picked up and mined + _, err = firstNode.StateWaitMsg(ctx, msg.Cid(), 3, api.LookbackNoLimit, true) + require.Error(t, err) +} + +func TestMemPoolBatchPush(t *testing.T) { + //stm: @CHAIN_MEMPOOL_CREATE_MSG_CHAINS_001, @CHAIN_MEMPOOL_SELECT_001, @CHAIN_MEMPOOL_CAP_GAS_FEE_001 + //stm: @CHAIN_MEMPOOL_CHECK_PENDING_MESSAGES_001, @CHAIN_MEMPOOL_SELECT_001 + //stm: @CHAIN_MEMPOOL_PENDING_001, @CHAIN_STATE_WAIT_MSG_001 + //stm: @CHAIN_MEMPOOL_BATCH_PUSH_001 + ctx := context.Background() + const blockTime = 100 * time.Millisecond + firstNode, _, _, ens := kit.EnsembleTwoOne(t, kit.MockProofs()) + ens.InterconnectAll() + kit.QuietMiningLogs() + + sender := firstNode.DefaultKey.Address + + addr, _ := firstNode.WalletNew(ctx, types.KTBLS) + + const totalMessages = 10 + + bal, err := firstNode.WalletBalance(ctx, sender) + require.NoError(t, err) + toSend := big.Div(bal, big.NewInt(10)) + each := big.Div(toSend, big.NewInt(totalMessages)) + + // add messages to be mined/published + var sms []*types.SignedMessage + for i := 0; i < totalMessages; i++ { + msg := &types.Message{ + From: sender, + To: addr, + Value: each, + Nonce: uint64(i), + GasLimit: 50_000_000, + GasFeeCap: types.NewInt(100_000_000), + GasPremium: types.NewInt(1), + } + + signedMessage, err := firstNode.WalletSignMessage(ctx, sender, msg) + require.NoError(t, err) + + sms = append(sms, signedMessage) + } + + _, err = firstNode.MpoolBatchPush(ctx, sms) + require.NoError(t, err) + + // check pending messages for address + kit.CircuitBreaker(t, "batch push", mPoolThrottle, mPoolTimeout, func() bool { + msgStatuses, err := firstNode.MpoolCheckPendingMessages(ctx, sender) + require.NoError(t, err) + + if len(msgStatuses) == totalMessages { + for _, msgStatusList := range msgStatuses { + for _, status := range msgStatusList { + require.True(t, status.OK) + } + } + return true + } + return false + }) + + // verify messages should be the ones included in the next block + selected, _ := firstNode.MpoolSelect(ctx, types.EmptyTSK, 0) + require.NoError(t, err) + for _, msg := range sms { + found := false + for _, selectedMsg := range selected { + if selectedMsg.Cid() == msg.Cid() { + found = true + break + } + } + require.True(t, found) + } + + ens.BeginMining(blockTime) + + kit.CircuitBreaker(t, "mine messages", mPoolThrottle, mPoolTimeout, func() bool { + // pool pending list should be empty + pending, err := firstNode.MpoolPending(context.TODO(), types.EmptyTSK) + require.NoError(t, err) + + if len(pending) == 0 { + // all messages should be added to the chain + for _, lookMsg := range sms { + msgLookup, err := firstNode.StateWaitMsg(ctx, lookMsg.Cid(), 3, api.LookbackNoLimit, true) + require.NoError(t, err) + require.NotNil(t, msgLookup) + } + return true + } + return false + }) +} + +func TestMemPoolPushSingleNodeUntrusted(t *testing.T) { + //stm: @CHAIN_MEMPOOL_CREATE_MSG_CHAINS_001, @CHAIN_MEMPOOL_SELECT_001, @CHAIN_MEMPOOL_CAP_GAS_FEE_001 + //stm: @CHAIN_MEMPOOL_CHECK_PENDING_MESSAGES_001, @CHAIN_MEMPOOL_SELECT_001 + //stm: @CHAIN_MEMPOOL_PENDING_001, @CHAIN_STATE_WAIT_MSG_001 + //stm: @CHAIN_MEMPOOL_PUSH_003 + ctx := context.Background() + const blockTime = 100 * time.Millisecond + firstNode, _, _, ens := kit.EnsembleTwoOne(t, kit.MockProofs()) + ens.InterconnectAll() + kit.QuietMiningLogs() + + sender := firstNode.DefaultKey.Address + + addr, _ := firstNode.WalletNew(ctx, types.KTBLS) + + const totalMessages = 10 + + bal, err := firstNode.WalletBalance(ctx, sender) + require.NoError(t, err) + toSend := big.Div(bal, big.NewInt(10)) + each := big.Div(toSend, big.NewInt(totalMessages)) + + // add messages to be mined/published + var sms []*types.SignedMessage + for i := 0; i < totalMessages; i++ { + msg := &types.Message{ + From: sender, + To: addr, + Value: each, + Nonce: uint64(i), + GasLimit: 50_000_000, + GasFeeCap: types.NewInt(100_000_000), + GasPremium: types.NewInt(1), + } + + signedMessage, err := firstNode.WalletSignMessage(ctx, sender, msg) + require.NoError(t, err) + + // push untrusted messages + pushedCid, err := firstNode.MpoolPushUntrusted(ctx, signedMessage) + require.NoError(t, err) + require.Equal(t, msg.Cid(), pushedCid) + + sms = append(sms, signedMessage) + } + + kit.CircuitBreaker(t, "push untrusted messages", mPoolThrottle, mPoolTimeout, func() bool { + // check pending messages for address + msgStatuses, _ := firstNode.MpoolCheckPendingMessages(ctx, sender) + + if len(msgStatuses) == totalMessages { + for _, msgStatusList := range msgStatuses { + for _, status := range msgStatusList { + require.True(t, status.OK) + } + } + return true + } + return false + }) + + // verify messages should be the ones included in the next block + selected, _ := firstNode.MpoolSelect(ctx, types.EmptyTSK, 0) + for _, msg := range sms { + found := false + for _, selectedMsg := range selected { + if selectedMsg.Cid() == msg.Cid() { + found = true + break + } + } + require.True(t, found) + } + + ens.BeginMining(blockTime) + + kit.CircuitBreaker(t, "mine untrusted messages", mPoolThrottle, mPoolTimeout, func() bool { + // pool pending list should be empty + pending, err := firstNode.MpoolPending(context.TODO(), types.EmptyTSK) + require.NoError(t, err) + + if len(pending) == 0 { + // all messages should be added to the chain + for _, lookMsg := range sms { + msgLookup, err := firstNode.StateWaitMsg(ctx, lookMsg.Cid(), 3, api.LookbackNoLimit, true) + require.NoError(t, err) + require.NotNil(t, msgLookup) + } + return true + } + return false + }) + +} + +func TestMemPoolBatchPushUntrusted(t *testing.T) { + //stm: @CHAIN_MEMPOOL_CREATE_MSG_CHAINS_001, @CHAIN_MEMPOOL_SELECT_001, @CHAIN_MEMPOOL_CAP_GAS_FEE_001 + //stm: @CHAIN_MEMPOOL_CHECK_PENDING_MESSAGES_001, @CHAIN_MEMPOOL_SELECT_001 + //stm: @CHAIN_MEMPOOL_PENDING_001, @CHAIN_STATE_WAIT_MSG_001 + //stm: @CHAIN_MEMPOOL_BATCH_PUSH_002 + ctx := context.Background() + const blockTime = 100 * time.Millisecond + firstNode, _, _, ens := kit.EnsembleTwoOne(t, kit.MockProofs()) + ens.InterconnectAll() + kit.QuietMiningLogs() + + sender := firstNode.DefaultKey.Address + + addr, _ := firstNode.WalletNew(ctx, types.KTBLS) + + const totalMessages = 10 + + bal, err := firstNode.WalletBalance(ctx, sender) + require.NoError(t, err) + toSend := big.Div(bal, big.NewInt(10)) + each := big.Div(toSend, big.NewInt(totalMessages)) + + // add messages to be mined/published + var sms []*types.SignedMessage + for i := 0; i < totalMessages; i++ { + msg := &types.Message{ + From: sender, + To: addr, + Value: each, + Nonce: uint64(i), + GasLimit: 50_000_000, + GasFeeCap: types.NewInt(100_000_000), + GasPremium: types.NewInt(1), + } + + signedMessage, err := firstNode.WalletSignMessage(ctx, sender, msg) + require.NoError(t, err) + + sms = append(sms, signedMessage) + } + + _, err = firstNode.MpoolBatchPushUntrusted(ctx, sms) + require.NoError(t, err) + + // check pending messages for address, wait until they are all pushed + kit.CircuitBreaker(t, "push untrusted messages", mPoolThrottle, mPoolTimeout, func() bool { + msgStatuses, err := firstNode.MpoolCheckPendingMessages(ctx, sender) + require.NoError(t, err) + + if len(msgStatuses) == totalMessages { + for _, msgStatusList := range msgStatuses { + for _, status := range msgStatusList { + require.True(t, status.OK) + } + } + return true + } + return false + }) + + // verify messages should be the ones included in the next block + selected, _ := firstNode.MpoolSelect(ctx, types.EmptyTSK, 0) + + for _, msg := range sms { + found := false + for _, selectedMsg := range selected { + if selectedMsg.Cid() == msg.Cid() { + found = true + break + } + } + require.True(t, found) + } + + ens.BeginMining(blockTime) + + // wait until pending messages are mined, pool pending list should be empty + kit.CircuitBreaker(t, "mine untrusted messages", mPoolThrottle, mPoolTimeout, func() bool { + pending, err := firstNode.MpoolPending(context.TODO(), types.EmptyTSK) + require.NoError(t, err) + + if len(pending) == 0 { + // all messages should be added to the chain + for _, lookMsg := range sms { + msgLookup, err := firstNode.StateWaitMsg(ctx, lookMsg.Cid(), 3, api.LookbackNoLimit, true) + require.NoError(t, err) + require.NotNil(t, msgLookup) + } + return true + } + return false + }) + +} diff --git a/itests/paych_api_test.go b/itests/paych_api_test.go index a07c499f9cf..7e135a9bea7 100644 --- a/itests/paych_api_test.go +++ b/itests/paych_api_test.go @@ -51,7 +51,7 @@ func TestPaymentChannelsAPI(t *testing.T) { Miner(&miner, &paymentCreator, kit.WithAllSubsystems()). Start(). InterconnectAll() - bms := ens.BeginMining(blockTime) + bms := ens.BeginMiningMustPost(blockTime) bm := bms[0] // send some funds to register the receiver diff --git a/metrics/metrics.go b/metrics/metrics.go index b4032bb1d35..400e76537c5 100644 --- a/metrics/metrics.go +++ b/metrics/metrics.go @@ -47,6 +47,12 @@ var ( WorkerHostname, _ = tag.NewKey("worker_hostname") StorageID, _ = tag.NewKey("storage_id") SectorState, _ = tag.NewKey("sector_state") + + // rcmgr + ServiceID, _ = tag.NewKey("svc") + ProtocolID, _ = tag.NewKey("proto") + Direction, _ = tag.NewKey("direction") + UseFD, _ = tag.NewKey("use_fd") ) // Measures @@ -143,6 +149,22 @@ var ( SplitstoreCompactionHot = stats.Int64("splitstore/hot", "Number of hot blocks in last compaction", stats.UnitDimensionless) SplitstoreCompactionCold = stats.Int64("splitstore/cold", "Number of cold blocks in last compaction", stats.UnitDimensionless) SplitstoreCompactionDead = stats.Int64("splitstore/dead", "Number of dead blocks in last compaction", stats.UnitDimensionless) + + // rcmgr + RcmgrAllowConn = stats.Int64("rcmgr/allow_conn", "Number of allowed connections", stats.UnitDimensionless) + RcmgrBlockConn = stats.Int64("rcmgr/block_conn", "Number of blocked connections", stats.UnitDimensionless) + RcmgrAllowStream = stats.Int64("rcmgr/allow_stream", "Number of allowed streams", stats.UnitDimensionless) + RcmgrBlockStream = stats.Int64("rcmgr/block_stream", "Number of blocked streams", stats.UnitDimensionless) + RcmgrAllowPeer = stats.Int64("rcmgr/allow_peer", "Number of allowed peer connections", stats.UnitDimensionless) + RcmgrBlockPeer = stats.Int64("rcmgr/block_peer", "Number of blocked peer connections", stats.UnitDimensionless) + RcmgrAllowProto = stats.Int64("rcmgr/allow_proto", "Number of allowed streams attached to a protocol", stats.UnitDimensionless) + RcmgrBlockProto = stats.Int64("rcmgr/block_proto", "Number of blocked blocked streams attached to a protocol", stats.UnitDimensionless) + RcmgrBlockProtoPeer = stats.Int64("rcmgr/block_proto", "Number of blocked blocked streams attached to a protocol for a specific peer", stats.UnitDimensionless) + RcmgrAllowSvc = stats.Int64("rcmgr/allow_svc", "Number of allowed streams attached to a service", stats.UnitDimensionless) + RcmgrBlockSvc = stats.Int64("rcmgr/block_svc", "Number of blocked blocked streams attached to a service", stats.UnitDimensionless) + RcmgrBlockSvcPeer = stats.Int64("rcmgr/block_svc", "Number of blocked blocked streams attached to a service for a specific peer", stats.UnitDimensionless) + RcmgrAllowMem = stats.Int64("rcmgr/allow_mem", "Number of allowed memory reservations", stats.UnitDimensionless) + RcmgrBlockMem = stats.Int64("rcmgr/block_mem", "Number of blocked memory reservations", stats.UnitDimensionless) ) var ( @@ -496,6 +518,76 @@ var ( Measure: GraphsyncSendingPeersPending, Aggregation: view.LastValue(), } + + // rcmgr + RcmgrAllowConnView = &view.View{ + Measure: RcmgrAllowConn, + Aggregation: view.Count(), + TagKeys: []tag.Key{Direction, UseFD}, + } + RcmgrBlockConnView = &view.View{ + Measure: RcmgrBlockConn, + Aggregation: view.Count(), + TagKeys: []tag.Key{Direction, UseFD}, + } + RcmgrAllowStreamView = &view.View{ + Measure: RcmgrAllowStream, + Aggregation: view.Count(), + TagKeys: []tag.Key{PeerID, Direction}, + } + RcmgrBlockStreamView = &view.View{ + Measure: RcmgrBlockStream, + Aggregation: view.Count(), + TagKeys: []tag.Key{PeerID, Direction}, + } + RcmgrAllowPeerView = &view.View{ + Measure: RcmgrAllowPeer, + Aggregation: view.Count(), + TagKeys: []tag.Key{PeerID}, + } + RcmgrBlockPeerView = &view.View{ + Measure: RcmgrBlockPeer, + Aggregation: view.Count(), + TagKeys: []tag.Key{PeerID}, + } + RcmgrAllowProtoView = &view.View{ + Measure: RcmgrAllowProto, + Aggregation: view.Count(), + TagKeys: []tag.Key{ProtocolID}, + } + RcmgrBlockProtoView = &view.View{ + Measure: RcmgrBlockProto, + Aggregation: view.Count(), + TagKeys: []tag.Key{ProtocolID}, + } + RcmgrBlockProtoPeerView = &view.View{ + Measure: RcmgrBlockProtoPeer, + Aggregation: view.Count(), + TagKeys: []tag.Key{ProtocolID, PeerID}, + } + RcmgrAllowSvcView = &view.View{ + Measure: RcmgrAllowSvc, + Aggregation: view.Count(), + TagKeys: []tag.Key{ServiceID}, + } + RcmgrBlockSvcView = &view.View{ + Measure: RcmgrBlockSvc, + Aggregation: view.Count(), + TagKeys: []tag.Key{ServiceID}, + } + RcmgrBlockSvcPeerView = &view.View{ + Measure: RcmgrBlockSvcPeer, + Aggregation: view.Count(), + TagKeys: []tag.Key{ServiceID, PeerID}, + } + RcmgrAllowMemView = &view.View{ + Measure: RcmgrAllowMem, + Aggregation: view.Count(), + } + RcmgrBlockMemView = &view.View{ + Measure: RcmgrBlockMem, + Aggregation: view.Count(), + } ) // DefaultViews is an array of OpenCensus views for metric gathering purposes @@ -517,6 +609,21 @@ var DefaultViews = func() []*view.View { GraphsyncSendingTotalMemoryAllocatedView, GraphsyncSendingTotalPendingAllocationsView, GraphsyncSendingPeersPendingView, + + RcmgrAllowConnView, + RcmgrBlockConnView, + RcmgrAllowStreamView, + RcmgrBlockStreamView, + RcmgrAllowPeerView, + RcmgrBlockPeerView, + RcmgrAllowProtoView, + RcmgrBlockProtoView, + RcmgrBlockProtoPeerView, + RcmgrAllowSvcView, + RcmgrBlockSvcView, + RcmgrBlockSvcPeerView, + RcmgrAllowMemView, + RcmgrBlockMemView, } views = append(views, blockstore.DefaultViews...) views = append(views, rpcmetrics.DefaultViews...) diff --git a/node/config/def.go b/node/config/def.go index 1573508665c..aceeaadf5df 100644 --- a/node/config/def.go +++ b/node/config/def.go @@ -139,6 +139,7 @@ func DefaultStorageMiner() *StorageMiner { AllowUnseal: true, AllowReplicaUpdate: true, AllowProveReplicaUpdate2: true, + AllowRegenSectorKey: true, // Default to 10 - tcp should still be able to figure this out, and // it's the ratio between 10gbit / 1gbit diff --git a/node/modules/lp2p/rcmgr.go b/node/modules/lp2p/rcmgr.go index 8b286ff5ee6..0bc4dd6b215 100644 --- a/node/modules/lp2p/rcmgr.go +++ b/node/modules/lp2p/rcmgr.go @@ -11,9 +11,15 @@ import ( "github.com/libp2p/go-libp2p" "github.com/libp2p/go-libp2p-core/network" + "github.com/libp2p/go-libp2p-core/peer" + "github.com/libp2p/go-libp2p-core/protocol" rcmgr "github.com/libp2p/go-libp2p-resource-manager" + "github.com/filecoin-project/lotus/metrics" "github.com/filecoin-project/lotus/node/repo" + + "go.opencensus.io/stats" + "go.opencensus.io/tag" ) func ResourceManager(lc fx.Lifecycle, repo repo.LockedRepo) (network.ResourceManager, error) { @@ -43,6 +49,8 @@ func ResourceManager(lc fx.Lifecycle, repo repo.LockedRepo) (network.ResourceMan // TODO: also set appropriate default limits for lotus protocols libp2p.SetDefaultServiceLimits(limiter) + opts = append(opts, rcmgr.WithMetrics(rcmgrMetrics{})) + if os.Getenv("LOTUS_DEBUG_RCMGR") != "" { debugPath := filepath.Join(repoPath, "debug") if err := os.MkdirAll(debugPath, 0755); err != nil { @@ -70,3 +78,109 @@ func ResourceManagerOption(mgr network.ResourceManager) Libp2pOpts { Opts: []libp2p.Option{libp2p.ResourceManager(mgr)}, } } + +type rcmgrMetrics struct{} + +func (r rcmgrMetrics) AllowConn(dir network.Direction, usefd bool) { + ctx := context.Background() + if dir == network.DirInbound { + ctx, _ = tag.New(ctx, tag.Upsert(metrics.Direction, "inbound")) + } else { + ctx, _ = tag.New(ctx, tag.Upsert(metrics.Direction, "outbound")) + } + if usefd { + ctx, _ = tag.New(ctx, tag.Upsert(metrics.UseFD, "true")) + } else { + ctx, _ = tag.New(ctx, tag.Upsert(metrics.UseFD, "false")) + } + stats.Record(ctx, metrics.RcmgrAllowConn.M(1)) +} + +func (r rcmgrMetrics) BlockConn(dir network.Direction, usefd bool) { + ctx := context.Background() + if dir == network.DirInbound { + ctx, _ = tag.New(ctx, tag.Upsert(metrics.Direction, "inbound")) + } else { + ctx, _ = tag.New(ctx, tag.Upsert(metrics.Direction, "outbound")) + } + if usefd { + ctx, _ = tag.New(ctx, tag.Upsert(metrics.UseFD, "true")) + } else { + ctx, _ = tag.New(ctx, tag.Upsert(metrics.UseFD, "false")) + } + stats.Record(ctx, metrics.RcmgrBlockConn.M(1)) +} + +func (r rcmgrMetrics) AllowStream(p peer.ID, dir network.Direction) { + ctx := context.Background() + if dir == network.DirInbound { + ctx, _ = tag.New(ctx, tag.Upsert(metrics.Direction, "inbound")) + } else { + ctx, _ = tag.New(ctx, tag.Upsert(metrics.Direction, "outbound")) + } + stats.Record(ctx, metrics.RcmgrAllowStream.M(1)) +} + +func (r rcmgrMetrics) BlockStream(p peer.ID, dir network.Direction) { + ctx := context.Background() + if dir == network.DirInbound { + ctx, _ = tag.New(ctx, tag.Upsert(metrics.Direction, "inbound")) + } else { + ctx, _ = tag.New(ctx, tag.Upsert(metrics.Direction, "outbound")) + } + stats.Record(ctx, metrics.RcmgrBlockStream.M(1)) +} + +func (r rcmgrMetrics) AllowPeer(p peer.ID) { + ctx := context.Background() + stats.Record(ctx, metrics.RcmgrAllowPeer.M(1)) +} + +func (r rcmgrMetrics) BlockPeer(p peer.ID) { + ctx := context.Background() + stats.Record(ctx, metrics.RcmgrBlockPeer.M(1)) +} + +func (r rcmgrMetrics) AllowProtocol(proto protocol.ID) { + ctx := context.Background() + ctx, _ = tag.New(ctx, tag.Upsert(metrics.ProtocolID, string(proto))) + stats.Record(ctx, metrics.RcmgrAllowProto.M(1)) +} + +func (r rcmgrMetrics) BlockProtocol(proto protocol.ID) { + ctx := context.Background() + ctx, _ = tag.New(ctx, tag.Upsert(metrics.ProtocolID, string(proto))) + stats.Record(ctx, metrics.RcmgrBlockProto.M(1)) +} + +func (r rcmgrMetrics) BlockProtocolPeer(proto protocol.ID, p peer.ID) { + ctx := context.Background() + ctx, _ = tag.New(ctx, tag.Upsert(metrics.ProtocolID, string(proto))) + stats.Record(ctx, metrics.RcmgrBlockProtoPeer.M(1)) +} + +func (r rcmgrMetrics) AllowService(svc string) { + ctx := context.Background() + ctx, _ = tag.New(ctx, tag.Upsert(metrics.ServiceID, svc)) + stats.Record(ctx, metrics.RcmgrAllowSvc.M(1)) +} + +func (r rcmgrMetrics) BlockService(svc string) { + ctx := context.Background() + ctx, _ = tag.New(ctx, tag.Upsert(metrics.ServiceID, svc)) + stats.Record(ctx, metrics.RcmgrBlockSvc.M(1)) +} + +func (r rcmgrMetrics) BlockServicePeer(svc string, p peer.ID) { + ctx := context.Background() + ctx, _ = tag.New(ctx, tag.Upsert(metrics.ServiceID, svc)) + stats.Record(ctx, metrics.RcmgrBlockSvcPeer.M(1)) +} + +func (r rcmgrMetrics) AllowMemory(size int) { + stats.Record(context.Background(), metrics.RcmgrAllowMem.M(1)) +} + +func (r rcmgrMetrics) BlockMemory(size int) { + stats.Record(context.Background(), metrics.RcmgrBlockMem.M(1)) +} diff --git a/testplans/lotus-soup/go.sum b/testplans/lotus-soup/go.sum index 23877f87b44..852d462d279 100644 --- a/testplans/lotus-soup/go.sum +++ b/testplans/lotus-soup/go.sum @@ -473,8 +473,8 @@ github.com/filecoin-project/specs-actors/v7 v7.0.0-20211117170924-fd07a4c7dff9/g github.com/filecoin-project/specs-actors/v7 v7.0.0-20211222192039-c83bea50c402/go.mod h1:p6LIOFezA1rgRLMewbvdi3Pp6SAu+q9FtJ9CAleSjrE= github.com/filecoin-project/specs-actors/v7 v7.0.0-rc1 h1:FuDaXIbcw2hRsFI8SDTmsGGCE+NumpF6aiBoU/2X5W4= github.com/filecoin-project/specs-actors/v7 v7.0.0-rc1/go.mod h1:TA5FwCna+Yi36POaT7SLKXsgEDvJwc0V/L6ZsO19B9M= -github.com/filecoin-project/specs-storage v0.1.1-0.20211228030229-6d460d25a0c9 h1:oUYOvF7EvdXS0Zmk9mNkaB6Bu0l+WXBYPzVodKMiLug= -github.com/filecoin-project/specs-storage v0.1.1-0.20211228030229-6d460d25a0c9/go.mod h1:Tb88Zq+IBJbvAn3mS89GYj3jdRThBTE/771HCVZdRJU= +github.com/filecoin-project/specs-storage v0.2.0 h1:Y4UDv0apRQ3zI2GiPPubi8JblpUZZphEdaJUxCutfyg= +github.com/filecoin-project/specs-storage v0.2.0/go.mod h1:Tb88Zq+IBJbvAn3mS89GYj3jdRThBTE/771HCVZdRJU= github.com/filecoin-project/test-vectors/schema v0.0.5/go.mod h1:iQ9QXLpYWL3m7warwvK1JC/pTri8mnfEmKygNDqqY6E= github.com/flynn/go-shlex v0.0.0-20150515145356-3f9db97f8568/go.mod h1:xEzjJPgXI435gkrCt3MPfRiAkVrwSbHsst4LCFVfpJc= github.com/flynn/noise v1.0.0 h1:DlTHqmzmvcEiKj+4RYo/imoswx/4r6iBlCMfVtrMXpQ=