[IMP] cron: moved netsvc.Agent to openerp.cron.

bzr revid: vmt@openerp.com-20110714143209-bebn6xg91fcrxro9
This commit is contained in:
Vo Minh Thu 2011-07-14 16:32:09 +02:00
parent 32e830eb99
commit 2f115c21aa
4 changed files with 46 additions and 150 deletions

View File

@ -153,7 +153,7 @@ if config["translate_in"]:
if config["stop_after_init"]: if config["stop_after_init"]:
sys.exit(0) sys.exit(0)
openerp.netsvc.start_agent() openerp.cron.start_master_thread()
#---------------------------------------------------------- #----------------------------------------------------------
# Launch Servers # Launch Servers
@ -200,7 +200,7 @@ if os.name == 'posix':
def quit(): def quit():
# stop scheduling new jobs; we will have to wait for the jobs to complete below # stop scheduling new jobs; we will have to wait for the jobs to complete below
openerp.netsvc.Agent.cancel_all() openerp.cron.cancel_all()
openerp.netsvc.Server.quitAll() openerp.netsvc.Server.quitAll()
if config['pidfile']: if config['pidfile']:
os.unlink(config['pidfile']) os.unlink(config['pidfile'])

View File

@ -30,6 +30,7 @@ import tools
from tools.safe_eval import safe_eval as eval from tools.safe_eval import safe_eval as eval
import pooler import pooler
from osv import fields, osv from osv import fields, osv
import openerp
def str2tuple(s): def str2tuple(s):
return eval('tuple(%s)' % (s or '')) return eval('tuple(%s)' % (s or ''))
@ -43,9 +44,8 @@ _intervalTypes = {
'minutes': lambda interval: relativedelta(minutes=interval), 'minutes': lambda interval: relativedelta(minutes=interval),
} }
class ir_cron(osv.osv, netsvc.Agent): class ir_cron(osv.osv):
""" This is the ORM object that periodically executes actions. """ This is the ORM object that periodically executes actions.
Note that we use the netsvc.Agent()._logger member.
""" """
_name = "ir.cron" _name = "ir.cron"
_order = 'name' _order = 'name'
@ -76,16 +76,7 @@ class ir_cron(osv.osv, netsvc.Agent):
'doall' : lambda *a: 1 'doall' : lambda *a: 1
} }
thread_count_lock = threading.Lock() _logger = logging.getLogger('cron')
thread_count = 2 # maximum allowed number of thread.
def get_thread_count(self):
return self.thread_count
def dec_thread_count(self):
self.thread_count_lock.acquire()
self.thread_count -= 1
self.thread_count_lock.release()
def f(a, b, c): def f(a, b, c):
print ">>> in f" print ">>> in f"
@ -160,16 +151,11 @@ class ir_cron(osv.osv, netsvc.Agent):
# This is really needed if this job run longer that its rescheduling period. # This is really needed if this job run longer that its rescheduling period.
print ">>> advance at", nextcall print ">>> advance at", nextcall
nextcall = time.mktime(time.strptime(nextcall.strftime('%Y-%m-%d %H:%M:%S'), '%Y-%m-%d %H:%M:%S')) nextcall = time.mktime(time.strptime(nextcall.strftime('%Y-%m-%d %H:%M:%S'), '%Y-%m-%d %H:%M:%S'))
with self.thread_count_lock: openerp.cron.schedule_in_advance(nextcall, cr.dbname)
self.schedule_in_advance(nextcall, cr.dbname)
finally: finally:
cr.commit() cr.commit()
cr.close() cr.close()
with self.thread_count_lock: openerp.cron.inc_thread_count()
self.thread_count += 1
# reschedule the master thread in advance, using its saved next_call value.
self.schedule_in_advance(self.next_call, cr.dbname)
self.next_call = None
def _run_jobs(self): def _run_jobs(self):
# TODO remove 'check' argument from addons/base_action_rule/base_action_rule.py # TODO remove 'check' argument from addons/base_action_rule/base_action_rule.py
@ -181,50 +167,45 @@ class ir_cron(osv.osv, netsvc.Agent):
locked to be taken care of by another thread. locked to be taken care of by another thread.
""" """
db_name = self.pool.db.dbname
try:
db, pool = self.pool.db, self.pool
except:
return False
print ">>> _run_jobs" print ">>> _run_jobs"
self.next_call = None db = self.pool.db
cr = db.cursor() cr = db.cursor()
db_name = db.dbname
try: try:
jobs = {} # mapping job ids to jobs for all jobs being processed. jobs = {} # mapping job ids to jobs for all jobs being processed.
if not pool._init: now = datetime.now()
now = datetime.now() cr.execute('select * from ir_cron where numbercall<>0 and active and nextcall<=now() order by priority')
cr.execute('select * from ir_cron where numbercall<>0 and active and nextcall<=now() order by priority') for job in cr.dictfetchall():
for job in cr.dictfetchall(): print ">>>", openerp.cron.get_thread_count(), "threads"
print ">>>", self.get_thread_count(), "threads" if not openerp.cron.get_thread_count():
if not self.get_thread_count(): break
break task_cr = db.cursor()
task_cr = db.cursor() task_job = None
task_job = None jobs[job['id']] = job
jobs[job['id']] = job
try: try:
# Try to lock the job... # Try to lock the job...
task_cr.execute('select * from ir_cron where id=%s for update nowait', (job['id'],), log_exceptions=False) task_cr.execute('select * from ir_cron where id=%s for update nowait', (job['id'],), log_exceptions=False)
task_job = task_cr.dictfetchall()[0] task_job = task_cr.dictfetchall()[0]
except psycopg2.OperationalError, e: except psycopg2.OperationalError, e:
if e.pgcode == '55P03': if e.pgcode == '55P03':
# Class 55: Object not in prerequisite state, 55P03: lock_not_available # Class 55: Object not in prerequisite state, 55P03: lock_not_available
# ... and fail. # ... and fail.
print ">>>", job['name'], " is already being processed" print ">>>", job['name'], " is already being processed"
continue continue
else: else:
raise raise
finally: finally:
if not task_job: if not task_job:
task_cr.close() task_cr.close()
# ... and succeed. # ... and succeed.
print ">>> taking care of", job['name'] print ">>> taking care of", job['name']
task_thread = threading.Thread(target=self._run_job, name=task_job['name'], args=(task_cr, task_job, now)) task_thread = threading.Thread(target=self._run_job, name=task_job['name'], args=(task_cr, task_job, now))
# force non-daemon task threads (the runner thread must be daemon, and this property is inherited by default) # force non-daemon task threads (the runner thread must be daemon, and this property is inherited by default)
task_thread.setDaemon(False) task_thread.setDaemon(False)
self.dec_thread_count() openerp.cron.dec_thread_count()
task_thread.start() task_thread.start()
# Wake up time, without considering the currently processed jobs. # Wake up time, without considering the currently processed jobs.
if jobs.keys(): if jobs.keys():
@ -239,14 +220,7 @@ class ir_cron(osv.osv, netsvc.Agent):
else: else:
next_call = int(time.time()) + 3600 # if do not find active cron job from database, it will run again after 1 day next_call = int(time.time()) + 3600 # if do not find active cron job from database, it will run again after 1 day
# avoid race condition: the thread rescheduled the main thread, then the main thread puts +3600. openerp.cron.schedule_in_advance(next_call, db_name)
with self.thread_count_lock:
if not self.thread_count:
print ">>> no more threads"
self.next_call = next_call
next_call = int(time.time()) + 3600 # no available thread, it will run again after 1 day
self.schedule_in_advance(next_call, db_name)
except Exception, ex: except Exception, ex:
self._logger.warning('Exception in cron:', exc_info=True) self._logger.warning('Exception in cron:', exc_info=True)
@ -261,9 +235,9 @@ class ir_cron(osv.osv, netsvc.Agent):
self.restart(self, dbname) self.restart(self, dbname)
def restart(self, dbname): def restart(self, dbname):
self.cancel(dbname) openerp.cron.cancel(dbname)
# Reschedule cron processing job asap, but not in the current thread # Reschedule cron processing job asap, but not in the current thread
self.schedule_in_advance(time.time(), dbname) openerp.cron.schedule_in_advance(time.time(), dbname)
def update_running_cron(self, cr): def update_running_cron(self, cr):
# Verify whether the server is already started and thus whether we need to commit # Verify whether the server is already started and thus whether we need to commit

View File

@ -25,7 +25,7 @@
import openerp.sql_db import openerp.sql_db
import openerp.osv.orm import openerp.osv.orm
import openerp.netsvc import openerp.cron
import openerp.tools import openerp.tools
@ -161,14 +161,14 @@ class RegistryManager(object):
cancels the associated cron job. But please note that the cron job can cancels the associated cron job. But please note that the cron job can
be running and take some time before ending, and that you should not be running and take some time before ending, and that you should not
remove a registry if it can still be used by some thread. So it might remove a registry if it can still be used by some thread. So it might
be necessary to call yourself openerp.netsvc.Agent.cancel(db_name) and be necessary to call yourself openerp.cron.Agent.cancel(db_name) and
and join (i.e. wait for) the thread. and join (i.e. wait for) the thread.
""" """
if db_name in cls.registries: if db_name in cls.registries:
del cls.registries[db_name] del cls.registries[db_name]
openerp.tools.cache.clean_caches_for_db(db_name) openerp.tools.cache.clean_caches_for_db(db_name)
openerp.netsvc.Agent.cancel(db_name) openerp.cron.cancel(db_name)
@classmethod @classmethod

View File

@ -21,7 +21,6 @@
############################################################################## ##############################################################################
import errno import errno
import heapq
import logging import logging
import logging.handlers import logging.handlers
import os import os
@ -245,83 +244,6 @@ def init_alternative_logger():
logger.addHandler(handler) logger.addHandler(handler)
logger.setLevel(logging.ERROR) logger.setLevel(logging.ERROR)
class Agent(object):
""" Singleton that keeps track of cancellable tasks to run at a given
timestamp.
The tasks are characterised by:
* a timestamp
* the database on which the task run
* a boolean attribute specifying if the task is canceled
Implementation details:
- Tasks are stored as list, allowing the cancellation by setting
the boolean to True.
- A heapq is used to store tasks, so we don't need to sort
tasks ourself.
"""
_wakeups = []
_wakeup_by_db = {}
_logger = logging.getLogger('netsvc.agent')
@classmethod
def cancel(cls, db_name):
""" Cancel next wakeup for a given database. """
cls._logger.debug("Cancel next wake-up for database '%s'.", db_name)
if db_name in cls._wakeup_by_db:
cls._wakeup_by_db[db_name][2] = True
@classmethod
def cancel_all(cls):
cls._wakeups = []
cls._wakeup_by_db = {}
@classmethod
def schedule_in_advance(cls, timestamp, db_name):
if not timestamp:
return
# Cancel the previous wakeup if any.
add_wakeup = False
if db_name in cls._wakeup_by_db:
task = cls._wakeup_by_db[db_name]
if task[2] or timestamp < task[0]:
add_wakeup = True
task[2] = True
else:
add_wakeup = True
if add_wakeup:
print ">>> rescheduled earlier", timestamp
task = [timestamp, db_name, False]
heapq.heappush(cls._wakeups, task)
cls._wakeup_by_db[db_name] = task
@classmethod
def runner(cls):
"""Neverending function (intended to be ran in a dedicated thread) that
checks every 60 seconds tasks to run. TODO: make configurable
"""
while True:
print ">>>>> cron for"
while cls._wakeups and cls._wakeups[0][0] < time.time():
task = heapq.heappop(cls._wakeups)
timestamp, db_name, canceled = task
del cls._wakeup_by_db[db_name]
if canceled:
continue
ir_cron = openerp.pooler.get_pool(db_name).get('ir.cron')
ir_cron._run_jobs()
time.sleep(60)
def start_agent():
agent_runner = threading.Thread(target=Agent.runner, name="netsvc.Agent.runner")
# the agent runner is a typical daemon thread, that will never quit and must be
# terminated when the main process exits - with no consequence (the processing
# threads it spawns are not marked daemon)
agent_runner.setDaemon(True)
agent_runner.start()
import traceback import traceback
class Server: class Server: