diff --git a/.github/workflows/export-to-gemara.yml b/.github/workflows/export-to-gemara.yml new file mode 100644 index 00000000000..0083468006f --- /dev/null +++ b/.github/workflows/export-to-gemara.yml @@ -0,0 +1,195 @@ +name: Export NIST 800-53 Controls to Gemara Format + +on: + push: + branches: + - master + paths: + - 'products/rhel8/controls/nist_800_53/**' + - 'products/rhel9/controls/nist_800_53/**' + - 'products/rhel10/controls/nist_800_53/**' + - 'utils/nist_sync/export_to_gemara.py' + - 'utils/nist_sync/gemara/**' + - 'utils/nist_sync/data/nist_800_53_rev5_catalog.json' + - 'utils/nist_sync/data/nist_800_53_rev5_*_baseline.json' + schedule: + # Run every Wednesday at 03:17 UTC (off-peak, avoids :00/:30 fleet collisions) + - cron: '17 3 * * 3' + workflow_dispatch: + inputs: + products: + description: 'Comma-separated list of products to export' + required: false + default: 'rhel8,rhel9,rhel10' + validate: + description: 'Run CUE schema validation after export' + required: false + default: 'true' + type: choice + options: + - 'true' + - 'false' + +# Least privilege: nothing in this workflow writes to the repository with GITHUB_TOKEN. +permissions: + contents: read + +jobs: + export-to-gemara: + name: Export NIST 800-53 to Gemara + runs-on: ubuntu-latest + container: + image: fedora:latest + + steps: + - name: Install system dependencies + run: | + dnf install -y \ + git \ + python3 \ + python3-pip \ + python3-jinja2 \ + python3-pyyaml \ + python3-setuptools \ + curl + + - name: Checkout repository + uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v4 + with: + fetch-depth: 0 + + - name: Configure git safe directory + run: git config --global --add safe.directory "$GITHUB_WORKSPACE" + + - name: Install Python dependencies + run: | + pip install --upgrade pip + pip install ruamel.yaml + + - name: Install CUE binary + run: | + CUE_VERSION="v0.16.1" + # -f makes curl exit non-zero on an HTTP error instead of piping an error page into tar. + curl -fsSL \ + "https://github.com/cue-lang/cue/releases/download/${CUE_VERSION}/cue_${CUE_VERSION}_linux_amd64.tar.gz" \ + | tar -xz -C /usr/local/bin cue + cue version + + - name: Clone Gemara schema repository + 
run: | + git clone --depth 1 https://github.com/gemaraproj/gemara.git /tmp/gemara + + - name: Determine export configuration + id: config + env: + INPUT_PRODUCTS: ${{ inputs.products }} + INPUT_VALIDATE: ${{ inputs.validate }} + run: | + # Inputs come in through env (never inline expressions) so their text is not parsed as shell; both are empty on push/schedule, so the defaults apply. + PRODUCTS="${INPUT_PRODUCTS:-rhel8,rhel9,rhel10}" + echo "products=${PRODUCTS}" >> "$GITHUB_OUTPUT" + + if [ "${INPUT_VALIDATE}" = "false" ]; then + echo "validate=false" >> "$GITHUB_OUTPUT" + else + echo "validate=true" >> "$GITHUB_OUTPUT" + fi + + - name: Export NIST 800-53 controls to Gemara format + id: export + env: + PYTHONPATH: ${{ github.workspace }} + PRODUCTS: ${{ steps.config.outputs.products }} + run: | + mkdir -p build/gemara + python3 utils/nist_sync/export_to_gemara.py \ + --products "${PRODUCTS}" \ + --output-dir build/gemara \ + --oscal-catalog utils/nist_sync/data/nist_800_53_rev5_catalog.json \ + --data-dir utils/nist_sync/data \ + --verbose + + - name: Validate output against Gemara CUE schema + if: steps.config.outputs.validate == 'true' + env: + PYTHONPATH: ${{ github.workspace }} + PRODUCTS: ${{ steps.config.outputs.products }} + run: | + python3 utils/nist_sync/export_to_gemara.py \ + --products "${PRODUCTS}" \ + --output-dir build/gemara \ + --oscal-catalog utils/nist_sync/data/nist_800_53_rev5_catalog.json \ + --data-dir utils/nist_sync/data \ + --validate \ + --gemara-schema /tmp/gemara \ + --no-mapping + + - name: Write job summary + if: always() + run: | + echo "## Gemara Export Summary" >> "$GITHUB_STEP_SUMMARY" + echo "" >> "$GITHUB_STEP_SUMMARY" + if [ -f build/gemara/metadata.json ]; then + echo "### Statistics" >> "$GITHUB_STEP_SUMMARY" + echo '```json' >> "$GITHUB_STEP_SUMMARY" + cat build/gemara/metadata.json >> "$GITHUB_STEP_SUMMARY" + echo '```' >> "$GITHUB_STEP_SUMMARY" + fi + echo "" >> "$GITHUB_STEP_SUMMARY" + echo "### Output files" >> "$GITHUB_STEP_SUMMARY" + find build/gemara -type f | sort | while read -r f; do + SIZE=$(wc -l < 
"$f") + echo "- \`${f}\` (${SIZE} lines)" >> "$GITHUB_STEP_SUMMARY" + done + + - name: Upload Gemara export artifacts + uses: actions/upload-artifact@043fb46d1a93c77aae656e7c1c64a875d1fc6a0a # v4 + if: always() + with: + name: gemara-export-${{ github.run_number }} + path: build/gemara/ + retention-days: 90 + + # Optional: push the generated files to a dedicated Gemara data repository. + # + # Prerequisites: + # 1. Create the target repository (e.g. ComplianceAsCode/gemara-data). + # 2. Add a PAT or GitHub App token with write access as secret GEMARA_DATA_REPO_TOKEN (the step below authenticates over HTTPS, so an SSH deploy key will not work). + # 3. Set secret GEMARA_DATA_REPO to "<owner>/<repo>" (e.g. ComplianceAsCode/gemara-data). + # 4. Uncomment the step below. + # + # - name: Push to Gemara data repository + # if: >- + # github.repository == 'ComplianceAsCode/content' && + # (github.event_name == 'schedule' || github.event_name == 'workflow_dispatch') && + # steps.export.outcome == 'success' + # env: + # GEMARA_DATA_REPO: ${{ secrets.GEMARA_DATA_REPO }} + # GEMARA_DATA_REPO_TOKEN: ${{ secrets.GEMARA_DATA_REPO_TOKEN }} + # GIT_AUTHOR_NAME: github-actions[bot] + # GIT_AUTHOR_EMAIL: github-actions[bot]@users.noreply.github.com + # GIT_COMMITTER_NAME: github-actions[bot] + # GIT_COMMITTER_EMAIL: github-actions[bot]@users.noreply.github.com + # run: | + # git clone --depth 1 \ + # "https://x-access-token:${GEMARA_DATA_REPO_TOKEN}@github.com/${GEMARA_DATA_REPO}.git" \ + # /tmp/gemara-data + # for product in rhel8 rhel9 rhel10; do + # mkdir -p "/tmp/gemara-data/data/${product}/nist_800_53" + # cp "build/gemara/${product}/control_catalog.yaml" \ + # "/tmp/gemara-data/data/${product}/nist_800_53/" + # cp "build/gemara/${product}/rules_mapping.yaml" \ + # "/tmp/gemara-data/data/${product}/nist_800_53/" + # done + # # guidance_catalog.yaml is platform-independent — stored at the top level + # cp build/gemara/guidance_catalog.yaml /tmp/gemara-data/data/nist_800_53/ + # cp build/gemara/metadata.json /tmp/gemara-data/metadata.json + # cd /tmp/gemara-data + # git add 
-A + # if git diff --cached --quiet; then + # echo "No changes — gemara-data repository is already up to date." + # else + # SHA="${{ github.sha }}" + # git commit -m "chore: sync from content@${SHA:0:8} (${{ github.event_name }})" + # git push + # fi diff --git a/.gitignore b/.gitignore index 5e3eb1f8bfe..ed1db22987d 100644 --- a/.gitignore +++ b/.gitignore @@ -63,6 +63,9 @@ release_tools/artifacts # Ignore the test profile that utils/add_kubernetes_rule.py creates ocp4/profiles/test.profile +# Ignore the NIST 800-53 tailoring base profile generated by export_to_gemara.py +products/*/profiles/nist_800_53.profile + # Ignore the build profiling files .build_profiling/* diff --git a/utils/nist_sync/TESTING.md b/utils/nist_sync/TESTING.md new file mode 100644 index 00000000000..5285b050169 --- /dev/null +++ b/utils/nist_sync/TESTING.md @@ -0,0 +1,346 @@ +# Testing Gemara NIST 800-53 Export with complyctl + +End-to-end guide for validating the Gemara export against complyctl. +Tests all three NIST baselines (Low, Moderate, High) using the `nist_800_53` XCCDF profile. + +The recommended approach uses a RHEL9 Vagrant VM so that OpenSCAP evaluates actual system +state and compliance findings are meaningful. See the [Vagrant workflow](#vagrant-workflow-realistic-os-scanning) section. + +--- + +## Prerequisites + +### 1. Python dependencies + +```bash +pip install ruamel.yaml +source ./.pyenv.sh # adds ssg/ modules to PYTHONPATH +``` + +### 2. SCAP data stream + +The data stream provides the XCCDF rules that complyctl tailors and OpenSCAP evaluates. + +```bash +# Option A — install from RPM (Fedora/RHEL host) +sudo dnf install scap-security-guide + +# Option B — build from source (this repo) +./build_product rhel9 --datastream +sudo mkdir -p /usr/share/xml/scap/ssg/content +sudo cp build/ssg-rhel9-ds.xml /usr/share/xml/scap/ssg/content/ +``` + +Verify: `/usr/share/xml/scap/ssg/content/ssg-rhel9-ds.xml` exists. + +### 3. 
OSCAL data (for GuidanceCatalog generation) + +The OSCAL catalog is needed to enrich controls with NIST prose. It is gitignored (10 MB). + +```bash +python3 utils/nist_sync/download_oscal.py +``` + +### 4. complyctl binary + +```bash +curl -L https://github.com/complytime/complyctl/releases/download/v1.0.0-alpha.0/complyctl_linux_x86_64.tar.gz \ + | tar -xz -C ~/bin complyctl +chmod +x ~/bin/complyctl +complyctl version +``` + +### 5. complyctl-provider-openscap + +```bash +mkdir -p ~/.complytime/providers +# Download from the complytime releases or build from source +# Place the binary at: ~/.complytime/providers/complyctl-provider-openscap +chmod +x ~/.complytime/providers/complyctl-provider-openscap +``` + +### 6. oras CLI + +Used to push split-layer OCI bundles to the VM's OCI registry. + +```bash +# Fedora/RHEL +sudo dnf install oras + +# Or download from https://oras.land +``` + +--- + +## Step-by-step walkthrough + +Follow these steps to understand exactly what each phase does. + +### Step 1 — Generate Gemara artifacts + +Reads the NIST 800-53 control files for rhel9 and produces three YAML files. + +```bash +source ./.pyenv.sh + +python3 utils/nist_sync/export_to_gemara.py \ + --products rhel9 \ + --output-dir build/gemara \ + --data-dir utils/nist_sync/data \ + --validate +``` + +Output: +``` +build/gemara/ + rhel9/ + control_catalog.yaml # NIST controls → CaC rule IDs (ControlCatalog) + rules_mapping.yaml # rule IDs → NIST controls (MappingDocument) + guidance_catalog.yaml # NIST prose / objectives (GuidanceCatalog, needs OSCAL) +``` + +Verify: `python3 utils/nist_sync/test_gemara_export.py --products rhel9` + +### Step 2 — Build and push per-baseline OCI bundles + +One bundle per baseline. Each contains a Gemara Policy filtered to that baseline's rules. 
+ +```bash +for baseline in low moderate high; do + python3 utils/nist_sync/generate_complyctl_bundle.py \ + --product rhel9 \ + --gemara-dir build/gemara \ + --output-dir "build/gemara-bundle/rhel9/${baseline}" \ + --baseline "$baseline" \ + --base-profile nist_800_53 \ + --registry 127.0.0.1:5500 \ + --tag "nist-800-53-rev5-rhel9-${baseline}:latest" \ + --push --verbose + + echo "Pushed ${baseline} bundle:" + grep -c "requirement-id:" "build/gemara-bundle/rhel9/${baseline}/rhel9_policy.yaml" | \ + xargs echo " assessment-plans:" +done +``` + +Why `nist_800_53` as the base profile? +The profile at `products/rhel9/profiles/nist_800_53.profile` selects **all** NIST-mapped rules +(`nist_800_53:all`). complyctl uses it as the tailoring base and then restricts evaluation to +only the rules present in the Policy's assessment-plans. + +### Step 3 — Verify bundle contents + +```bash +# Inspect the policy for a baseline +python3 -c " +from ruamel.yaml import YAML +y = YAML() +p = y.load(open('build/gemara-bundle/rhel9/moderate/rhel9_policy.yaml')) +plans = p['adherence']['assessment-plans'] +print(f'moderate: {len(plans)} rules') +print('First 5:', [ap[\"id\"] for ap in plans[:5]]) +" +``` + +### Step 4 — Interpret results + +The scan results are in ARF (Assessment Results Format). Use the MappingDocument to +trace rule results back to NIST controls: + +```bash +# Which NIST controls does a passing rule satisfy? +python3 - << 'EOF' +from ruamel.yaml import YAML +y = YAML() +mapping = y.load(open("build/gemara/rhel9/rules_mapping.yaml")) + +rule = "accounts_tmout" +controls = [ + m["source"] for m in mapping["mappings"] + if any(t["entry-id"] == rule for t in m.get("targets", [])) +] +print(f"{rule} → NIST controls: {controls}") +EOF +``` + +--- + +## Architecture notes + +### Why `nist_800_53` profile as the base? + +complyctl uses the base profile as the starting point for XCCDF tailoring. It then enables only +the rules listed in the Policy's assessment-plans. 
The `nist_800_53.profile` selects all +NIST-mapped rules (`nist_800_53:all`), ensuring every assessment-plan rule is available for +tailoring regardless of which baseline is being tested. + +### Why `datastream:` in complytime.yaml? + +Without an explicit datastream path, the OpenSCAP provider reads `ID_LIKE` from +`/etc/os-release` to pick the data stream. On some systems or containers this can resolve +to the wrong file. The `datastream:` variable bypasses auto-detection and pins the path. + +### Per-baseline rule counts (rhel9) + +| Baseline | Rules | Notes | +|----------|-------|-------| +| low | 383 | All rules with any NIST mapping | +| moderate | 22 | Rules that first appear at moderate level | +| high | 4 | Rules that first appear at high level | + +Counts vary with the state of NIST control mappings in the product control files. + +--- + +## Vagrant workflow (realistic OS scanning) + +Mirrors the [complytime-demos](https://github.com/complytime/complytime-demos) pattern: +a RHEL9 VM runs complyctl against its own OS state, giving compliance findings that reflect +a real system rather than a minimal UBI container. + +``` +Host (your laptop / CI machine) + ├── export_to_gemara.py — generates Gemara YAML artifacts + ├── generate_complyctl_bundle.py — builds per-baseline Policy bundle + ├── oras — pushes bundle to VM_IP:5500 (HOST → VM) + └── Ansible — orchestrates everything below + +VM (generic/rhel9 via Vagrant) + ├── openscap-scanner — evaluates XCCDF rules against the real OS + ├── ssg-rhel9-ds.xml — from scap-security-guide RPM (or copied from host) + ├── registry (distribution binary) — OCI registry at 0.0.0.0:5500 (systemd service) + └── complyctl — fetches from localhost:5500, runs scan + +Note: podman is NOT installed in the VM (containers-common conflicts with redhat-release-9.3 +on generic/rhel9 boxes). The distribution/distribution registry binary is used instead. 
+``` + +### Prerequisites + +| Tool | Install | +|------|---------| +| Vagrant | https://developer.hashicorp.com/vagrant/install | +| vagrant-libvirt plugin | `vagrant plugin install vagrant-libvirt` | +| Ansible ≥ 2.14 | `pip install ansible` | +| complyctl binary | see [§4 above](#4-complyctl-binary) | +| complyctl-provider-openscap | see [§5 above](#5-complyctl-provider-openscap) | +| Python deps | `pip install ruamel.yaml` | + +VirtualBox can be used instead of libvirt — Vagrant auto-detects the available provider. + +### Step 1 — Start the VM + +```bash +cd utils/nist_sync/vagrant +vagrant up + +# Vagrant triggers populate_inventory.sh automatically after boot. +# Verify the inventory was written: +cat ansible/inventory.ini +``` + +If the trigger did not run (e.g. permission issue), run it manually: + +```bash +cd utils/nist_sync/vagrant +bash populate_inventory.sh +``` + +### Step 2 — One-time setup + +Install complyctl, the provider, and start the distribution registry binary inside the VM. + +```bash +cd utils/nist_sync + +ansible-playbook -i ansible/inventory.ini ansible/setup.yml \ + -e complyctl_bin=/tmp/complyctl \ + -e provider_bin=~/.complytime/providers/complyctl-provider-openscap +``` + +`setup.yml` also copies `build/ssg-rhel9-ds.xml` to the VM if `scap-security-guide` is not +available from the VM's package repos. + +### Step 3 — Run scans (all baselines) + +```bash +cd utils/nist_sync + +ansible-playbook -i ansible/inventory.ini ansible/scan.yml +``` + +What happens per baseline (low / moderate / high): + +1. **Host**: exports Gemara artifacts (`export_to_gemara.py`) +2. **Host**: generates a filtered Policy bundle (`generate_complyctl_bundle.py --push`) + and pushes it to `VM_IP:5500` via `oras` +3. **VM**: writes `complytime.yaml` pointing to `localhost:5500` +4. **VM**: `complyctl get` pulls bundle metadata +5. **VM**: `complyctl generate` builds a tailored XCCDF profile +6. **VM**: `complyctl scan` runs OpenSCAP against the live RHEL9 OS +7. 
**Host**: results fetched to `build/complyctl-results/rhel9/{baseline}/` + +To test a single baseline: + +```bash +ansible-playbook -i ansible/inventory.ini ansible/scan.yml -e baseline=moderate +``` + +### Step 4 — Inspect results + +```bash +# ARF result (OpenSCAP native format) +ls build/complyctl-results/rhel9/moderate/ + +# Count pass/fail at the rule level +python3 - << 'EOF' +import xml.etree.ElementTree as ET +tree = ET.parse("build/complyctl-results/rhel9/moderate/arf.xml") +ns = {"xccdf": "http://checklists.nist.gov/xccdf/1.2"} +rules = tree.findall(".//xccdf:rule-result", ns) +summary = {} +for r in rules: + result = r.find("xccdf:result", ns) + if result is not None: + summary[result.text] = summary.get(result.text, 0) + 1 +for outcome, count in sorted(summary.items()): + print(f" {outcome:20s}: {count}") +EOF + +# Trace a rule result back to NIST controls +python3 - << 'EOF' +from ruamel.yaml import YAML +y = YAML() +mapping = y.load(open("build/gemara/rhel9/rules_mapping.yaml")) +rule = "accounts_tmout" +controls = [ + m["source"] for m in mapping["mappings"] + if any(t["entry-id"] == rule for t in m.get("targets", [])) +] +print(f"{rule} → NIST controls: {controls}") +EOF +``` + +### Teardown + +```bash +cd utils/nist_sync/vagrant +vagrant halt # power off (preserves disk) +vagrant destroy # remove completely +``` + +--- + +## Troubleshooting + +| Symptom | Cause | Fix | +|---------|-------|-----| +| `0 rules matched` in scan | Wrong base profile or data stream | Verify `nist_800_53` profile exists in the data stream; build from source if needed | +| `oras push failed` | Registry not running in VM | `vagrant ssh -- sudo systemctl restart gemara-registry` | +| `guidance_catalog.yaml` missing | OSCAL data not downloaded | `python3 utils/nist_sync/download_oscal.py` | +| `complyctl: permission denied` | Binary not executable | `chmod +x /path/to/complyctl` | +| Provider not found | Wrong path | Check `~/.complytime/providers/complyctl-provider-openscap` 
| +| `ansible/inventory.ini` empty or stale | VM IP changed after re-provision | `cd vagrant && bash populate_inventory.sh` | +| Registry unreachable from host during push | VM firewall blocks port 5500 | `vagrant ssh -c "sudo firewall-cmd --add-port=5500/tcp --permanent --zone=public && sudo firewall-cmd --reload"` | +| `vagrant up` fails with libvirt errors | libvirt not running | `sudo systemctl start libvirtd` | +| `scap-security-guide` not installed on VM | Unsubscribed RHEL9 box | `setup.yml` copies `build/ssg-rhel9-ds.xml` automatically — build the data stream first: `./build_product rhel9 -d` | diff --git a/utils/nist_sync/WALKTHROUGH.md b/utils/nist_sync/WALKTHROUGH.md new file mode 100644 index 00000000000..3d24d268de8 --- /dev/null +++ b/utils/nist_sync/WALKTHROUGH.md @@ -0,0 +1,781 @@ +# Gemara + complyctl: Full Walkthrough with File Inspection + +End-to-end manual walkthrough for NIST 800-53 compliance scanning on RHEL9. +Shows every file produced at each step with real content excerpts. 
+ +--- + +## Repository files involved + +``` +utils/nist_sync/ + export_to_gemara.py # Step 2 — generates Gemara YAML from CaC content + generate_complyctl_bundle.py # Step 3 — builds per-baseline OCI bundle + pushes + download_oscal.py # Optional — enriches guidance_catalog with NIST prose + TESTING.md # Full prerequisite + usage guide + WALKTHROUGH.md # This file + + vagrant/ + Vagrantfile # RHEL9 scanner VM definition (libvirt) + populate_inventory.sh # Extracts VM IP → writes ansible/inventory.ini + + ansible/ + inventory.ini # AUTO-GENERATED — host address + SSH key + setup.yml # One-time VM setup (complyctl, provider, registry) + scan.yml # Orchestrates Steps 2–7 across all baselines + tasks/scan_baseline.yml # Per-baseline subtasks called by scan.yml + templates/complytime.yaml.j2 # complyctl config template written to VM + +products/rhel9/ + profiles/nist_800_53.profile # AUTO-GENERATED by export_to_gemara.py — gitignored, do not edit + +controls/nist_800_53.yml # NIST 800-53 control → rule mappings (source of truth) + +build/ # Generated — gitignored + ssg-rhel9-ds.xml # Step 1 output — SCAP data stream with nist_800_53 profile + gemara/ + guidance_catalog.yaml # Step 2 output — NIST control prose (needs OSCAL data) + rhel9/ + control_catalog.yaml # Step 2 output — controls + rule IDs + applicability + rules_mapping.yaml # Step 2 output — rule IDs ↔ NIST control IDs + gemara-bundle/rhel9/{baseline}/ + rhel9_policy.yaml # Step 3 output — complyctl Policy (assessment-plans) + rhel9_catalog.yaml # Step 3 output — filtered ControlCatalog for baseline + complytime.yaml # Step 3 reference — local test config (not used by Ansible) + HOWTO.txt # Step 3 output — usage instructions + complyctl-results/rhel9/{baseline}/ + arf.xml # Step 7 output — OpenSCAP Assessment Results Format + results.xml # Step 7 output — XCCDF benchmark with rule results + evaluation-log-*.yaml # Step 7 output — complyctl structured evaluation log + report.html # Step 8 output — oscap HTML 
report (human-readable) +``` + +--- + +## Step 0 — Prerequisites + +### Tools needed (on your host machine) + +| Tool | Purpose | Get it | +|------|---------|--------| +| Python 3 + `ruamel.yaml` | Run export/bundle scripts | `pip install ruamel.yaml` | +| complyctl v1.0.0-alpha.0 | Fetch/generate/scan workflow | `~/bin/complyctl` | +| complyctl-provider-openscap | OpenSCAP backend for complyctl | `~/.complytime/providers/` | +| oras | Pushes OCI bundles to registry | `dnf install oras` | +| Vagrant + vagrant-libvirt | RHEL9 VM (Vagrant workflow) | `vagrant plugin install vagrant-libvirt` | +| Ansible ≥ 2.14 | Orchestrates VM setup + scan | `pip install ansible` | + +```bash +# Verify all tools before starting +complyctl version # should print 1.0.0-alpha.0 +oras version # should print oras/v1.x +vagrant --version +ansible --version +python3 -c "import ruamel.yaml; print('ok')" +``` + +--- + +## Step 1 — Build the SCAP data stream + +The data stream is the source of XCCDF rule definitions. The system RPM +(`scap-security-guide`) does NOT contain the `nist_800_53` profile — always +build from source. + +**Prerequisite — generate the profile first (Step 2 does this automatically):** + +`products/rhel9/profiles/nist_800_53.profile` is not committed to the repository. +It is generated by `export_to_gemara.py` (Step 2) and listed in `.gitignore`. The +generated content is deterministic and trivial — it just selects every rule touched +by `controls/nist_800_53.yml` via the `nist_800_53:all` selector. complyctl then +narrows the selection to one baseline using the Gemara Policy's `assessment-plans`. 
+ +Run Step 2 first (or standalone): + +```bash +source ./.pyenv.sh +python3 utils/nist_sync/export_to_gemara.py --products rhel9 --output-dir build/gemara +``` + +This writes (among other files): + +```yaml +# products/rhel9/profiles/nist_800_53.profile (gitignored — do not commit) +documentation_complete: true +title: 'NIST SP 800-53 Rev 5' +description: |- + Contains all rules mapped to NIST SP 800-53 Revision 5 controls in + ComplianceAsCode for Red Hat Enterprise Linux 9, across all baselines + (Low, Moderate, High). + + Generated by utils/nist_sync/export_to_gemara.py. Do not edit manually. +platform: rhel9 +selections: + - nist_800_53:all +``` + +**Command:** + +```bash +./build_product rhel9 --datastream +``` + +**Output:** + +``` +build/ssg-rhel9-ds.xml (~28 MB) +``` + +**Verify the profile is present:** + +```bash +grep -c 'nist_800_53' build/ssg-rhel9-ds.xml +# should print a non-zero number +``` + +--- + +## Step 2 — Generate Gemara artifacts + +Reads the NIST 800-53 control mappings from the CaC content and produces +three Gemara-schema YAML files. + +**Source files read:** + +- `controls/nist_800_53.yml` — control IDs, levels (low/moderate/high), and rule lists +- `products/rhel9/product.yml` — product metadata +- `utils/nist_sync/data/nist_800_53_rev5_catalog.json` — NIST prose (optional; download with `download_oscal.py`) + +**Command:** + +```bash +source ./.pyenv.sh +python3 utils/nist_sync/export_to_gemara.py \ + --products rhel9 \ + --output-dir build/gemara \ + --data-dir utils/nist_sync/data \ + --validate +``` + +**Output — three files:** + +### `build/gemara/rhel9/control_catalog.yaml` (ControlCatalog) + +Maps each NIST control to its CaC rule IDs and baseline applicability. +One `controls:` entry per control ID. Each entry lists `assessment-requirements` +(the rule checks that satisfy the control), with `applicability` showing which +baselines require it (`low`, `moderate`, or `high`). 
+ +```yaml +metadata: + id: nist-800-53-rev5-rhel9 + type: ControlCatalog + gemara-version: 1.2.0 + description: NIST Special Publication 800-53 Revision 5 controls for RHEL9, generated from ComplianceAsCode + applicability-groups: + - id: rhel9-low # ← product-scoped: "rhel9-" prefix avoids collisions + title: RHEL9 Low Baseline # when catalogs from multiple products coexist + description: NIST 800-53 Low impact baseline for RHEL9 + - id: rhel9-moderate + title: RHEL9 Moderate Baseline + description: NIST 800-53 Moderate impact baseline for RHEL9 (inherits Low) + - id: rhel9-high + title: RHEL9 High Baseline + description: NIST 800-53 High impact baseline for RHEL9 (inherits Low, Moderate) + +title: NIST Special Publication 800-53 Revision 5 for RHEL9 +groups: +- id: ac + title: Access Control + # ... 20 control families ... + +controls: +- id: ac-2.5 + group: ac + title: Inactivity Logout + objective: 'Require that users log out when {{ insert: param, ac-02.05_odp }}.' + state: Active + assessment-requirements: + - id: accounts_tmout # ← bare rule name (identity of the rule itself) + state: Active + text: "Rule 'accounts_tmout' MUST be verified" + applicability: [rhel9-moderate] # ← only required from moderate baseline up + - id: no_invalid_shell_accounts_unlocked + state: Active + text: "Rule 'no_invalid_shell_accounts_unlocked' MUST be verified" + applicability: [rhel9-moderate] + - id: no_password_auth_for_systemaccounts + state: Active + text: "Rule 'no_password_auth_for_systemaccounts' MUST be verified" + applicability: [rhel9-moderate] + # ... +``` + +**ID design — ControlCatalog vs MappingDocument:** + +`assessment-requirements[].id` uses the **bare rule name** (`accounts_tmout`) because +it identifies the *rule itself* within a control. 
The `rules_mapping.yaml` uses +**compound IDs** (`ac-2.5--accounts_tmout`) because a mapping entry identifies the +*relationship* between a control and a rule — the same rule can appear under multiple +controls and each (control, rule) pair is a distinct relationship. + +### `build/gemara/rhel9/rules_mapping.yaml` (MappingDocument) + +Bidirectional index: given a CaC rule ID, find which NIST controls it satisfies. +Used after scanning to trace a rule PASS/FAIL back to specific controls. + +```yaml +metadata: + id: nist-800-53-rev5-rhel9-rules-mapping + type: MappingDocument + gemara-version: 1.2.0 + +source-reference: + entry-type: Control # ← "source" = the NIST control +target-reference: + entry-type: AssessmentRequirement # ← "target" = the CaC rule + +mappings: +- id: ac-2.5--accounts_tmout + source: ac-2.5 # NIST control ID + relationship: implements + targets: + - entry-id: accounts_tmout # CaC rule (short name, no prefix) + strength: 8 + confidence-level: High + rationale: Automated enforcement via ComplianceAsCode rule + +- id: ac-2.5--no_invalid_shell_accounts_unlocked + source: ac-2.5 + relationship: implements + targets: + - entry-id: no_invalid_shell_accounts_unlocked + strength: 8 + confidence-level: High + rationale: Automated enforcement via ComplianceAsCode rule + # ... (hundreds more mappings) ... +``` + +### `build/gemara/guidance_catalog.yaml` (GuidanceCatalog) + +Optional — only generated when OSCAL data is present (`download_oscal.py`). +Contains the official NIST prose for each control (the "what should be" layer). + +```yaml +metadata: + id: nist-800-53-rev5-guidance + type: GuidanceCatalog + gemara-version: 1.2.0 + author: + id: nist + name: National Institute of Standards and Technology + +controls: +- id: ac-2.5 + title: Inactivity Logout + objective: > + Require that users log out when [Assignment: organization-defined time period + of expected inactivity or description of when to log out]. 
+ guidance: > + Inactivity logout is behavior- or policy-based and requires users to take + physical action to log out when they are expecting inactivity longer than + the defined period. + # ... +``` + +**Verify the export:** + +```bash +python3 utils/nist_sync/test_gemara_export.py --products rhel9 +# prints: PASS for all three Gemara document types +``` + +--- + +## Step 3 — Generate per-baseline OCI bundle + +For each baseline (low / moderate / high), generate a filtered Policy that +contains only the rules applicable to that baseline, then push to an OCI registry. + +**Command (example: moderate baseline):** + +```bash +python3 utils/nist_sync/generate_complyctl_bundle.py \ + --product rhel9 \ + --gemara-dir build/gemara \ + --output-dir build/gemara-bundle/rhel9/moderate \ + --baseline moderate \ + --base-profile nist_800_53 \ + --registry 127.0.0.1:5500 \ + --tag nist-800-53-rev5-rhel9-moderate:latest \ + --push --verbose +``` + +**Output — four files written, one bundle pushed:** + +### `build/gemara-bundle/rhel9/moderate/rhel9_policy.yaml` (Policy) + +The file complyctl reads to know which rules to evaluate. Each `assessment-plans` +entry maps to one OpenSCAP rule check. The `id` field **must** be the short CaC +rule name (no `xccdf_org.ssgproject.content_rule_` prefix). + +```yaml +title: NIST SP 800-53 Rev 5 for Red Hat Enterprise Linux 9 +metadata: + id: nist-800-53-rev5-rhel9-policy + type: Policy + gemara-version: 1.2.0 + description: > + Automated evaluation policy for NIST SP 800-53 Rev 5 on RHEL9. + requirement-id values are short CaC rule names (the OpenSCAP provider adds + the xccdf_org.ssgproject.content_rule_ prefix). 
+ +imports: + catalogs: + - reference-id: nist-800-53-rev5-rhel9 + +adherence: + evaluation-methods: + - id: openscap-automated + type: Behavioral + mode: Automated + executor: + id: openscap + name: OpenSCAP + + assessment-plans: + - id: accounts_tmout # ← short CaC rule name + requirement-id: accounts_tmout # ← same value (required by go-gemara) + frequency: on-demand + evaluation-methods: + - id: openscap-automated + type: Behavioral + mode: Automated + + - id: configure_custom_crypto_policy_cis + requirement-id: configure_custom_crypto_policy_cis + # ... + + # 22 total assessment-plans for moderate baseline +``` + +**Why 22 rules for moderate?** +The generator reads `applicability` from `control_catalog.yaml` and includes +only rules where `applicability` contains the product-scoped baseline key +(e.g., `rhel9-moderate`). Because baselines inherit upward, rules in the +`rhel9-low` applicability group are already covered by a lower-baseline +bundle — the moderate bundle only adds the rules that first appear at +moderate level. +- `rhel9-low` baseline → 383 rules (rules applicable to low-impact systems) +- `rhel9-moderate` baseline → 22 rules (rules first required at moderate level) +- `rhel9-high` baseline → 4 rules (rules first required at high level) + +### `build/gemara-bundle/rhel9/moderate/rhel9_catalog.yaml` (ControlCatalog) + +A subset of `control_catalog.yaml` filtered to the moderate baseline's controls. +Bundled alongside the Policy so complyctl has the full control context. + +```yaml +metadata: + id: nist-800-53-rev5-rhel9 + type: ControlCatalog + gemara-version: 1.2.0 + # ... same header as the full control_catalog.yaml ... + +controls: +# Only controls that have assessment-requirements with applicability: [moderate] +- id: ac-2.5 + group: ac + title: Inactivity Logout + # ... 
+``` + +### OCI bundle pushed to registry + +The bundle is pushed as a two-layer OCI artifact: + +``` +nist-800-53-rev5-rhel9-moderate:latest + └── Layer 1: application/vnd.gemara.policy.v1+yaml (rhel9_policy.yaml) + └── Layer 2: application/vnd.gemara.catalog.v1+yaml (rhel9_catalog.yaml) + Artifact type: application/vnd.gemara.bundle.v1 +``` + +**Verify the bundle is in the registry:** + +```bash +curl -s http://127.0.0.1:5500/v2/nist-800-53-rev5-rhel9-moderate/tags/list +# {"name":"nist-800-53-rev5-rhel9-moderate","tags":["latest"]} +``` + +--- + +## Step 4 — Write `complytime.yaml` + +complyctl needs a configuration file pointing it at the registry and telling it: +- Where to find the policy bundle (OCI registry URL) +- Which XCCDF profile to use as the tailoring base +- Which data stream file to use (bypasses OS auto-detection) + +```yaml +# /root/.complytime/complytime.yaml (inside the VM) +policies: + - url: http://localhost:5500/nist-800-53-rev5-rhel9-moderate + id: nist-800-53-rev5-rhel9-moderate + +targets: + - id: local + policies: + - nist-800-53-rev5-rhel9-moderate + variables: + profile: nist_800_53 + datastream: /usr/share/xml/scap/ssg/content/ssg-rhel9-ds.xml +``` + +**Key gotcha — profile variable constraint:** The `profile` value is validated +against `^[a-zA-Z0-9-_.]+$`. Use the short name only — do NOT use the full XCCDF ID +(`xccdf_org.ssgproject.content_profile_nist_800_53`). + +**Key gotcha — `datastream:` is required:** Without this, the OpenSCAP provider +reads `ID_LIKE` from `/etc/os-release` to pick the data stream, which can resolve +to the wrong file. Always set it explicitly to pin the path. + +--- + +## Step 5 — `complyctl get` + +Downloads the Policy and ControlCatalog from the OCI registry into the local +complyctl workspace. + +```bash +cd /root/.complytime +complyctl get +``` + +**What happens:** +1. Reads `complytime.yaml` to find the registry URL +2. Pulls the two-layer OCI bundle via HTTP +3. 
Writes bundle files into the workspace under `.complytime/` + +**Directory after `get`:** + +``` +/root/.complytime/ + complytime.yaml + providers/ + complyctl-provider-openscap + nist-800-53-rev5-rhel9-moderate/ + rhel9_policy.yaml # pulled from OCI layer 1 + rhel9_catalog.yaml # pulled from OCI layer 2 +``` + +--- + +## Step 6 — `complyctl generate` + +Reads the Policy's `assessment-plans` and the XCCDF data stream. +Generates a tailored XCCDF profile that selects only the rules listed in +the Policy's `assessment-plans`. + +```bash +complyctl generate --policy-id nist-800-53-rev5-rhel9-moderate +``` + +**What happens:** +1. Opens `nist-800-53-rev5-rhel9-moderate/rhel9_policy.yaml` +2. Extracts all `assessment-plans[*].id` → these are short CaC rule names +3. Opens the data stream named by the `datastream:` variable +   (`/usr/share/xml/scap/ssg/content/ssg-rhel9-ds.xml` inside the VM, which is the +   copy of the host's `build/ssg-rhel9-ds.xml` deployed by `setup.yml`) +4. Finds the base profile `xccdf_org.ssgproject.content_profile_nist_800_53` +5. Creates a tailoring document that extends the base profile, enabling only + the 22 rules from the Policy + +**Why `nist_800_53` as the base profile?** +The tailoring mechanism uses `extend` — it starts from `nist_800_53` (which +selects ALL NIST-mapped rules) and then uses `select selected="false"` to +deselect every rule NOT in the Policy. This is more reliable than enabling +rules one by one from an empty base. + +**Output:** A tailored XCCDF XML embedded in the workspace, used by Step 7. + +--- + +## Step 7 — `complyctl scan` + +Runs OpenSCAP against the system using the tailored profile from Step 6. + +```bash +complyctl scan --policy-id nist-800-53-rev5-rhel9-moderate +``` + +**What happens:** +1. Invokes the `complyctl-provider-openscap` plugin +2. Plugin calls `oscap xccdf eval` with the tailored profile +3. OpenSCAP evaluates each of the 22 selected rules against the live OS +4. Results are written as ARF (Assessment Results Format) XML +5. 
complyctl writes a structured evaluation log in YAML + +**Three output files per baseline:** + +### `evaluation-log-nist-800-53-rev5-rhel9-moderate-*.yaml` + +complyctl's structured summary. Shows Passed/Failed per rule name with +the control reference-id for traceability. + +```yaml +evaluations: +- name: configure_custom_crypto_policy_cis + result: Failed + control: + reference-id: nist-800-53-rev5-rhel9-moderate + entry-id: configure_custom_crypto_policy_cis + assessment-logs: + - result: Failed + start: "2026-06-26T13:12:51Z" + confidence-level: High + +- name: package_sudo_installed + result: Passed + control: + reference-id: nist-800-53-rev5-rhel9-moderate + entry-id: package_sudo_installed + +- name: sudo_add_use_pty + result: Failed + +- name: sudo_remove_no_authenticate + result: Passed + +- name: sudo_remove_nopasswd + result: Failed + +- name: no_invalid_shell_accounts_unlocked + result: Passed + +- name: accounts_tmout + result: Failed # ← terminal timeout not configured on fresh VM + +- name: sshd_disable_root_login + result: Failed # ← root SSH login allowed on fresh VM + +- name: kernel_module_usb-storage_disabled + result: Failed + +- name: sysctl_kernel_randomize_va_space + result: Failed + +- name: dir_perms_world_writable_sticky_bits + result: Passed + +- name: file_permissions_unauthorized_world_writable + result: Passed + +- name: file_group_ownership_var_log_audit + result: Passed + +- name: file_permissions_var_log_audit + result: Passed + +# ... (22 total for moderate) +``` + +### `arf.xml` — Assessment Results Format + +Full OpenSCAP output. Contains per-rule results plus OVAL check details. 
+Parsed with the XCCDF namespace: + +```bash +python3 << 'EOF' +import xml.etree.ElementTree as ET +ns = {"xccdf": "http://checklists.nist.gov/xccdf/1.2"} +tree = ET.parse("build/complyctl-results/rhel9/moderate/arf.xml") +rules = tree.findall(".//xccdf:rule-result", ns) +summary = {} +for r in rules: + res = r.find("xccdf:result", ns) + if res is not None: + summary[res.text] = summary.get(res.text, 0) + 1 +for outcome, count in sorted(summary.items()): + print(f" {outcome:25s}: {count}") +EOF +# Output: +# fail : 7 +# notapplicable : 6 +# notselected : 1511 +# pass : 9 +``` + +**`notselected`: 1511** — these are the other NIST-mapped rules in the data stream +that were deselected by the tailoring. Only 22 rules were actually evaluated. + +### `results.xml` — XCCDF benchmark export + +The full XCCDF benchmark with the tailored profile embedded, including all +rule definitions and their result states. Useful for detailed analysis with +oscap report tools. + +--- + +## Step 8 — Generate HTML report + +Convert the XCCDF results into a human-readable HTML report with rule-level +pass/fail details, severity, and rationale. 
+ +```bash +oscap xccdf generate report \ + build/complyctl-results/rhel9/moderate/results.xml \ + > build/complyctl-results/rhel9/moderate/report.html +``` + +**Open in browser:** + +```bash +xdg-open build/complyctl-results/rhel9/moderate/report.html +# or +firefox build/complyctl-results/rhel9/moderate/report.html +``` + +The report shows: +- **Score** — percentage of selected rules that passed +- **Rule table** — each rule with its result (pass/fail/notapplicable), severity, + and the XCCDF description of what was checked +- **Profile info** — which tailored profile was used + +**Loop for all baselines:** + +```bash +for baseline in low moderate high; do + oscap xccdf generate report \ + "build/complyctl-results/rhel9/${baseline}/results.xml" \ + > "build/complyctl-results/rhel9/${baseline}/report.html" + echo "${baseline}: $(wc -c < "build/complyctl-results/rhel9/${baseline}/report.html") bytes" +done +``` + +The Ansible `scan.yml` does this automatically after each baseline scan. + +--- + +## Step 9 — Trace results back to NIST controls + +Use `rules_mapping.yaml` to translate a rule PASS/FAIL into a NIST control +compliance statement. 
+ +```bash +python3 << 'EOF' +from ruamel.yaml import YAML +y = YAML() +mapping = y.load(open("build/gemara/rhel9/rules_mapping.yaml")) + +# For every rule that failed, find which controls it maps to +failed_rules = [ + "accounts_tmout", + "sshd_disable_root_login", + "sudo_add_use_pty", + "configure_custom_crypto_policy_cis", + "kernel_module_usb-storage_disabled", + "sysctl_kernel_randomize_va_space", + "sudo_remove_nopasswd", +] + +print("Failed rules → NIST controls:") +for rule in failed_rules: + controls = [ + m["source"] for m in mapping["mappings"] + if any(t["entry-id"] == rule for t in m.get("targets", [])) + ] + print(f" {rule}") + for c in controls: + print(f" ← {c.upper()}") +EOF +``` + +**Expected output:** + +``` +Failed rules → NIST controls: + accounts_tmout + ← AC-2.5 + sshd_disable_root_login + ← AC-17 + ← AC-17.1 + sudo_add_use_pty + ← CM-6 + configure_custom_crypto_policy_cis + ← SC-8 + ← SC-8.1 + kernel_module_usb-storage_disabled + ← MP-7 + sysctl_kernel_randomize_va_space + ← SI-16 + sudo_remove_nopasswd + ← IA-11 +``` + +--- + +## Summary: file flow diagram + +``` +controls/nist_800_53.yml + │ + └─► Step 2: export_to_gemara.py + ├─► build/gemara/rhel9/control_catalog.yaml (ControlCatalog) + ├─► build/gemara/rhel9/rules_mapping.yaml (MappingDocument) + ├─► build/gemara/guidance_catalog.yaml (GuidanceCatalog, needs OSCAL) + └─► products/rhel9/profiles/nist_800_53.profile (gitignored, Step 1 input) + +products/rhel9/profiles/nist_800_53.profile [generated above] + │ + └─► Step 1: ./build_product rhel9 -d + └─► build/ssg-rhel9-ds.xml (28 MB, has nist_800_53 XCCDF profile) + +build/gemara/rhel9/control_catalog.yaml + │ + └─► Step 3: generate_complyctl_bundle.py --baseline moderate + ├─► build/gemara-bundle/rhel9/moderate/rhel9_policy.yaml (22 rules) + ├─► build/gemara-bundle/rhel9/moderate/rhel9_catalog.yaml (filtered catalog) + └─► [oras push] → registry:5500/nist-800-53-rev5-rhel9-moderate:latest + +complytime.yaml + registry:5500/... 
+ │ + ├─► Step 5: complyctl get → pulls policy + catalog into workspace + ├─► Step 6: complyctl generate → creates tailored XCCDF (22 of 1533 rules selected) + └─► Step 7: complyctl scan → OpenSCAP evaluates 22 rules against live OS + ├─► evaluation-log-*.yaml (complyctl structured log: Pass/Fail per rule) + ├─► arf.xml (OpenSCAP Assessment Results Format) + └─► results.xml (XCCDF benchmark with embedded results) + +results.xml + │ + └─► Step 8: oscap xccdf generate report results.xml > report.html + └─► report.html (interactive HTML with rule-level pass/fail + rationale) + +evaluation-log-*.yaml + build/gemara/rhel9/rules_mapping.yaml + │ + └─► Step 9: trace rule FAIL → NIST control (AC-2.5, SC-8, IA-11, ...) +``` + +--- + +## Running everything with Vagrant (automated) + +The Ansible playbooks orchestrate Steps 2–8 (export through HTML report) against a real RHEL9 VM: + +```bash +# 1. Start VM (one time) +cd utils/nist_sync/vagrant +vagrant up +bash populate_inventory.sh # writes ansible/inventory.ini + +# 2. Setup VM (one time) +cd .. +ansible-playbook -i ansible/inventory.ini ansible/setup.yml \ + -e complyctl_bin=~/bin/complyctl \ + -e provider_bin=~/.complytime/providers/complyctl-provider-openscap + +# 3. Run all three baselines +ansible-playbook -i ansible/inventory.ini ansible/scan.yml + +# Results at: +ls build/complyctl-results/rhel9/{low,moderate,high}/ +``` diff --git a/utils/nist_sync/ansible/.gitignore b/utils/nist_sync/ansible/.gitignore new file mode 100644 index 00000000000..80507a37147 --- /dev/null +++ b/utils/nist_sync/ansible/.gitignore @@ -0,0 +1,2 @@ +# Auto-generated by vagrant/populate_inventory.sh after 'vagrant up' +inventory.ini diff --git a/utils/nist_sync/ansible/scan.yml b/utils/nist_sync/ansible/scan.yml new file mode 100644 index 00000000000..e906150a25f --- /dev/null +++ b/utils/nist_sync/ansible/scan.yml @@ -0,0 +1,103 @@ +--- +# NIST 800-53 Gemara scan — all baselines (low / moderate / high). +# +# Flow per baseline: +# 1. 
# Flow (step 1 runs once in pre_tasks; steps 2-6 repeat for every baseline):
#   1. Export Gemara artifacts on the host         (delegate_to: localhost)
#   2. Generate per-baseline Policy bundle         (delegate_to: localhost)
#   3. Push bundle from host → VM registry         (delegate_to: localhost, via ansible_host IP)
#   4. Write complytime.yaml on the VM
#   5. complyctl get / generate / scan             (runs ON the VM, against the VM's own OS)
#   6. Fetch results → host
#
# Usage (from utils/nist_sync/):
#   ansible-playbook -i ansible/inventory.ini ansible/scan.yml
#   ansible-playbook -i ansible/inventory.ini ansible/scan.yml -e baseline=moderate
#
# Optional variables (each also readable from the upper-case environment variable):
#   baseline       low | moderate | high | all   (default: all)
#   product        rhel9                          (default: rhel9)
#   base_profile   nist_800_53                    (default: nist_800_53)
#   registry_port  5500                           (default: 5500)

- name: NIST 800-53 Gemara scan on RHEL9 VM
  hosts: rhel9_scanner
  become: true
  vars:
    product: "{{ lookup('env', 'PRODUCT') | default('rhel9', true) }}"
    base_profile: "{{ lookup('env', 'BASE_PROFILE') | default('nist_800_53', true) }}"
    registry_port: "{{ lookup('env', 'REGISTRY_PORT') | default('5500', true) }}"
    # Resolve the list of baselines to test: -e baseline=... wins, then $BASELINE, then 'all'.
    _baseline_arg: "{{ baseline | default(lookup('env', 'BASELINE') | default('all', true)) }}"
    baselines: >-
      {{ ['low', 'moderate', 'high'] if _baseline_arg == 'all'
         else [_baseline_arg] }}
    # Paths on the host machine.
    # playbook_dir = .../utils/nist_sync/ansible — three levels below repo root.
    # realpath is applied AFTER appending '/../../..' so the result is a
    # normalised absolute path without literal '..' segments.
    repo_root: "{{ (playbook_dir + '/../../..') | realpath }}"
    gemara_dir: "{{ repo_root }}/build/gemara"
    results_base: "{{ repo_root }}/build/complyctl-results/{{ product }}"
    # complytime working directory inside the VM.
    complyctl_home: /root/.complytime

  pre_tasks:
    # Fail fast on a typo: the value is interpolated into bundle tags, policy
    # ids and result paths by tasks/scan_baseline.yml.
    - name: Validate requested baseline
      assert:
        that:
          - _baseline_arg in ['all', 'low', 'moderate', 'high']
        fail_msg: "baseline must be one of: all, low, moderate, high (got '{{ _baseline_arg }}')"
        quiet: true

    - name: Ensure host result directory exists
      delegate_to: localhost
      become: false
      file:
        path: "{{ results_base }}"
        state: directory
        mode: "0755"

    # Export once — covers all baselines.
    - name: "Export Gemara artifacts for {{ product }} (host)"
      delegate_to: localhost
      become: false
      command: >
        python3 utils/nist_sync/export_to_gemara.py
        --products {{ product }}
        --output-dir build/gemara
        --data-dir utils/nist_sync/data
      args:
        chdir: "{{ repo_root }}"
      environment:
        PYTHONPATH: "{{ repo_root }}"

    - name: Show exported files
      delegate_to: localhost
      become: false
      find:
        paths: "{{ gemara_dir }}"
        recurse: true
        patterns: "*.yaml"
      register: exported

    # Paths are printed relative to the repository root.
    - name: Gemara files exported
      debug:
        msg: "{{ exported.files | map(attribute='path') | map('replace', repo_root + '/', '') | list }}"

  tasks:
    # One include per entry of `baselines`; the current entry is exposed to the
    # included tasks as `baseline_name`.
    - name: Scan each baseline
      include_tasks: tasks/scan_baseline.yml
      loop: "{{ baselines }}"
      loop_control:
        loop_var: baseline_name

  post_tasks:
    - name: Final results summary
      delegate_to: localhost
      become: false
      find:
        paths: "{{ results_base }}"
        recurse: true
        patterns: "*.xml,*.yaml"
      register: all_results

    - name: Results written to host
      debug:
        msg: |
          {{ all_results.files | length }} result file(s) under build/complyctl-results/{{ product }}/
          {% for f in all_results.files | sort(attribute='path') %}
            - {{ f.path | replace(repo_root + '/', '') }}
          {% endfor %}
          Interpret results:
            build/gemara/{{ product }}/rules_mapping.yaml — maps rule PASS/FAIL → NIST controls
#
# Usage (from utils/nist_sync/):
#   ansible-playbook -i ansible/inventory.ini ansible/setup.yml \
#     -e complyctl_bin=/tmp/complyctl \
#     -e provider_bin=~/.complytime/providers/complyctl-provider-openscap
#
# Optional env overrides (also accepted as -e vars):
#   COMPLYCTL_BIN     path to complyctl binary on the host        (default: /tmp/complyctl)
#   PROVIDER_BIN      path to complyctl-provider-openscap on the host
#                     (default: $HOME/.complytime/providers/complyctl-provider-openscap)
#   ORAS_VERSION      oras release to install                     (default: 1.2.3)
#   REGISTRY_VERSION  distribution/distribution release to install (default: 2.8.3)
#   REGISTRY_PORT     container registry port inside the VM       (default: 5500)

- name: Set up complyctl NIST scanner on RHEL9
  hosts: rhel9_scanner
  become: true
  vars:
    complyctl_bin: "{{ lookup('env', 'COMPLYCTL_BIN') | default('/tmp/complyctl', true) }}"
    # `copy: src` is resolved on the controller, so the default must be built
    # from the controller's HOME (env lookup), not from ansible_env.HOME, which
    # is the home of the remote (become) user on the VM.
    provider_bin: "{{ lookup('env', 'PROVIDER_BIN') | default(lookup('env', 'HOME') + '/.complytime/providers/complyctl-provider-openscap', true) }}"
    # NOTE(review): oras_version is not referenced by any task in this play.
    oras_version: "{{ lookup('env', 'ORAS_VERSION') | default('1.2.3', true) }}"
    registry_version: "{{ lookup('env', 'REGISTRY_VERSION') | default('2.8.3', true) }}"
    registry_port: "{{ lookup('env', 'REGISTRY_PORT') | default('5500', true) }}"
    # Root of the content repo on the host — three levels above the playbook dir.
    # playbook_dir = .../utils/nist_sync/ansible
    repo_root: "{{ playbook_dir | realpath + '/../../..' }}"
    ds_dest: /usr/share/xml/scap/ssg/content/ssg-rhel9-ds.xml

  tasks:
    # -------------------------------------------------------------------------
    # complyctl binary
    # -------------------------------------------------------------------------
    - name: Copy complyctl binary
      copy:
        src: "{{ complyctl_bin }}"
        dest: /usr/local/bin/complyctl
        mode: "0755"

    - name: Verify complyctl runs
      command: /usr/local/bin/complyctl version
      register: ver
      changed_when: false

    - name: Show complyctl version
      debug:
        msg: "{{ ver.stdout }}"

    # -------------------------------------------------------------------------
    # complyctl-provider-openscap
    # -------------------------------------------------------------------------
    - name: Create provider directory
      file:
        path: /root/.complytime/providers
        state: directory
        mode: "0755"

    - name: Copy complyctl-provider-openscap
      copy:
        src: "{{ provider_bin }}"
        dest: /root/.complytime/providers/complyctl-provider-openscap
        mode: "0755"

    # -------------------------------------------------------------------------
    # OCI registry — distribution/distribution binary (no podman required).
    # Runs as a systemd service inside the VM listening on all interfaces.
    # The host pushes bundles to ansible_host:{{ registry_port }} via oras;
    # complyctl on the VM uses http://localhost:{{ registry_port }}.
    # -------------------------------------------------------------------------
    - name: Check if registry binary is already installed
      stat:
        path: /usr/local/bin/registry
      register: registry_bin_stat

    # pipefail + `curl -f` make a failed or non-2xx download fail the task
    # instead of handing an error page to tar.
    - name: Download distribution/distribution registry binary
      shell: |
        set -o pipefail
        curl -fsSL \
          "https://github.com/distribution/distribution/releases/download/v{{ registry_version }}/registry_{{ registry_version }}_linux_amd64.tar.gz" \
          | tar -xz -C /usr/local/bin registry
        chmod +x /usr/local/bin/registry
      args:
        executable: /bin/bash
      when: not registry_bin_stat.stat.exists

    - name: Create registry storage directory
      file:
        path: /var/lib/gemara-registry
        state: directory
        mode: "0755"

    - name: Write registry config
      copy:
        dest: /etc/gemara-registry.yml
        mode: "0644"
        content: |
          version: 0.1
          log:
            level: warn
          storage:
            filesystem:
              rootdirectory: /var/lib/gemara-registry
            delete:
              enabled: true
          http:
            addr: :{{ registry_port }}

    - name: Create systemd service for registry
      copy:
        dest: /etc/systemd/system/gemara-registry.service
        mode: "0644"
        content: |
          [Unit]
          Description=Gemara OCI Registry
          After=network.target
          [Service]
          ExecStart=/usr/local/bin/registry serve /etc/gemara-registry.yml
          Restart=always
          RestartSec=3
          [Install]
          WantedBy=multi-user.target

    - name: Enable and start registry service
      systemd:
        name: gemara-registry
        state: started
        enabled: true
        daemon_reload: true

    # Polls the registry API root from inside the VM: up to 15 tries, 2 s apart.
    - name: Wait for registry to become ready
      uri:
        url: "http://localhost:{{ registry_port }}/v2/"
        status_code: 200
      retries: 15
      delay: 2
      register: registry_ready
      until: registry_ready.status == 200

    - name: Registry is ready
      debug:
        msg: "OCI registry running at localhost:{{ registry_port }} (and {{ ansible_host }}:{{ registry_port }} from the host)"

    # Needed so the host can push bundles to ansible_host:registry_port.
    - name: Open registry port in the VM firewall
      shell: |
        firewall-cmd --add-port={{ registry_port }}/tcp --permanent
        firewall-cmd --reload
      args:
        executable: /bin/bash
      changed_when: true

    # -------------------------------------------------------------------------
    # SCAP data stream — always copy the repo-built version.
    # The system RPM (scap-security-guide) predates the nist_800_53 profile
    # commit and will not contain that profile. The built data stream must
    # be deployed even if the RPM is already installed.
    # -------------------------------------------------------------------------
    - name: "Copy built data stream from host"
      copy:
        src: "{{ repo_root }}/build/ssg-rhel9-ds.xml"
        dest: "{{ ds_dest }}"
        mode: "0644"
        force: true

    # grep -c prints the number of matching lines; zero matches (or a grep
    # error) fails the play.
    - name: Confirm data stream has nist_800_53 profile
      command: "grep -c nist_800_53 {{ ds_dest }}"
      register: profile_check
      changed_when: false
      failed_when: profile_check.rc != 0 or profile_check.stdout | int == 0

    - name: Show data stream status
      debug:
        msg: "Data stream: {{ ds_dest }} — nist_800_53 profile present ({{ profile_check.stdout }} occurrences)"

    # -------------------------------------------------------------------------
    # Summary
    # -------------------------------------------------------------------------
    - name: Setup complete
      debug:
        msg: |
          VM is ready. Run the scan with:
            ansible-playbook -i ansible/inventory.ini ansible/scan.yml

- name: "[ {{ baseline_name | upper }} ] Create host output directories"
  delegate_to: localhost
  become: false
  file:
    path: "{{ item }}"
    state: directory
    mode: "0755"
  loop:
    - "{{ repo_root }}/build/gemara-bundle/{{ product }}/{{ baseline_name }}"
    - "{{ results_base }}/{{ baseline_name }}"

# ---------------------------------------------------------------------------
# 1. Generate Gemara Policy bundle on the HOST and push to the VM's registry
# ---------------------------------------------------------------------------
- name: "[ {{ baseline_name | upper }} ] Generate Policy bundle and push to VM registry"
  delegate_to: localhost
  become: false
  command: >
    python3 utils/nist_sync/generate_complyctl_bundle.py
    --product {{ product }}
    --gemara-dir build/gemara
    --output-dir build/gemara-bundle/{{ product }}/{{ baseline_name }}
    --baseline {{ baseline_name }}
    --base-profile {{ base_profile }}
    --registry {{ ansible_host }}:{{ registry_port }}
    --tag nist-800-53-rev5-{{ product }}-{{ baseline_name }}:latest
    --push
    --verbose
  args:
    chdir: "{{ repo_root }}"
  environment:
    PYTHONPATH: "{{ repo_root }}"
  register: bundle_result

# Show only the generator's summary lines; an empty stdout yields an empty list.
- name: "[ {{ baseline_name | upper }} ] Bundle push complete"
  debug:
    msg: "{{ bundle_result.stdout_lines | select('match', '.*assessment-plan.*|.*Pushed.*|.*rules.*') | list }}"

# ---------------------------------------------------------------------------
# 2. Configure complyctl on the VM
# ---------------------------------------------------------------------------
- name: "[ {{ baseline_name | upper }} ] Write complytime.yaml on the VM"
  template:
    src: "{{ playbook_dir }}/templates/complytime.yaml.j2"
    dest: "{{ complyctl_home }}/complytime.yaml"
    mode: "0644"
  vars:
    policy_id: "nist-800-53-rev5-{{ product }}-{{ baseline_name }}"

# ---------------------------------------------------------------------------
# 3. Run complyctl inside the VM
# ---------------------------------------------------------------------------
- name: "[ {{ baseline_name | upper }} ] complyctl get (pull bundle metadata from registry)"
  command: /usr/local/bin/complyctl get
  args:
    chdir: "{{ complyctl_home }}"
  environment:
    HOME: /root
  register: get_result

- name: "[ {{ baseline_name | upper }} ] complyctl generate (build tailored XCCDF)"
  command: >
    /usr/local/bin/complyctl generate
    --policy-id nist-800-53-rev5-{{ product }}-{{ baseline_name }}
  args:
    chdir: "{{ complyctl_home }}"
  environment:
    HOME: /root
  register: gen_result

- name: "[ {{ baseline_name | upper }} ] complyctl scan"
  command: >
    /usr/local/bin/complyctl scan
    --policy-id nist-800-53-rev5-{{ product }}-{{ baseline_name }}
  args:
    chdir: "{{ complyctl_home }}"
  environment:
    HOME: /root
  # Non-zero exit is expected when rules fail (compliance findings).
  # NOTE(review): this also hides genuine scanner errors; confirm which exit
  # codes complyctl uses for findings before tightening this.
  failed_when: false
  register: scan_result

- name: "[ {{ baseline_name | upper }} ] Scan exit code"
  debug:
    msg: >-
      complyctl scan exit={{ scan_result.rc }}
      (0=pass, non-zero=compliance findings found — expected for a fresh VM)

# ---------------------------------------------------------------------------
# 4. Collect result files from the VM and fetch to the host
# ---------------------------------------------------------------------------
- name: "[ {{ baseline_name | upper }} ] Find result files on VM"
  find:
    paths: "{{ complyctl_home }}"
    recurse: true
    patterns:
      - "arf.xml"
      - "results.xml"
      - "evaluation-log-*{{ baseline_name }}*.yaml"
  register: result_files
  failed_when: false

# The search above is recursive over the whole workspace and the fetch is
# flat, so files sharing a basename land on the same host path. Fetching in
# mtime order makes the most recently written file the one that is kept, in
# case outputs of an earlier baseline are still present on the VM.
- name: "[ {{ baseline_name | upper }} ] Fetch result files to host"
  fetch:
    src: "{{ item.path }}"
    dest: "{{ results_base }}/{{ baseline_name }}/{{ item.path | basename }}"
    flat: true
    fail_on_missing: false
  loop: "{{ result_files.files | unique(attribute='path') | sort(attribute='mtime') }}"
  loop_control:
    label: "{{ item.path | basename }}"

- name: "[ {{ baseline_name | upper }} ] Check that results.xml reached the host"
  delegate_to: localhost
  become: false
  stat:
    path: "{{ results_base }}/{{ baseline_name }}/results.xml"
  register: results_xml

# Best-effort (oscap may be missing on the host). --output is used instead of
# a shell redirect so that a failed run does not leave an empty report.html.
- name: "[ {{ baseline_name | upper }} ] Generate HTML report from XCCDF results"
  delegate_to: localhost
  become: false
  command:
    argv:
      - oscap
      - xccdf
      - generate
      - report
      - --output
      - "{{ results_base }}/{{ baseline_name }}/report.html"
      - "{{ results_base }}/{{ baseline_name }}/results.xml"
  when: results_xml.stat.exists
  failed_when: false
  register: html_report

- name: "[ {{ baseline_name | upper }} ] Baseline complete"
  debug:
    msg: >-
      Results: build/complyctl-results/{{ product }}/{{ baseline_name }}/
      ({{ result_files.files | length }} file(s))
      HTML report: {{
        'build/complyctl-results/' + product + '/' + baseline_name + '/report.html'
        if (html_report.rc | default(1)) == 0
        else 'not generated (results.xml missing or oscap failed on the host)'
      }}
# Every templated scalar is quoted so the rendered file always contains YAML
# strings, even if a variable expands to something empty or number-like.
policies:
  - url: "http://localhost:{{ registry_port }}/{{ policy_id }}"
    id: "{{ policy_id }}"

targets:
  - id: local
    policies:
      - "{{ policy_id }}"
    variables:
      profile: "{{ base_profile }}"
      # Explicit datastream path: prevents the OpenSCAP provider from using
      # OS auto-detection, which would pick the wrong data stream if
      # /etc/os-release contains unexpected ID_LIKE values.
      datastream: "/usr/share/xml/scap/ssg/content/ssg-{{ product }}-ds.xml"
def _write_xccdf_profile(product, repo_root, verbose):
    """Generate products/{product}/profiles/nist_800_53.profile.

    This profile selects every rule touched by the nist_800_53 control file.
    complyctl then narrows the selection to one baseline via the Gemara Policy's
    assessment-plans — so this single profile covers Low, Moderate, and High.
    The file is intentionally not committed; re-run export_to_gemara.py to
    regenerate it after adding or removing rules from the control file.

    Args:
        product: Product id (e.g. "rhel9"); used for the path, the ``platform``
            key and the human-readable name lookup.
        repo_root: ``pathlib.Path`` of the repository root.
        verbose: When true, print the path that was written.
    """
    # Unknown products fall back to the upper-cased id as their display name.
    full_name = _PRODUCT_FULL_NAMES.get(product, product.upper())
    profile_path = repo_root / "products" / product / "profiles" / "nist_800_53.profile"
    content = f"""\
documentation_complete: true
title: 'NIST SP 800-53 Rev 5'
description: |-
    Contains all rules mapped to NIST SP 800-53 Revision 5 controls in
    ComplianceAsCode for {full_name}, across all baselines (Low, Moderate, High).

    Generated by utils/nist_sync/export_to_gemara.py. Do not edit manually.
platform: {product}
selections:
    - nist_800_53:all
"""
    # Create the profiles directory if needed, mirroring how export_product
    # creates its own output directory before writing.
    profile_path.parent.mkdir(parents=True, exist_ok=True)
    profile_path.write_text(content, encoding="utf-8")
    if verbose:
        print(f" Wrote {profile_path}")


def parse_args():
    """Parse the command line and return the ``argparse.Namespace``.

    The module docstring is reused verbatim as the help epilog.
    """
    parser = argparse.ArgumentParser(
        description="Export ComplianceAsCode NIST 800-53 controls to Gemara format",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=__doc__,
    )
    parser.add_argument(
        "--products",
        default=",".join(DEFAULT_PRODUCTS),
        help="Comma-separated product list (default: %(default)s)",
    )
    parser.add_argument(
        "--output-dir",
        type=Path,
        default=DEFAULT_OUTPUT_DIR,
        help="Output directory (default: %(default)s)",
    )
    parser.add_argument(
        "--repo-root",
        type=Path,
        default=_REPO_ROOT,
        help="Repository root (default: auto-detected)",
    )
    parser.add_argument(
        "--oscal-catalog",
        type=Path,
        default=DEFAULT_OSCAL_CATALOG,
        help="Path to OSCAL catalog JSON for objective text enrichment",
    )
    parser.add_argument(
        "--validate",
        action="store_true",
        help="Validate output against Gemara structural rules (Python) "
             "and CUE schema (if --gemara-schema is provided and cue is on PATH)",
    )
    parser.add_argument(
        "--gemara-schema",
        type=Path,
        default=None,
        metavar="DIR",
        help="Path to a cloned gemara repo (https://github.com/gemaraproj/gemara) "
             "containing the CUE schema files. When provided with --validate, "
             "each output file is validated with 'cue vet'.",
    )
    parser.add_argument(
        "--no-mapping",
        action="store_true",
        help="Skip MappingDocument generation",
    )
    parser.add_argument(
        "--no-guidance",
        action="store_true",
        help="Skip GuidanceCatalog generation (platform-independent NIST standard text)",
    )
    parser.add_argument(
        "--data-dir",
        type=Path,
        default=DEFAULT_DATA_DIR,
        help="Directory with NIST baseline JSON files for applicability (default: %(default)s)",
    )
    parser.add_argument(
        "--verbose",
        action="store_true",
        help="Print per-control details",
    )
    return parser.parse_args()


def load_oscal_catalog(path):
    """Load the OSCAL catalog JSON file, returning None if unavailable.

    Args:
        path: Path (str or ``pathlib.Path``) to the catalog, or a falsy value.

    Returns:
        The decoded JSON document, or ``None`` when ``path`` is falsy, does not
        exist, or cannot be read/parsed (a warning is written to stderr in the
        last case).
    """
    if not path or not Path(path).exists():
        return None
    try:
        # Decode explicitly as UTF-8 so the result does not depend on the
        # locale of the machine or container running the export.
        with open(path, encoding="utf-8") as f:
            return json.load(f)
    except Exception as exc:
        # Deliberately best-effort: the catalog is optional input.
        sys.stderr.write(f"Warning: could not load OSCAL catalog {path}: {exc}\n")
        return None
def load_policy(product, repo_root):
    """
    Load the NIST 800-53 Policy for a product without requiring a build.

    The NIST control files are plain YAML (no Jinja2), so env_yaml=None is safe.

    Raises:
        FileNotFoundError: if the product has no controls/nist_800_53.yml.
    """
    controls_yaml = repo_root / "products" / product / "controls" / "nist_800_53.yml"
    if controls_yaml.exists():
        loaded = ssg.controls.Policy(str(controls_yaml), env_yaml=None)
        loaded.load()
        return loaded
    raise FileNotFoundError(
        f"Policy file not found for {product}: {controls_yaml}"
    )


def _yaml_instance():
    """Return a ruamel YAML emitter: block style, unicode allowed, width 120."""
    emitter = YAML()
    emitter.default_flow_style = False
    emitter.allow_unicode = True
    emitter.width = 120
    return emitter


def write_yaml(data, path):
    """Serialize data to YAML and write it to path as UTF-8."""
    stream = io.StringIO()
    _yaml_instance().dump(data, stream)
    path.write_text(stream.getvalue(), encoding="utf-8")


def find_cue():
    """Return the path to the cue binary, or None if not on PATH."""
    located = shutil.which("cue")
    return located


def cue_validate(schema_dir, schema_expr, yaml_path):
    """
    Run 'cue vet' against yaml_path using the CUE schema in schema_dir.

    Args:
        schema_dir: Path to the cloned gemara repo (contains *.cue files);
            used as the working directory of the cue process.
        schema_expr: CUE expression selecting the schema, e.g. '#ControlCatalog'.
        yaml_path: Path to the YAML file to validate.

    Returns:
        (passed, output) where passed is True when cue exits 0, False when it
        exits non-zero or could not be started, and None when no cue binary is
        on PATH. output is cue's stripped stdout+stderr (or the reason text).
    """
    executable = find_cue()
    if not executable:
        return None, "cue binary not found on PATH"

    argv = [executable, "vet", "-d", schema_expr, "-E", ".", str(yaml_path)]
    try:
        proc = subprocess.run(
            argv,
            cwd=str(schema_dir),
            capture_output=True,
            text=True,
        )
    except Exception as exc:
        return False, str(exc)
    return proc.returncode == 0, (proc.stdout + proc.stderr).strip()
Returns stats dict.""" + builder = GemaraGuidanceCatalogBuilder(oscal_catalog, data_dir=data_dir) + guidance = builder.build() + guideline_count = len(guidance.get("guidelines", [])) + + if validate: + errors = validate_guidance(guidance) + if errors: + sys.stderr.write(" [WARN] GuidanceCatalog validation errors:\n") + for e in errors: + sys.stderr.write(f" - {e}\n") + + guidance_path = output_dir / "guidance_catalog.yaml" + write_yaml(guidance, guidance_path) + if verbose: + print(f" Wrote {guidance_path}") + + if validate and gemara_schema: + passed, output = cue_validate(gemara_schema, "#GuidanceCatalog", guidance_path) + if passed is None: + print(f" [CUE] guidance_catalog.yaml SKIP ({output})") + elif passed: + print(" [CUE] guidance_catalog.yaml PASS") + else: + print(" [CUE] guidance_catalog.yaml FAIL") + for line in output.splitlines(): + print(f" {line}") + + return {"guideline_count": guideline_count} + + +def export_product(product, repo_root, oscal_catalog, output_dir, include_mapping, validate, gemara_schema, verbose): + """Export one product. 
Returns stats dict.""" + if verbose: + print(f" Loading policy for {product}...") + + policy = load_policy(product, repo_root) + total_controls = len(policy.controls) + + # Build ControlCatalog + builder = GemaraCatalogBuilder(product, policy, oscal_catalog) + catalog = builder.build() + catalog_id = catalog["metadata"]["id"] + + # Validate + if validate: + errors = validate_catalog(catalog) + if errors: + sys.stderr.write(f" [WARN] ControlCatalog validation errors for {product}:\n") + for e in errors: + sys.stderr.write(f" - {e}\n") + + # Write ControlCatalog + product_dir = output_dir / product + product_dir.mkdir(parents=True, exist_ok=True) + catalog_path = product_dir / "control_catalog.yaml" + write_yaml(catalog, catalog_path) + if verbose: + print(f" Wrote {catalog_path}") + + # Generate the XCCDF tailoring base profile (not committed — see .gitignore) + _write_xccdf_profile(product, repo_root, verbose) + + if validate and gemara_schema: + passed, output = cue_validate(gemara_schema, "#ControlCatalog", catalog_path) + if passed is None: + print(f" [CUE] control_catalog.yaml SKIP ({output})") + elif passed: + print(" [CUE] control_catalog.yaml PASS") + else: + print(" [CUE] control_catalog.yaml FAIL") + for line in output.splitlines(): + print(f" {line}") + + # Count rules referenced across all controls + all_rules = set() + for ctrl in policy.controls: + for r in (ctrl.rules or []): + if "=" not in r: + all_rules.add(r) + + stats = { + "product": product, + "control_count": total_controls, + "rule_count": len(all_rules), + "mapping_count": 0, + } + + if not include_mapping: + return stats + + # Build MappingDocument + mapping_builder = GemaraMappingBuilder(product, catalog_id, policy) + mapping = mapping_builder.build() + + if validate: + errors = validate_mapping(mapping) + if errors: + sys.stderr.write(f" [WARN] MappingDocument validation errors for {product}:\n") + for e in errors: + sys.stderr.write(f" - {e}\n") + + mapping_path = product_dir / 
"rules_mapping.yaml" + write_yaml(mapping, mapping_path) + if verbose: + print(f" Wrote {mapping_path}") + + if validate and gemara_schema: + passed, output = cue_validate(gemara_schema, "#MappingDocument", mapping_path) + if passed is None: + print(f" [CUE] rules_mapping.yaml SKIP ({output})") + elif passed: + print(" [CUE] rules_mapping.yaml PASS") + else: + print(" [CUE] rules_mapping.yaml FAIL") + for line in output.splitlines(): + print(f" {line}") + + stats["mapping_count"] = len(mapping["mappings"]) + return stats + + +def write_metadata(output_dir, all_stats, guidance_stats=None): + """Write a metadata.json summary file.""" + meta = { + "generated_at": datetime.now(timezone.utc).isoformat(), + "products": {s["product"]: s for s in all_stats}, + "totals": { + "control_count": sum(s["control_count"] for s in all_stats), + "rule_count": sum(s["rule_count"] for s in all_stats), + "mapping_count": sum(s["mapping_count"] for s in all_stats), + }, + } + if guidance_stats: + meta["guidance"] = guidance_stats + meta_path = output_dir / "metadata.json" + meta_path.write_text(json.dumps(meta, indent=2), encoding="utf-8") + return meta_path + + +def main(): + args = parse_args() + products = [p.strip() for p in args.products.split(",") if p.strip()] + output_dir = args.output_dir + include_mapping = not args.no_mapping + include_guidance = not args.no_guidance + + print("Exporting NIST 800-53 to Gemara format") + print(f" Products: {', '.join(products)}") + print(f" Output dir: {output_dir}") + + oscal_catalog = load_oscal_catalog(args.oscal_catalog) + if oscal_catalog: + print(f" OSCAL: {args.oscal_catalog} (loaded)") + else: + print(" OSCAL: not found — using control titles as objectives") + + gemara_schema = args.gemara_schema + if args.validate: + cue_bin = find_cue() + if gemara_schema and gemara_schema.is_dir() and cue_bin: + print(f" CUE: {cue_bin} (schema: {gemara_schema})") + elif gemara_schema and not gemara_schema.is_dir(): + sys.stderr.write(f" [WARN] 
--gemara-schema path not found: {gemara_schema}\n") + gemara_schema = None + elif not cue_bin: + print(" CUE: not found on PATH — skipping CUE validation") + gemara_schema = None + else: + print(" CUE: pass --gemara-schema to enable CUE validation") + + output_dir.mkdir(parents=True, exist_ok=True) + + all_stats = [] + failed = [] + for product in products: + print(f"\n[{product}]") + try: + stats = export_product( + product, + args.repo_root, + oscal_catalog, + output_dir, + include_mapping, + args.validate, + gemara_schema, + args.verbose, + ) + all_stats.append(stats) + print( + f" controls={stats['control_count']} " + f"rules={stats['rule_count']} " + f"mappings={stats['mapping_count']}" + ) + except FileNotFoundError as exc: + sys.stderr.write(f" [SKIP] {exc}\n") + failed.append(product) + except Exception as exc: + sys.stderr.write(f" [ERROR] {product}: {exc}\n") + failed.append(product) + if args.verbose: + import traceback + traceback.print_exc() + + # GuidanceCatalog — generated once, platform-independent + guidance_stats = None + if include_guidance and not oscal_catalog: + print("\n[guidance_catalog]") + print(" [SKIP] OSCAL catalog not available — guidance_catalog.yaml not generated") + print(" To generate it, download the OSCAL data first:") + print(" python3 utils/nist_sync/download_oscal.py") + print(f" Expected at: {args.oscal_catalog}") + elif include_guidance and oscal_catalog: + print("\n[guidance_catalog]") + try: + guidance_stats = export_guidance( + oscal_catalog, + args.data_dir, + output_dir, + args.validate, + gemara_schema, + args.verbose, + ) + print(f" guidelines={guidance_stats['guideline_count']}") + except Exception as exc: + sys.stderr.write(f" [ERROR] guidance_catalog: {exc}\n") + if args.verbose: + import traceback + traceback.print_exc() + + if all_stats: + meta_path = write_metadata(output_dir, all_stats, guidance_stats) + print(f"\nWrote metadata: {meta_path}") + + totals = { + "controls": sum(s["control_count"] for s in 
all_stats), + "rules": sum(s["rule_count"] for s in all_stats), + "mappings": sum(s["mapping_count"] for s in all_stats), + } + guidance_note = ( + f", {guidance_stats['guideline_count']} guidelines" if guidance_stats else "" + ) + print( + f"\nDone. Total: {totals['controls']} controls, " + f"{totals['rules']} rules, {totals['mappings']} mappings{guidance_note}" + ) + + if failed: + sys.stderr.write(f"\nFailed products: {', '.join(failed)}\n") + sys.exit(1) + + +if __name__ == "__main__": + main() diff --git a/utils/nist_sync/gemara/__init__.py b/utils/nist_sync/gemara/__init__.py new file mode 100644 index 00000000000..682c39453c3 --- /dev/null +++ b/utils/nist_sync/gemara/__init__.py @@ -0,0 +1 @@ +# Gemara export utilities for ComplianceAsCode NIST 800-53 controls diff --git a/utils/nist_sync/gemara/catalog.py b/utils/nist_sync/gemara/catalog.py new file mode 100644 index 00000000000..38128727961 --- /dev/null +++ b/utils/nist_sync/gemara/catalog.py @@ -0,0 +1,220 @@ +"""Builds a Gemara ControlCatalog from ComplianceAsCode NIST 800-53 controls.""" + +import re +from datetime import datetime, timezone + +from .schema import GEMARA_VERSION +from .status_map import map_state + +# NIST 800-53 Rev 5 control families (matches sync_nist_split.py) +NIST_FAMILIES = { + 'ac': 'Access Control', + 'at': 'Awareness and Training', + 'au': 'Audit and Accountability', + 'ca': 'Assessment, Authorization, and Monitoring', + 'cm': 'Configuration Management', + 'cp': 'Contingency Planning', + 'ia': 'Identification and Authentication', + 'ir': 'Incident Response', + 'ma': 'Maintenance', + 'mp': 'Media Protection', + 'pe': 'Physical and Environmental Protection', + 'pl': 'Planning', + 'pm': 'Program Management', + 'ps': 'Personnel Security', + 'pt': 'PII Processing and Transparency', + 'ra': 'Risk Assessment', + 'sa': 'System and Services Acquisition', + 'sc': 'System and Communications Protection', + 'si': 'System and Information Integrity', + 'sr': 'Supply Chain Risk Management', 
+} + +_VAR_ASSIGN_RE = re.compile(r'^[a-z][a-z0-9_]*=[^\s]+$') + + +def _is_variable_assignment(rule_entry): + return bool(_VAR_ASSIGN_RE.match(rule_entry)) + + +def _extract_family(control_id): + return control_id.split('-')[0].lower() + + +def _now_iso(): + return datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ") + + +def _build_oscal_index(oscal_catalog): + """Build a dict mapping lowercase control IDs to their statement prose.""" + index = {} + if not oscal_catalog: + return index + catalog = oscal_catalog.get("catalog", {}) + for group in catalog.get("groups", []): + for ctrl in group.get("controls", []): + _index_control(ctrl, index) + return index + + +def _index_control(ctrl, index): + ctrl_id = ctrl.get("id", "").lower() + prose = "" + for part in ctrl.get("parts", []): + if part.get("name") == "statement": + prose = part.get("prose", "").strip() + if not prose: + sub_parts = [p.get("prose", "").strip() for p in part.get("parts", [])] + prose = " ".join(p for p in sub_parts if p) + break + if ctrl_id and prose: + index[ctrl_id] = prose + for enhancement in ctrl.get("controls", []): + _index_control(enhancement, index) + + +class GemaraCatalogBuilder: + """Builds a Gemara ControlCatalog dict from a loaded CaC Policy object.""" + + def __init__(self, product, policy, oscal_catalog=None): + self.product = product + self.policy = policy + self._oscal_index = _build_oscal_index(oscal_catalog) + # Collect all baseline IDs for use as default applicability + self._all_baselines = [lv.id for lv in policy.levels] + + def _metadata(self): + catalog_id = f"nist-800-53-rev5-{self.product}" + return { + "id": catalog_id, + "type": "ControlCatalog", + "gemara-version": GEMARA_VERSION, + "description": ( + f"NIST Special Publication 800-53 Revision 5 controls for " + f"{self.product.upper()}, generated from ComplianceAsCode" + ), + "author": { + "id": "complianceascode", + "name": "ComplianceAsCode Project", + "type": "Software", + "uri": 
"https://github.com/ComplianceAsCode/content", + }, + "version": "Revision 5", + # #Datetime requires full ISO 8601 with time component + "date": _now_iso(), + "applicability-groups": self._applicability_groups(), + } + + def _applicability_groups(self): + groups = [] + for level in self.policy.levels: + group_id = f"{self.product}-{level.id}" + desc = f"NIST 800-53 {level.id.capitalize()} impact baseline for {self.product.upper()}" + if level.inherits_from: + parents = ", ".join(p.capitalize() for p in level.inherits_from) + desc += f" (inherits {parents})" + groups.append({ + "id": group_id, + "title": f"{self.product.upper()} {level.id.capitalize()} Baseline", + "description": desc, + }) + return groups + + def _groups(self): + return [ + { + "id": fam_id, + "title": fam_title, + "description": f"NIST 800-53 {fam_id.upper()} family: {fam_title}", + } + for fam_id, fam_title in NIST_FAMILIES.items() + ] + + def _objective(self, control): + """Return objective text: OSCAL statement prose, or title as fallback.""" + ctrl_id = control.id.lower() + if ctrl_id in self._oscal_index: + return self._oscal_index[ctrl_id] + return control.title + + def _applicability_for(self, control): + """Return non-empty product-scoped applicability list for a control.""" + seen = set() + deduped = [] + for level in (control.levels or []): + scoped = f"{self.product}-{level}" + if scoped not in seen: + seen.add(scoped) + deduped.append(scoped) + # applicability must be non-empty: fall back to all baselines + return deduped if deduped else [f"{self.product}-{b}" for b in self._all_baselines] + + def _assessment_requirements(self, control): + """ + Convert control.rules to Gemara assessment requirements. + + If the control has no rules, returns a single placeholder requirement + so that the non-empty constraint on assessment-requirements is satisfied. 
+ """ + applicability = self._applicability_for(control) + reqs = [] + seen_req_ids = set() + + for rule_entry in (control.rules or []): + if _is_variable_assignment(rule_entry): + var_name, var_value = rule_entry.split("=", 1) + req_id = var_name + req_text = f"Variable '{var_name}' is set to '{var_value}'" + else: + req_id = rule_entry + req_text = f"Rule '{rule_entry}' MUST be verified" + + if req_id in seen_req_ids: + continue + seen_req_ids.add(req_id) + + reqs.append({ + "id": req_id, + "state": "Active", + "text": req_text, + "applicability": applicability, + }) + + if not reqs: + cac_status = control.status if control.status else "pending" + reqs.append({ + "id": "no-automated-check", + "state": "Active", + "text": ( + f"This control has no automated checks. " + f"ComplianceAsCode status: {cac_status}. Manual assessment required." + ), + "applicability": applicability, + }) + + return reqs + + def _build_control(self, control): + family = _extract_family(control.id) + if family not in NIST_FAMILIES: + family = list(NIST_FAMILIES.keys())[0] # fallback to first family + cac_status = control.status if control.status else "pending" + return { + "id": control.id, + "title": control.title, + "objective": self._objective(control), + "group": family, + "assessment-requirements": self._assessment_requirements(control), + # #Lifecycle: "Active" | "Draft" | "Deprecated" | "Retired" + "state": map_state(cac_status), + } + + def build(self): + """Return a complete ControlCatalog dict ready for serialization.""" + controls = [self._build_control(ctrl) for ctrl in self.policy.controls] + return { + "metadata": self._metadata(), + "title": self.policy.title, + "groups": self._groups(), + "controls": controls, + } diff --git a/utils/nist_sync/gemara/guidance.py b/utils/nist_sync/gemara/guidance.py new file mode 100644 index 00000000000..2de7ea288fa --- /dev/null +++ b/utils/nist_sync/gemara/guidance.py @@ -0,0 +1,249 @@ +"""Builds a Gemara GuidanceCatalog from the NIST 
800-53 Rev 5 OSCAL catalog. + +The GuidanceCatalog is the abstract "what should be" layer — it contains the +official NIST 800-53 control text (objectives, statements, guidance prose) +independent of any particular platform or implementation. + +Sources: + - OSCAL catalog: utils/nist_sync/data/nist_800_53_rev5_catalog.json + - Baseline profiles: utils/nist_sync/data/nist_800_53_rev5_{low,moderate,high}_baseline.json +""" + +import json +import re +from datetime import datetime, timezone +from pathlib import Path + +from .catalog import NIST_FAMILIES +from .schema import GEMARA_VERSION + +BASELINES = ["low", "moderate", "high"] + + +def _load_json(path): + with open(path) as f: + return json.load(f) + + +def _build_baseline_index(data_dir): + """Return dict mapping control_id (lowercase) -> list of applicable baseline IDs.""" + index = {} + for baseline in BASELINES: + path = Path(data_dir) / f"nist_800_53_rev5_{baseline}_baseline.json" + if not path.exists(): + continue + data = _load_json(path) + for imp in data["profile"].get("imports", []): + for incl in imp.get("include-controls", []): + for ctrl_id in incl.get("with-ids", []): + ctrl_id = ctrl_id.lower() + if ctrl_id not in index: + index[ctrl_id] = [] + index[ctrl_id].append(baseline) + return index + + +def _build_param_index(ctrl, parent_params=None): + """Build param_id -> label dict for {{ insert: param, ... 
}} substitution.""" + index = dict(parent_params) if parent_params else {} + for param in ctrl.get("params", []): + pid = param.get("id", "") + label = param.get("label", "") + if not label: + select = param.get("select", {}) + if isinstance(select, dict): + choices = select.get("choice", []) + label = " or ".join(c for c in choices if isinstance(c, str)) + index[pid] = label or pid + return index + + +_PARAM_RE = re.compile(r"\{\{\s*insert:\s*param,\s*([^}]+?)\s*\}\}") + + +def _sub_params(text, param_index): + """Replace OSCAL {{ insert: param, ID }} markers with human-readable labels.""" + def replacer(m): + pid = m.group(1).strip() + return param_index.get(pid, f"[{pid}]") + return _PARAM_RE.sub(replacer, text) + + +def _collect_part_prose(parts, name, param_index): + """Return prose from the first part matching name, substituting params.""" + for part in parts: + if part.get("name") != name: + continue + prose = part.get("prose", "").strip() + if prose: + return _sub_params(prose, param_index) + # Empty top-level prose: join sub-part items + items = [ + _sub_params(sp.get("prose", "").strip(), param_index) + for sp in part.get("parts", []) + if sp.get("prose", "").strip() + ] + return " ".join(items) + return "" + + +def _build_statements(parts, ctrl_id, param_index): + """Build Gemara Statement list from OSCAL statement sub-parts.""" + statements = [] + for part in parts: + if part.get("name") != "statement": + continue + top_prose = part.get("prose", "").strip() + if top_prose: + statements.append({ + "id": f"{ctrl_id}--stmt", + "text": _sub_params(top_prose, param_index), + }) + else: + for i, sp in enumerate(part.get("parts", []), 1): + sp_prose = sp.get("prose", "").strip() + if sp_prose: + statements.append({ + "id": f"{ctrl_id}--stmt-{i}", + "text": _sub_params(sp_prose, param_index), + }) + return statements + + +def _build_guideline(ctrl, family_id, param_index, baseline_index, all_baselines): + """Convert one OSCAL control to a Gemara Guideline 
dict.""" + ctrl_id = ctrl["id"].lower() + parts = ctrl.get("parts", []) + + # Objective: statement prose (verbatim NIST text), fall back to title + objective = _collect_part_prose(parts, "statement", param_index) + if not objective: + objective = ctrl.get("title", ctrl_id) + + # Applicability: which baselines include this control + applicability = baseline_index.get(ctrl_id) + + # Detailed statements from OSCAL statement sub-parts + statements = _build_statements(parts, ctrl_id, param_index) + + # Rationale from OSCAL guidance prose + guidance_prose = _collect_part_prose(parts, "guidance", param_index) + + guideline = { + "id": ctrl_id, + "title": ctrl["title"], + "objective": objective, + "group": family_id, + "state": "Active", + } + + if applicability: + guideline["applicability"] = applicability + + if statements: + guideline["statements"] = statements + + if guidance_prose: + guideline["rationale"] = { + "importance": guidance_prose, + "goals": [f"Satisfy NIST 800-53 Rev 5 control {ctrl_id.upper()}"], + } + + return guideline + + +def _now_iso(): + return datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ") + + +class GemaraGuidanceCatalogBuilder: + """Builds a Gemara GuidanceCatalog from the NIST 800-53 OSCAL catalog.""" + + def __init__(self, oscal_catalog, data_dir=None): + """ + Args: + oscal_catalog: Parsed OSCAL catalog dict (top-level with 'catalog' key, + or already the inner 'catalog' dict). + data_dir: Path to the directory containing baseline JSON files. + When provided, control applicability is set from the baselines. 
+ """ + raw = oscal_catalog if isinstance(oscal_catalog, dict) else {} + self._catalog = raw.get("catalog", raw) + if data_dir: + self._baseline_index = _build_baseline_index(data_dir) + else: + self._baseline_index = {} + + def _metadata(self): + return { + "id": "nist-800-53-rev5-guidance", + "type": "GuidanceCatalog", + "gemara-version": GEMARA_VERSION, + "description": ( + "NIST Special Publication 800-53 Revision 5 — Security and Privacy Controls " + "for Information Systems and Organizations. This catalog provides the abstract " + "'what should be' layer: official control objectives and guidance prose." + ), + "author": { + "id": "nist", + "name": "National Institute of Standards and Technology", + "type": "Human", + "uri": "https://csrc.nist.gov/publications/detail/sp/800-53/rev-5/final", + }, + "version": "Revision 5", + "date": _now_iso(), + "applicability-groups": [ + { + "id": "low", + "title": "Low Baseline", + "description": "NIST 800-53 Low Impact Baseline", + }, + { + "id": "moderate", + "title": "Moderate Baseline", + "description": "NIST 800-53 Moderate Impact Baseline", + }, + { + "id": "high", + "title": "High Baseline", + "description": "NIST 800-53 High Impact Baseline", + }, + ], + } + + def _groups(self): + return [ + { + "id": fam_id, + "title": fam_title, + "description": f"NIST 800-53 {fam_id.upper()} family: {fam_title}", + } + for fam_id, fam_title in NIST_FAMILIES.items() + ] + + def build(self): + """Return a complete GuidanceCatalog dict ready for serialization.""" + guidelines = [] + for oscal_group in self._catalog.get("groups", []): + family_id = oscal_group.get("id", "").lower() + if family_id not in NIST_FAMILIES: + continue + for ctrl in oscal_group.get("controls", []): + param_index = _build_param_index(ctrl) + guidelines.append( + _build_guideline(ctrl, family_id, param_index, self._baseline_index, BASELINES) + ) + # Enhancements (ac-2.1, ac-2.2, …) — merge parent params + for enh in ctrl.get("controls", []): + enh_params = 
_build_param_index(enh, parent_params=param_index) + guidelines.append( + _build_guideline(enh, family_id, enh_params, self._baseline_index, BASELINES) + ) + + return { + "metadata": self._metadata(), + "title": "NIST Special Publication 800-53 Revision 5", + "type": "Standard", + "groups": self._groups(), + "guidelines": guidelines, + } diff --git a/utils/nist_sync/gemara/mapping.py b/utils/nist_sync/gemara/mapping.py new file mode 100644 index 00000000000..890f7fb8e2d --- /dev/null +++ b/utils/nist_sync/gemara/mapping.py @@ -0,0 +1,132 @@ +"""Builds a Gemara MappingDocument linking CaC controls to rule IDs.""" + +import re +from datetime import datetime, timezone + +from .schema import GEMARA_VERSION +from .status_map import ( + has_mapping, + map_confidence, + map_relationship, + map_strength, +) + +_VAR_ASSIGN_RE = re.compile(r'^[a-z][a-z0-9_]*=[^\s]+$') + +_CATALOG_REF_ID = "cac-nist-800-53-control-catalog" +_RULES_REF_ID = "cac-rules" + + +def _is_variable_assignment(rule_entry): + return bool(_VAR_ASSIGN_RE.match(rule_entry)) + + +def _now_iso(): + return datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ") + + +class GemaraMappingBuilder: + """Builds a Gemara MappingDocument from CaC policy controls.""" + + def __init__(self, product, catalog_id, policy): + self.product = product + self.catalog_id = catalog_id + self.policy = policy + + def _metadata(self): + mapping_id = f"{self.catalog_id}-rules-mapping" + return { + "id": mapping_id, + "type": "MappingDocument", + "gemara-version": GEMARA_VERSION, + "description": ( + f"Mapping from NIST 800-53 Rev 5 controls to ComplianceAsCode " + f"rules for {self.product.upper()}" + ), + "author": { + "id": "complianceascode", + "name": "ComplianceAsCode Project", + "type": "Software", + "uri": "https://github.com/ComplianceAsCode/content", + }, + "date": _now_iso(), + # #MappingReference requires id, title, version (version is required) + "mapping-references": [ + { + "id": _CATALOG_REF_ID, + "title": 
f"ComplianceAsCode NIST 800-53 Rev 5 Control Catalog for {self.product.upper()}", + "version": "Revision 5", + "url": "https://github.com/ComplianceAsCode/content", + }, + { + "id": _RULES_REF_ID, + "title": f"ComplianceAsCode {self.product.upper()} Rules", + "version": "1.0.0", + "url": "https://github.com/ComplianceAsCode/content", + }, + ], + } + + def _build_mapping_entry(self, control, rule_id): + cac_status = control.status if control.status else "pending" + relationship = map_relationship(cac_status) or "implements" + strength = map_strength(cac_status) or 5 + confidence = map_confidence(cac_status) or "Medium" + + rationale = ( + control.notes.strip() + if getattr(control, "notes", None) + else "Automated enforcement via ComplianceAsCode rule" + ) + + return { + "id": f"{control.id}--{rule_id}", + "source": control.id, + "relationship": relationship, + "targets": [ + { + "entry-id": rule_id, + "strength": strength, + # #ConfidenceLevel: "Undetermined" | "Low" | "Medium" | "High" + "confidence-level": confidence, + "rationale": rationale, + } + ], + } + + def build(self): + """Return a complete MappingDocument dict ready for serialization.""" + mappings = [] + seen_ids = set() + + for control in self.policy.controls: + cac_status = control.status if control.status else "pending" + if not has_mapping(cac_status): + continue + + for rule_entry in (control.rules or []): + if _is_variable_assignment(rule_entry): + continue + + mapping_id = f"{control.id}--{rule_entry}" + if mapping_id in seen_ids: + continue + seen_ids.add(mapping_id) + + mappings.append(self._build_mapping_entry(control, rule_entry)) + + return { + "metadata": self._metadata(), + "title": f"ComplianceAsCode Rules to NIST 800-53 for {self.product.upper()}", + # source-reference uses reference-id pointing to a mapping-reference + "source-reference": { + "reference-id": _CATALOG_REF_ID, + # #EntryType: Guideline|Statement|Control|AssessmentRequirement|... 
+ "entry-type": "Control", + }, + "target-reference": { + "reference-id": _RULES_REF_ID, + "entry-type": "AssessmentRequirement", + }, + "mappings": mappings, + } diff --git a/utils/nist_sync/gemara/schema.py b/utils/nist_sync/gemara/schema.py new file mode 100644 index 00000000000..060ee452285 --- /dev/null +++ b/utils/nist_sync/gemara/schema.py @@ -0,0 +1,203 @@ +"""Gemara schema constants and structural validation.""" + +GEMARA_VERSION = "1.2.0" + +# #Lifecycle: "Active" | "Draft" | "Deprecated" | "Retired" (default: "Active") +VALID_STATES = {"Active", "Draft", "Deprecated", "Retired"} + +# #RelationshipType enum from mappingdocument.cue +VALID_RELATIONSHIPS = { + "implements", + "implemented-by", + "supports", + "supported-by", + "equivalent", + "subsumes", + "no-match", + "relates-to", +} + +# #ConfidenceLevel from collections.cue +VALID_CONFIDENCE_LEVELS = {"Undetermined", "Low", "Medium", "High"} +VALID_ARTIFACT_TYPES = { + "CapabilityCatalog", + "ControlCatalog", + "GuidanceCatalog", + "ThreatCatalog", + "RiskCatalog", + "Policy", + "MappingDocument", + "Lexicon", + "EvaluationLog", + "EnforcementLog", + "VectorCatalog", + "PrincipleCatalog", + "AuditLog", +} + + +def _err(errors, msg): + errors.append(msg) + + +def validate_catalog(catalog): + """ + Validate a ControlCatalog dict against Gemara structural rules. + Returns a list of error strings (empty list means valid). 
+ """ + errors = [] + + if not isinstance(catalog, dict): + return ["catalog must be a dict"] + + # Required top-level fields + for field in ("metadata", "title", "groups"): + if field not in catalog: + _err(errors, f"missing required field: {field!r}") + + metadata = catalog.get("metadata", {}) + if not isinstance(metadata, dict): + _err(errors, "metadata must be a dict") + else: + if metadata.get("type") != "ControlCatalog": + _err(errors, f"metadata.type must be 'ControlCatalog', got {metadata.get('type')!r}") + for field in ("id", "gemara-version", "description", "author"): + if field not in metadata: + _err(errors, f"missing required metadata field: {field!r}") + + # Collect defined group IDs + groups = catalog.get("groups", []) + group_ids = {g["id"] for g in groups if isinstance(g, dict) and "id" in g} + + # Collect defined applicability-group IDs + app_groups = metadata.get("applicability-groups", []) if isinstance(metadata, dict) else [] + app_group_ids = {g["id"] for g in app_groups if isinstance(g, dict) and "id" in g} + + controls = catalog.get("controls", []) + if not isinstance(controls, list): + _err(errors, "controls must be a list") + else: + seen_ids = set() + for i, ctrl in enumerate(controls): + if not isinstance(ctrl, dict): + _err(errors, f"controls[{i}] must be a dict") + continue + for field in ("id", "title", "objective", "group", "state"): + if field not in ctrl: + _err(errors, f"controls[{i}] missing required field: {field!r}") + ctrl_id = ctrl.get("id", f"") + if ctrl_id in seen_ids: + _err(errors, f"duplicate control id: {ctrl_id!r}") + seen_ids.add(ctrl_id) + if ctrl.get("state") not in VALID_STATES: + _err(errors, f"control {ctrl_id!r}: invalid state {ctrl.get('state')!r}") + if ctrl.get("group") and ctrl["group"] not in group_ids: + _err(errors, f"control {ctrl_id!r}: group {ctrl['group']!r} not in groups") + for req in ctrl.get("assessment-requirements", []): + for ref in req.get("applicability", []): + if ref not in app_group_ids: 
+ _err(errors, f"control {ctrl_id!r}: applicability {ref!r} not in applicability-groups") + + return errors + + +def validate_mapping(mapping): + """ + Validate a MappingDocument dict against Gemara structural rules. + Returns a list of error strings (empty list means valid). + """ + errors = [] + + if not isinstance(mapping, dict): + return ["mapping must be a dict"] + + for field in ("metadata", "title", "source-reference", "target-reference", "mappings"): + if field not in mapping: + _err(errors, f"missing required field: {field!r}") + + metadata = mapping.get("metadata", {}) + if isinstance(metadata, dict): + if metadata.get("type") != "MappingDocument": + _err(errors, f"metadata.type must be 'MappingDocument', got {metadata.get('type')!r}") + + mappings = mapping.get("mappings", []) + if not isinstance(mappings, list): + _err(errors, "mappings must be a list") + else: + seen_ids = set() + for i, m in enumerate(mappings): + if not isinstance(m, dict): + _err(errors, f"mappings[{i}] must be a dict") + continue + mid = m.get("id", f"") + if mid in seen_ids: + _err(errors, f"duplicate mapping id: {mid!r}") + seen_ids.add(mid) + rel = m.get("relationship") + if rel not in VALID_RELATIONSHIPS: + _err(errors, f"mapping {mid!r}: invalid relationship {rel!r}") + if rel != "no-match" and not m.get("targets"): + _err(errors, f"mapping {mid!r}: non-no-match relationship requires targets") + + return errors + + +def validate_guidance(guidance): + """ + Validate a GuidanceCatalog dict against Gemara structural rules. + Returns a list of error strings (empty list means valid). 
+ """ + errors = [] + + if not isinstance(guidance, dict): + return ["guidance must be a dict"] + + for field in ("metadata", "title", "type", "groups", "guidelines"): + if field not in guidance: + _err(errors, f"missing required field: {field!r}") + + metadata = guidance.get("metadata", {}) + if not isinstance(metadata, dict): + _err(errors, "metadata must be a dict") + else: + if metadata.get("type") != "GuidanceCatalog": + _err(errors, f"metadata.type must be 'GuidanceCatalog', got {metadata.get('type')!r}") + for field in ("id", "gemara-version", "description", "author"): + if field not in metadata: + _err(errors, f"missing required metadata field: {field!r}") + + valid_guidance_types = {"Standard", "Regulation", "Best Practice", "Framework"} + if guidance.get("type") not in valid_guidance_types: + _err(errors, f"type must be one of {sorted(valid_guidance_types)}, got {guidance.get('type')!r}") + + groups = guidance.get("groups", []) + group_ids = {g["id"] for g in groups if isinstance(g, dict) and "id" in g} + + app_groups = metadata.get("applicability-groups", []) if isinstance(metadata, dict) else [] + app_group_ids = {g["id"] for g in app_groups if isinstance(g, dict) and "id" in g} + + guidelines = guidance.get("guidelines", []) + if not isinstance(guidelines, list): + _err(errors, "guidelines must be a list") + else: + seen_ids = set() + for i, g in enumerate(guidelines): + if not isinstance(g, dict): + _err(errors, f"guidelines[{i}] must be a dict") + continue + for field in ("id", "title", "objective", "group", "state"): + if field not in g: + _err(errors, f"guidelines[{i}] missing required field: {field!r}") + gid = g.get("id", f"") + if gid in seen_ids: + _err(errors, f"duplicate guideline id: {gid!r}") + seen_ids.add(gid) + if g.get("state") not in VALID_STATES: + _err(errors, f"guideline {gid!r}: invalid state {g.get('state')!r}") + if g.get("group") and g["group"] not in group_ids: + _err(errors, f"guideline {gid!r}: group {g['group']!r} not in 
"""Maps ComplianceAsCode control status values to Gemara fields."""

# CaC status -> Gemara #Lifecycle state (capitalized as per CUE schema)
# Gemara state reflects control *definition* maturity, not automation level.
# Automation level is captured in MappingDocument strength/confidence fields.
CAC_TO_GEMARA_STATE = {
    "automated": "Active",
    "supported": "Active",
    "partial": "Active",
    "manual": "Active",
    "inherently met": "Active",
    "documentation": "Active",
    "planned": "Draft",
    "pending": "Draft",
    "does not meet": "Deprecated",
    "not applicable": "Retired",
}

# CaC status -> Gemara #RelationshipType
# Valid values: implements, implemented-by, supports, supported-by,
#               equivalent, subsumes, no-match, relates-to
CAC_TO_RELATIONSHIP = {
    "automated": "implements",
    "supported": "implements",
    "partial": "supports",  # "partially-implements" is not in the schema
    "manual": "implements",
    "inherently met": "equivalent",
    "documentation": "implements",
}

# CaC status -> mapping strength (1-10, measures automation completeness)
CAC_TO_STRENGTH = {
    "automated": 8,
    "supported": 7,
    "partial": 5,
    "manual": 6,
    "inherently met": 9,
    "documentation": 4,
}

# CaC status -> Gemara #ConfidenceLevel (capitalized as per CUE schema)
# Valid values: "Undetermined" | "Low" | "Medium" | "High"
CAC_TO_CONFIDENCE = {
    "automated": "High",
    "supported": "High",
    "partial": "Medium",
    "manual": "Medium",
    "inherently met": "High",
    "documentation": "Medium",
}

# Statuses that produce no mapping entry (control not implemented)
NO_MAPPING_STATUSES = {"planned", "pending", "does not meet", "not applicable"}


def map_state(cac_status):
    """Return the Gemara state for a CaC status string.

    Statuses missing from CAC_TO_GEMARA_STATE (including None) map to "Draft".
    """
    return CAC_TO_GEMARA_STATE.get(cac_status, "Draft")


def map_relationship(cac_status):
    """Return the Gemara relationship type for a CaC status, or None if not mappable."""
    return CAC_TO_RELATIONSHIP.get(cac_status)


def map_strength(cac_status):
    """Return the Gemara mapping strength (1-10) for a CaC status, or None if not mappable."""
    return CAC_TO_STRENGTH.get(cac_status)


def map_confidence(cac_status):
    """Return the Gemara confidence level string for a CaC status, or None if not mappable."""
    return CAC_TO_CONFIDENCE.get(cac_status)


def has_mapping(cac_status):
    """Return True if the status produces mapping entries in the MappingDocument.

    A status is mappable only when it is not listed in NO_MAPPING_STATUSES and
    a relationship is defined for it.  The second condition keeps this
    predicate in agreement with map_relationship(): an unknown or missing
    status (None, a typo) yields False here instead of True followed by a
    None relationship.
    """
    return cac_status not in NO_MAPPING_STATUSES and cac_status in CAC_TO_RELATIONSHIP
def extract_rules_from_catalog(catalog, baseline=None, product=None):
    """
    Extract unique XCCDF rule IDs from a ControlCatalog.

    Args:
        catalog: mapping with a "controls" list; each control may carry
            "id", "state" and "assessment-requirements".
        baseline: optional baseline name; when given, only controls that have
            at least one requirement applicable to that baseline contribute.
        product: optional product id.  Applicability groups use
            product-scoped IDs (e.g. "rhel9-low"), so when both product and
            baseline are given the lookup key is "<product>-<baseline>".

    Returns a list of (rule_id, nist_control_ids) tuples, sorted by rule_id,
    where:
      - rule_id is the raw CaC rule ID (e.g. 'accounts_tmout')
      - nist_control_ids is the list of NIST controls that reference this
        rule, in catalog order and without repeats

    Skipped: controls whose state is "Deprecated" or "Retired", the
    "no-automated-check" placeholder, requirements whose text starts with
    "Variable '", and requirements without an id (they would otherwise be
    emitted as a rule with an empty name).
    """
    rule_to_controls = {}
    baseline_key = f"{product}-{baseline}" if (baseline and product) else baseline

    # "or []" also covers keys that are present but explicitly null in YAML.
    for ctrl in catalog.get("controls") or []:
        if ctrl.get("state", "") in ("Deprecated", "Retired"):
            continue

        requirements = ctrl.get("assessment-requirements") or []

        # Baseline filter works at control level: one matching requirement
        # admits every rule of the control.
        if baseline_key and not any(
            baseline_key in (req.get("applicability") or []) for req in requirements
        ):
            continue

        ctrl_id = ctrl.get("id", "")
        for req in requirements:
            # The requirement id is the bare CaC rule name (e.g. 'accounts_tmout').
            rule_id = req.get("id") or ""
            if not rule_id or rule_id == "no-automated-check":
                continue
            if (req.get("text") or "").startswith("Variable '"):
                continue

            controls = rule_to_controls.setdefault(rule_id, [])
            if ctrl_id not in controls:
                controls.append(ctrl_id)

    return sorted(rule_to_controls.items())
+ The provider then uses getDsRuleID() to re-add the prefix when building the tailoring XML. + """ + full_name = _PRODUCT_FULL_NAMES.get(product, product.upper()) + policy_id = f"nist-800-53-rev5-{product}-policy" + + assessment_plans = [] + for rule_id, _nist_controls in rules_with_controls: + assessment_plans.append({ + # IMPORTANT: complyctl v1.0.0-alpha.0 (go-gemara v0.0.1) reads AssessmentConfiguration.RequirementID + # from the plan 'id' field, not 'requirement-id'. Set both to the short CaC rule name so it works. + "id": rule_id, + "requirement-id": rule_id, + "frequency": "on-demand", + "evaluation-methods": [ + { + "id": "openscap-automated", + "type": "Behavioral", + "mode": "Automated", + } + ], + }) + + return { + "title": f"NIST SP 800-53 Rev 5 for {full_name}", + "metadata": { + "id": policy_id, + "type": "Policy", + "gemara-version": _GEMARA_VERSION, + "description": ( + f"Automated evaluation policy for NIST SP 800-53 Rev 5 on {full_name}, " + "using ComplianceAsCode rules. requirement-id values are short CaC rule names " + "(the OpenSCAP provider adds the xccdf_org.ssgproject.content_rule_ prefix)." 
def generate_complytime_yaml(product, registry_url, bundle_tag, base_profile="cis"):
    """Generate a ~/.complytime/complytime.yaml for this bundle.

    Format expected by complyctl v1.0.0-alpha.0:
      - http:// prefix triggers PlainHTTP mode in the OCI client
      - 'profile' variable: short XCCDF profile name (provider adds xccdf_org.ssgproject.content_profile_ prefix)
      - 'datastream' variable: explicit path to the SCAP data stream, bypassing OS auto-detection
        (the provider's findMatchingDatastream() may pick the wrong file on mixed-OS systems)

    Args:
        product: product id, used for the policy id and the data stream path.
        registry_url: registry base URL placed in front of the bundle reference.
        bundle_tag: repository reference, optionally ending in ":<tag>".
        base_profile: short XCCDF profile name written to the 'profile' variable.

    Returns the file content as a string.
    """
    policy_id = f"nist-800-53-rev5-{product}"
    # complyctl appends :latest by default — strip any existing tag to avoid "latest:latest".
    # Only the final path segment can carry a tag; a colon earlier in the
    # reference (e.g. "host:5000/repo:tag") belongs to a port and must stay.
    repo_prefix, slash, last_segment = bundle_tag.rpartition("/")
    bundle_ref = repo_prefix + slash + last_segment.split(":")[0]
    # Product-specific SCAP data stream path
    datastream = f"/usr/share/xml/scap/ssg/content/ssg-{product}-ds.xml"
    return f"""\
# complytime.yaml — complyctl v1.0.0-alpha.0 workspace configuration
policies:
  - url: {registry_url}/{bundle_ref}
    id: {policy_id}

targets:
  - id: local
    policies:
      - {policy_id}
    variables:
      profile: {base_profile}
      datastream: {datastream}
"""
def write_instructions(output_dir, product, registry_url, bundle_tag):
    """Write a HOWTO file with complyctl commands.

    Args:
        output_dir: pathlib.Path of the bundle directory; it must already
            contain "<product>_policy.yaml", whose line count is quoted in
            the text.
        product: product id used in the file names mentioned in the text.
        registry_url, bundle_tag: accepted for interface compatibility; the
            generated text does not use them.

    Returns the path of the written "HOWTO.txt".

    Raises OSError (e.g. FileNotFoundError) if the policy file cannot be read.
    """
    # Count the policy lines up front with an explicitly closed, UTF-8 handle.
    # (The policy is written as UTF-8; an open() buried in the f-string would
    # leak the handle and depend on the platform default encoding.)
    with open(output_dir / f"{product}_policy.yaml", encoding="utf-8") as policy_file:
        policy_line_count = sum(1 for _ in policy_file)

    instructions = f"""\
# Testing the NIST 800-53 Gemara bundle with complyctl
# Generated: {_now_iso()}

## Prerequisites

1. Start a local OCI registry (if not already running):
   podman run -d -p 5000:5000 --name registry docker.io/library/registry:2

2. Ensure complyctl is on PATH:
   export PATH="$HOME/.complytime:$PATH"

3. Copy complytime.yaml to your config directory:
   cp {output_dir}/complytime.yaml ~/.complytime/complytime.yaml

## Run the tests

### Step 1: Pull the bundle
complyctl get

### Step 2: Generate tailored XCCDF (validates the Policy and provider)
complyctl generate

### Step 3: Run the scan (requires OpenSCAP installed)
complyctl scan

### Step 4: View results
complyctl report

## Bundle contents

  Policy:  {output_dir}/{product}_policy.yaml
           {policy_line_count} lines
           assessment-plans use SHORT CaC rule names (provider adds XCCDF prefix internally)

  Catalog: {output_dir}/{product}_catalog.yaml (copy of build/gemara/{product}/control_catalog.yaml)
           Maps NIST controls → XCCDF rules (for traceability and reporting)

## Traceability

After the scan, use the MappingDocument to interpret results at the NIST control level:
  build/gemara/{product}/rules_mapping.yaml

Example: if 'accounts_tmout' PASSES, then NIST ac-2.5 is satisfied.
"""
    path = output_dir / "HOWTO.txt"
    path.write_text(instructions, encoding="utf-8")
    return path
" + "(default: cis)" + ), + ) + parser.add_argument("--push", action="store_true", help="Push bundle to the OCI registry using oras") + parser.add_argument("--verbose", action="store_true") + return parser.parse_args() + + +def main(): + args = parse_args() + product = args.product + gemara_dir = args.gemara_dir + output_dir = args.output_dir + registry_url = f"http://{args.registry}" + tag = args.tag or f"nist-800-53-rev5-{product}:latest" + + catalog_yaml_path = gemara_dir / product / "control_catalog.yaml" + if not catalog_yaml_path.exists(): + sys.stderr.write( + f"ERROR: {catalog_yaml_path} not found.\n" + f"Run first: python3 utils/nist_sync/export_to_gemara.py --products {product}\n" + ) + sys.exit(1) + + output_dir.mkdir(parents=True, exist_ok=True) + baseline_note = f" (baseline: {args.baseline})" if args.baseline else " (all automated rules)" + print(f"Generating complyctl bundle for {product}{baseline_note}") + + # Load catalog and extract rules + print(f" Reading {catalog_yaml_path}") + catalog = load_yaml(catalog_yaml_path) + catalog_id = catalog["metadata"]["id"] + rules_with_controls = extract_rules_from_catalog(catalog, baseline=args.baseline, product=product) + print(f" Found {len(rules_with_controls)} unique CaC rules") + print(f" Base profile: {args.base_profile} (XCCDF tailoring base)") + + # Generate Policy YAML + policy = generate_policy(product, catalog_id, rules_with_controls) + policy_path = output_dir / f"{product}_policy.yaml" + dump_yaml(policy, policy_path) + print(f" Wrote Policy: {policy_path}") + print(f" {len(rules_with_controls)} assessment-plans with short CaC rule names") + + # Copy catalog (complyctl needs it in the bundle for traceability) + catalog_copy_path = output_dir / f"{product}_catalog.yaml" + import shutil + shutil.copy2(catalog_yaml_path, catalog_copy_path) + print(f" Wrote Catalog: {catalog_copy_path}") + + # Generate complytime.yaml + complytime_yaml = generate_complytime_yaml(product, registry_url, tag, 
base_profile=args.base_profile) + complytime_path = output_dir / "complytime.yaml" + complytime_path.write_text(complytime_yaml, encoding="utf-8") + print(f" Wrote complytime.yaml: {complytime_path}") + + # Write HOWTO + howto_path = write_instructions(output_dir, product, registry_url, tag) + print(f" Wrote HOWTO: {howto_path}") + + if args.push: + print(f"\nPushing to OCI registry: {registry_url}/{tag}") + ok = push_bundle( + policy_path, + catalog_copy_path, + registry_url, + tag, + verbose=args.verbose, + ) + if ok: + print("\n Bundle pushed. Next steps:") + print(f" cp {complytime_path} ~/.complytime/complytime.yaml") + print(" complyctl get") + print(" complyctl generate") + print(" complyctl scan") + else: + sys.exit(1) + else: + print(f"\nBundle files written to {output_dir}") + print("To push to a local registry:") + print(" podman run -d -p 5000:5000 --name registry docker.io/library/registry:2") + print(f" python3 utils/nist_sync/generate_complyctl_bundle.py --product {product} --push") + print("\nThen test with complyctl:") + print(f" cp {complytime_path} ~/.complytime/complytime.yaml") + print(" complyctl get && complyctl generate && complyctl scan") + + +if __name__ == "__main__": + main() diff --git a/utils/nist_sync/test_gemara_export.py b/utils/nist_sync/test_gemara_export.py new file mode 100644 index 00000000000..5e1a4186e20 --- /dev/null +++ b/utils/nist_sync/test_gemara_export.py @@ -0,0 +1,448 @@ +#!/usr/bin/env python3 +""" +Tests for the Gemara export output. + +Verifies that the generated Gemara YAML files: + 1. Can be parsed as valid YAML + 2. Have correct structural cross-references (group IDs, applicability IDs) + 3. Are accurate: rules in the output match rules in the source control files + 4. 
class TestResult:
    """Collects pass/fail messages for one test run and echoes them to stdout.

    Attributes:
        passed: list of messages recorded by ok() / successful check() calls.
        failed: list of messages recorded by fail() / unsuccessful check() calls.
    """

    # The name starts with "Test" and the module is named test_*.py, so pytest
    # would try to collect this helper as a test class; opt out explicitly.
    __test__ = False

    def __init__(self):
        self.passed = []
        self.failed = []

    def ok(self, msg):
        """Record msg as a passed check and print it."""
        self.passed.append(msg)
        print(f"  [PASS] {msg}")

    def fail(self, msg):
        """Record msg as a failed check and print it."""
        self.failed.append(msg)
        print(f"  [FAIL] {msg}")

    def check(self, condition, ok_msg, fail_msg):
        """Record ok_msg when condition is truthy, otherwise fail_msg."""
        if condition:
            self.ok(ok_msg)
        else:
            self.fail(fail_msg)
f"metadata.type is wrong: {meta.get('type')}", + ) + result.check( + "gemara-version" in meta, + "metadata.gemara-version present", + "metadata.gemara-version missing", + ) + + defined_group_ids = {g["id"] for g in catalog.get("groups", [])} + app_group_ids = {g["id"] for g in meta.get("applicability-groups", [])} + + result.check(len(defined_group_ids) >= 20, f"{len(defined_group_ids)} NIST families defined as groups", "fewer than 20 NIST families defined") + result.check(len(app_group_ids) >= 3, f"{len(app_group_ids)} applicability groups (baselines) defined", "fewer than 3 baselines defined") + + controls = catalog.get("controls", []) + result.check(len(controls) > 0, f"{len(controls)} controls present in catalog", "no controls in catalog") + + bad_groups = [] + bad_app_refs = [] + missing_objective = [] + bad_states = [] + valid_states = {"Active", "Draft", "Deprecated", "Retired"} + seen_ids = set() + dup_ids = [] + + for ctrl in controls: + cid = ctrl.get("id", "") + if cid in seen_ids: + dup_ids.append(cid) + seen_ids.add(cid) + + if ctrl.get("group") not in defined_group_ids: + bad_groups.append(cid) + if ctrl.get("state") not in valid_states: + bad_states.append(cid) + if not ctrl.get("objective"): + missing_objective.append(cid) + for req in ctrl.get("assessment-requirements", []): + for ref in req.get("applicability", []): + if ref not in app_group_ids: + bad_app_refs.append(f"{cid}:{ref}") + + result.check(not dup_ids, "no duplicate control IDs", f"duplicate IDs: {dup_ids[:5]}") + result.check(not bad_groups, "all control group references resolve", f"unresolved groups: {bad_groups[:5]}") + result.check(not bad_states, "all control states are valid", f"invalid states: {bad_states[:5]}") + result.check(not missing_objective, "all controls have an objective", f"missing objective: {missing_objective[:5]}") + result.check(not bad_app_refs, "all applicability references resolve", f"unresolved: {bad_app_refs[:5]}") + + +def test_mapping_structure(mapping, 
result): + """Verify internal cross-reference integrity of the MappingDocument.""" + meta = mapping.get("metadata", {}) + result.check( + meta.get("type") == "MappingDocument", + "metadata.type is 'MappingDocument'", + f"metadata.type wrong: {meta.get('type')}", + ) + + mappings = mapping.get("mappings", []) + result.check(len(mappings) > 0, f"{len(mappings)} mapping entries", "no mapping entries") + + valid_rels = {"implements", "implemented-by", "supports", "supported-by", "equivalent", "subsumes", "no-match", "relates-to"} + bad_rels = [] + missing_targets = [] + seen_ids = set() + dup_ids = [] + + for m in mappings: + mid = m.get("id", "") + if mid in seen_ids: + dup_ids.append(mid) + seen_ids.add(mid) + rel = m.get("relationship") + if rel not in valid_rels: + bad_rels.append(f"{mid}:{rel}") + if rel != "no-match" and not m.get("targets"): + missing_targets.append(mid) + for t in m.get("targets", []): + s = t.get("strength", 0) + if not (1 <= s <= 10): + bad_rels.append(f"{mid}: strength {s} out of range") + + result.check(not dup_ids, "no duplicate mapping IDs", f"duplicate IDs: {dup_ids[:5]}") + result.check(not bad_rels, "all relationships and strengths are valid", f"invalid: {bad_rels[:5]}") + result.check(not missing_targets, "all non-no-match mappings have targets", f"missing targets: {missing_targets[:5]}") + + +def test_accuracy_vs_source(catalog, mapping, policy, product, result): + """Cross-check generated output against the source CaC control files.""" + # Control count must match exactly + src_count = len(policy.controls) + out_count = len(catalog.get("controls", [])) + result.check( + src_count == out_count, + f"control count matches source: {out_count}", + f"control count mismatch: source={src_count} output={out_count}", + ) + + catalog_by_id = {c["id"]: c for c in catalog.get("controls", [])} + mapping_by_source = {} + for m in mapping.get("mappings", []): + mapping_by_source.setdefault(m["source"], []).append(m) + + # Spot-check all controls 
that have rules in source + rule_mismatch = [] + missing_controls = [] + + for src_ctrl in policy.controls: + cid = src_ctrl.id + if cid not in catalog_by_id: + missing_controls.append(cid) + continue + + out_ctrl = catalog_by_id[cid] + + # Collect expected pure rule IDs from source (excluding variable assignments) + src_rules = {r for r in (src_ctrl.rules or []) if "=" not in r} + + # Collect rule IDs from assessment-requirements in catalog output. + # Exclude variable-assignment requirements (text starts with "Variable '") + # and placeholder requirements (id ends with "--no-automated-check") + out_req_rules = set() + for req in out_ctrl.get("assessment-requirements", []): + req_text = req.get("text", "") + if req_text.startswith("Variable '"): + continue + req_id = req["id"] + if req_id.endswith("--no-automated-check"): + continue + rule_part = req_id.split("--", 1)[1] if "--" in req_id else "" + if rule_part: + out_req_rules.add(rule_part) + + missing_from_output = src_rules - out_req_rules + extra_in_output = out_req_rules - src_rules + if missing_from_output or extra_in_output: + rule_mismatch.append( + f"{cid}: missing={sorted(missing_from_output)[:3]} extra={sorted(extra_in_output)[:3]}" + ) + + result.check(not missing_controls, "all source controls present in output", f"missing: {missing_controls[:5]}") + result.check(not rule_mismatch, "all source rules present in output assessment-requirements", f"mismatches (first 3): {rule_mismatch[:3]}") + + # Spot-check ac-2.5 if it exists (known automated control with specific rules) + ac25_src = next((c for c in policy.controls if c.id == "ac-2.5"), None) + if ac25_src and ac25_src.rules: + ac25_out = catalog_by_id.get("ac-2.5") + if ac25_out: + req_rule_ids = { + req["id"].split("--", 1)[1] + for req in ac25_out.get("assessment-requirements", []) + } + expected = {"accounts_tmout", "no_invalid_shell_accounts_unlocked"} + found = expected & req_rule_ids + result.check( + found == expected, + f"ac-2.5 has expected 
rules: {sorted(found)}", + f"ac-2.5 missing rules: {expected - found}", + ) + result.check( + ac25_out.get("state") == "Active", + "ac-2.5 state is 'Active' (automated control)", + f"ac-2.5 state is {ac25_out.get('state')!r}", + ) + ac25_maps = mapping_by_source.get("ac-2.5", []) + mapped_rule_ids = {t["entry-id"] for m in ac25_maps for t in m.get("targets", [])} + result.check( + "accounts_tmout" in mapped_rule_ids, + "ac-2.5 → accounts_tmout appears in MappingDocument", + "ac-2.5 → accounts_tmout missing from MappingDocument", + ) + + # Pending controls should not appear in mapping (they have no rules) + pending_in_mapping = [ + m["source"] for m in mapping.get("mappings", []) + if any(c.id == m["source"] and (c.status or "pending") in {"pending", "planned", "does not meet", "not applicable"} + for c in policy.controls) + ] + result.check( + not pending_in_mapping, + "pending/planned/does-not-meet controls absent from MappingDocument", + f"pending controls leaked into mapping: {pending_in_mapping[:5]}", + ) + + +def test_guidance_structure(guidance, result): + """Verify internal cross-reference integrity of the GuidanceCatalog.""" + meta = guidance.get("metadata", {}) + result.check( + meta.get("type") == "GuidanceCatalog", + "metadata.type is 'GuidanceCatalog'", + f"metadata.type is wrong: {meta.get('type')}", + ) + result.check( + "gemara-version" in meta, + "metadata.gemara-version present", + "metadata.gemara-version missing", + ) + result.check( + guidance.get("type") == "Standard", + "type is 'Standard'", + f"type is wrong: {guidance.get('type')}", + ) + + defined_group_ids = {g["id"] for g in guidance.get("groups", [])} + app_group_ids = {g["id"] for g in meta.get("applicability-groups", [])} + + result.check(len(defined_group_ids) >= 20, f"{len(defined_group_ids)} NIST families defined as groups", "fewer than 20 NIST families defined") + result.check("low" in app_group_ids and "moderate" in app_group_ids and "high" in app_group_ids, + "low/moderate/high 
applicability-groups present", + f"missing baseline applicability-groups: {app_group_ids}") + + guidelines = guidance.get("guidelines", []) + result.check(len(guidelines) >= 1000, f"{len(guidelines)} guidelines present", f"fewer than 1000 guidelines: {len(guidelines)}") + + bad_groups = [] + bad_app_refs = [] + missing_objective = [] + bad_states = [] + valid_states = {"Active", "Draft", "Deprecated", "Retired"} + seen_ids = set() + dup_ids = [] + + for gl in guidelines: + gid = gl.get("id", "") + if gid in seen_ids: + dup_ids.append(gid) + seen_ids.add(gid) + if gl.get("group") not in defined_group_ids: + bad_groups.append(gid) + if gl.get("state") not in valid_states: + bad_states.append(gid) + if not gl.get("objective"): + missing_objective.append(gid) + for ref in gl.get("applicability", []): + if ref not in app_group_ids: + bad_app_refs.append(f"{gid}:{ref}") + + result.check(not dup_ids, "no duplicate guideline IDs", f"duplicate IDs: {dup_ids[:5]}") + result.check(not bad_groups, "all guideline group references resolve", f"unresolved groups: {bad_groups[:5]}") + result.check(not bad_states, "all guideline states are valid", f"invalid states: {bad_states[:5]}") + result.check(not missing_objective, "all guidelines have an objective", f"missing objective: {missing_objective[:5]}") + result.check(not bad_app_refs, "all applicability references resolve", f"unresolved: {bad_app_refs[:5]}") + + # Spot-check ac-2.5: moderate+high only, not low + ac25 = next((g for g in guidelines if g.get("id") == "ac-2.5"), None) + if ac25: + appl = set(ac25.get("applicability", [])) + result.check( + "moderate" in appl and "high" in appl and "low" not in appl, + "ac-2.5 applicability is [moderate, high] (not low)", + f"ac-2.5 applicability wrong: {sorted(appl)}", + ) + result.check( + ac25.get("title") == "Inactivity Logout", + "ac-2.5 title is 'Inactivity Logout'", + f"ac-2.5 title wrong: {ac25.get('title')!r}", + ) + result.check( + "log out" in (ac25.get("objective") or 
def run_guidance(gemara_dir, result):
    """Check the platform-independent guidance catalog, if one was exported.

    Looks for ``guidance_catalog.yaml`` directly under *gemara_dir*.  When the
    file is absent a SKIP notice is printed and *result* is left untouched.
    Otherwise the file is loaded, a pass is recorded on *result*, and the
    structural checks of ``test_guidance_structure`` are run against it.
    """
    catalog_file = gemara_dir / "guidance_catalog.yaml"
    if catalog_file.exists():
        parsed = load_yaml(catalog_file)
        result.ok(f"guidance_catalog.yaml parsed ({catalog_file})")
        test_guidance_structure(parsed, result)
        return

    # Nothing to test: tell the user how to produce the missing file.
    print(" [SKIP] guidance_catalog.yaml not found — OSCAL data not downloaded")
    print(" Run: python3 utils/nist_sync/download_oscal.py && python3 utils/nist_sync/export_to_gemara.py")


def run_product(product, gemara_dir, repo_root):
    """Run every per-product check and return the collected ``TestResult``.

    Reads ``<gemara_dir>/<product>/control_catalog.yaml`` and, when present,
    ``rules_mapping.yaml`` from the same directory.  A missing catalog skips
    the product (an empty result is returned); a missing mapping is recorded
    as a failure, after which the remaining checks run with an empty mapping.
    """
    print(f"\n{'='*60}")
    print(f"Product: {product}")
    print(f"{'='*60}")
    outcome = TestResult()

    product_dir = gemara_dir / product
    catalog_file = product_dir / "control_catalog.yaml"
    mapping_file = product_dir / "rules_mapping.yaml"

    if not catalog_file.exists():
        print(f" [SKIP] {catalog_file} not found — run export_to_gemara.py first")
        return outcome

    print("\n[1] Loading output files...")
    catalog_doc = load_yaml(catalog_file)
    outcome.ok(f"control_catalog.yaml parsed ({catalog_file})")
    if mapping_file.exists():
        mapping_doc = load_yaml(mapping_file)
        outcome.ok(f"rules_mapping.yaml parsed ({mapping_file})")
    else:
        mapping_doc = None
        outcome.fail(f"rules_mapping.yaml not found at {mapping_file}")

    print("\n[2] ControlCatalog structure...")
    test_catalog_structure(catalog_doc, outcome)

    # Section [3] only runs when a non-empty mapping document was loaded.
    if mapping_doc:
        print("\n[3] MappingDocument structure...")
        test_mapping_structure(mapping_doc, outcome)

    print("\n[4] Accuracy vs source control files...")
    test_accuracy_vs_source(
        catalog_doc,
        mapping_doc or {},
        load_policy(product, repo_root),
        product,
        outcome,
    )

    return outcome


def main():
    """Command-line entry point.

    Runs the guidance-catalog checks once, then the per-product checks for
    each name given via ``--products``, prints a pass/fail summary, and exits
    with status 0 when nothing failed and 1 otherwise.
    """
    cli = argparse.ArgumentParser(description="Test Gemara export output")
    cli.add_argument(
        "--products",
        default="rhel8,rhel9,rhel10",
        help="Comma-separated product list",
    )
    cli.add_argument(
        "--gemara-dir",
        type=Path,
        default=_REPO_ROOT / "build" / "gemara",
        help="Directory containing gemara export output",
    )
    cli.add_argument(
        "--repo-root",
        type=Path,
        default=_REPO_ROOT,
    )
    opts = cli.parse_args()

    # Split the comma-separated list, dropping blanks such as a trailing comma.
    product_names = []
    for token in opts.products.split(","):
        token = token.strip()
        if token:
            product_names.append(token)

    print(f"\n{'='*60}")
    print("GuidanceCatalog (platform-independent)")
    print(f"{'='*60}")
    guidance_outcome = TestResult()
    run_guidance(opts.gemara_dir, guidance_outcome)

    outcomes = [guidance_outcome]
    for name in product_names:
        outcomes.append(run_product(name, opts.gemara_dir, opts.repo_root))

    total_passed = sum(len(entry.passed) for entry in outcomes)
    total_failed = sum(len(entry.failed) for entry in outcomes)

    print(f"\n{'='*60}")
    print(f"SUMMARY: {total_passed} passed, {total_failed} failed")
    print(f"{'='*60}")
    sys.exit(1 if total_failed else 0)


if __name__ == "__main__":
    main()
#
# Usage:
#   cd utils/nist_sync/vagrant
#   vagrant up        # create and boot the VM (libvirt by default)
#   vagrant ssh       # open a shell
#   vagrant halt      # power off
#   vagrant destroy   # remove completely
#
# Once the VM is up (the trigger at the bottom of this file also runs
# populate_inventory.sh after every 'up' and 'reload'):
#   bash populate_inventory.sh        # writes ../ansible/inventory.ini
#   ansible-playbook -i ../ansible/inventory.ini ../ansible/setup.yml \
#     -e complyctl_bin=/home/$USER/bin/complyctl \
#     -e provider_bin=~/.complytime/providers/complyctl-provider-openscap
#   ansible-playbook -i ../ansible/inventory.ini ../ansible/scan.yml

Vagrant.configure("2") do |config|
  # The generic/rhel9 box carries no Red Hat subscription, hence no enabled
  # repositories. The shell provisioner further down points dnf at the
  # CentOS Stream 9 BaseOS/AppStream mirrors so the scanner packages can be
  # installed anyway.
  config.vm.box = "generic/rhel9"
  config.vm.hostname = "nist-rhel9-scanner"

  # DHCP-addressed private network so the host can reach the guest.
  # populate_inventory.sh reads the connection details from 'vagrant ssh-config'.
  config.vm.network "private_network", type: "dhcp"

  config.vm.provider "libvirt" do |libvirt|
    # No explicit domain name: libvirt derives it from the Vagrantfile
    # directory plus hostname, which keeps it clear of an existing 'rhel9'
    # domain.
    libvirt.cpus = 2
    libvirt.memory = 2048
  end

  config.vm.provider "virtualbox" do |vbox|
    vbox.name = "nist-gemara-rhel9"
    vbox.cpus = 2
    vbox.memory = 2048
    vbox.customize ["modifyvm", :id, "--nictype1", "virtio"]
  end

  # Guest-side bootstrap: register the CentOS Stream 9 repos (binary-compatible
  # with RHEL9 and reachable without a subscription), then install the
  # OS-level scanner dependencies. The repo stanza and its REPO terminator sit
  # at column 0 on purpose: that keeps <<~ from stripping any indentation, so
  # the inner shell heredoc still finds its terminator at the start of a line.
  base_packages_script = <<~SHELL
    set -euo pipefail

    echo "=== Configuring CentOS Stream 9 repos ==="
    cat > /etc/yum.repos.d/centos-stream9.repo << 'REPO'
[cs9-baseos]
name=CentOS Stream 9 - BaseOS
baseurl=https://mirror.stream.centos.org/9-stream/BaseOS/x86_64/os/
gpgcheck=0
enabled=1

[cs9-appstream]
name=CentOS Stream 9 - AppStream
baseurl=https://mirror.stream.centos.org/9-stream/AppStream/x86_64/os/
gpgcheck=0
enabled=1
REPO

    echo "=== Installing base packages ==="
    # Only openscap — podman has a hardcoded RPM file conflict with redhat-release on
    # generic/rhel9 boxes (containers-common vs redhat-release-9.3).
    # The OCI registry runs on the host instead (see setup.yml / scan.yml).
    dnf install -y openscap-scanner openscap-engine-sce 2>&1 | tail -5

    # scap-security-guide provides /usr/share/xml/scap/ssg/content/ssg-rhel9-ds.xml.
    # If unavailable here, setup.yml will copy the data stream built from source.
    dnf install -y scap-security-guide 2>&1 | tail -5 || \
      echo " [WARN] scap-security-guide unavailable; Ansible will copy the built data stream."

    echo "=== Base provisioning complete ==="
  SHELL

  config.vm.provision "shell", name: "base-packages", inline: base_packages_script

  # Host-side hook: regenerate the Ansible inventory after the VM is brought
  # up or reloaded.
  config.trigger.after [:up, :reload] do |hook|
    hook.info = "Updating Ansible inventory (../ansible/inventory.ini)..."
    hook.run = { path: "populate_inventory.sh" }
  end
end
# Generates ../ansible/inventory.ini (relative to this script) from the
# connection details reported by 'vagrant ssh-config'.
#
# Exit status: 0 once the inventory has been written, 1 when no VM address
# could be obtained (for example because the VM is not running).

set -euo pipefail

SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
INVENTORY="${SCRIPT_DIR}/../ansible/inventory.ini"

# vagrant ssh-config is relative to the Vagrantfile location
cd "$SCRIPT_DIR"

# Ask vagrant once and reuse the answer for every field.
# '|| true' is deliberate: with 'set -e' and pipefail a failing
# 'vagrant ssh-config' would otherwise abort the script right here, before
# the explicit "Is the VM running?" diagnostic below could be printed.
SSH_CONFIG="$(vagrant ssh-config 2>/dev/null || true)"

# Print the value of the first "<Key> <value>" line in the captured
# ssh-config output. Matching on the first field makes the lookup independent
# of how far vagrant indents the line, and stopping at the first hit keeps the
# result on a single line even when a key (e.g. IdentityFile) is listed more
# than once. A here-string is used instead of a pipe so that awk exiting
# early cannot trip pipefail through SIGPIPE.
ssh_config_value() {
    awk -v key="$1" '$1 == key { print $2; exit }' <<< "$SSH_CONFIG"
}

VM_HOST="$(ssh_config_value HostName)"
VM_PORT="$(ssh_config_value Port)"
VM_KEY="$(ssh_config_value IdentityFile)"
VM_USER="$(ssh_config_value User)"

if [[ -z "$VM_HOST" ]]; then
    echo "ERROR: Could not read VM address from 'vagrant ssh-config'. Is the VM running?" >&2
    exit 1
fi

mkdir -p "$(dirname "$INVENTORY")"

# The heredoc delimiter is unquoted, so the shell expands the variables and
# removes each backslash-newline: the host entry is written as ONE line.
cat > "$INVENTORY" << EOF
# Auto-generated by populate_inventory.sh — do not edit manually.
# Regenerate with: cd utils/nist_sync/vagrant && bash populate_inventory.sh

[rhel9_scanner]
nist-rhel9-scanner \
    ansible_host=${VM_HOST} \
    ansible_port=${VM_PORT:-22} \
    ansible_user=${VM_USER:-vagrant} \
    ansible_ssh_private_key_file=${VM_KEY} \
    ansible_ssh_common_args='-o StrictHostKeyChecking=no -o UserKnownHostsFile=/dev/null'

[rhel9_scanner:vars]
ansible_python_interpreter=/usr/bin/python3
EOF

echo "Written: ${INVENTORY}"
echo " VM: ${VM_USER:-vagrant}@${VM_HOST}:${VM_PORT:-22}"
echo " Key: ${VM_KEY}"