ARI: Add subscription support

This patch adds an /applications API to ARI, allowing explicit management of
Stasis applications.

 * GET /applications - list current applications
 * GET /applications/{applicationName} - get details of a specific application
 * POST /applications/{applicationName}/subscription - explicitly subscribe to
   a channel, bridge or endpoint
 * DELETE /applications/{applicationName}/subscription - explicitly unsubscribe
   from a channel, bridge or endpoint

Subscriptions work by a reference counting mechanism: if you subscript to an
event source X number of times, you must unsubscribe X number of times to stop
receiveing events for that event source.

Review: https://reviewboard.asterisk.org/r/2862

(issue ASTERISK-22451)
Reported by: Matt Jordan
........

Merged revisions 400522 from http://svn.asterisk.org/svn/asterisk/branches/12


git-svn-id: https://origsvn.digium.com/svn/asterisk/trunk@400523 65c4cc65-6c06-0410-ace0-fbb531ad65f3
This commit is contained in:
Matthew Jordan 2013-10-04 16:01:48 +00:00
parent b52c972b17
commit 8d7873b836
21 changed files with 1722 additions and 15 deletions

View File

@ -142,4 +142,12 @@ int ast_parking_stasis_init(void);
/*! \brief initialize the sounds index */
int ast_sounds_index_init(void);
/*!
* \brief Endpoint support initialization.
* \return 0 on success.
* \return Non-zero on error.
*/
int ast_endpoint_init(void);
#endif /* _ASTERISK__PRIVATE_H */

View File

@ -76,6 +76,19 @@ const char *ast_endpoint_state_to_string(enum ast_endpoint_state state);
*/
struct ast_endpoint;
/*!
* \brief Finds the endpoint with the given tech/resource id.
*
* Endpoints are refcounted, so ao2_cleanup() when you're done.
*
* \param id Tech/resource id to look for.
* \return Associated endpoint.
* \return \c NULL if not found.
*
* \since 12
*/
struct ast_endpoint *ast_endpoint_find_by_id(const char *id);
/*!
* \brief Create an endpoint struct.
*
@ -125,6 +138,18 @@ const char *ast_endpoint_get_tech(const struct ast_endpoint *endpoint);
*/
const char *ast_endpoint_get_resource(const struct ast_endpoint *endpoint);
/*!
* \brief Gets the tech/resource id of the given endpoint.
*
* This is unique across all endpoints, and immutable.
*
* \param endpoint The endpoint.
* \return Tech/resource id of the endpoint.
* \return \c NULL if endpoint is \c NULL.
* \since 12
*/
const char *ast_endpoint_get_id(const struct ast_endpoint *endpoint);
/*!
* \brief Updates the state of the given endpoint.
*

View File

@ -68,6 +68,14 @@
typedef void (*stasis_app_cb)(void *data, const char *app_name,
struct ast_json *message);
/*!
* \brief Gets the names of all registered Stasis applications.
*
* \return \c ast_str_container of container names.
* \return \c NULL on error.
*/
struct ao2_container *stasis_app_get_all(void);
/*!
* \brief Register a new Stasis application.
*
@ -77,6 +85,7 @@ typedef void (*stasis_app_cb)(void *data, const char *app_name,
* \param app_name Name of this application.
* \param handler Callback for application messages.
* \param data Data blob to pass to the callback. Must be AO2 managed.
*
* \return 0 for success
* \return -1 for error.
*/
@ -96,11 +105,61 @@ void stasis_app_unregister(const char *app_name);
*
* \param app_name Name of the application to invoke.
* \param message Message to send (borrowed reference)
*
* \return 0 for success.
* \return -1 for error.
*/
int stasis_app_send(const char *app_name, struct ast_json *message);
/*!
* \brief Return the JSON representation of a Stasis application.
*
* \param app_name Name of the application.
*
* \return JSON representation of app with given name.
* \return \c NULL on error.
*/
struct ast_json *stasis_app_to_json(const char *app_name);
/*! \brief Return code for stasis_app_[un]subscribe */
enum stasis_app_subscribe_res {
STASIS_ASR_OK,
STASIS_ASR_APP_NOT_FOUND,
STASIS_ASR_EVENT_SOURCE_NOT_FOUND,
STASIS_ASR_EVENT_SOURCE_BAD_SCHEME,
STASIS_ASR_INTERNAL_ERROR,
};
/*!
* \brief Subscribes an application to a list of event sources.
*
* \param app_name Name of the application to subscribe.
* \param event_source_uris URIs for the event sources to subscribe to.
* \param event_sources_count Array size of event_source_uris.
* \param json Optional output pointer for JSON representation of the app
* after adding the subscription.
*
* \return \ref stasis_app_subscribe_res return code.
*/
enum stasis_app_subscribe_res stasis_app_subscribe(const char *app_name,
const char **event_source_uris, int event_sources_count,
struct ast_json **json);
/*!
* \brief Unsubscribes an application from a list of event sources.
*
* \param app_name Name of the application to subscribe.
* \param event_source_uris URIs for the event sources to subscribe to.
* \param event_sources_count Array size of event_source_uris.
* \param json Optional output pointer for JSON representation of the app
* after adding the subscription.
*
* \return \ref stasis_app_subscribe_res return code.
*/
enum stasis_app_subscribe_res stasis_app_unsubscribe(const char *app_name,
const char **event_source_uris, int event_sources_count,
struct ast_json **json);
/*! @} */
/*! @{ */
@ -111,6 +170,7 @@ struct stasis_app_control;
/*!
* \brief Returns the handler for the given channel.
* \param chan Channel to handle.
*
* \return NULL channel not in Stasis application.
* \return Pointer to \c res_stasis handler.
*/
@ -120,6 +180,7 @@ struct stasis_app_control *stasis_app_control_find_by_channel(
/*!
* \brief Returns the handler for the channel with the given id.
* \param channel_id Uniqueid of the channel.
*
* \return NULL channel not in Stasis application, or channel does not exist.
* \return Pointer to \c res_stasis handler.
*/
@ -153,6 +214,7 @@ void stasis_app_control_execute_until_exhausted(
* \brief Returns the uniqueid of the channel associated with this control
*
* \param control Control object.
*
* \return Uniqueid of the associate channel.
* \return \c NULL if \a control is \c NULL.
*/
@ -245,6 +307,7 @@ int stasis_app_control_answer(struct stasis_app_control *control);
* \brief Get the value of a variable on the channel associated with this control.
* \param control Control for \c res_stasis.
* \param variable The name of the variable.
*
* \return The value of the variable. The returned variable must be freed.
*/
char *stasis_app_control_get_channel_var(struct stasis_app_control *control, const char *variable);
@ -291,6 +354,7 @@ void stasis_app_control_moh_stop(struct stasis_app_control *control);
* The returned pointer is AO2 managed, so ao2_cleanup() when you're done.
*
* \param control Control for \c res_stasis.
*
* \return Most recent snapshot. ao2_cleanup() when done.
* \return \c NULL if channel isn't in cache.
*/
@ -331,6 +395,7 @@ struct ast_bridge *stasis_app_bridge_create(const char *type);
/*!
* \brief Returns the bridge with the given id.
* \param bridge_id Uniqueid of the bridge.
*
* \return NULL bridge not created by a Stasis application, or bridge does not exist.
* \return Pointer to bridge.
*/
@ -364,6 +429,7 @@ int stasis_app_bridge_moh_stop(
*
* \param control Control whose channel should be added to the bridge
* \param bridge Pointer to the bridge
*
* \return non-zero on failure
* \return zero on success
*/
@ -375,6 +441,7 @@ int stasis_app_control_add_channel_to_bridge(
*
* \param control Control whose channel should be removed from the bridge
* \param bridge Pointer to the bridge
*
* \return non-zero on failure
* \return zero on success
*/
@ -386,6 +453,7 @@ int stasis_app_control_remove_channel_from_bridge(
* \brief Gets the bridge currently associated with a control object.
*
* \param control Control object for the channel to query.
*
* \return Associated \ref ast_bridge.
* \return \c NULL if not associated with a bridge.
*/

View File

@ -247,6 +247,7 @@ int daemon(int, int); /* defined in libresolv of all places */
#include "asterisk/stasis_endpoints.h"
#include "asterisk/stasis_system.h"
#include "asterisk/security_events.h"
#include "asterisk/endpoints.h"
#include "../defaults.h"
@ -4340,6 +4341,11 @@ int main(int argc, char *argv[])
ast_channels_init();
if (ast_endpoint_init()) {
printf ("%s", term_quit());
exit(1);
}
if ((moduleresult = load_modules(1))) { /* Load modules, pre-load only */
printf("%s", term_quit());
exit(moduleresult == -2 ? 2 : 1);

View File

@ -38,10 +38,16 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
#include "asterisk/stasis_endpoints.h"
#include "asterisk/stasis_message_router.h"
#include "asterisk/stringfields.h"
#include "asterisk/_private.h"
/*! Buckets for endpoint->channel mappings. Keep it prime! */
#define ENDPOINT_CHANNEL_BUCKETS 127
/*! Buckets for endpoint hash. Keep it prime! */
#define ENDPOINT_BUCKETS 127
static struct ao2_container *endpoints;
struct ast_endpoint {
AST_DECLARE_STRING_FIELDS(
AST_STRING_FIELD(tech); /*!< Technology (SIP, IAX2, etc.). */
@ -65,6 +71,59 @@ struct ast_endpoint {
struct ao2_container *channel_ids;
};
static int endpoint_hash(const void *obj, int flags)
{
const struct ast_endpoint *endpoint;
const char *key;
switch (flags & OBJ_SEARCH_MASK) {
case OBJ_SEARCH_KEY:
key = obj;
return ast_str_hash(key);
case OBJ_SEARCH_OBJECT:
endpoint = obj;
return ast_str_hash(endpoint->id);
default:
/* Hash can only work on something with a full key. */
ast_assert(0);
return 0;
}
}
static int endpoint_cmp(void *obj, void *arg, int flags)
{
const struct ast_endpoint *left = obj;
const struct ast_endpoint *right = arg;
const char *right_key = arg;
int cmp;
switch (flags & OBJ_SEARCH_MASK) {
case OBJ_SEARCH_OBJECT:
right_key = right->id;
/* Fall through */
case OBJ_SEARCH_KEY:
cmp = strcmp(left->id, right_key);
break;
case OBJ_SEARCH_PARTIAL_KEY:
cmp = strncmp(left->id, right_key, strlen(right_key));
break;
default:
ast_assert(0);
cmp = 0;
break;
}
if (cmp) {
return 0;
}
return CMP_MATCH;
}
struct ast_endpoint *ast_endpoint_find_by_id(const char *id)
{
return ao2_find(endpoints, id, OBJ_KEY);
}
struct stasis_topic *ast_endpoint_topic(struct ast_endpoint *endpoint)
{
if (!endpoint) {
@ -218,7 +277,7 @@ struct ast_endpoint *ast_endpoint_create(const char *tech, const char *resource)
/* All access to channel_ids should be covered by the endpoint's
* lock; no extra lock needed. */
endpoint->channel_ids = ast_str_container_alloc_options(
AO2_ALLOC_OPT_LOCK_NOLOCK, ENDPOINT_BUCKETS);
AO2_ALLOC_OPT_LOCK_NOLOCK, ENDPOINT_CHANNEL_BUCKETS);
if (!endpoint->channel_ids) {
return NULL;
}
@ -241,16 +300,12 @@ struct ast_endpoint *ast_endpoint_create(const char *tech, const char *resource)
endpoint_publish_snapshot(endpoint);
ao2_link(endpoints, endpoint);
ao2_ref(endpoint, +1);
return endpoint;
}
const char *ast_endpoint_get_tech(const struct ast_endpoint *endpoint)
{
ast_assert(endpoint != NULL);
return endpoint->tech;
}
static struct stasis_message *create_endpoint_snapshot_message(struct ast_endpoint *endpoint)
{
RAII_VAR(struct ast_endpoint_snapshot *, snapshot, NULL, ao2_cleanup);
@ -270,6 +325,8 @@ void ast_endpoint_shutdown(struct ast_endpoint *endpoint)
return;
}
ao2_unlink(endpoints, endpoint);
clear_msg = create_endpoint_snapshot_message(endpoint);
if (clear_msg) {
RAII_VAR(struct stasis_message *, message, NULL, ao2_cleanup);
@ -284,11 +341,30 @@ void ast_endpoint_shutdown(struct ast_endpoint *endpoint)
stasis_message_router_unsubscribe(endpoint->router);
}
const char *ast_endpoint_get_tech(const struct ast_endpoint *endpoint)
{
if (!endpoint) {
return NULL;
}
return endpoint->tech;
}
const char *ast_endpoint_get_resource(const struct ast_endpoint *endpoint)
{
if (!endpoint) {
return NULL;
}
return endpoint->resource;
}
const char *ast_endpoint_get_id(const struct ast_endpoint *endpoint)
{
if (!endpoint) {
return NULL;
}
return endpoint->id;
}
void ast_endpoint_set_state(struct ast_endpoint *endpoint,
enum ast_endpoint_state state)
{
@ -354,3 +430,23 @@ struct ast_endpoint_snapshot *ast_endpoint_snapshot_create(
ao2_ref(snapshot, +1);
return snapshot;
}
static void endpoint_cleanup(void)
{
ao2_cleanup(endpoints);
endpoints = NULL;
}
int ast_endpoint_init(void)
{
ast_register_cleanup(endpoint_cleanup);
endpoints = ao2_container_alloc(ENDPOINT_BUCKETS, endpoint_hash,
endpoint_cmp);
if (!endpoints) {
return -1;
}
return 0;
}

View File

@ -686,9 +686,15 @@ struct ast_json *ast_json_pack(char const *format, ...)
}
struct ast_json *ast_json_vpack(char const *format, va_list ap)
{
json_error_t error;
struct ast_json *r = NULL;
if (format) {
r = (struct ast_json *)json_vpack_ex(NULL, 0, format, ap);
r = (struct ast_json *)json_vpack_ex(&error, 0, format, ap);
if (!r) {
ast_log(LOG_ERROR,
"Error building JSON from '%s': %s.\n",
format, error.text);
}
}
return r;
}

View File

@ -49,3 +49,7 @@ res_ari_events.so: ari/resource_events.o
ari/resource_events.o: _ASTCFLAGS+=$(call MOD_ASTCFLAGS,res_ari_events)
res_ari_applications.so: ari/resource_applications.o
ari/resource_applications.o: _ASTCFLAGS+=$(call MOD_ASTCFLAGS,res_ari_applications)

View File

@ -2530,6 +2530,7 @@ int ast_ari_validate_channel_userevent(struct ast_json *json)
int has_application = 0;
int has_channel = 0;
int has_eventname = 0;
int has_userevent = 0;
for (iter = ast_json_object_iter(json); iter; iter = ast_json_object_iter_next(json, iter)) {
if (strcmp("type", ast_json_object_iter_key(iter)) == 0) {
@ -2581,6 +2582,16 @@ int ast_ari_validate_channel_userevent(struct ast_json *json)
res = 0;
}
} else
if (strcmp("userevent", ast_json_object_iter_key(iter)) == 0) {
int prop_is_valid;
has_userevent = 1;
prop_is_valid = ast_ari_validate_object(
ast_json_object_iter_value(iter));
if (!prop_is_valid) {
ast_log(LOG_ERROR, "ARI ChannelUserevent field userevent failed validation\n");
res = 0;
}
} else
{
ast_log(LOG_ERROR,
"ARI ChannelUserevent has undocumented field %s\n",
@ -2609,6 +2620,11 @@ int ast_ari_validate_channel_userevent(struct ast_json *json)
res = 0;
}
if (!has_userevent) {
ast_log(LOG_ERROR, "ARI ChannelUserevent missing required field userevent\n");
res = 0;
}
return res;
}
@ -2721,6 +2737,85 @@ ari_validator ast_ari_validate_channel_varset_fn(void)
return ast_ari_validate_channel_varset;
}
int ast_ari_validate_endpoint_state_change(struct ast_json *json)
{
int res = 1;
struct ast_json_iter *iter;
int has_type = 0;
int has_application = 0;
int has_endpoint = 0;
for (iter = ast_json_object_iter(json); iter; iter = ast_json_object_iter_next(json, iter)) {
if (strcmp("type", ast_json_object_iter_key(iter)) == 0) {
int prop_is_valid;
has_type = 1;
prop_is_valid = ast_ari_validate_string(
ast_json_object_iter_value(iter));
if (!prop_is_valid) {
ast_log(LOG_ERROR, "ARI EndpointStateChange field type failed validation\n");
res = 0;
}
} else
if (strcmp("application", ast_json_object_iter_key(iter)) == 0) {
int prop_is_valid;
has_application = 1;
prop_is_valid = ast_ari_validate_string(
ast_json_object_iter_value(iter));
if (!prop_is_valid) {
ast_log(LOG_ERROR, "ARI EndpointStateChange field application failed validation\n");
res = 0;
}
} else
if (strcmp("timestamp", ast_json_object_iter_key(iter)) == 0) {
int prop_is_valid;
prop_is_valid = ast_ari_validate_date(
ast_json_object_iter_value(iter));
if (!prop_is_valid) {
ast_log(LOG_ERROR, "ARI EndpointStateChange field timestamp failed validation\n");
res = 0;
}
} else
if (strcmp("endpoint", ast_json_object_iter_key(iter)) == 0) {
int prop_is_valid;
has_endpoint = 1;
prop_is_valid = ast_ari_validate_endpoint(
ast_json_object_iter_value(iter));
if (!prop_is_valid) {
ast_log(LOG_ERROR, "ARI EndpointStateChange field endpoint failed validation\n");
res = 0;
}
} else
{
ast_log(LOG_ERROR,
"ARI EndpointStateChange has undocumented field %s\n",
ast_json_object_iter_key(iter));
res = 0;
}
}
if (!has_type) {
ast_log(LOG_ERROR, "ARI EndpointStateChange missing required field type\n");
res = 0;
}
if (!has_application) {
ast_log(LOG_ERROR, "ARI EndpointStateChange missing required field application\n");
res = 0;
}
if (!has_endpoint) {
ast_log(LOG_ERROR, "ARI EndpointStateChange missing required field endpoint\n");
res = 0;
}
return res;
}
ari_validator ast_ari_validate_endpoint_state_change_fn(void)
{
return ast_ari_validate_endpoint_state_change;
}
int ast_ari_validate_event(struct ast_json *json)
{
int res = 1;
@ -2783,6 +2878,9 @@ int ast_ari_validate_event(struct ast_json *json)
if (strcmp("ChannelVarset", discriminator) == 0) {
return ast_ari_validate_channel_varset(json);
} else
if (strcmp("EndpointStateChange", discriminator) == 0) {
return ast_ari_validate_endpoint_state_change(json);
} else
if (strcmp("PlaybackFinished", discriminator) == 0) {
return ast_ari_validate_playback_finished(json);
} else
@ -2918,6 +3016,9 @@ int ast_ari_validate_message(struct ast_json *json)
if (strcmp("ChannelVarset", discriminator) == 0) {
return ast_ari_validate_channel_varset(json);
} else
if (strcmp("EndpointStateChange", discriminator) == 0) {
return ast_ari_validate_endpoint_state_change(json);
} else
if (strcmp("Event", discriminator) == 0) {
return ast_ari_validate_event(json);
} else
@ -3361,3 +3462,92 @@ ari_validator ast_ari_validate_stasis_start_fn(void)
{
return ast_ari_validate_stasis_start;
}
int ast_ari_validate_application(struct ast_json *json)
{
int res = 1;
struct ast_json_iter *iter;
int has_bridge_ids = 0;
int has_channel_ids = 0;
int has_endpoint_ids = 0;
int has_name = 0;
for (iter = ast_json_object_iter(json); iter; iter = ast_json_object_iter_next(json, iter)) {
if (strcmp("bridge_ids", ast_json_object_iter_key(iter)) == 0) {
int prop_is_valid;
has_bridge_ids = 1;
prop_is_valid = ast_ari_validate_list(
ast_json_object_iter_value(iter),
ast_ari_validate_string);
if (!prop_is_valid) {
ast_log(LOG_ERROR, "ARI Application field bridge_ids failed validation\n");
res = 0;
}
} else
if (strcmp("channel_ids", ast_json_object_iter_key(iter)) == 0) {
int prop_is_valid;
has_channel_ids = 1;
prop_is_valid = ast_ari_validate_list(
ast_json_object_iter_value(iter),
ast_ari_validate_string);
if (!prop_is_valid) {
ast_log(LOG_ERROR, "ARI Application field channel_ids failed validation\n");
res = 0;
}
} else
if (strcmp("endpoint_ids", ast_json_object_iter_key(iter)) == 0) {
int prop_is_valid;
has_endpoint_ids = 1;
prop_is_valid = ast_ari_validate_list(
ast_json_object_iter_value(iter),
ast_ari_validate_string);
if (!prop_is_valid) {
ast_log(LOG_ERROR, "ARI Application field endpoint_ids failed validation\n");
res = 0;
}
} else
if (strcmp("name", ast_json_object_iter_key(iter)) == 0) {
int prop_is_valid;
has_name = 1;
prop_is_valid = ast_ari_validate_string(
ast_json_object_iter_value(iter));
if (!prop_is_valid) {
ast_log(LOG_ERROR, "ARI Application field name failed validation\n");
res = 0;
}
} else
{
ast_log(LOG_ERROR,
"ARI Application has undocumented field %s\n",
ast_json_object_iter_key(iter));
res = 0;
}
}
if (!has_bridge_ids) {
ast_log(LOG_ERROR, "ARI Application missing required field bridge_ids\n");
res = 0;
}
if (!has_channel_ids) {
ast_log(LOG_ERROR, "ARI Application missing required field channel_ids\n");
res = 0;
}
if (!has_endpoint_ids) {
ast_log(LOG_ERROR, "ARI Application missing required field endpoint_ids\n");
res = 0;
}
if (!has_name) {
ast_log(LOG_ERROR, "ARI Application missing required field name\n");
res = 0;
}
return res;
}
ari_validator ast_ari_validate_application_fn(void)
{
return ast_ari_validate_application;
}

View File

@ -54,6 +54,15 @@
*/
int ast_ari_validate_void(struct ast_json *json);
/*!
* \brief Validator for native Swagger object.
*
* \param json JSON object to validate.
* \returns True (non-zero) if valid.
* \returns False (zero) if invalid.
*/
int ast_ari_validate_object(struct ast_json *json);
/*!
* \brief Validator for native Swagger byte.
*
@ -745,6 +754,24 @@ int ast_ari_validate_channel_varset(struct ast_json *json);
*/
ari_validator ast_ari_validate_channel_varset_fn(void);
/*!
* \brief Validator for EndpointStateChange.
*
* Endpoint state changed.
*
* \param json JSON object to validate.
* \returns True (non-zero) if valid.
* \returns False (zero) if invalid.
*/
int ast_ari_validate_endpoint_state_change(struct ast_json *json);
/*!
* \brief Function pointer to ast_ari_validate_endpoint_state_change().
*
* See \ref ast_ari_model_validators.h for more details.
*/
ari_validator ast_ari_validate_endpoint_state_change_fn(void);
/*!
* \brief Validator for Event.
*
@ -871,6 +898,24 @@ int ast_ari_validate_stasis_start(struct ast_json *json);
*/
ari_validator ast_ari_validate_stasis_start_fn(void);
/*!
* \brief Validator for Application.
*
* Details of a Stasis application
*
* \param json JSON object to validate.
* \returns True (non-zero) if valid.
* \returns False (zero) if invalid.
*/
int ast_ari_validate_application(struct ast_json *json);
/*!
* \brief Function pointer to ast_ari_validate_application().
*
* See \ref ast_ari_model_validators.h for more details.
*/
ari_validator ast_ari_validate_application_fn(void);
/*
* JSON models
*
@ -1035,6 +1080,7 @@ ari_validator ast_ari_validate_stasis_start_fn(void);
* - timestamp: Date
* - channel: Channel (required)
* - eventname: string (required)
* - userevent: object (required)
* ChannelVarset
* - type: string (required)
* - application: string (required)
@ -1042,6 +1088,11 @@ ari_validator ast_ari_validate_stasis_start_fn(void);
* - channel: Channel
* - value: string (required)
* - variable: string (required)
* EndpointStateChange
* - type: string (required)
* - application: string (required)
* - timestamp: Date
* - endpoint: Endpoint (required)
* Event
* - type: string (required)
* - application: string (required)
@ -1072,6 +1123,11 @@ ari_validator ast_ari_validate_stasis_start_fn(void);
* - timestamp: Date
* - args: List[string] (required)
* - channel: Channel (required)
* Application
* - bridge_ids: List[string] (required)
* - channel_ids: List[string] (required)
* - endpoint_ids: List[string] (required)
* - name: string (required)
*/
#endif /* _ASTERISK_ARI_MODEL_H */

View File

@ -0,0 +1,173 @@
/*
* Asterisk -- An open source telephony toolkit.
*
* Copyright (C) 2013, Digium, Inc.
*
* David M. Lee, II <dlee@digium.com>
*
* See http://www.asterisk.org for more information about
* the Asterisk project. Please do not directly contact
* any of the maintainers of this project for assistance;
* the project provides a web site, mailing lists and IRC
* channels for your use.
*
* This program is free software, distributed under the terms of
* the GNU General Public License Version 2. See the LICENSE file
* at the top of the source tree.
*/
/*! \file
*
* \brief /api-docs/applications.{format} implementation - Stasis application
* resources
*
* \author David M. Lee, II <dlee@digium.com>
*/
#include "asterisk.h"
ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
#include "asterisk/stasis_app.h"
#include "resource_applications.h"
static int append_json(void *obj, void *arg, int flags)
{
const char *app = obj;
struct ast_json *array = arg;
ast_json_array_append(array, stasis_app_to_json(app));
return 0;
}
void ast_ari_get_applications(struct ast_variable *headers,
struct ast_get_applications_args *args,
struct ast_ari_response *response)
{
RAII_VAR(struct ao2_container *, apps, NULL, ao2_cleanup);
RAII_VAR(struct ast_json *, json, NULL, ast_json_unref);
size_t count;
apps = stasis_app_get_all();
json = ast_json_array_create();
if (!apps || !json) {
ast_ari_response_error(response, 500, "Internal Server Error",
"Allocation failed");
return;
}
ao2_lock(apps);
count = ao2_container_count(apps);
ao2_callback(apps, OBJ_NOLOCK | OBJ_NODATA, append_json, json);
ao2_lock(apps);
if (count != ast_json_array_size(json)) {
ast_ari_response_error(response, 500, "Internal Server Error",
"Allocation failed");
return;
}
ast_ari_response_ok(response, json);
}
void ast_ari_get_application(struct ast_variable *headers,
struct ast_get_application_args *args,
struct ast_ari_response *response)
{
RAII_VAR(struct ast_json *, json, NULL, ast_json_unref);
json = stasis_app_to_json(args->application_name);
if (!json) {
ast_ari_response_error(response, 404, "Not Found",
"Application not found");
return;
}
ast_ari_response_ok(response, json);
}
void ast_ari_application_subscribe(struct ast_variable *headers,
struct ast_application_subscribe_args *args,
struct ast_ari_response *response)
{
RAII_VAR(struct ast_json *, json, NULL, ast_json_unref);
enum stasis_app_subscribe_res res;
if (args->event_source_count <= 0) {
ast_ari_response_error(response, 400, "Bad Request",
"Missing parameter eventSource");
return;
}
if (ast_strlen_zero(args->application_name)) {
ast_ari_response_error(response, 400, "Bad Request",
"Missing parameter applicationName");
return;
}
res = stasis_app_subscribe(args->application_name, args->event_source,
args->event_source_count, &json);
switch (res) {
case STASIS_ASR_OK:
ast_ari_response_ok(response, json);
break;
case STASIS_ASR_APP_NOT_FOUND:
ast_ari_response_error(response, 404, "Not Found",
"Application not found");
break;
case STASIS_ASR_EVENT_SOURCE_NOT_FOUND:
ast_ari_response_error(response, 422, "Unprocessable Entity",
"Event source does not exist");
break;
case STASIS_ASR_EVENT_SOURCE_BAD_SCHEME:
ast_ari_response_error(response, 400, "Bad Request",
"Invalid event source URI scheme");
break;
case STASIS_ASR_INTERNAL_ERROR:
ast_ari_response_error(response, 500, "Internal Server Error",
"Error processing request");
break;
}
}
void ast_ari_application_unsubscribe(struct ast_variable *headers,
struct ast_application_unsubscribe_args *args,
struct ast_ari_response *response)
{
RAII_VAR(struct ast_json *, json, NULL, ast_json_unref);
enum stasis_app_subscribe_res res;
if (args->event_source_count == 0) {
ast_ari_response_error(response, 400, "Bad Request",
"Missing parameter eventSource");
return;
}
res = stasis_app_unsubscribe(args->application_name, args->event_source,
args->event_source_count, &json);
switch (res) {
case STASIS_ASR_OK:
ast_ari_response_ok(response, json);
break;
case STASIS_ASR_APP_NOT_FOUND:
ast_ari_response_error(response, 404, "Not Found",
"Application not found");
break;
case STASIS_ASR_EVENT_SOURCE_NOT_FOUND:
ast_ari_response_error(response, 422, "Unprocessable Entity",
"Event source was not subscribed to");
break;
case STASIS_ASR_EVENT_SOURCE_BAD_SCHEME:
ast_ari_response_error(response, 400, "Bad Request",
"Invalid event source URI scheme");
break;
case STASIS_ASR_INTERNAL_ERROR:
ast_ari_response_error(response, 500, "Internal Server Error",
"Error processing request");
}
}

View File

@ -0,0 +1,109 @@
/*
* Asterisk -- An open source telephony toolkit.
*
* Copyright (C) 2013, Digium, Inc.
*
* David M. Lee, II <dlee@digium.com>
*
* See http://www.asterisk.org for more information about
* the Asterisk project. Please do not directly contact
* any of the maintainers of this project for assistance;
* the project provides a web site, mailing lists and IRC
* channels for your use.
*
* This program is free software, distributed under the terms of
* the GNU General Public License Version 2. See the LICENSE file
* at the top of the source tree.
*/
/*! \file
*
* \brief Generated file - declares stubs to be implemented in
* res/ari/resource_applications.c
*
* Stasis application resources
*
* \author David M. Lee, II <dlee@digium.com>
*/
/*
* !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
* !!!!! DO NOT EDIT !!!!!
* !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
* This file is generated by a mustache template. Please see the original
* template in rest-api-templates/ari_resource.h.mustache
*/
#ifndef _ASTERISK_RESOURCE_APPLICATIONS_H
#define _ASTERISK_RESOURCE_APPLICATIONS_H
#include "asterisk/ari.h"
/*! \brief Argument struct for ast_ari_get_applications() */
struct ast_get_applications_args {
};
/*!
* \brief List all applications.
*
* \param headers HTTP headers
* \param args Swagger parameters
* \param[out] response HTTP response
*/
void ast_ari_get_applications(struct ast_variable *headers, struct ast_get_applications_args *args, struct ast_ari_response *response);
/*! \brief Argument struct for ast_ari_get_application() */
struct ast_get_application_args {
/*! \brief Application's name */
const char *application_name;
};
/*!
* \brief Get details of an application.
*
* \param headers HTTP headers
* \param args Swagger parameters
* \param[out] response HTTP response
*/
void ast_ari_get_application(struct ast_variable *headers, struct ast_get_application_args *args, struct ast_ari_response *response);
/*! \brief Argument struct for ast_ari_application_subscribe() */
struct ast_application_subscribe_args {
/*! \brief Application's name */
const char *application_name;
/*! \brief Array of URI for event source (channel:{channelId}, bridge:{bridgeId}, endpoint:{tech}/{resource} */
const char **event_source;
/*! \brief Length of event_source array. */
size_t event_source_count;
/*! \brief Parsing context for event_source. */
char *event_source_parse;
};
/*!
* \brief Subscribe an application to a event source.
*
* Returns the state of the application after the subscriptions have changed
*
* \param headers HTTP headers
* \param args Swagger parameters
* \param[out] response HTTP response
*/
void ast_ari_application_subscribe(struct ast_variable *headers, struct ast_application_subscribe_args *args, struct ast_ari_response *response);
/*! \brief Argument struct for ast_ari_application_unsubscribe() */
struct ast_application_unsubscribe_args {
/*! \brief Application's name */
const char *application_name;
/*! \brief Array of URI for event source (channel:{channelId}, bridge:{bridgeId}, endpoint:{tech}/{resource} */
const char **event_source;
/*! \brief Length of event_source array. */
size_t event_source_count;
/*! \brief Parsing context for event_source. */
char *event_source_parse;
};
/*!
* \brief Unsubscribe an application from an event source.
*
* Returns the state of the application after the subscriptions have changed
*
* \param headers HTTP headers
* \param args Swagger parameters
* \param[out] response HTTP response
*/
void ast_ari_application_unsubscribe(struct ast_variable *headers, struct ast_application_unsubscribe_args *args, struct ast_ari_response *response);
#endif /* _ASTERISK_RESOURCE_APPLICATIONS_H */

View File

@ -43,7 +43,7 @@
struct ast_get_endpoints_args {
};
/*!
* \brief List all endoints.
* \brief List all endpoints.
*
* \param headers HTTP headers
* \param args Swagger parameters

425
res/res_ari_applications.c Normal file
View File

@ -0,0 +1,425 @@
/*
* Asterisk -- An open source telephony toolkit.
*
* Copyright (C) 2013, Digium, Inc.
*
* David M. Lee, II <dlee@digium.com>
*
* See http://www.asterisk.org for more information about
* the Asterisk project. Please do not directly contact
* any of the maintainers of this project for assistance;
* the project provides a web site, mailing lists and IRC
* channels for your use.
*
* This program is free software, distributed under the terms of
* the GNU General Public License Version 2. See the LICENSE file
* at the top of the source tree.
*/
/*
* !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
* !!!!! DO NOT EDIT !!!!!
* !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
* This file is generated by a mustache template. Please see the original
* template in rest-api-templates/res_ari_resource.c.mustache
*/
/*! \file
*
* \brief Stasis application resources
*
* \author David M. Lee, II <dlee@digium.com>
*/
/*** MODULEINFO
<depend type="module">res_ari</depend>
<depend type="module">res_stasis</depend>
<support_level>core</support_level>
***/
#include "asterisk.h"
ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
#include "asterisk/app.h"
#include "asterisk/module.h"
#include "asterisk/stasis_app.h"
#include "ari/resource_applications.h"
#if defined(AST_DEVMODE)
#include "ari/ari_model_validators.h"
#endif
#define MAX_VALS 128
/*!
* \brief Parameter parsing callback for /applications.
* \param get_params GET parameters in the HTTP request.
* \param path_vars Path variables extracted from the request.
* \param headers HTTP headers.
* \param[out] response Response to the HTTP request.
*/
static void ast_ari_get_applications_cb(
struct ast_variable *get_params, struct ast_variable *path_vars,
struct ast_variable *headers, struct ast_ari_response *response)
{
struct ast_get_applications_args args = {};
#if defined(AST_DEVMODE)
int is_valid;
int code;
#endif /* AST_DEVMODE */
ast_ari_get_applications(headers, &args, response);
#if defined(AST_DEVMODE)
code = response->response_code;
switch (code) {
case 0: /* Implementation is still a stub, or the code wasn't set */
is_valid = response->message == NULL;
break;
case 500: /* Internal Server Error */
case 501: /* Not Implemented */
is_valid = 1;
break;
default:
if (200 <= code && code <= 299) {
is_valid = ast_ari_validate_list(response->message,
ast_ari_validate_application_fn());
} else {
ast_log(LOG_ERROR, "Invalid error response %d for /applications\n", code);
is_valid = 0;
}
}
if (!is_valid) {
ast_log(LOG_ERROR, "Response validation failed for /applications\n");
ast_ari_response_error(response, 500,
"Internal Server Error", "Response validation failed");
}
#endif /* AST_DEVMODE */
fin: __attribute__((unused))
return;
}
/*!
* \brief Parameter parsing callback for /applications/{applicationName}.
* \param get_params GET parameters in the HTTP request.
* \param path_vars Path variables extracted from the request.
* \param headers HTTP headers.
* \param[out] response Response to the HTTP request.
*/
static void ast_ari_get_application_cb(
struct ast_variable *get_params, struct ast_variable *path_vars,
struct ast_variable *headers, struct ast_ari_response *response)
{
struct ast_get_application_args args = {};
struct ast_variable *i;
#if defined(AST_DEVMODE)
int is_valid;
int code;
#endif /* AST_DEVMODE */
for (i = path_vars; i; i = i->next) {
if (strcmp(i->name, "applicationName") == 0) {
args.application_name = (i->value);
} else
{}
}
ast_ari_get_application(headers, &args, response);
#if defined(AST_DEVMODE)
code = response->response_code;
switch (code) {
case 0: /* Implementation is still a stub, or the code wasn't set */
is_valid = response->message == NULL;
break;
case 500: /* Internal Server Error */
case 501: /* Not Implemented */
case 404: /* Application does not exist. */
is_valid = 1;
break;
default:
if (200 <= code && code <= 299) {
is_valid = ast_ari_validate_application(
response->message);
} else {
ast_log(LOG_ERROR, "Invalid error response %d for /applications/{applicationName}\n", code);
is_valid = 0;
}
}
if (!is_valid) {
ast_log(LOG_ERROR, "Response validation failed for /applications/{applicationName}\n");
ast_ari_response_error(response, 500,
"Internal Server Error", "Response validation failed");
}
#endif /* AST_DEVMODE */
fin: __attribute__((unused))
return;
}
/*!
* \brief Parameter parsing callback for /applications/{applicationName}/subscription.
* \param get_params GET parameters in the HTTP request.
* \param path_vars Path variables extracted from the request.
* \param headers HTTP headers.
* \param[out] response Response to the HTTP request.
*/
static void ast_ari_application_subscribe_cb(
struct ast_variable *get_params, struct ast_variable *path_vars,
struct ast_variable *headers, struct ast_ari_response *response)
{
struct ast_application_subscribe_args args = {};
struct ast_variable *i;
#if defined(AST_DEVMODE)
int is_valid;
int code;
#endif /* AST_DEVMODE */
for (i = get_params; i; i = i->next) {
if (strcmp(i->name, "eventSource") == 0) {
/* Parse comma separated list */
char *vals[MAX_VALS];
size_t j;
args.event_source_parse = ast_strdup(i->value);
if (!args.event_source_parse) {
ast_ari_response_alloc_failed(response);
goto fin;
}
if (strlen(args.event_source_parse) == 0) {
/* ast_app_separate_args can't handle "" */
args.event_source_count = 1;
vals[0] = args.event_source_parse;
} else {
args.event_source_count = ast_app_separate_args(
args.event_source_parse, ',', vals,
ARRAY_LEN(vals));
}
if (args.event_source_count == 0) {
ast_ari_response_alloc_failed(response);
goto fin;
}
if (args.event_source_count >= MAX_VALS) {
ast_ari_response_error(response, 400,
"Bad Request",
"Too many values for event_source");
goto fin;
}
args.event_source = ast_malloc(sizeof(*args.event_source) * args.event_source_count);
if (!args.event_source) {
ast_ari_response_alloc_failed(response);
goto fin;
}
for (j = 0; j < args.event_source_count; ++j) {
args.event_source[j] = (vals[j]);
}
} else
{}
}
for (i = path_vars; i; i = i->next) {
if (strcmp(i->name, "applicationName") == 0) {
args.application_name = (i->value);
} else
{}
}
ast_ari_application_subscribe(headers, &args, response);
#if defined(AST_DEVMODE)
code = response->response_code;
switch (code) {
case 0: /* Implementation is still a stub, or the code wasn't set */
is_valid = response->message == NULL;
break;
case 500: /* Internal Server Error */
case 501: /* Not Implemented */
case 400: /* Missing parameter. */
case 404: /* Application does not exist. */
case 422: /* Event source does not exist. */
is_valid = 1;
break;
default:
if (200 <= code && code <= 299) {
is_valid = ast_ari_validate_application(
response->message);
} else {
ast_log(LOG_ERROR, "Invalid error response %d for /applications/{applicationName}/subscription\n", code);
is_valid = 0;
}
}
if (!is_valid) {
ast_log(LOG_ERROR, "Response validation failed for /applications/{applicationName}/subscription\n");
ast_ari_response_error(response, 500,
"Internal Server Error", "Response validation failed");
}
#endif /* AST_DEVMODE */
fin: __attribute__((unused))
ast_free(args.event_source_parse);
ast_free(args.event_source);
return;
}
/*!
* \brief Parameter parsing callback for /applications/{applicationName}/subscription.
* \param get_params GET parameters in the HTTP request.
* \param path_vars Path variables extracted from the request.
* \param headers HTTP headers.
* \param[out] response Response to the HTTP request.
*/
static void ast_ari_application_unsubscribe_cb(
struct ast_variable *get_params, struct ast_variable *path_vars,
struct ast_variable *headers, struct ast_ari_response *response)
{
struct ast_application_unsubscribe_args args = {};
struct ast_variable *i;
#if defined(AST_DEVMODE)
int is_valid;
int code;
#endif /* AST_DEVMODE */
for (i = get_params; i; i = i->next) {
if (strcmp(i->name, "eventSource") == 0) {
/* Parse comma separated list */
char *vals[MAX_VALS];
size_t j;
args.event_source_parse = ast_strdup(i->value);
if (!args.event_source_parse) {
ast_ari_response_alloc_failed(response);
goto fin;
}
if (strlen(args.event_source_parse) == 0) {
/* ast_app_separate_args can't handle "" */
args.event_source_count = 1;
vals[0] = args.event_source_parse;
} else {
args.event_source_count = ast_app_separate_args(
args.event_source_parse, ',', vals,
ARRAY_LEN(vals));
}
if (args.event_source_count == 0) {
ast_ari_response_alloc_failed(response);
goto fin;
}
if (args.event_source_count >= MAX_VALS) {
ast_ari_response_error(response, 400,
"Bad Request",
"Too many values for event_source");
goto fin;
}
args.event_source = ast_malloc(sizeof(*args.event_source) * args.event_source_count);
if (!args.event_source) {
ast_ari_response_alloc_failed(response);
goto fin;
}
for (j = 0; j < args.event_source_count; ++j) {
args.event_source[j] = (vals[j]);
}
} else
{}
}
for (i = path_vars; i; i = i->next) {
if (strcmp(i->name, "applicationName") == 0) {
args.application_name = (i->value);
} else
{}
}
ast_ari_application_unsubscribe(headers, &args, response);
#if defined(AST_DEVMODE)
code = response->response_code;
switch (code) {
case 0: /* Implementation is still a stub, or the code wasn't set */
is_valid = response->message == NULL;
break;
case 500: /* Internal Server Error */
case 501: /* Not Implemented */
case 400: /* Missing parameter; event source scheme not recognized. */
case 404: /* Application does not exist. */
case 409: /* Application not subscribed to event source. */
case 422: /* Event source does not exist. */
is_valid = 1;
break;
default:
if (200 <= code && code <= 299) {
is_valid = ast_ari_validate_application(
response->message);
} else {
ast_log(LOG_ERROR, "Invalid error response %d for /applications/{applicationName}/subscription\n", code);
is_valid = 0;
}
}
if (!is_valid) {
ast_log(LOG_ERROR, "Response validation failed for /applications/{applicationName}/subscription\n");
ast_ari_response_error(response, 500,
"Internal Server Error", "Response validation failed");
}
#endif /* AST_DEVMODE */
fin: __attribute__((unused))
ast_free(args.event_source_parse);
ast_free(args.event_source);
return;
}
/*! \brief REST handler for /api-docs/applications.{format} */
static struct stasis_rest_handlers applications_applicationName_subscription = {
.path_segment = "subscription",
.callbacks = {
[AST_HTTP_POST] = ast_ari_application_subscribe_cb,
[AST_HTTP_DELETE] = ast_ari_application_unsubscribe_cb,
},
.num_children = 0,
.children = { }
};
/*! \brief REST handler for /api-docs/applications.{format} */
static struct stasis_rest_handlers applications_applicationName = {
.path_segment = "applicationName",
.is_wildcard = 1,
.callbacks = {
[AST_HTTP_GET] = ast_ari_get_application_cb,
},
.num_children = 1,
.children = { &applications_applicationName_subscription, }
};
/*! \brief REST handler for /api-docs/applications.{format} */
static struct stasis_rest_handlers applications = {
.path_segment = "applications",
.callbacks = {
[AST_HTTP_GET] = ast_ari_get_applications_cb,
},
.num_children = 1,
.children = { &applications_applicationName, }
};
static int load_module(void)
{
int res = 0;
stasis_app_ref();
res |= ast_ari_add_handler(&applications);
return res;
}
static int unload_module(void)
{
ast_ari_remove_handler(&applications);
stasis_app_unref();
return 0;
}
AST_MODULE_INFO(ASTERISK_GPL_KEY, AST_MODFLAG_DEFAULT, "RESTful API module - Stasis application resources",
.load = load_module,
.unload = unload_module,
.nonoptreq = "res_ari,res_stasis",
);

View File

@ -95,6 +95,11 @@ int ast_ari_validate_void(struct ast_json *json)
return check_type(json, AST_JSON_NULL);
}
int ast_ari_validate_object(struct ast_json *json)
{
return check_type(json, AST_JSON_OBJECT);
}
int ast_ari_validate_byte(struct ast_json *json)
{
/* Java bytes are signed, which accounts for great fun for all */

View File

@ -734,6 +734,29 @@ int stasis_app_send(const char *app_name, struct ast_json *message)
return 0;
}
static int append_name(void *obj, void *arg, int flags)
{
struct app *app = obj;
struct ao2_container *apps = arg;
ast_str_container_add(apps, app_name(app));
return 0;
}
struct ao2_container *stasis_app_get_all(void)
{
RAII_VAR(struct ao2_container *, apps, NULL, ao2_cleanup);
apps = ast_str_container_alloc(1);
if (!apps) {
return NULL;
}
ao2_callback(apps_registry, OBJ_NODATA, append_name, apps);
return ao2_bump(apps);
}
int stasis_app_register(const char *app_name, stasis_app_cb handler, void *data)
{
RAII_VAR(struct app *, app, NULL, ao2_cleanup);
@ -783,6 +806,219 @@ void stasis_app_unregister(const char *app_name)
cleanup();
}
struct ast_json *stasis_app_to_json(const char *app_name)
{
RAII_VAR(struct app *, app, NULL, ao2_cleanup);
if (app_name) {
app = ao2_find(apps_registry, app_name, OBJ_KEY);
}
if (!app) {
return NULL;
}
return app_to_json(app);
}
#define CHANNEL_SCHEME "channel:"
#define BRIDGE_SCHEME "bridge:"
#define ENDPOINT_SCHEME "endpoint:"
/*! Struct for capturing event source information */
struct event_source {
enum {
EVENT_SOURCE_CHANNEL,
EVENT_SOURCE_BRIDGE,
EVENT_SOURCE_ENDPOINT,
} event_source_type;
union {
struct ast_channel *channel;
struct ast_bridge *bridge;
struct ast_endpoint *endpoint;
};
};
enum stasis_app_subscribe_res stasis_app_subscribe(const char *app_name,
const char **event_source_uris, int event_sources_count,
struct ast_json **json)
{
RAII_VAR(struct app *, app, NULL, ao2_cleanup);
RAII_VAR(struct event_source *, event_sources, NULL, ast_free);
enum stasis_app_subscribe_res res = STASIS_ASR_OK;
int i;
if (app_name) {
app = ao2_find(apps_registry, app_name, OBJ_KEY);
}
if (!app) {
ast_log(LOG_WARNING, "Could not find app '%s'\n",
app_name ? : "(null)");
return STASIS_ASR_APP_NOT_FOUND;
}
event_sources = ast_calloc(event_sources_count, sizeof(*event_sources));
if (!event_sources) {
return STASIS_ASR_INTERNAL_ERROR;
}
for (i = 0; res == STASIS_ASR_OK && i < event_sources_count; ++i) {
const char *uri = event_source_uris[i];
ast_debug(3, "%s: Checking %s\n", app_name,
uri);
if (ast_begins_with(uri, CHANNEL_SCHEME)) {
event_sources[i].event_source_type =
EVENT_SOURCE_CHANNEL;
event_sources[i].channel = ast_channel_get_by_name(
uri + strlen(CHANNEL_SCHEME));
if (!event_sources[i].channel) {
ast_log(LOG_WARNING, "Channel not found: %s\n", uri);
res = STASIS_ASR_EVENT_SOURCE_NOT_FOUND;
}
} else if (ast_begins_with(uri, BRIDGE_SCHEME)) {
event_sources[i].event_source_type =
EVENT_SOURCE_BRIDGE;
event_sources[i].bridge = stasis_app_bridge_find_by_id(
uri + strlen(BRIDGE_SCHEME));
if (!event_sources[i].bridge) {
ast_log(LOG_WARNING, "Bridge not found: %s\n", uri);
res = STASIS_ASR_EVENT_SOURCE_NOT_FOUND;
}
} else if (ast_begins_with(uri, ENDPOINT_SCHEME)) {
event_sources[i].event_source_type =
EVENT_SOURCE_ENDPOINT;
event_sources[i].endpoint = ast_endpoint_find_by_id(
uri + strlen(ENDPOINT_SCHEME));
if (!event_sources[i].endpoint) {
ast_log(LOG_WARNING, "Endpoint not found: %s\n", uri);
res = STASIS_ASR_EVENT_SOURCE_NOT_FOUND;
}
} else {
ast_log(LOG_WARNING, "Invalid scheme: %s\n", uri);
res = STASIS_ASR_EVENT_SOURCE_BAD_SCHEME;
}
}
for (i = 0; res == STASIS_ASR_OK && i < event_sources_count; ++i) {
int sub_res = -1;
ast_debug(1, "%s: Subscribing to %s\n", app_name,
event_source_uris[i]);
switch (event_sources[i].event_source_type) {
case EVENT_SOURCE_CHANNEL:
sub_res = app_subscribe_channel(app,
event_sources[i].channel);
break;
case EVENT_SOURCE_BRIDGE:
sub_res = app_subscribe_bridge(app,
event_sources[i].bridge);
break;
case EVENT_SOURCE_ENDPOINT:
sub_res = app_subscribe_endpoint(app,
event_sources[i].endpoint);
break;
}
if (sub_res != 0) {
ast_log(LOG_WARNING,
"Error subscribing app '%s' to '%s'\n",
app_name, event_source_uris[i]);
res = STASIS_ASR_INTERNAL_ERROR;
}
}
if (res == STASIS_ASR_OK && json) {
ast_debug(1, "%s: Successful; setting results\n", app_name);
*json = app_to_json(app);
}
for (i = 0; i < event_sources_count; ++i) {
switch (event_sources[i].event_source_type) {
case EVENT_SOURCE_CHANNEL:
event_sources[i].channel =
ast_channel_cleanup(event_sources[i].channel);
break;
case EVENT_SOURCE_BRIDGE:
ao2_cleanup(event_sources[i].bridge);
event_sources[i].bridge = NULL;
break;
case EVENT_SOURCE_ENDPOINT:
ao2_cleanup(event_sources[i].endpoint);
event_sources[i].endpoint = NULL;
break;
}
}
return res;
}
enum stasis_app_subscribe_res stasis_app_unsubscribe(const char *app_name,
const char **event_source_uris, int event_sources_count,
struct ast_json **json)
{
RAII_VAR(struct app *, app, NULL, ao2_cleanup);
enum stasis_app_subscribe_res res = STASIS_ASR_OK;
int i;
if (app_name) {
ast_log(LOG_WARNING, "Could not find app '%s'\n",
app_name ? : "(null)");
app = ao2_find(apps_registry, app_name, OBJ_KEY);
}
if (!app) {
return STASIS_ASR_APP_NOT_FOUND;
}
/* Validate the input */
for (i = 0; res == STASIS_ASR_OK && i < event_sources_count; ++i) {
if (ast_begins_with(event_source_uris[i], CHANNEL_SCHEME)) {
const char *channel_id = event_source_uris[i] +
strlen(CHANNEL_SCHEME);
if (!app_is_subscribed_channel_id(app, channel_id)) {
res = STASIS_ASR_EVENT_SOURCE_NOT_FOUND;
}
} else if (ast_begins_with(event_source_uris[i], BRIDGE_SCHEME)) {
const char *bridge_id = event_source_uris[i] +
strlen(BRIDGE_SCHEME);
if (!app_is_subscribed_bridge_id(app, bridge_id)) {
res = STASIS_ASR_EVENT_SOURCE_NOT_FOUND;
}
} else if (ast_begins_with(event_source_uris[i], ENDPOINT_SCHEME)) {
const char *endpoint_id = event_source_uris[i] +
strlen(ENDPOINT_SCHEME);
if (!app_is_subscribed_endpoint_id(app, endpoint_id)) {
res = STASIS_ASR_EVENT_SOURCE_NOT_FOUND;
}
} else {
res = STASIS_ASR_EVENT_SOURCE_BAD_SCHEME;
}
}
for (i = 0; res == STASIS_ASR_OK && i < event_sources_count; ++i) {
if (ast_begins_with(event_source_uris[i], CHANNEL_SCHEME)) {
const char *channel_id = event_source_uris[i] +
strlen(CHANNEL_SCHEME);
app_unsubscribe_channel_id(app, channel_id);
} else if (ast_begins_with(event_source_uris[i], BRIDGE_SCHEME)) {
const char *bridge_id = event_source_uris[i] +
strlen(BRIDGE_SCHEME);
app_unsubscribe_bridge_id(app, bridge_id);
} else if (ast_begins_with(event_source_uris[i], ENDPOINT_SCHEME)) {
const char *endpoint_id = event_source_uris[i] +
strlen(ENDPOINT_SCHEME);
app_unsubscribe_endpoint_id(app, endpoint_id);
}
}
if (res == STASIS_ASR_OK && json) {
*json = app_to_json(app);
}
return res;
}
void stasis_app_ref(void)
{
ast_module_ref(ast_module_info->self);

View File

@ -33,6 +33,7 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
#include "asterisk/stasis_app.h"
#include "asterisk/stasis_bridges.h"
#include "asterisk/stasis_channels.h"
#include "asterisk/stasis_endpoints.h"
#include "asterisk/stasis_message_router.h"
struct app {
@ -52,6 +53,12 @@ struct app {
char name[];
};
enum forward_type {
FORWARD_CHANNEL,
FORWARD_BRIDGE,
FORWARD_ENDPOINT,
};
/*! Subscription info for a particular channel/bridge. */
struct app_forwards {
/*! Count of number of times this channel/bridge has been subscribed */
@ -62,6 +69,8 @@ struct app_forwards {
/*! Forward for the caching topic */
struct stasis_forward *topic_cached_forward;
/* Type of object being forwarded */
enum forward_type forward_type;
/*! Unique id of the object being forwarded */
char id[];
};
@ -119,6 +128,7 @@ static struct app_forwards *forwards_create_channel(struct app *app,
return NULL;
}
forwards->forward_type = FORWARD_CHANNEL;
forwards->topic_forward = stasis_forward_all(ast_channel_topic(chan),
app->topic);
if (!forwards->topic_forward) {
@ -153,6 +163,7 @@ static struct app_forwards *forwards_create_bridge(struct app *app,
return NULL;
}
forwards->forward_type = FORWARD_BRIDGE;
forwards->topic_forward = stasis_forward_all(ast_bridge_topic(bridge),
app->topic);
if (!forwards->topic_forward) {
@ -172,6 +183,41 @@ static struct app_forwards *forwards_create_bridge(struct app *app,
return forwards;
}
/*! Forward a endpoint's topics to an app */
static struct app_forwards *forwards_create_endpoint(struct app *app,
struct ast_endpoint *endpoint)
{
RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
if (!app || !endpoint) {
return NULL;
}
forwards = forwards_create(app, ast_endpoint_get_id(endpoint));
if (!forwards) {
return NULL;
}
forwards->forward_type = FORWARD_ENDPOINT;
forwards->topic_forward = stasis_forward_all(ast_endpoint_topic(endpoint),
app->topic);
if (!forwards->topic_forward) {
return NULL;
}
forwards->topic_cached_forward = stasis_forward_all(
ast_endpoint_topic_cached(endpoint), app->topic);
if (!forwards->topic_cached_forward) {
/* Half-subscribed is a bad thing */
stasis_forward_cancel(forwards->topic_forward);
forwards->topic_forward = NULL;
return NULL;
}
ao2_ref(forwards, +1);
return forwards;
}
static int forwards_sort(const void *obj_left, const void *obj_right, int flags)
{
const struct app_forwards *object_left = obj_left;
@ -397,6 +443,47 @@ static void sub_channel_update_handler(void *data,
}
}
static struct ast_json *simple_endpoint_event(
const char *type,
struct ast_endpoint_snapshot *snapshot,
const struct timeval *tv)
{
return ast_json_pack("{s: s, s: o, s: o}",
"type", type,
"timestamp", ast_json_timeval(*tv, NULL),
"endpoint", ast_endpoint_snapshot_to_json(snapshot));
}
static void sub_endpoint_update_handler(void *data,
struct stasis_subscription *sub,
struct stasis_message *message)
{
RAII_VAR(struct ast_json *, json, NULL, ast_json_unref);
struct app *app = data;
struct stasis_cache_update *update;
struct ast_endpoint_snapshot *new_snapshot;
const struct timeval *tv;
ast_assert(stasis_message_type(message) == stasis_cache_update_type());
update = stasis_message_data(message);
ast_assert(update->type == ast_endpoint_snapshot_type());
new_snapshot = stasis_message_data(update->new_snapshot);
tv = update->new_snapshot ?
stasis_message_timestamp(update->new_snapshot) :
stasis_message_timestamp(message);
json = simple_endpoint_event("EndpointStateChange", new_snapshot, tv);
if (!json) {
return;
}
app_send(app, json);
}
static struct ast_json *simple_bridge_event(
const char *type,
struct ast_bridge_snapshot *snapshot,
@ -526,6 +613,9 @@ struct app *app_create(const char *name, stasis_app_cb handler, void *data)
res |= stasis_message_router_add_cache_update(app->router,
ast_channel_snapshot_type(), sub_channel_update_handler, app);
res |= stasis_message_router_add_cache_update(app->router,
ast_endpoint_snapshot_type(), sub_endpoint_update_handler, app);
res |= stasis_message_router_set_default(app->router,
sub_default_handler, app);
@ -640,6 +730,56 @@ const char *app_name(const struct app *app)
return app->name;
}
struct ast_json *app_to_json(const struct app *app)
{
RAII_VAR(struct ast_json *, json, NULL, ast_json_unref);
struct ast_json *channels;
struct ast_json *bridges;
struct ast_json *endpoints;
struct ao2_iterator i;
void *obj;
json = ast_json_pack("{s: s, s: [], s: [], s: []}",
"name", app->name,
"channel_ids", "bridge_ids", "endpoint_ids");
channels = ast_json_object_get(json, "channel_ids");
bridges = ast_json_object_get(json, "bridge_ids");
endpoints = ast_json_object_get(json, "endpoint_ids");
i = ao2_iterator_init(app->forwards, 0);
while ((obj = ao2_iterator_next(&i))) {
RAII_VAR(struct app_forwards *, forwards, obj, ao2_cleanup);
RAII_VAR(struct ast_json *, id, NULL, ast_json_unref);
int append_res = -1;
id = ast_json_string_create(forwards->id);
switch (forwards->forward_type) {
case FORWARD_CHANNEL:
append_res = ast_json_array_append(channels,
ast_json_ref(id));
break;
case FORWARD_BRIDGE:
append_res = ast_json_array_append(bridges,
ast_json_ref(id));
break;
case FORWARD_ENDPOINT:
append_res = ast_json_array_append(endpoints,
ast_json_ref(id));
break;
}
if (append_res != 0) {
ast_log(LOG_ERROR, "Error building response\n");
ao2_iterator_destroy(&i);
return NULL;
}
}
ao2_iterator_destroy(&i);
return ast_json_ref(json);
}
int app_subscribe_channel(struct app *app, struct ast_channel *chan)
{
int res;
@ -678,8 +818,8 @@ static int unsubscribe(struct app *app, const char *kind, const char *id)
forwards = ao2_find(app->forwards, id, OBJ_SEARCH_KEY | OBJ_NOLOCK);
if (!forwards) {
ast_log(LOG_ERROR,
"App '%s' not subscribed to %s '%s'",
ast_log(LOG_WARNING,
"App '%s' not subscribed to %s '%s'\n",
app->name, kind, id);
return -1;
}
@ -701,7 +841,23 @@ int app_unsubscribe_channel(struct app *app, struct ast_channel *chan)
return -1;
}
return unsubscribe(app, "channel", ast_channel_uniqueid(chan));
return app_unsubscribe_channel_id(app, ast_channel_uniqueid(chan));
}
int app_unsubscribe_channel_id(struct app *app, const char *channel_id)
{
if (!app || !channel_id) {
return -1;
}
return unsubscribe(app, "channel", channel_id);
}
int app_is_subscribed_channel_id(struct app *app, const char *channel_id)
{
RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
forwards = ao2_find(app->forwards, channel_id, OBJ_SEARCH_KEY);
return forwards != NULL;
}
int app_subscribe_bridge(struct app *app, struct ast_bridge *bridge)
@ -735,5 +891,46 @@ int app_unsubscribe_bridge(struct app *app, struct ast_bridge *bridge)
return -1;
}
return unsubscribe(app, "bridge", bridge->uniqueid);
return app_unsubscribe_bridge_id(app, bridge->uniqueid);
}
int app_unsubscribe_bridge_id(struct app *app, const char *bridge_id)
{
if (!app || !bridge_id) {
return -1;
}
return unsubscribe(app, "bridge", bridge_id);
}
int app_is_subscribed_bridge_id(struct app *app, const char *bridge_id)
{
RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
forwards = ao2_find(app->forwards, bridge_id, OBJ_SEARCH_KEY);
return forwards != NULL;
}
int app_subscribe_endpoint(struct app *app, struct ast_endpoint *endpoint)
{
if (!app || !endpoint) {
return -1;
} else {
RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
SCOPED_AO2LOCK(lock, app->forwards);
forwards = ao2_find(app->forwards, ast_endpoint_get_id(endpoint),
OBJ_SEARCH_KEY | OBJ_NOLOCK);
if (!forwards) {
/* Forwards not found, create one */
forwards = forwards_create_endpoint(app, endpoint);
if (!forwards) {
return -1;
}
ao2_link_flags(app->forwards, forwards, OBJ_NOLOCK);
}
++forwards->interested;
return 0;
}
}

View File

@ -114,6 +114,8 @@ void app_send(struct app *app, struct ast_json *message);
struct app_forwards;
struct ast_json *app_to_json(const struct app *app);
/*!
* \brief Subscribes an application to a channel.
*
@ -128,10 +130,32 @@ int app_subscribe_channel(struct app *app, struct ast_channel *chan);
* \brief Cancel the subscription an app has for a channel.
*
* \param app Subscribing application.
* \param forwards Returned object from app_subscribe_channel().
* \param chan Channel to unsubscribe from.
* \return 0 on success.
* \return Non-zero on error.
*/
int app_unsubscribe_channel(struct app *app, struct ast_channel *chan);
/*!
* \brief Cancel the subscription an app has for a channel.
*
* \param app Subscribing application.
* \param channel_id Id of channel to unsubscribe from.
* \return 0 on success.
* \return Non-zero on error.
*/
int app_unsubscribe_channel_id(struct app *app, const char *channel_id);
/*!
* \brief Test if an app is subscribed to a channel.
*
* \param app Subscribing application.
* \param channel_id Id of channel to check.
* \return True (non-zero) if channel is subscribed to \a app.
* \return False (zero) if channel is not subscribed.
*/
int app_is_subscribed_channel_id(struct app *app, const char *channel_id);
/*!
* \brief Add a bridge subscription to an existing channel subscription.
*
@ -152,4 +176,54 @@ int app_subscribe_bridge(struct app *app, struct ast_bridge *bridge);
*/
int app_unsubscribe_bridge(struct app *app, struct ast_bridge *bridge);
/*!
* \brief Cancel the subscription an app has for a bridge.
*
* \param app Subscribing application.
* \param bridge_id Id of bridge to unsubscribe from.
* \return 0 on success.
* \return Non-zero on error.
*/
int app_unsubscribe_bridge_id(struct app *app, const char *bridge_id);
/*!
* \brief Test if an app is subscribed to a bridge.
*
* \param app Subscribing application.
* \param bridge_id Id of bridge to check.
* \return True (non-zero) if bridge is subscribed to \a app.
* \return False (zero) if bridge is not subscribed.
*/
int app_is_subscribed_bridge_id(struct app *app, const char *bridge_id);
/*!
* \brief Subscribes an application to a endpoint.
*
* \param app Application.
* \param chan Endpoint to subscribe to.
* \return 0 on success.
* \return Non-zero on error.
*/
int app_subscribe_endpoint(struct app *app, struct ast_endpoint *endpoint);
/*!
* \brief Cancel the subscription an app has for a endpoint.
*
* \param app Subscribing application.
* \param endpoint_id Id of endpoint to unsubscribe from.
* \return 0 on success.
* \return Non-zero on error.
*/
int app_unsubscribe_endpoint_id(struct app *app, const char *endpoint_id);
/*!
* \brief Test if an app is subscribed to a endpoint.
*
* \param app Subscribing application.
* \param endpoint_id Id of endpoint to check.
* \return True (non-zero) if endpoint is subscribed to \a app.
* \return False (zero) if endpoint is not subscribed.
*/
int app_is_subscribed_endpoint_id(struct app *app, const char *endpoint_id);
#endif /* _ASTERISK_RES_STASIS_APP_H */

View File

@ -52,6 +52,15 @@
*/
int ast_ari_validate_void(struct ast_json *json);
/*!
* \brief Validator for native Swagger object.
*
* \param json JSON object to validate.
* \returns True (non-zero) if valid.
* \returns False (zero) if invalid.
*/
int ast_ari_validate_object(struct ast_json *json);
/*!
* \brief Validator for native Swagger byte.
*

View File

@ -13,7 +13,7 @@
"operations": [
{
"httpMethod": "GET",
"summary": "List all endoints.",
"summary": "List all endpoints.",
"nickname": "getEndpoints",
"responseClass": "List[Endpoint]"
}

View File

@ -293,6 +293,11 @@
"required": true,
"type": "Channel",
"description": "The channel that signaled the user event."
},
"userevent": {
"required": true,
"type": "object",
"description": "Custom Userevent data"
}
}
},
@ -338,6 +343,17 @@
}
}
},
"EndpointStateChange": {
"id": "EndpointStateChange",
"extends": "Event",
"description": "Endpoint state changed.",
"properties": {
"endpoint": {
"required": true,
"type": "Endpoint"
}
}
},
"StasisEnd": {
"id": "StasisEnd",
"extends": "Event",

View File

@ -37,6 +37,10 @@
{
"path": "/api-docs/events.{format}",
"description": "WebSocket resource"
},
{
"path": "/api-docs/applications.{format}",
"description": "Stasis application resources"
}
]
}