diff --git a/pjmedia/include/pjmedia/transport.h b/pjmedia/include/pjmedia/transport.h
index ff7544366b..923593dc0f 100644
--- a/pjmedia/include/pjmedia/transport.h
+++ b/pjmedia/include/pjmedia/transport.h
@@ -513,6 +513,9 @@ struct pjmedia_transport
 
     /** Application/user data */
     void                *user_data;
+
+    /** Group lock, for synchronization between destroy() & callbacks. */
+    pj_grp_lock_t       *grp_lock;
 };
 
 /**
diff --git a/pjmedia/src/pjmedia/transport_adapter_sample.c b/pjmedia/src/pjmedia/transport_adapter_sample.c
index e150292bed..96a2fa48e5 100644
--- a/pjmedia/src/pjmedia/transport_adapter_sample.c
+++ b/pjmedia/src/pjmedia/transport_adapter_sample.c
@@ -106,6 +106,9 @@ struct tp_adapter
 };
 
 
+static void adapter_on_destroy(void *arg);
+
+
 /*
  * Create the adapter.
  */
@@ -135,6 +138,15 @@ PJ_DEF(pj_status_t) pjmedia_tp_adapter_create( pjmedia_endpt *endpt,
     adapter->slave_tp = transport;
     adapter->del_base = del_base;
 
+    /* Setup group lock handler for destroy and callback synchronization */
+    if (transport && transport->grp_lock) {
+        pj_grp_lock_t *grp_lock = transport->grp_lock;
+
+        adapter->base.grp_lock = grp_lock;
+        pj_grp_lock_add_ref(grp_lock);
+        pj_grp_lock_add_handler(grp_lock, pool, adapter, &adapter_on_destroy);
+    }
+
     /* Done */
     *p_tp = &adapter->base;
     return PJ_SUCCESS;
@@ -421,6 +433,14 @@ static pj_status_t transport_simulate_lost(pjmedia_transport *tp,
     return pjmedia_transport_simulate_lost(adapter->slave_tp, dir, pct_lost);
 }
 
+
+static void adapter_on_destroy(void *arg)
+{
+    struct tp_adapter *adapter = (struct tp_adapter*)arg;
+
+    pj_pool_release(adapter->pool);
+}
+
 /*
  * destroy() is called when the transport is no longer needed.
  */
@@ -433,8 +453,11 @@ static pj_status_t transport_destroy (pjmedia_transport *tp)
         pjmedia_transport_close(adapter->slave_tp);
     }
 
-    /* Self destruct.. */
-    pj_pool_release(adapter->pool);
+    if (adapter->base.grp_lock) {
+        pj_grp_lock_dec_ref(adapter->base.grp_lock);
+    } else {
+        adapter_on_destroy(tp);
+    }
 
     return PJ_SUCCESS;
 }
diff --git a/pjmedia/src/pjmedia/transport_ice.c b/pjmedia/src/pjmedia/transport_ice.c
index 8edf1d7106..c8d1ae7516 100644
--- a/pjmedia/src/pjmedia/transport_ice.c
+++ b/pjmedia/src/pjmedia/transport_ice.c
@@ -338,6 +338,7 @@ PJ_DEF(pj_status_t) pjmedia_ice_create3(pjmedia_endpt *endpt,
         pj_grp_lock_t *grp_lock = pj_ice_strans_get_grp_lock(tp_ice->ice_st);
         pj_grp_lock_add_ref(grp_lock);
         pj_grp_lock_add_handler(grp_lock, pool, tp_ice, &tp_ice_on_destroy);
+        tp_ice->base.grp_lock = grp_lock;
     }
 
     /* Done */
@@ -2736,6 +2737,8 @@ static pj_status_t transport_simulate_lost(pjmedia_transport *tp,
 static void tp_ice_on_destroy(void *arg)
 {
     struct transport_ice *tp_ice = (struct transport_ice*)arg;
+
+    PJ_LOG(4, (tp_ice->base.name, "ICE transport destroyed"));
     pj_pool_safe_release(&tp_ice->pool);
 }
 
@@ -2746,6 +2749,8 @@ static pj_status_t transport_destroy(pjmedia_transport *tp)
 {
     struct transport_ice *tp_ice = (struct transport_ice*)tp;
 
+    PJ_LOG(4, (tp_ice->base.name, "Destroying ICE transport"));
+
     /* Reset callback and user data */
     pj_bzero(&tp_ice->cb, sizeof(tp_ice->cb));
     tp_ice->base.user_data = NULL;
diff --git a/pjmedia/src/pjmedia/transport_loop.c b/pjmedia/src/pjmedia/transport_loop.c
index a1f0ebbd4a..37e6f18cd5 100644
--- a/pjmedia/src/pjmedia/transport_loop.c
+++ b/pjmedia/src/pjmedia/transport_loop.c
@@ -130,6 +130,7 @@ static pjmedia_transport_op transport_udp_op =
     &transport_attach2
 };
 
+static void tp_loop_on_destroy(void *arg);
 
 /**
  * Initialize loopback media transport setting with its default values.
@@ -164,6 +165,8 @@ pjmedia_transport_loop_create2(pjmedia_endpt *endpt,
 {
     struct transport_loop *tp;
     pj_pool_t *pool;
+    pj_grp_lock_t *grp_lock;
+    pj_status_t status;
 
     /* Sanity check */
     PJ_ASSERT_RETURN(endpt && p_tp, PJ_EINVAL);
@@ -179,6 +182,14 @@ pjmedia_transport_loop_create2(pjmedia_endpt *endpt,
     tp->base.op = &transport_udp_op;
     tp->base.type = PJMEDIA_TRANSPORT_TYPE_UDP;
 
+    /* Create group lock */
+    status = pj_grp_lock_create(pool, NULL, &grp_lock);
+    if (status != PJ_SUCCESS)
+        return status;
+
+    pj_grp_lock_add_ref(grp_lock);
+    pj_grp_lock_add_handler(grp_lock, pool, tp, &tp_loop_on_destroy);
+
     if (opt) {
         tp->setting = *opt;
     } else {
@@ -222,17 +233,25 @@ PJ_DEF(pj_status_t) pjmedia_transport_loop_disable_rx( pjmedia_transport *tp,
     return PJ_ENOTFOUND;
 }
 
+
+static void tp_loop_on_destroy(void *arg)
+{
+    struct transport_loop *loop = (struct transport_loop*) arg;
+
+    PJ_LOG(4, (loop->base.name, "Loop transport destroyed"));
+    pj_pool_release(loop->pool);
+}
+
+
 /**
  * Close loopback transport.
  */
 static pj_status_t transport_destroy(pjmedia_transport *tp)
 {
-    struct transport_loop *loop = (struct transport_loop*) tp;
-
     /* Sanity check */
     PJ_ASSERT_RETURN(tp, PJ_EINVAL);
 
-    pj_pool_release(loop->pool);
+    pj_grp_lock_dec_ref(tp->grp_lock);
 
     return PJ_SUCCESS;
 }
@@ -378,6 +397,8 @@ static pj_status_t transport_send_rtp( pjmedia_transport *tp,
         }
     }
 
+    pj_grp_lock_add_ref(tp->grp_lock);
+
     /* Distribute to users */
     for (i=0; i<loop->user_cnt; ++i) {
         if (loop->users[i].rx_disabled) continue;
@@ -395,6 +416,8 @@ static pj_status_t transport_send_rtp( pjmedia_transport *tp,
         }
     }
 
+    pj_grp_lock_dec_ref(tp->grp_lock);
+
     return PJ_SUCCESS;
 }
 
@@ -420,6 +443,8 @@ static pj_status_t transport_send_rtcp2(pjmedia_transport *tp,
     PJ_UNUSED_ARG(addr_len);
     PJ_UNUSED_ARG(addr);
 
+    pj_grp_lock_add_ref(tp->grp_lock);
+
     /* Distribute to users */
     for (i=0; i<loop->user_cnt; ++i) {
         if (!loop->users[i].rx_disabled && loop->users[i].rtcp_cb)
@@ -427,6 +452,8 @@ static pj_status_t transport_send_rtcp2(pjmedia_transport *tp,
                                       size);
     }
 
+    pj_grp_lock_dec_ref(tp->grp_lock);
+
     return PJ_SUCCESS;
 }
 
diff --git a/pjmedia/src/pjmedia/transport_srtp.c b/pjmedia/src/pjmedia/transport_srtp.c
index f95941581d..fc1478a175 100644
--- a/pjmedia/src/pjmedia/transport_srtp.c
+++ b/pjmedia/src/pjmedia/transport_srtp.c
@@ -434,6 +434,8 @@ static pj_status_t create_srtp_ctx(transport_srtp *srtp,
 /* Destroy SRTP context */
 static void destroy_srtp_ctx(transport_srtp *p_srtp, srtp_context *ctx);
 
+/* SRTP destroy handler */
+static void srtp_on_destroy(void *arg);
 
 /* This function may also be used by other module, e.g: pjmedia/errno.c,
  * it should have C compatible declaration.
@@ -805,6 +807,13 @@ PJ_DEF(pj_status_t) pjmedia_transport_srtp_create(
     /* Set underlying transport */
     srtp->member_tp = tp;
 
+    /* Setup group lock handler for destroy and callback synchronization */
+    if (tp && tp->grp_lock) {
+        srtp->base.grp_lock = tp->grp_lock;
+        pj_grp_lock_add_ref(tp->grp_lock);
+        pj_grp_lock_add_handler(tp->grp_lock, pool, srtp, &srtp_on_destroy);
+    }
+
     /* Initialize peer's SRTP usage mode.
      */
     srtp->peer_use = srtp->setting.use;
@@ -839,6 +848,8 @@ PJ_DEF(pj_status_t) pjmedia_transport_srtp_create(
     /* Done */
     *p_tp = &srtp->base;
 
+    PJ_LOG(4, (srtp->pool->obj_name, "SRTP transport created"));
+
     return PJ_SUCCESS;
 }
 
@@ -1459,6 +1470,19 @@ static pj_status_t transport_simulate_lost(pjmedia_transport *tp,
     return pjmedia_transport_simulate_lost(srtp->member_tp, dir, pct_lost);
 }
 
+
+/* SRTP real destroy */
+static void srtp_on_destroy(void *arg)
+{
+    transport_srtp *srtp = (transport_srtp*)arg;
+
+    PJ_LOG(4, (srtp->pool->obj_name, "SRTP transport destroyed"));
+
+    pj_lock_destroy(srtp->mutex);
+    pj_pool_safe_release(&srtp->pool);
+}
+
+
 static pj_status_t transport_destroy (pjmedia_transport *tp)
 {
     transport_srtp *srtp = (transport_srtp *) tp;
@@ -1467,6 +1491,8 @@ static pj_status_t transport_destroy (pjmedia_transport *tp)
 
     PJ_ASSERT_RETURN(tp, PJ_EINVAL);
 
+    PJ_LOG(4, (srtp->pool->obj_name, "Destroying SRTP transport"));
+
     /* Close all keying. Note that any keying should not be destroyed before
      * SRTP transport is destroyed as re-INVITE may initiate new keying method
      * without destroying SRTP transport.
@@ -1481,12 +1507,25 @@ static pj_status_t transport_destroy (pjmedia_transport *tp)
 
     status = pjmedia_transport_srtp_stop(tp);
 
-    /* In case mutex is being acquired by other thread */
-    pj_lock_acquire(srtp->mutex);
-    pj_lock_release(srtp->mutex);
+    if (srtp->base.grp_lock) {
+        pj_grp_lock_dec_ref(srtp->base.grp_lock);
+    } else {
+        /* Only get here when the underlying transport does not have
+         * a group lock, race condition with callbacks may occur.
+         * Currently UDP, ICE, and loop have a group lock already.
+         */
+        PJ_LOG(4,(srtp->pool->obj_name,
+                  "Warning: underlying transport does not have group lock"));
 
-    pj_lock_destroy(srtp->mutex);
-    pj_pool_release(srtp->pool);
+        /* In case mutex is being acquired by other thread.
+         * An effort to synchronize destroy() & callbacks when the underlying
+         * transport does not provide a group lock.
+         */
+        pj_lock_acquire(srtp->mutex);
+        pj_lock_release(srtp->mutex);
+
+        srtp_on_destroy(srtp);
+    }
 
     return status;
 }
diff --git a/pjmedia/src/pjmedia/transport_srtp_dtls.c b/pjmedia/src/pjmedia/transport_srtp_dtls.c
index 9a374d71fb..183d16e008 100644
--- a/pjmedia/src/pjmedia/transport_srtp_dtls.c
+++ b/pjmedia/src/pjmedia/transport_srtp_dtls.c
@@ -80,6 +80,8 @@ static void on_ice_complete2(pjmedia_transport *tp,
                              pj_status_t status,
                              void *user_data);
 
+static void dtls_on_destroy(void *arg);
+
 
 static pjmedia_transport_op dtls_op =
 {
@@ -134,6 +136,7 @@ typedef struct dtls_srtp
     pj_bool_t            pending_start; /* media_start() invoked but DTLS
                                            nego not done yet, so start the
                                            SRTP once the nego done        */
+    pj_bool_t            is_destroying; /* DTLS being destroyed?          */
    pj_bool_t            got_keys;      /* DTLS nego done & keys ready    */
     pjmedia_srtp_crypto  tx_crypto[NUM_CHANNEL];
     pjmedia_srtp_crypto  rx_crypto[NUM_CHANNEL];
@@ -269,7 +272,7 @@ static pj_status_t dtls_create(transport_srtp *srtp,
 {
     dtls_srtp *ds;
     pj_pool_t *pool;
-    pj_status_t status; 
+    pj_status_t status;
 
     pool = pj_pool_create(srtp->pool->factory, "dtls%p", 2000, 256, NULL);
 
@@ -282,10 +285,19 @@ static pj_status_t dtls_create(transport_srtp *srtp,
     ds->base.user_data = srtp;
     ds->srtp = srtp;
 
-    status = pj_lock_create_simple_mutex(ds->pool, "dtls_ssl_lock%p",
-                                         &ds->ossl_lock);
-    if (status != PJ_SUCCESS)
-        return status;
+    /* Setup group lock handler for destroy and callback synchronization */
+    if (srtp->base.grp_lock) {
+        pj_grp_lock_t *grp_lock = srtp->base.grp_lock;
+
+        ds->base.grp_lock = grp_lock;
+        pj_grp_lock_add_ref(grp_lock);
+        pj_grp_lock_add_handler(grp_lock, pool, ds, &dtls_on_destroy);
+    } else {
+        status = pj_lock_create_simple_mutex(ds->pool, "dtls_ssl_lock%p",
+                                             &ds->ossl_lock);
+        if (status != PJ_SUCCESS)
+            return status;
+    }
 
     *p_keying = &ds->base;
     PJ_LOG(5,(srtp->pool->obj_name, "SRTP keying DTLS-SRTP created"));
@@ -293,6 +305,24 @@ static pj_status_t dtls_create(transport_srtp *srtp,
 }
 
 
+/* Lock/unlock for DTLS states access protection */
+
+static void DTLS_LOCK(dtls_srtp *ds) {
+    if (ds->base.grp_lock)
+        pj_grp_lock_acquire(ds->base.grp_lock);
+    else
+        pj_lock_acquire(ds->ossl_lock);
+}
+
+
+static void DTLS_UNLOCK(dtls_srtp *ds) {
+    if (ds->base.grp_lock)
+        pj_grp_lock_release(ds->base.grp_lock);
+    else
+        pj_lock_release(ds->ossl_lock);
+}
+
+
 /**
  * Mapping from OpenSSL error codes to pjlib error space.
  */
@@ -545,7 +575,7 @@ static pj_status_t ssl_create(dtls_srtp *ds, unsigned idx)
 /* Destroy SSL context and instance */
 static void ssl_destroy(dtls_srtp *ds, unsigned idx)
 {
-    pj_lock_acquire(ds->ossl_lock);
+    DTLS_LOCK(ds);
 
     /* Destroy SSL instance */
     if (ds->ossl_ssl[idx]) {
@@ -570,7 +600,7 @@ static void ssl_destroy(dtls_srtp *ds, unsigned idx)
         ds->ossl_ctx[idx] = NULL;
     }
 
-    pj_lock_release(ds->ossl_lock);
+    DTLS_UNLOCK(ds);
 }
 
 static pj_status_t ssl_get_srtp_material(dtls_srtp *ds, unsigned idx)
@@ -581,7 +611,7 @@ static pj_status_t ssl_get_srtp_material(dtls_srtp *ds, unsigned idx)
     pjmedia_srtp_crypto *tx, *rx;
     pj_status_t status = PJ_SUCCESS;
 
-    pj_lock_acquire(ds->ossl_lock);
+    DTLS_LOCK(ds);
 
     if (!ds->ossl_ssl[idx]) {
         status = PJ_EGONE;
@@ -652,7 +682,7 @@ static pj_status_t ssl_get_srtp_material(dtls_srtp *ds, unsigned idx)
     }
 
 on_return:
-    pj_lock_release(ds->ossl_lock);
+    DTLS_UNLOCK(ds);
     return status;
 }
 
@@ -676,16 +706,16 @@ static pj_status_t ssl_match_fingerprint(dtls_srtp *ds, unsigned idx)
         return PJ_ENOTSUP;
     }
 
-    pj_lock_acquire(ds->ossl_lock);
+    DTLS_LOCK(ds);
 
     if (!ds->ossl_ssl[idx]) {
-        pj_lock_release(ds->ossl_lock);
+        DTLS_UNLOCK(ds);
         return PJ_EGONE;
     }
 
     /* Get remote cert & calculate the hash */
     rem_cert = SSL_get_peer_certificate(ds->ossl_ssl[idx]);
-    pj_lock_release(ds->ossl_lock);
+    DTLS_UNLOCK(ds);
 
     if (!rem_cert)
         return PJMEDIA_SRTP_DTLS_EPEERNOCERT;
@@ -748,10 +778,10 @@ static pj_status_t ssl_flush_wbio(dtls_srtp *ds, unsigned idx)
     pj_size_t len;
     pj_status_t status = PJ_SUCCESS;
 
-    pj_lock_acquire(ds->ossl_lock);
+    DTLS_LOCK(ds);
 
     if (!ds->ossl_wbio[idx]) {
-        pj_lock_release(ds->ossl_lock);
+        DTLS_UNLOCK(ds);
         return PJ_EGONE;
     }
 
     /* Check whether there is data to send */
     if (BIO_ctrl_pending(ds->ossl_wbio[idx]) > 0) {
         /* Yes, get and send it */
         len = BIO_read(ds->ossl_wbio[idx], ds->buf[idx], sizeof(ds->buf));
         if (len > 0) {
-            pj_lock_release(ds->ossl_lock);
+            DTLS_UNLOCK(ds);
 
             status = send_raw(ds, idx, ds->buf[idx], len);
             if (status != PJ_SUCCESS) {
@@ -771,12 +801,12 @@ static pj_status_t ssl_flush_wbio(dtls_srtp *ds, unsigned idx)
                  * its packet when not receiving from us.
                  */
             }
 
-            pj_lock_acquire(ds->ossl_lock);
+            DTLS_LOCK(ds);
         }
     }
 
     if (!ds->ossl_ssl[idx]) {
-        pj_lock_release(ds->ossl_lock);
+        DTLS_UNLOCK(ds);
         return PJ_EGONE;
     }
 
@@ -784,7 +814,7 @@
      * verification, etc) has been done or handshake is still in progress.
      */
     if (ds->nego_completed[idx] || !SSL_is_init_finished(ds->ossl_ssl[idx])) {
-        pj_lock_release(ds->ossl_lock);
+        DTLS_UNLOCK(ds);
         return PJ_SUCCESS;
     }
 
@@ -793,7 +823,7 @@
     PJ_LOG(2,(ds->base.name, "DTLS-SRTP negotiation for %s completed!",
               CHANNEL_TO_STRING(idx)));
 
-    pj_lock_release(ds->ossl_lock);
+    DTLS_UNLOCK(ds);
 
     /* Stop the retransmission clock. Note that the clock may not be stopped
      * if this function is called from clock thread context. We'll try again
@@ -867,18 +897,18 @@ static void clock_cb(const pj_timestamp *ts, void *user_data)
 
     PJ_UNUSED_ARG(ts);
 
-    pj_lock_acquire(ds->ossl_lock);
+    DTLS_LOCK(ds);
 
     if (!ds->ossl_ssl[idx]) {
-        pj_lock_release(ds->ossl_lock);
+        DTLS_UNLOCK(ds);
         return;
     }
 
     if (DTLSv1_handle_timeout(ds->ossl_ssl[idx]) > 0) {
-        pj_lock_release(ds->ossl_lock);
+        DTLS_UNLOCK(ds);
         ssl_flush_wbio(ds, idx);
     } else {
-        pj_lock_release(ds->ossl_lock);
+        DTLS_UNLOCK(ds);
     }
 }
 
@@ -889,18 +919,18 @@ static pj_status_t ssl_handshake_channel(dtls_srtp *ds, unsigned idx)
     pj_status_t status;
     int err;
 
-    pj_lock_acquire(ds->ossl_lock);
+    DTLS_LOCK(ds);
 
     /* Init DTLS (if not yet) */
     status = ssl_create(ds, idx);
     if (status != PJ_SUCCESS) {
-        pj_lock_release(ds->ossl_lock);
+        DTLS_UNLOCK(ds);
         return status;
     }
 
     /* Check if handshake has been initiated or even completed */
     if (ds->nego_started[idx] || SSL_is_init_finished(ds->ossl_ssl[idx])) {
-        pj_lock_release(ds->ossl_lock);
+        DTLS_UNLOCK(ds);
         return PJ_SUCCESS;
     }
 
@@ -914,7 +944,7 @@ static pj_status_t ssl_handshake_channel(dtls_srtp *ds, unsigned idx)
     if (err < 0) {
         err = SSL_get_error(ds->ossl_ssl[idx], err);
 
-        pj_lock_release(ds->ossl_lock);
+        DTLS_UNLOCK(ds);
 
         if (err == SSL_ERROR_WANT_READ) {
             status = ssl_flush_wbio(ds, idx);
@@ -927,7 +957,7 @@ static pj_status_t ssl_handshake_channel(dtls_srtp *ds, unsigned idx)
             goto on_return;
         }
     } else {
-        pj_lock_release(ds->ossl_lock);
+        DTLS_UNLOCK(ds);
     }
 
     /* Create and start clock @4Hz for retransmission */
@@ -1135,10 +1165,10 @@ static pj_status_t ssl_on_recv_packet(dtls_srtp *ds, unsigned idx,
     char tmp[128];
     pj_size_t nwritten;
 
-    pj_lock_acquire(ds->ossl_lock);
+    DTLS_LOCK(ds);
 
     if (!ds->ossl_rbio[idx]) {
-        pj_lock_release(ds->ossl_lock);
+        DTLS_UNLOCK(ds);
         return PJ_EGONE;
     }
 
@@ -1150,12 +1180,12 @@ static pj_status_t ssl_on_recv_packet(dtls_srtp *ds, unsigned idx,
 #if DTLS_DEBUG
         pj_perror(2, ds->base.name, status, "BIO_write() error");
 #endif
-        pj_lock_release(ds->ossl_lock);
+        DTLS_UNLOCK(ds);
         return status;
     }
 
     if (!ds->ossl_ssl[idx]) {
-        pj_lock_release(ds->ossl_lock);
+        DTLS_UNLOCK(ds);
         return PJ_EGONE;
     }
 
@@ -1172,7 +1202,7 @@ static pj_status_t ssl_on_recv_packet(dtls_srtp *ds, unsigned idx,
         }
     }
 
-    pj_lock_release(ds->ossl_lock);
+    DTLS_UNLOCK(ds);
 
     /* Flush anything pending in the write BIO */
     return ssl_flush_wbio(ds, idx);
@@ -1211,14 +1241,18 @@ static pj_status_t dtls_on_recv(pjmedia_transport *tp, unsigned idx,
 {
     dtls_srtp *ds = (dtls_srtp*)tp;
 
+    DTLS_LOCK(ds);
+
     /* Destroy the retransmission clock if handshake has been completed. */
     if (ds->clock[idx] && ds->nego_completed[idx]) {
         pjmedia_clock_destroy(ds->clock[idx]);
         ds->clock[idx] = NULL;
     }
 
-    if (size < 1 || !IS_DTLS_PKT(pkt, size))
+    if (size < 1 || !IS_DTLS_PKT(pkt, size) || ds->is_destroying) {
+        DTLS_UNLOCK(ds);
         return PJ_EIGNORED;
+    }
 
 #if DTLS_DEBUG
     PJ_LOG(2,(ds->base.name, "DTLS-SRTP %s receiving %lu bytes",
@@ -1258,8 +1292,10 @@ static pj_status_t dtls_on_recv(pjmedia_transport *tp, unsigned idx,
         }
 
         status = pjmedia_transport_attach2(&ds->srtp->base, &ap);
-        if (status != PJ_SUCCESS)
+        if (status != PJ_SUCCESS) {
+            DTLS_UNLOCK(ds);
             return status;
+        }
 
 #if DTLS_DEBUG
         {
@@ -1283,12 +1319,17 @@ static pj_status_t dtls_on_recv(pjmedia_transport *tp, unsigned idx,
         pj_status_t status;
         ds->setup = DTLS_SETUP_PASSIVE;
         status = ssl_handshake_channel(ds, idx);
-        if (status != PJ_SUCCESS)
+        if (status != PJ_SUCCESS) {
+            DTLS_UNLOCK(ds);
             return status;
+        }
     }
 
     /* Send it to OpenSSL */
     ssl_on_recv_packet(ds, idx, pkt, size);
+
+    DTLS_UNLOCK(ds);
+
     return PJ_SUCCESS;
 }
 
@@ -1821,6 +1862,15 @@ static void dtls_destroy_channel(dtls_srtp *ds, unsigned idx)
     ssl_destroy(ds, idx);
 }
 
+static void dtls_on_destroy(void *arg) {
+    dtls_srtp *ds = (dtls_srtp *)arg;
+
+    if (ds->ossl_lock)
+        pj_lock_destroy(ds->ossl_lock);
+
+    pj_pool_safe_release(&ds->pool);
+}
+
 static pj_status_t dtls_destroy(pjmedia_transport *tp)
 {
     dtls_srtp *ds = (dtls_srtp *)tp;
@@ -1829,15 +1879,20 @@ static pj_status_t dtls_destroy(pjmedia_transport *tp)
     PJ_LOG(2,(ds->base.name, "dtls_destroy()"));
 #endif
 
+    ds->is_destroying = PJ_TRUE;
+
+    DTLS_LOCK(ds);
+
     dtls_destroy_channel(ds, RTP_CHANNEL);
     dtls_destroy_channel(ds, RTCP_CHANNEL);
 
-    if (ds->ossl_lock) {
-        pj_lock_destroy(ds->ossl_lock);
-        ds->ossl_lock = NULL;
-    }
+    DTLS_UNLOCK(ds);
 
-    pj_pool_safe_release(&ds->pool);
+    if (ds->base.grp_lock) {
+        pj_grp_lock_dec_ref(ds->base.grp_lock);
+    } else {
+        dtls_on_destroy(tp);
+    }
 
     return PJ_SUCCESS;
 }
diff --git a/pjmedia/src/pjmedia/transport_udp.c b/pjmedia/src/pjmedia/transport_udp.c
index 9727ac0455..cf3471ced9 100644
--- a/pjmedia/src/pjmedia/transport_udp.c
+++ b/pjmedia/src/pjmedia/transport_udp.c
@@ -298,6 +298,7 @@ PJ_DEF(pj_status_t) pjmedia_transport_udp_attach( pjmedia_endpt *endpt,
     pj_pool_t *pool;
     pj_ioqueue_t *ioqueue;
     pj_ioqueue_callback rtp_cb, rtcp_cb;
+    pj_grp_lock_t *grp_lock;
     pj_status_t status;
 
 
@@ -348,18 +349,29 @@ PJ_DEF(pj_status_t) pjmedia_transport_udp_attach( pjmedia_endpt *endpt,
                               pj_sockaddr_get_addr_len(&tp->rtp_addr_name));
     }
 
+    /* Create group lock */
+    status = pj_grp_lock_create(pool, NULL, &grp_lock);
+    if (status != PJ_SUCCESS)
+        goto on_error;
+
+    pj_grp_lock_add_ref(grp_lock);
+    tp->base.grp_lock = grp_lock;
+
     /* Setup RTP socket with the ioqueue */
     pj_bzero(&rtp_cb, sizeof(rtp_cb));
     rtp_cb.on_read_complete = &on_rx_rtp;
     rtp_cb.on_write_complete = &on_rtp_data_sent;
 
-    status = pj_ioqueue_register_sock(pool, ioqueue, tp->rtp_sock, tp,
-                                      &rtp_cb, &tp->rtp_key);
+    status = pj_ioqueue_register_sock2(pool, ioqueue, tp->rtp_sock, grp_lock,
+                                       tp, &rtp_cb, &tp->rtp_key);
     if (status != PJ_SUCCESS)
         goto on_error;
 
     /* Disallow concurrency so that detach() and destroy() are
      * synchronized with the callback.
+     *
+     * Note that we still need this even after group lock is added to
+     * maintain the above behavior.
      */
     status = pj_ioqueue_set_concurrency(tp->rtp_key, PJ_FALSE);
     if (status != PJ_SUCCESS)
@@ -388,8 +400,8 @@ PJ_DEF(pj_status_t) pjmedia_transport_udp_attach( pjmedia_endpt *endpt,
     pj_bzero(&rtcp_cb, sizeof(rtcp_cb));
     rtcp_cb.on_read_complete = &on_rx_rtcp;
 
-    status = pj_ioqueue_register_sock(pool, ioqueue, tp->rtcp_sock, tp,
-                                      &rtcp_cb, &tp->rtcp_key);
+    status = pj_ioqueue_register_sock2(pool, ioqueue, tp->rtcp_sock, grp_lock,
+                                       tp, &rtcp_cb, &tp->rtcp_key);
     if (status != PJ_SUCCESS)
         goto on_error;
 
@@ -436,12 +448,13 @@ static pj_status_t transport_destroy(pjmedia_transport *tp)
 
     /* Must not close while application is using this */
     //PJ_ASSERT_RETURN(!udp->attached, PJ_EINVALIDOP);
-    
+
+    /* The following calls to pj_ioqueue_unregister() will block the execution
+     * if callback is still being called because allow_concurrent is false.
+     * So it is safe to release the pool immediately after.
+     */
     if (udp->rtp_key) {
-        /* This will block the execution if callback is still
-         * being called.
-         */
         pj_ioqueue_unregister(udp->rtp_key);
         udp->rtp_key = NULL;
         udp->rtp_sock = PJ_INVALID_SOCKET;
@@ -459,6 +472,8 @@ static pj_status_t transport_destroy(pjmedia_transport *tp)
         udp->rtcp_sock = PJ_INVALID_SOCKET;
     }
 
+    pj_grp_lock_dec_ref(tp->grp_lock);
+
    PJ_LOG(4,(udp->base.name, "UDP media transport destroyed"));
 
     pj_pool_release(udp->pool);
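
Note (not part of the patch): every transport touched above applies the same group-lock lifetime pattern -- take a reference and register a destroy handler when the transport is created, drop the reference in destroy(), and let the handler do the actual pool release once no callback can still be running under the group lock. The sketch below only illustrates that pattern; the my_transport type and function names are hypothetical, while the pj_grp_lock_*() calls are the pjlib APIs used in the diff.

/* Illustrative sketch only, not part of the patch. */
#include <pjmedia.h>
#include <pjlib.h>

typedef struct my_transport
{
    pjmedia_transport  base;    /* base.grp_lock is the field added above */
    pj_pool_t         *pool;
} my_transport;

/* Runs only when the last group lock reference is released, i.e. when no
 * callback can still be executing under the group lock.
 */
static void my_transport_on_destroy(void *arg)
{
    my_transport *tp = (my_transport*)arg;
    pj_pool_release(tp->pool);
}

static pj_status_t my_transport_init(pj_pool_t *pool, my_transport *tp)
{
    pj_grp_lock_t *grp_lock;
    pj_status_t status;

    status = pj_grp_lock_create(pool, NULL, &grp_lock);
    if (status != PJ_SUCCESS)
        return status;

    tp->pool = pool;
    tp->base.grp_lock = grp_lock;

    /* One reference held by the transport itself, plus the handler that
     * performs the real destruction.
     */
    pj_grp_lock_add_ref(grp_lock);
    pj_grp_lock_add_handler(grp_lock, pool, tp, &my_transport_on_destroy);
    return PJ_SUCCESS;
}

static pj_status_t my_transport_destroy(my_transport *tp)
{
    /* Dropping the reference (instead of releasing the pool directly)
     * defers my_transport_on_destroy() until all other holders of the
     * group lock, e.g. in-flight callbacks, have released it.
     */
    if (tp->base.grp_lock)
        pj_grp_lock_dec_ref(tp->base.grp_lock);
    else
        my_transport_on_destroy(tp);
    return PJ_SUCCESS;
}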