Skip to content

Commit

Permalink
[core] Utilize all insert results of the new recv buffer.
Browse files Browse the repository at this point in the history
  • Loading branch information
gou4shi1 committed Nov 12, 2022
1 parent cbfa812 commit 57e2f7b
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 37 deletions.
11 changes: 5 additions & 6 deletions srtcore/buffer_rcv.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -148,11 +148,12 @@ CRcvBuffer::~CRcvBuffer()
}
}

int CRcvBuffer::insert(CUnit* unit)
int CRcvBuffer::insert(CUnit* unit, int32_t& w_offset)
{
SRT_ASSERT(unit != NULL);
const int32_t seqno = unit->m_Packet.getSeqNo();
const int offset = CSeqNo::seqoff(m_iStartSeqNo, seqno);
w_offset = offset;

IF_RCVBUF_DEBUG(ScopedLog scoped_log);
IF_RCVBUF_DEBUG(scoped_log.ss << "CRcvBuffer::insert: seqno " << seqno);
Expand Down Expand Up @@ -1048,18 +1049,16 @@ void CRcvBuffer::updateTsbPdTimeBase(uint32_t usPktTimestamp)
m_tsbpd.updateTsbPdTimeBase(usPktTimestamp);
}

string CRcvBuffer::strFullnessState(bool enable_debug_log, int iFirstUnackSeqNo, const time_point& tsNow) const
string CRcvBuffer::strFullnessState(bool enable_debug_log, const time_point& tsNow) const
{
stringstream ss;

if (enable_debug_log)
{
ss << "iFirstUnackSeqNo=" << iFirstUnackSeqNo << " m_iStartSeqNo=" << m_iStartSeqNo
<< " m_iStartPos=" << m_iStartPos << " m_iMaxPosInc=" << m_iMaxPosInc << ". ";
ss << "m_iStartSeqNo=" << m_iStartSeqNo << " m_iStartPos=" << m_iStartPos << " m_iMaxPosInc=" << m_iMaxPosInc
<< ". ";
}

ss << "Space avail " << getAvailSize(iFirstUnackSeqNo) << "/" << m_szSize << " pkts. ";

if (m_tsbpd.isEnabled() && m_iMaxPosInc > 0)
{
const PacketInfo nextValidPkt = getFirstValidPacketInfo();
Expand Down
6 changes: 3 additions & 3 deletions srtcore/buffer_rcv.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,12 +58,12 @@ class CRcvBuffer
/// Similar to CRcvBuffer::addData(CUnit* unit, int offset)
///
/// @param [in] unit pointer to a data unit containing new packet
/// @param [in] offset offset from last ACK point.
/// @param [out] w_offset insert offset from the start pos.
///
/// @return 0 on success, -1 if packet is already in buffer, -2 if packet is before m_iStartSeqNo.
/// -3 if a packet is offset is ahead the buffer capacity.
// TODO: Previously '-2' also meant 'already acknowledged'. Check usage of this value.
int insert(CUnit* unit);
int insert(CUnit* unit, int32_t& w_offset);

/// Drop packets in the receiver buffer from the current position up to the seqno (excluding seqno).
/// @param [in] seqno drop units up to this sequence number
Expand Down Expand Up @@ -343,7 +343,7 @@ class CRcvBuffer

/// Form a string of the current buffer fullness state.
/// number of packets acknowledged, TSBPD readiness, etc.
std::string strFullnessState(bool enable_debug_log, int iFirstUnackSeqNo, const time_point& tsNow) const;
std::string strFullnessState(bool enable_debug_log, const time_point& tsNow) const;

private:
CTsbpdTime m_tsbpd;
Expand Down
56 changes: 28 additions & 28 deletions srtcore/core.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9721,59 +9721,56 @@ int srt::CUDT::handleSocketPacketReception(const vector<CUnit*>& incoming, bool&
const int pktrexmitflag = m_bPeerRexmitFlag ? (rpkt.getRexmitFlag() ? 1 : 0) : 2;
const bool retransmitted = pktrexmitflag == 1;

bool adding_successful = true;

// m_iRcvLastSkipAck is the base sequence number for the receiver buffer.
// This is the offset in the buffer; if this is negative, it means that
// this sequence is already in the past and the buffer is not interested.
// Meaning, this packet will be rejected, even if it could potentially be
// one of missing packets in the transmission.
int32_t offset = CSeqNo::seqoff(m_iRcvLastSkipAck, rpkt.m_iSeqNo);

IF_HEAVY_LOGGING(const char *exc_type = "EXPECTED");

if (offset < 0)
if (CSeqNo::seqcmp(rpkt.m_iSeqNo, m_iRcvLastSkipAck) < 0)
{
IF_HEAVY_LOGGING(exc_type = "BELATED");
time_point pts = getPktTsbPdTime(NULL, rpkt);

enterCS(m_StatsLock);
const double bltime = (double) CountIIR<uint64_t>(
uint64_t(m_stats.traceBelatedTime) * 1000,
count_microseconds(steady_clock::now() - pts), 0.2);

m_stats.traceBelatedTime = bltime / 1000.0;
m_stats.rcvr.recvdBelated.count(rpkt.getLength());
leaveCS(m_StatsLock);

HLOGC(qrlog.Debug,
log << CONID() << "RECEIVED: seq=" << rpkt.m_iSeqNo << " offset=" << offset << " (BELATED/"
<< s_rexmitstat_str[pktrexmitflag] << ") FLAGS: " << rpkt.MessageFlagStr());
log << CONID() << "RECEIVED: seq=" << rpkt.m_iSeqNo
<< " offset=" << CSeqNo::seqoff(m_iRcvLastSkipAck, rpkt.m_iSeqNo) << " (BELATED/"
<< s_rexmitstat_str[pktrexmitflag] << ") FLAGS: " << rpkt.MessageFlagStr());
continue;
}

const int avail_bufsize = (int) getAvailRcvBufferSizeNoLock();
IF_HEAVY_LOGGING(const char *exc_type = "EXPECTED");
bool adding_successful = true;

if (offset >= avail_bufsize)
int offset = 0;
int buffer_add_result = m_pRcvBuffer->insert(u, offset);
if (buffer_add_result == -3)
{
// This is already a sequence discrepancy. Probably there could be found
// some way to make it continue reception by overriding the sequence and
// make a kinda TLKPTDROP, but there has been found no reliable way to do this.
// The insert() result is -3 if the insert offset exceeds capacity.
if (m_bTsbPd && m_bTLPktDrop && m_pRcvBuffer->empty())
{
// This is already a sequence discrepancy. Probably there could be found
// some way to make it continue reception by overriding the sequence and
// make a kinda TLKPTDROP, but there has been found no reliable way to do this.
// Only in live mode. In File mode this shall not be possible
// because the sender should stop sending in this situation.
// In Live mode this means that there is a gap between the
// lowest sequence in the empty buffer and the incoming sequence
// that exceeds the buffer size. Receiving data in this situation
// is no longer possible and this is a point of no return.

LOGC(qrlog.Error, log << CONID() <<
"SEQUENCE DISCREPANCY. BREAKING CONNECTION."
" seq=" << rpkt.m_iSeqNo
<< " buffer=(" << m_iRcvLastSkipAck
<< ":" << m_iRcvCurrSeqNo // -1 = size to last index
<< "+" << CSeqNo::incseq(m_iRcvLastSkipAck, int(m_pRcvBuffer->capacity()) - 1)
<< "), " << (offset-avail_bufsize+1)
<< "), " << (offset - int(m_pRcvBuffer->capacity()) + 1)
<< " past max. Reception no longer possible. REQUESTING TO CLOSE.");

return -2;
Expand All @@ -9782,15 +9779,19 @@ int srt::CUDT::handleSocketPacketReception(const vector<CUnit*>& incoming, bool&
{
LOGC(qrlog.Warn, log << CONID() << "No room to store incoming packet seqno " << rpkt.m_iSeqNo
<< ", insert offset " << offset << ". "
<< m_pRcvBuffer->strFullnessState(qrlog.Warn.CheckEnabled(), m_iRcvLastAck, steady_clock::now())
<< m_pRcvBuffer->strFullnessState(qrlog.Warn.CheckEnabled(), steady_clock::now())
);

return -1;
}
}

int buffer_add_result = m_pRcvBuffer->insert(u);
if (buffer_add_result < 0)
else if (buffer_add_result == -2)
{
// The insert() result is -2 if the packet is behind the start of recv buffer.
IF_HEAVY_LOGGING(exc_type = "BELATED");
adding_successful = false;
}
else if (buffer_add_result == -1)
{
// The insert() result is -1 if at the position evaluated from this packet's
// sequence number there already is a packet.
Expand All @@ -9800,10 +9801,12 @@ int srt::CUDT::handleSocketPacketReception(const vector<CUnit*>& incoming, bool&
}
else
{
SRT_ASSERT(buffer_add_result == 0);
w_new_inserted = true;

IF_HEAVY_LOGGING(exc_type = "ACCEPTED");
excessive = false;

if (u->m_Packet.getMsgCryptoFlags() != EK_NOENC)
{
// TODO: reset and restore the timestamp if TSBPD is disabled.
Expand Down Expand Up @@ -9857,12 +9860,9 @@ int srt::CUDT::handleSocketPacketReception(const vector<CUnit*>& incoming, bool&

if (m_pRcvBuffer)
{
bufinfo << " BUFr=" << avail_bufsize
<< " avail=" << getAvailRcvBufferSizeNoLock()
<< " buffer=(" << m_iRcvLastSkipAck
<< ":" << m_iRcvCurrSeqNo // -1 = size to last index
<< "+" << CSeqNo::incseq(m_iRcvLastSkipAck, m_pRcvBuffer->capacity()-1)
<< ")";
bufinfo << " avail=" << getAvailRcvBufferSizeNoLock() << " buffer=(" << m_iRcvLastSkipAck << ":"
<< m_iRcvCurrSeqNo // -1 = size to last index
<< "+" << CSeqNo::incseq(m_iRcvLastSkipAck, m_pRcvBuffer->capacity() - 1) << ")";
}

// Empty buffer info in case of groupwise receiver.
Expand Down

0 comments on commit 57e2f7b

Please sign in to comment.