cron cleanup, back to the Kernighan KISS roots 1min poll time, rely only on database, multiprocess/multiserver ready.

Nota: If we replace sequence signaling for cache invalidation with pg
listen/notify in the future, we will use the same mechanism for more accurate
cron timing.

bzr revid: al@openerp.com-20121209170447-zs0k3jazokylwvar
This commit is contained in:
Antony Lesuisse 2012-12-09 18:04:47 +01:00
parent d7d1fe562c
commit 42f292af93
7 changed files with 79 additions and 371 deletions

View File

@ -31,7 +31,6 @@ import netsvc
import openerp import openerp
import pooler import pooler
import tools import tools
from openerp.cron import WAKE_UP_NOW
from osv import fields, osv from osv import fields, osv
from tools import DEFAULT_SERVER_DATETIME_FORMAT from tools import DEFAULT_SERVER_DATETIME_FORMAT
from tools.safe_eval import safe_eval as eval from tools.safe_eval import safe_eval as eval
@ -142,130 +141,6 @@ class ir_cron(osv.osv):
except Exception, e: except Exception, e:
self._handle_callback_exception(cr, uid, model_name, method_name, args, job_id, e) self._handle_callback_exception(cr, uid, model_name, method_name, args, job_id, e)
def _run_job(self, cr, job, now):
""" Run a given job taking care of the repetition.
The cursor has a lock on the job (aquired by _run_jobs_multithread()) and this
method is run in a worker thread (spawned by _run_jobs_multithread())).
:param job: job to be run (as a dictionary).
:param now: timestamp (result of datetime.now(), no need to call it multiple time).
"""
try:
nextcall = datetime.strptime(job['nextcall'], DEFAULT_SERVER_DATETIME_FORMAT)
numbercall = job['numbercall']
ok = False
while nextcall < now and numbercall:
if numbercall > 0:
numbercall -= 1
if not ok or job['doall']:
self._callback(cr, job['user_id'], job['model'], job['function'], job['args'], job['id'])
if numbercall:
nextcall += _intervalTypes[job['interval_type']](job['interval_number'])
ok = True
addsql = ''
if not numbercall:
addsql = ', active=False'
cr.execute("UPDATE ir_cron SET nextcall=%s, numbercall=%s"+addsql+" WHERE id=%s",
(nextcall.strftime(DEFAULT_SERVER_DATETIME_FORMAT), numbercall, job['id']))
if numbercall:
# Reschedule our own main cron thread if necessary.
# This is really needed if this job runs longer than its rescheduling period.
nextcall = calendar.timegm(nextcall.timetuple())
openerp.cron.schedule_wakeup(nextcall, cr.dbname)
finally:
cr.commit()
cr.close()
openerp.cron.release_thread_slot()
def _run_jobs_multithread(self):
# TODO remove 'check' argument from addons/base_action_rule/base_action_rule.py
""" Process the cron jobs by spawning worker threads.
This selects in database all the jobs that should be processed. It then
tries to lock each of them and, if it succeeds, spawns a thread to run
the cron job (if it doesn't succeed, it means the job was already
locked to be taken care of by another thread).
The cursor used to lock the job in database is given to the worker
thread (which has to close it itself).
"""
db = self.pool.db
cr = db.cursor()
db_name = db.dbname
try:
jobs = {} # mapping job ids to jobs for all jobs being processed.
now = datetime.now()
# Careful to compare timestamps with 'UTC' - everything is UTC as of v6.1.
cr.execute("""SELECT * FROM ir_cron
WHERE numbercall != 0
AND active AND nextcall <= (now() at time zone 'UTC')
ORDER BY priority""")
for job in cr.dictfetchall():
if not openerp.cron.get_thread_slots():
break
jobs[job['id']] = job
task_cr = db.cursor()
try:
# Try to grab an exclusive lock on the job row from within the task transaction
acquired_lock = False
task_cr.execute("""SELECT *
FROM ir_cron
WHERE id=%s
FOR UPDATE NOWAIT""",
(job['id'],), log_exceptions=False)
acquired_lock = True
except psycopg2.OperationalError, e:
if e.pgcode == '55P03':
# Class 55: Object not in prerequisite state; 55P03: lock_not_available
_logger.debug('Another process/thread is already busy executing job `%s`, skipping it.', job['name'])
continue
else:
# Unexpected OperationalError
raise
finally:
if not acquired_lock:
# we're exiting due to an exception while acquiring the lot
task_cr.close()
# Got the lock on the job row, now spawn a thread to execute it in the transaction with the lock
task_thread = threading.Thread(target=self._run_job, name=job['name'], args=(task_cr, job, now))
# force non-daemon task threads (the runner thread must be daemon, and this property is inherited by default)
task_thread.setDaemon(False)
openerp.cron.take_thread_slot()
task_thread.start()
_logger.debug('Cron execution thread for job `%s` spawned', job['name'])
# Find next earliest job ignoring currently processed jobs (by this and other cron threads)
find_next_time_query = """SELECT min(nextcall) AS min_next_call
FROM ir_cron WHERE numbercall != 0 AND active"""
if jobs:
cr.execute(find_next_time_query + " AND id NOT IN %s", (tuple(jobs.keys()),))
else:
cr.execute(find_next_time_query)
next_call = cr.dictfetchone()['min_next_call']
if next_call:
next_call = calendar.timegm(time.strptime(next_call, DEFAULT_SERVER_DATETIME_FORMAT))
else:
# no matching cron job found in database, re-schedule arbitrarily in 1 day,
# this delay will likely be modified when running jobs complete their tasks
next_call = time.time() + (24*3600)
openerp.cron.schedule_wakeup(next_call, db_name)
except Exception, ex:
_logger.warning('Exception in cron:', exc_info=True)
finally:
cr.commit()
cr.close()
def _process_job(self, cr, job): def _process_job(self, cr, job):
""" Run a given job taking care of the repetition. """ Run a given job taking care of the repetition.
@ -365,19 +240,6 @@ class ir_cron(osv.osv):
return False return False
def update_running_cron(self, cr):
""" Schedule as soon as possible a wake-up for this database. """
# Verify whether the server is already started and thus whether we need to commit
# immediately our changes and restart the cron agent in order to apply the change
# immediately. The commit() is needed because as soon as the cron is (re)started it
# will query the database with its own cursor, possibly before the end of the
# current transaction.
# This commit() is not an issue in most cases, but we must absolutely avoid it
# when the server is only starting or loading modules (hence the test on pool._init).
if not self.pool._init:
cr.commit()
openerp.cron.schedule_wakeup(WAKE_UP_NOW, self.pool.db.dbname)
def _try_lock(self, cr, uid, ids, context=None): def _try_lock(self, cr, uid, ids, context=None):
"""Try to grab a dummy exclusive write-lock to the rows with the given ids, """Try to grab a dummy exclusive write-lock to the rows with the given ids,
to make sure a following write() or unlink() will not block due to make sure a following write() or unlink() will not block due
@ -393,20 +255,16 @@ class ir_cron(osv.osv):
def create(self, cr, uid, vals, context=None): def create(self, cr, uid, vals, context=None):
res = super(ir_cron, self).create(cr, uid, vals, context=context) res = super(ir_cron, self).create(cr, uid, vals, context=context)
self.update_running_cron(cr)
return res return res
def write(self, cr, uid, ids, vals, context=None): def write(self, cr, uid, ids, vals, context=None):
self._try_lock(cr, uid, ids, context) self._try_lock(cr, uid, ids, context)
res = super(ir_cron, self).write(cr, uid, ids, vals, context=context) res = super(ir_cron, self).write(cr, uid, ids, vals, context=context)
self.update_running_cron(cr)
return res return res
def unlink(self, cr, uid, ids, context=None): def unlink(self, cr, uid, ids, context=None):
self._try_lock(cr, uid, ids, context) self._try_lock(cr, uid, ids, context)
res = super(ir_cron, self).unlink(cr, uid, ids, context=context) res = super(ir_cron, self).unlink(cr, uid, ids, context=context)
self.update_running_cron(cr)
return res return res
ir_cron()
# vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4: # vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4:

View File

@ -35,10 +35,6 @@ must be used.
import deprecation import deprecation
# Maximum number of threads processing concurrently cron jobs.
max_cron_threads = 4 # Actually the default value here is meaningless,
# look at tools.config for the default value.
# Paths to search for OpenERP addons. # Paths to search for OpenERP addons.
addons_paths = [] addons_paths = []

View File

@ -1,215 +0,0 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
##############################################################################
#
# OpenERP, Open Source Management Solution
# Copyright (C) 2004-2011 OpenERP SA (<http://www.openerp.com>)
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
##############################################################################
""" Cron jobs scheduling
Cron jobs are defined in the ir_cron table/model. This module deals with all
cron jobs, for all databases of a single OpenERP server instance.
It defines a single master thread that will spawn (a bounded number of)
threads to process individual cron jobs.
The thread runs forever, checking every 60 seconds for new
'database wake-ups'. It maintains a heapq of database wake-ups. At each
wake-up, it will call ir_cron._run_jobs_multithread() for the given database. _run_jobs_multithread
will check the jobs defined in the ir_cron table and spawn accordingly threads
to process them.
This module's behavior depends on the following configuration variable:
openerp.conf.max_cron_threads.
"""
import heapq
import logging
import threading
import time
import openerp
import tools
_logger = logging.getLogger(__name__)
# Heapq of database wake-ups. Note that 'database wake-up' meaning is in
# the context of the cron management. This is not originally about loading
# a database, although having the database name in the queue will
# cause it to be loaded when the schedule time is reached, even if it was
# unloaded in the mean time. Normally a database's wake-up is cancelled by
# the RegistryManager when the database is unloaded - so this should not
# cause it to be reloaded.
#
# TODO: perhaps in the future we could consider a flag on ir.cron jobs
# that would cause database wake-up even if the database has not been
# loaded yet or was already unloaded (e.g. 'force_db_wakeup' or something)
#
# Each element is a triple (timestamp, database-name, boolean). The boolean
# specifies if the wake-up is canceled (so a wake-up can be canceled without
# relying on the heapq implementation detail; no need to remove the job from
# the heapq).
_wakeups = []
# Mapping of database names to the wake-up defined in the heapq,
# so that we can cancel the wake-up without messing with the heapq
# invariant: lookup the wake-up by database-name, then set
# its third element to True.
_wakeup_by_db = {}
# Re-entrant lock to protect the above _wakeups and _wakeup_by_db variables.
# We could use a simple (non-reentrant) lock if the runner function below
# was more fine-grained, but we are fine with the loop owning the lock
# while spawning a few threads.
_wakeups_lock = threading.RLock()
# Maximum number of threads allowed to process cron jobs concurrently. This
# variable is set by start_master_thread using openerp.conf.max_cron_threads.
_thread_slots = None
# A (non re-entrant) lock to protect the above _thread_slots variable.
_thread_slots_lock = threading.Lock()
# Sleep duration limits - must not loop too quickly, but can't sleep too long
# either, because a new job might be inserted in ir_cron with a much sooner
# execution date than current known ones. We won't see it until we wake!
MAX_SLEEP = 60 # 1 min
MIN_SLEEP = 1 # 1 sec
# Dummy wake-up timestamp that can be used to force a database wake-up asap
WAKE_UP_NOW = 1
def get_thread_slots():
""" Return the number of available thread slots """
return _thread_slots
def release_thread_slot():
""" Increment the number of available thread slots """
global _thread_slots
with _thread_slots_lock:
_thread_slots += 1
def take_thread_slot():
""" Decrement the number of available thread slots """
global _thread_slots
with _thread_slots_lock:
_thread_slots -= 1
def cancel(db_name):
""" Cancel the next wake-up of a given database, if any.
:param db_name: database name for which the wake-up is canceled.
"""
_logger.debug("Cancel next wake-up for database '%s'.", db_name)
with _wakeups_lock:
if db_name in _wakeup_by_db:
_wakeup_by_db[db_name][2] = True
def cancel_all():
""" Cancel all database wake-ups. """
_logger.debug("Cancel all database wake-ups")
global _wakeups
global _wakeup_by_db
with _wakeups_lock:
_wakeups = []
_wakeup_by_db = {}
def schedule_wakeup(timestamp, db_name):
""" Schedule a new wake-up for a database.
If an earlier wake-up is already defined, the new wake-up is discarded.
If another wake-up is defined, that wake-up is discarded and the new one
is scheduled.
:param db_name: database name for which a new wake-up is scheduled.
:param timestamp: when the wake-up is scheduled.
"""
if not timestamp:
return
with _wakeups_lock:
if db_name in _wakeup_by_db:
task = _wakeup_by_db[db_name]
if not task[2] and timestamp > task[0]:
# existing wakeup is valid and occurs earlier than new one
return
task[2] = True # cancel existing task
task = [timestamp, db_name, False]
heapq.heappush(_wakeups, task)
_wakeup_by_db[db_name] = task
_logger.debug("Wake-up scheduled for database '%s' @ %s", db_name,
'NOW' if timestamp == WAKE_UP_NOW else timestamp)
def runner():
"""Neverending function (intended to be run in a dedicated thread) that
checks every 60 seconds the next database wake-up. TODO: make configurable
"""
while True:
runner_body()
def runner_body():
with _wakeups_lock:
while _wakeups and _wakeups[0][0] < time.time() and get_thread_slots():
task = heapq.heappop(_wakeups)
timestamp, db_name, canceled = task
if canceled:
continue
del _wakeup_by_db[db_name]
registry = openerp.pooler.get_pool(db_name)
if not registry._init:
_logger.debug("Database '%s' wake-up! Firing multi-threaded cron job processing", db_name)
registry['ir.cron']._run_jobs_multithread()
amount = MAX_SLEEP
with _wakeups_lock:
# Sleep less than MAX_SLEEP if the next known wake-up will happen before that.
if _wakeups and get_thread_slots():
amount = min(MAX_SLEEP, max(MIN_SLEEP, _wakeups[0][0] - time.time()))
_logger.debug("Going to sleep for %ss", amount)
time.sleep(amount)
def start_master_thread():
""" Start the above runner function in a daemon thread.
The thread is a typical daemon thread: it will never quit and must be
terminated when the main process exits - with no consequence (the processing
threads it spawns are not marked daemon).
"""
global _thread_slots
_thread_slots = openerp.conf.max_cron_threads
db_maxconn = tools.config['db_maxconn']
if _thread_slots >= tools.config.get('db_maxconn', 64):
_logger.warning("Connection pool size (%s) is set lower than max number of cron threads (%s), "
"this may cause trouble if you reach that number of parallel cron tasks.",
db_maxconn, _thread_slots)
if _thread_slots:
t = threading.Thread(target=runner, name="openerp.cron.master_thread")
t.setDaemon(True)
t.start()
_logger.debug("Master cron daemon started!")
else:
_logger.info("No master cron daemon (0 workers needed).")
# vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4:

View File

@ -28,7 +28,6 @@ import threading
import openerp.sql_db import openerp.sql_db
import openerp.osv.orm import openerp.osv.orm
import openerp.cron
import openerp.tools import openerp.tools
import openerp.modules.db import openerp.modules.db
import openerp.tools.config import openerp.tools.config
@ -58,6 +57,9 @@ class Registry(object):
self.db_name = db_name self.db_name = db_name
self.db = openerp.sql_db.db_connect(db_name) self.db = openerp.sql_db.db_connect(db_name)
# In monoprocess cron jobs flag (pooljobs)
self.cron = False
# Inter-process signaling (used only when openerp.multi_process is True): # Inter-process signaling (used only when openerp.multi_process is True):
# The `base_registry_signaling` sequence indicates the whole registry # The `base_registry_signaling` sequence indicates the whole registry
# must be reloaded. # must be reloaded.
@ -124,7 +126,7 @@ class Registry(object):
monitor the ir.cron model for future jobs. See openerp.cron for monitor the ir.cron model for future jobs. See openerp.cron for
details. details.
""" """
openerp.cron.schedule_wakeup(openerp.cron.WAKE_UP_NOW, self.db.dbname) self.cron = True
def clear_caches(self): def clear_caches(self):
""" Clear the caches """ Clear the caches

View File

@ -24,10 +24,11 @@ import logging
import threading import threading
import time import time
import cron
import netrpc_server import netrpc_server
import web_services import web_services
import web_services
import openerp.cron
import openerp.modules import openerp.modules
import openerp.netsvc import openerp.netsvc
import openerp.osv import openerp.osv
@ -83,8 +84,7 @@ def start_services():
netrpc_server.init_servers() netrpc_server.init_servers()
# Start the main cron thread. # Start the main cron thread.
if openerp.conf.max_cron_threads: cron.start_master_thread()
openerp.cron.start_master_thread()
# Start the top-level servers threads (normally HTTP, HTTPS, and NETRPC). # Start the top-level servers threads (normally HTTP, HTTPS, and NETRPC).
openerp.netsvc.Server.startAll() openerp.netsvc.Server.startAll()
@ -95,8 +95,6 @@ def start_services():
def stop_services(): def stop_services():
""" Stop all services. """ """ Stop all services. """
# stop scheduling new jobs; we will have to wait for the jobs to complete below # stop scheduling new jobs; we will have to wait for the jobs to complete below
openerp.cron.cancel_all()
openerp.netsvc.Server.quitAll() openerp.netsvc.Server.quitAll()
openerp.service.wsgi_server.stop_server() openerp.service.wsgi_server.stop_server()
_logger.info("Initiating shutdown") _logger.info("Initiating shutdown")

71
openerp/service/cron.py Normal file
View File

@ -0,0 +1,71 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
##############################################################################
#
# OpenERP, Open Source Management Solution
# Copyright (C) 2004-2011 OpenERP SA (<http://www.openerp.com>)
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
##############################################################################
""" Cron jobs scheduling
Cron jobs are defined in the ir_cron table/model. This module deals with all
cron jobs, for all databases of a single OpenERP server instance.
"""
import logging
import threading
import time
import openerp
_logger = logging.getLogger(__name__)
SLEEP_INTERVAL = 60 # 1 min
def cron_runner(number):
while True:
time.sleep(SLEEP_INTERVAL + number) # Steve Reich timing style
registries = openerp.modules.registry.RegistryManager.registries
_logger.debug('cron%d polling for jobs', number)
for db_name, registry in registries.items():
while True and registry.cron:
# acquired = openerp.addons.base.ir.ir_cron.ir_cron._acquire_job(db_name)
# TODO why isnt openerp.addons.base defined ?
import sys
base = sys.modules['addons.base']
acquired = base.ir.ir_cron.ir_cron._acquire_job(db_name)
if not acquired:
break
def start_master_thread():
""" Start the above runner function in a daemon thread.
The thread is a typical daemon thread: it will never quit and must be
terminated when the main process exits - with no consequence (the processing
threads it spawns are not marked daemon).
"""
for i in range(openerp.tools.config['max_cron_threads']):
def target():
cron_runner(i)
t = threading.Thread(target=target, name="openerp.service.cron.cron_runner%d" % i)
t.setDaemon(True)
t.start()
_logger.debug("cron%d started!" % i)
# vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4:

View File

@ -269,7 +269,7 @@ class configmanager(object):
"osv_memory tables. This is a decimal value expressed in hours, " "osv_memory tables. This is a decimal value expressed in hours, "
"and the default is 1 hour.", "and the default is 1 hour.",
type="float") type="float")
group.add_option("--max-cron-threads", dest="max_cron_threads", my_default=4, group.add_option("--max-cron-threads", dest="max_cron_threads", my_default=2,
help="Maximum number of threads processing concurrently cron jobs.", help="Maximum number of threads processing concurrently cron jobs.",
type="int") type="int")
group.add_option("--unaccent", dest="unaccent", my_default=False, action="store_true", group.add_option("--unaccent", dest="unaccent", my_default=False, action="store_true",
@ -479,8 +479,6 @@ class configmanager(object):
if opt.save: if opt.save:
self.save() self.save()
openerp.conf.max_cron_threads = self.options['max_cron_threads']
openerp.conf.addons_paths = self.options['addons_path'].split(',') openerp.conf.addons_paths = self.options['addons_path'].split(',')
if opt.server_wide_modules: if opt.server_wide_modules:
openerp.conf.server_wide_modules = map(lambda m: m.strip(), opt.server_wide_modules.split(',')) openerp.conf.server_wide_modules = map(lambda m: m.strip(), opt.server_wide_modules.split(','))