244 lines
8.4 KiB
Python
244 lines
8.4 KiB
Python
# Copyright (C) 2016 Intel Corporation
|
|
# Released under the MIT license (see COPYING.MIT)
|
|
|
|
import os
|
|
import sys
|
|
import json
|
|
import time
|
|
import logging
|
|
import collections
|
|
import re
|
|
|
|
from oeqa.core.loader import OETestLoader
|
|
from oeqa.core.runner import OETestRunner, OEStreamLogger, xmlEnabled
|
|
|
|
class OETestContext(object):
|
|
loaderClass = OETestLoader
|
|
runnerClass = OETestRunner
|
|
streamLoggerClass = OEStreamLogger
|
|
|
|
files_dir = os.path.abspath(os.path.join(os.path.dirname(
|
|
os.path.abspath(__file__)), "../files"))
|
|
|
|
def __init__(self, td=None, logger=None):
|
|
if not type(td) is dict:
|
|
raise TypeError("td isn't dictionary type")
|
|
|
|
self.td = td
|
|
self.logger = logger
|
|
self._registry = {}
|
|
self._registry['cases'] = collections.OrderedDict()
|
|
self._results = {}
|
|
|
|
def _read_modules_from_manifest(self, manifest):
|
|
if not os.path.exists(manifest):
|
|
raise
|
|
|
|
modules = []
|
|
for line in open(manifest).readlines():
|
|
line = line.strip()
|
|
if line and not line.startswith("#"):
|
|
modules.append(line)
|
|
|
|
return modules
|
|
|
|
def loadTests(self, module_paths, modules=[], tests=[],
|
|
modules_manifest="", modules_required=[], filters={}):
|
|
if modules_manifest:
|
|
modules = self._read_modules_from_manifest(modules_manifest)
|
|
|
|
self.loader = self.loaderClass(self, module_paths, modules, tests,
|
|
modules_required, filters)
|
|
self.suites = self.loader.discover()
|
|
|
|
def runTests(self):
|
|
streamLogger = self.streamLoggerClass(self.logger)
|
|
self.runner = self.runnerClass(self, stream=streamLogger, verbosity=2)
|
|
|
|
self._run_start_time = time.time()
|
|
result = self.runner.run(self.suites)
|
|
self._run_end_time = time.time()
|
|
|
|
return result
|
|
|
|
def logSummary(self, result, component, context_msg=''):
|
|
self.logger.info("SUMMARY:")
|
|
self.logger.info("%s (%s) - Ran %d test%s in %.3fs" % (component,
|
|
context_msg, result.testsRun, result.testsRun != 1 and "s" or "",
|
|
(self._run_end_time - self._run_start_time)))
|
|
|
|
if result.wasSuccessful():
|
|
msg = "%s - OK - All required tests passed" % component
|
|
else:
|
|
msg = "%s - FAIL - Required tests failed" % component
|
|
skipped = len(self._results['skipped'])
|
|
if skipped:
|
|
msg += " (skipped=%d)" % skipped
|
|
self.logger.info(msg)
|
|
|
|
def _getDetailsNotPassed(self, case, type, desc):
|
|
found = False
|
|
|
|
for (scase, msg) in self._results[type]:
|
|
# XXX: When XML reporting is enabled scase is
|
|
# xmlrunner.result._TestInfo instance instead of
|
|
# string.
|
|
if xmlEnabled:
|
|
if case.id() == scase.test_id:
|
|
found = True
|
|
break
|
|
scase_str = scase.test_id
|
|
else:
|
|
if case == scase:
|
|
found = True
|
|
break
|
|
scase_str = str(scase)
|
|
|
|
# When fails at module or class level the class name is passed as string
|
|
# so figure out to see if match
|
|
m = re.search("^setUpModule \((?P<module_name>.*)\)$", scase_str)
|
|
if m:
|
|
if case.__class__.__module__ == m.group('module_name'):
|
|
found = True
|
|
break
|
|
|
|
m = re.search("^setUpClass \((?P<class_name>.*)\)$", scase_str)
|
|
if m:
|
|
class_name = "%s.%s" % (case.__class__.__module__,
|
|
case.__class__.__name__)
|
|
|
|
if class_name == m.group('class_name'):
|
|
found = True
|
|
break
|
|
|
|
if found:
|
|
return (found, msg)
|
|
|
|
return (found, None)
|
|
|
|
def logDetails(self):
|
|
self.logger.info("RESULTS:")
|
|
for case_name in self._registry['cases']:
|
|
case = self._registry['cases'][case_name]
|
|
|
|
result_types = ['failures', 'errors', 'skipped', 'expectedFailures']
|
|
result_desc = ['FAILED', 'ERROR', 'SKIPPED', 'EXPECTEDFAIL']
|
|
|
|
fail = False
|
|
desc = None
|
|
for idx, name in enumerate(result_types):
|
|
(fail, msg) = self._getDetailsNotPassed(case, result_types[idx],
|
|
result_desc[idx])
|
|
if fail:
|
|
desc = result_desc[idx]
|
|
break
|
|
|
|
oeid = -1
|
|
for d in case.decorators:
|
|
if hasattr(d, 'oeid'):
|
|
oeid = d.oeid
|
|
|
|
if fail:
|
|
self.logger.info("RESULTS - %s - Testcase %s: %s" % (case.id(),
|
|
oeid, desc))
|
|
if msg:
|
|
self.logger.info(msg)
|
|
else:
|
|
self.logger.info("RESULTS - %s - Testcase %s: %s" % (case.id(),
|
|
oeid, 'PASSED'))
|
|
|
|
class OETestContextExecutor(object):
|
|
_context_class = OETestContext
|
|
|
|
name = 'core'
|
|
help = 'core test component example'
|
|
description = 'executes core test suite example'
|
|
|
|
default_cases = [os.path.join(os.path.abspath(os.path.dirname(__file__)),
|
|
'cases/example')]
|
|
default_test_data = os.path.join(default_cases[0], 'data.json')
|
|
default_tests = None
|
|
|
|
def register_commands(self, logger, subparsers):
|
|
self.parser = subparsers.add_parser(self.name, help=self.help,
|
|
description=self.description, group='components')
|
|
|
|
self.default_output_log = '%s-results-%s.log' % (self.name,
|
|
time.strftime("%Y%m%d%H%M%S"))
|
|
self.parser.add_argument('--output-log', action='store',
|
|
default=self.default_output_log,
|
|
help="results output log, default: %s" % self.default_output_log)
|
|
self.parser.add_argument('--run-tests', action='store',
|
|
default=self.default_tests,
|
|
help="tests to run in <module>[.<class>[.<name>]] format. Just works for modules now")
|
|
|
|
if self.default_test_data:
|
|
self.parser.add_argument('--test-data-file', action='store',
|
|
default=self.default_test_data,
|
|
help="data file to load, default: %s" % self.default_test_data)
|
|
else:
|
|
self.parser.add_argument('--test-data-file', action='store',
|
|
help="data file to load")
|
|
|
|
if self.default_cases:
|
|
self.parser.add_argument('CASES_PATHS', action='store',
|
|
default=self.default_cases, nargs='*',
|
|
help="paths to directories with test cases, default: %s"\
|
|
% self.default_cases)
|
|
else:
|
|
self.parser.add_argument('CASES_PATHS', action='store',
|
|
nargs='+', help="paths to directories with test cases")
|
|
|
|
self.parser.set_defaults(func=self.run)
|
|
|
|
def _setup_logger(self, logger, args):
|
|
formatter = logging.Formatter('%(asctime)s - ' + self.name + \
|
|
' - %(levelname)s - %(message)s')
|
|
sh = logger.handlers[0]
|
|
sh.setFormatter(formatter)
|
|
fh = logging.FileHandler(args.output_log)
|
|
fh.setFormatter(formatter)
|
|
logger.addHandler(fh)
|
|
|
|
return logger
|
|
|
|
def _process_args(self, logger, args):
|
|
self.tc_kwargs = {}
|
|
self.tc_kwargs['init'] = {}
|
|
self.tc_kwargs['load'] = {}
|
|
self.tc_kwargs['run'] = {}
|
|
|
|
self.tc_kwargs['init']['logger'] = self._setup_logger(logger, args)
|
|
if args.test_data_file:
|
|
self.tc_kwargs['init']['td'] = json.load(
|
|
open(args.test_data_file, "r"))
|
|
else:
|
|
self.tc_kwargs['init']['td'] = {}
|
|
|
|
|
|
if args.run_tests:
|
|
self.tc_kwargs['load']['modules'] = args.run_tests.split()
|
|
else:
|
|
self.tc_kwargs['load']['modules'] = None
|
|
|
|
self.module_paths = args.CASES_PATHS
|
|
|
|
def run(self, logger, args):
|
|
self._process_args(logger, args)
|
|
|
|
self.tc = self._context_class(**self.tc_kwargs['init'])
|
|
self.tc.loadTests(self.module_paths, **self.tc_kwargs['load'])
|
|
rc = self.tc.runTests(**self.tc_kwargs['run'])
|
|
self.tc.logSummary(rc, self.name)
|
|
self.tc.logDetails()
|
|
|
|
output_link = os.path.join(os.path.dirname(args.output_log),
|
|
"%s-results.log" % self.name)
|
|
if os.path.exists(output_link):
|
|
os.remove(output_link)
|
|
os.symlink(args.output_log, output_link)
|
|
|
|
return rc
|
|
|
|
_executor_class = OETestContextExecutor
|