From 2544b6d71843b82bf25f7cc0d3a3cc9c3f6140aa Mon Sep 17 00:00:00 2001 From: bagyenda <> Date: Wed, 17 Dec 2008 17:29:40 +0000 Subject: [PATCH] fixes for mm1 push --- mbuni/ChangeLog | 2 + mbuni/extras/pgsql-queue/mms_pgsql_queue.c | 2 +- mbuni/extras/pgsql-queue/tables.sql | 1 + mbuni/mmlib/mms_util.c | 1 + mbuni/mmsc/mmsglobalsender.c | 13 +- mbuni/mmsc/mmsmobilesender.c | 252 +++++++-------------- 6 files changed, 97 insertions(+), 174 deletions(-) diff --git a/mbuni/ChangeLog b/mbuni/ChangeLog index 1cf6d60..cd9c099 100644 --- a/mbuni/ChangeLog +++ b/mbuni/ChangeLog @@ -1,3 +1,5 @@ +2008-12-17 P. A. Bagyenda + * MM1 notify: increase parallelism 2008-12-10 P. A. Bagyenda * Extra mbuni headers in mmsbox url service call * mmsbox EAIF receiver bug fix thanks to Marcin Bockowski (bocian@gmail.com) diff --git a/mbuni/extras/pgsql-queue/mms_pgsql_queue.c b/mbuni/extras/pgsql-queue/mms_pgsql_queue.c index d5c84dc..4e4f1f1 100644 --- a/mbuni/extras/pgsql-queue/mms_pgsql_queue.c +++ b/mbuni/extras/pgsql-queue/mms_pgsql_queue.c @@ -27,7 +27,7 @@ /* first we need the db connection pooling. */ #define DEFAULT_CONNECTIONS 5 -#define MIN_QRUN_INTERVAL 2 /* we don't want to hurt DB. */ +#define MIN_QRUN_INTERVAL 0.1 /* we don't want to hurt DB. */ #define MQF 'q' #define MIN_PG_VERSION 80200 /* v8.2 */ diff --git a/mbuni/extras/pgsql-queue/tables.sql b/mbuni/extras/pgsql-queue/tables.sql index 872632e..6ac1c2b 100644 --- a/mbuni/extras/pgsql-queue/tables.sql +++ b/mbuni/extras/pgsql-queue/tables.sql @@ -22,6 +22,7 @@ CREATE TABLE mms_messages ( CREATE index mm_idx1 on mms_messages(qdir); -- because we use it for lookups. CREATE index mm_idx2 on mms_messages(send_time); +CREATE index mm_idx3 on mms_messages(qfname); -- create a view for message lookup CREATE VIEW mms_messages_view AS SELECT diff --git a/mbuni/mmlib/mms_util.c b/mbuni/mmlib/mms_util.c index f4bd3e6..ed04fb9 100644 --- a/mbuni/mmlib/mms_util.c +++ b/mbuni/mmlib/mms_util.c @@ -56,6 +56,7 @@ int mms_load_core_settings(mCfg *cfg, mCfgGrp *cgrp) if (mms_cfg_get_int(cfg, cgrp, octstr_imm("log-level"), &loglevel) == -1) loglevel = 0; log_open(octstr_get_cstr(log), loglevel, GW_NON_EXCL); + log_set_log_level(GW_INFO); octstr_destroy(log); } diff --git a/mbuni/mmsc/mmsglobalsender.c b/mbuni/mmsc/mmsglobalsender.c index e3dc371..2a5b475 100644 --- a/mbuni/mmsc/mmsglobalsender.c +++ b/mbuni/mmsc/mmsglobalsender.c @@ -70,7 +70,7 @@ static int sendMsg(MmsEnvelope *e) { int i, n; MmsMsg *msg = NULL; - + time_t tstart = time(NULL); if (e->msgtype == MMS_MSGTYPE_SEND_REQ && !e->bill.billed) { /* Attempt to bill if not already billed */ @@ -405,17 +405,17 @@ static int sendMsg(MmsEnvelope *e) /* Write to log */ if (res == MMS_SEND_ERROR_FATAL) - mms_error(0, "MM2", NULL, "%s Global Queue MMS Send [%.128s]: From %s, to %s, msgsize=%ld: %s", + mms_error(0, "MM2", NULL, "%s Global Queue MMS Send [%.128s]: From %s, to %s, msgsize=%ld: %s. ", SEND_ERROR_STR(res), e->xqfname, octstr_get_cstr(e->from), octstr_get_cstr(to->rcpt), e->msize, - err ? octstr_get_cstr(err) : "(null)"); + err ? octstr_get_cstr(err) : "n/a"); else - mms_info(0, "MM2", NULL, "%s Global Queue MMS Send [%.128s]: From %s, to %s, msgsize=%ld: %s", + mms_info(0, "MM2", NULL, "%s Global Queue MMS Send [%.128s]: From %s, to %s, msgsize=%ld: %s.", SEND_ERROR_STR(res), e->xqfname, octstr_get_cstr(e->from), octstr_get_cstr(to->rcpt), e->msize, - err ? octstr_get_cstr(err) : "(null)"); + err ? octstr_get_cstr(err) : "n/a"); if (res == MMS_SEND_OK && (e->msgtype == MMS_MSGTYPE_SEND_REQ || @@ -453,6 +453,9 @@ static int sendMsg(MmsEnvelope *e) if (settings->qfs->mms_queue_update(e) != 1) settings->qfs->mms_queue_free_env(e); } + + mms_info(3, "MMSC.global", NULL, "Processed in %d secs", (int)(time(NULL) - tstart)); /* report processing time. */ + return 1; /* Always deletes the queue entry. */ } diff --git a/mbuni/mmsc/mmsmobilesender.c b/mbuni/mmsc/mmsmobilesender.c index c595a09..b565c9b 100644 --- a/mbuni/mmsc/mmsmobilesender.c +++ b/mbuni/mmsc/mmsmobilesender.c @@ -19,60 +19,52 @@ #define WAPPUSH_PORT 2948 -static HTTPCaller *httpcaller; -static MmsEnvelope edummy; - - -static MmsEnvelope *update_env_success(MmsEnvelope *env, MmsEnvelopeTo *xto) +static MmsEnvelope *update_env_success(MmsEnvelope *e, MmsEnvelopeTo *xto) { time_t tnow = time(NULL); - if (xto && !(env->msgtype == MMS_MSGTYPE_SEND_REQ || - env->msgtype == MMS_MSGTYPE_RETRIEVE_CONF)) + if (xto && !(e->msgtype == MMS_MSGTYPE_SEND_REQ || + e->msgtype == MMS_MSGTYPE_RETRIEVE_CONF)) xto->process = 0; /* No more processing. */ else { - env->lasttry = tnow; - env->attempts++; + e->lasttry = tnow; + e->attempts++; /* If max send attempts has been reached, set next try to expiry time, otherwise * use normal back-off procedure */ - if (env->attempts >= settings->maxsendattempts) - env->sendt = env->expiryt; + if (e->attempts >= settings->maxsendattempts) + e->sendt = e->expiryt; else - env->sendt = env->lasttry + settings->send_back_off * env->attempts; + e->sendt = e->lasttry + settings->send_back_off * e->attempts; + + if (settings->qfs->mms_queue_update(e) == 1) + e = NULL; } - if (settings->qfs->mms_queue_update(env) == 1) - env = NULL; - return env; + return e; } -static MmsEnvelope *update_env_failed(MmsEnvelope *env) +static MmsEnvelope *update_env_failed(MmsEnvelope *e) { + time_t tnow = time(NULL); - if (env && env != &edummy) { - env->sendt = tnow + settings->send_back_off; - env->lasttry = tnow; - - if (settings->qfs->mms_queue_update(env) == 1) - env = NULL; - } - return env; + + e->sendt = tnow + settings->send_back_off; + e->lasttry = tnow; + + if (settings->qfs->mms_queue_update(e) == 1) + e = NULL; + return e; } -static void start_push(Octstr *rcpt_to, int isphonenum, MmsEnvelope *e, MmsMsg *msg) +static void do_mm1_push(Octstr *rcpt_to, int isphonenum, MmsEnvelope *e, MmsMsg *msg) { List *pheaders; static unsigned char ct; /* Transaction counter -- do we need it? */ - Octstr *to = NULL; - - Octstr *pduhdr = octstr_create(""); - - Octstr *s = NULL; - - - mms_info(0, "MM1", NULL, "mms2mobile.startpush: notification to %s\n", octstr_get_cstr(rcpt_to)); + Octstr *to = NULL; + Octstr *pduhdr = octstr_create(""); + Octstr *s = NULL; if (!rcpt_to) { mms_error(0, "MM1", NULL, "mobilesender: Queue entry %s has no recipient address!", e->xqfname); @@ -99,29 +91,35 @@ static void start_push(Octstr *rcpt_to, int isphonenum, MmsEnvelope *e, MmsMsg * octstr_append_char(pduhdr, 0x84); /* ... */ s = mms_tobinary(msg); - if (isphonenum) { - Octstr *url; - - octstr_url_encode(to); - octstr_url_encode(s); -#if 0 - octstr_dump(pduhdr, 0); -#endif - - octstr_url_encode(pduhdr); - - url = octstr_format("%S&text=%S%S&to=%S&udh=%%06%%05%%04%%0B%%84%%23%%F0", - settings->sendsms_url, pduhdr, s, to); + Octstr *url = octstr_format("%S&text=%E%E&to=%E&udh=%%06%%05%%04%%0B%%84%%23%%F0", + settings->sendsms_url, pduhdr, s, to); + int status; + List *rph = NULL; + Octstr *rbody = NULL; + MmsEnvelopeTo *xto = gwlist_get(e->to, 0); pheaders = http_create_empty_headers(); http_header_add(pheaders, "Connection", "close"); http_header_add(pheaders, "User-Agent", MM_NAME "/" MMSC_VERSION); - http_start_request(httpcaller, HTTP_METHOD_GET, url, - pheaders, NULL, 0, e, NULL); - + if ((status = mms_url_fetch_content(HTTP_METHOD_GET, url, pheaders, NULL, &rph, &rbody)) < 0 || + http_status_class(status) != HTTP_STATUS_SUCCESSFUL) { + + mms_error(0, "MM1", NULL, " Push[%s] from %s, to %s, failed, HTTP code => %d", e->xqfname, + octstr_get_cstr(e->from), octstr_get_cstr(to), status); + + e = update_env_failed(e); + } else { /* Successful push. */ + + mms_log2("Notify", octstr_imm("system"), to, + -1, e ? e->msgId : NULL, NULL, NULL, "MM1", NULL,NULL); + e = update_env_success(e, xto); + } + http_destroy_headers(pheaders); + http_destroy_headers(rph); + octstr_destroy(rbody); octstr_destroy(url); } else { /* An IP Address: Send packet, forget. */ Octstr *addr = udp_create_address(to, WAPPUSH_PORT); @@ -144,87 +142,14 @@ static void start_push(Octstr *rcpt_to, int isphonenum, MmsEnvelope *e, MmsMsg * mms_error(0, "MM1", NULL, "push to %s:%d failed: %s", octstr_get_cstr(to), WAPPUSH_PORT, strerror(errno)); } octstr_destroy(addr); - if (e) - settings->qfs->mms_queue_free_env(e); } done: octstr_destroy(to); octstr_destroy(pduhdr); octstr_destroy(s); -} - - -static int receive_push_reply(HTTPCaller *caller) -{ - int http_status; - List *reply_headers = NULL; - Octstr *final_url = NULL, *reply_body = NULL; - - MmsEnvelope *env; - http_status = HTTP_UNAUTHORIZED; - - while ((env = http_receive_result_real(caller, &http_status, &final_url, &reply_headers, - &reply_body,1)) != NULL) { - MmsEnvelopeTo *xto = NULL; - Octstr *to = NULL; - - if (http_status == -1 || final_url == NULL) { - mms_error(0, "MM1", NULL, "push failed, no reason found"); - goto push_failed; - } - - if (env == &edummy) /* Skip this one it is a dummy. */ - goto push_free_env; - xto = gwlist_get(env->to, 0); - if (xto) - to = xto->rcpt; - else { - mms_error(0, "MM1", NULL, "mobilesender: Queue entry %s has no recipient address!", env->xqfname); - goto push_failed; - } - - mms_info(0, "MM1", NULL, "send2mobile.push_reply[%s]: From %s, to %s => %d", - env->xqfname, - octstr_get_cstr(env->from), octstr_get_cstr(to), http_status); - - if (http_status == HTTP_UNAUTHORIZED || - http_status == HTTP_NOT_FOUND || - http_status == HTTP_FORBIDDEN) { /* This is a temporary system error - * do not increase attempts, count, - * merely reschedule - * for a minute or so later. - */ - - mms_error(0, "MM1", NULL, "Deffered notification, WAP Push failed for " - "msgid %s to %s, http error: %d!", octstr_get_cstr(env->msgId), - octstr_get_cstr(to), http_status); - goto push_failed; - } - - - debug("mobilesender.push", 0, "Push reply headers were"); - http_header_dump(reply_headers); - - mms_log2("Notify", octstr_imm("system"), to, - -1, env ? env->msgId : NULL, NULL, NULL, "MM1", NULL,NULL); - - if ((env = update_env_success(env, xto)) != NULL) - goto push_free_env; - - /* Fall through. */ - push_failed: - env = update_env_failed(env); - push_free_env: - if (env && env != &edummy) - settings->qfs->mms_queue_free_env(env); - - octstr_destroy(final_url); - octstr_destroy(reply_body); - http_destroy_headers(reply_headers); - } - - return 0; + if (e) + settings->qfs->mms_queue_free_env(e); } static int sendNotify(MmsEnvelope *e) @@ -241,7 +166,7 @@ static int sendNotify(MmsEnvelope *e) time_t expiryt; char *prov_notify_event = NULL; char *rtype = NULL; - + if (e->lastaccess != 0) { /* This message has been fetched at least once, no more signals. */ e->sendt = e->expiryt + 3600*24*30*12; @@ -344,24 +269,28 @@ static int sendNotify(MmsEnvelope *e) Octstr *s = octstr_format(octstr_get_cstr(settings->mms_notify_txt), from); if (settings->notify_unprovisioned && s && octstr_len(s) > 0) { /* Only send if the string was set. */ - List *pheaders; - Octstr *sto = octstr_duplicate(phonenum); + List *pheaders = http_create_empty_headers(), *rph = NULL; + Octstr *rbody = NULL; + int status; - octstr_url_encode(s); - octstr_url_encode(sto); - - url = octstr_format("%S&text=%S&to=%S",settings->sendsms_url,s, sto); - pheaders = http_create_empty_headers(); + url = octstr_format("%S&text=%E&to=%E",settings->sendsms_url,s, phonenum); + http_header_add(pheaders, "Connection", "close"); http_header_add(pheaders, "User-Agent", MM_NAME "/" VERSION); - http_start_request(httpcaller, HTTP_METHOD_GET, url, - pheaders, NULL, 0, &edummy, NULL); + if ((status = mms_url_fetch_content(HTTP_METHOD_GET, url, + pheaders, NULL, &rph, &rbody)) <0 || + http_status_class(status) != HTTP_STATUS_SUCCESSFUL) + mms_error(0, "MM1", NULL, "Notify unprovisioned url fetch failed => %d", status); + http_destroy_headers(pheaders); + http_destroy_headers(rph); octstr_destroy(url); - octstr_destroy(sto); - } else if (s) - octstr_destroy(s); + octstr_destroy(rbody); + + } + + octstr_destroy(s); res = MMS_SEND_OK; err = octstr_imm("No MMS Ind support, sent SMS instead"); @@ -399,24 +328,22 @@ static int sendNotify(MmsEnvelope *e) goto done; } - if (smsg) - start_push(phonenum ? phonenum : rcpt_ip, - phonenum ? 1 : 0, - e, smsg); /* Send the message. - * Don't touch 'e' after this point! - * It may be freed by receive thread. - */ + if (smsg) { + do_mm1_push(phonenum ? phonenum : rcpt_ip, + phonenum ? 1 : 0, + e, smsg); /* Don't touch 'e' after this point. It is gone */ + e = NULL; + } if (smsg != msg && smsg) mms_destroy(smsg); done: - if (err != NULL && - res != MMS_SEND_ERROR_TRANSIENT) { /* If there was a report request and this is a legit error - * queue it. - */ - - if (dlr) { + if (e != NULL && + err != NULL && + res != MMS_SEND_ERROR_TRANSIENT && dlr) { /* If there was a report request and this is a legit error + * queue it. + */ MmsMsg *m = mms_deliveryreport(msgId, to, e->from, tnow, rtype ? octstr_imm(rtype) : octstr_imm("Indeterminate")); @@ -438,19 +365,18 @@ static int sendNotify(MmsEnvelope *e) settings->host_alias); gwlist_destroy(l, NULL); mms_destroy(m); - octstr_destroy(res); - } - + octstr_destroy(res); } /* Write to log */ - mms_info(0, "MM1", NULL, "%s Mobile Queue MMS Send Notify: From=%s, to=%s, msgsize=%d, reason=%s", + mms_info(2, "MM1", NULL, "%s Mobile Queue MMS Send Notify: From=%s, to=%s, msgsize=%d, reason=%s. Processed in %d secs", SEND_ERROR_STR(res), octstr_get_cstr(from), octstr_get_cstr(to), msize, - err ? octstr_get_cstr(err) : ""); + err ? octstr_get_cstr(err) : "", + (int)(time(NULL) - tnow)); - if (res == MMS_SEND_ERROR_FATAL) { + if (res == MMS_SEND_ERROR_FATAL && xto && e) { xto->process = 0; /* No more attempts to deliver, delete this. */ if (settings->qfs->mms_queue_update(e) == 1) e = NULL; /* Queue entry gone. */ @@ -465,7 +391,7 @@ static int sendNotify(MmsEnvelope *e) prov_notify_event, rtype ? rtype : "", e ? e->msgId : NULL, NULL, NULL); - + if (msg) mms_destroy(msg); octstr_destroy(phonenum); @@ -477,24 +403,14 @@ static int sendNotify(MmsEnvelope *e) octstr_destroy(from); octstr_destroy(err); - return 1; + return 1; /* Tell caller we dealt with envelope */ } void mbuni_mm1_queue_runner(int *rstop) { - httpcaller = http_caller_create(); - if (gwthread_create((gwthread_func_t *)receive_push_reply, httpcaller) < 0) { /* Listener thread. */ - mms_error(0, "MM1", NULL, "Mobile sender: Failed to create push reply thread: %d: %s!", - errno, strerror(errno)); - return; - } - settings->qfs->mms_queue_run(octstr_get_cstr(settings->mm1_queuedir), sendNotify, settings->queue_interval, settings->maxthreads, rstop); gwthread_sleep(2); /* Wait for it to die. */ - http_caller_signal_shutdown(httpcaller); - gwthread_sleep(2); - http_caller_destroy(httpcaller); return; }