[IMP] better connection pool

bzr revid: chs@tinyerp.com-20091123170734-usmmwc5uzoik73hs
This commit is contained in:
Christophe Simonis 2009-11-23 18:07:34 +01:00
parent 7d9177fc2d
commit 86beb91507
3 changed files with 159 additions and 85 deletions

View File

@@ -97,8 +97,8 @@ class ir_cron(osv.osv, netsvc.Agent):
db, pool = pooler.get_db_and_pool(db_name)
except:
return False
cr = db.cursor()
try:
cr = db.cursor()
if not pool._init:
now = DateTime.now()
cr.execute('select * from ir_cron where numbercall<>0 and active and nextcall<=now() order by priority')
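The hunk above only moves where the cron loop obtains its database cursor relative to the surrounding try block. With the reworked pool in sql_db (third file below), db.cursor() can raise psycopg2.pool.PoolError once db_maxconn connections are in use, so a caller may want to guard the acquisition explicitly. A minimal, hypothetical sketch of such a guard; the helper name _poll_jobs and its skip-and-retry policy are assumptions, not part of the commit:

import pooler
from psycopg2.pool import PoolError

def _poll_jobs(db_name):
    # Hypothetical illustration, not part of this commit.
    try:
        db, pool = pooler.get_db_and_pool(db_name)
    except Exception:
        return False
    try:
        cr = db.cursor()    # may now raise PoolError('Connection Pool Full')
    except PoolError:
        return False        # pool exhausted: skip this run, retry on the next wakeup
    try:
        cr.execute('select * from ir_cron where numbercall<>0 and active '
                   'and nextcall<=now() order by priority')
        return cr.fetchall()
    finally:
        cr.close()          # hands the pooled connection back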

View File

@@ -71,13 +71,14 @@ class db(netsvc.Service):
self.actions[id] = {'clean': False}
db = sql_db.db_connect('template1')
cr = db.serialized_cursor()
cr = db.cursor()
try:
cr.autocommit(True) # XXX inhibit the effect of a serialized cursor. is it what we want ?
cr.autocommit(True) # avoid transaction block
cr.execute('CREATE DATABASE "%s" ENCODING \'unicode\'' % db_name)
finally:
cr.close()
sql_db.close_db('template1')
del db
class DBInitialize(object):
def __call__(self, serv, id, db_name, demo, lang, user_password='admin'):
cr = None
@@ -152,8 +153,8 @@ class db(netsvc.Service):
logger = netsvc.Logger()
db = sql_db.db_connect('template1')
cr = db.serialized_cursor()
cr.autocommit(True) # XXX inhibit the effect of a serialized cursor. is it what we want ?
cr = db.cursor()
cr.autocommit(True) # avoid transaction block
try:
try:
cr.execute('DROP DATABASE "%s"' % db_name)
@@ -165,9 +166,7 @@ class db(netsvc.Service):
logger.notifyChannel("web-services", netsvc.LOG_INFO,
'DROP DB: %s' % (db_name))
finally:
cr.commit()
cr.close()
sql_db.close_db('template1')
return True
def _set_pg_psw_env_var(self):
@@ -221,13 +220,13 @@ class db(netsvc.Service):
raise Exception, "Database already exists"
db = sql_db.db_connect('template1')
cr = db.serialized_cursor()
cr.autocommit(True) # XXX inhibit the effect of a serialized cursor. is it what we want ?
cr = db.cursor()
cr.autocommit(True) # avoid transaction block
try:
cr.execute("""CREATE DATABASE "%s" ENCODING 'unicode' TEMPLATE "template0" """ % db_name)
finally:
cr.close()
sql_db.close_db('template1')
del db
cmd = ['pg_restore', '--no-owner']
if tools.config['db_user']:
@@ -266,7 +265,7 @@ class db(netsvc.Service):
logger = netsvc.Logger()
db = sql_db.db_connect('template1')
cr = db.serialized_cursor()
cr = db.cursor()
try:
try:
cr.execute('ALTER DATABASE "%s" RENAME TO "%s"' % (old_name, new_name))
@@ -282,9 +281,7 @@ class db(netsvc.Service):
logger.notifyChannel("web-services", netsvc.LOG_INFO,
'RENAME DB: %s -> %s' % (old_name, new_name))
finally:
cr.commit()
cr.close()
sql_db.close_db('template1')
return True
def db_exist(self, db_name):
@@ -319,7 +316,6 @@ class db(netsvc.Service):
res = []
finally:
cr.close()
sql_db.close_db('template1')
res.sort()
return res
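Every administrative call in this file now follows the same shape: open a plain (non-serialized) cursor on template1, switch it to autocommit so the CREATE/DROP/ALTER DATABASE statement does not run inside a transaction block, and simply close the cursor when done; the per-call sql_db.close_db('template1') and cr.commit() teardown disappears because the shared pool keeps those connections reusable. A condensed sketch of that pattern, with a hypothetical database name and helper name:

import sql_db

def _create_database(db_name):
    # Illustrative sketch of the pattern used by the hunks above.
    db = sql_db.db_connect('template1')
    cr = db.cursor()
    try:
        cr.autocommit(True)   # CREATE DATABASE cannot run inside a transaction block
        cr.execute('CREATE DATABASE "%s" ENCODING \'unicode\'' % db_name)
    finally:
        cr.close()            # returns the underlying connection to the shared pool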

View File

@@ -19,10 +19,12 @@
#
##############################################################################
__all__ = ['db_connect', 'close_db']
import netsvc
from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT, ISOLATION_LEVEL_READ_COMMITTED, ISOLATION_LEVEL_SERIALIZABLE
from psycopg2.pool import ThreadedConnectionPool
from psycopg2.psycopg1 import cursor as psycopg1cursor
from psycopg2.pool import PoolError
import psycopg2.extensions
@@ -48,26 +50,23 @@ psycopg2.extensions.register_type(psycopg2.extensions.new_type((700, 701, 1700,)
import tools
import re
from tools.func import wraps
from datetime import datetime as mdt
import threading
from mx import DateTime as mdt
import re
re_from = re.compile('.* from "?([a-zA-Z_0-9]+)"? .*$');
re_into = re.compile('.* into "?([a-zA-Z_0-9]+)"? .*$');
def log(msg, lvl=netsvc.LOG_DEBUG):
logger = netsvc.Logger()
logger.notifyChannel('sql', lvl, msg)
class Cursor(object):
IN_MAX = 1000
sql_from_log = {}
sql_into_log = {}
sql_log = False
count = 0
def check(f):
from tools.func import wraps
def check(f):
@wraps(f)
def wrapper(self, *args, **kwargs):
if self.__closed:
@@ -75,36 +74,41 @@ class Cursor(object):
return f(self, *args, **kwargs)
return wrapper
def __init__(self, pool, serialized=False):
def __init__(self, pool, dbname, serialized=False):
self.sql_from_log = {}
self.sql_into_log = {}
self.sql_log = False
self.sql_log_count = 0
self.__closed = True # avoid the call of close() (by __del__) if an exception
# is raised by any of the following initialisations
self._pool = pool
self.dbname = dbname
self._serialized = serialized
self._cnx = pool.getconn()
self._cnx = pool.borrow(dsn(dbname))
self._obj = self._cnx.cursor(cursor_factory=psycopg1cursor)
self.__closed = False
self.__closed = False # real initialisation value
self.autocommit(False)
self.dbname = pool.dbname
if tools.config['log_level'] in (netsvc.LOG_DEBUG, netsvc.LOG_DEBUG_RPC):
from inspect import stack
self.__caller = tuple(stack()[2][1:3])
def __del__(self):
if not self.__closed:
# Oops. 'self' has not been closed explicitly.
# The cursor will be deleted by the garbage collector,
# but the database connection is not put back into the connection
# pool, preventing some operation on the database like dropping it.
# This can also lead to a server overload.
if tools.config['log_level'] in (netsvc.LOG_DEBUG, netsvc.LOG_DEBUG_RPC):
# Oops. 'self' has not been closed explicitly.
# The cursor will be deleted by the garbage collector,
# but the database connection is not put back into the connection
# pool, preventing some operation on the database like dropping it.
# This can also lead to a server overload.
msg = "Cursor not closed explicitly\n" \
"Cursor was created at %s:%s" % self.__caller
log(msg, netsvc.LOG_WARNING)
self.close()
@check
def execute(self, query, params=None):
self.count+=1
if '%d' in query or '%f' in query:
log(query, netsvc.LOG_WARNING)
log("SQL queries mustn't contain %d or %f anymore. Use only %s", netsvc.LOG_WARNING)
@@ -124,7 +128,7 @@ class Cursor(object):
if self.sql_log:
log("query: %s" % self._obj.query)
self.count+=1
self.sql_log_count+=1
res_from = re_from.match(query.lower())
if res_from:
self.sql_from_log.setdefault(res_from.group(1), [0, 0])
@@ -138,6 +142,9 @@ class Cursor(object):
return res
def print_log(self):
if not self.sql_log:
return
def process(type):
sqllogs = {'from':self.sql_from_log, 'into':self.sql_into_log}
if not sqllogs[type]:
@@ -149,17 +156,23 @@ class Cursor(object):
for r in sqllogitems:
log("table: %s: %s/%s" %(r[0], str(r[1][1]), r[1][0]))
sum+= r[1][1]
log("SUM:%s/%d" % (sum, self.count))
log("SUM:%s/%d" % (sum, self.sql_log_count))
sqllogs[type].clear()
process('from')
process('into')
self.count = 0
self.sql_log_count = 0
self.sql_log = False
@check
def close(self):
self.rollback() # Ensure we close the current transaction.
if not self._obj:
return
self.print_log()
if not self._serialized:
self.rollback() # Ensure we close the current transaction.
self._obj.close()
# This forces the cursor to be freed, and thus, available again. It is
@@ -169,8 +182,8 @@ class Cursor(object):
# part because browse records keep a reference to the cursor.
del self._obj
self.__closed = True
self._pool.putconn(self._cnx)
self._pool.give_back(self._cnx)
@check
def autocommit(self, on):
offlevel = [ISOLATION_LEVEL_READ_COMMITTED, ISOLATION_LEVEL_SERIALIZABLE][bool(self._serialized)]
@@ -188,63 +201,128 @@ class Cursor(object):
def __getattr__(self, name):
return getattr(self._obj, name)
class ConnectionPool(object):
def __init__(self, pool, dbname):
def locked(fun):
@wraps(fun)
def _locked(self, *args, **kwargs):
self._lock.acquire()
try:
return fun(self, *args, **kwargs)
finally:
self._lock.release()
return _locked
def __init__(self, maxconn=64):
self._connections = []
self._maxconn = max(maxconn, 1)
self._lock = threading.Lock()
self._logger = netsvc.Logger()
def _log(self, msg):
#self._logger.notifyChannel('ConnectionPool', netsvc.LOG_INFO, msg)
pass
def _debug(self, msg):
#self._logger.notifyChannel('ConnectionPool', netsvc.LOG_DEBUG, msg)
pass
@locked
def borrow(self, dsn):
self._log('Borrow connection to %s' % (dsn,))
result = None
for i, (cnx, used) in enumerate(self._connections):
if not used and cnx.dsn == dsn:
self._debug('Existing connection found at index %d' % i)
self._connections.pop(i)
self._connections.append((cnx, True))
result = cnx
break
if result:
return result
if len(self._connections) >= self._maxconn:
# try to remove the oldest unused connection
for i, (cnx, used) in enumerate(self._connections):
if not used:
self._debug('Removing old connection at index %d: %s' % (i, cnx.dsn))
self._connections.pop(i)
break
else:
# note: this code is called only if the for loop has completed (no break)
raise PoolError('Connection Pool Full')
self._debug('Create new connection')
result = psycopg2.connect(dsn=dsn)
self._connections.append((result, True))
return result
@locked
def give_back(self, connection):
self._log('Give back connection to %s' % (connection.dsn,))
for i, (cnx, used) in enumerate(self._connections):
if cnx is connection:
self._connections.pop(i)
self._connections.append((cnx, False))
break
else:
raise PoolError('This connection does not belong to the pool')
@locked
def close_all(self, dsn):
for i, (cnx, used) in tools.reverse_enumerate(self._connections):
if cnx.dsn == dsn:
cnx.close()
self._connections.pop(i)
class Connection(object):
__LOCKS = {}
def __init__(self, pool, dbname, unique=False):
self.dbname = dbname
self._pool = pool
self._unique = unique
if unique:
if dbname not in self.__LOCKS:
self.__LOCKS[dbname] = threading.Lock()
self.__LOCKS[dbname].acquire()
def cursor(self):
return Cursor(self)
def __del__(self):
if self._unique:
self.__LOCKS[self.dbname].release()
def cursor(self, serialized=False):
return Cursor(self._pool, self.dbname, serialized=serialized)
def serialized_cursor(self):
return Cursor(self, True)
return self.cursor(True)
def __getattr__(self, name):
return getattr(self._pool, name)
class PoolManager(object):
_pools = {}
_dsn = None
maxconn = int(tools.config['db_maxconn']) or 64
@classmethod
def dsn(cls, db_name):
if cls._dsn is None:
cls._dsn = ''
for p in ('host', 'port', 'user', 'password'):
cfg = tools.config['db_' + p]
if cfg:
cls._dsn += '%s=%s ' % (p, cfg)
return '%s dbname=%s' % (cls._dsn, db_name)
_dsn = ''
for p in ('host', 'port', 'user', 'password'):
cfg = tools.config['db_' + p]
if cfg:
_dsn += '%s=%s ' % (p, cfg)
@classmethod
def get(cls, db_name):
if db_name not in cls._pools:
logger = netsvc.Logger()
try:
logger.notifyChannel('dbpool', netsvc.LOG_INFO, 'Connecting to %s' % (db_name,))
cls._pools[db_name] = ConnectionPool(ThreadedConnectionPool(1, cls.maxconn, cls.dsn(db_name)), db_name)
except Exception, e:
logger.notifyChannel('dbpool', netsvc.LOG_ERROR, 'Unable to connect to %s: %s' %
(db_name, str(e)))
raise
return cls._pools[db_name]
def dsn(db_name):
return '%sdbname=%s' % (_dsn, db_name)
@classmethod
def close(cls, db_name):
if db_name in cls._pools:
logger = netsvc.Logger()
logger.notifyChannel('dbpool', netsvc.LOG_INFO, 'Closing all connections to %s' % (db_name,))
cls._pools[db_name].closeall()
del cls._pools[db_name]
_Pool = ConnectionPool(int(tools.config['db_maxconn']))
def db_connect(db_name):
return PoolManager.get(db_name)
unique = db_name in ['template1', 'template0']
return Connection(_Pool, db_name, unique)
def close_db(db_name):
PoolManager.close(db_name)
_Pool.close_all(dsn(db_name))
tools.cache.clean_caches_for_db(db_name)
# vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4:
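Taken together, the new sql_db replaces the per-database ThreadedConnectionPool managed by PoolManager with a single module-level ConnectionPool shared by all databases and keyed by DSN. A minimal usage sketch under that reading; the database name 'mydb' and the queries are hypothetical:

import sql_db

db = sql_db.db_connect('mydb')      # lightweight Connection bound to the shared pool
cr = db.cursor()                    # borrows a pooled connection with a matching DSN,
                                    # opens a new one, or raises PoolError when
                                    # db_maxconn connections are already in use
try:
    cr.execute('select id from ir_cron')        # illustrative query
    ids = [row[0] for row in cr.fetchall()]
finally:
    cr.close()                      # rolls back and gives the connection back

# serialized_cursor() is now just cursor(serialized=True) on the same pool;
# such cursors are not rolled back automatically on close().
cr = db.serialized_cursor()
try:
    cr.execute('select 1')
    cr.commit()                     # commit on the underlying connection
finally:
    cr.close()

# template0/template1 connections are additionally serialized behind a
# per-database lock (the 'unique' flag), and close_db() evicts every pooled
# connection whose DSN matches that database, e.g. before dropping it.
sql_db.close_db('mydb')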