Skip to content

Commit

Permalink
Merge branch 'master' into force-reachability
Browse files Browse the repository at this point in the history
  • Loading branch information
richard-ramos authored Sep 28, 2023
2 parents 200b854 + 7f466c1 commit accd406
Show file tree
Hide file tree
Showing 28 changed files with 493 additions and 270 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ nix develop
docker run -i -t -p 60000:60000 -p 9000:9000/udp \
wakuorg/go-waku:latest \ # or, the image:tag of your choice
--dns-discovery:true \
--dns-discovery-url:enrtree://AOGECG2SPND25EEFMAJ5WF3KSGJNSGV356DSTL2YVLLZWIV6SAYBM@prod.waku.nodes.status.im \
--dns-discovery-url:enrtree://ANEDLO25QVUGJOUTQFRYKWX6P4Z4GKVESBMHML7DZ6YK4LGS5FC5O@prod.wakuv2.nodes.status.im \
--discv5-discovery
```

Expand Down
6 changes: 6 additions & 0 deletions cmd/waku/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,12 @@ var (
Destination: &options.MaxPeerConnections,
EnvVars: []string{"WAKUNODE2_MAX_CONNECTIONS"},
})
PeerStoreCapacity = altsrc.NewIntFlag(&cli.IntFlag{
Name: "peer-store-capacity",
Usage: "Maximum stored peers in the peerstore.",
Destination: &options.PeerStoreCapacity,
EnvVars: []string{"WAKUNODE2_PEERSTORE_CAPACITY"},
})
WebsocketSupport = altsrc.NewBoolFlag(&cli.BoolFlag{
Name: "websocket-support",
Aliases: []string{"ws"},
Expand Down
1 change: 1 addition & 0 deletions cmd/waku/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ func main() {
TcpPort,
Address,
MaxPeerConnections,
PeerStoreCapacity,
WebsocketSupport,
WebsocketPort,
WebsocketSecurePort,
Expand Down
3 changes: 2 additions & 1 deletion cmd/waku/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ func Execute(options NodeOptions) {
node.WithKeepAlive(options.KeepAlive),
node.WithMaxPeerConnections(options.MaxPeerConnections),
node.WithPrometheusRegisterer(prometheus.DefaultRegisterer),
node.WithPeerStoreCapacity(options.PeerStoreCapacity),
}
if len(options.AdvertiseAddresses) != 0 {
nodeOpts = append(nodeOpts, node.WithAdvertiseAddresses(options.AdvertiseAddresses...))
Expand Down Expand Up @@ -330,7 +331,7 @@ func Execute(options NodeOptions) {
}

for _, d := range discoveredNodes {
wakuNode.AddDiscoveredPeer(d.PeerID, d.PeerInfo.Addrs, wakupeerstore.DNSDiscovery, nil)
wakuNode.AddDiscoveredPeer(d.PeerID, d.PeerInfo.Addrs, wakupeerstore.DNSDiscovery, nil, true)
}

//For now assuming that static peers added support/listen on all topics specified via commandLine.
Expand Down
1 change: 1 addition & 0 deletions cmd/waku/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,7 @@ type NodeOptions struct {
UserAgent string
PProf bool
MaxPeerConnections int
PeerStoreCapacity int

PeerExchange PeerExchangeOptions
Websocket WSOptions
Expand Down
4 changes: 2 additions & 2 deletions docs/api/dnsdisc.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import (
)

func main() {
discoveryURL := "enrtree://AOGECG2SPND25EEFMAJ5WF3KSGJNSGV356DSTL2YVLLZWIV6SAYBM@test.waku.nodes.status.im"
discoveryURL := "enrtree://AO47IDOLBKH72HIZZOXQP6NMRESAN7CHYWIBNXDXWRJRZWLODKII6@test.wakuv2.nodes.status.im"
nodes, err := dnsdisc.RetrieveNodes(context.Background(), discoveryURL)
if err != nil {
panic(err)
Expand All @@ -40,4 +40,4 @@ func main() {
}
```

`dnsdisc.RetrieveNodes` can also accept a `WithNameserver(nameserver string)` option which can be used to specify the nameserver to use to retrieve the TXT record from the domain name
`dnsdisc.RetrieveNodes` can also accept a `WithNameserver(nameserver string)` option which can be used to specify the nameserver to use to retrieve the TXT record from the domain name
6 changes: 3 additions & 3 deletions docs/operators/how-to/configure-dns-disc.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ A node will attempt connection to all discovered nodes.

This can be used, for example, to connect to one of the existing fleets.
Current URLs for the published fleet lists:
- production fleet: `enrtree://AOGECG2SPND25EEFMAJ5WF3KSGJNSGV356DSTL2YVLLZWIV6SAYBM@prod.waku.nodes.status.im`
- test fleet: `enrtree://AOGECG2SPND25EEFMAJ5WF3KSGJNSGV356DSTL2YVLLZWIV6SAYBM@test.waku.nodes.status.im`
- production fleet: `enrtree://ANEDLO25QVUGJOUTQFRYKWX6P4Z4GKVESBMHML7DZ6YK4LGS5FC5O@prod.wakuv2.nodes.status.im`
- test fleet: `enrtree://AO47IDOLBKH72HIZZOXQP6NMRESAN7CHYWIBNXDXWRJRZWLODKII6@test.wakuv2.nodes.status.im`

See the [separate tutorial](../../tutorial/dns-disc.md) for a complete guide to DNS discovery.
See the [separate tutorial](../../tutorial/dns-disc.md) for a complete guide to DNS discovery.
8 changes: 4 additions & 4 deletions docs/operators/how-to/run.md
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ Discovery v5 will attempt to extract the ENRs of the discovered nodes as bootstr
```sh
./build/waku \
--dns-discovery=true \
--dns-discovery-url=enrtree://ANTL4SLG2COUILKAPE7EF2BYNL2SHSHVCHLRD5J7ZJLN5R3PRJD2Y@prod.waku.nodes.status.im \
--dns-discovery-url=enrtree://ANEDLO25QVUGJOUTQFRYKWX6P4Z4GKVESBMHML7DZ6YK4LGS5FC5O@prod.wakuv2.nodes.status.im \
--discv5-discovery=true
```

Expand All @@ -150,7 +150,7 @@ Discovery v5 will attempt to extract the ENRs of the discovered nodes as bootstr
```sh
./build/waku \
--dns-discovery=true \
--dns-discovery-url=enrtree://AOGECG2SPND25EEFMAJ5WF3KSGJNSGV356DSTL2YVLLZWIV6SAYBM@test.waku.nodes.status.im \
--dns-discovery-url=enrtree://AO47IDOLBKH72HIZZOXQP6NMRESAN7CHYWIBNXDXWRJRZWLODKII6@test.wakuv2.nodes.status.im \
--discv5-discovery=true
```

Expand All @@ -169,7 +169,7 @@ appears below.
--db-path=/mnt/go-waku/data/db1/ \
--store-capacity=150000 \
--dns-discovery=true \
--dns-discovery-url=enrtree://AOGECG2SPND25EEFMAJ5WF3KSGJNSGV356DSTL2YVLLZWIV6SAYBM@test.waku.nodes.status.im \
--dns-discovery-url=enrtree://AO47IDOLBKH72HIZZOXQP6NMRESAN7CHYWIBNXDXWRJRZWLODKII6@test.wakuv2.nodes.status.im \
--discv5-discovery=true
```

Expand All @@ -181,4 +181,4 @@ A running go-waku node can be interacted with using the [Waku v2 JSON RPC API](h

> **Note:** Private and Admin API functionality are disabled by default.
To configure a go-waku node with these enabled,
use the `--rpc-admin:true` and `--rpc-private:true` CLI options.
use the `--rpc-admin:true` and `--rpc-private:true` CLI options.
2 changes: 1 addition & 1 deletion examples/c-bindings/main.c
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ int main(int argc, char *argv[])
WAKU_CALL(waku_connect("/dns4/node-01.gc-us-central1-a.wakuv2.test.statusim.net/tcp/30303/p2p/16Uiu2HAmJb2e28qLXxT5kZxVUUoJt72EMzNGXB47Rxx5hw3q4YjS", 0, handle_error)); // Connect to a node

// To use dns discovery, and retrieve nodes from a enrtree url
WAKU_CALL( waku_dns_discovery("enrtree://AOGECG2SPND25EEFMAJ5WF3KSGJNSGV356DSTL2YVLLZWIV6SAYBM@test.waku.nodes.status.im", "", 0, handle_ok, handle_error)); // Discover Nodes
WAKU_CALL( waku_dns_discovery("enrtree://AO47IDOLBKH72HIZZOXQP6NMRESAN7CHYWIBNXDXWRJRZWLODKII6@test.wakuv2.nodes.status.im", "", 0, handle_ok, handle_error)); // Discover Nodes
printf("Discovered nodes: %s\n", result);

WAKU_CALL(waku_default_pubsub_topic(handle_ok));
Expand Down
4 changes: 2 additions & 2 deletions examples/chat2/chat.go
Original file line number Diff line number Diff line change
Expand Up @@ -494,10 +494,10 @@ func (c *Chat) discoverNodes(connectionWg *sync.WaitGroup) {
var dnsDiscoveryUrl string
if options.Fleet != fleetNone {
if options.Fleet == fleetTest {
dnsDiscoveryUrl = "enrtree://AOGECG2SPND25EEFMAJ5WF3KSGJNSGV356DSTL2YVLLZWIV6SAYBM@test.waku.nodes.status.im"
dnsDiscoveryUrl = "enrtree://AO47IDOLBKH72HIZZOXQP6NMRESAN7CHYWIBNXDXWRJRZWLODKII6@test.wakuv2.nodes.status.im"
} else {
// Connect to prod by default
dnsDiscoveryUrl = "enrtree://AOGECG2SPND25EEFMAJ5WF3KSGJNSGV356DSTL2YVLLZWIV6SAYBM@prod.waku.nodes.status.im"
dnsDiscoveryUrl = "enrtree://ANEDLO25QVUGJOUTQFRYKWX6P4Z4GKVESBMHML7DZ6YK4LGS5FC5O@prod.wakuv2.nodes.status.im"
}
}

Expand Down
2 changes: 1 addition & 1 deletion examples/noise/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ func main() {
func discoverFleetNodes(wakuNode *node.WakuNode) error {
log.Info("Connecting to test fleet...")

dnsDiscoveryUrl := "enrtree://AOGECG2SPND25EEFMAJ5WF3KSGJNSGV356DSTL2YVLLZWIV6SAYBM@test.waku.nodes.status.im"
dnsDiscoveryUrl := "enrtree://AO47IDOLBKH72HIZZOXQP6NMRESAN7CHYWIBNXDXWRJRZWLODKII6@test.wakuv2.nodes.status.im"
nodes, err := dnsdisc.RetrieveNodes(context.Background(), dnsDiscoveryUrl)
if err != nil {
return err
Expand Down
8 changes: 4 additions & 4 deletions library/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,13 +134,13 @@ func FilterUnsubscribe(filterJSON string, peerID string, ms int) error {
ctx = context.Background()
}

var fOptions []filter.FilterUnsubscribeOption
var fOptions []filter.FilterSubscribeOption
if peerID != "" {
p, err := peer.Decode(peerID)
if err != nil {
return err
}
fOptions = append(fOptions, filter.Peer(p))
fOptions = append(fOptions, filter.WithPeer(p))
} else {
return errors.New("peerID is required")
}
Expand Down Expand Up @@ -176,13 +176,13 @@ func FilterUnsubscribeAll(peerID string, ms int) (string, error) {
ctx = context.Background()
}

var fOptions []filter.FilterUnsubscribeOption
var fOptions []filter.FilterSubscribeOption
if peerID != "" {
p, err := peer.Decode(peerID)
if err != nil {
return "", err
}
fOptions = append(fOptions, filter.Peer(p))
fOptions = append(fOptions, filter.WithPeer(p))
} else {
fOptions = append(fOptions, filter.UnsubscribeAll())
}
Expand Down
2 changes: 1 addition & 1 deletion waku/v2/dnsdisc/enr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
// TestRetrieveNodes uses a live connection, so it could be
// flaky, it should though pay for itself and should be fairly stable
func TestRetrieveNodes(t *testing.T) {
url := "enrtree://AOGECG2SPND25EEFMAJ5WF3KSGJNSGV356DSTL2YVLLZWIV6SAYBM@test.waku.nodes.status.im"
url := "enrtree://AO47IDOLBKH72HIZZOXQP6NMRESAN7CHYWIBNXDXWRJRZWLODKII6@test.wakuv2.nodes.status.im"

nodes, err := RetrieveNodes(context.Background(), url)
require.NoError(t, err)
Expand Down
6 changes: 3 additions & 3 deletions waku/v2/node/wakunode2.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ func New(opts ...WakuNodeOption) (*WakuNode, error) {
}

//Initialize peer manager.
w.peermanager = peermanager.NewPeerManager(w.opts.maxPeerConnections, w.log)
w.peermanager = peermanager.NewPeerManager(w.opts.maxPeerConnections, w.opts.peerStoreCapacity, w.log)

w.peerConnector, err = peermanager.NewPeerConnectionStrategy(w.peermanager, discoveryConnectTimeout, w.log)
if err != nil {
Expand Down Expand Up @@ -701,7 +701,7 @@ func (w *WakuNode) AddPeer(address ma.Multiaddr, origin wps.Origin, pubSubTopics
}

// AddDiscoveredPeer to add a discovered peer to the node peerStore
func (w *WakuNode) AddDiscoveredPeer(ID peer.ID, addrs []ma.Multiaddr, origin wps.Origin, pubsubTopics []string) {
func (w *WakuNode) AddDiscoveredPeer(ID peer.ID, addrs []ma.Multiaddr, origin wps.Origin, pubsubTopics []string, connectNow bool) {
p := peermanager.PeerData{
Origin: origin,
AddrInfo: peer.AddrInfo{
Expand All @@ -710,7 +710,7 @@ func (w *WakuNode) AddDiscoveredPeer(ID peer.ID, addrs []ma.Multiaddr, origin wp
},
PubSubTopics: pubsubTopics,
}
w.peermanager.AddDiscoveredPeer(p)
w.peermanager.AddDiscoveredPeer(p, connectNow)
}

// DialPeerWithMultiAddress is used to connect to a peer using a multiaddress
Expand Down
2 changes: 1 addition & 1 deletion waku/v2/node/wakunode2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func TestUpAndDown(t *testing.T) {
key1, _ := tests.RandomHex(32)
prvKey1, _ := crypto.HexToECDSA(key1)

nodes, err := dnsdisc.RetrieveNodes(context.Background(), "enrtree://AOGECG2SPND25EEFMAJ5WF3KSGJNSGV356DSTL2YVLLZWIV6SAYBM@prod.waku.nodes.status.im")
nodes, err := dnsdisc.RetrieveNodes(context.Background(), "enrtree://ANEDLO25QVUGJOUTQFRYKWX6P4Z4GKVESBMHML7DZ6YK4LGS5FC5O@prod.wakuv2.nodes.status.im")
require.NoError(t, err)

var bootnodes []*enode.Node
Expand Down
8 changes: 8 additions & 0 deletions waku/v2/node/wakuoptions.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ type WakuNodeParameters struct {
rendezvousDB *rendezvous.DB

maxPeerConnections int
peerStoreCapacity int

enableDiscV5 bool
udpPort uint
Expand Down Expand Up @@ -356,6 +357,13 @@ func WithMaxPeerConnections(maxPeers int) WakuNodeOption {
}
}

func WithPeerStoreCapacity(capacity int) WakuNodeOption {
return func(params *WakuNodeParameters) error {
params.peerStoreCapacity = capacity
return nil
}
}

// WithDiscoveryV5 is a WakuOption used to enable DiscV5 peer discovery
func WithDiscoveryV5(udpPort uint, bootnodes []*enode.Node, autoUpdate bool) WakuNodeOption {
return func(params *WakuNodeParameters) error {
Expand Down
31 changes: 11 additions & 20 deletions waku/v2/peermanager/peer_connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/libp2p/go-libp2p/p2p/discovery/backoff"
"github.com/waku-org/go-waku/logging"
wps "github.com/waku-org/go-waku/waku/v2/peerstore"
waku_proto "github.com/waku-org/go-waku/waku/v2/protocol"

"go.uber.org/zap"

Expand Down Expand Up @@ -113,8 +114,14 @@ func (c *PeerConnectionStrategy) consumeSubscription(ch <-chan PeerData) {
if !ok {
return
}
c.pm.AddDiscoveredPeer(p)
c.PushToChan(p)
triggerImmediateConnection := false
//Not connecting to peer as soon as it is discovered,
// rather expecting this to be pushed from PeerManager based on the need.
if len(c.host.Network().Peers()) < waku_proto.GossipSubOptimalFullMeshSize {
triggerImmediateConnection = true
}
c.pm.AddDiscoveredPeer(p, triggerImmediateConnection)

case <-time.After(1 * time.Second):
// This timeout is to not lock the goroutine
break
Expand All @@ -137,8 +144,8 @@ func (c *PeerConnectionStrategy) Start(ctx context.Context) error {

}
func (c *PeerConnectionStrategy) start() error {
c.WaitGroup().Add(2)
go c.shouldDialPeers()
c.WaitGroup().Add(1)

go c.dialPeers()

c.consumeSubscriptions()
Expand All @@ -155,22 +162,6 @@ func (c *PeerConnectionStrategy) isPaused() bool {
return c.paused.Load()
}

func (c *PeerConnectionStrategy) shouldDialPeers() {
defer c.WaitGroup().Done()

ticker := time.NewTicker(1 * time.Second)
defer ticker.Stop()
for {
select {
case <-c.Context().Done():
return
case <-ticker.C:
_, outRelayPeers := c.pm.getRelayPeers()
c.paused.Store(outRelayPeers.Len() >= c.pm.OutRelayPeersTarget) // pause if no of OutPeers more than or eq to target
}
}
}

// it might happen Subscribe is called before peerConnector has started so store these subscriptions in subscriptions array and custom after c.cancel is set.
func (c *PeerConnectionStrategy) consumeSubscriptions() {
for _, subs := range c.subscriptions {
Expand Down
Loading

0 comments on commit accd406

Please sign in to comment.