[IMP] cron: misc renaming and minor changes to improve reviewability ;-)

bzr revid: odo@openerp.com-20110928224429-up0k4ts3cdks1u2s
Olivier Dony 2011-09-29 00:44:29 +02:00
parent e7601e68cb
commit f2b9b58fd3
3 changed files with 118 additions and 86 deletions

View File

@ -25,12 +25,15 @@ import threading
import psycopg2
from datetime import datetime
from dateutil.relativedelta import relativedelta
import netsvc
import tools
from tools.safe_eval import safe_eval as eval
import pooler
from osv import fields, osv
import openerp
import pooler
import tools
from openerp.cron import WAKE_UP_NOW
from osv import fields, osv
from tools import DEFAULT_SERVER_DATETIME_FORMAT
from tools.safe_eval import safe_eval as eval
def str2tuple(s):
return eval('tuple(%s)' % (s or ''))
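For reference, a minimal standalone sketch of how the args string stored on a cron job is turned into a positional-argument tuple: the stored string carries its own parentheses (the value "('res.partner', 42)" below is made up for illustration), and the builtin eval stands in for tools.safe_eval only to keep the sketch self-contained.

def str2tuple(s):
    # builtin eval used here as a stand-in for tools.safe_eval
    return eval('tuple(%s)' % (s or ''))

print(str2tuple("('res.partner', 42)"))   # -> ('res.partner', 42)
print(str2tuple(False))                   # -> ()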
@ -47,6 +50,12 @@ _intervalTypes = {
class ir_cron(osv.osv):
""" Model describing cron jobs (also called actions or tasks).
"""
# TODO: perhaps in the future we could consider a flag on ir.cron jobs
# that would cause database wake-up even if the database has not been
# loaded yet or was already unloaded (e.g. 'force_db_wakeup' or something)
# See also openerp.cron
_name = "ir.cron"
_order = 'name'
_columns = {
@ -66,7 +75,7 @@ class ir_cron(osv.osv):
}
_defaults = {
'nextcall' : lambda *a: time.strftime('%Y-%m-%d %H:%M:%S'),
'nextcall' : lambda *a: time.strftime(DEFAULT_SERVER_DATETIME_FORMAT),
'priority' : lambda *a: 5,
'user_id' : lambda obj,cr,uid,context: uid,
'interval_number' : lambda *a: 1,
@ -135,15 +144,15 @@ class ir_cron(osv.osv):
def _run_job(self, cr, job, now):
""" Run a given job taking care of the repetition.
The cursor has a lock on the job (acquired by _run_jobs()) and this
method is run in a worker thread (spawned by _run_jobs()).
The cursor has a lock on the job (acquired by _run_jobs_multithread()) and this
method is run in a worker thread (spawned by _run_jobs_multithread()).
:param job: job to be run (as a dictionary).
:param now: timestamp (result of datetime.now(), no need to call it multiple times).
"""
try:
nextcall = datetime.strptime(job['nextcall'], '%Y-%m-%d %H:%M:%S')
nextcall = datetime.strptime(job['nextcall'], DEFAULT_SERVER_DATETIME_FORMAT)
numbercall = job['numbercall']
ok = False
@ -158,20 +167,19 @@ class ir_cron(osv.osv):
addsql = ''
if not numbercall:
addsql = ', active=False'
cr.execute("update ir_cron set nextcall=%s, numbercall=%s"+addsql+" where id=%s", (nextcall.strftime('%Y-%m-%d %H:%M:%S'), numbercall, job['id']))
cr.execute("UPDATE ir_cron SET nextcall=%s, numbercall=%s"+addsql+" WHERE id=%s", (nextcall.strftime(DEFAULT_SERVER_DATETIME_FORMAT), numbercall, job['id']))
if numbercall:
# Reschedule our own main cron thread if necessary.
# This is really needed if this job runs longer than its rescheduling period.
print ">>> advance at", nextcall
nextcall = time.mktime(nextcall.timetuple())
openerp.cron.schedule_in_advance(nextcall, cr.dbname)
openerp.cron.schedule_wakeup(nextcall, cr.dbname)
finally:
cr.commit()
cr.close()
openerp.cron.inc_thread_count()
openerp.cron.release_thread_slot()
def _run_jobs(self):
def _run_jobs_multithread(self):
# TODO remove 'check' argument from addons/base_action_rule/base_action_rule.py
""" Process the cron jobs by spawning worker threads.
@ -184,61 +192,69 @@ class ir_cron(osv.osv):
thread (which has to close it itself).
"""
print ">>> _run_jobs"
db = self.pool.db
cr = db.cursor()
db_name = db.dbname
try:
jobs = {} # mapping job ids to jobs for all jobs being processed.
now = datetime.now()
cr.execute('select * from ir_cron where numbercall<>0 and active and nextcall<=now() order by priority')
now = datetime.now()
cr.execute("""SELECT * FROM ir_cron
WHERE numbercall != 0
AND active AND nextcall<=now()
ORDER BY priority""")
for job in cr.dictfetchall():
print ">>>", openerp.cron.get_thread_count(), "threads"
if not openerp.cron.get_thread_count():
if not openerp.cron.get_thread_slots():
break
task_cr = db.cursor()
task_job = None
jobs[job['id']] = job
task_cr = db.cursor()
try:
# Try to lock the job...
task_cr.execute('select * from ir_cron where id=%s for update nowait', (job['id'],), log_exceptions=False)
task_job = task_cr.dictfetchall()[0]
# Try to grab an exclusive lock on the job row from within the task transaction
acquired_lock = False
task_cr.execute("""SELECT *
FROM ir_cron
WHERE id=%s
FOR UPDATE NOWAIT""",
(job['id'],), log_exceptions=False)
acquired_lock = True
except psycopg2.OperationalError, e:
if e.pgcode == '55P03':
# Class 55: Object not in prerequisite state; 55P03: lock_not_available
# ... and fail (in a good way for our purpose).
print ">>>", job['name'], " is already being processed"
self._logger.debug('Another process/thread is already busy executing job `%s`, skipping it.', job['name'])
continue
else:
# ... and fail (badly).
# Unexpected OperationalError
raise
finally:
if not task_job:
if not acquired_lock:
# we're exiting due to an exception while acquiring the lock
task_cr.close()
# ... and succeed.
print ">>> taking care of", job['name']
task_thread = threading.Thread(target=self._run_job, name=task_job['name'], args=(task_cr, task_job, now))
# Got the lock on the job row, now spawn a thread to execute it in the transaction with the lock
task_thread = threading.Thread(target=self._run_job, name=job['name'], args=(task_cr, job, now))
# force non-daemon task threads (the runner thread must be daemon, and this property is inherited by default)
task_thread.setDaemon(False)
openerp.cron.dec_thread_count()
openerp.cron.take_thread_slot()
task_thread.start()
self._logger.debug('Cron execution thread for job `%s` spawned', job['name'])
# Wake up time, without considering the currently processed jobs.
if jobs.keys():
cr.execute('select min(nextcall) as min_next_call from ir_cron where numbercall<>0 and active and id not in %s', (tuple(jobs.keys()),))
# Find next earliest job ignoring currently processed jobs (by this and other cron threads)
find_next_time_query = """SELECT min(nextcall) AS min_next_call
FROM ir_cron WHERE numbercall != 0 AND active"""
if jobs:
cr.execute(find_next_time_query + " AND id NOT IN %s", (tuple(jobs.keys()),))
else:
cr.execute('select min(nextcall) as min_next_call from ir_cron where numbercall<>0 and active')
cr.execute(find_next_time_query)
next_call = cr.dictfetchone()['min_next_call']
print ">>> possibility at ", next_call
if next_call:
next_call = time.mktime(time.strptime(next_call, '%Y-%m-%d %H:%M:%S'))
next_call = time.mktime(time.strptime(next_call, DEFAULT_SERVER_DATETIME_FORMAT))
else:
next_call = int(time.time()) + 3600 # if do not find active cron job from database, it will run again after 1 day
# no matching cron job found in database, re-schedule arbitrarily in 1 day,
# this delay will likely be modified when running jobs complete their tasks
next_call = time.time() + (24*3600)
openerp.cron.schedule_in_advance(next_call, db_name)
openerp.cron.schedule_wakeup(next_call, db_name)
except Exception, ex:
self._logger.warning('Exception in cron:', exc_info=True)
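The per-job locking in the hunk above relies on PostgreSQL's SELECT ... FOR UPDATE NOWAIT, which immediately raises error 55P03 (lock_not_available) instead of blocking when another transaction already holds the row lock, so concurrent cron workers simply skip jobs that are already being processed. A condensed sketch of that pattern with psycopg2 (the connection is assumed to be provided; only the table name is taken from the diff):

import psycopg2

def try_lock_job(conn, job_id):
    """Return the job row if this transaction got an exclusive lock on it, else None."""
    cr = conn.cursor()
    try:
        cr.execute("SELECT * FROM ir_cron WHERE id=%s FOR UPDATE NOWAIT", (job_id,))
        return cr.fetchone()                 # lock is held until commit/rollback
    except psycopg2.OperationalError as e:
        if e.pgcode == '55P03':              # lock_not_available: another worker has it
            conn.rollback()                  # clear the failed statement
            return None
        raise                                # anything else is unexpected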
@ -258,7 +274,7 @@ class ir_cron(osv.osv):
# when the server is only starting or loading modules (hence the test on pool._init).
if not self.pool._init:
cr.commit()
openerp.cron.schedule_in_advance(1, self.pool.db.dbname)
openerp.cron.schedule_wakeup(WAKE_UP_NOW, self.pool.db.dbname)
def create(self, cr, uid, vals, context=None):
res = super(ir_cron, self).create(cr, uid, vals, context=context)

View File

@ -30,11 +30,11 @@ threads to process individual cron jobs.
The thread runs forever, checking every 60 seconds for new
'database wake-ups'. It maintains a heapq of database wake-ups. At each
wake-up, it will call ir_cron._run_jobs() for the given database. _run_jobs
wake-up, it will call ir_cron._run_jobs_multithread() for the given database. _run_jobs_multithread
will check the jobs defined in the ir_cron table and spawn threads accordingly
to process them.
This module behavior depends on the following configuration variable:
This module's behavior depends on the following configuration variable:
openerp.conf.max_cron_threads.
"""
@ -47,8 +47,17 @@ import time
import openerp
# Heapq of database wake-ups. Note that 'database wake-up' meaning is in
# the context of the cron management. This is not about loading a database
# or otherwise making anything about it.
# the context of the cron management. This is not originally about loading
# a database, although having the database name in the queue will
# cause it to be loaded when the schedule time is reached, even if it was
# unloaded in the meantime. Normally a database's wake-up is cancelled by
# the RegistryManager when the database is unloaded - so this should not
# cause it to be reloaded.
#
# TODO: perhaps in the future we could consider a flag on ir.cron jobs
# that would cause database wake-up even if the database has not been
# loaded yet or was already unloaded (e.g. 'force_db_wakeup' or something)
#
# Each element is a triple (timestamp, database-name, boolean). The boolean
# specifies if the wake-up is canceled (so a wake-up can be canceled without
# relying on the heapq implementation detail; no need to remove the job from
@ -57,7 +66,7 @@ _wakeups = []
# Mapping of database names to the wake-up defined in the heapq,
# so that we can cancel the wake-up without messing with the heapq
# internal structure: lookup the wake-up by database-name, then set
# invariant: lookup the wake-up by database-name, then set
# its third element to True.
_wakeup_by_db = {}
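The two comments above describe a lazy-cancellation scheme: wake-ups are never removed from the heap, only flagged as canceled, and stale entries are discarded once they reach the top. A self-contained sketch of that scheme, simplified from the functions below (locking omitted):

import heapq
import time

_wakeups = []        # heap of [timestamp, db_name, canceled] entries
_wakeup_by_db = {}   # most recent entry per database

def schedule(timestamp, db_name):
    task = _wakeup_by_db.get(db_name)
    if task and not task[2] and task[0] <= timestamp:
        return                      # an earlier live wake-up already covers this
    if task:
        task[2] = True              # lazily cancel the superseded entry
    task = [timestamp, db_name, False]
    heapq.heappush(_wakeups, task)
    _wakeup_by_db[db_name] = task

def pop_due():
    """Return the name of the next database due for a wake-up, or None."""
    while _wakeups and _wakeups[0][0] <= time.time():
        task = heapq.heappop(_wakeups)
        if task[2]:
            continue                # skip entries canceled after being pushed
        task[2] = True              # consumed; a later schedule() may re-add
        return task[1]
    return None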
@ -69,31 +78,39 @@ _wakeups_lock = threading.RLock()
# Maximum number of threads allowed to process cron jobs concurrently. This
# variable is set by start_master_thread using openerp.conf.max_cron_threads.
_thread_count = None
_thread_slots = None
# A (non re-entrant) lock to protect the above _thread_count variable.
_thread_count_lock = threading.Lock()
# A (non re-entrant) lock to protect the above _thread_slots variable.
_thread_slots_lock = threading.Lock()
_logger = logging.getLogger('cron')
# Sleep duration limits - must not loop too quickly, but can't sleep too long
# either, because a new job might be inserted in ir_cron with a much sooner
# execution date than current known ones. We won't see it until we wake!
MAX_SLEEP = 60 # 1 min
MIN_SLEEP = 1 # 1 sec
def get_thread_count():
""" Return the number of available threads. """
return _thread_count
# Dummy wake-up timestamp that can be used to force a database wake-up asap
WAKE_UP_NOW = 1
def get_thread_slots():
""" Return the number of available thread slots """
return _thread_slots
def inc_thread_count():
""" Increment by the number of available threads. """
global _thread_count
with _thread_count_lock:
_thread_count += 1
def release_thread_slot():
""" Increment the number of available thread slots """
global _thread_slots
with _thread_slots_lock:
_thread_slots += 1
def dec_thread_count():
""" Decrement by the number of available threads. """
global _thread_count
with _thread_count_lock:
_thread_count -= 1
def take_thread_slot():
""" Decrement the number of available thread slots """
global _thread_slots
with _thread_slots_lock:
_thread_slots -= 1
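Taken together, the slot helpers above form a semaphore-like protocol: the dispatcher takes a slot right before spawning a worker thread, and the worker releases it in a finally block so an exception cannot leak the slot. A rough sketch of that pairing (run_the_job is a placeholder stub, not part of the module):

import threading
import openerp.cron

def run_the_job(job):
    print("running %s" % job['name'])          # stand-in for the real job execution

def _worker(job):
    try:
        run_the_job(job)
    finally:
        openerp.cron.release_thread_slot()     # always hand the slot back

def dispatch(job):
    if not openerp.cron.get_thread_slots():
        return False                           # concurrency cap reached, try later
    openerp.cron.take_thread_slot()
    t = threading.Thread(target=_worker, args=(job,), name=job['name'])
    t.setDaemon(False)                         # workers must outlive the daemon runner
    t.start()
    return True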
def cancel(db_name):
@ -110,6 +127,7 @@ def cancel(db_name):
def cancel_all():
""" Cancel all database wake-ups. """
_logger.debug("Cancel all database wake-ups")
global _wakeups
global _wakeup_by_db
with _wakeups_lock:
@ -117,7 +135,7 @@ def cancel_all():
_wakeup_by_db = {}
def schedule_in_advance(timestamp, db_name):
def schedule_wakeup(timestamp, db_name):
""" Schedule a new wake-up for a database.
If an earlier wake-up is already defined, the new wake-up is discarded.
@ -131,23 +149,20 @@ def schedule_in_advance(timestamp, db_name):
if not timestamp:
return
with _wakeups_lock:
# Cancel the previous wake-up if any.
add_wakeup = False
if db_name in _wakeup_by_db:
task = _wakeup_by_db[db_name]
if task[2] or timestamp < task[0]:
add_wakeup = True
task[2] = True
else:
add_wakeup = True
if add_wakeup:
task = [timestamp, db_name, False]
heapq.heappush(_wakeups, task)
_wakeup_by_db[db_name] = task
if not task[2] and timestamp > task[0]:
# existing wakeup is valid and occurs earlier than new one
return
task[2] = True # cancel existing task
task = [timestamp, db_name, False]
heapq.heappush(_wakeups, task)
_wakeup_by_db[db_name] = task
_logger.debug("Wake-up scheduled for database '%s' @ %s", db_name,
'NOW' if timestamp == WAKE_UP_NOW else timestamp)
def runner():
"""Neverending function (intended to be ran in a dedicated thread) that
"""Neverending function (intended to be run in a dedicated thread) that
checks every 60 seconds the next database wake-up. TODO: make configurable
"""
while True:
@ -155,23 +170,23 @@ def runner():
def runner_body():
with _wakeups_lock:
while _wakeups and _wakeups[0][0] < time.time() and get_thread_count():
while _wakeups and _wakeups[0][0] < time.time() and get_thread_slots():
task = heapq.heappop(_wakeups)
timestamp, db_name, canceled = task
if canceled:
continue
task[2] = True
registry = openerp.pooler.get_pool(db_name)
if not registry._init:
registry['ir.cron']._run_jobs()
amount = 60
_logger.debug("Database '%s' wake-up! Firing cron jobs in multithreads", db_name)
registry['ir.cron']._run_jobs_multithread()
amount = MAX_SLEEP
with _wakeups_lock:
# Sleep less than 60s if the next known wake-up will happen before.
if _wakeups and get_thread_count():
amount = min(60, _wakeups[0][0] - time.time())
# Sleep less than MAX_SLEEP if the next known wake-up will happen before that.
if _wakeups and get_thread_slots():
amount = min(MAX_SLEEP, max(MIN_SLEEP, _wakeups[0][0] - time.time()))
_logger.debug("Going to sleep for %ss", amount)
time.sleep(amount)
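The clamping introduced on the sleep computation reads as: never sleep longer than MAX_SLEEP, so a freshly inserted ir_cron row is noticed within a minute, and never less than MIN_SLEEP, so the loop cannot busy-spin when a wake-up is imminent or slightly overdue. A quick illustration of the arithmetic with made-up offsets:

MAX_SLEEP = 60  # 1 min
MIN_SLEEP = 1   # 1 sec

def sleep_amount(seconds_until_next_wakeup):
    return min(MAX_SLEEP, max(MIN_SLEEP, seconds_until_next_wakeup))

print(sleep_amount(300))   # 60  -> capped, re-check the table after a minute
print(sleep_amount(10))    # 10  -> wake up just in time for the next job
print(sleep_amount(0.2))   # 1   -> floor prevents a busy loop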
def start_master_thread():
""" Start the above runner function in a daemon thread.
@ -180,10 +195,11 @@ def start_master_thread():
threads it spawns are not marked daemon).
"""
global _thread_count
_thread_count = openerp.conf.max_cron_threads
global _thread_slots
_thread_slots = openerp.conf.max_cron_threads
t = threading.Thread(target=runner, name="openerp.cron.master_thread")
t.setDaemon(True)
t.start()
_logger.debug("Master cron daemon started!")
# vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4:

View File

@ -105,7 +105,7 @@ class Registry(object):
monitor the ir.cron model for future jobs. See openerp.cron for
details.
"""
openerp.cron.schedule_in_advance(1, self.db.dbname)
openerp.cron.schedule_wakeup(openerp.cron.WAKE_UP_NOW, self.db.dbname)
def clear_caches(self):
""" Clear the caches