Improve performance of the code handling the frame queue in chan_iax2.
In my tests that exercised full frame handling in chan_iax2, the version with these changes took only 30% to 40% of the CPU time consumed by the same test on Asterisk trunk before these modifications. While doing some profiling for <http://reviewboard.digium.com/r/205/>, one function that caught my eye was network_thread() in chan_iax2.c. After the things that I was working on there, it was the next target for analysis and optimization. I used oprofile's source annotation functionality and found that the loop traversing the frame queue in network_thread() was to blame for the excessive CPU cycle consumption. The frame_queue in chan_iax2 previously held all frames that either were pending transmission or had been transmitted and were still pending acknowledgment. In network_thread(), the previous code would go back through the main for loop after reading a single incoming frame or after being signaled because a frame had been queued up for initial transmission. In each iteration of the loop, it traversed the entire frame queue looking for frames that needed to be transmitted. On a busy server, this could easily be quite a few entries. This patch is actually quite simple. The frame_queue has become only a list of frames pending acknowledgment. Frames that need to be transmitted are queued up to a dedicated transmit thread via the taskprocessor API. As a result, the code in network_thread() becomes much simpler, as its only job is to read incoming frames. In addition to the previously described changes, this patch includes some additional changes to the frame_queue. Instead of one big frame_queue, now there is a list per call number to further reduce wasted list traversals. The biggest impact of this change is in socket_process(). For additional details on testing and test results, see the review request. Review: http://reviewboard.digium.com/r/212/ git-svn-id: https://origsvn.digium.com/svn/asterisk/trunk@185432 65c4cc65-6c06-0410-ace0-fbb531ad65f3
This commit is contained in:
parent
b90ee93f70
commit
8dfcd7e418
|
@ -88,6 +88,7 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
|
|||
#include "asterisk/event.h"
|
||||
#include "asterisk/astobj2.h"
|
||||
#include "asterisk/timing.h"
|
||||
#include "asterisk/taskprocessor.h"
|
||||
|
||||
#include "iax2.h"
|
||||
#include "iax2-parser.h"
|
||||
|
@ -758,8 +759,12 @@ struct chan_iax2_pvt {
|
|||
* \note The contents of this list do not need to be explicitly destroyed
|
||||
* on module unload. This is because all active calls are destroyed, and
|
||||
* all frames in this queue will get destroyed as a part of that process.
|
||||
*
|
||||
* \note Contents protected by the iaxsl[] locks
|
||||
*/
|
||||
static AST_LIST_HEAD_STATIC(frame_queue, iax_frame);
|
||||
static AST_LIST_HEAD_NOLOCK(, iax_frame) frame_queue[IAX_MAX_CALLS];
|
||||
|
||||
static struct ast_taskprocessor *transmit_processor;
|
||||
|
||||
/*!
|
||||
* This module will get much higher performance when doing a lot of
|
||||
|
@ -1588,20 +1593,18 @@ static void pvt_destructor(void *obj)
|
|||
struct iax_frame *cur = NULL;
|
||||
|
||||
ast_mutex_lock(&iaxsl[pvt->callno]);
|
||||
|
||||
iax2_destroy_helper(pvt);
|
||||
ast_mutex_unlock(&iaxsl[pvt->callno]);
|
||||
|
||||
/* Already gone */
|
||||
ast_set_flag(pvt, IAX_ALREADYGONE);
|
||||
ast_set_flag(pvt, IAX_ALREADYGONE);
|
||||
|
||||
AST_LIST_LOCK(&frame_queue);
|
||||
AST_LIST_TRAVERSE(&frame_queue, cur, list) {
|
||||
AST_LIST_TRAVERSE(&frame_queue[pvt->callno], cur, list) {
|
||||
/* Cancel any pending transmissions */
|
||||
if (cur->callno == pvt->callno) {
|
||||
cur->retries = -1;
|
||||
}
|
||||
cur->retries = -1;
|
||||
}
|
||||
AST_LIST_UNLOCK(&frame_queue);
|
||||
|
||||
ast_mutex_unlock(&iaxsl[pvt->callno]);
|
||||
|
||||
if (pvt->reg) {
|
||||
pvt->reg->callno = 0;
|
||||
|
@ -2623,17 +2626,16 @@ static void __attempt_transmit(const void *data)
|
|||
f->retries = -1;
|
||||
freeme = 1;
|
||||
}
|
||||
if (callno)
|
||||
ast_mutex_unlock(&iaxsl[callno]);
|
||||
/* Do not try again */
|
||||
|
||||
if (freeme) {
|
||||
/* Don't attempt delivery, just remove it from the queue */
|
||||
AST_LIST_LOCK(&frame_queue);
|
||||
AST_LIST_REMOVE(&frame_queue, f, list);
|
||||
AST_LIST_UNLOCK(&frame_queue);
|
||||
AST_LIST_REMOVE(&frame_queue[callno], f, list);
|
||||
ast_mutex_unlock(&iaxsl[callno]);
|
||||
f->retrans = -1;
|
||||
/* Free the IAX frame */
|
||||
iax2_frame_free(f);
|
||||
} else if (callno) {
|
||||
ast_mutex_unlock(&iaxsl[callno]);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -2923,7 +2925,7 @@ static char *complete_iax2_peers(const char *line, const char *word, int pos, in
|
|||
static char *handle_cli_iax2_show_stats(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
|
||||
{
|
||||
struct iax_frame *cur;
|
||||
int cnt = 0, dead = 0, final = 0;
|
||||
int cnt = 0, dead = 0, final = 0, i = 0;
|
||||
|
||||
switch (cmd) {
|
||||
case CLI_INIT:
|
||||
|
@ -2939,15 +2941,17 @@ static char *handle_cli_iax2_show_stats(struct ast_cli_entry *e, int cmd, struct
|
|||
if (a->argc != 3)
|
||||
return CLI_SHOWUSAGE;
|
||||
|
||||
AST_LIST_LOCK(&frame_queue);
|
||||
AST_LIST_TRAVERSE(&frame_queue, cur, list) {
|
||||
if (cur->retries < 0)
|
||||
dead++;
|
||||
if (cur->final)
|
||||
final++;
|
||||
cnt++;
|
||||
for (i = 0; i < ARRAY_LEN(frame_queue); i++) {
|
||||
ast_mutex_lock(&iaxsl[i]);
|
||||
AST_LIST_TRAVERSE(&frame_queue[i], cur, list) {
|
||||
if (cur->retries < 0)
|
||||
dead++;
|
||||
if (cur->final)
|
||||
final++;
|
||||
cnt++;
|
||||
}
|
||||
ast_mutex_unlock(&iaxsl[i]);
|
||||
}
|
||||
AST_LIST_UNLOCK(&frame_queue);
|
||||
|
||||
ast_cli(a->fd, " IAX Statistics\n");
|
||||
ast_cli(a->fd, "---------------------\n");
|
||||
|
@ -3304,23 +3308,39 @@ static int schedule_delivery(struct iax_frame *fr, int updatehistory, int fromtr
|
|||
return 0;
|
||||
}
|
||||
|
||||
static int iax2_transmit(struct iax_frame *fr)
|
||||
static int transmit_frame(void *data)
|
||||
{
|
||||
/* Lock the queue and place this packet at the end */
|
||||
/* By setting this to 0, the network thread will send it for us, and
|
||||
queue retransmission if necessary */
|
||||
fr->sentyet = 0;
|
||||
AST_LIST_LOCK(&frame_queue);
|
||||
AST_LIST_INSERT_TAIL(&frame_queue, fr, list);
|
||||
AST_LIST_UNLOCK(&frame_queue);
|
||||
/* Wake up the network and scheduler thread */
|
||||
if (netthreadid != AST_PTHREADT_NULL)
|
||||
pthread_kill(netthreadid, SIGURG);
|
||||
ast_sched_thread_poke(sched);
|
||||
struct iax_frame *fr = data;
|
||||
|
||||
ast_mutex_lock(&iaxsl[fr->callno]);
|
||||
|
||||
fr->sentyet = 1;
|
||||
|
||||
if (iaxs[fr->callno]) {
|
||||
send_packet(fr);
|
||||
}
|
||||
|
||||
if (fr->retries < 0) {
|
||||
ast_mutex_unlock(&iaxsl[fr->callno]);
|
||||
/* No retransmit requested */
|
||||
iax_frame_free(fr);
|
||||
} else {
|
||||
/* We need reliable delivery. Schedule a retransmission */
|
||||
AST_LIST_INSERT_TAIL(&frame_queue[fr->callno], fr, list);
|
||||
ast_mutex_unlock(&iaxsl[fr->callno]);
|
||||
fr->retries++;
|
||||
fr->retrans = iax2_sched_add(sched, fr->retrytime, attempt_transmit, fr);
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int iax2_transmit(struct iax_frame *fr)
|
||||
{
|
||||
fr->sentyet = 0;
|
||||
|
||||
return ast_taskprocessor_push(transmit_processor, transmit_frame, fr);
|
||||
}
|
||||
|
||||
static int iax2_digit_begin(struct ast_channel *c, char digit)
|
||||
{
|
||||
|
@ -7015,16 +7035,13 @@ static int complete_transfer(int callno, struct iax_ies *ies)
|
|||
pvt->lastsent = 0;
|
||||
pvt->nextpred = 0;
|
||||
pvt->pingtime = DEFAULT_RETRY_TIME;
|
||||
AST_LIST_LOCK(&frame_queue);
|
||||
AST_LIST_TRAVERSE(&frame_queue, cur, list) {
|
||||
AST_LIST_TRAVERSE(&frame_queue[callno], cur, list) {
|
||||
/* We must cancel any packets that would have been transmitted
|
||||
because now we're talking to someone new. It's okay, they
|
||||
were transmitted to someone that didn't care anyway. */
|
||||
if (callno == cur->callno)
|
||||
cur->retries = -1;
|
||||
cur->retries = -1;
|
||||
}
|
||||
AST_LIST_UNLOCK(&frame_queue);
|
||||
return 0;
|
||||
return 0;
|
||||
}
|
||||
|
||||
/*! \brief Acknowledgment received for OUR registration */
|
||||
|
@ -7627,16 +7644,13 @@ static void vnak_retransmit(int callno, int last)
|
|||
{
|
||||
struct iax_frame *f;
|
||||
|
||||
AST_LIST_LOCK(&frame_queue);
|
||||
AST_LIST_TRAVERSE(&frame_queue, f, list) {
|
||||
AST_LIST_TRAVERSE(&frame_queue[callno], f, list) {
|
||||
/* Send a copy immediately */
|
||||
if ((f->callno == callno) && iaxs[f->callno] &&
|
||||
((unsigned char ) (f->oseqno - last) < 128) &&
|
||||
(f->retries >= 0)) {
|
||||
if (((unsigned char) (f->oseqno - last) < 128) &&
|
||||
(f->retries >= 0)) {
|
||||
send_packet(f);
|
||||
}
|
||||
}
|
||||
AST_LIST_UNLOCK(&frame_queue);
|
||||
}
|
||||
|
||||
static void __iax2_poke_peer_s(const void *data)
|
||||
|
@ -8653,17 +8667,15 @@ static int socket_process(struct iax2_thread *thread)
|
|||
if (iaxdebug)
|
||||
ast_debug(1, "Cancelling transmission of packet %d\n", x);
|
||||
call_to_destroy = 0;
|
||||
AST_LIST_LOCK(&frame_queue);
|
||||
AST_LIST_TRAVERSE(&frame_queue, cur, list) {
|
||||
AST_LIST_TRAVERSE(&frame_queue[fr->callno], cur, list) {
|
||||
/* If it's our call, and our timestamp, mark -1 retries */
|
||||
if ((fr->callno == cur->callno) && (x == cur->oseqno)) {
|
||||
if (x == cur->oseqno) {
|
||||
cur->retries = -1;
|
||||
/* Destroy call if this is the end */
|
||||
if (cur->final)
|
||||
call_to_destroy = fr->callno;
|
||||
}
|
||||
}
|
||||
AST_LIST_UNLOCK(&frame_queue);
|
||||
if (call_to_destroy) {
|
||||
if (iaxdebug)
|
||||
ast_debug(1, "Really destroying %d, having been acked on final message\n", call_to_destroy);
|
||||
|
@ -8905,13 +8917,12 @@ retryowner:
|
|||
case IAX_COMMAND_TXACC:
|
||||
if (iaxs[fr->callno]->transferring == TRANSFER_BEGIN) {
|
||||
/* Ack the packet with the given timestamp */
|
||||
AST_LIST_LOCK(&frame_queue);
|
||||
AST_LIST_TRAVERSE(&frame_queue, cur, list) {
|
||||
AST_LIST_TRAVERSE(&frame_queue[fr->callno], cur, list) {
|
||||
/* Cancel any outstanding txcnt's */
|
||||
if ((fr->callno == cur->callno) && (cur->transfer))
|
||||
if (cur->transfer) {
|
||||
cur->retries = -1;
|
||||
}
|
||||
}
|
||||
AST_LIST_UNLOCK(&frame_queue);
|
||||
memset(&ied1, 0, sizeof(ied1));
|
||||
iax_ie_append_short(&ied1, IAX_IE_CALLNO, iaxs[fr->callno]->callno);
|
||||
send_command(iaxs[fr->callno], AST_FRAME_IAX, IAX_COMMAND_TXREADY, 0, ied1.buf, ied1.pos, -1);
|
||||
|
@ -9810,13 +9821,12 @@ immediatedial:
|
|||
break;
|
||||
case IAX_COMMAND_TXMEDIA:
|
||||
if (iaxs[fr->callno]->transferring == TRANSFER_READY) {
|
||||
AST_LIST_LOCK(&frame_queue);
|
||||
AST_LIST_TRAVERSE(&frame_queue, cur, list) {
|
||||
AST_LIST_TRAVERSE(&frame_queue[fr->callno], cur, list) {
|
||||
/* Cancel any outstanding frames and start anew */
|
||||
if ((fr->callno == cur->callno) && (cur->transfer))
|
||||
if (cur->transfer) {
|
||||
cur->retries = -1;
|
||||
}
|
||||
}
|
||||
AST_LIST_UNLOCK(&frame_queue);
|
||||
/* Start sending our media to the transfer address, but otherwise leave the call as-is */
|
||||
iaxs[fr->callno]->transferring = TRANSFER_MEDIAPASS;
|
||||
}
|
||||
|
@ -10495,66 +10505,18 @@ static struct ast_channel *iax2_request(const char *type, int format, void *data
|
|||
|
||||
static void *network_thread(void *ignore)
|
||||
{
|
||||
/* Our job is simple: Send queued messages, retrying if necessary. Read frames
|
||||
from the network, and queue them for delivery to the channels */
|
||||
int res, count, wakeup;
|
||||
struct iax_frame *f;
|
||||
|
||||
if (timer)
|
||||
if (timer) {
|
||||
ast_io_add(io, ast_timer_fd(timer), timing_read, AST_IO_IN | AST_IO_PRI, NULL);
|
||||
|
||||
for(;;) {
|
||||
pthread_testcancel();
|
||||
|
||||
/* Go through the queue, sending messages which have not yet been
|
||||
sent, and scheduling retransmissions if appropriate */
|
||||
AST_LIST_LOCK(&frame_queue);
|
||||
count = 0;
|
||||
wakeup = -1;
|
||||
AST_LIST_TRAVERSE_SAFE_BEGIN(&frame_queue, f, list) {
|
||||
if (f->sentyet)
|
||||
continue;
|
||||
|
||||
/* Try to lock the pvt, if we can't... don't fret - defer it till later */
|
||||
if (ast_mutex_trylock(&iaxsl[f->callno])) {
|
||||
wakeup = 1;
|
||||
continue;
|
||||
}
|
||||
|
||||
f->sentyet = 1;
|
||||
|
||||
if (iaxs[f->callno]) {
|
||||
send_packet(f);
|
||||
count++;
|
||||
}
|
||||
|
||||
ast_mutex_unlock(&iaxsl[f->callno]);
|
||||
|
||||
if (f->retries < 0) {
|
||||
/* This is not supposed to be retransmitted */
|
||||
AST_LIST_REMOVE_CURRENT(list);
|
||||
/* Free the iax frame */
|
||||
iax_frame_free(f);
|
||||
} else {
|
||||
/* We need reliable delivery. Schedule a retransmission */
|
||||
f->retries++;
|
||||
f->retrans = iax2_sched_add(sched, f->retrytime, attempt_transmit, f);
|
||||
}
|
||||
}
|
||||
AST_LIST_TRAVERSE_SAFE_END;
|
||||
AST_LIST_UNLOCK(&frame_queue);
|
||||
|
||||
pthread_testcancel();
|
||||
if (count >= 20)
|
||||
ast_debug(1, "chan_iax2: Sent %d queued outbound frames all at once\n", count);
|
||||
|
||||
/* Now do the IO, and run scheduled tasks */
|
||||
res = ast_io_wait(io, wakeup);
|
||||
if (res >= 0) {
|
||||
if (res >= 20)
|
||||
ast_debug(1, "chan_iax2: ast_io_wait ran %d I/Os all at once\n", res);
|
||||
}
|
||||
}
|
||||
|
||||
for (;;) {
|
||||
pthread_testcancel();
|
||||
/* Wake up once a second just in case SIGURG was sent while
|
||||
* we weren't in poll(), to make sure we don't hang when trying
|
||||
* to unload. */
|
||||
ast_io_wait(io, 1000);
|
||||
}
|
||||
|
||||
return NULL;
|
||||
}
|
||||
|
||||
|
@ -12426,19 +12388,14 @@ static int __unload_module(void)
|
|||
struct ast_context *con;
|
||||
int x;
|
||||
|
||||
/* Make sure threads do not hold shared resources when they are canceled */
|
||||
|
||||
/* Grab the sched lock resource to keep it away from threads about to die */
|
||||
/* Cancel the network thread, close the net socket */
|
||||
if (netthreadid != AST_PTHREADT_NULL) {
|
||||
AST_LIST_LOCK(&frame_queue);
|
||||
pthread_cancel(netthreadid);
|
||||
AST_LIST_UNLOCK(&frame_queue);
|
||||
pthread_kill(netthreadid, SIGURG);
|
||||
pthread_join(netthreadid, NULL);
|
||||
}
|
||||
|
||||
sched = ast_sched_thread_destroy(sched);
|
||||
|
||||
|
||||
/* Call for all threads to halt */
|
||||
AST_LIST_LOCK(&idle_list);
|
||||
while ((thread = AST_LIST_REMOVE_HEAD(&idle_list, list)))
|
||||
|
@ -12489,6 +12446,7 @@ static int __unload_module(void)
|
|||
if (timer) {
|
||||
ast_timer_close(timer);
|
||||
}
|
||||
transmit_processor = ast_taskprocessor_unreference(transmit_processor);
|
||||
|
||||
con = ast_context_find(regcontext);
|
||||
if (con)
|
||||
|
@ -12557,19 +12515,23 @@ static int load_module(void)
|
|||
struct iax2_registry *reg = NULL;
|
||||
|
||||
peers = ao2_container_alloc(MAX_PEER_BUCKETS, peer_hash_cb, peer_cmp_cb);
|
||||
if (!peers)
|
||||
if (!peers) {
|
||||
return AST_MODULE_LOAD_FAILURE;
|
||||
}
|
||||
|
||||
users = ao2_container_alloc(MAX_USER_BUCKETS, user_hash_cb, user_cmp_cb);
|
||||
if (!users) {
|
||||
ao2_ref(peers, -1);
|
||||
return AST_MODULE_LOAD_FAILURE;
|
||||
}
|
||||
|
||||
iax_peercallno_pvts = ao2_container_alloc(IAX_MAX_CALLS, pvt_hash_cb, pvt_cmp_cb);
|
||||
if (!iax_peercallno_pvts) {
|
||||
ao2_ref(peers, -1);
|
||||
ao2_ref(users, -1);
|
||||
return AST_MODULE_LOAD_FAILURE;
|
||||
}
|
||||
|
||||
iax_transfercallno_pvts = ao2_container_alloc(IAX_MAX_CALLS, transfercallno_pvt_hash_cb, transfercallno_pvt_cmp_cb);
|
||||
if (!iax_transfercallno_pvts) {
|
||||
ao2_ref(peers, -1);
|
||||
|
@ -12577,6 +12539,16 @@ static int load_module(void)
|
|||
ao2_ref(iax_peercallno_pvts, -1);
|
||||
return AST_MODULE_LOAD_FAILURE;
|
||||
}
|
||||
|
||||
transmit_processor = ast_taskprocessor_get("iax2_transmit", TPS_REF_DEFAULT);
|
||||
if (!transmit_processor) {
|
||||
ao2_ref(peers, -1);
|
||||
ao2_ref(users, -1);
|
||||
ao2_ref(iax_peercallno_pvts, -1);
|
||||
ao2_ref(iax_transfercallno_pvts, -1);
|
||||
return AST_MODULE_LOAD_FAILURE;
|
||||
}
|
||||
|
||||
ast_custom_function_register(&iaxpeer_function);
|
||||
ast_custom_function_register(&iaxvar_function);
|
||||
|
||||
|
|
Loading…
Reference in New Issue