diff --git a/api/impl/mining.go b/api/impl/mining.go index 7e1142feed..e573abfc06 100644 --- a/api/impl/mining.go +++ b/api/impl/mining.go @@ -21,16 +21,16 @@ func (api *nodeMining) Once(ctx context.Context) (*types.Block, error) { nd := api.api.node ts := nd.ChainMgr.GetHeaviestTipSet() - blockGenerator := mining.NewBlockGenerator(nd.MsgPool, func(ctx context.Context, ts core.TipSet) (state.Tree, error) { - return nd.ChainMgr.State(ctx, ts.ToSlice()) - }, nd.ChainMgr.Weight, core.ApplyMessages, nd.ChainMgr.PwrTableView, nd.Blockstore, nd.CborStore) - miningAddr, err := nd.MiningAddress() if err != nil { return nil, err } - res := mining.MineOnce(ctx, mining.NewWorker(blockGenerator, miningAddr), ts) + worker := mining.NewDefaultWorker(nd.MsgPool, func(ctx context.Context, ts core.TipSet) (state.Tree, error) { + return nd.ChainMgr.State(ctx, ts.ToSlice()) + }, nd.ChainMgr.Weight, core.ApplyMessages, nd.ChainMgr.PwrTableView, nd.Blockstore, nd.CborStore, miningAddr) + + res := mining.MineOnce(ctx, mining.NewScheduler(worker), ts) if res.Err != nil { return nil, res.Err } diff --git a/mining/block_generator.go b/mining/block_generate.go similarity index 51% rename from mining/block_generator.go rename to mining/block_generate.go index c4964027e2..d1aae265ee 100644 --- a/mining/block_generator.go +++ b/mining/block_generate.go @@ -1,78 +1,37 @@ package mining +// Block generation is part of the logic of the DefaultWorker. +// 'generate' is that function that actually creates a new block from a base +// TipSet using the DefaultWorker's many utilities. + import ( "context" - "gx/ipfs/QmSkuaNgyGmV8c1L3cZNWcUxRJV6J3nsD96JVQPcWcwtyW/go-hamt-ipld" errors "gx/ipfs/QmVmDhyTTUcQXFD1rRQ64fGLMSAoaQvNH3hwuaCFAPq2hy/errors" - "gx/ipfs/QmcD7SqfyQyA91TZUQ7VPRYbGarxmY7EsQewVYMuN5LNSv/go-ipfs-blockstore" - logging "gx/ipfs/QmcVVHfdyv15GVPk7NrxdWjh2hLVccXnoD8j2tyQShiXJb/go-log" "github.com/filecoin-project/go-filecoin/address" "github.com/filecoin-project/go-filecoin/core" - "github.com/filecoin-project/go-filecoin/state" "github.com/filecoin-project/go-filecoin/types" "github.com/filecoin-project/go-filecoin/vm" ) -var log = logging.Logger("mining") - -// GetStateTree is a function that gets the aggregate state tree of a TipSet. It's -// its own function to facilitate testing. -type GetStateTree func(context.Context, core.TipSet) (state.Tree, error) - -// GetWeight is a function that calculates the weight of a TipSet. Weight is -// expressed as two uint64s comprising a rational number. -type GetWeight func(context.Context, core.TipSet) (uint64, uint64, error) - -// BlockGenerator is the primary interface for blockGenerator. -type BlockGenerator interface { - Generate(context.Context, core.TipSet, types.Signature, uint64, types.Address) (*types.Block, error) -} - -// NewBlockGenerator returns a new BlockGenerator. -func NewBlockGenerator(messagePool *core.MessagePool, getStateTree GetStateTree, getWeight GetWeight, applyMessages miningApplier, powerTable core.PowerTableView, bs blockstore.Blockstore, cstore *hamt.CborIpldStore) BlockGenerator { - return &blockGenerator{ - messagePool: messagePool, - getStateTree: getStateTree, - getWeight: getWeight, - applyMessages: applyMessages, - powerTable: powerTable, - blockstore: bs, - cstore: cstore, - } -} - -type miningApplier func(ctx context.Context, messages []*types.SignedMessage, st state.Tree, vms vm.StorageMap, bh *types.BlockHeight) (core.ApplyMessagesResponse, error) - -// blockGenerator generates new blocks for inclusion in the chain. -type blockGenerator struct { - messagePool *core.MessagePool - getStateTree GetStateTree - getWeight GetWeight - applyMessages miningApplier - powerTable core.PowerTableView - blockstore blockstore.Blockstore - cstore *hamt.CborIpldStore -} - // Generate returns a new block created from the messages in the pool. -func (b blockGenerator) Generate(ctx context.Context, baseTipSet core.TipSet, ticket types.Signature, nullBlockCount uint64, miningAddress types.Address) (*types.Block, error) { - stateTree, err := b.getStateTree(ctx, baseTipSet) +func (w *DefaultWorker) Generate(ctx context.Context, baseTipSet core.TipSet, ticket types.Signature, nullBlockCount uint64) (*types.Block, error) { + stateTree, err := w.getStateTree(ctx, baseTipSet) if err != nil { return nil, errors.Wrap(err, "get state tree") } - if !b.powerTable.HasPower(ctx, stateTree, b.blockstore, miningAddress) { - return nil, errors.Errorf("bad miner address, miner must store files before mining: %s", miningAddress) + if !w.powerTable.HasPower(ctx, stateTree, w.blockstore, w.minerAddr) { + return nil, errors.Errorf("bad miner address, miner must store files before mining: %s", w.minerAddr) } - wNum, wDenom, err := b.getWeight(ctx, baseTipSet) + wNum, wDenom, err := w.getWeight(ctx, baseTipSet) if err != nil { return nil, errors.Wrap(err, "get weight") } - nonce, err := core.NextNonce(ctx, stateTree, b.messagePool, address.NetworkAddress) + nonce, err := core.NextNonce(ctx, stateTree, w.messagePool, address.NetworkAddress) if err != nil { return nil, errors.Wrap(err, "next nonce") } @@ -83,19 +42,20 @@ func (b blockGenerator) Generate(ctx context.Context, baseTipSet core.TipSet, ti } blockHeight := baseHeight + nullBlockCount + 1 - rewardMsg := types.NewMessage(address.NetworkAddress, miningAddress, nonce, types.NewAttoFILFromFIL(1000), "", nil) + rewardMsg := types.NewMessage(address.NetworkAddress, w.minerAddr, nonce, types.NewAttoFILFromFIL(1000), "", nil) srewardMsg := &types.SignedMessage{ Message: *rewardMsg, Signature: nil, } - pending := b.messagePool.Pending() + pending := w.messagePool.Pending() messages := make([]*types.SignedMessage, len(pending)+1) messages[0] = srewardMsg // Reward message must come first since this is a part of the consensus rules. - copy(messages[1:], core.OrderMessagesByNonce(b.messagePool.Pending())) + copy(messages[1:], core.OrderMessagesByNonce(w.messagePool.Pending())) + + vms := vm.NewStorageMap(w.blockstore) + res, err := w.applyMessages(ctx, messages, stateTree, vms, types.NewBlockHeight(blockHeight)) - vms := vm.NewStorageMap(b.blockstore) - res, err := b.applyMessages(ctx, messages, stateTree, vms, types.NewBlockHeight(blockHeight)) if err != nil { return nil, errors.Wrap(err, "generate apply messages") } @@ -111,7 +71,7 @@ func (b blockGenerator) Generate(ctx context.Context, baseTipSet core.TipSet, ti } next := &types.Block{ - Miner: miningAddress, + Miner: w.minerAddr, Height: types.Uint64(blockHeight), Messages: res.SuccessfulMessages, MessageReceipts: receipts, @@ -136,7 +96,7 @@ func (b blockGenerator) Generate(ctx context.Context, baseTipSet core.TipSet, ti for _, msg := range res.SuccessfulMessages { mc, err := msg.Cid() if err == nil { - b.messagePool.Remove(mc) + w.messagePool.Remove(mc) } } @@ -148,7 +108,7 @@ func (b blockGenerator) Generate(ctx context.Context, baseTipSet core.TipSet, ti log.Infof("permanent ApplyMessage failure, [%S]", mc.String()) // Intentionally not handling error case, since it just means we won't be able to remove from pool. if err == nil { - b.messagePool.Remove(mc) + w.messagePool.Remove(mc) } } diff --git a/mining/block_generator_test.go b/mining/block_generator_test.go deleted file mode 100644 index 52965997d7..0000000000 --- a/mining/block_generator_test.go +++ /dev/null @@ -1,369 +0,0 @@ -package mining - -import ( - "context" - "errors" - "testing" - - "github.com/filecoin-project/go-filecoin/address" - "github.com/filecoin-project/go-filecoin/core" - "github.com/filecoin-project/go-filecoin/state" - "github.com/filecoin-project/go-filecoin/types" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - - "gx/ipfs/QmSkuaNgyGmV8c1L3cZNWcUxRJV6J3nsD96JVQPcWcwtyW/go-hamt-ipld" - "gx/ipfs/QmYVNvtQkeZ6AKSwDrjQTs432QtL6umrrK41EBq3cu7iSP/go-cid" - "gx/ipfs/QmcD7SqfyQyA91TZUQ7VPRYbGarxmY7EsQewVYMuN5LNSv/go-ipfs-blockstore" - "gx/ipfs/QmeiCcJfDW1GJnWUArudsv5rQsihpi4oyddPhdqo3CfX6i/go-datastore" - - "github.com/filecoin-project/go-filecoin/vm" -) - -var seed = types.GenerateKeyInfoSeed() -var ki = types.MustGenerateKeyInfo(10, seed) -var mockSigner = types.NewMockSigner(ki) - -func TestGenerate(t *testing.T) { - // TODO fritz use core.FakeActor for state/contract tests for generate: - // - test nonces out of order - // - test nonce gap -} - -func sharedSetupInitial() (*hamt.CborIpldStore, *core.MessagePool, blockstore.Blockstore, *cid.Cid) { - cst := hamt.NewCborStore() - pool := core.NewMessagePool() - // Install the fake actor so we can execute it. - fakeActorCodeCid := types.AccountActorCodeCid - ds := datastore.NewMapDatastore() - bs := blockstore.NewBlockstore(ds) - return cst, pool, bs, fakeActorCodeCid -} - -func sharedSetup(t *testing.T) (state.Tree, *hamt.CborIpldStore, blockstore.Blockstore, *core.MessagePool, []types.Address) { - require := require.New(t) - cst, pool, bs, fakeActorCodeCid := sharedSetupInitial() - vms := vm.NewStorageMap(bs) - - // TODO: We don't need fake actors here, so these could be made real. - // And the NetworkAddress actor can/should be the real one. - // Stick two fake actors in the state tree so they can talk. - addr1, addr2, addr3, addr4 := mockSigner.Addresses[0], mockSigner.Addresses[1], mockSigner.Addresses[2], mockSigner.Addresses[3] - act1, act2, fakeNetAct := core.RequireNewFakeActor(require, vms, addr1, fakeActorCodeCid), core.RequireNewFakeActor(require, - vms, addr2, fakeActorCodeCid), core.RequireNewFakeActor(require, vms, addr3, fakeActorCodeCid) - minerAct := core.RequireNewMinerActor(require, vms, addr4, addr1, []byte{}, types.NewBytesAmount(10000), core.RequireRandomPeerID(), types.NewAttoFILFromFIL(10000)) - _, st := core.RequireMakeStateTree(require, cst, map[types.Address]*types.Actor{ - // Ensure core.NetworkAddress exists to prevent mining reward message failures. - address.NetworkAddress: fakeNetAct, - addr1: act1, - addr2: act2, - addr4: minerAct, - }) - return st, cst, bs, pool, []types.Address{addr1, addr2, addr3, addr4} -} - -func TestApplyMessagesForSuccessTempAndPermFailures(t *testing.T) { - assert := assert.New(t) - require := require.New(t) - - cst, _, bs, fakeActorCodeCid := sharedSetupInitial() - vms := vm.NewStorageMap(bs) - - // Stick two fake actors in the state tree so they can talk. - addr1, addr2 := mockSigner.Addresses[0], mockSigner.Addresses[1] - act1 := core.RequireNewFakeActor(require, vms, addr1, fakeActorCodeCid) - _, st := core.RequireMakeStateTree(require, cst, map[types.Address]*types.Actor{ - addr1: act1, - }) - - ctx := context.Background() - - // NOTE: it is important that each category (success, temporary failure, permanent failure) is represented below. - // If a given message's category changes in the future, it needs to be replaced here in tests by another so we fully - // exercise the categorization. - // addr2 doesn't correspond to an extant account, so this will trigger errAccountNotFound -- a temporary failure. - msg1 := types.NewMessage(addr2, addr1, 0, nil, "", nil) - smsg1, err := types.NewSignedMessage(*msg1, &mockSigner) - require.NoError(err) - - // This is actually okay and should result in a receipt - msg2 := types.NewMessage(addr1, addr2, 0, nil, "", nil) - smsg2, err := types.NewSignedMessage(*msg2, &mockSigner) - require.NoError(err) - - // The following two are sending to self -- errSelfSend, a permanent error. - msg3 := types.NewMessage(addr1, addr1, 1, nil, "", nil) - smsg3, err := types.NewSignedMessage(*msg3, &mockSigner) - require.NoError(err) - - msg4 := types.NewMessage(addr2, addr2, 1, nil, "", nil) - smsg4, err := types.NewSignedMessage(*msg4, &mockSigner) - require.NoError(err) - - messages := []*types.SignedMessage{smsg1, smsg2, smsg3, smsg4} - - res, err := core.ApplyMessages(ctx, messages, st, vms, types.NewBlockHeight(0)) - - assert.Len(res.PermanentFailures, 2) - assert.Contains(res.PermanentFailures, smsg3) - assert.Contains(res.PermanentFailures, smsg4) - - assert.Len(res.TemporaryFailures, 1) - assert.Contains(res.TemporaryFailures, smsg1) - - assert.Len(res.Results, 1) - assert.Contains(res.SuccessfulMessages, smsg2) - - assert.NoError(err) -} - -func TestGenerateMultiBlockTipSet(t *testing.T) { - assert := assert.New(t) - require := require.New(t) - - ctx := context.Background() - newCid := types.NewCidForTestGetter() - st, cst, bs, pool, addrs := sharedSetup(t) - - getStateTree := func(c context.Context, ts core.TipSet) (state.Tree, error) { - return st, nil - } - getWeight := func(c context.Context, ts core.TipSet) (uint64, uint64, error) { - num, den, err := ts.ParentWeight() - if err != nil { - return uint64(0), uint64(0), err - } - return num + uint64(int64(len(ts))*int64(core.ECV)), den, nil - } - generator := NewBlockGenerator(pool, getStateTree, getWeight, core.ApplyMessages, &core.TestView{}, bs, cst) - - parents := types.NewSortedCidSet(newCid()) - stateRoot := newCid() - baseBlock1 := types.Block{ - Parents: parents, - Height: types.Uint64(100), - ParentWeightNum: types.Uint64(1000), - StateRoot: stateRoot, - } - baseBlock2 := types.Block{ - Parents: parents, - Height: types.Uint64(100), - ParentWeightNum: types.Uint64(1000), - StateRoot: stateRoot, - Nonce: 1, - } - blk, err := generator.Generate(ctx, core.RequireNewTipSet(require, &baseBlock1, &baseBlock2), nil, 0, addrs[3]) - assert.NoError(err) - - assert.Len(blk.Messages, 1) // This is the mining reward. - assert.Equal(types.Uint64(101), blk.Height) - assert.Equal(types.Uint64(1020), blk.ParentWeightNum) -} - -// After calling Generate, do the new block and new state of the message pool conform to our expectations? -func TestGeneratePoolBlockResults(t *testing.T) { - assert := assert.New(t) - require := require.New(t) - - ctx := context.Background() - newCid := types.NewCidForTestGetter() - st, cst, bs, pool, addrs := sharedSetup(t) - - getStateTree := func(c context.Context, ts core.TipSet) (state.Tree, error) { - return st, nil - } - getWeight := func(c context.Context, ts core.TipSet) (uint64, uint64, error) { - num, den, err := ts.ParentWeight() - if err != nil { - return uint64(0), uint64(0), err - } - return num + uint64(int64(len(ts))*int64(core.ECV)), den, nil - } - generator := NewBlockGenerator(pool, getStateTree, getWeight, core.ApplyMessages, &core.TestView{}, bs, cst) - - // addr3 doesn't correspond to an extant account, so this will trigger errAccountNotFound -- a temporary failure. - msg1 := types.NewMessage(addrs[2], addrs[0], 0, nil, "", nil) - smsg1, err := types.NewSignedMessage(*msg1, &mockSigner) - require.NoError(err) - - // This is actually okay and should result in a receipt - msg2 := types.NewMessage(addrs[0], addrs[1], 0, nil, "", nil) - smsg2, err := types.NewSignedMessage(*msg2, &mockSigner) - require.NoError(err) - - // The following two are sending to self -- errSelfSend, a permanent error. - msg3 := types.NewMessage(addrs[0], addrs[0], 1, nil, "", nil) - smsg3, err := types.NewSignedMessage(*msg3, &mockSigner) - require.NoError(err) - - msg4 := types.NewMessage(addrs[1], addrs[1], 0, nil, "", nil) - smsg4, err := types.NewSignedMessage(*msg4, &mockSigner) - require.NoError(err) - - pool.Add(smsg1) - pool.Add(smsg2) - pool.Add(smsg3) - pool.Add(smsg4) - - assert.Len(pool.Pending(), 4) - baseBlock := types.Block{ - Parents: types.NewSortedCidSet(newCid()), - Height: types.Uint64(100), - StateRoot: newCid(), - } - blk, err := generator.Generate(ctx, core.RequireNewTipSet(require, &baseBlock), nil, 0, addrs[3]) - assert.NoError(err) - - assert.Len(pool.Pending(), 1) // This is the temporary failure. - assert.Contains(pool.Pending(), smsg1) - - assert.Len(blk.Messages, 2) // This is the good message + the mining reward. - - // Is the mining reward first? This will fail 50% of the time if we don't force the reward to come first. - assert.Equal(address.NetworkAddress, blk.Messages[0].From) -} - -func TestGenerateSetsBasicFields(t *testing.T) { - assert := assert.New(t) - require := require.New(t) - - ctx := context.Background() - newCid := types.NewCidForTestGetter() - - st, cst, bs, pool, addrs := sharedSetup(t) - - getStateTree := func(c context.Context, ts core.TipSet) (state.Tree, error) { - return st, nil - } - getWeight := func(c context.Context, ts core.TipSet) (uint64, uint64, error) { - num, den, err := ts.ParentWeight() - if err != nil { - return uint64(0), uint64(0), err - } - return num + uint64(int64(len(ts))*int64(core.ECV)), den, nil - } - generator := NewBlockGenerator(pool, getStateTree, getWeight, core.ApplyMessages, &core.TestView{}, bs, cst) - - h := types.Uint64(100) - wNum := types.Uint64(1000) - wDenom := types.Uint64(1) - baseBlock := types.Block{ - Height: h, - ParentWeightNum: wNum, - ParentWeightDenom: wDenom, - StateRoot: newCid(), - } - baseTipSet := core.RequireNewTipSet(require, &baseBlock) - blk, err := generator.Generate(ctx, baseTipSet, nil, 0, addrs[3]) - assert.NoError(err) - - assert.Equal(h+1, blk.Height) - assert.Equal(addrs[3], blk.Miner) - - blk, err = generator.Generate(ctx, baseTipSet, nil, 1, addrs[3]) - assert.NoError(err) - - assert.Equal(h+2, blk.Height) - assert.Equal(wNum+10.0, blk.ParentWeightNum) - assert.Equal(wDenom, blk.ParentWeightDenom) - assert.Equal(addrs[3], blk.Miner) -} - -func TestGenerateWithoutMessages(t *testing.T) { - assert := assert.New(t) - require := require.New(t) - - ctx := context.Background() - newCid := types.NewCidForTestGetter() - - st, cst, bs, pool, addrs := sharedSetup(t) - - getStateTree := func(c context.Context, ts core.TipSet) (state.Tree, error) { - return st, nil - } - getWeight := func(c context.Context, ts core.TipSet) (uint64, uint64, error) { - num, den, err := ts.ParentWeight() - if err != nil { - return uint64(0), uint64(0), err - } - return num + uint64(int64(len(ts))*int64(core.ECV)), den, nil - } - generator := NewBlockGenerator(pool, getStateTree, getWeight, core.ApplyMessages, &core.TestView{}, bs, cst) - - assert.Len(pool.Pending(), 0) - baseBlock := types.Block{ - Parents: types.NewSortedCidSet(newCid()), - Height: types.Uint64(100), - StateRoot: newCid(), - } - blk, err := generator.Generate(ctx, core.RequireNewTipSet(require, &baseBlock), nil, 0, addrs[3]) - assert.NoError(err) - - assert.Len(pool.Pending(), 0) // This is the temporary failure. - assert.Len(blk.Messages, 1) // This is the mining reward. -} - -// If something goes wrong while generating a new block, even as late as when flushing it, -// no block should be returned, and the message pool should not be pruned. -func TestGenerateError(t *testing.T) { - assert := assert.New(t) - require := require.New(t) - ctx := context.Background() - newCid := types.NewCidForTestGetter() - - st, cst, bs, pool, addrs := sharedSetup(t) - - explodingGetStateTree := func(c context.Context, ts core.TipSet) (state.Tree, error) { - stt := WrapStateTreeForTest(st) - stt.TestFlush = func(ctx context.Context) (*cid.Cid, error) { - return nil, errors.New("boom no flush") - } - - return stt, nil - } - getWeight := func(c context.Context, ts core.TipSet) (uint64, uint64, error) { - num, den, err := ts.ParentWeight() - if err != nil { - return uint64(0), uint64(0), err - } - return num + uint64(int64(len(ts))*int64(core.ECV)), den, nil - } - - generator := NewBlockGenerator(pool, explodingGetStateTree, getWeight, core.ApplyMessages, &core.TestView{}, bs, cst) - - // This is actually okay and should result in a receipt - msg := types.NewMessage(addrs[0], addrs[1], 0, nil, "", nil) - smsg, err := types.NewSignedMessage(*msg, &mockSigner) - require.NoError(err) - pool.Add(smsg) - - assert.Len(pool.Pending(), 1) - baseBlock := types.Block{ - Parents: types.NewSortedCidSet(newCid()), - Height: types.Uint64(100), - StateRoot: newCid(), - } - baseTipSet := core.RequireNewTipSet(require, &baseBlock) - blk, err := generator.Generate(ctx, baseTipSet, nil, 0, addrs[3]) - assert.Error(err, "boom") - assert.Nil(blk) - - assert.Len(pool.Pending(), 1) // No messages are removed from the pool. -} - -type StateTreeForTest struct { - state.Tree - TestFlush func(ctx context.Context) (*cid.Cid, error) -} - -func WrapStateTreeForTest(st state.Tree) *StateTreeForTest { - stt := StateTreeForTest{ - st, - st.Flush, - } - return &stt -} - -func (st *StateTreeForTest) Flush(ctx context.Context) (*cid.Cid, error) { - return st.TestFlush(ctx) -} diff --git a/mining/scheduler.go b/mining/scheduler.go new file mode 100644 index 0000000000..028d679dfc --- /dev/null +++ b/mining/scheduler.go @@ -0,0 +1,254 @@ +package mining + +// The Scheduler listens for new heaviest TipSets and schedules mining work on +// these TipSets. The scheduler is ultimately responsible for informing the +// rest of the system about new blocks mined by the Worker. This is the +// interface to implement if you want to explore an alternate mining strategy. +// +// The default Scheduler implementation, timingScheduler, is an attempt to +// prevent miners from getting interrupted by attackers strategically releasing +// better base tipsets midway through a proving period. Such attacks are bad +// because they causes miners to waste work. Note that the term 'base tipset', +// or 'mining base' is used to denote the tipset that the miner uses as the +// parent of the block it attempts to generate during mining. +// +// The timingScheduler operates in two states, 'collect', where the scheduler +// listens for new heaviest tipsets to use as the best mining base, and 'ignore', +// where mining proceeds uninterrupted. The scheduler enters the 'collect' state +// each time a new heaviest tipset arrives with a greater height. The +// scheduler finishes the collect state after the mining delay time, a protocol +// parameter, has passed. The scheduler then enters the 'ignore' state. Here +// the scheduler mines, ignoring all inputs with the most recent and lower +// heights. 'ignore' concludes when the scheduler receives an input tipset, +// possibly the tipset consisting of the block the miner just mined, with +// a greater height, and transitions back to collect. It is in miners' +// best interest to wait for the collection period so that they can wait to +// work on a base tipset made up of all blocks mined at the new height. +// +// The current approach is limited. It does not prevent wasted work from all +// strategic block witholding attacks. This is also going to be effected by +// current unknowns surrounding the specifics of the mining protocol (i.e. how +// do VDFs and PoSTs fit into mining, and what is the lookback parameter for +// challenge sampling. For more details see: +// https://gist.github.com/whyrusleeping/4c05fd902f7123bdd1c729e3fffed797 + +import ( + "context" + "sync" + "time" + + "github.com/filecoin-project/go-filecoin/core" +) + +// Scheduler is the mining interface consumers use. When you Start() the +// scheduler it returns two channels (inCh, outCh) and a sync.WaitGroup: +// - inCh: the caller sends Inputs to mine on to this channel. +// - outCh: the scheduler sends Outputs to the caller on this channel. +// - doneWg: signals that the scheduler and any goroutines it launched +// have stopped. (Context cancelation happens async, so you +// need some way to know when it has actually stopped.) +// +// Once Start()ed, the Scheduler can be stopped by canceling its miningCtx, +// which will signal on doneWg when it's actually done. Canceling miningCtx +// cancels any run in progress and shuts the scheduler down. +type Scheduler interface { + Start(miningCtx context.Context) (chan<- Input, <-chan Output, *sync.WaitGroup) +} + +type timingScheduler struct { + // base tracks the current tipset being mined on. It is only read and + // written from the timingScheduler's main receive loop, i.e. collect + // and ignore. + base Input + // worker contains the actual mining logic. + worker Worker +} + +// mineDelay is the protocol mining delay. The timingScheduler waits for the +// mining delay during its 'collect' state. It's set so low right now to +// facilitate testing. +const mineDelay = time.Millisecond * 20 + +// runWorker launches calls to worker.Mine(). Inputs to worker.Mine() are +// accepted on mineInCh. For each new input on mineInCh, runWorker cancels the +// old call to worker.Mine() and mines on the new input. There is only one +// outstanding call to worker.Mine() at a time. outCh is the channel read by +// the scheduler's caller. Newly mined blocks are sent out on outCh if the +// worker is able to mine a block. Nothing is output over outCh if the worker +// does not mine a block. If there is a mining error it is sent over the +// outCh. +func (s *timingScheduler) runWorker(miningCtx context.Context, outCh chan<- Output, mineInCh <-chan Input, doneWg *sync.WaitGroup) { + defer doneWg.Done() + var currentRunCtx context.Context + currentRunCancel := func() {} + for { + select { + case <-miningCtx.Done(): + currentRunCancel() + return + case input, ok := <-mineInCh: + if !ok { + // sender closed mineCh, close and ignore + mineInCh = nil + continue + } + currentRunCancel() + currentRunCtx, currentRunCancel = context.WithCancel(miningCtx) + doneWg.Add(1) + go func() { + defer doneWg.Done() + s.worker.Mine(currentRunCtx, input, outCh) + }() + } + } +} + +// collect runs for the protocol mining delay "mineDelay" and updates the base +// tipset for mining to the latest tipset read from the input channel. +// If the scheduler should terminate collect() returns true. collect() +// initializes the next round of mining, canceling any previous mining calls +// still running. If the eager flag is set, collect starts mining right away, +// possibly starting and stopping multiple mining jobs. +func (s *timingScheduler) collect(miningCtx context.Context, inCh <-chan Input, mineInCh chan<- Input, eager bool) bool { + delayTimer := time.NewTimer(mineDelay) + for { + select { + case <-miningCtx.Done(): + return true + case <-delayTimer.C: + if !eager { + mineInCh <- s.base + } + return false + case input, ok := <-inCh: + if !ok { + // sender closed inCh, close and ignore + inCh = nil + continue + } + + log.Infof("scheduler receiving new base %s during collect", input.TipSet.String()) + s.base = input + if eager { + mineInCh <- input + } + } + } +} + +// ignore() waits for a new heaviest tipset with a greater height. No new tipsets +// from the current base tipset's height or lower heights are accepted as the +// new mining base. If the scheduler should terminate ignore() returns true. +// The purpose of the ignore state is to de-incentivize strategic lagging of +// block propagation part way through a proving period which can cause +// competing miners to waste work. +// Note: this strategy is limited, it does not prevent witholding tipsets of +// greater heights than the current base or from witholding tipsets from miners +// with a null block mining base. +func (s *timingScheduler) ignore(miningCtx context.Context, inCh <-chan Input) bool { + for { + select { + case <-miningCtx.Done(): + return true + case input, ok := <-inCh: + if !ok { + inCh = nil + continue + } + + // Scheduler begins in ignore state with a nil base. + // This case handles base init on the very first input. + if s.base.TipSet == nil { + s.base = input + return false + } + + curHeight, err := s.base.TipSet.Height() + if err != nil { + panic("this can't be happening") + } + inHeight, err := input.TipSet.Height() + if err != nil { + panic("this can't be happening") + } + + // Newer epoch? Loop back to collect state for new non-null epoch. + if inHeight > curHeight { + log.Infof("scheduler receiving new base %s during ignore, transition to collect", input.TipSet.String()) + s.base = input + return false + } + log.Debugf("scheduler ignoring %s during ignore because height is not greater than height of current base", input.TipSet.String()) + } + } +} + +// Start is the main entrypoint for the timingScheduler. Call it to start +// mining. It returns two channels: an input channel for tipsets and an output +// channel for newly mined blocks. It also returns a waitgroup that will signal +// that all mining runs and auxiliary goroutines have completed. Each tipset +// that is received on the input channel is procssesed by the scheduler. +// Depending on the tipset height and the time the input is received, a new +// input may cancel the context of the previous mining run, or be ignorned. +// Any successfully mined blocks are sent into the output channel. Closing the +// input channel does not cause the scheduler to stop; cancel the miningCtx to +// stop all mining and shut down the scheduler. +func (s *timingScheduler) Start(miningCtx context.Context) (chan<- Input, <-chan Output, *sync.WaitGroup) { + inCh := make(chan Input) + outCh := make(chan Output) + mineInCh := make(chan Input) + var doneWg sync.WaitGroup // for internal use + var extDoneWg sync.WaitGroup // for external use + + log.Debugf("scheduler starting 'runWorker' goroutine") + doneWg.Add(1) + go s.runWorker(miningCtx, outCh, mineInCh, &doneWg) + + log.Debugf("scheduler starting main receive loop") + doneWg.Add(1) + go func() { + defer doneWg.Done() + // for now keep 'eager' unset. TODO eventually pull from config or CLI flag. + eager := false + // The receive loop. The scheduler operates in two basic states + // collect -- wait for the mining delay to listen for better base tipsets + // ignore -- mine on the best tipset, ignore all tipsets with height <= the height of the current base + for { + if end := s.ignore(miningCtx, inCh); end { + return + } + if end := s.collect(miningCtx, inCh, mineInCh, eager); end { + return + } + } + }() + + // This tear down goroutine waits for all work to be done before closing + // channels. When this goroutine is complete, external code can + // consider the scheduler to be done. + extDoneWg.Add(1) + go func() { + defer extDoneWg.Done() + doneWg.Wait() + close(outCh) + close(mineInCh) + }() + return inCh, outCh, &extDoneWg +} + +// NewScheduler returns a new timingScheduler to schedule mining work on the +// input worker. +func NewScheduler(w Worker) Scheduler { + return &timingScheduler{worker: w} +} + +// MineOnce is a convenience function that presents a synchronous blocking +// interface to the mining scheduler. +func MineOnce(ctx context.Context, s Scheduler, ts core.TipSet) Output { + subCtx, subCtxCancel := context.WithCancel(ctx) + defer subCtxCancel() + + inCh, outCh, _ := s.Start(subCtx) + go func() { inCh <- NewInput(ts) }() + return <-outCh +} diff --git a/mining/scheduler_test.go b/mining/scheduler_test.go new file mode 100644 index 0000000000..1d526a2c37 --- /dev/null +++ b/mining/scheduler_test.go @@ -0,0 +1,197 @@ +package mining + +import ( + "context" + "testing" + "time" + + "github.com/filecoin-project/go-filecoin/core" + "github.com/filecoin-project/go-filecoin/types" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func newTestUtils(t *testing.T) (*assert.Assertions, *require.Assertions, core.TipSet) { + assert := assert.New(t) + require := require.New(t) + baseBlock := &types.Block{StateRoot: types.SomeCid()} + + ts := core.TipSet{baseBlock.Cid().String(): baseBlock} + return assert, require, ts +} + +// TestMineOnce tests that the MineOnce function results in a mining job being +// scheduled and run by the mining scheduler. +func TestMineOnce(t *testing.T) { + assert, require, ts := newTestUtils(t) + + // Echoes the sent block to output. + worker := NewTestWorkerWithDeps(MakeEchoMine(require)) + scheduler := NewScheduler(worker) + result := MineOnce(context.Background(), scheduler, ts) + assert.NoError(result.Err) + assert.True(ts.ToSlice()[0].StateRoot.Equals(result.NewBlock.StateRoot)) +} + +func TestSchedulerPassesValue(t *testing.T) { + assert, _, ts := newTestUtils(t) + ctx, cancel := context.WithCancel(context.Background()) + + checkValsMine := func(c context.Context, i Input, outCh chan<- Output) { + assert.NotEqual(ctx, c) // individual run ctx splits off from mining ctx + assert.Equal(i.TipSet, ts) + outCh <- Output{} + } + worker := NewTestWorkerWithDeps(checkValsMine) + scheduler := NewScheduler(worker) + inCh, outCh, _ := scheduler.Start(ctx) + inCh <- NewInput(ts) + <-outCh + cancel() +} + +// Test that we can push multiple blocks through. This schedules tipsets +// with successively higher block heights (aka epoch). +func TestSchedulerPassesManyValues(t *testing.T) { + assert, require, ts1 := newTestUtils(t) + ctx, cancel := context.WithCancel(context.Background()) + var checkTS core.TipSet + // make tipsets with progressively higher heights + blk2 := &types.Block{StateRoot: types.SomeCid(), Height: 1} + ts2 := core.RequireNewTipSet(require, blk2) + blk3 := &types.Block{StateRoot: types.SomeCid(), Height: 2} + ts3 := core.RequireNewTipSet(require, blk3) + + checkValsMine := func(c context.Context, i Input, outCh chan<- Output) { + assert.Equal(i.TipSet, checkTS) + outCh <- Output{} + } + worker := NewTestWorkerWithDeps(checkValsMine) + scheduler := NewScheduler(worker) + inCh, outCh, _ := scheduler.Start(ctx) + // Note: inputs have to pass whatever check on newly arriving tipsets + // are in place in Start(). For the (default) timingScheduler tipsets + // need increasing heights. + checkTS = ts1 + inCh <- NewInput(ts1) + <-outCh + checkTS = ts2 + inCh <- NewInput(ts2) + <-outCh + checkTS = ts3 + inCh <- NewInput(ts3) + <-outCh + assert.Equal(ChannelEmpty, ReceiveOutCh(outCh)) + cancel() +} + +// TestSchedulerCollect tests that the scheduler collects tipsets before mining +func TestSchedulerCollect(t *testing.T) { + assert, require, ts1 := newTestUtils(t) + ctx, cancel := context.WithCancel(context.Background()) + blk2 := &types.Block{StateRoot: types.SomeCid(), Height: 1} + ts2 := core.RequireNewTipSet(require, blk2) + blk3 := &types.Block{StateRoot: types.SomeCid(), Height: 1} + ts3 := core.RequireNewTipSet(require, blk3) + checkValsMine := func(c context.Context, i Input, outCh chan<- Output) { + assert.Equal(i.TipSet, ts3) + outCh <- Output{} + } + worker := NewTestWorkerWithDeps(checkValsMine) + scheduler := NewScheduler(worker) + inCh, outCh, _ := scheduler.Start(ctx) + inCh <- NewInput(ts1) + inCh <- NewInput(ts2) + inCh <- NewInput(ts3) // the scheduler will collect the latest input + <-outCh + cancel() +} + +// TestCannotInterruptMiner tests that a tipset from the same epoch, i.e. with +// the same height, does not affect the base tipset for mining once the +// mining delay has finished. +func TestCannotInterruptMiner(t *testing.T) { + assert, require, ts1 := newTestUtils(t) + ctx, cancel := context.WithCancel(context.Background()) + blk1 := ts1.ToSlice()[0] + blk2 := &types.Block{StateRoot: types.SomeCid(), Height: 0} + ts2 := core.RequireNewTipSet(require, blk2) + blockingMine := func(c context.Context, i Input, outCh chan<- Output) { + time.Sleep(mineSleepTime) + assert.Equal(i.TipSet, ts1) + outCh <- Output{NewBlock: blk1} + } + worker := NewTestWorkerWithDeps(blockingMine) + scheduler := NewScheduler(worker) + inCh, outCh, _ := scheduler.Start(ctx) + inCh <- NewInput(ts1) + // Wait until well after the mining delay, and send a new input. + time.Sleep(4 * mineDelay) + inCh <- NewInput(ts2) + out := <-outCh + assert.Equal(out.NewBlock, blk1) + cancel() +} + +func TestSchedulerCancelMiningCtx(t *testing.T) { + assert, _, ts := newTestUtils(t) + // Test that canceling the mining context stops mining, cancels + // the inner context, and closes the output channel. + miningCtx, miningCtxCancel := context.WithCancel(context.Background()) + shouldCancelMine := func(c context.Context, i Input, outCh chan<- Output) { + mineTimer := time.NewTimer(mineSleepTime) + select { + case <-mineTimer.C: + t.Fatal("should not take whole time") + case <-c.Done(): + } + } + worker := NewTestWorkerWithDeps(shouldCancelMine) + scheduler := NewScheduler(worker) + inCh, outCh, doneWg := scheduler.Start(miningCtx) + inCh <- NewInput(ts) + miningCtxCancel() + doneWg.Wait() + assert.Equal(ChannelClosed, ReceiveOutCh(outCh)) +} + +func TestSchedulerMultiRoundWithCollect(t *testing.T) { + assert, require, ts1 := newTestUtils(t) + ctx, cancel := context.WithCancel(context.Background()) + var checkTS core.TipSet + // make tipsets with progressively higher heights + blk2 := &types.Block{StateRoot: types.SomeCid(), Height: 1} + ts2 := core.RequireNewTipSet(require, blk2) + blk3 := &types.Block{StateRoot: types.SomeCid(), Height: 2} + ts3 := core.RequireNewTipSet(require, blk3) + + checkValsMine := func(c context.Context, i Input, outCh chan<- Output) { + assert.Equal(i.TipSet, checkTS) + outCh <- Output{} + } + worker := NewTestWorkerWithDeps(checkValsMine) + scheduler := NewScheduler(worker) + inCh, outCh, doneWg := scheduler.Start(ctx) + // Note: inputs have to pass whatever check on newly arriving tipsets + // are in place in Start(). For the (default) timingScheduler tipsets + // need increasing heights. + checkTS = ts1 + inCh <- NewInput(ts3) + inCh <- NewInput(ts2) + inCh <- NewInput(ts1) + <-outCh + checkTS = ts2 + inCh <- NewInput(ts2) + inCh <- NewInput(ts1) + inCh <- NewInput(ts3) + inCh <- NewInput(ts2) + inCh <- NewInput(ts2) + <-outCh + checkTS = ts3 + inCh <- NewInput(ts3) + <-outCh + assert.Equal(ChannelEmpty, ReceiveOutCh(outCh)) + cancel() + doneWg.Wait() + assert.Equal(ChannelClosed, ReceiveOutCh(outCh)) +} diff --git a/mining/testing.go b/mining/testing.go index 2eb2097c13..2ac729cf74 100644 --- a/mining/testing.go +++ b/mining/testing.go @@ -4,37 +4,53 @@ import ( "context" "sync" - "github.com/filecoin-project/go-filecoin/core" - "github.com/filecoin-project/go-filecoin/types" "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" ) -// MockBlockGenerator is a testify mock for BlockGenerator. -type MockBlockGenerator struct { +// MockScheduler is a mock Scheduler. +type MockScheduler struct { mock.Mock } -var _ BlockGenerator = &MockBlockGenerator{} +// Start is the MockScheduler's Start function. +func (s *MockScheduler) Start(ctx context.Context) (chan<- Input, <-chan Output, *sync.WaitGroup) { + args := s.Called(ctx) + return args.Get(0).(chan<- Input), args.Get(1).(<-chan Output), args.Get(2).(*sync.WaitGroup) +} + +// TestWorker is a worker with a customizable work function to facilitate +// easy testing. +type TestWorker struct { + WorkFunc func(context.Context, Input, chan<- Output) +} -// Generate is a testify mock implementation. -func (bg *MockBlockGenerator) Generate(ctx context.Context, h core.TipSet, ticket types.Signature, nullBlockCount uint64, m types.Address) (b *types.Block, err error) { - args := bg.Called(ctx, h, nullBlockCount, m) - if args.Get(0) != nil { - b = args.Get(0).(*types.Block) +// Mine is the TestWorker's Work function. It simply calls the WorkFunc +// field. +func (w *TestWorker) Mine(ctx context.Context, input Input, outCh chan<- Output) { + if w.WorkFunc == nil { + panic("must set MutableTestWorker's WorkFunc before calling Work") } - err = args.Error(1) - return + w.WorkFunc(ctx, input, outCh) } -// MockWorker is a mock Worker. -type MockWorker struct { - mock.Mock +// NewTestWorkerWithDeps creates a worker that calls the provided input +// function when Mine() is called. +func NewTestWorkerWithDeps(f func(context.Context, Input, chan<- Output)) *TestWorker { + return &TestWorker{ + WorkFunc: f, + } } -// Start is the MockWorker's Start function. -func (w *MockWorker) Start(ctx context.Context) (chan<- Input, <-chan Output, *sync.WaitGroup) { - args := w.Called(ctx) - return args.Get(0).(chan<- Input), args.Get(1).(<-chan Output), args.Get(2).(*sync.WaitGroup) +// MakeEchoMine returns a test worker function that itself returns the first +// block of the input tipset as output. +func MakeEchoMine(require *require.Assertions) func(context.Context, Input, chan<- Output) { + echoMine := func(c context.Context, i Input, outCh chan<- Output) { + require.NotEqual(0, len(i.TipSet)) + b := i.TipSet.ToSlice()[0] + outCh <- Output{NewBlock: b} + } + return echoMine } const ( diff --git a/mining/worker.go b/mining/worker.go index 1befbb51ee..29beea44e1 100644 --- a/mining/worker.go +++ b/mining/worker.go @@ -1,40 +1,59 @@ package mining +// The Worker Mines on Input received from a Scheduler. The Worker is +// responsible for generating the necessary proofs, checking for success, +// generating new blocks, and forwarding them out to the wider node. + import ( "bytes" "context" "encoding/binary" "math/big" - "sync" "time" "github.com/filecoin-project/go-filecoin/core" + "github.com/filecoin-project/go-filecoin/state" "github.com/filecoin-project/go-filecoin/types" + "github.com/filecoin-project/go-filecoin/vm" + "gx/ipfs/QmSkuaNgyGmV8c1L3cZNWcUxRJV6J3nsD96JVQPcWcwtyW/go-hamt-ipld" + "gx/ipfs/QmVmDhyTTUcQXFD1rRQ64fGLMSAoaQvNH3hwuaCFAPq2hy/errors" sha256 "gx/ipfs/QmXTpwq2AkzQsPjKqFQDNY2bMdsAT53hUBETeyj8QRHTZU/sha256-simd" + "gx/ipfs/QmcD7SqfyQyA91TZUQ7VPRYbGarxmY7EsQewVYMuN5LNSv/go-ipfs-blockstore" + logging "gx/ipfs/QmcVVHfdyv15GVPk7NrxdWjh2hLVccXnoD8j2tyQShiXJb/go-log" ) var ( ticketDomain *big.Int + // mineWarnThreshold is the number of seconds of mining after which the worker + // logs a warning that mining is wasting an unexpected amount of work. + mineWarnThreshold float64 + log = logging.Logger("mining") ) +// mineSleepTime is the estimated mining time. We define this so that we can +// fake mining with the current incomplete system. TODO this needs to be +// configurable to expediate both unit and large scale testing. +const mineSleepTime = mineDelay * 30 + func init() { ticketDomain = &big.Int{} ticketDomain.Exp(big.NewInt(2), big.NewInt(256), nil) ticketDomain.Sub(ticketDomain, big.NewInt(1)) + + mineWarnThreshold = mineSleepTime.Seconds() / 2.0 } // Input is the TipSets the worker should mine on, the address // to accrue rewards to, and a context that the caller can use // to cancel this mining run. type Input struct { - Ctx context.Context TipSet core.TipSet } // NewInput instantiates a new Input. -func NewInput(ctx context.Context, ts core.TipSet) Input { - return Input{Ctx: ctx, TipSet: ts} +func NewInput(ts core.TipSet) Input { + return Input{TipSet: ts} } // Output is the result of a single mining run. It has either a new @@ -50,105 +69,57 @@ func NewOutput(b *types.Block, e error) Output { return Output{NewBlock: b, Err: e} } -// AsyncWorker implements the plumbing that drives mining. -type AsyncWorker struct { - blockGenerator BlockGenerator - createPoST DoSomeWorkFunc // TODO: rename createPoSTFunc? - nullBlockTimer NullBlockTimerFunc - minerAddr types.Address // TODO: needs to be a key in the near future -} - -// Worker is the mining interface consumers use. When you Start() a worker -// it returns two channels (inCh, outCh) and a sync.WaitGroup: -// - inCh: caller send Inputs to mine on to this channel -// - outCh: the worker sends Outputs to the caller on this channel -// - doneWg: signals that the worker and any mining runs it launched -// have stopped. (Context cancelation happens async, so you -// need some way to know when it has actually stopped.) -// -// Once Start()ed, the Worker can be stopped by canceling its miningCtx, which -// will signal on doneWg when it's actually done. Canceling an Input.Ctx -// just cancels the run for that input. Canceling miningCtx cancels any run -// in progress and shuts the worker down. +// Worker is the interface called by the Scheduler to run the mining work being +// scheduled. type Worker interface { - Start(miningCtx context.Context) (chan<- Input, <-chan Output, *sync.WaitGroup) + Mine(runCtx context.Context, input Input, outCh chan<- Output) } -// NewWorker instantiates a new Worker. -func NewWorker(blockGenerator BlockGenerator, miner types.Address) Worker { - return NewWorkerWithDeps(blockGenerator, createPoST, nullBlockTimer, miner) -} +// GetStateTree is a function that gets the aggregate state tree of a TipSet. It's +// its own function to facilitate testing. +type GetStateTree func(context.Context, core.TipSet) (state.Tree, error) -// NewWorkerWithDeps instantiates a new Worker with custom functions. -func NewWorkerWithDeps(blockGenerator BlockGenerator, createPoST DoSomeWorkFunc, nullBlockTimer NullBlockTimerFunc, miner types.Address) Worker { - return &AsyncWorker{ - blockGenerator: blockGenerator, - createPoST: createPoST, - nullBlockTimer: nullBlockTimer, - minerAddr: miner, - } -} +// GetWeight is a function that calculates the weight of a TipSet. Weight is +// expressed as two uint64s comprising a rational number. +type GetWeight func(context.Context, core.TipSet) (uint64, uint64, error) -// MineOnce is a convenience function that presents a synchronous blocking -// interface to the worker. -func MineOnce(ctx context.Context, w Worker, ts core.TipSet) Output { - subCtx, subCtxCancel := context.WithCancel(ctx) - defer subCtxCancel() +type miningApplier func(ctx context.Context, messages []*types.SignedMessage, st state.Tree, vms vm.StorageMap, bh *types.BlockHeight) (core.ApplyMessagesResponse, error) - inCh, outCh, _ := w.Start(subCtx) - go func() { inCh <- NewInput(subCtx, ts) }() - return <-outCh +// DefaultWorker runs a mining job. +type DefaultWorker struct { + createPoST DoSomeWorkFunc // TODO: rename createPoSTFunc + minerAddr types.Address // TODO: needs to be a key in the near future + + // consensus things + getStateTree GetStateTree + getWeight GetWeight + + // core filecoin things + messagePool *core.MessagePool + applyMessages miningApplier + powerTable core.PowerTableView + blockstore blockstore.Blockstore + cstore *hamt.CborIpldStore } -// Start is the main entrypoint for Worker. Call it to start mining. It returns -// two channels: an input channel for blocks and an output channel for results. -// It also returns a waitgroup that will signal that all mining runs have -// completed. Each block that is received on the input channel causes the -// worker to cancel the context of the previous mining run if any and start -// mining on the new block. Any results are sent into its output channel. -// Closing the input channel does not cause the worker to stop; cancel -// the Input.Ctx to cancel an individual mining run or the mininCtx to -// stop all mining and shut down the worker. -// -// TODO A potentially simpler interface here would be for the worker to -// take the input channel from the caller and then shut everything down -// when the input channel is closed. -func (w *AsyncWorker) Start(miningCtx context.Context) (chan<- Input, <-chan Output, *sync.WaitGroup) { - inCh := make(chan Input) - outCh := make(chan Output) - var doneWg sync.WaitGroup - - doneWg.Add(1) - go func() { - defer doneWg.Done() - var currentRunCtx context.Context - var currentRunCancel = func() {} - for { - select { - case <-miningCtx.Done(): - currentRunCancel() - close(outCh) - return - case input, ok := <-inCh: - if ok { - // TODO(EC): implement the mining logic described in the spec here: - // https://github.com/filecoin-project/specs/pull/71/files#diff-a7e9cad7bc42c664eb72d7042276a22fR83 - // specifically: - currentRunCancel() - currentRunCtx, currentRunCancel = context.WithCancel(input.Ctx) - doneWg.Add(1) - go func() { - w.Mine(currentRunCtx, input, outCh) - doneWg.Done() - }() - } else { - // Sender closed the channel. Set it to nil to ignore it. - inCh = nil - } - } - } - }() - return inCh, outCh, &doneWg +// NewDefaultWorker instantiates a new Worker. +func NewDefaultWorker(messagePool *core.MessagePool, getStateTree GetStateTree, getWeight GetWeight, applyMessages miningApplier, powerTable core.PowerTableView, bs blockstore.Blockstore, cst *hamt.CborIpldStore, miner types.Address) *DefaultWorker { + return NewDefaultWorkerWithDeps(messagePool, getStateTree, getWeight, applyMessages, powerTable, bs, cst, miner, createPoST) +} + +// NewDefaultWorkerWithDeps instantiates a new Worker with custom functions. +func NewDefaultWorkerWithDeps(messagePool *core.MessagePool, getStateTree GetStateTree, getWeight GetWeight, applyMessages miningApplier, powerTable core.PowerTableView, bs blockstore.Blockstore, cst *hamt.CborIpldStore, miner types.Address, createPoST DoSomeWorkFunc) *DefaultWorker { + return &DefaultWorker{ + getStateTree: getStateTree, + getWeight: getWeight, + messagePool: messagePool, + applyMessages: applyMessages, + powerTable: powerTable, + blockstore: bs, + cstore: cst, + createPoST: createPoST, + minerAddr: miner, + } } // DoSomeWorkFunc is a dummy function that mimics doing something time-consuming @@ -156,53 +127,60 @@ func (w *AsyncWorker) Start(miningCtx context.Context) (chan<- Input, <-chan Out // is a good idea for now. type DoSomeWorkFunc func() -// NullBlockTimerFunc blocks until it is time to add a null block. -type NullBlockTimerFunc func() - -// Mine does the actual work. It's the implementation of worker.mine. -func (w *AsyncWorker) Mine(ctx context.Context, input Input, outCh chan<- Output) { +// Mine implements the DefaultWorkers main mining function.. +func (w *DefaultWorker) Mine(ctx context.Context, input Input, outCh chan<- Output) { ctx = log.Start(ctx, "Worker.Mine") defer log.Finish(ctx) - + if len(input.TipSet) == 0 { + outCh <- Output{Err: errors.New("bad input tipset with no blocks sent to Mine()")} + return + } // TODO: derive these from actual storage power. - // This means broadening the scope of the State function - // and powerTableView from the generator to the worker. + // This should now be pretty easy because the worker has getState and + // powertable view. + // To fix this and keep mock-mine mode actually generating blocks we'll + // need to update the view to give every miner a little power in the + // network. const myPower = 1 const totalPower = 5 - for nullBlockCount := uint64(0); ; nullBlockCount++ { + for nullBlkCount := uint64(0); ; nullBlkCount++ { + log.Infof("Mining on tipset: %s, with %d null blocks.", input.TipSet.String(), nullBlkCount) + start := time.Now() if ctx.Err() != nil { - break + return } - challenge := createChallenge(input.TipSet, nullBlockCount) - proof := createProof(challenge, createPoST) - ticket := createTicket(proof) + challenge := createChallenge(input.TipSet, nullBlkCount) + prCh := createProof(challenge, w.createPoST) + var ticket []byte + select { + case <-ctx.Done(): + mineTime := time.Since(start) + log.Infof("Mining run on: %s canceled.", input.TipSet.String()) + if mineTime.Seconds() < mineWarnThreshold { + log.Warningf("Abandoning mining after %f seconds. Wasting lots of work...", mineTime.Seconds()) + } + return + case proof := <-prCh: + ticket = createTicket(proof) + } // TODO: Test the interplay of isWinningTicket() and createPoST() if isWinningTicket(ticket, myPower, totalPower) { - next, err := w.blockGenerator.Generate(ctx, input.TipSet, ticket, nullBlockCount, w.minerAddr) + next, err := w.Generate(ctx, input.TipSet, ticket, nullBlkCount) if err == nil { log.SetTag(ctx, "block", next) } - - // TODO(EC): Consider what to do if we have found a winning ticket and are mining with - // it and a new tipset comes in with greater height. Currently Worker.Start() will cancel us. - // We should instead let the successful run proceed unless the context is explicitly canceled. - if ctx.Err() == nil { - outCh <- NewOutput(next, err) - } else { - log.Warningf("Abandoning successfully mined block without publishing: %s", input.TipSet.String()) - } - - break + outCh <- NewOutput(next, err) } - - nullBlockTimer() } } -func createChallenge(parents core.TipSet, nullBlockCount uint64) []byte { +// TODO -- in general this won't work with only the base tipset, we'll potentially +// need some chain manager utils, similar to the State function, to sample +// further back in the chain. +func createChallenge(parents core.TipSet, nullBlkCount uint64) []byte { // Find the smallest ticket from parent set var smallest types.Signature for _, v := range parents { @@ -212,17 +190,22 @@ func createChallenge(parents core.TipSet, nullBlockCount uint64) []byte { } buf := make([]byte, 4) - n := binary.PutUvarint(buf, nullBlockCount) + n := binary.PutUvarint(buf, nullBlkCount) buf = append(smallest, buf[:n]...) h := sha256.Sum256(buf) return h[:] } -func createProof(challenge []byte, createPoST DoSomeWorkFunc) []byte { - // TODO: Actually use the results of the PoST once it is implemented. - createPoST() - return challenge +// TODO: Actually use the results of the PoST once it is implemented. +// Currently createProof just passes the challenge value through. +func createProof(challenge []byte, createPoST DoSomeWorkFunc) <-chan []byte { + c := make(chan []byte) + go func() { + createPoST() // TODO send new PoST on channel once we can create it + c <- challenge + }() + return c } func createTicket(proof []byte) []byte { @@ -243,16 +226,8 @@ var isWinningTicket = func(ticket []byte, myPower, totalPower int64) bool { return lhs.Cmp(rhs) < 0 } -// How long the node's mining Worker should sleep to simulate mining. -const mineSleepTime = time.Millisecond * 10 - // createPoST is the default implementation of DoSomeWorkFunc. Contrary to the // advertisement, it doesn't do anything yet. func createPoST() { time.Sleep(mineSleepTime) } - -// nullBlockTimer is the default implementation of NullBlockTimerFunc. -func nullBlockTimer() { - time.Sleep(mineSleepTime) -} diff --git a/mining/worker_test.go b/mining/worker_test.go index 0863d1506d..8f0740c435 100644 --- a/mining/worker_test.go +++ b/mining/worker_test.go @@ -1,221 +1,78 @@ package mining import ( - //"context" + "context" "encoding/hex" - //"errors" + "errors" "testing" + "github.com/filecoin-project/go-filecoin/address" "github.com/filecoin-project/go-filecoin/core" + "github.com/filecoin-project/go-filecoin/state" "github.com/filecoin-project/go-filecoin/types" - //"github.com/filecoin-project/go-filecoin/util/swap" "github.com/stretchr/testify/assert" - //"github.com/stretchr/testify/mock" - //"github.com/stretchr/testify/require" + "github.com/stretchr/testify/require" + "gx/ipfs/QmSkuaNgyGmV8c1L3cZNWcUxRJV6J3nsD96JVQPcWcwtyW/go-hamt-ipld" sha256 "gx/ipfs/QmXTpwq2AkzQsPjKqFQDNY2bMdsAT53hUBETeyj8QRHTZU/sha256-simd" + "gx/ipfs/QmYVNvtQkeZ6AKSwDrjQTs432QtL6umrrK41EBq3cu7iSP/go-cid" + "gx/ipfs/QmcD7SqfyQyA91TZUQ7VPRYbGarxmY7EsQewVYMuN5LNSv/go-ipfs-blockstore" + "gx/ipfs/QmeiCcJfDW1GJnWUArudsv5rQsihpi4oyddPhdqo3CfX6i/go-datastore" ) -/* -func TestMineOnce(t *testing.T) { - assert := assert.New(t) - require := require.New(t) - mockBg := &MockBlockGenerator{} - baseBlock := &types.Block{StateRoot: types.SomeCid()} - tipSet := core.TipSet{baseBlock.Cid().String(): baseBlock} - newAddr := types.NewAddressForTestGetter() - rewardAddr := newAddr() - miningAddr := newAddr() - - var mineCtx context.Context - // Echoes the sent block to output. - echoMine := func(c context.Context, i Input, _ NullBlockTimerFunc, bg BlockGenerator, doSomeWork DoSomeWorkFunc, outCh chan<- Output) { - mineCtx = c - require.Equal(1, len(i.TipSet)) - b := i.TipSet.ToSlice()[0] - outCh <- Output{NewBlock: b} - } - worker := NewWorkerWithDeps(mockBg, echoMine, func() {}, nullBlockImmediately) - result := MineOnce(context.Background(), worker, tipSet, rewardAddr, miningAddr) - assert.NoError(result.Err) - assert.True(baseBlock.StateRoot.Equals(result.NewBlock.StateRoot)) - assert.Error(mineCtx.Err()) -} - -func TestWorker_Start(t *testing.T) { +func Test_Mine(t *testing.T) { assert := assert.New(t) require := require.New(t) newCid := types.NewCidForTestGetter() - baseBlock := &types.Block{StateRoot: newCid()} - tipSet := core.TipSet{baseBlock.Cid().String(): baseBlock} - mockBg := &MockBlockGenerator{} - newAddr := types.NewAddressForTestGetter() - rewardAddr := newAddr() - miningAddr := newAddr() - - // Test that values are passed faithfully. + stateRoot := newCid() + baseBlock := &types.Block{Height: 2, StateRoot: stateRoot} + tipSet := core.RequireNewTipSet(require, baseBlock) ctx, cancel := context.WithCancel(context.Background()) - doSomeWorkCalled := false - doSomeWork := func() { doSomeWorkCalled = true } - mineCalled := false - fakeMine := func(c context.Context, i Input, _ NullBlockTimerFunc, bg BlockGenerator, doSomeWork DoSomeWorkFunc, outCh chan<- Output) { - mineCalled = true - assert.NotEqual(ctx, c) - require.Equal(1, len(i.TipSet)) - b := i.TipSet.ToSlice()[0] - assert.True(baseBlock.StateRoot.Equals(b.StateRoot)) - assert.Equal(mockBg, bg) - doSomeWork() - outCh <- Output{} - } - worker := NewWorkerWithDeps(mockBg, fakeMine, doSomeWork, nullBlockImmediately) - inCh, outCh, _ := worker.Start(ctx) - inCh <- NewInput(context.Background(), tipSet, rewardAddr, miningAddr) - <-outCh - assert.True(mineCalled) - assert.True(doSomeWorkCalled) - cancel() - // Test that multi-block tipsets are passed faithfully - mineCalled = false - ctx, cancel = context.WithCancel(context.Background()) - tipSet = core.RequireNewTipSet(require, []*types.Block{{StateRoot: newCid()}, {StateRoot: newCid()}}...) - fakeMine = func(c context.Context, i Input, _ NullBlockTimerFunc, bg BlockGenerator, doSomeWork DoSomeWorkFunc, outCh chan<- Output) { - mineCalled = true - require.Equal(2, len(i.TipSet)) - tipSetMined := i.TipSet - assert.Equal(tipSet, tipSetMined) - outCh <- Output{} + st, pool, addrs, cst, bs := sharedSetup(t) + getStateTree := func(c context.Context, ts core.TipSet) (state.Tree, error) { + return st, nil } - worker = NewWorkerWithDeps(mockBg, fakeMine, func() {}, nullBlockImmediately) - inCh, outCh, _ = worker.Start(ctx) - inCh <- NewInput(context.Background(), tipSet, rewardAddr, miningAddr) - <-outCh - assert.True(mineCalled) - cancel() - // Test that we can push multiple blocks through. There was an actual bug - // where multiply queued inputs were not all processed. - ctx, cancel = context.WithCancel(context.Background()) - fakeMine = func(c context.Context, i Input, _ NullBlockTimerFunc, bg BlockGenerator, doSomeWork DoSomeWorkFunc, outCh chan<- Output) { - outCh <- Output{} - } - worker = NewWorkerWithDeps(mockBg, fakeMine, func() {}, nullBlockImmediately) - inCh, outCh, _ = worker.Start(ctx) - // Note: inputs have to pass whatever check on newly arriving tipsets - // are in place in Start(). - inCh <- NewInput(context.Background(), tipSet, rewardAddr, miningAddr) - inCh <- NewInput(context.Background(), tipSet, rewardAddr, miningAddr) - inCh <- NewInput(context.Background(), tipSet, rewardAddr, miningAddr) - <-outCh - <-outCh - <-outCh - assert.Equal(ChannelEmpty, ReceiveOutCh(outCh)) - cancel() // Makes vet happy. - - // Test that canceling the Input.Ctx cancels that input's mining run. - miningCtx, miningCtxCancel := context.WithCancel(context.Background()) - inputCtx, inputCtxCancel := context.WithCancel(context.Background()) - var gotMineCtx context.Context - fakeMine = func(c context.Context, i Input, _ NullBlockTimerFunc, bg BlockGenerator, doSomeWork DoSomeWorkFunc, outCh chan<- Output) { - gotMineCtx = c - outCh <- Output{} - } - worker = NewWorkerWithDeps(mockBg, fakeMine, func() {}, nullBlockImmediately) - inCh, outCh, _ = worker.Start(miningCtx) - inCh <- NewInput(inputCtx, tipSet, rewardAddr, miningAddr) - <-outCh - inputCtxCancel() - assert.Error(gotMineCtx.Err()) // Same context as miningRunCtx. - assert.NoError(miningCtx.Err()) - miningCtxCancel() // Make vet happy. - - // Test that canceling the mining context stops mining, cancels - // the inner context, and closes the output channel. - miningCtx, miningCtxCancel = context.WithCancel(context.Background()) - inputCtx, inputCtxCancel = context.WithCancel(context.Background()) - gotMineCtx = context.Background() - fakeMine = func(c context.Context, i Input, _ NullBlockTimerFunc, bg BlockGenerator, doSomeWork DoSomeWorkFunc, outCh chan<- Output) { - gotMineCtx = c - outCh <- Output{} - } - worker = NewWorkerWithDeps(mockBg, fakeMine, func() {}, nullBlockImmediately) - inCh, outCh, doneWg := worker.Start(miningCtx) - inCh <- NewInput(inputCtx, tipSet, rewardAddr, miningAddr) - <-outCh - miningCtxCancel() - doneWg.Wait() - assert.Equal(ChannelClosed, ReceiveOutCh(outCh)) - assert.Error(gotMineCtx.Err()) - inputCtxCancel() // Make vet happy. -} - -func Test_mine(t *testing.T) { - assert := assert.New(t) - baseBlock := &types.Block{Height: 2} - tipSet := core.TipSet{baseBlock.Cid().String(): baseBlock} - newAddr := types.NewAddressForTestGetter() - addr := newAddr() - miningAddr := newAddr() - next := &types.Block{Height: 3} - ctx := context.Background() - - // Success. - mockBg := &MockBlockGenerator{} + // Success case. TODO: this case isn't testing much. Testing w.Mine + // further needs a lot more attention. + worker := NewDefaultWorker(pool, getStateTree, getWeightTest, core.ApplyMessages, &core.TestView{}, bs, cst, addrs[3]) outCh := make(chan Output) - mockBg.On("Generate", mock.Anything, tipSet, uint64(0), addr, miningAddr).Return(next, nil) doSomeWorkCalled := false - input := NewInput(ctx, tipSet) - go Mine(ctx, input, nullBlockImmediately, mockBg, func() { doSomeWorkCalled = true }, outCh) + worker.createPoST = func() { doSomeWorkCalled = true } + input := NewInput(tipSet) + go worker.Mine(ctx, input, outCh) r := <-outCh assert.NoError(r.Err) assert.True(doSomeWorkCalled) - assert.True(r.NewBlock.Cid().Equals(next.Cid())) - mockBg.AssertExpectations(t) + cancel() // Block generation fails. - mockBg = &MockBlockGenerator{} + ctx, cancel = context.WithCancel(context.Background()) + worker = NewDefaultWorker(pool, makeExplodingGetStateTree(st), getWeightTest, core.ApplyMessages, &core.TestView{}, bs, cst, addrs[3]) outCh = make(chan Output) - mockBg.On("Generate", mock.Anything, tipSet, uint64(0), addr, miningAddr).Return(nil, errors.New("boom")) doSomeWorkCalled = false - input = NewInput(ctx, tipSet, addr, miningAddr) - go Mine(ctx, input, nullBlockImmediately, mockBg, func() { doSomeWorkCalled = true }, outCh) + worker.createPoST = func() { doSomeWorkCalled = true } + input = NewInput(tipSet) + go worker.Mine(ctx, input, outCh) r = <-outCh assert.Error(r.Err) assert.True(doSomeWorkCalled) - mockBg.AssertExpectations(t) + cancel() - // Null block count is increased until we find a winning ticket. - mockBg = &MockBlockGenerator{} + // Sent empty tipset + ctx, cancel = context.WithCancel(context.Background()) + worker = NewDefaultWorker(pool, getStateTree, getWeightTest, core.ApplyMessages, &core.TestView{}, bs, cst, addrs[3]) outCh = make(chan Output) - mockBg.On("Generate", mock.Anything, tipSet, uint64(2), addr, miningAddr).Return(next, nil) - workCount := 0 - input = NewInput(ctx, tipSet, addr, miningAddr) - (func() { - defer swap.Swap(&isWinningTicket, everyThirdWinningTicket())() - go Mine(ctx, input, nullBlockImmediately, mockBg, func() { workCount++ }, outCh) - r = <-outCh - })() - assert.NoError(r.Err) - assert.Equal(3, workCount) - assert.True(r.NewBlock.Cid().Equals(next.Cid())) - mockBg.AssertExpectations(t) -} - -// Implements NullBlockTimerFunc with the policy that it is always a good time -// to create a null block. -func nullBlockImmediately() { -} - -// Returns a ticket checking function that return true every third time -func everyThirdWinningTicket() func(_ []byte, _, _ int64) bool { - count := 0 - return func(_ []byte, _, _ int64) bool { - count++ - return count%3 == 0 - } + doSomeWorkCalled = false + worker.createPoST = func() { doSomeWorkCalled = true } + input = NewInput(core.TipSet{}) + go worker.Mine(ctx, input, outCh) + r = <-outCh + assert.Error(r.Err) + assert.False(doSomeWorkCalled) + cancel() } -*/ func TestIsWinningTicket(t *testing.T) { assert := assert.New(t) @@ -248,6 +105,7 @@ func TestIsWinningTicket(t *testing.T) { } } +// worker test func TestCreateChallenge(t *testing.T) { assert := assert.New(t) @@ -279,3 +137,328 @@ func TestCreateChallenge(t *testing.T) { assert.Equal(decoded, r) } } + +var seed = types.GenerateKeyInfoSeed() +var ki = types.MustGenerateKeyInfo(10, seed) +var mockSigner = types.NewMockSigner(ki) + +func TestGenerate(t *testing.T) { + // TODO use core.FakeActor for state/contract tests for generate: + // - test nonces out of order + // - test nonce gap +} + +func sharedSetupInitial() (*hamt.CborIpldStore, *core.MessagePool, *cid.Cid) { + cst := hamt.NewCborStore() + pool := core.NewMessagePool() + // Install the fake actor so we can execute it. + fakeActorCodeCid := types.AccountActorCodeCid + return cst, pool, fakeActorCodeCid +} + +func sharedSetup(t *testing.T) (state.Tree, *core.MessagePool, []types.Address, *hamt.CborIpldStore, blockstore.Blockstore) { + require := require.New(t) + cst, pool, fakeActorCodeCid := sharedSetupInitial() + vms := core.VMStorage() + d := datastore.NewMapDatastore() + bs := blockstore.NewBlockstore(d) + + // TODO: We don't need fake actors here, so these could be made real. + // And the NetworkAddress actor can/should be the real one. + // Stick two fake actors in the state tree so they can talk. + addr1, addr2, addr3, addr4, addr5 := mockSigner.Addresses[0], mockSigner.Addresses[1], mockSigner.Addresses[2], mockSigner.Addresses[3], mockSigner.Addresses[4] + act1 := core.RequireNewFakeActor(require, vms, addr1, fakeActorCodeCid) + act2 := core.RequireNewFakeActor(require, vms, addr2, fakeActorCodeCid) + fakeNetAct := core.RequireNewFakeActor(require, vms, addr3, fakeActorCodeCid) + minerAct := core.RequireNewMinerActor(require, vms, addr4, addr5, []byte{}, types.NewBytesAmount(10000), core.RequireRandomPeerID(), types.NewAttoFILFromFIL(10000)) + minerOwner := core.RequireNewFakeActor(require, vms, addr5, fakeActorCodeCid) + _, st := core.RequireMakeStateTree(require, cst, map[types.Address]*types.Actor{ + // Ensure core.NetworkAddress exists to prevent mining reward message failures. + address.NetworkAddress: fakeNetAct, + addr1: act1, + addr2: act2, + addr4: minerAct, + addr5: minerOwner, + }) + return st, pool, []types.Address{addr1, addr2, addr3, addr4, addr5}, cst, bs +} + +// TODO this test belongs in core, it calls ApplyMessages +func TestApplyMessagesForSuccessTempAndPermFailures(t *testing.T) { + assert := assert.New(t) + require := require.New(t) + vms := core.VMStorage() + + cst, _, fakeActorCodeCid := sharedSetupInitial() + + // Stick two fake actors in the state tree so they can talk. + addr1, addr2 := mockSigner.Addresses[0], mockSigner.Addresses[1] + act1 := core.RequireNewFakeActor(require, vms, addr1, fakeActorCodeCid) + _, st := core.RequireMakeStateTree(require, cst, map[types.Address]*types.Actor{ + addr1: act1, + }) + + ctx := context.Background() + + // NOTE: it is important that each category (success, temporary failure, permanent failure) is represented below. + // If a given message's category changes in the future, it needs to be replaced here in tests by another so we fully + // exercise the categorization. + // addr2 doesn't correspond to an extant account, so this will trigger errAccountNotFound -- a temporary failure. + msg1 := types.NewMessage(addr2, addr1, 0, nil, "", nil) + smsg1, err := types.NewSignedMessage(*msg1, &mockSigner) + require.NoError(err) + + // This is actually okay and should result in a receipt + msg2 := types.NewMessage(addr1, addr2, 0, nil, "", nil) + smsg2, err := types.NewSignedMessage(*msg2, &mockSigner) + require.NoError(err) + + // The following two are sending to self -- errSelfSend, a permanent error. + msg3 := types.NewMessage(addr1, addr1, 1, nil, "", nil) + smsg3, err := types.NewSignedMessage(*msg3, &mockSigner) + require.NoError(err) + + msg4 := types.NewMessage(addr2, addr2, 1, nil, "", nil) + smsg4, err := types.NewSignedMessage(*msg4, &mockSigner) + require.NoError(err) + + messages := []*types.SignedMessage{smsg1, smsg2, smsg3, smsg4} + + res, err := core.ApplyMessages(ctx, messages, st, vms, types.NewBlockHeight(0)) + + assert.Len(res.PermanentFailures, 2) + assert.Contains(res.PermanentFailures, smsg3) + assert.Contains(res.PermanentFailures, smsg4) + + assert.Len(res.TemporaryFailures, 1) + assert.Contains(res.TemporaryFailures, smsg1) + + assert.Len(res.Results, 1) + assert.Contains(res.SuccessfulMessages, smsg2) + + assert.NoError(err) +} + +func TestGenerateMultiBlockTipSet(t *testing.T) { + assert := assert.New(t) + require := require.New(t) + + ctx := context.Background() + newCid := types.NewCidForTestGetter() + st, pool, addrs, cst, bs := sharedSetup(t) + getStateTree := func(c context.Context, ts core.TipSet) (state.Tree, error) { + return st, nil + } + worker := NewDefaultWorker(pool, getStateTree, getWeightTest, core.ApplyMessages, &core.TestView{}, bs, cst, addrs[3]) + + parents := types.NewSortedCidSet(newCid()) + stateRoot := newCid() + baseBlock1 := types.Block{ + Parents: parents, + Height: types.Uint64(100), + ParentWeightNum: types.Uint64(1000), + StateRoot: stateRoot, + } + baseBlock2 := types.Block{ + Parents: parents, + Height: types.Uint64(100), + ParentWeightNum: types.Uint64(1000), + StateRoot: stateRoot, + Nonce: 1, + } + blk, err := worker.Generate(ctx, core.RequireNewTipSet(require, &baseBlock1, &baseBlock2), nil, 0) + assert.NoError(err) + + assert.Len(blk.Messages, 1) // This is the mining reward. + assert.Equal(types.Uint64(101), blk.Height) + assert.Equal(types.Uint64(1020), blk.ParentWeightNum) +} + +// After calling Generate, do the new block and new state of the message pool conform to our expectations? +func TestGeneratePoolBlockResults(t *testing.T) { + assert := assert.New(t) + require := require.New(t) + + ctx := context.Background() + newCid := types.NewCidForTestGetter() + st, pool, addrs, cst, bs := sharedSetup(t) + + getStateTree := func(c context.Context, ts core.TipSet) (state.Tree, error) { + return st, nil + } + worker := NewDefaultWorker(pool, getStateTree, getWeightTest, core.ApplyMessages, &core.TestView{}, bs, cst, addrs[3]) + + // addr3 doesn't correspond to an extant account, so this will trigger errAccountNotFound -- a temporary failure. + msg1 := types.NewMessage(addrs[2], addrs[0], 0, nil, "", nil) + smsg1, err := types.NewSignedMessage(*msg1, &mockSigner) + require.NoError(err) + + // This is actually okay and should result in a receipt + msg2 := types.NewMessage(addrs[0], addrs[1], 0, nil, "", nil) + smsg2, err := types.NewSignedMessage(*msg2, &mockSigner) + require.NoError(err) + + // The following two are sending to self -- errSelfSend, a permanent error. + msg3 := types.NewMessage(addrs[0], addrs[0], 1, nil, "", nil) + smsg3, err := types.NewSignedMessage(*msg3, &mockSigner) + require.NoError(err) + + msg4 := types.NewMessage(addrs[1], addrs[1], 0, nil, "", nil) + smsg4, err := types.NewSignedMessage(*msg4, &mockSigner) + require.NoError(err) + + pool.Add(smsg1) + pool.Add(smsg2) + pool.Add(smsg3) + pool.Add(smsg4) + + assert.Len(pool.Pending(), 4) + baseBlock := types.Block{ + Parents: types.NewSortedCidSet(newCid()), + Height: types.Uint64(100), + StateRoot: newCid(), + } + blk, err := worker.Generate(ctx, core.RequireNewTipSet(require, &baseBlock), nil, 0) + assert.NoError(err) + + assert.Len(pool.Pending(), 1) // This is the temporary failure. + assert.Contains(pool.Pending(), smsg1) + + assert.Len(blk.Messages, 2) // This is the good message + the mining reward. + + // Is the mining reward first? This will fail 50% of the time if we don't force the reward to come first. + assert.Equal(address.NetworkAddress, blk.Messages[0].From) +} + +func TestGenerateSetsBasicFields(t *testing.T) { + assert := assert.New(t) + require := require.New(t) + + ctx := context.Background() + newCid := types.NewCidForTestGetter() + + st, pool, addrs, cst, bs := sharedSetup(t) + + getStateTree := func(c context.Context, ts core.TipSet) (state.Tree, error) { + return st, nil + } + worker := NewDefaultWorker(pool, getStateTree, getWeightTest, core.ApplyMessages, &core.TestView{}, bs, cst, addrs[3]) + + h := types.Uint64(100) + wNum := types.Uint64(1000) + wDenom := types.Uint64(1) + baseBlock := types.Block{ + Height: h, + ParentWeightNum: wNum, + ParentWeightDenom: wDenom, + StateRoot: newCid(), + } + baseTipSet := core.RequireNewTipSet(require, &baseBlock) + blk, err := worker.Generate(ctx, baseTipSet, nil, 0) + assert.NoError(err) + + assert.Equal(h+1, blk.Height) + assert.Equal(addrs[3], blk.Miner) + + blk, err = worker.Generate(ctx, baseTipSet, nil, 1) + assert.NoError(err) + + assert.Equal(h+2, blk.Height) + assert.Equal(wNum+10.0, blk.ParentWeightNum) + assert.Equal(wDenom, blk.ParentWeightDenom) + assert.Equal(addrs[3], blk.Miner) +} + +func TestGenerateWithoutMessages(t *testing.T) { + assert := assert.New(t) + require := require.New(t) + + ctx := context.Background() + newCid := types.NewCidForTestGetter() + + st, pool, addrs, cst, bs := sharedSetup(t) + getStateTree := func(c context.Context, ts core.TipSet) (state.Tree, error) { + return st, nil + } + worker := NewDefaultWorker(pool, getStateTree, getWeightTest, core.ApplyMessages, &core.TestView{}, bs, cst, addrs[3]) + + assert.Len(pool.Pending(), 0) + baseBlock := types.Block{ + Parents: types.NewSortedCidSet(newCid()), + Height: types.Uint64(100), + StateRoot: newCid(), + } + blk, err := worker.Generate(ctx, core.RequireNewTipSet(require, &baseBlock), nil, 0) + assert.NoError(err) + + assert.Len(pool.Pending(), 0) // This is the temporary failure. + assert.Len(blk.Messages, 1) // This is the mining reward. +} + +// If something goes wrong while generating a new block, even as late as when flushing it, +// no block should be returned, and the message pool should not be pruned. +func TestGenerateError(t *testing.T) { + assert := assert.New(t) + require := require.New(t) + ctx := context.Background() + newCid := types.NewCidForTestGetter() + + st, pool, addrs, cst, bs := sharedSetup(t) + + worker := NewDefaultWorker(pool, makeExplodingGetStateTree(st), getWeightTest, core.ApplyMessages, &core.TestView{}, bs, cst, addrs[3]) + + // This is actually okay and should result in a receipt + msg := types.NewMessage(addrs[0], addrs[1], 0, nil, "", nil) + smsg, err := types.NewSignedMessage(*msg, &mockSigner) + require.NoError(err) + pool.Add(smsg) + + assert.Len(pool.Pending(), 1) + baseBlock := types.Block{ + Parents: types.NewSortedCidSet(newCid()), + Height: types.Uint64(100), + StateRoot: newCid(), + } + baseTipSet := core.RequireNewTipSet(require, &baseBlock) + blk, err := worker.Generate(ctx, baseTipSet, nil, 0) + assert.Error(err, "boom") + assert.Nil(blk) + + assert.Len(pool.Pending(), 1) // No messages are removed from the pool. +} + +type StateTreeForTest struct { + state.Tree + TestFlush func(ctx context.Context) (*cid.Cid, error) +} + +func WrapStateTreeForTest(st state.Tree) *StateTreeForTest { + stt := StateTreeForTest{ + st, + st.Flush, + } + return &stt +} + +func (st *StateTreeForTest) Flush(ctx context.Context) (*cid.Cid, error) { + return st.TestFlush(ctx) +} + +func getWeightTest(c context.Context, ts core.TipSet) (uint64, uint64, error) { + num, den, err := ts.ParentWeight() + if err != nil { + return uint64(0), uint64(0), err + } + return num + uint64(int64(len(ts))*int64(core.ECV)), den, nil +} + +func makeExplodingGetStateTree(st state.Tree) func(context.Context, core.TipSet) (state.Tree, error) { + return func(c context.Context, ts core.TipSet) (state.Tree, error) { + stt := WrapStateTreeForTest(st) + stt.TestFlush = func(ctx context.Context) (*cid.Cid, error) { + return nil, errors.New("boom no flush") + } + + return stt, nil + } +} diff --git a/node/node.go b/node/node.go index 5cf624e21d..8e9a75bf2e 100644 --- a/node/node.go +++ b/node/node.go @@ -64,8 +64,8 @@ type Node struct { Wallet *wallet.Wallet // Mining stuff. - MiningWorker mining.Worker - mining struct { + MiningScheduler mining.Scheduler + mining struct { sync.Mutex isMining bool } @@ -324,7 +324,9 @@ func (node *Node) handleNewMiningOutput(miningOutCh <-chan mining.Output) { } func (node *Node) handleNewHeaviestTipSet(ctx context.Context, head core.TipSet) { + currentBestCancel := func() {} for ts := range node.HeaviestTipSetCh { + var currentBestCtx context.Context newHead := ts.(core.TipSet) if len(newHead) == 0 { log.Error("TipSet of size 0 published on HeaviestTipSetCh:") @@ -338,19 +340,25 @@ func (node *Node) handleNewHeaviestTipSet(ctx context.Context, head core.TipSet) } head = newHead if node.isMining() { - + // The miningInCh should accept tipsets with strictly increasing + // weight, so cancel the previous input. + currentBestCancel() + currentBestCtx, currentBestCancel = context.WithCancel(context.Background()) node.miningDoneWg.Add(1) go func() { defer func() { node.miningDoneWg.Done() }() select { - case <-node.miningCtx.Done(): + case <-node.miningCtx.Done(): // mining is done + return + case <-currentBestCtx.Done(): // this input is no longer the heaviest return - case node.miningInCh <- mining.NewInput(context.Background(), head): + case node.miningInCh <- mining.NewInput(head): } }() } node.HeaviestTipSetHandled() } + currentBestCancel() // keep the linter happy } func (node *Node) cancelSubscriptions() { @@ -432,15 +440,14 @@ func (node *Node) StartMining() error { return err } - if node.MiningWorker == nil { - blockGenerator := mining.NewBlockGenerator(node.MsgPool, func(ctx context.Context, ts core.TipSet) (state.Tree, error) { + if node.MiningScheduler == nil { + getStateTree := func(ctx context.Context, ts core.TipSet) (state.Tree, error) { return node.ChainMgr.State(ctx, ts.ToSlice()) - }, node.ChainMgr.Weight, core.ApplyMessages, node.ChainMgr.PwrTableView, node.Blockstore, node.CborStore) - - node.MiningWorker = mining.NewWorker(blockGenerator, miningAddress) - + } + worker := mining.NewDefaultWorker(node.MsgPool, getStateTree, node.ChainMgr.Weight, core.ApplyMessages, node.ChainMgr.PwrTableView, node.Blockstore, node.CborStore, miningAddress) + node.MiningScheduler = mining.NewScheduler(worker) node.miningCtx, node.cancelMining = context.WithCancel(context.Background()) - inCh, outCh, doneWg := node.MiningWorker.Start(node.miningCtx) + inCh, outCh, doneWg := node.MiningScheduler.Start(node.miningCtx) node.miningInCh = inCh node.miningDoneWg = doneWg node.AddNewlyMinedBlock = node.addNewlyMinedBlock @@ -456,15 +463,14 @@ func (node *Node) StartMining() error { node.miningDoneWg.Add(1) go func() { defer func() { node.miningDoneWg.Done() }() - // TODO(EC): Here is where we kick mining off when we start off. Will - // need to change to pass in best tipsets, of which there can be multiple. hts := node.ChainMgr.GetHeaviestTipSet() select { case <-node.miningCtx.Done(): return - case node.miningInCh <- mining.NewInput(context.Background(), hts): + case node.miningInCh <- mining.NewInput(hts): } }() + return nil } @@ -483,7 +489,6 @@ func (node *Node) initSectorBuilder(minerAddr types.Address) error { // StopMining stops mining on new blocks. func (node *Node) StopMining() { - // TODO should probably also keep track of and cancel last mining.Input.Ctx. node.setIsMining(false) } diff --git a/node/node_test.go b/node/node_test.go index b2b0183d42..65062de35a 100644 --- a/node/node_test.go +++ b/node/node_test.go @@ -145,14 +145,15 @@ func TestNodeMining(t *testing.T) { node := MakeNodesUnstarted(t, 1, true, true)[0] - mockWorker := &mining.MockWorker{} - inCh, outCh, doneWg := make(chan mining.Input), make(chan mining.Output), &sync.WaitGroup{} + mockScheduler := &mining.MockScheduler{} + inCh, outCh, doneWg := make(chan mining.Input), make(chan mining.Output), new(sync.WaitGroup) // Apparently you have to have exact types for testify.mock, so // we use iCh and oCh for the specific return type of Start(). var iCh chan<- mining.Input = inCh var oCh <-chan mining.Output = outCh - mockWorker.On("Start", mock.Anything).Return(iCh, oCh, doneWg) - node.MiningWorker = mockWorker + + mockScheduler.On("Start", mock.Anything).Return(iCh, oCh, doneWg) + node.MiningScheduler = mockScheduler // TODO: this is horrible, this setup needs to be a lot less dependent of the inner workings of the node!! node.miningCtx, node.cancelMining = context.WithCancel(ctx) node.miningInCh = inCh @@ -206,12 +207,12 @@ func TestNodeMining(t *testing.T) { // Ensure that the output is wired up correctly. node = MakeNodesUnstarted(t, 1, true, true)[0] - mockWorker = &mining.MockWorker{} + mockScheduler = &mining.MockScheduler{} inCh, outCh, doneWg = make(chan mining.Input), make(chan mining.Output), new(sync.WaitGroup) iCh = inCh oCh = outCh - mockWorker.On("Start", mock.Anything).Return(iCh, oCh, doneWg) - node.MiningWorker = mockWorker + mockScheduler.On("Start", mock.Anything).Return(iCh, oCh, doneWg) + node.MiningScheduler = mockScheduler node.miningCtx, node.cancelMining = context.WithCancel(ctx) node.miningInCh = inCh node.miningDoneWg = doneWg diff --git a/node/testing.go b/node/testing.go index 1b895db7cc..7d5b2bcb19 100644 --- a/node/testing.go +++ b/node/testing.go @@ -116,12 +116,12 @@ func RunCreateMiner(t *testing.T, node *Node, from types.Address, pledge types.B // wait for create miner call to put a message in the pool _, err = subscription.Next(ctx) require.NoError(err) - - blockGenerator := mining.NewBlockGenerator(node.MsgPool, func(ctx context.Context, ts core.TipSet) (state.Tree, error) { + getStateTree := func(ctx context.Context, ts core.TipSet) (state.Tree, error) { return node.ChainMgr.State(ctx, ts.ToSlice()) - }, node.ChainMgr.Weight, core.ApplyMessages, node.ChainMgr.PwrTableView, node.Blockstore, node.CborStore) + } + w := mining.NewDefaultWorker(node.MsgPool, getStateTree, node.ChainMgr.Weight, core.ApplyMessages, node.ChainMgr.PwrTableView, node.Blockstore, node.CborStore, address.TestAddress) cur := node.ChainMgr.GetHeaviestTipSet() - out := mining.MineOnce(ctx, mining.NewWorker(blockGenerator, address.TestAddress), cur) + out := mining.MineOnce(ctx, mining.NewScheduler(w), cur) require.NoError(out.Err) require.NoError(node.ChainMgr.SetHeaviestTipSetForTest(ctx, core.RequireNewTipSet(require, out.NewBlock))) diff --git a/tools/go-fakecoin/main.go b/tools/go-fakecoin/main.go index 84b124e5eb..44ce949d72 100644 --- a/tools/go-fakecoin/main.go +++ b/tools/go-fakecoin/main.go @@ -108,10 +108,11 @@ func getChainManager(d repo.Datastore, bs blockstore.Blockstore) (*core.ChainMan return cm, cst } -func getBlockGenerator(msgPool *core.MessagePool, cm *core.ChainManager, cst *hamt.CborIpldStore, bs blockstore.Blockstore) mining.BlockGenerator { - return mining.NewBlockGenerator(msgPool, func(ctx context.Context, ts core.TipSet) (state.Tree, error) { +func getWorker(msgPool *core.MessagePool, cm *core.ChainManager, cst *hamt.CborIpldStore, bs blockstore.Blockstore) *mining.DefaultWorker { + ma := types.MakeTestAddress("miningAddress") + return mining.NewDefaultWorker(msgPool, func(ctx context.Context, ts core.TipSet) (state.Tree, error) { return cm.State(ctx, ts.ToSlice()) - }, cm.Weight, core.ApplyMessages, cm.PwrTableView, bs, cst) + }, cm.Weight, core.ApplyMessages, cm.PwrTableView, bs, cst, ma) } func getStateTree(ctx context.Context, d repo.Datastore, bs blockstore.Blockstore) (state.Tree, *hamt.CborIpldStore, *core.ChainManager, core.TipSet, error) { @@ -257,15 +258,15 @@ func fakeActors(ctx context.Context, cst *hamt.CborIpldStore, cm *core.ChainMana } func mineBlock(ctx context.Context, mp *core.MessagePool, cst *hamt.CborIpldStore, cm *core.ChainManager, bs blockstore.Blockstore, blks []*types.Block) (*types.Block, error) { - bg := getBlockGenerator(mp, cm, cst, bs) - ma := types.MakeTestAddress("miningaddress") + mw := getWorker(mp, cm, cst, bs) const nullBlockCount = 0 ts, err := core.NewTipSet(blks...) if err != nil { return nil, err } - blk, err := bg.Generate(ctx, ts, nil, nullBlockCount, ma) + blk, err := mw.Generate(ctx, ts, nil, nullBlockCount) + if err != nil { return nil, err }