From b2ba6dfdb56ebdaefebd96c0337630cfc0837e28 Mon Sep 17 00:00:00 2001 From: bagyenda <> Date: Sat, 28 Mar 2009 12:29:03 +0000 Subject: [PATCH] *** empty log message *** --- mbuni/ChangeLog | 2 + mbuni/extras/pgsql-queue/mms_pgsql_queue.c | 130 ++++++++++++++------- mbuni/mmsbox/mmsbox_cfg.c | 3 +- mbuni/mmsc/mmsproxy.c | 2 +- 4 files changed, 94 insertions(+), 43 deletions(-) diff --git a/mbuni/ChangeLog b/mbuni/ChangeLog index e77f3bf..fa4c005 100644 --- a/mbuni/ChangeLog +++ b/mbuni/ChangeLog @@ -1,3 +1,5 @@ +2009-03-28 P. A. Bagyenda + * Improved pgsql-queue module: re-connect to db if connections die, only allocate as many connections as needed 2009-03-05 P. A. Bagyenda * Added strip_type flag (-x) to mmsfromemail 2009-03-03 P. A. Bagyenda diff --git a/mbuni/extras/pgsql-queue/mms_pgsql_queue.c b/mbuni/extras/pgsql-queue/mms_pgsql_queue.c index 42d0d2b..312bafc 100644 --- a/mbuni/extras/pgsql-queue/mms_pgsql_queue.c +++ b/mbuni/extras/pgsql-queue/mms_pgsql_queue.c @@ -32,20 +32,23 @@ #define MIN_PG_VERSION 80200 /* v8.2 */ static List *free_conns; -static int pool_size; +static int alloc_conns, max_conns; static char topdir[512]; /* Control flags: Read from conninfo */ static int external_storage = 1; static int archive_msgs = 1; + +static Octstr *xcinfo = NULL; static const char *tdirs[] = {"active", "archive", NULL}; -static int pgq_init_module(Octstr *conninfo, char *xtopdir, int max_threads) +static int pgq_init_module(Octstr *conninfo, char *xtopdir, int max_connections) { - long i, n; - Octstr *xcinfo; - gw_assert(conninfo); + long i; + + PGconn *c; int ver = -1; + gw_assert(conninfo); xcinfo = octstr_duplicate(conninfo); /* Now look for flags */ @@ -61,32 +64,40 @@ static int pgq_init_module(Octstr *conninfo, char *xtopdir, int max_threads) octstr_destroy(x); } - n = max_threads + 1; + max_conns = max_connections + 1; - if (n <= 0) - n = DEFAULT_CONNECTIONS; - mms_info(0, "pgsql_queue", NULL, "init: Number of DB connections set to %d", (int)n); + if (max_conns <= 0) + max_conns = DEFAULT_CONNECTIONS + 1; + mms_info(0, "pgsql_queue", NULL, "init: Max # of DB connections set to %d", (int)max_conns); free_conns = gwlist_create(); + alloc_conns = 0; /* none allocated */ gwlist_add_producer(free_conns); - for (i = 0; i= v%d.%d", - MIN_PG_VERSION/10000, - (MIN_PG_VERSION/100) % 100); - } - } else { - mms_error(0, "pgsql_queue", NULL, "init: failed to connect to db: %s", - PQerrorMessage(c)); - PQfinish(c); - } - } + /* Make one connection, just so we konw it works. */ + + if ((c = PQconnectdb(octstr_get_cstr(xcinfo))) != NULL && + PQstatus(c) == CONNECTION_OK) { + gwlist_produce(free_conns, c); + alloc_conns++; + + if (ver < 0) { + ver = PQserverVersion(c); + if (ver= v%d.%d", + MIN_PG_VERSION/10000, + (MIN_PG_VERSION/100) % 100); + } + } else { + mms_error(0, "pgsql_queue", NULL, "init: failed to connect to db: %s", + PQerrorMessage(c)); + PQfinish(c); + octstr_destroy(xcinfo); + gwlist_remove_producer(free_conns); + gwlist_destroy(free_conns, (void *)PQfinish); + free_conns = NULL; + + return -1; + } srand(time(NULL)); @@ -129,7 +140,7 @@ static int pgq_init_module(Octstr *conninfo, char *xtopdir, int max_threads) } } - octstr_destroy(xcinfo); + return gwlist_len(free_conns) > 0 ? 0 : -1; } @@ -141,38 +152,71 @@ static int pgq_cleanup_module(void) sleep(2); gwlist_destroy(free_conns, (void *)PQfinish); free_conns = NULL; - pool_size = 0; + alloc_conns = 0; + octstr_destroy(xcinfo); return 0; } #define get_conn() get_conn_real(__FUNCTION__, __FILE__, __LINE__) #define return_conn(conn) return_conn_real((conn), __FUNCTION__, __FILE__, __LINE__) #define DEFAULT_PG_WAIT 60 /* fail after 2 minutes. */ +#define MAXTRIES 10 /* number of attempts to get a new connection */ + static PGconn *get_conn_real(const char *function, const char *file, const int line) { - PGconn *c; + int num_tries = MAXTRIES; + PGconn *c = NULL; PGresult *r; - - if (free_conns == NULL) return NULL; - c = gwlist_timed_consume(free_conns, DEFAULT_PG_WAIT); + if (free_conns == NULL || xcinfo == NULL) return NULL; + + do { + if ((c = gwlist_extract_first(free_conns)) != NULL && /* First try to swipe a connection. */ + (PQstatus(c) == CONNECTION_OK)) + break; + else if (c) {/* dead connection */ + PQfinish(c); + c = NULL; + } + + /* We failed to get a connection: if there are not max_connections, allocate one. */ + if (alloc_conns < max_conns && + (c = PQconnectdb(octstr_get_cstr(xcinfo))) != NULL && + (PQstatus(c) == CONNECTION_OK)) { + alloc_conns++; - /* debug("pg_cp",0, "pg_get_conn> %s:%d, %s => %d", file, line, function, (int)c); */ + break; + } else if (alloc_conns < max_conns) { /* we failed to connect, report it. */ + mms_error(0, "pgsql_queue", NULL, "get_conn: failed to connect to db: %s", + PQerrorMessage(c)); + PQfinish(c); + c = NULL; + } - if (c && PQstatus(c) == CONNECTION_OK) { /* might fail if we are shutting down. */ + /* we get here because we don't have any free connections, and we've hit the limit + * of allowed connections, or db connection allocation failed. + */ + if ((c = gwlist_timed_consume(free_conns, DEFAULT_PG_WAIT)) != NULL && + (PQstatus(c) == CONNECTION_OK)) + break; + } while (num_tries-- > 0); + + if (c && (PQstatus(c) == CONNECTION_OK)) { /* might fail if we are shutting down. */ r = PQexec(c, "BEGIN"); /* start a transaction. */ PQclear(r); } else { mms_error(0, "pgsql_queue", NULL, - "pg_get_conn: Failed to get a free connection [%s] from connection pool! " - " Consider increasing pool size (currently %d)?", + "get_conn: Failed to get a free connection [%s] from connection pool after %d tries " + " Consider increasing max pool size [%d] (allocated [%d], free [%d])?", c ? "NOT CONNECTED" : "TIMEOUT", - pool_size); + MAXTRIES, + max_conns, alloc_conns, gwlist_len(free_conns)); if (c) { PQfinish(c); c = NULL; } } + return c; } @@ -192,8 +236,10 @@ static void return_conn_real(PGconn *c, const char *function, const char *file, PQclear(r); if (PQstatus(c) == CONNECTION_OK) gwlist_produce(free_conns,c); - else + else { PQfinish(c); + alloc_conns--; + } } @@ -1137,8 +1183,10 @@ static void pgq_queue_run(char *dir, PGresult *r; char *qfname, *qid; - if (c == NULL) - break; + if (c == NULL) { + mms_warning(0, "pgsql_queue", NULL, "No connection allocated in queue_runner. will wait"); + goto l1; + } r = PQexec(c, cmd); if (PQresultStatus(r) == PGRES_TUPLES_OK && (n = PQntuples(r))> 0) for (i = 0; itype == CUSTOM_MMSC) { - if (m->fns->start_conn(m, qfs, unified_prefix, strip_prefixes, &m->data) != 0) { + if (m->fns == NULL || + m->fns->start_conn(m, qfs, unified_prefix, strip_prefixes, &m->data) != 0) { WARNING("MMSBox: Failed to start custom MMSC [%s]", octstr_get_cstr(m->id)); m->custom_started = 0; } else diff --git a/mbuni/mmsc/mmsproxy.c b/mbuni/mmsc/mmsproxy.c index f71c161..d2c60f9 100644 --- a/mbuni/mmsc/mmsproxy.c +++ b/mbuni/mmsc/mmsproxy.c @@ -202,7 +202,7 @@ void stop_mmsproxy(void) http_close_port(settings->port); if (settings->svc_list & SvcMM7) http_close_port(settings->mm7port); - if (mm7_thread > 0) + if (mm7_thread >= 0) gwthread_wakeup(mm7_thread); mms_info(0, "mmsproxy", NULL, "Signalling shutdown complete."); }