2011-09-28 10:30:09 +00:00
|
|
|
# -*- coding: utf-8 -*-
|
2012-10-30 11:37:37 +00:00
|
|
|
"""
|
2014-02-09 15:18:22 +00:00
|
|
|
The module :mod:`openerp.tests.common` provides unittest2 test cases and a few
|
|
|
|
helpers and classes to write tests.
|
|
|
|
|
2012-10-30 11:37:37 +00:00
|
|
|
"""
|
2014-02-19 10:11:10 +00:00
|
|
|
import errno
|
2014-02-09 00:37:45 +00:00
|
|
|
import json
|
2014-02-09 22:39:17 +00:00
|
|
|
import logging
|
2014-02-09 00:37:45 +00:00
|
|
|
import os
|
|
|
|
import select
|
|
|
|
import subprocess
|
2012-08-22 13:48:20 +00:00
|
|
|
import threading
|
2011-09-28 10:30:09 +00:00
|
|
|
import time
|
2012-02-16 16:27:06 +00:00
|
|
|
import unittest2
|
2014-03-06 23:45:35 +00:00
|
|
|
import urllib2
|
2011-09-28 10:30:09 +00:00
|
|
|
import xmlrpclib
|
|
|
|
|
2014-03-10 10:15:54 +00:00
|
|
|
from datetime import datetime, timedelta
|
|
|
|
|
2011-09-28 10:30:09 +00:00
|
|
|
import openerp
|
|
|
|
|
2014-02-09 00:37:45 +00:00
|
|
|
_logger = logging.getLogger(__name__)
|
|
|
|
|
2011-12-01 14:00:12 +00:00
|
|
|
# The openerp library is supposed already configured.
|
|
|
|
ADDONS_PATH = openerp.tools.config['addons_path']
|
2014-02-09 00:37:45 +00:00
|
|
|
HOST = '127.0.0.1'
|
2011-12-01 14:00:12 +00:00
|
|
|
PORT = openerp.tools.config['xmlrpc_port']
|
|
|
|
DB = openerp.tools.config['db_name']
|
2012-08-22 13:48:20 +00:00
|
|
|
# If the database name is not provided on the command-line,
|
|
|
|
# use the one on the thread (which means if it is provided on
|
|
|
|
# the command-line, this will break when installing another
|
|
|
|
# database from XML-RPC).
|
|
|
|
if not DB and hasattr(threading.current_thread(), 'dbname'):
|
|
|
|
DB = threading.current_thread().dbname
|
2014-02-16 21:22:22 +00:00
|
|
|
# Useless constant, tests are aware of the content of demo data
|
2012-05-22 11:00:31 +00:00
|
|
|
ADMIN_USER_ID = openerp.SUPERUSER_ID
|
2011-09-28 10:30:09 +00:00
|
|
|
|
2014-02-16 21:22:22 +00:00
|
|
|
# Magic session_id, unfortunately we have to serialize access to the cursors to
|
|
|
|
# serialize requests. We first tried to duplicate the database for each tests
|
|
|
|
# but this proved too slow. Any idea to improve this is welcome.
|
2014-02-09 00:37:45 +00:00
|
|
|
HTTP_SESSION = {}
|
2011-11-22 08:58:48 +00:00
|
|
|
|
2014-02-16 21:22:22 +00:00
|
|
|
def acquire_test_cursor(session_id):
|
|
|
|
if openerp.tools.config['test_enable']:
|
|
|
|
cr = HTTP_SESSION.get(session_id)
|
|
|
|
if cr:
|
|
|
|
cr._test_lock.acquire()
|
|
|
|
return cr
|
|
|
|
|
|
|
|
def release_test_cursor(session_id):
|
|
|
|
if openerp.tools.config['test_enable']:
|
|
|
|
cr = HTTP_SESSION.get(session_id)
|
|
|
|
if cr:
|
|
|
|
cr._test_lock.release()
|
|
|
|
return True
|
|
|
|
return False
|
|
|
|
|
2014-02-18 09:54:52 +00:00
|
|
|
def at_install(flag):
|
|
|
|
""" Sets the at-install state of a test, the flag is a boolean specifying
|
|
|
|
whether the test should (``True``) or should not (``False``) run during
|
|
|
|
module installation.
|
|
|
|
|
|
|
|
By default, tests are run at install.
|
|
|
|
"""
|
|
|
|
def decorator(obj):
|
|
|
|
obj.at_install = flag
|
|
|
|
return obj
|
|
|
|
return decorator
|
|
|
|
def post_install(flag):
|
|
|
|
""" Sets the post-install state of a test. The flag is a boolean
|
|
|
|
specifying whether the test should or should not run after a set of
|
|
|
|
module installations.
|
|
|
|
|
|
|
|
By default, tests are *not* run after installation.
|
|
|
|
"""
|
|
|
|
def decorator(obj):
|
|
|
|
obj.post_install = flag
|
|
|
|
return obj
|
|
|
|
return decorator
|
2014-02-16 21:22:22 +00:00
|
|
|
|
2012-10-29 14:23:02 +00:00
|
|
|
class BaseCase(unittest2.TestCase):
|
2012-02-16 16:27:06 +00:00
|
|
|
"""
|
2012-10-29 14:23:02 +00:00
|
|
|
Subclass of TestCase for common OpenERP-specific code.
|
2012-10-31 10:30:27 +00:00
|
|
|
|
|
|
|
This class is abstract and expects self.cr and self.uid to be initialized by subclasses.
|
2012-10-29 14:23:02 +00:00
|
|
|
"""
|
|
|
|
|
|
|
|
@classmethod
|
|
|
|
def cursor(self):
|
|
|
|
return openerp.modules.registry.RegistryManager.get(DB).db.cursor()
|
|
|
|
|
|
|
|
@classmethod
|
|
|
|
def registry(self, model):
|
|
|
|
return openerp.modules.registry.RegistryManager.get(DB)[model]
|
2012-10-31 15:21:49 +00:00
|
|
|
|
|
|
|
@classmethod
|
|
|
|
def ref(self, xid):
|
|
|
|
""" Returns database ID corresponding to a given identifier.
|
|
|
|
|
|
|
|
:param xid: fully-qualified record identifier, in the form ``module.identifier``
|
|
|
|
:raise: ValueError if not found
|
|
|
|
"""
|
|
|
|
assert "." in xid, "this method requires a fully qualified parameter, in the following form: 'module.identifier'"
|
|
|
|
module, xid = xid.split('.')
|
|
|
|
_, id = self.registry('ir.model.data').get_object_reference(self.cr, self.uid, module, xid)
|
2012-10-31 10:30:27 +00:00
|
|
|
return id
|
2012-10-31 15:21:49 +00:00
|
|
|
|
|
|
|
@classmethod
|
|
|
|
def browse_ref(self, xid):
|
|
|
|
""" Returns a browsable record for the given identifier.
|
|
|
|
|
|
|
|
:param xid: fully-qualified record identifier, in the form ``module.identifier``
|
|
|
|
:raise: ValueError if not found
|
|
|
|
"""
|
|
|
|
assert "." in xid, "this method requires a fully qualified parameter, in the following form: 'module.identifier'"
|
|
|
|
module, xid = xid.split('.')
|
|
|
|
return self.registry('ir.model.data').get_object(self.cr, self.uid, module, xid)
|
|
|
|
|
2012-10-29 14:23:02 +00:00
|
|
|
|
|
|
|
class TransactionCase(BaseCase):
|
2012-02-16 16:27:06 +00:00
|
|
|
"""
|
2012-10-29 14:23:02 +00:00
|
|
|
Subclass of BaseCase with a single transaction, rolled-back at the end of
|
|
|
|
each test (method).
|
2012-02-16 16:27:06 +00:00
|
|
|
"""
|
|
|
|
|
|
|
|
def setUp(self):
|
2012-11-09 17:08:26 +00:00
|
|
|
# Store cr and uid in class variables, to allow ref() and browse_ref to be BaseCase @classmethods
|
|
|
|
# and still access them
|
|
|
|
TransactionCase.cr = self.cursor()
|
|
|
|
TransactionCase.uid = openerp.SUPERUSER_ID
|
2012-02-16 16:27:06 +00:00
|
|
|
|
|
|
|
def tearDown(self):
|
|
|
|
self.cr.rollback()
|
|
|
|
self.cr.close()
|
|
|
|
|
2012-07-23 15:03:06 +00:00
|
|
|
|
2012-10-29 14:23:02 +00:00
|
|
|
class SingleTransactionCase(BaseCase):
|
|
|
|
"""
|
|
|
|
Subclass of BaseCase with a single transaction for the whole class,
|
|
|
|
rolled-back after all the tests.
|
|
|
|
"""
|
|
|
|
|
|
|
|
@classmethod
|
|
|
|
def setUpClass(cls):
|
|
|
|
cls.cr = cls.cursor()
|
|
|
|
cls.uid = openerp.SUPERUSER_ID
|
|
|
|
|
|
|
|
@classmethod
|
|
|
|
def tearDownClass(cls):
|
|
|
|
cls.cr.rollback()
|
|
|
|
cls.cr.close()
|
|
|
|
|
2012-03-01 13:46:08 +00:00
|
|
|
|
2014-02-10 00:45:17 +00:00
|
|
|
class HttpCase(TransactionCase):
|
2014-03-06 23:45:35 +00:00
|
|
|
""" Transactionnal HTTP TestCase with url_open and phantomjs helpers.
|
2012-02-16 16:27:06 +00:00
|
|
|
"""
|
|
|
|
|
2012-10-30 11:37:37 +00:00
|
|
|
def __init__(self, methodName='runTest'):
|
2014-02-09 00:40:05 +00:00
|
|
|
super(HttpCase, self).__init__(methodName)
|
|
|
|
# v8 api with correct xmlrpc exception handling.
|
|
|
|
self.xmlrpc_url = url_8 = 'http://%s:%d/xmlrpc/2/' % (HOST, PORT)
|
|
|
|
self.xmlrpc_common = xmlrpclib.ServerProxy(url_8 + 'common')
|
|
|
|
self.xmlrpc_db = xmlrpclib.ServerProxy(url_8 + 'db')
|
|
|
|
self.xmlrpc_object = xmlrpclib.ServerProxy(url_8 + 'object')
|
2012-02-16 16:27:06 +00:00
|
|
|
|
2014-02-10 00:45:17 +00:00
|
|
|
def setUp(self):
|
|
|
|
super(HttpCase, self).setUp()
|
2014-02-16 21:22:22 +00:00
|
|
|
# setup a magic session_id that will be rollbacked
|
|
|
|
self.session = openerp.http.root.session_store.new()
|
|
|
|
self.session_id = self.session.sid
|
|
|
|
self.session.db = DB
|
|
|
|
openerp.http.root.session_store.save(self.session)
|
|
|
|
self.cr._test_lock = threading.RLock()
|
2014-02-10 00:45:17 +00:00
|
|
|
HTTP_SESSION[self.session_id] = self.cr
|
2012-02-16 16:27:06 +00:00
|
|
|
|
2014-02-10 00:45:17 +00:00
|
|
|
def tearDown(self):
|
|
|
|
del HTTP_SESSION[self.session_id]
|
|
|
|
super(HttpCase, self).tearDown()
|
2012-02-16 16:27:06 +00:00
|
|
|
|
2014-03-06 23:45:35 +00:00
|
|
|
def url_open(self, url, data=None, timeout=10):
|
|
|
|
opener = urllib2.build_opener()
|
|
|
|
opener.addheaders.append(('Cookie', 'session_id=%s' % self.session_id))
|
|
|
|
if url.startswith('/'):
|
|
|
|
url = "http://localhost:%s%s" % (PORT, url)
|
|
|
|
return opener.open(url, data, timeout)
|
|
|
|
|
2014-02-10 00:45:17 +00:00
|
|
|
def phantom_poll(self, phantom, timeout):
|
2014-02-09 00:40:05 +00:00
|
|
|
""" Phantomjs Test protocol.
|
|
|
|
|
2014-02-09 22:39:17 +00:00
|
|
|
Use console.log in phantomjs to output test results:
|
2014-02-09 00:40:05 +00:00
|
|
|
|
2014-02-09 22:39:17 +00:00
|
|
|
- for a success: console.log("ok")
|
|
|
|
- for an error: console.log("error")
|
2014-02-09 00:40:05 +00:00
|
|
|
|
2014-02-09 22:39:17 +00:00
|
|
|
Other lines are relayed to the test log.
|
2014-02-09 00:40:05 +00:00
|
|
|
|
|
|
|
"""
|
2014-03-10 10:15:54 +00:00
|
|
|
t0 = datetime.now()
|
|
|
|
td = timedelta(seconds=timeout)
|
2014-02-18 13:34:45 +00:00
|
|
|
buf = bytearray()
|
2014-02-19 10:13:34 +00:00
|
|
|
while True:
|
2014-02-10 00:45:17 +00:00
|
|
|
# timeout
|
2014-03-10 10:15:54 +00:00
|
|
|
self.assertLess(datetime.now() - t0, td,
|
2014-02-18 13:34:45 +00:00
|
|
|
"PhantomJS tests should take less than %s seconds" % timeout)
|
2014-02-10 00:45:17 +00:00
|
|
|
|
|
|
|
# read a byte
|
2014-02-19 10:11:10 +00:00
|
|
|
try:
|
|
|
|
ready, _, _ = select.select([phantom.stdout], [], [], 0.5)
|
2014-02-19 15:53:03 +00:00
|
|
|
except select.error, e:
|
|
|
|
# In Python 2, select.error has no relation to IOError or
|
|
|
|
# OSError, and no errno/strerror/filename, only a pair of
|
|
|
|
# unnamed arguments (matching errno and strerror)
|
|
|
|
err, _ = e.args
|
2014-03-16 16:06:34 +00:00
|
|
|
if err == errno.EINTR:
|
|
|
|
continue
|
2014-02-19 10:11:10 +00:00
|
|
|
raise
|
|
|
|
|
2014-02-10 00:45:17 +00:00
|
|
|
if ready:
|
|
|
|
s = phantom.stdout.read(1)
|
2014-02-19 10:13:34 +00:00
|
|
|
if not s:
|
2014-02-10 00:45:17 +00:00
|
|
|
break
|
2014-02-19 10:13:34 +00:00
|
|
|
buf.append(s)
|
2014-02-10 00:45:17 +00:00
|
|
|
|
|
|
|
# process lines
|
|
|
|
if '\n' in buf:
|
|
|
|
line, buf = buf.split('\n', 1)
|
2014-02-19 12:38:50 +00:00
|
|
|
line = str(line)
|
2014-03-16 16:06:34 +00:00
|
|
|
|
|
|
|
# relay everything from console.log, even 'ok' or 'error...' lines
|
2014-03-16 19:52:52 +00:00
|
|
|
_logger.info("phantomjs: %s", line)
|
2014-03-16 16:06:34 +00:00
|
|
|
|
2014-02-10 00:45:17 +00:00
|
|
|
if line == "ok":
|
2014-02-19 10:13:34 +00:00
|
|
|
break
|
2014-02-18 13:34:45 +00:00
|
|
|
if line.startswith("error"):
|
2014-03-16 16:06:34 +00:00
|
|
|
line_ = self.line[6:]
|
|
|
|
# when error occurs the execution stack may be sent as as JSON
|
|
|
|
try:
|
|
|
|
line_ = json.loads(line_)
|
|
|
|
except ValueError:
|
|
|
|
pass
|
2014-02-19 12:38:50 +00:00
|
|
|
self.fail(line_ or "phantomjs test failed")
|
|
|
|
|
2014-02-10 00:45:17 +00:00
|
|
|
def phantom_run(self, cmd, timeout):
|
2014-03-16 19:52:52 +00:00
|
|
|
_logger.info('phantom_run executing %s', ' '.join(cmd))
|
2014-02-09 00:40:05 +00:00
|
|
|
try:
|
|
|
|
phantom = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
|
|
|
|
except OSError:
|
2014-02-18 13:34:45 +00:00
|
|
|
raise unittest2.SkipTest("PhantomJS not found")
|
2014-02-09 00:40:05 +00:00
|
|
|
try:
|
2014-02-10 00:45:17 +00:00
|
|
|
self.phantom_poll(phantom, timeout)
|
2014-02-09 00:40:05 +00:00
|
|
|
finally:
|
|
|
|
# kill phantomjs if phantom.exit() wasn't called in the test
|
|
|
|
if phantom.poll() is None:
|
|
|
|
phantom.terminate()
|
2014-03-16 19:52:52 +00:00
|
|
|
_logger.info("phantom_run execution finished")
|
2012-02-16 16:27:06 +00:00
|
|
|
|
2014-02-10 00:45:17 +00:00
|
|
|
def phantom_jsfile(self, jsfile, timeout=30, **kw):
|
|
|
|
options = {
|
|
|
|
'timeout' : timeout,
|
|
|
|
'port': PORT,
|
|
|
|
'db': DB,
|
|
|
|
'session_id': self.session_id,
|
|
|
|
}
|
|
|
|
options.update(kw)
|
|
|
|
phantomtest = os.path.join(os.path.dirname(__file__), 'phantomtest.js')
|
|
|
|
# phantom.args[0] == phantomtest path
|
|
|
|
# phantom.args[1] == options
|
|
|
|
cmd = ['phantomjs', jsfile, phantomtest, json.dumps(options)]
|
|
|
|
self.phantom_run(cmd, timeout)
|
|
|
|
|
2014-02-16 21:22:22 +00:00
|
|
|
def phantom_js(self, url_path, code, ready="window", login=None, timeout=30, **kw):
|
2014-02-10 00:45:17 +00:00
|
|
|
""" Test js code running in the browser
|
2014-02-16 21:22:22 +00:00
|
|
|
- optionnally log as 'login'
|
2014-02-10 00:45:17 +00:00
|
|
|
- load page given by url_path
|
|
|
|
- wait for ready object to be available
|
|
|
|
- eval(code) inside the page
|
|
|
|
|
|
|
|
To signal success test do:
|
|
|
|
console.log('ok')
|
|
|
|
|
|
|
|
To signal failure do:
|
|
|
|
console.log('error')
|
|
|
|
|
|
|
|
If neither are done before timeout test fails.
|
|
|
|
"""
|
|
|
|
options = {
|
2014-02-16 21:22:22 +00:00
|
|
|
'port': PORT,
|
|
|
|
'db': DB,
|
2014-02-10 00:45:17 +00:00
|
|
|
'url_path': url_path,
|
|
|
|
'code': code,
|
|
|
|
'ready': ready,
|
|
|
|
'timeout' : timeout,
|
2014-02-16 21:22:22 +00:00
|
|
|
'login' : login,
|
2014-02-10 00:45:17 +00:00
|
|
|
'session_id': self.session_id,
|
|
|
|
}
|
|
|
|
options.update(kw)
|
2014-02-16 21:22:22 +00:00
|
|
|
options.setdefault('password', options.get('login'))
|
2014-02-10 00:45:17 +00:00
|
|
|
phantomtest = os.path.join(os.path.dirname(__file__), 'phantomtest.js')
|
|
|
|
cmd = ['phantomjs', phantomtest, json.dumps(options)]
|
|
|
|
self.phantom_run(cmd, timeout)
|
|
|
|
|
2012-03-01 10:54:19 +00:00
|
|
|
|
2011-11-22 08:58:48 +00:00
|
|
|
# vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4:
|