diff --git a/include/asterisk/threadpool.h b/include/asterisk/threadpool.h index 6bb0dcc4f3..3f3634129e 100644 --- a/include/asterisk/threadpool.h +++ b/include/asterisk/threadpool.h @@ -82,6 +82,18 @@ struct ast_threadpool_listener { void *private_data; }; +struct ast_threadpool_options { +#define AST_THREADPOOL_OPTIONS_VERSION 1 + /*! Version of thradpool options in use */ + int version; + /* ! + * \brief Time limit in seconds for idle threads + * + * A time of 0 or less will mean an infinite timeout. + */ + int idle_timeout; +}; + /*! * \brief Allocate a threadpool listener * @@ -106,7 +118,8 @@ struct ast_threadpool_listener *ast_threadpool_listener_alloc( * \retval NULL Failed to create the threadpool * \retval non-NULL The newly-created threadpool */ -struct ast_threadpool *ast_threadpool_create(struct ast_threadpool_listener *listener, int initial_size); +struct ast_threadpool *ast_threadpool_create(struct ast_threadpool_listener *listener, + int initial_size, const struct ast_threadpool_options *options); /*! * \brief Set the number of threads for the thread pool diff --git a/main/threadpool.c b/main/threadpool.c index 45e8638057..18c1349ae5 100644 --- a/main/threadpool.c +++ b/main/threadpool.c @@ -95,6 +95,8 @@ struct ast_threadpool { struct ast_taskprocessor *control_tps; /*! True if the threadpool is in the processof shutting down */ int shutting_down; + /*! Threadpool-specific options */ + struct ast_threadpool_options options; }; /*! @@ -266,6 +268,32 @@ static void threadpool_zombie_thread_dead(struct ast_threadpool *pool, ast_taskprocessor_push(pool->control_tps, queued_zombie_thread_dead, pair); } +static int queued_idle_thread_dead(void *data) +{ + struct thread_worker_pair *pair = data; + + ao2_unlink(pair->pool->idle_threads, pair->worker); + threadpool_send_state_changed(pair->pool); + + ao2_ref(pair, -1); + return 0; +} + +static void threadpool_idle_thread_dead(struct ast_threadpool *pool, + struct worker_thread *worker) +{ + struct thread_worker_pair *pair; + SCOPED_AO2LOCK(lock, pool); + if (pool->shutting_down) { + return; + } + pair = thread_worker_pair_alloc(pool, worker); + if (!pair) { + return; + } + ast_taskprocessor_push(pool->control_tps, queued_idle_thread_dead, pair); +} + /*! * \brief Execute a task in the threadpool * @@ -749,7 +777,13 @@ struct ast_threadpool_listener *ast_threadpool_listener_alloc( return listener; } -struct ast_threadpool *ast_threadpool_create(struct ast_threadpool_listener *listener, int initial_size) +struct pool_options_pair { + struct ast_threadpool *pool; + struct ast_threadpool_options options; +}; + +struct ast_threadpool *ast_threadpool_create(struct ast_threadpool_listener *listener, + int initial_size, const struct ast_threadpool_options *options) { struct ast_threadpool *pool; struct ast_taskprocessor *tps; @@ -771,6 +805,7 @@ struct ast_threadpool *ast_threadpool_create(struct ast_threadpool_listener *lis pool->tps = tps; ao2_ref(listener, +1); pool->listener = listener; + pool->options = *options; ast_threadpool_set_size(pool, initial_size); return pool; } @@ -814,6 +849,8 @@ struct worker_thread { enum worker_state state; /*! A boolean used to determine if an idle thread should become active */ int wake_up; + /*! Options for this threadpool */ + struct ast_threadpool_options options; }; /*! @@ -864,7 +901,7 @@ static void worker_shutdown(struct worker_thread *worker) static void worker_thread_destroy(void *obj) { struct worker_thread *worker = obj; - ast_log(LOG_NOTICE, "Worker dying\n"); + ast_debug(1, "Destroying worker thread\n"); worker_shutdown(worker); ast_mutex_destroy(&worker->lock); ast_cond_destroy(&worker->cond); @@ -909,6 +946,7 @@ static struct worker_thread *worker_thread_alloc(struct ast_threadpool *pool) worker->pool = pool; worker->thread = AST_PTHREADT_NULL; worker->state = ALIVE; + worker->options = pool->options; if (ast_pthread_create(&worker->thread, NULL, worker_start, worker) < 0) { ast_log(LOG_ERROR, "Unable to start worker thread!\n"); ao2_ref(worker, -1); @@ -961,13 +999,28 @@ static void worker_active(struct worker_thread *worker) */ static int worker_idle(struct worker_thread *worker) { + struct timeval start = ast_tvnow(); + struct timespec end = { + .tv_sec = start.tv_sec + worker->options.idle_timeout, + .tv_nsec = start.tv_usec * 1000, + }; SCOPED_MUTEX(lock, &worker->lock); if (worker->state != ALIVE) { return 0; } threadpool_active_thread_idle(worker->pool, worker); while (!worker->wake_up) { - ast_cond_wait(&worker->cond, lock); + if (worker->options.idle_timeout <= 0) { + ast_cond_wait(&worker->cond, lock); + } else if (ast_cond_timedwait(&worker->cond, lock, &end) == ETIMEDOUT) { + break; + } + } + + if (!worker->wake_up) { + ast_debug(1, "Worker thread idle timeout reached. Dying.\n"); + threadpool_idle_thread_dead(worker->pool, worker); + worker->state = DEAD; } worker->wake_up = 0; return worker->state == ALIVE; diff --git a/tests/test_threadpool.c b/tests/test_threadpool.c index 373d0c0283..5de5168394 100644 --- a/tests/test_threadpool.c +++ b/tests/test_threadpool.c @@ -257,6 +257,10 @@ AST_TEST_DEFINE(threadpool_push) struct ast_threadpool_listener *listener = NULL; struct simple_task_data *std = NULL; enum ast_test_result_state res = AST_TEST_FAIL; + struct ast_threadpool_options options = { + .version = AST_THREADPOOL_OPTIONS_VERSION, + .idle_timeout = 0, + }; switch (cmd) { case TEST_INIT: @@ -275,7 +279,7 @@ AST_TEST_DEFINE(threadpool_push) return AST_TEST_FAIL; } - pool = ast_threadpool_create(listener, 0); + pool = ast_threadpool_create(listener, 0, &options); if (!pool) { goto end; } @@ -306,6 +310,10 @@ AST_TEST_DEFINE(threadpool_thread_creation) struct ast_threadpool_listener *listener = NULL; enum ast_test_result_state res = AST_TEST_FAIL; struct test_listener_data *tld; + struct ast_threadpool_options options = { + .version = AST_THREADPOOL_OPTIONS_VERSION, + .idle_timeout = 0, + }; switch (cmd) { case TEST_INIT: @@ -325,7 +333,7 @@ AST_TEST_DEFINE(threadpool_thread_creation) } tld = listener->private_data; - pool = ast_threadpool_create(listener, 0); + pool = ast_threadpool_create(listener, 0, &options); if (!pool) { goto end; } @@ -353,6 +361,10 @@ AST_TEST_DEFINE(threadpool_thread_destruction) struct ast_threadpool_listener *listener = NULL; enum ast_test_result_state res = AST_TEST_FAIL; struct test_listener_data *tld; + struct ast_threadpool_options options = { + .version = AST_THREADPOOL_OPTIONS_VERSION, + .idle_timeout = 0, + }; switch (cmd) { case TEST_INIT: @@ -372,7 +384,7 @@ AST_TEST_DEFINE(threadpool_thread_destruction) } tld = listener->private_data; - pool = ast_threadpool_create(listener, 0); + pool = ast_threadpool_create(listener, 0, &options); if (!pool) { goto end; } @@ -400,6 +412,62 @@ end: return res; } +AST_TEST_DEFINE(threadpool_thread_timeout) +{ + struct ast_threadpool *pool = NULL; + struct ast_threadpool_listener *listener = NULL; + enum ast_test_result_state res = AST_TEST_FAIL; + struct test_listener_data *tld; + struct ast_threadpool_options options = { + .version = AST_THREADPOOL_OPTIONS_VERSION, + .idle_timeout = 5, + }; + + switch (cmd) { + case TEST_INIT: + info->name = "threadpool_thread_timeout"; + info->category = "/main/threadpool/"; + info->summary = "Test threadpool thread timeout"; + info->description = + "Ensure that a thread with a five second timeout dies as expected."; + return AST_TEST_NOT_RUN; + case TEST_EXECUTE: + break; + } + + listener = ast_threadpool_listener_alloc(&test_callbacks); + if (!listener) { + return AST_TEST_FAIL; + } + tld = listener->private_data; + + pool = ast_threadpool_create(listener, 0, &options); + if (!pool) { + goto end; + } + + ast_threadpool_set_size(pool, 1); + + WAIT_WHILE(tld, tld->num_idle < 1); + + res = listener_check(test, listener, 0, 0, 0, 0, 1, 0); + if (res == AST_TEST_FAIL) { + goto end; + } + + /* The thread should time out after 5 seconds */ + WAIT_WHILE(tld, tld->num_idle > 0); + + res = listener_check(test, listener, 0, 0, 0, 0, 0, 0); + +end: + if (pool) { + ast_threadpool_shutdown(pool); + } + ao2_cleanup(listener); + return res; +} + AST_TEST_DEFINE(threadpool_one_task_one_thread) { struct ast_threadpool *pool = NULL; @@ -407,6 +475,10 @@ AST_TEST_DEFINE(threadpool_one_task_one_thread) struct simple_task_data *std = NULL; enum ast_test_result_state res = AST_TEST_FAIL; struct test_listener_data *tld; + struct ast_threadpool_options options = { + .version = AST_THREADPOOL_OPTIONS_VERSION, + .idle_timeout = 0, + }; switch (cmd) { case TEST_INIT: @@ -426,7 +498,7 @@ AST_TEST_DEFINE(threadpool_one_task_one_thread) } tld = listener->private_data; - pool = ast_threadpool_create(listener, 0); + pool = ast_threadpool_create(listener, 0, &options); if (!pool) { goto end; } @@ -476,6 +548,10 @@ AST_TEST_DEFINE(threadpool_one_thread_one_task) struct simple_task_data *std = NULL; enum ast_test_result_state res = AST_TEST_FAIL; struct test_listener_data *tld; + struct ast_threadpool_options options = { + .version = AST_THREADPOOL_OPTIONS_VERSION, + .idle_timeout = 0, + }; switch (cmd) { case TEST_INIT: @@ -495,7 +571,7 @@ AST_TEST_DEFINE(threadpool_one_thread_one_task) } tld = listener->private_data; - pool = ast_threadpool_create(listener, 0); + pool = ast_threadpool_create(listener, 0, &options); if (!pool) { goto end; } @@ -545,6 +621,10 @@ AST_TEST_DEFINE(threadpool_one_thread_multiple_tasks) struct simple_task_data *std3 = NULL; enum ast_test_result_state res = AST_TEST_FAIL; struct test_listener_data *tld; + struct ast_threadpool_options options = { + .version = AST_THREADPOOL_OPTIONS_VERSION, + .idle_timeout = 0, + }; switch (cmd) { case TEST_INIT: @@ -564,7 +644,7 @@ AST_TEST_DEFINE(threadpool_one_thread_multiple_tasks) } tld = listener->private_data; - pool = ast_threadpool_create(listener, 0); + pool = ast_threadpool_create(listener, 0, &options); if (!pool) { goto end; } @@ -626,6 +706,10 @@ AST_TEST_DEFINE(threadpool_reactivation) struct simple_task_data *std2 = NULL; enum ast_test_result_state res = AST_TEST_FAIL; struct test_listener_data *tld; + struct ast_threadpool_options options = { + .version = AST_THREADPOOL_OPTIONS_VERSION, + .idle_timeout = 0, + }; switch (cmd) { case TEST_INIT: @@ -647,7 +731,7 @@ AST_TEST_DEFINE(threadpool_reactivation) } tld = listener->private_data; - pool = ast_threadpool_create(listener, 0); + pool = ast_threadpool_create(listener, 0, &options); if (!pool) { goto end; } @@ -773,6 +857,10 @@ AST_TEST_DEFINE(threadpool_task_distribution) struct complex_task_data *ctd2 = NULL; enum ast_test_result_state res = AST_TEST_FAIL; struct test_listener_data *tld; + struct ast_threadpool_options options = { + .version = AST_THREADPOOL_OPTIONS_VERSION, + .idle_timeout = 0, + }; switch (cmd) { case TEST_INIT: @@ -793,7 +881,7 @@ AST_TEST_DEFINE(threadpool_task_distribution) } tld = listener->private_data; - pool = ast_threadpool_create(listener, 0); + pool = ast_threadpool_create(listener, 0, &options); if (!pool) { goto end; } @@ -851,6 +939,10 @@ AST_TEST_DEFINE(threadpool_more_destruction) struct complex_task_data *ctd2 = NULL; enum ast_test_result_state res = AST_TEST_FAIL; struct test_listener_data *tld; + struct ast_threadpool_options options = { + .version = AST_THREADPOOL_OPTIONS_VERSION, + .idle_timeout = 0, + }; switch (cmd) { case TEST_INIT: @@ -873,7 +965,7 @@ AST_TEST_DEFINE(threadpool_more_destruction) } tld = listener->private_data; - pool = ast_threadpool_create(listener, 0); + pool = ast_threadpool_create(listener, 0, &options); if (!pool) { goto end; } @@ -940,6 +1032,7 @@ static int unload_module(void) ast_test_unregister(threadpool_push); ast_test_unregister(threadpool_thread_creation); ast_test_unregister(threadpool_thread_destruction); + ast_test_unregister(threadpool_thread_timeout); ast_test_unregister(threadpool_one_task_one_thread); ast_test_unregister(threadpool_one_thread_one_task); ast_test_unregister(threadpool_one_thread_multiple_tasks); @@ -954,6 +1047,7 @@ static int load_module(void) ast_test_register(threadpool_push); ast_test_register(threadpool_thread_creation); ast_test_register(threadpool_thread_destruction); + ast_test_register(threadpool_thread_timeout); ast_test_register(threadpool_one_task_one_thread); ast_test_register(threadpool_one_thread_one_task); ast_test_register(threadpool_one_thread_multiple_tasks);