diff --git a/include/asterisk/taskprocessor.h b/include/asterisk/taskprocessor.h index 708b3d7503..2191663057 100644 --- a/include/asterisk/taskprocessor.h +++ b/include/asterisk/taskprocessor.h @@ -163,7 +163,9 @@ struct ast_taskprocessor *ast_taskprocessor_get(const char *name, enum ast_tps_o * * \since 12.0.0 * - * The listener's alloc() and start() callbacks will be called during this function. + * Note that when a taskprocessor is created in this way, it does not create + * any threads to execute the tasks. This job is left up to the listener. + * The listener's start() callback will be called during this function. * * \param name The name of the taskprocessor to create * \param listener The listener for operations on this taskprocessor diff --git a/include/asterisk/threadpool.h b/include/asterisk/threadpool.h index 04daa9faa5..e18ba6c696 100644 --- a/include/asterisk/threadpool.h +++ b/include/asterisk/threadpool.h @@ -1,7 +1,7 @@ /* * Asterisk -- An open source telephony toolkit. * - * Copyright (C) 2012, Digium, Inc. + * Copyright (C) 2012-2013, Digium, Inc. * * Mark Michelson * @@ -69,7 +69,7 @@ struct ast_threadpool_listener_callbacks { struct ast_threadpool_options { #define AST_THREADPOOL_OPTIONS_VERSION 1 - /*! Version of thradpool options in use */ + /*! Version of threadpool options in use */ int version; /*! * \brief Time limit in seconds for idle threads @@ -98,6 +98,13 @@ struct ast_threadpool_options { * without any threads allocated. */ int initial_size; + /*! + * \brief Maximum number of threads a pool may have + * + * When the threadpool's size increases, it can never increase + * beyond this number of threads. + */ + int max_size; }; /*! @@ -127,7 +134,10 @@ void *ast_threadpool_listener_get_user_data(const struct ast_threadpool_listener * This function creates a threadpool. Tasks may be pushed onto this thread pool * in and will be automatically acted upon by threads within the pool. * - * \param name The name for the threadpool + * Only a single threadpool with a given name may exist. This function will fail + * if a threadpool with the given name already exists. + * + * \param name The unique name for the threadpool * \param listener The listener the threadpool will notify of changes. Can be NULL. * \param options The behavioral options for this threadpool * \retval NULL Failed to create the threadpool diff --git a/main/threadpool.c b/main/threadpool.c index e27b05345b..adaf8a5543 100644 --- a/main/threadpool.c +++ b/main/threadpool.c @@ -1,7 +1,7 @@ /* * Asterisk -- An open source telephony toolkit. * - * Copyright (C) 2012, Digium, Inc. + * Copyright (C) 2012-2013, Digium, Inc. * * Mark Michelson * @@ -24,6 +24,7 @@ #include "asterisk/astobj2.h" #include "asterisk/utils.h" +/* Needs to stay prime if increased */ #define THREAD_BUCKETS 89 /*! @@ -494,6 +495,13 @@ static void grow(struct ast_threadpool *pool, int delta) { int i; + int current_size = ao2_container_count(pool->active_threads) + + ao2_container_count(pool->idle_threads); + + if (pool->options.max_size && current_size + delta > pool->options.max_size) { + delta = pool->options.max_size - current_size; + } + ast_debug(3, "Increasing threadpool %s's size by %d\n", ast_taskprocessor_name(pool->tps), delta); @@ -788,7 +796,7 @@ static int queued_set_size(void *data) struct ast_threadpool *pool = ssd->pool; unsigned int num_threads = ssd->size; - /* We don't count zombie threads as being "live when potentially resizing */ + /* We don't count zombie threads as being "live" when potentially resizing */ unsigned int current_size = ao2_container_count(pool->active_threads) + ao2_container_count(pool->idle_threads); @@ -895,6 +903,9 @@ int ast_threadpool_push(struct ast_threadpool *pool, int (*task)(void *data), vo void ast_threadpool_shutdown(struct ast_threadpool *pool) { + if (!pool) { + return; + } /* Shut down the taskprocessors and everything else just * takes care of itself via the taskprocessor callbacks */ diff --git a/tests/test_taskprocessor.c b/tests/test_taskprocessor.c index c717ad0092..e370dd78f3 100644 --- a/tests/test_taskprocessor.c +++ b/tests/test_taskprocessor.c @@ -1,7 +1,7 @@ /* * Asterisk -- An open source telephony toolkit. * - * Copyright (C) 2012, Digium, Inc. + * Copyright (C) 2012-2013, Digium, Inc. * * Mark Michelson * diff --git a/tests/test_threadpool.c b/tests/test_threadpool.c index 4e04411b81..712b8581b5 100644 --- a/tests/test_threadpool.c +++ b/tests/test_threadpool.c @@ -1,7 +1,7 @@ /* * Asterisk -- An open source telephony toolkit. * - * Copyright (C) 2012, Digium, Inc. + * Copyright (C) 2012-2013, Digium, Inc. * * Mark Michelson * @@ -284,6 +284,7 @@ AST_TEST_DEFINE(threadpool_push) .idle_timeout = 0, .auto_increment = 0, .initial_size = 0, + .max_size = 0, }; switch (cmd) { @@ -324,9 +325,7 @@ AST_TEST_DEFINE(threadpool_push) res = listener_check(test, listener, 1, 1, 1, 0, 0, 0); end: - if (pool) { - ast_threadpool_shutdown(pool); - } + ast_threadpool_shutdown(pool); ao2_cleanup(listener); ast_free(std); ast_free(tld); @@ -338,12 +337,13 @@ AST_TEST_DEFINE(threadpool_initial_threads) struct ast_threadpool *pool = NULL; struct ast_threadpool_listener *listener = NULL; enum ast_test_result_state res = AST_TEST_FAIL; - struct test_listener_data *tld; + struct test_listener_data *tld = NULL; struct ast_threadpool_options options = { .version = AST_THREADPOOL_OPTIONS_VERSION, .idle_timeout = 0, .auto_increment = 0, .initial_size = 3, + .max_size = 0, }; switch (cmd) { @@ -377,9 +377,7 @@ AST_TEST_DEFINE(threadpool_initial_threads) res = wait_until_thread_state(test, tld, 0, 3); end: - if (pool) { - ast_threadpool_shutdown(pool); - } + ast_threadpool_shutdown(pool); ao2_cleanup(listener); ast_free(tld); return res; @@ -391,12 +389,13 @@ AST_TEST_DEFINE(threadpool_thread_creation) struct ast_threadpool *pool = NULL; struct ast_threadpool_listener *listener = NULL; enum ast_test_result_state res = AST_TEST_FAIL; - struct test_listener_data *tld; + struct test_listener_data *tld = NULL; struct ast_threadpool_options options = { .version = AST_THREADPOOL_OPTIONS_VERSION, .idle_timeout = 0, .auto_increment = 0, .initial_size = 0, + .max_size = 0, }; switch (cmd) { @@ -434,9 +433,7 @@ AST_TEST_DEFINE(threadpool_thread_creation) res = wait_until_thread_state(test, tld, 0, 1); end: - if (pool) { - ast_threadpool_shutdown(pool); - } + ast_threadpool_shutdown(pool); ao2_cleanup(listener); ast_free(tld); return res; @@ -447,12 +444,13 @@ AST_TEST_DEFINE(threadpool_thread_destruction) struct ast_threadpool *pool = NULL; struct ast_threadpool_listener *listener = NULL; enum ast_test_result_state res = AST_TEST_FAIL; - struct test_listener_data *tld; + struct test_listener_data *tld = NULL; struct ast_threadpool_options options = { .version = AST_THREADPOOL_OPTIONS_VERSION, .idle_timeout = 0, .auto_increment = 0, .initial_size = 0, + .max_size = 0, }; switch (cmd) { @@ -499,9 +497,7 @@ AST_TEST_DEFINE(threadpool_thread_destruction) res = wait_until_thread_state(test, tld, 0, 2); end: - if (pool) { - ast_threadpool_shutdown(pool); - } + ast_threadpool_shutdown(pool); ao2_cleanup(listener); ast_free(tld); return res; @@ -512,12 +508,13 @@ AST_TEST_DEFINE(threadpool_thread_timeout) struct ast_threadpool *pool = NULL; struct ast_threadpool_listener *listener = NULL; enum ast_test_result_state res = AST_TEST_FAIL; - struct test_listener_data *tld; + struct test_listener_data *tld = NULL; struct ast_threadpool_options options = { .version = AST_THREADPOOL_OPTIONS_VERSION, .idle_timeout = 2, .auto_increment = 0, .initial_size = 0, + .max_size = 0, }; switch (cmd) { @@ -567,9 +564,7 @@ AST_TEST_DEFINE(threadpool_thread_timeout) res = listener_check(test, listener, 0, 0, 0, 0, 0, 0); end: - if (pool) { - ast_threadpool_shutdown(pool); - } + ast_threadpool_shutdown(pool); ao2_cleanup(listener); ast_free(tld); return res; @@ -581,12 +576,13 @@ AST_TEST_DEFINE(threadpool_one_task_one_thread) struct ast_threadpool_listener *listener = NULL; struct simple_task_data *std = NULL; enum ast_test_result_state res = AST_TEST_FAIL; - struct test_listener_data *tld; + struct test_listener_data *tld = NULL; struct ast_threadpool_options options = { .version = AST_THREADPOOL_OPTIONS_VERSION, .idle_timeout = 0, .auto_increment = 0, .initial_size = 0, + .max_size = 0, }; switch (cmd) { @@ -648,9 +644,7 @@ AST_TEST_DEFINE(threadpool_one_task_one_thread) res = listener_check(test, listener, 1, 1, 1, 0, 1, 1); end: - if (pool) { - ast_threadpool_shutdown(pool); - } + ast_threadpool_shutdown(pool); ao2_cleanup(listener); ast_free(std); ast_free(tld); @@ -664,12 +658,13 @@ AST_TEST_DEFINE(threadpool_one_thread_one_task) struct ast_threadpool_listener *listener = NULL; struct simple_task_data *std = NULL; enum ast_test_result_state res = AST_TEST_FAIL; - struct test_listener_data *tld; + struct test_listener_data *tld = NULL; struct ast_threadpool_options options = { .version = AST_THREADPOOL_OPTIONS_VERSION, .idle_timeout = 0, .auto_increment = 0, .initial_size = 0, + .max_size = 0, }; switch (cmd) { @@ -732,9 +727,7 @@ AST_TEST_DEFINE(threadpool_one_thread_one_task) res = listener_check(test, listener, 1, 1, 1, 0, 1, 1); end: - if (pool) { - ast_threadpool_shutdown(pool); - } + ast_threadpool_shutdown(pool); ao2_cleanup(listener); ast_free(std); ast_free(tld); @@ -749,12 +742,13 @@ AST_TEST_DEFINE(threadpool_one_thread_multiple_tasks) struct simple_task_data *std2 = NULL; struct simple_task_data *std3 = NULL; enum ast_test_result_state res = AST_TEST_FAIL; - struct test_listener_data *tld; + struct test_listener_data *tld = NULL; struct ast_threadpool_options options = { .version = AST_THREADPOOL_OPTIONS_VERSION, .idle_timeout = 0, .auto_increment = 0, .initial_size = 0, + .max_size = 0, }; switch (cmd) { @@ -828,9 +822,7 @@ AST_TEST_DEFINE(threadpool_one_thread_multiple_tasks) res = listener_check(test, listener, 1, 0, 3, 0, 1, 1); end: - if (pool) { - ast_threadpool_shutdown(pool); - } + ast_threadpool_shutdown(pool); ao2_cleanup(listener); ast_free(std1); ast_free(std2); @@ -848,12 +840,13 @@ AST_TEST_DEFINE(threadpool_auto_increment) struct simple_task_data *std3 = NULL; struct simple_task_data *std4 = NULL; enum ast_test_result_state res = AST_TEST_FAIL; - struct test_listener_data *tld; + struct test_listener_data *tld = NULL; struct ast_threadpool_options options = { .version = AST_THREADPOOL_OPTIONS_VERSION, .idle_timeout = 0, .auto_increment = 3, .initial_size = 0, + .max_size = 0, }; switch (cmd) { @@ -945,9 +938,7 @@ AST_TEST_DEFINE(threadpool_auto_increment) res = listener_check(test, listener, 1, 0, 4, 0, 3, 1); end: - if (pool) { - ast_threadpool_shutdown(pool); - } + ast_threadpool_shutdown(pool); ao2_cleanup(listener); ast_free(std1); ast_free(std2); @@ -957,6 +948,76 @@ end: return res; } +AST_TEST_DEFINE(threadpool_max_size) +{ + struct ast_threadpool *pool = NULL; + struct ast_threadpool_listener *listener = NULL; + struct simple_task_data *std = NULL; + enum ast_test_result_state res = AST_TEST_FAIL; + struct test_listener_data *tld = NULL; + struct ast_threadpool_options options = { + .version = AST_THREADPOOL_OPTIONS_VERSION, + .idle_timeout = 0, + .auto_increment = 3, + .initial_size = 0, + .max_size = 2, + }; + + switch (cmd) { + case TEST_INIT: + info->name = "max_size"; + info->category = "/main/threadpool/"; + info->summary = "Test that the threadpool does not exceed its maximum size restriction"; + info->description = + "Create an empty threadpool and push a task to it. Once the task is\n" + "pushed, the threadpool should attempt to grow by three threads, but the\n" + "pool's restrictions should only allow two threads to be added.\n"; + return AST_TEST_NOT_RUN; + case TEST_EXECUTE: + break; + } + + tld = test_alloc(); + if (!tld) { + return AST_TEST_FAIL; + } + + listener = ast_threadpool_listener_alloc(&test_callbacks, tld); + if (!listener) { + goto end; + } + + pool = ast_threadpool_create(info->name, listener, &options); + if (!pool) { + goto end; + } + + std = simple_task_data_alloc(); + if (!std) { + goto end; + } + + ast_threadpool_push(pool, simple_task, std); + + res = wait_for_completion(test, std); + if (res == AST_TEST_FAIL) { + goto end; + } + + res = wait_until_thread_state(test, tld, 0, 2); + if (res == AST_TEST_FAIL) { + goto end; + } + + res = listener_check(test, listener, 1, 1, 1, 0, 2, 1); +end: + ast_threadpool_shutdown(pool); + ao2_cleanup(listener); + ast_free(std); + ast_free(tld); + return res; +} + AST_TEST_DEFINE(threadpool_reactivation) { struct ast_threadpool *pool = NULL; @@ -964,12 +1025,13 @@ AST_TEST_DEFINE(threadpool_reactivation) struct simple_task_data *std1 = NULL; struct simple_task_data *std2 = NULL; enum ast_test_result_state res = AST_TEST_FAIL; - struct test_listener_data *tld; + struct test_listener_data *tld = NULL; struct ast_threadpool_options options = { .version = AST_THREADPOOL_OPTIONS_VERSION, .idle_timeout = 0, .auto_increment = 0, .initial_size = 0, + .max_size = 0, }; switch (cmd) { @@ -1052,9 +1114,7 @@ AST_TEST_DEFINE(threadpool_reactivation) res = listener_check(test, listener, 1, 1, 2, 0, 1, 1); end: - if (pool) { - ast_threadpool_shutdown(pool); - } + ast_threadpool_shutdown(pool); ao2_cleanup(listener); ast_free(std1); ast_free(std2); @@ -1133,12 +1193,13 @@ AST_TEST_DEFINE(threadpool_task_distribution) struct complex_task_data *ctd1 = NULL; struct complex_task_data *ctd2 = NULL; enum ast_test_result_state res = AST_TEST_FAIL; - struct test_listener_data *tld; + struct test_listener_data *tld = NULL; struct ast_threadpool_options options = { .version = AST_THREADPOOL_OPTIONS_VERSION, .idle_timeout = 0, .auto_increment = 0, .initial_size = 0, + .max_size = 0, }; switch (cmd) { @@ -1211,9 +1272,7 @@ AST_TEST_DEFINE(threadpool_task_distribution) res = listener_check(test, listener, 1, 0, 2, 0, 2, 1); end: - if (pool) { - ast_threadpool_shutdown(pool); - } + ast_threadpool_shutdown(pool); ao2_cleanup(listener); ast_free(ctd1); ast_free(ctd2); @@ -1228,12 +1287,13 @@ AST_TEST_DEFINE(threadpool_more_destruction) struct complex_task_data *ctd1 = NULL; struct complex_task_data *ctd2 = NULL; enum ast_test_result_state res = AST_TEST_FAIL; - struct test_listener_data *tld; + struct test_listener_data *tld = NULL; struct ast_threadpool_options options = { .version = AST_THREADPOOL_OPTIONS_VERSION, .idle_timeout = 0, .auto_increment = 0, .initial_size = 0, + .max_size = 0, }; switch (cmd) { @@ -1244,7 +1304,7 @@ AST_TEST_DEFINE(threadpool_more_destruction) info->description = "Push two tasks into a threadpool. Set the threadpool size to 4\n" "Ensure that there are 2 active and 2 idle threads. Then shrink the\n" - "threadpool down to 1 thread. Ensure that the thread leftove is active\n" + "threadpool down to 1 thread. Ensure that the thread leftover is active\n" "and ensure that both tasks complete.\n"; return AST_TEST_NOT_RUN; case TEST_EXECUTE: @@ -1323,9 +1383,7 @@ AST_TEST_DEFINE(threadpool_more_destruction) res = listener_check(test, listener, 1, 0, 2, 0, 1, 1); end: - if (pool) { - ast_threadpool_shutdown(pool); - } + ast_threadpool_shutdown(pool); ao2_cleanup(listener); ast_free(ctd1); ast_free(ctd2); @@ -1344,6 +1402,7 @@ static int unload_module(void) ast_test_unregister(threadpool_one_thread_one_task); ast_test_unregister(threadpool_one_thread_multiple_tasks); ast_test_unregister(threadpool_auto_increment); + ast_test_unregister(threadpool_max_size); ast_test_unregister(threadpool_reactivation); ast_test_unregister(threadpool_task_distribution); ast_test_unregister(threadpool_more_destruction); @@ -1361,6 +1420,7 @@ static int load_module(void) ast_test_register(threadpool_one_thread_one_task); ast_test_register(threadpool_one_thread_multiple_tasks); ast_test_register(threadpool_auto_increment); + ast_test_register(threadpool_max_size); ast_test_register(threadpool_reactivation); ast_test_register(threadpool_task_distribution); ast_test_register(threadpool_more_destruction);