From 8e765b4dccaa6e5539d1407c5bf8168e9e614160 Mon Sep 17 00:00:00 2001 From: Wyatt Date: Thu, 9 Aug 2018 14:41:35 -0400 Subject: [PATCH 1/2] Refactor Mining Code Before this mining was split roughly into the Worker and BlockGenerator interfaces. This commit changes this to a Scheduler and Worker interface. Scheduler -- we are now creating a more complex mining strategy that involves timing. The Scheduler interface is where these strategies fit into the mining code. This interface has a Start() method that begins mining and provides input and output channels to the rest of the system just like the old Worker interface. Schedulers schedule mining work for a Worker. Worker -- as before Worker is the thing that actually mines. However in this commit Worker is now seperate from the thing that Starts mining (that is the Scheduler). Workers do work with the interface Mine method. BlockGenerator -- the BlockGenerator has been absorbed into the Worker. Here is my rational. 1. This facilitates making the code easier to follow with respect to @whyrusleeping's issues. We are now one interface and one object fewer in the mining code. 2. The seperation is no longer justified. In my view now that the worker is no longer responsible for scheduling, there is enough room in the abstraction for block generation as a member function. Looking back at the code of Mine I was not convinced this interface is worth the trouble. It seems likely that any alternate implementation of Generate would warrant an alternate implementation of Mine. I don't forsee lots of swapping implementations across the BlockGenerator interface and don't think testing Mine against an abstract Generate buys us that much 3. The seperation is actively getting in the way. worker.Mine has been using fake power values for a while. The way to fix this is to use the powerTableView and getStateTree dependencies used by the block generator. It makes far more sense for one thing to share these dependencies than to inject them into two things. With this refactor we are much closer to getting actual power values. Lots of tests have changed but this commit has kept most of the old behavioral coverage. Moving the seam to Scheduler/Worker is about as fine-grained as the old seam between Worker/BlockGenerator. Lots of Scheduler tests are missing because the timingScheduler is still broken. Some of the behavior moved into the scheduler from the Mine method, e.g. null block incrementing is not yet tested. --- api/impl/mining.go | 10 +- .../{block_generator.go => block_generate.go} | 78 +-- mining/block_generator_test.go | 369 ------------ mining/scheduler.go | 345 +++++++++++ mining/scheduler_test.go | 120 ++++ mining/testing.go | 50 +- mining/worker.go | 216 +++---- mining/worker_test.go | 548 ++++++++++++------ node/node.go | 17 +- node/node_test.go | 15 +- node/testing.go | 8 +- tools/go-fakecoin/main.go | 13 +- 12 files changed, 993 insertions(+), 796 deletions(-) rename mining/{block_generator.go => block_generate.go} (51%) delete mode 100644 mining/block_generator_test.go create mode 100644 mining/scheduler.go create mode 100644 mining/scheduler_test.go diff --git a/api/impl/mining.go b/api/impl/mining.go index 7e1142feed..42a545af59 100644 --- a/api/impl/mining.go +++ b/api/impl/mining.go @@ -21,16 +21,16 @@ func (api *nodeMining) Once(ctx context.Context) (*types.Block, error) { nd := api.api.node ts := nd.ChainMgr.GetHeaviestTipSet() - blockGenerator := mining.NewBlockGenerator(nd.MsgPool, func(ctx context.Context, ts core.TipSet) (state.Tree, error) { - return nd.ChainMgr.State(ctx, ts.ToSlice()) - }, nd.ChainMgr.Weight, core.ApplyMessages, nd.ChainMgr.PwrTableView, nd.Blockstore, nd.CborStore) - miningAddr, err := nd.MiningAddress() if err != nil { return nil, err } - res := mining.MineOnce(ctx, mining.NewWorker(blockGenerator, miningAddr), ts) + worker := mining.NewMiningWorker(nd.MsgPool, func(ctx context.Context, ts core.TipSet) (state.Tree, error) { + return nd.ChainMgr.State(ctx, ts.ToSlice()) + }, nd.ChainMgr.Weight, core.ApplyMessages, nd.ChainMgr.PwrTableView, nd.Blockstore, nd.CborStore, miningAddr) + + res := mining.MineOnce(ctx, mining.NewScheduler(worker), ts) if res.Err != nil { return nil, res.Err } diff --git a/mining/block_generator.go b/mining/block_generate.go similarity index 51% rename from mining/block_generator.go rename to mining/block_generate.go index c4964027e2..1d5a22e170 100644 --- a/mining/block_generator.go +++ b/mining/block_generate.go @@ -1,78 +1,37 @@ package mining +// Block generation is part of the logic of the default Worker (miningWorker). +// 'generate' is that function that actually creates a new block from a base +// TipSet using the miningWorker's many utilities. + import ( "context" - "gx/ipfs/QmSkuaNgyGmV8c1L3cZNWcUxRJV6J3nsD96JVQPcWcwtyW/go-hamt-ipld" errors "gx/ipfs/QmVmDhyTTUcQXFD1rRQ64fGLMSAoaQvNH3hwuaCFAPq2hy/errors" - "gx/ipfs/QmcD7SqfyQyA91TZUQ7VPRYbGarxmY7EsQewVYMuN5LNSv/go-ipfs-blockstore" - logging "gx/ipfs/QmcVVHfdyv15GVPk7NrxdWjh2hLVccXnoD8j2tyQShiXJb/go-log" "github.com/filecoin-project/go-filecoin/address" "github.com/filecoin-project/go-filecoin/core" - "github.com/filecoin-project/go-filecoin/state" "github.com/filecoin-project/go-filecoin/types" "github.com/filecoin-project/go-filecoin/vm" ) -var log = logging.Logger("mining") - -// GetStateTree is a function that gets the aggregate state tree of a TipSet. It's -// its own function to facilitate testing. -type GetStateTree func(context.Context, core.TipSet) (state.Tree, error) - -// GetWeight is a function that calculates the weight of a TipSet. Weight is -// expressed as two uint64s comprising a rational number. -type GetWeight func(context.Context, core.TipSet) (uint64, uint64, error) - -// BlockGenerator is the primary interface for blockGenerator. -type BlockGenerator interface { - Generate(context.Context, core.TipSet, types.Signature, uint64, types.Address) (*types.Block, error) -} - -// NewBlockGenerator returns a new BlockGenerator. -func NewBlockGenerator(messagePool *core.MessagePool, getStateTree GetStateTree, getWeight GetWeight, applyMessages miningApplier, powerTable core.PowerTableView, bs blockstore.Blockstore, cstore *hamt.CborIpldStore) BlockGenerator { - return &blockGenerator{ - messagePool: messagePool, - getStateTree: getStateTree, - getWeight: getWeight, - applyMessages: applyMessages, - powerTable: powerTable, - blockstore: bs, - cstore: cstore, - } -} - -type miningApplier func(ctx context.Context, messages []*types.SignedMessage, st state.Tree, vms vm.StorageMap, bh *types.BlockHeight) (core.ApplyMessagesResponse, error) - -// blockGenerator generates new blocks for inclusion in the chain. -type blockGenerator struct { - messagePool *core.MessagePool - getStateTree GetStateTree - getWeight GetWeight - applyMessages miningApplier - powerTable core.PowerTableView - blockstore blockstore.Blockstore - cstore *hamt.CborIpldStore -} - // Generate returns a new block created from the messages in the pool. -func (b blockGenerator) Generate(ctx context.Context, baseTipSet core.TipSet, ticket types.Signature, nullBlockCount uint64, miningAddress types.Address) (*types.Block, error) { - stateTree, err := b.getStateTree(ctx, baseTipSet) +func (w *MiningWorker) Generate(ctx context.Context, baseTipSet core.TipSet, ticket types.Signature, nullBlockCount uint64) (*types.Block, error) { + stateTree, err := w.getStateTree(ctx, baseTipSet) if err != nil { return nil, errors.Wrap(err, "get state tree") } - if !b.powerTable.HasPower(ctx, stateTree, b.blockstore, miningAddress) { - return nil, errors.Errorf("bad miner address, miner must store files before mining: %s", miningAddress) + if !w.powerTable.HasPower(ctx, stateTree, w.blockstore, w.minerAddr) { + return nil, errors.Errorf("bad miner address, miner must store files before mining: %s", w.minerAddr) } - wNum, wDenom, err := b.getWeight(ctx, baseTipSet) + wNum, wDenom, err := w.getWeight(ctx, baseTipSet) if err != nil { return nil, errors.Wrap(err, "get weight") } - nonce, err := core.NextNonce(ctx, stateTree, b.messagePool, address.NetworkAddress) + nonce, err := core.NextNonce(ctx, stateTree, w.messagePool, address.NetworkAddress) if err != nil { return nil, errors.Wrap(err, "next nonce") } @@ -83,19 +42,20 @@ func (b blockGenerator) Generate(ctx context.Context, baseTipSet core.TipSet, ti } blockHeight := baseHeight + nullBlockCount + 1 - rewardMsg := types.NewMessage(address.NetworkAddress, miningAddress, nonce, types.NewAttoFILFromFIL(1000), "", nil) + rewardMsg := types.NewMessage(address.NetworkAddress, w.minerAddr, nonce, types.NewAttoFILFromFIL(1000), "", nil) srewardMsg := &types.SignedMessage{ Message: *rewardMsg, Signature: nil, } - pending := b.messagePool.Pending() + pending := w.messagePool.Pending() messages := make([]*types.SignedMessage, len(pending)+1) messages[0] = srewardMsg // Reward message must come first since this is a part of the consensus rules. - copy(messages[1:], core.OrderMessagesByNonce(b.messagePool.Pending())) + copy(messages[1:], core.OrderMessagesByNonce(w.messagePool.Pending())) + + vms := vm.NewStorageMap(w.blockstore) + res, err := w.applyMessages(ctx, messages, stateTree, vms, types.NewBlockHeight(blockHeight)) - vms := vm.NewStorageMap(b.blockstore) - res, err := b.applyMessages(ctx, messages, stateTree, vms, types.NewBlockHeight(blockHeight)) if err != nil { return nil, errors.Wrap(err, "generate apply messages") } @@ -111,7 +71,7 @@ func (b blockGenerator) Generate(ctx context.Context, baseTipSet core.TipSet, ti } next := &types.Block{ - Miner: miningAddress, + Miner: w.minerAddr, Height: types.Uint64(blockHeight), Messages: res.SuccessfulMessages, MessageReceipts: receipts, @@ -136,7 +96,7 @@ func (b blockGenerator) Generate(ctx context.Context, baseTipSet core.TipSet, ti for _, msg := range res.SuccessfulMessages { mc, err := msg.Cid() if err == nil { - b.messagePool.Remove(mc) + w.messagePool.Remove(mc) } } @@ -148,7 +108,7 @@ func (b blockGenerator) Generate(ctx context.Context, baseTipSet core.TipSet, ti log.Infof("permanent ApplyMessage failure, [%S]", mc.String()) // Intentionally not handling error case, since it just means we won't be able to remove from pool. if err == nil { - b.messagePool.Remove(mc) + w.messagePool.Remove(mc) } } diff --git a/mining/block_generator_test.go b/mining/block_generator_test.go deleted file mode 100644 index 52965997d7..0000000000 --- a/mining/block_generator_test.go +++ /dev/null @@ -1,369 +0,0 @@ -package mining - -import ( - "context" - "errors" - "testing" - - "github.com/filecoin-project/go-filecoin/address" - "github.com/filecoin-project/go-filecoin/core" - "github.com/filecoin-project/go-filecoin/state" - "github.com/filecoin-project/go-filecoin/types" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - - "gx/ipfs/QmSkuaNgyGmV8c1L3cZNWcUxRJV6J3nsD96JVQPcWcwtyW/go-hamt-ipld" - "gx/ipfs/QmYVNvtQkeZ6AKSwDrjQTs432QtL6umrrK41EBq3cu7iSP/go-cid" - "gx/ipfs/QmcD7SqfyQyA91TZUQ7VPRYbGarxmY7EsQewVYMuN5LNSv/go-ipfs-blockstore" - "gx/ipfs/QmeiCcJfDW1GJnWUArudsv5rQsihpi4oyddPhdqo3CfX6i/go-datastore" - - "github.com/filecoin-project/go-filecoin/vm" -) - -var seed = types.GenerateKeyInfoSeed() -var ki = types.MustGenerateKeyInfo(10, seed) -var mockSigner = types.NewMockSigner(ki) - -func TestGenerate(t *testing.T) { - // TODO fritz use core.FakeActor for state/contract tests for generate: - // - test nonces out of order - // - test nonce gap -} - -func sharedSetupInitial() (*hamt.CborIpldStore, *core.MessagePool, blockstore.Blockstore, *cid.Cid) { - cst := hamt.NewCborStore() - pool := core.NewMessagePool() - // Install the fake actor so we can execute it. - fakeActorCodeCid := types.AccountActorCodeCid - ds := datastore.NewMapDatastore() - bs := blockstore.NewBlockstore(ds) - return cst, pool, bs, fakeActorCodeCid -} - -func sharedSetup(t *testing.T) (state.Tree, *hamt.CborIpldStore, blockstore.Blockstore, *core.MessagePool, []types.Address) { - require := require.New(t) - cst, pool, bs, fakeActorCodeCid := sharedSetupInitial() - vms := vm.NewStorageMap(bs) - - // TODO: We don't need fake actors here, so these could be made real. - // And the NetworkAddress actor can/should be the real one. - // Stick two fake actors in the state tree so they can talk. - addr1, addr2, addr3, addr4 := mockSigner.Addresses[0], mockSigner.Addresses[1], mockSigner.Addresses[2], mockSigner.Addresses[3] - act1, act2, fakeNetAct := core.RequireNewFakeActor(require, vms, addr1, fakeActorCodeCid), core.RequireNewFakeActor(require, - vms, addr2, fakeActorCodeCid), core.RequireNewFakeActor(require, vms, addr3, fakeActorCodeCid) - minerAct := core.RequireNewMinerActor(require, vms, addr4, addr1, []byte{}, types.NewBytesAmount(10000), core.RequireRandomPeerID(), types.NewAttoFILFromFIL(10000)) - _, st := core.RequireMakeStateTree(require, cst, map[types.Address]*types.Actor{ - // Ensure core.NetworkAddress exists to prevent mining reward message failures. - address.NetworkAddress: fakeNetAct, - addr1: act1, - addr2: act2, - addr4: minerAct, - }) - return st, cst, bs, pool, []types.Address{addr1, addr2, addr3, addr4} -} - -func TestApplyMessagesForSuccessTempAndPermFailures(t *testing.T) { - assert := assert.New(t) - require := require.New(t) - - cst, _, bs, fakeActorCodeCid := sharedSetupInitial() - vms := vm.NewStorageMap(bs) - - // Stick two fake actors in the state tree so they can talk. - addr1, addr2 := mockSigner.Addresses[0], mockSigner.Addresses[1] - act1 := core.RequireNewFakeActor(require, vms, addr1, fakeActorCodeCid) - _, st := core.RequireMakeStateTree(require, cst, map[types.Address]*types.Actor{ - addr1: act1, - }) - - ctx := context.Background() - - // NOTE: it is important that each category (success, temporary failure, permanent failure) is represented below. - // If a given message's category changes in the future, it needs to be replaced here in tests by another so we fully - // exercise the categorization. - // addr2 doesn't correspond to an extant account, so this will trigger errAccountNotFound -- a temporary failure. - msg1 := types.NewMessage(addr2, addr1, 0, nil, "", nil) - smsg1, err := types.NewSignedMessage(*msg1, &mockSigner) - require.NoError(err) - - // This is actually okay and should result in a receipt - msg2 := types.NewMessage(addr1, addr2, 0, nil, "", nil) - smsg2, err := types.NewSignedMessage(*msg2, &mockSigner) - require.NoError(err) - - // The following two are sending to self -- errSelfSend, a permanent error. - msg3 := types.NewMessage(addr1, addr1, 1, nil, "", nil) - smsg3, err := types.NewSignedMessage(*msg3, &mockSigner) - require.NoError(err) - - msg4 := types.NewMessage(addr2, addr2, 1, nil, "", nil) - smsg4, err := types.NewSignedMessage(*msg4, &mockSigner) - require.NoError(err) - - messages := []*types.SignedMessage{smsg1, smsg2, smsg3, smsg4} - - res, err := core.ApplyMessages(ctx, messages, st, vms, types.NewBlockHeight(0)) - - assert.Len(res.PermanentFailures, 2) - assert.Contains(res.PermanentFailures, smsg3) - assert.Contains(res.PermanentFailures, smsg4) - - assert.Len(res.TemporaryFailures, 1) - assert.Contains(res.TemporaryFailures, smsg1) - - assert.Len(res.Results, 1) - assert.Contains(res.SuccessfulMessages, smsg2) - - assert.NoError(err) -} - -func TestGenerateMultiBlockTipSet(t *testing.T) { - assert := assert.New(t) - require := require.New(t) - - ctx := context.Background() - newCid := types.NewCidForTestGetter() - st, cst, bs, pool, addrs := sharedSetup(t) - - getStateTree := func(c context.Context, ts core.TipSet) (state.Tree, error) { - return st, nil - } - getWeight := func(c context.Context, ts core.TipSet) (uint64, uint64, error) { - num, den, err := ts.ParentWeight() - if err != nil { - return uint64(0), uint64(0), err - } - return num + uint64(int64(len(ts))*int64(core.ECV)), den, nil - } - generator := NewBlockGenerator(pool, getStateTree, getWeight, core.ApplyMessages, &core.TestView{}, bs, cst) - - parents := types.NewSortedCidSet(newCid()) - stateRoot := newCid() - baseBlock1 := types.Block{ - Parents: parents, - Height: types.Uint64(100), - ParentWeightNum: types.Uint64(1000), - StateRoot: stateRoot, - } - baseBlock2 := types.Block{ - Parents: parents, - Height: types.Uint64(100), - ParentWeightNum: types.Uint64(1000), - StateRoot: stateRoot, - Nonce: 1, - } - blk, err := generator.Generate(ctx, core.RequireNewTipSet(require, &baseBlock1, &baseBlock2), nil, 0, addrs[3]) - assert.NoError(err) - - assert.Len(blk.Messages, 1) // This is the mining reward. - assert.Equal(types.Uint64(101), blk.Height) - assert.Equal(types.Uint64(1020), blk.ParentWeightNum) -} - -// After calling Generate, do the new block and new state of the message pool conform to our expectations? -func TestGeneratePoolBlockResults(t *testing.T) { - assert := assert.New(t) - require := require.New(t) - - ctx := context.Background() - newCid := types.NewCidForTestGetter() - st, cst, bs, pool, addrs := sharedSetup(t) - - getStateTree := func(c context.Context, ts core.TipSet) (state.Tree, error) { - return st, nil - } - getWeight := func(c context.Context, ts core.TipSet) (uint64, uint64, error) { - num, den, err := ts.ParentWeight() - if err != nil { - return uint64(0), uint64(0), err - } - return num + uint64(int64(len(ts))*int64(core.ECV)), den, nil - } - generator := NewBlockGenerator(pool, getStateTree, getWeight, core.ApplyMessages, &core.TestView{}, bs, cst) - - // addr3 doesn't correspond to an extant account, so this will trigger errAccountNotFound -- a temporary failure. - msg1 := types.NewMessage(addrs[2], addrs[0], 0, nil, "", nil) - smsg1, err := types.NewSignedMessage(*msg1, &mockSigner) - require.NoError(err) - - // This is actually okay and should result in a receipt - msg2 := types.NewMessage(addrs[0], addrs[1], 0, nil, "", nil) - smsg2, err := types.NewSignedMessage(*msg2, &mockSigner) - require.NoError(err) - - // The following two are sending to self -- errSelfSend, a permanent error. - msg3 := types.NewMessage(addrs[0], addrs[0], 1, nil, "", nil) - smsg3, err := types.NewSignedMessage(*msg3, &mockSigner) - require.NoError(err) - - msg4 := types.NewMessage(addrs[1], addrs[1], 0, nil, "", nil) - smsg4, err := types.NewSignedMessage(*msg4, &mockSigner) - require.NoError(err) - - pool.Add(smsg1) - pool.Add(smsg2) - pool.Add(smsg3) - pool.Add(smsg4) - - assert.Len(pool.Pending(), 4) - baseBlock := types.Block{ - Parents: types.NewSortedCidSet(newCid()), - Height: types.Uint64(100), - StateRoot: newCid(), - } - blk, err := generator.Generate(ctx, core.RequireNewTipSet(require, &baseBlock), nil, 0, addrs[3]) - assert.NoError(err) - - assert.Len(pool.Pending(), 1) // This is the temporary failure. - assert.Contains(pool.Pending(), smsg1) - - assert.Len(blk.Messages, 2) // This is the good message + the mining reward. - - // Is the mining reward first? This will fail 50% of the time if we don't force the reward to come first. - assert.Equal(address.NetworkAddress, blk.Messages[0].From) -} - -func TestGenerateSetsBasicFields(t *testing.T) { - assert := assert.New(t) - require := require.New(t) - - ctx := context.Background() - newCid := types.NewCidForTestGetter() - - st, cst, bs, pool, addrs := sharedSetup(t) - - getStateTree := func(c context.Context, ts core.TipSet) (state.Tree, error) { - return st, nil - } - getWeight := func(c context.Context, ts core.TipSet) (uint64, uint64, error) { - num, den, err := ts.ParentWeight() - if err != nil { - return uint64(0), uint64(0), err - } - return num + uint64(int64(len(ts))*int64(core.ECV)), den, nil - } - generator := NewBlockGenerator(pool, getStateTree, getWeight, core.ApplyMessages, &core.TestView{}, bs, cst) - - h := types.Uint64(100) - wNum := types.Uint64(1000) - wDenom := types.Uint64(1) - baseBlock := types.Block{ - Height: h, - ParentWeightNum: wNum, - ParentWeightDenom: wDenom, - StateRoot: newCid(), - } - baseTipSet := core.RequireNewTipSet(require, &baseBlock) - blk, err := generator.Generate(ctx, baseTipSet, nil, 0, addrs[3]) - assert.NoError(err) - - assert.Equal(h+1, blk.Height) - assert.Equal(addrs[3], blk.Miner) - - blk, err = generator.Generate(ctx, baseTipSet, nil, 1, addrs[3]) - assert.NoError(err) - - assert.Equal(h+2, blk.Height) - assert.Equal(wNum+10.0, blk.ParentWeightNum) - assert.Equal(wDenom, blk.ParentWeightDenom) - assert.Equal(addrs[3], blk.Miner) -} - -func TestGenerateWithoutMessages(t *testing.T) { - assert := assert.New(t) - require := require.New(t) - - ctx := context.Background() - newCid := types.NewCidForTestGetter() - - st, cst, bs, pool, addrs := sharedSetup(t) - - getStateTree := func(c context.Context, ts core.TipSet) (state.Tree, error) { - return st, nil - } - getWeight := func(c context.Context, ts core.TipSet) (uint64, uint64, error) { - num, den, err := ts.ParentWeight() - if err != nil { - return uint64(0), uint64(0), err - } - return num + uint64(int64(len(ts))*int64(core.ECV)), den, nil - } - generator := NewBlockGenerator(pool, getStateTree, getWeight, core.ApplyMessages, &core.TestView{}, bs, cst) - - assert.Len(pool.Pending(), 0) - baseBlock := types.Block{ - Parents: types.NewSortedCidSet(newCid()), - Height: types.Uint64(100), - StateRoot: newCid(), - } - blk, err := generator.Generate(ctx, core.RequireNewTipSet(require, &baseBlock), nil, 0, addrs[3]) - assert.NoError(err) - - assert.Len(pool.Pending(), 0) // This is the temporary failure. - assert.Len(blk.Messages, 1) // This is the mining reward. -} - -// If something goes wrong while generating a new block, even as late as when flushing it, -// no block should be returned, and the message pool should not be pruned. -func TestGenerateError(t *testing.T) { - assert := assert.New(t) - require := require.New(t) - ctx := context.Background() - newCid := types.NewCidForTestGetter() - - st, cst, bs, pool, addrs := sharedSetup(t) - - explodingGetStateTree := func(c context.Context, ts core.TipSet) (state.Tree, error) { - stt := WrapStateTreeForTest(st) - stt.TestFlush = func(ctx context.Context) (*cid.Cid, error) { - return nil, errors.New("boom no flush") - } - - return stt, nil - } - getWeight := func(c context.Context, ts core.TipSet) (uint64, uint64, error) { - num, den, err := ts.ParentWeight() - if err != nil { - return uint64(0), uint64(0), err - } - return num + uint64(int64(len(ts))*int64(core.ECV)), den, nil - } - - generator := NewBlockGenerator(pool, explodingGetStateTree, getWeight, core.ApplyMessages, &core.TestView{}, bs, cst) - - // This is actually okay and should result in a receipt - msg := types.NewMessage(addrs[0], addrs[1], 0, nil, "", nil) - smsg, err := types.NewSignedMessage(*msg, &mockSigner) - require.NoError(err) - pool.Add(smsg) - - assert.Len(pool.Pending(), 1) - baseBlock := types.Block{ - Parents: types.NewSortedCidSet(newCid()), - Height: types.Uint64(100), - StateRoot: newCid(), - } - baseTipSet := core.RequireNewTipSet(require, &baseBlock) - blk, err := generator.Generate(ctx, baseTipSet, nil, 0, addrs[3]) - assert.Error(err, "boom") - assert.Nil(blk) - - assert.Len(pool.Pending(), 1) // No messages are removed from the pool. -} - -type StateTreeForTest struct { - state.Tree - TestFlush func(ctx context.Context) (*cid.Cid, error) -} - -func WrapStateTreeForTest(st state.Tree) *StateTreeForTest { - stt := StateTreeForTest{ - st, - st.Flush, - } - return &stt -} - -func (st *StateTreeForTest) Flush(ctx context.Context) (*cid.Cid, error) { - return st.TestFlush(ctx) -} diff --git a/mining/scheduler.go b/mining/scheduler.go new file mode 100644 index 0000000000..3283ee1777 --- /dev/null +++ b/mining/scheduler.go @@ -0,0 +1,345 @@ +package mining + +// The Scheduler listens for new heaviest TipSets and schedules mining work on +// these TipSets. The scheduler is ultimately responsible for informing the +// rest of the system about new successful blocks. This is the interface to +// implement if you want to explore an alternate mining strategy. +// +// The default Scheduler implementation, timingScheduler, operates in two +// states, 'collect', where the scheduler listens for new heaviest tipsets to +// replace the mining base, and 'ignore', where mining proceeds uninterrupted. + +import ( + "context" + "fmt" + "sync" + "time" + + "github.com/filecoin-project/go-filecoin/core" +) + +// Scheduler is the mining interface consumers use. When you Start() the +// scheduler it returns two channels (inCh, outCh) and a sync.WaitGroup: +// - inCh: the caller sends Inputs to mine on to this channel. +// - outCh: the worker sends Outputs to the caller on this channel. +// - doneWg: signals that the scheduler and any goroutines it launched +// have stopped. (Context cancelation happens async, so you +// need some way to know when it has actually stopped.) +// +// Once Start()ed, the Scheduler can be stopped by canceling its miningCtx, which +// will signal on doneWg when it's actually done. Canceling an Input.Ctx +// just cancels the run for that input. Canceling miningCtx cancels any run +// in progress and shuts the worker down. +type Scheduler interface { + Start(miningCtx context.Context) (chan<- Input, <-chan Output, *sync.WaitGroup) +} + +type timingScheduler struct { + miningAt uint64 + base Input + state receiveState + + worker Worker +} + +type receiveState int + +const ( + delay = iota + grace + ignore + end +) + +// mineGrace is the protocol grace period +const mineGrace = time.Second + +// mineSleepTime is the protocol's estimated mining time +const mineSleepTime = mineGrace * 30 + +// mineDelay is the protocol mining delay at the beginning of an epoch +const mineDelay = mineGrace * 2 + +// runMine runs the miner, inputs to the miner's goroutine are accepted on +// mineInCh. For each new input on mineInCh, runMine cancels the old mining +// run to run the new input. There is only one outstanding call to the 'mine' +// function at a time. Newly mined valid blocks are sent out on outCh. A +// signal indicating that a run of 'mine' was completed without interruption, +// even if it did not result in a valid block, is sent on mineOutCh. +func (s *timingScheduler) runMine(miningCtx context.Context, outCh chan<- Output, mineInCh <-chan Input, mineOutCh chan<- struct{}, doneWg sync.WaitGroup) { + defer doneWg.Done() + currentRunCancel := func() {} + var currentRunCtx context.Context + for { + select { + case <-miningCtx.Done(): + fmt.Printf("runMine: miningCtx.Done()\n") + currentRunCancel() + return + case input, ok := <-mineInCh: + fmt.Printf("runMine: mineInCh: %v\n", input) + if ok { + currentRunCancel() + currentRunCtx, currentRunCancel = context.WithCancel(input.Ctx) + doneWg.Add(1) + go func() { + defer doneWg.Done() + fmt.Printf("mine: running 'mine'\n") + s.worker.Mine(currentRunCtx, input, outCh) + mineOutCh <- struct{}{} + }() + } else { + // sender closed mineCh, close and ignore + mineInCh = nil + } + } + } +} + +// delay runs for the protocol mining delay "mineDelay" and updates the base +// tipset for mining to the latest tipset read from the input channel. +// delay initializes the next round of mining, canceling any previous mining +// calls still running. If mining work from the previous period is not +// completed delay logs a warning as this indicates someone is speeding up PoST +// generation somehow. +func (s *timingScheduler) delay(miningCtx context.Context, inCh <-chan Input, mineInCh chan<- Input, mineOutCh <-chan struct{}) { + epoch, err := s.base.TipSet.Height() + if err != nil { + panic("this can't be happening") + } + epoch += s.base.NullBlks + // No blocks found last epoch? Add a null block for this epoch's input. + if s.miningAt > epoch { + input := NewInput(s.base.Ctx, s.base.TipSet) + input.NullBlks = s.base.NullBlks + uint64(1) + s.base = input + epoch++ + } + + // This is unexpected and means someone is generating + // PoSTs too fast. + defer func() { + if s.miningAt != epoch { + // log.Warningf("Not enough time to mine on epochs %d to %d", s.miningAt, epoch - 1) + s.miningAt = epoch + } + }() + delayTimer := time.NewTimer(mineDelay) + for { + select { + case <-miningCtx.Done(): + s.state = end + return + case <-delayTimer.C: + s.state = grace + mineInCh <- s.base + return + case input, ok := <-inCh: + if !ok { + inCh = nil + continue + } + inEpoch, err := input.TipSet.Height() + if err != nil { + panic("this can't be happening") + } + // Older epoch? Do nothing. TODO: is it secure that old heavier tipsets are ignored? + // Current epoch? Replace base. + // Newer epoch? Loop back to DELAY state for new epoch. + if inEpoch == epoch { + s.base = input + } + if inEpoch > epoch { + // log.Warning("miner preempted during mining delay") + s.base = input + epoch = inEpoch + return + } + case _, ok := <-mineOutCh: + if ok { + s.miningAt++ + } else { + mineOutCh = nil + } + } + } +} + +// grace waits for the protocol grace period "mineGrace". During the grace +// period mining is always running. However grace restarts mining when +// receiving new heavier base tipsets. +func (s *timingScheduler) grace(miningCtx context.Context, inCh <-chan Input, mineInCh chan<- Input) { + epoch, err := s.base.TipSet.Height() + if err != nil { + panic("this can't be happening") + } + epoch += s.base.NullBlks + + graceTimer := time.NewTimer(mineGrace) + for { + select { + case <-miningCtx.Done(): + s.state = end + return + case <-graceTimer.C: + s.state = ignore + return + case input, ok := <-inCh: + if !ok { + inCh = nil + continue + } + inEpoch, err := input.TipSet.Height() + if err != nil { + panic("this can't be happening") + } + // Older epoch? Do nothing. TODO: is it secure that old heavier tipsets are ignored? + // Current epoch? Restart mining. + // Newer epoch? Loop back to DELAY state for new epoch. + if inEpoch == epoch { + s.base = input + mineInCh <- input + } + if inEpoch > epoch { + // log.Warning("miner preempted during grace period") + s.base = input + s.state = delay + return + } + } + } +} + +// ignore waits for the mining period. During this time no new tipsets of this +// epoch are accepted as new mining bases. ignore is finished when the mining +// period is over, or a tipset of the next epoch is input. +func (s *timingScheduler) ignore(miningCtx context.Context, inCh <-chan Input, mineOutCh <-chan struct{}) { + epoch, err := s.base.TipSet.Height() + if err != nil { + panic("this can't be happening") + } + epoch += s.base.NullBlks + + mineTimer := time.NewTimer(mineSleepTime) + for { + select { + case <-miningCtx.Done(): + s.state = end + return + case <-mineTimer.C: + s.state = delay + return + case input, ok := <-inCh: + if !ok { + inCh = nil + continue + } + inEpoch, err := input.TipSet.Height() + if err != nil { + panic("this can't be happening") + } + // Older epoch? Current epoch? Do nothing. + // Newer epoch? Loop back to DELAY state for new epoch. + if inEpoch > epoch { + s.base = input + s.state = delay + return + } + case _, ok := <-mineOutCh: + if ok { + s.miningAt++ + } else { + mineOutCh = nil + } + } + } +} + +// Start is the main entrypoint for Worker. Call it to start mining. It returns +// two channels: an input channel for blocks and an output channel for results. +// It also returns a waitgroup that will signal that all mining runs have +// completed. Each block that is received on the input channel causes the +// worker to cancel the context of the previous mining run if any and start +// mining on the new block. Any results are sent into its output channel. +// Closing the input channel does not cause the worker to stop; cancel +// the Input.Ctx to cancel an individual mining run or the mininCtx to +// stop all mining and shut down the worker. +// +// TODO A potentially simpler interface here would be for the worker to +// take the input channel from the caller and then shut everything down +// when the input channel is closed. +func (s *timingScheduler) Start(miningCtx context.Context) (chan<- Input, <-chan Output, *sync.WaitGroup) { + inCh := make(chan Input) + outCh := make(chan Output) + mineInCh := make(chan Input) + mineOutCh := make(chan struct{}) + var doneWg sync.WaitGroup // for internal use + var extDoneWg sync.WaitGroup // for external use + + doneWg.Add(1) + go s.runMine(miningCtx, outCh, mineInCh, mineOutCh, doneWg) + + doneWg.Add(1) + go func() { + defer doneWg.Done() + + // Wait for an initial value + input := <-inCh + s.base = input + fmt.Printf("Start: init\n") + + // The receive loop. The worker operates in three basic receiveStates: + // delay -- wait for the mining delay to receive the best tipset for this epoch + // grace -- start mining but restart off of better tipsets that arrive + // ignore -- ignore all tipsets from the current epoch and finish mining + for { + switch s.state { + case delay: + fmt.Printf("Start: delay\n") + s.delay(miningCtx, inCh, mineInCh, mineOutCh) + case grace: + fmt.Printf("Start: grace\n") + s.grace(miningCtx, inCh, mineInCh) + case ignore: + fmt.Printf("Start: ignore\n") + s.ignore(miningCtx, inCh, mineOutCh) + case end: + fmt.Printf("Start: end\n") + return + default: + panic("worker should never reach here") + } + } + }() + + // This tear down goroutine waits for all work to be done before closing + // channels. When this goroutine is complete, external code can + // consider the worker to be done. + extDoneWg.Add(1) + go func() { + doneWg.Wait() + close(outCh) + close(mineInCh) + close(mineOutCh) + extDoneWg.Done() + fmt.Printf("tear down all done\n") + }() + return inCh, outCh, &extDoneWg +} + +// NewScheduler returns a new timingScheduler to schedule mining work on the +// input worker. +func NewScheduler(w Worker) Scheduler { + return &timingScheduler{worker: w} +} + +// MineOnce is a convenience function that presents a synchronous blocking +// interface to the mining scheduler. +func MineOnce(ctx context.Context, s Scheduler, ts core.TipSet) Output { + subCtx, subCtxCancel := context.WithCancel(ctx) + defer subCtxCancel() + + inCh, outCh, _ := s.Start(subCtx) + go func() { inCh <- NewInput(subCtx, ts) }() + return <-outCh +} diff --git a/mining/scheduler_test.go b/mining/scheduler_test.go new file mode 100644 index 0000000000..45d614af12 --- /dev/null +++ b/mining/scheduler_test.go @@ -0,0 +1,120 @@ +package mining + +import ( + "context" + "testing" + + "github.com/filecoin-project/go-filecoin/core" + "github.com/filecoin-project/go-filecoin/types" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func newTestUtils(t *testing.T) (*assert.Assertions, *require.Assertions, core.TipSet) { + assert := assert.New(t) + require := require.New(t) + baseBlock := &types.Block{StateRoot: types.SomeCid()} + + ts := core.TipSet{baseBlock.Cid().String(): baseBlock} + return assert, require, ts +} + +// TestMineOnce tests that the MineOnce function results in a mining job being +// scheduled and run by the mining scheduler. +func TestMineOnce(t *testing.T) { + assert, require, ts := newTestUtils(t) + + // Echoes the sent block to output. + worker := newTestWorkerWithDeps(makeEchoMine(require)) + scheduler := NewScheduler(worker) + result := MineOnce(context.Background(), scheduler, ts) + assert.NoError(result.Err) + assert.True(ts.ToSlice()[0].StateRoot.Equals(result.NewBlock.StateRoot)) +} + +func TestSchedulerPassesValue(t *testing.T) { + assert, _, ts := newTestUtils(t) + ctx, cancel := context.WithCancel(context.Background()) + + checkValsMine := func(c context.Context, i Input, outCh chan<- Output) { + assert.NotEqual(ctx, c) // individual run ctx splits off from mining ctx + assert.Equal(i.TipSet, ts) + outCh <- Output{} + } + worker := newTestWorkerWithDeps(checkValsMine) + scheduler := NewScheduler(worker) + inCh, outCh, _ := scheduler.Start(ctx) + inCh <- NewInput(context.Background(), ts) + <-outCh + cancel() + return +} + +func TestSchedulerPassValue(t *testing.T) { + assert, require, ts := newTestUtils(t) + // Test that we can push multiple blocks through. There was an actual bug + // where multiply queued inputs were not all processed. + // Another scheduler test. Now we need to have timing involved... + ctx, cancel := context.WithCancel(context.Background()) + worker := newTestWorkerWithDeps(makeEchoMine(require)) + scheduler := NewScheduler(worker) + inCh, outCh, _ := scheduler.Start(ctx) + // Note: inputs have to pass whatever check on newly arriving tipsets + // are in place in Start(). + inCh <- NewInput(context.Background(), ts) + inCh <- NewInput(context.Background(), ts) + inCh <- NewInput(context.Background(), ts) + <-outCh + <-outCh + <-outCh + assert.Equal(ChannelEmpty, ReceiveOutCh(outCh)) + cancel() +} + +func TestSchedulerCancelInputCtx(t *testing.T) { + assert, _, ts := newTestUtils(t) + // Test that canceling the Input.Ctx cancels that input's mining run. + // scheduler test. Again state / timing considerations need to be respected now + // TODO: this test might need to go away with new scheduler patterns, at very least + // it should be reevaluated after we settle on a solid new pattern for canceling input runs. + miningCtx, miningCtxCancel := context.WithCancel(context.Background()) + inputCtx, inputCtxCancel := context.WithCancel(context.Background()) + var gotMineCtx context.Context + fakeMine := func(c context.Context, i Input, outCh chan<- Output) { + gotMineCtx = c + outCh <- Output{} + } + worker := newTestWorkerWithDeps(fakeMine) + scheduler := NewScheduler(worker) + inCh, outCh, _ := scheduler.Start(miningCtx) + inCh <- NewInput(inputCtx, ts) + <-outCh + inputCtxCancel() + assert.Error(gotMineCtx.Err()) // Same context as miningRunCtx. + assert.NoError(miningCtx.Err()) + miningCtxCancel() +} + +func TestSchedulerCancelMiningCtx(t *testing.T) { + assert, _, ts := newTestUtils(t) + // Test that canceling the mining context stops mining, cancels + // the inner context, and closes the output channel. + // scheduler test + miningCtx, miningCtxCancel := context.WithCancel(context.Background()) + inputCtx, inputCtxCancel := context.WithCancel(context.Background()) + gotMineCtx := context.Background() + fakeMine := func(c context.Context, i Input, outCh chan<- Output) { + gotMineCtx = c + outCh <- Output{} + } + worker := newTestWorkerWithDeps(fakeMine) + scheduler := NewScheduler(worker) + inCh, outCh, doneWg := scheduler.Start(miningCtx) + inCh <- NewInput(inputCtx, ts) + <-outCh + miningCtxCancel() + doneWg.Wait() + assert.Equal(ChannelClosed, ReceiveOutCh(outCh)) + assert.Error(gotMineCtx.Err()) + inputCtxCancel() +} diff --git a/mining/testing.go b/mining/testing.go index 2eb2097c13..f726e68f4d 100644 --- a/mining/testing.go +++ b/mining/testing.go @@ -4,37 +4,49 @@ import ( "context" "sync" - "github.com/filecoin-project/go-filecoin/core" - "github.com/filecoin-project/go-filecoin/types" "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" ) -// MockBlockGenerator is a testify mock for BlockGenerator. -type MockBlockGenerator struct { +// MockScheduler is a mock Scheduler. +type MockScheduler struct { mock.Mock } -var _ BlockGenerator = &MockBlockGenerator{} +// Start is the MockScheduler's Start function. +func (s *MockScheduler) Start(ctx context.Context) (chan<- Input, <-chan Output, *sync.WaitGroup) { + args := s.Called(ctx) + return args.Get(0).(chan<- Input), args.Get(1).(<-chan Output), args.Get(2).(*sync.WaitGroup) +} + +// TestWorker is a worker with a customizable work function to facilitate +// easy testing. +type TestWorker struct { + WorkFun func(context.Context, Input, chan<- Output) +} -// Generate is a testify mock implementation. -func (bg *MockBlockGenerator) Generate(ctx context.Context, h core.TipSet, ticket types.Signature, nullBlockCount uint64, m types.Address) (b *types.Block, err error) { - args := bg.Called(ctx, h, nullBlockCount, m) - if args.Get(0) != nil { - b = args.Get(0).(*types.Block) +// Mine is the TestWorker's Work function. It simply calls the workFun +// field. +func (w *TestWorker) Mine(ctx context.Context, input Input, outCh chan<- Output) { + if w.WorkFun == nil { + panic("must set MutableTestWorker's WorkFun before calling Work") } - err = args.Error(1) - return + w.WorkFun(ctx, input, outCh) } -// MockWorker is a mock Worker. -type MockWorker struct { - mock.Mock +func newTestWorkerWithDeps(f func(context.Context, Input, chan<- Output)) *TestWorker { + return &TestWorker{ + WorkFun: f, + } } -// Start is the MockWorker's Start function. -func (w *MockWorker) Start(ctx context.Context) (chan<- Input, <-chan Output, *sync.WaitGroup) { - args := w.Called(ctx) - return args.Get(0).(chan<- Input), args.Get(1).(<-chan Output), args.Get(2).(*sync.WaitGroup) +func makeEchoMine(require *require.Assertions) func(context.Context, Input, chan<- Output) { + echoMine := func(c context.Context, i Input, outCh chan<- Output) { + require.NotEqual(0, len(i.TipSet)) + b := i.TipSet.ToSlice()[0] + outCh <- Output{NewBlock: b} + } + return echoMine } const ( diff --git a/mining/worker.go b/mining/worker.go index 1befbb51ee..6da9910ec7 100644 --- a/mining/worker.go +++ b/mining/worker.go @@ -1,21 +1,30 @@ package mining +// The Worker Mines on Input received from a Scheduler. The Worker is +// responsible for generating the necessary proofs, checking for success, +// generating new blocks, and forwarding them out to the wider node. + import ( "bytes" "context" "encoding/binary" "math/big" - "sync" "time" "github.com/filecoin-project/go-filecoin/core" + "github.com/filecoin-project/go-filecoin/state" "github.com/filecoin-project/go-filecoin/types" + "github.com/filecoin-project/go-filecoin/vm" + "gx/ipfs/QmSkuaNgyGmV8c1L3cZNWcUxRJV6J3nsD96JVQPcWcwtyW/go-hamt-ipld" sha256 "gx/ipfs/QmXTpwq2AkzQsPjKqFQDNY2bMdsAT53hUBETeyj8QRHTZU/sha256-simd" + "gx/ipfs/QmcD7SqfyQyA91TZUQ7VPRYbGarxmY7EsQewVYMuN5LNSv/go-ipfs-blockstore" + logging "gx/ipfs/QmcVVHfdyv15GVPk7NrxdWjh2hLVccXnoD8j2tyQShiXJb/go-log" ) var ( ticketDomain *big.Int + log = logging.Logger("mining") ) func init() { @@ -28,8 +37,9 @@ func init() { // to accrue rewards to, and a context that the caller can use // to cancel this mining run. type Input struct { - Ctx context.Context - TipSet core.TipSet + Ctx context.Context // TODO: we should evaluate if this is still useful + TipSet core.TipSet + NullBlks uint64 } // NewInput instantiates a new Input. @@ -50,105 +60,57 @@ func NewOutput(b *types.Block, e error) Output { return Output{NewBlock: b, Err: e} } -// AsyncWorker implements the plumbing that drives mining. -type AsyncWorker struct { - blockGenerator BlockGenerator - createPoST DoSomeWorkFunc // TODO: rename createPoSTFunc? - nullBlockTimer NullBlockTimerFunc - minerAddr types.Address // TODO: needs to be a key in the near future -} - -// Worker is the mining interface consumers use. When you Start() a worker -// it returns two channels (inCh, outCh) and a sync.WaitGroup: -// - inCh: caller send Inputs to mine on to this channel -// - outCh: the worker sends Outputs to the caller on this channel -// - doneWg: signals that the worker and any mining runs it launched -// have stopped. (Context cancelation happens async, so you -// need some way to know when it has actually stopped.) -// -// Once Start()ed, the Worker can be stopped by canceling its miningCtx, which -// will signal on doneWg when it's actually done. Canceling an Input.Ctx -// just cancels the run for that input. Canceling miningCtx cancels any run -// in progress and shuts the worker down. +// Worker is the interface called by the Scheduler to run the mining work being +// scheduled. type Worker interface { - Start(miningCtx context.Context) (chan<- Input, <-chan Output, *sync.WaitGroup) + Mine(runCtx context.Context, input Input, outCh chan<- Output) } -// NewWorker instantiates a new Worker. -func NewWorker(blockGenerator BlockGenerator, miner types.Address) Worker { - return NewWorkerWithDeps(blockGenerator, createPoST, nullBlockTimer, miner) -} +// GetStateTree is a function that gets the aggregate state tree of a TipSet. It's +// its own function to facilitate testing. +type GetStateTree func(context.Context, core.TipSet) (state.Tree, error) -// NewWorkerWithDeps instantiates a new Worker with custom functions. -func NewWorkerWithDeps(blockGenerator BlockGenerator, createPoST DoSomeWorkFunc, nullBlockTimer NullBlockTimerFunc, miner types.Address) Worker { - return &AsyncWorker{ - blockGenerator: blockGenerator, - createPoST: createPoST, - nullBlockTimer: nullBlockTimer, - minerAddr: miner, - } -} +// GetWeight is a function that calculates the weight of a TipSet. Weight is +// expressed as two uint64s comprising a rational number. +type GetWeight func(context.Context, core.TipSet) (uint64, uint64, error) -// MineOnce is a convenience function that presents a synchronous blocking -// interface to the worker. -func MineOnce(ctx context.Context, w Worker, ts core.TipSet) Output { - subCtx, subCtxCancel := context.WithCancel(ctx) - defer subCtxCancel() +type miningApplier func(ctx context.Context, messages []*types.SignedMessage, st state.Tree, vms vm.StorageMap, bh *types.BlockHeight) (core.ApplyMessagesResponse, error) - inCh, outCh, _ := w.Start(subCtx) - go func() { inCh <- NewInput(subCtx, ts) }() - return <-outCh +// MiningWorker runs a mining job. +type MiningWorker struct { + createPoST DoSomeWorkFunc // TODO: rename createPoSTFunc? + minerAddr types.Address // TODO: needs to be a key in the near future + + // consensus things + getStateTree GetStateTree + getWeight GetWeight + + // core filecoin things + messagePool *core.MessagePool + applyMessages miningApplier + powerTable core.PowerTableView + blockstore blockstore.Blockstore + cstore *hamt.CborIpldStore } -// Start is the main entrypoint for Worker. Call it to start mining. It returns -// two channels: an input channel for blocks and an output channel for results. -// It also returns a waitgroup that will signal that all mining runs have -// completed. Each block that is received on the input channel causes the -// worker to cancel the context of the previous mining run if any and start -// mining on the new block. Any results are sent into its output channel. -// Closing the input channel does not cause the worker to stop; cancel -// the Input.Ctx to cancel an individual mining run or the mininCtx to -// stop all mining and shut down the worker. -// -// TODO A potentially simpler interface here would be for the worker to -// take the input channel from the caller and then shut everything down -// when the input channel is closed. -func (w *AsyncWorker) Start(miningCtx context.Context) (chan<- Input, <-chan Output, *sync.WaitGroup) { - inCh := make(chan Input) - outCh := make(chan Output) - var doneWg sync.WaitGroup - - doneWg.Add(1) - go func() { - defer doneWg.Done() - var currentRunCtx context.Context - var currentRunCancel = func() {} - for { - select { - case <-miningCtx.Done(): - currentRunCancel() - close(outCh) - return - case input, ok := <-inCh: - if ok { - // TODO(EC): implement the mining logic described in the spec here: - // https://github.com/filecoin-project/specs/pull/71/files#diff-a7e9cad7bc42c664eb72d7042276a22fR83 - // specifically: - currentRunCancel() - currentRunCtx, currentRunCancel = context.WithCancel(input.Ctx) - doneWg.Add(1) - go func() { - w.Mine(currentRunCtx, input, outCh) - doneWg.Done() - }() - } else { - // Sender closed the channel. Set it to nil to ignore it. - inCh = nil - } - } - } - }() - return inCh, outCh, &doneWg +// NewMiningWorker instantiates a new Worker. +func NewMiningWorker(messagePool *core.MessagePool, getStateTree GetStateTree, getWeight GetWeight, applyMessages miningApplier, powerTable core.PowerTableView, bs blockstore.Blockstore, cst *hamt.CborIpldStore, miner types.Address) *MiningWorker { + return NewMiningWorkerWithDeps(messagePool, getStateTree, getWeight, applyMessages, powerTable, bs, cst, miner, createPoST) +} + +// NewMiningWorkerWithDeps instantiates a new Worker with custom functions. +func NewMiningWorkerWithDeps(messagePool *core.MessagePool, getStateTree GetStateTree, getWeight GetWeight, applyMessages miningApplier, powerTable core.PowerTableView, bs blockstore.Blockstore, cst *hamt.CborIpldStore, miner types.Address, createPoST DoSomeWorkFunc) *MiningWorker { + return &MiningWorker{ + getStateTree: getStateTree, + getWeight: getWeight, + messagePool: messagePool, + applyMessages: applyMessages, + powerTable: powerTable, + blockstore: bs, + cstore: cst, + createPoST: createPoST, + minerAddr: miner, + } } // DoSomeWorkFunc is a dummy function that mimics doing something time-consuming @@ -156,50 +118,42 @@ func (w *AsyncWorker) Start(miningCtx context.Context) (chan<- Input, <-chan Out // is a good idea for now. type DoSomeWorkFunc func() -// NullBlockTimerFunc blocks until it is time to add a null block. -type NullBlockTimerFunc func() - -// Mine does the actual work. It's the implementation of worker.mine. -func (w *AsyncWorker) Mine(ctx context.Context, input Input, outCh chan<- Output) { +// Mine implements the MiningWorkers main mining function.. +func (w *MiningWorker) Mine(ctx context.Context, input Input, outCh chan<- Output) { ctx = log.Start(ctx, "Worker.Mine") defer log.Finish(ctx) // TODO: derive these from actual storage power. - // This means broadening the scope of the State function - // and powerTableView from the generator to the worker. + // This should now be pretty easy because the worker has getState and + // powertable view. + // To fix this and keep mock-mine mode actually generating blocks we'll + // need to update the view to give every miner a little power in the + // network. const myPower = 1 const totalPower = 5 - for nullBlockCount := uint64(0); ; nullBlockCount++ { - if ctx.Err() != nil { - break - } + if ctx.Err() != nil { + return + } + + challenge := createChallenge(input.TipSet, input.NullBlks) + proof := createProof(challenge, w.createPoST) + ticket := createTicket(proof) - challenge := createChallenge(input.TipSet, nullBlockCount) - proof := createProof(challenge, createPoST) - ticket := createTicket(proof) - - // TODO: Test the interplay of isWinningTicket() and createPoST() - if isWinningTicket(ticket, myPower, totalPower) { - next, err := w.blockGenerator.Generate(ctx, input.TipSet, ticket, nullBlockCount, w.minerAddr) - if err == nil { - log.SetTag(ctx, "block", next) - } - - // TODO(EC): Consider what to do if we have found a winning ticket and are mining with - // it and a new tipset comes in with greater height. Currently Worker.Start() will cancel us. - // We should instead let the successful run proceed unless the context is explicitly canceled. - if ctx.Err() == nil { - outCh <- NewOutput(next, err) - } else { - log.Warningf("Abandoning successfully mined block without publishing: %s", input.TipSet.String()) - } - - break + // TODO: Test the interplay of isWinningTicket() and createPoST() + if isWinningTicket(ticket, myPower, totalPower) { + next, err := w.Generate(ctx, input.TipSet, ticket, input.NullBlks) + if err == nil { + log.SetTag(ctx, "block", next) } - nullBlockTimer() + if ctx.Err() == nil { + outCh <- NewOutput(next, err) + } else { + log.Warningf("Abandoning successfully mined block without publishing: %s", input.TipSet.String()) + } } + time.Sleep(mineSleepTime) } func createChallenge(parents core.TipSet, nullBlockCount uint64) []byte { @@ -243,16 +197,8 @@ var isWinningTicket = func(ticket []byte, myPower, totalPower int64) bool { return lhs.Cmp(rhs) < 0 } -// How long the node's mining Worker should sleep to simulate mining. -const mineSleepTime = time.Millisecond * 10 - // createPoST is the default implementation of DoSomeWorkFunc. Contrary to the // advertisement, it doesn't do anything yet. func createPoST() { time.Sleep(mineSleepTime) } - -// nullBlockTimer is the default implementation of NullBlockTimerFunc. -func nullBlockTimer() { - time.Sleep(mineSleepTime) -} diff --git a/mining/worker_test.go b/mining/worker_test.go index 0863d1506d..e054a5e80e 100644 --- a/mining/worker_test.go +++ b/mining/worker_test.go @@ -1,221 +1,63 @@ package mining import ( - //"context" + "context" "encoding/hex" - //"errors" + "errors" "testing" + "github.com/filecoin-project/go-filecoin/address" "github.com/filecoin-project/go-filecoin/core" + "github.com/filecoin-project/go-filecoin/state" "github.com/filecoin-project/go-filecoin/types" - //"github.com/filecoin-project/go-filecoin/util/swap" "github.com/stretchr/testify/assert" - //"github.com/stretchr/testify/mock" - //"github.com/stretchr/testify/require" + "github.com/stretchr/testify/require" + "gx/ipfs/QmSkuaNgyGmV8c1L3cZNWcUxRJV6J3nsD96JVQPcWcwtyW/go-hamt-ipld" sha256 "gx/ipfs/QmXTpwq2AkzQsPjKqFQDNY2bMdsAT53hUBETeyj8QRHTZU/sha256-simd" + "gx/ipfs/QmYVNvtQkeZ6AKSwDrjQTs432QtL6umrrK41EBq3cu7iSP/go-cid" + "gx/ipfs/QmcD7SqfyQyA91TZUQ7VPRYbGarxmY7EsQewVYMuN5LNSv/go-ipfs-blockstore" + "gx/ipfs/QmeiCcJfDW1GJnWUArudsv5rQsihpi4oyddPhdqo3CfX6i/go-datastore" ) -/* -func TestMineOnce(t *testing.T) { - assert := assert.New(t) - require := require.New(t) - mockBg := &MockBlockGenerator{} - baseBlock := &types.Block{StateRoot: types.SomeCid()} - tipSet := core.TipSet{baseBlock.Cid().String(): baseBlock} - newAddr := types.NewAddressForTestGetter() - rewardAddr := newAddr() - miningAddr := newAddr() - - var mineCtx context.Context - // Echoes the sent block to output. - echoMine := func(c context.Context, i Input, _ NullBlockTimerFunc, bg BlockGenerator, doSomeWork DoSomeWorkFunc, outCh chan<- Output) { - mineCtx = c - require.Equal(1, len(i.TipSet)) - b := i.TipSet.ToSlice()[0] - outCh <- Output{NewBlock: b} - } - worker := NewWorkerWithDeps(mockBg, echoMine, func() {}, nullBlockImmediately) - result := MineOnce(context.Background(), worker, tipSet, rewardAddr, miningAddr) - assert.NoError(result.Err) - assert.True(baseBlock.StateRoot.Equals(result.NewBlock.StateRoot)) - assert.Error(mineCtx.Err()) -} - -func TestWorker_Start(t *testing.T) { +func Test_Mine(t *testing.T) { assert := assert.New(t) require := require.New(t) newCid := types.NewCidForTestGetter() - baseBlock := &types.Block{StateRoot: newCid()} + stateRoot := newCid() + baseBlock := &types.Block{Height: 2, StateRoot: stateRoot} tipSet := core.TipSet{baseBlock.Cid().String(): baseBlock} - mockBg := &MockBlockGenerator{} - newAddr := types.NewAddressForTestGetter() - rewardAddr := newAddr() - miningAddr := newAddr() - - // Test that values are passed faithfully. - ctx, cancel := context.WithCancel(context.Background()) - doSomeWorkCalled := false - doSomeWork := func() { doSomeWorkCalled = true } - mineCalled := false - fakeMine := func(c context.Context, i Input, _ NullBlockTimerFunc, bg BlockGenerator, doSomeWork DoSomeWorkFunc, outCh chan<- Output) { - mineCalled = true - assert.NotEqual(ctx, c) - require.Equal(1, len(i.TipSet)) - b := i.TipSet.ToSlice()[0] - assert.True(baseBlock.StateRoot.Equals(b.StateRoot)) - assert.Equal(mockBg, bg) - doSomeWork() - outCh <- Output{} - } - worker := NewWorkerWithDeps(mockBg, fakeMine, doSomeWork, nullBlockImmediately) - inCh, outCh, _ := worker.Start(ctx) - inCh <- NewInput(context.Background(), tipSet, rewardAddr, miningAddr) - <-outCh - assert.True(mineCalled) - assert.True(doSomeWorkCalled) - cancel() - - // Test that multi-block tipsets are passed faithfully - mineCalled = false - ctx, cancel = context.WithCancel(context.Background()) - tipSet = core.RequireNewTipSet(require, []*types.Block{{StateRoot: newCid()}, {StateRoot: newCid()}}...) - fakeMine = func(c context.Context, i Input, _ NullBlockTimerFunc, bg BlockGenerator, doSomeWork DoSomeWorkFunc, outCh chan<- Output) { - mineCalled = true - require.Equal(2, len(i.TipSet)) - tipSetMined := i.TipSet - assert.Equal(tipSet, tipSetMined) - outCh <- Output{} - } - worker = NewWorkerWithDeps(mockBg, fakeMine, func() {}, nullBlockImmediately) - inCh, outCh, _ = worker.Start(ctx) - inCh <- NewInput(context.Background(), tipSet, rewardAddr, miningAddr) - <-outCh - assert.True(mineCalled) - cancel() - - // Test that we can push multiple blocks through. There was an actual bug - // where multiply queued inputs were not all processed. - ctx, cancel = context.WithCancel(context.Background()) - fakeMine = func(c context.Context, i Input, _ NullBlockTimerFunc, bg BlockGenerator, doSomeWork DoSomeWorkFunc, outCh chan<- Output) { - outCh <- Output{} - } - worker = NewWorkerWithDeps(mockBg, fakeMine, func() {}, nullBlockImmediately) - inCh, outCh, _ = worker.Start(ctx) - // Note: inputs have to pass whatever check on newly arriving tipsets - // are in place in Start(). - inCh <- NewInput(context.Background(), tipSet, rewardAddr, miningAddr) - inCh <- NewInput(context.Background(), tipSet, rewardAddr, miningAddr) - inCh <- NewInput(context.Background(), tipSet, rewardAddr, miningAddr) - <-outCh - <-outCh - <-outCh - assert.Equal(ChannelEmpty, ReceiveOutCh(outCh)) - cancel() // Makes vet happy. - - // Test that canceling the Input.Ctx cancels that input's mining run. - miningCtx, miningCtxCancel := context.WithCancel(context.Background()) - inputCtx, inputCtxCancel := context.WithCancel(context.Background()) - var gotMineCtx context.Context - fakeMine = func(c context.Context, i Input, _ NullBlockTimerFunc, bg BlockGenerator, doSomeWork DoSomeWorkFunc, outCh chan<- Output) { - gotMineCtx = c - outCh <- Output{} - } - worker = NewWorkerWithDeps(mockBg, fakeMine, func() {}, nullBlockImmediately) - inCh, outCh, _ = worker.Start(miningCtx) - inCh <- NewInput(inputCtx, tipSet, rewardAddr, miningAddr) - <-outCh - inputCtxCancel() - assert.Error(gotMineCtx.Err()) // Same context as miningRunCtx. - assert.NoError(miningCtx.Err()) - miningCtxCancel() // Make vet happy. - - // Test that canceling the mining context stops mining, cancels - // the inner context, and closes the output channel. - miningCtx, miningCtxCancel = context.WithCancel(context.Background()) - inputCtx, inputCtxCancel = context.WithCancel(context.Background()) - gotMineCtx = context.Background() - fakeMine = func(c context.Context, i Input, _ NullBlockTimerFunc, bg BlockGenerator, doSomeWork DoSomeWorkFunc, outCh chan<- Output) { - gotMineCtx = c - outCh <- Output{} - } - worker = NewWorkerWithDeps(mockBg, fakeMine, func() {}, nullBlockImmediately) - inCh, outCh, doneWg := worker.Start(miningCtx) - inCh <- NewInput(inputCtx, tipSet, rewardAddr, miningAddr) - <-outCh - miningCtxCancel() - doneWg.Wait() - assert.Equal(ChannelClosed, ReceiveOutCh(outCh)) - assert.Error(gotMineCtx.Err()) - inputCtxCancel() // Make vet happy. -} - -func Test_mine(t *testing.T) { - assert := assert.New(t) - baseBlock := &types.Block{Height: 2} - tipSet := core.TipSet{baseBlock.Cid().String(): baseBlock} - newAddr := types.NewAddressForTestGetter() - addr := newAddr() - miningAddr := newAddr() - next := &types.Block{Height: 3} ctx := context.Background() // Success. - mockBg := &MockBlockGenerator{} + st, pool, addrs, cst, bs := sharedSetup(t) + getStateTree := func(c context.Context, ts core.TipSet) (state.Tree, error) { + return st, nil + } + worker := NewMiningWorker(pool, getStateTree, getWeightTest, core.ApplyMessages, &core.TestView{}, bs, cst, addrs[3]) + next, err := worker.Generate(ctx, tipSet, nil, 0) + require.NoError(err) outCh := make(chan Output) - mockBg.On("Generate", mock.Anything, tipSet, uint64(0), addr, miningAddr).Return(next, nil) doSomeWorkCalled := false + worker.createPoST = func() { doSomeWorkCalled = true } input := NewInput(ctx, tipSet) - go Mine(ctx, input, nullBlockImmediately, mockBg, func() { doSomeWorkCalled = true }, outCh) + go worker.Mine(ctx, input, outCh) r := <-outCh assert.NoError(r.Err) assert.True(doSomeWorkCalled) assert.True(r.NewBlock.Cid().Equals(next.Cid())) - mockBg.AssertExpectations(t) // Block generation fails. - mockBg = &MockBlockGenerator{} + worker = NewMiningWorker(pool, makeExplodingGetStateTree(st), getWeightTest, core.ApplyMessages, &core.TestView{}, bs, cst, addrs[3]) outCh = make(chan Output) - mockBg.On("Generate", mock.Anything, tipSet, uint64(0), addr, miningAddr).Return(nil, errors.New("boom")) doSomeWorkCalled = false - input = NewInput(ctx, tipSet, addr, miningAddr) - go Mine(ctx, input, nullBlockImmediately, mockBg, func() { doSomeWorkCalled = true }, outCh) + worker.createPoST = func() { doSomeWorkCalled = true } + input = NewInput(ctx, tipSet) + go worker.Mine(ctx, input, outCh) r = <-outCh assert.Error(r.Err) assert.True(doSomeWorkCalled) - mockBg.AssertExpectations(t) - - // Null block count is increased until we find a winning ticket. - mockBg = &MockBlockGenerator{} - outCh = make(chan Output) - mockBg.On("Generate", mock.Anything, tipSet, uint64(2), addr, miningAddr).Return(next, nil) - workCount := 0 - input = NewInput(ctx, tipSet, addr, miningAddr) - (func() { - defer swap.Swap(&isWinningTicket, everyThirdWinningTicket())() - go Mine(ctx, input, nullBlockImmediately, mockBg, func() { workCount++ }, outCh) - r = <-outCh - })() - assert.NoError(r.Err) - assert.Equal(3, workCount) - assert.True(r.NewBlock.Cid().Equals(next.Cid())) - mockBg.AssertExpectations(t) -} - -// Implements NullBlockTimerFunc with the policy that it is always a good time -// to create a null block. -func nullBlockImmediately() { -} - -// Returns a ticket checking function that return true every third time -func everyThirdWinningTicket() func(_ []byte, _, _ int64) bool { - count := 0 - return func(_ []byte, _, _ int64) bool { - count++ - return count%3 == 0 - } } -*/ func TestIsWinningTicket(t *testing.T) { assert := assert.New(t) @@ -248,6 +90,7 @@ func TestIsWinningTicket(t *testing.T) { } } +// worker test func TestCreateChallenge(t *testing.T) { assert := assert.New(t) @@ -279,3 +122,342 @@ func TestCreateChallenge(t *testing.T) { assert.Equal(decoded, r) } } + +// Implements NullBlockTimerFunc with the policy that it is always a good time +// to create a null block. +func nullBlockImmediately() { +} + +// Returns a ticket checking function that return true every third time +func everyThirdWinningTicket() func(_ []byte, _, _ int64) bool { + count := 0 + return func(_ []byte, _, _ int64) bool { + count++ + return count%3 == 0 + } +} + +var seed = types.GenerateKeyInfoSeed() +var ki = types.MustGenerateKeyInfo(10, seed) +var mockSigner = types.NewMockSigner(ki) + +func TestGenerate(t *testing.T) { + // TODO fritz use core.FakeActor for state/contract tests for generate: + // - test nonces out of order + // - test nonce gap +} + +func sharedSetupInitial() (*hamt.CborIpldStore, *core.MessagePool, *cid.Cid) { + cst := hamt.NewCborStore() + pool := core.NewMessagePool() + // Install the fake actor so we can execute it. + fakeActorCodeCid := types.AccountActorCodeCid + return cst, pool, fakeActorCodeCid +} + +func sharedSetup(t *testing.T) (state.Tree, *core.MessagePool, []types.Address, *hamt.CborIpldStore, blockstore.Blockstore) { + require := require.New(t) + cst, pool, fakeActorCodeCid := sharedSetupInitial() + vms := core.VMStorage() + d := datastore.NewMapDatastore() + bs := blockstore.NewBlockstore(d) + + // TODO: We don't need fake actors here, so these could be made real. + // And the NetworkAddress actor can/should be the real one. + // Stick two fake actors in the state tree so they can talk. + addr1, addr2, addr3, addr4, addr5 := mockSigner.Addresses[0], mockSigner.Addresses[1], mockSigner.Addresses[2], mockSigner.Addresses[3], mockSigner.Addresses[4] + act1 := core.RequireNewFakeActor(require, vms, addr1, fakeActorCodeCid) + act2 := core.RequireNewFakeActor(require, vms, addr2, fakeActorCodeCid) + fakeNetAct := core.RequireNewFakeActor(require, vms, addr3, fakeActorCodeCid) + minerAct := core.RequireNewMinerActor(require, vms, addr4, addr5, []byte{}, types.NewBytesAmount(10000), core.RequireRandomPeerID(), types.NewAttoFILFromFIL(10000)) + minerOwner := core.RequireNewFakeActor(require, vms, addr5, fakeActorCodeCid) + _, st := core.RequireMakeStateTree(require, cst, map[types.Address]*types.Actor{ + // Ensure core.NetworkAddress exists to prevent mining reward message failures. + address.NetworkAddress: fakeNetAct, + addr1: act1, + addr2: act2, + addr4: minerAct, + addr5: minerOwner, + }) + return st, pool, []types.Address{addr1, addr2, addr3, addr4, addr5}, cst, bs +} + +// TODO this test belongs in core, it calls ApplyMessages +func TestApplyMessagesForSuccessTempAndPermFailures(t *testing.T) { + assert := assert.New(t) + require := require.New(t) + vms := core.VMStorage() + + cst, _, fakeActorCodeCid := sharedSetupInitial() + + // Stick two fake actors in the state tree so they can talk. + addr1, addr2 := mockSigner.Addresses[0], mockSigner.Addresses[1] + act1 := core.RequireNewFakeActor(require, vms, addr1, fakeActorCodeCid) + _, st := core.RequireMakeStateTree(require, cst, map[types.Address]*types.Actor{ + addr1: act1, + }) + + ctx := context.Background() + + // NOTE: it is important that each category (success, temporary failure, permanent failure) is represented below. + // If a given message's category changes in the future, it needs to be replaced here in tests by another so we fully + // exercise the categorization. + // addr2 doesn't correspond to an extant account, so this will trigger errAccountNotFound -- a temporary failure. + msg1 := types.NewMessage(addr2, addr1, 0, nil, "", nil) + smsg1, err := types.NewSignedMessage(*msg1, &mockSigner) + require.NoError(err) + + // This is actually okay and should result in a receipt + msg2 := types.NewMessage(addr1, addr2, 0, nil, "", nil) + smsg2, err := types.NewSignedMessage(*msg2, &mockSigner) + require.NoError(err) + + // The following two are sending to self -- errSelfSend, a permanent error. + msg3 := types.NewMessage(addr1, addr1, 1, nil, "", nil) + smsg3, err := types.NewSignedMessage(*msg3, &mockSigner) + require.NoError(err) + + msg4 := types.NewMessage(addr2, addr2, 1, nil, "", nil) + smsg4, err := types.NewSignedMessage(*msg4, &mockSigner) + require.NoError(err) + + messages := []*types.SignedMessage{smsg1, smsg2, smsg3, smsg4} + + res, err := core.ApplyMessages(ctx, messages, st, vms, types.NewBlockHeight(0)) + + assert.Len(res.PermanentFailures, 2) + assert.Contains(res.PermanentFailures, smsg3) + assert.Contains(res.PermanentFailures, smsg4) + + assert.Len(res.TemporaryFailures, 1) + assert.Contains(res.TemporaryFailures, smsg1) + + assert.Len(res.Results, 1) + assert.Contains(res.SuccessfulMessages, smsg2) + + assert.NoError(err) +} + +func TestGenerateMultiBlockTipSet(t *testing.T) { + assert := assert.New(t) + require := require.New(t) + + ctx := context.Background() + newCid := types.NewCidForTestGetter() + st, pool, addrs, cst, bs := sharedSetup(t) + getStateTree := func(c context.Context, ts core.TipSet) (state.Tree, error) { + return st, nil + } + worker := NewMiningWorker(pool, getStateTree, getWeightTest, core.ApplyMessages, &core.TestView{}, bs, cst, addrs[3]) + + parents := types.NewSortedCidSet(newCid()) + stateRoot := newCid() + baseBlock1 := types.Block{ + Parents: parents, + Height: types.Uint64(100), + ParentWeightNum: types.Uint64(1000), + StateRoot: stateRoot, + } + baseBlock2 := types.Block{ + Parents: parents, + Height: types.Uint64(100), + ParentWeightNum: types.Uint64(1000), + StateRoot: stateRoot, + Nonce: 1, + } + blk, err := worker.Generate(ctx, core.RequireNewTipSet(require, &baseBlock1, &baseBlock2), nil, 0) + assert.NoError(err) + + assert.Len(blk.Messages, 1) // This is the mining reward. + assert.Equal(types.Uint64(101), blk.Height) + assert.Equal(types.Uint64(1020), blk.ParentWeightNum) +} + +// After calling Generate, do the new block and new state of the message pool conform to our expectations? +func TestGeneratePoolBlockResults(t *testing.T) { + assert := assert.New(t) + require := require.New(t) + + ctx := context.Background() + newCid := types.NewCidForTestGetter() + st, pool, addrs, cst, bs := sharedSetup(t) + + getStateTree := func(c context.Context, ts core.TipSet) (state.Tree, error) { + return st, nil + } + worker := NewMiningWorker(pool, getStateTree, getWeightTest, core.ApplyMessages, &core.TestView{}, bs, cst, addrs[3]) + + // addr3 doesn't correspond to an extant account, so this will trigger errAccountNotFound -- a temporary failure. + msg1 := types.NewMessage(addrs[2], addrs[0], 0, nil, "", nil) + smsg1, err := types.NewSignedMessage(*msg1, &mockSigner) + require.NoError(err) + + // This is actually okay and should result in a receipt + msg2 := types.NewMessage(addrs[0], addrs[1], 0, nil, "", nil) + smsg2, err := types.NewSignedMessage(*msg2, &mockSigner) + require.NoError(err) + + // The following two are sending to self -- errSelfSend, a permanent error. + msg3 := types.NewMessage(addrs[0], addrs[0], 1, nil, "", nil) + smsg3, err := types.NewSignedMessage(*msg3, &mockSigner) + require.NoError(err) + + msg4 := types.NewMessage(addrs[1], addrs[1], 0, nil, "", nil) + smsg4, err := types.NewSignedMessage(*msg4, &mockSigner) + require.NoError(err) + + pool.Add(smsg1) + pool.Add(smsg2) + pool.Add(smsg3) + pool.Add(smsg4) + + assert.Len(pool.Pending(), 4) + baseBlock := types.Block{ + Parents: types.NewSortedCidSet(newCid()), + Height: types.Uint64(100), + StateRoot: newCid(), + } + blk, err := worker.Generate(ctx, core.RequireNewTipSet(require, &baseBlock), nil, 0) + assert.NoError(err) + + assert.Len(pool.Pending(), 1) // This is the temporary failure. + assert.Contains(pool.Pending(), smsg1) + + assert.Len(blk.Messages, 2) // This is the good message + the mining reward. + + // Is the mining reward first? This will fail 50% of the time if we don't force the reward to come first. + assert.Equal(address.NetworkAddress, blk.Messages[0].From) +} + +func TestGenerateSetsBasicFields(t *testing.T) { + assert := assert.New(t) + require := require.New(t) + + ctx := context.Background() + newCid := types.NewCidForTestGetter() + + st, pool, addrs, cst, bs := sharedSetup(t) + + getStateTree := func(c context.Context, ts core.TipSet) (state.Tree, error) { + return st, nil + } + worker := NewMiningWorker(pool, getStateTree, getWeightTest, core.ApplyMessages, &core.TestView{}, bs, cst, addrs[3]) + + h := types.Uint64(100) + wNum := types.Uint64(1000) + wDenom := types.Uint64(1) + baseBlock := types.Block{ + Height: h, + ParentWeightNum: wNum, + ParentWeightDenom: wDenom, + StateRoot: newCid(), + } + baseTipSet := core.RequireNewTipSet(require, &baseBlock) + blk, err := worker.Generate(ctx, baseTipSet, nil, 0) + assert.NoError(err) + + assert.Equal(h+1, blk.Height) + assert.Equal(addrs[3], blk.Miner) + + blk, err = worker.Generate(ctx, baseTipSet, nil, 1) + assert.NoError(err) + + assert.Equal(h+2, blk.Height) + assert.Equal(wNum+10.0, blk.ParentWeightNum) + assert.Equal(wDenom, blk.ParentWeightDenom) + assert.Equal(addrs[3], blk.Miner) +} + +func TestGenerateWithoutMessages(t *testing.T) { + assert := assert.New(t) + require := require.New(t) + + ctx := context.Background() + newCid := types.NewCidForTestGetter() + + st, pool, addrs, cst, bs := sharedSetup(t) + getStateTree := func(c context.Context, ts core.TipSet) (state.Tree, error) { + return st, nil + } + worker := NewMiningWorker(pool, getStateTree, getWeightTest, core.ApplyMessages, &core.TestView{}, bs, cst, addrs[3]) + + assert.Len(pool.Pending(), 0) + baseBlock := types.Block{ + Parents: types.NewSortedCidSet(newCid()), + Height: types.Uint64(100), + StateRoot: newCid(), + } + blk, err := worker.Generate(ctx, core.RequireNewTipSet(require, &baseBlock), nil, 0) + assert.NoError(err) + + assert.Len(pool.Pending(), 0) // This is the temporary failure. + assert.Len(blk.Messages, 1) // This is the mining reward. +} + +// If something goes wrong while generating a new block, even as late as when flushing it, +// no block should be returned, and the message pool should not be pruned. +func TestGenerateError(t *testing.T) { + assert := assert.New(t) + require := require.New(t) + ctx := context.Background() + newCid := types.NewCidForTestGetter() + + st, pool, addrs, cst, bs := sharedSetup(t) + + worker := NewMiningWorker(pool, makeExplodingGetStateTree(st), getWeightTest, core.ApplyMessages, &core.TestView{}, bs, cst, addrs[3]) + + // This is actually okay and should result in a receipt + msg := types.NewMessage(addrs[0], addrs[1], 0, nil, "", nil) + smsg, err := types.NewSignedMessage(*msg, &mockSigner) + require.NoError(err) + pool.Add(smsg) + + assert.Len(pool.Pending(), 1) + baseBlock := types.Block{ + Parents: types.NewSortedCidSet(newCid()), + Height: types.Uint64(100), + StateRoot: newCid(), + } + baseTipSet := core.RequireNewTipSet(require, &baseBlock) + blk, err := worker.Generate(ctx, baseTipSet, nil, 0) + assert.Error(err, "boom") + assert.Nil(blk) + + assert.Len(pool.Pending(), 1) // No messages are removed from the pool. +} + +type StateTreeForTest struct { + state.Tree + TestFlush func(ctx context.Context) (*cid.Cid, error) +} + +func WrapStateTreeForTest(st state.Tree) *StateTreeForTest { + stt := StateTreeForTest{ + st, + st.Flush, + } + return &stt +} + +func (st *StateTreeForTest) Flush(ctx context.Context) (*cid.Cid, error) { + return st.TestFlush(ctx) +} + +func getWeightTest(c context.Context, ts core.TipSet) (uint64, uint64, error) { + num, den, err := ts.ParentWeight() + if err != nil { + return uint64(0), uint64(0), err + } + return num + uint64(int64(len(ts))*int64(core.ECV)), den, nil +} + +func makeExplodingGetStateTree(st state.Tree) func(context.Context, core.TipSet) (state.Tree, error) { + return func(c context.Context, ts core.TipSet) (state.Tree, error) { + stt := WrapStateTreeForTest(st) + stt.TestFlush = func(ctx context.Context) (*cid.Cid, error) { + return nil, errors.New("boom no flush") + } + + return stt, nil + } +} diff --git a/node/node.go b/node/node.go index 5cf624e21d..3b77795b42 100644 --- a/node/node.go +++ b/node/node.go @@ -64,8 +64,8 @@ type Node struct { Wallet *wallet.Wallet // Mining stuff. - MiningWorker mining.Worker - mining struct { + MiningScheduler mining.Scheduler + mining struct { sync.Mutex isMining bool } @@ -432,15 +432,14 @@ func (node *Node) StartMining() error { return err } - if node.MiningWorker == nil { - blockGenerator := mining.NewBlockGenerator(node.MsgPool, func(ctx context.Context, ts core.TipSet) (state.Tree, error) { + if node.MiningScheduler == nil { + getStateTree := func(ctx context.Context, ts core.TipSet) (state.Tree, error) { return node.ChainMgr.State(ctx, ts.ToSlice()) - }, node.ChainMgr.Weight, core.ApplyMessages, node.ChainMgr.PwrTableView, node.Blockstore, node.CborStore) - - node.MiningWorker = mining.NewWorker(blockGenerator, miningAddress) - + } + worker := mining.NewMiningWorker(node.MsgPool, getStateTree, node.ChainMgr.Weight, core.ApplyMessages, node.ChainMgr.PwrTableView, node.Blockstore, node.CborStore, miningAddress) + node.MiningScheduler = mining.NewScheduler(worker) node.miningCtx, node.cancelMining = context.WithCancel(context.Background()) - inCh, outCh, doneWg := node.MiningWorker.Start(node.miningCtx) + inCh, outCh, doneWg := node.MiningScheduler.Start(node.miningCtx) node.miningInCh = inCh node.miningDoneWg = doneWg node.AddNewlyMinedBlock = node.addNewlyMinedBlock diff --git a/node/node_test.go b/node/node_test.go index b2b0183d42..65062de35a 100644 --- a/node/node_test.go +++ b/node/node_test.go @@ -145,14 +145,15 @@ func TestNodeMining(t *testing.T) { node := MakeNodesUnstarted(t, 1, true, true)[0] - mockWorker := &mining.MockWorker{} - inCh, outCh, doneWg := make(chan mining.Input), make(chan mining.Output), &sync.WaitGroup{} + mockScheduler := &mining.MockScheduler{} + inCh, outCh, doneWg := make(chan mining.Input), make(chan mining.Output), new(sync.WaitGroup) // Apparently you have to have exact types for testify.mock, so // we use iCh and oCh for the specific return type of Start(). var iCh chan<- mining.Input = inCh var oCh <-chan mining.Output = outCh - mockWorker.On("Start", mock.Anything).Return(iCh, oCh, doneWg) - node.MiningWorker = mockWorker + + mockScheduler.On("Start", mock.Anything).Return(iCh, oCh, doneWg) + node.MiningScheduler = mockScheduler // TODO: this is horrible, this setup needs to be a lot less dependent of the inner workings of the node!! node.miningCtx, node.cancelMining = context.WithCancel(ctx) node.miningInCh = inCh @@ -206,12 +207,12 @@ func TestNodeMining(t *testing.T) { // Ensure that the output is wired up correctly. node = MakeNodesUnstarted(t, 1, true, true)[0] - mockWorker = &mining.MockWorker{} + mockScheduler = &mining.MockScheduler{} inCh, outCh, doneWg = make(chan mining.Input), make(chan mining.Output), new(sync.WaitGroup) iCh = inCh oCh = outCh - mockWorker.On("Start", mock.Anything).Return(iCh, oCh, doneWg) - node.MiningWorker = mockWorker + mockScheduler.On("Start", mock.Anything).Return(iCh, oCh, doneWg) + node.MiningScheduler = mockScheduler node.miningCtx, node.cancelMining = context.WithCancel(ctx) node.miningInCh = inCh node.miningDoneWg = doneWg diff --git a/node/testing.go b/node/testing.go index 1b895db7cc..b7003d3aeb 100644 --- a/node/testing.go +++ b/node/testing.go @@ -116,12 +116,12 @@ func RunCreateMiner(t *testing.T, node *Node, from types.Address, pledge types.B // wait for create miner call to put a message in the pool _, err = subscription.Next(ctx) require.NoError(err) - - blockGenerator := mining.NewBlockGenerator(node.MsgPool, func(ctx context.Context, ts core.TipSet) (state.Tree, error) { + getStateTree := func(ctx context.Context, ts core.TipSet) (state.Tree, error) { return node.ChainMgr.State(ctx, ts.ToSlice()) - }, node.ChainMgr.Weight, core.ApplyMessages, node.ChainMgr.PwrTableView, node.Blockstore, node.CborStore) + } + w := mining.NewMiningWorker(node.MsgPool, getStateTree, node.ChainMgr.Weight, core.ApplyMessages, node.ChainMgr.PwrTableView, node.Blockstore, node.CborStore, address.TestAddress) cur := node.ChainMgr.GetHeaviestTipSet() - out := mining.MineOnce(ctx, mining.NewWorker(blockGenerator, address.TestAddress), cur) + out := mining.MineOnce(ctx, mining.NewScheduler(w), cur) require.NoError(out.Err) require.NoError(node.ChainMgr.SetHeaviestTipSetForTest(ctx, core.RequireNewTipSet(require, out.NewBlock))) diff --git a/tools/go-fakecoin/main.go b/tools/go-fakecoin/main.go index 84b124e5eb..6ec18e95d4 100644 --- a/tools/go-fakecoin/main.go +++ b/tools/go-fakecoin/main.go @@ -108,10 +108,11 @@ func getChainManager(d repo.Datastore, bs blockstore.Blockstore) (*core.ChainMan return cm, cst } -func getBlockGenerator(msgPool *core.MessagePool, cm *core.ChainManager, cst *hamt.CborIpldStore, bs blockstore.Blockstore) mining.BlockGenerator { - return mining.NewBlockGenerator(msgPool, func(ctx context.Context, ts core.TipSet) (state.Tree, error) { +func getWorker(msgPool *core.MessagePool, cm *core.ChainManager, cst *hamt.CborIpldStore, bs blockstore.Blockstore) *mining.MiningWorker { + ma := types.MakeTestAddress("miningAddress") + return mining.NewMiningWorker(msgPool, func(ctx context.Context, ts core.TipSet) (state.Tree, error) { return cm.State(ctx, ts.ToSlice()) - }, cm.Weight, core.ApplyMessages, cm.PwrTableView, bs, cst) + }, cm.Weight, core.ApplyMessages, cm.PwrTableView, bs, cst, ma) } func getStateTree(ctx context.Context, d repo.Datastore, bs blockstore.Blockstore) (state.Tree, *hamt.CborIpldStore, *core.ChainManager, core.TipSet, error) { @@ -257,15 +258,15 @@ func fakeActors(ctx context.Context, cst *hamt.CborIpldStore, cm *core.ChainMana } func mineBlock(ctx context.Context, mp *core.MessagePool, cst *hamt.CborIpldStore, cm *core.ChainManager, bs blockstore.Blockstore, blks []*types.Block) (*types.Block, error) { - bg := getBlockGenerator(mp, cm, cst, bs) - ma := types.MakeTestAddress("miningaddress") + mw := getWorker(mp, cm, cst, bs) const nullBlockCount = 0 ts, err := core.NewTipSet(blks...) if err != nil { return nil, err } - blk, err := bg.Generate(ctx, ts, nil, nullBlockCount, ma) + blk, err := mw.Generate(ctx, ts, nil, nullBlockCount) + if err != nil { return nil, err } From 6006a43d29302a52ee92bae9b5a7d790686fa6c3 Mon Sep 17 00:00:00 2001 From: Wyatt Date: Tue, 21 Aug 2018 18:46:19 -0700 Subject: [PATCH 2/2] Scheduler improvements: This is an attempt to drastically simplify the timingScheduler Now the scheduler only has two states and only tracks a base input. Nullblock increment is moved back to the worker. This scheduler has a few idiosyncracies but is simple to reason about and prevents mining processes from getting interrupted midway through PoST generation. New scheduler tests added. --- api/impl/mining.go | 2 +- mining/block_generate.go | 6 +- mining/scheduler.go | 349 ++++++++++++++------------------------ mining/scheduler_test.go | 163 +++++++++++++----- mining/testing.go | 20 ++- mining/worker.go | 109 +++++++----- mining/worker_test.go | 61 +++---- node/node.go | 22 ++- node/testing.go | 2 +- tools/go-fakecoin/main.go | 4 +- 10 files changed, 382 insertions(+), 356 deletions(-) diff --git a/api/impl/mining.go b/api/impl/mining.go index 42a545af59..e573abfc06 100644 --- a/api/impl/mining.go +++ b/api/impl/mining.go @@ -26,7 +26,7 @@ func (api *nodeMining) Once(ctx context.Context) (*types.Block, error) { return nil, err } - worker := mining.NewMiningWorker(nd.MsgPool, func(ctx context.Context, ts core.TipSet) (state.Tree, error) { + worker := mining.NewDefaultWorker(nd.MsgPool, func(ctx context.Context, ts core.TipSet) (state.Tree, error) { return nd.ChainMgr.State(ctx, ts.ToSlice()) }, nd.ChainMgr.Weight, core.ApplyMessages, nd.ChainMgr.PwrTableView, nd.Blockstore, nd.CborStore, miningAddr) diff --git a/mining/block_generate.go b/mining/block_generate.go index 1d5a22e170..d1aae265ee 100644 --- a/mining/block_generate.go +++ b/mining/block_generate.go @@ -1,8 +1,8 @@ package mining -// Block generation is part of the logic of the default Worker (miningWorker). +// Block generation is part of the logic of the DefaultWorker. // 'generate' is that function that actually creates a new block from a base -// TipSet using the miningWorker's many utilities. +// TipSet using the DefaultWorker's many utilities. import ( "context" @@ -16,7 +16,7 @@ import ( ) // Generate returns a new block created from the messages in the pool. -func (w *MiningWorker) Generate(ctx context.Context, baseTipSet core.TipSet, ticket types.Signature, nullBlockCount uint64) (*types.Block, error) { +func (w *DefaultWorker) Generate(ctx context.Context, baseTipSet core.TipSet, ticket types.Signature, nullBlockCount uint64) (*types.Block, error) { stateTree, err := w.getStateTree(ctx, baseTipSet) if err != nil { return nil, errors.Wrap(err, "get state tree") diff --git a/mining/scheduler.go b/mining/scheduler.go index 3283ee1777..028d679dfc 100644 --- a/mining/scheduler.go +++ b/mining/scheduler.go @@ -2,16 +2,38 @@ package mining // The Scheduler listens for new heaviest TipSets and schedules mining work on // these TipSets. The scheduler is ultimately responsible for informing the -// rest of the system about new successful blocks. This is the interface to -// implement if you want to explore an alternate mining strategy. +// rest of the system about new blocks mined by the Worker. This is the +// interface to implement if you want to explore an alternate mining strategy. // -// The default Scheduler implementation, timingScheduler, operates in two -// states, 'collect', where the scheduler listens for new heaviest tipsets to -// replace the mining base, and 'ignore', where mining proceeds uninterrupted. +// The default Scheduler implementation, timingScheduler, is an attempt to +// prevent miners from getting interrupted by attackers strategically releasing +// better base tipsets midway through a proving period. Such attacks are bad +// because they causes miners to waste work. Note that the term 'base tipset', +// or 'mining base' is used to denote the tipset that the miner uses as the +// parent of the block it attempts to generate during mining. +// +// The timingScheduler operates in two states, 'collect', where the scheduler +// listens for new heaviest tipsets to use as the best mining base, and 'ignore', +// where mining proceeds uninterrupted. The scheduler enters the 'collect' state +// each time a new heaviest tipset arrives with a greater height. The +// scheduler finishes the collect state after the mining delay time, a protocol +// parameter, has passed. The scheduler then enters the 'ignore' state. Here +// the scheduler mines, ignoring all inputs with the most recent and lower +// heights. 'ignore' concludes when the scheduler receives an input tipset, +// possibly the tipset consisting of the block the miner just mined, with +// a greater height, and transitions back to collect. It is in miners' +// best interest to wait for the collection period so that they can wait to +// work on a base tipset made up of all blocks mined at the new height. +// +// The current approach is limited. It does not prevent wasted work from all +// strategic block witholding attacks. This is also going to be effected by +// current unknowns surrounding the specifics of the mining protocol (i.e. how +// do VDFs and PoSTs fit into mining, and what is the lookback parameter for +// challenge sampling. For more details see: +// https://gist.github.com/whyrusleeping/4c05fd902f7123bdd1c729e3fffed797 import ( "context" - "fmt" "sync" "time" @@ -21,308 +43,195 @@ import ( // Scheduler is the mining interface consumers use. When you Start() the // scheduler it returns two channels (inCh, outCh) and a sync.WaitGroup: // - inCh: the caller sends Inputs to mine on to this channel. -// - outCh: the worker sends Outputs to the caller on this channel. +// - outCh: the scheduler sends Outputs to the caller on this channel. // - doneWg: signals that the scheduler and any goroutines it launched // have stopped. (Context cancelation happens async, so you // need some way to know when it has actually stopped.) // -// Once Start()ed, the Scheduler can be stopped by canceling its miningCtx, which -// will signal on doneWg when it's actually done. Canceling an Input.Ctx -// just cancels the run for that input. Canceling miningCtx cancels any run -// in progress and shuts the worker down. +// Once Start()ed, the Scheduler can be stopped by canceling its miningCtx, +// which will signal on doneWg when it's actually done. Canceling miningCtx +// cancels any run in progress and shuts the scheduler down. type Scheduler interface { Start(miningCtx context.Context) (chan<- Input, <-chan Output, *sync.WaitGroup) } type timingScheduler struct { - miningAt uint64 - base Input - state receiveState - + // base tracks the current tipset being mined on. It is only read and + // written from the timingScheduler's main receive loop, i.e. collect + // and ignore. + base Input + // worker contains the actual mining logic. worker Worker } -type receiveState int - -const ( - delay = iota - grace - ignore - end -) - -// mineGrace is the protocol grace period -const mineGrace = time.Second - -// mineSleepTime is the protocol's estimated mining time -const mineSleepTime = mineGrace * 30 - -// mineDelay is the protocol mining delay at the beginning of an epoch -const mineDelay = mineGrace * 2 - -// runMine runs the miner, inputs to the miner's goroutine are accepted on -// mineInCh. For each new input on mineInCh, runMine cancels the old mining -// run to run the new input. There is only one outstanding call to the 'mine' -// function at a time. Newly mined valid blocks are sent out on outCh. A -// signal indicating that a run of 'mine' was completed without interruption, -// even if it did not result in a valid block, is sent on mineOutCh. -func (s *timingScheduler) runMine(miningCtx context.Context, outCh chan<- Output, mineInCh <-chan Input, mineOutCh chan<- struct{}, doneWg sync.WaitGroup) { +// mineDelay is the protocol mining delay. The timingScheduler waits for the +// mining delay during its 'collect' state. It's set so low right now to +// facilitate testing. +const mineDelay = time.Millisecond * 20 + +// runWorker launches calls to worker.Mine(). Inputs to worker.Mine() are +// accepted on mineInCh. For each new input on mineInCh, runWorker cancels the +// old call to worker.Mine() and mines on the new input. There is only one +// outstanding call to worker.Mine() at a time. outCh is the channel read by +// the scheduler's caller. Newly mined blocks are sent out on outCh if the +// worker is able to mine a block. Nothing is output over outCh if the worker +// does not mine a block. If there is a mining error it is sent over the +// outCh. +func (s *timingScheduler) runWorker(miningCtx context.Context, outCh chan<- Output, mineInCh <-chan Input, doneWg *sync.WaitGroup) { defer doneWg.Done() - currentRunCancel := func() {} var currentRunCtx context.Context + currentRunCancel := func() {} for { select { case <-miningCtx.Done(): - fmt.Printf("runMine: miningCtx.Done()\n") currentRunCancel() return case input, ok := <-mineInCh: - fmt.Printf("runMine: mineInCh: %v\n", input) - if ok { - currentRunCancel() - currentRunCtx, currentRunCancel = context.WithCancel(input.Ctx) - doneWg.Add(1) - go func() { - defer doneWg.Done() - fmt.Printf("mine: running 'mine'\n") - s.worker.Mine(currentRunCtx, input, outCh) - mineOutCh <- struct{}{} - }() - } else { + if !ok { // sender closed mineCh, close and ignore mineInCh = nil + continue } + currentRunCancel() + currentRunCtx, currentRunCancel = context.WithCancel(miningCtx) + doneWg.Add(1) + go func() { + defer doneWg.Done() + s.worker.Mine(currentRunCtx, input, outCh) + }() } } } -// delay runs for the protocol mining delay "mineDelay" and updates the base +// collect runs for the protocol mining delay "mineDelay" and updates the base // tipset for mining to the latest tipset read from the input channel. -// delay initializes the next round of mining, canceling any previous mining -// calls still running. If mining work from the previous period is not -// completed delay logs a warning as this indicates someone is speeding up PoST -// generation somehow. -func (s *timingScheduler) delay(miningCtx context.Context, inCh <-chan Input, mineInCh chan<- Input, mineOutCh <-chan struct{}) { - epoch, err := s.base.TipSet.Height() - if err != nil { - panic("this can't be happening") - } - epoch += s.base.NullBlks - // No blocks found last epoch? Add a null block for this epoch's input. - if s.miningAt > epoch { - input := NewInput(s.base.Ctx, s.base.TipSet) - input.NullBlks = s.base.NullBlks + uint64(1) - s.base = input - epoch++ - } - - // This is unexpected and means someone is generating - // PoSTs too fast. - defer func() { - if s.miningAt != epoch { - // log.Warningf("Not enough time to mine on epochs %d to %d", s.miningAt, epoch - 1) - s.miningAt = epoch - } - }() +// If the scheduler should terminate collect() returns true. collect() +// initializes the next round of mining, canceling any previous mining calls +// still running. If the eager flag is set, collect starts mining right away, +// possibly starting and stopping multiple mining jobs. +func (s *timingScheduler) collect(miningCtx context.Context, inCh <-chan Input, mineInCh chan<- Input, eager bool) bool { delayTimer := time.NewTimer(mineDelay) for { select { case <-miningCtx.Done(): - s.state = end - return + return true case <-delayTimer.C: - s.state = grace - mineInCh <- s.base - return + if !eager { + mineInCh <- s.base + } + return false case input, ok := <-inCh: if !ok { + // sender closed inCh, close and ignore inCh = nil continue } - inEpoch, err := input.TipSet.Height() - if err != nil { - panic("this can't be happening") - } - // Older epoch? Do nothing. TODO: is it secure that old heavier tipsets are ignored? - // Current epoch? Replace base. - // Newer epoch? Loop back to DELAY state for new epoch. - if inEpoch == epoch { - s.base = input - } - if inEpoch > epoch { - // log.Warning("miner preempted during mining delay") - s.base = input - epoch = inEpoch - return - } - case _, ok := <-mineOutCh: - if ok { - s.miningAt++ - } else { - mineOutCh = nil + + log.Infof("scheduler receiving new base %s during collect", input.TipSet.String()) + s.base = input + if eager { + mineInCh <- input } } } } -// grace waits for the protocol grace period "mineGrace". During the grace -// period mining is always running. However grace restarts mining when -// receiving new heavier base tipsets. -func (s *timingScheduler) grace(miningCtx context.Context, inCh <-chan Input, mineInCh chan<- Input) { - epoch, err := s.base.TipSet.Height() - if err != nil { - panic("this can't be happening") - } - epoch += s.base.NullBlks - - graceTimer := time.NewTimer(mineGrace) +// ignore() waits for a new heaviest tipset with a greater height. No new tipsets +// from the current base tipset's height or lower heights are accepted as the +// new mining base. If the scheduler should terminate ignore() returns true. +// The purpose of the ignore state is to de-incentivize strategic lagging of +// block propagation part way through a proving period which can cause +// competing miners to waste work. +// Note: this strategy is limited, it does not prevent witholding tipsets of +// greater heights than the current base or from witholding tipsets from miners +// with a null block mining base. +func (s *timingScheduler) ignore(miningCtx context.Context, inCh <-chan Input) bool { for { select { case <-miningCtx.Done(): - s.state = end - return - case <-graceTimer.C: - s.state = ignore - return + return true case input, ok := <-inCh: if !ok { inCh = nil continue } - inEpoch, err := input.TipSet.Height() - if err != nil { - panic("this can't be happening") - } - // Older epoch? Do nothing. TODO: is it secure that old heavier tipsets are ignored? - // Current epoch? Restart mining. - // Newer epoch? Loop back to DELAY state for new epoch. - if inEpoch == epoch { + + // Scheduler begins in ignore state with a nil base. + // This case handles base init on the very first input. + if s.base.TipSet == nil { s.base = input - mineInCh <- input + return false } - if inEpoch > epoch { - // log.Warning("miner preempted during grace period") - s.base = input - s.state = delay - return - } - } - } -} -// ignore waits for the mining period. During this time no new tipsets of this -// epoch are accepted as new mining bases. ignore is finished when the mining -// period is over, or a tipset of the next epoch is input. -func (s *timingScheduler) ignore(miningCtx context.Context, inCh <-chan Input, mineOutCh <-chan struct{}) { - epoch, err := s.base.TipSet.Height() - if err != nil { - panic("this can't be happening") - } - epoch += s.base.NullBlks - - mineTimer := time.NewTimer(mineSleepTime) - for { - select { - case <-miningCtx.Done(): - s.state = end - return - case <-mineTimer.C: - s.state = delay - return - case input, ok := <-inCh: - if !ok { - inCh = nil - continue + curHeight, err := s.base.TipSet.Height() + if err != nil { + panic("this can't be happening") } - inEpoch, err := input.TipSet.Height() + inHeight, err := input.TipSet.Height() if err != nil { panic("this can't be happening") } - // Older epoch? Current epoch? Do nothing. - // Newer epoch? Loop back to DELAY state for new epoch. - if inEpoch > epoch { + + // Newer epoch? Loop back to collect state for new non-null epoch. + if inHeight > curHeight { + log.Infof("scheduler receiving new base %s during ignore, transition to collect", input.TipSet.String()) s.base = input - s.state = delay - return - } - case _, ok := <-mineOutCh: - if ok { - s.miningAt++ - } else { - mineOutCh = nil + return false } + log.Debugf("scheduler ignoring %s during ignore because height is not greater than height of current base", input.TipSet.String()) } } } -// Start is the main entrypoint for Worker. Call it to start mining. It returns -// two channels: an input channel for blocks and an output channel for results. -// It also returns a waitgroup that will signal that all mining runs have -// completed. Each block that is received on the input channel causes the -// worker to cancel the context of the previous mining run if any and start -// mining on the new block. Any results are sent into its output channel. -// Closing the input channel does not cause the worker to stop; cancel -// the Input.Ctx to cancel an individual mining run or the mininCtx to -// stop all mining and shut down the worker. -// -// TODO A potentially simpler interface here would be for the worker to -// take the input channel from the caller and then shut everything down -// when the input channel is closed. +// Start is the main entrypoint for the timingScheduler. Call it to start +// mining. It returns two channels: an input channel for tipsets and an output +// channel for newly mined blocks. It also returns a waitgroup that will signal +// that all mining runs and auxiliary goroutines have completed. Each tipset +// that is received on the input channel is procssesed by the scheduler. +// Depending on the tipset height and the time the input is received, a new +// input may cancel the context of the previous mining run, or be ignorned. +// Any successfully mined blocks are sent into the output channel. Closing the +// input channel does not cause the scheduler to stop; cancel the miningCtx to +// stop all mining and shut down the scheduler. func (s *timingScheduler) Start(miningCtx context.Context) (chan<- Input, <-chan Output, *sync.WaitGroup) { inCh := make(chan Input) outCh := make(chan Output) mineInCh := make(chan Input) - mineOutCh := make(chan struct{}) var doneWg sync.WaitGroup // for internal use var extDoneWg sync.WaitGroup // for external use + log.Debugf("scheduler starting 'runWorker' goroutine") doneWg.Add(1) - go s.runMine(miningCtx, outCh, mineInCh, mineOutCh, doneWg) + go s.runWorker(miningCtx, outCh, mineInCh, &doneWg) + log.Debugf("scheduler starting main receive loop") doneWg.Add(1) go func() { defer doneWg.Done() - - // Wait for an initial value - input := <-inCh - s.base = input - fmt.Printf("Start: init\n") - - // The receive loop. The worker operates in three basic receiveStates: - // delay -- wait for the mining delay to receive the best tipset for this epoch - // grace -- start mining but restart off of better tipsets that arrive - // ignore -- ignore all tipsets from the current epoch and finish mining + // for now keep 'eager' unset. TODO eventually pull from config or CLI flag. + eager := false + // The receive loop. The scheduler operates in two basic states + // collect -- wait for the mining delay to listen for better base tipsets + // ignore -- mine on the best tipset, ignore all tipsets with height <= the height of the current base for { - switch s.state { - case delay: - fmt.Printf("Start: delay\n") - s.delay(miningCtx, inCh, mineInCh, mineOutCh) - case grace: - fmt.Printf("Start: grace\n") - s.grace(miningCtx, inCh, mineInCh) - case ignore: - fmt.Printf("Start: ignore\n") - s.ignore(miningCtx, inCh, mineOutCh) - case end: - fmt.Printf("Start: end\n") + if end := s.ignore(miningCtx, inCh); end { + return + } + if end := s.collect(miningCtx, inCh, mineInCh, eager); end { return - default: - panic("worker should never reach here") } } }() // This tear down goroutine waits for all work to be done before closing // channels. When this goroutine is complete, external code can - // consider the worker to be done. + // consider the scheduler to be done. extDoneWg.Add(1) go func() { + defer extDoneWg.Done() doneWg.Wait() close(outCh) close(mineInCh) - close(mineOutCh) - extDoneWg.Done() - fmt.Printf("tear down all done\n") }() return inCh, outCh, &extDoneWg } @@ -340,6 +249,6 @@ func MineOnce(ctx context.Context, s Scheduler, ts core.TipSet) Output { defer subCtxCancel() inCh, outCh, _ := s.Start(subCtx) - go func() { inCh <- NewInput(subCtx, ts) }() + go func() { inCh <- NewInput(ts) }() return <-outCh } diff --git a/mining/scheduler_test.go b/mining/scheduler_test.go index 45d614af12..1d526a2c37 100644 --- a/mining/scheduler_test.go +++ b/mining/scheduler_test.go @@ -3,6 +3,7 @@ package mining import ( "context" "testing" + "time" "github.com/filecoin-project/go-filecoin/core" "github.com/filecoin-project/go-filecoin/types" @@ -25,7 +26,7 @@ func TestMineOnce(t *testing.T) { assert, require, ts := newTestUtils(t) // Echoes the sent block to output. - worker := newTestWorkerWithDeps(makeEchoMine(require)) + worker := NewTestWorkerWithDeps(MakeEchoMine(require)) scheduler := NewScheduler(worker) result := MineOnce(context.Background(), scheduler, ts) assert.NoError(result.Err) @@ -41,80 +42,156 @@ func TestSchedulerPassesValue(t *testing.T) { assert.Equal(i.TipSet, ts) outCh <- Output{} } - worker := newTestWorkerWithDeps(checkValsMine) + worker := NewTestWorkerWithDeps(checkValsMine) scheduler := NewScheduler(worker) inCh, outCh, _ := scheduler.Start(ctx) - inCh <- NewInput(context.Background(), ts) + inCh <- NewInput(ts) <-outCh cancel() - return } -func TestSchedulerPassValue(t *testing.T) { - assert, require, ts := newTestUtils(t) - // Test that we can push multiple blocks through. There was an actual bug - // where multiply queued inputs were not all processed. - // Another scheduler test. Now we need to have timing involved... +// Test that we can push multiple blocks through. This schedules tipsets +// with successively higher block heights (aka epoch). +func TestSchedulerPassesManyValues(t *testing.T) { + assert, require, ts1 := newTestUtils(t) ctx, cancel := context.WithCancel(context.Background()) - worker := newTestWorkerWithDeps(makeEchoMine(require)) + var checkTS core.TipSet + // make tipsets with progressively higher heights + blk2 := &types.Block{StateRoot: types.SomeCid(), Height: 1} + ts2 := core.RequireNewTipSet(require, blk2) + blk3 := &types.Block{StateRoot: types.SomeCid(), Height: 2} + ts3 := core.RequireNewTipSet(require, blk3) + + checkValsMine := func(c context.Context, i Input, outCh chan<- Output) { + assert.Equal(i.TipSet, checkTS) + outCh <- Output{} + } + worker := NewTestWorkerWithDeps(checkValsMine) scheduler := NewScheduler(worker) inCh, outCh, _ := scheduler.Start(ctx) // Note: inputs have to pass whatever check on newly arriving tipsets - // are in place in Start(). - inCh <- NewInput(context.Background(), ts) - inCh <- NewInput(context.Background(), ts) - inCh <- NewInput(context.Background(), ts) + // are in place in Start(). For the (default) timingScheduler tipsets + // need increasing heights. + checkTS = ts1 + inCh <- NewInput(ts1) <-outCh + checkTS = ts2 + inCh <- NewInput(ts2) <-outCh + checkTS = ts3 + inCh <- NewInput(ts3) <-outCh assert.Equal(ChannelEmpty, ReceiveOutCh(outCh)) cancel() } -func TestSchedulerCancelInputCtx(t *testing.T) { - assert, _, ts := newTestUtils(t) - // Test that canceling the Input.Ctx cancels that input's mining run. - // scheduler test. Again state / timing considerations need to be respected now - // TODO: this test might need to go away with new scheduler patterns, at very least - // it should be reevaluated after we settle on a solid new pattern for canceling input runs. - miningCtx, miningCtxCancel := context.WithCancel(context.Background()) - inputCtx, inputCtxCancel := context.WithCancel(context.Background()) - var gotMineCtx context.Context - fakeMine := func(c context.Context, i Input, outCh chan<- Output) { - gotMineCtx = c +// TestSchedulerCollect tests that the scheduler collects tipsets before mining +func TestSchedulerCollect(t *testing.T) { + assert, require, ts1 := newTestUtils(t) + ctx, cancel := context.WithCancel(context.Background()) + blk2 := &types.Block{StateRoot: types.SomeCid(), Height: 1} + ts2 := core.RequireNewTipSet(require, blk2) + blk3 := &types.Block{StateRoot: types.SomeCid(), Height: 1} + ts3 := core.RequireNewTipSet(require, blk3) + checkValsMine := func(c context.Context, i Input, outCh chan<- Output) { + assert.Equal(i.TipSet, ts3) outCh <- Output{} } - worker := newTestWorkerWithDeps(fakeMine) + worker := NewTestWorkerWithDeps(checkValsMine) scheduler := NewScheduler(worker) - inCh, outCh, _ := scheduler.Start(miningCtx) - inCh <- NewInput(inputCtx, ts) + inCh, outCh, _ := scheduler.Start(ctx) + inCh <- NewInput(ts1) + inCh <- NewInput(ts2) + inCh <- NewInput(ts3) // the scheduler will collect the latest input <-outCh - inputCtxCancel() - assert.Error(gotMineCtx.Err()) // Same context as miningRunCtx. - assert.NoError(miningCtx.Err()) - miningCtxCancel() + cancel() +} + +// TestCannotInterruptMiner tests that a tipset from the same epoch, i.e. with +// the same height, does not affect the base tipset for mining once the +// mining delay has finished. +func TestCannotInterruptMiner(t *testing.T) { + assert, require, ts1 := newTestUtils(t) + ctx, cancel := context.WithCancel(context.Background()) + blk1 := ts1.ToSlice()[0] + blk2 := &types.Block{StateRoot: types.SomeCid(), Height: 0} + ts2 := core.RequireNewTipSet(require, blk2) + blockingMine := func(c context.Context, i Input, outCh chan<- Output) { + time.Sleep(mineSleepTime) + assert.Equal(i.TipSet, ts1) + outCh <- Output{NewBlock: blk1} + } + worker := NewTestWorkerWithDeps(blockingMine) + scheduler := NewScheduler(worker) + inCh, outCh, _ := scheduler.Start(ctx) + inCh <- NewInput(ts1) + // Wait until well after the mining delay, and send a new input. + time.Sleep(4 * mineDelay) + inCh <- NewInput(ts2) + out := <-outCh + assert.Equal(out.NewBlock, blk1) + cancel() } func TestSchedulerCancelMiningCtx(t *testing.T) { assert, _, ts := newTestUtils(t) // Test that canceling the mining context stops mining, cancels // the inner context, and closes the output channel. - // scheduler test miningCtx, miningCtxCancel := context.WithCancel(context.Background()) - inputCtx, inputCtxCancel := context.WithCancel(context.Background()) - gotMineCtx := context.Background() - fakeMine := func(c context.Context, i Input, outCh chan<- Output) { - gotMineCtx = c - outCh <- Output{} + shouldCancelMine := func(c context.Context, i Input, outCh chan<- Output) { + mineTimer := time.NewTimer(mineSleepTime) + select { + case <-mineTimer.C: + t.Fatal("should not take whole time") + case <-c.Done(): + } } - worker := newTestWorkerWithDeps(fakeMine) + worker := NewTestWorkerWithDeps(shouldCancelMine) scheduler := NewScheduler(worker) inCh, outCh, doneWg := scheduler.Start(miningCtx) - inCh <- NewInput(inputCtx, ts) - <-outCh + inCh <- NewInput(ts) miningCtxCancel() doneWg.Wait() assert.Equal(ChannelClosed, ReceiveOutCh(outCh)) - assert.Error(gotMineCtx.Err()) - inputCtxCancel() +} + +func TestSchedulerMultiRoundWithCollect(t *testing.T) { + assert, require, ts1 := newTestUtils(t) + ctx, cancel := context.WithCancel(context.Background()) + var checkTS core.TipSet + // make tipsets with progressively higher heights + blk2 := &types.Block{StateRoot: types.SomeCid(), Height: 1} + ts2 := core.RequireNewTipSet(require, blk2) + blk3 := &types.Block{StateRoot: types.SomeCid(), Height: 2} + ts3 := core.RequireNewTipSet(require, blk3) + + checkValsMine := func(c context.Context, i Input, outCh chan<- Output) { + assert.Equal(i.TipSet, checkTS) + outCh <- Output{} + } + worker := NewTestWorkerWithDeps(checkValsMine) + scheduler := NewScheduler(worker) + inCh, outCh, doneWg := scheduler.Start(ctx) + // Note: inputs have to pass whatever check on newly arriving tipsets + // are in place in Start(). For the (default) timingScheduler tipsets + // need increasing heights. + checkTS = ts1 + inCh <- NewInput(ts3) + inCh <- NewInput(ts2) + inCh <- NewInput(ts1) + <-outCh + checkTS = ts2 + inCh <- NewInput(ts2) + inCh <- NewInput(ts1) + inCh <- NewInput(ts3) + inCh <- NewInput(ts2) + inCh <- NewInput(ts2) + <-outCh + checkTS = ts3 + inCh <- NewInput(ts3) + <-outCh + assert.Equal(ChannelEmpty, ReceiveOutCh(outCh)) + cancel() + doneWg.Wait() + assert.Equal(ChannelClosed, ReceiveOutCh(outCh)) } diff --git a/mining/testing.go b/mining/testing.go index f726e68f4d..2ac729cf74 100644 --- a/mining/testing.go +++ b/mining/testing.go @@ -22,25 +22,29 @@ func (s *MockScheduler) Start(ctx context.Context) (chan<- Input, <-chan Output, // TestWorker is a worker with a customizable work function to facilitate // easy testing. type TestWorker struct { - WorkFun func(context.Context, Input, chan<- Output) + WorkFunc func(context.Context, Input, chan<- Output) } -// Mine is the TestWorker's Work function. It simply calls the workFun +// Mine is the TestWorker's Work function. It simply calls the WorkFunc // field. func (w *TestWorker) Mine(ctx context.Context, input Input, outCh chan<- Output) { - if w.WorkFun == nil { - panic("must set MutableTestWorker's WorkFun before calling Work") + if w.WorkFunc == nil { + panic("must set MutableTestWorker's WorkFunc before calling Work") } - w.WorkFun(ctx, input, outCh) + w.WorkFunc(ctx, input, outCh) } -func newTestWorkerWithDeps(f func(context.Context, Input, chan<- Output)) *TestWorker { +// NewTestWorkerWithDeps creates a worker that calls the provided input +// function when Mine() is called. +func NewTestWorkerWithDeps(f func(context.Context, Input, chan<- Output)) *TestWorker { return &TestWorker{ - WorkFun: f, + WorkFunc: f, } } -func makeEchoMine(require *require.Assertions) func(context.Context, Input, chan<- Output) { +// MakeEchoMine returns a test worker function that itself returns the first +// block of the input tipset as output. +func MakeEchoMine(require *require.Assertions) func(context.Context, Input, chan<- Output) { echoMine := func(c context.Context, i Input, outCh chan<- Output) { require.NotEqual(0, len(i.TipSet)) b := i.TipSet.ToSlice()[0] diff --git a/mining/worker.go b/mining/worker.go index 6da9910ec7..29beea44e1 100644 --- a/mining/worker.go +++ b/mining/worker.go @@ -17,6 +17,7 @@ import ( "github.com/filecoin-project/go-filecoin/vm" "gx/ipfs/QmSkuaNgyGmV8c1L3cZNWcUxRJV6J3nsD96JVQPcWcwtyW/go-hamt-ipld" + "gx/ipfs/QmVmDhyTTUcQXFD1rRQ64fGLMSAoaQvNH3hwuaCFAPq2hy/errors" sha256 "gx/ipfs/QmXTpwq2AkzQsPjKqFQDNY2bMdsAT53hUBETeyj8QRHTZU/sha256-simd" "gx/ipfs/QmcD7SqfyQyA91TZUQ7VPRYbGarxmY7EsQewVYMuN5LNSv/go-ipfs-blockstore" logging "gx/ipfs/QmcVVHfdyv15GVPk7NrxdWjh2hLVccXnoD8j2tyQShiXJb/go-log" @@ -24,27 +25,35 @@ import ( var ( ticketDomain *big.Int - log = logging.Logger("mining") + // mineWarnThreshold is the number of seconds of mining after which the worker + // logs a warning that mining is wasting an unexpected amount of work. + mineWarnThreshold float64 + log = logging.Logger("mining") ) +// mineSleepTime is the estimated mining time. We define this so that we can +// fake mining with the current incomplete system. TODO this needs to be +// configurable to expediate both unit and large scale testing. +const mineSleepTime = mineDelay * 30 + func init() { ticketDomain = &big.Int{} ticketDomain.Exp(big.NewInt(2), big.NewInt(256), nil) ticketDomain.Sub(ticketDomain, big.NewInt(1)) + + mineWarnThreshold = mineSleepTime.Seconds() / 2.0 } // Input is the TipSets the worker should mine on, the address // to accrue rewards to, and a context that the caller can use // to cancel this mining run. type Input struct { - Ctx context.Context // TODO: we should evaluate if this is still useful - TipSet core.TipSet - NullBlks uint64 + TipSet core.TipSet } // NewInput instantiates a new Input. -func NewInput(ctx context.Context, ts core.TipSet) Input { - return Input{Ctx: ctx, TipSet: ts} +func NewInput(ts core.TipSet) Input { + return Input{TipSet: ts} } // Output is the result of a single mining run. It has either a new @@ -76,9 +85,9 @@ type GetWeight func(context.Context, core.TipSet) (uint64, uint64, error) type miningApplier func(ctx context.Context, messages []*types.SignedMessage, st state.Tree, vms vm.StorageMap, bh *types.BlockHeight) (core.ApplyMessagesResponse, error) -// MiningWorker runs a mining job. -type MiningWorker struct { - createPoST DoSomeWorkFunc // TODO: rename createPoSTFunc? +// DefaultWorker runs a mining job. +type DefaultWorker struct { + createPoST DoSomeWorkFunc // TODO: rename createPoSTFunc minerAddr types.Address // TODO: needs to be a key in the near future // consensus things @@ -93,14 +102,14 @@ type MiningWorker struct { cstore *hamt.CborIpldStore } -// NewMiningWorker instantiates a new Worker. -func NewMiningWorker(messagePool *core.MessagePool, getStateTree GetStateTree, getWeight GetWeight, applyMessages miningApplier, powerTable core.PowerTableView, bs blockstore.Blockstore, cst *hamt.CborIpldStore, miner types.Address) *MiningWorker { - return NewMiningWorkerWithDeps(messagePool, getStateTree, getWeight, applyMessages, powerTable, bs, cst, miner, createPoST) +// NewDefaultWorker instantiates a new Worker. +func NewDefaultWorker(messagePool *core.MessagePool, getStateTree GetStateTree, getWeight GetWeight, applyMessages miningApplier, powerTable core.PowerTableView, bs blockstore.Blockstore, cst *hamt.CborIpldStore, miner types.Address) *DefaultWorker { + return NewDefaultWorkerWithDeps(messagePool, getStateTree, getWeight, applyMessages, powerTable, bs, cst, miner, createPoST) } -// NewMiningWorkerWithDeps instantiates a new Worker with custom functions. -func NewMiningWorkerWithDeps(messagePool *core.MessagePool, getStateTree GetStateTree, getWeight GetWeight, applyMessages miningApplier, powerTable core.PowerTableView, bs blockstore.Blockstore, cst *hamt.CborIpldStore, miner types.Address, createPoST DoSomeWorkFunc) *MiningWorker { - return &MiningWorker{ +// NewDefaultWorkerWithDeps instantiates a new Worker with custom functions. +func NewDefaultWorkerWithDeps(messagePool *core.MessagePool, getStateTree GetStateTree, getWeight GetWeight, applyMessages miningApplier, powerTable core.PowerTableView, bs blockstore.Blockstore, cst *hamt.CborIpldStore, miner types.Address, createPoST DoSomeWorkFunc) *DefaultWorker { + return &DefaultWorker{ getStateTree: getStateTree, getWeight: getWeight, messagePool: messagePool, @@ -118,11 +127,14 @@ func NewMiningWorkerWithDeps(messagePool *core.MessagePool, getStateTree GetStat // is a good idea for now. type DoSomeWorkFunc func() -// Mine implements the MiningWorkers main mining function.. -func (w *MiningWorker) Mine(ctx context.Context, input Input, outCh chan<- Output) { +// Mine implements the DefaultWorkers main mining function.. +func (w *DefaultWorker) Mine(ctx context.Context, input Input, outCh chan<- Output) { ctx = log.Start(ctx, "Worker.Mine") defer log.Finish(ctx) - + if len(input.TipSet) == 0 { + outCh <- Output{Err: errors.New("bad input tipset with no blocks sent to Mine()")} + return + } // TODO: derive these from actual storage power. // This should now be pretty easy because the worker has getState and // powertable view. @@ -132,31 +144,43 @@ func (w *MiningWorker) Mine(ctx context.Context, input Input, outCh chan<- Outpu const myPower = 1 const totalPower = 5 - if ctx.Err() != nil { - return - } - - challenge := createChallenge(input.TipSet, input.NullBlks) - proof := createProof(challenge, w.createPoST) - ticket := createTicket(proof) + for nullBlkCount := uint64(0); ; nullBlkCount++ { + log.Infof("Mining on tipset: %s, with %d null blocks.", input.TipSet.String(), nullBlkCount) + start := time.Now() + if ctx.Err() != nil { + return + } - // TODO: Test the interplay of isWinningTicket() and createPoST() - if isWinningTicket(ticket, myPower, totalPower) { - next, err := w.Generate(ctx, input.TipSet, ticket, input.NullBlks) - if err == nil { - log.SetTag(ctx, "block", next) + challenge := createChallenge(input.TipSet, nullBlkCount) + prCh := createProof(challenge, w.createPoST) + var ticket []byte + select { + case <-ctx.Done(): + mineTime := time.Since(start) + log.Infof("Mining run on: %s canceled.", input.TipSet.String()) + if mineTime.Seconds() < mineWarnThreshold { + log.Warningf("Abandoning mining after %f seconds. Wasting lots of work...", mineTime.Seconds()) + } + return + case proof := <-prCh: + ticket = createTicket(proof) } - if ctx.Err() == nil { + // TODO: Test the interplay of isWinningTicket() and createPoST() + if isWinningTicket(ticket, myPower, totalPower) { + next, err := w.Generate(ctx, input.TipSet, ticket, nullBlkCount) + if err == nil { + log.SetTag(ctx, "block", next) + } outCh <- NewOutput(next, err) - } else { - log.Warningf("Abandoning successfully mined block without publishing: %s", input.TipSet.String()) } } - time.Sleep(mineSleepTime) } -func createChallenge(parents core.TipSet, nullBlockCount uint64) []byte { +// TODO -- in general this won't work with only the base tipset, we'll potentially +// need some chain manager utils, similar to the State function, to sample +// further back in the chain. +func createChallenge(parents core.TipSet, nullBlkCount uint64) []byte { // Find the smallest ticket from parent set var smallest types.Signature for _, v := range parents { @@ -166,17 +190,22 @@ func createChallenge(parents core.TipSet, nullBlockCount uint64) []byte { } buf := make([]byte, 4) - n := binary.PutUvarint(buf, nullBlockCount) + n := binary.PutUvarint(buf, nullBlkCount) buf = append(smallest, buf[:n]...) h := sha256.Sum256(buf) return h[:] } -func createProof(challenge []byte, createPoST DoSomeWorkFunc) []byte { - // TODO: Actually use the results of the PoST once it is implemented. - createPoST() - return challenge +// TODO: Actually use the results of the PoST once it is implemented. +// Currently createProof just passes the challenge value through. +func createProof(challenge []byte, createPoST DoSomeWorkFunc) <-chan []byte { + c := make(chan []byte) + go func() { + createPoST() // TODO send new PoST on channel once we can create it + c <- challenge + }() + return c } func createTicket(proof []byte) []byte { diff --git a/mining/worker_test.go b/mining/worker_test.go index e054a5e80e..8f0740c435 100644 --- a/mining/worker_test.go +++ b/mining/worker_test.go @@ -26,37 +26,52 @@ func Test_Mine(t *testing.T) { newCid := types.NewCidForTestGetter() stateRoot := newCid() baseBlock := &types.Block{Height: 2, StateRoot: stateRoot} - tipSet := core.TipSet{baseBlock.Cid().String(): baseBlock} - ctx := context.Background() + tipSet := core.RequireNewTipSet(require, baseBlock) + ctx, cancel := context.WithCancel(context.Background()) - // Success. st, pool, addrs, cst, bs := sharedSetup(t) getStateTree := func(c context.Context, ts core.TipSet) (state.Tree, error) { return st, nil } - worker := NewMiningWorker(pool, getStateTree, getWeightTest, core.ApplyMessages, &core.TestView{}, bs, cst, addrs[3]) - next, err := worker.Generate(ctx, tipSet, nil, 0) - require.NoError(err) + + // Success case. TODO: this case isn't testing much. Testing w.Mine + // further needs a lot more attention. + worker := NewDefaultWorker(pool, getStateTree, getWeightTest, core.ApplyMessages, &core.TestView{}, bs, cst, addrs[3]) outCh := make(chan Output) doSomeWorkCalled := false worker.createPoST = func() { doSomeWorkCalled = true } - input := NewInput(ctx, tipSet) + input := NewInput(tipSet) go worker.Mine(ctx, input, outCh) r := <-outCh assert.NoError(r.Err) assert.True(doSomeWorkCalled) - assert.True(r.NewBlock.Cid().Equals(next.Cid())) + cancel() // Block generation fails. - worker = NewMiningWorker(pool, makeExplodingGetStateTree(st), getWeightTest, core.ApplyMessages, &core.TestView{}, bs, cst, addrs[3]) + ctx, cancel = context.WithCancel(context.Background()) + worker = NewDefaultWorker(pool, makeExplodingGetStateTree(st), getWeightTest, core.ApplyMessages, &core.TestView{}, bs, cst, addrs[3]) outCh = make(chan Output) doSomeWorkCalled = false worker.createPoST = func() { doSomeWorkCalled = true } - input = NewInput(ctx, tipSet) + input = NewInput(tipSet) go worker.Mine(ctx, input, outCh) r = <-outCh assert.Error(r.Err) assert.True(doSomeWorkCalled) + cancel() + + // Sent empty tipset + ctx, cancel = context.WithCancel(context.Background()) + worker = NewDefaultWorker(pool, getStateTree, getWeightTest, core.ApplyMessages, &core.TestView{}, bs, cst, addrs[3]) + outCh = make(chan Output) + doSomeWorkCalled = false + worker.createPoST = func() { doSomeWorkCalled = true } + input = NewInput(core.TipSet{}) + go worker.Mine(ctx, input, outCh) + r = <-outCh + assert.Error(r.Err) + assert.False(doSomeWorkCalled) + cancel() } func TestIsWinningTicket(t *testing.T) { @@ -123,26 +138,12 @@ func TestCreateChallenge(t *testing.T) { } } -// Implements NullBlockTimerFunc with the policy that it is always a good time -// to create a null block. -func nullBlockImmediately() { -} - -// Returns a ticket checking function that return true every third time -func everyThirdWinningTicket() func(_ []byte, _, _ int64) bool { - count := 0 - return func(_ []byte, _, _ int64) bool { - count++ - return count%3 == 0 - } -} - var seed = types.GenerateKeyInfoSeed() var ki = types.MustGenerateKeyInfo(10, seed) var mockSigner = types.NewMockSigner(ki) func TestGenerate(t *testing.T) { - // TODO fritz use core.FakeActor for state/contract tests for generate: + // TODO use core.FakeActor for state/contract tests for generate: // - test nonces out of order // - test nonce gap } @@ -248,7 +249,7 @@ func TestGenerateMultiBlockTipSet(t *testing.T) { getStateTree := func(c context.Context, ts core.TipSet) (state.Tree, error) { return st, nil } - worker := NewMiningWorker(pool, getStateTree, getWeightTest, core.ApplyMessages, &core.TestView{}, bs, cst, addrs[3]) + worker := NewDefaultWorker(pool, getStateTree, getWeightTest, core.ApplyMessages, &core.TestView{}, bs, cst, addrs[3]) parents := types.NewSortedCidSet(newCid()) stateRoot := newCid() @@ -285,7 +286,7 @@ func TestGeneratePoolBlockResults(t *testing.T) { getStateTree := func(c context.Context, ts core.TipSet) (state.Tree, error) { return st, nil } - worker := NewMiningWorker(pool, getStateTree, getWeightTest, core.ApplyMessages, &core.TestView{}, bs, cst, addrs[3]) + worker := NewDefaultWorker(pool, getStateTree, getWeightTest, core.ApplyMessages, &core.TestView{}, bs, cst, addrs[3]) // addr3 doesn't correspond to an extant account, so this will trigger errAccountNotFound -- a temporary failure. msg1 := types.NewMessage(addrs[2], addrs[0], 0, nil, "", nil) @@ -341,7 +342,7 @@ func TestGenerateSetsBasicFields(t *testing.T) { getStateTree := func(c context.Context, ts core.TipSet) (state.Tree, error) { return st, nil } - worker := NewMiningWorker(pool, getStateTree, getWeightTest, core.ApplyMessages, &core.TestView{}, bs, cst, addrs[3]) + worker := NewDefaultWorker(pool, getStateTree, getWeightTest, core.ApplyMessages, &core.TestView{}, bs, cst, addrs[3]) h := types.Uint64(100) wNum := types.Uint64(1000) @@ -379,7 +380,7 @@ func TestGenerateWithoutMessages(t *testing.T) { getStateTree := func(c context.Context, ts core.TipSet) (state.Tree, error) { return st, nil } - worker := NewMiningWorker(pool, getStateTree, getWeightTest, core.ApplyMessages, &core.TestView{}, bs, cst, addrs[3]) + worker := NewDefaultWorker(pool, getStateTree, getWeightTest, core.ApplyMessages, &core.TestView{}, bs, cst, addrs[3]) assert.Len(pool.Pending(), 0) baseBlock := types.Block{ @@ -404,7 +405,7 @@ func TestGenerateError(t *testing.T) { st, pool, addrs, cst, bs := sharedSetup(t) - worker := NewMiningWorker(pool, makeExplodingGetStateTree(st), getWeightTest, core.ApplyMessages, &core.TestView{}, bs, cst, addrs[3]) + worker := NewDefaultWorker(pool, makeExplodingGetStateTree(st), getWeightTest, core.ApplyMessages, &core.TestView{}, bs, cst, addrs[3]) // This is actually okay and should result in a receipt msg := types.NewMessage(addrs[0], addrs[1], 0, nil, "", nil) diff --git a/node/node.go b/node/node.go index 3b77795b42..8e9a75bf2e 100644 --- a/node/node.go +++ b/node/node.go @@ -324,7 +324,9 @@ func (node *Node) handleNewMiningOutput(miningOutCh <-chan mining.Output) { } func (node *Node) handleNewHeaviestTipSet(ctx context.Context, head core.TipSet) { + currentBestCancel := func() {} for ts := range node.HeaviestTipSetCh { + var currentBestCtx context.Context newHead := ts.(core.TipSet) if len(newHead) == 0 { log.Error("TipSet of size 0 published on HeaviestTipSetCh:") @@ -338,19 +340,25 @@ func (node *Node) handleNewHeaviestTipSet(ctx context.Context, head core.TipSet) } head = newHead if node.isMining() { - + // The miningInCh should accept tipsets with strictly increasing + // weight, so cancel the previous input. + currentBestCancel() + currentBestCtx, currentBestCancel = context.WithCancel(context.Background()) node.miningDoneWg.Add(1) go func() { defer func() { node.miningDoneWg.Done() }() select { - case <-node.miningCtx.Done(): + case <-node.miningCtx.Done(): // mining is done + return + case <-currentBestCtx.Done(): // this input is no longer the heaviest return - case node.miningInCh <- mining.NewInput(context.Background(), head): + case node.miningInCh <- mining.NewInput(head): } }() } node.HeaviestTipSetHandled() } + currentBestCancel() // keep the linter happy } func (node *Node) cancelSubscriptions() { @@ -436,7 +444,7 @@ func (node *Node) StartMining() error { getStateTree := func(ctx context.Context, ts core.TipSet) (state.Tree, error) { return node.ChainMgr.State(ctx, ts.ToSlice()) } - worker := mining.NewMiningWorker(node.MsgPool, getStateTree, node.ChainMgr.Weight, core.ApplyMessages, node.ChainMgr.PwrTableView, node.Blockstore, node.CborStore, miningAddress) + worker := mining.NewDefaultWorker(node.MsgPool, getStateTree, node.ChainMgr.Weight, core.ApplyMessages, node.ChainMgr.PwrTableView, node.Blockstore, node.CborStore, miningAddress) node.MiningScheduler = mining.NewScheduler(worker) node.miningCtx, node.cancelMining = context.WithCancel(context.Background()) inCh, outCh, doneWg := node.MiningScheduler.Start(node.miningCtx) @@ -455,15 +463,14 @@ func (node *Node) StartMining() error { node.miningDoneWg.Add(1) go func() { defer func() { node.miningDoneWg.Done() }() - // TODO(EC): Here is where we kick mining off when we start off. Will - // need to change to pass in best tipsets, of which there can be multiple. hts := node.ChainMgr.GetHeaviestTipSet() select { case <-node.miningCtx.Done(): return - case node.miningInCh <- mining.NewInput(context.Background(), hts): + case node.miningInCh <- mining.NewInput(hts): } }() + return nil } @@ -482,7 +489,6 @@ func (node *Node) initSectorBuilder(minerAddr types.Address) error { // StopMining stops mining on new blocks. func (node *Node) StopMining() { - // TODO should probably also keep track of and cancel last mining.Input.Ctx. node.setIsMining(false) } diff --git a/node/testing.go b/node/testing.go index b7003d3aeb..7d5b2bcb19 100644 --- a/node/testing.go +++ b/node/testing.go @@ -119,7 +119,7 @@ func RunCreateMiner(t *testing.T, node *Node, from types.Address, pledge types.B getStateTree := func(ctx context.Context, ts core.TipSet) (state.Tree, error) { return node.ChainMgr.State(ctx, ts.ToSlice()) } - w := mining.NewMiningWorker(node.MsgPool, getStateTree, node.ChainMgr.Weight, core.ApplyMessages, node.ChainMgr.PwrTableView, node.Blockstore, node.CborStore, address.TestAddress) + w := mining.NewDefaultWorker(node.MsgPool, getStateTree, node.ChainMgr.Weight, core.ApplyMessages, node.ChainMgr.PwrTableView, node.Blockstore, node.CborStore, address.TestAddress) cur := node.ChainMgr.GetHeaviestTipSet() out := mining.MineOnce(ctx, mining.NewScheduler(w), cur) require.NoError(out.Err) diff --git a/tools/go-fakecoin/main.go b/tools/go-fakecoin/main.go index 6ec18e95d4..44ce949d72 100644 --- a/tools/go-fakecoin/main.go +++ b/tools/go-fakecoin/main.go @@ -108,9 +108,9 @@ func getChainManager(d repo.Datastore, bs blockstore.Blockstore) (*core.ChainMan return cm, cst } -func getWorker(msgPool *core.MessagePool, cm *core.ChainManager, cst *hamt.CborIpldStore, bs blockstore.Blockstore) *mining.MiningWorker { +func getWorker(msgPool *core.MessagePool, cm *core.ChainManager, cst *hamt.CborIpldStore, bs blockstore.Blockstore) *mining.DefaultWorker { ma := types.MakeTestAddress("miningAddress") - return mining.NewMiningWorker(msgPool, func(ctx context.Context, ts core.TipSet) (state.Tree, error) { + return mining.NewDefaultWorker(msgPool, func(ctx context.Context, ts core.TipSet) (state.Tree, error) { return cm.State(ctx, ts.ToSlice()) }, cm.Weight, core.ApplyMessages, cm.PwrTableView, bs, cst, ma) }