From 3d2a09a9730cb7fd8635b9c0b3836060e973d57b Mon Sep 17 00:00:00 2001 From: Antony Lesuisse Date: Sat, 8 Dec 2012 19:11:51 +0100 Subject: [PATCH] multiprocessing signaling manually backported from 6.1 bzr revid: al@openerp.com-20121208181151-lfy956ysnok5b5hf --- openerp/addons/base/ir/ir_ui_menu.py | 6 +- openerp/addons/base/module/module.py | 3 + openerp/cron.py | 11 ++- openerp/modules/registry.py | 109 +++++++++++++++++++++++++++ openerp/osv/orm.py | 1 + openerp/service/web_services.py | 4 + openerp/tools/cache.py | 2 + 7 files changed, 131 insertions(+), 5 deletions(-) diff --git a/openerp/addons/base/ir/ir_ui_menu.py b/openerp/addons/base/ir/ir_ui_menu.py index a8475d33552..94b5c8ab7b6 100644 --- a/openerp/addons/base/ir/ir_ui_menu.py +++ b/openerp/addons/base/ir/ir_ui_menu.py @@ -43,7 +43,7 @@ class ir_ui_menu(osv.osv): def __init__(self, *args, **kwargs): self.cache_lock = threading.RLock() - self.clear_cache() + self._cache = {} r = super(ir_ui_menu, self).__init__(*args, **kwargs) self.pool.get('ir.model.access').register_cache_clearing_method(self._name, 'clear_cache') return r @@ -51,6 +51,10 @@ class ir_ui_menu(osv.osv): def clear_cache(self): with self.cache_lock: # radical but this doesn't frequently happen + if self._cache: + # Normally this is done by openerp.tools.ormcache + # but since we do not use it, set it by ourself. + self.pool._any_cache_cleared = True self._cache = {} def _filter_visible_menus(self, cr, uid, ids, context=None): diff --git a/openerp/addons/base/module/module.py b/openerp/addons/base/module/module.py index dbb8d6a3f40..cbd046a86e3 100644 --- a/openerp/addons/base/module/module.py +++ b/openerp/addons/base/module/module.py @@ -30,6 +30,7 @@ import re import urllib import zipimport +import openerp from openerp import modules, pooler, release, tools, addons from openerp.modules.db import create_categories from openerp.tools.parse_version import parse_version @@ -385,6 +386,8 @@ class module(osv.osv): # Mark them to be installed. if to_install_ids: self.button_install(cr, uid, to_install_ids, context=context) + + openerp.modules.registry.RegistryManager.signal_registry_change(cr.dbname) return dict(ACTION_DICT, name=_('Install')) def button_immediate_install(self, cr, uid, ids, context=None): diff --git a/openerp/cron.py b/openerp/cron.py index 7b67877f0fe..8551ed7dadc 100644 --- a/openerp/cron.py +++ b/openerp/cron.py @@ -204,9 +204,12 @@ def start_master_thread(): _logger.warning("Connection pool size (%s) is set lower than max number of cron threads (%s), " "this may cause trouble if you reach that number of parallel cron tasks.", db_maxconn, _thread_slots) - t = threading.Thread(target=runner, name="openerp.cron.master_thread") - t.setDaemon(True) - t.start() - _logger.debug("Master cron daemon started!") + if _thread_slots: + t = threading.Thread(target=runner, name="openerp.cron.master_thread") + t.setDaemon(True) + t.start() + _logger.debug("Master cron daemon started!") + else: + _logger.info("No master cron daemon (0 workers needed).") # vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4: diff --git a/openerp/modules/registry.py b/openerp/modules/registry.py index 5aa2eca26ef..cd59a0e7d45 100644 --- a/openerp/modules/registry.py +++ b/openerp/modules/registry.py @@ -58,6 +58,18 @@ class Registry(object): self.db_name = db_name self.db = openerp.sql_db.db_connect(db_name) + # Inter-process signaling (used only when openerp.multi_process is True): + # The `base_registry_signaling` sequence indicates the whole registry + # must be reloaded. + # The `base_cache_signaling sequence` indicates all caches must be + # invalidated (i.e. cleared). + self.base_registry_signaling_sequence = 1 + self.base_cache_signaling_sequence = 1 + + # Flag indicating if at least one model cache has been cleared. + # Useful only in a multi-process context. + self._any_cache_cleared = False + cr = self.db.cursor() has_unaccent = openerp.modules.db.has_unaccent(cr) if openerp.tools.config['unaccent'] and not has_unaccent: @@ -121,6 +133,36 @@ class Registry(object): """ for model in self.models.itervalues(): model.clear_caches() + # Special case for ir_ui_menu which does not use openerp.tools.ormcache. + ir_ui_menu = self.models.get('ir.ui.menu') + if ir_ui_menu: + ir_ui_menu.clear_cache() + + + # Useful only in a multi-process context. + def reset_any_cache_cleared(self): + self._any_cache_cleared = False + + # Useful only in a multi-process context. + def any_cache_cleared(self): + return self._any_cache_cleared + + @classmethod + def setup_multi_process_signaling(cls, cr): + if not openerp.multi_process: + return + + # Inter-process signaling: + # The `base_registry_signaling` sequence indicates the whole registry + # must be reloaded. + # The `base_cache_signaling sequence` indicates all caches must be + # invalidated (i.e. cleared). + cr.execute("""SELECT sequence_name FROM information_schema.sequences WHERE sequence_name='base_registry_signaling'""") + if not cr.fetchall(): + cr.execute("""CREATE SEQUENCE base_registry_signaling INCREMENT BY 1 START WITH 1""") + cr.execute("""SELECT nextval('base_registry_signaling')""") + cr.execute("""CREATE SEQUENCE base_cache_signaling INCREMENT BY 1 START WITH 1""") + cr.execute("""SELECT nextval('base_cache_signaling')""") @contextmanager def cursor(self, auto_commit=True): @@ -182,6 +224,7 @@ class RegistryManager(object): cr = registry.db.cursor() try: + Registry.setup_multi_process_signaling(cr) registry.do_parent_store(cr) registry.get('ir.actions.report.xml').register_all(cr) cr.commit() @@ -232,5 +275,71 @@ class RegistryManager(object): if db_name in cls.registries: cls.registries[db_name].clear_caches() + @classmethod + def check_registry_signaling(cls, db_name): + if openerp.multi_process and db_name in cls.registries: + registry = cls.get(db_name, pooljobs=False) + cr = registry.db.cursor() + try: + cr.execute(""" + SELECT base_registry_signaling.last_value, + base_cache_signaling.last_value + FROM base_registry_signaling, base_cache_signaling""") + r, c = cr.fetchone() + # Check if the model registry must be reloaded (e.g. after the + # database has been updated by another process). + if registry.base_registry_signaling_sequence != r: + _logger.info("Reloading the model registry after database signaling.") + # Don't run the cron in the Gunicorn worker. + registry = cls.new(db_name, pooljobs=False) + registry.base_registry_signaling_sequence = r + # Check if the model caches must be invalidated (e.g. after a write + # occured on another process). Don't clear right after a registry + # has been reload. + elif registry.base_cache_signaling_sequence != c: + _logger.info("Invalidating all model caches after database signaling.") + registry.base_cache_signaling_sequence = c + registry.clear_caches() + registry.reset_any_cache_cleared() + # One possible reason caches have been invalidated is the + # use of decimal_precision.write(), in which case we need + # to refresh fields.float columns. + for model in registry.models.values(): + for column in model._columns.values(): + if hasattr(column, 'digits_change'): + column.digits_change(cr) + finally: + cr.close() + + @classmethod + def signal_caches_change(cls, db_name): + if openerp.multi_process and db_name in cls.registries: + # Check the registries if any cache has been cleared and signal it + # through the database to other processes. + registry = cls.get(db_name, pooljobs=False) + if registry.any_cache_cleared(): + _logger.info("At least one model cache has been cleared, signaling through the database.") + cr = registry.db.cursor() + r = 1 + try: + cr.execute("select nextval('base_cache_signaling')") + r = cr.fetchone()[0] + finally: + cr.close() + registry.base_cache_signaling_sequence = r + registry.reset_any_cache_cleared() + + @classmethod + def signal_registry_change(cls, db_name): + if openerp.multi_process and db_name in cls.registries: + registry = cls.get(db_name, pooljobs=False) + cr = registry.db.cursor() + r = 1 + try: + cr.execute("select nextval('base_registry_signaling')") + r = cr.fetchone()[0] + finally: + cr.close() + registry.base_registry_signaling_sequence = r # vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4: diff --git a/openerp/osv/orm.py b/openerp/osv/orm.py index ef13253864b..faa71e9f08c 100644 --- a/openerp/osv/orm.py +++ b/openerp/osv/orm.py @@ -2509,6 +2509,7 @@ class BaseModel(object): try: getattr(self, '_ormcache') self._ormcache = {} + self.pool._any_cache_cleared = True except AttributeError: pass diff --git a/openerp/service/web_services.py b/openerp/service/web_services.py index 4824cea84e4..168c930f731 100644 --- a/openerp/service/web_services.py +++ b/openerp/service/web_services.py @@ -597,8 +597,10 @@ class objects_proxy(netsvc.ExportService): raise NameError("Method not available %s" % method) security.check(db,uid,passwd) assert openerp.osv.osv.service, "The object_proxy class must be started with start_object_proxy." + openerp.modules.registry.RegistryManager.check_registry_signaling(db) fn = getattr(openerp.osv.osv.service, method) res = fn(db, uid, *params) + openerp.modules.registry.RegistryManager.signal_caches_change(db) return res @@ -680,8 +682,10 @@ class report_spool(netsvc.ExportService): if method not in ['report', 'report_get', 'render_report']: raise KeyError("Method not supported %s" % method) security.check(db,uid,passwd) + openerp.modules.registry.RegistryManager.check_registry_signaling(db) fn = getattr(self, 'exp_' + method) res = fn(db, uid, *params) + openerp.modules.registry.RegistryManager.signal_caches_change(db) return res def exp_render_report(self, db, uid, object, ids, datas=None, context=None): diff --git a/openerp/tools/cache.py b/openerp/tools/cache.py index 6e18007c340..4b4dcea9b8e 100644 --- a/openerp/tools/cache.py +++ b/openerp/tools/cache.py @@ -57,10 +57,12 @@ class ormcache(object): try: key = args[self.skiparg-2:] del d[key] + self2.pool._any_cache_cleared = True except KeyError: pass else: d.clear() + self2.pool._any_cache_cleared = True class ormcache_multi(ormcache): def __init__(self, skiparg=2, size=8192, multi=3):