Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions src/macaron/build_spec_generator/common_spec/pypi_spec.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,8 @@ def resolve_fields(self, purl: PackageURL) -> None:
python_version_set: set[str] = set()
wheel_name_python_version_list: list[str] = []
wheel_name_platforms: set[str] = set()
# Precautionary fallback to default version
chronologically_likeliest_version: str = defaults.get("heuristic.pypi", "default_setuptools")

if pypi_package_json is not None:
if pypi_package_json.package_json or pypi_package_json.download(dest=""):
Expand Down Expand Up @@ -150,6 +152,9 @@ def resolve_fields(self, purl: PackageURL) -> None:
parsed_build_requires["setuptools"] = "==" + defaults.get(
"heuristic.pypi", "setuptools_version_emitting_platform_unknown"
)
chronologically_likeliest_version = (
pypi_package_json.get_chronologically_suitable_setuptools_version()
)
except SourceCodeError:
logger.debug("Could not find pure wheel matching this PURL")

Expand All @@ -165,6 +170,10 @@ def resolve_fields(self, purl: PackageURL) -> None:
requires = json_extract(content, ["build-system", "requires"], list)
if requires:
build_requires_set.update(elem.replace(" ", "") for elem in requires)
# If we cannot find [build-system] requires, we lean on the fact that setuptools
# was the de-facto build tool, and infer a setuptools version to include.
else:
build_requires_set.add(f"setuptools=={chronologically_likeliest_version}")
backend = json_extract(content, ["build-system", "build-backend"], str)
if backend:
build_backends_set.add(backend.replace(" ", ""))
Expand All @@ -177,6 +186,10 @@ def resolve_fields(self, purl: PackageURL) -> None:
build_requires_set,
build_backends_set,
)
# Here we have successfully analyzed the pyproject.toml file. Now, if we have a setup.py/cfg,
# we also need to infer a setuptools version to include.
if pypi_package_json.file_exists("setup.py") or pypi_package_json.file_exists("setup.cfg"):
build_requires_set.add(f"setuptools=={chronologically_likeliest_version}")
except TypeError as error:
logger.debug(
"Found a type error while reading the pyproject.toml file from the sdist: %s", error
Expand All @@ -185,6 +198,9 @@ def resolve_fields(self, purl: PackageURL) -> None:
logger.debug("Failed to read the pyproject.toml file from the sdist: %s", error)
except SourceCodeError as error:
logger.debug("No pyproject.toml found: %s", error)
# Here we do not have a pyproject.toml file. Instead, we lean on the fact that setuptools
# was the de-facto build tool, and infer a setuptools version to include.
build_requires_set.add(f"setuptools=={chronologically_likeliest_version}")
except SourceCodeError as error:
logger.debug("No source distribution found: %s", error)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,6 @@ def analyze(self, pypi_package_json: PyPIPackageJsonAsset) -> tuple[HeuristicRes
False,
pypi_package_json.pypi_registry,
{},
"",
"",
"",
PyPIInspectorAsset("", [], {}),
)
if not adjacent_pypi_json.download(""):
Expand Down
2 changes: 1 addition & 1 deletion src/macaron/repo_finder/repo_finder_pypi.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ def find_repo(
if not pypi_registry:
return "", RepoFinderInfo.PYPI_NO_REGISTRY
pypi_asset = PyPIPackageJsonAsset(
purl.name, purl.version, False, pypi_registry, {}, "", "", "", PyPIInspectorAsset("", [], {})
purl.name, purl.version, False, pypi_registry, {}, PyPIInspectorAsset("", [], {})
)

if not pypi_asset:
Expand Down
90 changes: 83 additions & 7 deletions src/macaron/slsa_analyzer/package_registry/pypi_registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
"""The module provides abstractions for the pypi package registry."""
from __future__ import annotations

import bisect
import hashlib
import logging
import os
Expand All @@ -15,7 +16,7 @@
import zipfile
from collections.abc import Callable, Generator, Iterator
from contextlib import contextmanager
from dataclasses import dataclass
from dataclasses import dataclass, field
from datetime import datetime
from typing import TYPE_CHECKING

Expand Down Expand Up @@ -502,6 +503,42 @@ def get_maintainer_join_date(self, username: str) -> datetime | None:

return res.replace(tzinfo=None) if res else None

def get_matching_setuptools_version(self, package_release_datetime: datetime) -> str:
    """Find the setuptools version that would be "latest" for the input datetime.

    The setuptools release history is fetched from the registry and the most recent
    release uploaded strictly before ``package_release_datetime`` is selected.

    Parameters
    ----------
    package_release_datetime: datetime
        Release datetime of a package we wish to rebuild. Upload times are parsed
        without timezone information, so this is presumably expected to be a naive
        datetime as well (NOTE(review): confirm against callers).

    Returns
    -------
    str
        Matching version of setuptools, or the configured default version when no
        release can be matched (no release data, or no setuptools release predates
        the input datetime).
    """
    setuptools_endpoint = urllib.parse.urljoin(self.registry_url, "pypi/setuptools/json")
    setuptools_json = self.download_package_json(setuptools_endpoint)
    releases = json_extract(setuptools_json, ["releases"], dict)
    if releases:
        # Parse each upload time exactly once, skipping releases that have no files
        # or whose upload time is missing or malformed.
        dated_releases: list[tuple[datetime, str]] = []
        for version, release_info in releases.items():
            if not release_info:
                continue
            upload_time = release_info[0].get("upload_time")
            if not isinstance(upload_time, str):
                continue
            try:
                uploaded_at = datetime.strptime(upload_time, "%Y-%m-%dT%H:%M:%S")
            except ValueError:
                continue
            dated_releases.append((uploaded_at, str(version)))

        # Cannot assume this is sorted, as releases is just a dict.
        dated_releases.sort(key=lambda entry: entry[0])
        upload_times = [entry[0] for entry in dated_releases]
        # bisect_left gives the position at which package_release_datetime would be inserted
        # to maintain order, so the release just before that position is the one we want.
        index = bisect.bisect_left(upload_times, package_release_datetime) - 1
        # A negative index means no setuptools release predates the package. Indexing with -1
        # would silently wrap around to the newest release, so fall through to the default.
        if index >= 0:
            return dated_releases[index][1]
        logger.debug("No setuptools release found before %s; using the default.", package_release_datetime)

    # Without usable release data we cannot infer a version, so return the default.
    return defaults.get("heuristic.pypi", "default_setuptools")

@staticmethod
def extract_attestation(attestation_data: dict) -> dict | None:
"""Extract the first attestation file from a PyPI attestation response.
Expand Down Expand Up @@ -618,13 +655,16 @@ class PyPIPackageJsonAsset:
package_json: dict

#: The source code temporary location name.
package_sourcecode_path: str
package_sourcecode_path: str = field(init=False)

#: The wheel temporary location name.
wheel_path: str
wheel_path: str = field(init=False)

#: Name of the wheel file.
wheel_filename: str
wheel_filename: str = field(init=False)

#: The datetime that the wheel was uploaded.
wheel_upload_time: datetime = field(init=False)

#: The pypi inspector information about this package
inspector_asset: PyPIInspectorAsset
Expand Down Expand Up @@ -779,6 +819,7 @@ def get_wheel_url(self, tag: str = "none-any") -> str | None:
# Continue to getting url
wheel_url: str = distribution.get("url") or ""
if wheel_url:
self.wheel_upload_time = datetime.strptime(distribution.get("upload_time") or "", "%Y-%m-%dT%H:%M:%S")
try:
parsed_url = urllib.parse.urlparse(wheel_url)
except ValueError:
Expand Down Expand Up @@ -919,6 +960,33 @@ def get_sourcecode_file_contents(self, path: str) -> bytes:
logger.debug(error_msg)
raise SourceCodeError(error_msg) from read_error

def file_exists(self, path: str) -> bool:
    """Report whether a path is present in the downloaded source code.

    A relative path is resolved against the package_sourcecode_path attribute; an
    absolute path is checked as given.

    Parameters
    ----------
    path: str
        The file path to look for, either absolute or relative to package_sourcecode_path.

    Returns
    -------
    bool
        True if something exists at the resolved path. False if it does not, or if
        package_sourcecode_path is empty (no source code was downloaded).
    """
    sourcecode_root = self.package_sourcecode_path
    if not sourcecode_root:
        # Nothing was downloaded, so there is nothing to look in.
        return False

    candidate = path if os.path.isabs(path) else os.path.join(sourcecode_root, path)
    return os.path.exists(candidate)

def iter_sourcecode(self) -> Iterator[tuple[str, bytes]]:
"""
Iterate through all source code files.
Expand Down Expand Up @@ -1054,6 +1122,16 @@ def get_inspector_src_preview_links(self) -> bool:
# If all distributions were invalid and went along a 'continue' path.
return bool(self.inspector_asset)

def get_chronologically_suitable_setuptools_version(self) -> str:
    """Find version of setuptools that would be "latest" for this package.

    The lookup is keyed on the wheel upload time of this asset. That attribute has no
    initial value and may never have been assigned (for example when no wheel was
    resolved for the package), in which case the configured default setuptools version
    is returned instead of raising AttributeError.

    Returns
    -------
    str
        Chronologically likeliest setuptools version, or the configured default
        version when no wheel upload time is available.
    """
    wheel_upload_time = getattr(self, "wheel_upload_time", None)
    if wheel_upload_time is None:
        # NOTE(review): packages that only publish a source distribution end up here;
        # consider using the sdist upload time before falling back to the default.
        return defaults.get("heuristic.pypi", "default_setuptools")
    return self.pypi_registry.get_matching_setuptools_version(wheel_upload_time)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If a project does not release a wheel, should we also check for the source tarball distribution?



def find_or_create_pypi_asset(
asset_name: str, asset_version: str | None, pypi_registry_info: PackageRegistryInfo
Expand Down Expand Up @@ -1091,8 +1169,6 @@ def find_or_create_pypi_asset(
logger.debug("Failed to create PyPIPackageJson asset.")
return None

asset = PyPIPackageJsonAsset(
asset_name, asset_version, False, package_registry, {}, "", "", "", PyPIInspectorAsset("", [], {})
)
asset = PyPIPackageJsonAsset(asset_name, asset_version, False, package_registry, {}, PyPIInspectorAsset("", [], {}))
pypi_registry_info.metadata.append(asset)
return asset
2 changes: 1 addition & 1 deletion tests/malware_analyzer/pypi/test_wheel_absence.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ def test_get_inspector_src_preview_links(mock_send_head_http_raw: MagicMock) ->
mock_send_head_http_raw.return_value = MagicMock() # Assume valid URL for testing purposes.

pypi_package_json = PyPIPackageJsonAsset(
package_name, version, False, pypi_registry, package_json, "", "", "", PyPIInspectorAsset("", [], {})
package_name, version, False, pypi_registry, package_json, PyPIInspectorAsset("", [], {})
)

assert pypi_package_json.get_inspector_src_preview_links() is True
Expand Down
Loading