1
0
Fork 0
mbuni/mbuni/mmlib/mms_queue.c

798 lines
19 KiB
C

/*
* MMS Queue handler functions - P. A. Bagyenda
*/
#include <sys/file.h>
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <errno.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <unistd.h>
#include <dirent.h>
#include "mms_queue.h"
#include "gwlib/log.h"
#include "gwlib/accesslog.h"
#define MQF 'q'
#define MDF 'd'
#define MTF 't'
static int free_envelope(MmsEnvelope *e, int removefromqueue);
/*
* lockfile: tries to lock a file, returns 0 if success, errno (which could be +ve) otherwise.
* we use flock()
*/
static int lockfile(int fd, int shouldblock)
{
int n, stop;
unsigned flg = shouldblock ? 0 : LOCK_NB;
do {
n = flock(fd, LOCK_EX|flg);
if (n < 0) {
if (errno == EINTR)
stop = 0;
else
stop = 1;
} else
stop = 1;
} while (!stop);
return (n == 0) ? 0 : errno;
}
static int check_lock(int fd, char *fname)
{
struct stat fs = {0}, ds = {0};
if (fstat(fd, &ds) < 0 ||
stat(fname, &fs) < 0 ||
ds.st_nlink != fs.st_nlink ||
memcmp(&ds.st_dev,&fs.st_dev, sizeof ds.st_dev) != 0 ||
memcmp(&ds.st_ino,&fs.st_ino, sizeof ds.st_ino) != 0 ||
ds.st_uid != fs.st_uid ||
ds.st_gid != fs.st_gid ||
ds.st_size != fs.st_size)
return -1;
else
return 0;
}
/* Queue file structure:
* - File consists of a series of lines, each line begins with a single letter, followed by
* a parameter. Letters mean:
* T - message type (full text string -- MMS message type.
* I - message ID
* F - From address
* R - Recipient (the ones pending) for this message
* C - Time queue entry was created
* L - Time of last delivery attempt
* D - Time of (next) delivery attempt
* X - Time of expiry of message
* N - Number of delivery attempts so far
* P - Proxy who sent it to us
* V - Proxy through which this message shd be delivered (e.g. delivery report)
* S - Message size
* s - Message subject.
* f - time of last content fetch
* t - user defined token.
* b - billed amount.
* r - whether delivery receipts are required or not.
* M - Application specific data (string)
*/
static int _putline(int fd, char *code, char buf[])
{
Octstr *s = octstr_format("%s%s\n", code, buf);
int res;
res = octstr_write_to_socket(fd, s);
octstr_destroy(s);
return res;
}
/*
* Attempt to read an envelope from queue file:
* - opens and locks the file.
* - if the lock succeeds, check that file hasn't changed since opening. If it has
* return NULL (i.e. file is being processed elsewhere -- race condition), otherwise read it.
* - If should block is 1, then does a potentially blocking attempt to lock the file.
*/
MmsEnvelope *mms_queue_readenvelope(char *qf, char *mms_queuedir, int shouldblock)
{
Octstr *fname;
int fd;
Octstr *qdata, *s;
ParseContext *p;
MmsEnvelope *e;
int okfile = 0;
fname = octstr_format( "%.128s/%s", mms_queuedir, qf);
if ((fd = open(octstr_get_cstr(fname), O_RDONLY)) < 0) {
octstr_destroy(fname);
return NULL;
}
if (lockfile(fd, shouldblock) != 0 ||
check_lock(fd, octstr_get_cstr(fname)) != 0) {
close(fd);
octstr_destroy(fname);
return NULL;
}
e = gw_malloc(sizeof *e);
memset(e, 0, sizeof *e); /* Clear it all .*/
e->to = list_create();
e->qf.fd = fd;
strncpy(e->qf.name, qf, sizeof e->qf.name);
strncpy(e->qf.dir, mms_queuedir, sizeof e->qf.dir);
qdata = octstr_read_file(octstr_get_cstr(fname));
octstr_destroy(fname);
if (qdata == NULL)
qdata = octstr_imm("");
p = parse_context_create(qdata);
for (s = parse_get_line(p); s;
s = parse_get_line(p)) {
char *line = octstr_get_cstr(s);
int ch = line[0];
char *res = line + 1;
switch (ch) {
Octstr *t;
MmsEnvelopeTo *to;
case 'T':
t = octstr_create(res);
e->msgtype = mms_string_to_message_type(t);
octstr_destroy(t);
if (e->msgtype < 0) {
e->msgtype = 0;
error(0, "mms_queueread: Unknown MMS message type (%s) in file %s/%s, skipped!\n",
res, mms_queuedir, qf);
}
break;
case 'I':
e->msgId = octstr_create(res);
break;
case 'F':
e->from = octstr_create(res);
if (mms_validate_address(e->from) != 0)
error(0, "mms_queueread: Mal-formed address %s in file %s/%s!", res, mms_queuedir, qf);
break;
case 'R':
t = octstr_create(res);
if (mms_validate_address(t) != 0)
error(0, "mms_queueread: Mal-formed address %s in file %s/%s!", res, mms_queuedir, qf);;
to = gw_malloc(sizeof *to);
to->rcpt = t;
to->process = 1;
list_append(e->to, to);
break;
case 'C':
e->created = atol(res);
break;
case 'L':
e->lasttry = atol(res);
break;
case 'D':
e->sendt = atol(res);
break;
case 'X':
e->expiryt = atol(res);
break;
case 'N':
e->attempts = atol(res);
break;
case 'P':
e->fromproxy = octstr_create(res);
break;
case 'M':
e->mdata = octstr_create(res);
break;
case 'V':
e->viaproxy = octstr_create(res);
break;
case 'S':
e->msize = atol(res);
break;
case 's':
e->subject = octstr_create(res);
break;
case 't':
e->token = octstr_create(res);
break;
case 'f':
e->lastaccess = atol(res);
break;
case 'b':
e->bill.billed = 1;
e->bill.amt = atof(res);
break;
case 'r':
e->dlr = 1;
break;
case '.':
okfile = 1;
break;
default:
error(0, "Unknown QF header %c in file %s/%s!", ch, mms_queuedir, qf);
break;
}
octstr_destroy(s);
if (okfile)
break; /* We are done. */
}
parse_context_destroy(p);
octstr_destroy(qdata);
/* We should properly validate the queue file here. */
if (!okfile) {
free_envelope(e,0);
e = NULL;
error(0, "Corrupt queue control file: %s/%s", mms_queuedir, qf);
}
return e;
}
/* Updates envelope to queue file:
* - opens temp file
* - writes output to temp file, if not new else writes directly.
* - renames temp file to queue file (if not new)
* This function doesn't check that this envelope is useless (i.e. no recipients)
* - If function returns -1, caller should check errno for error.
*/
static int writeenvelope(MmsEnvelope *e, int newenv)
{
Octstr *tfname = NULL;
char *s;
char buf[16];
int fd;
int i, n;
int res = 0;
if (newenv)
fd = e->qf.fd;
else {
tfname = octstr_format(
"%s/%c%s", e->qf.dir,
MTF, e->qf.name + 1);
fd = open(octstr_get_cstr(tfname),
O_RDWR|O_CREAT|O_EXCL, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP);
if (fd < 0 ) {
error(0, "mms_queueadd: Failed to open temp file %s: error = %s\n",
octstr_get_cstr(tfname), strerror(errno));
res = -1;
goto done;
} else if (lockfile(fd, 0) != 0 ||
check_lock(fd, octstr_get_cstr(tfname)) != 0) { /* Lock it. */
error(0, "mms_queueadd: Failed lock temp file %s: error = %s\n",
octstr_get_cstr(tfname), strerror(errno));
res = -1;
goto done;
}
}
/* Write out. */
s = mms_message_type_to_cstr(e->msgtype);
if (!s) {
error(0, "mms_queuewrite: Unknown MMS message type %d! Skipped\n", e->msgtype);
s = "";
}
_putline(fd, "T", s);
if (e->msgId)
_putline(fd, "I", octstr_get_cstr(e->msgId));
_putline(fd, "F", octstr_get_cstr(e->from));
if (e->to)
n = list_len(e->to);
else
n = 0;
for (i = 0; i < n; i++) {
MmsEnvelopeTo *to = list_get(e->to, i);
if (to->process)
_putline(fd, "R", octstr_get_cstr(to->rcpt));
}
sprintf(buf, "%ld", e->created);
_putline(fd, "C", buf);
if (e->lasttry) {
sprintf(buf, "%ld", e->lasttry);
_putline(fd, "L", buf);
}
if (e->sendt) {
sprintf(buf, "%ld", e->sendt);
_putline(fd, "D", buf);
}
if (e->expiryt) {
sprintf(buf, "%ld", e->expiryt);
_putline(fd, "X", buf);
}
if (e->attempts) {
sprintf(buf, "%ld", e->attempts);
_putline(fd, "N", buf);
}
if (e->lastaccess) {
sprintf(buf, "%ld", e->lastaccess);
_putline(fd, "f", buf);
}
sprintf(buf, "%ld", e->msize);
_putline(fd, "S", buf);
if (e->fromproxy)
_putline(fd, "P", octstr_get_cstr(e->fromproxy));
if (e->mdata)
_putline(fd, "M", octstr_get_cstr(e->mdata));
if (e->subject)
_putline(fd, "s", octstr_get_cstr(e->subject));
if (e->viaproxy)
_putline(fd, "V", octstr_get_cstr(e->viaproxy));
if (e->token)
_putline(fd, "t", octstr_get_cstr(e->token));
if (e->dlr)
_putline(fd, "r", "Yes");
if (e->bill.billed) {
sprintf(buf, "%.3f", e->bill.amt);
_putline(fd,"b", buf);
}
_putline(fd, "", ".");
fsync(fd); /* Sync data. */
if (!newenv) { /* An update */
Octstr *qfname;
qfname = octstr_format("%s/%s", e->qf.dir, e->qf.name);
if (rename(octstr_get_cstr(tfname), octstr_get_cstr(qfname)) < 0) {
error(0, "mms_queuewrite: Failed to rename %s to %s: error = %s\n",
octstr_get_cstr(qfname), octstr_get_cstr(tfname), strerror(errno));
close(fd); /* Close new one, keep old one. */
res = -1;
} else { /* On success, new descriptor replaces old one and we close old one. */
close(e->qf.fd);
e->qf.fd = fd;
}
octstr_destroy(qfname);
}
done:
if (tfname) octstr_destroy(tfname);
return res;
}
#define MAXTRIES 10
/* Makes a qf file in the queue directory.
* Makes several attempts then fails (returns -1) if it can't, fd otherwise
* puts queue file name in qf (without directory name).
* It is up to the caller to lock the file descriptor if needed.
*/
static int mkqf(char qf[32], char *mms_queuedir)
{
Octstr *tmp;
char *ctmp;
int i = 0, fd = -1;
static int ect;
if (!mms_queuedir)
gw_panic(0, "Queue directory passed as null!");
do {
tmp = octstr_format("%.64s/%cf%ld.%d.x%d%ld",
mms_queuedir, MQF,
time(NULL),
++ect, getpid(), random() % 100);
ctmp = octstr_get_cstr(tmp);
fd = open(ctmp, O_RDWR|O_CREAT|O_EXCL, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP);
if (fd >= 0 &&
(lockfile(fd,1) != 0 ||
check_lock(fd, ctmp) != 0)) {
unlink(ctmp);
close(fd);
fd = -1;
}
if (fd < 0) {
octstr_destroy(tmp);
tmp = NULL;
}
} while (i++ < MAXTRIES && fd < 0);
if (fd >= 0) {
char *s = strrchr(ctmp, '/');
if (s)
strncpy(qf, s + 1, 32);
else /* ???! */
gw_panic(0, "Queue directory name is too long!\n");
}
if (tmp) octstr_destroy(tmp);
return fd;
}
static int writemmsdata(Octstr *ms, char *df, char *mms_queuedir)
{
Octstr *dfname;
int fd, n, res = 0;
dfname = octstr_format("%s/%s", mms_queuedir, df);
fd = open(octstr_get_cstr(dfname),
O_WRONLY|O_CREAT, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP);
if (fd < 0) {
error(0, "mms_queuadd: Failed to open data file %s: error = %s\n",
octstr_get_cstr(dfname), strerror(errno));
res = -1;
goto done;
}
n = octstr_write_to_socket(fd, ms);
close(fd);
if (n != 0) {
error(0, "mms_queuadd: Failed to write data file %s: error = %s\n",
octstr_get_cstr(dfname), strerror(errno));
unlink(octstr_get_cstr(dfname));
res = -1;
}
done:
octstr_destroy(dfname);
return res;
}
Octstr *mms_queue_add(Octstr *from, List *to, Octstr *msgid,
Octstr *subject,
Octstr *fromproxy, Octstr *viaproxy,
time_t senddate, time_t expirydate, MmsMsg *m, Octstr *token,
int dlr,
char *directory)
{
char qf[32];
int fd, i, n;
MmsEnvelope *e;
Octstr *ms, *res = NULL;
fd = mkqf(qf, directory);
if (fd < 0) {
error(0, "mms_queue_add: Failed err=%s\n", strerror(errno));
return NULL;
}
res = octstr_create(qf);
ms = mms_tobinary(m); /* Convert message to string. */
e = gw_malloc(sizeof *e); /* Make envelope, clear it. */
memset(e, 0, sizeof *e);
strncpy(e->qf.name, qf, sizeof e->qf.name);
strncpy(e->qf.dir, directory, sizeof e->qf.dir);
e->qf.fd = fd;
e->msgtype = mms_messagetype(m);
e->from = from;
e->created = time(NULL);
e->sendt = senddate;
e->expiryt = expirydate;
e->lasttry = 0;
e->attempts = 0;
e->lastaccess = 0;
e->fromproxy = fromproxy;
e->viaproxy = viaproxy;
e->subject = subject;
e->to = list_create();
e->msize = octstr_len(ms);
e->msgId = msgid ? msgid : res;
e->token = token;
e->dlr = dlr;
e->bill.billed = 0;
/* Insert message ID into message if it is missing. */
if (!msgid && mms_messagetype(m) == MMS_MSGTYPE_SEND_REQ)
mms_replace_header_value(m, "Message-ID", octstr_get_cstr(res));
n = to ? list_len(to) : 0;
for (i = 0; i<n; i++) {
MmsEnvelopeTo *t = gw_malloc(sizeof *t);
Octstr *a = list_get(to, i);
t->rcpt = a;
t->process = 1;
list_append(e->to, t);
}
/* Write queue data. */
if (writeenvelope(e, 1) < 0) {
octstr_destroy(res);
res = NULL;
goto done;
}
/* Write actual data before relinquishing lock on queue file. */
qf[0]= MDF;
if (writemmsdata(ms, qf, directory) < 0) {
octstr_destroy(res);
res = NULL;
goto done;
}
close(fd); /* Close queue file, thereby letting go of locks. */
done:
/* Free the envelope stuff since we do not need it any more, then free e */
for (i = 0, n = list_len(e->to); i<n; i++) {
MmsEnvelopeTo *to = list_get(e->to, i);
gw_free(to);
}
list_destroy(e->to, NULL);
gw_free(e);
octstr_destroy(ms);
return res;
}
static int free_envelope(MmsEnvelope *e, int removefromqueue)
{
int i, n;
if (e->msgId)
octstr_destroy(e->msgId);
for (i = 0, n = list_len(e->to); i < n; i++) {
MmsEnvelopeTo *x = list_get(e->to, i);
octstr_destroy(x->rcpt);
gw_free(x);
}
list_destroy(e->to, NULL);
if (e->from)
octstr_destroy(e->from);
if (e->fromproxy)
octstr_destroy(e->fromproxy);
if (e->mdata)
octstr_destroy(e->mdata);
if (e->viaproxy)
octstr_destroy(e->viaproxy);
if (e->token)
octstr_destroy(e->token);
if (e->subject)
octstr_destroy(e->subject);
if (removefromqueue) {
char fname[2*QFNAMEMAX];
snprintf(fname, -1 + sizeof fname, "%s/%s", e->qf.dir, e->qf.name);
unlink(fname);
e->qf.name[0] = MDF;
snprintf(fname, -1 + sizeof fname, "%s/%s", e->qf.dir, e->qf.name);
unlink(fname);
}
close(e->qf.fd); /* close and unlock now that we have deleted it. */
gw_free(e);
return 0;
}
int mms_queue_free_env(MmsEnvelope *e)
{
return free_envelope(e, 0);
}
int mms_queue_update(MmsEnvelope *e)
{
int i, n = (e && e->to) ? list_len(e->to) : 0;
int hasrcpt = 0;
MmsEnvelopeTo *x;
if (!e) return -1;
for (i = 0; i < n; i++)
if ((x = list_get(e->to, i)) &&
x->process) {
hasrcpt = 1;
break;
}
if (!hasrcpt) {
free_envelope(e,1);
return 1;
} else
return writeenvelope(e, 0);
}
MmsMsg *mms_queue_getdata(MmsEnvelope *e)
{
Octstr *fname;
Octstr *ms;
MmsMsg *m;
if (!e) return NULL;
fname = octstr_format("%s/%c%s", e->qf.dir, MDF, e->qf.name + 1);
ms = octstr_read_file(octstr_get_cstr(fname));
if (!ms) {
error(0, "mms_queue_getdata: Failed to load data file for queue entry %s in %s",
e->qf.name, e->qf.dir);
octstr_destroy(fname);
return NULL;
}
m = mms_frombinary(ms, octstr_imm(""));
if (!m) {
error(0, "mms_queue_getdata: Failed to load decode data file for queue entry %s in %s",
e->qf.name, e->qf.dir);
octstr_destroy(fname);
return NULL;
}
octstr_destroy(ms);
octstr_destroy(fname);
return m;
}
struct Qthread_t {
List *l;
int (*deliver)(MmsEnvelope *e);
};
static void tdeliver(struct Qthread_t *qt)
{
MmsEnvelope *e;
while ((e = list_consume(qt->l)) != NULL) {
int res = qt->deliver(e); /* If it is on the queue, it has to be delivered. */
if (res != 1) /* Then delete as it wasn't deleted. */
free_envelope(e, 0);
}
/* Consume failed, time to go away. */
if (qt->l)
list_destroy(qt->l, NULL);
qt->l = NULL; /* Signal that we are gone. */
}
void mms_queue_run(char *dir,
int (*deliver)(MmsEnvelope *),
double sleepsecs, int num_threads, int *rstop)
{
struct Qthread_t *tlist;
int i;
int qstop = 0;
gw_assert(num_threads>0);
tlist = gw_malloc(num_threads*sizeof tlist[0]);
for (i = 0; i<num_threads; i++) { /* Create threads for sending. */
tlist[i].l = list_create();
list_add_producer(tlist[i].l);
tlist[i].deliver = deliver;
gwthread_create((gwthread_func_t *)tdeliver, &tlist[i]);
}
i = 0; /* For stepping through above array. */
do {
DIR *dirp;
struct dirent *dp;
time_t tnow = time(NULL);
dirp = opendir(dir);
if (!dirp) {
error(0, "mms_queue_run: Failed to read queue directory %s, error=%s",
dir, strerror(errno));
goto qsleep;
}
while ((dp = readdir(dirp)) != NULL && !qstop)
if (dp->d_name[0] == MQF &&
dp->d_name[1] == 'f') {
MmsEnvelope *e = mms_queue_readenvelope(dp->d_name,dir, 0);
if (!e)
continue;
if (e->sendt <= tnow) {
int queued = 0;
int j = i; /* This is the next queue to use. Checking for cycles. */
do {
if (tlist[i].l) {
debug("queuerun", 0, "Queued to thread %d for %s/%s",
i, dir, dp->d_name);
list_produce(tlist[i].l, e);
queued = 1;
}
i = (i+1)%num_threads;
} while (!queued && i != j);
if (!queued) { /* A problem. There are no sender threads! */
free_envelope(e, 0);
qstop = 1;
error(0, "mms_queue_run: No active sender queues for directory %s. Quiting.",
dir);
goto qloop;
}
} else
free_envelope(e,0); /* Let go of it. */
}
if (dirp) closedir(dirp);
qsleep:
gwthread_sleep(sleepsecs);
qloop:
(void)0;
} while (!*rstop && !qstop);
/* We are out of the queue, time to go away. */
for (i = 0; i<num_threads; i++)
if (tlist[i].l)
list_remove_producer(tlist[i].l);
gwthread_join_every((gwthread_func_t *)tdeliver); /* Wait for them all to terminate. */
for (i = 0; i<num_threads; i++)
if (tlist[i].l)
list_destroy(tlist[i].l,NULL); /* Final destroy if needed. */
gw_free(tlist);
return;
}