res/res_stasis_device_state: Allow for subscribing to 'all' device state

This patch adds support for subscribing to all device state changes. This is
done either by subscribing to an empty device, e.g., 'eventSource=deviceState:',
or by the WebSocket connection specifying that it wants all state in the
system.

ASTERISK-24870

Change-Id: I9cfeca1c9e2231bd7ea73e45919111d44d2eda32
This commit is contained in:
Matt Jordan 2015-09-04 12:24:57 -05:00
parent 5206aa9d30
commit 47813cc51c
1 changed files with 44 additions and 10 deletions

View File

@ -44,6 +44,9 @@ ASTERISK_REGISTER_FILE()
/*! Number of hash buckets for device state subscriptions */
#define DEVICE_STATE_BUCKETS 37
/*! The key used for tracking a subscription to all device states */
#define DEVICE_STATE_ALL "__AST_DEVICE_STATE_ALL_TOPIC"
/*! Container for subscribed device states */
static struct ao2_container *device_state_subscriptions;
@ -112,11 +115,17 @@ static void device_state_subscription_destroy(void *obj)
static struct device_state_subscription *device_state_subscription_create(
const struct stasis_app *app, const char *device_name)
{
struct device_state_subscription *sub = ao2_alloc(
sizeof(*sub), device_state_subscription_destroy);
struct device_state_subscription *sub;
const char *app_name = stasis_app_name(app);
size_t size = strlen(device_name) + strlen(app_name) + 2;
size_t size;
if (ast_strlen_zero(device_name)) {
device_name = DEVICE_STATE_ALL;
}
size = strlen(device_name) + strlen(app_name) + 2;
sub = ao2_alloc(sizeof(*sub), device_state_subscription_destroy);
if (!sub) {
return NULL;
}
@ -314,25 +323,50 @@ static void *find_device_state(const struct stasis_app *app, const char *name)
static int is_subscribed_device_state(struct stasis_app *app, const char *name)
{
RAII_VAR(struct device_state_subscription *, sub,
find_device_state_subscription(app, name), ao2_cleanup);
return sub != NULL;
struct device_state_subscription *sub;
sub = find_device_state_subscription(app, DEVICE_STATE_ALL);
if (sub) {
ao2_ref(sub, -1);
return 1;
}
sub = find_device_state_subscription(app, name);
if (sub) {
ao2_ref(sub, -1);
return 1;
}
return 0;
}
static int subscribe_device_state(struct stasis_app *app, void *obj)
{
struct device_state_subscription *sub = obj;
struct stasis_topic *topic;
ast_debug(3, "Subscribing to device %s", sub->device_name);
if (!sub) {
sub = device_state_subscription_create(app, NULL);
if (!sub) {
return -1;
}
}
if (strcmp(sub->device_name, DEVICE_STATE_ALL)) {
topic = ast_device_state_topic(sub->device_name);
} else {
topic = ast_device_state_topic_all();
}
if (is_subscribed_device_state(app, sub->device_name)) {
ast_debug(3, "App %s is already subscribed to %s\n", stasis_app_name(app), sub->device_name);
return 0;
}
if (!(sub->sub = stasis_subscribe_pool(
ast_device_state_topic(sub->device_name),
device_state_cb, sub))) {
ast_debug(3, "Subscribing to device %s\n", sub->device_name);
sub->sub = stasis_subscribe_pool(topic, device_state_cb, sub);
if (!sub->sub) {
ast_log(LOG_ERROR, "Unable to subscribe to device %s\n",
sub->device_name);
return -1;