1
0
Fork 0

pgsql-queue fix

This commit is contained in:
bagyenda 2009-01-22 06:09:36 +00:00
parent e402334756
commit 2aed655a12
3 changed files with 27 additions and 29 deletions

View File

@ -1,3 +1,5 @@
2009-01-22 P. A. Bagyenda <bagyenda@dsmagic.com>
* Fix: PGSQL Queue module was not honouring send_time fully
2009-01-21 P. A. Bagyenda <bagyenda@dsmagic.com> 2009-01-21 P. A. Bagyenda <bagyenda@dsmagic.com>
* Minor fix in mmsc: fetch URL * Minor fix in mmsc: fetch URL
2009-01-17 P. A. Bagyenda <bagyenda@dsmagic.com> 2009-01-17 P. A. Bagyenda <bagyenda@dsmagic.com>

View File

@ -1035,15 +1035,22 @@ static void pgdeliver(List *item_list)
while ((d = gwlist_consume(item_list)) != NULL) { while ((d = gwlist_consume(item_list)) != NULL) {
MmsEnvelope *e = pgq_queue_readenvelope(d->qf, d->dir, 0); MmsEnvelope *e = pgq_queue_readenvelope(d->qf, d->dir, 0);
int res; int res;
if (e) { /* no need to check time -- it was checked in queue runner, so we know it's time to send.*/ if (!e)
continue;
if (e->sendt <= time(NULL)) { /* in case of fast reads */
debug("pgqueue_run", 0, "Queued entry %s/%s to thread %ld", debug("pgqueue_run", 0, "Queued entry %s/%s to thread %ld",
d->dir, d->qf, gwthread_self()); d->dir, d->qf, gwthread_self());
res = d->deliver(e); res = d->deliver(e);
if (res != 1) if (res != 1)
pgq_free_envelope(e, 0); pgq_free_envelope(e, 0);
} } else {
pgq_free_envelope(e, 0);
debug("pgqueue_run", 0,
"Queue entry %s/%s skipped. Not yet delivery time",
d->dir, d->qf);
}
gw_free(d); gw_free(d);
} }

View File

@ -23,22 +23,14 @@ static MmsEnvelope *update_env(MmsEnvelope *e, MmsEnvelopeTo *xto, int success)
{ {
time_t tnow = time(NULL); time_t tnow = time(NULL);
if (success && xto && !(e->msgtype == MMS_MSGTYPE_SEND_REQ || if (success && xto &&
e->msgtype == MMS_MSGTYPE_RETRIEVE_CONF)) !(e->msgtype == MMS_MSGTYPE_SEND_REQ ||
e->msgtype == MMS_MSGTYPE_RETRIEVE_CONF))
xto->process = 0; /* No more processing. */ xto->process = 0; /* No more processing. */
else {
e->lasttry = tnow; e->lasttry = tnow;
e->attempts++; e->attempts++;
e->sendt = tnow + settings->send_back_off * e->attempts;
/* If max send attempts has been reached, set next try to expiry time, otherwise
* use normal back-off procedure
*/
if (e->attempts >= settings->maxsendattempts)
e->sendt = e->expiryt;
else
e->sendt = e->lasttry + settings->send_back_off * e->attempts;
}
if (settings->qfs->mms_queue_update(e) == 1) if (settings->qfs->mms_queue_update(e) == 1)
e = NULL; e = NULL;
@ -127,9 +119,9 @@ static void do_mm1_push(Octstr *rcpt_to, int isphonenum, MmsEnvelope *e, MmsMsg
} else { /* An IP Address: Send packet, forget. */ } else { /* An IP Address: Send packet, forget. */
Octstr *addr = udp_create_address(to, WAPPUSH_PORT); Octstr *addr = udp_create_address(to, WAPPUSH_PORT);
int sock = udp_client_socket(); int sock = udp_client_socket();
MmsEnvelopeTo *xto = gwlist_get(e->to,0);
if (sock > 0) { if (sock > 0) {
MmsEnvelopeTo *xto = gwlist_get(e->to,0);
octstr_append(pduhdr, s); octstr_append(pduhdr, s);
#if 0 #if 0
octstr_dump(pduhdr, 0); octstr_dump(pduhdr, 0);
@ -141,8 +133,9 @@ static void do_mm1_push(Octstr *rcpt_to, int isphonenum, MmsEnvelope *e, MmsMsg
NULL, NULL, "MM1", NULL,NULL); NULL, NULL, "MM1", NULL,NULL);
e = update_env(e, xto, 1); e = update_env(e, xto, 1);
} else { } else {
e = update_env(e, NULL, 0); e = update_env(e, xto, 0);
mms_error(0, "MM1", NULL, "push to %s:%d failed: %s", octstr_get_cstr(to), WAPPUSH_PORT, strerror(errno)); mms_error(0, "MM1", NULL, "push to %s:%d failed: %s",
octstr_get_cstr(to), WAPPUSH_PORT, strerror(errno));
} }
octstr_destroy(addr); octstr_destroy(addr);
} }
@ -216,14 +209,10 @@ static int sendNotify(MmsEnvelope *e)
if (j > 0 && j - 1 + sizeof "/TYPE=PLMN" == len) { /* A proper number. */ if (j > 0 && j - 1 + sizeof "/TYPE=PLMN" == len) { /* A proper number. */
phonenum = octstr_copy(to, 0, j); phonenum = octstr_copy(to, 0, j);
#if 0
normalize_number(octstr_get_cstr(settings->unified_prefix), &phonenum);
#else
mms_normalize_phonenum(&phonenum, mms_normalize_phonenum(&phonenum,
octstr_get_cstr(settings->unified_prefix), octstr_get_cstr(settings->unified_prefix),
settings->strip_prefixes); settings->strip_prefixes);
#endif } else if (k > 0 && k + sizeof "/TYPE=IPv" == len)
} else if (k > 0 && k + sizeof "/TYPE=IPv" == len)
rcpt_ip = octstr_copy(to, 0, k); rcpt_ip = octstr_copy(to, 0, k);
else { else {
/* We only handle phone numbers here. */ /* We only handle phone numbers here. */
@ -324,7 +313,7 @@ static int sendNotify(MmsEnvelope *e)
mtype == MMS_MSGTYPE_READ_ORIG_IND) mtype == MMS_MSGTYPE_READ_ORIG_IND)
smsg = settings->qfs->mms_queue_getdata(e); smsg = settings->qfs->mms_queue_getdata(e);
else { else {
mms_error(0, "MM1", NULL, "Unexpected message type %s for %s found in MT queue!", mms_error(0, "MM1", NULL, "Unexpected message type [%s] for [%s] found in MT queue!",
mms_message_type_to_cstr(mtype), octstr_get_cstr(to)); mms_message_type_to_cstr(mtype), octstr_get_cstr(to));
res = MMS_SEND_ERROR_FATAL; res = MMS_SEND_ERROR_FATAL;
goto done; goto done;
@ -372,8 +361,8 @@ static int sendNotify(MmsEnvelope *e)
/* Write to log */ /* Write to log */
mms_info(2, "MM1", NULL, "%s Mobile Queue MMS Send Notify: From=%s, to=%s, msgsize=%d, reason=%s. Processed in %d secs", mms_info(2, "MM1", NULL, "%s Mobile Queue MMS Send Notify: From=%s, to=%s, msgsize=%d, reason=%s. Processed in %d secs",
SEND_ERROR_STR(res), SEND_ERROR_STR(res),
octstr_get_cstr(from), octstr_get_cstr(to), msize, octstr_get_cstr(from), octstr_get_cstr(to), msize,
err ? octstr_get_cstr(err) : "", err ? octstr_get_cstr(err) : "",
(int)(time(NULL) - tnow)); (int)(time(NULL) - tnow));