Skip to content

Commit

Permalink
Handler API (#1690)
Browse files Browse the repository at this point in the history
  • Loading branch information
Arri98 authored May 25, 2021
1 parent 36a7742 commit da4c767
Show file tree
Hide file tree
Showing 21 changed files with 325 additions and 26 deletions.
17 changes: 17 additions & 0 deletions doc/client_api.md
Original file line number Diff line number Diff line change
Expand Up @@ -1021,3 +1021,20 @@ You can also use Erizo Client Logger for managing log levels, etc.
```
Erizo.Logger.setLogLevel(Erizo.Logger.ERROR);
```

#Developing custom handlers

Every time a packet is sent or received by erizo it goes through a pipeline that reads or modifies the packet to provide funcionalities such as muting the stream or configuring simulcast. The pipeline is composed by a series of handlers that receive a DataPacket struct from the previous handler in the chain and send a DataPacket struct to the next handler. DataPackets encapsulates all packets which are stored inside DataPacket->data. Licode offers an interface to create custom handlers to later insert inside the pipeline using the client API. A CustomHandler is a C++ class that extends the CustomHandler interface located at licode/erizo/src/pipeline/Handler.h. The different methods that you must implement are:

void read(Context *ctx, std::shared_ptr <DataPacket> packet); This function is called when erizo receives a packet from the user's mediaStream. As a parameter you receive a DataPacket struct which encapsulates the received packet. The method must end calling ctx->fireRead(std::move(packet)); which will give ownership of the packet to the next handler so beware that once this function is called you are no longer able to access it. The packet sent does not have to be the same that was received and you can create your own packet and send multiple packets to the next handler but you will have to handle the rtp ssrc and timestamp reenumeration.

void write(Context *ctx, std::shared_ptr <DataPacket> packet) override; This function is called when erizo sends a packet to the user's mediaStream. As a parameter you receive a DataPacket struct which encapsulates the received packet. The method must end calling ctx->fireRead(std::move(packet)); which will give ownership of the packet to the next handler so beware that once this function is called you are no longer able to access it. The packet sent does not have to be the same that was received and you can create your own packet and send multiple packets to the next handler but you will have to handle the rtp ssrc and timestamp reenumeration.

Positions position() override; // Returns position to place handler. The method return one of the three Positions in which the handler can be inserted: AFTER_READER, MIDDLE, BEFORE_WRITER. A handler located at the beggining AFTER_READER will be the first one to read the packet and the last one to write it. This is the opposite for the BEFORE_WRITER position. It is recommended for most handler to be inserted in the MIDDLE position unless required otherwise.

Read and write functions are used for different purpouses. When a client sends a packet it's mediaStram will read the packet and then all the mediaStreams for the rest of the clients will write the packet to their clients. Read is used when we want to send the modified packet to all users and the write method is used when we only want certain users to receive the modified stream.

Inside the handlers folder there is an example handler that will log different infomation related to the packet. This handler serves as an example and shows how to process the received packet, some utilites that Licode offer to read and modify rtp packetes and how to implement the logging. Be mind that the logger created in the handler must be added to the erizo_controller/erizoAgent/log4cxx.properties file for logs to appear.

Once the handler is finished it has to be added to the HandlerImporter to be accesible trought the API. The handler importer located at erizo/src/erizo/handlers/HandlerImporter.h receives a string vector and creates a dictinoray with pairs with its name as key and the pointer as value. The basic implementation consists on a dictinary that maps string to an enum value which is later used in a switch case structure to create the corresponding pointer. To add a new hanler to the APi you have to add a new value to the HandlerEnum enum and a new pair to the handlerDic with the values {'NameOfTheHandlerForTheAPI',EnumValue}. Finally add a new case to to the function loadHandlers in the HandlerImpoter.cpp file, using the new enum value as case. This implementation can be changed but the HandlerImporterInterface defined in HandlerImporter.h has to be maintained as it is used in the Client API. Recompile erizo after doing all changes for them to have effect.

20 changes: 20 additions & 0 deletions doc/server_api.md
Original file line number Diff line number Diff line change
Expand Up @@ -252,3 +252,23 @@ Finally, we will start our service, that will listen to port 80 (line 19).
```
app.listen (80);
```




## Add handlers to the mediaStream

Handlers allow the user to modify the media stream inside erizo and can be used to transcode, drop packages, filter audio... If you do not have any handler installed or you want to create your own handler, read developing custom handlers in the client documentation. Once you have developed and installed your handler you will be able to use them in erizo. Handlers are grouped into profiles, which are a set of handler that are inserted in the given order. In the licode.config file you are able to create profiles with the added handlers, configuring the config.erizo.handlerProfiles file. HandlerProfiles are stored inside and array so to create a new profile add a new row to the config file with the following syntax:

```
config.erizo.handlerProfiles[profileNumber] = [{"name":"name",param1:"value1","param2":"value2",...},{"name":"name2"},...}
```

Each handler accepts different parameters specified in each handler documentation. Profile 0 is the default one and is used if no profile is specified when subscribing or publishing a new stream, so leave this profile empty, [], if you don't want to add any handler by default. After creating a profile it will be available in the API and you can select it when subscribing or publishing with the handlerProfile parameter.

```
room.publish(localStream, handlerProfile: 'profileNumber');
```

As an example, a logger handler has been included which logs all packets received and send by erizo and added to the profile 1. To use it, just publish or subscribe using profileNumber = 1.

25 changes: 21 additions & 4 deletions erizo/src/erizo/MediaStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,14 +54,18 @@ MediaStream::MediaStream(std::shared_ptr<Worker> worker,
const std::string& media_stream_label,
bool is_publisher,
int session_version,
std::string priority = "default") :
std::string priority = "default",
std::vector<std::string> handler_order,
std::map<std::string, std::shared_ptr<erizo::CustomHandler>> handler_pointer_dic) :
audio_enabled_{false}, video_enabled_{false},
media_stream_event_listener_{nullptr},
connection_{std::move(connection)},
stream_id_{media_stream_id},
mslabel_ {media_stream_label},
priority_{priority},
bundle_{false},
handler_order{handler_order},
handler_pointer_dic{handler_pointer_dic},
pipeline_{Pipeline::create()},
worker_{std::move(worker)},
audio_muted_{false}, video_muted_{false},
Expand Down Expand Up @@ -140,10 +144,10 @@ void MediaStream::cleanPriorityState() {

void MediaStream::setPriority(const std::string& priority) {
boost::mutex::scoped_lock lock(priority_mutex_);
ELOG_INFO("%s message: setting Priority to %s", toLog(), priority.c_str());
if (priority == priority_) {
return;
}
ELOG_INFO("%s message: setting Priority to %s", toLog(), priority.c_str());
priority_ = priority;
cleanPriorityState();
asyncTask([priority] (std::shared_ptr<MediaStream> media_stream) {
Expand Down Expand Up @@ -452,7 +456,7 @@ void MediaStream::initializePipeline() {
pipeline_->addService(packet_buffer_);

pipeline_->addFront(std::make_shared<PacketReader>(this));

addHandlerInPosition(AFTER_READER, handler_pointer_dic, handler_order);
pipeline_->addFront(std::make_shared<RtcpProcessorHandler>());
pipeline_->addFront(std::make_shared<FecReceiverHandler>());
pipeline_->addFront(std::make_shared<LayerBitrateCalculationHandler>());
Expand All @@ -464,6 +468,7 @@ void MediaStream::initializePipeline() {
pipeline_->addFront(std::make_shared<RtpPaddingGeneratorHandler>());
pipeline_->addFront(std::make_shared<PeriodicPliHandler>());
pipeline_->addFront(std::make_shared<PliPriorityHandler>());
addHandlerInPosition(MIDDLE, handler_pointer_dic, handler_order);
pipeline_->addFront(std::make_shared<PliPacerHandler>());
pipeline_->addFront(std::make_shared<RtpPaddingRemovalHandler>());
pipeline_->addFront(std::make_shared<BandwidthEstimationHandler>());
Expand All @@ -473,7 +478,7 @@ void MediaStream::initializePipeline() {
pipeline_->addFront(std::make_shared<LayerDetectorHandler>());
pipeline_->addFront(std::make_shared<OutgoingStatsHandler>());
pipeline_->addFront(std::make_shared<PacketCodecParser>());

addHandlerInPosition(BEFORE_WRITER, handler_pointer_dic, handler_order);
pipeline_->addFront(std::make_shared<PacketWriter>(this));
pipeline_->finalize();

Expand Down Expand Up @@ -1056,6 +1061,18 @@ void MediaStream::enableSlideShowBelowSpatialLayer(bool enabled, int spatial_lay
});
}

void MediaStream::addHandlerInPosition(Positions position,
std::map<std::string, std::shared_ptr<erizo::CustomHandler>> handlers_pointer_dic,
std::vector<std::string> handler_order) {
for (unsigned int i = 0; i < handler_order.size() ; i++) {
std::string handler_name = handler_order[i];
if (handlers_pointer_dic.at(handler_name) && handlers_pointer_dic.at(handler_name)->position() == position) {
pipeline_->addFront(handlers_pointer_dic.at(handler_name));
ELOG_DEBUG(" message: Added handler %s", handler_name);
}
}
}

void MediaStream::enableFallbackBelowMinLayer(bool enabled) {
asyncTask([enabled] (std::shared_ptr<MediaStream> media_stream) {
media_stream->quality_manager_->enableFallbackBelowMinLayer(enabled);
Expand Down
12 changes: 11 additions & 1 deletion erizo/src/erizo/MediaStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include "pipeline/Service.h"
#include "rtp/QualityManager.h"
#include "rtp/PacketBufferService.h"
#include "rtp/RtcpProcessorHandler.h"

namespace erizo {

Expand Down Expand Up @@ -74,7 +75,9 @@ class MediaStream: public MediaSink, public MediaSource, public FeedbackSink,
*/
MediaStream(std::shared_ptr<Worker> worker, std::shared_ptr<WebRtcConnection> connection,
const std::string& media_stream_id, const std::string& media_stream_label,
bool is_publisher, int session_version, const std::string priority);
bool is_publisher, int session_version, const std::string priority,
std::vector<std::string> handler_order = {},
std::map<std::string, std::shared_ptr<erizo::CustomHandler>> handler_pointer_dic = {});
/**
* Destructor.
*/
Expand Down Expand Up @@ -241,6 +244,13 @@ class MediaStream: public MediaSink, public MediaSource, public FeedbackSink,
std::shared_ptr<PacketBufferService> packet_buffer_;
std::shared_ptr<HandlerManager> handler_manager_;

void addHandlerInPosition(Positions position,
std::map<std::string, std::shared_ptr<erizo::CustomHandler>> handler_pointer_dic,
std::vector<std::string> handler_order);
std::vector<std::string> handler_order;
std::map<std::string, std::shared_ptr<erizo::CustomHandler>> handler_pointer_dic;


Pipeline::Ptr pipeline_;

std::shared_ptr<Worker> worker_;
Expand Down
24 changes: 24 additions & 0 deletions erizo/src/erizo/handlers/HandlerImporter.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
#include "HandlerImporter.h"
namespace erizo {

HandlerImporter::HandlerImporter() {}

void HandlerImporter::loadHandlers(std::vector<std::map<std::string, std::string>> custom_handlers) {
for (unsigned int i = 0; i < custom_handlers.size(); i++) {
std::map <std::string, std::string> parameters = custom_handlers[i];
std::string handler_name = parameters.at("name");
handler_order.push_back(handler_name);
std::shared_ptr <CustomHandler> ptr;
HandlersEnum handler_enum = handlers_dic[handler_name];

switch (handler_enum) {
case LoggerHandlerEnum :
ptr = std::make_shared<LoggerHandler>(parameters);
break;
default:
break;
}
handlers_pointer_dic.insert({handler_name, ptr});
}
}
} // namespace erizo
41 changes: 41 additions & 0 deletions erizo/src/erizo/handlers/HandlerImporter.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
//
// Created by licode20 on 22/3/21.
//

#ifndef ERIZO_SRC_ERIZO_HANDLERS_HANDLERIMPORTER_H_
#define ERIZO_SRC_ERIZO_HANDLERS_HANDLERIMPORTER_H_

#include <string>
#include <map>
#include "../pipeline/Handler.h"
#include "./logger.h"
#include "handlers/LoggerHandler.h"



namespace erizo {


class HandlerImporterInterface {
DECLARE_LOGGER();
public:
void loadHandlers(std::vector<std::map<std::string, std::string>> custom_handlers);
std::map<std::string, std::shared_ptr<erizo::CustomHandler>> handlers_pointer_dic = {};
std::vector<std::string> handler_order = {};
};


class HandlerImporter: public HandlerImporterInterface {
DECLARE_LOGGER();
public:
HandlerImporter();
void loadHandlers(std::vector<std::map<std::string, std::string>> custom_handlers);
private:
enum HandlersEnum {LoggerHandlerEnum};

std::map<std::string, HandlersEnum> handlers_dic ={{"LoggerHandler", LoggerHandlerEnum}};
};

} // namespace erizo

#endif // ERIZO_SRC_ERIZO_HANDLERS_HANDLERIMPORTER_H_
92 changes: 92 additions & 0 deletions erizo/src/erizo/handlers/handlers/LoggerHandler.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@

#include "LoggerHandler.h"

#include "../../rtp/RtpHeaders.h" // Imports headers utilities

namespace erizo {

DEFINE_LOGGER(LoggerHandler, "rtp.LoggerHandler"); // Defines handler logger name

LoggerHandler::LoggerHandler(std::map <std::string, std::string> parameters) {
ELOG_DEBUG("Creating Logger Handler ");
}


LoggerHandler::~LoggerHandler() {
ELOG_DEBUG("Destroying Logger Handler ");
}

Positions LoggerHandler::position() {
return MIDDLE;
}

void LoggerHandler::write(Context *ctx, std::shared_ptr <DataPacket> packet) {
if (is_enabled) {
ELOG_DEBUG("------------------------");
ELOG_DEBUG("Writing packet");
ELOG_DEBUG("Packet length %d", packet->length);
rtcp_head = reinterpret_cast<RtcpHeader*> (packet->data); // Read data as RTCP packet
if (rtcp_head->isRtcp()) {
ELOG_DEBUG("Rtcp packet received");
} else if (packet->type == VIDEO_PACKET) {
rtp_head = reinterpret_cast<RtpHeader*> (packet->data); // Read RTP data and log some fields
ELOG_DEBUG("Rtp video packet received");
ELOG_DEBUG("Timestamp %d", rtp_head->timestamp);
ELOG_DEBUG("SSRC %d", rtp_head->ssrc);
} else if (packet->type == AUDIO_PACKET) { // Audio packets
ELOG_DEBUG("Audio packet received");
} else if (packet->type == OTHER_PACKET) { // Data or other packets
ELOG_DEBUG("Other packet received");
}
if (packet->priority == HIGH_PRIORITY) { // Log priority field for packet
ELOG_DEBUG("Packet with high priority");
} else if (packet->priority == LOW_PRIORITY) {
ELOG_DEBUG("Packet with low priority");
}
ctx->fireWrite(std::move(packet)); // Gives packet ownership to next handler in pipeline
}
}

void LoggerHandler::read(Context *ctx, std::shared_ptr <DataPacket> packet) {
if (is_enabled) {
ELOG_DEBUG("------------------------");
ELOG_DEBUG("Reading packet ");
ELOG_DEBUG("Packet length %d", packet->length);
rtcp_head = reinterpret_cast<RtcpHeader*> (packet->data);
if (rtcp_head->isRtcp()) {
ELOG_DEBUG("Rtcp packet received");
} else if (packet->type == VIDEO_PACKET) {
rtp_head = reinterpret_cast<RtpHeader*> (packet->data);
ELOG_DEBUG("Rtp video packet received");
ELOG_DEBUG("Timestamp %d", rtp_head->timestamp);
ELOG_DEBUG("SSRC %d", rtp_head->ssrc);
} else if (packet->type == AUDIO_PACKET) {
ELOG_DEBUG("Audio packet received");
} else if (packet->type == OTHER_PACKET) {
ELOG_DEBUG("Other packet received");
}
if (packet->priority == HIGH_PRIORITY) {
ELOG_DEBUG("Packet with high priority");
} else if (packet->priority == LOW_PRIORITY) {
ELOG_DEBUG("Packet with low priority");
}
ELOG_DEBUG("------------------------");
ctx->fireRead(std::move(packet)); // Gives packet ownership to next handler in pipeline
}
}

void LoggerHandler::enable() {
is_enabled = true;
}

void LoggerHandler::disable() {
is_enabled = false;
}

std::string LoggerHandler::getName() {
return "LoggerHandler";
}

void LoggerHandler::notifyUpdate() {
}
} // namespace erizo
29 changes: 29 additions & 0 deletions erizo/src/erizo/handlers/handlers/LoggerHandler.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
#ifndef ERIZO_SRC_ERIZO_HANDLERS_HANDLERS_LOGGERHANDLER_H_
#define ERIZO_SRC_ERIZO_HANDLERS_HANDLERS_LOGGERHANDLER_H_

#include "../../pipeline/Handler.h" // Import CustomHandler interface
#include "../../MediaDefinitions.h" // Imports DataPacket struct
#include "./logger.h" // Include logger


namespace erizo { // Handlers are include in erizo namespace
class LoggerHandler : public CustomHandler {
DECLARE_LOGGER(); // Declares logger for debugging and logging
public:
explicit LoggerHandler(std::map <std::string, std::string> parameters);
~LoggerHandler();
void read(Context *ctx, std::shared_ptr <DataPacket> packet) override; // Process packet sent by client
void write(Context *ctx, std::shared_ptr <DataPacket> packet) override; // Process packet sent to client
Positions position() override; // Returns position to place handler.
void enable() override; // Enable handler
void disable() override; // Disable handler
std::string getName() override; // Returns handler name
void notifyUpdate() override; // Recieves update
private:
RtcpHeader* rtcp_head;
RtpHeader* rtp_head;
bool is_enabled = true;
};
} // namespace erizo

#endif // ERIZO_SRC_ERIZO_HANDLERS_HANDLERS_LOGGERHANDLER_H_"
11 changes: 11 additions & 0 deletions erizo/src/erizo/pipeline/Handler.h
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,17 @@ class HandlerAdapter : public Handler {
void notifyEvent(MediaEventPtr event) override {
}
};

enum Positions{AFTER_READER , MIDDLE, BEFORE_WRITER };

class CustomHandler : public Handler {
public:
virtual ~CustomHandler() = default;
static const HandlerDir dir = HandlerDir::BOTH;
virtual Positions position() = 0;
};


} // namespace erizo

#endif // ERIZO_SRC_ERIZO_PIPELINE_HANDLER_H_
Loading

0 comments on commit da4c767

Please sign in to comment.