Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Transcoder capabilities discovery and validation #2150

Merged
merged 7 commits into from
Feb 7, 2022
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG_PENDING.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
#### Orchestrator

#### Transcoder
* H.265/HEVC encoding and decoding
* VP8/VP9 decoding

### Bug Fixes 🐞

Expand Down
14 changes: 8 additions & 6 deletions cmd/livepeer/livepeer.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,7 @@ func main() {
n.OrchSecret, _ = common.GetPass(*orchSecret)
}

transcoderCaps := core.DefaultCapabilities()
if *transcoder {
core.WorkDir = *datadir
if *nvidia != "" {
Expand All @@ -282,9 +283,9 @@ func main() {
glog.Infof("Transcoding on these Nvidia GPUs: %v", devices)
// Test transcoding with nvidia
if *testTranscoder {
err := core.TestNvidiaTranscoder(devices)
transcoderCaps, err = core.TestTranscoderCapabilities(devices)
if err != nil {
glog.Fatalf("Unable to transcode using Nvidia gpu=%q err=%q", strings.Join(devices, ","), err)
glog.Fatal(err)
}
}
// FIXME: Short-term hack to pre-load the detection models on every device
Expand All @@ -303,6 +304,8 @@ func main() {
// Initialize LB transcoder
n.Transcoder = core.NewLoadBalancingTranscoder(devices, core.NewNvidiaTranscoder, core.NewNvidiaTranscoderWithDetector)
} else {
// for local software mode, enable all capabilities
transcoderCaps = append(core.DefaultCapabilities(), core.OptionalCapabilities()...)
n.Transcoder = core.NewLocalTranscoder(*datadir)
}
}
Expand Down Expand Up @@ -901,12 +904,11 @@ func main() {
// take the port to listen to from the service URI
*httpAddr = defaultAddr(*httpAddr, "", n.GetServiceURI().Port())

caps := core.DefaultCapabilities()
if *sceneClassificationModelPath != "" {
// Only enable experimental capabilities if scene classification model is actually loaded
caps = append(caps, core.ExperimentalCapabilities()...)
transcoderCaps = append(transcoderCaps, core.ExperimentalCapabilities()...)
}
n.Capabilities = core.NewCapabilities(caps, core.MandatoryCapabilities())
n.Capabilities = core.NewCapabilities(transcoderCaps, core.MandatoryOCapabilities())
darkdarkdragon marked this conversation as resolved.
Show resolved Hide resolved

if !*transcoder && n.OrchSecret == "" {
glog.Fatal("Running an orchestrator requires an -orchSecret for standalone mode or -transcoder for orchestrator+transcoder mode")
Expand Down Expand Up @@ -999,7 +1001,7 @@ func main() {
glog.Fatal("Missing -orchAddr")
}

go server.RunTranscoder(n, orchURLs[0].Host, *maxSessions)
go server.RunTranscoder(n, orchURLs[0].Host, *maxSessions, transcoderCaps)
}

switch n.NodeType {
Expand Down
44 changes: 43 additions & 1 deletion core/capabilities.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@ type Capabilities struct {
mandatories CapabilityString
constraints Constraints
}
type CapabilityTest struct {
inVideoData []byte
outProfile ffmpeg.VideoProfile
}

// Do not rearrange these values! Only append.
const (
Expand All @@ -36,8 +40,37 @@ const (
Capability_AuthToken
Capability_SceneClassification
Capability_MPEG7VideoSignature
Capability_HEVC_Decode
Capability_HEVC_Encode
Capability_VP8_Decode
Capability_VP9_Decode
)


var CapabilityTestLookup = map[Capability]CapabilityTest{
// 145x145 is the lowest resolution supported by NVENC on Windows
Capability_H264: {
inVideoData: testSegment_H264,
outProfile: ffmpeg.VideoProfile{Resolution: "145x145", Bitrate: "1000k", Format: ffmpeg.FormatMPEGTS},
},
Capability_HEVC_Decode: {
inVideoData: testSegment_HEVC,
outProfile: ffmpeg.VideoProfile{Resolution: "145x145", Bitrate: "1000k", Format: ffmpeg.FormatMPEGTS},
},
Capability_HEVC_Encode: {
inVideoData: testSegment_H264,
outProfile: ffmpeg.VideoProfile{Resolution: "145x145", Bitrate: "1000k", Format: ffmpeg.FormatMPEGTS, Encoder: ffmpeg.H265},
},
Capability_VP8_Decode: {
inVideoData: testSegment_VP8,
outProfile: ffmpeg.VideoProfile{Resolution: "145x145", Bitrate: "1000k", Format: ffmpeg.FormatMPEGTS},
},
Capability_VP9_Decode: {
inVideoData: testSegment_VP9,
outProfile: ffmpeg.VideoProfile{Resolution: "145x145", Bitrate: "1000k", Format: ffmpeg.FormatMPEGTS},
},
}

var capFormatConv = errors.New("capability: unknown format")
var capStorageConv = errors.New("capability: unknown storage")
var capProfileConv = errors.New("capability: unknown profile")
Expand All @@ -62,12 +95,21 @@ func DefaultCapabilities() []Capability {
}
}

func OptionalCapabilities() []Capability {
return []Capability{
Capability_HEVC_Decode,
Capability_HEVC_Encode,
Capability_VP8_Decode,
Capability_VP9_Decode,
}
}

func ExperimentalCapabilities() []Capability {
// Add experimental capabilities if enabled during build
return experimentalCapabilities
}

func MandatoryCapabilities() []Capability {
func MandatoryOCapabilities() []Capability {
// Add to this list as certain features become mandatory.
// Use sparingly, as adding to this is a hard break with older nodes
return []Capability{
Expand Down
4 changes: 2 additions & 2 deletions core/core_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,10 +79,10 @@ func TestTranscode(t *testing.T) {
}

// Check transcode result
if Over1Pct(len(tr.TranscodeData.Segments[0].Data), 64484) { // 144p
if Over1Pct(len(tr.TranscodeData.Segments[0].Data), 218268) { // 144p
t.Error("Unexpected transcode result ", len(tr.TranscodeData.Segments[0].Data))
}
if Over1Pct(len(tr.TranscodeData.Segments[1].Data), 88172) { // 240p
if Over1Pct(len(tr.TranscodeData.Segments[1].Data), 302868) { // 240p
t.Error("Unexpected transcode result ", len(tr.TranscodeData.Segments[1].Data))
}

Expand Down
57 changes: 41 additions & 16 deletions core/orch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,8 @@ func TestServeTranscoder(t *testing.T) {
strm := &StubTranscoderServer{}

// test that a transcoder was created
go n.serveTranscoder(strm, 5)
capabilities := NewCapabilities(DefaultCapabilities(), []Capability{})
go n.serveTranscoder(strm, 5, capabilities.ToNetCapabilities())
time.Sleep(1 * time.Second)

tc, ok := n.TranscoderManager.liveTranscoders[strm]
Expand All @@ -102,7 +103,7 @@ func TestRemoteTranscoder(t *testing.T) {
m := NewRemoteTranscoderManager()
initTranscoder := func() (*RemoteTranscoder, *StubTranscoderServer) {
strm := &StubTranscoderServer{manager: m}
tc := NewRemoteTranscoder(m, strm, 5)
tc := NewRemoteTranscoder(m, strm, 5, nil)
return tc, strm
}

Expand Down Expand Up @@ -237,9 +238,11 @@ func TestManageTranscoders(t *testing.T) {
assert.Empty(m.remoteTranscoders)
assert.Equal(0, m.RegisteredTranscodersCount())

capabilities := NewCapabilities(DefaultCapabilities(), []Capability{})

// test that transcoder is added to liveTranscoders and remoteTranscoders
wg1 := newWg(1)
go func() { m.Manage(strm, 5); wg1.Done() }()
go func() { m.Manage(strm, 5, capabilities.ToNetCapabilities()); wg1.Done() }()
time.Sleep(1 * time.Millisecond) // allow the manager to activate

assert.NotNil(m.liveTranscoders[strm])
Expand All @@ -253,7 +256,7 @@ func TestManageTranscoders(t *testing.T) {

// test that additional transcoder is added to liveTranscoders and remoteTranscoders
wg2 := newWg(1)
go func() { m.Manage(strm2, 4); wg2.Done() }()
go func() { m.Manage(strm2, 4, capabilities.ToNetCapabilities()); wg2.Done() }()
time.Sleep(1 * time.Millisecond) // allow the manager to activate

assert.NotNil(m.liveTranscoders[strm])
Expand Down Expand Up @@ -285,16 +288,20 @@ func TestSelectTranscoder(t *testing.T) {
strm := &StubTranscoderServer{manager: m, WithholdResults: false}
strm2 := &StubTranscoderServer{manager: m}

capabilities := NewCapabilities(DefaultCapabilities(), []Capability{})
richCapabilities := NewCapabilities(append(DefaultCapabilities(), Capability_HEVC_Encode), []Capability{})
allCapabilities := NewCapabilities(append(DefaultCapabilities(), OptionalCapabilities()...), []Capability{})

// sanity check that transcoder is not in liveTranscoders or remoteTranscoders
assert := assert.New(t)
assert.Nil(m.liveTranscoders[strm])
assert.Empty(m.remoteTranscoders)

// register transcoders, which adds transcoder to liveTranscoders and remoteTranscoders
wg := newWg(1)
go func() { m.Manage(strm, 1) }()
go func() { m.Manage(strm, 1, capabilities.ToNetCapabilities()) }()
time.Sleep(1 * time.Millisecond) // allow time for first stream to register
go func() { m.Manage(strm2, 1); wg.Done() }()
go func() { m.Manage(strm2, 1, richCapabilities.ToNetCapabilities()); wg.Done() }()
time.Sleep(1 * time.Millisecond) // allow time for second stream to register

assert.NotNil(m.liveTranscoders[strm])
Expand All @@ -308,7 +315,7 @@ func TestSelectTranscoder(t *testing.T) {
// assert transcoder is returned from selectTranscoder
t1 := m.liveTranscoders[strm]
t2 := m.liveTranscoders[strm2]
currentTranscoder, err := m.selectTranscoder(testSessionId)
currentTranscoder, err := m.selectTranscoder(testSessionId, nil)
assert.Nil(err)
assert.Equal(t2, currentTranscoder)
assert.Equal(1, t2.load)
Expand All @@ -317,23 +324,37 @@ func TestSelectTranscoder(t *testing.T) {

// assert that same transcoder is selected for same sessionId
// and that load stays the same
currentTranscoder, err = m.selectTranscoder(testSessionId)
currentTranscoder, err = m.selectTranscoder(testSessionId, nil)
assert.Nil(err)
assert.Equal(t2, currentTranscoder)
assert.Equal(1, t2.load)
m.completeStreamSession(testSessionId)

// assert that transcoders are selected according to capabilities
currentTranscoder, err = m.selectTranscoder(testSessionId, capabilities)
assert.Nil(err)
m.completeStreamSession(testSessionId)
currentTranscoderRich, err := m.selectTranscoder(testSessionId, richCapabilities)
assert.Nil(err)
assert.NotEqual(currentTranscoder, currentTranscoderRich)
m.completeStreamSession(testSessionId)

// assert no transcoders available for unsupported capability
currentTranscoder, err = m.selectTranscoder(testSessionId, allCapabilities)
assert.NotNil(err)
m.completeStreamSession(testSessionId)

// assert that a new transcoder is selected for a new sessionId
currentTranscoder, err = m.selectTranscoder(testSessionId2)
currentTranscoder, err = m.selectTranscoder(testSessionId2, nil)
assert.Nil(err)
assert.Equal(t1, currentTranscoder)
assert.Equal(1, t1.load)

// Add some more load and assert no transcoder returned if all at capacity
currentTranscoder, err = m.selectTranscoder(testSessionId)
currentTranscoder, err = m.selectTranscoder(testSessionId, nil)
assert.Nil(err)
assert.Equal(t2, currentTranscoder)
noTrans, err := m.selectTranscoder(testSessionId3)
noTrans, err := m.selectTranscoder(testSessionId3, nil)
assert.Equal(err, ErrNoTranscodersAvailable)
assert.Nil(noTrans)

Expand All @@ -348,7 +369,7 @@ func TestSelectTranscoder(t *testing.T) {
assert.NotNil(m.liveTranscoders[strm])

// assert t1 is selected and t2 drained, but was previously selected
currentTranscoder, err = m.selectTranscoder(testSessionId)
currentTranscoder, err = m.selectTranscoder(testSessionId, nil)
assert.Nil(err)
assert.Equal(t1, currentTranscoder)
assert.Equal(1, t1.load)
Expand All @@ -371,13 +392,15 @@ func TestCompleteStreamSession(t *testing.T) {
testSessionId := "testID"
assert := assert.New(t)

capabilities := NewCapabilities(DefaultCapabilities(), []Capability{})

// register transcoders
go func() { m.Manage(strm, 1) }()
go func() { m.Manage(strm, 1, capabilities.ToNetCapabilities()) }()
time.Sleep(1 * time.Millisecond) // allow time for first stream to register
t1 := m.liveTranscoders[strm]

// selectTranscoder and assert that session is added
m.selectTranscoder(testSessionId)
m.selectTranscoder(testSessionId, nil)
assert.Equal(t1, m.streamSessions[testSessionId])
assert.Equal(1, t1.load)

Expand Down Expand Up @@ -436,6 +459,8 @@ func TestTranscoderManagerTranscoding(t *testing.T) {
s := &StubTranscoderServer{manager: m}
testSessionId := "testID"

capabilities := NewCapabilities(DefaultCapabilities(), []Capability{})

// sanity checks
assert := assert.New(t)
assert.Empty(m.liveTranscoders)
Expand All @@ -448,7 +473,7 @@ func TestTranscoderManagerTranscoding(t *testing.T) {
assert.Equal(err, ErrNoTranscodersAvailable)

wg := newWg(1)
go func() { m.Manage(s, 5); wg.Done() }()
go func() { m.Manage(s, 5, capabilities.ToNetCapabilities()); wg.Done() }()
time.Sleep(1 * time.Millisecond)

assert.Len(m.remoteTranscoders, 1) // sanity
Expand Down Expand Up @@ -489,7 +514,7 @@ func TestTranscoderManagerTranscoding(t *testing.T) {

// fatal error should not retry
wg.Add(1)
go func() { m.Manage(s, 5); wg.Done() }()
go func() { m.Manage(s, 5, capabilities.ToNetCapabilities()); wg.Done() }()
time.Sleep(1 * time.Millisecond)

assert.Len(m.remoteTranscoders, 1) // sanity check
Expand Down
Loading