diff --git a/config/config.go b/config/config.go
index 0b8c676bd4..79b04420b3 100644
--- a/config/config.go
+++ b/config/config.go
@@ -228,6 +228,9 @@ func (cfg *Config) NewNode(ctx context.Context) (host.Host, error) {
 		}
 	}
 
+	// start the host background tasks
+	h.Start()
+
 	if router != nil {
 		return routed.Wrap(h, router), nil
 	}
diff --git a/p2p/host/basic/basic_host.go b/p2p/host/basic/basic_host.go
index 0725fa702d..1b11bbdc80 100644
--- a/p2p/host/basic/basic_host.go
+++ b/p2p/host/basic/basic_host.go
@@ -4,6 +4,7 @@ import (
 	"context"
 	"io"
 	"net"
+	"sync"
 	"time"
 
 	logging "github.com/ipfs/go-log"
@@ -70,6 +71,11 @@ type BasicHost struct {
 	negtimeout time.Duration
 
 	proc goprocess.Process
+
+	ctx       context.Context
+	cancel    func()
+	mx        sync.Mutex
+	lastAddrs []ma.Multiaddr
 }
 
 // HostOpts holds options that can be passed to NewHost in order to
@@ -164,6 +170,11 @@ func NewHost(ctx context.Context, net inet.Network, opts *HostOpts) (*BasicHost,
 	net.SetConnHandler(h.newConnHandler)
 	net.SetStreamHandler(h.newStreamHandler)
 
+	bgctx, cancel := context.WithCancel(ctx)
+	h.ctx = bgctx
+	h.cancel = cancel
+
 	return h, nil
 }
 
@@ -204,6 +215,11 @@ func New(net inet.Network, opts ...interface{}) *BasicHost {
 	return h
 }
 
+// Start starts background tasks in the host
+func (h *BasicHost) Start() {
+	go h.background()
+}
+
 // newConnHandler is the remote-opened conn handler for inet.Network
 func (h *BasicHost) newConnHandler(c inet.Conn) {
 	// Clear protocols on connecting to new peer to avoid issues caused
@@ -263,7 +279,63 @@ func (h *BasicHost) newStreamHandler(s inet.Stream) {
 // PushIdentify pushes an identify update through the identify push protocol
 // Warning: this interface is unstable and may disappear in the future.
 func (h *BasicHost) PushIdentify() {
-	h.ids.Push()
+	push := false
+
+	h.mx.Lock()
+	addrs := h.Addrs()
+	if !sameAddrs(addrs, h.lastAddrs) {
+		push = true
+		h.lastAddrs = addrs
+	}
+	h.mx.Unlock()
+
+	if push {
+		h.ids.Push()
+	}
+}
+
+func (h *BasicHost) background() {
+	// periodically schedules an IdentifyPush to update our peers for changes
+	// in our address set (if needed)
+	ticker := time.NewTicker(1 * time.Minute)
+	defer ticker.Stop()
+
+	// initialize lastAddrs
+	h.mx.Lock()
+	if h.lastAddrs == nil {
+		h.lastAddrs = h.Addrs()
+	}
+	h.mx.Unlock()
+
+	for {
+		select {
+		case <-ticker.C:
+			h.PushIdentify()
+
+		case <-h.ctx.Done():
+			return
+		}
+	}
+}
+
+func sameAddrs(a, b []ma.Multiaddr) bool {
+	if len(a) != len(b) {
+		return false
+	}
+
+	bmap := make(map[string]struct{}, len(b))
+	for _, addr := range b {
+		bmap[string(addr.Bytes())] = struct{}{}
+	}
+
+	for _, addr := range a {
+		_, ok := bmap[string(addr.Bytes())]
+		if !ok {
+			return false
+		}
+	}
+
+	return true
 }
 
 // ID returns the (local) peer.ID associated with this Host
@@ -646,6 +718,7 @@ func (h *BasicHost) AllAddrs() []ma.Multiaddr {
 // Close shuts down the Host's services (network, etc).
 func (h *BasicHost) Close() error {
+	h.cancel()
 	return h.proc.Close()
 }
diff --git a/p2p/net/mock/mock_test.go b/p2p/net/mock/mock_test.go
index fa53c16ec5..473e4424df 100644
--- a/p2p/net/mock/mock_test.go
+++ b/p2p/net/mock/mock_test.go
@@ -588,7 +588,7 @@ func TestLimitedStreams(t *testing.T) {
 func TestFuzzManyPeers(t *testing.T) {
 	peerCount := 50000
 	if detectrace.WithRace() {
-		peerCount = 1000
+		peerCount = 100
 	}
 	for i := 0; i < peerCount; i++ {
 		_, err := FullMeshConnected(context.Background(), 2)
diff --git a/p2p/protocol/identify/id.go b/p2p/protocol/identify/id.go
index 9aac465bfa..cfc4d08897 100644
--- a/p2p/protocol/identify/id.go
+++ b/p2p/protocol/identify/id.go
@@ -47,6 +47,8 @@ const transientTTL = 10 * time.Second
 type IDService struct {
 	Host host.Host
 
+	ctx context.Context
+
 	// connections undergoing identification
 	// for wait purposes
 	currid map[inet.Conn]chan struct{}
@@ -64,6 +66,7 @@ type IDService struct {
 func NewIDService(ctx context.Context, h host.Host) *IDService {
 	s := &IDService{
 		Host:          h,
+		ctx:           ctx,
 		currid:        make(map[inet.Conn]chan struct{}),
 		observedAddrs: NewObservedAddrSet(ctx),
 	}
@@ -156,19 +159,42 @@ func (ids *IDService) pushHandler(s inet.Stream) {
 }
 
 func (ids *IDService) Push() {
+	var wg sync.WaitGroup
+
+	ctx, cancel := context.WithTimeout(ids.ctx, 30*time.Second)
+	ctx = inet.WithNoDial(ctx, "identify push")
+
 	for _, p := range ids.Host.Network().Peers() {
+		wg.Add(1)
 		go func(p peer.ID) {
-			ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
-			defer cancel()
+			defer wg.Done()
+
 			s, err := ids.Host.NewStream(ctx, p, IDPush)
 			if err != nil {
-				log.Debugf("error opening push stream: %s", err.Error())
+				log.Debugf("error opening push stream to %s: %s", p, err.Error())
 				return
 			}
 
-			ids.requestHandler(s)
+			rch := make(chan struct{}, 1)
+			go func() {
+				ids.requestHandler(s)
+				rch <- struct{}{}
+			}()
+
+			select {
+			case <-rch:
+			case <-ctx.Done():
+				// this is taking too long, abort!
+				s.Reset()
+			}
 		}(p)
 	}
+
+	// this supervisory goroutine is necessary to cancel the context
+	go func() {
+		wg.Wait()
+		cancel()
+	}()
 }
 
 func (ids *IDService) populateMessage(mes *pb.Identify, c inet.Conn) {
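
A note on the shape of the change: the host now separates construction from starting background work. NewHost derives a cancellable context, the new Start method launches the loop, and Close cancels the context so the goroutine exits. A minimal, self-contained sketch of that lifecycle pattern, with a hypothetical service type and a shortened tick interval standing in for BasicHost and its 1-minute ticker:

package main

import (
	"context"
	"fmt"
	"time"
)

// service mirrors the lifecycle used by BasicHost above: a cancellable
// context created at construction time, a Start that spawns the loop,
// and a Close that cancels the context to stop it.
type service struct {
	ctx    context.Context
	cancel func()
}

func newService() *service {
	ctx, cancel := context.WithCancel(context.Background())
	return &service{ctx: ctx, cancel: cancel}
}

func (s *service) Start() { go s.background() }

func (s *service) background() {
	ticker := time.NewTicker(100 * time.Millisecond) // 1 minute in the real host
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			fmt.Println("tick: check for address changes")
		case <-s.ctx.Done():
			return // Close was called
		}
	}
}

func (s *service) Close() { s.cancel() }

func main() {
	s := newService()
	s.Start()
	time.Sleep(350 * time.Millisecond)
	s.Close()
}

Keeping Start out of the constructor presumably lets config.NewNode (first hunk) finish wiring the host before any background task runs against it.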
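The sameAddrs helper compares the two address slices as sets keyed on each multiaddr's byte form: equal length plus full membership of a in b means the address set is unchanged, regardless of ordering. The same comparison over plain strings, as a standalone illustration (sameStrings is a hypothetical stand-in, not part of the patch):

package main

import "fmt"

// sameStrings mirrors sameAddrs above, keyed on strings instead of
// ma.Multiaddr bytes: equal length plus full membership means both
// slices hold the same elements, order aside.
func sameStrings(a, b []string) bool {
	if len(a) != len(b) {
		return false
	}
	bmap := make(map[string]struct{}, len(b))
	for _, s := range b {
		bmap[s] = struct{}{}
	}
	for _, s := range a {
		if _, ok := bmap[s]; !ok {
			return false
		}
	}
	return true
}

func main() {
	fmt.Println(sameStrings([]string{"a", "b"}, []string{"b", "a"})) // true: order ignored
	fmt.Println(sameStrings([]string{"a"}, []string{"a", "b"}))      // false: lengths differ
	fmt.Println(sameStrings([]string{"a", "c"}, []string{"a", "b"})) // false: c not in b
}

Only an actual change in this set makes PushIdentify call ids.Push(), which is what keeps the minute-ticker loop from re-pushing identical addresses to every peer.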
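The reworked IDService.Push also changes its cancellation shape: instead of a fresh 60-second context per peer, one 30-second context now bounds the whole fan-out, a per-peer select resets any stream whose handler outlives the deadline, and a supervisory goroutine defers cancel until every worker has returned. A self-contained sketch of that supervision pattern, with a hypothetical pushAll and a work callback standing in for the stream open plus requestHandler:

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// pushAll mirrors the cancellation shape of IDService.Push: one deadline
// for the whole fan-out, a per-worker select between completion and
// timeout, and a supervisory goroutine that cancels the shared context
// only once all workers have finished.
func pushAll(peers []string, work func(ctx context.Context, p string)) {
	var wg sync.WaitGroup
	ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)

	for _, p := range peers {
		wg.Add(1)
		go func(p string) {
			defer wg.Done()
			done := make(chan struct{}, 1) // buffered: worker never blocks sending
			go func() {
				work(ctx, p) // stands in for opening the stream and running the handler
				done <- struct{}{}
			}()
			select {
			case <-done:
			case <-ctx.Done():
				fmt.Println("push to", p, "took too long, aborted")
			}
		}(p)
	}

	// cancel only after every worker has returned, as in the original
	go func() {
		wg.Wait()
		cancel()
	}()
}

func main() {
	pushAll([]string{"peerA", "peerB"}, func(ctx context.Context, p string) {
		time.Sleep(100 * time.Millisecond)
		fmt.Println("pushed identify to", p)
	})
	time.Sleep(2 * time.Second) // let the demo finish
}

The supervisory goroutine matters because Push returns immediately after spawning the workers: a plain defer cancel() inside Push would kill the shared context before any of them had a chance to open a stream.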