aboutsummaryrefslogtreecommitdiffstats
path: root/tests/wpt/web-platform-tests/tools/pytest/_pytest/pytester.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/wpt/web-platform-tests/tools/pytest/_pytest/pytester.py')
-rw-r--r--tests/wpt/web-platform-tests/tools/pytest/_pytest/pytester.py1110
1 files changed, 0 insertions, 1110 deletions
diff --git a/tests/wpt/web-platform-tests/tools/pytest/_pytest/pytester.py b/tests/wpt/web-platform-tests/tools/pytest/_pytest/pytester.py
deleted file mode 100644
index faed7f581c9..00000000000
--- a/tests/wpt/web-platform-tests/tools/pytest/_pytest/pytester.py
+++ /dev/null
@@ -1,1110 +0,0 @@
-""" (disabled by default) support for testing pytest and pytest plugins. """
-import codecs
-import gc
-import os
-import platform
-import re
-import subprocess
-import sys
-import time
-import traceback
-from fnmatch import fnmatch
-
-from py.builtin import print_
-
-from _pytest._code import Source
-import py
-import pytest
-from _pytest.main import Session, EXIT_OK
-
-
-def pytest_addoption(parser):
- # group = parser.getgroup("pytester", "pytester (self-tests) options")
- parser.addoption('--lsof',
- action="store_true", dest="lsof", default=False,
- help=("run FD checks if lsof is available"))
-
- parser.addoption('--runpytest', default="inprocess", dest="runpytest",
- choices=("inprocess", "subprocess", ),
- help=("run pytest sub runs in tests using an 'inprocess' "
- "or 'subprocess' (python -m main) method"))
-
-
-def pytest_configure(config):
- # This might be called multiple times. Only take the first.
- global _pytest_fullpath
- try:
- _pytest_fullpath
- except NameError:
- _pytest_fullpath = os.path.abspath(pytest.__file__.rstrip("oc"))
- _pytest_fullpath = _pytest_fullpath.replace("$py.class", ".py")
-
- if config.getvalue("lsof"):
- checker = LsofFdLeakChecker()
- if checker.matching_platform():
- config.pluginmanager.register(checker)
-
-
-class LsofFdLeakChecker(object):
- def get_open_files(self):
- out = self._exec_lsof()
- open_files = self._parse_lsof_output(out)
- return open_files
-
- def _exec_lsof(self):
- pid = os.getpid()
- return py.process.cmdexec("lsof -Ffn0 -p %d" % pid)
-
- def _parse_lsof_output(self, out):
- def isopen(line):
- return line.startswith('f') and ("deleted" not in line and
- 'mem' not in line and "txt" not in line and 'cwd' not in line)
-
- open_files = []
-
- for line in out.split("\n"):
- if isopen(line):
- fields = line.split('\0')
- fd = fields[0][1:]
- filename = fields[1][1:]
- if filename.startswith('/'):
- open_files.append((fd, filename))
-
- return open_files
-
- def matching_platform(self):
- try:
- py.process.cmdexec("lsof -v")
- except (py.process.cmdexec.Error, UnicodeDecodeError):
- # cmdexec may raise UnicodeDecodeError on Windows systems
- # with locale other than english:
- # https://bitbucket.org/pytest-dev/py/issues/66
- return False
- else:
- return True
-
- @pytest.hookimpl(hookwrapper=True, tryfirst=True)
- def pytest_runtest_item(self, item):
- lines1 = self.get_open_files()
- yield
- if hasattr(sys, "pypy_version_info"):
- gc.collect()
- lines2 = self.get_open_files()
-
- new_fds = set([t[0] for t in lines2]) - set([t[0] for t in lines1])
- leaked_files = [t for t in lines2 if t[0] in new_fds]
- if leaked_files:
- error = []
- error.append("***** %s FD leakage detected" % len(leaked_files))
- error.extend([str(f) for f in leaked_files])
- error.append("*** Before:")
- error.extend([str(f) for f in lines1])
- error.append("*** After:")
- error.extend([str(f) for f in lines2])
- error.append(error[0])
- error.append("*** function %s:%s: %s " % item.location)
- pytest.fail("\n".join(error), pytrace=False)
-
-
-# XXX copied from execnet's conftest.py - needs to be merged
-winpymap = {
- 'python2.7': r'C:\Python27\python.exe',
- 'python2.6': r'C:\Python26\python.exe',
- 'python3.1': r'C:\Python31\python.exe',
- 'python3.2': r'C:\Python32\python.exe',
- 'python3.3': r'C:\Python33\python.exe',
- 'python3.4': r'C:\Python34\python.exe',
- 'python3.5': r'C:\Python35\python.exe',
-}
-
-def getexecutable(name, cache={}):
- try:
- return cache[name]
- except KeyError:
- executable = py.path.local.sysfind(name)
- if executable:
- if name == "jython":
- import subprocess
- popen = subprocess.Popen([str(executable), "--version"],
- universal_newlines=True, stderr=subprocess.PIPE)
- out, err = popen.communicate()
- if not err or "2.5" not in err:
- executable = None
- if "2.5.2" in err:
- executable = None # http://bugs.jython.org/issue1790
- cache[name] = executable
- return executable
-
-@pytest.fixture(params=['python2.6', 'python2.7', 'python3.3', "python3.4",
- 'pypy', 'pypy3'])
-def anypython(request):
- name = request.param
- executable = getexecutable(name)
- if executable is None:
- if sys.platform == "win32":
- executable = winpymap.get(name, None)
- if executable:
- executable = py.path.local(executable)
- if executable.check():
- return executable
- pytest.skip("no suitable %s found" % (name,))
- return executable
-
-# used at least by pytest-xdist plugin
-@pytest.fixture
-def _pytest(request):
- """ Return a helper which offers a gethookrecorder(hook)
- method which returns a HookRecorder instance which helps
- to make assertions about called hooks.
- """
- return PytestArg(request)
-
-class PytestArg:
- def __init__(self, request):
- self.request = request
-
- def gethookrecorder(self, hook):
- hookrecorder = HookRecorder(hook._pm)
- self.request.addfinalizer(hookrecorder.finish_recording)
- return hookrecorder
-
-
-def get_public_names(l):
- """Only return names from iterator l without a leading underscore."""
- return [x for x in l if x[0] != "_"]
-
-
-class ParsedCall:
- def __init__(self, name, kwargs):
- self.__dict__.update(kwargs)
- self._name = name
-
- def __repr__(self):
- d = self.__dict__.copy()
- del d['_name']
- return "<ParsedCall %r(**%r)>" %(self._name, d)
-
-
-class HookRecorder:
- """Record all hooks called in a plugin manager.
-
- This wraps all the hook calls in the plugin manager, recording
- each call before propagating the normal calls.
-
- """
-
- def __init__(self, pluginmanager):
- self._pluginmanager = pluginmanager
- self.calls = []
-
- def before(hook_name, hook_impls, kwargs):
- self.calls.append(ParsedCall(hook_name, kwargs))
-
- def after(outcome, hook_name, hook_impls, kwargs):
- pass
-
- self._undo_wrapping = pluginmanager.add_hookcall_monitoring(before, after)
-
- def finish_recording(self):
- self._undo_wrapping()
-
- def getcalls(self, names):
- if isinstance(names, str):
- names = names.split()
- return [call for call in self.calls if call._name in names]
-
- def assert_contains(self, entries):
- __tracebackhide__ = True
- i = 0
- entries = list(entries)
- backlocals = sys._getframe(1).f_locals
- while entries:
- name, check = entries.pop(0)
- for ind, call in enumerate(self.calls[i:]):
- if call._name == name:
- print_("NAMEMATCH", name, call)
- if eval(check, backlocals, call.__dict__):
- print_("CHECKERMATCH", repr(check), "->", call)
- else:
- print_("NOCHECKERMATCH", repr(check), "-", call)
- continue
- i += ind + 1
- break
- print_("NONAMEMATCH", name, "with", call)
- else:
- pytest.fail("could not find %r check %r" % (name, check))
-
- def popcall(self, name):
- __tracebackhide__ = True
- for i, call in enumerate(self.calls):
- if call._name == name:
- del self.calls[i]
- return call
- lines = ["could not find call %r, in:" % (name,)]
- lines.extend([" %s" % str(x) for x in self.calls])
- pytest.fail("\n".join(lines))
-
- def getcall(self, name):
- l = self.getcalls(name)
- assert len(l) == 1, (name, l)
- return l[0]
-
- # functionality for test reports
-
- def getreports(self,
- names="pytest_runtest_logreport pytest_collectreport"):
- return [x.report for x in self.getcalls(names)]
-
- def matchreport(self, inamepart="",
- names="pytest_runtest_logreport pytest_collectreport", when=None):
- """ return a testreport whose dotted import path matches """
- l = []
- for rep in self.getreports(names=names):
- try:
- if not when and rep.when != "call" and rep.passed:
- # setup/teardown passing reports - let's ignore those
- continue
- except AttributeError:
- pass
- if when and getattr(rep, 'when', None) != when:
- continue
- if not inamepart or inamepart in rep.nodeid.split("::"):
- l.append(rep)
- if not l:
- raise ValueError("could not find test report matching %r: "
- "no test reports at all!" % (inamepart,))
- if len(l) > 1:
- raise ValueError(
- "found 2 or more testreports matching %r: %s" %(inamepart, l))
- return l[0]
-
- def getfailures(self,
- names='pytest_runtest_logreport pytest_collectreport'):
- return [rep for rep in self.getreports(names) if rep.failed]
-
- def getfailedcollections(self):
- return self.getfailures('pytest_collectreport')
-
- def listoutcomes(self):
- passed = []
- skipped = []
- failed = []
- for rep in self.getreports(
- "pytest_collectreport pytest_runtest_logreport"):
- if rep.passed:
- if getattr(rep, "when", None) == "call":
- passed.append(rep)
- elif rep.skipped:
- skipped.append(rep)
- elif rep.failed:
- failed.append(rep)
- return passed, skipped, failed
-
- def countoutcomes(self):
- return [len(x) for x in self.listoutcomes()]
-
- def assertoutcome(self, passed=0, skipped=0, failed=0):
- realpassed, realskipped, realfailed = self.listoutcomes()
- assert passed == len(realpassed)
- assert skipped == len(realskipped)
- assert failed == len(realfailed)
-
- def clear(self):
- self.calls[:] = []
-
-
-@pytest.fixture
-def linecomp(request):
- return LineComp()
-
-
-def pytest_funcarg__LineMatcher(request):
- return LineMatcher
-
-
-@pytest.fixture
-def testdir(request, tmpdir_factory):
- return Testdir(request, tmpdir_factory)
-
-
-rex_outcome = re.compile("(\d+) ([\w-]+)")
-class RunResult:
- """The result of running a command.
-
- Attributes:
-
- :ret: The return value.
- :outlines: List of lines captured from stdout.
- :errlines: List of lines captures from stderr.
- :stdout: :py:class:`LineMatcher` of stdout, use ``stdout.str()`` to
- reconstruct stdout or the commonly used
- ``stdout.fnmatch_lines()`` method.
- :stderrr: :py:class:`LineMatcher` of stderr.
- :duration: Duration in seconds.
-
- """
- def __init__(self, ret, outlines, errlines, duration):
- self.ret = ret
- self.outlines = outlines
- self.errlines = errlines
- self.stdout = LineMatcher(outlines)
- self.stderr = LineMatcher(errlines)
- self.duration = duration
-
- def parseoutcomes(self):
- """ Return a dictionary of outcomestring->num from parsing
- the terminal output that the test process produced."""
- for line in reversed(self.outlines):
- if 'seconds' in line:
- outcomes = rex_outcome.findall(line)
- if outcomes:
- d = {}
- for num, cat in outcomes:
- d[cat] = int(num)
- return d
-
- def assert_outcomes(self, passed=0, skipped=0, failed=0):
- """ assert that the specified outcomes appear with the respective
- numbers (0 means it didn't occur) in the text output from a test run."""
- d = self.parseoutcomes()
- assert passed == d.get("passed", 0)
- assert skipped == d.get("skipped", 0)
- assert failed == d.get("failed", 0)
-
-
-
-class Testdir:
- """Temporary test directory with tools to test/run py.test itself.
-
- This is based on the ``tmpdir`` fixture but provides a number of
- methods which aid with testing py.test itself. Unless
- :py:meth:`chdir` is used all methods will use :py:attr:`tmpdir` as
- current working directory.
-
- Attributes:
-
- :tmpdir: The :py:class:`py.path.local` instance of the temporary
- directory.
-
- :plugins: A list of plugins to use with :py:meth:`parseconfig` and
- :py:meth:`runpytest`. Initially this is an empty list but
- plugins can be added to the list. The type of items to add to
- the list depend on the method which uses them so refer to them
- for details.
-
- """
-
- def __init__(self, request, tmpdir_factory):
- self.request = request
- # XXX remove duplication with tmpdir plugin
- basetmp = tmpdir_factory.ensuretemp("testdir")
- name = request.function.__name__
- for i in range(100):
- try:
- tmpdir = basetmp.mkdir(name + str(i))
- except py.error.EEXIST:
- continue
- break
- self.tmpdir = tmpdir
- self.plugins = []
- self._savesyspath = (list(sys.path), list(sys.meta_path))
- self._savemodulekeys = set(sys.modules)
- self.chdir() # always chdir
- self.request.addfinalizer(self.finalize)
- method = self.request.config.getoption("--runpytest")
- if method == "inprocess":
- self._runpytest_method = self.runpytest_inprocess
- elif method == "subprocess":
- self._runpytest_method = self.runpytest_subprocess
-
- def __repr__(self):
- return "<Testdir %r>" % (self.tmpdir,)
-
- def finalize(self):
- """Clean up global state artifacts.
-
- Some methods modify the global interpreter state and this
- tries to clean this up. It does not remove the temporary
- directory however so it can be looked at after the test run
- has finished.
-
- """
- sys.path[:], sys.meta_path[:] = self._savesyspath
- if hasattr(self, '_olddir'):
- self._olddir.chdir()
- self.delete_loaded_modules()
-
- def delete_loaded_modules(self):
- """Delete modules that have been loaded during a test.
-
- This allows the interpreter to catch module changes in case
- the module is re-imported.
- """
- for name in set(sys.modules).difference(self._savemodulekeys):
- # it seems zope.interfaces is keeping some state
- # (used by twisted related tests)
- if name != "zope.interface":
- del sys.modules[name]
-
- def make_hook_recorder(self, pluginmanager):
- """Create a new :py:class:`HookRecorder` for a PluginManager."""
- assert not hasattr(pluginmanager, "reprec")
- pluginmanager.reprec = reprec = HookRecorder(pluginmanager)
- self.request.addfinalizer(reprec.finish_recording)
- return reprec
-
- def chdir(self):
- """Cd into the temporary directory.
-
- This is done automatically upon instantiation.
-
- """
- old = self.tmpdir.chdir()
- if not hasattr(self, '_olddir'):
- self._olddir = old
-
- def _makefile(self, ext, args, kwargs):
- items = list(kwargs.items())
- if args:
- source = py.builtin._totext("\n").join(
- map(py.builtin._totext, args)) + py.builtin._totext("\n")
- basename = self.request.function.__name__
- items.insert(0, (basename, source))
- ret = None
- for name, value in items:
- p = self.tmpdir.join(name).new(ext=ext)
- source = Source(value)
- def my_totext(s, encoding="utf-8"):
- if py.builtin._isbytes(s):
- s = py.builtin._totext(s, encoding=encoding)
- return s
- source_unicode = "\n".join([my_totext(line) for line in source.lines])
- source = py.builtin._totext(source_unicode)
- content = source.strip().encode("utf-8") # + "\n"
- #content = content.rstrip() + "\n"
- p.write(content, "wb")
- if ret is None:
- ret = p
- return ret
-
- def makefile(self, ext, *args, **kwargs):
- """Create a new file in the testdir.
-
- ext: The extension the file should use, including the dot.
- E.g. ".py".
-
- args: All args will be treated as strings and joined using
- newlines. The result will be written as contents to the
- file. The name of the file will be based on the test
- function requesting this fixture.
- E.g. "testdir.makefile('.txt', 'line1', 'line2')"
-
- kwargs: Each keyword is the name of a file, while the value of
- it will be written as contents of the file.
- E.g. "testdir.makefile('.ini', pytest='[pytest]\naddopts=-rs\n')"
-
- """
- return self._makefile(ext, args, kwargs)
-
- def makeconftest(self, source):
- """Write a contest.py file with 'source' as contents."""
- return self.makepyfile(conftest=source)
-
- def makeini(self, source):
- """Write a tox.ini file with 'source' as contents."""
- return self.makefile('.ini', tox=source)
-
- def getinicfg(self, source):
- """Return the pytest section from the tox.ini config file."""
- p = self.makeini(source)
- return py.iniconfig.IniConfig(p)['pytest']
-
- def makepyfile(self, *args, **kwargs):
- """Shortcut for .makefile() with a .py extension."""
- return self._makefile('.py', args, kwargs)
-
- def maketxtfile(self, *args, **kwargs):
- """Shortcut for .makefile() with a .txt extension."""
- return self._makefile('.txt', args, kwargs)
-
- def syspathinsert(self, path=None):
- """Prepend a directory to sys.path, defaults to :py:attr:`tmpdir`.
-
- This is undone automatically after the test.
- """
- if path is None:
- path = self.tmpdir
- sys.path.insert(0, str(path))
- # a call to syspathinsert() usually means that the caller
- # wants to import some dynamically created files.
- # with python3 we thus invalidate import caches.
- self._possibly_invalidate_import_caches()
-
- def _possibly_invalidate_import_caches(self):
- # invalidate caches if we can (py33 and above)
- try:
- import importlib
- except ImportError:
- pass
- else:
- if hasattr(importlib, "invalidate_caches"):
- importlib.invalidate_caches()
-
- def mkdir(self, name):
- """Create a new (sub)directory."""
- return self.tmpdir.mkdir(name)
-
- def mkpydir(self, name):
- """Create a new python package.
-
- This creates a (sub)direcotry with an empty ``__init__.py``
- file so that is recognised as a python package.
-
- """
- p = self.mkdir(name)
- p.ensure("__init__.py")
- return p
-
- Session = Session
- def getnode(self, config, arg):
- """Return the collection node of a file.
-
- :param config: :py:class:`_pytest.config.Config` instance, see
- :py:meth:`parseconfig` and :py:meth:`parseconfigure` to
- create the configuration.
-
- :param arg: A :py:class:`py.path.local` instance of the file.
-
- """
- session = Session(config)
- assert '::' not in str(arg)
- p = py.path.local(arg)
- config.hook.pytest_sessionstart(session=session)
- res = session.perform_collect([str(p)], genitems=False)[0]
- config.hook.pytest_sessionfinish(session=session, exitstatus=EXIT_OK)
- return res
-
- def getpathnode(self, path):
- """Return the collection node of a file.
-
- This is like :py:meth:`getnode` but uses
- :py:meth:`parseconfigure` to create the (configured) py.test
- Config instance.
-
- :param path: A :py:class:`py.path.local` instance of the file.
-
- """
- config = self.parseconfigure(path)
- session = Session(config)
- x = session.fspath.bestrelpath(path)
- config.hook.pytest_sessionstart(session=session)
- res = session.perform_collect([x], genitems=False)[0]
- config.hook.pytest_sessionfinish(session=session, exitstatus=EXIT_OK)
- return res
-
- def genitems(self, colitems):
- """Generate all test items from a collection node.
-
- This recurses into the collection node and returns a list of
- all the test items contained within.
-
- """
- session = colitems[0].session
- result = []
- for colitem in colitems:
- result.extend(session.genitems(colitem))
- return result
-
- def runitem(self, source):
- """Run the "test_func" Item.
-
- The calling test instance (the class which contains the test
- method) must provide a ``.getrunner()`` method which should
- return a runner which can run the test protocol for a single
- item, like e.g. :py:func:`_pytest.runner.runtestprotocol`.
-
- """
- # used from runner functional tests
- item = self.getitem(source)
- # the test class where we are called from wants to provide the runner
- testclassinstance = self.request.instance
- runner = testclassinstance.getrunner()
- return runner(item)
-
- def inline_runsource(self, source, *cmdlineargs):
- """Run a test module in process using ``pytest.main()``.
-
- This run writes "source" into a temporary file and runs
- ``pytest.main()`` on it, returning a :py:class:`HookRecorder`
- instance for the result.
-
- :param source: The source code of the test module.
-
- :param cmdlineargs: Any extra command line arguments to use.
-
- :return: :py:class:`HookRecorder` instance of the result.
-
- """
- p = self.makepyfile(source)
- l = list(cmdlineargs) + [p]
- return self.inline_run(*l)
-
- def inline_genitems(self, *args):
- """Run ``pytest.main(['--collectonly'])`` in-process.
-
- Retuns a tuple of the collected items and a
- :py:class:`HookRecorder` instance.
-
- This runs the :py:func:`pytest.main` function to run all of
- py.test inside the test process itself like
- :py:meth:`inline_run`. However the return value is a tuple of
- the collection items and a :py:class:`HookRecorder` instance.
-
- """
- rec = self.inline_run("--collect-only", *args)
- items = [x.item for x in rec.getcalls("pytest_itemcollected")]
- return items, rec
-
- def inline_run(self, *args, **kwargs):
- """Run ``pytest.main()`` in-process, returning a HookRecorder.
-
- This runs the :py:func:`pytest.main` function to run all of
- py.test inside the test process itself. This means it can
- return a :py:class:`HookRecorder` instance which gives more
- detailed results from then run then can be done by matching
- stdout/stderr from :py:meth:`runpytest`.
-
- :param args: Any command line arguments to pass to
- :py:func:`pytest.main`.
-
- :param plugin: (keyword-only) Extra plugin instances the
- ``pytest.main()`` instance should use.
-
- :return: A :py:class:`HookRecorder` instance.
-
- """
- rec = []
- class Collect:
- def pytest_configure(x, config):
- rec.append(self.make_hook_recorder(config.pluginmanager))
-
- plugins = kwargs.get("plugins") or []
- plugins.append(Collect())
- ret = pytest.main(list(args), plugins=plugins)
- self.delete_loaded_modules()
- if len(rec) == 1:
- reprec = rec.pop()
- else:
- class reprec:
- pass
- reprec.ret = ret
-
- # typically we reraise keyboard interrupts from the child run
- # because it's our user requesting interruption of the testing
- if ret == 2 and not kwargs.get("no_reraise_ctrlc"):
- calls = reprec.getcalls("pytest_keyboard_interrupt")
- if calls and calls[-1].excinfo.type == KeyboardInterrupt:
- raise KeyboardInterrupt()
- return reprec
-
- def runpytest_inprocess(self, *args, **kwargs):
- """ Return result of running pytest in-process, providing a similar
- interface to what self.runpytest() provides. """
- if kwargs.get("syspathinsert"):
- self.syspathinsert()
- now = time.time()
- capture = py.io.StdCapture()
- try:
- try:
- reprec = self.inline_run(*args, **kwargs)
- except SystemExit as e:
- class reprec:
- ret = e.args[0]
- except Exception:
- traceback.print_exc()
- class reprec:
- ret = 3
- finally:
- out, err = capture.reset()
- sys.stdout.write(out)
- sys.stderr.write(err)
-
- res = RunResult(reprec.ret,
- out.split("\n"), err.split("\n"),
- time.time()-now)
- res.reprec = reprec
- return res
-
- def runpytest(self, *args, **kwargs):
- """ Run pytest inline or in a subprocess, depending on the command line
- option "--runpytest" and return a :py:class:`RunResult`.
-
- """
- args = self._ensure_basetemp(args)
- return self._runpytest_method(*args, **kwargs)
-
- def _ensure_basetemp(self, args):
- args = [str(x) for x in args]
- for x in args:
- if str(x).startswith('--basetemp'):
- #print ("basedtemp exists: %s" %(args,))
- break
- else:
- args.append("--basetemp=%s" % self.tmpdir.dirpath('basetemp'))
- #print ("added basetemp: %s" %(args,))
- return args
-
- def parseconfig(self, *args):
- """Return a new py.test Config instance from given commandline args.
-
- This invokes the py.test bootstrapping code in _pytest.config
- to create a new :py:class:`_pytest.core.PluginManager` and
- call the pytest_cmdline_parse hook to create new
- :py:class:`_pytest.config.Config` instance.
-
- If :py:attr:`plugins` has been populated they should be plugin
- modules which will be registered with the PluginManager.
-
- """
- args = self._ensure_basetemp(args)
-
- import _pytest.config
- config = _pytest.config._prepareconfig(args, self.plugins)
- # we don't know what the test will do with this half-setup config
- # object and thus we make sure it gets unconfigured properly in any
- # case (otherwise capturing could still be active, for example)
- self.request.addfinalizer(config._ensure_unconfigure)
- return config
-
- def parseconfigure(self, *args):
- """Return a new py.test configured Config instance.
-
- This returns a new :py:class:`_pytest.config.Config` instance
- like :py:meth:`parseconfig`, but also calls the
- pytest_configure hook.
-
- """
- config = self.parseconfig(*args)
- config._do_configure()
- self.request.addfinalizer(config._ensure_unconfigure)
- return config
-
- def getitem(self, source, funcname="test_func"):
- """Return the test item for a test function.
-
- This writes the source to a python file and runs py.test's
- collection on the resulting module, returning the test item
- for the requested function name.
-
- :param source: The module source.
-
- :param funcname: The name of the test function for which the
- Item must be returned.
-
- """
- items = self.getitems(source)
- for item in items:
- if item.name == funcname:
- return item
- assert 0, "%r item not found in module:\n%s\nitems: %s" %(
- funcname, source, items)
-
- def getitems(self, source):
- """Return all test items collected from the module.
-
- This writes the source to a python file and runs py.test's
- collection on the resulting module, returning all test items
- contained within.
-
- """
- modcol = self.getmodulecol(source)
- return self.genitems([modcol])
-
- def getmodulecol(self, source, configargs=(), withinit=False):
- """Return the module collection node for ``source``.
-
- This writes ``source`` to a file using :py:meth:`makepyfile`
- and then runs the py.test collection on it, returning the
- collection node for the test module.
-
- :param source: The source code of the module to collect.
-
- :param configargs: Any extra arguments to pass to
- :py:meth:`parseconfigure`.
-
- :param withinit: Whether to also write a ``__init__.py`` file
- to the temporarly directory to ensure it is a package.
-
- """
- kw = {self.request.function.__name__: Source(source).strip()}
- path = self.makepyfile(**kw)
- if withinit:
- self.makepyfile(__init__ = "#")
- self.config = config = self.parseconfigure(path, *configargs)
- node = self.getnode(config, path)
- return node
-
- def collect_by_name(self, modcol, name):
- """Return the collection node for name from the module collection.
-
- This will search a module collection node for a collection
- node matching the given name.
-
- :param modcol: A module collection node, see
- :py:meth:`getmodulecol`.
-
- :param name: The name of the node to return.
-
- """
- for colitem in modcol._memocollect():
- if colitem.name == name:
- return colitem
-
- def popen(self, cmdargs, stdout, stderr, **kw):
- """Invoke subprocess.Popen.
-
- This calls subprocess.Popen making sure the current working
- directory is the PYTHONPATH.
-
- You probably want to use :py:meth:`run` instead.
-
- """
- env = os.environ.copy()
- env['PYTHONPATH'] = os.pathsep.join(filter(None, [
- str(os.getcwd()), env.get('PYTHONPATH', '')]))
- kw['env'] = env
- return subprocess.Popen(cmdargs,
- stdout=stdout, stderr=stderr, **kw)
-
- def run(self, *cmdargs):
- """Run a command with arguments.
-
- Run a process using subprocess.Popen saving the stdout and
- stderr.
-
- Returns a :py:class:`RunResult`.
-
- """
- return self._run(*cmdargs)
-
- def _run(self, *cmdargs):
- cmdargs = [str(x) for x in cmdargs]
- p1 = self.tmpdir.join("stdout")
- p2 = self.tmpdir.join("stderr")
- print_("running:", ' '.join(cmdargs))
- print_(" in:", str(py.path.local()))
- f1 = codecs.open(str(p1), "w", encoding="utf8")
- f2 = codecs.open(str(p2), "w", encoding="utf8")
- try:
- now = time.time()
- popen = self.popen(cmdargs, stdout=f1, stderr=f2,
- close_fds=(sys.platform != "win32"))
- ret = popen.wait()
- finally:
- f1.close()
- f2.close()
- f1 = codecs.open(str(p1), "r", encoding="utf8")
- f2 = codecs.open(str(p2), "r", encoding="utf8")
- try:
- out = f1.read().splitlines()
- err = f2.read().splitlines()
- finally:
- f1.close()
- f2.close()
- self._dump_lines(out, sys.stdout)
- self._dump_lines(err, sys.stderr)
- return RunResult(ret, out, err, time.time()-now)
-
- def _dump_lines(self, lines, fp):
- try:
- for line in lines:
- py.builtin.print_(line, file=fp)
- except UnicodeEncodeError:
- print("couldn't print to %s because of encoding" % (fp,))
-
- def _getpytestargs(self):
- # we cannot use "(sys.executable,script)"
- # because on windows the script is e.g. a py.test.exe
- return (sys.executable, _pytest_fullpath,) # noqa
-
- def runpython(self, script):
- """Run a python script using sys.executable as interpreter.
-
- Returns a :py:class:`RunResult`.
- """
- return self.run(sys.executable, script)
-
- def runpython_c(self, command):
- """Run python -c "command", return a :py:class:`RunResult`."""
- return self.run(sys.executable, "-c", command)
-
- def runpytest_subprocess(self, *args, **kwargs):
- """Run py.test as a subprocess with given arguments.
-
- Any plugins added to the :py:attr:`plugins` list will added
- using the ``-p`` command line option. Addtionally
- ``--basetemp`` is used put any temporary files and directories
- in a numbered directory prefixed with "runpytest-" so they do
- not conflict with the normal numberd pytest location for
- temporary files and directories.
-
- Returns a :py:class:`RunResult`.
-
- """
- p = py.path.local.make_numbered_dir(prefix="runpytest-",
- keep=None, rootdir=self.tmpdir)
- args = ('--basetemp=%s' % p, ) + args
- #for x in args:
- # if '--confcutdir' in str(x):
- # break
- #else:
- # pass
- # args = ('--confcutdir=.',) + args
- plugins = [x for x in self.plugins if isinstance(x, str)]
- if plugins:
- args = ('-p', plugins[0]) + args
- args = self._getpytestargs() + args
- return self.run(*args)
-
- def spawn_pytest(self, string, expect_timeout=10.0):
- """Run py.test using pexpect.
-
- This makes sure to use the right py.test and sets up the
- temporary directory locations.
-
- The pexpect child is returned.
-
- """
- basetemp = self.tmpdir.mkdir("pexpect")
- invoke = " ".join(map(str, self._getpytestargs()))
- cmd = "%s --basetemp=%s %s" % (invoke, basetemp, string)
- return self.spawn(cmd, expect_timeout=expect_timeout)
-
- def spawn(self, cmd, expect_timeout=10.0):
- """Run a command using pexpect.
-
- The pexpect child is returned.
- """
- pexpect = pytest.importorskip("pexpect", "3.0")
- if hasattr(sys, 'pypy_version_info') and '64' in platform.machine():
- pytest.skip("pypy-64 bit not supported")
- if sys.platform == "darwin":
- pytest.xfail("pexpect does not work reliably on darwin?!")
- if sys.platform.startswith("freebsd"):
- pytest.xfail("pexpect does not work reliably on freebsd")
- logfile = self.tmpdir.join("spawn.out").open("wb")
- child = pexpect.spawn(cmd, logfile=logfile)
- self.request.addfinalizer(logfile.close)
- child.timeout = expect_timeout
- return child
-
-def getdecoded(out):
- try:
- return out.decode("utf-8")
- except UnicodeDecodeError:
- return "INTERNAL not-utf8-decodeable, truncated string:\n%s" % (
- py.io.saferepr(out),)
-
-
-class LineComp:
- def __init__(self):
- self.stringio = py.io.TextIO()
-
- def assert_contains_lines(self, lines2):
- """ assert that lines2 are contained (linearly) in lines1.
- return a list of extralines found.
- """
- __tracebackhide__ = True
- val = self.stringio.getvalue()
- self.stringio.truncate(0)
- self.stringio.seek(0)
- lines1 = val.split("\n")
- return LineMatcher(lines1).fnmatch_lines(lines2)
-
-
-class LineMatcher:
- """Flexible matching of text.
-
- This is a convenience class to test large texts like the output of
- commands.
-
- The constructor takes a list of lines without their trailing
- newlines, i.e. ``text.splitlines()``.
-
- """
-
- def __init__(self, lines):
- self.lines = lines
-
- def str(self):
- """Return the entire original text."""
- return "\n".join(self.lines)
-
- def _getlines(self, lines2):
- if isinstance(lines2, str):
- lines2 = Source(lines2)
- if isinstance(lines2, Source):
- lines2 = lines2.strip().lines
- return lines2
-
- def fnmatch_lines_random(self, lines2):
- """Check lines exist in the output.
-
- The argument is a list of lines which have to occur in the
- output, in any order. Each line can contain glob whildcards.
-
- """
- lines2 = self._getlines(lines2)
- for line in lines2:
- for x in self.lines:
- if line == x or fnmatch(x, line):
- print_("matched: ", repr(line))
- break
- else:
- raise ValueError("line %r not found in output" % line)
-
- def get_lines_after(self, fnline):
- """Return all lines following the given line in the text.
-
- The given line can contain glob wildcards.
- """
- for i, line in enumerate(self.lines):
- if fnline == line or fnmatch(line, fnline):
- return self.lines[i+1:]
- raise ValueError("line %r not found in output" % fnline)
-
- def fnmatch_lines(self, lines2):
- """Search the text for matching lines.
-
- The argument is a list of lines which have to match and can
- use glob wildcards. If they do not match an pytest.fail() is
- called. The matches and non-matches are also printed on
- stdout.
-
- """
- def show(arg1, arg2):
- py.builtin.print_(arg1, arg2, file=sys.stderr)
- lines2 = self._getlines(lines2)
- lines1 = self.lines[:]
- nextline = None
- extralines = []
- __tracebackhide__ = True
- for line in lines2:
- nomatchprinted = False
- while lines1:
- nextline = lines1.pop(0)
- if line == nextline:
- show("exact match:", repr(line))
- break
- elif fnmatch(nextline, line):
- show("fnmatch:", repr(line))
- show(" with:", repr(nextline))
- break
- else:
- if not nomatchprinted:
- show("nomatch:", repr(line))
- nomatchprinted = True
- show(" and:", repr(nextline))
- extralines.append(nextline)
- else:
- pytest.fail("remains unmatched: %r, see stderr" % (line,))