[MERGE] trunk-test-al easier http testing

bzr revid: al@openerp.com-20140209132830-gm3lh535tq814jvi
This commit is contained in:
Antony Lesuisse 2014-02-09 14:28:30 +01:00
commit ffde426da4
86 changed files with 457 additions and 736 deletions

View File

@ -34,7 +34,4 @@ Importing them from here is deprecated.
"""
# get_module_path is used only by base_module_quality
from openerp.modules import get_module_resource, get_module_path
# vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4:

View File

@ -25,6 +25,7 @@ import module
import res
import report
import test
import tests
# vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4:

View File

@ -92,8 +92,6 @@ The kernel of OpenERP, needed for all installation.
],
'test': [
'test/base_test.yml',
'test/test_context.xml',
'test/bug_lp541545.xml',
'test/test_osv_expression.yml',
'test/test_ir_rule.yml', # <-- These tests modify/add/delete ir_rules.
],

View File

@ -1,14 +0,0 @@
<?xml version="1.0" encoding="utf-8"?>
<openerp>
<!-- Test count attribute for assertions -->
<data>
<assert
model="res.currency.rate"
search="[('currency_id', '=', ref('INR'))]"
count="1"
string="Rate entries for Indian rupee">
<test expr="True"/>
</assert>
</data>
</openerp>

View File

@ -1,20 +0,0 @@
<?xml version="1.0" encoding="utf-8"?>
<openerp>
<!-- Test context attribute for unit tests -->
<data context="{'date': '2009-06-01'}">
<!-- <assert -->
<!-- model="res.currency"-->
<!-- id="INR"-->
<!-- string="Indian rupee rate in 2009">-->
<!-- date specified in data element's context -->
<!-- <test expr="str(rate)">65.8287</test>-->
<!-- </assert> -->
<assert
model="res.currency"
id="INR"
string="Indian rupee rate in 2002"
context="{'date': '2010-06-01'}">
<test expr="rate_ids and str(rate_ids[0].rate)">59.9739</test>
</assert>
</data>
</openerp>

View File

@ -1,3 +1,17 @@
import test_acl
import test_basecase
import test_db_cursor
import test_expression
import test_fields
import test_ir_filters
import test_ir_sequence
import test_mail
import test_orm
import test_osv
import test_translate
#import test_uninstall
import test_view_validation
import test_xmlrpc
import test_base
import test_expression
import test_ir_actions
@ -8,16 +22,4 @@ import test_res_config
import test_res_lang
import test_search
import test_views
checks = [
test_base,
test_expression,
test_ir_actions,
test_ir_attachment,
test_ir_values,
test_menu,
test_res_config,
test_res_lang,
test_search,
test_views,
]
import test_phantom

View File

@ -3,8 +3,7 @@ from lxml import etree
import openerp
from openerp.tools.misc import mute_logger
import common
from openerp.tests import common
# test group that demo user should not have
GROUP_TECHNICAL_FEATURES = 'base.group_no_one'

View File

@ -1,7 +1,7 @@
# -*- coding: utf-8 -*-
import unittest2
import common
from openerp.tests import common
class test_single_transaction_case(common.SingleTransactionCase):
"""

View File

@ -4,7 +4,7 @@ import unittest2
import openerp
from openerp.tools.misc import mute_logger
import common
from openerp.tests import common
DB = common.DB
ADMIN_USER_ID = common.ADMIN_USER_ID

View File

@ -1,8 +1,9 @@
import unittest2
import openerp
from openerp.osv.orm import BaseModel
import openerp.tests.common as common
class test_expression(common.TransactionCase):
def _reinit_mock(self):
@ -439,5 +440,13 @@ class test_expression(common.TransactionCase):
partner_parent_id_col._auto_join = False
state_country_id_col._auto_join = False
def test_30_normalize_domain(self):
expression = openerp.osv.expression
norm_domain = domain = ['&', (1, '=', 1), ('a', '=', 'b')]
assert norm_domain == expression.normalize_domain(domain), "Normalized domains should be left untouched"
domain = [('x', 'in', ['y', 'z']), ('a.v', '=', 'e'), '|', '|', ('a', '=', 'b'), '!', ('c', '>', 'd'), ('e', '!=', 'f'), ('g', '=', 'h')]
norm_domain = ['&', '&', '&'] + domain
assert norm_domain == expression.normalize_domain(domain), "Non-normalized domains should be properly normalized"
if __name__ == '__main__':
unittest2.main()

View File

@ -1,9 +1,8 @@
#
# test cases for fields access, etc.
#
import common
from openerp.osv import fields
from openerp.tests import common
class TestRelatedField(common.TransactionCase):

View File

@ -2,7 +2,7 @@
import functools
from openerp import exceptions
from . import common
from openerp.tests import common
def noid(d):
""" Removes `id` key from a dict so we don't have to keep these things

View File

@ -11,7 +11,7 @@ import psycopg2
import unittest2
import openerp
import common
from openerp.tests import common
DB = common.DB
ADMIN_USER_ID = common.ADMIN_USER_ID

View File

@ -26,9 +26,10 @@ import unittest2
from lxml import etree
from openerp.tests import test_mail_examples
from openerp.tools import html_sanitize, html_email_clean, append_content_to_html, plaintext2html, email_split
import test_mail_examples
class TestSanitizer(unittest2.TestCase):
""" Test the html sanitizer that filters html to remove unwanted attributes """

View File

@ -1,13 +1,6 @@
# This test can be run stand-alone with something like:
# > PYTHONPATH=. python2 openerp/tests/test_misc.py
import datetime
import locale
import unittest2
import babel
import babel.dates
from ..tools import misc
from openerp.tools import misc
class test_countingstream(unittest2.TestCase):

View File

@ -1,6 +1,6 @@
from collections import defaultdict
from openerp.tools import mute_logger
import common
from openerp.tests import common
UID = common.ADMIN_USER_ID
DB = common.DB

View File

@ -0,0 +1,11 @@
# -*- coding: utf-8 -*-
import openerp
from openerp.tests import common
class test_phantom(common.HttpCase):
def test_01_dummy(self):
self.phantomjs(openerp.modules.module.get_module_resource('base','tests','test_phantom_dummy.js'))
# vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4:

View File

@ -0,0 +1,4 @@
console.log('{ "event": "success", "message": "Phantomjs success"}');
// For a failure:
// console.log('{ "event": "failure", "message": "The test failed" }');
phantom.exit();

View File

@ -0,0 +1,89 @@
# -*- coding: utf-8 -*-
# This assumes an existing but uninitialized database.
import unittest2
import openerp
from openerp import SUPERUSER_ID
import common
DB = common.DB
ADMIN_USER_ID = common.ADMIN_USER_ID
def registry(model):
return openerp.modules.registry.RegistryManager.get(DB)[model]
def cursor():
return openerp.modules.registry.RegistryManager.get(DB).db.cursor()
def get_module(module_name):
registry = openerp.modules.registry.RegistryManager.get(DB)
return registry.get(module_name)
def reload_registry():
openerp.modules.registry.RegistryManager.new(
DB, update_module=True)
def search_registry(model_name, domain):
cr = cursor()
model = registry(model_name)
record_ids = model.search(cr, SUPERUSER_ID, domain, {})
cr.close()
return record_ids
def install_module(module_name):
ir_module_module = registry('ir.module.module')
cr = cursor()
module_ids = ir_module_module.search(cr, SUPERUSER_ID,
[('name', '=', module_name)], {})
assert len(module_ids) == 1
ir_module_module.button_install(cr, SUPERUSER_ID, module_ids, {})
cr.commit()
cr.close()
reload_registry()
def uninstall_module(module_name):
ir_module_module = registry('ir.module.module')
cr = cursor()
module_ids = ir_module_module.search(cr, SUPERUSER_ID,
[('name', '=', module_name)], {})
assert len(module_ids) == 1
ir_module_module.button_uninstall(cr, SUPERUSER_ID, module_ids, {})
cr.commit()
cr.close()
reload_registry()
class test_uninstall(unittest2.TestCase):
"""
Test the install/uninstall of a test module. The module is available in
`openerp.tests` which should be present in the addons-path.
"""
def test_01_install(self):
""" Check a few things showing the module is installed. """
install_module('test_uninstall')
assert get_module('test_uninstall.model')
assert search_registry('ir.model.data',
[('module', '=', 'test_uninstall')])
assert search_registry('ir.model.fields',
[('model', '=', 'test_uninstall.model')])
def test_02_uninstall(self):
""" Check a few things showing the module is uninstalled. """
uninstall_module('test_uninstall')
assert not get_module('test_uninstall.model')
assert not search_registry('ir.model.data',
[('module', '=', 'test_uninstall')])
assert not search_registry('ir.model.fields',
[('model', '=', 'test_uninstall.model')])
if __name__ == '__main__':
unittest2.main()
# vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4:

View File

@ -0,0 +1,40 @@
# -*- coding: utf-8 -*-
import time
import unittest2
import xmlrpclib
from openerp.tests import common
DB = common.DB
ADMIN_USER = common.ADMIN_USER
ADMIN_USER_ID = common.ADMIN_USER_ID
ADMIN_PASSWORD = common.ADMIN_PASSWORD
class test_xmlrpc(common.HttpCase):
def test_01_xmlrpc_login(self):
""" Try to login on the common service. """
uid = self.xmlrpc_common.login(DB, ADMIN_USER, ADMIN_PASSWORD)
self.assertTrue(uid == ADMIN_USER_ID)
def test_xmlrpc_ir_model_search(self):
""" Try a search on the object service. """
o = self.xmlrpc_object
ids = o.execute(DB, ADMIN_USER_ID, ADMIN_PASSWORD, 'ir.model', 'search', [])
self.assertIsInstance(ids, list)
ids = o.execute(DB, ADMIN_USER_ID, ADMIN_PASSWORD, 'ir.model', 'search', [], {})
self.assertIsInstance(ids, list)
# This test was written to test the creation of a new RPC endpoint, not
# really for the EDI itself.
#def test_xmlrpc_import_edi_document(self):
# """ Try to call an EDI method. """
# msg_re = 'EDI Document is empty!'
# with self.assertRaisesRegexp(Exception, msg_re):
# self.proxy.edi_60.import_edi_document(DB, ADMIN_USER_ID, ADMIN_PASSWORD, {})
if __name__ == '__main__':
unittest2.main()
# vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4:

View File

Before

Width:  |  Height:  |  Size: 37 KiB

After

Width:  |  Height:  |  Size: 37 KiB

View File

@ -89,29 +89,6 @@ def setup_pid_file():
pidtext = "%d" % (os.getpid())
fd.write(pidtext)
def preload_registry(dbname):
""" Preload a registry, and start the cron."""
try:
update_module = True if openerp.tools.config['init'] or openerp.tools.config['update'] else False
registry = openerp.modules.registry.RegistryManager.new(dbname, update_module=update_module)
except Exception:
_logger.exception('Failed to initialize database `%s`.', dbname)
return False
return registry._assertion_report.failures == 0
def run_test_file(dbname, test_file):
""" Preload a registry, possibly run a test file, and start the cron."""
try:
config = openerp.tools.config
registry = openerp.modules.registry.RegistryManager.new(dbname, update_module=config['init'] or config['update'])
cr = registry.db.cursor()
_logger.info('loading test file %s', test_file)
openerp.tools.convert_yaml_import(cr, 'base', file(test_file), 'test', {}, 'test', True)
cr.rollback()
cr.close()
except Exception:
_logger.exception('Failed to initialize database `%s` and run test file `%s`.', dbname, test_file)
def export_translation():
config = openerp.tools.config
dbname = config['db_name']
@ -156,8 +133,7 @@ def main(args):
config = openerp.tools.config
if config["test_file"]:
run_test_file(config['db_name'], config['test_file'])
sys.exit(0)
config["test_enable"] = True
if config["translate_out"]:
export_translation()
@ -172,23 +148,17 @@ def main(args):
if config['workers']:
openerp.multi_process = True
# preload registries, needed for -u --stop_after_init
rc = 0
preload = []
if config['db_name']:
for dbname in config['db_name'].split(','):
if not preload_registry(dbname):
rc += 1
preload = config['db_name'].split(',')
if not config["stop_after_init"]:
setup_pid_file()
openerp.service.server.start()
if config['pidfile']:
os.unlink(config['pidfile'])
else:
sys.exit(rc)
stop = config["stop_after_init"]
_logger.info('OpenERP server is running, waiting for connections...')
quit_on_signals()
setup_pid_file()
rc = openerp.service.server.start(preload=preload, stop=stop)
if config['pidfile']:
os.unlink(config['pidfile'])
sys.exit(rc)
class Server(Command):
def run(self, args):

View File

@ -168,7 +168,10 @@ class WebRequest(object):
"""
# some magic to lazy create the cr
if not self._cr:
self._cr = self.registry.db.cursor()
if openerp.tools.config['test_enable'] and self.session_id in openerp.tests.common.HTTP_SESSION:
self._cr = openerp.tests.common.HTTP_SESSION[self.session_id]
else:
self._cr = self.registry.db.cursor()
return self._cr
def __enter__(self):
@ -177,7 +180,7 @@ class WebRequest(object):
def __exit__(self, exc_type, exc_value, traceback):
_request_stack.pop()
if self._cr:
if self._cr and not (openerp.tools.config['test_enable'] and self.session_id in openerp.tests.common.HTTP_SESSION):
if exc_type is None:
self._cr.commit()
self._cr.close()

View File

@ -26,16 +26,10 @@
from . import db, graph, loading, migration, module, registry
# TODO temporarily expose those things
from openerp.modules.module import \
get_modules, get_modules_with_version, \
load_information_from_description_file, \
get_module_resource, zip_directory, \
get_module_path, initialize_sys_path, \
load_openerp_module, init_module_models, \
adapt_version
from openerp.modules.loading import load_modules
from openerp.modules.module import get_modules, get_modules_with_version, \
load_information_from_description_file, get_module_resource, get_module_path, \
initialize_sys_path, load_openerp_module, init_module_models, adapt_version
# vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4:

View File

@ -182,18 +182,12 @@ def load_module_graph(cr, graph, status=None, perform_checks=True, skip_modules=
migrations.migrate_module(package, 'post')
if has_demo:
# launch tests only in demo mode, as most tests will depend
# on demo data. Other tests can be added into the regular
# 'data' section, but should probably not alter the data,
# as there is no rollback.
# launch tests only in demo mode, allowing tests to use demo data.
if tools.config.options['test_enable']:
# Yamel test
report.record_result(load_test(module_name, idref, mode))
# Run the `fast_suite` and `checks` tests given by the module.
if module_name == 'base':
# Also run the core tests after the database is created.
report.record_result(openerp.modules.module.run_unit_tests('openerp'))
report.record_result(openerp.modules.module.run_unit_tests(module_name))
# Python tests
report.record_result(openerp.modules.module.run_unit_tests(module_name, cr.dbname))
processed_modules.append(package.name)
@ -247,7 +241,6 @@ def load_marked_modules(cr, graph, states, force, progressdict, report, loaded_m
if not processed: break
return processed_modules
def load_modules(db, force_demo=False, status=None, update_module=False):
# TODO status['progress'] reporting is broken: used twice (and reset each
# time to zero) in load_module_graph, not fine-grained enough.

View File

@ -20,32 +20,31 @@
#
##############################################################################
import base64
import imp
import itertools
import logging
import os
from os.path import join as opj
import re
import sys
import types
import zipimport
import openerp.tools as tools
import openerp.tools.osutil as osutil
from openerp.tools.safe_eval import safe_eval as eval
import zipfile
import openerp.release as release
import re
import base64
from zipfile import PyZipFile, ZIP_DEFLATED
from cStringIO import StringIO
from os.path import join as opj
import logging
import unittest2
import openerp
import openerp.tools as tools
import openerp.release as release
from openerp.tools.safe_eval import safe_eval as eval
_logger = logging.getLogger(__name__)
_test_logger = logging.getLogger('openerp.tests')
# addons path ','.joined
_ad = os.path.join(os.path.dirname(os.path.dirname(__file__)), 'addons') # default addons path (base)
# addons path as a list
ad_paths = []
# Modules already loaded
@ -120,7 +119,6 @@ def get_module_path(module, downloaded=False, display_warning=True):
_logger.warning('module %s: module not found', module)
return False
def get_module_filetree(module, dir='.'):
path = get_module_path(module)
if not path:
@ -132,12 +130,7 @@ def get_module_filetree(module, dir='.'):
if dir.startswith('..') or (dir and dir[0] == '/'):
raise Exception('Cannot access file outside the module')
if not os.path.isdir(path):
# zipmodule
zip = zipfile.ZipFile(path + ".zip")
files = ['/'.join(f.split('/')[1:]) for f in zip.namelist()]
else:
files = osutil.listdir(path, True)
files = openerp.tools.osutil.listdir(path, True)
tree = {}
for f in files:
@ -154,68 +147,6 @@ def get_module_filetree(module, dir='.'):
return tree
def zip_directory(directory, b64enc=True, src=True):
"""Compress a directory
@param directory: The directory to compress
@param base64enc: if True the function will encode the zip file with base64
@param src: Integrate the source files
@return: a string containing the zip file
"""
RE_exclude = re.compile('(?:^\..+\.swp$)|(?:\.py[oc]$)|(?:\.bak$)|(?:\.~.~$)', re.I)
def _zippy(archive, path, src=True):
path = os.path.abspath(path)
base = os.path.basename(path)
for f in osutil.listdir(path, True):
bf = os.path.basename(f)
if not RE_exclude.search(bf) and (src or bf == '__openerp__.py' or not bf.endswith('.py')):
archive.write(os.path.join(path, f), os.path.join(base, f))
archname = StringIO()
archive = PyZipFile(archname, "w", ZIP_DEFLATED)
# for Python 2.5, ZipFile.write() still expects 8-bit strings (2.6 converts to utf-8)
directory = tools.ustr(directory).encode('utf-8')
archive.writepy(directory)
_zippy(archive, directory, src=src)
archive.close()
archive_data = archname.getvalue()
archname.close()
if b64enc:
return base64.encodestring(archive_data)
return archive_data
def get_module_as_zip(modulename, b64enc=True, src=True):
"""Generate a module as zip file with the source or not and can do a base64 encoding
@param modulename: The module name
@param b64enc: if True the function will encode the zip file with base64
@param src: Integrate the source files
@return: a stream to store in a file-like object
"""
ap = get_module_path(str(modulename))
if not ap:
raise Exception('Unable to find path for module %s' % modulename)
ap = ap.encode('utf8')
if os.path.isfile(ap + '.zip'):
val = file(ap + '.zip', 'rb').read()
if b64enc:
val = base64.encodestring(val)
else:
val = zip_directory(ap, b64enc, src)
return val
def get_module_resource(module, *args):
"""Return the full path of a resource of the given module.
@ -235,12 +166,6 @@ def get_module_resource(module, *args):
# the module is a directory - ignore zip behavior
if os.path.exists(resource_path):
return resource_path
elif zipfile.is_zipfile(mod_path + '.zip'):
zip = zipfile.ZipFile( mod_path + ".zip")
files = ['/'.join(f.split('/')[1:]) for f in zip.namelist()]
resource_path = '/'.join(args)
if resource_path in files:
return opj(mod_path, resource_path)
return False
def get_module_icon(module):
@ -258,7 +183,7 @@ def load_information_from_description_file(module):
mod_path = get_module_path(module)
if terp_file:
info = {}
if os.path.isfile(terp_file) or zipfile.is_zipfile(mod_path+'.zip'):
if os.path.isfile(terp_file):
# default values for descriptor
info = {
'application': False,
@ -300,7 +225,6 @@ def load_information_from_description_file(module):
_logger.debug('module %s: no __openerp__.py file found.', module)
return {}
def init_module_models(cr, module_name, obj_list):
""" Initialize a list of models.
@ -342,12 +266,7 @@ def load_openerp_module(module_name):
initialize_sys_path()
try:
mod_path = get_module_path(module_name)
zip_mod_path = '' if not mod_path else mod_path + '.zip'
if not os.path.isfile(zip_mod_path):
__import__('openerp.addons.' + module_name)
else:
zimp = zipimport.zipimporter(zip_mod_path)
zimp.load_module(module_name)
__import__('openerp.addons.' + module_name)
# Call the module's post-load hook. This can done before any model or
# data has been initialized. This is ok as the post-load hook is for
@ -357,8 +276,7 @@ def load_openerp_module(module_name):
getattr(sys.modules['openerp.addons.' + module_name], info['post_load'])()
except Exception, e:
mt = isinstance(e, zipimport.ZipImportError) and 'zip ' or ''
msg = "Couldn't load %smodule %s" % (mt, module_name)
msg = "Couldn't load module %s" % (module_name)
_logger.critical(msg)
_logger.critical(e)
raise
@ -378,7 +296,7 @@ def get_modules():
def is_really_module(name):
manifest_name = opj(dir, name, '__openerp__.py')
zipfile_name = opj(dir, name)
return os.path.isfile(manifest_name) or zipfile.is_zipfile(zipfile_name)
return os.path.isfile(manifest_name)
return map(clean, filter(is_really_module, os.listdir(dir)))
plist = []
@ -387,7 +305,6 @@ def get_modules():
plist.extend(listdir(ad))
return list(set(plist))
def get_modules_with_version():
modules = get_modules()
res = dict.fromkeys(modules, adapt_version('1.0'))
@ -405,126 +322,42 @@ def adapt_version(version):
version = '%s.%s' % (serie, version)
return version
def get_test_modules(module):
# backward compatibility for oe
return []
def get_test_modules(module, submodule, explode):
"""
Return a list of submodules containing tests.
`submodule` can be:
- None
- the name of a submodule
- '__fast_suite__'
- '__sanity_checks__'
"""
# Turn command-line module, submodule into importable names.
if module is None:
# Use a custom stream object to log the test executions.
class TestStream(object):
def __init__(self):
self.r = re.compile(r'^-*$|^ *... *$|^ok$')
def flush(self):
pass
elif module == 'openerp':
module = 'openerp.tests'
else:
module = 'openerp.addons.' + module + '.tests'
def write(self, s):
if self.r.match(s):
return
first = True
for c in s.split('\n'):
if not first:
c = '` ' + c
first = False
_test_logger.info(c)
# Try to import the module
try:
__import__(module)
except Exception, e:
if explode:
print 'Can not `import %s`.' % module
import logging
logging.exception('')
sys.exit(1)
else:
if str(e) == 'No module named tests':
# It seems the module has no `tests` sub-module, no problem.
pass
else:
_logger.exception('Can not `import %s`.', module)
return []
# Discover available test sub-modules.
m = sys.modules[module]
submodule_names = sorted([x for x in dir(m) \
if x.startswith('test_') and \
isinstance(getattr(m, x), types.ModuleType)])
submodules = [getattr(m, x) for x in submodule_names]
def show_submodules_and_exit():
if submodule_names:
print 'Available submodules are:'
for x in submodule_names:
print ' ', x
sys.exit(1)
if submodule is None:
# Use auto-discovered sub-modules.
ms = submodules
elif submodule == '__fast_suite__':
# Obtain the explicit test sub-modules list.
ms = getattr(sys.modules[module], 'fast_suite', None)
# `suite` was used before the 6.1 release instead of `fast_suite`.
ms = ms if ms else getattr(sys.modules[module], 'suite', None)
if ms is None:
if explode:
print 'The module `%s` has no defined test suite.' % (module,)
show_submodules_and_exit()
else:
ms = []
elif submodule == '__sanity_checks__':
ms = getattr(sys.modules[module], 'checks', None)
if ms is None:
if explode:
print 'The module `%s` has no defined sanity checks.' % (module,)
show_submodules_and_exit()
else:
ms = []
else:
# Pick the command-line-specified test sub-module.
m = getattr(sys.modules[module], submodule, None)
ms = [m]
if m is None:
if explode:
print 'The module `%s` has no submodule named `%s`.' % \
(module, submodule)
show_submodules_and_exit()
else:
ms = []
return ms
def run_unit_tests(module_name):
def run_unit_tests(module_name, dbname):
"""
Return True or False if some tests were found and succeeded or failed.
Return None if no test was found.
"""
import unittest2
ms = get_test_modules(module_name, '__fast_suite__', explode=False)
# TODO: No need to try again if the above call failed because of e.g. a syntax error.
ms.extend(get_test_modules(module_name, '__sanity_checks__', explode=False))
suite = unittest2.TestSuite()
for m in ms:
suite.addTests(unittest2.TestLoader().loadTestsFromModule(m))
if ms:
_test_logger.info('module %s: executing %s `fast_suite` and/or `checks` sub-modules', module_name, len(ms))
# Use a custom stream object to log the test executions.
class MyStream(object):
def __init__(self):
self.r = re.compile(r'^-*$|^ *... *$|^ok$')
def flush(self):
pass
def write(self, s):
if self.r.match(s):
return
first = True
for c in s.split('\n'):
if not first:
c = '` ' + c
first = False
_test_logger.info(c)
result = unittest2.TextTestRunner(verbosity=2, stream=MyStream()).run(suite)
if result.wasSuccessful():
return True
else:
mods = get_test_modules(module_name)
r = True
for m in mods:
suite = unittest2.TestSuite()
for t in unittest2.TestLoader().loadTestsFromModule(m):
suite.addTest(t)
_logger.log(logging.INFO, 'module %s: running test %s.', module_name, m.__name__)
result = unittest2.TextTestRunner(verbosity=2, stream=TestStream()).run(suite)
if not result.wasSuccessful():
r = False
_logger.error('module %s: at least one error occurred in a test', module_name)
return False
return r
# vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4:

View File

@ -193,6 +193,30 @@ class Registry(Mapping):
finally:
cr.close()
class TestRLock(object):
def __init__(self):
self._lock = threading.RLock()
def acquire(self):
if openerp.tools.config['test_enable']:
return
return self._lock.acquire()
def release(self):
if openerp.tools.config['test_enable']:
return
return self._lock.release()
def __enter__(self):
self.acquire()
def __exit__(self, type, value, traceback):
self.release()
# def __enter__(self, *l, **kw):
# if openerp.tools.config['test_enable']:
# return
# return super(TestRlock, self).__enter__(*l, **kw)
# def __exit__(self, *l, **kw):
# if openerp.tools.config['test_enable']:
# return
# return super(TestRlock, self).__exit__(*l, **kw)
class RegistryManager(object):
""" Model registries manager.
@ -204,7 +228,7 @@ class RegistryManager(object):
# Mapping between db name and model registry.
# Accessed through the methods below.
registries = {}
registries_lock = threading.RLock()
registries_lock = TestRLock()
@classmethod
def get(cls, db_name, force_demo=False, status=None, update_module=False):

View File

@ -108,7 +108,7 @@ def check(f):
tries = 0
while True:
try:
if openerp.registry(dbname)._init:
if openerp.registry(dbname)._init and not openerp.tools.config['test_enable']:
raise openerp.exceptions.Warning('Currently, this database is not fully loaded and can not be used.')
return f(dbname, *args, **kwargs)
except OperationalError, e:

View File

@ -9,6 +9,7 @@ import os.path
import platform
import psutil
import random
import re
import resource
import select
import signal
@ -17,6 +18,7 @@ import subprocess
import sys
import threading
import time
import unittest2
import werkzeug.serving
@ -306,7 +308,7 @@ class ThreadedServer(CommonServer):
openerp.modules.registry.RegistryManager.delete_all()
logging.shutdown()
def run(self):
def run(self, preload=None, stop=False):
""" Start the http server and the cron thread then wait for a signal.
The first SIGINT or SIGTERM signal will initiate a graceful shutdown while
@ -314,6 +316,13 @@ class ThreadedServer(CommonServer):
"""
self.start()
preload_registries(preload)
if stop:
self.stop()
return
# Wait for a first signal to be handled. (time.sleep will be interrupted
# by the signal handler.) The try/except is for the win32 case.
try:
@ -362,7 +371,7 @@ class GeventServer(CommonServer):
self.httpd.stop()
gevent.shutdown()
def run(self):
def run(self, preload, stop):
self.start()
self.stop()
@ -569,8 +578,15 @@ class PreforkServer(CommonServer):
self.worker_kill(pid, signal.SIGTERM)
self.socket.close()
def run(self):
def run(self, preload, stop):
self.start()
preload_registries(preload)
if stop:
self.stop()
return
_logger.debug("Multiprocess starting")
while 1:
try:
@ -587,7 +603,7 @@ class PreforkServer(CommonServer):
except Exception,e:
_logger.exception(e)
self.stop(False)
sys.exit(-1)
return -1
class Worker(object):
""" Workers """
@ -809,7 +825,50 @@ def _reexec(updated_modules=None):
args.insert(0, exe)
os.execv(sys.executable, args)
def start():
def load_test_file_yml(test_file):
cr = registry.db.cursor()
openerp.tools.convert_yaml_import(cr, 'base', file(test_file), 'test', {}, 'test', True)
cr.rollback()
cr.close()
def load_test_file_py(test_file):
# Locate python module based on its filename and run the tests
test_path, _ = os.path.splitext(os.path.abspath(test_file))
for mod_name, mod_mod in sys.modules.items():
if mod_mod:
mod_path, _ = os.path.splitext(getattr(mod_mod, '__file__', ''))
if test_path == mod_path:
suite = unittest2.TestSuite()
for t in unittest2.TestLoader().loadTestsFromModule(mod_mod):
suite.addTest(t)
_logger.log(logging.INFO, 'running tests %s.', mod_mod.__name__)
result = unittest2.TextTestRunner(verbosity=2, stream=openerp.modules.module.TestStream()).run(suite)
if not result.wasSuccessful():
r = False
_logger.error('module %s: at least one error occurred in a test', module_name)
def preload_registries(dbnames):
""" Preload a registries, possibly run a test file."""
# TODO: move all config checks to args dont check tools.config here
config = openerp.tools.config
test_file = config['test_file']
dbnames = dbnames or []
for dbname in dbnames:
try:
update_module = config['init'] or config['update']
registry = openerp.modules.registry.RegistryManager.new(dbname, update_module=update_module)
# run test_file if provided
if test_file:
_logger.info('loading test file %s', test_file)
if test_file.endswith('yml'):
load_test_file_yml(test_file)
elif test_file.endswith('py'):
load_test_file_py(test_file)
except Exception:
_logger.exception('Failed to initialize database `%s`.', dbname)
return
def start(preload=None, stop=False):
""" Start the openerp http server and cron processor.
"""
global server
@ -825,7 +884,7 @@ def start():
autoreload = AutoReload(server)
autoreload.run()
server.run()
rc = server.run(preload, stop)
# like the legend of the phoenix, all ends with beginnings
if getattr(openerp, 'phoenix', False):
@ -833,7 +892,8 @@ def start():
if config['auto_reload']:
modules = autoreload.modules.keys()
_reexec(modules)
sys.exit(0)
return rc if rc else 0
def restart():
""" Restart the server

View File

@ -7,43 +7,9 @@ This module groups a few sub-modules containing unittest2 test cases.
Tests can be explicitely added to the `fast_suite` or `checks` lists or not.
See the :ref:`test-framework` section in the :ref:`features` list.
"""
import test_acl
import test_basecase
import test_db_cursor
import test_expression
import test_fields
import test_ir_filters
import test_ir_sequence
import test_mail
import test_misc
import test_orm
import test_osv
import test_translate
import test_view_validation
import test_qweb
import test_func
# This need a change in `oe run-tests` to only run fast_suite + checks by default.
# import test_xmlrpc
fast_suite = [
test_ir_sequence,
test_ir_filters
]
import common
from common import *
checks = [
test_acl,
test_expression,
test_mail,
test_db_cursor,
test_orm,
test_fields,
test_basecase,
test_view_validation,
test_misc,
test_osv,
test_translate,
test_qweb,
test_func,
]
# vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4:

View File

@ -3,15 +3,24 @@
The module :mod:`openerp.tests.common` provides a few helpers and classes to write
tests.
"""
import json
import os
import select
import subprocess
import threading
import time
import unittest2
import uuid
import xmlrpclib
import logging
import openerp
_logger = logging.getLogger(__name__)
# The openerp library is supposed already configured.
ADDONS_PATH = openerp.tools.config['addons_path']
HOST = '127.0.0.1'
PORT = openerp.tools.config['xmlrpc_port']
DB = openerp.tools.config['db_name']
@ -22,26 +31,11 @@ DB = openerp.tools.config['db_name']
if not DB and hasattr(threading.current_thread(), 'dbname'):
DB = threading.current_thread().dbname
HOST = '127.0.0.1'
ADMIN_USER = 'admin'
ADMIN_USER_ID = openerp.SUPERUSER_ID
ADMIN_PASSWORD = 'admin'
def start_openerp():
"""
Start the OpenERP server similary to the openerp-server script.
"""
openerp.service.start_services()
# Ugly way to ensure the server is listening.
time.sleep(2)
def stop_openerp():
"""
Shutdown the OpenERP server similarly to a single ctrl-c.
"""
openerp.service.stop_services()
HTTP_SESSION = {}
class BaseCase(unittest2.TestCase):
"""
@ -116,37 +110,106 @@ class SingleTransactionCase(BaseCase):
cls.cr.close()
class RpcCase(unittest2.TestCase):
"""
Subclass of TestCase with a few XML-RPC proxies.
class HttpCase(SingleTransactionCase):
""" Transactionnal HTTP TestCase with a phantomjs helper.
"""
def __init__(self, methodName='runTest'):
super(RpcCase, self).__init__(methodName)
class A(object):
pass
self.proxy = A()
# Use the old (pre 6.1) API.
self.proxy.url_60 = url_60 = 'http://%s:%d/xmlrpc/' % (HOST, PORT)
self.proxy.common_60 = xmlrpclib.ServerProxy(url_60 + 'common')
self.proxy.db_60 = xmlrpclib.ServerProxy(url_60 + 'db')
self.proxy.object_60 = xmlrpclib.ServerProxy(url_60 + 'object')
#self.proxy.edi_60 = xmlrpclib.ServerProxy(url_60 + 'edi')
# Use the new (8) API.
self.proxy.url_8 = url_8 = 'http://%s:%d/xmlrpc/2/' % (HOST, PORT)
self.proxy.common_8 = xmlrpclib.ServerProxy(url_8 + 'common')
self.proxy.db_8 = xmlrpclib.ServerProxy(url_8 + 'db')
self.proxy.object_8 = xmlrpclib.ServerProxy(url_8 + 'object')
super(HttpCase, self).__init__(methodName)
# v8 api with correct xmlrpc exception handling.
self.xmlrpc_url = url_8 = 'http://%s:%d/xmlrpc/2/' % (HOST, PORT)
self.xmlrpc_common = xmlrpclib.ServerProxy(url_8 + 'common')
self.xmlrpc_db = xmlrpclib.ServerProxy(url_8 + 'db')
self.xmlrpc_object = xmlrpclib.ServerProxy(url_8 + 'object')
@classmethod
def generate_database_name(cls):
if hasattr(cls, '_database_id'):
cls._database_id += 1
else:
cls._database_id = 0
return '_fresh_name_' + str(cls._database_id) + '_'
def setUpClass(cls):
super(HttpCase, cls).setUpClass()
cls.session_id = uuid.uuid4().hex
HTTP_SESSION[cls.session_id] = cls.cr
@classmethod
def tearDownClass(cls):
del HTTP_SESSION[cls.session_id]
super(HttpCase, cls).tearDownClass()
def phantomjs(self, jsfile, timeout=30, options=None):
""" Phantomjs Test protocol.
Use console.log in phantomjs to output test results evrey line must be
a one line JSON message using the following format:
- for a success: { "event": "success", "message": "Log message" }
- for an error: { "event": "error", "message": "Short error description" }
if a non json parsable line is received the helper will raise an
exception, the output buffer will be printed and phantom will be
killed
"""
self.timeout = timeout
self.options = {
'timeout' : timeout,
'port': PORT,
'db': DB,
'user': ADMIN_USER,
'password': ADMIN_PASSWORD,
'session_id': self.session_id,
}
if options:
self.options.update(options)
self.ignore_filters = [
# Ignore phantomjs warnings
"*** WARNING:",
# Fixes an issue with PhantomJS 1.9.2 on OS X 10.9 (Mavericks)
# cf. https://github.com/ariya/phantomjs/issues/11418
"CoreText performance note",
]
cmd = ['phantomjs', jsfile, json.dumps(self.options)]
_logger.info('executing %s', cmd)
try:
phantom = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
except OSError:
_logger.info("phantomjs not found, test %s skipped", jsfile)
try:
t0 = time.time()
buf = ''
while 1:
if time.time() > t0 + self.timeout:
raise Exception("Phantom js timeout (%ss)" % self.timeout)
ready, _, _ = select.select([phantom.stdout], [], [], 0.5)
if ready:
s = phantom.stdout.read(4096)
if s:
buf += s
else:
break
# process lines
if '\n' in buf:
line, buf = buf.split('\n', 1)
if line not in self.ignore_filters:
try:
line_json = json.loads(line)
if line_json.get('event') == 'success':
_logger.info(line_json.get('message','ok'))
continue
elif line_json.get('event') == 'error':
err = line_json.get('message','error')
_logger.info(err)
else:
err = line + buf
except ValueError:
err = line + buf
raise Exception(err)
finally:
# kill phantomjs if phantom.exit() wasn't called in the test
if phantom.poll() is None:
phantom.terminate()
# vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4:

View File

@ -1,82 +0,0 @@
# -*- coding: utf-8 -*-
# Run with one of these commands:
# > OPENERP_ADDONS_PATH='../../addons/trunk' OPENERP_PORT=8069 \
# OPENERP_DATABASE=yy PYTHONPATH=. python tests/test_xmlrpc.py
# > OPENERP_ADDONS_PATH='../../addons/trunk' OPENERP_PORT=8069 \
# OPENERP_DATABASE=yy nosetests tests/test_xmlrpc.py
# > OPENERP_ADDONS_PATH='../../../addons/trunk' OPENERP_PORT=8069 \
# OPENERP_DATABASE=yy PYTHONPATH=../:. unit2 test_xmlrpc
import time
import unittest2
import xmlrpclib
import common
DB = None
ADMIN_USER = common.ADMIN_USER
ADMIN_USER_ID = common.ADMIN_USER_ID
ADMIN_PASSWORD = common.ADMIN_PASSWORD
def setUpModule():
common.start_openerp()
global DB
DB = common.RpcCase.generate_database_name()
tearDownModule = common.stop_openerp
class test_xmlrpc(common.RpcCase):
def test_00_xmlrpc_create_database_polling(self):
"""
Simulate a OpenERP client requesting the creation of a database and
polling the server until the creation is complete.
"""
progress_id = self.proxy.db_60.create(ADMIN_PASSWORD,DB, True, False,
ADMIN_PASSWORD)
while True:
time.sleep(1)
progress, users = self.proxy.db_60.get_progress(ADMIN_PASSWORD,
progress_id)
if progress == 1.0:
break
def test_xmlrpc_login(self):
""" Try to login on the common service. """
uid = self.proxy.common_60.login(DB, ADMIN_USER, ADMIN_PASSWORD)
assert uid == ADMIN_USER_ID
def test_xmlrpc_ir_model_search(self):
""" Try a search on the object service. """
o = self.proxy.object_60
ids = o.execute(DB, ADMIN_USER_ID, ADMIN_PASSWORD, 'ir.model', 'search', [])
assert ids
ids = o.execute(DB, ADMIN_USER_ID, ADMIN_PASSWORD, 'ir.model', 'search', [], {})
assert ids
def test_xmlrpc_8_ir_model_search(self):
""" Try a search on the object service. """
o = self.proxy.object_8
ids = o.execute(DB, ADMIN_USER_ID, ADMIN_PASSWORD, 'ir.model', 'search', [])
assert ids
ids = o.execute(DB, ADMIN_USER_ID, ADMIN_PASSWORD, 'ir.model', 'search', [], {})
assert ids
# This test was written to test the creation of a new RPC endpoint, not
# really for the EDI itself.
#def test_xmlrpc_import_edi_document(self):
# """ Try to call an EDI method. """
# msg_re = 'EDI Document is empty!'
# with self.assertRaisesRegexp(Exception, msg_re):
# self.proxy.edi_60.import_edi_document(DB, ADMIN_USER_ID, ADMIN_PASSWORD, {})
def test_zz_xmlrpc_drop_database(self):
"""
Simulate a OpenERP client requesting the deletion of a database.
"""
assert self.proxy.db_60.drop(ADMIN_PASSWORD, DB) is True
if __name__ == '__main__':
unittest2.main()
# vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4:

View File

@ -157,7 +157,7 @@ class configmanager(object):
# Testing Group
group = optparse.OptionGroup(parser, "Testing Configuration")
group.add_option("--test-file", dest="test_file", my_default=False,
help="Launch a YML test file.")
help="Launch a python or YML test file.")
group.add_option("--test-report-directory", dest="test_report_directory", my_default=False,
help="If set, will save sample of all reports in this directory.")
group.add_option("--test-enable", action="store_true", dest="test_enable",

View File

@ -321,7 +321,6 @@ class YamlInterpreter(object):
view_info = model.fields_view_get(self.cr, SUPERUSER_ID, varg, 'form', context)
record_dict = self._create_record(model, fields, view_info, default=default)
_logger.debug("RECORD_DICT %s" % record_dict)
id = self.pool['ir.model.data']._update(self.cr, SUPERUSER_ID, record.model, \
self.module, record_dict, record.id, noupdate=self.isnoupdate(record), mode=self.mode, context=context)
self.id_map[record.id] = int(id)
@ -931,7 +930,7 @@ class YamlInterpreter(object):
def yaml_import(cr, module, yamlfile, kind, idref=None, mode='init', noupdate=False, report=None):
if idref is None:
idref = {}
loglevel = logging.INFO if kind == 'test' else logging.DEBUG
loglevel = logging.DEBUG
yaml_string = yamlfile.read()
yaml_interpreter = YamlInterpreter(cr, module, idref, mode, filename=yamlfile.name, report=report, noupdate=noupdate, loglevel=loglevel)
yaml_interpreter.process(yaml_string)

View File

@ -14,14 +14,13 @@ from . import initialize
from . import model
from . import module
from . import read
from . import run_tests
from . import scaffold
from . import uninstall
from . import update
from . import web
from . import grunt_tests
command_list_server = (conf, cron, drop, initialize, model, module, read, run_tests,
command_list_server = (conf, cron, drop, initialize, model, module, read,
scaffold, uninstall, update, web, grunt_tests, )
command_list_client = (Call, Open, Show, ConsumeNothing, ConsumeMemory,

View File

@ -22,6 +22,8 @@ def install_openerp(database_name, create_database_flag, module_names, install_d
# Install the import hook, to import openerp.addons.<module>.
openerp.modules.module.initialize_sys_path()
print
registry = openerp.modules.registry.RegistryManager.get(
database_name, update_module=True, force_demo=install_demo_data)
@ -88,6 +90,8 @@ def run(args):
args.addons = []
config['addons_path'] = ','.join(args.addons)
print "MY addons path is", config['addons_path']
if args.all_modules:
module_names = common.get_addons_from_paths(args.addons, args.exclude)
elif args.module:

View File

@ -1,214 +0,0 @@
"""
Execute the unittest2 tests available in OpenERP addons.
"""
import sys
import types
import argparse
import common
def get_test_modules(module, submodule, explode):
"""
Return a list of submodules containing tests.
`submodule` can be:
- None
- the name of a submodule
- '__fast_suite__'
- '__sanity_checks__'
"""
# Turn command-line module, submodule into importable names.
if module is None:
pass
elif module == 'openerp':
module = 'openerp.tests'
else:
module = 'openerp.addons.' + module + '.tests'
# Try to import the module
try:
__import__(module)
except Exception, e:
if explode:
print 'Can not `import %s`.' % module
import logging
logging.exception('')
sys.exit(1)
else:
if str(e) == 'No module named tests':
# It seems the module has no `tests` sub-module, no problem.
pass
else:
print 'Can not `import %s`.' % module
return []
# Discover available test sub-modules.
m = sys.modules[module]
submodule_names = sorted([x for x in dir(m) \
if x.startswith('test_') and \
isinstance(getattr(m, x), types.ModuleType)])
submodules = [getattr(m, x) for x in submodule_names]
def show_submodules_and_exit():
if submodule_names:
print 'Available submodules are:'
for x in submodule_names:
print ' ', x
sys.exit(1)
fast_suite = getattr(m, 'fast_suite', [])
checks = getattr(m, 'checks', [])
if submodule is None:
# Use auto-discovered sub-modules.
ms = submodules
elif submodule == '__fast_suite__':
# `suite` was used before the 6.1 release instead of `fast_suite`.
ms = fast_suite if hasattr(m, 'fast_suite') else getattr(m, 'suite', None)
if not ms:
if explode:
print 'The module `%s` has no defined test suite.' % (module,)
show_submodules_and_exit()
else:
ms = []
elif submodule == '__sanity_checks__':
ms = checks
if not ms:
if explode:
print 'The module `%s` has no defined sanity checks.' % (module,)
show_submodules_and_exit()
else:
ms = []
elif submodule == '__slow_suite__':
ms = list(set(submodules).difference(fast_suite, checks))
else:
# Pick the command-line-specified test sub-module.
m = getattr(m, submodule, None)
ms = [m]
if m is None:
if explode:
print 'The module `%s` has no submodule named `%s`.' % \
(module, submodule)
show_submodules_and_exit()
else:
ms = []
return ms
def run(args):
import unittest2
import openerp
config = openerp.tools.config
config.load()
config['db_name'] = args.database
if args.port:
config['xmlrpc_port'] = int(args.port)
if args.addons:
args.addons = args.addons.replace(':',',').split(',')
else:
args.addons = []
# ensure no duplication in addons paths
args.addons = list(set(args.addons))
config['addons_path'] = ','.join(args.addons)
import logging
openerp.netsvc.init_alternative_logger()
logging.getLogger('openerp').setLevel(logging.CRITICAL)
# Install the import hook, to import openerp.addons.<module>.
openerp.modules.module.initialize_sys_path()
module = args.module
submodule = args.submodule
# Import the necessary modules and get the corresponding suite.
if module is None:
# TODO
modules = common.get_addons_from_paths(args.addons, []) # TODO openerp.addons.base is not included ?
test_modules = []
for module in ['openerp'] + modules:
test_modules.extend(
get_test_modules(module, submodule, explode=False))
else:
test_modules = get_test_modules(module, submodule, explode=True)
print 'Test modules:'
for test_module in test_modules:
print ' ', test_module.__name__
print
sys.stdout.flush()
if not args.dry_run:
suite = unittest2.TestSuite()
for test_module in test_modules:
suite.addTests(unittest2.TestLoader().loadTestsFromModule(test_module))
r = unittest2.TextTestRunner(verbosity=2).run(suite)
if r.errors or r.failures:
sys.exit(1)
def add_parser(subparsers):
parser = subparsers.add_parser('run-tests',
description='Run the OpenERP server and/or addons tests.')
parser.add_argument('-d', '--database', metavar='DATABASE', required=True,
help='the database to test. Depending on the test suites, the '
'database must already exist or not.')
parser.add_argument('-p', '--port', metavar='PORT',
help='the port used for WML-RPC tests')
common.add_addons_argument(parser)
parser.add_argument(
'-m', '--module', metavar='MODULE', action=ModuleAction, default=None,
help="the module to test in `module[.submodule]` notation. "
"Use `openerp` for the core OpenERP tests. "
"Leave empty to run every declared tests. "
"Give a module but no submodule to run all the module's declared "
"tests. If both the module and the submodule are given, "
"the sub-module can be run even if it is not declared in the module.")
group = parser.add_mutually_exclusive_group()
group.add_argument(
'--fast-suite',
dest='submodule', action=GuardAction, nargs=0, const='__fast_suite__',
help='run only the tests explicitly declared in the fast suite (this '
'makes sense only with the bare `module` notation or no module at '
'all).')
group.add_argument(
'--sanity-checks',
dest='submodule', action=GuardAction, nargs=0, const='__sanity_checks__',
help='run only the sanity check tests')
group.add_argument(
'--slow-suite',
dest='submodule', action=GuardAction, nargs=0, const='__slow_suite__',
help="Only run slow tests (tests which are neither in the fast nor in"
" the sanity suite)")
parser.add_argument('--dry-run', action='store_true',
help='do not run the tests')
parser.set_defaults(run=run, submodule=None)
class ModuleAction(argparse.Action):
def __call__(self, parser, namespace, values, option_string=None):
split = values.split('.')
if len(split) == 1:
module, submodule = values, None
elif len(split) == 2:
module, submodule = split
else:
raise argparse.ArgumentError(
option_string,
"must have the form 'module[.submodule]', got '%s'" % values)
setattr(namespace, self.dest, module)
if submodule is not None:
setattr(namespace, 'submodule', submodule)
class GuardAction(argparse.Action):
def __call__(self, parser, namespace, values, option_string=None):
if getattr(namespace, self.dest, None):
print "%s value provided, ignoring '%s'" % (self.dest, option_string)
return
setattr(namespace, self.dest, self.const)