diff --git a/cmd/scaffold.go b/cmd/scaffold.go index e89f2d2b33d..93622b8bf74 100644 --- a/cmd/scaffold.go +++ b/cmd/scaffold.go @@ -27,6 +27,7 @@ import ( "github.com/onflow/flow-go/module/trace" jsoncodec "github.com/onflow/flow-go/network/codec/json" "github.com/onflow/flow-go/network/gossip/libp2p" + "github.com/onflow/flow-go/network/gossip/libp2p/topology" "github.com/onflow/flow-go/network/gossip/libp2p/validators" protocol "github.com/onflow/flow-go/state/protocol/badger" "github.com/onflow/flow-go/state/protocol/events" @@ -183,9 +184,18 @@ func (fnb *FlowNodeBuilder) enqueueNetworkInit() { return nil, fmt.Errorf("could not get node id: %w", err) } nodeRole := nodeID.Role - topology := libp2p.NewRandPermTopology(nodeRole) - net, err := libp2p.NewNetwork(fnb.Logger, codec, participants, fnb.Me, fnb.Middleware, 10e6, topology, fnb.Metrics.Network) + var nodeTopology topology.Topology + if nodeRole == flow.RoleCollection { + nodeTopology, err = topology.NewCollectionTopology(nodeID.NodeID, fnb.State) + } else { + nodeTopology, err = topology.NewRandPermTopology(nodeRole, nodeID.NodeID) + } + if err != nil { + return nil, fmt.Errorf("could not create topology: %w", err) + } + + net, err := libp2p.NewNetwork(fnb.Logger, codec, participants, fnb.Me, fnb.Middleware, 10e6, nodeTopology, fnb.Metrics.Network) if err != nil { return nil, fmt.Errorf("could not initialize network: %w", err) } diff --git a/engine/ghost/engine/all_connect_topology.go b/engine/ghost/engine/all_connect_topology.go deleted file mode 100644 index aed91acef5b..00000000000 --- a/engine/ghost/engine/all_connect_topology.go +++ /dev/null @@ -1,29 +0,0 @@ -package engine - -import ( - "github.com/onflow/flow-go/model/flow" - "github.com/onflow/flow-go/network/gossip/libp2p/middleware" -) - -var _ middleware.Topology = &AllConnectTopology{} - -// AllConnectTopology returns all the nodes as the target ids to connect to -// It is allows the current node to be directly connected to all the nodes -// 
NOTE: To be used only for testing -type AllConnectTopology struct { -} - -func NewAllConnectTopology() middleware.Topology { - return &AllConnectTopology{} -} - -func (a AllConnectTopology) Subset(idList flow.IdentityList, _ int, _ string) (map[flow.Identifier]flow.Identity, error) { - - // creates a map of all the ids - topMap := make(map[flow.Identifier]flow.Identity, len(idList)) - for _, v := range idList { - topMap[v.NodeID] = *v - } - - return topMap, nil -} diff --git a/model/flow/identity.go b/model/flow/identity.go index 65c5b4b8225..99fbb60f69d 100644 --- a/model/flow/identity.go +++ b/model/flow/identity.go @@ -268,6 +268,12 @@ func (il IdentityList) Sample(size uint) IdentityList { return dup[:size] } +// DeterministicSample returns deterministic random sample from the `IdentityList` using the given seed +func (il IdentityList) DeterministicSample(size uint, seed int64) IdentityList { + rand.Seed(seed) + return il.Sample(size) +} + // SamplePct returns a random sample from the receiver identity list. The // sample contains `pct` percentage of the list. The sample is rounded up // if `pct>0`, so this will always select at least one identity. 
diff --git a/network/gossip/libp2p/middleware/middleware.go b/network/gossip/libp2p/middleware/middleware.go index 3638443e7c8..51ca90269c2 100644 --- a/network/gossip/libp2p/middleware/middleware.go +++ b/network/gossip/libp2p/middleware/middleware.go @@ -70,8 +70,3 @@ type Connection interface { Send(msg interface{}) error Receive() (interface{}, error) } - -// Topology represents an interface to get subset of nodes which a given node should directly connect to for 1-k messaging -type Topology interface { - Subset(idList flow.IdentityList, size int, seed string) (map[flow.Identifier]flow.Identity, error) -} diff --git a/network/gossip/libp2p/network.go b/network/gossip/libp2p/network.go index 4c1b0110cd4..da4b8be8fd0 100644 --- a/network/gossip/libp2p/network.go +++ b/network/gossip/libp2p/network.go @@ -15,6 +15,7 @@ import ( "github.com/onflow/flow-go/network/gossip/libp2p/message" "github.com/onflow/flow-go/network/gossip/libp2p/middleware" "github.com/onflow/flow-go/network/gossip/libp2p/queue" + "github.com/onflow/flow-go/network/gossip/libp2p/topology" ) type identifierFilter func(ids ...flow.Identifier) ([]flow.Identifier, error) @@ -27,7 +28,7 @@ type Network struct { ids flow.IdentityList me module.Local mw middleware.Middleware - top middleware.Topology + top topology.Topology metrics module.NetworkMetrics rcache *cache.RcvCache // used to deduplicate incoming messages queue queue.MessageQueue @@ -47,7 +48,7 @@ func NewNetwork( me module.Local, mw middleware.Middleware, csize int, - top middleware.Topology, + top topology.Topology, metrics module.NetworkMetrics, ) (*Network, error) { @@ -156,7 +157,17 @@ func (n *Network) Identity() (map[flow.Identifier]flow.Identity, error) { // Topology returns the identities of a uniform subset of nodes in protocol state using the topology provided earlier func (n *Network) Topology() (map[flow.Identifier]flow.Identity, error) { - return n.top.Subset(n.ids, n.fanout(), n.me.NodeID().String()) + subset, err := 
n.top.Subset(n.ids, n.fanout()) + if err != nil { + return nil, fmt.Errorf("failed to derive list of peer nodes to connect to: %w", err) + } + + // creates a map of all the selected ids + topMap := make(map[flow.Identifier]flow.Identity) + for _, id := range subset { + topMap[id.NodeID] = *id + } + return topMap, nil } func (n *Network) Receive(nodeID flow.Identifier, msg *message.Message) error { @@ -176,9 +187,9 @@ func (n *Network) SetIDs(ids flow.IdentityList) { } // fanout returns the node fanout derived from the identity list -func (n *Network) fanout() int { +func (n *Network) fanout() uint { // fanout is currently set to half of the system size for connectivity assurance - return (len(n.ids) + 1) / 2 + return uint(len(n.ids)+1) / 2 } func (n *Network) processNetworkMessage(senderID flow.Identifier, message *message.Message) error { diff --git a/network/gossip/libp2p/randPermTopology.go b/network/gossip/libp2p/randPermTopology.go deleted file mode 100644 index eea7e147124..00000000000 --- a/network/gossip/libp2p/randPermTopology.go +++ /dev/null @@ -1,118 +0,0 @@ -package libp2p - -import ( - "fmt" - - "github.com/onflow/flow-go/crypto/random" - "github.com/onflow/flow-go/model/flow" - "github.com/onflow/flow-go/model/flow/filter" - "github.com/onflow/flow-go/network/gossip/libp2p/middleware" -) - -var _ middleware.Topology = &RandPermTopology{} - -// RandPermTopology generates a deterministic random topology from a given set of nodes and for a given role -// The topology generated is a union of three sets: -// 1. a random subset of the given size -// 2. one node of each of the flow role from the remaining ids -// 3. 
half of the nodes of the same role as this node from the remaining ids -type RandPermTopology struct { - myRole flow.Role -} - -func NewRandPermTopology(role flow.Role) middleware.Topology { - return &RandPermTopology{ - myRole: role, - } -} - -func (r RandPermTopology) Subset(idList flow.IdentityList, size int, seed string) (map[flow.Identifier]flow.Identity, error) { - - if len(idList) < size { - return nil, fmt.Errorf("cannot sample topology idList %d smaller than desired fanout %d", len(idList), size) - } - - // use the node's identifier as the random generator seed - rndSeed := make([]byte, 32) - copy(rndSeed, seed) - rng, err := random.NewRand(rndSeed) - if err != nil { - return nil, fmt.Errorf("cannot seed the prng: %w", err) - } - - // find a random subset of the given size from the list - fanoutIDs, err := randomSubset(idList, size, rng) - if err != nil { - return nil, fmt.Errorf("cannot sample topology: %w", err) - } - - remainder := idList.Filter(filter.Not(filter.In(fanoutIDs))) - - // find one id for each role from the remaining list, - // if it is not already part of fanoutIDs - oneOfEachRoleIDs := make(flow.IdentityList, 0) - for _, role := range flow.Roles() { - - if len(fanoutIDs.Filter(filter.HasRole(role))) > 0 { - // we already have a node with this role - continue - } - - ids := remainder.Filter(filter.HasRole(role)) - if len(ids) == 0 { - // there are no more nodes of this role to choose from - continue - } - - // choose 1 out of all the remaining nodes of this role - selectedID := rng.UintN(uint64(len(ids))) - - oneOfEachRoleIDs = append(oneOfEachRoleIDs, ids[selectedID]) - } - - remainder = remainder.Filter(filter.Not(filter.In(oneOfEachRoleIDs))) - - // find a n/2 random subset of nodes of the given role from the remaining list - ids := remainder.Filter(filter.HasRole(r.myRole)) - sameRoleIDs := (len(ids) + 1) / 2 // rounded up to the closest integer - - selfRoleIDs, err := randomSubset(ids, sameRoleIDs, rng) - if err != nil { - return nil, 
fmt.Errorf("cannot sample topology: %w", err) - } - - // combine all three subsets - finalIDs := append(fanoutIDs, oneOfEachRoleIDs...) - finalIDs = append(finalIDs, selfRoleIDs...) - - // creates a map of all the selected ids - topMap := make(map[flow.Identifier]flow.Identity) - for _, id := range finalIDs { - topMap[id.NodeID] = *id - } - - return topMap, nil - -} - -func randomSubset(ids flow.IdentityList, size int, rnd random.Rand) (flow.IdentityList, error) { - - if size == 0 { - return flow.IdentityList{}, nil - } - - if len(ids) < size { - return ids, nil - } - - copy := make(flow.IdentityList, 0, len(ids)) - copy = append(copy, ids...) - err := rnd.Samples(len(copy), size, func(i int, j int) { - copy[i], copy[j] = copy[j], copy[i] - }) - if err != nil { - return nil, err - } - - return copy[:size], nil -} diff --git a/network/gossip/libp2p/randPermTopology_test.go b/network/gossip/libp2p/randPermTopology_test.go deleted file mode 100644 index 75fdf115b90..00000000000 --- a/network/gossip/libp2p/randPermTopology_test.go +++ /dev/null @@ -1,125 +0,0 @@ -package libp2p - -import ( - "math" - "testing" - - "github.com/stretchr/testify/suite" - - "github.com/onflow/flow-go/model/flow" - "github.com/onflow/flow-go/utils/unittest" -) - -type RandPermTopologyTestSuite struct { - suite.Suite -} - -func TestRandPermTopologyTestSuite(t *testing.T) { - suite.Run(t, new(RandPermTopologyTestSuite)) -} - -// TestMinorityNodesConnected tests node configuration of different sizes with one node role in minority -func (r *RandPermTopologyTestSuite) TestMinorityNodesConnected() { - - // test vectors for different network sizes - testVector := []struct { - total int - minorityRole flow.Role - nodeRole flow.Role - }{ - // integration tests - order of 10s - { - total: 12, - minorityRole: flow.RoleCollection, - nodeRole: flow.RoleCollection, - }, - { - total: 12, - minorityRole: flow.RoleCollection, - nodeRole: flow.RoleConsensus, - }, - // alpha main net order of 100s - { - 
total: 100, - minorityRole: flow.RoleCollection, - nodeRole: flow.RoleCollection, - }, - { - total: 100, - minorityRole: flow.RoleCollection, - nodeRole: flow.RoleConsensus, - }, - // mature flow order of 1000s - { - total: 1000, - minorityRole: flow.RoleCollection, - nodeRole: flow.RoleCollection, - }, - { - total: 1000, - minorityRole: flow.RoleCollection, - nodeRole: flow.RoleConsensus, - }, - } - - for _, v := range testVector { - r.testTopology(v.total, v.minorityRole, v.nodeRole) - } -} - -func (r *RandPermTopologyTestSuite) testTopology(total int, minorityRole flow.Role, nodeRole flow.Role) { - - distribution := createDistribution(total, minorityRole) - - ids := make(flow.IdentityList, 0) - for role, count := range distribution { - roleIDs := unittest.IdentityListFixture(count, unittest.WithRole(role)) - ids = append(ids, roleIDs...) - } - - rpt := NewRandPermTopology(nodeRole) - fanout := len(ids) / 2 - seed := unittest.IdentityFixture() - - fanoutMap := make(map[flow.Role]int, len(flow.Roles())) - - // test repeated runs of the random topology - for i := 0; i < 1000; i++ { - - top, err := rpt.Subset(ids, fanout, seed.String()) - r.NoError(err) - - for _, v := range top { - fanoutMap[v.Role]++ - } - - // assert that nodes of all role are selected - for _, role := range flow.Roles() { - r.GreaterOrEqualf(fanoutMap[role], 1, "node with role %s not selected", role) - } - } -} - -// createDistribution creates a count distribution of ~total number of nodes with 2% minority node count -func createDistribution(total int, minority flow.Role) map[flow.Role]int { - - minorityPercentage := 0.02 - count := func(per float64) int { - nodes := int(math.Ceil(per * float64(total))) // assume atleast one node of the minority role - return nodes - } - minorityCount, majorityCount := count(minorityPercentage), count(1-minorityPercentage) - roles := flow.Roles() - totalRoles := len(roles) - 1 - majorityCountPerRole := int(math.Ceil(float64(majorityCount) / float64(totalRoles))) 
- - countMap := make(map[flow.Role]int, totalRoles) // map of role to the number of nodes for that role - for _, r := range roles { - if r == minority { - countMap[r] = minorityCount - } else { - countMap[r] = majorityCountPerRole - } - } - return countMap -} diff --git a/network/gossip/libp2p/test/helper.go b/network/gossip/libp2p/test/helper.go index b0a0b2b4100..5a1d2554383 100644 --- a/network/gossip/libp2p/test/helper.go +++ b/network/gossip/libp2p/test/helper.go @@ -16,7 +16,7 @@ import ( "github.com/onflow/flow-go/module/mock" "github.com/onflow/flow-go/network/codec/json" "github.com/onflow/flow-go/network/gossip/libp2p" - "github.com/onflow/flow-go/network/gossip/libp2p/middleware" + "github.com/onflow/flow-go/network/gossip/libp2p/topology" "github.com/onflow/flow-go/utils/unittest" ) @@ -44,7 +44,7 @@ func CreateIDs(count int) []*flow.Identity { // it returns the slice of created middlewares // csize is the receive cache size of the nodes func createNetworks(log zerolog.Logger, mws []*libp2p.Middleware, ids flow.IdentityList, csize int, dryrun bool, - tops ...middleware.Topology) ([]*libp2p.Network, error) { + tops ...topology.Topology) ([]*libp2p.Network, error) { count := len(mws) nets := make([]*libp2p.Network, 0) metrics := metrics.NewNoopCollector() @@ -54,9 +54,12 @@ func createNetworks(log zerolog.Logger, mws []*libp2p.Middleware, ids flow.Ident // if no topology is passed in, use the default topology for all networks if tops == nil { - tops = make([]middleware.Topology, count) - rpt := libp2p.NewRandPermTopology(flow.RoleCollection) + tops = make([]topology.Topology, count) for i := range tops { + rpt, err := topology.NewRandPermTopology(ids[i].Role, ids[i].NodeID) + if err != nil { + return nil, fmt.Errorf("could not create network: %w", err) + } tops[i] = rpt } } @@ -68,7 +71,7 @@ func createNetworks(log zerolog.Logger, mws []*libp2p.Middleware, ids flow.Ident me.On("NotMeFilter").Return(flow.IdentityFilter(filter.Any)) net, err := 
libp2p.NewNetwork(log, json.NewCodec(), identities, me, mws[i], csize, tops[i], metrics) if err != nil { - return nil, fmt.Errorf("could not create error %w", err) + return nil, fmt.Errorf("could not create network: %w", err) } nets = append(nets, net) diff --git a/network/gossip/libp2p/test/sparsetopology_test.go b/network/gossip/libp2p/test/sparsetopology_test.go index 9f77cecc052..3069b78aea0 100644 --- a/network/gossip/libp2p/test/sparsetopology_test.go +++ b/network/gossip/libp2p/test/sparsetopology_test.go @@ -18,7 +18,7 @@ import ( "github.com/onflow/flow-go/model/flow" "github.com/onflow/flow-go/model/libp2p/message" "github.com/onflow/flow-go/network/gossip/libp2p" - "github.com/onflow/flow-go/network/gossip/libp2p/middleware" + "github.com/onflow/flow-go/network/gossip/libp2p/topology" ) // SparseTopologyTestSuite test 1-k messaging in a sparsely connected network @@ -214,20 +214,15 @@ type IndexBoundTopology struct { } // Returns a subset of ids bounded by [minIndex, maxIndex) for the SparseTopology -func (ibt IndexBoundTopology) Subset(idList flow.IdentityList, _ int, _ string) (map[flow.Identifier]flow.Identity, error) { - subsetLen := ibt.maxIndex - ibt.minIndex - var result = make(map[flow.Identifier]flow.Identity, subsetLen) +func (ibt IndexBoundTopology) Subset(idList flow.IdentityList, _ uint) (flow.IdentityList, error) { sub := idList[ibt.minIndex:ibt.maxIndex] - for _, id := range sub { - result[id.ID()] = *id - } - return result, nil + return sub, nil } // createSparseTopology creates topologies for nodes such that subsets have one overlapping node // e.g. 
top 1 - 0,1,2,3; top 2 - 3,4,5,6; top 3 - 6,7,8,9 -func createSparseTopology(count int, subsets int) []middleware.Topology { - tops := make([]middleware.Topology, count) +func createSparseTopology(count int, subsets int) []topology.Topology { + tops := make([]topology.Topology, count) subsetLen := count / subsets for i := 0; i < count; i++ { s := i / subsets // which subset does this node belong to @@ -249,8 +244,8 @@ func createSparseTopology(count int, subsets int) []middleware.Topology { // createDisjointedTopology creates topologies for nodes such that subsets don't have any overlap // e.g. top 1 - 0,1,2; top 2 - 3,4,5; top 3 - 6,7,8 -func createDisjointedTopology(count int, subsets int) []middleware.Topology { - tops := make([]middleware.Topology, count) +func createDisjointedTopology(count int, subsets int) []topology.Topology { + tops := make([]topology.Topology, count) subsetLen := count / subsets for i := 0; i < count; i++ { s := i / subsets // which subset does this node belong to diff --git a/network/gossip/libp2p/test/topology_test.go b/network/gossip/libp2p/test/topology_test.go index a6b83677695..2ca59516d77 100644 --- a/network/gossip/libp2p/test/topology_test.go +++ b/network/gossip/libp2p/test/topology_test.go @@ -1,6 +1,7 @@ package test import ( + "math" "os" "sort" "testing" @@ -14,6 +15,7 @@ import ( "github.com/onflow/flow-go/module/metrics" "github.com/onflow/flow-go/network/codec/json" "github.com/onflow/flow-go/network/gossip/libp2p" + "github.com/onflow/flow-go/network/gossip/libp2p/topology" "github.com/onflow/flow-go/utils/unittest" ) @@ -37,9 +39,10 @@ func TestNetworkTestSuit(t *testing.T) { func (n *TopologyTestSuite) SetupTest() { n.count = 100 n.ids = CreateIDs(n.count) - rndSubsetSize := n.count / 2 + rndSubsetSize := int(math.Ceil(float64(n.count+1) / 2)) oneOfEachNodetype := 0 // there is only one node type in this test - halfOfRemainingNodes := (n.count - rndSubsetSize - oneOfEachNodetype) / 2 + remaining := n.count - 
rndSubsetSize - oneOfEachNodetype + halfOfRemainingNodes := int(math.Ceil(float64(remaining+1) / 2)) n.expectedSize = rndSubsetSize + oneOfEachNodetype + halfOfRemainingNodes // takes firs id as the current nodes id @@ -48,7 +51,7 @@ func (n *TopologyTestSuite) SetupTest() { logger := log.Output(zerolog.ConsoleWriter{Out: os.Stderr}).With().Caller().Logger() key, err := GenerateNetworkingKey(me.NodeID) - require.NoError(n.Suite.T(), err) + require.NoError(n.T(), err) metrics := metrics.NewNoopCollector() @@ -62,47 +65,48 @@ func (n *TopologyTestSuite) SetupTest() { libp2p.DefaultMaxUnicastMsgSize, libp2p.DefaultMaxPubSubMsgSize, unittest.IdentifierFixture().String()) - require.NoError(n.Suite.T(), err) + require.NoError(n.T(), err) // creates and mocks a network instance nets, err := createNetworks(logger, []*libp2p.Middleware{mw}, n.ids, 1, true) - require.NoError(n.Suite.T(), err) - require.Len(n.Suite.T(), nets, 1) + require.NoError(n.T(), err) + require.Len(n.T(), nets, 1) n.nets = nets[0] } func (n *TopologyTestSuite) TestTopologySize() { // topology of size the entire network top, err := n.nets.Topology() - require.NoError(n.Suite.T(), err) - require.Len(n.Suite.T(), top, n.expectedSize) + require.NoError(n.T(), err) + require.Len(n.T(), top, n.expectedSize) } // TestMembership evaluates every id in topology to be a protocol id func (n *TopologyTestSuite) TestMembership() { top, err := n.nets.Topology() - require.NoError(n.Suite.T(), err) - require.Len(n.Suite.T(), top, n.expectedSize) + require.NoError(n.T(), err) + require.Len(n.T(), top, n.expectedSize) // every id in topology should be an id of the protocol for id := range top { - require.Contains(n.Suite.T(), n.ids.NodeIDs(), id) + require.Contains(n.T(), n.ids.NodeIDs(), id) } } // TestDeteministicity verifies that the same seed generates the same topology func (n *TopologyTestSuite) TestDeteministicity() { - top := libp2p.NewRandPermTopology(flow.RoleCollection) + top, err := 
topology.NewRandPermTopology(flow.RoleCollection, unittest.IdentifierFixture()) + require.NoError(n.T(), err) // topology of size count/2 - topSize := n.count / 2 + topSize := uint(n.count / 2) var previous, current []string for i := 0; i < n.count; i++ { previous = current current = nil // generate a new topology with a the same ids, size and seed - idMap, err := top.Subset(n.ids, topSize, "sameseed") - require.NoError(n.Suite.T(), err) + idMap, err := top.Subset(n.ids, topSize) + require.NoError(n.T(), err) for _, v := range idMap { current = append(current, v.NodeID.String()) @@ -115,15 +119,15 @@ func (n *TopologyTestSuite) TestDeteministicity() { } // assert that a different seed generates a different topology - require.Equal(n.Suite.T(), previous, current) + require.Equal(n.T(), previous, current) } } // TestUniqueness verifies that different seeds generates different topologies func (n *TopologyTestSuite) TestUniqueness() { - top := libp2p.NewRandPermTopology(flow.RoleCollection) + // topology of size count/2 - topSize := n.count / 2 + topSize := uint(n.count / 2) var previous, current []string for i := 0; i < n.count; i++ { @@ -131,8 +135,10 @@ func (n *TopologyTestSuite) TestUniqueness() { current = nil // generate a new topology with a the same ids, size but a different seed for each iteration identity, _ := n.ids.ByIndex(uint(i)) - idMap, err := top.Subset(n.ids, topSize, identity.NodeID.String()) - require.NoError(n.Suite.T(), err) + top, err := topology.NewRandPermTopology(flow.RoleCollection, identity.NodeID) + require.NoError(n.T(), err) + idMap, err := top.Subset(n.ids, topSize) + require.NoError(n.T(), err) for _, v := range idMap { current = append(current, v.NodeID.String()) @@ -144,6 +150,6 @@ func (n *TopologyTestSuite) TestUniqueness() { } // assert that a different seed generates a different topology - require.NotEqual(n.Suite.T(), previous, current) + require.NotEqual(n.T(), previous, current) } } diff --git 
a/network/gossip/libp2p/topology/collectionTopology.go b/network/gossip/libp2p/topology/collectionTopology.go new file mode 100644 index 00000000000..3ce5bb0468c --- /dev/null +++ b/network/gossip/libp2p/topology/collectionTopology.go @@ -0,0 +1,83 @@ +package topology + +import ( + "fmt" + + "github.com/onflow/flow-go/model/flow" + "github.com/onflow/flow-go/model/flow/filter" + "github.com/onflow/flow-go/state/protocol" +) + +// CollectionTopology builds on top of RandPermTopology and generates a deterministic random topology for collection nodes +// such that nodes within the same collection cluster form a connected graph. +type CollectionTopology struct { + RandPermTopology + nodeID flow.Identifier + state protocol.ReadOnlyState + seed int64 +} + +func NewCollectionTopology(nodeID flow.Identifier, state protocol.ReadOnlyState) (CollectionTopology, error) { + rpt, err := NewRandPermTopology(flow.RoleCollection, nodeID) + if err != nil { + return CollectionTopology{}, err + } + return CollectionTopology{ + RandPermTopology: rpt, + nodeID: nodeID, + state: state, + }, nil +} + +// Subset samples the idList and returns a list of nodes to connect with such that: +// a. this node is directly or indirectly connected to all other nodes in the same cluster +// b. to all other nodes +// c. to at least one node of each type and +// d. to all other nodes of the same type. +// The collection nodes within a collection cluster need to form a connected graph among themselves independent of any +// other nodes to ensure reliable dissemination of cluster specific topic messages. e.g. ClusterBlockProposal +// Similarly, all nodes of the network need to form a connected graph, to ensure reliable dissemination of messages for +// topics subscribed by all node types e.g. 
BlockProposals +// Each node should be connected to at least one node of each type to ensure nodes don't form an island of a specific +// role, especially since some node types are represented by a very small number of nodes (e.g. few access nodes compared +// to tens or hundreds of collection nodes) +// Finally, all nodes of the same type should form a connected graph for exchanging messages for role specific topics +// e.g. Transaction +func (c CollectionTopology) Subset(idList flow.IdentityList, fanout uint) (flow.IdentityList, error) { + + randPermSample, err := c.RandPermTopology.Subset(idList, fanout) + if err != nil { + return nil, err + } + + clusterPeers, err := c.clusterPeers() + if err != nil { + return nil, fmt.Errorf("failed to find cluster peers for node %s", c.nodeID.String()) + } + clusterSample, _ := connectedGraphSample(clusterPeers, c.seed) + + // include only those cluster peers which have not already been chosen by RandPermTopology + uniqueClusterSample := clusterSample.Filter(filter.Not(filter.In(randPermSample))) + + // add those to the earlier sample from randPerm + randPermSample = append(randPermSample, uniqueClusterSample...) 
+ + // return the aggregated set + return randPermSample, nil +} + +// clusterPeers returns the list of other nodes within the same cluster as this node +func (c CollectionTopology) clusterPeers() (flow.IdentityList, error) { + currentEpoch := c.state.Final().Epochs().Current() + clusterList, err := currentEpoch.Clustering() + if err != nil { + return nil, err + } + + myCluster, _, found := clusterList.ByNodeID(c.nodeID) + if !found { + return nil, fmt.Errorf("failed to find the cluster for node ID %s", c.nodeID.String()) + } + + return myCluster, nil +} diff --git a/network/gossip/libp2p/topology/collectionTopology_test.go b/network/gossip/libp2p/topology/collectionTopology_test.go new file mode 100644 index 00000000000..bf96d23d706 --- /dev/null +++ b/network/gossip/libp2p/topology/collectionTopology_test.go @@ -0,0 +1,74 @@ +package topology_test + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/stretchr/testify/suite" + + "github.com/onflow/flow-go/model/flow" + "github.com/onflow/flow-go/model/flow/filter" + "github.com/onflow/flow-go/network/gossip/libp2p/topology" + protocol "github.com/onflow/flow-go/state/protocol/mock" + "github.com/onflow/flow-go/utils/unittest" +) + +type CollectionTopologyTestSuite struct { + suite.Suite + state *protocol.State + snapshot *protocol.Snapshot + epochQuery *protocol.EpochQuery + clusterList flow.ClusterList + ids flow.IdentityList + collectors flow.IdentityList +} + +func TestCollectionTopologyTestSuite(t *testing.T) { + suite.Run(t, new(CollectionTopologyTestSuite)) +} + +func (suite *CollectionTopologyTestSuite) SetupTest() { + suite.state = new(protocol.State) + suite.snapshot = new(protocol.Snapshot) + suite.epochQuery = new(protocol.EpochQuery) + nClusters := 3 + nCollectors := 7 + suite.collectors = unittest.IdentityListFixture(nCollectors, unittest.WithRole(flow.RoleCollection)) + suite.ids = append(unittest.IdentityListFixture(1000, 
unittest.WithAllRolesExcept(flow.RoleCollection)), suite.collectors...) + assignments := unittest.ClusterAssignment(uint(nClusters), suite.collectors) + clusters, err := flow.NewClusterList(assignments, suite.collectors) + require.NoError(suite.T(), err) + suite.clusterList = clusters + epoch := new(protocol.Epoch) + epoch.On("Clustering").Return(clusters, nil).Times(nCollectors) + suite.epochQuery.On("Current").Return(epoch).Times(nCollectors) + suite.snapshot.On("Epochs").Return(suite.epochQuery).Times(nCollectors) + suite.state.On("Final").Return(suite.snapshot, nil).Times(nCollectors) +} + +// TestSubset tests that the collection nodes using CollectionTopology form a connected graph and nodes within the same +// collection clusters also form a connected graph +func (suite *CollectionTopologyTestSuite) TestSubset() { + var adjencyMap = make(map[flow.Identifier]flow.IdentityList, len(suite.collectors)) + // for each of the collector node, find a subset of nodes it should connect to using the CollectionTopology + for _, c := range suite.collectors { + collectionTopology, err := topology.NewCollectionTopology(c.NodeID, suite.state) + assert.NoError(suite.T(), err) + subset, err := collectionTopology.Subset(suite.ids, uint(len(suite.ids))) + assert.NoError(suite.T(), err) + adjencyMap[c.NodeID] = subset + } + + // check that all collection nodes are either directly connected or indirectly connected via other collection nodes + checkConnectednessByRole(suite.T(), adjencyMap, suite.collectors, flow.RoleCollection) + + // check that each of the collection clusters forms a connected graph + for _, cluster := range suite.clusterList { + checkConnectednessByCluster(suite.T(), adjencyMap, cluster, suite.collectors) + } +} + +func checkConnectednessByCluster(t *testing.T, adjMap map[flow.Identifier]flow.IdentityList, cluster flow.IdentityList, ids flow.IdentityList) { + checkGraphConnected(t, adjMap, ids, filter.In(cluster)) +} diff --git 
a/network/gossip/libp2p/topology/randPermTopology.go b/network/gossip/libp2p/topology/randPermTopology.go new file mode 100644 index 00000000000..f2675570587 --- /dev/null +++ b/network/gossip/libp2p/topology/randPermTopology.go @@ -0,0 +1,67 @@ +package topology + +import ( + "fmt" + + "github.com/onflow/flow-go/model/flow" + "github.com/onflow/flow-go/model/flow/filter" +) + +var _ Topology = &RandPermTopology{} + +// RandPermTopology generates a random topology from a given set of nodes and for a given role +// The topology generated is a union of three sets: +// 1. a random subset of the size (n+1)/2 to make sure the nodes form a connected graph with no islands +// 2. one node of each flow role from the remaining ids to make sure a node can talk to any other type of node +// 3. (n+1)/2 of the nodes of the same role as this node from the remaining ids to make sure that nodes of the same type +// form a connected graph with no islands. +type RandPermTopology struct { + myRole flow.Role + seed int64 +} + +func NewRandPermTopology(role flow.Role, id flow.Identifier) (RandPermTopology, error) { + seed, err := seedFromID(id) + if err != nil { + return RandPermTopology{}, fmt.Errorf("failed to seed topology: %w", err) + } + return RandPermTopology{ + myRole: role, + seed: seed, + }, nil +} + +func (r RandPermTopology) Subset(idList flow.IdentityList, fanout uint) (flow.IdentityList, error) { + + if uint(len(idList)) < fanout { + return nil, fmt.Errorf("cannot sample topology idList %d smaller than desired fanout %d", len(idList), fanout) + } + + // connect to (n+1)/2 other nodes to ensure graph is connected (no islands) + result, remainder := connectedGraphSample(idList, r.seed) + + // find one id for each role from the remaining list, if it hasn't already been chosen + for _, role := range flow.Roles() { + + if len(result.Filter(filter.HasRole(role))) > 0 { + // we already have a node with this role + continue + } + + var selectedIDs flow.IdentityList + + // 
connect to one of each type (ignore remainder) + selectedIDs, _ = oneOfEachRoleSample(remainder, r.seed, role) + + // add it to result + result = append(result, selectedIDs...) + } + + // connect to (k+1)/2 other nodes of the same type to ensure all nodes of the same type are fully connected, + // where k is the number of nodes of each type + selfRoleIDs, _ := connectedGraphByRoleSample(remainder, r.seed, r.myRole) // ignore the remaining ids + + result = append(result, selfRoleIDs...) + + return result, nil +} diff --git a/network/gossip/libp2p/topology/randPermTopology_test.go b/network/gossip/libp2p/topology/randPermTopology_test.go new file mode 100644 index 00000000000..64952b15f61 --- /dev/null +++ b/network/gossip/libp2p/topology/randPermTopology_test.go @@ -0,0 +1,186 @@ +package topology_test + +import ( + "math" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/suite" + + "github.com/onflow/flow-go/model/flow" + "github.com/onflow/flow-go/model/flow/filter" + "github.com/onflow/flow-go/network/gossip/libp2p/topology" + "github.com/onflow/flow-go/utils/unittest" +) + +type RandPermTopologyTestSuite struct { + suite.Suite +} + +func TestRandPermTopologyTestSuite(t *testing.T) { + suite.Run(t, new(RandPermTopologyTestSuite)) +} + +// TestNodesConnected tests overall node connectedness and connectedness by role by keeping nodes of one role type in +// minority (~2%) +func (r *RandPermTopologyTestSuite) TestNodesConnected() { + + // test vectors for different network sizes + testVector := []struct { + total int + minorityRole flow.Role + nodeRole flow.Role + }{ + // integration tests - order of 10s + { + total: 12, + minorityRole: flow.RoleCollection, + nodeRole: flow.RoleCollection, + }, + { + total: 12, + minorityRole: flow.RoleCollection, + nodeRole: flow.RoleConsensus, + }, + // alpha main net order of 100s + { + total: 100, + minorityRole: flow.RoleCollection, + nodeRole: flow.RoleCollection, + }, + { + total: 100, + 
minorityRole: flow.RoleCollection, + nodeRole: flow.RoleConsensus, + }, + // mature flow order of 1000s + { + total: 1000, + minorityRole: flow.RoleCollection, + nodeRole: flow.RoleCollection, + }, + { + total: 1000, + minorityRole: flow.RoleCollection, + nodeRole: flow.RoleConsensus, + }, + } + + for _, v := range testVector { + r.testTopology(v.total, v.minorityRole, v.nodeRole) + } +} + +func (r *RandPermTopologyTestSuite) testTopology(total int, minorityRole flow.Role, nodeRole flow.Role) { + + distribution := createDistribution(total, minorityRole) + + ids := make(flow.IdentityList, 0) + for role, count := range distribution { + roleIDs := unittest.IdentityListFixture(count, unittest.WithRole(role)) + ids = append(ids, roleIDs...) + } + + n := len(ids) + adjencyMap := make(map[flow.Identifier]flow.IdentityList, n) + + for _, id := range ids { + rpt, err := topology.NewRandPermTopology(id.Role, id.NodeID) + r.NoError(err) + top, err := rpt.Subset(ids, uint(n)) + r.NoError(err) + adjencyMap[id.NodeID] = top + } + + // check that nodes of the same role form a connected graph + checkConnectednessByRole(r.T(), adjencyMap, ids, minorityRole) + + // check that nodes form a connected graph + checkConnectedness(r.T(), adjencyMap, ids) +} + +// TestSubsetDeterminism tests that if the id list remains the same, the Topology.Subset call always yields the same +// list of nodes +func (r *RandPermTopologyTestSuite) TestSubsetDeterminism() { + ids := unittest.IdentityListFixture(100, unittest.WithAllRoles()) + for _, id := range ids { + rpt, err := topology.NewRandPermTopology(flow.RoleConsensus, id.NodeID) + r.NoError(err) + var prev flow.IdentityList + for i := 0; i < 10; i++ { + current, err := rpt.Subset(ids, uint(100)) + r.NoError(err) + if prev != nil { + assert.EqualValues(r.T(), prev, current) + } + } + } +} + +// createDistribution creates a count distribution of ~total number of nodes with 2% minority node count +func createDistribution(total int, minority 
flow.Role) map[flow.Role]int { + + minorityPercentage := 0.02 + count := func(per float64) int { + nodes := int(math.Ceil(per * float64(total))) // assume atleast one node of the minority role + return nodes + } + minorityCount, majorityCount := count(minorityPercentage), count(1-minorityPercentage) + roles := flow.Roles() + totalRoles := len(roles) - 1 + majorityCountPerRole := int(math.Ceil(float64(majorityCount) / float64(totalRoles))) + + countMap := make(map[flow.Role]int, totalRoles) // map of role to the number of nodes for that role + for _, r := range roles { + if r == minority { + countMap[r] = minorityCount + } else { + countMap[r] = majorityCountPerRole + } + } + return countMap +} + +func checkConnectednessByRole(t *testing.T, adjMap map[flow.Identifier]flow.IdentityList, ids flow.IdentityList, role flow.Role) { + checkGraphConnected(t, adjMap, ids, filter.HasRole(role)) +} + +func checkConnectedness(t *testing.T, adjMap map[flow.Identifier]flow.IdentityList, ids flow.IdentityList) { + checkGraphConnected(t, adjMap, ids, filter.Any) +} + +// checkGraphConnected checks if the graph represented by the adjacency matrix is connected. +// It traverses the adjacency map starting from an arbitrary node and checks if all nodes that satisfy the filter +// were visited. 
+func checkGraphConnected(t *testing.T, adjMap map[flow.Identifier]flow.IdentityList, ids flow.IdentityList, f flow.IdentityFilter) {
+	// filter the ids and find the expected node count
+	expectedIDs := ids.Filter(f)
+	expectedCount := len(expectedIDs)
+	if !assert.NotEmpty(t, expectedIDs, "no node satisfies the filter") {
+		return // nothing to traverse; Sample(1)[0] below must not run on an empty list
+	}
+
+	// start with an arbitrary node which satisfies the filter
+	startID := expectedIDs.Sample(1)[0].NodeID
+	visited := make(map[flow.Identifier]bool)
+	dfs(startID, adjMap, visited, f)
+
+	// assert that expected number of nodes were visited by DFS
+	assert.Equal(t, expectedCount, len(visited))
+}
+
+// dfs marks currentID and every node reachable from it through neighbours that satisfy f as visited
+func dfs(currentID flow.Identifier,
+	adjMap map[flow.Identifier]flow.IdentityList,
+	visited map[flow.Identifier]bool,
+	f flow.IdentityFilter) {
+
+	if visited[currentID] {
+		return
+	}
+	visited[currentID] = true
+
+	for _, id := range adjMap[currentID].Filter(f) {
+		dfs(id.NodeID, adjMap, visited, f)
+	}
+}
diff --git a/network/gossip/libp2p/topology/topology.go b/network/gossip/libp2p/topology/topology.go
new file mode 100644
index 00000000000..b361cfd95a5
--- /dev/null
+++ b/network/gossip/libp2p/topology/topology.go
@@ -0,0 +1,10 @@
+package topology
+
+import "github.com/onflow/flow-go/model/flow"
+
+// Topology provides a subset of nodes which a given node should directly connect to for 1-k messaging
+type Topology interface {
+	// Subset returns a random subset of the identity list that is passed.
+	// An error is returned when no subset can be produced for the given list and fanout.
+	Subset(idList flow.IdentityList, fanout uint) (flow.IdentityList, error)
+}
diff --git a/network/gossip/libp2p/topology/topology_utils.go b/network/gossip/libp2p/topology/topology_utils.go
new file mode 100644
index 00000000000..510b28aff02
--- /dev/null
+++ b/network/gossip/libp2p/topology/topology_utils.go
@@ -0,0 +1,69 @@
+package topology
+
+import (
+	"bytes"
+	"encoding/binary"
+	"math"
+
+	"github.com/onflow/flow-go/model/flow"
+	"github.com/onflow/flow-go/model/flow/filter"
+)
+
+// connectedGraph returns a random subset of length (n+1)/2.
+// If each node connects to the nodes returned by connectedGraph, the graph of such nodes is connected.
+func connectedGraph(ids flow.IdentityList, seed int64) flow.IdentityList {
+	// choose ceil((n+1)/2) random nodes so that each node in the graph will have a degree >= (n+1) / 2,
+	// guaranteeing a connected graph.
+	sampleSize := uint(math.Ceil(float64(len(ids)+1) / 2))
+	return ids.DeterministicSample(sampleSize, seed)
+}
+
+// connectedGraphByRole returns a random subset of length (k+1)/2 of the k ids that have the specified role
+func connectedGraphByRole(ids flow.IdentityList, seed int64, role flow.Role) flow.IdentityList {
+	sameRole := ids.Filter(filter.HasRole(role))
+	if len(sameRole) == 0 {
+		// there are no more nodes of this role to choose from
+		return flow.IdentityList{}
+	}
+	return connectedGraph(sameRole, seed)
+}
+
+// oneOfRole returns a list holding one random id of the given role, or an empty list if there is none
+func oneOfRole(ids flow.IdentityList, seed int64, role flow.Role) flow.IdentityList {
+	candidates := ids.Filter(filter.HasRole(role))
+	if len(candidates) == 0 {
+		// there are no more nodes of this role to choose from
+		return flow.IdentityList{}
+	}
+	// choose 1 out of all the remaining nodes of this role
+	return candidates.DeterministicSample(1, seed)
+}
+
+// connectedGraphSample returns the connectedGraph sample of ids together with the ids that were not sampled
+func connectedGraphSample(ids flow.IdentityList, seed int64) (flow.IdentityList, flow.IdentityList) {
+	sampled := connectedGraph(ids, seed)
+	notSampled := filter.Not(filter.In(sampled))
+	return sampled, ids.Filter(notSampled)
+}
+
+// connectedGraphByRoleSample returns the connectedGraphByRole sample of ids together with the ids that were not sampled
+func connectedGraphByRoleSample(ids flow.IdentityList, seed int64, role flow.Role) (flow.IdentityList, flow.IdentityList) {
+	sampled := connectedGraphByRole(ids, seed, role)
+	notSampled := filter.Not(filter.In(sampled))
+	return sampled, ids.Filter(notSampled)
+}
+
+// oneOfEachRoleSample returns the oneOfRole pick for the given role together with the ids that were not picked
+func oneOfEachRoleSample(ids flow.IdentityList, seed int64, role flow.Role) (flow.IdentityList, flow.IdentityList) {
+	picked := oneOfRole(ids, seed, role)
+	notPicked := filter.Not(filter.In(picked))
+	return picked, ids.Filter(notPicked)
+}
+
+// seedFromID generates an int64 seed by decoding the leading bytes of a flow.Identifier as little endian
+func seedFromID(id flow.Identifier) (int64, error) {
+	var seed int64
+	reader := bytes.NewReader(id[:])
+	err := binary.Read(reader, binary.LittleEndian, &seed)
+	return seed, err
+}