[FIX] remove the lock on database connections
[ADD] exported method check_connectivity [REF] better logging of the Connection and ConnectionPool objects [REF] use template0 as template when create new database bzr revid: chs@tinyerp.com-20100120163913-5ftbs7e85lwqb1et
This commit is contained in:
parent
5303dc2e92
commit
fecc3a360d
|
@ -24,7 +24,6 @@ import base64
|
||||||
import logging
|
import logging
|
||||||
import os
|
import os
|
||||||
import security
|
import security
|
||||||
import string
|
|
||||||
import thread
|
import thread
|
||||||
import threading
|
import threading
|
||||||
import time
|
import time
|
||||||
|
@ -39,6 +38,8 @@ import release
|
||||||
import sql_db
|
import sql_db
|
||||||
import tools
|
import tools
|
||||||
import locale
|
import locale
|
||||||
|
from cStringIO import StringIO
|
||||||
|
|
||||||
logging.basicConfig()
|
logging.basicConfig()
|
||||||
|
|
||||||
class db(netsvc.Service):
|
class db(netsvc.Service):
|
||||||
|
@ -62,6 +63,15 @@ class db(netsvc.Service):
|
||||||
|
|
||||||
self._pg_psw_env_var_is_set = False # on win32, pg_dump need the PGPASSWORD env var
|
self._pg_psw_env_var_is_set = False # on win32, pg_dump need the PGPASSWORD env var
|
||||||
|
|
||||||
|
def _create_empty_database(self, name):
|
||||||
|
db = sql_db.db_connect('template1')
|
||||||
|
cr = db.cursor()
|
||||||
|
try:
|
||||||
|
cr.autocommit(True) # avoid transaction block
|
||||||
|
cr.execute("""CREATE DATABASE "%s" ENCODING 'unicode' TEMPLATE "template0" """ % name)
|
||||||
|
finally:
|
||||||
|
cr.close()
|
||||||
|
|
||||||
def create(self, password, db_name, demo, lang, user_password='admin'):
|
def create(self, password, db_name, demo, lang, user_password='admin'):
|
||||||
security.check_super(password)
|
security.check_super(password)
|
||||||
self.id_protect.acquire()
|
self.id_protect.acquire()
|
||||||
|
@ -71,24 +81,13 @@ class db(netsvc.Service):
|
||||||
|
|
||||||
self.actions[id] = {'clean': False}
|
self.actions[id] = {'clean': False}
|
||||||
|
|
||||||
db = sql_db.db_connect('template1')
|
self._create_empty_database(db_name)
|
||||||
db.lock()
|
|
||||||
try:
|
|
||||||
cr = db.cursor()
|
|
||||||
try:
|
|
||||||
cr.autocommit(True) # avoid transaction block
|
|
||||||
cr.execute('CREATE DATABASE "%s" ENCODING \'unicode\'' % db_name)
|
|
||||||
finally:
|
|
||||||
cr.close()
|
|
||||||
finally:
|
|
||||||
db.release()
|
|
||||||
|
|
||||||
class DBInitialize(object):
|
class DBInitialize(object):
|
||||||
def __call__(self, serv, id, db_name, demo, lang, user_password='admin'):
|
def __call__(self, serv, id, db_name, demo, lang, user_password='admin'):
|
||||||
cr = None
|
cr = None
|
||||||
try:
|
try:
|
||||||
serv.actions[id]['progress'] = 0
|
serv.actions[id]['progress'] = 0
|
||||||
clean = False
|
|
||||||
cr = sql_db.db_connect(db_name).cursor()
|
cr = sql_db.db_connect(db_name).cursor()
|
||||||
tools.init_db(cr)
|
tools.init_db(cr)
|
||||||
cr.commit()
|
cr.commit()
|
||||||
|
@ -116,7 +115,6 @@ class db(netsvc.Service):
|
||||||
except Exception, e:
|
except Exception, e:
|
||||||
serv.actions[id]['clean'] = False
|
serv.actions[id]['clean'] = False
|
||||||
serv.actions[id]['exception'] = e
|
serv.actions[id]['exception'] = e
|
||||||
from cStringIO import StringIO
|
|
||||||
import traceback
|
import traceback
|
||||||
e_str = StringIO()
|
e_str = StringIO()
|
||||||
traceback.print_exc(file=e_str)
|
traceback.print_exc(file=e_str)
|
||||||
|
@ -137,6 +135,7 @@ class db(netsvc.Service):
|
||||||
|
|
||||||
def get_progress(self, password, id):
|
def get_progress(self, password, id):
|
||||||
security.check_super(password)
|
security.check_super(password)
|
||||||
|
tools.debug((id, self.actions.keys()))
|
||||||
if self.actions[id]['thread'].isAlive():
|
if self.actions[id]['thread'].isAlive():
|
||||||
# return addons.init_progress[db_name]
|
# return addons.init_progress[db_name]
|
||||||
return (min(self.actions[id].get('progress', 0),0.95), [])
|
return (min(self.actions[id].get('progress', 0),0.95), [])
|
||||||
|
@ -144,11 +143,11 @@ class db(netsvc.Service):
|
||||||
clean = self.actions[id]['clean']
|
clean = self.actions[id]['clean']
|
||||||
if clean:
|
if clean:
|
||||||
users = self.actions[id]['users']
|
users = self.actions[id]['users']
|
||||||
del self.actions[id]
|
self.actions.pop(id)
|
||||||
return (1.0, users)
|
return (1.0, users)
|
||||||
else:
|
else:
|
||||||
e = self.actions[id]['exception']
|
e = self.actions[id]['exception']
|
||||||
del self.actions[id]
|
self.actions.pop(id)
|
||||||
raise Exception, e
|
raise Exception, e
|
||||||
|
|
||||||
def drop(self, password, db_name):
|
def drop(self, password, db_name):
|
||||||
|
@ -157,24 +156,20 @@ class db(netsvc.Service):
|
||||||
logger = netsvc.Logger()
|
logger = netsvc.Logger()
|
||||||
|
|
||||||
db = sql_db.db_connect('template1')
|
db = sql_db.db_connect('template1')
|
||||||
db.lock()
|
cr = db.cursor()
|
||||||
|
cr.autocommit(True) # avoid transaction block
|
||||||
try:
|
try:
|
||||||
cr = db.cursor()
|
|
||||||
cr.autocommit(True) # avoid transaction block
|
|
||||||
try:
|
try:
|
||||||
try:
|
cr.execute('DROP DATABASE "%s"' % db_name)
|
||||||
cr.execute('DROP DATABASE "%s"' % db_name)
|
except Exception, e:
|
||||||
except Exception, e:
|
logger.notifyChannel("web-services", netsvc.LOG_ERROR,
|
||||||
logger.notifyChannel("web-services", netsvc.LOG_ERROR,
|
'DROP DB: %s failed:\n%s' % (db_name, e))
|
||||||
'DROP DB: %s failed:\n%s' % (db_name, e))
|
raise Exception("Couldn't drop database %s: %s" % (db_name, e))
|
||||||
raise Exception("Couldn't drop database %s: %s" % (db_name, e))
|
else:
|
||||||
else:
|
logger.notifyChannel("web-services", netsvc.LOG_INFO,
|
||||||
logger.notifyChannel("web-services", netsvc.LOG_INFO,
|
'DROP DB: %s' % (db_name))
|
||||||
'DROP DB: %s' % (db_name))
|
|
||||||
finally:
|
|
||||||
cr.close()
|
|
||||||
finally:
|
finally:
|
||||||
db.release()
|
cr.close()
|
||||||
return True
|
return True
|
||||||
|
|
||||||
def _set_pg_psw_env_var(self):
|
def _set_pg_psw_env_var(self):
|
||||||
|
@ -227,17 +222,7 @@ class db(netsvc.Service):
|
||||||
'RESTORE DB: %s already exists' % (db_name,))
|
'RESTORE DB: %s already exists' % (db_name,))
|
||||||
raise Exception, "Database already exists"
|
raise Exception, "Database already exists"
|
||||||
|
|
||||||
db = sql_db.db_connect('template1')
|
self._create_empty_database(db_name)
|
||||||
db.lock()
|
|
||||||
try:
|
|
||||||
cr = db.cursor()
|
|
||||||
cr.autocommit(True) # avoid transaction block
|
|
||||||
try:
|
|
||||||
cr.execute("""CREATE DATABASE "%s" ENCODING 'unicode' TEMPLATE "template0" """ % db_name)
|
|
||||||
finally:
|
|
||||||
cr.close()
|
|
||||||
finally:
|
|
||||||
db.release()
|
|
||||||
|
|
||||||
cmd = ['pg_restore', '--no-owner']
|
cmd = ['pg_restore', '--no-owner']
|
||||||
if tools.config['db_user']:
|
if tools.config['db_user']:
|
||||||
|
@ -276,27 +261,23 @@ class db(netsvc.Service):
|
||||||
logger = netsvc.Logger()
|
logger = netsvc.Logger()
|
||||||
|
|
||||||
db = sql_db.db_connect('template1')
|
db = sql_db.db_connect('template1')
|
||||||
db.lock()
|
cr = db.cursor()
|
||||||
try:
|
try:
|
||||||
cr = db.cursor()
|
|
||||||
try:
|
try:
|
||||||
try:
|
cr.execute('ALTER DATABASE "%s" RENAME TO "%s"' % (old_name, new_name))
|
||||||
cr.execute('ALTER DATABASE "%s" RENAME TO "%s"' % (old_name, new_name))
|
except Exception, e:
|
||||||
except Exception, e:
|
logger.notifyChannel("web-services", netsvc.LOG_ERROR,
|
||||||
logger.notifyChannel("web-services", netsvc.LOG_ERROR,
|
'RENAME DB: %s -> %s failed:\n%s' % (old_name, new_name, e))
|
||||||
'RENAME DB: %s -> %s failed:\n%s' % (old_name, new_name, e))
|
raise Exception("Couldn't rename database %s to %s: %s" % (old_name, new_name, e))
|
||||||
raise Exception("Couldn't rename database %s to %s: %s" % (old_name, new_name, e))
|
else:
|
||||||
else:
|
fs = os.path.join(tools.config['root_path'], 'filestore')
|
||||||
fs = os.path.join(tools.config['root_path'], 'filestore')
|
if os.path.exists(os.path.join(fs, old_name)):
|
||||||
if os.path.exists(os.path.join(fs, old_name)):
|
os.rename(os.path.join(fs, old_name), os.path.join(fs, new_name))
|
||||||
os.rename(os.path.join(fs, old_name), os.path.join(fs, new_name))
|
|
||||||
|
|
||||||
logger.notifyChannel("web-services", netsvc.LOG_INFO,
|
logger.notifyChannel("web-services", netsvc.LOG_INFO,
|
||||||
'RENAME DB: %s -> %s' % (old_name, new_name))
|
'RENAME DB: %s -> %s' % (old_name, new_name))
|
||||||
finally:
|
|
||||||
cr.close()
|
|
||||||
finally:
|
finally:
|
||||||
db.release()
|
cr.close()
|
||||||
return True
|
return True
|
||||||
|
|
||||||
def db_exist(self, db_name):
|
def db_exist(self, db_name):
|
||||||
|
@ -308,30 +289,26 @@ class db(netsvc.Service):
|
||||||
raise Exception('AccessDenied')
|
raise Exception('AccessDenied')
|
||||||
|
|
||||||
db = sql_db.db_connect('template1')
|
db = sql_db.db_connect('template1')
|
||||||
db.lock()
|
cr = db.cursor()
|
||||||
try:
|
try:
|
||||||
cr = db.cursor()
|
|
||||||
try:
|
try:
|
||||||
try:
|
db_user = tools.config["db_user"]
|
||||||
db_user = tools.config["db_user"]
|
if not db_user and os.name == 'posix':
|
||||||
if not db_user and os.name == 'posix':
|
import pwd
|
||||||
import pwd
|
db_user = pwd.getpwuid(os.getuid())[0]
|
||||||
db_user = pwd.getpwuid(os.getuid())[0]
|
if not db_user:
|
||||||
if not db_user:
|
cr.execute("select decode(usename, 'escape') from pg_user where usesysid=(select datdba from pg_database where datname=%s)", (tools.config["db_name"],))
|
||||||
cr.execute("select decode(usename, 'escape') from pg_user where usesysid=(select datdba from pg_database where datname=%s)", (tools.config["db_name"],))
|
res = cr.fetchone()
|
||||||
res = cr.fetchone()
|
db_user = res and str(res[0])
|
||||||
db_user = res and str(res[0])
|
if db_user:
|
||||||
if db_user:
|
cr.execute("select decode(datname, 'escape') from pg_database where datdba=(select usesysid from pg_user where usename=%s) and datname not in ('template0', 'template1', 'postgres') order by datname", (db_user,))
|
||||||
cr.execute("select decode(datname, 'escape') from pg_database where datdba=(select usesysid from pg_user where usename=%s) and datname not in ('template0', 'template1', 'postgres') order by datname", (db_user,))
|
else:
|
||||||
else:
|
cr.execute("select decode(datname, 'escape') from pg_database where datname not in('template0', 'template1','postgres') order by datname")
|
||||||
cr.execute("select decode(datname, 'escape') from pg_database where datname not in('template0', 'template1','postgres') order by datname")
|
res = [str(name) for (name,) in cr.fetchall()]
|
||||||
res = [str(name) for (name,) in cr.fetchall()]
|
except:
|
||||||
except:
|
res = []
|
||||||
res = []
|
|
||||||
finally:
|
|
||||||
cr.close()
|
|
||||||
finally:
|
finally:
|
||||||
db.release()
|
cr.close()
|
||||||
res.sort()
|
res.sort()
|
||||||
return res
|
return res
|
||||||
|
|
||||||
|
@ -366,7 +343,7 @@ class db(netsvc.Service):
|
||||||
self.abortResponse(1, inst.name, 'warning', inst.value)
|
self.abortResponse(1, inst.name, 'warning', inst.value)
|
||||||
except except_osv, inst:
|
except except_osv, inst:
|
||||||
self.abortResponse(1, inst.name, inst.exc_type, inst.value)
|
self.abortResponse(1, inst.name, inst.exc_type, inst.value)
|
||||||
except Exception, e:
|
except Exception:
|
||||||
import traceback
|
import traceback
|
||||||
tb_s = reduce(lambda x, y: x+y, traceback.format_exception( sys.exc_type, sys.exc_value, sys.exc_traceback))
|
tb_s = reduce(lambda x, y: x+y, traceback.format_exception( sys.exc_type, sys.exc_value, sys.exc_traceback))
|
||||||
l.notifyChannel('web-services', netsvc.LOG_ERROR, tb_s)
|
l.notifyChannel('web-services', netsvc.LOG_ERROR, tb_s)
|
||||||
|
@ -389,6 +366,7 @@ class common(netsvc.Service):
|
||||||
self.exportMethod(self.get_migration_scripts)
|
self.exportMethod(self.get_migration_scripts)
|
||||||
self.exportMethod(self.get_server_environment)
|
self.exportMethod(self.get_server_environment)
|
||||||
self.exportMethod(self.login_message)
|
self.exportMethod(self.login_message)
|
||||||
|
self.exportMethod(self.check_connectivity)
|
||||||
|
|
||||||
def ir_set(self, db, uid, password, keys, args, name, value, replace=True, isobject=False):
|
def ir_set(self, db, uid, password, keys, args, name, value, replace=True, isobject=False):
|
||||||
security.check(db, uid, password)
|
security.check(db, uid, password)
|
||||||
|
@ -512,7 +490,7 @@ GNU Public Licence.
|
||||||
l.notifyChannel('migration', netsvc.LOG_ERROR, 'unable to read the module %s' % (module,))
|
l.notifyChannel('migration', netsvc.LOG_ERROR, 'unable to read the module %s' % (module,))
|
||||||
raise
|
raise
|
||||||
|
|
||||||
zip_contents = cStringIO.StringIO(base64_decoded)
|
zip_contents = StringIO(base64_decoded)
|
||||||
zip_contents.seek(0)
|
zip_contents.seek(0)
|
||||||
try:
|
try:
|
||||||
try:
|
try:
|
||||||
|
@ -570,11 +548,13 @@ GNU Public Licence.
|
||||||
%(platform.release(), platform.version(), platform.architecture()[0],
|
%(platform.release(), platform.version(), platform.architecture()[0],
|
||||||
os_lang, platform.python_version(),release.version,rev_id)
|
os_lang, platform.python_version(),release.version,rev_id)
|
||||||
return environment
|
return environment
|
||||||
|
|
||||||
|
|
||||||
def login_message(self):
|
def login_message(self):
|
||||||
return tools.config.get('login_message', False)
|
return tools.config.get('login_message', False)
|
||||||
|
|
||||||
|
def check_connectivity(self):
|
||||||
|
return bool(sql_db.db_connect('template1'))
|
||||||
|
|
||||||
common()
|
common()
|
||||||
|
|
||||||
class objects_proxy(netsvc.Service):
|
class objects_proxy(netsvc.Service):
|
||||||
|
|
|
@ -119,7 +119,7 @@ class Cursor(object):
|
||||||
|
|
||||||
if self.sql_log:
|
if self.sql_log:
|
||||||
now = mdt.now()
|
now = mdt.now()
|
||||||
|
|
||||||
try:
|
try:
|
||||||
params = params or None
|
params = params or None
|
||||||
res = self._obj.execute(query, params)
|
res = self._obj.execute(query, params)
|
||||||
|
@ -195,11 +195,11 @@ class Cursor(object):
|
||||||
def autocommit(self, on):
|
def autocommit(self, on):
|
||||||
offlevel = [ISOLATION_LEVEL_READ_COMMITTED, ISOLATION_LEVEL_SERIALIZABLE][bool(self._serialized)]
|
offlevel = [ISOLATION_LEVEL_READ_COMMITTED, ISOLATION_LEVEL_SERIALIZABLE][bool(self._serialized)]
|
||||||
self._cnx.set_isolation_level([offlevel, ISOLATION_LEVEL_AUTOCOMMIT][bool(on)])
|
self._cnx.set_isolation_level([offlevel, ISOLATION_LEVEL_AUTOCOMMIT][bool(on)])
|
||||||
|
|
||||||
@check
|
@check
|
||||||
def commit(self):
|
def commit(self):
|
||||||
return self._cnx.commit()
|
return self._cnx.commit()
|
||||||
|
|
||||||
@check
|
@check
|
||||||
def rollback(self):
|
def rollback(self):
|
||||||
return self._cnx.rollback()
|
return self._cnx.rollback()
|
||||||
|
@ -228,16 +228,13 @@ class ConnectionPool(object):
|
||||||
self._lock = threading.Lock()
|
self._lock = threading.Lock()
|
||||||
self._logger = netsvc.Logger()
|
self._logger = netsvc.Logger()
|
||||||
|
|
||||||
def _log(self, msg):
|
|
||||||
#self._logger.notifyChannel('ConnectionPool', netsvc.LOG_INFO, msg)
|
|
||||||
pass
|
|
||||||
def _debug(self, msg):
|
def _debug(self, msg):
|
||||||
#self._logger.notifyChannel('ConnectionPool', netsvc.LOG_DEBUG, msg)
|
self._logger.notifyChannel('ConnectionPool', netsvc.LOG_DEBUG, msg)
|
||||||
pass
|
#pass
|
||||||
|
|
||||||
@locked
|
@locked
|
||||||
def borrow(self, dsn):
|
def borrow(self, dsn):
|
||||||
self._log('Borrow connection to %s' % (dsn,))
|
self._debug('Borrow connection to %s' % (dsn,))
|
||||||
|
|
||||||
result = None
|
result = None
|
||||||
for i, (cnx, used) in enumerate(self._connections):
|
for i, (cnx, used) in enumerate(self._connections):
|
||||||
|
@ -270,7 +267,7 @@ class ConnectionPool(object):
|
||||||
|
|
||||||
@locked
|
@locked
|
||||||
def give_back(self, connection):
|
def give_back(self, connection):
|
||||||
self._log('Give back connection to %s' % (connection.dsn,))
|
self._debug('Give back connection to %s' % (connection.dsn,))
|
||||||
for i, (cnx, used) in enumerate(self._connections):
|
for i, (cnx, used) in enumerate(self._connections):
|
||||||
if cnx is connection:
|
if cnx is connection:
|
||||||
self._connections.pop(i)
|
self._connections.pop(i)
|
||||||
|
@ -281,6 +278,7 @@ class ConnectionPool(object):
|
||||||
|
|
||||||
@locked
|
@locked
|
||||||
def close_all(self, dsn):
|
def close_all(self, dsn):
|
||||||
|
self._debug('Close all connections to %s' % (dsn,))
|
||||||
for i, (cnx, used) in tools.reverse_enumerate(self._connections):
|
for i, (cnx, used) in tools.reverse_enumerate(self._connections):
|
||||||
if dsn_are_equals(cnx.dsn, dsn):
|
if dsn_are_equals(cnx.dsn, dsn):
|
||||||
cnx.close()
|
cnx.close()
|
||||||
|
@ -288,37 +286,17 @@ class ConnectionPool(object):
|
||||||
|
|
||||||
|
|
||||||
class Connection(object):
|
class Connection(object):
|
||||||
__LOCKS = {}
|
def _debug(self, msg):
|
||||||
|
self._logger.notifyChannel('Connection', netsvc.LOG_DEBUG, msg)
|
||||||
|
|
||||||
def __init__(self, pool, dbname, unique=False):
|
def __init__(self, pool, dbname):
|
||||||
self.dbname = dbname
|
self.dbname = dbname
|
||||||
self._pool = pool
|
self._pool = pool
|
||||||
self._unique = unique
|
self._logger = netsvc.Logger()
|
||||||
|
|
||||||
def __enter__(self):
|
|
||||||
if self._unique:
|
|
||||||
self.lock()
|
|
||||||
return self
|
|
||||||
|
|
||||||
def __exit__(self, exc_type, exc_value, traceback):
|
|
||||||
if self._unique:
|
|
||||||
self.release()
|
|
||||||
|
|
||||||
def lock(self):
|
|
||||||
if self.dbname not in self.__LOCKS:
|
|
||||||
self.__LOCKS[self.dbname] = threading.Lock()
|
|
||||||
self.__LOCKS[self.dbname].acquire()
|
|
||||||
|
|
||||||
def release(self):
|
|
||||||
close_db(self.dbname)
|
|
||||||
self.__LOCKS[self.dbname].release()
|
|
||||||
|
|
||||||
def cursor(self, serialized=False):
|
def cursor(self, serialized=False):
|
||||||
if self._unique:
|
cursor_type = serialized and 'serialized ' or ''
|
||||||
lock = self.__LOCKS.get(self.dbname, None)
|
self._debug('create %scursor to "%s"' % (cursor_type, self.dbname,))
|
||||||
if not (lock and lock.locked()):
|
|
||||||
netsvc.Logger().notifyChannel('Connection', netsvc.LOG_WARNING, 'Unprotected connection to %s' % (self.dbname,))
|
|
||||||
|
|
||||||
return Cursor(self._pool, self.dbname, serialized=serialized)
|
return Cursor(self._pool, self.dbname, serialized=serialized)
|
||||||
|
|
||||||
def serialized_cursor(self):
|
def serialized_cursor(self):
|
||||||
|
@ -354,8 +332,7 @@ def dsn_are_equals(first, second):
|
||||||
_Pool = ConnectionPool(int(tools.config['db_maxconn']))
|
_Pool = ConnectionPool(int(tools.config['db_maxconn']))
|
||||||
|
|
||||||
def db_connect(db_name):
|
def db_connect(db_name):
|
||||||
unique = db_name in ['template1', 'template0']
|
return Connection(_Pool, db_name)
|
||||||
return Connection(_Pool, db_name, unique)
|
|
||||||
|
|
||||||
def close_db(db_name):
|
def close_db(db_name):
|
||||||
_Pool.close_all(dsn(db_name))
|
_Pool.close_all(dsn(db_name))
|
||||||
|
|
Loading…
Reference in New Issue