[IMP] cron: removed unnecessary multi-tasks per db in Agent, some cleaning.

bzr revid: vmt@openerp.com-20110714110809-sgsoev9i24589sn8
This commit is contained in:
Vo Minh Thu 2011-07-14 13:08:09 +02:00
parent b5daffc115
commit 32e830eb99
3 changed files with 48 additions and 85 deletions

View File

@ -199,7 +199,8 @@ if os.name == 'posix':
signal.signal(signal.SIGQUIT, dumpstacks) signal.signal(signal.SIGQUIT, dumpstacks)
def quit(): def quit():
openerp.netsvc.Agent.quit() # stop scheduling new jobs; we will have to wait for the jobs to complete below
openerp.netsvc.Agent.cancel_all()
openerp.netsvc.Server.quitAll() openerp.netsvc.Server.quitAll()
if config['pidfile']: if config['pidfile']:
os.unlink(config['pidfile']) os.unlink(config['pidfile'])
@ -215,7 +216,7 @@ def quit():
if thread != threading.currentThread() and not thread.isDaemon(): if thread != threading.currentThread() and not thread.isDaemon():
while thread.isAlive(): while thread.isAlive():
# need a busyloop here as thread.join() masks signals # need a busyloop here as thread.join() masks signals
# and would present the forced shutdown # and would prevent the forced shutdown
thread.join(0.05) thread.join(0.05)
time.sleep(0.05) time.sleep(0.05)
openerp.modules.registry.RegistryManager.delete_all() openerp.modules.registry.RegistryManager.delete_all()

View File

@ -76,10 +76,8 @@ class ir_cron(osv.osv, netsvc.Agent):
'doall' : lambda *a: 1 'doall' : lambda *a: 1
} }
def __init__(self, pool, cr): thread_count_lock = threading.Lock()
self.thread_count_lock = threading.Lock() thread_count = 2 # maximum allowed number of thread.
self.thread_count = 2 # maximum allowed number of thread.
super(osv.osv, self).__init__(pool, cr)
def get_thread_count(self): def get_thread_count(self):
return self.thread_count return self.thread_count
@ -137,25 +135,6 @@ class ir_cron(osv.osv, netsvc.Agent):
except Exception, e: except Exception, e:
self._handle_callback_exception(cr, uid, model, func, args, job_id, e) self._handle_callback_exception(cr, uid, model, func, args, job_id, e)
def _compute_nextcall(self, job, now):
""" Compute the nextcall for a job exactly as _run_job does.
Return either the nextcall or None if it shouldn't be called.
"""
nextcall = datetime.strptime(job['nextcall'], '%Y-%m-%d %H:%M:%S')
numbercall = job['numbercall']
while nextcall < now and numbercall:
if numbercall > 0:
numbercall -= 1
if numbercall:
nextcall += _intervalTypes[job['interval_type']](job['interval_number'])
if not numbercall:
return None
return nextcall.strftime('%Y-%m-%d %H:%M:%S')
def _run_job(self, cr, job, now): def _run_job(self, cr, job, now):
""" Run a given job taking care of the repetition. """ """ Run a given job taking care of the repetition. """
try: try:
@ -182,20 +161,17 @@ class ir_cron(osv.osv, netsvc.Agent):
print ">>> advance at", nextcall print ">>> advance at", nextcall
nextcall = time.mktime(time.strptime(nextcall.strftime('%Y-%m-%d %H:%M:%S'), '%Y-%m-%d %H:%M:%S')) nextcall = time.mktime(time.strptime(nextcall.strftime('%Y-%m-%d %H:%M:%S'), '%Y-%m-%d %H:%M:%S'))
with self.thread_count_lock: with self.thread_count_lock:
self.reschedule_in_advance(self._poolJobs, nextcall, cr.dbname, cr.dbname) self.schedule_in_advance(nextcall, cr.dbname)
finally: finally:
cr.commit() cr.commit()
cr.close() cr.close()
with self.thread_count_lock: with self.thread_count_lock:
self.thread_count += 1 self.thread_count += 1
# reschedule the master thread in advance, using its saved next_call value. # reschedule the master thread in advance, using its saved next_call value.
self.reschedule_in_advance(self._poolJobs, self.next_call, cr.dbname, cr.dbname) self.schedule_in_advance(self.next_call, cr.dbname)
self.next_call = None self.next_call = None
def _poolJobs(self, db_name): def _run_jobs(self):
return self._run_jobs(db_name)
def _run_jobs(self, db_name):
# TODO remove 'check' argument from addons/base_action_rule/base_action_rule.py # TODO remove 'check' argument from addons/base_action_rule/base_action_rule.py
""" Process the cron jobs by spawning worker threads. """ Process the cron jobs by spawning worker threads.
@ -205,8 +181,9 @@ class ir_cron(osv.osv, netsvc.Agent):
locked to be taken care of by another thread. locked to be taken care of by another thread.
""" """
db_name = self.pool.db.dbname
try: try:
db, pool = pooler.get_db_and_pool(db_name) db, pool = self.pool.db, self.pool
except: except:
return False return False
print ">>> _run_jobs" print ">>> _run_jobs"
@ -269,7 +246,7 @@ class ir_cron(osv.osv, netsvc.Agent):
self.next_call = next_call self.next_call = next_call
next_call = int(time.time()) + 3600 # no available thread, it will run again after 1 day next_call = int(time.time()) + 3600 # no available thread, it will run again after 1 day
self.reschedule_in_advance(self._poolJobs, next_call, db_name, db_name) self.schedule_in_advance(next_call, db_name)
except Exception, ex: except Exception, ex:
self._logger.warning('Exception in cron:', exc_info=True) self._logger.warning('Exception in cron:', exc_info=True)
@ -286,7 +263,7 @@ class ir_cron(osv.osv, netsvc.Agent):
def restart(self, dbname): def restart(self, dbname):
self.cancel(dbname) self.cancel(dbname)
# Reschedule cron processing job asap, but not in the current thread # Reschedule cron processing job asap, but not in the current thread
self.setAlarm(self._poolJobs, time.time(), dbname, dbname) self.schedule_in_advance(time.time(), dbname)
def update_running_cron(self, cr): def update_running_cron(self, cr):
# Verify whether the server is already started and thus whether we need to commit # Verify whether the server is already started and thus whether we need to commit

View File

@ -37,6 +37,7 @@ from pprint import pformat
# TODO modules that import netsvc only for things from loglevels must be changed to use loglevels. # TODO modules that import netsvc only for things from loglevels must be changed to use loglevels.
from loglevels import * from loglevels import *
import tools import tools
import openerp
def close_socket(sock): def close_socket(sock):
""" Closes a socket instance cleanly """ Closes a socket instance cleanly
@ -252,81 +253,65 @@ class Agent(object):
* a timestamp * a timestamp
* the database on which the task run * the database on which the task run
* the function to call * a boolean attribute specifying if the task is canceled
* the arguments and keyword arguments to pass to the function
Implementation details: Implementation details:
- Tasks are stored as list, allowing the cancellation by setting - Tasks are stored as list, allowing the cancellation by setting
the timestamp to 0. the boolean to True.
- A heapq is used to store tasks, so we don't need to sort - A heapq is used to store tasks, so we don't need to sort
tasks ourself. tasks ourself.
""" """
__tasks = [] _wakeups = []
__tasks_by_db = {} _wakeup_by_db = {}
_logger = logging.getLogger('netsvc.agent') _logger = logging.getLogger('netsvc.agent')
@classmethod
def setAlarm(cls, function, timestamp, db_name, *args, **kwargs):
task = [timestamp, db_name, function, args, kwargs]
heapq.heappush(cls.__tasks, task)
cls.__tasks_by_db.setdefault(db_name, []).append(task)
@classmethod @classmethod
def cancel(cls, db_name): def cancel(cls, db_name):
"""Cancel all tasks for a given database. If None is passed, all tasks are cancelled""" """ Cancel next wakeup for a given database. """
cls._logger.debug("Cancel timers for %s db", db_name or 'all') cls._logger.debug("Cancel next wake-up for database '%s'.", db_name)
if db_name is None: if db_name in cls._wakeup_by_db:
cls.__tasks, cls.__tasks_by_db = [], {} cls._wakeup_by_db[db_name][2] = True
else:
if db_name in cls.__tasks_by_db:
for task in cls.__tasks_by_db[db_name]:
task[0] = 0
@classmethod @classmethod
def reschedule_in_advance(cls, function, timestamp, db_name, *args, **kwargs): def cancel_all(cls):
cls._wakeups = []
cls._wakeup_by_db = {}
@classmethod
def schedule_in_advance(cls, timestamp, db_name):
if not timestamp: if not timestamp:
return return
# Cancel the previous task if any. # Cancel the previous wakeup if any.
old_timestamp = False add_wakeup = False
if db_name in cls.__tasks_by_db: if db_name in cls._wakeup_by_db:
for task in cls.__tasks_by_db[db_name]: task = cls._wakeup_by_db[db_name]
print ">>> function:", function if task[2] or timestamp < task[0]:
if task[2] == function and (not task[0] or timestamp < task[0]): add_wakeup = True
old_timestamp = True task[2] = True
task[0] = 0 else:
if old_timestamp or db_name not in cls.__tasks_by_db or not cls.__tasks_by_db[db_name]: add_wakeup = True
if add_wakeup:
print ">>> rescheduled earlier", timestamp print ">>> rescheduled earlier", timestamp
cls.setAlarm(function, timestamp, db_name, *args, **kwargs) task = [timestamp, db_name, False]
heapq.heappush(cls._wakeups, task)
@classmethod cls._wakeup_by_db[db_name] = task
def quit(cls):
cls.cancel(None)
@classmethod @classmethod
def runner(cls): def runner(cls):
"""Neverending function (intended to be ran in a dedicated thread) that """Neverending function (intended to be ran in a dedicated thread) that
checks every 60 seconds tasks to run. TODO: make configurable checks every 60 seconds tasks to run. TODO: make configurable
""" """
current_thread = threading.currentThread()
while True: while True:
print ">>>>> starting thread for" print ">>>>> cron for"
while cls.__tasks and cls.__tasks[0][0] < time.time(): while cls._wakeups and cls._wakeups[0][0] < time.time():
task = heapq.heappop(cls.__tasks) task = heapq.heappop(cls._wakeups)
timestamp, dbname, function, args, kwargs = task timestamp, db_name, canceled = task
cls.__tasks_by_db[dbname].remove(task) del cls._wakeup_by_db[db_name]
if not timestamp: if canceled:
# null timestamp -> cancelled task
continue continue
current_thread.dbname = dbname # hack hack ir_cron = openerp.pooler.get_pool(db_name).get('ir.cron')
cls._logger.debug("Run %s.%s(*%s, **%s)", function.im_class.__name__, function.func_name, args, kwargs) ir_cron._run_jobs()
delattr(current_thread, 'dbname')
task_thread = threading.Thread(target=function, name='netsvc.Agent.task', args=args, kwargs=kwargs)
# force non-daemon task threads (the runner thread must be daemon, and this property is inherited by default)
task_thread.setDaemon(False)
print ">>>>> -", function.func_name
task_thread.start()
time.sleep(1)
time.sleep(60) time.sleep(60)
def start_agent(): def start_agent():