From e1b959ccbb4e47421b37a0f75a2bf89ccd34dcb1 Mon Sep 17 00:00:00 2001 From: "David M. Lee" Date: Thu, 1 Aug 2013 13:49:34 +0000 Subject: [PATCH] Split caching out from the stasis_caching_topic. In working with res_stasis, I discovered a significant limitation to the current structure of stasis_caching_topics: you cannot subscribe to cache updates for a single channel/bridge/endpoint/etc. To address this, this patch splits the cache away from the stasis_caching_topic, making it a first class object. The stasis_cache object is shared amongst individual stasis_caching_topics that are created per channel/endpoint/etc. These are still forwarded to global whatever_all_cached topics, so their use from most of the code does not change. In making these changes, I noticed that we frequently used a similar pattern for bridges, endpoints and channels: single_topic ----------------> all_topic ^ | single_topic_cached ----+----> all_topic_cached | +----> cache This pattern was extracted as the 'Stasis Caching Pattern', defined in stasis_caching_pattern.h. This avoids a lot of duplicate code between the different domain objects. Since the cache is now disassociated from its upstream caching topics, this also necessitated a change to how the 'guaranteed' flag worked for retrieving from a cache. The code for handling the caching guarantee was extracted into a 'stasis_topic_wait' function, which works for any stasis_topic. (closes issue ASTERISK-22002) Review: https://reviewboard.asterisk.org/r/2672/ git-svn-id: https://origsvn.digium.com/svn/asterisk/trunk@395954 65c4cc65-6c06-0410-ace0-fbb531ad65f3 --- apps/app_meetme.c | 2 +- apps/app_voicemail.c | 2 +- apps/confbridge/confbridge_manager.c | 4 +- channels/chan_dahdi.c | 2 +- channels/chan_iax2.c | 2 +- channels/chan_mgcp.c | 2 +- channels/chan_sip.c | 2 +- channels/chan_unistim.c | 2 +- channels/sig_pri.c | 2 +- include/asterisk/app.h | 8 +- include/asterisk/bridge.h | 2 + include/asterisk/channel.h | 15 ++ include/asterisk/channel_internal.h | 2 +- include/asterisk/devicestate.h | 9 +- include/asterisk/presencestate.h | 9 +- include/asterisk/stasis.h | 134 +++++++++++------ include/asterisk/stasis_bridges.h | 34 ++++- include/asterisk/stasis_cache_pattern.h | 153 +++++++++++++++++++ include/asterisk/stasis_channels.h | 23 ++- include/asterisk/stasis_endpoints.h | 35 ++++- main/app.c | 18 ++- main/bridge.c | 18 ++- main/cdr.c | 4 +- main/cel.c | 4 +- main/channel_internal_api.c | 44 ++++-- main/cli.c | 6 +- main/devicestate.c | 26 +++- main/endpoints.c | 53 ++++--- main/manager.c | 6 +- main/manager_bridges.c | 8 +- main/manager_channels.c | 2 +- main/manager_endpoints.c | 8 +- main/pbx.c | 2 +- main/presencestate.c | 21 ++- main/stasis.c | 5 + main/stasis_bridges.c | 141 ++++++++++-------- main/stasis_cache.c | 177 ++++++++++------------ main/stasis_cache_pattern.c | 189 ++++++++++++++++++++++++ main/stasis_channels.c | 109 +++++++++----- main/stasis_endpoints.c | 86 +++++------ main/stasis_wait.c | 133 +++++++++++++++++ res/ari/resource_bridges.c | 10 +- res/ari/resource_channels.c | 18 +-- res/ari/resource_endpoints.c | 20 +-- res/res_agi.c | 2 +- res/res_chan_stats.c | 2 +- res/res_jabber.c | 2 +- res/res_stasis.c | 4 +- res/res_xmpp.c | 2 +- res/stasis/control.c | 6 +- tests/test_cel.c | 5 +- tests/test_devicestate.c | 12 +- tests/test_stasis.c | 38 +++-- tests/test_stasis_endpoints.c | 2 +- 54 files changed, 1179 insertions(+), 448 deletions(-) create mode 100644 include/asterisk/stasis_cache_pattern.h create mode 100644 main/stasis_cache_pattern.c create mode 100644 main/stasis_wait.c diff --git a/apps/app_meetme.c b/apps/app_meetme.c index 0dee926476..5bb8035be1 100644 --- a/apps/app_meetme.c +++ b/apps/app_meetme.c @@ -1167,7 +1167,7 @@ static int meetme_stasis_init(void) STASIS_MESSAGE_TYPE_INIT(meetme_talk_request_type); meetme_event_message_router = stasis_message_router_create( - stasis_caching_get_topic(ast_channel_topic_all_cached())); + ast_channel_cache()); if (!meetme_event_message_router) { meetme_stasis_cleanup(); diff --git a/apps/app_voicemail.c b/apps/app_voicemail.c index df7f1b2ddb..3696c740c6 100644 --- a/apps/app_voicemail.c +++ b/apps/app_voicemail.c @@ -12640,7 +12640,7 @@ static void start_poll_thread(void) mwi_sub_sub = stasis_subscribe(ast_mwi_topic_all(), mwi_event_cb, NULL); if (mwi_sub_sub) { - struct ao2_container *cached = stasis_cache_dump(ast_mwi_topic_cached(), stasis_subscription_change_type()); + struct ao2_container *cached = stasis_cache_dump(ast_mwi_state_cache(), stasis_subscription_change_type()); if (cached) { ao2_callback(cached, OBJ_MULTIPLE | OBJ_NODATA, dump_cache, NULL); } diff --git a/apps/confbridge/confbridge_manager.c b/apps/confbridge/confbridge_manager.c index 41314343f0..10503ff486 100644 --- a/apps/confbridge/confbridge_manager.c +++ b/apps/confbridge/confbridge_manager.c @@ -344,7 +344,7 @@ int manager_confbridge_init(void) STASIS_MESSAGE_TYPE_INIT(confbridge_talking_type); bridge_state_router = stasis_message_router_create( - stasis_caching_get_topic(ast_bridge_topic_all_cached())); + ast_bridge_topic_all_cached()); if (!bridge_state_router) { return -1; @@ -415,7 +415,7 @@ int manager_confbridge_init(void) } channel_state_router = stasis_message_router_create( - stasis_caching_get_topic(ast_channel_topic_all_cached())); + ast_channel_topic_all_cached()); if (!channel_state_router) { manager_confbridge_shutdown(); diff --git a/channels/chan_dahdi.c b/channels/chan_dahdi.c index 1adf2930e1..3c29b42ebb 100644 --- a/channels/chan_dahdi.c +++ b/channels/chan_dahdi.c @@ -4822,7 +4822,7 @@ static int has_voicemail(struct dahdi_pvt *p) } ast_str_set(&uniqueid, 0, "%s@%s", mailbox, context); - mwi_message = stasis_cache_get(ast_mwi_topic_cached(), ast_mwi_state_type(), ast_str_buffer(uniqueid)); + mwi_message = stasis_cache_get(ast_mwi_state_cache(), ast_mwi_state_type(), ast_str_buffer(uniqueid)); if (mwi_message) { struct ast_mwi_state *mwi_state = stasis_message_data(mwi_message); diff --git a/channels/chan_iax2.c b/channels/chan_iax2.c index ac88b513f5..26ca36d75a 100644 --- a/channels/chan_iax2.c +++ b/channels/chan_iax2.c @@ -8803,7 +8803,7 @@ static int update_registry(struct sockaddr_in *sin, int callno, char *devtype, i } ast_str_set(&uniqueid, 0, "%s@%s", mailbox, context); - msg = stasis_cache_get(ast_mwi_topic_cached(), ast_mwi_state_type(), ast_str_buffer(uniqueid)); + msg = stasis_cache_get(ast_mwi_state_cache(), ast_mwi_state_type(), ast_str_buffer(uniqueid)); if (msg) { struct ast_mwi_state *mwi_state = stasis_message_data(msg); diff --git a/channels/chan_mgcp.c b/channels/chan_mgcp.c index 7eeb984566..79208daa0d 100644 --- a/channels/chan_mgcp.c +++ b/channels/chan_mgcp.c @@ -508,7 +508,7 @@ static int has_voicemail(struct mgcp_endpoint *p) ast_str_set(&uniqueid, 0, "%s@%s", mbox, cntx); - msg = stasis_cache_get(ast_mwi_topic_cached(), ast_mwi_state_type(), ast_str_buffer(uniqueid)); + msg = stasis_cache_get(ast_mwi_state_cache(), ast_mwi_state_type(), ast_str_buffer(uniqueid)); if (msg) { struct ast_mwi_state *mwi_state = stasis_message_data(msg); diff --git a/channels/chan_sip.c b/channels/chan_sip.c index c0e4a62fd7..9215b5af89 100644 --- a/channels/chan_sip.c +++ b/channels/chan_sip.c @@ -28386,7 +28386,7 @@ static int get_cached_mwi(struct sip_peer *peer, int *new, int *old) ast_str_reset(uniqueid); ast_str_set(&uniqueid, 0, "%s@%s", mailbox->mailbox, S_OR(mailbox->context, "default")); - msg = stasis_cache_get(ast_mwi_topic_cached(), ast_mwi_state_type(), ast_str_buffer(uniqueid)); + msg = stasis_cache_get(ast_mwi_state_cache(), ast_mwi_state_type(), ast_str_buffer(uniqueid)); if (!msg) { continue; } diff --git a/channels/chan_unistim.c b/channels/chan_unistim.c index 661ffd454c..fd0b407172 100644 --- a/channels/chan_unistim.c +++ b/channels/chan_unistim.c @@ -5502,7 +5502,7 @@ static int unistim_send_mwi_to_peer(struct unistim_line *peer, unsigned int tick ast_str_set(&uniqueid, 0, "%s@%s", mailbox, context); - msg = stasis_cache_get(ast_mwi_topic_cached(), ast_mwi_state_type(), ast_str_buffer(uniqueid)); + msg = stasis_cache_get(ast_mwi_state_cache(), ast_mwi_state_type(), ast_str_buffer(uniqueid)); if (msg) { struct ast_mwi_state *mwi_state = stasis_message_data(msg); diff --git a/channels/sig_pri.c b/channels/sig_pri.c index f281f44985..e24752b9fa 100644 --- a/channels/sig_pri.c +++ b/channels/sig_pri.c @@ -8956,7 +8956,7 @@ static void sig_pri_mwi_cache_update(struct sig_pri_span *pri) ast_str_reset(uniqueid); ast_str_set(&uniqueid, 0, "%s@%s", pri->mbox[idx].number, pri->mbox[idx].context); - msg = stasis_cache_get(ast_mwi_topic_cached(), ast_mwi_state_type(), ast_str_buffer(uniqueid)); + msg = stasis_cache_get(ast_mwi_state_cache(), ast_mwi_state_type(), ast_str_buffer(uniqueid)); if (!msg) { /* No cached event for this mailbox. */ continue; diff --git a/include/asterisk/app.h b/include/asterisk/app.h index 91438a2d0f..251288546e 100644 --- a/include/asterisk/app.h +++ b/include/asterisk/app.h @@ -1251,7 +1251,13 @@ struct stasis_topic *ast_mwi_topic(const char *uniqueid); * \retval NULL if it has not been allocated * \since 12 */ -struct stasis_caching_topic *ast_mwi_topic_cached(void); +struct stasis_topic *ast_mwi_topic_cached(void); + +/*! + * \brief Backend cache for ast_mwi_topic_cached(). + * \retval Cache of \ref ast_mwi_state. + */ +struct stasis_cache *ast_mwi_state_cache(void); /*! * \brief Get the \ref stasis message type for MWI messages diff --git a/include/asterisk/bridge.h b/include/asterisk/bridge.h index a920a74e40..5f22d37ef1 100644 --- a/include/asterisk/bridge.h +++ b/include/asterisk/bridge.h @@ -277,6 +277,8 @@ struct ast_bridge { struct ast_bridge_technology *technology; /*! Private information unique to the bridge technology */ void *tech_pvt; + /*! Per-bridge topics */ + struct stasis_cp_single *topics; /*! Call ID associated with the bridge */ struct ast_callid *callid; /*! Linked list of channels participating in the bridge */ diff --git a/include/asterisk/channel.h b/include/asterisk/channel.h index f6cdd4e628..282230a02d 100644 --- a/include/asterisk/channel.h +++ b/include/asterisk/channel.h @@ -4185,6 +4185,21 @@ struct varshead *ast_channel_get_vars(struct ast_channel *chan); */ struct stasis_topic *ast_channel_topic(struct ast_channel *chan); +/*! + * \since 12 + * \brief A topic which publishes the events for a particular channel. + * + * \ref ast_channel_snapshot messages are replaced with \ref stasis_cache_update + * + * If the given \a chan is \c NULL, ast_channel_topic_all_cached() is returned. + * + * \param chan Channel, or \c NULL. + * + * \retval Topic for channel's events. + * \retval ast_channel_topic_all() if \a chan is \c NULL. + */ +struct stasis_topic *ast_channel_topic_cached(struct ast_channel *chan); + /*! * \brief Get the bridge associated with a channel * \since 12.0.0 diff --git a/include/asterisk/channel_internal.h b/include/asterisk/channel_internal.h index a54a1b8483..c94cc46f74 100644 --- a/include/asterisk/channel_internal.h +++ b/include/asterisk/channel_internal.h @@ -23,5 +23,5 @@ struct ast_channel *__ast_channel_internal_alloc(void (*destructor)(void *obj), void ast_channel_internal_finalize(struct ast_channel *chan); int ast_channel_internal_is_finalized(struct ast_channel *chan); void ast_channel_internal_cleanup(struct ast_channel *chan); -void ast_channel_internal_setup_topics(struct ast_channel *chan); +int ast_channel_internal_setup_topics(struct ast_channel *chan); diff --git a/include/asterisk/devicestate.h b/include/asterisk/devicestate.h index 2b3353ffdb..cda8a5d98c 100644 --- a/include/asterisk/devicestate.h +++ b/include/asterisk/devicestate.h @@ -307,7 +307,14 @@ struct stasis_topic *ast_device_state_topic(const char *device); * \retval NULL if it has not been allocated * \since 12 */ -struct stasis_caching_topic *ast_device_state_topic_cached(void); +struct stasis_topic *ast_device_state_topic_cached(void); + +/*! + * \brief Backend cache for ast_device_state_topic_cached() + * \retval Cache of \ref ast_device_state_message. + * \since 12 + */ +struct stasis_cache *ast_device_state_cache(void); /*! * \brief Get the Stasis message type for device state messages diff --git a/include/asterisk/presencestate.h b/include/asterisk/presencestate.h index 5cacf8698c..1cdf73eda9 100644 --- a/include/asterisk/presencestate.h +++ b/include/asterisk/presencestate.h @@ -168,7 +168,14 @@ struct stasis_topic *ast_presence_state_topic_all(void); * \retval Caching Stasis topic for presence state messages * \since 12 */ -struct stasis_caching_topic *ast_presence_state_topic_cached(void); +struct stasis_topic *ast_presence_state_topic_cached(void); + +/*! + * \brief Backend cache for ast_presence_state_topic_cached() + * \retval Cache of \ref ast_presence_state_message. + * \since 12 + */ +struct stasis_cache *ast_presence_state_cache(void); /*! * \brief Stasis message payload representing a presence state update diff --git a/include/asterisk/stasis.h b/include/asterisk/stasis.h index fd60724c08..3b9cec34f2 100644 --- a/include/asterisk/stasis.h +++ b/include/asterisk/stasis.h @@ -94,13 +94,24 @@ * in the system, and it's desirable to query that state from the cache without * locking the original object. It's also desirable for subscribers of the * caching topic to receive messages that have both the old cache value and the - * new value being put into the cache. For this, we have - * stasis_caching_topic_create(), providing it with the topic which publishes - * the messages that you wish to cache, and a function that can identify - * cacheable messages. + * new value being put into the cache. For this, we have stasis_cache_create() + * and stasis_caching_topic_create(), providing them with the topic which + * publishes the messages that you wish to cache, and a function that can + * identify cacheable messages. * - * The returned \ref stasis_caching_topic provides a topic that forwards - * non-cacheable messages unchanged. A cacheable message is wrapped in a \ref + * The \ref stasis_cache is designed so that it may be shared amongst several + * \ref stasis_caching_topic objects. This allows you to have individual caching + * topics per-object (i.e. so you can subscribe to updates for a single object), + * and still have a single cache to query for the state of all objects. While a + * cache may be shared amongst different message types, such a usage is probably + * not a good idea. + * + * The \ref stasis_cache can only be written to by \ref stasis_caching_topics. + * It's a thread safe container, so freely use the stasis_cache_get() and + * stasis_cache_dump() to query the cache. + * + * The \ref stasis_caching_topic provides a topic that forwards non-cacheable + * messages unchanged. A cacheable message is wrapped in a \ref * stasis_cache_update message which provides the old snapshot (or \c NULL if * this is a new cache entry), and the new snapshot (or \c NULL if the entry was * removed from the cache). A stasis_cache_clear_create() message must be sent @@ -111,6 +122,9 @@ * stasis_caching_topic will not be freed until after it has been unsubscribed, * and all other ao2_ref()'s have been cleaned up. * + * The \ref stasis_cache object is a normal AO2 managed object, which can be + * release with ao2_cleanup(). + * * \par stasis_subscriber * * Any topic may be subscribed to by simply providing stasis_subscribe() the @@ -345,6 +359,15 @@ void stasis_forward_message(struct stasis_topic *topic, struct stasis_topic *publisher_topic, struct stasis_message *message); +/*! + * \brief Wait for all pending messages on a given topic to be processed. + * \param topic Topic to await pending messages on. + * \return 0 on success. + * \return Non-zero on error. + * \since 12 + */ +int stasis_topic_wait(struct stasis_topic *topic); + /*! @} */ /*! @{ */ @@ -514,6 +537,8 @@ struct stasis_message_type *stasis_subscription_change_type(void); /*! @} */ +/*! @{ */ + /*! * \brief Pool for topic aggregation */ @@ -574,23 +599,18 @@ struct stasis_message_type *stasis_cache_clear_type(void); /*! @{ */ +/*! + * \brief A message cache, for use with \ref stasis_caching_topic. + * \since 12 + */ +struct stasis_cache; + /*! * \brief A topic wrapper, which caches certain messages. * \since 12 */ struct stasis_caching_topic; -/*! - * \brief A message which instructs the caching topic to remove an entry from its cache. - * - * \param message Message representative of the cache entry that should be cleared. - * This will become the data held in the stasis_cache_clear message. - * - * \return Message which, when sent to the \a topic, will clear the item from the cache. - * \return \c NULL on error. - * \since 12 - */ -struct stasis_message *stasis_cache_clear_create(struct stasis_message *message); /*! * \brief Callback extract a unique identity from a snapshot message. @@ -605,6 +625,21 @@ struct stasis_message *stasis_cache_clear_create(struct stasis_message *message) */ typedef const char *(*snapshot_get_id)(struct stasis_message *message); +/*! + * \brief Create a cache. + * + * This is the backend store for a \ref stasis_caching_topic. The cache is + * thread safe, allowing concurrent reads and writes. + * + * The returned object is AO2 managed, so ao2_cleanup() when you're done. + * + * \param id_fn Callback to extract the id from a snapshot message. + * \return New cache indexed by \a id_fn. + * \return \c NULL on error + * \since 12 + */ +struct stasis_cache *stasis_cache_create(snapshot_get_id id_fn); + /*! * \brief Create a topic which monitors and caches messages from another topic. * @@ -613,13 +648,17 @@ typedef const char *(*snapshot_get_id)(struct stasis_message *message); * is updated, and a stasis_cache_update() message is forwarded, which has both * the original snapshot message and the new message. * + * The returned object is AO2 managed, so ao2_cleanup() when done with it. + * * \param original_topic Topic publishing snapshot messages. - * \param id_fn Callback to extract the id from a snapshot message. + * \param cache Backend cache in which to keep snapshots. * \return New topic which changes snapshot messages to stasis_cache_update() * messages, and forwards all other messages from the original topic. + * \return \c NULL on error * \since 12 */ -struct stasis_caching_topic *stasis_caching_topic_create(struct stasis_topic *original_topic, snapshot_get_id id_fn); +struct stasis_caching_topic *stasis_caching_topic_create( + struct stasis_topic *original_topic, struct stasis_cache *cache); /*! * \brief Unsubscribes a caching topic from its upstream topic. @@ -651,53 +690,55 @@ struct stasis_caching_topic *stasis_caching_unsubscribe_and_join( /*! * \brief Returns the topic of cached events from a caching topics. * \param caching_topic The caching topic. - * \return The topic that publishes cache update events, along with passthrough events - * from the underlying topic. + * \return The topic that publishes cache update events, along with passthrough + * events from the underlying topic. * \return \c NULL if \a caching_topic is \c NULL. * \since 12 */ -struct stasis_topic *stasis_caching_get_topic(struct stasis_caching_topic *caching_topic); +struct stasis_topic *stasis_caching_get_topic( + struct stasis_caching_topic *caching_topic); + +/*! + * \brief A message which instructs the caching topic to remove an entry from + * its cache. + * + * \param message Message representative of the cache entry that should be + * cleared. This will become the data held in the + * stasis_cache_clear message. + * + * \return Message which, when sent to a \ref stasis_caching_topic, will clear + * the item from the cache. + * \return \c NULL on error. + * \since 12 + */ +struct stasis_message *stasis_cache_clear_create(struct stasis_message *message); /*! * \brief Retrieve an item from the cache. * * The returned item is AO2 managed, so ao2_cleanup() when you're done with it. * - * \param caching_topic The topic returned from stasis_caching_topic_create(). + * \param cache The cache to query. * \param type Type of message to retrieve. * \param id Identity of the snapshot to retrieve. * \return Message from the cache. * \return \c NULL if message is not found. * \since 12 */ -#define stasis_cache_get(caching_topic, type, id) stasis_cache_get_extended(caching_topic, type, id, 0) - -/*! - * \brief Retrieve an item from the cache. - * \param caching_topic The topic returned from stasis_caching_topic_create(). - * \param type Type of message to retrieve. - * \param id Identity of the snapshot to retrieve. - * \param guaranteed If set to 1 it is guaranteed that any pending messages have been processed. - * \return Message from the cache. The cache still owns the message, so - * ao2_ref() if you want to keep it. - * \return \c NULL if message is not found. - * \since 12 - */ -struct stasis_message *stasis_cache_get_extended(struct stasis_caching_topic *caching_topic, - struct stasis_message_type *type, - const char *id, - unsigned int guaranteed); +struct stasis_message *stasis_cache_get( + struct stasis_cache *cache, struct stasis_message_type *type, + const char *id); /*! * \brief Dump cached items to a subscription - * \param caching_topic The topic returned from stasis_caching_topic_create(). + * \param cache The cache to query. * \param type Type of message to dump (any type if \c NULL). * \return ao2_container containing all matches (must be unreffed by caller) * \return \c NULL on allocation error * \since 12 */ -struct ao2_container *stasis_cache_dump(struct stasis_caching_topic *caching_topic, - struct stasis_message_type *type); +struct ao2_container *stasis_cache_dump(struct stasis_cache *cache, + struct stasis_message_type *type); /*! @} */ @@ -831,13 +872,18 @@ int stasis_cache_init(void); /*! * \internal - * \brief called by stasis_init for config initialization. + * \brief called by stasis_init() for config initialization. * \return 0 on success. * \return Non-zero on error. * \since 12 */ int stasis_config_init(void); +/*! + * \internal + */ +int stasis_wait_init(void); + struct ast_threadpool_options; /*! diff --git a/include/asterisk/stasis_bridges.h b/include/asterisk/stasis_bridges.h index 973b00c476..7fa059fedb 100644 --- a/include/asterisk/stasis_bridges.h +++ b/include/asterisk/stasis_bridges.h @@ -89,6 +89,22 @@ struct stasis_message_type *ast_bridge_snapshot_type(void); */ struct stasis_topic *ast_bridge_topic(struct ast_bridge *bridge); +/*! + * \since 12 + * \brief A topic which publishes the events for a particular bridge. + * + * \ref ast_bridge_snapshot messages are replaced with stasis_cache_update + * messages. + * + * If the given \a bridge is \c NULL, ast_bridge_topic_all_cached() is returned. + * + * \param bridge Bridge for which to get a topic or \c NULL. + * + * \retval Topic for bridge's events. + * \retval ast_bridge_topic_all() if \a bridge is \c NULL. + */ +struct stasis_topic *ast_bridge_topic_cached(struct ast_bridge *bridge); + /*! * \since 12 * \brief A topic which publishes the events for all bridges. @@ -103,7 +119,14 @@ struct stasis_topic *ast_bridge_topic_all(void); * * \retval Caching topic for all bridge events. */ -struct stasis_caching_topic *ast_bridge_topic_all_cached(void); +struct stasis_topic *ast_bridge_topic_all_cached(void); + +/*! + * \since 12 + * \brief Backend cache for ast_bridge_topic_all_cached(). + * \retval Cache of \ref ast_bridge_snapshot. + */ +struct stasis_cache *ast_bridge_cache(void); /*! * \since 12 @@ -408,6 +431,15 @@ struct ast_bridge_snapshot *ast_bridge_snapshot_get_latest( const char *bridge_id); /*! + * \internal + * \brief Initialize the topics for a single bridge. + * \return 0 on success. + * \return Non-zero on error. + */ +int bridge_topics_init(struct ast_bridge *bridge); + +/*! + * \internal * \brief Initialize the stasis bridging topic and message types * \retval 0 on success * \retval -1 on failure diff --git a/include/asterisk/stasis_cache_pattern.h b/include/asterisk/stasis_cache_pattern.h new file mode 100644 index 0000000000..2ea643e192 --- /dev/null +++ b/include/asterisk/stasis_cache_pattern.h @@ -0,0 +1,153 @@ +/* + * Asterisk -- An open source telephony toolkit. + * + * Copyright (C) 2013, Digium, Inc. + * + * David M. Lee, II + * + * See http://www.asterisk.org for more information about + * the Asterisk project. Please do not directly contact + * any of the maintainers of this project for assistance; + * the project provides a web site, mailing lists and IRC + * channels for your use. + * + * This program is free software, distributed under the terms of + * the GNU General Public License Version 2. See the LICENSE file + * at the top of the source tree. + */ + +#ifndef _ASTERISK_STASIS_CACHE_PATTERN_H +#define _ASTERISK_STASIS_CACHE_PATTERN_H + +/*! \file + * + * \brief Caching pattern for \ref stasis topics. + * + * A typical pattern for Stasis objects is to have individual objects, which + * have their own topic and caching topic. These individual topics feed an + * upstream aggregate topics, and a shared cache. + * + * The \ref stasis_cp_all object contains the aggregate topics and shared cache. + * This is built with the base name for the topics, and the identity function to + * identify messages in the cache. + * + * The \ref stasis_cp_single object contains the \ref stasis_topic for a single + * instance, and the corresponding \ref stasis_caching_topic. + * + * Since the \ref stasis_cp_single object has subscriptions for forwarding + * and caching, it must be disposed of using stasis_cp_single_unsubscribe() + * instead of simply ao2_cleanup(). + */ + +#include "asterisk/stasis.h" + +/*! + * \brief The 'all' side of the cache pattern. These are typically built as + * global objects for specific modules. + */ +struct stasis_cp_all; + +/*! + * \brief Create an all instance of the cache pattern. + * + * This object is AO2 managed, so dispose of it with ao2_cleanup(). + * + * \param name Base name of the topics. + * \param id_fn Identity function for the cache. + * \return All side instance. + * \return \c NULL on error. + */ +struct stasis_cp_all *stasis_cp_all_create(const char *name, + snapshot_get_id id_fn); + +/*! + * \brief Get the aggregate topic. + * + * This topic aggregates all messages published to corresponding + * stasis_cp_single_topic() topics. + * + * \param all All side caching pattern object. + * \return The aggregate topic. + * \return \c NULL if \a all is \c NULL + */ +struct stasis_topic *stasis_cp_all_topic(struct stasis_cp_all *all); + +/*! + * \brief Get the caching topic. + * + * This topic aggregates all messages from the corresponding + * stasis_cp_single_topic_cached() topics. + * + * Note that one normally only subscribes to the caching topic, since data + * is fed to it from its upstream topic. + * + * \param all All side caching pattern object. + * \return The aggregate caching topic. + * \return \c NULL if \a all is \c NULL + */ +struct stasis_topic *stasis_cp_all_topic_cached( + struct stasis_cp_all *all); + +/*! + * \brief Get the cache. + * + * This is the shared cache for all corresponding \ref stasis_cp_single objects. + * + * \param all All side caching pattern object. + * \return The cache. + * \return \c NULL if \a all is \c NULL + */ +struct stasis_cache *stasis_cp_all_cache(struct stasis_cp_all *all); + +/*! + * \brief The 'one' side of the cache pattern. These are built per-instance for + * some corresponding object, and must be explicitly disposed of using + * stasis_cp_single_unsubscribe(). + */ +struct stasis_cp_single; + +/*! + * \brief Create the 'one' side of the cache pattern. + * + * Dispose of using stasis_cp_single_unsubscribe(). + * + * \param all Corresponding all side. + * \param name Base name for the topics. + * \return One side instance + */ +struct stasis_cp_single *stasis_cp_single_create(struct stasis_cp_all *all, + const char *name); + +/*! + * \brief Stops caching and forwarding messages. + * + * \param one One side of the cache pattern. + */ +void stasis_cp_single_unsubscribe(struct stasis_cp_single *one); + +/*! + * \brief Get the topic for this instance. + * + * This is the topic to which one would post instance-specific messages, or + * subscribe for single-instance, uncached messages. + * + * \param one One side of the cache pattern. + * \return The main topic. + * \return \c NULL if \a one is \c NULL + */ +struct stasis_topic *stasis_cp_single_topic(struct stasis_cp_single *one); + +/*! + * \brief Get the caching topic for this instance. + * + * Note that one normally only subscribes to the caching topic, since data + * is fed to it from its upstream topic. + * + * \param one One side of the cache pattern. + * \return The caching topic. + * \return \c NULL if \a one is \c NULL + */ +struct stasis_topic *stasis_cp_single_topic_cached( + struct stasis_cp_single *one); + +#endif /* _ASTERISK_STASIS_CACHE_PATTERN_H */ diff --git a/include/asterisk/stasis_channels.h b/include/asterisk/stasis_channels.h index 8cc4c83826..c0b596ac97 100644 --- a/include/asterisk/stasis_channels.h +++ b/include/asterisk/stasis_channels.h @@ -106,6 +106,8 @@ struct ast_channel_blob { */ struct ast_multi_channel_blob; +struct stasis_cp_all *ast_channel_cache_all(void); + /*! * \since 12 * \brief A topic which publishes the events for all channels. @@ -120,16 +122,23 @@ struct stasis_topic *ast_channel_topic_all(void); * * \retval Topic for all channel events. */ -struct stasis_caching_topic *ast_channel_topic_all_cached(void); +struct stasis_topic *ast_channel_topic_all_cached(void); /*! * \since 12 - * \brief A caching topic which caches \ref ast_channel_snapshot messages from - * ast_channel_events_all(void) and indexes them by name. + * \brief Primary channel cache, indexed by Uniqueid. * - * \retval Topic for all channel events. + * \retval Cache of \ref ast_channel_snapshot. */ -struct stasis_caching_topic *ast_channel_topic_all_cached_by_name(void); +struct stasis_cache *ast_channel_cache(void); + +/*! + * \since 12 + * \brief Secondary channel cache, indexed by name. + * + * \retval Cache of \ref ast_channel_snapshot. + */ +struct stasis_cache *ast_channel_cache_by_name(void); /*! * \since 12 @@ -551,7 +560,9 @@ int ast_channel_snapshot_caller_id_equal( /*! * \brief Initialize the stasis channel topic and message types + * \return 0 on success + * \return Non-zero on error */ -void ast_stasis_channels_init(void); +int ast_stasis_channels_init(void); #endif /* STASIS_CHANNELS_H_ */ diff --git a/include/asterisk/stasis_endpoints.h b/include/asterisk/stasis_endpoints.h index 6d7f8dbaa7..4a35e95873 100644 --- a/include/asterisk/stasis_endpoints.h +++ b/include/asterisk/stasis_endpoints.h @@ -30,6 +30,7 @@ #include "asterisk/endpoints.h" #include "asterisk/json.h" #include "asterisk/stasis.h" +#include "asterisk/stasis_cache_pattern.h" #include "asterisk/stringfields.h" /*! \addtogroup StasisTopicsAndMessages @@ -143,6 +144,31 @@ struct ast_endpoint_snapshot *ast_endpoint_snapshot_create( */ struct stasis_topic *ast_endpoint_topic(struct ast_endpoint *endpoint); +/*! + * \brief Returns the topic for a specific endpoint. + * + * \ref ast_endpoint_snapshot messages are replaced with + * \ref stasis_cache_update + * + * \param endpoint The endpoint. + * \return The topic for the given endpoint. + * \return ast_endpoint_topic_all() if endpoint is \c NULL. + * \since 12 + */ +struct stasis_topic *ast_endpoint_topic_cached(struct ast_endpoint *endpoint); + +/*! + * \internal + * \brief Cache and global topics for endpoints. + * + * This is public simply to be used by endpoints.c. Please use the accessor + * functions (ast_endpoint_topic_all(), ast_endpoint_topic_all_cached(), + * ast_endpoint_cache(), etc.) instead of calling this directly. + * + * \since 12 + */ +struct stasis_cp_all *ast_endpoint_cache_all(void); + /*! * \brief Topic for all endpoint releated messages. * \since 12 @@ -153,7 +179,14 @@ struct stasis_topic *ast_endpoint_topic_all(void); * \brief Cached topic for all endpoint related messages. * \since 12 */ -struct stasis_caching_topic *ast_endpoint_topic_all_cached(void); +struct stasis_topic *ast_endpoint_topic_all_cached(void); + +/*! + * \brief Backend cache for ast_endpoint_topic_all_cached(). + * \return Cache of \ref ast_endpoint_snapshot. + * \since 12 + */ +struct stasis_cache *ast_endpoint_cache(void); /*! * \brief Retrieve the most recent snapshot for the endpoint with the given diff --git a/main/app.c b/main/app.c index 031f6f28f5..8d081fe8c7 100644 --- a/main/app.c +++ b/main/app.c @@ -88,6 +88,7 @@ static AST_LIST_HEAD_STATIC(zombies, zombie); * @{ \brief Define \ref stasis topic objects for MWI */ static struct stasis_topic *mwi_topic_all; +static struct stasis_cache *mwi_state_cache; static struct stasis_caching_topic *mwi_topic_cached; static struct stasis_topic_pool *mwi_topic_pool; /* @} */ @@ -2696,9 +2697,14 @@ struct stasis_topic *ast_mwi_topic_all(void) return mwi_topic_all; } -struct stasis_caching_topic *ast_mwi_topic_cached(void) +struct stasis_cache *ast_mwi_state_cache(void) { - return mwi_topic_cached; + return mwi_state_cache; +} + +struct stasis_topic *ast_mwi_topic_cached(void) +{ + return stasis_caching_get_topic(mwi_topic_cached); } struct stasis_topic *ast_mwi_topic(const char *uniqueid) @@ -2754,7 +2760,7 @@ int ast_publish_mwi_state_full( if (!ast_strlen_zero(channel_id)) { RAII_VAR(struct stasis_message *, chan_message, - stasis_cache_get(ast_channel_topic_all_cached(), + stasis_cache_get(ast_channel_cache(), ast_channel_snapshot_type(), channel_id), ao2_cleanup); @@ -2855,7 +2861,11 @@ int app_init(void) if (!mwi_topic_all) { return -1; } - mwi_topic_cached = stasis_caching_topic_create(mwi_topic_all, mwi_state_get_id); + mwi_state_cache = stasis_cache_create(mwi_state_get_id); + if (!mwi_state_cache) { + return -1; + } + mwi_topic_cached = stasis_caching_topic_create(mwi_topic_all, mwi_state_cache); if (!mwi_topic_cached) { return -1; } diff --git a/main/bridge.c b/main/bridge.c index 0b4d95d5e4..9e9e65c172 100644 --- a/main/bridge.c +++ b/main/bridge.c @@ -46,6 +46,7 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$") #include "asterisk/bridge_after.h" #include "asterisk/stasis_bridges.h" #include "asterisk/stasis_channels.h" +#include "asterisk/stasis_cache_pattern.h" #include "asterisk/app.h" #include "asterisk/file.h" #include "asterisk/module.h" @@ -634,6 +635,8 @@ static void destroy_bridge(void *obj) } cleanup_video_mode(bridge); + + stasis_cp_single_unsubscribe(bridge->topics); } struct ast_bridge *bridge_register(struct ast_bridge *bridge) @@ -685,6 +688,13 @@ struct ast_bridge *bridge_base_init(struct ast_bridge *self, uint32_t capabiliti ast_set_flag(&self->feature_flags, flags); self->allowed_capabilities = capabilities; + if (bridge_topics_init(self) != 0) { + ast_log(LOG_WARNING, "Bridge %s: Could not initialize topics\n", + self->uniqueid); + ao2_ref(self, -1); + return NULL; + } + /* Use our helper function to find the "best" bridge technology. */ self->technology = find_best_technology(capabilities, self); if (!self->technology) { @@ -4397,7 +4407,7 @@ static char *complete_bridge(const char *word, int state) struct ao2_iterator iter; struct stasis_message *msg; - if (!(cached_bridges = stasis_cache_dump(ast_bridge_topic_all_cached(), ast_bridge_snapshot_type()))) { + if (!(cached_bridges = stasis_cache_dump(ast_bridge_cache(), ast_bridge_snapshot_type()))) { return NULL; } @@ -4435,7 +4445,7 @@ static char *handle_bridge_show_all(struct ast_cli_entry *e, int cmd, struct ast return NULL; } - if (!(cached_bridges = stasis_cache_dump(ast_bridge_topic_all_cached(), ast_bridge_snapshot_type()))) { + if (!(cached_bridges = stasis_cache_dump(ast_bridge_cache(), ast_bridge_snapshot_type()))) { ast_cli(a->fd, "Failed to retrieve cached bridges\n"); return CLI_SUCCESS; } @@ -4467,7 +4477,7 @@ static int bridge_show_specific_print_channel(void *obj, void *arg, int flags) RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup); struct ast_channel_snapshot *snapshot; - if (!(msg = stasis_cache_get(ast_channel_topic_all_cached(), ast_channel_snapshot_type(), uniqueid))) { + if (!(msg = stasis_cache_get(ast_channel_cache(), ast_channel_snapshot_type(), uniqueid))) { return 0; } snapshot = stasis_message_data(msg); @@ -4500,7 +4510,7 @@ static char *handle_bridge_show_specific(struct ast_cli_entry *e, int cmd, struc return CLI_SHOWUSAGE; } - msg = stasis_cache_get(ast_bridge_topic_all_cached(), ast_bridge_snapshot_type(), a->argv[2]); + msg = stasis_cache_get(ast_bridge_cache(), ast_bridge_snapshot_type(), a->argv[2]); if (!msg) { ast_cli(a->fd, "Bridge '%s' not found\n", a->argv[2]); return CLI_SUCCESS; diff --git a/main/cdr.c b/main/cdr.c index 5129cce035..f3608f0a98 100644 --- a/main/cdr.c +++ b/main/cdr.c @@ -4005,11 +4005,11 @@ int ast_cdr_engine_init(void) return -1; } - channel_subscription = stasis_forward_all(stasis_caching_get_topic(ast_channel_topic_all_cached()), cdr_topic); + channel_subscription = stasis_forward_all(ast_channel_topic_all_cached(), cdr_topic); if (!channel_subscription) { return -1; } - bridge_subscription = stasis_forward_all(stasis_caching_get_topic(ast_bridge_topic_all_cached()), cdr_topic); + bridge_subscription = stasis_forward_all(ast_bridge_topic_all_cached(), cdr_topic); if (!bridge_subscription) { return -1; } diff --git a/main/cel.c b/main/cel.c index f66fbdcc0b..a03d081158 100644 --- a/main/cel.c +++ b/main/cel.c @@ -1551,14 +1551,14 @@ int ast_cel_engine_init(void) } cel_channel_forwarder = stasis_forward_all( - stasis_caching_get_topic(ast_channel_topic_all_cached()), + ast_channel_topic_all_cached(), cel_aggregation_topic); if (!cel_channel_forwarder) { return -1; } cel_bridge_forwarder = stasis_forward_all( - stasis_caching_get_topic(ast_bridge_topic_all_cached()), + ast_bridge_topic_all_cached(), cel_aggregation_topic); if (!cel_bridge_forwarder) { return -1; diff --git a/main/channel_internal_api.c b/main/channel_internal_api.c index a1d20871d5..35bcb187dd 100644 --- a/main/channel_internal_api.c +++ b/main/channel_internal_api.c @@ -44,6 +44,7 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$") #include "asterisk/data.h" #include "asterisk/endpoints.h" #include "asterisk/indications.h" +#include "asterisk/stasis_cache_pattern.h" #include "asterisk/stasis_channels.h" #include "asterisk/stasis_endpoints.h" #include "asterisk/stringfields.h" @@ -208,7 +209,7 @@ struct ast_channel { char dtmf_digit_to_emulate; /*!< Digit being emulated */ char sending_dtmf_digit; /*!< Digit this channel is currently sending out. (zero if not sending) */ struct timeval sending_dtmf_tv; /*!< The time this channel started sending the current digit. (Invalid if sending_dtmf_digit is zero.) */ - struct stasis_topic *topic; /*!< Topic for all channel's events */ + struct stasis_cp_single *topics; /*!< Topic for all channel's events */ struct stasis_subscription *forwarder; /*!< Subscription for event forwarding to all topic */ struct stasis_subscription *endpoint_forward; /*!< Subscription for event forwarding to endpoint's topic */ }; @@ -1434,8 +1435,8 @@ void ast_channel_internal_cleanup(struct ast_channel *chan) chan->forwarder = stasis_unsubscribe(chan->forwarder); chan->endpoint_forward = stasis_unsubscribe(chan->endpoint_forward); - ao2_cleanup(chan->topic); - chan->topic = NULL; + stasis_cp_single_unsubscribe(chan->topics); + chan->topics = NULL; } void ast_channel_internal_finalize(struct ast_channel *chan) @@ -1450,16 +1451,31 @@ int ast_channel_internal_is_finalized(struct ast_channel *chan) struct stasis_topic *ast_channel_topic(struct ast_channel *chan) { - return chan ? chan->topic : ast_channel_topic_all(); + if (!chan) { + return ast_channel_topic_all(); + } + + return stasis_cp_single_topic(chan->topics); } -int ast_channel_forward_endpoint(struct ast_channel *chan, struct ast_endpoint *endpoint) +struct stasis_topic *ast_channel_topic_cached(struct ast_channel *chan) +{ + if (!chan) { + return ast_channel_topic_all_cached(); + } + + return stasis_cp_single_topic_cached(chan->topics); +} + +int ast_channel_forward_endpoint(struct ast_channel *chan, + struct ast_endpoint *endpoint) { ast_assert(chan != NULL); ast_assert(endpoint != NULL); chan->endpoint_forward = - stasis_forward_all(chan->topic, ast_endpoint_topic(endpoint)); + stasis_forward_all(ast_channel_topic(chan), + ast_endpoint_topic(endpoint)); if (chan->endpoint_forward == NULL) { return -1; @@ -1468,19 +1484,21 @@ int ast_channel_forward_endpoint(struct ast_channel *chan, struct ast_endpoint * return 0; } -void ast_channel_internal_setup_topics(struct ast_channel *chan) +int ast_channel_internal_setup_topics(struct ast_channel *chan) { const char *topic_name = chan->uniqueid; - ast_assert(chan->topic == NULL); - ast_assert(chan->forwarder == NULL); + ast_assert(chan->topics == NULL); if (ast_strlen_zero(topic_name)) { topic_name = ""; } - chan->topic = stasis_topic_create(topic_name); - chan->forwarder = stasis_forward_all(chan->topic, ast_channel_topic_all()); + chan->topics = stasis_cp_single_create( + ast_channel_cache_all(), topic_name); - ast_assert(chan->topic != NULL); - ast_assert(chan->forwarder != NULL); + if (!chan->topics) { + return -1; + } + + return 0; } diff --git a/main/cli.c b/main/cli.c index 6474f3e725..6ca0737ab5 100644 --- a/main/cli.c +++ b/main/cli.c @@ -915,7 +915,7 @@ static char *handle_chanlist(struct ast_cli_entry *e, int cmd, struct ast_cli_ar return CLI_SHOWUSAGE; - if (!(channels = stasis_cache_dump(ast_channel_topic_all_cached_by_name(), ast_channel_snapshot_type()))) { + if (!(channels = stasis_cache_dump(ast_channel_cache_by_name(), ast_channel_snapshot_type()))) { ast_cli(a->fd, "Failed to retrieve cached channels\n"); return CLI_SUCCESS; } @@ -1438,7 +1438,7 @@ static char *handle_showchan(struct ast_cli_entry *e, int cmd, struct ast_cli_ar now = ast_tvnow(); - if (!(msg = stasis_cache_get(ast_channel_topic_all_cached_by_name(), ast_channel_snapshot_type(), a->argv[3]))) { + if (!(msg = stasis_cache_get(ast_channel_cache_by_name(), ast_channel_snapshot_type(), a->argv[3]))) { ast_cli(a->fd, "%s is not a known channel\n", a->argv[3]); return CLI_SUCCESS; } @@ -1571,7 +1571,7 @@ char *ast_complete_channels(const char *line, const char *word, int pos, int sta return NULL; } - if (!(cached_channels = stasis_cache_dump(ast_channel_topic_all_cached(), ast_channel_snapshot_type()))) { + if (!(cached_channels = stasis_cache_dump(ast_channel_cache(), ast_channel_snapshot_type()))) { return NULL; } diff --git a/main/devicestate.c b/main/devicestate.c index b2c70f7648..c16a0628b3 100644 --- a/main/devicestate.c +++ b/main/devicestate.c @@ -196,6 +196,7 @@ static ast_cond_t change_pending; struct stasis_subscription *devstate_message_sub; static struct stasis_topic *device_state_topic_all; +static struct stasis_cache *device_state_cache; static struct stasis_caching_topic *device_state_topic_cached; static struct stasis_topic_pool *device_state_topic_pool; @@ -285,7 +286,7 @@ static enum ast_device_state devstate_cached(const char *device) RAII_VAR(struct stasis_message *, cached_msg, NULL, ao2_cleanup); struct ast_device_state_message *device_state; - cached_msg = stasis_cache_get(ast_device_state_topic_cached(), ast_device_state_message_type(), device); + cached_msg = stasis_cache_get(ast_device_state_cache(), ast_device_state_message_type(), device); if (!cached_msg) { return AST_DEVICE_UNKNOWN; } @@ -586,7 +587,7 @@ static enum ast_device_state get_aggregate_state(char *device) ast_devstate_aggregate_init(&aggregate); - cached = stasis_cache_dump(ast_device_state_topic_cached(), NULL); + cached = stasis_cache_dump(ast_device_state_cache(), NULL); ao2_callback_data(cached, OBJ_NODATA, devstate_change_aggregator_cb, &aggregate, device); @@ -598,7 +599,7 @@ static int aggregate_state_changed(char *device, enum ast_device_state new_aggre RAII_VAR(struct stasis_message *, cached_aggregate_msg, NULL, ao2_cleanup); struct ast_device_state_message *cached_aggregate_device_state; - cached_aggregate_msg = stasis_cache_get(ast_device_state_topic_cached(), ast_device_state_message_type(), device); + cached_aggregate_msg = stasis_cache_get(ast_device_state_cache(), ast_device_state_message_type(), device); if (!cached_aggregate_msg) { return 1; } @@ -719,9 +720,14 @@ struct stasis_topic *ast_device_state_topic_all(void) return device_state_topic_all; } -struct stasis_caching_topic *ast_device_state_topic_cached(void) +struct stasis_cache *ast_device_state_cache(void) { - return device_state_topic_cached; + return device_state_cache; +} + +struct stasis_topic *ast_device_state_topic_cached(void) +{ + return stasis_caching_get_topic(device_state_topic_cached); } struct stasis_topic *ast_device_state_topic(const char *device) @@ -777,6 +783,8 @@ static void devstate_cleanup(void) devstate_message_sub = stasis_unsubscribe_and_join(devstate_message_sub); ao2_cleanup(device_state_topic_all); device_state_topic_all = NULL; + ao2_cleanup(device_state_cache); + device_state_cache = NULL; device_state_topic_cached = stasis_caching_unsubscribe_and_join(device_state_topic_cached); STASIS_MESSAGE_TYPE_CLEANUP(ast_device_state_message_type); ao2_cleanup(device_state_topic_pool); @@ -794,7 +802,11 @@ int devstate_init(void) if (!device_state_topic_all) { return -1; } - device_state_topic_cached = stasis_caching_topic_create(device_state_topic_all, device_state_get_id); + device_state_cache = stasis_cache_create(device_state_get_id); + if (!device_state_cache) { + return -1; + } + device_state_topic_cached = stasis_caching_topic_create(device_state_topic_all, device_state_cache); if (!device_state_topic_cached) { return -1; } @@ -803,7 +815,7 @@ int devstate_init(void) return -1; } - devstate_message_sub = stasis_subscribe(stasis_caching_get_topic(ast_device_state_topic_cached()), devstate_change_collector_cb, NULL); + devstate_message_sub = stasis_subscribe(ast_device_state_topic_cached(), devstate_change_collector_cb, NULL); if (!devstate_message_sub) { ast_log(LOG_ERROR, "Failed to create subscription for the device state change collector\n"); diff --git a/main/endpoints.c b/main/endpoints.c index d689f2e6ec..b33e33f1a8 100644 --- a/main/endpoints.c +++ b/main/endpoints.c @@ -58,17 +58,29 @@ struct ast_endpoint { */ int max_channels; /*! Topic for this endpoint's messages */ - struct stasis_topic *topic; - /*! - * Forwarding subscription sending messages to ast_endpoint_topic_all() - */ - struct stasis_subscription *forward; + struct stasis_cp_single *topics; /*! Router for handling this endpoint's messages */ struct stasis_message_router *router; /*! ast_str_container of channels associated with this endpoint */ struct ao2_container *channel_ids; }; +struct stasis_topic *ast_endpoint_topic(struct ast_endpoint *endpoint) +{ + if (!endpoint) { + return ast_endpoint_topic_all(); + } + return stasis_cp_single_topic(endpoint->topics); +} + +struct stasis_topic *ast_endpoint_topic_cached(struct ast_endpoint *endpoint) +{ + if (!endpoint) { + return ast_endpoint_topic_all_cached(); + } + return stasis_cp_single_topic_cached(endpoint->topics); +} + const char *ast_endpoint_state_to_string(enum ast_endpoint_state state) { switch (state) { @@ -88,7 +100,7 @@ static void endpoint_publish_snapshot(struct ast_endpoint *endpoint) RAII_VAR(struct stasis_message *, message, NULL, ao2_cleanup); ast_assert(endpoint != NULL); - ast_assert(endpoint->topic != NULL); + ast_assert(endpoint->topics != NULL); snapshot = ast_endpoint_snapshot_create(endpoint); if (!snapshot) { @@ -98,7 +110,7 @@ static void endpoint_publish_snapshot(struct ast_endpoint *endpoint) if (!message) { return; } - stasis_publish(endpoint->topic, message); + stasis_publish(ast_endpoint_topic(endpoint), message); } static void endpoint_dtor(void *obj) @@ -110,11 +122,8 @@ static void endpoint_dtor(void *obj) ao2_cleanup(endpoint->router); endpoint->router = NULL; - stasis_unsubscribe(endpoint->forward); - endpoint->forward = NULL; - - ao2_cleanup(endpoint->topic); - endpoint->topic = NULL; + stasis_cp_single_unsubscribe(endpoint->topics); + endpoint->topics = NULL; ao2_cleanup(endpoint->channel_ids); endpoint->channel_ids = NULL; @@ -214,18 +223,13 @@ struct ast_endpoint *ast_endpoint_create(const char *tech, const char *resource) return NULL; } - endpoint->topic = stasis_topic_create(endpoint->id); - if (!endpoint->topic) { + endpoint->topics = stasis_cp_single_create(ast_endpoint_cache_all(), + endpoint->id); + if (!endpoint->topics) { return NULL; } - endpoint->forward = - stasis_forward_all(endpoint->topic, ast_endpoint_topic_all()); - if (!endpoint->forward) { - return NULL; - } - - endpoint->router = stasis_message_router_create(endpoint->topic); + endpoint->router = stasis_message_router_create(ast_endpoint_topic(endpoint)); if (!endpoint->router) { return NULL; } @@ -271,7 +275,7 @@ void ast_endpoint_shutdown(struct ast_endpoint *endpoint) RAII_VAR(struct stasis_message *, message, NULL, ao2_cleanup); message = stasis_cache_clear_create(clear_msg); if (message) { - stasis_publish(endpoint->topic, message); + stasis_publish(ast_endpoint_topic(endpoint), message); } } @@ -285,11 +289,6 @@ const char *ast_endpoint_get_resource(const struct ast_endpoint *endpoint) return endpoint->resource; } -struct stasis_topic *ast_endpoint_topic(struct ast_endpoint *endpoint) -{ - return endpoint ? endpoint->topic : ast_endpoint_topic_all(); -} - void ast_endpoint_set_state(struct ast_endpoint *endpoint, enum ast_endpoint_state state) { diff --git a/main/manager.c b/main/manager.c index 33bf269773..8ea7f42022 100644 --- a/main/manager.c +++ b/main/manager.c @@ -3874,7 +3874,7 @@ static int action_status(struct mansession *s, const struct message *m) } if (all) { - if (!(cached_channels = stasis_cache_dump(ast_channel_topic_all_cached_by_name(), ast_channel_snapshot_type()))) { + if (!(cached_channels = stasis_cache_dump(ast_channel_cache_by_name(), ast_channel_snapshot_type()))) { ast_free(str); astman_send_error(s, m, "Memory Allocation Failure"); return 1; @@ -3882,7 +3882,7 @@ static int action_status(struct mansession *s, const struct message *m) it_chans = ao2_iterator_init(cached_channels, 0); msg = ao2_iterator_next(&it_chans); } else { - if (!(msg = stasis_cache_get(ast_channel_topic_all_cached_by_name(), ast_channel_snapshot_type(), name))) { + if (!(msg = stasis_cache_get(ast_channel_cache_by_name(), ast_channel_snapshot_type(), name))) { astman_send_error(s, m, "No such channel"); ast_free(str); return 0; @@ -5356,7 +5356,7 @@ static int action_coreshowchannels(struct mansession *s, const struct message *m idText[0] = '\0'; } - if (!(channels = stasis_cache_dump(ast_channel_topic_all_cached_by_name(), ast_channel_snapshot_type()))) { + if (!(channels = stasis_cache_dump(ast_channel_cache_by_name(), ast_channel_snapshot_type()))) { astman_send_error(s, m, "Could not get cached channels"); return 0; } diff --git a/main/manager_bridges.c b/main/manager_bridges.c index 7f0ae6b016..c791e63f37 100644 --- a/main/manager_bridges.c +++ b/main/manager_bridges.c @@ -350,7 +350,7 @@ static int manager_bridges_list(struct mansession *s, const struct message *m) ast_str_set(&id_text, 0, "ActionID: %s\r\n", id); } - bridges = stasis_cache_dump(ast_bridge_topic_all_cached(), ast_bridge_snapshot_type()); + bridges = stasis_cache_dump(ast_bridge_cache(), ast_bridge_snapshot_type()); if (!bridges) { astman_send_error(s, m, "Internal error"); return -1; @@ -382,7 +382,7 @@ static int send_bridge_info_item_cb(void *obj, void *arg, void *data, int flags) RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup); struct ast_channel_snapshot *snapshot; RAII_VAR(struct ast_str *, channel_text, NULL, ast_free); - msg = stasis_cache_get(ast_channel_topic_all_cached(), + msg = stasis_cache_get(ast_channel_cache(), ast_channel_snapshot_type(), uniqueid); if (!msg) { @@ -432,7 +432,7 @@ static int manager_bridge_info(struct mansession *s, const struct message *m) ast_str_set(&id_text, 0, "ActionID: %s\r\n", id); } - msg = stasis_cache_get(ast_bridge_topic_all_cached(), ast_bridge_snapshot_type(), bridge_uniqueid); + msg = stasis_cache_get(ast_bridge_cache(), ast_bridge_snapshot_type(), bridge_uniqueid); if (!msg) { astman_send_error(s, m, "Specified BridgeUniqueid not found"); return -1; @@ -489,7 +489,7 @@ int manager_bridging_init(void) return -1; } - bridge_topic = stasis_caching_get_topic(ast_bridge_topic_all_cached()); + bridge_topic = ast_bridge_topic_all_cached(); if (!bridge_topic) { return -1; } diff --git a/main/manager_channels.c b/main/manager_channels.c index d26f0be06f..cab4aa38d5 100644 --- a/main/manager_channels.c +++ b/main/manager_channels.c @@ -1269,7 +1269,7 @@ int manager_channels_init(void) if (!message_router) { return -1; } - channel_topic = stasis_caching_get_topic(ast_channel_topic_all_cached()); + channel_topic = ast_channel_topic_all_cached(); if (!channel_topic) { return -1; } diff --git a/main/manager_endpoints.c b/main/manager_endpoints.c index f0ed28a2ce..634283728a 100644 --- a/main/manager_endpoints.c +++ b/main/manager_endpoints.c @@ -49,7 +49,11 @@ static void endpoint_state_cb(void *data, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *message) { - stasis_forward_message(ast_manager_get_topic(), stasis_caching_get_topic(ast_endpoint_topic_all_cached()), message); + /* XXX This looks wrong. Nothing should post or forward to a caching + * topic directly. Maybe ast_endpoint_topic() would be correct? I'd have + * to dig to make sure I don't break anything, though. + */ + stasis_forward_message(ast_manager_get_topic(), ast_endpoint_topic_all_cached(), message); } int manager_endpoints_init(void) @@ -64,7 +68,7 @@ int manager_endpoints_init(void) ast_register_atexit(manager_endpoints_shutdown); - endpoint_topic = stasis_caching_get_topic(ast_endpoint_topic_all_cached()); + endpoint_topic = ast_endpoint_topic_all_cached(); if (!endpoint_topic) { return -1; } diff --git a/main/pbx.c b/main/pbx.c index 0b8024a1f2..27f774ac3c 100644 --- a/main/pbx.c +++ b/main/pbx.c @@ -8042,7 +8042,7 @@ static char *handle_show_chanvar(struct ast_cli_entry *e, int cmd, struct ast_cl if (a->argc != e->args + 1) return CLI_SHOWUSAGE; - if (!(msg = stasis_cache_get(ast_channel_topic_all_cached_by_name(), ast_channel_snapshot_type(), a->argv[3]))) { + if (!(msg = stasis_cache_get(ast_channel_cache_by_name(), ast_channel_snapshot_type(), a->argv[3]))) { ast_cli(a->fd, "Channel '%s' not found\n", a->argv[e->args]); return CLI_FAILURE; } diff --git a/main/presencestate.c b/main/presencestate.c index 45174d5a1a..3f394b6355 100644 --- a/main/presencestate.c +++ b/main/presencestate.c @@ -55,6 +55,7 @@ static const struct { STASIS_MESSAGE_TYPE_DEFN(ast_presence_state_message_type); struct stasis_topic *presence_state_topic_all; +struct stasis_cache *presence_state_cache; struct stasis_caching_topic *presence_state_topic_cached; /*! \brief A presence state provider */ @@ -95,7 +96,7 @@ static enum ast_presence_state presence_state_cached(const char *presence_provid RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup); struct ast_presence_state_message *presence_state; - msg = stasis_cache_get(ast_presence_state_topic_cached(), ast_presence_state_message_type(), presence_provider); + msg = stasis_cache_get(ast_presence_state_cache(), ast_presence_state_message_type(), presence_provider); if (!msg) { return res; @@ -294,9 +295,14 @@ struct stasis_topic *ast_presence_state_topic_all(void) return presence_state_topic_all; } -struct stasis_caching_topic *ast_presence_state_topic_cached(void) +struct stasis_cache *ast_presence_state_cache(void) { - return presence_state_topic_cached; + return presence_state_cache; +} + +struct stasis_topic *ast_presence_state_topic_cached(void) +{ + return stasis_caching_get_topic(presence_state_topic_cached); } static const char *presence_state_get_id(struct stasis_message *msg) @@ -314,6 +320,8 @@ static void presence_state_engine_cleanup(void) { ao2_cleanup(presence_state_topic_all); presence_state_topic_all = NULL; + ao2_cleanup(presence_state_cache); + presence_state_cache = NULL; presence_state_topic_cached = stasis_caching_unsubscribe_and_join(presence_state_topic_cached); STASIS_MESSAGE_TYPE_CLEANUP(ast_presence_state_message_type); } @@ -331,7 +339,12 @@ int ast_presence_state_engine_init(void) return -1; } - presence_state_topic_cached = stasis_caching_topic_create(presence_state_topic_all, presence_state_get_id); + presence_state_cache = stasis_cache_create(presence_state_get_id); + if (!presence_state_cache) { + return -1; + } + + presence_state_topic_cached = stasis_caching_topic_create(presence_state_topic_all, presence_state_cache); if (!presence_state_topic_cached) { return -1; } diff --git a/main/stasis.c b/main/stasis.c index 64f77e3091..b1af7b7f66 100644 --- a/main/stasis.c +++ b/main/stasis.c @@ -653,6 +653,11 @@ int stasis_init(void) return -1; } + if (stasis_wait_init() != 0) { + ast_log(LOG_ERROR, "Stasis initialization failed\n"); + return -1; + } + if (pool) { ast_log(LOG_ERROR, "Stasis double-initialized\n"); return -1; diff --git a/main/stasis_bridges.c b/main/stasis_bridges.c index 858875ad97..72f4d5055e 100644 --- a/main/stasis_bridges.c +++ b/main/stasis_bridges.c @@ -33,6 +33,7 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$") #include "asterisk/astobj2.h" #include "asterisk/stasis.h" +#include "asterisk/stasis_cache_pattern.h" #include "asterisk/channel.h" #include "asterisk/stasis_bridges.h" #include "asterisk/stasis_channels.h" @@ -361,6 +362,8 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$") static struct ast_manager_event_blob *attended_transfer_to_ami(struct stasis_message *message); static struct ast_manager_event_blob *blind_transfer_to_ami(struct stasis_message *message); +static struct stasis_cp_all *bridge_cache_all; + /*! * @{ \brief Define bridge message types. */ @@ -372,14 +375,52 @@ STASIS_MESSAGE_TYPE_DEFN(ast_blind_transfer_type, .to_ami = blind_transfer_to_am STASIS_MESSAGE_TYPE_DEFN(ast_attended_transfer_type, .to_ami = attended_transfer_to_ami); /*! @} */ -/*! \brief Aggregate topic for bridge messages */ -static struct stasis_topic *bridge_topic_all; +struct stasis_cache *ast_bridge_cache(void) +{ + return stasis_cp_all_cache(bridge_cache_all); +} -/*! \brief Caching aggregate topic for bridge snapshots */ -static struct stasis_caching_topic *bridge_topic_all_cached; +struct stasis_topic *ast_bridge_topic_all(void) +{ + return stasis_cp_all_topic(bridge_cache_all); +} -/*! \brief Topic pool for individual bridge topics */ -static struct stasis_topic_pool *bridge_topic_pool; +struct stasis_topic *ast_bridge_topic_all_cached(void) +{ + return stasis_cp_all_topic_cached(bridge_cache_all); +} + +int bridge_topics_init(struct ast_bridge *bridge) +{ + if (ast_strlen_zero(bridge->uniqueid)) { + ast_log(LOG_ERROR, "Bridge id initialization required\n"); + return -1; + } + bridge->topics = stasis_cp_single_create(bridge_cache_all, + bridge->uniqueid); + if (!bridge->topics) { + return -1; + } + return 0; +} + +struct stasis_topic *ast_bridge_topic(struct ast_bridge *bridge) +{ + if (!bridge) { + return ast_bridge_topic_all(); + } + + return stasis_cp_single_topic(bridge->topics); +} + +struct stasis_topic *ast_bridge_topic_cached(struct ast_bridge *bridge) +{ + if (!bridge) { + return ast_bridge_topic_all_cached(); + } + + return stasis_cp_single_topic_cached(bridge->topics); +} /*! \brief Destructor for bridge snapshots */ static void bridge_snapshot_dtor(void *obj) @@ -425,25 +466,6 @@ struct ast_bridge_snapshot *ast_bridge_snapshot_create(struct ast_bridge *bridge return snapshot; } -struct stasis_topic *ast_bridge_topic(struct ast_bridge *bridge) -{ - struct stasis_topic *bridge_topic = stasis_topic_pool_get_topic(bridge_topic_pool, bridge->uniqueid); - if (!bridge_topic) { - return ast_bridge_topic_all(); - } - return bridge_topic; -} - -struct stasis_topic *ast_bridge_topic_all(void) -{ - return bridge_topic_all; -} - -struct stasis_caching_topic *ast_bridge_topic_all_cached(void) -{ - return bridge_topic_all_cached; -} - void ast_bridge_publish_state(struct ast_bridge *bridge) { RAII_VAR(struct ast_bridge_snapshot *, snapshot, NULL, ao2_cleanup); @@ -464,7 +486,8 @@ void ast_bridge_publish_state(struct ast_bridge *bridge) stasis_publish(ast_bridge_topic(bridge), msg); } -static void bridge_publish_state_from_blob(struct ast_bridge_blob *obj) +static void bridge_publish_state_from_blob(struct ast_bridge *bridge, + struct ast_bridge_blob *obj) { RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup); @@ -475,7 +498,7 @@ static void bridge_publish_state_from_blob(struct ast_bridge_blob *obj) return; } - stasis_publish(stasis_topic_pool_get_topic(bridge_topic_pool, obj->bridge->uniqueid), msg); + stasis_publish(ast_bridge_topic(bridge), msg); } /*! \brief Destructor for bridge merge messages */ @@ -597,7 +620,7 @@ void ast_bridge_publish_enter(struct ast_bridge *bridge, struct ast_channel *cha /* enter blob first, then state */ stasis_publish(ast_bridge_topic(bridge), msg); - bridge_publish_state_from_blob(stasis_message_data(msg)); + bridge_publish_state_from_blob(bridge, stasis_message_data(msg)); } void ast_bridge_publish_leave(struct ast_bridge *bridge, struct ast_channel *chan) @@ -610,7 +633,7 @@ void ast_bridge_publish_leave(struct ast_bridge *bridge, struct ast_channel *cha } /* state first, then leave blob (opposite of enter, preserves nesting of events) */ - bridge_publish_state_from_blob(stasis_message_data(msg)); + bridge_publish_state_from_blob(bridge, stasis_message_data(msg)); stasis_publish(ast_bridge_topic(bridge), msg); } @@ -1043,7 +1066,7 @@ struct ast_bridge_snapshot *ast_bridge_snapshot_get_latest(const char *uniqueid) ast_assert(!ast_strlen_zero(uniqueid)); - message = stasis_cache_get(ast_bridge_topic_all_cached(), + message = stasis_cache_get(ast_bridge_cache(), ast_bridge_snapshot_type(), uniqueid); if (!message) { @@ -1058,23 +1081,6 @@ struct ast_bridge_snapshot *ast_bridge_snapshot_get_latest(const char *uniqueid) return snapshot; } -static void stasis_bridging_cleanup(void) -{ - ao2_cleanup(bridge_topic_all); - bridge_topic_all = NULL; - bridge_topic_all_cached = stasis_caching_unsubscribe_and_join( - bridge_topic_all_cached); - ao2_cleanup(bridge_topic_pool); - bridge_topic_pool = NULL; - - STASIS_MESSAGE_TYPE_CLEANUP(ast_bridge_snapshot_type); - STASIS_MESSAGE_TYPE_CLEANUP(ast_bridge_merge_message_type); - STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_entered_bridge_type); - STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_left_bridge_type); - STASIS_MESSAGE_TYPE_CLEANUP(ast_blind_transfer_type); - STASIS_MESSAGE_TYPE_CLEANUP(ast_attended_transfer_type); -} - /*! \brief snapshot ID getter for caching topic */ static const char *bridge_snapshot_get_id(struct stasis_message *msg) { @@ -1086,21 +1092,38 @@ static const char *bridge_snapshot_get_id(struct stasis_message *msg) return snapshot->uniqueid; } +static void stasis_bridging_cleanup(void) +{ + STASIS_MESSAGE_TYPE_CLEANUP(ast_bridge_snapshot_type); + STASIS_MESSAGE_TYPE_CLEANUP(ast_bridge_merge_message_type); + STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_entered_bridge_type); + STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_left_bridge_type); + STASIS_MESSAGE_TYPE_CLEANUP(ast_blind_transfer_type); + STASIS_MESSAGE_TYPE_CLEANUP(ast_attended_transfer_type); + + ao2_cleanup(bridge_cache_all); + bridge_cache_all = NULL; +} + int ast_stasis_bridging_init(void) { + int res = 0; + ast_register_cleanup(stasis_bridging_cleanup); - STASIS_MESSAGE_TYPE_INIT(ast_bridge_snapshot_type); - STASIS_MESSAGE_TYPE_INIT(ast_bridge_merge_message_type); - STASIS_MESSAGE_TYPE_INIT(ast_channel_entered_bridge_type); - STASIS_MESSAGE_TYPE_INIT(ast_channel_left_bridge_type); - STASIS_MESSAGE_TYPE_INIT(ast_blind_transfer_type); - STASIS_MESSAGE_TYPE_INIT(ast_attended_transfer_type); - bridge_topic_all = stasis_topic_create("ast_bridge_topic_all"); - bridge_topic_all_cached = stasis_caching_topic_create(bridge_topic_all, bridge_snapshot_get_id); - bridge_topic_pool = stasis_topic_pool_create(bridge_topic_all); + bridge_cache_all = stasis_cp_all_create("ast_bridge_topic_all", + bridge_snapshot_get_id); - return !bridge_topic_all - || !bridge_topic_all_cached - || !bridge_topic_pool ? -1 : 0; + if (!bridge_cache_all) { + return -1; + } + + res |= STASIS_MESSAGE_TYPE_INIT(ast_bridge_snapshot_type); + res |= STASIS_MESSAGE_TYPE_INIT(ast_bridge_merge_message_type); + res |= STASIS_MESSAGE_TYPE_INIT(ast_channel_entered_bridge_type); + res |= STASIS_MESSAGE_TYPE_INIT(ast_channel_left_bridge_type); + res |= STASIS_MESSAGE_TYPE_INIT(ast_blind_transfer_type); + res |= STASIS_MESSAGE_TYPE_INIT(ast_attended_transfer_type); + + return res; } diff --git a/main/stasis_cache.c b/main/stasis_cache.c index 17be90111f..3d50656657 100644 --- a/main/stasis_cache.c +++ b/main/stasis_cache.c @@ -44,15 +44,18 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$") #endif /*! \internal */ -struct stasis_caching_topic { - struct ao2_container *cache; - struct stasis_topic *topic; - struct stasis_topic *original_topic; - struct stasis_subscription *sub; +struct stasis_cache { + struct ao2_container *entries; snapshot_get_id id_fn; }; -static struct stasis_message_type *cache_guarantee_type(void); +/*! \internal */ +struct stasis_caching_topic { + struct stasis_cache *cache; + struct stasis_topic *topic; + struct stasis_topic *original_topic; + struct stasis_subscription *sub; +}; static void stasis_caching_topic_dtor(void *obj) { struct stasis_caching_topic *caching_topic = obj; @@ -136,7 +139,8 @@ static struct cache_entry *cache_entry_create(struct stasis_message_type *type, ast_assert(type != NULL); ast_assert(id != NULL); - entry = ao2_alloc(sizeof(*entry), cache_entry_dtor); + entry = ao2_alloc_options(sizeof(*entry), cache_entry_dtor, + AO2_ALLOC_OPT_LOCK_NOLOCK); if (!entry) { return NULL; } @@ -183,28 +187,62 @@ static int cache_entry_cmp(void *obj, void *arg, int flags) return 0; } -static struct stasis_message *cache_put(struct stasis_caching_topic *caching_topic, struct stasis_message_type *type, const char *id, struct stasis_message *new_snapshot) +static void cache_dtor(void *obj) +{ + struct stasis_cache *cache = obj; + + ao2_cleanup(cache->entries); + cache->entries = NULL; +} + +struct stasis_cache *stasis_cache_create(snapshot_get_id id_fn) +{ + RAII_VAR(struct stasis_cache *, cache, NULL, ao2_cleanup); + + cache = ao2_alloc_options(sizeof(*cache), cache_dtor, + AO2_ALLOC_OPT_LOCK_NOLOCK); + if (!cache) { + return NULL; + } + + cache->entries = ao2_container_alloc(NUM_CACHE_BUCKETS, cache_entry_hash, + cache_entry_cmp); + if (!cache->entries) { + return NULL; + } + + cache->id_fn = id_fn; + + ao2_ref(cache, +1); + return cache; +} + +static struct stasis_message *cache_put(struct stasis_cache *cache, + struct stasis_message_type *type, const char *id, + struct stasis_message *new_snapshot) { RAII_VAR(struct cache_entry *, new_entry, NULL, ao2_cleanup); RAII_VAR(struct cache_entry *, cached_entry, NULL, ao2_cleanup); struct stasis_message *old_snapshot = NULL; - ast_assert(caching_topic->cache != NULL); + ast_assert(cache->entries != NULL); + ast_assert(new_snapshot == NULL || + type == stasis_message_type(new_snapshot)); new_entry = cache_entry_create(type, id, new_snapshot); if (new_snapshot == NULL) { /* Remove entry from cache */ - cached_entry = ao2_find(caching_topic->cache, new_entry, OBJ_POINTER | OBJ_UNLINK); + cached_entry = ao2_find(cache->entries, new_entry, OBJ_POINTER | OBJ_UNLINK); if (cached_entry) { old_snapshot = cached_entry->snapshot; cached_entry->snapshot = NULL; } } else { /* Insert/update cache */ - SCOPED_AO2LOCK(lock, caching_topic->cache); + SCOPED_AO2LOCK(lock, cache->entries); - cached_entry = ao2_find(caching_topic->cache, new_entry, OBJ_POINTER | OBJ_NOLOCK); + cached_entry = ao2_find(cache->entries, new_entry, OBJ_POINTER | OBJ_NOLOCK); if (cached_entry) { /* Update cache. Because objects are moving, no need to update refcounts. */ old_snapshot = cached_entry->snapshot; @@ -212,7 +250,7 @@ static struct stasis_message *cache_put(struct stasis_caching_topic *caching_top new_entry->snapshot = NULL; } else { /* Insert into the cache */ - ao2_link_flags(caching_topic->cache, new_entry, OBJ_NOLOCK); + ao2_link_flags(cache->entries, new_entry, OBJ_NOLOCK); } } @@ -220,68 +258,19 @@ static struct stasis_message *cache_put(struct stasis_caching_topic *caching_top return old_snapshot; } -/*! \internal */ -struct caching_guarantee { - ast_mutex_t lock; - ast_cond_t cond; - unsigned int done:1; -}; - -static void caching_guarantee_dtor(void *obj) -{ - struct caching_guarantee *guarantee = obj; - - ast_assert(guarantee->done == 1); - - ast_mutex_destroy(&guarantee->lock); - ast_cond_destroy(&guarantee->cond); -} - -static struct stasis_message *caching_guarantee_create(void) -{ - RAII_VAR(struct caching_guarantee *, guarantee, NULL, ao2_cleanup); - RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup); - - if (!(guarantee = ao2_alloc(sizeof(*guarantee), caching_guarantee_dtor))) { - return NULL; - } - - ast_mutex_init(&guarantee->lock); - ast_cond_init(&guarantee->cond, NULL); - - if (!(msg = stasis_message_create(cache_guarantee_type(), guarantee))) { - return NULL; - } - - ao2_ref(msg, +1); - return msg; -} - -struct stasis_message *stasis_cache_get_extended(struct stasis_caching_topic *caching_topic, struct stasis_message_type *type, const char *id, unsigned int guaranteed) +struct stasis_message *stasis_cache_get(struct stasis_cache *cache, struct stasis_message_type *type, const char *id) { RAII_VAR(struct cache_entry *, search_entry, NULL, ao2_cleanup); RAII_VAR(struct cache_entry *, cached_entry, NULL, ao2_cleanup); - ast_assert(caching_topic->cache != NULL); - - if (guaranteed) { - RAII_VAR(struct stasis_message *, msg, caching_guarantee_create(), ao2_cleanup); - struct caching_guarantee *guarantee = stasis_message_data(msg); - - ast_mutex_lock(&guarantee->lock); - stasis_publish(caching_topic->original_topic, msg); - while (!guarantee->done) { - ast_cond_wait(&guarantee->cond, &guarantee->lock); - } - ast_mutex_unlock(&guarantee->lock); - } + ast_assert(cache->entries != NULL); search_entry = cache_entry_create(type, id, NULL); if (search_entry == NULL) { return NULL; } - cached_entry = ao2_find(caching_topic->cache, search_entry, OBJ_POINTER); + cached_entry = ao2_find(cache->entries, search_entry, OBJ_POINTER); if (cached_entry == NULL) { return NULL; } @@ -308,25 +297,25 @@ static int cache_dump_cb(void *obj, void *arg, int flags) return 0; } -struct ao2_container *stasis_cache_dump(struct stasis_caching_topic *caching_topic, struct stasis_message_type *type) +struct ao2_container *stasis_cache_dump(struct stasis_cache *cache, struct stasis_message_type *type) { struct cache_dump_data cache_dump; - ast_assert(caching_topic->cache != NULL); + ast_assert(cache->entries != NULL); cache_dump.type = type; - cache_dump.cached = ao2_container_alloc(1, NULL, NULL); + cache_dump.cached = ao2_container_alloc_options( + AO2_ALLOC_OPT_LOCK_NOLOCK, 1, NULL, NULL); if (!cache_dump.cached) { return NULL; } - ao2_callback(caching_topic->cache, OBJ_MULTIPLE | OBJ_NODATA, cache_dump_cb, &cache_dump); + ao2_callback(cache->entries, OBJ_MULTIPLE | OBJ_NODATA, cache_dump_cb, &cache_dump); return cache_dump.cached; } STASIS_MESSAGE_TYPE_DEFN(stasis_cache_clear_type); STASIS_MESSAGE_TYPE_DEFN(stasis_cache_update_type); -STASIS_MESSAGE_TYPE_DEFN(cache_guarantee_type); struct stasis_message *stasis_cache_clear_create(struct stasis_message *id_message) { @@ -362,7 +351,8 @@ static struct stasis_message *update_create(struct stasis_topic *topic, struct s ast_assert(topic != NULL); ast_assert(old_snapshot != NULL || new_snapshot != NULL); - update = ao2_alloc(sizeof(*update), stasis_cache_update_dtor); + update = ao2_alloc_options(sizeof(*update), stasis_cache_update_dtor, + AO2_ALLOC_OPT_LOCK_NOLOCK); if (!update) { return NULL; } @@ -393,7 +383,8 @@ static struct stasis_message *update_create(struct stasis_topic *topic, struct s return msg; } -static void caching_topic_exec(void *data, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *message) +static void caching_topic_exec(void *data, struct stasis_subscription *sub, + struct stasis_topic *topic, struct stasis_message *message) { RAII_VAR(struct stasis_caching_topic *, caching_topic_needs_unref, NULL, ao2_cleanup); struct stasis_caching_topic *caching_topic = data; @@ -401,36 +392,25 @@ static void caching_topic_exec(void *data, struct stasis_subscription *sub, stru ast_assert(caching_topic != NULL); ast_assert(caching_topic->topic != NULL); - ast_assert(caching_topic->id_fn != NULL); + ast_assert(caching_topic->cache != NULL); + ast_assert(caching_topic->cache->id_fn != NULL); if (stasis_subscription_final_message(sub, message)) { caching_topic_needs_unref = caching_topic; } - /* Handle cache guarantee event */ - if (cache_guarantee_type() == stasis_message_type(message)) { - struct caching_guarantee *guarantee = stasis_message_data(message); - - ast_mutex_lock(&guarantee->lock); - guarantee->done = 1; - ast_cond_signal(&guarantee->cond); - ast_mutex_unlock(&guarantee->lock); - - return; - } - /* Handle cache clear event */ if (stasis_cache_clear_type() == stasis_message_type(message)) { RAII_VAR(struct stasis_message *, old_snapshot, NULL, ao2_cleanup); RAII_VAR(struct stasis_message *, update, NULL, ao2_cleanup); struct stasis_message *clear_msg = stasis_message_data(message); - const char *clear_id = caching_topic->id_fn(clear_msg); + const char *clear_id = caching_topic->cache->id_fn(clear_msg); struct stasis_message_type *clear_type = stasis_message_type(clear_msg); ast_assert(clear_type != NULL); if (clear_id) { - old_snapshot = cache_put(caching_topic, clear_type, clear_id, NULL); + old_snapshot = cache_put(caching_topic->cache, clear_type, clear_id, NULL); if (old_snapshot) { update = update_create(topic, old_snapshot, NULL); stasis_publish(caching_topic->topic, update); @@ -444,7 +424,7 @@ static void caching_topic_exec(void *data, struct stasis_subscription *sub, stru } } - id = caching_topic->id_fn(message); + id = caching_topic->cache->id_fn(message); if (id == NULL) { /* Object isn't cached; forward */ stasis_forward_message(caching_topic->topic, topic, message); @@ -453,7 +433,7 @@ static void caching_topic_exec(void *data, struct stasis_subscription *sub, stru RAII_VAR(struct stasis_message *, old_snapshot, NULL, ao2_cleanup); RAII_VAR(struct stasis_message *, update, NULL, ao2_cleanup); - old_snapshot = cache_put(caching_topic, stasis_message_type(message), id, message); + old_snapshot = cache_put(caching_topic->cache, stasis_message_type(message), id, message); update = update_create(topic, old_snapshot, message); if (update == NULL) { @@ -464,7 +444,7 @@ static void caching_topic_exec(void *data, struct stasis_subscription *sub, stru } } -struct stasis_caching_topic *stasis_caching_topic_create(struct stasis_topic *original_topic, snapshot_get_id id_fn) +struct stasis_caching_topic *stasis_caching_topic_create(struct stasis_topic *original_topic, struct stasis_cache *cache) { RAII_VAR(struct stasis_caching_topic *, caching_topic, NULL, ao2_cleanup); struct stasis_subscription *sub; @@ -476,23 +456,19 @@ struct stasis_caching_topic *stasis_caching_topic_create(struct stasis_topic *or return NULL; } - caching_topic = ao2_alloc(sizeof(*caching_topic), stasis_caching_topic_dtor); + caching_topic = ao2_alloc_options(sizeof(*caching_topic), + stasis_caching_topic_dtor, AO2_ALLOC_OPT_LOCK_NOLOCK); if (caching_topic == NULL) { return NULL; } - caching_topic->cache = ao2_container_alloc(NUM_CACHE_BUCKETS, cache_entry_hash, cache_entry_cmp); - if (!caching_topic->cache) { - ast_log(LOG_ERROR, "Stasis cache allocation failed\n"); - return NULL; - } - caching_topic->topic = stasis_topic_create(new_name); if (caching_topic->topic == NULL) { return NULL; } - caching_topic->id_fn = id_fn; + ao2_ref(cache, +1); + caching_topic->cache = cache; sub = internal_stasis_subscribe(original_topic, caching_topic_exec, caching_topic, 0); if (sub == NULL) { @@ -514,7 +490,6 @@ static void stasis_cache_cleanup(void) { STASIS_MESSAGE_TYPE_CLEANUP(stasis_cache_clear_type); STASIS_MESSAGE_TYPE_CLEANUP(stasis_cache_update_type); - STASIS_MESSAGE_TYPE_CLEANUP(cache_guarantee_type); } int stasis_cache_init(void) @@ -529,10 +504,6 @@ int stasis_cache_init(void) return -1; } - if (STASIS_MESSAGE_TYPE_INIT(cache_guarantee_type) != 0) { - return -1; - } - return 0; } diff --git a/main/stasis_cache_pattern.c b/main/stasis_cache_pattern.c new file mode 100644 index 0000000000..18ae8617ed --- /dev/null +++ b/main/stasis_cache_pattern.c @@ -0,0 +1,189 @@ +/* + * Asterisk -- An open source telephony toolkit. + * + * Copyright (C) 2013, Digium, Inc. + * + * David M. Lee, II + * + * See http://www.asterisk.org for more information about + * the Asterisk project. Please do not directly contact + * any of the maintainers of this project for assistance; + * the project provides a web site, mailing lists and IRC + * channels for your use. + * + * This program is free software, distributed under the terms of + * the GNU General Public License Version 2. See the LICENSE file + * at the top of the source tree. + */ + +/*! \file + * + * \brief Typical cache pattern for Stasis topics. + * + * \author David M. Lee, II + */ + +/*** MODULEINFO + core + ***/ + +#include "asterisk.h" + +ASTERISK_FILE_VERSION(__FILE__, "$Revision$") + +#include "asterisk/astobj2.h" +#include "asterisk/stasis_cache_pattern.h" + +struct stasis_cp_all { + struct stasis_topic *topic; + struct stasis_topic *topic_cached; + struct stasis_cache *cache; +}; + +struct stasis_cp_single { + struct stasis_topic *topic; + struct stasis_caching_topic *topic_cached; + + struct stasis_subscription *forward; + struct stasis_subscription *forward_cached; +}; + +static void all_dtor(void *obj) +{ + struct stasis_cp_all *all = obj; + + ao2_cleanup(all->topic); + ao2_cleanup(all->topic_cached); + ao2_cleanup(all->cache); +} + +struct stasis_cp_all *stasis_cp_all_create(const char *name, + snapshot_get_id id_fn) +{ + RAII_VAR(char *, cached_name, NULL, ast_free); + RAII_VAR(struct stasis_cp_all *, all, NULL, ao2_cleanup); + + all = ao2_alloc(sizeof(*all), all_dtor); + if (!all) { + return NULL; + } + + ast_asprintf(&cached_name, "%s-cached", name); + if (!cached_name) { + return NULL; + } + + all->topic = stasis_topic_create(name); + all->topic_cached = stasis_topic_create(cached_name); + all->cache = stasis_cache_create(id_fn); + + if (!all->topic || !all->topic_cached || !all->cache) { + return NULL; + } + + ao2_ref(all, +1); + return all; +} + +struct stasis_topic *stasis_cp_all_topic(struct stasis_cp_all *all) +{ + if (!all) { + return NULL; + } + return all->topic; +} + +struct stasis_topic *stasis_cp_all_topic_cached( + struct stasis_cp_all *all) +{ + if (!all) { + return NULL; + } + return all->topic_cached; +} + +struct stasis_cache *stasis_cp_all_cache(struct stasis_cp_all *all) +{ + if (!all) { + return NULL; + } + return all->cache; +} + +static void one_dtor(void *obj) +{ + struct stasis_cp_single *one = obj; + + /* Should already be unsubscribed */ + ast_assert(one->topic_cached == NULL); + ast_assert(one->forward == NULL); + ast_assert(one->forward_cached == NULL); + + ao2_cleanup(one->topic); + one->topic = NULL; +} + +struct stasis_cp_single *stasis_cp_single_create(struct stasis_cp_all *all, + const char *name) +{ + RAII_VAR(struct stasis_cp_single *, one, NULL, ao2_cleanup); + + one = ao2_alloc(sizeof(*one), one_dtor); + if (!one) { + return NULL; + } + + one->topic = stasis_topic_create(name); + if (!one->topic) { + return NULL; + } + one->topic_cached = stasis_caching_topic_create(one->topic, all->cache); + if (!one->topic_cached) { + return NULL; + } + + one->forward = stasis_forward_all(one->topic, all->topic); + if (!one->forward) { + return NULL; + } + one->forward_cached = stasis_forward_all( + stasis_caching_get_topic(one->topic_cached), all->topic_cached); + if (!one->forward_cached) { + return NULL; + } + + ao2_ref(one, +1); + return one; +} + +void stasis_cp_single_unsubscribe(struct stasis_cp_single *one) +{ + if (!one) { + return; + } + + stasis_caching_unsubscribe(one->topic_cached); + one->topic_cached = NULL; + stasis_unsubscribe(one->forward); + one->forward = NULL; + stasis_unsubscribe(one->forward_cached); + one->forward_cached = NULL; +} + +struct stasis_topic *stasis_cp_single_topic(struct stasis_cp_single *one) +{ + if (!one) { + return NULL; + } + return one->topic; +} + +struct stasis_topic *stasis_cp_single_topic_cached( + struct stasis_cp_single *one) +{ + if (!one) { + return NULL; + } + return stasis_caching_get_topic(one->topic_cached); +} + diff --git a/main/stasis_channels.c b/main/stasis_channels.c index c71bbbd141..6729a10722 100644 --- a/main/stasis_channels.c +++ b/main/stasis_channels.c @@ -38,6 +38,7 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$") #include "asterisk/bridge.h" #include "asterisk/translate.h" #include "asterisk/stasis.h" +#include "asterisk/stasis_cache_pattern.h" #include "asterisk/stasis_channels.h" /*** DOCUMENTATION @@ -88,23 +89,33 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$") #define NUM_MULTI_CHANNEL_BLOB_BUCKETS 7 -/*! \brief Topic for all channels */ -struct stasis_topic *channel_topic_all; +static struct stasis_cp_all *channel_cache_all; +static struct stasis_cache *channel_cache_by_name; +static struct stasis_caching_topic *channel_by_name_topic; -/*! \brief Caching topic for all channels */ -struct stasis_caching_topic *channel_topic_all_cached; +struct stasis_cp_all *ast_channel_cache_all(void) +{ + return channel_cache_all; +} -/*! \brief Caching topic for all channels indexed by name */ -struct stasis_caching_topic *channel_topic_all_cached_by_name; +struct stasis_cache *ast_channel_cache(void) +{ + return stasis_cp_all_cache(channel_cache_all); +} struct stasis_topic *ast_channel_topic_all(void) { - return channel_topic_all; + return stasis_cp_all_topic(channel_cache_all); } -struct stasis_caching_topic *ast_channel_topic_all_cached(void) +struct stasis_topic *ast_channel_topic_all_cached(void) { - return channel_topic_all_cached; + return stasis_cp_all_topic_cached(channel_cache_all); +} + +struct stasis_cache *ast_channel_cache_by_name(void) +{ + return channel_cache_by_name; } static const char *channel_snapshot_get_id(struct stasis_message *message) @@ -117,11 +128,6 @@ static const char *channel_snapshot_get_id(struct stasis_message *message) return snapshot->uniqueid; } -struct stasis_caching_topic *ast_channel_topic_all_cached_by_name(void) -{ - return channel_topic_all_cached_by_name; -} - static const char *channel_snapshot_get_name(struct stasis_message *message) { struct ast_channel_snapshot *snapshot; @@ -461,7 +467,7 @@ struct ast_channel_snapshot *ast_channel_snapshot_get_latest(const char *uniquei ast_assert(!ast_strlen_zero(uniqueid)); - message = stasis_cache_get(ast_channel_topic_all_cached(), + message = stasis_cache_get(ast_channel_cache(), ast_channel_snapshot_type(), uniqueid); if (!message) { @@ -483,7 +489,7 @@ struct ast_channel_snapshot *ast_channel_snapshot_get_latest_by_name(const char ast_assert(!ast_strlen_zero(name)); - message = stasis_cache_get(ast_channel_topic_all_cached_by_name(), + message = stasis_cache_get(ast_channel_cache_by_name(), ast_channel_snapshot_type(), name); if (!message) { @@ -906,10 +912,6 @@ STASIS_MESSAGE_TYPE_DEFN(ast_channel_agent_logoff_type, static void stasis_channels_cleanup(void) { - channel_topic_all_cached = stasis_caching_unsubscribe_and_join(channel_topic_all_cached); - channel_topic_all_cached_by_name = stasis_caching_unsubscribe_and_join(channel_topic_all_cached_by_name); - ao2_cleanup(channel_topic_all); - channel_topic_all = NULL; STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_snapshot_type); STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_dial_type); STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_varset_type); @@ -929,33 +931,58 @@ static void stasis_channels_cleanup(void) STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_monitor_stop_type); STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_agent_login_type); STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_agent_logoff_type); + + stasis_caching_unsubscribe_and_join(channel_by_name_topic); + channel_by_name_topic = NULL; + ao2_cleanup(channel_cache_by_name); + channel_cache_by_name = NULL; + ao2_cleanup(channel_cache_all); + channel_cache_all = NULL; } -void ast_stasis_channels_init(void) +int ast_stasis_channels_init(void) { + int res = 0; + ast_register_cleanup(stasis_channels_cleanup); - STASIS_MESSAGE_TYPE_INIT(ast_channel_snapshot_type); - STASIS_MESSAGE_TYPE_INIT(ast_channel_dial_type); - STASIS_MESSAGE_TYPE_INIT(ast_channel_varset_type); - STASIS_MESSAGE_TYPE_INIT(ast_channel_user_event_type); - STASIS_MESSAGE_TYPE_INIT(ast_channel_hangup_request_type); - STASIS_MESSAGE_TYPE_INIT(ast_channel_dtmf_begin_type); - STASIS_MESSAGE_TYPE_INIT(ast_channel_dtmf_end_type); - STASIS_MESSAGE_TYPE_INIT(ast_channel_hold_type); - STASIS_MESSAGE_TYPE_INIT(ast_channel_unhold_type); - STASIS_MESSAGE_TYPE_INIT(ast_channel_chanspy_start_type); - STASIS_MESSAGE_TYPE_INIT(ast_channel_chanspy_stop_type); - STASIS_MESSAGE_TYPE_INIT(ast_channel_fax_type); - STASIS_MESSAGE_TYPE_INIT(ast_channel_hangup_handler_type); - STASIS_MESSAGE_TYPE_INIT(ast_channel_moh_start_type); - STASIS_MESSAGE_TYPE_INIT(ast_channel_moh_stop_type); - STASIS_MESSAGE_TYPE_INIT(ast_channel_monitor_start_type); - STASIS_MESSAGE_TYPE_INIT(ast_channel_monitor_stop_type); + channel_cache_all = stasis_cp_all_create("ast_channel_topic_all", + channel_snapshot_get_id); + if (!channel_cache_all) { + return -1; + } STASIS_MESSAGE_TYPE_INIT(ast_channel_agent_login_type); STASIS_MESSAGE_TYPE_INIT(ast_channel_agent_logoff_type); - channel_topic_all = stasis_topic_create("ast_channel_topic_all"); - channel_topic_all_cached = stasis_caching_topic_create(channel_topic_all, channel_snapshot_get_id); - channel_topic_all_cached_by_name = stasis_caching_topic_create(channel_topic_all, channel_snapshot_get_name); + channel_cache_by_name = stasis_cache_create(channel_snapshot_get_name); + if (!channel_cache_by_name) { + return -1; + } + + channel_by_name_topic = stasis_caching_topic_create( + stasis_cp_all_topic(channel_cache_all), + channel_cache_by_name); + if (!channel_by_name_topic) { + return -1; + } + + res |= STASIS_MESSAGE_TYPE_INIT(ast_channel_snapshot_type); + res |= STASIS_MESSAGE_TYPE_INIT(ast_channel_dial_type); + res |= STASIS_MESSAGE_TYPE_INIT(ast_channel_varset_type); + res |= STASIS_MESSAGE_TYPE_INIT(ast_channel_user_event_type); + res |= STASIS_MESSAGE_TYPE_INIT(ast_channel_hangup_request_type); + res |= STASIS_MESSAGE_TYPE_INIT(ast_channel_dtmf_begin_type); + res |= STASIS_MESSAGE_TYPE_INIT(ast_channel_dtmf_end_type); + res |= STASIS_MESSAGE_TYPE_INIT(ast_channel_hold_type); + res |= STASIS_MESSAGE_TYPE_INIT(ast_channel_unhold_type); + res |= STASIS_MESSAGE_TYPE_INIT(ast_channel_chanspy_start_type); + res |= STASIS_MESSAGE_TYPE_INIT(ast_channel_chanspy_stop_type); + res |= STASIS_MESSAGE_TYPE_INIT(ast_channel_fax_type); + res |= STASIS_MESSAGE_TYPE_INIT(ast_channel_hangup_handler_type); + res |= STASIS_MESSAGE_TYPE_INIT(ast_channel_moh_start_type); + res |= STASIS_MESSAGE_TYPE_INIT(ast_channel_moh_stop_type); + res |= STASIS_MESSAGE_TYPE_INIT(ast_channel_monitor_start_type); + res |= STASIS_MESSAGE_TYPE_INIT(ast_channel_monitor_stop_type); + + return res; } diff --git a/main/stasis_endpoints.c b/main/stasis_endpoints.c index a6756182c7..831b4aee0b 100644 --- a/main/stasis_endpoints.c +++ b/main/stasis_endpoints.c @@ -73,6 +73,28 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$") ***/ +static struct stasis_cp_all *endpoint_cache_all; + +struct stasis_cp_all *ast_endpoint_cache_all(void) +{ + return endpoint_cache_all; +} + +struct stasis_cache *ast_endpoint_cache(void) +{ + return stasis_cp_all_cache(endpoint_cache_all); +} + +struct stasis_topic *ast_endpoint_topic_all(void) +{ + return stasis_cp_all_topic(endpoint_cache_all); +} + +struct stasis_topic *ast_endpoint_topic_all_cached(void) +{ + return stasis_cp_all_topic_cached(endpoint_cache_all); +} + static struct ast_manager_event_blob *peerstatus_to_ami(struct stasis_message *msg); STASIS_MESSAGE_TYPE_DEFN(ast_endpoint_snapshot_type); @@ -80,10 +102,6 @@ STASIS_MESSAGE_TYPE_DEFN(ast_endpoint_state_type, .to_ami = peerstatus_to_ami, ); -static struct stasis_topic *endpoint_topic_all; - -static struct stasis_caching_topic *endpoint_topic_all_cached; - static struct ast_manager_event_blob *peerstatus_to_ami(struct stasis_message *msg) { struct ast_endpoint_blob *obj = stasis_message_data(msg); @@ -168,16 +186,6 @@ void ast_endpoint_blob_publish(struct ast_endpoint *endpoint, struct stasis_mess } } -struct stasis_topic *ast_endpoint_topic_all(void) -{ - return endpoint_topic_all; -} - -struct stasis_caching_topic *ast_endpoint_topic_all_cached(void) -{ - return endpoint_topic_all_cached; -} - struct ast_endpoint_snapshot *ast_endpoint_latest_snapshot(const char *tech, const char *name, unsigned int guaranteed) { @@ -190,8 +198,12 @@ struct ast_endpoint_snapshot *ast_endpoint_latest_snapshot(const char *tech, return NULL; } - msg = stasis_cache_get_extended(ast_endpoint_topic_all_cached(), - ast_endpoint_snapshot_type(), id, guaranteed); + if (guaranteed) { + stasis_topic_wait(ast_endpoint_topic_all_cached()); + } + + msg = stasis_cache_get(ast_endpoint_cache(), + ast_endpoint_snapshot_type(), id); if (!msg) { return NULL; } @@ -267,44 +279,28 @@ struct ast_json *ast_endpoint_snapshot_to_json( return ast_json_ref(json); } -static void endpoints_stasis_shutdown(void) +static void endpoints_stasis_cleanup(void) { - stasis_caching_unsubscribe_and_join(endpoint_topic_all_cached); - endpoint_topic_all_cached = NULL; + STASIS_MESSAGE_TYPE_CLEANUP(ast_endpoint_snapshot_type); + STASIS_MESSAGE_TYPE_CLEANUP(ast_endpoint_state_type); - ao2_cleanup(endpoint_topic_all); - endpoint_topic_all = NULL; + ao2_cleanup(endpoint_cache_all); + endpoint_cache_all = NULL; } int ast_endpoint_stasis_init(void) { - ast_register_atexit(endpoints_stasis_shutdown); + int res = 0; + ast_register_cleanup(endpoints_stasis_cleanup); - if (STASIS_MESSAGE_TYPE_INIT(ast_endpoint_snapshot_type) != 0) { + endpoint_cache_all = stasis_cp_all_create("endpoint_topic_all", + endpoint_snapshot_get_id); + if (!endpoint_cache_all) { return -1; } - if (!endpoint_topic_all) { - endpoint_topic_all = stasis_topic_create("endpoint_topic_all"); - } + res |= STASIS_MESSAGE_TYPE_INIT(ast_endpoint_snapshot_type); + res |= STASIS_MESSAGE_TYPE_INIT(ast_endpoint_state_type); - if (!endpoint_topic_all) { - return -1; - } - - if (!endpoint_topic_all_cached) { - endpoint_topic_all_cached = - stasis_caching_topic_create( - endpoint_topic_all, endpoint_snapshot_get_id); - } - - if (!endpoint_topic_all_cached) { - return -1; - } - - if (STASIS_MESSAGE_TYPE_INIT(ast_endpoint_state_type) != 0) { - return -1; - } - - return 0; + return res; } diff --git a/main/stasis_wait.c b/main/stasis_wait.c new file mode 100644 index 0000000000..e94c686e1b --- /dev/null +++ b/main/stasis_wait.c @@ -0,0 +1,133 @@ +/* + * Asterisk -- An open source telephony toolkit. + * + * Copyright (C) 2013, Digium, Inc. + * + * Joshua Colp + * + * See http://www.asterisk.org for more information about + * the Asterisk project. Please do not directly contact + * any of the maintainers of this project for assistance; + * the project provides a web site, mailing lists and IRC + * channels for your use. + * + * This program is free software, distributed under the terms of + * the GNU General Public License Version 2. See the LICENSE file + * at the top of the source tree. + */ + +/*! \file + * + * \brief Wait support for Stasis topics. + * + * \author Joshua Colp + */ + +/*** MODULEINFO + core + ***/ + +#include "asterisk.h" + +ASTERISK_FILE_VERSION(__FILE__, "$Revision$") + +#include "asterisk/astobj2.h" +#include "asterisk/stasis.h" + +static struct stasis_message_type *cache_guarantee_type(void); +STASIS_MESSAGE_TYPE_DEFN(cache_guarantee_type); + +/*! \internal */ +struct caching_guarantee { + ast_mutex_t lock; + ast_cond_t cond; + unsigned int done:1; +}; + +static void caching_guarantee_dtor(void *obj) +{ + struct caching_guarantee *guarantee = obj; + + ast_assert(guarantee->done == 1); + + ast_mutex_destroy(&guarantee->lock); + ast_cond_destroy(&guarantee->cond); +} + +static void guarantee_handler(void *data, struct stasis_subscription *sub, + struct stasis_topic *topic, struct stasis_message *message) +{ + /* Wait for our particular message */ + if (data == message) { + struct caching_guarantee *guarantee; + ast_assert(cache_guarantee_type() == stasis_message_type(message)); + guarantee = stasis_message_data(message); + + ast_mutex_lock(&guarantee->lock); + guarantee->done = 1; + ast_cond_signal(&guarantee->cond); + ast_mutex_unlock(&guarantee->lock); + } +} + +static struct stasis_message *caching_guarantee_create(void) +{ + RAII_VAR(struct caching_guarantee *, guarantee, NULL, ao2_cleanup); + RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup); + + if (!(guarantee = ao2_alloc(sizeof(*guarantee), caching_guarantee_dtor))) { + return NULL; + } + + ast_mutex_init(&guarantee->lock); + ast_cond_init(&guarantee->cond, NULL); + + if (!(msg = stasis_message_create(cache_guarantee_type(), guarantee))) { + return NULL; + } + + ao2_ref(msg, +1); + return msg; +} + +int stasis_topic_wait(struct stasis_topic *topic) +{ + RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup); + RAII_VAR(struct stasis_subscription *, sub, NULL, stasis_unsubscribe); + struct caching_guarantee *guarantee; + + msg = caching_guarantee_create(); + if (!msg) { + return -1; + } + + sub = stasis_subscribe(topic, guarantee_handler, msg); + if (!sub) { + return -1; + } + + guarantee = stasis_message_data(msg); + + ast_mutex_lock(&guarantee->lock); + stasis_publish(topic, msg); + while (!guarantee->done) { + ast_cond_wait(&guarantee->cond, &guarantee->lock); + } + ast_mutex_unlock(&guarantee->lock); + return 0; +} + +static void wait_cleanup(void) +{ + STASIS_MESSAGE_TYPE_CLEANUP(cache_guarantee_type); +} + +int stasis_wait_init(void) +{ + ast_register_cleanup(wait_cleanup); + + if (STASIS_MESSAGE_TYPE_INIT(cache_guarantee_type) != 0) { + return -1; + } + return 0; +} diff --git a/res/ari/resource_bridges.c b/res/ari/resource_bridges.c index 17a8bb1325..7730d0cd9d 100644 --- a/res/ari/resource_bridges.c +++ b/res/ari/resource_bridges.c @@ -448,22 +448,22 @@ void ast_ari_delete_bridge(struct ast_variable *headers, struct ast_delete_bridg void ast_ari_get_bridges(struct ast_variable *headers, struct ast_get_bridges_args *args, struct ast_ari_response *response) { - RAII_VAR(struct stasis_caching_topic *, caching_topic, NULL, ao2_cleanup); + RAII_VAR(struct stasis_cache *, cache, NULL, ao2_cleanup); RAII_VAR(struct ao2_container *, snapshots, NULL, ao2_cleanup); RAII_VAR(struct ast_json *, json, NULL, ast_json_unref); struct ao2_iterator i; void *obj; - caching_topic = ast_bridge_topic_all_cached(); - if (!caching_topic) { + cache = ast_bridge_cache(); + if (!cache) { ast_ari_response_error( response, 500, "Internal Server Error", "Message bus not initialized"); return; } - ao2_ref(caching_topic, +1); + ao2_ref(cache, +1); - snapshots = stasis_cache_dump(caching_topic, ast_bridge_snapshot_type()); + snapshots = stasis_cache_dump(cache, ast_bridge_snapshot_type()); if (!snapshots) { ast_ari_response_alloc_failed(response); return; diff --git a/res/ari/resource_channels.c b/res/ari/resource_channels.c index 7f3a91fbaa..dd323bac5b 100644 --- a/res/ari/resource_channels.c +++ b/res/ari/resource_channels.c @@ -466,18 +466,18 @@ void ast_ari_get_channel(struct ast_variable *headers, struct ast_ari_response *response) { RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup); - struct stasis_caching_topic *caching_topic; + struct stasis_cache *cache; struct ast_channel_snapshot *snapshot; - caching_topic = ast_channel_topic_all_cached(); - if (!caching_topic) { + cache = ast_channel_cache(); + if (!cache) { ast_ari_response_error( response, 500, "Internal Server Error", "Message bus not initialized"); return; } - msg = stasis_cache_get(caching_topic, ast_channel_snapshot_type(), + msg = stasis_cache_get(cache, ast_channel_snapshot_type(), args->channel_id); if (!msg) { ast_ari_response_error( @@ -516,22 +516,22 @@ void ast_ari_get_channels(struct ast_variable *headers, struct ast_get_channels_args *args, struct ast_ari_response *response) { - RAII_VAR(struct stasis_caching_topic *, caching_topic, NULL, ao2_cleanup); + RAII_VAR(struct stasis_cache *, cache, NULL, ao2_cleanup); RAII_VAR(struct ao2_container *, snapshots, NULL, ao2_cleanup); RAII_VAR(struct ast_json *, json, NULL, ast_json_unref); struct ao2_iterator i; void *obj; - caching_topic = ast_channel_topic_all_cached(); - if (!caching_topic) { + cache = ast_channel_cache(); + if (!cache) { ast_ari_response_error( response, 500, "Internal Server Error", "Message bus not initialized"); return; } - ao2_ref(caching_topic, +1); + ao2_ref(cache, +1); - snapshots = stasis_cache_dump(caching_topic, ast_channel_snapshot_type()); + snapshots = stasis_cache_dump(cache, ast_channel_snapshot_type()); if (!snapshots) { ast_ari_response_alloc_failed(response); return; diff --git a/res/ari/resource_endpoints.c b/res/ari/resource_endpoints.c index bb28df03c2..35d8a45cc5 100644 --- a/res/ari/resource_endpoints.c +++ b/res/ari/resource_endpoints.c @@ -37,22 +37,22 @@ void ast_ari_get_endpoints(struct ast_variable *headers, struct ast_get_endpoints_args *args, struct ast_ari_response *response) { - RAII_VAR(struct stasis_caching_topic *, caching_topic, NULL, ao2_cleanup); + RAII_VAR(struct stasis_cache *, cache, NULL, ao2_cleanup); RAII_VAR(struct ao2_container *, snapshots, NULL, ao2_cleanup); RAII_VAR(struct ast_json *, json, NULL, ast_json_unref); struct ao2_iterator i; void *obj; - caching_topic = ast_endpoint_topic_all_cached(); - if (!caching_topic) { + cache = ast_endpoint_cache(); + if (!cache) { ast_ari_response_error( response, 500, "Internal Server Error", "Message bus not initialized"); return; } - ao2_ref(caching_topic, +1); + ao2_ref(cache, +1); - snapshots = stasis_cache_dump(caching_topic, ast_endpoint_snapshot_type()); + snapshots = stasis_cache_dump(cache, ast_endpoint_snapshot_type()); if (!snapshots) { ast_ari_response_alloc_failed(response); return; @@ -83,7 +83,7 @@ void ast_ari_get_endpoints_by_tech(struct ast_variable *headers, struct ast_get_endpoints_by_tech_args *args, struct ast_ari_response *response) { - RAII_VAR(struct stasis_caching_topic *, caching_topic, NULL, ao2_cleanup); + RAII_VAR(struct stasis_cache *, cache, NULL, ao2_cleanup); RAII_VAR(struct ao2_container *, snapshots, NULL, ao2_cleanup); RAII_VAR(struct ast_json *, json, NULL, ast_json_unref); struct ao2_iterator i; @@ -91,16 +91,16 @@ void ast_ari_get_endpoints_by_tech(struct ast_variable *headers, /* TODO - if tech isn't a recognized type of endpoint, it should 404 */ - caching_topic = ast_endpoint_topic_all_cached(); - if (!caching_topic) { + cache = ast_endpoint_cache(); + if (!cache) { ast_ari_response_error( response, 500, "Internal Server Error", "Message bus not initialized"); return; } - ao2_ref(caching_topic, +1); + ao2_ref(cache, +1); - snapshots = stasis_cache_dump(caching_topic, ast_endpoint_snapshot_type()); + snapshots = stasis_cache_dump(cache, ast_endpoint_snapshot_type()); if (!snapshots) { ast_ari_response_alloc_failed(response); return; diff --git a/res/res_agi.c b/res/res_agi.c index 07735130c4..5c79ec27f3 100644 --- a/res/res_agi.c +++ b/res/res_agi.c @@ -2771,7 +2771,7 @@ static int handle_channelstatus(struct ast_channel *chan, AGI *agi, int argc, co RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup); /* one argument: look for info on the specified channel */ - if ((msg = stasis_cache_get(ast_channel_topic_all_cached_by_name(), ast_channel_snapshot_type(), argv[2]))) { + if ((msg = stasis_cache_get(ast_channel_cache_by_name(), ast_channel_snapshot_type(), argv[2]))) { struct ast_channel_snapshot *snapshot = stasis_message_data(msg); ast_agi_send(agi->fd, chan, "200 result=%d\n", snapshot->state); diff --git a/res/res_chan_stats.c b/res/res_chan_stats.c index 0f39f071fb..a43c564b1e 100644 --- a/res/res_chan_stats.c +++ b/res/res_chan_stats.c @@ -154,7 +154,7 @@ static int load_module(void) { /* You can create a message router to route messages by type */ router = stasis_message_router_create( - stasis_caching_get_topic(ast_channel_topic_all_cached())); + ast_channel_topic_all_cached()); if (!router) { return AST_MODULE_LOAD_FAILURE; } diff --git a/res/res_jabber.c b/res/res_jabber.c index 7ca0bf81e6..0e373e5e7d 100644 --- a/res/res_jabber.c +++ b/res/res_jabber.c @@ -3310,7 +3310,7 @@ static void aji_init_event_distribution(struct aji_client *client) RAII_VAR(struct ao2_container *, cached, NULL, ao2_cleanup); device_state_sub = stasis_subscribe(ast_device_state_topic_all(), aji_devstate_cb, client); - cached = stasis_cache_dump(ast_device_state_topic_cached(), NULL); + cached = stasis_cache_dump(ast_device_state_cache(), NULL); ao2_callback(cached, OBJ_NODATA, cached_devstate_cb, client); } diff --git a/res/res_stasis.c b/res/res_stasis.c index 6249503999..e4ad97eaef 100644 --- a/res/res_stasis.c +++ b/res/res_stasis.c @@ -993,7 +993,7 @@ static int load_module(void) return AST_MODULE_LOAD_FAILURE; } - channel_router = stasis_message_router_create(stasis_caching_get_topic(ast_channel_topic_all_cached())); + channel_router = stasis_message_router_create(ast_channel_topic_all_cached()); if (!channel_router) { return AST_MODULE_LOAD_FAILURE; } @@ -1013,7 +1013,7 @@ static int load_module(void) return AST_MODULE_LOAD_FAILURE; } - bridge_router = stasis_message_router_create(stasis_caching_get_topic(ast_bridge_topic_all_cached())); + bridge_router = stasis_message_router_create(ast_bridge_topic_all_cached()); if (!bridge_router) { return AST_MODULE_LOAD_FAILURE; } diff --git a/res/res_xmpp.c b/res/res_xmpp.c index 89eb45d189..3be8aa458f 100644 --- a/res/res_xmpp.c +++ b/res/res_xmpp.c @@ -1605,7 +1605,7 @@ static void xmpp_init_event_distribution(struct ast_xmpp_client *client) return; } - cached = stasis_cache_dump(ast_device_state_topic_cached(), NULL); + cached = stasis_cache_dump(ast_device_state_cache(), NULL); ao2_callback(cached, OBJ_NODATA, cached_devstate_cb, client); xmpp_pubsub_subscribe(client, "device_state"); diff --git a/res/stasis/control.c b/res/stasis/control.c index 1fbae0c7c9..94f1d700dc 100644 --- a/res/stasis/control.c +++ b/res/stasis/control.c @@ -364,13 +364,9 @@ struct ast_channel_snapshot *stasis_app_control_get_snapshot( const struct stasis_app_control *control) { RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup); - struct stasis_caching_topic *caching_topic; struct ast_channel_snapshot *snapshot; - caching_topic = ast_channel_topic_all_cached(); - ast_assert(caching_topic != NULL); - - msg = stasis_cache_get(caching_topic, ast_channel_snapshot_type(), + msg = stasis_cache_get(ast_channel_cache(), ast_channel_snapshot_type(), stasis_app_control_get_channel_id(control)); if (!msg) { return NULL; diff --git a/tests/test_cel.c b/tests/test_cel.c index 31f73468c4..395ec0ccca 100644 --- a/tests/test_cel.c +++ b/tests/test_cel.c @@ -224,8 +224,9 @@ static void do_sleep(void) ast_hangup((channel)); \ HANGUP_EVENT(channel, cause, dialstatus); \ APPEND_EVENT(channel, AST_CEL_CHANNEL_END, NULL, NULL, NULL); \ - ao2_cleanup(stasis_cache_get_extended(ast_channel_topic_all_cached(), \ - ast_channel_snapshot_type(), ast_channel_uniqueid(channel), 1)); \ + stasis_topic_wait(ast_channel_topic_all_cached()); \ + ao2_cleanup(stasis_cache_get(ast_channel_cache(), \ + ast_channel_snapshot_type(), ast_channel_uniqueid(channel))); \ ao2_cleanup(channel); \ channel = NULL; \ } while (0) diff --git a/tests/test_devicestate.c b/tests/test_devicestate.c index 4aff9394bc..ff5d681f4c 100644 --- a/tests/test_devicestate.c +++ b/tests/test_devicestate.c @@ -394,7 +394,7 @@ static void cache_cleanup(int unused) { RAII_VAR(struct ao2_container *, cache_dump, NULL, ao2_cleanup); /* remove all device states created during this test */ - cache_dump = stasis_cache_dump(ast_device_state_topic_cached(), NULL); + cache_dump = stasis_cache_dump(ast_device_state_cache(), NULL); if (!cache_dump) { return; } @@ -434,7 +434,7 @@ AST_TEST_DEFINE(device_state_aggregation_test) consumer = consumer_create(); ast_test_validate(test, NULL != consumer); - device_msg_router = stasis_message_router_create(stasis_caching_get_topic(ast_device_state_topic_cached())); + device_msg_router = stasis_message_router_create(ast_device_state_topic_cached()); ast_test_validate(test, NULL != device_msg_router); ao2_ref(consumer, +1); @@ -451,7 +451,7 @@ AST_TEST_DEFINE(device_state_aggregation_test) ast_test_validate(test, AST_DEVICE_NOT_INUSE == consumer->state); ast_test_validate(test, AST_DEVICE_NOT_INUSE == consumer->aggregate_state); - msg = stasis_cache_get(ast_device_state_topic_cached(), ast_device_state_message_type(), UNIT_TEST_DEVICE_IDENTIFIER); + msg = stasis_cache_get(ast_device_state_cache(), ast_device_state_message_type(), UNIT_TEST_DEVICE_IDENTIFIER); device_state = stasis_message_data(msg); ast_test_validate(test, AST_DEVICE_NOT_INUSE == device_state->state); ao2_cleanup(msg); @@ -466,7 +466,7 @@ AST_TEST_DEFINE(device_state_aggregation_test) ast_test_validate(test, AST_DEVICE_NOT_INUSE == consumer->state); ast_test_validate(test, AST_DEVICE_NOT_INUSE == consumer->aggregate_state); - msg = stasis_cache_get(ast_device_state_topic_cached(), ast_device_state_message_type(), UNIT_TEST_DEVICE_IDENTIFIER); + msg = stasis_cache_get(ast_device_state_cache(), ast_device_state_message_type(), UNIT_TEST_DEVICE_IDENTIFIER); device_state = stasis_message_data(msg); ast_test_validate(test, AST_DEVICE_NOT_INUSE == device_state->state); ao2_cleanup(msg); @@ -479,7 +479,7 @@ AST_TEST_DEFINE(device_state_aggregation_test) ast_test_validate(test, AST_DEVICE_INUSE == consumer->state); ast_test_validate(test, AST_DEVICE_INUSE == consumer->aggregate_state); - msg = stasis_cache_get(ast_device_state_topic_cached(), ast_device_state_message_type(), UNIT_TEST_DEVICE_IDENTIFIER); + msg = stasis_cache_get(ast_device_state_cache(), ast_device_state_message_type(), UNIT_TEST_DEVICE_IDENTIFIER); device_state = stasis_message_data(msg); ast_test_validate(test, AST_DEVICE_INUSE == device_state->state); ao2_cleanup(msg); @@ -492,7 +492,7 @@ AST_TEST_DEFINE(device_state_aggregation_test) ast_test_validate(test, AST_DEVICE_RINGING == consumer->state); ast_test_validate(test, AST_DEVICE_RINGINUSE == consumer->aggregate_state); - msg = stasis_cache_get(ast_device_state_topic_cached(), ast_device_state_message_type(), UNIT_TEST_DEVICE_IDENTIFIER); + msg = stasis_cache_get(ast_device_state_cache(), ast_device_state_message_type(), UNIT_TEST_DEVICE_IDENTIFIER); device_state = stasis_message_data(msg); ast_test_validate(test, AST_DEVICE_RINGINUSE == device_state->state); ao2_cleanup(msg); diff --git a/tests/test_stasis.c b/tests/test_stasis.c index 6636633555..0b63da42e9 100644 --- a/tests/test_stasis.c +++ b/tests/test_stasis.c @@ -614,6 +614,7 @@ AST_TEST_DEFINE(cache_passthrough) { RAII_VAR(struct stasis_message_type *, non_cache_type, NULL, ao2_cleanup); RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup); + RAII_VAR(struct stasis_cache *, cache, NULL, ao2_cleanup); RAII_VAR(struct stasis_caching_topic *, caching_topic, NULL, stasis_caching_unsubscribe); RAII_VAR(struct consumer *, consumer, NULL, ao2_cleanup); RAII_VAR(struct stasis_subscription *, sub, NULL, stasis_unsubscribe); @@ -636,7 +637,9 @@ AST_TEST_DEFINE(cache_passthrough) ast_test_validate(test, NULL != non_cache_type); topic = stasis_topic_create("SomeTopic"); ast_test_validate(test, NULL != topic); - caching_topic = stasis_caching_topic_create(topic, cache_test_data_id); + cache = stasis_cache_create(cache_test_data_id); + ast_test_validate(test, NULL != cache); + caching_topic = stasis_caching_topic_create(topic, cache); ast_test_validate(test, NULL != caching_topic); consumer = consumer_create(1); ast_test_validate(test, NULL != consumer); @@ -664,6 +667,7 @@ AST_TEST_DEFINE(cache) { RAII_VAR(struct stasis_message_type *, cache_type, NULL, ao2_cleanup); RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup); + RAII_VAR(struct stasis_cache *, cache, NULL, ao2_cleanup); RAII_VAR(struct stasis_caching_topic *, caching_topic, NULL, stasis_caching_unsubscribe); RAII_VAR(struct consumer *, consumer, NULL, ao2_cleanup); RAII_VAR(struct stasis_subscription *, sub, NULL, stasis_unsubscribe); @@ -689,7 +693,9 @@ AST_TEST_DEFINE(cache) ast_test_validate(test, NULL != cache_type); topic = stasis_topic_create("SomeTopic"); ast_test_validate(test, NULL != topic); - caching_topic = stasis_caching_topic_create(topic, cache_test_data_id); + cache = stasis_cache_create(cache_test_data_id); + ast_test_validate(test, NULL != cache); + caching_topic = stasis_caching_topic_create(topic, cache); ast_test_validate(test, NULL != caching_topic); consumer = consumer_create(1); ast_test_validate(test, NULL != consumer); @@ -714,7 +720,7 @@ AST_TEST_DEFINE(cache) ast_test_validate(test, topic == actual_update->topic); ast_test_validate(test, NULL == actual_update->old_snapshot); ast_test_validate(test, test_message1_1 == actual_update->new_snapshot); - ast_test_validate(test, test_message1_1 == stasis_cache_get(caching_topic, cache_type, "1")); + ast_test_validate(test, test_message1_1 == stasis_cache_get(cache, cache_type, "1")); /* stasis_cache_get returned a ref, so unref test_message1_1 */ ao2_ref(test_message1_1, -1); @@ -723,7 +729,7 @@ AST_TEST_DEFINE(cache) ast_test_validate(test, topic == actual_update->topic); ast_test_validate(test, NULL == actual_update->old_snapshot); ast_test_validate(test, test_message2_1 == actual_update->new_snapshot); - ast_test_validate(test, test_message2_1 == stasis_cache_get(caching_topic, cache_type, "2")); + ast_test_validate(test, test_message2_1 == stasis_cache_get(cache, cache_type, "2")); /* stasis_cache_get returned a ref, so unref test_message2_1 */ ao2_ref(test_message2_1, -1); @@ -739,7 +745,7 @@ AST_TEST_DEFINE(cache) ast_test_validate(test, topic == actual_update->topic); ast_test_validate(test, test_message2_1 == actual_update->old_snapshot); ast_test_validate(test, test_message2_2 == actual_update->new_snapshot); - ast_test_validate(test, test_message2_2 == stasis_cache_get(caching_topic, cache_type, "2")); + ast_test_validate(test, test_message2_2 == stasis_cache_get(cache, cache_type, "2")); /* stasis_cache_get returned a ref, so unref test_message2_2 */ ao2_ref(test_message2_2, -1); @@ -755,7 +761,7 @@ AST_TEST_DEFINE(cache) ast_test_validate(test, topic == actual_update->topic); ast_test_validate(test, test_message1_1 == actual_update->old_snapshot); ast_test_validate(test, NULL == actual_update->new_snapshot); - ast_test_validate(test, NULL == stasis_cache_get(caching_topic, cache_type, "1")); + ast_test_validate(test, NULL == stasis_cache_get(cache, cache_type, "1")); return AST_TEST_PASS; } @@ -764,6 +770,7 @@ AST_TEST_DEFINE(cache_dump) { RAII_VAR(struct stasis_message_type *, cache_type, NULL, ao2_cleanup); RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup); + RAII_VAR(struct stasis_cache *, cache, NULL, ao2_cleanup); RAII_VAR(struct stasis_caching_topic *, caching_topic, NULL, stasis_caching_unsubscribe); RAII_VAR(struct consumer *, consumer, NULL, ao2_cleanup); RAII_VAR(struct stasis_subscription *, sub, NULL, stasis_unsubscribe); @@ -791,7 +798,9 @@ AST_TEST_DEFINE(cache_dump) ast_test_validate(test, NULL != cache_type); topic = stasis_topic_create("SomeTopic"); ast_test_validate(test, NULL != topic); - caching_topic = stasis_caching_topic_create(topic, cache_test_data_id); + cache = stasis_cache_create(cache_test_data_id); + ast_test_validate(test, NULL != cache); + caching_topic = stasis_caching_topic_create(topic, cache); ast_test_validate(test, NULL != caching_topic); consumer = consumer_create(1); ast_test_validate(test, NULL != consumer); @@ -811,7 +820,7 @@ AST_TEST_DEFINE(cache_dump) ast_test_validate(test, 2 == actual_len); /* Check the cache */ - cache_dump = stasis_cache_dump(caching_topic, NULL); + cache_dump = stasis_cache_dump(cache, NULL); ast_test_validate(test, NULL != cache_dump); ast_test_validate(test, 2 == ao2_container_count(cache_dump)); i = ao2_iterator_init(cache_dump, 0); @@ -829,7 +838,7 @@ AST_TEST_DEFINE(cache_dump) ast_test_validate(test, 3 == actual_len); /* Check the cache */ - cache_dump = stasis_cache_dump(caching_topic, NULL); + cache_dump = stasis_cache_dump(cache, NULL); ast_test_validate(test, NULL != cache_dump); ast_test_validate(test, 2 == ao2_container_count(cache_dump)); i = ao2_iterator_init(cache_dump, 0); @@ -847,7 +856,7 @@ AST_TEST_DEFINE(cache_dump) ast_test_validate(test, 4 == actual_len); /* Check the cache */ - cache_dump = stasis_cache_dump(caching_topic, NULL); + cache_dump = stasis_cache_dump(cache, NULL); ast_test_validate(test, NULL != cache_dump); ast_test_validate(test, 1 == ao2_container_count(cache_dump)); i = ao2_iterator_init(cache_dump, 0); @@ -858,7 +867,7 @@ AST_TEST_DEFINE(cache_dump) /* Dump the cache to ensure that it has no subscription change items in it since those aren't cached */ ao2_cleanup(cache_dump); - cache_dump = stasis_cache_dump(caching_topic, stasis_subscription_change_type()); + cache_dump = stasis_cache_dump(cache, stasis_subscription_change_type()); ast_test_validate(test, 0 == ao2_container_count(cache_dump)); return AST_TEST_PASS; @@ -1019,7 +1028,8 @@ static const char *cache_simple(struct stasis_message *message) { AST_TEST_DEFINE(router_cache_updates) { RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup); - RAII_VAR(struct stasis_caching_topic *, caching_topic, NULL, stasis_caching_unsubscribe); + RAII_VAR(struct stasis_cache *, cache, NULL, ao2_cleanup); + RAII_VAR(struct stasis_caching_topic *, caching_topic, NULL, stasis_caching_unsubscribe_and_join); RAII_VAR(struct stasis_message_type *, test_message_type1, NULL, ao2_cleanup); RAII_VAR(struct stasis_message_type *, test_message_type2, NULL, ao2_cleanup); RAII_VAR(struct stasis_message_type *, test_message_type3, NULL, ao2_cleanup); @@ -1051,7 +1061,9 @@ AST_TEST_DEFINE(router_cache_updates) topic = stasis_topic_create("TestTopic"); ast_test_validate(test, NULL != topic); - caching_topic = stasis_caching_topic_create(topic, cache_simple); + cache = stasis_cache_create(cache_simple); + ast_test_validate(test, NULL != cache); + caching_topic = stasis_caching_topic_create(topic, cache); ast_test_validate(test, NULL != caching_topic); consumer1 = consumer_create(1); diff --git a/tests/test_stasis_endpoints.c b/tests/test_stasis_endpoints.c index 9fe3ecfadd..c0be07ca83 100644 --- a/tests/test_stasis_endpoints.c +++ b/tests/test_stasis_endpoints.c @@ -152,7 +152,7 @@ AST_TEST_DEFINE(cache_clear) ast_test_validate(test, NULL != sink); sub = stasis_subscribe( - stasis_caching_get_topic(ast_endpoint_topic_all_cached()), + ast_endpoint_topic_all_cached(), stasis_message_sink_cb(), sink); ast_test_validate(test, NULL != sub);