1
0
Fork 0

Changes to queue directory structure (now uses a maildir-style structure)

Misc. fixes.
This commit is contained in:
bagyenda 2005-04-19 09:11:43 +00:00
parent f992bfd86b
commit 0eddc08f46
10 changed files with 301 additions and 131 deletions

View File

@ -1738,6 +1738,15 @@ report), the interface carries out the necessary action
queue) queue)
</ol> </ol>
<br> <br>
This interface should be invoked from your MTA as follows:<br><br>
<tt>
mmsfromemail -f <i>from_address</i> -t <i>recipient_address</i>
-p <i>sender_mmsc_hostname</i> <i>conf_file</i>
</tt>
<br/>
<br>
Note that no IP-based security is provided at this Note that no IP-based security is provided at this
interface. It is expected that security measures (e.g. firewalls, etc) interface. It is expected that security measures (e.g. firewalls, etc)
will have been setup to ensure that messages can only reach the MTA will have been setup to ensure that messages can only reach the MTA
@ -1754,12 +1763,13 @@ NAME="Section_.1.4.5"></A><!--TableOfContentsAnchor:End-->Utilities</H4>
<br> <br>
<tt>mmsssend</tt> can be used to submit (inject) a message into the <tt>mmsssend</tt> can be used to submit (inject) a message into the
global queue. It should be invoked as follows:<br> global queue. It should be invoked as follows:<br>
<br>
<tt> <tt>
mmssend -f <i>from_address</i> -t <i>recipient_list</i> -m mmssend -f <i>from_address</i> -t <i>recipient_list</i> -m
<i>mmsfile</i> [-b] -- <i>conf_file</i> <i>mmsfile</i> [-b] <i>conf_file</i>
</tt> </tt>
<br> <br><br>
Notes: Notes:
<ul> <ul>
<li>the recipient list can be a colon-separated list of multiple <li>the recipient list can be a colon-separated list of multiple

View File

@ -35,9 +35,6 @@
#define ITEM_DEL 2 #define ITEM_DEL 2
#define ITEM_MOD 3 #define ITEM_MOD 3
#define _TT "0123456789abcdefghijklmnopqrstuvwxyz"
#define _TTSIZE (-1 + sizeof _TT)
static unsigned long hash(char *s) static unsigned long hash(char *s)
{ {
unsigned h = 0; unsigned h = 0;

View File

@ -205,7 +205,7 @@ static void encode_msgbody(Octstr *os, MmsMsg *msg)
} }
/* If ret < 0 then we need to get a field value, else we use what's passed. */ /* If ret < 0 then we need to get a field value, else we use what's passed. */
static Octstr *decode_encoded_string_value(int ret, ParseContext *context, char *hname) static Octstr *decode_encoded_string_value(int ret, ParseContext *context, unsigned char *hname)
{ {
int val; int val;
int ret2; int ret2;
@ -224,7 +224,7 @@ static Octstr *decode_encoded_string_value(int ret, ParseContext *context, char
} }
} else if (ret2 != WSP_FIELD_VALUE_NUL_STRING) { } else if (ret2 != WSP_FIELD_VALUE_NUL_STRING) {
warning(0, "Faulty header value for %s!\n", hname); warning(0, "Faulty header value for %s! [ret=%d,ret2=%d]\n", hname,ret,ret2);
res = octstr_imm(""); res = octstr_imm("");
} else } else
res = parse_get_nul_string(context); res = parse_get_nul_string(context);
@ -241,10 +241,10 @@ static int mms_unpack_well_known_field(List *unpacked, int field_type,
Octstr *xfrom, int msgtype) Octstr *xfrom, int msgtype)
{ {
int val, ret; int val, ret;
char *hname = NULL; unsigned char *hname = NULL;
Octstr *decoded = NULL; Octstr *decoded = NULL;
char *ch = NULL; unsigned char *ch = NULL;
ret = wsp_field_value(context, &val); ret = wsp_field_value(context, &val);
@ -353,13 +353,13 @@ static int mms_unpack_well_known_field(List *unpacked, int field_type,
case MMS_HEADER_RESPONSE_STATUS: case MMS_HEADER_RESPONSE_STATUS:
if (ret == WSP_FIELD_VALUE_DATA) { if (ret == WSP_FIELD_VALUE_DATA) {
char *x; unsigned char *x;
int val; int val;
decoded = wsp_unpack_integer_value(context); decoded = wsp_unpack_integer_value(context);
wsp_field_value(context, &val); wsp_field_value(context, &val);
x = mms_response_status_to_cstr(val|0x80); x = mms_response_status_to_cstr(val|0x80);
octstr_append_cstr(decoded, x); octstr_append_cstr(decoded, (char *)x);
} else } else
ch = mms_response_status_to_cstr(val|0x80); ch = mms_response_status_to_cstr(val|0x80);
@ -554,7 +554,7 @@ static int mms_unpack_well_known_field(List *unpacked, int field_type,
} }
if (ch == NULL && decoded != NULL) if (ch == NULL && decoded != NULL)
ch = octstr_get_cstr(decoded); ch = (char *)octstr_get_cstr(decoded);
if (ch == NULL) if (ch == NULL)
goto value_error; goto value_error;
@ -564,7 +564,7 @@ static int mms_unpack_well_known_field(List *unpacked, int field_type,
goto value_error; goto value_error;
} }
http_header_add(unpacked, hname, ch); http_header_add(unpacked, (char *)hname, (char *)ch);
if (decoded) octstr_destroy(decoded); if (decoded) octstr_destroy(decoded);
return val; return val;
@ -759,10 +759,10 @@ static void mms_pack_well_known_field(Octstr *os, int field_type, Octstr *value)
if (octstr_compare(octstr_imm("#insert"), value) == 0) { if (octstr_compare(octstr_imm("#insert"), value) == 0) {
c = 129; c = 129;
octstr_append_data(encoded, &c, 1); octstr_append_data(encoded, (char *)&c, 1);
} else { } else {
c = 128; c = 128;
octstr_append_data(encoded, &c, 1); octstr_append_data(encoded, (char *)&c, 1);
wsp_pack_text(encoded, value); wsp_pack_text(encoded, value);
} }
wsp_pack_value(os, encoded); wsp_pack_value(os, encoded);
@ -1285,7 +1285,7 @@ MmsMsg *mms_frommime(MIMEEntity *mime)
/* Default type is send */ /* Default type is send */
if ((s = http_header_value(m->headers, octstr_imm("X-Mms-Message-Type"))) == NULL) { if ((s = http_header_value(m->headers, octstr_imm("X-Mms-Message-Type"))) == NULL) {
http_header_add(m->headers, "X-Mms-Message-Type", http_header_add(m->headers, "X-Mms-Message-Type",
mms_message_type_to_cstr(MMS_MSGTYPE_SEND_REQ)); (char *)mms_message_type_to_cstr(MMS_MSGTYPE_SEND_REQ));
m->message_type = MMS_MSGTYPE_SEND_REQ; m->message_type = MMS_MSGTYPE_SEND_REQ;
} else { } else {
m->message_type = mms_string_to_message_type(s); m->message_type = mms_string_to_message_type(s);

View File

@ -29,6 +29,40 @@
#define MDF 'd' #define MDF 'd'
#define MTF 't' #define MTF 't'
#define DIR_SEP '/'
#define DIR_SEP_S "/"
int mms_init_queuedir(Octstr *qdir)
{
int i, ret;
char fbuf[512], *xqdir;
octstr_strip_blanks(qdir);
/* Remove trailing slashes. */
for (i = octstr_len(qdir) - 1; i >= 0; i++)
if (octstr_get_char(qdir, i) != DIR_SEP)
break;
else
octstr_delete(qdir, i,1);
xqdir = octstr_get_cstr(qdir);
if ((ret = mkdir(xqdir,
S_IRWXU|S_IRWXG)) < 0 &&
errno != EEXIST)
return errno;
for (i = 0; _TT[i]; i++) { /* initialise the top level only... */
sprintf(fbuf, "%.128s/%c", xqdir, _TT[i]);
if (mkdir(fbuf,
S_IRWXU|S_IRWXG) < 0 &&
errno != EEXIST)
return errno;
}
return 0;
}
static int free_envelope(MmsEnvelope *e, int removefromqueue); static int free_envelope(MmsEnvelope *e, int removefromqueue);
/* Queue file structure: /* Queue file structure:
@ -67,7 +101,35 @@ static int _putline(int fd, char *code, char buf[])
return res; return res;
} }
Octstr *xmake_qf(char realqf[], char subdir[])
{
Octstr *res = octstr_format("%s%s", subdir, realqf); /* Make the queue identifier -- convert '/' to '-' */
octstr_replace(res, octstr_imm(DIR_SEP_S), octstr_imm("-"));
return res;
}
/* break down a qf into dir and sub-dir. subdir contains final '/'! */
static void get_subdir(char *qf, char subdir[64], char realqf[QFNAMEMAX])
{
char *p = strrchr(qf, '-');
if (p == NULL) {
strncpy(realqf, qf, QFNAMEMAX);
subdir[0] = '\0';
} else {
int i, n;
strncpy(realqf, p + 1, QFNAMEMAX);
n = (p+1) - qf;
strncpy(subdir, qf, n);
subdir[n] = 0;
for (i = 0; i<n; i++)
if (subdir[i] == '-')
subdir[i] = DIR_SEP;
}
}
/* /*
* Attempt to read an envelope from queue file: * Attempt to read an envelope from queue file:
* - opens and locks the file. * - opens and locks the file.
@ -83,9 +145,16 @@ MmsEnvelope *mms_queue_readenvelope(char *qf, char *mms_queuedir, int shouldbloc
ParseContext *p; ParseContext *p;
MmsEnvelope *e; MmsEnvelope *e;
int okfile = 0; int okfile = 0;
char subdir[64];
char realqf[QFNAMEMAX];
char *xqf = NULL;
get_subdir(qf, subdir, realqf); /* break it down... */
fname = octstr_format( "%.128s/%s", mms_queuedir, qf); fname = octstr_format( "%.128s/%s%s", mms_queuedir, subdir, realqf);
xqf = octstr_get_cstr(fname);
if ((fd = open(octstr_get_cstr(fname), O_RDONLY)) < 0) { if ((fd = open(octstr_get_cstr(fname), O_RDONLY)) < 0) {
octstr_destroy(fname); octstr_destroy(fname);
return NULL; return NULL;
@ -100,8 +169,10 @@ MmsEnvelope *mms_queue_readenvelope(char *qf, char *mms_queuedir, int shouldbloc
e->to = list_create(); e->to = list_create();
e->qf.fd = fd; e->qf.fd = fd;
strncpy(e->qf.name, qf, sizeof e->qf.name); strncpy(e->qf.name, realqf, sizeof e->qf.name);
strncpy(e->qf.subdir, subdir, sizeof e->qf.subdir);
strncpy(e->qf.dir, mms_queuedir, sizeof e->qf.dir); strncpy(e->qf.dir, mms_queuedir, sizeof e->qf.dir);
strncpy(e->xqfname, qf, sizeof e->xqfname);
qdata = octstr_read_file(octstr_get_cstr(fname)); qdata = octstr_read_file(octstr_get_cstr(fname));
octstr_destroy(fname); octstr_destroy(fname);
@ -124,8 +195,8 @@ MmsEnvelope *mms_queue_readenvelope(char *qf, char *mms_queuedir, int shouldbloc
octstr_destroy(t); octstr_destroy(t);
if (e->msgtype < 0) { if (e->msgtype < 0) {
e->msgtype = 0; e->msgtype = 0;
error(0, "mms_queueread: Unknown MMS message type (%s) in file %s/%s, skipped!\n", error(0, "mms_queueread: Unknown MMS message type (%s) in file %s, skipped!\n",
res, mms_queuedir, qf); res, xqf);
} }
break; break;
case 'I': case 'I':
@ -134,13 +205,13 @@ MmsEnvelope *mms_queue_readenvelope(char *qf, char *mms_queuedir, int shouldbloc
case 'F': case 'F':
e->from = octstr_create(res); e->from = octstr_create(res);
if (mms_validate_address(e->from) != 0) if (mms_validate_address(e->from) != 0)
error(0, "mms_queueread: Mal-formed address %s in file %s/%s!", res, mms_queuedir, qf); error(0, "mms_queueread: Mal-formed address %s in file %s!", res, xqf);
break; break;
case 'R': case 'R':
t = octstr_create(res); t = octstr_create(res);
if (mms_validate_address(t) != 0) if (mms_validate_address(t) != 0)
error(0, "mms_queueread: Mal-formed address %s in file %s/%s!", res, mms_queuedir, qf);; error(0, "mms_queueread: Mal-formed address %s in file %s!", res, xqf);
to = gw_malloc(sizeof *to); to = gw_malloc(sizeof *to);
to->rcpt = t; to->rcpt = t;
to->process = 1; to->process = 1;
@ -199,7 +270,7 @@ MmsEnvelope *mms_queue_readenvelope(char *qf, char *mms_queuedir, int shouldbloc
okfile = 1; okfile = 1;
break; break;
default: default:
error(0, "Unknown QF header %c in file %s/%s!", ch, mms_queuedir, qf); error(0, "Unknown QF header %c in file %s!", ch, xqf);
break; break;
} }
octstr_destroy(s); octstr_destroy(s);
@ -213,7 +284,7 @@ MmsEnvelope *mms_queue_readenvelope(char *qf, char *mms_queuedir, int shouldbloc
if (!okfile) { if (!okfile) {
free_envelope(e,0); free_envelope(e,0);
e = NULL; e = NULL;
error(0, "Corrupt queue control file: %s/%s", mms_queuedir, qf); error(0, "Corrupt queue control file: %s", xqf);
} }
return e; return e;
} }
@ -239,8 +310,8 @@ static int writeenvelope(MmsEnvelope *e, int newenv)
fd = e->qf.fd; fd = e->qf.fd;
else { else {
tfname = octstr_format( tfname = octstr_format(
"%s/%c%s", e->qf.dir, "%s/%s%c%s.%d", e->qf.dir, e->qf.subdir,
MTF, e->qf.name + 1); MTF, e->qf.name + 1, random());
fd = open(octstr_get_cstr(tfname), fd = open(octstr_get_cstr(tfname),
O_RDWR|O_CREAT|O_EXCL, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP); O_RDWR|O_CREAT|O_EXCL, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP);
if (fd < 0 ) { if (fd < 0 ) {
@ -260,7 +331,7 @@ static int writeenvelope(MmsEnvelope *e, int newenv)
/* Write out. */ /* Write out. */
s = mms_message_type_to_cstr(e->msgtype); s = (char *)mms_message_type_to_cstr(e->msgtype);
if (!s) { if (!s) {
error(0, "mms_queuewrite: Unknown MMS message type %d! Skipped\n", e->msgtype); error(0, "mms_queuewrite: Unknown MMS message type %d! Skipped\n", e->msgtype);
s = ""; s = "";
@ -356,7 +427,9 @@ static int writeenvelope(MmsEnvelope *e, int newenv)
if (!newenv) { /* An update */ if (!newenv) { /* An update */
Octstr *qfname; Octstr *qfname;
qfname = octstr_format("%s/%s", e->qf.dir, e->qf.name); qfname = octstr_format("%s/%s%s", e->qf.dir,
e->qf.subdir,
e->qf.name);
if (rename(octstr_get_cstr(tfname), octstr_get_cstr(qfname)) < 0) { if (rename(octstr_get_cstr(tfname), octstr_get_cstr(qfname)) < 0) {
error(0, "mms_queuewrite: Failed to rename %s to %s: error = %s\n", error(0, "mms_queuewrite: Failed to rename %s to %s: error = %s\n",
octstr_get_cstr(qfname), octstr_get_cstr(tfname), strerror(errno)); octstr_get_cstr(qfname), octstr_get_cstr(tfname), strerror(errno));
@ -383,21 +456,46 @@ static int writeenvelope(MmsEnvelope *e, int newenv)
* puts queue file name in qf (without directory name). * puts queue file name in qf (without directory name).
* It is up to the caller to lock the file descriptor if needed. * It is up to the caller to lock the file descriptor if needed.
*/ */
static int mkqf(char qf[32], char *mms_queuedir) static int mkqf(char qf[QFNAMEMAX], char subdir[64], char *mms_queuedir)
{ {
Octstr *tmp; Octstr *xqf = NULL;
char *ctmp; char *ctmp;
int i = 0, fd = -1; int i = 0, fd = -1;
static int ect; static int ect;
if (!mms_queuedir) if (!mms_queuedir)
gw_panic(0, "Queue directory passed as null!"); gw_panic(0, "Queue directory passed as null!");
do { /* First we decide the directory into which it goes... */
tmp = octstr_format("%.64s/%cf%ld.%d.x%d%ld", if ((i = random() % 3) == 0) /* toplevel. */
mms_queuedir, MQF, subdir[0] = 0;
else if (i == 1) /* one in */
sprintf(subdir, "%c/", _TT[random() % _TTSIZE]);
else { /* two in. */
char csubdir[QFNAMEMAX];
sprintf(subdir, "%c/%c%c/",
_TT[random() % _TTSIZE],
_TT[random() % _TTSIZE],
_TT[random() % _TTSIZE]);
sprintf(csubdir, "%s/%s", mms_queuedir, subdir);
if (mkdir(csubdir,
S_IRWXU|S_IRWXG) < 0 &&
errno != EEXIST) {
error(0, "make queue file: Failed to create dir %s - %s!",
csubdir, strerror(errno));
return -1;
}
}
do {
Octstr *tmp;
xqf = octstr_format("%cf%ld.%d.x%d%ld",
MQF,
time(NULL), time(NULL),
++ect, getpid(), random() % 100); ++ect, getpid(), random() % 100);
tmp = octstr_format("%.64s/%s%S", mms_queuedir, subdir, xqf);
ctmp = octstr_get_cstr(tmp); ctmp = octstr_get_cstr(tmp);
fd = open(ctmp, O_RDWR|O_CREAT|O_EXCL, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP); fd = open(ctmp, O_RDWR|O_CREAT|O_EXCL, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP);
if (fd >= 0 && if (fd >= 0 &&
@ -406,32 +504,30 @@ static int mkqf(char qf[32], char *mms_queuedir)
close(fd); close(fd);
fd = -1; fd = -1;
} }
octstr_destroy(tmp);
if (fd >= 0)
break;
if (fd < 0) {
octstr_destroy(tmp);
tmp = NULL;
}
} while (i++ < MAXTRIES && fd < 0);
if (fd >= 0) {
char *s = strrchr(ctmp, '/');
if (s)
strncpy(qf, s + 1, 32);
else /* ???! */
gw_panic(0, "Queue directory name is too long!\n");
}
if (tmp) octstr_destroy(tmp); octstr_destroy(xqf);
xqf = NULL;
} while (i++ < MAXTRIES);
if (fd >= 0)
strncpy(qf, octstr_get_cstr(xqf), QFNAMEMAX);
if (xqf) octstr_destroy(xqf);
return fd; return fd;
} }
static int writemmsdata(Octstr *ms, char *df, char *mms_queuedir) static int writemmsdata(Octstr *ms, char *df, char subdir[], char *mms_queuedir)
{ {
Octstr *dfname; Octstr *dfname;
int fd, n, res = 0; int fd, n, res = 0;
dfname = octstr_format("%s/%s", mms_queuedir, df); dfname = octstr_format("%s/%s%s", mms_queuedir, subdir, df);
fd = open(octstr_get_cstr(dfname), fd = open(octstr_get_cstr(dfname),
O_WRONLY|O_CREAT, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP); O_WRONLY|O_CREAT, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP);
@ -466,20 +562,22 @@ Octstr *mms_queue_add(Octstr *from, List *to,
int dlr, int dlr,
char *directory, Octstr *mmscname) char *directory, Octstr *mmscname)
{ {
char qf[32]; char qf[QFNAMEMAX], subdir[64];
int fd, i, n; int fd, i, n;
MmsEnvelope *e; MmsEnvelope *e;
Octstr *msgid; Octstr *msgid, *s = NULL;
Octstr *ms, *res = NULL; Octstr *ms, *res = NULL;
fd = mkqf(qf, directory); fd = mkqf(qf, subdir, directory);
if (fd < 0) { if (fd < 0) {
error(0, "mms_queue_add: Failed err=%s\n", strerror(errno)); error(0, "mms_queue_add[%s]: Failed err=%s\n", directory, strerror(errno));
return NULL; return NULL;
} }
res = octstr_create(qf); res = xmake_qf(qf, subdir);
ms = mms_tobinary(m); /* Convert message to string. */ ms = mms_tobinary(m); /* Convert message to string. */
msgid = mms_maketransid(octstr_get_cstr(res), mmscname); msgid = mms_maketransid(octstr_get_cstr(res), mmscname);
@ -488,6 +586,7 @@ Octstr *mms_queue_add(Octstr *from, List *to,
memset(e, 0, sizeof *e); memset(e, 0, sizeof *e);
strncpy(e->qf.name, qf, sizeof e->qf.name); strncpy(e->qf.name, qf, sizeof e->qf.name);
strncpy(e->qf.subdir, subdir, sizeof e->qf.subdir);
strncpy(e->qf.dir, directory, sizeof e->qf.dir); strncpy(e->qf.dir, directory, sizeof e->qf.dir);
e->qf.fd = fd; e->qf.fd = fd;
@ -514,12 +613,13 @@ Octstr *mms_queue_add(Octstr *from, List *to,
e->bill.billed = 0; e->bill.billed = 0;
/* Insert message ID into message if it is missing. */ /* Insert message ID into message if it is missing. */
if (!msgid && mms_messagetype(m) == MMS_MSGTYPE_SEND_REQ) if (mms_messagetype(m) == MMS_MSGTYPE_SEND_REQ &&
(s = mms_get_header_value(m, octstr_imm("Message-ID"))) == NULL)
mms_replace_header_value(m, "Message-ID", octstr_get_cstr(msgid)); mms_replace_header_value(m, "Message-ID", octstr_get_cstr(msgid));
else if (s)
n = to ? list_len(to) : 0; octstr_destroy(s);
for (i = 0; i<n; i++) { for (i = 0, n = to ? list_len(to) : 0; i<n; i++) {
MmsEnvelopeTo *t = gw_malloc(sizeof *t); MmsEnvelopeTo *t = gw_malloc(sizeof *t);
Octstr *a = list_get(to, i); Octstr *a = list_get(to, i);
@ -541,7 +641,7 @@ Octstr *mms_queue_add(Octstr *from, List *to,
qf[0]= MDF; qf[0]= MDF;
if (writemmsdata(ms, qf, directory) < 0) { if (writemmsdata(ms, qf, subdir, directory) < 0) {
octstr_destroy(res); octstr_destroy(res);
res = NULL; res = NULL;
goto done; goto done;
@ -598,10 +698,10 @@ static int free_envelope(MmsEnvelope *e, int removefromqueue)
if (removefromqueue) { if (removefromqueue) {
char fname[2*QFNAMEMAX]; char fname[2*QFNAMEMAX];
snprintf(fname, -1 + sizeof fname, "%s/%s", e->qf.dir, e->qf.name); snprintf(fname, -1 + sizeof fname, "%s/%s%s", e->qf.dir, e->qf.subdir, e->qf.name);
unlink(fname); unlink(fname);
e->qf.name[0] = MDF; e->qf.name[0] = MDF;
snprintf(fname, -1 + sizeof fname, "%s/%s", e->qf.dir, e->qf.name); snprintf(fname, -1 + sizeof fname, "%s/%s%s", e->qf.dir, e->qf.subdir, e->qf.name);
unlink(fname); unlink(fname);
} }
close(e->qf.fd); /* close and unlock now that we have deleted it. */ close(e->qf.fd); /* close and unlock now that we have deleted it. */
@ -650,18 +750,17 @@ int mms_queue_replacedata(MmsEnvelope *e, MmsMsg *m)
ms = mms_tobinary(m); ms = mms_tobinary(m);
if (writemmsdata(ms, octstr_get_cstr(tfname), e->qf.dir) < 0) if (writemmsdata(ms, octstr_get_cstr(tfname), e->qf.subdir, e->qf.dir) < 0)
ret = -1; ret = -1;
else { else {
Octstr *fname = octstr_format("%s/%c%s", e->qf.dir, MDF, e->qf.name + 1); Octstr *fname = octstr_format("%s/%s%c%s", e->qf.dir, e->qf.subdir, MDF, e->qf.name + 1);
Octstr *tmpf = octstr_format("%s/%S", e->qf.dir, tfname); Octstr *tmpf = octstr_format("%s/%s%S", e->qf.dir, e->qf.subdir, tfname);
if (rename(octstr_get_cstr(tmpf), octstr_get_cstr(fname)) < 0) { if (rename(octstr_get_cstr(tmpf), octstr_get_cstr(fname)) < 0) {
error(0, "mms_replacedata: Failed to write data file %s: error = %s\n", error(0, "mms_replacedata: Failed to write data file %s: error = %s\n",
octstr_get_cstr(tmpf), strerror(errno)); octstr_get_cstr(tmpf), strerror(errno));
ret = -1; ret = -1;
unlink(octstr_get_cstr(tmpf)); /* remove it. */ unlink(octstr_get_cstr(tmpf)); /* remove it. */
} }
octstr_destroy(fname); octstr_destroy(fname);
octstr_destroy(tmpf); octstr_destroy(tmpf);
} }
@ -679,7 +778,7 @@ MmsMsg *mms_queue_getdata(MmsEnvelope *e)
if (!e) return NULL; if (!e) return NULL;
fname = octstr_format("%s/%c%s", e->qf.dir, MDF, e->qf.name + 1); fname = octstr_format("%s/%s%c%s", e->qf.dir, e->qf.subdir, MDF, e->qf.name + 1);
ms = octstr_read_file(octstr_get_cstr(fname)); ms = octstr_read_file(octstr_get_cstr(fname));
if (!ms) { if (!ms) {
@ -723,14 +822,86 @@ static void tdeliver(struct Qthread_t *qt)
qt->l = NULL; /* Signal that we are gone. */ qt->l = NULL; /* Signal that we are gone. */
} }
/* runs over a single directory, running queue items. return -1 if failed to run some item.
* each directory found is pushed onto stack for future processing.
* dir must have trailing slash
* return value of -2 means quit.
*/
static int run_dir(char *topdir, char *dir, struct Qthread_t *tlist, int num_threads, int *i, List *stack)
{
DIR *dirp;
struct dirent *dp;
time_t tnow = time(NULL);
Octstr *tdir = octstr_format("%s/%s", topdir, dir);
char *xdir = octstr_get_cstr(tdir);
int ret = 0;
dirp = opendir(xdir);
if (!dirp) {
error(0, "mms_queue_run: Failed to read queue directory %s, error=%s",
xdir, strerror(errno));
ret = -1;
goto done;
}
while ((dp = readdir(dirp)) != NULL)
if ((dp->d_type & DT_REG) &&
dp->d_name[0] == MQF &&
dp->d_name[1] == 'f') {
Octstr *xqf = xmake_qf(dp->d_name, dir);
MmsEnvelope *e = mms_queue_readenvelope(octstr_get_cstr(xqf),topdir, 0);
octstr_destroy(xqf);
if (!e)
continue;
if (e->sendt <= tnow) {
int queued = 0;
int j = *i; /* This is the next thread to use. Checking for cycles. */
do {
if (tlist[*i].l) {
debug("queuerun", 0, "Queued to thread %d for %s%s",
*i, xdir, dp->d_name);
list_produce(tlist[*i].l, e);
queued = 1;
}
*i = (*i+1)%num_threads;
} while (!queued && *i != j);
if (!queued) { /* A problem. There are no sender threads! */
free_envelope(e, 0);
error(0, "mms_queue_run: No active sender queues for directory %s. Quiting.",
xdir);
ret = -2;
break;
}
} else
free_envelope(e,0); /* Let go of it. */
} else if ((dp->d_type & DT_DIR) &&
strcmp(dp->d_name, ".") != 0 &&
strcmp(dp->d_name, "..") != 0) {
Octstr *newdir = octstr_format("%s%s/", dir, dp->d_name);
list_append(stack, newdir); /* push it... */
}
if (dirp) closedir(dirp);
done:
if (tdir)
octstr_destroy(tdir);
return ret;
}
void mms_queue_run(char *dir, void mms_queue_run(char *dir,
int (*deliver)(MmsEnvelope *), int (*deliver)(MmsEnvelope *),
double sleepsecs, int num_threads, int *rstop) double sleepsecs, int num_threads, int *rstop)
{ {
struct Qthread_t *tlist; struct Qthread_t *tlist;
int i; int i, qstop = 0;
int qstop = 0; List *stack = list_create();
gw_assert(num_threads>0); gw_assert(num_threads>0);
tlist = gw_malloc(num_threads*sizeof tlist[0]); tlist = gw_malloc(num_threads*sizeof tlist[0]);
@ -742,59 +913,31 @@ void mms_queue_run(char *dir,
gwthread_create((gwthread_func_t *)tdeliver, &tlist[i]); gwthread_create((gwthread_func_t *)tdeliver, &tlist[i]);
} }
i = 0; /* For stepping through above array. */ i = 0; /* For stepping through above array. */
do { do {
DIR *dirp; Octstr *xdir = NULL;
struct dirent *dp; list_append(stack, octstr_create("")); /* Put initial dir on there. */
time_t tnow = time(NULL);
dirp = opendir(dir);
if (!dirp) {
error(0, "mms_queue_run: Failed to read queue directory %s, error=%s",
dir, strerror(errno));
goto qsleep;
}
while ((dp = readdir(dirp)) != NULL && !qstop)
if (dp->d_name[0] == MQF &&
dp->d_name[1] == 'f') {
MmsEnvelope *e = mms_queue_readenvelope(dp->d_name,dir, 0);
if (!e)
continue;
if (e->sendt <= tnow) {
int queued = 0;
int j = i; /* This is the next queue to use. Checking for cycles. */
do {
if (tlist[i].l) {
debug("queuerun", 0, "Queued to thread %d for %s/%s",
i, dir, dp->d_name);
list_produce(tlist[i].l, e);
queued = 1;
}
i = (i+1)%num_threads;
} while (!queued && i != j);
if (!queued) { /* A problem. There are no sender threads! */
free_envelope(e, 0);
qstop = 1;
error(0, "mms_queue_run: No active sender queues for directory %s. Quiting.",
dir);
goto qloop;
}
} else
free_envelope(e,0); /* Let go of it. */
while (!*rstop &&
(xdir = list_extract_first(stack)) != NULL) {
int ret = run_dir(dir, octstr_get_cstr(xdir), tlist, num_threads, &i, stack);
octstr_destroy(xdir);
xdir = NULL;
if (ret < 0) {
if (ret <= -2)
qstop = 1;
goto qloop;
} }
}
if (dirp) closedir(dirp); if (xdir)
qsleep: octstr_destroy(xdir);
gwthread_sleep(sleepsecs); if (*rstop)
break;
qloop: qloop:
(void)0; gwthread_sleep(sleepsecs);
} while (!*rstop && !qstop); } while (!qstop);
/* We are out of the queue, time to go away. */ /* We are out of the queue, time to go away. */
for (i = 0; i<num_threads; i++) for (i = 0; i<num_threads; i++)
@ -807,6 +950,7 @@ void mms_queue_run(char *dir,
list_destroy(tlist[i].l,NULL); /* Final destroy if needed. */ list_destroy(tlist[i].l,NULL); /* Final destroy if needed. */
gw_free(tlist); gw_free(tlist);
list_destroy(stack, (list_item_destructor_t *)octstr_destroy);
return; return;
} }

View File

@ -60,15 +60,18 @@ typedef struct MmsEnvelope {
Octstr *mdata; /* Generic string data used by any interface. */ Octstr *mdata; /* Generic string data used by any interface. */
Octstr *fromproxy; /* Which proxy sent us this message.*/ Octstr *fromproxy; /* Which proxy sent us this message.*/
Octstr *viaproxy; /* Which proxy must we send this message through. */ Octstr *viaproxy; /* Which proxy must we send this message through. */
struct { /* Name of the queue file, pointer to it (locked). DO NOT MUCK WITH THESE! */ struct { /* Name of the queue file, pointer to it (locked). DO NOT USE WITH THESE! */
char name[QFNAMEMAX]; /* Name of the file. */ char name[QFNAMEMAX]; /* Name of the file. */
char dir[QFNAMEMAX]; /* Directory in which file is .*/ char dir[QFNAMEMAX]; /* Directory in which file is .*/
char subdir[64]; /* and the sub-directory. */
char _pad[16]; char _pad[16];
int fd; int fd;
} qf; } qf;
char xqfname[64+QFNAMEMAX]; /* The full ID for the queue. Use this. */
} MmsEnvelope; } MmsEnvelope;
/* Given a queue directory, initialise it. Must be called at least once on each queue dir. */
int mms_init_queuedir(Octstr *qdir);
/* /*
* Add a message to the queue, returns 0 on success -1 otherwise (error is logged). * Add a message to the queue, returns 0 on success -1 otherwise (error is logged).
* 'to' is a list of Octstr * *. * 'to' is a list of Octstr * *.

View File

@ -23,6 +23,7 @@
#include <unistd.h> #include <unistd.h>
#include "mms_util.h" #include "mms_util.h"
#include "mms_queue.h"
#include "mms_uaprof.h" #include "mms_uaprof.h"
#define MAXQTRIES 100 #define MAXQTRIES 100
@ -111,6 +112,16 @@ MmsBoxSettings *mms_load_mmsbox_settings(Cfg *cfg)
m->global_queuedir = cfg_getx(grp, octstr_imm("send-queue-directory")); m->global_queuedir = cfg_getx(grp, octstr_imm("send-queue-directory"));
m->mm1_queuedir = cfg_getx(grp, octstr_imm("mm1-queue-directory")); m->mm1_queuedir = cfg_getx(grp, octstr_imm("mm1-queue-directory"));
m->mm4_queuedir = cfg_getx(grp, octstr_imm("mm4-queue-directory")); m->mm4_queuedir = cfg_getx(grp, octstr_imm("mm4-queue-directory"));
if (mms_init_queuedir(m->mm1_queuedir) < 0)
panic(0, "Failed to initialise local queue directory: %s - %s!",
octstr_get_cstr(m->mm1_queuedir), strerror(errno));
else if (mms_init_queuedir(m->global_queuedir) < 0)
panic(0, "Failed to initialise global queue directory: %s - %s!",
octstr_get_cstr(m->global_queuedir), strerror(errno));
m->mmbox_rootdir = cfg_getx(grp, octstr_imm("mmbox-root-directory")); m->mmbox_rootdir = cfg_getx(grp, octstr_imm("mmbox-root-directory"));
if (cfg_get_integer(&m->maxsendattempts, grp, octstr_imm("maximum-send-attempts")) == -1) if (cfg_get_integer(&m->maxsendattempts, grp, octstr_imm("maximum-send-attempts")) == -1)

View File

@ -37,6 +37,10 @@
#define EAIF_VERSION "3.0" #define EAIF_VERSION "3.0"
/* used by mmbox and queue code -- directory stuff. */
#define _TT "0123456789abcdefghijklmnopqrstuvwxyz"
#define _TTSIZE (-1 + sizeof _TT)
typedef struct MmsProxyRelay { typedef struct MmsProxyRelay {
Octstr *host; Octstr *host;
Octstr *name; Octstr *name;

View File

@ -64,7 +64,7 @@ static void start_push(Octstr *rcpt_to, int isphonenum, MmsEnvelope *e, MmsMsg *
info(0, "mms2mobile.startpush: notification to %s\n", octstr_get_cstr(rcpt_to)); info(0, "mms2mobile.startpush: notification to %s\n", octstr_get_cstr(rcpt_to));
if (!rcpt_to) { if (!rcpt_to) {
error(0, "mobilesender: Queue entry %s has no recipient address!", e->qf.name); error(0, "mobilesender: Queue entry %s has no recipient address!", e->xqfname);
goto done; goto done;
} else } else
to = octstr_duplicate(rcpt_to); to = octstr_duplicate(rcpt_to);
@ -169,12 +169,12 @@ static int receive_push_reply(HTTPCaller *caller)
if (xto) if (xto)
to = xto->rcpt; to = xto->rcpt;
else { else {
error(0, "mobilesender: Queue entry %s has no recipient address!", env->qf.name); error(0, "mobilesender: Queue entry %s has no recipient address!", env->xqfname);
goto push_failed; goto push_failed;
} }
info(0, "send2mobile.push_reply[%s]: From %s, to %s => %d", info(0, "send2mobile.push_reply[%s]: From %s, to %s => %d",
env->qf.name, env->xqfname,
octstr_get_cstr(env->from), octstr_get_cstr(to), http_status); octstr_get_cstr(env->from), octstr_get_cstr(to), http_status);
if (http_status == HTTP_UNAUTHORIZED || if (http_status == HTTP_UNAUTHORIZED ||
@ -239,7 +239,7 @@ static int sendNotify(MmsEnvelope *e)
if (!xto) { if (!xto) {
error(0, "mobilesender: Queue entry %s with no recipients!", error(0, "mobilesender: Queue entry %s with no recipients!",
e->qf.name); e->xqfname);
return 0; return 0;
} }
@ -355,10 +355,10 @@ static int sendNotify(MmsEnvelope *e)
} }
/* To get here means we can send Ind. */ /* To get here means we can send Ind. */
url = mms_makefetchurl(e->qf.name, e->token, MMS_LOC_MQUEUE, url = mms_makefetchurl(e->xqfname, e->token, MMS_LOC_MQUEUE,
to, to,
settings); settings);
transid = mms_maketransid(e->qf.name, settings->host_alias); transid = mms_maketransid(e->xqfname, settings->host_alias);
smsg = mms_notification(msg, e->msize, url, transid, smsg = mms_notification(msg, e->msize, url, transid,
e->expiryt ? e->expiryt : e->expiryt ? e->expiryt :

View File

@ -1565,7 +1565,7 @@ static MmsVasp *find_mm7sender(List *headers, List *vasps)
MmsVasp *m = NULL; MmsVasp *m = NULL;
int i, n; int i, n;
#if 1 #if 0
return list_get(vasps,0); /* XXX for testing... */ return list_get(vasps,0); /* XXX for testing... */
#endif #endif
if (!v || if (!v ||

View File

@ -92,7 +92,8 @@ int main(int argc, char *argv[])
/* Start global queue runner. */ /* Start global queue runner. */
info(0, "Starting Global Queue Runner..."); info(0, "Starting Global Queue Runner...");
qthread = gwthread_create((gwthread_func_t *)mbuni_global_queue_runner, &rstop); qthread = gwthread_create((gwthread_func_t *)mbuni_global_queue_runner, &rstop);
/* Start the local queue runner. */ /* Start the local queue runner. */
info(0, "Starting Local Queue Runner..."); info(0, "Starting Local Queue Runner...");
mbuni_mm1_queue_runner(&rstop); mbuni_mm1_queue_runner(&rstop);