|
| 1 | +# mypy: disable-error-code="no-untyped-def" |
| 2 | +# this is a modified version of [pytest-regtest](https://gitlab.com/uweschmitt/pytest-regtest) |
| 3 | +# which is licensed under the MIT license |
| 4 | +# author: Uwe Schmitt (https://gitlab.com/uweschmitt) |
| 5 | +# this is mainly a copy of the original code, with some modifications to make it work with our tests and |
| 6 | +# to work with python 3.12 |
| 7 | +import difflib |
| 8 | +import functools |
| 9 | +import os |
| 10 | +import re |
| 11 | +import sys |
| 12 | +import tempfile |
| 13 | +from hashlib import sha512 |
| 14 | +from io import StringIO |
| 15 | + |
| 16 | +import pytest |
| 17 | +from _pytest._code.code import ExceptionInfo, TerminalRepr |
| 18 | +from _pytest._io.terminalwriter import TerminalWriter |
| 19 | +from _pytest.outcomes import skip |
| 20 | + |
| 21 | +pytest_plugins = ["pytester"] |
| 22 | + |
| 23 | + |
| 24 | +IS_WIN = sys.platform == "win32" |
| 25 | + |
| 26 | + |
| 27 | +open = functools.partial(open, encoding="utf-8") |
| 28 | + |
| 29 | + |
| 30 | +_converters_pre = [] |
| 31 | +_converters_post = [] |
| 32 | + |
| 33 | + |
| 34 | +def register_converter_pre(function): |
| 35 | + if function not in _converters_pre: |
| 36 | + _converters_pre.append(function) |
| 37 | + |
| 38 | + |
| 39 | +def register_converter_post(function): |
| 40 | + if function not in _converters_post: |
| 41 | + _converters_post.append(function) |
| 42 | + |
| 43 | + |
| 44 | +def _std_replacements(request): |
| 45 | + if "tmpdir" in request.fixturenames: |
| 46 | + tmpdir = request.getfixturevalue("tmpdir").strpath + os.path.sep |
| 47 | + yield tmpdir, "<tmpdir_from_fixture>/" |
| 48 | + tmpdir = request.getfixturevalue("tmpdir").strpath |
| 49 | + yield tmpdir, "<tmpdir_from_fixture>" |
| 50 | + |
| 51 | + regexp = os.path.join(os.path.realpath(tempfile.gettempdir()), "pytest-of-.*", "pytest-\\d+/") |
| 52 | + yield regexp, "<pytest_tempdir>/" |
| 53 | + |
| 54 | + regexp = os.path.join(tempfile.gettempdir(), "tmp[_a-zA-Z0-9]+") |
| 55 | + |
| 56 | + yield regexp, "<tmpdir_from_tempfile_module>" |
| 57 | + yield os.path.realpath(tempfile.gettempdir()) + os.path.sep, "<tmpdir_from_tempfile_module>/" |
| 58 | + yield os.path.realpath(tempfile.gettempdir()), "<tmpdir_from_tempfile_module>" |
| 59 | + if tempfile.tempdir: |
| 60 | + yield tempfile.tempdir + os.path.sep, "<tmpdir_from_tempfile_module>/" |
| 61 | + yield tempfile.tempdir, "<tmpdir_from_tempfile_module>" |
| 62 | + yield r"var/folders/.*/pytest-of.*/", "<pytest_tempdir>/" |
| 63 | + |
| 64 | + # replace hex object ids in output by 0x????????? |
| 65 | + yield r" 0x[0-9a-fA-F]+", " 0x?????????" |
| 66 | + |
| 67 | + |
| 68 | +def _std_conversion(recorded, request): |
| 69 | + fixed = [] |
| 70 | + for line in recorded.split("\n"): |
| 71 | + for regex, replacement in _std_replacements(request): |
| 72 | + if IS_WIN: |
| 73 | + # fix windows backwards slashes in regex |
| 74 | + regex = regex.replace("\\", "\\\\") |
| 75 | + line, __ = re.subn(regex, replacement, line) |
| 76 | + fixed.append(line) |
| 77 | + return "\n".join(fixed) |
| 78 | + |
| 79 | + |
| 80 | +def _call_converter(converter, recorded, request): |
| 81 | + if converter.__code__.co_argcount == 2: |
| 82 | + # new api for converters |
| 83 | + return converter(recorded, request) |
| 84 | + # old api for converters |
| 85 | + return converter(recorded) |
| 86 | + |
| 87 | + |
| 88 | +def cleanup(recorded, request): |
| 89 | + for converter in _converters_pre: |
| 90 | + recorded = _call_converter(converter, recorded, request) |
| 91 | + |
| 92 | + recorded = _std_conversion(recorded, request) |
| 93 | + |
| 94 | + for converter in _converters_post: |
| 95 | + recorded = _call_converter(converter, recorded, request) |
| 96 | + |
| 97 | + return recorded |
| 98 | + |
| 99 | + |
| 100 | +class CollectErrorRepr(TerminalRepr): |
| 101 | + def __init__(self, messages, colors): |
| 102 | + self.messages = messages |
| 103 | + self.colors = colors |
| 104 | + |
| 105 | + def toterminal(self, out): |
| 106 | + for message, color in zip(self.messages, self.colors): |
| 107 | + out.line(message, **color) |
| 108 | + |
| 109 | + |
| 110 | +def pytest_addoption(parser): |
| 111 | + """Add options to control the timeout plugin""" |
| 112 | + group = parser.getgroup("regtest2", "regression test plugin") |
| 113 | + group.addoption( |
| 114 | + "--regtest2-reset", |
| 115 | + action="store_true", |
| 116 | + help="do not run regtest but record current output", |
| 117 | + ) |
| 118 | + group.addoption( |
| 119 | + "--regtest2-tee", |
| 120 | + action="store_true", |
| 121 | + default=False, |
| 122 | + help="print recorded results to console too", |
| 123 | + ) |
| 124 | + group.addoption( |
| 125 | + "--regtest2-regard-line-endings", |
| 126 | + action="store_true", |
| 127 | + default=False, |
| 128 | + help="do not strip whitespaces at end of recorded lines", |
| 129 | + ) |
| 130 | + group.addoption( |
| 131 | + "--regtest2-nodiff", |
| 132 | + action="store_true", |
| 133 | + default=False, |
| 134 | + help="do not show diff output for failed regresson tests", |
| 135 | + ) |
| 136 | + |
| 137 | + |
| 138 | +class Config: |
| 139 | + ignore_line_endings = True |
| 140 | + tee = False |
| 141 | + reset = False |
| 142 | + nodiff = False |
| 143 | + |
| 144 | + |
| 145 | +def pytest_configure(config): |
| 146 | + Config.tee = config.getvalue("--regtest2-tee") |
| 147 | + Config.ignore_line_endings = not config.getvalue("--regtest2-regard-line-endings") |
| 148 | + Config.reset = config.getvalue("--regtest2-reset") |
| 149 | + Config.nodiff = config.getvalue("--regtest2-nodiff") |
| 150 | + |
| 151 | + |
| 152 | +class RegTestFixture: |
| 153 | + def __init__(self, request, nodeid): |
| 154 | + self.request = request |
| 155 | + self.nodeid = nodeid |
| 156 | + |
| 157 | + self.test_folder = request.fspath.dirname |
| 158 | + self.buffer = StringIO() |
| 159 | + self.identifier = None |
| 160 | + |
| 161 | + @property |
| 162 | + def old_output_file_name(self): |
| 163 | + file_name, __, test_function = self.nodeid.partition("::") |
| 164 | + file_name = os.path.basename(file_name) |
| 165 | + |
| 166 | + test_function = test_function.replace("/", "--") |
| 167 | + if len(test_function) > 100: |
| 168 | + test_function = sha512(test_function.encode("utf-8")).hexdigest()[:10] |
| 169 | + |
| 170 | + stem, __ = os.path.splitext(file_name) |
| 171 | + if self.identifier is not None: |
| 172 | + return stem + "." + test_function + "__" + self.identifier + ".out" |
| 173 | + |
| 174 | + return stem + "." + test_function + ".out" |
| 175 | + |
| 176 | + @property |
| 177 | + def output_file_name(self): |
| 178 | + file_name, __, test_function = self.nodeid.partition("::") |
| 179 | + file_name = os.path.basename(file_name) |
| 180 | + |
| 181 | + for c in "/\\:*\"'?<>|": |
| 182 | + test_function = test_function.replace(c, "-") |
| 183 | + |
| 184 | + # If file name is too long, hash parameters. |
| 185 | + if len(test_function) > 100: |
| 186 | + test_function = test_function[:88] + "__" + sha512(test_function.encode("utf-8")).hexdigest()[:10] |
| 187 | + |
| 188 | + test_function = test_function.replace(" ", "_") |
| 189 | + stem, __ = os.path.splitext(file_name) |
| 190 | + if self.identifier is not None: |
| 191 | + return stem + "." + test_function + "__" + self.identifier + ".out" |
| 192 | + |
| 193 | + return stem + "." + test_function + ".out" |
| 194 | + |
| 195 | + @property |
| 196 | + def old_result_file(self): |
| 197 | + return os.path.join(self.test_folder, "_regtest_outputs", self.old_output_file_name) |
| 198 | + |
| 199 | + @property |
| 200 | + def result_file(self): |
| 201 | + return os.path.join(self.test_folder, "_regtest_outputs", self.output_file_name) |
| 202 | + |
| 203 | + def write(self, what): |
| 204 | + self.buffer.write(what) |
| 205 | + |
| 206 | + def flush(self): |
| 207 | + pass |
| 208 | + |
| 209 | + @property |
| 210 | + def tobe(self): |
| 211 | + if os.path.exists(self.result_file): |
| 212 | + with open(self.result_file) as f: |
| 213 | + return f.read() |
| 214 | + if os.path.exists(self.old_result_file): |
| 215 | + with open(self.old_result_file) as f: |
| 216 | + return f.read() |
| 217 | + |
| 218 | + return "" |
| 219 | + |
| 220 | + @property |
| 221 | + def current(self): |
| 222 | + return cleanup(self.buffer.getvalue(), self.request) |
| 223 | + |
| 224 | + @property |
| 225 | + def current_raw(self): |
| 226 | + return self.buffer.getvalue() |
| 227 | + |
| 228 | + def write_current(self): |
| 229 | + folder = os.path.dirname(self.result_file) |
| 230 | + if not os.path.exists(folder): |
| 231 | + os.makedirs(folder) |
| 232 | + if os.path.exists(self.old_result_file): |
| 233 | + with open(self.old_result_file, "w") as fh: |
| 234 | + fh.write(self.current) |
| 235 | + return |
| 236 | + |
| 237 | + with open(self.result_file, "w") as fh: |
| 238 | + fh.write(self.current) |
| 239 | + |
| 240 | + def __enter__(self): |
| 241 | + self.stdout = sys.stdout |
| 242 | + sys.stdout = self.buffer |
| 243 | + |
| 244 | + def __exit__(self, exc_type, exc_value, traceback): |
| 245 | + sys.stdout = self.stdout |
| 246 | + return False # don't suppress exception |
| 247 | + |
| 248 | + |
| 249 | +@pytest.fixture() |
| 250 | +def regtest(request: pytest.FixtureRequest): |
| 251 | + item = request.node |
| 252 | + |
| 253 | + return RegTestFixture(request, item.nodeid) |
| 254 | + |
| 255 | + |
| 256 | +@pytest.hookimpl(hookwrapper=True) |
| 257 | +def pytest_runtest_makereport(item, call): |
| 258 | + if not hasattr(item, "fixturenames") or "regtest" not in item.fixturenames: |
| 259 | + yield |
| 260 | + return |
| 261 | + |
| 262 | + outcome = yield |
| 263 | + |
| 264 | + excinfo = call.excinfo |
| 265 | + when = call.when |
| 266 | + duration = call.stop - call.start |
| 267 | + keywords = dict([(x, 1) for x in item.keywords]) |
| 268 | + |
| 269 | + result = outcome.get_result() |
| 270 | + result.when = when |
| 271 | + result.duration = duration |
| 272 | + result.keywords = keywords |
| 273 | + |
| 274 | + xfail = item.get_closest_marker("xfail") is not None |
| 275 | + |
| 276 | + if excinfo: |
| 277 | + if not isinstance(excinfo, ExceptionInfo): |
| 278 | + _outcome = "failed" |
| 279 | + longrepr = excinfo |
| 280 | + elif excinfo.errisinstance(skip.Exception): |
| 281 | + _outcome = "skipped" |
| 282 | + r = excinfo._getreprcrash() |
| 283 | + longrepr = (str(r.path), r.lineno, r.message) if r is not None else excinfo |
| 284 | + else: |
| 285 | + _outcome = "failed" if not xfail else "skipped" |
| 286 | + if call.when == "call": |
| 287 | + longrepr = item.repr_failure(excinfo) |
| 288 | + else: # exception in setup or teardown |
| 289 | + longrepr = item._repr_failure_py(excinfo, style=item.config.option.tbstyle) |
| 290 | + result.longrepr = longrepr |
| 291 | + result.outcome = _outcome |
| 292 | + |
| 293 | + else: |
| 294 | + result.outcome = "passed" |
| 295 | + result.longrepr = None |
| 296 | + |
| 297 | + if call.when == "call": |
| 298 | + regtest = getattr(item, "funcargs", {}).get("regtest") |
| 299 | + if regtest is not None: |
| 300 | + xfail = item.get_closest_marker("xfail") is not None |
| 301 | + handle_regtest_result(regtest, result, xfail) |
| 302 | + |
| 303 | + |
| 304 | +def handle_regtest_result(regtest, result, xfail): |
| 305 | + if Config.tee: |
| 306 | + tw = TerminalWriter() |
| 307 | + |
| 308 | + tw.line() |
| 309 | + line = "recorded raw output to regtest fixture:" |
| 310 | + line = line.ljust(tw.fullwidth, "-") |
| 311 | + tw.line(line, green=True) |
| 312 | + tw.write(regtest.current_raw, cyan=True) |
| 313 | + tw.line("-" * tw.fullwidth, green=True) |
| 314 | + |
| 315 | + if not Config.reset: |
| 316 | + current = regtest.current.split("\n") |
| 317 | + tobe = regtest.tobe.split("\n") |
| 318 | + |
| 319 | + if Config.ignore_line_endings: |
| 320 | + current = [l.rstrip() for l in current] |
| 321 | + tobe = [l.rstrip() for l in tobe] |
| 322 | + |
| 323 | + if current != tobe: |
| 324 | + if xfail: |
| 325 | + result.outcome = "skipped" |
| 326 | + else: |
| 327 | + result.outcome = "failed" |
| 328 | + |
| 329 | + nodeid = regtest.nodeid + ("" if regtest.identifier is None else "__" + regtest.identifier) |
| 330 | + if Config.nodiff: |
| 331 | + result.longrepr = CollectErrorRepr( |
| 332 | + ["regression test for {} failed\n".format(nodeid)], |
| 333 | + [dict(red=True, bold=True)], |
| 334 | + ) |
| 335 | + return |
| 336 | + |
| 337 | + if not Config.ignore_line_endings: |
| 338 | + # add quotes around lines in diff: |
| 339 | + current = map(repr, current) |
| 340 | + tobe = map(repr, tobe) |
| 341 | + collected = list(difflib.unified_diff(current, tobe, "current", "tobe", lineterm="")) |
| 342 | + |
| 343 | + msg = "\nregression test output differences for {}:\n".format(nodeid) |
| 344 | + msg_diff = "> " + "\n> ".join(collected) |
| 345 | + result.longrepr = CollectErrorRepr([msg, msg_diff + "\n"], [dict(), dict(red=True, bold=True)]) |
| 346 | + |
| 347 | + else: |
| 348 | + regtest.write_current() |
0 commit comments