Skip to content

Commit

Permalink
ffstream and ffstreamctl both work in a simple test
Browse files Browse the repository at this point in the history
  • Loading branch information
xaionaro committed Jan 5, 2025
1 parent 1e66c40 commit 47c0315
Show file tree
Hide file tree
Showing 17 changed files with 174 additions and 118 deletions.
2 changes: 1 addition & 1 deletion cmd/ffstream/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ func getListener(
parts := strings.SplitN(addr, ":", 2)

if len(parts) == 1 {
return net.Listen("unixpacket", addr)
return net.Listen("unix", addr)
}

switch parts[0] {
Expand Down
2 changes: 1 addition & 1 deletion cmd/ffstreamctl/commands/commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ func flagIntGet(cmd *cobra.Command, args []string) {
value, err := client.GetFlagInt(ctx, flagID)
assertNoError(ctx, err)

fmt.Fprintf(cmd.OutOrStdout(), "%d", value)
fmt.Fprintf(cmd.OutOrStdout(), "%d\n", value)
}

func flagIntSet(cmd *cobra.Command, args []string) {
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,7 @@ require (
github.com/xaionaro-go/gorex v0.0.0-20241010205749-bcd59d639c4d
github.com/xaionaro-go/grpcproxy v0.0.0-20241103205849-a8fef42e72f9
github.com/xaionaro-go/kickcom v0.0.0-20241022142825-25a234cc8628
github.com/xaionaro-go/libsrt v0.0.0-20250105221133-977cb871b11c
github.com/xaionaro-go/libsrt v0.0.0-20250105232601-e760c79b2bc3
github.com/xaionaro-go/lockmap v0.0.0-20240901172806-e17aea364748
github.com/xaionaro-go/mediamtx v0.0.0-20241103200202-882a99e8df73
github.com/xaionaro-go/object v0.0.0-20241026212449-753ce10ec94c
Expand Down
16 changes: 2 additions & 14 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -951,20 +951,8 @@ github.com/xaionaro-go/grpcproxy v0.0.0-20241103205849-a8fef42e72f9 h1:epEpV8x9A
github.com/xaionaro-go/grpcproxy v0.0.0-20241103205849-a8fef42e72f9/go.mod h1:2Q+/D11JlgTmBylj7QPCYsuDfSYYlHs3NnYp3/T9XSs=
github.com/xaionaro-go/kickcom v0.0.0-20241022142825-25a234cc8628 h1:/ohdYrl4nFgEJJTQqP8hSzXqXxyetiB07jnQu5pcvNo=
github.com/xaionaro-go/kickcom v0.0.0-20241022142825-25a234cc8628/go.mod h1:gzKL0qgtR13PXl4woI3nvxVhQ9Z6lHtGL5tk9HmITxA=
github.com/xaionaro-go/libsrt v0.0.0-20250105190022-0cd4121f2c0b h1:nYsbzJE91d+pAbbmKyMehKBZCem+7TlbOgxHT/ntvvQ=
github.com/xaionaro-go/libsrt v0.0.0-20250105190022-0cd4121f2c0b/go.mod h1:3Klw9cShOvz/8l2oZ4rZ3gZzWq/4mU4VHKD/mTs4xDU=
github.com/xaionaro-go/libsrt v0.0.0-20250105192237-23b8975acd17 h1:akn6Rr92LzriuWaiuBoW8NtsKGnSZwmRi6uZsZOob64=
github.com/xaionaro-go/libsrt v0.0.0-20250105192237-23b8975acd17/go.mod h1:3Klw9cShOvz/8l2oZ4rZ3gZzWq/4mU4VHKD/mTs4xDU=
github.com/xaionaro-go/libsrt v0.0.0-20250105201653-27a437d1e25b h1:9aSTOS4XkOnlFX+0mHav+9x1aT4g1RljjwparOya7JQ=
github.com/xaionaro-go/libsrt v0.0.0-20250105201653-27a437d1e25b/go.mod h1:3Klw9cShOvz/8l2oZ4rZ3gZzWq/4mU4VHKD/mTs4xDU=
github.com/xaionaro-go/libsrt v0.0.0-20250105201930-8347ba6331d3 h1:ICP2hgG9tAyoVv2p3CDIbNDprZkGngcLKRD5Pp6zPLs=
github.com/xaionaro-go/libsrt v0.0.0-20250105201930-8347ba6331d3/go.mod h1:3Klw9cShOvz/8l2oZ4rZ3gZzWq/4mU4VHKD/mTs4xDU=
github.com/xaionaro-go/libsrt v0.0.0-20250105220321-234355a2a45a h1:kNZQhBbEhYCcpR131ktRoofQCbelexNs/SHjExaubEA=
github.com/xaionaro-go/libsrt v0.0.0-20250105220321-234355a2a45a/go.mod h1:3Klw9cShOvz/8l2oZ4rZ3gZzWq/4mU4VHKD/mTs4xDU=
github.com/xaionaro-go/libsrt v0.0.0-20250105220714-db4bb349813c h1:+dZZEdl6OQMha/o1aN2KIOfe8WrHgb6ZwqTzM6heTbE=
github.com/xaionaro-go/libsrt v0.0.0-20250105220714-db4bb349813c/go.mod h1:3Klw9cShOvz/8l2oZ4rZ3gZzWq/4mU4VHKD/mTs4xDU=
github.com/xaionaro-go/libsrt v0.0.0-20250105221133-977cb871b11c h1:dz7UpX5njoHIF8KP99Rtp0Tii241MCo3sPVAv72xiGo=
github.com/xaionaro-go/libsrt v0.0.0-20250105221133-977cb871b11c/go.mod h1:3Klw9cShOvz/8l2oZ4rZ3gZzWq/4mU4VHKD/mTs4xDU=
github.com/xaionaro-go/libsrt v0.0.0-20250105232601-e760c79b2bc3 h1:OeqZZ9i3KHaTKOxLtrF7nBkao299F5QK4acl9Nd/GrI=
github.com/xaionaro-go/libsrt v0.0.0-20250105232601-e760c79b2bc3/go.mod h1:7k3ZDcOLrS8ZKOu3P30tWMSno2blKqmH+9pKfDCZ2mE=
github.com/xaionaro-go/libvlc-go/v3 v3.0.0-20241011194409-0fe4e2a9d901 h1:HX0CO6h5oDQfp9NquzQT0xWH4Gn9Z5BZ0IFJrYFl88k=
github.com/xaionaro-go/libvlc-go/v3 v3.0.0-20241011194409-0fe4e2a9d901/go.mod h1:xJK0YD8cyMDejnrTFQinStE6RYCV1nlfS8KmqTpszSc=
github.com/xaionaro-go/lockmap v0.0.0-20240901172806-e17aea364748 h1:SlB3zLAuLgRxdOo250gMUG/7hSiEU2NzEUNYbJDuI2A=
Expand Down
45 changes: 33 additions & 12 deletions pkg/ffstream/encoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"strconv"
"sync"
"time"

"github.com/asticode/go-astiav"
"github.com/facebookincubator/go-belt/tool/logger"
Expand All @@ -16,6 +17,7 @@ type Encoder struct {

Config EncoderConfig

prevEncodeTS time.Time
locker sync.Mutex
inputStreams map[*recoder.Input]map[int]*astiav.Stream
skippedVideoFrame bool
Expand All @@ -25,10 +27,10 @@ type Encoder struct {
var _ recoder.Encoder = (*Encoder)(nil)

type CodecConfig struct {
CodecName string
BitRateAveragerBufferSizeInBits uint64
AverageBitRate uint64
CustomOptions []recoder.CustomOption
CodecName string
AveragingPeriod time.Duration
AverageBitRate uint64
CustomOptions []recoder.CustomOption
}

type EncoderConfig struct {
Expand Down Expand Up @@ -75,9 +77,9 @@ func (e *Encoder) Configure(
}
}

if cfg.Video.AverageBitRate != 0 && cfg.Video.BitRateAveragerBufferSizeInBits == 0 {
cfg.Video.BitRateAveragerBufferSizeInBits = cfg.Video.AverageBitRate * 10
logger.Warnf(ctx, "BitRateAveragerBufferSizeInBits is not set, defaulting to %v", cfg.Video.BitRateAveragerBufferSizeInBits)
if cfg.Video.AverageBitRate != 0 && cfg.Video.AveragingPeriod == 0 {
cfg.Video.AveragingPeriod = time.Second * 10
logger.Warnf(ctx, "AveragingPeriod is not set, defaulting to %v", cfg.Video.AveragingPeriod)
}

e.locker.Lock()
Expand Down Expand Up @@ -143,32 +145,49 @@ func (e *Encoder) encodeVideoPacket(
input.Packet.Pos(), input.Packet.Pts(), input.Packet.Duration(),
)
defer func() { logger.Tracef(ctx, "a video packet: %v", _err) }()
e.FramesRead.VideoUnprocessed.Add(1)

if e.Config.Video.AverageBitRate == 0 {
e.videoAveragerBufferConsumed = 0
e.FramesWrote.VideoUnprocessed.Add(1)
return &recoder.EncoderOutput{
Packet: recoder.ClonePacketAsWritable(input.Packet),
Stream: inputStream,
}, nil
}

pktSize := input.Packet.Size()
now := time.Now()
prevTS := e.prevEncodeTS
e.prevEncodeTS = now

if e.videoAveragerBufferConsumed+int64(pktSize)*8 > int64(e.Config.Audio.BitRateAveragerBufferSizeInBits) {
tsDiff := now.Sub(prevTS)
allowMoreBits := 1 + int64(tsDiff.Seconds()*float64(e.Config.Video.AverageBitRate))

e.videoAveragerBufferConsumed -= allowMoreBits
if e.videoAveragerBufferConsumed < 0 {
e.videoAveragerBufferConsumed = 0
}

pktSize := input.Packet.Size()
averagingBuffer := int64(e.Config.Video.AveragingPeriod.Seconds() * float64(e.Config.Video.AverageBitRate))
consumedWithPacket := e.videoAveragerBufferConsumed + int64(pktSize)*8
if consumedWithPacket > averagingBuffer {
e.skippedVideoFrame = true
logger.Tracef(ctx, "skipping a frame to reduce the bitrate")
logger.Tracef(ctx, "skipping a frame to reduce the bitrate: %d > %d", consumedWithPacket, averagingBuffer)
return nil, nil
}

if e.skippedVideoFrame {
isKeyFrame := input.Packet.Flags().Has(astiav.PacketFlagKey)
if !isKeyFrame {
logger.Tracef(ctx, "skipping a non-key frame")
logger.Tracef(ctx, "skipping a non-key frame (BTW, the consumedWithPacket is %d/%d)", consumedWithPacket, averagingBuffer)
return nil, nil
}
}

e.skippedVideoFrame = false
e.videoAveragerBufferConsumed += int64(pktSize) * 8
e.videoAveragerBufferConsumed = consumedWithPacket
e.FramesWrote.VideoUnprocessed.Add(1)
return &recoder.EncoderOutput{
Packet: recoder.ClonePacketAsWritable(input.Packet),
Stream: inputStream,
Expand All @@ -186,6 +205,8 @@ func (e *Encoder) encodeAudioPacket(
input.Packet.Pos(), input.Packet.Pts(), input.Packet.Dts(), input.Packet.Duration(),
)
defer func() { logger.Tracef(ctx, "an audio packet: %v", _err) }()
e.FramesRead.AudioUnprocessed.Add(1)
e.FramesWrote.AudioUnprocessed.Add(1)
return &recoder.EncoderOutput{
Packet: recoder.ClonePacketAsWritable(input.Packet),
Stream: inputStream,
Expand Down
6 changes: 3 additions & 3 deletions pkg/ffstream/ffstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,15 +83,15 @@ func (s *FFStream) SetEncoderConfig(

func (s *FFStream) GetEncoderStats(
ctx context.Context,
) *recoder.CommonsEncoderStatistics {
) *recoder.EncoderStatistics {
return s.Encoder.GetStats()
}

func (s *FFStream) WithSRTOutput(
_ context.Context,
ctx context.Context,
callback func(*threadsafe.Socket) error,
) error {
sock, err := s.Output.SRT()
sock, err := s.Output.SRT(ctx)
if err != nil {
return fmt.Errorf("unable to get the SRT socket handler: %w", err)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/ffstreamserver/client/ffstream_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ func (c *Client) End(

func (c *Client) GetEncoderStats(
ctx context.Context,
) (*recoder.CommonsEncoderStatistics, error) {
) (*recoder.EncoderStatistics, error) {
client, conn, err := c.grpcClient()
if err != nil {
return nil, err
Expand Down
2 changes: 1 addition & 1 deletion pkg/ffstreamserver/grpc/ffstream.proto
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ message RemoveOutputReply {}

message CodecConfig {
string codecName = 1;
uint64 bitRateAveragerBufferSizeInBits = 2;
uint64 averagingPeriod = 2;
uint64 AverageBitRate = 3;
repeated customOption customOptions = 4;
}
Expand Down
24 changes: 11 additions & 13 deletions pkg/ffstreamserver/grpc/go/ffstream_grpc/ffstream.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 11 additions & 0 deletions pkg/ffstreamserver/grpc/goconv/duration.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package goconv

import "time"

func DurationToGRPC(d time.Duration) int64 {
return d.Nanoseconds()
}

func DurationFromGRPC(d int64) time.Duration {
return time.Nanosecond * time.Duration(d)
}
32 changes: 16 additions & 16 deletions pkg/ffstreamserver/grpc/goconv/encoder_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,16 @@ func EncoderConfigFromGRPC(
) ffstream.EncoderConfig {
return ffstream.EncoderConfig{
Audio: ffstream.CodecConfig{
CodecName: req.GetAudio().GetCodecName(),
BitRateAveragerBufferSizeInBits: req.GetAudio().GetBitRateAveragerBufferSizeInBits(),
AverageBitRate: req.GetAudio().GetAverageBitRate(),
CustomOptions: CustomOptionsFromGRPC(req.GetAudio().GetCustomOptions()),
CodecName: req.GetAudio().GetCodecName(),
AveragingPeriod: DurationFromGRPC(int64(req.GetAudio().GetAveragingPeriod())),
AverageBitRate: req.GetAudio().GetAverageBitRate(),
CustomOptions: CustomOptionsFromGRPC(req.GetAudio().GetCustomOptions()),
},
Video: ffstream.CodecConfig{
CodecName: req.GetVideo().GetCodecName(),
BitRateAveragerBufferSizeInBits: req.GetVideo().GetBitRateAveragerBufferSizeInBits(),
AverageBitRate: req.GetVideo().GetAverageBitRate(),
CustomOptions: CustomOptionsFromGRPC(req.GetVideo().GetCustomOptions()),
CodecName: req.GetVideo().GetCodecName(),
AveragingPeriod: DurationFromGRPC(int64(req.GetVideo().GetAveragingPeriod())),
AverageBitRate: req.GetVideo().GetAverageBitRate(),
CustomOptions: CustomOptionsFromGRPC(req.GetVideo().GetCustomOptions()),
},
}
}
Expand All @@ -29,16 +29,16 @@ func EncoderConfigToGRPC(
) *ffstream_grpc.EncoderConfig {
return &ffstream_grpc.EncoderConfig{
Audio: &ffstream_grpc.CodecConfig{
CodecName: cfg.Audio.CodecName,
BitRateAveragerBufferSizeInBits: cfg.Audio.BitRateAveragerBufferSizeInBits,
AverageBitRate: cfg.Audio.AverageBitRate,
CustomOptions: CustomOptionsToGRPC(cfg.Audio.CustomOptions),
CodecName: cfg.Audio.CodecName,
AveragingPeriod: uint64(DurationToGRPC(cfg.Audio.AveragingPeriod)),
AverageBitRate: cfg.Audio.AverageBitRate,
CustomOptions: CustomOptionsToGRPC(cfg.Audio.CustomOptions),
},
Video: &ffstream_grpc.CodecConfig{
CodecName: cfg.Video.CodecName,
BitRateAveragerBufferSizeInBits: cfg.Video.BitRateAveragerBufferSizeInBits,
AverageBitRate: cfg.Video.AverageBitRate,
CustomOptions: CustomOptionsToGRPC(cfg.Video.CustomOptions),
CodecName: cfg.Video.CodecName,
AveragingPeriod: uint64(DurationToGRPC(cfg.Video.AveragingPeriod)),
AverageBitRate: cfg.Video.AverageBitRate,
CustomOptions: CustomOptionsToGRPC(cfg.Video.CustomOptions),
},
}
}
59 changes: 32 additions & 27 deletions pkg/ffstreamserver/grpc/goconv/encoder_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,39 +6,44 @@ import (
)

func EncoderStatsToGRPC(
req *recoder.CommonsEncoderStatistics,
req *recoder.EncoderStatistics,
) *ffstream_grpc.GetEncoderStatsReply {
return &ffstream_grpc.GetEncoderStatsReply{
BytesCountRead: req.BytesCountRead.Load(),
BytesCountWrote: req.BytesCountWrote.Load(),
FramesReadUnparsed: req.FramesRead.Unparsed.Load(),
FramesReadVideoUnprocessed: req.FramesRead.VideoUnprocessed.Load(),
FramesReadAudioUnprocessed: req.FramesRead.AudioUnprocessed.Load(),
FramesReadVideoProcessed: req.FramesRead.VideoProcessed.Load(),
FramesReadAudioProcessed: req.FramesRead.AudioProcessed.Load(),
FramesWroteUnparsed: req.FramesWrote.Unparsed.Load(),
FramesWroteVideoUnprocessed: req.FramesWrote.VideoUnprocessed.Load(),
FramesWroteAudioUnprocessed: req.FramesWrote.AudioUnprocessed.Load(),
FramesWroteVideoProcessed: req.FramesWrote.VideoProcessed.Load(),
FramesWroteAudioProcessed: req.FramesWrote.AudioProcessed.Load(),
BytesCountRead: req.BytesCountRead,
BytesCountWrote: req.BytesCountWrote,
FramesReadUnparsed: req.FramesRead.Unparsed,
FramesReadVideoUnprocessed: req.FramesRead.VideoUnprocessed,
FramesReadAudioUnprocessed: req.FramesRead.AudioUnprocessed,
FramesReadVideoProcessed: req.FramesRead.VideoProcessed,
FramesReadAudioProcessed: req.FramesRead.AudioProcessed,
FramesWroteUnparsed: req.FramesWrote.Unparsed,
FramesWroteVideoUnprocessed: req.FramesWrote.VideoUnprocessed,
FramesWroteAudioUnprocessed: req.FramesWrote.AudioUnprocessed,
FramesWroteVideoProcessed: req.FramesWrote.VideoProcessed,
FramesWroteAudioProcessed: req.FramesWrote.AudioProcessed,
}
}

func EncoderStatsFromGRPC(
req *ffstream_grpc.GetEncoderStatsReply,
) *recoder.CommonsEncoderStatistics {
result := &recoder.CommonsEncoderStatistics{}
result.BytesCountRead.Store(req.BytesCountRead)
result.BytesCountWrote.Store(req.BytesCountWrote)
result.FramesRead.Unparsed.Store(req.FramesReadUnparsed)
result.FramesRead.VideoUnprocessed.Store(req.FramesReadVideoUnprocessed)
result.FramesRead.AudioUnprocessed.Store(req.FramesReadAudioUnprocessed)
result.FramesRead.VideoProcessed.Store(req.FramesReadVideoProcessed)
result.FramesRead.AudioProcessed.Store(req.FramesReadAudioProcessed)
result.FramesWrote.Unparsed.Store(req.FramesWroteUnparsed)
result.FramesWrote.VideoUnprocessed.Store(req.FramesWroteVideoUnprocessed)
result.FramesWrote.AudioUnprocessed.Store(req.FramesWroteAudioUnprocessed)
result.FramesWrote.VideoProcessed.Store(req.FramesWroteVideoProcessed)
result.FramesWrote.AudioProcessed.Store(req.FramesWroteAudioProcessed)
) *recoder.EncoderStatistics {
result := &recoder.EncoderStatistics{
BytesCountRead: req.GetBytesCountRead(),
BytesCountWrote: req.GetBytesCountWrote(),
FramesRead: recoder.EncoderFramesStatistics{
Unparsed: req.GetFramesReadUnparsed(),
VideoUnprocessed: req.GetFramesReadVideoUnprocessed(),
AudioUnprocessed: req.GetFramesReadAudioUnprocessed(),
VideoProcessed: req.GetFramesReadVideoProcessed(),
AudioProcessed: req.GetFramesReadAudioProcessed(),
},
FramesWrote: recoder.EncoderFramesStatistics{
Unparsed: req.GetFramesWroteUnparsed(),
VideoUnprocessed: req.GetFramesWroteVideoUnprocessed(),
AudioUnprocessed: req.GetFramesWroteAudioUnprocessed(),
VideoProcessed: req.GetFramesWroteVideoProcessed(),
AudioProcessed: req.GetFramesWroteAudioProcessed(),
},
}
return result
}
Loading

0 comments on commit 47c0315

Please sign in to comment.