Merge pull request from GHSA-f76w-fh7c-pc66

* Add group lock to media transport

* Also add group lock to SRTP-DTLS

* Put lock protection to avoid race condition between destroy() & dtls_on_recv()
This commit is contained in:
Nanang Izzuddin 2023-10-03 09:59:03 +07:00 committed by GitHub
parent 2c1207c30b
commit 6dc9b8c181
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 226 additions and 59 deletions

View File

@ -513,6 +513,9 @@ struct pjmedia_transport
/** Application/user data */
void *user_data;
/** Group lock, for synchronization between destroy() & callbacks. */
pj_grp_lock_t *grp_lock;
};
/**

View File

@ -106,6 +106,9 @@ struct tp_adapter
};
static void adapter_on_destroy(void *arg);
/*
* Create the adapter.
*/
@ -135,6 +138,15 @@ PJ_DEF(pj_status_t) pjmedia_tp_adapter_create( pjmedia_endpt *endpt,
adapter->slave_tp = transport;
adapter->del_base = del_base;
/* Setup group lock handler for destroy and callback synchronization */
if (transport && transport->grp_lock) {
pj_grp_lock_t *grp_lock = transport->grp_lock;
adapter->base.grp_lock = grp_lock;
pj_grp_lock_add_ref(grp_lock);
pj_grp_lock_add_handler(grp_lock, pool, adapter, &adapter_on_destroy);
}
/* Done */
*p_tp = &adapter->base;
return PJ_SUCCESS;
@ -421,6 +433,14 @@ static pj_status_t transport_simulate_lost(pjmedia_transport *tp,
return pjmedia_transport_simulate_lost(adapter->slave_tp, dir, pct_lost);
}
static void adapter_on_destroy(void *arg)
{
struct tp_adapter *adapter = (struct tp_adapter*)arg;
pj_pool_release(adapter->pool);
}
/*
* destroy() is called when the transport is no longer needed.
*/
@ -433,8 +453,11 @@ static pj_status_t transport_destroy (pjmedia_transport *tp)
pjmedia_transport_close(adapter->slave_tp);
}
/* Self destruct.. */
pj_pool_release(adapter->pool);
if (adapter->base.grp_lock) {
pj_grp_lock_dec_ref(adapter->base.grp_lock);
} else {
adapter_on_destroy(tp);
}
return PJ_SUCCESS;
}

View File

@ -338,6 +338,7 @@ PJ_DEF(pj_status_t) pjmedia_ice_create3(pjmedia_endpt *endpt,
pj_grp_lock_t *grp_lock = pj_ice_strans_get_grp_lock(tp_ice->ice_st);
pj_grp_lock_add_ref(grp_lock);
pj_grp_lock_add_handler(grp_lock, pool, tp_ice, &tp_ice_on_destroy);
tp_ice->base.grp_lock = grp_lock;
}
/* Done */
@ -2736,6 +2737,8 @@ static pj_status_t transport_simulate_lost(pjmedia_transport *tp,
static void tp_ice_on_destroy(void *arg)
{
struct transport_ice *tp_ice = (struct transport_ice*)arg;
PJ_LOG(4, (tp_ice->base.name, "ICE transport destroyed"));
pj_pool_safe_release(&tp_ice->pool);
}
@ -2746,6 +2749,8 @@ static pj_status_t transport_destroy(pjmedia_transport *tp)
{
struct transport_ice *tp_ice = (struct transport_ice*)tp;
PJ_LOG(4, (tp_ice->base.name, "Destroying ICE transport"));
/* Reset callback and user data */
pj_bzero(&tp_ice->cb, sizeof(tp_ice->cb));
tp_ice->base.user_data = NULL;

View File

@ -130,6 +130,7 @@ static pjmedia_transport_op transport_udp_op =
&transport_attach2
};
static void tp_loop_on_destroy(void *arg);
/**
* Initialize loopback media transport setting with its default values.
@ -164,6 +165,8 @@ pjmedia_transport_loop_create2(pjmedia_endpt *endpt,
{
struct transport_loop *tp;
pj_pool_t *pool;
pj_grp_lock_t *grp_lock;
pj_status_t status;
/* Sanity check */
PJ_ASSERT_RETURN(endpt && p_tp, PJ_EINVAL);
@ -179,6 +182,14 @@ pjmedia_transport_loop_create2(pjmedia_endpt *endpt,
tp->base.op = &transport_udp_op;
tp->base.type = PJMEDIA_TRANSPORT_TYPE_UDP;
/* Create group lock */
status = pj_grp_lock_create(pool, NULL, &grp_lock);
if (status != PJ_SUCCESS)
return status;
pj_grp_lock_add_ref(grp_lock);
pj_grp_lock_add_handler(grp_lock, pool, tp, &tp_loop_on_destroy);
if (opt) {
tp->setting = *opt;
} else {
@ -222,17 +233,25 @@ PJ_DEF(pj_status_t) pjmedia_transport_loop_disable_rx( pjmedia_transport *tp,
return PJ_ENOTFOUND;
}
static void tp_loop_on_destroy(void *arg)
{
struct transport_loop *loop = (struct transport_loop*) arg;
PJ_LOG(4, (loop->base.name, "Loop transport destroyed"));
pj_pool_release(loop->pool);
}
/**
* Close loopback transport.
*/
static pj_status_t transport_destroy(pjmedia_transport *tp)
{
struct transport_loop *loop = (struct transport_loop*) tp;
/* Sanity check */
PJ_ASSERT_RETURN(tp, PJ_EINVAL);
pj_pool_release(loop->pool);
pj_grp_lock_dec_ref(tp->grp_lock);
return PJ_SUCCESS;
}
@ -378,6 +397,8 @@ static pj_status_t transport_send_rtp( pjmedia_transport *tp,
}
}
pj_grp_lock_add_ref(tp->grp_lock);
/* Distribute to users */
for (i=0; i<loop->user_cnt; ++i) {
if (loop->users[i].rx_disabled) continue;
@ -395,6 +416,8 @@ static pj_status_t transport_send_rtp( pjmedia_transport *tp,
}
}
pj_grp_lock_dec_ref(tp->grp_lock);
return PJ_SUCCESS;
}
@ -420,6 +443,8 @@ static pj_status_t transport_send_rtcp2(pjmedia_transport *tp,
PJ_UNUSED_ARG(addr_len);
PJ_UNUSED_ARG(addr);
pj_grp_lock_add_ref(tp->grp_lock);
/* Distribute to users */
for (i=0; i<loop->user_cnt; ++i) {
if (!loop->users[i].rx_disabled && loop->users[i].rtcp_cb)
@ -427,6 +452,8 @@ static pj_status_t transport_send_rtcp2(pjmedia_transport *tp,
size);
}
pj_grp_lock_dec_ref(tp->grp_lock);
return PJ_SUCCESS;
}

View File

@ -434,6 +434,8 @@ static pj_status_t create_srtp_ctx(transport_srtp *srtp,
/* Destroy SRTP context */
static void destroy_srtp_ctx(transport_srtp *p_srtp, srtp_context *ctx);
/* SRTP destroy handler */
static void srtp_on_destroy(void *arg);
/* This function may also be used by other module, e.g: pjmedia/errno.c,
* it should have C compatible declaration.
@ -805,6 +807,13 @@ PJ_DEF(pj_status_t) pjmedia_transport_srtp_create(
/* Set underlying transport */
srtp->member_tp = tp;
/* Setup group lock handler for destroy and callback synchronization */
if (tp && tp->grp_lock) {
srtp->base.grp_lock = tp->grp_lock;
pj_grp_lock_add_ref(tp->grp_lock);
pj_grp_lock_add_handler(tp->grp_lock, pool, srtp, &srtp_on_destroy);
}
/* Initialize peer's SRTP usage mode. */
srtp->peer_use = srtp->setting.use;
@ -839,6 +848,8 @@ PJ_DEF(pj_status_t) pjmedia_transport_srtp_create(
/* Done */
*p_tp = &srtp->base;
PJ_LOG(4, (srtp->pool->obj_name, "SRTP transport created"));
return PJ_SUCCESS;
}
@ -1459,6 +1470,19 @@ static pj_status_t transport_simulate_lost(pjmedia_transport *tp,
return pjmedia_transport_simulate_lost(srtp->member_tp, dir, pct_lost);
}
/* SRTP real destroy */
static void srtp_on_destroy(void *arg)
{
transport_srtp *srtp = (transport_srtp*)arg;
PJ_LOG(4, (srtp->pool->obj_name, "SRTP transport destroyed"));
pj_lock_destroy(srtp->mutex);
pj_pool_safe_release(&srtp->pool);
}
static pj_status_t transport_destroy (pjmedia_transport *tp)
{
transport_srtp *srtp = (transport_srtp *) tp;
@ -1467,6 +1491,8 @@ static pj_status_t transport_destroy (pjmedia_transport *tp)
PJ_ASSERT_RETURN(tp, PJ_EINVAL);
PJ_LOG(4, (srtp->pool->obj_name, "Destroying SRTP transport"));
/* Close all keying. Note that any keying should not be destroyed before
* SRTP transport is destroyed as re-INVITE may initiate new keying method
* without destroying SRTP transport.
@ -1481,12 +1507,25 @@ static pj_status_t transport_destroy (pjmedia_transport *tp)
status = pjmedia_transport_srtp_stop(tp);
/* In case mutex is being acquired by other thread */
pj_lock_acquire(srtp->mutex);
pj_lock_release(srtp->mutex);
if (srtp->base.grp_lock) {
pj_grp_lock_dec_ref(srtp->base.grp_lock);
} else {
/* Only get here when the underlying transport does not have
* a group lock, race condition with callbacks may occur.
* Currently UDP, ICE, and loop have a group lock already.
*/
PJ_LOG(4,(srtp->pool->obj_name,
"Warning: underlying transport does not have group lock"));
pj_lock_destroy(srtp->mutex);
pj_pool_release(srtp->pool);
/* In case mutex is being acquired by other thread.
* An effort to synchronize destroy() & callbacks when the underlying
* transport does not provide a group lock.
*/
pj_lock_acquire(srtp->mutex);
pj_lock_release(srtp->mutex);
srtp_on_destroy(srtp);
}
return status;
}

View File

@ -80,6 +80,8 @@ static void on_ice_complete2(pjmedia_transport *tp,
pj_status_t status,
void *user_data);
static void dtls_on_destroy(void *arg);
static pjmedia_transport_op dtls_op =
{
@ -134,6 +136,7 @@ typedef struct dtls_srtp
pj_bool_t pending_start; /* media_start() invoked but DTLS
nego not done yet, so start
the SRTP once the nego done */
pj_bool_t is_destroying; /* DTLS being destroyed? */
pj_bool_t got_keys; /* DTLS nego done & keys ready */
pjmedia_srtp_crypto tx_crypto[NUM_CHANNEL];
pjmedia_srtp_crypto rx_crypto[NUM_CHANNEL];
@ -269,7 +272,7 @@ static pj_status_t dtls_create(transport_srtp *srtp,
{
dtls_srtp *ds;
pj_pool_t *pool;
pj_status_t status;
pj_status_t status;
pool = pj_pool_create(srtp->pool->factory, "dtls%p",
2000, 256, NULL);
@ -282,10 +285,19 @@ static pj_status_t dtls_create(transport_srtp *srtp,
ds->base.user_data = srtp;
ds->srtp = srtp;
status = pj_lock_create_simple_mutex(ds->pool, "dtls_ssl_lock%p",
&ds->ossl_lock);
if (status != PJ_SUCCESS)
return status;
/* Setup group lock handler for destroy and callback synchronization */
if (srtp->base.grp_lock) {
pj_grp_lock_t *grp_lock = srtp->base.grp_lock;
ds->base.grp_lock = grp_lock;
pj_grp_lock_add_ref(grp_lock);
pj_grp_lock_add_handler(grp_lock, pool, ds, &dtls_on_destroy);
} else {
status = pj_lock_create_simple_mutex(ds->pool, "dtls_ssl_lock%p",
&ds->ossl_lock);
if (status != PJ_SUCCESS)
return status;
}
*p_keying = &ds->base;
PJ_LOG(5,(srtp->pool->obj_name, "SRTP keying DTLS-SRTP created"));
@ -293,6 +305,24 @@ static pj_status_t dtls_create(transport_srtp *srtp,
}
/* Lock/unlock for DTLS states access protection */
static void DTLS_LOCK(dtls_srtp *ds) {
if (ds->base.grp_lock)
pj_grp_lock_acquire(ds->base.grp_lock);
else
pj_lock_acquire(ds->ossl_lock);
}
static void DTLS_UNLOCK(dtls_srtp *ds) {
if (ds->base.grp_lock)
pj_grp_lock_release(ds->base.grp_lock);
else
pj_lock_release(ds->ossl_lock);
}
/**
* Mapping from OpenSSL error codes to pjlib error space.
*/
@ -545,7 +575,7 @@ static pj_status_t ssl_create(dtls_srtp *ds, unsigned idx)
/* Destroy SSL context and instance */
static void ssl_destroy(dtls_srtp *ds, unsigned idx)
{
pj_lock_acquire(ds->ossl_lock);
DTLS_LOCK(ds);
/* Destroy SSL instance */
if (ds->ossl_ssl[idx]) {
@ -570,7 +600,7 @@ static void ssl_destroy(dtls_srtp *ds, unsigned idx)
ds->ossl_ctx[idx] = NULL;
}
pj_lock_release(ds->ossl_lock);
DTLS_UNLOCK(ds);
}
static pj_status_t ssl_get_srtp_material(dtls_srtp *ds, unsigned idx)
@ -581,7 +611,7 @@ static pj_status_t ssl_get_srtp_material(dtls_srtp *ds, unsigned idx)
pjmedia_srtp_crypto *tx, *rx;
pj_status_t status = PJ_SUCCESS;
pj_lock_acquire(ds->ossl_lock);
DTLS_LOCK(ds);
if (!ds->ossl_ssl[idx]) {
status = PJ_EGONE;
@ -652,7 +682,7 @@ static pj_status_t ssl_get_srtp_material(dtls_srtp *ds, unsigned idx)
}
on_return:
pj_lock_release(ds->ossl_lock);
DTLS_UNLOCK(ds);
return status;
}
@ -676,16 +706,16 @@ static pj_status_t ssl_match_fingerprint(dtls_srtp *ds, unsigned idx)
return PJ_ENOTSUP;
}
pj_lock_acquire(ds->ossl_lock);
DTLS_LOCK(ds);
if (!ds->ossl_ssl[idx]) {
pj_lock_release(ds->ossl_lock);
DTLS_UNLOCK(ds);
return PJ_EGONE;
}
/* Get remote cert & calculate the hash */
rem_cert = SSL_get_peer_certificate(ds->ossl_ssl[idx]);
pj_lock_release(ds->ossl_lock);
DTLS_UNLOCK(ds);
if (!rem_cert)
return PJMEDIA_SRTP_DTLS_EPEERNOCERT;
@ -748,10 +778,10 @@ static pj_status_t ssl_flush_wbio(dtls_srtp *ds, unsigned idx)
pj_size_t len;
pj_status_t status = PJ_SUCCESS;
pj_lock_acquire(ds->ossl_lock);
DTLS_LOCK(ds);
if (!ds->ossl_wbio[idx]) {
pj_lock_release(ds->ossl_lock);
DTLS_UNLOCK(ds);
return PJ_EGONE;
}
@ -760,7 +790,7 @@ static pj_status_t ssl_flush_wbio(dtls_srtp *ds, unsigned idx)
/* Yes, get and send it */
len = BIO_read(ds->ossl_wbio[idx], ds->buf[idx], sizeof(ds->buf));
if (len > 0) {
pj_lock_release(ds->ossl_lock);
DTLS_UNLOCK(ds);
status = send_raw(ds, idx, ds->buf[idx], len);
if (status != PJ_SUCCESS) {
@ -771,12 +801,12 @@ static pj_status_t ssl_flush_wbio(dtls_srtp *ds, unsigned idx)
* its packet when not receiving from us.
*/
}
pj_lock_acquire(ds->ossl_lock);
DTLS_LOCK(ds);
}
}
if (!ds->ossl_ssl[idx]) {
pj_lock_release(ds->ossl_lock);
DTLS_UNLOCK(ds);
return PJ_EGONE;
}
@ -784,7 +814,7 @@ static pj_status_t ssl_flush_wbio(dtls_srtp *ds, unsigned idx)
* verification, etc) has been done or handshake is still in progress.
*/
if (ds->nego_completed[idx] || !SSL_is_init_finished(ds->ossl_ssl[idx])) {
pj_lock_release(ds->ossl_lock);
DTLS_UNLOCK(ds);
return PJ_SUCCESS;
}
@ -793,7 +823,7 @@ static pj_status_t ssl_flush_wbio(dtls_srtp *ds, unsigned idx)
PJ_LOG(2,(ds->base.name, "DTLS-SRTP negotiation for %s completed!",
CHANNEL_TO_STRING(idx)));
pj_lock_release(ds->ossl_lock);
DTLS_UNLOCK(ds);
/* Stop the retransmission clock. Note that the clock may not be stopped
* if this function is called from clock thread context. We'll try again
@ -867,18 +897,18 @@ static void clock_cb(const pj_timestamp *ts, void *user_data)
PJ_UNUSED_ARG(ts);
pj_lock_acquire(ds->ossl_lock);
DTLS_LOCK(ds);
if (!ds->ossl_ssl[idx]) {
pj_lock_release(ds->ossl_lock);
DTLS_UNLOCK(ds);
return;
}
if (DTLSv1_handle_timeout(ds->ossl_ssl[idx]) > 0) {
pj_lock_release(ds->ossl_lock);
DTLS_UNLOCK(ds);
ssl_flush_wbio(ds, idx);
} else {
pj_lock_release(ds->ossl_lock);
DTLS_UNLOCK(ds);
}
}
@ -889,18 +919,18 @@ static pj_status_t ssl_handshake_channel(dtls_srtp *ds, unsigned idx)
pj_status_t status;
int err;
pj_lock_acquire(ds->ossl_lock);
DTLS_LOCK(ds);
/* Init DTLS (if not yet) */
status = ssl_create(ds, idx);
if (status != PJ_SUCCESS) {
pj_lock_release(ds->ossl_lock);
DTLS_UNLOCK(ds);
return status;
}
/* Check if handshake has been initiated or even completed */
if (ds->nego_started[idx] || SSL_is_init_finished(ds->ossl_ssl[idx])) {
pj_lock_release(ds->ossl_lock);
DTLS_UNLOCK(ds);
return PJ_SUCCESS;
}
@ -914,7 +944,7 @@ static pj_status_t ssl_handshake_channel(dtls_srtp *ds, unsigned idx)
if (err < 0) {
err = SSL_get_error(ds->ossl_ssl[idx], err);
pj_lock_release(ds->ossl_lock);
DTLS_UNLOCK(ds);
if (err == SSL_ERROR_WANT_READ) {
status = ssl_flush_wbio(ds, idx);
@ -927,7 +957,7 @@ static pj_status_t ssl_handshake_channel(dtls_srtp *ds, unsigned idx)
goto on_return;
}
} else {
pj_lock_release(ds->ossl_lock);
DTLS_UNLOCK(ds);
}
/* Create and start clock @4Hz for retransmission */
@ -1135,10 +1165,10 @@ static pj_status_t ssl_on_recv_packet(dtls_srtp *ds, unsigned idx,
char tmp[128];
pj_size_t nwritten;
pj_lock_acquire(ds->ossl_lock);
DTLS_LOCK(ds);
if (!ds->ossl_rbio[idx]) {
pj_lock_release(ds->ossl_lock);
DTLS_UNLOCK(ds);
return PJ_EGONE;
}
@ -1150,12 +1180,12 @@ static pj_status_t ssl_on_recv_packet(dtls_srtp *ds, unsigned idx,
#if DTLS_DEBUG
pj_perror(2, ds->base.name, status, "BIO_write() error");
#endif
pj_lock_release(ds->ossl_lock);
DTLS_UNLOCK(ds);
return status;
}
if (!ds->ossl_ssl[idx]) {
pj_lock_release(ds->ossl_lock);
DTLS_UNLOCK(ds);
return PJ_EGONE;
}
@ -1172,7 +1202,7 @@ static pj_status_t ssl_on_recv_packet(dtls_srtp *ds, unsigned idx,
}
}
pj_lock_release(ds->ossl_lock);
DTLS_UNLOCK(ds);
/* Flush anything pending in the write BIO */
return ssl_flush_wbio(ds, idx);
@ -1211,14 +1241,18 @@ static pj_status_t dtls_on_recv(pjmedia_transport *tp, unsigned idx,
{
dtls_srtp *ds = (dtls_srtp*)tp;
DTLS_LOCK(ds);
/* Destroy the retransmission clock if handshake has been completed. */
if (ds->clock[idx] && ds->nego_completed[idx]) {
pjmedia_clock_destroy(ds->clock[idx]);
ds->clock[idx] = NULL;
}
if (size < 1 || !IS_DTLS_PKT(pkt, size))
if (size < 1 || !IS_DTLS_PKT(pkt, size) || ds->is_destroying) {
DTLS_UNLOCK(ds);
return PJ_EIGNORED;
}
#if DTLS_DEBUG
PJ_LOG(2,(ds->base.name, "DTLS-SRTP %s receiving %lu bytes",
@ -1258,8 +1292,10 @@ static pj_status_t dtls_on_recv(pjmedia_transport *tp, unsigned idx,
}
status = pjmedia_transport_attach2(&ds->srtp->base, &ap);
if (status != PJ_SUCCESS)
if (status != PJ_SUCCESS) {
DTLS_UNLOCK(ds);
return status;
}
#if DTLS_DEBUG
{
@ -1283,12 +1319,17 @@ static pj_status_t dtls_on_recv(pjmedia_transport *tp, unsigned idx,
pj_status_t status;
ds->setup = DTLS_SETUP_PASSIVE;
status = ssl_handshake_channel(ds, idx);
if (status != PJ_SUCCESS)
if (status != PJ_SUCCESS) {
DTLS_UNLOCK(ds);
return status;
}
}
/* Send it to OpenSSL */
ssl_on_recv_packet(ds, idx, pkt, size);
DTLS_UNLOCK(ds);
return PJ_SUCCESS;
}
@ -1821,6 +1862,15 @@ static void dtls_destroy_channel(dtls_srtp *ds, unsigned idx)
ssl_destroy(ds, idx);
}
static void dtls_on_destroy(void *arg) {
dtls_srtp *ds = (dtls_srtp *)arg;
if (ds->ossl_lock)
pj_lock_destroy(ds->ossl_lock);
pj_pool_safe_release(&ds->pool);
}
static pj_status_t dtls_destroy(pjmedia_transport *tp)
{
dtls_srtp *ds = (dtls_srtp *)tp;
@ -1829,15 +1879,20 @@ static pj_status_t dtls_destroy(pjmedia_transport *tp)
PJ_LOG(2,(ds->base.name, "dtls_destroy()"));
#endif
ds->is_destroying = PJ_TRUE;
DTLS_LOCK(ds);
dtls_destroy_channel(ds, RTP_CHANNEL);
dtls_destroy_channel(ds, RTCP_CHANNEL);
if (ds->ossl_lock) {
pj_lock_destroy(ds->ossl_lock);
ds->ossl_lock = NULL;
}
DTLS_UNLOCK(ds);
pj_pool_safe_release(&ds->pool);
if (ds->base.grp_lock) {
pj_grp_lock_dec_ref(ds->base.grp_lock);
} else {
dtls_on_destroy(tp);
}
return PJ_SUCCESS;
}

View File

@ -298,6 +298,7 @@ PJ_DEF(pj_status_t) pjmedia_transport_udp_attach( pjmedia_endpt *endpt,
pj_pool_t *pool;
pj_ioqueue_t *ioqueue;
pj_ioqueue_callback rtp_cb, rtcp_cb;
pj_grp_lock_t *grp_lock;
pj_status_t status;
@ -348,18 +349,29 @@ PJ_DEF(pj_status_t) pjmedia_transport_udp_attach( pjmedia_endpt *endpt,
pj_sockaddr_get_addr_len(&tp->rtp_addr_name));
}
/* Create group lock */
status = pj_grp_lock_create(pool, NULL, &grp_lock);
if (status != PJ_SUCCESS)
goto on_error;
pj_grp_lock_add_ref(grp_lock);
tp->base.grp_lock = grp_lock;
/* Setup RTP socket with the ioqueue */
pj_bzero(&rtp_cb, sizeof(rtp_cb));
rtp_cb.on_read_complete = &on_rx_rtp;
rtp_cb.on_write_complete = &on_rtp_data_sent;
status = pj_ioqueue_register_sock(pool, ioqueue, tp->rtp_sock, tp,
&rtp_cb, &tp->rtp_key);
status = pj_ioqueue_register_sock2(pool, ioqueue, tp->rtp_sock, grp_lock,
tp, &rtp_cb, &tp->rtp_key);
if (status != PJ_SUCCESS)
goto on_error;
/* Disallow concurrency so that detach() and destroy() are
* synchronized with the callback.
*
* Note that we still need this even after group lock is added to
* maintain the above behavior.
*/
status = pj_ioqueue_set_concurrency(tp->rtp_key, PJ_FALSE);
if (status != PJ_SUCCESS)
@ -388,8 +400,8 @@ PJ_DEF(pj_status_t) pjmedia_transport_udp_attach( pjmedia_endpt *endpt,
pj_bzero(&rtcp_cb, sizeof(rtcp_cb));
rtcp_cb.on_read_complete = &on_rx_rtcp;
status = pj_ioqueue_register_sock(pool, ioqueue, tp->rtcp_sock, tp,
&rtcp_cb, &tp->rtcp_key);
status = pj_ioqueue_register_sock2(pool, ioqueue, tp->rtcp_sock, grp_lock,
tp, &rtcp_cb, &tp->rtcp_key);
if (status != PJ_SUCCESS)
goto on_error;
@ -436,12 +448,13 @@ static pj_status_t transport_destroy(pjmedia_transport *tp)
/* Must not close while application is using this */
//PJ_ASSERT_RETURN(!udp->attached, PJ_EINVALIDOP);
/* The following calls to pj_ioqueue_unregister() will block the execution
* if callback is still being called because allow_concurrent is false.
* So it is safe to release the pool immediately after.
*/
if (udp->rtp_key) {
/* This will block the execution if callback is still
* being called.
*/
pj_ioqueue_unregister(udp->rtp_key);
udp->rtp_key = NULL;
udp->rtp_sock = PJ_INVALID_SOCKET;
@ -459,6 +472,8 @@ static pj_status_t transport_destroy(pjmedia_transport *tp)
udp->rtcp_sock = PJ_INVALID_SOCKET;
}
pj_grp_lock_dec_ref(tp->grp_lock);
PJ_LOG(4,(udp->base.name, "UDP media transport destroyed"));
pj_pool_release(udp->pool);