Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions oar/cli/cmd_controller_group.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from oar.controller.detector import start_release_detector
from oar.core.const import CONTEXT_SETTINGS
from oar.notificator.jira_notificator import jira_notificator
from oar.image_consistency_check.checker import image_consistency_check

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -36,6 +37,7 @@ def cli(debug):

cli.add_command(start_release_detector)
cli.add_command(jira_notificator)
cli.add_command(image_consistency_check)

if __name__ == '__main__':
cli()
Empty file.
162 changes: 162 additions & 0 deletions oar/image_consistency_check/checker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
import logging
import sys
import click
import requests

from oar.image_consistency_check.image import ImageMetadata
from oar.image_consistency_check.payload import Payload
from oar.image_consistency_check.shipment import Shipment


logger = logging.getLogger(__name__)


class ImageConsistencyChecker:

def __init__(self, payload: Payload, shipment: Shipment):
"""
Initialize the ImageConsistencyChecker object.

Args:
payload (Payload): The payload object
shipment (Shipment): The shipment object
"""
self.payload_image_pullspecs = payload.get_image_pullspecs()
self.shipment_image_pullspecs = shipment.get_image_pullspecs()
self.all_image_metadata: dict[str, ImageMetadata] = self._create_image_metadata(self.payload_image_pullspecs, self.shipment_image_pullspecs)

def _create_image_metadata(self, payload_image_pullspecs: list[str], shipment_image_pullspecs: list[str]) -> dict[str, ImageMetadata]:
"""
Create the image metadata for the payload and shipment.

Args:
payload_image_pullspecs (list[str]): The list of payload image pullspecs
shipment_image_pullspecs (list[str]): The list of shipment image pullspecs

Returns:
dict[str, ImageMetadata]: The dictionary of image metadata
"""
all_image_metadata: dict[str, ImageMetadata] = {}
for payload_pullspec in payload_image_pullspecs:
if payload_pullspec not in all_image_metadata.keys():
all_image_metadata[payload_pullspec] = ImageMetadata(payload_pullspec)
for shipment_pullspec in shipment_image_pullspecs:
if shipment_pullspec not in all_image_metadata.keys():
all_image_metadata[shipment_pullspec] = ImageMetadata(shipment_pullspec)
return all_image_metadata

def _is_payload_image_in_shipment(self, payload_pullspec: str) -> bool:
"""
Check if the payload image is in the shipment.

Args:
payload_pullspec (str): The pullspec of the payload image

Returns:
bool: True if the payload image is in the shipment, False otherwise
"""
match_pullspecs = []
for shipment_pullspec in self.shipment_image_pullspecs:
if self.all_image_metadata[payload_pullspec].has_same_identifier(self.all_image_metadata[shipment_pullspec]):
match_pullspecs.append(shipment_pullspec)
if len(match_pullspecs) > 0:
logger.info(f"Payload pullspec {payload_pullspec} is in the shipment. Number of matches: {len(match_pullspecs)}")
for mp in match_pullspecs:
logger.info(f"Match pullspec: {mp}")
self.all_image_metadata[mp].log_pullspec_details()
return True
else:
logger.info(f"Payload pullspec {payload_pullspec} is not in the shipment")
return False

def _is_payload_image_released(self, payload_pullspec: str) -> bool:
"""
Check if the payload image is released in Red Hat catalog.

Args:
payload_pullspec (str): The pullspec of the payload image

Returns:
bool: True if only one image is found in Red Hat catalog, False otherwise
"""
payload_image_digest = self.all_image_metadata[payload_pullspec].digest
url = f"https://catalog.redhat.com/api/containers/v1/images?filter=image_id=={payload_image_digest}"
logger.debug(f"Checking payload pullspec: {payload_pullspec} in Red Hat catalog. URL: {url}")
resp = requests.get(url)
if resp.ok:
resp_data = resp.json()
if resp_data["total"] > 0:
logger.info(f"Image {payload_pullspec} found in Red Hat catalog.")
for data in resp_data["data"]:
for repo in data["repositories"]:
logger.info(f"Repository: {repo["registry"]}/{repo["repository"]}")
return True
else:
logger.error(f"No image found in Red Hat catalog.")
return False
else:
logger.error(f"Access to catalog.redhat.com failed. Status code: {resp.status_code}, Reason: {resp.reason}")
return False

def _find_images_with_same_name(self, payload_pullspec: str) -> None:
"""
Find images with the same name but different identifier.

Args:
payload_pullspec (str): The pullspec of the payload image
"""
has_same_name = False

for shipment_pullspec in self.shipment_image_pullspecs:
if self.all_image_metadata[payload_pullspec].has_same_name(self.all_image_metadata[shipment_pullspec]):
has_same_name = True
logger.info(f"Found an image with the same name but different identifier. Please check manually.")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this a valid case? even name is same, but digest is not, they're not same image

Copy link
Contributor Author

@tomasdavidorg tomasdavidorg Dec 18, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No. See the line 136-138.
The method _find_images_with_same_name is just for investigation purposes. If there is image with same name but different identificators, it will debug log detais.

It is taken from the previous implementation. https://gitlab.cee.redhat.com/aosqe/openshift-misc/-/blob/master/jenkins/v4-image-test/errata_test/errata_test.py?ref_type=heads#L97-L127

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

debugging purpose is ok.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Marking as resolved.

self.all_image_metadata[shipment_pullspec].log_pullspec_details()

if not has_same_name:
logger.error(f"No image with the same name found in the shipment. Please check manually.")

def is_consistent(self) -> bool:
"""
Check if the images in payload are consistent with images in shipment.

Returns:
bool: True if the images in payload are found in the shipment or Red Hat catalog, False otherwise
"""
all_pullspecs_ok = True
for payload_pullspec in self.payload_image_pullspecs:
logger.info(f"Checking payload pullspec: {payload_pullspec}")
self.all_image_metadata[payload_pullspec].log_pullspec_details()
if self._is_payload_image_in_shipment(payload_pullspec):
logger.info(f"Checking payload pullspec: {payload_pullspec} is passed. Found in the Shipment")
elif self._is_payload_image_released(payload_pullspec):
logger.info(f"Checking payload pullspec: {payload_pullspec} is passed. Found in Red Hat catalog")
else:
logger.error(f"Checking payload pullspec: {payload_pullspec} is failed. Not found in the Shipment and Red Hat catalog")
self._find_images_with_same_name(payload_pullspec)
all_pullspecs_ok = False
return all_pullspecs_ok

@click.command()
@click.option("-p", "--payload-url", type=str, required=True, help="Payload URL")
@click.option("-m", "--mr-id", type=int, required=True, help="Merge request ID")
def image_consistency_check(payload_url: str, mr_id: int) -> None:
"""
Check if images in payload are consistent with images in shipment.

Args:
payload_url (str): The URL of the payload
mr_id (int): The ID of the merge request
"""
payload = Payload(payload_url)
shipment = Shipment(mr_id)
checker = ImageConsistencyChecker(payload, shipment)
if checker.is_consistent():
logger.info("All payload images are consistent with images in shipment.")
sys.exit(0)
else:
logger.error("Payload images are not consistent with images in shipment.")
sys.exit(1)

if __name__ == "__main__":
image_consistency_check()
86 changes: 86 additions & 0 deletions oar/image_consistency_check/image.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
import json
import logging
import subprocess

logger = logging.getLogger(__name__)


class ImageMetadata:
"""
Represents an image and its metadata.
"""
def __init__(self, pull_spec):
"""
Initialize the ImageMetadata object.

Args:
pull_spec (str): The pull spec of the image
"""
self.pull_spec = pull_spec
self.metadata = self._get_image_metadata() or {}
self.digest = self.metadata.get('digest', '')
self.listdigest = self.metadata.get('listDigest', '')
self.labels = self.metadata.get('config', {}).get('config', {}).get('Labels', {})
self.build_commit_id = self.labels.get('io.openshift.build.commit.id', '')
self.vcs_ref = self.labels.get('vcs-ref', '')
self.name = self.labels.get('name', '')
self.version = self.labels.get('version', '')
self.release = self.labels.get('release', '')
self.tag = f"{self.version}-{self.release}"

def _get_image_metadata(self) -> dict:
"""
Get the metadata of the image.

Returns:
dict: The metadata of the image
"""
cmd = ["oc", "image", "info", "--filter-by-os", "linux/amd64", "-o", "json", "--insecure=true", self.pull_spec]
result = subprocess.run(cmd, capture_output=True, text=True)
if result.returncode == 0:
return json.loads(result.stdout)
else:
logger.error(f"Command {cmd} returned with error. Return code: {result.returncode}")
logger.error(f"Stderr: {result.stderr}")
return None

def has_same_identifier(self, other) -> bool:
"""
Check if the image matches another image.

Args:
other (ImageMetadata): The other image to compare to

Returns:
bool: True if the images match, False otherwise
"""
if self.listdigest != "" and self.listdigest == other.listdigest:
return True
if self.digest != "" and self.digest == other.digest:
return True
if self.vcs_ref != "" and self.vcs_ref == other.vcs_ref:
return True
return False

def has_same_name(self, other) -> bool:
"""
Check if the image has the same name as another image.

Args:
other (ImageMetadata): The other image to compare to

Returns:
bool: True if the images have the same name, False otherwise
"""
return self.name != "" and self.name == other.name

def log_pullspec_details(self) -> None:
"""
Log the details of the image pullspec.
"""
logger.debug(f"Digest: {self.digest}")
logger.debug(f"Listdigest: {self.listdigest}")
logger.debug(f"Build commit ID: {self.build_commit_id}")
logger.debug(f"VCS ref: {self.vcs_ref}")
logger.debug(f"Name: {self.name}")
logger.debug(f"Tag: {self.tag}")
58 changes: 58 additions & 0 deletions oar/image_consistency_check/payload.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
import logging
import subprocess
import json

logger = logging.getLogger(__name__)


class Payload:
"""
Represents an OpenShift release payload and provides methods to get the image pullspecs.

Class Attributes:
SKIPPED_TAGS (set[str]): Set of tag names to skip when extracting images.
"""

SKIPPED_TAGS = {
"machine-os-content",
"rhel-coreos",
"rhel-coreos-extensions",
}

def __init__(self, payload_url: str):
"""
Initialize the Payload object.

Args:
payload_url (str): The URL of the OpenShift release payload
"""
self._url = payload_url

def get_image_pullspecs(self) -> list[str]:
"""
Fetch image pullspecs from the payload URL, skipping unwanted tags.

Returns:
list[str]: List of container image pullspecs extracted from the payload
"""
cmd = ["oc", "adm", "release", "info", "--pullspecs", self._url, "-o", "json"]
logger.debug(f"Running command: {' '.join(cmd)}")

result = subprocess.run(cmd, capture_output=True, text=True, check=True)
build_data = json.loads(result.stdout)

pullspecs = []
tags = build_data['references']['spec']['tags']
logger.debug(f"Found {len(tags)} tags in payload")

for tag in tags:
tag_name = tag['name']
if tag_name in self.SKIPPED_TAGS:
logger.debug(f"Skipping tag: {tag_name}")
continue

pullspec_name = tag['from']['name']
logger.debug(f"Adding pullspec: {pullspec_name}")
pullspecs.append(pullspec_name)

return pullspecs
Loading