[IMP] ir_cron: reschedule the main cron thread if a worker takes too long.

bzr revid: vmt@openerp.com-20110707135843-z38f4r8s373ctnd2
This commit is contained in:
Vo Minh Thu 2011-07-07 15:58:43 +02:00
parent b9e581303b
commit 46f8243877
3 changed files with 78 additions and 13 deletions

View File

@ -108,8 +108,8 @@ if config['db_name']:
openerp.tools.convert_yaml_import(cr, 'base', file(config["test_file"]), {}, 'test', True)
cr.rollback()
pool.get('ir.cron')._poolJobs(db.dbname)
# pool.get('ir.cron').restart(db.dbname) # jobs will start to be processed later, when start_agent below is called.
# jobs will start to be processed later, when start_agent below is called.
pool.get('ir.cron').restart(db.dbname)
cr.close()

View File

@ -76,6 +76,25 @@ class ir_cron(osv.osv, netsvc.Agent):
'doall' : lambda *a: 1
}
thread_count_lock = threading.Lock()
thread_count = 1 # maximum allowed number of thread.
@classmethod
def get_thread_count(cls):
return cls.thread_count
@classmethod
def inc_thread_count(cls):
cls.thread_count_lock.acquire()
cls.thread_count += 1
cls.thread_count_lock.release()
@classmethod
def dec_thread_count(cls):
cls.thread_count_lock.acquire()
cls.thread_count -= 1
cls.thread_count_lock.release()
def f(a, b, c):
print ">>> in f"
@ -125,7 +144,11 @@ class ir_cron(osv.osv, netsvc.Agent):
self._handle_callback_exception(cr, uid, model, func, args, job_id, e)
def _compute_nextcall(self, job, now):
""" Compute the nextcall for a job exactly as _run_job does. """
""" Compute the nextcall for a job exactly as _run_job does.
Return either the nextcall or None if it shouldn't be called.
"""
nextcall = datetime.strptime(job['nextcall'], '%Y-%m-%d %H:%M:%S')
numbercall = job['numbercall']
@ -135,10 +158,12 @@ class ir_cron(osv.osv, netsvc.Agent):
if numbercall:
nextcall += _intervalTypes[job['interval_type']](job['interval_number'])
if not numbercall:
return None
return nextcall.strftime('%Y-%m-%d %H:%M:%S')
def _run_job(self, cr, job, now):
""" Run a given job. """
""" Run a given job taking care of the repetition. """
try:
nextcall = datetime.strptime(job['nextcall'], '%Y-%m-%d %H:%M:%S')
numbercall = job['numbercall']
@ -156,18 +181,35 @@ class ir_cron(osv.osv, netsvc.Agent):
if not numbercall:
addsql = ', active=False'
cr.execute("update ir_cron set nextcall=%s, numbercall=%s"+addsql+" where id=%s", (nextcall.strftime('%Y-%m-%d %H:%M:%S'), numbercall, job['id']))
# TODO re-schedule the master thread to nextcall if its wake-up time is later than nextcall.
# TODO NOTIFY the 'ir_cron' channel.
if numbercall:
# Reschedule our own main cron thread if necessary.
# This is really needed if this job run longer that its rescheduling period.
print ">>> advance at", nextcall
nextcall = time.mktime(time.strptime(nextcall.strftime('%Y-%m-%d %H:%M:%S'), '%Y-%m-%d %H:%M:%S'))
self.reschedule_in_advance(self._poolJobs, nextcall, cr.dbname, cr.dbname)
finally:
cr.commit()
cr.close()
def _poolJobs(self, db_name):
return self._run_jobs(db_name)
def _run_jobs(self, db_name):
# TODO remove 'check' argument from addons/base_action_rule/base_action_rule.py
""" Process the cron jobs by spawning worker threads.
This selects in database all the jobs that should be processed. It then
try to lock each of them and, if it succeeds, spawn a thread to run the
cron job (if doesn't succeed, it means another the job was already
locked to be taken care of by another thread.
"""
try:
db, pool = pooler.get_db_and_pool(db_name)
except:
return False
print ">>> _run_jobs"
cr = db.cursor()
try:
jobs = {} # mapping job ids to jobs for all jobs being processed.
@ -177,13 +219,16 @@ class ir_cron(osv.osv, netsvc.Agent):
for job in cr.dictfetchall():
task_cr = db.cursor()
task_job = None
jobs[job['id']] = job
try:
# Try to lock the job...
task_cr.execute('select * from ir_cron where id=%s for update nowait', (job['id'],), log_exceptions=False)
task_job = task_cr.dictfetchall()[0]
jobs[job['id']] = job
except psycopg2.OperationalError, e:
if e.pgcode == '55P03':
# Class 55: Object not in prerequisite state, 55P03: lock_not_available
# ... and fail.
print ">>>", job['name'], " is already being processed"
continue
else:
raise
@ -191,6 +236,8 @@ class ir_cron(osv.osv, netsvc.Agent):
if not task_job:
task_cr.close()
# ... and succeed.
print ">>> taking care of", job['name']
task_thread = threading.Thread(target=self._run_job, name=task_job['name'], args=(task_cr, task_job, now))
# force non-daemon task threads (the runner thread must be daemon, and this property is inherited by default)
task_thread.setDaemon(False)
@ -202,17 +249,23 @@ class ir_cron(osv.osv, netsvc.Agent):
else:
cr.execute('select min(nextcall) as min_next_call from ir_cron where numbercall<>0 and active')
next_call = cr.dictfetchone()['min_next_call']
print ">>> possibility at ", next_call
# Wake up time, taking the smallest processed job nextcall value.
for job in jobs.values():
nextcall = self._compute_nextcall(job, now)
print ">>> or at ", nextcall
if not nextcall:
continue
if not next_call or nextcall < next_call:
next_call = nextcall
print ">>> rescheduling at", next_call
if next_call:
next_call = time.mktime(time.strptime(next_call, '%Y-%m-%d %H:%M:%S'))
else:
next_call = int(time.time()) + 3600 # if do not find active cron job from database, it will run again after 1 day
# Take the smallest nextcall value.
for job in jobs.values():
nextcall = self._compute_nextcall(job, now)
if nextcall < next_call:
next_call = nextcall
self.setAlarm(self._poolJobs, next_call, db_name, db_name)
except Exception, ex:

View File

@ -283,6 +283,18 @@ class Agent(object):
for task in cls.__tasks_by_db[db_name]:
task[0] = 0
@classmethod
def reschedule_in_advance(cls, function, timestamp, db_name, *args, **kwargs):
# Cancel the previous task if any.
old_timestamp = None
if db_name in cls.__tasks_by_db:
for task in cls.__tasks_by_db[db_name]:
if task[2] == function and timestamp < task[0]:
old_timestamp = task[0]
task[0] = 0
if not old_timestamp or timestamp < old_timestamp:
cls.setAlarm(function, timestamp, db_name, *args, **kwargs)
@classmethod
def quit(cls):
cls.cancel(None)