Add threadpool options and accompanying test.
The only test added so far is an idle thread timeout option. This will greatly aid threadpool users who wish to maintain a threadpool by allowing for idle threads to die out as necessary. Test passes. git-svn-id: https://origsvn.digium.com/svn/asterisk/team/mmichelson/threadpool@377580 65c4cc65-6c06-0410-ace0-fbb531ad65f3
This commit is contained in:
parent
5dd22df050
commit
64deed062a
|
@ -82,6 +82,18 @@ struct ast_threadpool_listener {
|
|||
void *private_data;
|
||||
};
|
||||
|
||||
struct ast_threadpool_options {
|
||||
#define AST_THREADPOOL_OPTIONS_VERSION 1
|
||||
/*! Version of thradpool options in use */
|
||||
int version;
|
||||
/* !
|
||||
* \brief Time limit in seconds for idle threads
|
||||
*
|
||||
* A time of 0 or less will mean an infinite timeout.
|
||||
*/
|
||||
int idle_timeout;
|
||||
};
|
||||
|
||||
/*!
|
||||
* \brief Allocate a threadpool listener
|
||||
*
|
||||
|
@ -106,7 +118,8 @@ struct ast_threadpool_listener *ast_threadpool_listener_alloc(
|
|||
* \retval NULL Failed to create the threadpool
|
||||
* \retval non-NULL The newly-created threadpool
|
||||
*/
|
||||
struct ast_threadpool *ast_threadpool_create(struct ast_threadpool_listener *listener, int initial_size);
|
||||
struct ast_threadpool *ast_threadpool_create(struct ast_threadpool_listener *listener,
|
||||
int initial_size, const struct ast_threadpool_options *options);
|
||||
|
||||
/*!
|
||||
* \brief Set the number of threads for the thread pool
|
||||
|
|
|
@ -95,6 +95,8 @@ struct ast_threadpool {
|
|||
struct ast_taskprocessor *control_tps;
|
||||
/*! True if the threadpool is in the processof shutting down */
|
||||
int shutting_down;
|
||||
/*! Threadpool-specific options */
|
||||
struct ast_threadpool_options options;
|
||||
};
|
||||
|
||||
/*!
|
||||
|
@ -266,6 +268,32 @@ static void threadpool_zombie_thread_dead(struct ast_threadpool *pool,
|
|||
ast_taskprocessor_push(pool->control_tps, queued_zombie_thread_dead, pair);
|
||||
}
|
||||
|
||||
static int queued_idle_thread_dead(void *data)
|
||||
{
|
||||
struct thread_worker_pair *pair = data;
|
||||
|
||||
ao2_unlink(pair->pool->idle_threads, pair->worker);
|
||||
threadpool_send_state_changed(pair->pool);
|
||||
|
||||
ao2_ref(pair, -1);
|
||||
return 0;
|
||||
}
|
||||
|
||||
static void threadpool_idle_thread_dead(struct ast_threadpool *pool,
|
||||
struct worker_thread *worker)
|
||||
{
|
||||
struct thread_worker_pair *pair;
|
||||
SCOPED_AO2LOCK(lock, pool);
|
||||
if (pool->shutting_down) {
|
||||
return;
|
||||
}
|
||||
pair = thread_worker_pair_alloc(pool, worker);
|
||||
if (!pair) {
|
||||
return;
|
||||
}
|
||||
ast_taskprocessor_push(pool->control_tps, queued_idle_thread_dead, pair);
|
||||
}
|
||||
|
||||
/*!
|
||||
* \brief Execute a task in the threadpool
|
||||
*
|
||||
|
@ -749,7 +777,13 @@ struct ast_threadpool_listener *ast_threadpool_listener_alloc(
|
|||
return listener;
|
||||
}
|
||||
|
||||
struct ast_threadpool *ast_threadpool_create(struct ast_threadpool_listener *listener, int initial_size)
|
||||
struct pool_options_pair {
|
||||
struct ast_threadpool *pool;
|
||||
struct ast_threadpool_options options;
|
||||
};
|
||||
|
||||
struct ast_threadpool *ast_threadpool_create(struct ast_threadpool_listener *listener,
|
||||
int initial_size, const struct ast_threadpool_options *options)
|
||||
{
|
||||
struct ast_threadpool *pool;
|
||||
struct ast_taskprocessor *tps;
|
||||
|
@ -771,6 +805,7 @@ struct ast_threadpool *ast_threadpool_create(struct ast_threadpool_listener *lis
|
|||
pool->tps = tps;
|
||||
ao2_ref(listener, +1);
|
||||
pool->listener = listener;
|
||||
pool->options = *options;
|
||||
ast_threadpool_set_size(pool, initial_size);
|
||||
return pool;
|
||||
}
|
||||
|
@ -814,6 +849,8 @@ struct worker_thread {
|
|||
enum worker_state state;
|
||||
/*! A boolean used to determine if an idle thread should become active */
|
||||
int wake_up;
|
||||
/*! Options for this threadpool */
|
||||
struct ast_threadpool_options options;
|
||||
};
|
||||
|
||||
/*!
|
||||
|
@ -864,7 +901,7 @@ static void worker_shutdown(struct worker_thread *worker)
|
|||
static void worker_thread_destroy(void *obj)
|
||||
{
|
||||
struct worker_thread *worker = obj;
|
||||
ast_log(LOG_NOTICE, "Worker dying\n");
|
||||
ast_debug(1, "Destroying worker thread\n");
|
||||
worker_shutdown(worker);
|
||||
ast_mutex_destroy(&worker->lock);
|
||||
ast_cond_destroy(&worker->cond);
|
||||
|
@ -909,6 +946,7 @@ static struct worker_thread *worker_thread_alloc(struct ast_threadpool *pool)
|
|||
worker->pool = pool;
|
||||
worker->thread = AST_PTHREADT_NULL;
|
||||
worker->state = ALIVE;
|
||||
worker->options = pool->options;
|
||||
if (ast_pthread_create(&worker->thread, NULL, worker_start, worker) < 0) {
|
||||
ast_log(LOG_ERROR, "Unable to start worker thread!\n");
|
||||
ao2_ref(worker, -1);
|
||||
|
@ -961,13 +999,28 @@ static void worker_active(struct worker_thread *worker)
|
|||
*/
|
||||
static int worker_idle(struct worker_thread *worker)
|
||||
{
|
||||
struct timeval start = ast_tvnow();
|
||||
struct timespec end = {
|
||||
.tv_sec = start.tv_sec + worker->options.idle_timeout,
|
||||
.tv_nsec = start.tv_usec * 1000,
|
||||
};
|
||||
SCOPED_MUTEX(lock, &worker->lock);
|
||||
if (worker->state != ALIVE) {
|
||||
return 0;
|
||||
}
|
||||
threadpool_active_thread_idle(worker->pool, worker);
|
||||
while (!worker->wake_up) {
|
||||
ast_cond_wait(&worker->cond, lock);
|
||||
if (worker->options.idle_timeout <= 0) {
|
||||
ast_cond_wait(&worker->cond, lock);
|
||||
} else if (ast_cond_timedwait(&worker->cond, lock, &end) == ETIMEDOUT) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (!worker->wake_up) {
|
||||
ast_debug(1, "Worker thread idle timeout reached. Dying.\n");
|
||||
threadpool_idle_thread_dead(worker->pool, worker);
|
||||
worker->state = DEAD;
|
||||
}
|
||||
worker->wake_up = 0;
|
||||
return worker->state == ALIVE;
|
||||
|
|
|
@ -257,6 +257,10 @@ AST_TEST_DEFINE(threadpool_push)
|
|||
struct ast_threadpool_listener *listener = NULL;
|
||||
struct simple_task_data *std = NULL;
|
||||
enum ast_test_result_state res = AST_TEST_FAIL;
|
||||
struct ast_threadpool_options options = {
|
||||
.version = AST_THREADPOOL_OPTIONS_VERSION,
|
||||
.idle_timeout = 0,
|
||||
};
|
||||
|
||||
switch (cmd) {
|
||||
case TEST_INIT:
|
||||
|
@ -275,7 +279,7 @@ AST_TEST_DEFINE(threadpool_push)
|
|||
return AST_TEST_FAIL;
|
||||
}
|
||||
|
||||
pool = ast_threadpool_create(listener, 0);
|
||||
pool = ast_threadpool_create(listener, 0, &options);
|
||||
if (!pool) {
|
||||
goto end;
|
||||
}
|
||||
|
@ -306,6 +310,10 @@ AST_TEST_DEFINE(threadpool_thread_creation)
|
|||
struct ast_threadpool_listener *listener = NULL;
|
||||
enum ast_test_result_state res = AST_TEST_FAIL;
|
||||
struct test_listener_data *tld;
|
||||
struct ast_threadpool_options options = {
|
||||
.version = AST_THREADPOOL_OPTIONS_VERSION,
|
||||
.idle_timeout = 0,
|
||||
};
|
||||
|
||||
switch (cmd) {
|
||||
case TEST_INIT:
|
||||
|
@ -325,7 +333,7 @@ AST_TEST_DEFINE(threadpool_thread_creation)
|
|||
}
|
||||
tld = listener->private_data;
|
||||
|
||||
pool = ast_threadpool_create(listener, 0);
|
||||
pool = ast_threadpool_create(listener, 0, &options);
|
||||
if (!pool) {
|
||||
goto end;
|
||||
}
|
||||
|
@ -353,6 +361,10 @@ AST_TEST_DEFINE(threadpool_thread_destruction)
|
|||
struct ast_threadpool_listener *listener = NULL;
|
||||
enum ast_test_result_state res = AST_TEST_FAIL;
|
||||
struct test_listener_data *tld;
|
||||
struct ast_threadpool_options options = {
|
||||
.version = AST_THREADPOOL_OPTIONS_VERSION,
|
||||
.idle_timeout = 0,
|
||||
};
|
||||
|
||||
switch (cmd) {
|
||||
case TEST_INIT:
|
||||
|
@ -372,7 +384,7 @@ AST_TEST_DEFINE(threadpool_thread_destruction)
|
|||
}
|
||||
tld = listener->private_data;
|
||||
|
||||
pool = ast_threadpool_create(listener, 0);
|
||||
pool = ast_threadpool_create(listener, 0, &options);
|
||||
if (!pool) {
|
||||
goto end;
|
||||
}
|
||||
|
@ -400,6 +412,62 @@ end:
|
|||
return res;
|
||||
}
|
||||
|
||||
AST_TEST_DEFINE(threadpool_thread_timeout)
|
||||
{
|
||||
struct ast_threadpool *pool = NULL;
|
||||
struct ast_threadpool_listener *listener = NULL;
|
||||
enum ast_test_result_state res = AST_TEST_FAIL;
|
||||
struct test_listener_data *tld;
|
||||
struct ast_threadpool_options options = {
|
||||
.version = AST_THREADPOOL_OPTIONS_VERSION,
|
||||
.idle_timeout = 5,
|
||||
};
|
||||
|
||||
switch (cmd) {
|
||||
case TEST_INIT:
|
||||
info->name = "threadpool_thread_timeout";
|
||||
info->category = "/main/threadpool/";
|
||||
info->summary = "Test threadpool thread timeout";
|
||||
info->description =
|
||||
"Ensure that a thread with a five second timeout dies as expected.";
|
||||
return AST_TEST_NOT_RUN;
|
||||
case TEST_EXECUTE:
|
||||
break;
|
||||
}
|
||||
|
||||
listener = ast_threadpool_listener_alloc(&test_callbacks);
|
||||
if (!listener) {
|
||||
return AST_TEST_FAIL;
|
||||
}
|
||||
tld = listener->private_data;
|
||||
|
||||
pool = ast_threadpool_create(listener, 0, &options);
|
||||
if (!pool) {
|
||||
goto end;
|
||||
}
|
||||
|
||||
ast_threadpool_set_size(pool, 1);
|
||||
|
||||
WAIT_WHILE(tld, tld->num_idle < 1);
|
||||
|
||||
res = listener_check(test, listener, 0, 0, 0, 0, 1, 0);
|
||||
if (res == AST_TEST_FAIL) {
|
||||
goto end;
|
||||
}
|
||||
|
||||
/* The thread should time out after 5 seconds */
|
||||
WAIT_WHILE(tld, tld->num_idle > 0);
|
||||
|
||||
res = listener_check(test, listener, 0, 0, 0, 0, 0, 0);
|
||||
|
||||
end:
|
||||
if (pool) {
|
||||
ast_threadpool_shutdown(pool);
|
||||
}
|
||||
ao2_cleanup(listener);
|
||||
return res;
|
||||
}
|
||||
|
||||
AST_TEST_DEFINE(threadpool_one_task_one_thread)
|
||||
{
|
||||
struct ast_threadpool *pool = NULL;
|
||||
|
@ -407,6 +475,10 @@ AST_TEST_DEFINE(threadpool_one_task_one_thread)
|
|||
struct simple_task_data *std = NULL;
|
||||
enum ast_test_result_state res = AST_TEST_FAIL;
|
||||
struct test_listener_data *tld;
|
||||
struct ast_threadpool_options options = {
|
||||
.version = AST_THREADPOOL_OPTIONS_VERSION,
|
||||
.idle_timeout = 0,
|
||||
};
|
||||
|
||||
switch (cmd) {
|
||||
case TEST_INIT:
|
||||
|
@ -426,7 +498,7 @@ AST_TEST_DEFINE(threadpool_one_task_one_thread)
|
|||
}
|
||||
tld = listener->private_data;
|
||||
|
||||
pool = ast_threadpool_create(listener, 0);
|
||||
pool = ast_threadpool_create(listener, 0, &options);
|
||||
if (!pool) {
|
||||
goto end;
|
||||
}
|
||||
|
@ -476,6 +548,10 @@ AST_TEST_DEFINE(threadpool_one_thread_one_task)
|
|||
struct simple_task_data *std = NULL;
|
||||
enum ast_test_result_state res = AST_TEST_FAIL;
|
||||
struct test_listener_data *tld;
|
||||
struct ast_threadpool_options options = {
|
||||
.version = AST_THREADPOOL_OPTIONS_VERSION,
|
||||
.idle_timeout = 0,
|
||||
};
|
||||
|
||||
switch (cmd) {
|
||||
case TEST_INIT:
|
||||
|
@ -495,7 +571,7 @@ AST_TEST_DEFINE(threadpool_one_thread_one_task)
|
|||
}
|
||||
tld = listener->private_data;
|
||||
|
||||
pool = ast_threadpool_create(listener, 0);
|
||||
pool = ast_threadpool_create(listener, 0, &options);
|
||||
if (!pool) {
|
||||
goto end;
|
||||
}
|
||||
|
@ -545,6 +621,10 @@ AST_TEST_DEFINE(threadpool_one_thread_multiple_tasks)
|
|||
struct simple_task_data *std3 = NULL;
|
||||
enum ast_test_result_state res = AST_TEST_FAIL;
|
||||
struct test_listener_data *tld;
|
||||
struct ast_threadpool_options options = {
|
||||
.version = AST_THREADPOOL_OPTIONS_VERSION,
|
||||
.idle_timeout = 0,
|
||||
};
|
||||
|
||||
switch (cmd) {
|
||||
case TEST_INIT:
|
||||
|
@ -564,7 +644,7 @@ AST_TEST_DEFINE(threadpool_one_thread_multiple_tasks)
|
|||
}
|
||||
tld = listener->private_data;
|
||||
|
||||
pool = ast_threadpool_create(listener, 0);
|
||||
pool = ast_threadpool_create(listener, 0, &options);
|
||||
if (!pool) {
|
||||
goto end;
|
||||
}
|
||||
|
@ -626,6 +706,10 @@ AST_TEST_DEFINE(threadpool_reactivation)
|
|||
struct simple_task_data *std2 = NULL;
|
||||
enum ast_test_result_state res = AST_TEST_FAIL;
|
||||
struct test_listener_data *tld;
|
||||
struct ast_threadpool_options options = {
|
||||
.version = AST_THREADPOOL_OPTIONS_VERSION,
|
||||
.idle_timeout = 0,
|
||||
};
|
||||
|
||||
switch (cmd) {
|
||||
case TEST_INIT:
|
||||
|
@ -647,7 +731,7 @@ AST_TEST_DEFINE(threadpool_reactivation)
|
|||
}
|
||||
tld = listener->private_data;
|
||||
|
||||
pool = ast_threadpool_create(listener, 0);
|
||||
pool = ast_threadpool_create(listener, 0, &options);
|
||||
if (!pool) {
|
||||
goto end;
|
||||
}
|
||||
|
@ -773,6 +857,10 @@ AST_TEST_DEFINE(threadpool_task_distribution)
|
|||
struct complex_task_data *ctd2 = NULL;
|
||||
enum ast_test_result_state res = AST_TEST_FAIL;
|
||||
struct test_listener_data *tld;
|
||||
struct ast_threadpool_options options = {
|
||||
.version = AST_THREADPOOL_OPTIONS_VERSION,
|
||||
.idle_timeout = 0,
|
||||
};
|
||||
|
||||
switch (cmd) {
|
||||
case TEST_INIT:
|
||||
|
@ -793,7 +881,7 @@ AST_TEST_DEFINE(threadpool_task_distribution)
|
|||
}
|
||||
tld = listener->private_data;
|
||||
|
||||
pool = ast_threadpool_create(listener, 0);
|
||||
pool = ast_threadpool_create(listener, 0, &options);
|
||||
if (!pool) {
|
||||
goto end;
|
||||
}
|
||||
|
@ -851,6 +939,10 @@ AST_TEST_DEFINE(threadpool_more_destruction)
|
|||
struct complex_task_data *ctd2 = NULL;
|
||||
enum ast_test_result_state res = AST_TEST_FAIL;
|
||||
struct test_listener_data *tld;
|
||||
struct ast_threadpool_options options = {
|
||||
.version = AST_THREADPOOL_OPTIONS_VERSION,
|
||||
.idle_timeout = 0,
|
||||
};
|
||||
|
||||
switch (cmd) {
|
||||
case TEST_INIT:
|
||||
|
@ -873,7 +965,7 @@ AST_TEST_DEFINE(threadpool_more_destruction)
|
|||
}
|
||||
tld = listener->private_data;
|
||||
|
||||
pool = ast_threadpool_create(listener, 0);
|
||||
pool = ast_threadpool_create(listener, 0, &options);
|
||||
if (!pool) {
|
||||
goto end;
|
||||
}
|
||||
|
@ -940,6 +1032,7 @@ static int unload_module(void)
|
|||
ast_test_unregister(threadpool_push);
|
||||
ast_test_unregister(threadpool_thread_creation);
|
||||
ast_test_unregister(threadpool_thread_destruction);
|
||||
ast_test_unregister(threadpool_thread_timeout);
|
||||
ast_test_unregister(threadpool_one_task_one_thread);
|
||||
ast_test_unregister(threadpool_one_thread_one_task);
|
||||
ast_test_unregister(threadpool_one_thread_multiple_tasks);
|
||||
|
@ -954,6 +1047,7 @@ static int load_module(void)
|
|||
ast_test_register(threadpool_push);
|
||||
ast_test_register(threadpool_thread_creation);
|
||||
ast_test_register(threadpool_thread_destruction);
|
||||
ast_test_register(threadpool_thread_timeout);
|
||||
ast_test_register(threadpool_one_task_one_thread);
|
||||
ast_test_register(threadpool_one_thread_one_task);
|
||||
ast_test_register(threadpool_one_thread_multiple_tasks);
|
||||
|
|
Loading…
Reference in New Issue