diff --git a/main/threadpool.c b/main/threadpool.c index 362c765cff..15450bc2ff 100644 --- a/main/threadpool.c +++ b/main/threadpool.c @@ -22,7 +22,16 @@ #include "asterisk/threadpool.h" #include "asterisk/taskprocessor.h" -struct ast_threadpool; +#define THREAD_BUCKETS 89 + +static int id_counter; + +struct ast_threadpool { + struct ast_threadpool_listener *threadpool_listener; + struct ao2_container *active_threads; + struct ao2_container *idle_threads; + struct ao2_container *zombie_threads; +} enum worker_state { ALIVE, @@ -31,15 +40,131 @@ enum worker_state { }; struct worker_thread { + int id; ast_cond_t cond; ast_mutex_t lock; pthread_t thread; struct ast_threadpool *pool; - AST_LIST_ENTRY(struct worker_thread) next; - int wake_up; enum worker_state state; + int wake_up; }; +static int worker_thread_hash(const void *obj) +{ + struct worker_thread *worker= obj; + + return worker->id; +} + +static int worker_thread_cmp(void *obj, void *arg, int flags) +{ + struct worker_thread *worker1 = obj; + struct worker_thread *worker2 = arg; + + return worker1->id == worker2->id ? CMP_MATCH : 0; +} + +static worker_thread *worker_thread_alloc(struct ast_threadpool *pool) +{ + struct worker_thread *worker = ao2_alloc(1, sizeof(*worker)); + if (!worker) { + /* XXX Dangit! */ + return NULL; + } + worker->id = ast_atomic_fetchadd_int(&id_counter, 1); + ast_mutex_init(&worker->lock); + ast_cond_init(&worker->cond, NULL); + worker->pool = pool; + worker->thread = AST_PTHREADT_NULL; + worker->state = ALIVE; + if (ast_pthread_create(&worker->thread, NULL, worker_active, worker) < 0) { + /* XXX Poop! */ + ao2_ref(worker, -1); + return NULL; + } + return worker; +} + +static void threadpool_send_state_changed(struct ast_threadpool *pool) +{ + int active_size = ao2_container_count(pool->active_threads); + int idle_size = ao2_container_count(pool->idle_threads); + int zombie_size = ao2_container_count(pool->zombie_threads); + + pool->listener->callbacks->state_changed(pool->listener, active_size, idle_size, zombie_size); +} + +struct thread_worker_pair { + struct ast_threadpool *pool; + struct worker_thread *worker; +}; + +static void thread_worker_pair_destructor(void *obj) +{ + struct thread_worker_pair *pair = obj; + ao2_ref(pair->pool, -1); + ao2_ref(pair->worker, -1); +} + +struct thread_worker_pair *thread_worker_pair_init(struct ast_threadpool *pool, + struct worker_thread *worker) +{ + struct thread_worker_pair *pair = ao2_alloc(sizeof(*pair), thread_worker_pair_destructor); + if (!pair) { + /*XXX Crap */ + return NULL; + } + pair->pool = ao2_ref(pool); + pair->worker = ao2_ref(worker); + return pair; +} + +static int queued_active_thread_idle(void *data) +{ + struct thread_worker_pair *pair = data; + + ao2_link(pair->pool->idle_threads, pair->worker); + ao2_unlink(pair->pool->active_threads, pair->worker); + + threadpool_send_state_changed(pair->pool); + + ao2_ref(pair, -1); + return 0; +} + +static void threadpool_active_thread_idle(struct ast_threadpool *pool, + struct worker_thread *worker) +{ + struct thread_worker_pair *pair = thread_worker_pair_init(pool, worker); + if (!pair) { + /*XXX Crap */ + return; + } + ast_taskprocessor_push(pool->control_tps, queued_active_thread_idle(pair)); +} + +static int queued_zombie_thread_dead(void *data) +{ + struct thread_worker_pair *pair = data; + + ao2_unlink(pair->pool->zombie_threads, pair->worker); + threadpool_send_state_changed(pair->pool); + + ao2_ref(pair, -1); + return 0; +} + +static void threadpool_zombie_thread_dead(struct ast_threadpool *pool, + struct worker_thread *worker) +{ + struct thread_worker_pair *pair = thread_worker_pair_init(pool, worker); + if (!pair) { + /* XXX Crap */ + return; + } + ast_taskprocessor_push(pool->control_tps, queued_zombie_thread_dead(pair)); +} + static int worker_idle(struct worker_thread *worker) { SCOPED_MUTEX(lock, &worker->lock); @@ -78,19 +203,31 @@ static int worker_active(struct worker_thread *worker) return 0; } -struct ast_threadpool { - struct ast_threadpool_listener *threadpool_listener; - int active_threads; - int idle_threads; - int zombie_threads; -} static void *threadpool_tps_listener_alloc(struct ast_taskprocessor_listener *listener) { - RAII_VAR(ast_threadpool *, threadpool, - ao2_alloc(sizeof(*threadpool), threadpool_destroy), ao2_cleanup); + RAII_VAR(ast_threadpool *, pool, + ao2_alloc(sizeof(*pool), threadpool_destroy), ao2_cleanup); - return threadpool; + pool->control_tps = ast_taskprocessor_get(/* XXX ??? */, TPS_REF_DEFAULT); + if (!pool->control_tps) { + return NULL; + } + pool->active_threads = ao2_container_alloc(THREAD_BUCKETS, worker_thread_hash, worker_thread_cmp); + if (!pool->active_threads) { + return NULL; + } + pool->idle_threads = ao2_container_alloc(THREAD_BUCKETS, worker_thread_hash, worker_thread_cmp); + if (!pool->idle_threads) { + return NULL; + } + pool->zombie_threads = ao2_container_alloc(THREAD_BUCKETS, worker_thread_hash, worker_thread_cmp); + if (!pool->zombie_thread) { + return NULL; + } + + ao2_ref(pool, +1); + return pool; } static void threadpool_tps_task_pushed(struct ast_taskprocessor_listener *listener) @@ -140,8 +277,116 @@ static struct ast_taskprocessor_listener *threadpool_tps_alloc(void) return ast_taskprocessor_create_with_listener(tps_listener); } +static void grow(struct ast_threadpool *pool, int delta) +{ + int i; + for (i = 0; i < delta; ++i) { + struct worker_thread *worker = worker_thread_alloc(pool); + if (!worker) { + /* XXX Abandon */ + return; + } + ao2_link(pool->active_threads, worker); + } +} + +static int kill_threads(void *obj, void *arg, int flags) +{ + int *num_to_kill = arg; + + if ((*num_to_kill)-- > 0) { + return CMP_MATCH; + } else { + return CMP_STOP; + } +} + +static int zombify_threads(void *obj, void *arg, void *data, int flags) +{ + struct worker_thread *worker = obj; + struct ast_threadpool *pool = arg; + int *num_to_zombify = data; + + if ((*num_to_zombify)-- > 0) { + ao2_link(pool->zombie_threads, worker); + return CMP_MATCH; + } else { + return CMP_STOP; + } +} + +static void shrink(struct ast_threadpool *pool, int delta) +{ + /* + * Preference is to kill idle threads, but + * we'll move on to deactivating active threads + * if we have to + */ + int idle_threads = ao2_container_count(pool->idle_threads); + int idle_threads_to_kill = MIN(delta, idle_threads); + int active_threads_to_zombify = delta - idle_threads_to_kill; + int i = 0; + + ao2_callback(pool->idle_threads, OBJ_UNLINK | OBJ_NODATA | OBJ_MULTIPLE | OBJ_NOLOCK, + kill_threads, &idle_threads_to_kill); + + ao2_callback_data(pool->active_threads, OBJ_UNLINK | OBJ_NODATA | OBJ_MULTIPLE | OBJ_NOLOCK, + zombify_threads, pool, &active_threads_to_zombify); +} + +struct set_size_data { + struct threadpool *pool; + int size; +}; + +void set_size_data_destroy(void *obj) +{ + struct set_size_data *ssd = obj; + ao2_ref(ssd->pool, -1); +} + +static int queued_set_size(void *data) +{ + struct set_size_data *ssd = data; + struct ast_threadpool *pool = ssd->pool; + int num_threads = ssd->size; + + /* We don't count zombie threads as being "live when potentially resizing */ + int current_size = ao2_container_count(pool->active_threads) + + ao2_container_count(pool->idle_threads); + + if (current_size = num_threads) { + return 0; + } + + if (current_size < num_threads) { + grow(pool, num_threads - current_size); + } else { + shrink(pool, current_size - num_threads); + } + + threadpool_send_state_changed(pool); + ao2_ref(set_size_data, -1); +} + void ast_threadpool_set_size(struct ast_threadpool *pool, int size) { + struct set_size_data *ssd; + if (size < 0) { + ast_log(LOG_WARNING, "Invalid threadpool size used for resizing: %d\n", size); + return; + } + + ssd = ao2_alloc(sizeof(*ssd), set_size_data_destroy); + if (!ssd) { + /* XXX Crap */ + return; + } + + ssd->pool = ao2_ref(pool); + ssd->size = size; + + ast_taskprocessor_push(pool->control_tps, queued_set_size, ssd); } struct ast_threadpool *ast_threadpool_create(struct ast_threadpool_listener *listener, int initial_size)