diff --git a/core/blockchain.go b/core/blockchain.go index 81d1eedd74e6..cf4ecc1b9ee2 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -1269,16 +1269,16 @@ func (bc *BlockChain) AddToStateDiffProcessedCollection(hash common.Hash) { } // WriteBlockWithState writes the block and all associated state to the database. -func (bc *BlockChain) WriteBlockWithState(block *types.Block, receipts []*types.Receipt, state *state.StateDB) (status WriteStatus, err error) { +func (bc *BlockChain) WriteBlockWithState(block *types.Block, receipts []*types.Receipt, logs []*types.Log, state *state.StateDB, emitHeadEvent bool) (status WriteStatus, err error) { bc.chainmu.Lock() defer bc.chainmu.Unlock() - return bc.writeBlockWithState(block, receipts, state) + return bc.writeBlockWithState(block, receipts, logs, state, emitHeadEvent) } // writeBlockWithState writes the block and all associated state to the database, // but is expects the chain mutex to be held. -func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types.Receipt, state *state.StateDB) (status WriteStatus, err error) { +func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types.Receipt, logs []*types.Log, state *state.StateDB, emitHeadEvent bool) (status WriteStatus, err error) { bc.wg.Add(1) defer bc.wg.Done() @@ -1416,6 +1416,23 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types. bc.insert(block) } bc.futureBlocks.Remove(block.Hash()) + + if status == CanonStatTy { + bc.chainFeed.Send(ChainEvent{Block: block, Hash: block.Hash(), Logs: logs}) + if len(logs) > 0 { + bc.logsFeed.Send(logs) + } + // In theory we should fire a ChainHeadEvent when we inject + // a canonical block, but sometimes we can insert a batch of + // canonicial blocks. Avoid firing too much ChainHeadEvents, + // we will fire an accumulated ChainHeadEvent and disable fire + // event here. + if emitHeadEvent { + bc.chainHeadFeed.Send(ChainHeadEvent{Block: block}) + } + } else { + bc.chainSideFeed.Send(ChainSideEvent{Block: block}) + } return status, nil } @@ -1475,11 +1492,10 @@ func (bc *BlockChain) InsertChain(chain types.Blocks) (int, error) { // Pre-checks passed, start the full block imports bc.wg.Add(1) bc.chainmu.Lock() - n, events, logs, err := bc.insertChain(chain, true) + n, err := bc.insertChain(chain, true) bc.chainmu.Unlock() bc.wg.Done() - bc.PostChainEvents(events, logs) return n, err } @@ -1491,23 +1507,24 @@ func (bc *BlockChain) InsertChain(chain types.Blocks) (int, error) { // racey behaviour. If a sidechain import is in progress, and the historic state // is imported, but then new canon-head is added before the actual sidechain // completes, then the historic state could be pruned again -func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, []interface{}, []*types.Log, error) { +func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, error) { // If the chain is terminating, don't even bother starting up if atomic.LoadInt32(&bc.procInterrupt) == 1 { - return 0, nil, nil, nil + return 0, nil } // Start a parallel signature recovery (signer will fluke on fork transition, minimal perf loss) senderCacher.recoverFromBlocks(types.MakeSigner(bc.chainConfig, chain[0].Number()), chain) - // A queued approach to delivering events. This is generally - // faster than direct delivery and requires much less mutex - // acquiring. var ( - stats = insertStats{startTime: mclock.Now()} - events = make([]interface{}, 0, len(chain)) - lastCanon *types.Block - coalescedLogs []*types.Log + stats = insertStats{startTime: mclock.Now()} + lastCanon *types.Block ) + // Fire a single chain head event if we've progressed the chain + defer func() { + if lastCanon != nil && bc.CurrentBlock().Hash() == lastCanon.Hash() { + bc.chainHeadFeed.Send(ChainHeadEvent{lastCanon}) + } + }() // Start the parallel header verifier headers := make([]*types.Header, len(chain)) seals := make([]bool, len(chain)) @@ -1557,7 +1574,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, [] for block != nil && err == ErrKnownBlock { log.Debug("Writing previously known block", "number", block.Number(), "hash", block.Hash()) if err := bc.writeKnownBlock(block); err != nil { - return it.index, nil, nil, err + return it.index, err } lastCanon = block @@ -1576,7 +1593,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, [] for block != nil && (it.index == 0 || err == consensus.ErrUnknownAncestor) { log.Debug("Future block, postponing import", "number", block.Number(), "hash", block.Hash()) if err := bc.addFutureBlock(block); err != nil { - return it.index, events, coalescedLogs, err + return it.index, err } block, err = it.next() } @@ -1584,14 +1601,14 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, [] stats.ignored += it.remaining() // If there are any still remaining, mark as ignored - return it.index, events, coalescedLogs, err + return it.index, err // Some other error occurred, abort case err != nil: bc.futureBlocks.Remove(block.Hash()) stats.ignored += len(it.chain) bc.reportBlock(block, nil, err) - return it.index, events, coalescedLogs, err + return it.index, err } // No validation errors for the first block (or chain prefix skipped) for ; block != nil && err == nil || err == ErrKnownBlock; block, err = it.next() { @@ -1603,7 +1620,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, [] // If the header is a banned one, straight out abort if BadHashes[block.Hash()] { bc.reportBlock(block, nil, ErrBlacklistedHash) - return it.index, events, coalescedLogs, ErrBlacklistedHash + return it.index, ErrBlacklistedHash } // If the block is known (in the middle of the chain), it's a special case for // Clique blocks where they can share state among each other, so importing an @@ -1620,15 +1637,13 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, [] "root", block.Root()) if err := bc.writeKnownBlock(block); err != nil { - return it.index, nil, nil, err + return it.index, err } stats.processed++ // We can assume that logs are empty here, since the only way for consecutive // Clique blocks to have the same state is if there are no transactions. - events = append(events, ChainEvent{block, block.Hash(), nil}) lastCanon = block - continue } // Retrieve the parent block and it's state to execute on top @@ -1640,7 +1655,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, [] } statedb, err := state.New(parent.Root, bc.stateCache) if err != nil { - return it.index, events, coalescedLogs, err + return it.index, err } // If we have a followup block, run that against the current state to pre-cache // transactions and probabilistically some of the account/storage trie nodes. @@ -1665,7 +1680,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, [] if err != nil { bc.reportBlock(block, receipts, err) atomic.StoreUint32(&followupInterrupt, 1) - return it.index, events, coalescedLogs, err + return it.index, err } // Update the metrics touched during block processing accountReadTimer.Update(statedb.AccountReads) // Account reads are complete, we can mark them @@ -1684,7 +1699,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, [] if err := bc.validator.ValidateState(block, statedb, receipts, usedGas); err != nil { bc.reportBlock(block, receipts, err) atomic.StoreUint32(&followupInterrupt, 1) - return it.index, events, coalescedLogs, err + return it.index, err } proctime := time.Since(start) @@ -1696,10 +1711,10 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, [] // Write the block to the chain and get the status. substart = time.Now() - status, err := bc.writeBlockWithState(block, receipts, statedb) + status, err := bc.writeBlockWithState(block, receipts, logs, statedb, false) if err != nil { atomic.StoreUint32(&followupInterrupt, 1) - return it.index, events, coalescedLogs, err + return it.index, err } atomic.StoreUint32(&followupInterrupt, 1) @@ -1717,8 +1732,6 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, [] "elapsed", common.PrettyDuration(time.Since(start)), "root", block.Root()) - coalescedLogs = append(coalescedLogs, logs...) - events = append(events, ChainEvent{block, block.Hash(), logs}) lastCanon = block // Only count canonical blocks for GC processing time @@ -1729,7 +1742,6 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, [] "diff", block.Difficulty(), "elapsed", common.PrettyDuration(time.Since(start)), "txs", len(block.Transactions()), "gas", block.GasUsed(), "uncles", len(block.Uncles()), "root", block.Root()) - events = append(events, ChainSideEvent{block}) default: // This in theory is impossible, but lets be nice to our future selves and leave @@ -1748,24 +1760,20 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, [] // Any blocks remaining here? The only ones we care about are the future ones if block != nil && err == consensus.ErrFutureBlock { if err := bc.addFutureBlock(block); err != nil { - return it.index, events, coalescedLogs, err + return it.index, err } block, err = it.next() for ; block != nil && err == consensus.ErrUnknownAncestor; block, err = it.next() { if err := bc.addFutureBlock(block); err != nil { - return it.index, events, coalescedLogs, err + return it.index, err } stats.queued++ } } stats.ignored += it.remaining() - // Append a single chain head event if we've progressed the chain - if lastCanon != nil && bc.CurrentBlock().Hash() == lastCanon.Hash() { - events = append(events, ChainHeadEvent{lastCanon}) - } - return it.index, events, coalescedLogs, err + return it.index, err } // insertSideChain is called when an import batch hits upon a pruned ancestor @@ -1774,7 +1782,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, [] // // The method writes all (header-and-body-valid) blocks to disk, then tries to // switch over to the new chain if the TD exceeded the current chain. -func (bc *BlockChain) insertSideChain(block *types.Block, it *insertIterator) (int, []interface{}, []*types.Log, error) { +func (bc *BlockChain) insertSideChain(block *types.Block, it *insertIterator) (int, error) { var ( externTd *big.Int current = bc.CurrentBlock() @@ -1810,7 +1818,7 @@ func (bc *BlockChain) insertSideChain(block *types.Block, it *insertIterator) (i // If someone legitimately side-mines blocks, they would still be imported as usual. However, // we cannot risk writing unverified blocks to disk when they obviously target the pruning // mechanism. - return it.index, nil, nil, errors.New("sidechain ghost-state attack") + return it.index, errors.New("sidechain ghost-state attack") } } if externTd == nil { @@ -1821,7 +1829,7 @@ func (bc *BlockChain) insertSideChain(block *types.Block, it *insertIterator) (i if !bc.HasBlock(block.Hash(), block.NumberU64()) { start := time.Now() if err := bc.writeBlockWithoutState(block, externTd); err != nil { - return it.index, nil, nil, err + return it.index, err } log.Debug("Injected sidechain block", "number", block.Number(), "hash", block.Hash(), "diff", block.Difficulty(), "elapsed", common.PrettyDuration(time.Since(start)), @@ -1838,7 +1846,7 @@ func (bc *BlockChain) insertSideChain(block *types.Block, it *insertIterator) (i localTd := bc.GetTd(current.Hash(), current.NumberU64()) if localTd.Cmp(externTd) > 0 { log.Info("Sidechain written to disk", "start", it.first().NumberU64(), "end", it.previous().Number, "sidetd", externTd, "localtd", localTd) - return it.index, nil, nil, err + return it.index, err } // Gather all the sidechain hashes (full blocks may be memory heavy) var ( @@ -1853,7 +1861,7 @@ func (bc *BlockChain) insertSideChain(block *types.Block, it *insertIterator) (i parent = bc.GetHeader(parent.ParentHash, parent.Number.Uint64()-1) } if parent == nil { - return it.index, nil, nil, errors.New("missing parent") + return it.index, errors.New("missing parent") } // Import all the pruned blocks to make the state available var ( @@ -1872,15 +1880,15 @@ func (bc *BlockChain) insertSideChain(block *types.Block, it *insertIterator) (i // memory here. if len(blocks) >= 2048 || memory > 64*1024*1024 { log.Info("Importing heavy sidechain segment", "blocks", len(blocks), "start", blocks[0].NumberU64(), "end", block.NumberU64()) - if _, _, _, err := bc.insertChain(blocks, false); err != nil { - return 0, nil, nil, err + if _, err := bc.insertChain(blocks, false); err != nil { + return 0, err } blocks, memory = blocks[:0], 0 // If the chain is terminating, stop processing blocks if atomic.LoadInt32(&bc.procInterrupt) == 1 { log.Debug("Premature abort during blocks processing") - return 0, nil, nil, nil + return 0, nil } } } @@ -1888,7 +1896,7 @@ func (bc *BlockChain) insertSideChain(block *types.Block, it *insertIterator) (i log.Info("Importing sidechain segment", "start", blocks[0].NumberU64(), "end", blocks[len(blocks)-1].NumberU64()) return bc.insertChain(blocks, false) } - return 0, nil, nil, nil + return 0, nil } // reorg takes two blocks, an old chain and a new chain and will reconstruct the @@ -1903,11 +1911,11 @@ func (bc *BlockChain) reorg(oldBlock, newBlock *types.Block) error { deletedTxs types.Transactions addedTxs types.Transactions - deletedLogs []*types.Log - rebirthLogs []*types.Log + deletedLogs [][]*types.Log + rebirthLogs [][]*types.Log - // collectLogs collects the logs that were generated during the - // processing of the block that corresponds with the given hash. + // collectLogs collects the logs that were generated or removed during + // the processing of the block that corresponds with the given hash. // These logs are later announced as deleted or reborn collectLogs = func(hash common.Hash, removed bool) { number := bc.hc.GetBlockNumber(hash) @@ -1915,18 +1923,40 @@ func (bc *BlockChain) reorg(oldBlock, newBlock *types.Block) error { return } receipts := rawdb.ReadReceipts(bc.db, hash, *number, bc.chainConfig) + + var logs []*types.Log for _, receipt := range receipts { for _, log := range receipt.Logs { l := *log if removed { l.Removed = true - deletedLogs = append(deletedLogs, &l) } else { - rebirthLogs = append(rebirthLogs, &l) } + logs = append(logs, &l) + } + } + if len(logs) > 0 { + if removed { + deletedLogs = append(deletedLogs, logs) + } else { + rebirthLogs = append(rebirthLogs, logs) } } } + // mergeLogs returns a merged log slice with specified sort order. + mergeLogs = func(logs [][]*types.Log, reverse bool) []*types.Log { + var ret []*types.Log + if reverse { + for i := len(logs) - 1; i >= 0; i-- { + ret = append(ret, logs[i]...) + } + } else { + for i := 0; i < len(logs); i++ { + ret = append(ret, logs[i]...) + } + } + return ret + } ) // Reduce the longer chain to the same number as the shorter one if oldBlock.NumberU64() > newBlock.NumberU64() { @@ -2021,45 +2051,18 @@ func (bc *BlockChain) reorg(oldBlock, newBlock *types.Block) error { // this goroutine if there are no events to fire, but realistcally that only // ever happens if we're reorging empty blocks, which will only happen on idle // networks where performance is not an issue either way. - // - // TODO(karalabe): Can we get rid of the goroutine somehow to guarantee correct - // event ordering? - go func() { - if len(deletedLogs) > 0 { - bc.rmLogsFeed.Send(RemovedLogsEvent{deletedLogs}) - } - if len(rebirthLogs) > 0 { - bc.logsFeed.Send(rebirthLogs) - } - if len(oldChain) > 0 { - for _, block := range oldChain { - bc.chainSideFeed.Send(ChainSideEvent{Block: block}) - } - } - }() - return nil -} - -// PostChainEvents iterates over the events generated by a chain insertion and -// posts them into the event feed. -// TODO: Should not expose PostChainEvents. The chain events should be posted in WriteBlock. -func (bc *BlockChain) PostChainEvents(events []interface{}, logs []*types.Log) { - // post event logs for further processing - if logs != nil { - bc.logsFeed.Send(logs) + if len(deletedLogs) > 0 { + bc.rmLogsFeed.Send(RemovedLogsEvent{mergeLogs(deletedLogs, true)}) } - for _, event := range events { - switch ev := event.(type) { - case ChainEvent: - bc.chainFeed.Send(ev) - - case ChainHeadEvent: - bc.chainHeadFeed.Send(ev) - - case ChainSideEvent: - bc.chainSideFeed.Send(ev) + if len(rebirthLogs) > 0 { + bc.logsFeed.Send(mergeLogs(rebirthLogs, false)) + } + if len(oldChain) > 0 { + for i := len(oldChain) - 1; i >= 0; i-- { + bc.chainSideFeed.Send(ChainSideEvent{Block: oldChain[i]}) } } + return nil } func (bc *BlockChain) update() { diff --git a/core/blockchain_test.go b/core/blockchain_test.go index 20556fb06455..8cd6fac78824 100644 --- a/core/blockchain_test.go +++ b/core/blockchain_test.go @@ -22,6 +22,7 @@ import ( "math/big" "math/rand" "os" + "reflect" "sync" "testing" "time" @@ -960,16 +961,20 @@ func TestLogReorgs(t *testing.T) { } chain, _ = GenerateChain(params.TestChainConfig, genesis, ethash.NewFaker(), db, 3, func(i int, gen *BlockGen) {}) + done := make(chan struct{}) + go func() { + ev := <-rmLogsCh + if len(ev.Logs) == 0 { + t.Error("expected logs") + } + close(done) + }() if _, err := blockchain.InsertChain(chain); err != nil { t.Fatalf("failed to insert forked chain: %v", err) } - timeout := time.NewTimer(1 * time.Second) select { - case ev := <-rmLogsCh: - if len(ev.Logs) == 0 { - t.Error("expected logs") - } + case <-done: case <-timeout.C: t.Fatal("Timeout. There is no RemovedLogsEvent has been sent.") } @@ -982,39 +987,47 @@ func TestLogRebirth(t *testing.T) { db = rawdb.NewMemoryDatabase() // this code generates a log - code = common.Hex2Bytes("60606040525b7f24ec1d3ff24c2f6ff210738839dbc339cd45a5294d85c79361016243157aae7b60405180905060405180910390a15b600a8060416000396000f360606040526008565b00") - gspec = &Genesis{Config: params.TestChainConfig, Alloc: GenesisAlloc{addr1: {Balance: big.NewInt(10000000000000)}}} - genesis = gspec.MustCommit(db) - signer = types.NewEIP155Signer(gspec.Config.ChainID) - newLogCh = make(chan bool) + code = common.Hex2Bytes("60606040525b7f24ec1d3ff24c2f6ff210738839dbc339cd45a5294d85c79361016243157aae7b60405180905060405180910390a15b600a8060416000396000f360606040526008565b00") + gspec = &Genesis{Config: params.TestChainConfig, Alloc: GenesisAlloc{addr1: {Balance: big.NewInt(10000000000000)}}} + genesis = gspec.MustCommit(db) + signer = types.NewEIP155Signer(gspec.Config.ChainID) + newLogCh = make(chan bool) + removeLogCh = make(chan bool) ) - // listenNewLog checks whether the received logs number is equal with expected. - listenNewLog := func(sink chan []*types.Log, expect int) { + // validateLogEvent checks whether the received logs number is equal with expected. + validateLogEvent := func(sink interface{}, result chan bool, expect int) { + chanval := reflect.ValueOf(sink) + chantyp := chanval.Type() + if chantyp.Kind() != reflect.Chan || chantyp.ChanDir()&reflect.RecvDir == 0 { + t.Fatalf("invalid channel, given type %v", chantyp) + } cnt := 0 + var recv []reflect.Value + timeout := time.After(1 * time.Second) + cases := []reflect.SelectCase{{Chan: chanval, Dir: reflect.SelectRecv}, {Chan: reflect.ValueOf(timeout), Dir: reflect.SelectRecv}} for { - select { - case logs := <-sink: - cnt += len(logs) - case <-time.NewTimer(5 * time.Second).C: - // new logs timeout - newLogCh <- false + chose, v, _ := reflect.Select(cases) + if chose == 1 { + // Not enough event received + result <- false return } + cnt += 1 + recv = append(recv, v) if cnt == expect { break - } else if cnt > expect { - // redundant logs received - newLogCh <- false - return } } - select { - case <-sink: - // redundant logs received - newLogCh <- false - case <-time.NewTimer(100 * time.Millisecond).C: - newLogCh <- true + done := time.After(50 * time.Millisecond) + cases = cases[:1] + cases = append(cases, reflect.SelectCase{Chan: reflect.ValueOf(done), Dir: reflect.SelectRecv}) + chose, _, _ := reflect.Select(cases) + // If chose equal 0, it means receiving redundant events. + if chose == 1 { + result <- true + } else { + result <- false } } @@ -1038,12 +1051,12 @@ func TestLogRebirth(t *testing.T) { }) // Spawn a goroutine to receive log events - go listenNewLog(logsCh, 1) + go validateLogEvent(logsCh, newLogCh, 1) if _, err := blockchain.InsertChain(chain); err != nil { t.Fatalf("failed to insert chain: %v", err) } if !<-newLogCh { - t.Fatalf("failed to receive new log event") + t.Fatal("failed to receive new log event") } // Generate long reorg chain @@ -1060,40 +1073,31 @@ func TestLogRebirth(t *testing.T) { }) // Spawn a goroutine to receive log events - go listenNewLog(logsCh, 1) + go validateLogEvent(logsCh, newLogCh, 1) + go validateLogEvent(rmLogsCh, removeLogCh, 1) if _, err := blockchain.InsertChain(forkChain); err != nil { t.Fatalf("failed to insert forked chain: %v", err) } if !<-newLogCh { - t.Fatalf("failed to receive new log event") + t.Fatal("failed to receive new log event") } - // Ensure removedLog events received - select { - case ev := <-rmLogsCh: - if len(ev.Logs) == 0 { - t.Error("expected logs") - } - case <-time.NewTimer(1 * time.Second).C: - t.Fatal("Timeout. There is no RemovedLogsEvent has been sent.") + if !<-removeLogCh { + t.Fatal("failed to receive removed log event") } newBlocks, _ := GenerateChain(params.TestChainConfig, chain[len(chain)-1], ethash.NewFaker(), db, 1, func(i int, gen *BlockGen) {}) - go listenNewLog(logsCh, 1) + go validateLogEvent(logsCh, newLogCh, 1) + go validateLogEvent(rmLogsCh, removeLogCh, 1) if _, err := blockchain.InsertChain(newBlocks); err != nil { t.Fatalf("failed to insert forked chain: %v", err) } - // Ensure removedLog events received - select { - case ev := <-rmLogsCh: - if len(ev.Logs) == 0 { - t.Error("expected logs") - } - case <-time.NewTimer(1 * time.Second).C: - t.Fatal("Timeout. There is no RemovedLogsEvent has been sent.") - } // Rebirth logs should omit a newLogEvent if !<-newLogCh { - t.Fatalf("failed to receive new log event") + t.Fatal("failed to receive new log event") + } + // Ensure removedLog events received + if !<-removeLogCh { + t.Fatal("failed to receive removed log event") } } @@ -1145,7 +1149,6 @@ func TestSideLogRebirth(t *testing.T) { logsCh := make(chan []*types.Log) blockchain.SubscribeLogsEvent(logsCh) - chain, _ := GenerateChain(params.TestChainConfig, genesis, ethash.NewFaker(), db, 2, func(i int, gen *BlockGen) { if i == 1 { // Higher block difficulty diff --git a/miner/worker.go b/miner/worker.go index 52f8919f0dc2..c95b32042f14 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -590,7 +590,7 @@ func (w *worker) resultLoop() { logs = append(logs, receipt.Logs...) } // Commit block and state to database. - stat, err := w.chain.WriteBlockWithState(block, receipts, task.state) + _, err := w.chain.WriteBlockWithState(block, receipts, logs, task.state, true) if err != nil { log.Error("Failed writing block to chain", "err", err) continue @@ -601,16 +601,6 @@ func (w *worker) resultLoop() { // Broadcast the block and announce chain insertion event w.mux.Post(core.NewMinedBlockEvent{Block: block}) - var events []interface{} - switch stat { - case core.CanonStatTy: - events = append(events, core.ChainEvent{Block: block, Hash: block.Hash(), Logs: logs}) - events = append(events, core.ChainHeadEvent{Block: block}) - case core.SideStatTy: - events = append(events, core.ChainSideEvent{Block: block}) - } - w.chain.PostChainEvents(events, logs) - // Insert the block into the set of pending ones to resultLoop for confirmations w.unconfirmed.Insert(block.NumberU64(), block.Hash()) @@ -996,3 +986,11 @@ func (w *worker) commit(uncles []*types.Header, interval func(), update bool, st } return nil } + +// postSideBlock fires a side chain event, only use it for testing. +func (w *worker) postSideBlock(event core.ChainSideEvent) { + select { + case w.chainSideCh <- event: + case <-w.exitCh: + } +} diff --git a/miner/worker_test.go b/miner/worker_test.go index 95d5f64b1c03..81fb8f1b9477 100644 --- a/miner/worker_test.go +++ b/miner/worker_test.go @@ -149,9 +149,6 @@ func newTestWorkerBackend(t *testing.T, chainConfig *params.ChainConfig, engine func (b *testWorkerBackend) BlockChain() *core.BlockChain { return b.chain } func (b *testWorkerBackend) TxPool() *core.TxPool { return b.txPool } -func (b *testWorkerBackend) PostChainEvents(events []interface{}) { - b.chain.PostChainEvents(events, nil) -} func (b *testWorkerBackend) newRandomUncle() *types.Block { var parent *types.Block @@ -243,8 +240,8 @@ func testGenerateBlockAndImport(t *testing.T, isClique bool) { for i := 0; i < 5; i++ { b.txPool.AddLocal(b.newRandomTx(true)) b.txPool.AddLocal(b.newRandomTx(false)) - b.PostChainEvents([]interface{}{core.ChainSideEvent{Block: b.newRandomUncle()}}) - b.PostChainEvents([]interface{}{core.ChainSideEvent{Block: b.newRandomUncle()}}) + w.postSideBlock(core.ChainSideEvent{Block: b.newRandomUncle()}) + w.postSideBlock(core.ChainSideEvent{Block: b.newRandomUncle()}) select { case e := <-loopErr: t.Fatal(e) @@ -295,7 +292,7 @@ func testEmptyWork(t *testing.T, chainConfig *params.ChainConfig, engine consens } w.skipSealHook = func(task *task) bool { return true } w.fullTaskHook = func() { - // Aarch64 unit tests are running in a VM on travis, they must + // Arch64 unit tests are running in a VM on travis, they must // be given more time to execute. time.Sleep(time.Second) } @@ -351,7 +348,8 @@ func TestStreamUncleBlock(t *testing.T) { } } - b.PostChainEvents([]interface{}{core.ChainSideEvent{Block: b.uncleBlock}}) + w.postSideBlock(core.ChainSideEvent{Block: b.uncleBlock}) + select { case <-taskCh: case <-time.NewTimer(time.Second).C: