Skip to content
This repository has been archived by the owner on May 26, 2022. It is now read-only.

Commit

Permalink
Delay dials to realy addrs
Browse files Browse the repository at this point in the history
  • Loading branch information
magik6k committed Feb 15, 2018
1 parent 881b974 commit 19ab72d
Show file tree
Hide file tree
Showing 2 changed files with 157 additions and 12 deletions.
113 changes: 113 additions & 0 deletions dial_delay.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
package swarm

import (
"context"
"sort"
"time"

ma "github.com/multiformats/go-multiaddr"
mafmt "github.com/whyrusleeping/mafmt"
)

const p_circuit = 290

var relay = mafmt.Or(mafmt.Base(p_circuit), mafmt.And(mafmt.IPFS, mafmt.Base(p_circuit)))

// delayDialAddrs returns a address channel sorted by priority, pushing the
// addresses with delay between them. The other channel can be used to trigger
// sending more addresses in case all previous failed
func delayDialAddrs(ctx context.Context, c <-chan ma.Multiaddr) (<-chan ma.Multiaddr, chan<- struct{}) {
type delayAddr struct {
maddr ma.Multiaddr
delay time.Duration
}

gatherAddrs := func(c <-chan ma.Multiaddr) []ma.Multiaddr {
addrs := make([]ma.Multiaddr, 0, len(c))
for {
select {
case a, ok := <-c:
if !ok {
return addrs
}
addrs = append(addrs, a)
default: //NOTE: This may need to be switched to a tiny timeout at some point
return addrs
}
}
}

sortTiered := func(addrs []ma.Multiaddr) []*delayAddr {
delayed := make([]*delayAddr, len(addrs))
sort.Slice(addrs, func(i, j int) bool {
return dialOffset(addrs[i]) < dialOffset(addrs[j])
})

if len(addrs) == 0 {
return nil
}

lastOffset := dialOffset(addrs[0])
for i, a := range addrs {
offset := dialOffset(a)
delayed[i] = &delayAddr{
maddr: a,
delay: offset - lastOffset,
}
lastOffset = offset
}

return delayed
}

out := make(chan ma.Multiaddr)
triggerNext := make(chan struct{}, 1)

go func() {
defer close(out)

for {
addrs := make([]ma.Multiaddr, 0, 1)
select {
case a, ok := <-c:
if !ok {
return
}
addrs = append(addrs, a)
}

addrs = append(addrs, gatherAddrs(c)...)
delayed := sortTiered(addrs)

for _, a := range delayed {
if a.delay > 0 {
after := time.After(a.delay)
select {
case <-triggerNext:
case <-after:
case <-ctx.Done():
return
}
}
select {
case out <- a.maddr:
case <-ctx.Done():
return
}
}
}
}()

return out, triggerNext
}

// dialOffset returns the maximum amount of time dialing to the given address
// can be delayed
func dialOffset(addr ma.Multiaddr) time.Duration {
switch {
case relay.Matches(addr):
return 2000 * time.Millisecond //TODO: adjust based on something
default:
return 0
}
}
56 changes: 44 additions & 12 deletions swarm_dial.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,39 @@ import (
ma "github.com/multiformats/go-multiaddr"
)

// Diagram of dial sync:
// Diagram of Dial
//
// many callers of Dial() synched w. dials many addrs results to callers
// ----------------------\ dialsync use earliest /--------------
// -----------------------\ |----------\ /----------------
// ------------------------>------------<------- >---------<-----------------
// -----------------------| \----x \----------------
// ----------------------| \-----x \---------------
// any may fail if no addr at end
// retry dialAttempt x
// Concurrent Calls
// to Dial(peerId) dial finds dialable
// / addresses and passes
// v them to dialAddrs
// Dial(peerId) --------------------\ |
// Dial(peerId) -------------| v
// Dial(peerId) ------------>----> doDial() --> dial() --> dialAddrs() <- dialAddrs concurrently dials
// Dial(peerId) ------------| ^ __/\__ all known peer addresses
// Dial(peerId) ------/ | /||||||\
// / ||||||||
// Synced with dialsync LLLLL--- <- Costly paths are delayed
// calls doDial DDDDD
// |||||
// /--------------------------------> ||--- <- limitedDial (LD) limits
// Each connection is established || concurrency for transports
// by dialAddr, which starts a Some connection attempts -> x|- which require file descriptors
// network connection and sets may fail | ||
// up the encryption layers | || --- <- if needed, dials through costly
// | || ||| paths (such as relay) will start
// | || LLL
// \-> xx--DDD
// |||||
// eventually a connection will |||||
// get established (C), other -> |C|||
// Connection setup will be attempts will get cancelled x|xxx
// performed with dialConnSetup. |
// It adds the connection to the ----------------------------------> |
// swarm and agrees on the |
// muxer to use /---\
// The result is distributed -> |||||
// to callers of Dial |||||

var (
// ErrDialBackoff is returned by the backoff code when a given peer has
Expand Down Expand Up @@ -287,6 +310,8 @@ func (s *Swarm) dialAddrs(ctx context.Context, p peer.ID, remoteAddrs <-chan ma.
ctx, cancel := context.WithCancel(ctx)
defer cancel() // cancel work when we exit func

sortedAddrs, triggerMore := delayDialAddrs(ctx, remoteAddrs)

// use a single response type instead of errs and conns, reduces complexity *a ton*
respch := make(chan dialResult)

Expand All @@ -296,11 +321,12 @@ func (s *Swarm) dialAddrs(ctx context.Context, p peer.ID, remoteAddrs <-chan ma.
defer s.limiter.clearAllPeerDials(p)

var active int

for {
select {
case addr, ok := <-remoteAddrs:
case addr, ok := <-sortedAddrs:
if !ok {
remoteAddrs = nil
sortedAddrs = nil
if active == 0 {
return nil, exitErr
}
Expand All @@ -321,9 +347,15 @@ func (s *Swarm) dialAddrs(ctx context.Context, p peer.ID, remoteAddrs <-chan ma.
// Errors are normal, lots of dials will fail
exitErr = resp.Err

if remoteAddrs == nil && active == 0 {
if sortedAddrs == nil && active == 0 {
return nil, exitErr
}
if active == 0 {
select {
case triggerMore <- struct{}{}:
default:
}
}
} else if resp.Conn != nil {
return resp.Conn, nil
}
Expand Down

0 comments on commit 19ab72d

Please sign in to comment.