Added support for postgres database retry query on disconnection to res_config_pgsql

If your postgres connection died suddenly in between res_config_pgsql
queries, the next query will fail because the query is executed on a
disconnected/disconnecting handle.  The query is abandoned and is
returned from in error.

Now we will reconnect and try again if a query was run on a
disconnected connection.

(closes issue #18071)


git-svn-id: https://origsvn.digium.com/svn/asterisk/trunk@300882 65c4cc65-6c06-0410-ace0-fbb531ad65f3
This commit is contained in:
Mark Murawki 2011-01-07 07:47:36 +00:00
parent a58b2fb395
commit a7f9ce2e77
1 changed files with 166 additions and 189 deletions

View File

@ -125,13 +125,134 @@ static void destroy_table(struct tables *table)
ast_free(table);
}
static struct tables *find_table(const char *orig_tablename)
/*! \brief Helper function for pgsql_exec. For running querys, use pgsql_exec()
*
* Connect if not currently connected. Run the given query.
*
* \param database database name we are connected to (used for error logging)
* \param tablename table name we are connected to (used for error logging)
* \param sql sql query string to execute
* \param result pointer for where to store the result handle
*
* \return -1 on fatal query error
* \return -2 on query failure that resulted in disconnection
* \return 0 on success
*
* \example see pgsql_exec for full example
*/
static int _pgsql_exec(const char *database, const char *tablename, const char *sql, PGresult **result)
{
ExecStatusType result_status;
if (!pgsqlConn) {
ast_debug(1, "PostgreSQL connection not defined, connecting\n");
if (pgsql_reconnect(database) != 1) {
ast_log(LOG_NOTICE, "reconnect failed\n");
*result = NULL;
return -1;
}
ast_debug(1, "PostgreSQL connection successful\n");
}
*result = PQexec(pgsqlConn, sql);
result_status = PQresultStatus(*result);
if (result_status != PGRES_COMMAND_OK
&& result_status != PGRES_TUPLES_OK
&& result_status != PGRES_NONFATAL_ERROR) {
ast_log(LOG_ERROR, "PostgreSQL RealTime: Failed to query '%s@%s'.\n", tablename, database);
ast_log(LOG_ERROR, "PostgreSQL RealTime: Query Failed: %s\n", sql);
ast_log(LOG_ERROR, "PostgreSQL RealTime: Query Failed because: %s (%s)\n",
PQresultErrorMessage(*result),
PQresStatus(result_status));
/* we may have tried to run a command on a disconnected/disconnecting handle */
/* are we no longer connected to the database... if not try again */
if (PQstatus(pgsqlConn) != CONNECTION_OK) {
PQfinish(pgsqlConn);
pgsqlConn = NULL;
return -2;
}
/* connection still okay, which means the query is just plain bad */
return -1;
}
ast_debug(1, "PostgreSQL query successful: %s\n", sql);
return 0;
}
/*! \brief Do a postgres query, with reconnection support
*
* Connect if not currently connected. Run the given query
* and if we're disconnected afterwards, reconnect and query again.
*
* \param database database name we are connected to (used for error logging)
* \param tablename table name we are connected to (used for error logging)
* \param sql sql query string to execute
* \param result pointer for where to store the result handle
*
* \return -1 on query failure
* \return 0 on success
*
* \example
* int i, rows;
* PGresult *result;
* char *field_name, *field_type, *field_len, *field_notnull, *field_default;
*
* pgsql_exec("db", "table", "SELECT 1", &result)
*
* rows = PQntuples(result);
* for (i = 0; i < rows; i++) {
* field_name = PQgetvalue(result, i, 0);
* field_type = PQgetvalue(result, i, 1);
* field_len = PQgetvalue(result, i, 2);
* field_notnull = PQgetvalue(result, i, 3);
* field_default = PQgetvalue(result, i, 4);
* }
*
*/
static int pgsql_exec(const char *database, const char *tablename, const char *sql, PGresult **result)
{
int attempts = 0;
int res;
/* Try the query, note failure if any */
/* On first failure, reconnect and try again (_pgsql_exec handles reconnect) */
/* On second failure, treat as fatal query error */
while (attempts++ < 2) {
ast_debug(1, "PostgreSQL query attempt %d\n", attempts);
res = _pgsql_exec(database, tablename, sql, result);
if (res == 0) {
if (attempts > 1) {
ast_log(LOG_NOTICE, "PostgreSQL RealTime: Query finally succeeded: %s\n", sql);
}
return 0;
}
if (res == -1) {
return -1; /* Still connected to db, but could not process query (fatal error) */
}
/* res == -2 (query on a disconnected handle) */
ast_debug(1, "PostgreSQL query attempt %d failed, trying again\n", attempts);
}
return -1;
}
static struct tables *find_table(const char *database, const char *orig_tablename)
{
struct columns *column;
struct tables *table;
struct ast_str *sql = ast_str_thread_get(&findtable_buf, 330);
char *pgerror;
PGresult *result;
PGresult *result;
int exec_result;
char *fname, *ftype, *flen, *fnotnull, *fdef;
int i, rows;
@ -209,11 +330,10 @@ static struct tables *find_table(const char *orig_tablename)
ast_str_set(&sql, 0, "SELECT a.attname, t.typname, a.attlen, a.attnotnull, d.adsrc, a.atttypmod FROM pg_class c, pg_type t, pg_attribute a LEFT OUTER JOIN pg_attrdef d ON a.atthasdef AND d.adrelid = a.attrelid AND d.adnum = a.attnum WHERE c.oid = a.attrelid AND a.atttypid = t.oid AND (a.attnum > 0) AND c.relname = '%s' ORDER BY c.relname, attnum", orig_tablename);
}
result = PQexec(pgsqlConn, ast_str_buffer(sql));
exec_result = pgsql_exec(database, orig_tablename, ast_str_buffer(sql), &result);
ast_debug(1, "Query of table structure complete. Now retrieving results.\n");
if (PQresultStatus(result) != PGRES_TUPLES_OK) {
pgerror = PQresultErrorMessage(result);
ast_log(LOG_ERROR, "Failed to query database columns: %s\n", pgerror);
if (exec_result != 0) {
ast_log(LOG_ERROR, "Failed to query database columns for table %s\n", orig_tablename);
PQclear(result);
AST_LIST_UNLOCK(&psql_tables);
return NULL;
@ -327,7 +447,7 @@ static struct ast_variable *realtime_pgsql(const char *database, const char *tab
ESCAPE_STRING(escapebuf, newval);
if (pgresult) {
ast_log(LOG_ERROR, "Postgres detected invalid input: '%s'\n", newval);
ast_log(LOG_ERROR, "PostgreSQL RealTime: detected invalid input: '%s'\n", newval);
va_end(ap);
return NULL;
}
@ -342,7 +462,7 @@ static struct ast_variable *realtime_pgsql(const char *database, const char *tab
ESCAPE_STRING(escapebuf, newval);
if (pgresult) {
ast_log(LOG_ERROR, "Postgres detected invalid input: '%s'\n", newval);
ast_log(LOG_ERROR, "PostgreSQL RealTime: detected invalid input: '%s'\n", newval);
va_end(ap);
return NULL;
}
@ -353,32 +473,11 @@ static struct ast_variable *realtime_pgsql(const char *database, const char *tab
/* We now have our complete statement; Lets connect to the server and execute it. */
ast_mutex_lock(&pgsql_lock);
if (!pgsql_reconnect(database)) {
ast_mutex_unlock(&pgsql_lock);
return NULL;
}
if (!(result = PQexec(pgsqlConn, ast_str_buffer(sql)))) {
ast_log(LOG_WARNING,
"PostgreSQL RealTime: Failed to query '%s@%s'. Check debug for more info.\n", tablename, database);
ast_debug(1, "PostgreSQL RealTime: Query: %s\n", ast_str_buffer(sql));
ast_debug(1, "PostgreSQL RealTime: Query Failed because: %s\n", PQerrorMessage(pgsqlConn));
if (pgsql_exec(database, tablename, ast_str_buffer(sql), &result) != 0) {
ast_mutex_unlock(&pgsql_lock);
return NULL;
} else {
ExecStatusType result_status = PQresultStatus(result);
if (result_status != PGRES_COMMAND_OK
&& result_status != PGRES_TUPLES_OK
&& result_status != PGRES_NONFATAL_ERROR) {
ast_log(LOG_WARNING,
"PostgreSQL RealTime: Failed to query '%s@%s'. Check debug for more info.\n", tablename, database);
ast_debug(1, "PostgreSQL RealTime: Query: %s\n", ast_str_buffer(sql));
ast_debug(1, "PostgreSQL RealTime: Query Failed because: %s (%s)\n",
PQresultErrorMessage(result), PQresStatus(result_status));
ast_mutex_unlock(&pgsql_lock);
return NULL;
}
}
}
ast_debug(1, "PostgreSQL RealTime: Result=%p Query: %s\n", result, ast_str_buffer(sql));
@ -477,7 +576,7 @@ static struct ast_config *realtime_multi_pgsql(const char *database, const char
ESCAPE_STRING(escapebuf, newval);
if (pgresult) {
ast_log(LOG_ERROR, "Postgres detected invalid input: '%s'\n", newval);
ast_log(LOG_ERROR, "PostgreSQL RealTime: detected invalid input: '%s'\n", newval);
va_end(ap);
return NULL;
}
@ -492,7 +591,7 @@ static struct ast_config *realtime_multi_pgsql(const char *database, const char
ESCAPE_STRING(escapebuf, newval);
if (pgresult) {
ast_log(LOG_ERROR, "Postgres detected invalid input: '%s'\n", newval);
ast_log(LOG_ERROR, "PostgreSQL RealTime: detected invalid input: '%s'\n", newval);
va_end(ap);
return NULL;
}
@ -508,32 +607,11 @@ static struct ast_config *realtime_multi_pgsql(const char *database, const char
/* We now have our complete statement; Lets connect to the server and execute it. */
ast_mutex_lock(&pgsql_lock);
if (!pgsql_reconnect(database)) {
ast_mutex_unlock(&pgsql_lock);
return NULL;
}
if (!(result = PQexec(pgsqlConn, ast_str_buffer(sql)))) {
ast_log(LOG_WARNING,
"PostgreSQL RealTime: Failed to query %s@%s. Check debug for more info.\n", table, database);
ast_debug(1, "PostgreSQL RealTime: Query: %s\n", ast_str_buffer(sql));
ast_debug(1, "PostgreSQL RealTime: Query Failed because: %s\n", PQerrorMessage(pgsqlConn));
if (pgsql_exec(database, table, ast_str_buffer(sql), &result) != 0) {
ast_mutex_unlock(&pgsql_lock);
return NULL;
} else {
ExecStatusType result_status = PQresultStatus(result);
if (result_status != PGRES_COMMAND_OK
&& result_status != PGRES_TUPLES_OK
&& result_status != PGRES_NONFATAL_ERROR) {
ast_log(LOG_WARNING,
"PostgreSQL RealTime: Failed to query %s@%s. Check debug for more info.\n", table, database);
ast_debug(1, "PostgreSQL RealTime: Query: %s\n", ast_str_buffer(sql));
ast_debug(1, "PostgreSQL RealTime: Query Failed because: %s (%s)\n",
PQresultErrorMessage(result), PQresStatus(result_status));
ast_mutex_unlock(&pgsql_lock);
return NULL;
}
}
return NULL;
}
ast_debug(1, "PostgreSQL RealTime: Result=%p Query: %s\n", result, ast_str_buffer(sql));
@ -599,7 +677,7 @@ static int update_pgsql(const char *database, const char *tablename, const char
return -1;
}
if (!(table = find_table(tablename))) {
if (!(table = find_table(database, tablename))) {
ast_log(LOG_ERROR, "Table '%s' does not exist!!\n", tablename);
return -1;
}
@ -636,7 +714,7 @@ static int update_pgsql(const char *database, const char *tablename, const char
ESCAPE_STRING(escapebuf, newval);
if (pgresult) {
ast_log(LOG_ERROR, "Postgres detected invalid input: '%s'\n", newval);
ast_log(LOG_ERROR, "PostgreSQL RealTime: detected invalid input: '%s'\n", newval);
va_end(ap);
release_table(table);
return -1;
@ -653,7 +731,7 @@ static int update_pgsql(const char *database, const char *tablename, const char
ESCAPE_STRING(escapebuf, newval);
if (pgresult) {
ast_log(LOG_ERROR, "Postgres detected invalid input: '%s'\n", newval);
ast_log(LOG_ERROR, "PostgreSQL RealTime: detected invalid input: '%s'\n", newval);
va_end(ap);
release_table(table);
return -1;
@ -666,7 +744,7 @@ static int update_pgsql(const char *database, const char *tablename, const char
ESCAPE_STRING(escapebuf, lookup);
if (pgresult) {
ast_log(LOG_ERROR, "Postgres detected invalid input: '%s'\n", lookup);
ast_log(LOG_ERROR, "PostgreSQL RealTime: detected invalid input: '%s'\n", lookup);
va_end(ap);
return -1;
}
@ -677,31 +755,10 @@ static int update_pgsql(const char *database, const char *tablename, const char
/* We now have our complete statement; Lets connect to the server and execute it. */
ast_mutex_lock(&pgsql_lock);
if (!pgsql_reconnect(database)) {
ast_mutex_unlock(&pgsql_lock);
return -1;
}
if (!(result = PQexec(pgsqlConn, ast_str_buffer(sql)))) {
ast_log(LOG_WARNING,
"PostgreSQL RealTime: Failed to query database. Check debug for more info.\n");
ast_debug(1, "PostgreSQL RealTime: Query: %s\n", ast_str_buffer(sql));
ast_debug(1, "PostgreSQL RealTime: Query Failed because: %s\n", PQerrorMessage(pgsqlConn));
if (pgsql_exec(database, tablename, ast_str_buffer(sql), &result) != 0) {
ast_mutex_unlock(&pgsql_lock);
return -1;
} else {
ExecStatusType result_status = PQresultStatus(result);
if (result_status != PGRES_COMMAND_OK
&& result_status != PGRES_TUPLES_OK
&& result_status != PGRES_NONFATAL_ERROR) {
ast_log(LOG_WARNING,
"PostgreSQL RealTime: Failed to query database. Check debug for more info.\n");
ast_debug(1, "PostgreSQL RealTime: Query: %s\n", ast_str_buffer(sql));
ast_debug(1, "PostgreSQL RealTime: Query Failed because: %s (%s)\n",
PQresultErrorMessage(result), PQresStatus(result_status));
ast_mutex_unlock(&pgsql_lock);
return -1;
}
}
numrows = atoi(PQcmdTuples(result));
@ -741,7 +798,7 @@ static int update2_pgsql(const char *database, const char *tablename, va_list ap
return -1;
}
if (!(table = find_table(tablename))) {
if (!(table = find_table(database, tablename))) {
ast_log(LOG_ERROR, "Table '%s' does not exist!!\n", tablename);
return -1;
}
@ -759,7 +816,7 @@ static int update2_pgsql(const char *database, const char *tablename, va_list ap
newval = va_arg(ap, const char *);
ESCAPE_STRING(escapebuf, newval);
if (pgresult) {
ast_log(LOG_ERROR, "Postgres detected invalid input: '%s'\n", newval);
ast_log(LOG_ERROR, "PostgreSQL RealTime: detected invalid input: '%s'\n", newval);
release_table(table);
ast_free(sql);
return -1;
@ -792,7 +849,7 @@ static int update2_pgsql(const char *database, const char *tablename, va_list ap
ESCAPE_STRING(escapebuf, newval);
if (pgresult) {
ast_log(LOG_ERROR, "Postgres detected invalid input: '%s'\n", newval);
ast_log(LOG_ERROR, "PostgreSQL RealTime: detected invalid input: '%s'\n", newval);
release_table(table);
ast_free(sql);
return -1;
@ -807,33 +864,10 @@ static int update2_pgsql(const char *database, const char *tablename, va_list ap
ast_debug(1, "PostgreSQL RealTime: Update SQL: %s\n", ast_str_buffer(sql));
/* We now have our complete statement; connect to the server and execute it. */
ast_mutex_lock(&pgsql_lock);
if (!pgsql_reconnect(database)) {
if (pgsql_exec(database, tablename, ast_str_buffer(sql), &result) != 0) {
ast_mutex_unlock(&pgsql_lock);
return -1;
}
if (!(result = PQexec(pgsqlConn, ast_str_buffer(sql)))) {
ast_log(LOG_WARNING,
"PostgreSQL RealTime: Failed to query database. Check debug for more info.\n");
ast_debug(1, "PostgreSQL RealTime: Query: %s\n", ast_str_buffer(sql));
ast_debug(1, "PostgreSQL RealTime: Query Failed because: %s\n", PQerrorMessage(pgsqlConn));
ast_mutex_unlock(&pgsql_lock);
return -1;
} else {
ExecStatusType result_status = PQresultStatus(result);
if (result_status != PGRES_COMMAND_OK
&& result_status != PGRES_TUPLES_OK
&& result_status != PGRES_NONFATAL_ERROR) {
ast_log(LOG_WARNING,
"PostgreSQL RealTime: Failed to query database. Check debug for more info.\n");
ast_debug(1, "PostgreSQL RealTime: Query: %s\n", ast_str_buffer(sql));
ast_debug(1, "PostgreSQL RealTime: Query Failed because: %s (%s)\n",
PQresultErrorMessage(result), PQresStatus(result_status));
ast_mutex_unlock(&pgsql_lock);
return -1;
}
}
return -1;
}
numrows = atoi(PQcmdTuples(result));
ast_mutex_unlock(&pgsql_lock);
@ -906,27 +940,10 @@ static int store_pgsql(const char *database, const char *table, va_list ap)
ast_debug(1, "PostgreSQL RealTime: Insert SQL: %s\n", ast_str_buffer(sql1));
if (!(result = PQexec(pgsqlConn, ast_str_buffer(sql1)))) {
ast_log(LOG_WARNING,
"PostgreSQL RealTime: Failed to query database. Check debug for more info.\n");
ast_debug(1, "PostgreSQL RealTime: Query: %s\n", ast_str_buffer(sql1));
ast_debug(1, "PostgreSQL RealTime: Query Failed because: %s\n", PQerrorMessage(pgsqlConn));
if (pgsql_exec(database, table, ast_str_buffer(sql1), &result) != 0) {
ast_mutex_unlock(&pgsql_lock);
return -1;
} else {
ExecStatusType result_status = PQresultStatus(result);
if (result_status != PGRES_COMMAND_OK
&& result_status != PGRES_TUPLES_OK
&& result_status != PGRES_NONFATAL_ERROR) {
ast_log(LOG_WARNING,
"PostgreSQL RealTime: Failed to query database. Check debug for more info.\n");
ast_debug(1, "PostgreSQL RealTime: Query: %s\n", ast_str_buffer(sql1));
ast_debug(1, "PostgreSQL RealTime: Query Failed because: %s (%s)\n",
PQresultErrorMessage(result), PQresStatus(result_status));
ast_mutex_unlock(&pgsql_lock);
return -1;
}
}
return -1;
}
insertid = PQoidValue(result);
ast_mutex_unlock(&pgsql_lock);
@ -997,27 +1014,10 @@ static int destroy_pgsql(const char *database, const char *table, const char *ke
ast_debug(1, "PostgreSQL RealTime: Delete SQL: %s\n", ast_str_buffer(sql));
if (!(result = PQexec(pgsqlConn, ast_str_buffer(sql)))) {
ast_log(LOG_WARNING,
"PostgreSQL RealTime: Failed to query database. Check debug for more info.\n");
ast_debug(1, "PostgreSQL RealTime: Query: %s\n", ast_str_buffer(sql));
ast_debug(1, "PostgreSQL RealTime: Query Failed because: %s\n", PQerrorMessage(pgsqlConn));
if (pgsql_exec(database, table, ast_str_buffer(sql), &result) != 0) {
ast_mutex_unlock(&pgsql_lock);
return -1;
} else {
ExecStatusType result_status = PQresultStatus(result);
if (result_status != PGRES_COMMAND_OK
&& result_status != PGRES_TUPLES_OK
&& result_status != PGRES_NONFATAL_ERROR) {
ast_log(LOG_WARNING,
"PostgreSQL RealTime: Failed to query database. Check debug for more info.\n");
ast_debug(1, "PostgreSQL RealTime: Query: %s\n", ast_str_buffer(sql));
ast_debug(1, "PostgreSQL RealTime: Query Failed because: %s (%s)\n",
PQresultErrorMessage(result), PQresStatus(result_status));
ast_mutex_unlock(&pgsql_lock);
return -1;
}
}
return -1;
}
numrows = atoi(PQcmdTuples(result));
ast_mutex_unlock(&pgsql_lock);
@ -1057,39 +1057,18 @@ static struct ast_config *config_pgsql(const char *database, const char *table,
}
ast_str_set(&sql, 0, "SELECT category, var_name, var_val, cat_metric FROM %s "
"WHERE filename='%s' and commented=0"
"WHERE filename='%s' and commented=0 "
"ORDER BY cat_metric DESC, var_metric ASC, category, var_name ", table, file);
ast_debug(1, "PostgreSQL RealTime: Static SQL: %s\n", ast_str_buffer(sql));
/* We now have our complete statement; Lets connect to the server and execute it. */
ast_mutex_lock(&pgsql_lock);
if (!pgsql_reconnect(database)) {
ast_mutex_unlock(&pgsql_lock);
return NULL;
}
if (!(result = PQexec(pgsqlConn, ast_str_buffer(sql)))) {
ast_log(LOG_WARNING,
"PostgreSQL RealTime: Failed to query '%s@%s'. Check debug for more info.\n", table, database);
ast_debug(1, "PostgreSQL RealTime: Query: %s\n", ast_str_buffer(sql));
ast_debug(1, "PostgreSQL RealTime: Query Failed because: %s\n", PQerrorMessage(pgsqlConn));
/* We now have our complete statement; Lets connect to the server and execute it. */
if (pgsql_exec(database, table, ast_str_buffer(sql), &result) != 0) {
ast_mutex_unlock(&pgsql_lock);
return NULL;
} else {
ExecStatusType result_status = PQresultStatus(result);
if (result_status != PGRES_COMMAND_OK
&& result_status != PGRES_TUPLES_OK
&& result_status != PGRES_NONFATAL_ERROR) {
ast_log(LOG_WARNING,
"PostgreSQL RealTime: Failed to query database. Check debug for more info.\n");
ast_debug(1, "PostgreSQL RealTime: Query: %s\n", ast_str_buffer(sql));
ast_debug(1, "PostgreSQL RealTime: Query Failed because: %s (%s)\n",
PQresultErrorMessage(result), PQresStatus(result_status));
ast_mutex_unlock(&pgsql_lock);
return NULL;
}
}
return NULL;
}
if ((num_rows = PQntuples(result)) > 0) {
int rowIndex = 0;
@ -1135,7 +1114,7 @@ static struct ast_config *config_pgsql(const char *database, const char *table,
static int require_pgsql(const char *database, const char *tablename, va_list ap)
{
struct columns *column;
struct tables *table = find_table(tablename);
struct tables *table = find_table(database, tablename);
char *elm;
int type, size, res = 0;
@ -1236,15 +1215,13 @@ static int require_pgsql(const char *database, const char *tablename, va_list ap
ast_debug(1, "About to lock pgsql_lock (running alter on table '%s' to add column '%s')\n", tablename, elm);
ast_mutex_lock(&pgsql_lock);
if (!pgsql_reconnect(database)) {
ast_mutex_unlock(&pgsql_lock);
ast_log(LOG_ERROR, "Unable to add column: %s\n", ast_str_buffer(sql));
ast_free(sql);
continue;
}
ast_debug(1, "About to run ALTER query on table '%s' to add column '%s'\n", tablename, elm);
result = PQexec(pgsqlConn, ast_str_buffer(sql));
if (pgsql_exec(database, tablename, ast_str_buffer(sql), &result) != 0) {
ast_mutex_unlock(&pgsql_lock);
return -1;
}
ast_debug(1, "Finished running ALTER query on table '%s'\n", tablename);
if (PQresultStatus(result) != PGRES_COMMAND_OK) {
ast_log(LOG_ERROR, "Unable to add column: %s\n", ast_str_buffer(sql));
@ -1537,7 +1514,7 @@ static char *handle_cli_realtime_pgsql_cache(struct ast_cli_entry *e, int cmd, s
AST_LIST_UNLOCK(&psql_tables);
} else if (a->argc == 5) {
/* List of columns */
if ((cur = find_table(a->argv[4]))) {
if ((cur = find_table(cur->name, a->argv[4]))) {
struct columns *col;
ast_cli(a->fd, "Columns for Table Cache '%s':\n", a->argv[4]);
ast_cli(a->fd, "%-20.20s %-20.20s %-3.3s %-8.8s\n", "Name", "Type", "Len", "Nullable");