[IMP] ir_cron: each job in its own thread, first stab.

bzr revid: vmt@openerp.com-20110705170053-q3xgeoq21oc7dh8h
This commit is contained in:
Vo Minh Thu 2011-07-05 19:00:53 +02:00
parent 63ebd1cdb3
commit b9e581303b
2 changed files with 93 additions and 21 deletions

View File

@ -109,6 +109,7 @@ if config['db_name']:
cr.rollback()
pool.get('ir.cron')._poolJobs(db.dbname)
# pool.get('ir.cron').restart(db.dbname) # jobs will start to be processed later, when start_agent below is called.
cr.close()

View File

@ -21,6 +21,8 @@
import time
import logging
import threading
import psycopg2
from datetime import datetime
from dateutil.relativedelta import relativedelta
import netsvc
@ -74,6 +76,19 @@ class ir_cron(osv.osv, netsvc.Agent):
'doall' : lambda *a: 1
}
def f(a, b, c):
print ">>> in f"
def expensive(a, b, c):
print ">>> in expensive"
time.sleep(80)
print ">>> out expensive"
def expensive_2(a, b, c):
print ">>> in expensive_2"
time.sleep(80)
print ">>> out expensive_2"
def _check_args(self, cr, uid, ids, context=None):
try:
for this in self.browse(cr, uid, ids, context):
@ -109,45 +124,96 @@ class ir_cron(osv.osv, netsvc.Agent):
except Exception, e:
self._handle_callback_exception(cr, uid, model, func, args, job_id, e)
def _poolJobs(self, db_name, check=False):
def _compute_nextcall(self, job, now):
""" Compute the nextcall for a job exactly as _run_job does. """
nextcall = datetime.strptime(job['nextcall'], '%Y-%m-%d %H:%M:%S')
numbercall = job['numbercall']
while nextcall < now and numbercall:
if numbercall > 0:
numbercall -= 1
if numbercall:
nextcall += _intervalTypes[job['interval_type']](job['interval_number'])
return nextcall.strftime('%Y-%m-%d %H:%M:%S')
def _run_job(self, cr, job, now):
""" Run a given job. """
try:
nextcall = datetime.strptime(job['nextcall'], '%Y-%m-%d %H:%M:%S')
numbercall = job['numbercall']
ok = False
while nextcall < now and numbercall:
if numbercall > 0:
numbercall -= 1
if not ok or job['doall']:
self._callback(cr, job['user_id'], job['model'], job['function'], job['args'], job['id'])
if numbercall:
nextcall += _intervalTypes[job['interval_type']](job['interval_number'])
ok = True
addsql = ''
if not numbercall:
addsql = ', active=False'
cr.execute("update ir_cron set nextcall=%s, numbercall=%s"+addsql+" where id=%s", (nextcall.strftime('%Y-%m-%d %H:%M:%S'), numbercall, job['id']))
# TODO re-schedule the master thread to nextcall if its wake-up time is later than nextcall.
# TODO NOTIFY the 'ir_cron' channel.
finally:
cr.commit()
cr.close()
def _poolJobs(self, db_name):
# TODO remove 'check' argument from addons/base_action_rule/base_action_rule.py
try:
db, pool = pooler.get_db_and_pool(db_name)
except:
return False
cr = db.cursor()
try:
jobs = {} # mapping job ids to jobs for all jobs being processed.
if not pool._init:
now = datetime.now()
cr.execute('select * from ir_cron where numbercall<>0 and active and nextcall<=now() order by priority')
for job in cr.dictfetchall():
nextcall = datetime.strptime(job['nextcall'], '%Y-%m-%d %H:%M:%S')
numbercall = job['numbercall']
task_cr = db.cursor()
task_job = None
try:
task_cr.execute('select * from ir_cron where id=%s for update nowait', (job['id'],), log_exceptions=False)
task_job = task_cr.dictfetchall()[0]
jobs[job['id']] = job
except psycopg2.OperationalError, e:
if e.pgcode == '55P03':
# Class 55: Object not in prerequisite state, 55P03: lock_not_available
continue
else:
raise
finally:
if not task_job:
task_cr.close()
ok = False
while nextcall < now and numbercall:
if numbercall > 0:
numbercall -= 1
if not ok or job['doall']:
self._callback(cr, job['user_id'], job['model'], job['function'], job['args'], job['id'])
if numbercall:
nextcall += _intervalTypes[job['interval_type']](job['interval_number'])
ok = True
addsql = ''
if not numbercall:
addsql = ', active=False'
cr.execute("update ir_cron set nextcall=%s, numbercall=%s"+addsql+" where id=%s", (nextcall.strftime('%Y-%m-%d %H:%M:%S'), numbercall, job['id']))
cr.commit()
task_thread = threading.Thread(target=self._run_job, name=task_job['name'], args=(task_cr, task_job, now))
# force non-daemon task threads (the runner thread must be daemon, and this property is inherited by default)
task_thread.setDaemon(False)
task_thread.start()
cr.execute('select min(nextcall) as min_next_call from ir_cron where numbercall<>0 and active')
# Wake up time, without considering the currently processed jobs.
if jobs.keys():
cr.execute('select min(nextcall) as min_next_call from ir_cron where numbercall<>0 and active and id not in %s', (tuple(jobs.keys()),))
else:
cr.execute('select min(nextcall) as min_next_call from ir_cron where numbercall<>0 and active')
next_call = cr.dictfetchone()['min_next_call']
if next_call:
next_call = time.mktime(time.strptime(next_call, '%Y-%m-%d %H:%M:%S'))
else:
next_call = int(time.time()) + 3600 # if do not find active cron job from database, it will run again after 1 day
if not check:
self.setAlarm(self._poolJobs, next_call, db_name, db_name)
# Take the smallest nextcall value.
for job in jobs.values():
nextcall = self._compute_nextcall(job, now)
if nextcall < next_call:
next_call = nextcall
self.setAlarm(self._poolJobs, next_call, db_name, db_name)
except Exception, ex:
self._logger.warning('Exception in cron:', exc_info=True)
@ -156,6 +222,11 @@ class ir_cron(osv.osv, netsvc.Agent):
cr.commit()
cr.close()
def restart_all(self):
import openerp.models.registry
for dbname in openerp.models.registry.RegistryManager.registries:
self.restart(self, dbname)
def restart(self, dbname):
self.cancel(dbname)
# Reschedule cron processing job asap, but not in the current thread