[IMP] use a single thread for crons (forwardport of 2123 chs@openerp.com-20100920112005-hk9zrgpbx9m0w6tv from 5.0 branch)

lp bug: https://launchpad.net/bugs/640493 fixed

bzr revid: chs@openerp.com-20100920112223-poqz0q1yb4kzsqs5
This commit is contained in:
Christophe Simonis 2010-09-20 13:22:23 +02:00
parent 6c8bc73943
commit a87fef6cdf
1 changed files with 53 additions and 20 deletions

View File

@ -35,6 +35,7 @@ import time
import release
from pprint import pformat
import warnings
import heapq
class Service(object):
""" Base class for *Local* services
@ -292,36 +293,68 @@ import tools
init_logger()
class Agent(object):
_timers = {}
"""Singleton that keeps track of cancellable tasks to run at a given
timestamp.
The tasks are caracterised by:
* a timestamp
* the database on which the task run
* the function to call
* the arguments and keyword arguments to pass to the function
Implementation details:
Tasks are stored as list, allowing the cancellation by setting
the timestamp to 0.
A heapq is used to store tasks, so we don't need to sort
tasks ourself.
"""
__tasks = []
__tasks_by_db = {}
_logger = Logger()
__logger = logging.getLogger('timer')
def setAlarm(self, fn, dt, db_name, *args, **kwargs):
wait = dt - time.time()
if wait > 0:
self.__logger.debug("Job scheduled in %.3g seconds for %s.%s" % (wait, fn.im_class.__name__, fn.func_name))
timer = threading.Timer(wait, fn, args, kwargs)
timer.start()
self._timers.setdefault(db_name, []).append(timer)
for db in self._timers:
for timer in self._timers[db]:
if not timer.isAlive():
self._timers[db].remove(timer)
@classmethod
def setAlarm(cls, function, timestamp, db_name, *args, **kwargs):
task = [timestamp, db_name, function, args, kwargs]
heapq.heappush(cls.__tasks, task)
cls.__tasks_by_db.setdefault(db_name, []).append(task)
@classmethod
def cancel(cls, db_name):
"""Cancel all timers for a given database. If None passed, all timers are cancelled"""
for db in cls._timers:
if db_name is None or db == db_name:
for timer in cls._timers[db]:
timer.cancel()
"""Cancel all tasks for a given database. If None is passed, all tasks are cancelled"""
if db_name is None:
cls.__tasks, cls.__tasks_by_db = [], {}
else:
if db_name in cls.__tasks_by_db:
for task in cls.__tasks_by_db[db_name]:
task[0] = 0
@classmethod
def quit(cls):
cls.cancel(None)
@classmethod
def runner(cls):
"""Neverending function (intended to be ran in a dedicated thread) that
checks every 60 seconds tasks to run.
"""
current_thread = threading.currentThread()
while True:
while cls.__tasks and cls.__tasks[0][0] < time.time():
task = heapq.heappop(cls.__tasks)
timestamp, dbname, function, args, kwargs = task
cls.__tasks_by_db[dbname].remove(task)
if not timestamp:
# null timestamp -> cancelled task
continue
current_thread.dbname = dbname # hack hack
cls._logger.notifyChannel('timers', LOG_DEBUG, "Run %s.%s(*%r, **%r)" % (function.im_class.__name__, function.func_name, args, kwargs))
delattr(current_thread, 'dbname')
threading.Thread(target=function, args=args, kwargs=kwargs).start()
time.sleep(1)
time.sleep(60)
threading.Thread(target=Agent.runner).start()
import traceback
class Server: