Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Periodically schedule identify push if the address set has changed #597

Merged
merged 9 commits into from
Apr 19, 2019
3 changes: 3 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,9 @@ func (cfg *Config) NewNode(ctx context.Context) (host.Host, error) {
}
}

// start the host background tasks
h.Start()

if router != nil {
return routed.Wrap(h, router), nil
}
Expand Down
75 changes: 74 additions & 1 deletion p2p/host/basic/basic_host.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"io"
"net"
"sync"
"time"

logging "github.com/ipfs/go-log"
Expand Down Expand Up @@ -70,6 +71,11 @@ type BasicHost struct {
negtimeout time.Duration

proc goprocess.Process

ctx context.Context
cancel func()
mx sync.Mutex
lastAddrs []ma.Multiaddr
}

// HostOpts holds options that can be passed to NewHost in order to
Expand Down Expand Up @@ -164,6 +170,11 @@ func NewHost(ctx context.Context, net inet.Network, opts *HostOpts) (*BasicHost,

net.SetConnHandler(h.newConnHandler)
net.SetStreamHandler(h.newStreamHandler)

bgctx, cancel := context.WithCancel(ctx)
h.ctx = bgctx
h.cancel = cancel

return h, nil
}

Expand Down Expand Up @@ -204,6 +215,11 @@ func New(net inet.Network, opts ...interface{}) *BasicHost {
return h
}

// Start starts background tasks in the host
func (h *BasicHost) Start() {
go h.background()
}

// newConnHandler is the remote-opened conn handler for inet.Network
func (h *BasicHost) newConnHandler(c inet.Conn) {
// Clear protocols on connecting to new peer to avoid issues caused
Expand Down Expand Up @@ -263,7 +279,63 @@ func (h *BasicHost) newStreamHandler(s inet.Stream) {
// PushIdentify pushes an identify update through the identify push protocol
// Warning: this interface is unstable and may disappear in the future.
func (h *BasicHost) PushIdentify() {
h.ids.Push()
push := false

h.mx.Lock()
addrs := h.Addrs()
if !sameAddrs(addrs, h.lastAddrs) {
push = true
h.lastAddrs = addrs
}
h.mx.Unlock()

if push {
h.ids.Push()
}
}

func (h *BasicHost) background() {
// periodically schedules an IdentifyPush to update our peers for changes
// in our address set (if needed)
ticker := time.NewTicker(1 * time.Minute)
defer ticker.Stop()

// initialize lastAddrs
h.mx.Lock()
if h.lastAddrs == nil {
h.lastAddrs = h.Addrs()
}
h.mx.Unlock()

for {
select {
case <-ticker.C:
h.PushIdentify()

case <-h.ctx.Done():
return
}
}
}

func sameAddrs(a, b []ma.Multiaddr) bool {
if len(a) != len(b) {
return false
}

bmap := make(map[string]struct{}, len(b))
for _, addr := range b {
bmap[string(addr.Bytes())] = struct{}{}
}

for _, addr := range a {
_, ok := bmap[string(addr.Bytes())]
if !ok {
return false
}
}

return true
}

// ID returns the (local) peer.ID associated with this Host
Expand Down Expand Up @@ -646,6 +718,7 @@ func (h *BasicHost) AllAddrs() []ma.Multiaddr {

// Close shuts down the Host's services (network, etc).
func (h *BasicHost) Close() error {
h.cancel()
return h.proc.Close()
}

Expand Down
2 changes: 1 addition & 1 deletion p2p/net/mock/mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -588,7 +588,7 @@ func TestLimitedStreams(t *testing.T) {
func TestFuzzManyPeers(t *testing.T) {
peerCount := 50000
if detectrace.WithRace() {
peerCount = 1000
peerCount = 100
}
for i := 0; i < peerCount; i++ {
_, err := FullMeshConnected(context.Background(), 2)
Expand Down
34 changes: 30 additions & 4 deletions p2p/protocol/identify/id.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ const transientTTL = 10 * time.Second
type IDService struct {
Host host.Host

ctx context.Context

// connections undergoing identification
// for wait purposes
currid map[inet.Conn]chan struct{}
Expand All @@ -64,6 +66,7 @@ type IDService struct {
func NewIDService(ctx context.Context, h host.Host) *IDService {
s := &IDService{
Host: h,
ctx: ctx,
currid: make(map[inet.Conn]chan struct{}),
observedAddrs: NewObservedAddrSet(ctx),
}
Expand Down Expand Up @@ -156,19 +159,42 @@ func (ids *IDService) pushHandler(s inet.Stream) {
}

func (ids *IDService) Push() {
var wg sync.WaitGroup

ctx, cancel := context.WithTimeout(ids.ctx, 30*time.Second)
ctx = inet.WithNoDial(ctx, "identify push")

for _, p := range ids.Host.Network().Peers() {
wg.Add(1)
go func(p peer.ID) {
ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
defer cancel()
defer wg.Done()

s, err := ids.Host.NewStream(ctx, p, IDPush)
if err != nil {
log.Debugf("error opening push stream: %s", err.Error())
log.Debugf("error opening push stream to %s: %s", p, err.Error())
return
}

ids.requestHandler(s)
rch := make(chan struct{}, 1)
go func() {
ids.requestHandler(s)
rch <- struct{}{}
}()

select {
case <-rch:
case <-ctx.Done():
// this is taking too long, abort!
s.Reset()
}
}(p)
}

// this supervisory goroutine is necessary to cancel the context
go func() {
wg.Wait()
cancel()
}()
}

func (ids *IDService) populateMessage(mes *pb.Identify, c inet.Conn) {
Expand Down