Skip to content

Commit

Permalink
rtptwcc: changes to use rtp buffer arrival time and current time.
Browse files Browse the repository at this point in the history
For TWCC we are more interested to track the arrival time (receive side)
and the current time (sender side) of the buffers rather than the
running time.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-good/-/merge_requests/927>
  • Loading branch information
tbeloqui authored and GStreamer Marge Bot committed Aug 25, 2021
1 parent 0440cb1 commit 266c2d0
Show file tree
Hide file tree
Showing 4 changed files with 125 additions and 46 deletions.
2 changes: 2 additions & 0 deletions gst/rtpmanager/rtpsession.c
Original file line number Diff line number Diff line change
Expand Up @@ -2172,9 +2172,11 @@ update_packet_info (RTPSession * sess, RTPPacketInfo * pinfo,
res =
gst_buffer_list_foreach (list, (GstBufferListFunc) update_packet,
pinfo);
pinfo->arrival_time = GST_CLOCK_TIME_NONE;
} else {
GstBuffer *buffer = GST_BUFFER_CAST (data);
res = update_packet (&buffer, 0, pinfo);
pinfo->arrival_time = GST_BUFFER_DTS (buffer);
}

return res;
Expand Down
2 changes: 2 additions & 0 deletions gst/rtpmanager/rtpstats.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ typedef struct {
* @address: address of the sender of the packet
* @current_time: current time according to the system clock
* @running_time: time of a packet as buffer running_time
* @arrival_time: time of arrival of a packet
* @ntpnstime: time of a packet NTP time in nanoseconds
* @header_len: number of overhead bytes per packet
* @bytes: bytes of the packet including lowlevel overhead
Expand All @@ -92,6 +93,7 @@ typedef struct {
GSocketAddress *address;
GstClockTime current_time;
GstClockTime running_time;
GstClockTime arrival_time;
guint64 ntpnstime;
guint header_len;
guint bytes;
Expand Down
15 changes: 10 additions & 5 deletions gst/rtpmanager/rtptwcc.c
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,11 @@ recv_packet_init (RecvPacket * packet, guint16 seqnum, RTPPacketInfo * pinfo)
{
memset (packet, 0, sizeof (RecvPacket));
packet->seqnum = seqnum;
packet->ts = pinfo->running_time;

if (GST_CLOCK_TIME_IS_VALID (pinfo->arrival_time))
packet->ts = pinfo->arrival_time;
else
packet->ts = pinfo->current_time;
}

static guint8
Expand Down Expand Up @@ -784,7 +788,7 @@ rtp_twcc_manager_recv_packet (RTPTWCCManager * twcc, RTPPacketInfo * pinfo)

GST_LOG ("Receive: twcc-seqnum: %u, pt: %u, marker: %d, ts: %"
GST_TIME_FORMAT, seqnum, pinfo->pt, pinfo->marker,
GST_TIME_ARGS (pinfo->running_time));
GST_TIME_ARGS (pinfo->arrival_time));

if (!pinfo->marker)
twcc->packet_count_no_marker++;
Expand Down Expand Up @@ -841,7 +845,7 @@ static void
sent_packet_init (SentPacket * packet, guint16 seqnum, RTPPacketInfo * pinfo)
{
packet->seqnum = seqnum;
packet->ts = pinfo->running_time;
packet->ts = pinfo->current_time;
packet->size = pinfo->payload_len;
packet->pt = pinfo->pt;
packet->remote_ts = GST_CLOCK_TIME_NONE;
Expand All @@ -864,8 +868,9 @@ rtp_twcc_manager_send_packet (RTPTWCCManager * twcc, RTPPacketInfo * pinfo)
g_array_append_val (twcc->sent_packets, packet);


GST_LOG ("Send: twcc-seqnum: %u, pt: %u, marker: %d, ts: %" GST_TIME_FORMAT,
seqnum, pinfo->pt, pinfo->marker, GST_TIME_ARGS (pinfo->running_time));
GST_LOG ("Send: twcc-seqnum: %u, pt: %u, marker: %d, ts: %"
GST_TIME_FORMAT, seqnum, pinfo->pt, pinfo->marker,
GST_TIME_ARGS (pinfo->current_time));
}

static void
Expand Down
152 changes: 111 additions & 41 deletions tests/check/elements/rtpsession.c
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,8 @@ generate_caps (void)
static GstBuffer *
generate_test_buffer_full (GstClockTime ts,
guint seqnum, guint32 rtp_ts, guint ssrc,
gboolean marker_bit, guint8 twcc_ext_id, guint16 twcc_seqnum)
gboolean marker_bit, guint8 payload_type, guint8 twcc_ext_id,
guint16 twcc_seqnum)
{
GstBuffer *buf;
guint8 *payload;
Expand All @@ -67,7 +68,7 @@ generate_test_buffer_full (GstClockTime ts,
GST_BUFFER_DTS (buf) = ts;

gst_rtp_buffer_map (buf, GST_MAP_READWRITE, &rtp);
gst_rtp_buffer_set_payload_type (&rtp, TEST_BUF_PT);
gst_rtp_buffer_set_payload_type (&rtp, payload_type);
gst_rtp_buffer_set_seq (&rtp, seqnum);
gst_rtp_buffer_set_timestamp (&rtp, rtp_ts);
gst_rtp_buffer_set_ssrc (&rtp, ssrc);
Expand All @@ -93,26 +94,33 @@ static GstBuffer *
generate_test_buffer (guint seqnum, guint ssrc)
{
return generate_test_buffer_full (seqnum * TEST_BUF_DURATION,
seqnum, seqnum * TEST_RTP_TS_DURATION, ssrc, FALSE, 0, 0);
seqnum, seqnum * TEST_RTP_TS_DURATION, ssrc, FALSE, TEST_BUF_PT, 0, 0);
}

static GstBuffer *
generate_twcc_recv_buffer (guint seqnum,
GstClockTime arrival_time, gboolean marker_bit)
{
return generate_test_buffer_full (arrival_time, seqnum,
seqnum * TEST_RTP_TS_DURATION, TEST_BUF_SSRC, marker_bit,
seqnum * TEST_RTP_TS_DURATION, TEST_BUF_SSRC, marker_bit, TEST_BUF_PT,
TEST_TWCC_EXT_ID, seqnum);
}

static GstBuffer *
generate_twcc_send_buffer (guint seqnum, gboolean marker_bit)
generate_twcc_send_buffer_full (guint seqnum, gboolean marker_bit,
guint ssrc, guint8 payload_type)
{
return generate_test_buffer_full (seqnum * TEST_BUF_DURATION,
seqnum, seqnum * TEST_RTP_TS_DURATION, TEST_BUF_SSRC, marker_bit,
TEST_TWCC_EXT_ID, seqnum);
seqnum, seqnum * TEST_RTP_TS_DURATION, ssrc, marker_bit,
payload_type, TEST_TWCC_EXT_ID, seqnum);
}

static GstBuffer *
generate_twcc_send_buffer (guint seqnum, gboolean marker_bit)
{
return generate_twcc_send_buffer_full (seqnum, marker_bit, TEST_BUF_SSRC,
TEST_BUF_PT);
}

typedef struct
{
Expand Down Expand Up @@ -2569,7 +2577,8 @@ generate_stepped_ts_buffer (guint i, gboolean stepped)
GST_TIME_ARGS (gst_util_uint64_scale_int (GST_SECOND, ts,
TEST_BUF_CLOCK_RATE)), i);

buf = generate_test_buffer_full (i * GST_MSECOND, i, ts, 0xAAAA, FALSE, 0, 0);
buf = generate_test_buffer_full (i * GST_MSECOND, i, ts, 0xAAAA, FALSE,
TEST_BUF_PT, 0, 0);
return buf;
}

Expand Down Expand Up @@ -2716,8 +2725,12 @@ GST_START_TEST (test_twcc_header_and_run_length)
for (i = 0; i < td->num_packets; i++) {
gboolean last_packet = i == (td->num_packets - 1);

buf = generate_twcc_recv_buffer (i + td->base_seqnum,
td->base_time + i * td->duration, last_packet);
GstClockTime now = gst_clock_get_time (GST_CLOCK_CAST (h->testclock));
GstClockTime ts = td->base_time + i * td->duration;
if (ts > now)
gst_test_clock_set_time (h->testclock, ts);

buf = generate_twcc_recv_buffer (i + td->base_seqnum, ts, last_packet);
res = session_harness_recv_rtp (h, buf);
fail_unless_equals_int (GST_FLOW_OK, res);
}
Expand Down Expand Up @@ -2869,6 +2882,32 @@ G_STMT_START { \
twcc_verify_packets_to_event (packets, event); \
} G_STMT_END

#define twcc_verify_stats(h, bitrate_sent, bitrate_recv, pkts_sent, pkts_recv, loss_pct, avg_dod) \
G_STMT_START { \
GstStructure *twcc_stats; \
guint stats_bitrate_sent; \
guint stats_bitrate_recv; \
guint stats_packets_sent; \
guint stats_packets_recv; \
gdouble stats_loss_pct; \
GstClockTimeDiff stats_avg_dod; \
twcc_stats = session_harness_get_last_twcc_stats (h); \
fail_unless (gst_structure_get (twcc_stats, \
"bitrate-sent", G_TYPE_UINT, &stats_bitrate_sent, \
"bitrate-recv", G_TYPE_UINT, &stats_bitrate_recv, \
"packets-sent", G_TYPE_UINT, &stats_packets_sent, \
"packets-recv", G_TYPE_UINT, &stats_packets_recv, \
"packet-loss-pct", G_TYPE_DOUBLE, &stats_loss_pct, \
"avg-delta-of-delta", G_TYPE_INT64, &stats_avg_dod, NULL)); \
fail_unless_equals_int (bitrate_sent, stats_bitrate_sent); \
fail_unless_equals_int (bitrate_recv, stats_bitrate_recv); \
fail_unless_equals_int (pkts_sent, stats_packets_sent); \
fail_unless_equals_int (pkts_recv, stats_packets_recv); \
fail_unless_equals_float (loss_pct, stats_loss_pct); \
fail_unless_equals_int64 (avg_dod, stats_avg_dod); \
gst_structure_free (twcc_stats); \
} G_STMT_END

GST_START_TEST (test_twcc_1_bit_status_vector)
{
SessionHarness *h0 = session_harness_new ();
Expand Down Expand Up @@ -3776,7 +3815,7 @@ GST_START_TEST (test_twcc_recv_late_packet_fb_pkt_count_wrap)
guint8 exp_fci0[] = {
0x01, 0x00, /* base sequence number: 256 */
0x00, 0x01, /* packet status count: 1 */
0x00, 0x00, 0x01, /* reference time: 0 */
0x00, 0x00, 0x01, /* reference time: 1 */
0x00, /* feedback packet count: 00 */
/* packet chunks: */
0x20, 0x01, /* 0 0 1 0 0 0 0 0 | 0 0 0 0 0 0 0 1 */
Expand All @@ -3787,8 +3826,8 @@ GST_START_TEST (test_twcc_recv_late_packet_fb_pkt_count_wrap)
guint8 exp_fci1[] = {
0x01, 0x01, /* base sequence number: 257 */
0x00, 0x01, /* packet status count: 1 */
0x00, 0x00, 0x01, /* reference time: 0 */
0x01, /* feedback packet count: 0 */
0x00, 0x00, 0x01, /* reference time: 1 */
0x01, /* feedback packet count: 1 */
/* packet chunks: */
0x20, 0x01, /* 0 0 1 0 0 0 0 0 | 0 0 0 0 0 0 0 1 */
0x01, /* 1 recv-delta */
Expand All @@ -3799,15 +3838,15 @@ GST_START_TEST (test_twcc_recv_late_packet_fb_pkt_count_wrap)

/* Push packets to get the feedback packet count wrap limit */
for (i = 0; i < 255; i++) {
GstClockTime ts = i * 250 * GST_USECOND;
gst_test_clock_set_time (h->testclock, ts);
fail_unless_equals_int (GST_FLOW_OK,
session_harness_recv_rtp ((h),
generate_twcc_recv_buffer (i, i * 250 * GST_USECOND, TRUE)));

/* ignore the twcc for these ones */
gst_buffer_unref (session_harness_produce_twcc (h));
generate_twcc_recv_buffer (i, ts, TRUE)));
}

/* push pkt #256 to jump ahead and force the overflow */
gst_test_clock_set_time (h->testclock, 256 * 250 * GST_USECOND);
fail_unless_equals_int (GST_FLOW_OK,
session_harness_recv_rtp ((h),
generate_twcc_recv_buffer (256, 256 * 250 * GST_USECOND, TRUE)));
Expand All @@ -3817,11 +3856,16 @@ GST_START_TEST (test_twcc_recv_late_packet_fb_pkt_count_wrap)
session_harness_recv_rtp ((h),
generate_twcc_recv_buffer (255, 255 * 250 * GST_USECOND, TRUE)));


/* push pkt #257 to verify fci is correct */
gst_test_clock_set_time (h->testclock, 257 * 250 * GST_USECOND);
fail_unless_equals_int (GST_FLOW_OK,
session_harness_recv_rtp ((h),
generate_twcc_recv_buffer (257, 257 * 250 * GST_USECOND, TRUE)));

/* ignore the twcc for the first 255 packets */
for (i = 0; i < 255; i++)
gst_buffer_unref (session_harness_produce_twcc (h));

/* we expect a fci for pkt #256 */
buf = session_harness_produce_twcc (h);
Expand Down Expand Up @@ -3943,6 +3987,7 @@ GST_START_TEST (test_twcc_send_and_recv)
buf = generate_twcc_send_buffer (seq, slice == num_slices - 1);
res = session_harness_send_rtp (h_send, buf);
fail_unless_equals_int (GST_FLOW_OK, res);
session_harness_advance_and_crank (h_send, TEST_BUF_DURATION);

/* get the buffer ready for the network */
buf = session_harness_pull_send_rtp (h_send);
Expand All @@ -3958,30 +4003,9 @@ GST_START_TEST (test_twcc_send_and_recv)
/* sender receives the TWCC packet */
session_harness_recv_rtcp (h_send, buf);

if (frame > 0) {
GstStructure *twcc_stats;
guint bitrate_sent;
guint bitrate_recv;
guint packets_sent;
guint packets_recv;
gdouble packet_loss_pct;
GstClockTimeDiff avg_delta_of_delta;
twcc_stats = session_harness_get_last_twcc_stats (h_send);
fail_unless (gst_structure_get (twcc_stats,
"bitrate-sent", G_TYPE_UINT, &bitrate_sent,
"bitrate-recv", G_TYPE_UINT, &bitrate_recv,
"packets-sent", G_TYPE_UINT, &packets_sent,
"packets-recv", G_TYPE_UINT, &packets_recv,
"packet-loss-pct", G_TYPE_DOUBLE, &packet_loss_pct,
"avg-delta-of-delta", G_TYPE_INT64, &avg_delta_of_delta, NULL));
fail_unless_equals_int (TEST_BUF_BPS, bitrate_sent);
fail_unless_equals_int (TEST_BUF_BPS, bitrate_recv);
fail_unless_equals_int (num_slices, packets_sent);
fail_unless_equals_int (num_slices, packets_recv);
fail_unless_equals_float (0.0f, packet_loss_pct);
fail_unless_equals_int64 (0, avg_delta_of_delta);
gst_structure_free (twcc_stats);
}
if (frame > 0)
twcc_verify_stats (h_send, TEST_BUF_BPS, TEST_BUF_BPS, num_slices,
num_slices, 0.0f, 0);
}

session_harness_free (h_send);
Expand All @@ -3990,6 +4014,51 @@ GST_START_TEST (test_twcc_send_and_recv)

GST_END_TEST;

GST_START_TEST (test_twcc_multiple_payloads_below_window)
{
SessionHarness *h_send = session_harness_new ();
SessionHarness *h_recv = session_harness_new ();

guint i;

GstBuffer *buffers[] = {
generate_twcc_send_buffer_full (0, FALSE, 0xabc, 98),
generate_twcc_send_buffer_full (0, FALSE, 0xdef, 111),
generate_twcc_send_buffer_full (1, FALSE, 0xdef, 111),
generate_twcc_send_buffer_full (2, FALSE, 0xdef, 111),
generate_twcc_send_buffer_full (1, TRUE, 0xabc, 98),
};

/* enable twcc */
session_harness_set_twcc_recv_ext_id (h_recv, TEST_TWCC_EXT_ID);
session_harness_set_twcc_send_ext_id (h_send, TEST_TWCC_EXT_ID);

for (i = 0; i < G_N_ELEMENTS (buffers); i++) {
GstBuffer *buf = buffers[i];
GstFlowReturn res;

/* from payloder to rtpbin */
res = session_harness_send_rtp (h_send, buf);
fail_unless_equals_int (GST_FLOW_OK, res);

buf = session_harness_pull_send_rtp (h_send);
session_harness_advance_and_crank (h_send, TEST_BUF_DURATION);

/* buffer arrives at the receiver */
res = session_harness_recv_rtp (h_recv, buf);
fail_unless_equals_int (GST_FLOW_OK, res);
}

/* sender receives the TWCC packet from the receiver */
session_harness_recv_rtcp (h_send, session_harness_produce_twcc (h_recv));
twcc_verify_stats (h_send, 0, 0, 5, 5, 0.0f, GST_CLOCK_STIME_NONE);

session_harness_free (h_send);
session_harness_free (h_recv);
}

GST_END_TEST;

typedef struct
{
GstClockTime interval;
Expand Down Expand Up @@ -4281,6 +4350,7 @@ rtpsession_suite (void)
tcase_add_test (tc_chain, test_twcc_recv_rtcp_reordered);
tcase_add_test (tc_chain, test_twcc_no_exthdr_in_buffer);
tcase_add_test (tc_chain, test_twcc_send_and_recv);
tcase_add_test (tc_chain, test_twcc_multiple_payloads_below_window);
tcase_add_loop_test (tc_chain, test_twcc_feedback_interval, 0,
G_N_ELEMENTS (test_twcc_feedback_interval_ctx));
tcase_add_test (tc_chain, test_twcc_feedback_count_wrap);
Expand Down

0 comments on commit 266c2d0

Please sign in to comment.