Skip to content

Commit

Permalink
Add a basic pipeline to webrtcconnection (lynckia#1456)
Browse files Browse the repository at this point in the history
  • Loading branch information
jcague authored Sep 16, 2019
1 parent c55e0f2 commit e573f8d
Show file tree
Hide file tree
Showing 4 changed files with 115 additions and 11 deletions.
2 changes: 1 addition & 1 deletion erizo/src/erizo/MediaStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -844,7 +844,7 @@ void MediaStream::parseIncomingPayloadType(char *buf, int len, packetType type)

void MediaStream::write(std::shared_ptr<DataPacket> packet) {
if (connection_) {
connection_->write(packet);
connection_->send(packet);
}
}

Expand Down
60 changes: 54 additions & 6 deletions erizo/src/erizo/WebRtcConnection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,8 @@ WebRtcConnection::WebRtcConnection(std::shared_ptr<Worker> worker, std::shared_p
ice_config_{ice_config}, rtp_mappings_{rtp_mappings}, extension_processor_{ext_mappings},
worker_{worker}, io_worker_{io_worker},
remote_sdp_{std::make_shared<SdpInfo>(rtp_mappings)}, local_sdp_{std::make_shared<SdpInfo>(rtp_mappings)},
audio_muted_{false}, video_muted_{false}, first_remote_sdp_processed_{false}
{
audio_muted_{false}, video_muted_{false}, first_remote_sdp_processed_{false}, pipeline_{Pipeline::create()},
pipeline_initialized_{false} {
ELOG_INFO("%s message: constructor, stunserver: %s, stunPort: %d, minPort: %d, maxPort: %d",
toLog(), ice_config.stun_server.c_str(), ice_config.stun_port, ice_config.min_port, ice_config.max_port);
stats_ = std::make_shared<Stats>();
Expand Down Expand Up @@ -84,6 +84,9 @@ void WebRtcConnection::syncClose() {
if (conn_event_listener_ != nullptr) {
conn_event_listener_ = nullptr;
}
pipeline_initialized_ = false;
pipeline_->close();
pipeline_.reset();

ELOG_DEBUG("%s message: Close ended", toLog());
}
Expand All @@ -97,10 +100,31 @@ void WebRtcConnection::close() {
}

bool WebRtcConnection::init() {
maybeNotifyWebRtcConnectionEvent(global_state_, "");
asyncTask([] (std::shared_ptr<WebRtcConnection> connection) {
connection->initializePipeline();
connection->maybeNotifyWebRtcConnectionEvent(connection->global_state_, "");
});
return true;
}

void WebRtcConnection::initializePipeline() {
if (pipeline_initialized_) {
return;
}
handler_manager_ = std::make_shared<HandlerManager>(shared_from_this());
pipeline_->addService(shared_from_this());
pipeline_->addService(handler_manager_);

pipeline_->addFront(std::make_shared<ConnectionPacketReader>(this));

pipeline_->addFront(std::make_shared<ConnectionPacketWriter>(this));
pipeline_->finalize();
pipeline_initialized_ = true;
}

void WebRtcConnection::notifyUpdateToHandlers() {
}

boost::future<void> WebRtcConnection::createOffer(bool video_enabled, bool audio_enabled, bool bundle) {
return asyncTask([video_enabled, audio_enabled, bundle] (std::shared_ptr<WebRtcConnection> connection) {
connection->createOfferSync(video_enabled, audio_enabled, bundle);
Expand Down Expand Up @@ -589,6 +613,28 @@ void WebRtcConnection::onTransportData(std::shared_ptr<DataPacket> packet, Trans
if (getCurrentState() != CONN_READY) {
return;
}
if (transport->mediaType == AUDIO_TYPE) {
packet->type = AUDIO_PACKET;
} else if (transport->mediaType == VIDEO_TYPE) {
packet->type = VIDEO_PACKET;
}
asyncTask([packet] (std::shared_ptr<WebRtcConnection> connection) {
if (!connection->pipeline_initialized_) {
ELOG_DEBUG("%s message: Pipeline not initialized yet.", connection->toLog());
return;
}

if (connection->pipeline_) {
connection->pipeline_->read(std::move(packet));
}
});
}

void WebRtcConnection::read(std::shared_ptr<DataPacket> packet) {
Transport *transport = (bundle_ || packet->type == VIDEO_PACKET) ? video_transport_.get() : audio_transport_.get();
if (transport == nullptr) {
return;
}
char* buf = packet->data;
RtcpHeader *chead = reinterpret_cast<RtcpHeader*> (buf);
if (chead->isRtcp()) {
Expand Down Expand Up @@ -771,13 +817,15 @@ WebRTCEvent WebRtcConnection::getCurrentState() {
return global_state_;
}

void WebRtcConnection::write(std::shared_ptr<DataPacket> packet) {
void WebRtcConnection::send(std::shared_ptr<DataPacket> packet) {
asyncTask([packet] (std::shared_ptr<WebRtcConnection> connection) {
connection->syncWrite(packet);
if (connection->pipeline_) {
connection->pipeline_->write(std::move(packet));
}
});
}

void WebRtcConnection::syncWrite(std::shared_ptr<DataPacket> packet) {
void WebRtcConnection::write(std::shared_ptr<DataPacket> packet) {
if (!sending_) {
return;
}
Expand Down
63 changes: 59 additions & 4 deletions erizo/src/erizo/WebRtcConnection.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include "rtp/RtpExtensionProcessor.h"
#include "lib/Clock.h"
#include "pipeline/Handler.h"
#include "pipeline/HandlerManager.h"
#include "pipeline/Service.h"
#include "rtp/QualityManager.h"
#include "rtp/PacketBufferService.h"
Expand Down Expand Up @@ -57,8 +58,8 @@ class WebRtcConnectionEventListener {
* A WebRTC Connection. This class represents a WebRTC Connection that can be established with other peers via a SDP negotiation
* it comprises all the necessary Transport components.
*/
class WebRtcConnection: public TransportListener, public LogContext,
public std::enable_shared_from_this<WebRtcConnection> {
class WebRtcConnection: public TransportListener, public LogContext, public HandlerManagerListener,
public std::enable_shared_from_this<WebRtcConnection>, public Service {
DECLARE_LOGGER();

public:
Expand Down Expand Up @@ -138,8 +139,7 @@ class WebRtcConnection: public TransportListener, public LogContext,

void setMetadata(std::map<std::string, std::string> metadata);

void write(std::shared_ptr<DataPacket> packet);
void syncWrite(std::shared_ptr<DataPacket> packet);
void send(std::shared_ptr<DataPacket> packet);

boost::future<void> asyncTask(std::function<void(std::shared_ptr<WebRtcConnection>)> f);

Expand All @@ -164,6 +164,13 @@ class WebRtcConnection: public TransportListener, public LogContext,
return "id: " + connection_id_ + ", " + printLogContext();
}

bool isPipelineInitialized() { return pipeline_initialized_; }
bool isRunning() { return pipeline_initialized_ && sending_; }
Pipeline::Ptr getPipeline() { return pipeline_; }
void read(std::shared_ptr<DataPacket> packet);
void write(std::shared_ptr<DataPacket> packet);
void notifyUpdateToHandlers() override;

private:
bool createOfferSync(bool video_enabled, bool audio_enabled, bool bundle);
boost::future<void> processRemoteSdp();
Expand All @@ -173,6 +180,7 @@ class WebRtcConnection: public TransportListener, public LogContext,
void onRtcpFromTransport(std::shared_ptr<DataPacket> packet, Transport *transport);
void onREMBFromTransport(RtcpHeader *chead, Transport *transport);
void maybeNotifyWebRtcConnectionEvent(const WebRTCEvent& event, const std::string& message);
void initializePipeline();

protected:
std::atomic<WebRTCEvent> global_state_;
Expand Down Expand Up @@ -208,6 +216,53 @@ class WebRtcConnection: public TransportListener, public LogContext,
bool first_remote_sdp_processed_;

std::unique_ptr<BandwidthDistributionAlgorithm> distributor_;
Pipeline::Ptr pipeline_;
bool pipeline_initialized_;
std::shared_ptr<HandlerManager> handler_manager_;
};

class ConnectionPacketReader : public InboundHandler {
public:
explicit ConnectionPacketReader(WebRtcConnection *connection) : connection_{connection} {}

void enable() override {}
void disable() override {}

std::string getName() override {
return "connection-reader";
}

void read(Context *ctx, std::shared_ptr<DataPacket> packet) override {
connection_->read(std::move(packet));
}

void notifyUpdate() override {
}

private:
WebRtcConnection *connection_;
};

class ConnectionPacketWriter : public OutboundHandler {
public:
explicit ConnectionPacketWriter(WebRtcConnection *connection) : connection_{connection} {}

void enable() override {}
void disable() override {}

std::string getName() override {
return "connection-writer";
}

void write(Context *ctx, std::shared_ptr<DataPacket> packet) override {
connection_->write(std::move(packet));
}

void notifyUpdate() override {
}

private:
WebRtcConnection *connection_;
};

} // namespace erizo
Expand Down
1 change: 1 addition & 0 deletions erizo/src/test/WebRtcConnectionTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ class WebRtcConnectionTest :
simulated_worker, io_worker);
connection->setTransport(transport);
connection->updateState(TRANSPORT_READY, transport.get());
connection->init();
max_video_bw_list = std::tr1::get<0>(GetParam());
bitrate_value = std::tr1::get<1>(GetParam());
add_to_remb_list = std::tr1::get<2>(GetParam());
Expand Down

0 comments on commit e573f8d

Please sign in to comment.