taskprocessor: Warn on unused result from pushing task.
Add attribute_warn_unused_result to ast_taskprocessor_push, ast_taskprocessor_push_local and ast_threadpool_push. This will help ensure we perform the necessary cleanup upon failure. Change-Id: I7e4079bd7b21cfe52fb431ea79e41314520c3f6d
This commit is contained in:
parent
e349bf40ca
commit
5ab94d2a3e
|
@ -1111,13 +1111,15 @@ static void destroy_conference_bridge(void *obj)
|
|||
if (conference->playback_queue) {
|
||||
struct hangup_data hangup;
|
||||
hangup_data_init(&hangup, conference);
|
||||
ast_taskprocessor_push(conference->playback_queue, hangup_playback, &hangup);
|
||||
|
||||
ast_mutex_lock(&hangup.lock);
|
||||
while (!hangup.hungup) {
|
||||
ast_cond_wait(&hangup.cond, &hangup.lock);
|
||||
if (!ast_taskprocessor_push(conference->playback_queue, hangup_playback, &hangup)) {
|
||||
ast_mutex_lock(&hangup.lock);
|
||||
while (!hangup.hungup) {
|
||||
ast_cond_wait(&hangup.cond, &hangup.lock);
|
||||
}
|
||||
ast_mutex_unlock(&hangup.lock);
|
||||
}
|
||||
ast_mutex_unlock(&hangup.lock);
|
||||
|
||||
hangup_data_destroy(&hangup);
|
||||
} else {
|
||||
/* Playback queue is not yet allocated. Just hang up the channel straight */
|
||||
|
|
|
@ -213,7 +213,8 @@ void *ast_taskprocessor_unreference(struct ast_taskprocessor *tps);
|
|||
* \retval -1 failure
|
||||
* \since 1.6.1
|
||||
*/
|
||||
int ast_taskprocessor_push(struct ast_taskprocessor *tps, int (*task_exe)(void *datap), void *datap);
|
||||
int ast_taskprocessor_push(struct ast_taskprocessor *tps, int (*task_exe)(void *datap), void *datap)
|
||||
attribute_warn_unused_result;
|
||||
|
||||
/*! \brief Local data parameter */
|
||||
struct ast_taskprocessor_local {
|
||||
|
@ -239,7 +240,8 @@ struct ast_taskprocessor_local {
|
|||
* \since 12.0.0
|
||||
*/
|
||||
int ast_taskprocessor_push_local(struct ast_taskprocessor *tps,
|
||||
int (*task_exe)(struct ast_taskprocessor_local *local), void *datap);
|
||||
int (*task_exe)(struct ast_taskprocessor_local *local), void *datap)
|
||||
attribute_warn_unused_result;
|
||||
|
||||
/*!
|
||||
* \brief Indicate the taskprocessor is suspended.
|
||||
|
|
|
@ -186,7 +186,8 @@ void ast_threadpool_set_size(struct ast_threadpool *threadpool, unsigned int siz
|
|||
* \retval 0 success
|
||||
* \retval -1 failure
|
||||
*/
|
||||
int ast_threadpool_push(struct ast_threadpool *pool, int (*task)(void *data), void *data);
|
||||
int ast_threadpool_push(struct ast_threadpool *pool, int (*task)(void *data), void *data)
|
||||
attribute_warn_unused_result;
|
||||
|
||||
/*!
|
||||
* \brief Shut down a threadpool and destroy it
|
||||
|
|
|
@ -561,7 +561,10 @@ struct stasis_subscription *stasis_unsubscribe(struct stasis_subscription *sub)
|
|||
|
||||
/* When all that's done, remove the ref the mailbox has on the sub */
|
||||
if (sub->mailbox) {
|
||||
ast_taskprocessor_push(sub->mailbox, sub_cleanup, sub);
|
||||
if (ast_taskprocessor_push(sub->mailbox, sub_cleanup, sub)) {
|
||||
/* Nothing we can do here, the conditional is just to keep
|
||||
* the compiler happy that we're not ignoring the result. */
|
||||
}
|
||||
}
|
||||
|
||||
/* Unsubscribing unrefs the subscription */
|
||||
|
|
|
@ -235,7 +235,11 @@ static void default_listener_shutdown(struct ast_taskprocessor_listener *listene
|
|||
/* Hold a reference during shutdown */
|
||||
ao2_t_ref(listener->tps, +1, "tps-shutdown");
|
||||
|
||||
ast_taskprocessor_push(listener->tps, default_listener_die, pvt);
|
||||
if (ast_taskprocessor_push(listener->tps, default_listener_die, pvt)) {
|
||||
/* This will cause the thread to exit early without completing tasks already
|
||||
* in the queue. This is probably the least bad option in this situation. */
|
||||
default_listener_die(pvt);
|
||||
}
|
||||
|
||||
ast_assert(pvt->poll_thread != AST_PTHREADT_NULL);
|
||||
|
||||
|
|
|
@ -658,7 +658,9 @@ static void threadpool_tps_emptied(struct ast_taskprocessor_listener *listener)
|
|||
}
|
||||
|
||||
if (pool->listener && pool->listener->callbacks->emptied) {
|
||||
ast_taskprocessor_push(pool->control_tps, queued_emptied, pool);
|
||||
if (ast_taskprocessor_push(pool->control_tps, queued_emptied, pool)) {
|
||||
/* Nothing to do here but we need the check to keep the compiler happy. */
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -151,7 +151,10 @@ AST_TEST_DEFINE(default_taskprocessor)
|
|||
return AST_TEST_FAIL;
|
||||
}
|
||||
|
||||
ast_taskprocessor_push(tps, task, task_data);
|
||||
if (ast_taskprocessor_push(tps, task, task_data)) {
|
||||
ast_test_status_update(test, "Failed to queue task\n");
|
||||
return AST_TEST_FAIL;
|
||||
}
|
||||
|
||||
res = task_wait(task_data);
|
||||
if (res != 0) {
|
||||
|
@ -240,7 +243,11 @@ AST_TEST_DEFINE(default_taskprocessor_load)
|
|||
|
||||
for (i = 0; i < NUM_TASKS; ++i) {
|
||||
rand_data[i] = ast_random();
|
||||
ast_taskprocessor_push(tps, load_task, &rand_data[i]);
|
||||
if (ast_taskprocessor_push(tps, load_task, &rand_data[i])) {
|
||||
ast_test_status_update(test, "Failed to queue task\n");
|
||||
res = AST_TEST_FAIL;
|
||||
goto test_end;
|
||||
}
|
||||
}
|
||||
|
||||
ast_mutex_lock(&load_task_results.lock);
|
||||
|
@ -438,14 +445,22 @@ AST_TEST_DEFINE(taskprocessor_listener)
|
|||
goto test_exit;
|
||||
}
|
||||
|
||||
ast_taskprocessor_push(tps, listener_test_task, NULL);
|
||||
if (ast_taskprocessor_push(tps, listener_test_task, NULL)) {
|
||||
ast_test_status_update(test, "Failed to queue task\n");
|
||||
res = AST_TEST_FAIL;
|
||||
goto test_exit;
|
||||
}
|
||||
|
||||
if (check_stats(test, pvt, 1, 0, 1) < 0) {
|
||||
res = AST_TEST_FAIL;
|
||||
goto test_exit;
|
||||
}
|
||||
|
||||
ast_taskprocessor_push(tps, listener_test_task, NULL);
|
||||
if (ast_taskprocessor_push(tps, listener_test_task, NULL)) {
|
||||
ast_test_status_update(test, "Failed to queue task\n");
|
||||
res = AST_TEST_FAIL;
|
||||
goto test_exit;
|
||||
}
|
||||
|
||||
if (check_stats(test, pvt, 2, 0, 1) < 0) {
|
||||
res = AST_TEST_FAIL;
|
||||
|
@ -710,7 +725,10 @@ AST_TEST_DEFINE(taskprocessor_push_local)
|
|||
local_data = 0;
|
||||
ast_taskprocessor_set_local(tps, &local_data);
|
||||
|
||||
ast_taskprocessor_push_local(tps, local_task_exe, task_data);
|
||||
if (ast_taskprocessor_push_local(tps, local_task_exe, task_data)) {
|
||||
ast_test_status_update(test, "Failed to queue task\n");
|
||||
return AST_TEST_FAIL;
|
||||
}
|
||||
|
||||
res = task_wait(task_data);
|
||||
if (res != 0) {
|
||||
|
|
|
@ -127,6 +127,18 @@ static struct simple_task_data *simple_task_data_alloc(void)
|
|||
return std;
|
||||
}
|
||||
|
||||
static void simple_task_data_free(struct simple_task_data *std)
|
||||
{
|
||||
if (!std) {
|
||||
return;
|
||||
}
|
||||
|
||||
ast_mutex_destroy(&std->lock);
|
||||
ast_cond_destroy(&std->cond);
|
||||
|
||||
ast_free(std);
|
||||
}
|
||||
|
||||
static int simple_task(void *data)
|
||||
{
|
||||
struct simple_task_data *std = data;
|
||||
|
@ -319,7 +331,9 @@ AST_TEST_DEFINE(threadpool_push)
|
|||
goto end;
|
||||
}
|
||||
|
||||
ast_threadpool_push(pool, simple_task, std);
|
||||
if (ast_threadpool_push(pool, simple_task, std)) {
|
||||
goto end;
|
||||
}
|
||||
|
||||
wait_for_task_pushed(listener);
|
||||
|
||||
|
@ -328,7 +342,7 @@ AST_TEST_DEFINE(threadpool_push)
|
|||
end:
|
||||
ast_threadpool_shutdown(pool);
|
||||
ao2_cleanup(listener);
|
||||
ast_free(std);
|
||||
simple_task_data_free(std);
|
||||
ast_free(tld);
|
||||
return res;
|
||||
}
|
||||
|
@ -635,11 +649,13 @@ AST_TEST_DEFINE(threadpool_thread_timeout_thrash)
|
|||
}
|
||||
ast_mutex_unlock(&tld->lock);
|
||||
|
||||
ast_threadpool_push(pool, simple_task, std);
|
||||
if (ast_threadpool_push(pool, simple_task, std)) {
|
||||
res = AST_TEST_FAIL;
|
||||
} else {
|
||||
res = wait_for_completion(test, std);
|
||||
}
|
||||
|
||||
res = wait_for_completion(test, std);
|
||||
|
||||
ast_free(std);
|
||||
simple_task_data_free(std);
|
||||
|
||||
if (res == AST_TEST_FAIL) {
|
||||
goto end;
|
||||
|
@ -707,7 +723,9 @@ AST_TEST_DEFINE(threadpool_one_task_one_thread)
|
|||
goto end;
|
||||
}
|
||||
|
||||
ast_threadpool_push(pool, simple_task, std);
|
||||
if (ast_threadpool_push(pool, simple_task, std)) {
|
||||
goto end;
|
||||
}
|
||||
|
||||
ast_threadpool_set_size(pool, 1);
|
||||
|
||||
|
@ -736,7 +754,7 @@ AST_TEST_DEFINE(threadpool_one_task_one_thread)
|
|||
end:
|
||||
ast_threadpool_shutdown(pool);
|
||||
ao2_cleanup(listener);
|
||||
ast_free(std);
|
||||
simple_task_data_free(std);
|
||||
ast_free(tld);
|
||||
return res;
|
||||
|
||||
|
@ -796,7 +814,10 @@ AST_TEST_DEFINE(threadpool_one_thread_one_task)
|
|||
goto end;
|
||||
}
|
||||
|
||||
ast_threadpool_push(pool, simple_task, std);
|
||||
if (ast_threadpool_push(pool, simple_task, std)) {
|
||||
res = AST_TEST_FAIL;
|
||||
goto end;
|
||||
}
|
||||
|
||||
res = wait_for_completion(test, std);
|
||||
if (res == AST_TEST_FAIL) {
|
||||
|
@ -819,7 +840,7 @@ AST_TEST_DEFINE(threadpool_one_thread_one_task)
|
|||
end:
|
||||
ast_threadpool_shutdown(pool);
|
||||
ao2_cleanup(listener);
|
||||
ast_free(std);
|
||||
simple_task_data_free(std);
|
||||
ast_free(tld);
|
||||
return res;
|
||||
}
|
||||
|
@ -882,9 +903,18 @@ AST_TEST_DEFINE(threadpool_one_thread_multiple_tasks)
|
|||
goto end;
|
||||
}
|
||||
|
||||
ast_threadpool_push(pool, simple_task, std1);
|
||||
ast_threadpool_push(pool, simple_task, std2);
|
||||
ast_threadpool_push(pool, simple_task, std3);
|
||||
res = AST_TEST_FAIL;
|
||||
if (ast_threadpool_push(pool, simple_task, std1)) {
|
||||
goto end;
|
||||
}
|
||||
|
||||
if (ast_threadpool_push(pool, simple_task, std2)) {
|
||||
goto end;
|
||||
}
|
||||
|
||||
if (ast_threadpool_push(pool, simple_task, std3)) {
|
||||
goto end;
|
||||
}
|
||||
|
||||
res = wait_for_completion(test, std1);
|
||||
if (res == AST_TEST_FAIL) {
|
||||
|
@ -914,9 +944,9 @@ AST_TEST_DEFINE(threadpool_one_thread_multiple_tasks)
|
|||
end:
|
||||
ast_threadpool_shutdown(pool);
|
||||
ao2_cleanup(listener);
|
||||
ast_free(std1);
|
||||
ast_free(std2);
|
||||
ast_free(std3);
|
||||
simple_task_data_free(std1);
|
||||
simple_task_data_free(std2);
|
||||
simple_task_data_free(std3);
|
||||
ast_free(tld);
|
||||
return res;
|
||||
}
|
||||
|
@ -1011,7 +1041,9 @@ AST_TEST_DEFINE(threadpool_auto_increment)
|
|||
goto end;
|
||||
}
|
||||
|
||||
ast_threadpool_push(pool, simple_task, std1);
|
||||
if (ast_threadpool_push(pool, simple_task, std1)) {
|
||||
goto end;
|
||||
}
|
||||
|
||||
/* Pushing the task should result in the threadpool growing
|
||||
* by three threads. This will allow the task to actually execute
|
||||
|
@ -1034,9 +1066,19 @@ AST_TEST_DEFINE(threadpool_auto_increment)
|
|||
/* Now push three tasks into the pool and ensure the pool does not
|
||||
* grow.
|
||||
*/
|
||||
ast_threadpool_push(pool, simple_task, std2);
|
||||
ast_threadpool_push(pool, simple_task, std3);
|
||||
ast_threadpool_push(pool, simple_task, std4);
|
||||
res = AST_TEST_FAIL;
|
||||
|
||||
if (ast_threadpool_push(pool, simple_task, std2)) {
|
||||
goto end;
|
||||
}
|
||||
|
||||
if (ast_threadpool_push(pool, simple_task, std3)) {
|
||||
goto end;
|
||||
}
|
||||
|
||||
if (ast_threadpool_push(pool, simple_task, std4)) {
|
||||
goto end;
|
||||
}
|
||||
|
||||
res = wait_for_completion(test, std2);
|
||||
if (res == AST_TEST_FAIL) {
|
||||
|
@ -1064,10 +1106,10 @@ AST_TEST_DEFINE(threadpool_auto_increment)
|
|||
end:
|
||||
ast_threadpool_shutdown(pool);
|
||||
ao2_cleanup(listener);
|
||||
ast_free(std1);
|
||||
ast_free(std2);
|
||||
ast_free(std3);
|
||||
ast_free(std4);
|
||||
simple_task_data_free(std1);
|
||||
simple_task_data_free(std2);
|
||||
simple_task_data_free(std3);
|
||||
simple_task_data_free(std4);
|
||||
ast_free(tld);
|
||||
return res;
|
||||
}
|
||||
|
@ -1121,7 +1163,9 @@ AST_TEST_DEFINE(threadpool_max_size)
|
|||
goto end;
|
||||
}
|
||||
|
||||
ast_threadpool_push(pool, simple_task, std);
|
||||
if (ast_threadpool_push(pool, simple_task, std)) {
|
||||
goto end;
|
||||
}
|
||||
|
||||
res = wait_for_completion(test, std);
|
||||
if (res == AST_TEST_FAIL) {
|
||||
|
@ -1137,7 +1181,7 @@ AST_TEST_DEFINE(threadpool_max_size)
|
|||
end:
|
||||
ast_threadpool_shutdown(pool);
|
||||
ao2_cleanup(listener);
|
||||
ast_free(std);
|
||||
simple_task_data_free(std);
|
||||
ast_free(tld);
|
||||
return res;
|
||||
}
|
||||
|
@ -1193,7 +1237,9 @@ AST_TEST_DEFINE(threadpool_reactivation)
|
|||
goto end;
|
||||
}
|
||||
|
||||
ast_threadpool_push(pool, simple_task, std1);
|
||||
if (ast_threadpool_push(pool, simple_task, std1)) {
|
||||
goto end;
|
||||
}
|
||||
|
||||
ast_threadpool_set_size(pool, 1);
|
||||
|
||||
|
@ -1218,7 +1264,10 @@ AST_TEST_DEFINE(threadpool_reactivation)
|
|||
}
|
||||
|
||||
/* Now make sure the threadpool reactivates when we add a second task */
|
||||
ast_threadpool_push(pool, simple_task, std2);
|
||||
if (ast_threadpool_push(pool, simple_task, std2)) {
|
||||
res = AST_TEST_FAIL;
|
||||
goto end;
|
||||
}
|
||||
|
||||
res = wait_for_completion(test, std2);
|
||||
if (res == AST_TEST_FAIL) {
|
||||
|
@ -1240,8 +1289,8 @@ AST_TEST_DEFINE(threadpool_reactivation)
|
|||
end:
|
||||
ast_threadpool_shutdown(pool);
|
||||
ao2_cleanup(listener);
|
||||
ast_free(std1);
|
||||
ast_free(std2);
|
||||
simple_task_data_free(std1);
|
||||
simple_task_data_free(std2);
|
||||
ast_free(tld);
|
||||
return res;
|
||||
|
||||
|
@ -1269,6 +1318,19 @@ static struct complex_task_data *complex_task_data_alloc(void)
|
|||
return ctd;
|
||||
}
|
||||
|
||||
static void complex_task_data_free(struct complex_task_data *ctd)
|
||||
{
|
||||
if (!ctd) {
|
||||
return;
|
||||
}
|
||||
|
||||
ast_mutex_destroy(&ctd->lock);
|
||||
ast_cond_destroy(&ctd->stall_cond);
|
||||
ast_cond_destroy(&ctd->notify_cond);
|
||||
|
||||
ast_free(ctd);
|
||||
}
|
||||
|
||||
static int complex_task(void *data)
|
||||
{
|
||||
struct complex_task_data *ctd = data;
|
||||
|
@ -1400,8 +1462,13 @@ AST_TEST_DEFINE(threadpool_task_distribution)
|
|||
goto end;
|
||||
}
|
||||
|
||||
ast_threadpool_push(pool, complex_task, ctd1);
|
||||
ast_threadpool_push(pool, complex_task, ctd2);
|
||||
if (ast_threadpool_push(pool, complex_task, ctd1)) {
|
||||
goto end;
|
||||
}
|
||||
|
||||
if (ast_threadpool_push(pool, complex_task, ctd2)) {
|
||||
goto end;
|
||||
}
|
||||
|
||||
ast_threadpool_set_size(pool, 2);
|
||||
|
||||
|
@ -1438,8 +1505,8 @@ AST_TEST_DEFINE(threadpool_task_distribution)
|
|||
end:
|
||||
ast_threadpool_shutdown(pool);
|
||||
ao2_cleanup(listener);
|
||||
ast_free(ctd1);
|
||||
ast_free(ctd2);
|
||||
complex_task_data_free(ctd1);
|
||||
complex_task_data_free(ctd2);
|
||||
ast_free(tld);
|
||||
return res;
|
||||
}
|
||||
|
@ -1496,8 +1563,13 @@ AST_TEST_DEFINE(threadpool_more_destruction)
|
|||
goto end;
|
||||
}
|
||||
|
||||
ast_threadpool_push(pool, complex_task, ctd1);
|
||||
ast_threadpool_push(pool, complex_task, ctd2);
|
||||
if (ast_threadpool_push(pool, complex_task, ctd1)) {
|
||||
goto end;
|
||||
}
|
||||
|
||||
if (ast_threadpool_push(pool, complex_task, ctd2)) {
|
||||
goto end;
|
||||
}
|
||||
|
||||
ast_threadpool_set_size(pool, 4);
|
||||
|
||||
|
@ -1549,8 +1621,8 @@ AST_TEST_DEFINE(threadpool_more_destruction)
|
|||
end:
|
||||
ast_threadpool_shutdown(pool);
|
||||
ao2_cleanup(listener);
|
||||
ast_free(ctd1);
|
||||
ast_free(ctd2);
|
||||
complex_task_data_free(ctd1);
|
||||
complex_task_data_free(ctd2);
|
||||
ast_free(tld);
|
||||
return res;
|
||||
}
|
||||
|
@ -1666,9 +1738,9 @@ end:
|
|||
poke_worker(data3);
|
||||
ast_taskprocessor_unreference(uut);
|
||||
ast_threadpool_shutdown(pool);
|
||||
ast_free(data1);
|
||||
ast_free(data2);
|
||||
ast_free(data3);
|
||||
complex_task_data_free(data1);
|
||||
complex_task_data_free(data2);
|
||||
complex_task_data_free(data3);
|
||||
return res;
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue