asterisk/main/stasis_wait.c
David M. Lee e1b959ccbb Split caching out from the stasis_caching_topic.
In working with res_stasis, I discovered a significant limitation to
the current structure of stasis_caching_topics: you cannot subscribe
to cache updates for a single channel/bridge/endpoint/etc.

To address this, this patch splits the cache away from the
stasis_caching_topic, making it a first-class object. The stasis_cache
object is shared amongst individual stasis_caching_topics that are
created per channel/endpoint/etc. These are still forwarded to global
whatever_all_cached topics, so their use from most of the code does
not change.

In making these changes, I noticed that we frequently used a similar
pattern for bridges, endpoints and channels:

     single_topic  ---------------->  all_topic
           ^
           |
     single_topic_cached  ----+---->  all_topic_cached
                              |
                              +---->  cache

This pattern was extracted as the 'Stasis Caching Pattern', defined in
stasis_caching_pattern.h. This avoids a lot of duplicate code between
the different domain objects.

Since the cache is now disassociated from its upstream caching topics,
this also necessitated a change to how the 'guaranteed' flag worked
for retrieving from a cache. The code for handling the caching
guarantee was extracted into a 'stasis_topic_wait' function, which
works for any stasis_topic.

(closes issue ASTERISK-22002)
Review: https://reviewboard.asterisk.org/r/2672/


git-svn-id: https://origsvn.digium.com/svn/asterisk/trunk@395954 65c4cc65-6c06-0410-ace0-fbb531ad65f3
2013-08-01 13:49:34 +00:00

134 lines
3.1 KiB
C

/*
* Asterisk -- An open source telephony toolkit.
*
* Copyright (C) 2013, Digium, Inc.
*
* Joshua Colp <jcolp@digium.com>
*
* See http://www.asterisk.org for more information about
* the Asterisk project. Please do not directly contact
* any of the maintainers of this project for assistance;
* the project provides a web site, mailing lists and IRC
* channels for your use.
*
* This program is free software, distributed under the terms of
* the GNU General Public License Version 2. See the LICENSE file
* at the top of the source tree.
*/
/*! \file
*
* \brief Wait support for Stasis topics.
*
* \author Joshua Colp <jcolp@digium.com>
*/
/*** MODULEINFO
<support_level>core</support_level>
***/
#include "asterisk.h"
ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
#include "asterisk/astobj2.h"
#include "asterisk/stasis.h"
/*!
 * \internal
 * \brief Message type used for the guarantee messages created in this file.
 *
 * The accessor is declared static before STASIS_MESSAGE_TYPE_DEFN is invoked;
 * the macro presumably supplies the accessor's definition (see
 * asterisk/stasis.h). NOTE(review): confirm that the preceding static
 * prototype is what keeps the accessor local to this file.
 */
static struct stasis_message_type *cache_guarantee_type(void);
STASIS_MESSAGE_TYPE_DEFN(cache_guarantee_type);
/*!
 * \internal
 * \brief Payload carried by a guarantee message.
 *
 * A waiter sleeps on \c cond (holding \c lock) until guarantee_handler()
 * sets \c done and signals the condition.
 */
struct caching_guarantee {
	/*! Held while \c done is written by the handler and read by the waiter */
	ast_mutex_t lock;
	/*! Signalled by guarantee_handler() after it sets \c done */
	ast_cond_t cond;
	/*! Set to 1 by guarantee_handler() when it receives the guarantee message */
	unsigned int done:1;
};
/*!
 * \internal
 * \brief Destructor for a \ref caching_guarantee.
 *
 * Asserts that the guarantee was marked done, then destroys its mutex and
 * condition variable.
 *
 * \param obj The \ref caching_guarantee being destroyed.
 */
static void caching_guarantee_dtor(void *obj)
{
	struct caching_guarantee *guarantee = obj;

	/* Sanity check: a guarantee should only go away once it has been marked done */
	ast_assert(guarantee->done == 1);

	ast_mutex_destroy(&guarantee->lock);
	ast_cond_destroy(&guarantee->cond);
}
/*!
 * \internal
 * \brief Subscription callback that completes a pending guarantee.
 *
 * Every message other than the exact guarantee message passed as \a data is
 * ignored. When that message arrives, the guarantee is marked done and the
 * waiter is signalled.
 *
 * \param data The guarantee message this subscription is waiting for.
 * \param sub Unused.
 * \param topic Unused.
 * \param message The message being delivered.
 */
static void guarantee_handler(void *data, struct stasis_subscription *sub,
	struct stasis_topic *topic, struct stasis_message *message)
{
	struct caching_guarantee *guarantee;

	/* Only our particular message (compared by pointer) is of interest */
	if (data != message) {
		return;
	}

	ast_assert(cache_guarantee_type() == stasis_message_type(message));
	guarantee = stasis_message_data(message);

	/* Flip the flag and wake the waiter under the lock it sleeps on */
	ast_mutex_lock(&guarantee->lock);
	guarantee->done = 1;
	ast_cond_signal(&guarantee->cond);
	ast_mutex_unlock(&guarantee->lock);
}
/*!
 * \internal
 * \brief Create a guarantee message.
 *
 * Allocates a \ref caching_guarantee, initializes its lock and condition,
 * and wraps it in a message of type cache_guarantee_type().
 *
 * \return A new message; the caller owns the returned reference.
 * \retval NULL on allocation or message creation failure.
 */
static struct stasis_message *caching_guarantee_create(void)
{
	/* Local reference to the payload; dropped on every return path */
	RAII_VAR(struct caching_guarantee *, guarantee, NULL, ao2_cleanup);
	struct stasis_message *msg;

	guarantee = ao2_alloc(sizeof(*guarantee), caching_guarantee_dtor);
	if (!guarantee) {
		return NULL;
	}

	ast_mutex_init(&guarantee->lock);
	ast_cond_init(&guarantee->cond, NULL);

	msg = stasis_message_create(cache_guarantee_type(), guarantee);
	if (!msg) {
		/*
		 * The guarantee is about to be destroyed without ever having been
		 * published, so nobody can be waiting on it. Mark it done so the
		 * destructor's assertion does not fire on this error path.
		 */
		guarantee->done = 1;
		return NULL;
	}

	/* Hand the freshly created reference straight to the caller */
	return msg;
}
/*!
 * \brief Block until a marker message published to \a topic is delivered.
 *
 * Subscribes to \a topic, publishes a guarantee message to it, and waits for
 * the subscription's handler to receive that same message.
 *
 * \param topic Topic to wait on.
 *
 * \retval 0 once the guarantee message has been received by the handler.
 * \retval -1 if \a topic is NULL, or the message or subscription could not be
 *         created.
 */
int stasis_topic_wait(struct stasis_topic *topic)
{
	RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup);
	RAII_VAR(struct stasis_subscription *, sub, NULL, stasis_unsubscribe);
	struct caching_guarantee *guarantee;

	/* Reject a NULL topic up front instead of handing it to subscribe/publish */
	if (!topic) {
		return -1;
	}

	msg = caching_guarantee_create();
	if (!msg) {
		return -1;
	}
	guarantee = stasis_message_data(msg);

	sub = stasis_subscribe(topic, guarantee_handler, msg);
	if (!sub) {
		/*
		 * The message will never be published, so nothing will ever complete
		 * the guarantee. Mark it done so the destructor's assertion does not
		 * fire when msg is released on this error path.
		 */
		guarantee->done = 1;
		return -1;
	}

	/*
	 * Take the lock before publishing so the handler cannot set the flag and
	 * signal between our check of 'done' and the call to ast_cond_wait().
	 */
	ast_mutex_lock(&guarantee->lock);
	stasis_publish(topic, msg);
	while (!guarantee->done) {
		ast_cond_wait(&guarantee->cond, &guarantee->lock);
	}
	ast_mutex_unlock(&guarantee->lock);

	return 0;
}
/*!
 * \internal
 * \brief Cleanup callback registered by stasis_wait_init().
 *
 * Cleans up the guarantee message type.
 */
static void wait_cleanup(void)
{
	STASIS_MESSAGE_TYPE_CLEANUP(cache_guarantee_type);
}
/*!
 * \brief Initialize the topic wait support.
 *
 * Registers wait_cleanup() as a cleanup callback, then initializes the
 * guarantee message type.
 *
 * \retval 0 on success.
 * \retval -1 if the message type could not be initialized.
 */
int stasis_wait_init(void)
{
	/* Registered first, so cleanup is in place even if initialization fails */
	ast_register_cleanup(wait_cleanup);

	return (STASIS_MESSAGE_TYPE_INIT(cache_guarantee_type) != 0) ? -1 : 0;
}