diff --git a/openerp/addons/base/ir/ir_cron.py b/openerp/addons/base/ir/ir_cron.py index 2cdb15b4990..04486cafdd0 100644 --- a/openerp/addons/base/ir/ir_cron.py +++ b/openerp/addons/base/ir/ir_cron.py @@ -162,7 +162,7 @@ class ir_cron(osv.osv): if numbercall: # Reschedule our own main cron thread if necessary. - # This is really needed if this job run longer that its rescheduling period. + # This is really needed if this job runs longer than its rescheduling period. print ">>> advance at", nextcall nextcall = time.mktime(nextcall.timetuple()) openerp.cron.schedule_in_advance(nextcall, cr.dbname) diff --git a/openerp/cron.py b/openerp/cron.py index 79347fbe178..f6e8a5d49a2 100644 --- a/openerp/cron.py +++ b/openerp/cron.py @@ -28,6 +28,12 @@ cron jobs, for all databases of a single OpenERP server instance. It defines a single master thread that will spawn (a bounded number of) threads to process individual cron jobs. +The thread runs forever, checking every 60 seconds for new +'database wake-ups'. It maintains a heapq of database wake-ups. At each +wake-up, it will call ir_cron._run_jobs() for the given database. _run_jobs +will check the jobs defined in the ir_cron table and spawn accordingly threads +to process them. + """ import heapq @@ -37,64 +43,61 @@ import time import openerp -""" Singleton that keeps track of cancellable tasks to run at a given - timestamp. - - The tasks are characterised by: - - * a timestamp - * the database on which the task run - * a boolean attribute specifying if the task is canceled - - Implementation details: - - - Tasks are stored as list, allowing the cancellation by setting - the boolean to True. - - A heapq is used to store tasks, so we don't need to sort - tasks ourself. -""" - # Heapq of database wake-ups. Note that 'database wake-up' meaning is in # the context of the cron management. This is not about loading a database # or otherwise making anything about it. -_wakeups = [] # TODO protect this variable with a lock? +# Each element is a triple (timestamp, database-name, boolean). The boolean +# specifies if the wake-up is canceled (so a wake-up can be canceled without +# relying on the heapq implementation detail; no need to remove the job from +# the heapq). +_wakeups = [] # Mapping of database names to the wake-up defined in the heapq, # so that we can cancel the wake-up without messing with the heapq -# internal structure. +# internal structure: lookup the wake-up by database-name, then set +# its third element to True. _wakeup_by_db = {} -_logger = logging.getLogger('cron') - +# Re-entrant lock to protect the above _wakeups and _wakeup_by_db variables. # We could use a simple (non-reentrant) lock if the runner function below # was more fine-grained, but we are fine with the loop owning the lock # while spawning a few threads. _wakeups_lock = threading.RLock() +# Maximum number of threads allowed to process cron jobs concurrently. +_thread_count = 2 # TODO make it configurable + +# A (non re-entrant) lock to protect the above _thread_count variable. _thread_count_lock = threading.Lock() -# Maximum number of threads allowed to process cron jobs concurrently. -_thread_count = 2 +_logger = logging.getLogger('cron') def get_thread_count(): + """ Return the number of available threads. """ return _thread_count def inc_thread_count(): + """ Increment by the number of available threads. """ global _thread_count with _thread_count_lock: _thread_count += 1 def dec_thread_count(): + """ Decrement by the number of available threads. """ global _thread_count with _thread_count_lock: _thread_count -= 1 def cancel(db_name): - """ Cancel the next wake-up of a given database, if any. """ + """ Cancel the next wake-up of a given database, if any. + + :param db_name: database name for which the wake-up is canceled. + + """ _logger.debug("Cancel next wake-up for database '%s'.", db_name) with _wakeups_lock: if db_name in _wakeup_by_db: @@ -111,16 +114,20 @@ def cancel_all(): def schedule_in_advance(timestamp, db_name): - """ Schedule a wake-up for a new database. + """ Schedule a new wake-up for a database. If an earlier wake-up is already defined, the new wake-up is discarded. - If another wake-up is defined, it is discarded. + If another wake-up is defined, that wake-up is discarded and the new one + is scheduled. + + :param db_name: database name for which a new wake-up is scheduled. + :param timestamp: when the wake-up is scheduled. """ if not timestamp: return with _wakeups_lock: - # Cancel the previous wakeup if any. + # Cancel the previous wake-up if any. add_wakeup = False if db_name in _wakeup_by_db: task = _wakeup_by_db[db_name] @@ -152,6 +159,7 @@ def runner(): registry['ir.cron']._run_jobs() amount = 60 with _wakeups_lock: + # Sleep less than 60s if the next known wake-up will happen before. if _wakeups and get_thread_count(): amount = min(60, _wakeups[0][0] - time.time()) time.sleep(amount)