diff --git a/include/asterisk/threadpool.h b/include/asterisk/threadpool.h index 98ee8cf471..6bb0dcc4f3 100644 --- a/include/asterisk/threadpool.h +++ b/include/asterisk/threadpool.h @@ -25,6 +25,15 @@ struct ast_taskprocessor; struct ast_threadpool_listener; struct ast_threadpool_listener_callbacks { + /*! + * \brief Allocate the listener's private data + * + * It is not necessary to assign the private data to the listener. + * \param listener The listener the private data will belong to + * \retval NULL Failure to allocate private data + * \retval non-NULL The newly allocated private data + */ + void *(*alloc)(struct ast_threadpool_listener *listener); /*! * \brief Indicates that the state of threads in the pool has changed * @@ -32,23 +41,31 @@ struct ast_threadpool_listener_callbacks { * \param active_threads The number of active threads in the pool * \param idle_threads The number of idle threads in the pool */ - void (*state_changed)(struct ast_threadpool_listener *listener, + void (*state_changed)(struct ast_threadpool *pool, + struct ast_threadpool_listener *listener, int active_threads, int idle_threads); /*! - * \brief Indicates that a task was pushed to the threadpool's taskprocessor + * \brief Indicates that a task was pushed to the threadpool * * \param listener The threadpool listener - * \param was_empty Indicates whether the taskprocessor was empty prior to adding the task + * \param was_empty Indicates whether there were any tasks prior to adding the new one. */ - void (*tps_task_pushed)(struct ast_threadpool_listener *listener, + void (*task_pushed)(struct ast_threadpool *pool, + struct ast_threadpool_listener *listener, int was_empty); /*! * \brief Indicates the threadpoo's taskprocessor has become empty * * \param listener The threadpool's listener */ - void (*emptied)(struct ast_threadpool_listener *listener); + void (*emptied)(struct ast_threadpool *pool, struct ast_threadpool_listener *listener); + + /*! + * \brief Free the listener's private data + * \param private_data The private data to destroy + */ + void (*destroy)(void *private_data); }; /*! @@ -60,13 +77,24 @@ struct ast_threadpool_listener_callbacks { */ struct ast_threadpool_listener { /*! Callbacks called by the threadpool */ - struct ast_threadpool_listener_callbacks *callbacks; - /*! Handle to the threadpool */ - struct ast_threadpool *threadpool; + const struct ast_threadpool_listener_callbacks *callbacks; /*! User data for the listener */ void *private_data; }; +/*! + * \brief Allocate a threadpool listener + * + * This function will call back into the alloc callback for the + * listener. + * + * \param callbacks Listener callbacks to assign to the listener + * \retval NULL Failed to allocate the listener + * \retval non-NULL The newly-created threadpool listener + */ +struct ast_threadpool_listener *ast_threadpool_listener_alloc( + const struct ast_threadpool_listener_callbacks *callbacks); + /*! * \brief Create a new threadpool * diff --git a/main/threadpool.c b/main/threadpool.c index 1f1812a648..8662c3a393 100644 --- a/main/threadpool.c +++ b/main/threadpool.c @@ -123,7 +123,7 @@ static void threadpool_send_state_changed(struct ast_threadpool *pool) int active_size = ao2_container_count(pool->active_threads); int idle_size = ao2_container_count(pool->idle_threads); - pool->listener->callbacks->state_changed(pool->listener, active_size, idle_size); + pool->listener->callbacks->state_changed(pool, pool->listener, active_size, idle_size); } /*! @@ -232,11 +232,6 @@ static int threadpool_execute(struct ast_threadpool *pool) static void threadpool_destructor(void *obj) { struct ast_threadpool *pool = obj; - /* XXX Probably should let the listener know we're being destroyed? */ - - /* Threads should all be shut down by now, so this should be a painless - * operation - */ ao2_cleanup(pool->listener); } @@ -342,7 +337,7 @@ static int handle_task_pushed(void *data) struct ast_threadpool *pool = tpd->pool; int was_empty = tpd->was_empty; - pool->listener->callbacks->tps_task_pushed(pool->listener, was_empty); + pool->listener->callbacks->task_pushed(pool, pool->listener, was_empty); ao2_callback(pool->idle_threads, OBJ_UNLINK | OBJ_NOLOCK | OBJ_NODATA | OBJ_MULTIPLE, activate_threads, pool); ao2_ref(tpd, -1); @@ -382,7 +377,7 @@ static int handle_emptied(void *data) { struct ast_threadpool *pool = data; - pool->listener->callbacks->emptied(pool->listener); + pool->listener->callbacks->emptied(pool, pool->listener); return 0; } @@ -587,6 +582,29 @@ void ast_threadpool_set_size(struct ast_threadpool *pool, unsigned int size) ast_taskprocessor_push(pool->control_tps, queued_set_size, ssd); } +static void listener_destructor(void *obj) +{ + struct ast_threadpool_listener *listener = obj; + + listener->callbacks->destroy(listener->private_data); +} + +struct ast_threadpool_listener *ast_threadpool_listener_alloc( + const struct ast_threadpool_listener_callbacks *callbacks) +{ + struct ast_threadpool_listener *listener = ao2_alloc(sizeof(*listener), listener_destructor); + if (!listener) { + return NULL; + } + listener->callbacks = callbacks; + listener->private_data = listener->callbacks->alloc(listener); + if (!listener->private_data) { + ao2_ref(listener, -1); + return NULL; + } + return listener; +} + struct ast_threadpool *ast_threadpool_create(struct ast_threadpool_listener *listener, int initial_size) { struct ast_threadpool *pool;