From ad4ef089dd281ef864066e41ae5fbfcaa29892e4 Mon Sep 17 00:00:00 2001 From: aler9 <46489434+aler9@users.noreply.github.com> Date: Sat, 28 Dec 2024 23:59:42 +0100 Subject: [PATCH] rtmp: support additional Enhanced-RTMP features (#3685) new features: * Opus and AC-3 tracks * multiple tracks --- README.md | 9 +- .../extended_mpeg2ts_sequence_start.go | 26 - .../rtmp/message/extended_sequence_end.go | 26 - internal/protocols/rtmp/message/message.go | 61 +- .../{acknowledge.go => msg_acknowledge.go} | 0 .../rtmp/message/{audio.go => msg_audio.go} | 0 .../rtmp/message/msg_audio_ex_coded_frames.go | 61 ++ .../msg_audio_ex_multichannel_config.go | 76 ++ .../rtmp/message/msg_audio_ex_multitrack.go | 78 ++ .../message/msg_audio_ex_sequence_start.go | 57 ++ .../{command_amf0.go => msg_command_amf0.go} | 0 ..._amf0_test.go => msg_command_amf0_test.go} | 0 .../{data_amf0.go => msg_data_amf0.go} | 0 ...et_chunk_size.go => msg_set_chunk_size.go} | 0 ...bandwidth.go => msg_set_peer_bandwidth.go} | 0 ...ack_size.go => msg_set_window_ack_size.go} | 0 ...st.go => msg_user_control_ping_request.go} | 0 ...e.go => msg_user_control_ping_response.go} | 0 ... => msg_user_control_set_buffer_length.go} | 0 ...in.go => msg_user_control_stream_begin.go} | 0 ..._dry.go => msg_user_control_stream_dry.go} | 0 ..._eof.go => msg_user_control_stream_eof.go} | 0 ...=> msg_user_control_stream_is_recorded.go} | 0 .../rtmp/message/{video.go => msg_video.go} | 0 ...frames.go => msg_video_ex_coded_frames.go} | 28 +- ...d_frames_x.go => msg_video_ex_frames_x.go} | 19 +- ...d_metadata.go => msg_video_ex_metadata.go} | 18 +- .../rtmp/message/msg_video_ex_multitrack.go | 78 ++ ...tart.go => msg_video_ex_sequence_start.go} | 19 +- .../protocols/rtmp/message/opus_id_header.go | 42 + internal/protocols/rtmp/message/reader.go | 62 +- .../protocols/rtmp/message/reader_test.go | 83 +- internal/protocols/rtmp/reader.go | 796 ++++++++++++------ internal/protocols/rtmp/reader_test.go | 349 ++++---- internal/protocols/rtmp/to_stream.go | 151 +++- internal/servers/rtmp/server_test.go | 7 +- 36 files changed, 1425 insertions(+), 621 deletions(-) delete mode 100644 internal/protocols/rtmp/message/extended_mpeg2ts_sequence_start.go delete mode 100644 internal/protocols/rtmp/message/extended_sequence_end.go rename internal/protocols/rtmp/message/{acknowledge.go => msg_acknowledge.go} (100%) rename internal/protocols/rtmp/message/{audio.go => msg_audio.go} (100%) create mode 100644 internal/protocols/rtmp/message/msg_audio_ex_coded_frames.go create mode 100644 internal/protocols/rtmp/message/msg_audio_ex_multichannel_config.go create mode 100644 internal/protocols/rtmp/message/msg_audio_ex_multitrack.go create mode 100644 internal/protocols/rtmp/message/msg_audio_ex_sequence_start.go rename internal/protocols/rtmp/message/{command_amf0.go => msg_command_amf0.go} (100%) rename internal/protocols/rtmp/message/{command_amf0_test.go => msg_command_amf0_test.go} (100%) rename internal/protocols/rtmp/message/{data_amf0.go => msg_data_amf0.go} (100%) rename internal/protocols/rtmp/message/{set_chunk_size.go => msg_set_chunk_size.go} (100%) rename internal/protocols/rtmp/message/{set_peer_bandwidth.go => msg_set_peer_bandwidth.go} (100%) rename internal/protocols/rtmp/message/{set_window_ack_size.go => msg_set_window_ack_size.go} (100%) rename internal/protocols/rtmp/message/{user_control_ping_request.go => msg_user_control_ping_request.go} (100%) rename internal/protocols/rtmp/message/{user_control_ping_response.go => msg_user_control_ping_response.go} 
(100%) rename internal/protocols/rtmp/message/{user_control_set_buffer_length.go => msg_user_control_set_buffer_length.go} (100%) rename internal/protocols/rtmp/message/{user_control_stream_begin.go => msg_user_control_stream_begin.go} (100%) rename internal/protocols/rtmp/message/{user_control_stream_dry.go => msg_user_control_stream_dry.go} (100%) rename internal/protocols/rtmp/message/{user_control_stream_eof.go => msg_user_control_stream_eof.go} (100%) rename internal/protocols/rtmp/message/{user_control_stream_is_recorded.go => msg_user_control_stream_is_recorded.go} (100%) rename internal/protocols/rtmp/message/{video.go => msg_video.go} (100%) rename internal/protocols/rtmp/message/{extended_coded_frames.go => msg_video_ex_coded_frames.go} (71%) rename internal/protocols/rtmp/message/{extended_frames_x.go => msg_video_ex_frames_x.go} (68%) rename internal/protocols/rtmp/message/{extended_metadata.go => msg_video_ex_metadata.go} (74%) create mode 100644 internal/protocols/rtmp/message/msg_video_ex_multitrack.go rename internal/protocols/rtmp/message/{extended_sequence_start.go => msg_video_ex_sequence_start.go} (64%) create mode 100644 internal/protocols/rtmp/message/opus_id_header.go diff --git a/README.md b/README.md index ad412e9a5254..6a623561df5d 100644 --- a/README.md +++ b/README.md @@ -26,7 +26,7 @@ Live streams can be published to the server with: |[WebRTC servers](#webrtc-servers)|WHEP|AV1, VP9, VP8, [H265](#supported-browsers), H264|Opus, G722, G711 (PCMA, PCMU)| |[RTSP clients](#rtsp-clients)|UDP, TCP, RTSPS|AV1, VP9, VP8, H265, H264, MPEG-4 Video (H263, Xvid), MPEG-1/2 Video, M-JPEG and any RTP-compatible codec|Opus, MPEG-4 Audio (AAC), MPEG-1/2 Audio (MP3), AC-3, G726, G722, G711 (PCMA, PCMU), LPCM and any RTP-compatible codec| |[RTSP cameras and servers](#rtsp-cameras-and-servers)|UDP, UDP-Multicast, TCP, RTSPS|AV1, VP9, VP8, H265, H264, MPEG-4 Video (H263, Xvid), MPEG-1/2 Video, M-JPEG and any RTP-compatible codec|Opus, MPEG-4 Audio (AAC), MPEG-1/2 Audio (MP3), AC-3, G726, G722, G711 (PCMA, PCMU), LPCM and any RTP-compatible codec| -|[RTMP clients](#rtmp-clients)|RTMP, RTMPS, Enhanced RTMP|AV1, VP9, H265, H264|MPEG-4 Audio (AAC), MPEG-1/2 Audio (MP3), G711 (PCMA, PCMU), LPCM| +|[RTMP clients](#rtmp-clients)|RTMP, RTMPS, Enhanced RTMP|AV1, VP9, H265, H264|Opus, MPEG-4 Audio (AAC), MPEG-1/2 Audio (MP3), AC-3, G711 (PCMA, PCMU), LPCM| |[RTMP cameras and servers](#rtmp-cameras-and-servers)|RTMP, RTMPS, Enhanced RTMP|AV1, VP9, H265, H264|MPEG-4 Audio (AAC), MPEG-1/2 Audio (MP3), G711 (PCMA, PCMU), LPCM| |[HLS cameras and servers](#hls-cameras-and-servers)|Low-Latency HLS, MP4-based HLS, legacy HLS|AV1, VP9, [H265](#supported-browsers-1), H264|Opus, MPEG-4 Audio (AAC)| |[UDP/MPEG-TS](#udpmpeg-ts)|Unicast, broadcast, multicast|H265, H264, MPEG-4 Video (H263, Xvid), MPEG-1/2 Video|Opus, MPEG-4 Audio (AAC), MPEG-1/2 Audio (MP3), AC-3| @@ -2438,9 +2438,10 @@ All the code in this repository is released under the [MIT License](LICENSE). 
Co |----|----| |[RTSP / RTP / RTCP specifications](https://github.com/bluenviron/gortsplib#specifications)|RTSP| |[HLS specifications](https://github.com/bluenviron/gohlslib#specifications)|HLS| -|[RTMP](https://rtmp.veriskope.com/pdf/rtmp_specification_1.0.pdf)|RTMP| -|[Enhanced RTMP v1](https://veovera.org/docs/enhanced/enhanced-rtmp-v1.pdf)|RTMP| -|[Action Message Format](https://rtmp.veriskope.com/pdf/amf0-file-format-specification.pdf)|RTMP| +|[Action Message Format - AMF 0](https://veovera.org/docs/legacy/amf0-file-format-spec.pdf)|RTMP| +|[FLV](https://veovera.org/docs/legacy/video-file-format-v10-1-spec.pdf)|RTMP| +|[RTMP](https://veovera.org/docs/legacy/rtmp-v1-0-spec.pdf)|RTMP| +|[Enhanced RTMP v2](https://veovera.org/docs/enhanced/enhanced-rtmp-v2.pdf)|RTMP| |[WebRTC: Real-Time Communication in Browsers](https://www.w3.org/TR/webrtc/)|WebRTC| |[RFC8835, Transports for WebRTC](https://datatracker.ietf.org/doc/html/rfc8835)|WebRTC| |[RFC7742, WebRTC Video Processing and Codec Requirements](https://datatracker.ietf.org/doc/html/rfc7742)|WebRTC| diff --git a/internal/protocols/rtmp/message/extended_mpeg2ts_sequence_start.go b/internal/protocols/rtmp/message/extended_mpeg2ts_sequence_start.go deleted file mode 100644 index f599c3265649..000000000000 --- a/internal/protocols/rtmp/message/extended_mpeg2ts_sequence_start.go +++ /dev/null @@ -1,26 +0,0 @@ -package message - -import ( - "fmt" - - "github.com/bluenviron/mediamtx/internal/protocols/rtmp/rawmessage" -) - -// ExtendedMPEG2TSSequenceStart is a MPEG2-TS sequence start extended message. -type ExtendedMPEG2TSSequenceStart struct { - FourCC FourCC -} - -func (m *ExtendedMPEG2TSSequenceStart) unmarshal(raw *rawmessage.Message) error { - if len(raw.Body) < 5 { - return fmt.Errorf("invalid body size") - } - - m.FourCC = FourCC(raw.Body[1])<<24 | FourCC(raw.Body[2])<<16 | FourCC(raw.Body[3])<<8 | FourCC(raw.Body[4]) - - return fmt.Errorf("ExtendedMPEG2TSSequenceStart is not implemented yet") -} - -func (m ExtendedMPEG2TSSequenceStart) marshal() (*rawmessage.Message, error) { - return nil, fmt.Errorf("TODO") -} diff --git a/internal/protocols/rtmp/message/extended_sequence_end.go b/internal/protocols/rtmp/message/extended_sequence_end.go deleted file mode 100644 index b595225d8677..000000000000 --- a/internal/protocols/rtmp/message/extended_sequence_end.go +++ /dev/null @@ -1,26 +0,0 @@ -package message - -import ( - "fmt" - - "github.com/bluenviron/mediamtx/internal/protocols/rtmp/rawmessage" -) - -// ExtendedSequenceEnd is a sequence end extended message. 
-type ExtendedSequenceEnd struct { - FourCC FourCC -} - -func (m *ExtendedSequenceEnd) unmarshal(raw *rawmessage.Message) error { - if len(raw.Body) < 5 { - return fmt.Errorf("invalid body size") - } - - m.FourCC = FourCC(raw.Body[1])<<24 | FourCC(raw.Body[2])<<16 | FourCC(raw.Body[3])<<8 | FourCC(raw.Body[4]) - - return nil -} - -func (m ExtendedSequenceEnd) marshal() (*rawmessage.Message, error) { - return nil, fmt.Errorf("TODO") -} diff --git a/internal/protocols/rtmp/message/message.go b/internal/protocols/rtmp/message/message.go index 4d34d0334863..aa609493c4df 100644 --- a/internal/protocols/rtmp/message/message.go +++ b/internal/protocols/rtmp/message/message.go @@ -18,19 +18,15 @@ const ( TypeSetChunkSize Type = 1 TypeAbortMessage Type = 2 TypeAcknowledge Type = 3 + TypeUserControl Type = 4 TypeSetWindowAckSize Type = 5 TypeSetPeerBandwidth Type = 6 - - TypeUserControl Type = 4 - - TypeCommandAMF3 Type = 17 - TypeCommandAMF0 Type = 20 - - TypeDataAMF3 Type = 15 - TypeDataAMF0 Type = 18 - - TypeAudio Type = 8 - TypeVideo Type = 9 + TypeAudio Type = 8 + TypeVideo Type = 9 + TypeDataAMF3 Type = 15 + TypeDataAMF0 Type = 18 + TypeCommandAMF3 Type = 17 + TypeCommandAMF0 Type = 20 ) // UserControlType is a user control type. @@ -47,27 +43,48 @@ const ( UserControlTypePingResponse UserControlType = 7 ) -// ExtendedType is a message extended type. -type ExtendedType uint8 +// AudioExType is an audio message extended type. +type AudioExType uint8 -// message extended types. +// audio message extended types. const ( - ExtendedTypeSequenceStart ExtendedType = 0 - ExtendedTypeCodedFrames ExtendedType = 1 - ExtendedTypeSequenceEnd ExtendedType = 2 - ExtendedTypeFramesX ExtendedType = 3 - ExtendedTypeMetadata ExtendedType = 4 - ExtendedTypeMPEG2TSSequenceStart ExtendedType = 5 + AudioExTypeSequenceStart AudioExType = 0 + AudioExTypeCodedFrames AudioExType = 1 + AudioExTypeSequenceEnd AudioExType = 2 + AudioExTypeMultichannelConfig AudioExType = 4 + AudioExTypeMultitrack AudioExType = 5 ) -// FourCC is an identifier of a video codec. +// VideoExType is a video message extended type. +type VideoExType uint8 + +// video message extended types. +const ( + VideoExTypeSequenceStart VideoExType = 0 + VideoExTypeCodedFrames VideoExType = 1 + VideoExTypeSequenceEnd VideoExType = 2 + VideoExTypeFramesX VideoExType = 3 + VideoExTypeMetadata VideoExType = 4 + VideoExTypeMPEG2TSSequenceStart VideoExType = 5 + VideoExTypeMultitrack VideoExType = 6 +) + +// FourCC is an identifier of a Extended-RTMP codec. type FourCC uint32 -// video codec identifiers. +// codec identifiers. var ( + // video FourCCAV1 FourCC = 'a'<<24 | 'v'<<16 | '0'<<8 | '1' FourCCVP9 FourCC = 'v'<<24 | 'p'<<16 | '0'<<8 | '9' FourCCHEVC FourCC = 'h'<<24 | 'v'<<16 | 'c'<<8 | '1' + FourCCAVC FourCC = 'a'<<24 | 'v'<<16 | 'c'<<8 | '1' + + // audio + FourCCOpus FourCC = 'O'<<24 | 'p'<<16 | 'u'<<8 | 's' + FourCCAC3 FourCC = 'a'<<24 | 'c'<<16 | '-'<<8 | '3' + FourCCMP4A FourCC = 'm'<<24 | 'p'<<16 | '4'<<8 | 'a' + FourCCMP3 FourCC = '.'<<24 | 'm'<<16 | 'p'<<8 | '3' ) // Message is a message. 
diff --git a/internal/protocols/rtmp/message/acknowledge.go b/internal/protocols/rtmp/message/msg_acknowledge.go similarity index 100% rename from internal/protocols/rtmp/message/acknowledge.go rename to internal/protocols/rtmp/message/msg_acknowledge.go diff --git a/internal/protocols/rtmp/message/audio.go b/internal/protocols/rtmp/message/msg_audio.go similarity index 100% rename from internal/protocols/rtmp/message/audio.go rename to internal/protocols/rtmp/message/msg_audio.go diff --git a/internal/protocols/rtmp/message/msg_audio_ex_coded_frames.go b/internal/protocols/rtmp/message/msg_audio_ex_coded_frames.go new file mode 100644 index 000000000000..7b480b1960ee --- /dev/null +++ b/internal/protocols/rtmp/message/msg_audio_ex_coded_frames.go @@ -0,0 +1,61 @@ +package message + +import ( + "fmt" + "time" + + "github.com/bluenviron/mediamtx/internal/protocols/rtmp/rawmessage" +) + +// AudioExCodedFrames is a CodedFrames extended message. +type AudioExCodedFrames struct { + ChunkStreamID byte + DTS time.Duration + MessageStreamID uint32 + FourCC FourCC + Payload []byte +} + +func (m *AudioExCodedFrames) unmarshal(raw *rawmessage.Message) error { + if len(raw.Body) < 5 { + return fmt.Errorf("not enough bytes") + } + + m.ChunkStreamID = raw.ChunkStreamID + m.DTS = raw.Timestamp + m.MessageStreamID = raw.MessageStreamID + + m.FourCC = FourCC(raw.Body[1])<<24 | FourCC(raw.Body[2])<<16 | FourCC(raw.Body[3])<<8 | FourCC(raw.Body[4]) + switch m.FourCC { + case FourCCOpus, FourCCAC3, FourCCMP4A: + default: + return fmt.Errorf("unsupported fourCC: %v", m.FourCC) + } + + m.Payload = raw.Body[5:] + + return nil +} + +func (m AudioExCodedFrames) marshalBodySize() int { + return 5 + len(m.Payload) +} + +func (m AudioExCodedFrames) marshal() (*rawmessage.Message, error) { + body := make([]byte, m.marshalBodySize()) + + body[0] = (9 << 4) | byte(AudioExTypeCodedFrames) + body[1] = uint8(m.FourCC >> 24) + body[2] = uint8(m.FourCC >> 16) + body[3] = uint8(m.FourCC >> 8) + body[4] = uint8(m.FourCC) + copy(body[5:], m.Payload) + + return &rawmessage.Message{ + ChunkStreamID: m.ChunkStreamID, + Timestamp: m.DTS, + Type: uint8(TypeAudio), + MessageStreamID: m.MessageStreamID, + Body: body, + }, nil +} diff --git a/internal/protocols/rtmp/message/msg_audio_ex_multichannel_config.go b/internal/protocols/rtmp/message/msg_audio_ex_multichannel_config.go new file mode 100644 index 000000000000..9fb4ce8a8013 --- /dev/null +++ b/internal/protocols/rtmp/message/msg_audio_ex_multichannel_config.go @@ -0,0 +1,76 @@ +package message + +import ( + "fmt" + + "github.com/bluenviron/mediamtx/internal/protocols/rtmp/rawmessage" +) + +// AudioExChannelOrder is an audio channel order. +type AudioExChannelOrder uint8 + +// audio channel orders. +const ( + AudioExChannelOrderUnspecified AudioExChannelOrder = 0 + AudioExChannelOrderNative AudioExChannelOrder = 1 + AudioExChannelOrderCustom AudioExChannelOrder = 2 +) + +// AudioExMultichannelConfig is a multichannel config extended message. 
+type AudioExMultichannelConfig struct { + ChunkStreamID byte + MessageStreamID uint32 + FourCC FourCC + AudioChannelOrder AudioExChannelOrder + ChannelCount uint8 + AudioChannelMapping uint8 // if AudioChannelOrder == AudioExChannelOrderCustom + AudioChannelFlags uint32 // if AudioChannelOrder == AudioExChannelOrderNative +} + +func (m *AudioExMultichannelConfig) unmarshal(raw *rawmessage.Message) error { + if len(raw.Body) < 7 { + return fmt.Errorf("not enough bytes") + } + + m.ChunkStreamID = raw.ChunkStreamID + m.MessageStreamID = raw.MessageStreamID + + m.FourCC = FourCC(raw.Body[1])<<24 | FourCC(raw.Body[2])<<16 | FourCC(raw.Body[3])<<8 | FourCC(raw.Body[4]) + switch m.FourCC { + case FourCCOpus, FourCCAC3, FourCCMP4A: + default: + return fmt.Errorf("unsupported fourCC: %v", m.FourCC) + } + + m.AudioChannelOrder = AudioExChannelOrder(raw.Body[5]) + m.ChannelCount = raw.Body[6] + + switch m.AudioChannelOrder { + case AudioExChannelOrderCustom: + if len(raw.Body) != 8 { + return fmt.Errorf("invalid AudioExMultichannelConfig size") + } + m.AudioChannelMapping = raw.Body[7] + + case AudioExChannelOrderNative: + if len(raw.Body) != 11 { + return fmt.Errorf("invalid AudioExMultichannelConfig size") + } + m.AudioChannelFlags = uint32(raw.Body[7])<<24 | uint32(raw.Body[8])<<16 | + uint32(raw.Body[9])<<8 | uint32(raw.Body[10]) + + case AudioExChannelOrderUnspecified: + if len(raw.Body) != 7 { + return fmt.Errorf("invalid AudioExMultichannelConfig size") + } + + default: + return fmt.Errorf("invalid AudioChannelOrder: %v", m.AudioChannelOrder) + } + + return nil +} + +func (m AudioExMultichannelConfig) marshal() (*rawmessage.Message, error) { + return nil, fmt.Errorf("TODO") +} diff --git a/internal/protocols/rtmp/message/msg_audio_ex_multitrack.go b/internal/protocols/rtmp/message/msg_audio_ex_multitrack.go new file mode 100644 index 000000000000..289b111504ff --- /dev/null +++ b/internal/protocols/rtmp/message/msg_audio_ex_multitrack.go @@ -0,0 +1,78 @@ +package message //nolint:dupl + +import ( + "fmt" + "time" + + "github.com/bluenviron/mediamtx/internal/protocols/rtmp/rawmessage" +) + +// AudioExMultitrackType is a multitrack type. +type AudioExMultitrackType uint8 + +// multitrack types. +const ( + AudioExMultitrackTypeOneTrack AudioExMultitrackType = 0 + AudioExMultitrackTypeManyTracks AudioExMultitrackType = 1 + AudioExMultitrackTypeManyTracksManyCodecs AudioExMultitrackType = 2 +) + +// AudioExMultitrack is a multitrack extended message. 
+type AudioExMultitrack struct { + ChunkStreamID byte + DTS time.Duration + MessageStreamID uint32 + MultitrackType AudioExMultitrackType + TrackID uint8 + Wrapped Message +} + +func (m *AudioExMultitrack) unmarshal(raw *rawmessage.Message) error { + if len(raw.Body) < 7 { + return fmt.Errorf("not enough bytes") + } + + m.ChunkStreamID = raw.ChunkStreamID + m.DTS = raw.Timestamp + m.MessageStreamID = raw.MessageStreamID + + m.MultitrackType = AudioExMultitrackType(raw.Body[1] >> 4) + switch m.MultitrackType { + case AudioExMultitrackTypeOneTrack: + default: + return fmt.Errorf("unsupported multitrack type: %v", m.MultitrackType) + } + + packetType := AudioExType(raw.Body[1] & 0b1111) + switch packetType { + case AudioExTypeSequenceStart: + m.Wrapped = &AudioExSequenceStart{} + + case AudioExTypeMultichannelConfig: + m.Wrapped = &AudioExMultichannelConfig{} + + case AudioExTypeCodedFrames: + m.Wrapped = &AudioExCodedFrames{} + + default: + return fmt.Errorf("unsupported multitrack packet type: %v", packetType) + } + + m.TrackID = raw.Body[6] + + wrappedBody := make([]byte, 5+len(raw.Body[7:])) + copy(wrappedBody[1:], raw.Body[2:]) // fourCC + copy(wrappedBody[5:], raw.Body[7:]) // body + err := m.Wrapped.unmarshal(&rawmessage.Message{ + Body: wrappedBody, + }) + if err != nil { + return err + } + + return nil +} + +func (m AudioExMultitrack) marshal() (*rawmessage.Message, error) { + return nil, fmt.Errorf("TODO") +} diff --git a/internal/protocols/rtmp/message/msg_audio_ex_sequence_start.go b/internal/protocols/rtmp/message/msg_audio_ex_sequence_start.go new file mode 100644 index 000000000000..14d8a56c1ee6 --- /dev/null +++ b/internal/protocols/rtmp/message/msg_audio_ex_sequence_start.go @@ -0,0 +1,57 @@ +package message + +import ( + "fmt" + + "github.com/bluenviron/mediacommon/pkg/codecs/mpeg4audio" + "github.com/bluenviron/mediamtx/internal/protocols/rtmp/rawmessage" +) + +// AudioExSequenceStart is a sequence start extended message. 
+type AudioExSequenceStart struct { + ChunkStreamID byte + MessageStreamID uint32 + FourCC FourCC + OpusIDHeader *OpusIDHeader + MP4AudioConfig *mpeg4audio.AudioSpecificConfig +} + +func (m *AudioExSequenceStart) unmarshal(raw *rawmessage.Message) error { + if len(raw.Body) < 5 { + return fmt.Errorf("not enough bytes") + } + + m.ChunkStreamID = raw.ChunkStreamID + m.MessageStreamID = raw.MessageStreamID + + m.FourCC = FourCC(raw.Body[1])<<24 | FourCC(raw.Body[2])<<16 | FourCC(raw.Body[3])<<8 | FourCC(raw.Body[4]) + switch m.FourCC { + case FourCCOpus: + m.OpusIDHeader = &OpusIDHeader{} + err := m.OpusIDHeader.unmarshal(raw.Body[5:]) + if err != nil { + return fmt.Errorf("invalid Opus ID header: %w", err) + } + + case FourCCAC3: + if len(raw.Body) != 5 { + return fmt.Errorf("unexpected size") + } + + case FourCCMP4A: + m.MP4AudioConfig = &mpeg4audio.AudioSpecificConfig{} + err := m.MP4AudioConfig.Unmarshal(raw.Body[5:]) + if err != nil { + return fmt.Errorf("invalid MPEG-4 audio config: %w", err) + } + + default: + return fmt.Errorf("unsupported fourCC: %v", m.FourCC) + } + + return nil +} + +func (m AudioExSequenceStart) marshal() (*rawmessage.Message, error) { + return nil, fmt.Errorf("TODO") +} diff --git a/internal/protocols/rtmp/message/command_amf0.go b/internal/protocols/rtmp/message/msg_command_amf0.go similarity index 100% rename from internal/protocols/rtmp/message/command_amf0.go rename to internal/protocols/rtmp/message/msg_command_amf0.go diff --git a/internal/protocols/rtmp/message/command_amf0_test.go b/internal/protocols/rtmp/message/msg_command_amf0_test.go similarity index 100% rename from internal/protocols/rtmp/message/command_amf0_test.go rename to internal/protocols/rtmp/message/msg_command_amf0_test.go diff --git a/internal/protocols/rtmp/message/data_amf0.go b/internal/protocols/rtmp/message/msg_data_amf0.go similarity index 100% rename from internal/protocols/rtmp/message/data_amf0.go rename to internal/protocols/rtmp/message/msg_data_amf0.go diff --git a/internal/protocols/rtmp/message/set_chunk_size.go b/internal/protocols/rtmp/message/msg_set_chunk_size.go similarity index 100% rename from internal/protocols/rtmp/message/set_chunk_size.go rename to internal/protocols/rtmp/message/msg_set_chunk_size.go diff --git a/internal/protocols/rtmp/message/set_peer_bandwidth.go b/internal/protocols/rtmp/message/msg_set_peer_bandwidth.go similarity index 100% rename from internal/protocols/rtmp/message/set_peer_bandwidth.go rename to internal/protocols/rtmp/message/msg_set_peer_bandwidth.go diff --git a/internal/protocols/rtmp/message/set_window_ack_size.go b/internal/protocols/rtmp/message/msg_set_window_ack_size.go similarity index 100% rename from internal/protocols/rtmp/message/set_window_ack_size.go rename to internal/protocols/rtmp/message/msg_set_window_ack_size.go diff --git a/internal/protocols/rtmp/message/user_control_ping_request.go b/internal/protocols/rtmp/message/msg_user_control_ping_request.go similarity index 100% rename from internal/protocols/rtmp/message/user_control_ping_request.go rename to internal/protocols/rtmp/message/msg_user_control_ping_request.go diff --git a/internal/protocols/rtmp/message/user_control_ping_response.go b/internal/protocols/rtmp/message/msg_user_control_ping_response.go similarity index 100% rename from internal/protocols/rtmp/message/user_control_ping_response.go rename to internal/protocols/rtmp/message/msg_user_control_ping_response.go diff --git a/internal/protocols/rtmp/message/user_control_set_buffer_length.go 
b/internal/protocols/rtmp/message/msg_user_control_set_buffer_length.go similarity index 100% rename from internal/protocols/rtmp/message/user_control_set_buffer_length.go rename to internal/protocols/rtmp/message/msg_user_control_set_buffer_length.go diff --git a/internal/protocols/rtmp/message/user_control_stream_begin.go b/internal/protocols/rtmp/message/msg_user_control_stream_begin.go similarity index 100% rename from internal/protocols/rtmp/message/user_control_stream_begin.go rename to internal/protocols/rtmp/message/msg_user_control_stream_begin.go diff --git a/internal/protocols/rtmp/message/user_control_stream_dry.go b/internal/protocols/rtmp/message/msg_user_control_stream_dry.go similarity index 100% rename from internal/protocols/rtmp/message/user_control_stream_dry.go rename to internal/protocols/rtmp/message/msg_user_control_stream_dry.go diff --git a/internal/protocols/rtmp/message/user_control_stream_eof.go b/internal/protocols/rtmp/message/msg_user_control_stream_eof.go similarity index 100% rename from internal/protocols/rtmp/message/user_control_stream_eof.go rename to internal/protocols/rtmp/message/msg_user_control_stream_eof.go diff --git a/internal/protocols/rtmp/message/user_control_stream_is_recorded.go b/internal/protocols/rtmp/message/msg_user_control_stream_is_recorded.go similarity index 100% rename from internal/protocols/rtmp/message/user_control_stream_is_recorded.go rename to internal/protocols/rtmp/message/msg_user_control_stream_is_recorded.go diff --git a/internal/protocols/rtmp/message/video.go b/internal/protocols/rtmp/message/msg_video.go similarity index 100% rename from internal/protocols/rtmp/message/video.go rename to internal/protocols/rtmp/message/msg_video.go diff --git a/internal/protocols/rtmp/message/extended_coded_frames.go b/internal/protocols/rtmp/message/msg_video_ex_coded_frames.go similarity index 71% rename from internal/protocols/rtmp/message/extended_coded_frames.go rename to internal/protocols/rtmp/message/msg_video_ex_coded_frames.go index 5759f4b8ea21..59aac3ec535a 100644 --- a/internal/protocols/rtmp/message/extended_coded_frames.go +++ b/internal/protocols/rtmp/message/msg_video_ex_coded_frames.go @@ -7,8 +7,8 @@ import ( "github.com/bluenviron/mediamtx/internal/protocols/rtmp/rawmessage" ) -// ExtendedCodedFrames is a CodedFrames extended message. -type ExtendedCodedFrames struct { +// VideoExCodedFrames is a CodedFrames extended message. 
+type VideoExCodedFrames struct { ChunkStreamID byte DTS time.Duration MessageStreamID uint32 @@ -17,27 +17,35 @@ type ExtendedCodedFrames struct { Payload []byte } -func (m *ExtendedCodedFrames) unmarshal(raw *rawmessage.Message) error { - if len(raw.Body) < 8 { +func (m *VideoExCodedFrames) unmarshal(raw *rawmessage.Message) error { + if len(raw.Body) < 5 { return fmt.Errorf("not enough bytes") } m.ChunkStreamID = raw.ChunkStreamID m.DTS = raw.Timestamp m.MessageStreamID = raw.MessageStreamID - m.FourCC = FourCC(raw.Body[1])<<24 | FourCC(raw.Body[2])<<16 | FourCC(raw.Body[3])<<8 | FourCC(raw.Body[4]) - if m.FourCC == FourCCHEVC { + m.FourCC = FourCC(raw.Body[1])<<24 | FourCC(raw.Body[2])<<16 | FourCC(raw.Body[3])<<8 | FourCC(raw.Body[4]) + switch m.FourCC { + case FourCCAVC, FourCCHEVC: + if len(raw.Body) < 8 { + return fmt.Errorf("not enough bytes") + } m.PTSDelta = time.Duration(uint32(raw.Body[5])<<16|uint32(raw.Body[6])<<8|uint32(raw.Body[7])) * time.Millisecond m.Payload = raw.Body[8:] - } else { + + case FourCCAV1, FourCCVP9: m.Payload = raw.Body[5:] + + default: + return fmt.Errorf("unsupported fourCC: %v", m.FourCC) } return nil } -func (m ExtendedCodedFrames) marshalBodySize() int { +func (m VideoExCodedFrames) marshalBodySize() int { var l int if m.FourCC == FourCCHEVC { l = 8 + len(m.Payload) @@ -47,10 +55,10 @@ func (m ExtendedCodedFrames) marshalBodySize() int { return l } -func (m ExtendedCodedFrames) marshal() (*rawmessage.Message, error) { +func (m VideoExCodedFrames) marshal() (*rawmessage.Message, error) { body := make([]byte, m.marshalBodySize()) - body[0] = 0b10000000 | byte(ExtendedTypeCodedFrames) + body[0] = 0b10000000 | byte(VideoExTypeCodedFrames) body[1] = uint8(m.FourCC >> 24) body[2] = uint8(m.FourCC >> 16) body[3] = uint8(m.FourCC >> 8) diff --git a/internal/protocols/rtmp/message/extended_frames_x.go b/internal/protocols/rtmp/message/msg_video_ex_frames_x.go similarity index 68% rename from internal/protocols/rtmp/message/extended_frames_x.go rename to internal/protocols/rtmp/message/msg_video_ex_frames_x.go index a85ea764e3cb..9a7e80612456 100644 --- a/internal/protocols/rtmp/message/extended_frames_x.go +++ b/internal/protocols/rtmp/message/msg_video_ex_frames_x.go @@ -7,8 +7,8 @@ import ( "github.com/bluenviron/mediamtx/internal/protocols/rtmp/rawmessage" ) -// ExtendedFramesX is a FramesX extended message. -type ExtendedFramesX struct { +// VideoExFramesX is a FramesX extended message. 
+type VideoExFramesX struct { ChunkStreamID byte DTS time.Duration MessageStreamID uint32 @@ -16,7 +16,7 @@ type ExtendedFramesX struct { Payload []byte } -func (m *ExtendedFramesX) unmarshal(raw *rawmessage.Message) error { +func (m *VideoExFramesX) unmarshal(raw *rawmessage.Message) error { if len(raw.Body) < 6 { return fmt.Errorf("not enough bytes") } @@ -24,20 +24,27 @@ func (m *ExtendedFramesX) unmarshal(raw *rawmessage.Message) error { m.ChunkStreamID = raw.ChunkStreamID m.DTS = raw.Timestamp m.MessageStreamID = raw.MessageStreamID + m.FourCC = FourCC(raw.Body[1])<<24 | FourCC(raw.Body[2])<<16 | FourCC(raw.Body[3])<<8 | FourCC(raw.Body[4]) + switch m.FourCC { + case FourCCAV1, FourCCVP9, FourCCHEVC, FourCCAVC: + default: + return fmt.Errorf("unsupported fourCC: %v", m.FourCC) + } + m.Payload = raw.Body[5:] return nil } -func (m ExtendedFramesX) marshalBodySize() int { +func (m VideoExFramesX) marshalBodySize() int { return 5 + len(m.Payload) } -func (m ExtendedFramesX) marshal() (*rawmessage.Message, error) { +func (m VideoExFramesX) marshal() (*rawmessage.Message, error) { body := make([]byte, m.marshalBodySize()) - body[0] = 0b10000000 | byte(ExtendedTypeFramesX) + body[0] = 0b10000000 | byte(VideoExTypeFramesX) body[1] = uint8(m.FourCC >> 24) body[2] = uint8(m.FourCC >> 16) body[3] = uint8(m.FourCC >> 8) diff --git a/internal/protocols/rtmp/message/extended_metadata.go b/internal/protocols/rtmp/message/msg_video_ex_metadata.go similarity index 74% rename from internal/protocols/rtmp/message/extended_metadata.go rename to internal/protocols/rtmp/message/msg_video_ex_metadata.go index 8a8ee5172c5a..5cd689628a84 100644 --- a/internal/protocols/rtmp/message/extended_metadata.go +++ b/internal/protocols/rtmp/message/msg_video_ex_metadata.go @@ -8,8 +8,8 @@ import ( "github.com/bluenviron/mediamtx/internal/protocols/rtmp/rawmessage" ) -// ExtendedMetadata is a metadata extended message. -type ExtendedMetadata struct { +// VideoExMetadata is a metadata extended message. 
+type VideoExMetadata struct { ChunkStreamID byte DTS time.Duration MessageStreamID uint32 @@ -17,7 +17,7 @@ type ExtendedMetadata struct { Payload amf0.Data } -func (m *ExtendedMetadata) unmarshal(raw *rawmessage.Message) error { +func (m *VideoExMetadata) unmarshal(raw *rawmessage.Message) error { if len(raw.Body) < 6 { return fmt.Errorf("invalid body size") } @@ -25,7 +25,13 @@ func (m *ExtendedMetadata) unmarshal(raw *rawmessage.Message) error { m.ChunkStreamID = raw.ChunkStreamID m.DTS = raw.Timestamp m.MessageStreamID = raw.MessageStreamID + m.FourCC = FourCC(raw.Body[1])<<24 | FourCC(raw.Body[2])<<16 | FourCC(raw.Body[3])<<8 | FourCC(raw.Body[4]) + switch m.FourCC { + case FourCCAV1, FourCCVP9, FourCCHEVC: + default: + return fmt.Errorf("unsupported fourCC: %v", m.FourCC) + } var err error m.Payload, err = amf0.Unmarshal(raw.Body[5:]) @@ -36,7 +42,7 @@ func (m *ExtendedMetadata) unmarshal(raw *rawmessage.Message) error { return nil } -func (m ExtendedMetadata) marshalBodySize() (int, error) { +func (m VideoExMetadata) marshalBodySize() (int, error) { ms, err := m.Payload.MarshalSize() if err != nil { return 0, err @@ -44,14 +50,14 @@ func (m ExtendedMetadata) marshalBodySize() (int, error) { return 5 + ms, nil } -func (m ExtendedMetadata) marshal() (*rawmessage.Message, error) { +func (m VideoExMetadata) marshal() (*rawmessage.Message, error) { mbs, err := m.marshalBodySize() if err != nil { return nil, err } body := make([]byte, mbs) - body[0] = 0b10000000 | byte(ExtendedTypeMetadata) + body[0] = 0b10000000 | byte(VideoExTypeMetadata) body[1] = uint8(m.FourCC >> 24) body[2] = uint8(m.FourCC >> 16) body[3] = uint8(m.FourCC >> 8) diff --git a/internal/protocols/rtmp/message/msg_video_ex_multitrack.go b/internal/protocols/rtmp/message/msg_video_ex_multitrack.go new file mode 100644 index 000000000000..cbb9f6dd74e0 --- /dev/null +++ b/internal/protocols/rtmp/message/msg_video_ex_multitrack.go @@ -0,0 +1,78 @@ +package message //nolint:dupl + +import ( + "fmt" + "time" + + "github.com/bluenviron/mediamtx/internal/protocols/rtmp/rawmessage" +) + +// VideoExMultitrackType is a multitrack type. +type VideoExMultitrackType uint8 + +// multitrack types. +const ( + VideoExMultitrackTypeOneTrack VideoExMultitrackType = 0 + VideoExMultitrackTypeManyTracks VideoExMultitrackType = 1 + VideoExMultitrackTypeManyTracksManyCodecs VideoExMultitrackType = 2 +) + +// VideoExMultitrack is a multitrack extended message. 
+type VideoExMultitrack struct { + ChunkStreamID byte + DTS time.Duration + MessageStreamID uint32 + MultitrackType VideoExMultitrackType + TrackID uint8 + Wrapped Message +} + +func (m *VideoExMultitrack) unmarshal(raw *rawmessage.Message) error { + if len(raw.Body) < 7 { + return fmt.Errorf("not enough bytes") + } + + m.ChunkStreamID = raw.ChunkStreamID + m.DTS = raw.Timestamp + m.MessageStreamID = raw.MessageStreamID + + m.MultitrackType = VideoExMultitrackType(raw.Body[1] >> 4) + switch m.MultitrackType { + case VideoExMultitrackTypeOneTrack: + default: + return fmt.Errorf("unsupported multitrack type: %v", m.MultitrackType) + } + + packetType := VideoExType(raw.Body[1] & 0b1111) + switch packetType { + case VideoExTypeSequenceStart: + m.Wrapped = &VideoExSequenceStart{} + + case VideoExTypeCodedFrames: + m.Wrapped = &VideoExCodedFrames{} + + case VideoExTypeFramesX: + m.Wrapped = &VideoExFramesX{} + + default: + return fmt.Errorf("unsupported multitrack packet type: %v", packetType) + } + + m.TrackID = raw.Body[6] + + wrappedBody := make([]byte, 5+len(raw.Body[7:])) + copy(wrappedBody[1:], raw.Body[2:]) // fourCC + copy(wrappedBody[5:], raw.Body[7:]) // body + err := m.Wrapped.unmarshal(&rawmessage.Message{ + Body: wrappedBody, + }) + if err != nil { + return err + } + + return nil +} + +func (m VideoExMultitrack) marshal() (*rawmessage.Message, error) { + return nil, fmt.Errorf("TODO") +} diff --git a/internal/protocols/rtmp/message/extended_sequence_start.go b/internal/protocols/rtmp/message/msg_video_ex_sequence_start.go similarity index 64% rename from internal/protocols/rtmp/message/extended_sequence_start.go rename to internal/protocols/rtmp/message/msg_video_ex_sequence_start.go index 59804c6b6942..6640f039bdd9 100644 --- a/internal/protocols/rtmp/message/extended_sequence_start.go +++ b/internal/protocols/rtmp/message/msg_video_ex_sequence_start.go @@ -6,35 +6,42 @@ import ( "github.com/bluenviron/mediamtx/internal/protocols/rtmp/rawmessage" ) -// ExtendedSequenceStart is a sequence start extended message. -type ExtendedSequenceStart struct { +// VideoExSequenceStart is a sequence start extended message. 
+type VideoExSequenceStart struct { ChunkStreamID byte MessageStreamID uint32 FourCC FourCC Config []byte } -func (m *ExtendedSequenceStart) unmarshal(raw *rawmessage.Message) error { +func (m *VideoExSequenceStart) unmarshal(raw *rawmessage.Message) error { if len(raw.Body) < 6 { return fmt.Errorf("not enough bytes") } m.ChunkStreamID = raw.ChunkStreamID m.MessageStreamID = raw.MessageStreamID + m.FourCC = FourCC(raw.Body[1])<<24 | FourCC(raw.Body[2])<<16 | FourCC(raw.Body[3])<<8 | FourCC(raw.Body[4]) + switch m.FourCC { + case FourCCAV1, FourCCVP9, FourCCHEVC, FourCCAVC: + default: + return fmt.Errorf("unsupported fourCC: %v", m.FourCC) + } + m.Config = raw.Body[5:] return nil } -func (m ExtendedSequenceStart) marshalBodySize() int { +func (m VideoExSequenceStart) marshalBodySize() int { return 5 + len(m.Config) } -func (m ExtendedSequenceStart) marshal() (*rawmessage.Message, error) { +func (m VideoExSequenceStart) marshal() (*rawmessage.Message, error) { body := make([]byte, m.marshalBodySize()) - body[0] = 0b10000000 | byte(ExtendedTypeSequenceStart) + body[0] = 0b10000000 | byte(VideoExTypeSequenceStart) body[1] = uint8(m.FourCC >> 24) body[2] = uint8(m.FourCC >> 16) body[3] = uint8(m.FourCC >> 8) diff --git a/internal/protocols/rtmp/message/opus_id_header.go b/internal/protocols/rtmp/message/opus_id_header.go new file mode 100644 index 000000000000..7f2b83102c9f --- /dev/null +++ b/internal/protocols/rtmp/message/opus_id_header.go @@ -0,0 +1,42 @@ +package message + +import ( + "bytes" + "fmt" +) + +// OpusIDHeader is an Opus identification header. +// Specification: https://datatracker.ietf.org/doc/html/rfc7845#section-5.1 +type OpusIDHeader struct { + Version uint8 + ChannelCount uint8 + PreSkip uint16 + InputSampleRate uint32 + OutputGain uint16 + ChannelMappingFamily uint8 + ChannelMappingTable []uint8 +} + +func (h *OpusIDHeader) unmarshal(buf []byte) error { + if len(buf) < 19 { + return fmt.Errorf("not enough bytes") + } + + if !bytes.Equal(buf[:8], []byte{'O', 'p', 'u', 's', 'H', 'e', 'a', 'd'}) { + return fmt.Errorf("magit signature not corresponds") + } + + h.Version = buf[8] + if h.Version != 1 { + return fmt.Errorf("invalid version: %v", h.Version) + } + + h.ChannelCount = buf[9] + h.PreSkip = uint16(buf[10])<<8 | uint16(buf[11]) + h.InputSampleRate = uint32(buf[12])<<24 | uint32(buf[13])<<16 | uint32(buf[14])<<8 | uint32(buf[15]) + h.OutputGain = uint16(buf[16])<<8 | uint16(buf[17]) + h.ChannelMappingFamily = uint8(buf[18]) + h.ChannelMappingTable = buf[19:] + + return nil +} diff --git a/internal/protocols/rtmp/message/reader.go b/internal/protocols/rtmp/message/reader.go index b4a34b63adba..957f489969bb 100644 --- a/internal/protocols/rtmp/message/reader.go +++ b/internal/protocols/rtmp/message/reader.go @@ -62,45 +62,59 @@ func allocateMessage(raw *rawmessage.Message) (Message, error) { return &DataAMF0{}, nil case TypeAudio: - return &Audio{}, nil - - case TypeVideo: - if len(raw.Body) < 5 { + if len(raw.Body) < 1 { return nil, fmt.Errorf("not enough bytes") } - if (raw.Body[0] & 0b10000000) != 0 { - fourCC := FourCC(raw.Body[1])<<24 | FourCC(raw.Body[2])<<16 | FourCC(raw.Body[3])<<8 | FourCC(raw.Body[4]) + if (raw.Body[0] >> 4) == 9 { + extendedType := AudioExType(raw.Body[0] & 0x0F) + + switch extendedType { + case AudioExTypeSequenceStart: + return &AudioExSequenceStart{}, nil + + case AudioExTypeMultichannelConfig: + return &AudioExMultichannelConfig{}, nil + + case AudioExTypeCodedFrames: + return &AudioExCodedFrames{}, nil + + case AudioExTypeMultitrack: + 
return &AudioExMultitrack{}, nil - switch fourCC { - case FourCCAV1, FourCCVP9, FourCCHEVC: default: - return nil, fmt.Errorf("invalid fourCC: %v", fourCC) + return nil, fmt.Errorf("unsupported audio extended type: %v", extendedType) } + } - extendedType := ExtendedType(raw.Body[0] & 0x0F) + return &Audio{}, nil - switch extendedType { - case ExtendedTypeSequenceStart: - return &ExtendedSequenceStart{}, nil + case TypeVideo: + if len(raw.Body) < 1 { + return nil, fmt.Errorf("not enough bytes") + } + + if (raw.Body[0] & 0b10000000) != 0 { + extendedType := VideoExType(raw.Body[0] & 0x0F) - case ExtendedTypeCodedFrames: - return &ExtendedCodedFrames{}, nil + switch extendedType { + case VideoExTypeSequenceStart: + return &VideoExSequenceStart{}, nil - case ExtendedTypeSequenceEnd: - return &ExtendedSequenceEnd{}, nil + case VideoExTypeCodedFrames: + return &VideoExCodedFrames{}, nil - case ExtendedTypeFramesX: - return &ExtendedFramesX{}, nil + case VideoExTypeFramesX: + return &VideoExFramesX{}, nil - case ExtendedTypeMetadata: - return &ExtendedMetadata{}, nil + case VideoExTypeMetadata: + return &VideoExMetadata{}, nil - case ExtendedTypeMPEG2TSSequenceStart: - return &ExtendedMPEG2TSSequenceStart{}, nil + case VideoExTypeMultitrack: + return &VideoExMultitrack{}, nil default: - return nil, fmt.Errorf("invalid extended type: %v", extendedType) + return nil, fmt.Errorf("unsupported video extended type: %v", extendedType) } } return &Video{}, nil diff --git a/internal/protocols/rtmp/message/reader_test.go b/internal/protocols/rtmp/message/reader_test.go index b2f8239ddefd..5a86ac90acad 100644 --- a/internal/protocols/rtmp/message/reader_test.go +++ b/internal/protocols/rtmp/message/reader_test.go @@ -22,8 +22,8 @@ var readWriterCases = []struct { Value: 45953968, }, []byte{ - 0x2, 0x0, 0x0, 0x0, 0x0, 0x0, 0x4, 0x3, - 0x0, 0x0, 0x0, 0x0, 0x2, 0xbd, 0x33, 0xb0, + 0x02, 0x00, 0x00, 0x00, 0x00, 0x00, 0x04, 0x03, + 0x00, 0x00, 0x00, 0x00, 0x02, 0xbd, 0x33, 0xb0, }, }, { @@ -39,8 +39,9 @@ var readWriterCases = []struct { Payload: []byte{0x01, 0x02, 0x03, 0x04}, }, []byte{ - 0x7, 0x5b, 0xc3, 0x6e, 0x0, 0x0, 0x5, 0x8, 0x0, 0x45, 0x31, 0xf, 0x2f, - 0x01, 0x02, 0x03, 0x04, + 0x07, 0x5b, 0xc3, 0x6e, 0x00, 0x00, 0x05, 0x08, + 0x00, 0x45, 0x31, 0x0f, 0x2f, 0x01, 0x02, 0x03, + 0x04, }, }, { @@ -57,8 +58,8 @@ var readWriterCases = []struct { Payload: []byte{0x5A, 0xC0, 0x77, 0x40}, }, []byte{ - 0x7, 0x5b, 0xc3, 0x6e, 0x0, 0x0, 0x6, 0x8, - 0x0, 0x45, 0x31, 0xf, 0xaf, 0x1, 0x5a, 0xc0, + 0x07, 0x5b, 0xc3, 0x6e, 0x00, 0x00, 0x06, 0x08, + 0x00, 0x45, 0x31, 0x0f, 0xaf, 0x01, 0x5a, 0xc0, 0x77, 0x40, }, }, @@ -78,14 +79,14 @@ var readWriterCases = []struct { }, }, []byte{ - 0x3, 0x0, 0x0, 0x0, 0x0, 0x0, 0x2f, 0x14, - 0x0, 0x5, 0x44, 0x9b, 0x2, 0x0, 0xc, 0x69, + 0x03, 0x00, 0x00, 0x00, 0x00, 0x00, 0x2f, 0x14, + 0x00, 0x05, 0x44, 0x9b, 0x02, 0x00, 0x0c, 0x69, 0x38, 0x79, 0x79, 0x74, 0x68, 0x72, 0x65, 0x72, - 0x67, 0x72, 0x65, 0x0, 0x40, 0xeb, 0x91, 0x0, - 0x0, 0x0, 0x0, 0x0, 0x3, 0x0, 0x2, 0x6b, - 0x31, 0x2, 0x0, 0x2, 0x76, 0x31, 0x0, 0x2, - 0x6b, 0x32, 0x2, 0x0, 0x2, 0x76, 0x32, 0x0, - 0x0, 0x9, 0x5, + 0x67, 0x72, 0x65, 0x00, 0x40, 0xeb, 0x91, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x03, 0x00, 0x02, 0x6b, + 0x31, 0x02, 0x00, 0x02, 0x76, 0x31, 0x00, 0x02, + 0x6b, 0x32, 0x02, 0x00, 0x02, 0x76, 0x32, 0x00, + 0x00, 0x09, 0x05, }, }, { @@ -100,9 +101,9 @@ var readWriterCases = []struct { }, }, []byte{ - 0x3, 0x0, 0x0, 0x0, 0x0, 0x0, 0x13, 0x12, - 0x0, 0x5, 0x44, 0x9b, 0x0, 0x40, 0x6d, 0x40, - 0x0, 0x0, 0x0, 0x0, 0x0, 0x2, 
0x0, 0x6, + 0x03, 0x00, 0x00, 0x00, 0x00, 0x00, 0x13, 0x12, + 0x00, 0x05, 0x44, 0x9b, 0x00, 0x40, 0x6d, 0x40, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0x00, 0x06, 0x73, 0x74, 0x72, 0x69, 0x6e, 0x67, 0x05, }, }, @@ -112,8 +113,8 @@ var readWriterCases = []struct { Value: 10000, }, []byte{ - 0x2, 0x0, 0x0, 0x0, 0x0, 0x0, 0x4, 0x1, - 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x27, 0x10, + 0x02, 0x00, 0x00, 0x00, 0x00, 0x00, 0x04, 0x01, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x27, 0x10, }, }, { @@ -122,8 +123,8 @@ var readWriterCases = []struct { Value: 10000, }, []byte{ - 0x2, 0x0, 0x0, 0x0, 0x0, 0x0, 0x4, 0x1, - 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x27, 0x10, + 0x02, 0x00, 0x00, 0x00, 0x00, 0x00, 0x04, 0x01, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x27, 0x10, }, }, { @@ -132,8 +133,8 @@ var readWriterCases = []struct { Value: 10000, }, []byte{ - 0x2, 0x0, 0x0, 0x0, 0x0, 0x0, 0x4, 0x1, - 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x27, 0x10, + 0x02, 0x00, 0x00, 0x00, 0x00, 0x00, 0x04, 0x01, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x27, 0x10, }, }, { @@ -142,8 +143,8 @@ var readWriterCases = []struct { ServerTime: 569834435, }, []byte{ - 0x2, 0x0, 0x0, 0x0, 0x0, 0x0, 0x6, 0x4, - 0x0, 0x0, 0x0, 0x0, 0x0, 0x6, 0x21, 0xf6, + 0x02, 0x00, 0x00, 0x00, 0x00, 0x00, 0x06, 0x04, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x06, 0x21, 0xf6, 0xfb, 0xc3, }, }, @@ -153,8 +154,8 @@ var readWriterCases = []struct { ServerTime: 569834435, }, []byte{ - 0x2, 0x0, 0x0, 0x0, 0x0, 0x0, 0x6, 0x4, - 0x0, 0x0, 0x0, 0x0, 0x0, 0x7, 0x21, 0xf6, + 0x02, 0x00, 0x00, 0x00, 0x00, 0x00, 0x06, 0x04, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x07, 0x21, 0xf6, 0xfb, 0xc3, }, }, @@ -165,9 +166,9 @@ var readWriterCases = []struct { BufferLength: 235345, }, []byte{ - 0x2, 0x0, 0x0, 0x0, 0x0, 0x0, 0xa, 0x4, - 0x0, 0x0, 0x0, 0x0, 0x0, 0x3, 0x0, 0x0, - 0x8a, 0xce, 0x0, 0x3, 0x97, 0x51, + 0x02, 0x00, 0x00, 0x00, 0x00, 0x00, 0x0a, 0x04, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x03, 0x00, 0x00, + 0x8a, 0xce, 0x00, 0x03, 0x97, 0x51, }, }, { @@ -176,8 +177,8 @@ var readWriterCases = []struct { StreamID: 35534, }, []byte{ - 0x2, 0x0, 0x0, 0x0, 0x0, 0x0, 0x6, 0x4, - 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, + 0x02, 0x00, 0x00, 0x00, 0x00, 0x00, 0x06, 0x04, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x8a, 0xce, }, }, @@ -187,8 +188,8 @@ var readWriterCases = []struct { StreamID: 35534, }, []byte{ - 0x2, 0x0, 0x0, 0x0, 0x0, 0x0, 0x6, 0x4, - 0x0, 0x0, 0x0, 0x0, 0x0, 0x2, 0x0, 0x0, + 0x02, 0x00, 0x00, 0x00, 0x00, 0x00, 0x06, 0x04, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0x00, 0x00, 0x8a, 0xce, }, }, @@ -198,8 +199,8 @@ var readWriterCases = []struct { StreamID: 35534, }, []byte{ - 0x2, 0x0, 0x0, 0x0, 0x0, 0x0, 0x6, 0x4, - 0x0, 0x0, 0x0, 0x0, 0x0, 0x1, 0x0, 0x0, + 0x02, 0x00, 0x00, 0x00, 0x00, 0x00, 0x06, 0x04, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x8a, 0xce, }, }, @@ -209,8 +210,8 @@ var readWriterCases = []struct { StreamID: 35534, }, []byte{ - 0x2, 0x0, 0x0, 0x0, 0x0, 0x0, 0x6, 0x4, - 0x0, 0x0, 0x0, 0x0, 0x0, 0x4, 0x0, 0x0, + 0x02, 0x00, 0x00, 0x00, 0x00, 0x00, 0x06, 0x04, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x04, 0x00, 0x00, 0x8a, 0xce, }, }, @@ -234,7 +235,7 @@ var readWriterCases = []struct { }, { "extended sequence start", - &ExtendedSequenceStart{ + &VideoExSequenceStart{ ChunkStreamID: 4, MessageStreamID: 0x1000000, FourCC: FourCCHEVC, @@ -248,7 +249,7 @@ var readWriterCases = []struct { }, { "extended coded frames", - &ExtendedCodedFrames{ + &VideoExCodedFrames{ ChunkStreamID: 4, DTS: 15100 * time.Millisecond, MessageStreamID: 0x1000000, @@ -264,7 +265,7 @@ var readWriterCases = []struct { }, 
{ "extended frames x", - &ExtendedFramesX{ + &VideoExFramesX{ ChunkStreamID: 4, DTS: 15100 * time.Millisecond, MessageStreamID: 0x1000000, @@ -279,7 +280,7 @@ var readWriterCases = []struct { }, { "extended metadata", - &ExtendedMetadata{ + &VideoExMetadata{ ChunkStreamID: 0x6, DTS: 0, MessageStreamID: 0x1000000, diff --git a/internal/protocols/rtmp/reader.go b/internal/protocols/rtmp/reader.go index b111780b8058..d28e5040314e 100644 --- a/internal/protocols/rtmp/reader.go +++ b/internal/protocols/rtmp/reader.go @@ -8,6 +8,7 @@ import ( "github.com/abema/go-mp4" "github.com/bluenviron/gortsplib/v4/pkg/format" + "github.com/bluenviron/mediacommon/pkg/codecs/ac3" "github.com/bluenviron/mediacommon/pkg/codecs/av1" "github.com/bluenviron/mediacommon/pkg/codecs/h264" "github.com/bluenviron/mediacommon/pkg/codecs/h265" @@ -31,12 +32,18 @@ type OnDataVP9Func func(pts time.Duration, frame []byte) // OnDataH26xFunc is the prototype of the callback passed to OnDataH26x(). type OnDataH26xFunc func(pts time.Duration, au [][]byte) +// OnDataOpusFunc is the prototype of the callback passed to OnDataOpus(). +type OnDataOpusFunc func(pts time.Duration, packet []byte) + // OnDataMPEG4AudioFunc is the prototype of the callback passed to OnDataMPEG4Audio(). type OnDataMPEG4AudioFunc func(pts time.Duration, au []byte) // OnDataMPEG1AudioFunc is the prototype of the callback passed to OnDataMPEG1Audio(). type OnDataMPEG1AudioFunc func(pts time.Duration, frame []byte) +// OnDataAC3Func is the prototype of the callback passed to OnDataAC3(). +type OnDataAC3Func func(pts time.Duration, frame []byte) + // OnDataG711Func is the prototype of the callback passed to OnDataG711(). type OnDataG711Func func(pts time.Duration, samples []byte) @@ -67,7 +74,7 @@ func hasVideo(md amf0.Object) (bool, error) { } case string: - if vt == "avc1" || vt == "hvc1" || vt == "av01" { + if vt == "avc1" || vt == "hvc1" || vt == "av01" || vt == "vp09" { return true, nil } } @@ -75,7 +82,7 @@ func hasVideo(md amf0.Object) (bool, error) { return false, fmt.Errorf("unsupported video codec: %v", v) } -func hasAudio(md amf0.Object, audioTrack *format.Format) (bool, error) { +func hasAudio(md amf0.Object) (bool, error) { v, ok := md.Get("audiocodecid") if !ok { return false, nil @@ -87,17 +94,9 @@ func hasAudio(md amf0.Object, audioTrack *format.Format) (bool, error) { case 0: return false, nil - case message.CodecMPEG4Audio, message.CodecLPCM: - return true, nil - - case message.CodecMPEG1Audio: - *audioTrack = &format.MPEG1Audio{} - return true, nil - - case message.CodecPCMA: - return true, nil - - case message.CodecPCMU: + case message.CodecMPEG4Audio, message.CodecMPEG1Audio, + message.CodecLPCM, message.CodecPCMA, message.CodecPCMU, + float64(message.FourCCOpus), float64(message.FourCCAC3): return true, nil } @@ -120,7 +119,7 @@ func h265FindNALU(array []mp4.HEVCNaluArray, typ h265.NALUType) []byte { return nil } -func trackFromH264DecoderConfig(data []byte) (format.Format, error) { +func h264TrackFromConfig(data []byte) (*format.H264, error) { var conf h264conf.Conf err := conf.Unmarshal(data) if err != nil { @@ -135,7 +134,7 @@ func trackFromH264DecoderConfig(data []byte) (format.Format, error) { }, nil } -func trackFromAACDecoderConfig(data []byte) (format.Format, error) { +func mpeg4AudioTrackFromConfig(data []byte) (*format.MPEG4Audio, error) { var mpegConf mpeg4audio.Config err := mpegConf.Unmarshal(data) if err != nil { @@ -151,7 +150,209 @@ func trackFromAACDecoderConfig(data []byte) (format.Format, error) { }, nil } -func 
tracksFromMetadata(conn *Conn, payload []interface{}) (format.Format, format.Format, error) { +func audioTrackFromData(msg *message.Audio) (format.Format, error) { + switch { + case msg.Codec == message.CodecMPEG4Audio && + msg.AACType == message.AudioAACTypeConfig: + return mpeg4AudioTrackFromConfig(msg.Payload) + + case msg.Codec == message.CodecMPEG1Audio: + return &format.MPEG1Audio{}, nil + + case msg.Codec == message.CodecPCMA: + return &format.G711{ + PayloadTyp: 8, + MULaw: false, + SampleRate: 8000, + ChannelCount: func() int { + if msg.IsStereo { + return 2 + } + return 1 + }(), + }, nil + + case msg.Codec == message.CodecPCMU: + return &format.G711{ + PayloadTyp: 0, + MULaw: true, + SampleRate: 8000, + ChannelCount: func() int { + if msg.IsStereo { + return 2 + } + return 1 + }(), + }, nil + + case msg.Codec == message.CodecLPCM: + return &format.LPCM{ + PayloadTyp: 96, + BitDepth: func() int { + if msg.Depth == message.Depth16 { + return 16 + } + return 8 + }(), + SampleRate: audioRateRTMPToInt(msg.Rate), + ChannelCount: func() int { + if msg.IsStereo { + return 2 + } + return 1 + }(), + }, nil + + default: + panic("should not happen") + } +} + +func videoTrackFromSequenceStart(msg *message.VideoExSequenceStart) (format.Format, error) { + switch msg.FourCC { + case message.FourCCAV1: + var av1c mp4.Av1C + _, err := mp4.Unmarshal(bytes.NewReader(msg.Config), uint64(len(msg.Config)), &av1c, mp4.Context{}) + if err != nil { + return nil, fmt.Errorf("invalid AV1 configuration: %w", err) + } + + // parse sequence header and metadata contained in ConfigOBUs, but do not use them + _, err = av1.BitstreamUnmarshal(av1c.ConfigOBUs, false) + if err != nil { + return nil, fmt.Errorf("invalid AV1 configuration: %w", err) + } + + return &format.AV1{ + PayloadTyp: 96, + }, nil + + case message.FourCCVP9: + var vpcc mp4.VpcC + _, err := mp4.Unmarshal(bytes.NewReader(msg.Config), uint64(len(msg.Config)), &vpcc, mp4.Context{}) + if err != nil { + return nil, fmt.Errorf("invalid VP9 configuration: %w", err) + } + + return &format.VP9{ + PayloadTyp: 96, + }, nil + + case message.FourCCHEVC: + var hvcc mp4.HvcC + _, err := mp4.Unmarshal(bytes.NewReader(msg.Config), uint64(len(msg.Config)), &hvcc, mp4.Context{}) + if err != nil { + return nil, fmt.Errorf("invalid H265 configuration: %w", err) + } + + vps := h265FindNALU(hvcc.NaluArrays, h265.NALUType_VPS_NUT) + sps := h265FindNALU(hvcc.NaluArrays, h265.NALUType_SPS_NUT) + pps := h265FindNALU(hvcc.NaluArrays, h265.NALUType_PPS_NUT) + if vps == nil || sps == nil || pps == nil { + return nil, fmt.Errorf("H265 parameters are missing") + } + + return &format.H265{ + PayloadTyp: 96, + VPS: vps, + SPS: sps, + PPS: pps, + }, nil + + case message.FourCCAVC: + var avcc mp4.AVCDecoderConfiguration + avcc.SetType(mp4.BoxTypeAvcC()) + _, err := mp4.Unmarshal(bytes.NewReader(msg.Config), uint64(len(msg.Config)), &avcc, mp4.Context{}) + if err != nil { + return nil, fmt.Errorf("invalid H264 configuration: %w", err) + } + + if len(avcc.SequenceParameterSets) != 1 || len(avcc.PictureParameterSets) != 1 { + return nil, fmt.Errorf("H264 parameters are missing") + } + + return &format.H264{ + PayloadTyp: 96, + SPS: avcc.SequenceParameterSets[0].NALUnit, + PPS: avcc.PictureParameterSets[0].NALUnit, + }, nil + + default: + panic("should not happen") + } +} + +func audioTrackFromExtendedMessages( + start *message.AudioExSequenceStart, + frames *message.AudioExCodedFrames, +) (format.Format, error) { + if start.FourCC != frames.FourCC { + return nil, 
fmt.Errorf("AudioExSequenceStart FourCC and AudioExCodedFrames are different") + } + + switch start.FourCC { + case message.FourCCOpus: + if len(frames.Payload) < 1 { + return nil, fmt.Errorf("invalid Opus frame") + } + + return &format.Opus{ + PayloadTyp: 96, + ChannelCount: int(start.OpusIDHeader.ChannelCount), + }, nil + + case message.FourCCAC3: + if len(frames.Payload) < 6 { + return nil, fmt.Errorf("invalid AC-3 frame") + } + + var syncInfo ac3.SyncInfo + err := syncInfo.Unmarshal(frames.Payload) + if err != nil { + return nil, fmt.Errorf("invalid AC-3 frame: %w", err) + } + + var bsi ac3.BSI + err = bsi.Unmarshal(frames.Payload[5:]) + if err != nil { + return nil, fmt.Errorf("invalid AC-3 frame: %w", err) + } + + return &format.AC3{ + PayloadTyp: 96, + SampleRate: syncInfo.SampleRate(), + ChannelCount: bsi.ChannelCount(), + }, nil + + case message.FourCCMP4A: + return &format.MPEG4Audio{ + PayloadTyp: 96, + Config: start.MP4AudioConfig, + SizeLength: 13, + IndexLength: 3, + IndexDeltaLength: 3, + }, nil + + default: + panic("should not happen") + } +} + +func allTracksSetupped(tracks map[uint8]format.Format) bool { + if len(tracks) == 0 { + return true + } + + for _, track := range tracks { + if track == nil { + return false + } + } + + return true +} + +func tracksFromMetadata(conn *Conn, payload []interface{}) (map[uint8]format.Format, map[uint8]format.Format, error) { if len(payload) != 1 { return nil, nil, fmt.Errorf("invalid metadata") } @@ -168,15 +369,12 @@ func tracksFromMetadata(conn *Conn, payload []interface{}) (format.Format, forma return nil, nil, fmt.Errorf("invalid metadata") } - var videoTrack format.Format - var audioTrack format.Format - hasVideo, err := hasVideo(md) if err != nil { return nil, nil, err } - hasAudio, err := hasAudio(md, &audioTrack) + hasAudio, err := hasAudio(md) if err != nil { return nil, nil, err } @@ -185,15 +383,74 @@ func tracksFromMetadata(conn *Conn, payload []interface{}) (format.Format, forma return nil, nil, nil } + videoTracks := make(map[uint8]format.Format) + if hasVideo { + videoTracks[0] = nil + } + + audioTracks := make(map[uint8]format.Format) + if hasAudio { + audioTracks[0] = nil + } + + handleVideoSequenceStart := func(trackID uint8, msg *message.VideoExSequenceStart) error { + if !hasVideo { + return fmt.Errorf("unexpected video packet") + } + + if videoTracks[trackID] != nil { + return fmt.Errorf("video track %d already setupped", trackID) + } + + videoTracks[trackID], err = videoTrackFromSequenceStart(msg) + if err != nil { + return err + } + + return nil + } + + audioSequenceStarts := make(map[uint8]*message.AudioExSequenceStart) + + handleAudioSequenceStart := func(trackID uint8, msg *message.AudioExSequenceStart) error { + if !hasAudio { + return fmt.Errorf("unexpected audio packet") + } + + if audioSequenceStarts[trackID] != nil { + return fmt.Errorf("audio track %d already setupped", trackID) + } + + audioSequenceStarts[trackID] = msg + return nil + } + + handleAudioCodedFrames := func(trackID uint8, msg *message.AudioExCodedFrames) error { + if !hasAudio { + return fmt.Errorf("unexpected audio packet") + } + + if audioTracks[trackID] != nil { + return nil + } + + sequenceStart := audioSequenceStarts[trackID] + if sequenceStart == nil { + return fmt.Errorf("sequence start of audio track %d not received", trackID) + } + + audioTracks[trackID], err = audioTrackFromExtendedMessages(sequenceStart, msg) + if err != nil { + return err + } + + return nil + } + firstReceived := false var startTime time.Duration for { - if 
(!hasVideo || videoTrack != nil) && - (!hasAudio || audioTrack != nil) { - return videoTrack, audioTrack, nil - } - msg, err := conn.Read() if err != nil { return nil, nil, err @@ -210,178 +467,99 @@ func tracksFromMetadata(conn *Conn, payload []interface{}) (format.Format, forma startTime = msg.DTS } - if videoTrack == nil { - if msg.Type == message.VideoTypeConfig { - videoTrack, err = trackFromH264DecoderConfig(msg.Payload) - if err != nil { - return nil, nil, err - } - - // format used by OBS < 29.1 to publish H265 - } else if msg.Type == message.VideoTypeAU && msg.IsKeyFrame { - var nalus [][]byte - nalus, err = h264.AVCCUnmarshal(msg.Payload) - if err != nil { - if errors.Is(err, h264.ErrAVCCNoNALUs) { - continue - } - return nil, nil, err - } - - var vps []byte - var sps []byte - var pps []byte + if msg.Type == message.VideoTypeConfig { + if videoTracks[0] != nil { + return nil, nil, fmt.Errorf("video track 0 already setupped") + } - for _, nalu := range nalus { - typ := h265.NALUType((nalu[0] >> 1) & 0b111111) + videoTracks[0], err = h264TrackFromConfig(msg.Payload) + if err != nil { + return nil, nil, err + } + } - switch typ { - case h265.NALUType_VPS_NUT: - vps = nalu + case *message.VideoExSequenceStart: + err = handleVideoSequenceStart(0, msg) + if err != nil { + return nil, nil, err + } - case h265.NALUType_SPS_NUT: - sps = nalu + case *message.Audio: + if !hasAudio { + return nil, nil, fmt.Errorf("unexpected audio packet") + } - case h265.NALUType_PPS_NUT: - pps = nalu - } - } + if len(msg.Payload) == 0 { + continue + } - if vps != nil && sps != nil && pps != nil { - videoTrack = &format.H265{ - PayloadTyp: 96, - VPS: vps, - SPS: sps, - PPS: pps, - } - } + if audioTracks[0] == nil { + audioTracks[0], err = audioTrackFromData(msg) + if err != nil { + return nil, nil, err } } - // video was found, but audio was not - if videoTrack != nil && (msg.DTS-startTime) >= analyzePeriod { - return videoTrack, nil, nil + case *message.AudioExSequenceStart: + err := handleAudioSequenceStart(0, msg) + if err != nil { + return nil, nil, err + } + + case *message.AudioExCodedFrames: + err := handleAudioCodedFrames(0, msg) + if err != nil { + return nil, nil, err } - case *message.ExtendedSequenceStart: + case *message.VideoExMultitrack: if !hasVideo { return nil, nil, fmt.Errorf("unexpected video packet") } - if videoTrack == nil { - switch msg.FourCC { - case message.FourCCHEVC: - var hvcc mp4.HvcC - _, err = mp4.Unmarshal(bytes.NewReader(msg.Config), uint64(len(msg.Config)), &hvcc, mp4.Context{}) - if err != nil { - return nil, nil, fmt.Errorf("invalid H265 configuration: %w", err) - } - - vps := h265FindNALU(hvcc.NaluArrays, h265.NALUType_VPS_NUT) - sps := h265FindNALU(hvcc.NaluArrays, h265.NALUType_SPS_NUT) - pps := h265FindNALU(hvcc.NaluArrays, h265.NALUType_PPS_NUT) - if vps == nil || sps == nil || pps == nil { - return nil, nil, fmt.Errorf("H265 parameters are missing") - } - - videoTrack = &format.H265{ - PayloadTyp: 96, - VPS: vps, - SPS: sps, - PPS: pps, - } - - case message.FourCCAV1: - var av1c mp4.Av1C - _, err = mp4.Unmarshal(bytes.NewReader(msg.Config), uint64(len(msg.Config)), &av1c, mp4.Context{}) - if err != nil { - return nil, nil, fmt.Errorf("invalid AV1 configuration: %w", err) - } - - // parse sequence header and metadata contained in ConfigOBUs, but do not use them - _, err = av1.BitstreamUnmarshal(av1c.ConfigOBUs, false) - if err != nil { - return nil, nil, fmt.Errorf("invalid AV1 configuration: %w", err) - } - - videoTrack = &format.AV1{ - PayloadTyp: 96, - } - - 
default: // VP9 - var vpcc mp4.VpcC - _, err = mp4.Unmarshal(bytes.NewReader(msg.Config), uint64(len(msg.Config)), &vpcc, mp4.Context{}) - if err != nil { - return nil, nil, fmt.Errorf("invalid VP9 configuration: %w", err) - } + if _, ok := videoTracks[msg.TrackID]; !ok { + videoTracks[msg.TrackID] = nil + } - videoTrack = &format.VP9{ - PayloadTyp: 96, - } + if wmsg, ok := msg.Wrapped.(*message.VideoExSequenceStart); ok { + err := handleVideoSequenceStart(msg.TrackID, wmsg) + if err != nil { + return nil, nil, err } } - case *message.Audio: + case *message.AudioExMultitrack: if !hasAudio { return nil, nil, fmt.Errorf("unexpected audio packet") } - if audioTrack == nil { - if len(msg.Payload) == 0 { - continue + if _, ok := audioTracks[msg.TrackID]; !ok { + audioTracks[msg.TrackID] = nil + } + + switch wmsg := msg.Wrapped.(type) { + case *message.AudioExSequenceStart: + err := handleAudioSequenceStart(msg.TrackID, wmsg) + if err != nil { + return nil, nil, err } - switch { - case msg.Codec == message.CodecMPEG4Audio && - msg.AACType == message.AudioAACTypeConfig: - audioTrack, err = trackFromAACDecoderConfig(msg.Payload) - if err != nil { - return nil, nil, err - } - case msg.Codec == message.CodecPCMA: - audioTrack = &format.G711{ - PayloadTyp: 8, - MULaw: false, - SampleRate: 8000, - ChannelCount: func() int { - if msg.IsStereo { - return 2 - } - return 1 - }(), - } + case *message.AudioExCodedFrames: + err := handleAudioCodedFrames(msg.TrackID, wmsg) + if err != nil { + return nil, nil, err + } + } + } - case msg.Codec == message.CodecPCMU: - audioTrack = &format.G711{ - PayloadTyp: 0, - MULaw: true, - SampleRate: 8000, - ChannelCount: func() int { - if msg.IsStereo { - return 2 - } - return 1 - }(), - } + if allTracksSetupped(videoTracks) && allTracksSetupped(audioTracks) { + return videoTracks, audioTracks, nil + } - case msg.Codec == message.CodecLPCM: - audioTrack = &format.LPCM{ - PayloadTyp: 96, - BitDepth: func() int { - if msg.Depth == message.Depth16 { - return 16 - } - return 8 - }(), - SampleRate: audioRateRTMPToInt(msg.Rate), - ChannelCount: func() int { - if msg.IsStereo { - return 2 - } - return 1 - }(), - } - } + // video was found but audio was not + if msg, ok := msg.(*message.Video); ok { + if len(videoTracks) == 1 && videoTracks[0] != nil && (msg.DTS-startTime) >= analyzePeriod { + return videoTracks, nil, nil } } } @@ -405,13 +583,13 @@ outer: if msg.Type == message.VideoTypeConfig { if videoTrack == nil { var err error - videoTrack, err = trackFromH264DecoderConfig(msg.Payload) + videoTrack, err = h264TrackFromConfig(msg.Payload) if err != nil { return nil, nil, err } // stop the analysis if both tracks are found - if videoTrack != nil && audioTrack != nil { + if audioTrack != nil { return videoTrack, audioTrack, nil } } @@ -430,13 +608,13 @@ outer: if msg.AACType == message.AudioAACTypeConfig { if audioTrack == nil { var err error - audioTrack, err = trackFromAACDecoderConfig(msg.Payload) + audioTrack, err = mpeg4AudioTrackFromConfig(msg.Payload) if err != nil { return nil, nil, err } // stop the analysis if both tracks are found - if videoTrack != nil && audioTrack != nil { + if videoTrack != nil { return videoTrack, audioTrack, nil } } @@ -464,10 +642,10 @@ outer: // Reader is a wrapper around Conn that provides utilities to demux incoming data. 
type Reader struct { conn *Conn - videoTrack format.Format - audioTrack format.Format - onDataVideo func(message.Message) error - onDataAudio func(*message.Audio) error + videoTracks map[uint8]format.Format + audioTracks map[uint8]format.Format + onVideoData map[uint8]func(message.Message) error + onAudioData map[uint8]func(message.Message) error } // NewReader allocates a Reader. @@ -477,15 +655,18 @@ func NewReader(conn *Conn) (*Reader, error) { } var err error - r.videoTrack, r.audioTrack, err = r.readTracks() + r.videoTracks, r.audioTracks, err = r.readTracks() if err != nil { return nil, err } + r.onVideoData = make(map[uint8]func(message.Message) error) + r.onAudioData = make(map[uint8]func(message.Message) error) + return r, nil } -func (r *Reader) readTracks() (format.Format, format.Format, error) { +func (r *Reader) readTracks() (map[uint8]format.Format, map[uint8]format.Format, error) { for { msg, err := r.conn.Read() if err != nil { @@ -518,68 +699,117 @@ func (r *Reader) readTracks() (format.Format, format.Format, error) { if len(payload) >= 1 { if s, ok := payload[0].(string); ok && s == "onMetaData" { - videoTrack, audioTrack, err := tracksFromMetadata(r.conn, payload[1:]) + var videoTracks map[uint8]format.Format + var audioTracks map[uint8]format.Format + videoTracks, audioTracks, err = tracksFromMetadata(r.conn, payload[1:]) if err != nil { return nil, nil, err } - if videoTrack != nil || audioTrack != nil { - return videoTrack, audioTrack, nil + if len(videoTracks) != 0 || len(audioTracks) != 0 { + return videoTracks, audioTracks, nil } } } } - return tracksFromMessages(r.conn, msg) + videoTrack, audioTrack, err := tracksFromMessages(r.conn, msg) + if err != nil { + return nil, nil, err + } + + videoTracks := make(map[uint8]format.Format) + if videoTrack != nil { + videoTracks[0] = videoTrack + } + + audioTracks := make(map[uint8]format.Format) + if audioTrack != nil { + audioTracks[0] = audioTrack + } + + return videoTracks, audioTracks, nil } } // Tracks returns detected tracks -func (r *Reader) Tracks() (format.Format, format.Format) { - return r.videoTrack, r.audioTrack +func (r *Reader) Tracks() []format.Format { + ret := make([]format.Format, len(r.videoTracks)+len(r.audioTracks)) + i := 0 + + for _, track := range r.videoTracks { + ret[i] = track + i++ + } + for _, track := range r.audioTracks { + ret[i] = track + i++ + } + + return ret +} + +func (r *Reader) videoTrackID(t format.Format) uint8 { + for id, track := range r.videoTracks { + if track == t { + return id + } + } + return 255 +} + +func (r *Reader) audioTrackID(t format.Format) uint8 { + for id, track := range r.audioTracks { + if track == t { + return id + } + } + return 255 } // OnDataAV1 sets a callback that is called when AV1 data is received. 
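// The OnDataXxx methods below look up the destination track ID by comparing
// interface values, so callers must pass the exact format instance returned by
// Tracks() (pointer identity), with 255 acting as a "not found" sentinel. A
// minimal usage sketch, assuming it lives in the same package as the Reader
// and that the stream declares one H264 track; readH264 is a hypothetical
// wrapper.
func readH264(conn *Conn) error {
	r, err := NewReader(conn)
	if err != nil {
		return err
	}

	for _, track := range r.Tracks() {
		if h264Track, ok := track.(*format.H264); ok {
			// same pointer stored by readTracks(), so videoTrackID() can find it
			r.OnDataH264(h264Track, func(pts time.Duration, au [][]byte) {
				fmt.Printf("H264 access unit at %v, %d NALUs\n", pts, len(au))
			})
		}
	}

	for {
		if err := r.Read(); err != nil {
			return err
		}
	}
}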
-func (r *Reader) OnDataAV1(cb OnDataAV1Func) { - r.onDataVideo = func(msg message.Message) error { - if msg, ok := msg.(*message.ExtendedCodedFrames); ok { +func (r *Reader) OnDataAV1(track *format.AV1, cb OnDataAV1Func) { + r.onVideoData[r.videoTrackID(track)] = func(msg message.Message) error { + switch msg := msg.(type) { + case *message.VideoExFramesX: tu, err := av1.BitstreamUnmarshal(msg.Payload, true) if err != nil { return fmt.Errorf("unable to decode bitstream: %w", err) } cb(msg.DTS, tu) + + case *message.VideoExCodedFrames: + tu, err := av1.BitstreamUnmarshal(msg.Payload, true) + if err != nil { + return fmt.Errorf("unable to decode bitstream: %w", err) + } + + cb(msg.DTS+msg.PTSDelta, tu) } return nil } } // OnDataVP9 sets a callback that is called when VP9 data is received. -func (r *Reader) OnDataVP9(cb OnDataVP9Func) { - r.onDataVideo = func(msg message.Message) error { - if msg, ok := msg.(*message.ExtendedCodedFrames); ok { +func (r *Reader) OnDataVP9(track *format.VP9, cb OnDataVP9Func) { + r.onVideoData[r.videoTrackID(track)] = func(msg message.Message) error { + switch msg := msg.(type) { + case *message.VideoExFramesX: cb(msg.DTS, msg.Payload) + + case *message.VideoExCodedFrames: + cb(msg.DTS+msg.PTSDelta, msg.Payload) } return nil } } // OnDataH265 sets a callback that is called when H265 data is received. -func (r *Reader) OnDataH265(cb OnDataH26xFunc) { - r.onDataVideo = func(msg message.Message) error { +func (r *Reader) OnDataH265(track *format.H265, cb OnDataH26xFunc) { + r.onVideoData[r.videoTrackID(track)] = func(msg message.Message) error { switch msg := msg.(type) { - case *message.Video: - au, err := h264.AVCCUnmarshal(msg.Payload) - if err != nil { - if errors.Is(err, h264.ErrAVCCNoNALUs) { - return nil - } - return fmt.Errorf("unable to decode AVCC: %w", err) - } - - cb(msg.DTS+msg.PTSDelta, au) - - case *message.ExtendedFramesX: + case *message.VideoExFramesX: au, err := h264.AVCCUnmarshal(msg.Payload) if err != nil { if errors.Is(err, h264.ErrAVCCNoNALUs) { @@ -590,7 +820,7 @@ func (r *Reader) OnDataH265(cb OnDataH26xFunc) { cb(msg.DTS, au) - case *message.ExtendedCodedFrames: + case *message.VideoExCodedFrames: au, err := h264.AVCCUnmarshal(msg.Payload) if err != nil { if errors.Is(err, h264.ErrAVCCNoNALUs) { @@ -601,15 +831,15 @@ func (r *Reader) OnDataH265(cb OnDataH26xFunc) { cb(msg.DTS+msg.PTSDelta, au) } - return nil } } // OnDataH264 sets a callback that is called when H264 data is received. 
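// In the Enhanced-RTMP messages handled above, VideoExCodedFrames carries an
// explicit composition-time offset (PTSDelta) while VideoExFramesX does not,
// which is why the callbacks pass msg.DTS+msg.PTSDelta for the former and
// plain msg.DTS for the latter. A hypothetical helper (videoPTS) making that
// rule explicit, assuming it sits next to the reader code:
func videoPTS(msg message.Message) time.Duration {
	switch m := msg.(type) {
	case *message.Video:
		return m.DTS + m.PTSDelta
	case *message.VideoExCodedFrames:
		return m.DTS + m.PTSDelta
	case *message.VideoExFramesX:
		return m.DTS // no separate presentation offset
	default:
		return 0
	}
}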
-func (r *Reader) OnDataH264(cb OnDataH26xFunc) { - r.onDataVideo = func(msg message.Message) error { - if msg, ok := msg.(*message.Video); ok { +func (r *Reader) OnDataH264(track *format.H264, cb OnDataH26xFunc) { + r.onVideoData[r.videoTrackID(track)] = func(msg message.Message) error { + switch msg := msg.(type) { + case *message.Video: switch msg.Type { case message.VideoTypeConfig: var conf h264conf.Conf @@ -636,16 +866,55 @@ func (r *Reader) OnDataH264(cb OnDataH26xFunc) { cb(msg.DTS+msg.PTSDelta, au) } + + return nil + + case *message.VideoExFramesX: + au, err := h264.AVCCUnmarshal(msg.Payload) + if err != nil { + if errors.Is(err, h264.ErrAVCCNoNALUs) { + return nil + } + return fmt.Errorf("unable to decode AVCC: %w", err) + } + + cb(msg.DTS, au) + + case *message.VideoExCodedFrames: + au, err := h264.AVCCUnmarshal(msg.Payload) + if err != nil { + if errors.Is(err, h264.ErrAVCCNoNALUs) { + return nil + } + return fmt.Errorf("unable to decode AVCC: %w", err) + } + + cb(msg.DTS+msg.PTSDelta, au) } + return nil + } +} +// OnDataOpus sets a callback that is called when Opus data is received. +func (r *Reader) OnDataOpus(track *format.Opus, cb OnDataOpusFunc) { + r.onAudioData[r.audioTrackID(track)] = func(msg message.Message) error { + if msg, ok := msg.(*message.AudioExCodedFrames); ok { + cb(msg.DTS, msg.Payload) + } return nil } } // OnDataMPEG4Audio sets a callback that is called when MPEG-4 Audio data is received. -func (r *Reader) OnDataMPEG4Audio(cb OnDataMPEG4AudioFunc) { - r.onDataAudio = func(msg *message.Audio) error { - if msg.AACType == message.AudioAACTypeAU { +func (r *Reader) OnDataMPEG4Audio(track *format.MPEG4Audio, cb OnDataMPEG4AudioFunc) { + r.onAudioData[r.audioTrackID(track)] = func(msg message.Message) error { + switch msg := msg.(type) { + case *message.Audio: + if msg.AACType == message.AudioAACTypeAU { + cb(msg.DTS, msg.Payload) + } + + case *message.AudioExCodedFrames: cb(msg.DTS, msg.Payload) } return nil @@ -653,43 +922,65 @@ func (r *Reader) OnDataMPEG4Audio(cb OnDataMPEG4AudioFunc) { } // OnDataMPEG1Audio sets a callback that is called when MPEG-1 Audio data is received. -func (r *Reader) OnDataMPEG1Audio(cb OnDataMPEG1AudioFunc) { - r.onDataAudio = func(msg *message.Audio) error { - cb(msg.DTS, msg.Payload) +func (r *Reader) OnDataMPEG1Audio(track *format.MPEG1Audio, cb OnDataMPEG1AudioFunc) { + r.onAudioData[r.audioTrackID(track)] = func(msg message.Message) error { + switch msg := msg.(type) { + case *message.Audio: + cb(msg.DTS, msg.Payload) + + case *message.AudioExCodedFrames: + cb(msg.DTS, msg.Payload) + } + return nil + } +} + +// OnDataAC3 sets a callback that is called when AC-3 data is received. +func (r *Reader) OnDataAC3(track *format.AC3, cb OnDataAC3Func) { + r.onAudioData[r.audioTrackID(track)] = func(msg message.Message) error { + if msg, ok := msg.(*message.AudioExCodedFrames); ok { + cb(msg.DTS, msg.Payload) + } return nil } } // OnDataG711 sets a callback that is called when G711 data is received. -func (r *Reader) OnDataG711(cb OnDataG711Func) { - r.onDataAudio = func(msg *message.Audio) error { - cb(msg.DTS, msg.Payload) +func (r *Reader) OnDataG711(track *format.G711, cb OnDataG711Func) { + r.onAudioData[r.audioTrackID(track)] = func(msg message.Message) error { + if msg, ok := msg.(*message.Audio); ok { + cb(msg.DTS, msg.Payload) + } return nil } } // OnDataLPCM sets a callback that is called when LPCM data is received. 
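// The H264/H265 callbacks above tolerate AVCC payloads that contain no NAL
// units (some encoders emit them), which is why h264.ErrAVCCNoNALUs is
// swallowed instead of aborting the read loop. The recurring pattern,
// condensed into a hypothetical handler assumed to live in the same package:
func handleAVCC(msg *message.VideoExCodedFrames, cb OnDataH26xFunc) error {
	au, err := h264.AVCCUnmarshal(msg.Payload)
	if err != nil {
		if errors.Is(err, h264.ErrAVCCNoNALUs) {
			return nil // empty access unit: ignore and keep reading
		}
		return fmt.Errorf("unable to decode AVCC: %w", err)
	}

	// one [][]byte entry per NAL unit, ready for the callback
	cb(msg.DTS+msg.PTSDelta, au)
	return nil
}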
-func (r *Reader) OnDataLPCM(cb OnDataLPCMFunc) { - bitDepth := r.audioTrack.(*format.LPCM).BitDepth +func (r *Reader) OnDataLPCM(track *format.LPCM, cb OnDataLPCMFunc) { + bitDepth := track.BitDepth if bitDepth == 16 { - r.onDataAudio = func(msg *message.Audio) error { - le := len(msg.Payload) - if le%2 != 0 { - return fmt.Errorf("invalid payload length: %d", le) - } + r.onAudioData[r.audioTrackID(track)] = func(msg message.Message) error { + if msg, ok := msg.(*message.Audio); ok { + le := len(msg.Payload) + if le%2 != 0 { + return fmt.Errorf("invalid payload length: %d", le) + } - // convert from little endian to big endian - for i := 0; i < le; i += 2 { - msg.Payload[i], msg.Payload[i+1] = msg.Payload[i+1], msg.Payload[i] - } + // convert from little endian to big endian + for i := 0; i < le; i += 2 { + msg.Payload[i], msg.Payload[i+1] = msg.Payload[i+1], msg.Payload[i] + } - cb(msg.DTS, msg.Payload) + cb(msg.DTS, msg.Payload) + } return nil } } else { - r.onDataAudio = func(msg *message.Audio) error { - cb(msg.DTS, msg.Payload) + r.onAudioData[r.audioTrackID(track)] = func(msg message.Message) error { + if msg, ok := msg.(*message.Audio); ok { + cb(msg.DTS, msg.Payload) + } return nil } } @@ -703,19 +994,38 @@ func (r *Reader) Read() error { } switch msg := msg.(type) { - case *message.Video, *message.ExtendedFramesX, *message.ExtendedCodedFrames: - if r.onDataVideo == nil { - return fmt.Errorf("received a video packet, but track is not set up") + case *message.Video, *message.VideoExCodedFrames, *message.VideoExFramesX: + if r.videoTracks[0] == nil { + return fmt.Errorf("received a packet for video track 0, but track is not set up") } - return r.onDataVideo(msg) + return r.onVideoData[0](msg) - case *message.Audio: - if r.onDataAudio == nil { - return fmt.Errorf("received an audio packet, but track is not set up") + case *message.Audio, *message.AudioExCodedFrames: + if r.audioTracks[0] == nil { + return fmt.Errorf("received a packet for audio track 0, but track is not set up") } - return r.onDataAudio(msg) + return r.onAudioData[0](msg) + + case *message.VideoExMultitrack: + switch wmsg := msg.Wrapped.(type) { + case *message.VideoExCodedFrames, *message.VideoExFramesX: + if r.videoTracks[msg.TrackID] == nil { + return fmt.Errorf("received a packet for video track %d, but track is not set up", msg.TrackID) + } + + return r.onVideoData[msg.TrackID](wmsg) + } + + case *message.AudioExMultitrack: + if wmsg, ok := msg.Wrapped.(*message.AudioExCodedFrames); ok { + if r.audioTracks[msg.TrackID] == nil { + return fmt.Errorf("received a packet for audio track %d, but track is not set up", msg.TrackID) + } + + return r.onAudioData[msg.TrackID](wmsg) + } } return nil diff --git a/internal/protocols/rtmp/reader_test.go b/internal/protocols/rtmp/reader_test.go index f5a2e9a5c5d8..51f45d8a04fb 100644 --- a/internal/protocols/rtmp/reader_test.go +++ b/internal/protocols/rtmp/reader_test.go @@ -7,7 +7,6 @@ import ( "github.com/abema/go-mp4" "github.com/bluenviron/gortsplib/v4/pkg/format" - "github.com/bluenviron/mediacommon/pkg/codecs/h264" "github.com/bluenviron/mediacommon/pkg/codecs/h265" "github.com/bluenviron/mediacommon/pkg/codecs/mpeg4audio" "github.com/stretchr/testify/require" @@ -73,29 +72,30 @@ func TestReadTracks(t *testing.T) { } for _, ca := range []struct { - name string - videoTrack format.Format - audioTrack format.Format - messages []message.Message + name string + tracks []format.Format + messages []message.Message }{ { "h264 + aac", - &format.H264{ - PayloadTyp: 96, - SPS: 
test.FormatH264.SPS, - PPS: test.FormatH264.PPS, - PacketizationMode: 1, - }, - &format.MPEG4Audio{ - PayloadTyp: 96, - Config: &mpeg4audio.Config{ - Type: 2, - SampleRate: 44100, - ChannelCount: 2, + []format.Format{ + &format.H264{ + PayloadTyp: 96, + SPS: test.FormatH264.SPS, + PPS: test.FormatH264.PPS, + PacketizationMode: 1, + }, + &format.MPEG4Audio{ + PayloadTyp: 96, + Config: &mpeg4audio.Config{ + Type: 2, + SampleRate: 44100, + ChannelCount: 2, + }, + SizeLength: 13, + IndexLength: 3, + IndexDeltaLength: 3, }, - SizeLength: 13, - IndexLength: 3, - IndexDeltaLength: 3, }, []message.Message{ &message.DataAMF0{ @@ -160,13 +160,14 @@ func TestReadTracks(t *testing.T) { }, { "h264", - &format.H264{ - PayloadTyp: 96, - SPS: test.FormatH264.SPS, - PPS: test.FormatH264.PPS, - PacketizationMode: 1, + []format.Format{ + &format.H264{ + PayloadTyp: 96, + SPS: test.FormatH264.SPS, + PPS: test.FormatH264.PPS, + PacketizationMode: 1, + }, }, - nil, []message.Message{ &message.DataAMF0{ ChunkStreamID: 4, @@ -212,22 +213,24 @@ func TestReadTracks(t *testing.T) { }, { "h264 + aac, issue mediamtx/386 (missing metadata)", - &format.H264{ - PayloadTyp: 96, - SPS: test.FormatH264.SPS, - PPS: test.FormatH264.PPS, - PacketizationMode: 1, - }, - &format.MPEG4Audio{ - PayloadTyp: 96, - Config: &mpeg4audio.Config{ - Type: 2, - SampleRate: 44100, - ChannelCount: 2, + []format.Format{ + &format.H264{ + PayloadTyp: 96, + SPS: test.FormatH264.SPS, + PPS: test.FormatH264.PPS, + PacketizationMode: 1, + }, + &format.MPEG4Audio{ + PayloadTyp: 96, + Config: &mpeg4audio.Config{ + Type: 2, + SampleRate: 44100, + ChannelCount: 2, + }, + SizeLength: 13, + IndexLength: 3, + IndexDeltaLength: 3, }, - SizeLength: 13, - IndexLength: 3, - IndexDeltaLength: 3, }, []message.Message{ &message.Video{ @@ -266,22 +269,24 @@ func TestReadTracks(t *testing.T) { }, { "h264 + aac, issue mediamtx/3301 (metadata without tracks)", - &format.H264{ - PayloadTyp: 96, - SPS: test.FormatH264.SPS, - PPS: test.FormatH264.PPS, - PacketizationMode: 1, - }, - &format.MPEG4Audio{ - PayloadTyp: 96, - Config: &mpeg4audio.Config{ - Type: 2, - SampleRate: 44100, - ChannelCount: 2, + []format.Format{ + &format.H264{ + PayloadTyp: 96, + SPS: test.FormatH264.SPS, + PPS: test.FormatH264.PPS, + PacketizationMode: 1, + }, + &format.MPEG4Audio{ + PayloadTyp: 96, + Config: &mpeg4audio.Config{ + Type: 2, + SampleRate: 44100, + ChannelCount: 2, + }, + SizeLength: 13, + IndexLength: 3, + IndexDeltaLength: 3, }, - SizeLength: 13, - IndexLength: 3, - IndexDeltaLength: 3, }, []message.Message{ &message.DataAMF0{ @@ -338,17 +343,18 @@ func TestReadTracks(t *testing.T) { }, { "aac, issue mediamtx/386 (missing metadata)", - nil, - &format.MPEG4Audio{ - PayloadTyp: 96, - Config: &mpeg4audio.Config{ - Type: 2, - SampleRate: 44100, - ChannelCount: 2, + []format.Format{ + &format.MPEG4Audio{ + PayloadTyp: 96, + Config: &mpeg4audio.Config{ + Type: 2, + SampleRate: 44100, + ChannelCount: 2, + }, + SizeLength: 13, + IndexLength: 3, + IndexDeltaLength: 3, }, - SizeLength: 13, - IndexLength: 3, - IndexDeltaLength: 3, }, []message.Message{ &message.Audio{ @@ -392,17 +398,18 @@ func TestReadTracks(t *testing.T) { }, { "aac, issue mediamtx/3414 (empty audio payload)", - nil, - &format.MPEG4Audio{ - PayloadTyp: 96, - Config: &mpeg4audio.Config{ - Type: 2, - SampleRate: 44100, - ChannelCount: 2, + []format.Format{ + &format.MPEG4Audio{ + PayloadTyp: 96, + Config: &mpeg4audio.Config{ + Type: 2, + SampleRate: 44100, + ChannelCount: 2, + }, + SizeLength: 13, + IndexLength: 3, + 
IndexDeltaLength: 3, }, - SizeLength: 13, - IndexLength: 3, - IndexDeltaLength: 3, }, []message.Message{ &message.DataAMF0{ @@ -461,24 +468,26 @@ func TestReadTracks(t *testing.T) { }, }, }, - { + /*{ "h265 + aac, obs studio pre 29.1 h265", - &format.H265{ - PayloadTyp: 96, - VPS: test.FormatH265.VPS, - SPS: test.FormatH265.SPS, - PPS: test.FormatH265.PPS, - }, - &format.MPEG4Audio{ - PayloadTyp: 96, - Config: &mpeg4audio.Config{ - Type: 2, - SampleRate: 44100, - ChannelCount: 2, + []format.Format{ + &format.H265{ + PayloadTyp: 96, + VPS: test.FormatH265.VPS, + SPS: test.FormatH265.SPS, + PPS: test.FormatH265.PPS, + }, + &format.MPEG4Audio{ + PayloadTyp: 96, + Config: &mpeg4audio.Config{ + Type: 2, + SampleRate: 44100, + ChannelCount: 2, + }, + SizeLength: 13, + IndexLength: 3, + IndexDeltaLength: 3, }, - SizeLength: 13, - IndexLength: 3, - IndexDeltaLength: 3, }, []message.Message{ &message.DataAMF0{ @@ -542,16 +551,17 @@ func TestReadTracks(t *testing.T) { }(), }, }, - }, + },*/ { "h265, issue mediamtx/2232 (xsplit broadcaster)", - &format.H265{ - PayloadTyp: 96, - VPS: test.FormatH265.VPS, - SPS: test.FormatH265.SPS, - PPS: test.FormatH265.PPS, + []format.Format{ + &format.H265{ + PayloadTyp: 96, + VPS: test.FormatH265.VPS, + SPS: test.FormatH265.SPS, + PPS: test.FormatH265.PPS, + }, }, - nil, []message.Message{ &message.DataAMF0{ ChunkStreamID: 4, @@ -579,7 +589,7 @@ func TestReadTracks(t *testing.T) { }, }, }, - &message.ExtendedSequenceStart{ + &message.VideoExSequenceStart{ ChunkStreamID: 4, MessageStreamID: 0x1000000, FourCC: message.FourCCHEVC, @@ -594,13 +604,14 @@ func TestReadTracks(t *testing.T) { }, { "h265, obs 30.0", - &format.H265{ - PayloadTyp: 96, - VPS: test.FormatH265.VPS, - SPS: test.FormatH265.SPS, - PPS: test.FormatH265.PPS, + []format.Format{ + &format.H265{ + PayloadTyp: 96, + VPS: test.FormatH265.VPS, + SPS: test.FormatH265.SPS, + PPS: test.FormatH265.PPS, + }, }, - nil, []message.Message{ &message.DataAMF0{ ChunkStreamID: 4, @@ -628,7 +639,7 @@ func TestReadTracks(t *testing.T) { }, }, }, - &message.ExtendedSequenceStart{ + &message.VideoExSequenceStart{ ChunkStreamID: 4, MessageStreamID: 0x1000000, FourCC: message.FourCCHEVC, @@ -643,10 +654,11 @@ func TestReadTracks(t *testing.T) { }, { "av1, ffmpeg", - &format.AV1{ - PayloadTyp: 96, + []format.Format{ + &format.AV1{ + PayloadTyp: 96, + }, }, - nil, []message.Message{ &message.DataAMF0{ ChunkStreamID: 4, @@ -690,7 +702,7 @@ func TestReadTracks(t *testing.T) { }, }, }, - &message.ExtendedSequenceStart{ + &message.VideoExSequenceStart{ ChunkStreamID: 6, MessageStreamID: 0x1000000, FourCC: message.FourCCAV1, @@ -704,27 +716,29 @@ func TestReadTracks(t *testing.T) { }, { "h264 + aac, issue mediamtx/2289 (missing videocodecid)", - &format.H264{ - PayloadTyp: 96, - SPS: []byte{ - 0x67, 0x64, 0x00, 0x1f, 0xac, 0x2c, 0x6a, 0x81, - 0x40, 0x16, 0xe9, 0xb8, 0x28, 0x08, 0x2a, 0x00, - 0x00, 0x03, 0x00, 0x02, 0x00, 0x00, 0x03, 0x00, - 0xc9, 0x08, - }, - PPS: []byte{0x68, 0xee, 0x31, 0xb2, 0x1b}, - PacketizationMode: 1, - }, - &format.MPEG4Audio{ - PayloadTyp: 96, - Config: &mpeg4audio.Config{ - Type: 2, - SampleRate: 48000, - ChannelCount: 1, + []format.Format{ + &format.H264{ + PayloadTyp: 96, + SPS: []byte{ + 0x67, 0x64, 0x00, 0x1f, 0xac, 0x2c, 0x6a, 0x81, + 0x40, 0x16, 0xe9, 0xb8, 0x28, 0x08, 0x2a, 0x00, + 0x00, 0x03, 0x00, 0x02, 0x00, 0x00, 0x03, 0x00, + 0xc9, 0x08, + }, + PPS: []byte{0x68, 0xee, 0x31, 0xb2, 0x1b}, + PacketizationMode: 1, + }, + &format.MPEG4Audio{ + PayloadTyp: 96, + Config: &mpeg4audio.Config{ + Type: 
2, + SampleRate: 48000, + ChannelCount: 1, + }, + SizeLength: 13, + IndexLength: 3, + IndexDeltaLength: 3, }, - SizeLength: 13, - IndexLength: 3, - IndexDeltaLength: 3, }, []message.Message{ &message.DataAMF0{ @@ -780,13 +794,14 @@ func TestReadTracks(t *testing.T) { }, { "h264, issue mediamtx/2352", - &format.H264{ - PayloadTyp: 96, - SPS: test.FormatH264.SPS, - PPS: test.FormatH264.PPS, - PacketizationMode: 1, + []format.Format{ + &format.H264{ + PayloadTyp: 96, + SPS: test.FormatH264.SPS, + PPS: test.FormatH264.PPS, + PacketizationMode: 1, + }, }, - nil, []message.Message{ &message.DataAMF0{ ChunkStreamID: 8, @@ -845,6 +860,7 @@ func TestReadTracks(t *testing.T) { MessageStreamID: 0x1000000, Codec: 0x7, IsKeyFrame: true, + Type: message.VideoTypeAU, Payload: []uint8{ 5, }, @@ -854,6 +870,7 @@ func TestReadTracks(t *testing.T) { MessageStreamID: 0x1000000, Codec: 0x7, IsKeyFrame: true, + Type: message.VideoTypeAU, DTS: 2 * time.Second, Payload: []uint8{ 5, @@ -863,8 +880,9 @@ func TestReadTracks(t *testing.T) { }, { "mpeg-1 audio", - nil, - &format.MPEG1Audio{}, + []format.Format{ + &format.MPEG1Audio{}, + }, []message.Message{ &message.DataAMF0{ ChunkStreamID: 4, @@ -880,16 +898,26 @@ func TestReadTracks(t *testing.T) { }, }, }, + &message.Audio{ + ChunkStreamID: message.AudioChunkStreamID, + MessageStreamID: 0x1000000, + Codec: message.CodecMPEG1Audio, + Rate: message.Rate44100, + Depth: message.Depth16, + IsStereo: false, + Payload: []byte{1, 2, 3, 4}, + }, }, }, { "pcma", - nil, - &format.G711{ - PayloadTyp: 8, - MULaw: false, - SampleRate: 8000, - ChannelCount: 1, + []format.Format{ + &format.G711{ + PayloadTyp: 8, + MULaw: false, + SampleRate: 8000, + ChannelCount: 1, + }, }, []message.Message{ &message.DataAMF0{ @@ -919,12 +947,13 @@ func TestReadTracks(t *testing.T) { }, { "pcmu", - nil, - &format.G711{ - PayloadTyp: 0, - MULaw: true, - SampleRate: 8000, - ChannelCount: 1, + []format.Format{ + &format.G711{ + PayloadTyp: 0, + MULaw: true, + SampleRate: 8000, + ChannelCount: 1, + }, }, []message.Message{ &message.DataAMF0{ @@ -954,12 +983,13 @@ func TestReadTracks(t *testing.T) { }, { "lpcm gstreamer", - nil, - &format.LPCM{ - PayloadTyp: 96, - BitDepth: 16, - SampleRate: 44100, - ChannelCount: 2, + []format.Format{ + &format.LPCM{ + PayloadTyp: 96, + BitDepth: 16, + SampleRate: 44100, + ChannelCount: 2, + }, }, []message.Message{ &message.DataAMF0{ @@ -1001,9 +1031,8 @@ func TestReadTracks(t *testing.T) { r, err := NewReader(c) require.NoError(t, err) - videoTrack, audioTrack := r.Tracks() - require.Equal(t, ca.videoTrack, videoTrack) - require.Equal(t, ca.audioTrack, audioTrack) + tracks := r.Tracks() + require.Equal(t, ca.tracks, tracks) }) } } diff --git a/internal/protocols/rtmp/to_stream.go b/internal/protocols/rtmp/to_stream.go index 4b8f8b0ce930..eba2006e4812 100644 --- a/internal/protocols/rtmp/to_stream.go +++ b/internal/protocols/rtmp/to_stream.go @@ -26,114 +26,177 @@ func durationToTimestamp(d time.Duration, clockRate int) int64 { // ToStream maps a RTMP stream to a MediaMTX stream. 
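// Every detected track below is mapped to its own description.Media carrying a
// single format, and the callback PTS (a time.Duration) is converted into the
// format's clock rate via durationToTimestamp before WriteUnit. The helper's
// body is not shown in this hunk; a sketch assuming it simply scales the
// duration by the clock rate (e.g. 20 ms at 48 kHz -> 960 ticks, 40 ms at
// 90 kHz -> 3600 ticks); the name durationToTimestampSketch is hypothetical.
func durationToTimestampSketch(d time.Duration, clockRate int) int64 {
	return int64(d) * int64(clockRate) / int64(time.Second)
}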
func ToStream(r *Reader, stream **stream.Stream) ([]*description.Media, error) { - videoFormat, audioFormat := r.Tracks() - var medias []*description.Media - if videoFormat != nil { - medi := &description.Media{ - Type: description.MediaTypeVideo, - Formats: []format.Format{videoFormat}, - } - medias = append(medias, medi) + for _, track := range r.Tracks() { + ctrack := track - switch videoFormat.(type) { + switch ttrack := track.(type) { case *format.AV1: - r.OnDataAV1(func(pts time.Duration, tu [][]byte) { - (*stream).WriteUnit(medi, videoFormat, &unit.AV1{ + medi := &description.Media{ + Type: description.MediaTypeVideo, + Formats: []format.Format{ctrack}, + } + medias = append(medias, medi) + + r.OnDataAV1(ttrack, func(pts time.Duration, tu [][]byte) { + (*stream).WriteUnit(medi, ctrack, &unit.AV1{ Base: unit.Base{ NTP: time.Now(), - PTS: durationToTimestamp(pts, videoFormat.ClockRate()), + PTS: durationToTimestamp(pts, ctrack.ClockRate()), }, TU: tu, }) }) case *format.VP9: - r.OnDataVP9(func(pts time.Duration, frame []byte) { - (*stream).WriteUnit(medi, videoFormat, &unit.VP9{ + medi := &description.Media{ + Type: description.MediaTypeVideo, + Formats: []format.Format{ctrack}, + } + medias = append(medias, medi) + + r.OnDataVP9(ttrack, func(pts time.Duration, frame []byte) { + (*stream).WriteUnit(medi, ctrack, &unit.VP9{ Base: unit.Base{ NTP: time.Now(), - PTS: durationToTimestamp(pts, videoFormat.ClockRate()), + PTS: durationToTimestamp(pts, ctrack.ClockRate()), }, Frame: frame, }) }) case *format.H265: - r.OnDataH265(func(pts time.Duration, au [][]byte) { - (*stream).WriteUnit(medi, videoFormat, &unit.H265{ + medi := &description.Media{ + Type: description.MediaTypeVideo, + Formats: []format.Format{ctrack}, + } + medias = append(medias, medi) + + r.OnDataH265(ttrack, func(pts time.Duration, au [][]byte) { + (*stream).WriteUnit(medi, ctrack, &unit.H265{ Base: unit.Base{ NTP: time.Now(), - PTS: durationToTimestamp(pts, videoFormat.ClockRate()), + PTS: durationToTimestamp(pts, ctrack.ClockRate()), }, AU: au, }) }) case *format.H264: - r.OnDataH264(func(pts time.Duration, au [][]byte) { - (*stream).WriteUnit(medi, videoFormat, &unit.H264{ + medi := &description.Media{ + Type: description.MediaTypeVideo, + Formats: []format.Format{ctrack}, + } + medias = append(medias, medi) + + r.OnDataH264(ttrack, func(pts time.Duration, au [][]byte) { + (*stream).WriteUnit(medi, ctrack, &unit.H264{ Base: unit.Base{ NTP: time.Now(), - PTS: durationToTimestamp(pts, videoFormat.ClockRate()), + PTS: durationToTimestamp(pts, ctrack.ClockRate()), }, AU: au, }) }) - default: - panic("should not happen") - } - } + case *format.Opus: + medi := &description.Media{ + Type: description.MediaTypeAudio, + Formats: []format.Format{ctrack}, + } + medias = append(medias, medi) - if audioFormat != nil { - medi := &description.Media{ - Type: description.MediaTypeAudio, - Formats: []format.Format{audioFormat}, - } - medias = append(medias, medi) + r.OnDataOpus(ttrack, func(pts time.Duration, packet []byte) { + (*stream).WriteUnit(medi, ctrack, &unit.Opus{ + Base: unit.Base{ + NTP: time.Now(), + PTS: durationToTimestamp(pts, ctrack.ClockRate()), + }, + Packets: [][]byte{packet}, + }) + }) - switch audioFormat.(type) { case *format.MPEG4Audio: - r.OnDataMPEG4Audio(func(pts time.Duration, au []byte) { - (*stream).WriteUnit(medi, audioFormat, &unit.MPEG4Audio{ + medi := &description.Media{ + Type: description.MediaTypeAudio, + Formats: []format.Format{ctrack}, + } + medias = append(medias, medi) + + 
r.OnDataMPEG4Audio(ttrack, func(pts time.Duration, au []byte) { + (*stream).WriteUnit(medi, ctrack, &unit.MPEG4Audio{ Base: unit.Base{ NTP: time.Now(), - PTS: durationToTimestamp(pts, audioFormat.ClockRate()), + PTS: durationToTimestamp(pts, ctrack.ClockRate()), }, AUs: [][]byte{au}, }) }) case *format.MPEG1Audio: - r.OnDataMPEG1Audio(func(pts time.Duration, frame []byte) { - (*stream).WriteUnit(medi, audioFormat, &unit.MPEG1Audio{ + medi := &description.Media{ + Type: description.MediaTypeAudio, + Formats: []format.Format{ctrack}, + } + medias = append(medias, medi) + + r.OnDataMPEG1Audio(ttrack, func(pts time.Duration, frame []byte) { + (*stream).WriteUnit(medi, ctrack, &unit.MPEG1Audio{ + Base: unit.Base{ + NTP: time.Now(), + PTS: durationToTimestamp(pts, ctrack.ClockRate()), + }, + Frames: [][]byte{frame}, + }) + }) + + case *format.AC3: + medi := &description.Media{ + Type: description.MediaTypeAudio, + Formats: []format.Format{ctrack}, + } + medias = append(medias, medi) + + r.OnDataAC3(ttrack, func(pts time.Duration, frame []byte) { + (*stream).WriteUnit(medi, ctrack, &unit.AC3{ Base: unit.Base{ NTP: time.Now(), - PTS: durationToTimestamp(pts, audioFormat.ClockRate()), + PTS: durationToTimestamp(pts, ctrack.ClockRate()), }, Frames: [][]byte{frame}, }) }) case *format.G711: - r.OnDataG711(func(pts time.Duration, samples []byte) { - (*stream).WriteUnit(medi, audioFormat, &unit.G711{ + medi := &description.Media{ + Type: description.MediaTypeAudio, + Formats: []format.Format{ctrack}, + } + medias = append(medias, medi) + + r.OnDataG711(ttrack, func(pts time.Duration, samples []byte) { + (*stream).WriteUnit(medi, ctrack, &unit.G711{ Base: unit.Base{ NTP: time.Now(), - PTS: durationToTimestamp(pts, audioFormat.ClockRate()), + PTS: durationToTimestamp(pts, ctrack.ClockRate()), }, Samples: samples, }) }) case *format.LPCM: - r.OnDataLPCM(func(pts time.Duration, samples []byte) { - (*stream).WriteUnit(medi, audioFormat, &unit.LPCM{ + medi := &description.Media{ + Type: description.MediaTypeAudio, + Formats: []format.Format{ctrack}, + } + medias = append(medias, medi) + + r.OnDataLPCM(ttrack, func(pts time.Duration, samples []byte) { + (*stream).WriteUnit(medi, ctrack, &unit.LPCM{ Base: unit.Base{ NTP: time.Now(), - PTS: durationToTimestamp(pts, audioFormat.ClockRate()), + PTS: durationToTimestamp(pts, ctrack.ClockRate()), }, Samples: samples, }) diff --git a/internal/servers/rtmp/server_test.go b/internal/servers/rtmp/server_test.go index bdd417692354..97539e8efac5 100644 --- a/internal/servers/rtmp/server_test.go +++ b/internal/servers/rtmp/server_test.go @@ -9,6 +9,7 @@ import ( "time" "github.com/bluenviron/gortsplib/v4/pkg/description" + "github.com/bluenviron/gortsplib/v4/pkg/format" "github.com/bluenviron/mediamtx/internal/auth" "github.com/bluenviron/mediamtx/internal/conf" "github.com/bluenviron/mediamtx/internal/defs" @@ -247,8 +248,8 @@ func TestServerRead(t *testing.T) { r, err := rtmp.NewReader(conn) require.NoError(t, err) - videoTrack, _ := r.Tracks() - require.Equal(t, test.FormatH264, videoTrack) + tracks := r.Tracks() + require.Equal(t, []format.Format{test.FormatH264}, tracks) stream.WaitRunningReader() @@ -261,7 +262,7 @@ func TestServerRead(t *testing.T) { }, }) - r.OnDataH264(func(_ time.Duration, au [][]byte) { + r.OnDataH264(tracks[0].(*format.H264), func(_ time.Duration, au [][]byte) { require.Equal(t, [][]byte{ test.FormatH264.SPS, test.FormatH264.PPS,