1
0
Fork 0

*** empty log message ***

This commit is contained in:
bagyenda 2009-02-16 07:30:54 +00:00
parent 9a0785830c
commit 462972c4a3
14 changed files with 155 additions and 36 deletions

View File

@ -1,6 +1,7 @@
2009-02-16 P. A. Bagyenda <bagyenda@dsmagic.com> 2009-02-16 P. A. Bagyenda <bagyenda@dsmagic.com>
* Improved pgsql queue module performance * Improved pgsql queue module performance
* MM1 queue changes - expire message even if fetched at least once * MM1 queue changes - expire message even if fetched at least once
* MMSC changes: Allow start of subset of services (MM1, MM7, Relay) on host instance using mmsc-services config variable
2009-02-11 P. A. Bagyenda <bagyenda@dsmagic.com> 2009-02-11 P. A. Bagyenda <bagyenda@dsmagic.com>
* Fix: Improved pgsql queue processing -- prevent queue flooding * Fix: Improved pgsql queue processing -- prevent queue flooding
2009-02-09 P. A. Bagyenda <bagyenda@dsmagic.com> 2009-02-09 P. A. Bagyenda <bagyenda@dsmagic.com>

View File

@ -14,6 +14,8 @@ send-mail-prog = /usr/sbin/sendmail -f '%f' '%t'
unified-prefix = "+25637,037,37" unified-prefix = "+25637,037,37"
maximum-send-attempts = 50 maximum-send-attempts = 50
default-message-expiry = 360000 default-message-expiry = 360000
max-message-expiry = 720000
mmsc-services = Relay,MM1
queue-run-interval = 5 queue-run-interval = 5
send-attempt-back-off = 300 send-attempt-back-off = 300
sendsms-url = http://localhost:13013/cgi-bin/sendsms sendsms-url = http://localhost:13013/cgi-bin/sendsms

View File

@ -781,6 +781,30 @@ lists all the configuration directives. The column <b>Mode</b>
prefixes will be delivered locally (via <tt>mmsrelay</tt>) prefixes will be delivered locally (via <tt>mmsrelay</tt>)
&nbsp; &nbsp;</td> &nbsp; &nbsp;</td>
</tr> </tr>
<tr >
<td valign=top ><tt>mmsc-services</tt>
&nbsp; &nbsp;</td>
<td valign=top >
<i>MMSC</i>
&nbsp; &nbsp;
</td>
<td valign=top >
Comma-separated list
&nbsp; &nbsp;</td>
<td valign=top >
Comma-separated list of MMSC services to be started/activated on
this host. List should contain one or more of the following
items <tt>MM1</tt>, <tt>MM7</tt>, <tt>Relay</tt>: These activate MM1
message processing, MM7 message processing, and the Relay function
respectively. To start all services, leave out this directive
entirely, or set it to <tt>All</tt>.
&nbsp; &nbsp;</td>
</tr>
<tr> <tr>
<td valign=top > <td valign=top >
<tt>storage-directory <tt>storage-directory
@ -928,9 +952,31 @@ lists all the configuration directives. The column <b>Mode</b>
<td valign=top > <td valign=top >
Default number Default number
of seconds in which message expires and is purged from queue (if not yet of seconds in which message expires and is purged from queue (if not yet
delivered). This figure is overridden by whatever is in the message. delivered). This value is overridden by whatever is in the message.
&nbsp; &nbsp;</td> &nbsp; &nbsp;</td>
</tr> </tr>
<tr>
<td valign=top >
<tt>max-message-expiry</tt>
&nbsp; &nbsp;</td>
<td valign=top >
<i>ALL</i>
&nbsp; &nbsp;
</td>
<td valign=top >
Integer
&nbsp; &nbsp;</td>
<td valign=top >
Maximum age (in seconds) allowed for all messages. If set, this
determines when messages must mandatorily be expired. This cannot be
overridden by the expiry value requested in the message.
&nbsp; &nbsp;</td>
</tr>
<tr> <tr>
<td valign=top > <td valign=top >
<tt>queue-run-interval</tt> <tt>queue-run-interval</tt>

View File

@ -63,6 +63,7 @@ SINGLE_GROUP(mbuni,
OCTSTR(strip-prefixes) OCTSTR(strip-prefixes)
OCTSTR(maximum-send-attempts) OCTSTR(maximum-send-attempts)
OCTSTR(default-message-expiry) OCTSTR(default-message-expiry)
OCTSTR(max-message-expiry)
OCTSTR(queue-run-interval) OCTSTR(queue-run-interval)
OCTSTR(send-attempt-back-off) OCTSTR(send-attempt-back-off)
OCTSTR(sendsms-url) OCTSTR(sendsms-url)
@ -117,6 +118,8 @@ SINGLE_GROUP(mbuni,
OCTSTR(mmsbox-cdr-module) OCTSTR(mmsbox-cdr-module)
OCTSTR(mmsbox-cdr-module-parameters) OCTSTR(mmsbox-cdr-module-parameters)
OCTSTR(mmsc-services)
) )
MULTI_GROUP(mmsproxy, MULTI_GROUP(mmsproxy,

View File

@ -980,6 +980,7 @@ void mms_collect_envdata_from_msgheaders(List *mh, List **xto,
Octstr **subject, Octstr **subject,
Octstr **otransid, time_t *expiryt, Octstr **otransid, time_t *expiryt,
time_t *deliveryt, long default_msgexpiry, time_t *deliveryt, long default_msgexpiry,
long max_msgexpiry,
char *unified_prefix, List *strip_prefixes) char *unified_prefix, List *strip_prefixes)
{ {
@ -1035,6 +1036,10 @@ void mms_collect_envdata_from_msgheaders(List *mh, List **xto,
octstr_destroy(s); octstr_destroy(s);
} else } else
*expiryt = time(NULL) + default_msgexpiry; *expiryt = time(NULL) + default_msgexpiry;
if (max_msgexpiry > 0
&& (*expiryt - time(NULL)) > max_msgexpiry)
*expiryt = time(NULL) + max_msgexpiry;
} }
if (deliveryt) { if (deliveryt) {

View File

@ -155,6 +155,7 @@ void mms_collect_envdata_from_msgheaders(List *mh, List **xto,
Octstr **subject, Octstr **subject,
Octstr **otransid, time_t *expiryt, Octstr **otransid, time_t *expiryt,
time_t *deliveryt, long default_msgexpiry, time_t *deliveryt, long default_msgexpiry,
long max_msgexpiry,
char *unified_prefix, List *strip_prefixes); char *unified_prefix, List *strip_prefixes);
/* Simple hash function */ /* Simple hash function */

View File

@ -497,6 +497,7 @@ static int mm7eaif_receive(MmsBoxHTTPClientInfo *h)
mms_collect_envdata_from_msgheaders(mh, &to, &subject, mms_collect_envdata_from_msgheaders(mh, &to, &subject,
&otransid, &expiryt, &deliveryt, &otransid, &expiryt, &deliveryt,
DEFAULT_EXPIRE, DEFAULT_EXPIRE,
-1,
octstr_get_cstr(unified_prefix), octstr_get_cstr(unified_prefix),
strip_prefixes); strip_prefixes);

View File

@ -29,7 +29,7 @@
List *sendmms_users = NULL; /* list of SendMmsUser structs */ List *sendmms_users = NULL; /* list of SendMmsUser structs */
List *mms_services = NULL; /* list of MMS Services */ List *mms_services = NULL; /* list of MMS Services */
Octstr *incoming_qdir, *outgoing_qdir, *dlr_dir; Octstr *incoming_qdir, *outgoing_qdir, *dlr_dir;
long mmsbox_maxsendattempts, mmsbox_send_back_off, default_msgexpiry; long mmsbox_maxsendattempts, mmsbox_send_back_off, default_msgexpiry, max_msgexpiry = -1;
long maxthreads = 0; long maxthreads = 0;
double queue_interval = -1; double queue_interval = -1;
Octstr *unified_prefix = NULL; Octstr *unified_prefix = NULL;
@ -118,6 +118,9 @@ int mms_load_mmsbox_settings(Octstr *fname, gwthread_func_t *mmsc_handler_func)
if (mms_cfg_get_int(cfg, grp, octstr_imm("default-message-expiry"), &default_msgexpiry) == -1) if (mms_cfg_get_int(cfg, grp, octstr_imm("default-message-expiry"), &default_msgexpiry) == -1)
default_msgexpiry = DEFAULT_EXPIRE; default_msgexpiry = DEFAULT_EXPIRE;
if (mms_cfg_get_int(cfg, grp, octstr_imm("max-message-expiry"), &max_msgexpiry) == -1)
max_msgexpiry = -1;
if (mms_cfg_get_int(cfg, grp, octstr_imm("max-send-threads"), &maxthreads) < 0 || if (mms_cfg_get_int(cfg, grp, octstr_imm("max-send-threads"), &maxthreads) < 0 ||
maxthreads < 1) maxthreads < 1)
maxthreads = 10; maxthreads = 10;

View File

@ -126,7 +126,7 @@ extern Octstr *unified_prefix;
extern Octstr *sendmail_cmd; extern Octstr *sendmail_cmd;
extern Octstr *myhostname; extern Octstr *myhostname;
extern List *strip_prefixes; extern List *strip_prefixes;
extern long mmsbox_maxsendattempts, mmsbox_send_back_off, default_msgexpiry; extern long mmsbox_maxsendattempts, mmsbox_send_back_off, default_msgexpiry, max_msgexpiry;
extern long maxthreads; extern long maxthreads;
extern double queue_interval; extern double queue_interval;
extern struct SendMmsPortInfo { extern struct SendMmsPortInfo {

View File

@ -66,6 +66,13 @@ int main(int argc, char *argv[])
signal(SIGPIPE,SIG_IGN); /* Ignore pipe errors. They kill us sometimes for no reason*/ signal(SIGPIPE,SIG_IGN); /* Ignore pipe errors. They kill us sometimes for no reason*/
mms_info(0, "mmsc", NULL," " MM_NAME " MMSC services:%s%s%s%s",
(settings->svc_list & SvcMM1) ? " MM1" : "",
(settings->svc_list & SvcMM7) ? " MM7" : "",
(settings->svc_list & SvcRelay) ? " Relay" : "",
(settings->svc_list & (SvcMM1 | SvcMM7 | SvcRelay)) ? "" : " None");
mms_info(0, "mmsc", NULL, "----------------------------------------");
if ((r_thread = gwthread_create((gwthread_func_t *)mmsrelay, NULL)) < 0) if ((r_thread = gwthread_create((gwthread_func_t *)mmsrelay, NULL)) < 0)
panic(0, "Failed to start MMSC Relay component!"); panic(0, "Failed to start MMSC Relay component!");
@ -73,10 +80,9 @@ int main(int argc, char *argv[])
panic(0, "Failed to start MMSC Relay component!"); panic(0, "Failed to start MMSC Relay component!");
/* We are done. Cleanup. */ /* We are done. Cleanup. */
gwthread_join(r_thread);
mms_info(0, "mmsc", NULL, "MMSC shutdown commenced."); mms_info(0, "mmsc", NULL, "MMSC shutdown commenced.");
gwthread_join(r_thread);
gwthread_sleep(2); /* Wait for them to die. */ gwthread_sleep(2); /* Wait for them to die. */
mms_info(0, "mmsc", NULL, "Final cleanup..."); mms_info(0, "mmsc", NULL, "Final cleanup...");

View File

@ -124,6 +124,26 @@ MmscSettings *mms_load_mmsc_settings(Octstr *fname, List **proxyrelays, int skip
} else } else
m->strip_prefixes = NULL; m->strip_prefixes = NULL;
if ((s = mms_cfg_get(cfg, grp, octstr_imm("mmsc-services"))) != NULL) {
char *p = octstr_get_cstr(s), *q, *r;
m->svc_list = 0;
for (q = strtok_r(p, ",; ", &r);
q;
q = strtok_r(NULL, ",; ", &r))
if (strcasecmp(q, "MM1") == 0)
m->svc_list |= SvcMM1;
else if (strcasecmp(q, "MM7") == 0)
m->svc_list |= SvcMM7;
else if (strcasecmp(q, "Relay") == 0)
m->svc_list |= SvcRelay;
else if (strcasecmp(q, "All") == 0)
m->svc_list = (SvcRelay | SvcMM1 | SvcMM7);
else
mms_warning(0, "mmsc", NULL,"unknown mmsc-service [%s]. Skipped.", q);
octstr_destroy(s);
} else
m->svc_list = ~0U;
m->name = _mms_cfg_getx(cfg, grp, octstr_imm("name")); m->name = _mms_cfg_getx(cfg, grp, octstr_imm("name"));
@ -184,6 +204,9 @@ MmscSettings *mms_load_mmsc_settings(Octstr *fname, List **proxyrelays, int skip
if (mms_cfg_get_int(cfg, grp, octstr_imm("default-message-expiry"), &m->default_msgexpiry) == -1) if (mms_cfg_get_int(cfg, grp, octstr_imm("default-message-expiry"), &m->default_msgexpiry) == -1)
m->default_msgexpiry = DEFAULT_EXPIRE; m->default_msgexpiry = DEFAULT_EXPIRE;
if (mms_cfg_get_int(cfg, grp, octstr_imm("max-message-expiry"), &m->max_msgexpiry) == -1)
m->max_msgexpiry = -1;
s = _mms_cfg_getx(cfg, grp, octstr_imm("queue-run-interval")); s = _mms_cfg_getx(cfg, grp, octstr_imm("queue-run-interval"));
if (!s || (m->queue_interval = atof(octstr_get_cstr(s))) <= 0) if (!s || (m->queue_interval = atof(octstr_get_cstr(s))) <= 0)
m->queue_interval = QUEUERUN_INTERVAL; m->queue_interval = QUEUERUN_INTERVAL;

View File

@ -70,6 +70,7 @@ typedef struct MmscSettings {
long maxthreads; long maxthreads;
long maxsendattempts; long maxsendattempts;
long default_msgexpiry; long default_msgexpiry;
long max_msgexpiry;
double queue_interval; double queue_interval;
long send_back_off; long send_back_off;
@ -129,9 +130,13 @@ typedef struct MmscSettings {
Octstr *admin_allow_ip, *admin_deny_ip; Octstr *admin_allow_ip, *admin_deny_ip;
Octstr *admin_pass; Octstr *admin_pass;
long admin_thread; long admin_thread;
unsigned int svc_list; /* List of started services */
mCfg *cfg; /* have a pointer to it. */ mCfg *cfg; /* have a pointer to it. */
} MmscSettings; } MmscSettings;
enum {SvcMM1=1, SvcMM7=2, SvcRelay=4}; /* List of started services */
/* Returns mmsc settings. */ /* Returns mmsc settings. */
MmscSettings *mms_load_mmsc_settings(Octstr *fname, List **proxyrelays, int skip_admin_port); MmscSettings *mms_load_mmsc_settings(Octstr *fname, List **proxyrelays, int skip_admin_port);
MmsVasp *mmsc_load_vasp_from_conf(MmscSettings *m, mCfgGrp *grp, MmsVasp *mmsc_load_vasp_from_conf(MmscSettings *m, mCfgGrp *grp,

View File

@ -152,34 +152,42 @@ static void mm1proxy(void)
int mmsproxy(void) int mmsproxy(void)
{ {
mms_info(0, "mmsproxy", NULL, " " MM_NAME " MMSC Proxy version %s starting", MMSC_VERSION); if (!(settings->svc_list & (SvcMM7 | SvcMM1))) {
mms_info(0, "mmsproxy", NULL, " " MM_NAME " MMSC Proxy version %s, no services to be started.", MMSC_VERSION);
return 0;
} else
mms_info(0, "mmsproxy", NULL, " " MM_NAME " MMSC Proxy version %s starting", MMSC_VERSION);
mms_start_profile_engine(octstr_get_cstr(settings->ua_profile_cache_dir)); mms_start_profile_engine(octstr_get_cstr(settings->ua_profile_cache_dir));
/* If we have mm7 port, start thread for it. */ if (settings->svc_list & SvcMM7) {
/* Now open port and start dispatching requests. */ /* If we have mm7 port, start thread for it. */
if (http_open_port(settings->port, 0) < 0) { if (settings->mm7port > 0 &&
mms_error(0, "MM1", NULL, "MMS Proxy: Failed to start http server: %d => %s!", http_open_port(settings->mm7port, 0) >= 0)
errno, strerror(errno)); mm7_thread = gwthread_create((gwthread_func_t *)mm7proxy, NULL);
return -1; else
} mms_warning(0, "MM7", NULL,"MMS Proxy: MM7 interface not open, port=%ld",
settings->mm7port);
if (settings->mm7port > 0 && } else
http_open_port(settings->mm7port, 0) >= 0) mm7_thread = -1;
mm7_thread = gwthread_create((gwthread_func_t *)mm7proxy, NULL);
else if (settings->svc_list & SvcMM1) {
mms_warning(0, "MM7", NULL,"MMS Proxy: MM7 interface not open, port=%ld", /* Now open port and start dispatching requests. */
settings->mm7port); if (http_open_port(settings->port, 0) < 0)
mms_error(0, "MM1", NULL, "MMS Proxy: Failed to start http server: %d => %s!",
errno, strerror(errno));
else
mm1proxy(); /* run mm1 proxy in current thread. */
}
mm1proxy(); /* run mm1 proxy in current thread. */
if (mm7_thread >0) if (mm7_thread >0)
gwthread_join(mm7_thread); gwthread_join(mm7_thread);
mms_info(0, "mmsproxy", NULL, "Stopping profile engine..."); mms_info(0, "mmsproxy", NULL, "Stopping profile engine...");
mms_stop_profile_engine(); /* Stop profile stuff. */ mms_stop_profile_engine(); /* Stop profile stuff. */
mms_info(0, "mmsproxy", NULL,"Shutdown complete."); mms_info(0, "mmsproxy", NULL,"Shutdown complete.");
return 0; return 0;
} }
@ -190,8 +198,10 @@ void stop_mmsproxy(void)
mms_info(0, "mmsproxy", NULL, "Shutdown commenced..."); mms_info(0, "mmsproxy", NULL, "Shutdown commenced...");
rstop = 1; rstop = 1;
http_close_port(settings->port); if (settings->svc_list & SvcMM1)
http_close_port(settings->mm7port); http_close_port(settings->port);
if (settings->svc_list & SvcMM7)
http_close_port(settings->mm7port);
if (mm7_thread > 0) if (mm7_thread > 0)
gwthread_wakeup(mm7_thread); gwthread_wakeup(mm7_thread);
mms_info(0, "mmsproxy", NULL, "Signalling shutdown complete."); mms_info(0, "mmsproxy", NULL, "Signalling shutdown complete.");
@ -533,6 +543,7 @@ static void sendmms_proxy(MmsHTTPClientInfo *h)
mms_collect_envdata_from_msgheaders(mh, &to, &subject, &otransid, &expiryt, mms_collect_envdata_from_msgheaders(mh, &to, &subject, &otransid, &expiryt,
&deliveryt, settings->default_msgexpiry, &deliveryt, settings->default_msgexpiry,
settings->max_msgexpiry,
NULL, NULL); /* already normalized. */ NULL, NULL); /* already normalized. */
if (!h->client_addr) { if (!h->client_addr) {
@ -658,6 +669,7 @@ static void sendmms_proxy(MmsHTTPClientInfo *h)
mms_collect_envdata_from_msgheaders(mh, &to, &subject, &otransid, &expiryt, mms_collect_envdata_from_msgheaders(mh, &to, &subject, &otransid, &expiryt,
&deliveryt, settings->default_msgexpiry, &deliveryt, settings->default_msgexpiry,
settings->max_msgexpiry,
NULL, NULL); NULL, NULL);
if (!h->client_addr) { if (!h->client_addr) {
@ -1004,6 +1016,7 @@ static void sendmms_proxy(MmsHTTPClientInfo *h)
mms_collect_envdata_from_msgheaders(mh, &to, NULL, NULL, NULL, NULL, mms_collect_envdata_from_msgheaders(mh, &to, NULL, NULL, NULL, NULL,
settings->default_msgexpiry, settings->default_msgexpiry,
settings->max_msgexpiry,
NULL, NULL); NULL, NULL);
x = settings->qfs->mms_queue_add(from, to, NULL, NULL, NULL, time(NULL), x = settings->qfs->mms_queue_add(from, to, NULL, NULL, NULL, time(NULL),
@ -1776,6 +1789,7 @@ static void mm7eaif_dispatch(MmsHTTPClientInfo *h)
*/ */
mms_collect_envdata_from_msgheaders(mh, &to, &subject, &otransid, &expiryt, mms_collect_envdata_from_msgheaders(mh, &to, &subject, &otransid, &expiryt,
&deliveryt, settings->default_msgexpiry, &deliveryt, settings->default_msgexpiry,
settings->max_msgexpiry,
NULL, NULL); NULL, NULL);

View File

@ -20,17 +20,26 @@ static int rstop = 0; /* Set to 1 to stop relay. */
int mmsrelay() int mmsrelay()
{ {
mms_info(0, "mmsrelay", NULL, " " MM_NAME " MMSC Relay version %s starting", MMSC_VERSION); if (!(settings->svc_list & (SvcMM1 | SvcRelay))) {
mms_info(0, "mmsrelay", NULL, " " MM_NAME " MMSC Relay version %s, no services to be started.", MMSC_VERSION);
return 0;
} else
mms_info(0, "mmsrelay", NULL, " " MM_NAME " MMSC Relay version %s starting", MMSC_VERSION);
/* Start global queue runner. */ /* Start global queue runner. */
mms_info(0, "mmsrelay", NULL, "Starting Global Queue Runner..."); if (settings->svc_list & SvcRelay) {
qthread = gwthread_create((gwthread_func_t *)mbuni_global_queue_runner, &rstop); mms_info(0, "mmsrelay", NULL, "Starting Global Queue Runner...");
qthread = gwthread_create((gwthread_func_t *)mbuni_global_queue_runner, &rstop);
/* Start the local queue runner. */ }
mms_info(0, "mmsrelay", NULL,"Starting Local Queue Runner...");
mbuni_mm1_queue_runner(&rstop);
gwthread_join(qthread); /* Wait for it to die... */ if (settings->svc_list & SvcMM1) {
/* Start the local queue runner. */
mms_info(0, "mmsrelay", NULL,"Starting Local Queue Runner...");
mbuni_mm1_queue_runner(&rstop);
}
if (qthread >= 0)
gwthread_join(qthread); /* Wait for it to die... */
mms_info(0, "mmsrelay", NULL, "MMSC Relay MM1 queue runner terminates..."); mms_info(0, "mmsrelay", NULL, "MMSC Relay MM1 queue runner terminates...");
return 0; return 0;