From 4d64c251929dabcf1a11760d37476c3b18130fc7 Mon Sep 17 00:00:00 2001 From: Vincent Shen Date: Fri, 20 Mar 2026 02:35:37 -0700 Subject: [PATCH] Add Python CEL content bundler and integrate into Konflux build --- ...nce-operator-content-konflux.Containerfile | 14 + tests/unit/utils/test_cel_bundler.py | 317 ++++++++++++++++++ utils/cel_bundler.py | 167 +++++++++ 3 files changed, 498 insertions(+) create mode 100644 tests/unit/utils/test_cel_bundler.py create mode 100644 utils/cel_bundler.py diff --git a/Dockerfiles/compliance-operator-content-konflux.Containerfile b/Dockerfiles/compliance-operator-content-konflux.Containerfile index 1ad5605840c1..ff770d4824c3 100644 --- a/Dockerfiles/compliance-operator-content-konflux.Containerfile +++ b/Dockerfiles/compliance-operator-content-konflux.Containerfile @@ -88,6 +88,19 @@ RUN if [ "$(uname -m)" = "x86_64" ] || [ "$(uname -m)" = "aarch64" ] || [ "$(una else ./build_product ocp4 --datastream-only; \ fi +# Bundle CEL rules and profiles into a single content YAML. +# The cel-rules/ and cel-profiles/ directories are expected in the build/ +# directory at build time. If they are absent, create an empty placeholder +# so the COPY in the final stage always succeeds. +RUN if [ -d build/cel-rules ] && [ -d build/cel-profiles ]; then \ + python3 utils/cel_bundler.py \ + --rules build/cel-rules \ + --profiles build/cel-profiles \ + --output build/cel-content.yaml; \ + else \ + touch build/cel-content.yaml; \ + fi + FROM registry.redhat.io/ubi9/ubi-minimal:latest LABEL \ @@ -110,3 +123,4 @@ LABEL \ WORKDIR / COPY --from=builder /go/src/github.com/ComplianceAsCode/content/LICENSE /licenses/LICENSE COPY --from=builder /go/src/github.com/ComplianceAsCode/content/build/ssg-*-ds.xml . +COPY --from=builder /go/src/github.com/ComplianceAsCode/content/build/cel-content.yaml . 
diff --git a/tests/unit/utils/test_cel_bundler.py b/tests/unit/utils/test_cel_bundler.py new file mode 100644 index 000000000000..8084f2d0e29e --- /dev/null +++ b/tests/unit/utils/test_cel_bundler.py @@ -0,0 +1,317 @@ +import os +import textwrap + +import pytest +import yaml + +from utils.cel_bundler import bundle_from_dirs, bundle_to_file, bundle_to_yaml + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + +def _write(path, content): + os.makedirs(os.path.dirname(path), exist_ok=True) + with open(path, "w") as fh: + fh.write(textwrap.dedent(content)) + + +def _make_rule(tmp_path, filename, name="my-rule", expression='"true"', + inputs=None): + """Write a minimal valid rule YAML and return its path.""" + if inputs is None: + inputs = ( + "inputs:\n" + " - name: pods\n" + " kubernetesInputSpec:\n" + " apiVersion: v1\n" + " resource: pods\n" + ) + content = ( + f"name: {name}\n" + f"id: {name.replace('-', '_')}\n" + f"title: Title for {name}\n" + f"severity: medium\n" + f"checkType: Platform\n" + f"expression: {expression}\n" + f"{inputs}" + ) + path = os.path.join(tmp_path, "rules", filename) + _write(path, content) + return path + + +def _make_profile(tmp_path, filename, name="my-profile", rules=None): + """Write a minimal valid profile YAML and return its path.""" + if rules is None: + rules = ["my-rule"] + rules_yaml = "\n".join(f" - {r}" for r in rules) + content = ( + f"name: {name}\n" + f"id: {name.replace('-', '_')}\n" + f"title: Title for {name}\n" + f"productType: Platform\n" + f"rules:\n{rules_yaml}\n" + ) + path = os.path.join(tmp_path, "profiles", filename) + _write(path, content) + return path + + +def _setup_dirs(tmp_path): + rules_dir = os.path.join(tmp_path, "rules") + profiles_dir = os.path.join(tmp_path, "profiles") + os.makedirs(rules_dir, exist_ok=True) + os.makedirs(profiles_dir, exist_ok=True) + return rules_dir, 
profiles_dir + + +# --------------------------------------------------------------------------- +# Happy-path tests +# --------------------------------------------------------------------------- + +class TestBundleFromDirs: + def test_single_rule_single_profile(self, tmp_path): + rules_dir, profiles_dir = _setup_dirs(tmp_path) + _make_rule(tmp_path, "r.yaml") + _make_profile(tmp_path, "p.yaml") + + bundle = bundle_from_dirs(rules_dir, profiles_dir) + assert len(bundle["rules"]) == 1 + assert len(bundle["profiles"]) == 1 + assert bundle["rules"][0]["name"] == "my-rule" + assert bundle["profiles"][0]["name"] == "my-profile" + + def test_rules_sorted_by_name(self, tmp_path): + rules_dir, profiles_dir = _setup_dirs(tmp_path) + _make_rule(tmp_path, "z.yaml", name="z-rule") + _make_rule(tmp_path, "a.yaml", name="a-rule") + _make_profile(tmp_path, "p.yaml", rules=["a-rule", "z-rule"]) + + bundle = bundle_from_dirs(rules_dir, profiles_dir) + assert [r["name"] for r in bundle["rules"]] == ["a-rule", "z-rule"] + + def test_profiles_sorted_by_name(self, tmp_path): + rules_dir, profiles_dir = _setup_dirs(tmp_path) + _make_rule(tmp_path, "r.yaml") + _make_profile(tmp_path, "z.yaml", name="z-profile") + _make_profile(tmp_path, "a.yaml", name="a-profile") + + bundle = bundle_from_dirs(rules_dir, profiles_dir) + assert [p["name"] for p in bundle["profiles"]] == [ + "a-profile", "z-profile" + ] + + def test_non_yaml_files_ignored(self, tmp_path): + rules_dir, profiles_dir = _setup_dirs(tmp_path) + _make_rule(tmp_path, "r.yaml") + _write(os.path.join(rules_dir, "README.md"), "# ignore me") + _write(os.path.join(profiles_dir, ".gitkeep"), "") + _make_profile(tmp_path, "p.yaml") + + bundle = bundle_from_dirs(rules_dir, profiles_dir) + assert len(bundle["rules"]) == 1 + + def test_yml_extension_accepted(self, tmp_path): + rules_dir, profiles_dir = _setup_dirs(tmp_path) + _make_rule(tmp_path, "r.yml") + _make_profile(tmp_path, "p.yml") + + bundle = bundle_from_dirs(rules_dir, 
profiles_dir) + assert len(bundle["rules"]) == 1 + + def test_rule_fields_preserved(self, tmp_path): + rules_dir, profiles_dir = _setup_dirs(tmp_path) + rule_yaml = textwrap.dedent("""\ + name: check-pods + id: check_pods + title: Check pods + description: Ensure pods exist. + rationale: Pods are important. + severity: high + checkType: Platform + expression: "pods.items.size() > 0" + inputs: + - name: pods + kubernetesInputSpec: + apiVersion: v1 + resource: pods + resourceNamespace: default + failureReason: No pods found. + instructions: Run oc get pods. + controls: + NIST-800-53: + - AC-6 + - CM-7 + CIS-OCP: + - "5.7.4" + """) + _write(os.path.join(rules_dir, "r.yaml"), rule_yaml) + _make_profile(tmp_path, "p.yaml", name="p", rules=["check-pods"]) + + bundle = bundle_from_dirs(rules_dir, profiles_dir) + rule = bundle["rules"][0] + assert rule["id"] == "check_pods" + assert rule["severity"] == "high" + assert rule["description"] == "Ensure pods exist." + assert rule["rationale"] == "Pods are important." + assert rule["failureReason"] == "No pods found." + assert rule["instructions"] == "Run oc get pods." 
+ assert rule["inputs"][0]["kubernetesInputSpec"]["resourceNamespace"] == "default" + assert rule["controls"]["NIST-800-53"] == ["AC-6", "CM-7"] + assert rule["controls"]["CIS-OCP"] == ["5.7.4"] + + def test_profile_version_preserved(self, tmp_path): + rules_dir, profiles_dir = _setup_dirs(tmp_path) + _make_rule(tmp_path, "r.yaml") + profile_yaml = textwrap.dedent("""\ + name: my-profile + id: my_profile + title: My Profile + productType: Platform + version: "1.2.3" + rules: + - my-rule + values: + - var-timeout + """) + _write(os.path.join(profiles_dir, "p.yaml"), profile_yaml) + + bundle = bundle_from_dirs(rules_dir, profiles_dir) + profile = bundle["profiles"][0] + assert profile["version"] == "1.2.3" + assert profile["values"] == ["var-timeout"] + + +# --------------------------------------------------------------------------- +# Validation error tests +# --------------------------------------------------------------------------- + +class TestBundleValidation: + def test_rule_missing_name(self, tmp_path): + rules_dir, profiles_dir = _setup_dirs(tmp_path) + _write( + os.path.join(rules_dir, "r.yaml"), + 'id: x\nseverity: medium\ncheckType: Platform\n' + 'expression: "true"\ninputs:\n - name: x\n' + ' kubernetesInputSpec:\n apiVersion: v1\n' + ' resource: pods\n', + ) + with pytest.raises(ValueError, match="has no name"): + bundle_from_dirs(rules_dir, profiles_dir) + + def test_rule_missing_expression(self, tmp_path): + rules_dir, profiles_dir = _setup_dirs(tmp_path) + _write( + os.path.join(rules_dir, "r.yaml"), + 'name: x\nid: x\nseverity: medium\ncheckType: Platform\n' + 'inputs:\n - name: x\n kubernetesInputSpec:\n' + ' apiVersion: v1\n resource: pods\n', + ) + with pytest.raises(ValueError, match="has no expression"): + bundle_from_dirs(rules_dir, profiles_dir) + + def test_rule_missing_inputs(self, tmp_path): + rules_dir, profiles_dir = _setup_dirs(tmp_path) + _write( + os.path.join(rules_dir, "r.yaml"), + 'name: x\nid: x\nseverity: medium\ncheckType: 
Platform\n' + 'expression: "true"\n', + ) + with pytest.raises(ValueError, match="has no inputs"): + bundle_from_dirs(rules_dir, profiles_dir) + + def test_profile_missing_name(self, tmp_path): + rules_dir, profiles_dir = _setup_dirs(tmp_path) + _make_rule(tmp_path, "r.yaml") + _write( + os.path.join(profiles_dir, "p.yaml"), + "id: x\ntitle: X\nrules:\n - my-rule\n", + ) + with pytest.raises(ValueError, match="has no name"): + bundle_from_dirs(rules_dir, profiles_dir) + + def test_profile_missing_rules(self, tmp_path): + rules_dir, profiles_dir = _setup_dirs(tmp_path) + _make_rule(tmp_path, "r.yaml") + _write( + os.path.join(profiles_dir, "p.yaml"), + "name: p\nid: p\ntitle: P\n", + ) + with pytest.raises(ValueError, match="has no rules"): + bundle_from_dirs(rules_dir, profiles_dir) + + def test_duplicate_rule_name(self, tmp_path): + rules_dir, profiles_dir = _setup_dirs(tmp_path) + _make_rule(tmp_path, "a.yaml", name="dup") + _make_rule(tmp_path, "b.yaml", name="dup") + _make_profile(tmp_path, "p.yaml", rules=["dup"]) + + with pytest.raises(ValueError, match="duplicate rule name"): + bundle_from_dirs(rules_dir, profiles_dir) + + def test_unknown_rule_reference(self, tmp_path): + rules_dir, profiles_dir = _setup_dirs(tmp_path) + _make_rule(tmp_path, "r.yaml", name="real-rule") + _make_profile(tmp_path, "p.yaml", name="p", + rules=["real-rule", "ghost-rule"]) + + with pytest.raises(ValueError, match="unknown rule"): + bundle_from_dirs(rules_dir, profiles_dir) + + +# --------------------------------------------------------------------------- +# Serialization tests +# --------------------------------------------------------------------------- + +class TestBundleToYAML: + def test_roundtrip(self, tmp_path): + rules_dir, profiles_dir = _setup_dirs(tmp_path) + _make_rule(tmp_path, "r.yaml") + _make_profile(tmp_path, "p.yaml") + + bundle = bundle_from_dirs(rules_dir, profiles_dir) + yaml_str = bundle_to_yaml(bundle) + loaded = yaml.safe_load(yaml_str) + + assert 
len(loaded["rules"]) == 1 + assert len(loaded["profiles"]) == 1 + assert loaded["rules"][0]["name"] == "my-rule" + assert loaded["profiles"][0]["name"] == "my-profile" + + def test_keys_sorted_alphabetically(self, tmp_path): + rules_dir, profiles_dir = _setup_dirs(tmp_path) + _make_rule(tmp_path, "r.yaml") + _make_profile(tmp_path, "p.yaml") + + bundle = bundle_from_dirs(rules_dir, profiles_dir) + yaml_str = bundle_to_yaml(bundle) + + assert yaml_str.index("profiles:") < yaml_str.index("rules:") + rule_block = yaml_str[yaml_str.index("rules:"):] + assert rule_block.index("checkType") < rule_block.index("name") + + +class TestBundleToFile: + def test_writes_valid_yaml(self, tmp_path): + rules_dir, profiles_dir = _setup_dirs(tmp_path) + _make_rule(tmp_path, "r.yaml") + _make_profile(tmp_path, "p.yaml") + output = os.path.join(tmp_path, "bundle.yaml") + + bundle_to_file(rules_dir, profiles_dir, output) + + with open(output) as fh: + loaded = yaml.safe_load(fh) + assert len(loaded["rules"]) == 1 + assert len(loaded["profiles"]) == 1 + + def test_error_propagates(self, tmp_path): + rules_dir, profiles_dir = _setup_dirs(tmp_path) + _write(os.path.join(rules_dir, "bad.yaml"), "id: no-name\n") + output = os.path.join(tmp_path, "bundle.yaml") + + with pytest.raises(ValueError, match="has no name"): + bundle_to_file(rules_dir, profiles_dir, output) + assert not os.path.exists(output) diff --git a/utils/cel_bundler.py b/utils/cel_bundler.py new file mode 100644 index 000000000000..fc5e92d1677d --- /dev/null +++ b/utils/cel_bundler.py @@ -0,0 +1,167 @@ +#!/usr/bin/env python3 +"""Bundle individual CEL rule and profile YAML files into a single content file. + +This is the Python equivalent of the Go cel-bundler in the compliance-operator +repository (cmd/cel-bundler + pkg/celcontent). It reads individual rule and +profile YAML files, validates them, and produces a single bundle YAML that +the compliance-operator parser consumes via ProfileBundle.celContentFile. 
# -- YAML key ordering --------------------------------------------------------
# The Go bundler (sigs.k8s.io/yaml) emits map keys in alphabetical order.
# bundle_to_yaml() dumps with sort_keys=True, so the alphabetical order is
# enforced at every nesting level by the dumper itself.  The two lists below
# additionally act as allow-lists: a top-level key that is not named here is
# dropped from the emitted rule/profile.

_RULE_KEY_ORDER = [
    "checkType", "controls", "description", "expression", "failureReason",
    "id", "inputs", "instructions", "name", "rationale", "severity", "title",
    "variables",
]

_PROFILE_KEY_ORDER = [
    "description", "id", "name", "productName", "productType", "rules",
    "title", "values", "version",
]

# File extensions (compared lower-cased) that are treated as YAML content.
_YAML_EXTENSIONS = (".yaml", ".yml")


def _ordered_rule_dict(rule):
    """Return a new dict with the known keys of *rule* in alphabetical order.

    Only keys listed in ``_RULE_KEY_ORDER`` are copied; any other key present
    in *rule* is left out of the result.  *rule* itself is not modified.
    """
    return {k: rule[k] for k in _RULE_KEY_ORDER if k in rule}


def _ordered_profile_dict(profile):
    """Return a new dict with the known keys of *profile* in alphabetical order.

    Only keys listed in ``_PROFILE_KEY_ORDER`` are copied; any other key
    present in *profile* is left out of the result.
    """
    return {k: profile[k] for k in _PROFILE_KEY_ORDER if k in profile}


# -- Loading helpers -----------------------------------------------------------

def _list_yaml_files(directory):
    """Return the sorted paths of the .yaml/.yml files directly in *directory*.

    Sub-directories are skipped even when their name ends in a YAML
    extension; the extension check is case-insensitive.  The listing is not
    recursive.

    Raises:
        OSError: if *directory* cannot be listed (e.g. it does not exist).
    """
    result = []
    for name in sorted(os.listdir(directory)):
        path = os.path.join(directory, name)
        if os.path.isdir(path):
            continue
        if os.path.splitext(name)[1].lower() in _YAML_EXTENSIONS:
            result.append(path)
    return result


def _load_yaml_mapping(path):
    """Parse the YAML file at *path* and return its top-level mapping.

    Raises:
        ValueError: if the document is empty or its top level is not a
            mapping (a bare list or scalar would otherwise fail later with
            an unrelated AttributeError on ``.get``).
    """
    # Explicit encoding so parsing does not depend on the build locale.
    with open(path, "r", encoding="utf-8") as fh:
        data = yaml.safe_load(fh)
    if not isinstance(data, dict) or not data:
        raise ValueError(f"empty or invalid YAML in {path}")
    return data


def _load_rules(rules_dir):
    """Load and validate every rule file in *rules_dir*.

    Returns:
        The parsed rule mappings, sorted by their ``name``.

    Raises:
        ValueError: if a file is empty / not a mapping, or a rule lacks a
            non-empty ``name``, ``expression`` or ``inputs``.
    """
    rules = []
    for path in _list_yaml_files(rules_dir):
        data = _load_yaml_mapping(path)
        if not data.get("name"):
            raise ValueError(f"rule in {path} has no name")
        # Checked in this order so the first missing field is the one reported.
        for field in ("expression", "inputs"):
            if not data.get(field):
                raise ValueError(
                    f"rule {data['name']!r} in {path} has no {field}"
                )
        rules.append(data)
    rules.sort(key=lambda r: r["name"])
    return rules


def _load_profiles(profiles_dir):
    """Load and validate every profile file in *profiles_dir*.

    Returns:
        The parsed profile mappings, sorted by their ``name``.

    Raises:
        ValueError: if a file is empty / not a mapping, or a profile lacks a
            non-empty ``name`` or ``rules``, or ``rules`` is not a list.
    """
    profiles = []
    for path in _list_yaml_files(profiles_dir):
        data = _load_yaml_mapping(path)
        if not data.get("name"):
            raise ValueError(f"profile in {path} has no name")
        if not data.get("rules"):
            raise ValueError(
                f"profile {data['name']!r} in {path} has no rules"
            )
        # A scalar such as ``rules: my-rule`` would otherwise be iterated
        # character by character during cross-validation, and a mapping key
        # by key, producing a misleading error or a malformed bundle.
        if not isinstance(data["rules"], list):
            raise ValueError(
                f"profile {data['name']!r} in {path} has a rules field "
                f"that is not a list"
            )
        profiles.append(data)
    profiles.sort(key=lambda p: p["name"])
    return profiles


# -- Public API ----------------------------------------------------------------

def bundle_from_dirs(rules_dir, profiles_dir):
    """Load, validate, and return a bundle dict with *rules* and *profiles*.

    Args:
        rules_dir: directory containing one YAML file per rule.
        profiles_dir: directory containing one YAML file per profile.

    Returns:
        ``{"rules": [...], "profiles": [...]}`` where both lists hold the
        parsed mappings sorted by name.

    Raises:
        ValueError: if any file fails validation, two rules share a name, or
            a profile references a rule name that was not loaded.
        OSError: if either directory cannot be listed or a file cannot be
            read.
    """
    rules = _load_rules(rules_dir)
    profiles = _load_profiles(profiles_dir)

    rule_names = set()
    for rule in rules:
        name = rule["name"]
        if name in rule_names:
            raise ValueError(f"duplicate rule name: {name}")
        rule_names.add(name)

    # NOTE(review): duplicate profile names are not rejected here; confirm
    # whether the Go bundler treats them as an error before adding a check.
    for profile in profiles:
        for rule_ref in profile["rules"]:
            if rule_ref not in rule_names:
                raise ValueError(
                    f"profile {profile['name']!r} references unknown rule {rule_ref!r}"
                )

    return {
        "rules": rules,
        "profiles": profiles,
    }


def bundle_to_yaml(bundle):
    """Serialize a bundle dict to a YAML string with sorted keys.

    Each rule and profile is first reduced to its known keys (see
    ``_RULE_KEY_ORDER`` / ``_PROFILE_KEY_ORDER``); the dump itself sorts keys
    alphabetically and uses block style.
    """
    ordered = {
        "profiles": [_ordered_profile_dict(p) for p in bundle["profiles"]],
        "rules": [_ordered_rule_dict(r) for r in bundle["rules"]],
    }
    return yaml.dump(ordered, default_flow_style=False, sort_keys=True)


def bundle_to_file(rules_dir, profiles_dir, output_path):
    """Load rules/profiles, validate, and write the bundle YAML to a file.

    The output file is only opened after loading, validation and
    serialization have all succeeded, so a failing input never leaves a
    partial or empty file behind.

    Raises:
        ValueError: propagated from :func:`bundle_from_dirs`.
        OSError: if an input cannot be read or *output_path* cannot be
            written.
    """
    bundle = bundle_from_dirs(rules_dir, profiles_dir)
    content = bundle_to_yaml(bundle)
    # Explicit encoding so the written bytes do not depend on the locale.
    with open(output_path, "w", encoding="utf-8") as fh:
        fh.write(content)


# -- CLI -----------------------------------------------------------------------

def main():
    """Command-line entry point: parse arguments and write the bundle.

    Prints ``Error: ...`` to stderr and exits with status 1 on any failure;
    prints the generated path to stdout on success.
    """
    parser = argparse.ArgumentParser(
        description="Bundle CEL rules and profiles into a single YAML file."
    )
    parser.add_argument(
        "--rules", required=True, help="Path to the CEL rules directory"
    )
    parser.add_argument(
        "--profiles", required=True, help="Path to the CEL profiles directory"
    )
    parser.add_argument(
        "--output", required=True, help="Output path for the bundled YAML file"
    )
    args = parser.parse_args()

    # Top-level boundary: every failure becomes a one-line message plus a
    # non-zero exit status so the container build step fails visibly.
    try:
        bundle_to_file(args.rules, args.profiles, args.output)
    except Exception as exc:
        print(f"Error: {exc}", file=sys.stderr)
        sys.exit(1)

    print(f"Generated {args.output}")


if __name__ == "__main__":
    main()