Skip to content

Commit

Permalink
server: Fix timeout for stream recording background jobs (#2573)
Browse files Browse the repository at this point in the history
* server: Use separate timeout for recording segments

* server: Increase segment save attempts to 3

The retries last arg to SaveRetried is actually
the number of attempts, not the number of retries.
So to try saving 3 times, increased it to 3.

* CHANGELOG
  • Loading branch information
victorges authored Aug 31, 2022
1 parent a0ec61f commit 6bf2087
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 2 deletions.
1 change: 1 addition & 0 deletions CHANGELOG_PENDING.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#### General

#### Broadcaster
- [#2573](https://github.com/livepeer/go-livepeer/pull/2573) server: Fix timeout for stream recording background jobs (@victorges)

#### Orchestrator

Expand Down
6 changes: 6 additions & 0 deletions clog/clog.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"strconv"
"strings"
"sync"
"time"

"github.com/golang/glog"
)
Expand Down Expand Up @@ -68,6 +69,11 @@ func Clone(parentCtx, logCtx context.Context) context.Context {
return context.WithValue(parentCtx, clogContextKey, newCmap)
}

func WithTimeout(parentCtx, logCtx context.Context, timeout time.Duration) (context.Context, context.CancelFunc) {
ctx := Clone(parentCtx, logCtx)
return context.WithTimeout(ctx, timeout)
}

func AddManifestID(ctx context.Context, val string) context.Context {
return AddVal(ctx, manifestID, val)
}
Expand Down
10 changes: 8 additions & 2 deletions server/broadcast.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ var maxDurationSec = common.MaxDuration.Seconds()
// Max threshold for # of broadcast sessions under which we will refresh the session list
var maxRefreshSessionsThreshold = 8.0

var recordSegmentsMaxTimeout = 1 * time.Minute

var Policy *verification.Policy
var BroadcastCfg = &BroadcastConfig{}
var MaxAttempts = 3
Expand Down Expand Up @@ -787,8 +789,10 @@ func processSegment(ctx context.Context, cxn *rtmpConnection, seg *stream.HLSSeg
hasZeroVideoFrame := seg.IsZeroFrame
if ros != nil && !hasZeroVideoFrame {
go func() {
ctx, cancel := clog.WithTimeout(context.Background(), ctx, recordSegmentsMaxTimeout)
defer cancel()
now := time.Now()
uri, err := drivers.SaveRetried(ctx, ros, name, seg.Data, map[string]string{"duration": segDurMs}, 2)
uri, err := drivers.SaveRetried(ctx, ros, name, seg.Data, map[string]string{"duration": segDurMs}, 3)
took := time.Since(now)
if err != nil {
clog.Errorf(ctx, "Error saving name=%s bytes=%d to record store err=%q",
Expand Down Expand Up @@ -1184,11 +1188,13 @@ func downloadResults(ctx context.Context, cxn *rtmpConnection, seg *stream.HLSSe

if bros != nil {
go func() {
ctx, cancel := clog.WithTimeout(context.Background(), ctx, recordSegmentsMaxTimeout)
defer cancel()
ext, _ := common.ProfileFormatExtension(profile.Format)
name := fmt.Sprintf("%s/%d%s", profile.Name, seg.SeqNo, ext)
segDurMs := getSegDurMsString(seg)
now := time.Now()
uri, err := drivers.SaveRetried(ctx, bros, name, data, map[string]string{"duration": segDurMs}, 2)
uri, err := drivers.SaveRetried(ctx, bros, name, data, map[string]string{"duration": segDurMs}, 3)
took := time.Since(now)
if err != nil {
clog.Errorf(ctx, "Error saving nonce=%d manifestID=%s name=%s to record store err=%q", nonce, cxn.mid, name, err)
Expand Down

0 comments on commit 6bf2087

Please sign in to comment.