asterisk/tests/test_stasis.c


/*
* Asterisk -- An open source telephony toolkit.
*
* Copyright (C) 2013, Digium, Inc.
*
* David M. Lee, II <dlee@digium.com>
*
* See http://www.asterisk.org for more information about
* the Asterisk project. Please do not directly contact
* any of the maintainers of this project for assistance;
* the project provides a web site, mailing lists and IRC
* channels for your use.
*
* This program is free software, distributed under the terms of
* the GNU General Public License Version 2. See the LICENSE file
* at the top of the source tree.
*/
/*!
* \file
* \brief Test Stasis message bus.
*
* \author \verbatim David M. Lee, II <dlee@digium.com> \endverbatim
*
* \ingroup tests
*/
/*** MODULEINFO
<depend>TEST_FRAMEWORK</depend>
<support_level>core</support_level>
***/
#include "asterisk.h"
#include "asterisk/astobj2.h"
#include "asterisk/module.h"
#include "asterisk/stasis.h"
#include "asterisk/stasis_message_router.h"
#include "asterisk/test.h"
#define test_category "/stasis/core/"
static struct ast_event *fake_event(struct stasis_message *message)
{
return ast_event_new(AST_EVENT_CUSTOM,
AST_EVENT_IE_DESCRIPTION, AST_EVENT_IE_PLTYPE_STR, "Dummy", AST_EVENT_IE_END);
}
static struct ast_json *fake_json(struct stasis_message *message, const struct stasis_message_sanitizer *sanitize)
{
const char *text = stasis_message_data(message);
return ast_json_string_create(text);
}
static struct ast_manager_event_blob *fake_ami(struct stasis_message *message)
{
RAII_VAR(struct ast_manager_event_blob *, res, NULL, ao2_cleanup);
const char *text = stasis_message_data(message);
res = ast_manager_event_blob_create(EVENT_FLAG_TEST, "FakeMI",
"Message: %s\r\n", text);
if (res == NULL) {
return NULL;
}
ao2_ref(res, +1);
return res;
}
static struct stasis_message_vtable fake_vtable = {
.to_json = fake_json,
.to_ami = fake_ami
};
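/*
 * The fake vtable above wires serialization callbacks onto a message type.
 * A minimal sketch (not executed here) of how a test can exercise them,
 * assuming the stasis_message_to_json()/stasis_message_to_ami() declarations
 * in stasis.h:
 *
 * \code
 * struct stasis_message_type *type;
 * struct stasis_message *msg;
 * struct ast_json *json;
 *
 * stasis_message_type_create("FakeType", &fake_vtable, &type);
 * msg = stasis_message_create(type, some_ao2_managed_string);
 * json = stasis_message_to_json(msg, NULL);  // invokes fake_json()
 * stasis_message_to_ami(msg);                // invokes fake_ami()
 * \endcode
 *
 * "FakeType" and some_ao2_managed_string are illustrative placeholders only.
 */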
AST_TEST_DEFINE(message_type)
{
RAII_VAR(struct stasis_message_type *, uut, NULL, ao2_cleanup);
switch (cmd) {
case TEST_INIT:
info->name = __func__;
info->category = test_category;
info->summary = "Test basic message_type functions";
info->description = "Test basic message_type functions";
return AST_TEST_NOT_RUN;
case TEST_EXECUTE:
break;
}
ast_test_validate(test, stasis_message_type_create(NULL, NULL, NULL) == STASIS_MESSAGE_TYPE_ERROR);
ast_test_validate(test, stasis_message_type_create("SomeMessage", NULL, &uut) == STASIS_MESSAGE_TYPE_SUCCESS);
ast_test_validate(test, 0 == strcmp(stasis_message_type_name(uut), "SomeMessage"));
return AST_TEST_PASS;
}
AST_TEST_DEFINE(message)
{
RAII_VAR(struct stasis_message_type *, type, NULL, ao2_cleanup);
RAII_VAR(struct stasis_message *, uut1, NULL, ao2_cleanup);
RAII_VAR(struct stasis_message *, uut2, NULL, ao2_cleanup);
RAII_VAR(char *, data, NULL, ao2_cleanup);
char *expected = "SomeData";
struct timeval expected_timestamp;
struct timeval time_diff;
struct ast_eid foreign_eid;
switch (cmd) {
case TEST_INIT:
info->name = __func__;
info->category = test_category;
info->summary = "Test basic message functions";
info->description = "Test basic message functions";
return AST_TEST_NOT_RUN;
case TEST_EXECUTE:
break;
}
memset(&foreign_eid, 0xFF, sizeof(foreign_eid));
ast_test_validate(test, stasis_message_type_create("SomeMessage", NULL, &type) == STASIS_MESSAGE_TYPE_SUCCESS);
ast_test_validate(test, NULL == stasis_message_create_full(NULL, NULL, NULL));
ast_test_validate(test, NULL == stasis_message_create_full(type, NULL, NULL));
data = ao2_alloc(strlen(expected) + 1, NULL);
strcpy(data, expected);/* Safe */
expected_timestamp = ast_tvnow();
uut1 = stasis_message_create_full(type, data, &foreign_eid);
uut2 = stasis_message_create_full(type, data, NULL);
ast_test_validate(test, NULL != uut1);
ast_test_validate(test, NULL != uut2);
ast_test_validate(test, type == stasis_message_type(uut1));
ast_test_validate(test, type == stasis_message_type(uut2));
ast_test_validate(test, 0 == strcmp(expected, stasis_message_data(uut1)));
ast_test_validate(test, 0 == strcmp(expected, stasis_message_data(uut2)));
ast_test_validate(test, NULL != stasis_message_eid(uut1));
ast_test_validate(test, NULL == stasis_message_eid(uut2));
ast_test_validate(test, !ast_eid_cmp(&foreign_eid, stasis_message_eid(uut1)));
ast_test_validate(test, 3 == ao2_ref(data, 0)); /* uut1 and uut2 have ref to data */
time_diff = ast_tvsub(*stasis_message_timestamp(uut1), expected_timestamp);
/* 10ms is certainly long enough for the two calls to complete */
ast_test_validate(test, time_diff.tv_sec == 0);
ast_test_validate(test, time_diff.tv_usec < 10000);
ao2_ref(uut1, -1);
uut1 = NULL;
ast_test_validate(test, 2 == ao2_ref(data, 0)); /* uut1 unreffed data */
ao2_ref(uut2, -1);
uut2 = NULL;
ast_test_validate(test, 1 == ao2_ref(data, 0)); /* uut2 unreffed data */
return AST_TEST_PASS;
}
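/*!
 * \internal
 * \brief Test fixture used as subscription callback data.
 *
 * A consumer records every message it receives in \c messages_rxed and
 * signals \c out so the test thread can wait for a given number of messages,
 * or for the final unsubscribe message, using the helpers below.
 */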
struct consumer {
ast_cond_t out;
struct stasis_message **messages_rxed;
size_t messages_rxed_len;
int ignore_subscriptions;
int complete;
};
static void consumer_dtor(void *obj)
{
struct consumer *consumer = obj;
ast_cond_destroy(&consumer->out);
while (consumer->messages_rxed_len > 0) {
ao2_cleanup(consumer->messages_rxed[--consumer->messages_rxed_len]);
}
ast_free(consumer->messages_rxed);
consumer->messages_rxed = NULL;
}
static struct consumer *consumer_create(int ignore_subscriptions)
{
struct consumer *consumer;
consumer = ao2_alloc(sizeof(*consumer), consumer_dtor);
if (!consumer) {
return NULL;
}
consumer->ignore_subscriptions = ignore_subscriptions;
consumer->messages_rxed = ast_malloc(sizeof(*consumer->messages_rxed));
if (!consumer->messages_rxed) {
ao2_cleanup(consumer);
return NULL;
}
ast_cond_init(&consumer->out, NULL);
return consumer;
}
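/*!
 * \internal
 * \brief Subscription callbacks that record received messages.
 *
 * When \c ignore_subscriptions is set, subscription change messages are
 * dropped so tests only see their own payloads. Receiving the final message
 * marks the consumer complete and releases the reference held on its behalf
 * by the subscription. consumer_exec() signals the waiting test thread;
 * consumer_exec_sync() does not, since synchronous publishes need no wait.
 */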
static void consumer_exec(void *data, struct stasis_subscription *sub, struct stasis_message *message)
{
struct consumer *consumer = data;
RAII_VAR(struct consumer *, consumer_needs_cleanup, NULL, ao2_cleanup);
SCOPED_AO2LOCK(lock, consumer);
if (!consumer->ignore_subscriptions || stasis_message_type(message) != stasis_subscription_change_type()) {
++consumer->messages_rxed_len;
consumer->messages_rxed = ast_realloc(consumer->messages_rxed, sizeof(*consumer->messages_rxed) * consumer->messages_rxed_len);
ast_assert(consumer->messages_rxed != NULL);
consumer->messages_rxed[consumer->messages_rxed_len - 1] = message;
ao2_ref(message, +1);
}
if (stasis_subscription_final_message(sub, message)) {
consumer->complete = 1;
consumer_needs_cleanup = consumer;
}
ast_cond_signal(&consumer->out);
}
static void consumer_exec_sync(void *data, struct stasis_subscription *sub, struct stasis_message *message)
{
struct consumer *consumer = data;
RAII_VAR(struct consumer *, consumer_needs_cleanup, NULL, ao2_cleanup);
SCOPED_AO2LOCK(lock, consumer);
if (!consumer->ignore_subscriptions || stasis_message_type(message) != stasis_subscription_change_type()) {
++consumer->messages_rxed_len;
consumer->messages_rxed = ast_realloc(consumer->messages_rxed, sizeof(*consumer->messages_rxed) * consumer->messages_rxed_len);
ast_assert(consumer->messages_rxed != NULL);
consumer->messages_rxed[consumer->messages_rxed_len - 1] = message;
ao2_ref(message, +1);
}
if (stasis_subscription_final_message(sub, message)) {
consumer->complete = 1;
consumer_needs_cleanup = consumer;
}
}
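/*!
 * \internal
 * \brief Wait helpers for the consumer fixture.
 *
 * consumer_wait_for() blocks (up to 30 seconds) until at least
 * \a expected_len messages have arrived, consumer_wait_for_completion()
 * waits (up to 3 seconds) for the final message, and consumer_should_stay()
 * waits 100ms to verify that no additional messages arrive.
 */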
static int consumer_wait_for(struct consumer *consumer, size_t expected_len)
{
struct timeval start = ast_tvnow();
struct timespec end = {
.tv_sec = start.tv_sec + 30,
.tv_nsec = start.tv_usec * 1000
};
SCOPED_AO2LOCK(lock, consumer);
while (consumer->messages_rxed_len < expected_len) {
int r = ast_cond_timedwait(&consumer->out, ao2_object_get_lockaddr(consumer), &end);
if (r == ETIMEDOUT) {
break;
}
ast_assert(r == 0); /* Not expecting any other types of errors */
}
return consumer->messages_rxed_len;
}
static int consumer_wait_for_completion(struct consumer *consumer)
{
struct timeval start = ast_tvnow();
struct timespec end = {
.tv_sec = start.tv_sec + 3,
.tv_nsec = start.tv_usec * 1000
};
SCOPED_AO2LOCK(lock, consumer);
while (!consumer->complete) {
int r = ast_cond_timedwait(&consumer->out, ao2_object_get_lockaddr(consumer), &end);
if (r == ETIMEDOUT) {
break;
}
ast_assert(r == 0); /* Not expecting any other types of errors */
}
return consumer->complete;
}
static int consumer_should_stay(struct consumer *consumer, size_t expected_len)
{
struct timeval start = ast_tvnow();
struct timeval diff = {
.tv_sec = 0,
.tv_usec = 100000 /* wait for 100ms */
};
struct timeval end_tv = ast_tvadd(start, diff);
struct timespec end = {
.tv_sec = end_tv.tv_sec,
.tv_nsec = end_tv.tv_usec * 1000
};
SCOPED_AO2LOCK(lock, consumer);
while (consumer->messages_rxed_len == expected_len) {
int r = ast_cond_timedwait(&consumer->out, ao2_object_get_lockaddr(consumer), &end);
if (r == ETIMEDOUT) {
break;
}
ast_assert(r == 0); /* Not expecting any other types of errors */
}
return consumer->messages_rxed_len;
}
AST_TEST_DEFINE(subscription_messages)
{
RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
RAII_VAR(struct stasis_subscription *, uut, NULL, stasis_unsubscribe);
RAII_VAR(char *, test_data, NULL, ao2_cleanup);
RAII_VAR(struct stasis_message_type *, test_message_type, NULL, ao2_cleanup);
RAII_VAR(struct stasis_message *, test_message, NULL, ao2_cleanup);
RAII_VAR(struct consumer *, consumer, NULL, ao2_cleanup);
RAII_VAR(char *, expected_uniqueid, NULL, ast_free);
int complete;
struct stasis_subscription_change *change;
switch (cmd) {
case TEST_INIT:
info->name = __func__;
info->category = test_category;
info->summary = "Test subscribe/unsubscribe messages";
info->description = "Test subscribe/unsubscribe messages";
return AST_TEST_NOT_RUN;
case TEST_EXECUTE:
break;
}
topic = stasis_topic_create("TestTopic");
ast_test_validate(test, NULL != topic);
consumer = consumer_create(0);
ast_test_validate(test, NULL != consumer);
uut = stasis_subscribe(topic, consumer_exec, consumer);
ast_test_validate(test, NULL != uut);
ao2_ref(consumer, +1);
expected_uniqueid = ast_strdup(stasis_subscription_uniqueid(uut));
uut = stasis_unsubscribe(uut);
complete = consumer_wait_for_completion(consumer);
ast_test_validate(test, 1 == complete);
ast_test_validate(test, 2 == consumer->messages_rxed_len);
ast_test_validate(test, stasis_subscription_change_type() == stasis_message_type(consumer->messages_rxed[0]));
ast_test_validate(test, stasis_subscription_change_type() == stasis_message_type(consumer->messages_rxed[1]));
change = stasis_message_data(consumer->messages_rxed[0]);
ast_test_validate(test, topic == change->topic);
ast_test_validate(test, 0 == strcmp("Subscribe", change->description));
ast_test_validate(test, 0 == strcmp(expected_uniqueid, change->uniqueid));
change = stasis_message_data(consumer->messages_rxed[1]);
ast_test_validate(test, topic == change->topic);
ast_test_validate(test, 0 == strcmp("Unsubscribe", change->description));
ast_test_validate(test, 0 == strcmp(expected_uniqueid, change->uniqueid));
return AST_TEST_PASS;
}
AST_TEST_DEFINE(subscription_pool_messages)
{
RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
RAII_VAR(struct stasis_subscription *, uut, NULL, stasis_unsubscribe);
RAII_VAR(char *, test_data, NULL, ao2_cleanup);
RAII_VAR(struct stasis_message_type *, test_message_type, NULL, ao2_cleanup);
RAII_VAR(struct stasis_message *, test_message, NULL, ao2_cleanup);
RAII_VAR(struct consumer *, consumer, NULL, ao2_cleanup);
RAII_VAR(char *, expected_uniqueid, NULL, ast_free);
int complete;
struct stasis_subscription_change *change;
switch (cmd) {
case TEST_INIT:
info->name = __func__;
info->category = test_category;
info->summary = "Test subscribe/unsubscribe messages using a threadpool subscription";
info->description = "Test subscribe/unsubscribe messages using a threadpool subscription";
return AST_TEST_NOT_RUN;
case TEST_EXECUTE:
break;
}
topic = stasis_topic_create("TestTopic");
ast_test_validate(test, NULL != topic);
consumer = consumer_create(0);
ast_test_validate(test, NULL != consumer);
uut = stasis_subscribe_pool(topic, consumer_exec, consumer);
ast_test_validate(test, NULL != uut);
ao2_ref(consumer, +1);
expected_uniqueid = ast_strdup(stasis_subscription_uniqueid(uut));
uut = stasis_unsubscribe(uut);
complete = consumer_wait_for_completion(consumer);
ast_test_validate(test, 1 == complete);
ast_test_validate(test, 2 == consumer->messages_rxed_len);
ast_test_validate(test, stasis_subscription_change_type() == stasis_message_type(consumer->messages_rxed[0]));
ast_test_validate(test, stasis_subscription_change_type() == stasis_message_type(consumer->messages_rxed[1]));
change = stasis_message_data(consumer->messages_rxed[0]);
ast_test_validate(test, topic == change->topic);
ast_test_validate(test, 0 == strcmp("Subscribe", change->description));
ast_test_validate(test, 0 == strcmp(expected_uniqueid, change->uniqueid));
change = stasis_message_data(consumer->messages_rxed[1]);
ast_test_validate(test, topic == change->topic);
ast_test_validate(test, 0 == strcmp("Unsubscribe", change->description));
ast_test_validate(test, 0 == strcmp(expected_uniqueid, change->uniqueid));
return AST_TEST_PASS;
}
AST_TEST_DEFINE(publish)
{
RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
RAII_VAR(struct stasis_subscription *, uut, NULL, stasis_unsubscribe);
RAII_VAR(char *, test_data, NULL, ao2_cleanup);
RAII_VAR(struct stasis_message_type *, test_message_type, NULL, ao2_cleanup);
RAII_VAR(struct stasis_message *, test_message, NULL, ao2_cleanup);
RAII_VAR(struct consumer *, consumer, NULL, ao2_cleanup);
int actual_len;
const char *actual;
switch (cmd) {
case TEST_INIT:
info->name = __func__;
info->category = test_category;
info->summary = "Test publishing";
info->description = "Test publishing";
return AST_TEST_NOT_RUN;
case TEST_EXECUTE:
break;
}
topic = stasis_topic_create("TestTopic");
ast_test_validate(test, NULL != topic);
consumer = consumer_create(1);
ast_test_validate(test, NULL != consumer);
uut = stasis_subscribe(topic, consumer_exec, consumer);
ast_test_validate(test, NULL != uut);
ao2_ref(consumer, +1);
test_data = ao2_alloc(1, NULL);
ast_test_validate(test, NULL != test_data);
ast_test_validate(test, stasis_message_type_create("TestMessage", NULL, &test_message_type) == STASIS_MESSAGE_TYPE_SUCCESS);
test_message = stasis_message_create(test_message_type, test_data);
stasis_publish(topic, test_message);
actual_len = consumer_wait_for(consumer, 1);
ast_test_validate(test, 1 == actual_len);
actual = stasis_message_data(consumer->messages_rxed[0]);
ast_test_validate(test, test_data == actual);
return AST_TEST_PASS;
}
AST_TEST_DEFINE(publish_sync)
{
RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
RAII_VAR(struct stasis_subscription *, uut, NULL, stasis_unsubscribe);
RAII_VAR(char *, test_data, NULL, ao2_cleanup);
RAII_VAR(struct stasis_message_type *, test_message_type, NULL, ao2_cleanup);
RAII_VAR(struct stasis_message *, test_message, NULL, ao2_cleanup);
RAII_VAR(struct consumer *, consumer, NULL, ao2_cleanup);
int actual_len;
const char *actual;
switch (cmd) {
case TEST_INIT:
info->name = __func__;
info->category = test_category;
info->summary = "Test synchronous publishing";
info->description = "Test synchronous publishing";
return AST_TEST_NOT_RUN;
case TEST_EXECUTE:
break;
}
topic = stasis_topic_create("TestTopic");
ast_test_validate(test, NULL != topic);
consumer = consumer_create(1);
ast_test_validate(test, NULL != consumer);
uut = stasis_subscribe(topic, consumer_exec_sync, consumer);
ast_test_validate(test, NULL != uut);
ao2_ref(consumer, +1);
test_data = ao2_alloc(1, NULL);
ast_test_validate(test, NULL != test_data);
ast_test_validate(test, stasis_message_type_create("TestMessage", NULL, &test_message_type) == STASIS_MESSAGE_TYPE_SUCCESS);
test_message = stasis_message_create(test_message_type, test_data);
stasis_publish_sync(uut, test_message);
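/* stasis_publish_sync() blocks until the subscriber has handled the
 * message, so the received count can be checked without waiting. */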
actual_len = consumer->messages_rxed_len;
ast_test_validate(test, 1 == actual_len);
actual = stasis_message_data(consumer->messages_rxed[0]);
ast_test_validate(test, test_data == actual);
return AST_TEST_PASS;
}
AST_TEST_DEFINE(publish_pool)
{
RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
RAII_VAR(struct stasis_subscription *, uut, NULL, stasis_unsubscribe);
RAII_VAR(char *, test_data, NULL, ao2_cleanup);
RAII_VAR(struct stasis_message_type *, test_message_type, NULL, ao2_cleanup);
RAII_VAR(struct stasis_message *, test_message, NULL, ao2_cleanup);
RAII_VAR(struct consumer *, consumer, NULL, ao2_cleanup);
int actual_len;
const char *actual;
switch (cmd) {
case TEST_INIT:
info->name = __func__;
info->category = test_category;
info->summary = "Test publishing with a threadpool";
info->description = "Test publishing to a subscriber whose\n"
"subscription dictates messages are received through a\n"
"threadpool.";
return AST_TEST_NOT_RUN;
case TEST_EXECUTE:
break;
}
topic = stasis_topic_create("TestTopic");
ast_test_validate(test, NULL != topic);
consumer = consumer_create(1);
ast_test_validate(test, NULL != consumer);
uut = stasis_subscribe_pool(topic, consumer_exec, consumer);
ast_test_validate(test, NULL != uut);
ao2_ref(consumer, +1);
test_data = ao2_alloc(1, NULL);
ast_test_validate(test, NULL != test_data);
ast_test_validate(test, stasis_message_type_create("TestMessage", NULL, &test_message_type) == STASIS_MESSAGE_TYPE_SUCCESS);
test_message = stasis_message_create(test_message_type, test_data);
stasis_publish(topic, test_message);
actual_len = consumer_wait_for(consumer, 1);
ast_test_validate(test, 1 == actual_len);
actual = stasis_message_data(consumer->messages_rxed[0]);
ast_test_validate(test, test_data == actual);
return AST_TEST_PASS;
}
AST_TEST_DEFINE(unsubscribe_stops_messages)
{
RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
RAII_VAR(struct consumer *, consumer, NULL, ao2_cleanup);
RAII_VAR(struct stasis_subscription *, uut, NULL, stasis_unsubscribe);
RAII_VAR(char *, test_data, NULL, ao2_cleanup);
RAII_VAR(struct stasis_message_type *, test_message_type, NULL, ao2_cleanup);
RAII_VAR(struct stasis_message *, test_message, NULL, ao2_cleanup);
int actual_len;
switch (cmd) {
case TEST_INIT:
info->name = __func__;
info->category = test_category;
info->summary = "Test simple subscriptions";
info->description = "Test simple subscriptions";
return AST_TEST_NOT_RUN;
case TEST_EXECUTE:
break;
}
topic = stasis_topic_create("TestTopic");
ast_test_validate(test, NULL != topic);
consumer = consumer_create(1);
ast_test_validate(test, NULL != consumer);
uut = stasis_subscribe(topic, consumer_exec, consumer);
ast_test_validate(test, NULL != uut);
ao2_ref(consumer, +1);
uut = stasis_unsubscribe(uut);
test_data = ao2_alloc(1, NULL);
ast_test_validate(test, NULL != test_data);
ast_test_validate(test, stasis_message_type_create("TestMessage", NULL, &test_message_type) == STASIS_MESSAGE_TYPE_SUCCESS);
test_message = stasis_message_create(test_message_type, test_data);
stasis_publish(topic, test_message);
actual_len = consumer_should_stay(consumer, 0);
ast_test_validate(test, 0 == actual_len);
return AST_TEST_PASS;
}
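/*
 * The next tests exercise topic forwarding: stasis_forward_all() causes
 * messages published on a child topic to also be delivered to subscribers
 * of the parent topic.
 */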
AST_TEST_DEFINE(forward)
{
RAII_VAR(struct stasis_topic *, parent_topic, NULL, ao2_cleanup);
RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
RAII_VAR(struct consumer *, parent_consumer, NULL, ao2_cleanup);
RAII_VAR(struct consumer *, consumer, NULL, ao2_cleanup);
RAII_VAR(struct stasis_forward *, forward_sub, NULL, stasis_forward_cancel);
RAII_VAR(struct stasis_subscription *, parent_sub, NULL, stasis_unsubscribe);
RAII_VAR(struct stasis_subscription *, sub, NULL, stasis_unsubscribe);
RAII_VAR(char *, test_data, NULL, ao2_cleanup);
RAII_VAR(struct stasis_message_type *, test_message_type, NULL, ao2_cleanup);
RAII_VAR(struct stasis_message *, test_message, NULL, ao2_cleanup);
int actual_len;
switch (cmd) {
case TEST_INIT:
info->name = __func__;
info->category = test_category;
info->summary = "Test sending events to a parent topic";
info->description = "Test sending events to a parent topic.\n"
"This test creates three topics (one parent, two children)\n"
"and publishes a message to one child, and verifies it's\n"
"only seen by that child and the parent";
return AST_TEST_NOT_RUN;
case TEST_EXECUTE:
break;
}
parent_topic = stasis_topic_create("ParentTestTopic");
ast_test_validate(test, NULL != parent_topic);
topic = stasis_topic_create("TestTopic");
ast_test_validate(test, NULL != topic);
forward_sub = stasis_forward_all(topic, parent_topic);
ast_test_validate(test, NULL != forward_sub);
parent_consumer = consumer_create(1);
ast_test_validate(test, NULL != parent_consumer);
consumer = consumer_create(1);
ast_test_validate(test, NULL != consumer);
parent_sub = stasis_subscribe(parent_topic, consumer_exec, parent_consumer);
ast_test_validate(test, NULL != parent_sub);
ao2_ref(parent_consumer, +1);
sub = stasis_subscribe(topic, consumer_exec, consumer);
ast_test_validate(test, NULL != sub);
ao2_ref(consumer, +1);
test_data = ao2_alloc(1, NULL);
ast_test_validate(test, NULL != test_data);
ast_test_validate(test, stasis_message_type_create("TestMessage", NULL, &test_message_type) == STASIS_MESSAGE_TYPE_SUCCESS);
test_message = stasis_message_create(test_message_type, test_data);
stasis_publish(topic, test_message);
actual_len = consumer_wait_for(consumer, 1);
ast_test_validate(test, 1 == actual_len);
actual_len = consumer_wait_for(parent_consumer, 1);
ast_test_validate(test, 1 == actual_len);
return AST_TEST_PASS;
}
AST_TEST_DEFINE(interleaving)
{
RAII_VAR(struct stasis_topic *, parent_topic, NULL, ao2_cleanup);
RAII_VAR(struct stasis_topic *, topic1, NULL, ao2_cleanup);
RAII_VAR(struct stasis_topic *, topic2, NULL, ao2_cleanup);
RAII_VAR(struct stasis_message_type *, test_message_type, NULL, ao2_cleanup);
RAII_VAR(char *, test_data, NULL, ao2_cleanup);
RAII_VAR(struct stasis_message *, test_message1, NULL, ao2_cleanup);
RAII_VAR(struct stasis_message *, test_message2, NULL, ao2_cleanup);
RAII_VAR(struct stasis_message *, test_message3, NULL, ao2_cleanup);
RAII_VAR(struct stasis_forward *, forward_sub1, NULL, stasis_forward_cancel);
RAII_VAR(struct stasis_forward *, forward_sub2, NULL, stasis_forward_cancel);
RAII_VAR(struct stasis_subscription *, sub, NULL, stasis_unsubscribe);
RAII_VAR(struct consumer *, consumer, NULL, ao2_cleanup);
int actual_len;
switch (cmd) {
case TEST_INIT:
info->name = __func__;
info->category = test_category;
info->summary = "Test sending interleaved events to a parent topic";
info->description = "Test sending events to a parent topic.\n"
"This test creates three topics (one parent, two children)\n"
"and publishes messages alternately between the children.\n"
"It verifies that the messages are received in the expected\n"
"order.";
return AST_TEST_NOT_RUN;
case TEST_EXECUTE:
break;
}
ast_test_validate(test, stasis_message_type_create("test", NULL, &test_message_type) == STASIS_MESSAGE_TYPE_SUCCESS);
ast_test_validate(test, NULL != test_message_type);
test_data = ao2_alloc(1, NULL);
ast_test_validate(test, NULL != test_data);
test_message1 = stasis_message_create(test_message_type, test_data);
ast_test_validate(test, NULL != test_message1);
test_message2 = stasis_message_create(test_message_type, test_data);
ast_test_validate(test, NULL != test_message2);
test_message3 = stasis_message_create(test_message_type, test_data);
ast_test_validate(test, NULL != test_message3);
parent_topic = stasis_topic_create("ParentTestTopic");
ast_test_validate(test, NULL != parent_topic);
topic1 = stasis_topic_create("Topic1");
ast_test_validate(test, NULL != topic1);
topic2 = stasis_topic_create("Topic2");
ast_test_validate(test, NULL != topic2);
forward_sub1 = stasis_forward_all(topic1, parent_topic);
ast_test_validate(test, NULL != forward_sub1);
forward_sub2 = stasis_forward_all(topic2, parent_topic);
ast_test_validate(test, NULL != forward_sub2);
consumer = consumer_create(1);
ast_test_validate(test, NULL != consumer);
sub = stasis_subscribe(parent_topic, consumer_exec, consumer);
ast_test_validate(test, NULL != sub);
ao2_ref(consumer, +1);
stasis_publish(topic1, test_message1);
stasis_publish(topic2, test_message2);
stasis_publish(topic1, test_message3);
actual_len = consumer_wait_for(consumer, 3);
ast_test_validate(test, 3 == actual_len);
ast_test_validate(test, test_message1 == consumer->messages_rxed[0]);
ast_test_validate(test, test_message2 == consumer->messages_rxed[1]);
ast_test_validate(test, test_message3 == consumer->messages_rxed[2]);
return AST_TEST_PASS;
}
AST_TEST_DEFINE(subscription_interleaving)
{
RAII_VAR(struct stasis_topic *, parent_topic, NULL, ao2_cleanup);
RAII_VAR(struct stasis_topic *, topic1, NULL, ao2_cleanup);
RAII_VAR(struct stasis_topic *, topic2, NULL, ao2_cleanup);
RAII_VAR(struct stasis_message_type *, test_message_type, NULL, ao2_cleanup);
RAII_VAR(char *, test_data, NULL, ao2_cleanup);
RAII_VAR(struct stasis_message *, test_message1, NULL, ao2_cleanup);
RAII_VAR(struct stasis_message *, test_message2, NULL, ao2_cleanup);
RAII_VAR(struct stasis_message *, test_message3, NULL, ao2_cleanup);
RAII_VAR(struct stasis_forward *, forward_sub1, NULL, stasis_forward_cancel);
RAII_VAR(struct stasis_forward *, forward_sub2, NULL, stasis_forward_cancel);
RAII_VAR(struct stasis_subscription *, sub1, NULL, stasis_unsubscribe);
RAII_VAR(struct stasis_subscription *, sub2, NULL, stasis_unsubscribe);
RAII_VAR(struct consumer *, consumer1, NULL, ao2_cleanup);
RAII_VAR(struct consumer *, consumer2, NULL, ao2_cleanup);
int actual_len;
switch (cmd) {
case TEST_INIT:
info->name = __func__;
info->category = test_category;
info->summary = "Test sending interleaved events to a parent topic with different subscribers";
info->description = "Test sending events to a parent topic.\n"
"This test creates three topics (one parent, two children)\n"
"and publishes messages alternately between the children.\n"
"It verifies that the messages are received in the expected\n"
"order, for different subscription types: one with a dedicated\n"
"thread, the other on the Stasis threadpool.";
return AST_TEST_NOT_RUN;
case TEST_EXECUTE:
break;
}
ast_test_validate(test, stasis_message_type_create("test", NULL, &test_message_type) == STASIS_MESSAGE_TYPE_SUCCESS);
ast_test_validate(test, NULL != test_message_type);
test_data = ao2_alloc(1, NULL);
ast_test_validate(test, NULL != test_data);
test_message1 = stasis_message_create(test_message_type, test_data);
ast_test_validate(test, NULL != test_message1);
test_message2 = stasis_message_create(test_message_type, test_data);
ast_test_validate(test, NULL != test_message2);
test_message3 = stasis_message_create(test_message_type, test_data);
ast_test_validate(test, NULL != test_message3);
parent_topic = stasis_topic_create("ParentTestTopic");
ast_test_validate(test, NULL != parent_topic);
topic1 = stasis_topic_create("Topic1");
ast_test_validate(test, NULL != topic1);
topic2 = stasis_topic_create("Topic2");
ast_test_validate(test, NULL != topic2);
forward_sub1 = stasis_forward_all(topic1, parent_topic);
ast_test_validate(test, NULL != forward_sub1);
forward_sub2 = stasis_forward_all(topic2, parent_topic);
ast_test_validate(test, NULL != forward_sub2);
consumer1 = consumer_create(1);
ast_test_validate(test, NULL != consumer1);
consumer2 = consumer_create(1);
ast_test_validate(test, NULL != consumer2);
sub1 = stasis_subscribe(parent_topic, consumer_exec, consumer1);
ast_test_validate(test, NULL != sub1);
ao2_ref(consumer1, +1);
sub2 = stasis_subscribe_pool(parent_topic, consumer_exec, consumer2);
ast_test_validate(test, NULL != sub2);
ao2_ref(consumer2, +1);
stasis_publish(topic1, test_message1);
stasis_publish(topic2, test_message2);
stasis_publish(topic1, test_message3);
actual_len = consumer_wait_for(consumer1, 3);
ast_test_validate(test, 3 == actual_len);
actual_len = consumer_wait_for(consumer2, 3);
ast_test_validate(test, 3 == actual_len);
ast_test_validate(test, test_message1 == consumer1->messages_rxed[0]);
ast_test_validate(test, test_message2 == consumer1->messages_rxed[1]);
ast_test_validate(test, test_message3 == consumer1->messages_rxed[2]);
ast_test_validate(test, test_message1 == consumer2->messages_rxed[0]);
ast_test_validate(test, test_message2 == consumer2->messages_rxed[1]);
ast_test_validate(test, test_message3 == consumer2->messages_rxed[2]);
return AST_TEST_PASS;
}
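/*
 * Helpers for the cache tests below: a small "Cacheable" message payload,
 * an ID function that keys cache entries by the payload's id, and aggregate
 * callbacks that sum the numeric values of the local and remote snapshots.
 */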
struct cache_test_data {
char *id;
char *value;
};
static void cache_test_data_dtor(void *obj)
{
struct cache_test_data *data = obj;
ast_free(data->id);
ast_free(data->value);
}
static struct stasis_message *cache_test_message_create_full(struct stasis_message_type *type, const char *name, const char *value, struct ast_eid *eid)
{
RAII_VAR(struct cache_test_data *, data, NULL, ao2_cleanup);
data = ao2_alloc(sizeof(*data), cache_test_data_dtor);
if (data == NULL) {
return NULL;
}
ast_assert(name != NULL);
ast_assert(value != NULL);
data->id = ast_strdup(name);
data->value = ast_strdup(value);
if (!data->id || !data->value) {
return NULL;
}
return stasis_message_create_full(type, data, eid);
}
static struct stasis_message *cache_test_message_create(struct stasis_message_type *type, const char *name, const char *value)
{
return cache_test_message_create_full(type, name, value, &ast_eid_default);
}
static const char *cache_test_data_id(struct stasis_message *message)
{
struct cache_test_data *cachable = stasis_message_data(message);
if (0 != strcmp("Cacheable", stasis_message_type_name(stasis_message_type(message)))) {
return NULL;
}
return cachable->id;
}
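/*!
 * \internal
 * \brief Recompute the aggregate snapshot for a cache entry.
 *
 * Sums the integer values of the local and all remote snapshots. Returns
 * NULL to delete the aggregate when no snapshots remain, or bumps and
 * returns the existing aggregate when the accumulated value is unchanged.
 */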
static struct stasis_message *cache_test_aggregate_calc_fn(struct stasis_cache_entry *entry, struct stasis_message *new_snapshot)
{
struct stasis_message *aggregate_snapshot;
struct stasis_message *snapshot;
struct stasis_message_type *type = NULL;
struct cache_test_data *test_data = NULL;
int idx;
int accumulated = 0;
char aggregate_str[30];
/* Accumulate the aggregate value. */
snapshot = stasis_cache_entry_get_local(entry);
if (snapshot) {
type = stasis_message_type(snapshot);
test_data = stasis_message_data(snapshot);
accumulated += atoi(test_data->value);
}
for (idx = 0; ; ++idx) {
snapshot = stasis_cache_entry_get_remote(entry, idx);
if (!snapshot) {
break;
}
type = stasis_message_type(snapshot);
test_data = stasis_message_data(snapshot);
accumulated += atoi(test_data->value);
}
if (!test_data) {
/* There are no test entries cached. Delete the aggregate. */
return NULL;
}
snapshot = stasis_cache_entry_get_aggregate(entry);
if (snapshot) {
type = stasis_message_type(snapshot);
test_data = stasis_message_data(snapshot);
if (accumulated == atoi(test_data->value)) {
/* Aggregate test entry did not change. */
return ao2_bump(snapshot);
}
}
snprintf(aggregate_str, sizeof(aggregate_str), "%d", accumulated);
aggregate_snapshot = cache_test_message_create_full(type, test_data->id, aggregate_str, NULL);
if (!aggregate_snapshot) {
/* Bummer. We have to keep the old aggregate snapshot. */
ast_log(LOG_ERROR, "Could not create aggregate snapshot.\n");
return ao2_bump(snapshot);
}
return aggregate_snapshot;
}
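/*! \brief Aggregate publish function: simply publishes the aggregate snapshot to the given topic. */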
static void cache_test_aggregate_publish_fn(struct stasis_topic *topic, struct stasis_message *aggregate)
{
stasis_publish(topic, aggregate);
}
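/*!
 * \brief Check the cached aggregate for \a id against an expected \a value.
 *
 * A NULL \a value asserts that no aggregate is currently cached for the id.
 */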
static int check_cache_aggregate(struct stasis_cache *cache, struct stasis_message_type *cache_type, const char *id, const char *value)
{
RAII_VAR(struct stasis_message *, aggregate, NULL, ao2_cleanup);
struct cache_test_data *test_data;
aggregate = stasis_cache_get_by_eid(cache, cache_type, id, NULL);
if (!aggregate) {
/* No aggregate, return true if given no value. */
return !value;
}
/* Return true if the given value matches the aggregate value. */
test_data = stasis_message_data(aggregate);
return value && !strcmp(value, test_data->value);
}
AST_TEST_DEFINE(cache_filter)
{
RAII_VAR(struct stasis_message_type *, non_cache_type, NULL, ao2_cleanup);
RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
RAII_VAR(struct stasis_cache *, cache, NULL, ao2_cleanup);
RAII_VAR(struct stasis_caching_topic *, caching_topic, NULL, stasis_caching_unsubscribe);
RAII_VAR(struct consumer *, consumer, NULL, ao2_cleanup);
RAII_VAR(struct stasis_subscription *, sub, NULL, stasis_unsubscribe);
RAII_VAR(struct stasis_message *, test_message, NULL, ao2_cleanup);
int actual_len;
switch (cmd) {
case TEST_INIT:
info->name = __func__;
info->category = test_category;
info->summary = "Test caching topics only forward cache_update messages.";
info->description = "Test caching topics only forward cache_update messages.";
return AST_TEST_NOT_RUN;
case TEST_EXECUTE:
break;
}
ast_test_validate(test, stasis_message_type_create("NonCacheable", NULL, &non_cache_type) == STASIS_MESSAGE_TYPE_SUCCESS);
ast_test_validate(test, NULL != non_cache_type);
topic = stasis_topic_create("SomeTopic");
ast_test_validate(test, NULL != topic);
cache = stasis_cache_create(cache_test_data_id);
ast_test_validate(test, NULL != cache);
caching_topic = stasis_caching_topic_create(topic, cache);
ast_test_validate(test, NULL != caching_topic);
consumer = consumer_create(1);
ast_test_validate(test, NULL != consumer);
sub = stasis_subscribe(stasis_caching_get_topic(caching_topic), consumer_exec, consumer);
ast_test_validate(test, NULL != sub);
ao2_ref(consumer, +1);
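/* Publish a message of a non-cacheable type; the caching topic should not forward it to the consumer. */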
test_message = cache_test_message_create(non_cache_type, "1", "1");
ast_test_validate(test, NULL != test_message);
stasis_publish(topic, test_message);
actual_len = consumer_should_stay(consumer, 0);
ast_test_validate(test, 0 == actual_len);
return AST_TEST_PASS;
}
AST_TEST_DEFINE(cache)
{
RAII_VAR(struct stasis_message_type *, cache_type, NULL, ao2_cleanup);
RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
RAII_VAR(struct stasis_cache *, cache, NULL, ao2_cleanup);
RAII_VAR(struct stasis_caching_topic *, caching_topic, NULL, stasis_caching_unsubscribe);
RAII_VAR(struct consumer *, consumer, NULL, ao2_cleanup);
RAII_VAR(struct stasis_subscription *, sub, NULL, stasis_unsubscribe);
RAII_VAR(struct stasis_message *, test_message1_1, NULL, ao2_cleanup);
RAII_VAR(struct stasis_message *, test_message2_1, NULL, ao2_cleanup);
RAII_VAR(struct stasis_message *, test_message2_2, NULL, ao2_cleanup);
RAII_VAR(struct stasis_message *, test_message1_clear, NULL, ao2_cleanup);
int actual_len;
struct stasis_cache_update *actual_update;
switch (cmd) {
case TEST_INIT:
info->name = __func__;
info->category = test_category;
info->summary = "Test passing messages through cache topic unscathed.";
info->description = "Test passing messages through cache topic unscathed.";
return AST_TEST_NOT_RUN;
case TEST_EXECUTE:
break;
}
ast_test_validate(test, stasis_message_type_create("Cacheable", NULL, &cache_type) == STASIS_MESSAGE_TYPE_SUCCESS);
ast_test_validate(test, NULL != cache_type);
topic = stasis_topic_create("SomeTopic");
ast_test_validate(test, NULL != topic);
cache = stasis_cache_create(cache_test_data_id);
ast_test_validate(test, NULL != cache);
caching_topic = stasis_caching_topic_create(topic, cache);
ast_test_validate(test, NULL != caching_topic);
consumer = consumer_create(1);
ast_test_validate(test, NULL != consumer);
sub = stasis_subscribe(stasis_caching_get_topic(caching_topic), consumer_exec, consumer);
ast_test_validate(test, NULL != sub);
ao2_ref(consumer, +1);
test_message1_1 = cache_test_message_create(cache_type, "1", "1");
ast_test_validate(test, NULL != test_message1_1);
test_message2_1 = cache_test_message_create(cache_type, "2", "1");
ast_test_validate(test, NULL != test_message2_1);
/* Post a couple of snapshots */
stasis_publish(topic, test_message1_1);
stasis_publish(topic, test_message2_1);
actual_len = consumer_wait_for(consumer, 2);
ast_test_validate(test, 2 == actual_len);
/* Check for new snapshot messages */
ast_test_validate(test, stasis_cache_update_type() == stasis_message_type(consumer->messages_rxed[0]));
actual_update = stasis_message_data(consumer->messages_rxed[0]);
ast_test_validate(test, NULL == actual_update->old_snapshot);
ast_test_validate(test, test_message1_1 == actual_update->new_snapshot);
ast_test_validate(test, test_message1_1 == stasis_cache_get(cache, cache_type, "1"));
/* stasis_cache_get returned a ref, so unref test_message1_1 */
ao2_ref(test_message1_1, -1);
ast_test_validate(test, stasis_cache_update_type() == stasis_message_type(consumer->messages_rxed[1]));
actual_update = stasis_message_data(consumer->messages_rxed[1]);
ast_test_validate(test, NULL == actual_update->old_snapshot);
ast_test_validate(test, test_message2_1 == actual_update->new_snapshot);
ast_test_validate(test, test_message2_1 == stasis_cache_get(cache, cache_type, "2"));
/* stasis_cache_get returned a ref, so unref test_message2_1 */
ao2_ref(test_message2_1, -1);
/* Update snapshot 2 */
test_message2_2 = cache_test_message_create(cache_type, "2", "2");
ast_test_validate(test, NULL != test_message2_2);
stasis_publish(topic, test_message2_2);
actual_len = consumer_wait_for(consumer, 3);
ast_test_validate(test, 3 == actual_len);
actual_update = stasis_message_data(consumer->messages_rxed[2]);
ast_test_validate(test, test_message2_1 == actual_update->old_snapshot);
ast_test_validate(test, test_message2_2 == actual_update->new_snapshot);
ast_test_validate(test, test_message2_2 == stasis_cache_get(cache, cache_type, "2"));
/* stasis_cache_get returned a ref, so unref test_message2_2 */
ao2_ref(test_message2_2, -1);
/* Clear snapshot 1 */
test_message1_clear = stasis_cache_clear_create(test_message1_1);
ast_test_validate(test, NULL != test_message1_clear);
stasis_publish(topic, test_message1_clear);
actual_len = consumer_wait_for(consumer, 4);
ast_test_validate(test, 4 == actual_len);
actual_update = stasis_message_data(consumer->messages_rxed[3]);
ast_test_validate(test, test_message1_1 == actual_update->old_snapshot);
ast_test_validate(test, NULL == actual_update->new_snapshot);
ast_test_validate(test, NULL == stasis_cache_get(cache, cache_type, "1"));
return AST_TEST_PASS;
}
AST_TEST_DEFINE(cache_dump)
{
RAII_VAR(struct stasis_message_type *, cache_type, NULL, ao2_cleanup);
RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
RAII_VAR(struct stasis_cache *, cache, NULL, ao2_cleanup);
RAII_VAR(struct stasis_caching_topic *, caching_topic, NULL, stasis_caching_unsubscribe);
RAII_VAR(struct consumer *, consumer, NULL, ao2_cleanup);
RAII_VAR(struct stasis_subscription *, sub, NULL, stasis_unsubscribe);
RAII_VAR(struct stasis_message *, test_message1_1, NULL, ao2_cleanup);
RAII_VAR(struct stasis_message *, test_message2_1, NULL, ao2_cleanup);
RAII_VAR(struct stasis_message *, test_message2_2, NULL, ao2_cleanup);
RAII_VAR(struct stasis_message *, test_message1_clear, NULL, ao2_cleanup);
RAII_VAR(struct ao2_container *, cache_dump, NULL, ao2_cleanup);
int actual_len;
struct ao2_iterator i;
void *obj;
switch (cmd) {
case TEST_INIT:
info->name = __func__;
info->category = test_category;
info->summary = "Test cache dump routines.";
info->description = "Test cache dump routines.";
return AST_TEST_NOT_RUN;
case TEST_EXECUTE:
break;
}
ast_test_validate(test, stasis_message_type_create("Cacheable", NULL, &cache_type) == STASIS_MESSAGE_TYPE_SUCCESS);
ast_test_validate(test, NULL != cache_type);
topic = stasis_topic_create("SomeTopic");
ast_test_validate(test, NULL != topic);
cache = stasis_cache_create(cache_test_data_id);
ast_test_validate(test, NULL != cache);
caching_topic = stasis_caching_topic_create(topic, cache);
ast_test_validate(test, NULL != caching_topic);
consumer = consumer_create(1);
ast_test_validate(test, NULL != consumer);
sub = stasis_subscribe(stasis_caching_get_topic(caching_topic), consumer_exec, consumer);
ast_test_validate(test, NULL != sub);
ao2_ref(consumer, +1);
test_message1_1 = cache_test_message_create(cache_type, "1", "1");
ast_test_validate(test, NULL != test_message1_1);
test_message2_1 = cache_test_message_create(cache_type, "2", "1");
ast_test_validate(test, NULL != test_message2_1);
/* Post a couple of snapshots */
stasis_publish(topic, test_message1_1);
stasis_publish(topic, test_message2_1);
actual_len = consumer_wait_for(consumer, 2);
ast_test_validate(test, 2 == actual_len);
/* Check the cache */
ao2_cleanup(cache_dump);
cache_dump = stasis_cache_dump(cache, NULL);
ast_test_validate(test, NULL != cache_dump);
ast_test_validate(test, 2 == ao2_container_count(cache_dump));
i = ao2_iterator_init(cache_dump, 0);
while ((obj = ao2_iterator_next(&i))) {
RAII_VAR(struct stasis_message *, actual_cache_entry, obj, ao2_cleanup);
ast_test_validate(test, actual_cache_entry == test_message1_1 || actual_cache_entry == test_message2_1);
}
ao2_iterator_destroy(&i);
/* Update snapshot 2 */
test_message2_2 = cache_test_message_create(cache_type, "2", "2");
ast_test_validate(test, NULL != test_message2_2);
stasis_publish(topic, test_message2_2);
actual_len = consumer_wait_for(consumer, 3);
ast_test_validate(test, 3 == actual_len);
/* Check the cache */
ao2_cleanup(cache_dump);
cache_dump = stasis_cache_dump(cache, NULL);
ast_test_validate(test, NULL != cache_dump);
ast_test_validate(test, 2 == ao2_container_count(cache_dump));
i = ao2_iterator_init(cache_dump, 0);
while ((obj = ao2_iterator_next(&i))) {
RAII_VAR(struct stasis_message *, actual_cache_entry, obj, ao2_cleanup);
ast_test_validate(test, actual_cache_entry == test_message1_1 || actual_cache_entry == test_message2_2);
}
ao2_iterator_destroy(&i);
/* Clear snapshot 1 */
test_message1_clear = stasis_cache_clear_create(test_message1_1);
ast_test_validate(test, NULL != test_message1_clear);
stasis_publish(topic, test_message1_clear);
actual_len = consumer_wait_for(consumer, 4);
ast_test_validate(test, 4 == actual_len);
/* Check the cache */
ao2_cleanup(cache_dump);
cache_dump = stasis_cache_dump(cache, NULL);
ast_test_validate(test, NULL != cache_dump);
ast_test_validate(test, 1 == ao2_container_count(cache_dump));
i = ao2_iterator_init(cache_dump, 0);
while ((obj = ao2_iterator_next(&i))) {
RAII_VAR(struct stasis_message *, actual_cache_entry, obj, ao2_cleanup);
ast_test_validate(test, actual_cache_entry == test_message2_2);
}
ao2_iterator_destroy(&i);
/* Dump the cache to ensure that it has no subscription change items in it since those aren't cached */
ao2_cleanup(cache_dump);
cache_dump = stasis_cache_dump(cache, stasis_subscription_change_type());
ast_test_validate(test, 0 == ao2_container_count(cache_dump));
return AST_TEST_PASS;
}
AST_TEST_DEFINE(cache_eid_aggregate)
{
RAII_VAR(struct stasis_message_type *, cache_type, NULL, ao2_cleanup);
RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
RAII_VAR(struct stasis_cache *, cache, NULL, ao2_cleanup);
RAII_VAR(struct stasis_caching_topic *, caching_topic, NULL, stasis_caching_unsubscribe);
RAII_VAR(struct consumer *, cache_consumer, NULL, ao2_cleanup);
RAII_VAR(struct consumer *, topic_consumer, NULL, ao2_cleanup);
RAII_VAR(struct stasis_subscription *, topic_sub, NULL, stasis_unsubscribe);
RAII_VAR(struct stasis_subscription *, cache_sub, NULL, stasis_unsubscribe);
RAII_VAR(struct stasis_message *, test_message1_1, NULL, ao2_cleanup);
RAII_VAR(struct stasis_message *, test_message2_1, NULL, ao2_cleanup);
RAII_VAR(struct stasis_message *, test_message2_2, NULL, ao2_cleanup);
RAII_VAR(struct stasis_message *, test_message2_3, NULL, ao2_cleanup);
RAII_VAR(struct stasis_message *, test_message2_4, NULL, ao2_cleanup);
RAII_VAR(struct stasis_message *, test_message1_clear, NULL, ao2_cleanup);
RAII_VAR(struct stasis_message *, test_message2_clear, NULL, ao2_cleanup);
RAII_VAR(struct ao2_container *, cache_dump, NULL, ao2_cleanup);
int actual_len;
struct ao2_iterator i;
void *obj;
struct ast_eid foreign_eid1;
struct ast_eid foreign_eid2;
switch (cmd) {
case TEST_INIT:
info->name = __func__;
info->category = test_category;
info->summary = "Test cache eid and aggregate support.";
info->description = "Test cache eid and aggregate support.";
return AST_TEST_NOT_RUN;
case TEST_EXECUTE:
break;
}
memset(&foreign_eid1, 0xAA, sizeof(foreign_eid1));
memset(&foreign_eid2, 0xBB, sizeof(foreign_eid2));
ast_test_validate(test, stasis_message_type_create("Cacheable", NULL, &cache_type) == STASIS_MESSAGE_TYPE_SUCCESS);
ast_test_validate(test, NULL != cache_type);
topic = stasis_topic_create("SomeTopic");
ast_test_validate(test, NULL != topic);
/* To consume events published to the topic. */
topic_consumer = consumer_create(1);
ast_test_validate(test, NULL != topic_consumer);
topic_sub = stasis_subscribe(topic, consumer_exec, topic_consumer);
ast_test_validate(test, NULL != topic_sub);
ao2_ref(topic_consumer, +1);
cache = stasis_cache_create_full(cache_test_data_id,
cache_test_aggregate_calc_fn, cache_test_aggregate_publish_fn);
ast_test_validate(test, NULL != cache);
caching_topic = stasis_caching_topic_create(topic, cache);
ast_test_validate(test, NULL != caching_topic);
/* To consume update events published to the caching_topic. */
cache_consumer = consumer_create(1);
ast_test_validate(test, NULL != cache_consumer);
cache_sub = stasis_subscribe(stasis_caching_get_topic(caching_topic), consumer_exec, cache_consumer);
ast_test_validate(test, NULL != cache_sub);
ao2_ref(cache_consumer, +1);
/* Create test messages. */
test_message1_1 = cache_test_message_create_full(cache_type, "1", "1", &ast_eid_default);
ast_test_validate(test, NULL != test_message1_1);
test_message2_1 = cache_test_message_create_full(cache_type, "2", "1", &ast_eid_default);
ast_test_validate(test, NULL != test_message2_1);
test_message2_2 = cache_test_message_create_full(cache_type, "2", "2", &foreign_eid1);
ast_test_validate(test, NULL != test_message2_2);
test_message2_3 = cache_test_message_create_full(cache_type, "2", "3", &foreign_eid2);
ast_test_validate(test, NULL != test_message2_3);
test_message2_4 = cache_test_message_create_full(cache_type, "2", "4", &foreign_eid2);
ast_test_validate(test, NULL != test_message2_4);
/* Post some snapshots */
stasis_publish(topic, test_message1_1);
ast_test_validate(test, check_cache_aggregate(cache, cache_type, "1", "1"));
stasis_publish(topic, test_message2_1);
ast_test_validate(test, check_cache_aggregate(cache, cache_type, "2", "1"));
stasis_publish(topic, test_message2_2);
ast_test_validate(test, check_cache_aggregate(cache, cache_type, "2", "3"));
actual_len = consumer_wait_for(cache_consumer, 6);
ast_test_validate(test, 6 == actual_len);
actual_len = consumer_wait_for(topic_consumer, 6);
ast_test_validate(test, 6 == actual_len);
/* Check the cache */
ao2_cleanup(cache_dump);
cache_dump = stasis_cache_dump_all(cache, NULL);
ast_test_validate(test, NULL != cache_dump);
ast_test_validate(test, 3 == ao2_container_count(cache_dump));
i = ao2_iterator_init(cache_dump, 0);
while ((obj = ao2_iterator_next(&i))) {
RAII_VAR(struct stasis_message *, actual_cache_entry, obj, ao2_cleanup);
ast_test_validate(test,
actual_cache_entry == test_message1_1
|| actual_cache_entry == test_message2_1
|| actual_cache_entry == test_message2_2);
}
ao2_iterator_destroy(&i);
/* Check the local cached items */
ao2_cleanup(cache_dump);
cache_dump = stasis_cache_dump_by_eid(cache, NULL, &ast_eid_default);
ast_test_validate(test, NULL != cache_dump);
ast_test_validate(test, 2 == ao2_container_count(cache_dump));
i = ao2_iterator_init(cache_dump, 0);
while ((obj = ao2_iterator_next(&i))) {
RAII_VAR(struct stasis_message *, actual_cache_entry, obj, ao2_cleanup);
ast_test_validate(test,
actual_cache_entry == test_message1_1
|| actual_cache_entry == test_message2_1);
}
ao2_iterator_destroy(&i);
/* Post snapshot 2 from another eid. */
stasis_publish(topic, test_message2_3);
ast_test_validate(test, check_cache_aggregate(cache, cache_type, "2", "6"));
actual_len = consumer_wait_for(cache_consumer, 8);
ast_test_validate(test, 8 == actual_len);
actual_len = consumer_wait_for(topic_consumer, 8);
ast_test_validate(test, 8 == actual_len);
/* Check the cache */
ao2_cleanup(cache_dump);
cache_dump = stasis_cache_dump_all(cache, NULL);
ast_test_validate(test, NULL != cache_dump);
ast_test_validate(test, 4 == ao2_container_count(cache_dump));
i = ao2_iterator_init(cache_dump, 0);
while ((obj = ao2_iterator_next(&i))) {
RAII_VAR(struct stasis_message *, actual_cache_entry, obj, ao2_cleanup);
ast_test_validate(test,
actual_cache_entry == test_message1_1
|| actual_cache_entry == test_message2_1
|| actual_cache_entry == test_message2_2
|| actual_cache_entry == test_message2_3);
}
ao2_iterator_destroy(&i);
/* Check the remote cached items */
ao2_cleanup(cache_dump);
cache_dump = stasis_cache_dump_by_eid(cache, NULL, &foreign_eid1);
ast_test_validate(test, NULL != cache_dump);
ast_test_validate(test, 1 == ao2_container_count(cache_dump));
i = ao2_iterator_init(cache_dump, 0);
while ((obj = ao2_iterator_next(&i))) {
RAII_VAR(struct stasis_message *, actual_cache_entry, obj, ao2_cleanup);
ast_test_validate(test, actual_cache_entry == test_message2_2);
}
ao2_iterator_destroy(&i);
/* Post snapshot 2 from a repeated eid. */
stasis_publish(topic, test_message2_4);
ast_test_validate(test, check_cache_aggregate(cache, cache_type, "2", "7"));
actual_len = consumer_wait_for(cache_consumer, 10);
ast_test_validate(test, 10 == actual_len);
actual_len = consumer_wait_for(topic_consumer, 10);
ast_test_validate(test, 10 == actual_len);
/* Check the cache */
ao2_cleanup(cache_dump);
cache_dump = stasis_cache_dump_all(cache, NULL);
ast_test_validate(test, NULL != cache_dump);
ast_test_validate(test, 4 == ao2_container_count(cache_dump));
i = ao2_iterator_init(cache_dump, 0);
while ((obj = ao2_iterator_next(&i))) {
RAII_VAR(struct stasis_message *, actual_cache_entry, obj, ao2_cleanup);
ast_test_validate(test,
actual_cache_entry == test_message1_1
|| actual_cache_entry == test_message2_1
|| actual_cache_entry == test_message2_2
|| actual_cache_entry == test_message2_4);
}
ao2_iterator_destroy(&i);
/* Check all snapshot 2 cache entries. */
ao2_cleanup(cache_dump);
cache_dump = stasis_cache_get_all(cache, cache_type, "2");
ast_test_validate(test, NULL != cache_dump);
ast_test_validate(test, 3 == ao2_container_count(cache_dump));
i = ao2_iterator_init(cache_dump, 0);
while ((obj = ao2_iterator_next(&i))) {
RAII_VAR(struct stasis_message *, actual_cache_entry, obj, ao2_cleanup);
ast_test_validate(test,
actual_cache_entry == test_message2_1
|| actual_cache_entry == test_message2_2
|| actual_cache_entry == test_message2_4);
}
ao2_iterator_destroy(&i);
/* Clear snapshot 1 */
test_message1_clear = stasis_cache_clear_create(test_message1_1);
ast_test_validate(test, NULL != test_message1_clear);
stasis_publish(topic, test_message1_clear);
ast_test_validate(test, check_cache_aggregate(cache, cache_type, "1", NULL));
actual_len = consumer_wait_for(cache_consumer, 12);
ast_test_validate(test, 12 == actual_len);
actual_len = consumer_wait_for(topic_consumer, 11);
ast_test_validate(test, 11 == actual_len);
/* Check the cache */
ao2_cleanup(cache_dump);
cache_dump = stasis_cache_dump_all(cache, NULL);
ast_test_validate(test, NULL != cache_dump);
ast_test_validate(test, 3 == ao2_container_count(cache_dump));
i = ao2_iterator_init(cache_dump, 0);
while ((obj = ao2_iterator_next(&i))) {
RAII_VAR(struct stasis_message *, actual_cache_entry, obj, ao2_cleanup);
ast_test_validate(test,
actual_cache_entry == test_message2_1
|| actual_cache_entry == test_message2_2
|| actual_cache_entry == test_message2_4);
}
ao2_iterator_destroy(&i);
/* Clear snapshot 2 from a remote eid */
test_message2_clear = stasis_cache_clear_create(test_message2_2);
ast_test_validate(test, NULL != test_message2_clear);
stasis_publish(topic, test_message2_clear);
ast_test_validate(test, check_cache_aggregate(cache, cache_type, "2", "5"));
actual_len = consumer_wait_for(cache_consumer, 14);
ast_test_validate(test, 14 == actual_len);
actual_len = consumer_wait_for(topic_consumer, 13);
ast_test_validate(test, 13 == actual_len);
/* Check the cache */
ao2_cleanup(cache_dump);
cache_dump = stasis_cache_dump_all(cache, NULL);
ast_test_validate(test, NULL != cache_dump);
ast_test_validate(test, 2 == ao2_container_count(cache_dump));
i = ao2_iterator_init(cache_dump, 0);
while ((obj = ao2_iterator_next(&i))) {
RAII_VAR(struct stasis_message *, actual_cache_entry, obj, ao2_cleanup);
ast_test_validate(test,
actual_cache_entry == test_message2_1
|| actual_cache_entry == test_message2_4);
}
ao2_iterator_destroy(&i);
return AST_TEST_PASS;
}
AST_TEST_DEFINE(router)
{
RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
RAII_VAR(struct stasis_message_router *, uut, NULL, stasis_message_router_unsubscribe_and_join);
RAII_VAR(char *, test_data, NULL, ao2_cleanup);
RAII_VAR(struct stasis_message_type *, test_message_type1, NULL, ao2_cleanup);
RAII_VAR(struct stasis_message_type *, test_message_type2, NULL, ao2_cleanup);
RAII_VAR(struct stasis_message_type *, test_message_type3, NULL, ao2_cleanup);
RAII_VAR(struct consumer *, consumer1, NULL, ao2_cleanup);
RAII_VAR(struct consumer *, consumer2, NULL, ao2_cleanup);
RAII_VAR(struct consumer *, consumer3, NULL, ao2_cleanup);
RAII_VAR(struct stasis_message *, test_message1, NULL, ao2_cleanup);
RAII_VAR(struct stasis_message *, test_message2, NULL, ao2_cleanup);
RAII_VAR(struct stasis_message *, test_message3, NULL, ao2_cleanup);
int actual_len, ret;
struct stasis_message *actual;
switch (cmd) {
case TEST_INIT:
info->name = __func__;
info->category = test_category;
info->summary = "Test simple message routing";
info->description = "Test simple message routing";
return AST_TEST_NOT_RUN;
case TEST_EXECUTE:
break;
}
topic = stasis_topic_create("TestTopic");
ast_test_validate(test, NULL != topic);
consumer1 = consumer_create(1);
ast_test_validate(test, NULL != consumer1);
consumer2 = consumer_create(1);
ast_test_validate(test, NULL != consumer2);
consumer3 = consumer_create(1);
ast_test_validate(test, NULL != consumer3);
ast_test_validate(test, stasis_message_type_create("TestMessage1", NULL, &test_message_type1) == STASIS_MESSAGE_TYPE_SUCCESS);
ast_test_validate(test, NULL != test_message_type1);
ast_test_validate(test, stasis_message_type_create("TestMessage2", NULL, &test_message_type2) == STASIS_MESSAGE_TYPE_SUCCESS);
ast_test_validate(test, NULL != test_message_type2);
ast_test_validate(test, stasis_message_type_create("TestMessage3", NULL, &test_message_type3) == STASIS_MESSAGE_TYPE_SUCCESS);
ast_test_validate(test, NULL != test_message_type3);
uut = stasis_message_router_create(topic);
ast_test_validate(test, NULL != uut);
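/* Route type1 to consumer1, type2 to consumer2, and everything else to the default route (consumer3). */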
ret = stasis_message_router_add(
uut, test_message_type1, consumer_exec, consumer1);
ast_test_validate(test, 0 == ret);
ao2_ref(consumer1, +1);
ret = stasis_message_router_add(
uut, test_message_type2, consumer_exec, consumer2);
ast_test_validate(test, 0 == ret);
ao2_ref(consumer2, +1);
ret = stasis_message_router_set_default(uut, consumer_exec, consumer3);
ast_test_validate(test, 0 == ret);
ao2_ref(consumer3, +1);
test_data = ao2_alloc(1, NULL);
ast_test_validate(test, NULL != test_data);
test_message1 = stasis_message_create(test_message_type1, test_data);
ast_test_validate(test, NULL != test_message1);
test_message2 = stasis_message_create(test_message_type2, test_data);
ast_test_validate(test, NULL != test_message2);
test_message3 = stasis_message_create(test_message_type3, test_data);
ast_test_validate(test, NULL != test_message3);
stasis_publish(topic, test_message1);
stasis_publish(topic, test_message2);
stasis_publish(topic, test_message3);
actual_len = consumer_wait_for(consumer1, 1);
ast_test_validate(test, 1 == actual_len);
actual_len = consumer_wait_for(consumer2, 1);
ast_test_validate(test, 1 == actual_len);
actual_len = consumer_wait_for(consumer3, 1);
ast_test_validate(test, 1 == actual_len);
actual = consumer1->messages_rxed[0];
ast_test_validate(test, test_message1 == actual);
actual = consumer2->messages_rxed[0];
ast_test_validate(test, test_message2 == actual);
actual = consumer3->messages_rxed[0];
ast_test_validate(test, test_message3 == actual);
/* consumer1 and consumer2 do not get the final message. */
ao2_cleanup(consumer1);
ao2_cleanup(consumer2);
return AST_TEST_PASS;
}
AST_TEST_DEFINE(router_pool)
{
RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
RAII_VAR(struct stasis_message_router *, uut, NULL, stasis_message_router_unsubscribe_and_join);
RAII_VAR(char *, test_data, NULL, ao2_cleanup);
RAII_VAR(struct stasis_message_type *, test_message_type1, NULL, ao2_cleanup);
RAII_VAR(struct stasis_message_type *, test_message_type2, NULL, ao2_cleanup);
RAII_VAR(struct stasis_message_type *, test_message_type3, NULL, ao2_cleanup);
RAII_VAR(struct consumer *, consumer1, NULL, ao2_cleanup);
RAII_VAR(struct consumer *, consumer2, NULL, ao2_cleanup);
RAII_VAR(struct consumer *, consumer3, NULL, ao2_cleanup);
RAII_VAR(struct stasis_message *, test_message1, NULL, ao2_cleanup);
RAII_VAR(struct stasis_message *, test_message2, NULL, ao2_cleanup);
RAII_VAR(struct stasis_message *, test_message3, NULL, ao2_cleanup);
int actual_len, ret;
struct stasis_message *actual;
switch (cmd) {
case TEST_INIT:
info->name = __func__;
info->category = test_category;
info->summary = "Test message routing via threadpool";
info->description = "Test simple message routing when\n"
"the subscriptions dictate usage of the Stasis\n"
"threadpool.";
return AST_TEST_NOT_RUN;
case TEST_EXECUTE:
break;
}
topic = stasis_topic_create("TestTopic");
ast_test_validate(test, NULL != topic);
consumer1 = consumer_create(1);
ast_test_validate(test, NULL != consumer1);
consumer2 = consumer_create(1);
ast_test_validate(test, NULL != consumer2);
consumer3 = consumer_create(1);
ast_test_validate(test, NULL != consumer3);
ast_test_validate(test, stasis_message_type_create("TestMessage1", NULL, &test_message_type1) == STASIS_MESSAGE_TYPE_SUCCESS);
ast_test_validate(test, NULL != test_message_type1);
ast_test_validate(test, stasis_message_type_create("TestMessage2", NULL, &test_message_type2) == STASIS_MESSAGE_TYPE_SUCCESS);
ast_test_validate(test, NULL != test_message_type2);
ast_test_validate(test, stasis_message_type_create("TestMessage3", NULL, &test_message_type3) == STASIS_MESSAGE_TYPE_SUCCESS);
ast_test_validate(test, NULL != test_message_type3);
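/* Same routing as the 'router' test, but the router is created with a threadpool-backed subscription. */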
uut = stasis_message_router_create_pool(topic);
ast_test_validate(test, NULL != uut);
ret = stasis_message_router_add(
uut, test_message_type1, consumer_exec, consumer1);
ast_test_validate(test, 0 == ret);
ao2_ref(consumer1, +1);
ret = stasis_message_router_add(
uut, test_message_type2, consumer_exec, consumer2);
ast_test_validate(test, 0 == ret);
ao2_ref(consumer2, +1);
ret = stasis_message_router_set_default(uut, consumer_exec, consumer3);
ast_test_validate(test, 0 == ret);
ao2_ref(consumer3, +1);
test_data = ao2_alloc(1, NULL);
ast_test_validate(test, NULL != test_data);
test_message1 = stasis_message_create(test_message_type1, test_data);
ast_test_validate(test, NULL != test_message1);
test_message2 = stasis_message_create(test_message_type2, test_data);
ast_test_validate(test, NULL != test_message2);
test_message3 = stasis_message_create(test_message_type3, test_data);
ast_test_validate(test, NULL != test_message3);
stasis_publish(topic, test_message1);
stasis_publish(topic, test_message2);
stasis_publish(topic, test_message3);
actual_len = consumer_wait_for(consumer1, 1);
ast_test_validate(test, 1 == actual_len);
actual_len = consumer_wait_for(consumer2, 1);
ast_test_validate(test, 1 == actual_len);
actual_len = consumer_wait_for(consumer3, 1);
ast_test_validate(test, 1 == actual_len);
actual = consumer1->messages_rxed[0];
ast_test_validate(test, test_message1 == actual);
actual = consumer2->messages_rxed[0];
ast_test_validate(test, test_message2 == actual);
actual = consumer3->messages_rxed[0];
ast_test_validate(test, test_message3 == actual);
/* consumer1 and consumer2 do not get the final message. */
ao2_cleanup(consumer1);
ao2_cleanup(consumer2);
return AST_TEST_PASS;
}
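/*! \brief Cache ID function that caches any message whose type name begins with "Cache" under the single key "cached". */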
static const char *cache_simple(struct stasis_message *message)
{
const char *type_name =
stasis_message_type_name(stasis_message_type(message));
if (!ast_begins_with(type_name, "Cache")) {
return NULL;
}
return "cached";
}
AST_TEST_DEFINE(router_cache_updates)
{
RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
RAII_VAR(struct stasis_cache *, cache, NULL, ao2_cleanup);
RAII_VAR(struct stasis_caching_topic *, caching_topic, NULL, stasis_caching_unsubscribe_and_join);
RAII_VAR(struct stasis_message_type *, test_message_type1, NULL, ao2_cleanup);
RAII_VAR(struct stasis_message_type *, test_message_type2, NULL, ao2_cleanup);
RAII_VAR(struct stasis_message_type *, test_message_type3, NULL, ao2_cleanup);
RAII_VAR(struct stasis_message_router *, uut, NULL, stasis_message_router_unsubscribe_and_join);
RAII_VAR(char *, test_data, NULL, ao2_cleanup);
RAII_VAR(struct stasis_message *, test_message1, NULL, ao2_cleanup);
RAII_VAR(struct stasis_message *, test_message2, NULL, ao2_cleanup);
RAII_VAR(struct stasis_message *, test_message3, NULL, ao2_cleanup);
RAII_VAR(struct consumer *, consumer1, NULL, ao2_cleanup);
RAII_VAR(struct consumer *, consumer2, NULL, ao2_cleanup);
RAII_VAR(struct consumer *, consumer3, NULL, ao2_cleanup);
RAII_VAR(struct stasis_message *, message1, NULL, ao2_cleanup);
RAII_VAR(struct stasis_message *, message2, NULL, ao2_cleanup);
struct stasis_cache_update *update;
int actual_len, ret;
struct stasis_message *actual;
switch (cmd) {
case TEST_INIT:
info->name = __func__;
info->category = test_category;
info->summary = "Test special handling cache_update messages";
info->description = "Test special handling cache_update messages";
return AST_TEST_NOT_RUN;
case TEST_EXECUTE:
break;
}
topic = stasis_topic_create("TestTopic");
ast_test_validate(test, NULL != topic);
cache = stasis_cache_create(cache_simple);
ast_test_validate(test, NULL != cache);
caching_topic = stasis_caching_topic_create(topic, cache);
ast_test_validate(test, NULL != caching_topic);
consumer1 = consumer_create(1);
ast_test_validate(test, NULL != consumer1);
consumer2 = consumer_create(1);
ast_test_validate(test, NULL != consumer2);
consumer3 = consumer_create(1);
ast_test_validate(test, NULL != consumer3);
ast_test_validate(test, stasis_message_type_create("Cache1", NULL, &test_message_type1) == STASIS_MESSAGE_TYPE_SUCCESS);
ast_test_validate(test, NULL != test_message_type1);
ast_test_validate(test, stasis_message_type_create("Cache2", NULL, &test_message_type2) == STASIS_MESSAGE_TYPE_SUCCESS);
ast_test_validate(test, NULL != test_message_type2);
ast_test_validate(test, stasis_message_type_create("NonCache", NULL, &test_message_type3) == STASIS_MESSAGE_TYPE_SUCCESS);
ast_test_validate(test, NULL != test_message_type3);
uut = stasis_message_router_create(
stasis_caching_get_topic(caching_topic));
ast_test_validate(test, NULL != uut);
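/* Cache updates for type1 go to consumer1, all other cache updates go to consumer2, and any remaining messages go to the default route (consumer3). */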
ret = stasis_message_router_add_cache_update(
uut, test_message_type1, consumer_exec, consumer1);
ast_test_validate(test, 0 == ret);
ao2_ref(consumer1, +1);
ret = stasis_message_router_add(
uut, stasis_cache_update_type(), consumer_exec, consumer2);
ast_test_validate(test, 0 == ret);
ao2_ref(consumer2, +1);
ret = stasis_message_router_set_default(uut, consumer_exec, consumer3);
ast_test_validate(test, 0 == ret);
ao2_ref(consumer3, +1);
test_data = ao2_alloc(1, NULL);
ast_test_validate(test, NULL != test_data);
test_message1 = stasis_message_create(test_message_type1, test_data);
ast_test_validate(test, NULL != test_message1);
test_message2 = stasis_message_create(test_message_type2, test_data);
ast_test_validate(test, NULL != test_message2);
test_message3 = stasis_message_create(test_message_type3, test_data);
ast_test_validate(test, NULL != test_message3);
stasis_publish(topic, test_message1);
stasis_publish(topic, test_message2);
stasis_publish(topic, test_message3);
actual_len = consumer_wait_for(consumer1, 1);
ast_test_validate(test, 1 == actual_len);
actual_len = consumer_wait_for(consumer2, 1);
ast_test_validate(test, 1 == actual_len);
/* Uncacheable message should not be passed through */
actual_len = consumer_should_stay(consumer3, 0);
ast_test_validate(test, 0 == actual_len);
actual = consumer1->messages_rxed[0];
ast_test_validate(test, stasis_cache_update_type() == stasis_message_type(actual));
update = stasis_message_data(actual);
ast_test_validate(test, test_message_type1 == update->type);
ast_test_validate(test, test_message1 == update->new_snapshot);
actual = consumer2->messages_rxed[0];
ast_test_validate(test, stasis_cache_update_type() == stasis_message_type(actual));
update = stasis_message_data(actual);
ast_test_validate(test, test_message_type2 == update->type);
ast_test_validate(test, test_message2 == update->new_snapshot);
/* consumer1 and consumer2 do not get the final message. */
ao2_cleanup(consumer1);
ao2_cleanup(consumer2);
return AST_TEST_PASS;
}
AST_TEST_DEFINE(no_to_json)
{
RAII_VAR(struct stasis_message_type *, type, NULL, ao2_cleanup);
RAII_VAR(struct stasis_message *, uut, NULL, ao2_cleanup);
RAII_VAR(char *, data, NULL, ao2_cleanup);
RAII_VAR(struct ast_json *, actual, NULL, ast_json_unref);
char *expected = "SomeData";
switch (cmd) {
case TEST_INIT:
info->name = __func__;
info->category = test_category;
info->summary = "Test message to_json function";
info->description = "Test message to_json function";
return AST_TEST_NOT_RUN;
case TEST_EXECUTE:
break;
}
/* Test NULL */
actual = stasis_message_to_json(NULL, NULL);
ast_test_validate(test, NULL == actual);
/* Test message with NULL to_json function */
ast_test_validate(test, stasis_message_type_create("SomeMessage", NULL, &type) == STASIS_MESSAGE_TYPE_SUCCESS);
data = ao2_alloc(strlen(expected) + 1, NULL);
strcpy(data, expected);
uut = stasis_message_create(type, data);
ast_test_validate(test, NULL != uut);
actual = stasis_message_to_json(uut, NULL);
ast_test_validate(test, NULL == actual);
return AST_TEST_PASS;
}
AST_TEST_DEFINE(to_json)
{
RAII_VAR(struct stasis_message_type *, type, NULL, ao2_cleanup);
RAII_VAR(struct stasis_message *, uut, NULL, ao2_cleanup);
RAII_VAR(char *, data, NULL, ao2_cleanup);
RAII_VAR(struct ast_json *, actual, NULL, ast_json_unref);
const char *expected_text = "SomeData";
RAII_VAR(struct ast_json *, expected, NULL, ast_json_unref);
switch (cmd) {
case TEST_INIT:
info->name = __func__;
info->category = test_category;
info->summary = "Test message to_json function when NULL";
info->description = "Test message to_json function when NULL";
return AST_TEST_NOT_RUN;
case TEST_EXECUTE:
break;
}
ast_test_validate(test, stasis_message_type_create("SomeMessage", &fake_vtable, &type) == STASIS_MESSAGE_TYPE_SUCCESS);
data = ao2_alloc(strlen(expected_text) + 1, NULL);
strcpy(data, expected_text);
uut = stasis_message_create(type, data);
ast_test_validate(test, NULL != uut);
expected = ast_json_string_create(expected_text);
actual = stasis_message_to_json(uut, NULL);
ast_test_validate(test, ast_json_equal(expected, actual));
return AST_TEST_PASS;
}
AST_TEST_DEFINE(no_to_ami)
{
RAII_VAR(struct stasis_message_type *, type, NULL, ao2_cleanup);
RAII_VAR(struct stasis_message *, uut, NULL, ao2_cleanup);
RAII_VAR(char *, data, NULL, ao2_cleanup);
RAII_VAR(struct ast_manager_event_blob *, actual, NULL, ao2_cleanup);
char *expected = "SomeData";
switch (cmd) {
case TEST_INIT:
info->name = __func__;
info->category = test_category;
info->summary = "Test message to_ami function when NULL";
info->description = "Test message to_ami function when NULL";
return AST_TEST_NOT_RUN;
case TEST_EXECUTE:
break;
}
/* Test NULL */
actual = stasis_message_to_ami(NULL);
ast_test_validate(test, NULL == actual);
/* Test message with NULL to_ami function */
ast_test_validate(test, stasis_message_type_create("SomeMessage", NULL, &type) == STASIS_MESSAGE_TYPE_SUCCESS);
data = ao2_alloc(strlen(expected) + 1, NULL);
strcpy(data, expected);
uut = stasis_message_create(type, data);
ast_test_validate(test, NULL != uut);
actual = stasis_message_to_ami(uut);
ast_test_validate(test, NULL == actual);
return AST_TEST_PASS;
}
AST_TEST_DEFINE(to_ami)
{
RAII_VAR(struct stasis_message_type *, type, NULL, ao2_cleanup);
RAII_VAR(struct stasis_message *, uut, NULL, ao2_cleanup);
RAII_VAR(char *, data, NULL, ao2_cleanup);
RAII_VAR(struct ast_manager_event_blob *, actual, NULL, ao2_cleanup);
const char *expected_text = "SomeData";
const char *expected = "Message: SomeData\r\n";
switch (cmd) {
case TEST_INIT:
info->name = __func__;
info->category = test_category;
info->summary = "Test message to_ami function";
info->description = "Test message to_ami function";
return AST_TEST_NOT_RUN;
case TEST_EXECUTE:
break;
}
ast_test_validate(test, stasis_message_type_create("SomeMessage", &fake_vtable, &type) == STASIS_MESSAGE_TYPE_SUCCESS);
data = ao2_alloc(strlen(expected_text) + 1, NULL);
strcpy(data, expected_text);
uut = stasis_message_create(type, data);
ast_test_validate(test, NULL != uut);
actual = stasis_message_to_ami(uut);
ast_test_validate(test, strcmp(expected, actual->extra_fields) == 0);
return AST_TEST_PASS;
}
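/*! \brief Subscription callback that discards all messages. */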
static void noop(void *data, struct stasis_subscription *sub,
struct stasis_message *message)
{
/* no-op */
}
AST_TEST_DEFINE(dtor_order)
{
RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
RAII_VAR(struct stasis_subscription *, sub, NULL, stasis_unsubscribe);
switch (cmd) {
case TEST_INIT:
info->name = __func__;
info->category = test_category;
info->summary = "Test that destruction order doesn't bomb stuff";
info->description = "Test that destruction order doesn't bomb stuff";
return AST_TEST_NOT_RUN;
case TEST_EXECUTE:
break;
}
topic = stasis_topic_create("test-topic");
ast_test_validate(test, NULL != topic);
sub = stasis_subscribe(topic, noop, NULL);
ast_test_validate(test, NULL != sub);
/* With any luck, this won't completely blow everything up */
ao2_cleanup(topic);
stasis_unsubscribe(sub);
/* These refs were cleaned up manually */
topic = NULL;
sub = NULL;
return AST_TEST_PASS;
}
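/*! \brief Cache ID callback that always returns NULL, so no message is ever cached. */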
static const char *noop_get_id(struct stasis_message *message)
{
return NULL;
}
AST_TEST_DEFINE(caching_dtor_order)
{
RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
RAII_VAR(struct stasis_cache *, cache, NULL, ao2_cleanup);
RAII_VAR(struct stasis_caching_topic *, caching_topic, NULL,
stasis_caching_unsubscribe);
RAII_VAR(struct stasis_subscription *, sub, NULL, stasis_unsubscribe);
switch (cmd) {
case TEST_INIT:
info->name = __func__;
info->category = test_category;
info->summary = "Test that destruction order doesn't bomb stuff";
info->description = "Test that destruction order doesn't bomb stuff";
return AST_TEST_NOT_RUN;
case TEST_EXECUTE:
break;
}
cache = stasis_cache_create(noop_get_id);
ast_test_validate(test, NULL != cache);
topic = stasis_topic_create("test-topic");
ast_test_validate(test, NULL != topic);
caching_topic = stasis_caching_topic_create(topic, cache);
ast_test_validate(test, NULL != caching_topic);
sub = stasis_subscribe(stasis_caching_get_topic(caching_topic), noop,
NULL);
ast_test_validate(test, NULL != sub);
/* With any luck, this won't completely blow everything up */
ao2_cleanup(cache);
ao2_cleanup(topic);
stasis_caching_unsubscribe(caching_topic);
stasis_unsubscribe(sub);
/* These refs were cleaned up manually */
cache = NULL;
topic = NULL;
caching_topic = NULL;
sub = NULL;
return AST_TEST_PASS;
}
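/*! \brief Message types used by the filtering tests, covering different formatter combinations along with the stock subscription change type. */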
struct test_message_types {
struct stasis_message_type *none;
struct stasis_message_type *ami;
struct stasis_message_type *json;
struct stasis_message_type *event;
struct stasis_message_type *amievent;
struct stasis_message_type *type1;
struct stasis_message_type *type2;
struct stasis_message_type *type3;
struct stasis_message_type *change;
};
static void destroy_message_types(void *obj)
{
struct test_message_types *types = obj;
ao2_cleanup(types->none);
ao2_cleanup(types->ami);
ao2_cleanup(types->json);
ao2_cleanup(types->event);
ao2_cleanup(types->amievent);
ao2_cleanup(types->type1);
ao2_cleanup(types->type2);
ao2_cleanup(types->type3);
/* N.B. Don't cleanup types->change! */
}
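/*! \brief Allocate the test message types, registering each with a different mix of to_ami, to_json, and to_event formatters. */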
static struct test_message_types *create_message_types(struct ast_test *test)
{
struct stasis_message_vtable vtable = { 0 };
struct test_message_types *types;
enum ast_test_result_state __attribute__ ((unused)) rc;
types = ao2_alloc(sizeof(*types), destroy_message_types);
if (!types) {
return NULL;
}
ast_test_validate_cleanup(test,
stasis_message_type_create("TestMessageNONE", &vtable, &types->none) == STASIS_MESSAGE_TYPE_SUCCESS,
rc, cleanup);
vtable.to_ami = fake_ami;
ast_test_validate_cleanup(test,
stasis_message_type_create("TestMessageAMI", &vtable, &types->ami) == STASIS_MESSAGE_TYPE_SUCCESS,
rc, cleanup);
vtable.to_ami = NULL;
vtable.to_json = fake_json;
ast_test_validate_cleanup(test,
stasis_message_type_create("TestMessageJSON", &vtable, &types->json) == STASIS_MESSAGE_TYPE_SUCCESS,
rc, cleanup);
vtable.to_ami = NULL;
vtable.to_json = NULL;
vtable.to_event = fake_event;
ast_test_validate_cleanup(test,
stasis_message_type_create("TestMessageEVENT", &vtable, &types->event) == STASIS_MESSAGE_TYPE_SUCCESS,
rc, cleanup);
vtable.to_ami = fake_ami;
ast_test_validate_cleanup(test,
stasis_message_type_create("TestMessageAMIEVENT", &vtable, &types->amievent) == STASIS_MESSAGE_TYPE_SUCCESS,
rc, cleanup);
ast_test_validate_cleanup(test,
stasis_message_type_create("TestMessageType1", NULL, &types->type1) == STASIS_MESSAGE_TYPE_SUCCESS,
rc, cleanup);
ast_test_validate_cleanup(test,
stasis_message_type_create("TestMessageType2", NULL, &types->type2) == STASIS_MESSAGE_TYPE_SUCCESS,
rc, cleanup);
ast_test_validate_cleanup(test,
stasis_message_type_create("TestMessageType3", NULL, &types->type3) == STASIS_MESSAGE_TYPE_SUCCESS,
rc, cleanup);
types->change = stasis_subscription_change_type();
return types;
cleanup:
ao2_cleanup(types);
return NULL;
}
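/*! \brief Bundle of a test consumer, the topic it listens on, and its subscription. */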
struct cts {
struct consumer *consumer;
struct stasis_topic *topic;
struct stasis_subscription *sub;
};
static void destroy_cts(void *obj)
{
struct cts *c = obj;
stasis_unsubscribe(c->sub);
ao2_cleanup(c->topic);
ao2_cleanup(c->consumer);
}
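/*! \brief Create a topic and a consumer, and subscribe the consumer to the topic. */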
static struct cts *create_cts(struct ast_test *test)
{
struct cts *cts = ao2_alloc(sizeof(*cts), destroy_cts);
enum ast_test_result_state __attribute__ ((unused)) rc;
ast_test_validate_cleanup(test, cts, rc, cleanup);
cts->topic = stasis_topic_create("TestTopic");
ast_test_validate_cleanup(test, NULL != cts->topic, rc, cleanup);
cts->consumer = consumer_create(0);
ast_test_validate_cleanup(test, NULL != cts->consumer, rc, cleanup);
ao2_ref(cts->consumer, +1);
cts->sub = stasis_subscribe(cts->topic, consumer_exec, cts->consumer);
ast_test_validate_cleanup(test, NULL != cts->sub, rc, cleanup);
return cts;
cleanup:
ao2_cleanup(cts);
return NULL;
}
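/*! \brief Check that a received message has the expected type and, when data is given, the expected description text. */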
static int is_msg(struct stasis_message *msg, struct stasis_message_type *mtype, const char *data)
{
struct stasis_subscription_change *msg_data = stasis_message_data(msg);
if (stasis_message_type(msg) != mtype) {
return 0;
}
if (data) {
return (strcmp(data, msg_data->description) == 0);
}
return 1;
}
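/*! \brief Log each message the consumer received, to help diagnose test failures. */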
static void dump_consumer(struct ast_test *test, struct cts *cts)
{
size_t i;
struct stasis_subscription_change *data;
ast_test_status_update(test, "Messages received: %zu Final? %s\n", cts->consumer->messages_rxed_len,
cts->consumer->complete ? "yes" : "no");
for (i = 0; i < cts->consumer->messages_rxed_len; i++) {
data = stasis_message_data(cts->consumer->messages_rxed[i]);
ast_test_status_update(test, "Message type received: %s %s\n",
stasis_message_type_name(stasis_message_type(cts->consumer->messages_rxed[i])),
data && !ast_strlen_zero(data->description) ? data->description : "no data");
}
}
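/*! \brief Publish a message of the given type to the test topic, carrying the supplied description text. */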
static int send_msg(struct ast_test *test, struct cts *cts, struct stasis_message_type *msg_type,
const char *data)
{
struct stasis_message *msg;
struct stasis_subscription_change *test_data =
ao2_alloc(sizeof(*test_data) + (data ? strlen(data) : strlen("no data")) + 1, NULL);
if (!test_data) {
return 0;
}
strcpy(test_data->description, S_OR(data, "no data")); /* Safe */
msg = stasis_message_create(msg_type, test_data);
ao2_ref(test_data, -1);
if (!msg) {
ast_test_status_update(test, "Unable to create %s message\n",
stasis_message_type_name(msg_type));
return 0;
}
stasis_publish(cts->topic, msg);
ao2_ref(msg, -1);
return 1;
}
AST_TEST_DEFINE(type_filters)
{
RAII_VAR(struct cts *, cts, NULL, ao2_cleanup);
RAII_VAR(struct test_message_types *, types, NULL, ao2_cleanup);
int ix = 0;
switch (cmd) {
case TEST_INIT:
info->name = __func__;
info->category = test_category "filtering/";
info->summary = "Test message filtering by type";
info->description = "Test message filtering by type";
return AST_TEST_NOT_RUN;
case TEST_EXECUTE:
break;
}
types = create_message_types(test);
ast_test_validate(test, NULL != types);
cts = create_cts(test);
ast_test_validate(test, NULL != cts);
ast_test_validate(test, stasis_subscription_accept_message_type(cts->sub, types->type1) == 0);
ast_test_validate(test, stasis_subscription_accept_message_type(cts->sub, types->type2) == 0);
ast_test_validate(test, stasis_subscription_accept_message_type(cts->sub, types->change) == 0);
ast_test_validate(test, stasis_subscription_set_filter(cts->sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE) == 0);
/* We should get these */
ast_test_validate(test, send_msg(test, cts, types->type1, "Pass"));
ast_test_validate(test, send_msg(test, cts, types->type2, "Pass"));
/* ... but not this one */
ast_test_validate(test, send_msg(test, cts, types->type3, "FAIL"));
/* Wait for change(subscribe) and "Pass" messages */
consumer_wait_for(cts->consumer, 3);
/* Remove type 1 */
ast_test_validate(test, stasis_subscription_decline_message_type(cts->sub, types->type1) == 0);
/* We should now NOT get this one */
ast_test_validate(test, send_msg(test, cts, types->type1, "FAIL"));
/* We should get this one (again) */
ast_test_validate(test, send_msg(test, cts, types->type2, "Pass2"));
/* We still should NOT get this one */
ast_test_validate(test, send_msg(test, cts, types->type3, "FAIL"));
/* We should now have a second type2 */
consumer_wait_for(cts->consumer, 4);
stasis_unsubscribe(cts->sub);
cts->sub = NULL;
consumer_wait_for_completion(cts->consumer);
dump_consumer(test, cts);
ast_test_validate(test, 1 == cts->consumer->complete);
ast_test_validate(test, 5 == cts->consumer->messages_rxed_len);
ast_test_validate(test, is_msg(cts->consumer->messages_rxed[ix++], types->change, "Subscribe"));
ast_test_validate(test, is_msg(cts->consumer->messages_rxed[ix++], types->type1, "Pass"));
ast_test_validate(test, is_msg(cts->consumer->messages_rxed[ix++], types->type2, "Pass"));
ast_test_validate(test, is_msg(cts->consumer->messages_rxed[ix++], types->type2, "Pass2"));
ast_test_validate(test, is_msg(cts->consumer->messages_rxed[ix++], types->change, "Unsubscribe"));
return AST_TEST_PASS;
}
AST_TEST_DEFINE(formatter_filters)
{
RAII_VAR(struct cts *, cts, NULL, ao2_cleanup);
RAII_VAR(struct test_message_types *, types, NULL, ao2_cleanup);
int ix = 0;
switch (cmd) {
case TEST_INIT:
info->name = __func__;
info->category = test_category "filtering/";
info->summary = "Test message filtering by formatter";
info->description = "Test message filtering by formatter";
return AST_TEST_NOT_RUN;
case TEST_EXECUTE:
break;
}
types = create_message_types(test);
ast_test_validate(test, NULL != types);
cts = create_cts(test);
ast_test_validate(test, NULL != cts);
stasis_subscription_accept_formatters(cts->sub,
STASIS_SUBSCRIPTION_FORMATTER_AMI | STASIS_SUBSCRIPTION_FORMATTER_JSON);
/* We should get these */
ast_test_validate(test, send_msg(test, cts, types->ami, "Pass"));
ast_test_validate(test, send_msg(test, cts, types->json, "Pass"));
ast_test_validate(test, send_msg(test, cts, types->amievent, "Pass"));
/* ... but not these */
ast_test_validate(test, send_msg(test, cts, types->none, "FAIL"));
ast_test_validate(test, send_msg(test, cts, types->event, "FAIL"));
ast_test_validate(test, send_msg(test, cts, types->type1, "FAIL"));
/* Wait for change(subscribe) and the "Pass" messages */
consumer_wait_for(cts->consumer, 4);
/* Change the subscription to accept only event formatters */
stasis_subscription_accept_formatters(cts->sub, STASIS_SUBSCRIPTION_FORMATTER_EVENT);
/* We should NOT get these now */
ast_test_validate(test, send_msg(test, cts, types->ami, "FAIL"));
ast_test_validate(test, send_msg(test, cts, types->json, "FAIL"));
/* ... but we should still get this one */
ast_test_validate(test, send_msg(test, cts, types->amievent, "Pass2"));
/* ... and this one should be new */
ast_test_validate(test, send_msg(test, cts, types->event, "Pass"));
/* We should now have a second amievent */
consumer_wait_for(cts->consumer, 6);
stasis_unsubscribe(cts->sub);
cts->sub = NULL;
consumer_wait_for_completion(cts->consumer);
dump_consumer(test, cts);
ast_test_validate(test, 1 == cts->consumer->complete);
ast_test_validate(test, 7 == cts->consumer->messages_rxed_len);
ast_test_validate(test, is_msg(cts->consumer->messages_rxed[ix++], types->change, "Subscribe"));
ast_test_validate(test, is_msg(cts->consumer->messages_rxed[ix++], types->ami, "Pass"));
ast_test_validate(test, is_msg(cts->consumer->messages_rxed[ix++], types->json, "Pass"));
ast_test_validate(test, is_msg(cts->consumer->messages_rxed[ix++], types->amievent, "Pass"));
ast_test_validate(test, is_msg(cts->consumer->messages_rxed[ix++], types->amievent, "Pass2"));
ast_test_validate(test, is_msg(cts->consumer->messages_rxed[ix++], types->event, "Pass"));
ast_test_validate(test, is_msg(cts->consumer->messages_rxed[ix++], types->change, "Unsubscribe"));
return AST_TEST_PASS;
}
AST_TEST_DEFINE(combo_filters)
{
RAII_VAR(struct cts *, cts, NULL, ao2_cleanup);
RAII_VAR(struct test_message_types *, types, NULL, ao2_cleanup);
int ix = 0;
switch (cmd) {
case TEST_INIT:
info->name = __func__;
info->category = test_category "filtering/";
info->summary = "Test message filtering by type and formatter";
info->description = "Test message filtering by type and formatter";
return AST_TEST_NOT_RUN;
case TEST_EXECUTE:
break;
}
types = create_message_types(test);
ast_test_validate(test, NULL != types);
cts = create_cts(test);
ast_test_validate(test, NULL != cts);
ast_test_validate(test, stasis_subscription_accept_message_type(cts->sub, types->type1) == 0);
ast_test_validate(test, stasis_subscription_accept_message_type(cts->sub, types->type2) == 0);
ast_test_validate(test, stasis_subscription_accept_message_type(cts->sub, types->change) == 0);
ast_test_validate(test, stasis_subscription_set_filter(cts->sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE) == 0);
stasis_subscription_accept_formatters(cts->sub,
STASIS_SUBSCRIPTION_FORMATTER_AMI | STASIS_SUBSCRIPTION_FORMATTER_JSON);
/* We should get these */
ast_test_validate(test, send_msg(test, cts, types->type1, "Pass"));
ast_test_validate(test, send_msg(test, cts, types->type2, "Pass"));
ast_test_validate(test, send_msg(test, cts, types->ami, "Pass"));
ast_test_validate(test, send_msg(test, cts, types->amievent, "Pass"));
ast_test_validate(test, send_msg(test, cts, types->json, "Pass"));
/* ... but not these */
ast_test_validate(test, send_msg(test, cts, types->type3, "FAIL"));
ast_test_validate(test, send_msg(test, cts, types->event, "FAIL"));
/* Wait for change(subscribe) and the "Pass" messages */
consumer_wait_for(cts->consumer, 6);
stasis_unsubscribe(cts->sub);
cts->sub = NULL;
consumer_wait_for_completion(cts->consumer);
dump_consumer(test, cts);
ast_test_validate(test, 1 == cts->consumer->complete);
ast_test_validate(test, 7 == cts->consumer->messages_rxed_len);
ast_test_validate(test, is_msg(cts->consumer->messages_rxed[ix++], types->change, "Subscribe"));
ast_test_validate(test, is_msg(cts->consumer->messages_rxed[ix++], types->type1, "Pass"));
ast_test_validate(test, is_msg(cts->consumer->messages_rxed[ix++], types->type2, "Pass"));
ast_test_validate(test, is_msg(cts->consumer->messages_rxed[ix++], types->ami, "Pass"));
ast_test_validate(test, is_msg(cts->consumer->messages_rxed[ix++], types->amievent, "Pass"));
ast_test_validate(test, is_msg(cts->consumer->messages_rxed[ix++], types->json, "Pass"));
ast_test_validate(test, is_msg(cts->consumer->messages_rxed[ix++], types->change, "Unsubscribe"));
return AST_TEST_PASS;
}
static int unload_module(void)
{
AST_TEST_UNREGISTER(message_type);
AST_TEST_UNREGISTER(message);
AST_TEST_UNREGISTER(subscription_messages);
AST_TEST_UNREGISTER(subscription_pool_messages);
AST_TEST_UNREGISTER(publish);
AST_TEST_UNREGISTER(publish_sync);
AST_TEST_UNREGISTER(publish_pool);
AST_TEST_UNREGISTER(unsubscribe_stops_messages);
AST_TEST_UNREGISTER(forward);
AST_TEST_UNREGISTER(cache_filter);
AST_TEST_UNREGISTER(cache);
AST_TEST_UNREGISTER(cache_dump);
AST_TEST_UNREGISTER(cache_eid_aggregate);
AST_TEST_UNREGISTER(router);
AST_TEST_UNREGISTER(router_pool);
AST_TEST_UNREGISTER(router_cache_updates);
AST_TEST_UNREGISTER(interleaving);
AST_TEST_UNREGISTER(subscription_interleaving);
AST_TEST_UNREGISTER(no_to_json);
AST_TEST_UNREGISTER(to_json);
AST_TEST_UNREGISTER(no_to_ami);
AST_TEST_UNREGISTER(to_ami);
AST_TEST_UNREGISTER(dtor_order);
AST_TEST_UNREGISTER(caching_dtor_order);
AST_TEST_UNREGISTER(type_filters);
AST_TEST_UNREGISTER(formatter_filters);
AST_TEST_UNREGISTER(combo_filters);
return 0;
}
static int load_module(void)
{
AST_TEST_REGISTER(message_type);
AST_TEST_REGISTER(message);
AST_TEST_REGISTER(subscription_messages);
AST_TEST_REGISTER(subscription_pool_messages);
AST_TEST_REGISTER(publish);
AST_TEST_REGISTER(publish_sync);
AST_TEST_REGISTER(publish_pool);
AST_TEST_REGISTER(unsubscribe_stops_messages);
AST_TEST_REGISTER(forward);
AST_TEST_REGISTER(cache_filter);
AST_TEST_REGISTER(cache);
AST_TEST_REGISTER(cache_dump);
AST_TEST_REGISTER(cache_eid_aggregate);
AST_TEST_REGISTER(router);
AST_TEST_REGISTER(router_pool);
AST_TEST_REGISTER(router_cache_updates);
AST_TEST_REGISTER(interleaving);
AST_TEST_REGISTER(subscription_interleaving);
AST_TEST_REGISTER(no_to_json);
AST_TEST_REGISTER(to_json);
AST_TEST_REGISTER(no_to_ami);
AST_TEST_REGISTER(to_ami);
AST_TEST_REGISTER(dtor_order);
AST_TEST_REGISTER(caching_dtor_order);
AST_TEST_REGISTER(type_filters);
AST_TEST_REGISTER(formatter_filters);
AST_TEST_REGISTER(combo_filters);
return AST_MODULE_LOAD_SUCCESS;
}
AST_MODULE_INFO(ASTERISK_GPL_KEY, 0, "Stasis testing",
.support_level = AST_MODULE_SUPPORT_CORE,
.load = load_module,
.unload = unload_module
);