From 4a95481eab58f0eab3d8d8649014fb54b5009b79 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miko=C5=82aj=20Ma=C5=82ecki?= Date: Mon, 7 Dec 2020 10:57:13 +0100 Subject: [PATCH 1/7] [core] Fixed proper reporting of sending blocked state. Fixed API value for connect-non-blocking --- srtcore/api.cpp | 3 + srtcore/group.cpp | 151 ++++++++++++++++++++++++++++++++++++++++------ srtcore/group.h | 2 +- 3 files changed, 136 insertions(+), 20 deletions(-) diff --git a/srtcore/api.cpp b/srtcore/api.cpp index 2a23dace2..eac0da84b 100644 --- a/srtcore/api.cpp +++ b/srtcore/api.cpp @@ -1839,6 +1839,9 @@ int CUDTUnited::groupConnect(CUDTGroup* pg, SRT_SOCKGROUPCONFIG* targets, int ar if (retval == -1) throw CUDTException(MJ_CONNECTION, MN_CONNLOST, 0); + if (!block_new_opened) + return 0; + return retval; } #endif diff --git a/srtcore/group.cpp b/srtcore/group.cpp index 9b3a26a01..e7d50a7ec 100644 --- a/srtcore/group.cpp +++ b/srtcore/group.cpp @@ -1369,7 +1369,26 @@ int CUDTGroup::sendBroadcast(const char* buf, int len, SRT_MSGCTRL& w_mc) // { send_CheckBrokenSockets() - if (!pending.empty()) + // Make an extra loop check to see if we could be + // in a condition of "all sockets either blocked or pending" + + int nsuccessful = 0; + int nblocked = 0; + bool is_pending_blocked = false; + for (vector::iterator is = sendstates.begin(); is != sendstates.end(); ++is) + { + if (is->stat == -1) + { + if (is->code == SRT_EASYNCSND) + ++nblocked; + } + else + { + nsuccessful++; + } + } + + if (!pending.empty() || nblocked) { HLOGC(gslog.Debug, log << "grp/sendBroadcast: found pending sockets, polling them."); @@ -1386,12 +1405,24 @@ int CUDTGroup::sendBroadcast(const char* buf, int len, SRT_MSGCTRL& w_mc) } else { + int swait_timeout = 0; + + // There's also a hidden condition here that is the upper if condition. + is_pending_blocked = (nsuccessful == 0); + + // If this is the case when + if (m_bSynSending && is_pending_blocked) + { + HLOGC(gslog.Debug, log << "grp/sendBroadcast: will block for " << m_iSndTimeOut << " - waiting for any writable in blocking mode"); + swait_timeout = m_iSndTimeOut; + } + { InvertedLock ug(m_GroupLock); THREAD_PAUSED(); m_pGlobal->m_EPoll.swait( - *m_SndEpolld, sready, 0, false /*report by retval*/); // Just check if anything happened + *m_SndEpolld, (sready), swait_timeout, false /*report by retval*/); // Just check if anything happened THREAD_RESUMED(); } @@ -1404,6 +1435,10 @@ int CUDTGroup::sendBroadcast(const char* buf, int len, SRT_MSGCTRL& w_mc) HLOGC(gslog.Debug, log << "grp/sendBroadcast: RDY: " << DisplayEpollResults(sready)); // sockets in EX: should be moved to wipeme. + // IMPORTANT: we check only PENDING sockets (not blocked) because only + // pending sockets might report ERR epoll without being explicitly broken. + // Sockets that did connect and just have buffer full will be always broken, + // if they're going to report ERR in epoll. for (vector::iterator i = pending.begin(); i != pending.end(); ++i) { if (CEPoll::isready(sready, *i, SRT_EPOLL_ERR)) @@ -1415,6 +1450,9 @@ int CUDTGroup::sendBroadcast(const char* buf, int len, SRT_MSGCTRL& w_mc) int no_events = 0; m_pGlobal->m_EPoll.update_usock(m_SndEID, *i, &no_events); } + + if (CEPoll::isready(sready, *i, SRT_EPOLL_OUT)) + is_pending_blocked = false; } // After that, all sockets that have been reported @@ -1431,7 +1469,10 @@ int CUDTGroup::sendBroadcast(const char* buf, int len, SRT_MSGCTRL& w_mc) if (m_bClosing) throw CUDTException(MJ_CONNECTION, MN_CONNLOST, 0); - send_CloseBrokenSockets(wipeme); + // Just for a case, when a socket that was blocked or pending + // had switched to write-enabled, + + send_CloseBrokenSockets((wipeme)); // wipeme will be cleared by this function // Re-check after the waiting lock has been reacquired if (m_bClosing) @@ -1693,9 +1734,18 @@ int CUDTGroup::sendBroadcast(const char* buf, int len, SRT_MSGCTRL& w_mc) if (none_succeeded) { - HLOGC(gslog.Debug, log << "grp/sendBroadcast: all links broken (none succeeded to send a payload)"); m_pGlobal->m_EPoll.update_events(id(), m_sPollID, SRT_EPOLL_OUT, false); - m_pGlobal->m_EPoll.update_events(id(), m_sPollID, SRT_EPOLL_ERR, true); + if (!m_bSynSending && (is_pending_blocked || was_blocked)) + { + HLOGC(gslog.Debug, log << "grp/sendBroadcast: no links are ready for sending"); + ercode = SRT_EASYNCSND; + } + else + { + HLOGC(gslog.Debug, log << "grp/sendBroadcast: all links broken (none succeeded to send a payload)"); + m_pGlobal->m_EPoll.update_events(id(), m_sPollID, SRT_EPOLL_ERR, true); + } + // Reparse error code, if set. // It might be set, if the last operation was failed. // If any operation succeeded, this will not be executed anyway. @@ -3320,7 +3370,7 @@ size_t CUDTGroup::sendBackup_CheckNeedActivate(const vector& idler } // [[using locked(this->m_GroupLock)]] -void CUDTGroup::send_CheckPendingSockets(const vector& pending, vector& w_wipeme) +bool CUDTGroup::send_CheckPendingSockets(const vector& pending, int nsuccessful, int nblocked, vector& w_wipeme) { // If we have at least one stable link, then select a link that have the // highest priority and silence the rest. @@ -3332,7 +3382,8 @@ void CUDTGroup::send_CheckPendingSockets(const vector& pending, vecto // we have one link that is stable and the freshly activated link is actually // stable too, we'll check this next time. // - if (!pending.empty()) + bool is_pending_blocked = false; + if (!pending.empty() || nblocked) { HLOGC(gslog.Debug, log << "grp/send*: found pending sockets, polling them."); @@ -3348,19 +3399,34 @@ void CUDTGroup::send_CheckPendingSockets(const vector& pending, vecto } else { + int swait_timeout = 0; + + // There's also a hidden condition here that is the upper if condition. + is_pending_blocked = (nsuccessful == 0); + + // If this is the case when + if (m_bSynSending && is_pending_blocked) + { + HLOGC(gslog.Debug, log << "grp/sendBroadcast: will block for " << m_iSndTimeOut << " - waiting for any writable in blocking mode"); + swait_timeout = m_iSndTimeOut; + } + // Some sockets could have been closed in the meantime. if (m_SndEpolld->watch_empty()) + { + LOGC(gslog.Error, log << "grp/send*: IPE: reported pending sockets, but EID is empty - ERROR!"); throw CUDTException(MJ_CONNECTION, MN_CONNLOST, 0); + } { InvertedLock ug(m_GroupLock); m_pGlobal->m_EPoll.swait( - *m_SndEpolld, sready, 0, false /*report by retval*/); // Just check if anything happened + *m_SndEpolld, sready, swait_timeout, false /*report by retval*/); // Just check if anything happened } if (m_bClosing) { - HLOGC(gslog.Debug, log << "grp/send...: GROUP CLOSED, ABANDONING"); + LOGC(gslog.Error, log << "grp/send...: GROUP CLOSED, ABANDONING"); throw CUDTException(MJ_CONNECTION, MN_CONNLOST, 0); } @@ -3377,6 +3443,9 @@ void CUDTGroup::send_CheckPendingSockets(const vector& pending, vecto int no_events = 0; m_pGlobal->m_EPoll.update_usock(m_SndEID, *i, &no_events); } + + if (CEPoll::isready(sready, *i, SRT_EPOLL_OUT)) + is_pending_blocked = false; } // After that, all sockets that have been reported @@ -3388,6 +3457,8 @@ void CUDTGroup::send_CheckPendingSockets(const vector& pending, vecto m_pGlobal->m_EPoll.clear_ready_usocks(*m_SndEpolld, SRT_EPOLL_OUT); } } + + return is_pending_blocked; } // [[using locked(this->m_GroupLock)]] @@ -3489,13 +3560,13 @@ void CUDTGroup::sendBackup_CheckParallelLinks(const vector& unstable, { // wipeme wiped, pending sockets checked, it can only mean that // all sockets are broken. - HLOGC(gslog.Debug, log << "grp/sendBackup: epolld empty - all sockets broken?"); + LOGC(gslog.Error, log << "grp/sendBackup: epolld empty - all sockets broken?"); throw CUDTException(MJ_CONNECTION, MN_CONNLOST, 0); } if (!m_bSynSending) { - HLOGC(gslog.Debug, log << "grp/sendBackup: non-blocking mode - exit with no-write-ready"); + LOGC(gslog.Error , log << "grp/sendBackup: non-blocking mode - exit with no-write-ready"); throw CUDTException(MJ_AGAIN, MN_WRAVAIL, 0); } // Here is the situation that the only links left here are: @@ -3523,7 +3594,7 @@ void CUDTGroup::sendBackup_CheckParallelLinks(const vector& unstable, // Some sockets could have been closed in the meantime. if (m_SndEpolld->watch_empty()) { - HLOGC(gslog.Debug, log << "grp/sendBackup: no more sendable sockets - group broken"); + LOGC(gslog.Error, log << "grp/sendBackup: no more sendable sockets - group broken"); throw CUDTException(MJ_CONNECTION, MN_CONNLOST, 0); } @@ -3536,6 +3607,7 @@ void CUDTGroup::sendBackup_CheckParallelLinks(const vector& unstable, if (brdy == 0) // SND timeout exceeded { + LOGC(gslog.Error, log << "grp/sendBackup: not ready to write"); throw CUDTException(MJ_AGAIN, MN_WRAVAIL, 0); } @@ -3572,7 +3644,10 @@ void CUDTGroup::sendBackup_CheckParallelLinks(const vector& unstable, // Re-check after the waiting lock has been reacquired if (m_bClosing) + { + LOGC(gslog.Error, log << "grp/sendBackup: group closed in the meantime"); throw CUDTException(MJ_CONNECTION, MN_CONNLOST, 0); + } if (brdy == -1 || ndead >= nlinks) { @@ -3738,6 +3813,7 @@ int CUDTGroup::sendBackup(const char* buf, int len, SRT_MSGCTRL& w_mc) // Avoid stupid errors in the beginning. if (len <= 0) { + LOGC(gslog.Error, log << "grp/send(backup): negative length: " << len); throw CUDTException(MJ_NOTSUP, MN_INVAL, 0); } @@ -3772,6 +3848,7 @@ int CUDTGroup::sendBackup(const char* buf, int len, SRT_MSGCTRL& w_mc) if (m_bClosing) { leaveCS(m_pGlobal->m_GlobControlLock); + LOGC(gslog.Error, log << "grp/send(backup): Cannot send, connection lost!"); throw CUDTException(MJ_CONNECTION, MN_CONNLOST, 0); } @@ -3890,6 +3967,10 @@ int CUDTGroup::sendBackup(const char* buf, int len, SRT_MSGCTRL& w_mc) // and therefore need to be activated. set sendable_pri; + // Likely will need to survive unlock-lock cycle on the group, + // so keep this by IDs. + vector blocked; + // We believe that we need to send the payload over every sendable link anyway. for (vector::iterator snd = sendable.begin(); snd != sendable.end(); ++snd) { @@ -3931,6 +4012,9 @@ int CUDTGroup::sendBackup(const char* buf, int len, SRT_MSGCTRL& w_mc) if (is_unstable && is_zero(u.m_tsUnstableSince)) // Add to unstable only if it wasn't unstable already insert_uniq((unstable), d); + if (is_unstable) + blocked.push_back(d->id); + const Sendstate cstate = {d->id, d, stat, erc}; sendstates.push_back(cstate); d->sndresult = stat; @@ -4103,31 +4187,60 @@ int CUDTGroup::sendBackup(const char* buf, int len, SRT_MSGCTRL& w_mc) << " unstable=" << unstable.size()); } - send_CheckPendingSockets(pending, (wipeme)); + int nsuccess = 0; + int nblocked = 0; + for (vector::iterator is = sendstates.begin(); is != sendstates.end(); ++is) + { + if (is->stat == -1) + { + if (is->code == SRT_EASYNCSND) + ++nblocked; + } + else + { + nsuccess++; + } + } + + bool is_pending_blocked = send_CheckPendingSockets(pending, nsuccess, nblocked, (wipeme)); // Re-check after the waiting lock has been reacquired if (m_bClosing) + { + LOGC(gslog.Error, log << "grp/sendBackup: closing the group during operation"); throw CUDTException(MJ_CONNECTION, MN_CONNLOST, 0); + } send_CloseBrokenSockets((wipeme)); // Re-check after the waiting lock has been reacquired if (m_bClosing) + { + LOGC(gslog.Error, log << "grp/sendBackup: closing the group during operation"); throw CUDTException(MJ_CONNECTION, MN_CONNLOST, 0); + } sendBackup_CheckParallelLinks(unstable, (parallel), (final_stat), (none_succeeded), (w_mc), (cx)); // (closing condition checked inside this call) if (none_succeeded) { - HLOGC(gslog.Debug, log << "grp/sendBackup: all links broken (none succeeded to send a payload)"); m_pGlobal->m_EPoll.update_events(id(), m_sPollID, SRT_EPOLL_OUT, false); - m_pGlobal->m_EPoll.update_events(id(), m_sPollID, SRT_EPOLL_ERR, true); - // Reparse error code, if set. - // It might be set, if the last operation was failed. - // If any operation succeeded, this will not be executed anyway. + if (!m_bSynSending && (is_pending_blocked || nblocked)) + { + HLOGC(gslog.Debug, log << "grp/sendBackup: no links are ready for sending"); + throw CUDTException(MJ_AGAIN, MN_WRAVAIL); + } + else + { + HLOGC(gslog.Debug, log << "grp/sendBackup: all links broken (none succeeded to send a payload)"); + m_pGlobal->m_EPoll.update_events(id(), m_sPollID, SRT_EPOLL_ERR, true); + // Reparse error code, if set. + // It might be set, if the last operation was failed. + // If any operation succeeded, this will not be executed anyway. - throw CUDTException(MJ_CONNECTION, MN_CONNLOST, 0); + throw CUDTException(MJ_CONNECTION, MN_CONNLOST, 0); + } } // Now fill in the socket table. Check if the size is enough, if not, diff --git a/srtcore/group.h b/srtcore/group.h index 75d58eac6..2465d0cbb 100644 --- a/srtcore/group.h +++ b/srtcore/group.h @@ -260,7 +260,7 @@ class CUDTGroup std::vector& w_parallel, std::vector& w_wipeme, const std::string& activate_reason); - void send_CheckPendingSockets(const std::vector& pending, std::vector& w_wipeme); + bool send_CheckPendingSockets(const std::vector& pending, int nsuccessful, int nblocked, std::vector& w_wipeme); void send_CloseBrokenSockets(std::vector& w_wipeme); void sendBackup_CheckParallelLinks(const std::vector& unstable, std::vector& w_parallel, From c272a059dc192878ca92e8c75170d041b6c53180 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miko=C5=82aj=20Ma=C5=82ecki?= Date: Tue, 8 Dec 2020 11:40:49 +0100 Subject: [PATCH 2/7] Fixed return 0 on multi-connect call. Added lock-preventive emergency exit checks in tsbpd. Suppressed IPE log when unsubscribing --- srtcore/api.cpp | 2 +- srtcore/core.cpp | 36 +++++++++++++++++++++++++++++++----- srtcore/epoll.cpp | 12 +++++++++--- 3 files changed, 41 insertions(+), 9 deletions(-) diff --git a/srtcore/api.cpp b/srtcore/api.cpp index eac0da84b..5d320bcc2 100644 --- a/srtcore/api.cpp +++ b/srtcore/api.cpp @@ -1839,7 +1839,7 @@ int CUDTUnited::groupConnect(CUDTGroup* pg, SRT_SOCKGROUPCONFIG* targets, int ar if (retval == -1) throw CUDTException(MJ_CONNECTION, MN_CONNLOST, 0); - if (!block_new_opened) + if (!block_new_opened && arraysize > 1) return 0; return retval; diff --git a/srtcore/core.cpp b/srtcore/core.cpp index e39bbce32..4278becac 100644 --- a/srtcore/core.cpp +++ b/srtcore/core.cpp @@ -5744,6 +5744,12 @@ void *CUDT::tsbpd(void *param) tsbpdtime = steady_clock::time_point(); } + if (self->m_bClosing) + { + HLOGC(tslog.Debug, log << "tsbpd: IPE? Closing flag set in the meantime of checking. Exiting"); + break; + } + if (!is_zero(tsbpdtime)) { const steady_clock::duration timediff = tsbpdtime - steady_clock::now(); @@ -5774,13 +5780,25 @@ void *CUDT::tsbpd(void *param) */ HLOGC(tslog.Debug, log << self->CONID() << "tsbpd: no data, scheduling wakeup at ack"); self->m_bTsbPdAckWakeup = true; - THREAD_PAUSED(); - tsbpd_cc.wait(); - THREAD_RESUMED(); + + bool signaled = false; + while (!signaled) + { + // For safety reasons, do wakeup once per 1s and re-check the flag. + THREAD_PAUSED(); + signaled = tsbpd_cc.wait_for(seconds_from(1)); + THREAD_RESUMED(); + if (self->m_bClosing) + { + HLOGC(tslog.Debug, log << "tsbpd: IPE? Closing flag set in the meantime of waiting. Exiting"); + goto ExitLoops; + } + } } HLOGC(tslog.Debug, log << self->CONID() << "tsbpd: WAKE UP!!!"); } +ExitLoops: THREAD_EXIT(); HLOGC(tslog.Debug, log << self->CONID() << "tsbpd: EXITING"); return NULL; @@ -7819,6 +7837,13 @@ void CUDT::destroySynch() void CUDT::releaseSynch() { SRT_ASSERT(m_bClosing); +#if ENABLE_HEAVY_LOGGING + if (!m_bClosing) + { + LOGC(smlog.Debug, log << "releaseSynch: IPE: m_bClosing not set to false, TSBPD might hangup!"); + } +#endif + m_bClosing = true; // wake up user calls CSync::lock_signal(m_SendBlockCond, m_SendBlockLock); @@ -7826,8 +7851,8 @@ void CUDT::releaseSynch() leaveCS(m_SendLock); // Awake tsbpd() and srt_recv*(..) threads for them to check m_bClosing. - CSync::lock_signal(m_RecvDataCond, m_RecvLock); - CSync::lock_signal(m_RcvTsbPdCond, m_RecvLock); + CSync::lock_broadcast(m_RecvDataCond, m_RecvLock); + CSync::lock_broadcast(m_RcvTsbPdCond, m_RecvLock); // Azquiring m_RcvTsbPdStartupLock protects race in starting // the tsbpd() thread in CUDT::processData(). @@ -11192,6 +11217,7 @@ void CUDT::checkTimers() void CUDT::updateBrokenConnection() { + m_bClosing = true; releaseSynch(); // app can call any UDT API to learn the connection_broken error s_UDTUnited.m_EPoll.update_events(m_SocketID, m_sPollID, SRT_EPOLL_IN | SRT_EPOLL_OUT | SRT_EPOLL_ERR, true); diff --git a/srtcore/epoll.cpp b/srtcore/epoll.cpp index 9d2317c0d..3c021a672 100644 --- a/srtcore/epoll.cpp +++ b/srtcore/epoll.cpp @@ -864,9 +864,15 @@ int CEPoll::update_events(const SRTSOCKET& uid, std::set& eids, const int e CEPollDesc::Wait* pwait = ed.watch_find(uid); if (!pwait) { - // As this is mapped in the socket's data, it should be impossible. - LOGC(eilog.Error, log << "epoll/update: IPE: update struck E" - << (*i) << " which is NOT SUBSCRIBED to @" << uid); + // Don't print this error, if the intention was to clear the readiness. + // This is being usually done together with unsubscription, so this error + // would be misleading and unnecessary here. Report only cases when setting readiness. + if (enable) + { + // As this is mapped in the socket's data, it should be impossible. + LOGC(eilog.Error, log << "epoll/update: IPE: update struck E" + << (*i) << " which is NOT SUBSCRIBED to @" << uid); + } continue; } From 834b702c8e0864094ce7036c3e0bbb209f365375 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miko=C5=82aj=20Ma=C5=82ecki?= Date: Tue, 8 Dec 2020 14:44:44 +0100 Subject: [PATCH 3/7] Fixed TSBPD deadlock-prone continuation. Demoted epoll IPE log. Made tsbpd_cc wait 125ms interrupt for sanity check --- srtcore/api.cpp | 2 +- srtcore/core.cpp | 49 +++++++++++++++++++++++++++++++---------------- srtcore/epoll.cpp | 12 +++--------- 3 files changed, 37 insertions(+), 26 deletions(-) diff --git a/srtcore/api.cpp b/srtcore/api.cpp index 5d320bcc2..6d8050158 100644 --- a/srtcore/api.cpp +++ b/srtcore/api.cpp @@ -2711,7 +2711,7 @@ void CUDTUnited::checkBrokenSockets() || (0 == j->second->m_pUDT->m_pSndBuffer->getCurrBufSize()) || (j->second->m_pUDT->m_tsLingerExpiration <= steady_clock::now())) { - HLOGC(smlog.Debug, log << "checkBrokenSockets: marking CLOSED qualified @" << j->second->m_SocketID); + HLOGC(smlog.Debug, log << "checkBrokenSockets: marking CLOSED (closing=true) qualified @" << j->second->m_SocketID); j->second->m_pUDT->m_tsLingerExpiration = steady_clock::time_point(); j->second->m_pUDT->m_bClosing = true; j->second->m_tsClosureTimeStamp = steady_clock::now(); diff --git a/srtcore/core.cpp b/srtcore/core.cpp index 4278becac..b5c4269c9 100644 --- a/srtcore/core.cpp +++ b/srtcore/core.cpp @@ -5739,6 +5739,12 @@ void *CUDT::tsbpd(void *param) gkeeper.group->updateLatestRcv(self->m_parent); } } + + // After re-acquisition of the m_RecvLock, re-check the closing flag + if (self->m_bClosing) + { + break; + } #endif CGlobEvent::triggerEvent(); tsbpdtime = steady_clock::time_point(); @@ -5762,8 +5768,9 @@ void *CUDT::tsbpd(void *param) log << self->CONID() << "tsbpd: FUTURE PACKET seq=" << current_pkt_seq << " T=" << FormatTime(tsbpdtime) << " - waiting " << count_milliseconds(timediff) << "ms"); THREAD_PAUSED(); - tsbpd_cc.wait_for(timediff); + const bool ATR_UNUSED signaled = tsbpd_cc.wait_for(timediff); THREAD_RESUMED(); + HLOGC(tslog.Debug, log << self->CONID() << "tsbpd: WAKE UP on " << (signaled? "SIGNAL" : "TIMEOUIT") << "!!!"); } else { @@ -5784,21 +5791,31 @@ void *CUDT::tsbpd(void *param) bool signaled = false; while (!signaled) { - // For safety reasons, do wakeup once per 1s and re-check the flag. + // For safety reasons, do wakeup once per 1/8s and re-check the flag. + // This should be enough long time that during a normal transmission + // the TSBPD thread would be woken up much earlier when required by + // ACK per ACK timer (at most 10ms since the last check) and in case + // when this might result in a deadlock, it would only hold up to 125ms, + // which should be little harmful for the application. NOTE THAT THIS + // IS A SANITY CHECK FOR A SITUATION THAT SHALL NEVER HAPPEN. THREAD_PAUSED(); - signaled = tsbpd_cc.wait_for(seconds_from(1)); + signaled = tsbpd_cc.wait_for(milliseconds_from(125)); THREAD_RESUMED(); - if (self->m_bClosing) + if (self->m_bClosing && !signaled) { - HLOGC(tslog.Debug, log << "tsbpd: IPE? Closing flag set in the meantime of waiting. Exiting"); - goto ExitLoops; + HLOGC(tslog.Debug, log << "tsbpd: IPE: Closing flag set in the meantime of waiting. Continue to EXIT"); + + // This break doesn't have to be done in case when signaled + // because if so this current loop will be interrupted anyway, + // and the outer loop will be terminated at the check of self->m_bClosing. + // This is only a sanity check. + break; } } + HLOGC(tslog.Debug, log << self->CONID() << "tsbpd: WAKE UP on " << (signaled? "SIGNAL" : "TIMEOUIT") << "!!!"); } - - HLOGC(tslog.Debug, log << self->CONID() << "tsbpd: WAKE UP!!!"); } -ExitLoops: + THREAD_EXIT(); HLOGC(tslog.Debug, log << self->CONID() << "tsbpd: EXITING"); return NULL; @@ -6372,7 +6389,7 @@ bool CUDT::closeInternal() // Inform the threads handler to stop. m_bClosing = true; - HLOGC(smlog.Debug, log << CONID() << "CLOSING STATE. Acquiring connection lock"); + HLOGC(smlog.Debug, log << CONID() << "CLOSING STATE (closing=true). Acquiring connection lock"); ScopedLock connectguard(m_ConnectionLock); @@ -7837,13 +7854,11 @@ void CUDT::destroySynch() void CUDT::releaseSynch() { SRT_ASSERT(m_bClosing); -#if ENABLE_HEAVY_LOGGING if (!m_bClosing) { - LOGC(smlog.Debug, log << "releaseSynch: IPE: m_bClosing not set to false, TSBPD might hangup!"); + HLOGC(smlog.Debug, log << "releaseSynch: IPE: m_bClosing not set to false, TSBPD might hangup!"); + m_bClosing = true; } -#endif - m_bClosing = true; // wake up user calls CSync::lock_signal(m_SendBlockCond, m_SendBlockLock); @@ -9545,7 +9560,7 @@ void CUDT::processClose() m_bBroken = true; m_iBrokenCounter = 60; - HLOGP(smlog.Debug, "processClose: sent message and set flags"); + HLOGP(smlog.Debug, "processClose: (closing=true) sent message and set flags"); if (m_bTsbPd) { @@ -11057,7 +11072,8 @@ bool CUDT::checkExpTimer(const steady_clock::time_point& currtime, int check_rea // Application will detect this when it calls any UDT methods next time. // HLOGC(xtlog.Debug, - log << "CONNECTION EXPIRED after " << count_milliseconds(currtime - m_tsLastRspTime) << "ms"); + log << "CONNECTION EXPIRED after " << count_milliseconds(currtime - m_tsLastRspTime) + << "ms (closing=true)"); m_bClosing = true; m_bBroken = true; m_iBrokenCounter = 30; @@ -11217,6 +11233,7 @@ void CUDT::checkTimers() void CUDT::updateBrokenConnection() { + HLOGC(smlog.Debug, log << "updateBrokenConnection: setting closing=true and taking out epoll events"); m_bClosing = true; releaseSynch(); // app can call any UDT API to learn the connection_broken error diff --git a/srtcore/epoll.cpp b/srtcore/epoll.cpp index 3c021a672..f74d6cd6c 100644 --- a/srtcore/epoll.cpp +++ b/srtcore/epoll.cpp @@ -864,15 +864,9 @@ int CEPoll::update_events(const SRTSOCKET& uid, std::set& eids, const int e CEPollDesc::Wait* pwait = ed.watch_find(uid); if (!pwait) { - // Don't print this error, if the intention was to clear the readiness. - // This is being usually done together with unsubscription, so this error - // would be misleading and unnecessary here. Report only cases when setting readiness. - if (enable) - { - // As this is mapped in the socket's data, it should be impossible. - LOGC(eilog.Error, log << "epoll/update: IPE: update struck E" - << (*i) << " which is NOT SUBSCRIBED to @" << uid); - } + // As this is mapped in the socket's data, it should be impossible. + HLOGC(eilog.Debug, log << "epoll/update: IPE: update struck E" + << (*i) << " which is NOT SUBSCRIBED to @" << uid); continue; } From 8b4d0b75e881b551f372c59c5e10aa55c96772be Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miko=C5=82aj=20Ma=C5=82ecki?= Date: Tue, 8 Dec 2020 17:42:12 +0100 Subject: [PATCH 4/7] Fixed return value from srt_connect for groups --- srtcore/api.cpp | 23 ++++++++++++++++++++--- 1 file changed, 20 insertions(+), 3 deletions(-) diff --git a/srtcore/api.cpp b/srtcore/api.cpp index 6d8050158..68e087ae5 100644 --- a/srtcore/api.cpp +++ b/srtcore/api.cpp @@ -1688,6 +1688,26 @@ int CUDTUnited::groupConnect(CUDTGroup* pg, SRT_SOCKGROUPCONFIG* targets, int ar vector broken; + // Return value rules: + // In non-blocking mode: + // - return Socket ID, if: + // - you requested only one connection in this call + // In blocking mode: + // - return Socket ID, if: + // - you requested only one connection in this call + // - you connect a group that was not connected yet + // - otherwise return 0 + + // Leave the last SID value in retval if you had only one + // connection to start. Otherwise override it with 0. + if (arraysize > 1) + retval = 0; + + // For blocking mode only, and only in case when the group + // was not yet connected, this retval could be overridden + // again with the first ready socket ID, and this socket ID + // will be returned. + while (block_new_opened) { if (spawned.empty()) @@ -1839,9 +1859,6 @@ int CUDTUnited::groupConnect(CUDTGroup* pg, SRT_SOCKGROUPCONFIG* targets, int ar if (retval == -1) throw CUDTException(MJ_CONNECTION, MN_CONNLOST, 0); - if (!block_new_opened && arraysize > 1) - return 0; - return retval; } #endif From 575807b4d4f7c1acb4f87fe376c4e1724bc83015 Mon Sep 17 00:00:00 2001 From: Sektor van Skijlen Date: Tue, 8 Dec 2020 18:06:08 +0100 Subject: [PATCH 5/7] Apply suggestions from code review Co-authored-by: Maxim Sharabayko --- srtcore/group.cpp | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/srtcore/group.cpp b/srtcore/group.cpp index e7d50a7ec..80dc7bf93 100644 --- a/srtcore/group.cpp +++ b/srtcore/group.cpp @@ -1372,19 +1372,19 @@ int CUDTGroup::sendBroadcast(const char* buf, int len, SRT_MSGCTRL& w_mc) // Make an extra loop check to see if we could be // in a condition of "all sockets either blocked or pending" - int nsuccessful = 0; - int nblocked = 0; + int nsuccessful = 0; // number of successfully connected sockets + int nblocked = 0; // number of sockets blocked in connection bool is_pending_blocked = false; for (vector::iterator is = sendstates.begin(); is != sendstates.end(); ++is) { - if (is->stat == -1) + if (is->stat != -1) { - if (is->code == SRT_EASYNCSND) - ++nblocked; + nsuccessful++; } - else + // is->stat == -1 + else if (is->code == SRT_EASYNCSND) { - nsuccessful++; + ++nblocked; } } From fa55144783504b933ae18f5a7c3b061e94460223 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miko=C5=82aj=20Ma=C5=82ecki?= Date: Wed, 9 Dec 2020 10:54:10 +0100 Subject: [PATCH 6/7] Fixed test that reused a dead socket --- test/test_connection_timeout.cpp | 26 +++++++++++++------------- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/test/test_connection_timeout.cpp b/test/test_connection_timeout.cpp index bbd87e145..5d0548029 100644 --- a/test/test_connection_timeout.cpp +++ b/test/test_connection_timeout.cpp @@ -172,20 +172,20 @@ TEST_F(TestConnectionTimeout, Nonblocking) { */ TEST_F(TestConnectionTimeout, BlockingLoop) { - const SRTSOCKET client_sock = srt_create_socket(); - ASSERT_GT(client_sock, 0); // socket_id should be > 0 - - // Set connection timeout to 999 ms to reduce the test execution time. - // Also need to hit a time point between two threads: - // srt_connect will check TTL every second, - // CRcvQueue::worker will wait on a socket for 10 ms. - // Need to have a condition, when srt_connect will process the timeout. - const int connection_timeout_ms = 999; - EXPECT_EQ(srt_setsockopt(client_sock, 0, SRTO_CONNTIMEO, &connection_timeout_ms, sizeof connection_timeout_ms), SRT_SUCCESS); - const sockaddr* psa = reinterpret_cast(&m_sa); + const int connection_timeout_ms = 999; for (int i = 0; i < 10; ++i) { + const SRTSOCKET client_sock = srt_create_socket(); + ASSERT_GT(client_sock, 0); // socket_id should be > 0 + + // Set connection timeout to 999 ms to reduce the test execution time. + // Also need to hit a time point between two threads: + // srt_connect will check TTL every second, + // CRcvQueue::worker will wait on a socket for 10 ms. + // Need to have a condition, when srt_connect will process the timeout. + EXPECT_EQ(srt_setsockopt(client_sock, 0, SRTO_CONNTIMEO, &connection_timeout_ms, sizeof connection_timeout_ms), SRT_SUCCESS); + EXPECT_EQ(srt_connect(client_sock, psa, sizeof m_sa), SRT_ERROR); const int error_code = srt_getlasterror(nullptr); @@ -196,9 +196,9 @@ TEST_F(TestConnectionTimeout, BlockingLoop) << error_code << " " << srt_getlasterror_str() << "\n"; break; } - } - EXPECT_EQ(srt_close(client_sock), SRT_SUCCESS); + EXPECT_EQ(srt_close(client_sock), SRT_SUCCESS); + } } From ece9a2813a9e4a34a88967fb4c9c2027789299cb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miko=C5=82aj=20Ma=C5=82ecki?= Date: Thu, 17 Dec 2020 17:16:17 +0100 Subject: [PATCH 7/7] Updated constness of arraysize --- srtcore/api.cpp | 2 +- srtcore/api.h | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/srtcore/api.cpp b/srtcore/api.cpp index ae393d3b6..83a5fac69 100644 --- a/srtcore/api.cpp +++ b/srtcore/api.cpp @@ -1310,7 +1310,7 @@ int CUDTUnited::singleMemberConnect(CUDTGroup* pg, SRT_SOCKGROUPCONFIG* gd) } // [[using assert(pg->m_iBusy > 0)]] -int CUDTUnited::groupConnect(CUDTGroup* pg, SRT_SOCKGROUPCONFIG* targets, int arraysize) +int CUDTUnited::groupConnect(CUDTGroup* pg, SRT_SOCKGROUPCONFIG* targets, const int arraysize) { CUDTGroup& g = *pg; SRT_ASSERT(g.m_iBusy > 0); diff --git a/srtcore/api.h b/srtcore/api.h index 8ad6bb3fd..de429e8b1 100644 --- a/srtcore/api.h +++ b/srtcore/api.h @@ -256,7 +256,7 @@ friend class CRendezvousQueue; int connect(const SRTSOCKET u, const sockaddr* name, int namelen, int32_t forced_isn); int connectIn(CUDTSocket* s, const sockaddr_any& target, int32_t forced_isn); #if ENABLE_EXPERIMENTAL_BONDING - int groupConnect(CUDTGroup* g, SRT_SOCKGROUPCONFIG targets [], int arraysize); + int groupConnect(CUDTGroup* g, SRT_SOCKGROUPCONFIG targets [], const int arraysize); int singleMemberConnect(CUDTGroup* g, SRT_SOCKGROUPCONFIG* target); #endif int close(const SRTSOCKET u);