Skip to content
This repository has been archived by the owner on May 26, 2022. It is now read-only.

Commit

Permalink
Delay dials to realy addrs
Browse files Browse the repository at this point in the history
  • Loading branch information
magik6k committed Feb 16, 2018
1 parent 881b974 commit 3699629
Show file tree
Hide file tree
Showing 2 changed files with 211 additions and 12 deletions.
167 changes: 167 additions & 0 deletions dial_delay.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
package swarm

import (
"context"
"time"

ma "github.com/multiformats/go-multiaddr"
mafmt "github.com/whyrusleeping/mafmt"
)

const p_circuit = 290

const numTiers = 2
const tierDelay = 2 * time.Second

var relay = mafmt.Or(mafmt.Base(p_circuit), mafmt.And(mafmt.IPFS, mafmt.Base(p_circuit)))

// delayDialAddrs returns a address channel sorted by priority, pushing the
// addresses with delay between them. The other channel can be used to trigger
// sending more addresses in case all previous failed
func delayDialAddrs(ctx context.Context, c <-chan ma.Multiaddr) (<-chan ma.Multiaddr, chan<- struct{}) {
out := make(chan ma.Multiaddr)
delay := time.NewTimer(tierDelay)
triggerNext := make(chan struct{}, 1)

go func() {
defer delay.Stop()
defer close(out)
var pending [numTiers][]ma.Multiaddr
lastTier := 0

// put enqueues the mutliaddr
put := func(addr ma.Multiaddr) {
tier := getTier(addr)
pending[tier] = append(pending[tier], addr)
}

// get gets the best (lowest tier) multiaddr available
get := func() (ma.Multiaddr, int) {
for i, tier := range pending[:] {
if len(tier) > 0 {
addr := tier[len(tier)-1]
tier[len(tier)-1] = nil
pending[i] = tier[:len(tier)-1]
return addr, i
}
}
return nil, -1
}

outer:
for {
fill:
for {
select {
case addr, ok := <-c:
if !ok {
break outer
}
put(addr)
default:
break fill
}
}

next, tier := get()

// Nothing? Block!
if next == nil {
select {
case addr, ok := <-c:
if !ok {
break outer
}
put(addr)
case <-ctx.Done():
return
}
continue
}

// Jumping a tier?
if tier > lastTier {
// Wait the delay (preempt with new addresses or when the dialer
// requests more addresses)
select {
case addr, ok := <-c:
if !ok {
break outer
}
put(addr)
continue
case <-delay.C:
delay.Reset(tierDelay)
case <-triggerNext:
if !delay.Stop() {
<-delay.C
}
delay.Reset(tierDelay)
case <-ctx.Done():
return
}
}

lastTier = tier

select {
case addr, ok := <-c:
put(next)
if !ok {
break outer
}
put(addr)
continue
case out <- next:
// Always count the timeout since the last dial.
if !delay.Stop() {
<-delay.C
}
delay.Reset(tierDelay)
case <-ctx.Done():
return
}
}

// finish sending
for {
next, tier := get()
if next == nil {
return
}
if tier > lastTier {
select {
case <-delay.C:
case <-triggerNext:
if !delay.Stop() {
<-delay.C
}
delay.Reset(tierDelay)
case <-ctx.Done():
return
}
}
tier = lastTier
select {
case out <- next:
delay.Stop()
delay.Reset(tierDelay)
case <-ctx.Done():
return
}
}
}()

return out, triggerNext
}

// getTier returns the priority tier of the address.
// return value must be > 0 & < numTiers.
func getTier(addr ma.Multiaddr) int {
switch {
case relay.Matches(addr):
return 1
default:
return 0
}
}
56 changes: 44 additions & 12 deletions swarm_dial.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,39 @@ import (
ma "github.com/multiformats/go-multiaddr"
)

// Diagram of dial sync:
// Diagram of Dial
//
// many callers of Dial() synched w. dials many addrs results to callers
// ----------------------\ dialsync use earliest /--------------
// -----------------------\ |----------\ /----------------
// ------------------------>------------<------- >---------<-----------------
// -----------------------| \----x \----------------
// ----------------------| \-----x \---------------
// any may fail if no addr at end
// retry dialAttempt x
// Concurrent Calls
// to Dial(peerId) dial finds dialable
// / addresses and passes
// v them to dialAddrs
// Dial(peerId) --------------------\ |
// Dial(peerId) -------------| v
// Dial(peerId) ------------>----> doDial() --> dial() --> dialAddrs() <- dialAddrs concurrently dials
// Dial(peerId) ------------| ^ __/\__ all known peer addresses
// Dial(peerId) ------/ | /||||||\
// / ||||||||
// Synced with dialsync LLLLL--- <- Costly paths are delayed
// calls doDial DDDDD
// |||||
// /--------------------------------> ||--- <- limitedDial (LD) limits
// Each connection is established || concurrency for transports
// by dialAddr, which starts a Some connection attempts -> x|- which require file descriptors
// network connection and sets may fail | ||
// up the encryption layers | || --- <- if needed, dials through costly
// | || ||| paths (such as relay) will start
// | || LLL
// \-> xx--DDD
// |||||
// eventually a connection will |||||
// get established (C), other -> |C|||
// Connection setup will be attempts will get cancelled x|xxx
// performed with dialConnSetup. |
// It adds the connection to the ----------------------------------> |
// swarm and agrees on the |
// muxer to use /---\
// The result is distributed -> |||||
// to callers of Dial |||||

var (
// ErrDialBackoff is returned by the backoff code when a given peer has
Expand Down Expand Up @@ -287,6 +310,8 @@ func (s *Swarm) dialAddrs(ctx context.Context, p peer.ID, remoteAddrs <-chan ma.
ctx, cancel := context.WithCancel(ctx)
defer cancel() // cancel work when we exit func

sortedAddrs, triggerMore := delayDialAddrs(ctx, remoteAddrs)

// use a single response type instead of errs and conns, reduces complexity *a ton*
respch := make(chan dialResult)

Expand All @@ -296,11 +321,12 @@ func (s *Swarm) dialAddrs(ctx context.Context, p peer.ID, remoteAddrs <-chan ma.
defer s.limiter.clearAllPeerDials(p)

var active int

for {
select {
case addr, ok := <-remoteAddrs:
case addr, ok := <-sortedAddrs:
if !ok {
remoteAddrs = nil
sortedAddrs = nil
if active == 0 {
return nil, exitErr
}
Expand All @@ -321,9 +347,15 @@ func (s *Swarm) dialAddrs(ctx context.Context, p peer.ID, remoteAddrs <-chan ma.
// Errors are normal, lots of dials will fail
exitErr = resp.Err

if remoteAddrs == nil && active == 0 {
if sortedAddrs == nil && active == 0 {
return nil, exitErr
}
if active == 0 {
select {
case triggerMore <- struct{}{}:
default:
}
}
} else if resp.Conn != nil {
return resp.Conn, nil
}
Expand Down

0 comments on commit 3699629

Please sign in to comment.