PGSQL queue module changes: optionally move message storage to file
This commit is contained in:
parent
6c5759f1e9
commit
bc1a24200e
|
@ -1,3 +1,5 @@
|
||||||
|
2008-11-14 P. A. Bagyenda <bagyenda@dsmagic.com>
|
||||||
|
* PSQL Queue module changes: optionally move message storage to file to improve performance
|
||||||
2008-11-12 P. A. Bagyenda <bagyenda@dsmagic.com>
|
2008-11-12 P. A. Bagyenda <bagyenda@dsmagic.com>
|
||||||
* Improved DLR delivery to external URL (retries) in mmsbox
|
* Improved DLR delivery to external URL (retries) in mmsbox
|
||||||
2008-11-04 P. A. Bagyenda <bagyenda@dsmagic.com>
|
2008-11-04 P. A. Bagyenda <bagyenda@dsmagic.com>
|
||||||
|
|
|
@ -1,6 +1,13 @@
|
||||||
#include <unistd.h>
|
#include <sys/file.h>
|
||||||
|
#include <stdio.h>
|
||||||
|
#include <string.h>
|
||||||
#include <stdlib.h>
|
#include <stdlib.h>
|
||||||
|
#include <errno.h>
|
||||||
|
#include <sys/types.h>
|
||||||
|
#include <sys/stat.h>
|
||||||
|
#include <unistd.h>
|
||||||
#include <ctype.h>
|
#include <ctype.h>
|
||||||
|
|
||||||
#include <libpq-fe.h>
|
#include <libpq-fe.h>
|
||||||
#include "mms_queue.h"
|
#include "mms_queue.h"
|
||||||
|
|
||||||
|
@ -22,15 +29,38 @@
|
||||||
#define DEFAULT_CONNECTIONS 5
|
#define DEFAULT_CONNECTIONS 5
|
||||||
#define MIN_QRUN_INTERVAL 2 /* we don't want to hurt DB. */
|
#define MIN_QRUN_INTERVAL 2 /* we don't want to hurt DB. */
|
||||||
#define MQF 'q'
|
#define MQF 'q'
|
||||||
|
#define MIN_PG_VERSION 80200 /* v8.2 */
|
||||||
|
|
||||||
static List *free_conns;
|
static List *free_conns;
|
||||||
static int pool_size;
|
static int pool_size;
|
||||||
static int pgq_init_module(Octstr *conninfo, int max_threads)
|
static char topdir[512];
|
||||||
|
/* Control flags: Read from conninfo */
|
||||||
|
static int external_storage = 1;
|
||||||
|
static int archive_msgs = 1;
|
||||||
|
static const char *tdirs[] = {"active", "archive", NULL};
|
||||||
|
|
||||||
|
static int pgq_init_module(Octstr *conninfo, char *xtopdir, int max_threads)
|
||||||
{
|
{
|
||||||
long i, n;
|
long i, n;
|
||||||
|
Octstr *xcinfo;
|
||||||
gw_assert(conninfo);
|
gw_assert(conninfo);
|
||||||
|
int ver = -1;
|
||||||
|
|
||||||
|
xcinfo = octstr_duplicate(conninfo);
|
||||||
|
|
||||||
|
/* Now look for flags */
|
||||||
|
if ((i = octstr_search_char(xcinfo, ';', 0)) > 0) {
|
||||||
|
Octstr *x = octstr_copy(xcinfo, 0, i);
|
||||||
|
|
||||||
|
octstr_delete(xcinfo, 0, i+1);
|
||||||
|
|
||||||
|
if (octstr_case_search(x, octstr_imm("internal"), 0) >= 0)
|
||||||
|
external_storage = 0;
|
||||||
|
if (octstr_case_search(x, octstr_imm("no-archive"), 0) >= 0)
|
||||||
|
archive_msgs = 0;
|
||||||
|
|
||||||
|
octstr_destroy(x);
|
||||||
|
}
|
||||||
n = max_threads + 1;
|
n = max_threads + 1;
|
||||||
|
|
||||||
if (n <= 0)
|
if (n <= 0)
|
||||||
|
@ -39,10 +69,18 @@ static int pgq_init_module(Octstr *conninfo, int max_threads)
|
||||||
free_conns = gwlist_create();
|
free_conns = gwlist_create();
|
||||||
gwlist_add_producer(free_conns);
|
gwlist_add_producer(free_conns);
|
||||||
for (i = 0; i<n;i++) {
|
for (i = 0; i<n;i++) {
|
||||||
PGconn *c = PQconnectdb(octstr_get_cstr(conninfo));
|
PGconn *c = PQconnectdb(octstr_get_cstr(xcinfo));
|
||||||
if (c && PQstatus(c) == CONNECTION_OK) {
|
if (c && PQstatus(c) == CONNECTION_OK) {
|
||||||
gwlist_produce(free_conns, c);
|
gwlist_produce(free_conns, c);
|
||||||
pool_size++;
|
pool_size++;
|
||||||
|
|
||||||
|
if (ver < 0) {
|
||||||
|
ver = PQserverVersion(c);
|
||||||
|
if (ver<MIN_PG_VERSION)
|
||||||
|
mms_error(0, "pgsql_queue", NULL, "PostgreSQL server must be version >= v%d.%d",
|
||||||
|
MIN_PG_VERSION/10000,
|
||||||
|
(MIN_PG_VERSION/100) % 100);
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
mms_error(0, "pgsql_queue", NULL, "init: failed to connect to db: %s",
|
mms_error(0, "pgsql_queue", NULL, "init: failed to connect to db: %s",
|
||||||
PQerrorMessage(c));
|
PQerrorMessage(c));
|
||||||
|
@ -51,6 +89,47 @@ static int pgq_init_module(Octstr *conninfo, int max_threads)
|
||||||
}
|
}
|
||||||
|
|
||||||
srand(time(NULL));
|
srand(time(NULL));
|
||||||
|
|
||||||
|
if (external_storage) {
|
||||||
|
char buf[512], fbuf[512];
|
||||||
|
|
||||||
|
int j;
|
||||||
|
sprintf(topdir, "%.192s/pgsql-queue-data", xtopdir);
|
||||||
|
|
||||||
|
if (mkdir(topdir,
|
||||||
|
S_IRWXU|S_IRWXG) < 0 &&
|
||||||
|
errno != EEXIST)
|
||||||
|
mms_error(0, "pgsql_queue", NULL, "init: Failed to create message storage directory [%s]: %s",
|
||||||
|
topdir,
|
||||||
|
strerror(errno));
|
||||||
|
|
||||||
|
|
||||||
|
/* initialise a irectory structure for the messages */
|
||||||
|
for (j = 0; tdirs[j]; j++) {
|
||||||
|
sprintf(buf, "%.256s/%s", topdir, tdirs[j]);
|
||||||
|
if (mkdir(buf,
|
||||||
|
S_IRWXU|S_IRWXG) < 0 &&
|
||||||
|
errno != EEXIST)
|
||||||
|
mms_error(0, "pgsql_queue", NULL,
|
||||||
|
"init: Failed to create message storage directory [%s]: %s",
|
||||||
|
buf,
|
||||||
|
strerror(errno));
|
||||||
|
else
|
||||||
|
for (i = 0; _TT[i]; i++) { /* initialise the top level only... */
|
||||||
|
sprintf(fbuf, "%.270s/%c", buf, _TT[i]);
|
||||||
|
if (mkdir(fbuf,
|
||||||
|
S_IRWXU|S_IRWXG) < 0 &&
|
||||||
|
errno != EEXIST)
|
||||||
|
mms_error(0, "pgsql_queue", NULL,
|
||||||
|
"init: Failed to create message storage directory [%s]: %s",
|
||||||
|
fbuf,
|
||||||
|
strerror(errno));
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
octstr_destroy(xcinfo);
|
||||||
return gwlist_len(free_conns) > 0 ? 0 : -1;
|
return gwlist_len(free_conns) > 0 ? 0 : -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -112,7 +191,8 @@ struct pgfile_t {
|
||||||
PGconn *conn;
|
PGconn *conn;
|
||||||
char dir[256+1];
|
char dir[256+1];
|
||||||
char _pad[4]; /* paranoia */
|
char _pad[4]; /* paranoia */
|
||||||
int64_t qid; /* internal key into table (if any) */
|
int64_t qid; /* internal key into table (if any) */
|
||||||
|
char data_file[QFNAMEMAX*4+1]; /* location of the data file, if on file system */
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
||||||
|
@ -120,10 +200,45 @@ static Octstr *pgq_init_queue_dir(char *qdir, int *error)
|
||||||
{
|
{
|
||||||
gw_assert(free_conns);
|
gw_assert(free_conns);
|
||||||
if (error) *error = 0;
|
if (error) *error = 0;
|
||||||
|
|
||||||
return octstr_create(qdir); /* just make a string out of it. nothing more to do. */
|
return octstr_create(qdir); /* just make a string out of it. nothing more to do. */
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
static int mk_data_file(int64_t qid, char *loc, char *dir, char out[])
|
||||||
|
{
|
||||||
|
char subdir[10];
|
||||||
|
int i;
|
||||||
|
|
||||||
|
if (topdir[0] == 0)
|
||||||
|
gw_panic(0, "Top dir not set! ");
|
||||||
|
|
||||||
|
/* Copied largely from mms_queue.c */
|
||||||
|
if ((i = random() % 3) == 0) /* toplevel. */
|
||||||
|
subdir[0] = 0;
|
||||||
|
else if (i == 1) /* one in */
|
||||||
|
sprintf(subdir, "%c/", _TT[random() % _TTSIZE]);
|
||||||
|
else { /* two in. */
|
||||||
|
char csubdir[QFNAMEMAX*4+1];
|
||||||
|
sprintf(subdir, "%c/%c%c/",
|
||||||
|
_TT[random() % _TTSIZE],
|
||||||
|
_TT[random() % _TTSIZE],
|
||||||
|
_TT[random() % _TTSIZE]);
|
||||||
|
|
||||||
|
sprintf(csubdir, "%.128s/%.64s/%s", topdir, loc, subdir);
|
||||||
|
if (mkdir(csubdir,
|
||||||
|
S_IRWXU|S_IRWXG) < 0 &&
|
||||||
|
errno != EEXIST) {
|
||||||
|
mms_error(0, "pgsql_queue", NULL, "Failed to create dir %s - %s!",
|
||||||
|
csubdir, strerror(errno));
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
sprintf(out, "%.128s/%.64s/%s%s-%lld.mms", topdir, loc, subdir, dir, qid);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
static int pgq_free_envelope(MmsEnvelope *e, int removefromqueue)
|
static int pgq_free_envelope(MmsEnvelope *e, int removefromqueue)
|
||||||
{
|
{
|
||||||
int ret = 0;
|
int ret = 0;
|
||||||
|
@ -139,33 +254,64 @@ static int pgq_free_envelope(MmsEnvelope *e, int removefromqueue)
|
||||||
char cmd[256];
|
char cmd[256];
|
||||||
/* copy to separate table. */
|
/* copy to separate table. */
|
||||||
PGresult *res;
|
PGresult *res;
|
||||||
sprintf(cmd, "INSERT INTO archived_mms_messages SELECT "
|
|
||||||
" * from mms_messages WHERE id = %lld", qfs->qid);
|
|
||||||
res = PQexec(qfs->conn, cmd);
|
|
||||||
|
|
||||||
if (PQresultStatus(res) != PGRES_COMMAND_OK) {
|
|
||||||
PQclear(res);
|
|
||||||
ret = -1;
|
|
||||||
goto done;
|
|
||||||
} else
|
|
||||||
PQclear(res);
|
|
||||||
|
|
||||||
sprintf(cmd, "INSERT INTO archived_mms_message_headers SELECT "
|
|
||||||
" * from mms_message_headers WHERE qid = %lld", qfs->qid);
|
|
||||||
res = PQexec(qfs->conn, cmd);
|
|
||||||
|
|
||||||
if (PQresultStatus(res) != PGRES_COMMAND_OK) {
|
if (archive_msgs) {
|
||||||
PQclear(res);
|
if (qfs->data_file[0]) { /* we need to move the file to archives. */
|
||||||
ret = -1;
|
char afile[512];
|
||||||
goto done;
|
FILE *f;
|
||||||
} else
|
|
||||||
PQclear(res);
|
afile[0] = 0;
|
||||||
|
if (mk_data_file(qfs->qid, "archive", qfs->dir, afile) < 0)
|
||||||
|
goto done;
|
||||||
|
else if ((f = fopen(afile, "w")) != NULL) {
|
||||||
|
Octstr *x = octstr_read_file(qfs->data_file);
|
||||||
|
|
||||||
|
if (x)
|
||||||
|
octstr_print(f, x);
|
||||||
|
octstr_destroy(x);
|
||||||
|
fclose(f);
|
||||||
|
} else
|
||||||
|
mms_warning(0, "pgsql_queue", NULL, "Failed to archive MMS data file [%s]: %s",
|
||||||
|
qfs->data_file, strerror(errno));
|
||||||
|
|
||||||
|
sprintf(cmd, "INSERT INTO archived_mms_messages SELECT "
|
||||||
|
" id,qdir,qfname,sender,created,last_try,send_time,"
|
||||||
|
"expire_date,num_attempts,'@%.128s' as data from mms_messages WHERE id = %lld",
|
||||||
|
afile,
|
||||||
|
qfs->qid); /* put in new file name */
|
||||||
|
} else
|
||||||
|
sprintf(cmd, "INSERT INTO archived_mms_messages SELECT "
|
||||||
|
" * from mms_messages WHERE id = %lld", qfs->qid);
|
||||||
|
|
||||||
|
res = PQexec(qfs->conn, cmd);
|
||||||
|
|
||||||
|
if (PQresultStatus(res) != PGRES_COMMAND_OK) {
|
||||||
|
PQclear(res);
|
||||||
|
ret = -1;
|
||||||
|
goto done;
|
||||||
|
} else
|
||||||
|
PQclear(res);
|
||||||
|
|
||||||
|
sprintf(cmd, "INSERT INTO archived_mms_message_headers SELECT "
|
||||||
|
" * from mms_message_headers WHERE qid = %lld", qfs->qid);
|
||||||
|
res = PQexec(qfs->conn, cmd);
|
||||||
|
|
||||||
|
if (PQresultStatus(res) != PGRES_COMMAND_OK) {
|
||||||
|
PQclear(res);
|
||||||
|
ret = -1;
|
||||||
|
goto done;
|
||||||
|
} else
|
||||||
|
PQclear(res);
|
||||||
|
}
|
||||||
|
|
||||||
sprintf(cmd, "DELETE from mms_messages WHERE id = %lld", qfs->qid);
|
sprintf(cmd, "DELETE from mms_messages WHERE id = %lld", qfs->qid);
|
||||||
res = PQexec(qfs->conn, cmd);
|
res = PQexec(qfs->conn, cmd);
|
||||||
if (PQresultStatus(res) != PGRES_COMMAND_OK)
|
if (PQresultStatus(res) != PGRES_COMMAND_OK)
|
||||||
ret = -1;
|
ret = -1;
|
||||||
PQclear(res);
|
PQclear(res);
|
||||||
|
|
||||||
|
if (qfs->data_file[0]) /* If external storage is in use */
|
||||||
|
unlink(qfs->data_file);
|
||||||
}
|
}
|
||||||
|
|
||||||
done:
|
done:
|
||||||
|
@ -204,6 +350,7 @@ static MmsEnvelope *pgq_queue_readenvelope(char *qf, char *mms_queuedir, int sho
|
||||||
long num_attempts, i, n;
|
long num_attempts, i, n;
|
||||||
time_t sendt, created, lastt, edate;
|
time_t sendt, created, lastt, edate;
|
||||||
char cmd[4*QFNAMEMAX], _qf[QFNAMEMAX*2+1];
|
char cmd[4*QFNAMEMAX], _qf[QFNAMEMAX*2+1];
|
||||||
|
char data_file[4*QFNAMEMAX+1];
|
||||||
Octstr *from = NULL;
|
Octstr *from = NULL;
|
||||||
|
|
||||||
MmsEnvelope *e = NULL;
|
MmsEnvelope *e = NULL;
|
||||||
|
@ -219,7 +366,7 @@ static MmsEnvelope *pgq_queue_readenvelope(char *qf, char *mms_queuedir, int sho
|
||||||
PQescapeStringConn(c, _qf, qf, n < QFNAMEMAX ? n : QFNAMEMAX, NULL);
|
PQescapeStringConn(c, _qf, qf, n < QFNAMEMAX ? n : QFNAMEMAX, NULL);
|
||||||
|
|
||||||
/* read and block, to ensure no one else touches it. */
|
/* read and block, to ensure no one else touches it. */
|
||||||
sprintf(cmd, "SELECT id,cdate,lastt,sendt,edate,num_attempts,sender FROM "
|
sprintf(cmd, "SELECT id,cdate,lastt,sendt,edate,num_attempts,sender,data FROM "
|
||||||
" mms_messages_view WHERE qdir='%s' AND qfname = '%s' FOR UPDATE %s",
|
" mms_messages_view WHERE qdir='%s' AND qfname = '%s' FOR UPDATE %s",
|
||||||
mms_queuedir, _qf,
|
mms_queuedir, _qf,
|
||||||
shouldblock ? "" : "NOWAIT"); /* nice little PostgreSQL 8.x addition. */
|
shouldblock ? "" : "NOWAIT"); /* nice little PostgreSQL 8.x addition. */
|
||||||
|
@ -265,6 +412,18 @@ static MmsEnvelope *pgq_queue_readenvelope(char *qf, char *mms_queuedir, int sho
|
||||||
if ((s = PQgetvalue(r, 0, PQfnumber(r, "sender"))) != NULL)
|
if ((s = PQgetvalue(r, 0, PQfnumber(r, "sender"))) != NULL)
|
||||||
from = octstr_create(s);
|
from = octstr_create(s);
|
||||||
|
|
||||||
|
/* If the data field starts with a '@' then it indicates a file on the hard disk:
|
||||||
|
* -- we therefore load the file location. If not, then it is a raw message, so we skip it.
|
||||||
|
* Note: '@' can't appear as first byte of a binary MMS so we're safe.
|
||||||
|
* If you use inline binary MMS storage, this will cause a bit of a slowdown.
|
||||||
|
* But you don't want to store binary MMS inline anyway because that is slow...
|
||||||
|
*/
|
||||||
|
if ((s = PQgetvalue(r, 0, PQfnumber(r, "data"))) != NULL &&
|
||||||
|
s[0] == '@')
|
||||||
|
strncpy(data_file, s+1, sizeof data_file);
|
||||||
|
else
|
||||||
|
data_file[0] = 0; /* nada */
|
||||||
|
|
||||||
PQclear(r);
|
PQclear(r);
|
||||||
|
|
||||||
sprintf(cmd, "SELECT item,value,id FROM mms_message_headers WHERE qid=%lld FOR UPDATE", qid);
|
sprintf(cmd, "SELECT item,value,id FROM mms_message_headers WHERE qid=%lld FOR UPDATE", qid);
|
||||||
|
@ -296,6 +455,7 @@ static MmsEnvelope *pgq_queue_readenvelope(char *qf, char *mms_queuedir, int sho
|
||||||
pgs->conn = c;
|
pgs->conn = c;
|
||||||
pgs->qid = qid;
|
pgs->qid = qid;
|
||||||
strncpy(pgs->dir, mms_queuedir, sizeof pgs->dir);
|
strncpy(pgs->dir, mms_queuedir, sizeof pgs->dir);
|
||||||
|
strncpy(pgs->data_file, data_file, sizeof pgs->data_file);
|
||||||
|
|
||||||
/* set some top-level stuff. */
|
/* set some top-level stuff. */
|
||||||
e->created = created;
|
e->created = created;
|
||||||
|
@ -610,10 +770,11 @@ static Octstr *pgq_queue_add(Octstr *from, List *to,
|
||||||
struct pgfile_t *qfs = NULL;
|
struct pgfile_t *qfs = NULL;
|
||||||
PGconn *conn = get_conn();
|
PGconn *conn = get_conn();
|
||||||
PGresult *r;
|
PGresult *r;
|
||||||
char *data, *xfrom, *s;
|
char *data, *xfrom, *s, pbuf[512];
|
||||||
size_t dlen;
|
size_t dlen;
|
||||||
static int ect;
|
static int ect;
|
||||||
|
FILE *f;
|
||||||
|
|
||||||
if (conn == NULL)
|
if (conn == NULL)
|
||||||
return NULL;
|
return NULL;
|
||||||
|
|
||||||
|
@ -649,19 +810,35 @@ static Octstr *pgq_queue_add(Octstr *from, List *to,
|
||||||
qfs = e->qfs_data;
|
qfs = e->qfs_data;
|
||||||
qfs->conn = conn;
|
qfs->conn = conn;
|
||||||
strncpy(qfs->dir, directory, sizeof qfs->dir);
|
strncpy(qfs->dir, directory, sizeof qfs->dir);
|
||||||
|
|
||||||
|
|
||||||
qfs->qid = qid;
|
qfs->qid = qid;
|
||||||
|
|
||||||
/* write the basic data: qid, qfname, qdir, sender, data, escape those that are not trusted. */
|
/* write the basic data: qid, qfname, qdir, sender, data, escape those that are not trusted. */
|
||||||
|
|
||||||
xfrom = gw_malloc(2*octstr_len(from)+1);
|
xfrom = gw_malloc(2*octstr_len(from)+1);
|
||||||
PQescapeStringConn(qfs->conn, xfrom, octstr_get_cstr(from), octstr_len(from), NULL);
|
PQescapeStringConn(qfs->conn, xfrom, octstr_get_cstr(from), octstr_len(from), NULL);
|
||||||
data = (void *)PQescapeByteaConn(qfs->conn, (void *)octstr_get_cstr(ms), octstr_len(ms), &dlen);
|
|
||||||
|
if (external_storage && /* make a file and use that instead. */
|
||||||
|
mk_data_file(qid, "active", qfs->dir, qfs->data_file) == 0 &&
|
||||||
|
(f = fopen(qfs->data_file, "w")) != NULL) {
|
||||||
|
|
||||||
|
sprintf(pbuf, "@%.500s", qfs->data_file); /* indicate that this is a file. */
|
||||||
|
data = pbuf;
|
||||||
|
|
||||||
|
octstr_print(f, ms);
|
||||||
|
fclose(f);
|
||||||
|
} else {
|
||||||
|
data = (void *)PQescapeByteaConn(qfs->conn, (void *)octstr_get_cstr(ms), octstr_len(ms), &dlen);
|
||||||
|
qfs->data_file[0] = 0;
|
||||||
|
}
|
||||||
|
|
||||||
sprintf(xqid, "%lld", qid);
|
sprintf(xqid, "%lld", qid);
|
||||||
xcmd = octstr_format("INSERT INTO mms_messages (id, qdir, qfname, sender, data,expire_date) VALUES "
|
xcmd = octstr_format("INSERT INTO mms_messages (id, qdir, qfname, sender, data,expire_date) VALUES "
|
||||||
" (%s, '%s', '%s', '%s', E'%s'::bytea, current_timestamp)",
|
" (%s, '%s', '%s', '%s', E'%s'::bytea, current_timestamp)",
|
||||||
xqid, directory, qf, xfrom, data);
|
xqid, directory, qf, xfrom, data);
|
||||||
PQfreemem(data);
|
if (data != pbuf)
|
||||||
|
PQfreemem(data);
|
||||||
gw_free(xfrom);
|
gw_free(xfrom);
|
||||||
|
|
||||||
r = PQexec(qfs->conn, octstr_get_cstr(xcmd));
|
r = PQexec(qfs->conn, octstr_get_cstr(xcmd));
|
||||||
|
@ -728,11 +905,11 @@ static int pgq_queue_update(MmsEnvelope *e)
|
||||||
|
|
||||||
static int pgq_queue_replacedata(MmsEnvelope *e, MmsMsg *m)
|
static int pgq_queue_replacedata(MmsEnvelope *e, MmsMsg *m)
|
||||||
{
|
{
|
||||||
char *data, xqid[128];
|
char xqid[128];
|
||||||
size_t dlen;
|
|
||||||
struct pgfile_t *qfs;
|
struct pgfile_t *qfs;
|
||||||
int ret = 0;
|
int ret = 0;
|
||||||
Octstr *ms, *xcmd;
|
Octstr *ms;
|
||||||
PGresult *r;
|
PGresult *r;
|
||||||
|
|
||||||
if (!e) return -1;
|
if (!e) return -1;
|
||||||
|
@ -740,16 +917,37 @@ static int pgq_queue_replacedata(MmsEnvelope *e, MmsMsg *m)
|
||||||
qfs = e->qfs_data;
|
qfs = e->qfs_data;
|
||||||
ms = mms_tobinary(m);
|
ms = mms_tobinary(m);
|
||||||
|
|
||||||
data = (void *)PQescapeByteaConn(qfs->conn, (void *)octstr_get_cstr(ms), octstr_len(ms), &dlen);
|
if (qfs->data_file[0]) { /* we're using file-based storage. Try a non-destructive replace */
|
||||||
sprintf(xqid, "%lld", qfs->qid);
|
char pbuf[640];
|
||||||
xcmd = octstr_format("UPDATE mms_messages SET data=E'%s' WHERE id = %s",
|
FILE *f;
|
||||||
data, xqid);
|
|
||||||
r = PQexec(qfs->conn, octstr_get_cstr(xcmd));
|
sprintf(pbuf, "%s.new", qfs->data_file);
|
||||||
ret = (PQresultStatus(r) != PGRES_COMMAND_OK) ? -1 : 0; /* do nothing about error. we are in a transaction.*/
|
|
||||||
PQclear(r);
|
if ((f = fopen(pbuf, "w")) != NULL) {
|
||||||
|
octstr_print(f, ms);
|
||||||
|
|
||||||
|
fclose(f);
|
||||||
|
|
||||||
|
ret = rename(pbuf, qfs->data_file);
|
||||||
|
} else
|
||||||
|
ret = -1;
|
||||||
|
} else {
|
||||||
|
size_t dlen;
|
||||||
|
char *data = (void *)PQescapeByteaConn(qfs->conn,
|
||||||
|
(void *)octstr_get_cstr(ms),
|
||||||
|
octstr_len(ms), &dlen);
|
||||||
|
Octstr *xcmd;
|
||||||
|
sprintf(xqid, "%lld", qfs->qid);
|
||||||
|
xcmd = octstr_format("UPDATE mms_messages SET data=E'%s' WHERE id = %s",
|
||||||
|
data, xqid);
|
||||||
|
r = PQexec(qfs->conn, octstr_get_cstr(xcmd));
|
||||||
|
ret = (PQresultStatus(r) != PGRES_COMMAND_OK) ? -1 : 0; /* do nothing about error. we are in a transaction.*/
|
||||||
|
PQfreemem(data);
|
||||||
|
PQclear(r);
|
||||||
|
octstr_destroy(xcmd);
|
||||||
|
}
|
||||||
|
|
||||||
octstr_destroy(ms);
|
octstr_destroy(ms);
|
||||||
PQfreemem(data);
|
|
||||||
octstr_destroy(xcmd);
|
|
||||||
|
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
@ -759,48 +957,54 @@ static MmsMsg *pgq_queue_getdata(MmsEnvelope *e)
|
||||||
struct pgfile_t *qfs;
|
struct pgfile_t *qfs;
|
||||||
MmsMsg *m;
|
MmsMsg *m;
|
||||||
Octstr *ms;
|
Octstr *ms;
|
||||||
size_t dlen, n;
|
|
||||||
char cmd[512], *data, *x;
|
|
||||||
PGresult *r;
|
|
||||||
|
|
||||||
if (e == NULL)
|
if (e == NULL)
|
||||||
return NULL;
|
return NULL;
|
||||||
|
|
||||||
qfs = e->qfs_data;
|
qfs = e->qfs_data;
|
||||||
|
|
||||||
sprintf(cmd, "SELECT data from mms_messages WHERE id = %lld", qfs->qid);
|
if (qfs->data_file[0])
|
||||||
r = PQexec(qfs->conn, cmd);
|
ms = octstr_read_file(qfs->data_file);
|
||||||
|
else {
|
||||||
if (PQresultStatus(r) != PGRES_TUPLES_OK ||
|
size_t dlen, n;
|
||||||
(n = PQntuples(r)) < 1) {
|
char cmd[512], *data, *x;
|
||||||
|
PGresult *r;
|
||||||
|
|
||||||
|
sprintf(cmd, "SELECT data from mms_messages WHERE id = %lld", qfs->qid);
|
||||||
|
r = PQexec(qfs->conn, cmd);
|
||||||
|
|
||||||
|
if (PQresultStatus(r) != PGRES_TUPLES_OK ||
|
||||||
|
(n = PQntuples(r)) < 1) {
|
||||||
|
PQclear(r);
|
||||||
|
mms_error(0, "pgsql_queue", NULL, "mms_queue_getdata: Failed to load data for queue entry %s in %s",
|
||||||
|
e->xqfname, qfs->dir);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
x = PQgetvalue(r, 0, 0);
|
||||||
|
if (x && (isprint(x[0]) || x[0] == '\\')) {/* data was sent to us escapaed, so un-escape it. */
|
||||||
|
data = (void *)PQunescapeBytea((void *)x, &dlen);
|
||||||
|
} else {
|
||||||
|
dlen = PQgetlength(r, 0, 0); /* get data length before you fetch it. */
|
||||||
|
data = x;
|
||||||
|
}
|
||||||
|
ms = octstr_create_from_data(data, dlen);
|
||||||
|
if (x != data) PQfreemem(data);
|
||||||
|
|
||||||
PQclear(r);
|
PQclear(r);
|
||||||
mms_error(0, "pgsql_queue", NULL, "mms_queue_getdata: Failed to load data for queue entry %s in %s",
|
|
||||||
e->xqfname, qfs->dir);
|
|
||||||
return NULL;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
x = PQgetvalue(r, 0, 0);
|
|
||||||
if (x && (isprint(x[0]) || x[0] == '\\')) {/* data was sent to us escapaed, so un-escape it. */
|
|
||||||
data = (void *)PQunescapeBytea((void *)x, &dlen);
|
|
||||||
} else {
|
|
||||||
dlen = PQgetlength(r, 0, 0); /* get data length before you fetch it. */
|
|
||||||
data = x;
|
|
||||||
}
|
|
||||||
ms = octstr_create_from_data(data, dlen);
|
|
||||||
if (x != data) PQfreemem(data);
|
|
||||||
|
|
||||||
PQclear(r);
|
|
||||||
if (ms == NULL) {
|
if (ms == NULL) {
|
||||||
mms_error(0, "pgsql_queue", NULL, "mms_queue_getdata: Failed to read data for queue entry %s in %s",
|
mms_error(0, "pgsql_queue", NULL, "mms_queue_getdata: Failed to read data for queue entry %s in %s",
|
||||||
e->xqfname, qfs->dir);
|
e->xqfname, qfs->dir);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
m = mms_frombinary(ms, octstr_imm(""));
|
m = mms_frombinary(ms, octstr_imm(""));
|
||||||
if (!m)
|
if (!m)
|
||||||
mms_error(0, "pgsql_queue", NULL, "mms_queue_getdata: Failed to decode data for queue entry %s in %s",
|
mms_error(0, "pgsql_queue", NULL, "mms_queue_getdata: Failed to decode data for queue entry %s in %s",
|
||||||
e->xqfname, qfs->dir);
|
e->xqfname, qfs->dir);
|
||||||
|
|
||||||
octstr_destroy(ms);
|
octstr_destroy(ms);
|
||||||
return m;
|
return m;
|
||||||
}
|
}
|
||||||
|
@ -849,7 +1053,8 @@ static void pgq_queue_run(char *dir,
|
||||||
|
|
||||||
mms_info(0, "pgsql_queue", NULL, "Queue runner on [%s] startup...", dir);
|
mms_info(0, "pgsql_queue", NULL, "Queue runner on [%s] startup...", dir);
|
||||||
if (sleepsecs < MIN_QRUN_INTERVAL) {
|
if (sleepsecs < MIN_QRUN_INTERVAL) {
|
||||||
mms_warning(0, "pgsql_queue", NULL, "minimum queue run interval for PG Queue module is %d secs.", MIN_QRUN_INTERVAL);
|
mms_warning(0, "pgsql_queue", NULL,
|
||||||
|
"minimum queue run interval for PG Queue module is %d secs.", MIN_QRUN_INTERVAL);
|
||||||
sleepsecs = MIN_QRUN_INTERVAL;
|
sleepsecs = MIN_QRUN_INTERVAL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -48,7 +48,7 @@ struct qfile_t { /* Name of the queue file, pointer to it (locked). D
|
||||||
|
|
||||||
static char sdir[QFNAMEMAX*2+1]; /* top-level storage directory. */
|
static char sdir[QFNAMEMAX*2+1]; /* top-level storage directory. */
|
||||||
static int inited;
|
static int inited;
|
||||||
static int mms_init_queue_module(Octstr *storage_dir, int max_threads)
|
static int mms_init_queue_module(Octstr *storage_dir, char *unused, int max_threads)
|
||||||
{
|
{
|
||||||
gw_assert(inited==0);
|
gw_assert(inited==0);
|
||||||
gw_assert(storage_dir);
|
gw_assert(storage_dir);
|
||||||
|
@ -115,6 +115,7 @@ static int free_envelope(MmsEnvelope *e, int removefromqueue);
|
||||||
* i - source interface (MM1, MM4, etc.)
|
* i - source interface (MM1, MM4, etc.)
|
||||||
* F - From address
|
* F - From address
|
||||||
* R - Recipient (the ones pending) for this message
|
* R - Recipient (the ones pending) for this message
|
||||||
|
* z - Recipient (those who already received)
|
||||||
* C - Time queue entry was created
|
* C - Time queue entry was created
|
||||||
* L - Time of last delivery attempt
|
* L - Time of last delivery attempt
|
||||||
* D - Time of (next) delivery attempt
|
* D - Time of (next) delivery attempt
|
||||||
|
@ -273,7 +274,7 @@ static MmsEnvelope *mms_queue_readenvelope(char *qf, char *mms_queuedir, int sho
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
case 'R':
|
case 'R':
|
||||||
|
case 'z':
|
||||||
t = octstr_create(res);
|
t = octstr_create(res);
|
||||||
if (mms_validate_address(t) != 0) {
|
if (mms_validate_address(t) != 0) {
|
||||||
mms_warning(0, "mms_queueread", NULL, "Mal-formed address [%s] in file %s! "
|
mms_warning(0, "mms_queueread", NULL, "Mal-formed address [%s] in file %s! "
|
||||||
|
@ -282,9 +283,10 @@ static MmsEnvelope *mms_queue_readenvelope(char *qf, char *mms_queuedir, int sho
|
||||||
}
|
}
|
||||||
to = gw_malloc(sizeof *to);
|
to = gw_malloc(sizeof *to);
|
||||||
to->rcpt = t;
|
to->rcpt = t;
|
||||||
to->process = 1;
|
to->process = (ch == 'R') ? 1 : 0;
|
||||||
gwlist_append(e->to, to);
|
gwlist_append(e->to, to);
|
||||||
break;
|
break;
|
||||||
|
|
||||||
case 'C':
|
case 'C':
|
||||||
e->created = atol(res);
|
e->created = atol(res);
|
||||||
break;
|
break;
|
||||||
|
@ -445,10 +447,8 @@ static int writeenvelope(MmsEnvelope *e, int newenv)
|
||||||
n = 0;
|
n = 0;
|
||||||
|
|
||||||
for (i = 0; i < n; i++) {
|
for (i = 0; i < n; i++) {
|
||||||
MmsEnvelopeTo *to = gwlist_get(e->to, i);
|
MmsEnvelopeTo *to = gwlist_get(e->to, i);
|
||||||
|
_putline(fd, (to->process) ? "R" : "z", octstr_get_cstr(to->rcpt));
|
||||||
if (to->process)
|
|
||||||
_putline(fd, "R", octstr_get_cstr(to->rcpt));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Output headers if any. */
|
/* Output headers if any. */
|
||||||
|
@ -887,7 +887,8 @@ MmsEnvelope *mms_queue_create_envelope(Octstr *from, List *to,
|
||||||
e->url1 = url1 ? octstr_duplicate(url1) : NULL;
|
e->url1 = url1 ? octstr_duplicate(url1) : NULL;
|
||||||
e->url2 = url2 ? octstr_duplicate(url2) : NULL;
|
e->url2 = url2 ? octstr_duplicate(url2) : NULL;
|
||||||
e->hdrs = hdrs ? http_header_duplicate(hdrs) : NULL;
|
e->hdrs = hdrs ? http_header_duplicate(hdrs) : NULL;
|
||||||
|
|
||||||
|
|
||||||
e->dlr = dlr;
|
e->dlr = dlr;
|
||||||
|
|
||||||
strncpy(e->src_interface, src_interface ? src_interface : "", sizeof e->src_interface);
|
strncpy(e->src_interface, src_interface ? src_interface : "", sizeof e->src_interface);
|
||||||
|
|
|
@ -40,7 +40,7 @@ typedef struct MmsEnvelope {
|
||||||
Octstr *url2;
|
Octstr *url2;
|
||||||
List *hdrs; /* Generic list of headers associated with message. */
|
List *hdrs; /* Generic list of headers associated with message. */
|
||||||
|
|
||||||
List *to; /* List of recipients: MmsEnvelopeTo */
|
List *to; /* List of recipients: MmsEnvelopeTo (if process=0 then already sent to)*/
|
||||||
|
|
||||||
Octstr *subject; /* Message subject (if any). */
|
Octstr *subject; /* Message subject (if any). */
|
||||||
|
|
||||||
|
@ -79,7 +79,7 @@ typedef struct MmsQueueHandlerFuncs {
|
||||||
/* Initialise queue module. Must be called at least once on each queue dir.
|
/* Initialise queue module. Must be called at least once on each queue dir.
|
||||||
* max_concurrent is a best guess as to number of concurrent queue requests
|
* max_concurrent is a best guess as to number of concurrent queue requests
|
||||||
*/
|
*/
|
||||||
int (*mms_init_queue_module)(Octstr *init_data, int max_concurrent);
|
int (*mms_init_queue_module)(Octstr *init_data, char *top_storage_dir, int max_concurrent);
|
||||||
|
|
||||||
/* initialise a queue directory. There can be multiple directories,
|
/* initialise a queue directory. There can be multiple directories,
|
||||||
* upperlevel decides what a directory is.
|
* upperlevel decides what a directory is.
|
||||||
|
|
|
@ -151,12 +151,16 @@ int mms_load_mmsbox_settings(Octstr *fname, gwthread_func_t *mmsc_handler_func)
|
||||||
|
|
||||||
if ((qfs = _mms_load_module(cfg, grp, "queue-manager-module", "qfuncs", NULL)) == NULL) {
|
if ((qfs = _mms_load_module(cfg, grp, "queue-manager-module", "qfuncs", NULL)) == NULL) {
|
||||||
qfs = &default_qfuncs; /* default queue handler. */
|
qfs = &default_qfuncs; /* default queue handler. */
|
||||||
qfs->mms_init_queue_module(gdir, (2 + 1)*maxthreads); /* We expect 2 each for each mmsbox thread,
|
qfs->mms_init_queue_module(gdir,
|
||||||
|
octstr_get_cstr(gdir),
|
||||||
|
(2 + 1)*maxthreads); /* We expect 2 each for each mmsbox thread,
|
||||||
* one each for each bearerbox thread.
|
* one each for each bearerbox thread.
|
||||||
*/
|
*/
|
||||||
} else {
|
} else {
|
||||||
Octstr *s = _mms_cfg_getx(cfg, grp, octstr_imm("queue-module-init-data"));
|
Octstr *s = _mms_cfg_getx(cfg, grp, octstr_imm("queue-module-init-data"));
|
||||||
if (qfs->mms_init_queue_module(s, (2+1)*maxthreads) != 0)
|
if (qfs->mms_init_queue_module(s,
|
||||||
|
octstr_get_cstr(gdir),
|
||||||
|
(2+1)*maxthreads) != 0)
|
||||||
panic(0, "failed to initialise queue module, with data: %s",
|
panic(0, "failed to initialise queue module, with data: %s",
|
||||||
octstr_get_cstr(s));
|
octstr_get_cstr(s));
|
||||||
octstr_destroy(s);
|
octstr_destroy(s);
|
||||||
|
@ -186,12 +190,14 @@ int mms_load_mmsbox_settings(Octstr *fname, gwthread_func_t *mmsc_handler_func)
|
||||||
|
|
||||||
/* XXX still uses old-style file storage. */
|
/* XXX still uses old-style file storage. */
|
||||||
if (qfs != &default_qfuncs)
|
if (qfs != &default_qfuncs)
|
||||||
default_qfuncs.mms_init_queue_module(gdir, maxthreads);
|
default_qfuncs.mms_init_queue_module(gdir,
|
||||||
|
octstr_get_cstr(gdir),
|
||||||
|
maxthreads);
|
||||||
if ((dlr_dir = default_qfuncs.mms_init_queue_dir("mmsbox_dlr", &xx)) == NULL ||
|
if ((dlr_dir = default_qfuncs.mms_init_queue_dir("mmsbox_dlr", &xx)) == NULL ||
|
||||||
xx != 0)
|
xx != 0)
|
||||||
panic(0, "Failed to initialise dlr storage directory: %s - %s!",
|
panic(0, "Failed to initialise dlr storage directory: %s - %s!",
|
||||||
octstr_get_cstr(dlr_dir), strerror(errno));
|
octstr_get_cstr(dlr_dir), strerror(errno));
|
||||||
|
|
||||||
|
|
||||||
unified_prefix = _mms_cfg_getx(cfg, grp, octstr_imm("unified-prefix"));
|
unified_prefix = _mms_cfg_getx(cfg, grp, octstr_imm("unified-prefix"));
|
||||||
|
|
||||||
|
|
|
@ -141,12 +141,15 @@ MmscSettings *mms_load_mmsc_settings(Octstr *fname, List **proxyrelays)
|
||||||
|
|
||||||
if ((m->qfs = _mms_load_module(cfg, grp, "queue-manager-module", "qfuncs", NULL)) == NULL) {
|
if ((m->qfs = _mms_load_module(cfg, grp, "queue-manager-module", "qfuncs", NULL)) == NULL) {
|
||||||
m->qfs = &default_qfuncs; /* default queue handler. */
|
m->qfs = &default_qfuncs; /* default queue handler. */
|
||||||
m->qfs->mms_init_queue_module(qdir, (2 + 2 + 2)*m->maxthreads); /* We expect 2 max for each mmsrelay component (= 4)
|
m->qfs->mms_init_queue_module(qdir,
|
||||||
|
octstr_get_cstr(qdir),
|
||||||
|
(2 + 2 + 2)*m->maxthreads); /* We expect 2 max for each mmsrelay component (= 4)
|
||||||
* + 2 for mmsproxy (on for mm1proxy, one for mm7proxy
|
* + 2 for mmsproxy (on for mm1proxy, one for mm7proxy
|
||||||
*/
|
*/
|
||||||
} else {
|
} else {
|
||||||
Octstr *s = _mms_cfg_getx(cfg, grp, octstr_imm("queue-module-init-data"));
|
Octstr *s = _mms_cfg_getx(cfg, grp, octstr_imm("queue-module-init-data"));
|
||||||
if (m->qfs->mms_init_queue_module(s, (2 + 2 + 2)*m->maxthreads) != 0)
|
if (m->qfs->mms_init_queue_module(s, octstr_get_cstr(qdir),
|
||||||
|
(2 + 2 + 2)*m->maxthreads) != 0)
|
||||||
panic(0, "failed to initialise queue module, with data: %s",
|
panic(0, "failed to initialise queue module, with data: %s",
|
||||||
octstr_get_cstr(s));
|
octstr_get_cstr(s));
|
||||||
octstr_destroy(s);
|
octstr_destroy(s);
|
||||||
|
|
Loading…
Reference in New Issue