Skip to content

Commit

Permalink
Merge pull request #6850 from onflow/tim/5357-immutable-chunk-assignment
Browse files Browse the repository at this point in the history
Make chunk assignment immutable
  • Loading branch information
tim-barry authored Jan 13, 2025
2 parents 23c5222 + 115aeb5 commit b740fc0
Show file tree
Hide file tree
Showing 15 changed files with 186 additions and 125 deletions.
7 changes: 5 additions & 2 deletions engine/consensus/approvals/approval_collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,11 @@ func NewApprovalCollector(
) (*ApprovalCollector, error) {
chunkCollectors := make([]*ChunkApprovalCollector, 0, result.Result.Chunks.Len())
for _, chunk := range result.Result.Chunks {
chunkAssignment := assignment.Verifiers(chunk).Lookup()
collector := NewChunkApprovalCollector(chunkAssignment, requiredApprovalsForSealConstruction)
assignedVerifiers, err := assignment.Verifiers(chunk.Index)
if err != nil {
return nil, fmt.Errorf("getting verifiers for chunk %d failed: %w", chunk.Index, err)
}
collector := NewChunkApprovalCollector(assignedVerifiers, requiredApprovalsForSealConstruction)
chunkCollectors = append(chunkCollectors, collector)
}

Expand Down
9 changes: 8 additions & 1 deletion engine/consensus/approvals/approval_collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,14 @@ func (s *ApprovalCollectorTestSuite) TestCollectMissingVerifiers() {

assignedVerifiers := make(map[uint64]flow.IdentifierList)
for _, chunk := range s.Chunks {
assignedVerifiers[chunk.Index] = s.ChunksAssignment.Verifiers(chunk)
verifiers, err := s.ChunksAssignment.Verifiers(chunk.Index)
require.NoError(s.T(), err)
// we need a consistent iteration order later, so convert to slice
v := make([]flow.Identifier, 0, len(verifiers))
for id := range verifiers {
v = append(v, id)
}
assignedVerifiers[chunk.Index] = v
}

// no approvals processed
Expand Down
7 changes: 3 additions & 4 deletions engine/consensus/approvals/chunk_collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,9 @@ type ChunkApprovalCollectorTestSuite struct {
func (s *ChunkApprovalCollectorTestSuite) SetupTest() {
s.BaseApprovalsTestSuite.SetupTest()
s.chunk = s.Chunks[0]
s.chunkAssignment = make(map[flow.Identifier]struct{})
for _, verifier := range s.ChunksAssignment.Verifiers(s.chunk) {
s.chunkAssignment[verifier] = struct{}{}
}
verifiers, err := s.ChunksAssignment.Verifiers(s.chunk.Index)
require.NoError(s.T(), err)
s.chunkAssignment = verifiers
s.collector = NewChunkApprovalCollector(s.chunkAssignment, uint(len(s.chunkAssignment)))
}

Expand Down
6 changes: 4 additions & 2 deletions engine/consensus/approvals/testutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"github.com/gammazero/workerpool"
"github.com/onflow/crypto/hash"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"

"github.com/onflow/flow-go/model/chunks"
Expand Down Expand Up @@ -42,7 +43,7 @@ func (s *BaseApprovalsTestSuite) SetupTest() {
s.Block = unittest.BlockHeaderWithParentFixture(s.ParentBlock)
verifiers := make(flow.IdentifierList, 0)
s.AuthorizedVerifiers = make(map[flow.Identifier]*flow.Identity)
s.ChunksAssignment = chunks.NewAssignment()
assignmentBuilder := chunks.NewAssignmentBuilder()
s.Chunks = unittest.ChunkListFixture(50, s.Block.ID())
// mock public key to mock signature verifications
s.PublicKey = &module.PublicKey{}
Expand All @@ -59,8 +60,9 @@ func (s *BaseApprovalsTestSuite) SetupTest() {

// create assignment
for _, chunk := range s.Chunks {
s.ChunksAssignment.Add(chunk, verifiers)
require.NoError(s.T(), assignmentBuilder.Add(chunk.Index, verifiers))
}
s.ChunksAssignment = assignmentBuilder.Build()

s.VerID = verifiers[0]
result := unittest.ExecutionResultFixture()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -302,13 +302,18 @@ func (s *AssignmentCollectorTestSuite) TestProcessApproval_BeforeIncorporatedRes
// rate limiting is respected.
func (s *AssignmentCollectorTestSuite) TestRequestMissingApprovals() {
// build new assignment with 2 verifiers
assignment := chunks.NewAssignment()
assignmentBuilder := chunks.NewAssignmentBuilder()
for _, chunk := range s.Chunks {
verifiers := s.ChunksAssignment.Verifiers(chunk)
assignment.Add(chunk, verifiers[:2])
verifiers, err := s.ChunksAssignment.Verifiers(chunk.Index)
require.NoError(s.T(), err)
v := make([]flow.Identifier, 0, len(verifiers))
for id := range verifiers {
v = append(v, id)
}
require.NoError(s.T(), assignmentBuilder.Add(chunk.Index, v[:2]))
}
// replace old one
s.ChunksAssignment = assignment
s.ChunksAssignment = assignmentBuilder.Build()

incorporatedBlocks := make([]*flow.Header, 0)

Expand Down
5 changes: 3 additions & 2 deletions engine/consensus/sealing/core_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -594,12 +594,13 @@ func (s *ApprovalProcessingCoreTestSuite) TestRequestPendingApprovals() {

prevResult = ir.Result

s.ChunksAssignment = chunks.NewAssignment()
assignmentBuilder := chunks.NewAssignmentBuilder()

for _, chunk := range ir.Result.Chunks {
// assign the verifier to this chunk
s.ChunksAssignment.Add(chunk, verifiers)
require.NoError(s.T(), assignmentBuilder.Add(chunk.Index, verifiers))
}
s.ChunksAssignment = assignmentBuilder.Build()

err := s.core.processIncorporatedResult(ir)
require.NoError(s.T(), err)
Expand Down
32 changes: 16 additions & 16 deletions engine/verification/assigner/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func SetupTest(options ...func(suite *AssignerEngineTestSuite)) *AssignerEngineT

// createContainerBlock creates and returns a block that contains an execution receipt, with its corresponding chunks assignment based
// on the input options.
func createContainerBlock(options ...func(result *flow.ExecutionResult, assignments *chunks.Assignment)) (*flow.Block, *chunks.Assignment) {
func createContainerBlock(options ...func(result *flow.ExecutionResult, assignments *chunks.AssignmentBuilder)) (*flow.Block, *chunks.Assignment) {
result, assignment := vertestutils.CreateExecutionResult(unittest.IdentifierFixture(), options...)
receipt := &flow.ExecutionReceipt{
ExecutorID: unittest.IdentifierFixture(),
Expand Down Expand Up @@ -159,7 +159,7 @@ func newBlockHappyPath(t *testing.T) {
// one assigned chunk to verification node.
containerBlock, assignment := createContainerBlock(
vertestutils.WithChunks(
vertestutils.WithAssignee(s.myID())))
vertestutils.WithAssignee(t, s.myID())))
result := containerBlock.Payload.Results[0]
s.mockStateAtBlockID(result.BlockID)
chunksNum := s.mockChunkAssigner(flow.NewIncorporatedResult(containerBlock.ID(), result), assignment)
Expand Down Expand Up @@ -206,9 +206,9 @@ func newBlockVerifierNotAuthorized(t *testing.T) {
// no assigned chunk to verification node.
containerBlock, _ := createContainerBlock(
vertestutils.WithChunks( // all chunks assigned to some (random) identifiers, but not this verification node
vertestutils.WithAssignee(unittest.IdentifierFixture()),
vertestutils.WithAssignee(unittest.IdentifierFixture()),
vertestutils.WithAssignee(unittest.IdentifierFixture())))
vertestutils.WithAssignee(t, unittest.IdentifierFixture()),
vertestutils.WithAssignee(t, unittest.IdentifierFixture()),
vertestutils.WithAssignee(t, unittest.IdentifierFixture())))
result := containerBlock.Payload.Results[0]
s.mockStateAtBlockID(result.BlockID)

Expand Down Expand Up @@ -300,11 +300,11 @@ func newBlockNoAssignedChunk(t *testing.T) {
// none of them is assigned to this verification node.
containerBlock, assignment := createContainerBlock(
vertestutils.WithChunks(
vertestutils.WithAssignee(unittest.IdentifierFixture()), // assigned to others
vertestutils.WithAssignee(unittest.IdentifierFixture()), // assigned to others
vertestutils.WithAssignee(unittest.IdentifierFixture()), // assigned to others
vertestutils.WithAssignee(unittest.IdentifierFixture()), // assigned to others
vertestutils.WithAssignee(unittest.IdentifierFixture()))) // assigned to others
vertestutils.WithAssignee(t, unittest.IdentifierFixture()), // assigned to others
vertestutils.WithAssignee(t, unittest.IdentifierFixture()), // assigned to others
vertestutils.WithAssignee(t, unittest.IdentifierFixture()), // assigned to others
vertestutils.WithAssignee(t, unittest.IdentifierFixture()), // assigned to others
vertestutils.WithAssignee(t, unittest.IdentifierFixture()))) // assigned to others
result := containerBlock.Payload.Results[0]
s.mockStateAtBlockID(result.BlockID)
chunksNum := s.mockChunkAssigner(flow.NewIncorporatedResult(containerBlock.ID(), result), assignment)
Expand Down Expand Up @@ -340,11 +340,11 @@ func newBlockMultipleAssignment(t *testing.T) {
// only 3 of them is assigned to this verification node.
containerBlock, assignment := createContainerBlock(
vertestutils.WithChunks(
vertestutils.WithAssignee(unittest.IdentifierFixture()), // assigned to others
vertestutils.WithAssignee(s.myID()), // assigned to me
vertestutils.WithAssignee(s.myID()), // assigned to me
vertestutils.WithAssignee(unittest.IdentifierFixture()), // assigned to others
vertestutils.WithAssignee(s.myID()))) // assigned to me
vertestutils.WithAssignee(t, unittest.IdentifierFixture()), // assigned to others
vertestutils.WithAssignee(t, s.myID()), // assigned to me
vertestutils.WithAssignee(t, s.myID()), // assigned to me
vertestutils.WithAssignee(t, unittest.IdentifierFixture()), // assigned to others
vertestutils.WithAssignee(t, s.myID()))) // assigned to me
result := containerBlock.Payload.Results[0]
s.mockStateAtBlockID(result.BlockID)
chunksNum := s.mockChunkAssigner(flow.NewIncorporatedResult(containerBlock.ID(), result), assignment)
Expand Down Expand Up @@ -383,7 +383,7 @@ func chunkQueueUnhappyPathDuplicate(t *testing.T) {
// creates a container block, with a single receipt, that contains a single chunk assigned
// to verification node.
containerBlock, assignment := createContainerBlock(
vertestutils.WithChunks(vertestutils.WithAssignee(s.myID())))
vertestutils.WithChunks(vertestutils.WithAssignee(t, s.myID())))
result := containerBlock.Payload.Results[0]
s.mockStateAtBlockID(result.BlockID)
chunksNum := s.mockChunkAssigner(flow.NewIncorporatedResult(containerBlock.ID(), result), assignment)
Expand Down
9 changes: 6 additions & 3 deletions engine/verification/utils/mocked.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,15 @@ func (m *MockAssigner) Assign(result *flow.ExecutionResult, blockID flow.Identif
if len(result.Chunks) == 0 {
return nil, fmt.Errorf("assigner called with empty chunk list")
}
a := chmodel.NewAssignment()
a := chmodel.NewAssignmentBuilder()
for _, c := range result.Chunks {
if m.isAssigned(c.Index) {
a.Add(c, flow.IdentifierList{m.me})
err := a.Add(c.Index, flow.IdentifierList{m.me})
if err != nil {
return nil, err
}
}
}

return a, nil
return a.Build(), nil
}
36 changes: 21 additions & 15 deletions engine/verification/utils/unittest/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -266,23 +266,23 @@ func isSystemChunk(index uint64, chunkNum int) bool {
return int(index) == chunkNum-1
}

func CreateExecutionResult(blockID flow.Identifier, options ...func(result *flow.ExecutionResult, assignments *chunks.Assignment)) (*flow.ExecutionResult, *chunks.Assignment) {
func CreateExecutionResult(blockID flow.Identifier, options ...func(result *flow.ExecutionResult, assignments *chunks.AssignmentBuilder)) (*flow.ExecutionResult, *chunks.Assignment) {
result := &flow.ExecutionResult{
BlockID: blockID,
Chunks: flow.ChunkList{},
}
assignments := chunks.NewAssignment()
assignmentsBuilder := chunks.NewAssignmentBuilder()

for _, option := range options {
option(result, assignments)
option(result, assignmentsBuilder)
}
return result, assignments
return result, assignmentsBuilder.Build()
}

func WithChunks(setAssignees ...func(flow.Identifier, uint64, *chunks.Assignment) *flow.Chunk) func(*flow.ExecutionResult, *chunks.Assignment) {
return func(result *flow.ExecutionResult, assignment *chunks.Assignment) {
func WithChunks(setAssignees ...func(flow.Identifier, uint64, *chunks.AssignmentBuilder) *flow.Chunk) func(*flow.ExecutionResult, *chunks.AssignmentBuilder) {
return func(result *flow.ExecutionResult, assignmentBuilder *chunks.AssignmentBuilder) {
for i, setAssignee := range setAssignees {
chunk := setAssignee(result.BlockID, uint64(i), assignment)
chunk := setAssignee(result.BlockID, uint64(i), assignmentBuilder)
result.Chunks.Insert(chunk)
}
}
Expand All @@ -301,11 +301,11 @@ func ChunkWithIndex(blockID flow.Identifier, index int) *flow.Chunk {
return chunk
}

func WithAssignee(assignee flow.Identifier) func(flow.Identifier, uint64, *chunks.Assignment) *flow.Chunk {
return func(blockID flow.Identifier, index uint64, assignment *chunks.Assignment) *flow.Chunk {
func WithAssignee(t *testing.T, assignee flow.Identifier) func(flow.Identifier, uint64, *chunks.AssignmentBuilder) *flow.Chunk {
return func(blockID flow.Identifier, index uint64, assignmentBuilder *chunks.AssignmentBuilder) *flow.Chunk {
chunk := ChunkWithIndex(blockID, int(index))
fmt.Printf("with assignee: %v, chunk id: %v\n", index, chunk.ID())
assignment.Add(chunk, flow.IdentifierList{assignee})
require.NoError(t, assignmentBuilder.Add(chunk.Index, flow.IdentifierList{assignee}))
return chunk
}
}
Expand All @@ -323,7 +323,8 @@ type ChunkAssignerFunc func(chunkIndex uint64, chunks int) bool
//
// It returns the list of chunk locator ids assigned to the input verification nodes, as well as the list of their chunk IDs.
// All verification nodes are assigned the same chunks.
func MockChunkAssignmentFixture(chunkAssigner *mock.ChunkAssigner,
func MockChunkAssignmentFixture(t *testing.T,
chunkAssigner *mock.ChunkAssigner,
verIds flow.IdentityList,
completeERs CompleteExecutionReceiptList,
isAssigned ChunkAssignerFunc) (flow.IdentifierList, flow.IdentifierList) {
Expand All @@ -336,7 +337,7 @@ func MockChunkAssignmentFixture(chunkAssigner *mock.ChunkAssigner,

for _, completeER := range completeERs {
for _, receipt := range completeER.Receipts {
a := chunks.NewAssignment()
a := chunks.NewAssignmentBuilder()

_, duplicate := visited[receipt.ExecutionResult.ID()]
if duplicate {
Expand All @@ -352,12 +353,16 @@ func MockChunkAssignmentFixture(chunkAssigner *mock.ChunkAssigner,
}.ID()
expectedLocatorIds = append(expectedLocatorIds, locatorID)
expectedChunkIds = append(expectedChunkIds, chunk.ID())
a.Add(chunk, verIds.NodeIDs())
require.NoError(t, a.Add(chunk.Index, verIds.NodeIDs()))
} else {
// the chunk has no verifiers assigned
require.NoError(t, a.Add(chunk.Index, flow.IdentifierList{}))
}

}
assignment := a.Build()

chunkAssigner.On("Assign", &receipt.ExecutionResult, completeER.ContainerBlock.ID()).Return(a, nil)
chunkAssigner.On("Assign", &receipt.ExecutionResult, completeER.ContainerBlock.ID()).Return(assignment, nil)
visited[receipt.ExecutionResult.ID()] = struct{}{}
}
}
Expand Down Expand Up @@ -519,7 +524,8 @@ func withConsumers(t *testing.T,
assignedChunkIDs := flow.IdentifierList{}
if authorized {
// only authorized verification node has some chunks assigned to it.
_, assignedChunkIDs = MockChunkAssignmentFixture(chunkAssigner,
_, assignedChunkIDs = MockChunkAssignmentFixture(t,
chunkAssigner,
flow.IdentityList{verID.Identity()},
completeERs,
EvenChunkIndexAssigner)
Expand Down
Loading

0 comments on commit b740fc0

Please sign in to comment.