1
0
Fork 0

*** empty log message ***

This commit is contained in:
bagyenda 2009-03-28 12:29:03 +00:00
parent 828a78bcf3
commit b2ba6dfdb5
4 changed files with 94 additions and 43 deletions

View File

@ -1,3 +1,5 @@
2009-03-28 P. A. Bagyenda <bagyenda@dsmagic.com>
* Improved pgsql-queue module: re-connect to db if connections die, only allocate as many connections as needed
2009-03-05 P. A. Bagyenda <bagyenda@dsmagic.com>
* Added strip_type flag (-x) to mmsfromemail
2009-03-03 P. A. Bagyenda <bagyenda@dsmagic.com>

View File

@ -32,20 +32,23 @@
#define MIN_PG_VERSION 80200 /* v8.2 */
static List *free_conns;
static int pool_size;
static int alloc_conns, max_conns;
static char topdir[512];
/* Control flags: Read from conninfo */
static int external_storage = 1;
static int archive_msgs = 1;
static Octstr *xcinfo = NULL;
static const char *tdirs[] = {"active", "archive", NULL};
static int pgq_init_module(Octstr *conninfo, char *xtopdir, int max_threads)
static int pgq_init_module(Octstr *conninfo, char *xtopdir, int max_connections)
{
long i, n;
Octstr *xcinfo;
gw_assert(conninfo);
long i;
PGconn *c;
int ver = -1;
gw_assert(conninfo);
xcinfo = octstr_duplicate(conninfo);
/* Now look for flags */
@ -61,32 +64,40 @@ static int pgq_init_module(Octstr *conninfo, char *xtopdir, int max_threads)
octstr_destroy(x);
}
n = max_threads + 1;
max_conns = max_connections + 1;
if (n <= 0)
n = DEFAULT_CONNECTIONS;
mms_info(0, "pgsql_queue", NULL, "init: Number of DB connections set to %d", (int)n);
if (max_conns <= 0)
max_conns = DEFAULT_CONNECTIONS + 1;
mms_info(0, "pgsql_queue", NULL, "init: Max # of DB connections set to %d", (int)max_conns);
free_conns = gwlist_create();
alloc_conns = 0; /* none allocated */
gwlist_add_producer(free_conns);
for (i = 0; i<n;i++) {
PGconn *c = PQconnectdb(octstr_get_cstr(xcinfo));
if (c && PQstatus(c) == CONNECTION_OK) {
gwlist_produce(free_conns, c);
pool_size++;
if (ver < 0) {
ver = PQserverVersion(c);
if (ver<MIN_PG_VERSION)
mms_error(0, "pgsql_queue", NULL, "PostgreSQL server must be version >= v%d.%d",
MIN_PG_VERSION/10000,
(MIN_PG_VERSION/100) % 100);
}
} else {
mms_error(0, "pgsql_queue", NULL, "init: failed to connect to db: %s",
PQerrorMessage(c));
PQfinish(c);
}
}
/* Make one connection, just so we konw it works. */
if ((c = PQconnectdb(octstr_get_cstr(xcinfo))) != NULL &&
PQstatus(c) == CONNECTION_OK) {
gwlist_produce(free_conns, c);
alloc_conns++;
if (ver < 0) {
ver = PQserverVersion(c);
if (ver<MIN_PG_VERSION)
mms_warning(0, "pgsql_queue", NULL, "PostgreSQL server must be version >= v%d.%d",
MIN_PG_VERSION/10000,
(MIN_PG_VERSION/100) % 100);
}
} else {
mms_error(0, "pgsql_queue", NULL, "init: failed to connect to db: %s",
PQerrorMessage(c));
PQfinish(c);
octstr_destroy(xcinfo);
gwlist_remove_producer(free_conns);
gwlist_destroy(free_conns, (void *)PQfinish);
free_conns = NULL;
return -1;
}
srand(time(NULL));
@ -129,7 +140,7 @@ static int pgq_init_module(Octstr *conninfo, char *xtopdir, int max_threads)
}
}
octstr_destroy(xcinfo);
return gwlist_len(free_conns) > 0 ? 0 : -1;
}
@ -141,38 +152,71 @@ static int pgq_cleanup_module(void)
sleep(2);
gwlist_destroy(free_conns, (void *)PQfinish);
free_conns = NULL;
pool_size = 0;
alloc_conns = 0;
octstr_destroy(xcinfo);
return 0;
}
#define get_conn() get_conn_real(__FUNCTION__, __FILE__, __LINE__)
#define return_conn(conn) return_conn_real((conn), __FUNCTION__, __FILE__, __LINE__)
#define DEFAULT_PG_WAIT 60 /* fail after 2 minutes. */
#define MAXTRIES 10 /* number of attempts to get a new connection */
static PGconn *get_conn_real(const char *function, const char *file, const int line)
{
PGconn *c;
int num_tries = MAXTRIES;
PGconn *c = NULL;
PGresult *r;
if (free_conns == NULL) return NULL;
c = gwlist_timed_consume(free_conns, DEFAULT_PG_WAIT);
if (free_conns == NULL || xcinfo == NULL) return NULL;
do {
if ((c = gwlist_extract_first(free_conns)) != NULL && /* First try to swipe a connection. */
(PQstatus(c) == CONNECTION_OK))
break;
else if (c) {/* dead connection */
PQfinish(c);
c = NULL;
}
/* We failed to get a connection: if there are not max_connections, allocate one. */
if (alloc_conns < max_conns &&
(c = PQconnectdb(octstr_get_cstr(xcinfo))) != NULL &&
(PQstatus(c) == CONNECTION_OK)) {
alloc_conns++;
/* debug("pg_cp",0, "pg_get_conn> %s:%d, %s => %d", file, line, function, (int)c); */
break;
} else if (alloc_conns < max_conns) { /* we failed to connect, report it. */
mms_error(0, "pgsql_queue", NULL, "get_conn: failed to connect to db: %s",
PQerrorMessage(c));
PQfinish(c);
c = NULL;
}
if (c && PQstatus(c) == CONNECTION_OK) { /* might fail if we are shutting down. */
/* we get here because we don't have any free connections, and we've hit the limit
* of allowed connections, or db connection allocation failed.
*/
if ((c = gwlist_timed_consume(free_conns, DEFAULT_PG_WAIT)) != NULL &&
(PQstatus(c) == CONNECTION_OK))
break;
} while (num_tries-- > 0);
if (c && (PQstatus(c) == CONNECTION_OK)) { /* might fail if we are shutting down. */
r = PQexec(c, "BEGIN"); /* start a transaction. */
PQclear(r);
} else {
mms_error(0, "pgsql_queue", NULL,
"pg_get_conn: Failed to get a free connection [%s] from connection pool! "
" Consider increasing pool size (currently %d)?",
"get_conn: Failed to get a free connection [%s] from connection pool after %d tries "
" Consider increasing max pool size [%d] (allocated [%d], free [%d])?",
c ? "NOT CONNECTED" : "TIMEOUT",
pool_size);
MAXTRIES,
max_conns, alloc_conns, gwlist_len(free_conns));
if (c) {
PQfinish(c);
c = NULL;
}
}
return c;
}
@ -192,8 +236,10 @@ static void return_conn_real(PGconn *c, const char *function, const char *file,
PQclear(r);
if (PQstatus(c) == CONNECTION_OK)
gwlist_produce(free_conns,c);
else
else {
PQfinish(c);
alloc_conns--;
}
}
@ -1137,8 +1183,10 @@ static void pgq_queue_run(char *dir,
PGresult *r;
char *qfname, *qid;
if (c == NULL)
break;
if (c == NULL) {
mms_warning(0, "pgsql_queue", NULL, "No connection allocated in queue_runner. will wait");
goto l1;
}
r = PQexec(c, cmd);
if (PQresultStatus(r) == PGRES_TUPLES_OK && (n = PQntuples(r))> 0)
for (i = 0; i<n;

View File

@ -497,7 +497,8 @@ static void mmsbox_start_mmsc_conn(MmscGrp *m, gwthread_func_t *mmsc_handler_fun
List *errors, List *warnings)
{
if (m->type == CUSTOM_MMSC) {
if (m->fns->start_conn(m, qfs, unified_prefix, strip_prefixes, &m->data) != 0) {
if (m->fns == NULL ||
m->fns->start_conn(m, qfs, unified_prefix, strip_prefixes, &m->data) != 0) {
WARNING("MMSBox: Failed to start custom MMSC [%s]", octstr_get_cstr(m->id));
m->custom_started = 0;
} else

View File

@ -202,7 +202,7 @@ void stop_mmsproxy(void)
http_close_port(settings->port);
if (settings->svc_list & SvcMM7)
http_close_port(settings->mm7port);
if (mm7_thread > 0)
if (mm7_thread >= 0)
gwthread_wakeup(mm7_thread);
mms_info(0, "mmsproxy", NULL, "Signalling shutdown complete.");
}