diff --git a/openerp-server b/openerp-server index 60eccd777bc..52ad49104b1 100755 --- a/openerp-server +++ b/openerp-server @@ -89,15 +89,17 @@ def setup_pid_file(): def preload_registry(dbname): """ Preload a registry, and start the cron.""" try: - db, pool = openerp.pooler.get_db_and_pool(dbname, update_module=config['init'] or config['update'], pooljobs=False) - pool.get('ir.cron').restart(db.dbname) + db, registry = openerp.pooler.get_db_and_pool(dbname, update_module=config['init'] or config['update'], pooljobs=False) + + # jobs will start to be processed later, when openerp.cron.start_master_thread() is called by openerp.service.start_services() + registry.schedule_cron_jobs() except Exception: logging.exception('Failed to initialize database `%s`.', dbname) def run_test_file(dbname, test_file): """ Preload a registry, possibly run a test file, and start the cron.""" try: - db, pool = openerp.pooler.get_db_and_pool(dbname, update_module=config['init'] or config['update'], pooljobs=False) + db, registry = openerp.pooler.get_db_and_pool(dbname, update_module=config['init'] or config['update'], pooljobs=False) cr = db.cursor() logger = logging.getLogger('server') logger.info('loading test file %s', test_file) diff --git a/openerp/addons/base/__init__.py b/openerp/addons/base/__init__.py index 847bef71f8c..16a61521fb4 100644 --- a/openerp/addons/base/__init__.py +++ b/openerp/addons/base/__init__.py @@ -24,6 +24,7 @@ import module import res import publisher_warranty import report +import test # vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4: diff --git a/openerp/addons/base/__openerp__.py b/openerp/addons/base/__openerp__.py index 474cb384049..e436a0a6005 100644 --- a/openerp/addons/base/__openerp__.py +++ b/openerp/addons/base/__openerp__.py @@ -92,6 +92,10 @@ 'test/test_osv_expression.yml', 'test/test_ir_rule.yml', # <-- These tests modify/add/delete ir_rules. 'test/test_ir_values.yml', + # Commented because this takes some time. 
+ # This must be (un)commented with the corresponding import statement + # in test/__init__.py. + # 'test/test_ir_cron.yml', # <-- These tests perform a rollback. ], 'installable': True, 'active': True, diff --git a/openerp/addons/base/ir/ir_cron.py b/openerp/addons/base/ir/ir_cron.py index 054a15257ef..af46b0ef3e0 100644 --- a/openerp/addons/base/ir/ir_cron.py +++ b/openerp/addons/base/ir/ir_cron.py @@ -21,13 +21,20 @@ import time import logging +import threading +import psycopg2 from datetime import datetime from dateutil.relativedelta import relativedelta + import netsvc -import tools -from tools.safe_eval import safe_eval as eval +import openerp import pooler +import tools +from openerp.cron import WAKE_UP_NOW from osv import fields, osv +from tools import DEFAULT_SERVER_DATETIME_FORMAT +from tools.safe_eval import safe_eval as eval +from tools.translate import _ def str2tuple(s): return eval('tuple(%s)' % (s or '')) @@ -41,10 +48,15 @@ _intervalTypes = { 'minutes': lambda interval: relativedelta(minutes=interval), } -class ir_cron(osv.osv, netsvc.Agent): - """ This is the ORM object that periodically executes actions. - Note that we use the netsvc.Agent()._logger member. +class ir_cron(osv.osv): + """ Model describing cron jobs (also called actions or tasks). """ + + # TODO: perhaps in the future we could consider a flag on ir.cron jobs + # that would cause database wake-up even if the database has not been + # loaded yet or was already unloaded (e.g.
'force_db_wakeup' or something) + # See also openerp.cron + _name = "ir.cron" _order = 'name' _columns = { @@ -54,17 +66,17 @@ class ir_cron(osv.osv, netsvc.Agent): 'interval_number': fields.integer('Interval Number',help="Repeat every x."), 'interval_type': fields.selection( [('minutes', 'Minutes'), ('hours', 'Hours'), ('work_days','Work Days'), ('days', 'Days'),('weeks', 'Weeks'), ('months', 'Months')], 'Interval Unit'), - 'numbercall': fields.integer('Number of Calls', help='Number of time the function is called,\na negative number indicates no limit'), - 'doall' : fields.boolean('Repeat Missed', help="Enable this if you want to execute missed occurences as soon as the server restarts."), - 'nextcall' : fields.datetime('Next Execution Date', required=True, help="Next planned execution date for this scheduler"), - 'model': fields.char('Object', size=64, help="Name of object whose function will be called when this scheduler will run. e.g. 'res.partener'"), - 'function': fields.char('Function', size=64, help="Name of the method to be called on the object when this scheduler is executed."), - 'args': fields.text('Arguments', help="Arguments to be passed to the method. e.g. (uid,)"), - 'priority': fields.integer('Priority', help='0=Very Urgent\n10=Not urgent') + 'numbercall': fields.integer('Number of Calls', help='How many times the method is called,\na negative number indicates no limit.'), + 'doall' : fields.boolean('Repeat Missed', help="Specify if missed occurrences should be executed when the server restarts."), + 'nextcall' : fields.datetime('Next Execution Date', required=True, help="Next planned execution date for this job."), + 'model': fields.char('Object', size=64, help="Model name on which the method to be called is located, e.g. 'res.partner'."), + 'function': fields.char('Method', size=64, help="Name of the method to be called when this job is processed."), + 'args': fields.text('Arguments', help="Arguments to be passed to the method, e.g. 
(uid,)."), + 'priority': fields.integer('Priority', help='The priority of the job, as an integer: 0 means higher priority, 10 means lower priority.') } _defaults = { - 'nextcall' : lambda *a: time.strftime('%Y-%m-%d %H:%M:%S'), + 'nextcall' : lambda *a: time.strftime(DEFAULT_SERVER_DATETIME_FORMAT), 'priority' : lambda *a: 5, 'user_id' : lambda obj,cr,uid,context: uid, 'interval_number' : lambda *a: 1, @@ -74,6 +86,8 @@ class ir_cron(osv.osv, netsvc.Agent): 'doall' : lambda *a: 1 } + _logger = logging.getLogger('cron') + def _check_args(self, cr, uid, ids, context=None): try: for this in self.browse(cr, uid, ids, context): @@ -86,68 +100,164 @@ class ir_cron(osv.osv, netsvc.Agent): (_check_args, 'Invalid arguments', ['args']), ] - def _handle_callback_exception(self, cr, uid, model, func, args, job_id, job_exception): - cr.rollback() - logger=logging.getLogger('cron') - logger.exception("Call of self.pool.get('%s').%s(cr, uid, *%r) failed in Job %s" % (model, func, args, job_id)) + def _handle_callback_exception(self, cr, uid, model_name, method_name, args, job_id, job_exception): + """ Method called when an exception is raised by a job. - def _callback(self, cr, uid, model, func, args, job_id): + Simply logs the exception and rollback the transaction. + + :param model_name: model name on which the job method is located. + :param method_name: name of the method to call when this job is processed. + :param args: arguments of the method (without the usual self, cr, uid). + :param job_id: job id. + :param job_exception: exception raised by the job. + + """ + cr.rollback() + self._logger.exception("Call of self.pool.get('%s').%s(cr, uid, *%r) failed in Job %s" % (model_name, method_name, args, job_id)) + + def _callback(self, cr, uid, model_name, method_name, args, job_id): + """ Run the method associated to a given job + + It takes care of logging and exception handling. + + :param model_name: model name on which the job method is located. 
+ :param method_name: name of the method to call when this job is processed. + :param args: arguments of the method (without the usual self, cr, uid). + :param job_id: job id. + """ args = str2tuple(args) - m = self.pool.get(model) - if m and hasattr(m, func): - f = getattr(m, func) + model = self.pool.get(model_name) + if model and hasattr(model, method_name): + method = getattr(model, method_name) try: - netsvc.log('cron', (cr.dbname,uid,'*',model,func)+tuple(args), channel=logging.DEBUG, + netsvc.log('cron', (cr.dbname,uid,'*',model_name,method_name)+tuple(args), channel=logging.DEBUG, depth=(None if self._logger.isEnabledFor(logging.DEBUG_RPC_ANSWER) else 1), fn='object.execute') logger = logging.getLogger('execution time') if logger.isEnabledFor(logging.DEBUG): start_time = time.time() - f(cr, uid, *args) + method(cr, uid, *args) if logger.isEnabledFor(logging.DEBUG): end_time = time.time() - logger.log(logging.DEBUG, '%.3fs (%s, %s)' % (end_time - start_time, model, func)) + logger.log(logging.DEBUG, '%.3fs (%s, %s)' % (end_time - start_time, model_name, method_name)) except Exception, e: - self._handle_callback_exception(cr, uid, model, func, args, job_id, e) + self._handle_callback_exception(cr, uid, model_name, method_name, args, job_id, e) - def _poolJobs(self, db_name, check=False): + def _run_job(self, cr, job, now): + """ Run a given job taking care of the repetition. + + The cursor has a lock on the job (acquired by _run_jobs_multithread()) and this + method is run in a worker thread (spawned by _run_jobs_multithread()). + + :param job: job to be run (as a dictionary). + :param now: timestamp (result of datetime.now(), no need to call it multiple times).
+ + """ try: - db, pool = pooler.get_db_and_pool(db_name) - except: - return False + nextcall = datetime.strptime(job['nextcall'], DEFAULT_SERVER_DATETIME_FORMAT) + numbercall = job['numbercall'] + + ok = False + while nextcall < now and numbercall: + if numbercall > 0: + numbercall -= 1 + if not ok or job['doall']: + self._callback(cr, job['user_id'], job['model'], job['function'], job['args'], job['id']) + if numbercall: + nextcall += _intervalTypes[job['interval_type']](job['interval_number']) + ok = True + addsql = '' + if not numbercall: + addsql = ', active=False' + cr.execute("UPDATE ir_cron SET nextcall=%s, numbercall=%s"+addsql+" WHERE id=%s", + (nextcall.strftime(DEFAULT_SERVER_DATETIME_FORMAT), numbercall, job['id'])) + + if numbercall: + # Reschedule our own main cron thread if necessary. + # This is really needed if this job runs longer than its rescheduling period. + nextcall = time.mktime(nextcall.timetuple()) + openerp.cron.schedule_wakeup(nextcall, cr.dbname) + finally: + cr.commit() + cr.close() + openerp.cron.release_thread_slot() + + def _run_jobs_multithread(self): + # TODO remove 'check' argument from addons/base_action_rule/base_action_rule.py + """ Process the cron jobs by spawning worker threads. + + This selects in database all the jobs that should be processed. It then + tries to lock each of them and, if it succeeds, spawns a thread to run + the cron job (if it doesn't succeed, it means the job was already + locked to be taken care of by another thread). + + The cursor used to lock the job in database is given to the worker + thread (which has to close it itself). 
+ + """ + db = self.pool.db cr = db.cursor() + db_name = db.dbname try: - if not pool._init: - now = datetime.now() - cr.execute('select * from ir_cron where numbercall<>0 and active and nextcall<=now() order by priority') - for job in cr.dictfetchall(): - nextcall = datetime.strptime(job['nextcall'], '%Y-%m-%d %H:%M:%S') - numbercall = job['numbercall'] + jobs = {} # mapping job ids to jobs for all jobs being processed. + now = datetime.now() + # Careful to compare timestamps with 'UTC' - everything is UTC as of v6.1. + cr.execute("""SELECT * FROM ir_cron + WHERE numbercall != 0 + AND active AND nextcall <= (now() at time zone 'UTC') + ORDER BY priority""") + for job in cr.dictfetchall(): + if not openerp.cron.get_thread_slots(): + break + jobs[job['id']] = job - ok = False - while nextcall < now and numbercall: - if numbercall > 0: - numbercall -= 1 - if not ok or job['doall']: - self._callback(cr, job['user_id'], job['model'], job['function'], job['args'], job['id']) - if numbercall: - nextcall += _intervalTypes[job['interval_type']](job['interval_number']) - ok = True - addsql = '' - if not numbercall: - addsql = ', active=False' - cr.execute("update ir_cron set nextcall=%s, numbercall=%s"+addsql+" where id=%s", (nextcall.strftime('%Y-%m-%d %H:%M:%S'), numbercall, job['id'])) - cr.commit() + task_cr = db.cursor() + try: + # Try to grab an exclusive lock on the job row from within the task transaction + acquired_lock = False + task_cr.execute("""SELECT * + FROM ir_cron + WHERE id=%s + FOR UPDATE NOWAIT""", + (job['id'],), log_exceptions=False) + acquired_lock = True + except psycopg2.OperationalError, e: + if e.pgcode == '55P03': + # Class 55: Object not in prerequisite state; 55P03: lock_not_available + self._logger.debug('Another process/thread is already busy executing job `%s`, skipping it.', job['name']) + continue + else: + # Unexpected OperationalError + raise + finally: + if not acquired_lock: + # we're exiting due to an exception while acquiring the lock
+ task_cr.close() + # Got the lock on the job row, now spawn a thread to execute it in the transaction with the lock + task_thread = threading.Thread(target=self._run_job, name=job['name'], args=(task_cr, job, now)) + # force non-daemon task threads (the runner thread must be daemon, and this property is inherited by default) + task_thread.setDaemon(False) + openerp.cron.take_thread_slot() + task_thread.start() + self._logger.debug('Cron execution thread for job `%s` spawned', job['name']) - cr.execute('select min(nextcall) as min_next_call from ir_cron where numbercall<>0 and active') - next_call = cr.dictfetchone()['min_next_call'] - if next_call: - next_call = time.mktime(time.strptime(next_call, '%Y-%m-%d %H:%M:%S')) + # Find next earliest job ignoring currently processed jobs (by this and other cron threads) + find_next_time_query = """SELECT min(nextcall) AS min_next_call + FROM ir_cron WHERE numbercall != 0 AND active""" + if jobs: + cr.execute(find_next_time_query + " AND id NOT IN %s", (tuple(jobs.keys()),)) else: - next_call = int(time.time()) + 3600 # if do not find active cron job from database, it will run again after 1 day + cr.execute(find_next_time_query) + next_call = cr.dictfetchone()['min_next_call'] - if not check: - self.setAlarm(self._poolJobs, next_call, db_name, db_name) + if next_call: + next_call = time.mktime(time.strptime(next_call, DEFAULT_SERVER_DATETIME_FORMAT)) + else: + # no matching cron job found in database, re-schedule arbitrarily in 1 day, + # this delay will likely be modified when running jobs complete their tasks + next_call = time.time() + (24*3600) + + openerp.cron.schedule_wakeup(next_call, db_name) except Exception, ex: self._logger.warning('Exception in cron:', exc_info=True) @@ -156,12 +266,8 @@ class ir_cron(osv.osv, netsvc.Agent): cr.commit() cr.close() - def restart(self, dbname): - self.cancel(dbname) - # Reschedule cron processing job asap, but not in the current thread - self.setAlarm(self._poolJobs, time.time(), 
dbname, dbname) - def update_running_cron(self, cr): + """ Schedule as soon as possible a wake-up for this database. """ # Verify whether the server is already started and thus whether we need to commit # immediately our changes and restart the cron agent in order to apply the change # immediately. The commit() is needed because as soon as the cron is (re)started it @@ -171,23 +277,37 @@ class ir_cron(osv.osv, netsvc.Agent): # when the server is only starting or loading modules (hence the test on pool._init). if not self.pool._init: cr.commit() - self.restart(cr.dbname) + openerp.cron.schedule_wakeup(WAKE_UP_NOW, self.pool.db.dbname) + + def _try_lock(self, cr, uid, ids, context=None): + """Try to grab a dummy exclusive write-lock to the rows with the given ids, + to make sure a following write() or unlink() will not block due + to a process currently executing those cron tasks""" + try: + cr.execute("""SELECT id FROM "%s" WHERE id IN %%s FOR UPDATE NOWAIT""" % self._table, + (tuple(ids),), log_exceptions=False) + except psycopg2.OperationalError: + cr.rollback() # early rollback to allow translations to work for the user feedback + raise osv.except_osv(_("Record cannot be modified right now"), + _("This cron task is currently being executed and may not be modified, " + "please try again in a few minutes")) def create(self, cr, uid, vals, context=None): res = super(ir_cron, self).create(cr, uid, vals, context=context) self.update_running_cron(cr) return res - def write(self, cr, user, ids, vals, context=None): - res = super(ir_cron, self).write(cr, user, ids, vals, context=context) + def write(self, cr, uid, ids, vals, context=None): + self._try_lock(cr, uid, ids, context) + res = super(ir_cron, self).write(cr, uid, ids, vals, context=context) self.update_running_cron(cr) return res def unlink(self, cr, uid, ids, context=None): + self._try_lock(cr, uid, ids, context) res = super(ir_cron, self).unlink(cr, uid, ids, context=context) self.update_running_cron(cr) 
return res ir_cron() # vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4: - diff --git a/openerp/addons/base/test/__init__.py b/openerp/addons/base/test/__init__.py new file mode 100644 index 00000000000..c0cc8f7cb78 --- /dev/null +++ b/openerp/addons/base/test/__init__.py @@ -0,0 +1,27 @@ +# -*- coding: utf-8 -*- +############################################################################## +# +# OpenERP, Open Source Management Solution +# Copyright (C) 2011-TODAY OpenERP S.A. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as +# published by the Free Software Foundation, either version 3 of the +# License, or (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . +# +############################################################################## + +# Useful for manual testing of cron jobs scheduling. +# This must be (un)commented with the corresponding yml file +# in ../__openerp__.py. +# import test_ir_cron + +# vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4: diff --git a/openerp/addons/base/test/test_ir_cron.py b/openerp/addons/base/test/test_ir_cron.py new file mode 100644 index 00000000000..6ef79ac3a78 --- /dev/null +++ b/openerp/addons/base/test/test_ir_cron.py @@ -0,0 +1,116 @@ +# -*- coding: utf-8 -*- +############################################################################## +# +# OpenERP, Open Source Management Solution +# Copyright (C) 2011-TODAY OpenERP S.A. 
+# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as +# published by the Free Software Foundation, either version 3 of the +# License, or (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . +# +############################################################################## + +import time +from datetime import datetime +from dateutil.relativedelta import relativedelta + +import openerp + +JOB = { + 'function': u'_0_seconds', + 'interval_type': u'minutes', + 'user_id': 1, + 'name': u'test', + 'args': False, + 'numbercall': 1, + 'nextcall': datetime.now().strftime('%Y-%m-%d %H:%M:%S'), + 'priority': 5, + 'doall': True, + 'active': True, + 'interval_number': 1, + 'model': u'ir.cron' +} + +class test_ir_cron(openerp.osv.osv.osv): + """ Add a few handy methods to test cron jobs scheduling. 
""" + _inherit = "ir.cron" + + def _0_seconds(a, b, c): + print ">>> _0_seconds" + + def _20_seconds(self, cr, uid): + print ">>> in _20_seconds" + time.sleep(20) + print ">>> out _20_seconds" + + def _80_seconds(self, cr, uid): + print ">>> in _80_seconds" + time.sleep(80) + print ">>> out _80_seconds" + + def test_0(self, cr, uid): + now = datetime.now() + t1 = (now + relativedelta(minutes=1)).strftime('%Y-%m-%d %H:%M:%S') + t2 = (now + relativedelta(minutes=1, seconds=5)).strftime('%Y-%m-%d %H:%M:%S') + t3 = (now + relativedelta(minutes=1, seconds=10)).strftime('%Y-%m-%d %H:%M:%S') + self.create(cr, uid, dict(JOB, name='test_0 _20_seconds A', function='_20_seconds', nextcall=t1)) + self.create(cr, uid, dict(JOB, name='test_0 _20_seconds B', function='_20_seconds', nextcall=t2)) + self.create(cr, uid, dict(JOB, name='test_0 _20_seconds C', function='_20_seconds', nextcall=t3)) + + def test_1(self, cr, uid): + now = datetime.now() + t1 = (now + relativedelta(minutes=1)).strftime('%Y-%m-%d %H:%M:%S') + self.create(cr, uid, dict(JOB, name='test_1 _20_seconds * 3', function='_20_seconds', nextcall=t1, numbercall=3)) + + def test_2(self, cr, uid): + now = datetime.now() + t1 = (now + relativedelta(minutes=1)).strftime('%Y-%m-%d %H:%M:%S') + self.create(cr, uid, dict(JOB, name='test_2 _80_seconds * 2', function='_80_seconds', nextcall=t1, numbercall=2)) + + def test_3(self, cr, uid): + now = datetime.now() + t1 = (now + relativedelta(minutes=1)).strftime('%Y-%m-%d %H:%M:%S') + t2 = (now + relativedelta(minutes=1, seconds=5)).strftime('%Y-%m-%d %H:%M:%S') + t3 = (now + relativedelta(minutes=1, seconds=10)).strftime('%Y-%m-%d %H:%M:%S') + self.create(cr, uid, dict(JOB, name='test_3 _80_seconds A', function='_80_seconds', nextcall=t1)) + self.create(cr, uid, dict(JOB, name='test_3 _20_seconds B', function='_20_seconds', nextcall=t2)) + self.create(cr, uid, dict(JOB, name='test_3 _20_seconds C', function='_20_seconds', nextcall=t3)) + + # This test assumes 4 cron threads. 
+ def test_00(self, cr, uid): + self.test_00_set = set() + now = datetime.now() + t1 = (now + relativedelta(minutes=1)).strftime('%Y-%m-%d %H:%M:%S') + t2 = (now + relativedelta(minutes=1, seconds=5)).strftime('%Y-%m-%d %H:%M:%S') + t3 = (now + relativedelta(minutes=1, seconds=10)).strftime('%Y-%m-%d %H:%M:%S') + self.create(cr, uid, dict(JOB, name='test_00 _20_seconds_A', function='_20_seconds_A', nextcall=t1)) + self.create(cr, uid, dict(JOB, name='test_00 _20_seconds_B', function='_20_seconds_B', nextcall=t2)) + self.create(cr, uid, dict(JOB, name='test_00 _20_seconds_C', function='_20_seconds_C', nextcall=t3)) + + def _expect(self, cr, uid, to_add, to_sleep, to_expect_in, to_expect_out): + assert self.test_00_set == to_expect_in + self.test_00_set.add(to_add) + time.sleep(to_sleep) + self.test_00_set.discard(to_add) + assert self.test_00_set == to_expect_out + + def _20_seconds_A(self, cr, uid): + self._expect(cr, uid, 'A', 20, set(), set(['B', 'C'])) + + def _20_seconds_B(self, cr, uid): + self._expect(cr, uid, 'B', 20, set('A'), set('C')) + + def _20_seconds_C(self, cr, uid): + self._expect(cr, uid, 'C', 20, set(['A', 'B']), set()) + +# vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4: + diff --git a/openerp/addons/base/test/test_ir_cron.yml b/openerp/addons/base/test/test_ir_cron.yml new file mode 100644 index 00000000000..16d648e1c30 --- /dev/null +++ b/openerp/addons/base/test/test_ir_cron.yml @@ -0,0 +1,61 @@ +- + Test the cron jobs scheduling. +- + Disable the existing cron jobs if any during the tests. +- + !python {model: ir.cron }: | + # For this test to work, as it involves multiple database cursors, + # we have to commit changes. But YAML tests must be rollbacked, so + # the final database state is left untouched. So we have to be a bit + # ugly here: use our own cursor, commit, and clean after ourselves. 
+ # We also pass around some ids using setattr/delattr, and we have to + # rollback the previous tests otherwise we won't be able to touch the + # db. + # Well, this should probably be a standalone, or regular unit test, + # instead of using the YAML infrastructure. + cr.rollback() + our_cr = self.pool.db.cursor() + try: + ids = self.search(our_cr, uid, [], {}) + setattr(self, 'saved_ids', ids) + self.write(our_cr, uid, ids, {'active': False}, {}) + our_cr.commit() + finally: + our_cr.close() +- + Three concurrent jobs started with a slight time gap. Assume 4 cron threads. + This will take about 2 minutes. +- + !python {model: ir.cron }: | + # Pretend initialization is already done. We then use a try/finally + # to reset _init correctly. + self.pool._init = False + our_cr = self.pool.db.cursor() + try: + self.test_00(our_cr, uid) # this will commit using the passed cursor + import openerp.cron + openerp.cron._thread_slots = 4 + # Wake up this db as soon as the master cron thread starts. + openerp.cron.schedule_wakeup(1, self.pool.db.dbname) + # Pretend to be the master thread, for 4 iterations. + openerp.cron.runner_body() + openerp.cron.runner_body() + openerp.cron.runner_body() + openerp.cron.runner_body() + finally: + self.pool._init = True + our_cr.close() +- + Clean after ourselves. +- + !python {model: ir.cron }: | + our_cr = self.pool.db.cursor() + try: + ids = [x for x in self.search(our_cr, uid, ['|', ('active', '=', True), ('active', '=', False)], {}) if x not in self.saved_ids] + self.unlink(our_cr, uid, ids, {}) + ids = self.saved_ids + delattr(self, 'saved_ids') + self.write(our_cr, uid, ids, {'active': True}, {}) + our_cr.commit() + finally: + our_cr.close() diff --git a/openerp/conf/__init__.py b/openerp/conf/__init__.py index 0a975c5d4e2..c89ddf44734 100644 --- a/openerp/conf/__init__.py +++ b/openerp/conf/__init__.py @@ -35,6 +35,10 @@ must be used. import deprecation + +# Maximum number of threads concurrently processing cron jobs.
+max_cron_threads = 4 # Actually the default value here is meaningless, + # look at tools.config for the default value. + # Paths to search for OpenERP addons. addons_paths = [] diff --git a/openerp/cron.py b/openerp/cron.py new file mode 100644 index 00000000000..d24300b7f43 --- /dev/null +++ b/openerp/cron.py @@ -0,0 +1,212 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +############################################################################## +# +# OpenERP, Open Source Management Solution +# Copyright (C) 2004-2011 OpenERP SA () +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as +# published by the Free Software Foundation, either version 3 of the +# License, or (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . +# +############################################################################## + +""" Cron jobs scheduling + +Cron jobs are defined in the ir_cron table/model. This module deals with all +cron jobs, for all databases of a single OpenERP server instance. + +It defines a single master thread that will spawn (a bounded number of) +threads to process individual cron jobs. + +The thread runs forever, checking every 60 seconds for new +'database wake-ups'. It maintains a heapq of database wake-ups. At each +wake-up, it will call ir_cron._run_jobs_multithread() for the given database. _run_jobs_multithread +will check the jobs defined in the ir_cron table and spawn accordingly threads +to process them. 
+ +This module's behavior depends on the following configuration variable: +openerp.conf.max_cron_threads. + +""" + +import heapq +import logging +import threading +import time + +import openerp +import tools + +# Heapq of database wake-ups. Note that 'database wake-up' meaning is in +# the context of the cron management. This is not originally about loading +# a database, although having the database name in the queue will +# cause it to be loaded when the schedule time is reached, even if it was +# unloaded in the mean time. Normally a database's wake-up is cancelled by +# the RegistryManager when the database is unloaded - so this should not +# cause it to be reloaded. +# +# TODO: perhaps in the future we could consider a flag on ir.cron jobs +# that would cause database wake-up even if the database has not been +# loaded yet or was already unloaded (e.g. 'force_db_wakeup' or something) +# +# Each element is a triple (timestamp, database-name, boolean). The boolean +# specifies if the wake-up is canceled (so a wake-up can be canceled without +# relying on the heapq implementation detail; no need to remove the job from +# the heapq). +_wakeups = [] + +# Mapping of database names to the wake-up defined in the heapq, +# so that we can cancel the wake-up without messing with the heapq +# invariant: lookup the wake-up by database-name, then set +# its third element to True. +_wakeup_by_db = {} + +# Re-entrant lock to protect the above _wakeups and _wakeup_by_db variables. +# We could use a simple (non-reentrant) lock if the runner function below +# was more fine-grained, but we are fine with the loop owning the lock +# while spawning a few threads. +_wakeups_lock = threading.RLock() + +# Maximum number of threads allowed to process cron jobs concurrently. This +# variable is set by start_master_thread using openerp.conf.max_cron_threads. +_thread_slots = None + +# A (non re-entrant) lock to protect the above _thread_slots variable. 
+_thread_slots_lock = threading.Lock() + +_logger = logging.getLogger('cron') + +# Sleep duration limits - must not loop too quickly, but can't sleep too long +# either, because a new job might be inserted in ir_cron with a much sooner +# execution date than current known ones. We won't see it until we wake! +MAX_SLEEP = 60 # 1 min +MIN_SLEEP = 1 # 1 sec + +# Dummy wake-up timestamp that can be used to force a database wake-up asap +WAKE_UP_NOW = 1 + +def get_thread_slots(): + """ Return the number of available thread slots """ + return _thread_slots + + +def release_thread_slot(): + """ Increment the number of available thread slots """ + global _thread_slots + with _thread_slots_lock: + _thread_slots += 1 + + +def take_thread_slot(): + """ Decrement the number of available thread slots """ + global _thread_slots + with _thread_slots_lock: + _thread_slots -= 1 + + +def cancel(db_name): + """ Cancel the next wake-up of a given database, if any. + + :param db_name: database name for which the wake-up is canceled. + + """ + _logger.debug("Cancel next wake-up for database '%s'.", db_name) + with _wakeups_lock: + if db_name in _wakeup_by_db: + _wakeup_by_db[db_name][2] = True + + +def cancel_all(): + """ Cancel all database wake-ups. """ + _logger.debug("Cancel all database wake-ups") + global _wakeups + global _wakeup_by_db + with _wakeups_lock: + _wakeups = [] + _wakeup_by_db = {} + + +def schedule_wakeup(timestamp, db_name): + """ Schedule a new wake-up for a database. + + If an earlier wake-up is already defined, the new wake-up is discarded. + If another wake-up is defined, that wake-up is discarded and the new one + is scheduled. + + :param db_name: database name for which a new wake-up is scheduled. + :param timestamp: when the wake-up is scheduled. 
+ + """ + if not timestamp: + return + with _wakeups_lock: + if db_name in _wakeup_by_db: + task = _wakeup_by_db[db_name] + if not task[2] and timestamp > task[0]: + # existing wakeup is valid and occurs earlier than new one + return + task[2] = True # cancel existing task + task = [timestamp, db_name, False] + heapq.heappush(_wakeups, task) + _wakeup_by_db[db_name] = task + _logger.debug("Wake-up scheduled for database '%s' @ %s", db_name, + 'NOW' if timestamp == WAKE_UP_NOW else timestamp) + +def runner(): + """Neverending function (intended to be run in a dedicated thread) that + checks every 60 seconds the next database wake-up. TODO: make configurable + """ + while True: + runner_body() + +def runner_body(): + with _wakeups_lock: + while _wakeups and _wakeups[0][0] < time.time() and get_thread_slots(): + task = heapq.heappop(_wakeups) + timestamp, db_name, canceled = task + if canceled: + continue + del _wakeup_by_db[db_name] + registry = openerp.pooler.get_pool(db_name) + if not registry._init: + _logger.debug("Database '%s' wake-up! Firing multi-threaded cron job processing", db_name) + registry['ir.cron']._run_jobs_multithread() + amount = MAX_SLEEP + with _wakeups_lock: + # Sleep less than MAX_SLEEP if the next known wake-up will happen before that. + if _wakeups and get_thread_slots(): + amount = min(MAX_SLEEP, max(MIN_SLEEP, _wakeups[0][0] - time.time())) + _logger.debug("Going to sleep for %ss", amount) + time.sleep(amount) + +def start_master_thread(): + """ Start the above runner function in a daemon thread. + + The thread is a typical daemon thread: it will never quit and must be + terminated when the main process exits - with no consequence (the processing + threads it spawns are not marked daemon). 
+ + """ + global _thread_slots + _thread_slots = openerp.conf.max_cron_threads + db_maxconn = tools.config['db_maxconn'] + if _thread_slots >= tools.config.get('db_maxconn', 64): + _logger.warning("Connection pool size (%s) is set lower than max number of cron threads (%s), " + "this may cause trouble if you reach that number of parallel cron tasks.", + db_maxconn, _thread_slots) + t = threading.Thread(target=runner, name="openerp.cron.master_thread") + t.setDaemon(True) + t.start() + _logger.debug("Master cron daemon started!") + +# vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4: diff --git a/openerp/modules/registry.py b/openerp/modules/registry.py index ea6d863e442..72cd4498cfa 100644 --- a/openerp/modules/registry.py +++ b/openerp/modules/registry.py @@ -28,6 +28,8 @@ import logging import openerp.sql_db import openerp.osv.orm +import openerp.cron +import openerp.tools import openerp.modules.db import openerp.tools.config @@ -96,9 +98,17 @@ class Registry(object): return res + def schedule_cron_jobs(self): + """ Make the cron thread care about this registry/database jobs. + This will initiate the cron thread to check for any pending jobs for + this registry/database as soon as possible. Then it will continuously + monitor the ir.cron model for future jobs. See openerp.cron for + details. + """ + openerp.cron.schedule_wakeup(openerp.cron.WAKE_UP_NOW, self.db.dbname) + def clear_caches(self): """ Clear the caches - This clears the caches associated to methods decorated with ``tools.ormcache`` or ``tools.ormcache_multi`` for all the models. """ @@ -112,13 +122,11 @@ class RegistryManager(object): registries (essentially database connection/model registry pairs). """ - # Mapping between db name and model registry. # Accessed through the methods below. 
registries = {} registries_lock = threading.RLock() - @classmethod def get(cls, db_name, force_demo=False, status=None, update_module=False, pooljobs=True): @@ -131,7 +139,6 @@ class RegistryManager(object): update_module, pooljobs) return registry - @classmethod def new(cls, db_name, force_demo=False, status=None, update_module=False, pooljobs=True): @@ -166,22 +173,38 @@ class RegistryManager(object): cr.close() if pooljobs: - registry.get('ir.cron').restart(registry.db.dbname) + registry.schedule_cron_jobs() return registry - @classmethod def delete(cls, db_name): - """ Delete the registry linked to a given database. """ + """Delete the registry linked to a given database. + + This also cleans the associated caches. For good measure this also + cancels the associated cron job. But please note that the cron job can + be running and take some time before ending, and that you should not + remove a registry if it can still be used by some thread. So it might + be necessary to call yourself openerp.cron.Agent.cancel(db_name) and + and join (i.e. wait for) the thread. + """ with cls.registries_lock: if db_name in cls.registries: + cls.registries[db_name].clear_caches() del cls.registries[db_name] + openerp.cron.cancel(db_name) + @classmethod + def delete_all(cls): + """Delete all the registries. 
""" + with cls.registries_lock: + for db_name in cls.registries.keys(): + cls.delete(db_name) + @classmethod def clear_caches(cls, db_name): - """ Clear the caches + """Clear caches This clears the caches associated to methods decorated with ``tools.ormcache`` or ``tools.ormcache_multi`` for all the models @@ -195,4 +218,4 @@ class RegistryManager(object): cls.registries[db_name].clear_caches() -# vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4: +# vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4: \ No newline at end of file diff --git a/openerp/netsvc.py b/openerp/netsvc.py index 7e62687e335..1d0b2a25726 100644 --- a/openerp/netsvc.py +++ b/openerp/netsvc.py @@ -21,7 +21,6 @@ ############################################################################## import errno -import heapq import logging import logging.handlers import os @@ -31,13 +30,14 @@ import socket import sys import threading import time +import traceback import types from pprint import pformat # TODO modules that import netsvc only for things from loglevels must be changed to use loglevels. from loglevels import * import tools -import openerp.exceptions +import openerp def close_socket(sock): """ Closes a socket instance cleanly @@ -211,83 +211,6 @@ def init_alternative_logger(): logger.addHandler(handler) logger.setLevel(logging.ERROR) -class Agent(object): - """ Singleton that keeps track of cancellable tasks to run at a given - timestamp. - - The tasks are characterised by: - - * a timestamp - * the database on which the task run - * the function to call - * the arguments and keyword arguments to pass to the function - - Implementation details: - - - Tasks are stored as list, allowing the cancellation by setting - the timestamp to 0. - - A heapq is used to store tasks, so we don't need to sort - tasks ourself. 
- """ - __tasks = [] - __tasks_by_db = {} - _logger = logging.getLogger('netsvc.agent') - - @classmethod - def setAlarm(cls, function, timestamp, db_name, *args, **kwargs): - task = [timestamp, db_name, function, args, kwargs] - heapq.heappush(cls.__tasks, task) - cls.__tasks_by_db.setdefault(db_name, []).append(task) - - @classmethod - def cancel(cls, db_name): - """Cancel all tasks for a given database. If None is passed, all tasks are cancelled""" - cls._logger.debug("Cancel timers for %s db", db_name or 'all') - if db_name is None: - cls.__tasks, cls.__tasks_by_db = [], {} - else: - if db_name in cls.__tasks_by_db: - for task in cls.__tasks_by_db[db_name]: - task[0] = 0 - - @classmethod - def quit(cls): - cls.cancel(None) - - @classmethod - def runner(cls): - """Neverending function (intended to be ran in a dedicated thread) that - checks every 60 seconds tasks to run. TODO: make configurable - """ - current_thread = threading.currentThread() - while True: - while cls.__tasks and cls.__tasks[0][0] < time.time(): - task = heapq.heappop(cls.__tasks) - timestamp, dbname, function, args, kwargs = task - cls.__tasks_by_db[dbname].remove(task) - if not timestamp: - # null timestamp -> cancelled task - continue - current_thread.dbname = dbname # hack hack - cls._logger.debug("Run %s.%s(*%s, **%s)", function.im_class.__name__, function.func_name, args, kwargs) - delattr(current_thread, 'dbname') - task_thread = threading.Thread(target=function, name='netsvc.Agent.task', args=args, kwargs=kwargs) - # force non-daemon task threads (the runner thread must be daemon, and this property is inherited by default) - task_thread.setDaemon(False) - task_thread.start() - time.sleep(1) - time.sleep(60) - -def start_agent(): - agent_runner = threading.Thread(target=Agent.runner, name="netsvc.Agent.runner") - # the agent runner is a typical daemon thread, that will never quit and must be - # terminated when the main process exits - with no consequence (the processing - # threads it 
spawns are not marked daemon) - agent_runner.setDaemon(True) - agent_runner.start() - -import traceback - class Server: """ Generic interface for all servers with an event loop etc. Override this to impement http, net-rpc etc. servers. diff --git a/openerp/pooler.py b/openerp/pooler.py index c754385d6b3..88b60864b90 100644 --- a/openerp/pooler.py +++ b/openerp/pooler.py @@ -34,11 +34,6 @@ def get_db_and_pool(db_name, force_demo=False, status=None, update_module=False, return registry.db, registry -def delete_pool(db_name): - """Delete an existing registry.""" - RegistryManager.delete(db_name) - - def restart_pool(db_name, force_demo=False, status=None, update_module=False): """Delete an existing registry and return a database connection and a newly initialized registry.""" registry = RegistryManager.new(db_name, force_demo, status, update_module, True) diff --git a/openerp/service/__init__.py b/openerp/service/__init__.py index 5dbb764847a..1bb83dfd228 100644 --- a/openerp/service/__init__.py +++ b/openerp/service/__init__.py @@ -28,6 +28,8 @@ import netrpc_server import web_services import websrv_lib +import openerp.cron +import openerp.modules import openerp.netsvc import openerp.osv import openerp.tools @@ -61,7 +63,7 @@ def start_services(): netrpc_server.init_servers() # Start the main cron thread. - openerp.netsvc.start_agent() + openerp.cron.start_master_thread() # Start the top-level servers threads (normally HTTP, HTTPS, and NETRPC). openerp.netsvc.Server.startAll() @@ -72,7 +74,9 @@ def start_services(): def stop_services(): """ Stop all services. 
""" - openerp.netsvc.Agent.quit() + # stop scheduling new jobs; we will have to wait for the jobs to complete below + openerp.cron.cancel_all() + openerp.netsvc.Server.quitAll() openerp.wsgi.stop_server() config = openerp.tools.config @@ -92,6 +96,8 @@ def stop_services(): thread.join(0.05) time.sleep(0.05) + openerp.modules.registry.RegistryManager.delete_all() + # vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4: diff --git a/openerp/service/web_services.py b/openerp/service/web_services.py index a6be9827c52..41951cc9fc3 100644 --- a/openerp/service/web_services.py +++ b/openerp/service/web_services.py @@ -167,9 +167,8 @@ class db(netsvc.ExportService): raise Exception, e def exp_drop(self, db_name): + openerp.modules.registry.RegistryManager.delete(db_name) sql_db.close_db(db_name) - openerp.modules.registry.RegistryManager.clear_caches(db_name) - openerp.netsvc.Agent.cancel(db_name) logger = netsvc.Logger() db = sql_db.db_connect('template1') @@ -271,9 +270,8 @@ class db(netsvc.ExportService): return True def exp_rename(self, old_name, new_name): + openerp.modules.registry.RegistryManager.delete(old_name) sql_db.close_db(old_name) - openerp.modules.registry.RegistryManager.clear_caches(old_name) - openerp.netsvc.Agent.cancel(old_name) logger = netsvc.Logger() db = sql_db.db_connect('template1') diff --git a/openerp/sql_db.py b/openerp/sql_db.py index 8038c39486d..ab92d7cc21d 100644 --- a/openerp/sql_db.py +++ b/openerp/sql_db.py @@ -214,11 +214,11 @@ class Cursor(object): params = params or None res = self._obj.execute(query, params) except psycopg2.ProgrammingError, pe: - if self._default_log_exceptions or log_exceptions: + if (self._default_log_exceptions if log_exceptions is None else log_exceptions): self.__logger.error("Programming error: %s, in query %s", pe, query) raise except Exception: - if self._default_log_exceptions or log_exceptions: + if (self._default_log_exceptions if log_exceptions is None else log_exceptions): 
self.__logger.exception("bad query: %s", self._obj.query or query) raise @@ -504,7 +504,7 @@ def db_connect(db_name): return Connection(_Pool, db_name) def close_db(db_name): - """ You might want to call openerp.netsvc.Agent.cancel(db_name) along this function.""" + """ You might want to call openerp.modules.registry.RegistryManager.delete(db_name) along this function.""" _Pool.close_all(dsn(db_name)) ct = currentThread() if hasattr(ct, 'dbname'): diff --git a/openerp/tools/config.py b/openerp/tools/config.py index 3304a4f4a99..ce6745af7d4 100644 --- a/openerp/tools/config.py +++ b/openerp/tools/config.py @@ -261,8 +261,12 @@ class configmanager(object): "osv_memory tables. This is a decimal value expressed in hours, " "and the default is 1 hour.", type="float") + group.add_option("--max-cron-threads", dest="max_cron_threads", my_default=4, + help="Maximum number of threads processing concurrently cron jobs.", + type="int") group.add_option("--unaccent", dest="unaccent", my_default=False, action="store_true", help="Use the unaccent function provided by the database when available.") + parser.add_option_group(group) # Copy all optparse options (i.e. MyOption) into self.options. @@ -365,7 +369,7 @@ class configmanager(object): 'stop_after_init', 'logrotate', 'without_demo', 'netrpc', 'xmlrpc', 'syslog', 'list_db', 'xmlrpcs', 'test_file', 'test_disable', 'test_commit', 'test_report_directory', - 'osv_memory_count_limit', 'osv_memory_age_limit', 'unaccent', + 'osv_memory_count_limit', 'osv_memory_age_limit', 'max_cron_threads', 'unaccent', ] for arg in keys: @@ -447,6 +451,8 @@ class configmanager(object): if opt.save: self.save() + openerp.conf.max_cron_threads = self.options['max_cron_threads'] + openerp.conf.addons_paths = self.options['addons_path'].split(',') openerp.conf.server_wide_modules = \ map(lambda m: m.strip(), opt.server_wide_modules.split(',')) if \