odoo/openerp/tests/common.py

# -*- coding: utf-8 -*-
"""
The module :mod:`openerp.tests.common` provides unittest2 test cases and a few
helpers and classes to write tests.
"""
import errno
import json
import logging
import os
import select
import subprocess
import threading
import time
import unittest2
import xmlrpclib
import openerp
_logger = logging.getLogger(__name__)
# The openerp library is supposed to be configured already.
ADDONS_PATH = openerp.tools.config['addons_path']
HOST = '127.0.0.1'
PORT = openerp.tools.config['xmlrpc_port']
DB = openerp.tools.config['db_name']
# If the database name is not provided on the command-line,
# use the one on the thread (which means if it is provided on
# the command-line, this will break when installing another
# database from XML-RPC).
if not DB and hasattr(threading.current_thread(), 'dbname'):
    DB = threading.current_thread().dbname
# Useless constant, tests are aware of the content of demo data
ADMIN_USER_ID = openerp.SUPERUSER_ID
# Magic session_id: unfortunately we have to serialize access to the cursors
# in order to serialize requests. We first tried to duplicate the database for
# each test, but this proved too slow. Any idea to improve this is welcome.
HTTP_SESSION = {}
def acquire_test_cursor(session_id):
    if openerp.tools.config['test_enable']:
        cr = HTTP_SESSION.get(session_id)
        if cr:
            cr._test_lock.acquire()
            return cr

def release_test_cursor(session_id):
    if openerp.tools.config['test_enable']:
        cr = HTTP_SESSION.get(session_id)
        if cr:
            cr._test_lock.release()
            return True
    return False
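
# A minimal sketch of how the two helpers above are meant to bracket an HTTP
# request during tests (the handler function and its names are hypothetical,
# not part of this module):
#
#     def serve_request(session_id, handler):
#         cr = acquire_test_cursor(session_id)  # takes the per-test lock
#         try:
#             return handler(cr)
#         finally:
#             release_test_cursor(session_id)   # lets the next request in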

def at_install(flag):
    """ Sets the at-install state of a test, the flag is a boolean specifying
    whether the test should (``True``) or should not (``False``) run during
    module installation.

    By default, tests are run at install.
    """
    def decorator(obj):
        obj.at_install = flag
        return obj
    return decorator

def post_install(flag):
    """ Sets the post-install state of a test. The flag is a boolean
    specifying whether the test should or should not run after a set of
    module installations.

    By default, tests are *not* run after installation.
    """
    def decorator(obj):
        obj.post_install = flag
        return obj
    return decorator
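
# A minimal usage sketch for the two decorators above (the test class and
# method names are hypothetical):
#
#     @at_install(False)
#     @post_install(True)
#     class TestAfterInstall(TransactionCase):
#         def test_something(self):
#             pass  # runs once the whole set of modules is installed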

class BaseCase(unittest2.TestCase):
    """
    Subclass of TestCase for common OpenERP-specific code.

    This class is abstract and expects self.cr and self.uid to be initialized
    by subclasses.
    """

    @classmethod
    def cursor(self):
        return openerp.modules.registry.RegistryManager.get(DB).db.cursor()

    @classmethod
    def registry(self, model):
        return openerp.modules.registry.RegistryManager.get(DB)[model]

    @classmethod
    def ref(self, xid):
        """ Returns database ID corresponding to a given identifier.

        :param xid: fully-qualified record identifier, in the form
                    ``module.identifier``
        :raise: ValueError if not found
        """
        assert "." in xid, "this method requires a fully qualified parameter, in the following form: 'module.identifier'"
        module, xid = xid.split('.')
        _, id = self.registry('ir.model.data').get_object_reference(self.cr, self.uid, module, xid)
        return id

    @classmethod
    def browse_ref(self, xid):
        """ Returns a browsable record for the given identifier.

        :param xid: fully-qualified record identifier, in the form
                    ``module.identifier``
        :raise: ValueError if not found
        """
        assert "." in xid, "this method requires a fully qualified parameter, in the following form: 'module.identifier'"
        module, xid = xid.split('.')
        return self.registry('ir.model.data').get_object(self.cr, self.uid, module, xid)
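
    # A minimal usage sketch from inside a test method, assuming the standard
    # ``base.user_root`` external identifier shipped with the base module:
    #
    #     uid = self.ref('base.user_root')          # database id (integer)
    #     user = self.browse_ref('base.user_root')  # browsable record
    #     assert user.id == uid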

class TransactionCase(BaseCase):
    """
    Subclass of BaseCase with a single transaction, rolled back at the end of
    each test (method).
    """

    def setUp(self):
        # Store cr and uid in class variables, to allow ref() and browse_ref()
        # to be BaseCase @classmethods and still access them.
        TransactionCase.cr = self.cursor()
        TransactionCase.uid = openerp.SUPERUSER_ID

    def tearDown(self):
        self.cr.rollback()
        self.cr.close()
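
# A minimal usage sketch (model values are hypothetical); everything done
# through ``self.cr`` is rolled back when the test method returns:
#
#     class TestPartnerCreation(TransactionCase):
#         def test_create(self):
#             partners = self.registry('res.partner')
#             pid = partners.create(self.cr, self.uid, {'name': 'Test Partner'})
#             self.assertTrue(pid)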

class SingleTransactionCase(BaseCase):
    """
    Subclass of BaseCase with a single transaction for the whole class,
    rolled back after all the tests.
    """

    @classmethod
    def setUpClass(cls):
        cls.cr = cls.cursor()
        cls.uid = openerp.SUPERUSER_ID

    @classmethod
    def tearDownClass(cls):
        cls.cr.rollback()
        cls.cr.close()
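
# A minimal usage sketch (names hypothetical): both methods share one cursor,
# so state created in the first test is still visible in the second. Note
# that unittest2 runs test methods in alphabetical order:
#
#     class TestFlow(SingleTransactionCase):
#         def test_01_create_record(self):
#             ...
#         def test_02_check_record(self):
#             ...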

class HttpCase(TransactionCase):
    """ Transactional HTTP TestCase with a PhantomJS helper.
    """

    def __init__(self, methodName='runTest'):
        super(HttpCase, self).__init__(methodName)
        # v8 API with correct XML-RPC exception handling.
        self.xmlrpc_url = url_8 = 'http://%s:%d/xmlrpc/2/' % (HOST, PORT)
        self.xmlrpc_common = xmlrpclib.ServerProxy(url_8 + 'common')
        self.xmlrpc_db = xmlrpclib.ServerProxy(url_8 + 'db')
        self.xmlrpc_object = xmlrpclib.ServerProxy(url_8 + 'object')

    def setUp(self):
        super(HttpCase, self).setUp()
        # Set up a magic session_id whose cursor will be rolled back with the
        # test transaction.
        self.session = openerp.http.root.session_store.new()
        self.session_id = self.session.sid
        self.session.db = DB
        openerp.http.root.session_store.save(self.session)
        self.cr._test_lock = threading.RLock()
        HTTP_SESSION[self.session_id] = self.cr

    def tearDown(self):
        del HTTP_SESSION[self.session_id]
        super(HttpCase, self).tearDown()

    def phantom_poll(self, phantom, timeout):
        """ PhantomJS test protocol.

        Use console.log in PhantomJS to output test results:

        - for a success: console.log("ok")
        - for an error: console.log("error")

        Other lines are relayed to the test log.
        """
        t0 = time.time()
        buf = bytearray()
        while True:
            # timeout
            self.assertLess(time.time(), t0 + timeout,
                "PhantomJS tests should take less than %s seconds" % timeout)

            # read a byte
            try:
                ready, _, _ = select.select([phantom.stdout], [], [], 0.5)
            except EnvironmentError, e:
                if e.errno == errno.EINTR:
                    continue
                raise
            if ready:
                s = phantom.stdout.read(1)
                if not s:
                    break
                buf.append(s)

            # process lines
            if '\n' in buf:
                line, buf = buf.split('\n', 1)
                if line == "ok":
                    break
                if line.startswith("error"):
                    # 'error $message', or use a generic message
                    self.fail(line[6:] or "phantomjs test failed")
                _logger.info("phantomjs: %s", line)
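
    # A sketch of the protocol from the PhantomJS side (JavaScript, shown here
    # as a comment; the condition and message are hypothetical):
    #
    #     if (everythingWorked) {
    #         console.log('ok');               // reported as a success
    #     } else {
    #         console.log('error ' + reason);  // reported as a failure
    #     }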

    def phantom_run(self, cmd, timeout):
        _logger.debug('executing `%s`', ' '.join(cmd))
        try:
            phantom = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
        except OSError:
            raise unittest2.SkipTest("PhantomJS not found")
        try:
            self.phantom_poll(phantom, timeout)
        finally:
            # kill phantomjs if phantom.exit() wasn't called in the test
            if phantom.poll() is None:
                phantom.terminate()

    def phantom_jsfile(self, jsfile, timeout=30, **kw):
        options = {
            'timeout': timeout,
            'port': PORT,
            'db': DB,
            'session_id': self.session_id,
        }
        options.update(kw)
        phantomtest = os.path.join(os.path.dirname(__file__), 'phantomtest.js')
        # phantom.args[0] == phantomtest path
        # phantom.args[1] == options
        cmd = ['phantomjs', jsfile, phantomtest, json.dumps(options)]
        self.phantom_run(cmd, timeout)
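
    # A minimal usage sketch; the path is hypothetical and must point to a
    # script following the console.log('ok')/console.log('error') protocol:
    #
    #     self.phantom_jsfile('/path/to/my_test.js', timeout=60)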

    def phantom_js(self, url_path, code, ready="window", login=None, timeout=30, **kw):
        """ Test JS code running in the browser:

        - optionally log in as ``login``
        - load the page given by ``url_path``
        - wait for the ``ready`` object to be available
        - eval(code) inside the page

        To signal success, the test does::

            console.log('ok')

        To signal failure::

            console.log('error')

        If neither happens before the timeout, the test fails.
        """
        options = {
            'port': PORT,
            'db': DB,
            'url_path': url_path,
            'code': code,
            'ready': ready,
            'timeout': timeout,
            'login': login,
            'session_id': self.session_id,
        }
        options.update(kw)
        options.setdefault('password', options.get('login'))
        phantomtest = os.path.join(os.path.dirname(__file__), 'phantomtest.js')
        cmd = ['phantomjs', phantomtest, json.dumps(options)]
        self.phantom_run(cmd, timeout)
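
    # A minimal usage sketch inside an HttpCase test method (the page and the
    # check are hypothetical):
    #
    #     def test_homepage(self):
    #         self.phantom_js(
    #             '/',                         # page to load
    #             "console.log(document.title ? 'ok' : 'error no title');",
    #             login='admin',               # authenticate before loading
    #         )
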
# vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4: