-
Notifications
You must be signed in to change notification settings - Fork 866
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[core] Fixed proper reporting of sending blocked state. Fixed API value for connect-non-blocking #1698
base: master
Are you sure you want to change the base?
[core] Fixed proper reporting of sending blocked state. Fixed API value for connect-non-blocking #1698
Changes from 5 commits
4a95481
c272a05
834b702
8b4d0b7
f0d185c
575807b
fa55144
4b74a4c
ece9a28
5572363
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -1688,6 +1688,26 @@ int CUDTUnited::groupConnect(CUDTGroup* pg, SRT_SOCKGROUPCONFIG* targets, int ar | |
|
||
vector<SRTSOCKET> broken; | ||
|
||
// Return value rules: | ||
// In non-blocking mode: | ||
// - return Socket ID, if: | ||
// - you requested only one connection in this call | ||
// In blocking mode: | ||
// - return Socket ID, if: | ||
// - you requested only one connection in this call | ||
// - you connect a group that was not connected yet | ||
// - otherwise return 0 | ||
|
||
// Leave the last SID value in retval if you had only one | ||
// connection to start. Otherwise override it with 0. | ||
if (arraysize > 1) | ||
retval = 0; | ||
Comment on lines
+1666
to
+1667
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Given that int CUDTUnited::groupConnect(CUDTGroup* pg, SRT_SOCKGROUPCONFIG* targets, const int arraysize |
||
|
||
// For blocking mode only, and only in case when the group | ||
// was not yet connected, this retval could be overridden | ||
// again with the first ready socket ID, and this socket ID | ||
// will be returned. | ||
|
||
while (block_new_opened) | ||
{ | ||
if (spawned.empty()) | ||
|
@@ -2708,7 +2728,7 @@ void CUDTUnited::checkBrokenSockets() | |
|| (0 == j->second->m_pUDT->m_pSndBuffer->getCurrBufSize()) | ||
|| (j->second->m_pUDT->m_tsLingerExpiration <= steady_clock::now())) | ||
{ | ||
HLOGC(smlog.Debug, log << "checkBrokenSockets: marking CLOSED qualified @" << j->second->m_SocketID); | ||
HLOGC(smlog.Debug, log << "checkBrokenSockets: marking CLOSED (closing=true) qualified @" << j->second->m_SocketID); | ||
j->second->m_pUDT->m_tsLingerExpiration = steady_clock::time_point(); | ||
j->second->m_pUDT->m_bClosing = true; | ||
j->second->m_tsClosureTimeStamp = steady_clock::now(); | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -5739,11 +5739,23 @@ void *CUDT::tsbpd(void *param) | |
gkeeper.group->updateLatestRcv(self->m_parent); | ||
} | ||
} | ||
|
||
// After re-acquisition of the m_RecvLock, re-check the closing flag | ||
if (self->m_bClosing) | ||
{ | ||
break; | ||
} | ||
#endif | ||
CGlobEvent::triggerEvent(); | ||
tsbpdtime = steady_clock::time_point(); | ||
} | ||
|
||
if (self->m_bClosing) | ||
{ | ||
HLOGC(tslog.Debug, log << "tsbpd: IPE? Closing flag set in the meantime of checking. Exiting"); | ||
break; | ||
} | ||
|
||
if (!is_zero(tsbpdtime)) | ||
{ | ||
const steady_clock::duration timediff = tsbpdtime - steady_clock::now(); | ||
|
@@ -5756,8 +5768,9 @@ void *CUDT::tsbpd(void *param) | |
log << self->CONID() << "tsbpd: FUTURE PACKET seq=" << current_pkt_seq | ||
<< " T=" << FormatTime(tsbpdtime) << " - waiting " << count_milliseconds(timediff) << "ms"); | ||
THREAD_PAUSED(); | ||
tsbpd_cc.wait_for(timediff); | ||
const bool ATR_UNUSED signaled = tsbpd_cc.wait_for(timediff); | ||
THREAD_RESUMED(); | ||
HLOGC(tslog.Debug, log << self->CONID() << "tsbpd: WAKE UP on " << (signaled? "SIGNAL" : "TIMEOUIT") << "!!!"); | ||
} | ||
else | ||
{ | ||
|
@@ -5774,13 +5787,35 @@ void *CUDT::tsbpd(void *param) | |
*/ | ||
HLOGC(tslog.Debug, log << self->CONID() << "tsbpd: no data, scheduling wakeup at ack"); | ||
self->m_bTsbPdAckWakeup = true; | ||
THREAD_PAUSED(); | ||
tsbpd_cc.wait(); | ||
THREAD_RESUMED(); | ||
} | ||
|
||
HLOGC(tslog.Debug, log << self->CONID() << "tsbpd: WAKE UP!!!"); | ||
bool signaled = false; | ||
while (!signaled) | ||
{ | ||
// For safety reasons, do wakeup once per 1/8s and re-check the flag. | ||
// This should be enough long time that during a normal transmission | ||
// the TSBPD thread would be woken up much earlier when required by | ||
// ACK per ACK timer (at most 10ms since the last check) and in case | ||
// when this might result in a deadlock, it would only hold up to 125ms, | ||
// which should be little harmful for the application. NOTE THAT THIS | ||
// IS A SANITY CHECK FOR A SITUATION THAT SHALL NEVER HAPPEN. | ||
THREAD_PAUSED(); | ||
signaled = tsbpd_cc.wait_for(milliseconds_from(125)); | ||
Comment on lines
+5810
to
+5819
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It will hide possible synchronization issues, and it would much harder to track them.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. That's only a sanity check, and I doubt the performance degradation here, although might be that 250ms would be a less likely watchdog to bite, while applications also usually have their own independent watchdogs. Initially I planned 1s here, but that would be way too much. I understand that it might make errors kinda hidden, but OTOH a deadlock here would make a scrap in video transmission (so it won't go "invisible", just it won't make a total disaster). Note also that releaseSynch won't be called another time, that's true, but TSBPD should simply exit on seeing |
||
THREAD_RESUMED(); | ||
if (self->m_bClosing && !signaled) | ||
{ | ||
HLOGC(tslog.Debug, log << "tsbpd: IPE: Closing flag set in the meantime of waiting. Continue to EXIT"); | ||
|
||
// This break doesn't have to be done in case when signaled | ||
// because if so this current loop will be interrupted anyway, | ||
// and the outer loop will be terminated at the check of self->m_bClosing. | ||
// This is only a sanity check. | ||
break; | ||
} | ||
} | ||
HLOGC(tslog.Debug, log << self->CONID() << "tsbpd: WAKE UP on " << (signaled? "SIGNAL" : "TIMEOUIT") << "!!!"); | ||
} | ||
} | ||
|
||
THREAD_EXIT(); | ||
HLOGC(tslog.Debug, log << self->CONID() << "tsbpd: EXITING"); | ||
return NULL; | ||
|
@@ -6354,7 +6389,7 @@ bool CUDT::closeInternal() | |
// Inform the threads handler to stop. | ||
m_bClosing = true; | ||
|
||
HLOGC(smlog.Debug, log << CONID() << "CLOSING STATE. Acquiring connection lock"); | ||
HLOGC(smlog.Debug, log << CONID() << "CLOSING STATE (closing=true). Acquiring connection lock"); | ||
|
||
ScopedLock connectguard(m_ConnectionLock); | ||
|
||
|
@@ -7819,15 +7854,20 @@ void CUDT::destroySynch() | |
void CUDT::releaseSynch() | ||
{ | ||
SRT_ASSERT(m_bClosing); | ||
if (!m_bClosing) | ||
{ | ||
HLOGC(smlog.Debug, log << "releaseSynch: IPE: m_bClosing not set to false, TSBPD might hangup!"); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This might rather be an error message: |
||
m_bClosing = true; | ||
} | ||
// wake up user calls | ||
CSync::lock_signal(m_SendBlockCond, m_SendBlockLock); | ||
|
||
enterCS(m_SendLock); | ||
leaveCS(m_SendLock); | ||
|
||
// Awake tsbpd() and srt_recv*(..) threads for them to check m_bClosing. | ||
CSync::lock_signal(m_RecvDataCond, m_RecvLock); | ||
CSync::lock_signal(m_RcvTsbPdCond, m_RecvLock); | ||
CSync::lock_broadcast(m_RecvDataCond, m_RecvLock); | ||
CSync::lock_broadcast(m_RcvTsbPdCond, m_RecvLock); | ||
|
||
// Azquiring m_RcvTsbPdStartupLock protects race in starting | ||
// the tsbpd() thread in CUDT::processData(). | ||
|
@@ -9520,7 +9560,7 @@ void CUDT::processClose() | |
m_bBroken = true; | ||
m_iBrokenCounter = 60; | ||
|
||
HLOGP(smlog.Debug, "processClose: sent message and set flags"); | ||
HLOGP(smlog.Debug, "processClose: (closing=true) sent message and set flags"); | ||
|
||
if (m_bTsbPd) | ||
{ | ||
|
@@ -11032,7 +11072,8 @@ bool CUDT::checkExpTimer(const steady_clock::time_point& currtime, int check_rea | |
// Application will detect this when it calls any UDT methods next time. | ||
// | ||
HLOGC(xtlog.Debug, | ||
log << "CONNECTION EXPIRED after " << count_milliseconds(currtime - m_tsLastRspTime) << "ms"); | ||
log << "CONNECTION EXPIRED after " << count_milliseconds(currtime - m_tsLastRspTime) | ||
<< "ms (closing=true)"); | ||
m_bClosing = true; | ||
m_bBroken = true; | ||
m_iBrokenCounter = 30; | ||
|
@@ -11192,6 +11233,8 @@ void CUDT::checkTimers() | |
|
||
void CUDT::updateBrokenConnection() | ||
{ | ||
HLOGC(smlog.Debug, log << "updateBrokenConnection: setting closing=true and taking out epoll events"); | ||
m_bClosing = true; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Does this fix require other changes introduced in the PR? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is simply a completion for all cases when There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Does only this change make sense without the rest changes in this PR? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Rest of the changes are formal fixes for real problems, others - including this one - are "sanity fixes". This flag must be set true when the TSBPD thread is expected to exit otherwise it risks a deadlock. Relying on that a flag is set appropriately before exitting isn't the best approach, but it's simply derived from UDT and it's a consequence of how the TSBPD thread is expected to work. Changes here were due to an in-between lifted lock on There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Let's move this fix to a separate PR, so it is not blocked by the review of other changes introduced here. |
||
releaseSynch(); | ||
// app can call any UDT API to learn the connection_broken error | ||
s_UDTUnited.m_EPoll.update_events(m_SocketID, m_sPollID, SRT_EPOLL_IN | SRT_EPOLL_OUT | SRT_EPOLL_ERR, true); | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
"if you requested only one connection in this call"
Not clear. I guess the meaning is that there is only one target passed to this group connect function (
arraysize
).There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This means that it's so if you do call
srt_connect
or if you dosrt_connect_group
with an array of size 1.