[IMP] tests: added a simple test case to create a database via XML-RPC.

The test setup starts an OpenERP server similarly to the
openerp-server script. For this to be possible, some of
its code is moved to openerp.services.

bzr revid: vmt@openerp.com-20110923133408-qqf4i5l86pkk7xn5
This commit is contained in:
Vo Minh Thu 2011-09-23 15:34:08 +02:00
parent 0c123711c5
commit cc26f2b5d2
4 changed files with 136 additions and 39 deletions

View File

@ -136,27 +136,6 @@ def import_translation():
cr.commit()
cr.close()
def start_services():
http_server = openerp.service.http_server
netrpc_server = openerp.service.netrpc_server
# Instantiate local services (this is a legacy design).
openerp.osv.osv.start_object_proxy()
# Export (for RPC) services.
openerp.service.web_services.start_web_services()
# Initialize the HTTP stack.
http_server.init_servers()
http_server.init_xmlrpc()
http_server.init_static_http()
netrpc_server.init_servers()
# Start the main cron thread.
openerp.netsvc.start_agent()
# Start the top-level servers threads (normally HTTP, HTTPS, and NETRPC).
openerp.netsvc.Server.startAll()
# Variable keeping track of the number of calls to the signal handler defined
# below. This variable is monitored by ``quit_on_signals()``.
quit_signals_received = 0
@ -208,26 +187,10 @@ def quit_on_signals():
while quit_signals_received == 0:
time.sleep(60)
openerp.netsvc.Agent.quit()
openerp.netsvc.Server.quitAll()
config = openerp.tools.config
if config['pidfile']:
os.unlink(config['pidfile'])
logger = logging.getLogger('server')
logger.info("Initiating shutdown")
logger.info("Hit CTRL-C again or send a second signal to force the shutdown.")
logging.shutdown()
# manually join() all threads before calling sys.exit() to allow a second signal
# to trigger _force_quit() in case some non-daemon threads won't exit cleanly.
# threading.Thread.join() should not mask signals (at least in python 2.5)
for thread in threading.enumerate():
if thread != threading.currentThread() and not thread.isDaemon():
while thread.isAlive():
# need a busyloop here as thread.join() masks signals
# and would present the forced shutdown
thread.join(0.05)
time.sleep(0.05)
openerp.service.stop_services()
sys.exit(0)
if __name__ == "__main__":
@ -259,7 +222,7 @@ if __name__ == "__main__":
if not config["stop_after_init"]:
# Some module register themselves when they are loaded so we need the
# services to be running before loading any registry.
start_services()
openerp.service.start_services()
if config['db_name']:
for dbname in config['db_name'].split(','):

View File

@ -34,5 +34,52 @@ import web_services
low-level behavior of the wire.
"""
def start_services():
import openerp
http_server = openerp.service.http_server
netrpc_server = openerp.service.netrpc_server
# Instantiate local services (this is a legacy design).
openerp.osv.osv.start_object_proxy()
# Export (for RPC) services.
openerp.service.web_services.start_web_services()
# Initialize the HTTP stack.
http_server.init_servers()
http_server.init_xmlrpc()
http_server.init_static_http()
netrpc_server.init_servers()
# Start the main cron thread.
openerp.netsvc.start_agent()
# Start the top-level servers threads (normally HTTP, HTTPS, and NETRPC).
openerp.netsvc.Server.startAll()
def stop_services():
import openerp
import logging
import threading
import time
openerp.netsvc.Agent.quit()
openerp.netsvc.Server.quitAll()
config = openerp.tools.config
logger = logging.getLogger('server')
logger.info("Initiating shutdown")
logger.info("Hit CTRL-C again or send a second signal to force the shutdown.")
logging.shutdown()
# manually join() all threads before calling sys.exit() to allow a second signal
# to trigger _force_quit() in case some non-daemon threads won't exit cleanly.
# threading.Thread.join() should not mask signals (at least in python 2.5)
for thread in threading.enumerate():
if thread != threading.currentThread() and not thread.isDaemon():
while thread.isAlive():
# need a busyloop here as thread.join() masks signals
# and would present the forced shutdown
thread.join(0.05)
time.sleep(0.05)
# vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4:

2
tests/__init__.py Normal file
View File

@ -0,0 +1,2 @@
# -*- coding: utf-8 -*-
import test_xmlrpc

85
tests/test_xmlrpc.py Normal file
View File

@ -0,0 +1,85 @@
# -*- coding: utf-8 -*-
# Run with one of these commands:
# > OPENERP_ADDONS_PATH='../../addons/trunk' OPENERP_PORT=8069 \
# OPENERP_DATABASE=yy PYTHONPATH=. python tests/test_xmlrpc.py
# > OPENERP_ADDONS_PATH='../../addons/trunk' OPENERP_PORT=8069 \
# OPENERP_DATABASE=yy nosetests tests/test_xmlrpc.py
# > OPENERP_ADDONS_PATH='../../../addons/trunk' OPENERP_PORT=8069 \
# OPENERP_DATABASE=yy PYTHONPATH=../:. unit2 test_xmlrpc
import os
import time
import unittest2
import xmlrpclib
import openerp
ADDONS_PATH = os.environ['OPENERP_ADDONS_PATH']
PORT = int(os.environ['OPENERP_PORT'])
DB = os.environ['OPENERP_DATABASE']
HOST = '127.0.0.1'
ADMIN_USER = 'admin'
ADMIN_USER_ID = 1
ADMIN_PASSWORD = 'admin'
common_proxy_60 = None
db_proxy_60 = None
object_proxy_60 = None
def setUpModule():
"""
Start the OpenERP server similary to the openerp-server script and
setup some xmlrpclib proxies.
"""
openerp.tools.config['addons_path'] = ADDONS_PATH
openerp.tools.config['xmlrpc_port'] = PORT
openerp.service.start_services()
global common_proxy_60
global db_proxy_60
global object_proxy_60
# Use the old (pre 6.1) API.
url = 'http://%s:%d/xmlrpc/' % (HOST, PORT)
common_proxy_60 = xmlrpclib.ServerProxy(url + 'common')
db_proxy_60 = xmlrpclib.ServerProxy(url + 'db')
object_proxy_60 = xmlrpclib.ServerProxy(url + 'object')
def tearDownModule():
""" Shutdown the OpenERP server similarly to a single ctrl-c. """
openerp.service.stop_services()
class test_xmlrpc(unittest2.TestCase):
def test_xmlrpc_create_database_polling(self):
"""
Simulate a OpenERP client requesting the creation of a database and
polling the server until the creation is complete.
"""
progress_id = db_proxy_60.create(ADMIN_PASSWORD, DB, True, False,
ADMIN_PASSWORD)
while True:
time.sleep(1)
progress, users = db_proxy_60.get_progress(ADMIN_PASSWORD,
progress_id)
if progress == 1.0:
break
def test_xmlrpc_login(self):
""" Try to login on the common service. """
uid = common_proxy_60.login(DB, ADMIN_USER, ADMIN_PASSWORD)
assert uid == ADMIN_USER_ID
def test_xmlrpc_ir_model_search(self):
""" Try a search on the object service. """
ids = object_proxy_60.execute(DB, ADMIN_USER_ID, ADMIN_PASSWORD,
'ir.model', 'search', [])
assert ids
ids = object_proxy_60.execute(DB, ADMIN_USER_ID, ADMIN_PASSWORD,
'ir.model', 'search', [], {})
assert ids
if __name__ == '__main__':
unittest2.main()