From 3bb0294d8f6819890ead8b36cea4a11f178d839e Mon Sep 17 00:00:00 2001 From: Pavel Zbitskiy Date: Thu, 18 Jul 2024 15:57:47 -0400 Subject: [PATCH 1/3] support EnableGossipService in p2p streams --- network/p2p/p2p.go | 2 +- network/p2p/streams.go | 60 +++++++---------- network/p2pNetwork_test.go | 130 +++++++++++++++++++++++++++++++++++++ 3 files changed, 156 insertions(+), 36 deletions(-) diff --git a/network/p2p/p2p.go b/network/p2p/p2p.go index 2c64b63eab..21782dce44 100644 --- a/network/p2p/p2p.go +++ b/network/p2p/p2p.go @@ -159,7 +159,7 @@ func configureResourceManager(cfg config.Local) (network.ResourceManager, error) // MakeService creates a P2P service instance func MakeService(ctx context.Context, log logging.Logger, cfg config.Local, h host.Host, listenAddr string, wsStreamHandler StreamHandler, bootstrapPeers []*peer.AddrInfo) (*serviceImpl, error) { - sm := makeStreamManager(ctx, log, h, wsStreamHandler) + sm := makeStreamManager(ctx, log, h, wsStreamHandler, cfg.EnableGossipService) h.Network().Notify(sm) h.SetStreamHandler(AlgorandWsProtocol, sm.streamHandler) diff --git a/network/p2p/streams.go b/network/p2p/streams.go index e7277f4871..d79471f664 100644 --- a/network/p2p/streams.go +++ b/network/p2p/streams.go @@ -30,10 +30,11 @@ import ( // streamManager implements network.Notifiee to create and manage streams for use with non-gossipsub protocols. type streamManager struct { - ctx context.Context - log logging.Logger - host host.Host - handler StreamHandler + ctx context.Context + log logging.Logger + host host.Host + handler StreamHandler + allowIncomingGossip bool streams map[peer.ID]network.Stream streamsLock deadlock.Mutex @@ -42,18 +43,25 @@ type streamManager struct { // StreamHandler is called when a new bidirectional stream for a given protocol and peer is opened. type StreamHandler func(ctx context.Context, pid peer.ID, s network.Stream, incoming bool) -func makeStreamManager(ctx context.Context, log logging.Logger, h host.Host, handler StreamHandler) *streamManager { +func makeStreamManager(ctx context.Context, log logging.Logger, h host.Host, handler StreamHandler, allowIncomingGossip bool) *streamManager { return &streamManager{ - ctx: ctx, - log: log, - host: h, - handler: handler, - streams: make(map[peer.ID]network.Stream), + ctx: ctx, + log: log, + host: h, + handler: handler, + allowIncomingGossip: allowIncomingGossip, + streams: make(map[peer.ID]network.Stream), } } // streamHandler is called by libp2p when a new stream is accepted func (n *streamManager) streamHandler(stream network.Stream) { + if stream.Conn().Stat().Direction == network.DirInbound && !n.allowIncomingGossip { + n.log.Debugf("rejecting stream from incoming connection from %s", stream.Conn().RemotePeer().String()) + stream.Close() + return + } + n.streamsLock.Lock() defer n.streamsLock.Unlock() @@ -74,15 +82,7 @@ func (n *streamManager) streamHandler(stream network.Stream) { } n.streams[stream.Conn().RemotePeer()] = stream - // streamHandler is supposed to be called for accepted streams, so we expect incoming here incoming := stream.Conn().Stat().Direction == network.DirInbound - if !incoming { - if stream.Stat().Direction == network.DirUnknown { - n.log.Warnf("Unknown direction for a steam %s to/from %s", stream.ID(), remotePeer) - } else { - n.log.Warnf("Unexpected outgoing stream in streamHandler for connection %s (%s): %s vs %s stream", stream.Conn().ID(), remotePeer, stream.Conn().Stat().Direction, stream.Stat().Direction.String()) - } - } n.handler(n.ctx, remotePeer, stream, incoming) return } @@ -92,20 +92,18 @@ func (n *streamManager) streamHandler(stream network.Stream) { } // no old stream n.streams[stream.Conn().RemotePeer()] = stream - // streamHandler is supposed to be called for accepted streams, so we expect incoming here incoming := stream.Conn().Stat().Direction == network.DirInbound - if !incoming { - if stream.Stat().Direction == network.DirUnknown { - n.log.Warnf("streamHandler: unknown direction for a steam %s to/from %s", stream.ID(), remotePeer) - } else { - n.log.Warnf("Unexpected outgoing stream in streamHandler for connection %s (%s): %s vs %s stream", stream.Conn().ID(), remotePeer, stream.Conn().Stat().Direction, stream.Stat().Direction.String()) - } - } n.handler(n.ctx, remotePeer, stream, incoming) } -// Connected is called when a connection is opened +// Connected is called when a connection is opened. +// It is called for both incoming (listener -> addConn) and outgoing (dialer -> addConn) connections. func (n *streamManager) Connected(net network.Network, conn network.Conn) { + if conn.Stat().Direction == network.DirInbound && !n.allowIncomingGossip { + n.log.Debugf("ignoring incoming connection from %s", conn.RemotePeer().String()) + return + } + remotePeer := conn.RemotePeer() localPeer := n.host.ID() @@ -138,15 +136,7 @@ func (n *streamManager) Connected(net network.Network, conn network.Conn) { needUnlock = false n.streamsLock.Unlock() - // a new stream created above, expected direction is outbound incoming := stream.Conn().Stat().Direction == network.DirInbound - if incoming { - n.log.Warnf("Unexpected incoming stream in streamHandler for connection %s (%s): %s vs %s stream", stream.Conn().ID(), remotePeer, stream.Conn().Stat().Direction, stream.Stat().Direction.String()) - } else { - if stream.Stat().Direction == network.DirUnknown { - n.log.Warnf("Connected: unknown direction for a steam %s to/from %s", stream.ID(), remotePeer) - } - } n.handler(n.ctx, remotePeer, stream, incoming) } diff --git a/network/p2pNetwork_test.go b/network/p2pNetwork_test.go index 0eac398431..308ea4a31c 100644 --- a/network/p2pNetwork_test.go +++ b/network/p2pNetwork_test.go @@ -1154,3 +1154,133 @@ func TestMergeP2PAddrInfoResolvedAddresses(t *testing.T) { }) } } + +// TestP2PEnableGossipService_NodeDisable ensures that a node with EnableGossipService=false +// still can participate in the network by sending and receiving messages. +func TestP2PEnableGossipService_NodeDisable(t *testing.T) { + partitiontest.PartitionTest(t) + + log := logging.TestingLog(t) + + // prepare configs + cfg := config.GetDefaultLocal() + cfg.DNSBootstrapID = "" // disable DNS lookups since the test uses phonebook addresses + + relayCfg := cfg + relayCfg.NetAddress = "127.0.0.1:0" + + nodeCfg := cfg + nodeCfg.EnableGossipService = false + nodeCfg2 := nodeCfg + nodeCfg2.NetAddress = "127.0.0.1:0" + + tests := []struct { + name string + relayCfg config.Local + nodeCfg config.Local + }{ + {"non-listening-node", relayCfg, nodeCfg}, + {"listening-node", relayCfg, nodeCfg2}, + } + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + relayCfg := test.relayCfg + netA, err := NewP2PNetwork(log, relayCfg, "", nil, genesisID, config.Devtestnet, &nopeNodeInfo{}, nil) + require.NoError(t, err) + netA.Start() + defer netA.Stop() + + peerInfoA := netA.service.AddrInfo() + addrsA, err := peer.AddrInfoToP2pAddrs(&peerInfoA) + require.NoError(t, err) + require.NotZero(t, addrsA[0]) + multiAddrStr := addrsA[0].String() + phoneBookAddresses := []string{multiAddrStr} + + // start netB with gossip service disabled + nodeCfg := test.nodeCfg + netB, err := NewP2PNetwork(log, nodeCfg, "", phoneBookAddresses, genesisID, config.Devtestnet, &nopeNodeInfo{}, nil) + require.NoError(t, err) + netB.Start() + defer netB.Stop() + + require.Eventually(t, func() bool { + return netA.hasPeers() && netB.hasPeers() + }, 1*time.Second, 50*time.Millisecond) + + testTag := protocol.AgreementVoteTag + + var handlerCountA atomic.Uint32 + passThroughHandlerA := []TaggedMessageHandler{ + {Tag: testTag, MessageHandler: HandlerFunc(func(msg IncomingMessage) OutgoingMessage { + handlerCountA.Add(1) + return OutgoingMessage{Action: Broadcast} + })}, + } + var handlerCountB atomic.Uint32 + passThroughHandlerB := []TaggedMessageHandler{ + {Tag: testTag, MessageHandler: HandlerFunc(func(msg IncomingMessage) OutgoingMessage { + handlerCountB.Add(1) + return OutgoingMessage{Action: Broadcast} + })}, + } + netA.RegisterHandlers(passThroughHandlerA) + netB.RegisterHandlers(passThroughHandlerB) + + // send messages from B and confirm that they get received by C (via A) + for i := 0; i < 10; i++ { + err = netA.Broadcast(context.Background(), testTag, []byte(fmt.Sprintf("hello from A %d", i)), false, nil) + require.NoError(t, err) + err = netB.Broadcast(context.Background(), testTag, []byte(fmt.Sprintf("hello from B %d", i)), false, nil) + require.NoError(t, err) + } + + require.Eventually( + t, + func() bool { + return handlerCountA.Load() == 10 && handlerCountB.Load() == 10 + }, + 2*time.Second, + 50*time.Millisecond, + ) + }) + } +} + +func TestP2PEnableGossipService_BothDisable(t *testing.T) { + partitiontest.PartitionTest(t) + + log := logging.TestingLog(t) + + // prepare configs + cfg := config.GetDefaultLocal() + cfg.DNSBootstrapID = "" // disable DNS lookups since the test uses phonebook addresses + cfg.EnableGossipService = false // disable gossip service by default + + relayCfg := cfg + relayCfg.NetAddress = "127.0.0.1:0" + + netA, err := NewP2PNetwork(log, relayCfg, "", nil, genesisID, config.Devtestnet, &nopeNodeInfo{}, nil) + require.NoError(t, err) + netA.Start() + defer netA.Stop() + + peerInfoA := netA.service.AddrInfo() + addrsA, err := peer.AddrInfoToP2pAddrs(&peerInfoA) + require.NoError(t, err) + require.NotZero(t, addrsA[0]) + multiAddrStr := addrsA[0].String() + phoneBookAddresses := []string{multiAddrStr} + + nodeCfg := cfg + nodeCfg.NetAddress = "" + + netB, err := NewP2PNetwork(log, nodeCfg, "", phoneBookAddresses, genesisID, config.Devtestnet, &nopeNodeInfo{}, nil) + require.NoError(t, err) + netB.Start() + defer netB.Stop() + + require.Eventually(t, func() bool { + return !netA.hasPeers() && !netB.hasPeers() + }, 1*time.Second, 50*time.Millisecond) +} From c28419a649790e1d06f714532832377bd34f71a2 Mon Sep 17 00:00:00 2001 From: Pavel Zbitskiy Date: Fri, 19 Jul 2024 14:48:40 -0400 Subject: [PATCH 2/3] self review --- network/p2p/streams.go | 4 ++-- network/p2pNetwork_test.go | 2 ++ 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/network/p2p/streams.go b/network/p2p/streams.go index d79471f664..0b7838ffdc 100644 --- a/network/p2p/streams.go +++ b/network/p2p/streams.go @@ -96,8 +96,8 @@ func (n *streamManager) streamHandler(stream network.Stream) { n.handler(n.ctx, remotePeer, stream, incoming) } -// Connected is called when a connection is opened. -// It is called for both incoming (listener -> addConn) and outgoing (dialer -> addConn) connections. +// Connected is called when a connection is opened +// for both incoming (listener -> addConn) and outgoing (dialer -> addConn) connections. func (n *streamManager) Connected(net network.Network, conn network.Conn) { if conn.Stat().Direction == network.DirInbound && !n.allowIncomingGossip { n.log.Debugf("ignoring incoming connection from %s", conn.RemotePeer().String()) diff --git a/network/p2pNetwork_test.go b/network/p2pNetwork_test.go index 308ea4a31c..74c66fa000 100644 --- a/network/p2pNetwork_test.go +++ b/network/p2pNetwork_test.go @@ -1247,6 +1247,8 @@ func TestP2PEnableGossipService_NodeDisable(t *testing.T) { } } +// TestP2PEnableGossipService_BothDisable checks if both relay and node have EnableGossipService=false +// they do not connect to each other. func TestP2PEnableGossipService_BothDisable(t *testing.T) { partitiontest.PartitionTest(t) From 4864b187b3fbace0b1a4fc51936250fc49683192 Mon Sep 17 00:00:00 2001 From: Pavel Zbitskiy Date: Wed, 24 Jul 2024 11:13:38 -0400 Subject: [PATCH 3/3] update TestP2PEnableGossipService_BothDisable --- network/p2pNetwork_test.go | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/network/p2pNetwork_test.go b/network/p2pNetwork_test.go index 74c66fa000..de578866ff 100644 --- a/network/p2pNetwork_test.go +++ b/network/p2pNetwork_test.go @@ -1227,7 +1227,7 @@ func TestP2PEnableGossipService_NodeDisable(t *testing.T) { netA.RegisterHandlers(passThroughHandlerA) netB.RegisterHandlers(passThroughHandlerB) - // send messages from B and confirm that they get received by C (via A) + // send messages from both nodes to each other and confirm they are received. for i := 0; i < 10; i++ { err = netA.Broadcast(context.Background(), testTag, []byte(fmt.Sprintf("hello from A %d", i)), false, nil) require.NoError(t, err) @@ -1248,7 +1248,10 @@ func TestP2PEnableGossipService_NodeDisable(t *testing.T) { } // TestP2PEnableGossipService_BothDisable checks if both relay and node have EnableGossipService=false -// they do not connect to each other. +// they do not gossip to each other. +// +// Note, this test checks a configuration where node A (relay) does not know about node B, +// and node B is configured to connect to A, and this scenario rejecting logic is guaranteed to work. func TestP2PEnableGossipService_BothDisable(t *testing.T) { partitiontest.PartitionTest(t) @@ -1262,7 +1265,7 @@ func TestP2PEnableGossipService_BothDisable(t *testing.T) { relayCfg := cfg relayCfg.NetAddress = "127.0.0.1:0" - netA, err := NewP2PNetwork(log, relayCfg, "", nil, genesisID, config.Devtestnet, &nopeNodeInfo{}, nil) + netA, err := NewP2PNetwork(log.With("net", "netA"), relayCfg, "", nil, genesisID, config.Devtestnet, &nopeNodeInfo{}, nil) require.NoError(t, err) netA.Start() defer netA.Stop() @@ -1277,12 +1280,15 @@ func TestP2PEnableGossipService_BothDisable(t *testing.T) { nodeCfg := cfg nodeCfg.NetAddress = "" - netB, err := NewP2PNetwork(log, nodeCfg, "", phoneBookAddresses, genesisID, config.Devtestnet, &nopeNodeInfo{}, nil) + netB, err := NewP2PNetwork(log.With("net", "netB"), nodeCfg, "", phoneBookAddresses, genesisID, config.Devtestnet, &nopeNodeInfo{}, nil) require.NoError(t, err) netB.Start() defer netB.Stop() require.Eventually(t, func() bool { - return !netA.hasPeers() && !netB.hasPeers() + return len(netA.service.Conns()) > 0 && len(netB.service.Conns()) > 0 }, 1*time.Second, 50*time.Millisecond) + + require.False(t, netA.hasPeers()) + require.False(t, netB.hasPeers()) }