[IMP] openerp threaded, gevent, prefork service cleanup
- unify signal handling - unify start and stop no new feature yet, it paves the way for - developement mode auto reload - graceful restart on HUP - multiprocessing and gevent on windows bzr revid: al@openerp.com-20130908173535-xomt5w7xmqtwkmyy
This commit is contained in:
parent
9b9abd31af
commit
12580e690a
|
@ -1,11 +0,0 @@
|
|||
#!/usr/bin/env python
|
||||
|
||||
import gevent.monkey
|
||||
gevent.monkey.patch_all()
|
||||
import gevent_psycopg2
|
||||
gevent_psycopg2.monkey_patch()
|
||||
|
||||
import openerp
|
||||
|
||||
if __name__ == "__main__":
|
||||
openerp.cli.main()
|
|
@ -27,9 +27,16 @@ import sys
|
|||
|
||||
# Is the server running with gevent.
|
||||
evented = False
|
||||
if sys.modules.get("gevent") is not None:
|
||||
evented = True
|
||||
for i in sys.argv:
|
||||
if i.startswith('--gevent'):
|
||||
evented = True
|
||||
break
|
||||
|
||||
if evented:
|
||||
import gevent.monkey
|
||||
gevent.monkey.patch_all()
|
||||
import gevent_psycopg2
|
||||
gevent_psycopg2.monkey_patch()
|
||||
|
||||
# Make sure the OpenERP server runs in UTC. This is especially necessary
|
||||
# under Windows as under Linux it seems the real import of time is
|
||||
|
|
|
@ -147,112 +147,15 @@ def import_translation():
|
|||
cr.commit()
|
||||
cr.close()
|
||||
|
||||
# Variable keeping track of the number of calls to the signal handler defined
|
||||
# below. This variable is monitored by ``quit_on_signals()``.
|
||||
quit_signals_received = 0
|
||||
|
||||
def signal_handler(sig, frame):
|
||||
""" Signal handler: exit ungracefully on the second handled signal.
|
||||
|
||||
:param sig: the signal number
|
||||
:param frame: the interrupted stack frame or None
|
||||
"""
|
||||
global quit_signals_received
|
||||
quit_signals_received += 1
|
||||
if quit_signals_received > 1:
|
||||
# logging.shutdown was already called at this point.
|
||||
sys.stderr.write("Forced shutdown.\n")
|
||||
os._exit(0)
|
||||
|
||||
def dumpstacks(sig, frame):
|
||||
""" Signal handler: dump a stack trace for each existing thread."""
|
||||
# code from http://stackoverflow.com/questions/132058/getting-stack-trace-from-a-running-python-application#answer-2569696
|
||||
# modified for python 2.5 compatibility
|
||||
threads_info = dict([(th.ident, {'name': th.name,
|
||||
'uid': getattr(th,'uid','n/a')})
|
||||
for th in threading.enumerate()])
|
||||
code = []
|
||||
for threadId, stack in sys._current_frames().items():
|
||||
thread_info = threads_info.get(threadId)
|
||||
code.append("\n# Thread: %s (id:%s) (uid:%s)" % \
|
||||
(thread_info and thread_info['name'] or 'n/a',
|
||||
threadId,
|
||||
thread_info and thread_info['uid'] or 'n/a'))
|
||||
for filename, lineno, name, line in traceback.extract_stack(stack):
|
||||
code.append('File: "%s", line %d, in %s' % (filename, lineno, name))
|
||||
if line:
|
||||
code.append(" %s" % (line.strip()))
|
||||
_logger.info("\n".join(code))
|
||||
|
||||
def setup_signal_handlers(signal_handler):
|
||||
""" Register the given signal handler. """
|
||||
SIGNALS = (signal.SIGINT, signal.SIGTERM)
|
||||
if os.name == 'posix':
|
||||
map(lambda sig: signal.signal(sig, signal_handler), SIGNALS)
|
||||
signal.signal(signal.SIGQUIT, dumpstacks)
|
||||
elif os.name == 'nt':
|
||||
import win32api
|
||||
win32api.SetConsoleCtrlHandler(lambda sig: signal_handler(sig, None), 1)
|
||||
|
||||
def quit_on_signals():
|
||||
""" Wait for one or two signals then shutdown the server.
|
||||
|
||||
The first SIGINT or SIGTERM signal will initiate a graceful shutdown while
|
||||
a second one if any will force an immediate exit.
|
||||
|
||||
"""
|
||||
# Wait for a first signal to be handled. (time.sleep will be interrupted
|
||||
# by the signal handler.) The try/except is for the win32 case.
|
||||
try:
|
||||
while quit_signals_received == 0:
|
||||
time.sleep(60)
|
||||
except KeyboardInterrupt:
|
||||
pass
|
||||
|
||||
config = openerp.tools.config
|
||||
openerp.service.stop_services()
|
||||
|
||||
if getattr(openerp, 'phoenix', False):
|
||||
# like the phoenix, reborn from ashes...
|
||||
openerp.service._reexec()
|
||||
return
|
||||
|
||||
if config['pidfile']:
|
||||
os.unlink(config['pidfile'])
|
||||
sys.exit(0)
|
||||
|
||||
def watch_parent(beat=4):
|
||||
import gevent
|
||||
ppid = os.getppid()
|
||||
while True:
|
||||
if ppid != os.getppid():
|
||||
pid = os.getpid()
|
||||
_logger.info("LongPolling (%s) Parent changed", pid)
|
||||
# suicide !!
|
||||
os.kill(pid, signal.SIGTERM)
|
||||
return
|
||||
gevent.sleep(beat)
|
||||
|
||||
def main(args):
|
||||
check_root_user()
|
||||
openerp.tools.config.parse_config(args)
|
||||
|
||||
if openerp.tools.config.options["gevent"]:
|
||||
openerp.evented = True
|
||||
_logger.info('Using gevent mode')
|
||||
import gevent.monkey
|
||||
gevent.monkey.patch_all()
|
||||
import gevent_psycopg2
|
||||
gevent_psycopg2.monkey_patch()
|
||||
|
||||
check_postgres_user()
|
||||
openerp.netsvc.init_logger()
|
||||
report_configuration()
|
||||
|
||||
config = openerp.tools.config
|
||||
|
||||
setup_signal_handlers(signal_handler)
|
||||
|
||||
if config["test_file"]:
|
||||
run_test_file(config['db_name'], config['test_file'])
|
||||
sys.exit(0)
|
||||
|
@ -265,28 +168,19 @@ def main(args):
|
|||
import_translation()
|
||||
sys.exit(0)
|
||||
|
||||
if not config["stop_after_init"]:
|
||||
setup_pid_file()
|
||||
# Some module register themselves when they are loaded so we need the
|
||||
# services to be running before loading any registry.
|
||||
if not openerp.evented:
|
||||
if config['workers']:
|
||||
openerp.service.start_services_workers()
|
||||
else:
|
||||
openerp.service.start_services()
|
||||
else:
|
||||
config['xmlrpc_port'] = config['longpolling_port']
|
||||
import gevent
|
||||
gevent.spawn(watch_parent)
|
||||
openerp.service.start_services()
|
||||
|
||||
# preload registryies, needed for -u --stop_after_init
|
||||
rc = 0
|
||||
if config['db_name']:
|
||||
for dbname in config['db_name'].split(','):
|
||||
if not preload_registry(dbname):
|
||||
rc += 1
|
||||
|
||||
if config["stop_after_init"]:
|
||||
if not config["stop_after_init"]:
|
||||
setup_pid_file()
|
||||
openerp.service.workers.start()
|
||||
if config['pidfile']:
|
||||
os.unlink(config['pidfile'])
|
||||
else:
|
||||
sys.exit(rc)
|
||||
|
||||
_logger.info('OpenERP server is running, waiting for connections...')
|
||||
|
|
|
@ -20,29 +20,12 @@
|
|||
#
|
||||
##############################################################################
|
||||
|
||||
import logging
|
||||
import os
|
||||
import signal
|
||||
import subprocess
|
||||
import sys
|
||||
import threading
|
||||
import time
|
||||
|
||||
import cron
|
||||
import wsgi_server
|
||||
|
||||
import openerp
|
||||
import openerp.modules
|
||||
import openerp.netsvc
|
||||
import openerp.osv
|
||||
from openerp.release import nt_service_name
|
||||
import openerp.tools
|
||||
from openerp.tools.misc import stripped_sys_argv
|
||||
|
||||
import common
|
||||
import db
|
||||
import model
|
||||
import report
|
||||
import wsgi_server
|
||||
import workers
|
||||
|
||||
#.apidoc title: RPC Services
|
||||
|
||||
|
@ -55,100 +38,5 @@ import report
|
|||
low-level behavior of the wire.
|
||||
"""
|
||||
|
||||
_logger = logging.getLogger(__name__)
|
||||
|
||||
def load_server_wide_modules():
|
||||
for m in openerp.conf.server_wide_modules:
|
||||
try:
|
||||
openerp.modules.module.load_openerp_module(m)
|
||||
except Exception:
|
||||
msg = ''
|
||||
if m == 'web':
|
||||
msg = """
|
||||
The `web` module is provided by the addons found in the `openerp-web` project.
|
||||
Maybe you forgot to add those addons in your addons_path configuration."""
|
||||
_logger.exception('Failed to load server-wide module `%s`.%s', m, msg)
|
||||
|
||||
start_internal_done = False
|
||||
main_thread_id = threading.currentThread().ident
|
||||
|
||||
def start_internal():
|
||||
global start_internal_done
|
||||
if start_internal_done:
|
||||
return
|
||||
openerp.netsvc.init_logger()
|
||||
|
||||
load_server_wide_modules()
|
||||
start_internal_done = True
|
||||
|
||||
def start_services():
|
||||
""" Start all services including http, and cron """
|
||||
start_internal()
|
||||
# Start the WSGI server.
|
||||
wsgi_server.start_service()
|
||||
# Start the main cron thread.
|
||||
if not openerp.evented:
|
||||
cron.start_service()
|
||||
|
||||
def stop_services():
|
||||
""" Stop all services. """
|
||||
# stop services
|
||||
if not openerp.evented:
|
||||
cron.stop_service()
|
||||
wsgi_server.stop_service()
|
||||
|
||||
_logger.info("Initiating shutdown")
|
||||
_logger.info("Hit CTRL-C again or send a second signal to force the shutdown.")
|
||||
|
||||
# Manually join() all threads before calling sys.exit() to allow a second signal
|
||||
# to trigger _force_quit() in case some non-daemon threads won't exit cleanly.
|
||||
# threading.Thread.join() should not mask signals (at least in python 2.5).
|
||||
me = threading.currentThread()
|
||||
_logger.debug('current thread: %r', me)
|
||||
for thread in threading.enumerate():
|
||||
_logger.debug('process %r (%r)', thread, thread.isDaemon())
|
||||
if thread != me and not thread.isDaemon() and thread.ident != main_thread_id:
|
||||
while thread.isAlive():
|
||||
_logger.debug('join and sleep')
|
||||
# Need a busyloop here as thread.join() masks signals
|
||||
# and would prevent the forced shutdown.
|
||||
thread.join(0.05)
|
||||
time.sleep(0.05)
|
||||
|
||||
_logger.debug('--')
|
||||
openerp.modules.registry.RegistryManager.delete_all()
|
||||
logging.shutdown()
|
||||
|
||||
def start_services_workers():
|
||||
import openerp.service.workers
|
||||
openerp.multi_process = True
|
||||
openerp.service.workers.Multicorn(openerp.service.wsgi_server.application).run()
|
||||
|
||||
def _reexec():
|
||||
"""reexecute openerp-server process with (nearly) the same arguments"""
|
||||
if openerp.tools.osutil.is_running_as_nt_service():
|
||||
subprocess.call('net stop {0} && net start {0}'.format(nt_service_name), shell=True)
|
||||
exe = os.path.basename(sys.executable)
|
||||
args = stripped_sys_argv()
|
||||
if not args or args[0] != exe:
|
||||
args.insert(0, exe)
|
||||
os.execv(sys.executable, args)
|
||||
|
||||
def restart_server():
|
||||
if openerp.multi_process:
|
||||
raise NotImplementedError("Multicorn is not supported (but gunicorn was)")
|
||||
pid = openerp.wsgi.core.arbiter_pid
|
||||
os.kill(pid, signal.SIGHUP)
|
||||
else:
|
||||
if os.name == 'nt':
|
||||
def reborn():
|
||||
stop_services()
|
||||
_reexec()
|
||||
|
||||
# run in a thread to let the current thread return response to the caller.
|
||||
threading.Thread(target=reborn).start()
|
||||
else:
|
||||
openerp.phoenix = True
|
||||
os.kill(os.getpid(), signal.SIGINT)
|
||||
|
||||
# vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4:
|
||||
|
|
|
@ -1,76 +0,0 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
##############################################################################
|
||||
#
|
||||
# OpenERP, Open Source Management Solution
|
||||
# Copyright (C) 2004-2011 OpenERP SA (<http://www.openerp.com>)
|
||||
#
|
||||
# This program is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU Affero General Public License as
|
||||
# published by the Free Software Foundation, either version 3 of the
|
||||
# License, or (at your option) any later version.
|
||||
#
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU Affero General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU Affero General Public License
|
||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
#
|
||||
##############################################################################
|
||||
|
||||
""" Cron jobs scheduling
|
||||
|
||||
Cron jobs are defined in the ir_cron table/model. This module deals with all
|
||||
cron jobs, for all databases of a single OpenERP server instance.
|
||||
|
||||
"""
|
||||
|
||||
import logging
|
||||
import threading
|
||||
import time
|
||||
from datetime import datetime
|
||||
|
||||
import openerp
|
||||
|
||||
_logger = logging.getLogger(__name__)
|
||||
|
||||
SLEEP_INTERVAL = 60 # 1 min
|
||||
|
||||
def cron_runner(number):
|
||||
while True:
|
||||
time.sleep(SLEEP_INTERVAL + number) # Steve Reich timing style
|
||||
registries = openerp.modules.registry.RegistryManager.registries
|
||||
_logger.debug('cron%d polling for jobs', number)
|
||||
for db_name, registry in registries.items():
|
||||
while True and registry.ready:
|
||||
acquired = openerp.addons.base.ir.ir_cron.ir_cron._acquire_job(db_name)
|
||||
if not acquired:
|
||||
break
|
||||
|
||||
def start_service():
|
||||
""" Start the above runner function in a daemon thread.
|
||||
|
||||
The thread is a typical daemon thread: it will never quit and must be
|
||||
terminated when the main process exits - with no consequence (the processing
|
||||
threads it spawns are not marked daemon).
|
||||
|
||||
"""
|
||||
|
||||
# Force call to strptime just before starting the cron thread
|
||||
# to prevent time.strptime AttributeError within the thread.
|
||||
# See: http://bugs.python.org/issue7980
|
||||
datetime.strptime('2012-01-01', '%Y-%m-%d')
|
||||
|
||||
for i in range(openerp.tools.config['max_cron_threads']):
|
||||
def target():
|
||||
cron_runner(i)
|
||||
t = threading.Thread(target=target, name="openerp.service.cron.cron%d" % i)
|
||||
t.setDaemon(True)
|
||||
t.start()
|
||||
_logger.debug("cron%d started!" % i)
|
||||
|
||||
def stop_service():
|
||||
pass
|
||||
|
||||
# vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4:
|
|
@ -1,7 +1,7 @@
|
|||
#-----------------------------------------------------------
|
||||
# Multicorn, multiprocessing inspired by gunicorn
|
||||
# TODO rename class: Multicorn -> Arbiter ?
|
||||
# Threaded, Gevent and Prefork Servers
|
||||
#-----------------------------------------------------------
|
||||
import datetime
|
||||
import errno
|
||||
import fcntl
|
||||
import logging
|
||||
|
@ -13,10 +13,14 @@ import select
|
|||
import signal
|
||||
import socket
|
||||
import sys
|
||||
import threading
|
||||
import time
|
||||
import traceback
|
||||
import subprocess
|
||||
import os.path
|
||||
|
||||
import wsgi_server
|
||||
|
||||
import werkzeug.serving
|
||||
try:
|
||||
from setproctitle import setproctitle
|
||||
|
@ -25,11 +29,238 @@ except ImportError:
|
|||
|
||||
import openerp
|
||||
import openerp.tools.config as config
|
||||
from openerp.release import nt_service_name
|
||||
from openerp.tools.misc import stripped_sys_argv
|
||||
|
||||
_logger = logging.getLogger(__name__)
|
||||
|
||||
class Multicorn(object):
|
||||
SLEEP_INTERVAL = 60 # 1 min
|
||||
|
||||
#----------------------------------------------------------
|
||||
# Common
|
||||
#----------------------------------------------------------
|
||||
|
||||
class CommonServer(object):
|
||||
def __init__(self, app):
|
||||
# TODO Change the xmlrpc_* options to http_*
|
||||
self.app = app
|
||||
# config
|
||||
self.interface = config['xmlrpc_interface'] or '0.0.0.0'
|
||||
self.port = config['xmlrpc_port']
|
||||
# runtime
|
||||
self.pid = os.getpid()
|
||||
|
||||
def dumpstacks(self):
|
||||
""" Signal handler: dump a stack trace for each existing thread."""
|
||||
# code from http://stackoverflow.com/questions/132058/getting-stack-trace-from-a-running-python-application#answer-2569696
|
||||
# modified for python 2.5 compatibility
|
||||
threads_info = dict([(th.ident, {'name': th.name,
|
||||
'uid': getattr(th,'uid','n/a')})
|
||||
for th in threading.enumerate()])
|
||||
code = []
|
||||
for threadId, stack in sys._current_frames().items():
|
||||
thread_info = threads_info.get(threadId)
|
||||
code.append("\n# Thread: %s (id:%s) (uid:%s)" % \
|
||||
(thread_info and thread_info['name'] or 'n/a',
|
||||
threadId,
|
||||
thread_info and thread_info['uid'] or 'n/a'))
|
||||
for filename, lineno, name, line in traceback.extract_stack(stack):
|
||||
code.append('File: "%s", line %d, in %s' % (filename, lineno, name))
|
||||
if line:
|
||||
code.append(" %s" % (line.strip()))
|
||||
_logger.info("\n".join(code))
|
||||
|
||||
def close_socket(self, sock):
|
||||
""" Closes a socket instance cleanly
|
||||
:param sock: the network socket to close
|
||||
:type sock: socket.socket
|
||||
"""
|
||||
try:
|
||||
sock.shutdown(socket.SHUT_RDWR)
|
||||
except socket.error, e:
|
||||
# On OSX, socket shutdowns both sides if any side closes it
|
||||
# causing an error 57 'Socket is not connected' on shutdown
|
||||
# of the other side (or something), see
|
||||
# http://bugs.python.org/issue4397
|
||||
# note: stdlib fixed test, not behavior
|
||||
if e.errno != errno.ENOTCONN or platform.system() not in ['Darwin', 'Windows']:
|
||||
raise
|
||||
sock.close()
|
||||
|
||||
#----------------------------------------------------------
|
||||
# Threaded
|
||||
#----------------------------------------------------------
|
||||
|
||||
class ThreadedServer(CommonServer):
|
||||
def __init__(self, app):
|
||||
super(ThreadedServer, self).__init__(app)
|
||||
self.main_thread_id = threading.currentThread().ident
|
||||
# Variable keeping track of the number of calls to the signal handler defined
|
||||
# below. This variable is monitored by ``quit_on_signals()``.
|
||||
self.quit_signals_received = 0
|
||||
|
||||
#self.socket = None
|
||||
#self.queue = []
|
||||
self.httpd = None
|
||||
|
||||
def signal_handler(self, sig, frame):
|
||||
if sig in [signal.SIGINT,signal.SIGTERM]:
|
||||
# shutdown on kill -INT or -TERM
|
||||
self.quit_signals_received += 1
|
||||
if self.quit_signals_received > 1:
|
||||
# logging.shutdown was already called at this point.
|
||||
sys.stderr.write("Forced shutdown.\n")
|
||||
os._exit(0)
|
||||
elif sig == signal.SIGHUP:
|
||||
# restart on kill -HUP
|
||||
openerp.phoenix = True
|
||||
self.quit_signals_received += 1
|
||||
elif sig == signal.SIGQUIT:
|
||||
# dump stacks on kill -3
|
||||
self.dumpstacks()
|
||||
|
||||
def cron_thread(self, number):
|
||||
while True:
|
||||
time.sleep(SLEEP_INTERVAL + number) # Steve Reich timing style
|
||||
registries = openerp.modules.registry.RegistryManager.registries
|
||||
_logger.debug('cron%d polling for jobs', number)
|
||||
for db_name, registry in registries.items():
|
||||
while True and registry.ready:
|
||||
acquired = openerp.addons.base.ir.ir_cron.ir_cron._acquire_job(db_name)
|
||||
if not acquired:
|
||||
break
|
||||
|
||||
def cron_spawn(self):
|
||||
""" Start the above runner function in a daemon thread.
|
||||
|
||||
The thread is a typical daemon thread: it will never quit and must be
|
||||
terminated when the main process exits - with no consequence (the processing
|
||||
threads it spawns are not marked daemon).
|
||||
|
||||
"""
|
||||
# Force call to strptime just before starting the cron thread
|
||||
# to prevent time.strptime AttributeError within the thread.
|
||||
# See: http://bugs.python.org/issue7980
|
||||
datetime.datetime.strptime('2012-01-01', '%Y-%m-%d')
|
||||
for i in range(openerp.tools.config['max_cron_threads']):
|
||||
def target():
|
||||
self.cron_thread(i)
|
||||
t = threading.Thread(target=target, name="openerp.service.cron.cron%d" % i)
|
||||
t.setDaemon(True)
|
||||
t.start()
|
||||
_logger.debug("cron%d started!" % i)
|
||||
|
||||
def http_thread(self):
|
||||
self.httpd = werkzeug.serving.make_server(self.interface, self.port, self.app, threaded=True)
|
||||
self.httpd.serve_forever()
|
||||
|
||||
def http_spawn(self):
|
||||
threading.Thread(target=self.http_thread).start()
|
||||
_logger.info('HTTP service (werkzeug) running on %s:%s', self.interface, self.port)
|
||||
|
||||
def start(self):
|
||||
_logger.debug("Setting signal handlers")
|
||||
if os.name == 'posix':
|
||||
signal.signal(signal.SIGINT, self.signal_handler)
|
||||
signal.signal(signal.SIGTERM, self.signal_handler)
|
||||
signal.signal(signal.SIGCHLD, self.signal_handler)
|
||||
signal.signal(signal.SIGHUP, self.signal_handler)
|
||||
signal.signal(signal.SIGQUIT, self.signal_handler)
|
||||
elif os.name == 'nt':
|
||||
import win32api
|
||||
win32api.SetConsoleCtrlHandler(lambda sig: signal_handler(sig, None), 1)
|
||||
self.cron_spawn()
|
||||
self.http_spawn()
|
||||
|
||||
def stop(self):
|
||||
""" Shutdown the WSGI server. Wait for non deamon threads.
|
||||
"""
|
||||
_logger.info("Initiating shutdown")
|
||||
_logger.info("Hit CTRL-C again or send a second signal to force the shutdown.")
|
||||
|
||||
self.httpd.shutdown()
|
||||
self.close_socket(self.httpd.socket)
|
||||
|
||||
# Manually join() all threads before calling sys.exit() to allow a second signal
|
||||
# to trigger _force_quit() in case some non-daemon threads won't exit cleanly.
|
||||
# threading.Thread.join() should not mask signals (at least in python 2.5).
|
||||
me = threading.currentThread()
|
||||
_logger.debug('current thread: %r', me)
|
||||
for thread in threading.enumerate():
|
||||
_logger.debug('process %r (%r)', thread, thread.isDaemon())
|
||||
if thread != me and not thread.isDaemon() and thread.ident != main_thread_id:
|
||||
while thread.isAlive():
|
||||
_logger.debug('join and sleep')
|
||||
# Need a busyloop here as thread.join() masks signals
|
||||
# and would prevent the forced shutdown.
|
||||
thread.join(0.05)
|
||||
time.sleep(0.05)
|
||||
|
||||
_logger.debug('--')
|
||||
openerp.modules.registry.RegistryManager.delete_all()
|
||||
logging.shutdown()
|
||||
|
||||
def run(self):
|
||||
""" Start the http server and the cron thread then wait for a signal.
|
||||
|
||||
The first SIGINT or SIGTERM signal will initiate a graceful shutdown while
|
||||
a second one if any will force an immediate exit.
|
||||
"""
|
||||
self.start()
|
||||
|
||||
# Wait for a first signal to be handled. (time.sleep will be interrupted
|
||||
# by the signal handler.) The try/except is for the win32 case.
|
||||
try:
|
||||
while self.quit_signals_received == 0:
|
||||
time.sleep(60)
|
||||
except KeyboardInterrupt:
|
||||
pass
|
||||
|
||||
self.stop()
|
||||
|
||||
#----------------------------------------------------------
|
||||
# Gevent
|
||||
#----------------------------------------------------------
|
||||
|
||||
class GeventServer(CommonServer):
|
||||
def __init__(self, app):
|
||||
super(GeventServer, self).__init__(app)
|
||||
self.port = config['longpolling_port']
|
||||
self.httpd = None
|
||||
|
||||
def watch_parent(self, beat=4):
|
||||
import gevent
|
||||
ppid = os.getppid()
|
||||
while True:
|
||||
if ppid != os.getppid():
|
||||
pid = os.getpid()
|
||||
_logger.info("LongPolling (%s) Parent changed", pid)
|
||||
# suicide !!
|
||||
os.kill(pid, signal.SIGTERM)
|
||||
return
|
||||
gevent.sleep(beat)
|
||||
|
||||
def start(self):
|
||||
import gevent
|
||||
from gevent.wsgi import WSGIServer
|
||||
gevent.spawn(self.watch_parent)
|
||||
self.httpd = WSGIServer((self.interface, self.port), self.app)
|
||||
self.httpd.serve_forever()
|
||||
|
||||
def stop(self):
|
||||
import gevent
|
||||
self.httpd.stop()
|
||||
gevent.shutdown()
|
||||
|
||||
def run(self):
|
||||
self.start()
|
||||
self.stop()
|
||||
|
||||
#----------------------------------------------------------
|
||||
# Prefork
|
||||
#----------------------------------------------------------
|
||||
|
||||
class Multicorn(CommonServer):
|
||||
""" Multiprocessing inspired by (g)unicorn.
|
||||
Multicorn currently uses accept(2) as dispatching method between workers
|
||||
but we plan to replace it by a more intelligent dispatcher to will parse
|
||||
|
@ -92,10 +323,8 @@ class Multicorn(object):
|
|||
sys.exit(0)
|
||||
|
||||
def long_polling_spawn(self):
|
||||
nargs = stripped_sys_argv('--pidfile')
|
||||
cmd = nargs[0]
|
||||
cmd = os.path.join(os.path.dirname(cmd), "openerp-long-polling")
|
||||
nargs[0] = cmd
|
||||
nargs = stripped_sys_argv('--pidfile','--workers')
|
||||
nargs += ['--gevent']
|
||||
popen = subprocess.Popen(nargs)
|
||||
self.long_polling_pid = popen.pid
|
||||
|
||||
|
@ -122,6 +351,13 @@ class Multicorn(object):
|
|||
sig = self.queue.pop(0)
|
||||
if sig in [signal.SIGINT,signal.SIGTERM]:
|
||||
raise KeyboardInterrupt
|
||||
elif sig == signal.SIGHUP:
|
||||
# restart on kill -HUP
|
||||
openerp.phoenix = True
|
||||
raise KeyboardInterrupt
|
||||
elif sig == signal.SIGQUIT:
|
||||
# dump stacks on kill -3
|
||||
self.dumpstacks()
|
||||
|
||||
def process_zombie(self):
|
||||
# reap dead workers
|
||||
|
@ -211,7 +447,6 @@ class Multicorn(object):
|
|||
for pid in self.workers.keys():
|
||||
self.worker_kill(pid, signal.SIGTERM)
|
||||
self.socket.close()
|
||||
openerp.cli.server.quit_signals_received = 1
|
||||
|
||||
def run(self):
|
||||
self.start()
|
||||
|
@ -389,7 +624,7 @@ class WorkerCron(Worker):
|
|||
def sleep(self):
|
||||
# Really sleep once all the databases have been processed.
|
||||
if self.db_index == 0:
|
||||
interval = 60 + self.pid % 10 # chorus effect
|
||||
interval = SLEEP_INTERVAL + self.pid % 10 # chorus effect
|
||||
time.sleep(interval)
|
||||
|
||||
def _db_list(self):
|
||||
|
@ -437,12 +672,66 @@ class WorkerCron(Worker):
|
|||
os.nice(10) # mommy always told me to be nice with others...
|
||||
Worker.start(self)
|
||||
self.multi.socket.close()
|
||||
openerp.service.start_internal()
|
||||
|
||||
# chorus effect: make cron workers do not all start at first database
|
||||
mct = config['max_cron_threads']
|
||||
p = float(self.pid % mct) / mct
|
||||
self.db_index = int(len(self._db_list()) * p)
|
||||
|
||||
#----------------------------------------------------------
|
||||
# start/stop public api
|
||||
#----------------------------------------------------------
|
||||
|
||||
server = None
|
||||
|
||||
def load_server_wide_modules():
|
||||
for m in openerp.conf.server_wide_modules:
|
||||
try:
|
||||
openerp.modules.module.load_openerp_module(m)
|
||||
except Exception:
|
||||
msg = ''
|
||||
if m == 'web':
|
||||
msg = """
|
||||
The `web` module is provided by the addons found in the `openerp-web` project.
|
||||
Maybe you forgot to add those addons in your addons_path configuration."""
|
||||
_logger.exception('Failed to load server-wide module `%s`.%s', m, msg)
|
||||
|
||||
def _reexec():
|
||||
"""reexecute openerp-server process with (nearly) the same arguments"""
|
||||
if openerp.tools.osutil.is_running_as_nt_service():
|
||||
subprocess.call('net stop {0} && net start {0}'.format(nt_service_name), shell=True)
|
||||
exe = os.path.basename(sys.executable)
|
||||
args = stripped_sys_argv()
|
||||
if not args or args[0] != exe:
|
||||
args.insert(0, exe)
|
||||
os.execv(sys.executable, args)
|
||||
|
||||
def start():
|
||||
""" Start the openerp http server and cron processor.
|
||||
"""
|
||||
load_server_wide_modules()
|
||||
if config['workers']:
|
||||
openerp.multi_process = True
|
||||
server = Multicorn(openerp.service.wsgi_server.application)
|
||||
elif openerp.evented:
|
||||
server = GeventServer(openerp.service.wsgi_server.application)
|
||||
else:
|
||||
server = ThreadedServer(openerp.service.wsgi_server.application)
|
||||
server.run()
|
||||
|
||||
# like the legend of the phoenix, all ends with beginnings
|
||||
if getattr(openerp, 'phoenix', False):
|
||||
_reexec()
|
||||
sys.exit(0)
|
||||
|
||||
def restart_server():
|
||||
""" Restart the server
|
||||
"""
|
||||
if os.name == 'nt':
|
||||
# run in a thread to let the current thread return response to the caller.
|
||||
threading.Thread(target=_reexec).start()
|
||||
else:
|
||||
os.kill(server.pid, signal.SIGHUP)
|
||||
|
||||
|
||||
# vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4:
|
||||
|
|
|
@ -400,8 +400,6 @@ def application_unproxied(environ, start_response):
|
|||
if hasattr(threading.current_thread(), 'dbname'):
|
||||
del threading.current_thread().dbname
|
||||
|
||||
openerp.service.start_internal()
|
||||
|
||||
# Try all handlers until one returns some result (i.e. not None).
|
||||
wsgi_handlers = [wsgi_xmlrpc_1, wsgi_xmlrpc, wsgi_xmlrpc_legacy, wsgi_webdav]
|
||||
wsgi_handlers += module_handlers
|
||||
|
@ -422,69 +420,5 @@ def application(environ, start_response):
|
|||
else:
|
||||
return application_unproxied(environ, start_response)
|
||||
|
||||
# The WSGI server, started by start_server(), stopped by stop_server().
|
||||
httpd = None
|
||||
|
||||
def serve(interface, port, threaded):
|
||||
""" Serve HTTP requests via werkzeug development server.
|
||||
|
||||
Calling this function is blocking, you might want to call it in its own
|
||||
thread.
|
||||
"""
|
||||
|
||||
global httpd
|
||||
if not openerp.evented:
|
||||
httpd = werkzeug.serving.make_server(interface, port, application, threaded=threaded)
|
||||
else:
|
||||
from gevent.wsgi import WSGIServer
|
||||
httpd = WSGIServer((interface, port), application)
|
||||
httpd.serve_forever()
|
||||
|
||||
def start_service():
|
||||
""" Call serve() in its own thread.
|
||||
|
||||
The WSGI server can be shutdown with stop_server() below.
|
||||
"""
|
||||
# TODO Change the xmlrpc_* options to http_*
|
||||
interface = config['xmlrpc_interface'] or '0.0.0.0'
|
||||
port = config['xmlrpc_port']
|
||||
_logger.info('HTTP service (werkzeug) running on %s:%s', interface, port)
|
||||
if not openerp.evented:
|
||||
threading.Thread(target=serve, args=(interface, port, True)).start()
|
||||
else:
|
||||
serve(interface, port, True)
|
||||
|
||||
def stop_service():
|
||||
""" Initiate the shutdown of the WSGI server.
|
||||
|
||||
The server is supposed to have been started by start_server() above.
|
||||
"""
|
||||
if httpd:
|
||||
if not openerp.evented:
|
||||
httpd.shutdown()
|
||||
close_socket(httpd.socket)
|
||||
else:
|
||||
import gevent
|
||||
httpd.stop()
|
||||
gevent.shutdown()
|
||||
|
||||
def close_socket(sock):
|
||||
""" Closes a socket instance cleanly
|
||||
|
||||
:param sock: the network socket to close
|
||||
:type sock: socket.socket
|
||||
"""
|
||||
try:
|
||||
sock.shutdown(socket.SHUT_RDWR)
|
||||
except socket.error, e:
|
||||
# On OSX, socket shutdowns both sides if any side closes it
|
||||
# causing an error 57 'Socket is not connected' on shutdown
|
||||
# of the other side (or something), see
|
||||
# http://bugs.python.org/issue4397
|
||||
# note: stdlib fixed test, not behavior
|
||||
if e.errno != errno.ENOTCONN or platform.system() not in ['Darwin', 'Windows']:
|
||||
raise
|
||||
sock.close()
|
||||
|
||||
|
||||
# vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4:
|
||||
|
|
Loading…
Reference in New Issue