[IMP] openerp/service/server.py: lint file

bzr revid: chs@openerp.com-20140221223544-q309ioxit8tzlaki
This commit is contained in:
Christophe Simonis 2014-02-21 23:35:44 +01:00
parent f511109440
commit d81e9debbe
1 changed files with 37 additions and 36 deletions

View File

@ -9,7 +9,6 @@ import os.path
import platform import platform
import psutil import psutil
import random import random
import re
import resource import resource
import select import select
import signal import signal
@ -32,15 +31,14 @@ except ImportError:
setproctitle = lambda x: None setproctitle = lambda x: None
import openerp import openerp
import openerp.tools.config as config from openerp.module.registry import RegistryManager
from openerp.release import nt_service_name from openerp.release import nt_service_name
import openerp.tools.config as config
from openerp.tools.misc import stripped_sys_argv, dumpstacks from openerp.tools.misc import stripped_sys_argv, dumpstacks
import wsgi_server
_logger = logging.getLogger(__name__) _logger = logging.getLogger(__name__)
SLEEP_INTERVAL = 60 # 1 min SLEEP_INTERVAL = 60 # 1 min
#---------------------------------------------------------- #----------------------------------------------------------
# Werkzeug WSGI servers patched # Werkzeug WSGI servers patched
@ -126,7 +124,7 @@ class AutoReload(object):
if len(right) < 2: if len(right) < 2:
continue continue
module = right[0] module = right[0]
self.modules[module]=1 self.modules[module] = 1
if self.modules: if self.modules:
_logger.info('autoreload: xml change detected, autoreload activated') _logger.info('autoreload: xml change detected, autoreload activated')
restart() restart()
@ -146,7 +144,7 @@ class AutoReload(object):
if py_errors: if py_errors:
_logger.info('autoreload: python code change detected, errors found') _logger.info('autoreload: python code change detected, errors found')
for i in py_errors: for i in py_errors:
_logger.info('autoreload: SyntaxError %s',i) _logger.info('autoreload: SyntaxError %s', i)
else: else:
_logger.info('autoreload: python code updated, autoreload activated') _logger.info('autoreload: python code updated, autoreload activated')
restart() restart()
@ -213,7 +211,7 @@ class ThreadedServer(CommonServer):
self.httpd = None self.httpd = None
def signal_handler(self, sig, frame): def signal_handler(self, sig, frame):
if sig in [signal.SIGINT,signal.SIGTERM]: if sig in [signal.SIGINT, signal.SIGTERM]:
# shutdown on kill -INT or -TERM # shutdown on kill -INT or -TERM
self.quit_signals_received += 1 self.quit_signals_received += 1
if self.quit_signals_received > 1: if self.quit_signals_received > 1:
@ -227,7 +225,7 @@ class ThreadedServer(CommonServer):
def cron_thread(self, number): def cron_thread(self, number):
while True: while True:
time.sleep(SLEEP_INTERVAL + number) # Steve Reich timing style time.sleep(SLEEP_INTERVAL + number) # Steve Reich timing style
registries = openerp.modules.registry.RegistryManager.registries registries = openerp.modules.registry.RegistryManager.registries
_logger.debug('cron%d polling for jobs', number) _logger.debug('cron%d polling for jobs', number)
for db_name, registry in registries.items(): for db_name, registry in registries.items():
@ -257,8 +255,8 @@ class ThreadedServer(CommonServer):
_logger.debug("cron%d started!" % i) _logger.debug("cron%d started!" % i)
def http_thread(self): def http_thread(self):
def app(e,s): def app(e, s):
return self.app(e,s) return self.app(e, s)
self.httpd = ThreadedWSGIServerReloadable(self.interface, self.port, app) self.httpd = ThreadedWSGIServerReloadable(self.interface, self.port, app)
self.httpd.serve_forever() self.httpd.serve_forever()
@ -324,7 +322,6 @@ class ThreadedServer(CommonServer):
self.stop() self.stop()
return rc return rc
# Wait for a first signal to be handled. (time.sleep will be interrupted # Wait for a first signal to be handled. (time.sleep will be interrupted
# by the signal handler.) The try/except is for the win32 case. # by the signal handler.) The try/except is for the win32 case.
try: try:
@ -440,7 +437,7 @@ class PreforkServer(CommonServer):
sys.exit(0) sys.exit(0)
def long_polling_spawn(self): def long_polling_spawn(self):
nargs = stripped_sys_argv('--pidfile','--workers') nargs = stripped_sys_argv('--pidfile', '--workers')
cmd = nargs[0] cmd = nargs[0]
cmd = os.path.join(os.path.dirname(cmd), "openerp-gevent") cmd = os.path.join(os.path.dirname(cmd), "openerp-gevent")
nargs[0] = cmd nargs[0] = cmd
@ -449,10 +446,10 @@ class PreforkServer(CommonServer):
def worker_pop(self, pid): def worker_pop(self, pid):
if pid in self.workers: if pid in self.workers:
_logger.debug("Worker (%s) unregistered",pid) _logger.debug("Worker (%s) unregistered", pid)
try: try:
self.workers_http.pop(pid,None) self.workers_http.pop(pid, None)
self.workers_cron.pop(pid,None) self.workers_cron.pop(pid, None)
u = self.workers.pop(pid) u = self.workers.pop(pid)
u.close() u.close()
except OSError: except OSError:
@ -468,7 +465,7 @@ class PreforkServer(CommonServer):
def process_signals(self): def process_signals(self):
while len(self.queue): while len(self.queue):
sig = self.queue.pop(0) sig = self.queue.pop(0)
if sig in [signal.SIGINT,signal.SIGTERM]: if sig in [signal.SIGINT, signal.SIGTERM]:
raise KeyboardInterrupt raise KeyboardInterrupt
elif sig == signal.SIGHUP: elif sig == signal.SIGHUP:
# restart on kill -HUP # restart on kill -HUP
@ -504,8 +501,8 @@ class PreforkServer(CommonServer):
def process_timeout(self): def process_timeout(self):
now = time.time() now = time.time()
for (pid, worker) in self.workers.items(): for (pid, worker) in self.workers.items():
if (worker.watchdog_timeout is not None) and \ if worker.watchdog_timeout is not None and \
(now - worker.watchdog_time >= worker.watchdog_timeout): (now - worker.watchdog_time) >= worker.watchdog_timeout:
_logger.error("Worker (%s) timeout", pid) _logger.error("Worker (%s) timeout", pid)
self.worker_kill(pid, signal.SIGKILL) self.worker_kill(pid, signal.SIGKILL)
@ -520,7 +517,7 @@ class PreforkServer(CommonServer):
def sleep(self): def sleep(self):
try: try:
# map of fd -> worker # map of fd -> worker
fds = dict([(w.watchdog_pipe[0],w) for k,w in self.workers.items()]) fds = dict([(w.watchdog_pipe[0], w) for k, w in self.workers.items()])
fd_in = fds.keys() + [self.pipe[0]] fd_in = fds.keys() + [self.pipe[0]]
# check for ping or internal wakeups # check for ping or internal wakeups
ready = select.select(fd_in, [], [], self.beat) ready = select.select(fd_in, [], [], self.beat)
@ -560,11 +557,12 @@ class PreforkServer(CommonServer):
self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.socket.setblocking(0) self.socket.setblocking(0)
self.socket.bind(self.address) self.socket.bind(self.address)
self.socket.listen(8*self.population) self.socket.listen(8 * self.population)
def stop(self, graceful=True): def stop(self, graceful=True):
if self.long_polling_pid is not None: if self.long_polling_pid is not None:
self.worker_kill(self.long_polling_pid, signal.SIGKILL) # FIXME make longpolling process handle SIGTERM correctly # FIXME make longpolling process handle SIGTERM correctly
self.worker_kill(self.long_polling_pid, signal.SIGKILL)
self.long_polling_pid = None self.long_polling_pid = None
if graceful: if graceful:
_logger.info("Stopping gracefully") _logger.info("Stopping gracefully")
@ -602,7 +600,7 @@ class PreforkServer(CommonServer):
_logger.debug("Multiprocess clean stop") _logger.debug("Multiprocess clean stop")
self.stop() self.stop()
break break
except Exception,e: except Exception, e:
_logger.exception(e) _logger.exception(e)
self.stop(False) self.stop(False)
return -1 return -1
@ -634,7 +632,7 @@ class Worker(object):
def sleep(self): def sleep(self):
try: try:
ret = select.select([self.multi.socket], [], [], self.multi.beat) select.select([self.multi.socket], [], [], self.multi.beat)
except select.error, e: except select.error, e:
if e[0] not in [errno.EINTR]: if e[0] not in [errno.EINTR]:
raise raise
@ -652,7 +650,7 @@ class Worker(object):
rss, vms = psutil.Process(os.getpid()).get_memory_info() rss, vms = psutil.Process(os.getpid()).get_memory_info()
if vms > config['limit_memory_soft']: if vms > config['limit_memory_soft']:
_logger.info('Worker (%d) virtual memory limit (%s) reached.', self.pid, vms) _logger.info('Worker (%d) virtual memory limit (%s) reached.', self.pid, vms)
self.alive = False # Commit suicide after the request. self.alive = False # Commit suicide after the request.
# VMS and RLIMIT_AS are the same thing: virtual memory, a.k.a. address space # VMS and RLIMIT_AS are the same thing: virtual memory, a.k.a. address space
soft, hard = resource.getrlimit(resource.RLIMIT_AS) soft, hard = resource.getrlimit(resource.RLIMIT_AS)
@ -700,7 +698,7 @@ class Worker(object):
self.process_work() self.process_work()
_logger.info("Worker (%s) exiting. request_count: %s.", self.pid, self.request_count) _logger.info("Worker (%s) exiting. request_count: %s.", self.pid, self.request_count)
self.stop() self.stop()
except Exception,e: except Exception:
_logger.exception("Worker (%s) Exception occured, exiting..." % self.pid) _logger.exception("Worker (%s) Exception occured, exiting..." % self.pid)
# should we use 3 to abort everything ? # should we use 3 to abort everything ?
sys.exit(1) sys.exit(1)
@ -718,7 +716,7 @@ class WorkerHTTP(Worker):
# tolerate broken pipe when the http client closes the socket before # tolerate broken pipe when the http client closes the socket before
# receiving the full reply # receiving the full reply
try: try:
self.server.process_request(client,addr) self.server.process_request(client, addr)
except IOError, e: except IOError, e:
if e.errno != errno.EPIPE: if e.errno != errno.EPIPE:
raise raise
@ -749,7 +747,7 @@ class WorkerCron(Worker):
def sleep(self): def sleep(self):
# Really sleep once all the databases have been processed. # Really sleep once all the databases have been processed.
if self.db_index == 0: if self.db_index == 0:
interval = SLEEP_INTERVAL + self.pid % 10 # chorus effect interval = SLEEP_INTERVAL + self.pid % 10 # chorus effect
time.sleep(interval) time.sleep(interval)
def _db_list(self): def _db_list(self):
@ -771,7 +769,7 @@ class WorkerCron(Worker):
if rpc_request_flag: if rpc_request_flag:
start_time = time.time() start_time = time.time()
start_rss, start_vms = psutil.Process(os.getpid()).get_memory_info() start_rss, start_vms = psutil.Process(os.getpid()).get_memory_info()
import openerp.addons.base as base import openerp.addons.base as base
base.ir.ir_cron.ir_cron._acquire_job(db_name) base.ir.ir_cron.ir_cron._acquire_job(db_name)
openerp.modules.registry.RegistryManager.delete(db_name) openerp.modules.registry.RegistryManager.delete(db_name)
@ -780,16 +778,18 @@ class WorkerCron(Worker):
if len(db_names) > 1: if len(db_names) > 1:
openerp.sql_db.close_db(db_name) openerp.sql_db.close_db(db_name)
if rpc_request_flag: if rpc_request_flag:
end_time = time.time() run_time = time.time() - start_time
end_rss, end_vms = psutil.Process(os.getpid()).get_memory_info() end_rss, end_vms = psutil.Process(os.getpid()).get_memory_info()
logline = '%s time:%.3fs mem: %sk -> %sk (diff: %sk)' % (db_name, end_time - start_time, start_vms / 1024, end_vms / 1024, (end_vms - start_vms)/1024) vms_diff = (end_vms - start_vms) / 1024
logline = '%s time:%.3fs mem: %sk -> %sk (diff: %sk)' % \
(db_name, run_time, start_vms / 1024, end_vms / 1024, vms_diff)
_logger.debug("WorkerCron (%s) %s", self.pid, logline) _logger.debug("WorkerCron (%s) %s", self.pid, logline)
self.request_count += 1 self.request_count += 1
if self.request_count >= self.request_max and self.request_max < len(db_names): if self.request_count >= self.request_max and self.request_max < len(db_names):
_logger.error("There are more dabatases to process than allowed " _logger.error("There are more dabatases to process than allowed "
"by the `limit_request` configuration variable: %s more.", "by the `limit_request` configuration variable: %s more.",
len(db_names) - self.request_max) len(db_names) - self.request_max)
else: else:
self.db_index = 0 self.db_index = 0
@ -822,7 +822,7 @@ def _reexec(updated_modules=None):
subprocess.call('net stop {0} && net start {0}'.format(nt_service_name), shell=True) subprocess.call('net stop {0} && net start {0}'.format(nt_service_name), shell=True)
exe = os.path.basename(sys.executable) exe = os.path.basename(sys.executable)
args = stripped_sys_argv() args = stripped_sys_argv()
args += ["-u", ','.join(updated_modules)] args += ["-u", ','.join(updated_modules)]
if not args or args[0] != exe: if not args or args[0] != exe:
args.insert(0, exe) args.insert(0, exe)
os.execv(sys.executable, args) os.execv(sys.executable, args)
@ -843,7 +843,8 @@ def load_test_file_py(registry, test_file):
for t in unittest2.TestLoader().loadTestsFromModule(mod_mod): for t in unittest2.TestLoader().loadTestsFromModule(mod_mod):
suite.addTest(t) suite.addTest(t)
_logger.log(logging.INFO, 'running tests %s.', mod_mod.__name__) _logger.log(logging.INFO, 'running tests %s.', mod_mod.__name__)
result = unittest2.TextTestRunner(verbosity=2, stream=openerp.modules.module.TestStream()).run(suite) stream = openerp.modules.module.TestStream()
result = unittest2.TextTestRunner(verbosity=2, stream=stream).run(suite)
success = result.wasSuccessful() success = result.wasSuccessful()
registry._assertion_report.report_result(success) registry._assertion_report.report_result(success)
if not success: if not success:
@ -859,7 +860,7 @@ def preload_registries(dbnames):
for dbname in dbnames: for dbname in dbnames:
try: try:
update_module = config['init'] or config['update'] update_module = config['init'] or config['update']
registry = openerp.modules.registry.RegistryManager.new(dbname, update_module=update_module) registry = RegistryManager.new(dbname, update_module=update_module)
# run test_file if provided # run test_file if provided
if test_file: if test_file:
_logger.info('loading test file %s', test_file) _logger.info('loading test file %s', test_file)