diff options
author | Angelos Mouzakitis <a.mouzakitis@virtualopensystems.com> | 2023-10-10 14:33:42 +0000 |
---|---|---|
committer | Angelos Mouzakitis <a.mouzakitis@virtualopensystems.com> | 2023-10-10 14:33:42 +0000 |
commit | af1a266670d040d2f4083ff309d732d648afba2a (patch) | |
tree | 2fc46203448ddcc6f81546d379abfaeb323575e9 /meson/mesonbuild/mtest.py | |
parent | e02cda008591317b1625707ff8e115a4841aa889 (diff) |
Change-Id: Iaf8d18082d3991dec7c0ebbea540f092188eb4ec
Diffstat (limited to 'meson/mesonbuild/mtest.py')
-rw-r--r-- | meson/mesonbuild/mtest.py | 2011 |
1 files changed, 2011 insertions, 0 deletions
diff --git a/meson/mesonbuild/mtest.py b/meson/mesonbuild/mtest.py new file mode 100644 index 000000000..a44f7f291 --- /dev/null +++ b/meson/mesonbuild/mtest.py @@ -0,0 +1,2011 @@ +# Copyright 2016-2017 The Meson development team + +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at + +# http://www.apache.org/licenses/LICENSE-2.0 + +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# A tool to run tests in many different ways. + +from pathlib import Path +from collections import deque +from copy import deepcopy +import argparse +import asyncio +import datetime +import enum +import json +import multiprocessing +import os +import pickle +import platform +import random +import re +import signal +import subprocess +import shlex +import sys +import textwrap +import time +import typing as T +import unicodedata +import xml.etree.ElementTree as et + +from . import build +from . import environment +from . import mlog +from .coredata import major_versions_differ, MesonVersionMismatchException +from .coredata import version as coredata_version +from .mesonlib import (MesonException, OrderedSet, RealPathAction, + get_wine_shortpath, join_args, split_args) +from .mintro import get_infodir, load_info_file +from .programs import ExternalProgram +from .backend.backends import TestProtocol, TestSerialisation + +# GNU autotools interprets a return code of 77 from tests it executes to +# mean that the test should be skipped. +GNU_SKIP_RETURNCODE = 77 + +# GNU autotools interprets a return code of 99 from tests it executes to +# mean that the test failed even before testing what it is supposed to test. +GNU_ERROR_RETURNCODE = 99 + +# Exit if 3 Ctrl-C's are received within one second +MAX_CTRLC = 3 + +def is_windows() -> bool: + platname = platform.system().lower() + return platname == 'windows' + +def is_cygwin() -> bool: + return sys.platform == 'cygwin' + +UNIWIDTH_MAPPING = {'F': 2, 'H': 1, 'W': 2, 'Na': 1, 'N': 1, 'A': 1} +def uniwidth(s: str) -> int: + result = 0 + for c in s: + w = unicodedata.east_asian_width(c) + result += UNIWIDTH_MAPPING[w] + return result + +def determine_worker_count() -> int: + varname = 'MESON_TESTTHREADS' + if varname in os.environ: + try: + num_workers = int(os.environ[varname]) + except ValueError: + print(f'Invalid value in {varname}, using 1 thread.') + num_workers = 1 + else: + try: + # Fails in some weird environments such as Debian + # reproducible build. + num_workers = multiprocessing.cpu_count() + except Exception: + num_workers = 1 + return num_workers + +def add_arguments(parser: argparse.ArgumentParser) -> None: + parser.add_argument('--repeat', default=1, dest='repeat', type=int, + help='Number of times to run the tests.') + parser.add_argument('--no-rebuild', default=False, action='store_true', + help='Do not rebuild before running tests.') + parser.add_argument('--gdb', default=False, dest='gdb', action='store_true', + help='Run test under gdb.') + parser.add_argument('--gdb-path', default='gdb', dest='gdb_path', + help='Path to the gdb binary (default: gdb).') + parser.add_argument('--list', default=False, dest='list', action='store_true', + help='List available tests.') + parser.add_argument('--wrapper', default=None, dest='wrapper', type=split_args, + help='wrapper to run tests with (e.g. Valgrind)') + parser.add_argument('-C', dest='wd', action=RealPathAction, + # https://github.com/python/typeshed/issues/3107 + # https://github.com/python/mypy/issues/7177 + type=os.path.abspath, # type: ignore + help='directory to cd into before running') + parser.add_argument('--suite', default=[], dest='include_suites', action='append', metavar='SUITE', + help='Only run tests belonging to the given suite.') + parser.add_argument('--no-suite', default=[], dest='exclude_suites', action='append', metavar='SUITE', + help='Do not run tests belonging to the given suite.') + parser.add_argument('--no-stdsplit', default=True, dest='split', action='store_false', + help='Do not split stderr and stdout in test logs.') + parser.add_argument('--print-errorlogs', default=False, action='store_true', + help="Whether to print failing tests' logs.") + parser.add_argument('--benchmark', default=False, action='store_true', + help="Run benchmarks instead of tests.") + parser.add_argument('--logbase', default='testlog', + help="Base name for log file.") + parser.add_argument('--num-processes', default=determine_worker_count(), type=int, + help='How many parallel processes to use.') + parser.add_argument('-v', '--verbose', default=False, action='store_true', + help='Do not redirect stdout and stderr') + parser.add_argument('-q', '--quiet', default=False, action='store_true', + help='Produce less output to the terminal.') + parser.add_argument('-t', '--timeout-multiplier', type=float, default=None, + help='Define a multiplier for test timeout, for example ' + ' when running tests in particular conditions they might take' + ' more time to execute. (<= 0 to disable timeout)') + parser.add_argument('--setup', default=None, dest='setup', + help='Which test setup to use.') + parser.add_argument('--test-args', default=[], type=split_args, + help='Arguments to pass to the specified test(s) or all tests') + parser.add_argument('args', nargs='*', + help='Optional list of test names to run. "testname" to run all tests with that name, ' + '"subprojname:testname" to specifically run "testname" from "subprojname", ' + '"subprojname:" to run all tests defined by "subprojname".') + + +def print_safe(s: str) -> None: + end = '' if s[-1] == '\n' else '\n' + try: + print(s, end=end) + except UnicodeEncodeError: + s = s.encode('ascii', errors='backslashreplace').decode('ascii') + print(s, end=end) + +def join_lines(a: str, b: str) -> str: + if not a: + return b + if not b: + return a + return a + '\n' + b + +def dashes(s: str, dash: str, cols: int) -> str: + if not s: + return dash * cols + s = ' ' + s + ' ' + width = uniwidth(s) + first = (cols - width) // 2 + s = dash * first + s + return s + dash * (cols - first - width) + +def returncode_to_status(retcode: int) -> str: + # Note: We can't use `os.WIFSIGNALED(result.returncode)` and the related + # functions here because the status returned by subprocess is munged. It + # returns a negative value if the process was killed by a signal rather than + # the raw status returned by `wait()`. Also, If a shell sits between Meson + # the the actual unit test that shell is likely to convert a termination due + # to a signal into an exit status of 128 plus the signal number. + if retcode < 0: + signum = -retcode + try: + signame = signal.Signals(signum).name + except ValueError: + signame = 'SIGinvalid' + return f'killed by signal {signum} {signame}' + + if retcode <= 128: + return f'exit status {retcode}' + + signum = retcode - 128 + try: + signame = signal.Signals(signum).name + except ValueError: + signame = 'SIGinvalid' + return f'(exit status {retcode} or signal {signum} {signame})' + +# TODO for Windows +sh_quote: T.Callable[[str], str] = lambda x: x +if not is_windows(): + sh_quote = shlex.quote + +def env_tuple_to_str(env: T.Iterable[T.Tuple[str, str]]) -> str: + return ''.join(["{}={} ".format(k, sh_quote(v)) for k, v in env]) + + +class TestException(MesonException): + pass + + +@enum.unique +class ConsoleUser(enum.Enum): + + # the logger can use the console + LOGGER = 0 + + # the console is used by gdb + GDB = 1 + + # the console is used to write stdout/stderr + STDOUT = 2 + + +@enum.unique +class TestResult(enum.Enum): + + PENDING = 'PENDING' + RUNNING = 'RUNNING' + OK = 'OK' + TIMEOUT = 'TIMEOUT' + INTERRUPT = 'INTERRUPT' + SKIP = 'SKIP' + FAIL = 'FAIL' + EXPECTEDFAIL = 'EXPECTEDFAIL' + UNEXPECTEDPASS = 'UNEXPECTEDPASS' + ERROR = 'ERROR' + + @staticmethod + def maxlen() -> int: + return 14 # len(UNEXPECTEDPASS) + + def is_ok(self) -> bool: + return self in {TestResult.OK, TestResult.EXPECTEDFAIL} + + def is_bad(self) -> bool: + return self in {TestResult.FAIL, TestResult.TIMEOUT, TestResult.INTERRUPT, + TestResult.UNEXPECTEDPASS, TestResult.ERROR} + + def is_finished(self) -> bool: + return self not in {TestResult.PENDING, TestResult.RUNNING} + + def was_killed(self) -> bool: + return self in (TestResult.TIMEOUT, TestResult.INTERRUPT) + + def colorize(self, s: str) -> mlog.AnsiDecorator: + if self.is_bad(): + decorator = mlog.red + elif self in (TestResult.SKIP, TestResult.EXPECTEDFAIL): + decorator = mlog.yellow + elif self.is_finished(): + decorator = mlog.green + else: + decorator = mlog.blue + return decorator(s) + + def get_text(self, colorize: bool) -> str: + result_str = '{res:{reslen}}'.format(res=self.value, reslen=self.maxlen()) + return self.colorize(result_str).get_text(colorize) + + def get_command_marker(self) -> str: + return str(self.colorize('>>> ')) + + +TYPE_TAPResult = T.Union['TAPParser.Test', 'TAPParser.Error', 'TAPParser.Version', 'TAPParser.Plan', 'TAPParser.Bailout'] + +class TAPParser: + class Plan(T.NamedTuple): + num_tests: int + late: bool + skipped: bool + explanation: T.Optional[str] + + class Bailout(T.NamedTuple): + message: str + + class Test(T.NamedTuple): + number: int + name: str + result: TestResult + explanation: T.Optional[str] + + def __str__(self) -> str: + return f'{self.number} {self.name}'.strip() + + class Error(T.NamedTuple): + message: str + + class Version(T.NamedTuple): + version: int + + _MAIN = 1 + _AFTER_TEST = 2 + _YAML = 3 + + _RE_BAILOUT = re.compile(r'Bail out!\s*(.*)') + _RE_DIRECTIVE = re.compile(r'(?:\s*\#\s*([Ss][Kk][Ii][Pp]\S*|[Tt][Oo][Dd][Oo])\b\s*(.*))?') + _RE_PLAN = re.compile(r'1\.\.([0-9]+)' + _RE_DIRECTIVE.pattern) + _RE_TEST = re.compile(r'((?:not )?ok)\s*(?:([0-9]+)\s*)?([^#]*)' + _RE_DIRECTIVE.pattern) + _RE_VERSION = re.compile(r'TAP version ([0-9]+)') + _RE_YAML_START = re.compile(r'(\s+)---.*') + _RE_YAML_END = re.compile(r'\s+\.\.\.\s*') + + found_late_test = False + bailed_out = False + plan: T.Optional[Plan] = None + lineno = 0 + num_tests = 0 + yaml_lineno: T.Optional[int] = None + yaml_indent = '' + state = _MAIN + version = 12 + + def parse_test(self, ok: bool, num: int, name: str, directive: T.Optional[str], explanation: T.Optional[str]) -> \ + T.Generator[T.Union['TAPParser.Test', 'TAPParser.Error'], None, None]: + name = name.strip() + explanation = explanation.strip() if explanation else None + if directive is not None: + directive = directive.upper() + if directive.startswith('SKIP'): + if ok: + yield self.Test(num, name, TestResult.SKIP, explanation) + return + elif directive == 'TODO': + yield self.Test(num, name, TestResult.UNEXPECTEDPASS if ok else TestResult.EXPECTEDFAIL, explanation) + return + else: + yield self.Error(f'invalid directive "{directive}"') + + yield self.Test(num, name, TestResult.OK if ok else TestResult.FAIL, explanation) + + async def parse_async(self, lines: T.AsyncIterator[str]) -> T.AsyncIterator[TYPE_TAPResult]: + async for line in lines: + for event in self.parse_line(line): + yield event + for event in self.parse_line(None): + yield event + + def parse(self, io: T.Iterator[str]) -> T.Iterator[TYPE_TAPResult]: + for line in io: + yield from self.parse_line(line) + yield from self.parse_line(None) + + def parse_line(self, line: T.Optional[str]) -> T.Iterator[TYPE_TAPResult]: + if line is not None: + self.lineno += 1 + line = line.rstrip() + + # YAML blocks are only accepted after a test + if self.state == self._AFTER_TEST: + if self.version >= 13: + m = self._RE_YAML_START.match(line) + if m: + self.state = self._YAML + self.yaml_lineno = self.lineno + self.yaml_indent = m.group(1) + return + self.state = self._MAIN + + elif self.state == self._YAML: + if self._RE_YAML_END.match(line): + self.state = self._MAIN + return + if line.startswith(self.yaml_indent): + return + yield self.Error(f'YAML block not terminated (started on line {self.yaml_lineno})') + self.state = self._MAIN + + assert self.state == self._MAIN + if line.startswith('#'): + return + + m = self._RE_TEST.match(line) + if m: + if self.plan and self.plan.late and not self.found_late_test: + yield self.Error('unexpected test after late plan') + self.found_late_test = True + self.num_tests += 1 + num = self.num_tests if m.group(2) is None else int(m.group(2)) + if num != self.num_tests: + yield self.Error('out of order test numbers') + yield from self.parse_test(m.group(1) == 'ok', num, + m.group(3), m.group(4), m.group(5)) + self.state = self._AFTER_TEST + return + + m = self._RE_PLAN.match(line) + if m: + if self.plan: + yield self.Error('more than one plan found') + else: + num_tests = int(m.group(1)) + skipped = (num_tests == 0) + if m.group(2): + if m.group(2).upper().startswith('SKIP'): + if num_tests > 0: + yield self.Error('invalid SKIP directive for plan') + skipped = True + else: + yield self.Error('invalid directive for plan') + self.plan = self.Plan(num_tests=num_tests, late=(self.num_tests > 0), + skipped=skipped, explanation=m.group(3)) + yield self.plan + return + + m = self._RE_BAILOUT.match(line) + if m: + yield self.Bailout(m.group(1)) + self.bailed_out = True + return + + m = self._RE_VERSION.match(line) + if m: + # The TAP version is only accepted as the first line + if self.lineno != 1: + yield self.Error('version number must be on the first line') + return + self.version = int(m.group(1)) + if self.version < 13: + yield self.Error('version number should be at least 13') + else: + yield self.Version(version=self.version) + return + + if not line: + return + + yield self.Error('unexpected input at line {}'.format((self.lineno,))) + else: + # end of file + if self.state == self._YAML: + yield self.Error(f'YAML block not terminated (started on line {self.yaml_lineno})') + + if not self.bailed_out and self.plan and self.num_tests != self.plan.num_tests: + if self.num_tests < self.plan.num_tests: + yield self.Error(f'Too few tests run (expected {self.plan.num_tests}, got {self.num_tests})') + else: + yield self.Error(f'Too many tests run (expected {self.plan.num_tests}, got {self.num_tests})') + +class TestLogger: + def flush(self) -> None: + pass + + def start(self, harness: 'TestHarness') -> None: + pass + + def start_test(self, harness: 'TestHarness', test: 'TestRun') -> None: + pass + + def log_subtest(self, harness: 'TestHarness', test: 'TestRun', s: str, res: TestResult) -> None: + pass + + def log(self, harness: 'TestHarness', result: 'TestRun') -> None: + pass + + async def finish(self, harness: 'TestHarness') -> None: + pass + + def close(self) -> None: + pass + + +class TestFileLogger(TestLogger): + def __init__(self, filename: str, errors: str = 'replace') -> None: + self.filename = filename + self.file = open(filename, 'w', encoding='utf-8', errors=errors) + + def close(self) -> None: + if self.file: + self.file.close() + self.file = None + + +class ConsoleLogger(TestLogger): + SPINNER = "\U0001f311\U0001f312\U0001f313\U0001f314" + \ + "\U0001f315\U0001f316\U0001f317\U0001f318" + + SCISSORS = "\u2700 " + HLINE = "\u2015" + RTRI = "\u25B6 " + + def __init__(self) -> None: + self.update = asyncio.Event() + self.running_tests = OrderedSet() # type: OrderedSet['TestRun'] + self.progress_test = None # type: T.Optional['TestRun'] + self.progress_task = None # type: T.Optional[asyncio.Future] + self.max_left_width = 0 # type: int + self.stop = False + self.update = asyncio.Event() + self.should_erase_line = '' + self.test_count = 0 + self.started_tests = 0 + self.spinner_index = 0 + try: + self.cols, _ = os.get_terminal_size(1) + self.is_tty = True + except OSError: + self.cols = 80 + self.is_tty = False + + self.output_start = dashes(self.SCISSORS, self.HLINE, self.cols - 2) + self.output_end = dashes('', self.HLINE, self.cols - 2) + self.sub = self.RTRI + try: + self.output_start.encode(sys.stdout.encoding or 'ascii') + except UnicodeEncodeError: + self.output_start = dashes('8<', '-', self.cols - 2) + self.output_end = dashes('', '-', self.cols - 2) + self.sub = '| ' + + def flush(self) -> None: + if self.should_erase_line: + print(self.should_erase_line, end='') + self.should_erase_line = '' + + def print_progress(self, line: str) -> None: + print(self.should_erase_line, line, sep='', end='\r') + self.should_erase_line = '\x1b[K' + + def request_update(self) -> None: + self.update.set() + + def emit_progress(self, harness: 'TestHarness') -> None: + if self.progress_test is None: + self.flush() + return + + if len(self.running_tests) == 1: + count = f'{self.started_tests}/{self.test_count}' + else: + count = '{}-{}/{}'.format(self.started_tests - len(self.running_tests) + 1, + self.started_tests, self.test_count) + + left = '[{}] {} '.format(count, self.SPINNER[self.spinner_index]) + self.spinner_index = (self.spinner_index + 1) % len(self.SPINNER) + + right = '{spaces} {dur:{durlen}}'.format( + spaces=' ' * TestResult.maxlen(), + dur=int(time.time() - self.progress_test.starttime), + durlen=harness.duration_max_len) + if self.progress_test.timeout: + right += '/{timeout:{durlen}}'.format( + timeout=self.progress_test.timeout, + durlen=harness.duration_max_len) + right += 's' + detail = self.progress_test.detail + if detail: + right += ' ' + detail + + line = harness.format(self.progress_test, colorize=True, + max_left_width=self.max_left_width, + left=left, right=right) + self.print_progress(line) + + def start(self, harness: 'TestHarness') -> None: + async def report_progress() -> None: + loop = asyncio.get_event_loop() + next_update = 0.0 + self.request_update() + while not self.stop: + await self.update.wait() + self.update.clear() + + # We may get here simply because the progress line has been + # overwritten, so do not always switch. Only do so every + # second, or if the printed test has finished + if loop.time() >= next_update: + self.progress_test = None + next_update = loop.time() + 1 + loop.call_at(next_update, self.request_update) + + if (self.progress_test and + self.progress_test.res is not TestResult.RUNNING): + self.progress_test = None + + if not self.progress_test: + if not self.running_tests: + continue + # Pick a test in round robin order + self.progress_test = self.running_tests.pop(last=False) + self.running_tests.add(self.progress_test) + + self.emit_progress(harness) + self.flush() + + self.test_count = harness.test_count + self.cols = max(self.cols, harness.max_left_width + 30) + + if self.is_tty and not harness.need_console: + # Account for "[aa-bb/cc] OO " in the progress report + self.max_left_width = 3 * len(str(self.test_count)) + 8 + self.progress_task = asyncio.ensure_future(report_progress()) + + def start_test(self, harness: 'TestHarness', test: 'TestRun') -> None: + if harness.options.verbose and test.cmdline: + self.flush() + print(harness.format(test, mlog.colorize_console(), + max_left_width=self.max_left_width, + right=test.res.get_text(mlog.colorize_console()))) + print(test.res.get_command_marker() + test.cmdline) + if test.needs_parsing: + pass + elif not test.is_parallel: + print(self.output_start, flush=True) + else: + print(flush=True) + + self.started_tests += 1 + self.running_tests.add(test) + self.running_tests.move_to_end(test, last=False) + self.request_update() + + def shorten_log(self, harness: 'TestHarness', result: 'TestRun') -> str: + if not harness.options.verbose and not harness.options.print_errorlogs: + return '' + + log = result.get_log(mlog.colorize_console(), + stderr_only=result.needs_parsing) + if harness.options.verbose: + return log + + lines = log.splitlines() + if len(lines) < 100: + return log + else: + return str(mlog.bold('Listing only the last 100 lines from a long log.\n')) + '\n'.join(lines[-100:]) + + def print_log(self, harness: 'TestHarness', result: 'TestRun') -> None: + if not harness.options.verbose: + cmdline = result.cmdline + if not cmdline: + print(result.res.get_command_marker() + result.stdo) + return + print(result.res.get_command_marker() + cmdline) + + log = self.shorten_log(harness, result) + if log: + print(self.output_start) + print_safe(log) + print(self.output_end) + + def log_subtest(self, harness: 'TestHarness', test: 'TestRun', s: str, result: TestResult) -> None: + if harness.options.verbose or (harness.options.print_errorlogs and result.is_bad()): + self.flush() + print(harness.format(test, mlog.colorize_console(), max_left_width=self.max_left_width, + prefix=self.sub, + middle=s, + right=result.get_text(mlog.colorize_console())), flush=True) + + self.request_update() + + def log(self, harness: 'TestHarness', result: 'TestRun') -> None: + self.running_tests.remove(result) + if result.res is TestResult.TIMEOUT and harness.options.verbose: + self.flush() + print(f'{result.name} time out (After {result.timeout} seconds)') + + if not harness.options.quiet or not result.res.is_ok(): + self.flush() + if harness.options.verbose and not result.is_parallel and result.cmdline: + if not result.needs_parsing: + print(self.output_end) + print(harness.format(result, mlog.colorize_console(), max_left_width=self.max_left_width)) + else: + print(harness.format(result, mlog.colorize_console(), max_left_width=self.max_left_width), + flush=True) + if harness.options.verbose or result.res.is_bad(): + self.print_log(harness, result) + if harness.options.verbose or result.res.is_bad(): + print(flush=True) + + self.request_update() + + async def finish(self, harness: 'TestHarness') -> None: + self.stop = True + self.request_update() + if self.progress_task: + await self.progress_task + + if harness.collected_failures and \ + (harness.options.print_errorlogs or harness.options.verbose): + print("\nSummary of Failures:\n") + for i, result in enumerate(harness.collected_failures, 1): + print(harness.format(result, mlog.colorize_console())) + + print(harness.summary()) + + +class TextLogfileBuilder(TestFileLogger): + def start(self, harness: 'TestHarness') -> None: + self.file.write(f'Log of Meson test suite run on {datetime.datetime.now().isoformat()}\n\n') + inherit_env = env_tuple_to_str(os.environ.items()) + self.file.write(f'Inherited environment: {inherit_env}\n\n') + + def log(self, harness: 'TestHarness', result: 'TestRun') -> None: + self.file.write(harness.format(result, False) + '\n') + cmdline = result.cmdline + if cmdline: + starttime_str = time.strftime("%H:%M:%S", time.gmtime(result.starttime)) + self.file.write(starttime_str + ' ' + cmdline + '\n') + self.file.write(dashes('output', '-', 78) + '\n') + self.file.write(result.get_log()) + self.file.write(dashes('', '-', 78) + '\n\n') + + async def finish(self, harness: 'TestHarness') -> None: + if harness.collected_failures: + self.file.write("\nSummary of Failures:\n\n") + for i, result in enumerate(harness.collected_failures, 1): + self.file.write(harness.format(result, False) + '\n') + self.file.write(harness.summary()) + + print(f'Full log written to {self.filename}') + + +class JsonLogfileBuilder(TestFileLogger): + def log(self, harness: 'TestHarness', result: 'TestRun') -> None: + jresult = {'name': result.name, + 'stdout': result.stdo, + 'result': result.res.value, + 'starttime': result.starttime, + 'duration': result.duration, + 'returncode': result.returncode, + 'env': result.env, + 'command': result.cmd} # type: T.Dict[str, T.Any] + if result.stde: + jresult['stderr'] = result.stde + self.file.write(json.dumps(jresult) + '\n') + + +class JunitBuilder(TestLogger): + + """Builder for Junit test results. + + Junit is impossible to stream out, it requires attributes counting the + total number of tests, failures, skips, and errors in the root element + and in each test suite. As such, we use a builder class to track each + test case, and calculate all metadata before writing it out. + + For tests with multiple results (like from a TAP test), we record the + test as a suite with the project_name.test_name. This allows us to track + each result separately. For tests with only one result (such as exit-code + tests) we record each one into a suite with the name project_name. The use + of the project_name allows us to sort subproject tests separately from + the root project. + """ + + def __init__(self, filename: str) -> None: + self.filename = filename + self.root = et.Element( + 'testsuites', tests='0', errors='0', failures='0') + self.suites = {} # type: T.Dict[str, et.Element] + + def log(self, harness: 'TestHarness', test: 'TestRun') -> None: + """Log a single test case.""" + if test.junit is not None: + for suite in test.junit.findall('.//testsuite'): + # Assume that we don't need to merge anything here... + suite.attrib['name'] = '{}.{}.{}'.format(test.project, test.name, suite.attrib['name']) + + # GTest can inject invalid attributes + for case in suite.findall('.//testcase[@result]'): + del case.attrib['result'] + for case in suite.findall('.//testcase[@timestamp]'): + del case.attrib['timestamp'] + self.root.append(suite) + return + + # In this case we have a test binary with multiple results. + # We want to record this so that each result is recorded + # separately + if test.results: + suitename = f'{test.project}.{test.name}' + assert suitename not in self.suites or harness.options.repeat > 1, 'duplicate suite' + + suite = self.suites[suitename] = et.Element( + 'testsuite', + name=suitename, + tests=str(len(test.results)), + errors=str(sum(1 for r in test.results if r.result in + {TestResult.INTERRUPT, TestResult.ERROR})), + failures=str(sum(1 for r in test.results if r.result in + {TestResult.FAIL, TestResult.UNEXPECTEDPASS, TestResult.TIMEOUT})), + skipped=str(sum(1 for r in test.results if r.result is TestResult.SKIP)), + time=str(test.duration), + ) + + for subtest in test.results: + # Both name and classname are required. Use the suite name as + # the class name, so that e.g. GitLab groups testcases correctly. + testcase = et.SubElement(suite, 'testcase', name=str(subtest), classname=suitename) + if subtest.result is TestResult.SKIP: + et.SubElement(testcase, 'skipped') + elif subtest.result is TestResult.ERROR: + et.SubElement(testcase, 'error') + elif subtest.result is TestResult.FAIL: + et.SubElement(testcase, 'failure') + elif subtest.result is TestResult.UNEXPECTEDPASS: + fail = et.SubElement(testcase, 'failure') + fail.text = 'Test unexpected passed.' + elif subtest.result is TestResult.INTERRUPT: + fail = et.SubElement(testcase, 'error') + fail.text = 'Test was interrupted by user.' + elif subtest.result is TestResult.TIMEOUT: + fail = et.SubElement(testcase, 'error') + fail.text = 'Test did not finish before configured timeout.' + if subtest.explanation: + et.SubElement(testcase, 'system-out').text = subtest.explanation + if test.stdo: + out = et.SubElement(suite, 'system-out') + out.text = test.stdo.rstrip() + if test.stde: + err = et.SubElement(suite, 'system-err') + err.text = test.stde.rstrip() + else: + if test.project not in self.suites: + suite = self.suites[test.project] = et.Element( + 'testsuite', name=test.project, tests='1', errors='0', + failures='0', skipped='0', time=str(test.duration)) + else: + suite = self.suites[test.project] + suite.attrib['tests'] = str(int(suite.attrib['tests']) + 1) + + testcase = et.SubElement(suite, 'testcase', name=test.name, + classname=test.project, time=str(test.duration)) + if test.res is TestResult.SKIP: + et.SubElement(testcase, 'skipped') + suite.attrib['skipped'] = str(int(suite.attrib['skipped']) + 1) + elif test.res is TestResult.ERROR: + et.SubElement(testcase, 'error') + suite.attrib['errors'] = str(int(suite.attrib['errors']) + 1) + elif test.res is TestResult.FAIL: + et.SubElement(testcase, 'failure') + suite.attrib['failures'] = str(int(suite.attrib['failures']) + 1) + if test.stdo: + out = et.SubElement(testcase, 'system-out') + out.text = test.stdo.rstrip() + if test.stde: + err = et.SubElement(testcase, 'system-err') + err.text = test.stde.rstrip() + + async def finish(self, harness: 'TestHarness') -> None: + """Calculate total test counts and write out the xml result.""" + for suite in self.suites.values(): + self.root.append(suite) + # Skipped is really not allowed in the "testsuits" element + for attr in ['tests', 'errors', 'failures']: + self.root.attrib[attr] = str(int(self.root.attrib[attr]) + int(suite.attrib[attr])) + + tree = et.ElementTree(self.root) + with open(self.filename, 'wb') as f: + tree.write(f, encoding='utf-8', xml_declaration=True) + + +class TestRun: + TEST_NUM = 0 + PROTOCOL_TO_CLASS: T.Dict[TestProtocol, T.Type['TestRun']] = {} + + def __new__(cls, test: TestSerialisation, *args: T.Any, **kwargs: T.Any) -> T.Any: + return super().__new__(TestRun.PROTOCOL_TO_CLASS[test.protocol]) + + def __init__(self, test: TestSerialisation, test_env: T.Dict[str, str], + name: str, timeout: T.Optional[int], is_parallel: bool): + self.res = TestResult.PENDING + self.test = test + self._num = None # type: T.Optional[int] + self.name = name + self.timeout = timeout + self.results = list() # type: T.List[TAPParser.Test] + self.returncode = 0 + self.starttime = None # type: T.Optional[float] + self.duration = None # type: T.Optional[float] + self.stdo = None # type: T.Optional[str] + self.stde = None # type: T.Optional[str] + self.cmd = None # type: T.Optional[T.List[str]] + self.env = test_env # type: T.Dict[str, str] + self.should_fail = test.should_fail + self.project = test.project_name + self.junit = None # type: T.Optional[et.ElementTree] + self.is_parallel = is_parallel + + def start(self, cmd: T.List[str]) -> None: + self.res = TestResult.RUNNING + self.starttime = time.time() + self.cmd = cmd + + @property + def num(self) -> int: + if self._num is None: + TestRun.TEST_NUM += 1 + self._num = TestRun.TEST_NUM + return self._num + + @property + def detail(self) -> str: + if self.res is TestResult.PENDING: + return '' + if self.returncode: + return returncode_to_status(self.returncode) + if self.results: + # running or succeeded + passed = sum(x.result.is_ok() for x in self.results) + ran = sum(x.result is not TestResult.SKIP for x in self.results) + if passed == ran: + return f'{passed} subtests passed' + else: + return f'{passed}/{ran} subtests passed' + return '' + + def _complete(self, returncode: int, res: TestResult, + stdo: T.Optional[str], stde: T.Optional[str]) -> None: + assert isinstance(res, TestResult) + if self.should_fail and res in (TestResult.OK, TestResult.FAIL): + res = TestResult.UNEXPECTEDPASS if res.is_ok() else TestResult.EXPECTEDFAIL + + self.res = res + self.returncode = returncode + self.duration = time.time() - self.starttime + self.stdo = stdo + self.stde = stde + + @property + def cmdline(self) -> T.Optional[str]: + if not self.cmd: + return None + test_only_env = set(self.env.items()) - set(os.environ.items()) + return env_tuple_to_str(test_only_env) + \ + ' '.join(sh_quote(x) for x in self.cmd) + + def complete_skip(self, message: str) -> None: + self.starttime = time.time() + self._complete(GNU_SKIP_RETURNCODE, TestResult.SKIP, message, None) + + def complete(self, returncode: int, res: TestResult, + stdo: T.Optional[str], stde: T.Optional[str]) -> None: + self._complete(returncode, res, stdo, stde) + + def get_log(self, colorize: bool = False, stderr_only: bool = False) -> str: + stdo = '' if stderr_only else self.stdo + if self.stde: + res = '' + if stdo: + res += mlog.cyan('stdout:').get_text(colorize) + '\n' + res += stdo + if res[-1:] != '\n': + res += '\n' + res += mlog.cyan('stderr:').get_text(colorize) + '\n' + res += self.stde + else: + res = stdo + if res and res[-1:] != '\n': + res += '\n' + return res + + @property + def needs_parsing(self) -> bool: + return False + + async def parse(self, harness: 'TestHarness', lines: T.AsyncIterator[str]) -> T.Tuple[TestResult, str]: + async for l in lines: + pass + return TestResult.OK, '' + + +class TestRunExitCode(TestRun): + + def complete(self, returncode: int, res: TestResult, + stdo: T.Optional[str], stde: T.Optional[str]) -> None: + if res: + pass + elif returncode == GNU_SKIP_RETURNCODE: + res = TestResult.SKIP + elif returncode == GNU_ERROR_RETURNCODE: + res = TestResult.ERROR + else: + res = TestResult.FAIL if bool(returncode) else TestResult.OK + super().complete(returncode, res, stdo, stde) + +TestRun.PROTOCOL_TO_CLASS[TestProtocol.EXITCODE] = TestRunExitCode + + +class TestRunGTest(TestRunExitCode): + def complete(self, returncode: int, res: TestResult, + stdo: T.Optional[str], stde: T.Optional[str]) -> None: + filename = f'{self.test.name}.xml' + if self.test.workdir: + filename = os.path.join(self.test.workdir, filename) + + try: + self.junit = et.parse(filename) + except FileNotFoundError: + # This can happen if the test fails to run or complete for some + # reason, like the rpath for libgtest isn't properly set. ExitCode + # will handle the failure, don't generate a stacktrace. + pass + + super().complete(returncode, res, stdo, stde) + +TestRun.PROTOCOL_TO_CLASS[TestProtocol.GTEST] = TestRunGTest + + +class TestRunTAP(TestRun): + @property + def needs_parsing(self) -> bool: + return True + + def complete(self, returncode: int, res: TestResult, + stdo: str, stde: str) -> None: + if returncode != 0 and not res.was_killed(): + res = TestResult.ERROR + stde = stde or '' + stde += f'\n(test program exited with status code {returncode})' + + super().complete(returncode, res, stdo, stde) + + async def parse(self, harness: 'TestHarness', lines: T.AsyncIterator[str]) -> T.Tuple[TestResult, str]: + res = TestResult.OK + error = '' + + async for i in TAPParser().parse_async(lines): + if isinstance(i, TAPParser.Bailout): + res = TestResult.ERROR + harness.log_subtest(self, i.message, res) + elif isinstance(i, TAPParser.Test): + self.results.append(i) + if i.result.is_bad(): + res = TestResult.FAIL + harness.log_subtest(self, i.name or f'subtest {i.number}', i.result) + elif isinstance(i, TAPParser.Error): + error = '\nTAP parsing error: ' + i.message + res = TestResult.ERROR + + if all(t.result is TestResult.SKIP for t in self.results): + # This includes the case where self.results is empty + res = TestResult.SKIP + return res, error + +TestRun.PROTOCOL_TO_CLASS[TestProtocol.TAP] = TestRunTAP + + +class TestRunRust(TestRun): + @property + def needs_parsing(self) -> bool: + return True + + async def parse(self, harness: 'TestHarness', lines: T.AsyncIterator[str]) -> T.Tuple[TestResult, str]: + def parse_res(n: int, name: str, result: str) -> TAPParser.Test: + if result == 'ok': + return TAPParser.Test(n, name, TestResult.OK, None) + elif result == 'ignored': + return TAPParser.Test(n, name, TestResult.SKIP, None) + elif result == 'FAILED': + return TAPParser.Test(n, name, TestResult.FAIL, None) + return TAPParser.Test(n, name, TestResult.ERROR, + f'Unsupported output from rust test: {result}') + + n = 1 + async for line in lines: + if line.startswith('test ') and not line.startswith('test result'): + _, name, _, result = line.rstrip().split(' ') + name = name.replace('::', '.') + t = parse_res(n, name, result) + self.results.append(t) + harness.log_subtest(self, name, t.result) + n += 1 + + if all(t.result is TestResult.SKIP for t in self.results): + # This includes the case where self.results is empty + return TestResult.SKIP, '' + elif any(t.result is TestResult.ERROR for t in self.results): + return TestResult.ERROR, '' + elif any(t.result is TestResult.FAIL for t in self.results): + return TestResult.FAIL, '' + return TestResult.OK, '' + +TestRun.PROTOCOL_TO_CLASS[TestProtocol.RUST] = TestRunRust + + +def decode(stream: T.Union[None, bytes]) -> str: + if stream is None: + return '' + try: + return stream.decode('utf-8') + except UnicodeDecodeError: + return stream.decode('iso-8859-1', errors='ignore') + +async def read_decode(reader: asyncio.StreamReader, console_mode: ConsoleUser) -> str: + stdo_lines = [] + try: + while not reader.at_eof(): + line = decode(await reader.readline()) + stdo_lines.append(line) + if console_mode is ConsoleUser.STDOUT: + print(line, end='', flush=True) + return ''.join(stdo_lines) + except asyncio.CancelledError: + return ''.join(stdo_lines) + +# Extract lines out of the StreamReader. Print them +# along the way if requested, and at the end collect +# them all into a future. +async def read_decode_lines(reader: asyncio.StreamReader, q: 'asyncio.Queue[T.Optional[str]]', + console_mode: ConsoleUser) -> str: + stdo_lines = [] + try: + while not reader.at_eof(): + line = decode(await reader.readline()) + stdo_lines.append(line) + if console_mode is ConsoleUser.STDOUT: + print(line, end='', flush=True) + await q.put(line) + return ''.join(stdo_lines) + except asyncio.CancelledError: + return ''.join(stdo_lines) + finally: + await q.put(None) + +def run_with_mono(fname: str) -> bool: + return fname.endswith('.exe') and not (is_windows() or is_cygwin()) + +def check_testdata(objs: T.List[TestSerialisation]) -> T.List[TestSerialisation]: + if not isinstance(objs, list): + raise MesonVersionMismatchException('<unknown>', coredata_version) + for obj in objs: + if not isinstance(obj, TestSerialisation): + raise MesonVersionMismatchException('<unknown>', coredata_version) + if not hasattr(obj, 'version'): + raise MesonVersionMismatchException('<unknown>', coredata_version) + if major_versions_differ(obj.version, coredata_version): + raise MesonVersionMismatchException(obj.version, coredata_version) + return objs + +# Custom waiting primitives for asyncio + +async def try_wait_one(*awaitables: T.Any, timeout: T.Optional[T.Union[int, float]]) -> None: + """Wait for completion of one of the given futures, ignoring timeouts.""" + await asyncio.wait(awaitables, + timeout=timeout, return_when=asyncio.FIRST_COMPLETED) + +async def queue_iter(q: 'asyncio.Queue[T.Optional[str]]') -> T.AsyncIterator[str]: + while True: + item = await q.get() + q.task_done() + if item is None: + break + yield item + +async def complete(future: asyncio.Future) -> None: + """Wait for completion of the given future, ignoring cancellation.""" + try: + await future + except asyncio.CancelledError: + pass + +async def complete_all(futures: T.Iterable[asyncio.Future], + timeout: T.Optional[T.Union[int, float]] = None) -> None: + """Wait for completion of all the given futures, ignoring cancellation. + If timeout is not None, raise an asyncio.TimeoutError after the given + time has passed. asyncio.TimeoutError is only raised if some futures + have not completed and none have raised exceptions, even if timeout + is zero.""" + + def check_futures(futures: T.Iterable[asyncio.Future]) -> None: + # Raise exceptions if needed + left = False + for f in futures: + if not f.done(): + left = True + elif not f.cancelled(): + f.result() + if left: + raise asyncio.TimeoutError + + # Python is silly and does not have a variant of asyncio.wait with an + # absolute time as deadline. + deadline = None if timeout is None else asyncio.get_event_loop().time() + timeout + while futures and (timeout is None or timeout > 0): + done, futures = await asyncio.wait(futures, timeout=timeout, + return_when=asyncio.FIRST_EXCEPTION) + check_futures(done) + if deadline: + timeout = deadline - asyncio.get_event_loop().time() + + check_futures(futures) + + +class TestSubprocess: + def __init__(self, p: asyncio.subprocess.Process, + stdout: T.Optional[int], stderr: T.Optional[int], + postwait_fn: T.Callable[[], None] = None): + self._process = p + self.stdout = stdout + self.stderr = stderr + self.stdo_task = None # type: T.Optional[asyncio.Future[str]] + self.stde_task = None # type: T.Optional[asyncio.Future[str]] + self.postwait_fn = postwait_fn # type: T.Callable[[], None] + self.all_futures = [] # type: T.List[asyncio.Future] + + def stdout_lines(self, console_mode: ConsoleUser) -> T.AsyncIterator[str]: + q = asyncio.Queue() # type: asyncio.Queue[T.Optional[str]] + decode_coro = read_decode_lines(self._process.stdout, q, console_mode) + self.stdo_task = asyncio.ensure_future(decode_coro) + return queue_iter(q) + + def communicate(self, console_mode: ConsoleUser) -> T.Tuple[T.Optional[T.Awaitable[str]], + T.Optional[T.Awaitable[str]]]: + # asyncio.ensure_future ensures that printing can + # run in the background, even before it is awaited + if self.stdo_task is None and self.stdout is not None: + decode_coro = read_decode(self._process.stdout, console_mode) + self.stdo_task = asyncio.ensure_future(decode_coro) + self.all_futures.append(self.stdo_task) + if self.stderr is not None and self.stderr != asyncio.subprocess.STDOUT: + decode_coro = read_decode(self._process.stderr, console_mode) + self.stde_task = asyncio.ensure_future(decode_coro) + self.all_futures.append(self.stde_task) + + return self.stdo_task, self.stde_task + + async def _kill(self) -> T.Optional[str]: + # Python does not provide multiplatform support for + # killing a process and all its children so we need + # to roll our own. + p = self._process + try: + if is_windows(): + subprocess.run(['taskkill', '/F', '/T', '/PID', str(p.pid)]) + else: + # Send a termination signal to the process group that setsid() + # created - giving it a chance to perform any cleanup. + os.killpg(p.pid, signal.SIGTERM) + + # Make sure the termination signal actually kills the process + # group, otherwise retry with a SIGKILL. + await try_wait_one(p.wait(), timeout=0.5) + if p.returncode is not None: + return None + + os.killpg(p.pid, signal.SIGKILL) + + await try_wait_one(p.wait(), timeout=1) + if p.returncode is not None: + return None + + # An earlier kill attempt has not worked for whatever reason. + # Try to kill it one last time with a direct call. + # If the process has spawned children, they will remain around. + p.kill() + await try_wait_one(p.wait(), timeout=1) + if p.returncode is not None: + return None + return 'Test process could not be killed.' + except ProcessLookupError: + # Sometimes (e.g. with Wine) this happens. There's nothing + # we can do, probably the process already died so just wait + # for the event loop to pick that up. + await p.wait() + return None + finally: + if self.stdo_task: + self.stdo_task.cancel() + if self.stde_task: + self.stde_task.cancel() + + async def wait(self, timeout: T.Optional[int]) -> T.Tuple[int, TestResult, T.Optional[str]]: + p = self._process + result = None + additional_error = None + + self.all_futures.append(asyncio.ensure_future(p.wait())) + try: + await complete_all(self.all_futures, timeout=timeout) + except asyncio.TimeoutError: + additional_error = await self._kill() + result = TestResult.TIMEOUT + except asyncio.CancelledError: + # The main loop must have seen Ctrl-C. + additional_error = await self._kill() + result = TestResult.INTERRUPT + finally: + if self.postwait_fn: + self.postwait_fn() + + return p.returncode or 0, result, additional_error + +class SingleTestRunner: + + def __init__(self, test: TestSerialisation, env: T.Dict[str, str], name: str, + options: argparse.Namespace): + self.test = test + self.options = options + self.cmd = self._get_cmd() + + if self.cmd and self.test.extra_paths: + env['PATH'] = os.pathsep.join(self.test.extra_paths + ['']) + env['PATH'] + winecmd = [] + for c in self.cmd: + winecmd.append(c) + if os.path.basename(c).startswith('wine'): + env['WINEPATH'] = get_wine_shortpath( + winecmd, + ['Z:' + p for p in self.test.extra_paths] + env.get('WINEPATH', '').split(';') + ) + break + + # If MALLOC_PERTURB_ is not set, or if it is set to an empty value, + # (i.e., the test or the environment don't explicitly set it), set + # it ourselves. We do this unconditionally for regular tests + # because it is extremely useful to have. + # Setting MALLOC_PERTURB_="0" will completely disable this feature. + if ('MALLOC_PERTURB_' not in env or not env['MALLOC_PERTURB_']) and not options.benchmark: + env['MALLOC_PERTURB_'] = str(random.randint(1, 255)) + + if self.options.gdb or self.test.timeout is None or self.test.timeout <= 0: + timeout = None + elif self.options.timeout_multiplier is None: + timeout = self.test.timeout + elif self.options.timeout_multiplier <= 0: + timeout = None + else: + timeout = self.test.timeout * self.options.timeout_multiplier + + is_parallel = test.is_parallel and self.options.num_processes > 1 and not self.options.gdb + self.runobj = TestRun(test, env, name, timeout, is_parallel) + + if self.options.gdb: + self.console_mode = ConsoleUser.GDB + elif self.options.verbose and not is_parallel and not self.runobj.needs_parsing: + self.console_mode = ConsoleUser.STDOUT + else: + self.console_mode = ConsoleUser.LOGGER + + def _get_test_cmd(self) -> T.Optional[T.List[str]]: + if self.test.fname[0].endswith('.jar'): + return ['java', '-jar'] + self.test.fname + elif not self.test.is_cross_built and run_with_mono(self.test.fname[0]): + return ['mono'] + self.test.fname + elif self.test.cmd_is_built and self.test.is_cross_built and self.test.needs_exe_wrapper: + if self.test.exe_runner is None: + # Can not run test on cross compiled executable + # because there is no execute wrapper. + return None + elif self.test.cmd_is_built: + # If the command is not built (ie, its a python script), + # then we don't check for the exe-wrapper + if not self.test.exe_runner.found(): + msg = ('The exe_wrapper defined in the cross file {!r} was not ' + 'found. Please check the command and/or add it to PATH.') + raise TestException(msg.format(self.test.exe_runner.name)) + return self.test.exe_runner.get_command() + self.test.fname + return self.test.fname + + def _get_cmd(self) -> T.Optional[T.List[str]]: + test_cmd = self._get_test_cmd() + if not test_cmd: + return None + return TestHarness.get_wrapper(self.options) + test_cmd + + @property + def is_parallel(self) -> bool: + return self.runobj.is_parallel + + @property + def visible_name(self) -> str: + return self.runobj.name + + @property + def timeout(self) -> T.Optional[int]: + return self.runobj.timeout + + async def run(self, harness: 'TestHarness') -> TestRun: + if self.cmd is None: + skip_stdout = 'Not run because can not execute cross compiled binaries.' + harness.log_start_test(self.runobj) + self.runobj.complete_skip(skip_stdout) + else: + cmd = self.cmd + self.test.cmd_args + self.options.test_args + self.runobj.start(cmd) + harness.log_start_test(self.runobj) + await self._run_cmd(harness, cmd) + return self.runobj + + async def _run_subprocess(self, args: T.List[str], *, + stdout: int, stderr: int, + env: T.Dict[str, str], cwd: T.Optional[str]) -> TestSubprocess: + # Let gdb handle ^C instead of us + if self.options.gdb: + previous_sigint_handler = signal.getsignal(signal.SIGINT) + # Make the meson executable ignore SIGINT while gdb is running. + signal.signal(signal.SIGINT, signal.SIG_IGN) + + def preexec_fn() -> None: + if self.options.gdb: + # Restore the SIGINT handler for the child process to + # ensure it can handle it. + signal.signal(signal.SIGINT, signal.SIG_DFL) + else: + # We don't want setsid() in gdb because gdb needs the + # terminal in order to handle ^C and not show tcsetpgrp() + # errors avoid not being able to use the terminal. + os.setsid() + + def postwait_fn() -> None: + if self.options.gdb: + # Let us accept ^C again + signal.signal(signal.SIGINT, previous_sigint_handler) + + p = await asyncio.create_subprocess_exec(*args, + stdout=stdout, + stderr=stderr, + env=env, + cwd=cwd, + preexec_fn=preexec_fn if not is_windows() else None) + return TestSubprocess(p, stdout=stdout, stderr=stderr, + postwait_fn=postwait_fn if not is_windows() else None) + + async def _run_cmd(self, harness: 'TestHarness', cmd: T.List[str]) -> None: + if self.console_mode is ConsoleUser.GDB: + stdout = None + stderr = None + else: + stdout = asyncio.subprocess.PIPE + stderr = asyncio.subprocess.STDOUT \ + if not self.options.split and not self.runobj.needs_parsing \ + else asyncio.subprocess.PIPE + + extra_cmd = [] # type: T.List[str] + if self.test.protocol is TestProtocol.GTEST: + gtestname = self.test.name + if self.test.workdir: + gtestname = os.path.join(self.test.workdir, self.test.name) + extra_cmd.append(f'--gtest_output=xml:{gtestname}.xml') + + p = await self._run_subprocess(cmd + extra_cmd, + stdout=stdout, + stderr=stderr, + env=self.runobj.env, + cwd=self.test.workdir) + + parse_task = None + if self.runobj.needs_parsing: + parse_coro = self.runobj.parse(harness, p.stdout_lines(self.console_mode)) + parse_task = asyncio.ensure_future(parse_coro) + + stdo_task, stde_task = p.communicate(self.console_mode) + returncode, result, additional_error = await p.wait(self.runobj.timeout) + + if parse_task is not None: + res, error = await parse_task + if error: + additional_error = join_lines(additional_error, error) + result = result or res + + stdo = await stdo_task if stdo_task else '' + stde = await stde_task if stde_task else '' + stde = join_lines(stde, additional_error) + self.runobj.complete(returncode, result, stdo, stde) + + +class TestHarness: + def __init__(self, options: argparse.Namespace): + self.options = options + self.collected_failures = [] # type: T.List[TestRun] + self.fail_count = 0 + self.expectedfail_count = 0 + self.unexpectedpass_count = 0 + self.success_count = 0 + self.skip_count = 0 + self.timeout_count = 0 + self.test_count = 0 + self.name_max_len = 0 + self.is_run = False + self.loggers = [] # type: T.List[TestLogger] + self.loggers.append(ConsoleLogger()) + self.need_console = False + + self.logfile_base = None # type: T.Optional[str] + if self.options.logbase and not self.options.gdb: + namebase = None + self.logfile_base = os.path.join(self.options.wd, 'meson-logs', self.options.logbase) + + if self.options.wrapper: + namebase = os.path.basename(self.get_wrapper(self.options)[0]) + elif self.options.setup: + namebase = self.options.setup.replace(":", "_") + + if namebase: + self.logfile_base += '-' + namebase.replace(' ', '_') + + startdir = os.getcwd() + try: + os.chdir(self.options.wd) + self.build_data = build.load(os.getcwd()) + if not self.options.setup: + self.options.setup = self.build_data.test_setup_default_name + if self.options.benchmark: + self.tests = self.load_tests('meson_benchmark_setup.dat') + else: + self.tests = self.load_tests('meson_test_setup.dat') + finally: + os.chdir(startdir) + + ss = set() + for t in self.tests: + for s in t.suite: + ss.add(s) + self.suites = list(ss) + + def load_tests(self, file_name: str) -> T.List[TestSerialisation]: + datafile = Path('meson-private') / file_name + if not datafile.is_file(): + raise TestException(f'Directory {self.options.wd!r} does not seem to be a Meson build directory.') + with datafile.open('rb') as f: + objs = check_testdata(pickle.load(f)) + return objs + + def __enter__(self) -> 'TestHarness': + return self + + def __exit__(self, exc_type: T.Any, exc_value: T.Any, traceback: T.Any) -> None: + self.close_logfiles() + + def close_logfiles(self) -> None: + for l in self.loggers: + l.close() + + def get_test_setup(self, test: T.Optional[TestSerialisation]) -> build.TestSetup: + if ':' in self.options.setup: + if self.options.setup not in self.build_data.test_setups: + sys.exit(f"Unknown test setup '{self.options.setup}'.") + return self.build_data.test_setups[self.options.setup] + else: + full_name = test.project_name + ":" + self.options.setup + if full_name not in self.build_data.test_setups: + sys.exit(f"Test setup '{self.options.setup}' not found from project '{test.project_name}'.") + return self.build_data.test_setups[full_name] + + def merge_setup_options(self, options: argparse.Namespace, test: TestSerialisation) -> T.Dict[str, str]: + current = self.get_test_setup(test) + if not options.gdb: + options.gdb = current.gdb + if options.gdb: + options.verbose = True + if options.timeout_multiplier is None: + options.timeout_multiplier = current.timeout_multiplier + # if options.env is None: + # options.env = current.env # FIXME, should probably merge options here. + if options.wrapper is None: + options.wrapper = current.exe_wrapper + elif current.exe_wrapper: + sys.exit('Conflict: both test setup and command line specify an exe wrapper.') + return current.env.get_env(os.environ.copy()) + + def get_test_runner(self, test: TestSerialisation) -> SingleTestRunner: + name = self.get_pretty_suite(test) + options = deepcopy(self.options) + if self.options.setup: + env = self.merge_setup_options(options, test) + else: + env = os.environ.copy() + test_env = test.env.get_env(env) + env.update(test_env) + if (test.is_cross_built and test.needs_exe_wrapper and + test.exe_runner and test.exe_runner.found()): + env['MESON_EXE_WRAPPER'] = join_args(test.exe_runner.get_command()) + return SingleTestRunner(test, env, name, options) + + def process_test_result(self, result: TestRun) -> None: + if result.res is TestResult.TIMEOUT: + self.timeout_count += 1 + elif result.res is TestResult.SKIP: + self.skip_count += 1 + elif result.res is TestResult.OK: + self.success_count += 1 + elif result.res in {TestResult.FAIL, TestResult.ERROR, TestResult.INTERRUPT}: + self.fail_count += 1 + elif result.res is TestResult.EXPECTEDFAIL: + self.expectedfail_count += 1 + elif result.res is TestResult.UNEXPECTEDPASS: + self.unexpectedpass_count += 1 + else: + sys.exit(f'Unknown test result encountered: {result.res}') + + if result.res.is_bad(): + self.collected_failures.append(result) + for l in self.loggers: + l.log(self, result) + + @property + def numlen(self) -> int: + return len(str(self.test_count)) + + @property + def max_left_width(self) -> int: + return 2 * self.numlen + 2 + + def format(self, result: TestRun, colorize: bool, + max_left_width: int = 0, + prefix: str = '', + left: T.Optional[str] = None, + middle: T.Optional[str] = None, + right: T.Optional[str] = None) -> str: + + if left is None: + left = '{num:{numlen}}/{testcount} '.format( + numlen=self.numlen, + num=result.num, + testcount=self.test_count) + + # A non-default max_left_width lets the logger print more stuff before the + # name, while ensuring that the rightmost columns remain aligned. + max_left_width = max(max_left_width, self.max_left_width) + + if middle is None: + middle = result.name + extra_mid_width = max_left_width + self.name_max_len + 1 - uniwidth(middle) - uniwidth(left) - uniwidth(prefix) + middle += ' ' * max(1, extra_mid_width) + + if right is None: + right = '{res} {dur:{durlen}.2f}s'.format( + res=result.res.get_text(colorize), + dur=result.duration, + durlen=self.duration_max_len + 3) + detail = result.detail + if detail: + right += ' ' + detail + return prefix + left + middle + right + + def summary(self) -> str: + return textwrap.dedent(''' + + Ok: {:<4} + Expected Fail: {:<4} + Fail: {:<4} + Unexpected Pass: {:<4} + Skipped: {:<4} + Timeout: {:<4} + ''').format(self.success_count, self.expectedfail_count, self.fail_count, + self.unexpectedpass_count, self.skip_count, self.timeout_count) + + def total_failure_count(self) -> int: + return self.fail_count + self.unexpectedpass_count + self.timeout_count + + def doit(self) -> int: + if self.is_run: + raise RuntimeError('Test harness object can only be used once.') + self.is_run = True + tests = self.get_tests() + if not tests: + return 0 + if not self.options.no_rebuild and not rebuild_deps(self.options.wd, tests): + # We return 125 here in case the build failed. + # The reason is that exit code 125 tells `git bisect run` that the current + # commit should be skipped. Thus users can directly use `meson test` to + # bisect without needing to handle the does-not-build case separately in a + # wrapper script. + sys.exit(125) + + self.name_max_len = max([uniwidth(self.get_pretty_suite(test)) for test in tests]) + startdir = os.getcwd() + try: + os.chdir(self.options.wd) + runners = [] # type: T.List[SingleTestRunner] + for i in range(self.options.repeat): + runners.extend(self.get_test_runner(test) for test in tests) + if i == 0: + self.duration_max_len = max([len(str(int(runner.timeout or 99))) + for runner in runners]) + # Disable the progress report if it gets in the way + self.need_console = any(runner.console_mode is not ConsoleUser.LOGGER + for runner in runners) + + self.test_count = len(runners) + self.run_tests(runners) + finally: + os.chdir(startdir) + return self.total_failure_count() + + @staticmethod + def split_suite_string(suite: str) -> T.Tuple[str, str]: + if ':' in suite: + split = suite.split(':', 1) + assert len(split) == 2 + return split[0], split[1] + else: + return suite, "" + + @staticmethod + def test_in_suites(test: TestSerialisation, suites: T.List[str]) -> bool: + for suite in suites: + (prj_match, st_match) = TestHarness.split_suite_string(suite) + for prjst in test.suite: + (prj, st) = TestHarness.split_suite_string(prjst) + + # the SUITE can be passed as + # suite_name + # or + # project_name:suite_name + # so we need to select only the test belonging to project_name + + # this if handle the first case (i.e., SUITE == suite_name) + + # in this way we can run tests belonging to different + # (sub)projects which share the same suite_name + if not st_match and st == prj_match: + return True + + # these two conditions are needed to handle the second option + # i.e., SUITE == project_name:suite_name + + # in this way we select the only the tests of + # project_name with suite_name + if prj_match and prj != prj_match: + continue + if st_match and st != st_match: + continue + return True + return False + + def test_suitable(self, test: TestSerialisation) -> bool: + if TestHarness.test_in_suites(test, self.options.exclude_suites): + return False + + if self.options.include_suites: + # Both force inclusion (overriding add_test_setup) and exclude + # everything else + return TestHarness.test_in_suites(test, self.options.include_suites) + + if self.options.setup: + setup = self.get_test_setup(test) + if TestHarness.test_in_suites(test, setup.exclude_suites): + return False + + return True + + def tests_from_args(self, tests: T.List[TestSerialisation]) -> T.Generator[TestSerialisation, None, None]: + ''' + Allow specifying test names like "meson test foo1 foo2", where test('foo1', ...) + + Also support specifying the subproject to run tests from like + "meson test subproj:" (all tests inside subproj) or "meson test subproj:foo1" + to run foo1 inside subproj. Coincidentally also "meson test :foo1" to + run all tests with that name across all subprojects, which is + identical to "meson test foo1" + ''' + for arg in self.options.args: + if ':' in arg: + subproj, name = arg.split(':', maxsplit=1) + else: + subproj, name = '', arg + for t in tests: + if subproj and t.project_name != subproj: + continue + if name and t.name != name: + continue + yield t + + def get_tests(self) -> T.List[TestSerialisation]: + if not self.tests: + print('No tests defined.') + return [] + + tests = [t for t in self.tests if self.test_suitable(t)] + if self.options.args: + tests = list(self.tests_from_args(tests)) + + if not tests: + print('No suitable tests defined.') + return [] + + return tests + + def flush_logfiles(self) -> None: + for l in self.loggers: + l.flush() + + def open_logfiles(self) -> None: + if not self.logfile_base: + return + + self.loggers.append(JunitBuilder(self.logfile_base + '.junit.xml')) + self.loggers.append(JsonLogfileBuilder(self.logfile_base + '.json')) + self.loggers.append(TextLogfileBuilder(self.logfile_base + '.txt', errors='surrogateescape')) + + @staticmethod + def get_wrapper(options: argparse.Namespace) -> T.List[str]: + wrap = [] # type: T.List[str] + if options.gdb: + wrap = [options.gdb_path, '--quiet', '--nh'] + if options.repeat > 1: + wrap += ['-ex', 'run', '-ex', 'quit'] + # Signal the end of arguments to gdb + wrap += ['--args'] + if options.wrapper: + wrap += options.wrapper + return wrap + + def get_pretty_suite(self, test: TestSerialisation) -> str: + if len(self.suites) > 1 and test.suite: + rv = TestHarness.split_suite_string(test.suite[0])[0] + s = "+".join(TestHarness.split_suite_string(s)[1] for s in test.suite) + if s: + rv += ":" + return rv + s + " / " + test.name + else: + return test.name + + def run_tests(self, runners: T.List[SingleTestRunner]) -> None: + try: + self.open_logfiles() + # Replace with asyncio.run once we can require Python 3.7 + loop = asyncio.get_event_loop() + loop.run_until_complete(self._run_tests(runners)) + finally: + self.close_logfiles() + + def log_subtest(self, test: TestRun, s: str, res: TestResult) -> None: + for l in self.loggers: + l.log_subtest(self, test, s, res) + + def log_start_test(self, test: TestRun) -> None: + for l in self.loggers: + l.start_test(self, test) + + async def _run_tests(self, runners: T.List[SingleTestRunner]) -> None: + semaphore = asyncio.Semaphore(self.options.num_processes) + futures = deque() # type: T.Deque[asyncio.Future] + running_tests = dict() # type: T.Dict[asyncio.Future, str] + interrupted = False + ctrlc_times = deque(maxlen=MAX_CTRLC) # type: T.Deque[float] + + async def run_test(test: SingleTestRunner) -> None: + async with semaphore: + if interrupted or (self.options.repeat > 1 and self.fail_count): + return + res = await test.run(self) + self.process_test_result(res) + + def test_done(f: asyncio.Future) -> None: + if not f.cancelled(): + f.result() + futures.remove(f) + try: + del running_tests[f] + except KeyError: + pass + + def cancel_one_test(warn: bool) -> None: + future = futures.popleft() + futures.append(future) + if warn: + self.flush_logfiles() + mlog.warning('CTRL-C detected, interrupting {}'.format(running_tests[future])) + del running_tests[future] + future.cancel() + + def cancel_all_tests() -> None: + nonlocal interrupted + interrupted = True + while running_tests: + cancel_one_test(False) + + def sigterm_handler() -> None: + if interrupted: + return + self.flush_logfiles() + mlog.warning('Received SIGTERM, exiting') + cancel_all_tests() + + def sigint_handler() -> None: + # We always pick the longest-running future that has not been cancelled + # If all the tests have been CTRL-C'ed, just stop + nonlocal interrupted + if interrupted: + return + ctrlc_times.append(asyncio.get_event_loop().time()) + if len(ctrlc_times) == MAX_CTRLC and ctrlc_times[-1] - ctrlc_times[0] < 1: + self.flush_logfiles() + mlog.warning('CTRL-C detected, exiting') + cancel_all_tests() + elif running_tests: + cancel_one_test(True) + else: + self.flush_logfiles() + mlog.warning('CTRL-C detected, exiting') + interrupted = True + + for l in self.loggers: + l.start(self) + + if sys.platform != 'win32': + if os.getpgid(0) == os.getpid(): + asyncio.get_event_loop().add_signal_handler(signal.SIGINT, sigint_handler) + else: + asyncio.get_event_loop().add_signal_handler(signal.SIGINT, sigterm_handler) + asyncio.get_event_loop().add_signal_handler(signal.SIGTERM, sigterm_handler) + try: + for runner in runners: + if not runner.is_parallel: + await complete_all(futures) + future = asyncio.ensure_future(run_test(runner)) + futures.append(future) + running_tests[future] = runner.visible_name + future.add_done_callback(test_done) + if not runner.is_parallel: + await complete(future) + if self.options.repeat > 1 and self.fail_count: + break + + await complete_all(futures) + finally: + if sys.platform != 'win32': + asyncio.get_event_loop().remove_signal_handler(signal.SIGINT) + asyncio.get_event_loop().remove_signal_handler(signal.SIGTERM) + for l in self.loggers: + await l.finish(self) + +def list_tests(th: TestHarness) -> bool: + tests = th.get_tests() + for t in tests: + print(th.get_pretty_suite(t)) + return not tests + +def rebuild_deps(wd: str, tests: T.List[TestSerialisation]) -> bool: + def convert_path_to_target(path: str) -> str: + path = os.path.relpath(path, wd) + if os.sep != '/': + path = path.replace(os.sep, '/') + return path + + if not (Path(wd) / 'build.ninja').is_file(): + print('Only ninja backend is supported to rebuild tests before running them.') + return True + + ninja = environment.detect_ninja() + if not ninja: + print("Can't find ninja, can't rebuild test.") + return False + + depends = set() # type: T.Set[str] + targets = set() # type: T.Set[str] + intro_targets = dict() # type: T.Dict[str, T.List[str]] + for target in load_info_file(get_infodir(wd), kind='targets'): + intro_targets[target['id']] = [ + convert_path_to_target(f) + for f in target['filename']] + for t in tests: + for d in t.depends: + if d in depends: + continue + depends.update(d) + targets.update(intro_targets[d]) + + ret = subprocess.run(ninja + ['-C', wd] + sorted(targets)).returncode + if ret != 0: + print(f'Could not rebuild {wd}') + return False + + return True + +def run(options: argparse.Namespace) -> int: + if options.benchmark: + options.num_processes = 1 + + if options.verbose and options.quiet: + print('Can not be both quiet and verbose at the same time.') + return 1 + + check_bin = None + if options.gdb: + options.verbose = True + if options.wrapper: + print('Must not specify both a wrapper and gdb at the same time.') + return 1 + check_bin = 'gdb' + + if options.wrapper: + check_bin = options.wrapper[0] + + if sys.platform == 'win32': + loop = asyncio.ProactorEventLoop() + asyncio.set_event_loop(loop) + + if check_bin is not None: + exe = ExternalProgram(check_bin, silent=True) + if not exe.found(): + print(f'Could not find requested program: {check_bin!r}') + return 1 + + with TestHarness(options) as th: + try: + if options.list: + return list_tests(th) + return th.doit() + except TestException as e: + print('Meson test encountered an error:\n') + if os.environ.get('MESON_FORCE_BACKTRACE'): + raise e + else: + print(e) + return 1 + +def run_with_args(args: T.List[str]) -> int: + parser = argparse.ArgumentParser(prog='meson test') + add_arguments(parser) + options = parser.parse_args(args) + return run(options) |