[MERGE] tests: added database creation test via XML-RPC.

bzr revid: vmt@openerp.com-20110924171507-p6x41jshfkrquz2f
This commit is contained in:
Vo Minh Thu 2011-09-24 19:15:07 +02:00
commit 459213444d
4 changed files with 142 additions and 39 deletions

View File

@ -139,27 +139,6 @@ def import_translation():
cr.commit()
cr.close()
def start_services():
http_server = openerp.service.http_server
netrpc_server = openerp.service.netrpc_server
# Instantiate local services (this is a legacy design).
openerp.osv.osv.start_object_proxy()
# Export (for RPC) services.
openerp.service.web_services.start_web_services()
# Initialize the HTTP stack.
http_server.init_servers()
http_server.init_xmlrpc()
http_server.init_static_http()
netrpc_server.init_servers()
# Start the main cron thread.
openerp.netsvc.start_agent()
# Start the top-level servers threads (normally HTTP, HTTPS, and NETRPC).
openerp.netsvc.Server.startAll()
# Variable keeping track of the number of calls to the signal handler defined
# below. This variable is monitored by ``quit_on_signals()``.
quit_signals_received = 0
@ -211,26 +190,10 @@ def quit_on_signals():
while quit_signals_received == 0:
time.sleep(60)
openerp.netsvc.Agent.quit()
openerp.netsvc.Server.quitAll()
config = openerp.tools.config
if config['pidfile']:
os.unlink(config['pidfile'])
logger = logging.getLogger('server')
logger.info("Initiating shutdown")
logger.info("Hit CTRL-C again or send a second signal to force the shutdown.")
logging.shutdown()
# manually join() all threads before calling sys.exit() to allow a second signal
# to trigger _force_quit() in case some non-daemon threads won't exit cleanly.
# threading.Thread.join() should not mask signals (at least in python 2.5)
for thread in threading.enumerate():
if thread != threading.currentThread() and not thread.isDaemon():
while thread.isAlive():
# need a busyloop here as thread.join() masks signals
# and would present the forced shutdown
thread.join(0.05)
time.sleep(0.05)
openerp.service.stop_services()
sys.exit(0)
if __name__ == "__main__":
@ -262,7 +225,7 @@ if __name__ == "__main__":
if not config["stop_after_init"]:
# Some module register themselves when they are loaded so we need the
# services to be running before loading any registry.
start_services()
openerp.service.start_services()
if config['db_name']:
for dbname in config['db_name'].split(','):

View File

@ -19,10 +19,18 @@
#
##############################################################################
import logging
import threading
import time
import http_server
import netrpc_server
import web_services
import openerp.netsvc
import openerp.osv
import openerp.tools
#.apidoc title: RPC Services
""" Classes of this module implement the network protocols that the
@ -34,5 +42,50 @@ import web_services
low-level behavior of the wire.
"""
def start_services():
""" Start all services.
Services include the different servers and cron threads.
"""
# Instantiate local services (this is a legacy design).
openerp.osv.osv.start_object_proxy()
# Export (for RPC) services.
web_services.start_web_services()
# Initialize the HTTP stack.
http_server.init_servers()
http_server.init_xmlrpc()
http_server.init_static_http()
netrpc_server.init_servers()
# Start the main cron thread.
openerp.netsvc.start_agent()
# Start the top-level servers threads (normally HTTP, HTTPS, and NETRPC).
openerp.netsvc.Server.startAll()
def stop_services():
""" Stop all services. """
openerp.netsvc.Agent.quit()
openerp.netsvc.Server.quitAll()
config = openerp.tools.config
logger = logging.getLogger('server')
logger.info("Initiating shutdown")
logger.info("Hit CTRL-C again or send a second signal to force the shutdown.")
logging.shutdown()
# Manually join() all threads before calling sys.exit() to allow a second signal
# to trigger _force_quit() in case some non-daemon threads won't exit cleanly.
# threading.Thread.join() should not mask signals (at least in python 2.5).
for thread in threading.enumerate():
if thread != threading.currentThread() and not thread.isDaemon():
while thread.isAlive():
# Need a busyloop here as thread.join() masks signals
# and would prevent the forced shutdown.
thread.join(0.05)
time.sleep(0.05)
# vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4:

2
tests/__init__.py Normal file
View File

@ -0,0 +1,2 @@
# -*- coding: utf-8 -*-
import test_xmlrpc

85
tests/test_xmlrpc.py Normal file
View File

@ -0,0 +1,85 @@
# -*- coding: utf-8 -*-
# Run with one of these commands:
# > OPENERP_ADDONS_PATH='../../addons/trunk' OPENERP_PORT=8069 \
# OPENERP_DATABASE=yy PYTHONPATH=. python tests/test_xmlrpc.py
# > OPENERP_ADDONS_PATH='../../addons/trunk' OPENERP_PORT=8069 \
# OPENERP_DATABASE=yy nosetests tests/test_xmlrpc.py
# > OPENERP_ADDONS_PATH='../../../addons/trunk' OPENERP_PORT=8069 \
# OPENERP_DATABASE=yy PYTHONPATH=../:. unit2 test_xmlrpc
import os
import time
import unittest2
import xmlrpclib
import openerp
ADDONS_PATH = os.environ['OPENERP_ADDONS_PATH']
PORT = int(os.environ['OPENERP_PORT'])
DB = os.environ['OPENERP_DATABASE']
HOST = '127.0.0.1'
ADMIN_USER = 'admin'
ADMIN_USER_ID = 1
ADMIN_PASSWORD = 'admin'
common_proxy_60 = None
db_proxy_60 = None
object_proxy_60 = None
def setUpModule():
"""
Start the OpenERP server similary to the openerp-server script and
setup some xmlrpclib proxies.
"""
openerp.tools.config['addons_path'] = ADDONS_PATH
openerp.tools.config['xmlrpc_port'] = PORT
openerp.service.start_services()
global common_proxy_60
global db_proxy_60
global object_proxy_60
# Use the old (pre 6.1) API.
url = 'http://%s:%d/xmlrpc/' % (HOST, PORT)
common_proxy_60 = xmlrpclib.ServerProxy(url + 'common')
db_proxy_60 = xmlrpclib.ServerProxy(url + 'db')
object_proxy_60 = xmlrpclib.ServerProxy(url + 'object')
def tearDownModule():
""" Shutdown the OpenERP server similarly to a single ctrl-c. """
openerp.service.stop_services()
class test_xmlrpc(unittest2.TestCase):
def test_xmlrpc_create_database_polling(self):
"""
Simulate a OpenERP client requesting the creation of a database and
polling the server until the creation is complete.
"""
progress_id = db_proxy_60.create(ADMIN_PASSWORD, DB, True, False,
ADMIN_PASSWORD)
while True:
time.sleep(1)
progress, users = db_proxy_60.get_progress(ADMIN_PASSWORD,
progress_id)
if progress == 1.0:
break
def test_xmlrpc_login(self):
""" Try to login on the common service. """
uid = common_proxy_60.login(DB, ADMIN_USER, ADMIN_PASSWORD)
assert uid == ADMIN_USER_ID
def test_xmlrpc_ir_model_search(self):
""" Try a search on the object service. """
ids = object_proxy_60.execute(DB, ADMIN_USER_ID, ADMIN_PASSWORD,
'ir.model', 'search', [])
assert ids
ids = object_proxy_60.execute(DB, ADMIN_USER_ID, ADMIN_PASSWORD,
'ir.model', 'search', [], {})
assert ids
if __name__ == '__main__':
unittest2.main()