diff --git a/mbuni/ChangeLog b/mbuni/ChangeLog index fcb1d16..c17dd76 100644 --- a/mbuni/ChangeLog +++ b/mbuni/ChangeLog @@ -1,3 +1,5 @@ +2009-02-11 P. A. Bagyenda + * Fix: Improved pgsql queue processing -- prevent queue flooding 2009-02-09 P. A. Bagyenda * Fix crash when message format is invalid 2009-02-03 P. A. Bagyenda diff --git a/mbuni/extras/pgsql-queue/mms_pgsql_queue.c b/mbuni/extras/pgsql-queue/mms_pgsql_queue.c index a14746d..1612838 100644 --- a/mbuni/extras/pgsql-queue/mms_pgsql_queue.c +++ b/mbuni/extras/pgsql-queue/mms_pgsql_queue.c @@ -1038,14 +1038,30 @@ struct Qthread_data_t { int (*deliver)(MmsEnvelope *e); }; -static void pgdeliver(List *item_list) +struct PGDeliverData_t { + List *item_list; + Dict *keylist; + char dir[128]; +}; + +static void pgdeliver(struct PGDeliverData_t *pgdata) { struct Qthread_data_t *d; + List *item_list = pgdata->item_list; + mms_info(0, "pgdeliver", NULL, "Starting up on queue [%s]", pgdata->dir); while ((d = gwlist_consume(item_list)) != NULL) { MmsEnvelope *e = pgq_queue_readenvelope_ex(d->qf, d->dir, 0,1); /* force checking of send_time */ int res; + char buf[64]; + Octstr *x; + sprintf(buf, "%lld", d->qid); + x = octstr_create(buf); + + dict_remove(pgdata->keylist, x); /* Signal that we got it. */ + octstr_destroy(x); + if (!e) continue; @@ -1059,17 +1075,21 @@ static void pgdeliver(List *item_list) gw_free(d); } /* we're done, exit. */ + mms_info(0, "pgdeliver", NULL, "Shutdown on queue [%s]", pgdata->dir); } - +#define MAX_QLEN 10 /* we don't allow more than this number pending per thread. */ static void pgq_queue_run(char *dir, int (*deliver)(MmsEnvelope *), double sleepsecs, int num_threads, int *rstop) { + struct PGDeliverData_t pgdata; List *items_list = gwlist_create(); + Dict *dict = dict_create(num_threads*MAX_QLEN*2, NULL); /* for keeping list short */ int i, n; char cmd[512]; long *th_ids = NULL; + gw_assert(num_threads > 0); @@ -1083,8 +1103,12 @@ static void pgq_queue_run(char *dir, gwlist_add_producer(items_list); th_ids = gw_malloc(num_threads*sizeof th_ids[0]); + pgdata.item_list = items_list; + pgdata.keylist = dict; + strncpy(pgdata.dir, dir, sizeof pgdata.dir); + for (i = 0; i 0) - for (i = 0; iqid = strtoull(qid, NULL, 10); - strncpy(d->qf, qfname, sizeof d->qf); - strncpy(d->dir, dir, sizeof d->dir); - d->_pad1 = d->_pad2 = 0; /* Just in case! */ + Octstr *xs = octstr_create(qid); - d->deliver = deliver; - - gwlist_produce(items_list, d); + if (dict_put_once(dict, xs, (void *)1) == 1) { /* Item not on list */ + struct Qthread_data_t *d = gw_malloc(sizeof *d); + d->qid = strtoull(qid, NULL, 10); + strncpy(d->qf, qfname, sizeof d->qf); + strncpy(d->dir, dir, sizeof d->dir); + d->_pad1 = d->_pad2 = 0; /* Just in case! */ + + d->deliver = deliver; + gwlist_produce(items_list, d); + } + octstr_destroy(xs); } PQclear(r); return_conn(c); /* return connection to pool. */ + l1: if (*rstop) break; gwthread_sleep(sleepsecs); + if (gwlist_len(items_list) > MAX_QLEN * num_threads) + goto l1; /* don't allow too many pending requests in the waiting queue. */ } while (1); mms_info(0, "pgsql_queue", NULL, "Queue runner on [%s] shutdown, started...", dir); @@ -1137,7 +1168,7 @@ static void pgq_queue_run(char *dir, gw_free(th_ids); gwlist_destroy(items_list, NULL); - + dict_destroy(dict); mms_info(0, "pgsql_queue", NULL, "Queue runner on [%s] shutdown, complete...", dir); } diff --git a/mbuni/mmsc/mmsproxy.c b/mbuni/mmsc/mmsproxy.c index 64b7c07..b7ba1fb 100644 --- a/mbuni/mmsc/mmsproxy.c +++ b/mbuni/mmsc/mmsproxy.c @@ -192,7 +192,8 @@ void stop_mmsproxy(void) http_close_port(settings->port); http_close_port(settings->mm7port); - + if (mm7_thread > 0) + gwthread_wakeup(mm7_thread); mms_info(0, "mmsproxy", NULL, "Signalling shutdown complete."); }