asterisk/res/res_ari_events.c

476 lines
12 KiB
C
Raw Permalink Normal View History

/*
* Asterisk -- An open source telephony toolkit.
*
* Copyright (C) 2012 - 2013, Digium, Inc.
*
* David M. Lee, II <dlee@digium.com>
*
* See http://www.asterisk.org for more information about
* the Asterisk project. Please do not directly contact
* any of the maintainers of this project for assistance;
* the project provides a web site, mailing lists and IRC
* channels for your use.
*
* This program is free software, distributed under the terms of
* the GNU General Public License Version 2. See the LICENSE file
* at the top of the source tree.
*/
/*
* !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
* !!!!! DO NOT EDIT !!!!!
* !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
* This file is generated by a mustache template. Please see the original
* template in rest-api-templates/res_ari_resource.c.mustache
*/
/*! \file
*
* \brief WebSocket resource
*
* \author David M. Lee, II <dlee@digium.com>
*/
/*** MODULEINFO
<depend type="module">res_ari</depend>
<depend type="module">res_ari_model</depend>
<depend type="module">res_stasis</depend>
<depend type="module">res_http_websocket</depend>
<support_level>core</support_level>
***/
#include "asterisk.h"
#include "asterisk/app.h"
#include "asterisk/module.h"
#include "asterisk/stasis_app.h"
#include "ari/resource_events.h"
Update events to use Swagger 1.3 subtyping, and related aftermath This patch started with the simple idea of changing the /events data model to be more sane. The original model would send out events like: { "stasis_start": { "args": [], "channel": { ... } } } The event discriminator was the field name instead of being a value in the object, due to limitations in how Swagger 1.1 could model objects. While technically sufficient in communicating event information, it was really difficult to deal with in terms of client side JSON handling. This patch takes advantage of a proposed extension[1] to Swagger which allows type variance through the use of a discriminator field. This had a domino effect that made this a surprisingly large patch. [1]: https://groups.google.com/d/msg/wordnik-api/EC3rGajE0os/ey_5dBI_jWcJ In changing the models, I also had to change the swagger_model.py processor so it can handle the type discriminator and subtyping. I took that a big step forward, and using that information to generate an ari_model module, which can validate a JSON object against the Swagger model. The REST and WebSocket generators were changed to take advantage of the validators. If compiled with AST_DEVMODE enabled, JSON objects that don't match their corresponding models will not be sent out. For REST API calls, a 500 Internal Server response is sent. For WebSockets, the invalid JSON message is replaced with an error message. Since this took over about half of the job of the existing JSON generators, and the .to_json virtual function on messages took over the other half, I reluctantly removed the generators. The validators turned up all sorts of errors and inconsistencies in our data models, and the code. These were cleaned up, with checks in the code generator avoid some of the consistency problems in the future. * The model for a channel snapshot was trimmed down to match the information sent via AMI. Many of the field being sent were not useful in the general case. * The model for a bridge snapshot was updated to be more consistent with the other ARI models. Another impact of introducing subtyping was that the swagger-codegen documentation generator was insufficient (at least until it catches up with Swagger 1.2). I wanted it to be easier to generate docs for the API anyways, so I ported the wiki pages to use the Asterisk Swagger generator. In the process, I was able to clean up many of the model links, which would occasionally give inconsistent results on the wiki. I also added error responses to the wiki docs, making the wiki documentation more complete. Finally, since Stasis-HTTP will now be named Asterisk REST Interface (ARI), any new functions and files I created carry the ari_ prefix. I changed a few stasis_http references to ari where it was non-intrusive and made sense. (closes issue ASTERISK-21885) Review: https://reviewboard.asterisk.org/r/2639/ git-svn-id: https://origsvn.digium.com/svn/asterisk/trunk@393529 65c4cc65-6c06-0410-ace0-fbb531ad65f3
2013-07-03 16:32:41 +00:00
#if defined(AST_DEVMODE)
#include "ari/ari_model_validators.h"
Update events to use Swagger 1.3 subtyping, and related aftermath This patch started with the simple idea of changing the /events data model to be more sane. The original model would send out events like: { "stasis_start": { "args": [], "channel": { ... } } } The event discriminator was the field name instead of being a value in the object, due to limitations in how Swagger 1.1 could model objects. While technically sufficient in communicating event information, it was really difficult to deal with in terms of client side JSON handling. This patch takes advantage of a proposed extension[1] to Swagger which allows type variance through the use of a discriminator field. This had a domino effect that made this a surprisingly large patch. [1]: https://groups.google.com/d/msg/wordnik-api/EC3rGajE0os/ey_5dBI_jWcJ In changing the models, I also had to change the swagger_model.py processor so it can handle the type discriminator and subtyping. I took that a big step forward, and using that information to generate an ari_model module, which can validate a JSON object against the Swagger model. The REST and WebSocket generators were changed to take advantage of the validators. If compiled with AST_DEVMODE enabled, JSON objects that don't match their corresponding models will not be sent out. For REST API calls, a 500 Internal Server response is sent. For WebSockets, the invalid JSON message is replaced with an error message. Since this took over about half of the job of the existing JSON generators, and the .to_json virtual function on messages took over the other half, I reluctantly removed the generators. The validators turned up all sorts of errors and inconsistencies in our data models, and the code. These were cleaned up, with checks in the code generator avoid some of the consistency problems in the future. * The model for a channel snapshot was trimmed down to match the information sent via AMI. Many of the field being sent were not useful in the general case. * The model for a bridge snapshot was updated to be more consistent with the other ARI models. Another impact of introducing subtyping was that the swagger-codegen documentation generator was insufficient (at least until it catches up with Swagger 1.2). I wanted it to be easier to generate docs for the API anyways, so I ported the wiki pages to use the Asterisk Swagger generator. In the process, I was able to clean up many of the model links, which would occasionally give inconsistent results on the wiki. I also added error responses to the wiki docs, making the wiki documentation more complete. Finally, since Stasis-HTTP will now be named Asterisk REST Interface (ARI), any new functions and files I created carry the ari_ prefix. I changed a few stasis_http references to ari where it was non-intrusive and made sense. (closes issue ASTERISK-21885) Review: https://reviewboard.asterisk.org/r/2639/ git-svn-id: https://origsvn.digium.com/svn/asterisk/trunk@393529 65c4cc65-6c06-0410-ace0-fbb531ad65f3
2013-07-03 16:32:41 +00:00
#endif
optional_api: Fix linking problems between modules that export global symbols With the new work in Asterisk 12, there are some uses of the optional_api that are prone to failure. The details are rather involved, and captured on [the wiki][1]. This patch addresses the issue by removing almost all of the magic from the optional API implementation. Instead of relying on weak symbol resolution, a new optional_api.c module was added to Asterisk core. For modules providing an optional API, the pointer to the implementation function is registered with the core. For modules that use an optional API, a pointer to a stub function, along with a optional_ref function pointer are registered with the core. The optional_ref function pointers is set to the implementation function when it's provided, or the stub function when it's now. Since the implementation no longer relies on magic, it is now supported on all platforms. In the spirit of choice, an OPTIONAL_API flag was added, so we can disable the optional_api if needed (maybe it's buggy on some bizarre platform I haven't tested on) The AST_OPTIONAL_API*() macros themselves remained unchanged, so existing code could remain unchanged. But to help with debugging the optional_api, the patch limits the #include of optional API's to just the modules using the API. This also reduces resource waste maintaining optional_ref pointers that aren't used. Other changes made as a part of this patch: * The stubs for http_websocket that wrap system calls set errno to ENOSYS. * res_http_websocket now properly increments module use count. * In loader.c, the while() wrappers around dlclose() were removed. The while(!dlclose()) is actually an anti-pattern, which can lead to infinite loops if the module you're attempting to unload exports a symbol that was directly linked to. * The special handling of nonoptreq on systems without weak symbol support was removed, since we no longer rely on weak symbols for optional_api. [1]: https://wiki.asterisk.org/wiki/x/wACUAQ (closes issue ASTERISK-22296) Reported by: Matt Jordan Review: https://reviewboard.asterisk.org/r/2797/ ........ Merged revisions 397989 from http://svn.asterisk.org/svn/asterisk/branches/12 git-svn-id: https://origsvn.digium.com/svn/asterisk/trunk@397990 65c4cc65-6c06-0410-ace0-fbb531ad65f3
2013-08-30 13:40:27 +00:00
#include "asterisk/http_websocket.h"
#define MAX_VALS 128
ARI: Channels added to Stasis application during WebSocket creation ... Prior to ASTERISK-24988, the WebSocket handshake was resolved before Stasis applications were registered. This was done such that the WebSocket would be ready when an application is registered. However, by creating the WebSocket first, the client had the ability to make requests for the Stasis application it thought had been created with the initial handshake request. The inevitable conclusion of this scenario was the cart being put before the horse. ASTERISK-24988 resolved half of the problem by ensuring that the applications were created and registered with Stasis prior to completing the handshake with the client. While this meant that Stasis was ready when the client received the green-light from Asterisk, it also meant that the WebSocket was not yet ready for Stasis to dispatch messages. This patch introduces a message queuing mechanism for delaying messages from Stasis applications while the WebSocket is being constructed. When the ARI event processor receives the message from the WebSocket that it is being created, the event processor instantiates an event session which contains a message queue. It then tries to create and register the requested applications with Stasis. Messages that are dispatched from Stasis between this point and the point at which the event processor is notified the WebSocket is ready, are stashed in the queue. Once the WebSocket has been built, the queue's messages are dispatched in the order in which they were originally received and the queue is concurrently cleared. ASTERISK-25181 #close Reported By: Matt Jordan Change-Id: Iafef7b85a2e0bf78c114db4c87ffc3d16d671a17
2015-07-31 16:27:23 +00:00
static int ast_ari_events_event_websocket_ws_attempted_cb(struct ast_tcptls_session_instance *ser,
struct ast_variable *get_params, struct ast_variable *headers, const char *session_id)
{
struct ast_ari_events_event_websocket_args args = {};
int res = 0;
RAII_VAR(struct ast_ari_response *, response, NULL, ast_free);
struct ast_variable *i;
response = ast_calloc(1, sizeof(*response));
if (!response) {
ast_log(LOG_ERROR, "Failed to create response.\n");
goto fin;
}
for (i = get_params; i; i = i->next) {
if (strcmp(i->name, "app") == 0) {
/* Parse comma separated list */
char *vals[MAX_VALS];
size_t j;
args.app_parse = ast_strdup(i->value);
if (!args.app_parse) {
ast_ari_response_alloc_failed(response);
goto fin;
}
if (strlen(args.app_parse) == 0) {
/* ast_app_separate_args can't handle "" */
args.app_count = 1;
vals[0] = args.app_parse;
} else {
args.app_count = ast_app_separate_args(
args.app_parse, ',', vals,
ARRAY_LEN(vals));
}
if (args.app_count == 0) {
ast_ari_response_alloc_failed(response);
goto fin;
}
if (args.app_count >= MAX_VALS) {
ast_ari_response_error(response, 400,
"Bad Request",
"Too many values for app");
goto fin;
}
args.app = ast_malloc(sizeof(*args.app) * args.app_count);
if (!args.app) {
ast_ari_response_alloc_failed(response);
goto fin;
}
for (j = 0; j < args.app_count; ++j) {
args.app[j] = (vals[j]);
}
} else
if (strcmp(i->name, "subscribeAll") == 0) {
args.subscribe_all = ast_true(i->value);
} else
{}
}
ARI: Channels added to Stasis application during WebSocket creation ... Prior to ASTERISK-24988, the WebSocket handshake was resolved before Stasis applications were registered. This was done such that the WebSocket would be ready when an application is registered. However, by creating the WebSocket first, the client had the ability to make requests for the Stasis application it thought had been created with the initial handshake request. The inevitable conclusion of this scenario was the cart being put before the horse. ASTERISK-24988 resolved half of the problem by ensuring that the applications were created and registered with Stasis prior to completing the handshake with the client. While this meant that Stasis was ready when the client received the green-light from Asterisk, it also meant that the WebSocket was not yet ready for Stasis to dispatch messages. This patch introduces a message queuing mechanism for delaying messages from Stasis applications while the WebSocket is being constructed. When the ARI event processor receives the message from the WebSocket that it is being created, the event processor instantiates an event session which contains a message queue. It then tries to create and register the requested applications with Stasis. Messages that are dispatched from Stasis between this point and the point at which the event processor is notified the WebSocket is ready, are stashed in the queue. Once the WebSocket has been built, the queue's messages are dispatched in the order in which they were originally received and the queue is concurrently cleared. ASTERISK-25181 #close Reported By: Matt Jordan Change-Id: Iafef7b85a2e0bf78c114db4c87ffc3d16d671a17
2015-07-31 16:27:23 +00:00
res = ast_ari_websocket_events_event_websocket_attempted(ser, headers, &args, session_id);
fin: __attribute__((unused))
if (!response) {
ast_http_error(ser, 500, "Server Error", "Memory allocation error");
res = -1;
} else if (response->response_code != 0) {
/* Param parsing failure */
RAII_VAR(char *, msg, NULL, ast_json_free);
if (response->message) {
msg = ast_json_dump_string(response->message);
} else {
ast_log(LOG_ERROR, "Missing response message\n");
}
if (msg) {
ast_http_error(ser, response->response_code, response->response_text, msg);
}
res = -1;
}
ast_free(args.app_parse);
ast_free(args.app);
return res;
}
static void ast_ari_events_event_websocket_ws_established_cb(struct ast_websocket *ws_session,
struct ast_variable *get_params, struct ast_variable *headers)
{
struct ast_ari_events_event_websocket_args args = {};
RAII_VAR(struct ast_ari_response *, response, NULL, ast_free);
struct ast_variable *i;
RAII_VAR(struct ast_websocket *, s, ws_session, ast_websocket_unref);
RAII_VAR(struct ast_ari_websocket_session *, session, NULL, ao2_cleanup);
SCOPED_MODULE_USE(ast_module_info->self);
response = ast_calloc(1, sizeof(*response));
if (!response) {
ast_log(LOG_ERROR, "Failed to create response.\n");
goto fin;
}
Update events to use Swagger 1.3 subtyping, and related aftermath This patch started with the simple idea of changing the /events data model to be more sane. The original model would send out events like: { "stasis_start": { "args": [], "channel": { ... } } } The event discriminator was the field name instead of being a value in the object, due to limitations in how Swagger 1.1 could model objects. While technically sufficient in communicating event information, it was really difficult to deal with in terms of client side JSON handling. This patch takes advantage of a proposed extension[1] to Swagger which allows type variance through the use of a discriminator field. This had a domino effect that made this a surprisingly large patch. [1]: https://groups.google.com/d/msg/wordnik-api/EC3rGajE0os/ey_5dBI_jWcJ In changing the models, I also had to change the swagger_model.py processor so it can handle the type discriminator and subtyping. I took that a big step forward, and using that information to generate an ari_model module, which can validate a JSON object against the Swagger model. The REST and WebSocket generators were changed to take advantage of the validators. If compiled with AST_DEVMODE enabled, JSON objects that don't match their corresponding models will not be sent out. For REST API calls, a 500 Internal Server response is sent. For WebSockets, the invalid JSON message is replaced with an error message. Since this took over about half of the job of the existing JSON generators, and the .to_json virtual function on messages took over the other half, I reluctantly removed the generators. The validators turned up all sorts of errors and inconsistencies in our data models, and the code. These were cleaned up, with checks in the code generator avoid some of the consistency problems in the future. * The model for a channel snapshot was trimmed down to match the information sent via AMI. Many of the field being sent were not useful in the general case. * The model for a bridge snapshot was updated to be more consistent with the other ARI models. Another impact of introducing subtyping was that the swagger-codegen documentation generator was insufficient (at least until it catches up with Swagger 1.2). I wanted it to be easier to generate docs for the API anyways, so I ported the wiki pages to use the Asterisk Swagger generator. In the process, I was able to clean up many of the model links, which would occasionally give inconsistent results on the wiki. I also added error responses to the wiki docs, making the wiki documentation more complete. Finally, since Stasis-HTTP will now be named Asterisk REST Interface (ARI), any new functions and files I created carry the ari_ prefix. I changed a few stasis_http references to ari where it was non-intrusive and made sense. (closes issue ASTERISK-21885) Review: https://reviewboard.asterisk.org/r/2639/ git-svn-id: https://origsvn.digium.com/svn/asterisk/trunk@393529 65c4cc65-6c06-0410-ace0-fbb531ad65f3
2013-07-03 16:32:41 +00:00
#if defined(AST_DEVMODE)
session = ast_ari_websocket_session_create(ws_session,
ast_ari_validate_message_fn());
Update events to use Swagger 1.3 subtyping, and related aftermath This patch started with the simple idea of changing the /events data model to be more sane. The original model would send out events like: { "stasis_start": { "args": [], "channel": { ... } } } The event discriminator was the field name instead of being a value in the object, due to limitations in how Swagger 1.1 could model objects. While technically sufficient in communicating event information, it was really difficult to deal with in terms of client side JSON handling. This patch takes advantage of a proposed extension[1] to Swagger which allows type variance through the use of a discriminator field. This had a domino effect that made this a surprisingly large patch. [1]: https://groups.google.com/d/msg/wordnik-api/EC3rGajE0os/ey_5dBI_jWcJ In changing the models, I also had to change the swagger_model.py processor so it can handle the type discriminator and subtyping. I took that a big step forward, and using that information to generate an ari_model module, which can validate a JSON object against the Swagger model. The REST and WebSocket generators were changed to take advantage of the validators. If compiled with AST_DEVMODE enabled, JSON objects that don't match their corresponding models will not be sent out. For REST API calls, a 500 Internal Server response is sent. For WebSockets, the invalid JSON message is replaced with an error message. Since this took over about half of the job of the existing JSON generators, and the .to_json virtual function on messages took over the other half, I reluctantly removed the generators. The validators turned up all sorts of errors and inconsistencies in our data models, and the code. These were cleaned up, with checks in the code generator avoid some of the consistency problems in the future. * The model for a channel snapshot was trimmed down to match the information sent via AMI. Many of the field being sent were not useful in the general case. * The model for a bridge snapshot was updated to be more consistent with the other ARI models. Another impact of introducing subtyping was that the swagger-codegen documentation generator was insufficient (at least until it catches up with Swagger 1.2). I wanted it to be easier to generate docs for the API anyways, so I ported the wiki pages to use the Asterisk Swagger generator. In the process, I was able to clean up many of the model links, which would occasionally give inconsistent results on the wiki. I also added error responses to the wiki docs, making the wiki documentation more complete. Finally, since Stasis-HTTP will now be named Asterisk REST Interface (ARI), any new functions and files I created carry the ari_ prefix. I changed a few stasis_http references to ari where it was non-intrusive and made sense. (closes issue ASTERISK-21885) Review: https://reviewboard.asterisk.org/r/2639/ git-svn-id: https://origsvn.digium.com/svn/asterisk/trunk@393529 65c4cc65-6c06-0410-ace0-fbb531ad65f3
2013-07-03 16:32:41 +00:00
#else
session = ast_ari_websocket_session_create(ws_session, NULL);
Update events to use Swagger 1.3 subtyping, and related aftermath This patch started with the simple idea of changing the /events data model to be more sane. The original model would send out events like: { "stasis_start": { "args": [], "channel": { ... } } } The event discriminator was the field name instead of being a value in the object, due to limitations in how Swagger 1.1 could model objects. While technically sufficient in communicating event information, it was really difficult to deal with in terms of client side JSON handling. This patch takes advantage of a proposed extension[1] to Swagger which allows type variance through the use of a discriminator field. This had a domino effect that made this a surprisingly large patch. [1]: https://groups.google.com/d/msg/wordnik-api/EC3rGajE0os/ey_5dBI_jWcJ In changing the models, I also had to change the swagger_model.py processor so it can handle the type discriminator and subtyping. I took that a big step forward, and using that information to generate an ari_model module, which can validate a JSON object against the Swagger model. The REST and WebSocket generators were changed to take advantage of the validators. If compiled with AST_DEVMODE enabled, JSON objects that don't match their corresponding models will not be sent out. For REST API calls, a 500 Internal Server response is sent. For WebSockets, the invalid JSON message is replaced with an error message. Since this took over about half of the job of the existing JSON generators, and the .to_json virtual function on messages took over the other half, I reluctantly removed the generators. The validators turned up all sorts of errors and inconsistencies in our data models, and the code. These were cleaned up, with checks in the code generator avoid some of the consistency problems in the future. * The model for a channel snapshot was trimmed down to match the information sent via AMI. Many of the field being sent were not useful in the general case. * The model for a bridge snapshot was updated to be more consistent with the other ARI models. Another impact of introducing subtyping was that the swagger-codegen documentation generator was insufficient (at least until it catches up with Swagger 1.2). I wanted it to be easier to generate docs for the API anyways, so I ported the wiki pages to use the Asterisk Swagger generator. In the process, I was able to clean up many of the model links, which would occasionally give inconsistent results on the wiki. I also added error responses to the wiki docs, making the wiki documentation more complete. Finally, since Stasis-HTTP will now be named Asterisk REST Interface (ARI), any new functions and files I created carry the ari_ prefix. I changed a few stasis_http references to ari where it was non-intrusive and made sense. (closes issue ASTERISK-21885) Review: https://reviewboard.asterisk.org/r/2639/ git-svn-id: https://origsvn.digium.com/svn/asterisk/trunk@393529 65c4cc65-6c06-0410-ace0-fbb531ad65f3
2013-07-03 16:32:41 +00:00
#endif
if (!session) {
ast_log(LOG_ERROR, "Failed to create ARI session\n");
goto fin;
}
for (i = get_params; i; i = i->next) {
if (strcmp(i->name, "app") == 0) {
/* Parse comma separated list */
char *vals[MAX_VALS];
size_t j;
args.app_parse = ast_strdup(i->value);
if (!args.app_parse) {
ast_ari_response_alloc_failed(response);
goto fin;
}
ARI: WebSocket event cleanup Stasis events (which get distributed over the ARI WebSocket) are created by subscribing to the channel_all_cached and bridge_all_cached topics, filtering out events for channels/bridges currently subscribed to. There are two issues with that. First was a race condition, where messages in-flight to the master subscribe-to-all-things topic would get sent out, even though the events happened before the channel was put into Stasis. Secondly, as the number of channels and bridges grow in the system, the work spent filtering messages becomes excessive. Since r395954, individual channels and bridges have caching topics, and can be subscribed to individually. This patch takes advantage, so that channels and bridges are subscribed to on demand, instead of filtering the global topics. The one case where filtering is still required is handling BridgeMerge messages, which are published directly to the bridge_all topic. Other than the change to how subscriptions work, this patch mostly just moves code around. Most of the work generating JSON objects from messages was moved to .to_json handlers on the message types. The callback functions handling app subscriptions were moved from res_stasis (b/c they were global to the model) to stasis/app.c (b/c they are local to the app now). (closes issue ASTERISK-21969) Reported by: Matt Jordan Review: https://reviewboard.asterisk.org/r/2754/ ........ Merged revisions 397816 from http://svn.asterisk.org/svn/asterisk/branches/12 git-svn-id: https://origsvn.digium.com/svn/asterisk/trunk@397820 65c4cc65-6c06-0410-ace0-fbb531ad65f3
2013-08-27 19:19:36 +00:00
if (strlen(args.app_parse) == 0) {
/* ast_app_separate_args can't handle "" */
args.app_count = 1;
vals[0] = args.app_parse;
} else {
args.app_count = ast_app_separate_args(
args.app_parse, ',', vals,
ARRAY_LEN(vals));
}
if (args.app_count == 0) {
ast_ari_response_alloc_failed(response);
goto fin;
}
if (args.app_count >= MAX_VALS) {
ast_ari_response_error(response, 400,
"Bad Request",
"Too many values for app");
goto fin;
}
args.app = ast_malloc(sizeof(*args.app) * args.app_count);
if (!args.app) {
ast_ari_response_alloc_failed(response);
goto fin;
}
for (j = 0; j < args.app_count; ++j) {
args.app[j] = (vals[j]);
}
} else
if (strcmp(i->name, "subscribeAll") == 0) {
args.subscribe_all = ast_true(i->value);
} else
{}
}
ast_ari_websocket_events_event_websocket_established(session, headers, &args);
fin: __attribute__((unused))
if (response && response->response_code != 0) {
/* Param parsing failure */
ARI: WebSocket event cleanup Stasis events (which get distributed over the ARI WebSocket) are created by subscribing to the channel_all_cached and bridge_all_cached topics, filtering out events for channels/bridges currently subscribed to. There are two issues with that. First was a race condition, where messages in-flight to the master subscribe-to-all-things topic would get sent out, even though the events happened before the channel was put into Stasis. Secondly, as the number of channels and bridges grow in the system, the work spent filtering messages becomes excessive. Since r395954, individual channels and bridges have caching topics, and can be subscribed to individually. This patch takes advantage, so that channels and bridges are subscribed to on demand, instead of filtering the global topics. The one case where filtering is still required is handling BridgeMerge messages, which are published directly to the bridge_all topic. Other than the change to how subscriptions work, this patch mostly just moves code around. Most of the work generating JSON objects from messages was moved to .to_json handlers on the message types. The callback functions handling app subscriptions were moved from res_stasis (b/c they were global to the model) to stasis/app.c (b/c they are local to the app now). (closes issue ASTERISK-21969) Reported by: Matt Jordan Review: https://reviewboard.asterisk.org/r/2754/ ........ Merged revisions 397816 from http://svn.asterisk.org/svn/asterisk/branches/12 git-svn-id: https://origsvn.digium.com/svn/asterisk/trunk@397820 65c4cc65-6c06-0410-ace0-fbb531ad65f3
2013-08-27 19:19:36 +00:00
RAII_VAR(char *, msg, NULL, ast_json_free);
if (response->message) {
msg = ast_json_dump_string(response->message);
} else {
ARI: WebSocket event cleanup Stasis events (which get distributed over the ARI WebSocket) are created by subscribing to the channel_all_cached and bridge_all_cached topics, filtering out events for channels/bridges currently subscribed to. There are two issues with that. First was a race condition, where messages in-flight to the master subscribe-to-all-things topic would get sent out, even though the events happened before the channel was put into Stasis. Secondly, as the number of channels and bridges grow in the system, the work spent filtering messages becomes excessive. Since r395954, individual channels and bridges have caching topics, and can be subscribed to individually. This patch takes advantage, so that channels and bridges are subscribed to on demand, instead of filtering the global topics. The one case where filtering is still required is handling BridgeMerge messages, which are published directly to the bridge_all topic. Other than the change to how subscriptions work, this patch mostly just moves code around. Most of the work generating JSON objects from messages was moved to .to_json handlers on the message types. The callback functions handling app subscriptions were moved from res_stasis (b/c they were global to the model) to stasis/app.c (b/c they are local to the app now). (closes issue ASTERISK-21969) Reported by: Matt Jordan Review: https://reviewboard.asterisk.org/r/2754/ ........ Merged revisions 397816 from http://svn.asterisk.org/svn/asterisk/branches/12 git-svn-id: https://origsvn.digium.com/svn/asterisk/trunk@397820 65c4cc65-6c06-0410-ace0-fbb531ad65f3
2013-08-27 19:19:36 +00:00
ast_log(LOG_ERROR, "Missing response message\n");
}
if (msg) {
ast_websocket_write(ws_session,
AST_WEBSOCKET_OPCODE_TEXT, msg, strlen(msg));
}
}
ast_free(args.app_parse);
ast_free(args.app);
}
int ast_ari_events_user_event_parse_body(
struct ast_json *body,
struct ast_ari_events_user_event_args *args)
{
struct ast_json *field;
/* Parse query parameters out of it */
field = ast_json_object_get(body, "application");
if (field) {
args->application = ast_json_string_get(field);
}
field = ast_json_object_get(body, "source");
if (field) {
/* If they were silly enough to both pass in a query param and a
* JSON body, free up the query value.
*/
ast_free(args->source);
if (ast_json_typeof(field) == AST_JSON_ARRAY) {
/* Multiple param passed as array */
size_t i;
args->source_count = ast_json_array_size(field);
args->source = ast_malloc(sizeof(*args->source) * args->source_count);
if (!args->source) {
return -1;
}
for (i = 0; i < args->source_count; ++i) {
args->source[i] = ast_json_string_get(ast_json_array_get(field, i));
}
} else {
/* Multiple param passed as single value */
args->source_count = 1;
args->source = ast_malloc(sizeof(*args->source) * args->source_count);
if (!args->source) {
return -1;
}
args->source[0] = ast_json_string_get(field);
}
}
return 0;
}
/*!
* \brief Parameter parsing callback for /events/user/{eventName}.
* \param ser TCP/TLS session object
* \param get_params GET parameters in the HTTP request.
* \param path_vars Path variables extracted from the request.
* \param headers HTTP headers.
* \param body
* \param[out] response Response to the HTTP request.
*/
static void ast_ari_events_user_event_cb(
struct ast_tcptls_session_instance *ser,
struct ast_variable *get_params, struct ast_variable *path_vars,
struct ast_variable *headers, struct ast_json *body, struct ast_ari_response *response)
{
struct ast_ari_events_user_event_args args = {};
struct ast_variable *i;
#if defined(AST_DEVMODE)
int is_valid;
int code;
#endif /* AST_DEVMODE */
for (i = get_params; i; i = i->next) {
if (strcmp(i->name, "application") == 0) {
args.application = (i->value);
} else
if (strcmp(i->name, "source") == 0) {
/* Parse comma separated list */
char *vals[MAX_VALS];
size_t j;
args.source_parse = ast_strdup(i->value);
if (!args.source_parse) {
ast_ari_response_alloc_failed(response);
goto fin;
}
if (strlen(args.source_parse) == 0) {
/* ast_app_separate_args can't handle "" */
args.source_count = 1;
vals[0] = args.source_parse;
} else {
args.source_count = ast_app_separate_args(
args.source_parse, ',', vals,
ARRAY_LEN(vals));
}
if (args.source_count == 0) {
ast_ari_response_alloc_failed(response);
goto fin;
}
if (args.source_count >= MAX_VALS) {
ast_ari_response_error(response, 400,
"Bad Request",
"Too many values for source");
goto fin;
}
args.source = ast_malloc(sizeof(*args.source) * args.source_count);
if (!args.source) {
ast_ari_response_alloc_failed(response);
goto fin;
}
for (j = 0; j < args.source_count; ++j) {
args.source[j] = (vals[j]);
}
} else
{}
}
for (i = path_vars; i; i = i->next) {
if (strcmp(i->name, "eventName") == 0) {
args.event_name = (i->value);
} else
{}
}
args.variables = body;
ast_ari_events_user_event(headers, &args, response);
#if defined(AST_DEVMODE)
code = response->response_code;
switch (code) {
case 0: /* Implementation is still a stub, or the code wasn't set */
is_valid = response->message == NULL;
break;
case 500: /* Internal Server Error */
case 501: /* Not Implemented */
case 404: /* Application does not exist. */
case 422: /* Event source not found. */
case 400: /* Invalid even tsource URI or userevent data. */
is_valid = 1;
break;
default:
if (200 <= code && code <= 299) {
is_valid = ast_ari_validate_void(
response->message);
} else {
ast_log(LOG_ERROR, "Invalid error response %d for /events/user/{eventName}\n", code);
is_valid = 0;
}
}
if (!is_valid) {
ast_log(LOG_ERROR, "Response validation failed for /events/user/{eventName}\n");
ast_ari_response_error(response, 500,
"Internal Server Error", "Response validation failed");
}
#endif /* AST_DEVMODE */
fin: __attribute__((unused))
ast_free(args.source_parse);
ast_free(args.source);
return;
}
/*! \brief REST handler for /api-docs/events.json */
static struct stasis_rest_handlers events_user_eventName = {
.path_segment = "eventName",
.is_wildcard = 1,
.callbacks = {
[AST_HTTP_POST] = ast_ari_events_user_event_cb,
},
.num_children = 0,
.children = { }
};
/*! \brief REST handler for /api-docs/events.json */
static struct stasis_rest_handlers events_user = {
.path_segment = "user",
.callbacks = {
},
.num_children = 1,
.children = { &events_user_eventName, }
};
/*! \brief REST handler for /api-docs/events.json */
static struct stasis_rest_handlers events = {
.path_segment = "events",
.callbacks = {
},
.num_children = 1,
.children = { &events_user, }
};
static int unload_module(void)
{
ast_ari_remove_handler(&events);
ao2_cleanup(events.ws_server);
events.ws_server = NULL;
ast_ari_websocket_events_event_websocket_dtor();
return 0;
}
static int load_module(void)
{
int res = 0;
struct ast_websocket_protocol *protocol;
ARI: Channels added to Stasis application during WebSocket creation ... Prior to ASTERISK-24988, the WebSocket handshake was resolved before Stasis applications were registered. This was done such that the WebSocket would be ready when an application is registered. However, by creating the WebSocket first, the client had the ability to make requests for the Stasis application it thought had been created with the initial handshake request. The inevitable conclusion of this scenario was the cart being put before the horse. ASTERISK-24988 resolved half of the problem by ensuring that the applications were created and registered with Stasis prior to completing the handshake with the client. While this meant that Stasis was ready when the client received the green-light from Asterisk, it also meant that the WebSocket was not yet ready for Stasis to dispatch messages. This patch introduces a message queuing mechanism for delaying messages from Stasis applications while the WebSocket is being constructed. When the ARI event processor receives the message from the WebSocket that it is being created, the event processor instantiates an event session which contains a message queue. It then tries to create and register the requested applications with Stasis. Messages that are dispatched from Stasis between this point and the point at which the event processor is notified the WebSocket is ready, are stashed in the queue. Once the WebSocket has been built, the queue's messages are dispatched in the order in which they were originally received and the queue is concurrently cleared. ASTERISK-25181 #close Reported By: Matt Jordan Change-Id: Iafef7b85a2e0bf78c114db4c87ffc3d16d671a17
2015-07-31 16:27:23 +00:00
if (ast_ari_websocket_events_event_websocket_init() == -1) {
return AST_MODULE_LOAD_DECLINE;
}
events.ws_server = ast_websocket_server_create();
if (!events.ws_server) {
ast_ari_websocket_events_event_websocket_dtor();
return AST_MODULE_LOAD_DECLINE;
}
protocol = ast_websocket_sub_protocol_alloc("ari");
if (!protocol) {
ao2_ref(events.ws_server, -1);
events.ws_server = NULL;
ast_ari_websocket_events_event_websocket_dtor();
return AST_MODULE_LOAD_DECLINE;
}
protocol->session_attempted = ast_ari_events_event_websocket_ws_attempted_cb;
protocol->session_established = ast_ari_events_event_websocket_ws_established_cb;
res |= ast_websocket_server_add_protocol2(events.ws_server, protocol);
res |= ast_ari_add_handler(&events);
if (res) {
unload_module();
return AST_MODULE_LOAD_DECLINE;
}
return AST_MODULE_LOAD_SUCCESS;
}
AST_MODULE_INFO(ASTERISK_GPL_KEY, AST_MODFLAG_DEFAULT, "RESTful API module - WebSocket resource",
.support_level = AST_MODULE_SUPPORT_CORE,
.load = load_module,
.unload = unload_module,
.requires = "res_ari,res_ari_model,res_stasis,res_http_websocket",
);