Skip to content

Commit

Permalink
identify: remove support for Identify Delta
Browse files Browse the repository at this point in the history
  • Loading branch information
marten-seemann committed Jan 3, 2023
1 parent e5fa2d2 commit fc47d18
Show file tree
Hide file tree
Showing 9 changed files with 28 additions and 748 deletions.
2 changes: 1 addition & 1 deletion limits.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ func SetDefaultServiceLimits(config *rcmgr.ScalingLimitConfig) {
rcmgr.BaseLimit{StreamsInbound: 16, StreamsOutbound: 16, Streams: 32, Memory: 1 << 20},
rcmgr.BaseLimitIncrease{},
)
for _, id := range [...]protocol.ID{identify.ID, identify.IDDelta, identify.IDPush} {
for _, id := range [...]protocol.ID{identify.ID, identify.IDPush} {
config.AddProtocolLimit(
id,
rcmgr.BaseLimit{StreamsInbound: 64, StreamsOutbound: 64, Streams: 128, Memory: 4 << 20},
Expand Down
2 changes: 0 additions & 2 deletions p2p/host/basic/basic_host_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,6 @@ func TestHostProtoPreference(t *testing.T) {

// Prevent pushing identify information so this test works.
h1.RemoveStreamHandler(identify.IDPush)
h1.RemoveStreamHandler(identify.IDDelta)

h2.SetStreamHandler(protoOld, handler)

Expand Down Expand Up @@ -362,7 +361,6 @@ func TestHostProtoPreknowledge(t *testing.T) {
h2.SetStreamHandler("/super", handler)
// Prevent pushing identify information so this test actually _uses_ the super protocol.
h1.RemoveStreamHandler(identify.IDPush)
h1.RemoveStreamHandler(identify.IDDelta)

h2pi := h2.Peerstore().PeerInfo(h2.ID())
require.NoError(t, h1.Connect(ctx, h2pi))
Expand Down
9 changes: 3 additions & 6 deletions p2p/protocol/identify/id.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,6 @@ func NewIDService(h host.Host, opts ...Option) (*idService, error) {
}

// register protocols that do not depend on peer records.
h.SetStreamHandler(IDDelta, s.deltaHandler)
h.SetStreamHandler(ID, s.sendIdentifyResp)
h.SetStreamHandler(IDPush, s.pushHandler)

Expand Down Expand Up @@ -268,20 +267,18 @@ func (ids *idService) loop() {
select {
case phs[pid].pushCh <- struct{}{}:
default:
log.Debugf("dropping addr updated message for %s as buffer full", pid.Pretty())
log.Debugf("dropping addr updated message for %s as buffer full", pid.String())
}
}

case event.EvtLocalProtocolsUpdated:
for pid := range phs {
select {
case phs[pid].deltaCh <- struct{}{}:
case phs[pid].pushCh <- struct{}{}:
default:
log.Debugf("dropping protocol updated message for %s as buffer full", pid.Pretty())
log.Debugf("dropping protocol updated message for %s as buffer full", pid.String())
}
}
}

case <-ids.ctx.Done():
return
}
Expand Down
82 changes: 0 additions & 82 deletions p2p/protocol/identify/id_delta.go

This file was deleted.

188 changes: 5 additions & 183 deletions p2p/protocol/identify/id_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@ package identify_test
import (
"context"
"fmt"
"reflect"
"sort"
"sync"
"testing"
"time"
Expand Down Expand Up @@ -360,169 +358,8 @@ func TestLocalhostAddrFiltering(t *testing.T) {
}
}

func TestIdentifyDeltaOnProtocolChange(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

h1 := blhost.NewBlankHost(swarmt.GenSwarm(t))
h2 := blhost.NewBlankHost(swarmt.GenSwarm(t))
defer h2.Close()
defer h1.Close()

h2.SetStreamHandler(protocol.TestingID, func(_ network.Stream) {})

ids1, err := identify.NewIDService(h1)
require.NoError(t, err)

ids2, err := identify.NewIDService(h2)
require.NoError(t, err)

defer func() {
ids1.Close()
ids2.Close()
}()

idComplete, err := h1.EventBus().Subscribe(&event.EvtPeerIdentificationCompleted{})
require.NoError(t, err)
defer idComplete.Close()
idFailed, err := h1.EventBus().Subscribe(&event.EvtPeerIdentificationFailed{})
require.NoError(t, err)
defer idFailed.Close()

if err := h1.Connect(ctx, peer.AddrInfo{ID: h2.ID(), Addrs: h2.Addrs()}); err != nil {
t.Fatal(err)
}

conn := h1.Network().ConnsToPeer(h2.ID())[0]
select {
case <-ids1.IdentifyWait(conn):
case <-time.After(5 * time.Second):
t.Fatal("took over 5 seconds to identify")
}

select {
case <-idComplete.Out():
case evt := <-idFailed.Out():
t.Fatalf("Failed to identify: %v", evt.(event.EvtPeerIdentificationFailed).Reason)
case <-time.After(5 * time.Second):
t.Fatal("Missing id event")
}

protos, err := h1.Peerstore().GetProtocols(h2.ID())
if err != nil {
t.Fatal(err)
}
sort.Strings(protos)
if sort.SearchStrings(protos, string(protocol.TestingID)) == len(protos) {
t.Fatalf("expected peer 1 to know that peer 2 speaks the Test protocol amongst others")
}

// set up a subscriber to listen to peer protocol updated events in h1. We expect to receive events from h2
// as protocols are added and removed.
sub, err := h1.EventBus().Subscribe(&event.EvtPeerProtocolsUpdated{})
if err != nil {
t.Fatal(err)
}
defer sub.Close()

h1ProtocolsUpdates, err := h1.EventBus().Subscribe(&event.EvtPeerProtocolsUpdated{})
require.NoError(t, err)
defer h1ProtocolsUpdates.Close()

waitForDelta := make(chan struct{})
go func() {
expectedCount := 2
for expectedCount > 0 {
evt := <-h1ProtocolsUpdates.Out()
expectedCount -= len(evt.(event.EvtPeerProtocolsUpdated).Added)
}
close(waitForDelta)
}()

// add two new protocols in h2 and wait for identify to send deltas.
h2.SetStreamHandler(protocol.ID("foo"), func(_ network.Stream) {})
h2.SetStreamHandler(protocol.ID("bar"), func(_ network.Stream) {})

recvWithTimeout(t, waitForDelta, 10*time.Second, "Timed out waiting to read protocol ids from the wire")

protos, err = h1.Peerstore().GetProtocols(h2.ID())
require.NoError(t, err)

have := make(map[string]bool, len(protos))
for _, p := range protos {
have[p] = true
}
require.True(t, have["foo"])
require.True(t, have["bar"])

// remove one of the newly added protocols from h2, and wait for identify to send the delta.
h2.RemoveStreamHandler(protocol.ID("bar"))

waitForDelta = make(chan struct{})
go func() {
expectedCount := 1
for expectedCount > 0 {
evt := <-h1ProtocolsUpdates.Out()
expectedCount -= len(evt.(event.EvtPeerProtocolsUpdated).Removed)
}
close(waitForDelta)
}()

// check that h1 now has forgotten about h2's bar protocol.
recvWithTimeout(t, waitForDelta, 10*time.Second, "timed out waiting for protocol to be removed")
protos, err = h1.Peerstore().GetProtocols(h2.ID())
require.NoError(t, err)
have = make(map[string]bool, len(protos))
for _, p := range protos {
have[p] = true
}
require.True(t, have["foo"])
require.False(t, have["bar"])

// make sure that h1 emitted events in the eventbus for h2's protocol updates.
done := make(chan struct{})

var lk sync.Mutex
var added []string
var removed []string
var success bool

go func() {
defer close(done)
for {
select {
case <-time.After(5 * time.Second):
return
case e, ok := <-sub.Out():
if !ok {
return
}
evt := e.(event.EvtPeerProtocolsUpdated)
lk.Lock()
added = append(added, protocol.ConvertToStrings(evt.Added)...)
removed = append(removed, protocol.ConvertToStrings(evt.Removed)...)
sort.Strings(added)
sort.Strings(removed)
if reflect.DeepEqual(added, []string{"bar", "foo"}) &&
reflect.DeepEqual(removed, []string{"bar"}) {
success = true
lk.Unlock()
return
}
lk.Unlock()
}
}
}()

<-done

lk.Lock()
defer lk.Unlock()
require.True(t, success, "did not get correct peer protocol updated events")
}

// TestIdentifyDeltaWhileIdentifyingConn tests that the host waits to push delta updates if an identify is ongoing.
func TestIdentifyDeltaWhileIdentifyingConn(t *testing.T) {
// TestIdentifyPushWhileIdentifyingConn tests that the host waits to push delta updates if an identify is ongoing.
func TestIdentifyPushWhileIdentifyingConn(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

Expand Down Expand Up @@ -559,9 +396,7 @@ func TestIdentifyDeltaWhileIdentifyingConn(t *testing.T) {

// from h2, identify h1.
conn := h2.Network().ConnsToPeer(h1.ID())[0]
go func() {
ids2.IdentifyConn(conn)
}()
go ids2.IdentifyConn(conn)

<-time.After(500 * time.Millisecond)

Expand Down Expand Up @@ -722,7 +557,7 @@ func TestNotListening(t *testing.T) {
}
}

func TestSendPushIfDeltaNotSupported(t *testing.T) {
func TestSendPush(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

Expand All @@ -749,19 +584,6 @@ func TestSendPushIfDeltaNotSupported(t *testing.T) {
ids1.IdentifyConn(h1.Network().ConnsToPeer(h2.ID())[0])
ids2.IdentifyConn(h2.Network().ConnsToPeer(h1.ID())[0])

// h1 knows h2 speaks Delta
sup, err := h1.Peerstore().SupportsProtocols(h2.ID(), []string{identify.IDDelta}...)
require.NoError(t, err)
require.Equal(t, []string{identify.IDDelta}, sup)

// h2 stops supporting Delta and that information flows to h1
h2.RemoveStreamHandler(identify.IDDelta)

require.Eventually(t, func() bool {
sup, err := h1.Peerstore().SupportsProtocols(h2.ID(), []string{identify.IDDelta}...)
return err == nil && len(sup) == 0
}, time.Second, 10*time.Millisecond)

// h1 starts listening on a new protocol and h2 finds out about that through a push
h1.SetStreamHandler("rand", func(network.Stream) {})
require.Eventually(t, func() bool {
Expand Down Expand Up @@ -1019,7 +841,7 @@ func TestIncomingIDStreamsTimeout(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

protocols := []protocol.ID{identify.IDPush, identify.IDDelta}
protocols := []protocol.ID{identify.IDPush}

for _, p := range protocols {
h1 := blhost.NewBlankHost(swarmt.GenSwarm(t))
Expand Down
Loading

0 comments on commit fc47d18

Please sign in to comment.