diff --git a/.github/PULL_REQUEST_TEMPLATE.md b/.github/PULL_REQUEST_TEMPLATE.md index 126b95c..b1ac43c 100644 --- a/.github/PULL_REQUEST_TEMPLATE.md +++ b/.github/PULL_REQUEST_TEMPLATE.md @@ -15,6 +15,23 @@ Fixes # ## Testing +## Pre-PR Checklist (REQUIRED) + +Tick every box. If something doesn't apply, write "N/A — " next to it. + +- [ ] **Rebased on latest `main`** — ran `git fetch origin && git rebase origin/main` +- [ ] **Files changed tab reviewed** — only files I intended to change are listed; no accidental deletions +- [ ] **Local lint / tests pass** — `pytest tests/ -q` clean and `python solutions/ess-maker-skills/scripts/flightcheck/cli.py --help` parses, OR I noted below why end-to-end testing wasn't possible +- [ ] **No references to files outside the repo** — header comments and docs don't point at internal source-of-truth files +- [ ] **Defaults match repo conventions** — output paths default to `workspace/flightcheck/...`; risky operations (writes, deletions, destructive API calls) are opt-in via explicit flags, not opt-out +- [ ] **FlightCheck integration** — new checks are wired into a scope in `solutions/ess-maker-skills/scripts/flightcheck/cli.py` (`SCOPE_MAP` and `FULL_SCOPE`); new tests added under `tests/flightcheck/checks/` +- [ ] **API tier registry honored** — new external API calls reference the tier in `tests/fixtures/cassettes/INDEX.md`; any new tier rows added there with rationale (see `solutions/ess-maker-skills/scripts/flightcheck/AGENTS.md`) +- [ ] **Docs updated** — relevant `README.md` / `AGENTS.md` reflect the change + +> **Why "rebased on latest `main`" matters:** Stale branches can silently +> delete files added after your branch was cut. We've already caught one +> case of this. GitHub branch protection enforces this rule automatically. + ## Checklist - [ ] My code follows the existing style - [ ] I have added/updated tests where applicable diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index c4f484a..0554dff 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -12,6 +12,26 @@ This project has adopted the [Microsoft Open Source Code of Conduct](https://ope For more information see the [Code of Conduct FAQ](https://opensource.microsoft.com/codeofconduct/faq/) or contact [opencode@microsoft.com](mailto:opencode@microsoft.com) with any additional questions or comments. +## Pre-PR Checklist (REQUIRED) + +Before opening a pull request, confirm each of the following. The PR template +([`.github/PULL_REQUEST_TEMPLATE.md`](.github/PULL_REQUEST_TEMPLATE.md)) will +ask you to tick these off — incomplete PRs may be sent back for cleanup. + +- [ ] **Rebased on latest `main`** — `git fetch origin && git rebase origin/main` +- [ ] **PR diff shows ONLY files you intended to change** — review the "Files changed" tab; no accidental deletions or unrelated edits +- [ ] **Local lint / tests pass** — `pytest tests/ -q` clean, or note in the PR description why end-to-end testing wasn't possible (e.g., no tenant access) +- [ ] **No references to files outside the repo** — header comments and docs should not point to internal source-of-truth files +- [ ] **Defaults match repo conventions** — output paths default to `workspace/flightcheck/...`; risky operations (writes, deletions, destructive API calls) are opt-in via explicit flags +- [ ] **FlightCheck integration** — new checks wired into a scope in `solutions/ess-maker-skills/scripts/flightcheck/cli.py` (`SCOPE_MAP` and `FULL_SCOPE`); new tests live under `tests/flightcheck/checks/` +- [ ] **API tier registry honored** — every new external API call references the tier assigned in `tests/fixtures/cassettes/INDEX.md`; any new tier rows added there with rationale (see [`solutions/ess-maker-skills/scripts/flightcheck/AGENTS.md`](solutions/ess-maker-skills/scripts/flightcheck/AGENTS.md)) +- [ ] **Docs updated** — relevant `README.md` / `AGENTS.md` updated to mention the new check, scope, or behavior change + +> **Why "rebased on latest `main`" matters:** Stale branches can silently +> delete files added after your branch was cut. We've already caught one +> case of this in a sibling repo. GitHub branch protection enforces this +> rule automatically. + ## Maintenance ### Security maintenance diff --git a/solutions/ess-maker-skills/scripts/diagnostics/README.md b/solutions/ess-maker-skills/scripts/diagnostics/README.md new file mode 100644 index 0000000..8cd6a56 --- /dev/null +++ b/solutions/ess-maker-skills/scripts/diagnostics/README.md @@ -0,0 +1,231 @@ +# Workday REST endpoint diagnostic + +Standalone interactive diagnostic that validates the 9 Workday REST +connector actions the Employee Self-Service (ESS) agent invokes at +runtime. Uses the OAuth 2.0 Authorization Code grant against a Workday +API Client you register in your own tenant. + +## Why this lives outside FlightCheck + +The FlightCheck runner (`scripts/flightcheck/cli.py`) is designed to +authenticate against a customer's environment using credentials the +operator already has (Dataverse, Microsoft Graph, Power Platform Admin, +Copilot Studio Island Gateway) and validate everything automatically. +Workday REST validation can't fit that model: Workday REST endpoints +accept ONLY OAuth 2.0 Bearer tokens, and obtaining one requires the +customer to register their own API Client in Workday. That's the same +chicken-and-egg auth problem documented in +[`tests/fixtures/cassettes/INDEX.md`](../../../../tests/fixtures/cassettes/INDEX.md) +under "Workday WQL config-validation pattern." + +So FlightCheck surfaces a `NotConfigured` checkpoint (`WD-REST-MANUAL`) +that points customers at this script. Customers run it interactively +once and attach the resulting JSON to their deployment ticket. + +## What it tests + +9 endpoints corresponding to the 9 Workday REST connector actions in +ESS. The PowerShell ancestor (`Test-WorkdayRESTEndpoints.ps1` in +`ess-preflight-validator`) used identical checkpoint IDs. + +| # | Checkpoint | Operation | Type | Validates | +|---|---|---|---|---| +| 0 | `WD-REST-AUTH` | OAuth Token | Auth | Authorization Code flow yields a bearer token | +| 1 | `WD-REST-ME` | `GET workers/me` | Identity | Authenticated user's profile is returned (gate for all subsequent reads) | +| 2 | `WD-REST-001` | `GetWorkerInboxTasks` | Read | Inbox tasks endpoint is reachable + permitted | +| 3 | `WD-REST-002` | `GetWorkerPaySlips` | Read | Pay slips endpoint is reachable + permitted | +| 4 | `WD-REST-003` | `SearchWorkers` | Read | People picker queries work | +| 5 | `WD-REST-004` | `GetWorkerDirectReports` | Read | Manager view of direct reports works | +| 6 | `WD-REST-005` | `GetSupervisoryOrganizationsManaged` | Read | Manager view of orgs works | +| 7 | `WD-REST-006` | `GetFeedbackTemplates` | Read | Feedback templates endpoint is reachable | +| 8 | `WD-REST-007` | `TransferEmployee` | Write | Job change endpoint reachable (opt-in via `--include-write-tests`) | +| 9 | `WD-REST-008` | `RequestFeedback` | Write | Feedback request endpoint reachable (opt-in) | + +Write tests send a minimal body; an HTTP 400 or 422 response is treated +as a PASS because it confirms the endpoint is reachable and the OAuth +client is authorized — the request body was intentionally not a real +business payload. + +## Prerequisites + +1. **Python 3.11+** (matches the rest of the kit). +2. The kit's script dependencies installed: + ```bash + pip install -r solutions/ess-maker-skills/scripts/requirements.txt + ``` +3. **A Workday OAuth API Client** registered in your tenant: + - Workday > **Register API Client** (or **Edit API Client**) + - Grant Type: **Authorization Code** + - Redirect URI: `https://localhost:8888/callback` (default) or your + own — pass it via `--redirect-uri` + - Note the **Client ID** and **Client Secret** (the secret is shown + exactly once; copy it immediately) +4. **Security domain access** for the API Client: + - Self-Service: Current Staffing Information (gates `/workers/me`) + - Worker Data: Inbox / Pay / Reports / Organizations (read tests) + - Performance Management (feedback templates) + - Staffing (write tests, if you opt in) + +## Usage + +### Interactive (recommended for the first run) + +```bash +python solutions/ess-maker-skills/scripts/diagnostics/test_workday_rest_endpoints.py +``` + +You'll be prompted for the tenant, hosts, Client ID, and Client Secret. +The browser opens to Workday's login page; sign in. The browser then +redirects to `https://localhost:8888/callback?code=...` and shows a +connection error (expected — there's no server listening on HTTPS). +**Copy the FULL URL from the address bar** and paste it back at the +prompt. The script extracts the `code`, verifies the `state` parameter +matches what it sent, exchanges the code for an access token, and runs +the 9 endpoint tests. + +### Fully parameterized (CI-friendly, non-interactive prompts disabled) + +```bash +python solutions/ess-maker-skills/scripts/diagnostics/test_workday_rest_endpoints.py \ + --workday-tenant contoso_impl1 \ + --workday-host wd2-impl-services1.workday.com \ + --authorize-host impl.workday.com \ + --oauth-client-id YTIzM2RlNDct... \ + --oauth-client-secret '' +``` + +Even fully-parameterized, the OAuth flow still needs a browser. There +is no fully-headless mode by design — the chicken-and-egg auth bootstrap +problem is precisely what this script does NOT try to solve. + +### Optional: HTTP loopback listener (advanced) + +If your Workday API Client is registered with `http://localhost:8888/callback` +(plain HTTP, not HTTPS), you can let the script spin up a tiny stdlib +HTTP server to capture the callback automatically: + +```bash +python solutions/ess-maker-skills/scripts/diagnostics/test_workday_rest_endpoints.py \ + --redirect-uri http://localhost:8888/callback \ + --listen +``` + +The script falls back to the paste-the-URL flow if `--listen` is set +but `--redirect-uri` is HTTPS, because stdlib `http.server` cannot +terminate TLS without a cert and shipping a self-signed cert with the +diagnostic causes its own trust-store friction. + +### Include write tests (test/impl tenants only!) + +```bash +python solutions/ess-maker-skills/scripts/diagnostics/test_workday_rest_endpoints.py \ + --include-write-tests \ + ... +``` + +> ⚠️ Only enable write tests in test/impl tenants. The bodies are +> intentionally minimal placeholders; if your security domain permits +> them, the requests are recorded by Workday. A `400` or `422` response +> is a PASS — the endpoint is reachable. + +## Output + +A summary is printed to stdout and a structured JSON file is written +to `workspace/flightcheck/workday-rest-.json` (override +with `--output-dir`). + +### What the JSON contains + +- Test metadata: timestamp, tenant, API root, totals per status. +- Per-checkpoint result: `id`, `operation`, `type`, `status`, + `details`, `latency_ms`, `http_status`. +- A `workers_me_response` block with the GetWorkerMe response, useful + for reviewing which Workday security domains the API client has. + +### Secrets and PII hygiene + +- The OAuth **client secret**, **authorization code**, **access token**, + and **refresh token** are NEVER logged to stdout, the JSON output, or + the OAuth callback log. The token endpoint's error responses are + reduced to status + error class to avoid leaking either the secret + or the code. +- The **GetWorkerMe response** is included in the JSON for diagnostic + value but PII fields (`descriptor`, `primaryWorkEmail`, + `businessTitle`, `primarySupervisoryOrganization.descriptor`, and the + raw WID in `id`) are **redacted by default**. Pass `--include-pii` to + keep them when debugging inside your own tenant. Even then, do not + paste the JSON into a public issue tracker. + +## Parameters + +| Flag | Required | Description | +|------|----------|-------------| +| `--workday-tenant` | Yes (or prompted) | Workday tenant name (e.g. `contoso_impl1`). | +| `--workday-host` | Yes (or prompted) | Workday REST API host (e.g. `wd2-impl-services1.workday.com`). | +| `--authorize-host` | Yes (or prompted) | Workday OAuth authorize host (e.g. `impl.workday.com`). | +| `--oauth-client-id` | Yes (or prompted) | OAuth Client ID. | +| `--oauth-client-secret` | Yes (or prompted) | OAuth Client Secret (use a credential manager when scripting). | +| `--redirect-uri` | No | Override the OAuth redirect URI. Default: `https://localhost:8888/callback`. | +| `--listen` | No | Start an HTTP loopback listener for the callback. Only valid with `http://localhost` redirect URIs. | +| `--include-write-tests` | No | Include `TransferEmployee` + `RequestFeedback`. Skipped by default. | +| `--include-pii` | No | Keep employee PII in the JSON output. Default: redacted. | +| `--test-worker-id` | No | Worker ID to use for employee-specific reads. Default: WID returned by GetWorkerMe. | +| `--search-term` | No | Search term for SearchWorkers. Default: first word of GetWorkerMe descriptor. | +| `--output-dir` | No | Where to write the JSON. Default: `workspace/flightcheck`. | + +## Common host values + +| Environment | `--workday-host` (REST API) | `--authorize-host` (OAuth) | +|-------------|------------------------------|----------------------------| +| Implementation (DC2) | `wd2-impl-services1.workday.com` | `impl.workday.com` | +| Implementation (DC5) | `wd5-impl-services1.workday.com` | `impl.workday.com` | +| Production (DC5) | `wd5-services1.workday.com` | `wd5.myworkday.com` | + +## Troubleshooting + +### `[WD-REST-AUTH] FAIL — token endpoint returned HTTP 401` + +Workday rejected the client credentials. Verify: +- Client ID and Secret match what Workday > Register API Client shows +- The grant type on the API Client is **Authorization Code** +- The redirect URI you used matches exactly (including scheme and port) + +### `[WD-REST-ME] FAIL 403` + +Auth worked but the API Client lacks +**Self-Service: Current Staffing Information**. Ask the Workday admin to +grant that domain to the API Client. + +### `[WD-REST-00x] FAIL 403` + +The API Client is missing one of the Worker Data / Performance +Management security domains. The `details` field on the result names +which checkpoint failed; map back to the prerequisites section above. + +### `[WD-REST-00x] FAIL 404` + +The endpoint path doesn't exist on this tenant's API version. This is +rare for the ESS-supported endpoints but possible when running against +a very old tenant or a tenant in a different data center than expected. +Check `--workday-host` against the common host values above. + +### Browser shows "connection refused" — expected + +When the redirect URI is `https://localhost:8888/callback` (default), +Workday will redirect the browser there after sign-in. There's no +server listening because the script does not ship a TLS cert. The +browser shows a connection error; this is normal — copy the URL from +the address bar and paste it. + +## How this relates to other validators + +- **SOAP-side SSO**: `solutions/ess-maker-skills/src/reference/workday-sso-test-flow/` + is a Power Automate flow template that tests the `OAuthUser` Entra + SSO connection via `Get_Workers` SOAP. Different connection, different + auth model — not a substitute for this REST diagnostic. +- **FlightCheck Workday checks** (`checks/workday.py`): validate + Dataverse env vars, connection references, flow status, and SOAP + workflows. They do NOT validate REST endpoints (deliberately — see + the architecture note above). +- **Tier registry**: this diagnostic is referenced from the + `Workday WQL / REST` row of `tests/fixtures/cassettes/INDEX.md`. diff --git a/solutions/ess-maker-skills/scripts/diagnostics/__init__.py b/solutions/ess-maker-skills/scripts/diagnostics/__init__.py new file mode 100644 index 0000000..59e481e --- /dev/null +++ b/solutions/ess-maker-skills/scripts/diagnostics/__init__.py @@ -0,0 +1,2 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. diff --git a/solutions/ess-maker-skills/scripts/diagnostics/test_workday_rest_endpoints.py b/solutions/ess-maker-skills/scripts/diagnostics/test_workday_rest_endpoints.py new file mode 100644 index 0000000..620a200 --- /dev/null +++ b/solutions/ess-maker-skills/scripts/diagnostics/test_workday_rest_endpoints.py @@ -0,0 +1,801 @@ +#!/usr/bin/env python3 +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. + +"""ESS Workday REST endpoint pre-flight diagnostic. + +Validates the 9 Workday REST connector actions the Employee Self-Service +(ESS) agent calls at runtime, using the OAuth 2.0 Authorization Code +flow against a customer-registered Workday API Client. Same checkpoint +IDs (``WD-REST-AUTH`` / ``WD-REST-ME`` / ``WD-REST-001`` ... ``WD-REST-008``) +as the source PowerShell script (``Test-WorkdayRESTEndpoints.ps1`` from +ess-preflight-validator commits 5eb19bc and 9ed2055). + +This script lives outside the FlightCheck runner intentionally. Workday +REST OAuth bootstrap is the chicken-and-egg auth problem documented in +``tests/fixtures/cassettes/INDEX.md`` (see "Workday WQL config-validation +pattern"): validating that the customer's Workday API Client is wired up +is hard to automate because the validator itself needs a registered API +Client to talk to Workday. The source repo accepts this and ships an +interactive customer-run script. We do the same — but use Python, and +default to a paste-the-URL UX so we don't have to ship a self-signed +TLS cert for the ``https://localhost`` redirect URI Workday expects. + +Usage:: + + python solutions/ess-maker-skills/scripts/diagnostics/test_workday_rest_endpoints.py + +Prerequisites: register an OAuth API Client in Workday with +"Authorization Code" grant type and redirect URI +``https://localhost:8888/callback`` (or pass ``--redirect-uri`` to use +your own). See ``README.md`` in this directory for the full setup. + +Secrets hygiene +--------------- + +The OAuth client secret, authorization code, access token, and refresh +token are NEVER logged. Employee PII returned by the endpoints +(name, email, business title, organization, manager) is redacted in the +JSON output by default; pass ``--include-pii`` to keep it for in-tenant +debugging. Do not paste the redacted JSON into public issues regardless +— Workday WIDs are tenant-internal identifiers. +""" + +from __future__ import annotations + +import argparse +import datetime as dt +import getpass +import http.server +import json +import os +import secrets as _stdlib_secrets +import socketserver +import ssl +import sys +import threading +import time +import urllib.parse +import webbrowser +from dataclasses import asdict, dataclass, field +from typing import Any, Optional + +import requests +from requests.exceptions import RequestException + +DEFAULT_REDIRECT_URI = "https://localhost:8888/callback" +DEFAULT_OUTPUT_DIR = "workspace/flightcheck" + +# Fields in the GetWorkerMe + collection responses we consider PII. Redacted +# by default in JSON output. Each entry is a JSON-path-ish dot string; nested +# access uses dict.get() and is forgiving of missing intermediates. +_PII_PATHS_WORKER = ( + "id", + "descriptor", + "primaryWorkEmail", + "businessTitle", + "primarySupervisoryOrganization.descriptor", + "primarySupervisoryOrganization.id", +) + + +# ─────────────────────────────────────────────────────────────────────── +# Result dataclass — same shape as the source PS PSCustomObject. +# ─────────────────────────────────────────────────────────────────────── + + +@dataclass +class CheckResult: + id: str + operation: str + type: str # "Auth" | "Identity" | "Read" | "Write" + status: str # "PASS" | "FAIL" | "WARN" | "SKIP" + details: str = "" + latency_ms: int = 0 + http_status: Optional[int] = None + + +@dataclass +class SuiteResult: + timestamp: str + tenant: str + api_root: str + worker_wid: Optional[str] = None + results: list[CheckResult] = field(default_factory=list) + + def add(self, r: CheckResult) -> None: + self.results.append(r) + + @property + def passed(self) -> int: + return sum(1 for r in self.results if r.status == "PASS") + + @property + def failed(self) -> int: + return sum(1 for r in self.results if r.status == "FAIL") + + @property + def warned(self) -> int: + return sum(1 for r in self.results if r.status == "WARN") + + @property + def skipped(self) -> int: + return sum(1 for r in self.results if r.status == "SKIP") + + +# ─────────────────────────────────────────────────────────────────────── +# OAuth Authorization Code flow +# ─────────────────────────────────────────────────────────────────────── + + +class _LocalhostCallbackHandler(http.server.BaseHTTPRequestHandler): + """Captures ``?code=...&state=...`` from the redirect, stores it on the + server, and serves a small HTML page so the user knows they can close + the browser tab. + """ + + captured: dict[str, str] = {} + + # Quiet down the noisy default access log; we don't want OAuth params + # echoed to stderr where they'd end up in CI logs. + def log_message(self, format: str, *args: Any) -> None: # noqa: A002 + return + + def do_GET(self) -> None: # noqa: N802 — stdlib API name + parsed = urllib.parse.urlparse(self.path) + params = urllib.parse.parse_qs(parsed.query) + if "code" in params: + self.__class__.captured["code"] = params["code"][0] + if "state" in params: + self.__class__.captured["state"] = params["state"][0] + if "error" in params: + self.__class__.captured["error"] = params["error"][0] + self.send_response(200) + self.send_header("Content-Type", "text/html; charset=utf-8") + self.end_headers() + body = ( + b"

You can close this tab.

" + b"

The Workday REST diagnostic captured the authorization " + b"code; switch back to your terminal.

" + ) + self.wfile.write(body) + + +def _generate_state() -> str: + return _stdlib_secrets.token_urlsafe(24) + + +def _start_loopback_server(host: str, port: int) -> socketserver.TCPServer: + """Bind a tiny HTTP server on ``host:port`` for the OAuth redirect. + + Only safe when ``redirect_uri`` is ``http://localhost...`` — TLS + termination on stdlib http.server is brittle and the customer's + Workday API Client config is what dictates which scheme is in use. + """ + server = socketserver.TCPServer((host, port), _LocalhostCallbackHandler) + thread = threading.Thread(target=server.serve_forever, daemon=True) + thread.start() + return server + + +def _exchange_code_for_token( + *, + token_url: str, + client_id: str, + client_secret: str, + code: str, + redirect_uri: str, + timeout: float = 30.0, +) -> dict[str, Any]: + """POST to Workday's token endpoint. Returns parsed JSON on 200, + or raises ``RuntimeError`` with a redacted error string on failure. + + We use HTTP Basic auth (RFC 6749 §2.3.1) because that's what the + source PS does — Workday's REST OAuth accepts both Basic and + body-encoded client credentials and Basic is the safer default. + """ + body = { + "grant_type": "authorization_code", + "code": code, + "redirect_uri": redirect_uri, + } + resp = requests.post( + token_url, + data=body, + auth=(client_id, client_secret), + headers={"Accept": "application/json"}, + timeout=timeout, + ) + if resp.status_code != 200: + # Don't echo the raw body — it can contain the code or hints + # about what the secret looked like. Keep only status + a + # generic error class taken from JSON if present. + err_class = "" + try: + err_class = resp.json().get("error", "") + except (ValueError, KeyError): + pass + raise RuntimeError( + f"token endpoint returned HTTP {resp.status_code}" + + (f" ({err_class})" if err_class else "") + ) + return resp.json() + + +def acquire_oauth_token( + *, + workday_tenant: str, + workday_host: str, + authorize_host: str, + client_id: str, + client_secret: str, + redirect_uri: str = DEFAULT_REDIRECT_URI, + listen: bool = False, + browser_opener=webbrowser.open, + paste_prompt=input, +) -> str: + """Perform the OAuth 2.0 Authorization Code flow and return the access token. + + Default flow (no ``--listen``): open the browser, tell the user to + copy the redirect URL from the address bar, parse the code and + state out of it. Works for the conventional Workday API Client + config with ``https://localhost:...`` redirect URI without needing + a TLS cert. + + Listen flow (``--listen`` AND ``redirect_uri`` starts with + ``http://localhost``): bind a tiny HTTP loopback server, wait for + the browser to hit it, capture code and state automatically. + """ + parsed = urllib.parse.urlparse(redirect_uri) + use_listener = listen and parsed.scheme == "http" and parsed.hostname in ( + "localhost", "127.0.0.1", + ) + + state = _generate_state() + auth_url = ( + f"https://{authorize_host}/{workday_tenant}/authorize" + f"?response_type=code" + f"&client_id={urllib.parse.quote(client_id, safe='')}" + f"&redirect_uri={urllib.parse.quote(redirect_uri, safe='')}" + f"&state={urllib.parse.quote(state, safe='')}" + ) + + server: Optional[socketserver.TCPServer] = None + captured: dict[str, str] = {} + if use_listener: + _LocalhostCallbackHandler.captured = captured + host = parsed.hostname or "localhost" + port = parsed.port or 8888 + server = _start_loopback_server(host, port) + print(f" Listening for OAuth callback on http://{host}:{port}...") + + print() + print(" Opening browser for Workday OAuth login...") + browser_opener(auth_url) + + if use_listener: + try: + assert server is not None + deadline = time.monotonic() + 300 # 5 minutes + while time.monotonic() < deadline and "code" not in captured: + time.sleep(0.25) + finally: + assert server is not None + server.shutdown() + server.server_close() + if "error" in captured: + raise RuntimeError(f"Workday returned error '{captured['error']}'") + if "code" not in captured: + raise RuntimeError("Timed out waiting for OAuth callback") + code = captured["code"] + returned_state = captured.get("state", "") + else: + print() + print(" After signing in, the browser will redirect to a URL that") + print(" starts with your redirect URI (it may show a connection") + print(f" error — that's expected because Workday redirects to {redirect_uri}).") + print(" Copy the FULL URL from the address bar and paste it below.") + print() + raw = paste_prompt(" Paste redirect URL (or just the code): ").strip() + code, returned_state = _parse_code_from_paste(raw) + + if returned_state and returned_state != state: + raise RuntimeError("OAuth state mismatch (possible CSRF or stale paste). Aborting.") + + token_url = f"https://{workday_host}/ccx/oauth2/{workday_tenant}/token" + token_json = _exchange_code_for_token( + token_url=token_url, + client_id=client_id, + client_secret=client_secret, + code=code, + redirect_uri=redirect_uri, + ) + access_token = token_json.get("access_token") + if not access_token: + raise RuntimeError("Workday token response had no access_token") + return access_token + + +def _parse_code_from_paste(raw: str) -> tuple[str, str]: + """Extract ``code`` and ``state`` from a pasted redirect URL or raw code.""" + if "?" in raw or raw.startswith("http"): + parsed = urllib.parse.urlparse(raw) + params = urllib.parse.parse_qs(parsed.query) + code = params.get("code", [""])[0] + state = params.get("state", [""])[0] + if not code: + raise RuntimeError("Pasted URL did not contain ?code=...") + return code, state + return raw, "" + + +# ─────────────────────────────────────────────────────────────────────── +# REST helpers +# ─────────────────────────────────────────────────────────────────────── + + +def _invoke_workday_rest( + *, + api_root: str, + tenant: str, + bearer_token: str, + module: str, + version: str, + resource: str, + method: str = "GET", + query_params: dict[str, str] | None = None, + body: dict[str, Any] | None = None, + timeout: float = 30.0, +) -> tuple[Optional[int], Any, int, str]: + """Single REST request. Returns ``(status_code, json_or_none, latency_ms, error_or_empty)``.""" + url = f"{api_root}/{module}/{version}/{tenant}/{resource.lstrip('/')}" + headers = { + "Authorization": f"Bearer {bearer_token}", + "Accept": "application/json", + } + started = time.monotonic() + try: + if method == "GET": + resp = requests.get(url, headers=headers, params=query_params or {}, timeout=timeout) + else: + resp = requests.request( + method, url, headers={**headers, "Content-Type": "application/json"}, + params=query_params or {}, json=body, timeout=timeout, + ) + except RequestException as e: + return None, None, int((time.monotonic() - started) * 1000), str(e) + latency_ms = int((time.monotonic() - started) * 1000) + payload: Any = None + try: + payload = resp.json() if resp.content else None + except ValueError: + payload = None + return resp.status_code, payload, latency_ms, "" + + +# ─────────────────────────────────────────────────────────────────────── +# Endpoint catalog (ported from source $readOperations / $writeOperations) +# ─────────────────────────────────────────────────────────────────────── + + +def _build_read_operations(worker_id: str, search_term: str) -> list[dict[str, Any]]: + return [ + {"id": "WD-REST-001", "name": "GetWorkerInboxTasks", "module": "common", + "version": "v1", "resource": f"workers/{worker_id}/inboxTasks", + "query": {"limit": "5"}, "check_field": "data"}, + {"id": "WD-REST-002", "name": "GetWorkerPaySlips", "module": "common", + "version": "v1", "resource": f"workers/{worker_id}/paySlips", + "query": {"limit": "5"}, "check_field": "data"}, + {"id": "WD-REST-003", "name": "SearchWorkers", "module": "common", + "version": "v1", "resource": "workers", + "query": {"search": search_term, "limit": "5"}, "check_field": "data"}, + {"id": "WD-REST-004", "name": "GetWorkerDirectReports", "module": "common", + "version": "v1", "resource": f"workers/{worker_id}/directReports", + "query": {}, "check_field": "data"}, + {"id": "WD-REST-005", "name": "GetSupervisoryOrganizationsManaged", + "module": "common", "version": "v1", + "resource": f"workers/{worker_id}/supervisoryOrganizationsManaged", + "query": {}, "check_field": "data"}, + {"id": "WD-REST-006", "name": "GetFeedbackTemplates", + "module": "performanceEnablement", "version": "v5", + "resource": "values/feedbackTemplate/feedbackTemplate/", + "query": {}, "check_field": "data"}, + ] + + +def _build_write_operations(worker_id: str) -> list[dict[str, Any]]: + future = (dt.date.today() + dt.timedelta(days=30)).isoformat() + return [ + {"id": "WD-REST-007", "name": "TransferEmployee", "module": "common", + "version": "v1", "resource": f"workers/{worker_id}/jobChanges", + "method": "POST", + "body": { + "supervisoryOrganization": {"id": "test-validation-only"}, + "jobChangeReason": {"id": "test-validation-only"}, + "effective": future, + "moveManagersTeam": False, + }}, + {"id": "WD-REST-008", "name": "RequestFeedback", + "module": "performanceEnablement", "version": "v5", + "resource": f"workers/{worker_id}/requestedFeedbackOnWorkerEvents", + "method": "POST", + "body": { + "feedbackResponders": [{"id": worker_id}], + "feedbackConfidential": False, + "showFeedbackProviderName": True, + "expirationDate": future, + }}, + ] + + +def _classify_failure(status_code: int | None, op_type: str) -> tuple[str, str]: + """Map an HTTP status to (status, details) per source PS semantics.""" + if status_code == 401: + return "FAIL", "401 Unauthorized — token expired or invalid" + if status_code == 403: + return "FAIL", "403 Forbidden — OAuth client missing required security domain" + if status_code == 404: + return "FAIL", "404 Not Found — endpoint path may differ for this tenant" + if op_type == "Write" and status_code in (400, 422): + return "PASS", f"HTTP {status_code} (expected — minimal body, endpoint reachable + auth OK)" + if status_code is None: + return "FAIL", "network error" + return "FAIL", f"HTTP {status_code}" + + +# ─────────────────────────────────────────────────────────────────────── +# PII / secrets redaction +# ─────────────────────────────────────────────────────────────────────── + + +def _redact_worker_fields(obj: Any) -> Any: + """Strip PII fields from a worker-shaped response copy. Returns a new + structure; the input is not mutated. + + Best-effort: ESS Workday REST responses use ``descriptor`` for the + display name and ``id`` for the WID, plus a handful of contact and + org fields. We walk the structure and overwrite those by name. + """ + if isinstance(obj, dict): + out: dict[str, Any] = {} + for k, v in obj.items(): + if k in ("descriptor", "primaryWorkEmail", "businessTitle"): + out[k] = "[REDACTED]" + elif k == "id" and isinstance(v, str): + out[k] = "[REDACTED-WID]" + else: + out[k] = _redact_worker_fields(v) + return out + if isinstance(obj, list): + return [_redact_worker_fields(v) for v in obj] + return obj + + +# ─────────────────────────────────────────────────────────────────────── +# Suite execution +# ─────────────────────────────────────────────────────────────────────── + + +def _resolve_worker_identity( + suite: SuiteResult, api_root: str, tenant: str, bearer_token: str, + user_supplied_worker_id: Optional[str], user_supplied_search: Optional[str], +) -> tuple[Optional[str], Optional[str], dict[str, Any] | None]: + """Call ``workers/me`` and parse out the WID + a search term for use + by subsequent read tests. Returns ``(worker_id, search_term, full_response)``. + + The full response is stashed only so callers can include a redacted + summary in the JSON output; it must not leak through ``print``. + """ + status_code, payload, latency_ms, err = _invoke_workday_rest( + api_root=api_root, tenant=tenant, bearer_token=bearer_token, + module="common", version="v1", resource="workers/me", + ) + if status_code == 200 and isinstance(payload, dict) and payload.get("id"): + worker_id = payload["id"] + descriptor = payload.get("descriptor", "") + search_term = (user_supplied_search + or (descriptor.split(" ")[0] if descriptor else worker_id)) + suite.add(CheckResult( + id="WD-REST-ME", operation="GetWorkerMe", type="Identity", + status="PASS", details=f"WID and identity returned ({latency_ms}ms)", + latency_ms=latency_ms, http_status=status_code, + )) + return (user_supplied_worker_id or worker_id), search_term, payload + + status, details = _classify_failure(status_code, "Identity") + if err: + details = f"network error: {err}" + suite.add(CheckResult( + id="WD-REST-ME", operation="GetWorkerMe", type="Identity", + status=status, details=details, latency_ms=latency_ms, + http_status=status_code, + )) + return None, None, None + + +def _run_read_operations( + suite: SuiteResult, api_root: str, tenant: str, bearer_token: str, + worker_id: str, search_term: str, +) -> None: + for op in _build_read_operations(worker_id, search_term): + status_code, payload, latency_ms, err = _invoke_workday_rest( + api_root=api_root, tenant=tenant, bearer_token=bearer_token, + module=op["module"], version=op["version"], + resource=op["resource"], query_params=op["query"], + ) + if status_code is not None and 200 <= status_code < 300: + has_field = isinstance(payload, dict) and op["check_field"] in payload + if has_field: + total = payload.get("total") if isinstance(payload, dict) else None + detail = f"OK, total={total}" if total is not None else "OK" + suite.add(CheckResult( + id=op["id"], operation=op["name"], type="Read", + status="PASS", details=detail, latency_ms=latency_ms, + http_status=status_code, + )) + else: + suite.add(CheckResult( + id=op["id"], operation=op["name"], type="Read", + status="WARN", + details=f"Endpoint reachable but no '{op['check_field']}' in response", + latency_ms=latency_ms, http_status=status_code, + )) + else: + status, details = _classify_failure(status_code, "Read") + if err: + details = f"network error: {err}" + suite.add(CheckResult( + id=op["id"], operation=op["name"], type="Read", + status=status, details=details, latency_ms=latency_ms, + http_status=status_code, + )) + + +def _run_write_operations( + suite: SuiteResult, api_root: str, tenant: str, bearer_token: str, + worker_id: str, include_write_tests: bool, +) -> None: + write_ops = _build_write_operations(worker_id) + if not include_write_tests: + for op in write_ops: + suite.add(CheckResult( + id=op["id"], operation=op["name"], type="Write", + status="SKIP", details="Skipped (use --include-write-tests to enable)", + )) + return + + for op in write_ops: + status_code, _payload, latency_ms, err = _invoke_workday_rest( + api_root=api_root, tenant=tenant, bearer_token=bearer_token, + module=op["module"], version=op["version"], resource=op["resource"], + method=op["method"], body=op["body"], + ) + if status_code is not None and 200 <= status_code < 300: + suite.add(CheckResult( + id=op["id"], operation=op["name"], type="Write", + status="PASS", details="Endpoint reachable, auth accepted (HTTP 2xx)", + latency_ms=latency_ms, http_status=status_code, + )) + continue + status, details = _classify_failure(status_code, "Write") + if err: + details = f"network error: {err}" + suite.add(CheckResult( + id=op["id"], operation=op["name"], type="Write", + status=status, details=details, latency_ms=latency_ms, + http_status=status_code, + )) + + +# ─────────────────────────────────────────────────────────────────────── +# Output +# ─────────────────────────────────────────────────────────────────────── + + +def _print_summary(suite: SuiteResult) -> None: + print() + print("=" * 64) + print(" WORKDAY REST DIAGNOSTIC — SUMMARY") + print("=" * 64) + print(f" Total: {len(suite.results)} checks") + print(f" Passed: {suite.passed}") + print(f" Failed: {suite.failed}") + print(f" Warned: {suite.warned}") + print(f" Skipped: {suite.skipped}") + print() + for r in suite.results: + marker = {"PASS": " ✓", "FAIL": " ✗", "WARN": " ~", "SKIP": " -"}.get(r.status, " ?") + print(f"{marker} [{r.id}] {r.operation:36s} {r.status:5s} {r.details}") + print() + + +def _write_json_output( + suite: SuiteResult, output_dir: str, *, me_response: dict | None, + include_pii: bool, +) -> str: + os.makedirs(output_dir, exist_ok=True) + stamp = dt.datetime.now(dt.timezone.utc).strftime("%Y%m%dT%H%M%SZ") + out_path = os.path.join(output_dir, f"workday-rest-{stamp}.json") + + me_section: Any = None + if me_response is not None: + me_section = me_response if include_pii else _redact_worker_fields(me_response) + + body = { + "timestamp": suite.timestamp, + "tenant": suite.tenant, + "api_root": suite.api_root, + "worker_wid": suite.worker_wid if include_pii else ( + "[REDACTED-WID]" if suite.worker_wid else None + ), + "totals": { + "checks": len(suite.results), + "passed": suite.passed, + "failed": suite.failed, + "warned": suite.warned, + "skipped": suite.skipped, + }, + "include_pii": include_pii, + "results": [asdict(r) for r in suite.results], + "workers_me_response": me_section, + } + with open(out_path, "w", encoding="utf-8") as f: + json.dump(body, f, indent=2) + return out_path + + +# ─────────────────────────────────────────────────────────────────────── +# Entry point +# ─────────────────────────────────────────────────────────────────────── + + +def _prompt_missing(args: argparse.Namespace) -> argparse.Namespace: + """Interactive prompts for the values the source PS prompts for.""" + if not args.workday_tenant: + args.workday_tenant = input("Enter Workday Tenant (e.g., contoso_impl1): ").strip() + if not args.workday_host: + default = "wd2-impl-services1.workday.com" + v = input(f"Enter Workday REST API Host [{default}]: ").strip() + args.workday_host = v or default + if not args.authorize_host: + default = "impl.workday.com" + v = input(f"Enter Workday Authorize Host [{default}]: ").strip() + args.authorize_host = v or default + if not args.oauth_client_id: + args.oauth_client_id = input( + "Enter OAuth Client ID (from Workday > Register API Client): " + ).strip() + if not args.oauth_client_secret: + args.oauth_client_secret = getpass.getpass("Enter OAuth Client Secret: ") + return args + + +def main(argv: list[str] | None = None) -> int: + parser = argparse.ArgumentParser( + prog="test_workday_rest_endpoints.py", + description=( + "Validate the 9 Workday REST connector actions ESS uses at runtime, " + "via OAuth 2.0 Authorization Code flow." + ), + ) + parser.add_argument("--workday-tenant", + help="Workday tenant name (e.g. contoso_impl1)") + parser.add_argument("--workday-host", + help="Workday REST API host (e.g. wd2-impl-services1.workday.com)") + parser.add_argument("--authorize-host", + help="Workday OAuth authorize host (e.g. impl.workday.com)") + parser.add_argument("--redirect-uri", default=DEFAULT_REDIRECT_URI, + help="OAuth redirect URI configured in the API client " + f"(default: {DEFAULT_REDIRECT_URI}).") + parser.add_argument("--oauth-client-id", + help="OAuth Client ID from Workday > Register API Client.") + parser.add_argument("--oauth-client-secret", + help="OAuth Client Secret. Prompted securely if omitted.") + parser.add_argument("--test-worker-id", default=None, + help="Worker ID to use for employee-specific reads. " + "Default: WID returned by GetWorkerMe.") + parser.add_argument("--search-term", default=None, + help="Search term for SearchWorkers. Default: first " + "word of GetWorkerMe descriptor.") + parser.add_argument("--include-write-tests", action="store_true", + help="Include TransferEmployee + RequestFeedback. " + "Skipped by default. Use ONLY in test tenants.") + parser.add_argument("--listen", action="store_true", + help="Spin up an HTTP loopback server for the OAuth " + "callback. Only valid when --redirect-uri starts " + "with http://localhost; otherwise falls back to " + "the paste-the-URL flow.") + parser.add_argument("--include-pii", action="store_true", + help="Keep employee PII (name, email, WID, etc.) in " + "the JSON output. Default: PII is redacted. Use " + "ONLY in your own tenant for debugging.") + parser.add_argument("--output-dir", default=DEFAULT_OUTPUT_DIR, + help=f"Where to write the JSON result file (default: {DEFAULT_OUTPUT_DIR}).") + parser.add_argument("--print-help-only", action="store_true", + help=argparse.SUPPRESS) # used by smoke tests + + args = parser.parse_args(argv) + if args.print_help_only: + return 0 + + args = _prompt_missing(args) + if not all([args.workday_tenant, args.workday_host, args.authorize_host, + args.oauth_client_id, args.oauth_client_secret]): + print("ERROR: Missing required parameter.", file=sys.stderr) + return 2 + + api_root = f"https://{args.workday_host}/ccx/api" + suite = SuiteResult( + timestamp=dt.datetime.now(dt.timezone.utc).isoformat(), + tenant=args.workday_tenant, + api_root=api_root, + ) + + print() + print("=" * 64) + print(" Workday REST Endpoint Diagnostic") + print(" Auth: OAuth 2.0 Authorization Code") + print("=" * 64) + print(f" Tenant: {args.workday_tenant}") + print(f" API Root: {api_root}") + print(f" Authorize: https://{args.authorize_host}/{args.workday_tenant}/authorize") + print(f" Redirect URI: {args.redirect_uri}") + print(f" Write Tests: {'ENABLED' if args.include_write_tests else 'SKIPPED'}") + print() + + # Step 1: OAuth + try: + bearer_token = acquire_oauth_token( + workday_tenant=args.workday_tenant, + workday_host=args.workday_host, + authorize_host=args.authorize_host, + client_id=args.oauth_client_id, + client_secret=args.oauth_client_secret, + redirect_uri=args.redirect_uri, + listen=args.listen, + ) + suite.add(CheckResult( + id="WD-REST-AUTH", operation="OAuth Token (AuthCode)", type="Auth", + status="PASS", details="Access token acquired", + )) + except Exception as e: + suite.add(CheckResult( + id="WD-REST-AUTH", operation="OAuth Token (AuthCode)", type="Auth", + status="FAIL", details=str(e), + )) + _print_summary(suite) + path = _write_json_output(suite, args.output_dir, me_response=None, + include_pii=args.include_pii) + print(f" Results: {path}") + return 1 + + # Step 2: identity + worker_id, search_term, me_resp = _resolve_worker_identity( + suite, api_root, args.workday_tenant, bearer_token, + args.test_worker_id, args.search_term, + ) + if not worker_id: + _print_summary(suite) + path = _write_json_output(suite, args.output_dir, me_response=None, + include_pii=args.include_pii) + print(f" Results: {path}") + return 1 + suite.worker_wid = worker_id + + # Step 3: reads + _run_read_operations(suite, api_root, args.workday_tenant, bearer_token, + worker_id, search_term or worker_id) + + # Step 4: writes + _run_write_operations(suite, api_root, args.workday_tenant, bearer_token, + worker_id, args.include_write_tests) + + _print_summary(suite) + path = _write_json_output(suite, args.output_dir, me_response=me_resp, + include_pii=args.include_pii) + print(f" Results: {path}") + return 1 if suite.failed else 0 + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/solutions/ess-maker-skills/scripts/flightcheck/checks/firewall_export.py b/solutions/ess-maker-skills/scripts/flightcheck/checks/firewall_export.py new file mode 100644 index 0000000..ee00ef6 --- /dev/null +++ b/solutions/ess-maker-skills/scripts/flightcheck/checks/firewall_export.py @@ -0,0 +1,142 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. + +""" +Firewall requirements export — emits a markdown handoff doc for the +customer's network team. + +Pure file-render helper. Does NOT make any network calls or consume any +external API contracts, so the cardinal rule does not apply. The same +required-endpoints.json that ``checks/network.py`` reads is the +authoritative source. + +Ported from ess-preflight-validator commit 9ed2055 +(`PowerShell/Export-FirewallRequirements.ps1`). +""" + +from __future__ import annotations + +import json +import os +from datetime import datetime, timezone + +DEFAULT_CONFIG_PATH = os.path.join( + os.path.dirname(os.path.dirname(__file__)), + "config", + "required-endpoints.json", +) + + +def export_firewall_requirements( + config: dict, + out_path: str, + *, + catalog_path: str | None = None, + now: datetime | None = None, +) -> str: + """Render the firewall-requirements markdown doc and write it to ``out_path``. + + Parameters + ---------- + config: + The customer's ``.local/config.json`` (used only to surface + ``network.servicenow_instance`` if set, so the network team sees a + resolved hostname instead of ``{instance}``). + out_path: + Path of the file to write. + catalog_path: + Optional override for the ``required-endpoints.json`` location. + Defaults to the kit-shipped copy under + ``solutions/ess-maker-skills/scripts/flightcheck/config/``. + now: + Optional ``datetime`` for the document's "generated at" stamp. + Defaults to ``datetime.now(timezone.utc)``. Exposed so tests can + pin a deterministic timestamp for golden-file comparison. + + Returns + ------- + The absolute path of the written file. Existence of the parent + directory is the caller's responsibility (``cli.py`` creates it). + """ + path = catalog_path or DEFAULT_CONFIG_PATH + with open(path, "r", encoding="utf-8") as f: + catalog = json.load(f) + + servicenow_instance = (config.get("network") or {}).get("servicenow_instance") + stamp = (now or datetime.now(timezone.utc)).strftime("%Y-%m-%d %H:%M:%S UTC") + + lines: list[str] = [] + lines.append("# ESS Firewall Allow-List Requirements") + lines.append("") + lines.append(f"_Generated: {stamp}_") + lines.append("") + lines.append( + "This document lists the outbound network endpoints the Employee " + "Self-Service (ESS) Copilot Studio agent's connectors need to reach. " + "Hand it to your corporate IT / network team and ask them to allow " + "outbound HTTPS (TCP 443) to every host listed below." + ) + lines.append("") + lines.append("**Scope:** Vendor endpoints only (Workday, ServiceNow, SAP SuccessFactors).") + lines.append("Microsoft endpoints are documented authoritatively by Microsoft:") + for link in catalog.get("microsoftEndpointsReference", {}).get("links", []): + lines.append(f"- [{link.get('title', '')}]({link.get('url', '')})") + lines.append("") + lines.append("---") + lines.append("") + + for integration in catalog.get("integrations", []): + name = integration.get("name", "") + required = integration.get("required", False) + hosting = integration.get("hostingPattern", "") + ip_note = integration.get("ipRangeNote", "") + + lines.append(f"## {name}") + lines.append("") + lines.append(f"- **Required:** {'Yes' if required else 'Optional'}") + lines.append(f"- **Hosting pattern:** {hosting}") + if ip_note: + lines.append(f"- **IP range guidance:** {ip_note}") + lines.append("") + lines.append("| Host | Port | Purpose |") + lines.append("|---|---|---|") + for endpoint in integration.get("endpoints", []): + host = endpoint.get("host", "") + port = endpoint.get("port", 443) + purpose = endpoint.get("purpose", "") + display_host = host + if "{instance}" in host: + if servicenow_instance: + display_host = host.replace("{instance}", servicenow_instance) + else: + display_host = host + " _(set `network.servicenow_instance` in `.local/config.json` to resolve)_" + lines.append(f"| `{display_host}` | {port} | {purpose} |") + lines.append("") + + lines.append("---") + lines.append("") + lines.append("## Notes for the network team") + lines.append("") + lines.append( + "- All listed hosts must be reachable on **TCP port 443** outbound from " + "the Power Platform and Copilot Studio runtime infrastructure as well " + "as from the customer's deployment workstations." + ) + lines.append( + "- **TLS inspection (SSL bumping)** between Power Platform and these " + "vendor hosts can break the connectors. If TLS inspection is in place, " + "please exempt the listed hostnames or ensure the inspected certificate " + "chains validate cleanly." + ) + lines.append( + "- Workday and SAP SuccessFactors hostnames are **data-center based**, " + "not tenant-prefixed. Confirm with the customer's Workday / SAP account " + "team which data center their tenant is hosted in before pruning the " + "list to a subset." + ) + lines.append("") + + content = "\n".join(lines) + "\n" + with open(out_path, "w", encoding="utf-8") as f: + f.write(content) + return out_path diff --git a/solutions/ess-maker-skills/scripts/flightcheck/checks/network.py b/solutions/ess-maker-skills/scripts/flightcheck/checks/network.py new file mode 100644 index 0000000..d843f4b --- /dev/null +++ b/solutions/ess-maker-skills/scripts/flightcheck/checks/network.py @@ -0,0 +1,407 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. + +""" +ESS FlightCheck — Vendor Network Reachability (NET-001, NET-002, NET-003) + +"Shift-left" pre-deployment validator that probes outbound TCP + HTTPS +reachability to the vendor hostnames that the Employee Self-Service (ESS) +agent's connectors need at runtime. Catches missing firewall allow-list +entries, SSL inspection, and proxy interference WEEKS before the ESS +deployment cutover, when network-team change requests still have time to +land. + +Vendor scope only: Workday, ServiceNow, SAP SuccessFactors. Microsoft +endpoints (Power Platform, Entra ID, Dataverse, Copilot Studio) are +documented authoritatively by Microsoft — see +https://learn.microsoft.com/en-us/power-platform/admin/online-requirements +— and this check deliberately does NOT duplicate that allowlist. + +Cardinal-rule note (see scripts/flightcheck/AGENTS.md): this is a +transport-level diagnostic. It does NOT consume vendor API response +contracts, so the validated/validatable/documented tier system does not +apply. Instead the production code uses injectable `TcpProber` / +`HttpsProber` implementations so tests can substitute deterministic +fakes for the six relevant failure modes (refused, timeout, DNS failure, +TLS error, 4xx, 5xx). The tier registry in +`tests/fixtures/cassettes/INDEX.md` has a dedicated +"Vendor TCP/HTTPS reachability" row documenting this exception. + +Ported from ess-preflight-validator commit 9ed2055 +(`PowerShell/Test-NetworkConnectivity.ps1`). +""" + +from __future__ import annotations + +import json +import os +import socket +import ssl +from concurrent.futures import ThreadPoolExecutor +from dataclasses import dataclass +from typing import Callable, Iterable, Optional, Protocol + +import requests +from requests.exceptions import ( + ConnectionError as RequestsConnectionError, + SSLError as RequestsSSLError, + Timeout as RequestsTimeout, +) + +from ..runner import CheckResult, Priority, Status + +DOC_BASE = "https://learn.microsoft.com/en-us/copilot/microsoft-365/employee-self-service" + +DEFAULT_TIMEOUT_SECS = 5.0 +DEFAULT_MAX_WORKERS = 8 + +CONFIG_PATH = os.path.join( + os.path.dirname(os.path.dirname(__file__)), + "config", + "required-endpoints.json", +) + + +# --------------------------------------------------------------------------- +# Probe protocol — injectable so tests can substitute deterministic fakes. +# Production implementations live below; tests pass their own. +# --------------------------------------------------------------------------- + +class ProbeStatus: + """String constants for probe outcomes. Stable, used in tests.""" + + REACHABLE = "reachable" # TCP open, HTTPS 2xx/3xx/4xx (auth-required) + HTTP_5XX = "http_5xx" # TCP open, HTTPS 5xx (server-side problem) + TLS_ERROR = "tls_error" # TCP open, TLS handshake failed (likely SSL inspection) + REFUSED = "refused" # TCP connection actively refused (firewall block) + TIMEOUT = "timeout" # TCP / HTTPS exceeded timeout (silent drop) + DNS_FAILURE = "dns_failure" # Hostname did not resolve + SKIPPED = "skipped" # Placeholder host left unresolved (e.g. {instance}) + + +@dataclass +class ProbeResult: + host: str + port: int + status: str # ProbeStatus.* + detail: str = "" # Human-readable explanation + latency_ms: int = 0 # Set when probe completed + + +class TcpProber(Protocol): + def probe(self, host: str, port: int, timeout: float) -> ProbeResult: ... + + +class HttpsProber(Protocol): + def probe(self, host: str, port: int, timeout: float) -> ProbeResult: ... + + +class _StdlibTcpProber: + """Production TCP prober — uses ``socket.create_connection``.""" + + def probe(self, host: str, port: int, timeout: float) -> ProbeResult: + try: + with socket.create_connection((host, port), timeout=timeout): + return ProbeResult(host=host, port=port, status=ProbeStatus.REACHABLE, + detail=f"TCP {port} open") + except socket.gaierror as e: + return ProbeResult(host=host, port=port, status=ProbeStatus.DNS_FAILURE, + detail=f"DNS resolution failed: {e}") + except socket.timeout: + return ProbeResult(host=host, port=port, status=ProbeStatus.TIMEOUT, + detail=f"TCP {port} timed out after {timeout}s (firewall silent drop?)") + except (ConnectionRefusedError, OSError) as e: + # OSError covers "no route to host", "network unreachable", and refused. + # We treat them all as REFUSED for remediation purposes (firewall is + # blocking us); the detail line carries the underlying message. + return ProbeResult(host=host, port=port, status=ProbeStatus.REFUSED, + detail=f"TCP {port} refused/unreachable: {e}") + + +class _RequestsHttpsProber: + """Production HTTPS prober — uses ``requests`` HEAD against ``https://host:port/``. + + A HEAD request is enough to detect TLS interception and reach the application + layer. We accept ANY HTTP status code as "reachable" — even 401 / 403 / 404 — + because the goal is to confirm the connector can complete a TLS handshake + and exchange HTTP framing, not to authenticate or authorize. + """ + + def probe(self, host: str, port: int, timeout: float) -> ProbeResult: + url = f"https://{host}" if port == 443 else f"https://{host}:{port}" + try: + resp = requests.head(url, timeout=timeout, allow_redirects=False) + if resp.status_code >= 500: + return ProbeResult(host=host, port=port, status=ProbeStatus.HTTP_5XX, + detail=f"HTTPS {resp.status_code} server error") + return ProbeResult(host=host, port=port, status=ProbeStatus.REACHABLE, + detail=f"HTTPS {resp.status_code}") + except RequestsSSLError as e: + return ProbeResult(host=host, port=port, status=ProbeStatus.TLS_ERROR, + detail=f"TLS handshake failed (SSL inspection?): {e}") + except ssl.SSLError as e: + return ProbeResult(host=host, port=port, status=ProbeStatus.TLS_ERROR, + detail=f"TLS error: {e}") + except RequestsTimeout: + return ProbeResult(host=host, port=port, status=ProbeStatus.TIMEOUT, + detail=f"HTTPS timed out after {timeout}s") + except RequestsConnectionError as e: + return ProbeResult(host=host, port=port, status=ProbeStatus.REFUSED, + detail=f"HTTPS connection failed: {e}") + + +# --------------------------------------------------------------------------- +# Checkpoint mapping — one integration name -> one NET-* checkpoint id. +# Order is stable and matches the JSON config's `integrations` array. +# --------------------------------------------------------------------------- + +_CHECKPOINT_IDS: dict[str, str] = { + "Workday": "NET-001", + "ServiceNow": "NET-002", + "SAP SuccessFactors": "NET-003", +} + + +# --------------------------------------------------------------------------- +# Public API +# --------------------------------------------------------------------------- + +def run_network_checks( + runner, + *, + tcp_prober: Optional[TcpProber] = None, + https_prober: Optional[HttpsProber] = None, + config_path: Optional[str] = None, + timeout: float = DEFAULT_TIMEOUT_SECS, + max_workers: int = DEFAULT_MAX_WORKERS, +) -> list[CheckResult]: + """Run TCP + HTTPS reachability probes against vendor endpoints. + + Reads the endpoint catalog from ``required-endpoints.json``. Per-customer + selection comes from ``runner.config['network']``: + + - ``network.integrations``: list of integration names to probe. + Defaults to every integration in the JSON marked ``required: true``. + - ``network.servicenow_instance``: substitutes ``{instance}`` in + ServiceNow endpoint hostnames. Required if "ServiceNow" is selected + (otherwise the ServiceNow row is emitted as ``Skipped``). + + Workday and SuccessFactors endpoints are data-center-based, NOT + tenant-prefixed, so there is intentionally no ``workday_tenant`` or + ``successfactors_tenant`` knob. See the JSON's ``_hostingNote``. + + The ``tcp_prober`` and ``https_prober`` keyword arguments exist for tests. + Production callers pass nothing and get the stdlib / requests-backed + implementations. + """ + tcp = tcp_prober or _StdlibTcpProber() + https = https_prober or _RequestsHttpsProber() + path = config_path or CONFIG_PATH + + try: + with open(path, "r", encoding="utf-8") as f: + catalog = json.load(f) + except FileNotFoundError: + return [CheckResult( + checkpoint_id="NET-CONFIG", category="Network", + priority=Priority.HIGH.value, status=Status.ERROR.value, + description="Network endpoint catalog", + result=f"required-endpoints.json not found at {path}", + remediation="Restore required-endpoints.json from the kit's scripts/flightcheck/config/ directory.", + )] + + network_config = (runner.config or {}).get("network", {}) if hasattr(runner, "config") else {} + selected = network_config.get("integrations") + servicenow_instance = network_config.get("servicenow_instance") + + results: list[CheckResult] = [] + integrations = catalog.get("integrations", []) + + # Default to all required integrations if the user didn't specify. + if selected is None: + selected_names = {it["name"] for it in integrations if it.get("required")} + else: + selected_names = set(selected) + + for integration in integrations: + name = integration.get("name", "") + checkpoint_id = _CHECKPOINT_IDS.get(name) + if not checkpoint_id: + continue # Catalog has a new integration we don't have an ID for yet. + + if name not in selected_names: + # Skipped: customer didn't opt in. Don't emit noise — but DO emit + # one Skipped line so customers can confirm the check at least + # saw the integration. + results.append(CheckResult( + checkpoint_id=checkpoint_id, category="Network", + priority=Priority.MEDIUM.value, status=Status.SKIPPED.value, + description=f"{name} outbound reachability", + result=f"Skipped — not in network.integrations", + remediation=( + f"Add \"{name}\" to network.integrations in .local/config.json " + "to probe its endpoints." + ), + )) + continue + + results.append(_probe_integration( + integration=integration, + checkpoint_id=checkpoint_id, + tcp=tcp, + https=https, + timeout=timeout, + max_workers=max_workers, + servicenow_instance=servicenow_instance, + )) + + return results + + +def _probe_integration( + *, + integration: dict, + checkpoint_id: str, + tcp: TcpProber, + https: HttpsProber, + timeout: float, + max_workers: int, + servicenow_instance: Optional[str], +) -> CheckResult: + """Probe every endpoint for a single integration and aggregate to one CheckResult.""" + name = integration.get("name", "") + endpoints = integration.get("endpoints", []) + resolved, skipped_hosts = _resolve_hosts(endpoints, servicenow_instance) + + if not resolved and skipped_hosts: + # Every endpoint was a placeholder we couldn't resolve. Skip the whole + # integration with a remediation pointing at the missing config key. + return CheckResult( + checkpoint_id=checkpoint_id, category="Network", + priority=Priority.HIGH.value, status=Status.SKIPPED.value, + description=f"{name} outbound reachability", + result=f"Skipped — {len(skipped_hosts)} placeholder host(s) had no value", + remediation=( + f"Set network.servicenow_instance in .local/config.json to your ServiceNow " + "instance prefix (e.g. \"contoso\") so {instance} can be substituted." + ) if name == "ServiceNow" else "Configure the placeholder substitution for this integration.", + ) + + # Probe TCP + HTTPS concurrently across all resolved endpoints. + probe_results: list[tuple[dict, ProbeResult, ProbeResult]] = [] + with ThreadPoolExecutor(max_workers=max_workers) as pool: + futures = { + pool.submit(_probe_one, ep, tcp, https, timeout): ep + for ep in resolved + } + for future in futures: + ep = futures[future] + tcp_res, https_res = future.result() + probe_results.append((ep, tcp_res, https_res)) + + # Aggregate: one CheckResult per integration; per-host detail in the result field. + reachable_count = sum( + 1 for _, t, h in probe_results + if t.status == ProbeStatus.REACHABLE and h.status == ProbeStatus.REACHABLE + ) + warning_count = sum( + 1 for _, t, h in probe_results + if t.status == ProbeStatus.REACHABLE + and h.status in (ProbeStatus.HTTP_5XX, ProbeStatus.TLS_ERROR) + ) + failed_count = sum( + 1 for _, t, h in probe_results + if t.status in (ProbeStatus.REFUSED, ProbeStatus.TIMEOUT, ProbeStatus.DNS_FAILURE) + ) + total = len(probe_results) + + detail_lines = [] + for ep, t, h in probe_results: + host_port = f"{ep['host']}:{ep.get('port', 443)}" + if t.status == ProbeStatus.REACHABLE and h.status == ProbeStatus.REACHABLE: + detail_lines.append(f" [OK] {host_port} — {h.detail}") + elif t.status == ProbeStatus.REACHABLE: + detail_lines.append(f" [WARN] {host_port} — TCP open but HTTPS: {h.detail}") + else: + detail_lines.append(f" [FAIL] {host_port} — {t.detail}") + for placeholder in skipped_hosts: + detail_lines.append(f" [SKIP] {placeholder} — placeholder not resolved") + + summary = f"{reachable_count}/{total} reachable" + if warning_count: + summary += f", {warning_count} warning" + if failed_count: + summary += f", {failed_count} failed" + if skipped_hosts: + summary += f", {len(skipped_hosts)} skipped" + + result_text = summary + "\n" + "\n".join(detail_lines) + + if failed_count > 0: + status = Status.FAILED.value + remediation = ( + "Open the affected hostnames + port 443 on your outbound firewall. " + f"Use `python solutions/ess-maker-skills/scripts/flightcheck/cli.py " + f"--export-firewall-requirements` to generate a network-team handoff doc. " + f"Vendor IP ranges: {integration.get('ipRangeNote', 'see vendor documentation')}." + ) + elif warning_count > 0: + status = Status.WARNING.value + remediation = ( + "TCP reachable but HTTPS layer failed for at least one host. Common causes: " + "TLS inspection / SSL bumping by a corporate proxy, or vendor-side 5xx during the probe. " + "Retry; if persistent, ask your network team to confirm TLS interception is disabled for these hosts." + ) + else: + status = Status.PASSED.value + remediation = "" + + return CheckResult( + checkpoint_id=checkpoint_id, category="Network", + priority=Priority.HIGH.value if integration.get("required") else Priority.MEDIUM.value, + status=status, + description=f"{name} outbound reachability", + result=result_text, + remediation=remediation, + doc_link=DOC_BASE, + ) + + +def _probe_one( + endpoint: dict, + tcp: TcpProber, + https: HttpsProber, + timeout: float, +) -> tuple[ProbeResult, ProbeResult]: + """Probe a single endpoint: TCP first, then HTTPS (only if TCP succeeded).""" + host = endpoint["host"] + port = endpoint.get("port", 443) + tcp_res = tcp.probe(host, port, timeout) + if tcp_res.status != ProbeStatus.REACHABLE: + # Don't bother with HTTPS probe if TCP is blocked — short-circuit. + return tcp_res, ProbeResult(host=host, port=port, status=ProbeStatus.SKIPPED, + detail="HTTPS not probed (TCP failed)") + https_res = https.probe(host, port, timeout) + return tcp_res, https_res + + +def _resolve_hosts( + endpoints: list[dict], + servicenow_instance: Optional[str], +) -> tuple[list[dict], list[str]]: + """Resolve ``{instance}`` placeholders and partition into resolved vs skipped. + + Workday and SAP SuccessFactors endpoints are NOT tenant-prefixed (per the + JSON's ``_hostingNote``); only ServiceNow uses ``{instance}``. + """ + resolved: list[dict] = [] + skipped: list[str] = [] + for ep in endpoints: + host = ep.get("host", "") + if "{instance}" in host: + if not servicenow_instance: + skipped.append(host) + continue + ep = {**ep, "host": host.replace("{instance}", servicenow_instance)} + resolved.append(ep) + return resolved, skipped diff --git a/solutions/ess-maker-skills/scripts/flightcheck/checks/workday.py b/solutions/ess-maker-skills/scripts/flightcheck/checks/workday.py index 473aa4c..b064b2e 100644 --- a/solutions/ess-maker-skills/scripts/flightcheck/checks/workday.py +++ b/solutions/ess-maker-skills/scripts/flightcheck/checks/workday.py @@ -169,6 +169,9 @@ def run_workday_checks(runner) -> list[CheckResult]: # --- SOAP Workflow Tests (only if Workday MCP creds available) --- results.extend(_check_workflows(runner)) + # --- REST Endpoints (manual diagnostic surfaced as a checklist item) --- + results.extend(_check_rest_endpoints_manual(runner)) + return results @@ -831,3 +834,44 @@ def _soap_call( if password and password in msg: msg = msg.replace(password, '[REDACTED]') return {"success": False, "error": msg} + + +def _check_rest_endpoints_manual(runner) -> list[CheckResult]: + """Surface the Workday REST endpoint diagnostic as a manual checklist item. + + The 9 Workday REST connector actions (`/workers/me`, inbox, payslips, + search, direct reports, supervisory orgs, feedback templates, + TransferEmployee, RequestFeedback) cannot be validated automatically + by the FlightCheck runner: they require an OAuth 2.0 Authorization + Code flow against a customer-registered Workday API Client, which is + the same chicken-and-egg auth problem documented in + `tests/fixtures/cassettes/INDEX.md` "Workday WQL config-validation pattern". + + Instead the kit ships a standalone interactive diagnostic at + `solutions/ess-maker-skills/scripts/diagnostics/test_workday_rest_endpoints.py`. + This check emits a single NotConfigured result that tells the operator + to run that script. It is intentionally only added when `run_workday_checks` + has already confirmed Workday is configured for the agent (via the + `wd_flows` gate at the top of `run_workday_checks`) — customers without + a Workday integration get no noise. + """ + return [CheckResult( + checkpoint_id="WD-REST-MANUAL", + category="Workday", + priority=Priority.HIGH.value, + status=Status.NOT_CONFIGURED.value, + description="Workday REST endpoint pre-flight (manual)", + result=( + "Workday REST OAuth (Authorization Code) cannot be automated by " + "the FlightCheck runner; run the standalone diagnostic to validate " + "the 9 REST connector actions." + ), + remediation=( + "Register an OAuth 2.0 API Client in Workday (grant type " + "Authorization Code, redirect URI https://localhost:8888/callback), " + "then run: python solutions/ess-maker-skills/scripts/diagnostics/" + "test_workday_rest_endpoints.py. Attach the resulting JSON file " + "to your deployment ticket." + ), + doc_link=f"{DOC_BASE}/workday", + )] diff --git a/solutions/ess-maker-skills/scripts/flightcheck/cli.py b/solutions/ess-maker-skills/scripts/flightcheck/cli.py index ac12f78..aaaa6ed 100644 --- a/solutions/ess-maker-skills/scripts/flightcheck/cli.py +++ b/solutions/ess-maker-skills/scripts/flightcheck/cli.py @@ -15,9 +15,15 @@ environment — PP environment, Dataverse, DLP authentication — Entra ID, SSO, CA policies external — Integration discovery (flows) + network — Vendor TCP/HTTPS reachability (no Microsoft auth required) workday — Workday deep validation local — Local agent file validation publishing — Publishing/QA checklist + +Standalone modes: + --export-firewall-requirements Write a network-team handoff doc listing + the required outbound vendor endpoints. + No Microsoft auth required. """ import argparse @@ -38,6 +44,7 @@ from flightcheck.checks.environment import run_environment_checks from flightcheck.checks.authentication import run_authentication_checks from flightcheck.checks.external_systems import run_external_systems_checks +from flightcheck.checks.network import run_network_checks from flightcheck.checks.workday import run_workday_checks from flightcheck.checks.local_files import run_local_file_checks from flightcheck.checks.publishing import run_publishing_checks @@ -48,6 +55,7 @@ "environment": [("Environment", run_environment_checks)], "authentication": [("Authentication", run_authentication_checks)], "external": [("External Systems", run_external_systems_checks)], + "network": [("Network", run_network_checks)], "workday": [ ("External Systems", run_external_systems_checks), ("Workday", run_workday_checks), @@ -61,11 +69,48 @@ ("Environment", run_environment_checks), ("Authentication", run_authentication_checks), ("External Systems", run_external_systems_checks), + ("Network", run_network_checks), ("Workday", run_workday_checks), ("Local Files", run_local_file_checks), ("Publishing", run_publishing_checks), ] +# Scopes that exercise vendor transport-only or local-file-only checks and +# therefore do NOT need Dataverse / Graph / Power Platform Admin authentication. +# Keep this set explicit so a future scope addition doesn't silently regress the +# no-auth path. The PVA gating below uses its own (narrower) allowlist because +# PVA is already lazy-auth. +_NO_MS_AUTH_SCOPES = frozenset({"network"}) + + +def _requires_microsoft_auth(scope: str) -> bool: + """Return True if the scope needs Dataverse / Graph / PP Admin auth. + + Pure function so tests can pin the scope-to-auth mapping without spinning + up MSAL or any HTTP client. Any new scope added to SCOPE_MAP should also + be considered here — either by adding it to ``_NO_MS_AUTH_SCOPES`` (if + it's a transport-only or local-only check) or by leaving it as the default + (auth-required). + """ + return scope not in _NO_MS_AUTH_SCOPES + + +def _run_export_firewall_requirements(config: dict, output_dir: str) -> int: + """Standalone mode: emit the firewall-requirements markdown handoff doc. + + Runs before any auth path so customers can hand the file to their network + team without first authenticating to Microsoft. Returns the process exit + code (0 on success, non-zero on render failure). + """ + from flightcheck.checks.firewall_export import export_firewall_requirements + + os.makedirs(output_dir, exist_ok=True) + out_path = os.path.join(output_dir, "firewall-requirements.md") + written = export_firewall_requirements(config, out_path) + print(f"Firewall requirements written to: {written}") + return 0 + + def main(): parser = argparse.ArgumentParser(description="ESS FlightCheck — Pre-deployment Validator") @@ -78,6 +123,14 @@ def main(): "--output", default="workspace/flightcheck", help="Output directory (default: workspace/flightcheck)", ) + parser.add_argument( + "--export-firewall-requirements", action="store_true", + help=( + "Standalone mode: render the firewall-requirements markdown doc " + "from required-endpoints.json and exit. Does NOT authenticate " + "to Microsoft." + ), + ) args = parser.parse_args() # Load config @@ -89,8 +142,15 @@ def main(): with open(config_path, "r", encoding="utf-8") as f: config = json.load(f) + # --- Standalone mode: firewall-requirements export --- + # Runs BEFORE the env_url check + any auth so customers can use it on a + # fresh checkout with only the config keys their network team needs. + if args.export_firewall_requirements: + sys.exit(_run_export_firewall_requirements(config, args.output)) + env_url = config.get("dataverseEndpoint", "") - if not env_url: + needs_ms_auth = _requires_microsoft_auth(args.scope) + if needs_ms_auth and not env_url: print("ERROR: No dataverseEndpoint in .local/config.json.") sys.exit(1) @@ -109,51 +169,61 @@ def main(): print("=" * 64) if len(agents) == 1: print(f" Agent: {agents[0].get('name', 'N/A')}") - else: + elif agents: print(f" Agents: {len(agents)} discovered") for a in agents: marker = "→" if a.get("slug") == active else " " print(f" {marker} {a.get('name', 'Unknown')}") - print(f" Environment: {env_url}") + if env_url: + print(f" Environment: {env_url}") print(f" Scope: {args.scope}") print("=" * 64) print() - # --- Authenticate --- - from auth import authenticate, discover_tenant - - print("Authenticating to Dataverse...") - dv_token = authenticate(env_url) - - tenant_id = discover_tenant(env_url) - print(f"Tenant: {tenant_id}") + # --- Authenticate (scope-gated) --- + dv_token = None + tenant_id = None + env_id = None + graph = None + pp_admin = None + + if needs_ms_auth: + from auth import authenticate, discover_tenant + + print("Authenticating to Dataverse...") + dv_token = authenticate(env_url) + + tenant_id = discover_tenant(env_url) + print(f"Tenant: {tenant_id}") + + # Derive PP environment ID + print("Deriving Power Platform environment ID...") + env_id = derive_environment_id(env_url, dv_token) + if env_id: + print(f"Environment ID: {env_id}") + else: + print("WARNING: Could not derive environment ID. Some checks may be limited.") + + # Initialize clients + print("Authenticating to Microsoft Graph...") + graph = GraphClient(tenant_id) + try: + graph.authenticate() + print(" Graph: OK") + except Exception as e: + print(f" Graph: WARNING — {e}") + print(" (Some checks will be skipped)") - # Derive PP environment ID - print("Deriving Power Platform environment ID...") - env_id = derive_environment_id(env_url, dv_token) - if env_id: - print(f"Environment ID: {env_id}") + print("Authenticating to Power Platform Admin API...") + pp_admin = PPAdminClient(tenant_id) + try: + pp_admin.authenticate() + print(" Power Platform: OK") + except Exception as e: + print(f" Power Platform: WARNING — {e}") + print(" (Some checks will be skipped)") else: - print("WARNING: Could not derive environment ID. Some checks may be limited.") - - # Initialize clients - print("Authenticating to Microsoft Graph...") - graph = GraphClient(tenant_id) - try: - graph.authenticate() - print(" Graph: OK") - except Exception as e: - print(f" Graph: WARNING — {e}") - print(" (Some checks will be skipped)") - - print("Authenticating to Power Platform Admin API...") - pp_admin = PPAdminClient(tenant_id) - try: - pp_admin.authenticate() - print(" Power Platform: OK") - except Exception as e: - print(f" Power Platform: WARNING — {e}") - print(" (Some checks will be skipped)") + print(f"Skipping Microsoft auth (not required for --scope {args.scope}).") # Gate PVA (Copilot Studio Island Gateway) auth on scope. # Only CONFIG-013 needs PVA today, and it lives in run_local_file_checks. @@ -174,7 +244,7 @@ def main(): print(f" Copilot Studio: WARNING — {e}") print(" (Knowledge source status check will use local-only validation)") pva = None - else: + elif needs_ms_auth: print("Skipping Copilot Studio auth (not required for this scope).") # --- Build runner --- diff --git a/solutions/ess-maker-skills/scripts/flightcheck/config/required-endpoints.json b/solutions/ess-maker-skills/scripts/flightcheck/config/required-endpoints.json new file mode 100644 index 0000000..ac76e41 --- /dev/null +++ b/solutions/ess-maker-skills/scripts/flightcheck/config/required-endpoints.json @@ -0,0 +1,51 @@ +{ + "_comment": "ESS required outbound network endpoints by integration. Used by Test-NetworkConnectivity.ps1 (BL-014).", + "_scope": "VENDOR ENDPOINTS ONLY. Microsoft endpoints are documented authoritatively by Microsoft - see https://learn.microsoft.com/en-us/power-platform/admin/online-requirements. This config covers Workday, ServiceNow, SAP SuccessFactors only.", + "_hostingNote": "Workday and SuccessFactors hostnames are data-center-based, not tenant-based. The customer tenant lives in the URL path or as a request parameter, NOT in the hostname. Therefore firewall reachability is the same for every customer using a given data center. Only ServiceNow uses an instance-prefixed hostname pattern.", + "_schemaVersion": "2.1", + "_lastUpdated": "2026-05-08", + "integrations": [ + { + "name": "Workday", + "required": true, + "hostingPattern": "Data center based (NOT tenant-prefixed). Customer tenant is in URL path.", + "endpoints": [ + { "host": "impl.workday.com", "port": 443, "purpose": "Implementation tenants UI host", "tenantPlaceholder": false }, + { "host": "wd2-impl-services1.workday.com", "port": 443, "purpose": "Implementation services (DC2)", "tenantPlaceholder": false }, + { "host": "wd5-impl-services1.workday.com", "port": 443, "purpose": "Implementation services (DC5)", "tenantPlaceholder": false }, + { "host": "wd5.myworkday.com", "port": 443, "purpose": "Production tenants UI host (DC5)", "tenantPlaceholder": false } + ], + "ipRangeNote": "Workday publishes IP ranges per data center at https://community.workday.com (login required). Customers should request the current list from their Workday account team. Customers should also confirm WHICH data centers their tenants are hosted in." + }, + { + "name": "ServiceNow", + "required": true, + "hostingPattern": "Instance-prefixed hostname (instance.service-now.com).", + "endpoints": [ + { "host": "{instance}.service-now.com", "port": 443, "purpose": "ServiceNow instance API", "tenantPlaceholder": true }, + { "host": "{instance}.servicenowservices.com", "port": 443, "purpose": "ServiceNow services endpoint", "tenantPlaceholder": true } + ], + "ipRangeNote": "ServiceNow IP ranges published at https://docs.servicenow.com under Platform Security > IP Address Access Control." + }, + { + "name": "SAP SuccessFactors", + "required": false, + "hostingPattern": "Data center based (NOT tenant-prefixed). Customer tenant is in URL path or request parameter.", + "endpoints": [ + { "host": "api.successfactors.com", "port": 443, "purpose": "SuccessFactors API (global)", "tenantPlaceholder": false }, + { "host": "api4.successfactors.com", "port": 443, "purpose": "SuccessFactors API (DC4)", "tenantPlaceholder": false }, + { "host": "api8.successfactors.com", "port": 443, "purpose": "SuccessFactors API (DC8)", "tenantPlaceholder": false } + ], + "ipRangeNote": "SAP publishes datacenter IP ranges at https://help.sap.com under SuccessFactors Platform > Network Configuration. Customers should confirm which datacenter (DC4 vs DC8 vs others) their tenant is hosted in." + } + ], + "microsoftEndpointsReference": { + "note": "We deliberately do NOT probe Microsoft endpoints. Customers should follow the official Microsoft allowlist documentation, which covers wildcard domains, Azure service tag IP ranges, and the consolidated domain initiative.", + "links": [ + { "title": "Power Platform URLs and IP address ranges", "url": "https://learn.microsoft.com/en-us/power-platform/admin/online-requirements" }, + { "title": "Microsoft 365 URLs and IP address ranges", "url": "https://learn.microsoft.com/en-us/microsoft-365/enterprise/urls-and-ip-address-ranges" }, + { "title": "Azure IP Ranges and Service Tags - Public Cloud", "url": "https://www.microsoft.com/download/details.aspx?id=56519" }, + { "title": "Power Platform connectors outbound IP addresses", "url": "https://learn.microsoft.com/en-us/connectors/common/outbound-ip-addresses" } + ] + } +} \ No newline at end of file diff --git a/solutions/ess-maker-skills/src/reference/workday-sso-test-flow/README.md b/solutions/ess-maker-skills/src/reference/workday-sso-test-flow/README.md new file mode 100644 index 0000000..4107a82 --- /dev/null +++ b/solutions/ess-maker-skills/src/reference/workday-sso-test-flow/README.md @@ -0,0 +1,75 @@ +# Workday SSO Test Flow Template + +A Power Automate flow template that helps customers validate Entra SSO +permissions on the Workday `OAuthUser` connection before deploying the +Employee Self-Service (ESS) agent. The flow performs a lightweight +`Get_Workers_Request` SOAP call as the test user and confirms that the +user has the correct Workday security domain access for ESS. + +This template is **reference content** — it is not deployed automatically +by any kit script. Customers import it manually in Power Automate. + +## When to use this + +Run this flow once after configuring the Workday connectors and SSO, and +once again for every new test user before granting them access to ESS. + +If the flow fails, the response payload identifies which security +domain is missing or misconfigured. Common failures: + +- The user is not in the Workday security group that ESS requires. +- The `OAuthUser` connection is not using Entra ID Integrated auth (the + flow will fail with a `403` or empty `Worker` element). +- The `Get_Workers` operation is not exposed to the user's domain + security policy. + +## Files + +| File | Purpose | +|---|---| +| [`sso-test-flow-template.json`](./sso-test-flow-template.json) | Power Automate flow definition + setup instructions + permissions-tested matrix. | + +The JSON's `flow_setup_instructions`, `manual_creation_guide`, +`soap_request_readable`, and `permissions_tested` blocks are +self-contained — open the file in any editor for the full reference. + +## Quick setup + +1. Open [Power Automate](https://make.powerautomate.com) and select + your ESS environment. +2. Create a new **Instant cloud flow** with an HTTP Request trigger. + The trigger schema is in the JSON under + `manual_creation_guide.trigger.schema`. +3. Add a **Workday SOAP — Execute SOAP operation (Preview)** action and + point it at your `OAuthUser` connection (Entra ID Integrated auth). +4. Paste the SOAP body from the JSON's `soap_request_readable.xml` + block, replacing the `@{triggerBody()?[...]}` expressions with the + trigger inputs. +5. Add a **Response** action that returns the SOAP result. +6. Save the flow and copy the HTTP trigger URL. +7. Run the flow with a real test user's UPN. A `200` with a populated + `Worker` element confirms SSO + permissions are wired correctly. + +## Security domains tested + +The `Get_Workers_Request` shape in the template exercises the following +Workday security domains. The full list lives in the JSON under +`permissions_tested.domains`: + +- **Worker Data: Worker ID** — minimum auth gate. +- **Worker Data: Personal Information (Self)** — name, contact info. +- **Worker Data: National Identifiers / Government IDs** — + identity-related fields. +- **Worker Data: Employment Information** — hire date, position. +- **Worker Data: Current Staffing Information** — cost center, company. +- **Person Data: Emergency Contacts**. +- **Worker Data: Qualifications / Skills and Experience**. + +## Related + +- [ESS Workday integration setup](../../reference/ess-docs/integrations/workday.md) +- [ESS Workday extensibility patterns](../../reference/ess-docs/integrations/workday-extensibility.md) +- Workday REST endpoints diagnostic: + `solutions/ess-maker-skills/scripts/diagnostics/test_workday_rest_endpoints.py` + (covers the REST-side connector validation; this template covers the + SOAP-side SSO validation). diff --git a/solutions/ess-maker-skills/src/reference/workday-sso-test-flow/sso-test-flow-template.json b/solutions/ess-maker-skills/src/reference/workday-sso-test-flow/sso-test-flow-template.json new file mode 100644 index 0000000..bfc3f12 --- /dev/null +++ b/solutions/ess-maker-skills/src/reference/workday-sso-test-flow/sso-test-flow-template.json @@ -0,0 +1,112 @@ +{ + "$schema": "https://schema.management.azure.com/providers/Microsoft.Logic/schemas/2016-06-01/workflowdefinition.json#", + "metadata": { + "name": "ESS SSO Permission Test", + "description": "Pre-flight validator flow that tests Workday SSO permissions via the OAuthUser connection. Executes a lightweight Get_Workers_Request to prove the OAuth user has correct Workday security domain access.", + "version": "1.0.0", + "author": "ESS Pre-flight Validator", + "usage": "Import this flow into Power Automate, map the Workday SOAP connection to your OAuthUser connection, then run the ESS Validator SSO test." + }, + "flow_setup_instructions": { + "step_1": "Go to https://make.powerautomate.com and select your ESS environment", + "step_2": "Click 'My Flows' > 'Import' > 'Import Package (Legacy)'", + "step_3": "Select this template or create a new Instant Cloud Flow manually", + "step_4": "Add an HTTP Request trigger (see trigger_schema below)", + "step_5": "Add a 'Workday SOAP - Execute SOAP operation' action", + "step_6": "Set the connection to your OAuthUser connection (Entra ID Integrated)", + "step_7": "Configure the action parameters as shown in action_config below", + "step_8": "Add a Response action returning the SOAP result", + "step_9": "Save the flow - the HTTP trigger URL will be generated automatically" + }, + "manual_creation_guide": { + "flow_name": "ESS SSO Permission Test", + "flow_type": "Instant cloud flow (HTTP Request trigger)", + "trigger": { + "type": "Request", + "kind": "Http", + "method": "POST", + "schema": { + "type": "object", + "properties": { + "testUserUPN": { + "type": "string", + "description": "UPN of the user to test (e.g., user@contoso.com)" + }, + "workdayTenant": { + "type": "string", + "description": "Workday tenant name (e.g., contoso_impl)" + } + }, + "required": ["testUserUPN", "workdayTenant"] + } + }, + "actions": [ + { + "order": 1, + "name": "Execute_SOAP_Operation", + "connector": "Workday SOAP", + "action": "Execute SOAP operation (Preview)", + "operation_id": "SOAP_Operation", + "connection": "Use your OAuthUser connection (Entra ID Integrated auth type)", + "parameters": { + "service": "Human_Resources", + "version": "v42.0", + "requestBody": "See soap_request_template below - paste the XML with @{triggerBody()?['testUserUPN']} for the UPN" + } + }, + { + "order": 2, + "name": "Response", + "type": "Response", + "parameters": { + "statusCode": 200, + "headers": { + "Content-Type": "application/json" + }, + "body": { + "status": "completed", + "soapResponse": "@{body('Execute_SOAP_Operation')}" + } + } + } + ] + }, + "soap_request_template": "@{triggerBody()?['testUserUPN']}@{utcNow('yyyy-MM-dd')}truetruetruetruetrue", + "soap_request_readable": { + "_comment": "This is the human-readable version of the SOAP request above. Copy the XML below into the 'SOAP request body' field in Power Automate, replacing the @{...} expressions.", + "xml": [ + "", + " ", + " ", + " @{triggerBody()?['testUserUPN']}", + " ", + " ", + " ", + " @{utcNow('yyyy-MM-dd')}", + " ", + " ", + " true", + " true", + " true", + " true", + " true", + " ", + "" + ] + }, + "permissions_tested": { + "_comment": "The Get_Workers_Request with these response groups tests the following security domains for the OAuth user", + "domains": [ + { "response_group": "Include_Reference", "domain": "Worker Data: Worker ID", "test": "Employee_ID present in response" }, + { "response_group": "Include_Personal_Information", "domain": "Worker Data: Personal Information (Self)", "test": "First_Name / Last_Name present" }, + { "response_group": "Include_Personal_Information", "domain": "Person Data: Emergency Contacts", "test": "Emergency_Contact node present" }, + { "response_group": "Include_Personal_Information", "domain": "Worker Data: National Identifiers", "test": "National_ID node present" }, + { "response_group": "Include_Personal_Information", "domain": "Worker Data: Government IDs", "test": "Passport_ID / Visa_ID nodes" }, + { "response_group": "Include_Employment_Information", "domain": "Worker Data: Employment Information", "test": "Hire_Date / Employment_Data present" }, + { "response_group": "Include_Employment_Information", "domain": "Worker Data: Current Staffing Information", "test": "Position_ID / Organization_Data present" }, + { "response_group": "Include_Organizations", "domain": "Worker Data: Current Staffing Information", "test": "Organization_Data with Cost Center / Company" }, + { "response_group": "Include_Qualifications", "domain": "Worker Data: Qualifications", "test": "Certification nodes present" }, + { "response_group": "Include_Qualifications", "domain": "Worker Data: Skills and Experience", "test": "Language nodes present" } + ] + } +} diff --git a/tests/fixtures/cassettes/INDEX.md b/tests/fixtures/cassettes/INDEX.md index 7a9cd32..a316fb9 100644 --- a/tests/fixtures/cassettes/INDEX.md +++ b/tests/fixtures/cassettes/INDEX.md @@ -50,8 +50,9 @@ the PR. | **PowerApps Admin API** (`/Microsoft.PowerApps/...`, `/Microsoft.ProcessSimple/.../v2/flows`) | `validated` | Cassette at `flightcheck_pp_admin.yaml`. | The 404-on-Dataverse-only-env behavior is undocumented and discovered empirically; cassette is the only ground truth. | | **PVA Island Gateway** (`/api/botmanagement/v1/...`) | `validated` | Cassette at `island_gateway_botcomponents.yaml`. | Internal Copilot Studio API; not publicly documented. | | **Workday SOAP** (Human_Resources, Identity_Management, Compensation, Absence_Management, etc.) | `validated` | Cassettes at `flightcheck_workday.yaml`, `workday_config.yaml`. | Vendor docs require Workday Community login; tenant-specific WSDL varies. | -| **Workday WQL / REST** (`/ccx/api/wql/v1/...`, `/ccx/api/v1/...`) | `validated` | Cassette at `workday_wql_admin.yaml`. **Known auth blocker** — see "Workday WQL config-validation pattern" section below before authoring any runtime check on this cassette. | Per-tenant API client registration creates the chicken-and-egg blocker. | +| **Workday WQL / REST** (`/ccx/api/wql/v1/...`, `/ccx/api/v1/...`) | `validated` | Cassette at `workday_wql_admin.yaml`. **Known auth blocker** — see "Workday WQL config-validation pattern" section below before authoring any runtime check on this cassette. The 9 Workday REST connector actions (`/workers/me`, inbox, payslips, search, direct reports, supervisory orgs, feedback templates, TransferEmployee, RequestFeedback) intentionally do NOT have a runtime FlightCheck check — they are validated by the standalone `solutions/ess-maker-skills/scripts/diagnostics/test_workday_rest_endpoints.py` customer-run diagnostic. The runner surfaces this via `WD-REST-MANUAL` (`NotConfigured`) so the diagnostic appears on the deployment checklist without breaking the cardinal rule. | Per-tenant API client registration creates the chicken-and-egg blocker. | | **ServiceNow Table API** | `validated` | Cassette at `flightcheck_servicenow.yaml`. | Per-instance custom field variance + dev portal access required for live testing makes the documented tier insufficient. | +| **Vendor TCP/HTTPS reachability** (Workday/ServiceNow/SAP SuccessFactors outbound hostnames) | `n/a — transport diagnostic` | Transport-level probe (`socket.create_connection` + HTTPS `HEAD`). No vendor API response contract consumed, so the cardinal rule does not apply. Tests substitute fake `TcpProber` / `HttpsProber` implementations covering reachable / refused / timeout / DNS-failure / TLS-error / 4xx / 5xx branches. No cassette required. | Used by `solutions/ess-maker-skills/scripts/flightcheck/checks/network.py` (NET-001 / NET-002 / NET-003). | If you need to call an API that isn't in this registry, STOP and tell the user — the tier must be decided (and recorded here) before any diff --git a/tests/flightcheck/checks/test_firewall_export.py b/tests/flightcheck/checks/test_firewall_export.py new file mode 100644 index 0000000..e1ab74f --- /dev/null +++ b/tests/flightcheck/checks/test_firewall_export.py @@ -0,0 +1,126 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. + +"""Tests for the firewall-requirements markdown renderer. + +Pure file-render helper — no external API calls, no probes. Tests verify +the markdown structure against a small deterministic fixture catalog so +the rendered output stays in sync with what the network team expects to +hand-off to corporate IT. +""" + +from __future__ import annotations + +import json +from datetime import datetime, timezone +from pathlib import Path + +import pytest + +from flightcheck.checks.firewall_export import export_firewall_requirements + + +_FIXTURE_CATALOG = { + "integrations": [ + { + "name": "Workday", + "required": True, + "hostingPattern": "Data center based (not tenant-prefixed)", + "ipRangeNote": "Workday IP ranges per data center at https://community.workday.com (login required).", + "endpoints": [ + {"host": "wd2-impl-services1.workday.com", "port": 443, + "purpose": "Implementation services (DC2)"}, + {"host": "wd5.myworkday.com", "port": 443, + "purpose": "Production services (DC5)"}, + ], + }, + { + "name": "ServiceNow", + "required": True, + "hostingPattern": "Instance-prefixed hostname", + "ipRangeNote": "ServiceNow IP ranges at https://docs.servicenow.com.", + "endpoints": [ + {"host": "{instance}.service-now.com", "port": 443, + "purpose": "Instance API"}, + ], + }, + ], + "microsoftEndpointsReference": { + "links": [ + {"title": "Power Platform URLs and IP address ranges", + "url": "https://learn.microsoft.com/en-us/power-platform/admin/online-requirements"}, + ], + }, +} + +_FIXED_NOW = datetime(2026, 5, 19, 12, 0, 0, tzinfo=timezone.utc) + + +@pytest.fixture +def catalog_path(tmp_path: Path) -> Path: + p = tmp_path / "required-endpoints.json" + p.write_text(json.dumps(_FIXTURE_CATALOG), encoding="utf-8") + return p + + +def _render(catalog_path: Path, tmp_path: Path, config: dict | None = None) -> str: + out = tmp_path / "out.md" + export_firewall_requirements( + config or {}, str(out), + catalog_path=str(catalog_path), + now=_FIXED_NOW, + ) + return out.read_text(encoding="utf-8") + + +class TestRender: + def test_includes_title_and_timestamp(self, catalog_path: Path, tmp_path: Path) -> None: + text = _render(catalog_path, tmp_path) + assert text.startswith("# ESS Firewall Allow-List Requirements") + assert "2026-05-19 12:00:00 UTC" in text + + def test_lists_every_integration_with_required_flag( + self, catalog_path: Path, tmp_path: Path + ) -> None: + text = _render(catalog_path, tmp_path) + assert "## Workday" in text + assert "## ServiceNow" in text + # Workday is required, ServiceNow is required → both show "Required: Yes" + assert text.count("**Required:** Yes") == 2 + + def test_includes_all_endpoint_hosts(self, catalog_path: Path, tmp_path: Path) -> None: + text = _render(catalog_path, tmp_path) + assert "wd2-impl-services1.workday.com" in text + assert "wd5.myworkday.com" in text + assert "{instance}.service-now.com" in text # left unresolved when no instance configured + + def test_servicenow_instance_substituted_when_configured( + self, catalog_path: Path, tmp_path: Path + ) -> None: + text = _render(catalog_path, tmp_path, + config={"network": {"servicenow_instance": "contoso"}}) + assert "contoso.service-now.com" in text + # We replace the host outright when configured — no leftover placeholder. + assert "{instance}.service-now.com" not in text + + def test_microsoft_endpoints_referenced_not_listed( + self, catalog_path: Path, tmp_path: Path + ) -> None: + text = _render(catalog_path, tmp_path) + # Reference link present + assert "https://learn.microsoft.com/en-us/power-platform/admin/online-requirements" in text + # Scope statement is clear that we don't enumerate Microsoft hosts + assert "Vendor endpoints only" in text + + def test_includes_tls_inspection_note(self, catalog_path: Path, tmp_path: Path) -> None: + text = _render(catalog_path, tmp_path) + assert "TLS inspection" in text + + def test_writes_to_specified_path(self, catalog_path: Path, tmp_path: Path) -> None: + out = tmp_path / "subdir" / "out.md" + out.parent.mkdir() # caller is responsible per docstring + returned = export_firewall_requirements( + {}, str(out), catalog_path=str(catalog_path), now=_FIXED_NOW, + ) + assert returned == str(out) + assert out.exists() diff --git a/tests/flightcheck/checks/test_network.py b/tests/flightcheck/checks/test_network.py new file mode 100644 index 0000000..d032589 --- /dev/null +++ b/tests/flightcheck/checks/test_network.py @@ -0,0 +1,381 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. + +"""End-to-end integration tests for the vendor network reachability +FlightCheck checks (NET-001 / NET-002 / NET-003). + +Cardinal-rule note: this is the deliberate exception to the +"validated/validatable/documented mock required" rule documented in +``tests/AGENTS.md`` and the "API tier registry" of +``tests/fixtures/cassettes/INDEX.md`` (see the "Vendor TCP/HTTPS +reachability" row). The check is a transport-level diagnostic — it does +NOT consume vendor API response contracts — so the tier system does not +apply. Instead, ``run_network_checks`` accepts injectable ``TcpProber`` +and ``HttpsProber`` arguments, and these tests substitute deterministic +fake implementations for the six relevant failure modes (refused, +timeout, DNS failure, TLS error, 4xx-style, 5xx). + +There is intentionally no ``require_validated_mock`` here and no +``responses`` / ``respx`` involvement. +""" + +from __future__ import annotations + +import json +from dataclasses import dataclass +from pathlib import Path +from typing import Any + +import pytest + +from flightcheck.checks.network import ( + ProbeResult, + ProbeStatus, + run_network_checks, +) + + +# ─────────────────────────────────────────────────────────────────────── +# Fakes +# ─────────────────────────────────────────────────────────────────────── + + +@dataclass +class _MinimalRunner: + config: dict[str, Any] + + +class _ScriptedProber: + """Returns a pre-programmed ``ProbeResult`` per ``host:port`` key. + + Tests construct one of these with a dict mapping ``"host:port"`` to a + ``ProbeResult`` and pass it as ``tcp_prober`` / ``https_prober``. + Unknown hosts default to ``REACHABLE`` so test setups stay terse — + individual tests override only the hosts they care about. + """ + + def __init__(self, scripted: dict[str, ProbeResult] | None = None, + default_status: str = ProbeStatus.REACHABLE): + self.scripted = scripted or {} + self.default_status = default_status + self.calls: list[tuple[str, int, float]] = [] + + def probe(self, host: str, port: int, timeout: float) -> ProbeResult: + self.calls.append((host, port, timeout)) + key = f"{host}:{port}" + if key in self.scripted: + return self.scripted[key] + return ProbeResult(host=host, port=port, status=self.default_status, + detail=f"default {self.default_status}") + + +# ─────────────────────────────────────────────────────────────────────── +# Fixture catalog — minimal, deterministic. Avoids using the real +# required-endpoints.json so the tests don't break if vendor endpoints +# are added later. +# ─────────────────────────────────────────────────────────────────────── + +_FIXTURE_CATALOG = { + "integrations": [ + { + "name": "Workday", + "required": True, + "hostingPattern": "Data center based", + "ipRangeNote": "Workday IP ranges per data center at https://community.workday.com", + "endpoints": [ + {"host": "wd2-impl-services1.workday.com", "port": 443, "purpose": "Impl services"}, + {"host": "wd5.myworkday.com", "port": 443, "purpose": "Prod services"}, + ], + }, + { + "name": "ServiceNow", + "required": True, + "hostingPattern": "Instance-prefixed hostname", + "ipRangeNote": "ServiceNow IP ranges at https://docs.servicenow.com", + "endpoints": [ + {"host": "{instance}.service-now.com", "port": 443, "purpose": "Instance API"}, + ], + }, + { + "name": "SAP SuccessFactors", + "required": False, + "hostingPattern": "Data center based", + "ipRangeNote": "SAP DC IP ranges at https://help.sap.com", + "endpoints": [ + {"host": "api.successfactors.com", "port": 443, "purpose": "SF API"}, + ], + }, + ], +} + + +@pytest.fixture +def catalog_path(tmp_path: Path) -> Path: + path = tmp_path / "required-endpoints.json" + path.write_text(json.dumps(_FIXTURE_CATALOG), encoding="utf-8") + return path + + +def _runner(network_config: dict | None = None) -> _MinimalRunner: + return _MinimalRunner(config={"network": network_config or {}}) + + +def _by_id(results, checkpoint_id): + matches = [r for r in results if r.checkpoint_id == checkpoint_id] + if len(matches) != 1: + ids = [r.checkpoint_id for r in results] + raise AssertionError(f"Expected exactly one {checkpoint_id} in {ids}") + return matches[0] + + +# ─────────────────────────────────────────────────────────────────────── +# Happy path +# ─────────────────────────────────────────────────────────────────────── + + +class TestAllReachable: + def test_default_selects_required_only(self, catalog_path: Path) -> None: + """No ``network.integrations`` in config → required integrations are + probed, optional ones are Skipped. Matches the source PS behavior.""" + runner = _runner() # no integrations key + tcp = _ScriptedProber() + https = _ScriptedProber() + + results = run_network_checks( + runner, tcp_prober=tcp, https_prober=https, + config_path=str(catalog_path), + ) + + wd = _by_id(results, "NET-001") + assert wd.status == "Passed" + assert "2/2 reachable" in wd.result + + sn = _by_id(results, "NET-002") + # No servicenow_instance configured -> all hosts are placeholders -> Skipped + assert sn.status == "Skipped" + + sap = _by_id(results, "NET-003") + # Optional and not in selected_names default (required only) -> Skipped + assert sap.status == "Skipped" + assert "not in network.integrations" in sap.result + + def test_explicit_integrations_list_probes_each(self, catalog_path: Path) -> None: + runner = _runner({ + "integrations": ["Workday", "ServiceNow", "SAP SuccessFactors"], + "servicenow_instance": "contoso", + }) + tcp = _ScriptedProber() + https = _ScriptedProber() + + results = run_network_checks( + runner, tcp_prober=tcp, https_prober=https, + config_path=str(catalog_path), + ) + + assert _by_id(results, "NET-001").status == "Passed" + assert _by_id(results, "NET-002").status == "Passed" + assert _by_id(results, "NET-003").status == "Passed" + + # Confirm placeholder substitution happened. + sn_hosts = [call[0] for call in tcp.calls] + assert "contoso.service-now.com" in sn_hosts + assert "{instance}.service-now.com" not in sn_hosts + + +# ─────────────────────────────────────────────────────────────────────── +# Failure modes — exactly the six the docstring promises to cover +# ─────────────────────────────────────────────────────────────────────── + + +class TestFailureBranches: + def test_tcp_refused_is_failed(self, catalog_path: Path) -> None: + runner = _runner({"integrations": ["Workday"]}) + tcp = _ScriptedProber({ + "wd5.myworkday.com:443": ProbeResult( + host="wd5.myworkday.com", port=443, + status=ProbeStatus.REFUSED, + detail="TCP 443 refused/unreachable", + ), + }) + https = _ScriptedProber() + + results = run_network_checks( + runner, tcp_prober=tcp, https_prober=https, + config_path=str(catalog_path), + ) + wd = _by_id(results, "NET-001") + assert wd.status == "Failed" + assert "1/2 reachable" in wd.result + assert "FAIL" in wd.result + assert "firewall" in wd.remediation.lower() + assert "export-firewall-requirements" in wd.remediation + + def test_tcp_timeout_is_failed(self, catalog_path: Path) -> None: + runner = _runner({"integrations": ["Workday"]}) + tcp = _ScriptedProber({ + "wd2-impl-services1.workday.com:443": ProbeResult( + host="wd2-impl-services1.workday.com", port=443, + status=ProbeStatus.TIMEOUT, + detail="TCP 443 timed out after 5.0s", + ), + }) + https = _ScriptedProber() + results = run_network_checks( + runner, tcp_prober=tcp, https_prober=https, + config_path=str(catalog_path), + ) + assert _by_id(results, "NET-001").status == "Failed" + + def test_dns_failure_is_failed(self, catalog_path: Path) -> None: + runner = _runner({"integrations": ["Workday"]}) + tcp = _ScriptedProber({ + "wd2-impl-services1.workday.com:443": ProbeResult( + host="wd2-impl-services1.workday.com", port=443, + status=ProbeStatus.DNS_FAILURE, + detail="DNS resolution failed", + ), + "wd5.myworkday.com:443": ProbeResult( + host="wd5.myworkday.com", port=443, + status=ProbeStatus.DNS_FAILURE, + detail="DNS resolution failed", + ), + }) + https = _ScriptedProber() + results = run_network_checks( + runner, tcp_prober=tcp, https_prober=https, + config_path=str(catalog_path), + ) + wd = _by_id(results, "NET-001") + assert wd.status == "Failed" + assert "DNS resolution failed" in wd.result + + def test_tls_error_is_warning(self, catalog_path: Path) -> None: + """TCP open + TLS handshake failure = likely SSL inspection. Surface + as Warning, not Failed — the network is reachable, just intercepted.""" + runner = _runner({"integrations": ["Workday"]}) + tcp = _ScriptedProber() # All TCP OK + https = _ScriptedProber({ + "wd2-impl-services1.workday.com:443": ProbeResult( + host="wd2-impl-services1.workday.com", port=443, + status=ProbeStatus.TLS_ERROR, + detail="TLS handshake failed", + ), + }) + results = run_network_checks( + runner, tcp_prober=tcp, https_prober=https, + config_path=str(catalog_path), + ) + wd = _by_id(results, "NET-001") + assert wd.status == "Warning" + assert "1 warning" in wd.result + assert "TLS" in wd.remediation or "SSL" in wd.remediation + + def test_http_5xx_is_warning(self, catalog_path: Path) -> None: + """5xx means vendor reachable but something server-side; surface as + warning so deployment teams can retry or escalate to the vendor.""" + runner = _runner({"integrations": ["Workday"]}) + tcp = _ScriptedProber() + https = _ScriptedProber({ + "wd2-impl-services1.workday.com:443": ProbeResult( + host="wd2-impl-services1.workday.com", port=443, + status=ProbeStatus.HTTP_5XX, + detail="HTTPS 503 server error", + ), + }) + results = run_network_checks( + runner, tcp_prober=tcp, https_prober=https, + config_path=str(catalog_path), + ) + assert _by_id(results, "NET-001").status == "Warning" + + def test_https_4xx_is_still_reachable(self, catalog_path: Path) -> None: + """4xx (e.g. 401, 403, 404) means TLS + HTTP layer worked. The probe + intentionally does not authenticate, so a 401 IS reachable.""" + runner = _runner({"integrations": ["Workday"]}) + tcp = _ScriptedProber() + https = _ScriptedProber({ + "wd2-impl-services1.workday.com:443": ProbeResult( + host="wd2-impl-services1.workday.com", port=443, + status=ProbeStatus.REACHABLE, + detail="HTTPS 401", + ), + "wd5.myworkday.com:443": ProbeResult( + host="wd5.myworkday.com", port=443, + status=ProbeStatus.REACHABLE, + detail="HTTPS 404", + ), + }) + results = run_network_checks( + runner, tcp_prober=tcp, https_prober=https, + config_path=str(catalog_path), + ) + assert _by_id(results, "NET-001").status == "Passed" + + +# ─────────────────────────────────────────────────────────────────────── +# Selection / configuration edge cases +# ─────────────────────────────────────────────────────────────────────── + + +class TestSelectionAndConfig: + def test_servicenow_without_instance_is_skipped_with_remediation( + self, catalog_path: Path + ) -> None: + runner = _runner({"integrations": ["ServiceNow"]}) + tcp = _ScriptedProber() + https = _ScriptedProber() + results = run_network_checks( + runner, tcp_prober=tcp, https_prober=https, + config_path=str(catalog_path), + ) + sn = _by_id(results, "NET-002") + assert sn.status == "Skipped" + assert "network.servicenow_instance" in sn.remediation + + def test_integration_not_selected_is_skipped(self, catalog_path: Path) -> None: + runner = _runner({"integrations": ["Workday"]}) # ServiceNow not opted-in + tcp = _ScriptedProber() + https = _ScriptedProber() + results = run_network_checks( + runner, tcp_prober=tcp, https_prober=https, + config_path=str(catalog_path), + ) + sn = _by_id(results, "NET-002") + assert sn.status == "Skipped" + assert "not in network.integrations" in sn.result + + def test_no_https_call_when_tcp_blocked(self, catalog_path: Path) -> None: + """Short-circuit guarantee: if TCP is closed, we don't waste a 5s + timeout on a follow-up HTTPS attempt that's also going to fail.""" + runner = _runner({"integrations": ["Workday"]}) + tcp = _ScriptedProber({ + "wd2-impl-services1.workday.com:443": ProbeResult( + host="wd2-impl-services1.workday.com", port=443, + status=ProbeStatus.REFUSED, detail="refused", + ), + "wd5.myworkday.com:443": ProbeResult( + host="wd5.myworkday.com", port=443, + status=ProbeStatus.REFUSED, detail="refused", + ), + }) + https = _ScriptedProber() + run_network_checks( + runner, tcp_prober=tcp, https_prober=https, + config_path=str(catalog_path), + ) + # HTTPS prober should have been called zero times — TCP failed + # for every host so the short-circuit kicked in. + assert https.calls == [] + + def test_missing_config_file_returns_error_result(self, tmp_path: Path) -> None: + """Defensive: tampered repo with a missing config doesn't crash; + emits a single ERROR result so the operator sees what to fix.""" + runner = _runner() + missing_path = tmp_path / "does-not-exist.json" + results = run_network_checks( + runner, tcp_prober=_ScriptedProber(), https_prober=_ScriptedProber(), + config_path=str(missing_path), + ) + assert len(results) == 1 + assert results[0].checkpoint_id == "NET-CONFIG" + assert results[0].status == "Error" diff --git a/tests/flightcheck/test_cli_lazy_auth.py b/tests/flightcheck/test_cli_lazy_auth.py new file mode 100644 index 0000000..977cfa3 --- /dev/null +++ b/tests/flightcheck/test_cli_lazy_auth.py @@ -0,0 +1,85 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. + +"""Regression tests for the cli.py lazy-auth scope gate. + +The FlightCheck CLI authenticates to Dataverse / Graph / Power Platform +Admin before any check function runs. This is expensive (interactive +MSAL prompt) AND wrong for scopes that don't need Microsoft auth — +notably ``--scope network`` and the ``--export-firewall-requirements`` +standalone mode. + +``_requires_microsoft_auth(scope)`` is the single source of truth for +which scopes need the heavy auth path. These tests pin its current +behavior so a future scope addition can't silently regress the no-auth +path. If a new scope is added, either: + + * Add it to ``_NO_MS_AUTH_SCOPES`` and pin a ``False`` assertion here, or + * Leave it auth-required (default) and pin a ``True`` assertion here. + +Whichever it is — pin it. Don't let the question be implicit. +""" + +from __future__ import annotations + +import pytest + +from flightcheck.cli import _NO_MS_AUTH_SCOPES, SCOPE_MAP, _requires_microsoft_auth + + +# --------------------------------------------------------------------------- +# Per-scope assertions — one per known scope key. +# --------------------------------------------------------------------------- + + +@pytest.mark.parametrize("scope", ["full", "prerequisites", "environment", + "authentication", "external", "workday", + "local", "publishing"]) +def test_auth_required_scopes(scope: str) -> None: + """These scopes hit Microsoft APIs (Dataverse, Graph, PP Admin, PVA).""" + assert _requires_microsoft_auth(scope) is True, ( + f"--scope {scope} should require Microsoft auth but the helper said no" + ) + + +def test_network_scope_does_not_require_microsoft_auth() -> None: + """``--scope network`` is the original motivation for the lazy-auth gate. + + Vendor TCP/HTTPS reachability probing has no Microsoft API surface, so + Dataverse / Graph / PP Admin auth would be both wasteful and a + user-experience regression (interactive MSAL prompt before a probe + that doesn't need it). Pin this hard. + """ + assert _requires_microsoft_auth("network") is False + + +# --------------------------------------------------------------------------- +# Guard against silent regressions: every scope key must be classified. +# --------------------------------------------------------------------------- + + +def test_every_scope_key_is_classified() -> None: + """If a new scope gets added to ``SCOPE_MAP``, this test fails until + the author decides whether the new scope is auth-required and updates + either the parametrize list above or ``_NO_MS_AUTH_SCOPES``. + """ + auth_required = {"full", "prerequisites", "environment", "authentication", + "external", "workday", "local", "publishing"} + classified = auth_required | set(_NO_MS_AUTH_SCOPES) + + all_scopes = {"full"} | set(SCOPE_MAP.keys()) + unclassified = all_scopes - classified + assert not unclassified, ( + f"Scope(s) {unclassified} are in SCOPE_MAP but not classified in this test. " + "Decide whether each needs Microsoft auth and update _NO_MS_AUTH_SCOPES or the " + "auth_required set above." + ) + + +def test_no_ms_auth_scopes_subset_of_known() -> None: + """``_NO_MS_AUTH_SCOPES`` should only name scopes that actually exist.""" + all_scopes = {"full"} | set(SCOPE_MAP.keys()) + stray = set(_NO_MS_AUTH_SCOPES) - all_scopes + assert not stray, ( + f"_NO_MS_AUTH_SCOPES has {stray} that aren't in SCOPE_MAP. Remove them." + )