diff --git a/dht.go b/dht.go index c5676d39b..be670bc88 100644 --- a/dht.go +++ b/dht.go @@ -108,10 +108,10 @@ type IpfsDHT struct { // networks). enableProviders, enableValues bool - // maxLastSuccessfulOutboundThreshold is the max threshold/upper limit for the value of "lastSuccessfulOutboundQuery" - // of the peer in the bucket above which we will evict it to make place for a new peer if the bucket - // is full - maxLastSuccessfulOutboundThreshold time.Duration + // successfulOutboundQueryGracePeriod is the maximum grace period we will give to a peer + // to between two successful query responses from it, failing which, + // we will ping it to see if it's alive. + successfulOutboundQueryGracePeriod time.Duration fixLowPeersChan chan struct{} } @@ -289,11 +289,12 @@ func makeRoutingTable(dht *IpfsDHT, cfg config) (*kb.RoutingTable, error) { // be published soon. l1 := math.Log(float64(1) / float64(defaultBucketSize)) //(Log(1/K)) l2 := math.Log(float64(1) - (float64(cfg.concurrency) / float64(defaultBucketSize))) // Log(1 - (alpha / K)) - maxLastSuccessfulOutboundThreshold := l1 / l2 * float64(cfg.routingTable.refreshInterval) + maxLastSuccessfulOutboundThreshold := time.Duration(l1 / l2 * float64(cfg.routingTable.refreshInterval)) self := kb.ConvertPeerID(dht.host.ID()) rt, err := kb.NewRoutingTable(cfg.bucketSize, self, time.Minute, dht.host.Peerstore(), maxLastSuccessfulOutboundThreshold) + dht.successfulOutboundQueryGracePeriod = maxLastSuccessfulOutboundThreshold cmgr := dht.host.ConnManager() rt.PeerAdded = func(p peer.ID) { @@ -448,22 +449,37 @@ func (dht *IpfsDHT) putLocal(key string, rec *recpb.Record) error { // peerFound signals the routingTable that we've found a peer that // might support the DHT protocol. +// If we have a connection a peer but no exchange of a query RPC -> +// LastQueriedAt=time.Now (so we don't ping it for some time for a liveliness check) +// LastUsefulAt=N/A +// If we connect to a peer and exchange a query RPC -> +// LastQueriedAt=time.Now (same reason as above) +// LastUsefulAt=time.Now (so we give it some life in the RT without immediately evicting it) +// If we query a peer we already have in our Routing Table -> +// LastQueriedAt=time.Now() +// LastUsefulAt remains unchanged +// If we connect to a peer we already have in the RT but do not exchange a query (rare) +// Do Nothing. func (dht *IpfsDHT) peerFound(ctx context.Context, p peer.ID, queryPeer bool) { logger.Debugw("peer found", "peer", p) b, err := dht.validRTPeer(p) if err != nil { logger.Errorw("failed to validate if peer is a DHT peer", "peer", p, "error", err) } else if b { - _, err := dht.routingTable.TryAddPeer(p, queryPeer) + newlyAdded, err := dht.routingTable.TryAddPeer(p, queryPeer) if err != nil { // peer not added. return } - // If we discovered the peer because of a query, we need to ensure we override the "zero" lastSuccessfulOutboundQuery + // If we freshly added the peer because of a query, we need to ensure we override the "zero" lastUsefulAt // value that must have been set in the Routing Table for this peer when it was first added during a connection. - if queryPeer { - dht.routingTable.UpdateLastSuccessfulOutboundQuery(p, time.Now()) + if newlyAdded && queryPeer { + dht.routingTable.UpdateLastUsefulAt(p, time.Now()) + } else if queryPeer { + // the peer is already in our RT, but we just successfully queried it and so let's give it a + // bump on the query time so we don't ping it too soon for a liveliness check. + dht.routingTable.UpdateLastSuccessfulOutboundQueryAt(p, time.Now()) } } } diff --git a/dht_bootstrap.go b/dht_bootstrap.go index fb3961dc9..3cb2dee2c 100644 --- a/dht_bootstrap.go +++ b/dht_bootstrap.go @@ -128,7 +128,7 @@ func (dht *IpfsDHT) startRefreshing() { // ping Routing Table peers that haven't been hear of/from in the interval they should have been. for _, ps := range dht.routingTable.GetPeerInfos() { // ping the peer if it's due for a ping and evict it if the ping fails - if time.Since(ps.LastSuccessfulOutboundQuery) > dht.maxLastSuccessfulOutboundThreshold { + if time.Since(ps.LastSuccessfulOutboundQueryAt) > dht.successfulOutboundQueryGracePeriod { livelinessCtx, cancel := context.WithTimeout(ctx, peerPingTimeout) if err := dht.host.Connect(livelinessCtx, peer.AddrInfo{ID: ps.Id}); err != nil { logger.Debugw("evicting peer after failed ping", "peer", ps.Id, "error", err) diff --git a/go.mod b/go.mod index edd3a3f12..0cb165f36 100644 --- a/go.mod +++ b/go.mod @@ -18,8 +18,8 @@ require ( github.com/libp2p/go-eventbus v0.1.0 github.com/libp2p/go-libp2p v0.7.4 github.com/libp2p/go-libp2p-core v0.5.1 - github.com/libp2p/go-libp2p-kbucket v0.3.3 - github.com/libp2p/go-libp2p-peerstore v0.2.2 + github.com/libp2p/go-libp2p-kbucket v0.4.1 + github.com/libp2p/go-libp2p-peerstore v0.2.3 github.com/libp2p/go-libp2p-record v0.1.2 github.com/libp2p/go-libp2p-swarm v0.2.3 github.com/libp2p/go-libp2p-testing v0.1.1 diff --git a/go.sum b/go.sum index c9bd54f76..00b292650 100644 --- a/go.sum +++ b/go.sum @@ -3,6 +3,7 @@ github.com/AndreasBriese/bbloom v0.0.0-20180913140656-343706a395b7/go.mod h1:bOv github.com/AndreasBriese/bbloom v0.0.0-20190306092124-e2d15f34fcf9/go.mod h1:bOvUY6CB00SOBii9/FifXqc0awNKxLFCL/+pkDPuyl8= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/Kubuxu/go-os-helper v0.0.1/go.mod h1:N8B+I7vPCT80IcP58r50u4+gEEcsZETFUpAzWW2ep1Y= +github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU= github.com/aead/siphash v1.0.1/go.mod h1:Nywa3cDsYNNK3gaciGTWPwHt0wlpNV15vwmswBAUSII= github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8= github.com/btcsuite/btcd v0.0.0-20190213025234-306aecffea32 h1:qkOC5Gd33k54tobS36cXdAzJbeHaduLtnLQQwNoIi78= @@ -21,6 +22,7 @@ github.com/btcsuite/goleveldb v0.0.0-20160330041536-7834afc9e8cd/go.mod h1:F+uVa github.com/btcsuite/snappy-go v0.0.0-20151229074030-0bdef8d06723/go.mod h1:8woku9dyThutzjeg+3xrA5iCpBRH8XEEg3lh6TiUghc= github.com/btcsuite/websocket v0.0.0-20150119174127-31079b680792/go.mod h1:ghJtEyQwv5/p4Mg4C0fgbePVuGr935/5ddU9Z3TmDRY= github.com/btcsuite/winsvc v1.0.0/go.mod h1:jsenWakMcC0zFBFurPLEAyrnc/teJEM1O46fmI40EZs= +github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc= github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= github.com/coreos/etcd v3.3.10+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE= github.com/coreos/go-etcd v2.0.0+incompatible/go.mod h1:Jez6KQU2B/sWsbdaef3ED8NzMklzPG4d5KIOhIy30Tk= @@ -38,6 +40,8 @@ github.com/davidlazar/go-crypto v0.0.0-20170701192655-dcfb0a7ac018/go.mod h1:rQY github.com/dgraph-io/badger v1.5.5-0.20190226225317-8115aed38f8f/go.mod h1:VZxzAIRPHRVNRKRo6AXrX9BJegn6il06VMTZVJYCIjQ= github.com/dgraph-io/badger v1.6.0-rc1/go.mod h1:zwt7syl517jmP8s94KqSxTlM6IMsdhYy6psNgSztDR4= github.com/dgraph-io/badger v1.6.0/go.mod h1:zwt7syl517jmP8s94KqSxTlM6IMsdhYy6psNgSztDR4= +github.com/dgraph-io/badger v1.6.1/go.mod h1:FRmFw3uxvcpa8zG3Rxs0th+hCLIuaQg8HlNV5bjgnuU= +github.com/dgraph-io/ristretto v0.0.2/go.mod h1:KPxhHT9ZxKefz+PCeOGsrHpl1qZ7i70dGTu2u+Ahh6E= github.com/dgryski/go-farm v0.0.0-20190104051053-3adb47b1fb0f/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw= github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw= github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= @@ -118,9 +122,11 @@ github.com/ipfs/go-ds-badger v0.0.2/go.mod h1:Y3QpeSFWQf6MopLTiZD+VT6IC1yZqaGmjv github.com/ipfs/go-ds-badger v0.0.5/go.mod h1:g5AuuCGmr7efyzQhLL8MzwqcauPojGPUaHzfGTzuE3s= github.com/ipfs/go-ds-badger v0.0.7/go.mod h1:qt0/fWzZDoPW6jpQeqUjR5kBfhDNB65jd9YlmAvpQBk= github.com/ipfs/go-ds-badger v0.2.1/go.mod h1:Tx7l3aTph3FMFrRS838dcSJh+jjA7cX9DrGVwx/NOwE= +github.com/ipfs/go-ds-badger v0.2.3/go.mod h1:pEYw0rgg3FIrywKKnL+Snr+w/LjJZVMTBRn4FS6UHUk= github.com/ipfs/go-ds-leveldb v0.0.1/go.mod h1:feO8V3kubwsEF22n0YRQCffeb79OOYIykR4L04tMOYc= github.com/ipfs/go-ds-leveldb v0.1.0/go.mod h1:hqAW8y4bwX5LWcCtku2rFNX3vjDZCy5LZCg+cSZvYb8= github.com/ipfs/go-ds-leveldb v0.4.1/go.mod h1:jpbku/YqBSsBc1qgME8BkWS4AxzF2cEu1Ii2r79Hh9s= +github.com/ipfs/go-ds-leveldb v0.4.2/go.mod h1:jpbku/YqBSsBc1qgME8BkWS4AxzF2cEu1Ii2r79Hh9s= github.com/ipfs/go-ipfs-delay v0.0.0-20181109222059-70721b86a9a8/go.mod h1:8SP1YXK1M1kXuc4KJZINY3TQQ03J2rwBG9QfXmbRPrw= github.com/ipfs/go-ipfs-util v0.0.1 h1:Wz9bL2wB2YBJqggkA4dD7oSmqB4cAnpNbGrlHJulv50= github.com/ipfs/go-ipfs-util v0.0.1/go.mod h1:spsl5z8KUnrve+73pOhSVZND1SIxPW5RyBCNzQxlJBc= @@ -167,6 +173,8 @@ github.com/koron/go-ssdp v0.0.0-20191105050749-2e1c40ed0b5d h1:68u9r4wEvL3gYg2jv github.com/koron/go-ssdp v0.0.0-20191105050749-2e1c40ed0b5d/go.mod h1:5Ky9EC2xfoUKUor0Hjgi2BJhCSXJfMOFlmyYrVKGQMk= github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= +github.com/kr/pretty v0.2.0 h1:s5hAObm+yFO5uHYt5dYjxi2rXrsnmRpJx4OYvIWUaQs= +github.com/kr/pretty v0.2.0/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= @@ -230,8 +238,8 @@ github.com/libp2p/go-libp2p-discovery v0.2.0 h1:1p3YSOq7VsgaL+xVHPi8XAmtGyas6D2J github.com/libp2p/go-libp2p-discovery v0.2.0/go.mod h1:s4VGaxYMbw4+4+tsoQTqh7wfxg97AEdo4GYBt6BadWg= github.com/libp2p/go-libp2p-discovery v0.3.0 h1:+JnYBRLzZQtRq0mK3xhyjBwHytLmJXMTZkQfbw+UrGA= github.com/libp2p/go-libp2p-discovery v0.3.0/go.mod h1:o03drFnz9BVAZdzC/QUQ+NeQOu38Fu7LJGEOK2gQltw= -github.com/libp2p/go-libp2p-kbucket v0.3.3 h1:V2Zwv6QnCK6Who0iiJW2eUKwdlTYGJ2HnLViaolDOcs= -github.com/libp2p/go-libp2p-kbucket v0.3.3/go.mod h1:IWFdYRBOYzaLEHnvrfzEkr+UcuveCXIoeO8QeFZSI6A= +github.com/libp2p/go-libp2p-kbucket v0.4.1 h1:6FyzbQuGLPzbMv3HiD232zqscIz5iB8ppJwb380+OGI= +github.com/libp2p/go-libp2p-kbucket v0.4.1/go.mod h1:7sCeZx2GkNK1S6lQnGUW5JYZCFPnXzAZCCBBS70lytY= github.com/libp2p/go-libp2p-loggables v0.1.0 h1:h3w8QFfCt2UJl/0/NW4K829HX/0S4KD31PQ7m8UXXO8= github.com/libp2p/go-libp2p-loggables v0.1.0/go.mod h1:EyumB2Y6PrYjr55Q3/tiJ/o3xoDasoRYM7nOzEpoa90= github.com/libp2p/go-libp2p-mplex v0.2.0/go.mod h1:Ejl9IyjvXJ0T9iqUTE1jpYATQ9NM3g+OtR+EMMODbKo= @@ -260,6 +268,8 @@ github.com/libp2p/go-libp2p-peerstore v0.2.1 h1:u+gOfsKgu73ZkGWhvckRm03z9C+iS9Tr github.com/libp2p/go-libp2p-peerstore v0.2.1/go.mod h1:NQxhNjWxf1d4w6PihR8btWIRjwRLBr4TYKfNgrUkOPA= github.com/libp2p/go-libp2p-peerstore v0.2.2 h1:iqc/m03jHn5doXN3+kS6JKvqQRHEltiXljQB85iVHWE= github.com/libp2p/go-libp2p-peerstore v0.2.2/go.mod h1:NQxhNjWxf1d4w6PihR8btWIRjwRLBr4TYKfNgrUkOPA= +github.com/libp2p/go-libp2p-peerstore v0.2.3 h1:MofRq2l3c15vQpEygTetV+zRRrncz+ktiXW7H2EKoEQ= +github.com/libp2p/go-libp2p-peerstore v0.2.3/go.mod h1:K8ljLdFn590GMttg/luh4caB/3g0vKuY01psze0upRw= github.com/libp2p/go-libp2p-pnet v0.2.0 h1:J6htxttBipJujEjz1y0a5+eYoiPcFHhSYHH6na5f0/k= github.com/libp2p/go-libp2p-pnet v0.2.0/go.mod h1:Qqvq6JH/oMZGwqs3N1Fqhv8NVhrdYcO0BW4wssv21LA= github.com/libp2p/go-libp2p-record v0.1.2 h1:M50VKzWnmUrk/M5/Dz99qO9Xh4vs8ijsK+7HkJvRP+0= @@ -472,6 +482,7 @@ github.com/spacemonkeygo/openssl v0.0.0-20181017203307-c2dcc5cca94a h1:/eS3yfGjQ github.com/spacemonkeygo/openssl v0.0.0-20181017203307-c2dcc5cca94a/go.mod h1:7AyxJNCJ7SBZ1MfVQCWD6Uqo2oubI2Eq2y2eqf+A5r0= github.com/spacemonkeygo/spacelog v0.0.0-20180420211403-2296661a0572 h1:RC6RW7j+1+HkWaX/Yh71Ee5ZHaHYt7ZP4sQgUrm6cDU= github.com/spacemonkeygo/spacelog v0.0.0-20180420211403-2296661a0572/go.mod h1:w0SWMsp6j9O/dk4/ZpIhL+3CkG8ofA2vuv7k+ltqUMc= +github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA= github.com/spaolacci/murmur3 v1.1.0 h1:7c1g84S4BPRrfL5Xrdp6fOJ206sU9y293DDHaoy0bLI= github.com/spaolacci/murmur3 v1.1.0/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA= github.com/spf13/afero v1.1.2/go.mod h1:j4pytiNVoe2o6bmDsKpLACNPDBIoEAkihy7loJ1B0CQ= diff --git a/query.go b/query.go index d65c5cab1..7e8a27123 100644 --- a/query.go +++ b/query.go @@ -174,7 +174,7 @@ func (dht *IpfsDHT) runQuery(ctx context.Context, target string, queryFn queryFn } func (q *query) recordPeerIsValuable(p peer.ID) { - q.dht.routingTable.UpdateLastSuccessfulOutboundQuery(p, time.Now()) + q.dht.routingTable.UpdateLastUsefulAt(p, time.Now()) } func (q *query) recordValuablePeers() {