Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
148 changes: 130 additions & 18 deletions src/distro/distro.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import subprocess
import sys
import warnings
from pathlib import Path
from typing import (
Any,
Callable,
Expand Down Expand Up @@ -795,9 +796,15 @@ def __init__(
self.usr_lib_dir, _OS_RELEASE_BASENAME
)

def __isfile(path: str) -> bool:
try:
return os.path.isfile(self._resolve_path_relatively_to_chroot(path))
except FileNotFoundError:
return False

# NOTE: The idea is to respect order **and** have it set
# at all times for API backwards compatibility.
if os.path.isfile(etc_dir_os_release_file) or not os.path.isfile(
if __isfile(etc_dir_os_release_file) or not __isfile(
usr_lib_os_release_file
):
self.os_release_file = etc_dir_os_release_file
Expand Down Expand Up @@ -1152,6 +1159,101 @@ def _uname_attr(self, attribute: str) -> str:
"""
return self._uname_info.get(attribute, "")

@staticmethod
def __abs_path_join(root_path: Path, abs_path: Path) -> Path:
rel_path = os.path.splitdrive(abs_path)[1].lstrip(os.sep)
if os.altsep is not None:
rel_path = rel_path.lstrip(os.altsep)

return root_path / Path(rel_path)

def _resolve_path_relatively_to_chroot(self, path: str) -> Path:
"""
Resolves any encountered symbolic links in ``path`` relatively to
``self.root_dir``, if defined. Otherwise it would simply return
original ``path``.
This function could be considered as a "soft-chroot" implementation.
We're doing this check at a central place, to make calling code more readable
and to de-duplicate.

Raises:

* :py:exc:`FileNotFoundError`: ``path`` doesn't resolve in chroot, or resolving
it lead to symbolic links loop

Examples :

* if root_dir="/path/to/chroot" and path="folder/../../../../etc/os-release"
with "etc" resolving to "/mnt/disk/etc" and "os-release" to
"../../usr/lib/os-release", this function returns
"/path/to/chroot/mnt/usr/lib/os-release"

* if root_dir=None and path="/path/to/os-release", this function returns
"/path/to/os-release"
"""
path_to_resolve = Path(path)

if self.root_dir is None:
return path_to_resolve

# resolve `self.root_dir` once and for all
chroot_path = Path(self.root_dir).resolve()

# consider non-absolute `path_to_resolve` relative to chroot
if not path_to_resolve.is_absolute():
path_to_resolve = chroot_path / path_to_resolve

seen_paths = set()
while True:
# although `path_to_resolve` _should_ be relative to chroot (either
# passed from trusted code or already resolved by previous loop
# iteration), we enforce this check as some inputs are available through API
try:
relative_parts = path_to_resolve.relative_to(chroot_path).parts
except ValueError:
raise FileNotFoundError

# iterate over (relative) path segments and try to resolve each one of them
for i, part in enumerate(relative_parts, start=1):
if part == os.pardir:
# normalize path parts up to this segment (relatively to chroot)
path_to_resolve = self.__abs_path_join(
chroot_path,
Path(os.path.normpath("/" / Path(*relative_parts[:i]))),
) / Path(*relative_parts[i:])
break # restart path resolution as path has just been normalized

# attempt symbolic link resolution on current path segment
symlink_candidate = chroot_path / Path(*relative_parts[:i])
try:
symlink_resolved = Path(os.readlink(symlink_candidate))
except (
AttributeError, # `readlink` isn't supported by system
OSError, # not a symlink, go to next path segment
):
continue

# "bend" **absolute** resolved path inside the chroot
# consider **non-absolute** resolved path relatively to chroot
if symlink_resolved.is_absolute():
path_to_resolve = self.__abs_path_join(
chroot_path, symlink_resolved
)
else:
path_to_resolve = symlink_candidate.parent / symlink_resolved

# append remaining path segments to resolved path
path_to_resolve /= Path(*relative_parts[i:])
break # restart path resolution as a symlink has just been resolved
else:
# `path_to_resolve` can be considered resolved, return it
return path_to_resolve

# prevent symlinks infinite loop by tracking successive resolutions
if path_to_resolve in seen_paths:
raise FileNotFoundError
seen_paths.add(path_to_resolve)

@cached_property
def _os_release_info(self) -> Dict[str, str]:
"""
Expand All @@ -1160,10 +1262,14 @@ def _os_release_info(self) -> Dict[str, str]:
Returns:
A dictionary containing all information items.
"""
if os.path.isfile(self.os_release_file):
with open(self.os_release_file, encoding="utf-8") as release_file:
try:
with open(
self._resolve_path_relatively_to_chroot(self.os_release_file),
encoding="utf-8",
) as release_file:
return self._parse_os_release_content(release_file)
return {}
except FileNotFoundError:
return {}

@staticmethod
def _parse_os_release_content(lines: TextIO) -> Dict[str, str]:
Expand Down Expand Up @@ -1286,7 +1392,10 @@ def _oslevel_info(self) -> str:
def _debian_version(self) -> str:
try:
with open(
os.path.join(self.etc_dir, "debian_version"), encoding="ascii"
self._resolve_path_relatively_to_chroot(
os.path.join(self.etc_dir, "debian_version")
),
encoding="ascii",
) as fp:
return fp.readline().rstrip()
except FileNotFoundError:
Expand All @@ -1296,7 +1405,10 @@ def _debian_version(self) -> str:
def _armbian_version(self) -> str:
try:
with open(
os.path.join(self.etc_dir, "armbian-release"), encoding="ascii"
self._resolve_path_relatively_to_chroot(
os.path.join(self.etc_dir, "armbian-release")
),
encoding="ascii",
) as fp:
return self._parse_os_release_content(fp).get("version", "")
except FileNotFoundError:
Expand Down Expand Up @@ -1348,9 +1460,10 @@ def _distro_release_info(self) -> Dict[str, str]:
try:
basenames = [
basename
for basename in os.listdir(self.etc_dir)
for basename in os.listdir(
self._resolve_path_relatively_to_chroot(self.etc_dir)
)
if basename not in _DISTRO_RELEASE_IGNORE_BASENAMES
and os.path.isfile(os.path.join(self.etc_dir, basename))
]
# We sort for repeatability in cases where there are multiple
# distro specific files; e.g. CentOS, Oracle, Enterprise all
Expand All @@ -1366,12 +1479,13 @@ def _distro_release_info(self) -> Dict[str, str]:
match = _DISTRO_RELEASE_BASENAME_PATTERN.match(basename)
if match is None:
continue
filepath = os.path.join(self.etc_dir, basename)
distro_info = self._parse_distro_release_file(filepath)
# NOTE: _parse_distro_release_file below will be resolving for us
unresolved_filepath = os.path.join(self.etc_dir, basename)
distro_info = self._parse_distro_release_file(unresolved_filepath)
# The name is always present if the pattern matches.
if "name" not in distro_info:
continue
self.distro_release_file = filepath
self.distro_release_file = unresolved_filepath
break
else: # the loop didn't "break": no candidate.
return {}
Expand Down Expand Up @@ -1405,7 +1519,10 @@ def _parse_distro_release_file(self, filepath: str) -> Dict[str, str]:
A dictionary containing all information items.
"""
try:
with open(filepath, encoding="utf-8") as fp:
with open(
self._resolve_path_relatively_to_chroot(filepath),
encoding="utf-8",
) as fp:
# Only parse the first line. For instance, on SLES there
# are multiple lines. We don't want them...
return self._parse_distro_release_content(fp.readline())
Expand Down Expand Up @@ -1465,12 +1582,7 @@ def main() -> None:
args = parser.parse_args()

if args.root_dir:
dist = LinuxDistribution(
include_lsb=False,
include_uname=False,
include_oslevel=False,
root_dir=args.root_dir,
)
dist = LinuxDistribution(root_dir=args.root_dir)
else:
dist = _distro

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
ID=absolute_symlinks
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
ID=root_dir_non_escape
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
ID=root_dir_os_release_file
56 changes: 52 additions & 4 deletions tests/test_distro.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,6 @@ class TestOSRelease:
def setup_method(self, test_method: FunctionType) -> None:
dist = test_method.__name__.split("_")[1]
self.distro = distro.LinuxDistribution(
include_lsb=False,
distro_release_file="path-to-non-existing-file",
root_dir=os.path.join(DISTROS_DIR, dist),
)
Expand Down Expand Up @@ -559,9 +558,6 @@ def setup_method(self, test_method: FunctionType) -> None:
dist = test_method.__name__.split("_")[1]
root_dir = os.path.join(DISTROS_DIR, dist)
self.distro = distro.LinuxDistribution(
include_lsb=False,
include_uname=False,
include_oslevel=False,
os_release_file="",
distro_release_file="path-to-non-existing-file",
root_dir=root_dir,
Expand Down Expand Up @@ -793,6 +789,58 @@ def test_empty_release(self) -> None:
desired_outcome = {"id": "empty"}
self._test_outcome(desired_outcome)

def test_root_dir_os_release_file_relative(self) -> None:
self.distro = distro.LinuxDistribution(
root_dir=os.path.join(TESTDISTROS, "distro", "root_dir_os_release_file"),
os_release_file="tmp/os-release",
)
desired_outcome = {"id": "root_dir_os_release_file"}
self._test_outcome(desired_outcome)

def test_root_dir_os_release_file_absolute(self) -> None:
self.distro = distro.LinuxDistribution(
root_dir=os.path.join(TESTDISTROS, "distro", "root_dir_os_release_file"),
os_release_file="/tmp/os-release",
)
# as we honor `os_release_file`, loading existing file outside of root_dir has
# been prevented (empty data)
self._test_outcome({})

def test_root_dir_absolute_symlinks(self) -> None:
self.distro = distro.LinuxDistribution(
root_dir=os.path.join(TESTDISTROS, "distro", "root_dir_absolute_symlinks")
)
desired_outcome = {"id": "absolute_symlinks"}
self._test_outcome(desired_outcome)

def test_root_dir_escape(self) -> None:
self.distro = distro.LinuxDistribution(
root_dir=os.path.join(TESTDISTROS, "distro", "root_dir_escape")
)
# loading existing file outside of root_dir has been prevented (empty data)
self._test_outcome({})

def test_root_dir_escape_abs(self) -> None:
self.distro = distro.LinuxDistribution(
root_dir=os.path.join(TESTDISTROS, "distro", "root_dir_escape_abs")
)
# loading existing file outside of root_dir has been prevented (empty data)
self._test_outcome({})

def test_root_dir_non_escape(self) -> None:
self.distro = distro.LinuxDistribution(
root_dir=os.path.join(TESTDISTROS, "distro", "root_dir_non_escape")
)
desired_outcome = {"id": "root_dir_non_escape"}
self._test_outcome(desired_outcome)

def test_root_dir_symlinks_loop(self) -> None:
self.distro = distro.LinuxDistribution(
root_dir=os.path.join(TESTDISTROS, "distro", "root_dir_symlinks_loop")
)
# due to symbolic links loop, loading of file has been prevented (empty data)
self._test_outcome({})

def test_dontincludeuname(self) -> None:
self._setup_for_distro(os.path.join(TESTDISTROS, "distro", "dontincludeuname"))

Expand Down