Skip to content

Commit

Permalink
capabilities: infer capabilities from segment video codec (#2191)
Browse files Browse the repository at this point in the history
  • Loading branch information
cyberj0g authored Feb 8, 2022
1 parent 72408a5 commit 198d694
Show file tree
Hide file tree
Showing 11 changed files with 118 additions and 23 deletions.
1 change: 1 addition & 0 deletions build/chain_dev.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
//go:build !mainnet && !rinkeby
// +build !mainnet,!rinkeby

package build
Expand Down
1 change: 1 addition & 0 deletions build/chain_mainnet.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
//go:build mainnet
// +build mainnet

package build
Expand Down
1 change: 1 addition & 0 deletions build/chain_rinkeby.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
//go:build rinkeby
// +build rinkeby

package build
Expand Down
48 changes: 46 additions & 2 deletions core/capabilities.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ type Capabilities struct {
}
type CapabilityTest struct {
inVideoData []byte
outProfile ffmpeg.VideoProfile
outProfile ffmpeg.VideoProfile
}

// Do not rearrange these values! Only append.
Expand All @@ -44,6 +44,8 @@ const (
Capability_HEVC_Encode
Capability_VP8_Decode
Capability_VP9_Decode
Capability_VP8_Encode
Capability_VP9_Encode
)


Expand Down Expand Up @@ -74,6 +76,7 @@ var CapabilityTestLookup = map[Capability]CapabilityTest{
var capFormatConv = errors.New("capability: unknown format")
var capStorageConv = errors.New("capability: unknown storage")
var capProfileConv = errors.New("capability: unknown profile")
var capCodecConv = errors.New("capability: unknown codec")

func DefaultCapabilities() []Capability {
// Add to this list as new features are added.
Expand Down Expand Up @@ -152,7 +155,6 @@ func JobCapabilities(params *StreamParameters) (*Capabilities, error) {
caps := make(map[Capability]bool)

// Define any default capabilities (especially ones that may be mandatory)
caps[Capability_H264] = true
caps[Capability_AuthToken] = true
if params.VerificationFreq > 0 {
caps[Capability_MPEG7VideoSignature] = true
Expand All @@ -167,6 +169,13 @@ func JobCapabilities(params *StreamParameters) (*Capabilities, error) {
}
caps[c] = true

// set encoder
encodeCap, err := outputCodecToCapability(v.Encoder)
if err != nil {
return nil, err
}
caps[encodeCap] = true

// fractional framerates
if v.FramerateDen > 0 {
caps[Capability_FractionalFramerates] = true
Expand Down Expand Up @@ -202,6 +211,13 @@ func JobCapabilities(params *StreamParameters) (*Capabilities, error) {
}
}

// capabilities based on detected input codec
decodeCap, err := inputCodecToCapability(params.Codec)
if err != nil {
return nil, err
}
caps[decodeCap] = true

// generate bitstring
capList := []Capability{}
for k := range caps {
Expand Down Expand Up @@ -263,6 +279,34 @@ func NewCapabilities(caps []Capability, m []Capability) *Capabilities {
return c
}

func inputCodecToCapability(codec ffmpeg.VideoCodec) (Capability, error) {
switch codec {
case ffmpeg.H264:
return Capability_H264, nil
case ffmpeg.H265:
return Capability_HEVC_Decode, nil
case ffmpeg.VP8:
return Capability_VP8_Decode, nil
case ffmpeg.VP9:
return Capability_VP9_Decode, nil
}
return Capability_Invalid, capCodecConv
}

func outputCodecToCapability(codec ffmpeg.VideoCodec) (Capability, error) {
switch codec {
case ffmpeg.H264:
return Capability_H264, nil
case ffmpeg.H265:
return Capability_HEVC_Encode, nil
case ffmpeg.VP8:
return Capability_VP8_Encode, nil
case ffmpeg.VP9:
return Capability_VP9_Encode, nil
}
return Capability_Invalid, capCodecConv
}

func formatToCapability(format ffmpeg.Format) (Capability, error) {
switch format {
case ffmpeg.FormatNone:
Expand Down
1 change: 1 addition & 0 deletions core/capabilities_experimental_off.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
//go:build !experimental
// +build !experimental

package core
Expand Down
1 change: 1 addition & 0 deletions core/capabilities_experimental_on.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
//go:build experimental
// +build experimental

package core
Expand Down
1 change: 1 addition & 0 deletions core/streamdata.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ type StreamParameters struct {
Detection DetectionConfig
VerificationFreq uint
Nonce uint64
Codec ffmpeg.VideoCodec
}

func (s *StreamParameters) StreamID() string {
Expand Down
7 changes: 1 addition & 6 deletions server/broadcast.go
Original file line number Diff line number Diff line change
Expand Up @@ -706,12 +706,7 @@ func processSegment(ctx context.Context, cxn *rtmpConnection, seg *stream.HLSSeg
ros := cpl.GetRecordOSSession()
segDurMs := getSegDurMsString(seg)

now := time.Now()
hasZeroVideoFrame, err := ffmpeg.HasZeroVideoFrameBytes(seg.Data)
if err != nil {
clog.Warningf(ctx, "Error checking for zero video frame name=%s bytes=%d took=%s err=%q",
seg.Name, len(seg.Data), time.Since(now), err)
}
hasZeroVideoFrame := seg.IsZeroFrame
if ros != nil && !hasZeroVideoFrame {
go func() {
now := time.Now()
Expand Down
41 changes: 34 additions & 7 deletions server/mediaserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -467,7 +467,7 @@ func streamParams(d stream.AppData) *core.StreamParameters {
func gotRTMPStreamHandler(s *LivepeerServer) func(url *url.URL, rtmpStrm stream.RTMPVideoStream) (err error) {
return func(url *url.URL, rtmpStrm stream.RTMPVideoStream) (err error) {

cxn, err := s.registerConnection(context.Background(), rtmpStrm)
cxn, err := s.registerConnection(context.Background(), rtmpStrm, nil)
if err != nil {
return err
}
Expand Down Expand Up @@ -534,7 +534,7 @@ func endRTMPStreamHandler(s *LivepeerServer) func(url *url.URL, rtmpStrm stream.
}
}

func (s *LivepeerServer) registerConnection(ctx context.Context, rtmpStrm stream.RTMPVideoStream) (*rtmpConnection, error) {
func (s *LivepeerServer) registerConnection(ctx context.Context, rtmpStrm stream.RTMPVideoStream, actualStreamCodec *ffmpeg.VideoCodec) (*rtmpConnection, error) {
ctx = clog.Clone(context.Background(), ctx)
// Set up the connection tracking
params := streamParams(rtmpStrm.AppData())
Expand All @@ -556,6 +556,10 @@ func (s *LivepeerServer) registerConnection(ctx context.Context, rtmpStrm stream
storage := params.OS

// Generate and set capabilities
if actualStreamCodec != nil {
params.Codec = *actualStreamCodec
}

caps, err := core.JobCapabilities(params)
if err != nil {
return nil, err
Expand Down Expand Up @@ -836,6 +840,28 @@ func (s *LivepeerServer) HandlePush(w http.ResponseWriter, r *http.Request) {
ctx = clog.AddNonce(ctx, cxn.nonce)
}

isZeroFrame, _, vcodecStr, err := ffmpeg.GetCodecInfoBytes(body)
if err != nil {
httpErr := fmt.Sprintf("Error getting codec info url=%s", r.URL)
clog.Errorf(ctx, httpErr)
http.Error(w, httpErr, http.StatusUnprocessableEntity)
return
}

var vcodec *ffmpeg.VideoCodec
if len(vcodecStr) == 0 {
clog.Warningf(ctx, "Couldn't detect input video stream codec")
} else {
vcodecVal, ok := ffmpeg.FfmpegNameToVideoCodec[vcodecStr]
vcodec = &vcodecVal
if !ok {
httpErr := fmt.Sprintf("Unknown input stream codec=%s", vcodecStr)
clog.Errorf(ctx, httpErr)
http.Error(w, httpErr, http.StatusUnprocessableEntity)
return
}
}

// Check for presence and register if a fresh cxn
if !exists {
appData := (createRTMPStreamIDHandler(ctx, s))(r.URL)
Expand Down Expand Up @@ -877,7 +903,7 @@ func (s *LivepeerServer) HandlePush(w http.ResponseWriter, r *http.Request) {
}
}

cxn, err = s.registerConnection(ctx, st)
cxn, err = s.registerConnection(ctx, st, vcodec)
if err != nil {
st.Close()
if err != errAlreadyExists {
Expand Down Expand Up @@ -940,10 +966,11 @@ func (s *LivepeerServer) HandlePush(w http.ResponseWriter, r *http.Request) {
}

seg := &stream.HLSSegment{
Data: body,
Name: fname,
SeqNo: seq,
Duration: float64(duration) / 1000.0,
Data: body,
Name: fname,
SeqNo: seq,
Duration: float64(duration) / 1000.0,
IsZeroFrame: isZeroFrame,
}

// Kick watchdog periodically so session doesn't time out during long transcodes
Expand Down
33 changes: 26 additions & 7 deletions server/mediaserver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1006,13 +1006,13 @@ func TestRegisterConnection(t *testing.T) {

// Should return an error if missing node storage
drivers.NodeStorage = nil
_, err := s.registerConnection(context.TODO(), strm)
_, err := s.registerConnection(context.TODO(), strm, nil)
assert.Equal(err, errStorage)
drivers.NodeStorage = drivers.NewMemoryDriver(nil)

// normal success case
rand.Seed(123)
cxn, err := s.registerConnection(context.TODO(), strm)
cxn, err := s.registerConnection(context.TODO(), strm, nil)
assert.NotNil(cxn)
assert.Nil(err)

Expand All @@ -1028,13 +1028,13 @@ func TestRegisterConnection(t *testing.T) {
assert.NotNil(cxn.params.Capabilities)

// Should return an error if creating another cxn with the same mid
_, err = s.registerConnection(context.TODO(), strm)
_, err = s.registerConnection(context.TODO(), strm, nil)
assert.Equal(err, errAlreadyExists)

// Check for params with an existing OS assigned
storage := drivers.NewS3Driver("", "", "", "", false).NewSession("")
strm = stream.NewBasicRTMPVideoStream(&core.StreamParameters{ManifestID: core.RandomManifestID(), OS: storage})
cxn, err = s.registerConnection(context.TODO(), strm)
cxn, err = s.registerConnection(context.TODO(), strm, nil)
assert.Nil(err)
assert.Equal(storage, cxn.params.OS)
assert.Equal(net.OSInfo_S3, cxn.params.OS.GetInfo().StorageType)
Expand All @@ -1043,15 +1043,34 @@ func TestRegisterConnection(t *testing.T) {
// check for capabilities
profiles := []ffmpeg.VideoProfile{ffmpeg.P144p30fps16x9}
strm = stream.NewBasicRTMPVideoStream(&core.StreamParameters{ManifestID: core.RandomManifestID(), Profiles: profiles})
cxn, err = s.registerConnection(context.TODO(), strm)
cxn, err = s.registerConnection(context.TODO(), strm, nil)
assert.Nil(err)
job, err := core.JobCapabilities(streamParams(strm.AppData()))
assert.Nil(err)
assert.Equal(job, cxn.params.Capabilities)

// check for capabilities with codec specified
inCodec := ffmpeg.H264
strm = stream.NewBasicRTMPVideoStream(&core.StreamParameters{ManifestID: core.RandomManifestID(), Profiles: profiles})
cxn, err = s.registerConnection(context.TODO(), strm, &inCodec)
assert.Nil(err)
assert.True(core.NewCapabilities([]core.Capability{core.Capability_H264}, []core.Capability{}).CompatibleWith(cxn.params.Capabilities.ToNetCapabilities()))
assert.False(core.NewCapabilities([]core.Capability{core.Capability_HEVC_Decode}, []core.Capability{}).CompatibleWith(cxn.params.Capabilities.ToNetCapabilities()))

inCodec = ffmpeg.H265
profiles[0].Encoder = ffmpeg.H265
strm = stream.NewBasicRTMPVideoStream(&core.StreamParameters{ManifestID: core.RandomManifestID(), Profiles: profiles})
cxn, err = s.registerConnection(context.TODO(), strm, &inCodec)
assert.Nil(err)
assert.True(core.NewCapabilities([]core.Capability{
core.Capability_HEVC_Decode,
core.Capability_HEVC_Encode,
}, []core.Capability{}).CompatibleWith(cxn.params.Capabilities.ToNetCapabilities()))

// check for capabilities: exit with an invalid cap
profiles[0].Format = -1
strm = stream.NewBasicRTMPVideoStream(&core.StreamParameters{ManifestID: core.RandomManifestID(), Profiles: profiles})
cxn, err = s.registerConnection(context.TODO(), strm)
cxn, err = s.registerConnection(context.TODO(), strm, nil)
assert.Nil(cxn)
assert.Equal("capability: unknown format", err.Error())
// TODO test with non-legacy capabilities once we have some
Expand All @@ -1066,7 +1085,7 @@ func TestRegisterConnection(t *testing.T) {
name := fmt.Sprintf("%v_%v", t.Name(), i)
mid := core.SplitStreamIDString(name).ManifestID
strm := stream.NewBasicRTMPVideoStream(&core.StreamParameters{ManifestID: mid})
cxn, err := s.registerConnection(context.TODO(), strm)
cxn, err := s.registerConnection(context.TODO(), strm, nil)

assert.Nil(err)
assert.NotNil(cxn)
Expand Down
6 changes: 5 additions & 1 deletion server/push_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,9 @@ func TestPush_ShouldReturn422ForNonRetryable(t *testing.T) {
s, cancel := setupServerWithCancel()
defer serverCleanup(s)
defer cancel()
reader := strings.NewReader("InsteadOf.TS")

d, _ := ioutil.ReadFile("./test.flv")
reader := bytes.NewReader(d)
w := httptest.NewRecorder()
req := httptest.NewRequest("POST", "/live/mani/18.ts", reader)

Expand Down Expand Up @@ -117,6 +119,7 @@ func TestPush_ShouldReturn422ForNonRetryable(t *testing.T) {
buf, err = proto.Marshal(tr)
require.Nil(t, err)
w = httptest.NewRecorder()
reader = bytes.NewReader(d)
req = httptest.NewRequest("POST", "/live/mani/18.ts", reader)
req.Header.Set("Accept", "multipart/mixed")
s.HandlePush(w, req)
Expand All @@ -135,6 +138,7 @@ func TestPush_ShouldReturn422ForNonRetryable(t *testing.T) {
buf, err = proto.Marshal(tr)
require.Nil(t, err)
w = httptest.NewRecorder()
reader = bytes.NewReader(d)
req = httptest.NewRequest("POST", "/live/mani/18.ts", reader)
req.Header.Set("Accept", "multipart/mixed")
s.HandlePush(w, req)
Expand Down

0 comments on commit 198d694

Please sign in to comment.