From a87fef6cdf54ccc5a67aab65906ab22a0d338fed Mon Sep 17 00:00:00 2001 From: Christophe Simonis Date: Mon, 20 Sep 2010 13:22:23 +0200 Subject: [PATCH] [IMP] use a single thread for crons (forwardport of 2123 chs@openerp.com-20100920112005-hk9zrgpbx9m0w6tv from 5.0 branch) lp bug: https://launchpad.net/bugs/640493 fixed bzr revid: chs@openerp.com-20100920112223-poqz0q1yb4kzsqs5 --- bin/netsvc.py | 73 +++++++++++++++++++++++++++++++++++++-------------- 1 file changed, 53 insertions(+), 20 deletions(-) diff --git a/bin/netsvc.py b/bin/netsvc.py index 13041e2d9bc..d29aebd92b5 100644 --- a/bin/netsvc.py +++ b/bin/netsvc.py @@ -35,6 +35,7 @@ import time import release from pprint import pformat import warnings +import heapq class Service(object): """ Base class for *Local* services @@ -292,36 +293,68 @@ import tools init_logger() class Agent(object): - _timers = {} + """Singleton that keeps track of cancellable tasks to run at a given + timestamp. + The tasks are caracterised by: + * a timestamp + * the database on which the task run + * the function to call + * the arguments and keyword arguments to pass to the function + + Implementation details: + Tasks are stored as list, allowing the cancellation by setting + the timestamp to 0. + A heapq is used to store tasks, so we don't need to sort + tasks ourself. + """ + __tasks = [] + __tasks_by_db = {} _logger = Logger() - __logger = logging.getLogger('timer') - - def setAlarm(self, fn, dt, db_name, *args, **kwargs): - wait = dt - time.time() - if wait > 0: - self.__logger.debug("Job scheduled in %.3g seconds for %s.%s" % (wait, fn.im_class.__name__, fn.func_name)) - timer = threading.Timer(wait, fn, args, kwargs) - timer.start() - self._timers.setdefault(db_name, []).append(timer) - - for db in self._timers: - for timer in self._timers[db]: - if not timer.isAlive(): - self._timers[db].remove(timer) + @classmethod + def setAlarm(cls, function, timestamp, db_name, *args, **kwargs): + task = [timestamp, db_name, function, args, kwargs] + heapq.heappush(cls.__tasks, task) + cls.__tasks_by_db.setdefault(db_name, []).append(task) @classmethod def cancel(cls, db_name): - """Cancel all timers for a given database. If None passed, all timers are cancelled""" - for db in cls._timers: - if db_name is None or db == db_name: - for timer in cls._timers[db]: - timer.cancel() + """Cancel all tasks for a given database. If None is passed, all tasks are cancelled""" + if db_name is None: + cls.__tasks, cls.__tasks_by_db = [], {} + else: + if db_name in cls.__tasks_by_db: + for task in cls.__tasks_by_db[db_name]: + task[0] = 0 @classmethod def quit(cls): cls.cancel(None) + @classmethod + def runner(cls): + """Neverending function (intended to be ran in a dedicated thread) that + checks every 60 seconds tasks to run. + """ + current_thread = threading.currentThread() + while True: + while cls.__tasks and cls.__tasks[0][0] < time.time(): + task = heapq.heappop(cls.__tasks) + timestamp, dbname, function, args, kwargs = task + cls.__tasks_by_db[dbname].remove(task) + if not timestamp: + # null timestamp -> cancelled task + continue + current_thread.dbname = dbname # hack hack + cls._logger.notifyChannel('timers', LOG_DEBUG, "Run %s.%s(*%r, **%r)" % (function.im_class.__name__, function.func_name, args, kwargs)) + delattr(current_thread, 'dbname') + threading.Thread(target=function, args=args, kwargs=kwargs).start() + time.sleep(1) + time.sleep(60) + +threading.Thread(target=Agent.runner).start() + + import traceback class Server: