Skip to content

Commit

Permalink
Count bytes of video data received/sent by broadcaster per stream
Browse files Browse the repository at this point in the history
and expose that in /status endpoint.
These allows to calculate ingest rate.
  • Loading branch information
darkdarkdragon committed Feb 3, 2021
1 parent 6b1fca4 commit 6e1e228
Show file tree
Hide file tree
Showing 5 changed files with 33 additions and 9 deletions.
6 changes: 6 additions & 0 deletions net/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,17 @@ type RemoteTranscoderInfo struct {
Capacity int
}

type StreamInfo struct {
SourceBytes uint64
TranscodedBytes uint64
}

type NodeStatus struct {
Manifests map[string]*m3u8.MasterPlaylist
// maps external manifest (provided in HTTP push URL to the internal one
// (returned from webhook))
InternalManifests map[string]string
StreamInfo map[string]StreamInfo
OrchestratorPool []string
Version string
GolangRuntimeVersion string
Expand Down
3 changes: 3 additions & 0 deletions server/broadcast.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"strconv"
"strings"
"sync"
"sync/atomic"
"time"

ethcommon "github.com/ethereum/go-ethereum/common"
Expand Down Expand Up @@ -363,6 +364,7 @@ func processSegment(cxn *rtmpConnection, seg *stream.HLSSegment) ([]string, erro
if monitor.Enabled {
monitor.SegmentEmerged(nonce, seg.SeqNo, len(BroadcastJobVideoProfiles), seg.Duration)
}
atomic.AddUint64(&cxn.sourceBytes, uint64(len(seg.Data)))

seg.Name = "" // hijack seg.Name to convey the uploaded URI
ext, err := common.ProfileFormatExtension(vProfile.Format)
Expand Down Expand Up @@ -570,6 +572,7 @@ func transcodeSegment(cxn *rtmpConnection, seg *stream.HLSSegment, name string,
}

data = d
atomic.AddUint64(&cxn.transcodedBytes, uint64(len(data)))
}

if bros != nil {
Expand Down
2 changes: 1 addition & 1 deletion server/cliserver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ func TestGetStatus(t *testing.T) {
defer res.Body.Close()
body, err := ioutil.ReadAll(res.Body)
req.Nil(err)
expected := fmt.Sprintf(`{"Manifests":{},"InternalManifests":{},"OrchestratorPool":[],"Version":"undefined","GolangRuntimeVersion":"%s","GOArch":"%s","GOOS":"%s","RegisteredTranscodersNumber":1,"RegisteredTranscoders":[{"Address":"TestAddress","Capacity":5}],"LocalTranscoding":false}`,
expected := fmt.Sprintf(`{"Manifests":{},"InternalManifests":{},"StreamInfo":{},"OrchestratorPool":[],"Version":"undefined","GolangRuntimeVersion":"%s","GOArch":"%s","GOOS":"%s","RegisteredTranscodersNumber":1,"RegisteredTranscoders":[{"Address":"TestAddress","Capacity":5}],"LocalTranscoding":false}`,
runtime.Version(), runtime.GOARCH, runtime.GOOS)
assert.Equal(expected, string(body))
}
Expand Down
27 changes: 19 additions & 8 deletions server/mediaserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"strconv"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/livepeer/go-livepeer/drivers"
Expand Down Expand Up @@ -71,14 +72,16 @@ var httpPushResetTimer = func() (context.Context, context.CancelFunc) {
}

type rtmpConnection struct {
mid core.ManifestID
nonce uint64
stream stream.RTMPVideoStream
pl core.PlaylistManager
profile *ffmpeg.VideoProfile
params *core.StreamParameters
sessManager *BroadcastSessionsManager
lastUsed time.Time
mid core.ManifestID
nonce uint64
stream stream.RTMPVideoStream
pl core.PlaylistManager
profile *ffmpeg.VideoProfile
params *core.StreamParameters
sessManager *BroadcastSessionsManager
lastUsed time.Time
sourceBytes uint64
transcodedBytes uint64
}

type LivepeerServer struct {
Expand Down Expand Up @@ -1339,16 +1342,24 @@ func (s *LivepeerServer) GetNodeStatus() *net.NodeStatus {

s.connectionLock.RLock()
defer s.connectionLock.RUnlock()
streamInfo := make(map[string]net.StreamInfo)
for _, cxn := range s.rtmpConnections {
if cxn.pl == nil {
continue
}
cpl := cxn.pl
m[string(cpl.ManifestID())] = cpl.GetHLSMasterPlaylist()
sb := atomic.LoadUint64(&cxn.sourceBytes)
tb := atomic.LoadUint64(&cxn.transcodedBytes)
streamInfo[string(cpl.ManifestID())] = net.StreamInfo{
SourceBytes: sb,
TranscodedBytes: tb,
}
}
res := &net.NodeStatus{
Manifests: m,
InternalManifests: make(map[string]string),
StreamInfo: streamInfo,
Version: core.LivepeerVersion,
GolangRuntimeVersion: runtime.Version(),
GOArch: runtime.GOARCH,
Expand Down
4 changes: 4 additions & 0 deletions server/push_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,8 @@ func TestPush_MultipartReturn(t *testing.T) {
i++
}
assert.Equal(1, i)
assert.Equal(uint64(12), cxn.sourceBytes)
assert.Equal(uint64(0), cxn.transcodedBytes)

bsm.sel.Clear()
bsm.sel.Add([]*BroadcastSession{sess})
Expand Down Expand Up @@ -190,6 +192,8 @@ func TestPush_MultipartReturn(t *testing.T) {
i++
}
assert.Equal(1, i)
assert.Equal(uint64(36), cxn.sourceBytes)
assert.Equal(uint64(44), cxn.transcodedBytes)

// No sessions error
cxn.sessManager.sel.Clear()
Expand Down

0 comments on commit 6e1e228

Please sign in to comment.