#!/usr/bin/env python # -*- coding: utf-8 -*- ############################################################################## # # OpenERP, Open Source Management Solution # Copyright (C) 2004-2011 OpenERP SA () # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as # published by the Free Software Foundation, either version 3 of the # License, or (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . # ############################################################################## """ Cron jobs scheduling Cron jobs are defined in the ir_cron table/model. This module deals with all cron jobs, for all databases of a single OpenERP server instance. It defines a single master thread that will spawn (a bounded number of) threads to process individual cron jobs. """ import heapq import logging import threading import time import openerp """ Singleton that keeps track of cancellable tasks to run at a given timestamp. The tasks are characterised by: * a timestamp * the database on which the task run * a boolean attribute specifying if the task is canceled Implementation details: - Tasks are stored as list, allowing the cancellation by setting the boolean to True. - A heapq is used to store tasks, so we don't need to sort tasks ourself. """ # Heapq of database wake-ups. Note that 'database wake-up' meaning is in # the context of the cron management. This is not about loading a database # or otherwise making anything about it. _wakeups = [] # TODO protect this variable with a lock? # Mapping of database names to the wake-up defined in the heapq, # so that we can cancel the wake-up without messing with the heapq # internal structure. _wakeup_by_db = {} _logger = logging.getLogger('cron') _thread_count_lock = threading.Lock() # Maximum number of threads allowed to process cron jobs concurrently. _thread_count = 2 def get_thread_count(): return _thread_count def inc_thread_count(): global _thread_count with _thread_count_lock: _thread_count += 1 def dec_thread_count(): global _thread_count with _thread_count_lock: _thread_count -= 1 def cancel(db_name): """ Cancel the next wake-up of a given database, if any. """ _logger.debug("Cancel next wake-up for database '%s'.", db_name) if db_name in _wakeup_by_db: _wakeup_by_db[db_name][2] = True def cancel_all(): """ Cancel all database wake-ups. """ global _wakeups global _wakeup_by_db _wakeups = [] _wakeup_by_db = {} def schedule_in_advance(timestamp, db_name): """ Schedule a wake-up for a new database. If an earlier wake-up is already defined, the new wake-up is discarded. If another wake-up is defined, it is discarded. """ if not timestamp: return # Cancel the previous wakeup if any. add_wakeup = False if db_name in _wakeup_by_db: task = _wakeup_by_db[db_name] if task[2] or timestamp < task[0]: add_wakeup = True task[2] = True else: add_wakeup = True if add_wakeup: task = [timestamp, db_name, False] heapq.heappush(_wakeups, task) _wakeup_by_db[db_name] = task def runner(): """Neverending function (intended to be ran in a dedicated thread) that checks every 60 seconds the next database wake-up. TODO: make configurable """ while True: while _wakeups and _wakeups[0][0] < time.time() and get_thread_count(): task = heapq.heappop(_wakeups) timestamp, db_name, canceled = task if canceled: continue task[2] = True registry = openerp.pooler.get_pool(db_name) if not registry._init: registry['ir.cron']._run_jobs() if _wakeups and get_thread_count(): time.sleep(min(60, _wakeups[0][0] - time.time())) else: time.sleep(60) def start_master_thread(): """ Start the above runner function in a daemon thread. The thread is a typical daemon thread: it will never quit and must be terminated when the main process exits - with no consequence (the processing threads it spawns are not marked daemon). """ t = threading.Thread(target=runner, name="openerp.cron.master_thread") t.setDaemon(True) t.start() # vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4: