diff --git a/bin/addons/base/ir/ir_cron.py b/bin/addons/base/ir/ir_cron.py index f86577889e4..e4cae8569c7 100644 --- a/bin/addons/base/ir/ir_cron.py +++ b/bin/addons/base/ir/ir_cron.py @@ -97,8 +97,8 @@ class ir_cron(osv.osv, netsvc.Agent): db, pool = pooler.get_db_and_pool(db_name) except: return False + cr = db.cursor() try: - cr = db.cursor() if not pool._init: now = DateTime.now() cr.execute('select * from ir_cron where numbercall<>0 and active and nextcall<=now() order by priority') diff --git a/bin/service/web_services.py b/bin/service/web_services.py index b89fa829266..a71e4ea379a 100644 --- a/bin/service/web_services.py +++ b/bin/service/web_services.py @@ -71,13 +71,14 @@ class db(netsvc.Service): self.actions[id] = {'clean': False} db = sql_db.db_connect('template1') - cr = db.serialized_cursor() + cr = db.cursor() try: - cr.autocommit(True) # XXX inhibit the effect of a serialized cursor. is it what we want ? + cr.autocommit(True) # avoid transaction block cr.execute('CREATE DATABASE "%s" ENCODING \'unicode\'' % db_name) finally: cr.close() - sql_db.close_db('template1') + del db + class DBInitialize(object): def __call__(self, serv, id, db_name, demo, lang, user_password='admin'): cr = None @@ -152,8 +153,8 @@ class db(netsvc.Service): logger = netsvc.Logger() db = sql_db.db_connect('template1') - cr = db.serialized_cursor() - cr.autocommit(True) # XXX inhibit the effect of a serialized cursor. is it what we want ? + cr = db.cursor() + cr.autocommit(True) # avoid transaction block try: try: cr.execute('DROP DATABASE "%s"' % db_name) @@ -165,9 +166,7 @@ class db(netsvc.Service): logger.notifyChannel("web-services", netsvc.LOG_INFO, 'DROP DB: %s' % (db_name)) finally: - cr.commit() cr.close() - sql_db.close_db('template1') return True def _set_pg_psw_env_var(self): @@ -221,13 +220,13 @@ class db(netsvc.Service): raise Exception, "Database already exists" db = sql_db.db_connect('template1') - cr = db.serialized_cursor() - cr.autocommit(True) # XXX inhibit the effect of a serialized cursor. is it what we want ? + cr = db.cursor() + cr.autocommit(True) # avoid transaction block try: cr.execute("""CREATE DATABASE "%s" ENCODING 'unicode' TEMPLATE "template0" """ % db_name) finally: cr.close() - sql_db.close_db('template1') + del db cmd = ['pg_restore', '--no-owner'] if tools.config['db_user']: @@ -266,7 +265,7 @@ class db(netsvc.Service): logger = netsvc.Logger() db = sql_db.db_connect('template1') - cr = db.serialized_cursor() + cr = db.cursor() try: try: cr.execute('ALTER DATABASE "%s" RENAME TO "%s"' % (old_name, new_name)) @@ -282,9 +281,7 @@ class db(netsvc.Service): logger.notifyChannel("web-services", netsvc.LOG_INFO, 'RENAME DB: %s -> %s' % (old_name, new_name)) finally: - cr.commit() cr.close() - sql_db.close_db('template1') return True def db_exist(self, db_name): @@ -319,7 +316,6 @@ class db(netsvc.Service): res = [] finally: cr.close() - sql_db.close_db('template1') res.sort() return res diff --git a/bin/sql_db.py b/bin/sql_db.py index 550ed14555e..48e0c9bb58f 100644 --- a/bin/sql_db.py +++ b/bin/sql_db.py @@ -19,10 +19,12 @@ # ############################################################################## +__all__ = ['db_connect', 'close_db'] + import netsvc from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT, ISOLATION_LEVEL_READ_COMMITTED, ISOLATION_LEVEL_SERIALIZABLE -from psycopg2.pool import ThreadedConnectionPool from psycopg2.psycopg1 import cursor as psycopg1cursor +from psycopg2.pool import PoolError import psycopg2.extensions @@ -48,26 +50,23 @@ psycopg2.extensions.register_type(psycopg2.extensions.new_type((700, 701, 1700,) import tools -import re +from tools.func import wraps +from datetime import datetime as mdt +import threading -from mx import DateTime as mdt +import re re_from = re.compile('.* from "?([a-zA-Z_0-9]+)"? .*$'); re_into = re.compile('.* into "?([a-zA-Z_0-9]+)"? .*$'); + def log(msg, lvl=netsvc.LOG_DEBUG): logger = netsvc.Logger() logger.notifyChannel('sql', lvl, msg) class Cursor(object): IN_MAX = 1000 - sql_from_log = {} - sql_into_log = {} - sql_log = False - count = 0 - - def check(f): - from tools.func import wraps + def check(f): @wraps(f) def wrapper(self, *args, **kwargs): if self.__closed: @@ -75,36 +74,41 @@ class Cursor(object): return f(self, *args, **kwargs) return wrapper - def __init__(self, pool, serialized=False): + def __init__(self, pool, dbname, serialized=False): + self.sql_from_log = {} + self.sql_into_log = {} + self.sql_log = False + self.sql_log_count = 0 + + self.__closed = True # avoid the call of close() (by __del__) if an exception + # is raised by any of the following initialisations self._pool = pool + self.dbname = dbname self._serialized = serialized - self._cnx = pool.getconn() + self._cnx = pool.borrow(dsn(dbname)) self._obj = self._cnx.cursor(cursor_factory=psycopg1cursor) - self.__closed = False + self.__closed = False # real initialisation value self.autocommit(False) - self.dbname = pool.dbname if tools.config['log_level'] in (netsvc.LOG_DEBUG, netsvc.LOG_DEBUG_RPC): from inspect import stack self.__caller = tuple(stack()[2][1:3]) - + def __del__(self): if not self.__closed: + # Oops. 'self' has not been closed explicitly. + # The cursor will be deleted by the garbage collector, + # but the database connection is not put back into the connection + # pool, preventing some operation on the database like dropping it. + # This can also lead to a server overload. if tools.config['log_level'] in (netsvc.LOG_DEBUG, netsvc.LOG_DEBUG_RPC): - # Oops. 'self' has not been closed explicitly. - # The cursor will be deleted by the garbage collector, - # but the database connection is not put back into the connection - # pool, preventing some operation on the database like dropping it. - # This can also lead to a server overload. msg = "Cursor not closed explicitly\n" \ "Cursor was created at %s:%s" % self.__caller - log(msg, netsvc.LOG_WARNING) self.close() @check def execute(self, query, params=None): - self.count+=1 if '%d' in query or '%f' in query: log(query, netsvc.LOG_WARNING) log("SQL queries mustn't contain %d or %f anymore. Use only %s", netsvc.LOG_WARNING) @@ -124,7 +128,7 @@ class Cursor(object): if self.sql_log: log("query: %s" % self._obj.query) - self.count+=1 + self.sql_log_count+=1 res_from = re_from.match(query.lower()) if res_from: self.sql_from_log.setdefault(res_from.group(1), [0, 0]) @@ -138,6 +142,9 @@ class Cursor(object): return res def print_log(self): + if not self.sql_log: + return + def process(type): sqllogs = {'from':self.sql_from_log, 'into':self.sql_into_log} if not sqllogs[type]: @@ -149,17 +156,23 @@ class Cursor(object): for r in sqllogitems: log("table: %s: %s/%s" %(r[0], str(r[1][1]), r[1][0])) sum+= r[1][1] - log("SUM:%s/%d" % (sum, self.count)) + log("SUM:%s/%d" % (sum, self.sql_log_count)) sqllogs[type].clear() process('from') process('into') - self.count = 0 + self.sql_log_count = 0 self.sql_log = False @check def close(self): - self.rollback() # Ensure we close the current transaction. + if not self._obj: + return + self.print_log() + + if not self._serialized: + self.rollback() # Ensure we close the current transaction. + self._obj.close() # This force the cursor to be freed, and thus, available again. It is @@ -169,8 +182,8 @@ class Cursor(object): # part because browse records keep a reference to the cursor. del self._obj self.__closed = True - self._pool.putconn(self._cnx) - + self._pool.give_back(self._cnx) + @check def autocommit(self, on): offlevel = [ISOLATION_LEVEL_READ_COMMITTED, ISOLATION_LEVEL_SERIALIZABLE][bool(self._serialized)] @@ -188,63 +201,128 @@ class Cursor(object): def __getattr__(self, name): return getattr(self._obj, name) + class ConnectionPool(object): - def __init__(self, pool, dbname): + + def locked(fun): + @wraps(fun) + def _locked(self, *args, **kwargs): + self._lock.acquire() + try: + return fun(self, *args, **kwargs) + finally: + self._lock.release() + return _locked + + + def __init__(self, maxconn=64): + self._connections = [] + self._maxconn = max(maxconn, 1) + self._lock = threading.Lock() + self._logger = netsvc.Logger() + + def _log(self, msg): + #self._logger.notifyChannel('ConnectionPool', netsvc.LOG_INFO, msg) + pass + def _debug(self, msg): + #self._logger.notifyChannel('ConnectionPool', netsvc.LOG_DEBUG, msg) + pass + + @locked + def borrow(self, dsn): + self._log('Borrow connection to %s' % (dsn,)) + + result = None + for i, (cnx, used) in enumerate(self._connections): + if not used and cnx.dsn == dsn: + self._debug('Existing connection found at index %d' % i) + + self._connections.pop(i) + self._connections.append((cnx, True)) + + result = cnx + break + if result: + return result + + if len(self._connections) >= self._maxconn: + # try to remove the older connection not used + for i, (cnx, used) in enumerate(self._connections): + if not used: + self._debug('Removing old connection at index %d: %s' % (i, cnx.dsn)) + self._connections.pop(i) + break + else: + # note: this code is called only if the for loop has completed (no break) + raise PoolError('Connection Pool Full') + + self._debug('Create new connection') + result = psycopg2.connect(dsn=dsn) + self._connections.append((result, True)) + return result + + @locked + def give_back(self, connection): + self._log('Give back connection to %s' % (connection.dsn,)) + for i, (cnx, used) in enumerate(self._connections): + if cnx is connection: + self._connections.pop(i) + self._connections.append((cnx, False)) + break + else: + raise PoolError('This connection does not below to the pool') + + @locked + def close_all(self, dsn): + for i, (cnx, used) in tools.reverse_enumerate(self._connections): + if cnx.dsn == dsn: + cnx.close() + self._connections.pop(i) + + +class Connection(object): + __LOCKS = {} + + def __init__(self, pool, dbname, unique=False): self.dbname = dbname self._pool = pool + self._unique = unique + if unique: + if dbname not in self.__LOCKS: + self.__LOCKS[dbname] = threading.Lock() + self.__LOCKS[dbname].acquire() - def cursor(self): - return Cursor(self) + def __del__(self): + if self._unique: + self.__LOCKS[self.dbname].release() + + def cursor(self, serialized=False): + return Cursor(self._pool, self.dbname, serialized=serialized) def serialized_cursor(self): - return Cursor(self, True) + return self.cursor(True) - def __getattr__(self, name): - return getattr(self._pool, name) -class PoolManager(object): - _pools = {} - _dsn = None - maxconn = int(tools.config['db_maxconn']) or 64 - - @classmethod - def dsn(cls, db_name): - if cls._dsn is None: - cls._dsn = '' - for p in ('host', 'port', 'user', 'password'): - cfg = tools.config['db_' + p] - if cfg: - cls._dsn += '%s=%s ' % (p, cfg) - return '%s dbname=%s' % (cls._dsn, db_name) +_dsn = '' +for p in ('host', 'port', 'user', 'password'): + cfg = tools.config['db_' + p] + if cfg: + _dsn += '%s=%s ' % (p, cfg) - @classmethod - def get(cls, db_name): - if db_name not in cls._pools: - logger = netsvc.Logger() - try: - logger.notifyChannel('dbpool', netsvc.LOG_INFO, 'Connecting to %s' % (db_name,)) - cls._pools[db_name] = ConnectionPool(ThreadedConnectionPool(1, cls.maxconn, cls.dsn(db_name)), db_name) - except Exception, e: - logger.notifyChannel('dbpool', netsvc.LOG_ERROR, 'Unable to connect to %s: %s' % - (db_name, str(e))) - raise - return cls._pools[db_name] +def dsn(db_name): + return '%sdbname=%s' % (_dsn, db_name) - @classmethod - def close(cls, db_name): - if db_name in cls._pools: - logger = netsvc.Logger() - logger.notifyChannel('dbpool', netsvc.LOG_INFO, 'Closing all connections to %s' % (db_name,)) - cls._pools[db_name].closeall() - del cls._pools[db_name] + +_Pool = ConnectionPool(int(tools.config['db_maxconn'])) def db_connect(db_name): - return PoolManager.get(db_name) + unique = db_name in ['template1', 'template0'] + return Connection(_Pool, db_name, unique) def close_db(db_name): - PoolManager.close(db_name) + _Pool.close_all(dsn(db_name)) tools.cache.clean_caches_for_db(db_name) - + # vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4: