qemu/python/scripts/mkvenv.py
Paolo Bonzini fc00123f3a python: mkvenv: remove ensure command
This was used to bootstrap the venv with a TOML parser, after which
ensuregroup is used.  Now that we expect it to be present as a system
package (either tomli or, for Python 3.11, tomllib), it is not needed
anymore.

Note that this means that, when implemented, the hypothetical "isolated"
mode that does not use any system packages will only work with Python
3.11+.

Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
2024-06-08 10:33:39 +02:00

884 lines
27 KiB
Python

"""
mkvenv - QEMU pyvenv bootstrapping utility
usage: mkvenv [-h] command ...
QEMU pyvenv bootstrapping utility
options:
-h, --help show this help message and exit
Commands:
command Description
create create a venv
post_init
post-venv initialization
ensuregroup
Ensure that the specified package group is installed.
--------------------------------------------------
usage: mkvenv create [-h] target
positional arguments:
target Target directory to install virtual environment into.
options:
-h, --help show this help message and exit
--------------------------------------------------
usage: mkvenv post_init [-h]
options:
-h, --help show this help message and exit
--------------------------------------------------
usage: mkvenv ensuregroup [-h] [--online] [--dir DIR] file group...
positional arguments:
file pointer to a TOML file
group section name in the TOML file
options:
-h, --help show this help message and exit
--online Install packages from PyPI, if necessary.
--dir DIR Path to vendored packages where we may install from.
"""
# Copyright (C) 2022-2023 Red Hat, Inc.
#
# Authors:
# John Snow <jsnow@redhat.com>
# Paolo Bonzini <pbonzini@redhat.com>
#
# This work is licensed under the terms of the GNU GPL, version 2 or
# later. See the COPYING file in the top-level directory.
import argparse
from importlib.metadata import (
Distribution,
EntryPoint,
PackageNotFoundError,
distribution,
version,
)
from importlib.util import find_spec
import logging
import os
from pathlib import Path
import re
import shutil
import site
import subprocess
import sys
import sysconfig
from types import SimpleNamespace
from typing import (
Any,
Dict,
Iterator,
Optional,
Sequence,
Tuple,
Union,
)
import venv
# Optional dependency: distlib.  Prefer a standalone installation and
# fall back to the copy that pip vendors.  Failure is only recorded in
# HAVE_DISTLIB here and reported just-in-time by the code that needs
# distlib, so that mkvenv can still run outside the venv or before
# check_ensurepip() has had a chance to request ensurepip.
HAVE_DISTLIB = True
try:
    import distlib.scripts
    import distlib.version
except ImportError:
    try:
        # Reach into pip's cookie jar.  pylint and flake8 don't understand
        # that these imports will be used via distlib.xxx.
        from pip._vendor import distlib
        import pip._vendor.distlib.scripts  # noqa, pylint: disable=unused-import
        import pip._vendor.distlib.version  # noqa, pylint: disable=unused-import
    except ImportError:
        HAVE_DISTLIB = False

# Optional dependency: a TOML parser.  Prefer the stdlib tomllib and fall
# back to tomli under the same name.  As with distlib, failure is only
# recorded in HAVE_TOMLLIB and reported just-in-time by the code that
# needs the parser.
HAVE_TOMLLIB = True
try:
    import tomllib
except ImportError:
    try:
        import tomli as tomllib
    except ImportError:
        HAVE_TOMLLIB = False

# Do not add any mandatory dependencies from outside the stdlib:
# This script *must* be usable standalone!

# Anything accepted as a directory argument by the venv builder methods.
DirType = Union[str, bytes, "os.PathLike[str]", "os.PathLike[bytes]"]

# Logger shared by everything in this script.
logger = logging.getLogger("mkvenv")
def inside_a_venv() -> bool:
    """
    Tell whether the running interpreter belongs to a virtual environment.

    :return: True when sys.prefix and sys.base_prefix differ.
    """
    return sys.base_prefix != sys.prefix
class Ouch(RuntimeError):
    """
    mkvenv's own error type, distinct from any builtin exception.

    main() catches it, prints its message to stderr and returns 1.
    """
class QemuEnvBuilder(venv.EnvBuilder):
    """
    A venv.EnvBuilder tailored to building QEMU's configure-time venv.

    Compared to the stock builder:

    - When invoked from inside an existing virtual environment, a
      "nested" virtual environment is emulated by making the parent's
      packages visible in the new one.
    - "ensurepip" is avoided when possible; instead pip's
      console_scripts are recreated inside the new environment.

    Parameters for base class init:
      - system_site_packages: bool = False
      - clear: bool = False
      - symlinks: bool = False
      - upgrade: bool = False
      - with_pip: bool = False
      - prompt: Optional[str] = None
      - upgrade_deps: bool = False (Since 3.9)
    """

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        logger.debug("QemuEnvBuilder.__init__(...)")

        # Nested venv emulation is off unless we are inside a venv *and*
        # the caller asked for system_site_packages.
        self.use_parent_packages = False
        if inside_a_venv():
            # The caller's request is reinterpreted as "include the
            # packages of the venv we are running in".
            self.use_parent_packages = kwargs.pop(
                "system_site_packages", False
            )
            # The real system site-packages are passed on to the new
            # venv only if the current venv includes them as well.
            kwargs["system_site_packages"] = sys.base_prefix in site.PREFIXES

        # ensurepip is slow, while venv creation can be very fast when
        # system_site_packages is allowed.  When pip is wanted and can be
        # inherited, skip ensurepip; our own script generation runs once
        # the virtual environment is set up (see post_post_setup).
        self.want_pip = kwargs.get("with_pip", False)
        if self.want_pip:
            pip_is_inherited = (
                kwargs.get("system_site_packages", False)
                and not need_ensurepip()
            )
            if pip_is_inherited:
                kwargs["with_pip"] = False
            else:
                # ensurepip will be run by the base class; fail early
                # with a helpful message if it cannot work.
                check_ensurepip()

        super().__init__(*args, **kwargs)

        # Filled in by ensure_directories() so that the context stays
        # available after creation.
        self._context: Optional[SimpleNamespace] = None

    def get_parent_libpath(self) -> Optional[str]:
        """
        Return the "purelib" path of the running interpreter when parent
        packages are to be included, None otherwise.
        """
        if not self.use_parent_packages:
            return None
        return sysconfig.get_path("purelib")

    @staticmethod
    def compute_venv_libpath(context: SimpleNamespace) -> str:
        """
        Return the site-packages directory of the venv described by
        @context; a replacement for context.lib_path on Python < 3.12.
        """
        # Python 3.12+: the context carries the answer.  Not strictly
        # necessary, as it is documented to match the 3.10 code below.
        if sys.version_info >= (3, 12):
            return context.lib_path

        # Python 3.10+: ask sysconfig about the "venv" scheme.
        if "venv" in sysconfig.get_scheme_names():
            venv_purelib = sysconfig.get_path(
                "purelib", scheme="venv", vars={"base": context.env_dir}
            )
            assert venv_purelib is not None
            return venv_purelib

        # Python <= 3.9: the layout has to be hardcoded.  Fortunately it
        # was the same in Python 3.6-3.10, so one case per platform
        # suffices.
        if sys.platform == "win32":
            return os.path.join(context.env_dir, "Lib", "site-packages")
        versioned_dir = "python%d.%d" % sys.version_info[:2]
        return os.path.join(
            context.env_dir, "lib", versioned_dir, "site-packages"
        )

    def ensure_directories(self, env_dir: DirType) -> SimpleNamespace:
        """Delegate to the base class and remember the returned context."""
        logger.debug("ensure_directories(env_dir=%s)", env_dir)
        self._context = super().ensure_directories(env_dir)
        return self._context

    def create(self, env_dir: DirType) -> None:
        """Create the venv, then run post_post_setup() on its context."""
        logger.debug("create(env_dir=%s)", env_dir)
        super().create(env_dir)
        assert self._context is not None
        self.post_post_setup(self._context)

    def post_post_setup(self, context: SimpleNamespace) -> None:
        """
        The final, final hook.  Enter the venv and run commands inside of it.
        """
        if self.use_parent_packages:
            # We are inside of a venv and the parent venv's packages are
            # wanted: list the parent's libpath in a .pth file placed in
            # the new venv's site-packages directory.
            parent_dir = self.get_parent_libpath()
            assert parent_dir is not None
            logger.debug("parent_libpath: %s", parent_dir)

            venv_dir = self.compute_venv_libpath(context)
            logger.debug("our_libpath: %s", venv_dir)

            nested_pth = os.path.join(venv_dir, "nested.pth")
            with open(nested_pth, "w", encoding="UTF-8") as file:
                file.write(parent_dir + os.linesep)

        if self.want_pip:
            # Re-run this very script with the new venv's interpreter.
            command = [context.env_exe, __file__, "post_init"]
            subprocess.run(command, check=True)

    def get_value(self, field: str) -> str:
        """
        Get a string value from the context namespace after a call to build.

        For valid field names, see:
        https://docs.python.org/3/library/venv.html#venv.EnvBuilder.ensure_directories
        """
        value = getattr(self._context, field)
        assert isinstance(value, str)
        return value
def need_ensurepip() -> bool:
    """
    Tests for the presence of setuptools and pip.

    :return: `True` if we do not detect both packages.
    """
    # Only look the modules up; actually importing them is fraught with
    # danger: https://github.com/pypa/setuptools/issues/2993
    both_found = find_spec("setuptools") and find_spec("pip")
    return not both_found
def check_ensurepip() -> None:
    """
    Verify that ensurepip, and the pyexpat module it uses, can be found.

    :raise Ouch: With a helpful hint when either module is missing.
    """
    if find_spec("ensurepip") is None:
        raise Ouch(
            "Python's ensurepip module is not found.\n"
            "It's normally part of the Python standard library, "
            "maybe your distribution packages it separately?\n"
            "Either install ensurepip, or alleviate the need for it in the "
            "first place by installing pip and setuptools for "
            f"'{sys.executable}'.\n"
            "(Hint: Debian puts ensurepip in its python3-venv package.)"
        )

    # ensurepip uses pyexpat, which can also go missing on us:
    if find_spec("pyexpat") is None:
        raise Ouch(
            "Python's pyexpat module is not found.\n"
            "It's normally part of the Python standard library, "
            "maybe your distribution packages it separately?\n"
            "Either install pyexpat, or alleviate the need for it in the "
            "first place by installing pip and setuptools for "
            f"'{sys.executable}'.\n\n"
            "(Hint: NetBSD's pkgsrc debundles this to e.g. 'py310-expat'.)"
        )
def make_venv(  # pylint: disable=too-many-arguments
    env_dir: Union[str, Path],
    system_site_packages: bool = False,
    clear: bool = True,
    symlinks: Optional[bool] = None,
    with_pip: bool = True,
) -> None:
    """
    Create a venv using `QemuEnvBuilder`.

    This is analogous to the `venv.create` module-level convenience
    function that is part of the Python stdlib, except it uses
    `QemuEnvBuilder` instead.  A status line is printed to stderr before
    creation and the venv's python executable is printed to stdout
    afterwards.

    :param env_dir: The directory to create/install to.
    :param system_site_packages:
        Allow inheriting packages from the system installation.
    :param clear: When True, fully remove any prior venv and files.
    :param symlinks:
        Whether to use symlinks to the target interpreter or not. If
        left unspecified, it will use symlinks except on Windows to
        match behavior with the "venv" CLI tool.
    :param with_pip:
        Whether to install "pip" binaries or not.
    :raise Ouch: When a subprocess run during venv creation fails.
    """
    logger.debug(
        "%s: make_venv(env_dir=%s, system_site_packages=%s, "
        "clear=%s, symlinks=%s, with_pip=%s)",
        __file__,
        str(env_dir),
        system_site_packages,
        clear,
        symlinks,
        with_pip,
    )

    if symlinks is None:
        # Default behavior of standard venv CLI
        symlinks = os.name != "nt"

    builder = QemuEnvBuilder(
        system_site_packages=system_site_packages,
        clear=clear,
        symlinks=symlinks,
        with_pip=with_pip,
    )

    style = "non-isolated" if builder.system_site_packages else "isolated"
    nested = (
        f"(with packages from '{builder.get_parent_libpath()}') "
        if builder.use_parent_packages
        else ""
    )
    print(
        f"mkvenv: Creating {style} virtual environment"
        f" {nested}at '{str(env_dir)}'",
        file=sys.stderr,
    )

    try:
        logger.debug("Invoking builder.create()")
        try:
            builder.create(str(env_dir))
        except SystemExit as exc:
            # Some versions of the venv module raise SystemExit; *nasty*!
            # We want the exception that prompted it. It might be a subprocess
            # error that has output we *really* want to see.
            logger.debug("Intercepted SystemExit from EnvBuilder.create()")
            raise exc.__cause__ or exc.__context__ or exc
        logger.debug("builder.create() finished")
    except subprocess.CalledProcessError as exc:
        logger.error("mkvenv subprocess failed:")
        logger.error("cmd: %s", exc.cmd)
        logger.error("returncode: %d", exc.returncode)

        # Dump whatever output the failed subprocess left behind; the
        # captured streams may be either str or bytes.
        captured = (
            ("========== stdout ==========", exc.stdout),
            ("========== stderr ==========", exc.stderr),
        )
        report = []
        for banner, stream in captured:
            if not stream:
                continue
            report.append(banner)
            report.append(
                stream.decode() if isinstance(stream, bytes) else stream
            )
            report.append("============================")
        if report:
            logger.error(os.linesep.join(report))

        raise Ouch("VENV creation subprocess failed.") from exc

    # print the python executable to stdout for configure.
    print(builder.get_value("env_exe"))
def _get_entry_points(packages: Sequence[str]) -> Iterator[str]:
def _generator() -> Iterator[str]:
for package in packages:
try:
entry_points: Iterator[EntryPoint] = \
iter(distribution(package).entry_points)
except PackageNotFoundError:
continue
# The EntryPoints type is only available in 3.10+,
# treat this as a vanilla list and filter it ourselves.
entry_points = filter(
lambda ep: ep.group == "console_scripts", entry_points
)
for entry_point in entry_points:
yield f"{entry_point.name} = {entry_point.value}"
return _generator()
def generate_console_scripts(
    packages: Sequence[str],
    python_path: Optional[str] = None,
    bin_path: Optional[str] = None,
) -> None:
    """
    Generate script shims for console_script entry points in @packages.

    Existing scripts are left alone (clobber is disabled) and only the
    unsuffixed variant of each script is produced.

    :param packages: Names of the installed packages to generate for.
    :param python_path:
        Defaults to the running interpreter.  Currently only used in
        the debug log message.
    :param bin_path:
        Directory that receives the scripts; defaults to sysconfig's
        "scripts" path.
    """
    python_path = sys.executable if python_path is None else python_path
    if bin_path is None:
        bin_path = sysconfig.get_path("scripts")
        assert bin_path is not None

    logger.debug(
        "generate_console_scripts(packages=%s, python_path=%s, bin_path=%s)",
        packages,
        python_path,
        bin_path,
    )

    if not packages:
        return

    script_maker = distlib.scripts.ScriptMaker(None, bin_path)
    script_maker.variants = {""}
    script_maker.clobber = False

    for spec in _get_entry_points(packages):
        for written in script_maker.make(spec):
            logger.debug("wrote console_script '%s'", written)
def pkgname_from_depspec(dep_spec: str) -> str:
    """
    Extract the leading package name from a PEP-508 depspec.

    See https://peps.python.org/pep-0508/#names

    :param dep_spec: A dependency specification, e.g. 'meson>=0.61.5'.
    :return: The package name, e.g. 'meson'.
    :raise ValueError: If @dep_spec does not start with a valid name.
    """
    found = re.match(
        r"^([A-Z0-9]([A-Z0-9._-]*[A-Z0-9])?)", dep_spec, re.IGNORECASE
    )
    if found is None:
        raise ValueError(
            f"dep_spec '{dep_spec}'"
            " does not appear to contain a valid package name"
        )
    return found.group(0)
def _path_is_prefix(prefix: Optional[str], path: str) -> bool:
try:
return (
prefix is not None and os.path.commonpath([prefix, path]) == prefix
)
except ValueError:
return False
def _is_system_package(dist: Distribution) -> bool:
    """
    Tell whether @dist lives outside the running interpreter's own
    "purelib" and "platlib" directories.
    """
    location = str(dist.locate_file("."))
    in_purelib = _path_is_prefix(sysconfig.get_path("purelib"), location)
    return not in_purelib and not _path_is_prefix(
        sysconfig.get_path("platlib"), location
    )
def diagnose(
    dep_spec: str,
    online: bool,
    wheels_dir: Optional[Union[str, Path]],
    prog: Optional[str],
) -> Tuple[str, bool]:
    """
    Offer a summary to the user as to why a package failed to be installed.

    :param dep_spec: The package we tried to ensure, e.g. 'meson>=0.61.5'
    :param online: Did we allow PyPI access?
    :param wheels_dir:
        Optionally, a directory that was searched for vendored packages.
    :param prog:
        Optionally, a shell program name that can be used as a
        bellwether to detect if this program is installed elsewhere on
        the system. This is used to offer advice when a program is
        detected for a different python version.
    :return:
        A tuple of the multi-line message and a flag telling whether
        the problem is serious.
    """
    # pylint: disable=too-many-branches

    pkg_name = pkgname_from_depspec(dep_spec)
    try:
        pkg_version: Optional[str] = version(pkg_name)
    except PackageNotFoundError:
        pkg_version = None

    # Some errors are not particularly serious
    bad = False
    lines = []

    if pkg_version:
        lines.append(
            f"Python package '{pkg_name}' version '{pkg_version}' was found,"
            " but isn't suitable."
        )
    else:
        lines.append(
            f"Python package '{pkg_name}' was not found nor installed."
        )

    if wheels_dir:
        lines.append(
            "No suitable version found in, or failed to install from"
            f" '{wheels_dir}'."
        )
        bad = True

    if online:
        lines.append("A suitable version could not be obtained from PyPI.")
        bad = True
    else:
        lines.append(
            "mkvenv was configured to operate offline and did not check PyPI."
        )

    # The package is entirely missing: see whether its program exists
    # somewhere on the system anyway, and explain why that did not help.
    which = shutil.which(prog) if prog and not pkg_version else None
    if which:
        if sys.base_prefix in site.PREFIXES:
            pypath = Path(sys.executable).resolve()
            lines.append(
                f"'{prog}' was detected on your system at '{which}', "
                f"but the Python package '{pkg_name}' was not found by "
                f"this Python interpreter ('{pypath}'). "
                f"Typically this means that '{prog}' has been installed "
                "against a different Python interpreter on your system."
            )
        else:
            lines.append(
                f"'{prog}' was detected on your system at '{which}', "
                "but the build is using an isolated virtual environment."
            )
            bad = True

    lines = [f"{line}" for line in lines]
    if bad:
        header = f"Could not provide build dependency '{dep_spec}':"
    else:
        header = f"'{dep_spec}' not found:"
    lines.insert(0, header)
    return os.linesep.join(lines), bad
def pip_install(
    args: Sequence[str],
    online: bool = False,
    wheels_dir: Optional[Union[str, Path]] = None,
) -> None:
    """
    Run "pip install" for the package or packages specified in @args.

    pip is invoked through the running interpreter.  It is verbose when
    any of the DEBUG, GITLAB_CI or V environment variables is set to a
    non-empty value, and quiet otherwise.

    :param args: Arguments appended to the pip command line.
    :param online: When False, pip is told not to use any index.
    :param wheels_dir: If specified, passed to pip as a --find-links URL.
    :raise subprocess.CalledProcessError: If pip exits unsuccessfully.
    """
    loud = any(
        os.environ.get(variable) for variable in ("DEBUG", "GITLAB_CI", "V")
    )

    command = [
        sys.executable,
        "-m",
        "pip",
        "install",
        "--disable-pip-version-check",
        "-v" if loud else "-q",
    ]
    if not online:
        command.append("--no-index")
    if wheels_dir:
        command.extend(["--find-links", f"file://{str(wheels_dir)}"])
    command.extend(args)

    subprocess.run(command, check=True)
def _make_version_constraint(info: Dict[str, str], install: bool) -> str:
"""
Construct the version constraint part of a PEP 508 dependency
specification (for example '>=0.61.5') from the accepted and
installed keys of the provided dictionary.
:param info: A dictionary corresponding to a TOML key-value list.
:param install: True generates install constraints, False generates
presence constraints
"""
if install and "installed" in info:
return "==" + info["installed"]
dep_spec = info.get("accepted", "")
dep_spec = dep_spec.strip()
# Double check that they didn't just use a version number
if dep_spec and dep_spec[0] not in "!~><=(":
raise Ouch(
"invalid dependency specifier " + dep_spec + " in dependency file"
)
return dep_spec
def _do_ensure(
    group: Dict[str, Dict[str, str]],
    online: bool = False,
    wheels_dir: Optional[Union[str, Path]] = None,
) -> Optional[Tuple[str, bool]]:
    """
    Use pip to ensure we have the packages specified in @group.

    If the packages are already installed, do nothing. If online and
    wheels_dir are both provided, prefer packages found in wheels_dir
    first before connecting to PyPI.

    :param group: A dictionary of dictionaries, corresponding to a
                  section in a pythondeps.toml file.
    :param online: If True, fall back to PyPI.
    :param wheels_dir: If specified, search this path for packages.
    :return:
        None on success; otherwise the result of diagnose() for the
        first package that could not be provided.
    """
    absent = []
    present = []
    canary = None

    for name, info in group.items():
        wanted = _make_version_constraint(info, False)
        matcher = distlib.version.LegacyMatcher(name + wanted)
        print(f"mkvenv: checking for {matcher}", file=sys.stderr)

        try:
            dist: Optional[Distribution] = distribution(matcher.name)
        except PackageNotFoundError:
            dist = None

        # Only a suitable *system* package counts as present.  Packages
        # that are not system packages are always passed to pip, so that
        # they can be updated if the requested version changes.
        if (
            dist is not None
            and _is_system_package(dist)
            and matcher.match(distlib.version.LegacyVersion(dist.version))
        ):
            logger.info("found %s %s", name, dist.version)
            present.append(name)
            continue

        absent.append(name + _make_version_constraint(info, True))
        if len(absent) == 1:
            # Diagnostics are about the first absent package only.
            canary = info.get("canary", None)

    if present:
        generate_console_scripts(present)

    if not absent:
        return None

    if online or wheels_dir:
        # Some packages are missing or aren't a suitable version,
        # install a suitable (possibly vendored) package.
        print(f"mkvenv: installing {', '.join(absent)}", file=sys.stderr)
        try:
            pip_install(args=absent, online=online, wheels_dir=wheels_dir)
        except subprocess.CalledProcessError:
            # Fall through and explain the failure to the user.
            pass
        else:
            return None

    return diagnose(absent[0], online, wheels_dir, canary)
def _parse_groups(file: str) -> Dict[str, Dict[str, Any]]:
    """
    Parse the TOML dependency file @file.

    :param file: Path to the TOML file.
    :return: The parsed document, as a dictionary keyed by group name.
    :raise Ouch: If neither tomllib nor tomli could be imported.
    """
    if not HAVE_TOMLLIB:
        if sys.version_info < (3, 11):
            raise Ouch("found no usable tomli, please install it")
        raise Ouch(
            "Python >=3.11 does not have tomllib... what have you done!?"
        )

    # Use loads() to support both tomli v1.2.x (Ubuntu 22.04,
    # Debian bullseye-backports) and v2.0.x
    #
    # TOML documents are UTF-8 by specification.  Decoding as ASCII
    # would reject a perfectly valid file as soon as it contains a
    # single non-ASCII character, for example in a comment.
    with open(file, "r", encoding="utf-8") as depfile:
        contents = depfile.read()
    return tomllib.loads(contents)  # type: ignore
def ensure_group(
    file: str,
    groups: Sequence[str],
    online: bool = False,
    wheels_dir: Optional[Union[str, Path]] = None,
) -> None:
    """
    Use pip to ensure we have the packages listed in @groups of @file.

    If the packages are already installed, do nothing. If online and
    wheels_dir are both provided, prefer packages found in wheels_dir
    first before connecting to PyPI.

    :param file: Path to the TOML file describing the package groups.
    :param groups:
        Names of the groups (sections of @file) to ensure.  When a
        package appears in several groups, the last group wins.
    :param online: If True, fall back to PyPI.
    :param wheels_dir: If specified, search this path for packages.
    :raise Ouch:
        If distlib is unavailable, if a group is not defined in @file,
        or if a package could not be provided and that is serious.
    :raise SystemExit:
        If a package could not be provided, but not seriously so.
    """
    if not HAVE_DISTLIB:
        raise Ouch("found no usable distlib, please install it")

    available = _parse_groups(file)

    wanted: Dict[str, Dict[str, str]] = {}
    for group in groups:
        try:
            wanted.update(available[group])
        except KeyError as exc:
            raise Ouch(f"group {group} not defined") from exc

    failure = _do_ensure(wanted, online, wheels_dir)
    if not failure:
        return

    # Well, that's not good.
    message, serious = failure
    if serious:
        raise Ouch(message)
    raise SystemExit(f"\n{message}\n\n")
def post_venv_setup() -> None:
    """
    Finish setting up a freshly created venv.

    This is intended to be run *inside the venv* after it is created.
    """
    logger.debug("post_venv_setup()")

    # A 'pip' script makes the venv usable in a normal way from the CLI.
    # One is only missing when pip was inherited from a parent or
    # system site and ensurepip has not been run in some way.
    generate_console_scripts(["pip"])
def _add_create_subcommand(subparsers: Any) -> None:
subparser = subparsers.add_parser("create", help="create a venv")
subparser.add_argument(
"target",
type=str,
action="store",
help="Target directory to install virtual environment into.",
)
def _add_post_init_subcommand(subparsers: Any) -> None:
subparsers.add_parser("post_init", help="post-venv initialization")
def _add_ensuregroup_subcommand(subparsers: Any) -> None:
subparser = subparsers.add_parser(
"ensuregroup",
help="Ensure that the specified package group is installed.",
)
subparser.add_argument(
"--online",
action="store_true",
help="Install packages from PyPI, if necessary.",
)
subparser.add_argument(
"--dir",
type=str,
action="store",
help="Path to vendored packages where we may install from.",
)
subparser.add_argument(
"file",
type=str,
action="store",
help=("Path to a TOML file describing package groups"),
)
subparser.add_argument(
"group",
type=str,
action="store",
help="One or more package group names",
nargs="+",
)
def main() -> int:
    """
    CLI interface to make_qemu_venv.  See module docstring.

    :return:
        The process exit code: 0 on success, 1 when an Ouch was
        reported, 2 on any other unexpected exception.
    """
    environ = os.environ
    if environ.get("DEBUG") or environ.get("GITLAB_CI"):
        # You're welcome.
        logging.basicConfig(level=logging.DEBUG)
    elif environ.get("V"):
        logging.basicConfig(level=logging.INFO)

    parser = argparse.ArgumentParser(
        prog="mkvenv",
        description="QEMU pyvenv bootstrapping utility",
    )
    subparsers = parser.add_subparsers(
        title="Commands",
        dest="command",
        required=True,
        metavar="command",
        help="Description",
    )
    for register in (
        _add_create_subcommand,
        _add_post_init_subcommand,
        _add_ensuregroup_subcommand,
    ):
        register(subparsers)

    args = parser.parse_args()
    try:
        if args.command == "create":
            make_venv(
                args.target,
                system_site_packages=True,
                clear=True,
            )
        elif args.command == "post_init":
            post_venv_setup()
        elif args.command == "ensuregroup":
            ensure_group(
                file=args.file,
                groups=args.group,
                online=args.online,
                wheels_dir=args.dir,
            )
        logger.debug("mkvenv.py %s: exiting", args.command)
    except Ouch as exc:
        # Anticipated failure: show the explanation, not a traceback.
        print("\n*** Ouch! ***\n", file=sys.stderr)
        print(str(exc), "\n\n", file=sys.stderr)
        return 1
    except SystemExit:
        # Deliberate exits pass through untouched.
        raise
    except:  # pylint: disable=bare-except
        logger.exception("mkvenv did not complete successfully:")
        return 2
    return 0


if __name__ == "__main__":
    sys.exit(main())