Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

P2P: Introduce profiles for hybridRelay, hybridArchival, and hybridClient. #6062

Merged
merged 9 commits into from
Jul 18, 2024
70 changes: 62 additions & 8 deletions cmd/algocfg/profileCommand.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,8 @@ var (
},
}

relay = configUpdater{
description: "Relay consensus messages across the network and support catchup.",
wsRelay = configUpdater{
gmalouf marked this conversation as resolved.
Show resolved Hide resolved
description: "Relay consensus messages across the ws network and support recent catchup.",
updateFunc: func(cfg config.Local) config.Local {
cfg.MaxBlockHistoryLookback = 22000 // Enough to support 2 catchpoints with some wiggle room for nodes to catch up from the older one
cfg.CatchpointFileHistoryLength = 3
Expand All @@ -80,7 +80,7 @@ var (
}

archival = configUpdater{
description: "Store the full chain history and support catchup.",
description: "Store the full chain history and support full catchup.",
updateFunc: func(cfg config.Local) config.Local {
cfg.Archival = true
cfg.EnableLedgerService = true
Expand All @@ -91,13 +91,67 @@ var (
},
}

hybridRelay = configUpdater{
description: "Relay consensus messages across both ws and p2p networks, also support recent catchup.",
updateFunc: func(cfg config.Local) config.Local {
// WS relay config defaults
cfg.MaxBlockHistoryLookback = 22000 // Enough to support 2 catchpoints with some wiggle room for nodes to catch up from the older one
cfg.CatchpointFileHistoryLength = 3
cfg.CatchpointTracking = 2
cfg.EnableLedgerService = true
cfg.EnableBlockService = true
cfg.NetAddress = ":4160"
// This should be set to the public address of the node if public access is desired
cfg.PublicAddress = config.PlaceholderPublicAddress

// P2P config defaults
cfg.EnableP2PHybridMode = true
cfg.P2PNetAddress = ":4190"
cfg.EnableDHTProviders = true
return cfg
},
}

hybridArchival = configUpdater{
description: "Store the full chain history, support full catchup, P2P enabled, discoverable via DHT.",
updateFunc: func(cfg config.Local) config.Local {
cfg.Archival = true
cfg.EnableLedgerService = true
cfg.EnableBlockService = true
cfg.NetAddress = ":4160"
cfg.EnableGossipService = false
// This should be set to the public address of the node
cfg.PublicAddress = config.PlaceholderPublicAddress

// P2P config defaults
cfg.EnableP2PHybridMode = true
cfg.P2PNetAddress = ":4190"
cfg.EnableDHTProviders = true
return cfg
},
}

hybridClient = configUpdater{
description: "Participate in consensus or simply ensure chain health by validating blocks and supporting P2P traffic propagation.",
updateFunc: func(cfg config.Local) config.Local {

// P2P config defaults
cfg.EnableP2PHybridMode = true
cfg.EnableDHTProviders = true
return cfg
},
}

// profileNames are the supported pre-configurations of config values
profileNames = map[string]configUpdater{
"participation": participation,
"conduit": conduit,
"relay": relay,
"archival": archival,
"development": development,
"participation": participation,
"conduit": conduit,
"wsRelay": wsRelay,
"archival": archival,
"development": development,
"hybridRelay": hybridRelay,
"hybridArchival": hybridArchival,
"hybridClient": hybridClient,
}

forceUpdate bool
Expand Down
62 changes: 62 additions & 0 deletions cmd/algocfg/profileCommand_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package main

import (
"github.com/algorand/go-algorand/config"
"testing"

"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -62,4 +63,65 @@ func Test_getConfigForArg(t *testing.T) {
require.Equal(t, ":4160", cfg.NetAddress)
require.False(t, cfg.EnableGossipService)
})

t.Run("valid config test hybrid relay", func(t *testing.T) {
t.Parallel()
cfg, err := getConfigForArg("hybridRelay")
require.NoError(t, err)

require.False(t, cfg.Archival)
require.Equal(t, uint64(22000), cfg.MaxBlockHistoryLookback)
require.Equal(t, 3, cfg.CatchpointFileHistoryLength)
require.Equal(t, int64(2), cfg.CatchpointTracking)
require.True(t, cfg.EnableLedgerService)
require.True(t, cfg.EnableBlockService)
require.Equal(t, ":4160", cfg.NetAddress)
require.True(t, cfg.EnableGossipService)
require.Equal(t, config.PlaceholderPublicAddress, cfg.PublicAddress)

require.True(t, cfg.EnableP2PHybridMode)
require.Equal(t, ":4190", cfg.P2PNetAddress)
require.True(t, cfg.EnableDHTProviders)
})

t.Run("valid config test hybrid archival", func(t *testing.T) {
t.Parallel()
cfg, err := getConfigForArg("hybridArchival")
require.NoError(t, err)

require.True(t, cfg.Archival)
require.Equal(t, uint64(0), cfg.MaxBlockHistoryLookback)
require.Equal(t, 365, cfg.CatchpointFileHistoryLength)
require.Equal(t, int64(0), cfg.CatchpointTracking)
require.True(t, cfg.EnableLedgerService)
require.True(t, cfg.EnableBlockService)
require.Equal(t, ":4160", cfg.NetAddress)
require.False(t, cfg.EnableGossipService)
require.Equal(t, config.PlaceholderPublicAddress, cfg.PublicAddress)

require.True(t, cfg.EnableP2PHybridMode)
require.Equal(t, ":4190", cfg.P2PNetAddress)
require.True(t, cfg.EnableDHTProviders)
})

t.Run("valid config test hybrid client", func(t *testing.T) {
t.Parallel()
cfg, err := getConfigForArg("hybridClient")
require.NoError(t, err)

require.False(t, cfg.Archival)
require.Equal(t, uint64(0), cfg.MaxBlockHistoryLookback)
require.Equal(t, 365, cfg.CatchpointFileHistoryLength)
require.Equal(t, int64(0), cfg.CatchpointTracking)
require.False(t, cfg.EnableLedgerService)
require.False(t, cfg.EnableBlockService)
require.Empty(t, cfg.NetAddress)
// True because it is the default value, net address is blank so has no effect in practice
require.True(t, cfg.EnableGossipService)
require.Equal(t, "", cfg.PublicAddress)

require.True(t, cfg.EnableP2PHybridMode)
require.Equal(t, "", cfg.P2PNetAddress)
require.True(t, cfg.EnableDHTProviders)
})
}
8 changes: 8 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,9 @@
// as long as CatchpointInterval > 0
const CatchpointTrackingModeStored = 2

// PlaceholderPublicAddress is a placeholder for the public address generated in certain profiles
const PlaceholderPublicAddress = "PLEASE_SET_ME"

// LoadConfigFromDisk returns a Local config structure based on merging the defaults
// with settings loaded from the config file from the custom dir. If the custom file
// cannot be loaded, the default config is returned (with the error from loading the
Expand Down Expand Up @@ -145,6 +148,11 @@

err = loadConfig(f, &source)

// If the PublicAddress in config file has the PlaceholderPublicAddress, treat it as if it were empty
if source.PublicAddress == PlaceholderPublicAddress {
algorandskiy marked this conversation as resolved.
Show resolved Hide resolved
source.PublicAddress = ""

Check warning on line 153 in config/config.go

View check run for this annotation

Codecov / codecov/patch

config/config.go#L153

Added line #L153 was not covered by tests
}

if source.NetAddress != "" {
source.EnableLedgerService = true
source.EnableBlockService = true
Expand Down
8 changes: 7 additions & 1 deletion network/p2p/p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,8 @@
listenAddr = parsedListenAddr
}
} else {
listenAddr = "/ip4/0.0.0.0/tcp/0"
// don't listen if NetAddress is not set.
listenAddr = ""
}

var disableMetrics = func(cfg *libp2p.Config) error { return nil }
Expand Down Expand Up @@ -163,6 +164,11 @@

// Start starts the P2P service
func (s *serviceImpl) Start() error {
if s.listenAddr == "" {

Check warning on line 167 in network/p2p/p2p.go

View check run for this annotation

Codecov / codecov/patch

network/p2p/p2p.go#L167

Added line #L167 was not covered by tests
// don't listen if no listen address configured
return nil

Check warning on line 169 in network/p2p/p2p.go

View check run for this annotation

Codecov / codecov/patch

network/p2p/p2p.go#L169

Added line #L169 was not covered by tests
}

listenAddr, err := multiaddr.NewMultiaddr(s.listenAddr)
if err != nil {
s.log.Errorf("failed to create multiaddress: %s", err)
Expand Down
8 changes: 6 additions & 2 deletions network/p2pNetwork.go
Original file line number Diff line number Diff line change
Expand Up @@ -346,8 +346,11 @@ func (n *P2PNetwork) Start() error {
go n.handler.messageHandlerThread(&n.wg, n.wsPeersConnectivityCheckTicker.C, n, "network", "P2PNetwork")
}

n.wg.Add(1)
go n.httpdThread()
// start the HTTP server if configured to listen
if n.config.NetAddress != "" {
n.wg.Add(1)
go n.httpdThread()
}

n.wg.Add(1)
go n.broadcaster.broadcastThread(&n.wg, n, "network", "P2PNetwork")
Expand Down Expand Up @@ -471,6 +474,7 @@ func (n *P2PNetwork) meshThread() {

func (n *P2PNetwork) httpdThread() {
gmalouf marked this conversation as resolved.
Show resolved Hide resolved
defer n.wg.Done()

err := n.httpServer.Serve()
if err != nil {
n.log.Errorf("Error serving libp2phttp: %v", err)
Expand Down
16 changes: 14 additions & 2 deletions network/p2pNetwork_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ func TestP2PSubmitTX(t *testing.T) {

cfg := config.GetDefaultLocal()
cfg.ForceFetchTransactions = true
cfg.NetAddress = "127.0.0.1:0"
log := logging.TestingLog(t)
netA, err := NewP2PNetwork(log, cfg, "", nil, genesisID, config.Devtestnet, &nopeNodeInfo{})
require.NoError(t, err)
Expand Down Expand Up @@ -159,6 +160,7 @@ func TestP2PSubmitTXNoGossip(t *testing.T) {

cfg := config.GetDefaultLocal()
cfg.ForceFetchTransactions = true
cfg.NetAddress = "127.0.0.1:0"
log := logging.TestingLog(t)
netA, err := NewP2PNetwork(log, cfg, "", nil, genesisID, config.Devtestnet, &nopeNodeInfo{})
require.NoError(t, err)
Expand Down Expand Up @@ -189,6 +191,8 @@ func TestP2PSubmitTXNoGossip(t *testing.T) {

// run netC in NPN mode (no relay => no gossip sup => no TX receiving)
cfg.ForceFetchTransactions = false
// Have to unset NetAddress to get IsGossipServer to return false
cfg.NetAddress = ""
netC, err := NewP2PNetwork(log, cfg, "", phoneBookAddresses, genesisID, config.Devtestnet, &nopeNodeInfo{})
require.NoError(t, err)
netC.Start()
Expand Down Expand Up @@ -253,6 +257,7 @@ func TestP2PSubmitWS(t *testing.T) {
partitiontest.PartitionTest(t)

cfg := config.GetDefaultLocal()
cfg.NetAddress = "127.0.0.1:0"
log := logging.TestingLog(t)
netA, err := NewP2PNetwork(log, cfg, "", nil, genesisID, config.Devtestnet, &nopeNodeInfo{})
require.NoError(t, err)
Expand Down Expand Up @@ -584,6 +589,7 @@ func TestP2PNetworkDHTCapabilities(t *testing.T) {
partitiontest.PartitionTest(t)

cfg := config.GetDefaultLocal()
cfg.NetAddress = "127.0.0.1:0"
cfg.EnableDHTProviders = true
log := logging.TestingLog(t)

Expand Down Expand Up @@ -744,6 +750,7 @@ func TestP2PHTTPHandler(t *testing.T) {
cfg := config.GetDefaultLocal()
cfg.EnableDHTProviders = true
cfg.GossipFanout = 1
cfg.NetAddress = "127.0.0.1:0"
log := logging.TestingLog(t)

netA, err := NewP2PNetwork(log, cfg, "", nil, genesisID, config.Devtestnet, &nopeNodeInfo{})
Expand Down Expand Up @@ -812,6 +819,7 @@ func TestP2PRelay(t *testing.T) {
cfg.DNSBootstrapID = "" // disable DNS lookups since the test uses phonebook addresses
cfg.ForceFetchTransactions = true
cfg.BaseLoggerDebugLevel = 5
cfg.NetAddress = "127.0.0.1:0"
log := logging.TestingLog(t)
log.Debugln("Starting netA")
netA, err := NewP2PNetwork(log.With("net", "netA"), cfg, "", nil, genesisID, config.Devtestnet, &nopeNodeInfo{})
Expand All @@ -829,6 +837,8 @@ func TestP2PRelay(t *testing.T) {
multiAddrStr := addrsA[0].String()
phoneBookAddresses := []string{multiAddrStr}

// Explicitly unset NetAddress for netB
cfg.NetAddress = ""
log.Debugf("Starting netB with phonebook addresses %v", phoneBookAddresses)
netB, err := NewP2PNetwork(log.With("net", "netB"), cfg, "", phoneBookAddresses, genesisID, config.Devtestnet, &nopeNodeInfo{})
require.NoError(t, err)
Expand Down Expand Up @@ -880,8 +890,8 @@ func TestP2PRelay(t *testing.T) {
counterHandler, counterDone := makeCounterHandler(1, &counter, nil)
netA.RegisterProcessors(counterHandler)

// send 5 messages from both netB to netA
// since there is no node with listening address set => no messages should be received
// send 5 messages from netB to netA
// since relaying is disabled on net B => no messages should be received by net A
for i := 0; i < 5; i++ {
err := netB.Relay(context.Background(), protocol.TxnTag, []byte{1, 2, 3, byte(i)}, true, nil)
require.NoError(t, err)
Expand Down Expand Up @@ -1031,6 +1041,7 @@ func TestP2PWantTXGossip(t *testing.T) {
net.wantTXGossip.Store(true)
net.nodeInfo = &nopeNodeInfo{}
net.config.ForceFetchTransactions = false
net.config.NetAddress = ""
net.relayMessages = false
net.OnNetworkAdvance()
require.Eventually(t, func() bool { net.wg.Wait(); return true }, 1*time.Second, 50*time.Millisecond)
Expand All @@ -1048,6 +1059,7 @@ func TestP2PWantTXGossip(t *testing.T) {
net.wantTXGossip.Store(false)
net.nodeInfo = &nopeNodeInfo{}
net.config.ForceFetchTransactions = false
net.config.NetAddress = ""
net.relayMessages = true
net.OnNetworkAdvance()
require.Eventually(t, func() bool { return mockService.count.Load() == 3 }, 1*time.Second, 50*time.Millisecond)
Expand Down
Loading