From 4679680dba42a4071c8fbbc43a33e2e61bdb812e Mon Sep 17 00:00:00 2001 From: Antony Lesuisse Date: Sat, 22 Sep 2012 12:51:07 +0200 Subject: [PATCH] manual merge of multicorn bzr revid: al@openerp.com-20120922105107-q02ogtsfudphzkl6 --- gunicorn.conf.py | 63 --- openerp-server | 26 +- openerp-wsgi.py | 53 +++ openerp/__init__.py | 12 +- openerp/addons/base/ir/ir_cron.py | 99 +++++ openerp/db/__init__.py | 30 -- openerp/service/__init__.py | 45 ++- openerp/service/netrpc_server.py | 4 +- .../netrpc_socket.py} | 5 +- openerp/service/workers.py | 368 ++++++++++++++++++ .../{wsgi/core.py => service/wsgi_server.py} | 131 +------ openerp/tools/config.py | 47 +-- openerp/wsgi/__init__.py | 33 -- openerp/wsgi/proxied.py | 34 -- 14 files changed, 618 insertions(+), 332 deletions(-) delete mode 100644 gunicorn.conf.py create mode 100644 openerp-wsgi.py delete mode 100644 openerp/db/__init__.py rename openerp/{tiny_socket.py => service/netrpc_socket.py} (98%) create mode 100644 openerp/service/workers.py rename openerp/{wsgi/core.py => service/wsgi_server.py} (79%) delete mode 100644 openerp/wsgi/__init__.py delete mode 100644 openerp/wsgi/proxied.py diff --git a/gunicorn.conf.py b/gunicorn.conf.py deleted file mode 100644 index 0f1d93a8638..00000000000 --- a/gunicorn.conf.py +++ /dev/null @@ -1,63 +0,0 @@ -# Gunicorn sample configuration file. -# See http://gunicorn.org/configure.html for more details. -# -# To run the OpenERP server via Gunicorn, change the appropriate -# settings below, in order to provide the parameters that -# would normally be passed in the command-line, -# (at least `bind` and `conf['addons_path']`), then execute: -# $ gunicorn openerp:wsgi.core.application -c gunicorn.conf.py -# or if you want to run it behind a reverse proxy, add the line -# import openerp.wsgi.proxied -# in this file and execute: -# $ gunicorn openerp:wsgi.proxied.application -c gunicorn.conf.py - -import openerp - -# Standard OpenERP XML-RPC port is 8069 -bind = '127.0.0.1:8069' - -pidfile = '.gunicorn.pid' - -# Gunicorn recommends 2-4 x number_of_cpu_cores, but -# you'll want to vary this a bit to find the best for your -# particular work load. -workers = 4 - -# Some application-wide initialization is needed. -on_starting = openerp.wsgi.core.on_starting -when_ready = openerp.wsgi.core.when_ready -pre_request = openerp.wsgi.core.pre_request -post_request = openerp.wsgi.core.post_request - -# openerp request-response cycle can be quite long for -# big reports for example -timeout = 240 - -max_requests = 2000 - -# Equivalent of --load command-line option -openerp.conf.server_wide_modules = ['web'] - -# internal TODO: use openerp.conf.xxx when available -conf = openerp.tools.config - -# Path to the OpenERP Addons repository (comma-separated for -# multiple locations) -conf['addons_path'] = '/home/openerp/addons/trunk,/home/openerp/web/trunk/addons' - -# Optional database config if not using local socket -#conf['db_name'] = 'mycompany' -#conf['db_host'] = 'localhost' -#conf['db_user'] = 'foo' -#conf['db_port'] = 5432 -#conf['db_password'] = 'secret' - -# OpenERP Log Level -# DEBUG=10, DEBUG_RPC=8, DEBUG_RPC_ANSWER=6, DEBUG_SQL=5, INFO=20, -# WARNING=30, ERROR=40, CRITICAL=50 -# conf['log_level'] = 20 - -# If --static-http-enable is used, path for the static web directory -#conf['static_http_document_root'] = '/var/www' - -# vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4: diff --git a/openerp-server b/openerp-server index 5d717c834de..5222de88eb4 100755 --- a/openerp-server +++ b/openerp-server @@ -110,7 +110,6 @@ def run_test_file(dbname, test_file): except Exception: _logger.exception('Failed to initialize database `%s` and run test file `%s`.', dbname, test_file) - def export_translation(): config = openerp.tools.config dbname = config['db_name'] @@ -205,6 +204,7 @@ def quit_on_signals(): except KeyboardInterrupt: pass + config = openerp.tools.config if config['pidfile']: os.unlink(config['pidfile']) @@ -217,8 +217,7 @@ def configure_babel_localedata_path(): import babel babel.localedata._dirname = os.path.join(os.path.dirname(sys.executable), 'localedata') -if __name__ == "__main__": - +def main(): os.environ["TZ"] = "UTC" check_root_user() @@ -247,20 +246,13 @@ if __name__ == "__main__": sys.exit(0) if not config["stop_after_init"]: + setup_pid_file() # Some module register themselves when they are loaded so we need the # services to be running before loading any registry. - openerp.service.start_services() - - for m in openerp.conf.server_wide_modules: - try: - openerp.modules.module.load_openerp_module(m) - except Exception: - msg = '' - if m == 'web': - msg = """ -The `web` module is provided by the addons found in the `openerp-web` project. -Maybe you forgot to add those addons in your addons_path configuration.""" - _logger.exception('Failed to load server-wide module `%s`.%s', m, msg) + if config['workers']: + openerp.service.start_services_workers() + else: + openerp.service.start_services() if config['db_name']: for dbname in config['db_name'].split(','): @@ -269,8 +261,10 @@ Maybe you forgot to add those addons in your addons_path configuration.""" if config["stop_after_init"]: sys.exit(0) - setup_pid_file() _logger.info('OpenERP server is running, waiting for connections...') quit_on_signals() +if __name__ == "__main__": + main() + # vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4: diff --git a/openerp-wsgi.py b/openerp-wsgi.py new file mode 100644 index 00000000000..7382b90d3b7 --- /dev/null +++ b/openerp-wsgi.py @@ -0,0 +1,53 @@ +#!/usr/bin/python +# WSGI Handler sample configuration file. +# +# Change the appropriate settings below, in order to provide the parameters +# that would normally be passed in the command-line. +# (at least conf['addons_path']) +# +# For generic wsgi handlers a global application is defined. +# For uwsgi this should work: +# $ uwsgi --http :9090 --pythonpath . --wsgi-file openerp-wsgi.py +# +# For gunicorn additional globals need to be defined in the Gunicorn section. +# Then the following command should run: +# $ gunicorn openerp:wsgi.core.application -c gunicorn.conf.py + +import openerp + +#---------------------------------------------------------- +# Common +#---------------------------------------------------------- +# Equivalent of --load command-line option +openerp.conf.server_wide_modules = ['web'] +conf = openerp.tools.config + +# Path to the OpenERP Addons repository (comma-separated for +# multiple locations) +conf['addons_path'] = '/home/openerp/addons/trunk,/home/openerp/web/trunk/addons' +conf['addons_path'] = '/home/wis/stuff/version/openerp/source/addons/6.1,/home/wis/stuff/version/openerp/source/web/6.1/addons' + + +# Optional database config if not using local socket +#conf['db_name'] = 'mycompany' +#conf['db_host'] = 'localhost' +#conf['db_user'] = 'foo' +#conf['db_port'] = 5432 +#conf['db_password'] = 'secret' + +#---------------------------------------------------------- +# Generic WSGI handlers application +#---------------------------------------------------------- +application = openerp.service.wsgi_server.application + +#---------------------------------------------------------- +# Gunicorn +#---------------------------------------------------------- +# Standard OpenERP XML-RPC port is 8069 +bind = '127.0.0.1:8069' +pidfile = '.gunicorn.pid' +workers = 4 +timeout = 240 +max_requests = 2000 + +# vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4: diff --git a/openerp/__init__.py b/openerp/__init__.py index 5827b744cf8..2fd2a9812f9 100644 --- a/openerp/__init__.py +++ b/openerp/__init__.py @@ -38,11 +38,19 @@ import run_tests import service import sql_db import test -import tiny_socket import tools import wizard import workflow -import wsgi +# backward compatilbility +# TODO: This is for the web addons, can be removed later. +wsgi = service +wsgi.register_wsgi_handler = wsgi.wsgi_server.register_wsgi_handler +# Is the server running in multi-process mode (e.g. behind Gunicorn). +# If this is True, the processes have to communicate some events, +# e.g. database update or cache invalidation. Each process has also +# its own copy of the data structure and we don't need to care about +# locks between threads. +multi_process = True # vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4: diff --git a/openerp/addons/base/ir/ir_cron.py b/openerp/addons/base/ir/ir_cron.py index 3c437c93282..dbc89278ece 100644 --- a/openerp/addons/base/ir/ir_cron.py +++ b/openerp/addons/base/ir/ir_cron.py @@ -266,6 +266,105 @@ class ir_cron(osv.osv): cr.commit() cr.close() + def _process_job(self, cr, job): + """ Run a given job taking care of the repetition. + + The cursor has a lock on the job (aquired by _acquire_job()). + + :param job: job to be run (as a dictionary). + """ + try: + now = datetime.now() + nextcall = datetime.strptime(job['nextcall'], DEFAULT_SERVER_DATETIME_FORMAT) + numbercall = job['numbercall'] + + ok = False + while nextcall < now and numbercall: + if numbercall > 0: + numbercall -= 1 + if not ok or job['doall']: + self._callback(cr, job['user_id'], job['model'], job['function'], job['args'], job['id']) + if numbercall: + nextcall += _intervalTypes[job['interval_type']](job['interval_number']) + ok = True + addsql = '' + if not numbercall: + addsql = ', active=False' + cr.execute("UPDATE ir_cron SET nextcall=%s, numbercall=%s"+addsql+" WHERE id=%s", + (nextcall.strftime(DEFAULT_SERVER_DATETIME_FORMAT), numbercall, job['id'])) + + finally: + cr.commit() + cr.close() + + @classmethod + def _acquire_job(cls, db_name): + # TODO remove 'check' argument from addons/base_action_rule/base_action_rule.py + """ Try to process one cron job. + + This selects in database all the jobs that should be processed. It then + tries to lock each of them and, if it succeeds, run the cron job (if it + doesn't succeed, it means the job was already locked to be taken care + of by another thread) and return. + + If a job was processed, returns True, otherwise returns False. + """ + db = openerp.sql_db.db_connect(db_name) + cr = db.cursor() + try: + # Careful to compare timestamps with 'UTC' - everything is UTC as of v6.1. + cr.execute("""SELECT * FROM ir_cron + WHERE numbercall != 0 + AND active AND nextcall <= (now() at time zone 'UTC') + ORDER BY priority""") + for job in cr.dictfetchall(): + task_cr = db.cursor() + try: + # Try to grab an exclusive lock on the job row from within the task transaction + acquired_lock = False + task_cr.execute("""SELECT * + FROM ir_cron + WHERE id=%s + FOR UPDATE NOWAIT""", + (job['id'],), log_exceptions=False) + acquired_lock = True + except psycopg2.OperationalError, e: + if e.pgcode == '55P03': + # Class 55: Object not in prerequisite state; 55P03: lock_not_available + _logger.debug('Another process/thread is already busy executing job `%s`, skipping it.', job['name']) + continue + else: + # Unexpected OperationalError + raise + finally: + if not acquired_lock: + # we're exiting due to an exception while acquiring the lot + task_cr.close() + + # Got the lock on the job row, run its code + _logger.debug('Starting job `%s`.', job['name']) + openerp.modules.registry.RegistryManager.check_registry_signaling(db_name) + registry = openerp.pooler.get_pool(db_name) + registry[cls._name]._process_job(task_cr, job) + openerp.modules.registry.RegistryManager.signal_caches_change(db_name) + return True + + except psycopg2.ProgrammingError, e: + if e.pgcode == '42P01': + # Class 42 — Syntax Error or Access Rule Violation; 42P01: undefined_table + # The table ir_cron does not exist; this is probably not an OpenERP database. + _logger.warning('Tried to poll an undefined table on database %s.', db_name) + else: + raise + except Exception, ex: + _logger.warning('Exception in cron:', exc_info=True) + + finally: + cr.commit() + cr.close() + + return False + def update_running_cron(self, cr): """ Schedule as soon as possible a wake-up for this database. """ # Verify whether the server is already started and thus whether we need to commit diff --git a/openerp/db/__init__.py b/openerp/db/__init__.py deleted file mode 100644 index 5eb06af1ad1..00000000000 --- a/openerp/db/__init__.py +++ /dev/null @@ -1,30 +0,0 @@ -# -*- coding: utf-8 -*- -############################################################################## -# -# OpenERP, Open Source Management Solution -# Copyright (C) 2011 OpenERP s.a. (). -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU Affero General Public License as -# published by the Free Software Foundation, either version 3 of the -# License, or (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU Affero General Public License for more details. -# -# You should have received a copy of the GNU Affero General Public License -# along with this program. If not, see . -# -############################################################################## - -""" Lower-level database access. - -This module provides access to the underlying database without going -through the ORM. The goal is to gather sql_db.py and other various db -code. - -""" - -# vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4: diff --git a/openerp/service/__init__.py b/openerp/service/__init__.py index 511d62c5862..30a765b1f60 100644 --- a/openerp/service/__init__.py +++ b/openerp/service/__init__.py @@ -33,7 +33,8 @@ import openerp.modules import openerp.netsvc import openerp.osv import openerp.tools -import openerp.wsgi +import openerp.service.wsgi_server +import openerp.service.workers #.apidoc title: RPC Services @@ -48,22 +49,40 @@ import openerp.wsgi _logger = logging.getLogger(__name__) -# TODO block until the server is really up, accepting connections -# TODO be idemptotent (as long as stop_service was not called). -def start_services(): - """ Start all services. +def load_server_wide_modules(): + for m in openerp.conf.server_wide_modules: + try: + openerp.modules.module.load_openerp_module(m) + except Exception: + msg = '' + if m == 'web': + msg = """ +The `web` module is provided by the addons found in the `openerp-web` project. +Maybe you forgot to add those addons in your addons_path configuration.""" + _logger.exception('Failed to load server-wide module `%s`.%s', m, msg) - Services include the different servers and cron threads. +start_internal_done = False - """ +def start_internal(): + global start_internal_done + if start_internal_done: + return + openerp.netsvc.init_logger() + openerp.modules.loading.open_openerp_namespace() # Instantiate local services (this is a legacy design). openerp.osv.osv.start_object_proxy() # Export (for RPC) services. web_services.start_web_services() + load_server_wide_modules() + start_internal_done = True + +def start_services(): + """ Start all services including http, netrpc and cron """ + openerp.multi_process = False # Nah! + + start_internal() # Initialize the HTTP stack. - #http_server.init_servers() - #http_server.init_static_http() netrpc_server.init_servers() # Start the main cron thread. @@ -73,8 +92,7 @@ def start_services(): openerp.netsvc.Server.startAll() # Start the WSGI server. - openerp.wsgi.core.start_server() - + openerp.service.wsgi_server.start_server() def stop_services(): """ Stop all services. """ @@ -82,7 +100,7 @@ def stop_services(): openerp.cron.cancel_all() openerp.netsvc.Server.quitAll() - openerp.wsgi.core.stop_server() + openerp.service.wsgi_server.stop_server() config = openerp.tools.config _logger.info("Initiating shutdown") _logger.info("Hit CTRL-C again or send a second signal to force the shutdown.") @@ -101,6 +119,9 @@ def stop_services(): openerp.modules.registry.RegistryManager.delete_all() +def start_services_workers(): + openerp.service.workers.Multicorn(openerp.service.wsgi_server.application).run() + # vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4: diff --git a/openerp/service/netrpc_server.py b/openerp/service/netrpc_server.py index fcbfaf0ba82..cc78d0620cf 100644 --- a/openerp/service/netrpc_server.py +++ b/openerp/service/netrpc_server.py @@ -32,8 +32,8 @@ import sys import threading import traceback import openerp +import openerp.service.netrpc_socket import openerp.netsvc as netsvc -import openerp.tiny_socket as tiny_socket import openerp.tools as tools _logger = logging.getLogger(__name__) @@ -52,7 +52,7 @@ class TinySocketClientThread(threading.Thread): def run(self): self.running = True try: - ts = tiny_socket.mysocket(self.sock) + ts = openerp.server.netrpc_socket.mysocket(self.sock) except Exception: self.threads.remove(self) self.running = False diff --git a/openerp/tiny_socket.py b/openerp/service/netrpc_socket.py similarity index 98% rename from openerp/tiny_socket.py rename to openerp/service/netrpc_socket.py index b5f5bbccee3..c49c8f7c2a2 100644 --- a/openerp/tiny_socket.py +++ b/openerp/service/netrpc_socket.py @@ -23,14 +23,11 @@ import socket import cPickle import cStringIO -import netsvc - -#.apidoc title: Net-RPC classes +import openerp.netsvc as netsvc # Pickle protocol version 2 is optimized compared to default (version 0) PICKLE_PROTOCOL = 2 - class Myexception(Exception): """ custom exception object store diff --git a/openerp/service/workers.py b/openerp/service/workers.py new file mode 100644 index 00000000000..824bc07bb53 --- /dev/null +++ b/openerp/service/workers.py @@ -0,0 +1,368 @@ +#----------------------------------------------------------- +# Multicorn, multiprocessing inspired by gunicorn +# TODO rename class: Multicorn -> Arbiter ? +#----------------------------------------------------------- +import errno +import fcntl +import psutil +import random +import resource +import select +import socket +import time +import logging +import os +import signal +import sys + +import werkzeug.serving + +import openerp +import openerp.tools.config as config + +_logger = logging.getLogger(__name__) + +class Multicorn(object): + """ Multiprocessing inspired by (g)unicorn. + Multicorn currently uses accept(2) as dispatching method between workers + but we plan to replace it by a more intelligent dispatcher to will parse + the first HTTP request line. + """ + def __init__(self, app): + # config + self.address = (config['xmlrpc_interface'] or '0.0.0.0', config['xmlrpc_port']) + self.population = config['workers'] + self.timeout = config['limit_time_real'] + self.limit_request = config['limit_request'] + # working vars + self.beat = 4 + self.app = app + self.pid = os.getpid() + self.socket = None + self.workers_http = {} + self.workers_cron = {} + self.workers = {} + self.generation = 0 + self.queue = [] + + def pipe_new(self): + pipe = os.pipe() + for fd in pipe: + # non_blocking + flags = fcntl.fcntl(fd, fcntl.F_GETFL) | os.O_NONBLOCK + fcntl.fcntl(fd, fcntl.F_SETFL, flags) + # close_on_exec + flags = fcntl.fcntl(fd, fcntl.F_GETFD) | fcntl.FD_CLOEXEC + fcntl.fcntl(fd, fcntl.F_SETFD, flags) + return pipe + + def pipe_ping(self, pipe): + try: + os.write(pipe[1], '.') + except IOError, e: + if e.errno not in [errno.EAGAIN, errno.EINTR]: + raise + + def signal_handler(self, sig, frame): + if len(self.queue) < 5 or sig == signal.SIGCHLD: + self.queue.append(sig) + self.pipe_ping(self.pipe) + else: + _logger.warn("Dropping signal: %s", sig) + + def worker_spawn(self, klass, workers_registry): + self.generation += 1 + worker = klass(self) + pid = os.fork() + if pid != 0: + worker.pid = pid + self.workers[pid] = worker + workers_registry[pid] = worker + return worker + else: + worker.run() + sys.exit(0) + + def worker_pop(self, pid): + if pid in self.workers: + _logger.debug("Worker (%s) unregistered",pid) + try: + self.workers_http.pop(pid,None) + self.workers_cron.pop(pid,None) + u = self.workers.pop(pid) + u.close() + except OSError: + return + + def worker_kill(self, pid, sig): + try: + os.kill(pid, sig) + except OSError, e: + if e.errno == errno.ESRCH: + self.worker_pop(pid) + + def process_signals(self): + while len(self.queue): + sig = self.queue.pop(0) + if sig in [signal.SIGINT,signal.SIGTERM]: + raise KeyboardInterrupt + + def process_zombie(self): + # reap dead workers + while 1: + try: + wpid, status = os.waitpid(-1, os.WNOHANG) + if not wpid: + break + if (status >> 8) == 3: + msg = "Critial worker error (%s)" + _logger.critical(msg, wpid) + raise Exception(msg % wpid) + self.worker_pop(wpid) + except OSError, e: + if e.errno == errno.ECHILD: + break + raise + + def process_timeout(self): + now = time.time() + for (pid, worker) in self.workers.items(): + if now - worker.watchdog_time >= worker.watchdog_timeout: + _logger.error("Worker (%s) timeout", pid) + self.worker_kill(pid, signal.SIGKILL) + + def process_spawn(self): + while len(self.workers_http) < self.population: + self.worker_spawn(WorkerHTTP, self.workers_http) + while len(self.workers_cron) < 1: # config option ? + self.worker_spawn(WorkerCron, self.workers_cron) + + def sleep(self): + try: + # map of fd -> worker + fds = dict([(w.watchdog_pipe[0],w) for k,w in self.workers.items()]) + fd_in = fds.keys() + [self.pipe[0]] + # check for ping or internal wakeups + ready = select.select(fd_in, [], [], self.beat) + # update worker watchdogs + for fd in ready[0]: + if fd in fds: + fds[fd].watchdog_time = time.time() + try: + # empty pipe + while os.read(fd, 1): + pass + except OSError, e: + if e.errno not in [errno.EAGAIN]: + raise + except select.error, e: + if e[0] not in [errno.EINTR]: + raise + + def start(self): + # wakeup pipe, python doesnt throw EINTR when a syscall is interrupted + # by a signal simulating a pseudo SA_RESTART. We write to a pipe in the + # signal handler to overcome this behaviour + self.pipe = self.pipe_new() + # set signal + signal.signal(signal.SIGINT, self.signal_handler) + signal.signal(signal.SIGTERM, self.signal_handler) + signal.signal(signal.SIGCHLD, self.signal_handler) + # listen to socket + self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + self.socket.setblocking(0) + self.socket.bind(self.address) + self.socket.listen(8) + + def stop(self, graceful=True): + if graceful: + _logger.info("Stopping gracefully") + limit = time.time() + self.timeout + for pid in self.workers.keys(): + self.worker_kill(pid, signal.SIGTERM) + while self.workers and time.time() < limit: + self.process_zombie() + time.sleep(0.1) + else: + _logger.info("Stopping forcefully") + for pid in self.workers.keys(): + self.worker_kill(pid, signal.SIGTERM) + self.socket.close() + import __main__ + __main__.quit_signals_received = 1 + + def run(self): + self.start() + _logger.debug("Multiprocess starting") + while 1: + try: + #_logger.debug("Multiprocess beat (%s)",time.time()) + self.process_signals() + self.process_zombie() + self.process_timeout() + self.process_spawn() + self.sleep() + except KeyboardInterrupt: + _logger.debug("Multiprocess clean stop") + self.stop() + break + except Exception,e: + _logger.exception(e) + self.stop(False) + sys.exit(-1) + +class Worker(object): + """ Workers """ + def __init__(self, multi): + self.multi = multi + self.watchdog_time = time.time() + self.watchdog_pipe = multi.pipe_new() + self.watchdog_timeout = multi.timeout + self.ppid = os.getpid() + self.pid = None + self.alive = True + # should we rename into lifetime ? + self.request_max = multi.limit_request + self.request_count = 0 + + def close(self): + os.close(self.watchdog_pipe[0]) + os.close(self.watchdog_pipe[1]) + + def signal_handler(self, sig, frame): + self.alive = False + + def sleep(self): + try: + ret = select.select([self.multi.socket], [], [], self.multi.beat) + except select.error, e: + if e[0] not in [errno.EINTR]: + raise + + def process_limit(self): + # If our parent changed sucide + if self.ppid != os.getppid(): + _logger.info("Worker (%s) Parent changed", self.pid) + self.alive = False + # check for lifetime + if self.request_count >= self.request_max: + _logger.info("Worker (%d) max request (%s) reached.", self.pid, self.request_count) + self.alive = False + # Reset the worker if it consumes too much memory (e.g. caused by a memory leak). + rss, vms = psutil.Process(os.getpid()).get_memory_info() + if vms > config['limit_memory_soft']: + _logger.info('Virtual memory consumption too high, rebooting the worker.') + self.alive = False # Commit suicide after the request. + + # VMS and RLIMIT_AS are the same thing: virtual memory, a.k.a. address space + soft, hard = resource.getrlimit(resource.RLIMIT_AS) + resource.setrlimit(resource.RLIMIT_AS, (config['limit_memory_hard'], hard)) + + # SIGXCPU (exceeded CPU time) signal handler will raise an exception. + r = resource.getrusage(resource.RUSAGE_SELF) + cpu_time = r.ru_utime + r.ru_stime + def time_expired(n, stack): + _logger.info('CPU time limit exceeded.') + raise Exception('CPU time limit exceeded.') + signal.signal(signal.SIGXCPU, time_expired) + soft, hard = resource.getrlimit(resource.RLIMIT_CPU) + resource.setrlimit(resource.RLIMIT_CPU, (cpu_time + config['limit_time_cpu'], hard)) + + def process_work(self): + pass + + def start(self): + self.pid = os.getpid() + _logger.info("Worker %s (%s) alive", self.__class__.__name__, self.pid) + # Reseed the random number generator + random.seed() + # Prevent fd inherientence close_on_exec + flags = fcntl.fcntl(self.multi.socket, fcntl.F_GETFD) | fcntl.FD_CLOEXEC + fcntl.fcntl(self.multi.socket, fcntl.F_SETFD, flags) + # reset blocking status + self.multi.socket.setblocking(0) + signal.signal(signal.SIGINT, self.signal_handler) + signal.signal(signal.SIGTERM, signal.SIG_DFL) + signal.signal(signal.SIGCHLD, signal.SIG_DFL) + + def stop(self): + pass + + def run(self): + try: + self.start() + while self.alive: + self.process_limit() + self.multi.pipe_ping(self.watchdog_pipe) + self.sleep() + self.process_work() + _logger.info("Worker (%s) exiting...",self.pid) + self.stop() + except Exception,e: + _logger.exception("Worker (%s) Exception occured, exiting..."%self.pid) + # should we use 3 to abort everything ? + sys.exit(1) + +class WorkerHTTP(Worker): + """ HTTP Request workers """ + def process_request(self, client, addr): + client.setblocking(1) + client.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) + # Prevent fd inherientence close_on_exec + flags = fcntl.fcntl(client, fcntl.F_GETFD) | fcntl.FD_CLOEXEC + fcntl.fcntl(client, fcntl.F_SETFD, flags) + # do request using WorkerBaseWSGIServer monkey patched with socket + self.server.socket = client + self.server.process_request(client,addr) + self.request_count += 1 + + def process_work(self): + try: + client, addr = self.multi.socket.accept() + self.process_request(client, addr) + except socket.error, e: + if e[0] not in (errno.EAGAIN, errno.ECONNABORTED): + raise + + def start(self): + Worker.start(self) + self.server = WorkerBaseWSGIServer(self.multi.app) + +class WorkerBaseWSGIServer(werkzeug.serving.BaseWSGIServer): + """ werkzeug WSGI Server patched to allow using an external listen socket + """ + def __init__(self, app): + werkzeug.serving.BaseWSGIServer.__init__(self, "1", "1", app) + def server_bind(self): + # we dont bind beause we use the listen socket of Multicorn#socket + # instead we close the socket + if self.socket: + self.socket.close() + def server_activate(self): + # dont listen as we use Multicorn#socket + pass + +class WorkerCron(Worker): + """ Cron workers """ + def sleep(self): + time.sleep(60) + + def process_work(self): + if config['db_name']: + db_names = config['db_name'].split(',') + else: + db_names = openerp.netsvc.ExportService._services['db'].exp_list(True) + for db_name in db_names: + while True: + # TODO Each job should be considered as one request in multiprocessing + acquired = openerp.addons.base.ir.ir_cron.ir_cron._acquire_job(db_name) + if not acquired: + break + self.request_count += 1 + + def start(self): + Worker.start(self) + openerp.service.start_internal() + +# vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4: diff --git a/openerp/wsgi/core.py b/openerp/service/wsgi_server.py similarity index 79% rename from openerp/wsgi/core.py rename to openerp/service/wsgi_server.py index 0aff7172480..61556974a00 100644 --- a/openerp/wsgi/core.py +++ b/openerp/service/wsgi_server.py @@ -38,10 +38,13 @@ import sys import threading import traceback +import werkzeug.serving +import werkzeug.contrib.fixers + import openerp import openerp.modules import openerp.tools.config as config -from ..service import websrv_lib +import websrv_lib _logger = logging.getLogger(__name__) @@ -221,9 +224,6 @@ def wsgi_xmlrpc_legacy(environ, start_response): params, method = xmlrpclib.loads(data) return xmlrpc_return(start_response, path, method, params, True) -def wsgi_jsonrpc(environ, start_response): - pass - def wsgi_webdav(environ, start_response): pi = environ['PATH_INFO'] if environ['REQUEST_METHOD'] == 'OPTIONS' and pi in ['*','/']: @@ -382,28 +382,31 @@ def register_wsgi_handler(handler): """ module_handlers.append(handler) -def application(environ, start_response): +def application_unproxied(environ, start_response): """ WSGI entry point.""" + openerp.service.start_internal() # Try all handlers until one returns some result (i.e. not None). - wsgi_handlers = [ - wsgi_xmlrpc_1, - wsgi_xmlrpc, - wsgi_jsonrpc, - wsgi_xmlrpc_legacy, - wsgi_webdav - ] + module_handlers + wsgi_handlers = [wsgi_xmlrpc_1, wsgi_xmlrpc, wsgi_xmlrpc_legacy, wsgi_webdav] + wsgi_handlers += module_handlers for handler in wsgi_handlers: result = handler(environ, start_response) if result is None: continue return result + # We never returned from the loop. response = 'No handler found.\n' start_response('404 Not Found', [('Content-Type', 'text/plain'), ('Content-Length', str(len(response)))]) return [response] +def application(environ, start_response): + if 'HTTP_X_FORWARDED_HOST' in environ: + return werkzeug.contrib.fixers.ProxyFix(application_unproxied)(environ, start_response) + else: + return application_unproxied(environ, start_response) + # The WSGI server, started by start_server(), stopped by stop_server(). httpd = None @@ -421,25 +424,8 @@ def serve(): # TODO Change the xmlrpc_* options to http_* interface = config['xmlrpc_interface'] or '0.0.0.0' port = config['xmlrpc_port'] - try: - import werkzeug.serving - if config['proxy_mode']: - from werkzeug.contrib.fixers import ProxyFix - app = ProxyFix(application) - suffix = ' (in proxy mode)' - else: - app = application - suffix = '' - httpd = werkzeug.serving.make_server(interface, port, app, threaded=True) - _logger.info('HTTP service (werkzeug) running on %s:%s%s', interface, port, suffix) - except ImportError: - import wsgiref.simple_server - _logger.warning('Werkzeug module unavailable, falling back to wsgiref.') - if config['proxy_mode']: - _logger.warning('Werkzeug module unavailable, not using proxy mode.') - httpd = wsgiref.simple_server.make_server(interface, port, application) - _logger.info('HTTP service (wsgiref) running on %s:%s', interface, port) - + httpd = werkzeug.serving.make_server(interface, port, application, threaded=True) + _logger.info('HTTP service (werkzeug) running on %s:%s', interface, port) httpd.serve_forever() def start_server(): @@ -457,87 +443,4 @@ def stop_server(): if httpd: httpd.shutdown() -# Master process id, can be used for signaling. -arbiter_pid = None - -# Application setup before we can spawn any worker process. -# This is suitable for e.g. gunicorn's on_starting hook. -def on_starting(server): - global arbiter_pid - arbiter_pid = os.getpid() # TODO check if this is true even after replacing the executable - #openerp.tools.cache = kill_workers_cache - openerp.netsvc.init_logger() - openerp.osv.osv.start_object_proxy() - openerp.service.web_services.start_web_services() - openerp.modules.module.initialize_sys_path() - openerp.modules.loading.open_openerp_namespace() - for m in openerp.conf.server_wide_modules: - try: - openerp.modules.module.load_openerp_module(m) - except Exception: - msg = '' - if m == 'web': - msg = """ -The `web` module is provided by the addons found in the `openerp-web` project. -Maybe you forgot to add those addons in your addons_path configuration.""" - _logger.exception('Failed to load server-wide module `%s`.%s', m, msg) - -# Install our own signal handler on the master process. -def when_ready(server): - # Hijack gunicorn's SIGWINCH handling; we can choose another one. - signal.signal(signal.SIGWINCH, make_winch_handler(server)) - -# Install limits on virtual memory and CPU time consumption. -def pre_request(worker, req): - import os - import psutil - import resource - import signal - # VMS and RLIMIT_AS are the same thing: virtual memory, a.k.a. address space - rss, vms = psutil.Process(os.getpid()).get_memory_info() - soft, hard = resource.getrlimit(resource.RLIMIT_AS) - resource.setrlimit(resource.RLIMIT_AS, (config['virtual_memory_limit'], hard)) - - r = resource.getrusage(resource.RUSAGE_SELF) - cpu_time = r.ru_utime + r.ru_stime - signal.signal(signal.SIGXCPU, time_expired) - soft, hard = resource.getrlimit(resource.RLIMIT_CPU) - resource.setrlimit(resource.RLIMIT_CPU, (cpu_time + config['cpu_time_limit'], hard)) - -# Reset the worker if it consumes too much memory (e.g. caused by a memory leak). -def post_request(worker, req, environ): - import os - import psutil - rss, vms = psutil.Process(os.getpid()).get_memory_info() - if vms > config['virtual_memory_reset']: - _logger.info('Virtual memory consumption ' - 'too high, rebooting the worker.') - worker.alive = False # Commit suicide after the request. - -# Our signal handler will signal a SGIQUIT to all workers. -def make_winch_handler(server): - def handle_winch(sig, fram): - server.kill_workers(signal.SIGQUIT) # This is gunicorn specific. - return handle_winch - -# SIGXCPU (exceeded CPU time) signal handler will raise an exception. -def time_expired(n, stack): - _logger.info('CPU time limit exceeded.') - raise Exception('CPU time limit exceeded.') # TODO one of openerp.exception - -# Kill gracefuly the workers (e.g. because we want to clear their cache). -# This is done by signaling a SIGWINCH to the master process, so it can be -# called by the workers themselves. -def kill_workers(): - try: - os.kill(arbiter_pid, signal.SIGWINCH) - except OSError, e: - if e.errno == errno.ESRCH: # no such pid - return - raise - -class kill_workers_cache(openerp.tools.ormcache): - def clear(self, dbname, *args, **kwargs): - kill_workers() - # vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4: diff --git a/openerp/tools/config.py b/openerp/tools/config.py index ce157c5e697..58d1d3c9c82 100644 --- a/openerp/tools/config.py +++ b/openerp/tools/config.py @@ -116,8 +116,6 @@ class configmanager(object): help="specify the TCP port for the XML-RPC protocol", type="int") group.add_option("--no-xmlrpc", dest="xmlrpc", action="store_false", my_default=True, help="disable the XML-RPC protocol") - group.add_option("--proxy-mode", dest="proxy_mode", action="store_true", my_default=False, - help="Enable correct behavior when behind a reverse proxy") parser.add_option_group(group) # XML-RPC / HTTPS @@ -253,8 +251,6 @@ class configmanager(object): # Advanced options group = optparse.OptionGroup(parser, "Advanced options") - group.add_option("--cache-timeout", dest="cache_timeout", my_default=100000, - help="set the timeout for the cache system", type="int") group.add_option('--debug', dest='debug_mode', action='store_true', my_default=False, help='enable debug mode') group.add_option("--stop-after-init", action="store_true", dest="stop_after_init", my_default=False, help="stop the server after its initialization") @@ -272,23 +268,30 @@ class configmanager(object): group.add_option("--max-cron-threads", dest="max_cron_threads", my_default=4, help="Maximum number of threads processing concurrently cron jobs.", type="int") - # TODO sensible default for the three following limits. - group.add_option("--virtual-memory-limit", dest="virtual_memory_limit", my_default=768 * 1024 * 1024, - help="Maximum allowed virtual memory per Gunicorn process. " - "When the limit is reached, any memory allocation will fail.", - type="int") - group.add_option("--virtual-memory-reset", dest="virtual_memory_reset", my_default=640 * 1024 * 1024, - help="Maximum allowed virtual memory per Gunicorn process. " - "When the limit is reached, the worker will be reset after " - "the current request.", - type="int") - group.add_option("--cpu-time-limit", dest="cpu_time_limit", my_default=60, - help="Maximum allowed CPU time per Gunicorn process. " - "When the limit is reached, an exception is raised.", - type="int") group.add_option("--unaccent", dest="unaccent", my_default=False, action="store_true", help="Use the unaccent function provided by the database when available.") + parser.add_option_group(group) + group = optparse.OptionGroup(parser, "Multiprocessing options") + # TODO sensible default for the three following limits. + group.add_option("--workers", dest="workers", my_default=0, + help="Specify the number of workers, 0 disable prefork mode.", + type="int") + group.add_option("--limit-memory-soft", dest="limit_memory_soft", my_default=640 * 1024 * 1024, + help="Maximum allowed virtual memory per worker, when reached the worker be reset after the current request.", + type="int") + group.add_option("--limit-memory-hard", dest="limit_memory_hard", my_default=768 * 1024 * 1024, + help="Maximum allowed virtual memory per worker, when reached, any memory allocation will fail.", + type="int") + group.add_option("--limit-time-cpu", dest="limit_time_cpu", my_default=60, + help="Maximum allowed CPU time per request.", + type="int") + group.add_option("--limit-time-real", dest="limit_time_real", my_default=60, + help="Maximum allowed Real time per request. ", + type="int") + group.add_option("--limit-request", dest="limit_request", my_default=8192, + help="Maximum number of request to be processed per worker.", + type="int") parser.add_option_group(group) # Copy all optparse options (i.e. MyOption) into self.options. @@ -369,7 +372,7 @@ class configmanager(object): # if defined dont take the configfile value even if the defined value is None keys = ['xmlrpc_interface', 'xmlrpc_port', 'db_name', 'db_user', 'db_password', 'db_host', - 'db_port', 'db_template', 'logfile', 'pidfile', 'smtp_port', 'cache_timeout', + 'db_port', 'db_template', 'logfile', 'pidfile', 'smtp_port', 'email_from', 'smtp_server', 'smtp_user', 'smtp_password', 'netrpc_interface', 'netrpc_port', 'db_maxconn', 'import_partial', 'addons_path', 'netrpc', 'xmlrpc', 'syslog', 'without_demo', 'timezone', @@ -391,10 +394,10 @@ class configmanager(object): 'language', 'translate_out', 'translate_in', 'overwrite_existing_translations', 'debug_mode', 'smtp_ssl', 'load_language', 'stop_after_init', 'logrotate', 'without_demo', 'netrpc', 'xmlrpc', 'syslog', - 'list_db', 'xmlrpcs', 'proxy_mode', + 'list_db', 'xmlrpcs', 'test_file', 'test_enable', 'test_commit', 'test_report_directory', - 'osv_memory_count_limit', 'osv_memory_age_limit', 'max_cron_threads', - 'virtual_memory_limit', 'virtual_memory_reset', 'cpu_time_limit', 'unaccent', + 'osv_memory_count_limit', 'osv_memory_age_limit', 'max_cron_threads', 'unaccent', + 'workers', 'limit_memory_hard', 'limit_memory_soft', 'limit_time_cpu', 'limit_time_real', 'limit_request' ] for arg in keys: diff --git a/openerp/wsgi/__init__.py b/openerp/wsgi/__init__.py deleted file mode 100644 index a536ee57327..00000000000 --- a/openerp/wsgi/__init__.py +++ /dev/null @@ -1,33 +0,0 @@ -# -*- coding: utf-8 -*- -############################################################################## -# -# OpenERP, Open Source Management Solution -# Copyright (C) 2012-2012 OpenERP s.a. (). -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU Affero General Public License as -# published by the Free Software Foundation, either version 3 of the -# License, or (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU Affero General Public License for more details. -# -# You should have received a copy of the GNU Affero General Public License -# along with this program. If not, see . -# -############################################################################## - -""" WSGI stack - -This module offers a WSGI interface to/from OpenERP. - -""" - -from . import core - -# TODO: This is for the web addons, can be removed later. -register_wsgi_handler = core.register_wsgi_handler - -# vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4: diff --git a/openerp/wsgi/proxied.py b/openerp/wsgi/proxied.py deleted file mode 100644 index 658f6f29c55..00000000000 --- a/openerp/wsgi/proxied.py +++ /dev/null @@ -1,34 +0,0 @@ -# -*- coding: utf-8 -*- -############################################################################## -# -# OpenERP, Open Source Management Solution -# Copyright (C) 2012 OpenERP s.a. (). -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU Affero General Public License as -# published by the Free Software Foundation, either version 3 of the -# License, or (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU Affero General Public License for more details. -# -# You should have received a copy of the GNU Affero General Public License -# along with this program. If not, see . -# -############################################################################## - -""" - -WSGI entry point with Proxy mode (from Werkzeug). - -""" - -from werkzeug.contrib.fixers import ProxyFix - -from . import core - -application = ProxyFix(core.application) - -# vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4: