diff --git a/openerp-server b/openerp-server index 3586b11c7d2..347cd9f6723 100755 --- a/openerp-server +++ b/openerp-server @@ -109,6 +109,7 @@ if config['db_name']: cr.rollback() pool.get('ir.cron')._poolJobs(db.dbname) + # pool.get('ir.cron').restart(db.dbname) # jobs will start to be processed later, when start_agent below is called. cr.close() diff --git a/openerp/addons/base/ir/ir_cron.py b/openerp/addons/base/ir/ir_cron.py index 054a15257ef..ba25f59525d 100644 --- a/openerp/addons/base/ir/ir_cron.py +++ b/openerp/addons/base/ir/ir_cron.py @@ -21,6 +21,8 @@ import time import logging +import threading +import psycopg2 from datetime import datetime from dateutil.relativedelta import relativedelta import netsvc @@ -74,6 +76,19 @@ class ir_cron(osv.osv, netsvc.Agent): 'doall' : lambda *a: 1 } + def f(a, b, c): + print ">>> in f" + + def expensive(a, b, c): + print ">>> in expensive" + time.sleep(80) + print ">>> out expensive" + + def expensive_2(a, b, c): + print ">>> in expensive_2" + time.sleep(80) + print ">>> out expensive_2" + def _check_args(self, cr, uid, ids, context=None): try: for this in self.browse(cr, uid, ids, context): @@ -109,45 +124,96 @@ class ir_cron(osv.osv, netsvc.Agent): except Exception, e: self._handle_callback_exception(cr, uid, model, func, args, job_id, e) - def _poolJobs(self, db_name, check=False): + def _compute_nextcall(self, job, now): + """ Compute the nextcall for a job exactly as _run_job does. """ + nextcall = datetime.strptime(job['nextcall'], '%Y-%m-%d %H:%M:%S') + numbercall = job['numbercall'] + + while nextcall < now and numbercall: + if numbercall > 0: + numbercall -= 1 + if numbercall: + nextcall += _intervalTypes[job['interval_type']](job['interval_number']) + + return nextcall.strftime('%Y-%m-%d %H:%M:%S') + + def _run_job(self, cr, job, now): + """ Run a given job. """ + try: + nextcall = datetime.strptime(job['nextcall'], '%Y-%m-%d %H:%M:%S') + numbercall = job['numbercall'] + + ok = False + while nextcall < now and numbercall: + if numbercall > 0: + numbercall -= 1 + if not ok or job['doall']: + self._callback(cr, job['user_id'], job['model'], job['function'], job['args'], job['id']) + if numbercall: + nextcall += _intervalTypes[job['interval_type']](job['interval_number']) + ok = True + addsql = '' + if not numbercall: + addsql = ', active=False' + cr.execute("update ir_cron set nextcall=%s, numbercall=%s"+addsql+" where id=%s", (nextcall.strftime('%Y-%m-%d %H:%M:%S'), numbercall, job['id'])) + # TODO re-schedule the master thread to nextcall if its wake-up time is later than nextcall. + # TODO NOTIFY the 'ir_cron' channel. + finally: + cr.commit() + cr.close() + + def _poolJobs(self, db_name): + # TODO remove 'check' argument from addons/base_action_rule/base_action_rule.py try: db, pool = pooler.get_db_and_pool(db_name) except: return False cr = db.cursor() try: + jobs = {} # mapping job ids to jobs for all jobs being processed. if not pool._init: now = datetime.now() cr.execute('select * from ir_cron where numbercall<>0 and active and nextcall<=now() order by priority') for job in cr.dictfetchall(): - nextcall = datetime.strptime(job['nextcall'], '%Y-%m-%d %H:%M:%S') - numbercall = job['numbercall'] + task_cr = db.cursor() + task_job = None + try: + task_cr.execute('select * from ir_cron where id=%s for update nowait', (job['id'],), log_exceptions=False) + task_job = task_cr.dictfetchall()[0] + jobs[job['id']] = job + except psycopg2.OperationalError, e: + if e.pgcode == '55P03': + # Class 55: Object not in prerequisite state, 55P03: lock_not_available + continue + else: + raise + finally: + if not task_job: + task_cr.close() - ok = False - while nextcall < now and numbercall: - if numbercall > 0: - numbercall -= 1 - if not ok or job['doall']: - self._callback(cr, job['user_id'], job['model'], job['function'], job['args'], job['id']) - if numbercall: - nextcall += _intervalTypes[job['interval_type']](job['interval_number']) - ok = True - addsql = '' - if not numbercall: - addsql = ', active=False' - cr.execute("update ir_cron set nextcall=%s, numbercall=%s"+addsql+" where id=%s", (nextcall.strftime('%Y-%m-%d %H:%M:%S'), numbercall, job['id'])) - cr.commit() + task_thread = threading.Thread(target=self._run_job, name=task_job['name'], args=(task_cr, task_job, now)) + # force non-daemon task threads (the runner thread must be daemon, and this property is inherited by default) + task_thread.setDaemon(False) + task_thread.start() - - cr.execute('select min(nextcall) as min_next_call from ir_cron where numbercall<>0 and active') + # Wake up time, without considering the currently processed jobs. + if jobs.keys(): + cr.execute('select min(nextcall) as min_next_call from ir_cron where numbercall<>0 and active and id not in %s', (tuple(jobs.keys()),)) + else: + cr.execute('select min(nextcall) as min_next_call from ir_cron where numbercall<>0 and active') next_call = cr.dictfetchone()['min_next_call'] if next_call: next_call = time.mktime(time.strptime(next_call, '%Y-%m-%d %H:%M:%S')) else: next_call = int(time.time()) + 3600 # if do not find active cron job from database, it will run again after 1 day - if not check: - self.setAlarm(self._poolJobs, next_call, db_name, db_name) + # Take the smallest nextcall value. + for job in jobs.values(): + nextcall = self._compute_nextcall(job, now) + if nextcall < next_call: + next_call = nextcall + + self.setAlarm(self._poolJobs, next_call, db_name, db_name) except Exception, ex: self._logger.warning('Exception in cron:', exc_info=True) @@ -156,6 +222,11 @@ class ir_cron(osv.osv, netsvc.Agent): cr.commit() cr.close() + def restart_all(self): + import openerp.models.registry + for dbname in openerp.models.registry.RegistryManager.registries: + self.restart(self, dbname) + def restart(self, dbname): self.cancel(dbname) # Reschedule cron processing job asap, but not in the current thread