[MERGE] multi-process/multi-threaded ir.cron implementation

bzr revid: odo@openerp.com-20110929002157-31pgkmqc96by15ak
This commit is contained in:
Olivier Dony 2011-09-29 02:21:57 +02:00
commit b971680a53
16 changed files with 669 additions and 171 deletions

View File

@ -89,15 +89,17 @@ def setup_pid_file():
def preload_registry(dbname):
""" Preload a registry, and start the cron."""
try:
db, pool = openerp.pooler.get_db_and_pool(dbname, update_module=config['init'] or config['update'], pooljobs=False)
pool.get('ir.cron').restart(db.dbname)
db, registry = openerp.pooler.get_db_and_pool(dbname, update_module=config['init'] or config['update'], pooljobs=False)
# jobs will start to be processed later, when openerp.cron.start_master_thread() is called by openerp.service.start_services()
registry.schedule_cron_jobs()
except Exception:
logging.exception('Failed to initialize database `%s`.', dbname)
def run_test_file(dbname, test_file):
""" Preload a registry, possibly run a test file, and start the cron."""
try:
db, pool = openerp.pooler.get_db_and_pool(dbname, update_module=config['init'] or config['update'], pooljobs=False)
db, registry = openerp.pooler.get_db_and_pool(dbname, update_module=config['init'] or config['update'], pooljobs=False)
cr = db.cursor()
logger = logging.getLogger('server')
logger.info('loading test file %s', test_file)

View File

@ -24,6 +24,7 @@ import module
import res
import publisher_warranty
import report
import test
# vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4:

View File

@ -92,6 +92,10 @@
'test/test_osv_expression.yml',
'test/test_ir_rule.yml', # <-- These tests modify/add/delete ir_rules.
'test/test_ir_values.yml',
# Commented because this takes some time.
# This must be (un)commented with the corresponding import statement
# in test/__init__.py.
# 'test/test_ir_cron.yml', # <-- These tests perform a roolback.
],
'installable': True,
'active': True,

View File

@ -21,13 +21,20 @@
import time
import logging
import threading
import psycopg2
from datetime import datetime
from dateutil.relativedelta import relativedelta
import netsvc
import tools
from tools.safe_eval import safe_eval as eval
import openerp
import pooler
import tools
from openerp.cron import WAKE_UP_NOW
from osv import fields, osv
from tools import DEFAULT_SERVER_DATETIME_FORMAT
from tools.safe_eval import safe_eval as eval
from tools.translate import _
def str2tuple(s):
return eval('tuple(%s)' % (s or ''))
@ -41,10 +48,15 @@ _intervalTypes = {
'minutes': lambda interval: relativedelta(minutes=interval),
}
class ir_cron(osv.osv, netsvc.Agent):
""" This is the ORM object that periodically executes actions.
Note that we use the netsvc.Agent()._logger member.
class ir_cron(osv.osv):
""" Model describing cron jobs (also called actions or tasks).
"""
# TODO: perhaps in the future we could consider a flag on ir.cron jobs
# that would cause database wake-up even if the database has not been
# loaded yet or was already unloaded (e.g. 'force_db_wakeup' or something)
# See also openerp.cron
_name = "ir.cron"
_order = 'name'
_columns = {
@ -54,17 +66,17 @@ class ir_cron(osv.osv, netsvc.Agent):
'interval_number': fields.integer('Interval Number',help="Repeat every x."),
'interval_type': fields.selection( [('minutes', 'Minutes'),
('hours', 'Hours'), ('work_days','Work Days'), ('days', 'Days'),('weeks', 'Weeks'), ('months', 'Months')], 'Interval Unit'),
'numbercall': fields.integer('Number of Calls', help='Number of time the function is called,\na negative number indicates no limit'),
'doall' : fields.boolean('Repeat Missed', help="Enable this if you want to execute missed occurences as soon as the server restarts."),
'nextcall' : fields.datetime('Next Execution Date', required=True, help="Next planned execution date for this scheduler"),
'model': fields.char('Object', size=64, help="Name of object whose function will be called when this scheduler will run. e.g. 'res.partener'"),
'function': fields.char('Function', size=64, help="Name of the method to be called on the object when this scheduler is executed."),
'args': fields.text('Arguments', help="Arguments to be passed to the method. e.g. (uid,)"),
'priority': fields.integer('Priority', help='0=Very Urgent\n10=Not urgent')
'numbercall': fields.integer('Number of Calls', help='How many times the method is called,\na negative number indicates no limit.'),
'doall' : fields.boolean('Repeat Missed', help="Specify if missed occurrences should be executed when the server restarts."),
'nextcall' : fields.datetime('Next Execution Date', required=True, help="Next planned execution date for this job."),
'model': fields.char('Object', size=64, help="Model name on which the method to be called is located, e.g. 'res.partner'."),
'function': fields.char('Method', size=64, help="Name of the method to be called when this job is processed."),
'args': fields.text('Arguments', help="Arguments to be passed to the method, e.g. (uid,)."),
'priority': fields.integer('Priority', help='The priority of the job, as an integer: 0 means higher priority, 10 means lower priority.')
}
_defaults = {
'nextcall' : lambda *a: time.strftime('%Y-%m-%d %H:%M:%S'),
'nextcall' : lambda *a: time.strftime(DEFAULT_SERVER_DATETIME_FORMAT),
'priority' : lambda *a: 5,
'user_id' : lambda obj,cr,uid,context: uid,
'interval_number' : lambda *a: 1,
@ -74,6 +86,8 @@ class ir_cron(osv.osv, netsvc.Agent):
'doall' : lambda *a: 1
}
_logger = logging.getLogger('cron')
def _check_args(self, cr, uid, ids, context=None):
try:
for this in self.browse(cr, uid, ids, context):
@ -86,68 +100,164 @@ class ir_cron(osv.osv, netsvc.Agent):
(_check_args, 'Invalid arguments', ['args']),
]
def _handle_callback_exception(self, cr, uid, model, func, args, job_id, job_exception):
cr.rollback()
logger=logging.getLogger('cron')
logger.exception("Call of self.pool.get('%s').%s(cr, uid, *%r) failed in Job %s" % (model, func, args, job_id))
def _handle_callback_exception(self, cr, uid, model_name, method_name, args, job_id, job_exception):
""" Method called when an exception is raised by a job.
def _callback(self, cr, uid, model, func, args, job_id):
Simply logs the exception and rollback the transaction.
:param model_name: model name on which the job method is located.
:param method_name: name of the method to call when this job is processed.
:param args: arguments of the method (without the usual self, cr, uid).
:param job_id: job id.
:param job_exception: exception raised by the job.
"""
cr.rollback()
self._logger.exception("Call of self.pool.get('%s').%s(cr, uid, *%r) failed in Job %s" % (model_name, method_name, args, job_id))
def _callback(self, cr, uid, model_name, method_name, args, job_id):
""" Run the method associated to a given job
It takes care of logging and exception handling.
:param model_name: model name on which the job method is located.
:param method_name: name of the method to call when this job is processed.
:param args: arguments of the method (without the usual self, cr, uid).
:param job_id: job id.
"""
args = str2tuple(args)
m = self.pool.get(model)
if m and hasattr(m, func):
f = getattr(m, func)
model = self.pool.get(model_name)
if model and hasattr(model, method_name):
method = getattr(model, method_name)
try:
netsvc.log('cron', (cr.dbname,uid,'*',model,func)+tuple(args), channel=logging.DEBUG,
netsvc.log('cron', (cr.dbname,uid,'*',model_name,method_name)+tuple(args), channel=logging.DEBUG,
depth=(None if self._logger.isEnabledFor(logging.DEBUG_RPC_ANSWER) else 1), fn='object.execute')
logger = logging.getLogger('execution time')
if logger.isEnabledFor(logging.DEBUG):
start_time = time.time()
f(cr, uid, *args)
method(cr, uid, *args)
if logger.isEnabledFor(logging.DEBUG):
end_time = time.time()
logger.log(logging.DEBUG, '%.3fs (%s, %s)' % (end_time - start_time, model, func))
logger.log(logging.DEBUG, '%.3fs (%s, %s)' % (end_time - start_time, model_name, method_name))
except Exception, e:
self._handle_callback_exception(cr, uid, model, func, args, job_id, e)
self._handle_callback_exception(cr, uid, model_name, method_name, args, job_id, e)
def _poolJobs(self, db_name, check=False):
def _run_job(self, cr, job, now):
""" Run a given job taking care of the repetition.
The cursor has a lock on the job (aquired by _run_jobs_multithread()) and this
method is run in a worker thread (spawned by _run_jobs_multithread())).
:param job: job to be run (as a dictionary).
:param now: timestamp (result of datetime.now(), no need to call it multiple time).
"""
try:
db, pool = pooler.get_db_and_pool(db_name)
except:
return False
nextcall = datetime.strptime(job['nextcall'], DEFAULT_SERVER_DATETIME_FORMAT)
numbercall = job['numbercall']
ok = False
while nextcall < now and numbercall:
if numbercall > 0:
numbercall -= 1
if not ok or job['doall']:
self._callback(cr, job['user_id'], job['model'], job['function'], job['args'], job['id'])
if numbercall:
nextcall += _intervalTypes[job['interval_type']](job['interval_number'])
ok = True
addsql = ''
if not numbercall:
addsql = ', active=False'
cr.execute("UPDATE ir_cron SET nextcall=%s, numbercall=%s"+addsql+" WHERE id=%s",
(nextcall.strftime(DEFAULT_SERVER_DATETIME_FORMAT), numbercall, job['id']))
if numbercall:
# Reschedule our own main cron thread if necessary.
# This is really needed if this job runs longer than its rescheduling period.
nextcall = time.mktime(nextcall.timetuple())
openerp.cron.schedule_wakeup(nextcall, cr.dbname)
finally:
cr.commit()
cr.close()
openerp.cron.release_thread_slot()
def _run_jobs_multithread(self):
# TODO remove 'check' argument from addons/base_action_rule/base_action_rule.py
""" Process the cron jobs by spawning worker threads.
This selects in database all the jobs that should be processed. It then
tries to lock each of them and, if it succeeds, spawns a thread to run
the cron job (if it doesn't succeed, it means the job was already
locked to be taken care of by another thread).
The cursor used to lock the job in database is given to the worker
thread (which has to close it itself).
"""
db = self.pool.db
cr = db.cursor()
db_name = db.dbname
try:
if not pool._init:
now = datetime.now()
cr.execute('select * from ir_cron where numbercall<>0 and active and nextcall<=now() order by priority')
for job in cr.dictfetchall():
nextcall = datetime.strptime(job['nextcall'], '%Y-%m-%d %H:%M:%S')
numbercall = job['numbercall']
jobs = {} # mapping job ids to jobs for all jobs being processed.
now = datetime.now()
# Careful to compare timestamps with 'UTC' - everything is UTC as of v6.1.
cr.execute("""SELECT * FROM ir_cron
WHERE numbercall != 0
AND active AND nextcall <= (now() at time zone 'UTC')
ORDER BY priority""")
for job in cr.dictfetchall():
if not openerp.cron.get_thread_slots():
break
jobs[job['id']] = job
ok = False
while nextcall < now and numbercall:
if numbercall > 0:
numbercall -= 1
if not ok or job['doall']:
self._callback(cr, job['user_id'], job['model'], job['function'], job['args'], job['id'])
if numbercall:
nextcall += _intervalTypes[job['interval_type']](job['interval_number'])
ok = True
addsql = ''
if not numbercall:
addsql = ', active=False'
cr.execute("update ir_cron set nextcall=%s, numbercall=%s"+addsql+" where id=%s", (nextcall.strftime('%Y-%m-%d %H:%M:%S'), numbercall, job['id']))
cr.commit()
task_cr = db.cursor()
try:
# Try to grab an exclusive lock on the job row from within the task transaction
acquired_lock = False
task_cr.execute("""SELECT *
FROM ir_cron
WHERE id=%s
FOR UPDATE NOWAIT""",
(job['id'],), log_exceptions=False)
acquired_lock = True
except psycopg2.OperationalError, e:
if e.pgcode == '55P03':
# Class 55: Object not in prerequisite state; 55P03: lock_not_available
self._logger.debug('Another process/thread is already busy executing job `%s`, skipping it.', job['name'])
continue
else:
# Unexpected OperationalError
raise
finally:
if not acquired_lock:
# we're exiting due to an exception while acquiring the lot
task_cr.close()
# Got the lock on the job row, now spawn a thread to execute it in the transaction with the lock
task_thread = threading.Thread(target=self._run_job, name=job['name'], args=(task_cr, job, now))
# force non-daemon task threads (the runner thread must be daemon, and this property is inherited by default)
task_thread.setDaemon(False)
openerp.cron.take_thread_slot()
task_thread.start()
self._logger.debug('Cron execution thread for job `%s` spawned', job['name'])
cr.execute('select min(nextcall) as min_next_call from ir_cron where numbercall<>0 and active')
next_call = cr.dictfetchone()['min_next_call']
if next_call:
next_call = time.mktime(time.strptime(next_call, '%Y-%m-%d %H:%M:%S'))
# Find next earliest job ignoring currently processed jobs (by this and other cron threads)
find_next_time_query = """SELECT min(nextcall) AS min_next_call
FROM ir_cron WHERE numbercall != 0 AND active"""
if jobs:
cr.execute(find_next_time_query + " AND id NOT IN %s", (tuple(jobs.keys()),))
else:
next_call = int(time.time()) + 3600 # if do not find active cron job from database, it will run again after 1 day
cr.execute(find_next_time_query)
next_call = cr.dictfetchone()['min_next_call']
if not check:
self.setAlarm(self._poolJobs, next_call, db_name, db_name)
if next_call:
next_call = time.mktime(time.strptime(next_call, DEFAULT_SERVER_DATETIME_FORMAT))
else:
# no matching cron job found in database, re-schedule arbitrarily in 1 day,
# this delay will likely be modified when running jobs complete their tasks
next_call = time.time() + (24*3600)
openerp.cron.schedule_wakeup(next_call, db_name)
except Exception, ex:
self._logger.warning('Exception in cron:', exc_info=True)
@ -156,12 +266,8 @@ class ir_cron(osv.osv, netsvc.Agent):
cr.commit()
cr.close()
def restart(self, dbname):
self.cancel(dbname)
# Reschedule cron processing job asap, but not in the current thread
self.setAlarm(self._poolJobs, time.time(), dbname, dbname)
def update_running_cron(self, cr):
""" Schedule as soon as possible a wake-up for this database. """
# Verify whether the server is already started and thus whether we need to commit
# immediately our changes and restart the cron agent in order to apply the change
# immediately. The commit() is needed because as soon as the cron is (re)started it
@ -171,23 +277,37 @@ class ir_cron(osv.osv, netsvc.Agent):
# when the server is only starting or loading modules (hence the test on pool._init).
if not self.pool._init:
cr.commit()
self.restart(cr.dbname)
openerp.cron.schedule_wakeup(WAKE_UP_NOW, self.pool.db.dbname)
def _try_lock(self, cr, uid, ids, context=None):
"""Try to grab a dummy exclusive write-lock to the rows with the given ids,
to make sure a following write() or unlink() will not block due
to a process currently executing those cron tasks"""
try:
cr.execute("""SELECT id FROM "%s" WHERE id IN %%s FOR UPDATE NOWAIT""" % self._table,
(tuple(ids),), log_exceptions=False)
except psycopg2.OperationalError:
cr.rollback() # early rollback to allow translations to work for the user feedback
raise osv.except_osv(_("Record cannot be modified right now"),
_("This cron task is currently being executed and may not be modified, "
"please try again in a few minutes"))
def create(self, cr, uid, vals, context=None):
res = super(ir_cron, self).create(cr, uid, vals, context=context)
self.update_running_cron(cr)
return res
def write(self, cr, user, ids, vals, context=None):
res = super(ir_cron, self).write(cr, user, ids, vals, context=context)
def write(self, cr, uid, ids, vals, context=None):
self._try_lock(cr, uid, ids, context)
res = super(ir_cron, self).write(cr, uid, ids, vals, context=context)
self.update_running_cron(cr)
return res
def unlink(self, cr, uid, ids, context=None):
self._try_lock(cr, uid, ids, context)
res = super(ir_cron, self).unlink(cr, uid, ids, context=context)
self.update_running_cron(cr)
return res
ir_cron()
# vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4:

View File

@ -0,0 +1,27 @@
# -*- coding: utf-8 -*-
##############################################################################
#
# OpenERP, Open Source Management Solution
# Copyright (C) 2011-TODAY OpenERP S.A. <http://www.openerp.com>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
##############################################################################
# Useful for manual testing of cron jobs scheduling.
# This must be (un)commented with the corresponding yml file
# in ../__openerp__.py.
# import test_ir_cron
# vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4:

View File

@ -0,0 +1,116 @@
# -*- coding: utf-8 -*-
##############################################################################
#
# OpenERP, Open Source Management Solution
# Copyright (C) 2011-TODAY OpenERP S.A. <http://www.openerp.com>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
##############################################################################
import time
from datetime import datetime
from dateutil.relativedelta import relativedelta
import openerp
JOB = {
'function': u'_0_seconds',
'interval_type': u'minutes',
'user_id': 1,
'name': u'test',
'args': False,
'numbercall': 1,
'nextcall': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
'priority': 5,
'doall': True,
'active': True,
'interval_number': 1,
'model': u'ir.cron'
}
class test_ir_cron(openerp.osv.osv.osv):
""" Add a few handy methods to test cron jobs scheduling. """
_inherit = "ir.cron"
def _0_seconds(a, b, c):
print ">>> _0_seconds"
def _20_seconds(self, cr, uid):
print ">>> in _20_seconds"
time.sleep(20)
print ">>> out _20_seconds"
def _80_seconds(self, cr, uid):
print ">>> in _80_seconds"
time.sleep(80)
print ">>> out _80_seconds"
def test_0(self, cr, uid):
now = datetime.now()
t1 = (now + relativedelta(minutes=1)).strftime('%Y-%m-%d %H:%M:%S')
t2 = (now + relativedelta(minutes=1, seconds=5)).strftime('%Y-%m-%d %H:%M:%S')
t3 = (now + relativedelta(minutes=1, seconds=10)).strftime('%Y-%m-%d %H:%M:%S')
self.create(cr, uid, dict(JOB, name='test_0 _20_seconds A', function='_20_seconds', nextcall=t1))
self.create(cr, uid, dict(JOB, name='test_0 _20_seconds B', function='_20_seconds', nextcall=t2))
self.create(cr, uid, dict(JOB, name='test_0 _20_seconds C', function='_20_seconds', nextcall=t3))
def test_1(self, cr, uid):
now = datetime.now()
t1 = (now + relativedelta(minutes=1)).strftime('%Y-%m-%d %H:%M:%S')
self.create(cr, uid, dict(JOB, name='test_1 _20_seconds * 3', function='_20_seconds', nextcall=t1, numbercall=3))
def test_2(self, cr, uid):
now = datetime.now()
t1 = (now + relativedelta(minutes=1)).strftime('%Y-%m-%d %H:%M:%S')
self.create(cr, uid, dict(JOB, name='test_2 _80_seconds * 2', function='_80_seconds', nextcall=t1, numbercall=2))
def test_3(self, cr, uid):
now = datetime.now()
t1 = (now + relativedelta(minutes=1)).strftime('%Y-%m-%d %H:%M:%S')
t2 = (now + relativedelta(minutes=1, seconds=5)).strftime('%Y-%m-%d %H:%M:%S')
t3 = (now + relativedelta(minutes=1, seconds=10)).strftime('%Y-%m-%d %H:%M:%S')
self.create(cr, uid, dict(JOB, name='test_3 _80_seconds A', function='_80_seconds', nextcall=t1))
self.create(cr, uid, dict(JOB, name='test_3 _20_seconds B', function='_20_seconds', nextcall=t2))
self.create(cr, uid, dict(JOB, name='test_3 _20_seconds C', function='_20_seconds', nextcall=t3))
# This test assumes 4 cron threads.
def test_00(self, cr, uid):
self.test_00_set = set()
now = datetime.now()
t1 = (now + relativedelta(minutes=1)).strftime('%Y-%m-%d %H:%M:%S')
t2 = (now + relativedelta(minutes=1, seconds=5)).strftime('%Y-%m-%d %H:%M:%S')
t3 = (now + relativedelta(minutes=1, seconds=10)).strftime('%Y-%m-%d %H:%M:%S')
self.create(cr, uid, dict(JOB, name='test_00 _20_seconds_A', function='_20_seconds_A', nextcall=t1))
self.create(cr, uid, dict(JOB, name='test_00 _20_seconds_B', function='_20_seconds_B', nextcall=t2))
self.create(cr, uid, dict(JOB, name='test_00 _20_seconds_C', function='_20_seconds_C', nextcall=t3))
def _expect(self, cr, uid, to_add, to_sleep, to_expect_in, to_expect_out):
assert self.test_00_set == to_expect_in
self.test_00_set.add(to_add)
time.sleep(to_sleep)
self.test_00_set.discard(to_add)
assert self.test_00_set == to_expect_out
def _20_seconds_A(self, cr, uid):
self._expect(cr, uid, 'A', 20, set(), set(['B', 'C']))
def _20_seconds_B(self, cr, uid):
self._expect(cr, uid, 'B', 20, set('A'), set('C'))
def _20_seconds_C(self, cr, uid):
self._expect(cr, uid, 'C', 20, set(['A', 'B']), set())
# vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4:

View File

@ -0,0 +1,61 @@
-
Test the cron jobs scheduling.
-
Disable the existing cron jobs if any during the tests.
-
!python {model: ir.cron }: |
# For this test to work, as it involves multiple database cursors,
# we have to commit changes. But YAML tests must be rollbacked, so
# the final database state is left untouched. So we have to be a bit
# ugly here: use our own cursor, commit, and clean after ourselves.
# We also pass around some ids using setattr/delattr, and we have to
# rollback the previous tests otherwise we won't be able to touch the
# db.
# Well, this should probably be a standalone, or regular unit test,
# instead of using the YAML infrastructure.
cr.rollback()
our_cr = self.pool.db.cursor()
try:
ids = self.search(our_cr, uid, [], {})
setattr(self, 'saved_ids', ids)
self.write(our_cr, uid, ids, {'active': False}, {})
our_cr.commit()
finally:
our_cr.close()
-
Three concurrent jobs started with a slight time gap. Assume 4 cron threads.
This will take about 2 minutes.
-
!python {model: ir.cron }: |
# Pretend initialization is already done. We the use a try/finally
# to reset _init correctly.
self.pool._init = False
our_cr = self.pool.db.cursor()
try:
self.test_00(our_cr, uid) # this will commit using the passed cursor
import openerp.cron
openerp.cron._thread_slots = 4
# Wake up this db as soon as the master cron thread starts.
openerp.cron.schedule_wakeup(1, self.pool.db.dbname)
# Pretend to be the master thread, for 4 iterations.
openerp.cron.runner_body()
openerp.cron.runner_body()
openerp.cron.runner_body()
openerp.cron.runner_body()
finally:
self.pool._init = True
our_cr.close()
-
Clean after ourselves.
-
!python {model: ir.cron }: |
our_cr = self.pool.db.cursor()
try:
ids = [x for x in self.search(our_cr, uid, ['|', ('active', '=', True), ('active', '=', False)], {}) if x not in self.saved_ids]
self.unlink(our_cr, uid, ids, {})
ids = self.saved_ids
delattr(self, 'saved_ids')
self.write(our_cr, uid, ids, {'active': True}, {})
our_cr.commit()
finally:
our_cr.close()

View File

@ -35,6 +35,10 @@ must be used.
import deprecation
# Maximum number of threads processing concurrently cron jobs.
max_cron_threads = 4 # Actually the default value here is meaningless,
# look at tools.config for the default value.
# Paths to search for OpenERP addons.
addons_paths = []

212
openerp/cron.py Normal file
View File

@ -0,0 +1,212 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
##############################################################################
#
# OpenERP, Open Source Management Solution
# Copyright (C) 2004-2011 OpenERP SA (<http://www.openerp.com>)
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
##############################################################################
""" Cron jobs scheduling
Cron jobs are defined in the ir_cron table/model. This module deals with all
cron jobs, for all databases of a single OpenERP server instance.
It defines a single master thread that will spawn (a bounded number of)
threads to process individual cron jobs.
The thread runs forever, checking every 60 seconds for new
'database wake-ups'. It maintains a heapq of database wake-ups. At each
wake-up, it will call ir_cron._run_jobs_multithread() for the given database. _run_jobs_multithread
will check the jobs defined in the ir_cron table and spawn accordingly threads
to process them.
This module's behavior depends on the following configuration variable:
openerp.conf.max_cron_threads.
"""
import heapq
import logging
import threading
import time
import openerp
import tools
# Heapq of database wake-ups. Note that 'database wake-up' meaning is in
# the context of the cron management. This is not originally about loading
# a database, although having the database name in the queue will
# cause it to be loaded when the schedule time is reached, even if it was
# unloaded in the mean time. Normally a database's wake-up is cancelled by
# the RegistryManager when the database is unloaded - so this should not
# cause it to be reloaded.
#
# TODO: perhaps in the future we could consider a flag on ir.cron jobs
# that would cause database wake-up even if the database has not been
# loaded yet or was already unloaded (e.g. 'force_db_wakeup' or something)
#
# Each element is a triple (timestamp, database-name, boolean). The boolean
# specifies if the wake-up is canceled (so a wake-up can be canceled without
# relying on the heapq implementation detail; no need to remove the job from
# the heapq).
_wakeups = []
# Mapping of database names to the wake-up defined in the heapq,
# so that we can cancel the wake-up without messing with the heapq
# invariant: lookup the wake-up by database-name, then set
# its third element to True.
_wakeup_by_db = {}
# Re-entrant lock to protect the above _wakeups and _wakeup_by_db variables.
# We could use a simple (non-reentrant) lock if the runner function below
# was more fine-grained, but we are fine with the loop owning the lock
# while spawning a few threads.
_wakeups_lock = threading.RLock()
# Maximum number of threads allowed to process cron jobs concurrently. This
# variable is set by start_master_thread using openerp.conf.max_cron_threads.
_thread_slots = None
# A (non re-entrant) lock to protect the above _thread_slots variable.
_thread_slots_lock = threading.Lock()
_logger = logging.getLogger('cron')
# Sleep duration limits - must not loop too quickly, but can't sleep too long
# either, because a new job might be inserted in ir_cron with a much sooner
# execution date than current known ones. We won't see it until we wake!
MAX_SLEEP = 60 # 1 min
MIN_SLEEP = 1 # 1 sec
# Dummy wake-up timestamp that can be used to force a database wake-up asap
WAKE_UP_NOW = 1
def get_thread_slots():
""" Return the number of available thread slots """
return _thread_slots
def release_thread_slot():
""" Increment the number of available thread slots """
global _thread_slots
with _thread_slots_lock:
_thread_slots += 1
def take_thread_slot():
""" Decrement the number of available thread slots """
global _thread_slots
with _thread_slots_lock:
_thread_slots -= 1
def cancel(db_name):
""" Cancel the next wake-up of a given database, if any.
:param db_name: database name for which the wake-up is canceled.
"""
_logger.debug("Cancel next wake-up for database '%s'.", db_name)
with _wakeups_lock:
if db_name in _wakeup_by_db:
_wakeup_by_db[db_name][2] = True
def cancel_all():
""" Cancel all database wake-ups. """
_logger.debug("Cancel all database wake-ups")
global _wakeups
global _wakeup_by_db
with _wakeups_lock:
_wakeups = []
_wakeup_by_db = {}
def schedule_wakeup(timestamp, db_name):
""" Schedule a new wake-up for a database.
If an earlier wake-up is already defined, the new wake-up is discarded.
If another wake-up is defined, that wake-up is discarded and the new one
is scheduled.
:param db_name: database name for which a new wake-up is scheduled.
:param timestamp: when the wake-up is scheduled.
"""
if not timestamp:
return
with _wakeups_lock:
if db_name in _wakeup_by_db:
task = _wakeup_by_db[db_name]
if not task[2] and timestamp > task[0]:
# existing wakeup is valid and occurs earlier than new one
return
task[2] = True # cancel existing task
task = [timestamp, db_name, False]
heapq.heappush(_wakeups, task)
_wakeup_by_db[db_name] = task
_logger.debug("Wake-up scheduled for database '%s' @ %s", db_name,
'NOW' if timestamp == WAKE_UP_NOW else timestamp)
def runner():
"""Neverending function (intended to be run in a dedicated thread) that
checks every 60 seconds the next database wake-up. TODO: make configurable
"""
while True:
runner_body()
def runner_body():
with _wakeups_lock:
while _wakeups and _wakeups[0][0] < time.time() and get_thread_slots():
task = heapq.heappop(_wakeups)
timestamp, db_name, canceled = task
if canceled:
continue
del _wakeup_by_db[db_name]
registry = openerp.pooler.get_pool(db_name)
if not registry._init:
_logger.debug("Database '%s' wake-up! Firing multi-threaded cron job processing", db_name)
registry['ir.cron']._run_jobs_multithread()
amount = MAX_SLEEP
with _wakeups_lock:
# Sleep less than MAX_SLEEP if the next known wake-up will happen before that.
if _wakeups and get_thread_slots():
amount = min(MAX_SLEEP, max(MIN_SLEEP, _wakeups[0][0] - time.time()))
_logger.debug("Going to sleep for %ss", amount)
time.sleep(amount)
def start_master_thread():
""" Start the above runner function in a daemon thread.
The thread is a typical daemon thread: it will never quit and must be
terminated when the main process exits - with no consequence (the processing
threads it spawns are not marked daemon).
"""
global _thread_slots
_thread_slots = openerp.conf.max_cron_threads
db_maxconn = tools.config['db_maxconn']
if _thread_slots >= tools.config.get('db_maxconn', 64):
_logger.warning("Connection pool size (%s) is set lower than max number of cron threads (%s), "
"this may cause trouble if you reach that number of parallel cron tasks.",
db_maxconn, _thread_slots)
t = threading.Thread(target=runner, name="openerp.cron.master_thread")
t.setDaemon(True)
t.start()
_logger.debug("Master cron daemon started!")
# vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4:

View File

@ -28,6 +28,8 @@ import logging
import openerp.sql_db
import openerp.osv.orm
import openerp.cron
import openerp.tools
import openerp.modules.db
import openerp.tools.config
@ -96,9 +98,17 @@ class Registry(object):
return res
def schedule_cron_jobs(self):
""" Make the cron thread care about this registry/database jobs.
This will initiate the cron thread to check for any pending jobs for
this registry/database as soon as possible. Then it will continuously
monitor the ir.cron model for future jobs. See openerp.cron for
details.
"""
openerp.cron.schedule_wakeup(openerp.cron.WAKE_UP_NOW, self.db.dbname)
def clear_caches(self):
""" Clear the caches
This clears the caches associated to methods decorated with
``tools.ormcache`` or ``tools.ormcache_multi`` for all the models.
"""
@ -112,13 +122,11 @@ class RegistryManager(object):
registries (essentially database connection/model registry pairs).
"""
# Mapping between db name and model registry.
# Accessed through the methods below.
registries = {}
registries_lock = threading.RLock()
@classmethod
def get(cls, db_name, force_demo=False, status=None, update_module=False,
pooljobs=True):
@ -131,7 +139,6 @@ class RegistryManager(object):
update_module, pooljobs)
return registry
@classmethod
def new(cls, db_name, force_demo=False, status=None,
update_module=False, pooljobs=True):
@ -166,22 +173,38 @@ class RegistryManager(object):
cr.close()
if pooljobs:
registry.get('ir.cron').restart(registry.db.dbname)
registry.schedule_cron_jobs()
return registry
@classmethod
def delete(cls, db_name):
""" Delete the registry linked to a given database. """
"""Delete the registry linked to a given database.
This also cleans the associated caches. For good measure this also
cancels the associated cron job. But please note that the cron job can
be running and take some time before ending, and that you should not
remove a registry if it can still be used by some thread. So it might
be necessary to call yourself openerp.cron.Agent.cancel(db_name) and
and join (i.e. wait for) the thread.
"""
with cls.registries_lock:
if db_name in cls.registries:
cls.registries[db_name].clear_caches()
del cls.registries[db_name]
openerp.cron.cancel(db_name)
@classmethod
def delete_all(cls):
"""Delete all the registries. """
with cls.registries_lock:
for db_name in cls.registries.keys():
cls.delete(db_name)
@classmethod
def clear_caches(cls, db_name):
""" Clear the caches
"""Clear caches
This clears the caches associated to methods decorated with
``tools.ormcache`` or ``tools.ormcache_multi`` for all the models
@ -195,4 +218,4 @@ class RegistryManager(object):
cls.registries[db_name].clear_caches()
# vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4:
# vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4:

View File

@ -21,7 +21,6 @@
##############################################################################
import errno
import heapq
import logging
import logging.handlers
import os
@ -31,13 +30,14 @@ import socket
import sys
import threading
import time
import traceback
import types
from pprint import pformat
# TODO modules that import netsvc only for things from loglevels must be changed to use loglevels.
from loglevels import *
import tools
import openerp.exceptions
import openerp
def close_socket(sock):
""" Closes a socket instance cleanly
@ -211,83 +211,6 @@ def init_alternative_logger():
logger.addHandler(handler)
logger.setLevel(logging.ERROR)
class Agent(object):
""" Singleton that keeps track of cancellable tasks to run at a given
timestamp.
The tasks are characterised by:
* a timestamp
* the database on which the task run
* the function to call
* the arguments and keyword arguments to pass to the function
Implementation details:
- Tasks are stored as list, allowing the cancellation by setting
the timestamp to 0.
- A heapq is used to store tasks, so we don't need to sort
tasks ourself.
"""
__tasks = []
__tasks_by_db = {}
_logger = logging.getLogger('netsvc.agent')
@classmethod
def setAlarm(cls, function, timestamp, db_name, *args, **kwargs):
task = [timestamp, db_name, function, args, kwargs]
heapq.heappush(cls.__tasks, task)
cls.__tasks_by_db.setdefault(db_name, []).append(task)
@classmethod
def cancel(cls, db_name):
"""Cancel all tasks for a given database. If None is passed, all tasks are cancelled"""
cls._logger.debug("Cancel timers for %s db", db_name or 'all')
if db_name is None:
cls.__tasks, cls.__tasks_by_db = [], {}
else:
if db_name in cls.__tasks_by_db:
for task in cls.__tasks_by_db[db_name]:
task[0] = 0
@classmethod
def quit(cls):
cls.cancel(None)
@classmethod
def runner(cls):
"""Neverending function (intended to be ran in a dedicated thread) that
checks every 60 seconds tasks to run. TODO: make configurable
"""
current_thread = threading.currentThread()
while True:
while cls.__tasks and cls.__tasks[0][0] < time.time():
task = heapq.heappop(cls.__tasks)
timestamp, dbname, function, args, kwargs = task
cls.__tasks_by_db[dbname].remove(task)
if not timestamp:
# null timestamp -> cancelled task
continue
current_thread.dbname = dbname # hack hack
cls._logger.debug("Run %s.%s(*%s, **%s)", function.im_class.__name__, function.func_name, args, kwargs)
delattr(current_thread, 'dbname')
task_thread = threading.Thread(target=function, name='netsvc.Agent.task', args=args, kwargs=kwargs)
# force non-daemon task threads (the runner thread must be daemon, and this property is inherited by default)
task_thread.setDaemon(False)
task_thread.start()
time.sleep(1)
time.sleep(60)
def start_agent():
agent_runner = threading.Thread(target=Agent.runner, name="netsvc.Agent.runner")
# the agent runner is a typical daemon thread, that will never quit and must be
# terminated when the main process exits - with no consequence (the processing
# threads it spawns are not marked daemon)
agent_runner.setDaemon(True)
agent_runner.start()
import traceback
class Server:
""" Generic interface for all servers with an event loop etc.
Override this to impement http, net-rpc etc. servers.

View File

@ -34,11 +34,6 @@ def get_db_and_pool(db_name, force_demo=False, status=None, update_module=False,
return registry.db, registry
def delete_pool(db_name):
"""Delete an existing registry."""
RegistryManager.delete(db_name)
def restart_pool(db_name, force_demo=False, status=None, update_module=False):
"""Delete an existing registry and return a database connection and a newly initialized registry."""
registry = RegistryManager.new(db_name, force_demo, status, update_module, True)

View File

@ -28,6 +28,8 @@ import netrpc_server
import web_services
import websrv_lib
import openerp.cron
import openerp.modules
import openerp.netsvc
import openerp.osv
import openerp.tools
@ -61,7 +63,7 @@ def start_services():
netrpc_server.init_servers()
# Start the main cron thread.
openerp.netsvc.start_agent()
openerp.cron.start_master_thread()
# Start the top-level servers threads (normally HTTP, HTTPS, and NETRPC).
openerp.netsvc.Server.startAll()
@ -72,7 +74,9 @@ def start_services():
def stop_services():
""" Stop all services. """
openerp.netsvc.Agent.quit()
# stop scheduling new jobs; we will have to wait for the jobs to complete below
openerp.cron.cancel_all()
openerp.netsvc.Server.quitAll()
openerp.wsgi.stop_server()
config = openerp.tools.config
@ -92,6 +96,8 @@ def stop_services():
thread.join(0.05)
time.sleep(0.05)
openerp.modules.registry.RegistryManager.delete_all()
# vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4:

View File

@ -167,9 +167,8 @@ class db(netsvc.ExportService):
raise Exception, e
def exp_drop(self, db_name):
openerp.modules.registry.RegistryManager.delete(db_name)
sql_db.close_db(db_name)
openerp.modules.registry.RegistryManager.clear_caches(db_name)
openerp.netsvc.Agent.cancel(db_name)
logger = netsvc.Logger()
db = sql_db.db_connect('template1')
@ -271,9 +270,8 @@ class db(netsvc.ExportService):
return True
def exp_rename(self, old_name, new_name):
openerp.modules.registry.RegistryManager.delete(old_name)
sql_db.close_db(old_name)
openerp.modules.registry.RegistryManager.clear_caches(old_name)
openerp.netsvc.Agent.cancel(old_name)
logger = netsvc.Logger()
db = sql_db.db_connect('template1')

View File

@ -214,11 +214,11 @@ class Cursor(object):
params = params or None
res = self._obj.execute(query, params)
except psycopg2.ProgrammingError, pe:
if self._default_log_exceptions or log_exceptions:
if (self._default_log_exceptions if log_exceptions is None else log_exceptions):
self.__logger.error("Programming error: %s, in query %s", pe, query)
raise
except Exception:
if self._default_log_exceptions or log_exceptions:
if (self._default_log_exceptions if log_exceptions is None else log_exceptions):
self.__logger.exception("bad query: %s", self._obj.query or query)
raise
@ -504,7 +504,7 @@ def db_connect(db_name):
return Connection(_Pool, db_name)
def close_db(db_name):
""" You might want to call openerp.netsvc.Agent.cancel(db_name) along this function."""
""" You might want to call openerp.modules.registry.RegistryManager.delete(db_name) along this function."""
_Pool.close_all(dsn(db_name))
ct = currentThread()
if hasattr(ct, 'dbname'):

View File

@ -261,8 +261,12 @@ class configmanager(object):
"osv_memory tables. This is a decimal value expressed in hours, "
"and the default is 1 hour.",
type="float")
group.add_option("--max-cron-threads", dest="max_cron_threads", my_default=4,
help="Maximum number of threads processing concurrently cron jobs.",
type="int")
group.add_option("--unaccent", dest="unaccent", my_default=False, action="store_true",
help="Use the unaccent function provided by the database when available.")
parser.add_option_group(group)
# Copy all optparse options (i.e. MyOption) into self.options.
@ -365,7 +369,7 @@ class configmanager(object):
'stop_after_init', 'logrotate', 'without_demo', 'netrpc', 'xmlrpc', 'syslog',
'list_db', 'xmlrpcs',
'test_file', 'test_disable', 'test_commit', 'test_report_directory',
'osv_memory_count_limit', 'osv_memory_age_limit', 'unaccent',
'osv_memory_count_limit', 'osv_memory_age_limit', 'max_cron_threads', 'unaccent',
]
for arg in keys:
@ -447,6 +451,8 @@ class configmanager(object):
if opt.save:
self.save()
openerp.conf.max_cron_threads = self.options['max_cron_threads']
openerp.conf.addons_paths = self.options['addons_path'].split(',')
openerp.conf.server_wide_modules = \
map(lambda m: m.strip(), opt.server_wide_modules.split(',')) if \