
173 lines
5.3 KiB

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# OpenERP, Open Source Management Solution
# Copyright (C) 2004-2011 OpenERP SA (<http://www.openerp.com>)
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
""" Cron jobs scheduling
Cron jobs are defined in the ir_cron table/model. This module deals with all
cron jobs, for all databases of a single OpenERP server instance.
It defines a single master thread that will spawn (a bounded number of)
threads to process individual cron jobs.
import heapq
import logging
import threading
import time
import openerp
""" Singleton that keeps track of cancellable tasks to run at a given
The tasks are characterised by:
* a timestamp
* the database on which the task run
* a boolean attribute specifying if the task is canceled
Implementation details:
- Tasks are stored as list, allowing the cancellation by setting
the boolean to True.
- A heapq is used to store tasks, so we don't need to sort
tasks ourself.
# Heapq of database wake-ups. Note that 'database wake-up' meaning is in
# the context of the cron management. This is not about loading a database
# or otherwise making anything about it.
_wakeups = [] # TODO protect this variable with a lock?
# Mapping of database names to the wake-up defined in the heapq,
# so that we can cancel the wake-up without messing with the heapq
# internal structure.
_wakeup_by_db = {}
_logger = logging.getLogger('cron')
# We could use a simple (non-reentrant) lock if the runner function below
# was more fine-grained, but we are fine with the loop owning the lock
# while spawning a few threads.
_wakeups_lock = threading.RLock()
_thread_count_lock = threading.Lock()
# Maximum number of threads allowed to process cron jobs concurrently.
_thread_count = 2
def get_thread_count():
return _thread_count
def inc_thread_count():
global _thread_count
with _thread_count_lock:
_thread_count += 1
def dec_thread_count():
global _thread_count
with _thread_count_lock:
_thread_count -= 1
def cancel(db_name):
""" Cancel the next wake-up of a given database, if any. """
_logger.debug("Cancel next wake-up for database '%s'.", db_name)
with _wakeups_lock:
if db_name in _wakeup_by_db:
_wakeup_by_db[db_name][2] = True
def cancel_all():
""" Cancel all database wake-ups. """
global _wakeups
global _wakeup_by_db
with _wakeups_lock:
_wakeups = []
_wakeup_by_db = {}
def schedule_in_advance(timestamp, db_name):
""" Schedule a wake-up for a new database.
If an earlier wake-up is already defined, the new wake-up is discarded.
If another wake-up is defined, it is discarded.
if not timestamp:
with _wakeups_lock:
# Cancel the previous wakeup if any.
add_wakeup = False
if db_name in _wakeup_by_db:
task = _wakeup_by_db[db_name]
if task[2] or timestamp < task[0]:
add_wakeup = True
task[2] = True
add_wakeup = True
if add_wakeup:
task = [timestamp, db_name, False]
heapq.heappush(_wakeups, task)
_wakeup_by_db[db_name] = task
def runner():
"""Neverending function (intended to be ran in a dedicated thread) that
checks every 60 seconds the next database wake-up. TODO: make configurable
while True:
with _wakeups_lock:
while _wakeups and _wakeups[0][0] < time.time() and get_thread_count():
task = heapq.heappop(_wakeups)
timestamp, db_name, canceled = task
if canceled:
task[2] = True
registry = openerp.pooler.get_pool(db_name)
if not registry._init:
amount = 60
with _wakeups_lock:
if _wakeups and get_thread_count():
amount = min(60, _wakeups[0][0] - time.time())
def start_master_thread():
""" Start the above runner function in a daemon thread.
The thread is a typical daemon thread: it will never quit and must be
terminated when the main process exits - with no consequence (the processing
threads it spawns are not marked daemon).
t = threading.Thread(target=runner, name="openerp.cron.master_thread")
# vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4: