Skip to content
This repository has been archived by the owner on May 26, 2022. It is now read-only.

Commit

Permalink
dial delay: simplify the code
Browse files Browse the repository at this point in the history
  • Loading branch information
magik6k committed Feb 19, 2018
1 parent 5750ea7 commit 35bbd29
Show file tree
Hide file tree
Showing 2 changed files with 88 additions and 41 deletions.
123 changes: 82 additions & 41 deletions dial_delay.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,49 +49,51 @@ func delayDialAddrs(ctx context.Context, c <-chan ma.Multiaddr) (<-chan ma.Multi
return nil, -1
}

outer:
for {
fill:
// fillBuckets reads pending addresses form the channel without blocking
fillBuckets := func() bool {
loop:
for {
select {
case addr, ok := <-c:
if !ok {
break outer
return false
}
put(addr)
default:
break fill
break loop
}
}
return true
}

next, tier := get()

// Nothing? Block!
if next == nil {
select {
case addr, ok := <-c:
if !ok {
break outer
}
put(addr)
case <-ctx.Done():
return
// waitForMore woits for addresses from the channel
waitForMore := func() (bool, error) {
select {
case addr, ok := <-c:
if !ok {
return false, nil
}
continue
put(addr)
case <-ctx.Done():
return false, ctx.Err()
}
return true, nil
}

// Jumping a tier?
// maybeJumpTier will check if the address tier is changing and optionally
// wait some time.
maybeJumpTier := func(tier int, next ma.Multiaddr) (cont bool, brk bool, err error) {
if tier > lastTier && lastTier != -1 {
// Wait the delay (preempt with new addresses or when the dialer
// requests more addresses)
select {
case addr, ok := <-c:
put(next)
if !ok {
break outer
return false, true, nil
}
put(addr)
continue
return true, false, nil
case <-delay.C:
delay.Reset(tierDelay)
case <-triggerNext:
Expand All @@ -100,55 +102,94 @@ func delayDialAddrs(ctx context.Context, c <-chan ma.Multiaddr) (<-chan ma.Multi
}
delay.Reset(tierDelay)
case <-ctx.Done():
return
return false, false, ctx.Err()
}
}

// Note that we want to only update the tier after we've done the waiting
// or we were asked to finish early
lastTier = tier
return false, false, nil
}

recvOrSend := func(next ma.Multiaddr) (brk bool, err error) {
select {
case addr, ok := <-c:
put(next)
if !ok {
break outer
return true, nil
}
put(addr)
continue
case out <- next:
// Always count the timeout since the last dial.
if !delay.Stop() {
<-delay.C
}
delay.Reset(tierDelay)
case <-ctx.Done():
return false, ctx.Err()
}
return false, nil
}

// process the address stream
for {
if !fillBuckets() {
break // input channel closed
}

next, tier := get()

// Nothing? Block!
if next == nil {
ok, err := waitForMore()
if err != nil {
return
}
if !ok {
break // input channel closed
}
continue
}

cont, brk, err := maybeJumpTier(tier, next)
if cont {
continue // received an address while waiting, in case it's lower tier
// look at it immediately
}
if brk {
break // input channel closed
}
if err != nil {
return
}

brk, err = recvOrSend(next)
if brk {
break // input channel closed
}
if err != nil {
return
}
}

// the channel is closed by now
c = nil

// finish sending
for {
next, tier := get()
if next == nil {
return
}
if tier > lastTier && lastTier != -1 {
select {
case <-delay.C:
case <-triggerNext:
if !delay.Stop() {
<-delay.C
}
delay.Reset(tierDelay)
case <-ctx.Done():
return
}

_, _, err := maybeJumpTier(tier, next)
if err != nil {
return
}
lastTier = tier
select {
case out <- next:
delay.Stop()
delay.Reset(tierDelay)
case <-ctx.Done():

_, err = recvOrSend(next)
if err != nil {
return
}
}
Expand Down
6 changes: 6 additions & 0 deletions dial_delay_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,12 @@ func prepare() {
tierDelay = 32 * time.Millisecond // 2x windows timer resolution
}

// addrChan creates a multiaddr channel with `nsync` size. If nsync is larger
// than 0, the entries will get pre-buffered in the channel.
// addrDelays is a set of addresses and delays between sending them. If a string
// starts with '/' it will be parsed as an address and sent to the channel.
// Otherwise it will get parsed as a time to sleep before sending next addresses
// or closing the channel
func addrChan(t *testing.T, nsync int, addrDelays ...string) <-chan ma.Multiaddr {
out := make(chan ma.Multiaddr, nsync)
c := sync.NewCond(&sync.Mutex{})
Expand Down

0 comments on commit 35bbd29

Please sign in to comment.