Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Infer capabilities from input codec #2191

Merged
merged 1 commit into from
Feb 8, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions build/chain_dev.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
//go:build !mainnet && !rinkeby
// +build !mainnet,!rinkeby

package build
Expand Down
1 change: 1 addition & 0 deletions build/chain_mainnet.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
//go:build mainnet
// +build mainnet

package build
Expand Down
1 change: 1 addition & 0 deletions build/chain_rinkeby.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
//go:build rinkeby
// +build rinkeby

package build
Expand Down
48 changes: 46 additions & 2 deletions core/capabilities.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ type Capabilities struct {
}
type CapabilityTest struct {
inVideoData []byte
outProfile ffmpeg.VideoProfile
outProfile ffmpeg.VideoProfile
}

// Do not rearrange these values! Only append.
Expand All @@ -44,6 +44,8 @@ const (
Capability_HEVC_Encode
Capability_VP8_Decode
Capability_VP9_Decode
Capability_VP8_Encode
Capability_VP9_Encode
)


Expand Down Expand Up @@ -74,6 +76,7 @@ var CapabilityTestLookup = map[Capability]CapabilityTest{
var capFormatConv = errors.New("capability: unknown format")
var capStorageConv = errors.New("capability: unknown storage")
var capProfileConv = errors.New("capability: unknown profile")
var capCodecConv = errors.New("capability: unknown codec")

func DefaultCapabilities() []Capability {
// Add to this list as new features are added.
Expand Down Expand Up @@ -152,7 +155,6 @@ func JobCapabilities(params *StreamParameters) (*Capabilities, error) {
caps := make(map[Capability]bool)

// Define any default capabilities (especially ones that may be mandatory)
caps[Capability_H264] = true
caps[Capability_AuthToken] = true
if params.VerificationFreq > 0 {
caps[Capability_MPEG7VideoSignature] = true
Expand All @@ -167,6 +169,13 @@ func JobCapabilities(params *StreamParameters) (*Capabilities, error) {
}
caps[c] = true

// set encoder
encodeCap, err := outputCodecToCapability(v.Encoder)
if err != nil {
return nil, err
}
caps[encodeCap] = true

// fractional framerates
if v.FramerateDen > 0 {
caps[Capability_FractionalFramerates] = true
Expand Down Expand Up @@ -202,6 +211,13 @@ func JobCapabilities(params *StreamParameters) (*Capabilities, error) {
}
}

// capabilities based on detected input codec
decodeCap, err := inputCodecToCapability(params.Codec)
if err != nil {
return nil, err
}
caps[decodeCap] = true

// generate bitstring
capList := []Capability{}
for k := range caps {
Expand Down Expand Up @@ -263,6 +279,34 @@ func NewCapabilities(caps []Capability, m []Capability) *Capabilities {
return c
}

func inputCodecToCapability(codec ffmpeg.VideoCodec) (Capability, error) {
switch codec {
case ffmpeg.H264:
return Capability_H264, nil
case ffmpeg.H265:
return Capability_HEVC_Decode, nil
case ffmpeg.VP8:
return Capability_VP8_Decode, nil
case ffmpeg.VP9:
return Capability_VP9_Decode, nil
}
return Capability_Invalid, capCodecConv
}

func outputCodecToCapability(codec ffmpeg.VideoCodec) (Capability, error) {
switch codec {
case ffmpeg.H264:
return Capability_H264, nil
case ffmpeg.H265:
return Capability_HEVC_Encode, nil
case ffmpeg.VP8:
return Capability_VP8_Encode, nil
case ffmpeg.VP9:
return Capability_VP9_Encode, nil
}
return Capability_Invalid, capCodecConv
}

func formatToCapability(format ffmpeg.Format) (Capability, error) {
switch format {
case ffmpeg.FormatNone:
Expand Down
1 change: 1 addition & 0 deletions core/capabilities_experimental_off.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
//go:build !experimental
// +build !experimental

package core
Expand Down
1 change: 1 addition & 0 deletions core/capabilities_experimental_on.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
//go:build experimental
// +build experimental

package core
Expand Down
1 change: 1 addition & 0 deletions core/streamdata.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ type StreamParameters struct {
Detection DetectionConfig
VerificationFreq uint
Nonce uint64
Codec ffmpeg.VideoCodec
}

func (s *StreamParameters) StreamID() string {
Expand Down
7 changes: 1 addition & 6 deletions server/broadcast.go
Original file line number Diff line number Diff line change
Expand Up @@ -706,12 +706,7 @@ func processSegment(ctx context.Context, cxn *rtmpConnection, seg *stream.HLSSeg
ros := cpl.GetRecordOSSession()
segDurMs := getSegDurMsString(seg)

now := time.Now()
hasZeroVideoFrame, err := ffmpeg.HasZeroVideoFrameBytes(seg.Data)
if err != nil {
clog.Warningf(ctx, "Error checking for zero video frame name=%s bytes=%d took=%s err=%q",
seg.Name, len(seg.Data), time.Since(now), err)
}
hasZeroVideoFrame := seg.IsZeroFrame
if ros != nil && !hasZeroVideoFrame {
go func() {
now := time.Now()
Expand Down
41 changes: 34 additions & 7 deletions server/mediaserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -467,7 +467,7 @@ func streamParams(d stream.AppData) *core.StreamParameters {
func gotRTMPStreamHandler(s *LivepeerServer) func(url *url.URL, rtmpStrm stream.RTMPVideoStream) (err error) {
return func(url *url.URL, rtmpStrm stream.RTMPVideoStream) (err error) {

cxn, err := s.registerConnection(context.Background(), rtmpStrm)
cxn, err := s.registerConnection(context.Background(), rtmpStrm, nil)
if err != nil {
return err
}
Expand Down Expand Up @@ -534,7 +534,7 @@ func endRTMPStreamHandler(s *LivepeerServer) func(url *url.URL, rtmpStrm stream.
}
}

func (s *LivepeerServer) registerConnection(ctx context.Context, rtmpStrm stream.RTMPVideoStream) (*rtmpConnection, error) {
func (s *LivepeerServer) registerConnection(ctx context.Context, rtmpStrm stream.RTMPVideoStream, actualStreamCodec *ffmpeg.VideoCodec) (*rtmpConnection, error) {
ctx = clog.Clone(context.Background(), ctx)
// Set up the connection tracking
params := streamParams(rtmpStrm.AppData())
Expand All @@ -556,6 +556,10 @@ func (s *LivepeerServer) registerConnection(ctx context.Context, rtmpStrm stream
storage := params.OS

// Generate and set capabilities
if actualStreamCodec != nil {
params.Codec = *actualStreamCodec
}

caps, err := core.JobCapabilities(params)
if err != nil {
return nil, err
Expand Down Expand Up @@ -836,6 +840,28 @@ func (s *LivepeerServer) HandlePush(w http.ResponseWriter, r *http.Request) {
ctx = clog.AddNonce(ctx, cxn.nonce)
}

isZeroFrame, _, vcodecStr, err := ffmpeg.GetCodecInfoBytes(body)
if err != nil {
httpErr := fmt.Sprintf("Error getting codec info url=%s", r.URL)
clog.Errorf(ctx, httpErr)
http.Error(w, httpErr, http.StatusUnprocessableEntity)
return
}

var vcodec *ffmpeg.VideoCodec
if len(vcodecStr) == 0 {
clog.Warningf(ctx, "Couldn't detect input video stream codec")
} else {
vcodecVal, ok := ffmpeg.FfmpegNameToVideoCodec[vcodecStr]
vcodec = &vcodecVal
if !ok {
httpErr := fmt.Sprintf("Unknown input stream codec=%s", vcodecStr)
clog.Errorf(ctx, httpErr)
http.Error(w, httpErr, http.StatusUnprocessableEntity)
return
}
}

// Check for presence and register if a fresh cxn
if !exists {
appData := (createRTMPStreamIDHandler(ctx, s))(r.URL)
Expand Down Expand Up @@ -877,7 +903,7 @@ func (s *LivepeerServer) HandlePush(w http.ResponseWriter, r *http.Request) {
}
}

cxn, err = s.registerConnection(ctx, st)
cxn, err = s.registerConnection(ctx, st, vcodec)
if err != nil {
st.Close()
if err != errAlreadyExists {
Expand Down Expand Up @@ -940,10 +966,11 @@ func (s *LivepeerServer) HandlePush(w http.ResponseWriter, r *http.Request) {
}

seg := &stream.HLSSegment{
Data: body,
Name: fname,
SeqNo: seq,
Duration: float64(duration) / 1000.0,
Data: body,
Name: fname,
SeqNo: seq,
Duration: float64(duration) / 1000.0,
IsZeroFrame: isZeroFrame,
}

// Kick watchdog periodically so session doesn't time out during long transcodes
Expand Down
33 changes: 26 additions & 7 deletions server/mediaserver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1006,13 +1006,13 @@ func TestRegisterConnection(t *testing.T) {

// Should return an error if missing node storage
drivers.NodeStorage = nil
_, err := s.registerConnection(context.TODO(), strm)
_, err := s.registerConnection(context.TODO(), strm, nil)
assert.Equal(err, errStorage)
drivers.NodeStorage = drivers.NewMemoryDriver(nil)

// normal success case
rand.Seed(123)
cxn, err := s.registerConnection(context.TODO(), strm)
cxn, err := s.registerConnection(context.TODO(), strm, nil)
assert.NotNil(cxn)
assert.Nil(err)

Expand All @@ -1028,13 +1028,13 @@ func TestRegisterConnection(t *testing.T) {
assert.NotNil(cxn.params.Capabilities)

// Should return an error if creating another cxn with the same mid
_, err = s.registerConnection(context.TODO(), strm)
_, err = s.registerConnection(context.TODO(), strm, nil)
assert.Equal(err, errAlreadyExists)

// Check for params with an existing OS assigned
storage := drivers.NewS3Driver("", "", "", "", false).NewSession("")
strm = stream.NewBasicRTMPVideoStream(&core.StreamParameters{ManifestID: core.RandomManifestID(), OS: storage})
cxn, err = s.registerConnection(context.TODO(), strm)
cxn, err = s.registerConnection(context.TODO(), strm, nil)
assert.Nil(err)
assert.Equal(storage, cxn.params.OS)
assert.Equal(net.OSInfo_S3, cxn.params.OS.GetInfo().StorageType)
Expand All @@ -1043,15 +1043,34 @@ func TestRegisterConnection(t *testing.T) {
// check for capabilities
profiles := []ffmpeg.VideoProfile{ffmpeg.P144p30fps16x9}
strm = stream.NewBasicRTMPVideoStream(&core.StreamParameters{ManifestID: core.RandomManifestID(), Profiles: profiles})
cxn, err = s.registerConnection(context.TODO(), strm)
cxn, err = s.registerConnection(context.TODO(), strm, nil)
assert.Nil(err)
job, err := core.JobCapabilities(streamParams(strm.AppData()))
assert.Nil(err)
assert.Equal(job, cxn.params.Capabilities)

// check for capabilities with codec specified
inCodec := ffmpeg.H264
strm = stream.NewBasicRTMPVideoStream(&core.StreamParameters{ManifestID: core.RandomManifestID(), Profiles: profiles})
cxn, err = s.registerConnection(context.TODO(), strm, &inCodec)
assert.Nil(err)
assert.True(core.NewCapabilities([]core.Capability{core.Capability_H264}, []core.Capability{}).CompatibleWith(cxn.params.Capabilities.ToNetCapabilities()))
assert.False(core.NewCapabilities([]core.Capability{core.Capability_HEVC_Decode}, []core.Capability{}).CompatibleWith(cxn.params.Capabilities.ToNetCapabilities()))

inCodec = ffmpeg.H265
profiles[0].Encoder = ffmpeg.H265
strm = stream.NewBasicRTMPVideoStream(&core.StreamParameters{ManifestID: core.RandomManifestID(), Profiles: profiles})
cxn, err = s.registerConnection(context.TODO(), strm, &inCodec)
assert.Nil(err)
assert.True(core.NewCapabilities([]core.Capability{
core.Capability_HEVC_Decode,
core.Capability_HEVC_Encode,
}, []core.Capability{}).CompatibleWith(cxn.params.Capabilities.ToNetCapabilities()))

// check for capabilities: exit with an invalid cap
profiles[0].Format = -1
strm = stream.NewBasicRTMPVideoStream(&core.StreamParameters{ManifestID: core.RandomManifestID(), Profiles: profiles})
cxn, err = s.registerConnection(context.TODO(), strm)
cxn, err = s.registerConnection(context.TODO(), strm, nil)
assert.Nil(cxn)
assert.Equal("capability: unknown format", err.Error())
// TODO test with non-legacy capabilities once we have some
Expand All @@ -1066,7 +1085,7 @@ func TestRegisterConnection(t *testing.T) {
name := fmt.Sprintf("%v_%v", t.Name(), i)
mid := core.SplitStreamIDString(name).ManifestID
strm := stream.NewBasicRTMPVideoStream(&core.StreamParameters{ManifestID: mid})
cxn, err := s.registerConnection(context.TODO(), strm)
cxn, err := s.registerConnection(context.TODO(), strm, nil)

assert.Nil(err)
assert.NotNil(cxn)
Expand Down
6 changes: 5 additions & 1 deletion server/push_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,9 @@ func TestPush_ShouldReturn422ForNonRetryable(t *testing.T) {
s, cancel := setupServerWithCancel()
defer serverCleanup(s)
defer cancel()
reader := strings.NewReader("InsteadOf.TS")

d, _ := ioutil.ReadFile("./test.flv")
reader := bytes.NewReader(d)
w := httptest.NewRecorder()
req := httptest.NewRequest("POST", "/live/mani/18.ts", reader)

Expand Down Expand Up @@ -117,6 +119,7 @@ func TestPush_ShouldReturn422ForNonRetryable(t *testing.T) {
buf, err = proto.Marshal(tr)
require.Nil(t, err)
w = httptest.NewRecorder()
reader = bytes.NewReader(d)
req = httptest.NewRequest("POST", "/live/mani/18.ts", reader)
req.Header.Set("Accept", "multipart/mixed")
s.HandlePush(w, req)
Expand All @@ -135,6 +138,7 @@ func TestPush_ShouldReturn422ForNonRetryable(t *testing.T) {
buf, err = proto.Marshal(tr)
require.Nil(t, err)
w = httptest.NewRecorder()
reader = bytes.NewReader(d)
req = httptest.NewRequest("POST", "/live/mani/18.ts", reader)
req.Header.Set("Accept", "multipart/mixed")
s.HandlePush(w, req)
Expand Down