Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[core] Fixed proper reporting of sending blocked state. Fixed API value for connect-non-blocking #1698

Draft
wants to merge 10 commits into
base: master
Choose a base branch
from
3 changes: 3 additions & 0 deletions srtcore/api.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1839,6 +1839,9 @@ int CUDTUnited::groupConnect(CUDTGroup* pg, SRT_SOCKGROUPCONFIG* targets, int ar
if (retval == -1)
throw CUDTException(MJ_CONNECTION, MN_CONNLOST, 0);

if (!block_new_opened && arraysize > 1)
return 0;

return retval;
}
#endif
Expand Down
36 changes: 31 additions & 5 deletions srtcore/core.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5744,6 +5744,12 @@ void *CUDT::tsbpd(void *param)
tsbpdtime = steady_clock::time_point();
}

if (self->m_bClosing)
{
HLOGC(tslog.Debug, log << "tsbpd: IPE? Closing flag set in the meantime of checking. Exiting");
break;
}

if (!is_zero(tsbpdtime))
{
const steady_clock::duration timediff = tsbpdtime - steady_clock::now();
Expand Down Expand Up @@ -5774,13 +5780,25 @@ void *CUDT::tsbpd(void *param)
*/
HLOGC(tslog.Debug, log << self->CONID() << "tsbpd: no data, scheduling wakeup at ack");
self->m_bTsbPdAckWakeup = true;
THREAD_PAUSED();
tsbpd_cc.wait();
THREAD_RESUMED();

bool signaled = false;
while (!signaled)
{
// For safety reasons, do wakeup once per 1s and re-check the flag.
THREAD_PAUSED();
signaled = tsbpd_cc.wait_for(seconds_from(1));
THREAD_RESUMED();
if (self->m_bClosing)
{
HLOGC(tslog.Debug, log << "tsbpd: IPE? Closing flag set in the meantime of waiting. Exiting");
goto ExitLoops;
}
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not just let TSBPD to re-check the receiver buffer even in case of a spurious wake-up?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, yes, might be a better idea. OTOH this 1s delay is too much if that deadlock would have happened when closing a socket in the group receiver.

}

HLOGC(tslog.Debug, log << self->CONID() << "tsbpd: WAKE UP!!!");
}
ExitLoops:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No new goto statements, please.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I need to exit two loops, and additionally maintain the thread checker.

THREAD_EXIT();
HLOGC(tslog.Debug, log << self->CONID() << "tsbpd: EXITING");
return NULL;
Expand Down Expand Up @@ -7819,15 +7837,22 @@ void CUDT::destroySynch()
void CUDT::releaseSynch()
{
SRT_ASSERT(m_bClosing);
#if ENABLE_HEAVY_LOGGING
if (!m_bClosing)
{
LOGC(smlog.Debug, log << "releaseSynch: IPE: m_bClosing not set to false, TSBPD might hangup!");
}
#endif
m_bClosing = true;
// wake up user calls
CSync::lock_signal(m_SendBlockCond, m_SendBlockLock);

enterCS(m_SendLock);
leaveCS(m_SendLock);

// Awake tsbpd() and srt_recv*(..) threads for them to check m_bClosing.
CSync::lock_signal(m_RecvDataCond, m_RecvLock);
CSync::lock_signal(m_RcvTsbPdCond, m_RecvLock);
CSync::lock_broadcast(m_RecvDataCond, m_RecvLock);
CSync::lock_broadcast(m_RcvTsbPdCond, m_RecvLock);

// Azquiring m_RcvTsbPdStartupLock protects race in starting
// the tsbpd() thread in CUDT::processData().
Expand Down Expand Up @@ -11192,6 +11217,7 @@ void CUDT::checkTimers()

void CUDT::updateBrokenConnection()
{
m_bClosing = true;
releaseSynch();
// app can call any UDT API to learn the connection_broken error
s_UDTUnited.m_EPoll.update_events(m_SocketID, m_sPollID, SRT_EPOLL_IN | SRT_EPOLL_OUT | SRT_EPOLL_ERR, true);
Expand Down
12 changes: 9 additions & 3 deletions srtcore/epoll.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -864,9 +864,15 @@ int CEPoll::update_events(const SRTSOCKET& uid, std::set<int>& eids, const int e
CEPollDesc::Wait* pwait = ed.watch_find(uid);
if (!pwait)
{
// As this is mapped in the socket's data, it should be impossible.
LOGC(eilog.Error, log << "epoll/update: IPE: update struck E"
<< (*i) << " which is NOT SUBSCRIBED to @" << uid);
// Don't print this error, if the intention was to clear the readiness.
// This is being usually done together with unsubscription, so this error
// would be misleading and unnecessary here. Report only cases when setting readiness.
if (enable)
{
// As this is mapped in the socket's data, it should be impossible.
LOGC(eilog.Error, log << "epoll/update: IPE: update struck E"
<< (*i) << " which is NOT SUBSCRIBED to @" << uid);
}
continue;
}

Expand Down
Loading