ARI: Add the ability to subscribe to all events

This patch adds the ability to subscribe to all events. There are two possible
ways to accomplish this:
(1) On initial WebSocket connection. This patch adds a new query parameter,
    'subscribeAll'. If present and True, Asterisk will subscribe the
    applications to all ARI events.
(2) Via the applications resource. When subscribing in this manner, an ARI
    client should merely specify a blank resource name, i.e., 'channels:'
    instead of 'channels:12354'. This will subscribe the application to all
    resources of the 'channels' type.

ASTERISK-24870 #close

Change-Id: I4a943b4db24442cf28bc64b24bfd541249790ad6
This commit is contained in:
Matt Jordan 2015-09-04 12:25:07 -05:00
parent 47813cc51c
commit b99a705262
9 changed files with 361 additions and 151 deletions

View File

@ -91,6 +91,21 @@ struct ao2_container *stasis_app_get_all(void);
*/
int stasis_app_register(const char *app_name, stasis_app_cb handler, void *data);
/*!
* \brief Register a new Stasis application that receives all Asterisk events.
*
* If an application is already registered with the given name, the old
* application is sent a 'replaced' message and unregistered.
*
* \param app_name Name of this application.
* \param handler Callback for application messages.
* \param data Data blob to pass to the callback. Must be AO2 managed.
*
* \return 0 for success
* \return -1 for error.
*/
int stasis_app_register_all(const char *app_name, stasis_app_cb handler, void *data);
/*!
* \brief Unregister a Stasis application.
* \param app_name Name of the application to unregister.

View File

@ -280,7 +280,9 @@ static void event_session_cleanup(struct event_session *session)
}
event_session_shutdown(session);
ao2_unlink(event_session_registry, session);
if (event_session_registry) {
ao2_unlink(event_session_registry, session);
}
}
/*!
@ -367,6 +369,7 @@ static int event_session_alloc(struct ast_tcptls_session_instance *ser,
struct ast_ari_events_event_websocket_args *args, const char *session_id)
{
RAII_VAR(struct event_session *, session, NULL, ao2_cleanup);
int (* register_handler)(const char *, stasis_app_cb handler, void *data);
size_t size, i;
/* The request must have at least one [app] parameter */
@ -399,6 +402,12 @@ static int event_session_alloc(struct ast_tcptls_session_instance *ser,
}
/* Register the apps with Stasis */
if (args->subscribe_all) {
register_handler = &stasis_app_register_all;
} else {
register_handler = &stasis_app_register;
}
for (i = 0; i < args->app_count; ++i) {
const char *app = args->app[i];
@ -411,10 +420,10 @@ static int event_session_alloc(struct ast_tcptls_session_instance *ser,
return event_session_allocation_error_handler(session, ERROR_TYPE_OOM, ser);
}
if (stasis_app_register(app, stasis_app_message_handler, session)) {
if (register_handler(app, stasis_app_message_handler, session)) {
ast_log(LOG_WARNING, "Stasis registration failed for application: '%s'\n", app);
return event_session_allocation_error_handler(
session, ERROR_TYPE_STASIS_REGISTRATION, ser);
session, ERROR_TYPE_STASIS_REGISTRATION, ser);
}
}
@ -426,8 +435,17 @@ static int event_session_alloc(struct ast_tcptls_session_instance *ser,
return 0;
}
static int event_session_shutdown_cb(void *session, void *arg, int flags)
{
event_session_cleanup(session);
return 0;
}
void ast_ari_websocket_events_event_websocket_dtor(void)
{
ao2_callback(event_session_registry, OBJ_MULTIPLE | OBJ_NODATA, event_session_shutdown_cb, NULL);
ao2_cleanup(event_session_registry);
event_session_registry = NULL;
}
@ -462,7 +480,8 @@ void ast_ari_websocket_events_event_websocket_established(
struct ast_ari_websocket_session *ws_session, struct ast_variable *headers,
struct ast_ari_events_event_websocket_args *args)
{
RAII_VAR(struct event_session *, session, NULL, event_session_cleanup);
struct event_session *session;
struct ast_json *msg;
const char *session_id;
@ -474,7 +493,6 @@ void ast_ari_websocket_events_event_websocket_established(
/* Find the event_session and update its websocket */
session = ao2_find(event_session_registry, session_id, OBJ_SEARCH_KEY);
if (session) {
ao2_unlink(event_session_registry, session);
event_session_update_websocket(session, ws_session);
@ -487,6 +505,9 @@ void ast_ari_websocket_events_event_websocket_established(
while ((msg = ast_ari_websocket_session_read(ws_session))) {
ast_json_unref(msg);
}
event_session_cleanup(session);
ao2_ref(session, -1);
}
void ast_ari_events_user_event(struct ast_variable *headers,

View File

@ -47,6 +47,8 @@ struct ast_ari_events_event_websocket_args {
size_t app_count;
/*! Parsing context for app. */
char *app_parse;
/*! Subscribe to all Asterisk events. If provided, the applications listed will be subscribed to all events, effectively disabling the application specific subscriptions. Default is 'false'. */
int subscribe_all;
};
/*!

View File

@ -111,6 +111,9 @@ static int ast_ari_events_event_websocket_ws_attempted_cb(struct ast_tcptls_sess
args.app[j] = (vals[j]);
}
} else
if (strcmp(i->name, "subscribeAll") == 0) {
args.subscribe_all = ast_true(i->value);
} else
{}
}
@ -209,6 +212,9 @@ static void ast_ari_events_event_websocket_ws_established_cb(struct ast_websocke
args.app[j] = (vals[j]);
}
} else
if (strcmp(i->name, "subscribeAll") == 0) {
args.subscribe_all = ast_true(i->value);
} else
{}
}

View File

@ -109,6 +109,11 @@ struct ao2_container *app_bridges_moh;
struct ao2_container *app_bridges_playback;
/*!
* \internal \brief List of registered event sources.
*/
AST_RWLIST_HEAD_STATIC(event_sources, stasis_app_event_source);
static struct ast_json *stasis_end_to_json(struct stasis_message *message,
const struct stasis_message_sanitizer *sanitize)
{
@ -1469,7 +1474,7 @@ struct ao2_container *stasis_app_get_all(void)
return ao2_bump(apps);
}
int stasis_app_register(const char *app_name, stasis_app_cb handler, void *data)
static int __stasis_app_register(const char *app_name, stasis_app_cb handler, void *data, int all_events)
{
RAII_VAR(struct stasis_app *, app, NULL, ao2_cleanup);
@ -1482,8 +1487,20 @@ int stasis_app_register(const char *app_name, stasis_app_cb handler, void *data)
if (app) {
app_update(app, handler, data);
} else {
app = app_create(app_name, handler, data);
app = app_create(app_name, handler, data, all_events ? STASIS_APP_SUBSCRIBE_ALL : STASIS_APP_SUBSCRIBE_MANUAL);
if (app) {
if (all_events) {
struct stasis_app_event_source *source;
SCOPED_LOCK(lock, &event_sources, AST_RWLIST_RDLOCK, AST_RWLIST_UNLOCK);
AST_LIST_TRAVERSE(&event_sources, source, next) {
if (!source->subscribe) {
continue;
}
source->subscribe(app, NULL);
}
}
ao2_link_flags(apps_registry, app, OBJ_NOLOCK);
} else {
ao2_unlock(apps_registry);
@ -1499,6 +1516,16 @@ int stasis_app_register(const char *app_name, stasis_app_cb handler, void *data)
return 0;
}
int stasis_app_register(const char *app_name, stasis_app_cb handler, void *data)
{
return __stasis_app_register(app_name, handler, data, 0);
}
int stasis_app_register_all(const char *app_name, stasis_app_cb handler, void *data)
{
return __stasis_app_register(app_name, handler, data, 1);
}
void stasis_app_unregister(const char *app_name)
{
RAII_VAR(struct stasis_app *, app, NULL, ao2_cleanup);
@ -1526,11 +1553,6 @@ void stasis_app_unregister(const char *app_name)
cleanup();
}
/*!
* \internal \brief List of registered event sources.
*/
AST_RWLIST_HEAD_STATIC(event_sources, stasis_app_event_source);
void stasis_app_register_event_source(struct stasis_app_event_source *obj)
{
SCOPED_LOCK(lock, &event_sources, AST_RWLIST_WRLOCK, AST_RWLIST_UNLOCK);
@ -1727,8 +1749,8 @@ static enum stasis_app_subscribe_res app_subscribe(
ast_debug(3, "%s: Checking %s\n", app_name, uri);
if (!event_source->find ||
(!(obj = event_source->find(app, uri + strlen(event_source->scheme))))) {
if (!ast_strlen_zero(uri + strlen(event_source->scheme)) &&
(!event_source->find || (!(obj = event_source->find(app, uri + strlen(event_source->scheme)))))) {
ast_log(LOG_WARNING, "Event source not found: %s\n", uri);
return STASIS_ASR_EVENT_SOURCE_NOT_FOUND;
}
@ -2062,6 +2084,7 @@ static int load_module(void)
}
AST_MODULE_INFO(ASTERISK_GPL_KEY, AST_MODFLAG_GLOBAL_SYMBOLS, "Stasis application support",
.load_pri = AST_MODPRI_APP_DEPEND,
.support_level = AST_MODULE_SUPPORT_CORE,
.load = load_module,
.unload = unload_module,

View File

@ -38,6 +38,10 @@ ASTERISK_REGISTER_FILE()
#include "asterisk/stasis_endpoints.h"
#include "asterisk/stasis_message_router.h"
#define BRIDGE_ALL "__AST_BRIDGE_ALL_TOPIC"
#define CHANNEL_ALL "__AST_CHANNEL_ALL_TOPIC"
#define ENDPOINT_ALL "__AST_ENDPOINT_ALL_TOPIC"
static int unsubscribe(struct stasis_app *app, const char *kind, const char *id, int terminate);
struct stasis_app {
@ -47,12 +51,16 @@ struct stasis_app {
struct stasis_message_router *router;
/*! Router for handling messages to the bridge all \a topic. */
struct stasis_message_router *bridge_router;
/*! Optional router for handling endpoint messages in 'all' subscriptions */
struct stasis_message_router *endpoint_router;
/*! Container of the channel forwards to this app's topic. */
struct ao2_container *forwards;
/*! Callback function for this application. */
stasis_app_cb handler;
/*! Opaque data to hand to callback function. */
void *data;
/*! Subscription model for the application */
enum stasis_app_subscription_model subscription_model;
/*! Name of the Stasis application */
char name[];
};
@ -121,34 +129,33 @@ static struct app_forwards *forwards_create(struct stasis_app *app,
static struct app_forwards *forwards_create_channel(struct stasis_app *app,
struct ast_channel *chan)
{
RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
struct app_forwards *forwards;
if (!app || !chan) {
if (!app) {
return NULL;
}
forwards = forwards_create(app, ast_channel_uniqueid(chan));
forwards = forwards_create(app, chan ? ast_channel_uniqueid(chan) : CHANNEL_ALL);
if (!forwards) {
return NULL;
}
forwards->forward_type = FORWARD_CHANNEL;
forwards->topic_forward = stasis_forward_all(ast_channel_topic(chan),
app->topic);
if (!forwards->topic_forward) {
return NULL;
if (chan) {
forwards->topic_forward = stasis_forward_all(ast_channel_topic(chan),
app->topic);
}
forwards->topic_cached_forward = stasis_forward_all(
ast_channel_topic_cached(chan), app->topic);
if (!forwards->topic_cached_forward) {
chan ? ast_channel_topic_cached(chan) : ast_channel_topic_all_cached(),
app->topic);
if ((!forwards->topic_forward && chan) || !forwards->topic_cached_forward) {
/* Half-subscribed is a bad thing */
stasis_forward_cancel(forwards->topic_forward);
forwards->topic_forward = NULL;
forwards_unsubscribe(forwards);
ao2_ref(forwards, -1);
return NULL;
}
ao2_ref(forwards, +1);
return forwards;
}
@ -156,69 +163,100 @@ static struct app_forwards *forwards_create_channel(struct stasis_app *app,
static struct app_forwards *forwards_create_bridge(struct stasis_app *app,
struct ast_bridge *bridge)
{
RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
struct app_forwards *forwards;
if (!app || !bridge) {
if (!app) {
return NULL;
}
forwards = forwards_create(app, bridge->uniqueid);
forwards = forwards_create(app, bridge ? bridge->uniqueid : BRIDGE_ALL);
if (!forwards) {
return NULL;
}
forwards->forward_type = FORWARD_BRIDGE;
forwards->topic_forward = stasis_forward_all(ast_bridge_topic(bridge),
app->topic);
if (!forwards->topic_forward) {
return NULL;
if (bridge) {
forwards->topic_forward = stasis_forward_all(ast_bridge_topic(bridge),
app->topic);
}
forwards->topic_cached_forward = stasis_forward_all(
ast_bridge_topic_cached(bridge), app->topic);
if (!forwards->topic_cached_forward) {
bridge ? ast_bridge_topic_cached(bridge) : ast_bridge_topic_all_cached(),
app->topic);
if ((!forwards->topic_forward && bridge) || !forwards->topic_cached_forward) {
/* Half-subscribed is a bad thing */
stasis_forward_cancel(forwards->topic_forward);
forwards->topic_forward = NULL;
forwards_unsubscribe(forwards);
ao2_ref(forwards, -1);
return NULL;
}
ao2_ref(forwards, +1);
return forwards;
}
static void endpoint_state_cb(void *data, struct stasis_subscription *sub,
struct stasis_message *message)
{
struct stasis_app *app = data;
stasis_publish(app->topic, message);
}
/*! Forward a endpoint's topics to an app */
static struct app_forwards *forwards_create_endpoint(struct stasis_app *app,
struct ast_endpoint *endpoint)
{
RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
struct app_forwards *forwards;
int ret = 0;
if (!app || !endpoint) {
if (!app) {
return NULL;
}
forwards = forwards_create(app, ast_endpoint_get_id(endpoint));
forwards = forwards_create(app, endpoint ? ast_endpoint_get_id(endpoint) : ENDPOINT_ALL);
if (!forwards) {
return NULL;
}
forwards->forward_type = FORWARD_ENDPOINT;
forwards->topic_forward = stasis_forward_all(ast_endpoint_topic(endpoint),
app->topic);
if (!forwards->topic_forward) {
return NULL;
if (endpoint) {
forwards->topic_forward = stasis_forward_all(ast_endpoint_topic(endpoint),
app->topic);
forwards->topic_cached_forward = stasis_forward_all(
ast_endpoint_topic_cached(endpoint), app->topic);
if (!forwards->topic_forward || !forwards->topic_cached_forward) {
/* Half-subscribed is a bad thing */
forwards_unsubscribe(forwards);
ao2_ref(forwards, -1);
return NULL;
}
} else {
/* Since endpoint subscriptions also subscribe to channels, in the case
* of all endpoint subscriptions, we only want messages for the endpoints.
* As such, we route those particular messages and then re-publish them
* on the app's topic.
*/
ast_assert(app->endpoint_router == NULL);
app->endpoint_router = stasis_message_router_create(ast_endpoint_topic_all_cached());
if (!app->endpoint_router) {
forwards_unsubscribe(forwards);
ao2_ref(forwards, -1);
return NULL;
}
ret |= stasis_message_router_add(app->endpoint_router,
ast_endpoint_state_type(), endpoint_state_cb, app);
ret |= stasis_message_router_add(app->endpoint_router,
ast_endpoint_contact_state_type(), endpoint_state_cb, app);
if (ret) {
ao2_ref(app->endpoint_router, -1);
app->endpoint_router = NULL;
ao2_ref(forwards, -1);
return NULL;
}
}
forwards->topic_cached_forward = stasis_forward_all(
ast_endpoint_topic_cached(endpoint), app->topic);
if (!forwards->topic_cached_forward) {
/* Half-subscribed is a bad thing */
stasis_forward_cancel(forwards->topic_forward);
forwards->topic_forward = NULL;
return NULL;
}
ao2_ref(forwards, +1);
return forwards;
}
@ -260,6 +298,7 @@ static void app_dtor(void *obj)
ast_assert(app->router == NULL);
ast_assert(app->bridge_router == NULL);
ast_assert(app->endpoint_router == NULL);
ao2_cleanup(app->topic);
app->topic = NULL;
@ -793,7 +832,7 @@ static void bridge_default_handler(void *data, struct stasis_subscription *sub,
}
}
struct stasis_app *app_create(const char *name, stasis_app_cb handler, void *data)
struct stasis_app *app_create(const char *name, stasis_app_cb handler, void *data, enum stasis_app_subscription_model subscription_model)
{
RAII_VAR(struct stasis_app *, app, NULL, ao2_cleanup);
size_t size;
@ -806,10 +845,10 @@ struct stasis_app *app_create(const char *name, stasis_app_cb handler, void *dat
size = sizeof(*app) + strlen(name) + 1;
app = ao2_alloc_options(size, app_dtor, AO2_ALLOC_OPT_LOCK_MUTEX);
if (!app) {
return NULL;
}
app->subscription_model = subscription_model;
app->forwards = ao2_container_alloc_rbtree(AO2_ALLOC_OPT_LOCK_MUTEX,
AO2_CONTAINER_ALLOC_OPT_DUPS_OBJ_REJECT,
@ -877,7 +916,8 @@ struct stasis_app *app_create(const char *name, stasis_app_cb handler, void *dat
return app;
}
struct stasis_topic *ast_app_get_topic(struct stasis_app *app) {
struct stasis_topic *ast_app_get_topic(struct stasis_app *app)
{
return app->topic;
}
@ -930,6 +970,8 @@ void app_shutdown(struct stasis_app *app)
app->router = NULL;
stasis_message_router_unsubscribe(app->bridge_router);
app->bridge_router = NULL;
stasis_message_router_unsubscribe(app->endpoint_router);
app->endpoint_router = NULL;
}
int app_is_active(struct stasis_app *app)
@ -1029,34 +1071,47 @@ struct ast_json *app_to_json(const struct stasis_app *app)
int app_subscribe_channel(struct stasis_app *app, struct ast_channel *chan)
{
struct app_forwards *forwards;
SCOPED_AO2LOCK(lock, app->forwards);
int res;
if (!app || !chan) {
if (!app) {
return -1;
} else {
RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
SCOPED_AO2LOCK(lock, app->forwards);
}
forwards = ao2_find(app->forwards, ast_channel_uniqueid(chan),
OBJ_SEARCH_KEY | OBJ_NOLOCK);
if (!forwards) {
/* Forwards not found, create one */
forwards = forwards_create_channel(app, chan);
if (!forwards) {
return -1;
}
res = ao2_link_flags(app->forwards, forwards,
OBJ_NOLOCK);
if (!res) {
return -1;
}
}
++forwards->interested;
ast_debug(3, "Channel '%s' is %d interested in %s\n", ast_channel_uniqueid(chan), forwards->interested, app->name);
/* If subscribed to all, don't subscribe again */
forwards = ao2_find(app->forwards, CHANNEL_ALL, OBJ_SEARCH_KEY | OBJ_NOLOCK);
if (forwards) {
ao2_ref(forwards, -1);
return 0;
}
forwards = ao2_find(app->forwards,
chan ? ast_channel_uniqueid(chan) : CHANNEL_ALL,
OBJ_SEARCH_KEY | OBJ_NOLOCK);
if (!forwards) {
/* Forwards not found, create one */
forwards = forwards_create_channel(app, chan);
if (!forwards) {
return -1;
}
res = ao2_link_flags(app->forwards, forwards,
OBJ_NOLOCK);
if (!res) {
ao2_ref(forwards, -1);
return -1;
}
}
++forwards->interested;
ast_debug(3, "Channel '%s' is %d interested in %s\n",
chan ? ast_channel_uniqueid(chan) : "ALL",
forwards->interested,
app->name);
ao2_ref(forwards, -1);
return 0;
}
static int subscribe_channel(struct stasis_app *app, void *obj)
@ -1069,6 +1124,19 @@ static int unsubscribe(struct stasis_app *app, const char *kind, const char *id,
RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
SCOPED_AO2LOCK(lock, app->forwards);
if (!id) {
if (!strcmp(kind, "bridge")) {
id = BRIDGE_ALL;
} else if (!strcmp(kind, "channel")) {
id = CHANNEL_ALL;
} else if (!strcmp(kind, "endpoint")) {
id = ENDPOINT_ALL;
} else {
ast_log(LOG_WARNING, "Unknown subscription kind '%s'\n", kind);
return -1;
}
}
forwards = ao2_find(app->forwards, id, OBJ_SEARCH_KEY | OBJ_NOLOCK);
if (!forwards) {
ast_debug(3, "App '%s' not subscribed to %s '%s'\n", app->name, kind, id);
@ -1095,16 +1163,16 @@ static int unsubscribe(struct stasis_app *app, const char *kind, const char *id,
int app_unsubscribe_channel(struct stasis_app *app, struct ast_channel *chan)
{
if (!app || !chan) {
if (!app) {
return -1;
}
return app_unsubscribe_channel_id(app, ast_channel_uniqueid(chan));
return app_unsubscribe_channel_id(app, chan ? ast_channel_uniqueid(chan) : CHANNEL_ALL);
}
int app_unsubscribe_channel_id(struct stasis_app *app, const char *channel_id)
{
if (!app || !channel_id) {
if (!app) {
return -1;
}
@ -1114,6 +1182,10 @@ int app_unsubscribe_channel_id(struct stasis_app *app, const char *channel_id)
int app_is_subscribed_channel_id(struct stasis_app *app, const char *channel_id)
{
RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
if (ast_strlen_zero(channel_id)) {
channel_id = CHANNEL_ALL;
}
forwards = ao2_find(app->forwards, channel_id, OBJ_SEARCH_KEY);
return forwards != NULL;
}
@ -1133,28 +1205,39 @@ struct stasis_app_event_source channel_event_source = {
int app_subscribe_bridge(struct stasis_app *app, struct ast_bridge *bridge)
{
if (!app || !bridge) {
struct app_forwards *forwards;
SCOPED_AO2LOCK(lock, app->forwards);
if (!app) {
return -1;
} else {
RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
SCOPED_AO2LOCK(lock, app->forwards);
}
forwards = ao2_find(app->forwards, bridge->uniqueid,
OBJ_SEARCH_KEY | OBJ_NOLOCK);
if (!forwards) {
/* Forwards not found, create one */
forwards = forwards_create_bridge(app, bridge);
if (!forwards) {
return -1;
}
ao2_link_flags(app->forwards, forwards, OBJ_NOLOCK);
}
++forwards->interested;
ast_debug(3, "Bridge '%s' is %d interested in %s\n", bridge->uniqueid, forwards->interested, app->name);
/* If subscribed to all, don't subscribe again */
forwards = ao2_find(app->forwards, BRIDGE_ALL, OBJ_SEARCH_KEY | OBJ_NOLOCK);
if (forwards) {
ao2_ref(forwards, -1);
return 0;
}
forwards = ao2_find(app->forwards, bridge ? bridge->uniqueid : BRIDGE_ALL,
OBJ_SEARCH_KEY | OBJ_NOLOCK);
if (!forwards) {
/* Forwards not found, create one */
forwards = forwards_create_bridge(app, bridge);
if (!forwards) {
return -1;
}
ao2_link_flags(app->forwards, forwards, OBJ_NOLOCK);
}
++forwards->interested;
ast_debug(3, "Bridge '%s' is %d interested in %s\n",
bridge ? bridge->uniqueid : "ALL",
forwards->interested,
app->name);
ao2_ref(forwards, -1);
return 0;
}
static int subscribe_bridge(struct stasis_app *app, void *obj)
@ -1164,16 +1247,16 @@ static int subscribe_bridge(struct stasis_app *app, void *obj)
int app_unsubscribe_bridge(struct stasis_app *app, struct ast_bridge *bridge)
{
if (!app || !bridge) {
if (!app) {
return -1;
}
return app_unsubscribe_bridge_id(app, bridge->uniqueid);
return app_unsubscribe_bridge_id(app, bridge ? bridge->uniqueid : BRIDGE_ALL);
}
int app_unsubscribe_bridge_id(struct stasis_app *app, const char *bridge_id)
{
if (!app || !bridge_id) {
if (!app) {
return -1;
}
@ -1182,9 +1265,26 @@ int app_unsubscribe_bridge_id(struct stasis_app *app, const char *bridge_id)
int app_is_subscribed_bridge_id(struct stasis_app *app, const char *bridge_id)
{
RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
forwards = ao2_find(app->forwards, bridge_id, OBJ_SEARCH_KEY);
return forwards != NULL;
struct app_forwards *forwards;
SCOPED_AO2LOCK(lock, app->forwards);
forwards = ao2_find(app->forwards, BRIDGE_ALL, OBJ_SEARCH_KEY | OBJ_NOLOCK);
if (forwards) {
ao2_ref(forwards, -1);
return 1;
}
if (ast_strlen_zero(bridge_id)) {
bridge_id = BRIDGE_ALL;
}
forwards = ao2_find(app->forwards, bridge_id, OBJ_SEARCH_KEY | OBJ_NOLOCK);
if (forwards) {
ao2_ref(forwards, -1);
return 1;
}
return 0;
}
static void *bridge_find(const struct stasis_app *app, const char *id)
@ -1202,31 +1302,43 @@ struct stasis_app_event_source bridge_event_source = {
int app_subscribe_endpoint(struct stasis_app *app, struct ast_endpoint *endpoint)
{
if (!app || !endpoint) {
struct app_forwards *forwards;
SCOPED_AO2LOCK(lock, app->forwards);
if (!app) {
return -1;
} else {
RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
SCOPED_AO2LOCK(lock, app->forwards);
}
forwards = ao2_find(app->forwards, ast_endpoint_get_id(endpoint),
OBJ_SEARCH_KEY | OBJ_NOLOCK);
if (!forwards) {
/* Forwards not found, create one */
forwards = forwards_create_endpoint(app, endpoint);
if (!forwards) {
return -1;
}
ao2_link_flags(app->forwards, forwards, OBJ_NOLOCK);
/* Subscribe for messages */
messaging_app_subscribe_endpoint(app->name, endpoint, &message_received_handler, app);
}
++forwards->interested;
ast_debug(3, "Endpoint '%s' is %d interested in %s\n", ast_endpoint_get_id(endpoint), forwards->interested, app->name);
/* If subscribed to all, don't subscribe again */
forwards = ao2_find(app->forwards, ENDPOINT_ALL, OBJ_SEARCH_KEY | OBJ_NOLOCK);
if (forwards) {
ao2_ref(forwards, -1);
return 0;
}
forwards = ao2_find(app->forwards,
endpoint ? ast_endpoint_get_id(endpoint) : ENDPOINT_ALL,
OBJ_SEARCH_KEY | OBJ_NOLOCK);
if (!forwards) {
/* Forwards not found, create one */
forwards = forwards_create_endpoint(app, endpoint);
if (!forwards) {
return -1;
}
ao2_link_flags(app->forwards, forwards, OBJ_NOLOCK);
/* Subscribe for messages */
messaging_app_subscribe_endpoint(app->name, endpoint, &message_received_handler, app);
}
++forwards->interested;
ast_debug(3, "Endpoint '%s' is %d interested in %s\n",
endpoint ? ast_endpoint_get_id(endpoint) : "ALL",
forwards->interested,
app->name);
ao2_ref(forwards, -1);
return 0;
}
static int subscribe_endpoint(struct stasis_app *app, void *obj)
@ -1236,7 +1348,7 @@ static int subscribe_endpoint(struct stasis_app *app, void *obj)
int app_unsubscribe_endpoint_id(struct stasis_app *app, const char *endpoint_id)
{
if (!app || !endpoint_id) {
if (!app) {
return -1;
}
@ -1246,6 +1358,10 @@ int app_unsubscribe_endpoint_id(struct stasis_app *app, const char *endpoint_id)
int app_is_subscribed_endpoint_id(struct stasis_app *app, const char *endpoint_id)
{
RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
if (ast_strlen_zero(endpoint_id)) {
endpoint_id = ENDPOINT_ALL;
}
forwards = ao2_find(app->forwards, endpoint_id, OBJ_SEARCH_KEY);
return forwards != NULL;
}

View File

@ -36,6 +36,19 @@
*/
struct stasis_app;
enum stasis_app_subscription_model {
/*
* \brief An application must manually subscribe to each
* resource that it cares about. This is the default approach.
*/
STASIS_APP_SUBSCRIBE_MANUAL,
/*
* \brief An application is automatically subscribed to all
* resources in Asterisk, even if it does not control them.
*/
STASIS_APP_SUBSCRIBE_ALL
};
/*!
* \brief Create a res_stasis application.
*
@ -45,7 +58,7 @@ struct stasis_app;
* \return New \c res_stasis application.
* \return \c NULL on error.
*/
struct stasis_app *app_create(const char *name, stasis_app_cb handler, void *data);
struct stasis_app *app_create(const char *name, stasis_app_cb handler, void *data, enum stasis_app_subscription_model subscription_model);
/*!
* \brief Tears down an application.

View File

@ -37,6 +37,11 @@ ASTERISK_REGISTER_FILE()
#include "asterisk/test.h"
#include "messaging.h"
/*!
* \brief Subscription to all technologies
*/
#define TECH_WILDCARD "__AST_ALL_TECH"
/*!
* \brief Number of buckets for the \ref endpoint_subscriptions container
*/
@ -219,10 +224,14 @@ static int has_destination_cb(const struct ast_msg *msg)
for (i = 0; i < AST_VECTOR_SIZE(&tech_subscriptions); i++) {
sub = AST_VECTOR_GET(&tech_subscriptions, i);
if (sub && (!strncasecmp(sub->token, buf, strlen(sub->token))
|| !strncasecmp(sub->token, buf, strlen(sub->token)))) {
if (!sub) {
continue;
}
if (!strcmp(sub->token, TECH_WILDCARD)
|| !strncasecmp(sub->token, buf, strlen(sub->token))
|| !strncasecmp(sub->token, buf, strlen(sub->token))) {
ast_rwlock_unlock(&tech_subscriptions_lock);
sub = NULL; /* No ref bump! */
goto match;
}
@ -231,6 +240,7 @@ static int has_destination_cb(const struct ast_msg *msg)
sub = ao2_find(endpoint_subscriptions, buf, OBJ_SEARCH_KEY);
if (sub) {
ao2_ref(sub, -1);
goto match;
}
@ -238,7 +248,6 @@ static int has_destination_cb(const struct ast_msg *msg)
return 0;
match:
ao2_cleanup(sub);
return 1;
}
@ -301,7 +310,8 @@ static int handle_msg_cb(struct ast_msg *msg)
continue;
}
if (!strncasecmp(sub->token, buf, strlen(sub->token))) {
if (!strcmp(sub->token, TECH_WILDCARD)
|| !strncasecmp(sub->token, buf, strlen(sub->token))) {
ast_rwlock_unlock(&tech_subscriptions_lock);
ao2_bump(sub);
endpoint_name = buf;
@ -374,7 +384,7 @@ static struct message_subscription *get_subscription(struct ast_endpoint *endpoi
{
struct message_subscription *sub = NULL;
if (!ast_strlen_zero(ast_endpoint_get_resource(endpoint))) {
if (endpoint && !ast_strlen_zero(ast_endpoint_get_resource(endpoint))) {
sub = ao2_find(endpoint_subscriptions, endpoint, OBJ_SEARCH_KEY);
} else {
int i;
@ -383,7 +393,7 @@ static struct message_subscription *get_subscription(struct ast_endpoint *endpoi
for (i = 0; i < AST_VECTOR_SIZE(&tech_subscriptions); i++) {
sub = AST_VECTOR_GET(&tech_subscriptions, i);
if (sub && !strcmp(sub->token, ast_endpoint_get_tech(endpoint))) {
if (sub && !strcmp(sub->token, endpoint ? ast_endpoint_get_tech(endpoint) : TECH_WILDCARD)) {
ao2_bump(sub);
break;
}
@ -400,10 +410,6 @@ void messaging_app_unsubscribe_endpoint(const char *app_name, const char *endpoi
RAII_VAR(struct ast_endpoint *, endpoint, NULL, ao2_cleanup);
endpoint = ast_endpoint_find_by_id(endpoint_id);
if (!endpoint) {
return;
}
sub = get_subscription(endpoint);
if (!sub) {
return;
@ -417,11 +423,11 @@ void messaging_app_unsubscribe_endpoint(const char *app_name, const char *endpoi
AST_VECTOR_REMOVE_CMP_UNORDERED(&sub->applications, app_name, application_tuple_cmp, ao2_cleanup);
if (AST_VECTOR_SIZE(&sub->applications) == 0) {
if (!ast_strlen_zero(ast_endpoint_get_resource(endpoint))) {
if (endpoint && !ast_strlen_zero(ast_endpoint_get_resource(endpoint))) {
ao2_unlink(endpoint_subscriptions, sub);
} else {
ast_rwlock_wrlock(&tech_subscriptions_lock);
AST_VECTOR_REMOVE_CMP_UNORDERED(&tech_subscriptions, ast_endpoint_get_id(endpoint),
AST_VECTOR_REMOVE_CMP_UNORDERED(&tech_subscriptions, endpoint ? ast_endpoint_get_id(endpoint) : TECH_WILDCARD,
messaging_subscription_cmp, AST_VECTOR_ELEM_CLEANUP_NOOP);
ast_rwlock_unlock(&tech_subscriptions_lock);
}
@ -429,9 +435,9 @@ void messaging_app_unsubscribe_endpoint(const char *app_name, const char *endpoi
ao2_unlock(sub);
ao2_ref(sub, -1);
ast_debug(3, "App '%s' unsubscribed to messages from endpoint '%s'\n", app_name, ast_endpoint_get_id(endpoint));
ast_debug(3, "App '%s' unsubscribed to messages from endpoint '%s'\n", app_name, endpoint ? ast_endpoint_get_id(endpoint) : "-- ALL --");
ast_test_suite_event_notify("StasisMessagingSubscription", "SubState: Unsubscribed\r\nAppName: %s\r\nToken: %s\r\n",
app_name, ast_endpoint_get_id(endpoint));
app_name, endpoint ? ast_endpoint_get_id(endpoint) : "ALL");
}
static struct message_subscription *get_or_create_subscription(struct ast_endpoint *endpoint)
@ -442,12 +448,12 @@ static struct message_subscription *get_or_create_subscription(struct ast_endpoi
return sub;
}
sub = message_subscription_alloc(ast_endpoint_get_id(endpoint));
sub = message_subscription_alloc(endpoint ? ast_endpoint_get_id(endpoint) : TECH_WILDCARD);
if (!sub) {
return NULL;
}
if (!ast_strlen_zero(ast_endpoint_get_resource(endpoint))) {
if (endpoint && !ast_strlen_zero(ast_endpoint_get_resource(endpoint))) {
ao2_link(endpoint_subscriptions, sub);
} else {
ast_rwlock_wrlock(&tech_subscriptions_lock);
@ -482,9 +488,9 @@ int messaging_app_subscribe_endpoint(const char *app_name, struct ast_endpoint *
AST_VECTOR_APPEND(&sub->applications, tuple);
ao2_unlock(sub);
ast_debug(3, "App '%s' subscribed to messages from endpoint '%s'\n", app_name, ast_endpoint_get_id(endpoint));
ast_debug(3, "App '%s' subscribed to messages from endpoint '%s'\n", app_name, endpoint ? ast_endpoint_get_id(endpoint) : "-- ALL --");
ast_test_suite_event_notify("StasisMessagingSubscription", "SubState: Subscribed\r\nAppName: %s\r\nToken: %s\r\n",
app_name, ast_endpoint_get_id(endpoint));
app_name, endpoint ? ast_endpoint_get_id(endpoint) : "ALL");
return 0;
}

View File

@ -26,6 +26,14 @@
"required": true,
"allowMultiple": true,
"dataType": "string"
},
{
"name": "subscribeAll",
"description": "Subscribe to all Asterisk events. If provided, the applications listed will be subscribed to all events, effectively disabling the application specific subscriptions. Default is 'false'.",
"paramType": "query",
"required": false,
"allowMultiple": false,
"dataType": "boolean"
}
]
}