Skip to content

Commit 5e7a64f

Browse files
committed
Manually resolves links relatively to root_dir, and prevent escape
This patch is a followup of #311 (2a89f76). It appeared that we were not resolving paths when reading from files. This means that a symbolic link present under `root_dir` could be blindly followed _outside_ of `root_dir`, possibly leading to host files. Note : this patch **only** changes `root_dir` behavior.
1 parent 791fc0d commit 5e7a64f

File tree

14 files changed

+162
-11
lines changed

14 files changed

+162
-11
lines changed

src/distro/distro.py

+99-11
Original file line numberDiff line numberDiff line change
@@ -1091,6 +1091,80 @@ def uname_attr(self, attribute: str) -> str:
10911091
"""
10921092
return self._uname_info.get(attribute, "")
10931093

1094+
@staticmethod
1095+
def __abs_path_join(root_path: str, abs_path: str) -> str:
1096+
rel_path = os.path.splitdrive(abs_path)[1].lstrip(os.sep)
1097+
if os.altsep is not None:
1098+
rel_path = rel_path.lstrip(os.altsep)
1099+
1100+
return os.path.join(root_path, rel_path)
1101+
1102+
def __resolve_chroot_symlink_as_needed(self, link_location: str) -> str:
1103+
"""
1104+
Resolves a potential symlink in ``link_location`` against
1105+
``self.root_dir`` if inside the chroot, else just return the original
1106+
path.
1107+
We're doing this check at a central place, to making the calling code
1108+
more readable and to de-duplicate.
1109+
"""
1110+
if self.root_dir is None:
1111+
return link_location
1112+
1113+
# resolve `self.root_dir`, once and for all.
1114+
root_dir = os.path.realpath(self.root_dir)
1115+
1116+
# consider non-absolute `link_location` relative to `root_dir` (as
1117+
# `os.path.commonpath` does not support mixing absolute and relative
1118+
# paths).
1119+
if not os.path.isabs(link_location):
1120+
link_location = self.__abs_path_join(root_dir, link_location)
1121+
1122+
seen_paths = set()
1123+
while True:
1124+
# while `link_location` _should_ be relative to chroot (either
1125+
# passed from trusted code or already resolved by previous loop
1126+
# iteration), we enforce this check as `self.os_release_file` and
1127+
# `self.distro_release_file` may be user-supplied.
1128+
if os.path.commonpath([root_dir, link_location]) != root_dir:
1129+
raise FileNotFoundError
1130+
1131+
if not os.path.islink(link_location):
1132+
# assert _final_ path is actually inside chroot (this is
1133+
# required to address `..` usages, potentially leading to
1134+
# outside, after subsequent link resolutions).
1135+
if (
1136+
os.path.commonpath([root_dir, os.path.realpath(link_location)])
1137+
!= root_dir
1138+
):
1139+
raise FileNotFoundError
1140+
1141+
return link_location
1142+
1143+
resolved = os.readlink(link_location)
1144+
if not os.path.isabs(resolved):
1145+
# compute resolved path relatively to previous `link_location`
1146+
# and accordingly to chroot. We also canonize "top" `..`
1147+
# components (relatively to `root_dir`), as they would
1148+
# legitimately resolve to chroot itself).
1149+
resolved = os.path.relpath(
1150+
os.path.join(os.path.dirname(link_location), resolved),
1151+
start=root_dir,
1152+
).lstrip(
1153+
os.pardir
1154+
+ os.pathsep
1155+
+ (os.altsep if os.altsep is not None else "")
1156+
)
1157+
1158+
# "move" back (absolute) path inside the chroot
1159+
resolved = self.__abs_path_join(root_dir, resolved)
1160+
1161+
# prevent symlinks infinite loop
1162+
if resolved in seen_paths:
1163+
raise FileNotFoundError
1164+
1165+
seen_paths.add(link_location)
1166+
link_location = resolved
1167+
10941168
@cached_property
10951169
def _os_release_info(self) -> Dict[str, str]:
10961170
"""
@@ -1099,10 +1173,14 @@ def _os_release_info(self) -> Dict[str, str]:
10991173
Returns:
11001174
A dictionary containing all information items.
11011175
"""
1102-
if os.path.isfile(self.os_release_file):
1103-
with open(self.os_release_file, encoding="utf-8") as release_file:
1176+
try:
1177+
with open(
1178+
self.__resolve_chroot_symlink_as_needed(self.os_release_file),
1179+
encoding="utf-8",
1180+
) as release_file:
11041181
return self._parse_os_release_content(release_file)
1105-
return {}
1182+
except FileNotFoundError:
1183+
return {}
11061184

11071185
@staticmethod
11081186
def _parse_os_release_content(lines: TextIO) -> Dict[str, str]:
@@ -1223,7 +1301,10 @@ def _oslevel_info(self) -> str:
12231301
def _debian_version(self) -> str:
12241302
try:
12251303
with open(
1226-
os.path.join(self.etc_dir, "debian_version"), encoding="ascii"
1304+
self.__resolve_chroot_symlink_as_needed(
1305+
os.path.join(self.etc_dir, "debian_version")
1306+
),
1307+
encoding="ascii",
12271308
) as fp:
12281309
return fp.readline().rstrip()
12291310
except FileNotFoundError:
@@ -1233,7 +1314,10 @@ def _debian_version(self) -> str:
12331314
def _armbian_version(self) -> str:
12341315
try:
12351316
with open(
1236-
os.path.join(self.etc_dir, "armbian-release"), encoding="ascii"
1317+
self.__resolve_chroot_symlink_as_needed(
1318+
os.path.join(self.etc_dir, "armbian-release")
1319+
),
1320+
encoding="ascii",
12371321
) as fp:
12381322
return self._parse_os_release_content(fp).get("version", "")
12391323
except FileNotFoundError:
@@ -1285,9 +1369,10 @@ def _distro_release_info(self) -> Dict[str, str]:
12851369
try:
12861370
basenames = [
12871371
basename
1288-
for basename in os.listdir(self.etc_dir)
1372+
for basename in os.listdir(
1373+
self.__resolve_chroot_symlink_as_needed(self.etc_dir)
1374+
)
12891375
if basename not in _DISTRO_RELEASE_IGNORE_BASENAMES
1290-
and os.path.isfile(os.path.join(self.etc_dir, basename))
12911376
]
12921377
# We sort for repeatability in cases where there are multiple
12931378
# distro specific files; e.g. CentOS, Oracle, Enterprise all
@@ -1303,12 +1388,13 @@ def _distro_release_info(self) -> Dict[str, str]:
13031388
match = _DISTRO_RELEASE_BASENAME_PATTERN.match(basename)
13041389
if match is None:
13051390
continue
1306-
filepath = os.path.join(self.etc_dir, basename)
1307-
distro_info = self._parse_distro_release_file(filepath)
1391+
# NOTE: _parse_distro_release_file below will be resolving for us
1392+
unresolved_filepath = os.path.join(self.etc_dir, basename)
1393+
distro_info = self._parse_distro_release_file(unresolved_filepath)
13081394
# The name is always present if the pattern matches.
13091395
if "name" not in distro_info:
13101396
continue
1311-
self.distro_release_file = filepath
1397+
self.distro_release_file = unresolved_filepath
13121398
break
13131399
else: # the loop didn't "break": no candidate.
13141400
return {}
@@ -1342,7 +1428,9 @@ def _parse_distro_release_file(self, filepath: str) -> Dict[str, str]:
13421428
A dictionary containing all information items.
13431429
"""
13441430
try:
1345-
with open(filepath, encoding="utf-8") as fp:
1431+
with open(
1432+
self.__resolve_chroot_symlink_as_needed(filepath), encoding="utf-8"
1433+
) as fp:
13461434
# Only parse the first line. For instance, on SLES there
13471435
# are multiple lines. We don't want them...
13481436
return self._parse_distro_release_content(fp.readline())
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
/tmp/another-link
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
/usr/lib/os-release
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
ID=absolute_symlinks
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
../../../../distros/debian8/etc/os-release
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
/../../../../../../../../../../../../../../../../../../../../../../../../../../../../../../../../../../../../../../../../usr/lib/os-release
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
/tmp/another-link/../../../../../../usr/lib/os-release
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
nested/nested

tests/resources/testdistros/distro/root_dir_non_escape/tmp/nested/nested/.gitkeep

Whitespace-only changes.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
ID=root_dir_non_escape
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
ID=root_dir_os_release_file
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
os-release-symlink
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
os-release

tests/test_distro.py

+52
Original file line numberDiff line numberDiff line change
@@ -771,6 +771,58 @@ def test_empty_release(self) -> None:
771771
desired_outcome = {"id": "empty"}
772772
self._test_outcome(desired_outcome)
773773

774+
def test_root_dir_os_release_file_relative(self) -> None:
775+
self.distro = distro.LinuxDistribution(
776+
root_dir=os.path.join(TESTDISTROS, "distro", "root_dir_os_release_file"),
777+
os_release_file="tmp/os-release",
778+
)
779+
desired_outcome = {"id": "root_dir_os_release_file"}
780+
self._test_outcome(desired_outcome)
781+
782+
def test_root_dir_os_release_file_absolute(self) -> None:
783+
self.distro = distro.LinuxDistribution(
784+
root_dir=os.path.join(TESTDISTROS, "distro", "root_dir_os_release_file"),
785+
os_release_file="/tmp/os-release",
786+
)
787+
# as we honor `os_release_file`, loading existing file outside of root_dir has
788+
# been prevented (empty data)
789+
self._test_outcome({})
790+
791+
def test_root_dir_absolute_symlinks(self) -> None:
792+
self.distro = distro.LinuxDistribution(
793+
root_dir=os.path.join(TESTDISTROS, "distro", "root_dir_absolute_symlinks")
794+
)
795+
desired_outcome = {"id": "absolute_symlinks"}
796+
self._test_outcome(desired_outcome)
797+
798+
def test_root_dir_escape(self) -> None:
799+
self.distro = distro.LinuxDistribution(
800+
root_dir=os.path.join(TESTDISTROS, "distro", "root_dir_escape")
801+
)
802+
# loading existing file outside of root_dir has been prevented (empty data)
803+
self._test_outcome({})
804+
805+
def test_root_dir_escape_abs(self) -> None:
806+
self.distro = distro.LinuxDistribution(
807+
root_dir=os.path.join(TESTDISTROS, "distro", "root_dir_escape_abs")
808+
)
809+
# loading existing file outside of root_dir has been prevented (empty data)
810+
self._test_outcome({})
811+
812+
def test_root_dir_non_escape(self) -> None:
813+
self.distro = distro.LinuxDistribution(
814+
root_dir=os.path.join(TESTDISTROS, "distro", "root_dir_non_escape")
815+
)
816+
desired_outcome = {"id": "root_dir_non_escape"}
817+
self._test_outcome(desired_outcome)
818+
819+
def test_root_dir_symlinks_loop(self) -> None:
820+
self.distro = distro.LinuxDistribution(
821+
root_dir=os.path.join(TESTDISTROS, "distro", "root_dir_symlinks_loop")
822+
)
823+
# due to symbolic links loop, loading of file has been prevented (empty data)
824+
self._test_outcome({})
825+
774826
def test_dontincludeuname(self) -> None:
775827
self._setup_for_distro(os.path.join(TESTDISTROS, "distro", "dontincludeuname"))
776828

0 commit comments

Comments
 (0)