From 73310fa0370257f5f8662f4f5182cef495026d16 Mon Sep 17 00:00:00 2001 From: "Leo Zhang (zhangchiqing)" Date: Wed, 17 Jan 2024 16:02:20 -0800 Subject: [PATCH 01/23] Add ingestion block queue --- .../ingestion/block_queue/block_queue.go | 282 +++++++++++++ .../ingestion/block_queue/block_queue_test.go | 397 ++++++++++++++++++ engine/execution/ingestion_test.go | 98 +++++ utils/unittest/fixtures.go | 40 ++ 4 files changed, 817 insertions(+) create mode 100644 engine/execution/ingestion/block_queue/block_queue.go create mode 100644 engine/execution/ingestion/block_queue/block_queue_test.go create mode 100644 engine/execution/ingestion_test.go diff --git a/engine/execution/ingestion/block_queue/block_queue.go b/engine/execution/ingestion/block_queue/block_queue.go new file mode 100644 index 00000000000..16615bec33c --- /dev/null +++ b/engine/execution/ingestion/block_queue/block_queue.go @@ -0,0 +1,282 @@ +package block_queue + +import ( + "fmt" + "sync" + + "github.com/onflow/flow-go/model/flow" + "github.com/onflow/flow-go/module/mempool/entity" +) + +type BlockQueue struct { + sync.Mutex + // when receiving a new block, adding it to the map, and add missing collections to the map + blocks map[flow.Identifier]*entity.ExecutableBlock + // a collection could be included in multiple blocks, + // when a missing block is received, it might trigger multiple blocks to be executable, which + // can be looked up by the map + // when a block is executed, its collections should be removed from this map unless a collection + // is still referenced by other blocks, which will eventually be removed when those blocks are + // executed. + collections map[flow.Identifier]*CollectionInfo + + // blockIDsByHeight is used to find next executable block. 
+ // when a block is executed, the next executable block must be a block with height = current block height + 1 + // the following map allows us to find the next executable block by height + blockIDsByHeight map[uint64]map[flow.Identifier]*entity.ExecutableBlock // for finding next executable block +} + +type CollectionInfo struct { + Collection *entity.CompleteCollection + IncludedIn map[flow.Identifier]*entity.ExecutableBlock +} + +func NewBlockQueue() *BlockQueue { + return &BlockQueue{ + blocks: make(map[flow.Identifier]*entity.ExecutableBlock), + collections: make(map[flow.Identifier]*CollectionInfo), + blockIDsByHeight: make(map[uint64]map[flow.Identifier]*entity.ExecutableBlock), + } +} + +// OnBlock is called when a new block is received, and its parent is not executed. +// It returns a list of missing collections and a list of executable blocks +// Note caller must check if the parent is executed, if yes, then +// caller must call OnBlock again with the finalState of its parent +func (q *BlockQueue) OnBlock(block *flow.Block, parentFinalState *flow.StateCommitment) ( + []*flow.CollectionGuarantee, // missing collections + []*entity.ExecutableBlock, // blocks ready to execute + error, // exceptions +) { + q.Lock() + defer q.Unlock() + + // check if the block already exists + blockID := block.ID() + executable, ok := q.blocks[blockID] + if ok { + if executable.StartState == nil && parentFinalState == nil { + return nil, nil, nil + } + + if executable.StartState == nil || parentFinalState == nil { + return nil, nil, fmt.Errorf("block %s has already been executed with a nil parent final state, %v != %v", + blockID, executable.StartState, parentFinalState) + } + + if *executable.StartState != *parentFinalState { + return nil, nil, + fmt.Errorf("block %s has already been executed with a different parent final state, %v != %v", + blockID, *executable.StartState, parentFinalState) + } + + return nil, nil, nil + } + + executable = &entity.ExecutableBlock{ + Block: 
block, + StartState: parentFinalState, + } + + // add block to blocks + q.blocks[blockID] = executable + + // update collection + colls := make(map[flow.Identifier]*entity.CompleteCollection, len(block.Payload.Guarantees)) + executable.CompleteCollections = colls + + // find missing collections and update collection index + missingCollections := make([]*flow.CollectionGuarantee, 0, len(block.Payload.Guarantees)) + + for _, guarantee := range block.Payload.Guarantees { + colID := guarantee.ID() + colInfo, ok := q.collections[colID] + if ok { + // some other block also includes this collection + colInfo.IncludedIn[blockID] = executable + colls[colID] = colInfo.Collection + } else { + col := &entity.CompleteCollection{ + Guarantee: guarantee, + } + colls[colID] = col + + // add new collection to collections + q.collections[colID] = &CollectionInfo{ + Collection: col, + IncludedIn: map[flow.Identifier]*entity.ExecutableBlock{ + blockID: executable, + }, + } + + missingCollections = append(missingCollections, guarantee) + } + } + + // index height + blocksAtSameHeight, ok := q.blockIDsByHeight[block.Header.Height] + if !ok { + blocksAtSameHeight = make(map[flow.Identifier]*entity.ExecutableBlock) + q.blockIDsByHeight[block.Header.Height] = blocksAtSameHeight + } + blocksAtSameHeight[blockID] = executable + + // check if the block is executable + var executables []*entity.ExecutableBlock + if executable.IsComplete() { + executables = []*entity.ExecutableBlock{executable} + } + + return missingCollections, executables, nil +} + +// OnCollection is called when a new collection is received +// It returns a list of executable blocks that contains the collection +func (q *BlockQueue) OnCollection(collection *flow.Collection) ([]*entity.ExecutableBlock, error) { + q.Lock() + defer q.Unlock() + // when a collection is received, we find the blocks the collection is included in, + // and check if the blocks become executable. 
+ // Note a collection could be included in multiple blocks, so receiving a collection + // might trigger multiple blocks to be executable. + + // check if the collection is for any block in the queue + colID := collection.ID() + colInfo, ok := q.collections[colID] + if !ok { + // no block in the queue includes this collection + return nil, nil + } + + if colInfo.Collection.IsCompleted() { + // the collection is already received, no action needed because an action must + // have been returned when the collection is first received. + return nil, nil + } + + // update collection + colInfo.Collection.Transactions = collection.Transactions + + // check if any block, which includes this collection, become executable + executables := make([]*entity.ExecutableBlock, 0, len(colInfo.IncludedIn)) + for _, block := range colInfo.IncludedIn { + if !block.IsComplete() { + continue + } + executables = append(executables, block) + } + + if len(executables) == 0 { + return nil, nil + } + + return executables, nil +} + +// OnBlockExecuted is called when a block is executed +// It returns a list of executable blocks (usually its child blocks) +func (q *BlockQueue) OnBlockExecuted( + blockID flow.Identifier, + commit flow.StateCommitment, +) ([]*entity.ExecutableBlock, error) { + q.Lock() + defer q.Unlock() + // when a block is executed, the child block might become executable + // we also remove it from all the indexes + + // remove block + block, ok := q.blocks[blockID] + if !ok { + return nil, nil + } + + delete(q.blocks, blockID) + + // remove height index + height := block.Block.Header.Height + delete(q.blockIDsByHeight[height], blockID) + if len(q.blockIDsByHeight[height]) == 0 { + delete(q.blockIDsByHeight, height) + } + + // remove colections if no other blocks include it + for colID := range block.CompleteCollections { + colInfo, ok := q.collections[colID] + if !ok { + return nil, fmt.Errorf("collection %s not found", colID) + } + + delete(colInfo.IncludedIn, blockID) + if 
len(colInfo.IncludedIn) == 0 { + // no other blocks includes this collection, + // so this collection can be removed from the index + delete(q.collections, colID) + } + } + + return q.checkIfChildBlockBecomeExecutable(block, commit) +} + +func (q *BlockQueue) checkIfChildBlockBecomeExecutable( + block *entity.ExecutableBlock, + commit flow.StateCommitment, +) ([]*entity.ExecutableBlock, error) { + childHeight := block.Block.Header.Height + 1 + blocksAtNextHeight, ok := q.blockIDsByHeight[childHeight] + if !ok { + // no block at next height + return nil, nil + } + + // find children and update their start state + children := make([]*entity.ExecutableBlock, 0, len(blocksAtNextHeight)) + for _, childBlock := range blocksAtNextHeight { + // a child block at the next height must have the same parent ID + // as the current block + isChild := childBlock.Block.Header.ParentID == block.ID() + if !isChild { + continue + } + + // update child block's start state with current block's end state + childBlock.StartState = &commit + children = append(children, childBlock) + } + + if len(children) == 0 { + return nil, nil + } + + // check if children are executable + executables := make([]*entity.ExecutableBlock, 0, len(children)) + for _, child := range children { + if child.IsComplete() { + executables = append(executables, child) + } + } + + return executables, nil +} + +// GetMissingCollections returns the missing collections and the start state +// It returns an error if the block is not found +func (q *BlockQueue) GetMissingCollections(blockID flow.Identifier) ( + []*flow.CollectionGuarantee, *flow.StateCommitment, error) { + q.Lock() + defer q.Unlock() + block, ok := q.blocks[blockID] + if !ok { + return nil, nil, fmt.Errorf("block %s not found", blockID) + } + + missingCollections := make([]*flow.CollectionGuarantee, 0, len(block.Block.Payload.Guarantees)) + for _, col := range block.CompleteCollections { + // check if the collection is already received + if 
col.IsCompleted() { + continue + } + missingCollections = append(missingCollections, col.Guarantee) + } + + return missingCollections, block.StartState, nil +} diff --git a/engine/execution/ingestion/block_queue/block_queue_test.go b/engine/execution/ingestion/block_queue/block_queue_test.go new file mode 100644 index 00000000000..8a3ab3d8047 --- /dev/null +++ b/engine/execution/ingestion/block_queue/block_queue_test.go @@ -0,0 +1,397 @@ +package block_queue + +import ( + "testing" + + "github.com/onflow/flow-go/model/flow" + "github.com/onflow/flow-go/module/mempool/entity" + "github.com/onflow/flow-go/utils/unittest" + "github.com/stretchr/testify/require" +) + +func TestSingleBlockBecomeReady(t *testing.T) { + t.Parallel() + // Given a chain + // R <- A(C1) <- B(C2,C3) <- C() <- D() + // - ^------- E(C4,C5) <- F(C6) + // - ^-----------G() + block, coll, commitFor := makeChainABCDEFG() + blockA := block("A") + c1, c2 := coll(1), coll(2) + + q := NewBlockQueue() + + // verify receving a collection (C1) before its block (A) will be ignored + executables, err := q.OnCollection(c1) + require.NoError(t, err) + requireExecutableHas(t, executables) + + // verify receving a block (A) will return missing collection (C1) + missing, executables, err := q.OnBlock(blockA, commitFor("R")) + require.NoError(t, err) + require.Empty(t, executables) + requireCollectionHas(t, missing, c1) + + // verify receving a collection (C2) that is not for the block (A) will be ignored + executables, err = q.OnCollection(c2) + require.NoError(t, err) + requireExecutableHas(t, executables) + + // verify after receiving all collections (C1), block (A) becomes executable + executables, err = q.OnCollection(c1) + require.NoError(t, err) + requireExecutableHas(t, executables, blockA) + + // verify after the block (A) is executed, no more block is executable and + // nothing left in the queue + executables, err = q.OnBlockExecuted(blockA.ID(), *commitFor("A")) + require.NoError(t, err) + 
requireExecutableHas(t, executables) + requireQueueIsEmpty(t, q) +} + +func TestMultipleBlockBecomesReady(t *testing.T) { + t.Parallel() + // Given a chain + // R <- A(C1) <- B(C2,C3) <- C() <- D() + // - ^------- E(C4,C5) <- F(C6) + // - ^-----------G() + block, coll, commitFor := makeChainABCDEFG() + blockA, blockB, blockC, blockD, blockE, blockF, blockG := + block("A"), block("B"), block("C"), block("D"), block("E"), block("F"), block("G") + c1, c2, c3, c4, c5, c6 := coll(1), coll(2), coll(3), coll(4), coll(5), coll(6) + + q := NewBlockQueue() + + // verify receiving blocks without collections will return missing collections and no executables + missing, executables, err := q.OnBlock(blockA, commitFor("R")) + require.NoError(t, err) + require.Empty(t, executables) + requireCollectionHas(t, missing, c1) + + missing, executables, err = q.OnBlock(blockB, nil) + require.NoError(t, err) + require.Empty(t, executables) // because A is not executed + requireCollectionHas(t, missing, c2, c3) + + // creating forks + missing, executables, err = q.OnBlock(blockE, nil) + require.NoError(t, err) + require.Empty(t, executables) // because A is not executed + requireCollectionHas(t, missing, c4, c5) + + // creating forks with empty block + missing, executables, err = q.OnBlock(blockG, nil) + require.NoError(t, err) + require.Empty(t, executables) // because E is not executed + requireCollectionHas(t, missing) + + missing, executables, err = q.OnBlock(blockF, nil) + require.NoError(t, err) + require.Empty(t, executables) // because E is not executed + requireCollectionHas(t, missing, c6) + + missing, executables, err = q.OnBlock(blockC, nil) + require.NoError(t, err) + require.Empty(t, executables) // because B is not executed + require.Empty(t, missing) + + // verify receiving all collections makes block executable + executables, err = q.OnCollection(c1) + require.NoError(t, err) + requireExecutableHas(t, executables, blockA) + + // verify receiving partial collections won't 
make block executable + executables, err = q.OnCollection(c2) + require.NoError(t, err) + requireExecutableHas(t, executables) // because A is not executed and C3 is not received for B to be executable + + // verify when parent block (A) is executed, the child block (B) will not become executable if + // some collection (c3) is still missing + executables, err = q.OnBlockExecuted(blockA.ID(), *commitFor("A")) + require.NoError(t, err) + requireExecutableHas(t, executables) // because C3 is not received for B to be executable + + // verify when parent block (A) has been executed, the child block (B) has all the collections + // it will become executable + executables, err = q.OnCollection(c3) + require.NoError(t, err) + requireExecutableHas(t, executables, blockB) // c2, c3 are received, blockB is executable + + executables, err = q.OnCollection(c5) + require.NoError(t, err) + requireExecutableHas(t, executables) // c2, c3 are received, blockB is executable + + executables, err = q.OnCollection(c6) + require.NoError(t, err) + requireExecutableHas(t, executables) // c2, c3 are received, blockB is executable + + executables, err = q.OnCollection(c4) + require.NoError(t, err) + requireExecutableHas(t, executables, blockE) // c2, c3 are received, blockB is executable + + // verify when parent block (E) is executed, all children block (F,G) will become executable if all + // collections (C6) have already received + executables, err = q.OnBlockExecuted(blockE.ID(), *commitFor("E")) + require.NoError(t, err) + requireExecutableHas(t, executables, blockF, blockG) + + executables, err = q.OnBlockExecuted(blockB.ID(), *commitFor("B")) + require.NoError(t, err) + requireExecutableHas(t, executables, blockC) + + executables, err = q.OnBlockExecuted(blockC.ID(), *commitFor("C")) + require.NoError(t, err) + requireExecutableHas(t, executables) + + // verify receiving a block whose parent was executed before + missing, executables, err = q.OnBlock(blockD, commitFor("C")) + 
require.NoError(t, err) + require.Empty(t, missing) + requireExecutableHas(t, executables, blockD) + + executables, err = q.OnBlockExecuted(blockD.ID(), *commitFor("D")) + require.NoError(t, err) + requireExecutableHas(t, executables) + + executables, err = q.OnBlockExecuted(blockF.ID(), *commitFor("F")) + require.NoError(t, err) + requireExecutableHas(t, executables) + + executables, err = q.OnBlockExecuted(blockG.ID(), *commitFor("G")) + require.NoError(t, err) + requireExecutableHas(t, executables) + + // verify after all blocks are executed, the queue is empty + requireQueueIsEmpty(t, q) +} + +func TestOnForksWithSameCollections(t *testing.T) { + t.Parallel() + // Given a chain + // A() <- B(C1, C2) <- C (C3) + // ^--- D(C1, C2) <- E (C3) + block, coll, commitFor := makeChainABCDE() + blockA, blockB, blockC, blockD, blockE := + block("A"), block("B"), block("C"), block("D"), block("E") + c1, c2, c3 := coll(1), coll(2), coll(3) + + q := NewBlockQueue() + + missing, executables, err := q.OnBlock(blockA, commitFor("R")) + require.NoError(t, err) + require.Empty(t, executables) + requireCollectionHas(t, missing) + + // receiving block B and D which have the same collections (C1, C2) + missing, executables, err = q.OnBlock(blockB, nil) + require.NoError(t, err) + require.Empty(t, executables) + requireCollectionHas(t, missing, c1, c2) + + // verify receiving D will not return any missing collections because + // missing collections were returned when receiving B + missing, executables, err = q.OnBlock(blockD, nil) + require.NoError(t, err) + require.Empty(t, executables) + requireCollectionHas(t, missing) + + // verify receiving all collections makes all blocks executable + executables, err = q.OnCollection(c1) + require.NoError(t, err) + requireExecutableHas(t, executables) + + executables, err = q.OnCollection(c2) + require.NoError(t, err) + requireExecutableHas(t, executables, blockB, blockD) + + // verify if 2 blocks (C, E) having the same collections (C3), if 
all collections are received, + // but only one block (C) whose parent (B) is executed, then only that block (C) becomes executable + // the other block (E) is not executable + + executables, err = q.OnBlockExecuted(blockB.ID(), *commitFor("B")) + require.NoError(t, err) + requireExecutableHas(t, executables) + + missing, executables, err = q.OnBlock(blockC, commitFor("B")) + require.NoError(t, err) + require.Empty(t, executables) + requireCollectionHas(t, missing, c3) + + missing, executables, err = q.OnBlock(blockE, nil) + require.NoError(t, err) + require.Empty(t, executables) + requireCollectionHas(t, missing) + + // verify only E is executable, because F's parent is not executed yet. + executables, err = q.OnCollection(c1) + require.NoError(t, err) + requireExecutableHas(t, executables, blockE) +} + +/* ==== Test utils ==== */ + +// GetBlock("A") => A +type GetBlock func(name string) *flow.Block + +// GetCollection(1) => C1 +type GetCollection func(name int) *flow.Collection + +// GetCommit("A") => A_FinalState +type GetCommit func(name string) *flow.StateCommitment + +// R <- A(C1) <- B(C2,C3) <- C() <- D() +// - ^------- E(C4,C5) <- F(C6) +// - ^-----------G() +func makeChainABCDEFG() (GetBlock, GetCollection, GetCommit) { + cs := unittest.CollectionListFixture(6) + c1, c2, c3, c4, c5, c6 := + cs[0], cs[1], cs[2], cs[3], cs[4], cs[5] + getCol := func(name int) *flow.Collection { + if name < 1 || name > len(cs) { + return nil + } + return cs[name-1] + } + + r := unittest.BlockFixture() + blockR := &r + bs := unittest.ChainBlockFixtureWithRoot(blockR.Header, 4) + blockA, blockB, blockC, blockD := bs[0], bs[1], bs[2], bs[3] + unittest.AddCollectionsToBlock(blockA, []*flow.Collection{c1}) + unittest.AddCollectionsToBlock(blockB, []*flow.Collection{c2, c3}) + unittest.RechainBlocks(bs) + + bs = unittest.ChainBlockFixtureWithRoot(blockA.Header, 2) + blockE, blockF := bs[0], bs[1] + unittest.AddCollectionsToBlock(blockE, []*flow.Collection{c4, c5}) + 
unittest.AddCollectionsToBlock(blockF, []*flow.Collection{c6}) + unittest.RechainBlocks(bs) + + bs = unittest.ChainBlockFixtureWithRoot(blockE.Header, 1) + blockG := bs[0] + + blockLookup := map[string]*flow.Block{ + "R": blockR, + "A": blockA, + "B": blockB, + "C": blockC, + "D": blockD, + "E": blockE, + "F": blockF, + "G": blockG, + } + + getBlock := func(name string) *flow.Block { + return blockLookup[name] + } + + commitLookup := make(map[string]*flow.StateCommitment, len(blockLookup)) + for name := range blockLookup { + commit := unittest.StateCommitmentFixture() + commitLookup[name] = &commit + } + + getCommit := func(name string) *flow.StateCommitment { + commit, ok := commitLookup[name] + if !ok { + panic("commit not found") + } + return commit + } + + return getBlock, getCol, getCommit +} + +// A() <- B(C1, C2) <- C (C3) +// ^--- D(C1, C2) <- E (C3) +func makeChainABCDE() (GetBlock, GetCollection, GetCommit) { + cs := unittest.CollectionListFixture(3) + c1, c2, c3 := cs[0], cs[1], cs[2] + getCol := func(name int) *flow.Collection { + if name < 1 || name >= len(cs) { + return nil + } + return cs[name-1] + } + + a := unittest.BlockFixture() + blockA := &a + bs := unittest.ChainBlockFixtureWithRoot(blockA.Header, 2) + blockB, blockC := bs[0], bs[1] + unittest.AddCollectionsToBlock(blockB, []*flow.Collection{c1, c2}) + unittest.AddCollectionsToBlock(blockC, []*flow.Collection{c3}) + unittest.RechainBlocks(bs) + + bs = unittest.ChainBlockFixtureWithRoot(blockA.Header, 2) + blockD, blockE := bs[0], bs[1] + unittest.AddCollectionsToBlock(blockD, []*flow.Collection{c1, c2}) + unittest.AddCollectionsToBlock(blockE, []*flow.Collection{c3}) + unittest.RechainBlocks(bs) + + blockLookup := map[string]*flow.Block{ + "A": blockA, + "B": blockB, + "C": blockC, + "D": blockD, + "E": blockE, + } + + getBlock := func(name string) *flow.Block { + return blockLookup[name] + } + + commitLookup := make(map[string]*flow.StateCommitment, len(blockLookup)) + for name := range 
blockLookup { + commit := unittest.StateCommitmentFixture() + commitLookup[name] = &commit + } + + getCommit := func(name string) *flow.StateCommitment { + commit, ok := commitLookup[name] + if !ok { + panic("commit not found") + } + return commit + } + + return getBlock, getCol, getCommit +} + +func requireExecutableHas(t *testing.T, executables []*entity.ExecutableBlock, bs ...*flow.Block) { + blocks := make(map[flow.Identifier]*flow.Block, len(bs)) + for _, b := range bs { + blocks[b.ID()] = b + } + + for _, e := range executables { + _, ok := blocks[e.Block.ID()] + require.True(t, ok) + delete(blocks, e.Block.ID()) + } + + require.Equal(t, len(bs), len(executables)) + require.Equal(t, 0, len(blocks)) +} + +func requireCollectionHas(t *testing.T, missing []*flow.CollectionGuarantee, cs ...*flow.Collection) { + collections := make(map[flow.Identifier]*flow.Collection, len(cs)) + for _, c := range cs { + collections[c.ID()] = c + } + + for _, m := range missing { + _, ok := collections[m.CollectionID] + require.True(t, ok) + delete(collections, m.CollectionID) + } + + require.Equal(t, len(cs), len(missing)) + require.Equal(t, 0, len(collections)) +} + +func requireQueueIsEmpty(t *testing.T, q *BlockQueue) { + require.Equal(t, 0, len(q.blocks)) + require.Equal(t, 0, len(q.collections)) + require.Equal(t, 0, len(q.blockIDsByHeight)) +} diff --git a/engine/execution/ingestion_test.go b/engine/execution/ingestion_test.go new file mode 100644 index 00000000000..41ccea0362e --- /dev/null +++ b/engine/execution/ingestion_test.go @@ -0,0 +1,98 @@ +package execution_test + +import ( + "fmt" + "sync" + + "github.com/rs/zerolog/log" + + "github.com/onflow/flow-go/engine/execution" + "github.com/onflow/flow-go/model/flow" + "github.com/onflow/flow-go/module/mempool/entity" +) + +type QueueAction struct { + ToFetch []*flow.Collection + Executables []*entity.ExecutableBlock +} + +type BlockQueue interface { + OnBlock(*flow.Block) (*QueueAction, error) + 
OnCollection(*flow.Collection) (*QueueAction, error) + OnBlockExecuted(blockID flow.Identifier, height uint64, commit flow.StateCommitment) (*QueueAction, error) +} + +type BlockQueues struct { + sync.Mutex + // when receiving a new block, adding it to the map, and add missing collections to the map + blocks map[flow.Identifier]*entity.ExecutableBlock + // a collection could be included in multiple blocks, + // when a missing block is received, it might trigger multiple blocks to be executable, which + // can be looked up by the map + // when a block is executed, its collections should be removed from this map unless a collection + // is still referenced by other blocks, which will eventually be removed when those blocks are + // executed. + collections map[flow.Identifier]map[flow.Identifier]*entity.ExecutableBlock + + // blockIDsByHeight is used to find next executable block. + // when a block is executed, the next executable block must be a block with height = current block height + 1 + // if there are multiple blocks at the same height as current height + 1, then only those whose parent is the + // current block could be executable (assuming their collections are ready), which can be checked by + // the parentByBlockID map + // the following map allows us to find the next executable block by height + blockIDsByHeight map[uint64]map[flow.Identifier]struct{} // for finding next executable block + parentByBlockID map[flow.Identifier]flow.Identifier // block - parent + +} + +type Uploader interface { + Upload(*execution.ComputationResult) error +} + +type Broadcaster interface { + BroadcastExecutionReceipt(uint64, *flow.ExecutionReceipt) (bool, error) +} + +type UploadAndBroadcast struct { + uploader Uploader + broadcaster Broadcaster +} + +var _ ExecutionNotifier = (*UploadAndBroadcast)(nil) + +func (uab *UploadAndBroadcast) OnBlockExecuted(result *execution.ComputationResult) (string, error) { + wg := sync.WaitGroup{} + wg.Add(1) + defer wg.Wait() + go func() { + 
defer wg.Done() + err := uab.uploader.Upload(result) + if err != nil { + log.Err(err).Msg("error while uploading block") + // continue processing. uploads should not block execution + } + + }() + + broadcasted, err := uab.broadcaster.BroadcastExecutionReceipt( + result.ExecutableBlock.Block.Header.Height, result.ExecutionReceipt) + if err != nil { + log.Err(err).Msg("critical: failed to broadcast the receipt") + } + + return fmt.Sprintf("broadcasted: %v", broadcasted), nil +} + +// var _ Ingestion = (*IngestionCore)(nil) + +type IngestionCore struct { + notifier ExecutionNotifier +} + +type ExecutionNotifier interface { + OnBlockExecuted(*execution.ComputationResult) (log string, err error) +} + +type BlockExecutor interface { + ExecuteBlock(*entity.ExecutableBlock) (*execution.ComputationResult, error) +} diff --git a/utils/unittest/fixtures.go b/utils/unittest/fixtures.go index 7d6c24a31c4..78c699519d1 100644 --- a/utils/unittest/fixtures.go +++ b/utils/unittest/fixtures.go @@ -194,6 +194,35 @@ func BlockFixture() flow.Block { return *BlockWithParentFixture(header) } +func ChainBlockFixture(n int) []*flow.Block { + root := BlockHeaderFixture() + return ChainBlockFixtureWithRoot(root, n) +} + +func ChainBlockFixtureWithRoot(root *flow.Header, n int) []*flow.Block { + bs := make([]*flow.Block, 0, n) + parent := root + for i := 0; i < n; i++ { + b := BlockWithParentFixture(parent) + bs = append(bs, b) + parent = b.Header + } + return bs +} + +func RechainBlocks(blocks []*flow.Block) { + if len(blocks) == 0 { + return + } + + parent := blocks[0] + + for _, block := range blocks[1:] { + block.Header.ParentID = parent.ID() + parent = block + } +} + func FullBlockFixture() flow.Block { block := BlockFixture() payload := block.Payload @@ -605,6 +634,17 @@ func WithCollection(collection *flow.Collection) func(guarantee *flow.Collection } } +func AddCollectionsToBlock(block *flow.Block, collections []*flow.Collection) { + gs := make([]*flow.CollectionGuarantee, 0, 
len(collections)) + for _, collection := range collections { + g := collection.Guarantee() + gs = append(gs, &g) + } + + block.Payload.Guarantees = gs + block.SetPayload(*block.Payload) +} + func CollectionGuaranteeFixture(options ...func(*flow.CollectionGuarantee)) *flow.CollectionGuarantee { guarantee := &flow.CollectionGuarantee{ CollectionID: IdentifierFixture(), From 7e2060098641cb29afd2157932db1885dc27bf91 Mon Sep 17 00:00:00 2001 From: "Leo Zhang (zhangchiqing)" Date: Wed, 17 Jan 2024 17:10:57 -0800 Subject: [PATCH 02/23] fix test --- engine/execution/ingestion/block_queue/block_queue_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/engine/execution/ingestion/block_queue/block_queue_test.go b/engine/execution/ingestion/block_queue/block_queue_test.go index 8a3ab3d8047..f2a6ffca529 100644 --- a/engine/execution/ingestion/block_queue/block_queue_test.go +++ b/engine/execution/ingestion/block_queue/block_queue_test.go @@ -178,7 +178,7 @@ func TestOnForksWithSameCollections(t *testing.T) { q := NewBlockQueue() - missing, executables, err := q.OnBlock(blockA, commitFor("R")) + missing, executables, err := q.OnBlock(blockA, commitFor("A")) require.NoError(t, err) require.Empty(t, executables) requireCollectionHas(t, missing) @@ -350,7 +350,7 @@ func makeChainABCDE() (GetBlock, GetCollection, GetCommit) { getCommit := func(name string) *flow.StateCommitment { commit, ok := commitLookup[name] if !ok { - panic("commit not found") + panic("commit not found for " + name) } return commit } From ba281598a9059fcd7ea0856b9e30cc3a90d00942 Mon Sep 17 00:00:00 2001 From: "Leo Zhang (zhangchiqing)" Date: Wed, 17 Jan 2024 17:21:13 -0800 Subject: [PATCH 03/23] fix tests --- .../ingestion/block_queue/block_queue_test.go | 27 ++++++++++++------- 1 file changed, 17 insertions(+), 10 deletions(-) diff --git a/engine/execution/ingestion/block_queue/block_queue_test.go b/engine/execution/ingestion/block_queue/block_queue_test.go index 
f2a6ffca529..ec8557f407c 100644 --- a/engine/execution/ingestion/block_queue/block_queue_test.go +++ b/engine/execution/ingestion/block_queue/block_queue_test.go @@ -169,8 +169,9 @@ func TestMultipleBlockBecomesReady(t *testing.T) { func TestOnForksWithSameCollections(t *testing.T) { t.Parallel() // Given a chain - // A() <- B(C1, C2) <- C (C3) - // ^--- D(C1, C2) <- E (C3) + // R() <- A() <- B(C1, C2) <- C(C3) + // - ^----- D(C1, C2) <- E(C3) + // TODO: add A <- F(C1, C2, C3) block, coll, commitFor := makeChainABCDE() blockA, blockB, blockC, blockD, blockE := block("A"), block("B"), block("C"), block("D"), block("E") @@ -178,9 +179,9 @@ func TestOnForksWithSameCollections(t *testing.T) { q := NewBlockQueue() - missing, executables, err := q.OnBlock(blockA, commitFor("A")) + missing, executables, err := q.OnBlock(blockA, commitFor("R")) require.NoError(t, err) - require.Empty(t, executables) + requireExecutableHas(t, executables, blockA) requireCollectionHas(t, missing) // receiving block B and D which have the same collections (C1, C2) @@ -201,6 +202,11 @@ func TestOnForksWithSameCollections(t *testing.T) { require.NoError(t, err) requireExecutableHas(t, executables) + // A is executed + executables, err = q.OnBlockExecuted(blockA.ID(), *commitFor("A")) + require.NoError(t, err) + requireExecutableHas(t, executables) // because C2 is not received + executables, err = q.OnCollection(c2) require.NoError(t, err) requireExecutableHas(t, executables, blockB, blockD) @@ -303,8 +309,8 @@ func makeChainABCDEFG() (GetBlock, GetCollection, GetCommit) { return getBlock, getCol, getCommit } -// A() <- B(C1, C2) <- C (C3) -// ^--- D(C1, C2) <- E (C3) +// R() <- A() <- B(C1, C2) <- C(C3) +// - ^----- D(C1, C2) <- E(C3) func makeChainABCDE() (GetBlock, GetCollection, GetCommit) { cs := unittest.CollectionListFixture(3) c1, c2, c3 := cs[0], cs[1], cs[2] @@ -315,10 +321,10 @@ func makeChainABCDE() (GetBlock, GetCollection, GetCommit) { return cs[name-1] } - a := 
unittest.BlockFixture() - blockA := &a - bs := unittest.ChainBlockFixtureWithRoot(blockA.Header, 2) - blockB, blockC := bs[0], bs[1] + r := unittest.BlockFixture() + blockR := &r + bs := unittest.ChainBlockFixtureWithRoot(blockR.Header, 3) + blockA, blockB, blockC := bs[0], bs[1], bs[2] unittest.AddCollectionsToBlock(blockB, []*flow.Collection{c1, c2}) unittest.AddCollectionsToBlock(blockC, []*flow.Collection{c3}) unittest.RechainBlocks(bs) @@ -330,6 +336,7 @@ func makeChainABCDE() (GetBlock, GetCollection, GetCommit) { unittest.RechainBlocks(bs) blockLookup := map[string]*flow.Block{ + "R": blockR, "A": blockA, "B": blockB, "C": blockC, From d55b442de6055f5a0e5aeae957e9eebfeec34b2a Mon Sep 17 00:00:00 2001 From: "Leo Zhang (zhangchiqing)" Date: Wed, 17 Jan 2024 17:25:53 -0800 Subject: [PATCH 04/23] fix tests --- .../execution/ingestion/block_queue/block_queue_test.go | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/engine/execution/ingestion/block_queue/block_queue_test.go b/engine/execution/ingestion/block_queue/block_queue_test.go index ec8557f407c..1668dae3ef1 100644 --- a/engine/execution/ingestion/block_queue/block_queue_test.go +++ b/engine/execution/ingestion/block_queue/block_queue_test.go @@ -229,10 +229,11 @@ func TestOnForksWithSameCollections(t *testing.T) { require.Empty(t, executables) requireCollectionHas(t, missing) - // verify only E is executable, because F's parent is not executed yet. - executables, err = q.OnCollection(c1) + // verify only C is executable, because C's parent (B) has been executed, + // E is not executable, because E's parent (D) is not executed yet. 
+ executables, err = q.OnCollection(c3) require.NoError(t, err) - requireExecutableHas(t, executables, blockE) + requireExecutableHas(t, executables, blockC) } /* ==== Test utils ==== */ @@ -315,7 +316,7 @@ func makeChainABCDE() (GetBlock, GetCollection, GetCommit) { cs := unittest.CollectionListFixture(3) c1, c2, c3 := cs[0], cs[1], cs[2] getCol := func(name int) *flow.Collection { - if name < 1 || name >= len(cs) { + if name < 1 || name > len(cs) { return nil } return cs[name-1] From bfead019704f2df3c9754cf6a741889b1865a0b1 Mon Sep 17 00:00:00 2001 From: "Leo Zhang (zhangchiqing)" Date: Wed, 17 Jan 2024 17:40:51 -0800 Subject: [PATCH 05/23] update tests --- .../ingestion/block_queue/block_queue_test.go | 59 +++++++++++++++---- 1 file changed, 46 insertions(+), 13 deletions(-) diff --git a/engine/execution/ingestion/block_queue/block_queue_test.go b/engine/execution/ingestion/block_queue/block_queue_test.go index 1668dae3ef1..08e4d7f4156 100644 --- a/engine/execution/ingestion/block_queue/block_queue_test.go +++ b/engine/execution/ingestion/block_queue/block_queue_test.go @@ -171,10 +171,10 @@ func TestOnForksWithSameCollections(t *testing.T) { // Given a chain // R() <- A() <- B(C1, C2) <- C(C3) // - ^----- D(C1, C2) <- E(C3) - // TODO: add A <- F(C1, C2, C3) - block, coll, commitFor := makeChainABCDE() - blockA, blockB, blockC, blockD, blockE := - block("A"), block("B"), block("C"), block("D"), block("E") + // - ^----- F(C1, C2, C3) + block, coll, commitFor := makeChainABCDEF() + blockA, blockB, blockC, blockD, blockE, blockF := + block("A"), block("B"), block("C"), block("D"), block("E"), block("F") c1, c2, c3 := coll(1), coll(2), coll(3) q := NewBlockQueue() @@ -190,6 +190,12 @@ func TestOnForksWithSameCollections(t *testing.T) { require.Empty(t, executables) requireCollectionHas(t, missing, c1, c2) + // receiving block F (C1, C2, C3) + missing, executables, err = q.OnBlock(blockF, nil) + require.NoError(t, err) + require.Empty(t, executables) + 
requireCollectionHas(t, missing, c3) // c1 and c2 are requested before, only c3 is missing + // verify receiving D will not return any missing collections because // missing collections were returned when receiving B missing, executables, err = q.OnBlock(blockD, nil) @@ -215,25 +221,45 @@ func TestOnForksWithSameCollections(t *testing.T) { // but only one block (C) whose parent (B) is executed, then only that block (C) becomes executable // the other block (E) is not executable - executables, err = q.OnBlockExecuted(blockB.ID(), *commitFor("B")) - require.NoError(t, err) - requireExecutableHas(t, executables) - - missing, executables, err = q.OnBlock(blockC, commitFor("B")) + missing, executables, err = q.OnBlock(blockC, nil) require.NoError(t, err) require.Empty(t, executables) - requireCollectionHas(t, missing, c3) + requireCollectionHas(t, missing) // because C3 is requested when F is received missing, executables, err = q.OnBlock(blockE, nil) require.NoError(t, err) require.Empty(t, executables) requireCollectionHas(t, missing) - // verify only C is executable, because C's parent (B) has been executed, + executables, err = q.OnBlockExecuted(blockB.ID(), *commitFor("B")) + require.NoError(t, err) + requireExecutableHas(t, executables) + + // verify C and F are executable, because their parent have been executed // E is not executable, because E's parent (D) is not executed yet. 
executables, err = q.OnCollection(c3) require.NoError(t, err) - requireExecutableHas(t, executables, blockC) + requireExecutableHas(t, executables, blockC, blockF) + + // verify when D is executed, E becomes executable + executables, err = q.OnBlockExecuted(blockD.ID(), *commitFor("D")) + require.NoError(t, err) + requireExecutableHas(t, executables, blockE) + + // verify the remaining blocks (C,E,F) are executed, the queue is empty + executables, err = q.OnBlockExecuted(blockE.ID(), *commitFor("E")) + require.NoError(t, err) + requireExecutableHas(t, executables) + + executables, err = q.OnBlockExecuted(blockF.ID(), *commitFor("F")) + require.NoError(t, err) + requireExecutableHas(t, executables) + + executables, err = q.OnBlockExecuted(blockC.ID(), *commitFor("C")) + require.NoError(t, err) + requireExecutableHas(t, executables) + + requireQueueIsEmpty(t, q) } /* ==== Test utils ==== */ @@ -312,7 +338,8 @@ func makeChainABCDEFG() (GetBlock, GetCollection, GetCommit) { // R() <- A() <- B(C1, C2) <- C(C3) // - ^----- D(C1, C2) <- E(C3) -func makeChainABCDE() (GetBlock, GetCollection, GetCommit) { +// - ^----- F(C1, C2, C3) +func makeChainABCDEF() (GetBlock, GetCollection, GetCommit) { cs := unittest.CollectionListFixture(3) c1, c2, c3 := cs[0], cs[1], cs[2] getCol := func(name int) *flow.Collection { @@ -336,6 +363,11 @@ func makeChainABCDE() (GetBlock, GetCollection, GetCommit) { unittest.AddCollectionsToBlock(blockE, []*flow.Collection{c3}) unittest.RechainBlocks(bs) + bs = unittest.ChainBlockFixtureWithRoot(blockA.Header, 1) + blockF := bs[0] + unittest.AddCollectionsToBlock(blockF, []*flow.Collection{c1, c2, c3}) + unittest.RechainBlocks(bs) + blockLookup := map[string]*flow.Block{ "R": blockR, "A": blockA, @@ -343,6 +375,7 @@ func makeChainABCDE() (GetBlock, GetCollection, GetCommit) { "C": blockC, "D": blockD, "E": blockE, + "F": blockF, } getBlock := func(name string) *flow.Block { From 89b2ed4eb2053ebcf427de2249efc289ceb11d43 Mon Sep 17 00:00:00 2001 From: 
"Leo Zhang (zhangchiqing)" Date: Mon, 22 Jan 2024 09:07:24 -0800 Subject: [PATCH 06/23] fix lint --- engine/execution/ingestion/block_queue/block_queue_test.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/engine/execution/ingestion/block_queue/block_queue_test.go b/engine/execution/ingestion/block_queue/block_queue_test.go index 08e4d7f4156..2ca09e3a2cf 100644 --- a/engine/execution/ingestion/block_queue/block_queue_test.go +++ b/engine/execution/ingestion/block_queue/block_queue_test.go @@ -3,10 +3,11 @@ package block_queue import ( "testing" + "github.com/stretchr/testify/require" + "github.com/onflow/flow-go/model/flow" "github.com/onflow/flow-go/module/mempool/entity" "github.com/onflow/flow-go/utils/unittest" - "github.com/stretchr/testify/require" ) func TestSingleBlockBecomeReady(t *testing.T) { From 1e794624b20cc556aaa81981cfb64864e1984605 Mon Sep 17 00:00:00 2001 From: "Leo Zhang (zhangchiqing)" Date: Mon, 22 Jan 2024 09:50:19 -0800 Subject: [PATCH 07/23] update comments --- .../ingestion/block_queue/block_queue.go | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/engine/execution/ingestion/block_queue/block_queue.go b/engine/execution/ingestion/block_queue/block_queue.go index 16615bec33c..a71ac0f02ad 100644 --- a/engine/execution/ingestion/block_queue/block_queue.go +++ b/engine/execution/ingestion/block_queue/block_queue.go @@ -8,6 +8,11 @@ import ( "github.com/onflow/flow-go/module/mempool/entity" ) +// BlockQueue keeps track of state of blocks and determines which blocks are executable +// A block becomes executable when all the following conditions are met: +// 1. the block has been validated by consensus algorithm +// 2. the block's parent has been executed +// 3. 
all the collections included in the block have been received type BlockQueue struct { sync.Mutex // when receiving a new block, adding it to the map, and add missing collections to the map @@ -18,7 +23,7 @@ type BlockQueue struct { // when a block is executed, its collections should be removed from this map unless a collection // is still referenced by other blocks, which will eventually be removed when those blocks are // executed. - collections map[flow.Identifier]*CollectionInfo + collections map[flow.Identifier]*collectionInfo // blockIDsByHeight is used to find next executable block. // when a block is executed, the next executable block must be a block with height = current block height + 1 @@ -26,7 +31,9 @@ type BlockQueue struct { blockIDsByHeight map[uint64]map[flow.Identifier]*entity.ExecutableBlock // for finding next executable block } -type CollectionInfo struct { +// collectionInfo is an internal struct used to keep track of the state of a collection, +// and the blocks that include the collection +type collectionInfo struct { Collection *entity.CompleteCollection IncludedIn map[flow.Identifier]*entity.ExecutableBlock } @@ -34,7 +41,7 @@ type CollectionInfo struct { func NewBlockQueue() *BlockQueue { return &BlockQueue{ blocks: make(map[flow.Identifier]*entity.ExecutableBlock), - collections: make(map[flow.Identifier]*CollectionInfo), + collections: make(map[flow.Identifier]*collectionInfo), blockIDsByHeight: make(map[uint64]map[flow.Identifier]*entity.ExecutableBlock), } } @@ -102,7 +109,7 @@ func (q *BlockQueue) OnBlock(block *flow.Block, parentFinalState *flow.StateComm colls[colID] = col // add new collection to collections - q.collections[colID] = &CollectionInfo{ + q.collections[colID] = &collectionInfo{ Collection: col, IncludedIn: map[flow.Identifier]*entity.ExecutableBlock{ blockID: executable, From 1ad9dc5e8254538fde6b98cfe19168bcfad8a964 Mon Sep 17 00:00:00 2001 From: "Leo Zhang (zhangchiqing)" Date: Mon, 22 Jan 2024 10:20:32 -0800 Subject: 
[PATCH 08/23] remove tests --- engine/execution/ingestion_test.go | 98 ------------------------------ 1 file changed, 98 deletions(-) delete mode 100644 engine/execution/ingestion_test.go diff --git a/engine/execution/ingestion_test.go b/engine/execution/ingestion_test.go deleted file mode 100644 index 41ccea0362e..00000000000 --- a/engine/execution/ingestion_test.go +++ /dev/null @@ -1,98 +0,0 @@ -package execution_test - -import ( - "fmt" - "sync" - - "github.com/rs/zerolog/log" - - "github.com/onflow/flow-go/engine/execution" - "github.com/onflow/flow-go/model/flow" - "github.com/onflow/flow-go/module/mempool/entity" -) - -type QueueAction struct { - ToFetch []*flow.Collection - Executables []*entity.ExecutableBlock -} - -type BlockQueue interface { - OnBlock(*flow.Block) (*QueueAction, error) - OnCollection(*flow.Collection) (*QueueAction, error) - OnBlockExecuted(blockID flow.Identifier, height uint64, commit flow.StateCommitment) (*QueueAction, error) -} - -type BlockQueues struct { - sync.Mutex - // when receiving a new block, adding it to the map, and add missing collections to the map - blocks map[flow.Identifier]*entity.ExecutableBlock - // a collection could be included in multiple blocks, - // when a missing block is received, it might trigger multiple blocks to be executable, which - // can be looked up by the map - // when a block is executed, its collections should be removed from this map unless a collection - // is still referenced by other blocks, which will eventually be removed when those blocks are - // executed. - collections map[flow.Identifier]map[flow.Identifier]*entity.ExecutableBlock - - // blockIDsByHeight is used to find next executable block. 
- // when a block is executed, the next executable block must be a block with height = current block height + 1 - // if there are multiple blocks at the same height as current height + 1, then only those whose parent is the - // current block could be executable (assuming their collections are ready), which can be checked by - // the parentByBlockID map - // the following map allows us to find the next executable block by height - blockIDsByHeight map[uint64]map[flow.Identifier]struct{} // for finding next executable block - parentByBlockID map[flow.Identifier]flow.Identifier // block - parent - -} - -type Uploader interface { - Upload(*execution.ComputationResult) error -} - -type Broadcaster interface { - BroadcastExecutionReceipt(uint64, *flow.ExecutionReceipt) (bool, error) -} - -type UploadAndBroadcast struct { - uploader Uploader - broadcaster Broadcaster -} - -var _ ExecutionNotifier = (*UploadAndBroadcast)(nil) - -func (uab *UploadAndBroadcast) OnBlockExecuted(result *execution.ComputationResult) (string, error) { - wg := sync.WaitGroup{} - wg.Add(1) - defer wg.Wait() - go func() { - defer wg.Done() - err := uab.uploader.Upload(result) - if err != nil { - log.Err(err).Msg("error while uploading block") - // continue processing. 
uploads should not block execution - } - - }() - - broadcasted, err := uab.broadcaster.BroadcastExecutionReceipt( - result.ExecutableBlock.Block.Header.Height, result.ExecutionReceipt) - if err != nil { - log.Err(err).Msg("critical: failed to broadcast the receipt") - } - - return fmt.Sprintf("broadcasted: %v", broadcasted), nil -} - -// var _ Ingestion = (*IngestionCore)(nil) - -type IngestionCore struct { - notifier ExecutionNotifier -} - -type ExecutionNotifier interface { - OnBlockExecuted(*execution.ComputationResult) (log string, err error) -} - -type BlockExecutor interface { - ExecuteBlock(*entity.ExecutableBlock) (*execution.ComputationResult, error) -} From 8959c91cf7792d23357953e7476415359e552e83 Mon Sep 17 00:00:00 2001 From: "Leo Zhang (zhangchiqing)" Date: Mon, 22 Jan 2024 14:24:36 -0800 Subject: [PATCH 09/23] update comments --- engine/execution/ingestion/block_queue/block_queue.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/engine/execution/ingestion/block_queue/block_queue.go b/engine/execution/ingestion/block_queue/block_queue.go index a71ac0f02ad..842bb1439e4 100644 --- a/engine/execution/ingestion/block_queue/block_queue.go +++ b/engine/execution/ingestion/block_queue/block_queue.go @@ -48,8 +48,9 @@ func NewBlockQueue() *BlockQueue { // OnBlock is called when a new block is received, and its parent is not executed. // It returns a list of missing collections and a list of executable blocks -// Note caller must check if the parent is executed, if yes, then -// caller must call OnBlock again with the finalState of its parent +// Note: caller must ensure when OnBlock is called with a block, +// if its parent is not executed, then the parent must be added to the queue first. +// if it sparent is executed, then the parent's finalState must be passed in. 
func (q *BlockQueue) OnBlock(block *flow.Block, parentFinalState *flow.StateCommitment) ( []*flow.CollectionGuarantee, // missing collections []*entity.ExecutableBlock, // blocks ready to execute From e8f0108de168daa97b2c180ad02c3865e8d0a2ea Mon Sep 17 00:00:00 2001 From: "Leo Zhang (zhangchiqing)" Date: Mon, 22 Jan 2024 19:41:13 -0800 Subject: [PATCH 10/23] handle and log edge cases --- .../ingestion/block_queue/block_queue.go | 59 +++++++++++++++++-- 1 file changed, 54 insertions(+), 5 deletions(-) diff --git a/engine/execution/ingestion/block_queue/block_queue.go b/engine/execution/ingestion/block_queue/block_queue.go index 842bb1439e4..21b813d4cb5 100644 --- a/engine/execution/ingestion/block_queue/block_queue.go +++ b/engine/execution/ingestion/block_queue/block_queue.go @@ -6,6 +6,7 @@ import ( "github.com/onflow/flow-go/model/flow" "github.com/onflow/flow-go/module/mempool/entity" + "github.com/rs/zerolog" ) // BlockQueue keeps track of state of blocks and determines which blocks are executable @@ -15,6 +16,8 @@ import ( // 3. 
all the collections included in the block have been received type BlockQueue struct { sync.Mutex + log zerolog.Logger + // when receiving a new block, adding it to the map, and add missing collections to the map blocks map[flow.Identifier]*entity.ExecutableBlock // a collection could be included in multiple blocks, @@ -38,8 +41,11 @@ type collectionInfo struct { IncludedIn map[flow.Identifier]*entity.ExecutableBlock } -func NewBlockQueue() *BlockQueue { +func NewBlockQueue(logger zerolog.Logger) *BlockQueue { + log := logger.With().Str("module", "block_queue").Logger() + return &BlockQueue{ + log: log, blocks: make(map[flow.Identifier]*entity.ExecutableBlock), collections: make(map[flow.Identifier]*collectionInfo), blockIDsByHeight: make(map[uint64]map[flow.Identifier]*entity.ExecutableBlock), @@ -50,7 +56,7 @@ func NewBlockQueue() *BlockQueue { // It returns a list of missing collections and a list of executable blocks // Note: caller must ensure when OnBlock is called with a block, // if its parent is not executed, then the parent must be added to the queue first. -// if it sparent is executed, then the parent's finalState must be passed in. +// if its parent is executed, then the parent's finalState must be passed in. 
func (q *BlockQueue) OnBlock(block *flow.Block, parentFinalState *flow.StateCommitment) ( []*flow.CollectionGuarantee, // missing collections []*entity.ExecutableBlock, // blocks ready to execute @@ -62,16 +68,38 @@ func (q *BlockQueue) OnBlock(block *flow.Block, parentFinalState *flow.StateComm // check if the block already exists blockID := block.ID() executable, ok := q.blocks[blockID] + // handle the case where the block has seen before if ok { + // we have already received this block, and its parent still has not been executed yet if executable.StartState == nil && parentFinalState == nil { return nil, nil, nil } - if executable.StartState == nil || parentFinalState == nil { - return nil, nil, fmt.Errorf("block %s has already been executed with a nil parent final state, %v != %v", - blockID, executable.StartState, parentFinalState) + // this is an edge case where parentFinalState is provided, but its parent block has not been + // marked as executed yet (OnBlockExecuted(parent) is not called), + // in this edge case, we will internally call OnBlockExecuted(parentBlockID, parentFinalState) first + if executable.StartState == nil && parentFinalState != nil { + executables, err := q.onBlockExecuted(block.Header.ParentID, *parentFinalState) + if err != nil { + return nil, nil, fmt.Errorf("receiving block %v with parent commitment %v, but parent block %v already exists with no commitment, fail to call mark parent as executed: %w", + blockID, *parentFinalState, block.Header.ParentID, err) + } + + // we already have this block, its collection must have been fetched, so we only return the + // executables from marking its parent as executed. + return nil, executables, nil + } + + // this is an edge case could be ignored + if executable.StartState != nil && parentFinalState == nil { + q.log.Warn(). + Str("blockID", blockID.String()). + Hex("parentID", block.Header.ParentID[:]). 
+ Msg("edge case: receiving block with no parent commitment, but its parent block actually has been executed") + return nil, nil, nil } + // this is an exception that should not happen if *executable.StartState != *parentFinalState { return nil, nil, fmt.Errorf("block %s has already been executed with a different parent final state, %v != %v", @@ -81,6 +109,16 @@ func (q *BlockQueue) OnBlock(block *flow.Block, parentFinalState *flow.StateComm return nil, nil, nil } + // if parentFinalState is not provided, then its parent block must exists in the queue + // otherwise it's an exception + if parentFinalState == nil { + _, parentExists := q.blocks[block.Header.ParentID] + if !parentExists { + return nil, nil, fmt.Errorf("parent block %s of block %s is not in the queue", + block.Header.ParentID, blockID) + } + } + executable = &entity.ExecutableBlock{ Block: block, StartState: parentFinalState, @@ -132,6 +170,9 @@ func (q *BlockQueue) OnBlock(block *flow.Block, parentFinalState *flow.StateComm // check if the block is executable var executables []*entity.ExecutableBlock if executable.IsComplete() { + // executables might contain other siblings, but won't contain "executable", + // which is the block itself, that's because executables are created + // from OnBlockExecuted( executables = []*entity.ExecutableBlock{executable} } @@ -189,6 +230,14 @@ func (q *BlockQueue) OnBlockExecuted( ) ([]*entity.ExecutableBlock, error) { q.Lock() defer q.Unlock() + + return q.onBlockExecuted(blockID, commit) +} + +func (q *BlockQueue) onBlockExecuted( + blockID flow.Identifier, + commit flow.StateCommitment, +) ([]*entity.ExecutableBlock, error) { // when a block is executed, the child block might become executable // we also remove it from all the indexes From 6a160b44115687d2634f45b2d42339ca27736086 Mon Sep 17 00:00:00 2001 From: "Leo Zhang (zhangchiqing)" Date: Wed, 24 Jan 2024 11:13:03 -0800 Subject: [PATCH 11/23] update block queue --- .../ingestion/block_queue/block_queue.go | 37 
++++++++++---- .../ingestion/block_queue/block_queue_test.go | 50 ++++++++++++++++--- 2 files changed, 72 insertions(+), 15 deletions(-) diff --git a/engine/execution/ingestion/block_queue/block_queue.go b/engine/execution/ingestion/block_queue/block_queue.go index 21b813d4cb5..54a73f40c1a 100644 --- a/engine/execution/ingestion/block_queue/block_queue.go +++ b/engine/execution/ingestion/block_queue/block_queue.go @@ -34,6 +34,16 @@ type BlockQueue struct { blockIDsByHeight map[uint64]map[flow.Identifier]*entity.ExecutableBlock // for finding next executable block } +type MissingCollection struct { + BlockID flow.Identifier + Height uint64 + Guarantee *flow.CollectionGuarantee +} + +func (m *MissingCollection) ID() flow.Identifier { + return m.Guarantee.ID() +} + // collectionInfo is an internal struct used to keep track of the state of a collection, // and the blocks that include the collection type collectionInfo struct { @@ -58,7 +68,7 @@ func NewBlockQueue(logger zerolog.Logger) *BlockQueue { // if its parent is not executed, then the parent must be added to the queue first. // if its parent is executed, then the parent's finalState must be passed in. 
func (q *BlockQueue) OnBlock(block *flow.Block, parentFinalState *flow.StateCommitment) ( - []*flow.CollectionGuarantee, // missing collections + []*MissingCollection, // missing collections []*entity.ExecutableBlock, // blocks ready to execute error, // exceptions ) { @@ -75,9 +85,10 @@ func (q *BlockQueue) OnBlock(block *flow.Block, parentFinalState *flow.StateComm return nil, nil, nil } - // this is an edge case where parentFinalState is provided, but its parent block has not been - // marked as executed yet (OnBlockExecuted(parent) is not called), - // in this edge case, we will internally call OnBlockExecuted(parentBlockID, parentFinalState) first + // this is an edge case where parentFinalState is provided, and its parent block exists + // in the queue but has not been marked as executed yet (OnBlockExecuted(parent) is not called), + // in this case, we will internally call OnBlockExecuted(parentBlockID, parentFinalState). + // there is no need to create the executable block again, since it's already created. 
if executable.StartState == nil && parentFinalState != nil { executables, err := q.onBlockExecuted(block.Header.ParentID, *parentFinalState) if err != nil { @@ -132,7 +143,7 @@ func (q *BlockQueue) OnBlock(block *flow.Block, parentFinalState *flow.StateComm executable.CompleteCollections = colls // find missing collections and update collection index - missingCollections := make([]*flow.CollectionGuarantee, 0, len(block.Payload.Guarantees)) + missingCollections := make([]*MissingCollection, 0, len(block.Payload.Guarantees)) for _, guarantee := range block.Payload.Guarantees { colID := guarantee.ID() @@ -155,7 +166,7 @@ func (q *BlockQueue) OnBlock(block *flow.Block, parentFinalState *flow.StateComm }, } - missingCollections = append(missingCollections, guarantee) + missingCollections = append(missingCollections, missingCollectionForBlock(executable, guarantee)) } } @@ -318,7 +329,7 @@ func (q *BlockQueue) checkIfChildBlockBecomeExecutable( // GetMissingCollections returns the missing collections and the start state // It returns an error if the block is not found func (q *BlockQueue) GetMissingCollections(blockID flow.Identifier) ( - []*flow.CollectionGuarantee, *flow.StateCommitment, error) { + []*MissingCollection, *flow.StateCommitment, error) { q.Lock() defer q.Unlock() block, ok := q.blocks[blockID] @@ -326,14 +337,22 @@ func (q *BlockQueue) GetMissingCollections(blockID flow.Identifier) ( return nil, nil, fmt.Errorf("block %s not found", blockID) } - missingCollections := make([]*flow.CollectionGuarantee, 0, len(block.Block.Payload.Guarantees)) + missingCollections := make([]*MissingCollection, 0, len(block.Block.Payload.Guarantees)) for _, col := range block.CompleteCollections { // check if the collection is already received if col.IsCompleted() { continue } - missingCollections = append(missingCollections, col.Guarantee) + missingCollections = append(missingCollections, missingCollectionForBlock(block, col.Guarantee)) } return missingCollections, 
block.StartState, nil } + +func missingCollectionForBlock(block *entity.ExecutableBlock, guarantee *flow.CollectionGuarantee) *MissingCollection { + return &MissingCollection{ + BlockID: block.ID(), + Height: block.Block.Header.Height, + Guarantee: guarantee, + } +} diff --git a/engine/execution/ingestion/block_queue/block_queue_test.go b/engine/execution/ingestion/block_queue/block_queue_test.go index 2ca09e3a2cf..1c1da0d1ab8 100644 --- a/engine/execution/ingestion/block_queue/block_queue_test.go +++ b/engine/execution/ingestion/block_queue/block_queue_test.go @@ -20,7 +20,7 @@ func TestSingleBlockBecomeReady(t *testing.T) { blockA := block("A") c1, c2 := coll(1), coll(2) - q := NewBlockQueue() + q := NewBlockQueue(unittest.Logger()) // verify receving a collection (C1) before its block (A) will be ignored executables, err := q.OnCollection(c1) @@ -62,7 +62,7 @@ func TestMultipleBlockBecomesReady(t *testing.T) { block("A"), block("B"), block("C"), block("D"), block("E"), block("F"), block("G") c1, c2, c3, c4, c5, c6 := coll(1), coll(2), coll(3), coll(4), coll(5), coll(6) - q := NewBlockQueue() + q := NewBlockQueue(unittest.Logger()) // verify receiving blocks without collections will return missing collections and no executables missing, executables, err := q.OnBlock(blockA, commitFor("R")) @@ -178,7 +178,7 @@ func TestOnForksWithSameCollections(t *testing.T) { block("A"), block("B"), block("C"), block("D"), block("E"), block("F") c1, c2, c3 := coll(1), coll(2), coll(3) - q := NewBlockQueue() + q := NewBlockQueue(unittest.Logger()) missing, executables, err := q.OnBlock(blockA, commitFor("R")) require.NoError(t, err) @@ -263,6 +263,44 @@ func TestOnForksWithSameCollections(t *testing.T) { requireQueueIsEmpty(t, q) } +func TestOnBlockWithMissingParentCommit(t *testing.T) { + t.Parallel() + // Given a chain + // R <- A(C1) <- B(C2,C3) <- C() <- D() + // - ^------- E(C4,C5) <- F(C6) + // - ^-----------G() + + block, coll, commitFor := makeChainABCDEFG() + blockA, 
blockB := block("A"), block("B") + c1, c2, c3 := coll(1), coll(2), coll(3) + + q := NewBlockQueue(unittest.Logger()) + + missing, executables, err := q.OnBlock(blockA, commitFor("R")) + require.NoError(t, err) + require.Empty(t, executables) + requireCollectionHas(t, missing, c1) + + // block A is executable + executables, err = q.OnCollection(c1) + require.NoError(t, err) + requireExecutableHas(t, executables, blockA) + + // the following two calls create an edge case where A is executed, + // and B is received, however, due to race condition, the parent commit + // was not saved in the database yet + executables, err = q.OnBlockExecuted(blockA.ID(), *commitFor("A")) + require.NoError(t, err) + requireExecutableHas(t, executables) + requireQueueIsEmpty(t, q) + + missing, executables, err = q.OnBlock(blockB, nil) + require.NoError(t, err) + require.Empty(t, executables) + requireCollectionHas(t, missing, c2, c3) + +} + /* ==== Test utils ==== */ // GetBlock("A") => A @@ -416,16 +454,16 @@ func requireExecutableHas(t *testing.T, executables []*entity.ExecutableBlock, b require.Equal(t, 0, len(blocks)) } -func requireCollectionHas(t *testing.T, missing []*flow.CollectionGuarantee, cs ...*flow.Collection) { +func requireCollectionHas(t *testing.T, missing []*MissingCollection, cs ...*flow.Collection) { collections := make(map[flow.Identifier]*flow.Collection, len(cs)) for _, c := range cs { collections[c.ID()] = c } for _, m := range missing { - _, ok := collections[m.CollectionID] + _, ok := collections[m.Guarantee.CollectionID] require.True(t, ok) - delete(collections, m.CollectionID) + delete(collections, m.Guarantee.CollectionID) } require.Equal(t, len(cs), len(missing)) From 13af9da0afc4d96b8c9e3b7b7581f3e5b79e8e4f Mon Sep 17 00:00:00 2001 From: "Leo Zhang (zhangchiqing)" Date: Wed, 24 Jan 2024 14:17:01 -0800 Subject: [PATCH 12/23] handle race condition --- .../ingestion/block_queue/block_queue.go | 45 ++++++++++++++----- 
.../ingestion/block_queue/block_queue_test.go | 24 +++++++++- 2 files changed, 56 insertions(+), 13 deletions(-) diff --git a/engine/execution/ingestion/block_queue/block_queue.go b/engine/execution/ingestion/block_queue/block_queue.go index 54a73f40c1a..3d42d50bd5a 100644 --- a/engine/execution/ingestion/block_queue/block_queue.go +++ b/engine/execution/ingestion/block_queue/block_queue.go @@ -9,6 +9,8 @@ import ( "github.com/rs/zerolog" ) +var ErrMissingParent = fmt.Errorf("missing parent block") + // BlockQueue keeps track of state of blocks and determines which blocks are executable // A block becomes executable when all the following conditions are met: // 1. the block has been validated by consensus algorithm @@ -18,8 +20,14 @@ type BlockQueue struct { sync.Mutex log zerolog.Logger - // when receiving a new block, adding it to the map, and add missing collections to the map + // if a block still exists in this map, it means either some of its collection is missing, + // or its parent block has not been executed. + // if a block's StartState is not nil, it means its parent block has been executed, and + // its parent block must have been removed from this map + // if a block's StartState is nil, it means its parent block has not been executed yet. + // and its parent must be found in the this map as well blocks map[flow.Identifier]*entity.ExecutableBlock + // a collection could be included in multiple blocks, // when a missing block is received, it might trigger multiple blocks to be executable, which // can be looked up by the map @@ -30,8 +38,8 @@ type BlockQueue struct { // blockIDsByHeight is used to find next executable block. 
// when a block is executed, the next executable block must be a block with height = current block height + 1 - // the following map allows us to find the next executable block by height - blockIDsByHeight map[uint64]map[flow.Identifier]*entity.ExecutableBlock // for finding next executable block + // the following map allows us to find the next executable block by height and their parent block ID + blockIDsByHeight map[uint64]map[flow.Identifier]*entity.ExecutableBlock } type MissingCollection struct { @@ -62,11 +70,13 @@ func NewBlockQueue(logger zerolog.Logger) *BlockQueue { } } -// OnBlock is called when a new block is received, and its parent is not executed. -// It returns a list of missing collections and a list of executable blocks -// Note: caller must ensure when OnBlock is called with a block, -// if its parent is not executed, then the parent must be added to the queue first. -// if its parent is executed, then the parent's finalState must be passed in. +// OnBlock is called when a new block is received, the parentFinalState indicates +// whether its parent block has been executed. +// Caller must ensure: +// 1. blocks are passsed in order, i.e. parent block is passed in before its child block +// 2. if a block's parent is not executed, then the parent block must be passed in first +// 3. 
if a block's parent is executed, then the parent's finalState must be passed in +// It returns (nil, nil, nil) if this block is a duplication func (q *BlockQueue) OnBlock(block *flow.Block, parentFinalState *flow.StateCommitment) ( []*MissingCollection, // missing collections []*entity.ExecutableBlock, // blocks ready to execute @@ -125,8 +135,9 @@ func (q *BlockQueue) OnBlock(block *flow.Block, parentFinalState *flow.StateComm if parentFinalState == nil { _, parentExists := q.blocks[block.Header.ParentID] if !parentExists { - return nil, nil, fmt.Errorf("parent block %s of block %s is not in the queue", - block.Header.ParentID, blockID) + return nil, nil, + fmt.Errorf("block %s has no parent commitment, but its parent block %s does not exist in the queue: %w", + blockID, block.Header.ParentID, ErrMissingParent) } } @@ -235,6 +246,8 @@ func (q *BlockQueue) OnCollection(collection *flow.Collection) ([]*entity.Execut // OnBlockExecuted is called when a block is executed // It returns a list of executable blocks (usually its child blocks) +// The caller has to ensure OnBlockExecuted is not called in a wrong order, such as +// OnBlockExecuted(childBlock) being called before OnBlockExecuted(parentBlock). 
func (q *BlockQueue) OnBlockExecuted( blockID flow.Identifier, commit flow.StateCommitment, @@ -258,6 +271,15 @@ func (q *BlockQueue) onBlockExecuted( return nil, nil } + // sanity check + // if a block exists in the queue and is executed, then its parent block + // must not exist in the queue, otherwise the state is inconsistent + _, parentExists := q.blocks[block.Block.Header.ParentID] + if parentExists { + return nil, fmt.Errorf("parent block %s of block %s is in the queue", + block.Block.Header.ParentID, blockID) + } + delete(q.blocks, blockID) // remove height index @@ -326,7 +348,8 @@ func (q *BlockQueue) checkIfChildBlockBecomeExecutable( return executables, nil } -// GetMissingCollections returns the missing collections and the start state +// GetMissingCollections returns the missing collections and the start state for the given block +// Useful for debugging what is missing for the next unexecuted block to become executable. // It returns an error if the block is not found func (q *BlockQueue) GetMissingCollections(blockID flow.Identifier) ( []*MissingCollection, *flow.StateCommitment, error) { diff --git a/engine/execution/ingestion/block_queue/block_queue_test.go b/engine/execution/ingestion/block_queue/block_queue_test.go index 1c1da0d1ab8..376f1138b51 100644 --- a/engine/execution/ingestion/block_queue/block_queue_test.go +++ b/engine/execution/ingestion/block_queue/block_queue_test.go @@ -1,6 +1,7 @@ package block_queue import ( + "errors" "testing" "github.com/stretchr/testify/require" @@ -281,7 +282,7 @@ func TestOnBlockWithMissingParentCommit(t *testing.T) { require.Empty(t, executables) requireCollectionHas(t, missing, c1) - // block A is executable + // block A has all the collections and become executable executables, err = q.OnCollection(c1) require.NoError(t, err) requireExecutableHas(t, executables, blockA) @@ -294,11 +295,30 @@ func TestOnBlockWithMissingParentCommit(t *testing.T) { requireExecutableHas(t, executables) 
requireQueueIsEmpty(t, q) - missing, executables, err = q.OnBlock(blockB, nil) + // verify when race condition happens, ErrMissingParent will be returned + _, _, err = q.OnBlock(blockB, nil) + require.True(t, errors.Is(err, ErrMissingParent), err) + + // verify if called again with parent commit, it will be successful + missing, executables, err = q.OnBlock(blockB, commitFor("A")) require.NoError(t, err) require.Empty(t, executables) requireCollectionHas(t, missing, c2, c3) + // verify after receiving all collections, B becomes executable + executables, err = q.OnCollection(c2) + require.NoError(t, err) + require.Empty(t, executables) + + executables, err = q.OnCollection(c3) + require.NoError(t, err) + requireExecutableHas(t, executables, blockB) + + // verify after B is executed, the queue is empty + executables, err = q.OnBlockExecuted(blockB.ID(), *commitFor("B")) + require.NoError(t, err) + requireExecutableHas(t, executables) + requireQueueIsEmpty(t, q) } /* ==== Test utils ==== */ From 33d907f7a0df6581e53fe5361eb77c52da032068 Mon Sep 17 00:00:00 2001 From: "Leo Zhang (zhangchiqing)" Date: Thu, 1 Feb 2024 08:13:47 -0800 Subject: [PATCH 13/23] rename --- .../ingestion/{block_queue/block_queue.go => blocks/queue.go} | 2 +- .../{block_queue/block_queue_test.go => blocks/queue_test.go} | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) rename engine/execution/ingestion/{block_queue/block_queue.go => blocks/queue.go} (99%) rename engine/execution/ingestion/{block_queue/block_queue_test.go => blocks/queue_test.go} (99%) diff --git a/engine/execution/ingestion/block_queue/block_queue.go b/engine/execution/ingestion/blocks/queue.go similarity index 99% rename from engine/execution/ingestion/block_queue/block_queue.go rename to engine/execution/ingestion/blocks/queue.go index 3d42d50bd5a..bc0347dae31 100644 --- a/engine/execution/ingestion/block_queue/block_queue.go +++ b/engine/execution/ingestion/blocks/queue.go @@ -1,4 +1,4 @@ -package block_queue +package 
blocks import ( "fmt" diff --git a/engine/execution/ingestion/block_queue/block_queue_test.go b/engine/execution/ingestion/blocks/queue_test.go similarity index 99% rename from engine/execution/ingestion/block_queue/block_queue_test.go rename to engine/execution/ingestion/blocks/queue_test.go index 376f1138b51..17add81b3a1 100644 --- a/engine/execution/ingestion/block_queue/block_queue_test.go +++ b/engine/execution/ingestion/blocks/queue_test.go @@ -1,4 +1,4 @@ -package block_queue +package blocks import ( "errors" From 2d04bb960a72ea6906293d0deda7f3b2bf524273 Mon Sep 17 00:00:00 2001 From: "Leo Zhang (zhangchiqing)" Date: Thu, 1 Feb 2024 09:00:13 -0800 Subject: [PATCH 14/23] rename module --- engine/execution/ingestion/{blocks => block_queue}/queue.go | 2 +- .../execution/ingestion/{blocks => block_queue}/queue_test.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) rename engine/execution/ingestion/{blocks => block_queue}/queue.go (99%) rename engine/execution/ingestion/{blocks => block_queue}/queue_test.go (99%) diff --git a/engine/execution/ingestion/blocks/queue.go b/engine/execution/ingestion/block_queue/queue.go similarity index 99% rename from engine/execution/ingestion/blocks/queue.go rename to engine/execution/ingestion/block_queue/queue.go index bc0347dae31..3d42d50bd5a 100644 --- a/engine/execution/ingestion/blocks/queue.go +++ b/engine/execution/ingestion/block_queue/queue.go @@ -1,4 +1,4 @@ -package blocks +package block_queue import ( "fmt" diff --git a/engine/execution/ingestion/blocks/queue_test.go b/engine/execution/ingestion/block_queue/queue_test.go similarity index 99% rename from engine/execution/ingestion/blocks/queue_test.go rename to engine/execution/ingestion/block_queue/queue_test.go index 17add81b3a1..376f1138b51 100644 --- a/engine/execution/ingestion/blocks/queue_test.go +++ b/engine/execution/ingestion/block_queue/queue_test.go @@ -1,4 +1,4 @@ -package blocks +package block_queue import ( "errors" From 
895e1a3575c77b847c50b2d364f1088eaa9acc2c Mon Sep 17 00:00:00 2001 From: "Leo Zhang (zhangchiqing)" Date: Mon, 5 Feb 2024 09:25:59 -0800 Subject: [PATCH 15/23] fix lint --- engine/execution/ingestion/block_queue/queue.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/engine/execution/ingestion/block_queue/queue.go b/engine/execution/ingestion/block_queue/queue.go index 3d42d50bd5a..b1a37ce9282 100644 --- a/engine/execution/ingestion/block_queue/queue.go +++ b/engine/execution/ingestion/block_queue/queue.go @@ -4,9 +4,10 @@ import ( "fmt" "sync" + "github.com/rs/zerolog" + "github.com/onflow/flow-go/model/flow" "github.com/onflow/flow-go/module/mempool/entity" - "github.com/rs/zerolog" ) var ErrMissingParent = fmt.Errorf("missing parent block") From 2ddeaf73169032e8acdec68339dd6c894a3a3193 Mon Sep 17 00:00:00 2001 From: "Leo Zhang (zhangchiqing)" Date: Thu, 29 Feb 2024 15:33:45 -0800 Subject: [PATCH 16/23] address review comments --- .../execution/ingestion/block_queue/queue.go | 11 +++--- .../ingestion/block_queue/queue_test.go | 34 +++++++++---------- 2 files changed, 24 insertions(+), 21 deletions(-) diff --git a/engine/execution/ingestion/block_queue/queue.go b/engine/execution/ingestion/block_queue/queue.go index b1a37ce9282..8708424d24c 100644 --- a/engine/execution/ingestion/block_queue/queue.go +++ b/engine/execution/ingestion/block_queue/queue.go @@ -71,14 +71,14 @@ func NewBlockQueue(logger zerolog.Logger) *BlockQueue { } } -// OnBlock is called when a new block is received, the parentFinalState indicates +// HandleBlock is called when a new block is received, the parentFinalState indicates // whether its parent block has been executed. // Caller must ensure: // 1. blocks are passsed in order, i.e. parent block is passed in before its child block // 2. if a block's parent is not executed, then the parent block must be passed in first // 3. 
if a block's parent is executed, then the parent's finalState must be passed in // It returns (nil, nil, nil) if this block is a duplication -func (q *BlockQueue) OnBlock(block *flow.Block, parentFinalState *flow.StateCommitment) ( +func (q *BlockQueue) HandleBlock(block *flow.Block, parentFinalState *flow.StateCommitment) ( []*MissingCollection, // missing collections []*entity.ExecutableBlock, // blocks ready to execute error, // exceptions @@ -112,7 +112,10 @@ func (q *BlockQueue) OnBlock(block *flow.Block, parentFinalState *flow.StateComm return nil, executables, nil } - // this is an edge case could be ignored + // this means the caller think it's parent has not been executed, but the queue's internal state + // shows the parent has been executed, then it's probably a race condition where the call to + // inform the parent block has been executed arrives earlier than this call, which is an edge case + // and we can simply ignore this call. if executable.StartState != nil && parentFinalState == nil { q.log.Warn(). Str("blockID", blockID.String()). 
@@ -229,7 +232,7 @@ func (q *BlockQueue) OnCollection(collection *flow.Collection) ([]*entity.Execut // update collection colInfo.Collection.Transactions = collection.Transactions - // check if any block, which includes this collection, become executable + // check if any block, which includes this collection, became executable executables := make([]*entity.ExecutableBlock, 0, len(colInfo.IncludedIn)) for _, block := range colInfo.IncludedIn { if !block.IsComplete() { diff --git a/engine/execution/ingestion/block_queue/queue_test.go b/engine/execution/ingestion/block_queue/queue_test.go index 376f1138b51..98d0d31fbc1 100644 --- a/engine/execution/ingestion/block_queue/queue_test.go +++ b/engine/execution/ingestion/block_queue/queue_test.go @@ -29,7 +29,7 @@ func TestSingleBlockBecomeReady(t *testing.T) { requireExecutableHas(t, executables) // verify receving a block (A) will return missing collection (C1) - missing, executables, err := q.OnBlock(blockA, commitFor("R")) + missing, executables, err := q.HandleBlock(blockA, commitFor("R")) require.NoError(t, err) require.Empty(t, executables) requireCollectionHas(t, missing, c1) @@ -66,34 +66,34 @@ func TestMultipleBlockBecomesReady(t *testing.T) { q := NewBlockQueue(unittest.Logger()) // verify receiving blocks without collections will return missing collections and no executables - missing, executables, err := q.OnBlock(blockA, commitFor("R")) + missing, executables, err := q.HandleBlock(blockA, commitFor("R")) require.NoError(t, err) require.Empty(t, executables) requireCollectionHas(t, missing, c1) - missing, executables, err = q.OnBlock(blockB, nil) + missing, executables, err = q.HandleBlock(blockB, nil) require.NoError(t, err) require.Empty(t, executables) // because A is not executed requireCollectionHas(t, missing, c2, c3) // creating forks - missing, executables, err = q.OnBlock(blockE, nil) + missing, executables, err = q.HandleBlock(blockE, nil) require.NoError(t, err) require.Empty(t, executables) // 
because A is not executed requireCollectionHas(t, missing, c4, c5) // creating forks with empty block - missing, executables, err = q.OnBlock(blockG, nil) + missing, executables, err = q.HandleBlock(blockG, nil) require.NoError(t, err) require.Empty(t, executables) // because E is not executed requireCollectionHas(t, missing) - missing, executables, err = q.OnBlock(blockF, nil) + missing, executables, err = q.HandleBlock(blockF, nil) require.NoError(t, err) require.Empty(t, executables) // because E is not executed requireCollectionHas(t, missing, c6) - missing, executables, err = q.OnBlock(blockC, nil) + missing, executables, err = q.HandleBlock(blockC, nil) require.NoError(t, err) require.Empty(t, executables) // because B is not executed require.Empty(t, missing) @@ -147,7 +147,7 @@ func TestMultipleBlockBecomesReady(t *testing.T) { requireExecutableHas(t, executables) // verify receiving a block whose parent was executed before - missing, executables, err = q.OnBlock(blockD, commitFor("C")) + missing, executables, err = q.HandleBlock(blockD, commitFor("C")) require.NoError(t, err) require.Empty(t, missing) requireExecutableHas(t, executables, blockD) @@ -181,26 +181,26 @@ func TestOnForksWithSameCollections(t *testing.T) { q := NewBlockQueue(unittest.Logger()) - missing, executables, err := q.OnBlock(blockA, commitFor("R")) + missing, executables, err := q.HandleBlock(blockA, commitFor("R")) require.NoError(t, err) requireExecutableHas(t, executables, blockA) requireCollectionHas(t, missing) // receiving block B and D which have the same collections (C1, C2) - missing, executables, err = q.OnBlock(blockB, nil) + missing, executables, err = q.HandleBlock(blockB, nil) require.NoError(t, err) require.Empty(t, executables) requireCollectionHas(t, missing, c1, c2) // receiving block F (C1, C2, C3) - missing, executables, err = q.OnBlock(blockF, nil) + missing, executables, err = q.HandleBlock(blockF, nil) require.NoError(t, err) require.Empty(t, executables) 
requireCollectionHas(t, missing, c3) // c1 and c2 are requested before, only c3 is missing // verify receiving D will not return any missing collections because // missing collections were returned when receiving B - missing, executables, err = q.OnBlock(blockD, nil) + missing, executables, err = q.HandleBlock(blockD, nil) require.NoError(t, err) require.Empty(t, executables) requireCollectionHas(t, missing) @@ -223,12 +223,12 @@ func TestOnForksWithSameCollections(t *testing.T) { // but only one block (C) whose parent (B) is executed, then only that block (C) becomes executable // the other block (E) is not executable - missing, executables, err = q.OnBlock(blockC, nil) + missing, executables, err = q.HandleBlock(blockC, nil) require.NoError(t, err) require.Empty(t, executables) requireCollectionHas(t, missing) // because C3 is requested when F is received - missing, executables, err = q.OnBlock(blockE, nil) + missing, executables, err = q.HandleBlock(blockE, nil) require.NoError(t, err) require.Empty(t, executables) requireCollectionHas(t, missing) @@ -277,7 +277,7 @@ func TestOnBlockWithMissingParentCommit(t *testing.T) { q := NewBlockQueue(unittest.Logger()) - missing, executables, err := q.OnBlock(blockA, commitFor("R")) + missing, executables, err := q.HandleBlock(blockA, commitFor("R")) require.NoError(t, err) require.Empty(t, executables) requireCollectionHas(t, missing, c1) @@ -296,11 +296,11 @@ func TestOnBlockWithMissingParentCommit(t *testing.T) { requireQueueIsEmpty(t, q) // verify when race condition happens, ErrMissingParent will be returned - _, _, err = q.OnBlock(blockB, nil) + _, _, err = q.HandleBlock(blockB, nil) require.True(t, errors.Is(err, ErrMissingParent), err) // verify if called again with parent commit, it will be successful - missing, executables, err = q.OnBlock(blockB, commitFor("A")) + missing, executables, err = q.HandleBlock(blockB, commitFor("A")) require.NoError(t, err) require.Empty(t, executables) requireCollectionHas(t, 
missing, c2, c3) From 3e49775c57fe37fcb8c301d34bb8d2398cc47844 Mon Sep 17 00:00:00 2001 From: "Leo Zhang (zhangchiqing)" Date: Thu, 29 Feb 2024 15:39:12 -0800 Subject: [PATCH 17/23] rename OnCollection to HandleCollection --- .../execution/ingestion/block_queue/queue.go | 4 +-- .../ingestion/block_queue/queue_test.go | 30 +++++++++---------- 2 files changed, 17 insertions(+), 17 deletions(-) diff --git a/engine/execution/ingestion/block_queue/queue.go b/engine/execution/ingestion/block_queue/queue.go index 8708424d24c..8fed60ba141 100644 --- a/engine/execution/ingestion/block_queue/queue.go +++ b/engine/execution/ingestion/block_queue/queue.go @@ -205,9 +205,9 @@ func (q *BlockQueue) HandleBlock(block *flow.Block, parentFinalState *flow.State return missingCollections, executables, nil } -// OnCollection is called when a new collection is received +// HandleCollection is called when a new collection is received // It returns a list of executable blocks that contains the collection -func (q *BlockQueue) OnCollection(collection *flow.Collection) ([]*entity.ExecutableBlock, error) { +func (q *BlockQueue) HandleCollection(collection *flow.Collection) ([]*entity.ExecutableBlock, error) { q.Lock() defer q.Unlock() // when a collection is received, we find the blocks the collection is included in, diff --git a/engine/execution/ingestion/block_queue/queue_test.go b/engine/execution/ingestion/block_queue/queue_test.go index 98d0d31fbc1..664280b8320 100644 --- a/engine/execution/ingestion/block_queue/queue_test.go +++ b/engine/execution/ingestion/block_queue/queue_test.go @@ -24,7 +24,7 @@ func TestSingleBlockBecomeReady(t *testing.T) { q := NewBlockQueue(unittest.Logger()) // verify receving a collection (C1) before its block (A) will be ignored - executables, err := q.OnCollection(c1) + executables, err := q.HandleCollection(c1) require.NoError(t, err) requireExecutableHas(t, executables) @@ -35,12 +35,12 @@ func TestSingleBlockBecomeReady(t *testing.T) { 
requireCollectionHas(t, missing, c1) // verify receving a collection (C2) that is not for the block (A) will be ignored - executables, err = q.OnCollection(c2) + executables, err = q.HandleCollection(c2) require.NoError(t, err) requireExecutableHas(t, executables) // verify after receiving all collections (C1), block (A) becomes executable - executables, err = q.OnCollection(c1) + executables, err = q.HandleCollection(c1) require.NoError(t, err) requireExecutableHas(t, executables, blockA) @@ -99,12 +99,12 @@ func TestMultipleBlockBecomesReady(t *testing.T) { require.Empty(t, missing) // verify receiving all collections makes block executable - executables, err = q.OnCollection(c1) + executables, err = q.HandleCollection(c1) require.NoError(t, err) requireExecutableHas(t, executables, blockA) // verify receiving partial collections won't make block executable - executables, err = q.OnCollection(c2) + executables, err = q.HandleCollection(c2) require.NoError(t, err) requireExecutableHas(t, executables) // because A is not executed and C3 is not received for B to be executable @@ -116,19 +116,19 @@ func TestMultipleBlockBecomesReady(t *testing.T) { // verify when parent block (A) has been executed, the child block (B) has all the collections // it will become executable - executables, err = q.OnCollection(c3) + executables, err = q.HandleCollection(c3) require.NoError(t, err) requireExecutableHas(t, executables, blockB) // c2, c3 are received, blockB is executable - executables, err = q.OnCollection(c5) + executables, err = q.HandleCollection(c5) require.NoError(t, err) requireExecutableHas(t, executables) // c2, c3 are received, blockB is executable - executables, err = q.OnCollection(c6) + executables, err = q.HandleCollection(c6) require.NoError(t, err) requireExecutableHas(t, executables) // c2, c3 are received, blockB is executable - executables, err = q.OnCollection(c4) + executables, err = q.HandleCollection(c4) require.NoError(t, err) requireExecutableHas(t, 
executables, blockE) // c2, c3 are received, blockB is executable @@ -206,7 +206,7 @@ func TestOnForksWithSameCollections(t *testing.T) { requireCollectionHas(t, missing) // verify receiving all collections makes all blocks executable - executables, err = q.OnCollection(c1) + executables, err = q.HandleCollection(c1) require.NoError(t, err) requireExecutableHas(t, executables) @@ -215,7 +215,7 @@ func TestOnForksWithSameCollections(t *testing.T) { require.NoError(t, err) requireExecutableHas(t, executables) // because C2 is not received - executables, err = q.OnCollection(c2) + executables, err = q.HandleCollection(c2) require.NoError(t, err) requireExecutableHas(t, executables, blockB, blockD) @@ -239,7 +239,7 @@ func TestOnForksWithSameCollections(t *testing.T) { // verify C and F are executable, because their parent have been executed // E is not executable, because E's parent (D) is not executed yet. - executables, err = q.OnCollection(c3) + executables, err = q.HandleCollection(c3) require.NoError(t, err) requireExecutableHas(t, executables, blockC, blockF) @@ -283,7 +283,7 @@ func TestOnBlockWithMissingParentCommit(t *testing.T) { requireCollectionHas(t, missing, c1) // block A has all the collections and become executable - executables, err = q.OnCollection(c1) + executables, err = q.HandleCollection(c1) require.NoError(t, err) requireExecutableHas(t, executables, blockA) @@ -306,11 +306,11 @@ func TestOnBlockWithMissingParentCommit(t *testing.T) { requireCollectionHas(t, missing, c2, c3) // verify after receiving all collections, B becomes executable - executables, err = q.OnCollection(c2) + executables, err = q.HandleCollection(c2) require.NoError(t, err) require.Empty(t, executables) - executables, err = q.OnCollection(c3) + executables, err = q.HandleCollection(c3) require.NoError(t, err) requireExecutableHas(t, executables, blockB) From 92a7f7045df81434da55061d6b5fa1a5e298b4d1 Mon Sep 17 00:00:00 2001 From: "Leo Zhang (zhangchiqing)" Date: Fri, 1 Mar 
2024 09:18:55 -0800 Subject: [PATCH 18/23] address review comments --- .../execution/ingestion/block_queue/queue.go | 99 +++++++++++-------- 1 file changed, 57 insertions(+), 42 deletions(-) diff --git a/engine/execution/ingestion/block_queue/queue.go b/engine/execution/ingestion/block_queue/queue.go index 8fed60ba141..905dcfca086 100644 --- a/engine/execution/ingestion/block_queue/queue.go +++ b/engine/execution/ingestion/block_queue/queue.go @@ -89,51 +89,13 @@ func (q *BlockQueue) HandleBlock(block *flow.Block, parentFinalState *flow.State // check if the block already exists blockID := block.ID() executable, ok := q.blocks[blockID] - // handle the case where the block has seen before if ok { - // we have already received this block, and its parent still has not been executed yet - if executable.StartState == nil && parentFinalState == nil { - return nil, nil, nil - } - - // this is an edge case where parentFinalState is provided, and its parent block exists - // in the queue but has not been marked as executed yet (OnBlockExecuted(parent) is not called), - // in this case, we will internally call OnBlockExecuted(parentBlockID, parentFinalState). - // there is no need to create the executable block again, since it's already created. - if executable.StartState == nil && parentFinalState != nil { - executables, err := q.onBlockExecuted(block.Header.ParentID, *parentFinalState) - if err != nil { - return nil, nil, fmt.Errorf("receiving block %v with parent commitment %v, but parent block %v already exists with no commitment, fail to call mark parent as executed: %w", - blockID, *parentFinalState, block.Header.ParentID, err) - } - - // we already have this block, its collection must have been fetched, so we only return the - // executables from marking its parent as executed. 
- return nil, executables, nil - } - - // this means the caller think it's parent has not been executed, but the queue's internal state - // shows the parent has been executed, then it's probably a race condition where the call to - // inform the parent block has been executed arrives earlier than this call, which is an edge case - // and we can simply ignore this call. - if executable.StartState != nil && parentFinalState == nil { - q.log.Warn(). - Str("blockID", blockID.String()). - Hex("parentID", block.Header.ParentID[:]). - Msg("edge case: receiving block with no parent commitment, but its parent block actually has been executed") - return nil, nil, nil - } - - // this is an exception that should not happen - if *executable.StartState != *parentFinalState { - return nil, nil, - fmt.Errorf("block %s has already been executed with a different parent final state, %v != %v", - blockID, *executable.StartState, parentFinalState) - } - - return nil, nil, nil + // handle the case where the block has seen before + return q.handleKnownBlock(executable, parentFinalState) } + // handling a new block + // if parentFinalState is not provided, then its parent block must exists in the queue // otherwise it's an exception if parentFinalState == nil { @@ -262,6 +224,59 @@ func (q *BlockQueue) OnBlockExecuted( return q.onBlockExecuted(blockID, commit) } +func (q *BlockQueue) handleKnownBlock(executable *entity.ExecutableBlock, parentFinalState *flow.StateCommitment) ( + []*MissingCollection, // missing collections + []*entity.ExecutableBlock, // blocks ready to execute + error, // exceptions +) { + // we have already received this block, and its parent still has not been executed yet + if executable.StartState == nil && parentFinalState == nil { + return nil, nil, nil + } + + // this is an edge case where parentFinalState is provided, and its parent block exists + // in the queue but has not been marked as executed yet (OnBlockExecuted(parent) is not called), + // in this case, 
we will internally call OnBlockExecuted(parentBlockID, parentFinalState). + // there is no need to create the executable block again, since it's already created. + if executable.StartState == nil && parentFinalState != nil { + executables, err := q.onBlockExecuted(executable.Block.Header.ParentID, *parentFinalState) + if err != nil { + return nil, nil, fmt.Errorf("receiving block %v with parent commitment %v, but parent block %v already exists with no commitment, fail to call mark parent as executed: %w", + executable.ID(), *parentFinalState, executable.Block.Header.ParentID, err) + } + + // we already have this block, its collection must have been fetched, so we only return the + // executables from marking its parent as executed. + return nil, executables, nil + } + + // this means the caller thinks its parent has not been executed, but the queue's internal state + // shows the parent has been executed, then it's probably a race condition where the call to + // inform the parent block has been executed arrives earlier than this call, which is an edge case + // and we can simply ignore this call. + if executable.StartState != nil && parentFinalState == nil { + q.log.Warn(). + Str("blockID", executable.ID().String()). + Uint64("height", executable.Block.Header.Height). + Hex("parentID", executable.Block.Header.ParentID[:]). + Msg("edge case: receiving block with no parent commitment, but its parent block actually has been executed") + return nil, nil, nil + } + + // this is an exception that should not happen + if *executable.StartState != *parentFinalState { + return nil, nil, + fmt.Errorf("block %s has already been executed with a different parent final state, %v != %v", + executable.ID(), *executable.StartState, parentFinalState) + } + + q.log.Warn(). + Str("blockID", executable.ID().String()). + Uint64("height", executable.Block.Header.Height). 
+ Msg("edge case: OnBlockExecuted is called with the same arguments again") + return nil, nil, nil +} + func (q *BlockQueue) onBlockExecuted( blockID flow.Identifier, commit flow.StateCommitment, From 00c0818cc26cbfd8c541fc7f00b48aecaa0d5ed4 Mon Sep 17 00:00:00 2001 From: "Leo Zhang (zhangchiqing)" Date: Fri, 1 Mar 2024 09:34:48 -0800 Subject: [PATCH 19/23] address review comments --- engine/execution/ingestion/block_queue/queue.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/engine/execution/ingestion/block_queue/queue.go b/engine/execution/ingestion/block_queue/queue.go index 905dcfca086..dafdd528ffb 100644 --- a/engine/execution/ingestion/block_queue/queue.go +++ b/engine/execution/ingestion/block_queue/queue.go @@ -371,7 +371,10 @@ func (q *BlockQueue) checkIfChildBlockBecomeExecutable( // Useful for debugging what is missing for the next unexecuted block to become executable. // It returns an error if the block is not found func (q *BlockQueue) GetMissingCollections(blockID flow.Identifier) ( - []*MissingCollection, *flow.StateCommitment, error) { + []*MissingCollection, + *flow.StateCommitment, + error, +) { q.Lock() defer q.Unlock() block, ok := q.blocks[blockID] From 5e8f6d2a65d541c54705fb217d6a45cb549302d4 Mon Sep 17 00:00:00 2001 From: "Leo Zhang (zhangchiqing)" Date: Mon, 4 Mar 2024 18:18:37 -0800 Subject: [PATCH 20/23] add test --- .../ingestion/block_queue/queue_test.go | 44 +++++++++++++++++++ 1 file changed, 44 insertions(+) diff --git a/engine/execution/ingestion/block_queue/queue_test.go b/engine/execution/ingestion/block_queue/queue_test.go index 664280b8320..ce496c163ae 100644 --- a/engine/execution/ingestion/block_queue/queue_test.go +++ b/engine/execution/ingestion/block_queue/queue_test.go @@ -168,6 +168,50 @@ func TestMultipleBlockBecomesReady(t *testing.T) { requireQueueIsEmpty(t, q) } +func TestOneReadyAndMultiplePending(t *testing.T) { + t.Parallel() + // Given a chain + // R() <- A() <- B(C1, C2) <- C(C3) + // - 
^----- D(C1, C2) <- E(C3) + // - ^----- F(C1, C2, C3) + block, coll, commitFor := makeChainABCDEF() + blockA, blockB, blockC := block("A"), block("B"), block("C") + c1, c2, c3 := coll(1), coll(2), coll(3) + + q := NewBlockQueue(unittest.Logger()) + _, _, err := q.HandleBlock(blockA, commitFor("R")) + require.NoError(t, err) + + // received B when A is not executed + missing, executables, err := q.HandleBlock(blockB, nil) + require.NoError(t, err) + require.Empty(t, executables) + requireCollectionHas(t, missing, c1, c2) + + executables, err = q.HandleCollection(c1) + require.NoError(t, err) + + executables, err = q.HandleCollection(c2) + require.NoError(t, err) + + // received C when B is not executed + missing, executables, err = q.HandleBlock(blockB, nil) + require.NoError(t, err) + + executables, err = q.HandleCollection(c3) + require.NoError(t, err) + + // A is executed + executables, err = q.OnBlockExecuted(blockA.ID(), *commitFor("A")) + require.NoError(t, err) + requireExecutableHas(t, executables, blockB) // B is executable + + // B is executed + executables, err = q.OnBlockExecuted(blockB.ID(), *commitFor("B")) + require.NoError(t, err) + requireExecutableHas(t, executables, blockC) // C is executable +} + func TestOnForksWithSameCollections(t *testing.T) { t.Parallel() // Given a chain From b88346b97dab79b9a81d342301a7e5cabda25cd2 Mon Sep 17 00:00:00 2001 From: "Leo Zhang (zhangchiqing)" Date: Tue, 5 Mar 2024 09:51:55 -0800 Subject: [PATCH 21/23] fix test --- engine/execution/ingestion/block_queue/queue_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/engine/execution/ingestion/block_queue/queue_test.go b/engine/execution/ingestion/block_queue/queue_test.go index ce496c163ae..c17a6175c1e 100644 --- a/engine/execution/ingestion/block_queue/queue_test.go +++ b/engine/execution/ingestion/block_queue/queue_test.go @@ -195,7 +195,7 @@ func TestOneReadyAndMultiplePending(t *testing.T) { require.NoError(t, err) // received C when B is
not executed - missing, executables, err = q.HandleBlock(blockB, nil) + missing, executables, err = q.HandleBlock(blockC, nil) require.NoError(t, err) executables, err = q.HandleCollection(c3) From e39315a2ea75a434f7bd89a075c1b2c1843ea050 Mon Sep 17 00:00:00 2001 From: "Leo Zhang (zhangchiqing)" Date: Thu, 7 Mar 2024 09:27:44 -0800 Subject: [PATCH 22/23] fix tests lint --- engine/execution/ingestion/block_queue/queue_test.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/engine/execution/ingestion/block_queue/queue_test.go b/engine/execution/ingestion/block_queue/queue_test.go index c17a6175c1e..baf72c21162 100644 --- a/engine/execution/ingestion/block_queue/queue_test.go +++ b/engine/execution/ingestion/block_queue/queue_test.go @@ -188,17 +188,17 @@ func TestOneReadyAndMultiplePending(t *testing.T) { require.Empty(t, executables) requireCollectionHas(t, missing, c1, c2) - executables, err = q.HandleCollection(c1) + _, err = q.HandleCollection(c1) require.NoError(t, err) - executables, err = q.HandleCollection(c2) + _, err = q.HandleCollection(c2) require.NoError(t, err) // received C when B is not executed - missing, executables, err = q.HandleBlock(blockC, nil) + _, _, err = q.HandleBlock(blockC, nil) require.NoError(t, err) - executables, err = q.HandleCollection(c3) + _, err = q.HandleCollection(c3) require.NoError(t, err) // A is executed From 63671f8fa144e5f35876845d15e3ea08f0db5a2c Mon Sep 17 00:00:00 2001 From: "Leo Zhang (zhangchiqing)" Date: Tue, 26 Mar 2024 16:35:19 -0700 Subject: [PATCH 23/23] update comments --- engine/execution/ingestion/block_queue/queue.go | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/engine/execution/ingestion/block_queue/queue.go b/engine/execution/ingestion/block_queue/queue.go index dafdd528ffb..d3ea82b005b 100644 --- a/engine/execution/ingestion/block_queue/queue.go +++ b/engine/execution/ingestion/block_queue/queue.go @@ -21,12 +21,17 @@ type BlockQueue struct 
{ sync.Mutex log zerolog.Logger - // if a block still exists in this map, it means either some of its collection is missing, - // or its parent block has not been executed. - // if a block's StartState is not nil, it means its parent block has been executed, and - // its parent block must have been removed from this map - // if a block's StartState is nil, it means its parent block has not been executed yet. - // and its parent must be found in the this map as well + // if a block still exists in this map, it means the block has not been executed. + // it could be any one of the following cases: + // 1) block is not executed because some of its collections are missing + // 2) block is not executed because its parent block has not been executed + // 3) block is ready to execute, but the execution has not been finished yet. + // some consistency checks: + // 1) since an executed block must have been removed from this map, if a block's + // parent block has been executed, then its parent block must have been removed + // from this map + // 2) if a block's parent block has not been executed, then its parent block must still + // exist in this map blocks map[flow.Identifier]*entity.ExecutableBlock // a collection could be included in multiple blocks,