[FIX] put long polling worker in a completely separate process to avoid problems with gevent monkey patching
bzr revid: nicolas.vanhoren@openerp.com-20130610164954-d55rurfkwt95ibqp
This commit is contained in:
commit
3587f43662
|
@ -0,0 +1,11 @@
|
|||
#!/usr/bin/env python
|
||||
|
||||
import gevent.monkey
|
||||
gevent.monkey.patch_all()
|
||||
import gevent_psycopg2
|
||||
gevent_psycopg2.monkey_patch()
|
||||
|
||||
import openerp
|
||||
|
||||
if __name__ == "__main__":
|
||||
openerp.cli.main()
|
|
@ -23,6 +23,14 @@
|
|||
|
||||
"""
|
||||
|
||||
import sys
|
||||
|
||||
# Is the server running with gevent.
|
||||
evented = False
|
||||
if sys.modules.get("gevent") is not None:
|
||||
evented = True
|
||||
|
||||
|
||||
# Make sure the OpenERP server runs in UTC. This is especially necessary
|
||||
# under Windows as under Linux it seems the real import of time is
|
||||
# sufficiently deferred so that setting the TZ environment variable
|
||||
|
@ -61,8 +69,6 @@ wsgi.register_wsgi_handler = wsgi.wsgi_server.register_wsgi_handler
|
|||
# its own copy of the data structure and we don't need to care about
|
||||
# locks between threads.
|
||||
multi_process = False
|
||||
# Is the server running with gevent.
|
||||
evented = False
|
||||
|
||||
def registry(database_name):
|
||||
"""
|
||||
|
|
|
@ -221,6 +221,18 @@ def quit_on_signals():
|
|||
os.unlink(config['pidfile'])
|
||||
sys.exit(0)
|
||||
|
||||
def watch_parent(beat=4):
|
||||
import gevent
|
||||
ppid = os.getppid()
|
||||
while True:
|
||||
if ppid != os.getppid():
|
||||
pid = os.getpid()
|
||||
_logger.info("LongPolling (%s) Parent changed", pid)
|
||||
# suicide !!
|
||||
os.kill(pid, signal.SIGTERM)
|
||||
return
|
||||
gevent.sleep(beat)
|
||||
|
||||
def main(args):
|
||||
check_root_user()
|
||||
openerp.tools.config.parse_config(args)
|
||||
|
@ -257,9 +269,15 @@ def main(args):
|
|||
setup_pid_file()
|
||||
# Some module register themselves when they are loaded so we need the
|
||||
# services to be running before loading any registry.
|
||||
if config['workers']:
|
||||
openerp.service.start_services_workers()
|
||||
if not openerp.evented:
|
||||
if config['workers']:
|
||||
openerp.service.start_services_workers()
|
||||
else:
|
||||
openerp.service.start_services()
|
||||
else:
|
||||
config['xmlrpc_port'] = config['longpolling_port']
|
||||
import gevent
|
||||
gevent.spawn(watch_parent)
|
||||
openerp.service.start_services()
|
||||
|
||||
rc = 0
|
||||
|
|
|
@ -14,6 +14,8 @@ import signal
|
|||
import socket
|
||||
import sys
|
||||
import time
|
||||
import subprocess
|
||||
import os.path
|
||||
|
||||
import werkzeug.serving
|
||||
try:
|
||||
|
@ -35,7 +37,6 @@ class Multicorn(object):
|
|||
def __init__(self, app):
|
||||
# config
|
||||
self.address = (config['xmlrpc_interface'] or '0.0.0.0', config['xmlrpc_port'])
|
||||
self.long_polling_address = (config['xmlrpc_interface'] or '0.0.0.0', config['longpolling_port'])
|
||||
self.population = config['workers']
|
||||
self.timeout = config['limit_time_real']
|
||||
self.limit_request = config['limit_request']
|
||||
|
@ -46,10 +47,10 @@ class Multicorn(object):
|
|||
self.socket = None
|
||||
self.workers_http = {}
|
||||
self.workers_cron = {}
|
||||
self.workers_longpolling = {}
|
||||
self.workers = {}
|
||||
self.generation = 0
|
||||
self.queue = []
|
||||
self.long_polling_pid = None
|
||||
|
||||
def pipe_new(self):
|
||||
pipe = os.pipe()
|
||||
|
@ -89,6 +90,14 @@ class Multicorn(object):
|
|||
worker.run()
|
||||
sys.exit(0)
|
||||
|
||||
def long_polling_spawn(self):
|
||||
nargs = [] + sys.argv
|
||||
cmd = nargs[0]
|
||||
cmd = os.path.join(os.path.dirname(cmd), "openerp-long-polling")
|
||||
nargs[0] = cmd
|
||||
popen = subprocess.Popen(nargs)
|
||||
self.long_polling_pid = popen.pid
|
||||
|
||||
def worker_pop(self, pid):
|
||||
if pid in self.workers:
|
||||
_logger.debug("Worker (%s) unregistered",pid)
|
||||
|
@ -143,8 +152,8 @@ class Multicorn(object):
|
|||
self.worker_spawn(WorkerHTTP, self.workers_http)
|
||||
while len(self.workers_cron) < config['max_cron_threads']:
|
||||
self.worker_spawn(WorkerCron, self.workers_cron)
|
||||
while len(self.workers_longpolling) < 1:
|
||||
self.worker_spawn(WorkerLongPolling, self.workers_longpolling)
|
||||
if not self.long_polling_pid:
|
||||
self.long_polling_spawn()
|
||||
|
||||
def sleep(self):
|
||||
try:
|
||||
|
@ -183,14 +192,11 @@ class Multicorn(object):
|
|||
self.socket.setblocking(0)
|
||||
self.socket.bind(self.address)
|
||||
self.socket.listen(8*self.population)
|
||||
# long polling socket
|
||||
self.long_polling_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
self.long_polling_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
|
||||
self.long_polling_socket.setblocking(0)
|
||||
self.long_polling_socket.bind(self.long_polling_address)
|
||||
self.long_polling_socket.listen(8)
|
||||
|
||||
def stop(self, graceful=True):
|
||||
if self.long_polling_pid is not None:
|
||||
self.worker_kill(self.long_polling_pid, signal.SIGTERM)
|
||||
self.long_polling_pid = None
|
||||
if graceful:
|
||||
_logger.info("Stopping gracefully")
|
||||
limit = time.time() + self.timeout
|
||||
|
@ -353,49 +359,8 @@ class WorkerHTTP(Worker):
|
|||
|
||||
def start(self):
|
||||
Worker.start(self)
|
||||
self.multi.long_polling_socket.close()
|
||||
self.server = WorkerBaseWSGIServer(self.multi.app)
|
||||
|
||||
class WorkerLongPolling(Worker):
|
||||
""" Long polling workers """
|
||||
def __init__(self, multi):
|
||||
super(WorkerLongPolling, self).__init__(multi)
|
||||
# Disable the watchdog feature for this kind of worker.
|
||||
self.watchdog_timeout = None
|
||||
|
||||
def watch_parent(self):
|
||||
import gevent
|
||||
while True:
|
||||
if self.ppid != os.getppid():
|
||||
_logger.info("WorkerLongPolling (%s) Parent changed", self.pid)
|
||||
os.kill(os.getpid(), signal.SIGTERM)
|
||||
return
|
||||
gevent.sleep(self.multi.beat)
|
||||
|
||||
def start(self):
|
||||
openerp.evented = True
|
||||
_logger.info('Using gevent mode')
|
||||
import gevent.monkey
|
||||
gevent.monkey.patch_all()
|
||||
import gevent_psycopg2
|
||||
gevent_psycopg2.monkey_patch()
|
||||
from openerp.modules.registry import RegistryManager
|
||||
from gevent.coros import RLock
|
||||
RegistryManager.registries_lock = RLock()
|
||||
|
||||
Worker.start(self)
|
||||
self.multi.socket.close()
|
||||
|
||||
import gevent
|
||||
watcher = gevent.spawn(self.watch_parent)
|
||||
|
||||
log = _logger.getChild(self.__class__.__name__)
|
||||
log.write = lambda msg: log.info(msg.strip())
|
||||
|
||||
from gevent.wsgi import WSGIServer
|
||||
self.server = WSGIServer(self.multi.long_polling_socket, self.multi.app, log=log)
|
||||
self.server.serve_forever()
|
||||
|
||||
class WorkerBaseWSGIServer(werkzeug.serving.BaseWSGIServer):
|
||||
""" werkzeug WSGI Server patched to allow using an external listen socket
|
||||
"""
|
||||
|
@ -469,7 +434,6 @@ class WorkerCron(Worker):
|
|||
def start(self):
|
||||
Worker.start(self)
|
||||
self.multi.socket.close()
|
||||
self.multi.long_polling_socket.close()
|
||||
openerp.service.start_internal()
|
||||
|
||||
# vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4:
|
||||
|
|
|
@ -449,7 +449,10 @@ def start_service():
|
|||
interface = config['xmlrpc_interface'] or '0.0.0.0'
|
||||
port = config['xmlrpc_port']
|
||||
_logger.info('HTTP service (werkzeug) running on %s:%s', interface, port)
|
||||
threading.Thread(target=serve, args=(interface, port, True)).start()
|
||||
if not openerp.evented:
|
||||
threading.Thread(target=serve, args=(interface, port, True)).start()
|
||||
else:
|
||||
serve(interface, port, True)
|
||||
|
||||
def stop_service():
|
||||
""" Initiate the shutdown of the WSGI server.
|
||||
|
|
Loading…
Reference in New Issue