From 6f5eb6b91e27f1c5aa83fd09badc3bb41fe88e3a Mon Sep 17 00:00:00 2001 From: Vo Minh Thu Date: Wed, 13 Jul 2011 15:49:33 +0200 Subject: [PATCH] [IMP] ir.cron: continued implementing multithreaded cron: - use a lock to protect the number of threads - the not task[0] condition in reschedule_in_advance is not really correct - but we have to remove the Agent in favor of a real cron master thread. bzr revid: vmt@openerp.com-20110713134933-gmfwddot50a3ib4k --- openerp/addons/base/ir/ir_cron.py | 61 ++++++++++++++++--------------- openerp/netsvc.py | 14 +++++-- 2 files changed, 42 insertions(+), 33 deletions(-) diff --git a/openerp/addons/base/ir/ir_cron.py b/openerp/addons/base/ir/ir_cron.py index 9360442e922..0998b43c8b7 100644 --- a/openerp/addons/base/ir/ir_cron.py +++ b/openerp/addons/base/ir/ir_cron.py @@ -76,24 +76,18 @@ class ir_cron(osv.osv, netsvc.Agent): 'doall' : lambda *a: 1 } - thread_count_lock = threading.Lock() - thread_count = 1 # maximum allowed number of thread. + def __init__(self, pool, cr): + self.thread_count_lock = threading.Lock() + self.thread_count = 2 # maximum allowed number of thread. + super(osv.osv, self).__init__(pool, cr) - @classmethod - def get_thread_count(cls): - return cls.thread_count + def get_thread_count(self): + return self.thread_count - @classmethod - def inc_thread_count(cls): - cls.thread_count_lock.acquire() - cls.thread_count += 1 - cls.thread_count_lock.release() - - @classmethod - def dec_thread_count(cls): - cls.thread_count_lock.acquire() - cls.thread_count -= 1 - cls.thread_count_lock.release() + def dec_thread_count(self): + self.thread_count_lock.acquire() + self.thread_count -= 1 + self.thread_count_lock.release() def f(a, b, c): print ">>> in f" @@ -105,7 +99,7 @@ class ir_cron(osv.osv, netsvc.Agent): def expensive_2(a, b, c): print ">>> in expensive_2" - time.sleep(80) + time.sleep(30) print ">>> out expensive_2" def _check_args(self, cr, uid, ids, context=None): @@ -187,10 +181,16 @@ class ir_cron(osv.osv, netsvc.Agent): # This is really needed if this job run longer that its rescheduling period. print ">>> advance at", nextcall nextcall = time.mktime(time.strptime(nextcall.strftime('%Y-%m-%d %H:%M:%S'), '%Y-%m-%d %H:%M:%S')) - self.reschedule_in_advance(self._poolJobs, nextcall, cr.dbname, cr.dbname) + with self.thread_count_lock: + self.reschedule_in_advance(self._poolJobs, nextcall, cr.dbname, cr.dbname) finally: cr.commit() cr.close() + with self.thread_count_lock: + self.thread_count += 1 + # reschedule the master thread in advance, using its saved next_call value. + self.reschedule_in_advance(self._poolJobs, self.next_call, cr.dbname, cr.dbname) + self.next_call = None def _poolJobs(self, db_name): return self._run_jobs(db_name) @@ -210,6 +210,7 @@ class ir_cron(osv.osv, netsvc.Agent): except: return False print ">>> _run_jobs" + self.next_call = None cr = db.cursor() try: jobs = {} # mapping job ids to jobs for all jobs being processed. @@ -217,9 +218,13 @@ class ir_cron(osv.osv, netsvc.Agent): now = datetime.now() cr.execute('select * from ir_cron where numbercall<>0 and active and nextcall<=now() order by priority') for job in cr.dictfetchall(): + print ">>>", self.get_thread_count(), "threads" + if not self.get_thread_count(): + break task_cr = db.cursor() task_job = None jobs[job['id']] = job + try: # Try to lock the job... task_cr.execute('select * from ir_cron where id=%s for update nowait', (job['id'],), log_exceptions=False) @@ -241,6 +246,7 @@ class ir_cron(osv.osv, netsvc.Agent): task_thread = threading.Thread(target=self._run_job, name=task_job['name'], args=(task_cr, task_job, now)) # force non-daemon task threads (the runner thread must be daemon, and this property is inherited by default) task_thread.setDaemon(False) + self.dec_thread_count() task_thread.start() # Wake up time, without considering the currently processed jobs. @@ -251,22 +257,19 @@ class ir_cron(osv.osv, netsvc.Agent): next_call = cr.dictfetchone()['min_next_call'] print ">>> possibility at ", next_call - # Wake up time, taking the smallest processed job nextcall value. - for job in jobs.values(): - nextcall = self._compute_nextcall(job, now) - print ">>> or at ", nextcall - if not nextcall: - continue - if not next_call or nextcall < next_call: - next_call = nextcall - print ">>> rescheduling at", next_call - if next_call: next_call = time.mktime(time.strptime(next_call, '%Y-%m-%d %H:%M:%S')) else: next_call = int(time.time()) + 3600 # if do not find active cron job from database, it will run again after 1 day - self.setAlarm(self._poolJobs, next_call, db_name, db_name) + # avoid race condition: the thread rescheduled the main thread, then the main thread puts +3600. + with self.thread_count_lock: + if not self.thread_count: + print ">>> no more threads" + self.next_call = next_call + next_call = int(time.time()) + 3600 # no available thread, it will run again after 1 day + + self.reschedule_in_advance(self._poolJobs, next_call, db_name, db_name) except Exception, ex: self._logger.warning('Exception in cron:', exc_info=True) diff --git a/openerp/netsvc.py b/openerp/netsvc.py index 26f7cbe7875..5ec114f7ce1 100644 --- a/openerp/netsvc.py +++ b/openerp/netsvc.py @@ -285,14 +285,18 @@ class Agent(object): @classmethod def reschedule_in_advance(cls, function, timestamp, db_name, *args, **kwargs): + if not timestamp: + return # Cancel the previous task if any. - old_timestamp = None + old_timestamp = False if db_name in cls.__tasks_by_db: for task in cls.__tasks_by_db[db_name]: - if task[2] == function and timestamp < task[0]: - old_timestamp = task[0] + print ">>> function:", function + if task[2] == function and (not task[0] or timestamp < task[0]): + old_timestamp = True task[0] = 0 - if not old_timestamp or timestamp < old_timestamp: + if old_timestamp or db_name not in cls.__tasks_by_db or not cls.__tasks_by_db[db_name]: + print ">>> rescheduled earlier", timestamp cls.setAlarm(function, timestamp, db_name, *args, **kwargs) @classmethod @@ -306,6 +310,7 @@ class Agent(object): """ current_thread = threading.currentThread() while True: + print ">>>>> starting thread for" while cls.__tasks and cls.__tasks[0][0] < time.time(): task = heapq.heappop(cls.__tasks) timestamp, dbname, function, args, kwargs = task @@ -319,6 +324,7 @@ class Agent(object): task_thread = threading.Thread(target=function, name='netsvc.Agent.task', args=args, kwargs=kwargs) # force non-daemon task threads (the runner thread must be daemon, and this property is inherited by default) task_thread.setDaemon(False) + print ">>>>> -", function.func_name task_thread.start() time.sleep(1) time.sleep(60)