From 42f292af93bc1e77b079fb0a934e6724f3e13d5e Mon Sep 17 00:00:00 2001 From: Antony Lesuisse Date: Sun, 9 Dec 2012 18:04:47 +0100 Subject: [PATCH] cron cleanup, back to the Kernighan KISS roots 1min poll time, rely only on database, multiprocess/multiserver ready. Nota: If we replace sequence signaling for cache invalidation with pg listen/notify in the future, we will use the same mechanism for more accurate cron timing. bzr revid: al@openerp.com-20121209170447-zs0k3jazokylwvar --- openerp/addons/base/ir/ir_cron.py | 142 -------------------- openerp/conf/__init__.py | 4 - openerp/cron.py | 215 ------------------------------ openerp/modules/registry.py | 6 +- openerp/service/__init__.py | 8 +- openerp/service/cron.py | 71 ++++++++++ openerp/tools/config.py | 4 +- 7 files changed, 79 insertions(+), 371 deletions(-) delete mode 100644 openerp/cron.py create mode 100644 openerp/service/cron.py diff --git a/openerp/addons/base/ir/ir_cron.py b/openerp/addons/base/ir/ir_cron.py index 243f4c772ea..e2d791281d7 100644 --- a/openerp/addons/base/ir/ir_cron.py +++ b/openerp/addons/base/ir/ir_cron.py @@ -31,7 +31,6 @@ import netsvc import openerp import pooler import tools -from openerp.cron import WAKE_UP_NOW from osv import fields, osv from tools import DEFAULT_SERVER_DATETIME_FORMAT from tools.safe_eval import safe_eval as eval @@ -142,130 +141,6 @@ class ir_cron(osv.osv): except Exception, e: self._handle_callback_exception(cr, uid, model_name, method_name, args, job_id, e) - def _run_job(self, cr, job, now): - """ Run a given job taking care of the repetition. - - The cursor has a lock on the job (aquired by _run_jobs_multithread()) and this - method is run in a worker thread (spawned by _run_jobs_multithread())). - - :param job: job to be run (as a dictionary). - :param now: timestamp (result of datetime.now(), no need to call it multiple time). - - """ - try: - nextcall = datetime.strptime(job['nextcall'], DEFAULT_SERVER_DATETIME_FORMAT) - numbercall = job['numbercall'] - - ok = False - while nextcall < now and numbercall: - if numbercall > 0: - numbercall -= 1 - if not ok or job['doall']: - self._callback(cr, job['user_id'], job['model'], job['function'], job['args'], job['id']) - if numbercall: - nextcall += _intervalTypes[job['interval_type']](job['interval_number']) - ok = True - addsql = '' - if not numbercall: - addsql = ', active=False' - cr.execute("UPDATE ir_cron SET nextcall=%s, numbercall=%s"+addsql+" WHERE id=%s", - (nextcall.strftime(DEFAULT_SERVER_DATETIME_FORMAT), numbercall, job['id'])) - - if numbercall: - # Reschedule our own main cron thread if necessary. - # This is really needed if this job runs longer than its rescheduling period. - nextcall = calendar.timegm(nextcall.timetuple()) - openerp.cron.schedule_wakeup(nextcall, cr.dbname) - finally: - cr.commit() - cr.close() - openerp.cron.release_thread_slot() - - def _run_jobs_multithread(self): - # TODO remove 'check' argument from addons/base_action_rule/base_action_rule.py - """ Process the cron jobs by spawning worker threads. - - This selects in database all the jobs that should be processed. It then - tries to lock each of them and, if it succeeds, spawns a thread to run - the cron job (if it doesn't succeed, it means the job was already - locked to be taken care of by another thread). - - The cursor used to lock the job in database is given to the worker - thread (which has to close it itself). - - """ - db = self.pool.db - cr = db.cursor() - db_name = db.dbname - try: - jobs = {} # mapping job ids to jobs for all jobs being processed. - now = datetime.now() - # Careful to compare timestamps with 'UTC' - everything is UTC as of v6.1. - cr.execute("""SELECT * FROM ir_cron - WHERE numbercall != 0 - AND active AND nextcall <= (now() at time zone 'UTC') - ORDER BY priority""") - for job in cr.dictfetchall(): - if not openerp.cron.get_thread_slots(): - break - jobs[job['id']] = job - - task_cr = db.cursor() - try: - # Try to grab an exclusive lock on the job row from within the task transaction - acquired_lock = False - task_cr.execute("""SELECT * - FROM ir_cron - WHERE id=%s - FOR UPDATE NOWAIT""", - (job['id'],), log_exceptions=False) - acquired_lock = True - except psycopg2.OperationalError, e: - if e.pgcode == '55P03': - # Class 55: Object not in prerequisite state; 55P03: lock_not_available - _logger.debug('Another process/thread is already busy executing job `%s`, skipping it.', job['name']) - continue - else: - # Unexpected OperationalError - raise - finally: - if not acquired_lock: - # we're exiting due to an exception while acquiring the lot - task_cr.close() - - # Got the lock on the job row, now spawn a thread to execute it in the transaction with the lock - task_thread = threading.Thread(target=self._run_job, name=job['name'], args=(task_cr, job, now)) - # force non-daemon task threads (the runner thread must be daemon, and this property is inherited by default) - task_thread.setDaemon(False) - openerp.cron.take_thread_slot() - task_thread.start() - _logger.debug('Cron execution thread for job `%s` spawned', job['name']) - - # Find next earliest job ignoring currently processed jobs (by this and other cron threads) - find_next_time_query = """SELECT min(nextcall) AS min_next_call - FROM ir_cron WHERE numbercall != 0 AND active""" - if jobs: - cr.execute(find_next_time_query + " AND id NOT IN %s", (tuple(jobs.keys()),)) - else: - cr.execute(find_next_time_query) - next_call = cr.dictfetchone()['min_next_call'] - - if next_call: - next_call = calendar.timegm(time.strptime(next_call, DEFAULT_SERVER_DATETIME_FORMAT)) - else: - # no matching cron job found in database, re-schedule arbitrarily in 1 day, - # this delay will likely be modified when running jobs complete their tasks - next_call = time.time() + (24*3600) - - openerp.cron.schedule_wakeup(next_call, db_name) - - except Exception, ex: - _logger.warning('Exception in cron:', exc_info=True) - - finally: - cr.commit() - cr.close() - def _process_job(self, cr, job): """ Run a given job taking care of the repetition. @@ -365,19 +240,6 @@ class ir_cron(osv.osv): return False - def update_running_cron(self, cr): - """ Schedule as soon as possible a wake-up for this database. """ - # Verify whether the server is already started and thus whether we need to commit - # immediately our changes and restart the cron agent in order to apply the change - # immediately. The commit() is needed because as soon as the cron is (re)started it - # will query the database with its own cursor, possibly before the end of the - # current transaction. - # This commit() is not an issue in most cases, but we must absolutely avoid it - # when the server is only starting or loading modules (hence the test on pool._init). - if not self.pool._init: - cr.commit() - openerp.cron.schedule_wakeup(WAKE_UP_NOW, self.pool.db.dbname) - def _try_lock(self, cr, uid, ids, context=None): """Try to grab a dummy exclusive write-lock to the rows with the given ids, to make sure a following write() or unlink() will not block due @@ -393,20 +255,16 @@ class ir_cron(osv.osv): def create(self, cr, uid, vals, context=None): res = super(ir_cron, self).create(cr, uid, vals, context=context) - self.update_running_cron(cr) return res def write(self, cr, uid, ids, vals, context=None): self._try_lock(cr, uid, ids, context) res = super(ir_cron, self).write(cr, uid, ids, vals, context=context) - self.update_running_cron(cr) return res def unlink(self, cr, uid, ids, context=None): self._try_lock(cr, uid, ids, context) res = super(ir_cron, self).unlink(cr, uid, ids, context=context) - self.update_running_cron(cr) return res -ir_cron() # vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4: diff --git a/openerp/conf/__init__.py b/openerp/conf/__init__.py index c89ddf44734..0a975c5d4e2 100644 --- a/openerp/conf/__init__.py +++ b/openerp/conf/__init__.py @@ -35,10 +35,6 @@ must be used. import deprecation -# Maximum number of threads processing concurrently cron jobs. -max_cron_threads = 4 # Actually the default value here is meaningless, - # look at tools.config for the default value. - # Paths to search for OpenERP addons. addons_paths = [] diff --git a/openerp/cron.py b/openerp/cron.py deleted file mode 100644 index 8551ed7dadc..00000000000 --- a/openerp/cron.py +++ /dev/null @@ -1,215 +0,0 @@ -#!/usr/bin/env python -# -*- coding: utf-8 -*- -############################################################################## -# -# OpenERP, Open Source Management Solution -# Copyright (C) 2004-2011 OpenERP SA () -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU Affero General Public License as -# published by the Free Software Foundation, either version 3 of the -# License, or (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU Affero General Public License for more details. -# -# You should have received a copy of the GNU Affero General Public License -# along with this program. If not, see . -# -############################################################################## - -""" Cron jobs scheduling - -Cron jobs are defined in the ir_cron table/model. This module deals with all -cron jobs, for all databases of a single OpenERP server instance. - -It defines a single master thread that will spawn (a bounded number of) -threads to process individual cron jobs. - -The thread runs forever, checking every 60 seconds for new -'database wake-ups'. It maintains a heapq of database wake-ups. At each -wake-up, it will call ir_cron._run_jobs_multithread() for the given database. _run_jobs_multithread -will check the jobs defined in the ir_cron table and spawn accordingly threads -to process them. - -This module's behavior depends on the following configuration variable: -openerp.conf.max_cron_threads. - -""" - -import heapq -import logging -import threading -import time - -import openerp -import tools - -_logger = logging.getLogger(__name__) - -# Heapq of database wake-ups. Note that 'database wake-up' meaning is in -# the context of the cron management. This is not originally about loading -# a database, although having the database name in the queue will -# cause it to be loaded when the schedule time is reached, even if it was -# unloaded in the mean time. Normally a database's wake-up is cancelled by -# the RegistryManager when the database is unloaded - so this should not -# cause it to be reloaded. -# -# TODO: perhaps in the future we could consider a flag on ir.cron jobs -# that would cause database wake-up even if the database has not been -# loaded yet or was already unloaded (e.g. 'force_db_wakeup' or something) -# -# Each element is a triple (timestamp, database-name, boolean). The boolean -# specifies if the wake-up is canceled (so a wake-up can be canceled without -# relying on the heapq implementation detail; no need to remove the job from -# the heapq). -_wakeups = [] - -# Mapping of database names to the wake-up defined in the heapq, -# so that we can cancel the wake-up without messing with the heapq -# invariant: lookup the wake-up by database-name, then set -# its third element to True. -_wakeup_by_db = {} - -# Re-entrant lock to protect the above _wakeups and _wakeup_by_db variables. -# We could use a simple (non-reentrant) lock if the runner function below -# was more fine-grained, but we are fine with the loop owning the lock -# while spawning a few threads. -_wakeups_lock = threading.RLock() - -# Maximum number of threads allowed to process cron jobs concurrently. This -# variable is set by start_master_thread using openerp.conf.max_cron_threads. -_thread_slots = None - -# A (non re-entrant) lock to protect the above _thread_slots variable. -_thread_slots_lock = threading.Lock() - -# Sleep duration limits - must not loop too quickly, but can't sleep too long -# either, because a new job might be inserted in ir_cron with a much sooner -# execution date than current known ones. We won't see it until we wake! -MAX_SLEEP = 60 # 1 min -MIN_SLEEP = 1 # 1 sec - -# Dummy wake-up timestamp that can be used to force a database wake-up asap -WAKE_UP_NOW = 1 - -def get_thread_slots(): - """ Return the number of available thread slots """ - return _thread_slots - - -def release_thread_slot(): - """ Increment the number of available thread slots """ - global _thread_slots - with _thread_slots_lock: - _thread_slots += 1 - - -def take_thread_slot(): - """ Decrement the number of available thread slots """ - global _thread_slots - with _thread_slots_lock: - _thread_slots -= 1 - - -def cancel(db_name): - """ Cancel the next wake-up of a given database, if any. - - :param db_name: database name for which the wake-up is canceled. - - """ - _logger.debug("Cancel next wake-up for database '%s'.", db_name) - with _wakeups_lock: - if db_name in _wakeup_by_db: - _wakeup_by_db[db_name][2] = True - - -def cancel_all(): - """ Cancel all database wake-ups. """ - _logger.debug("Cancel all database wake-ups") - global _wakeups - global _wakeup_by_db - with _wakeups_lock: - _wakeups = [] - _wakeup_by_db = {} - - -def schedule_wakeup(timestamp, db_name): - """ Schedule a new wake-up for a database. - - If an earlier wake-up is already defined, the new wake-up is discarded. - If another wake-up is defined, that wake-up is discarded and the new one - is scheduled. - - :param db_name: database name for which a new wake-up is scheduled. - :param timestamp: when the wake-up is scheduled. - - """ - if not timestamp: - return - with _wakeups_lock: - if db_name in _wakeup_by_db: - task = _wakeup_by_db[db_name] - if not task[2] and timestamp > task[0]: - # existing wakeup is valid and occurs earlier than new one - return - task[2] = True # cancel existing task - task = [timestamp, db_name, False] - heapq.heappush(_wakeups, task) - _wakeup_by_db[db_name] = task - _logger.debug("Wake-up scheduled for database '%s' @ %s", db_name, - 'NOW' if timestamp == WAKE_UP_NOW else timestamp) - -def runner(): - """Neverending function (intended to be run in a dedicated thread) that - checks every 60 seconds the next database wake-up. TODO: make configurable - """ - while True: - runner_body() - -def runner_body(): - with _wakeups_lock: - while _wakeups and _wakeups[0][0] < time.time() and get_thread_slots(): - task = heapq.heappop(_wakeups) - timestamp, db_name, canceled = task - if canceled: - continue - del _wakeup_by_db[db_name] - registry = openerp.pooler.get_pool(db_name) - if not registry._init: - _logger.debug("Database '%s' wake-up! Firing multi-threaded cron job processing", db_name) - registry['ir.cron']._run_jobs_multithread() - amount = MAX_SLEEP - with _wakeups_lock: - # Sleep less than MAX_SLEEP if the next known wake-up will happen before that. - if _wakeups and get_thread_slots(): - amount = min(MAX_SLEEP, max(MIN_SLEEP, _wakeups[0][0] - time.time())) - _logger.debug("Going to sleep for %ss", amount) - time.sleep(amount) - -def start_master_thread(): - """ Start the above runner function in a daemon thread. - - The thread is a typical daemon thread: it will never quit and must be - terminated when the main process exits - with no consequence (the processing - threads it spawns are not marked daemon). - - """ - global _thread_slots - _thread_slots = openerp.conf.max_cron_threads - db_maxconn = tools.config['db_maxconn'] - if _thread_slots >= tools.config.get('db_maxconn', 64): - _logger.warning("Connection pool size (%s) is set lower than max number of cron threads (%s), " - "this may cause trouble if you reach that number of parallel cron tasks.", - db_maxconn, _thread_slots) - if _thread_slots: - t = threading.Thread(target=runner, name="openerp.cron.master_thread") - t.setDaemon(True) - t.start() - _logger.debug("Master cron daemon started!") - else: - _logger.info("No master cron daemon (0 workers needed).") - -# vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4: diff --git a/openerp/modules/registry.py b/openerp/modules/registry.py index cd59a0e7d45..397ab98415c 100644 --- a/openerp/modules/registry.py +++ b/openerp/modules/registry.py @@ -28,7 +28,6 @@ import threading import openerp.sql_db import openerp.osv.orm -import openerp.cron import openerp.tools import openerp.modules.db import openerp.tools.config @@ -58,6 +57,9 @@ class Registry(object): self.db_name = db_name self.db = openerp.sql_db.db_connect(db_name) + # In monoprocess cron jobs flag (pooljobs) + self.cron = False + # Inter-process signaling (used only when openerp.multi_process is True): # The `base_registry_signaling` sequence indicates the whole registry # must be reloaded. @@ -124,7 +126,7 @@ class Registry(object): monitor the ir.cron model for future jobs. See openerp.cron for details. """ - openerp.cron.schedule_wakeup(openerp.cron.WAKE_UP_NOW, self.db.dbname) + self.cron = True def clear_caches(self): """ Clear the caches diff --git a/openerp/service/__init__.py b/openerp/service/__init__.py index 26740ec363b..1c8f27b5029 100644 --- a/openerp/service/__init__.py +++ b/openerp/service/__init__.py @@ -24,10 +24,11 @@ import logging import threading import time +import cron import netrpc_server import web_services +import web_services -import openerp.cron import openerp.modules import openerp.netsvc import openerp.osv @@ -83,8 +84,7 @@ def start_services(): netrpc_server.init_servers() # Start the main cron thread. - if openerp.conf.max_cron_threads: - openerp.cron.start_master_thread() + cron.start_master_thread() # Start the top-level servers threads (normally HTTP, HTTPS, and NETRPC). openerp.netsvc.Server.startAll() @@ -95,8 +95,6 @@ def start_services(): def stop_services(): """ Stop all services. """ # stop scheduling new jobs; we will have to wait for the jobs to complete below - openerp.cron.cancel_all() - openerp.netsvc.Server.quitAll() openerp.service.wsgi_server.stop_server() _logger.info("Initiating shutdown") diff --git a/openerp/service/cron.py b/openerp/service/cron.py new file mode 100644 index 00000000000..08ce6793c75 --- /dev/null +++ b/openerp/service/cron.py @@ -0,0 +1,71 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +############################################################################## +# +# OpenERP, Open Source Management Solution +# Copyright (C) 2004-2011 OpenERP SA () +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as +# published by the Free Software Foundation, either version 3 of the +# License, or (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . +# +############################################################################## + +""" Cron jobs scheduling + +Cron jobs are defined in the ir_cron table/model. This module deals with all +cron jobs, for all databases of a single OpenERP server instance. + +""" + +import logging +import threading +import time + +import openerp + +_logger = logging.getLogger(__name__) + +SLEEP_INTERVAL = 60 # 1 min + +def cron_runner(number): + while True: + time.sleep(SLEEP_INTERVAL + number) # Steve Reich timing style + registries = openerp.modules.registry.RegistryManager.registries + _logger.debug('cron%d polling for jobs', number) + for db_name, registry in registries.items(): + while True and registry.cron: + # acquired = openerp.addons.base.ir.ir_cron.ir_cron._acquire_job(db_name) + # TODO why isnt openerp.addons.base defined ? + import sys + base = sys.modules['addons.base'] + acquired = base.ir.ir_cron.ir_cron._acquire_job(db_name) + if not acquired: + break + +def start_master_thread(): + """ Start the above runner function in a daemon thread. + + The thread is a typical daemon thread: it will never quit and must be + terminated when the main process exits - with no consequence (the processing + threads it spawns are not marked daemon). + + """ + for i in range(openerp.tools.config['max_cron_threads']): + def target(): + cron_runner(i) + t = threading.Thread(target=target, name="openerp.service.cron.cron_runner%d" % i) + t.setDaemon(True) + t.start() + _logger.debug("cron%d started!" % i) + +# vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4: diff --git a/openerp/tools/config.py b/openerp/tools/config.py index 0b5a95d05f6..5b481c3f88c 100644 --- a/openerp/tools/config.py +++ b/openerp/tools/config.py @@ -269,7 +269,7 @@ class configmanager(object): "osv_memory tables. This is a decimal value expressed in hours, " "and the default is 1 hour.", type="float") - group.add_option("--max-cron-threads", dest="max_cron_threads", my_default=4, + group.add_option("--max-cron-threads", dest="max_cron_threads", my_default=2, help="Maximum number of threads processing concurrently cron jobs.", type="int") group.add_option("--unaccent", dest="unaccent", my_default=False, action="store_true", @@ -479,8 +479,6 @@ class configmanager(object): if opt.save: self.save() - openerp.conf.max_cron_threads = self.options['max_cron_threads'] - openerp.conf.addons_paths = self.options['addons_path'].split(',') if opt.server_wide_modules: openerp.conf.server_wide_modules = map(lambda m: m.strip(), opt.server_wide_modules.split(','))