From 4c5a8389c08e81ecfc8e374cdacd3bdd2c42f80d Mon Sep 17 00:00:00 2001 From: bagyenda <> Date: Mon, 20 Aug 2007 11:49:30 +0000 Subject: [PATCH] generalised queue handling --- mbuni/ChangeLog | 2 + mbuni/mmlib/mms_cfg.def | 2 + mbuni/mmlib/mms_queue.c | 415 +++++++++++++++++++++-------------- mbuni/mmlib/mms_queue.h | 95 +++++--- mbuni/mmsbox/bearerbox.c | 14 +- mbuni/mmsbox/mmsbox.c | 10 +- mbuni/mmsbox/mmsbox_cfg.c | 31 ++- mbuni/mmsbox/mmsbox_cfg.h | 3 + mbuni/mmsc/mmsc_cfg.c | 30 ++- mbuni/mmsc/mmsc_cfg.h | 4 + mbuni/mmsc/mmsfromemail.c | 12 +- mbuni/mmsc/mmsglobalsender.c | 16 +- mbuni/mmsc/mmsmobilesender.c | 28 +-- mbuni/mmsc/mmsproxy.c | 68 +++--- mbuni/mmsc/mmssend.c | 2 +- 15 files changed, 446 insertions(+), 286 deletions(-) diff --git a/mbuni/ChangeLog b/mbuni/ChangeLog index 6a99159..555e4d3 100644 --- a/mbuni/ChangeLog +++ b/mbuni/ChangeLog @@ -1,3 +1,5 @@ +2007-08020 P. A. Bagyenda + * Generalised MMS Queue handling interface to allow different storage/delivery mechanisms. 2007-08-09 P. A. Bagyenda * Added send-dlr-on-fetch config param to MMC settings 2007-08-08 P. A. Bagyenda diff --git a/mbuni/mmlib/mms_cfg.def b/mbuni/mmlib/mms_cfg.def index b989b17..a6fd208 100644 --- a/mbuni/mmlib/mms_cfg.def +++ b/mbuni/mmlib/mms_cfg.def @@ -45,6 +45,8 @@ SINGLE_GROUP(mbuni, OCTSTR(host-alias) OCTSTR(local-prefixes) OCTSTR(storage-directory) + OCTSTR(queue-manager-module) + OCTSTR(queue-module-init-data) OCTSTR(max-send-threads) OCTSTR(send-mail-prog) OCTSTR(unified-prefix) diff --git a/mbuni/mmlib/mms_queue.c b/mbuni/mmlib/mms_queue.c index acd93aa..0f8abb7 100644 --- a/mbuni/mmlib/mms_queue.c +++ b/mbuni/mmlib/mms_queue.c @@ -3,7 +3,7 @@ * * Queue management functions * - * Copyright (C) 2003 - 2005, Digital Solutions Ltd. - http://www.dsmagic.com + * Copyright (C) 2003 - 2007, Digital Solutions Ltd. - http://www.dsmagic.com * * Paul Bagyenda * @@ -34,35 +34,73 @@ #define DIR_SEP_S "/" -int mms_init_queuedir(Octstr *qdir) +struct qfile_t { /* Name of the queue file, pointer to it (locked). DO NOT USE THESE! */ + char name[QFNAMEMAX]; /* Name of the file. */ + char dir[QFNAMEMAX]; /* Directory in which file is .*/ + char subdir[64]; /* and the sub-directory. */ + char _pad[16]; + int fd; +}; + + +static char sdir[QFNAMEMAX*2+1]; /* top-level storage directory. */ +static int inited; +static int mms_init_queue_module(Octstr *storage_dir) { + gw_assert(inited==0); + gw_assert(storage_dir); + + strncpy(sdir, octstr_get_cstr(storage_dir), -1 + sizeof sdir); + inited = 1; + return 0; +} + +static Octstr *mms_init_queue_dir(char *qdir, int *error) +{ + Octstr *dir; int i, ret; char fbuf[512], *xqdir; - octstr_strip_blanks(qdir); - /* Remove trailing slashes. */ + gw_assert(inited); + gw_assert(qdir); - for (i = octstr_len(qdir) - 1; i >= 0; i--) - if (octstr_get_char(qdir, i) != DIR_SEP) + if (error == NULL) error = &ret; + *error = 0; + + dir = octstr_format("%s%c%s", sdir, DIR_SEP, qdir); + octstr_strip_blanks(dir); + + /* Remove trailing slashes. */ + for (i = octstr_len(dir) - 1; i >= 0; i--) + if (octstr_get_char(dir, i) != DIR_SEP) break; else - octstr_delete(qdir, i,1); + octstr_delete(dir, i,1); - xqdir = octstr_get_cstr(qdir); + xqdir = octstr_get_cstr(dir); if ((ret = mkdir(xqdir, S_IRWXU|S_IRWXG)) < 0 && - errno != EEXIST) - return errno; + errno != EEXIST) { + *error = errno; + goto done; + } for (i = 0; _TT[i]; i++) { /* initialise the top level only... */ sprintf(fbuf, "%.128s/%c", xqdir, _TT[i]); if (mkdir(fbuf, S_IRWXU|S_IRWXG) < 0 && - errno != EEXIST) - return errno; + errno != EEXIST) { + *error = errno; + goto done; + } } - return 0; + + done: + if (*error == 0) + return dir; + octstr_destroy(dir); + return NULL; } static int free_envelope(MmsEnvelope *e, int removefromqueue); @@ -141,7 +179,7 @@ static void get_subdir(char *qf, char subdir[64], char realqf[QFNAMEMAX]) * return NULL (i.e. file is being processed elsewhere -- race condition), otherwise read it. * - If should block is 1, then does a potentially blocking attempt to lock the file. */ -MmsEnvelope *mms_queue_readenvelope(char *qf, char *mms_queuedir, int shouldblock) +static MmsEnvelope *mms_queue_readenvelope(char *qf, char *mms_queuedir, int shouldblock) { Octstr *fname; int fd; @@ -152,6 +190,7 @@ MmsEnvelope *mms_queue_readenvelope(char *qf, char *mms_queuedir, int shouldbloc char subdir[64]; char realqf[QFNAMEMAX]; char xqf[QFNAMEMAX+64]; + struct qfile_t *qfs; get_subdir(qf, subdir, realqf); /* break it down... */ @@ -168,15 +207,27 @@ MmsEnvelope *mms_queue_readenvelope(char *qf, char *mms_queuedir, int shouldbloc return NULL; } - e = gw_malloc(sizeof *e); - memset(e, 0, sizeof *e); /* Clear it all .*/ + e = mms_queue_create_envelope(NULL, NULL, + NULL, + NULL, NULL, + 0, 0, + NULL, + NULL, NULL, + NULL, NULL, + NULL, + 0, + NULL, + NULL, + qf, + sizeof (struct qfile_t)); e->to = gwlist_create(); - e->qf.fd = fd; - strncpy(e->qf.name, realqf, sizeof e->qf.name); - strncpy(e->qf.subdir, subdir, sizeof e->qf.subdir); - strncpy(e->qf.dir, mms_queuedir, sizeof e->qf.dir); - strncpy(e->xqfname, qf, sizeof e->xqfname); + qfs = e->qfs_data; + + qfs->fd = fd; + strncpy(qfs->name, realqf, sizeof qfs->name); + strncpy(qfs->subdir, subdir, sizeof qfs->subdir); + strncpy(qfs->dir, mms_queuedir, sizeof qfs->dir); qdata = octstr_read_file(octstr_get_cstr(fname)); octstr_destroy(fname); @@ -339,19 +390,21 @@ static int writeenvelope(MmsEnvelope *e, int newenv) int fd; int i, n; int res = 0; - + struct qfile_t *qfs = e ? e->qfs_data : NULL; + + gw_assert(e); + if (newenv) - fd = e->qf.fd; + fd = qfs->fd; else { tfname = octstr_format( - "%s/%s%c%s.%d", e->qf.dir, e->qf.subdir, - MTF, e->qf.name + 1, random()); + "%s/%s%c%s.%d", qfs->dir, qfs->subdir, + MTF, qfs->name + 1, random()); fd = open(octstr_get_cstr(tfname), O_RDWR|O_CREAT|O_EXCL, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP); if (fd < 0 ) { error(0, "mms_queueadd: Failed to open temp file %s: error = %s\n", octstr_get_cstr(tfname), strerror(errno)); - res = -1; goto done; } else if (mm_lockfile(fd, octstr_get_cstr(tfname), 0) != 0) { /* Lock it. */ @@ -359,7 +412,6 @@ static int writeenvelope(MmsEnvelope *e, int newenv) octstr_get_cstr(tfname), strerror(errno)); res = -1; goto done; - } } @@ -484,9 +536,9 @@ static int writeenvelope(MmsEnvelope *e, int newenv) if (!newenv) { /* An update */ Octstr *qfname; - qfname = octstr_format("%s/%s%s", e->qf.dir, - e->qf.subdir, - e->qf.name); + qfname = octstr_format("%s/%s%s", qfs->dir, + qfs->subdir, + qfs->name); if (rename(octstr_get_cstr(tfname), octstr_get_cstr(qfname)) < 0) { error(0, "mms_queuewrite: Failed to rename %s to %s: error = %s\n", octstr_get_cstr(qfname), octstr_get_cstr(tfname), strerror(errno)); @@ -494,14 +546,14 @@ static int writeenvelope(MmsEnvelope *e, int newenv) close(fd); /* Close new one, keep old one. */ res = -1; } else { /* On success, new descriptor replaces old one and we close old one. */ - close(e->qf.fd); - e->qf.fd = fd; + close(qfs->fd); + qfs->fd = fd; } octstr_destroy(qfname); } done: - if (tfname) octstr_destroy(tfname); + octstr_destroy(tfname); return res; } @@ -611,11 +663,14 @@ static int writemmsdata(Octstr *ms, char *df, char subdir[], char *mms_queuedir) } -static Octstr *copy_and_clean_address(Octstr *addr) +Octstr *copy_and_clean_address(Octstr *addr) { - Octstr *s = octstr_duplicate(addr); + Octstr *s; int k, i; + if (addr == NULL) return NULL; + + s = octstr_duplicate(addr); octstr_strip_blanks(s); /* Only clean up email addresses for now. */ if ((k = octstr_search_char(s, '@',0)) < 0) @@ -653,87 +708,44 @@ done: return s; } -Octstr *mms_queue_add(Octstr *from, List *to, - Octstr *subject, - Octstr *fromproxy, Octstr *viaproxy, - time_t senddate, time_t expirydate, MmsMsg *m, Octstr *token, - Octstr *vaspid, Octstr *vasid, - Octstr *url1, Octstr *url2, - List *hdrs, - int dlr, - char *directory, Octstr *mmscname) +static Octstr *mms_queue_add(Octstr *from, List *to, + Octstr *subject, + Octstr *fromproxy, Octstr *viaproxy, + time_t senddate, time_t expirydate, MmsMsg *m, Octstr *token, + Octstr *vaspid, Octstr *vasid, + Octstr *url1, Octstr *url2, + List *hdrs, + int dlr, + char *directory, Octstr *mmscname) { char qf[QFNAMEMAX], subdir[64]; - int fd, i, n; + int fd; MmsEnvelope *e; - Octstr *msgid, *r = NULL; - Octstr *ms, *res = NULL, *xfrom = NULL; - int mtype; - fd = mkqf(qf, subdir, directory); - + Octstr *ms, *res = NULL; + struct qfile_t *qfs = NULL; + + fd = mkqf(qf, subdir, directory); if (fd < 0) { error(0, "mms_queue_add[%s]: Failed err=%s\n", directory, strerror(errno)); return NULL; } res = xmake_qf(qf, subdir); - mtype = mms_messagetype(m); - /* Get MsgID, Fixup if not there and needed. */ - if ((msgid = mms_get_header_value(m, octstr_imm("Message-ID"))) == NULL) { - msgid = mms_maketransid(octstr_get_cstr(res), mmscname); - if (mtype == MMS_MSGTYPE_SEND_REQ) - mms_replace_header_value(m, "Message-ID", octstr_get_cstr(msgid)); - } + e = mms_queue_create_envelope(from, to, subject, fromproxy,viaproxy, + senddate,expirydate,token,vaspid,vasid, + url1,url2,hdrs,dlr,mmscname,m, + octstr_get_cstr(res), + sizeof(struct qfile_t)); - ms = mms_tobinary(m); /* Convert message to string. */ - xfrom = copy_and_clean_address(from); - e = gw_malloc(sizeof *e); /* Make envelope, clear it. */ - memset(e, 0, sizeof *e); - - strncpy(e->qf.name, qf, sizeof e->qf.name); - strncpy(e->qf.subdir, subdir, sizeof e->qf.subdir); - strncpy(e->qf.dir, directory, sizeof e->qf.dir); + qfs = e->qfs_data; + strncpy(qfs->name, qf, sizeof qfs->name); + strncpy(qfs->subdir, subdir, sizeof qfs->subdir); + strncpy(qfs->dir, directory, sizeof qfs->dir); - e->qf.fd = fd; - - e->msgtype = mtype; - e->from = xfrom; - e->created = time(NULL); - e->sendt = senddate; - e->expiryt = expirydate ? expirydate : time(NULL) + DEFAULT_EXPIRE; - e->lasttry = 0; - e->attempts = 0; - e->lastaccess = 0; - e->fromproxy = fromproxy; - e->viaproxy = viaproxy; - e->subject = subject; - e->to = gwlist_create(); - e->msize = octstr_len(ms); - e->msgId = msgid; - e->token = token; - e->vaspid = vaspid; - e->vasid = vasid; - e->url1 = url1; - e->url2 = url2; - e->hdrs = hdrs ? http_header_duplicate(hdrs) : NULL; - - e->dlr = dlr; - e->bill.billed = 0; - - - for (i = 0, n = to ? gwlist_len(to) : 0; ircpt = r; - t->process = 1; - - gwlist_append(e->to, t); - } + qfs->fd = fd; /* Write queue data. */ if (writeenvelope(e, 1) < 0) { @@ -742,49 +754,52 @@ Octstr *mms_queue_add(Octstr *from, List *to, goto done; } - /* Write actual data before relinquishing lock on queue file. */ qf[0]= MDF; - if (writemmsdata(ms, qf, subdir, directory) < 0) { octstr_destroy(res); res = NULL; goto done; } - close(fd); /* Close queue file, thereby letting go of locks. */ - - done: - /* Free the envelope stuff since we do not need it any more, then free 'e' */ - for (i = 0, n = gwlist_len(e->to); ito, i); - octstr_destroy(to->rcpt); - gw_free(to); - } - gwlist_destroy(e->to, NULL); - - gw_free(e); /* Free struct only, caller responsible for arguments. */ - + done: + free_envelope(e, 0); octstr_destroy(ms); - octstr_destroy(msgid); - - if (xfrom) - octstr_destroy(xfrom); - return res; } static int free_envelope(MmsEnvelope *e, int removefromqueue) { - int i, n; + struct qfile_t *qfs; + if (e == NULL) + return 0; + qfs = e->qfs_data; + if (removefromqueue) { + char fname[2*QFNAMEMAX]; - if (e == NULL) return 0; + + snprintf(fname, -1 + sizeof fname, "%s/%s%s", qfs->dir, qfs->subdir, qfs->name); + unlink(fname); + qfs->name[0] = MDF; + snprintf(fname, -1 + sizeof fname, "%s/%s%s", qfs->dir, qfs->subdir, qfs->name); + unlink(fname); + } + close(qfs->fd); /* close and unlock now that we have deleted it. */ + + mms_queue_free_envelope(e); + + return 0; +} + +void mms_queue_free_envelope(MmsEnvelope *e) +{ + MmsEnvelopeTo *x; + + if (e == NULL) return; octstr_destroy(e->msgId); - for (i = 0, n = gwlist_len(e->to); i < n; i++) { - MmsEnvelopeTo *x = gwlist_get(e->to, i); - + while ((x = gwlist_extract_first(e->to)) != NULL) { octstr_destroy(x->rcpt); gw_free(x); } @@ -802,28 +817,91 @@ static int free_envelope(MmsEnvelope *e, int removefromqueue) octstr_destroy(e->url2); http_destroy_headers(e->hdrs); - if (removefromqueue) { - char fname[2*QFNAMEMAX]; - - snprintf(fname, -1 + sizeof fname, "%s/%s%s", e->qf.dir, e->qf.subdir, e->qf.name); - unlink(fname); - e->qf.name[0] = MDF; - snprintf(fname, -1 + sizeof fname, "%s/%s%s", e->qf.dir, e->qf.subdir, e->qf.name); - unlink(fname); - } - close(e->qf.fd); /* close and unlock now that we have deleted it. */ + gw_free(e); - gw_free(e); - - return 0; } -int mms_queue_free_env(MmsEnvelope *e) +MmsEnvelope *mms_queue_create_envelope(Octstr *from, List *to, + Octstr *subject, + Octstr *fromproxy, Octstr *viaproxy, + time_t senddate, time_t expirydate, + Octstr *token, + Octstr *vaspid, Octstr *vasid, + Octstr *url1, Octstr *url2, + List *hdrs, + int dlr, + Octstr *mmscname, + MmsMsg *m, + char *xqfname, + int extra_space) { + MmsEnvelope *e; + Octstr *msgid = NULL, *ms = NULL, *r, *xfrom; + int mtype = -1, i, n; + if (m) { + mtype = mms_messagetype(m); + /* Get MsgID, Fixup if not there and needed. */ + if ((msgid = mms_get_header_value(m, octstr_imm("Message-ID"))) == NULL && + xqfname) { + msgid = mms_maketransid(xqfname, mmscname); + if (mtype == MMS_MSGTYPE_SEND_REQ) + mms_replace_header_value(m, "Message-ID", octstr_get_cstr(msgid)); + } + ms = mms_tobinary(m); + } + + xfrom = copy_and_clean_address(from); + e = gw_malloc(extra_space + sizeof *e); /* Make envelope, clear it. */ + memset(e, 0, sizeof *e); + + e->qfs_data = (void *)(e+1); /* pointer to data object for module. */ + + e->msgtype = mtype; + e->from = xfrom; + e->created = time(NULL); + e->sendt = senddate; + e->expiryt = expirydate ? expirydate : time(NULL) + DEFAULT_EXPIRE; + e->lasttry = 0; + e->attempts = 0; + e->lastaccess = 0; + e->fromproxy = fromproxy ? octstr_duplicate(fromproxy) : NULL; + e->viaproxy = viaproxy ? octstr_duplicate(viaproxy) : NULL; + e->subject = subject ? octstr_duplicate(subject) : NULL; + e->to = gwlist_create(); + e->msize = ms ? octstr_len(ms) : 0; + e->msgId = msgid; + e->token = token ? octstr_duplicate(token) : NULL; + e->vaspid = vaspid ? octstr_duplicate(vaspid) : NULL; + e->vasid = vasid ? octstr_duplicate(vasid) : NULL; + e->url1 = url1 ? octstr_duplicate(url1) : NULL; + e->url2 = url2 ? octstr_duplicate(url2) : NULL; + e->hdrs = hdrs ? http_header_duplicate(hdrs) : NULL; + + e->dlr = dlr; + + if (xqfname) + strncpy(e->xqfname, xqfname, sizeof e->xqfname); + + for (i = 0, n = to ? gwlist_len(to) : 0; ircpt = r; + t->process = 1; + gwlist_append(e->to, t); + } + octstr_destroy(ms); + return e; +} + +static int mms_queue_free_env(MmsEnvelope *e) +{ return free_envelope(e, 0); } -int mms_queue_update(MmsEnvelope *e) + +static int mms_queue_update(MmsEnvelope *e) { int i, n = (e && e->to) ? gwlist_len(e->to) : 0; int hasrcpt = 0; @@ -847,23 +925,23 @@ int mms_queue_update(MmsEnvelope *e) return writeenvelope(e, 0); } -int mms_queue_replacedata(MmsEnvelope *e, MmsMsg *m) +static int mms_queue_replacedata(MmsEnvelope *e, MmsMsg *m) { Octstr *tfname; Octstr *ms; + struct qfile_t *qfs; int ret = 0; if (!e) return -1; - tfname = octstr_format(".%c%s.%ld.%d", MDF, e->qf.name + 1, time(NULL), random()); - - ms = mms_tobinary(m); - - if (writemmsdata(ms, octstr_get_cstr(tfname), e->qf.subdir, e->qf.dir) < 0) + qfs = e->qfs_data; + tfname = octstr_format(".%c%s.%ld.%d", MDF, qfs->name + 1, time(NULL), random()); + ms = mms_tobinary(m); + if (writemmsdata(ms, octstr_get_cstr(tfname), qfs->subdir, qfs->dir) < 0) ret = -1; else { - Octstr *fname = octstr_format("%s/%s%c%s", e->qf.dir, e->qf.subdir, MDF, e->qf.name + 1); - Octstr *tmpf = octstr_format("%s/%s%S", e->qf.dir, e->qf.subdir, tfname); + Octstr *fname = octstr_format("%s/%s%c%s", qfs->dir, qfs->subdir, MDF, qfs->name + 1); + Octstr *tmpf = octstr_format("%s/%s%S", qfs->dir, qfs->subdir, tfname); if (rename(octstr_get_cstr(tmpf), octstr_get_cstr(fname)) < 0) { error(0, "mms_replacedata: Failed to write data file %s: error = %s\n", octstr_get_cstr(tmpf), strerror(errno)); @@ -879,27 +957,28 @@ int mms_queue_replacedata(MmsEnvelope *e, MmsMsg *m) return ret; } -MmsMsg *mms_queue_getdata(MmsEnvelope *e) +static MmsMsg *mms_queue_getdata(MmsEnvelope *e) { Octstr *fname; Octstr *ms; MmsMsg *m; + struct qfile_t *qfs; if (!e) return NULL; - - fname = octstr_format("%s/%s%c%s", e->qf.dir, e->qf.subdir, MDF, e->qf.name + 1); - + qfs = e->qfs_data; + + fname = octstr_format("%s/%s%c%s", qfs->dir, qfs->subdir, MDF, qfs->name + 1); ms = octstr_read_file(octstr_get_cstr(fname)); if (!ms) { error(0, "mms_queue_getdata: Failed to load data file for queue entry %s in %s", - e->qf.name, e->qf.dir); + qfs->name, qfs->dir); octstr_destroy(fname); return NULL; } m = mms_frombinary(ms, octstr_imm("")); if (!m) { error(0, "mms_queue_getdata: Failed to decode data file for queue entry %s in %s", - e->qf.name, e->qf.dir); + qfs->name, qfs->dir); octstr_destroy(fname); return NULL; } @@ -1012,9 +1091,9 @@ static int run_dir(char *topdir, char *dir, struct Qthread_t *tlist, int num_thr return ret; } -void mms_queue_run(char *dir, - int (*deliver)(MmsEnvelope *), - double sleepsecs, int num_threads, int *rstop) +static void mms_queue_run(char *dir, + int (*deliver)(MmsEnvelope *), + double sleepsecs, int num_threads, int *rstop) { struct Qthread_t *tlist; int i, qstop = 0; @@ -1072,4 +1151,22 @@ void mms_queue_run(char *dir, return; } +static int mms_cleanup_queue_module(void) +{ + return 0; +} + +/* export functions... */ +MmsQueueHandlerFuncs qfuncs = { + mms_init_queue_module, + mms_init_queue_dir, + mms_cleanup_queue_module, + mms_queue_add, + mms_queue_update, + mms_queue_getdata, + mms_queue_replacedata, + mms_queue_readenvelope, + mms_queue_run, + mms_queue_free_env +}; diff --git a/mbuni/mmlib/mms_queue.h b/mbuni/mmlib/mms_queue.h index f4bff62..1967315 100644 --- a/mbuni/mmlib/mms_queue.h +++ b/mbuni/mmlib/mms_queue.h @@ -66,49 +66,53 @@ typedef struct MmsEnvelope { Octstr *viaproxy; /* Which proxy must we send this message through. */ void *_x; /* Generic storage field used by module clients. */ - - /* DO NOT MODIFY ANYTHING BEYOND THIS POINT. */ - struct { /* Name of the queue file, pointer to it (locked). DO NOT USE THESE! */ - char name[QFNAMEMAX]; /* Name of the file. */ - char dir[QFNAMEMAX]; /* Directory in which file is .*/ - char subdir[64]; /* and the sub-directory. */ - char _pad[16]; - int fd; - } qf; + void *qfs_data; /* Queue handler module structure -- allocated for you by queue_create + * function. + */ char xqfname[64+QFNAMEMAX]; /* The full ID for the queue. Use this. */ } MmsEnvelope; -/* Given a queue directory, initialise it. Must be called at least once on each queue dir. */ -int mms_init_queuedir(Octstr *qdir); +typedef struct MmsQueueHandlerFuncs { +/* Initialise queue module. Must be called at least once on each queue dir. */ + int (*mms_init_queue_module)(Octstr *init_data); + + /* initialise a queue directory. There can be multiple directories, + * upperlevel decides what a directory is. + * module returns a directory string. + */ + Octstr *(*mms_init_queue_dir)(char *qdir, int *error); + + /* cleanup module, e.g. on exit. */ + int (*mms_cleanup_queue_module)(void); /* * Add a message to the queue, returns 0 on success -1 otherwise (error is logged). * 'to' is a list of Octstr * *. * Returns a queue file name. */ -extern Octstr *mms_queue_add(Octstr *from, List *to, - Octstr *subject, - Octstr *fromproxy, Octstr *viaproxy, - time_t senddate, time_t expirydate, MmsMsg *m, - Octstr *token, - Octstr *vaspid, Octstr *vasid, - Octstr *url1, Octstr *url2, - List *hdrs, - int dlr, - char *directory, Octstr *mmscname); - + Octstr *(*mms_queue_add)(Octstr *from, List *to, + Octstr *subject, + Octstr *fromproxy, Octstr *viaproxy, + time_t senddate, time_t expirydate, MmsMsg *m, + Octstr *token, + Octstr *vaspid, Octstr *vasid, + Octstr *url1, Octstr *url2, + List *hdrs, + int dlr, + char *directory, Octstr *mmscname); + /* * Update queue status. Returns -1 on error, 0 if queue is updated fine and * envelope is still valid, 1 if envelope is no longer valid (no more recipients.) */ -extern int mms_queue_update(MmsEnvelope *e); + int (*mms_queue_update)(MmsEnvelope *e); /* * Get the message associated with this queue entry. */ -extern MmsMsg *mms_queue_getdata(MmsEnvelope *e); + MmsMsg *(*mms_queue_getdata)(MmsEnvelope *e); /* Replace data for this queue item -- used by mm7 interface. */ -int mms_queue_replacedata(MmsEnvelope *e, MmsMsg *m); + int (*mms_queue_replacedata)(MmsEnvelope *e, MmsMsg *m); /* * Reads queue, returns up to lim queue entries that are ready for processing. send 0 for no limit. @@ -122,7 +126,7 @@ int mms_queue_replacedata(MmsEnvelope *e, MmsMsg *m); * - If should block is 1, then does a potentially blocking attempt to lock the file. */ -MmsEnvelope *mms_queue_readenvelope(char *qf, char *dir, int shouldblock); + MmsEnvelope *(*mms_queue_readenvelope)(char *qf, char *dir, int shouldblock); /* * Run the queue in the given directory. For each envelope that is due for sending, call @@ -131,12 +135,39 @@ MmsEnvelope *mms_queue_readenvelope(char *qf, char *dir, int shouldblock); * if deliver() returns 1, it has deleted envelope. * Also if rstop becomes true, queue run must stop. */ -void mms_queue_run(char *dir, - int (*deliver)(MmsEnvelope *), - double sleepsecs, - int num_threads, - int *rstop); + void (*mms_queue_run)(char *dir, + int (*deliver)(MmsEnvelope *), + double sleepsecs, + int num_threads, + int *rstop); +/* Get rid of memory used by this. Typically does internal cleanup then calls + * the general structure free-ing function below. + */ + int (*mms_queue_free_env)(MmsEnvelope *e); +} MmsQueueHandlerFuncs; + +/* Module must export this symbol: */ +extern MmsQueueHandlerFuncs qfuncs; + +/* Utility functions, generally defined. */ +/* Creates the queue envelope object, returns it. */ +MmsEnvelope *mms_queue_create_envelope(Octstr *from, List *to, + Octstr *subject, + Octstr *fromproxy, Octstr *viaproxy, + time_t senddate, time_t expirydate, + Octstr *token, + Octstr *vaspid, Octstr *vasid, + Octstr *url1, Octstr *url2, + List *hdrs, + int dlr, + Octstr *mmscname, + MmsMsg *m, + char *xqfname, + int extra_space); /* Get rid of memory used by this. */ -extern int mms_queue_free_env(MmsEnvelope *e); +void mms_queue_free_envelope(MmsEnvelope *e); + +/* utility function for 'cleaning up' addresses. */ +Octstr *copy_and_clean_address(Octstr *addr); #endif diff --git a/mbuni/mmsbox/bearerbox.c b/mbuni/mmsbox/bearerbox.c index 05973a3..b680d38 100644 --- a/mbuni/mmsbox/bearerbox.c +++ b/mbuni/mmsbox/bearerbox.c @@ -201,7 +201,7 @@ static void mm7soap_receive(MmsHTTPClientInfo *h) http_header_add(qh, "X-Mbuni-Timestamp", octstr_get_cstr(sx)); octstr_destroy(sx); } - qf = mms_queue_add(from, to, subject, + qf = qfs->mms_queue_add(from, to, subject, h->m->id, NULL, delivert, expiryt, m, linkedid, NULL, NULL, @@ -388,7 +388,7 @@ static void mm7eaif_receive(MmsHTTPClientInfo *h) mms_remove_headers(m, "X-Mms-Sender-Visibility"); /* Save it, put message id in header, return. */ - qf = mms_queue_add(hfrom, to, subject, + qf = qfs->mms_queue_add(hfrom, to, subject, h->m->id, NULL, deliveryt, expiryt, m, NULL, NULL, NULL, NULL, NULL, @@ -790,7 +790,7 @@ static int sendMsg(MmsEnvelope *e) MmsMsg *msg = NULL; int i, n; Octstr *otransid = e->hdrs ? http_header_value(e->hdrs, octstr_imm("X-Mbuni-TransactionID")) : NULL; - msg = mms_queue_getdata(e); + msg = qfs->mms_queue_getdata(e); for (i = 0, n = gwlist_len(e->to); ilasttry = tnow; - if (mms_queue_update(e) == 1) { + if (qfs->mms_queue_update(e) == 1) { e = NULL; break; /* Queue entry gone. */ } @@ -879,8 +879,8 @@ static int sendMsg(MmsEnvelope *e) e->attempts++; /* Update count of number of delivery attempts. */ e->sendt = e->lasttry + mmsbox_send_back_off * e->attempts; - if (mms_queue_update(e) != 1) - mms_queue_free_env(e); + if (qfs->mms_queue_update(e) != 1) + qfs->mms_queue_free_env(e); } return 1; /* always delete queue entry. */ @@ -888,6 +888,6 @@ static int sendMsg(MmsEnvelope *e) void mmsbox_outgoing_queue_runner(int *rstop) { - mms_queue_run(octstr_get_cstr(outgoing_qdir), + qfs->mms_queue_run(octstr_get_cstr(outgoing_qdir), sendMsg, queue_interval, maxthreads, rstop); } diff --git a/mbuni/mmsbox/mmsbox.c b/mbuni/mmsbox/mmsbox.c index ee1766f..d542fff 100644 --- a/mbuni/mmsbox/mmsbox.c +++ b/mbuni/mmsbox/mmsbox.c @@ -474,7 +474,7 @@ static int mmsbox_service_dispatch(MmsEnvelope *e) gw_assert(e->msgtype == MMS_MSGTYPE_SEND_REQ || e->msgtype == MMS_MSGTYPE_RETRIEVE_CONF); - if ((msg = mms_queue_getdata(e)) == NULL) { + if ((msg = qfs->mms_queue_getdata(e)) == NULL) { err = octstr_format("Failed to read message for queue entry %s!", e->xqfname); res = -1; @@ -530,8 +530,8 @@ done: e->sendt = e->lasttry + mmsbox_send_back_off * e->attempts; } - if (mms_queue_update(e) != 1) - mms_queue_free_env(e); + if (qfs->mms_queue_update(e) != 1) + qfs->mms_queue_free_env(e); octstr_destroy(err); octstr_destroy(keyword); @@ -588,7 +588,7 @@ int main(int argc, char *argv[]) /* Start out-going queue thread. */ qthread = gwthread_create((gwthread_func_t *)mmsbox_outgoing_queue_runner, &rstop); - mms_queue_run(octstr_get_cstr(incoming_qdir), + qfs->mms_queue_run(octstr_get_cstr(incoming_qdir), mmsbox_service_dispatch, queue_interval, maxthreads, &rstop); @@ -1091,7 +1091,7 @@ static int make_and_queue_msg(Octstr *data, Octstr *ctype, List *reply_headers, } /* Write to queue. */ - x = mms_queue_add(from, xto, subject, + x = qfs->mms_queue_add(from, xto, subject, e ? e->fromproxy : NULL, mmc, time(NULL), expiryt, m, NULL, diff --git a/mbuni/mmsbox/mmsbox_cfg.c b/mbuni/mmsbox/mmsbox_cfg.c index 73fdf6a..253c3ea 100644 --- a/mbuni/mmsbox/mmsbox_cfg.c +++ b/mbuni/mmsbox/mmsbox_cfg.c @@ -33,6 +33,7 @@ long maxthreads = 0; double queue_interval = -1; Octstr *unified_prefix; int mt_multipart = 0; +MmsQueueHandlerFuncs *qfs; /* queue functions. */ struct SendMmsPortInfo sendmms_port; @@ -82,7 +83,7 @@ int mms_load_mmsbox_settings(mCfg *cfg, gwthread_func_t *mmsc_handler_func) Octstr *gdir, *s; int send_port_ssl = 0; List *l; - int i, n; + int i, n, xx; void *catchall = NULL; if (grp == NULL) @@ -105,19 +106,29 @@ int mms_load_mmsbox_settings(mCfg *cfg, gwthread_func_t *mmsc_handler_func) panic(0, "Failed to create MMSBox storage directory: %s - %s!", octstr_get_cstr(gdir), strerror(errno)); - incoming_qdir = octstr_format("%S/mmsbox_incoming", gdir); - outgoing_qdir = octstr_format("%S/mmsbox_outgoing", gdir); - dlr_dir = octstr_format("%S/mmsbox_dlr", gdir); - - if (mms_init_queuedir(incoming_qdir) < 0) - panic(0, "Failed to initialise incoming queue directory: %s - %s!", + if ((qfs = load_module(grp, "queue-manager-module", "qfuncs")) == NULL) { + qfs = &qfuncs; /* default queue handler. */ + qfs->mms_init_queue_module(gdir); + } else { + Octstr *s = _mms_cfg_getx(grp, octstr_imm("queue-module-init-data")); + if (qfs->mms_init_queue_module(s) != 0) + panic(0, "failed to initialise queue module, with data: %s", + octstr_get_cstr(s)); + octstr_destroy(s); + } + + if ((incoming_qdir = qfs->mms_init_queue_dir("mmsbox_incoming", &xx)) == NULL || + xx != 0) + panic(0, "Failed to initialise incoming mmsbox queue directory: %s - %s!", octstr_get_cstr(incoming_qdir), strerror(errno)); - if (mms_init_queuedir(outgoing_qdir) < 0) - panic(0, "Failed to initialise outgoing queue directory: %s - %s!", + if ((outgoing_qdir = qfs->mms_init_queue_dir("mmsbox_outgoing", &xx)) == NULL || + xx != 0) + panic(0, "Failed to initialise outgoing mmsbox queue directory: %s - %s!", octstr_get_cstr(outgoing_qdir), strerror(errno)); - if (mms_init_queuedir(dlr_dir) < 0) + if ((dlr_dir = qfuncs.mms_init_queue_dir("mmsbox_dlr", &xx)) == NULL || + xx != 0) /* XXX still uses old-style file storage. */ panic(0, "Failed to initialise dlr storage directory: %s - %s!", octstr_get_cstr(dlr_dir), strerror(errno)); diff --git a/mbuni/mmsbox/mmsbox_cfg.h b/mbuni/mmsbox/mmsbox_cfg.h index 12815eb..a836f74 100644 --- a/mbuni/mmsbox/mmsbox_cfg.h +++ b/mbuni/mmsbox/mmsbox_cfg.h @@ -14,6 +14,8 @@ #define __MMSBOX_CFG_INCLUDED__ #include "mms_util.h" #include "mmsbox_mt_filter.h" +#include "mms_queue.h" + typedef struct MmscGrp { Octstr *id; /* MMSC id (for logging). */ Octstr *group_id; /* GROUP MMSC id (used for qf). */ @@ -87,6 +89,7 @@ extern struct SendMmsPortInfo { } sendmms_port; extern struct MmsBoxMTfilter *mt_filter; +extern MmsQueueHandlerFuncs *qfs; extern int mt_multipart; extern int mms_load_mmsbox_settings(mCfg *cfg, gwthread_func_t *mmsc_handler_func); extern MmscGrp *get_handler_mmc(Octstr *id, Octstr *to); diff --git a/mbuni/mmsc/mmsc_cfg.c b/mbuni/mmsc/mmsc_cfg.c index 7b99784..4f7ab3a 100644 --- a/mbuni/mmsc/mmsc_cfg.c +++ b/mbuni/mmsc/mmsc_cfg.c @@ -83,7 +83,7 @@ MmscSettings *mms_load_mmsc_settings(mCfg *cfg, List **proxyrelays) long port = -1; Octstr *from, *user, *pass; Octstr *qdir = NULL; - int i, n; + int i, n, xx; memset(m, 0, sizeof *m); @@ -122,18 +122,28 @@ MmscSettings *mms_load_mmsc_settings(mCfg *cfg, List **proxyrelays) panic(0, "Failed to create queue directory: %s - %s!", octstr_get_cstr(qdir), strerror(errno)); - m->global_queuedir = octstr_format("%S/global", qdir); - m->mm1_queuedir = octstr_format("%S/mm1", qdir); - - if (mms_init_queuedir(m->mm1_queuedir) < 0) - panic(0, "Failed to initialise local queue directory: %s - %s!", - octstr_get_cstr(m->mm1_queuedir), strerror(errno)); - else if (mms_init_queuedir(m->global_queuedir) < 0) + if ((m->qfs = load_module(grp, "queue-manager-module", "qfuncs", NULL)) == NULL) { + m->qfs = &qfuncs; /* default queue handler. */ + m->qfs->mms_init_queue_module(qdir); + } else { + Octstr *s = _mms_cfg_getx(grp, octstr_imm("queue-module-init-data")); + if (m->qfs->mms_init_queue_module(s) != 0) + panic(0, "failed to initialise queue module, with data: %s", + octstr_get_cstr(s)); + octstr_destroy(s); + } + + if ((m->global_queuedir = m->qfs->mms_init_queue_dir("global", &xx)) == NULL || + xx != 0) panic(0, "Failed to initialise global queue directory: %s - %s!", octstr_get_cstr(m->global_queuedir), strerror(errno)); - m->mmbox_rootdir = octstr_format("%S/mmbox", qdir); - + if ((m->mm1_queuedir = m->qfs->mms_init_queue_dir("mm1", &xx)) == NULL || + xx != 0) + panic(0, "Failed to initialise local queue directory: %s - %s!", + octstr_get_cstr(m->mm1_queuedir), strerror(errno)); + + m->mmbox_rootdir = octstr_format("%S/mmbox", qdir); if (mmbox_root_init(octstr_get_cstr(m->mmbox_rootdir)) != 0) panic(0, "Failed to initialise mmbox root directory, error: %s!", strerror(errno)); diff --git a/mbuni/mmsc/mmsc_cfg.h b/mbuni/mmsc/mmsc_cfg.h index 5c76d5a..3ab3b5a 100644 --- a/mbuni/mmsc/mmsc_cfg.h +++ b/mbuni/mmsc/mmsc_cfg.h @@ -16,6 +16,7 @@ #include "mms_resolve_shell.h" #include "mms_billing_shell.h" #include "mms_detokenize_shell.h" +#include "mms_queue.h" typedef struct MmsProxyRelay { Octstr *host; @@ -40,9 +41,12 @@ typedef struct MmscSettings { Octstr *name, *hostname, *host_alias; Octstr *unified_prefix, *local_prefix; Octstr *sendmail; + Octstr *global_queuedir, *mm1_queuedir; Octstr *mmbox_rootdir; + MmsQueueHandlerFuncs *qfs; + Octstr *ua_profile_cache_dir; long maxthreads; diff --git a/mbuni/mmsc/mmsfromemail.c b/mbuni/mmsc/mmsfromemail.c index 757ae08..f04124d 100644 --- a/mbuni/mmsc/mmsfromemail.c +++ b/mbuni/mmsc/mmsfromemail.c @@ -270,7 +270,7 @@ int main(int argc, char *argv[]) else dlr = 0; - qf = mms_queue_add(xfrom, lto, NULL, xproxy, NULL, + qf = settings->qfs->mms_queue_add(xfrom, lto, NULL, xproxy, NULL, 0, time(NULL) + settings->default_msgexpiry, msg, NULL, NULL, NULL, NULL, NULL, @@ -311,7 +311,7 @@ int main(int argc, char *argv[]) octstr_format_append(xto, "/TYPE=PLMN"); gwlist_append(lto, xto); - qf = mms_queue_add(xfrom, lto, NULL, + qf = settings->qfs->mms_queue_add(xfrom, lto, NULL, xproxy, NULL, 0, time(NULL) + settings->default_msgexpiry, msg, NULL, NULL, NULL, @@ -343,7 +343,7 @@ int main(int argc, char *argv[]) octstr_format_append(xto, "/TYPE=PLMN"); gwlist_append(lto, xto); - qf = mms_queue_add(xfrom, lto, NULL, + qf = settings->qfs->mms_queue_add(xfrom, lto, NULL, xproxy, NULL, 0, time(NULL) + settings->default_msgexpiry, msg, NULL, NULL, NULL, @@ -386,7 +386,7 @@ int main(int argc, char *argv[]) strip_quotes(qf); octstr_strip_blanks(o_to); strip_quotes(o_to); - e = mms_queue_readenvelope(octstr_get_cstr(qf), + e = settings->qfs->mms_queue_readenvelope(octstr_get_cstr(qf), octstr_get_cstr(settings->global_queuedir), 1); if (!e) @@ -430,8 +430,8 @@ int main(int argc, char *argv[]) rstatus ? octstr_get_cstr(rstatus) : "", processed ? "Sender number matched in queue file" : "Sender number not matched in queue file"); - if (mms_queue_update(e) != 1) - mms_queue_free_env(e); + if (settings->qfs->mms_queue_update(e) != 1) + settings->qfs->mms_queue_free_env(e); } } else diff --git a/mbuni/mmsc/mmsglobalsender.c b/mbuni/mmsc/mmsglobalsender.c index 624103e..c101ee3 100644 --- a/mbuni/mmsc/mmsglobalsender.c +++ b/mbuni/mmsc/mmsglobalsender.c @@ -101,7 +101,7 @@ static int sendMsg(MmsEnvelope *e) } if (amt >= -1) - if (mms_queue_update(e) == 1) /* Write queue just in case we crash. */ + if (settings->qfs->mms_queue_update(e) == 1) /* Write queue just in case we crash. */ e = NULL; if (e == NULL || @@ -111,7 +111,7 @@ static int sendMsg(MmsEnvelope *e) - msg = mms_queue_getdata(e); + msg = settings->qfs->mms_queue_getdata(e); #if 0 if (msg) mms_msgdump(msg,1); #endif @@ -315,7 +315,7 @@ static int sendMsg(MmsEnvelope *e) gwlist_append(l, octstr_duplicate(e->from)); /* Add to queue, switch via proxy to be from proxy. */ - qfs = mms_queue_add(to->rcpt, l, + qfs = settings->qfs->mms_queue_add(to->rcpt, l, err, NULL, e->fromproxy, tnow, tnow+settings->default_msgexpiry, m, NULL, NULL, NULL, @@ -355,7 +355,7 @@ static int sendMsg(MmsEnvelope *e) /* Update queue entry so that we know which ones have been processed. */ e->lasttry = tnow; - if (mms_queue_update(e) == 1) { + if (settings->qfs->mms_queue_update(e) == 1) { e = NULL; break; /* Queue entry gone. */ } @@ -370,8 +370,8 @@ static int sendMsg(MmsEnvelope *e) e->attempts++; /* Update count of number of delivery attempts. */ e->sendt = e->lasttry + settings->send_back_off * e->attempts; - if (mms_queue_update(e) != 1) - mms_queue_free_env(e); + if (settings->qfs->mms_queue_update(e) != 1) + settings->qfs->mms_queue_free_env(e); } return 1; /* Always deletes the queue entry. */ } @@ -404,7 +404,7 @@ void mbuni_global_queue_runner(int *rstop) gwthread_create(cdr_thread, NULL); - mms_queue_run(qdir, sendMsg, settings->queue_interval, settings->maxthreads, rstop); + settings->qfs->mms_queue_run(qdir, sendMsg, settings->queue_interval, settings->maxthreads, rstop); /* When it ends, wait a little for other stuff to stop... */ sleep(2); gwlist_remove_producer(cdr_list); /* Stop CDR thread. */ @@ -452,7 +452,7 @@ int mms_sendtomobile(Octstr *from, Octstr *to, x = octstr_create(tokenstr); if (m) - ret = mms_queue_add(from, l, subject, fromproxy, NULL, 0, expires, m, + ret = settings->qfs->mms_queue_add(from, l, subject, fromproxy, NULL, 0, expires, m, x, NULL, NULL, NULL, NULL, NULL, diff --git a/mbuni/mmsc/mmsmobilesender.c b/mbuni/mmsc/mmsmobilesender.c index e63a8fe..dd7aa66 100644 --- a/mbuni/mmsc/mmsmobilesender.c +++ b/mbuni/mmsc/mmsmobilesender.c @@ -40,7 +40,7 @@ static MmsEnvelope *update_env_success(MmsEnvelope *env, MmsEnvelopeTo *xto) env->sendt = env->lasttry + settings->send_back_off * env->attempts; } - if (mms_queue_update(env) == 1) + if (settings->qfs->mms_queue_update(env) == 1) env = NULL; return env; } @@ -52,7 +52,7 @@ static MmsEnvelope *update_env_failed(MmsEnvelope *env) env->sendt = tnow + settings->send_back_off; env->lasttry = tnow; - if (mms_queue_update(env) == 1) + if (settings->qfs->mms_queue_update(env) == 1) env = NULL; } return env; @@ -142,7 +142,7 @@ static void start_push(Octstr *rcpt_to, int isphonenum, MmsEnvelope *e, MmsMsg * } octstr_destroy(addr); if (e) - mms_queue_free_env(e); + settings->qfs->mms_queue_free_env(e); } done: if (to) octstr_destroy(to); @@ -218,7 +218,7 @@ static int receive_push_reply(HTTPCaller *caller) env = update_env_failed(env); push_free_env: if (env && env != &edummy) - mms_queue_free_env(env); + settings->qfs->mms_queue_free_env(env); } return 0; @@ -244,7 +244,7 @@ static int sendNotify(MmsEnvelope *e) e->sendt = e->expiryt + 3600*24*30*12; info(0, "MM1: Message [ID: %s] fetched/touched at least once. Skipping", e->xqfname); - return mms_queue_update(e); + return settings->qfs->mms_queue_update(e); } if (!xto) { @@ -253,7 +253,7 @@ static int sendNotify(MmsEnvelope *e) return 0; } - msg = mms_queue_getdata(e); + msg = settings->qfs->mms_queue_getdata(e); to = octstr_duplicate(xto->rcpt); expiryt = e->expiryt; msgId = e->msgId ? octstr_duplicate(e->msgId) : NULL; @@ -326,10 +326,10 @@ static int sendNotify(MmsEnvelope *e) e->lasttry = tnow; e->sendt = e->lasttry + settings->send_back_off * (1 + e->attempts); - if (mms_queue_update(e) == 1) + if (settings->qfs->mms_queue_update(e) == 1) e = NULL; /* Queue entry gone. */ else - mms_queue_free_env(e); + settings->qfs->mms_queue_free_env(e); goto done; } else if (send_ind == 0) { /* provisioned but does not support */ Octstr *s = octstr_format(octstr_get_cstr(settings->mms_notify_txt), @@ -357,10 +357,10 @@ static int sendNotify(MmsEnvelope *e) err = octstr_imm("No MMS Ind support, sent SMS instead"); xto->process = 0; /* No more processing. */ - if (mms_queue_update(e) == 1) + if (settings->qfs->mms_queue_update(e) == 1) e = NULL; else - mms_queue_free_env(e); + settings->qfs->mms_queue_free_env(e); goto done; } @@ -415,7 +415,7 @@ static int sendNotify(MmsEnvelope *e) gwlist_append(l, from); /* Add to queue, switch via proxy to be from proxy. */ - res = mms_queue_add(to ? to : settings->system_user, l, err, + res = settings->qfs->mms_queue_add(to ? to : settings->system_user, l, err, NULL, fromproxy, tnow, tnow+settings->default_msgexpiry, m, NULL, NULL, NULL, @@ -440,10 +440,10 @@ static int sendNotify(MmsEnvelope *e) if (res == MMS_SEND_ERROR_FATAL) { xto->process = 0; /* No more attempts to deliver, delete this. */ - if (mms_queue_update(e) == 1) + if (settings->qfs->mms_queue_update(e) == 1) e = NULL; /* Queue entry gone. */ else - mms_queue_free_env(e); + settings->qfs->mms_queue_free_env(e); } /* Else queue will be updated/freed elsewhere. */ @@ -477,7 +477,7 @@ void mbuni_mm1_queue_runner(int *rstop) return; } - mms_queue_run(octstr_get_cstr(settings->mm1_queuedir), + settings->qfs->mms_queue_run(octstr_get_cstr(settings->mm1_queuedir), sendNotify, settings->queue_interval, settings->maxthreads, rstop); sleep(2); /* Wait for it to die. */ http_caller_signal_shutdown(httpcaller); diff --git a/mbuni/mmsc/mmsproxy.c b/mbuni/mmsc/mmsproxy.c index f33a886..205a8d5 100644 --- a/mbuni/mmsc/mmsproxy.c +++ b/mbuni/mmsc/mmsproxy.c @@ -268,11 +268,11 @@ void fetchmms_proxy(MmsHTTPClientInfo *h) prof = mms_make_ua_profile(h->headers); if (loc == MMS_LOC_MQUEUE) { /* where is the message? */ - e = mms_queue_readenvelope(octstr_get_cstr(qf), + e = settings->qfs->mms_queue_readenvelope(octstr_get_cstr(qf), octstr_get_cstr(settings->mm1_queuedir), 1); if (!e || - (m = mms_queue_getdata(e)) == NULL) { + (m = settings->qfs->mms_queue_getdata(e)) == NULL) { error(0, "MMS Fetch interface: failed to find envelope/data %s for request url (%s) from %s (e=%s)!", octstr_get_cstr(qf), octstr_get_cstr(h->url), octstr_get_cstr(h->ip), (e)? "found" : "not found"); @@ -385,7 +385,7 @@ void fetchmms_proxy(MmsHTTPClientInfo *h) sprintf(tbuf, "%ld", time(NULL)); http_header_add(qh, "X-Mbuni-Timestamp", tbuf); /* record time of message. */ - x = mms_queue_add(from, l, NULL, NULL, NULL, 0, + x = settings->qfs->mms_queue_add(from, l, NULL, NULL, NULL, 0, time(NULL) + settings->default_msgexpiry, mrpt, NULL, NULL, NULL, NULL, NULL, @@ -404,7 +404,7 @@ void fetchmms_proxy(MmsHTTPClientInfo *h) if (e) { e->lastaccess = time(NULL); /* No more notifications requests. */ e->sendt = e->expiryt + 3600*24*30*12; - mms_queue_update(e); + settings->qfs->mms_queue_update(e); } http_header_add(rh, "Content-Type", "application/vnd.wap.mms-message"); @@ -457,7 +457,7 @@ void fetchmms_proxy(MmsHTTPClientInfo *h) http_destroy_headers(rh); - mms_queue_free_env(e); + settings->qfs->mms_queue_free_env(e); octstr_destroy(s); mms_destroy(m); @@ -594,7 +594,7 @@ static void sendmms_proxy(MmsHTTPClientInfo *h) dlr = 1; else dlr = 0; - qf = mms_queue_add(from, to, subject, + qf = settings->qfs->mms_queue_add(from, to, subject, NULL, NULL, deliveryt, expiryt, m, NULL, NULL, NULL, NULL, NULL, @@ -688,11 +688,11 @@ static void sendmms_proxy(MmsHTTPClientInfo *h) } if (mloc == MMS_LOC_MQUEUE) { /* where is the message? */ - e = mms_queue_readenvelope(octstr_get_cstr(qf), + e = settings->qfs->mms_queue_readenvelope(octstr_get_cstr(qf), octstr_get_cstr(settings->mm1_queuedir), 1); if (!e || - (mfwd = mms_queue_getdata(e)) == NULL) { + (mfwd = settings->qfs->mms_queue_getdata(e)) == NULL) { error(0, "MMS Send interface: failed to find envelope/data %s for forward url " "(%s) from %s (e=%s)!", @@ -774,7 +774,7 @@ static void sendmms_proxy(MmsHTTPClientInfo *h) else dlr = 0; /* Message to forward is now ready, write it to queue. */ - qf2 = mms_queue_add(from, to, subject, + qf2 = settings->qfs->mms_queue_add(from, to, subject, NULL, NULL, deliveryt, expiryt, mfwd, NULL, NULL, NULL, NULL, NULL, @@ -863,7 +863,7 @@ static void sendmms_proxy(MmsHTTPClientInfo *h) List *l = gwlist_create(); gwlist_append(l, pfrom); - x = mms_queue_add(from, l, NULL, NULL, NULL, 0, + x = settings->qfs->mms_queue_add(from, l, NULL, NULL, NULL, 0, time(NULL) + settings->default_msgexpiry, mrep, NULL, NULL, NULL, @@ -901,8 +901,8 @@ static void sendmms_proxy(MmsHTTPClientInfo *h) if (e) { /* Update the message queue and go. */ e->lastaccess = time(NULL); - if (mms_queue_update(e) != 1) /* Should be freed. */ - mms_queue_free_env(e); + if (settings->qfs->mms_queue_update(e) != 1) /* Should be freed. */ + settings->qfs->mms_queue_free_env(e); e = NULL; } @@ -931,7 +931,7 @@ static void sendmms_proxy(MmsHTTPClientInfo *h) Octstr *allow_report = mms_get_header_value(m, octstr_imm("X-Mms-Report-Allowed")); Octstr *qf = mms_getqf_fromtransid(transid); - MmsEnvelope *e = mms_queue_readenvelope(octstr_get_cstr(qf), + MmsEnvelope *e = settings->qfs->mms_queue_readenvelope(octstr_get_cstr(qf), octstr_get_cstr(settings->mm1_queuedir), 1); Octstr *status; @@ -974,7 +974,7 @@ static void sendmms_proxy(MmsHTTPClientInfo *h) mrpt = mms_deliveryreport(e->msgId, h->client_addr, time(NULL), status); gwlist_append(l, octstr_duplicate(e->from)); - x = mms_queue_add(from, l, NULL, NULL, NULL, 0, + x = settings->qfs->mms_queue_add(from, l, NULL, NULL, NULL, 0, time(NULL) + settings->default_msgexpiry, mrpt, NULL, NULL, NULL, NULL, NULL, @@ -995,8 +995,8 @@ static void sendmms_proxy(MmsHTTPClientInfo *h) h->ua, NULL); if (e && - mms_queue_update(e) != 1) /* Should be freed. */ - mms_queue_free_env(e); + settings->qfs->mms_queue_update(e) != 1) /* Should be freed. */ + settings->qfs->mms_queue_free_env(e); octstr_destroy(qf); octstr_destroy(transid); @@ -1024,7 +1024,7 @@ static void sendmms_proxy(MmsHTTPClientInfo *h) mms_collect_envdata_from_msgheaders(mh, &to, NULL, NULL, NULL, NULL, settings->default_msgexpiry); - x = mms_queue_add(from, to, NULL, NULL, NULL, time(NULL), + x = settings->qfs->mms_queue_add(from, to, NULL, NULL, NULL, time(NULL), time(NULL) + settings->default_msgexpiry, m, NULL, NULL, NULL, @@ -1087,11 +1087,11 @@ static void sendmms_proxy(MmsHTTPClientInfo *h) } if (mloc == MMS_LOC_MQUEUE) { /* where is the message? */ - e = mms_queue_readenvelope(octstr_get_cstr(qf), + e = settings->qfs->mms_queue_readenvelope(octstr_get_cstr(qf), octstr_get_cstr(settings->mm1_queuedir), 1); if (!e || - (mstore = mms_queue_getdata(e)) == NULL) { + (mstore = settings->qfs->mms_queue_getdata(e)) == NULL) { error(0, "MMS Send interface: failed to find envelope/data %s for store url " "(%s) from %s (e=%s)!", @@ -1150,8 +1150,8 @@ static void sendmms_proxy(MmsHTTPClientInfo *h) if (e) { /* Update the message queue and go. */ e->lastaccess = time(NULL); - if (mms_queue_update(e) != 1) /* Should be freed. */ - mms_queue_free_env(e); + if (settings->qfs->mms_queue_update(e) != 1) /* Should be freed. */ + settings->qfs->mms_queue_free_env(e); e = NULL; } @@ -1277,7 +1277,7 @@ static void sendmms_proxy(MmsHTTPClientInfo *h) rs = octstr_format("%dError-permanent-message-not-found", i); } else if (mloc == MMS_LOC_MQUEUE) { - MmsEnvelope *e = mms_queue_readenvelope(octstr_get_cstr(qf), + MmsEnvelope *e = settings->qfs->mms_queue_readenvelope(octstr_get_cstr(qf), octstr_get_cstr(settings->mm1_queuedir), 1); if (!e) rs = octstr_format("%dError-permanent-message-not-found", i); @@ -1286,8 +1286,8 @@ static void sendmms_proxy(MmsHTTPClientInfo *h) MmsEnvelopeTo *x = gwlist_get(e->to,j); if (x) x->process = 0; } - if (mms_queue_update(e) != 1) /* Should be freed. */ - mms_queue_free_env(e); + if (settings->qfs->mms_queue_update(e) != 1) /* Should be freed. */ + settings->qfs->mms_queue_free_env(e); rs = octstr_format("%dOk", i); } } else if (mloc == MMS_LOC_MMBOX) { @@ -1592,7 +1592,7 @@ static void mm7soap_dispatch(MmsHTTPClientInfo *h) if (expiryt < 0) expiryt = time(NULL) + settings->default_msgexpiry; mms_remove_headers(m, "Message-ID"); /* cannot be found here. */ - qf = mms_queue_add(from ? from : sender, to, subject, + qf = settings->qfs->mms_queue_add(from ? from : sender, to, subject, NULL, NULL, delivert, expiryt, m, NULL, h->vasp->id, vasid, @@ -1616,7 +1616,7 @@ static void mm7soap_dispatch(MmsHTTPClientInfo *h) case MM7_TAG_ReplaceReq: msgid = mm7_soap_header_value(mreq, octstr_imm("MessageID")); if (msgid && (qf = mms_getqf_fromtransid(msgid)) != NULL && - (e = mms_queue_readenvelope(octstr_get_cstr(qf), + (e = settings->qfs->mms_queue_readenvelope(octstr_get_cstr(qf), octstr_get_cstr(settings->global_queuedir), 1)) != NULL) { if (!e->vaspid || @@ -1626,14 +1626,14 @@ static void mm7soap_dispatch(MmsHTTPClientInfo *h) " but vaspid id=%s does not match!", octstr_get_cstr(msgid), octstr_get_cstr(h->vasp->id)); } else { /* get orig message, change headers of new, replace old. */ - MmsMsg *old = mms_queue_getdata(e); + MmsMsg *old = settings->qfs->mms_queue_getdata(e); MmsMsg *new = mm7_soap_to_mmsmsg(mreq, sender); List *hh = mms_message_headers(old); Octstr *s; if (new) { mms_add_missing_headers(new, hh); - if (mms_queue_replacedata(e, new) < 0) { + if (settings->qfs->mms_queue_replacedata(e, new) < 0) { status = 3000; error(0, "MMS Proxy(MM7): ReplaceReq: Failed to change data, " "id=%s, vasp=%s!", @@ -1652,8 +1652,8 @@ static void mm7soap_dispatch(MmsHTTPClientInfo *h) octstr_destroy(s); } - if (mms_queue_update(e) != 1) - mms_queue_free_env(e); + if (settings->qfs->mms_queue_update(e) != 1) + settings->qfs->mms_queue_free_env(e); e = NULL; mms_log("Replace", sender, NULL, -1, msgid, h->vasp->id, NULL, "MM7", h->ua, NULL); @@ -1675,7 +1675,7 @@ static void mm7soap_dispatch(MmsHTTPClientInfo *h) case MM7_TAG_CancelReq: msgid = mm7_soap_header_value(mreq, octstr_imm("MessageID")); if (msgid && (qf = mms_getqf_fromtransid(msgid)) != NULL && - (e = mms_queue_readenvelope(octstr_get_cstr(qf), + (e = settings->qfs->mms_queue_readenvelope(octstr_get_cstr(qf), octstr_get_cstr(settings->global_queuedir), 1)) != NULL) { if (!e->vaspid || @@ -1690,7 +1690,7 @@ static void mm7soap_dispatch(MmsHTTPClientInfo *h) MmsEnvelopeTo *xto = gwlist_get(e->to,i); xto->process = 0; } - mms_queue_update(e); /* Will clear it. */ + settings->qfs->mms_queue_update(e); /* Will clear it. */ e = NULL; mms_log("Cancel", sender, NULL, -1, msgid, h->vasp->id, NULL, "MM7", h->ua, NULL); @@ -1721,7 +1721,7 @@ static void mm7soap_dispatch(MmsHTTPClientInfo *h) mresp ? "ok" : "(null)", reply_body ? "ok" : "(null)"); - mms_queue_free_env(e); + settings->qfs->mms_queue_free_env(e); octstr_destroy(sender); octstr_destroy(from); octstr_destroy(subject); @@ -1827,7 +1827,7 @@ static void mm7eaif_dispatch(MmsHTTPClientInfo *h) mms_remove_headers(m, "X-Mms-Sender-Visibility"); /* Save it, make msgid, put message id in header, return. */ - qf = mms_queue_add(hfrom, to, subject, + qf = settings->qfs->mms_queue_add(hfrom, to, subject, NULL, NULL, deliveryt, expiryt, m, NULL, NULL, NULL, NULL, NULL, diff --git a/mbuni/mmsc/mmssend.c b/mbuni/mmsc/mmssend.c index 417d807..3a17622 100644 --- a/mbuni/mmsc/mmssend.c +++ b/mbuni/mmsc/mmssend.c @@ -150,7 +150,7 @@ int main(int argc, char *argv[]) http_header_add(h, "X-Mms-Tool", "mmssend"); http_header_add(h, "X-Mms-CalledFrom", "Terminal"); #endif - s = mms_queue_add(from, to, NULL, NULL, NULL, time(NULL), + s = settings->qfs->mms_queue_add(from, to, NULL, NULL, NULL, time(NULL), time(NULL) + settings->default_msgexpiry, m, NULL, NULL, NULL,