Skip to content

Commit

Permalink
server: Re-use session logic for AI
Browse files Browse the repository at this point in the history
  • Loading branch information
yondonfu committed Mar 25, 2024
1 parent f698708 commit bd12e99
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 0 deletions.
30 changes: 30 additions & 0 deletions server/ai_session.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package server
import (
"context"
"math"
"math/rand"
"strconv"
"sync"
"time"
Expand All @@ -11,6 +12,7 @@ import (
"github.com/livepeer/go-livepeer/common"
"github.com/livepeer/go-livepeer/core"
"github.com/livepeer/go-tools/drivers"
"github.com/livepeer/lpms/stream"
)

type AISession struct {
Expand All @@ -25,6 +27,7 @@ type AISession struct {
type AISessionPool struct {
selector BroadcastSessionsSelector
sessMap map[string]*BroadcastSession
inUseSess []*BroadcastSession
suspender *suspender
mu sync.RWMutex
}
Expand All @@ -44,6 +47,13 @@ func (pool *AISessionPool) Select(ctx context.Context) *BroadcastSession {

for {
sess := pool.selector.Select(ctx)
if sess == nil {
sess = pool.selectInUse()
} else {
// Track in-use session the first time it is returned by the selector
pool.inUseSess = append(pool.inUseSess, sess)
}

if sess == nil {
return nil
}
Expand All @@ -53,6 +63,9 @@ func (pool *AISessionPool) Select(ctx context.Context) *BroadcastSession {
continue
}

// Track a dummy segment for the session in indicate an in-flight request
sess.pushSegInFlight(&stream.HLSSegment{})

return sess
}
}
Expand All @@ -73,6 +86,14 @@ func (pool *AISessionPool) Complete(sess *BroadcastSession) {
return
}

// If there are still in-flight requests for the session return early
// and do not return the session to the selector
inFlight, _ := sess.popSegInFlight()
if inFlight > 0 {
return
}

pool.inUseSess = removeSessionFromList(pool.inUseSess, sess)
pool.selector.Complete(sess)
}

Expand Down Expand Up @@ -103,6 +124,7 @@ func (pool *AISessionPool) Remove(sess *BroadcastSession) {
defer pool.mu.Unlock()

delete(pool.sessMap, sess.Transcoder())
pool.inUseSess = removeSessionFromList(pool.inUseSess, sess)

// Magic number for now
penalty := 3
Expand All @@ -118,6 +140,14 @@ func (pool *AISessionPool) Size() int {
return len(pool.sessMap)
}

func (pool *AISessionPool) selectInUse() *BroadcastSession {
if len(pool.inUseSess) == 0 {
return nil
}
// Select a random in-use session
return pool.inUseSess[rand.Intn(len(pool.inUseSess))]
}

type AISessionSelector struct {
// Pool of sessions with orchs that have the requested model warm
warmPool *AISessionPool
Expand Down
15 changes: 15 additions & 0 deletions server/broadcast.go
Original file line number Diff line number Diff line change
Expand Up @@ -507,6 +507,21 @@ func (bs *BroadcastSession) pushSegInFlight(seg *stream.HLSSegment) {
bs.lock.Unlock()
}

// Pop a SegFlightMetadata from a session's SegsInFlight
// Returns the end length of a session's SegsInFlight and the popped SegFlightMetadata
func (bs *BroadcastSession) popSegInFlight() (int, SegFlightMetadata) {
bs.lock.Lock()
defer bs.lock.Unlock()

if len(bs.SegsInFlight) == 0 {
return 0, SegFlightMetadata{}
}

sm := bs.SegsInFlight[0]
bs.SegsInFlight = bs.SegsInFlight[1:]
return len(bs.SegsInFlight), sm
}

// selects number of sessions to use according to current algorithm
func (bsm *BroadcastSessionsManager) selectSessions(ctx context.Context) (bs []*BroadcastSession, calcPerceptualHash bool, verified bool) {
bsm.sessLock.Lock()
Expand Down

0 comments on commit bd12e99

Please sign in to comment.