asterisk/main/stasis.c

828 lines
22 KiB
C

/*
* Asterisk -- An open source telephony toolkit.
*
* Copyright (C) 2013, Digium, Inc.
*
* David M. Lee, II <dlee@digium.com>
*
* See http://www.asterisk.org for more information about
* the Asterisk project. Please do not directly contact
* any of the maintainers of this project for assistance;
* the project provides a web site, mailing lists and IRC
* channels for your use.
*
* This program is free software, distributed under the terms of
* the GNU General Public License Version 2. See the LICENSE file
* at the top of the source tree.
*/
/*! \file
*
* \brief Stasis Message Bus API.
*
* \author David M. Lee, II <dlee@digium.com>
*/
/*** MODULEINFO
<support_level>core</support_level>
***/
#include "asterisk.h"
ASTERISK_FILE_VERSION(__FILE__, "$Revision$");
#include "asterisk/astobj2.h"
#include "asterisk/stasis_internal.h"
#include "asterisk/stasis.h"
#include "asterisk/taskprocessor.h"
#include "asterisk/utils.h"
#include "asterisk/uuid.h"
#include "asterisk/vector.h"
/*!
* \page stasis-impl Stasis Implementation Notes
*
* \par Reference counting
*
* Stasis introduces a number of objects, which are tightly related to one
* another. Because we rely on ref-counting for memory management, understanding
* these relationships is important to understanding this code.
*
* \code{.txt}
*
* stasis_topic <----> stasis_subscription
* ^ ^
* \ /
* \ /
* dispatch
* |
* |
* v
* stasis_message
* |
* |
* v
* stasis_message_type
*
* \endcode
*
* The most troubling thing in this chart is the cyclic reference between
* stasis_topic and stasis_subscription. This is both unfortunate, and
* necessary. Topics need the subscription in order to dispatch messages;
* subscriptions need the topics to unsubscribe and check subscription status.
*
* The cycle is broken by stasis_unsubscribe(). The unsubscribe will remove the
* topic's reference to a subscription. When the subscription is destroyed, it
* will remove its reference to the topic.
*
* This means that until a subscription has been explicitly unsubscribed, it will
* not be destroyed. Neither will a topic be destroyed while it has subscribers.
* The destructors of both have assertions regarding this to catch ref-counting
* problems where a subscription or topic has had an extra ao2_cleanup().
*
* The \ref dispatch object is a transient object, which is posted to a
* subscription's taskprocessor to send a message to the subscriber. They have
* short life cycles, allocated on one thread, destroyed on another.
*
* During shutdown, or the deletion of a domain object, there is a flurry of
* ao2_cleanup()s on subscriptions and topics, as the final in-flight messages
* are processed. Any one of these cleanups could be the one to actually destroy
* a given object, so care must be taken to ensure that an object isn't
* referenced after an ao2_cleanup(). This includes the implicit ao2_unlock()
* that might happen when a RAII_VAR() goes out of scope.
*
* \par Typical life cycles
*
* \li stasis_topic - There are several topics which live for the duration of
* the Asterisk process (ast_channel_topic_all(), etc.) but most of these
* are actually fed by shorter-lived topics whose lifetime is associated
* with some domain object (like ast_channel_topic() for a given
* ast_channel).
*
* \li stasis_subscription - Subscriptions have a similar mix of lifetimes as
* topics, for similar reasons.
*
* \li dispatch - Very short lived; just long enough to post a message to a
* subscriber.
*
* \li stasis_message - Short to intermediate lifetimes, but that is mostly
* irrelevant. Messages are strictly data and have no behavior associated
* with them, so it doesn't really matter if/when they are destroyed. By
* design, a component could hold a ref to a message forever without any
* ill consequences (aside from consuming more memory).
*
* \li stasis_message_type - Long life cycles, typically only destroyed on
* module unloading or _clean_ process exit.
*
* \par Subscriber shutdown sequencing
*
* Subscribers are sensitive to shutdown sequencing, specifically in how they
* reference message types. This is fully detailed on the wiki at
* https://wiki.asterisk.org/wiki/x/K4BqAQ.
*
* In short, the lifetime of the \a data (and \a callback, if in a module) must
* be held until the stasis_subscription_final_message() has been received.
* Depending on the structure of the subscriber code, this can be handled by
* using stasis_subscription_final_message() to free resources on the final
* message, or using stasis_subscription_join()/stasis_unsubscribe_and_join() to
* block until the unsubscribe has completed.
*/
/*! Initial size of the subscribers list (passed to ast_vector_init() when a
 * topic is created). */
#define INITIAL_SUBSCRIBERS_MAX 4
/*! The number of buckets to use for topic pools */
#define TOPIC_POOL_BUCKETS 57
/*! Message type carried by the "Subscribe"/"Unsubscribe" change messages
 * that this file publishes. */
STASIS_MESSAGE_TYPE_DEFN(stasis_subscription_change_type);
/*!
 * \internal
 * \brief A named topic that messages are published to.
 */
struct stasis_topic {
/*! Topic name; an ast_strdup() copy released by topic_dtor() */
char *name;
/*! Variable length array of the subscribers */
ast_vector(struct stasis_subscription *) subscribers;
/*! Topics forwarding into this topic */
ast_vector(struct stasis_topic *) upstream_topics;
};
/* Forward declarations for the tightly-coupled subscription object.
 * Both helpers are defined later in this file, after the subscription code
 * that calls them. */
static int topic_add_subscription(struct stasis_topic *topic,
struct stasis_subscription *sub);
static int topic_remove_subscription(struct stasis_topic *topic, struct stasis_subscription *sub);
/*!
 * \internal
 * \brief ao2 destructor for a \ref stasis_topic.
 * \param obj The topic being destroyed.
 */
static void topic_dtor(void *obj)
{
	struct stasis_topic *doomed = obj;

	/* Each subscriber keeps a reference on its topic, so the subscriber
	 * list must already be empty when the last reference goes away. */
	ast_assert(ast_vector_size(doomed->subscribers) == 0);

	ast_free(doomed->name);
	doomed->name = NULL;

	ast_vector_free(doomed->subscribers);
	ast_vector_free(doomed->upstream_topics);
}
struct stasis_topic *stasis_topic_create(const char *name)
{
RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
int res = 0;
topic = ao2_alloc(sizeof(*topic), topic_dtor);
if (!topic) {
return NULL;
}
topic->name = ast_strdup(name);
if (!topic->name) {
return NULL;
}
res |= ast_vector_init(topic->subscribers, INITIAL_SUBSCRIBERS_MAX);
res |= ast_vector_init(topic->upstream_topics, 0);
if (res != 0) {
return NULL;
}
ao2_ref(topic, +1);
return topic;
}
/*!
 * \brief Get the name of a topic.
 * \param topic Topic to query; must not be \c NULL.
 * \return Pointer to the name stored inside the topic.
 */
const char *stasis_topic_name(const struct stasis_topic *topic)
{
	const char *stored_name = topic->name;

	return stored_name;
}
/*!
 * \internal
 * \brief A subscriber's registration on a topic.
 */
struct stasis_subscription {
/*! Unique ID for this subscription (filled by ast_uuid_generate_str()) */
char uniqueid[AST_UUID_STR_LEN];
/*! Topic subscribed to. A reference is held; released in subscription_dtor(). */
struct stasis_topic *topic;
/*! Mailbox for processing incoming messages. NULL when the subscription
 * was created without a mailbox, in which case messages are delivered
 * by calling the callback directly. */
struct ast_taskprocessor *mailbox;
/*! Callback function for incoming message processing. */
stasis_subscription_cb callback;
/*! Data pointer to be handed to the callback. */
void *data;
/*! Lock for completion flags \c final_message_{rxed,processed}. */
ast_mutex_t join_lock;
/*! Condition for joining with subscription. */
ast_cond_t join_cond;
/*! Flag set when final message for sub has been received.
* Be sure join_lock is held before reading/setting. */
int final_message_rxed;
/*! Flag set when final message for sub has been processed.
* Be sure join_lock is held before reading/setting. */
int final_message_processed;
};
/*!
 * \internal
 * \brief ao2 destructor for a \ref stasis_subscription.
 * \param obj The subscription being destroyed.
 */
static void subscription_dtor(void *obj)
{
	struct stasis_subscription *dying = obj;

	/* Topics and subscriptions reference each other, so a subscription
	 * has to be explicitly unsubscribed before it can be destroyed. */
	ast_assert(!stasis_subscription_is_subscribed(dying));

	/* Its final message must already have arrived; anything still in
	 * flight to this subscription would be bad. */
	ast_assert(stasis_subscription_is_done(dying));

	ao2_cleanup(dying->topic);
	dying->topic = NULL;

	ast_taskprocessor_unreference(dying->mailbox);
	dying->mailbox = NULL;

	ast_mutex_destroy(&dying->join_lock);
	ast_cond_destroy(&dying->join_cond);
}
/*!
 * \brief Invoke the subscription's callback.
 *
 * If \a message is this subscription's final message, the
 * \c final_message_rxed flag is set (and \c join_cond signalled) before the
 * callback runs, and \c final_message_processed is set (and \c join_cond
 * signalled again) after it returns.
 *
 * \param sub Subscription to invoke.
 * \param message Message to send.
 */
static void subscription_invoke(struct stasis_subscription *sub,
	struct stasis_message *message)
{
	/* Decide once whether this is the final message. The check compares
	 * the message type and two strings, so there is no reason to repeat
	 * it after the callback. */
	const int is_final = stasis_subscription_final_message(sub, message);

	/* Notify that the final message has been received */
	if (is_final) {
		SCOPED_MUTEX(lock, &sub->join_lock);
		sub->final_message_rxed = 1;
		ast_cond_signal(&sub->join_cond);
	}

	/* Since sub is mostly immutable, no need to lock sub */
	sub->callback(sub->data, sub, message);

	/* Notify that the final message has been processed */
	if (is_final) {
		SCOPED_MUTEX(lock, &sub->join_lock);
		sub->final_message_processed = 1;
		ast_cond_signal(&sub->join_cond);
	}
}
/* Forward declarations: these publish the "Subscribe"/"Unsubscribe" change
 * messages and are defined later in this file. */
static void send_subscription_subscribe(struct stasis_topic *topic, struct stasis_subscription *sub);
static void send_subscription_unsubscribe(struct stasis_topic *topic, struct stasis_subscription *sub);
/*!
 * \brief Create a subscription on a topic.
 *
 * \param topic Topic to subscribe to.
 * \param callback Function invoked for each delivered message.
 * \param data Opaque pointer handed to \a callback.
 * \param needs_mailbox Non-zero to give the subscription its own
 *        taskprocessor mailbox; zero to have messages delivered by calling
 *        \a callback directly from the dispatching code.
 *
 * \return The new subscription, with a reference for the caller.
 * \return \c NULL if \a topic is \c NULL or on allocation failure.
 */
struct stasis_subscription *internal_stasis_subscribe(
	struct stasis_topic *topic,
	stasis_subscription_cb callback,
	void *data,
	int needs_mailbox)
{
	RAII_VAR(struct stasis_subscription *, sub, NULL, ao2_cleanup);

	if (!topic) {
		return NULL;
	}

	sub = ao2_alloc(sizeof(*sub), subscription_dtor);
	if (!sub) {
		return NULL;
	}

	/* Set up everything subscription_dtor() touches BEFORE the first
	 * failure exit below. The destructor destroys join_lock/join_cond
	 * and inspects/releases sub->topic, so none of them may be left
	 * uninitialised when an early return drops the last reference. */
	ast_mutex_init(&sub->join_lock);
	ast_cond_init(&sub->join_cond, NULL);
	ao2_ref(topic, +1);
	sub->topic = topic;
	sub->callback = callback;
	sub->data = data;

	ast_uuid_generate_str(sub->uniqueid, sizeof(sub->uniqueid));

	if (needs_mailbox) {
		/* With a small number of subscribers, a thread-per-sub is
		 * acceptable. If our usage changes so that we have larger
		 * numbers of subscribers, we'll probably want to consider
		 * a threadpool. We had that originally, but with so few
		 * subscribers it was actually a performance loss instead of
		 * a gain.
		 */
		sub->mailbox = ast_taskprocessor_get(sub->uniqueid,
			TPS_REF_DEFAULT);
		if (!sub->mailbox) {
			return NULL;
		}
		ast_taskprocessor_set_local(sub->mailbox, sub);
		/* Taskprocessor has a reference */
		ao2_ref(sub, +1);
	}

	if (topic_add_subscription(topic, sub) != 0) {
		return NULL;
	}
	send_subscription_subscribe(topic, sub);

	/* Reference for the caller; the scoped variable drops its own. */
	ao2_ref(sub, +1);
	return sub;
}
/*!
 * \brief Subscribe to a topic, with a dedicated mailbox for delivery.
 * \param topic Topic to subscribe to.
 * \param callback Function invoked for each delivered message.
 * \param data Opaque pointer handed to \a callback.
 * \return Whatever internal_stasis_subscribe() returns.
 */
struct stasis_subscription *stasis_subscribe(
	struct stasis_topic *topic,
	stasis_subscription_cb callback,
	void *data)
{
	/* Subscriptions made through this entry point always get a mailbox. */
	const int with_mailbox = 1;

	return internal_stasis_subscribe(topic, callback, data, with_mailbox);
}
/*!
 * \internal
 * \brief Taskprocessor task that drops one reference on a subscription.
 * \param data The subscription to unreference.
 * \return 0
 */
static int sub_cleanup(void *data)
{
	struct stasis_subscription *finished = data;

	ao2_cleanup(finished);

	return 0;
}
/*!
 * \brief Cancel a subscription.
 *
 * Removes \a sub from its topic, publishes the "Unsubscribe" change message,
 * queues a task on the mailbox (if any) to drop the reference that was taken
 * for the taskprocessor, and finally drops one reference on \a sub.
 *
 * \param sub Subscription to cancel; \c NULL is accepted and ignored.
 * \return \c NULL, always.
 */
struct stasis_subscription *stasis_unsubscribe(struct stasis_subscription *sub)
{
/* The subscription may be the last ref to this topic. Hold
* the topic ref open until after the unlock. */
RAII_VAR(struct stasis_topic *, topic,
ao2_bump(sub ? sub->topic : NULL), ao2_cleanup);
if (!sub) {
return NULL;
}
/* We have to remove the subscription first, to ensure the unsubscribe
* is the final message */
if (topic_remove_subscription(sub->topic, sub) != 0) {
/* Not found on the topic: bail out without touching the
* subscription's reference count. */
ast_log(LOG_ERROR,
"Internal error: subscription has invalid topic\n");
return NULL;
}
/* Now let everyone know about the unsubscribe */
send_subscription_unsubscribe(topic, sub);
/* When all that's done, remove the ref the mailbox has on the sub.
* The task is queued behind the unsubscribe message pushed above.
* NOTE(review): the push result is ignored; if the push fails the
* mailbox's reference is never released. */
if (sub->mailbox) {
ast_taskprocessor_push(sub->mailbox, sub_cleanup, sub);
}
/* Unsubscribing unrefs the subscription */
ao2_cleanup(sub);
return NULL;
}
/*!
 * \brief Block until a subscription's final message has been processed.
 * \param subscription Subscription to wait on; \c NULL returns immediately.
 */
void stasis_subscription_join(struct stasis_subscription *subscription)
{
	if (!subscription) {
		return;
	}

	{
		SCOPED_MUTEX(lock, &subscription->join_lock);

		/* Sleep on the condition until the processed flag shows up;
		 * the loop re-checks the flag after every wakeup. */
		for (;;) {
			if (subscription->final_message_processed) {
				break;
			}
			ast_cond_wait(&subscription->join_cond,
				&subscription->join_lock);
		}
	}
}
/*!
 * \brief Report whether a subscription has received its final message.
 * \param subscription Subscription to check; may be \c NULL.
 * \return Value of the \c final_message_rxed flag, read under \c join_lock.
 * \return 1 when \a subscription is \c NULL.
 */
int stasis_subscription_is_done(struct stasis_subscription *subscription)
{
	int received;

	if (!subscription) {
		/* Null subscription is about as done as you can get */
		return 1;
	}

	{
		SCOPED_MUTEX(lock, &subscription->join_lock);
		received = subscription->final_message_rxed;
	}

	return received;
}
/*!
 * \brief Cancel a subscription and wait for its final message to be processed.
 * \param subscription Subscription to cancel; \c NULL is accepted and ignored.
 * \return \c NULL, always.
 */
struct stasis_subscription *stasis_unsubscribe_and_join(
	struct stasis_subscription *subscription)
{
	if (subscription != NULL) {
		/* stasis_unsubscribe() drops a reference, so take a
		 * temporary one to keep the object usable for the join. */
		ao2_ref(subscription, +1);

		stasis_unsubscribe(subscription);
		stasis_subscription_join(subscription);

		/* Give the temporary reference back. */
		ao2_cleanup(subscription);
	}

	return NULL;
}
/*!
 * \brief Check whether a subscription is currently listed on its topic.
 *
 * \param sub Subscription to check; may be \c NULL.
 * \return 1 if \a sub is present in its topic's subscriber list.
 * \return 0 otherwise, including when \a sub or its topic is \c NULL.
 */
int stasis_subscription_is_subscribed(const struct stasis_subscription *sub)
{
	struct stasis_topic *topic;
	size_t i;

	/* subscription_dtor() calls this on subscriptions that may never
	 * have been attached to a topic (construction failed early), so a
	 * missing topic has to be treated as "not subscribed" rather than
	 * locked and dereferenced. */
	if (!sub || !sub->topic) {
		return 0;
	}
	topic = sub->topic;

	{
		SCOPED_AO2LOCK(lock_topic, topic);

		for (i = 0; i < ast_vector_size(topic->subscribers); ++i) {
			if (ast_vector_get(topic->subscribers, i) == sub) {
				return 1;
			}
		}
	}

	return 0;
}
/*!
 * \brief Get the unique ID of a subscription.
 * \param sub Subscription to query; must not be \c NULL.
 * \return Pointer to the ID string stored inside the subscription.
 */
const char *stasis_subscription_uniqueid(const struct stasis_subscription *sub)
{
	const char *id = sub->uniqueid;

	return id;
}
/*!
 * \brief Test whether a message is the final message for a subscription.
 *
 * A message is final when it is a subscription change message whose
 * description is "Unsubscribe" and whose unique ID matches \a sub.
 *
 * \param sub Subscription the message is being delivered to.
 * \param msg Message to inspect.
 * \return 1 if \a msg is the final message for \a sub, 0 otherwise.
 */
int stasis_subscription_final_message(struct stasis_subscription *sub, struct stasis_message *msg)
{
	struct stasis_subscription_change *change;

	/* Only subscription change messages can be final. */
	if (stasis_message_type(msg) != stasis_subscription_change_type()) {
		return 0;
	}

	change = stasis_message_data(msg);

	/* Must be an unsubscribe, and must be about this subscription. */
	return strcmp("Unsubscribe", change->description) == 0
		&& strcmp(stasis_subscription_uniqueid(sub), change->uniqueid) == 0;
}
/*!
 * \brief Add a subscriber to a topic.
 *
 * The subscriber is appended to \a topic and then, recursively, to every
 * topic that forwards into it.
 *
 * \param topic Topic
 * \param sub Subscriber
 * \return 0 on success
 * \return Non-zero if \a sub could not be appended to \a topic itself
 */
static int topic_add_subscription(struct stasis_topic *topic, struct stasis_subscription *sub)
{
	size_t idx;
	SCOPED_AO2LOCK(lock, topic);

	/* The reference from the topic to the subscription is shared with
	 * the owner of the subscription, which will explicitly unsubscribe
	 * to release it.
	 *
	 * If we bumped the refcount here, the owner would have to unsubscribe
	 * and cleanup, which is a bit awkward. */
	if (ast_vector_append(topic->subscribers, sub) != 0) {
		/* Nothing was added anywhere yet, so failing here leaves no
		 * stale pointer behind. */
		return -1;
	}

	/* Upstream registration stays best-effort. By now sub is already on
	 * this topic's list, so reporting an upstream failure would invite
	 * the caller to destroy a subscription this topic still points at. */
	for (idx = 0; idx < ast_vector_size(topic->upstream_topics); ++idx) {
		topic_add_subscription(
			ast_vector_get(topic->upstream_topics, idx), sub);
	}

	return 0;
}
/*!
 * \brief Remove a subscriber from a topic and from the topics forwarding into it.
 * \param topic Topic
 * \param sub Subscriber
 * \return Result of removing \a sub from \a topic's own subscriber list;
 *         results from the upstream topics are ignored.
 */
static int topic_remove_subscription(struct stasis_topic *topic, struct stasis_subscription *sub)
{
	size_t upstream_idx = 0;
	SCOPED_AO2LOCK(lock_topic, topic);

	/* Walk the upstream topics first, then deal with this topic. */
	while (upstream_idx < ast_vector_size(topic->upstream_topics)) {
		struct stasis_topic *upstream =
			ast_vector_get(topic->upstream_topics, upstream_idx);

		topic_remove_subscription(upstream, sub);
		++upstream_idx;
	}

	return ast_vector_remove_elem_unordered(topic->subscribers, sub);
}
/*!
 * \brief Taskprocessor task that delivers one queued message to a subscriber.
 * \param local Taskprocessor context: \c local_data is the subscription and
 *        \c data is the message to deliver.
 * \return 0
 */
static int dispatch_exec(struct ast_taskprocessor_local *local)
{
	struct stasis_message *queued = local->data;
	struct stasis_subscription *target = local->local_data;

	subscription_invoke(target, queued);

	/* Release the message reference that travelled with the task. */
	ao2_cleanup(queued);

	return 0;
}
/*!
 * \brief Deliver a message to one subscription.
 *
 * Subscriptions without a mailbox have their callback invoked directly;
 * otherwise the message is queued on the subscription's mailbox.
 *
 * \param sub Subscription to deliver to.
 * \param message Message to deliver.
 */
static void dispatch_message(struct stasis_subscription *sub,
	struct stasis_message *message)
{
	if (!sub->mailbox) {
		/* Dispatch directly */
		subscription_invoke(sub, message);
		return;
	}

	/* The queued task owns a reference on the message. */
	ao2_bump(message);
	if (ast_taskprocessor_push_local(sub->mailbox, dispatch_exec, message) == 0) {
		return;
	}

	/* Push failed; ugh. Take back the reference we just added. */
	ast_log(LOG_DEBUG, "Dropping dispatch\n");
	ao2_cleanup(message);
}
/*!
 * \brief Publish a message to every subscriber of a topic.
 *
 * \param _topic Topic to publish to; must not be \c NULL.
 * \param message Message to publish; must not be \c NULL.
 *
 * Both arguments are asserted non-NULL. If either is \c NULL anyway the
 * call is a no-op instead of locking and dereferencing a NULL topic.
 */
void stasis_publish(struct stasis_topic *_topic, struct stasis_message *message)
{
	/* Check the preconditions BEFORE the topic is locked or
	 * dereferenced. */
	ast_assert(_topic != NULL);
	ast_assert(message != NULL);
	if (!_topic || !message) {
		return;
	}

	{
		/* The topic may be unref'ed by the subscription invocation.
		 * Make sure we hold onto a reference while dispatching. The
		 * lock is declared after the reference so that it is released
		 * first when the scope ends. */
		RAII_VAR(struct stasis_topic *, topic, ao2_bump(_topic),
			ao2_cleanup);
		SCOPED_AO2LOCK(lock, topic);
		size_t i;

		for (i = 0; i < ast_vector_size(topic->subscribers); ++i) {
			struct stasis_subscription *sub =
				ast_vector_get(topic->subscribers, i);

			ast_assert(sub != NULL);
			dispatch_message(sub, message);
		}
	}
}
/*!
 * \brief Forwarding information
 *
 * Any message posted to \a from_topic is forwarded to \a to_topic.
 *
 * In cases where both the \a from_topic and \a to_topic need to be locked,
 * always lock the \a to_topic first, then the \a from_topic. Lest you deadlock.
 *
 * A reference is held on both topics; forward_dtor() releases them.
 */
struct stasis_forward {
/*! Originating topic */
struct stasis_topic *from_topic;
/*! Destination topic */
struct stasis_topic *to_topic;
};
/*!
 * \internal
 * \brief ao2 destructor for a \ref stasis_forward.
 * \param obj The forward being destroyed.
 */
static void forward_dtor(void *obj)
{
	struct stasis_forward *fwd = obj;

	/* Release the originating topic, then the destination topic. */
	ao2_cleanup(fwd->from_topic);
	fwd->from_topic = NULL;

	ao2_cleanup(fwd->to_topic);
	fwd->to_topic = NULL;
}
struct stasis_forward *stasis_forward_cancel(struct stasis_forward *forward)
{
if (forward) {
int idx;
struct stasis_topic *from = forward->from_topic;
struct stasis_topic *to = forward->to_topic;
SCOPED_AO2LOCK(to_lock, to);
ast_vector_remove_elem_unordered(to->upstream_topics, from);
ao2_lock(from);
for (idx = 0; idx < ast_vector_size(to->subscribers); ++idx) {
topic_remove_subscription(
from, ast_vector_get(to->subscribers, idx));
}
ao2_unlock(from);
}
ao2_cleanup(forward);
return NULL;
}
struct stasis_forward *stasis_forward_all(struct stasis_topic *from_topic,
struct stasis_topic *to_topic)
{
RAII_VAR(struct stasis_forward *, forward, NULL, ao2_cleanup);
if (!from_topic || !to_topic) {
return NULL;
}
forward = ao2_alloc(sizeof(*forward), forward_dtor);
if (!forward) {
return NULL;
}
forward->from_topic = ao2_bump(from_topic);
forward->to_topic = ao2_bump(to_topic);
{
SCOPED_AO2LOCK(lock, to_topic);
int res;
res = ast_vector_append(to_topic->upstream_topics, from_topic);
if (res != 0) {
return NULL;
}
{
SCOPED_AO2LOCK(lock, from_topic);
size_t idx;
for (idx = 0; idx < ast_vector_size(to_topic->subscribers); ++idx) {
topic_add_subscription(from_topic, ast_vector_get(to_topic->subscribers, idx));
}
}
}
return ao2_bump(forward);
}
/*!
 * \internal
 * \brief ao2 destructor for a \ref stasis_subscription_change.
 * \param obj The change object being destroyed.
 */
static void subscription_change_dtor(void *obj)
{
	struct stasis_subscription_change *doomed = obj;

	/* String fields go first, then the topic reference. */
	ast_string_field_free_memory(doomed);
	ao2_cleanup(doomed->topic);
}
/*!
 * \internal
 * \brief Allocate a subscription change payload.
 *
 * \param topic Topic the change relates to; a reference is taken.
 * \param uniqueid Unique ID of the subscription that changed.
 * \param description Text describing the change (this file passes
 *        "Subscribe" or "Unsubscribe").
 * \return The new object, with a reference for the caller.
 * \return \c NULL on allocation failure.
 */
static struct stasis_subscription_change *subscription_change_alloc(struct stasis_topic *topic, const char *uniqueid, const char *description)
{
	RAII_VAR(struct stasis_subscription_change *, change, NULL, ao2_cleanup);

	change = ao2_alloc(sizeof(struct stasis_subscription_change), subscription_change_dtor);
	if (!change) {
		/* Must be checked before the string field pool is set up on
		 * the object. */
		return NULL;
	}

	if (ast_string_field_init(change, 128)) {
		return NULL;
	}

	ast_string_field_set(change, uniqueid, uniqueid);
	ast_string_field_set(change, description, description);

	ao2_ref(topic, +1);
	change->topic = topic;

	/* Reference for the caller; the scoped variable drops its own. */
	ao2_ref(change, +1);
	return change;
}
static void send_subscription_subscribe(struct stasis_topic *topic, struct stasis_subscription *sub)
{
RAII_VAR(struct stasis_subscription_change *, change, NULL, ao2_cleanup);
RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup);
/* This assumes that we have already unsubscribed */
ast_assert(stasis_subscription_is_subscribed(sub));
change = subscription_change_alloc(topic, sub->uniqueid, "Subscribe");
if (!change) {
return;
}
msg = stasis_message_create(stasis_subscription_change_type(), change);
if (!msg) {
return;
}
stasis_publish(topic, msg);
}
static void send_subscription_unsubscribe(struct stasis_topic *topic,
struct stasis_subscription *sub)
{
RAII_VAR(struct stasis_subscription_change *, change, NULL, ao2_cleanup);
RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup);
/* This assumes that we have already unsubscribed */
ast_assert(!stasis_subscription_is_subscribed(sub));
change = subscription_change_alloc(topic, sub->uniqueid, "Unsubscribe");
if (!change) {
return;
}
msg = stasis_message_create(stasis_subscription_change_type(), change);
if (!msg) {
return;
}
stasis_publish(topic, msg);
/* Now we have to dispatch to the subscription itself */
dispatch_message(sub, msg);
}
/*!
 * \internal
 * \brief One pooled topic together with the forward that feeds the pool's
 * shared topic.
 */
struct topic_pool_entry {
/*! Forward from \c topic to the pool's topic; cancelled in
 * topic_pool_entry_dtor() */
struct stasis_forward *forward;
/*! The pooled topic; unreferenced in topic_pool_entry_dtor() */
struct stasis_topic *topic;
};
/*!
 * \internal
 * \brief ao2 destructor for a \ref topic_pool_entry.
 * \param obj The pool entry being destroyed.
 */
static void topic_pool_entry_dtor(void *obj)
{
	struct topic_pool_entry *doomed = obj;

	/* Stop forwarding before letting go of the topic itself. */
	doomed->forward = stasis_forward_cancel(doomed->forward);

	ao2_cleanup(doomed->topic);
	doomed->topic = NULL;
}
/*!
 * \internal
 * \brief Allocate an empty topic pool entry.
 * \return The new entry, or \c NULL on allocation failure.
 */
static struct topic_pool_entry *topic_pool_entry_alloc(void)
{
	struct topic_pool_entry *fresh;

	fresh = ao2_alloc(sizeof(*fresh), topic_pool_entry_dtor);

	return fresh;
}
/*!
 * \internal
 * \brief A set of named topics that all forward into one shared topic.
 */
struct stasis_topic_pool {
/*! Container of \ref topic_pool_entry objects, hashed and compared by
 * topic name (case-insensitive) */
struct ao2_container *pool_container;
/*! Topic that every pooled topic forwards to; a reference is held */
struct stasis_topic *pool_topic;
};
/*!
 * \internal
 * \brief ao2 destructor for a \ref stasis_topic_pool.
 * \param obj The pool being destroyed.
 */
static void topic_pool_dtor(void *obj)
{
	struct stasis_topic_pool *doomed = obj;

	/* Container first, shared topic second. */
	ao2_cleanup(doomed->pool_container);
	doomed->pool_container = NULL;

	ao2_cleanup(doomed->pool_topic);
	doomed->pool_topic = NULL;
}
static int topic_pool_entry_hash(const void *obj, const int flags)
{
const char *topic_name = (flags & OBJ_KEY) ? obj : stasis_topic_name(((struct topic_pool_entry*) obj)->topic);
return ast_str_case_hash(topic_name);
}
/*!
 * \internal
 * \brief Comparison callback for the topic pool container.
 * \param obj The \ref topic_pool_entry being examined.
 * \param arg A topic name string when \c OBJ_KEY is set in \a flags,
 *        otherwise another \ref topic_pool_entry.
 * \param flags ao2 search flags.
 * \return \c CMP_MATCH | \c CMP_STOP when the topic names are equal
 *         ignoring case, 0 otherwise.
 */
static int topic_pool_entry_cmp(void *obj, void *arg, int flags)
{
	struct topic_pool_entry *candidate = obj;
	const char *wanted;

	if (flags & OBJ_KEY) {
		wanted = arg;
	} else {
		struct topic_pool_entry *other = arg;

		wanted = stasis_topic_name(other->topic);
	}

	if (strcasecmp(stasis_topic_name(candidate->topic), wanted) != 0) {
		return 0;
	}

	return CMP_MATCH | CMP_STOP;
}
/*!
 * \brief Create a topic pool whose topics all forward to \a pooled_topic.
 *
 * \param pooled_topic Topic that pooled topics will forward to; a reference
 *        is taken.
 * \return The new pool, with a reference for the caller.
 * \return \c NULL on allocation failure.
 */
struct stasis_topic_pool *stasis_topic_pool_create(struct stasis_topic *pooled_topic)
{
	RAII_VAR(struct stasis_topic_pool *, pool, ao2_alloc(sizeof(*pool), topic_pool_dtor), ao2_cleanup);

	if (!pool) {
		return NULL;
	}

	pool->pool_container = ao2_container_alloc(TOPIC_POOL_BUCKETS, topic_pool_entry_hash, topic_pool_entry_cmp);
	if (!pool->pool_container) {
		/* A pool without a container is unusable:
		 * stasis_topic_pool_get_topic() locks and searches it
		 * unconditionally. */
		return NULL;
	}

	ao2_ref(pooled_topic, +1);
	pool->pool_topic = pooled_topic;

	/* Reference for the caller; the scoped variable drops its own. */
	ao2_ref(pool, +1);
	return pool;
}
/*!
 * \brief Find a topic in the pool by name, creating it if it is missing.
 *
 * A newly created topic is set up to forward to the pool's shared topic.
 * The lookup and the insertion happen under the container lock.
 *
 * \param pool Pool to search.
 * \param topic_name Name of the topic to find or create.
 * \return The topic. No extra reference is taken for the caller; the
 *         pool entry owns it.
 * \return \c NULL if the topic could not be created or stored in the pool.
 */
struct stasis_topic *stasis_topic_pool_get_topic(struct stasis_topic_pool *pool, const char *topic_name)
{
	RAII_VAR(struct topic_pool_entry *, topic_pool_entry, NULL, ao2_cleanup);
	SCOPED_AO2LOCK(topic_container_lock, pool->pool_container);

	topic_pool_entry = ao2_find(pool->pool_container, topic_name, OBJ_KEY | OBJ_NOLOCK);
	if (topic_pool_entry) {
		return topic_pool_entry->topic;
	}

	topic_pool_entry = topic_pool_entry_alloc();
	if (!topic_pool_entry) {
		return NULL;
	}

	topic_pool_entry->topic = stasis_topic_create(topic_name);
	if (!topic_pool_entry->topic) {
		return NULL;
	}

	topic_pool_entry->forward = stasis_forward_all(topic_pool_entry->topic, pool->pool_topic);
	if (!topic_pool_entry->forward) {
		return NULL;
	}

	/* If the entry does not make it into the container, the scoped
	 * cleanup destroys it (and unreferences its topic) on return, so the
	 * topic pointer must not be handed out. ao2_link_flags() reports
	 * failure with a zero result. */
	if (!ao2_link_flags(pool->pool_container, topic_pool_entry, OBJ_NOLOCK)) {
		return NULL;
	}

	return topic_pool_entry->topic;
}
/*!
 * \brief Log an error about a message type accessor being used while its
 * type is not available.
 * \param name Name of the accessor function, without parentheses.
 */
void stasis_log_bad_type_access(const char *name)
{
	const char *accessor = name;

	ast_log(LOG_ERROR, "Use of %s() before init/after destruction\n", accessor);
}
/*!
 * \brief Cleanup function for graceful shutdowns
 *
 * Registered by stasis_init() through ast_register_cleanup(); releases the
 * subscription change message type.
 */
static void stasis_cleanup(void)
{
STASIS_MESSAGE_TYPE_CLEANUP(stasis_subscription_change_type);
}
/*!
 * \brief Initialise the message bus.
 *
 * Registers the shutdown cleanup handler, initialises the cache and then
 * the subscription change message type.
 *
 * \return 0 on success
 * \return -1 if either initialisation step fails
 */
int stasis_init(void)
{
	/* Be sure the types are cleaned up after the message bus */
	ast_register_cleanup(stasis_cleanup);

	/* The message type is only initialised when the cache came up. */
	if (stasis_cache_init() != 0
		|| STASIS_MESSAGE_TYPE_INIT(stasis_subscription_change_type) != 0) {
		return -1;
	}

	return 0;
}