[IMP] ir.cron: continued implementing multithreaded cron:
- use a lock to protect the number of threads - the not task[0] condition in reschedule_in_advance is not really correct - but we have to remove the Agent in favor of a real cron master thread. bzr revid: vmt@openerp.com-20110713134933-gmfwddot50a3ib4k
This commit is contained in:
parent
95d21a8a7e
commit
6f5eb6b91e
|
@ -76,24 +76,18 @@ class ir_cron(osv.osv, netsvc.Agent):
|
||||||
'doall' : lambda *a: 1
|
'doall' : lambda *a: 1
|
||||||
}
|
}
|
||||||
|
|
||||||
thread_count_lock = threading.Lock()
|
def __init__(self, pool, cr):
|
||||||
thread_count = 1 # maximum allowed number of thread.
|
self.thread_count_lock = threading.Lock()
|
||||||
|
self.thread_count = 2 # maximum allowed number of thread.
|
||||||
|
super(osv.osv, self).__init__(pool, cr)
|
||||||
|
|
||||||
@classmethod
|
def get_thread_count(self):
|
||||||
def get_thread_count(cls):
|
return self.thread_count
|
||||||
return cls.thread_count
|
|
||||||
|
|
||||||
@classmethod
|
def dec_thread_count(self):
|
||||||
def inc_thread_count(cls):
|
self.thread_count_lock.acquire()
|
||||||
cls.thread_count_lock.acquire()
|
self.thread_count -= 1
|
||||||
cls.thread_count += 1
|
self.thread_count_lock.release()
|
||||||
cls.thread_count_lock.release()
|
|
||||||
|
|
||||||
@classmethod
|
|
||||||
def dec_thread_count(cls):
|
|
||||||
cls.thread_count_lock.acquire()
|
|
||||||
cls.thread_count -= 1
|
|
||||||
cls.thread_count_lock.release()
|
|
||||||
|
|
||||||
def f(a, b, c):
|
def f(a, b, c):
|
||||||
print ">>> in f"
|
print ">>> in f"
|
||||||
|
@ -105,7 +99,7 @@ class ir_cron(osv.osv, netsvc.Agent):
|
||||||
|
|
||||||
def expensive_2(a, b, c):
|
def expensive_2(a, b, c):
|
||||||
print ">>> in expensive_2"
|
print ">>> in expensive_2"
|
||||||
time.sleep(80)
|
time.sleep(30)
|
||||||
print ">>> out expensive_2"
|
print ">>> out expensive_2"
|
||||||
|
|
||||||
def _check_args(self, cr, uid, ids, context=None):
|
def _check_args(self, cr, uid, ids, context=None):
|
||||||
|
@ -187,10 +181,16 @@ class ir_cron(osv.osv, netsvc.Agent):
|
||||||
# This is really needed if this job run longer that its rescheduling period.
|
# This is really needed if this job run longer that its rescheduling period.
|
||||||
print ">>> advance at", nextcall
|
print ">>> advance at", nextcall
|
||||||
nextcall = time.mktime(time.strptime(nextcall.strftime('%Y-%m-%d %H:%M:%S'), '%Y-%m-%d %H:%M:%S'))
|
nextcall = time.mktime(time.strptime(nextcall.strftime('%Y-%m-%d %H:%M:%S'), '%Y-%m-%d %H:%M:%S'))
|
||||||
self.reschedule_in_advance(self._poolJobs, nextcall, cr.dbname, cr.dbname)
|
with self.thread_count_lock:
|
||||||
|
self.reschedule_in_advance(self._poolJobs, nextcall, cr.dbname, cr.dbname)
|
||||||
finally:
|
finally:
|
||||||
cr.commit()
|
cr.commit()
|
||||||
cr.close()
|
cr.close()
|
||||||
|
with self.thread_count_lock:
|
||||||
|
self.thread_count += 1
|
||||||
|
# reschedule the master thread in advance, using its saved next_call value.
|
||||||
|
self.reschedule_in_advance(self._poolJobs, self.next_call, cr.dbname, cr.dbname)
|
||||||
|
self.next_call = None
|
||||||
|
|
||||||
def _poolJobs(self, db_name):
|
def _poolJobs(self, db_name):
|
||||||
return self._run_jobs(db_name)
|
return self._run_jobs(db_name)
|
||||||
|
@ -210,6 +210,7 @@ class ir_cron(osv.osv, netsvc.Agent):
|
||||||
except:
|
except:
|
||||||
return False
|
return False
|
||||||
print ">>> _run_jobs"
|
print ">>> _run_jobs"
|
||||||
|
self.next_call = None
|
||||||
cr = db.cursor()
|
cr = db.cursor()
|
||||||
try:
|
try:
|
||||||
jobs = {} # mapping job ids to jobs for all jobs being processed.
|
jobs = {} # mapping job ids to jobs for all jobs being processed.
|
||||||
|
@ -217,9 +218,13 @@ class ir_cron(osv.osv, netsvc.Agent):
|
||||||
now = datetime.now()
|
now = datetime.now()
|
||||||
cr.execute('select * from ir_cron where numbercall<>0 and active and nextcall<=now() order by priority')
|
cr.execute('select * from ir_cron where numbercall<>0 and active and nextcall<=now() order by priority')
|
||||||
for job in cr.dictfetchall():
|
for job in cr.dictfetchall():
|
||||||
|
print ">>>", self.get_thread_count(), "threads"
|
||||||
|
if not self.get_thread_count():
|
||||||
|
break
|
||||||
task_cr = db.cursor()
|
task_cr = db.cursor()
|
||||||
task_job = None
|
task_job = None
|
||||||
jobs[job['id']] = job
|
jobs[job['id']] = job
|
||||||
|
|
||||||
try:
|
try:
|
||||||
# Try to lock the job...
|
# Try to lock the job...
|
||||||
task_cr.execute('select * from ir_cron where id=%s for update nowait', (job['id'],), log_exceptions=False)
|
task_cr.execute('select * from ir_cron where id=%s for update nowait', (job['id'],), log_exceptions=False)
|
||||||
|
@ -241,6 +246,7 @@ class ir_cron(osv.osv, netsvc.Agent):
|
||||||
task_thread = threading.Thread(target=self._run_job, name=task_job['name'], args=(task_cr, task_job, now))
|
task_thread = threading.Thread(target=self._run_job, name=task_job['name'], args=(task_cr, task_job, now))
|
||||||
# force non-daemon task threads (the runner thread must be daemon, and this property is inherited by default)
|
# force non-daemon task threads (the runner thread must be daemon, and this property is inherited by default)
|
||||||
task_thread.setDaemon(False)
|
task_thread.setDaemon(False)
|
||||||
|
self.dec_thread_count()
|
||||||
task_thread.start()
|
task_thread.start()
|
||||||
|
|
||||||
# Wake up time, without considering the currently processed jobs.
|
# Wake up time, without considering the currently processed jobs.
|
||||||
|
@ -251,22 +257,19 @@ class ir_cron(osv.osv, netsvc.Agent):
|
||||||
next_call = cr.dictfetchone()['min_next_call']
|
next_call = cr.dictfetchone()['min_next_call']
|
||||||
print ">>> possibility at ", next_call
|
print ">>> possibility at ", next_call
|
||||||
|
|
||||||
# Wake up time, taking the smallest processed job nextcall value.
|
|
||||||
for job in jobs.values():
|
|
||||||
nextcall = self._compute_nextcall(job, now)
|
|
||||||
print ">>> or at ", nextcall
|
|
||||||
if not nextcall:
|
|
||||||
continue
|
|
||||||
if not next_call or nextcall < next_call:
|
|
||||||
next_call = nextcall
|
|
||||||
print ">>> rescheduling at", next_call
|
|
||||||
|
|
||||||
if next_call:
|
if next_call:
|
||||||
next_call = time.mktime(time.strptime(next_call, '%Y-%m-%d %H:%M:%S'))
|
next_call = time.mktime(time.strptime(next_call, '%Y-%m-%d %H:%M:%S'))
|
||||||
else:
|
else:
|
||||||
next_call = int(time.time()) + 3600 # if do not find active cron job from database, it will run again after 1 day
|
next_call = int(time.time()) + 3600 # if do not find active cron job from database, it will run again after 1 day
|
||||||
|
|
||||||
self.setAlarm(self._poolJobs, next_call, db_name, db_name)
|
# avoid race condition: the thread rescheduled the main thread, then the main thread puts +3600.
|
||||||
|
with self.thread_count_lock:
|
||||||
|
if not self.thread_count:
|
||||||
|
print ">>> no more threads"
|
||||||
|
self.next_call = next_call
|
||||||
|
next_call = int(time.time()) + 3600 # no available thread, it will run again after 1 day
|
||||||
|
|
||||||
|
self.reschedule_in_advance(self._poolJobs, next_call, db_name, db_name)
|
||||||
|
|
||||||
except Exception, ex:
|
except Exception, ex:
|
||||||
self._logger.warning('Exception in cron:', exc_info=True)
|
self._logger.warning('Exception in cron:', exc_info=True)
|
||||||
|
|
|
@ -285,14 +285,18 @@ class Agent(object):
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def reschedule_in_advance(cls, function, timestamp, db_name, *args, **kwargs):
|
def reschedule_in_advance(cls, function, timestamp, db_name, *args, **kwargs):
|
||||||
|
if not timestamp:
|
||||||
|
return
|
||||||
# Cancel the previous task if any.
|
# Cancel the previous task if any.
|
||||||
old_timestamp = None
|
old_timestamp = False
|
||||||
if db_name in cls.__tasks_by_db:
|
if db_name in cls.__tasks_by_db:
|
||||||
for task in cls.__tasks_by_db[db_name]:
|
for task in cls.__tasks_by_db[db_name]:
|
||||||
if task[2] == function and timestamp < task[0]:
|
print ">>> function:", function
|
||||||
old_timestamp = task[0]
|
if task[2] == function and (not task[0] or timestamp < task[0]):
|
||||||
|
old_timestamp = True
|
||||||
task[0] = 0
|
task[0] = 0
|
||||||
if not old_timestamp or timestamp < old_timestamp:
|
if old_timestamp or db_name not in cls.__tasks_by_db or not cls.__tasks_by_db[db_name]:
|
||||||
|
print ">>> rescheduled earlier", timestamp
|
||||||
cls.setAlarm(function, timestamp, db_name, *args, **kwargs)
|
cls.setAlarm(function, timestamp, db_name, *args, **kwargs)
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
|
@ -306,6 +310,7 @@ class Agent(object):
|
||||||
"""
|
"""
|
||||||
current_thread = threading.currentThread()
|
current_thread = threading.currentThread()
|
||||||
while True:
|
while True:
|
||||||
|
print ">>>>> starting thread for"
|
||||||
while cls.__tasks and cls.__tasks[0][0] < time.time():
|
while cls.__tasks and cls.__tasks[0][0] < time.time():
|
||||||
task = heapq.heappop(cls.__tasks)
|
task = heapq.heappop(cls.__tasks)
|
||||||
timestamp, dbname, function, args, kwargs = task
|
timestamp, dbname, function, args, kwargs = task
|
||||||
|
@ -319,6 +324,7 @@ class Agent(object):
|
||||||
task_thread = threading.Thread(target=function, name='netsvc.Agent.task', args=args, kwargs=kwargs)
|
task_thread = threading.Thread(target=function, name='netsvc.Agent.task', args=args, kwargs=kwargs)
|
||||||
# force non-daemon task threads (the runner thread must be daemon, and this property is inherited by default)
|
# force non-daemon task threads (the runner thread must be daemon, and this property is inherited by default)
|
||||||
task_thread.setDaemon(False)
|
task_thread.setDaemon(False)
|
||||||
|
print ">>>>> -", function.func_name
|
||||||
task_thread.start()
|
task_thread.start()
|
||||||
time.sleep(1)
|
time.sleep(1)
|
||||||
time.sleep(60)
|
time.sleep(60)
|
||||||
|
|
Loading…
Reference in New Issue