From 6e1e2280df26aa3e9901bb067d6424cc259b6198 Mon Sep 17 00:00:00 2001 From: Ivan Tivonenko Date: Wed, 3 Feb 2021 16:14:41 +0200 Subject: [PATCH] Count bytes of video data received/sent by broadcaster per stream and expose that in /status endpoint. These allows to calculate ingest rate. --- net/interface.go | 6 ++++++ server/broadcast.go | 3 +++ server/cliserver_test.go | 2 +- server/mediaserver.go | 27 +++++++++++++++++++-------- server/push_test.go | 4 ++++ 5 files changed, 33 insertions(+), 9 deletions(-) diff --git a/net/interface.go b/net/interface.go index 1dbd97f7c..e864f2df9 100644 --- a/net/interface.go +++ b/net/interface.go @@ -9,11 +9,17 @@ type RemoteTranscoderInfo struct { Capacity int } +type StreamInfo struct { + SourceBytes uint64 + TranscodedBytes uint64 +} + type NodeStatus struct { Manifests map[string]*m3u8.MasterPlaylist // maps external manifest (provided in HTTP push URL to the internal one // (returned from webhook)) InternalManifests map[string]string + StreamInfo map[string]StreamInfo OrchestratorPool []string Version string GolangRuntimeVersion string diff --git a/server/broadcast.go b/server/broadcast.go index 8acde046a..a7c0dea1e 100644 --- a/server/broadcast.go +++ b/server/broadcast.go @@ -10,6 +10,7 @@ import ( "strconv" "strings" "sync" + "sync/atomic" "time" ethcommon "github.com/ethereum/go-ethereum/common" @@ -363,6 +364,7 @@ func processSegment(cxn *rtmpConnection, seg *stream.HLSSegment) ([]string, erro if monitor.Enabled { monitor.SegmentEmerged(nonce, seg.SeqNo, len(BroadcastJobVideoProfiles), seg.Duration) } + atomic.AddUint64(&cxn.sourceBytes, uint64(len(seg.Data))) seg.Name = "" // hijack seg.Name to convey the uploaded URI ext, err := common.ProfileFormatExtension(vProfile.Format) @@ -570,6 +572,7 @@ func transcodeSegment(cxn *rtmpConnection, seg *stream.HLSSegment, name string, } data = d + atomic.AddUint64(&cxn.transcodedBytes, uint64(len(data))) } if bros != nil { diff --git a/server/cliserver_test.go b/server/cliserver_test.go index 367051ba5..7564c45ef 100644 --- a/server/cliserver_test.go +++ b/server/cliserver_test.go @@ -202,7 +202,7 @@ func TestGetStatus(t *testing.T) { defer res.Body.Close() body, err := ioutil.ReadAll(res.Body) req.Nil(err) - expected := fmt.Sprintf(`{"Manifests":{},"InternalManifests":{},"OrchestratorPool":[],"Version":"undefined","GolangRuntimeVersion":"%s","GOArch":"%s","GOOS":"%s","RegisteredTranscodersNumber":1,"RegisteredTranscoders":[{"Address":"TestAddress","Capacity":5}],"LocalTranscoding":false}`, + expected := fmt.Sprintf(`{"Manifests":{},"InternalManifests":{},"StreamInfo":{},"OrchestratorPool":[],"Version":"undefined","GolangRuntimeVersion":"%s","GOArch":"%s","GOOS":"%s","RegisteredTranscodersNumber":1,"RegisteredTranscoders":[{"Address":"TestAddress","Capacity":5}],"LocalTranscoding":false}`, runtime.Version(), runtime.GOARCH, runtime.GOOS) assert.Equal(expected, string(body)) } diff --git a/server/mediaserver.go b/server/mediaserver.go index 897737a01..e14eb2a8f 100644 --- a/server/mediaserver.go +++ b/server/mediaserver.go @@ -22,6 +22,7 @@ import ( "strconv" "strings" "sync" + "sync/atomic" "time" "github.com/livepeer/go-livepeer/drivers" @@ -71,14 +72,16 @@ var httpPushResetTimer = func() (context.Context, context.CancelFunc) { } type rtmpConnection struct { - mid core.ManifestID - nonce uint64 - stream stream.RTMPVideoStream - pl core.PlaylistManager - profile *ffmpeg.VideoProfile - params *core.StreamParameters - sessManager *BroadcastSessionsManager - lastUsed time.Time + mid core.ManifestID + nonce uint64 + stream stream.RTMPVideoStream + pl core.PlaylistManager + profile *ffmpeg.VideoProfile + params *core.StreamParameters + sessManager *BroadcastSessionsManager + lastUsed time.Time + sourceBytes uint64 + transcodedBytes uint64 } type LivepeerServer struct { @@ -1339,16 +1342,24 @@ func (s *LivepeerServer) GetNodeStatus() *net.NodeStatus { s.connectionLock.RLock() defer s.connectionLock.RUnlock() + streamInfo := make(map[string]net.StreamInfo) for _, cxn := range s.rtmpConnections { if cxn.pl == nil { continue } cpl := cxn.pl m[string(cpl.ManifestID())] = cpl.GetHLSMasterPlaylist() + sb := atomic.LoadUint64(&cxn.sourceBytes) + tb := atomic.LoadUint64(&cxn.transcodedBytes) + streamInfo[string(cpl.ManifestID())] = net.StreamInfo{ + SourceBytes: sb, + TranscodedBytes: tb, + } } res := &net.NodeStatus{ Manifests: m, InternalManifests: make(map[string]string), + StreamInfo: streamInfo, Version: core.LivepeerVersion, GolangRuntimeVersion: runtime.Version(), GOArch: runtime.GOARCH, diff --git a/server/push_test.go b/server/push_test.go index 3a2643c80..d77f82436 100644 --- a/server/push_test.go +++ b/server/push_test.go @@ -135,6 +135,8 @@ func TestPush_MultipartReturn(t *testing.T) { i++ } assert.Equal(1, i) + assert.Equal(uint64(12), cxn.sourceBytes) + assert.Equal(uint64(0), cxn.transcodedBytes) bsm.sel.Clear() bsm.sel.Add([]*BroadcastSession{sess}) @@ -190,6 +192,8 @@ func TestPush_MultipartReturn(t *testing.T) { i++ } assert.Equal(1, i) + assert.Equal(uint64(36), cxn.sourceBytes) + assert.Equal(uint64(44), cxn.transcodedBytes) // No sessions error cxn.sessManager.sel.Clear()