[FIX] allow `--log-db` to connect to a different PostgreSQL server than the one set in the config

This commit is contained in:
Christophe Simonis 2014-08-25 18:52:50 +02:00
parent c6df857533
commit 415525cecc
2 changed files with 60 additions and 52 deletions

View File

@ -80,7 +80,7 @@ class PostgreSQLHandler(logging.Handler):
dbname = tools.config['log_db'] or ct_db dbname = tools.config['log_db'] or ct_db
if not dbname: if not dbname:
return return
with tools.ignore(Exception), tools.mute_logger('openerp.sql_db'), sql_db.db_connect(dbname).cursor() as cr: with tools.ignore(Exception), tools.mute_logger('openerp.sql_db'), sql_db.db_connect(dbname, allow_uri=True).cursor() as cr:
msg = tools.ustr(record.msg) msg = tools.ustr(record.msg)
if record.args: if record.args:
msg = msg % record.args msg = msg % record.args

View File

@ -30,6 +30,7 @@ the ORM does, in fact.
from contextlib import contextmanager from contextlib import contextmanager
from functools import wraps from functools import wraps
import logging import logging
import urlparse
import uuid import uuid
import psycopg2.extras import psycopg2.extras
import psycopg2.extensions import psycopg2.extensions
@ -47,11 +48,13 @@ types_mapping = {
} }
def unbuffer(symb, cr): def unbuffer(symb, cr):
if symb is None: return None if symb is None:
return None
return str(symb) return str(symb)
def undecimalize(symb, cr): def undecimalize(symb, cr):
if symb is None: return None if symb is None:
return None
return float(symb) return float(symb)
for name, typeoid in types_mapping.items(): for name, typeoid in types_mapping.items():
@ -142,7 +145,7 @@ class Cursor(object):
*any* data which may be modified during the life of the cursor. *any* data which may be modified during the life of the cursor.
""" """
IN_MAX = 1000 # decent limit on size of IN queries - guideline = Oracle limit IN_MAX = 1000 # decent limit on size of IN queries - guideline = Oracle limit
def check(f): def check(f):
@wraps(f) @wraps(f)
@ -155,7 +158,7 @@ class Cursor(object):
return f(self, *args, **kwargs) return f(self, *args, **kwargs)
return wrapper return wrapper
def __init__(self, pool, dbname, serialized=True): def __init__(self, pool, dbname, dsn, serialized=True):
self.sql_from_log = {} self.sql_from_log = {}
self.sql_into_log = {} self.sql_into_log = {}
@ -164,19 +167,21 @@ class Cursor(object):
self.sql_log = _logger.isEnabledFor(logging.DEBUG) self.sql_log = _logger.isEnabledFor(logging.DEBUG)
self.sql_log_count = 0 self.sql_log_count = 0
self._closed = True # avoid the call of close() (by __del__) if an exception
# is raised by any of the following initialisations # avoid the call of close() (by __del__) if an exception
# is raised by any of the following initialisations
self._closed = True
self.__pool = pool self.__pool = pool
self.dbname = dbname self.dbname = dbname
# Whether to enable snapshot isolation level for this cursor. # Whether to enable snapshot isolation level for this cursor.
# see also the docstring of Cursor. # see also the docstring of Cursor.
self._serialized = serialized self._serialized = serialized
self._cnx = pool.borrow(dsn(dbname)) self._cnx = pool.borrow(dsn)
self._obj = self._cnx.cursor() self._obj = self._cnx.cursor()
if self.sql_log: if self.sql_log:
self.__caller = frame_codeinfo(currentframe(),2) self.__caller = frame_codeinfo(currentframe(), 2)
else: else:
self.__caller = False self.__caller = False
self._closed = False # real initialisation value self._closed = False # real initialisation value
@ -188,7 +193,7 @@ class Cursor(object):
self.cache = {} self.cache = {}
def __build_dict(self, row): def __build_dict(self, row):
return { d.name: row[i] for i, d in enumerate(self._obj.description) } return {d.name: row[i] for i, d in enumerate(self._obj.description)}
def dictfetchone(self): def dictfetchone(self):
row = self._obj.fetchone() row = self._obj.fetchone()
return row and self.__build_dict(row) return row and self.__build_dict(row)
@ -216,8 +221,7 @@ class Cursor(object):
def execute(self, query, params=None, log_exceptions=None): def execute(self, query, params=None, log_exceptions=None):
if '%d' in query or '%f' in query: if '%d' in query or '%f' in query:
_logger.warning(query) _logger.warning(query)
_logger.warning("SQL queries cannot contain %d or %f anymore. " _logger.warning("SQL queries cannot contain %d or %f anymore. Use only %s")
"Use only %s")
if params and not isinstance(params, (tuple, list, dict)): if params and not isinstance(params, (tuple, list, dict)):
_logger.error("SQL query parameters should be a tuple, list or dict; got %r", params) _logger.error("SQL query parameters should be a tuple, list or dict; got %r", params)
raise ValueError("SQL query parameters should be a tuple, list or dict; got %r" % (params,)) raise ValueError("SQL query parameters should be a tuple, list or dict; got %r" % (params,))
@ -258,7 +262,6 @@ class Cursor(object):
self.sql_into_log[res_into.group(1)][1] += delay self.sql_into_log[res_into.group(1)][1] += delay
return res return res
def split_for_in_conditions(self, ids): def split_for_in_conditions(self, ids):
"""Split a list of identifiers into one or more smaller tuples """Split a list of identifiers into one or more smaller tuples
safe for IN conditions, after uniquifying them.""" safe for IN conditions, after uniquifying them."""
@ -270,22 +273,20 @@ class Cursor(object):
if not self.sql_log: if not self.sql_log:
return return
def process(type): def process(type):
sqllogs = {'from':self.sql_from_log, 'into':self.sql_into_log} sqllogs = {'from': self.sql_from_log, 'into': self.sql_into_log}
sum = 0 sum = 0
if sqllogs[type]: if sqllogs[type]:
sqllogitems = sqllogs[type].items() sqllogitems = sqllogs[type].items()
sqllogitems.sort(key=lambda k: k[1][1]) sqllogitems.sort(key=lambda k: k[1][1])
_logger.debug("SQL LOG %s:", type) _logger.debug("SQL LOG %s:", type)
sqllogitems.sort(lambda x,y: cmp(x[1][0], y[1][0])) sqllogitems.sort(lambda x, y: cmp(x[1][0], y[1][0]))
for r in sqllogitems: for r in sqllogitems:
delay = timedelta(microseconds=r[1][1]) delay = timedelta(microseconds=r[1][1])
_logger.debug("table: %s: %s/%s", _logger.debug("table: %s: %s/%s", r[0], delay, r[1][0])
r[0], delay, r[1][0]) sum += r[1][1]
sum+= r[1][1]
sqllogs[type].clear() sqllogs[type].clear()
sum = timedelta(microseconds=sum) sum = timedelta(microseconds=sum)
_logger.debug("SUM %s:%s/%d [%d]", _logger.debug("SUM %s:%s/%d [%d]", type, sum, self.sql_log_count, sql_counter)
type, sum, self.sql_log_count, sql_counter)
sqllogs[type].clear() sqllogs[type].clear()
process('from') process('from')
process('into') process('into')
@ -305,7 +306,7 @@ class Cursor(object):
del self.cache del self.cache
if self.sql_log: if self.sql_log:
self.__closer = frame_codeinfo(currentframe(),3) self.__closer = frame_codeinfo(currentframe(), 3)
# simple query count is always computed # simple query count is always computed
sql_counter += self.sql_log_count sql_counter += self.sql_log_count
@ -349,9 +350,10 @@ class Cursor(object):
# is remapped to serializable before being # is remapped to serializable before being
# sent to the database, so it is in fact # sent to the database, so it is in fact
# unavailable for use with pg 9.1. # unavailable for use with pg 9.1.
isolation_level = ISOLATION_LEVEL_REPEATABLE_READ \ isolation_level = \
if self._serialized \ ISOLATION_LEVEL_REPEATABLE_READ \
else ISOLATION_LEVEL_READ_COMMITTED if self._serialized \
else ISOLATION_LEVEL_READ_COMMITTED
self._cnx.set_isolation_level(isolation_level) self._cnx.set_isolation_level(isolation_level)
@check @check
@ -442,10 +444,10 @@ class PsycoConnection(psycopg2.extensions.connection):
class ConnectionPool(object): class ConnectionPool(object):
""" The pool of connections to database(s) """ The pool of connections to database(s)
Keep a set of connections to pg databases open, and reuse them Keep a set of connections to pg databases open, and reuse them
to open cursors for all transactions. to open cursors for all transactions.
The connections are *not* automatically closed. Only a close_db() The connections are *not* automatically closed. Only a close_db()
can trigger that. can trigger that.
""" """
@ -460,7 +462,6 @@ class ConnectionPool(object):
self._lock.release() self._lock.release()
return _locked return _locked
def __init__(self, maxconn=64): def __init__(self, maxconn=64):
self._connections = [] self._connections = []
self._maxconn = max(maxconn, 1) self._maxconn = max(maxconn, 1)
@ -491,7 +492,7 @@ class ConnectionPool(object):
_logger.warning('%r: Free leaked connection to %r', self, cnx.dsn) _logger.warning('%r: Free leaked connection to %r', self, cnx.dsn)
for i, (cnx, used) in enumerate(self._connections): for i, (cnx, used) in enumerate(self._connections):
if not used and dsn_are_equals(cnx.dsn, dsn): if not used and cnx._original_dsn == dsn:
try: try:
cnx.reset() cnx.reset()
except psycopg2.OperationalError: except psycopg2.OperationalError:
@ -522,6 +523,7 @@ class ConnectionPool(object):
except psycopg2.Error: except psycopg2.Error:
_logger.exception('Connection to the database failed') _logger.exception('Connection to the database failed')
raise raise
result._original_dsn = dsn
self._connections.append((result, True)) self._connections.append((result, True))
self._debug('Create new connection') self._debug('Create new connection')
return result return result
@ -546,7 +548,7 @@ class ConnectionPool(object):
def close_all(self, dsn=None): def close_all(self, dsn=None):
_logger.info('%r: Close all connections to %r', self, dsn) _logger.info('%r: Close all connections to %r', self, dsn)
for i, (cnx, used) in tools.reverse_enumerate(self._connections): for i, (cnx, used) in tools.reverse_enumerate(self._connections):
if dsn is None or dsn_are_equals(cnx.dsn, dsn): if dsn is None or cnx._original_dsn == dsn:
cnx.close() cnx.close()
self._connections.pop(i) self._connections.pop(i)
@ -554,20 +556,20 @@ class ConnectionPool(object):
class Connection(object): class Connection(object):
""" A lightweight instance of a connection to postgres """ A lightweight instance of a connection to postgres
""" """
def __init__(self, pool, dbname, dsn):
def __init__(self, pool, dbname):
self.dbname = dbname self.dbname = dbname
self.dsn = dsn
self.__pool = pool self.__pool = pool
def cursor(self, serialized=True): def cursor(self, serialized=True):
cursor_type = serialized and 'serialized ' or '' cursor_type = serialized and 'serialized ' or ''
_logger.debug('create %scursor to %r', cursor_type, self.dbname) _logger.debug('create %scursor to %r', cursor_type, self.dsn)
return Cursor(self.__pool, self.dbname, serialized=serialized) return Cursor(self.__pool, self.dbname, self.dsn, serialized=serialized)
def test_cursor(self, serialized=True): def test_cursor(self, serialized=True):
cursor_type = serialized and 'serialized ' or '' cursor_type = serialized and 'serialized ' or ''
_logger.debug('create test %scursor to %r', cursor_type, self.dbname) _logger.debug('create test %scursor to %r', cursor_type, self.dsn)
return TestCursor(self.__pool, self.dbname, serialized=serialized) return TestCursor(self.__pool, self.dbname, self.dsn, serialized=serialized)
# serialized_cursor is deprecated - cursors are serialized by default # serialized_cursor is deprecated - cursors are serialized by default
serialized_cursor = cursor serialized_cursor = cursor
@ -582,42 +584,48 @@ class Connection(object):
except Exception: except Exception:
return False return False
def dsn(db_name): def dsn(db_or_uri):
"""parse the given `db_or_uri` and return a 2-tuple (dbname, uri)"""
if db_or_uri.startswith(('postgresql://', 'postgres://')):
# extract db from uri
us = urlparse.urlsplit(db_or_uri)
if len(us.path) > 1:
db_name = us.path[1:]
elif us.username:
db_name = us.username
else:
db_name = us.hostname
return db_name, db_or_uri
_dsn = '' _dsn = ''
for p in ('host', 'port', 'user', 'password'): for p in ('host', 'port', 'user', 'password'):
cfg = tools.config['db_' + p] cfg = tools.config['db_' + p]
if cfg: if cfg:
_dsn += '%s=%s ' % (p, cfg) _dsn += '%s=%s ' % (p, cfg)
return '%sdbname=%s' % (_dsn, db_name) return db_or_uri, '%sdbname=%s' % (_dsn, db_or_uri)
def dsn_are_equals(first, second):
def key(dsn):
k = dict(x.split('=', 1) for x in dsn.strip().split())
k.pop('password', None) # password is not relevant
return k
return key(first) == key(second)
_Pool = None _Pool = None
def db_connect(db_name): def db_connect(to, allow_uri=False):
global _Pool global _Pool
if _Pool is None: if _Pool is None:
_Pool = ConnectionPool(int(tools.config['db_maxconn'])) _Pool = ConnectionPool(int(tools.config['db_maxconn']))
return Connection(_Pool, db_name)
db, uri = dsn(to)
if not allow_uri and db != to:
raise ValueError('URI connections not allowed')
return Connection(_Pool, db, uri)
def close_db(db_name): def close_db(db_name):
""" You might want to call openerp.modules.registry.RegistryManager.delete(db_name) along this function.""" """ You might want to call openerp.modules.registry.RegistryManager.delete(db_name) along this function."""
global _Pool global _Pool
if _Pool: if _Pool:
_Pool.close_all(dsn(db_name)) _Pool.close_all(dsn(db_name)[1])
def close_all(): def close_all():
global _Pool global _Pool
if _Pool: if _Pool:
_Pool.close_all() _Pool.close_all()
# vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4: # vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4: