Merge "res_odbc: Implement a connection pool."

This commit is contained in:
Joshua Colp 2016-06-07 12:17:16 -05:00 committed by Gerrit Code Review
commit 33787459c3
3 changed files with 186 additions and 55 deletions

View file

@ -381,6 +381,12 @@ res_hep
valid value using the specified 'uuid_type', the module may fallback to a
more readily available source for the correlation UUID.
res_odbc
------------------
* A new option has been added, 'max_connections', which sets the maximum number
of concurrent connections to the database. This option defaults to 1 which
returns the behavior to that of Asterisk 13.7 and prior.
app_confbridge
------------------
* Added a bridge profile option called regcontext that allows you to

View file

@ -51,6 +51,11 @@ pre-connect => yes
; that we should attempt?
;limit => 5
;
; The maximum number of connections to have open at any given time.
; This defaults to 1 and it is highly recommended to only set this higher
; if using a version of UnixODBC greater than 2.3.1.
;max_connections => 20
;
; When the channel is destroyed, should any uncommitted open transactions
; automatically be committed?
;forcecommit => no

View file

@ -78,10 +78,19 @@ struct odbc_class
unsigned int forcecommit:1; /*!< Should uncommitted transactions be auto-committed on handle release? */
unsigned int isolation; /*!< Flags for how the DB should deal with data in other, uncommitted transactions */
unsigned int conntimeout; /*!< Maximum time the connection process should take */
unsigned int maxconnections; /*!< Maximum number of allowed connections */
/*! When a connection fails, cache that failure for how long? */
struct timeval negative_connection_cache;
/*! When a connection fails, when did that last occur? */
struct timeval last_negative_connect;
/*! A pool of available connections */
AST_LIST_HEAD_NOLOCK(, odbc_obj) connections;
/*! Lock to protect the connections */
ast_mutex_t lock;
/*! Condition to notify any pending connection requesters */
ast_cond_t cond;
/*! The total number of current connections */
size_t connection_cnt;
};
static struct ao2_container *class_container;
@ -90,7 +99,7 @@ static AST_RWLIST_HEAD_STATIC(odbc_tables, odbc_cache_tables);
static odbc_status odbc_obj_connect(struct odbc_obj *obj);
static odbc_status odbc_obj_disconnect(struct odbc_obj *obj);
static int odbc_register_class(struct odbc_class *class, int connect);
static void odbc_register_class(struct odbc_class *class, int connect);
AST_THREADSTORAGE(errors_buf);
@ -157,6 +166,8 @@ int ast_odbc_text2isolation(const char *txt)
static void odbc_class_destructor(void *data)
{
struct odbc_class *class = data;
struct odbc_obj *obj;
/* Due to refcounts, we can safely assume that any objects with a reference
* to us will prevent our destruction, so we don't need to worry about them.
*/
@ -169,7 +180,14 @@ static void odbc_class_destructor(void *data)
if (class->sanitysql) {
ast_free(class->sanitysql);
}
while ((obj = AST_LIST_REMOVE_HEAD(&class->connections, list))) {
ao2_ref(obj, -1);
}
SQLFreeHandle(SQL_HANDLE_ENV, class->env);
ast_mutex_destroy(&class->lock);
ast_cond_destroy(&class->cond);
}
static int null_hash_fn(const void *obj, const int flags)
@ -180,21 +198,23 @@ static int null_hash_fn(const void *obj, const int flags)
static void odbc_obj_destructor(void *data)
{
struct odbc_obj *obj = data;
struct odbc_class *class = obj->parent;
obj->parent = NULL;
odbc_obj_disconnect(obj);
ao2_ref(class, -1);
}
static void destroy_table_cache(struct odbc_cache_tables *table) {
static void destroy_table_cache(struct odbc_cache_tables *table)
{
struct odbc_cache_columns *col;
ast_debug(1, "Destroying table cache for %s\n", table->table);
AST_RWLIST_WRLOCK(&table->columns);
while ((col = AST_RWLIST_REMOVE_HEAD(&table->columns, list))) {
ast_free(col);
}
AST_RWLIST_UNLOCK(&table->columns);
AST_RWLIST_HEAD_DESTROY(&table->columns);
ast_free(table);
}
@ -370,18 +390,19 @@ SQLHSTMT ast_odbc_prepare_and_execute(struct odbc_obj *obj, SQLHSTMT (*prepare_c
* We must therefore redo everything when we establish a new
* connection. */
stmt = prepare_cb(obj, data);
if (!stmt) {
return NULL;
}
if (stmt) {
res = SQLExecute(stmt);
if ((res != SQL_SUCCESS) && (res != SQL_SUCCESS_WITH_INFO) && (res != SQL_NO_DATA)) {
if (res == SQL_ERROR) {
ast_odbc_print_errors(SQL_HANDLE_STMT, stmt, "SQL Execute");
}
ast_log(LOG_WARNING, "SQL Execute error %d!\n", res);
SQLFreeHandle(SQL_HANDLE_STMT, stmt);
stmt = NULL;
res = SQLExecute(stmt);
if ((res != SQL_SUCCESS) && (res != SQL_SUCCESS_WITH_INFO) && (res != SQL_NO_DATA)) {
if (res == SQL_ERROR) {
ast_odbc_print_errors(SQL_HANDLE_STMT, stmt, "SQL Execute");
}
ast_log(LOG_WARNING, "SQL Execute error %d!\n", res);
SQLFreeHandle(SQL_HANDLE_STMT, stmt);
stmt = NULL;
}
return stmt;
@ -468,7 +489,7 @@ static int load_odbc_config(void)
struct ast_variable *v;
char *cat;
const char *dsn, *username, *password, *sanitysql;
int enabled, bse, conntimeout, forcecommit, isolation;
int enabled, bse, conntimeout, forcecommit, isolation, maxconnections;
struct timeval ncache = { 0, 0 };
int preconnect = 0, res = 0;
struct ast_flags config_flags = { 0 };
@ -495,6 +516,7 @@ static int load_odbc_config(void)
conntimeout = 10;
forcecommit = 0;
isolation = SQL_TXN_READ_COMMITTED;
maxconnections = 1;
for (v = ast_variable_browse(config, cat); v; v = v->next) {
if (!strcasecmp(v->name, "pooling") ||
!strncasecmp(v->name, "share", 5) ||
@ -538,6 +560,11 @@ static int load_odbc_config(void)
ast_log(LOG_ERROR, "Unrecognized value for 'isolation': '%s' in section '%s'\n", v->value, cat);
isolation = SQL_TXN_READ_COMMITTED;
}
} else if (!strcasecmp(v->name, "max_connections")) {
if (sscanf(v->value, "%30d", &maxconnections) != 1 || maxconnections < 1) {
ast_log(LOG_WARNING, "max_connections must be a positive integer\n");
maxconnections = 1;
}
}
}
@ -563,6 +590,7 @@ static int load_odbc_config(void)
new->isolation = isolation;
new->conntimeout = conntimeout;
new->negative_connection_cache = ncache;
new->maxconnections = maxconnections;
if (cat)
ast_copy_string(new->name, cat, sizeof(new->name));
@ -581,6 +609,9 @@ static int load_odbc_config(void)
break;
}
ast_mutex_init(&new->lock);
ast_cond_init(&new->cond, NULL);
odbc_register_class(new, preconnect);
ast_log(LOG_NOTICE, "Registered ODBC class '%s' dsn->[%s]\n", cat, dsn);
ao2_ref(new, -1);
@ -641,6 +672,7 @@ static char *handle_cli_odbc_show(struct ast_cli_entry *e, int cmd, struct ast_c
ast_strftime(timestr, sizeof(timestr), "%Y-%m-%d %T", &tm);
ast_cli(a->fd, " Name: %s\n DSN: %s\n", class->name, class->dsn);
ast_cli(a->fd, " Last connection attempt: %s\n", timestr);
ast_cli(a->fd, " Number of active connections: %zd (out of %d)\n", class->connection_cnt, class->maxconnections);
ast_cli(a->fd, "\n");
}
ao2_ref(class, -1);
@ -654,38 +686,47 @@ static struct ast_cli_entry cli_odbc[] = {
AST_CLI_DEFINE(handle_cli_odbc_show, "List ODBC DSN(s)")
};
static int odbc_register_class(struct odbc_class *class, int preconnect)
static void odbc_register_class(struct odbc_class *class, int preconnect)
{
struct odbc_obj *obj;
if (class) {
ao2_link(class_container, class);
/* I still have a reference in the caller, so a deref is NOT missing here. */
if (preconnect) {
/* Request and release builds a connection */
obj = ast_odbc_request_obj(class->name, 0);
if (obj) {
ast_odbc_release_obj(obj);
}
}
ao2_link(class_container, class);
/* I still have a reference in the caller, so a deref is NOT missing here. */
return 0;
} else {
ast_log(LOG_WARNING, "Attempted to register a NULL class?\n");
return -1;
if (!preconnect) {
return;
}
/* Request and release builds a connection */
obj = ast_odbc_request_obj(class->name, 0);
if (obj) {
ast_odbc_release_obj(obj);
}
return;
}
void ast_odbc_release_obj(struct odbc_obj *obj)
{
ast_debug(2, "Releasing ODBC handle %p\n", obj);
struct odbc_class *class = obj->parent;
#ifdef DEBUG_THREADS
obj->file[0] = '\0';
obj->function[0] = '\0';
obj->lineno = 0;
#endif
ao2_ref(obj, -1);
ast_debug(2, "Releasing ODBC handle %p into pool\n", obj);
/* The odbc_obj only holds a reference to the class when it is
* actively being used. This guarantees no circular reference
* between odbc_class and odbc_obj. Since it is being released
* we also release our class reference. If a reload occurred before
* the class will go away automatically once all odbc_obj are
* released back.
*/
obj->parent = NULL;
ast_mutex_lock(&class->lock);
AST_LIST_INSERT_HEAD(&class->connections, obj, list);
ast_cond_signal(&class->cond);
ast_mutex_unlock(&class->lock);
ao2_ref(class, -1);
}
int ast_odbc_backslash_is_escape(struct odbc_obj *obj)
@ -703,6 +744,50 @@ static int aoro2_class_cb(void *obj, void *arg, int flags)
return 0;
}
/*
* \brief Determine if the connection has died.
*
* \param connection The connection to check
* \param class The ODBC class
* \retval 1 Yep, it's dead
* \retval 0 It's alive and well
*/
static int connection_dead(struct odbc_obj *connection, struct odbc_class *class)
{
char *test_sql = "select 1";
SQLINTEGER dead;
SQLRETURN res;
SQLHSTMT stmt;
res = SQLGetConnectAttr(connection->con, SQL_ATTR_CONNECTION_DEAD, &dead, 0, 0);
if (SQL_SUCCEEDED(res)) {
return dead == SQL_CD_TRUE ? 1 : 0;
}
/* If the Driver doesn't support SQL_ATTR_CONNECTION_DEAD do a
* probing query instead
*/
res = SQLAllocHandle(SQL_HANDLE_STMT, connection->con, &stmt);
if (!SQL_SUCCEEDED(res)) {
return 1;
}
if (!ast_strlen_zero(class->sanitysql)) {
test_sql = class->sanitysql;
}
res = SQLPrepare(stmt, (unsigned char *)test_sql, SQL_NTS);
if (!SQL_SUCCEEDED(res)) {
SQLFreeHandle(SQL_HANDLE_STMT, stmt);
return 1;
}
res = SQLExecute(stmt);
SQLFreeHandle(SQL_HANDLE_STMT, stmt);
return SQL_SUCCEEDED(res) ? 0 : 1;
}
struct odbc_obj *_ast_odbc_request_obj2(const char *name, struct ast_flags flags, const char *file, const char *function, int lineno)
{
struct odbc_obj *obj = NULL;
@ -713,17 +798,60 @@ struct odbc_obj *_ast_odbc_request_obj2(const char *name, struct ast_flags flags
return NULL;
}
/* XXX ODBC connection objects do not have shared ownership, so there is no reason
* to use refcounted objects here.
*/
obj = ao2_alloc(sizeof(*obj), odbc_obj_destructor);
/* Inherit reference from the ao2_callback from before */
obj->parent = class;
if (odbc_obj_connect(obj) == ODBC_FAIL) {
ao2_ref(obj, -1);
return NULL;
ast_mutex_lock(&class->lock);
while (!obj) {
obj = AST_LIST_REMOVE_HEAD(&class->connections, list);
if (!obj) {
if (class->connection_cnt < class->maxconnections) {
/* If no connection is immediately available establish a new
* one if allowed. If we try and fail we give up completely as
* we could go into an infinite loop otherwise.
*/
obj = ao2_alloc(sizeof(*obj), odbc_obj_destructor);
if (!obj) {
break;
}
obj->parent = ao2_bump(class);
if (odbc_obj_connect(obj) == ODBC_FAIL) {
ao2_ref(obj->parent, -1);
ao2_ref(obj, -1);
obj = NULL;
break;
}
class->connection_cnt++;
ast_debug(2, "Created ODBC handle %p on class '%s', new count is %zd\n", obj,
name, class->connection_cnt);
} else {
/* Otherwise if we're not allowed to create a new one we
* wait for another thread to give up the connection they
* own.
*/
ast_cond_wait(&class->cond, &class->lock);
}
} else if (connection_dead(obj, class)) {
/* If the connection is dead try to grab another functional one from the
* pool instead of trying to resurrect this one.
*/
ao2_ref(obj, -1);
obj = NULL;
class->connection_cnt--;
ast_debug(2, "ODBC handle %p dead - removing from class '%s', new count is %zd\n",
obj, name, class->connection_cnt);
} else {
/* We successfully grabbed a connection from the pool and all is well!
*/
obj->parent = ao2_bump(class);
ast_debug(2, "Reusing ODBC handle %p from class '%s'\n", obj, name);
}
}
ast_mutex_unlock(&class->lock);
ao2_ref(class, -1);
return obj;
}
@ -755,14 +883,6 @@ static odbc_status odbc_obj_disconnect(struct odbc_obj *obj)
obj->con = NULL;
res = SQLDisconnect(con);
if (obj->parent) {
if (res == SQL_SUCCESS || res == SQL_SUCCESS_WITH_INFO) {
ast_debug(3, "Disconnected %d from %s [%s](%p)\n", res, obj->parent->name, obj->parent->dsn, obj);
} else {
ast_debug(3, "res_odbc: %s [%s](%p) already disconnected\n", obj->parent->name, obj->parent->dsn, obj);
}
}
if ((res = SQLFreeHandle(SQL_HANDLE_DBC, con)) == SQL_SUCCESS) {
ast_debug(3, "Database handle %p (connection %p) deallocated\n", obj, con);
} else {