Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[core] Fixed bug: tsbpd might miss m_bClosing flag setting, flag not always properly set. #1709

Merged
merged 13 commits into from
Mar 12, 2025
26 changes: 22 additions & 4 deletions srtcore/core.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4951,6 +4951,11 @@ EConnectStatus srt::CUDT::postConnect(const CPacket* pResponse, bool rendezvous,
s->m_Status = SRTS_CONNECTED;

// acknowledde any waiting epolls to write
// This must be done AFTER the group member status is upgraded to IDLE because
// this state change will trigger the waiting function in blocking-mode groupConnect
// and this may be immediately followed by exit from connect and start sending function,
// which must see this very link already as IDLE, not PENDING, which will make this
// link unable to be used and therefore the sending call would fail.
uglobal().m_EPoll.update_events(m_SocketID, m_sPollID, SRT_EPOLL_CONNECT, true);

CGlobEvent::triggerEvent();
Expand Down Expand Up @@ -5541,6 +5546,12 @@ void * srt::CUDT::tsbpd(void* param)
gkeeper.group->updateLatestRcv(self->m_parent);
}
}

// After re-acquisition of the m_RecvLock, re-check the closing flag
if (self->m_bClosing)
{
break;
}
#endif
CGlobEvent::triggerEvent();
tsNextDelivery = steady_clock::time_point(); // Ready to read, nothing to wait for.
Expand All @@ -5566,6 +5577,7 @@ void * srt::CUDT::tsbpd(void* param)
THREAD_PAUSED();
bWokeUpOnSignal = tsbpd_cc.wait_until(tsNextDelivery);
THREAD_RESUMED();
HLOGC(tslog.Debug, log << self->CONID() << "tsbpd: WAKE UP on " << (bWokeUpOnSignal? "SIGNAL" : "TIMEOUIT") << "!!!");
}
else
{
Expand Down Expand Up @@ -6328,7 +6340,7 @@ bool srt::CUDT::closeInternal() ATR_NOEXCEPT
// Inform the threads handler to stop.
m_bClosing = true;

HLOGC(smlog.Debug, log << CONID() << "CLOSING STATE. Acquiring connection lock");
HLOGC(smlog.Debug, log << CONID() << "CLOSING STATE (closing=true). Acquiring connection lock");

ScopedLock connectguard(m_ConnectionLock);

Expand Down Expand Up @@ -7824,15 +7836,20 @@ void srt::CUDT::destroySynch()
void srt::CUDT::releaseSynch()
{
SRT_ASSERT(m_bClosing);
if (!m_bClosing)
{
LOGC(smlog.Error, log << "releaseSynch: IPE: m_bClosing not set to false, TSBPD might hangup!");
m_bClosing = true;
}
// wake up user calls
CSync::lock_notify_one(m_SendBlockCond, m_SendBlockLock);

enterCS(m_SendLock);
leaveCS(m_SendLock);

// Awake tsbpd() and srt_recv*(..) threads for them to check m_bClosing.
CSync::lock_notify_one(m_RecvDataCond, m_RecvLock);
CSync::lock_notify_one(m_RcvTsbPdCond, m_RecvLock);
CSync::lock_notify_all(m_RecvDataCond, m_RecvLock);
CSync::lock_notify_all(m_RcvTsbPdCond, m_RecvLock);

// Azquiring m_RcvTsbPdStartupLock protects race in starting
// the tsbpd() thread in CUDT::processData().
Expand Down Expand Up @@ -9939,7 +9956,7 @@ void srt::CUDT::processClose()
m_bBroken = true;
m_iBrokenCounter = 60;

HLOGP(smlog.Debug, "processClose: sent message and set flags");
HLOGP(smlog.Debug, "processClose: (closing=true) sent message and set flags");

if (m_bTsbPd)
{
Expand Down Expand Up @@ -11691,6 +11708,7 @@ void srt::CUDT::checkTimers()

void srt::CUDT::updateBrokenConnection()
{
HLOGC(smlog.Debug, log << "updateBrokenConnection: setting closing=true and taking out epoll events");
m_bClosing = true;
releaseSynch();
// app can call any UDT API to learn the connection_broken error
Expand Down
Loading