Skip to content

Commit

Permalink
Merge pull request #338 from livepeer/et/monitor-fix
Browse files Browse the repository at this point in the history
Backoff for websocket connection retry
  • Loading branch information
ericxtang authored Mar 24, 2018
2 parents 6b1d930 + 4537221 commit 967ea35
Show file tree
Hide file tree
Showing 35 changed files with 2,771 additions and 51 deletions.
2 changes: 1 addition & 1 deletion circle.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ dependencies:
- "$HOME/ffmpeg"
- "$HOME/compiled"
override:
- go get github.com/livepeer/go-livepeer/cmd/livepeer
- git clone http://github.com/livepeer/go-livepeer.git
- go get github.com/golang/glog
- go get github.com/ericxtang/m3u8
- npm install -g ffmpeg-static@2.0.0
Expand Down
14 changes: 12 additions & 2 deletions core/livepeernode.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,10 @@ import (
"sort"
"time"

"github.com/cenkalti/backoff"
"github.com/ericxtang/m3u8"
ethcommon "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto"
"github.com/golang/glog"
"github.com/livepeer/go-livepeer/common"
Expand Down Expand Up @@ -44,6 +46,7 @@ var DefaultMasterPlaylistWaitTime = 60 * time.Second
var DefaultJobLength = int64(5760) //Avg 1 day in 15 sec blocks
var ConnFileWriteFreq = time.Duration(60) * time.Second
var LivepeerVersion = "0.1.14-unstable"
var SubscribeRetry = uint64(3)

//NodeID can be converted from libp2p PeerID.
type NodeID string
Expand Down Expand Up @@ -119,8 +122,15 @@ func (n *LivepeerNode) CreateTranscodeJob(strmID StreamID, profiles []ffmpeg.Vid
return err
}

blk, err := b.BlockByNumber(context.Background(), nil)
if err != nil {
var blk *types.Block
getBlock := func() error {
blk, err = b.BlockByNumber(context.Background(), nil)
if err != nil {
return err
}
return nil
}
if err := backoff.Retry(getBlock, backoff.NewConstantBackOff(time.Second*2)); err != nil {
glog.Errorf("Cannot get current block number: %v", err)
return ErrNotFound
}
Expand Down
6 changes: 6 additions & 0 deletions eth/claimmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,7 @@ func (c *BasicClaimManager) AddReceipt(seqNo int64, data []byte, tDataHash []byt

c.cost = new(big.Int).Add(c.cost, c.pricePerSegment)
c.unclaimedSegs[seqNo] = true
// glog.Infof("Added %v. unclaimSegs: %v", seqNo, c.unclaimedSegs)

return nil
}
Expand Down Expand Up @@ -243,7 +244,12 @@ func (c *BasicClaimManager) markClaimedSegs(segRange [2]int64) {

//Claim creates the onchain claim for all the claims added through AddReceipt
func (c *BasicClaimManager) ClaimVerifyAndDistributeFees() error {
segs := make([]int64, 0)
for k, _ := range c.unclaimedSegs {
segs = append(segs, k)
}
ranges := c.makeRanges()
glog.Infof("Claiming for segs: , ranges: %v", segs, ranges)

for _, segRange := range ranges {
//create concat hashes for each seg
Expand Down
17 changes: 15 additions & 2 deletions eth/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"sync"
"time"

"github.com/cenkalti/backoff"
"github.com/ethereum/go-ethereum/accounts"
"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/common"
Expand Down Expand Up @@ -713,16 +714,28 @@ func (c *client) RegisteredTranscoders() ([]*lpTypes.Transcoder, error) {
func (c *client) IsAssignedTranscoder(jobID *big.Int) (bool, error) {
jInfo, err := c.JobsManagerSession.GetJob(jobID)
if err != nil {
glog.Errorf("Error getting job: %v", err)
return false, err
}

blk, err := c.backend.BlockByNumber(context.Background(), jInfo.CreationBlock)
if err != nil {
var blk *types.Block
getBlock := func() error {
blk, err = c.backend.BlockByNumber(context.Background(), jInfo.CreationBlock)
if err != nil {
glog.Errorf("Error getting block by number %v: %v. retrying...", jInfo.CreationBlock.String(), err)
return err
}

return nil
}
if err := backoff.Retry(getBlock, backoff.WithMaxRetries(backoff.NewConstantBackOff(time.Second), SubscribeRetry)); err != nil {
glog.Errorf("BlockByNumber failed: %v", err)
return false, err
}

t, err := c.BondingManagerSession.ElectActiveTranscoder(jInfo.MaxPricePerSegment, blk.Hash(), jInfo.CreationRound)
if err != nil {
glog.Errorf("Error getting ElectActiveTranscoder: %v", err)
return false, err
}

Expand Down
113 changes: 69 additions & 44 deletions eth/eventmonitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ import (
"context"
"fmt"
"strings"
"time"

"github.com/cenkalti/backoff"
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/accounts/abi"
"github.com/ethereum/go-ethereum/common"
Expand All @@ -14,6 +16,8 @@ import (
"github.com/livepeer/go-livepeer/eth/contracts"
)

var SubscribeRetry = uint64(3)

type logCallback func(types.Log) (bool, error)
type headerCallback func(*types.Header) (bool, error)

Expand Down Expand Up @@ -75,30 +79,38 @@ func (em *eventMonitor) SubscribeNewRound(ctx context.Context, subName string, l
Topics: [][]common.Hash{[]common.Hash{eventId}},
}

sub, err := em.backend.SubscribeFilterLogs(ctx, q, logsCh)
if err != nil {
return nil, err
subscribe := func() error {
sub, err := em.backend.SubscribeFilterLogs(ctx, q, logsCh)
if err != nil {
glog.Errorf("SubscribeNewRound error: %v. Retrying...", err)
return err
} else {
glog.Infof("SubscribeNewRound successful.")
}

em.eventSubMap[subName] = &EventSubscription{
sub: sub,
logsCh: logsCh,
active: true,
}

return nil
}

em.eventSubMap[subName] = &EventSubscription{
sub: sub,
logsCh: logsCh,
if err := backoff.Retry(subscribe, backoff.NewConstantBackOff(time.Second*2)); err != nil {
glog.Infof("SubscribeNewRound error: %v", err)
return nil, err
}

go em.watchLogs(subName, cb, func() {
glog.Infof("Trying to resubscribe for %v", subName)

sub, err = em.backend.SubscribeFilterLogs(ctx, q, logsCh)
if err != nil {
glog.Error(err)
if err := backoff.Retry(subscribe, backoff.NewConstantBackOff(time.Second*2)); err != nil {
glog.Infof("Resubscription error: %v", err)
return
}

em.eventSubMap[subName].sub = sub
em.eventSubMap[subName].logsCh = logsCh
})

return sub, nil
return em.eventSubMap[subName].sub, nil
}

func (em *eventMonitor) SubscribeNewJob(ctx context.Context, subName string, logsCh chan types.Log, broadcasterAddr common.Address, cb logCallback) (ethereum.Subscription, error) {
Expand Down Expand Up @@ -127,61 +139,74 @@ func (em *eventMonitor) SubscribeNewJob(ctx context.Context, subName string, log
}
}

sub, err := em.backend.SubscribeFilterLogs(ctx, q, logsCh)
if err != nil {
return nil, err
subscribe := func() error {
sub, err := em.backend.SubscribeFilterLogs(ctx, q, logsCh)
if err != nil {
glog.Errorf("SubscribeNewJob error: %v. retrying...", err)
return err
} else {
glog.Infof("SubscribedNewJob successful.")
}

em.eventSubMap[subName] = &EventSubscription{
sub: sub,
logsCh: logsCh,
active: true,
}
return nil
}

em.eventSubMap[subName] = &EventSubscription{
sub: sub,
logsCh: logsCh,
if err = backoff.Retry(subscribe, backoff.NewConstantBackOff(time.Second*2)); err != nil {
glog.Errorf("SubscribeNewJob failed: %v", err)
return nil, err
}

go em.watchLogs(subName, cb, func() {
glog.Infof("Trying to resubscribe for %v", subName)

sub, err = em.backend.SubscribeFilterLogs(ctx, q, logsCh)
if err != nil {
glog.Error(err)
if err := backoff.Retry(subscribe, backoff.NewConstantBackOff(time.Second*2)); err != nil {
glog.Errorf("Resubscribe failed: %v", err)
return
}

em.eventSubMap[subName].sub = sub
em.eventSubMap[subName].logsCh = logsCh
})

return sub, nil
return em.eventSubMap[subName].sub, nil
}

func (em *eventMonitor) SubscribeNewBlock(ctx context.Context, subName string, headersCh chan *types.Header, cb headerCallback) (ethereum.Subscription, error) {
if _, ok := em.eventSubMap[subName]; ok {
return nil, fmt.Errorf("Event subscription already registered as active with name: %v", subName)
}

sub, err := em.backend.SubscribeNewHead(ctx, headersCh)
if err != nil {
return nil, err
}
subscribe := func() error {
sub, err := em.backend.SubscribeNewHead(ctx, headersCh)
if err != nil {
glog.Errorf("SubscribeNewHead error: %v. retrying...", err)
return err
} else {
glog.Infof("SubscribeNewHead successful.")
}

em.eventSubMap[subName] = &EventSubscription{
sub: sub,
headersCh: headersCh,
em.eventSubMap[subName] = &EventSubscription{
sub: sub,
headersCh: headersCh,
active: true,
}
return nil
}
if err := backoff.Retry(subscribe, backoff.NewConstantBackOff(time.Second*2)); err != nil {
glog.Errorf("SubscribeNewHead failed: %v", err)
return nil, err
}

go em.watchBlocks(subName, sub, headersCh, cb, func() {
go em.watchBlocks(subName, em.eventSubMap[subName].sub, headersCh, cb, func() {
glog.Infof("Trying to resubscribe for %v", subName)

sub, err = em.backend.SubscribeNewHead(ctx, headersCh)
if err != nil {
glog.Error(err)
if err := backoff.Retry(subscribe, backoff.NewConstantBackOff(time.Second*2)); err != nil {
glog.Errorf("Resubscribe failed: %v", err)
return
}

em.eventSubMap[subName].sub = sub
em.eventSubMap[subName].headersCh = headersCh
})

return sub, nil
return em.eventSubMap[subName].sub, nil
}

func (em *eventMonitor) setSubActive(subName string) {
Expand Down
16 changes: 14 additions & 2 deletions eth/eventservices/jobservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,12 @@ package eventservices

import (
"context"
"errors"
"fmt"
"math/big"
"time"

"github.com/cenkalti/backoff"
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
Expand Down Expand Up @@ -46,8 +49,17 @@ func (s *JobService) Start(ctx context.Context) error {
sub, err := s.eventMonitor.SubscribeNewJob(ctx, "NewJob", logsCh, common.Address{}, func(l types.Log) (bool, error) {
_, jid, _, _ := parseNewJobLog(l)

job, err := s.node.Eth.GetJob(jid)
if err != nil {
var job *lpTypes.Job
getJob := func() error {
j, err := s.node.Eth.GetJob(jid)
if j.StreamId == "" {
glog.Errorf("Got empty job for id:%v. Should try again.", jid.Int64())
return errors.New("ErrGetJob")
}
job = j
return err
}
if err := backoff.Retry(getJob, backoff.NewConstantBackOff(time.Second*2)); err != nil {
glog.Errorf("Error getting job info: %v", err)
return false, err
}
Expand Down
22 changes: 22 additions & 0 deletions vendor/github.com/cenkalti/backoff/.gitignore

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 10 additions & 0 deletions vendor/github.com/cenkalti/backoff/.travis.yml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

20 changes: 20 additions & 0 deletions vendor/github.com/cenkalti/backoff/LICENSE

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 967ea35

Please sign in to comment.