From 32e830eb9963ff0fb276352ac87920ca8e2d870c Mon Sep 17 00:00:00 2001 From: Vo Minh Thu Date: Thu, 14 Jul 2011 13:08:09 +0200 Subject: [PATCH] [IMP] cron: removed unnecessary multi-tasks per db in Agent, some cleaning. bzr revid: vmt@openerp.com-20110714110809-sgsoev9i24589sn8 --- openerp-server | 5 +- openerp/addons/base/ir/ir_cron.py | 41 ++++----------- openerp/netsvc.py | 87 +++++++++++++------------------ 3 files changed, 48 insertions(+), 85 deletions(-) diff --git a/openerp-server b/openerp-server index 56aa278940a..d5876111c3b 100755 --- a/openerp-server +++ b/openerp-server @@ -199,7 +199,8 @@ if os.name == 'posix': signal.signal(signal.SIGQUIT, dumpstacks) def quit(): - openerp.netsvc.Agent.quit() + # stop scheduling new jobs; we will have to wait for the jobs to complete below + openerp.netsvc.Agent.cancel_all() openerp.netsvc.Server.quitAll() if config['pidfile']: os.unlink(config['pidfile']) @@ -215,7 +216,7 @@ def quit(): if thread != threading.currentThread() and not thread.isDaemon(): while thread.isAlive(): # need a busyloop here as thread.join() masks signals - # and would present the forced shutdown + # and would prevent the forced shutdown thread.join(0.05) time.sleep(0.05) openerp.modules.registry.RegistryManager.delete_all() diff --git a/openerp/addons/base/ir/ir_cron.py b/openerp/addons/base/ir/ir_cron.py index 0998b43c8b7..61a8b6d1f52 100644 --- a/openerp/addons/base/ir/ir_cron.py +++ b/openerp/addons/base/ir/ir_cron.py @@ -76,10 +76,8 @@ class ir_cron(osv.osv, netsvc.Agent): 'doall' : lambda *a: 1 } - def __init__(self, pool, cr): - self.thread_count_lock = threading.Lock() - self.thread_count = 2 # maximum allowed number of thread. - super(osv.osv, self).__init__(pool, cr) + thread_count_lock = threading.Lock() + thread_count = 2 # maximum allowed number of thread. def get_thread_count(self): return self.thread_count @@ -137,25 +135,6 @@ class ir_cron(osv.osv, netsvc.Agent): except Exception, e: self._handle_callback_exception(cr, uid, model, func, args, job_id, e) - def _compute_nextcall(self, job, now): - """ Compute the nextcall for a job exactly as _run_job does. - - Return either the nextcall or None if it shouldn't be called. - - """ - nextcall = datetime.strptime(job['nextcall'], '%Y-%m-%d %H:%M:%S') - numbercall = job['numbercall'] - - while nextcall < now and numbercall: - if numbercall > 0: - numbercall -= 1 - if numbercall: - nextcall += _intervalTypes[job['interval_type']](job['interval_number']) - - if not numbercall: - return None - return nextcall.strftime('%Y-%m-%d %H:%M:%S') - def _run_job(self, cr, job, now): """ Run a given job taking care of the repetition. """ try: @@ -182,20 +161,17 @@ class ir_cron(osv.osv, netsvc.Agent): print ">>> advance at", nextcall nextcall = time.mktime(time.strptime(nextcall.strftime('%Y-%m-%d %H:%M:%S'), '%Y-%m-%d %H:%M:%S')) with self.thread_count_lock: - self.reschedule_in_advance(self._poolJobs, nextcall, cr.dbname, cr.dbname) + self.schedule_in_advance(nextcall, cr.dbname) finally: cr.commit() cr.close() with self.thread_count_lock: self.thread_count += 1 # reschedule the master thread in advance, using its saved next_call value. - self.reschedule_in_advance(self._poolJobs, self.next_call, cr.dbname, cr.dbname) + self.schedule_in_advance(self.next_call, cr.dbname) self.next_call = None - def _poolJobs(self, db_name): - return self._run_jobs(db_name) - - def _run_jobs(self, db_name): + def _run_jobs(self): # TODO remove 'check' argument from addons/base_action_rule/base_action_rule.py """ Process the cron jobs by spawning worker threads. @@ -205,8 +181,9 @@ class ir_cron(osv.osv, netsvc.Agent): locked to be taken care of by another thread. """ + db_name = self.pool.db.dbname try: - db, pool = pooler.get_db_and_pool(db_name) + db, pool = self.pool.db, self.pool except: return False print ">>> _run_jobs" @@ -269,7 +246,7 @@ class ir_cron(osv.osv, netsvc.Agent): self.next_call = next_call next_call = int(time.time()) + 3600 # no available thread, it will run again after 1 day - self.reschedule_in_advance(self._poolJobs, next_call, db_name, db_name) + self.schedule_in_advance(next_call, db_name) except Exception, ex: self._logger.warning('Exception in cron:', exc_info=True) @@ -286,7 +263,7 @@ class ir_cron(osv.osv, netsvc.Agent): def restart(self, dbname): self.cancel(dbname) # Reschedule cron processing job asap, but not in the current thread - self.setAlarm(self._poolJobs, time.time(), dbname, dbname) + self.schedule_in_advance(time.time(), dbname) def update_running_cron(self, cr): # Verify whether the server is already started and thus whether we need to commit diff --git a/openerp/netsvc.py b/openerp/netsvc.py index 5ec114f7ce1..7ad5778244c 100644 --- a/openerp/netsvc.py +++ b/openerp/netsvc.py @@ -37,6 +37,7 @@ from pprint import pformat # TODO modules that import netsvc only for things from loglevels must be changed to use loglevels. from loglevels import * import tools +import openerp def close_socket(sock): """ Closes a socket instance cleanly @@ -252,81 +253,65 @@ class Agent(object): * a timestamp * the database on which the task run - * the function to call - * the arguments and keyword arguments to pass to the function + * a boolean attribute specifying if the task is canceled Implementation details: - Tasks are stored as list, allowing the cancellation by setting - the timestamp to 0. + the boolean to True. - A heapq is used to store tasks, so we don't need to sort tasks ourself. """ - __tasks = [] - __tasks_by_db = {} + _wakeups = [] + _wakeup_by_db = {} _logger = logging.getLogger('netsvc.agent') - @classmethod - def setAlarm(cls, function, timestamp, db_name, *args, **kwargs): - task = [timestamp, db_name, function, args, kwargs] - heapq.heappush(cls.__tasks, task) - cls.__tasks_by_db.setdefault(db_name, []).append(task) - @classmethod def cancel(cls, db_name): - """Cancel all tasks for a given database. If None is passed, all tasks are cancelled""" - cls._logger.debug("Cancel timers for %s db", db_name or 'all') - if db_name is None: - cls.__tasks, cls.__tasks_by_db = [], {} - else: - if db_name in cls.__tasks_by_db: - for task in cls.__tasks_by_db[db_name]: - task[0] = 0 + """ Cancel next wakeup for a given database. """ + cls._logger.debug("Cancel next wake-up for database '%s'.", db_name) + if db_name in cls._wakeup_by_db: + cls._wakeup_by_db[db_name][2] = True @classmethod - def reschedule_in_advance(cls, function, timestamp, db_name, *args, **kwargs): + def cancel_all(cls): + cls._wakeups = [] + cls._wakeup_by_db = {} + + @classmethod + def schedule_in_advance(cls, timestamp, db_name): if not timestamp: return - # Cancel the previous task if any. - old_timestamp = False - if db_name in cls.__tasks_by_db: - for task in cls.__tasks_by_db[db_name]: - print ">>> function:", function - if task[2] == function and (not task[0] or timestamp < task[0]): - old_timestamp = True - task[0] = 0 - if old_timestamp or db_name not in cls.__tasks_by_db or not cls.__tasks_by_db[db_name]: + # Cancel the previous wakeup if any. + add_wakeup = False + if db_name in cls._wakeup_by_db: + task = cls._wakeup_by_db[db_name] + if task[2] or timestamp < task[0]: + add_wakeup = True + task[2] = True + else: + add_wakeup = True + if add_wakeup: print ">>> rescheduled earlier", timestamp - cls.setAlarm(function, timestamp, db_name, *args, **kwargs) - - @classmethod - def quit(cls): - cls.cancel(None) + task = [timestamp, db_name, False] + heapq.heappush(cls._wakeups, task) + cls._wakeup_by_db[db_name] = task @classmethod def runner(cls): """Neverending function (intended to be ran in a dedicated thread) that checks every 60 seconds tasks to run. TODO: make configurable """ - current_thread = threading.currentThread() while True: - print ">>>>> starting thread for" - while cls.__tasks and cls.__tasks[0][0] < time.time(): - task = heapq.heappop(cls.__tasks) - timestamp, dbname, function, args, kwargs = task - cls.__tasks_by_db[dbname].remove(task) - if not timestamp: - # null timestamp -> cancelled task + print ">>>>> cron for" + while cls._wakeups and cls._wakeups[0][0] < time.time(): + task = heapq.heappop(cls._wakeups) + timestamp, db_name, canceled = task + del cls._wakeup_by_db[db_name] + if canceled: continue - current_thread.dbname = dbname # hack hack - cls._logger.debug("Run %s.%s(*%s, **%s)", function.im_class.__name__, function.func_name, args, kwargs) - delattr(current_thread, 'dbname') - task_thread = threading.Thread(target=function, name='netsvc.Agent.task', args=args, kwargs=kwargs) - # force non-daemon task threads (the runner thread must be daemon, and this property is inherited by default) - task_thread.setDaemon(False) - print ">>>>> -", function.func_name - task_thread.start() - time.sleep(1) + ir_cron = openerp.pooler.get_pool(db_name).get('ir.cron') + ir_cron._run_jobs() time.sleep(60) def start_agent():