Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Configurable Peer Filtering #471

Merged
merged 18 commits into from
Apr 1, 2020
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 30 additions & 19 deletions dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,9 @@ type IpfsDHT struct {
beta int // The number of peers closest to a target that must have responded for a query path to terminate
d int // Number of Disjoint Paths to query

queryPeerFilter QueryFilterFunc
routingTablePeerFilter RouteTableFilterFunc

autoRefresh bool
rtRefreshQueryTimeout time.Duration
rtRefreshPeriod time.Duration
Expand Down Expand Up @@ -204,7 +207,6 @@ func NewDHTClient(ctx context.Context, h host.Host, dstore ds.Batching) *IpfsDHT
}

func makeDHT(ctx context.Context, h host.Host, cfg config) (*IpfsDHT, error) {

protocols := []protocol.ID{cfg.protocolPrefix + kad2}
serverProtocols := []protocol.ID{cfg.protocolPrefix + kad2, cfg.protocolPrefix + kad1}

Expand All @@ -219,21 +221,23 @@ func makeDHT(ctx context.Context, h host.Host, cfg config) (*IpfsDHT, error) {
}

dht := &IpfsDHT{
datastore: cfg.datastore,
self: h.ID(),
peerstore: h.Peerstore(),
host: h,
strmap: make(map[peer.ID]*messageSender),
birth: time.Now(),
rng: rand.New(rand.NewSource(rand.Int63())),
protocols: protocols,
serverProtocols: serverProtocols,
bucketSize: cfg.bucketSize,
alpha: cfg.concurrency,
beta: cfg.resiliency,
d: cfg.disjointPaths,
triggerRtRefresh: make(chan chan<- error),
triggerSelfLookup: make(chan chan<- error),
datastore: cfg.datastore,
self: h.ID(),
peerstore: h.Peerstore(),
host: h,
strmap: make(map[peer.ID]*messageSender),
birth: time.Now(),
rng: rand.New(rand.NewSource(rand.Int63())),
protocols: protocols,
serverProtocols: serverProtocols,
bucketSize: cfg.bucketSize,
alpha: cfg.concurrency,
beta: cfg.resiliency,
d: cfg.disjointPaths,
triggerRtRefresh: make(chan chan<- error),
triggerSelfLookup: make(chan chan<- error),
queryPeerFilter: cfg.queryPeerFilter,
routingTablePeerFilter: cfg.routingTable.peerFilter,
}

// construct routing table
Expand Down Expand Up @@ -273,7 +277,7 @@ func makeRoutingTable(dht *IpfsDHT, cfg config) (*kb.RoutingTable, error) {
return false
}

return b
return b && cfg.routingTable.peerFilter(dht, dht.Host().Network().ConnsToPeer(p))
}

rtOpts := []kb.Option{kb.PeerValidationFnc(pvF)}
Expand Down Expand Up @@ -411,7 +415,15 @@ func (dht *IpfsDHT) putLocal(key string, rec *recpb.Record) error {
// supports the DHT protocol.
func (dht *IpfsDHT) peerFound(ctx context.Context, p peer.ID) {
logger.Event(ctx, "peerFound", p)
dht.routingTable.HandlePeerAlive(p)
currentConns := dht.host.Network().ConnsToPeer(p)
if len(currentConns) > 0 {
if dht.routingTablePeerFilter(dht, currentConns) {
willscott marked this conversation as resolved.
Show resolved Hide resolved
dht.routingTable.HandlePeerAlive(p)
}
} else {
// speculative addition; validated by PeerValidationFunction upon connectedness.
willscott marked this conversation as resolved.
Show resolved Hide resolved
dht.routingTable.HandlePeerAlive(p)
}
}

// peerStoppedDHT signals the routing table that a peer has stopped supporting the DHT protocol.
Expand All @@ -426,7 +438,6 @@ func (dht *IpfsDHT) peerStoppedDHT(ctx context.Context, p peer.ID) {
func (dht *IpfsDHT) peerDisconnected(ctx context.Context, p peer.ID) {
logger.Event(ctx, "peerDisconnected", p)
dht.routingTable.HandlePeerDisconnect(p)

}

// FindLocal looks for a peer with a given ID connected to this dht and returns the peer and the table it was found in.
Expand Down
137 changes: 137 additions & 0 deletions dht_filters.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
package dht

import (
"bytes"
"net"

"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
netroute "github.com/libp2p/go-netroute"

ma "github.com/multiformats/go-multiaddr"
manet "github.com/multiformats/go-multiaddr-net"
)

// QueryFilterFunc is a filter applied when considering peers to dial when querying
type QueryFilterFunc func(dht *IpfsDHT, ai peer.AddrInfo) bool

// RouteTableFilterFunc is a filter applied when considering connections to keep in
// the local route table.
type RouteTableFilterFunc func(dht *IpfsDHT, conns []network.Conn) bool

// PublicQueryFilter returns true if the peer is suspected of being publicly accessible
func PublicQueryFilter(_ *IpfsDHT, ai peer.AddrInfo) bool {
if len(ai.Addrs) == 0 {
return false
}

var hasPublicAddr bool
for _, a := range ai.Addrs {
if isRelayAddr(a) {
return false
}
if manet.IsPublicAddr(a) {
hasPublicAddr = true
}
}
return hasPublicAddr
}

var _ QueryFilterFunc = PublicQueryFilter

// PublicRoutingTableFilter allows a peer to be added to the routing table if the connections to that peer indicate
// that it is on a public network
func PublicRoutingTableFilter(dht *IpfsDHT, conns []network.Conn) bool {
if len(conns) == 0 {
return false
}

// Is one of these connections to a public address?
for _, c := range conns {
addr := c.RemoteMultiaddr()
if !isRelayAddr(addr) && manet.IsPublicAddr(addr) {
return true
}
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

On further consideration, drop this entire check. We only want peers that announce public addresses. Direct connections form undialable peers will have public addresses.

The check below should (I think?) look at all the addresses advertised by the peer (assuming identify has finished).


// Do we have a public address for this peer?
id := conns[0].RemotePeer()
known := dht.peerstore.PeerInfo(id)
for _, a := range known.Addrs {
if !isRelayAddr(a) && manet.IsPublicAddr(a) {
Stebalien marked this conversation as resolved.
Show resolved Hide resolved
return true
}
}

return false
}

var _ RouteTableFilterFunc = PublicRoutingTableFilter

// PrivateQueryFilter doens't currently restrict which peers we are willing to query from the local DHT.
func PrivateQueryFilter(dht *IpfsDHT, ai peer.AddrInfo) bool {
return len(ai.Addrs) > 0
}

var _ QueryFilterFunc = PrivateQueryFilter

// PrivateRoutingTableFilter allows a peer to be added to the routing table if the connections to that peer indicate
// that it is on a private network
func PrivateRoutingTableFilter(dht *IpfsDHT, conns []network.Conn) bool {
router, _ := netroute.New()
Stebalien marked this conversation as resolved.
Show resolved Hide resolved
myAdvertisedIPs := make([]net.IP, 0)
for _, a := range dht.Host().Addrs() {
if manet.IsPublicAddr(a) {
willscott marked this conversation as resolved.
Show resolved Hide resolved
ip, _ := manet.ToIP(a)
willscott marked this conversation as resolved.
Show resolved Hide resolved
myAdvertisedIPs = append(myAdvertisedIPs, ip)
}
}

for _, c := range conns {
if manet.IsPrivateAddr(c.RemoteMultiaddr()) {
return true
}

willscott marked this conversation as resolved.
Show resolved Hide resolved
if manet.IsPublicAddr(c.RemoteMultiaddr()) {
ip, _ := manet.ToIP(c.RemoteMultiaddr())
willscott marked this conversation as resolved.
Show resolved Hide resolved

// if the ip is the same as one of the local host's public advertised IPs - then consider it local
for _, i := range myAdvertisedIPs {
if i.Equal(ip) {
return true
}
if ip.To4() == nil {
if i.To4() == nil && isEUI(ip) && sameV6Net(i, ip) {
return true
}
}
}

// if there's no gateway - a direct host in the OS routing table - then consider it local
if router != nil {
Stebalien marked this conversation as resolved.
Show resolved Hide resolved
_, gw, _, err := router.Route(ip)
if gw == nil && err == nil {
return true
}
}
}
}

return false
}

var _ RouteTableFilterFunc = PrivateRoutingTableFilter

func isEUI(ip net.IP) bool {
// per rfc 2373
return ip[11] == 0xff && ip[12] == 0xfe
}

func sameV6Net(a, b net.IP) bool {
return bytes.Equal(a[0:8], b[0:8])
}

func isRelayAddr(a ma.Multiaddr) bool {
val, err := a.ValueForProtocol(ma.P_CIRCUIT)
return err != nil && val != ""
willscott marked this conversation as resolved.
Show resolved Hide resolved
}
28 changes: 27 additions & 1 deletion dht_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@ import (

ds "github.com/ipfs/go-datastore"
dssync "github.com/ipfs/go-datastore/sync"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/protocol"
"github.com/libp2p/go-libp2p-record"
record "github.com/libp2p/go-libp2p-record"
)

// ModeOpt describes what mode the dht should operate in
Expand Down Expand Up @@ -38,13 +40,15 @@ type config struct {
maxRecordAge time.Duration
enableProviders bool
enableValues bool
queryPeerFilter QueryFilterFunc

routingTable struct {
refreshQueryTimeout time.Duration
refreshPeriod time.Duration
autoRefresh bool
latencyTolerance time.Duration
checkInterval time.Duration
peerFilter RouteTableFilterFunc
}

// internal parameters, not publicly exposed
Expand All @@ -54,6 +58,9 @@ type config struct {
testProtocols []protocol.ID
}

func emptyQueryFilter(_ *IpfsDHT, ai peer.AddrInfo) bool { return true }
func emptyRTFilter(_ *IpfsDHT, conns []network.Conn) bool { return true }

// apply applies the given options to this Option
func (c *config) apply(opts ...Option) error {
for i, opt := range opts {
Expand All @@ -79,11 +86,13 @@ var defaults = func(o *config) error {
o.protocolPrefix = DefaultPrefix
o.enableProviders = true
o.enableValues = true
o.queryPeerFilter = emptyQueryFilter

o.routingTable.latencyTolerance = time.Minute
o.routingTable.refreshQueryTimeout = 30 * time.Second
o.routingTable.refreshPeriod = 10 * time.Minute
o.routingTable.autoRefresh = true
o.routingTable.peerFilter = emptyRTFilter
o.maxRecordAge = time.Hour * 36

o.bucketSize = defaultBucketSize
Expand Down Expand Up @@ -312,6 +321,23 @@ func DisableValues() Option {
}
}

// QueryFilter sets a function that approves which peers may be dialed in a query
func QueryFilter(filter QueryFilterFunc) Option {
return func(c *config) error {
c.queryPeerFilter = filter
return nil
}
}

// RoutingTableFilter sets a function that approves which peers may be added to the routing table. The host should
// already have at least one connection to the peer under consideration.
func RoutingTableFilter(filter RouteTableFilterFunc) Option {
return func(c *config) error {
c.routingTable.peerFilter = filter
return nil
}
}

// customProtocols is only to be used for testing. It sets the protocols that the DHT listens on and queries with to be
// the ones passed in. The custom protocols are still augmented by the Prefix.
func customProtocols(protos ...protocol.ID) Option {
Expand Down
3 changes: 3 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,19 @@ require (
github.com/libp2p/go-eventbus v0.1.0
github.com/libp2p/go-libp2p v0.6.2-0.20200323213923-1425d551c298
github.com/libp2p/go-libp2p-core v0.5.0
github.com/libp2p/go-libp2p-host v0.1.0
willscott marked this conversation as resolved.
Show resolved Hide resolved
github.com/libp2p/go-libp2p-kbucket v0.3.2-0.20200320132433-d1a1e9242e0c
github.com/libp2p/go-libp2p-peerstore v0.2.1
github.com/libp2p/go-libp2p-record v0.1.2
github.com/libp2p/go-libp2p-swarm v0.2.2
github.com/libp2p/go-libp2p-testing v0.1.1
github.com/libp2p/go-msgio v0.0.4
github.com/libp2p/go-netroute v0.0.0-20200310024203-5e4206756dff
willscott marked this conversation as resolved.
Show resolved Hide resolved
github.com/mr-tron/base58 v1.1.3
github.com/multiformats/go-base32 v0.0.3
github.com/multiformats/go-multiaddr v0.2.1
github.com/multiformats/go-multiaddr-dns v0.2.0
github.com/multiformats/go-multiaddr-net v0.1.3
github.com/multiformats/go-multihash v0.0.13
github.com/multiformats/go-multistream v0.1.1
github.com/stretchr/testify v1.5.1
Expand Down
Loading