Skip to content

Commit

Permalink
Serialize writes of the JSON playlist
Browse files Browse the repository at this point in the history
  • Loading branch information
darkdarkdragon committed Jun 29, 2021
1 parent f383b08 commit 61d77c2
Show file tree
Hide file tree
Showing 8 changed files with 253 additions and 24 deletions.
54 changes: 31 additions & 23 deletions core/playlistmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,20 @@ import (
"github.com/golang/glog"
"github.com/livepeer/go-livepeer/common"
"github.com/livepeer/go-livepeer/drivers"
"github.com/livepeer/go-livepeer/monitor"
ffmpeg "github.com/livepeer/lpms/ffmpeg"
"github.com/livepeer/m3u8"
)

const LIVE_LIST_LENGTH uint = 6

const jsonPlaylistRotationInterval = 60 * 60 * 1000 // 1 hour (in ms)
const (
jsonPlaylistRotationInterval = 60 * 60 * 1000 // 1 hour (in ms)
jsonPlaylistMaxRetries = 30
JsonPlaylistInitialTimeout = 5 * time.Second
JsonPlaylistMaxTimeout = 120 * time.Second
)

var JsonPlaylistQuitTimeout = 60 * time.Second

// PlaylistManager manages playlists and data for one video stream, backed by one object storage.
type PlaylistManager interface {
Expand Down Expand Up @@ -47,11 +53,12 @@ type BasicPlaylistManager struct {
recordSession drivers.OSSession
manifestID ManifestID
// Live playlist used for broadcasting
masterPList *m3u8.MasterPlaylist
mediaLists map[string]*m3u8.MediaPlaylist
mapSync *sync.RWMutex
jsonList *JsonPlaylist
jsonListSync *sync.Mutex
masterPList *m3u8.MasterPlaylist
mediaLists map[string]*m3u8.MediaPlaylist
mapSync *sync.RWMutex
jsonList *JsonPlaylist
jsonListWriteQueue *drivers.OvewriteQueue
jsonListSync *sync.Mutex
}

type jsonSeg struct {
Expand Down Expand Up @@ -226,16 +233,30 @@ func NewBasicPlaylistManager(manifestID ManifestID,
if recordSession != nil {
bplm.jsonList = NewJSONPlaylist()
bplm.jsonListSync = &sync.Mutex{}
bplm.makeNewOverwriteQueue()
}
return bplm
}

func (mgr *BasicPlaylistManager) makeNewOverwriteQueue() {
if mgr.jsonListWriteQueue != nil {
mgr.jsonListWriteQueue.StopAfter(JsonPlaylistQuitTimeout)
}
mgr.jsonListWriteQueue = drivers.NewOverwriteQueue(mgr.recordSession, mgr.jsonList.name, "json playlist",
jsonPlaylistMaxRetries, JsonPlaylistInitialTimeout, JsonPlaylistMaxTimeout)
}

func (mgr *BasicPlaylistManager) ManifestID() ManifestID {
return mgr.manifestID
}

func (mgr *BasicPlaylistManager) Cleanup() {
mgr.storageSession.EndSession()
if mgr.storageSession != nil {
mgr.storageSession.EndSession()
}
if mgr.jsonListWriteQueue != nil {
mgr.jsonListWriteQueue.StopAfter(JsonPlaylistQuitTimeout)
}
}

func (mgr *BasicPlaylistManager) GetOSSession() drivers.OSSession {
Expand All @@ -255,23 +276,10 @@ func (mgr *BasicPlaylistManager) FlushRecord() {
glog.Error("Error encoding playlist: ", err)
return
}
go func(name string, data []byte) {
now := time.Now()
_, err := mgr.recordSession.SaveData(name, b, nil, 0)
took := time.Since(now)
if err != nil {
glog.Errorf("Error saving json playlist name=%s bytes=%d took=%s err=%v", name,
len(b), took, err)
} else {
glog.V(common.VERBOSE).Infof("Saving json playlist name=%s bytes=%d took=%s err=%v", name,
len(b), took, err)
}
if monitor.Enabled {
monitor.RecordingPlaylistSaved(took, err)
}
}(mgr.jsonList.name, b)
go mgr.jsonListWriteQueue.Save(b)
if mgr.jsonList.DurationMs > jsonPlaylistRotationInterval {
mgr.jsonList = NewJSONPlaylist()
mgr.makeNewOverwriteQueue()
}
}
}
Expand Down
8 changes: 8 additions & 0 deletions core/playlistmanager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@ import (
"github.com/stretchr/testify/assert"
)

func init() {
JsonPlaylistQuitTimeout = 0 * time.Second
}

func TestJSONList1(t *testing.T) {
assert := assert.New(t)
jspl1 := NewJSONPlaylist()
Expand Down Expand Up @@ -128,6 +132,7 @@ func TestJsonFlush(t *testing.T) {
data, err := ioutil.ReadAll(fir.Body)
assert.Nil(err)
assert.Equal(`{"duration_ms":43200000,"tracks":[{"name":"source","bandwidth":400000,"resolution":"256x144"}],"segments":{"source":[{"seq_no":1,"uri":"test_seg/1.ts","duration_ms":43200000}]}}`, string(data))
c.Cleanup()
}

func TestGetMasterPlaylist(t *testing.T) {
Expand All @@ -151,6 +156,7 @@ func TestGetMasterPlaylist(t *testing.T) {
assert.NotNil(mpl)
s := mpl.Segments[0]
assert.Equal(segName, s.URI)
c.Cleanup()
}

func TestGetOrCreatePL(t *testing.T) {
Expand Down Expand Up @@ -198,6 +204,7 @@ func TestGetOrCreatePL(t *testing.T) {
if len(masterPL.Variants) != 2 || masterPL.Variants[1].Resolution != vProfile.Resolution {
t.Error("Master PL had some unexpected variants or properties")
}
c.Cleanup()
}

func TestPlaylists(t *testing.T) {
Expand Down Expand Up @@ -267,6 +274,7 @@ func TestPlaylists(t *testing.T) {
if !compareSeg(seg1, newPL.Segments[0]) || !compareSeg(pl.Segments[1], newPL.Segments[0]) {
t.Error("Unexpected seg properties in new playlist")
}
c.Cleanup()

}

Expand Down
119 changes: 119 additions & 0 deletions drivers/overwrite_queue.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
package drivers

import (
"time"

"github.com/golang/glog"
"github.com/livepeer/go-livepeer/common"
)

const (
timeoutMultiplier = 1.5
)

type (
OvewriteQueue struct {
desc string
name string
session OSSession
maxRetries int
initialTimeout time.Duration
maxTimeout time.Duration
queue chan []byte
quit chan struct{}
}
)

func NewOverwriteQueue(session OSSession, name, desc string, maxRetries int, initialTimeout, maxTimeout time.Duration) *OvewriteQueue {
oq := &OvewriteQueue{
desc: desc,
name: name,
maxRetries: maxRetries,
session: session,
initialTimeout: initialTimeout,
maxTimeout: maxTimeout,
queue: make(chan []byte, 32),
quit: make(chan struct{}),
}
if maxRetries < 1 {
panic("maxRetries should be greater than zero")
}
go oq.workerLoop()
return oq
}

// Save queues data to be saved
func (oq *OvewriteQueue) Save(data []byte) {
oq.queue <- data
}

// StopAfter stops reading loop after some time
func (oq *OvewriteQueue) StopAfter(pause time.Duration) {
go func(p time.Duration) {
time.Sleep(p)
close(oq.quit)
}(pause)
}

// waitForQueueToClear used in tests
func (oq *OvewriteQueue) waitForQueueToClear(timeout time.Duration) {
// wait for work to be queued
time.Sleep(5 * time.Millisecond)
start := time.Now()
for {
if len(oq.queue) == 0 {
return
}
if time.Since(start) > timeout {
return
}
time.Sleep(20 * time.Millisecond)
}
}

func (oq *OvewriteQueue) workerLoop() {
var err error
var took time.Duration
for {
select {
case data := <-oq.queue:
timeout := oq.initialTimeout
for try := 0; try < oq.maxRetries; try++ {
// we only care about last data
data = oq.getLastMessage(data)
glog.V(common.VERBOSE).Infof("Start saving %s name=%s bytes=%d try=%d", oq.desc, oq.name, len(data), try)
now := time.Now()
_, err = oq.session.SaveData(oq.name, data, nil, timeout)
took := time.Since(now)
if err == nil {
glog.V(common.VERBOSE).Infof("Saving %s name=%s bytes=%d took=%s try=%d", oq.desc, oq.name,
len(data), took, try)
break
}
timeout = time.Duration(float64(timeout) * timeoutMultiplier)
if timeout > oq.maxTimeout {
timeout = oq.maxTimeout
}
}
if err != nil {
glog.Errorf("Error saving %s name=%s bytes=%d took=%s try=%d err=%v", oq.desc, oq.name,
len(data), took, oq.maxRetries, err)
}

case <-oq.quit:
return
}
}
}

func (oq *OvewriteQueue) getLastMessage(current []byte) []byte {
res := current
for {
select {
case data := <-oq.queue:
res = data
default:
return res
}
}
}
70 changes: 70 additions & 0 deletions drivers/overwrite_queue_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package drivers

import (
"errors"
"testing"
"time"
)

func TestOverwriteQueueShouldCallSave(t *testing.T) {
timeout := 500 * time.Millisecond
mos := &MockOSSession{}
oq := NewOverwriteQueue(mos, "f1", "save", 2, timeout, 15*time.Second)

data1 := []byte("data01")

var meta map[string]string
mos.On("SaveData", "f1", data1, meta, timeout).Return("not used", nil).Once()

oq.Save(data1)
oq.waitForQueueToClear(5 * time.Second)
mos.AssertExpectations(t)
oq.StopAfter(0)
time.Sleep(10 * time.Millisecond)
}

func TestOverwriteQueueShouldRetry(t *testing.T) {
timeout := 500 * time.Millisecond
mos := &MockOSSession{}
oq := NewOverwriteQueue(mos, "f1", "retry", 2, timeout, 15*time.Second)

data1 := []byte("data01")

var meta map[string]string
mos.On("SaveData", "f1", data1, meta, timeout).Return("not used", errors.New("no1")).Once()
timeout = time.Duration(float64(timeout) * timeoutMultiplier)
mos.On("SaveData", "f1", data1, meta, timeout).Return("not used", nil).Once()

oq.Save(data1)
oq.waitForQueueToClear(5 * time.Second)
mos.AssertExpectations(t)
oq.StopAfter(0)
time.Sleep(10 * time.Millisecond)
}

func TestOverwriteQueueShouldUseLastValue(t *testing.T) {
timeout := 300 * time.Millisecond
mos := &MockOSSession{}
mos.init()
oq := NewOverwriteQueue(mos, "f1", "use last", 2, timeout, 15*time.Second)

dataw1 := []byte("dataw01")
data2 := []byte("data02")
data3 := []byte("data03")

var meta map[string]string
mos.On("SaveData", "f1", dataw1, meta, timeout).Return("not used", nil).Once()
mos.On("SaveData", "f1", data3, meta, timeout).Return("not used", nil).Once()

mos.waitForCh = true
oq.Save(dataw1)
<-mos.back
oq.Save(data2)
oq.Save(data3)
mos.waitCh <- nil

oq.waitForQueueToClear(5 * time.Second)
mos.AssertExpectations(t)
oq.StopAfter(0)
time.Sleep(10 * time.Millisecond)
}
17 changes: 16 additions & 1 deletion drivers/session_mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package drivers

import (
"context"
"fmt"
"time"

"github.com/livepeer/go-livepeer/net"
Expand All @@ -10,10 +11,24 @@ import (

type MockOSSession struct {
mock.Mock
waitForCh bool
waitCh chan interface{}
back chan interface{}
}

func (s *MockOSSession) init() {
s.waitCh = make(chan interface{})
s.back = make(chan interface{})
}

func (s *MockOSSession) SaveData(name string, data []byte, meta map[string]string, timeout time.Duration) (string, error) {
args := s.Called()
fmt.Printf("mock SaveData %s %s %s\n", name, data, timeout)
args := s.Called(name, data, meta, timeout)
if s.waitForCh {
s.back <- nil
<-s.waitCh
s.waitForCh = false
}
return args.String(0), args.Error(1)
}

Expand Down
2 changes: 2 additions & 0 deletions server/broadcast_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1380,6 +1380,7 @@ func TestVerifier_Verify(t *testing.T) {
assert := assert.New(t)

os := drivers.NewMemoryDriver(nil).NewSession("")
core.JsonPlaylistQuitTimeout = 0 * time.Second
c := core.NewBasicPlaylistManager(core.ManifestID("streamName"), os, nil)
cxn := &rtmpConnection{
pl: c,
Expand Down Expand Up @@ -1456,6 +1457,7 @@ func TestVerifier_Verify(t *testing.T) {
err = verify(verifier, cxn, sess, source, res, URIs, renditionData)
assert.Nil(err)
assert.Equal([]byte("attempt1"), mem.GetData(name))
c.Cleanup()
}

func TestVerifier_HLSInsertion(t *testing.T) {
Expand Down
3 changes: 3 additions & 0 deletions server/mediaserver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,9 @@ func serverCleanup(s *LivepeerServer) {
if cxn != nil && cxn.stream != nil {
cxn.stream.Close()
}
if cxn != nil {
cxn.pl.Cleanup()
}
}
s.connectionLock.Unlock()
}
Expand Down
Loading

0 comments on commit 61d77c2

Please sign in to comment.