diff --git a/include/asterisk/res_pjsip.h b/include/asterisk/res_pjsip.h index f159a572e2..95b8f23eb9 100644 --- a/include/asterisk/res_pjsip.h +++ b/include/asterisk/res_pjsip.h @@ -2104,4 +2104,10 @@ int ast_sip_get_host_ip(int af, pj_sockaddr *addr); */ const char *ast_sip_get_host_ip_string(int af); +/*! + * \brief Return the size of the SIP threadpool's task queue + * \since 13.7.0 + */ +long ast_sip_threadpool_queue_size(void); + #endif /* _RES_PJSIP_H */ diff --git a/include/asterisk/taskprocessor.h b/include/asterisk/taskprocessor.h index f16f144cb5..06368867a8 100644 --- a/include/asterisk/taskprocessor.h +++ b/include/asterisk/taskprocessor.h @@ -262,4 +262,10 @@ int ast_taskprocessor_is_task(struct ast_taskprocessor *tps); */ const char *ast_taskprocessor_name(struct ast_taskprocessor *tps); +/*! + * \brief Return the current size of the taskprocessor queue + * \since 13.7.0 + */ +long ast_taskprocessor_size(struct ast_taskprocessor *tps); + #endif /* __AST_TASKPROCESSOR_H__ */ diff --git a/include/asterisk/threadpool.h b/include/asterisk/threadpool.h index 75ce0e4e4e..0f360c7a4a 100644 --- a/include/asterisk/threadpool.h +++ b/include/asterisk/threadpool.h @@ -292,4 +292,10 @@ struct ast_taskprocessor *ast_threadpool_serializer(const char *name, struct ast struct ast_taskprocessor *ast_threadpool_serializer_group(const char *name, struct ast_threadpool *pool, struct ast_serializer_shutdown_group *shutdown_group); +/*! + * \brief Return the size of the threadpool's task queue + * \since 13.7.0 + */ +long ast_threadpool_queue_size(struct ast_threadpool *pool); + #endif /* ASTERISK_THREADPOOL_H */ diff --git a/main/taskprocessor.c b/main/taskprocessor.c index 0719deec2c..91125ad2af 100644 --- a/main/taskprocessor.c +++ b/main/taskprocessor.c @@ -130,9 +130,6 @@ static int tps_ping_handler(void *datap); /*! \brief Remove the front task off the taskprocessor queue */ static struct tps_task *tps_taskprocessor_pop(struct ast_taskprocessor *tps); -/*! \brief Return the size of the taskprocessor queue */ -static int tps_taskprocessor_depth(struct ast_taskprocessor *tps); - static char *cli_tps_ping(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a); static char *cli_tps_report(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a); @@ -508,7 +505,7 @@ static struct tps_task *tps_taskprocessor_pop(struct ast_taskprocessor *tps) return task; } -static int tps_taskprocessor_depth(struct ast_taskprocessor *tps) +long ast_taskprocessor_size(struct ast_taskprocessor *tps) { return (tps) ? tps->tps_queue_size : -1; } @@ -765,7 +762,7 @@ int ast_taskprocessor_execute(struct ast_taskprocessor *tps) { struct ast_taskprocessor_local local; struct tps_task *t; - int size; + long size; ao2_lock(tps); t = tps_taskprocessor_pop(tps); @@ -797,7 +794,7 @@ int ast_taskprocessor_execute(struct ast_taskprocessor *tps) * after we pop an empty stack. */ tps->executing = 0; - size = tps_taskprocessor_depth(tps); + size = ast_taskprocessor_size(tps); /* If we executed a task, bump the stats */ if (tps->stats) { tps->stats->_tasks_processed_count++; diff --git a/main/threadpool.c b/main/threadpool.c index 46de9b7f80..60e1e9a3b4 100644 --- a/main/threadpool.c +++ b/main/threadpool.c @@ -1397,3 +1397,8 @@ struct ast_taskprocessor *ast_threadpool_serializer(const char *name, struct ast { return ast_threadpool_serializer_group(name, pool, NULL); } + +long ast_threadpool_queue_size(struct ast_threadpool *pool) +{ + return ast_taskprocessor_size(pool->tps); +} diff --git a/res/res_pjsip.c b/res/res_pjsip.c index cdaed4ee70..9d0540d420 100644 --- a/res/res_pjsip.c +++ b/res/res_pjsip.c @@ -3809,6 +3809,11 @@ static void remove_request_headers(pjsip_endpoint *endpt) } } +long ast_sip_threadpool_queue_size(void) +{ + return ast_threadpool_queue_size(sip_threadpool); +} + AST_TEST_DEFINE(xml_sanitization_end_null) { char sanitized[8]; diff --git a/res/res_pjsip/pjsip_distributor.c b/res/res_pjsip/pjsip_distributor.c index 9b052603a9..1d39e0fd28 100644 --- a/res/res_pjsip/pjsip_distributor.c +++ b/res/res_pjsip/pjsip_distributor.c @@ -246,6 +246,8 @@ static pjsip_module endpoint_mod = { .on_rx_request = endpoint_lookup, }; +#define SIP_MAX_QUEUE 500L + static pj_bool_t distributor(pjsip_rx_data *rdata) { pjsip_dialog *dlg = find_dialog(rdata); @@ -280,7 +282,17 @@ static pj_bool_t distributor(pjsip_rx_data *rdata) clone->endpt_info.mod_data[endpoint_mod.id] = ao2_bump(dist->endpoint); } - ast_sip_push_task(serializer, distribute, clone); + if (ast_sip_threadpool_queue_size() > SIP_MAX_QUEUE) { + /* When the threadpool is backed up this much, there is a good chance that we have encountered + * some sort of terrible condition and don't need to be adding more work to the threadpool. + * It's in our best interest to send back a 503 response and be done with it. + */ + pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 503, NULL, NULL, NULL); + ao2_cleanup(clone->endpt_info.mod_data[endpoint_mod.id]); + pjsip_rx_data_free_cloned(clone); + } else { + ast_sip_push_task(serializer, distribute, clone); + } end: if (dlg) {