From 61d76e1a1440c328bee2b88f65a17dcb72d78662 Mon Sep 17 00:00:00 2001 From: bagyenda <> Date: Wed, 5 Sep 2007 21:37:04 +0000 Subject: [PATCH] added extras/ directory --- mbuni/ChangeLog | 2 + mbuni/extras/pgsql-queue/Makefile | 30 + mbuni/extras/pgsql-queue/Readme.txt | 16 + mbuni/extras/pgsql-queue/mms_pgsql_queue.c | 890 +++++++++++++++++++++ mbuni/extras/pgsql-queue/tables.sql | 51 ++ 5 files changed, 989 insertions(+) create mode 100644 mbuni/extras/pgsql-queue/Makefile create mode 100644 mbuni/extras/pgsql-queue/Readme.txt create mode 100644 mbuni/extras/pgsql-queue/mms_pgsql_queue.c create mode 100644 mbuni/extras/pgsql-queue/tables.sql diff --git a/mbuni/ChangeLog b/mbuni/ChangeLog index 3570870..727b8ba 100644 --- a/mbuni/ChangeLog +++ b/mbuni/ChangeLog @@ -1,3 +1,5 @@ +2007-09-06 P. A. Bagyenda + * Added 'extras' directory, containing miscellaneous Mbuni addons. 2007-08-27 P. A. Bagyenda * added 'reroute' and 'reroute-mmsc-id' config. params to VAS GW MMSC configs 2007-08-20 P. A. Bagyenda diff --git a/mbuni/extras/pgsql-queue/Makefile b/mbuni/extras/pgsql-queue/Makefile new file mode 100644 index 0000000..3692c13 --- /dev/null +++ b/mbuni/extras/pgsql-queue/Makefile @@ -0,0 +1,30 @@ +# PGSQL queue handler for Mbuni Makefile (c) 2007 Digital Solutions +# Licence: See http://mbuni.org/license.shtml +KANNELCONF = /usr/local/bin/gw-config +PGCONF = /usr/local/pgsql/bin/pg_config +CC = gcc +MBUNIDIR = ../mbuni +LIBTOOL = libtool +ARCH = $(shell uname -s) +CFLAGS = -Wall -g -fPIC -I$(MBUNIDIR) -g -I$(MBUNIDIR)/mmlib `$(KANNELCONF) --cflags` -I`$(PGCONF) --includedir` + +ifeq ($(ARCH),Darwin) + XLDFLAGS=-dynamiclib -Wl,-flat_namespace,-undefined,suppress -arch i386 -arch ppc + LIB_EXT=dylib + CFLAGS+=-DDARWIN=1 -arch ppc -arch i386 +else + XLDFLAGS=-shared + LIB_EXT=so +endif + +LIB = libmms_pgsql_queue.$(LIB_EXT) + +ALL: $(LIB) + +$(LIB): mms_pgsql_queue.o + $(CC) $(XLDFLAGS) -o $@ $? -L$(MBUNIDIR)/mmlib -lmms `$(KANNELCONF) --libs` -L`$(PGCONF) --libdir` -lpq + +clean: + rm -f *.o *~ +clobber: clean + rm -f $(LIB) diff --git a/mbuni/extras/pgsql-queue/Readme.txt b/mbuni/extras/pgsql-queue/Readme.txt new file mode 100644 index 0000000..192f09f --- /dev/null +++ b/mbuni/extras/pgsql-queue/Readme.txt @@ -0,0 +1,16 @@ +This module provides Queue management for mbuni using PostgreSQL as the storage engine. +To use it, you need only add two lines to the mbuni config 'core' group: + +queue-manager-module = "/path_to/libmms_pgsql_queue.so" +queue-module-init-data = "number_of_db_connections:host=dbhost user=db_user passwowrd=dbpassword dbname=dbname" + +Make sure the database you are trying to connect to has already been created +and the relevant tables created using the supplied file "tables.sql". + +Some notes: + - Mbuni will open number_of_db_connections connections to the database at startup. + Ensure your PostgreSQL installation limits are not exceeded. DB connections will + be shared by all parts of Mbuni, which means that in high traffic situations, + you may experience mbuni component slow-down as all db connections might be in use. + - Vacuum your DB often, since a lot of rows are updated/deleted during operation. + diff --git a/mbuni/extras/pgsql-queue/mms_pgsql_queue.c b/mbuni/extras/pgsql-queue/mms_pgsql_queue.c new file mode 100644 index 0000000..0c2bc84 --- /dev/null +++ b/mbuni/extras/pgsql-queue/mms_pgsql_queue.c @@ -0,0 +1,890 @@ +#include +#include +#include "mms_queue.h" + +/* + * Mbuni - Open Source MMS Gateway + * + * Mbuni Queue handler module using PostgreSQL database storage + * + * Copyright (C) 2007, Digital Solutions Ltd. - http://www.dsmagic.com + * + * Paul Bagyenda + * + * This program is free software, distributed under the terms of + * the GNU General Public License, with a few exceptions granted (see LICENSE) + */ + + +/* first we need the db connection pooling. */ +#define DEFAULT_CONNECTIONS 5 +#define MIN_QRUN_INTERVAL 2 /* we don't want to hurt DB. */ +#define MQF 'q' + +static List *free_conns; +static int pgq_init_module(Octstr *conninfo) +{ + long i, n = 0; + + gw_assert(conninfo); + i = octstr_search_char(conninfo,':', 0); + if (i>0) { + n = strtoul(octstr_get_cstr(conninfo), NULL, 10); + octstr_delete(conninfo, 0, i+1); + } + + if (n <= 0) + n = DEFAULT_CONNECTIONS; + + free_conns = gwlist_create(); + gwlist_add_producer(free_conns); + for (i = 0; i 0 ? 0 : -1; +} + +static int pgq_cleanup_module(void) +{ + gw_assert(free_conns); + + gwlist_remove_producer(free_conns); + gwlist_destroy(free_conns, (void *)PQfinish); + free_conns = NULL; + + return 0; +} + + +static PGconn *get_conn(void) +{ + PGconn *c; + PGresult *r; + gw_assert(free_conns); + + c = gwlist_consume(free_conns); + + r = PQexec(c, "BEGIN"); /* start a transaction. */ + PQclear(r); + return c; +} + +static void return_conn(PGconn *c) +{ + PGresult *r; + gw_assert(free_conns); + + /* commit or destroy transaction. */ + if (PQtransactionStatus(c) == PQTRANS_INERROR) + r = PQexec(c, "ROLLBACK"); + else + r = PQexec(c, "COMMIT"); + PQclear(r); + gwlist_produce(free_conns,c); +} + + +/* structure for use within the Envelope thingie. */ +struct pgfile_t { + PGconn *conn; + char dir[256+1]; + char _pad[4]; /* paranoia */ + long qid; /* internal key into table (if any) */ +}; + + +static Octstr *pgq_init_queue_dir(char *qdir, int *error) +{ + gw_assert(free_conns); + if (error) *error = 0; + return octstr_create(qdir); /* just make a string out of it. nothing more to do. */ +} + + +static int pgq_free_envelope(MmsEnvelope *e, int removefromqueue) +{ + int ret = 0; + + struct pgfile_t *qfs; + if (e == NULL) + return 0; + + qfs = e->qfs_data; + gw_assert(qfs->conn); + + if (removefromqueue) { + char cmd[256]; + /* copy to separate table. */ + PGresult *res; + sprintf(cmd, "INSERT INTO archived_mms_messages SELECT " + " * from mms_messages WHERE id = %ld", qfs->qid); + res = PQexec(qfs->conn, cmd); + + if (PQresultStatus(res) != PGRES_COMMAND_OK) { + PQclear(res); + ret = -1; + goto done; + } else + PQclear(res); + + sprintf(cmd, "INSERT INTO archived_mms_message_headers SELECT " + " * from mms_message_headers WHERE qid = %ld", qfs->qid); + res = PQexec(qfs->conn, cmd); + + if (PQresultStatus(res) != PGRES_COMMAND_OK) { + PQclear(res); + ret = -1; + goto done; + } else + PQclear(res); + + sprintf(cmd, "DELETE from mms_messages WHERE id = %ld", qfs->qid); + res = PQexec(qfs->conn, cmd); + if (PQresultStatus(res) != PGRES_COMMAND_OK) + ret = -1; + PQclear(res); + } + + done: + return_conn(qfs->conn); + mms_queue_free_envelope(e); + return ret; +} + +/* Queue header 'item' names -- copied largely from file-based scheme. + * - each header represented by single letter, followed by + * a parameter. Letters mean: + * T - message type (full text string -- MMS message type. + * I - message ID + * F - From address + * R - Recipient (the ones pending) for this message + * Z - Recipient (ones already routed to) for this message. + * P - Proxy who sent it to us + * p - Proxy through which this message shd be delivered (e.g. delivery report) + * S - Message size + * s - Message subject. + * f - time of last content fetch + * t - user defined token. + * b - billed amount. + * r - whether delivery receipts are required or not. + * M - Application specific data (string) + * V - VASPID -- from VASP + * v - vasid -- from VASP + * U - url1 -- e.g. for delivery report + * u - url2 -- e.g. for read report + * H - generic headers associated with message (e.g. for passing to MMC) + */ + +static MmsEnvelope *pgq_queue_readenvelope(char *qf, char *mms_queuedir, int shouldblock) +{ + long qid, num_attempts, i, n; + time_t sendt, created, lastt, edate; + char cmd[4*QFNAMEMAX], _qf[QFNAMEMAX*2+1]; + Octstr *from = NULL; + + MmsEnvelope *e = NULL; + struct pgfile_t *pgs; + + PGconn *c = get_conn(); /* grab yourself a connection. */ + PGresult *r = NULL; + char *s; + + if (c == NULL) + return NULL; + n = strlen(qf); + PQescapeStringConn(c, _qf, qf, n < QFNAMEMAX ? n : QFNAMEMAX, NULL); + + /* read and block, to ensure no one else touches it. */ + sprintf(cmd, "SELECT id,cdate,lastt,sendt,edate,num_attempts,sender FROM " + " mms_messages_view WHERE qdir='%s' AND qfname = '%s' FOR UPDATE %s", + mms_queuedir, _qf, + shouldblock ? "" : "NOWAIT"); /* nice little PostgreSQL 8.x addition. */ + r = PQexec(c, cmd); + + if (PQresultStatus(r) != PGRES_TUPLES_OK || PQntuples(r) < 1) { + return_conn(c); + goto done; + } + + /* Get top-level values. */ + if ((s = PQgetvalue(r, 0, PQfnumber(r, "id"))) != NULL) + qid = strtoul(s, NULL, 10); + else + qid = 0; + + + if ((s = PQgetvalue(r, 0, PQfnumber(r, "cdate"))) != NULL) + created = strtoul(s, NULL, 10); + else + created = 0; + + if ((s = PQgetvalue(r, 0, PQfnumber(r, "lastt"))) != NULL) + lastt = strtoul(s, NULL, 10); + else + lastt = 0; + + if ((s = PQgetvalue(r, 0, PQfnumber(r, "sendt"))) != NULL) + sendt = strtoul(s, NULL, 10); + else + sendt = 0; + + if ((s = PQgetvalue(r, 0, PQfnumber(r, "edate"))) != NULL) + edate = strtoul(s, NULL, 10); + else + edate = 0; + + if ((s = PQgetvalue(r, 0, PQfnumber(r, "num_attempts"))) != NULL) + num_attempts = strtoul(s, NULL, 10); + else + num_attempts = 0; + + if ((s = PQgetvalue(r, 0, PQfnumber(r, "sender"))) != NULL) + from = octstr_create(s); + + PQclear(r); + + sprintf(cmd, "SELECT item,value,id FROM mms_message_headers WHERE qid=%ld FOR UPDATE", qid); + r = PQexec(c, cmd); + + if (PQresultStatus(r) != PGRES_TUPLES_OK) { + return_conn(c); + goto done; + } + + /* now create the structure. */ + e = mms_queue_create_envelope(from, NULL, + NULL, + NULL, NULL, + 0, 0, + NULL, + NULL, NULL, + NULL, NULL, + NULL, + 0, + NULL, + NULL, + qf, + sizeof (struct pgfile_t)); + + /* set the private data. */ + pgs = e->qfs_data; + pgs->conn = c; + pgs->qid = qid; + strncpy(pgs->dir, mms_queuedir, sizeof pgs->dir); + + /* set some top-level stuff. */ + e->created = created; + e->lasttry = lastt; + e->sendt = sendt; + e->expiryt = edate; + e->attempts = num_attempts; + + if (mms_validate_address(e->from) != 0) { + warning(0, "mms_queueread: Mal-formed address [%s] in queue entry %s/%s! " + "Attempting fixup.", octstr_get_cstr(e->from), mms_queuedir, qf); + _mms_fixup_address(e->from, NULL); + } + + /* now read the headers... */ + for (i = 0, n = PQntuples(r); imsgtype = mms_string_to_message_type(t); + octstr_destroy(t); + if (e->msgtype < 0) { + e->msgtype = 0; + error(0, "mms_queueread: Unknown MMS message type (%s) in queue entry %s/%s, skipped!\n", + res, mms_queuedir, qf); + } + break; + case 'I': + e->msgId = octstr_create(res); + break; + case 'R': + case 'Z': + t = octstr_create(res); + if (mms_validate_address(t) != 0) { + warning(0, "mms_queueread: Mal-formed address [%s] in queue entry %s/%s! " + "Attempting fixup.", res, mms_queuedir, qf); + _mms_fixup_address(t, NULL); + } + to = gw_malloc(sizeof *to); + to->rcpt = t; + to->process = (item[0] == 'Z') ? 0 : 1; + gwlist_append(e->to, to); + break; + case 'P': + e->fromproxy = octstr_create(res); + break; + case 'M': + e->mdata = octstr_create(res); + break; + case 'p': + e->viaproxy = octstr_create(res); + break; + case 'S': + e->msize = atol(res); + break; + case 's': + e->subject = octstr_create(res); + break; + case 't': + e->token = octstr_create(res); + break; + case 'f': + e->lastaccess = atol(res); + break; + case 'b': + e->bill.billed = 1; + e->bill.amt = atof(res); + break; + case 'r': + e->dlr = 1; + break; + case 'V': + e->vaspid = octstr_create(res); + break; + case 'v': + e->vasid = octstr_create(res); + break; + + case 'U': + e->url1 = octstr_create(res); + break; + + case 'u': + e->url2 = octstr_create(res); + break; + case 'H': + if (e->hdrs == NULL) + e->hdrs = http_create_empty_headers(); + if ((ptmp = index(res, ':')) == NULL) + error(0, "Incorrectly formatted line header [id=%ld] in queue file %s/%s!", + hid, mms_queuedir, qf); + else { + char *value = ptmp + 1; + char hname[512]; + int xlen = (ptmp - res < sizeof hname) ? ptmp - res : -1 + sizeof hname; + strncpy(hname, res, xlen); + hname[xlen] = 0; /* terminate it. */ + http_header_add(e->hdrs, hname, value); + } + break; + default: + error(0, "Unknown QF header %c in file %s/%s, skipped!", item[0], mms_queuedir, qf); + break; + } + } + + done: + if (r) + PQclear(r); + return e; +} + +/* utility writer function. */ +static int _puthdr(PGconn *c, long qid, char *hname, char *val) +{ + char cmd[QFNAMEMAX*4+1], vbuf[2*QFNAMEMAX+1]; + PGresult *r; + int ret, n; + + n = strlen(val); + PQescapeStringConn(c, vbuf, val, n < QFNAMEMAX ? n : QFNAMEMAX, NULL); + + sprintf(cmd, "INSERT INTO mms_message_headers (qid,item,value) VALUES (%ld, '%.128s', '%.128s')", + qid, hname, vbuf); + + r = PQexec(c, cmd); + ret = (PQresultStatus(r) == PGRES_COMMAND_OK) ? 0 : -1; + PQclear(r); + + return ret; +} + + +static int writeenvelope(MmsEnvelope *e, int newenv) +{ + char *s, buf[512], cmd[QFNAMEMAX*4 + 1], lastt[128], sendt[128], expiryt[128], *xfrom; + int i, n, res = 0; + struct pgfile_t *qfs = e ? e->qfs_data : NULL; + PGresult *r; + + gw_assert(e); + + if (!newenv) { + sprintf(cmd, "DELETE FROM mms_message_headers WHERE qid = %ld", qfs->qid); + r = PQexec(qfs->conn, cmd); + PQclear(r); + } + + /* Write out. */ + + /* first the top-level stuff... */ + + + if (e->lasttry) + sprintf(lastt, ", last_try='epoch'::timestamp with time zone+ '%ld secs'::interval", + e->lasttry); + else + lastt[0] = 0; + + if (e->sendt) + sprintf(sendt, ", send_time='epoch'::timestamp with time zone + '%ld secs'::interval", + e->sendt); + else + sendt[0] = 0; + + if (e->expiryt) + sprintf(expiryt, ", expire_date='epoch'::timestamp with time zone + '%ld secs'::interval", + e->expiryt); + else + expiryt[0] = 0; + + xfrom = gw_malloc(2*octstr_len(e->from)+1); + PQescapeStringConn(qfs->conn, xfrom, octstr_get_cstr(e->from), octstr_len(e->from), NULL); + sprintf(cmd, "UPDATE mms_messages SET num_attempts = %ld, sender='%s' %s %s %s WHERE id = %ld", + e->attempts, xfrom, lastt, sendt, expiryt, qfs->qid); + gw_free(xfrom); + + r = PQexec(qfs->conn, cmd); + + if (PQresultStatus(r) != PGRES_COMMAND_OK) + error(0, "pgwriteenvelope: Failed to update queue entry %s in %s: %s", + e->xqfname, qfs->dir, PQresultErrorMessage(r)); + + PQclear(r); + + /* then the rest... */ + s = (char *)mms_message_type_to_cstr(e->msgtype); + if (!s) { + error(0, "mms_queuewrite: Unknown MMS message type %d! Skipped\n", e->msgtype); + s = ""; + } + _puthdr(qfs->conn, qfs->qid, "T", s); + + if (e->msgId) + _puthdr(qfs->conn, qfs->qid, "I", octstr_get_cstr(e->msgId)); + + if (e->to) + n = gwlist_len(e->to); + else + n = 0; + + for (i = 0; i < n; i++) { + MmsEnvelopeTo *to = gwlist_get(e->to, i); + + _puthdr(qfs->conn, qfs->qid, + (to->process) ? "R" : "Z", + octstr_get_cstr(to->rcpt)); + } + + /* Output headers if any. */ + n = (e->hdrs) ? gwlist_len(e->hdrs) : 0; + for (i = 0; i < n; i++) { + Octstr *h = NULL, *v = NULL; + + http_header_get(e->hdrs, i, &h, &v); + if (h && v) { + Octstr *x = octstr_format("%s:%s", octstr_get_cstr(h), + octstr_get_cstr(v)); + _puthdr(qfs->conn, qfs->qid, "H", octstr_get_cstr(x)); + octstr_destroy(x); + } + if (h) octstr_destroy(h); + if (v) octstr_destroy(v); + + } + + if (e->lastaccess) { + sprintf(buf, "%ld", e->lastaccess); + _puthdr(qfs->conn, qfs->qid, "f", buf); + } + + sprintf(buf, "%ld", e->msize); + _puthdr(qfs->conn, qfs->qid, "S", buf); + + + if (e->fromproxy) + _puthdr(qfs->conn, qfs->qid, "P", octstr_get_cstr(e->fromproxy)); + + + if (e->mdata) + _puthdr(qfs->conn, qfs->qid, "M", octstr_get_cstr(e->mdata)); + + if (e->subject) + _puthdr(qfs->conn, qfs->qid, "s", octstr_get_cstr(e->subject)); + + + if (e->viaproxy) + _puthdr(qfs->conn, qfs->qid, "p", octstr_get_cstr(e->viaproxy)); + + if (e->token) + _puthdr(qfs->conn, qfs->qid, "t", octstr_get_cstr(e->token)); + + + if (e->vaspid) + _puthdr(qfs->conn, qfs->qid, "V", octstr_get_cstr(e->vaspid)); + + if (e->vasid) + _puthdr(qfs->conn, qfs->qid, "v", octstr_get_cstr(e->vasid)); + + if (e->url1) + _puthdr(qfs->conn, qfs->qid, "U", octstr_get_cstr(e->url1)); + + if (e->url2) + _puthdr(qfs->conn, qfs->qid, "u", octstr_get_cstr(e->url2)); + + if (e->dlr) + _puthdr(qfs->conn, qfs->qid, "r", "Yes"); + + if (e->bill.billed) { + sprintf(buf, "%.3f", e->bill.amt); + _puthdr(qfs->conn, qfs->qid,"b", buf); + } + + return res; +} + + +static Octstr *pgq_queue_add(Octstr *from, List *to, + Octstr *subject, + Octstr *fromproxy, Octstr *viaproxy, + time_t senddate, time_t expirydate, MmsMsg *m, Octstr *token, + Octstr *vaspid, Octstr *vasid, + Octstr *url1, Octstr *url2, + List *hdrs, + int dlr, + char *directory, Octstr *mmscname) +{ + char qf[QFNAMEMAX]; + long qid; + MmsEnvelope *e; + Octstr *ms = NULL, *res = NULL, *xcmd = NULL; + struct pgfile_t *qfs = NULL; + PGconn *conn = get_conn(); + PGresult *r; + char *data, *xfrom, *s; + size_t dlen; + static int ect; + + if (conn == NULL) + return NULL; + + /* get an ID for it. */ + r = PQexec(conn, "SELECT nextval('mms_messages_id_seq') as qid"); + if (PQresultStatus(r) != PGRES_TUPLES_OK || PQntuples(r) < 1) { + PQclear(r); + return_conn(conn); + return NULL; + } + s = PQgetvalue(r, 0, 0); + gw_assert(s); + + qid = strtoul(s, NULL, 10); + PQclear(r); + + /* make the long queue id. Including the integer qid ensures uniqueness.*/ + sprintf(qf, "%cf%ld-%ld.%d.x%d.%ld", + MQF, + qid, + (long)time(NULL) % 10000, + (++ect % 10000), getpid()%1000, random() % 100); + + res = octstr_create(qf); + + e = mms_queue_create_envelope(from, to, subject, fromproxy,viaproxy, + senddate,expirydate,token,vaspid,vasid, + url1,url2,hdrs,dlr,mmscname,m, + qf, + sizeof(struct pgfile_t)); + + ms = mms_tobinary(m); /* Convert message to string. */ + + qfs = e->qfs_data; + qfs->conn = conn; + strncpy(qfs->dir, directory, sizeof qfs->dir); + qfs->qid = qid; + + /* write the basic data: qid, qfname, qdir, sender, data, escape those that are not trusted. */ + + xfrom = gw_malloc(2*octstr_len(from)+1); + PQescapeStringConn(qfs->conn, xfrom, octstr_get_cstr(from), octstr_len(from), NULL); + data = (void *)PQescapeByteaConn(qfs->conn, (void *)octstr_get_cstr(ms), octstr_len(ms), &dlen); + + xcmd = octstr_format("INSERT INTO mms_messages (id, qdir, qfname, sender, data,expire_date) VALUES " + " (%ld, '%s', '%s', '%s', E'%s'::bytea, current_timestamp)", + qid, directory, qf, xfrom, data); + PQfreemem(data); + gw_free(xfrom); + + r = PQexec(qfs->conn, octstr_get_cstr(xcmd)); + + octstr_destroy(xcmd); + + if (PQresultStatus(r) != PGRES_COMMAND_OK) { + + error(0, "mms_queue_add: Failed to add data for queue entry %s in %s: %s", + e->xqfname, qfs->dir, PQresultErrorMessage(r)); + + PQclear(r); + octstr_destroy(res); + res = NULL; + goto done; + } else + PQclear(r); + + /* inserted it, now write fuller envelope. */ + + if (writeenvelope(e, 1) < 0) { + octstr_destroy(res); + res = NULL; + goto done; + } + + done: + pgq_free_envelope(e, 0); /* free thingie, relinquish connection. If error occured, this will cause a rollback.*/ + octstr_destroy(ms); + + return res; +} + +static int pgq_queue_free_env(MmsEnvelope *e) +{ + return pgq_free_envelope(e,0); +} + +/* taken exactly from file-based. XXX perhaps we didn't modularize right! */ +static int pgq_queue_update(MmsEnvelope *e) +{ + int i, n = (e && e->to) ? gwlist_len(e->to) : 0; + int hasrcpt = 0; + MmsEnvelopeTo *x; + + if (!e) return -1; + /* FIX: Don't allow expiry to be <= 0 */ + if (e->expiryt <= 0) + e->expiryt = time(NULL) + DEFAULT_EXPIRE; + for (i = 0; i < n; i++) + if ((x = gwlist_get(e->to, i)) != NULL && + x->process) { + hasrcpt = 1; + break; + } + + if (!hasrcpt) { + pgq_free_envelope(e,1); + return 1; + } else + return writeenvelope(e, 0); +} + + +static int pgq_queue_replacedata(MmsEnvelope *e, MmsMsg *m) +{ + char *data; + size_t dlen; + struct pgfile_t *qfs; + int ret = 0; + Octstr *ms, *xcmd; + PGresult *r; + + if (!e) return -1; + + qfs = e->qfs_data; + ms = mms_tobinary(m); + + data = (void *)PQescapeByteaConn(qfs->conn, (void *)octstr_get_cstr(ms), octstr_len(ms), &dlen); + xcmd = octstr_format("UPDATE mms_messages SET data=E'%s' WHERE id = %ld", + data, qfs->qid); + r = PQexec(qfs->conn, octstr_get_cstr(xcmd)); + ret = (PQresultStatus(r) != PGRES_COMMAND_OK) ? -1 : 0; /* do nothing about error. we are in a transaction.*/ + PQclear(r); + octstr_destroy(ms); + PQfreemem(data); + octstr_destroy(xcmd); + + return ret; +} + +static MmsMsg *pgq_queue_getdata(MmsEnvelope *e) +{ + struct pgfile_t *qfs; + MmsMsg *m; + Octstr *ms; + size_t dlen, n; + char cmd[512], *data, *x; + PGresult *r; + + if (e == NULL) + return NULL; + + qfs = e->qfs_data; + + sprintf(cmd, "SELECT data from mms_messages WHERE id = %ld", qfs->qid); + r = PQexec(qfs->conn, cmd); + + if (PQresultStatus(r) != PGRES_TUPLES_OK || + (n = PQntuples(r)) < 1) { + PQclear(r); + error(0, "mms_queue_getdata: Failed to load data for queue entry %s in %s", + e->xqfname, qfs->dir); + return NULL; + } + + x = PQgetvalue(r, 0, 0); + if (x && (isprint(x[0]) || x[0] == '\\')) {/* data was sent to us escapaed, so un-escape it. */ + data = (void *)PQunescapeBytea((void *)x, &dlen); + } else { + dlen = PQgetlength(r, 0, 0); /* get data length before you fetch it. */ + data = x; + } + ms = octstr_create_from_data(data, dlen); + if (x != data) PQfreemem(data); + + PQclear(r); + if (ms == NULL) { + error(0, "mms_queue_getdata: Failed to read data for queue entry %s in %s", + e->xqfname, qfs->dir); + return NULL; + } + + m = mms_frombinary(ms, octstr_imm("")); + if (!m) + error(0, "mms_queue_getdata: Failed to decode data for queue entry %s in %s", + e->xqfname, qfs->dir); + + octstr_destroy(ms); + return m; +} + +struct Qthread_data_t { + long qid; + char qf[QFNAMEMAX]; + char dir[QFNAMEMAX]; /* item to load. */ + + int (*deliver)(MmsEnvelope *e); +}; + +static void pgdeliver(List *item_list) +{ + struct Qthread_data_t *d; + + while ((d = gwlist_consume(item_list)) != NULL) { + MmsEnvelope *e = pgq_queue_readenvelope(d->qf, d->dir, 0); + int res; + time_t tnow = time(NULL); + + if (e && e->sendt <= tnow) { + debug("pgqueue_run", 0, "Queued entry %s/%s to thread %ld", + d->dir, d->qf, gwthread_self()); + res = d->deliver(e); + + if (res != 1) + pgq_free_envelope(e, 0); + } else if (e) + pgq_free_envelope(e, 0); + gw_free(d); + } + /* we're done, exit. */ +} + +#if 0 +static int cmp_thread_data(char *qfname, struct Qthread_data_t *d) +{ + gw_assert(qfname); + gw_assert(d); + + return (strncmp(d->qf, qfname, sizeof d->qf) == 0); +} +#endif +static void pgq_queue_run(char *dir, + int (*deliver)(MmsEnvelope *), + double sleepsecs, int num_threads, int *rstop) +{ + + List *items_list = gwlist_create(); + int i, n; + char cmd[512]; + + gw_assert(num_threads > 0); + + if (sleepsecs < MIN_QRUN_INTERVAL) { + warning(0, "minimum queue run interval for PG Queue module is %d secs.", MIN_QRUN_INTERVAL); + sleepsecs = MIN_QRUN_INTERVAL; + } + + gwlist_add_producer(items_list); + for (i = 0; i 0) + for (i = 0; iqid = strtoul(qid, NULL, 10); + strncpy(d->qf, qfname, sizeof d->qf); + strncpy(d->dir, dir, sizeof d->dir); + d->deliver = deliver; + + gwlist_produce(items_list, d); + } + + PQclear(r); + return_conn(c); /* return connection to pool. */ + + if (*rstop) + break; + gwthread_sleep(sleepsecs); + } while (1); + + gwlist_remove_producer(items_list); + + gwthread_join_every((gwthread_func_t *)pgdeliver); /* Wait for them all to terminate. */ + gwlist_destroy(items_list, NULL); +} + +/* export functions... */ +MmsQueueHandlerFuncs qfuncs = { + pgq_init_module, + pgq_init_queue_dir, + pgq_cleanup_module, + pgq_queue_add, + pgq_queue_update, + pgq_queue_getdata, + pgq_queue_replacedata, + pgq_queue_readenvelope, + pgq_queue_run, + pgq_queue_free_env +}; diff --git a/mbuni/extras/pgsql-queue/tables.sql b/mbuni/extras/pgsql-queue/tables.sql new file mode 100644 index 0000000..d79a052 --- /dev/null +++ b/mbuni/extras/pgsql-queue/tables.sql @@ -0,0 +1,51 @@ +-- Table structure for PostgreSQL MMS storage engine +-- (c) 2007 Digital Solutions +-- Licence: See http://mbuni.org/license.shtml +-- Author: P. A. Bagyenda +-- Requires: PostgresQL v8.x + +-- Master messages table +CREATE TABLE mms_messages ( + id serial PRIMARY KEY, + qdir varchar(256) NOT NULL, + qfname varchar(256) NOT NULL, + sender varchar(256) NOT NULL, + created timestamp with time zone NOT NULL DEFAULT current_timestamp, + last_try timestamp with time zone NOT NULL DEFAULT '-infinity', + send_time timestamp with time zone NOT NULL DEFAULT '-infinity', + expire_date timestamp with time zone NOT NULL, + num_attempts int NOT NULL DEFAULT 0, + + data bytea NOT NULL DEFAULT '', + UNIQUE(qdir, qfname) +); + +CREATE index mm_idx1 on mms_messages(qdir); -- because we use it for lookups. +CREATE index mm_idx2 on mms_messages(send_time); + +-- create a view for message lookup +CREATE VIEW mms_messages_view AS SELECT + *, + EXTRACT(EPOCH FROM created) AS cdate, + EXTRACT(EPOCH FROM last_try) AS lastt, + EXTRACT(EPOCH FROM send_time) AS sendt, + EXTRACT(EPOCH FROM expire_date) AS edate FROM mms_messages; + +-- Table for envelope headers. +CREATE TABLE mms_message_headers ( + id serial PRIMARY KEY, + qid int REFERENCES mms_messages ON UPDATE CASCADE ON DELETE CASCADE, + + item varchar(64) NOT NULL, + value text NOT NULL +); + +-- When messages are deleted from the queue, they are moved to the achived_XXX tables. +-- archive tables are exact copies of old ones, field for field. +-- DBA should clear these tables as needed +CREATE TABLE archived_mms_messages (LIKE mms_messages INCLUDING DEFAULTS INCLUDING CONSTRAINTS); +CREATE TABLE archived_mms_message_headers (LIKE mms_message_headers INCLUDING DEFAULTS INCLUDING CONSTRAINTS); + +ALTER table archived_mms_messages add unique(id); +ALTER table archived_mms_message_headers add foreign key (qid) + references archived_mms_messages (id);