Merge "bridge: Hold off more than one imparting channel at a time."

This commit is contained in:
zuul 2016-04-22 17:08:04 -05:00 committed by Gerrit Code Review
commit 1df086f821
3 changed files with 196 additions and 79 deletions

View File

@ -151,47 +151,20 @@ int bridge_channel_internal_push_full(struct ast_bridge_channel *bridge_channel,
void bridge_channel_internal_pull(struct ast_bridge_channel *bridge_channel);
/*!
* \brief Internal bridge channel wait condition and associated result.
*/
struct bridge_channel_internal_cond {
/*! Lock for the data structure */
ast_mutex_t lock;
/*! Wait condition */
ast_cond_t cond;
/*! Wait until done */
int done;
/*! The bridge channel */
struct ast_bridge_channel *bridge_channel;
};
/*!
* \internal
* \brief Wait for the expected signal.
* \since 13.5.0
* \brief Signal imparting threads to wake up.
* \since 13.9.0
*
* \param cond the wait object
* \param chan Channel imparted that we need to signal.
*
* \return Nothing
*/
void bridge_channel_internal_wait(struct bridge_channel_internal_cond *cond);
/*!
* \internal
* \brief Signal the condition wait.
* \since 13.5.0
*
* \param cond the wait object
*
* \return Nothing
*/
void bridge_channel_internal_signal(struct bridge_channel_internal_cond *cond);
void bridge_channel_impart_signal(struct ast_channel *chan);
/*!
* \internal
* \brief Join the bridge_channel to the bridge (blocking)
*
* \param bridge_channel The Channel in the bridge
* \param cond data used for signaling
*
* \note The bridge_channel->swap holds a channel reference for the swap
* channel going into the bridging system. The ref ensures that the swap
@ -206,8 +179,7 @@ void bridge_channel_internal_signal(struct bridge_channel_internal_cond *cond);
* \retval 0 bridge channel successfully joined the bridge
* \retval -1 bridge channel failed to join the bridge
*/
int bridge_channel_internal_join(struct ast_bridge_channel *bridge_channel,
struct bridge_channel_internal_cond *cond);
int bridge_channel_internal_join(struct ast_bridge_channel *bridge_channel);
/*!
* \internal

View File

@ -1481,6 +1481,150 @@ void ast_bridge_notify_masquerade(struct ast_channel *chan)
ao2_ref(bridge_channel, -1);
}
/*!
* \brief Internal bridge impart wait condition and associated conditional.
*/
struct bridge_channel_impart_cond {
AST_LIST_ENTRY(bridge_channel_impart_cond) node;
/*! Lock for the data structure */
ast_mutex_t lock;
/*! Wait condition */
ast_cond_t cond;
/*! Wait until done */
int done;
};
AST_LIST_HEAD_NOLOCK(bridge_channel_impart_ds_head, bridge_channel_impart_cond);
/*!
* \internal
* \brief Signal imparting threads to wake up.
* \since 13.9.0
*
* \param ds_head List of imparting threads to wake up.
*
* \return Nothing
*/
static void bridge_channel_impart_ds_head_signal(struct bridge_channel_impart_ds_head *ds_head)
{
if (ds_head) {
struct bridge_channel_impart_cond *cond;
while ((cond = AST_LIST_REMOVE_HEAD(ds_head, node))) {
ast_mutex_lock(&cond->lock);
cond->done = 1;
ast_cond_signal(&cond->cond);
ast_mutex_unlock(&cond->lock);
}
}
}
static void bridge_channel_impart_ds_head_dtor(void *doomed)
{
bridge_channel_impart_ds_head_signal(doomed);
ast_free(doomed);
}
/*!
* \internal
* \brief Fixup the bridge impart datastore.
* \since 13.9.0
*
* \param data Bridge impart datastore data to fixup from old_chan.
* \param old_chan The datastore is moving from this channel.
* \param new_chan The datastore is moving to this channel.
*
* \return Nothing
*/
static void bridge_channel_impart_ds_head_fixup(void *data, struct ast_channel *old_chan, struct ast_channel *new_chan)
{
/*
* Signal any waiting impart threads. The masquerade is going to kill
* old_chan and we don't need to be waiting on new_chan.
*/
bridge_channel_impart_ds_head_signal(data);
}
static const struct ast_datastore_info bridge_channel_impart_ds_info = {
.type = "bridge-impart-ds",
.destroy = bridge_channel_impart_ds_head_dtor,
.chan_fixup = bridge_channel_impart_ds_head_fixup,
};
/*!
* \internal
* \brief Add impart wait datastore conditional to channel.
* \since 13.9.0
*
* \param chan Channel to add the impart wait conditional.
* \param cond Imparting conditional to add.
*
* \retval 0 on success.
* \retval -1 on error.
*/
static int bridge_channel_impart_add(struct ast_channel *chan, struct bridge_channel_impart_cond *cond)
{
struct ast_datastore *datastore;
struct bridge_channel_impart_ds_head *ds_head;
ast_channel_lock(chan);
datastore = ast_channel_datastore_find(chan, &bridge_channel_impart_ds_info, NULL);
if (!datastore) {
datastore = ast_datastore_alloc(&bridge_channel_impart_ds_info, NULL);
if (!datastore) {
ast_channel_unlock(chan);
return -1;
}
ds_head = ast_calloc(1, sizeof(*ds_head));
if (!ds_head) {
ast_channel_unlock(chan);
ast_datastore_free(datastore);
return -1;
}
datastore->data = ds_head;
ast_channel_datastore_add(chan, datastore);
} else {
ds_head = datastore->data;
ast_assert(ds_head != NULL);
}
AST_LIST_INSERT_TAIL(ds_head, cond, node);
ast_channel_unlock(chan);
return 0;
}
void bridge_channel_impart_signal(struct ast_channel *chan)
{
struct ast_datastore *datastore;
ast_channel_lock(chan);
datastore = ast_channel_datastore_find(chan, &bridge_channel_impart_ds_info, NULL);
if (datastore) {
bridge_channel_impart_ds_head_signal(datastore->data);
}
ast_channel_unlock(chan);
}
/*!
* \internal
* \brief Block imparting channel thread until signaled.
* \since 13.9.0
*
* \param cond Imparting conditional to wait for.
*
* \return Nothing
*/
static void bridge_channel_impart_wait(struct bridge_channel_impart_cond *cond)
{
ast_mutex_lock(&cond->lock);
while (!cond->done) {
ast_cond_wait(&cond->cond, &cond->lock);
}
ast_mutex_unlock(&cond->lock);
}
/*
* XXX ASTERISK-21271 make ast_bridge_join() require features to be allocated just like ast_bridge_impart() and not expect the struct back.
*
@ -1549,7 +1693,7 @@ int ast_bridge_join(struct ast_bridge *bridge,
}
if (!res) {
res = bridge_channel_internal_join(bridge_channel, NULL);
res = bridge_channel_internal_join(bridge_channel);
}
/* Cleanup all the data in the bridge channel after it leaves the bridge. */
@ -1566,6 +1710,7 @@ int ast_bridge_join(struct ast_bridge *bridge,
join_exit:;
ast_bridge_run_after_callback(chan);
bridge_channel_impart_signal(chan);
if (!(ast_channel_softhangup_internal_flag(chan) & AST_SOFTHANGUP_ASYNCGOTO)
&& !ast_bridge_setup_after_goto(chan)) {
/* Claim the after bridge goto is an async goto destination. */
@ -1579,14 +1724,13 @@ join_exit:;
/*! \brief Thread responsible for imparted bridged channels to be departed */
static void *bridge_channel_depart_thread(void *data)
{
struct bridge_channel_internal_cond *cond = data;
struct ast_bridge_channel *bridge_channel = cond->bridge_channel;
struct ast_bridge_channel *bridge_channel = data;
if (bridge_channel->callid) {
ast_callid_threadassoc_add(bridge_channel->callid);
}
bridge_channel_internal_join(bridge_channel, cond);
bridge_channel_internal_join(bridge_channel);
/*
* cleanup
@ -1599,6 +1743,8 @@ static void *bridge_channel_depart_thread(void *data)
bridge_channel->features = NULL;
ast_bridge_discard_after_callback(bridge_channel->chan, AST_BRIDGE_AFTER_CB_REASON_DEPART);
/* If join failed there will be impart threads waiting. */
bridge_channel_impart_signal(bridge_channel->chan);
ast_bridge_discard_after_goto(bridge_channel->chan);
return NULL;
@ -1607,15 +1753,14 @@ static void *bridge_channel_depart_thread(void *data)
/*! \brief Thread responsible for independent imparted bridged channels */
static void *bridge_channel_ind_thread(void *data)
{
struct bridge_channel_internal_cond *cond = data;
struct ast_bridge_channel *bridge_channel = cond->bridge_channel;
struct ast_bridge_channel *bridge_channel = data;
struct ast_channel *chan;
if (bridge_channel->callid) {
ast_callid_threadassoc_add(bridge_channel->callid);
}
bridge_channel_internal_join(bridge_channel, cond);
bridge_channel_internal_join(bridge_channel);
chan = bridge_channel->chan;
/* cleanup */
@ -1632,15 +1777,18 @@ static void *bridge_channel_ind_thread(void *data)
ao2_ref(bridge_channel, -1);
ast_bridge_run_after_callback(chan);
/* If join failed there will be impart threads waiting. */
bridge_channel_impart_signal(chan);
ast_bridge_run_after_goto(chan);
return NULL;
}
int ast_bridge_impart(struct ast_bridge *bridge,
static int bridge_impart_internal(struct ast_bridge *bridge,
struct ast_channel *chan,
struct ast_channel *swap,
struct ast_bridge_features *features,
enum ast_bridge_impart_flags flags)
enum ast_bridge_impart_flags flags,
struct bridge_channel_impart_cond *cond)
{
int res = 0;
struct ast_bridge_channel *bridge_channel;
@ -1699,27 +1847,20 @@ int ast_bridge_impart(struct ast_bridge *bridge,
/* Actually create the thread that will handle the channel */
if (!res) {
struct bridge_channel_internal_cond cond = {
.done = 0,
.bridge_channel = bridge_channel
};
ast_mutex_init(&cond.lock);
ast_cond_init(&cond.cond, NULL);
res = bridge_channel_impart_add(chan, cond);
}
if (!res) {
if ((flags & AST_BRIDGE_IMPART_CHAN_MASK) == AST_BRIDGE_IMPART_CHAN_INDEPENDENT) {
res = ast_pthread_create_detached(&bridge_channel->thread, NULL,
bridge_channel_ind_thread, &cond);
bridge_channel_ind_thread, bridge_channel);
} else {
res = ast_pthread_create(&bridge_channel->thread, NULL,
bridge_channel_depart_thread, &cond);
bridge_channel_depart_thread, bridge_channel);
}
if (!res) {
bridge_channel_internal_wait(&cond);
bridge_channel_impart_wait(cond);
}
ast_cond_destroy(&cond.cond);
ast_mutex_destroy(&cond.lock);
}
if (res) {
@ -1740,6 +1881,32 @@ int ast_bridge_impart(struct ast_bridge *bridge,
return 0;
}
int ast_bridge_impart(struct ast_bridge *bridge,
struct ast_channel *chan,
struct ast_channel *swap,
struct ast_bridge_features *features,
enum ast_bridge_impart_flags flags)
{
struct bridge_channel_impart_cond cond = {
.done = 0,
};
int res;
ast_mutex_init(&cond.lock);
ast_cond_init(&cond.cond, NULL);
res = bridge_impart_internal(bridge, chan, swap, features, flags, &cond);
if (res) {
/* Impart failed. Signal any other waiting impart threads */
bridge_channel_impart_signal(chan);
}
ast_cond_destroy(&cond.cond);
ast_mutex_destroy(&cond.lock);
return res;
}
int ast_bridge_depart(struct ast_channel *chan)
{
struct ast_bridge_channel *bridge_channel;

View File

@ -2637,27 +2637,7 @@ static void bridge_channel_event_join_leave(struct ast_bridge_channel *bridge_ch
ao2_iterator_destroy(&iter);
}
void bridge_channel_internal_wait(struct bridge_channel_internal_cond *cond)
{
ast_mutex_lock(&cond->lock);
while (!cond->done) {
ast_cond_wait(&cond->cond, &cond->lock);
}
ast_mutex_unlock(&cond->lock);
}
void bridge_channel_internal_signal(struct bridge_channel_internal_cond *cond)
{
if (cond) {
ast_mutex_lock(&cond->lock);
cond->done = 1;
ast_cond_signal(&cond->cond);
ast_mutex_unlock(&cond->lock);
}
}
int bridge_channel_internal_join(struct ast_bridge_channel *bridge_channel,
struct bridge_channel_internal_cond *cond)
int bridge_channel_internal_join(struct ast_bridge_channel *bridge_channel)
{
int res = 0;
struct ast_bridge_features *channel_features;
@ -2687,7 +2667,6 @@ int bridge_channel_internal_join(struct ast_bridge_channel *bridge_channel,
bridge_channel->bridge->uniqueid,
bridge_channel,
ast_channel_name(bridge_channel->chan));
bridge_channel_internal_signal(cond);
return -1;
}
ast_channel_internal_bridge_set(bridge_channel->chan, bridge_channel->bridge);
@ -2722,8 +2701,6 @@ int bridge_channel_internal_join(struct ast_bridge_channel *bridge_channel,
}
bridge_reconfigured(bridge_channel->bridge, !bridge_channel->inhibit_colp);
bridge_channel_internal_signal(cond);
if (bridge_channel->state == BRIDGE_CHANNEL_STATE_WAIT) {
/*
* Indicate a source change since this channel is entering the
@ -2735,6 +2712,7 @@ int bridge_channel_internal_join(struct ast_bridge_channel *bridge_channel,
ast_indicate(bridge_channel->chan, AST_CONTROL_SRCCHANGE);
}
bridge_channel_impart_signal(bridge_channel->chan);
ast_bridge_unlock(bridge_channel->bridge);
/* Must release any swap ref after unlocking the bridge. */