diff --git a/srtcore/buffer_rcv.cpp b/srtcore/buffer_rcv.cpp index 7bfb00ad8..27c0bbc81 100644 --- a/srtcore/buffer_rcv.cpp +++ b/srtcore/buffer_rcv.cpp @@ -1048,18 +1048,16 @@ void CRcvBuffer::updateTsbPdTimeBase(uint32_t usPktTimestamp) m_tsbpd.updateTsbPdTimeBase(usPktTimestamp); } -string CRcvBuffer::strFullnessState(bool enable_debug_log, int iFirstUnackSeqNo, const time_point& tsNow) const +string CRcvBuffer::strFullnessState(bool enable_debug_log, const time_point& tsNow) const { stringstream ss; if (enable_debug_log) { - ss << "iFirstUnackSeqNo=" << iFirstUnackSeqNo << " m_iStartSeqNo=" << m_iStartSeqNo - << " m_iStartPos=" << m_iStartPos << " m_iMaxPosInc=" << m_iMaxPosInc << ". "; + ss << "m_iStartSeqNo=" << m_iStartSeqNo << " m_iStartPos=" << m_iStartPos << " m_iMaxPosInc=" << m_iMaxPosInc + << ". "; } - ss << "Space avail " << getAvailSize(iFirstUnackSeqNo) << "/" << m_szSize << " pkts. "; - if (m_tsbpd.isEnabled() && m_iMaxPosInc > 0) { const PacketInfo nextValidPkt = getFirstValidPacketInfo(); diff --git a/srtcore/buffer_rcv.h b/srtcore/buffer_rcv.h index 4ee791ec1..b0db20bb6 100644 --- a/srtcore/buffer_rcv.h +++ b/srtcore/buffer_rcv.h @@ -58,7 +58,6 @@ class CRcvBuffer /// Similar to CRcvBuffer::addData(CUnit* unit, int offset) /// /// @param [in] unit pointer to a data unit containing new packet - /// @param [in] offset offset from last ACK point. /// /// @return 0 on success, -1 if packet is already in buffer, -2 if packet is before m_iStartSeqNo. /// -3 if a packet is offset is ahead the buffer capacity. @@ -343,7 +342,7 @@ class CRcvBuffer /// Form a string of the current buffer fullness state. /// number of packets acknowledged, TSBPD readiness, etc. - std::string strFullnessState(bool enable_debug_log, int iFirstUnackSeqNo, const time_point& tsNow) const; + std::string strFullnessState(bool enable_debug_log, const time_point& tsNow) const; private: CTsbpdTime m_tsbpd; diff --git a/srtcore/core.cpp b/srtcore/core.cpp index e1b4b9e49..869016372 100644 --- a/srtcore/core.cpp +++ b/srtcore/core.cpp @@ -9721,76 +9721,69 @@ int srt::CUDT::handleSocketPacketReception(const vector& incoming, bool& const int pktrexmitflag = m_bPeerRexmitFlag ? (rpkt.getRexmitFlag() ? 1 : 0) : 2; const bool retransmitted = pktrexmitflag == 1; - bool adding_successful = true; - - // m_iRcvLastSkipAck is the base sequence number for the receiver buffer. - // This is the offset in the buffer; if this is negative, it means that - // this sequence is already in the past and the buffer is not interested. - // Meaning, this packet will be rejected, even if it could potentially be - // one of missing packets in the transmission. - int32_t offset = CSeqNo::seqoff(m_iRcvLastSkipAck, rpkt.m_iSeqNo); - - IF_HEAVY_LOGGING(const char *exc_type = "EXPECTED"); + const int insert_offset = CSeqNo::seqoff(m_pRcvBuffer->getStartSeqNo(), rpkt.m_iSeqNo); + const int ack_offset = CSeqNo::seqoff(m_iRcvLastSkipAck, rpkt.m_iSeqNo); - if (offset < 0) + // If this is negative, it means that this sequence is already in the past + // and the buffer is not interested. Meaning, this packet will be rejected, + // even if it could potentially be one of missing packets in the transmission. + if (insert_offset < 0 || ack_offset < 0) { - IF_HEAVY_LOGGING(exc_type = "BELATED"); time_point pts = getPktTsbPdTime(NULL, rpkt); enterCS(m_StatsLock); const double bltime = (double) CountIIR( uint64_t(m_stats.traceBelatedTime) * 1000, count_microseconds(steady_clock::now() - pts), 0.2); - m_stats.traceBelatedTime = bltime / 1000.0; m_stats.rcvr.recvdBelated.count(rpkt.getLength()); leaveCS(m_StatsLock); + HLOGC(qrlog.Debug, - log << CONID() << "RECEIVED: seq=" << rpkt.m_iSeqNo << " offset=" << offset << " (BELATED/" - << s_rexmitstat_str[pktrexmitflag] << ") FLAGS: " << rpkt.MessageFlagStr()); + log << CONID() << "RECEIVED: seq=" << rpkt.m_iSeqNo << " insert_offset=" << insert_offset + << " ack_offset=" << ack_offset << " (BELATED/" << s_rexmitstat_str[pktrexmitflag] + << ") FLAGS: " << rpkt.MessageFlagStr()); continue; } - const int avail_bufsize = (int) getAvailRcvBufferSizeNoLock(); + IF_HEAVY_LOGGING(const char *exc_type = "EXPECTED"); + bool adding_successful = true; - if (offset >= avail_bufsize) + const int insert_res = m_pRcvBuffer->insert(u); + if (insert_res == -3) { - // This is already a sequence discrepancy. Probably there could be found - // some way to make it continue reception by overriding the sequence and - // make a kinda TLKPTDROP, but there has been found no reliable way to do this. + // The insert() result is -3 if the insert offset exceeds capacity. if (m_bTsbPd && m_bTLPktDrop && m_pRcvBuffer->empty()) { + // This is already a sequence discrepancy. Probably there could be found + // some way to make it continue reception by overriding the sequence and + // make a kinda TLKPTDROP, but there has been found no reliable way to do this. // Only in live mode. In File mode this shall not be possible // because the sender should stop sending in this situation. // In Live mode this means that there is a gap between the // lowest sequence in the empty buffer and the incoming sequence // that exceeds the buffer size. Receiving data in this situation // is no longer possible and this is a point of no return. - LOGC(qrlog.Error, log << CONID() << "SEQUENCE DISCREPANCY. BREAKING CONNECTION." " seq=" << rpkt.m_iSeqNo << " buffer=(" << m_iRcvLastSkipAck << ":" << m_iRcvCurrSeqNo // -1 = size to last index << "+" << CSeqNo::incseq(m_iRcvLastSkipAck, int(m_pRcvBuffer->capacity()) - 1) - << "), " << (offset-avail_bufsize+1) + << "), " << (insert_offset - int(m_pRcvBuffer->capacity()) + 1) << " past max. Reception no longer possible. REQUESTING TO CLOSE."); - return -2; } else { - LOGC(qrlog.Warn, log << CONID() << "No room to store incoming packet seqno " << rpkt.m_iSeqNo - << ", insert offset " << offset << ". " - << m_pRcvBuffer->strFullnessState(qrlog.Warn.CheckEnabled(), m_iRcvLastAck, steady_clock::now()) - ); - + LOGC(qrlog.Warn, + log << CONID() << "No room to store incoming packet. seqno=" << rpkt.m_iSeqNo + << " insert_offset=" << insert_offset << " ack_offset=" << ack_offset << ". " + << m_pRcvBuffer->strFullnessState(qrlog.Warn.CheckEnabled(), steady_clock::now())); return -1; } } - - int buffer_add_result = m_pRcvBuffer->insert(u); - if (buffer_add_result < 0) + else if (insert_res == -1) { // The insert() result is -1 if at the position evaluated from this packet's // sequence number there already is a packet. @@ -9800,10 +9793,12 @@ int srt::CUDT::handleSocketPacketReception(const vector& incoming, bool& } else { + SRT_ASSERT(insert_res == 0); w_new_inserted = true; IF_HEAVY_LOGGING(exc_type = "ACCEPTED"); excessive = false; + if (u->m_Packet.getMsgCryptoFlags() != EK_NOENC) { // TODO: reset and restore the timestamp if TSBPD is disabled. @@ -9857,8 +9852,7 @@ int srt::CUDT::handleSocketPacketReception(const vector& incoming, bool& if (m_pRcvBuffer) { - bufinfo << " BUFr=" << avail_bufsize - << " avail=" << getAvailRcvBufferSizeNoLock() + bufinfo << " avail=" << getAvailRcvBufferSizeNoLock() << " buffer=(" << m_iRcvLastSkipAck << ":" << m_iRcvCurrSeqNo // -1 = size to last index << "+" << CSeqNo::incseq(m_iRcvLastSkipAck, m_pRcvBuffer->capacity()-1) @@ -9869,7 +9863,8 @@ int srt::CUDT::handleSocketPacketReception(const vector& incoming, bool& // There's no way to obtain this information here. LOGC(qrlog.Debug, log << CONID() << "RECEIVED: seq=" << rpkt.m_iSeqNo - << " offset=" << offset + << " insert_offset=" << insert_offset + << " ack_offset=" << ack_offset << bufinfo.str() << " RSL=" << expectspec.str() << " SN=" << s_rexmitstat_str[pktrexmitflag]