From bc1a24200edf642c39138d836605032b7d18a898 Mon Sep 17 00:00:00 2001 From: bagyenda <> Date: Fri, 14 Nov 2008 08:52:15 +0000 Subject: [PATCH] PGSQL queue module changes: optionally move message storage to file --- mbuni/ChangeLog | 2 + mbuni/extras/pgsql-queue/mms_pgsql_queue.c | 347 ++++++++++++++++----- mbuni/mmlib/mms_queue.c | 17 +- mbuni/mmlib/mms_queue.h | 4 +- mbuni/mmsbox/mmsbox_cfg.c | 14 +- mbuni/mmsc/mmsc_cfg.c | 7 +- 6 files changed, 304 insertions(+), 87 deletions(-) diff --git a/mbuni/ChangeLog b/mbuni/ChangeLog index a435b83..25e1d52 100644 --- a/mbuni/ChangeLog +++ b/mbuni/ChangeLog @@ -1,3 +1,5 @@ +2008-11-14 P. A. Bagyenda + * PSQL Queue module changes: optionally move message storage to file to improve performance 2008-11-12 P. A. Bagyenda * Improved DLR delivery to external URL (retries) in mmsbox 2008-11-04 P. A. Bagyenda diff --git a/mbuni/extras/pgsql-queue/mms_pgsql_queue.c b/mbuni/extras/pgsql-queue/mms_pgsql_queue.c index 0bcd7d1..5508c13 100644 --- a/mbuni/extras/pgsql-queue/mms_pgsql_queue.c +++ b/mbuni/extras/pgsql-queue/mms_pgsql_queue.c @@ -1,6 +1,13 @@ -#include +#include +#include +#include #include +#include +#include +#include +#include #include + #include #include "mms_queue.h" @@ -22,15 +29,38 @@ #define DEFAULT_CONNECTIONS 5 #define MIN_QRUN_INTERVAL 2 /* we don't want to hurt DB. */ #define MQF 'q' +#define MIN_PG_VERSION 80200 /* v8.2 */ static List *free_conns; static int pool_size; -static int pgq_init_module(Octstr *conninfo, int max_threads) +static char topdir[512]; +/* Control flags: Read from conninfo */ +static int external_storage = 1; +static int archive_msgs = 1; +static const char *tdirs[] = {"active", "archive", NULL}; + +static int pgq_init_module(Octstr *conninfo, char *xtopdir, int max_threads) { long i, n; - + Octstr *xcinfo; gw_assert(conninfo); + int ver = -1; + xcinfo = octstr_duplicate(conninfo); + + /* Now look for flags */ + if ((i = octstr_search_char(xcinfo, ';', 0)) > 0) { + Octstr *x = octstr_copy(xcinfo, 0, i); + + octstr_delete(xcinfo, 0, i+1); + + if (octstr_case_search(x, octstr_imm("internal"), 0) >= 0) + external_storage = 0; + if (octstr_case_search(x, octstr_imm("no-archive"), 0) >= 0) + archive_msgs = 0; + + octstr_destroy(x); + } n = max_threads + 1; if (n <= 0) @@ -39,10 +69,18 @@ static int pgq_init_module(Octstr *conninfo, int max_threads) free_conns = gwlist_create(); gwlist_add_producer(free_conns); for (i = 0; i= v%d.%d", + MIN_PG_VERSION/10000, + (MIN_PG_VERSION/100) % 100); + } } else { mms_error(0, "pgsql_queue", NULL, "init: failed to connect to db: %s", PQerrorMessage(c)); @@ -51,6 +89,47 @@ static int pgq_init_module(Octstr *conninfo, int max_threads) } srand(time(NULL)); + + if (external_storage) { + char buf[512], fbuf[512]; + + int j; + sprintf(topdir, "%.192s/pgsql-queue-data", xtopdir); + + if (mkdir(topdir, + S_IRWXU|S_IRWXG) < 0 && + errno != EEXIST) + mms_error(0, "pgsql_queue", NULL, "init: Failed to create message storage directory [%s]: %s", + topdir, + strerror(errno)); + + + /* initialise a irectory structure for the messages */ + for (j = 0; tdirs[j]; j++) { + sprintf(buf, "%.256s/%s", topdir, tdirs[j]); + if (mkdir(buf, + S_IRWXU|S_IRWXG) < 0 && + errno != EEXIST) + mms_error(0, "pgsql_queue", NULL, + "init: Failed to create message storage directory [%s]: %s", + buf, + strerror(errno)); + else + for (i = 0; _TT[i]; i++) { /* initialise the top level only... */ + sprintf(fbuf, "%.270s/%c", buf, _TT[i]); + if (mkdir(fbuf, + S_IRWXU|S_IRWXG) < 0 && + errno != EEXIST) + mms_error(0, "pgsql_queue", NULL, + "init: Failed to create message storage directory [%s]: %s", + fbuf, + strerror(errno)); + + } + } + + } + octstr_destroy(xcinfo); return gwlist_len(free_conns) > 0 ? 0 : -1; } @@ -112,7 +191,8 @@ struct pgfile_t { PGconn *conn; char dir[256+1]; char _pad[4]; /* paranoia */ - int64_t qid; /* internal key into table (if any) */ + int64_t qid; /* internal key into table (if any) */ + char data_file[QFNAMEMAX*4+1]; /* location of the data file, if on file system */ }; @@ -120,10 +200,45 @@ static Octstr *pgq_init_queue_dir(char *qdir, int *error) { gw_assert(free_conns); if (error) *error = 0; + return octstr_create(qdir); /* just make a string out of it. nothing more to do. */ } +static int mk_data_file(int64_t qid, char *loc, char *dir, char out[]) +{ + char subdir[10]; + int i; + + if (topdir[0] == 0) + gw_panic(0, "Top dir not set! "); + + /* Copied largely from mms_queue.c */ + if ((i = random() % 3) == 0) /* toplevel. */ + subdir[0] = 0; + else if (i == 1) /* one in */ + sprintf(subdir, "%c/", _TT[random() % _TTSIZE]); + else { /* two in. */ + char csubdir[QFNAMEMAX*4+1]; + sprintf(subdir, "%c/%c%c/", + _TT[random() % _TTSIZE], + _TT[random() % _TTSIZE], + _TT[random() % _TTSIZE]); + + sprintf(csubdir, "%.128s/%.64s/%s", topdir, loc, subdir); + if (mkdir(csubdir, + S_IRWXU|S_IRWXG) < 0 && + errno != EEXIST) { + mms_error(0, "pgsql_queue", NULL, "Failed to create dir %s - %s!", + csubdir, strerror(errno)); + return -1; + } + } + + sprintf(out, "%.128s/%.64s/%s%s-%lld.mms", topdir, loc, subdir, dir, qid); + return 0; +} + static int pgq_free_envelope(MmsEnvelope *e, int removefromqueue) { int ret = 0; @@ -139,33 +254,64 @@ static int pgq_free_envelope(MmsEnvelope *e, int removefromqueue) char cmd[256]; /* copy to separate table. */ PGresult *res; - sprintf(cmd, "INSERT INTO archived_mms_messages SELECT " - " * from mms_messages WHERE id = %lld", qfs->qid); - res = PQexec(qfs->conn, cmd); - - if (PQresultStatus(res) != PGRES_COMMAND_OK) { - PQclear(res); - ret = -1; - goto done; - } else - PQclear(res); - - sprintf(cmd, "INSERT INTO archived_mms_message_headers SELECT " - " * from mms_message_headers WHERE qid = %lld", qfs->qid); - res = PQexec(qfs->conn, cmd); - if (PQresultStatus(res) != PGRES_COMMAND_OK) { - PQclear(res); - ret = -1; - goto done; - } else - PQclear(res); - + if (archive_msgs) { + if (qfs->data_file[0]) { /* we need to move the file to archives. */ + char afile[512]; + FILE *f; + + afile[0] = 0; + if (mk_data_file(qfs->qid, "archive", qfs->dir, afile) < 0) + goto done; + else if ((f = fopen(afile, "w")) != NULL) { + Octstr *x = octstr_read_file(qfs->data_file); + + if (x) + octstr_print(f, x); + octstr_destroy(x); + fclose(f); + } else + mms_warning(0, "pgsql_queue", NULL, "Failed to archive MMS data file [%s]: %s", + qfs->data_file, strerror(errno)); + + sprintf(cmd, "INSERT INTO archived_mms_messages SELECT " + " id,qdir,qfname,sender,created,last_try,send_time," + "expire_date,num_attempts,'@%.128s' as data from mms_messages WHERE id = %lld", + afile, + qfs->qid); /* put in new file name */ + } else + sprintf(cmd, "INSERT INTO archived_mms_messages SELECT " + " * from mms_messages WHERE id = %lld", qfs->qid); + + res = PQexec(qfs->conn, cmd); + + if (PQresultStatus(res) != PGRES_COMMAND_OK) { + PQclear(res); + ret = -1; + goto done; + } else + PQclear(res); + + sprintf(cmd, "INSERT INTO archived_mms_message_headers SELECT " + " * from mms_message_headers WHERE qid = %lld", qfs->qid); + res = PQexec(qfs->conn, cmd); + + if (PQresultStatus(res) != PGRES_COMMAND_OK) { + PQclear(res); + ret = -1; + goto done; + } else + PQclear(res); + } + sprintf(cmd, "DELETE from mms_messages WHERE id = %lld", qfs->qid); res = PQexec(qfs->conn, cmd); if (PQresultStatus(res) != PGRES_COMMAND_OK) ret = -1; PQclear(res); + + if (qfs->data_file[0]) /* If external storage is in use */ + unlink(qfs->data_file); } done: @@ -204,6 +350,7 @@ static MmsEnvelope *pgq_queue_readenvelope(char *qf, char *mms_queuedir, int sho long num_attempts, i, n; time_t sendt, created, lastt, edate; char cmd[4*QFNAMEMAX], _qf[QFNAMEMAX*2+1]; + char data_file[4*QFNAMEMAX+1]; Octstr *from = NULL; MmsEnvelope *e = NULL; @@ -219,7 +366,7 @@ static MmsEnvelope *pgq_queue_readenvelope(char *qf, char *mms_queuedir, int sho PQescapeStringConn(c, _qf, qf, n < QFNAMEMAX ? n : QFNAMEMAX, NULL); /* read and block, to ensure no one else touches it. */ - sprintf(cmd, "SELECT id,cdate,lastt,sendt,edate,num_attempts,sender FROM " + sprintf(cmd, "SELECT id,cdate,lastt,sendt,edate,num_attempts,sender,data FROM " " mms_messages_view WHERE qdir='%s' AND qfname = '%s' FOR UPDATE %s", mms_queuedir, _qf, shouldblock ? "" : "NOWAIT"); /* nice little PostgreSQL 8.x addition. */ @@ -265,6 +412,18 @@ static MmsEnvelope *pgq_queue_readenvelope(char *qf, char *mms_queuedir, int sho if ((s = PQgetvalue(r, 0, PQfnumber(r, "sender"))) != NULL) from = octstr_create(s); + /* If the data field starts with a '@' then it indicates a file on the hard disk: + * -- we therefore load the file location. If not, then it is a raw message, so we skip it. + * Note: '@' can't appear as first byte of a binary MMS so we're safe. + * If you use inline binary MMS storage, this will cause a bit of a slowdown. + * But you don't want to store binary MMS inline anyway because that is slow... + */ + if ((s = PQgetvalue(r, 0, PQfnumber(r, "data"))) != NULL && + s[0] == '@') + strncpy(data_file, s+1, sizeof data_file); + else + data_file[0] = 0; /* nada */ + PQclear(r); sprintf(cmd, "SELECT item,value,id FROM mms_message_headers WHERE qid=%lld FOR UPDATE", qid); @@ -296,6 +455,7 @@ static MmsEnvelope *pgq_queue_readenvelope(char *qf, char *mms_queuedir, int sho pgs->conn = c; pgs->qid = qid; strncpy(pgs->dir, mms_queuedir, sizeof pgs->dir); + strncpy(pgs->data_file, data_file, sizeof pgs->data_file); /* set some top-level stuff. */ e->created = created; @@ -610,10 +770,11 @@ static Octstr *pgq_queue_add(Octstr *from, List *to, struct pgfile_t *qfs = NULL; PGconn *conn = get_conn(); PGresult *r; - char *data, *xfrom, *s; + char *data, *xfrom, *s, pbuf[512]; size_t dlen; static int ect; - + FILE *f; + if (conn == NULL) return NULL; @@ -649,19 +810,35 @@ static Octstr *pgq_queue_add(Octstr *from, List *to, qfs = e->qfs_data; qfs->conn = conn; strncpy(qfs->dir, directory, sizeof qfs->dir); + + qfs->qid = qid; /* write the basic data: qid, qfname, qdir, sender, data, escape those that are not trusted. */ xfrom = gw_malloc(2*octstr_len(from)+1); PQescapeStringConn(qfs->conn, xfrom, octstr_get_cstr(from), octstr_len(from), NULL); - data = (void *)PQescapeByteaConn(qfs->conn, (void *)octstr_get_cstr(ms), octstr_len(ms), &dlen); - + + if (external_storage && /* make a file and use that instead. */ + mk_data_file(qid, "active", qfs->dir, qfs->data_file) == 0 && + (f = fopen(qfs->data_file, "w")) != NULL) { + + sprintf(pbuf, "@%.500s", qfs->data_file); /* indicate that this is a file. */ + data = pbuf; + + octstr_print(f, ms); + fclose(f); + } else { + data = (void *)PQescapeByteaConn(qfs->conn, (void *)octstr_get_cstr(ms), octstr_len(ms), &dlen); + qfs->data_file[0] = 0; + } + sprintf(xqid, "%lld", qid); xcmd = octstr_format("INSERT INTO mms_messages (id, qdir, qfname, sender, data,expire_date) VALUES " " (%s, '%s', '%s', '%s', E'%s'::bytea, current_timestamp)", xqid, directory, qf, xfrom, data); - PQfreemem(data); + if (data != pbuf) + PQfreemem(data); gw_free(xfrom); r = PQexec(qfs->conn, octstr_get_cstr(xcmd)); @@ -728,11 +905,11 @@ static int pgq_queue_update(MmsEnvelope *e) static int pgq_queue_replacedata(MmsEnvelope *e, MmsMsg *m) { - char *data, xqid[128]; - size_t dlen; + char xqid[128]; + struct pgfile_t *qfs; int ret = 0; - Octstr *ms, *xcmd; + Octstr *ms; PGresult *r; if (!e) return -1; @@ -740,16 +917,37 @@ static int pgq_queue_replacedata(MmsEnvelope *e, MmsMsg *m) qfs = e->qfs_data; ms = mms_tobinary(m); - data = (void *)PQescapeByteaConn(qfs->conn, (void *)octstr_get_cstr(ms), octstr_len(ms), &dlen); - sprintf(xqid, "%lld", qfs->qid); - xcmd = octstr_format("UPDATE mms_messages SET data=E'%s' WHERE id = %s", - data, xqid); - r = PQexec(qfs->conn, octstr_get_cstr(xcmd)); - ret = (PQresultStatus(r) != PGRES_COMMAND_OK) ? -1 : 0; /* do nothing about error. we are in a transaction.*/ - PQclear(r); + if (qfs->data_file[0]) { /* we're using file-based storage. Try a non-destructive replace */ + char pbuf[640]; + FILE *f; + + sprintf(pbuf, "%s.new", qfs->data_file); + + if ((f = fopen(pbuf, "w")) != NULL) { + octstr_print(f, ms); + + fclose(f); + + ret = rename(pbuf, qfs->data_file); + } else + ret = -1; + } else { + size_t dlen; + char *data = (void *)PQescapeByteaConn(qfs->conn, + (void *)octstr_get_cstr(ms), + octstr_len(ms), &dlen); + Octstr *xcmd; + sprintf(xqid, "%lld", qfs->qid); + xcmd = octstr_format("UPDATE mms_messages SET data=E'%s' WHERE id = %s", + data, xqid); + r = PQexec(qfs->conn, octstr_get_cstr(xcmd)); + ret = (PQresultStatus(r) != PGRES_COMMAND_OK) ? -1 : 0; /* do nothing about error. we are in a transaction.*/ + PQfreemem(data); + PQclear(r); + octstr_destroy(xcmd); + } + octstr_destroy(ms); - PQfreemem(data); - octstr_destroy(xcmd); return ret; } @@ -759,48 +957,54 @@ static MmsMsg *pgq_queue_getdata(MmsEnvelope *e) struct pgfile_t *qfs; MmsMsg *m; Octstr *ms; - size_t dlen, n; - char cmd[512], *data, *x; - PGresult *r; if (e == NULL) return NULL; qfs = e->qfs_data; - sprintf(cmd, "SELECT data from mms_messages WHERE id = %lld", qfs->qid); - r = PQexec(qfs->conn, cmd); - - if (PQresultStatus(r) != PGRES_TUPLES_OK || - (n = PQntuples(r)) < 1) { + if (qfs->data_file[0]) + ms = octstr_read_file(qfs->data_file); + else { + size_t dlen, n; + char cmd[512], *data, *x; + PGresult *r; + + sprintf(cmd, "SELECT data from mms_messages WHERE id = %lld", qfs->qid); + r = PQexec(qfs->conn, cmd); + + if (PQresultStatus(r) != PGRES_TUPLES_OK || + (n = PQntuples(r)) < 1) { + PQclear(r); + mms_error(0, "pgsql_queue", NULL, "mms_queue_getdata: Failed to load data for queue entry %s in %s", + e->xqfname, qfs->dir); + return NULL; + } + + x = PQgetvalue(r, 0, 0); + if (x && (isprint(x[0]) || x[0] == '\\')) {/* data was sent to us escapaed, so un-escape it. */ + data = (void *)PQunescapeBytea((void *)x, &dlen); + } else { + dlen = PQgetlength(r, 0, 0); /* get data length before you fetch it. */ + data = x; + } + ms = octstr_create_from_data(data, dlen); + if (x != data) PQfreemem(data); + PQclear(r); - mms_error(0, "pgsql_queue", NULL, "mms_queue_getdata: Failed to load data for queue entry %s in %s", - e->xqfname, qfs->dir); - return NULL; } - - x = PQgetvalue(r, 0, 0); - if (x && (isprint(x[0]) || x[0] == '\\')) {/* data was sent to us escapaed, so un-escape it. */ - data = (void *)PQunescapeBytea((void *)x, &dlen); - } else { - dlen = PQgetlength(r, 0, 0); /* get data length before you fetch it. */ - data = x; - } - ms = octstr_create_from_data(data, dlen); - if (x != data) PQfreemem(data); - - PQclear(r); + if (ms == NULL) { mms_error(0, "pgsql_queue", NULL, "mms_queue_getdata: Failed to read data for queue entry %s in %s", - e->xqfname, qfs->dir); + e->xqfname, qfs->dir); return NULL; } m = mms_frombinary(ms, octstr_imm("")); if (!m) mms_error(0, "pgsql_queue", NULL, "mms_queue_getdata: Failed to decode data for queue entry %s in %s", - e->xqfname, qfs->dir); - + e->xqfname, qfs->dir); + octstr_destroy(ms); return m; } @@ -849,7 +1053,8 @@ static void pgq_queue_run(char *dir, mms_info(0, "pgsql_queue", NULL, "Queue runner on [%s] startup...", dir); if (sleepsecs < MIN_QRUN_INTERVAL) { - mms_warning(0, "pgsql_queue", NULL, "minimum queue run interval for PG Queue module is %d secs.", MIN_QRUN_INTERVAL); + mms_warning(0, "pgsql_queue", NULL, + "minimum queue run interval for PG Queue module is %d secs.", MIN_QRUN_INTERVAL); sleepsecs = MIN_QRUN_INTERVAL; } diff --git a/mbuni/mmlib/mms_queue.c b/mbuni/mmlib/mms_queue.c index 55e7a88..1f7c4bb 100644 --- a/mbuni/mmlib/mms_queue.c +++ b/mbuni/mmlib/mms_queue.c @@ -48,7 +48,7 @@ struct qfile_t { /* Name of the queue file, pointer to it (locked). D static char sdir[QFNAMEMAX*2+1]; /* top-level storage directory. */ static int inited; -static int mms_init_queue_module(Octstr *storage_dir, int max_threads) +static int mms_init_queue_module(Octstr *storage_dir, char *unused, int max_threads) { gw_assert(inited==0); gw_assert(storage_dir); @@ -115,6 +115,7 @@ static int free_envelope(MmsEnvelope *e, int removefromqueue); * i - source interface (MM1, MM4, etc.) * F - From address * R - Recipient (the ones pending) for this message + * z - Recipient (those who already received) * C - Time queue entry was created * L - Time of last delivery attempt * D - Time of (next) delivery attempt @@ -273,7 +274,7 @@ static MmsEnvelope *mms_queue_readenvelope(char *qf, char *mms_queuedir, int sho } break; case 'R': - + case 'z': t = octstr_create(res); if (mms_validate_address(t) != 0) { mms_warning(0, "mms_queueread", NULL, "Mal-formed address [%s] in file %s! " @@ -282,9 +283,10 @@ static MmsEnvelope *mms_queue_readenvelope(char *qf, char *mms_queuedir, int sho } to = gw_malloc(sizeof *to); to->rcpt = t; - to->process = 1; + to->process = (ch == 'R') ? 1 : 0; gwlist_append(e->to, to); break; + case 'C': e->created = atol(res); break; @@ -445,10 +447,8 @@ static int writeenvelope(MmsEnvelope *e, int newenv) n = 0; for (i = 0; i < n; i++) { - MmsEnvelopeTo *to = gwlist_get(e->to, i); - - if (to->process) - _putline(fd, "R", octstr_get_cstr(to->rcpt)); + MmsEnvelopeTo *to = gwlist_get(e->to, i); + _putline(fd, (to->process) ? "R" : "z", octstr_get_cstr(to->rcpt)); } /* Output headers if any. */ @@ -887,7 +887,8 @@ MmsEnvelope *mms_queue_create_envelope(Octstr *from, List *to, e->url1 = url1 ? octstr_duplicate(url1) : NULL; e->url2 = url2 ? octstr_duplicate(url2) : NULL; e->hdrs = hdrs ? http_header_duplicate(hdrs) : NULL; - + + e->dlr = dlr; strncpy(e->src_interface, src_interface ? src_interface : "", sizeof e->src_interface); diff --git a/mbuni/mmlib/mms_queue.h b/mbuni/mmlib/mms_queue.h index 2e59700..ca0b21c 100644 --- a/mbuni/mmlib/mms_queue.h +++ b/mbuni/mmlib/mms_queue.h @@ -40,7 +40,7 @@ typedef struct MmsEnvelope { Octstr *url2; List *hdrs; /* Generic list of headers associated with message. */ - List *to; /* List of recipients: MmsEnvelopeTo */ + List *to; /* List of recipients: MmsEnvelopeTo (if process=0 then already sent to)*/ Octstr *subject; /* Message subject (if any). */ @@ -79,7 +79,7 @@ typedef struct MmsQueueHandlerFuncs { /* Initialise queue module. Must be called at least once on each queue dir. * max_concurrent is a best guess as to number of concurrent queue requests */ - int (*mms_init_queue_module)(Octstr *init_data, int max_concurrent); + int (*mms_init_queue_module)(Octstr *init_data, char *top_storage_dir, int max_concurrent); /* initialise a queue directory. There can be multiple directories, * upperlevel decides what a directory is. diff --git a/mbuni/mmsbox/mmsbox_cfg.c b/mbuni/mmsbox/mmsbox_cfg.c index 63eda21..7d1fa44 100644 --- a/mbuni/mmsbox/mmsbox_cfg.c +++ b/mbuni/mmsbox/mmsbox_cfg.c @@ -151,12 +151,16 @@ int mms_load_mmsbox_settings(Octstr *fname, gwthread_func_t *mmsc_handler_func) if ((qfs = _mms_load_module(cfg, grp, "queue-manager-module", "qfuncs", NULL)) == NULL) { qfs = &default_qfuncs; /* default queue handler. */ - qfs->mms_init_queue_module(gdir, (2 + 1)*maxthreads); /* We expect 2 each for each mmsbox thread, + qfs->mms_init_queue_module(gdir, + octstr_get_cstr(gdir), + (2 + 1)*maxthreads); /* We expect 2 each for each mmsbox thread, * one each for each bearerbox thread. */ } else { Octstr *s = _mms_cfg_getx(cfg, grp, octstr_imm("queue-module-init-data")); - if (qfs->mms_init_queue_module(s, (2+1)*maxthreads) != 0) + if (qfs->mms_init_queue_module(s, + octstr_get_cstr(gdir), + (2+1)*maxthreads) != 0) panic(0, "failed to initialise queue module, with data: %s", octstr_get_cstr(s)); octstr_destroy(s); @@ -186,12 +190,14 @@ int mms_load_mmsbox_settings(Octstr *fname, gwthread_func_t *mmsc_handler_func) /* XXX still uses old-style file storage. */ if (qfs != &default_qfuncs) - default_qfuncs.mms_init_queue_module(gdir, maxthreads); + default_qfuncs.mms_init_queue_module(gdir, + octstr_get_cstr(gdir), + maxthreads); if ((dlr_dir = default_qfuncs.mms_init_queue_dir("mmsbox_dlr", &xx)) == NULL || xx != 0) panic(0, "Failed to initialise dlr storage directory: %s - %s!", octstr_get_cstr(dlr_dir), strerror(errno)); - + unified_prefix = _mms_cfg_getx(cfg, grp, octstr_imm("unified-prefix")); diff --git a/mbuni/mmsc/mmsc_cfg.c b/mbuni/mmsc/mmsc_cfg.c index a0c1852..5e3fe65 100644 --- a/mbuni/mmsc/mmsc_cfg.c +++ b/mbuni/mmsc/mmsc_cfg.c @@ -141,12 +141,15 @@ MmscSettings *mms_load_mmsc_settings(Octstr *fname, List **proxyrelays) if ((m->qfs = _mms_load_module(cfg, grp, "queue-manager-module", "qfuncs", NULL)) == NULL) { m->qfs = &default_qfuncs; /* default queue handler. */ - m->qfs->mms_init_queue_module(qdir, (2 + 2 + 2)*m->maxthreads); /* We expect 2 max for each mmsrelay component (= 4) + m->qfs->mms_init_queue_module(qdir, + octstr_get_cstr(qdir), + (2 + 2 + 2)*m->maxthreads); /* We expect 2 max for each mmsrelay component (= 4) * + 2 for mmsproxy (on for mm1proxy, one for mm7proxy */ } else { Octstr *s = _mms_cfg_getx(cfg, grp, octstr_imm("queue-module-init-data")); - if (m->qfs->mms_init_queue_module(s, (2 + 2 + 2)*m->maxthreads) != 0) + if (m->qfs->mms_init_queue_module(s, octstr_get_cstr(qdir), + (2 + 2 + 2)*m->maxthreads) != 0) panic(0, "failed to initialise queue module, with data: %s", octstr_get_cstr(s)); octstr_destroy(s);