[MERGE] multiprocess signaling

trying trunk, feature branch fails with Address already in use

bzr revid: al@openerp.com-20121208210954-pi55sib4x7eyamem
This commit is contained in:
Antony Lesuisse 2012-12-08 22:09:54 +01:00
commit a6700e0e3f
7 changed files with 131 additions and 5 deletions

View File

@ -43,7 +43,7 @@ class ir_ui_menu(osv.osv):
def __init__(self, *args, **kwargs): def __init__(self, *args, **kwargs):
self.cache_lock = threading.RLock() self.cache_lock = threading.RLock()
self.clear_cache() self._cache = {}
r = super(ir_ui_menu, self).__init__(*args, **kwargs) r = super(ir_ui_menu, self).__init__(*args, **kwargs)
self.pool.get('ir.model.access').register_cache_clearing_method(self._name, 'clear_cache') self.pool.get('ir.model.access').register_cache_clearing_method(self._name, 'clear_cache')
return r return r
@ -51,6 +51,10 @@ class ir_ui_menu(osv.osv):
def clear_cache(self): def clear_cache(self):
with self.cache_lock: with self.cache_lock:
# radical but this doesn't frequently happen # radical but this doesn't frequently happen
if self._cache:
# Normally this is done by openerp.tools.ormcache
# but since we do not use it, set it by ourself.
self.pool._any_cache_cleared = True
self._cache = {} self._cache = {}
def _filter_visible_menus(self, cr, uid, ids, context=None): def _filter_visible_menus(self, cr, uid, ids, context=None):

View File

@ -30,6 +30,7 @@ import re
import urllib import urllib
import zipimport import zipimport
import openerp
from openerp import modules, pooler, release, tools, addons from openerp import modules, pooler, release, tools, addons
from openerp.modules.db import create_categories from openerp.modules.db import create_categories
from openerp.tools.parse_version import parse_version from openerp.tools.parse_version import parse_version
@ -385,6 +386,8 @@ class module(osv.osv):
# Mark them to be installed. # Mark them to be installed.
if to_install_ids: if to_install_ids:
self.button_install(cr, uid, to_install_ids, context=context) self.button_install(cr, uid, to_install_ids, context=context)
openerp.modules.registry.RegistryManager.signal_registry_change(cr.dbname)
return dict(ACTION_DICT, name=_('Install')) return dict(ACTION_DICT, name=_('Install'))
def button_immediate_install(self, cr, uid, ids, context=None): def button_immediate_install(self, cr, uid, ids, context=None):

View File

@ -204,9 +204,12 @@ def start_master_thread():
_logger.warning("Connection pool size (%s) is set lower than max number of cron threads (%s), " _logger.warning("Connection pool size (%s) is set lower than max number of cron threads (%s), "
"this may cause trouble if you reach that number of parallel cron tasks.", "this may cause trouble if you reach that number of parallel cron tasks.",
db_maxconn, _thread_slots) db_maxconn, _thread_slots)
t = threading.Thread(target=runner, name="openerp.cron.master_thread") if _thread_slots:
t.setDaemon(True) t = threading.Thread(target=runner, name="openerp.cron.master_thread")
t.start() t.setDaemon(True)
_logger.debug("Master cron daemon started!") t.start()
_logger.debug("Master cron daemon started!")
else:
_logger.info("No master cron daemon (0 workers needed).")
# vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4: # vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4:

View File

@ -58,6 +58,18 @@ class Registry(object):
self.db_name = db_name self.db_name = db_name
self.db = openerp.sql_db.db_connect(db_name) self.db = openerp.sql_db.db_connect(db_name)
# Inter-process signaling (used only when openerp.multi_process is True):
# The `base_registry_signaling` sequence indicates the whole registry
# must be reloaded.
# The `base_cache_signaling sequence` indicates all caches must be
# invalidated (i.e. cleared).
self.base_registry_signaling_sequence = 1
self.base_cache_signaling_sequence = 1
# Flag indicating if at least one model cache has been cleared.
# Useful only in a multi-process context.
self._any_cache_cleared = False
cr = self.db.cursor() cr = self.db.cursor()
has_unaccent = openerp.modules.db.has_unaccent(cr) has_unaccent = openerp.modules.db.has_unaccent(cr)
if openerp.tools.config['unaccent'] and not has_unaccent: if openerp.tools.config['unaccent'] and not has_unaccent:
@ -121,6 +133,36 @@ class Registry(object):
""" """
for model in self.models.itervalues(): for model in self.models.itervalues():
model.clear_caches() model.clear_caches()
# Special case for ir_ui_menu which does not use openerp.tools.ormcache.
ir_ui_menu = self.models.get('ir.ui.menu')
if ir_ui_menu:
ir_ui_menu.clear_cache()
# Useful only in a multi-process context.
def reset_any_cache_cleared(self):
self._any_cache_cleared = False
# Useful only in a multi-process context.
def any_cache_cleared(self):
return self._any_cache_cleared
@classmethod
def setup_multi_process_signaling(cls, cr):
if not openerp.multi_process:
return
# Inter-process signaling:
# The `base_registry_signaling` sequence indicates the whole registry
# must be reloaded.
# The `base_cache_signaling sequence` indicates all caches must be
# invalidated (i.e. cleared).
cr.execute("""SELECT sequence_name FROM information_schema.sequences WHERE sequence_name='base_registry_signaling'""")
if not cr.fetchall():
cr.execute("""CREATE SEQUENCE base_registry_signaling INCREMENT BY 1 START WITH 1""")
cr.execute("""SELECT nextval('base_registry_signaling')""")
cr.execute("""CREATE SEQUENCE base_cache_signaling INCREMENT BY 1 START WITH 1""")
cr.execute("""SELECT nextval('base_cache_signaling')""")
@contextmanager @contextmanager
def cursor(self, auto_commit=True): def cursor(self, auto_commit=True):
@ -182,6 +224,7 @@ class RegistryManager(object):
cr = registry.db.cursor() cr = registry.db.cursor()
try: try:
Registry.setup_multi_process_signaling(cr)
registry.do_parent_store(cr) registry.do_parent_store(cr)
registry.get('ir.actions.report.xml').register_all(cr) registry.get('ir.actions.report.xml').register_all(cr)
cr.commit() cr.commit()
@ -232,5 +275,71 @@ class RegistryManager(object):
if db_name in cls.registries: if db_name in cls.registries:
cls.registries[db_name].clear_caches() cls.registries[db_name].clear_caches()
@classmethod
def check_registry_signaling(cls, db_name):
if openerp.multi_process and db_name in cls.registries:
registry = cls.get(db_name, pooljobs=False)
cr = registry.db.cursor()
try:
cr.execute("""
SELECT base_registry_signaling.last_value,
base_cache_signaling.last_value
FROM base_registry_signaling, base_cache_signaling""")
r, c = cr.fetchone()
# Check if the model registry must be reloaded (e.g. after the
# database has been updated by another process).
if registry.base_registry_signaling_sequence != r:
_logger.info("Reloading the model registry after database signaling.")
# Don't run the cron in the Gunicorn worker.
registry = cls.new(db_name, pooljobs=False)
registry.base_registry_signaling_sequence = r
# Check if the model caches must be invalidated (e.g. after a write
# occured on another process). Don't clear right after a registry
# has been reload.
elif registry.base_cache_signaling_sequence != c:
_logger.info("Invalidating all model caches after database signaling.")
registry.base_cache_signaling_sequence = c
registry.clear_caches()
registry.reset_any_cache_cleared()
# One possible reason caches have been invalidated is the
# use of decimal_precision.write(), in which case we need
# to refresh fields.float columns.
for model in registry.models.values():
for column in model._columns.values():
if hasattr(column, 'digits_change'):
column.digits_change(cr)
finally:
cr.close()
@classmethod
def signal_caches_change(cls, db_name):
if openerp.multi_process and db_name in cls.registries:
# Check the registries if any cache has been cleared and signal it
# through the database to other processes.
registry = cls.get(db_name, pooljobs=False)
if registry.any_cache_cleared():
_logger.info("At least one model cache has been cleared, signaling through the database.")
cr = registry.db.cursor()
r = 1
try:
cr.execute("select nextval('base_cache_signaling')")
r = cr.fetchone()[0]
finally:
cr.close()
registry.base_cache_signaling_sequence = r
registry.reset_any_cache_cleared()
@classmethod
def signal_registry_change(cls, db_name):
if openerp.multi_process and db_name in cls.registries:
registry = cls.get(db_name, pooljobs=False)
cr = registry.db.cursor()
r = 1
try:
cr.execute("select nextval('base_registry_signaling')")
r = cr.fetchone()[0]
finally:
cr.close()
registry.base_registry_signaling_sequence = r
# vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4: # vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4:

View File

@ -2509,6 +2509,7 @@ class BaseModel(object):
try: try:
getattr(self, '_ormcache') getattr(self, '_ormcache')
self._ormcache = {} self._ormcache = {}
self.pool._any_cache_cleared = True
except AttributeError: except AttributeError:
pass pass

View File

@ -597,8 +597,10 @@ class objects_proxy(netsvc.ExportService):
raise NameError("Method not available %s" % method) raise NameError("Method not available %s" % method)
security.check(db,uid,passwd) security.check(db,uid,passwd)
assert openerp.osv.osv.service, "The object_proxy class must be started with start_object_proxy." assert openerp.osv.osv.service, "The object_proxy class must be started with start_object_proxy."
openerp.modules.registry.RegistryManager.check_registry_signaling(db)
fn = getattr(openerp.osv.osv.service, method) fn = getattr(openerp.osv.osv.service, method)
res = fn(db, uid, *params) res = fn(db, uid, *params)
openerp.modules.registry.RegistryManager.signal_caches_change(db)
return res return res
@ -680,8 +682,10 @@ class report_spool(netsvc.ExportService):
if method not in ['report', 'report_get', 'render_report']: if method not in ['report', 'report_get', 'render_report']:
raise KeyError("Method not supported %s" % method) raise KeyError("Method not supported %s" % method)
security.check(db,uid,passwd) security.check(db,uid,passwd)
openerp.modules.registry.RegistryManager.check_registry_signaling(db)
fn = getattr(self, 'exp_' + method) fn = getattr(self, 'exp_' + method)
res = fn(db, uid, *params) res = fn(db, uid, *params)
openerp.modules.registry.RegistryManager.signal_caches_change(db)
return res return res
def exp_render_report(self, db, uid, object, ids, datas=None, context=None): def exp_render_report(self, db, uid, object, ids, datas=None, context=None):

View File

@ -57,10 +57,12 @@ class ormcache(object):
try: try:
key = args[self.skiparg-2:] key = args[self.skiparg-2:]
del d[key] del d[key]
self2.pool._any_cache_cleared = True
except KeyError: except KeyError:
pass pass
else: else:
d.clear() d.clear()
self2.pool._any_cache_cleared = True
class ormcache_multi(ormcache): class ormcache_multi(ormcache):
def __init__(self, skiparg=2, size=8192, multi=3): def __init__(self, skiparg=2, size=8192, multi=3):