1
0
Fork 0

improved scaling

This commit is contained in:
bagyenda 2008-07-18 21:30:18 +00:00
parent 06e4a94eb0
commit 7c7de0e22d
6 changed files with 170 additions and 111 deletions

View File

@ -1,3 +1,5 @@
2008-07-18 P. A. Bagyenda <bagyenda@dsmagic.com>
* Improved scalability for mmsbox - multiple receive handlers for mmc incoming and sendmms interface
2008-07-15 P. A. Bagyenda <bagyenda@dsmagic.com>
* Added file type application/vnd.mbuni.url-list for basic multipart/mixed type
2008-07-10 P. A. Bagyenda <bagyenda@dsmagic.com>

View File

@ -743,11 +743,11 @@ int mm7_soapmsg_to_httpmsg(MSoapMsg_t *m, MM7Version_t *ver, List **hdrs, Octstr
*body = mime_entity_body(mime);
*hdrs = mime_entity_headers(mime);
#if 0
debug("mms2soap", 0, "SOAP headers dump follows: ");
http_header_dump(*hdrs);
debug("mms2soap", 0, "SOAP MSG is: %s", octstr_get_cstr(*body));
#endif
mime_entity_destroy(mime);
octstr_destroy(ctype);

View File

@ -39,18 +39,8 @@
} \
} while(0)
typedef struct MmsHTTPClientInfo {
HTTPClient *client;
Octstr *ua;
Octstr *ip;
List *headers;
Octstr *url;
Octstr *body;
List *cgivars;
MmscGrp *m;
} MmsHTTPClientInfo;
static void free_clientInfo(MmsHTTPClientInfo *h, int freeh);
static int auth_check(Octstr *user, Octstr *pass, List *headers)
{
@ -150,7 +140,7 @@ int mmsbox_send_report(Octstr *from, char *report_type,
}
/* These functions are very similar to those in mmsproxy */
static void mm7soap_receive(MmsHTTPClientInfo *h)
static void mm7soap_receive(MmsBoxHTTPClientInfo *h)
{
MSoapMsg_t *mreq = NULL, *mresp = NULL;
@ -354,7 +344,7 @@ static void mm7soap_receive(MmsHTTPClientInfo *h)
octstr_destroy(mmc_id);
}
static void mm7eaif_receive(MmsHTTPClientInfo *h)
static void mm7eaif_receive(MmsBoxHTTPClientInfo *h)
{
MmsMsg *m = NULL;
List *mh = NULL;
@ -561,11 +551,44 @@ static void mm7eaif_receive(MmsHTTPClientInfo *h)
}
void mmsc_receive_func(MmscGrp *m)
static void dispatch_mm7_recv(List *rl)
{
MmsHTTPClientInfo h = {NULL};
MmsBoxHTTPClientInfo *h;
while ((h = gwlist_consume(rl)) != NULL) {
MmscGrp *m = h->m;
if (auth_check(m->incoming.user,
m->incoming.pass,
h->headers) != 0) { /* Ask it to authenticate... */
List *hh = http_create_empty_headers();
http_header_add(hh, "WWW-Authenticate",
"Basic realm=\"" MM_NAME "\"");
http_send_reply(h->client, HTTP_UNAUTHORIZED, hh,
octstr_imm("Authentication failed"));
http_destroy_headers(hh);
info(0, "MMSBox: Auth failed, incoming connection, MMC group=[%s]",
m->id ? octstr_get_cstr(m->id) : "(none)");
} else if (h->m->type == SOAP_MMSC)
mm7soap_receive(h);
else
mm7eaif_receive(h);
free_mmsbox_http_clientInfo(h, 1);
}
}
void mmsc_receive_func(MmscGrp *m)
{
int i;
MmsBoxHTTPClientInfo h = {NULL};
Octstr *err = NULL;
List *mmsc_incoming_reqs = gwlist_create();
gwlist_add_producer(mmsc_incoming_reqs);
for (i = 0; i<maxthreads; i++)
gwthread_create((gwthread_func_t *)dispatch_mm7_recv, mmsc_incoming_reqs);
h.m = m;
while(rstop == 0 &&
@ -573,74 +596,47 @@ void mmsc_receive_func(MmscGrp *m)
&h.ip, &h.url, &h.headers,
&h.body, &h.cgivars)) != NULL)
if (is_allowed_ip(m->incoming.allow_ip, m->incoming.deny_ip, h.ip)) {
MmsBoxHTTPClientInfo *hx = gw_malloc(sizeof hx[0]);
h.ua = http_header_value(h.headers, octstr_imm("User-Agent"));
*hx = h;
debug("mmsbox", 0,
" MM7 Incoming, IP=[%s], MMSC=[%s], dest_port=[%ld] ",
h.ip ? octstr_get_cstr(h.ip) : "",
octstr_get_cstr(m->id),
m->incoming.port);
/* Dump headers, url etc. */
#if 0
http_header_dump(h.headers);
if (h.body) octstr_dump(h.body, 0);
if (h.ip) octstr_dump(h.ip, 0);
#endif
if (auth_check(m->incoming.user,
m->incoming.pass,
h.headers) != 0) { /* Ask it to authenticate... */
List *hh = http_create_empty_headers();
http_header_add(hh, "WWW-Authenticate",
"Basic realm=\"" MM_NAME "\"");
http_send_reply(h.client, HTTP_UNAUTHORIZED, hh,
octstr_imm("Authentication failed"));
http_destroy_headers(hh);
info(0, "MMSBox: Auth failed, incoming connection, MMC group=[%s]",
m->id ? octstr_get_cstr(m->id) : "(none)");
} else if (m->type == SOAP_MMSC)
mm7soap_receive(&h);
else
mm7eaif_receive(&h);
free_clientInfo(&h, 0);
gwlist_produce(mmsc_incoming_reqs, hx);
} else {
h.ua = http_header_value(h.headers, octstr_imm("User-Agent"));
err = octstr_format("HTTP: Incoming IP denied MMSC[%s] ip=[%s], ua=[%s], disconnected",
m->id ? octstr_get_cstr(m->id) : "(none)",
h.ip ? octstr_get_cstr(h.ip) : "(none)",
h.ua ? octstr_get_cstr(h.ua) : "(none)");
if (err) {
error(0, "%s", octstr_get_cstr(err));
octstr_destroy(err);
}
error(0, "HTTP: Incoming IP denied MMSC[%s] ip=[%s], ua=[%s], disconnected",
m->id ? octstr_get_cstr(m->id) : "(none)",
h.ip ? octstr_get_cstr(h.ip) : "(none)",
h.ua ? octstr_get_cstr(h.ua) : "(none)");
http_send_reply(h.client, HTTP_FORBIDDEN, NULL,
octstr_imm("Access denied."));
free_clientInfo(&h, 0);
free_mmsbox_http_clientInfo(&h, 0);
}
debug("proxy", 0, "MMSBox: MM7 Shutting down...");
debug("proxy", 0, "MMSBox: MM7 receiver Shutting down...");
gwlist_remove_producer(mmsc_incoming_reqs);
gwthread_join_every((void *)dispatch_mm7_recv);
gwlist_destroy(mmsc_incoming_reqs, NULL);
debug("proxy", 0, "MMSBox: MM7 receiver Shutting down complete.");
}
static void free_clientInfo(MmsHTTPClientInfo *h, int freeh)
{
debug("free info", 0,
" entered free_clientinfo %d, ip=[%ld]", freeh, (long)h->ip);
octstr_destroy(h->ip);
octstr_destroy(h->url);
octstr_destroy(h->ua);
octstr_destroy(h->body);
http_destroy_cgiargs(h->cgivars);
http_destroy_headers(h->headers);
if (freeh) gw_free(h);
debug("free info", 0, " left free_clientinfo");
}
/* XXX Returns msgid in mmsc or NULL if error. Caller uses this for DLR issues.
* Caller must make sure throughput issues
@ -911,8 +907,12 @@ static int sendMsg(MmsEnvelope *e)
MmsMsg *msg = NULL;
int i, n;
Octstr *otransid = e->hdrs ? http_header_value(e->hdrs, octstr_imm("X-Mbuni-TransactionID")) : NULL;
msg = qfs->mms_queue_getdata(e);
if ((msg = qfs->mms_queue_getdata(e)) == NULL) {
error(0, "MMSBox queue error: Failed to load message for queue id [%s]!", e->xqfname);
goto done2;
}
for (i = 0, n = gwlist_len(e->to); i<n; i++) {
int res = MMS_SEND_OK;
MmsEnvelopeTo *to = gwlist_get(e->to, i);
@ -990,6 +990,7 @@ static int sendMsg(MmsEnvelope *e)
octstr_destroy(err);
}
done2:
mms_destroy(msg);
octstr_destroy(otransid);

View File

@ -1189,16 +1189,11 @@ static SendMmsUser *auth_user(Octstr *user, Octstr *pass)
return NULL;
}
static void sendmms_func(void *unused)
static void dispatch_sendmms_recv(List *rl)
{
HTTPClient *client = NULL;
Octstr *ip = NULL, *url = NULL;
Octstr *body = NULL;
List *cgivars = NULL, *h = NULL;
MmsBoxHTTPClientInfo *h;
while (!rstop &&
(client = http_accept_request(sendmms_port.port,
&ip, &url, &h, &body, &cgivars)) != NULL) {
while ((h = gwlist_consume(rl)) != NULL) {
SendMmsUser *u = NULL;
List *hh = http_create_empty_headers();
Octstr *username;
@ -1206,50 +1201,50 @@ static void sendmms_func(void *unused)
Octstr *err = NULL;
List *cgivar_ctypes = NULL;
int res = 0, tparse = parse_cgivars(h, body, &cgivars, &cgivar_ctypes);
username = http_cgi_variable(cgivars, "username");
password = http_cgi_variable(cgivars, "password");
int res = 0, tparse = parse_cgivars(h->headers, h->body, &h->cgivars, &cgivar_ctypes);
username = http_cgi_variable(h->cgivars, "username");
password = http_cgi_variable(h->cgivars, "password");
if (username)
octstr_strip_blanks(username);
if (password)
octstr_strip_blanks(password);
if ((u = auth_user(username, password)) != NULL &&
is_allowed_ip(sendmms_port.allow_ip, sendmms_port.deny_ip, ip)) {
is_allowed_ip(sendmms_port.allow_ip, sendmms_port.deny_ip, h->ip)) {
Octstr *data, *ctype = NULL, *mmc, *to, *from, *dlr_url;
Octstr *rr_url, *allow_adaptations, *subject = NULL;
List *lto = NULL, *rh = http_create_empty_headers();
Octstr *rb = NULL, *base_url;
int i, n, skip_ctype = 0;
Octstr *vasid = http_cgi_variable(cgivars, "vasid");
Octstr *service_code = http_cgi_variable(cgivars, "servicecode");
Octstr *mclass = http_cgi_variable(cgivars, "mclass");
Octstr *prio = http_cgi_variable(cgivars, "priority");
Octstr *distro = http_cgi_variable(cgivars, "distribution");
Octstr *send_type = http_cgi_variable(cgivars, "mms-direction");
Octstr *vasid = http_cgi_variable(h->cgivars, "vasid");
Octstr *service_code = http_cgi_variable(h->cgivars, "servicecode");
Octstr *mclass = http_cgi_variable(h->cgivars, "mclass");
Octstr *prio = http_cgi_variable(h->cgivars, "priority");
Octstr *distro = http_cgi_variable(h->cgivars, "distribution");
Octstr *send_type = http_cgi_variable(h->cgivars, "mms-direction");
Octstr *data_url = NULL;
dlr_url = http_cgi_variable(cgivars, "dlr-url");
rr_url = http_cgi_variable(cgivars, "rr-url");
allow_adaptations = http_cgi_variable(cgivars, "allow-adaptations");
mmc = http_cgi_variable(cgivars, "mmsc");
subject = http_cgi_variable(cgivars, "subject");
dlr_url = http_cgi_variable(h->cgivars, "dlr-url");
rr_url = http_cgi_variable(h->cgivars, "rr-url");
allow_adaptations = http_cgi_variable(h->cgivars, "allow-adaptations");
mmc = http_cgi_variable(h->cgivars, "mmsc");
subject = http_cgi_variable(h->cgivars, "subject");
if ((base_url = http_cgi_variable(cgivars, "base-url")) == NULL)
if ((base_url = http_cgi_variable(h->cgivars, "base-url")) == NULL)
base_url = octstr_imm("http://localhost");
else
base_url = octstr_duplicate(base_url); /* because we need to delete it below. */
/* Now get the data. */
if ((data = http_cgi_variable(cgivars, "text")) != NULL) { /* text. */
Octstr *charset = http_cgi_variable(cgivars, "charset");
if ((data = http_cgi_variable(h->cgivars, "text")) != NULL) { /* text. */
Octstr *charset = http_cgi_variable(h->cgivars, "charset");
ctype = octstr_format("text/plain");
if (charset)
octstr_format_append(ctype, "; charset=%S", charset);
} else if ((data = http_cgi_variable(cgivars, "smil")) != NULL) /* smil. */
} else if ((data = http_cgi_variable(h->cgivars, "smil")) != NULL) /* smil. */
ctype = octstr_imm("application/smil");
else if ((data = http_cgi_variable(cgivars, "content-url")) != NULL) { /* arbitrary content. */
else if ((data = http_cgi_variable(h->cgivars, "content-url")) != NULL) { /* arbitrary content. */
List *reqh = http_create_empty_headers(), *rph = NULL;
Octstr *reply = NULL, *params = NULL;
@ -1269,27 +1264,27 @@ static void sendmms_func(void *unused)
rh = rph; /* replace as real reply headers. */
skip_ctype = 1;
octstr_destroy(params);
} else if ((data = http_cgi_variable(cgivars, "content")) != NULL) { /* any content. */
} else if ((data = http_cgi_variable(h->cgivars, "content")) != NULL) { /* any content. */
Octstr *_xctype = NULL; /* ... because cgi var stuff is destroyed elsewhere, we must dup it !! */
/* If the user sent us a content element, then they should have
* sent us its type either as part of the HTTP POST (multipart/form-data)
* or as CGI VAR called content_type
*/
if ((_xctype = http_cgi_variable(cgivars, "content_type")) == NULL)
if ((_xctype = http_cgi_variable(h->cgivars, "content_type")) == NULL)
if (cgivar_ctypes)
_xctype = http_cgi_variable(cgivar_ctypes, "content");
if (_xctype)
ctype = octstr_duplicate(_xctype);
} else if (body && tparse != 0) { /* if all above fails,
} else if (h->body && tparse != 0) { /* if all above fails,
use send-mms msg body if any. */
data = octstr_duplicate(body);
ctype = http_header_value(h, octstr_imm("Content-Type"));
data = octstr_duplicate(h->body);
ctype = http_header_value(h->headers, octstr_imm("Content-Type"));
} else
rb = octstr_imm("Missing content");
if ((to = http_cgi_variable(cgivars, "to")) == NULL)
if ((to = http_cgi_variable(h->cgivars, "to")) == NULL)
rb = octstr_imm("Missing recipient!");
else
lto = octstr_split_words(to);
@ -1298,7 +1293,7 @@ static void sendmms_func(void *unused)
if (ctype && !skip_ctype)
http_header_add(rh, "Content-Type", octstr_get_cstr(ctype));
if ((from = http_cgi_variable(cgivars, "from")) != NULL) {
if ((from = http_cgi_variable(h->cgivars, "from")) != NULL) {
from = octstr_duplicate(from);
_mms_fixup_address(&from, NULL, NULL, 1);
if (from)
@ -1362,7 +1357,7 @@ static void sendmms_func(void *unused)
octstr_destroy(transid);
} else if (!rb)
rb = octstr_imm("Failed to send message");
http_send_reply(client, (res == 0) ? HTTP_OK : HTTP_BAD_REQUEST, hh,
http_send_reply(h->client, (res == 0) ? HTTP_OK : HTTP_BAD_REQUEST, hh,
rb ? rb : octstr_imm("Sent"));
info(0, "MMSBox.mmssend: u=%s, %s [%s]",
u ? octstr_get_cstr(u->user) : "none",
@ -1375,7 +1370,7 @@ static void sendmms_func(void *unused)
octstr_destroy(base_url);
octstr_destroy(data_url);
} else {
http_send_reply(client, HTTP_UNAUTHORIZED, hh,
http_send_reply(h->client, HTTP_UNAUTHORIZED, hh,
octstr_imm("Authentication failed"));
err = octstr_format("MMSBox: SendMMS, Authentication failed, "
"username=%s, password=%s!",
@ -1387,13 +1382,43 @@ static void sendmms_func(void *unused)
error(0, "%s", octstr_get_cstr(err));
octstr_destroy(err);
/* Free the ip and other stuff here. */
octstr_destroy(ip);
octstr_destroy(body);
octstr_destroy(url);
http_destroy_cgiargs(cgivars);
http_destroy_cgiargs(cgivar_ctypes);
http_destroy_headers(h);
http_destroy_headers(hh);
free_mmsbox_http_clientInfo(h, 1);
}
}
static void sendmms_func(void *unused)
{
MmsBoxHTTPClientInfo h = {NULL};
int i;
List *rl = gwlist_create();
gwlist_add_producer(rl);
for (i = 0; i<maxthreads; i++)
gwthread_create((gwthread_func_t *)dispatch_sendmms_recv, rl);
while (!rstop &&
(h.client = http_accept_request(sendmms_port.port,
&h.ip, &h.url, &h.headers, &h.body, &h.cgivars)) != NULL) {
MmsBoxHTTPClientInfo *hx = gw_malloc(sizeof hx[0]);
*hx = h;
hx->ua = http_header_value(h.headers, octstr_imm("User-Agent"));
gwlist_produce(rl, hx);
}
debug("proxy", 0, "MMSBox: MMS-Send Interface shutting down ... ");
gwlist_remove_producer(rl);
gwthread_join_every((void *)dispatch_sendmms_recv);
gwlist_destroy(rl, NULL);
debug("proxy", 0, "MMSBox: MMS-Send Interface shutdown complete. ");
}

View File

@ -74,8 +74,9 @@ int mms_load_mmsbox_settings(mCfg *cfg, gwthread_func_t *mmsc_handler_func)
if (mms_cfg_get_int(grp, octstr_imm("default-message-expiry"), &default_msgexpiry) == -1)
default_msgexpiry = DEFAULT_EXPIRE;
if (mms_cfg_get_int(grp, octstr_imm("max-send-threads"), &maxthreads) == -1)
maxthreads = 10;
if (mms_cfg_get_int(grp, octstr_imm("max-send-threads"), &maxthreads) < 0 ||
maxthreads < 1)
maxthreads = 10;
s = mms_cfg_get(grp, octstr_imm("queue-run-interval"));
if (s) {
@ -584,3 +585,21 @@ void mmsbox_cleanup_mmsc_settings(void)
if (qfs)
qfs->mms_cleanup_queue_module();
}
void free_mmsbox_http_clientInfo(MmsBoxHTTPClientInfo *h, int freeh)
{
debug("free info", 0,
" entered free_clientinfo %d, ip=[%ld]", freeh, (long)h->ip);
octstr_destroy(h->ip);
octstr_destroy(h->url);
octstr_destroy(h->ua);
octstr_destroy(h->body);
http_destroy_cgiargs(h->cgivars);
http_destroy_headers(h->headers);
if (freeh) gw_free(h);
debug("free info", 0, " left free_clientinfo");
}

View File

@ -119,4 +119,16 @@ extern MmscGrp *get_handler_mmc(Octstr *id, Octstr *to, Octstr *from);
extern Octstr *get_mmsbox_queue_dir(Octstr *from, List *to, MmscGrp *m,
Octstr **mmc_id);
extern void mmsbox_cleanup_mmsc_settings(void);
typedef struct MmsBoxHTTPClientInfo {
HTTPClient *client;
Octstr *ua;
Octstr *ip;
List *headers;
Octstr *url;
Octstr *body;
List *cgivars;
MmscGrp *m;
} MmsBoxHTTPClientInfo;
void free_mmsbox_http_clientInfo(MmsBoxHTTPClientInfo *h, int freeh);
#endif