Skip to content

Commit

Permalink
Move routers into senders
Browse files Browse the repository at this point in the history
  • Loading branch information
OrlandoCo committed Sep 23, 2020
1 parent 490de0a commit c7cf37e
Show file tree
Hide file tree
Showing 6 changed files with 173 additions and 143 deletions.
9 changes: 6 additions & 3 deletions pkg/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ type rtpExtInfo struct {
// Receiver defines a interface for a track receivers
type Receiver interface {
Track() *webrtc.Track
AddSender(pid string, sender Sender)
AddSender(sender Sender)
DeleteSender(pid string)
GetPacket(sn uint16) *rtp.Packet
ReadRTP() chan *rtp.Packet
Expand Down Expand Up @@ -98,6 +98,8 @@ func NewWebRTCReceiver(ctx context.Context, track *webrtc.Track) Receiver {
w.spatialLayer = 2
case fullResolution:
w.spatialLayer = 3
default:
w.spatialLayer = 0
}

waitStart := make(chan struct{})
Expand All @@ -116,10 +118,10 @@ func (w *WebRTCReceiver) OnCloseHandler(fn func()) {
w.onCloseHandler = fn
}

func (w *WebRTCReceiver) AddSender(pid string, sender Sender) {
func (w *WebRTCReceiver) AddSender(sender Sender) {
w.Lock()
defer w.Unlock()
w.senders[pid] = sender
w.senders[sender.ID()] = sender
}

func (w *WebRTCReceiver) DeleteSender(pid string) {
Expand Down Expand Up @@ -492,6 +494,7 @@ func startAudioReceiver(w *WebRTCReceiver, wStart chan struct{}) {
}
}
}()
go w.fwdRTP()
wStart <- struct{}{}
w.wg.Wait()
}
120 changes: 25 additions & 95 deletions pkg/router.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,8 @@
package sfu

import (
"math/rand"
"sync"
"sync/atomic"
"time"

"github.com/pion/ion-sfu/pkg/log"
"github.com/pion/rtcp"
)

var routerConfig RouterConfig
Expand All @@ -16,113 +11,48 @@ var routerConfig RouterConfig
type Router struct {
tid string
mu sync.RWMutex
receivers []Receiver
receivers [3 + 1]Receiver
lastNack int64

simulcast bool
simulcastSSRC uint32
simulcast bool
}

// NewRouter for routing rtp/rtcp packets
func NewRouter(tid string) *Router {
r := &Router{
tid: tid,
lastNack: time.Now().Unix(),
}
return r
}

// NewRouterWithSimulcast
func NewRouterWithSimulcast(tid string) *Router {
func NewRouter(tid string, hasSimulcast bool) *Router {
r := &Router{
tid: tid,
lastNack: time.Now().Unix(),
simulcast: true,
simulcastSSRC: rand.Uint32(),
tid: tid,
lastNack: time.Now().Unix(),
simulcast: hasSimulcast,
}
return r
}

func (r *Router) AddReceiver(recv Receiver) {
r.mu.Lock()
defer r.mu.Unlock()
r.receivers = append(r.receivers, recv)
r.receivers[recv.SpatialLayer()] = recv
}

// AddSender to router
func (r *Router) AddSender(pid string, sub Sender) {
r.receivers[0].AddSender(pid, sub)
go r.subFeedbackLoop(pid, r.receivers[0], sub)
}

// subFeedbackLoop reads rtcp packets from the sub
// and either handles them or forwards them to the receivers.
func (r *Router) subFeedbackLoop(pid string, recv Receiver, sub Sender) {
defer func() {
recv.DeleteSender(pid)
}()

for {
select {
case pkt, opn := <-sub.ReadRTCP():
if !opn {
return
}
switch pkt := pkt.(type) {
case *rtcp.TransportLayerNack:
log.Tracef("Router got nack: %+v", pkt)
for _, pair := range pkt.Nacks {
bufferPkt := recv.GetPacket(pair.PacketID)
if bufferPkt != nil {
// We found the packet in the buffer, resend to sub
sub.WriteRTP(bufferPkt)
continue
}
if routerConfig.MaxNackTime > 0 {
ln := atomic.LoadInt64(&r.lastNack)
if (time.Now().Unix() - ln) < routerConfig.MaxNackTime {
continue
}
atomic.StoreInt64(&r.lastNack, time.Now().Unix())
}
// Packet not found, request from receivers
nack := &rtcp.TransportLayerNack{
// origin ssrc
SenderSSRC: pkt.SenderSSRC,
MediaSSRC: pkt.MediaSSRC,
Nacks: []rtcp.NackPair{{PacketID: pair.PacketID}},
}
if err := recv.WriteRTCP(nack); err != nil {
log.Errorf("Error writing nack RTCP %s", err)
}
}
default:
if err := recv.WriteRTCP(pkt); err != nil {
log.Errorf("Error writing RTCP %s", err)
}
}
case layer := <-sub.Switch():
if layer == recv.SpatialLayer() {
// Same layer ignore msg
continue
}
if rcv := r.switchSpatialLayer(layer); rcv != nil {
recv.DeleteSender(pid)
rcv.AddSender(pid, sub)
go r.subFeedbackLoop(pid, rcv, sub)
return
}
}
}
func (r *Router) GetReceiver(layer uint8) Receiver {
r.mu.Lock()
defer r.mu.Unlock()
return r.receivers[layer]
}

func (r *Router) switchSpatialLayer(layer uint8) Receiver {
for _, recv := range r.receivers {
if recv.SpatialLayer() == layer {
return recv
}
// AddSender to router
func (r *Router) AddSender(sub Sender) {
r.receivers[sub.CurrentLayer()].AddSender(sub)
}

func (r *Router) SwitchSpatialLayer(currentLayer, targetLayer uint8, sub Sender) bool {
currentRecv := r.GetReceiver(currentLayer)
targetRecv := r.GetReceiver(targetLayer)
if targetRecv != nil {
// TODO do a more smart layer change
currentRecv.DeleteSender(sub.ID())
targetRecv.AddSender(sub)
return true
}
return nil
return false
}

func (r *Router) stats() string {
Expand Down
70 changes: 55 additions & 15 deletions pkg/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"io"
"sync"
"sync/atomic"
"time"

"github.com/pion/ion-sfu/pkg/log"
Expand All @@ -15,44 +16,49 @@ import (

// Sender defines a interface for a track receivers
type Sender interface {
ID() string
ReadRTCP() chan rtcp.Packet
WriteRTP(*rtp.Packet)
Close()
OnCloseHandler(fn func())
CurrentLayer() uint8
stats() string
// Simulcast events
Switch() chan uint8
SwitchTo(layer uint8)
}

// WebRTCSender represents a Sender which writes RTP to a webrtc track
type WebRTCSender struct {
id string
ctx context.Context
cancel context.CancelFunc
onCloseHandler func()
sender *webrtc.RTPSender
track *webrtc.Track
router *Router
rtcpCh chan rtcp.Packet
switchCh chan uint8
sendCh chan *rtp.Packet
maxBitrate uint64
target uint64
sendChan chan *rtp.Packet
currentLayer uint8
onCloseHandler func()

once sync.Once
}

// NewWebRTCSender creates a new track sender instance
func NewWebRTCSender(ctx context.Context, track *webrtc.Track, sender *webrtc.RTPSender) Sender {
func NewWebRTCSender(ctx context.Context, id string, router *Router, sender *webrtc.RTPSender) Sender {
ctx, cancel := context.WithCancel(ctx)
sender.Track()
s := &WebRTCSender{
id: id,
ctx: ctx,
cancel: cancel,
router: router,
sender: sender,
track: track,
track: sender.Track(),
maxBitrate: routerConfig.MaxBandwidth * 1000,
rtcpCh: make(chan rtcp.Packet, maxSize),
switchCh: make(chan uint8, 1),
sendChan: make(chan *rtp.Packet, maxSize),
sendCh: make(chan *rtp.Packet, maxSize),
}

go s.receiveRTCP()
Expand All @@ -61,6 +67,10 @@ func NewWebRTCSender(ctx context.Context, track *webrtc.Track, sender *webrtc.RT
return s
}

func (s *WebRTCSender) ID() string {
return s.id
}

func (s *WebRTCSender) sendRTP() {
// There exists a bug in chrome where setLocalDescription
// fails if track RTP arrives before the sfu offer is set.
Expand All @@ -70,7 +80,7 @@ func (s *WebRTCSender) sendRTP() {

for {
select {
case pkt := <-s.sendChan:
case pkt := <-s.sendCh:
// Transform payload type
pt := s.track.Codec().PayloadType
newPkt := *pkt
Expand All @@ -97,16 +107,16 @@ func (s *WebRTCSender) ReadRTCP() chan rtcp.Packet {
// WriteRTP to the track
func (s *WebRTCSender) WriteRTP(pkt *rtp.Packet) {
if s.ctx.Err() == nil {
s.sendChan <- pkt
s.sendCh <- pkt
}
}

func (s *WebRTCSender) Switch() chan uint8 {
return s.switchCh
func (s *WebRTCSender) CurrentLayer() uint8 {
return s.currentLayer
}

func (s *WebRTCSender) SwitchTo(layer uint8) {
s.switchCh <- layer
log.Warnf("can't change layers in simple senders, current: %d target: %d", s.currentLayer, layer)
}

// OnClose is called when the sender is closed
Expand Down Expand Up @@ -146,8 +156,38 @@ func (s *WebRTCSender) receiveRTCP() {

for _, pkt := range pkts {
switch pkt := pkt.(type) {
case *rtcp.PictureLossIndication, *rtcp.FullIntraRequest, *rtcp.TransportLayerNack:
s.rtcpCh <- pkt
case *rtcp.PictureLossIndication, *rtcp.FullIntraRequest:
if err := s.router.GetReceiver(0).WriteRTCP(pkt); err != nil {
log.Errorf("writing RTCP err %v", err)
}
case *rtcp.TransportLayerNack:
log.Tracef("Router got nack: %+v", pkt)
recv := s.router.GetReceiver(0)
for _, pair := range pkt.Nacks {
bufferPkt := recv.GetPacket(pair.PacketID)
if bufferPkt != nil {
// We found the packet in the buffer, resend to sub
s.sendCh <- bufferPkt
continue
}
if routerConfig.MaxNackTime > 0 {
ln := atomic.LoadInt64(&s.router.lastNack)
if (time.Now().Unix() - ln) < routerConfig.MaxNackTime {
continue
}
atomic.StoreInt64(&s.router.lastNack, time.Now().Unix())
}
// Packet not found, request from receivers
nack := &rtcp.TransportLayerNack{
// origin ssrc
SenderSSRC: pkt.SenderSSRC,
MediaSSRC: pkt.MediaSSRC,
Nacks: []rtcp.NackPair{{PacketID: pair.PacketID}},
}
if err := recv.WriteRTCP(nack); err != nil {
log.Errorf("writing nack RTCP err %v", err)
}
}
default:
// TODO: Use fb packets for congestion control
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func (r *Session) AddRouter(router *Router) {
}

// Attach sender to source
router.AddSender(tid, sender)
router.AddSender(sender)
}
}

Expand Down
Loading

0 comments on commit c7cf37e

Please sign in to comment.