[IMP] --auto-reload to enable code and xml auto reload, service cleanup.

Added an auto-reload mechanism based on pyinotify that restarts the server as soon
as a change to a .py or .xml file is detected; for XML updates, the relevant -u
options are automatically added to the exec(2) arguments.
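
A minimal sketch of the watch setup, mirroring what the new AutoReload class
below does (the path and handler here are illustrative only):

    import pyinotify

    class Handler(pyinotify.ProcessEvent):
        # record touched files; the watcher thread then decides between a
        # .py syntax check and an xml-triggered restart
        def process_IN_MODIFY(self, event):
            print('modified: %s' % event.pathname)

    wm = pyinotify.WatchManager()
    notifier = pyinotify.Notifier(wm, Handler(), timeout=0)
    # watch an addons directory recursively for creations and modifications
    wm.add_watch('/path/to/addons', pyinotify.IN_MODIFY | pyinotify.IN_CREATE, rec=True)
    while notifier.check_events(1000):
        notifier.read_events()
        notifier.process_events()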

pyinotify is Linux-specific and should be replaced by a cross-platform library
such as watchdog. Unfortunately, watchdog is not yet packaged in Debian. We could
support both libraries; patches are welcome.
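
A possible dual-library fallback, sketched under the assumption that watchdog
exposes its usual Observer API (hypothetical, not part of this commit):

    try:
        from watchdog.observers import Observer  # cross-platform, preferred
        HAVE_WATCHDOG = True
    except ImportError:
        import pyinotify  # linux-only fallback, what this commit uses
        HAVE_WATCHDOG = False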

Refactored the code in cli/* and service/*. The three running modes of openerp
(threaded, prefork, gevent) are now in openerp/service/server.py, one class per
mode, and they share the same interface (see the outline below).
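
The shared interface boils down to start(), stop() and a blocking run(); an
illustrative outline only, the real base class being CommonServer in
openerp/service/server.py:

    class ServerInterface(object):
        # sketch of the contract, not the actual class hierarchy
        def start(self):
            raise NotImplementedError  # bind sockets, install signal handlers
        def stop(self):
            raise NotImplementedError  # shut down gracefully
        def run(self):
            raise NotImplementedError  # start(), block until signalled, stop()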

Added a signal handler to increase or decrease the number of HTTP workers in
prefork mode (SIGTTIN, SIGTTOU).
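
For example, the pool can be resized at runtime by signalling the prefork
master process (the pid below is hypothetical):

    import os
    import signal

    master_pid = 12345  # hypothetical pid of the prefork master
    os.kill(master_pid, signal.SIGTTIN)  # spawn one more HTTP worker
    os.kill(master_pid, signal.SIGTTOU)  # retire one HTTP worker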

bzr revid: al@openerp.com-20131005225740-hrxwy50ldi5yql0e
Antony Lesuisse 2013-10-06 00:57:40 +02:00
commit 40ba925691
12 changed files with 880 additions and 826 deletions

View File

@ -2,8 +2,8 @@
import gevent.monkey
gevent.monkey.patch_all()
import gevent_psycopg2
gevent_psycopg2.monkey_patch()
import psycogreen.gevent
psycogreen.gevent.patch_psycopg()
import openerp

View File

@ -30,7 +30,6 @@ evented = False
if sys.modules.get("gevent") is not None:
evented = True
# Make sure the OpenERP server runs in UTC. This is especially necessary
# under Windows as under Linux it seems the real import of time is
# sufficiently deferred so that setting the TZ environment variable

View File

@ -716,7 +716,7 @@ class module(osv.osv):
if already_installed:
# in this case, force server restart to reload python code...
cr.commit()
openerp.service.restart_server()
openerp.service.server.restart()
return {
'type': 'ir.actions.client',
'tag': 'home',

View File

@ -147,112 +147,15 @@ def import_translation():
cr.commit()
cr.close()
# Variable keeping track of the number of calls to the signal handler defined
# below. This variable is monitored by ``quit_on_signals()``.
quit_signals_received = 0
def signal_handler(sig, frame):
""" Signal handler: exit ungracefully on the second handled signal.
:param sig: the signal number
:param frame: the interrupted stack frame or None
"""
global quit_signals_received
quit_signals_received += 1
if quit_signals_received > 1:
# logging.shutdown was already called at this point.
sys.stderr.write("Forced shutdown.\n")
os._exit(0)
def dumpstacks(sig, frame):
""" Signal handler: dump a stack trace for each existing thread."""
# code from http://stackoverflow.com/questions/132058/getting-stack-trace-from-a-running-python-application#answer-2569696
# modified for python 2.5 compatibility
threads_info = dict([(th.ident, {'name': th.name,
'uid': getattr(th,'uid','n/a')})
for th in threading.enumerate()])
code = []
for threadId, stack in sys._current_frames().items():
thread_info = threads_info.get(threadId)
code.append("\n# Thread: %s (id:%s) (uid:%s)" % \
(thread_info and thread_info['name'] or 'n/a',
threadId,
thread_info and thread_info['uid'] or 'n/a'))
for filename, lineno, name, line in traceback.extract_stack(stack):
code.append('File: "%s", line %d, in %s' % (filename, lineno, name))
if line:
code.append(" %s" % (line.strip()))
_logger.info("\n".join(code))
def setup_signal_handlers(signal_handler):
""" Register the given signal handler. """
SIGNALS = (signal.SIGINT, signal.SIGTERM)
if os.name == 'posix':
map(lambda sig: signal.signal(sig, signal_handler), SIGNALS)
signal.signal(signal.SIGQUIT, dumpstacks)
elif os.name == 'nt':
import win32api
win32api.SetConsoleCtrlHandler(lambda sig: signal_handler(sig, None), 1)
def quit_on_signals():
""" Wait for one or two signals then shutdown the server.
The first SIGINT or SIGTERM signal will initiate a graceful shutdown, while
a second one, if any, will force an immediate exit.
"""
# Wait for a first signal to be handled. (time.sleep will be interrupted
# by the signal handler.) The try/except is for the win32 case.
try:
while quit_signals_received == 0:
time.sleep(60)
except KeyboardInterrupt:
pass
config = openerp.tools.config
openerp.service.stop_services()
if getattr(openerp, 'phoenix', False):
# like the phoenix, reborn from ashes...
openerp.service._reexec()
return
if config['pidfile']:
os.unlink(config['pidfile'])
sys.exit(0)
def watch_parent(beat=4):
import gevent
ppid = os.getppid()
while True:
if ppid != os.getppid():
pid = os.getpid()
_logger.info("LongPolling (%s) Parent changed", pid)
# suicide !!
os.kill(pid, signal.SIGTERM)
return
gevent.sleep(beat)
def main(args):
check_root_user()
openerp.tools.config.parse_config(args)
if openerp.tools.config.options["gevent"]:
openerp.evented = True
_logger.info('Using gevent mode')
import gevent.monkey
gevent.monkey.patch_all()
import gevent_psycopg2
gevent_psycopg2.monkey_patch()
check_postgres_user()
openerp.netsvc.init_logger()
report_configuration()
config = openerp.tools.config
setup_signal_handlers(signal_handler)
if config["test_file"]:
run_test_file(config['db_name'], config['test_file'])
sys.exit(0)
@ -265,28 +168,19 @@ def main(args):
import_translation()
sys.exit(0)
if not config["stop_after_init"]:
setup_pid_file()
# Some modules register themselves when they are loaded so we need the
# services to be running before loading any registry.
if not openerp.evented:
if config['workers']:
openerp.service.start_services_workers()
else:
openerp.service.start_services()
else:
config['xmlrpc_port'] = config['longpolling_port']
import gevent
gevent.spawn(watch_parent)
openerp.service.start_services()
# preload registries, needed for -u --stop_after_init
rc = 0
if config['db_name']:
for dbname in config['db_name'].split(','):
if not preload_registry(dbname):
rc += 1
if config["stop_after_init"]:
if not config["stop_after_init"]:
setup_pid_file()
openerp.service.server.start()
if config['pidfile']:
os.unlink(config['pidfile'])
else:
sys.exit(rc)
_logger.info('OpenERP server is running, waiting for connections...')

View File

@ -20,29 +20,12 @@
#
##############################################################################
import logging
import os
import signal
import subprocess
import sys
import threading
import time
import cron
import wsgi_server
import openerp
import openerp.modules
import openerp.netsvc
import openerp.osv
from openerp.release import nt_service_name
import openerp.tools
from openerp.tools.misc import stripped_sys_argv
import common
import db
import model
import report
import wsgi_server
import server
#.apidoc title: RPC Services
@ -55,100 +38,5 @@ import report
low-level behavior of the wire.
"""
_logger = logging.getLogger(__name__)
def load_server_wide_modules():
for m in openerp.conf.server_wide_modules:
try:
openerp.modules.module.load_openerp_module(m)
except Exception:
msg = ''
if m == 'web':
msg = """
The `web` module is provided by the addons found in the `openerp-web` project.
Maybe you forgot to add those addons in your addons_path configuration."""
_logger.exception('Failed to load server-wide module `%s`.%s', m, msg)
start_internal_done = False
main_thread_id = threading.currentThread().ident
def start_internal():
global start_internal_done
if start_internal_done:
return
openerp.netsvc.init_logger()
load_server_wide_modules()
start_internal_done = True
def start_services():
""" Start all services including http, and cron """
start_internal()
# Start the WSGI server.
wsgi_server.start_service()
# Start the main cron thread.
if not openerp.evented:
cron.start_service()
def stop_services():
""" Stop all services. """
# stop services
if not openerp.evented:
cron.stop_service()
wsgi_server.stop_service()
_logger.info("Initiating shutdown")
_logger.info("Hit CTRL-C again or send a second signal to force the shutdown.")
# Manually join() all threads before calling sys.exit() to allow a second signal
# to trigger _force_quit() in case some non-daemon threads won't exit cleanly.
# threading.Thread.join() should not mask signals (at least in python 2.5).
me = threading.currentThread()
_logger.debug('current thread: %r', me)
for thread in threading.enumerate():
_logger.debug('process %r (%r)', thread, thread.isDaemon())
if thread != me and not thread.isDaemon() and thread.ident != main_thread_id:
while thread.isAlive():
_logger.debug('join and sleep')
# Need a busyloop here as thread.join() masks signals
# and would prevent the forced shutdown.
thread.join(0.05)
time.sleep(0.05)
_logger.debug('--')
openerp.modules.registry.RegistryManager.delete_all()
logging.shutdown()
def start_services_workers():
import openerp.service.workers
openerp.multi_process = True
openerp.service.workers.Multicorn(openerp.service.wsgi_server.application).run()
def _reexec():
"""reexecute openerp-server process with (nearly) the same arguments"""
if openerp.tools.osutil.is_running_as_nt_service():
subprocess.call('net stop {0} && net start {0}'.format(nt_service_name), shell=True)
exe = os.path.basename(sys.executable)
args = stripped_sys_argv()
if not args or args[0] != exe:
args.insert(0, exe)
os.execv(sys.executable, args)
def restart_server():
if openerp.multi_process:
raise NotImplementedError("Multicorn is not supported (but gunicorn was)")
pid = openerp.wsgi.core.arbiter_pid
os.kill(pid, signal.SIGHUP)
else:
if os.name == 'nt':
def reborn():
stop_services()
_reexec()
# run in a thread to let the current thread return response to the caller.
threading.Thread(target=reborn).start()
else:
openerp.phoenix = True
os.kill(os.getpid(), signal.SIGINT)
# vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4:

View File

@ -1,76 +0,0 @@
# -*- coding: utf-8 -*-
##############################################################################
#
# OpenERP, Open Source Management Solution
# Copyright (C) 2004-2011 OpenERP SA (<http://www.openerp.com>)
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
##############################################################################
""" Cron jobs scheduling
Cron jobs are defined in the ir_cron table/model. This module deals with all
cron jobs, for all databases of a single OpenERP server instance.
"""
import logging
import threading
import time
from datetime import datetime
import openerp
_logger = logging.getLogger(__name__)
SLEEP_INTERVAL = 60 # 1 min
def cron_runner(number):
while True:
time.sleep(SLEEP_INTERVAL + number) # Steve Reich timing style
registries = openerp.modules.registry.RegistryManager.registries
_logger.debug('cron%d polling for jobs', number)
for db_name, registry in registries.items():
while registry.ready:
acquired = openerp.addons.base.ir.ir_cron.ir_cron._acquire_job(db_name)
if not acquired:
break
def start_service():
""" Start the above runner function in a daemon thread.
The thread is a typical daemon thread: it will never quit and must be
terminated when the main process exits - with no consequence (the processing
threads it spawns are not marked daemon).
"""
# Force call to strptime just before starting the cron thread
# to prevent time.strptime AttributeError within the thread.
# See: http://bugs.python.org/issue7980
datetime.strptime('2012-01-01', '%Y-%m-%d')
for i in range(openerp.tools.config['max_cron_threads']):
def target():
cron_runner(i)
t = threading.Thread(target=target, name="openerp.service.cron.cron%d" % i)
t.setDaemon(True)
t.start()
_logger.debug("cron%d started!" % i)
def stop_service():
pass
# vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4:

openerp/service/server.py (new file, 863 lines)
View File

@ -0,0 +1,863 @@
#-----------------------------------------------------------
# Threaded, Gevent and Prefork Servers
#-----------------------------------------------------------
import datetime
import errno
import fcntl
import logging
import os
import os.path
import platform
import psutil
import random
import resource
import select
import signal
import socket
import subprocess
import sys
import threading
import time
import traceback
import werkzeug.serving
try:
from setproctitle import setproctitle
except ImportError:
setproctitle = lambda x: None
import openerp
import openerp.tools.config as config
from openerp.release import nt_service_name
from openerp.tools.misc import stripped_sys_argv
import wsgi_server
_logger = logging.getLogger(__name__)
SLEEP_INTERVAL = 60 # 1 min
#----------------------------------------------------------
# Werkzeug WSGI servers patched
#----------------------------------------------------------
class BaseWSGIServerNoBind(werkzeug.serving.BaseWSGIServer):
""" werkzeug Base WSGI Server patched to skip socket binding. PreforkServer
use this class, sets the socket and calls the process_request() manually
"""
def __init__(self, app):
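# the host/port passed to werkzeug ("1", "1") are dummies: server_bind()
# below closes the socket werkzeug creates, and PreforkServer feeds the
# accepted client sockets in through process_request()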
werkzeug.serving.BaseWSGIServer.__init__(self, "1", "1", app)
def server_bind(self):
# we don't bind because we use the listen socket of PreforkServer#socket
# instead we close the socket
if self.socket:
self.socket.close()
def server_activate(self):
# don't listen as we use PreforkServer#socket
pass
# Maybe not useful because of SOCKET_REUSE, need to test
class ThreadedWSGIServerReloadable(werkzeug.serving.ThreadedWSGIServer):
""" werkzeug Threaded WSGI Server patched to allow reusing a listen socket
given by the environement, this is used by autoreload to keep the listen
socket open when a reload happens.
"""
def server_bind(self):
    envfd = os.environ.get('OPENERP_AUTO_RELOAD_FD')
    if envfd:
        # reuse the listen socket kept open across the autoreload exec(2);
        # socket.fromfd() duplicates the descriptor
        self.reload_socket = True
        self.socket = socket.fromfd(int(envfd), socket.AF_INET, socket.SOCK_STREAM)
        # should we os.close() the inherited fd, since it has been duplicated?
    else:
        self.reload_socket = False
        super(ThreadedWSGIServerReloadable, self).server_bind()
def server_activate(self):
    if not self.reload_socket:
        super(ThreadedWSGIServerReloadable, self).server_activate()
#----------------------------------------------------------
# AutoReload watcher
#----------------------------------------------------------
class AutoReload(object):
def __init__(self, server):
self.server = server
self.files = {}
self.modules = {}
import pyinotify
class EventHandler(pyinotify.ProcessEvent):
def __init__(self, autoreload):
self.autoreload = autoreload
def process_IN_CREATE(self, event):
_logger.debug('File created: %s', event.pathname)
self.autoreload.files[event.pathname] = 1
def process_IN_MODIFY(self, event):
_logger.debug('File modified: %s', event.pathname)
self.autoreload.files[event.pathname] = 1
self.wm = pyinotify.WatchManager()
self.handler = EventHandler(self)
self.notifier = pyinotify.Notifier(self.wm, self.handler, timeout=0)
mask = pyinotify.IN_MODIFY | pyinotify.IN_CREATE # IN_MOVED_FROM, IN_MOVED_TO ?
for path in openerp.tools.config.options["addons_path"].split(','):
_logger.info('Watching addons folder %s', path)
self.wm.add_watch(path, mask, rec=True)
def process_data(self, files):
xml_files = [i for i in files if i.endswith('.xml')]
addons_path = openerp.tools.config.options["addons_path"].split(',')
for i in xml_files:
for path in addons_path:
if i.startswith(path):
# find out which addons path the file belongs to
# and extract its module name
right = i[len(path) + 1:].split('/')
if len(right) < 2:
continue
module = right[0]
self.modules[module]=1
if self.modules:
_logger.info('autoreload: xml change detected, autoreload activated')
restart()
def process_python(self, files):
# process python changes
py_files = [i for i in files if i.endswith('.py')]
py_errors = []
# TODO keep python errors until they are ok
if py_files:
for i in py_files:
try:
source = open(i, 'rb').read() + '\n'
compile(source, i, 'exec')
except SyntaxError:
py_errors.append(i)
if py_errors:
_logger.info('autoreload: python code change detected, errors found')
for i in py_errors:
_logger.info('autoreload: SyntaxError %s',i)
else:
_logger.info('autoreload: python code updated, autoreload activated')
restart()
def check_thread(self):
# Check if some files have been touched in the addons path.
# If true, check if the touched file belongs to an installed module
# in any of the databases used in the registry manager.
while 1:
while self.notifier.check_events(1000):
self.notifier.read_events()
self.notifier.process_events()
l = self.files.keys()
self.files.clear()
self.process_data(l)
self.process_python(l)
def run(self):
t = threading.Thread(target=self.check_thread)
t.setDaemon(True)
t.start()
_logger.info('AutoReload watcher running')
#----------------------------------------------------------
# Servers: Threaded, Gevented and Prefork
#----------------------------------------------------------
class CommonServer(object):
def __init__(self, app):
# TODO Change the xmlrpc_* options to http_*
self.app = app
# config
self.interface = config['xmlrpc_interface'] or '0.0.0.0'
self.port = config['xmlrpc_port']
# runtime
self.pid = os.getpid()
def dumpstacks(self):
""" Signal handler: dump a stack trace for each existing thread."""
# code from http://stackoverflow.com/questions/132058/getting-stack-trace-from-a-running-python-application#answer-2569696
# modified for python 2.5 compatibility
threads_info = dict([(th.ident, {'name': th.name,
'uid': getattr(th,'uid','n/a')})
for th in threading.enumerate()])
code = []
for threadId, stack in sys._current_frames().items():
thread_info = threads_info.get(threadId)
code.append("\n# Thread: %s (id:%s) (uid:%s)" % \
(thread_info and thread_info['name'] or 'n/a',
threadId,
thread_info and thread_info['uid'] or 'n/a'))
for filename, lineno, name, line in traceback.extract_stack(stack):
code.append('File: "%s", line %d, in %s' % (filename, lineno, name))
if line:
code.append(" %s" % (line.strip()))
_logger.info("\n".join(code))
def close_socket(self, sock):
""" Closes a socket instance cleanly
:param sock: the network socket to close
:type sock: socket.socket
"""
try:
sock.shutdown(socket.SHUT_RDWR)
except socket.error, e:
# On OSX, socket shutdowns both sides if any side closes it
# causing an error 57 'Socket is not connected' on shutdown
# of the other side (or something), see
# http://bugs.python.org/issue4397
# note: stdlib fixed test, not behavior
if e.errno != errno.ENOTCONN or platform.system() not in ['Darwin', 'Windows']:
raise
sock.close()
class ThreadedServer(CommonServer):
def __init__(self, app):
super(ThreadedServer, self).__init__(app)
self.main_thread_id = threading.currentThread().ident
# Variable keeping track of the number of calls to the signal handler defined
# below. This variable is monitored by ``quit_on_signals()``.
self.quit_signals_received = 0
#self.socket = None
self.httpd = None
def signal_handler(self, sig, frame):
if sig in [signal.SIGINT,signal.SIGTERM]:
# shutdown on kill -INT or -TERM
self.quit_signals_received += 1
if self.quit_signals_received > 1:
# logging.shutdown was already called at this point.
sys.stderr.write("Forced shutdown.\n")
os._exit(0)
elif sig == signal.SIGHUP:
# restart on kill -HUP
openerp.phoenix = True
self.quit_signals_received += 1
elif sig == signal.SIGQUIT:
# dump stacks on kill -3
self.dumpstacks()
def cron_thread(self, number):
while True:
time.sleep(SLEEP_INTERVAL + number) # Steve Reich timing style
registries = openerp.modules.registry.RegistryManager.registries
_logger.debug('cron%d polling for jobs', number)
for db_name, registry in registries.items():
while registry.ready:
acquired = openerp.addons.base.ir.ir_cron.ir_cron._acquire_job(db_name)
if not acquired:
break
def cron_spawn(self):
""" Start the above runner function in a daemon thread.
The thread is a typical daemon thread: it will never quit and must be
terminated when the main process exits - with no consequence (the processing
threads it spawns are not marked daemon).
"""
# Force call to strptime just before starting the cron thread
# to prevent time.strptime AttributeError within the thread.
# See: http://bugs.python.org/issue7980
datetime.datetime.strptime('2012-01-01', '%Y-%m-%d')
for i in range(openerp.tools.config['max_cron_threads']):
def target():
self.cron_thread(i)
t = threading.Thread(target=target, name="openerp.service.cron.cron%d" % i)
t.setDaemon(True)
t.start()
_logger.debug("cron%d started!" % i)
def http_thread(self):
def app(e,s):
return self.app(e,s)
self.httpd = ThreadedWSGIServerReloadable(self.interface, self.port, app)
self.httpd.serve_forever()
def http_spawn(self):
threading.Thread(target=self.http_thread).start()
_logger.info('HTTP service (werkzeug) running on %s:%s', self.interface, self.port)
def start(self):
_logger.debug("Setting signal handlers")
if os.name == 'posix':
signal.signal(signal.SIGINT, self.signal_handler)
signal.signal(signal.SIGTERM, self.signal_handler)
signal.signal(signal.SIGCHLD, self.signal_handler)
signal.signal(signal.SIGHUP, self.signal_handler)
signal.signal(signal.SIGQUIT, self.signal_handler)
elif os.name == 'nt':
import win32api
win32api.SetConsoleCtrlHandler(lambda sig: self.signal_handler(sig, None), 1)
self.cron_spawn()
self.http_spawn()
def stop(self):
""" Shutdown the WSGI server. Wait for non deamon threads.
"""
_logger.info("Initiating shutdown")
_logger.info("Hit CTRL-C again or send a second signal to force the shutdown.")
self.httpd.shutdown()
self.close_socket(self.httpd.socket)
# Manually join() all threads before calling sys.exit() to allow a second signal
# to trigger _force_quit() in case some non-daemon threads won't exit cleanly.
# threading.Thread.join() should not mask signals (at least in python 2.5).
me = threading.currentThread()
_logger.debug('current thread: %r', me)
for thread in threading.enumerate():
_logger.debug('process %r (%r)', thread, thread.isDaemon())
if thread != me and not thread.isDaemon() and thread.ident != self.main_thread_id:
while thread.isAlive():
_logger.debug('join and sleep')
# Need a busyloop here as thread.join() masks signals
# and would prevent the forced shutdown.
thread.join(0.05)
time.sleep(0.05)
_logger.debug('--')
openerp.modules.registry.RegistryManager.delete_all()
logging.shutdown()
def run(self):
""" Start the http server and the cron thread then wait for a signal.
The first SIGINT or SIGTERM signal will initiate a graceful shutdown, while
a second one, if any, will force an immediate exit.
"""
self.start()
# Wait for a first signal to be handled. (time.sleep will be interrupted
# by the signal handler.) The try/except is for the win32 case.
try:
while self.quit_signals_received == 0:
time.sleep(60)
except KeyboardInterrupt:
pass
self.stop()
def reload(self):
os.kill(self.pid, signal.SIGHUP)
class GeventServer(CommonServer):
def __init__(self, app):
super(GeventServer, self).__init__(app)
self.port = config['longpolling_port']
self.httpd = None
def watch_parent(self, beat=4):
import gevent
ppid = os.getppid()
while True:
if ppid != os.getppid():
pid = os.getpid()
_logger.info("LongPolling (%s) Parent changed", pid)
# suicide !!
os.kill(pid, signal.SIGTERM)
return
gevent.sleep(beat)
def start(self):
import gevent
from gevent.wsgi import WSGIServer
gevent.spawn(self.watch_parent)
self.httpd = WSGIServer((self.interface, self.port), self.app)
_logger.info('Evented Service (longpolling) running on %s:%s', self.interface, self.port)
self.httpd.serve_forever()
def stop(self):
import gevent
self.httpd.stop()
gevent.shutdown()
def run(self):
self.start()
self.stop()
class PreforkServer(CommonServer):
""" Multiprocessing inspired by (g)unicorn.
PreforkServer (aka Multicorn) currently uses accept(2) as the dispatching
method between workers, but we plan to replace it with a more intelligent
dispatcher that will parse the first HTTP request line.
"""
def __init__(self, app):
# config
self.address = (config['xmlrpc_interface'] or '0.0.0.0', config['xmlrpc_port'])
self.population = config['workers']
self.timeout = config['limit_time_real']
self.limit_request = config['limit_request']
# working vars
self.beat = 4
self.app = app
self.pid = os.getpid()
self.socket = None
self.workers_http = {}
self.workers_cron = {}
self.workers = {}
self.generation = 0
self.queue = []
self.long_polling_pid = None
def pipe_new(self):
pipe = os.pipe()
for fd in pipe:
# non_blocking
flags = fcntl.fcntl(fd, fcntl.F_GETFL) | os.O_NONBLOCK
fcntl.fcntl(fd, fcntl.F_SETFL, flags)
# close_on_exec
flags = fcntl.fcntl(fd, fcntl.F_GETFD) | fcntl.FD_CLOEXEC
fcntl.fcntl(fd, fcntl.F_SETFD, flags)
return pipe
def pipe_ping(self, pipe):
try:
os.write(pipe[1], '.')
except IOError, e:
if e.errno not in [errno.EAGAIN, errno.EINTR]:
raise
def signal_handler(self, sig, frame):
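# never drop SIGCHLD (dead workers must always be reaped); other signals
# are capped at 5 pending entries, and the self-pipe is pinged so the
# sleeping master wakes up to process the queue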
if len(self.queue) < 5 or sig == signal.SIGCHLD:
self.queue.append(sig)
self.pipe_ping(self.pipe)
else:
_logger.warn("Dropping signal: %s", sig)
def worker_spawn(self, klass, workers_registry):
self.generation += 1
worker = klass(self)
pid = os.fork()
if pid != 0:
worker.pid = pid
self.workers[pid] = worker
workers_registry[pid] = worker
return worker
else:
worker.run()
sys.exit(0)
def long_polling_spawn(self):
nargs = stripped_sys_argv('--pidfile','--workers')
cmd = nargs[0]
cmd = os.path.join(os.path.dirname(cmd), "openerp-gevent")
nargs[0] = cmd
popen = subprocess.Popen(nargs)
self.long_polling_pid = popen.pid
def worker_pop(self, pid):
if pid in self.workers:
_logger.debug("Worker (%s) unregistered",pid)
try:
self.workers_http.pop(pid,None)
self.workers_cron.pop(pid,None)
u = self.workers.pop(pid)
u.close()
except OSError:
return
def worker_kill(self, pid, sig):
try:
os.kill(pid, sig)
except OSError, e:
if e.errno == errno.ESRCH:
self.worker_pop(pid)
def process_signals(self):
while len(self.queue):
sig = self.queue.pop(0)
if sig in [signal.SIGINT,signal.SIGTERM]:
raise KeyboardInterrupt
elif sig == signal.SIGHUP:
# restart on kill -HUP
openerp.phoenix = True
raise KeyboardInterrupt
elif sig == signal.SIGQUIT:
# dump stacks on kill -3
self.dumpstacks()
elif sig == signal.SIGTTIN:
# increase number of workers
self.population += 1
elif sig == signal.SIGTTOU:
# decrease number of workers
self.population -= 1
def process_zombie(self):
# reap dead workers
while 1:
try:
wpid, status = os.waitpid(-1, os.WNOHANG)
if not wpid:
break
if (status >> 8) == 3:
msg = "Critial worker error (%s)"
_logger.critical(msg, wpid)
raise Exception(msg % wpid)
self.worker_pop(wpid)
except OSError, e:
if e.errno == errno.ECHILD:
break
raise
def process_timeout(self):
now = time.time()
for (pid, worker) in self.workers.items():
if (worker.watchdog_timeout is not None) and \
(now - worker.watchdog_time >= worker.watchdog_timeout):
_logger.error("Worker (%s) timeout", pid)
self.worker_kill(pid, signal.SIGKILL)
def process_spawn(self):
while len(self.workers_http) < self.population:
self.worker_spawn(WorkerHTTP, self.workers_http)
while len(self.workers_cron) < config['max_cron_threads']:
self.worker_spawn(WorkerCron, self.workers_cron)
if not self.long_polling_pid:
self.long_polling_spawn()
def sleep(self):
try:
# map of fd -> worker
fds = dict([(w.watchdog_pipe[0],w) for k,w in self.workers.items()])
fd_in = fds.keys() + [self.pipe[0]]
# check for ping or internal wakeups
ready = select.select(fd_in, [], [], self.beat)
# update worker watchdogs
for fd in ready[0]:
if fd in fds:
fds[fd].watchdog_time = time.time()
try:
# empty pipe
while os.read(fd, 1):
pass
except OSError, e:
if e.errno not in [errno.EAGAIN]:
raise
except select.error, e:
if e[0] not in [errno.EINTR]:
raise
def start(self):
# wakeup pipe: python does not raise EINTR when a syscall is interrupted
# by a signal, simulating a pseudo SA_RESTART. We write to a pipe in the
# signal handler to overcome this behaviour
self.pipe = self.pipe_new()
# set signal handlers
signal.signal(signal.SIGINT, self.signal_handler)
signal.signal(signal.SIGTERM, self.signal_handler)
signal.signal(signal.SIGHUP, self.signal_handler)
signal.signal(signal.SIGCHLD, self.signal_handler)
signal.signal(signal.SIGQUIT, self.signal_handler)
signal.signal(signal.SIGTTIN, self.signal_handler)
signal.signal(signal.SIGTTOU, self.signal_handler)
# listen to socket
self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.socket.setblocking(0)
self.socket.bind(self.address)
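# scale the accept backlog with the worker count so short connection
# bursts are queued by the kernel instead of being refused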
self.socket.listen(8*self.population)
def stop(self, graceful=True):
if self.long_polling_pid is not None:
self.worker_kill(self.long_polling_pid, signal.SIGKILL) # FIXME make longpolling process handle SIGTERM correctly
self.long_polling_pid = None
if graceful:
_logger.info("Stopping gracefully")
limit = time.time() + self.timeout
for pid in self.workers.keys():
self.worker_kill(pid, signal.SIGTERM)
while self.workers and time.time() < limit:
self.process_zombie()
time.sleep(0.1)
else:
_logger.info("Stopping forcefully")
for pid in self.workers.keys():
self.worker_kill(pid, signal.SIGTERM)
self.socket.close()
def run(self):
self.start()
_logger.debug("Multiprocess starting")
while 1:
try:
#_logger.debug("Multiprocess beat (%s)",time.time())
self.process_signals()
self.process_zombie()
self.process_timeout()
self.process_spawn()
self.sleep()
except KeyboardInterrupt:
_logger.debug("Multiprocess clean stop")
self.stop()
break
except Exception,e:
_logger.exception(e)
self.stop(False)
sys.exit(-1)
class Worker(object):
""" Workers """
def __init__(self, multi):
self.multi = multi
self.watchdog_time = time.time()
self.watchdog_pipe = multi.pipe_new()
# Can be set to None if no watchdog is desired.
self.watchdog_timeout = multi.timeout
self.ppid = os.getpid()
self.pid = None
self.alive = True
# should we rename into lifetime ?
self.request_max = multi.limit_request
self.request_count = 0
def setproctitle(self, title=""):
setproctitle('openerp: %s %s %s' % (self.__class__.__name__, self.pid, title))
def close(self):
os.close(self.watchdog_pipe[0])
os.close(self.watchdog_pipe[1])
def signal_handler(self, sig, frame):
self.alive = False
def sleep(self):
try:
ret = select.select([self.multi.socket], [], [], self.multi.beat)
except select.error, e:
if e[0] not in [errno.EINTR]:
raise
def process_limit(self):
# If our parent changed, commit suicide
if self.ppid != os.getppid():
_logger.info("Worker (%s) Parent changed", self.pid)
self.alive = False
# check for lifetime
if self.request_count >= self.request_max:
_logger.info("Worker (%d) max request (%s) reached.", self.pid, self.request_count)
self.alive = False
# Reset the worker if it consumes too much memory (e.g. caused by a memory leak).
rss, vms = psutil.Process(os.getpid()).get_memory_info()
if vms > config['limit_memory_soft']:
_logger.info('Worker (%d) virtual memory limit (%s) reached.', self.pid, vms)
self.alive = False # Commit suicide after the request.
# VMS and RLIMIT_AS are the same thing: virtual memory, a.k.a. address space
soft, hard = resource.getrlimit(resource.RLIMIT_AS)
resource.setrlimit(resource.RLIMIT_AS, (config['limit_memory_hard'], hard))
# SIGXCPU (exceeded CPU time) signal handler will raise an exception.
r = resource.getrusage(resource.RUSAGE_SELF)
cpu_time = r.ru_utime + r.ru_stime
def time_expired(n, stack):
_logger.info('Worker (%d) CPU time limit (%s) reached.', self.pid, config['limit_time_cpu'])
# We don't suicide in such a case
raise Exception('CPU time limit exceeded.')
signal.signal(signal.SIGXCPU, time_expired)
soft, hard = resource.getrlimit(resource.RLIMIT_CPU)
resource.setrlimit(resource.RLIMIT_CPU, (cpu_time + config['limit_time_cpu'], hard))
def process_work(self):
pass
def start(self):
self.pid = os.getpid()
self.setproctitle()
_logger.info("Worker %s (%s) alive", self.__class__.__name__, self.pid)
# Reseed the random number generator
random.seed()
# Prevent fd inheritance: set close_on_exec
flags = fcntl.fcntl(self.multi.socket, fcntl.F_GETFD) | fcntl.FD_CLOEXEC
fcntl.fcntl(self.multi.socket, fcntl.F_SETFD, flags)
# reset blocking status
self.multi.socket.setblocking(0)
signal.signal(signal.SIGINT, self.signal_handler)
signal.signal(signal.SIGTERM, signal.SIG_DFL)
signal.signal(signal.SIGCHLD, signal.SIG_DFL)
def stop(self):
pass
def run(self):
try:
self.start()
while self.alive:
self.process_limit()
self.multi.pipe_ping(self.watchdog_pipe)
self.sleep()
self.process_work()
_logger.info("Worker (%s) exiting. request_count: %s.", self.pid, self.request_count)
self.stop()
except Exception,e:
_logger.exception("Worker (%s) Exception occured, exiting..." % self.pid)
# should we use 3 to abort everything ?
sys.exit(1)
class WorkerHTTP(Worker):
""" HTTP Request workers """
def process_request(self, client, addr):
client.setblocking(1)
client.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
# Prevent fd inheritance: set close_on_exec
flags = fcntl.fcntl(client, fcntl.F_GETFD) | fcntl.FD_CLOEXEC
fcntl.fcntl(client, fcntl.F_SETFD, flags)
# do request using BaseWSGIServerNoBind monkey patched with socket
self.server.socket = client
# tolerate broken pipe when the http client closes the socket before
# receiving the full reply
try:
self.server.process_request(client,addr)
except IOError, e:
if e.errno != errno.EPIPE:
raise
self.request_count += 1
def process_work(self):
try:
client, addr = self.multi.socket.accept()
self.process_request(client, addr)
except socket.error, e:
if e[0] not in (errno.EAGAIN, errno.ECONNABORTED):
raise
def start(self):
Worker.start(self)
self.server = BaseWSGIServerNoBind(self.multi.app)
class WorkerCron(Worker):
""" Cron workers """
def __init__(self, multi):
super(WorkerCron, self).__init__(multi)
# process_work() below process a single database per call.
# The variable db_index is keeping track of the next database to
# process.
self.db_index = 0
def sleep(self):
# Really sleep once all the databases have been processed.
if self.db_index == 0:
interval = SLEEP_INTERVAL + self.pid % 10 # chorus effect
time.sleep(interval)
def _db_list(self):
if config['db_name']:
db_names = config['db_name'].split(',')
else:
db_names = openerp.service.db.exp_list(True)
return db_names
def process_work(self):
rpc_request = logging.getLogger('openerp.netsvc.rpc.request')
rpc_request_flag = rpc_request.isEnabledFor(logging.DEBUG)
_logger.debug("WorkerCron (%s) polling for jobs", self.pid)
db_names = self._db_list()
if len(db_names):
self.db_index = (self.db_index + 1) % len(db_names)
db_name = db_names[self.db_index]
self.setproctitle(db_name)
if rpc_request_flag:
start_time = time.time()
start_rss, start_vms = psutil.Process(os.getpid()).get_memory_info()
import openerp.addons.base as base
base.ir.ir_cron.ir_cron._acquire_job(db_name)
openerp.modules.registry.RegistryManager.delete(db_name)
# don't keep cursors in multi-database mode
if len(db_names) > 1:
openerp.sql_db.close_db(db_name)
if rpc_request_flag:
end_time = time.time()
end_rss, end_vms = psutil.Process(os.getpid()).get_memory_info()
logline = '%s time:%.3fs mem: %sk -> %sk (diff: %sk)' % (db_name, end_time - start_time, start_vms / 1024, end_vms / 1024, (end_vms - start_vms)/1024)
_logger.debug("WorkerCron (%s) %s", self.pid, logline)
self.request_count += 1
if self.request_count >= self.request_max and self.request_max < len(db_names):
_logger.error("There are more dabatases to process than allowed "
"by the `limit_request` configuration variable: %s more.",
len(db_names) - self.request_max)
else:
self.db_index = 0
def start(self):
os.nice(10) # mommy always told me to be nice with others...
Worker.start(self)
self.multi.socket.close()
# chorus effect: ensure cron workers do not all start at the first database
mct = config['max_cron_threads']
p = float(self.pid % mct) / mct
self.db_index = int(len(self._db_list()) * p)
#----------------------------------------------------------
# start/stop public api
#----------------------------------------------------------
server = None
def load_server_wide_modules():
for m in openerp.conf.server_wide_modules:
try:
openerp.modules.module.load_openerp_module(m)
except Exception:
msg = ''
if m == 'web':
msg = """
The `web` module is provided by the addons found in the `openerp-web` project.
Maybe you forgot to add those addons in your addons_path configuration."""
_logger.exception('Failed to load server-wide module `%s`.%s', m, msg)
def _reexec(updated_modules=None):
"""reexecute openerp-server process with (nearly) the same arguments"""
if openerp.tools.osutil.is_running_as_nt_service():
subprocess.call('net stop {0} && net start {0}'.format(nt_service_name), shell=True)
exe = os.path.basename(sys.executable)
args = stripped_sys_argv()
args += ["-u", ','.join(updated_modules)]
if not args or args[0] != exe:
args.insert(0, exe)
os.execv(sys.executable, args)
def start():
""" Start the openerp http server and cron processor.
"""
global server
load_server_wide_modules()
if config['workers']:
openerp.multi_process = True
server = PreforkServer(openerp.service.wsgi_server.application)
elif openerp.evented:
server = GeventServer(openerp.service.wsgi_server.application)
else:
server = ThreadedServer(openerp.service.wsgi_server.application)
if config['auto_reload']:
autoreload = AutoReload(server)
autoreload.run()
server.run()
# like the legend of the phoenix, all ends with beginnings
if getattr(openerp, 'phoenix', False):
modules = []
if config['auto_reload']:
modules = autoreload.modules.keys()
_reexec(modules)
sys.exit(0)
def restart():
""" Restart the server
"""
if os.name == 'nt':
# run in a thread to let the current thread return response to the caller.
threading.Thread(target=_reexec).start()
else:
os.kill(server.pid, signal.SIGHUP)
# vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4:

View File

@ -1,448 +0,0 @@
#-----------------------------------------------------------
# Multicorn, multiprocessing inspired by gunicorn
# TODO rename class: Multicorn -> Arbiter ?
#-----------------------------------------------------------
import errno
import fcntl
import logging
import os
import psutil
import random
import resource
import select
import signal
import socket
import sys
import time
import subprocess
import os.path
import werkzeug.serving
try:
from setproctitle import setproctitle
except ImportError:
setproctitle = lambda x: None
import openerp
import openerp.tools.config as config
from openerp.tools.misc import stripped_sys_argv
_logger = logging.getLogger(__name__)
class Multicorn(object):
""" Multiprocessing inspired by (g)unicorn.
Multicorn currently uses accept(2) as the dispatching method between workers,
but we plan to replace it with a more intelligent dispatcher that will parse
the first HTTP request line.
"""
def __init__(self, app):
# config
self.address = (config['xmlrpc_interface'] or '0.0.0.0', config['xmlrpc_port'])
self.population = config['workers']
self.timeout = config['limit_time_real']
self.limit_request = config['limit_request']
# working vars
self.beat = 4
self.app = app
self.pid = os.getpid()
self.socket = None
self.workers_http = {}
self.workers_cron = {}
self.workers = {}
self.generation = 0
self.queue = []
self.long_polling_pid = None
def pipe_new(self):
pipe = os.pipe()
for fd in pipe:
# non_blocking
flags = fcntl.fcntl(fd, fcntl.F_GETFL) | os.O_NONBLOCK
fcntl.fcntl(fd, fcntl.F_SETFL, flags)
# close_on_exec
flags = fcntl.fcntl(fd, fcntl.F_GETFD) | fcntl.FD_CLOEXEC
fcntl.fcntl(fd, fcntl.F_SETFD, flags)
return pipe
def pipe_ping(self, pipe):
try:
os.write(pipe[1], '.')
except IOError, e:
if e.errno not in [errno.EAGAIN, errno.EINTR]:
raise
def signal_handler(self, sig, frame):
if len(self.queue) < 5 or sig == signal.SIGCHLD:
self.queue.append(sig)
self.pipe_ping(self.pipe)
else:
_logger.warn("Dropping signal: %s", sig)
def worker_spawn(self, klass, workers_registry):
self.generation += 1
worker = klass(self)
pid = os.fork()
if pid != 0:
worker.pid = pid
self.workers[pid] = worker
workers_registry[pid] = worker
return worker
else:
worker.run()
sys.exit(0)
def long_polling_spawn(self):
nargs = stripped_sys_argv('--pidfile')
cmd = nargs[0]
cmd = os.path.join(os.path.dirname(cmd), "openerp-long-polling")
nargs[0] = cmd
popen = subprocess.Popen(nargs)
self.long_polling_pid = popen.pid
def worker_pop(self, pid):
if pid in self.workers:
_logger.debug("Worker (%s) unregistered",pid)
try:
self.workers_http.pop(pid,None)
self.workers_cron.pop(pid,None)
u = self.workers.pop(pid)
u.close()
except OSError:
return
def worker_kill(self, pid, sig):
try:
os.kill(pid, sig)
except OSError, e:
if e.errno == errno.ESRCH:
self.worker_pop(pid)
def process_signals(self):
while len(self.queue):
sig = self.queue.pop(0)
if sig in [signal.SIGINT,signal.SIGTERM]:
raise KeyboardInterrupt
def process_zombie(self):
# reap dead workers
while 1:
try:
wpid, status = os.waitpid(-1, os.WNOHANG)
if not wpid:
break
if (status >> 8) == 3:
msg = "Critial worker error (%s)"
_logger.critical(msg, wpid)
raise Exception(msg % wpid)
self.worker_pop(wpid)
except OSError, e:
if e.errno == errno.ECHILD:
break
raise
def process_timeout(self):
now = time.time()
for (pid, worker) in self.workers.items():
if (worker.watchdog_timeout is not None) and \
(now - worker.watchdog_time >= worker.watchdog_timeout):
_logger.error("Worker (%s) timeout", pid)
self.worker_kill(pid, signal.SIGKILL)
def process_spawn(self):
while len(self.workers_http) < self.population:
self.worker_spawn(WorkerHTTP, self.workers_http)
while len(self.workers_cron) < config['max_cron_threads']:
self.worker_spawn(WorkerCron, self.workers_cron)
if not self.long_polling_pid:
self.long_polling_spawn()
def sleep(self):
try:
# map of fd -> worker
fds = dict([(w.watchdog_pipe[0],w) for k,w in self.workers.items()])
fd_in = fds.keys() + [self.pipe[0]]
# check for ping or internal wakeups
ready = select.select(fd_in, [], [], self.beat)
# update worker watchdogs
for fd in ready[0]:
if fd in fds:
fds[fd].watchdog_time = time.time()
try:
# empty pipe
while os.read(fd, 1):
pass
except OSError, e:
if e.errno not in [errno.EAGAIN]:
raise
except select.error, e:
if e[0] not in [errno.EINTR]:
raise
def start(self):
# wakeup pipe: python does not raise EINTR when a syscall is interrupted
# by a signal, simulating a pseudo SA_RESTART. We write to a pipe in the
# signal handler to overcome this behaviour
self.pipe = self.pipe_new()
# set signal
signal.signal(signal.SIGINT, self.signal_handler)
signal.signal(signal.SIGTERM, self.signal_handler)
signal.signal(signal.SIGCHLD, self.signal_handler)
# listen to socket
self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.socket.setblocking(0)
self.socket.bind(self.address)
self.socket.listen(8*self.population)
def stop(self, graceful=True):
if self.long_polling_pid is not None:
self.worker_kill(self.long_polling_pid, signal.SIGKILL) # FIXME make longpolling process handle SIGTERM correctly
self.long_polling_pid = None
if graceful:
_logger.info("Stopping gracefully")
limit = time.time() + self.timeout
for pid in self.workers.keys():
self.worker_kill(pid, signal.SIGTERM)
while self.workers and time.time() < limit:
self.process_zombie()
time.sleep(0.1)
else:
_logger.info("Stopping forcefully")
for pid in self.workers.keys():
self.worker_kill(pid, signal.SIGTERM)
self.socket.close()
openerp.cli.server.quit_signals_received = 1
def run(self):
self.start()
_logger.debug("Multiprocess starting")
while 1:
try:
#_logger.debug("Multiprocess beat (%s)",time.time())
self.process_signals()
self.process_zombie()
self.process_timeout()
self.process_spawn()
self.sleep()
except KeyboardInterrupt:
_logger.debug("Multiprocess clean stop")
self.stop()
break
except Exception,e:
_logger.exception(e)
self.stop(False)
sys.exit(-1)
class Worker(object):
""" Workers """
def __init__(self, multi):
self.multi = multi
self.watchdog_time = time.time()
self.watchdog_pipe = multi.pipe_new()
# Can be set to None if no watchdog is desired.
self.watchdog_timeout = multi.timeout
self.ppid = os.getpid()
self.pid = None
self.alive = True
# should we rename into lifetime ?
self.request_max = multi.limit_request
self.request_count = 0
def setproctitle(self, title=""):
setproctitle('openerp: %s %s %s' % (self.__class__.__name__, self.pid, title))
def close(self):
os.close(self.watchdog_pipe[0])
os.close(self.watchdog_pipe[1])
def signal_handler(self, sig, frame):
self.alive = False
def sleep(self):
try:
ret = select.select([self.multi.socket], [], [], self.multi.beat)
except select.error, e:
if e[0] not in [errno.EINTR]:
raise
def process_limit(self):
# If our parent changed, commit suicide
if self.ppid != os.getppid():
_logger.info("Worker (%s) Parent changed", self.pid)
self.alive = False
# check for lifetime
if self.request_count >= self.request_max:
_logger.info("Worker (%d) max request (%s) reached.", self.pid, self.request_count)
self.alive = False
# Reset the worker if it consumes too much memory (e.g. caused by a memory leak).
rss, vms = psutil.Process(os.getpid()).get_memory_info()
if vms > config['limit_memory_soft']:
_logger.info('Worker (%d) virtual memory limit (%s) reached.', self.pid, vms)
self.alive = False # Commit suicide after the request.
# VMS and RLIMIT_AS are the same thing: virtual memory, a.k.a. address space
soft, hard = resource.getrlimit(resource.RLIMIT_AS)
resource.setrlimit(resource.RLIMIT_AS, (config['limit_memory_hard'], hard))
# SIGXCPU (exceeded CPU time) signal handler will raise an exception.
r = resource.getrusage(resource.RUSAGE_SELF)
cpu_time = r.ru_utime + r.ru_stime
def time_expired(n, stack):
_logger.info('Worker (%d) CPU time limit (%s) reached.', self.pid, config['limit_time_cpu'])
# We don't suicide in such a case
raise Exception('CPU time limit exceeded.')
signal.signal(signal.SIGXCPU, time_expired)
soft, hard = resource.getrlimit(resource.RLIMIT_CPU)
resource.setrlimit(resource.RLIMIT_CPU, (cpu_time + config['limit_time_cpu'], hard))
def process_work(self):
pass
def start(self):
self.pid = os.getpid()
self.setproctitle()
_logger.info("Worker %s (%s) alive", self.__class__.__name__, self.pid)
# Reseed the random number generator
random.seed()
# Prevent fd inheritance: set close_on_exec
flags = fcntl.fcntl(self.multi.socket, fcntl.F_GETFD) | fcntl.FD_CLOEXEC
fcntl.fcntl(self.multi.socket, fcntl.F_SETFD, flags)
# reset blocking status
self.multi.socket.setblocking(0)
signal.signal(signal.SIGINT, self.signal_handler)
signal.signal(signal.SIGTERM, signal.SIG_DFL)
signal.signal(signal.SIGCHLD, signal.SIG_DFL)
def stop(self):
pass
def run(self):
try:
self.start()
while self.alive:
self.process_limit()
self.multi.pipe_ping(self.watchdog_pipe)
self.sleep()
self.process_work()
_logger.info("Worker (%s) exiting. request_count: %s.", self.pid, self.request_count)
self.stop()
except Exception,e:
_logger.exception("Worker (%s) Exception occured, exiting..." % self.pid)
# should we use 3 to abort everything ?
sys.exit(1)
class WorkerHTTP(Worker):
""" HTTP Request workers """
def process_request(self, client, addr):
client.setblocking(1)
client.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
# Prevent fd inheritance: set close_on_exec
flags = fcntl.fcntl(client, fcntl.F_GETFD) | fcntl.FD_CLOEXEC
fcntl.fcntl(client, fcntl.F_SETFD, flags)
# do request using WorkerBaseWSGIServer monkey patched with socket
self.server.socket = client
# tolerate broken pipe when the http client closes the socket before
# receiving the full reply
try:
self.server.process_request(client,addr)
except IOError, e:
if e.errno != errno.EPIPE:
raise
self.request_count += 1
def process_work(self):
try:
client, addr = self.multi.socket.accept()
self.process_request(client, addr)
except socket.error, e:
if e[0] not in (errno.EAGAIN, errno.ECONNABORTED):
raise
def start(self):
Worker.start(self)
self.server = WorkerBaseWSGIServer(self.multi.app)
class WorkerBaseWSGIServer(werkzeug.serving.BaseWSGIServer):
""" werkzeug WSGI Server patched to allow using an external listen socket
"""
def __init__(self, app):
werkzeug.serving.BaseWSGIServer.__init__(self, "1", "1", app)
def server_bind(self):
# we don't bind because we use the listen socket of Multicorn#socket
# instead we close the socket
if self.socket:
self.socket.close()
def server_activate(self):
# don't listen as we use Multicorn#socket
pass
class WorkerCron(Worker):
""" Cron workers """
def __init__(self, multi):
super(WorkerCron, self).__init__(multi)
# process_work() below process a single database per call.
# The variable db_index is keeping track of the next database to
# process.
self.db_index = 0
def sleep(self):
# Really sleep once all the databases have been processed.
if self.db_index == 0:
interval = 60 + self.pid % 10 # chorus effect
time.sleep(interval)
def _db_list(self):
if config['db_name']:
db_names = config['db_name'].split(',')
else:
db_names = openerp.service.db.exp_list(True)
return db_names
def process_work(self):
rpc_request = logging.getLogger('openerp.netsvc.rpc.request')
rpc_request_flag = rpc_request.isEnabledFor(logging.DEBUG)
_logger.debug("WorkerCron (%s) polling for jobs", self.pid)
db_names = self._db_list()
if len(db_names):
self.db_index = (self.db_index + 1) % len(db_names)
db_name = db_names[self.db_index]
self.setproctitle(db_name)
if rpc_request_flag:
start_time = time.time()
start_rss, start_vms = psutil.Process(os.getpid()).get_memory_info()
import openerp.addons.base as base
base.ir.ir_cron.ir_cron._acquire_job(db_name)
openerp.modules.registry.RegistryManager.delete(db_name)
# dont keep cursors in multi database mode
if len(db_names) > 1:
openerp.sql_db.close_db(db_name)
if rpc_request_flag:
end_time = time.time()
end_rss, end_vms = psutil.Process(os.getpid()).get_memory_info()
logline = '%s time:%.3fs mem: %sk -> %sk (diff: %sk)' % (db_name, end_time - start_time, start_vms / 1024, end_vms / 1024, (end_vms - start_vms)/1024)
_logger.debug("WorkerCron (%s) %s", self.pid, logline)
self.request_count += 1
if self.request_count >= self.request_max and self.request_max < len(db_names):
_logger.error("There are more dabatases to process than allowed "
"by the `limit_request` configuration variable: %s more.",
len(db_names) - self.request_max)
else:
self.db_index = 0
def start(self):
os.nice(10) # mommy always told me to be nice with others...
Worker.start(self)
self.multi.socket.close()
openerp.service.start_internal()
# chorus effect: ensure cron workers do not all start at the first database
mct = config['max_cron_threads']
p = float(self.pid % mct) / mct
self.db_index = int(len(self._db_list()) * p)
# vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4:

View File

@ -400,8 +400,6 @@ def application_unproxied(environ, start_response):
if hasattr(threading.current_thread(), 'dbname'):
del threading.current_thread().dbname
openerp.service.start_internal()
# Try all handlers until one returns some result (i.e. not None).
wsgi_handlers = [wsgi_xmlrpc_1, wsgi_xmlrpc, wsgi_xmlrpc_legacy, wsgi_webdav]
wsgi_handlers += module_handlers
@ -422,69 +420,5 @@ def application(environ, start_response):
else:
return application_unproxied(environ, start_response)
# The WSGI server, started by start_server(), stopped by stop_server().
httpd = None
def serve(interface, port, threaded):
""" Serve HTTP requests via werkzeug development server.
Calling this function is blocking, you might want to call it in its own
thread.
"""
global httpd
if not openerp.evented:
httpd = werkzeug.serving.make_server(interface, port, application, threaded=threaded)
else:
from gevent.wsgi import WSGIServer
httpd = WSGIServer((interface, port), application)
httpd.serve_forever()
def start_service():
""" Call serve() in its own thread.
The WSGI server can be shutdown with stop_server() below.
"""
# TODO Change the xmlrpc_* options to http_*
interface = config['xmlrpc_interface'] or '0.0.0.0'
port = config['xmlrpc_port']
_logger.info('HTTP service (werkzeug) running on %s:%s', interface, port)
if not openerp.evented:
threading.Thread(target=serve, args=(interface, port, True)).start()
else:
serve(interface, port, True)
def stop_service():
""" Initiate the shutdown of the WSGI server.
The server is supposed to have been started by start_server() above.
"""
if httpd:
if not openerp.evented:
httpd.shutdown()
close_socket(httpd.socket)
else:
import gevent
httpd.stop()
gevent.shutdown()
def close_socket(sock):
""" Closes a socket instance cleanly
:param sock: the network socket to close
:type sock: socket.socket
"""
try:
sock.shutdown(socket.SHUT_RDWR)
except socket.error, e:
# On OSX, socket shutdowns both sides if any side closes it
# causing an error 57 'Socket is not connected' on shutdown
# of the other side (or something), see
# http://bugs.python.org/issue4397
# note: stdlib fixed test, not behavior
if e.errno != errno.ENOTCONN or platform.system() not in ['Darwin', 'Windows']:
raise
sock.close()
# vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4:

View File

@ -106,7 +106,6 @@ class configmanager(object):
help="specify additional addons paths (separated by commas).",
action="callback", callback=self._check_addons_path, nargs=1, type="string")
group.add_option("--load", dest="server_wide_modules", help="Comma-separated list of server-wide modules default=web")
group.add_option("--gevent", dest="gevent", action="store_true", my_default=False, help="Activate the GEvent mode, this also desactivate the cron.")
parser.add_option_group(group)
# XML-RPC / HTTP
@ -246,6 +245,7 @@ class configmanager(object):
# Advanced options
group = optparse.OptionGroup(parser, "Advanced options")
group.add_option('--auto-reload', dest='auto_reload', action='store_true', my_default=False, help='enable auto reload')
group.add_option('--debug', dest='debug_mode', action='store_true', my_default=False, help='enable debug mode')
group.add_option("--stop-after-init", action="store_true", dest="stop_after_init", my_default=False,
help="stop the server after its initialization")
@ -399,7 +399,7 @@ class configmanager(object):
'list_db', 'xmlrpcs', 'proxy_mode',
'test_file', 'test_enable', 'test_commit', 'test_report_directory',
'osv_memory_count_limit', 'osv_memory_age_limit', 'max_cron_threads', 'unaccent',
'workers', 'limit_memory_hard', 'limit_memory_soft', 'limit_time_cpu', 'limit_time_real', 'limit_request', 'gevent'
'workers', 'limit_memory_hard', 'limit_memory_soft', 'limit_time_cpu', 'limit_time_real', 'limit_request', 'auto_reload'
]
for arg in keys:

View File

@ -30,9 +30,9 @@ def run(args):
import gevent
import gevent.monkey
import gevent.wsgi
import gevent_psycopg2
import psycogreen.gevent
gevent.monkey.patch_all()
gevent_psycopg2.monkey_patch()
psycogreen.gevent.patch_psycopg()
import threading
import openerp
import openerp.cli.server

View File

@ -120,7 +120,7 @@ setuptools.setup(
'feedparser',
'gdata',
'gevent',
'gevent-psycopg2',
'psycogreen',
'Jinja2',
'lxml', # windows binary http://www.lfd.uci.edu/~gohlke/pythonlibs/
'mako',