asterisk/res/stasis/control.c

1087 lines
28 KiB
C
Raw Normal View History

/*
* Asterisk -- An open source telephony toolkit.
*
* Copyright (C) 2013, Digium, Inc.
*
* David M. Lee, II <dlee@digium.com>
*
* See http://www.asterisk.org for more information about
* the Asterisk project. Please do not directly contact
* any of the maintainers of this project for assistance;
* the project provides a web site, mailing lists and IRC
* channels for your use.
*
* This program is free software, distributed under the terms of
* the GNU General Public License Version 2. See the LICENSE file
* at the top of the source tree.
*/
/*! \file
*
* \brief Stasis application control support.
*
* \author David M. Lee, II <dlee@digium.com>
*/
#include "asterisk.h"
git migration: Refactor the ASTERISK_FILE_VERSION macro Git does not support the ability to replace a token with a version string during check-in. While it does have support for replacing a token on clone, this is somewhat sub-optimal: the token is replaced with the object hash, which is not particularly easy for human consumption. What's more, in practice, the source file version was often not terribly useful. Generally, when triaging bugs, the overall version of Asterisk is far more useful than an individual SVN version of a file. As a result, this patch removes Asterisk's support for showing source file versions. Specifically, it does the following: * Rename ASTERISK_FILE_VERSION macro to ASTERISK_REGISTER_FILE, and remove passing the version in with the macro. Other facilities than 'core show file version' make use of the file names, such as setting a debug level only on a specific file. As such, the act of registering source files with the Asterisk core still has use. The macro rename now reflects the new macro purpose. * main/asterisk: - Refactor the file_version structure to reflect that it no longer tracks a version field. - Remove the "core show file version" CLI command. Without the file version, it is no longer useful. - Remove the ast_file_version_find function. The file version is no longer tracked. - Rename ast_register_file_version/ast_unregister_file_version to ast_register_file/ast_unregister_file, respectively. * main/manager: Remove value from the Version key of the ModuleCheck Action. The actual key itself has not been removed, as doing so would absolutely constitute a backwards incompatible change. However, since the file version is no longer tracked, there is no need to attempt to include it in the Version key. * UPGRADE: Add notes for: - Modification to the ModuleCheck AMI Action - Removal of the "core show file version" CLI command Change-Id: I6cf0ff280e1668bf4957dc21f32a5ff43444a40e
2015-04-12 02:38:22 +00:00
ASTERISK_REGISTER_FILE()
#include "asterisk/stasis_channels.h"
#include "command.h"
#include "control.h"
#include "app.h"
#include "asterisk/dial.h"
#include "asterisk/bridge.h"
#include "asterisk/bridge_after.h"
#include "asterisk/bridge_basic.h"
#include "asterisk/frame.h"
#include "asterisk/pbx.h"
#include "asterisk/musiconhold.h"
#include "asterisk/app.h"
AST_LIST_HEAD(app_control_rules, stasis_app_control_rule);
struct stasis_app_control {
ast_cond_t wait_cond;
/*! Queue of commands to dispatch on the channel */
struct ao2_container *command_queue;
/*!
* The associated channel.
* Be very careful with the threading associated w/ manipulating
* the channel.
*/
struct ast_channel *channel;
/*!
* When a channel is in a bridge, the bridge that it is in.
*/
struct ast_bridge *bridge;
/*!
* Holding place for channel's PBX while imparted to a bridge.
*/
struct ast_pbx *pbx;
/*!
* A list of rules to check before adding a channel to a bridge.
*/
struct app_control_rules add_rules;
/*!
* A list of rules to check before removing a channel from a bridge.
*/
struct app_control_rules remove_rules;
/*!
* Silence generator, when silence is being generated.
*/
struct ast_silence_generator *silgen;
/*!
* The app for which this control was created
*/
struct stasis_app *app;
/*!
* When set, /c app_stasis should exit and continue in the dialplan.
*/
int is_done:1;
};
static void control_dtor(void *obj)
{
struct stasis_app_control *control = obj;
AST_LIST_HEAD_DESTROY(&control->add_rules);
AST_LIST_HEAD_DESTROY(&control->remove_rules);
/* We may have a lingering silence generator; free it */
ast_channel_stop_silence_generator(control->channel, control->silgen);
control->silgen = NULL;
ao2_cleanup(control->command_queue);
ast_cond_destroy(&control->wait_cond);
ao2_cleanup(control->app);
}
struct stasis_app_control *control_create(struct ast_channel *channel, struct stasis_app *app)
{
RAII_VAR(struct stasis_app_control *, control, NULL, ao2_cleanup);
int res;
control = ao2_alloc(sizeof(*control), control_dtor);
if (!control) {
return NULL;
}
control->app = ao2_bump(app);
res = ast_cond_init(&control->wait_cond, NULL);
if (res != 0) {
ast_log(LOG_ERROR, "Error initializing ast_cond_t: %s\n",
strerror(errno));
return NULL;
}
control->command_queue = ao2_container_alloc_list(
AO2_ALLOC_OPT_LOCK_MUTEX, 0, NULL, NULL);
if (!control->command_queue) {
return NULL;
}
control->channel = channel;
AST_LIST_HEAD_INIT(&control->add_rules);
AST_LIST_HEAD_INIT(&control->remove_rules);
ao2_ref(control, +1);
return control;
}
static void app_control_register_rule(
struct stasis_app_control *control,
struct app_control_rules *list, struct stasis_app_control_rule *obj)
{
SCOPED_AO2LOCK(lock, control->command_queue);
AST_LIST_INSERT_TAIL(list, obj, next);
}
static void app_control_unregister_rule(
struct stasis_app_control *control,
struct app_control_rules *list, struct stasis_app_control_rule *obj)
{
struct stasis_app_control_rule *rule;
SCOPED_AO2LOCK(lock, control->command_queue);
AST_RWLIST_TRAVERSE_SAFE_BEGIN(list, rule, next) {
if (rule == obj) {
AST_RWLIST_REMOVE_CURRENT(next);
break;
}
}
AST_RWLIST_TRAVERSE_SAFE_END;
}
/*!
* \internal
* \brief Checks to make sure each rule in the given list passes.
*
* \details Loops over a list of rules checking for rejections or failures.
* If one rule fails its resulting error code is returned.
*
* \note Command queue should be locked before calling this function.
*
* \param control The stasis application control
* \param list The list of rules to check
*
* \retval 0 if all rules pass
* \retval non-zero error code if a rule fails
*/
static enum stasis_app_control_channel_result app_control_check_rules(
const struct stasis_app_control *control,
struct app_control_rules *list)
{
int res = 0;
struct stasis_app_control_rule *rule;
AST_LIST_TRAVERSE(list, rule, next) {
if ((res = rule->check_rule(control))) {
return res;
}
}
return res;
}
void stasis_app_control_register_add_rule(
struct stasis_app_control *control,
struct stasis_app_control_rule *rule)
{
return app_control_register_rule(control, &control->add_rules, rule);
}
void stasis_app_control_unregister_add_rule(
struct stasis_app_control *control,
struct stasis_app_control_rule *rule)
{
app_control_unregister_rule(control, &control->add_rules, rule);
}
void stasis_app_control_register_remove_rule(
struct stasis_app_control *control,
struct stasis_app_control_rule *rule)
{
return app_control_register_rule(control, &control->remove_rules, rule);
}
void stasis_app_control_unregister_remove_rule(
struct stasis_app_control *control,
struct stasis_app_control_rule *rule)
{
app_control_unregister_rule(control, &control->remove_rules, rule);
}
static int app_control_can_add_channel_to_bridge(
struct stasis_app_control *control)
{
return app_control_check_rules(control, &control->add_rules);
}
static int app_control_can_remove_channel_from_bridge(
struct stasis_app_control *control)
{
return app_control_check_rules(control, &control->remove_rules);
}
static int noop_cb(struct stasis_app_control *control,
struct ast_channel *chan, void *data)
{
return 0;
}
/*! Callback type to see if the command can execute
note: command_queue is locked during callback */
typedef int (*app_command_can_exec_cb)(struct stasis_app_control *control);
static struct stasis_app_command *exec_command_on_condition(
struct stasis_app_control *control, stasis_app_command_cb command_fn,
void *data, command_data_destructor_fn data_destructor,
app_command_can_exec_cb can_exec_fn)
{
int retval;
struct stasis_app_command *command;
command_fn = command_fn ? : noop_cb;
command = command_create(command_fn, data, data_destructor);
if (!command) {
return NULL;
}
ao2_lock(control->command_queue);
if (can_exec_fn && (retval = can_exec_fn(control))) {
ao2_unlock(control->command_queue);
command_complete(command, retval);
return command;
}
ao2_link_flags(control->command_queue, command, OBJ_NOLOCK);
ast_cond_signal(&control->wait_cond);
ao2_unlock(control->command_queue);
return command;
}
static struct stasis_app_command *exec_command(
struct stasis_app_control *control, stasis_app_command_cb command_fn,
void *data, command_data_destructor_fn data_destructor)
{
return exec_command_on_condition(control, command_fn, data, data_destructor, NULL);
}
struct stasis_app_control_dial_data {
char endpoint[AST_CHANNEL_NAME];
int timeout;
};
static int app_control_dial(struct stasis_app_control *control,
struct ast_channel *chan, void *data)
{
RAII_VAR(struct ast_dial *, dial, ast_dial_create(), ast_dial_destroy);
struct stasis_app_control_dial_data *dial_data = data;
enum ast_dial_result res;
char *tech, *resource;
struct ast_channel *new_chan;
RAII_VAR(struct ast_bridge *, bridge, NULL, ao2_cleanup);
tech = dial_data->endpoint;
if (!(resource = strchr(tech, '/'))) {
return -1;
}
*resource++ = '\0';
if (!dial) {
ast_log(LOG_ERROR, "Failed to create dialing structure.\n");
return -1;
}
if (ast_dial_append(dial, tech, resource, NULL) < 0) {
ast_log(LOG_ERROR, "Failed to add %s/%s to dialing structure.\n", tech, resource);
return -1;
}
ast_dial_set_global_timeout(dial, dial_data->timeout);
res = ast_dial_run(dial, NULL, 0);
if (res != AST_DIAL_RESULT_ANSWERED || !(new_chan = ast_dial_answered_steal(dial))) {
return -1;
}
if (!(bridge = ast_bridge_basic_new())) {
ast_log(LOG_ERROR, "Failed to create basic bridge.\n");
return -1;
}
if (ast_bridge_impart(bridge, new_chan, NULL, NULL,
AST_BRIDGE_IMPART_CHAN_INDEPENDENT)) {
ast_hangup(new_chan);
} else {
control_add_channel_to_bridge(control, chan, bridge);
}
return 0;
}
int stasis_app_control_dial(struct stasis_app_control *control, const char *endpoint, const char *exten, const char *context,
int timeout)
{
struct stasis_app_control_dial_data *dial_data;
if (!(dial_data = ast_calloc(1, sizeof(*dial_data)))) {
return -1;
}
if (!ast_strlen_zero(endpoint)) {
ast_copy_string(dial_data->endpoint, endpoint, sizeof(dial_data->endpoint));
} else if (!ast_strlen_zero(exten) && !ast_strlen_zero(context)) {
snprintf(dial_data->endpoint, sizeof(dial_data->endpoint), "Local/%s@%s", exten, context);
} else {
return -1;
}
if (timeout > 0) {
dial_data->timeout = timeout * 1000;
} else if (timeout == -1) {
dial_data->timeout = -1;
} else {
dial_data->timeout = 30000;
}
stasis_app_send_command_async(control, app_control_dial, dial_data, ast_free_ptr);
return 0;
}
int stasis_app_control_add_role(struct stasis_app_control *control, const char *role)
{
return ast_channel_add_bridge_role(control->channel, role);
}
void stasis_app_control_clear_roles(struct stasis_app_control *control)
{
ast_channel_clear_bridge_roles(control->channel);
}
int control_command_count(struct stasis_app_control *control)
{
return ao2_container_count(control->command_queue);
}
int control_is_done(struct stasis_app_control *control)
{
/* Called from stasis_app_exec thread; no lock needed */
return control->is_done;
}
void control_mark_done(struct stasis_app_control *control)
{
control->is_done = 1;
}
struct stasis_app_control_continue_data {
char context[AST_MAX_CONTEXT];
char extension[AST_MAX_EXTENSION];
int priority;
};
static int app_control_continue(struct stasis_app_control *control,
struct ast_channel *chan, void *data)
{
struct stasis_app_control_continue_data *continue_data = data;
ast_assert(control->channel != NULL);
/* If we're in a Stasis bridge, depart it before going back to the
* dialplan */
if (stasis_app_get_bridge(control)) {
ast_bridge_depart(control->channel);
}
/* Called from stasis_app_exec thread; no lock needed */
ast_explicit_goto(control->channel, continue_data->context, continue_data->extension, continue_data->priority);
control->is_done = 1;
return 0;
}
int stasis_app_control_continue(struct stasis_app_control *control, const char *context, const char *extension, int priority)
{
struct stasis_app_control_continue_data *continue_data;
if (!(continue_data = ast_calloc(1, sizeof(*continue_data)))) {
return -1;
}
ast_copy_string(continue_data->context, S_OR(context, ""), sizeof(continue_data->context));
ast_copy_string(continue_data->extension, S_OR(extension, ""), sizeof(continue_data->extension));
if (priority > 0) {
continue_data->priority = priority;
} else {
continue_data->priority = -1;
}
stasis_app_send_command_async(control, app_control_continue, continue_data, ast_free_ptr);
return 0;
}
ARI/PJSIP: Add the ability to redirect (transfer) a channel in a Stasis app This patch adds a new feature to ARI to redirect a channel to another server, and fixes a few bugs in PJSIP's handling of the Transfer dialplan application/ARI redirect capability. *New Feature* A new operation has been added to the ARI channels resource, redirect. With this, a channel in a Stasis application can be redirected to another endpoint of the same underlying channel technology. *Bug fixes* In the process of writing this new feature, two bugs were fixed in the PJSIP stack: (1) The existing .transfer channel callback had the limitation that it could only transfer channels to a SIP URI, i.e., you had to pass 'PJSIP/sip:foo@my_provider.com' to the dialplan application. While this is still supported, it is somewhat unintuitive - particularly in a world full of endpoints. As such, we now also support specifying the PJSIP endpoint to transfer to. (2) res_pjsip_multihomed was, unfortunately, trying to 'help' a 302 redirect by updating its Contact header. Alas, that resulted in the forwarding destination set by the dialplan application/ARI resource/whatever being rewritten with very incorrect information. Hence, we now don't bother updating an outgoing response if it is a 302. Since this took a looong time to find, some additional debug statements have been added to those modules that update the Contact headers. Review: https://reviewboard.asterisk.org/r/4316/ ASTERISK-24015 #close Reported by: Private Name ASTERISK-24703 #close Reported by: Matt Jordan ........ Merged revisions 431717 from http://svn.asterisk.org/svn/asterisk/branches/13 git-svn-id: https://origsvn.digium.com/svn/asterisk/trunk@431718 65c4cc65-6c06-0410-ace0-fbb531ad65f3
2015-02-12 20:34:37 +00:00
static int app_control_redirect(struct stasis_app_control *control,
struct ast_channel *chan, void *data)
{
char *endpoint = data;
int res;
ast_assert(control->channel != NULL);
ast_assert(endpoint != NULL);
res = ast_transfer(control->channel, endpoint);
if (!res) {
ast_log(LOG_NOTICE, "Unsupported transfer requested on channel '%s'\n",
ast_channel_name(control->channel));
return 0;
}
return 0;
}
int stasis_app_control_redirect(struct stasis_app_control *control, const char *endpoint)
{
char *endpoint_data = ast_strdup(endpoint);
if (!endpoint_data) {
return -1;
}
stasis_app_send_command_async(control, app_control_redirect, endpoint_data, ast_free_ptr);
return 0;
}
struct stasis_app_control_dtmf_data {
int before;
int between;
unsigned int duration;
int after;
char dtmf[];
};
static int app_control_dtmf(struct stasis_app_control *control,
struct ast_channel *chan, void *data)
{
struct stasis_app_control_dtmf_data *dtmf_data = data;
if (ast_channel_state(chan) != AST_STATE_UP) {
ast_indicate(chan, AST_CONTROL_PROGRESS);
}
if (dtmf_data->before) {
ast_safe_sleep(chan, dtmf_data->before);
}
ast_dtmf_stream(chan, NULL, dtmf_data->dtmf, dtmf_data->between, dtmf_data->duration);
if (dtmf_data->after) {
ast_safe_sleep(chan, dtmf_data->after);
}
return 0;
}
int stasis_app_control_dtmf(struct stasis_app_control *control, const char *dtmf, int before, int between, unsigned int duration, int after)
{
struct stasis_app_control_dtmf_data *dtmf_data;
if (!(dtmf_data = ast_calloc(1, sizeof(*dtmf_data) + strlen(dtmf) + 1))) {
return -1;
}
dtmf_data->before = before;
dtmf_data->between = between;
dtmf_data->duration = duration;
dtmf_data->after = after;
strcpy(dtmf_data->dtmf, dtmf);
stasis_app_send_command_async(control, app_control_dtmf, dtmf_data, ast_free_ptr);
return 0;
}
static int app_control_ring(struct stasis_app_control *control,
struct ast_channel *chan, void *data)
{
ast_indicate(control->channel, AST_CONTROL_RINGING);
return 0;
}
int stasis_app_control_ring(struct stasis_app_control *control)
{
stasis_app_send_command_async(control, app_control_ring, NULL, NULL);
return 0;
}
static int app_control_ring_stop(struct stasis_app_control *control,
struct ast_channel *chan, void *data)
{
ast_indicate(control->channel, -1);
return 0;
}
int stasis_app_control_ring_stop(struct stasis_app_control *control)
{
stasis_app_send_command_async(control, app_control_ring_stop, NULL, NULL);
return 0;
}
struct stasis_app_control_mute_data {
enum ast_frame_type frametype;
unsigned int direction;
};
static int app_control_mute(struct stasis_app_control *control,
struct ast_channel *chan, void *data)
{
struct stasis_app_control_mute_data *mute_data = data;
SCOPED_CHANNELLOCK(lockvar, chan);
ast_channel_suppress(control->channel, mute_data->direction, mute_data->frametype);
return 0;
}
int stasis_app_control_mute(struct stasis_app_control *control, unsigned int direction, enum ast_frame_type frametype)
{
struct stasis_app_control_mute_data *mute_data;
if (!(mute_data = ast_calloc(1, sizeof(*mute_data)))) {
return -1;
}
mute_data->direction = direction;
mute_data->frametype = frametype;
stasis_app_send_command_async(control, app_control_mute, mute_data, ast_free_ptr);
return 0;
}
static int app_control_unmute(struct stasis_app_control *control,
struct ast_channel *chan, void *data)
{
struct stasis_app_control_mute_data *mute_data = data;
SCOPED_CHANNELLOCK(lockvar, chan);
ast_channel_unsuppress(control->channel, mute_data->direction, mute_data->frametype);
return 0;
}
int stasis_app_control_unmute(struct stasis_app_control *control, unsigned int direction, enum ast_frame_type frametype)
{
struct stasis_app_control_mute_data *mute_data;
if (!(mute_data = ast_calloc(1, sizeof(*mute_data)))) {
return -1;
}
mute_data->direction = direction;
mute_data->frametype = frametype;
stasis_app_send_command_async(control, app_control_unmute, mute_data, ast_free_ptr);
return 0;
}
int stasis_app_control_set_channel_var(struct stasis_app_control *control, const char *variable, const char *value)
{
return pbx_builtin_setvar_helper(control->channel, variable, value);
}
static int app_control_hold(struct stasis_app_control *control,
struct ast_channel *chan, void *data)
{
ast_indicate(control->channel, AST_CONTROL_HOLD);
return 0;
}
void stasis_app_control_hold(struct stasis_app_control *control)
{
stasis_app_send_command_async(control, app_control_hold, NULL, NULL);
}
static int app_control_unhold(struct stasis_app_control *control,
struct ast_channel *chan, void *data)
{
ast_indicate(control->channel, AST_CONTROL_UNHOLD);
return 0;
}
void stasis_app_control_unhold(struct stasis_app_control *control)
{
stasis_app_send_command_async(control, app_control_unhold, NULL, NULL);
}
static int app_control_moh_start(struct stasis_app_control *control,
struct ast_channel *chan, void *data)
{
char *moh_class = data;
if (ast_channel_state(chan) != AST_STATE_UP) {
ast_indicate(chan, AST_CONTROL_PROGRESS);
}
ast_moh_start(chan, moh_class, NULL);
return 0;
}
void stasis_app_control_moh_start(struct stasis_app_control *control, const char *moh_class)
{
char *data = NULL;
if (!ast_strlen_zero(moh_class)) {
data = ast_strdup(moh_class);
}
stasis_app_send_command_async(control, app_control_moh_start, data, ast_free_ptr);
}
static int app_control_moh_stop(struct stasis_app_control *control,
struct ast_channel *chan, void *data)
{
ast_moh_stop(chan);
return 0;
}
void stasis_app_control_moh_stop(struct stasis_app_control *control)
{
stasis_app_send_command_async(control, app_control_moh_stop, NULL, NULL);
}
static int app_control_silence_start(struct stasis_app_control *control,
struct ast_channel *chan, void *data)
{
if (ast_channel_state(chan) != AST_STATE_UP) {
ast_indicate(chan, AST_CONTROL_PROGRESS);
}
if (control->silgen) {
/* We have a silence generator, but it may have been implicitly
* disabled by media actions (music on hold, playing media,
* etc.) Just stop it and restart a new one.
*/
ast_channel_stop_silence_generator(
control->channel, control->silgen);
}
ast_debug(3, "%s: Starting silence generator\n",
stasis_app_control_get_channel_id(control));
control->silgen = ast_channel_start_silence_generator(control->channel);
if (!control->silgen) {
ast_log(LOG_WARNING,
"%s: Failed to start silence generator.\n",
stasis_app_control_get_channel_id(control));
}
return 0;
}
void stasis_app_control_silence_start(struct stasis_app_control *control)
{
stasis_app_send_command_async(control, app_control_silence_start, NULL, NULL);
}
static int app_control_silence_stop(struct stasis_app_control *control,
struct ast_channel *chan, void *data)
{
if (control->silgen) {
ast_debug(3, "%s: Stopping silence generator\n",
stasis_app_control_get_channel_id(control));
ast_channel_stop_silence_generator(
control->channel, control->silgen);
control->silgen = NULL;
}
return 0;
}
void stasis_app_control_silence_stop(struct stasis_app_control *control)
{
stasis_app_send_command_async(control, app_control_silence_stop, NULL, NULL);
}
struct ast_channel_snapshot *stasis_app_control_get_snapshot(
const struct stasis_app_control *control)
{
RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup);
struct ast_channel_snapshot *snapshot;
msg = stasis_cache_get(ast_channel_cache(), ast_channel_snapshot_type(),
stasis_app_control_get_channel_id(control));
if (!msg) {
return NULL;
}
snapshot = stasis_message_data(msg);
ast_assert(snapshot != NULL);
ao2_ref(snapshot, +1);
return snapshot;
}
static int app_send_command_on_condition(struct stasis_app_control *control,
stasis_app_command_cb command_fn, void *data,
command_data_destructor_fn data_destructor,
app_command_can_exec_cb can_exec_fn)
{
RAII_VAR(struct stasis_app_command *, command, NULL, ao2_cleanup);
Resolve race conditions involving Stasis bridges. This resolves two observed race conditions. First, a bit of background on what the Stasis application does: 1a Creates a stasis_app_control structure. This structure is linked into a global container and can be looked up using a channel's unique ID. 2a Puts the channel in an event loop. The event loop can exit either because the stasis_app_control structure has been marked done, or because of some other factor, such as a hangup. In the event loop, the stasis_app_control determines if any specific ARI commands need to be run on the channel and will run them from this thread. 3a Checks if the channel is bridged. If the channel is bridged, then ast_bridge_depart() is called since channels that are added to Stasis bridges are always imparted as departable. 4a Unlink the stasis_app_control from the container. When an ARI command is received by Asterisk, the following occurs 1b A thread is spawned to handle the HTTP request 2b The stasis_app_control(s) that corresponds to the channel(s) in the request is/are retrieved. If the stasis_app_control cannot be retrieved, then it is assumed that the channel in question has exited the Stasis app or perhaps was never in Stasis in the first place. 3b A command is queued onto the stasis_app_control, and the channel's event loop thread is signaled to run the command. 4b While most ARI commands do nothing further, some, such as adding or removing channels from a bridge, will block until the command they issued has been completed by the channel's event loop. The first race condition that is solved by this patch involves a crash that can occur due to faulty detection of the channel's bridged status in step 3a. What can happen is that in step 2a, the event loop may run the ast_bridge_impart() function to asynchronously place the channel into a bridge, then immediately exit the event loop because the channel has hung up. In step 3a, we would detect that the channel was not bridged and would not call ast_bridge_depart(). The reason that the channel did not appear to be bridged was that the depart_thread that is spawned by ast_bridge_impart() had not yet started. That is the thread where the channel is marked as being bridged. Since we did not call ast_bridge_depart(), the Stasis application would exit, and then the channel would be destroyed Then the depart_thread would start up and try to manipulate the destroyed channel, causing a crash. The fix for this is to switch from using ast_channel_is_bridged() to checking the NULLity of ast_channel_internal_bridge_channel() to determine if ast_bridge_depart() needs to be called. The channel's internal bridge_channel is set when ast_bridge_impart() is called and is NULLed by the call to ast_bridge_depart(). If the channel's internal bridge_channel is non-NULL, then the channel must have been imparted into the bridge and needs to be departed, even if the actual bridging operation has not yet started. By departing the channel when necessary, the thread that is running the Stasis application will block until the bridge gives the okay that the depart_thread has exited. The second race condition that is solved by this patch involves a leak of HTTP handler threads. The problem was that step 2b would successfully retrieve a stasis_app_control structure. Then step 2a would exit the channel from the event loop due to a hangup. Steps 3a and 4a would execute, and then finally steps 3b and 4b would. The problem is that at step 4b, when attempting to add a channel to a bridge, the thread would block forever since the channel would never execute the queued command since it was finished with the event loop. This meant that the HTTP handling thread would be leaked, along with any references that thread may have owned (in my case, I was seeing bridges leaked). The fix for this is to hone in better on when the channel has exited the event loop. The stasis_app_control structure has an is_done field that is now set at each point where the channel may exit the event loop. If step 2b retrieves a valid stasis_app_control structure but the control is marked as done, then the attempted operation exits immediately since there will be nothing to service the attempted command. ASTERISK-25091 #close Reported by Ilya Trikoz Change-Id: If66265b73b4c9f8f58599124d777fedc54576628
2015-06-18 18:16:29 +00:00
if (control == NULL || control->is_done) {
return -1;
}
command = exec_command_on_condition(
control, command_fn, data, data_destructor, can_exec_fn);
if (!command) {
return -1;
}
return command_join(command);
}
int stasis_app_send_command(struct stasis_app_control *control,
stasis_app_command_cb command_fn, void *data, command_data_destructor_fn data_destructor)
{
return app_send_command_on_condition(control, command_fn, data, data_destructor, NULL);
}
int stasis_app_send_command_async(struct stasis_app_control *control,
stasis_app_command_cb command_fn, void *data,
command_data_destructor_fn data_destructor)
{
RAII_VAR(struct stasis_app_command *, command, NULL, ao2_cleanup);
Resolve race conditions involving Stasis bridges. This resolves two observed race conditions. First, a bit of background on what the Stasis application does: 1a Creates a stasis_app_control structure. This structure is linked into a global container and can be looked up using a channel's unique ID. 2a Puts the channel in an event loop. The event loop can exit either because the stasis_app_control structure has been marked done, or because of some other factor, such as a hangup. In the event loop, the stasis_app_control determines if any specific ARI commands need to be run on the channel and will run them from this thread. 3a Checks if the channel is bridged. If the channel is bridged, then ast_bridge_depart() is called since channels that are added to Stasis bridges are always imparted as departable. 4a Unlink the stasis_app_control from the container. When an ARI command is received by Asterisk, the following occurs 1b A thread is spawned to handle the HTTP request 2b The stasis_app_control(s) that corresponds to the channel(s) in the request is/are retrieved. If the stasis_app_control cannot be retrieved, then it is assumed that the channel in question has exited the Stasis app or perhaps was never in Stasis in the first place. 3b A command is queued onto the stasis_app_control, and the channel's event loop thread is signaled to run the command. 4b While most ARI commands do nothing further, some, such as adding or removing channels from a bridge, will block until the command they issued has been completed by the channel's event loop. The first race condition that is solved by this patch involves a crash that can occur due to faulty detection of the channel's bridged status in step 3a. What can happen is that in step 2a, the event loop may run the ast_bridge_impart() function to asynchronously place the channel into a bridge, then immediately exit the event loop because the channel has hung up. In step 3a, we would detect that the channel was not bridged and would not call ast_bridge_depart(). The reason that the channel did not appear to be bridged was that the depart_thread that is spawned by ast_bridge_impart() had not yet started. That is the thread where the channel is marked as being bridged. Since we did not call ast_bridge_depart(), the Stasis application would exit, and then the channel would be destroyed Then the depart_thread would start up and try to manipulate the destroyed channel, causing a crash. The fix for this is to switch from using ast_channel_is_bridged() to checking the NULLity of ast_channel_internal_bridge_channel() to determine if ast_bridge_depart() needs to be called. The channel's internal bridge_channel is set when ast_bridge_impart() is called and is NULLed by the call to ast_bridge_depart(). If the channel's internal bridge_channel is non-NULL, then the channel must have been imparted into the bridge and needs to be departed, even if the actual bridging operation has not yet started. By departing the channel when necessary, the thread that is running the Stasis application will block until the bridge gives the okay that the depart_thread has exited. The second race condition that is solved by this patch involves a leak of HTTP handler threads. The problem was that step 2b would successfully retrieve a stasis_app_control structure. Then step 2a would exit the channel from the event loop due to a hangup. Steps 3a and 4a would execute, and then finally steps 3b and 4b would. The problem is that at step 4b, when attempting to add a channel to a bridge, the thread would block forever since the channel would never execute the queued command since it was finished with the event loop. This meant that the HTTP handling thread would be leaked, along with any references that thread may have owned (in my case, I was seeing bridges leaked). The fix for this is to hone in better on when the channel has exited the event loop. The stasis_app_control structure has an is_done field that is now set at each point where the channel may exit the event loop. If step 2b retrieves a valid stasis_app_control structure but the control is marked as done, then the attempted operation exits immediately since there will be nothing to service the attempted command. ASTERISK-25091 #close Reported by Ilya Trikoz Change-Id: If66265b73b4c9f8f58599124d777fedc54576628
2015-06-18 18:16:29 +00:00
if (control == NULL || control->is_done) {
return -1;
}
command = exec_command(control, command_fn, data, data_destructor);
if (!command) {
return -1;
}
return 0;
}
struct ast_bridge *stasis_app_get_bridge(struct stasis_app_control *control)
{
if (!control) {
return NULL;
} else {
SCOPED_AO2LOCK(lock, control);
return control->bridge;
}
}
static int bridge_channel_depart(struct stasis_app_control *control,
struct ast_channel *chan, void *data)
{
struct ast_bridge_channel *bridge_channel = data;
{
SCOPED_CHANNELLOCK(lock, chan);
if (bridge_channel != ast_channel_internal_bridge_channel(chan)) {
ast_debug(3, "%s: Channel is no longer in departable state\n",
ast_channel_uniqueid(chan));
return -1;
}
}
ast_debug(3, "%s: Channel departing bridge\n",
ast_channel_uniqueid(chan));
ast_bridge_depart(chan);
return 0;
}
static void bridge_after_cb(struct ast_channel *chan, void *data)
{
struct stasis_app_control *control = data;
SCOPED_AO2LOCK(lock, control);
struct ast_bridge_channel *bridge_channel;
ast_debug(3, "%s, %s: Channel leaving bridge\n",
ast_channel_uniqueid(chan), control->bridge->uniqueid);
ast_assert(chan == control->channel);
/* Restore the channel's PBX */
ast_channel_pbx_set(control->channel, control->pbx);
control->pbx = NULL;
app_unsubscribe_bridge(control->app, control->bridge);
/* No longer in the bridge */
control->bridge = NULL;
/* Get the bridge channel so we don't depart from the wrong bridge */
ast_channel_lock(chan);
bridge_channel = ast_channel_get_bridge_channel(chan);
ast_channel_unlock(chan);
/* Depart this channel from the bridge using the command queue if possible */
stasis_app_send_command_async(control, bridge_channel_depart, bridge_channel, __ao2_cleanup);
if (stasis_app_channel_is_stasis_end_published(chan)) {
/* The channel has had a StasisEnd published on it, but until now had remained in
* the bridging system. This means that the channel moved from a Stasis bridge to a
* non-Stasis bridge and is now exiting the bridging system. Because of this, the
* channel needs to exit the Stasis application and go to wherever the non-Stasis
* bridge has directed it to go. If the non-Stasis bridge has not set up an after
* bridge destination, then the channel should be hung up.
*/
int hangup_flag;
hangup_flag = ast_bridge_setup_after_goto(chan) ? AST_SOFTHANGUP_DEV : AST_SOFTHANGUP_ASYNCGOTO;
ast_channel_lock(chan);
ast_softhangup_nolock(chan, hangup_flag);
ast_channel_unlock(chan);
}
}
static void bridge_after_cb_failed(enum ast_bridge_after_cb_reason reason,
void *data)
{
struct stasis_app_control *control = data;
bridge_after_cb(control->channel, data);
ast_debug(3, " reason: %s\n",
ast_bridge_after_cb_reason_string(reason));
}
int control_add_channel_to_bridge(
struct stasis_app_control *control,
struct ast_channel *chan, void *data)
{
struct ast_bridge *bridge = data;
int res;
if (!control || !bridge) {
return -1;
}
ast_debug(3, "%s: Adding to bridge %s\n",
stasis_app_control_get_channel_id(control),
bridge->uniqueid);
ast_assert(chan != NULL);
/* Depart whatever Stasis bridge we're currently in. */
if (stasis_app_get_bridge(control)) {
/* Note that it looks like there's a race condition here, since
* we don't have control locked. But this happens from the
* control callback thread, so there won't be any other
* concurrent attempts to bridge.
*/
ast_bridge_depart(chan);
}
res = ast_bridge_set_after_callback(chan, bridge_after_cb,
bridge_after_cb_failed, control);
if (res != 0) {
ast_log(LOG_ERROR, "Error setting after-bridge callback\n");
return -1;
}
{
/* pbx and bridge are modified by the bridging impart thread.
* It shouldn't happen concurrently, but we still need to lock
* for the memory fence.
*/
SCOPED_AO2LOCK(lock, control);
/* Ensure the controlling application is subscribed early enough
* to receive the ChannelEnteredBridge message. This works in concert
* with the subscription handled in the Stasis application execution
* loop */
app_subscribe_bridge(control->app, bridge);
/* Save off the channel's PBX */
ast_assert(control->pbx == NULL);
if (!control->pbx) {
control->pbx = ast_channel_pbx(chan);
ast_channel_pbx_set(chan, NULL);
}
res = ast_bridge_impart(bridge,
chan,
NULL, /* swap channel */
NULL, /* features */
AST_BRIDGE_IMPART_CHAN_DEPARTABLE);
if (res != 0) {
ast_log(LOG_ERROR, "Error adding channel to bridge\n");
ast_channel_pbx_set(chan, control->pbx);
control->pbx = NULL;
return -1;
}
ast_assert(stasis_app_get_bridge(control) == NULL);
control->bridge = bridge;
}
return 0;
}
int stasis_app_control_add_channel_to_bridge(
struct stasis_app_control *control, struct ast_bridge *bridge)
{
ast_debug(3, "%s: Sending channel add_to_bridge command\n",
stasis_app_control_get_channel_id(control));
return app_send_command_on_condition(
control, control_add_channel_to_bridge, bridge, NULL,
app_control_can_add_channel_to_bridge);
}
static int app_control_remove_channel_from_bridge(
struct stasis_app_control *control,
struct ast_channel *chan, void *data)
{
struct ast_bridge *bridge = data;
if (!control) {
return -1;
}
/* We should only depart from our own bridge */
ast_debug(3, "%s: Departing bridge %s\n",
stasis_app_control_get_channel_id(control),
bridge->uniqueid);
if (bridge != stasis_app_get_bridge(control)) {
ast_log(LOG_WARNING, "%s: Not in bridge %s; not removing\n",
stasis_app_control_get_channel_id(control),
bridge->uniqueid);
return -1;
}
ast_bridge_depart(chan);
return 0;
}
int stasis_app_control_remove_channel_from_bridge(
struct stasis_app_control *control, struct ast_bridge *bridge)
{
ast_debug(3, "%s: Sending channel remove_from_bridge command\n",
stasis_app_control_get_channel_id(control));
return app_send_command_on_condition(
control, app_control_remove_channel_from_bridge, bridge, NULL,
app_control_can_remove_channel_from_bridge);
}
const char *stasis_app_control_get_channel_id(
const struct stasis_app_control *control)
{
return ast_channel_uniqueid(control->channel);
}
void stasis_app_control_publish(
struct stasis_app_control *control, struct stasis_message *message)
{
if (!control || !control->channel || !message) {
return;
}
stasis_publish(ast_channel_topic(control->channel), message);
}
int stasis_app_control_queue_control(struct stasis_app_control *control,
enum ast_control_frame_type frame_type)
{
return ast_queue_control(control->channel, frame_type);
}
int control_dispatch_all(struct stasis_app_control *control,
struct ast_channel *chan)
{
int count = 0;
struct ao2_iterator i;
void *obj;
ast_assert(control->channel == chan);
i = ao2_iterator_init(control->command_queue, AO2_ITERATOR_UNLINK);
while ((obj = ao2_iterator_next(&i))) {
RAII_VAR(struct stasis_app_command *, command, obj, ao2_cleanup);
command_invoke(command, control, chan);
++count;
}
ao2_iterator_destroy(&i);
return count;
}
void control_wait(struct stasis_app_control *control)
{
if (!control) {
return;
}
ast_assert(control->command_queue != NULL);
ao2_lock(control->command_queue);
while (ao2_container_count(control->command_queue) == 0) {
int res = ast_cond_wait(&control->wait_cond,
ao2_object_get_lockaddr(control->command_queue));
if (res < 0) {
ast_log(LOG_ERROR, "Error waiting on command queue\n");
break;
}
}
ao2_unlock(control->command_queue);
}
int control_prestart_dispatch_all(struct stasis_app_control *control,
struct ast_channel *chan)
{
struct ao2_container *command_queue;
int count = 0;
struct ao2_iterator iter;
struct stasis_app_command *command;
ast_channel_lock(chan);
command_queue = command_prestart_get_container(chan);
ast_channel_unlock(chan);
if (!command_queue) {
return 0;
}
iter = ao2_iterator_init(command_queue, AO2_ITERATOR_UNLINK);
while ((command = ao2_iterator_next(&iter))) {
command_invoke(command, control, chan);
ao2_cleanup(command);
++count;
}
ao2_iterator_destroy(&iter);
ao2_cleanup(command_queue);
return count;
}
struct stasis_app *control_app(struct stasis_app_control *control)
{
return control->app;
}