diff --git a/openerp/addons/base/ir/ir_cron.py b/openerp/addons/base/ir/ir_cron.py
index fdfb6adc26a..e4f78e84b74 100644
--- a/openerp/addons/base/ir/ir_cron.py
+++ b/openerp/addons/base/ir/ir_cron.py
@@ -25,12 +25,15 @@ import threading
 import psycopg2
 from datetime import datetime
 from dateutil.relativedelta import relativedelta
+
 import netsvc
-import tools
-from tools.safe_eval import safe_eval as eval
-import pooler
-from osv import fields, osv
 import openerp
+import pooler
+import tools
+from openerp.cron import WAKE_UP_NOW
+from osv import fields, osv
+from tools import DEFAULT_SERVER_DATETIME_FORMAT
+from tools.safe_eval import safe_eval as eval

 def str2tuple(s):
     return eval('tuple(%s)' % (s or ''))
@@ -47,6 +50,12 @@ _intervalTypes = {

 class ir_cron(osv.osv):
     """ Model describing cron jobs (also called actions or tasks). """
+
+    # TODO: perhaps in the future we could consider a flag on ir.cron jobs
+    # that would cause database wake-up even if the database has not been
+    # loaded yet or was already unloaded (e.g. 'force_db_wakeup' or something)
+    # See also openerp.cron
+
     _name = "ir.cron"
     _order = 'name'
     _columns = {
@@ -66,7 +75,7 @@ class ir_cron(osv.osv):
     }

     _defaults = {
-        'nextcall' : lambda *a: time.strftime('%Y-%m-%d %H:%M:%S'),
+        'nextcall' : lambda *a: time.strftime(DEFAULT_SERVER_DATETIME_FORMAT),
         'priority' : lambda *a: 5,
         'user_id' : lambda obj,cr,uid,context: uid,
         'interval_number' : lambda *a: 1,
@@ -135,15 +144,15 @@ class ir_cron(osv.osv):
     def _run_job(self, cr, job, now):
         """ Run a given job taking care of the repetition.

-        The cursor has a lock on the job (aquired by _run_jobs()) and this
-        method is run in a worker thread (spawned by _run_jobs())).
+        The cursor has a lock on the job (acquired by _run_jobs_multithread()) and this
+        method is run in a worker thread (spawned by _run_jobs_multithread()).

         :param job: job to be run (as a dictionary).
         :param now: timestamp (result of datetime.now(), no need to call it multiple time).

         """
         try:
-            nextcall = datetime.strptime(job['nextcall'], '%Y-%m-%d %H:%M:%S')
+            nextcall = datetime.strptime(job['nextcall'], DEFAULT_SERVER_DATETIME_FORMAT)
             numbercall = job['numbercall']

             ok = False
@@ -158,20 +167,19 @@ class ir_cron(osv.osv):
             addsql = ''
             if not numbercall:
                 addsql = ', active=False'
-            cr.execute("update ir_cron set nextcall=%s, numbercall=%s"+addsql+" where id=%s", (nextcall.strftime('%Y-%m-%d %H:%M:%S'), numbercall, job['id']))
+            cr.execute("UPDATE ir_cron SET nextcall=%s, numbercall=%s"+addsql+" WHERE id=%s", (nextcall.strftime(DEFAULT_SERVER_DATETIME_FORMAT), numbercall, job['id']))
             if numbercall:
                 # Reschedule our own main cron thread if necessary.
                 # This is really needed if this job runs longer than its rescheduling period.
-                print ">>> advance at", nextcall
                 nextcall = time.mktime(nextcall.timetuple())
-                openerp.cron.schedule_in_advance(nextcall, cr.dbname)
+                openerp.cron.schedule_wakeup(nextcall, cr.dbname)
         finally:
             cr.commit()
             cr.close()
-            openerp.cron.inc_thread_count()
+            openerp.cron.release_thread_slot()

-    def _run_jobs(self):
+    def _run_jobs_multithread(self):
         # TODO remove 'check' argument from addons/base_action_rule/base_action_rule.py
         """ Process the cron jobs by spawning worker threads.

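The hunks above carry the substance of the _run_job() changes: the hard-coded '%Y-%m-%d %H:%M:%S' pattern is replaced by the shared DEFAULT_SERVER_DATETIME_FORMAT constant, and rescheduling now goes through openerp.cron.schedule_wakeup(), which takes a plain epoch timestamp. A minimal sketch of that flow, with the constant inlined on the assumption that it expands to the same pattern as the old literal:

    # Sketch only, not part of the patch: how a 'nextcall' value flows from an
    # ir_cron row to the wake-up scheduler.
    import time
    from datetime import datetime

    DEFAULT_SERVER_DATETIME_FORMAT = '%Y-%m-%d %H:%M:%S'  # assumed value of the shared constant

    nextcall = datetime.strptime('2011-03-17 14:05:00', DEFAULT_SERVER_DATETIME_FORMAT)
    # openerp.cron keeps plain epoch floats in its wake-up heapq, so the datetime
    # is flattened with time.mktime() before schedule_wakeup() is called.
    epoch = time.mktime(nextcall.timetuple())
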
@@ -184,61 +192,69 @@ class ir_cron(osv.osv):
         thread (which has to close it itself).

         """
-        print ">>> _run_jobs"
         db = self.pool.db
         cr = db.cursor()
         db_name = db.dbname
         try:
             jobs = {} # mapping job ids to jobs for all jobs being processed.
-            now = datetime.now()
-            cr.execute('select * from ir_cron where numbercall<>0 and active and nextcall<=now() order by priority')
+            now = datetime.now()
+            cr.execute("""SELECT * FROM ir_cron
+                          WHERE numbercall != 0
+                              AND active AND nextcall<=now()
+                          ORDER BY priority""")
             for job in cr.dictfetchall():
-                print ">>>", openerp.cron.get_thread_count(), "threads"
-                if not openerp.cron.get_thread_count():
+                if not openerp.cron.get_thread_slots():
                     break
-                task_cr = db.cursor()
-                task_job = None
                 jobs[job['id']] = job
+                task_cr = db.cursor()
                 try:
-                    # Try to lock the job...
-                    task_cr.execute('select * from ir_cron where id=%s for update nowait', (job['id'],), log_exceptions=False)
-                    task_job = task_cr.dictfetchall()[0]
+                    # Try to grab an exclusive lock on the job row from within the task transaction
+                    acquired_lock = False
+                    task_cr.execute("""SELECT *
+                                       FROM ir_cron
+                                       WHERE id=%s
+                                       FOR UPDATE NOWAIT""",
+                                   (job['id'],), log_exceptions=False)
+                    acquired_lock = True
                 except psycopg2.OperationalError, e:
                     if e.pgcode == '55P03':
                         # Class 55: Object not in prerequisite state; 55P03: lock_not_available
-                        # ... and fail (in a good way for our purpose).
-                        print ">>>", job['name'], " is already being processed"
+                        self._logger.debug('Another process/thread is already busy executing job `%s`, skipping it.', job['name'])
                         continue
                     else:
-                        # ... and fail (badly).
+                        # Unexpected OperationalError
                         raise
                 finally:
-                    if not task_job:
+                    if not acquired_lock:
+                        # we're exiting due to an exception while acquiring the lock
                         task_cr.close()

-                # ... and succeed.
-                print ">>> taking care of", job['name']
-                task_thread = threading.Thread(target=self._run_job, name=task_job['name'], args=(task_cr, task_job, now))
+                # Got the lock on the job row, now spawn a thread to execute it in the transaction with the lock
+                task_thread = threading.Thread(target=self._run_job, name=job['name'], args=(task_cr, job, now))
                 # force non-daemon task threads (the runner thread must be daemon, and this property is inherited by default)
                 task_thread.setDaemon(False)
-                openerp.cron.dec_thread_count()
+                openerp.cron.take_thread_slot()
                 task_thread.start()
+                self._logger.debug('Cron execution thread for job `%s` spawned', job['name'])

-            # Wake up time, without considering the currently processed jobs.
-            if jobs.keys():
-                cr.execute('select min(nextcall) as min_next_call from ir_cron where numbercall<>0 and active and id not in %s', (tuple(jobs.keys()),))
+            # Find next earliest job ignoring currently processed jobs (by this and other cron threads)
+            find_next_time_query = """SELECT min(nextcall) AS min_next_call
+                                      FROM ir_cron WHERE numbercall != 0 AND active"""
+            if jobs:
+                cr.execute(find_next_time_query + " AND id NOT IN %s", (tuple(jobs.keys()),))
             else:
-                cr.execute('select min(nextcall) as min_next_call from ir_cron where numbercall<>0 and active')
+                cr.execute(find_next_time_query)
             next_call = cr.dictfetchone()['min_next_call']
-            print ">>> possibility at ", next_call

             if next_call:
-                next_call = time.mktime(time.strptime(next_call, '%Y-%m-%d %H:%M:%S'))
+                next_call = time.mktime(time.strptime(next_call, DEFAULT_SERVER_DATETIME_FORMAT))
             else:
-                next_call = int(time.time()) + 3600 # if do not find active cron job from database, it will run again after 1 day
+                # no matching cron job found in database, re-schedule arbitrarily in 1 day,
+                # this delay will likely be modified when running jobs complete their tasks
+                next_call = time.time() + (24*3600)

-            openerp.cron.schedule_in_advance(next_call, db_name)
+            openerp.cron.schedule_wakeup(next_call, db_name)

         except Exception, ex:
             self._logger.warning('Exception in cron:', exc_info=True)
@@ -258,7 +274,7 @@ class ir_cron(osv.osv):
         # when the server is only starting or loading modules (hence the test on pool._init).
         if not self.pool._init:
             cr.commit()
-            openerp.cron.schedule_in_advance(1, self.pool.db.dbname)
+            openerp.cron.schedule_wakeup(WAKE_UP_NOW, self.pool.db.dbname)

     def create(self, cr, uid, vals, context=None):
         res = super(ir_cron, self).create(cr, uid, vals, context=context)
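A note on the locking strategy in the ir_cron.py hunks above: each candidate job row is locked with SELECT ... FOR UPDATE NOWAIT inside the worker's own cursor, so two cron threads (or two server processes) can never execute the same job, and a lock failure (SQLSTATE 55P03) simply means another worker got there first. A standalone sketch of that idiom with bare psycopg2, outside the ORM (try_lock_job is a hypothetical helper, not part of the patch):

    # Sketch only: the row-locking idiom used by _run_jobs_multithread().
    import psycopg2

    def try_lock_job(conn, job_id):
        """Return a cursor holding FOR UPDATE NOWAIT on the job row, or None."""
        cr = conn.cursor()
        try:
            cr.execute("SELECT * FROM ir_cron WHERE id=%s FOR UPDATE NOWAIT", (job_id,))
            return cr  # the lock is held for as long as this transaction stays open
        except psycopg2.OperationalError as e:
            if e.pgcode == '55P03':  # lock_not_available: the job is already taken
                cr.close()
                conn.rollback()
                return None
            raise

In the patch the worker keeps the locking cursor open for the whole job and only commits and closes it in _run_job()'s finally block, which is what releases the row lock.
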
diff --git a/openerp/cron.py b/openerp/cron.py
index 64edf04208d..b1c58198062 100644
--- a/openerp/cron.py
+++ b/openerp/cron.py
@@ -30,11 +30,11 @@ threads to process individual cron jobs.

 The thread runs forever, checking every 60 seconds for new
 'database wake-ups'. It maintains a heapq of database wake-ups. At each
-wake-up, it will call ir_cron._run_jobs() for the given database. _run_jobs
+wake-up, it will call ir_cron._run_jobs_multithread() for the given database. _run_jobs_multithread
 will check the jobs defined in the ir_cron table and spawn accordingly
 threads to process them.

-This module behavior depends on the following configuration variable:
+This module's behavior depends on the following configuration variable:
 openerp.conf.max_cron_threads.

 """
@@ -47,8 +47,17 @@ import time

 import openerp

 # Heapq of database wake-ups. Note that 'database wake-up' meaning is in
-# the context of the cron management. This is not about loading a database
-# or otherwise making anything about it.
+# the context of the cron management. This is not originally about loading
+# a database, although having the database name in the queue will
+# cause it to be loaded when the schedule time is reached, even if it was
+# unloaded in the mean time. Normally a database's wake-up is cancelled by
+# the RegistryManager when the database is unloaded - so this should not
+# cause it to be reloaded.
+#
+# TODO: perhaps in the future we could consider a flag on ir.cron jobs
+# that would cause database wake-up even if the database has not been
+# loaded yet or was already unloaded (e.g. 'force_db_wakeup' or something)
+#
 # Each element is a triple (timestamp, database-name, boolean). The boolean
 # specifies if the wake-up is canceled (so a wake-up can be canceled without
 # relying on the heapq implementation detail; no need to remove the job from
@@ -57,7 +66,7 @@
 _wakeups = []

 # Mapping of database names to the wake-up defined in the heapq,
 # so that we can cancel the wake-up without messing with the heapq
-# internal structure: lookup the wake-up by database-name, then set
+# invariant: lookup the wake-up by database-name, then set
 # its third element to True.
 _wakeup_by_db = {}
@@ -69,31 +78,39 @@
 _wakeups_lock = threading.RLock()

 # Maximum number of threads allowed to process cron jobs concurrently. This
 # variable is set by start_master_thread using openerp.conf.max_cron_threads.
-_thread_count = None
+_thread_slots = None

-# A (non re-entrant) lock to protect the above _thread_count variable.
-_thread_count_lock = threading.Lock()
+# A (non re-entrant) lock to protect the above _thread_slots variable.
+_thread_slots_lock = threading.Lock()

 _logger = logging.getLogger('cron')

+# Sleep duration limits - must not loop too quickly, but can't sleep too long
+# either, because a new job might be inserted in ir_cron with a much sooner
+# execution date than current known ones. We won't see it until we wake!
+MAX_SLEEP = 60 # 1 min
+MIN_SLEEP = 1 # 1 sec

-def get_thread_count():
-    """ Return the number of available threads. """
-    return _thread_count
+# Dummy wake-up timestamp that can be used to force a database wake-up asap
+WAKE_UP_NOW = 1
+
+def get_thread_slots():
+    """ Return the number of available thread slots """
+    return _thread_slots

-def inc_thread_count():
-    """ Increment by the number of available threads. """
-    global _thread_count
-    with _thread_count_lock:
-        _thread_count += 1
+def release_thread_slot():
+    """ Increment the number of available thread slots """
+    global _thread_slots
+    with _thread_slots_lock:
+        _thread_slots += 1

-def dec_thread_count():
-    """ Decrement by the number of available threads. """
-    global _thread_count
-    with _thread_count_lock:
-        _thread_count -= 1
+def take_thread_slot():
+    """ Decrement the number of available thread slots """
+    global _thread_slots
+    with _thread_slots_lock:
+        _thread_slots -= 1


 def cancel(db_name):
@@ -110,6 +127,7 @@

 def cancel_all():
     """ Cancel all database wake-ups. """
+    _logger.debug("Cancel all database wake-ups")
     global _wakeups
     global _wakeup_by_db
     with _wakeups_lock:
@@ -117,7 +135,7 @@
         _wakeup_by_db = {}


-def schedule_in_advance(timestamp, db_name):
+def schedule_wakeup(timestamp, db_name):
     """ Schedule a new wake-up for a database.

     If an earlier wake-up is already defined, the new wake-up is discarded.
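As the comments above describe, the wake-ups heapq relies on lazy deletion: cancelling a wake-up only flips the third element of its [timestamp, db_name, canceled] entry (found via _wakeup_by_db), and stale entries are discarded when the runner pops them. The next hunk rewrites the body of schedule_in_advance(), now schedule_wakeup(), on top of that scheme, keeping only the earliest still-valid wake-up per database. A condensed sketch of the bookkeeping, with local stand-ins for the module globals:

    # Sketch only: lazy cancellation in a wake-up heapq, mirroring the
    # [timestamp, db_name, canceled] triples documented above.
    import heapq

    wakeups = []       # heapq of [timestamp, db_name, canceled]
    wakeup_by_db = {}  # db_name -> its current entry, for O(1) cancellation

    def schedule(timestamp, db_name):
        old = wakeup_by_db.get(db_name)
        if old and not old[2] and timestamp > old[0]:
            return  # an earlier, still-valid wake-up already exists
        if old:
            old[2] = True  # mark the superseded entry; no heap surgery needed
        entry = [timestamp, db_name, False]
        heapq.heappush(wakeups, entry)
        wakeup_by_db[db_name] = entry

    def pop_due(now):
        while wakeups and wakeups[0][0] <= now:
            timestamp, db_name, canceled = heapq.heappop(wakeups)
            if not canceled:
                yield db_name
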
@@ -131,23 +149,20 @@
     if not timestamp:
         return
     with _wakeups_lock:
-        # Cancel the previous wake-up if any.
-        add_wakeup = False
         if db_name in _wakeup_by_db:
             task = _wakeup_by_db[db_name]
-            if task[2] or timestamp < task[0]:
-                add_wakeup = True
-                task[2] = True
-        else:
-            add_wakeup = True
-        if add_wakeup:
-            task = [timestamp, db_name, False]
-            heapq.heappush(_wakeups, task)
-            _wakeup_by_db[db_name] = task
-
+            if not task[2] and timestamp > task[0]:
+                # existing wakeup is valid and occurs earlier than new one
+                return
+            task[2] = True # cancel existing task
+        task = [timestamp, db_name, False]
+        heapq.heappush(_wakeups, task)
+        _wakeup_by_db[db_name] = task
+        _logger.debug("Wake-up scheduled for database '%s' @ %s", db_name,
+                      'NOW' if timestamp == WAKE_UP_NOW else timestamp)

 def runner():
-    """Neverending function (intended to be ran in a dedicated thread) that
+    """Neverending function (intended to be run in a dedicated thread) that
     checks every 60 seconds the next database wake-up. TODO: make configurable
     """
     while True:
@@ -155,23 +170,23 @@

 def runner_body():
     with _wakeups_lock:
-        while _wakeups and _wakeups[0][0] < time.time() and get_thread_count():
+        while _wakeups and _wakeups[0][0] < time.time() and get_thread_slots():
             task = heapq.heappop(_wakeups)
             timestamp, db_name, canceled = task
             if canceled:
                 continue
-            task[2] = True
             registry = openerp.pooler.get_pool(db_name)
             if not registry._init:
-                registry['ir.cron']._run_jobs()
-    amount = 60
+                _logger.debug("Database '%s' wake-up! Firing cron jobs in multithreads", db_name)
+                registry['ir.cron']._run_jobs_multithread()
+    amount = MAX_SLEEP
     with _wakeups_lock:
-        # Sleep less than 60s if the next known wake-up will happen before.
-        if _wakeups and get_thread_count():
-            amount = min(60, _wakeups[0][0] - time.time())
+        # Sleep less than MAX_SLEEP if the next known wake-up will happen before that.
+        if _wakeups and get_thread_slots():
+            amount = min(MAX_SLEEP, max(MIN_SLEEP, _wakeups[0][0] - time.time()))
+    _logger.debug("Going to sleep for %ss", amount)
     time.sleep(amount)

-
 def start_master_thread():
     """ Start the above runner function in a daemon thread.

@@ -180,10 +195,11 @@
     threads it spawns are not marked daemon).

     """
-    global _thread_count
-    _thread_count = openerp.conf.max_cron_threads
+    global _thread_slots
+    _thread_slots = openerp.conf.max_cron_threads
     t = threading.Thread(target=runner, name="openerp.cron.master_thread")
     t.setDaemon(True)
     t.start()
+    _logger.debug("Master cron daemon started!")

 # vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4:
diff --git a/openerp/modules/registry.py b/openerp/modules/registry.py
index 325c2502ef9..36e83a6181d 100644
--- a/openerp/modules/registry.py
+++ b/openerp/modules/registry.py
@@ -105,7 +105,7 @@ class Registry(object):
         monitor the ir.cron model for future jobs. See openerp.cron for
         details.
         """
-        openerp.cron.schedule_in_advance(1, self.db.dbname)
+        openerp.cron.schedule_wakeup(openerp.cron.WAKE_UP_NOW, self.db.dbname)

     def clear_caches(self):
         """ Clear the caches
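Taken together, each pass of the patched runner pops every due, non-cancelled wake-up while thread slots remain, then sleeps until the next known wake-up, clamped between MIN_SLEEP and MAX_SLEEP. A condensed, self-contained sketch of one iteration (fire() and free_slots() are stand-ins for the registry call and the thread-slot check; this is an illustration, not the module's actual code):

    # Sketch only: the shape of one runner iteration after this patch, with the
    # module globals replaced by arguments. fire(db_name) stands for
    # registry['ir.cron']._run_jobs_multithread().
    import heapq
    import time

    MAX_SLEEP = 60  # 1 min
    MIN_SLEEP = 1   # 1 sec

    def runner_iteration(wakeups, free_slots, fire):
        while wakeups and wakeups[0][0] < time.time() and free_slots():
            timestamp, db_name, canceled = heapq.heappop(wakeups)
            if not canceled:
                fire(db_name)
        # Never sleep longer than MAX_SLEEP (a sooner job may be created in the
        # meantime) nor shorter than MIN_SLEEP (avoid a busy loop).
        amount = MAX_SLEEP
        if wakeups and free_slots():
            amount = min(MAX_SLEEP, max(MIN_SLEEP, wakeups[0][0] - time.time()))
        return amount

This also shows why WAKE_UP_NOW can simply be 1: any epoch timestamp in the past makes the _wakeups[0][0] < time.time() test true on the runner's next pass, so a freshly loaded registry is picked up within MAX_SLEEP seconds at most.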