Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Backoff for websocket connection retry #338

Merged
merged 8 commits into from
Mar 24, 2018
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion circle.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ dependencies:
- "$HOME/ffmpeg"
- "$HOME/compiled"
override:
- go get github.com/livepeer/go-livepeer/cmd/livepeer
# - go get github.com/livepeer/go-livepeer/cmd/livepeer
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Replace with git clone?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good idea!

- go get github.com/golang/glog
- go get github.com/ericxtang/m3u8
- npm install -g ffmpeg-static@2.0.0
Expand Down
16 changes: 14 additions & 2 deletions core/livepeernode.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,10 @@ import (
"sort"
"time"

"github.com/cenkalti/backoff"
"github.com/ericxtang/m3u8"
ethcommon "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto"
"github.com/golang/glog"
"github.com/livepeer/go-livepeer/common"
Expand Down Expand Up @@ -44,6 +46,7 @@ var DefaultMasterPlaylistWaitTime = 60 * time.Second
var DefaultJobLength = int64(5760) //Avg 1 day in 15 sec blocks
var ConnFileWriteFreq = time.Duration(60) * time.Second
var LivepeerVersion = "0.1.14-unstable"
var SubscribeRetry = uint64(3)

//NodeID can be converted from libp2p PeerID.
type NodeID string
Expand Down Expand Up @@ -119,8 +122,17 @@ func (n *LivepeerNode) CreateTranscodeJob(strmID StreamID, profiles []ffmpeg.Vid
return err
}

blk, err := b.BlockByNumber(context.Background(), nil)
if err != nil {
var blk *types.Block
getBlock := func() error {
blk, err = b.BlockByNumber(context.Background(), nil)
if err != nil {
return err
}
return nil
}
bo := backoff.NewExponentialBackOff()
bo.MaxElapsedTime = time.Second * 15
if err := backoff.Retry(getBlock, backoff.WithMaxRetries(bo, SubscribeRetry)); err != nil {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the case of websocket connections to Infura, we really should retry indefinitely. We don't want to force user interaction if all they need to do is re-establish the connection.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fair enough. My concern was around spamming the Infura network, but I guess they are there for a reason and we should always try to reconnect.

glog.Errorf("Cannot get current block number: %v", err)
return ErrNotFound
}
Expand Down
6 changes: 6 additions & 0 deletions eth/claimmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,7 @@ func (c *BasicClaimManager) AddReceipt(seqNo int64, data []byte, tDataHash []byt

c.cost = new(big.Int).Add(c.cost, c.pricePerSegment)
c.unclaimedSegs[seqNo] = true
// glog.Infof("Added %v. unclaimSegs: %v", seqNo, c.unclaimedSegs)

return nil
}
Expand Down Expand Up @@ -243,7 +244,12 @@ func (c *BasicClaimManager) markClaimedSegs(segRange [2]int64) {

//Claim creates the onchain claim for all the claims added through AddReceipt
func (c *BasicClaimManager) ClaimVerifyAndDistributeFees() error {
// segs := make([]int64, 0)
// for k, _ := range c.unclaimedSegs {
// segs = append(segs, k)
// }
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this commented-out code extraneous?

ranges := c.makeRanges()
// glog.Infof("Claiming for segs: , ranges: %v", segs, ranges)

for _, segRange := range ranges {
//create concat hashes for each seg
Expand Down
17 changes: 15 additions & 2 deletions eth/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"sync"
"time"

"github.com/cenkalti/backoff"
"github.com/ethereum/go-ethereum/accounts"
"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/common"
Expand Down Expand Up @@ -713,16 +714,28 @@ func (c *client) RegisteredTranscoders() ([]*lpTypes.Transcoder, error) {
func (c *client) IsAssignedTranscoder(jobID *big.Int) (bool, error) {
jInfo, err := c.JobsManagerSession.GetJob(jobID)
if err != nil {
glog.Errorf("Error getting job: %v", err)
return false, err
}

blk, err := c.backend.BlockByNumber(context.Background(), jInfo.CreationBlock)
if err != nil {
var blk *types.Block
getBlock := func() error {
blk, err = c.backend.BlockByNumber(context.Background(), jInfo.CreationBlock)
if err != nil {
glog.Errorf("Error getting block by number %v: %v. retrying...", jInfo.CreationBlock.String(), err)
return err
}

return nil
}
if err := backoff.Retry(getBlock, backoff.WithMaxRetries(backoff.NewConstantBackOff(time.Second), SubscribeRetry)); err != nil {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why use a constant backoff here, but an exponential backoff strategy in CreateTranscodeJob in livepeernode.go?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I removed all the exponential backoffs.

glog.Errorf("BlockByNumber failed: %v", err)
return false, err
}

t, err := c.BondingManagerSession.ElectActiveTranscoder(jInfo.MaxPricePerSegment, blk.Hash(), jInfo.CreationRound)
if err != nil {
glog.Errorf("Error getting ElectActiveTranscoder: %v", err)
return false, err
}

Expand Down
127 changes: 83 additions & 44 deletions eth/eventmonitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,11 @@ package eth
import (
"context"
"fmt"
"math/big"
"strings"
"time"

"github.com/cenkalti/backoff"
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/accounts/abi"
"github.com/ethereum/go-ethereum/common"
Expand All @@ -14,6 +17,8 @@ import (
"github.com/livepeer/go-livepeer/eth/contracts"
)

var SubscribeRetry = uint64(3)

type logCallback func(types.Log) (bool, error)
type headerCallback func(*types.Header) (bool, error)

Expand All @@ -35,6 +40,7 @@ type eventMonitor struct {
backend *ethclient.Client
contractAddrMap map[string]common.Address
eventSubMap map[string]*EventSubscription
latestBlock *big.Int
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this used anywhere?

}

func NewEventMonitor(backend *ethclient.Client, contractAddrMap map[string]common.Address) EventMonitor {
Expand Down Expand Up @@ -75,30 +81,42 @@ func (em *eventMonitor) SubscribeNewRound(ctx context.Context, subName string, l
Topics: [][]common.Hash{[]common.Hash{eventId}},
}

sub, err := em.backend.SubscribeFilterLogs(ctx, q, logsCh)
if err != nil {
return nil, err
subscribe := func() error {
sub, err := em.backend.SubscribeFilterLogs(ctx, q, logsCh)
if err != nil {
glog.Errorf("SubscribeNewRound error: %v. Retrying...", err)
return err
} else {
glog.Infof("SubscribeNewRound successful.")
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this a one-time message or would it lead to additional logging after each round?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's a one-time message per connection (it will re-print if there is a re-connection).

}

em.eventSubMap[subName] = &EventSubscription{
sub: sub,
logsCh: logsCh,
active: true,
}

return nil
}

em.eventSubMap[subName] = &EventSubscription{
sub: sub,
logsCh: logsCh,
b := backoff.NewExponentialBackOff()
b.MaxElapsedTime = time.Second * 15
if err := backoff.Retry(subscribe, backoff.WithMaxRetries(b, SubscribeRetry)); err != nil {
glog.Infof("SubscribeNewRound error: %v", err)
return nil, err
}

go em.watchLogs(subName, cb, func() {
glog.Infof("Trying to resubscribe for %v", subName)

sub, err = em.backend.SubscribeFilterLogs(ctx, q, logsCh)
if err != nil {
glog.Error(err)
b := backoff.NewExponentialBackOff()
b.MaxElapsedTime = time.Second * 15
if err := backoff.Retry(subscribe, backoff.WithMaxRetries(b, SubscribeRetry)); err != nil {
glog.Infof("Resubscription error: %v", err)
return
}

em.eventSubMap[subName].sub = sub
em.eventSubMap[subName].logsCh = logsCh
})

return sub, nil
return em.eventSubMap[subName].sub, nil
}

func (em *eventMonitor) SubscribeNewJob(ctx context.Context, subName string, logsCh chan types.Log, broadcasterAddr common.Address, cb logCallback) (ethereum.Subscription, error) {
Expand Down Expand Up @@ -127,61 +145,82 @@ func (em *eventMonitor) SubscribeNewJob(ctx context.Context, subName string, log
}
}

sub, err := em.backend.SubscribeFilterLogs(ctx, q, logsCh)
if err != nil {
return nil, err
subscribe := func() error {
sub, err := em.backend.SubscribeFilterLogs(ctx, q, logsCh)
if err != nil {
glog.Errorf("SubscribeNewJob error: %v. retrying...", err)
return err
} else {
glog.Infof("SubscribedNewJob successful.")
}

em.eventSubMap[subName] = &EventSubscription{
sub: sub,
logsCh: logsCh,
active: true,
}
return nil
}

em.eventSubMap[subName] = &EventSubscription{
sub: sub,
logsCh: logsCh,
b := backoff.NewExponentialBackOff()
b.MaxElapsedTime = time.Second * 15
if err = backoff.Retry(subscribe, backoff.WithMaxRetries(b, 3)); err != nil {
glog.Errorf("SubscribeNewJob failed: %v", err)
return nil, err
}

go em.watchLogs(subName, cb, func() {
glog.Infof("Trying to resubscribe for %v", subName)

sub, err = em.backend.SubscribeFilterLogs(ctx, q, logsCh)
if err != nil {
glog.Error(err)
b := backoff.NewExponentialBackOff()
b.MaxElapsedTime = time.Second * 15
if err := backoff.Retry(subscribe, backoff.WithMaxRetries(b, SubscribeRetry)); err != nil {
glog.Errorf("Resubscribe failed: %v", err)
return
}

em.eventSubMap[subName].sub = sub
em.eventSubMap[subName].logsCh = logsCh
})

return sub, nil
return em.eventSubMap[subName].sub, nil
}

func (em *eventMonitor) SubscribeNewBlock(ctx context.Context, subName string, headersCh chan *types.Header, cb headerCallback) (ethereum.Subscription, error) {
if _, ok := em.eventSubMap[subName]; ok {
return nil, fmt.Errorf("Event subscription already registered as active with name: %v", subName)
}

sub, err := em.backend.SubscribeNewHead(ctx, headersCh)
if err != nil {
return nil, err
}
subscribe := func() error {
sub, err := em.backend.SubscribeNewHead(ctx, headersCh)
if err != nil {
glog.Errorf("SubscribeNewHead error: %v. retrying...", err)
return err
} else {
glog.Infof("SubscribeNewHead successful.")
}

em.eventSubMap[subName] = &EventSubscription{
sub: sub,
headersCh: headersCh,
em.eventSubMap[subName] = &EventSubscription{
sub: sub,
headersCh: headersCh,
active: true,
}
return nil
}
b := backoff.NewExponentialBackOff()
b.MaxElapsedTime = time.Second * 15
if err := backoff.Retry(subscribe, backoff.WithMaxRetries(b, SubscribeRetry)); err != nil {
glog.Errorf("SubscribeNewHead failed: %v", err)
return nil, err
}

go em.watchBlocks(subName, sub, headersCh, cb, func() {
go em.watchBlocks(subName, em.eventSubMap[subName].sub, headersCh, cb, func() {
glog.Infof("Trying to resubscribe for %v", subName)

sub, err = em.backend.SubscribeNewHead(ctx, headersCh)
if err != nil {
glog.Error(err)
b := backoff.NewExponentialBackOff()
b.MaxElapsedTime = time.Second * 15
if err := backoff.Retry(subscribe, backoff.WithMaxRetries(b, SubscribeRetry)); err != nil {
glog.Errorf("Resubscribe failed: %v", err)
return
}

em.eventSubMap[subName].sub = sub
em.eventSubMap[subName].headersCh = headersCh
})

return sub, nil
return em.eventSubMap[subName].sub, nil
}

func (em *eventMonitor) setSubActive(subName string) {
Expand Down
22 changes: 22 additions & 0 deletions vendor/github.com/backoff/.gitignore

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 10 additions & 0 deletions vendor/github.com/backoff/.travis.yml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

20 changes: 20 additions & 0 deletions vendor/github.com/backoff/LICENSE

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

30 changes: 30 additions & 0 deletions vendor/github.com/backoff/README.md

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading