/* * Asterisk -- An open source telephony toolkit. * * Copyright (C) 2013, Digium, Inc. * * Kinsey Moore * * See http://www.asterisk.org for more information about * the Asterisk project. Please do not directly contact * any of the maintainers of this project for assistance; * the project provides a web site, mailing lists and IRC * channels for your use. * * This program is free software, distributed under the terms of * the GNU General Public License Version 2. See the LICENSE file * at the top of the source tree. */ /*! \file * * \brief Stasis Messages and Data Types for Bridge Objects * * \author Kinsey Moore */ /*** MODULEINFO core ***/ #include "asterisk.h" ASTERISK_FILE_VERSION(__FILE__, "$Revision$") #include "asterisk/astobj2.h" #include "asterisk/stasis.h" #include "asterisk/channel.h" #include "asterisk/stasis_bridging.h" #include "asterisk/stasis_channels.h" #include "asterisk/bridging.h" #include "asterisk/bridging_technology.h" #define SNAPSHOT_CHANNELS_BUCKETS 13 /*! * @{ \brief Define bridge message types. */ STASIS_MESSAGE_TYPE_DEFN(ast_bridge_snapshot_type); STASIS_MESSAGE_TYPE_DEFN(ast_bridge_merge_message_type); STASIS_MESSAGE_TYPE_DEFN(ast_channel_entered_bridge_type); STASIS_MESSAGE_TYPE_DEFN(ast_channel_left_bridge_type); /*! @} */ /*! \brief Aggregate topic for bridge messages */ static struct stasis_topic *bridge_topic_all; /*! \brief Caching aggregate topic for bridge snapshots */ static struct stasis_caching_topic *bridge_topic_all_cached; /*! \brief Topic pool for individual bridge topics */ static struct stasis_topic_pool *bridge_topic_pool; /*! \brief Destructor for bridge snapshots */ static void bridge_snapshot_dtor(void *obj) { struct ast_bridge_snapshot *snapshot = obj; ast_string_field_free_memory(snapshot); ao2_cleanup(snapshot->channels); snapshot->channels = NULL; } struct ast_bridge_snapshot *ast_bridge_snapshot_create(struct ast_bridge *bridge) { RAII_VAR(struct ast_bridge_snapshot *, snapshot, NULL, ao2_cleanup); struct ast_bridge_channel *bridge_channel; snapshot = ao2_alloc(sizeof(*snapshot), bridge_snapshot_dtor); if (!snapshot || ast_string_field_init(snapshot, 128)) { return NULL; } snapshot->channels = ast_str_container_alloc(SNAPSHOT_CHANNELS_BUCKETS); if (!snapshot->channels) { return NULL; } AST_LIST_TRAVERSE(&bridge->channels, bridge_channel, entry) { if (ast_str_container_add(snapshot->channels, ast_channel_uniqueid(bridge_channel->chan))) { return NULL; } } ast_string_field_set(snapshot, uniqueid, bridge->uniqueid); ast_string_field_set(snapshot, technology, bridge->technology->name); ast_string_field_set(snapshot, subclass, bridge->v_table->name); snapshot->feature_flags = bridge->feature_flags; snapshot->capabilities = bridge->technology->capabilities; snapshot->num_channels = bridge->num_channels; snapshot->num_active = bridge->num_active; ao2_ref(snapshot, +1); return snapshot; } struct stasis_topic *ast_bridge_topic(struct ast_bridge *bridge) { struct stasis_topic *bridge_topic = stasis_topic_pool_get_topic(bridge_topic_pool, bridge->uniqueid); if (!bridge_topic) { return ast_bridge_topic_all(); } return bridge_topic; } struct stasis_topic *ast_bridge_topic_all(void) { return bridge_topic_all; } struct stasis_caching_topic *ast_bridge_topic_all_cached(void) { return bridge_topic_all_cached; } void ast_bridge_publish_state(struct ast_bridge *bridge) { RAII_VAR(struct ast_bridge_snapshot *, snapshot, NULL, ao2_cleanup); RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup); ast_assert(bridge != NULL); snapshot = ast_bridge_snapshot_create(bridge); if (!snapshot) { return; } msg = stasis_message_create(ast_bridge_snapshot_type(), snapshot); if (!msg) { return; } stasis_publish(ast_bridge_topic(bridge), msg); } static void bridge_publish_state_from_blob(struct ast_bridge_blob *obj) { RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup); ast_assert(obj != NULL); msg = stasis_message_create(ast_bridge_snapshot_type(), obj->bridge); if (!msg) { return; } stasis_publish(stasis_topic_pool_get_topic(bridge_topic_pool, obj->bridge->uniqueid), msg); } /*! \brief Destructor for bridge merge messages */ static void bridge_merge_message_dtor(void *obj) { struct ast_bridge_merge_message *msg = obj; ao2_cleanup(msg->to); msg->to = NULL; ao2_cleanup(msg->from); msg->from = NULL; } /*! \brief Bridge merge message creation helper */ static struct ast_bridge_merge_message *bridge_merge_message_create(struct ast_bridge *to, struct ast_bridge *from) { RAII_VAR(struct ast_bridge_merge_message *, msg, NULL, ao2_cleanup); msg = ao2_alloc(sizeof(*msg), bridge_merge_message_dtor); if (!msg) { return NULL; } msg->to = ast_bridge_snapshot_create(to); if (!msg->to) { return NULL; } msg->from = ast_bridge_snapshot_create(from); if (!msg->from) { return NULL; } ao2_ref(msg, +1); return msg; } void ast_bridge_publish_merge(struct ast_bridge *to, struct ast_bridge *from) { RAII_VAR(struct ast_bridge_merge_message *, merge_msg, NULL, ao2_cleanup); RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup); ast_assert(to != NULL); ast_assert(from != NULL); merge_msg = bridge_merge_message_create(to, from); if (!merge_msg) { return; } msg = stasis_message_create(ast_bridge_merge_message_type(), merge_msg); if (!msg) { return; } stasis_publish(ast_bridge_topic_all(), msg); } static void bridge_blob_dtor(void *obj) { struct ast_bridge_blob *event = obj; ao2_cleanup(event->bridge); event->bridge = NULL; ao2_cleanup(event->channel); event->channel = NULL; ast_json_unref(event->blob); event->blob = NULL; } struct stasis_message *ast_bridge_blob_create( struct stasis_message_type *message_type, struct ast_bridge *bridge, struct ast_channel *chan, struct ast_json *blob) { RAII_VAR(struct ast_bridge_blob *, obj, NULL, ao2_cleanup); RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup); obj = ao2_alloc(sizeof(*obj), bridge_blob_dtor); if (!obj) { return NULL; } if (bridge) { obj->bridge = ast_bridge_snapshot_create(bridge); if (obj->bridge == NULL) { return NULL; } } if (chan) { obj->channel = ast_channel_snapshot_create(chan); if (obj->channel == NULL) { return NULL; } } if (blob) { obj->blob = ast_json_ref(blob); } msg = stasis_message_create(message_type, obj); if (!msg) { return NULL; } ao2_ref(msg, +1); return msg; } void ast_bridge_publish_enter(struct ast_bridge *bridge, struct ast_channel *chan) { RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup); msg = ast_bridge_blob_create(ast_channel_entered_bridge_type(), bridge, chan, NULL); if (!msg) { return; } /* enter blob first, then state */ stasis_publish(ast_bridge_topic(bridge), msg); bridge_publish_state_from_blob(stasis_message_data(msg)); } void ast_bridge_publish_leave(struct ast_bridge *bridge, struct ast_channel *chan) { RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup); msg = ast_bridge_blob_create(ast_channel_left_bridge_type(), bridge, chan, NULL); if (!msg) { return; } /* state first, then leave blob (opposite of enter, preserves nesting of events) */ bridge_publish_state_from_blob(stasis_message_data(msg)); stasis_publish(ast_bridge_topic(bridge), msg); } typedef struct ast_json *(*json_item_serializer_cb)(void *obj); static struct ast_json *container_to_json_array(struct ao2_container *items, json_item_serializer_cb item_cb) { RAII_VAR(struct ast_json *, json_items, ast_json_array_create(), ast_json_unref); void *item; struct ao2_iterator it; if (!json_items) { return NULL; } it = ao2_iterator_init(items, 0); while ((item = ao2_iterator_next(&it))) { if (ast_json_array_append(json_items, item_cb(item))) { ao2_iterator_destroy(&it); return NULL; } } ao2_iterator_destroy(&it); return ast_json_ref(json_items); } static const char *capability2str(uint32_t capabilities) { if (capabilities & AST_BRIDGE_CAPABILITY_HOLDING) { return "holding"; } else { return "mixing"; } } struct ast_json *ast_bridge_snapshot_to_json(const struct ast_bridge_snapshot *snapshot) { RAII_VAR(struct ast_json *, json_bridge, NULL, ast_json_unref); struct ast_json *json_channels; if (snapshot == NULL) { return NULL; } json_channels = container_to_json_array(snapshot->channels, (json_item_serializer_cb)ast_json_string_create); if (!json_channels) { return NULL; } json_bridge = ast_json_pack("{s: s, s: s, s: s, s: s, s: o}", "bridgeUniqueid", snapshot->uniqueid, "bridgeTechnology", snapshot->technology, "bridgeType", capability2str(snapshot->capabilities), "bridgeClass", snapshot->subclass, "channels", json_channels); if (!json_bridge) { return NULL; } return ast_json_ref(json_bridge); } struct ast_bridge_snapshot *ast_bridge_snapshot_get_latest(const char *uniqueid) { RAII_VAR(struct stasis_message *, message, NULL, ao2_cleanup); struct ast_bridge_snapshot *snapshot; ast_assert(!ast_strlen_zero(uniqueid)); message = stasis_cache_get(ast_bridge_topic_all_cached(), ast_bridge_snapshot_type(), uniqueid); if (!message) { return NULL; } snapshot = stasis_message_data(message); if (!snapshot) { return NULL; } ao2_ref(snapshot, +1); return snapshot; } static void stasis_bridging_cleanup(void) { ao2_cleanup(bridge_topic_all); bridge_topic_all = NULL; bridge_topic_all_cached = stasis_caching_unsubscribe_and_join( bridge_topic_all_cached); ao2_cleanup(bridge_topic_pool); bridge_topic_pool = NULL; STASIS_MESSAGE_TYPE_CLEANUP(ast_bridge_snapshot_type); STASIS_MESSAGE_TYPE_CLEANUP(ast_bridge_merge_message_type); STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_entered_bridge_type); STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_left_bridge_type); } /*! \brief snapshot ID getter for caching topic */ static const char *bridge_snapshot_get_id(struct stasis_message *msg) { struct ast_bridge_snapshot *snapshot; if (stasis_message_type(msg) != ast_bridge_snapshot_type()) { return NULL; } snapshot = stasis_message_data(msg); return snapshot->uniqueid; } int ast_stasis_bridging_init(void) { ast_register_cleanup(stasis_bridging_cleanup); STASIS_MESSAGE_TYPE_INIT(ast_bridge_snapshot_type); STASIS_MESSAGE_TYPE_INIT(ast_bridge_merge_message_type); STASIS_MESSAGE_TYPE_INIT(ast_channel_entered_bridge_type); STASIS_MESSAGE_TYPE_INIT(ast_channel_left_bridge_type); bridge_topic_all = stasis_topic_create("ast_bridge_topic_all"); bridge_topic_all_cached = stasis_caching_topic_create(bridge_topic_all, bridge_snapshot_get_id); bridge_topic_pool = stasis_topic_pool_create(bridge_topic_all); return !bridge_topic_all || !bridge_topic_all_cached || !bridge_topic_pool ? -1 : 0; }