1
0
Fork 0

Improved pgsql-queue code

This commit is contained in:
bagyenda 2009-03-30 11:45:10 +00:00
parent b2ba6dfdb5
commit 53b7e978e4
1 changed files with 22 additions and 6 deletions

View File

@ -41,6 +41,19 @@ static int archive_msgs = 1;
static Octstr *xcinfo = NULL;
static const char *tdirs[] = {"active", "archive", NULL};
static int conn_status(PGconn *c)
{
if (c == NULL)
return CONNECTION_BAD;
else {
PGresult *r = PQexec(c, "SELECT 1"); /* Ping the database */
int x = (PQresultStatus(r) == PGRES_TUPLES_OK) ? CONNECTION_OK : CONNECTION_BAD;
PQclear(r);
return x;
}
}
static int pgq_init_module(Octstr *conninfo, char *xtopdir, int max_connections)
{
long i;
@ -76,7 +89,7 @@ static int pgq_init_module(Octstr *conninfo, char *xtopdir, int max_connections)
/* Make one connection, just so we konw it works. */
if ((c = PQconnectdb(octstr_get_cstr(xcinfo))) != NULL &&
PQstatus(c) == CONNECTION_OK) {
conn_status(c) == CONNECTION_OK) {
gwlist_produce(free_conns, c);
alloc_conns++;
@ -172,7 +185,7 @@ static PGconn *get_conn_real(const char *function, const char *file, const int l
do {
if ((c = gwlist_extract_first(free_conns)) != NULL && /* First try to swipe a connection. */
(PQstatus(c) == CONNECTION_OK))
(conn_status(c) == CONNECTION_OK))
break;
else if (c) {/* dead connection */
PQfinish(c);
@ -182,7 +195,7 @@ static PGconn *get_conn_real(const char *function, const char *file, const int l
/* We failed to get a connection: if there are not max_connections, allocate one. */
if (alloc_conns < max_conns &&
(c = PQconnectdb(octstr_get_cstr(xcinfo))) != NULL &&
(PQstatus(c) == CONNECTION_OK)) {
(conn_status(c) == CONNECTION_OK)) {
alloc_conns++;
break;
@ -197,11 +210,11 @@ static PGconn *get_conn_real(const char *function, const char *file, const int l
* of allowed connections, or db connection allocation failed.
*/
if ((c = gwlist_timed_consume(free_conns, DEFAULT_PG_WAIT)) != NULL &&
(PQstatus(c) == CONNECTION_OK))
(conn_status(c) == CONNECTION_OK))
break;
} while (num_tries-- > 0);
if (c && (PQstatus(c) == CONNECTION_OK)) { /* might fail if we are shutting down. */
if (c && (conn_status(c) == CONNECTION_OK)) { /* might fail if we are shutting down. */
r = PQexec(c, "BEGIN"); /* start a transaction. */
PQclear(r);
} else {
@ -224,6 +237,8 @@ static void return_conn_real(PGconn *c, const char *function, const char *file,
{
PGresult *r;
gw_assert(c);
/* debug("pg_cp", 0, "pg_release_conn> %s:%d, %s => %d", file, line, function, (int)c); */
if (free_conns == NULL || c == NULL) return;
@ -234,7 +249,8 @@ static void return_conn_real(PGconn *c, const char *function, const char *file,
else
r = PQexec(c, "COMMIT");
PQclear(r);
if (PQstatus(c) == CONNECTION_OK)
if (conn_status(c) == CONNECTION_OK)
gwlist_produce(free_conns,c);
else {
PQfinish(c);