From 6bf208769aeeb1fb16f76ba784ad068bd280543e Mon Sep 17 00:00:00 2001 From: Victor Elias Date: Wed, 31 Aug 2022 15:54:37 -0300 Subject: [PATCH] server: Fix timeout for stream recording background jobs (#2573) * server: Use separate timeout for recording segments * server: Increase segment save attempts to 3 The retries last arg to SaveRetried is actually the number of attempts, not the number of retries. So to try saving 3 times, increased it to 3. * CHANGELOG --- CHANGELOG_PENDING.md | 1 + clog/clog.go | 6 ++++++ server/broadcast.go | 10 ++++++++-- 3 files changed, 15 insertions(+), 2 deletions(-) diff --git a/CHANGELOG_PENDING.md b/CHANGELOG_PENDING.md index 3ca5044811..dc8f3bd0c4 100644 --- a/CHANGELOG_PENDING.md +++ b/CHANGELOG_PENDING.md @@ -21,6 +21,7 @@ #### General #### Broadcaster +- [#2573](https://github.com/livepeer/go-livepeer/pull/2573) server: Fix timeout for stream recording background jobs (@victorges) #### Orchestrator diff --git a/clog/clog.go b/clog/clog.go index 9cd8314f50..20ed637cf6 100644 --- a/clog/clog.go +++ b/clog/clog.go @@ -9,6 +9,7 @@ import ( "strconv" "strings" "sync" + "time" "github.com/golang/glog" ) @@ -68,6 +69,11 @@ func Clone(parentCtx, logCtx context.Context) context.Context { return context.WithValue(parentCtx, clogContextKey, newCmap) } +func WithTimeout(parentCtx, logCtx context.Context, timeout time.Duration) (context.Context, context.CancelFunc) { + ctx := Clone(parentCtx, logCtx) + return context.WithTimeout(ctx, timeout) +} + func AddManifestID(ctx context.Context, val string) context.Context { return AddVal(ctx, manifestID, val) } diff --git a/server/broadcast.go b/server/broadcast.go index fac459807c..467fb392c4 100755 --- a/server/broadcast.go +++ b/server/broadcast.go @@ -40,6 +40,8 @@ var maxDurationSec = common.MaxDuration.Seconds() // Max threshold for # of broadcast sessions under which we will refresh the session list var maxRefreshSessionsThreshold = 8.0 +var recordSegmentsMaxTimeout = 1 * time.Minute + var Policy *verification.Policy var BroadcastCfg = &BroadcastConfig{} var MaxAttempts = 3 @@ -787,8 +789,10 @@ func processSegment(ctx context.Context, cxn *rtmpConnection, seg *stream.HLSSeg hasZeroVideoFrame := seg.IsZeroFrame if ros != nil && !hasZeroVideoFrame { go func() { + ctx, cancel := clog.WithTimeout(context.Background(), ctx, recordSegmentsMaxTimeout) + defer cancel() now := time.Now() - uri, err := drivers.SaveRetried(ctx, ros, name, seg.Data, map[string]string{"duration": segDurMs}, 2) + uri, err := drivers.SaveRetried(ctx, ros, name, seg.Data, map[string]string{"duration": segDurMs}, 3) took := time.Since(now) if err != nil { clog.Errorf(ctx, "Error saving name=%s bytes=%d to record store err=%q", @@ -1184,11 +1188,13 @@ func downloadResults(ctx context.Context, cxn *rtmpConnection, seg *stream.HLSSe if bros != nil { go func() { + ctx, cancel := clog.WithTimeout(context.Background(), ctx, recordSegmentsMaxTimeout) + defer cancel() ext, _ := common.ProfileFormatExtension(profile.Format) name := fmt.Sprintf("%s/%d%s", profile.Name, seg.SeqNo, ext) segDurMs := getSegDurMsString(seg) now := time.Now() - uri, err := drivers.SaveRetried(ctx, bros, name, data, map[string]string{"duration": segDurMs}, 2) + uri, err := drivers.SaveRetried(ctx, bros, name, data, map[string]string{"duration": segDurMs}, 3) took := time.Since(now) if err != nil { clog.Errorf(ctx, "Error saving nonce=%d manifestID=%s name=%s to record store err=%q", nonce, cxn.mid, name, err)