#-----------------------------------------------------------
# Multicorn, multiprocessing inspired by gunicorn
# TODO rename class: Multicorn -> Arbiter ?
#-----------------------------------------------------------
import errno
import fcntl
import logging
import os
import psutil
import random
import resource
import select
import signal
import socket
import sys
import time

import werkzeug.serving

import openerp
import openerp.tools.config as config

_logger = logging.getLogger(__name__)

class Multicorn(object):
    """ Multiprocessing inspired by (g)unicorn.
    Multicorn currently uses accept(2) as dispatching method between workers
    but we plan to replace it by a more intelligent dispatcher that will parse
    the first HTTP request line.
    """
    def __init__(self, app):
        # config
        self.address = (config['xmlrpc_interface'] or '0.0.0.0', config['xmlrpc_port'])
        self.population = config['workers']
        self.timeout = config['limit_time_real']
        self.limit_request = config['limit_request']
        # working vars
        self.beat = 4
        self.app = app
        self.pid = os.getpid()
        self.socket = None
        self.workers_http = {}
        self.workers_cron = {}
        self.workers = {}
        self.generation = 0
        self.queue = []

    def pipe_new(self):
        pipe = os.pipe()
        for fd in pipe:
            # non_blocking
            flags = fcntl.fcntl(fd, fcntl.F_GETFL) | os.O_NONBLOCK
            fcntl.fcntl(fd, fcntl.F_SETFL, flags)
            # close_on_exec
            flags = fcntl.fcntl(fd, fcntl.F_GETFD) | fcntl.FD_CLOEXEC
            fcntl.fcntl(fd, fcntl.F_SETFD, flags)
        return pipe

    def pipe_ping(self, pipe):
        try:
            os.write(pipe[1], '.')
        except (IOError, OSError), e:
            # os.write() raises OSError; a full pipe (EAGAIN) or an
            # interrupted write (EINTR) is harmless for a wakeup ping
            if e.errno not in [errno.EAGAIN, errno.EINTR]:
                raise

    def signal_handler(self, sig, frame):
        # keep the queue short, but never drop SIGCHLD
        if len(self.queue) < 5 or sig == signal.SIGCHLD:
            self.queue.append(sig)
            self.pipe_ping(self.pipe)
        else:
            _logger.warn("Dropping signal: %s", sig)

    def worker_spawn(self, klass, workers_registry):
        self.generation += 1
        worker = klass(self)
        pid = os.fork()
        if pid != 0:
            # parent: register the child
            worker.pid = pid
            self.workers[pid] = worker
            workers_registry[pid] = worker
            return worker
        else:
            # child: run the worker loop, then exit
            worker.run()
            sys.exit(0)

    def worker_pop(self, pid):
        if pid in self.workers:
            _logger.debug("Worker (%s) unregistered", pid)
            try:
                self.workers_http.pop(pid, None)
                self.workers_cron.pop(pid, None)
                u = self.workers.pop(pid)
                u.close()
            except OSError:
                return

    def worker_kill(self, pid, sig):
        try:
            os.kill(pid, sig)
        except OSError, e:
            if e.errno == errno.ESRCH:
                self.worker_pop(pid)

    def process_signals(self):
        while len(self.queue):
            sig = self.queue.pop(0)
            if sig in [signal.SIGINT, signal.SIGTERM]:
                raise KeyboardInterrupt

    def process_zombie(self):
        # reap dead workers
        while 1:
            try:
                wpid, status = os.waitpid(-1, os.WNOHANG)
                if not wpid:
                    break
                # exit code 3 means the worker asked to abort everything
                if (status >> 8) == 3:
                    msg = "Critical worker error (%s)"
                    _logger.critical(msg, wpid)
                    raise Exception(msg % wpid)
                self.worker_pop(wpid)
            except OSError, e:
                if e.errno == errno.ECHILD:
                    break
                raise

    def process_timeout(self):
        now = time.time()
        for (pid, worker) in self.workers.items():
            if now - worker.watchdog_time >= worker.watchdog_timeout:
                _logger.error("Worker (%s) timeout", pid)
                self.worker_kill(pid, signal.SIGKILL)

    def process_spawn(self):
        while len(self.workers_http) < self.population:
            self.worker_spawn(WorkerHTTP, self.workers_http)
        while len(self.workers_cron) < 1: # config option ?
            self.worker_spawn(WorkerCron, self.workers_cron)

    def sleep(self):
        try:
            # map of fd -> worker
            fds = dict([(w.watchdog_pipe[0], w) for k, w in self.workers.items()])
            fd_in = fds.keys() + [self.pipe[0]]
            # check for ping or internal wakeups
            ready = select.select(fd_in, [], [], self.beat)
            # update worker watchdogs
            for fd in ready[0]:
                if fd in fds:
                    fds[fd].watchdog_time = time.time()
                try:
                    # empty pipe
                    while os.read(fd, 1):
                        pass
                except OSError, e:
                    if e.errno not in [errno.EAGAIN]:
                        raise
        except select.error, e:
            if e[0] not in [errno.EINTR]:
                raise

    def start(self):
        # wakeup pipe, python doesn't throw EINTR when a syscall is interrupted
        # by a signal simulating a pseudo SA_RESTART. We write to a pipe in the
        # signal handler to overcome this behaviour
        self.pipe = self.pipe_new()
        # set signal
        signal.signal(signal.SIGINT, self.signal_handler)
        signal.signal(signal.SIGTERM, self.signal_handler)
        signal.signal(signal.SIGCHLD, self.signal_handler)
        # listen to socket
        self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.socket.setblocking(0)
        self.socket.bind(self.address)
        self.socket.listen(8)

    def stop(self, graceful=True):
        if graceful:
            _logger.info("Stopping gracefully")
            limit = time.time() + self.timeout
            for pid in self.workers.keys():
                self.worker_kill(pid, signal.SIGTERM)
            while self.workers and time.time() < limit:
                self.process_zombie()
                time.sleep(0.1)
        else:
            _logger.info("Stopping forcefully")
        for pid in self.workers.keys():
            self.worker_kill(pid, signal.SIGTERM)
        self.socket.close()
        import __main__
        __main__.quit_signals_received = 1

    def run(self):
        self.start()
        _logger.debug("Multiprocess starting")
        while 1:
            try:
                #_logger.debug("Multiprocess beat (%s)",time.time())
                self.process_signals()
                self.process_zombie()
                self.process_timeout()
                self.process_spawn()
                self.sleep()
            except KeyboardInterrupt:
                _logger.debug("Multiprocess clean stop")
                self.stop()
                break
            except Exception, e:
                _logger.exception(e)
                self.stop(False)
                sys.exit(-1)

class Worker(object):
    """ Workers """
    def __init__(self, multi):
        self.multi = multi
        self.watchdog_time = time.time()
        self.watchdog_pipe = multi.pipe_new()
        self.watchdog_timeout = multi.timeout
        self.ppid = os.getpid()
        self.pid = None
        self.alive = True
        # should we rename into lifetime ?
        self.request_max = multi.limit_request
        self.request_count = 0

    def close(self):
        os.close(self.watchdog_pipe[0])
        os.close(self.watchdog_pipe[1])

    def signal_handler(self, sig, frame):
        self.alive = False

    def sleep(self):
        try:
            ret = select.select([self.multi.socket], [], [], self.multi.beat)
        except select.error, e:
            if e[0] not in [errno.EINTR]:
                raise

    def process_limit(self):
        # If our parent changed, commit suicide
        if self.ppid != os.getppid():
            _logger.info("Worker (%s) Parent changed", self.pid)
            self.alive = False
        # check for lifetime
        if self.request_count >= self.request_max:
            _logger.info("Worker (%d) max request (%s) reached.", self.pid, self.request_count)
            self.alive = False
        # Reset the worker if it consumes too much memory (e.g. caused by a memory leak).
        rss, vms = psutil.Process(os.getpid()).get_memory_info()
        if vms > config['limit_memory_soft']:
            _logger.info('Virtual memory consumption too high, rebooting the worker.')
            self.alive = False # Commit suicide after the request.

        # VMS and RLIMIT_AS are the same thing: virtual memory, a.k.a. address space
        soft, hard = resource.getrlimit(resource.RLIMIT_AS)
        resource.setrlimit(resource.RLIMIT_AS, (config['limit_memory_hard'], hard))

        # SIGXCPU (exceeded CPU time) signal handler will raise an exception.
        r = resource.getrusage(resource.RUSAGE_SELF)
        cpu_time = r.ru_utime + r.ru_stime
        def time_expired(n, stack):
            _logger.info('CPU time limit exceeded.')
            raise Exception('CPU time limit exceeded.')
        signal.signal(signal.SIGXCPU, time_expired)
        soft, hard = resource.getrlimit(resource.RLIMIT_CPU)
        resource.setrlimit(resource.RLIMIT_CPU, (cpu_time + config['limit_time_cpu'], hard))

    def process_work(self):
        pass

    def start(self):
        self.pid = os.getpid()
        _logger.info("Worker %s (%s) alive", self.__class__.__name__, self.pid)
        # Reseed the random number generator
        random.seed()
        # Prevent fd inheritance: close_on_exec
        flags = fcntl.fcntl(self.multi.socket, fcntl.F_GETFD) | fcntl.FD_CLOEXEC
        fcntl.fcntl(self.multi.socket, fcntl.F_SETFD, flags)
        # reset blocking status
        self.multi.socket.setblocking(0)
        signal.signal(signal.SIGINT, self.signal_handler)
        signal.signal(signal.SIGTERM, signal.SIG_DFL)
        signal.signal(signal.SIGCHLD, signal.SIG_DFL)

    def stop(self):
        pass

    def run(self):
        try:
            self.start()
            while self.alive:
                self.process_limit()
                self.multi.pipe_ping(self.watchdog_pipe)
                self.sleep()
                self.process_work()
            _logger.info("Worker (%s) exiting...", self.pid)
            self.stop()
        except Exception, e:
            _logger.exception("Worker (%s) Exception occurred, exiting...", self.pid)
            # should we use 3 to abort everything ?
            sys.exit(1)

class WorkerHTTP(Worker):
    """ HTTP Request workers """
    def process_request(self, client, addr):
        client.setblocking(1)
        client.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
        # Prevent fd inheritance: close_on_exec
        flags = fcntl.fcntl(client, fcntl.F_GETFD) | fcntl.FD_CLOEXEC
        fcntl.fcntl(client, fcntl.F_SETFD, flags)
        # do request using WorkerBaseWSGIServer monkey patched with socket
        self.server.socket = client
        self.server.process_request(client, addr)
        self.request_count += 1

    def process_work(self):
        try:
            client, addr = self.multi.socket.accept()
            self.process_request(client, addr)
        except socket.error, e:
            if e[0] not in (errno.EAGAIN, errno.ECONNABORTED):
                raise

    def start(self):
        Worker.start(self)
        self.server = WorkerBaseWSGIServer(self.multi.app)

class WorkerBaseWSGIServer(werkzeug.serving.BaseWSGIServer):
    """ werkzeug WSGI Server patched to allow using an external listen socket
    """
    def __init__(self, app):
        werkzeug.serving.BaseWSGIServer.__init__(self, "1", "1", app)

    def server_bind(self):
        # we don't bind because we use the listen socket of Multicorn#socket
        # instead we close the socket
        if self.socket:
            self.socket.close()

    def server_activate(self):
        # don't listen as we use Multicorn#socket
        pass

class WorkerCron(Worker):
    """ Cron workers """
    def sleep(self):
        time.sleep(60)

    def process_work(self):
        if config['db_name']:
            db_names = config['db_name'].split(',')
        else:
            db_names = openerp.netsvc.ExportService._services['db'].exp_list(True)
        for db_name in db_names:
            while True:
                # TODO Each job should be considered as one request in multiprocessing
                acquired = openerp.addons.base.ir.ir_cron.ir_cron._acquire_job(db_name)
                if not acquired:
                    break
        self.request_count += 1

    def start(self):
        Worker.start(self)
        openerp.service.start_internal()

# vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4: