From bd12e99a8e988caec4f80c90525d9fefb2d7e8c9 Mon Sep 17 00:00:00 2001 From: Yondon Fu Date: Wed, 20 Mar 2024 21:10:02 +0000 Subject: [PATCH] server: Re-use session logic for AI --- server/ai_session.go | 30 ++++++++++++++++++++++++++++++ server/broadcast.go | 15 +++++++++++++++ 2 files changed, 45 insertions(+) diff --git a/server/ai_session.go b/server/ai_session.go index 7a5930ce58..6274e397d5 100644 --- a/server/ai_session.go +++ b/server/ai_session.go @@ -3,6 +3,7 @@ package server import ( "context" "math" + "math/rand" "strconv" "sync" "time" @@ -11,6 +12,7 @@ import ( "github.com/livepeer/go-livepeer/common" "github.com/livepeer/go-livepeer/core" "github.com/livepeer/go-tools/drivers" + "github.com/livepeer/lpms/stream" ) type AISession struct { @@ -25,6 +27,7 @@ type AISession struct { type AISessionPool struct { selector BroadcastSessionsSelector sessMap map[string]*BroadcastSession + inUseSess []*BroadcastSession suspender *suspender mu sync.RWMutex } @@ -44,6 +47,13 @@ func (pool *AISessionPool) Select(ctx context.Context) *BroadcastSession { for { sess := pool.selector.Select(ctx) + if sess == nil { + sess = pool.selectInUse() + } else { + // Track in-use session the first time it is returned by the selector + pool.inUseSess = append(pool.inUseSess, sess) + } + if sess == nil { return nil } @@ -53,6 +63,9 @@ func (pool *AISessionPool) Select(ctx context.Context) *BroadcastSession { continue } + // Track a dummy segment for the session in indicate an in-flight request + sess.pushSegInFlight(&stream.HLSSegment{}) + return sess } } @@ -73,6 +86,14 @@ func (pool *AISessionPool) Complete(sess *BroadcastSession) { return } + // If there are still in-flight requests for the session return early + // and do not return the session to the selector + inFlight, _ := sess.popSegInFlight() + if inFlight > 0 { + return + } + + pool.inUseSess = removeSessionFromList(pool.inUseSess, sess) pool.selector.Complete(sess) } @@ -103,6 +124,7 @@ func (pool *AISessionPool) Remove(sess *BroadcastSession) { defer pool.mu.Unlock() delete(pool.sessMap, sess.Transcoder()) + pool.inUseSess = removeSessionFromList(pool.inUseSess, sess) // Magic number for now penalty := 3 @@ -118,6 +140,14 @@ func (pool *AISessionPool) Size() int { return len(pool.sessMap) } +func (pool *AISessionPool) selectInUse() *BroadcastSession { + if len(pool.inUseSess) == 0 { + return nil + } + // Select a random in-use session + return pool.inUseSess[rand.Intn(len(pool.inUseSess))] +} + type AISessionSelector struct { // Pool of sessions with orchs that have the requested model warm warmPool *AISessionPool diff --git a/server/broadcast.go b/server/broadcast.go index f96b02bfbc..bb0a8344fc 100755 --- a/server/broadcast.go +++ b/server/broadcast.go @@ -507,6 +507,21 @@ func (bs *BroadcastSession) pushSegInFlight(seg *stream.HLSSegment) { bs.lock.Unlock() } +// Pop a SegFlightMetadata from a session's SegsInFlight +// Returns the end length of a session's SegsInFlight and the popped SegFlightMetadata +func (bs *BroadcastSession) popSegInFlight() (int, SegFlightMetadata) { + bs.lock.Lock() + defer bs.lock.Unlock() + + if len(bs.SegsInFlight) == 0 { + return 0, SegFlightMetadata{} + } + + sm := bs.SegsInFlight[0] + bs.SegsInFlight = bs.SegsInFlight[1:] + return len(bs.SegsInFlight), sm +} + // selects number of sessions to use according to current algorithm func (bsm *BroadcastSessionsManager) selectSessions(ctx context.Context) (bs []*BroadcastSession, calcPerceptualHash bool, verified bool) { bsm.sessLock.Lock()