From 299627a14408ce297277d2adfce2e4bdffa1fcbc Mon Sep 17 00:00:00 2001 From: Christophe Simonis Date: Mon, 8 Dec 2008 15:03:42 +0100 Subject: [PATCH] [IMP] use psycopg2 instead of psycopg1 bzr revid: christophe@tinyerp.com-20081208140342-qstsqxm0xr5rppzz --- bin/netsvc.py | 1 + bin/openerp-server.py | 27 +------- bin/osv/fields.py | 4 +- bin/osv/osv.py | 4 +- bin/pooler.py | 54 ++++----------- bin/sql_db.py | 149 ++++++++++++++++++++++++------------------ bin/tools/misc.py | 3 - 7 files changed, 101 insertions(+), 141 deletions(-) diff --git a/bin/netsvc.py b/bin/netsvc.py index daa29788e07..31832004067 100644 --- a/bin/netsvc.py +++ b/bin/netsvc.py @@ -206,6 +206,7 @@ class Logger(object): log = logging.getLogger(name) getattr(log, level)(msg) +init_logger() class Agent(object): _timers = [] diff --git a/bin/openerp-server.py b/bin/openerp-server.py index 11a5b32705c..f353dc236b9 100755 --- a/bin/openerp-server.py +++ b/bin/openerp-server.py @@ -53,9 +53,6 @@ __version__ = release.version # get logger #---------------------------------------------------------- import netsvc - -netsvc.init_logger() - logger = netsvc.Logger() #----------------------------------------------------------------------- @@ -81,35 +78,13 @@ dispatcher.monitor(signal.SIGINT) #--------------------------------------------------------------- # connect to the database and initialize it with base if needed #--------------------------------------------------------------- -import psycopg import pooler -db_name = tools.config["db_name"] - -# test whether it is needed to initialize the db (the db is empty) -#try: -# cr = pooler.get_db_only(db_name).cursor() -#except psycopg.OperationalError: -# logger.notifyChannel("init", netsvc.LOG_INFO, "could not connect to database '%s'!" % db_name,) -# cr = None -#if cr: -# cr.execute("SELECT relname FROM pg_class WHERE relkind='r' AND relname='ir_ui_menu'") -# if len(cr.fetchall())==0: -##if False: -# logger.notifyChannel("init", netsvc.LOG_INFO, "init db") -# tools.init_db(cr) -# # in that case, force --init=all -# tools.config["init"]["all"] = 1 -# tools.config['update']['all'] = 1 -# if not tools.config['without_demo']: -# tools.config["demo"]['all'] = 1 -# cr.close() - #---------------------------------------------------------- # launch modules install/upgrade/removes if needed #---------------------------------------------------------- if tools.config['upgrade']: - print 'Upgrading new modules...' + logger.notifyChannel('init', netsvc.LOG_INFO, 'Upgrading new modules...') import tools.upgrade (toinit, toupdate) = tools.upgrade.upgrade() for m in toinit: diff --git a/bin/osv/fields.py b/bin/osv/fields.py index e8c0c3b1ede..097697cb3e1 100644 --- a/bin/osv/fields.py +++ b/bin/osv/fields.py @@ -35,7 +35,7 @@ import string import netsvc -import psycopg +from psycopg2 import Binary import warnings import tools @@ -195,7 +195,7 @@ class time(_column): class binary(_column): _type = 'binary' _symbol_c = '%s' - _symbol_f = lambda symb: symb and psycopg.Binary(symb) or None + _symbol_f = lambda symb: symb and Binary(symb) or None _symbol_set = (_symbol_c, _symbol_f) _classic_read = False diff --git a/bin/osv/osv.py b/bin/osv/osv.py index 2efc3a41a3c..9e6d4171c37 100644 --- a/bin/osv/osv.py +++ b/bin/osv/osv.py @@ -30,7 +30,7 @@ import pooler import copy import sys -import psycopg +from psycopg2 import IntegrityError from netsvc import Logger, LOG_ERROR from tools.misc import UpdateableDict @@ -87,7 +87,7 @@ class osv_pool(netsvc.Service): self.abortResponse(1, inst.name, 'warning', inst.value) except except_osv, inst: self.abortResponse(1, inst.name, inst.exc_type, inst.value) - except psycopg.IntegrityError, inst: + except IntegrityError, inst: for key in self._sql_error.keys(): if key in inst[0]: self.abortResponse(1, 'Constraint Error', 'warning', self._sql_error[key]) diff --git a/bin/pooler.py b/bin/pooler.py index c838641f01d..132e99b5089 100644 --- a/bin/pooler.py +++ b/bin/pooler.py @@ -20,30 +20,19 @@ # ############################################################################## -import sql_db -import osv.osv -import tools -import addons -import netsvc - -db_dic = {} pool_dic = {} - def get_db_and_pool(db_name, force_demo=False, status=None, update_module=False): if not status: status={} - if db_name in db_dic: - db = db_dic[db_name] - else: - logger = netsvc.Logger() - logger.notifyChannel('pooler', netsvc.LOG_INFO, 'Connecting to %s' % (db_name.lower())) - db = sql_db.db_connect(db_name) - db_dic[db_name] = db + + db = get_db_only(db_name) if db_name in pool_dic: pool = pool_dic[db_name] else: + import addons + import osv.osv pool = osv.osv.osv_pool() pool_dic[db_name] = pool addons.load_modules(db, force_demo, status, update_module) @@ -60,49 +49,28 @@ def get_db_and_pool(db_name, force_demo=False, status=None, update_module=False) def restart_pool(db_name, force_demo=False, update_module=False): -# del db_dic[db_name] del pool_dic[db_name] return get_db_and_pool(db_name, force_demo, update_module=update_module) -def close_db(db_name): - if db_name in db_dic: - db_dic[db_name].truedb.close() - del db_dic[db_name] - if db_name in pool_dic: - del pool_dic[db_name] - - def get_db_only(db_name): - if db_name in db_dic: - db = db_dic[db_name] - else: - db = sql_db.db_connect(db_name) - db_dic[db_name] = db + # ATTENTION: + # do not put this import outside this function + # sql_db must not be loaded before the logger is initialized. + # sql_db import psycopg2.tool which create a default logger if there is not. + # this resulting of having the logs outputed twice... + import sql_db + db = sql_db.db_connect(db_name) return db def get_db(db_name): -# print "get_db", db_name return get_db_and_pool(db_name)[0] def get_pool(db_name, force_demo=False, status=None, update_module=False): -# print "get_pool", db_name pool = get_db_and_pool(db_name, force_demo, status, update_module)[1] -# addons.load_modules(db_name, False) -# if not pool.obj_list(): -# pool.instanciate() -# print "pool", pool return pool -# return get_db_and_pool(db_name)[1] - - -def init(): - global db -# db = get_db_only(tools.config['db_name']) - sql_db.init() - # vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4: diff --git a/bin/sql_db.py b/bin/sql_db.py index c43fed6d2e7..e52b94d89fa 100644 --- a/bin/sql_db.py +++ b/bin/sql_db.py @@ -20,38 +20,40 @@ # ############################################################################## -import psycopg +import netsvc +from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT, ISOLATION_LEVEL_SERIALIZABLE +from psycopg2.pool import ThreadedConnectionPool +from psycopg2.psycopg1 import cursor as psycopg1cursor + +import psycopg2.extensions +psycopg2.extensions.register_type(psycopg2.extensions.UNICODE) + import tools -import sys,os - -#try: -# import decimal -#except ImportError: -# from tools import decimal - - import re from mx import DateTime as mdt re_from = re.compile('.* from "?([a-zA-Z_0-9]+)"? .*$'); re_into = re.compile('.* into "?([a-zA-Z_0-9]+)"? .*$'); -class fake_cursor: +def log(msg, lvl=netsvc.LOG_DEBUG): + logger = netsvc.Logger() + logger.notifyChannel('sql', lvl, msg) + +class Cursor(object): IN_MAX = 1000 - nbr = 0 - _tables = {} sql_from_log = {} sql_into_log = {} sql_log = False count = 0 - def __init__(self, db, con, dbname): - self.db = db - self.obj = db.cursor() - self.con = con - self.dbname = dbname + def __init__(self, pool): + self._pool = pool + self._cnx = pool.getconn() + self.autocommit(False) + self._obj = self._cnx.cursor(cursor_factory=psycopg1cursor) + self.dbname = pool.dbname - def execute(self, sql, params=None): + def execute(self, query, params=None): if not params: params=() def base_string(s): @@ -59,24 +61,28 @@ class fake_cursor: return s.encode('utf-8') return s p=map(base_string, params) - if isinstance(sql, unicode): - sql = sql.encode('utf-8') + query = base_string(query) + + if '%d' in query or '%f' in query: + log(queyr, netsvc.LOG_WARNING) + log("SQL queries mustn't containt %d or %f anymore. Use only %s", netsvc.LOG_WARNING) + query = query.replace('%d', '%s').replace('%f', '%s') + if self.sql_log: now = mdt.now() - print "SQL LOG query:", sql - print "SQL LOG params:", repr(p) - if p: - res = self.obj.execute(sql, p) - else: - res = self.obj.execute(sql) + log("SQL LOG query: %s" % (query,)) + log("SQL LOG params: %r" % (p,)) + + res = self._obj.execute(query, p) + if self.sql_log: self.count+=1 - res_from = re_from.match(sql.lower()) + res_from = re_from.match(query.lower()) if res_from: self.sql_from_log.setdefault(res_from.group(1), [0, 0]) self.sql_from_log[res_from.group(1)][0] += 1 self.sql_from_log[res_from.group(1)][1] += mdt.now() - now - res_into = re_into.match(sql.lower()) + res_into = re_into.match(query.lower()) if res_into: self.sql_into_log.setdefault(res_into.group(1), [0, 0]) self.sql_into_log[res_into.group(1)][0] += 1 @@ -84,7 +90,7 @@ class fake_cursor: return res def print_log(self, type='from'): - print "SQL LOG %s:" % (type,) + log("SQL LOG %s:" % (type,)) if type == 'from': logs = self.sql_from_log.items() else: @@ -92,63 +98,76 @@ class fake_cursor: logs.sort(lambda x, y: cmp(x[1][1], y[1][1])) sum=0 for r in logs: - print "table:", r[0], ":", str(r[1][1]), "/", r[1][0] + log("table: %s: %s/%s" %(r[0], str(r[1][1]), r[1][0])) sum+= r[1][1] - print "SUM:%s/%d"% (sum, self.count) + log("SUM:%s/%d" % (sum, self.count)) def close(self): if self.sql_log: self.print_log('from') self.print_log('into') - self.obj.close() + self._obj.close() # This force the cursor to be freed, and thus, available again. It is # important because otherwise we can overload the server very easily # because of a cursor shortage (because cursors are not garbage # collected as fast as they should). The problem is probably due in # part because browse records keep a reference to the cursor. - del self.obj + del self._obj + self._pool.putconn(self._cnx) + + def autocommit(self, on): + self._cnx.set_isolation_level([ISOLATION_LEVEL_SERIALIZABLE, ISOLATION_LEVEL_AUTOCOMMIT][bool(on)]) + + def commit(self): + return self._cnx.commit() + + def rollback(self): + return self._cnx.rollback() def __getattr__(self, name): - return getattr(self.obj, name) + return getattr(self._obj, name) -class fakedb: - def __init__(self, truedb, dbname): - self.truedb = truedb +class ConnectionPool(object): + def __init__(self, pool, dbname): self.dbname = dbname + self._pool = pool def cursor(self): - return fake_cursor(self.truedb, {}, self.dbname) + return Cursor(self) -def decimalize(symb): - if symb is None: return None - if isinstance(symb, float): - return decimal.Decimal('%f' % symb) - return decimal.Decimal(symb) + def __getattr__(self, name): + return getattr(self._pool, name) + +class PoolManager(object): + _pools = {} + _dsn = None + maxconn = int(tools.config['db_maxconn']) or 64 + + def dsn(db_name): + if PoolManager._dsn is None: + PoolManager._dsn = '' + for p in ('host', 'port', 'user', 'password'): + cfg = tools.config['db_' + p] + if cfg: + PoolManager._dsn += '%s=%s ' % (p, cfg) + return '%s dbname=%s' % (PoolManager._dsn, db_name) + dsn = staticmethod(dsn) + + def get(db_name): + if db_name not in PoolManager._pools: + logger = netsvc.Logger() + try: + logger.notifyChannel('dbpool', netsvc.LOG_INFO, 'Connecting to %s' % (db_name.lower())) + PoolManager._pools[db_name] = ConnectionPool(ThreadedConnectionPool(0, PoolManager.maxconn, PoolManager.dsn(db_name)), db_name) + except Exception, e: + logger.notifyChannel('dbpool', netsvc.LOG_CRITICAL, 'Unable to connect to %s: %r' % (db_name, e)) + raise + return PoolManager._pools[db_name] + get = staticmethod(get) def db_connect(db_name, serialize=0): - host = tools.config['db_host'] and "host=%s" % tools.config['db_host'] or '' - port = tools.config['db_port'] and "port=%s" % tools.config['db_port'] or '' - name = "dbname=%s" % db_name - user = tools.config['db_user'] and "user=%s" % tools.config['db_user'] or '' - password = tools.config['db_password'] and "password=%s" % tools.config['db_password'] or '' - maxconn = int(tools.config['db_maxconn']) or 64 - tdb = psycopg.connect('%s %s %s %s %s' % (host, port, name, user, password), - serialize=serialize, maxconn=maxconn) - fdb = fakedb(tdb, db_name) - return fdb - -def init(): - #define DATEOID 1082, define TIMESTAMPOID 1114 see pgtypes.h - psycopg.register_type(psycopg.new_type((1082,), "date", lambda x:x)) - psycopg.register_type(psycopg.new_type((1083,), "time", lambda x:x)) - psycopg.register_type(psycopg.new_type((1114,), "datetime", lambda x:x)) - #psycopg.register_type(psycopg.new_type((700, 701, 1700), 'decimal', decimalize)) - -psycopg.register_type(psycopg.new_type((1082,), "date", lambda x:x)) -psycopg.register_type(psycopg.new_type((1083,), "time", lambda x:x)) -psycopg.register_type(psycopg.new_type((1114,), "datetime", lambda x:x)) - + return PoolManager.get(db_name) # vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4: diff --git a/bin/tools/misc.py b/bin/tools/misc.py index 6c6efc5438b..212501a8703 100644 --- a/bin/tools/misc.py +++ b/bin/tools/misc.py @@ -27,10 +27,7 @@ Miscelleanous tools used by OpenERP. import os, time, sys import inspect -import psycopg -#import netsvc from config import config -#import tools import zipfile import release