This repository has been archived by the owner on May 26, 2022. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 37
[RFC] Delay dials to relay addresses #57
Open
magik6k
wants to merge
7
commits into
master
Choose a base branch
from
feat/dial-priority
base: master
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
Changes from 5 commits
Commits
Show all changes
7 commits
Select commit
Hold shift + click to select a range
973940e
Delay dials to realy addrs
magik6k 2e22aa8
dial delay: tests and fixes
magik6k 928ffbc
dial delay: simplify the code
magik6k a945f49
dial delay: address review
magik6k 91fe548
dial delay: safer relay address filter
vyzo 835519e
dial delay: reify delay logic into top-level function
vyzo 4be8cf4
dial delay: fix failing test
vyzo File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,211 @@ | ||
package swarm | ||
|
||
import ( | ||
"context" | ||
"time" | ||
|
||
ma "github.com/multiformats/go-multiaddr" | ||
) | ||
|
||
const P_CIRCUIT = 290 | ||
|
||
const numTiers = 2 | ||
|
||
var TierDelay = 1 * time.Second | ||
|
||
// delayDialAddrs returns a address channel sorted by priority, pushing the | ||
// addresses with delay between them. The other channel can be used to trigger | ||
// sending more addresses in case all previous failed | ||
func delayDialAddrs(ctx context.Context, c <-chan ma.Multiaddr) (<-chan ma.Multiaddr, chan<- struct{}) { | ||
out := make(chan ma.Multiaddr) | ||
delay := time.NewTimer(TierDelay) | ||
triggerNext := make(chan struct{}, 1) | ||
|
||
go func() { | ||
defer delay.Stop() | ||
defer close(out) | ||
var pending [numTiers][]ma.Multiaddr | ||
lastTier := -1 | ||
|
||
// put enqueues the multiaddr | ||
put := func(addr ma.Multiaddr) { | ||
tier := getTier(addr) | ||
pending[tier] = append(pending[tier], addr) | ||
} | ||
|
||
// get gets the best (lowest tier) multiaddr available | ||
// note that within a single tier put/get behave like a stack (LIFO) | ||
get := func() (ma.Multiaddr, int) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. worth noting that we're using a stack within each tier. Might not be the best approach (though definitely simpler and cheaper to implement) |
||
for i, tier := range pending[:] { | ||
if len(tier) > 0 { | ||
addr := tier[len(tier)-1] | ||
tier[len(tier)-1] = nil | ||
pending[i] = tier[:len(tier)-1] | ||
return addr, i | ||
} | ||
} | ||
return nil, -1 | ||
} | ||
|
||
// fillBuckets reads pending addresses form the channel without blocking | ||
fillBuckets := func() bool { | ||
for { | ||
select { | ||
case addr, ok := <-c: | ||
if !ok { | ||
return false | ||
} | ||
put(addr) | ||
default: | ||
return true | ||
} | ||
} | ||
} | ||
|
||
// waitForMore waits for addresses from the channel | ||
waitForMore := func() (bool, error) { | ||
select { | ||
case addr, ok := <-c: | ||
if !ok { | ||
return false, nil | ||
} | ||
put(addr) | ||
case <-ctx.Done(): | ||
return false, ctx.Err() | ||
} | ||
return true, nil | ||
} | ||
|
||
// maybeJumpTier will check if the address tier is changing and optionally | ||
// wait some time. | ||
maybeJumpTier := func(tier int, next ma.Multiaddr) (cont bool, brk bool, err error) { | ||
if tier > lastTier && lastTier != -1 { | ||
// Wait the delay (preempt with new addresses or when the dialer | ||
// requests more addresses) | ||
select { | ||
case addr, ok := <-c: | ||
put(next) | ||
if !ok { | ||
return false, true, nil | ||
} | ||
put(addr) | ||
return true, false, nil | ||
case <-delay.C: | ||
delay.Reset(TierDelay) | ||
case <-triggerNext: | ||
if !delay.Stop() { | ||
<-delay.C | ||
} | ||
delay.Reset(TierDelay) | ||
case <-ctx.Done(): | ||
return false, false, ctx.Err() | ||
} | ||
} | ||
|
||
// Note that we want to only update the tier after we've done the waiting | ||
// or we were asked to finish early | ||
lastTier = tier | ||
return false, false, nil | ||
} | ||
|
||
recvOrSend := func(next ma.Multiaddr) (brk bool, err error) { | ||
select { | ||
case addr, ok := <-c: | ||
put(next) | ||
if !ok { | ||
return true, nil | ||
} | ||
put(addr) | ||
case out <- next: | ||
// Always count the timeout since the last dial. | ||
if !delay.Stop() { | ||
<-delay.C | ||
} | ||
delay.Reset(TierDelay) | ||
case <-ctx.Done(): | ||
return false, ctx.Err() | ||
} | ||
return false, nil | ||
} | ||
|
||
// process the address stream | ||
for { | ||
if !fillBuckets() { | ||
break // input channel closed | ||
} | ||
|
||
next, tier := get() | ||
|
||
// Nothing? Block! | ||
if next == nil { | ||
ok, err := waitForMore() | ||
if err != nil { | ||
return | ||
} | ||
if !ok { | ||
break // input channel closed | ||
} | ||
continue | ||
} | ||
|
||
cont, brk, err := maybeJumpTier(tier, next) | ||
if cont { | ||
continue // received an address while waiting, in case it's lower tier | ||
// look at it immediately | ||
} | ||
if brk { | ||
break // input channel closed | ||
} | ||
if err != nil { | ||
return | ||
} | ||
|
||
brk, err = recvOrSend(next) | ||
if brk { | ||
break // input channel closed | ||
} | ||
if err != nil { | ||
return | ||
} | ||
} | ||
|
||
// the channel is closed by now | ||
c = nil | ||
|
||
// finish sending | ||
for { | ||
next, tier := get() | ||
if next == nil { | ||
return | ||
} | ||
|
||
_, _, err := maybeJumpTier(tier, next) | ||
if err != nil { | ||
return | ||
} | ||
|
||
_, err = recvOrSend(next) | ||
if err != nil { | ||
return | ||
} | ||
} | ||
}() | ||
|
||
return out, triggerNext | ||
} | ||
|
||
// getTier returns the priority tier of the address. | ||
// return value must be > 0 & < numTiers. | ||
func getTier(addr ma.Multiaddr) int { | ||
switch { | ||
case isRelayAddr(addr): | ||
return 1 | ||
default: | ||
return 0 | ||
} | ||
} | ||
|
||
func isRelayAddr(addr ma.Multiaddr) bool { | ||
_, err := addr.ValueForProtocol(P_CIRCUIT) | ||
return err == nil | ||
} |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
would be nice to have this broken out into a separate function
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
will do.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done.