Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[core] Fixed proper reporting of sending blocked state. Fixed API value for connect-non-blocking #1698

Draft
wants to merge 10 commits into
base: master
Choose a base branch
from
22 changes: 21 additions & 1 deletion srtcore/api.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1688,6 +1688,26 @@ int CUDTUnited::groupConnect(CUDTGroup* pg, SRT_SOCKGROUPCONFIG* targets, int ar

vector<SRTSOCKET> broken;

// Return value rules:
// In non-blocking mode:
// - return Socket ID, if:
// - you requested only one connection in this call
// In blocking mode:
// - return Socket ID, if:
// - you requested only one connection in this call
// - you connect a group that was not connected yet
// - otherwise return 0

Comment on lines +1654 to +1663
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"if you requested only one connection in this call"
Not clear. I guess the meaning is that there is only one target passed to this group connect function (arraysize).

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This means that it's so if you do call srt_connect or if you do srt_connect_group with an array of size 1.

// Leave the last SID value in retval if you had only one
// connection to start. Otherwise override it with 0.
if (arraysize > 1)
retval = 0;
Comment on lines +1666 to +1667
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Given that arraysize is 400 lines above from this line, it would make sense to make it const, so that no one mistakenly changes it in between.

int CUDTUnited::groupConnect(CUDTGroup* pg, SRT_SOCKGROUPCONFIG* targets, const int arraysize


// For blocking mode only, and only in case when the group
// was not yet connected, this retval could be overridden
// again with the first ready socket ID, and this socket ID
// will be returned.

while (block_new_opened)
{
if (spawned.empty())
Expand Down Expand Up @@ -2708,7 +2728,7 @@ void CUDTUnited::checkBrokenSockets()
|| (0 == j->second->m_pUDT->m_pSndBuffer->getCurrBufSize())
|| (j->second->m_pUDT->m_tsLingerExpiration <= steady_clock::now()))
{
HLOGC(smlog.Debug, log << "checkBrokenSockets: marking CLOSED qualified @" << j->second->m_SocketID);
HLOGC(smlog.Debug, log << "checkBrokenSockets: marking CLOSED (closing=true) qualified @" << j->second->m_SocketID);
j->second->m_pUDT->m_tsLingerExpiration = steady_clock::time_point();
j->second->m_pUDT->m_bClosing = true;
j->second->m_tsClosureTimeStamp = steady_clock::now();
Expand Down
65 changes: 54 additions & 11 deletions srtcore/core.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5739,11 +5739,23 @@ void *CUDT::tsbpd(void *param)
gkeeper.group->updateLatestRcv(self->m_parent);
}
}

// After re-acquisition of the m_RecvLock, re-check the closing flag
if (self->m_bClosing)
{
break;
}
#endif
CGlobEvent::triggerEvent();
tsbpdtime = steady_clock::time_point();
}

if (self->m_bClosing)
{
HLOGC(tslog.Debug, log << "tsbpd: IPE? Closing flag set in the meantime of checking. Exiting");
break;
}

if (!is_zero(tsbpdtime))
{
const steady_clock::duration timediff = tsbpdtime - steady_clock::now();
Expand All @@ -5756,8 +5768,9 @@ void *CUDT::tsbpd(void *param)
log << self->CONID() << "tsbpd: FUTURE PACKET seq=" << current_pkt_seq
<< " T=" << FormatTime(tsbpdtime) << " - waiting " << count_milliseconds(timediff) << "ms");
THREAD_PAUSED();
tsbpd_cc.wait_for(timediff);
const bool ATR_UNUSED signaled = tsbpd_cc.wait_for(timediff);
THREAD_RESUMED();
HLOGC(tslog.Debug, log << self->CONID() << "tsbpd: WAKE UP on " << (signaled? "SIGNAL" : "TIMEOUIT") << "!!!");
}
else
{
Expand All @@ -5774,13 +5787,35 @@ void *CUDT::tsbpd(void *param)
*/
HLOGC(tslog.Debug, log << self->CONID() << "tsbpd: no data, scheduling wakeup at ack");
self->m_bTsbPdAckWakeup = true;
THREAD_PAUSED();
tsbpd_cc.wait();
THREAD_RESUMED();
}

HLOGC(tslog.Debug, log << self->CONID() << "tsbpd: WAKE UP!!!");
bool signaled = false;
while (!signaled)
{
// For safety reasons, do wakeup once per 1/8s and re-check the flag.
// This should be enough long time that during a normal transmission
// the TSBPD thread would be woken up much earlier when required by
// ACK per ACK timer (at most 10ms since the last check) and in case
// when this might result in a deadlock, it would only hold up to 125ms,
// which should be little harmful for the application. NOTE THAT THIS
// IS A SANITY CHECK FOR A SITUATION THAT SHALL NEVER HAPPEN.
THREAD_PAUSED();
signaled = tsbpd_cc.wait_for(milliseconds_from(125));
Comment on lines +5810 to +5819
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It will hide possible synchronization issues, and it would much harder to track them.
Also might introduce a potential minor performance degradation.

CUDT::releaseSynch() notifies m_RcvTsbPdCond used in this tsbpd_cc. If it happens to be blocked forever, then there is another issue to fix.

Copy link
Collaborator Author

@ethouris ethouris Dec 17, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's only a sanity check, and I doubt the performance degradation here, although might be that 250ms would be a less likely watchdog to bite, while applications also usually have their own independent watchdogs. Initially I planned 1s here, but that would be way too much. I understand that it might make errors kinda hidden, but OTOH a deadlock here would make a scrap in video transmission (so it won't go "invisible", just it won't make a total disaster).

Note also that releaseSynch won't be called another time, that's true, but TSBPD should simply exit on seeing m_bClosing flag anyway.

THREAD_RESUMED();
if (self->m_bClosing && !signaled)
{
HLOGC(tslog.Debug, log << "tsbpd: IPE: Closing flag set in the meantime of waiting. Continue to EXIT");

// This break doesn't have to be done in case when signaled
// because if so this current loop will be interrupted anyway,
// and the outer loop will be terminated at the check of self->m_bClosing.
// This is only a sanity check.
break;
}
}
HLOGC(tslog.Debug, log << self->CONID() << "tsbpd: WAKE UP on " << (signaled? "SIGNAL" : "TIMEOUIT") << "!!!");
}
}

THREAD_EXIT();
HLOGC(tslog.Debug, log << self->CONID() << "tsbpd: EXITING");
return NULL;
Expand Down Expand Up @@ -6354,7 +6389,7 @@ bool CUDT::closeInternal()
// Inform the threads handler to stop.
m_bClosing = true;

HLOGC(smlog.Debug, log << CONID() << "CLOSING STATE. Acquiring connection lock");
HLOGC(smlog.Debug, log << CONID() << "CLOSING STATE (closing=true). Acquiring connection lock");

ScopedLock connectguard(m_ConnectionLock);

Expand Down Expand Up @@ -7819,15 +7854,20 @@ void CUDT::destroySynch()
void CUDT::releaseSynch()
{
SRT_ASSERT(m_bClosing);
if (!m_bClosing)
{
HLOGC(smlog.Debug, log << "releaseSynch: IPE: m_bClosing not set to false, TSBPD might hangup!");
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This might rather be an error message: LOGC(smlog.Error, ...).

m_bClosing = true;
}
// wake up user calls
CSync::lock_signal(m_SendBlockCond, m_SendBlockLock);

enterCS(m_SendLock);
leaveCS(m_SendLock);

// Awake tsbpd() and srt_recv*(..) threads for them to check m_bClosing.
CSync::lock_signal(m_RecvDataCond, m_RecvLock);
CSync::lock_signal(m_RcvTsbPdCond, m_RecvLock);
CSync::lock_broadcast(m_RecvDataCond, m_RecvLock);
CSync::lock_broadcast(m_RcvTsbPdCond, m_RecvLock);

// Azquiring m_RcvTsbPdStartupLock protects race in starting
// the tsbpd() thread in CUDT::processData().
Expand Down Expand Up @@ -9520,7 +9560,7 @@ void CUDT::processClose()
m_bBroken = true;
m_iBrokenCounter = 60;

HLOGP(smlog.Debug, "processClose: sent message and set flags");
HLOGP(smlog.Debug, "processClose: (closing=true) sent message and set flags");

if (m_bTsbPd)
{
Expand Down Expand Up @@ -11032,7 +11072,8 @@ bool CUDT::checkExpTimer(const steady_clock::time_point& currtime, int check_rea
// Application will detect this when it calls any UDT methods next time.
//
HLOGC(xtlog.Debug,
log << "CONNECTION EXPIRED after " << count_milliseconds(currtime - m_tsLastRspTime) << "ms");
log << "CONNECTION EXPIRED after " << count_milliseconds(currtime - m_tsLastRspTime)
<< "ms (closing=true)");
m_bClosing = true;
m_bBroken = true;
m_iBrokenCounter = 30;
Expand Down Expand Up @@ -11192,6 +11233,8 @@ void CUDT::checkTimers()

void CUDT::updateBrokenConnection()
{
HLOGC(smlog.Debug, log << "updateBrokenConnection: setting closing=true and taking out epoll events");
m_bClosing = true;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this fix require other changes introduced in the PR?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is simply a completion for all cases when releaseSync is called, which is connected to the fact that releaseSync is about to make TSBPD thread exit, and it can only exit if m_bClosing == true.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does only this change make sense without the rest changes in this PR?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rest of the changes are formal fixes for real problems, others - including this one - are "sanity fixes". This flag must be set true when the TSBPD thread is expected to exit otherwise it risks a deadlock. Relying on that a flag is set appropriately before exitting isn't the best approach, but it's simply derived from UDT and it's a consequence of how the TSBPD thread is expected to work. Changes here were due to an in-between lifted lock on m_RecvLock, which was newly introduced with the groups.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's move this fix to a separate PR, so it is not blocked by the review of other changes introduced here.

releaseSynch();
// app can call any UDT API to learn the connection_broken error
s_UDTUnited.m_EPoll.update_events(m_SocketID, m_sPollID, SRT_EPOLL_IN | SRT_EPOLL_OUT | SRT_EPOLL_ERR, true);
Expand Down
2 changes: 1 addition & 1 deletion srtcore/epoll.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -865,7 +865,7 @@ int CEPoll::update_events(const SRTSOCKET& uid, std::set<int>& eids, const int e
if (!pwait)
{
// As this is mapped in the socket's data, it should be impossible.
LOGC(eilog.Error, log << "epoll/update: IPE: update struck E"
HLOGC(eilog.Debug, log << "epoll/update: IPE: update struck E"
<< (*i) << " which is NOT SUBSCRIBED to @" << uid);
continue;
}
Expand Down
Loading