Skip to content

Commit

Permalink
Add ioThreadPool and all nICEr logic (#884)
Browse files Browse the repository at this point in the history
  • Loading branch information
jcague authored Jun 13, 2017
1 parent b4b1639 commit b595e39
Show file tree
Hide file tree
Showing 37 changed files with 1,769 additions and 167 deletions.
18 changes: 9 additions & 9 deletions erizo/src/erizo/DtlsTransport.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,8 @@ void Resender::scheduleNext() {
DtlsTransport::DtlsTransport(MediaType med, const std::string &transport_name, const std::string& connection_id,
bool bundle, bool rtcp_mux, std::weak_ptr<TransportListener> transport_listener,
const IceConfig& iceConfig, std::string username, std::string password,
bool isServer, std::shared_ptr<Worker> worker):
Transport(med, transport_name, connection_id, bundle, rtcp_mux, transport_listener, iceConfig, worker),
bool isServer, std::shared_ptr<Worker> worker, std::shared_ptr<IOWorker> io_worker):
Transport(med, transport_name, connection_id, bundle, rtcp_mux, transport_listener, iceConfig, worker, io_worker),
unprotect_packet_{std::make_shared<dataPacket>()},
readyRtp(false), readyRtcp(false), isServer_(isServer) {
ELOG_DEBUG("%s message: constructor, transportName: %s, isBundle: %d", toLog(), transport_name.c_str(), bundle);
Expand Down Expand Up @@ -106,11 +106,11 @@ DtlsTransport::DtlsTransport(MediaType med, const std::string &transport_name, c
iceConfig_.ice_components = comps;
iceConfig_.username = username;
iceConfig_.password = password;
#ifdef USE_NICER
ice_.reset(NicerConnection::create(this, iceConfig_));
#else
ice_.reset(LibNiceConnection::create(this, iceConfig_));
#endif
if (iceConfig_.use_nicer) {
ice_ = NicerConnection::create(io_worker_, this, iceConfig_);
} else {
ice_.reset(LibNiceConnection::create(this, iceConfig_));
}
rtp_resender_.reset(new Resender(this, dtlsRtp.get()));
if (!rtcp_mux) {
rtcp_resender_.reset(new Resender(this, dtlsRtcp.get()));
Expand Down Expand Up @@ -321,14 +321,14 @@ std::string DtlsTransport::getMyFingerprint() {
}

void DtlsTransport::updateIceState(IceState state, IceConnection *conn) {
ELOG_DEBUG("%s message:NiceState, transportName: %s, state: %d, isBundle: %d",
ELOG_DEBUG("%s message:IceState, transportName: %s, state: %d, isBundle: %d",
toLog(), transport_name.c_str(), state, bundle_);
if (state == IceState::INITIAL && this->getTransportState() != TRANSPORT_STARTED) {
updateTransportState(TRANSPORT_STARTED);
} else if (state == IceState::CANDIDATES_RECEIVED && this->getTransportState() != TRANSPORT_GATHERED) {
updateTransportState(TRANSPORT_GATHERED);
} else if (state == IceState::FAILED) {
ELOG_DEBUG("%s message: Nice Failed", toLog());
ELOG_DEBUG("%s message: Ice Failed", toLog());
running_ = false;
updateTransportState(TRANSPORT_FAILED);
} else if (state == IceState::READY) {
Expand Down
3 changes: 2 additions & 1 deletion erizo/src/erizo/DtlsTransport.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ class DtlsTransport : dtls::DtlsReceiver, public Transport {
public:
DtlsTransport(MediaType med, const std::string& transport_name, const std::string& connection_id, bool bundle,
bool rtcp_mux, std::weak_ptr<TransportListener> transport_listener, const IceConfig& iceConfig,
std::string username, std::string password, bool isServer, std::shared_ptr<Worker> worker);
std::string username, std::string password, bool isServer, std::shared_ptr<Worker> worker,
std::shared_ptr<IOWorker> io_worker);
virtual ~DtlsTransport();
void connectionStateChanged(IceState newState);
std::string getMyFingerprint();
Expand Down
50 changes: 50 additions & 0 deletions erizo/src/erizo/IceConnection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@

namespace erizo {

DEFINE_LOGGER(IceConnection, "IceConnection")

IceConnection::IceConnection(IceConnectionListener* listener, const IceConfig& ice_config) :
listener_{listener}, ice_state_{INITIAL}, ice_config_{ice_config} {
for (unsigned int i = 1; i <= ice_config_.ice_components; i++) {
Expand All @@ -19,6 +21,7 @@ IceConnection::IceConnection(IceConnectionListener* listener, const IceConfig& i
}

IceConnection::~IceConnection() {
this->listener_ = nullptr;
}

void IceConnection::setIceListener(IceConnectionListener *listener) {
Expand All @@ -37,4 +40,51 @@ std::string IceConnection::getLocalPassword() {
return upass_;
}


IceState IceConnection::checkIceState() {
return ice_state_;
}

std::string IceConnection::iceStateToString(IceState state) const {
switch (state) {
case IceState::INITIAL: return "initial";
case IceState::FINISHED: return "finished";
case IceState::FAILED: return "failed";
case IceState::READY: return "ready";
case IceState::CANDIDATES_RECEIVED: return "cand_received";
}
return "unknown";
}

void IceConnection::updateIceState(IceState state) {
if (state <= ice_state_) {
if (state != IceState::READY)
ELOG_WARN("%s message: unexpected ice state transition, iceState: %s, newIceState: %s",
toLog(), iceStateToString(ice_state_).c_str(), iceStateToString(state).c_str());
return;
}

ELOG_INFO("%s message: iceState transition, ice_config_.transport_name: %s, iceState: %s, newIceState: %s, this: %p",
toLog(), ice_config_.transport_name.c_str(),
iceStateToString(ice_state_).c_str(), iceStateToString(state).c_str(), this);
this->ice_state_ = state;
switch (ice_state_) {
case IceState::FINISHED:
return;
case IceState::FAILED:
ELOG_WARN("%s message: Ice Failed", toLog());
break;

case IceState::READY:
case IceState::CANDIDATES_RECEIVED:
break;
default:
break;
}

// Important: send this outside our state lock. Otherwise, serious risk of deadlock.
if (this->listener_ != NULL)
this->listener_->updateIceState(state, this);
}

} // namespace erizo
17 changes: 10 additions & 7 deletions erizo/src/erizo/IceConnection.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ class IceConfig {
std::string stun_server, network_interface;
uint16_t stun_port, turn_port, min_port, max_port;
bool should_trickle;
bool use_nicer;
IceConfig()
: media_type{MediaType::OTHER},
transport_name{""},
Expand All @@ -67,7 +68,8 @@ class IceConfig {
turn_port{0},
min_port{0},
max_port{0},
should_trickle{false} {
should_trickle{false},
use_nicer{false} {
}
};

Expand All @@ -86,32 +88,33 @@ class IceConnectionListener {
};

class IceConnection : public LogContext {
DECLARE_LOGGER();

public:
IceConnection(IceConnectionListener* listener, const IceConfig& ice_config);

virtual ~IceConnection();

virtual void start() = 0;
virtual bool setRemoteCandidates(const std::vector<CandidateInfo> &candidates, bool is_bundle) = 0;
virtual void gatheringDone(uint stream_id) = 0;
virtual void getCandidate(uint stream_id, uint component_id, const std::string &foundation) = 0;
virtual void setRemoteCredentials(const std::string& username, const std::string& password) = 0;
virtual int sendData(unsigned int compId, const void* buf, int len) = 0;
virtual int sendData(unsigned int component_id, const void* buf, int len) = 0;

virtual void updateIceState(IceState state) = 0;
virtual IceState checkIceState() = 0;
virtual void updateComponentState(unsigned int compId, IceState state) = 0;
virtual void onData(unsigned int component_id, char* buf, int len) = 0;
virtual CandidatePair getSelectedPair() = 0;
virtual void setReceivedLastCandidate(bool hasReceived) = 0;
virtual void close() = 0;

virtual void updateIceState(IceState state);
virtual IceState checkIceState();
virtual void setIceListener(IceConnectionListener *listener);
virtual IceConnectionListener* getIceListener();

virtual std::string getLocalUsername();
virtual std::string getLocalPassword();

private:
virtual std::string iceStateToString(IceState state) const;

protected:
inline const char* toLog() {
Expand Down
60 changes: 7 additions & 53 deletions erizo/src/erizo/LibNiceConnection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -128,10 +128,10 @@ void LibNiceConnection::onData(unsigned int component_id, char* buf, int len) {
}
}

int LibNiceConnection::sendData(unsigned int compId, const void* buf, int len) {
int LibNiceConnection::sendData(unsigned int component_id, const void* buf, int len) {
int val = -1;
if (this->checkIceState() == IceState::READY) {
val = lib_nice_->NiceAgentSend(agent_, 1, compId, len, reinterpret_cast<const gchar*>(buf));
val = lib_nice_->NiceAgentSend(agent_, 1, component_id, len, reinterpret_cast<const gchar*>(buf));
}
if (val != len) {
ELOG_DEBUG("%s message: Sending less data than expected, sent: %d, to_send: %d", toLog(), val, len);
Expand Down Expand Up @@ -323,7 +323,7 @@ bool LibNiceConnection::setRemoteCandidates(const std::vector<CandidateInfo> &ca

void LibNiceConnection::gatheringDone(uint stream_id) {
ELOG_DEBUG("%s message: gathering done, stream_id: %u", toLog(), stream_id);
this->updateIceState(IceState::CANDIDATES_RECEIVED);
updateIceState(IceState::CANDIDATES_RECEIVED);
}

void LibNiceConnection::getCandidate(uint stream_id, uint component_id, const std::string &foundation) {
Expand Down Expand Up @@ -400,10 +400,10 @@ void LibNiceConnection::setRemoteCredentials(const std::string& username, const
lib_nice_->NiceAgentSetRemoteCredentials(agent_, (guint) 1, username.c_str(), password.c_str());
}

void LibNiceConnection::updateComponentState(unsigned int compId, IceState state) {
void LibNiceConnection::updateComponentState(unsigned int component_id, IceState state) {
ELOG_DEBUG("%s message: new ice component state, newState: %u, transportName: %s, componentId %u, iceComponents: %u",
toLog(), state, ice_config_.transport_name.c_str(), compId, ice_config_.ice_components);
comp_state_list_[compId] = state;
toLog(), state, ice_config_.transport_name.c_str(), component_id, ice_config_.ice_components);
comp_state_list_[component_id] = state;
if (state == IceState::READY) {
for (unsigned int i = 1; i <= ice_config_.ice_components; i++) {
if (comp_state_list_[i] != IceState::READY) {
Expand All @@ -413,7 +413,7 @@ void LibNiceConnection::updateComponentState(unsigned int compId, IceState state
} else if (state == IceState::FAILED) {
if (receivedLastCandidate_) {
ELOG_WARN("%s message: component failed, ice_config_.transport_name: %s, componentId: %u",
toLog(), ice_config_.transport_name.c_str(), compId);
toLog(), ice_config_.transport_name.c_str(), component_id);
for (unsigned int i = 1; i <= ice_config_.ice_components; i++) {
if (comp_state_list_[i] != IceState::FAILED) {
return;
Expand All @@ -427,52 +427,6 @@ void LibNiceConnection::updateComponentState(unsigned int compId, IceState state
this->updateIceState(state);
}

IceState LibNiceConnection::checkIceState() {
return ice_state_;
}

std::string LibNiceConnection::iceStateToString(IceState state) const {
switch (state) {
case IceState::INITIAL: return "initial";
case IceState::FINISHED: return "finished";
case IceState::FAILED: return "failed";
case IceState::READY: return "ready";
case IceState::CANDIDATES_RECEIVED: return "cand_received";
}
return "unknown";
}

void LibNiceConnection::updateIceState(IceState state) {
if (state <= ice_state_) {
if (state != IceState::READY)
ELOG_WARN("%s message: unexpected ice state transition, iceState: %s, newIceState: %s",
toLog(), iceStateToString(ice_state_).c_str(), iceStateToString(state).c_str());
return;
}

ELOG_INFO("%s message: iceState transition, ice_config_.transport_name: %s, iceState: %s, newIceState: %s, this: %p",
toLog(), ice_config_.transport_name.c_str(),
iceStateToString(ice_state_).c_str(), iceStateToString(state).c_str(), this);
this->ice_state_ = state;
switch (ice_state_) {
case IceState::FINISHED:
return;
case IceState::FAILED:
ELOG_WARN("%s message: Ice Failed", toLog());
break;

case IceState::READY:
case IceState::CANDIDATES_RECEIVED:
break;
default:
break;
}

// Important: send this outside our state lock. Otherwise, serious risk of deadlock.
if (this->listener_ != NULL)
this->listener_->updateIceState(state, this);
}

std::string getHostTypeFromCandidate(NiceCandidate *candidate) {
switch (candidate->type) {
case NICE_CANDIDATE_TYPE_HOST: return "host";
Expand Down
11 changes: 4 additions & 7 deletions erizo/src/erizo/LibNiceConnection.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,14 +50,12 @@ class LibNiceConnection : public IceConnection {
*/
void start() override;
bool setRemoteCandidates(const std::vector<CandidateInfo> &candidates, bool is_bundle) override;
void gatheringDone(uint stream_id) override;
void getCandidate(uint stream_id, uint component_id, const std::string &foundation) override;
void gatheringDone(uint stream_id);
void getCandidate(uint stream_id, uint component_id, const std::string &foundation);
void setRemoteCredentials(const std::string& username, const std::string& password) override;
int sendData(unsigned int compId, const void* buf, int len) override;
int sendData(unsigned int component_id, const void* buf, int len) override;

void updateIceState(IceState state) override;
IceState checkIceState() override;
void updateComponentState(unsigned int compId, IceState state) override;
void updateComponentState(unsigned int component_id, IceState state);
void onData(unsigned int component_id, char* buf, int len) override;
CandidatePair getSelectedPair() override;
void setReceivedLastCandidate(bool hasReceived) override;
Expand All @@ -66,7 +64,6 @@ class LibNiceConnection : public IceConnection {
static LibNiceConnection* create(IceConnectionListener *listener, const IceConfig& ice_config);

private:
std::string iceStateToString(IceState state) const;
void mainLoop();

private:
Expand Down
Loading

0 comments on commit b595e39

Please sign in to comment.