[IMP] make cron timers per db
bzr revid: christophe@taupe-20090304235328-z2h1e1u5zy2ex4c7
This commit is contained in:
parent
810611d566
commit
2631f6e3da
|
@ -77,45 +77,45 @@ class ir_cron(osv.osv, netsvc.Agent):
|
|||
def _poolJobs(self, db_name, check=False):
|
||||
try:
|
||||
db, pool = pooler.get_db_and_pool(db_name)
|
||||
if pool._init:
|
||||
# retry in a few minutes
|
||||
self.setAlarm(self._poolJobs, int(time.time())+10*60, [db_name])
|
||||
cr = db.cursor()
|
||||
except:
|
||||
return False
|
||||
|
||||
now = DateTime.now()
|
||||
#FIXME: multidb. Solution: a l'instanciation d'une nouvelle connection bd (ds pooler) fo que j'instancie
|
||||
# un nouveau pooljob avec comme parametre la bd
|
||||
|
||||
try:
|
||||
cr.execute('select * from ir_cron where numbercall<>0 and active and nextcall<=now() order by priority')
|
||||
for job in cr.dictfetchall():
|
||||
nextcall = DateTime.strptime(job['nextcall'], '%Y-%m-%d %H:%M:%S')
|
||||
numbercall = job['numbercall']
|
||||
if pool._init:
|
||||
# retry in a few minutes
|
||||
next_call = 600
|
||||
else:
|
||||
next_call = next_wait
|
||||
cr = db.cursor()
|
||||
now = DateTime.now()
|
||||
try:
|
||||
cr.execute('select * from ir_cron where numbercall<>0 and active and nextcall<=now() order by priority')
|
||||
for job in cr.dictfetchall():
|
||||
nextcall = DateTime.strptime(job['nextcall'], '%Y-%m-%d %H:%M:%S')
|
||||
numbercall = job['numbercall']
|
||||
|
||||
ok = False
|
||||
while nextcall<now and numbercall:
|
||||
if numbercall > 0:
|
||||
numbercall -= 1
|
||||
if not ok or job['doall']:
|
||||
self._callback(cr, job['user_id'], job['model'], job['function'], job['args'])
|
||||
if numbercall:
|
||||
nextcall += _intervalTypes[job['interval_type']](job['interval_number'])
|
||||
ok = True
|
||||
addsql=''
|
||||
if not numbercall:
|
||||
addsql = ', active=False'
|
||||
cr.execute("update ir_cron set nextcall=%s, numbercall=%s"+addsql+" where id=%s", (nextcall.strftime('%Y-%m-%d %H:%M:%S'), numbercall, job['id']))
|
||||
cr.commit()
|
||||
finally:
|
||||
cr.close()
|
||||
ok = False
|
||||
while nextcall < now and numbercall:
|
||||
if numbercall > 0:
|
||||
numbercall -= 1
|
||||
if not ok or job['doall']:
|
||||
self._callback(cr, job['user_id'], job['model'], job['function'], job['args'])
|
||||
if numbercall:
|
||||
nextcall += _intervalTypes[job['interval_type']](job['interval_number'])
|
||||
ok = True
|
||||
addsql=''
|
||||
if not numbercall:
|
||||
addsql = ', active=False'
|
||||
cr.execute("update ir_cron set nextcall=%s, numbercall=%s"+addsql+" where id=%s", (nextcall.strftime('%Y-%m-%d %H:%M:%S'), numbercall, job['id']))
|
||||
cr.commit()
|
||||
finally:
|
||||
cr.close()
|
||||
|
||||
#
|
||||
# Can be improved to do at the min(min(nextcalls), time()+next_wait)
|
||||
# Can be improved to do at the min(min(nextcalls), time()+next_call)
|
||||
# But is this an improvement ?
|
||||
#
|
||||
if not check:
|
||||
self.setAlarm(self._poolJobs, int(time.time())+next_wait, [db_name])
|
||||
self.setAlarm(self._poolJobs, int(time.time()) + next_call, db_name, db_name)
|
||||
ir_cron()
|
||||
|
||||
# vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4:
|
||||
|
|
|
@ -182,29 +182,33 @@ import tools
|
|||
init_logger()
|
||||
|
||||
class Agent(object):
|
||||
_timers = []
|
||||
_timers = {}
|
||||
_logger = Logger()
|
||||
|
||||
def setAlarm(self, fn, dt, args=None, kwargs=None):
|
||||
if not args:
|
||||
args = []
|
||||
if not kwargs:
|
||||
kwargs = {}
|
||||
def setAlarm(self, fn, dt, db_name, *args, **kwargs):
|
||||
wait = dt - time.time()
|
||||
if wait > 0:
|
||||
self._logger.notifyChannel('timers', LOG_DEBUG, "Job scheduled in %s seconds for %s.%s" % (wait, fn.im_class.__name__, fn.func_name))
|
||||
timer = threading.Timer(wait, fn, args, kwargs)
|
||||
timer.start()
|
||||
self._timers.append(timer)
|
||||
for timer in self._timers[:]:
|
||||
if not timer.isAlive():
|
||||
self._timers.remove(timer)
|
||||
self._timers.setdefault(db_name, []).append(timer)
|
||||
|
||||
for db in self._timers:
|
||||
for timer in self._timers[db]:
|
||||
if not timer.isAlive():
|
||||
self._timers[db].remove(timer)
|
||||
|
||||
@classmethod
|
||||
def cancel(cls, db_name):
|
||||
"""Cancel all timers for a given database. If None passed, all timers are cancelled"""
|
||||
for db in cls._timers:
|
||||
if db_name is None or db == db_name:
|
||||
for timer in cls._timers[db]:
|
||||
timer.cancel()
|
||||
|
||||
@classmethod
|
||||
def quit(cls):
|
||||
for timer in cls._timers:
|
||||
timer.cancel()
|
||||
quit = classmethod(quit)
|
||||
|
||||
cls.cancel(None)
|
||||
|
||||
import traceback
|
||||
|
||||
|
|
|
@ -57,6 +57,7 @@ def get_db_and_pool(db_name, force_demo=False, status=None, update_module=False)
|
|||
|
||||
def restart_pool(db_name, force_demo=False, status=None, update_module=False):
|
||||
if db_name in pool_dic:
|
||||
pool_dic[db_name].get('ir.cron').cancel(db_name)
|
||||
del pool_dic[db_name]
|
||||
return get_db_and_pool(db_name, force_demo, status, update_module=update_module)
|
||||
|
||||
|
|
Loading…
Reference in New Issue