From d71339cd9f41d8c4aa4883e89ac0c615392e7ab0 Mon Sep 17 00:00:00 2001 From: Pavel Zbitskiy Date: Mon, 8 Jul 2024 15:22:38 -0400 Subject: [PATCH 1/6] tests: enable debug logging in TestP2PRelay --- network/p2pNetwork_test.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/network/p2pNetwork_test.go b/network/p2pNetwork_test.go index 3f77d55f69..e139f902d9 100644 --- a/network/p2pNetwork_test.go +++ b/network/p2pNetwork_test.go @@ -795,7 +795,9 @@ func TestP2PRelay(t *testing.T) { partitiontest.PartitionTest(t) cfg := config.GetDefaultLocal() + cfg.DNSBootstrapID = "" // disable DNS lookups since the test uses phonebook addresses cfg.ForceFetchTransactions = true + cfg.BaseLoggerDebugLevel = 5 log := logging.TestingLog(t) log.Debugln("Starting netA") netA, err := NewP2PNetwork(log.With("net", "netA"), cfg, "", nil, genesisID, config.Devtestnet, &nopeNodeInfo{}) From 3f984952f155d95fd8419b853b9a4d39360b9d59 Mon Sep 17 00:00:00 2001 From: Pavel Zbitskiy Date: Mon, 8 Jul 2024 16:06:43 -0400 Subject: [PATCH 2/6] enable debug logging in TestVotersReloadFromDiskAfterOneStateProofCommitted as well --- ledger/ledger_test.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/ledger/ledger_test.go b/ledger/ledger_test.go index 968e6d8b21..bd32fca64e 100644 --- a/ledger/ledger_test.go +++ b/ledger/ledger_test.go @@ -1792,6 +1792,9 @@ func TestLedgerMemoryLeak(t *testing.T) { log := logging.TestingLog(t) log.SetLevel(logging.Info) // prevent spamming with ledger.AddValidatedBlock debug message deadlock.Opts.Disable = true // catchpoint writing might take long + defer func() { + deadlock.Opts.Disable = false + }() l, err := OpenLedger(log, dbName, inMem, genesisInitState, cfg) require.NoError(t, err) defer l.Close() @@ -2898,7 +2901,7 @@ func testVotersReloadFromDiskAfterOneStateProofCommitted(t *testing.T, cfg confi const inMem = true log := logging.TestingLog(t) - log.SetLevel(logging.Info) + log.SetLevel(logging.Debug) l, err := OpenLedger(log, dbName, inMem, genesisInitState, cfg) require.NoError(t, err) defer l.Close() From 1b1e0cbd75508d8339eb0ffff68e65cb81b3ec43 Mon Sep 17 00:00:00 2001 From: Pavel Zbitskiy Date: Tue, 9 Jul 2024 12:34:20 -0400 Subject: [PATCH 3/6] make connectivity checks more exact --- network/p2pNetwork_test.go | 23 +++++++++++++++++++---- 1 file changed, 19 insertions(+), 4 deletions(-) diff --git a/network/p2pNetwork_test.go b/network/p2pNetwork_test.go index e139f902d9..d4b901d667 100644 --- a/network/p2pNetwork_test.go +++ b/network/p2pNetwork_test.go @@ -22,6 +22,7 @@ import ( "fmt" "io" "net/http" + "slices" "sync" "sync/atomic" "testing" @@ -52,6 +53,13 @@ func (n *P2PNetwork) hasPeers() bool { return len(n.wsPeers) > 0 } +func (n *P2PNetwork) hasPeer(peerID peer.ID) bool { + n.wsPeersLock.RLock() + defer n.wsPeersLock.RUnlock() + _, ok := n.wsPeers[peerID] + return ok +} + func TestP2PSubmitTX(t *testing.T) { partitiontest.PartitionTest(t) @@ -894,16 +902,23 @@ func TestP2PRelay(t *testing.T) { require.Eventually( t, func() bool { - return len(netA.service.ListPeersForTopic(p2p.TXTopicName)) >= 2 && - len(netB.service.ListPeersForTopic(p2p.TXTopicName)) > 0 && - len(netC.service.ListPeersForTopic(p2p.TXTopicName)) > 0 + netAtopicPeers := netA.service.ListPeersForTopic(p2p.TXTopicName) + netBtopicPeers := netB.service.ListPeersForTopic(p2p.TXTopicName) + netCtopicPeers := netC.service.ListPeersForTopic(p2p.TXTopicName) + netBConnected := slices.Contains(netAtopicPeers, netB.service.ID()) + netCConnected := slices.Contains(netAtopicPeers, netC.service.ID()) + return len(netAtopicPeers) >= 2 && + len(netBtopicPeers) > 0 && + len(netCtopicPeers) > 0 && + netBConnected && netCConnected }, 10*time.Second, // wait until netC node gets actually connected to netA after starting 50*time.Millisecond, ) require.Eventually(t, func() bool { - return netA.hasPeers() && netB.hasPeers() && netC.hasPeers() + return netA.hasPeers() && netB.hasPeers() && netC.hasPeers() && + netA.hasPeer(netB.service.ID()) && netA.hasPeer(netC.service.ID()) }, 2*time.Second, 50*time.Millisecond) const expectedMsgs = 10 From 796a41616fff7e498de19458f437f3a65827430d Mon Sep 17 00:00:00 2001 From: Pavel Zbitskiy Date: Tue, 9 Jul 2024 16:03:32 -0400 Subject: [PATCH 4/6] temporary enable pubsub tracing --- network/p2p/p2p.go | 2 +- network/p2p/pubsub.go | 26 +++++++++++++++++++++++++- network/p2pNetwork_test.go | 1 + 3 files changed, 27 insertions(+), 2 deletions(-) diff --git a/network/p2p/p2p.go b/network/p2p/p2p.go index ac0489d5e1..7cd32d7d77 100644 --- a/network/p2p/p2p.go +++ b/network/p2p/p2p.go @@ -150,7 +150,7 @@ func MakeService(ctx context.Context, log logging.Logger, cfg config.Local, h ho telemetryProtoInfo := formatPeerTelemetryInfoProtocolName(telemetryID, telemetryInstance) h.SetStreamHandler(protocol.ID(telemetryProtoInfo), func(s network.Stream) { s.Close() }) - ps, err := makePubSub(ctx, cfg, h) + ps, err := makePubSub(ctx, log, cfg, h) if err != nil { return nil, err } diff --git a/network/p2p/pubsub.go b/network/p2p/pubsub.go index a968bcb6a9..53adfddd75 100644 --- a/network/p2p/pubsub.go +++ b/network/p2p/pubsub.go @@ -21,6 +21,7 @@ import ( "time" "github.com/algorand/go-algorand/config" + "github.com/algorand/go-algorand/logging" pubsub "github.com/libp2p/go-libp2p-pubsub" pubsub_pb "github.com/libp2p/go-libp2p-pubsub/pb" "github.com/libp2p/go-libp2p/core/host" @@ -55,7 +56,7 @@ const TXTopicName = "/algo/tx/0.1.0" const incomingThreads = 20 // matches to number wsNetwork workers -func makePubSub(ctx context.Context, cfg config.Local, host host.Host) (*pubsub.PubSub, error) { +func makePubSub(ctx context.Context, log logging.Logger, cfg config.Local, host host.Host) (*pubsub.PubSub, error) { //defaultParams := pubsub.DefaultGossipSubParams() options := []pubsub.Option{ @@ -98,11 +99,34 @@ func makePubSub(ctx context.Context, cfg config.Local, host host.Host) (*pubsub. pubsub.WithMessageSignaturePolicy(pubsub.StrictNoSign), // pubsub.WithValidateThrottle(cfg.TxBacklogSize), pubsub.WithValidateWorkers(incomingThreads), + pubsub.WithEventTracer(&pubsubEventTracer{log: log}), } return pubsub.NewGossipSub(ctx, host, options...) } +type pubsubEventTracer struct { + pubsub.EventTracer + log logging.Logger +} + +func (t *pubsubEventTracer) Trace(evt *pubsub_pb.TraceEvent) { + log := t.log.With("pubsub", "trace").With("type", evt.GetType().String()).With("peerID", peer.ID(evt.GetPeerID()).String()) + if evt.DeliverMessage != nil { + log = log.With("msg-from", peer.ID(evt.DeliverMessage.GetReceivedFrom()).String()) + } + if evt.RecvRPC != nil { + log = log.With("rpc-from", peer.ID(evt.RecvRPC.GetReceivedFrom()).String()) + } + if evt.RejectMessage != nil { + log = log.With("rej-from", peer.ID(evt.RejectMessage.GetReceivedFrom()).String()) + } + if evt.DuplicateMessage != nil { + log = log.With("dup-from", peer.ID(evt.DuplicateMessage.GetReceivedFrom()).String()) + } + log.Debugf("%s", evt) +} + func txMsgID(m *pubsub_pb.Message) string { h := blake2b.Sum256(m.Data) return string(h[:]) diff --git a/network/p2pNetwork_test.go b/network/p2pNetwork_test.go index d4b901d667..3d4edbce99 100644 --- a/network/p2pNetwork_test.go +++ b/network/p2pNetwork_test.go @@ -893,6 +893,7 @@ func TestP2PRelay(t *testing.T) { log.Debugf("Starting netC with phonebook addresses %v", phoneBookAddresses) netC, err := NewP2PNetwork(log.With("net", "netC"), cfg, "", phoneBookAddresses, genesisID, config.Devtestnet, &nopeNodeInfo{}) require.NoError(t, err) + require.True(t, netC.relayMessages) err = netC.Start() require.NoError(t, err) defer netC.Stop() From e27356ba142366d5d8483cb6109eb4c15a6f81b1 Mon Sep 17 00:00:00 2001 From: Pavel Zbitskiy Date: Wed, 10 Jul 2024 10:52:25 -0400 Subject: [PATCH 5/6] Revert "temporary enable pubsub tracing" This reverts commit 796a41616fff7e498de19458f437f3a65827430d. --- network/p2p/p2p.go | 2 +- network/p2p/pubsub.go | 26 +------------------------- network/p2pNetwork_test.go | 1 - 3 files changed, 2 insertions(+), 27 deletions(-) diff --git a/network/p2p/p2p.go b/network/p2p/p2p.go index 7cd32d7d77..ac0489d5e1 100644 --- a/network/p2p/p2p.go +++ b/network/p2p/p2p.go @@ -150,7 +150,7 @@ func MakeService(ctx context.Context, log logging.Logger, cfg config.Local, h ho telemetryProtoInfo := formatPeerTelemetryInfoProtocolName(telemetryID, telemetryInstance) h.SetStreamHandler(protocol.ID(telemetryProtoInfo), func(s network.Stream) { s.Close() }) - ps, err := makePubSub(ctx, log, cfg, h) + ps, err := makePubSub(ctx, cfg, h) if err != nil { return nil, err } diff --git a/network/p2p/pubsub.go b/network/p2p/pubsub.go index 53adfddd75..a968bcb6a9 100644 --- a/network/p2p/pubsub.go +++ b/network/p2p/pubsub.go @@ -21,7 +21,6 @@ import ( "time" "github.com/algorand/go-algorand/config" - "github.com/algorand/go-algorand/logging" pubsub "github.com/libp2p/go-libp2p-pubsub" pubsub_pb "github.com/libp2p/go-libp2p-pubsub/pb" "github.com/libp2p/go-libp2p/core/host" @@ -56,7 +55,7 @@ const TXTopicName = "/algo/tx/0.1.0" const incomingThreads = 20 // matches to number wsNetwork workers -func makePubSub(ctx context.Context, log logging.Logger, cfg config.Local, host host.Host) (*pubsub.PubSub, error) { +func makePubSub(ctx context.Context, cfg config.Local, host host.Host) (*pubsub.PubSub, error) { //defaultParams := pubsub.DefaultGossipSubParams() options := []pubsub.Option{ @@ -99,34 +98,11 @@ func makePubSub(ctx context.Context, log logging.Logger, cfg config.Local, host pubsub.WithMessageSignaturePolicy(pubsub.StrictNoSign), // pubsub.WithValidateThrottle(cfg.TxBacklogSize), pubsub.WithValidateWorkers(incomingThreads), - pubsub.WithEventTracer(&pubsubEventTracer{log: log}), } return pubsub.NewGossipSub(ctx, host, options...) } -type pubsubEventTracer struct { - pubsub.EventTracer - log logging.Logger -} - -func (t *pubsubEventTracer) Trace(evt *pubsub_pb.TraceEvent) { - log := t.log.With("pubsub", "trace").With("type", evt.GetType().String()).With("peerID", peer.ID(evt.GetPeerID()).String()) - if evt.DeliverMessage != nil { - log = log.With("msg-from", peer.ID(evt.DeliverMessage.GetReceivedFrom()).String()) - } - if evt.RecvRPC != nil { - log = log.With("rpc-from", peer.ID(evt.RecvRPC.GetReceivedFrom()).String()) - } - if evt.RejectMessage != nil { - log = log.With("rej-from", peer.ID(evt.RejectMessage.GetReceivedFrom()).String()) - } - if evt.DuplicateMessage != nil { - log = log.With("dup-from", peer.ID(evt.DuplicateMessage.GetReceivedFrom()).String()) - } - log.Debugf("%s", evt) -} - func txMsgID(m *pubsub_pb.Message) string { h := blake2b.Sum256(m.Data) return string(h[:]) diff --git a/network/p2pNetwork_test.go b/network/p2pNetwork_test.go index 3d4edbce99..d4b901d667 100644 --- a/network/p2pNetwork_test.go +++ b/network/p2pNetwork_test.go @@ -893,7 +893,6 @@ func TestP2PRelay(t *testing.T) { log.Debugf("Starting netC with phonebook addresses %v", phoneBookAddresses) netC, err := NewP2PNetwork(log.With("net", "netC"), cfg, "", phoneBookAddresses, genesisID, config.Devtestnet, &nopeNodeInfo{}) require.NoError(t, err) - require.True(t, netC.relayMessages) err = netC.Start() require.NoError(t, err) defer netC.Stop() From 6af46df9ad17562bdc3baf3ee52745c973ae7b3f Mon Sep 17 00:00:00 2001 From: Pavel Zbitskiy Date: Wed, 10 Jul 2024 10:54:13 -0400 Subject: [PATCH 6/6] skip TestP2PRelay on circleci --- network/p2pNetwork_test.go | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/network/p2pNetwork_test.go b/network/p2pNetwork_test.go index d4b901d667..5b3470689f 100644 --- a/network/p2pNetwork_test.go +++ b/network/p2pNetwork_test.go @@ -22,7 +22,9 @@ import ( "fmt" "io" "net/http" + "os" "slices" + "strings" "sync" "sync/atomic" "testing" @@ -802,6 +804,10 @@ func TestP2PHTTPHandler(t *testing.T) { func TestP2PRelay(t *testing.T) { partitiontest.PartitionTest(t) + if strings.ToUpper(os.Getenv("CIRCLECI")) == "TRUE" { + t.Skip("Flaky on CIRCLECI") + } + cfg := config.GetDefaultLocal() cfg.DNSBootstrapID = "" // disable DNS lookups since the test uses phonebook addresses cfg.ForceFetchTransactions = true @@ -893,6 +899,7 @@ func TestP2PRelay(t *testing.T) { log.Debugf("Starting netC with phonebook addresses %v", phoneBookAddresses) netC, err := NewP2PNetwork(log.With("net", "netC"), cfg, "", phoneBookAddresses, genesisID, config.Devtestnet, &nopeNodeInfo{}) require.NoError(t, err) + require.True(t, netC.relayMessages) err = netC.Start() require.NoError(t, err) defer netC.Stop()