From 454d44088a58ead3a072973377bf4e1f63412117 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Thu, 15 Feb 2018 21:28:59 +0100 Subject: [PATCH] Delay dials to realy addrs --- dial_delay.go | 111 ++++++++++++++++++++++++++++++++++++++++++++++++++ swarm_dial.go | 56 +++++++++++++++++++------ 2 files changed, 155 insertions(+), 12 deletions(-) create mode 100644 dial_delay.go diff --git a/dial_delay.go b/dial_delay.go new file mode 100644 index 00000000..9a11187d --- /dev/null +++ b/dial_delay.go @@ -0,0 +1,111 @@ +package swarm + +import ( + "context" + "sort" + "time" + + ma "github.com/multiformats/go-multiaddr" + mafmt "github.com/whyrusleeping/mafmt" +) + +const p_circuit = 290 + +var relay = mafmt.Or(mafmt.Base(p_circuit), mafmt.And(mafmt.IPFS, mafmt.Base(p_circuit))) + +// delayDialAddrs returns a address channel sorted by priority, pushing the +// addresses with delay between them. The other channel can be used to trigger +// sending more addresses in case all previous failed +func delayDialAddrs(ctx context.Context, c <-chan ma.Multiaddr) (<-chan ma.Multiaddr, chan<- struct{}) { + type delayAddr struct { + maddr ma.Multiaddr + delay time.Duration + } + + gatherAddrs := func(c <-chan ma.Multiaddr) []ma.Multiaddr { + addrs := make([]ma.Multiaddr, 0, len(c)) + for { + select { + case a, ok := <-c: + if !ok { + return addrs + } + addrs = append(addrs, a) + default: //NOTE: This may need to be switched to a tiny timeout at some point + return addrs + } + } + } + + sortTiered := func(addrs []ma.Multiaddr) []*delayAddr { + delayed := make([]*delayAddr, len(addrs)) + sort.Slice(addrs, func(i, j int) bool { + return dialOffset(addrs[i]) < dialOffset(addrs[j]) + }) + + if len(addrs) == 0 { + return nil + } + + lastOffset := dialOffset(addrs[0]) + for i, a := range addrs { + offset := dialOffset(a) + delayed[i] = &delayAddr{ + maddr: a, + delay: offset - lastOffset, + } + lastOffset = offset + } + + return delayed + } + + out := make(chan ma.Multiaddr) + triggerNext := make(chan struct{}, 1) + + go func() { + defer close(out) + + for { + addrs := make([]ma.Multiaddr, 0, 1) + select { + case a, ok := <-c: + if !ok { + return + } + addrs = append(addrs, a) + } + + addrs = append(addrs, gatherAddrs(c)...) + delayed := sortTiered(addrs) + + for _, a := range delayed { + if a.delay > 0 { + after := time.After(a.delay) + select { + case <-triggerNext: + case <-after: + case <-ctx.Done(): + return + } + } + select { + case out <- a.maddr: + case <-ctx.Done(): + return + } + } + } + }() + + return out, triggerNext +} + +func dialOffset(addr ma.Multiaddr) time.Duration { + switch { + case relay.Matches(addr): + return 2000 * time.Millisecond //TODO: adjust based on something + default: + return 0 + } +} diff --git a/swarm_dial.go b/swarm_dial.go index 1531bb2e..1370ce6a 100644 --- a/swarm_dial.go +++ b/swarm_dial.go @@ -14,16 +14,39 @@ import ( ma "github.com/multiformats/go-multiaddr" ) -// Diagram of dial sync: +// Diagram of Dial // -// many callers of Dial() synched w. dials many addrs results to callers -// ----------------------\ dialsync use earliest /-------------- -// -----------------------\ |----------\ /---------------- -// ------------------------>------------<------- >---------<----------------- -// -----------------------| \----x \---------------- -// ----------------------| \-----x \--------------- -// any may fail if no addr at end -// retry dialAttempt x +// Concurrent Calls +// to Dial(peerId) dial finds dialable +// / addresses and passes +// v them to dialAddrs +// Dial(peerId) --------------------\ | +// Dial(peerId) -------------| v +// Dial(peerId) ------------>----> doDial() --> dial() --> dialAddrs() <- dialAddrs concurrently dials +// Dial(peerId) ------------| ^ __/\__ all known peer addresses +// Dial(peerId) ------/ | /||||||\ +// / |||||||| +// Synced with dialsync LLLLL--- <- Costly paths are delayed +// calls doDial DDDDD +// ||||| +// /--------------------------------> ||--- <- limitedDial (LD) limits +// Each connection is established || concurrency for transports +// by dialAddr, which starts a Some connection attempts -> x|- which require file descriptors +// network connection and sets may fail | || +// up the encryption layers | || --- <- if needed, dials through costly +// | || ||| paths (such as relay) will start +// | || LLL +// \-> xx--DDD +// ||||| +// eventually a connection will ||||| +// get established (C), other -> |C||| +// Connection setup will be attempts will get cancelled x|xxx +// performed with dialConnSetup. | +// It adds the connection to the ----------------------------------> | +// swarm and agrees on the | +// muxer to use /---\ +// The result is distributed -> ||||| +// to callers of Dial ||||| var ( // ErrDialBackoff is returned by the backoff code when a given peer has @@ -287,6 +310,8 @@ func (s *Swarm) dialAddrs(ctx context.Context, p peer.ID, remoteAddrs <-chan ma. ctx, cancel := context.WithCancel(ctx) defer cancel() // cancel work when we exit func + sortedAddrs, triggerMore := delayDialAddrs(ctx, remoteAddrs) + // use a single response type instead of errs and conns, reduces complexity *a ton* respch := make(chan dialResult) @@ -296,11 +321,12 @@ func (s *Swarm) dialAddrs(ctx context.Context, p peer.ID, remoteAddrs <-chan ma. defer s.limiter.clearAllPeerDials(p) var active int + for { select { - case addr, ok := <-remoteAddrs: + case addr, ok := <-sortedAddrs: if !ok { - remoteAddrs = nil + sortedAddrs = nil if active == 0 { return nil, exitErr } @@ -321,9 +347,15 @@ func (s *Swarm) dialAddrs(ctx context.Context, p peer.ID, remoteAddrs <-chan ma. // Errors are normal, lots of dials will fail exitErr = resp.Err - if remoteAddrs == nil && active == 0 { + if sortedAddrs == nil && active == 0 { return nil, exitErr } + if active == 0 { + select { + case triggerMore <- struct{}{}: + default: + } + } } else if resp.Conn != nil { return resp.Conn, nil }