1
0
Fork 0

*** empty log message ***

This commit is contained in:
bagyenda 2009-02-11 04:12:52 +00:00
parent af393b2afe
commit b00dcbe383
3 changed files with 49 additions and 15 deletions

View File

@ -1,3 +1,5 @@
2009-02-11 P. A. Bagyenda <bagyenda@dsmagic.com>
* Fix: Improved pgsql queue processing -- prevent queue flooding
2009-02-09 P. A. Bagyenda <bagyenda@dsmagic.com>
* Fix crash when message format is invalid
2009-02-03 P. A. Bagyenda <bagyenda@dsmagic.com>

View File

@ -1038,14 +1038,30 @@ struct Qthread_data_t {
int (*deliver)(MmsEnvelope *e);
};
static void pgdeliver(List *item_list)
struct PGDeliverData_t {
List *item_list;
Dict *keylist;
char dir[128];
};
static void pgdeliver(struct PGDeliverData_t *pgdata)
{
struct Qthread_data_t *d;
List *item_list = pgdata->item_list;
mms_info(0, "pgdeliver", NULL, "Starting up on queue [%s]", pgdata->dir);
while ((d = gwlist_consume(item_list)) != NULL) {
MmsEnvelope *e = pgq_queue_readenvelope_ex(d->qf, d->dir, 0,1); /* force checking of send_time */
int res;
char buf[64];
Octstr *x;
sprintf(buf, "%lld", d->qid);
x = octstr_create(buf);
dict_remove(pgdata->keylist, x); /* Signal that we got it. */
octstr_destroy(x);
if (!e)
continue;
@ -1059,17 +1075,21 @@ static void pgdeliver(List *item_list)
gw_free(d);
}
/* we're done, exit. */
mms_info(0, "pgdeliver", NULL, "Shutdown on queue [%s]", pgdata->dir);
}
#define MAX_QLEN 10 /* we don't allow more than this number pending per thread. */
static void pgq_queue_run(char *dir,
int (*deliver)(MmsEnvelope *),
double sleepsecs, int num_threads, int *rstop)
{
struct PGDeliverData_t pgdata;
List *items_list = gwlist_create();
Dict *dict = dict_create(num_threads*MAX_QLEN*2, NULL); /* for keeping list short */
int i, n;
char cmd[512];
long *th_ids = NULL;
gw_assert(num_threads > 0);
@ -1083,8 +1103,12 @@ static void pgq_queue_run(char *dir,
gwlist_add_producer(items_list);
th_ids = gw_malloc(num_threads*sizeof th_ids[0]);
pgdata.item_list = items_list;
pgdata.keylist = dict;
strncpy(pgdata.dir, dir, sizeof pgdata.dir);
for (i = 0; i<num_threads; i++)
th_ids[i] = gwthread_create((gwthread_func_t *)pgdeliver, items_list);
th_ids[i] = gwthread_create((gwthread_func_t *)pgdeliver, &pgdata);
/* Note that we get messages ready for delivery (whether or not they have expired).
* any other conditions will be handled by upper level,
@ -1101,27 +1125,34 @@ static void pgq_queue_run(char *dir,
break;
r = PQexec(c, cmd);
if (PQresultStatus(r) == PGRES_TUPLES_OK && (n = PQntuples(r))> 0)
for (i = 0; i<n; i++)
for (i = 0; i<n;
i++)
if ((qfname = PQgetvalue(r, i, 1)) != NULL &&
(qid = PQgetvalue(r, i, 0)) != NULL) {
struct Qthread_data_t *d = gw_malloc(sizeof *d);
d->qid = strtoull(qid, NULL, 10);
strncpy(d->qf, qfname, sizeof d->qf);
strncpy(d->dir, dir, sizeof d->dir);
d->_pad1 = d->_pad2 = 0; /* Just in case! */
Octstr *xs = octstr_create(qid);
d->deliver = deliver;
gwlist_produce(items_list, d);
if (dict_put_once(dict, xs, (void *)1) == 1) { /* Item not on list */
struct Qthread_data_t *d = gw_malloc(sizeof *d);
d->qid = strtoull(qid, NULL, 10);
strncpy(d->qf, qfname, sizeof d->qf);
strncpy(d->dir, dir, sizeof d->dir);
d->_pad1 = d->_pad2 = 0; /* Just in case! */
d->deliver = deliver;
gwlist_produce(items_list, d);
}
octstr_destroy(xs);
}
PQclear(r);
return_conn(c); /* return connection to pool. */
l1:
if (*rstop)
break;
gwthread_sleep(sleepsecs);
if (gwlist_len(items_list) > MAX_QLEN * num_threads)
goto l1; /* don't allow too many pending requests in the waiting queue. */
} while (1);
mms_info(0, "pgsql_queue", NULL, "Queue runner on [%s] shutdown, started...", dir);
@ -1137,7 +1168,7 @@ static void pgq_queue_run(char *dir,
gw_free(th_ids);
gwlist_destroy(items_list, NULL);
dict_destroy(dict);
mms_info(0, "pgsql_queue", NULL, "Queue runner on [%s] shutdown, complete...", dir);
}

View File

@ -192,7 +192,8 @@ void stop_mmsproxy(void)
http_close_port(settings->port);
http_close_port(settings->mm7port);
if (mm7_thread > 0)
gwthread_wakeup(mm7_thread);
mms_info(0, "mmsproxy", NULL, "Signalling shutdown complete.");
}