1
0
Fork 0

*** empty log message ***

This commit is contained in:
bagyenda 2009-01-22 06:34:47 +00:00
parent 2aed655a12
commit 25d8d878c7
1 changed files with 19 additions and 16 deletions

View File

@ -345,12 +345,12 @@ static int pgq_free_envelope(MmsEnvelope *e, int removefromqueue)
* H - generic headers associated with message (e.g. for passing to MMC) * H - generic headers associated with message (e.g. for passing to MMC)
*/ */
static MmsEnvelope *pgq_queue_readenvelope(char *qf, char *mms_queuedir, int shouldblock) static MmsEnvelope *pgq_queue_readenvelope_ex(char *qf, char *mms_queuedir, int shouldblock, int check_send_time)
{ {
int64_t qid; int64_t qid;
long num_attempts, i, n; long num_attempts, i, n;
time_t sendt, created, lastt, edate; time_t sendt, created, lastt, edate;
char cmd[4*QFNAMEMAX], _qf[QFNAMEMAX*2+1]; char cmd[4*QFNAMEMAX], _qf[QFNAMEMAX*2+1], tmp[128];
char data_file[4*QFNAMEMAX+1]; char data_file[4*QFNAMEMAX+1];
Octstr *from = NULL; Octstr *from = NULL;
@ -366,10 +366,13 @@ static MmsEnvelope *pgq_queue_readenvelope(char *qf, char *mms_queuedir, int sho
n = strlen(qf); n = strlen(qf);
PQescapeStringConn(c, _qf, qf, n < QFNAMEMAX ? n : QFNAMEMAX, NULL); PQescapeStringConn(c, _qf, qf, n < QFNAMEMAX ? n : QFNAMEMAX, NULL);
strncpy(tmp, (check_send_time) ? " AND send_time <= current_timestamp " : "", sizeof tmp); /* handle checking of due time */
/* read and block, to ensure no one else touches it. */ /* read and block, to ensure no one else touches it. */
sprintf(cmd, "SELECT id,cdate,lastt,sendt,edate,num_attempts,sender,data FROM " sprintf(cmd, "SELECT id,cdate,lastt,sendt,edate,num_attempts,sender,data FROM "
" mms_messages_view WHERE qdir='%s' AND qfname = '%s' FOR UPDATE %s", " mms_messages_view WHERE qdir='%s' AND qfname = '%s' %s FOR UPDATE %s",
mms_queuedir, _qf, mms_queuedir, _qf,
tmp,
shouldblock ? "" : "NOWAIT"); /* nice little PostgreSQL 8.x addition. */ shouldblock ? "" : "NOWAIT"); /* nice little PostgreSQL 8.x addition. */
r = PQexec(c, cmd); r = PQexec(c, cmd);
@ -590,6 +593,12 @@ static MmsEnvelope *pgq_queue_readenvelope(char *qf, char *mms_queuedir, int sho
return e; return e;
} }
/* The one available outside... */
static MmsEnvelope *pgq_queue_readenvelope(char *qf, char *mms_queuedir, int shouldblock)
{
return pgq_queue_readenvelope_ex(qf, mms_queuedir, shouldblock, 0);
}
/* utility writer function. */ /* utility writer function. */
static int _puthdr(PGconn *c, int64_t qid, char *hname, char *val) static int _puthdr(PGconn *c, int64_t qid, char *hname, char *val)
{ {
@ -1033,24 +1042,18 @@ static void pgdeliver(List *item_list)
struct Qthread_data_t *d; struct Qthread_data_t *d;
while ((d = gwlist_consume(item_list)) != NULL) { while ((d = gwlist_consume(item_list)) != NULL) {
MmsEnvelope *e = pgq_queue_readenvelope(d->qf, d->dir, 0); MmsEnvelope *e = pgq_queue_readenvelope_ex(d->qf, d->dir, 0,1); /* force checking of send_time */
int res; int res;
if (!e) if (!e)
continue; continue;
if (e->sendt <= time(NULL)) { /* in case of fast reads */
debug("pgqueue_run", 0, "Queued entry %s/%s to thread %ld", debug("pgqueue_run", 0, "Queued entry %s/%s to thread %ld",
d->dir, d->qf, gwthread_self()); d->dir, d->qf, gwthread_self());
res = d->deliver(e); res = d->deliver(e);
if (res != 1) if (res != 1)
pgq_free_envelope(e, 0);
} else {
pgq_free_envelope(e, 0); pgq_free_envelope(e, 0);
debug("pgqueue_run", 0,
"Queue entry %s/%s skipped. Not yet delivery time",
d->dir, d->qf);
}
gw_free(d); gw_free(d);
} }