new test execution engine

remove deprecated zipfile support
add preload_registry option when server is running
allow registries to be used in contruction in test mode
add a rollback test case for http tests
add a phantomjs helper

bzr revid: al@openerp.com-20140209004005-p5pwym4sqc23vw5b
This commit is contained in:
Antony Lesuisse 2014-02-09 01:40:05 +01:00
parent 82372e6a02
commit 278ed718e9
10 changed files with 240 additions and 293 deletions

View File

@ -89,16 +89,6 @@ def setup_pid_file():
pidtext = "%d" % (os.getpid()) pidtext = "%d" % (os.getpid())
fd.write(pidtext) fd.write(pidtext)
def preload_registry(dbname):
""" Preload a registry, and start the cron."""
try:
update_module = True if openerp.tools.config['init'] or openerp.tools.config['update'] else False
registry = openerp.modules.registry.RegistryManager.new(dbname, update_module=update_module)
except Exception:
_logger.exception('Failed to initialize database `%s`.', dbname)
return False
return registry._assertion_report.failures == 0
def run_test_file(dbname, test_file): def run_test_file(dbname, test_file):
""" Preload a registry, possibly run a test file, and start the cron.""" """ Preload a registry, possibly run a test file, and start the cron."""
try: try:
@ -172,23 +162,17 @@ def main(args):
if config['workers']: if config['workers']:
openerp.multi_process = True openerp.multi_process = True
# preload registries, needed for -u --stop_after_init preload = []
rc = 0
if config['db_name']: if config['db_name']:
for dbname in config['db_name'].split(','): preload = config['db_name'].split(',')
if not preload_registry(dbname):
rc += 1
if not config["stop_after_init"]: stop = config["stop_after_init"]
setup_pid_file()
openerp.service.server.start()
if config['pidfile']:
os.unlink(config['pidfile'])
else:
sys.exit(rc)
_logger.info('OpenERP server is running, waiting for connections...') setup_pid_file()
quit_on_signals() rc = openerp.service.server.start(preload=preload, stop=stop)
if config['pidfile']:
os.unlink(config['pidfile'])
sys.exit(rc)
class Server(Command): class Server(Command):
def run(self, args): def run(self, args):

View File

@ -168,7 +168,10 @@ class WebRequest(object):
""" """
# some magic to lazy create the cr # some magic to lazy create the cr
if not self._cr: if not self._cr:
self._cr = self.registry.db.cursor() if openerp.tools.config['test_enable'] and self.session_id in openerp.tests.common.HTTP_SESSION:
self._cr = openerp.tests.common.HTTP_SESSION[self.session_id]
else:
self._cr = self.registry.db.cursor()
return self._cr return self._cr
def __enter__(self): def __enter__(self):
@ -177,7 +180,7 @@ class WebRequest(object):
def __exit__(self, exc_type, exc_value, traceback): def __exit__(self, exc_type, exc_value, traceback):
_request_stack.pop() _request_stack.pop()
if self._cr: if self._cr and not (openerp.tools.config['test_enable'] and self.session_id in openerp.tests.common.HTTP_SESSION):
if exc_type is None: if exc_type is None:
self._cr.commit() self._cr.commit()
self._cr.close() self._cr.close()

View File

@ -26,16 +26,10 @@
from . import db, graph, loading, migration, module, registry from . import db, graph, loading, migration, module, registry
# TODO temporarily expose those things
from openerp.modules.module import \
get_modules, get_modules_with_version, \
load_information_from_description_file, \
get_module_resource, zip_directory, \
get_module_path, initialize_sys_path, \
load_openerp_module, init_module_models, \
adapt_version
from openerp.modules.loading import load_modules from openerp.modules.loading import load_modules
from openerp.modules.module import get_modules, get_modules_with_version, \
load_information_from_description_file, get_module_resource, get_module_path, \
initialize_sys_path, load_openerp_module, init_module_models, adapt_version
# vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4: # vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4:

View File

@ -182,18 +182,12 @@ def load_module_graph(cr, graph, status=None, perform_checks=True, skip_modules=
migrations.migrate_module(package, 'post') migrations.migrate_module(package, 'post')
if has_demo: if has_demo:
# launch tests only in demo mode, as most tests will depend # launch tests only in demo mode, allowing tests to use demo data.
# on demo data. Other tests can be added into the regular
# 'data' section, but should probably not alter the data,
# as there is no rollback.
if tools.config.options['test_enable']: if tools.config.options['test_enable']:
# Yamel test
report.record_result(load_test(module_name, idref, mode)) report.record_result(load_test(module_name, idref, mode))
# Python tests
# Run the `fast_suite` and `checks` tests given by the module. report.record_result(openerp.modules.module.run_unit_tests(module_name, cr.dbname))
if module_name == 'base':
# Also run the core tests after the database is created.
report.record_result(openerp.modules.module.run_unit_tests('openerp'))
report.record_result(openerp.modules.module.run_unit_tests(module_name))
processed_modules.append(package.name) processed_modules.append(package.name)
@ -247,7 +241,6 @@ def load_marked_modules(cr, graph, states, force, progressdict, report, loaded_m
if not processed: break if not processed: break
return processed_modules return processed_modules
def load_modules(db, force_demo=False, status=None, update_module=False): def load_modules(db, force_demo=False, status=None, update_module=False):
# TODO status['progress'] reporting is broken: used twice (and reset each # TODO status['progress'] reporting is broken: used twice (and reset each
# time to zero) in load_module_graph, not fine-grained enough. # time to zero) in load_module_graph, not fine-grained enough.

View File

@ -20,32 +20,31 @@
# #
############################################################################## ##############################################################################
import base64
import imp import imp
import itertools import itertools
import logging
import os import os
from os.path import join as opj import re
import sys import sys
import types import types
import zipimport
import openerp.tools as tools
import openerp.tools.osutil as osutil
from openerp.tools.safe_eval import safe_eval as eval
import zipfile
import openerp.release as release
import re
import base64
from zipfile import PyZipFile, ZIP_DEFLATED
from cStringIO import StringIO from cStringIO import StringIO
from os.path import join as opj
import logging import unittest2
import openerp
import openerp.tools as tools
import openerp.release as release
from openerp.tools.safe_eval import safe_eval as eval
_logger = logging.getLogger(__name__) _logger = logging.getLogger(__name__)
_test_logger = logging.getLogger('openerp.tests') _test_logger = logging.getLogger('openerp.tests')
# addons path ','.joined
_ad = os.path.join(os.path.dirname(os.path.dirname(__file__)), 'addons') # default addons path (base) _ad = os.path.join(os.path.dirname(os.path.dirname(__file__)), 'addons') # default addons path (base)
# addons path as a list
ad_paths = [] ad_paths = []
# Modules already loaded # Modules already loaded
@ -120,7 +119,6 @@ def get_module_path(module, downloaded=False, display_warning=True):
_logger.warning('module %s: module not found', module) _logger.warning('module %s: module not found', module)
return False return False
def get_module_filetree(module, dir='.'): def get_module_filetree(module, dir='.'):
path = get_module_path(module) path = get_module_path(module)
if not path: if not path:
@ -132,12 +130,7 @@ def get_module_filetree(module, dir='.'):
if dir.startswith('..') or (dir and dir[0] == '/'): if dir.startswith('..') or (dir and dir[0] == '/'):
raise Exception('Cannot access file outside the module') raise Exception('Cannot access file outside the module')
if not os.path.isdir(path): files = openerp.tools.osutil.listdir(path, True)
# zipmodule
zip = zipfile.ZipFile(path + ".zip")
files = ['/'.join(f.split('/')[1:]) for f in zip.namelist()]
else:
files = osutil.listdir(path, True)
tree = {} tree = {}
for f in files: for f in files:
@ -154,68 +147,6 @@ def get_module_filetree(module, dir='.'):
return tree return tree
def zip_directory(directory, b64enc=True, src=True):
"""Compress a directory
@param directory: The directory to compress
@param base64enc: if True the function will encode the zip file with base64
@param src: Integrate the source files
@return: a string containing the zip file
"""
RE_exclude = re.compile('(?:^\..+\.swp$)|(?:\.py[oc]$)|(?:\.bak$)|(?:\.~.~$)', re.I)
def _zippy(archive, path, src=True):
path = os.path.abspath(path)
base = os.path.basename(path)
for f in osutil.listdir(path, True):
bf = os.path.basename(f)
if not RE_exclude.search(bf) and (src or bf == '__openerp__.py' or not bf.endswith('.py')):
archive.write(os.path.join(path, f), os.path.join(base, f))
archname = StringIO()
archive = PyZipFile(archname, "w", ZIP_DEFLATED)
# for Python 2.5, ZipFile.write() still expects 8-bit strings (2.6 converts to utf-8)
directory = tools.ustr(directory).encode('utf-8')
archive.writepy(directory)
_zippy(archive, directory, src=src)
archive.close()
archive_data = archname.getvalue()
archname.close()
if b64enc:
return base64.encodestring(archive_data)
return archive_data
def get_module_as_zip(modulename, b64enc=True, src=True):
"""Generate a module as zip file with the source or not and can do a base64 encoding
@param modulename: The module name
@param b64enc: if True the function will encode the zip file with base64
@param src: Integrate the source files
@return: a stream to store in a file-like object
"""
ap = get_module_path(str(modulename))
if not ap:
raise Exception('Unable to find path for module %s' % modulename)
ap = ap.encode('utf8')
if os.path.isfile(ap + '.zip'):
val = file(ap + '.zip', 'rb').read()
if b64enc:
val = base64.encodestring(val)
else:
val = zip_directory(ap, b64enc, src)
return val
def get_module_resource(module, *args): def get_module_resource(module, *args):
"""Return the full path of a resource of the given module. """Return the full path of a resource of the given module.
@ -235,12 +166,6 @@ def get_module_resource(module, *args):
# the module is a directory - ignore zip behavior # the module is a directory - ignore zip behavior
if os.path.exists(resource_path): if os.path.exists(resource_path):
return resource_path return resource_path
elif zipfile.is_zipfile(mod_path + '.zip'):
zip = zipfile.ZipFile( mod_path + ".zip")
files = ['/'.join(f.split('/')[1:]) for f in zip.namelist()]
resource_path = '/'.join(args)
if resource_path in files:
return opj(mod_path, resource_path)
return False return False
def get_module_icon(module): def get_module_icon(module):
@ -258,7 +183,7 @@ def load_information_from_description_file(module):
mod_path = get_module_path(module) mod_path = get_module_path(module)
if terp_file: if terp_file:
info = {} info = {}
if os.path.isfile(terp_file) or zipfile.is_zipfile(mod_path+'.zip'): if os.path.isfile(terp_file):
# default values for descriptor # default values for descriptor
info = { info = {
'application': False, 'application': False,
@ -300,7 +225,6 @@ def load_information_from_description_file(module):
_logger.debug('module %s: no __openerp__.py file found.', module) _logger.debug('module %s: no __openerp__.py file found.', module)
return {} return {}
def init_module_models(cr, module_name, obj_list): def init_module_models(cr, module_name, obj_list):
""" Initialize a list of models. """ Initialize a list of models.
@ -342,12 +266,7 @@ def load_openerp_module(module_name):
initialize_sys_path() initialize_sys_path()
try: try:
mod_path = get_module_path(module_name) mod_path = get_module_path(module_name)
zip_mod_path = '' if not mod_path else mod_path + '.zip' __import__('openerp.addons.' + module_name)
if not os.path.isfile(zip_mod_path):
__import__('openerp.addons.' + module_name)
else:
zimp = zipimport.zipimporter(zip_mod_path)
zimp.load_module(module_name)
# Call the module's post-load hook. This can done before any model or # Call the module's post-load hook. This can done before any model or
# data has been initialized. This is ok as the post-load hook is for # data has been initialized. This is ok as the post-load hook is for
@ -357,8 +276,7 @@ def load_openerp_module(module_name):
getattr(sys.modules['openerp.addons.' + module_name], info['post_load'])() getattr(sys.modules['openerp.addons.' + module_name], info['post_load'])()
except Exception, e: except Exception, e:
mt = isinstance(e, zipimport.ZipImportError) and 'zip ' or '' msg = "Couldn't load module %s" % (module_name)
msg = "Couldn't load %smodule %s" % (mt, module_name)
_logger.critical(msg) _logger.critical(msg)
_logger.critical(e) _logger.critical(e)
raise raise
@ -378,7 +296,7 @@ def get_modules():
def is_really_module(name): def is_really_module(name):
manifest_name = opj(dir, name, '__openerp__.py') manifest_name = opj(dir, name, '__openerp__.py')
zipfile_name = opj(dir, name) zipfile_name = opj(dir, name)
return os.path.isfile(manifest_name) or zipfile.is_zipfile(zipfile_name) return os.path.isfile(manifest_name)
return map(clean, filter(is_really_module, os.listdir(dir))) return map(clean, filter(is_really_module, os.listdir(dir)))
plist = [] plist = []
@ -387,7 +305,6 @@ def get_modules():
plist.extend(listdir(ad)) plist.extend(listdir(ad))
return list(set(plist)) return list(set(plist))
def get_modules_with_version(): def get_modules_with_version():
modules = get_modules() modules = get_modules()
res = dict.fromkeys(modules, adapt_version('1.0')) res = dict.fromkeys(modules, adapt_version('1.0'))
@ -405,126 +322,58 @@ def adapt_version(version):
version = '%s.%s' % (serie, version) version = '%s.%s' % (serie, version)
return version return version
def get_test_modules(module):
def get_test_modules(module, submodule, explode): """ Return a list of module for the addons potentialy containing tests to
""" feed unittest2.TestLoader.loadTestsFromModule() """
Return a list of submodules containing tests.
`submodule` can be:
- None
- the name of a submodule
- '__fast_suite__'
- '__sanity_checks__'
"""
# Turn command-line module, submodule into importable names.
if module is None:
pass
elif module == 'openerp':
module = 'openerp.tests'
else:
module = 'openerp.addons.' + module + '.tests'
# Try to import the module # Try to import the module
module = 'openerp.addons.' + module + '.tests'
try: try:
__import__(module) m = __import__(module)
except Exception, e: except Exception, e:
if explode: # If module has no `tests` sub-module, no problem.
print 'Can not `import %s`.' % module if str(e) != 'No module named tests':
import logging _logger.exception('Can not `import %s`.', module)
logging.exception('') return []
sys.exit(1)
else:
if str(e) == 'No module named tests':
# It seems the module has no `tests` sub-module, no problem.
pass
else:
_logger.exception('Can not `import %s`.', module)
return []
# Discover available test sub-modules. # include submodules too
m = sys.modules[module] result = []
submodule_names = sorted([x for x in dir(m) \ for name in sys.modules:
if x.startswith('test_') and \ if name.startswith(module) and sys.modules[name]:
isinstance(getattr(m, x), types.ModuleType)]) result.append(sys.modules[name])
submodules = [getattr(m, x) for x in submodule_names] return result
def show_submodules_and_exit(): # Use a custom stream object to log the test executions.
if submodule_names: class TestStream(object):
print 'Available submodules are:' def __init__(self):
for x in submodule_names: self.r = re.compile(r'^-*$|^ *... *$|^ok$')
print ' ', x def flush(self):
sys.exit(1) pass
def write(self, s):
if self.r.match(s):
return
first = True
for c in s.split('\n'):
if not first:
c = '` ' + c
first = False
_test_logger.info(c)
if submodule is None: def run_unit_tests(module_name, dbname):
# Use auto-discovered sub-modules.
ms = submodules
elif submodule == '__fast_suite__':
# Obtain the explicit test sub-modules list.
ms = getattr(sys.modules[module], 'fast_suite', None)
# `suite` was used before the 6.1 release instead of `fast_suite`.
ms = ms if ms else getattr(sys.modules[module], 'suite', None)
if ms is None:
if explode:
print 'The module `%s` has no defined test suite.' % (module,)
show_submodules_and_exit()
else:
ms = []
elif submodule == '__sanity_checks__':
ms = getattr(sys.modules[module], 'checks', None)
if ms is None:
if explode:
print 'The module `%s` has no defined sanity checks.' % (module,)
show_submodules_and_exit()
else:
ms = []
else:
# Pick the command-line-specified test sub-module.
m = getattr(sys.modules[module], submodule, None)
ms = [m]
if m is None:
if explode:
print 'The module `%s` has no submodule named `%s`.' % \
(module, submodule)
show_submodules_and_exit()
else:
ms = []
return ms
def run_unit_tests(module_name):
""" """
Return True or False if some tests were found and succeeded or failed. Return True or False if some tests were found and succeeded or failed.
Return None if no test was found. Return None if no test was found.
""" """
import unittest2 mods = get_test_modules(module_name)
ms = get_test_modules(module_name, '__fast_suite__', explode=False) r = True
# TODO: No need to try again if the above call failed because of e.g. a syntax error. for m in mods:
ms.extend(get_test_modules(module_name, '__sanity_checks__', explode=False)) suite = unittest2.TestSuite()
suite = unittest2.TestSuite() for t in unittest2.TestLoader().loadTestsFromModule(m):
for m in ms: suite.addTest(t)
suite.addTests(unittest2.TestLoader().loadTestsFromModule(m)) _logger.log(logging.INFO, 'module %s: running test %s.', module_name, m.__name__)
if ms: result = unittest2.TextTestRunner(verbosity=2, stream=TestStream()).run(suite)
_test_logger.info('module %s: executing %s `fast_suite` and/or `checks` sub-modules', module_name, len(ms)) if not result.wasSuccessful():
# Use a custom stream object to log the test executions. r = False
class MyStream(object):
def __init__(self):
self.r = re.compile(r'^-*$|^ *... *$|^ok$')
def flush(self):
pass
def write(self, s):
if self.r.match(s):
return
first = True
for c in s.split('\n'):
if not first:
c = '` ' + c
first = False
_test_logger.info(c)
result = unittest2.TextTestRunner(verbosity=2, stream=MyStream()).run(suite)
if result.wasSuccessful():
return True
else:
_logger.error('module %s: at least one error occurred in a test', module_name) _logger.error('module %s: at least one error occurred in a test', module_name)
return False return r
# vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4: # vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4:

View File

@ -193,6 +193,30 @@ class Registry(Mapping):
finally: finally:
cr.close() cr.close()
class TestRLock(object):
def __init__(self):
self._lock = threading.RLock()
def acquire(self):
if openerp.tools.config['test_enable']:
return
return self._lock.acquire()
def release(self):
if openerp.tools.config['test_enable']:
return
return self._lock.release()
def __enter__(self):
self.acquire()
def __exit__(self, type, value, traceback):
self.release()
# def __enter__(self, *l, **kw):
# if openerp.tools.config['test_enable']:
# return
# return super(TestRlock, self).__enter__(*l, **kw)
# def __exit__(self, *l, **kw):
# if openerp.tools.config['test_enable']:
# return
# return super(TestRlock, self).__exit__(*l, **kw)
class RegistryManager(object): class RegistryManager(object):
""" Model registries manager. """ Model registries manager.
@ -204,7 +228,7 @@ class RegistryManager(object):
# Mapping between db name and model registry. # Mapping between db name and model registry.
# Accessed through the methods below. # Accessed through the methods below.
registries = {} registries = {}
registries_lock = threading.RLock() registries_lock = TestRLock()
@classmethod @classmethod
def get(cls, db_name, force_demo=False, status=None, update_module=False): def get(cls, db_name, force_demo=False, status=None, update_module=False):

View File

@ -108,7 +108,7 @@ def check(f):
tries = 0 tries = 0
while True: while True:
try: try:
if openerp.registry(dbname)._init: if openerp.registry(dbname)._init and not openerp.tools.config['test_enable']:
raise openerp.exceptions.Warning('Currently, this database is not fully loaded and can not be used.') raise openerp.exceptions.Warning('Currently, this database is not fully loaded and can not be used.')
return f(dbname, *args, **kwargs) return f(dbname, *args, **kwargs)
except OperationalError, e: except OperationalError, e:

View File

@ -306,7 +306,7 @@ class ThreadedServer(CommonServer):
openerp.modules.registry.RegistryManager.delete_all() openerp.modules.registry.RegistryManager.delete_all()
logging.shutdown() logging.shutdown()
def run(self): def run(self, preload=None, stop=False):
""" Start the http server and the cron thread then wait for a signal. """ Start the http server and the cron thread then wait for a signal.
The first SIGINT or SIGTERM signal will initiate a graceful shutdown while The first SIGINT or SIGTERM signal will initiate a graceful shutdown while
@ -314,6 +314,13 @@ class ThreadedServer(CommonServer):
""" """
self.start() self.start()
preload_registries(preload)
if stop:
self.stop()
return
# Wait for a first signal to be handled. (time.sleep will be interrupted # Wait for a first signal to be handled. (time.sleep will be interrupted
# by the signal handler.) The try/except is for the win32 case. # by the signal handler.) The try/except is for the win32 case.
try: try:
@ -362,7 +369,7 @@ class GeventServer(CommonServer):
self.httpd.stop() self.httpd.stop()
gevent.shutdown() gevent.shutdown()
def run(self): def run(self, preload, stop):
self.start() self.start()
self.stop() self.stop()
@ -569,8 +576,15 @@ class PreforkServer(CommonServer):
self.worker_kill(pid, signal.SIGTERM) self.worker_kill(pid, signal.SIGTERM)
self.socket.close() self.socket.close()
def run(self): def run(self, preload, stop):
self.start() self.start()
preload_registries(preload)
if stop:
self.stop()
return
_logger.debug("Multiprocess starting") _logger.debug("Multiprocess starting")
while 1: while 1:
try: try:
@ -587,7 +601,7 @@ class PreforkServer(CommonServer):
except Exception,e: except Exception,e:
_logger.exception(e) _logger.exception(e)
self.stop(False) self.stop(False)
sys.exit(-1) return -1
class Worker(object): class Worker(object):
""" Workers """ """ Workers """
@ -809,7 +823,21 @@ def _reexec(updated_modules=None):
args.insert(0, exe) args.insert(0, exe)
os.execv(sys.executable, args) os.execv(sys.executable, args)
def start(): def preload_registries(dbnames):
""" Preload a registries."""
dbnames = dbnames or []
for dbname in dbnames:
try:
update_module = openerp.tools.config['init'] or openerp.tools.config['update']
registry = openerp.modules.registry.RegistryManager.new(dbname, update_module=update_module)
#if config['test_enable']:
# openerp.modules.module.run_http_test(config['db_name'])
#if registry._assertion_report.failures != 0:
except Exception:
_logger.exception('Failed to initialize database `%s`.', dbname)
return
def start(preload=None, stop=False):
""" Start the openerp http server and cron processor. """ Start the openerp http server and cron processor.
""" """
global server global server
@ -825,7 +853,7 @@ def start():
autoreload = AutoReload(server) autoreload = AutoReload(server)
autoreload.run() autoreload.run()
server.run() rc = server.run(preload, stop)
# like the legend of the phoenix, all ends with beginnings # like the legend of the phoenix, all ends with beginnings
if getattr(openerp, 'phoenix', False): if getattr(openerp, 'phoenix', False):
@ -833,7 +861,8 @@ def start():
if config['auto_reload']: if config['auto_reload']:
modules = autoreload.modules.keys() modules = autoreload.modules.keys()
_reexec(modules) _reexec(modules)
sys.exit(0)
return rc if rc else 0
def restart(): def restart():
""" Restart the server """ Restart the server

View File

@ -110,37 +110,106 @@ class SingleTransactionCase(BaseCase):
cls.cr.close() cls.cr.close()
class RpcCase(unittest2.TestCase): class HttpCase(SingleTransactionCase):
""" """ Transactionnal HTTP TestCase with a phantomjs helper.
Subclass of TestCase with a few XML-RPC proxies.
""" """
def __init__(self, methodName='runTest'): def __init__(self, methodName='runTest'):
super(RpcCase, self).__init__(methodName) super(HttpCase, self).__init__(methodName)
# v8 api with correct xmlrpc exception handling.
class A(object): self.xmlrpc_url = url_8 = 'http://%s:%d/xmlrpc/2/' % (HOST, PORT)
pass self.xmlrpc_common = xmlrpclib.ServerProxy(url_8 + 'common')
self.proxy = A() self.xmlrpc_db = xmlrpclib.ServerProxy(url_8 + 'db')
self.xmlrpc_object = xmlrpclib.ServerProxy(url_8 + 'object')
# Use the old (pre 6.1) API.
self.proxy.url_60 = url_60 = 'http://%s:%d/xmlrpc/' % (HOST, PORT)
self.proxy.common_60 = xmlrpclib.ServerProxy(url_60 + 'common')
self.proxy.db_60 = xmlrpclib.ServerProxy(url_60 + 'db')
self.proxy.object_60 = xmlrpclib.ServerProxy(url_60 + 'object')
#self.proxy.edi_60 = xmlrpclib.ServerProxy(url_60 + 'edi')
# Use the new (8) API.
self.proxy.url_8 = url_8 = 'http://%s:%d/xmlrpc/2/' % (HOST, PORT)
self.proxy.common_8 = xmlrpclib.ServerProxy(url_8 + 'common')
self.proxy.db_8 = xmlrpclib.ServerProxy(url_8 + 'db')
self.proxy.object_8 = xmlrpclib.ServerProxy(url_8 + 'object')
@classmethod @classmethod
def generate_database_name(cls): def setUpClass(cls):
if hasattr(cls, '_database_id'): super(HttpCase, cls).setUpClass()
cls._database_id += 1 cls.session_id = uuid.uuid4().hex
else: HTTP_SESSION[cls.session_id] = cls.cr
cls._database_id = 0
return '_fresh_name_' + str(cls._database_id) + '_' @classmethod
def tearDownClass(cls):
del HTTP_SESSION[cls.session_id]
super(HttpCase, cls).tearDownClass()
def phantomjs(self, jsfile, timeout=30, options=None):
""" Phantomjs Test protocol.
Use console.log in phantomjs to output test results evrey line must be
a one line JSON message using the following format:
- for a success: { "event": "success", "message": "Log message" }
- for an error: { "event": "error", "message": "Short error description" }
if a non json parsable line is received the helper will raise an
exception, the output buffer will be printed and phantom will be
killed
"""
self.timeout = timeout
self.options = {
'timeout' : timeout,
'port': PORT,
'db': DB,
'user': ADMIN_USER,
'password': ADMIN_PASSWORD,
'session_id': self.session_id,
}
if options:
self.options.update(options)
self.ignore_filters = [
# Ignore phantomjs warnings
"*** WARNING:",
# Fixes an issue with PhantomJS 1.9.2 on OS X 10.9 (Mavericks)
# cf. https://github.com/ariya/phantomjs/issues/11418
"CoreText performance note",
]
cmd = ['phantomjs', jsfile, json.dumps(self.options)]
_logger.info('executing %s', cmd)
try:
phantom = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
except OSError:
_logger.info("phantomjs not found, test %s skipped", jsfile)
try:
t0 = time.time()
buf = ''
while 1:
if time.time() > t0 + self.timeout:
raise Exception("Phantom js timeout (%ss)" % self.timeout)
ready, _, _ = select.select([phantom.stdout], [], [], 0.5)
if ready:
s = phantom.stdout.read(4096)
if s:
buf += s
else:
break
# process lines
if '\n' in buf:
line, buf = buf.split('\n', 1)
if line not in self.ignore_filters:
try:
line_json = json.loads(line)
if line_json.get('event') == 'success':
_logger.info(line_json.get('message','ok'))
continue
elif line_json.get('event') == 'error':
err = line_json.get('message','error')
_logger.info(err)
else:
err = line + buf
except ValueError:
err = line + buf
raise Exception(err)
finally:
# kill phantomjs if phantom.exit() wasn't called in the test
if phantom.poll() is None:
phantom.terminate()
# vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4: # vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4:

View File

@ -96,6 +96,8 @@ def get_test_modules(module, submodule, explode):
return ms return ms
def run(args): def run(args):
print "Please use openerp-server --test-enable -d <dbname> -i modulename"
sys.exit(0)
import unittest2 import unittest2
import openerp import openerp