1
0
Fork 0

generalised queue handling

This commit is contained in:
bagyenda 2007-08-20 11:49:30 +00:00
parent 8438ca779d
commit 4c5a8389c0
15 changed files with 446 additions and 286 deletions

View File

@ -1,3 +1,5 @@
2007-08020 P. A. Bagyenda <bagyenda@dsmagic.com>
* Generalised MMS Queue handling interface to allow different storage/delivery mechanisms.
2007-08-09 P. A. Bagyenda <bagyenda@dsmagic.com>
* Added send-dlr-on-fetch config param to MMC settings
2007-08-08 P. A. Bagyenda <bagyenda@dsmagic.com>

View File

@ -45,6 +45,8 @@ SINGLE_GROUP(mbuni,
OCTSTR(host-alias)
OCTSTR(local-prefixes)
OCTSTR(storage-directory)
OCTSTR(queue-manager-module)
OCTSTR(queue-module-init-data)
OCTSTR(max-send-threads)
OCTSTR(send-mail-prog)
OCTSTR(unified-prefix)

View File

@ -3,7 +3,7 @@
*
* Queue management functions
*
* Copyright (C) 2003 - 2005, Digital Solutions Ltd. - http://www.dsmagic.com
* Copyright (C) 2003 - 2007, Digital Solutions Ltd. - http://www.dsmagic.com
*
* Paul Bagyenda <bagyenda@dsmagic.com>
*
@ -34,35 +34,73 @@
#define DIR_SEP_S "/"
int mms_init_queuedir(Octstr *qdir)
struct qfile_t { /* Name of the queue file, pointer to it (locked). DO NOT USE THESE! */
char name[QFNAMEMAX]; /* Name of the file. */
char dir[QFNAMEMAX]; /* Directory in which file is .*/
char subdir[64]; /* and the sub-directory. */
char _pad[16];
int fd;
};
static char sdir[QFNAMEMAX*2+1]; /* top-level storage directory. */
static int inited;
static int mms_init_queue_module(Octstr *storage_dir)
{
gw_assert(inited==0);
gw_assert(storage_dir);
strncpy(sdir, octstr_get_cstr(storage_dir), -1 + sizeof sdir);
inited = 1;
return 0;
}
static Octstr *mms_init_queue_dir(char *qdir, int *error)
{
Octstr *dir;
int i, ret;
char fbuf[512], *xqdir;
octstr_strip_blanks(qdir);
/* Remove trailing slashes. */
gw_assert(inited);
gw_assert(qdir);
for (i = octstr_len(qdir) - 1; i >= 0; i--)
if (octstr_get_char(qdir, i) != DIR_SEP)
if (error == NULL) error = &ret;
*error = 0;
dir = octstr_format("%s%c%s", sdir, DIR_SEP, qdir);
octstr_strip_blanks(dir);
/* Remove trailing slashes. */
for (i = octstr_len(dir) - 1; i >= 0; i--)
if (octstr_get_char(dir, i) != DIR_SEP)
break;
else
octstr_delete(qdir, i,1);
octstr_delete(dir, i,1);
xqdir = octstr_get_cstr(qdir);
xqdir = octstr_get_cstr(dir);
if ((ret = mkdir(xqdir,
S_IRWXU|S_IRWXG)) < 0 &&
errno != EEXIST)
return errno;
errno != EEXIST) {
*error = errno;
goto done;
}
for (i = 0; _TT[i]; i++) { /* initialise the top level only... */
sprintf(fbuf, "%.128s/%c", xqdir, _TT[i]);
if (mkdir(fbuf,
S_IRWXU|S_IRWXG) < 0 &&
errno != EEXIST)
return errno;
errno != EEXIST) {
*error = errno;
goto done;
}
}
return 0;
done:
if (*error == 0)
return dir;
octstr_destroy(dir);
return NULL;
}
static int free_envelope(MmsEnvelope *e, int removefromqueue);
@ -141,7 +179,7 @@ static void get_subdir(char *qf, char subdir[64], char realqf[QFNAMEMAX])
* return NULL (i.e. file is being processed elsewhere -- race condition), otherwise read it.
* - If should block is 1, then does a potentially blocking attempt to lock the file.
*/
MmsEnvelope *mms_queue_readenvelope(char *qf, char *mms_queuedir, int shouldblock)
static MmsEnvelope *mms_queue_readenvelope(char *qf, char *mms_queuedir, int shouldblock)
{
Octstr *fname;
int fd;
@ -152,6 +190,7 @@ MmsEnvelope *mms_queue_readenvelope(char *qf, char *mms_queuedir, int shouldbloc
char subdir[64];
char realqf[QFNAMEMAX];
char xqf[QFNAMEMAX+64];
struct qfile_t *qfs;
get_subdir(qf, subdir, realqf); /* break it down... */
@ -168,15 +207,27 @@ MmsEnvelope *mms_queue_readenvelope(char *qf, char *mms_queuedir, int shouldbloc
return NULL;
}
e = gw_malloc(sizeof *e);
memset(e, 0, sizeof *e); /* Clear it all .*/
e = mms_queue_create_envelope(NULL, NULL,
NULL,
NULL, NULL,
0, 0,
NULL,
NULL, NULL,
NULL, NULL,
NULL,
0,
NULL,
NULL,
qf,
sizeof (struct qfile_t));
e->to = gwlist_create();
e->qf.fd = fd;
strncpy(e->qf.name, realqf, sizeof e->qf.name);
strncpy(e->qf.subdir, subdir, sizeof e->qf.subdir);
strncpy(e->qf.dir, mms_queuedir, sizeof e->qf.dir);
strncpy(e->xqfname, qf, sizeof e->xqfname);
qfs = e->qfs_data;
qfs->fd = fd;
strncpy(qfs->name, realqf, sizeof qfs->name);
strncpy(qfs->subdir, subdir, sizeof qfs->subdir);
strncpy(qfs->dir, mms_queuedir, sizeof qfs->dir);
qdata = octstr_read_file(octstr_get_cstr(fname));
octstr_destroy(fname);
@ -339,19 +390,21 @@ static int writeenvelope(MmsEnvelope *e, int newenv)
int fd;
int i, n;
int res = 0;
struct qfile_t *qfs = e ? e->qfs_data : NULL;
gw_assert(e);
if (newenv)
fd = e->qf.fd;
fd = qfs->fd;
else {
tfname = octstr_format(
"%s/%s%c%s.%d", e->qf.dir, e->qf.subdir,
MTF, e->qf.name + 1, random());
"%s/%s%c%s.%d", qfs->dir, qfs->subdir,
MTF, qfs->name + 1, random());
fd = open(octstr_get_cstr(tfname),
O_RDWR|O_CREAT|O_EXCL, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP);
if (fd < 0 ) {
error(0, "mms_queueadd: Failed to open temp file %s: error = %s\n",
octstr_get_cstr(tfname), strerror(errno));
res = -1;
goto done;
} else if (mm_lockfile(fd, octstr_get_cstr(tfname), 0) != 0) { /* Lock it. */
@ -359,7 +412,6 @@ static int writeenvelope(MmsEnvelope *e, int newenv)
octstr_get_cstr(tfname), strerror(errno));
res = -1;
goto done;
}
}
@ -484,9 +536,9 @@ static int writeenvelope(MmsEnvelope *e, int newenv)
if (!newenv) { /* An update */
Octstr *qfname;
qfname = octstr_format("%s/%s%s", e->qf.dir,
e->qf.subdir,
e->qf.name);
qfname = octstr_format("%s/%s%s", qfs->dir,
qfs->subdir,
qfs->name);
if (rename(octstr_get_cstr(tfname), octstr_get_cstr(qfname)) < 0) {
error(0, "mms_queuewrite: Failed to rename %s to %s: error = %s\n",
octstr_get_cstr(qfname), octstr_get_cstr(tfname), strerror(errno));
@ -494,14 +546,14 @@ static int writeenvelope(MmsEnvelope *e, int newenv)
close(fd); /* Close new one, keep old one. */
res = -1;
} else { /* On success, new descriptor replaces old one and we close old one. */
close(e->qf.fd);
e->qf.fd = fd;
close(qfs->fd);
qfs->fd = fd;
}
octstr_destroy(qfname);
}
done:
if (tfname) octstr_destroy(tfname);
octstr_destroy(tfname);
return res;
}
@ -611,11 +663,14 @@ static int writemmsdata(Octstr *ms, char *df, char subdir[], char *mms_queuedir)
}
static Octstr *copy_and_clean_address(Octstr *addr)
Octstr *copy_and_clean_address(Octstr *addr)
{
Octstr *s = octstr_duplicate(addr);
Octstr *s;
int k, i;
if (addr == NULL) return NULL;
s = octstr_duplicate(addr);
octstr_strip_blanks(s);
/* Only clean up email addresses for now. */
if ((k = octstr_search_char(s, '@',0)) < 0)
@ -653,87 +708,44 @@ done:
return s;
}
Octstr *mms_queue_add(Octstr *from, List *to,
Octstr *subject,
Octstr *fromproxy, Octstr *viaproxy,
time_t senddate, time_t expirydate, MmsMsg *m, Octstr *token,
Octstr *vaspid, Octstr *vasid,
Octstr *url1, Octstr *url2,
List *hdrs,
int dlr,
char *directory, Octstr *mmscname)
static Octstr *mms_queue_add(Octstr *from, List *to,
Octstr *subject,
Octstr *fromproxy, Octstr *viaproxy,
time_t senddate, time_t expirydate, MmsMsg *m, Octstr *token,
Octstr *vaspid, Octstr *vasid,
Octstr *url1, Octstr *url2,
List *hdrs,
int dlr,
char *directory, Octstr *mmscname)
{
char qf[QFNAMEMAX], subdir[64];
int fd, i, n;
int fd;
MmsEnvelope *e;
Octstr *msgid, *r = NULL;
Octstr *ms, *res = NULL, *xfrom = NULL;
int mtype;
fd = mkqf(qf, subdir, directory);
Octstr *ms, *res = NULL;
struct qfile_t *qfs = NULL;
fd = mkqf(qf, subdir, directory);
if (fd < 0) {
error(0, "mms_queue_add[%s]: Failed err=%s\n", directory, strerror(errno));
return NULL;
}
res = xmake_qf(qf, subdir);
mtype = mms_messagetype(m);
/* Get MsgID, Fixup if not there and needed. */
if ((msgid = mms_get_header_value(m, octstr_imm("Message-ID"))) == NULL) {
msgid = mms_maketransid(octstr_get_cstr(res), mmscname);
if (mtype == MMS_MSGTYPE_SEND_REQ)
mms_replace_header_value(m, "Message-ID", octstr_get_cstr(msgid));
}
e = mms_queue_create_envelope(from, to, subject, fromproxy,viaproxy,
senddate,expirydate,token,vaspid,vasid,
url1,url2,hdrs,dlr,mmscname,m,
octstr_get_cstr(res),
sizeof(struct qfile_t));
ms = mms_tobinary(m); /* Convert message to string. */
xfrom = copy_and_clean_address(from);
e = gw_malloc(sizeof *e); /* Make envelope, clear it. */
memset(e, 0, sizeof *e);
strncpy(e->qf.name, qf, sizeof e->qf.name);
strncpy(e->qf.subdir, subdir, sizeof e->qf.subdir);
strncpy(e->qf.dir, directory, sizeof e->qf.dir);
qfs = e->qfs_data;
strncpy(qfs->name, qf, sizeof qfs->name);
strncpy(qfs->subdir, subdir, sizeof qfs->subdir);
strncpy(qfs->dir, directory, sizeof qfs->dir);
e->qf.fd = fd;
e->msgtype = mtype;
e->from = xfrom;
e->created = time(NULL);
e->sendt = senddate;
e->expiryt = expirydate ? expirydate : time(NULL) + DEFAULT_EXPIRE;
e->lasttry = 0;
e->attempts = 0;
e->lastaccess = 0;
e->fromproxy = fromproxy;
e->viaproxy = viaproxy;
e->subject = subject;
e->to = gwlist_create();
e->msize = octstr_len(ms);
e->msgId = msgid;
e->token = token;
e->vaspid = vaspid;
e->vasid = vasid;
e->url1 = url1;
e->url2 = url2;
e->hdrs = hdrs ? http_header_duplicate(hdrs) : NULL;
e->dlr = dlr;
e->bill.billed = 0;
for (i = 0, n = to ? gwlist_len(to) : 0; i<n; i++)
if ((r = gwlist_get(to, i)) != NULL &&
(r = copy_and_clean_address(r)) != NULL) {
MmsEnvelopeTo *t = gw_malloc(sizeof *t);
t->rcpt = r;
t->process = 1;
gwlist_append(e->to, t);
}
qfs->fd = fd;
/* Write queue data. */
if (writeenvelope(e, 1) < 0) {
@ -742,49 +754,52 @@ Octstr *mms_queue_add(Octstr *from, List *to,
goto done;
}
/* Write actual data before relinquishing lock on queue file. */
qf[0]= MDF;
if (writemmsdata(ms, qf, subdir, directory) < 0) {
octstr_destroy(res);
res = NULL;
goto done;
}
close(fd); /* Close queue file, thereby letting go of locks. */
done:
/* Free the envelope stuff since we do not need it any more, then free 'e' */
for (i = 0, n = gwlist_len(e->to); i<n; i++) {
MmsEnvelopeTo *to = gwlist_get(e->to, i);
octstr_destroy(to->rcpt);
gw_free(to);
}
gwlist_destroy(e->to, NULL);
gw_free(e); /* Free struct only, caller responsible for arguments. */
done:
free_envelope(e, 0);
octstr_destroy(ms);
octstr_destroy(msgid);
if (xfrom)
octstr_destroy(xfrom);
return res;
}
static int free_envelope(MmsEnvelope *e, int removefromqueue)
{
int i, n;
struct qfile_t *qfs;
if (e == NULL)
return 0;
qfs = e->qfs_data;
if (removefromqueue) {
char fname[2*QFNAMEMAX];
if (e == NULL) return 0;
snprintf(fname, -1 + sizeof fname, "%s/%s%s", qfs->dir, qfs->subdir, qfs->name);
unlink(fname);
qfs->name[0] = MDF;
snprintf(fname, -1 + sizeof fname, "%s/%s%s", qfs->dir, qfs->subdir, qfs->name);
unlink(fname);
}
close(qfs->fd); /* close and unlock now that we have deleted it. */
mms_queue_free_envelope(e);
return 0;
}
void mms_queue_free_envelope(MmsEnvelope *e)
{
MmsEnvelopeTo *x;
if (e == NULL) return;
octstr_destroy(e->msgId);
for (i = 0, n = gwlist_len(e->to); i < n; i++) {
MmsEnvelopeTo *x = gwlist_get(e->to, i);
while ((x = gwlist_extract_first(e->to)) != NULL) {
octstr_destroy(x->rcpt);
gw_free(x);
}
@ -802,28 +817,91 @@ static int free_envelope(MmsEnvelope *e, int removefromqueue)
octstr_destroy(e->url2);
http_destroy_headers(e->hdrs);
if (removefromqueue) {
char fname[2*QFNAMEMAX];
snprintf(fname, -1 + sizeof fname, "%s/%s%s", e->qf.dir, e->qf.subdir, e->qf.name);
unlink(fname);
e->qf.name[0] = MDF;
snprintf(fname, -1 + sizeof fname, "%s/%s%s", e->qf.dir, e->qf.subdir, e->qf.name);
unlink(fname);
}
close(e->qf.fd); /* close and unlock now that we have deleted it. */
gw_free(e);
gw_free(e);
return 0;
}
int mms_queue_free_env(MmsEnvelope *e)
MmsEnvelope *mms_queue_create_envelope(Octstr *from, List *to,
Octstr *subject,
Octstr *fromproxy, Octstr *viaproxy,
time_t senddate, time_t expirydate,
Octstr *token,
Octstr *vaspid, Octstr *vasid,
Octstr *url1, Octstr *url2,
List *hdrs,
int dlr,
Octstr *mmscname,
MmsMsg *m,
char *xqfname,
int extra_space)
{
MmsEnvelope *e;
Octstr *msgid = NULL, *ms = NULL, *r, *xfrom;
int mtype = -1, i, n;
if (m) {
mtype = mms_messagetype(m);
/* Get MsgID, Fixup if not there and needed. */
if ((msgid = mms_get_header_value(m, octstr_imm("Message-ID"))) == NULL &&
xqfname) {
msgid = mms_maketransid(xqfname, mmscname);
if (mtype == MMS_MSGTYPE_SEND_REQ)
mms_replace_header_value(m, "Message-ID", octstr_get_cstr(msgid));
}
ms = mms_tobinary(m);
}
xfrom = copy_and_clean_address(from);
e = gw_malloc(extra_space + sizeof *e); /* Make envelope, clear it. */
memset(e, 0, sizeof *e);
e->qfs_data = (void *)(e+1); /* pointer to data object for module. */
e->msgtype = mtype;
e->from = xfrom;
e->created = time(NULL);
e->sendt = senddate;
e->expiryt = expirydate ? expirydate : time(NULL) + DEFAULT_EXPIRE;
e->lasttry = 0;
e->attempts = 0;
e->lastaccess = 0;
e->fromproxy = fromproxy ? octstr_duplicate(fromproxy) : NULL;
e->viaproxy = viaproxy ? octstr_duplicate(viaproxy) : NULL;
e->subject = subject ? octstr_duplicate(subject) : NULL;
e->to = gwlist_create();
e->msize = ms ? octstr_len(ms) : 0;
e->msgId = msgid;
e->token = token ? octstr_duplicate(token) : NULL;
e->vaspid = vaspid ? octstr_duplicate(vaspid) : NULL;
e->vasid = vasid ? octstr_duplicate(vasid) : NULL;
e->url1 = url1 ? octstr_duplicate(url1) : NULL;
e->url2 = url2 ? octstr_duplicate(url2) : NULL;
e->hdrs = hdrs ? http_header_duplicate(hdrs) : NULL;
e->dlr = dlr;
if (xqfname)
strncpy(e->xqfname, xqfname, sizeof e->xqfname);
for (i = 0, n = to ? gwlist_len(to) : 0; i<n; i++)
if ((r = gwlist_get(to, i)) != NULL &&
(r = copy_and_clean_address(r)) != NULL) {
MmsEnvelopeTo *t = gw_malloc(sizeof *t);
t->rcpt = r;
t->process = 1;
gwlist_append(e->to, t);
}
octstr_destroy(ms);
return e;
}
static int mms_queue_free_env(MmsEnvelope *e)
{
return free_envelope(e, 0);
}
int mms_queue_update(MmsEnvelope *e)
static int mms_queue_update(MmsEnvelope *e)
{
int i, n = (e && e->to) ? gwlist_len(e->to) : 0;
int hasrcpt = 0;
@ -847,23 +925,23 @@ int mms_queue_update(MmsEnvelope *e)
return writeenvelope(e, 0);
}
int mms_queue_replacedata(MmsEnvelope *e, MmsMsg *m)
static int mms_queue_replacedata(MmsEnvelope *e, MmsMsg *m)
{
Octstr *tfname;
Octstr *ms;
struct qfile_t *qfs;
int ret = 0;
if (!e) return -1;
tfname = octstr_format(".%c%s.%ld.%d", MDF, e->qf.name + 1, time(NULL), random());
ms = mms_tobinary(m);
if (writemmsdata(ms, octstr_get_cstr(tfname), e->qf.subdir, e->qf.dir) < 0)
qfs = e->qfs_data;
tfname = octstr_format(".%c%s.%ld.%d", MDF, qfs->name + 1, time(NULL), random());
ms = mms_tobinary(m);
if (writemmsdata(ms, octstr_get_cstr(tfname), qfs->subdir, qfs->dir) < 0)
ret = -1;
else {
Octstr *fname = octstr_format("%s/%s%c%s", e->qf.dir, e->qf.subdir, MDF, e->qf.name + 1);
Octstr *tmpf = octstr_format("%s/%s%S", e->qf.dir, e->qf.subdir, tfname);
Octstr *fname = octstr_format("%s/%s%c%s", qfs->dir, qfs->subdir, MDF, qfs->name + 1);
Octstr *tmpf = octstr_format("%s/%s%S", qfs->dir, qfs->subdir, tfname);
if (rename(octstr_get_cstr(tmpf), octstr_get_cstr(fname)) < 0) {
error(0, "mms_replacedata: Failed to write data file %s: error = %s\n",
octstr_get_cstr(tmpf), strerror(errno));
@ -879,27 +957,28 @@ int mms_queue_replacedata(MmsEnvelope *e, MmsMsg *m)
return ret;
}
MmsMsg *mms_queue_getdata(MmsEnvelope *e)
static MmsMsg *mms_queue_getdata(MmsEnvelope *e)
{
Octstr *fname;
Octstr *ms;
MmsMsg *m;
struct qfile_t *qfs;
if (!e) return NULL;
fname = octstr_format("%s/%s%c%s", e->qf.dir, e->qf.subdir, MDF, e->qf.name + 1);
qfs = e->qfs_data;
fname = octstr_format("%s/%s%c%s", qfs->dir, qfs->subdir, MDF, qfs->name + 1);
ms = octstr_read_file(octstr_get_cstr(fname));
if (!ms) {
error(0, "mms_queue_getdata: Failed to load data file for queue entry %s in %s",
e->qf.name, e->qf.dir);
qfs->name, qfs->dir);
octstr_destroy(fname);
return NULL;
}
m = mms_frombinary(ms, octstr_imm(""));
if (!m) {
error(0, "mms_queue_getdata: Failed to decode data file for queue entry %s in %s",
e->qf.name, e->qf.dir);
qfs->name, qfs->dir);
octstr_destroy(fname);
return NULL;
}
@ -1012,9 +1091,9 @@ static int run_dir(char *topdir, char *dir, struct Qthread_t *tlist, int num_thr
return ret;
}
void mms_queue_run(char *dir,
int (*deliver)(MmsEnvelope *),
double sleepsecs, int num_threads, int *rstop)
static void mms_queue_run(char *dir,
int (*deliver)(MmsEnvelope *),
double sleepsecs, int num_threads, int *rstop)
{
struct Qthread_t *tlist;
int i, qstop = 0;
@ -1072,4 +1151,22 @@ void mms_queue_run(char *dir,
return;
}
static int mms_cleanup_queue_module(void)
{
return 0;
}
/* export functions... */
MmsQueueHandlerFuncs qfuncs = {
mms_init_queue_module,
mms_init_queue_dir,
mms_cleanup_queue_module,
mms_queue_add,
mms_queue_update,
mms_queue_getdata,
mms_queue_replacedata,
mms_queue_readenvelope,
mms_queue_run,
mms_queue_free_env
};

View File

@ -66,49 +66,53 @@ typedef struct MmsEnvelope {
Octstr *viaproxy; /* Which proxy must we send this message through. */
void *_x; /* Generic storage field used by module clients. */
/* DO NOT MODIFY ANYTHING BEYOND THIS POINT. */
struct { /* Name of the queue file, pointer to it (locked). DO NOT USE THESE! */
char name[QFNAMEMAX]; /* Name of the file. */
char dir[QFNAMEMAX]; /* Directory in which file is .*/
char subdir[64]; /* and the sub-directory. */
char _pad[16];
int fd;
} qf;
void *qfs_data; /* Queue handler module structure -- allocated for you by queue_create
* function.
*/
char xqfname[64+QFNAMEMAX]; /* The full ID for the queue. Use this. */
} MmsEnvelope;
/* Given a queue directory, initialise it. Must be called at least once on each queue dir. */
int mms_init_queuedir(Octstr *qdir);
typedef struct MmsQueueHandlerFuncs {
/* Initialise queue module. Must be called at least once on each queue dir. */
int (*mms_init_queue_module)(Octstr *init_data);
/* initialise a queue directory. There can be multiple directories,
* upperlevel decides what a directory is.
* module returns a directory string.
*/
Octstr *(*mms_init_queue_dir)(char *qdir, int *error);
/* cleanup module, e.g. on exit. */
int (*mms_cleanup_queue_module)(void);
/*
* Add a message to the queue, returns 0 on success -1 otherwise (error is logged).
* 'to' is a list of Octstr * *.
* Returns a queue file name.
*/
extern Octstr *mms_queue_add(Octstr *from, List *to,
Octstr *subject,
Octstr *fromproxy, Octstr *viaproxy,
time_t senddate, time_t expirydate, MmsMsg *m,
Octstr *token,
Octstr *vaspid, Octstr *vasid,
Octstr *url1, Octstr *url2,
List *hdrs,
int dlr,
char *directory, Octstr *mmscname);
Octstr *(*mms_queue_add)(Octstr *from, List *to,
Octstr *subject,
Octstr *fromproxy, Octstr *viaproxy,
time_t senddate, time_t expirydate, MmsMsg *m,
Octstr *token,
Octstr *vaspid, Octstr *vasid,
Octstr *url1, Octstr *url2,
List *hdrs,
int dlr,
char *directory, Octstr *mmscname);
/*
* Update queue status. Returns -1 on error, 0 if queue is updated fine and
* envelope is still valid, 1 if envelope is no longer valid (no more recipients.)
*/
extern int mms_queue_update(MmsEnvelope *e);
int (*mms_queue_update)(MmsEnvelope *e);
/*
* Get the message associated with this queue entry.
*/
extern MmsMsg *mms_queue_getdata(MmsEnvelope *e);
MmsMsg *(*mms_queue_getdata)(MmsEnvelope *e);
/* Replace data for this queue item -- used by mm7 interface. */
int mms_queue_replacedata(MmsEnvelope *e, MmsMsg *m);
int (*mms_queue_replacedata)(MmsEnvelope *e, MmsMsg *m);
/*
* Reads queue, returns up to lim queue entries that are ready for processing. send 0 for no limit.
@ -122,7 +126,7 @@ int mms_queue_replacedata(MmsEnvelope *e, MmsMsg *m);
* - If should block is 1, then does a potentially blocking attempt to lock the file.
*/
MmsEnvelope *mms_queue_readenvelope(char *qf, char *dir, int shouldblock);
MmsEnvelope *(*mms_queue_readenvelope)(char *qf, char *dir, int shouldblock);
/*
* Run the queue in the given directory. For each envelope that is due for sending, call
@ -131,12 +135,39 @@ MmsEnvelope *mms_queue_readenvelope(char *qf, char *dir, int shouldblock);
* if deliver() returns 1, it has deleted envelope.
* Also if rstop becomes true, queue run must stop.
*/
void mms_queue_run(char *dir,
int (*deliver)(MmsEnvelope *),
double sleepsecs,
int num_threads,
int *rstop);
void (*mms_queue_run)(char *dir,
int (*deliver)(MmsEnvelope *),
double sleepsecs,
int num_threads,
int *rstop);
/* Get rid of memory used by this. Typically does internal cleanup then calls
* the general structure free-ing function below.
*/
int (*mms_queue_free_env)(MmsEnvelope *e);
} MmsQueueHandlerFuncs;
/* Module must export this symbol: */
extern MmsQueueHandlerFuncs qfuncs;
/* Utility functions, generally defined. */
/* Creates the queue envelope object, returns it. */
MmsEnvelope *mms_queue_create_envelope(Octstr *from, List *to,
Octstr *subject,
Octstr *fromproxy, Octstr *viaproxy,
time_t senddate, time_t expirydate,
Octstr *token,
Octstr *vaspid, Octstr *vasid,
Octstr *url1, Octstr *url2,
List *hdrs,
int dlr,
Octstr *mmscname,
MmsMsg *m,
char *xqfname,
int extra_space);
/* Get rid of memory used by this. */
extern int mms_queue_free_env(MmsEnvelope *e);
void mms_queue_free_envelope(MmsEnvelope *e);
/* utility function for 'cleaning up' addresses. */
Octstr *copy_and_clean_address(Octstr *addr);
#endif

View File

@ -201,7 +201,7 @@ static void mm7soap_receive(MmsHTTPClientInfo *h)
http_header_add(qh, "X-Mbuni-Timestamp", octstr_get_cstr(sx));
octstr_destroy(sx);
}
qf = mms_queue_add(from, to, subject,
qf = qfs->mms_queue_add(from, to, subject,
h->m->id, NULL,
delivert, expiryt, m, linkedid,
NULL, NULL,
@ -388,7 +388,7 @@ static void mm7eaif_receive(MmsHTTPClientInfo *h)
mms_remove_headers(m, "X-Mms-Sender-Visibility");
/* Save it, put message id in header, return. */
qf = mms_queue_add(hfrom, to, subject,
qf = qfs->mms_queue_add(hfrom, to, subject,
h->m->id, NULL, deliveryt, expiryt, m, NULL,
NULL, NULL,
NULL, NULL,
@ -790,7 +790,7 @@ static int sendMsg(MmsEnvelope *e)
MmsMsg *msg = NULL;
int i, n;
Octstr *otransid = e->hdrs ? http_header_value(e->hdrs, octstr_imm("X-Mbuni-TransactionID")) : NULL;
msg = mms_queue_getdata(e);
msg = qfs->mms_queue_getdata(e);
for (i = 0, n = gwlist_len(e->to); i<n; i++) {
int res = MMS_SEND_OK;
@ -862,7 +862,7 @@ static int sendMsg(MmsEnvelope *e)
octstr_destroy(new_msgid);
e->lasttry = tnow;
if (mms_queue_update(e) == 1) {
if (qfs->mms_queue_update(e) == 1) {
e = NULL;
break; /* Queue entry gone. */
}
@ -879,8 +879,8 @@ static int sendMsg(MmsEnvelope *e)
e->attempts++; /* Update count of number of delivery attempts. */
e->sendt = e->lasttry + mmsbox_send_back_off * e->attempts;
if (mms_queue_update(e) != 1)
mms_queue_free_env(e);
if (qfs->mms_queue_update(e) != 1)
qfs->mms_queue_free_env(e);
}
return 1; /* always delete queue entry. */
@ -888,6 +888,6 @@ static int sendMsg(MmsEnvelope *e)
void mmsbox_outgoing_queue_runner(int *rstop)
{
mms_queue_run(octstr_get_cstr(outgoing_qdir),
qfs->mms_queue_run(octstr_get_cstr(outgoing_qdir),
sendMsg, queue_interval, maxthreads, rstop);
}

View File

@ -474,7 +474,7 @@ static int mmsbox_service_dispatch(MmsEnvelope *e)
gw_assert(e->msgtype == MMS_MSGTYPE_SEND_REQ ||
e->msgtype == MMS_MSGTYPE_RETRIEVE_CONF);
if ((msg = mms_queue_getdata(e)) == NULL) {
if ((msg = qfs->mms_queue_getdata(e)) == NULL) {
err = octstr_format("Failed to read message for queue entry %s!",
e->xqfname);
res = -1;
@ -530,8 +530,8 @@ done:
e->sendt = e->lasttry + mmsbox_send_back_off * e->attempts;
}
if (mms_queue_update(e) != 1)
mms_queue_free_env(e);
if (qfs->mms_queue_update(e) != 1)
qfs->mms_queue_free_env(e);
octstr_destroy(err);
octstr_destroy(keyword);
@ -588,7 +588,7 @@ int main(int argc, char *argv[])
/* Start out-going queue thread. */
qthread = gwthread_create((gwthread_func_t *)mmsbox_outgoing_queue_runner, &rstop);
mms_queue_run(octstr_get_cstr(incoming_qdir),
qfs->mms_queue_run(octstr_get_cstr(incoming_qdir),
mmsbox_service_dispatch,
queue_interval, maxthreads, &rstop);
@ -1091,7 +1091,7 @@ static int make_and_queue_msg(Octstr *data, Octstr *ctype, List *reply_headers,
}
/* Write to queue. */
x = mms_queue_add(from, xto, subject,
x = qfs->mms_queue_add(from, xto, subject,
e ? e->fromproxy : NULL,
mmc,
time(NULL), expiryt, m, NULL,

View File

@ -33,6 +33,7 @@ long maxthreads = 0;
double queue_interval = -1;
Octstr *unified_prefix;
int mt_multipart = 0;
MmsQueueHandlerFuncs *qfs; /* queue functions. */
struct SendMmsPortInfo sendmms_port;
@ -82,7 +83,7 @@ int mms_load_mmsbox_settings(mCfg *cfg, gwthread_func_t *mmsc_handler_func)
Octstr *gdir, *s;
int send_port_ssl = 0;
List *l;
int i, n;
int i, n, xx;
void *catchall = NULL;
if (grp == NULL)
@ -105,19 +106,29 @@ int mms_load_mmsbox_settings(mCfg *cfg, gwthread_func_t *mmsc_handler_func)
panic(0, "Failed to create MMSBox storage directory: %s - %s!",
octstr_get_cstr(gdir), strerror(errno));
incoming_qdir = octstr_format("%S/mmsbox_incoming", gdir);
outgoing_qdir = octstr_format("%S/mmsbox_outgoing", gdir);
dlr_dir = octstr_format("%S/mmsbox_dlr", gdir);
if (mms_init_queuedir(incoming_qdir) < 0)
panic(0, "Failed to initialise incoming queue directory: %s - %s!",
if ((qfs = load_module(grp, "queue-manager-module", "qfuncs")) == NULL) {
qfs = &qfuncs; /* default queue handler. */
qfs->mms_init_queue_module(gdir);
} else {
Octstr *s = _mms_cfg_getx(grp, octstr_imm("queue-module-init-data"));
if (qfs->mms_init_queue_module(s) != 0)
panic(0, "failed to initialise queue module, with data: %s",
octstr_get_cstr(s));
octstr_destroy(s);
}
if ((incoming_qdir = qfs->mms_init_queue_dir("mmsbox_incoming", &xx)) == NULL ||
xx != 0)
panic(0, "Failed to initialise incoming mmsbox queue directory: %s - %s!",
octstr_get_cstr(incoming_qdir), strerror(errno));
if (mms_init_queuedir(outgoing_qdir) < 0)
panic(0, "Failed to initialise outgoing queue directory: %s - %s!",
if ((outgoing_qdir = qfs->mms_init_queue_dir("mmsbox_outgoing", &xx)) == NULL ||
xx != 0)
panic(0, "Failed to initialise outgoing mmsbox queue directory: %s - %s!",
octstr_get_cstr(outgoing_qdir), strerror(errno));
if (mms_init_queuedir(dlr_dir) < 0)
if ((dlr_dir = qfuncs.mms_init_queue_dir("mmsbox_dlr", &xx)) == NULL ||
xx != 0) /* XXX still uses old-style file storage. */
panic(0, "Failed to initialise dlr storage directory: %s - %s!",
octstr_get_cstr(dlr_dir), strerror(errno));

View File

@ -14,6 +14,8 @@
#define __MMSBOX_CFG_INCLUDED__
#include "mms_util.h"
#include "mmsbox_mt_filter.h"
#include "mms_queue.h"
typedef struct MmscGrp {
Octstr *id; /* MMSC id (for logging). */
Octstr *group_id; /* GROUP MMSC id (used for qf). */
@ -87,6 +89,7 @@ extern struct SendMmsPortInfo {
} sendmms_port;
extern struct MmsBoxMTfilter *mt_filter;
extern MmsQueueHandlerFuncs *qfs;
extern int mt_multipart;
extern int mms_load_mmsbox_settings(mCfg *cfg, gwthread_func_t *mmsc_handler_func);
extern MmscGrp *get_handler_mmc(Octstr *id, Octstr *to);

View File

@ -83,7 +83,7 @@ MmscSettings *mms_load_mmsc_settings(mCfg *cfg, List **proxyrelays)
long port = -1;
Octstr *from, *user, *pass;
Octstr *qdir = NULL;
int i, n;
int i, n, xx;
memset(m, 0, sizeof *m);
@ -122,18 +122,28 @@ MmscSettings *mms_load_mmsc_settings(mCfg *cfg, List **proxyrelays)
panic(0, "Failed to create queue directory: %s - %s!",
octstr_get_cstr(qdir), strerror(errno));
m->global_queuedir = octstr_format("%S/global", qdir);
m->mm1_queuedir = octstr_format("%S/mm1", qdir);
if (mms_init_queuedir(m->mm1_queuedir) < 0)
panic(0, "Failed to initialise local queue directory: %s - %s!",
octstr_get_cstr(m->mm1_queuedir), strerror(errno));
else if (mms_init_queuedir(m->global_queuedir) < 0)
if ((m->qfs = load_module(grp, "queue-manager-module", "qfuncs", NULL)) == NULL) {
m->qfs = &qfuncs; /* default queue handler. */
m->qfs->mms_init_queue_module(qdir);
} else {
Octstr *s = _mms_cfg_getx(grp, octstr_imm("queue-module-init-data"));
if (m->qfs->mms_init_queue_module(s) != 0)
panic(0, "failed to initialise queue module, with data: %s",
octstr_get_cstr(s));
octstr_destroy(s);
}
if ((m->global_queuedir = m->qfs->mms_init_queue_dir("global", &xx)) == NULL ||
xx != 0)
panic(0, "Failed to initialise global queue directory: %s - %s!",
octstr_get_cstr(m->global_queuedir), strerror(errno));
m->mmbox_rootdir = octstr_format("%S/mmbox", qdir);
if ((m->mm1_queuedir = m->qfs->mms_init_queue_dir("mm1", &xx)) == NULL ||
xx != 0)
panic(0, "Failed to initialise local queue directory: %s - %s!",
octstr_get_cstr(m->mm1_queuedir), strerror(errno));
m->mmbox_rootdir = octstr_format("%S/mmbox", qdir);
if (mmbox_root_init(octstr_get_cstr(m->mmbox_rootdir)) != 0)
panic(0, "Failed to initialise mmbox root directory, error: %s!",
strerror(errno));

View File

@ -16,6 +16,7 @@
#include "mms_resolve_shell.h"
#include "mms_billing_shell.h"
#include "mms_detokenize_shell.h"
#include "mms_queue.h"
typedef struct MmsProxyRelay {
Octstr *host;
@ -40,9 +41,12 @@ typedef struct MmscSettings {
Octstr *name, *hostname, *host_alias;
Octstr *unified_prefix, *local_prefix;
Octstr *sendmail;
Octstr *global_queuedir, *mm1_queuedir;
Octstr *mmbox_rootdir;
MmsQueueHandlerFuncs *qfs;
Octstr *ua_profile_cache_dir;
long maxthreads;

View File

@ -270,7 +270,7 @@ int main(int argc, char *argv[])
else
dlr = 0;
qf = mms_queue_add(xfrom, lto, NULL, xproxy, NULL,
qf = settings->qfs->mms_queue_add(xfrom, lto, NULL, xproxy, NULL,
0, time(NULL) + settings->default_msgexpiry, msg, NULL,
NULL, NULL,
NULL, NULL,
@ -311,7 +311,7 @@ int main(int argc, char *argv[])
octstr_format_append(xto, "/TYPE=PLMN");
gwlist_append(lto, xto);
qf = mms_queue_add(xfrom, lto, NULL,
qf = settings->qfs->mms_queue_add(xfrom, lto, NULL,
xproxy, NULL,
0, time(NULL) + settings->default_msgexpiry, msg, NULL,
NULL, NULL,
@ -343,7 +343,7 @@ int main(int argc, char *argv[])
octstr_format_append(xto, "/TYPE=PLMN");
gwlist_append(lto, xto);
qf = mms_queue_add(xfrom, lto, NULL,
qf = settings->qfs->mms_queue_add(xfrom, lto, NULL,
xproxy, NULL,
0, time(NULL) + settings->default_msgexpiry, msg, NULL,
NULL, NULL,
@ -386,7 +386,7 @@ int main(int argc, char *argv[])
strip_quotes(qf);
octstr_strip_blanks(o_to);
strip_quotes(o_to);
e = mms_queue_readenvelope(octstr_get_cstr(qf),
e = settings->qfs->mms_queue_readenvelope(octstr_get_cstr(qf),
octstr_get_cstr(settings->global_queuedir),
1);
if (!e)
@ -430,8 +430,8 @@ int main(int argc, char *argv[])
rstatus ? octstr_get_cstr(rstatus) : "",
processed ? "Sender number matched in queue file" : "Sender number not matched in queue file");
if (mms_queue_update(e) != 1)
mms_queue_free_env(e);
if (settings->qfs->mms_queue_update(e) != 1)
settings->qfs->mms_queue_free_env(e);
}
} else

View File

@ -101,7 +101,7 @@ static int sendMsg(MmsEnvelope *e)
}
if (amt >= -1)
if (mms_queue_update(e) == 1) /* Write queue just in case we crash. */
if (settings->qfs->mms_queue_update(e) == 1) /* Write queue just in case we crash. */
e = NULL;
if (e == NULL ||
@ -111,7 +111,7 @@ static int sendMsg(MmsEnvelope *e)
msg = mms_queue_getdata(e);
msg = settings->qfs->mms_queue_getdata(e);
#if 0
if (msg) mms_msgdump(msg,1);
#endif
@ -315,7 +315,7 @@ static int sendMsg(MmsEnvelope *e)
gwlist_append(l, octstr_duplicate(e->from));
/* Add to queue, switch via proxy to be from proxy. */
qfs = mms_queue_add(to->rcpt, l,
qfs = settings->qfs->mms_queue_add(to->rcpt, l,
err, NULL, e->fromproxy,
tnow, tnow+settings->default_msgexpiry, m, NULL,
NULL, NULL,
@ -355,7 +355,7 @@ static int sendMsg(MmsEnvelope *e)
/* Update queue entry so that we know which ones have been processed. */
e->lasttry = tnow;
if (mms_queue_update(e) == 1) {
if (settings->qfs->mms_queue_update(e) == 1) {
e = NULL;
break; /* Queue entry gone. */
}
@ -370,8 +370,8 @@ static int sendMsg(MmsEnvelope *e)
e->attempts++; /* Update count of number of delivery attempts. */
e->sendt = e->lasttry + settings->send_back_off * e->attempts;
if (mms_queue_update(e) != 1)
mms_queue_free_env(e);
if (settings->qfs->mms_queue_update(e) != 1)
settings->qfs->mms_queue_free_env(e);
}
return 1; /* Always deletes the queue entry. */
}
@ -404,7 +404,7 @@ void mbuni_global_queue_runner(int *rstop)
gwthread_create(cdr_thread, NULL);
mms_queue_run(qdir, sendMsg, settings->queue_interval, settings->maxthreads, rstop);
settings->qfs->mms_queue_run(qdir, sendMsg, settings->queue_interval, settings->maxthreads, rstop);
/* When it ends, wait a little for other stuff to stop... */
sleep(2);
gwlist_remove_producer(cdr_list); /* Stop CDR thread. */
@ -452,7 +452,7 @@ int mms_sendtomobile(Octstr *from, Octstr *to,
x = octstr_create(tokenstr);
if (m)
ret = mms_queue_add(from, l, subject, fromproxy, NULL, 0, expires, m,
ret = settings->qfs->mms_queue_add(from, l, subject, fromproxy, NULL, 0, expires, m,
x, NULL, NULL,
NULL, NULL,
NULL,

View File

@ -40,7 +40,7 @@ static MmsEnvelope *update_env_success(MmsEnvelope *env, MmsEnvelopeTo *xto)
env->sendt = env->lasttry + settings->send_back_off * env->attempts;
}
if (mms_queue_update(env) == 1)
if (settings->qfs->mms_queue_update(env) == 1)
env = NULL;
return env;
}
@ -52,7 +52,7 @@ static MmsEnvelope *update_env_failed(MmsEnvelope *env)
env->sendt = tnow + settings->send_back_off;
env->lasttry = tnow;
if (mms_queue_update(env) == 1)
if (settings->qfs->mms_queue_update(env) == 1)
env = NULL;
}
return env;
@ -142,7 +142,7 @@ static void start_push(Octstr *rcpt_to, int isphonenum, MmsEnvelope *e, MmsMsg *
}
octstr_destroy(addr);
if (e)
mms_queue_free_env(e);
settings->qfs->mms_queue_free_env(e);
}
done:
if (to) octstr_destroy(to);
@ -218,7 +218,7 @@ static int receive_push_reply(HTTPCaller *caller)
env = update_env_failed(env);
push_free_env:
if (env && env != &edummy)
mms_queue_free_env(env);
settings->qfs->mms_queue_free_env(env);
}
return 0;
@ -244,7 +244,7 @@ static int sendNotify(MmsEnvelope *e)
e->sendt = e->expiryt + 3600*24*30*12;
info(0, "MM1: Message [ID: %s] fetched/touched at least once. Skipping",
e->xqfname);
return mms_queue_update(e);
return settings->qfs->mms_queue_update(e);
}
if (!xto) {
@ -253,7 +253,7 @@ static int sendNotify(MmsEnvelope *e)
return 0;
}
msg = mms_queue_getdata(e);
msg = settings->qfs->mms_queue_getdata(e);
to = octstr_duplicate(xto->rcpt);
expiryt = e->expiryt;
msgId = e->msgId ? octstr_duplicate(e->msgId) : NULL;
@ -326,10 +326,10 @@ static int sendNotify(MmsEnvelope *e)
e->lasttry = tnow;
e->sendt = e->lasttry + settings->send_back_off * (1 + e->attempts);
if (mms_queue_update(e) == 1)
if (settings->qfs->mms_queue_update(e) == 1)
e = NULL; /* Queue entry gone. */
else
mms_queue_free_env(e);
settings->qfs->mms_queue_free_env(e);
goto done;
} else if (send_ind == 0) { /* provisioned but does not support */
Octstr *s = octstr_format(octstr_get_cstr(settings->mms_notify_txt),
@ -357,10 +357,10 @@ static int sendNotify(MmsEnvelope *e)
err = octstr_imm("No MMS Ind support, sent SMS instead");
xto->process = 0; /* No more processing. */
if (mms_queue_update(e) == 1)
if (settings->qfs->mms_queue_update(e) == 1)
e = NULL;
else
mms_queue_free_env(e);
settings->qfs->mms_queue_free_env(e);
goto done;
}
@ -415,7 +415,7 @@ static int sendNotify(MmsEnvelope *e)
gwlist_append(l, from);
/* Add to queue, switch via proxy to be from proxy. */
res = mms_queue_add(to ? to : settings->system_user, l, err,
res = settings->qfs->mms_queue_add(to ? to : settings->system_user, l, err,
NULL, fromproxy,
tnow, tnow+settings->default_msgexpiry, m, NULL,
NULL, NULL,
@ -440,10 +440,10 @@ static int sendNotify(MmsEnvelope *e)
if (res == MMS_SEND_ERROR_FATAL) {
xto->process = 0; /* No more attempts to deliver, delete this. */
if (mms_queue_update(e) == 1)
if (settings->qfs->mms_queue_update(e) == 1)
e = NULL; /* Queue entry gone. */
else
mms_queue_free_env(e);
settings->qfs->mms_queue_free_env(e);
} /* Else queue will be updated/freed elsewhere. */
@ -477,7 +477,7 @@ void mbuni_mm1_queue_runner(int *rstop)
return;
}
mms_queue_run(octstr_get_cstr(settings->mm1_queuedir),
settings->qfs->mms_queue_run(octstr_get_cstr(settings->mm1_queuedir),
sendNotify, settings->queue_interval, settings->maxthreads, rstop);
sleep(2); /* Wait for it to die. */
http_caller_signal_shutdown(httpcaller);

View File

@ -268,11 +268,11 @@ void fetchmms_proxy(MmsHTTPClientInfo *h)
prof = mms_make_ua_profile(h->headers);
if (loc == MMS_LOC_MQUEUE) { /* where is the message? */
e = mms_queue_readenvelope(octstr_get_cstr(qf),
e = settings->qfs->mms_queue_readenvelope(octstr_get_cstr(qf),
octstr_get_cstr(settings->mm1_queuedir), 1);
if (!e ||
(m = mms_queue_getdata(e)) == NULL) {
(m = settings->qfs->mms_queue_getdata(e)) == NULL) {
error(0, "MMS Fetch interface: failed to find envelope/data %s for request url (%s) from %s (e=%s)!",
octstr_get_cstr(qf), octstr_get_cstr(h->url), octstr_get_cstr(h->ip),
(e)? "found" : "not found");
@ -385,7 +385,7 @@ void fetchmms_proxy(MmsHTTPClientInfo *h)
sprintf(tbuf, "%ld", time(NULL));
http_header_add(qh, "X-Mbuni-Timestamp", tbuf); /* record time of message. */
x = mms_queue_add(from, l, NULL, NULL, NULL, 0,
x = settings->qfs->mms_queue_add(from, l, NULL, NULL, NULL, 0,
time(NULL) + settings->default_msgexpiry, mrpt, NULL,
NULL, NULL,
NULL, NULL,
@ -404,7 +404,7 @@ void fetchmms_proxy(MmsHTTPClientInfo *h)
if (e) {
e->lastaccess = time(NULL); /* No more notifications requests. */
e->sendt = e->expiryt + 3600*24*30*12;
mms_queue_update(e);
settings->qfs->mms_queue_update(e);
}
http_header_add(rh, "Content-Type", "application/vnd.wap.mms-message");
@ -457,7 +457,7 @@ void fetchmms_proxy(MmsHTTPClientInfo *h)
http_destroy_headers(rh);
mms_queue_free_env(e);
settings->qfs->mms_queue_free_env(e);
octstr_destroy(s);
mms_destroy(m);
@ -594,7 +594,7 @@ static void sendmms_proxy(MmsHTTPClientInfo *h)
dlr = 1;
else
dlr = 0;
qf = mms_queue_add(from, to, subject,
qf = settings->qfs->mms_queue_add(from, to, subject,
NULL, NULL, deliveryt, expiryt, m, NULL,
NULL, NULL,
NULL, NULL,
@ -688,11 +688,11 @@ static void sendmms_proxy(MmsHTTPClientInfo *h)
}
if (mloc == MMS_LOC_MQUEUE) { /* where is the message? */
e = mms_queue_readenvelope(octstr_get_cstr(qf),
e = settings->qfs->mms_queue_readenvelope(octstr_get_cstr(qf),
octstr_get_cstr(settings->mm1_queuedir), 1);
if (!e ||
(mfwd = mms_queue_getdata(e)) == NULL) {
(mfwd = settings->qfs->mms_queue_getdata(e)) == NULL) {
error(0,
"MMS Send interface: failed to find envelope/data %s for forward url "
"(%s) from %s (e=%s)!",
@ -774,7 +774,7 @@ static void sendmms_proxy(MmsHTTPClientInfo *h)
else
dlr = 0;
/* Message to forward is now ready, write it to queue. */
qf2 = mms_queue_add(from, to, subject,
qf2 = settings->qfs->mms_queue_add(from, to, subject,
NULL, NULL, deliveryt, expiryt, mfwd, NULL,
NULL, NULL,
NULL, NULL,
@ -863,7 +863,7 @@ static void sendmms_proxy(MmsHTTPClientInfo *h)
List *l = gwlist_create();
gwlist_append(l, pfrom);
x = mms_queue_add(from, l, NULL, NULL, NULL, 0,
x = settings->qfs->mms_queue_add(from, l, NULL, NULL, NULL, 0,
time(NULL) + settings->default_msgexpiry,
mrep, NULL,
NULL, NULL,
@ -901,8 +901,8 @@ static void sendmms_proxy(MmsHTTPClientInfo *h)
if (e) { /* Update the message queue and go. */
e->lastaccess = time(NULL);
if (mms_queue_update(e) != 1) /* Should be freed. */
mms_queue_free_env(e);
if (settings->qfs->mms_queue_update(e) != 1) /* Should be freed. */
settings->qfs->mms_queue_free_env(e);
e = NULL;
}
@ -931,7 +931,7 @@ static void sendmms_proxy(MmsHTTPClientInfo *h)
Octstr *allow_report = mms_get_header_value(m, octstr_imm("X-Mms-Report-Allowed"));
Octstr *qf = mms_getqf_fromtransid(transid);
MmsEnvelope *e = mms_queue_readenvelope(octstr_get_cstr(qf),
MmsEnvelope *e = settings->qfs->mms_queue_readenvelope(octstr_get_cstr(qf),
octstr_get_cstr(settings->mm1_queuedir), 1);
Octstr *status;
@ -974,7 +974,7 @@ static void sendmms_proxy(MmsHTTPClientInfo *h)
mrpt = mms_deliveryreport(e->msgId, h->client_addr, time(NULL), status);
gwlist_append(l, octstr_duplicate(e->from));
x = mms_queue_add(from, l, NULL, NULL, NULL, 0,
x = settings->qfs->mms_queue_add(from, l, NULL, NULL, NULL, 0,
time(NULL) + settings->default_msgexpiry, mrpt, NULL,
NULL, NULL,
NULL, NULL,
@ -995,8 +995,8 @@ static void sendmms_proxy(MmsHTTPClientInfo *h)
h->ua, NULL);
if (e &&
mms_queue_update(e) != 1) /* Should be freed. */
mms_queue_free_env(e);
settings->qfs->mms_queue_update(e) != 1) /* Should be freed. */
settings->qfs->mms_queue_free_env(e);
octstr_destroy(qf);
octstr_destroy(transid);
@ -1024,7 +1024,7 @@ static void sendmms_proxy(MmsHTTPClientInfo *h)
mms_collect_envdata_from_msgheaders(mh, &to, NULL, NULL, NULL, NULL,
settings->default_msgexpiry);
x = mms_queue_add(from, to, NULL, NULL, NULL, time(NULL),
x = settings->qfs->mms_queue_add(from, to, NULL, NULL, NULL, time(NULL),
time(NULL) + settings->default_msgexpiry,
m, NULL,
NULL, NULL,
@ -1087,11 +1087,11 @@ static void sendmms_proxy(MmsHTTPClientInfo *h)
}
if (mloc == MMS_LOC_MQUEUE) { /* where is the message? */
e = mms_queue_readenvelope(octstr_get_cstr(qf),
e = settings->qfs->mms_queue_readenvelope(octstr_get_cstr(qf),
octstr_get_cstr(settings->mm1_queuedir), 1);
if (!e ||
(mstore = mms_queue_getdata(e)) == NULL) {
(mstore = settings->qfs->mms_queue_getdata(e)) == NULL) {
error(0,
"MMS Send interface: failed to find envelope/data %s for store url "
"(%s) from %s (e=%s)!",
@ -1150,8 +1150,8 @@ static void sendmms_proxy(MmsHTTPClientInfo *h)
if (e) { /* Update the message queue and go. */
e->lastaccess = time(NULL);
if (mms_queue_update(e) != 1) /* Should be freed. */
mms_queue_free_env(e);
if (settings->qfs->mms_queue_update(e) != 1) /* Should be freed. */
settings->qfs->mms_queue_free_env(e);
e = NULL;
}
@ -1277,7 +1277,7 @@ static void sendmms_proxy(MmsHTTPClientInfo *h)
rs = octstr_format("%dError-permanent-message-not-found", i);
} else if (mloc == MMS_LOC_MQUEUE) {
MmsEnvelope *e = mms_queue_readenvelope(octstr_get_cstr(qf),
MmsEnvelope *e = settings->qfs->mms_queue_readenvelope(octstr_get_cstr(qf),
octstr_get_cstr(settings->mm1_queuedir), 1);
if (!e)
rs = octstr_format("%dError-permanent-message-not-found", i);
@ -1286,8 +1286,8 @@ static void sendmms_proxy(MmsHTTPClientInfo *h)
MmsEnvelopeTo *x = gwlist_get(e->to,j);
if (x) x->process = 0;
}
if (mms_queue_update(e) != 1) /* Should be freed. */
mms_queue_free_env(e);
if (settings->qfs->mms_queue_update(e) != 1) /* Should be freed. */
settings->qfs->mms_queue_free_env(e);
rs = octstr_format("%dOk", i);
}
} else if (mloc == MMS_LOC_MMBOX) {
@ -1592,7 +1592,7 @@ static void mm7soap_dispatch(MmsHTTPClientInfo *h)
if (expiryt < 0)
expiryt = time(NULL) + settings->default_msgexpiry;
mms_remove_headers(m, "Message-ID"); /* cannot be found here. */
qf = mms_queue_add(from ? from : sender, to, subject,
qf = settings->qfs->mms_queue_add(from ? from : sender, to, subject,
NULL, NULL,
delivert, expiryt, m, NULL,
h->vasp->id, vasid,
@ -1616,7 +1616,7 @@ static void mm7soap_dispatch(MmsHTTPClientInfo *h)
case MM7_TAG_ReplaceReq:
msgid = mm7_soap_header_value(mreq, octstr_imm("MessageID"));
if (msgid && (qf = mms_getqf_fromtransid(msgid)) != NULL &&
(e = mms_queue_readenvelope(octstr_get_cstr(qf),
(e = settings->qfs->mms_queue_readenvelope(octstr_get_cstr(qf),
octstr_get_cstr(settings->global_queuedir),
1)) != NULL) {
if (!e->vaspid ||
@ -1626,14 +1626,14 @@ static void mm7soap_dispatch(MmsHTTPClientInfo *h)
" but vaspid id=%s does not match!",
octstr_get_cstr(msgid), octstr_get_cstr(h->vasp->id));
} else { /* get orig message, change headers of new, replace old. */
MmsMsg *old = mms_queue_getdata(e);
MmsMsg *old = settings->qfs->mms_queue_getdata(e);
MmsMsg *new = mm7_soap_to_mmsmsg(mreq, sender);
List *hh = mms_message_headers(old);
Octstr *s;
if (new) {
mms_add_missing_headers(new, hh);
if (mms_queue_replacedata(e, new) < 0) {
if (settings->qfs->mms_queue_replacedata(e, new) < 0) {
status = 3000;
error(0, "MMS Proxy(MM7): ReplaceReq: Failed to change data, "
"id=%s, vasp=%s!",
@ -1652,8 +1652,8 @@ static void mm7soap_dispatch(MmsHTTPClientInfo *h)
octstr_destroy(s);
}
if (mms_queue_update(e) != 1)
mms_queue_free_env(e);
if (settings->qfs->mms_queue_update(e) != 1)
settings->qfs->mms_queue_free_env(e);
e = NULL;
mms_log("Replace",
sender, NULL, -1, msgid, h->vasp->id, NULL, "MM7", h->ua, NULL);
@ -1675,7 +1675,7 @@ static void mm7soap_dispatch(MmsHTTPClientInfo *h)
case MM7_TAG_CancelReq:
msgid = mm7_soap_header_value(mreq, octstr_imm("MessageID"));
if (msgid && (qf = mms_getqf_fromtransid(msgid)) != NULL &&
(e = mms_queue_readenvelope(octstr_get_cstr(qf),
(e = settings->qfs->mms_queue_readenvelope(octstr_get_cstr(qf),
octstr_get_cstr(settings->global_queuedir),
1)) != NULL) {
if (!e->vaspid ||
@ -1690,7 +1690,7 @@ static void mm7soap_dispatch(MmsHTTPClientInfo *h)
MmsEnvelopeTo *xto = gwlist_get(e->to,i);
xto->process = 0;
}
mms_queue_update(e); /* Will clear it. */
settings->qfs->mms_queue_update(e); /* Will clear it. */
e = NULL;
mms_log("Cancel",
sender, NULL, -1, msgid, h->vasp->id, NULL, "MM7", h->ua, NULL);
@ -1721,7 +1721,7 @@ static void mm7soap_dispatch(MmsHTTPClientInfo *h)
mresp ? "ok" : "(null)",
reply_body ? "ok" : "(null)");
mms_queue_free_env(e);
settings->qfs->mms_queue_free_env(e);
octstr_destroy(sender);
octstr_destroy(from);
octstr_destroy(subject);
@ -1827,7 +1827,7 @@ static void mm7eaif_dispatch(MmsHTTPClientInfo *h)
mms_remove_headers(m, "X-Mms-Sender-Visibility");
/* Save it, make msgid, put message id in header, return. */
qf = mms_queue_add(hfrom, to, subject,
qf = settings->qfs->mms_queue_add(hfrom, to, subject,
NULL, NULL, deliveryt, expiryt, m, NULL,
NULL, NULL,
NULL, NULL,

View File

@ -150,7 +150,7 @@ int main(int argc, char *argv[])
http_header_add(h, "X-Mms-Tool", "mmssend");
http_header_add(h, "X-Mms-CalledFrom", "Terminal");
#endif
s = mms_queue_add(from, to, NULL, NULL, NULL, time(NULL),
s = settings->qfs->mms_queue_add(from, to, NULL, NULL, NULL, time(NULL),
time(NULL) + settings->default_msgexpiry, m,
NULL,
NULL, NULL,