Skip to content
This repository has been archived by the owner on May 26, 2022. It is now read-only.

[RFC] Delay dials to relay addresses #57

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
209 changes: 209 additions & 0 deletions dial_delay.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,209 @@
package swarm

import (
"context"
"time"

ma "github.com/multiformats/go-multiaddr"
mafmt "github.com/whyrusleeping/mafmt"
)

const p_circuit = 290
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we should probably capitalize this to P_CIRCUIT -- that's what we do everywhere else.


const numTiers = 2

var TierDelay = 1 * time.Second

var relay = mafmt.Or(mafmt.And(mafmt.Base(p_circuit), mafmt.Base(ma.P_IPFS)), mafmt.And(mafmt.Base(ma.P_IPFS), mafmt.Base(p_circuit), mafmt.Base(ma.P_IPFS)))
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if we could have the transports have some way to indicate that they should be delayed, possibly an optional interface like

type ExpensiveTransport interface {
  TransportCost() int //basically the 'tier' to use here
}

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds like a good idea.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should be aware of potential problems with changes in the circuit addressing here -- cf libp2p/specs#72

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This might break with the changes in libp2p/go-libp2p-circuit#48
We should drop the trailing /ipfs part from the filter and just rely on the presence of /p2p-circuit

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also problematic for explicit relay addresses.


// delayDialAddrs returns a address channel sorted by priority, pushing the
// addresses with delay between them. The other channel can be used to trigger
// sending more addresses in case all previous failed
func delayDialAddrs(ctx context.Context, c <-chan ma.Multiaddr) (<-chan ma.Multiaddr, chan<- struct{}) {
out := make(chan ma.Multiaddr)
delay := time.NewTimer(TierDelay)
triggerNext := make(chan struct{}, 1)

go func() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

would be nice to have this broken out into a separate function

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will do.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done.

defer delay.Stop()
defer close(out)
var pending [numTiers][]ma.Multiaddr
lastTier := -1

// put enqueues the multiaddr
put := func(addr ma.Multiaddr) {
tier := getTier(addr)
pending[tier] = append(pending[tier], addr)
}

// get gets the best (lowest tier) multiaddr available
// note that within a single tier put/get behave like a stack (LIFO)
get := func() (ma.Multiaddr, int) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

worth noting that we're using a stack within each tier. Might not be the best approach (though definitely simpler and cheaper to implement)

for i, tier := range pending[:] {
if len(tier) > 0 {
addr := tier[len(tier)-1]
tier[len(tier)-1] = nil
pending[i] = tier[:len(tier)-1]
return addr, i
}
}
return nil, -1
}

// fillBuckets reads pending addresses form the channel without blocking
fillBuckets := func() bool {
for {
select {
case addr, ok := <-c:
if !ok {
return false
}
put(addr)
default:
return true
}
}
}

// waitForMore waits for addresses from the channel
waitForMore := func() (bool, error) {
select {
case addr, ok := <-c:
if !ok {
return false, nil
}
put(addr)
case <-ctx.Done():
return false, ctx.Err()
}
return true, nil
}

// maybeJumpTier will check if the address tier is changing and optionally
// wait some time.
maybeJumpTier := func(tier int, next ma.Multiaddr) (cont bool, brk bool, err error) {
if tier > lastTier && lastTier != -1 {
// Wait the delay (preempt with new addresses or when the dialer
// requests more addresses)
select {
case addr, ok := <-c:
put(next)
if !ok {
return false, true, nil
}
put(addr)
return true, false, nil
case <-delay.C:
delay.Reset(TierDelay)
case <-triggerNext:
if !delay.Stop() {
<-delay.C
}
delay.Reset(TierDelay)
case <-ctx.Done():
return false, false, ctx.Err()
}
}

// Note that we want to only update the tier after we've done the waiting
// or we were asked to finish early
lastTier = tier
return false, false, nil
}

recvOrSend := func(next ma.Multiaddr) (brk bool, err error) {
select {
case addr, ok := <-c:
put(next)
if !ok {
return true, nil
}
put(addr)
case out <- next:
// Always count the timeout since the last dial.
if !delay.Stop() {
<-delay.C
}
delay.Reset(TierDelay)
case <-ctx.Done():
return false, ctx.Err()
}
return false, nil
}

// process the address stream
for {
if !fillBuckets() {
break // input channel closed
}

next, tier := get()

// Nothing? Block!
if next == nil {
ok, err := waitForMore()
if err != nil {
return
}
if !ok {
break // input channel closed
}
continue
}

cont, brk, err := maybeJumpTier(tier, next)
if cont {
continue // received an address while waiting, in case it's lower tier
// look at it immediately
}
if brk {
break // input channel closed
}
if err != nil {
return
}

brk, err = recvOrSend(next)
if brk {
break // input channel closed
}
if err != nil {
return
}
}

// the channel is closed by now
c = nil

// finish sending
for {
next, tier := get()
if next == nil {
return
}

_, _, err := maybeJumpTier(tier, next)
if err != nil {
return
}

_, err = recvOrSend(next)
if err != nil {
return
}
}
}()

return out, triggerNext
}

// getTier returns the priority tier of the address.
// return value must be > 0 & < numTiers.
func getTier(addr ma.Multiaddr) int {
switch {
case relay.Matches(addr):
return 1
default:
return 0
}
}
Loading