diff --git a/mbuni/extras/pgsql-queue/mms_pgsql_queue.c b/mbuni/extras/pgsql-queue/mms_pgsql_queue.c index 5282c13..5e4c5b2 100644 --- a/mbuni/extras/pgsql-queue/mms_pgsql_queue.c +++ b/mbuni/extras/pgsql-queue/mms_pgsql_queue.c @@ -345,12 +345,12 @@ static int pgq_free_envelope(MmsEnvelope *e, int removefromqueue) * H - generic headers associated with message (e.g. for passing to MMC) */ -static MmsEnvelope *pgq_queue_readenvelope(char *qf, char *mms_queuedir, int shouldblock) +static MmsEnvelope *pgq_queue_readenvelope_ex(char *qf, char *mms_queuedir, int shouldblock, int check_send_time) { int64_t qid; long num_attempts, i, n; time_t sendt, created, lastt, edate; - char cmd[4*QFNAMEMAX], _qf[QFNAMEMAX*2+1]; + char cmd[4*QFNAMEMAX], _qf[QFNAMEMAX*2+1], tmp[128]; char data_file[4*QFNAMEMAX+1]; Octstr *from = NULL; @@ -366,10 +366,13 @@ static MmsEnvelope *pgq_queue_readenvelope(char *qf, char *mms_queuedir, int sho n = strlen(qf); PQescapeStringConn(c, _qf, qf, n < QFNAMEMAX ? n : QFNAMEMAX, NULL); + strncpy(tmp, (check_send_time) ? " AND send_time <= current_timestamp " : "", sizeof tmp); /* handle checking of due time */ + /* read and block, to ensure no one else touches it. */ sprintf(cmd, "SELECT id,cdate,lastt,sendt,edate,num_attempts,sender,data FROM " - " mms_messages_view WHERE qdir='%s' AND qfname = '%s' FOR UPDATE %s", + " mms_messages_view WHERE qdir='%s' AND qfname = '%s' %s FOR UPDATE %s", mms_queuedir, _qf, + tmp, shouldblock ? "" : "NOWAIT"); /* nice little PostgreSQL 8.x addition. */ r = PQexec(c, cmd); @@ -590,6 +593,12 @@ static MmsEnvelope *pgq_queue_readenvelope(char *qf, char *mms_queuedir, int sho return e; } +/* The one available outside... */ +static MmsEnvelope *pgq_queue_readenvelope(char *qf, char *mms_queuedir, int shouldblock) +{ + return pgq_queue_readenvelope_ex(qf, mms_queuedir, shouldblock, 0); +} + /* utility writer function. */ static int _puthdr(PGconn *c, int64_t qid, char *hname, char *val) { @@ -1033,24 +1042,18 @@ static void pgdeliver(List *item_list) struct Qthread_data_t *d; while ((d = gwlist_consume(item_list)) != NULL) { - MmsEnvelope *e = pgq_queue_readenvelope(d->qf, d->dir, 0); + MmsEnvelope *e = pgq_queue_readenvelope_ex(d->qf, d->dir, 0,1); /* force checking of send_time */ int res; if (!e) continue; - if (e->sendt <= time(NULL)) { /* in case of fast reads */ - debug("pgqueue_run", 0, "Queued entry %s/%s to thread %ld", - d->dir, d->qf, gwthread_self()); - res = d->deliver(e); - - if (res != 1) - pgq_free_envelope(e, 0); - } else { + + debug("pgqueue_run", 0, "Queued entry %s/%s to thread %ld", + d->dir, d->qf, gwthread_self()); + res = d->deliver(e); + + if (res != 1) pgq_free_envelope(e, 0); - debug("pgqueue_run", 0, - "Queue entry %s/%s skipped. Not yet delivery time", - d->dir, d->qf); - } gw_free(d); }