diff --git a/app-store/whoop/.env.example b/app-store/whoop/.env.example new file mode 100644 index 0000000..8e664f4 --- /dev/null +++ b/app-store/whoop/.env.example @@ -0,0 +1,4 @@ +WHOOP_CLIENT_ID= +WHOOP_CLIENT_SECRET= +# for bootstrap_whoop_oauth.py +WHOOP_REDIRECT_URI=http://127.0.0.1:8765/callback diff --git a/app-store/whoop/.gitignore b/app-store/whoop/.gitignore new file mode 100644 index 0000000..62dd1dd --- /dev/null +++ b/app-store/whoop/.gitignore @@ -0,0 +1 @@ +artifacts/* diff --git a/app-store/whoop/config.py b/app-store/whoop/config.py new file mode 100644 index 0000000..d664089 --- /dev/null +++ b/app-store/whoop/config.py @@ -0,0 +1,55 @@ +"""Configuration helpers for the WHOOP Truffle app.""" + +from __future__ import annotations + +import os +from dataclasses import dataclass +from pathlib import Path + + +DEFAULT_WHOOP_API_BASE = "https://api.prod.whoop.com/developer" +DEFAULT_WHOOP_TOKEN_URL = "https://api.prod.whoop.com/oauth/oauth2/token" +DEFAULT_TOKEN_STORE_PATH = Path.home() / ".whoop-truffle" / "oauth.json" + + +def _parse_optional_float(raw: str | None) -> float | None: + value = (raw or "").strip() + if not value: + return None + try: + return float(value) + except ValueError: + return None + + +@dataclass(frozen=True, slots=True) +class WhoopConfig: + client_id: str = "" + client_secret: str = "" + redirect_uri: str = "" + access_token: str = "" + refresh_token: str = "" + api_base: str = DEFAULT_WHOOP_API_BASE + token_url: str = DEFAULT_WHOOP_TOKEN_URL + token_store_path: Path = DEFAULT_TOKEN_STORE_PATH + access_token_expires_at: float | None = None + token_scope: str = "" + token_type: str = "bearer" + + @classmethod + def from_env(cls) -> WhoopConfig: + token_store_raw = os.getenv("WHOOP_TOKEN_STORE_PATH", "").strip() + token_store_path = Path(token_store_raw) if token_store_raw else DEFAULT_TOKEN_STORE_PATH + return cls( + client_id=os.getenv("WHOOP_CLIENT_ID", "").strip(), + client_secret=os.getenv("WHOOP_CLIENT_SECRET", "").strip(), + redirect_uri=os.getenv("WHOOP_REDIRECT_URI", "").strip(), + access_token=os.getenv("WHOOP_ACCESS_TOKEN", "").strip(), + refresh_token=os.getenv("WHOOP_REFRESH_TOKEN", "").strip(), + api_base=os.getenv("WHOOP_API_BASE", DEFAULT_WHOOP_API_BASE).strip() or DEFAULT_WHOOP_API_BASE, + token_url=os.getenv("WHOOP_TOKEN_URL", DEFAULT_WHOOP_TOKEN_URL).strip() or DEFAULT_WHOOP_TOKEN_URL, + token_store_path=token_store_path, + access_token_expires_at=_parse_optional_float(os.getenv("WHOOP_ACCESS_TOKEN_EXPIRES_AT")), + token_scope=os.getenv("WHOOP_TOKEN_SCOPE", "").strip(), + token_type=os.getenv("WHOOP_TOKEN_TYPE", "bearer").strip() or "bearer", + ) diff --git a/app-store/whoop/icon.png b/app-store/whoop/icon.png new file mode 100644 index 0000000..37c89a4 Binary files /dev/null and b/app-store/whoop/icon.png differ diff --git a/app-store/whoop/prompts/whoop_cli_prompts.txt b/app-store/whoop/prompts/whoop_cli_prompts.txt new file mode 100644 index 0000000..b19ccbf --- /dev/null +++ b/app-store/whoop/prompts/whoop_cli_prompts.txt @@ -0,0 +1,10 @@ +Check my WHOOP connection status. +If auth fails, explain exactly what failed and what needs to be reauthorized. +--- +Pull my latest WHOOP summary and give me recovery, sleep quality, day strain, and recent workouts. +--- +Show my last 7 recovery records and summarize the trend in recovery score, HRV, and resting heart rate. +--- +Show my last 7 cycles and tell me whether day strain is rising, falling, or flat. +--- +Pull my WHOOP sleep records from the last week and tell me which night had the best and worst sleep performance. diff --git a/app-store/whoop/scripts/bootstrap_whoop_oauth.py b/app-store/whoop/scripts/bootstrap_whoop_oauth.py new file mode 100644 index 0000000..fc9a355 --- /dev/null +++ b/app-store/whoop/scripts/bootstrap_whoop_oauth.py @@ -0,0 +1,421 @@ +#!/usr/bin/env python3 +"""Bootstrap WHOOP OAuth locally and print Truffle deploy prompt values.""" + +from __future__ import annotations + +import json +import os +import queue +import secrets +import subprocess +import threading +import time +import urllib.parse +import webbrowser +from collections.abc import Mapping +from dataclasses import dataclass +from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer +from pathlib import Path +from typing import Any + + +SCRIPT_DIR = Path(__file__).resolve().parent +ENV_FILE = SCRIPT_DIR / ".env" +DEFAULT_REDIRECT_URI = "http://127.0.0.1:8765/callback" +AUTH_URL = "https://api.prod.whoop.com/oauth/oauth2/auth" +TOKEN_URL = "https://api.prod.whoop.com/oauth/oauth2/token" +PROFILE_URL = "https://api.prod.whoop.com/developer/v2/user/profile/basic" +SCOPES = ( + "offline", + "read:profile", + "read:body_measurement", + "read:cycles", + "read:recovery", + "read:sleep", + "read:workout", +) +CALLBACK_TIMEOUT_SECONDS = 180 + + +@dataclass(frozen=True, slots=True) +class BootstrapConfig: + client_id: str + client_secret: str + redirect_uri: str = DEFAULT_REDIRECT_URI + + +@dataclass(frozen=True, slots=True) +class OAuthCallback: + code: str + state: str + error: str = "" + error_description: str = "" + + +def _decode_quoted_env_value(value: str, quote: str) -> str: + chars: list[str] = [] + escaped = False + for char in value[1:]: + if escaped: + if quote == '"' and char == "n": + chars.append("\n") + elif quote == '"' and char == "r": + chars.append("\r") + elif quote == '"' and char == "t": + chars.append("\t") + else: + chars.append(char) + escaped = False + continue + if char == "\\": + escaped = True + continue + if char == quote: + return "".join(chars) + chars.append(char) + return "".join(chars) + + +def _strip_unquoted_env_comment(value: str) -> str: + for index, char in enumerate(value): + if char == "#" and (index == 0 or value[index - 1].isspace()): + return value[:index].rstrip() + return value.rstrip() + + +def parse_env_file(path: Path) -> dict[str, str]: + values: dict[str, str] = {} + if not path.exists(): + return values + + for raw_line in path.read_text(encoding="utf-8").splitlines(): + line = raw_line.strip() + if not line or line.startswith("#"): + continue + if line.startswith("export "): + line = line[len("export ") :].lstrip() + if "=" not in line: + continue + + key, raw_value = line.split("=", 1) + key = key.strip() + if not key: + continue + + value = raw_value.strip() + if value.startswith('"'): + values[key] = _decode_quoted_env_value(value, '"') + elif value.startswith("'"): + values[key] = _decode_quoted_env_value(value, "'") + else: + values[key] = _strip_unquoted_env_comment(value) + return values + + +def load_config(*, env: Mapping[str, str] = os.environ, env_file: Path = ENV_FILE) -> BootstrapConfig: + file_values = parse_env_file(env_file) + + def get_value(name: str, default: str = "") -> str: + value = str(env.get(name, "") or "").strip() + if value: + return value + return str(file_values.get(name, default) or "").strip() + + config = BootstrapConfig( + client_id=get_value("WHOOP_CLIENT_ID"), + client_secret=get_value("WHOOP_CLIENT_SECRET"), + redirect_uri=get_value("WHOOP_REDIRECT_URI", DEFAULT_REDIRECT_URI), + ) + + missing = [ + name + for name, value in ( + ("WHOOP_CLIENT_ID", config.client_id), + ("WHOOP_CLIENT_SECRET", config.client_secret), + ) + if not value + ] + if missing: + raise RuntimeError( + "Missing required WHOOP OAuth setting(s): " + f"{', '.join(missing)}. Create {env_file} from .env.example or export them in your shell." + ) + + return config + + +def generate_state() -> str: + return secrets.token_hex(4) + + +def build_authorization_url(*, state: str, config: BootstrapConfig) -> str: + query = urllib.parse.urlencode( + { + "response_type": "code", + "client_id": config.client_id, + "redirect_uri": config.redirect_uri, + "scope": " ".join(SCOPES), + "state": state, + } + ) + return f"{AUTH_URL}?{query}" + + +def exchange_authorization_code( + code: str, + *, + config: BootstrapConfig, + curl_runner: Any = subprocess.run, + now: float | None = None, +) -> dict[str, Any]: + raw = _curl_form_json( + TOKEN_URL, + { + "grant_type": "authorization_code", + "code": code, + "client_id": config.client_id, + "client_secret": config.client_secret, + "redirect_uri": config.redirect_uri, + }, + error_prefix="WHOOP token exchange failed", + curl_runner=curl_runner, + ) + + data = json.loads(raw) + if not isinstance(data, dict): + raise RuntimeError("WHOOP token exchange returned a non-object payload") + + access_token = str(data.get("access_token", "") or "").strip() + refresh_token = str(data.get("refresh_token", "") or "").strip() + if not access_token or not refresh_token: + raise RuntimeError("WHOOP token exchange did not return both access and refresh tokens") + + expires_in = int(data.get("expires_in", 0) or 0) + data["expires_at"] = int((now if now is not None else time.time()) + expires_in) + data["scope"] = str(data.get("scope", "") or "").strip() + data["token_type"] = str(data.get("token_type", "bearer") or "bearer").strip() or "bearer" + return data + + +def fetch_profile( + access_token: str, + *, + curl_runner: Any = subprocess.run, +) -> dict[str, Any]: + raw = _curl_json( + [ + "curl", + "-sS", + "--fail-with-body", + "--max-time", + "30", + "-H", + "Accept: application/json", + "-H", + f"Authorization: Bearer {access_token}", + PROFILE_URL, + ], + error_prefix="WHOOP profile smoke test failed", + curl_runner=curl_runner, + ) + + payload = json.loads(raw) + if not isinstance(payload, dict): + raise RuntimeError("WHOOP profile smoke test returned a non-object payload") + return payload + + +def _curl_form_json( + url: str, + form: dict[str, str], + *, + error_prefix: str, + curl_runner: Any = subprocess.run, +) -> str: + command = [ + "curl", + "-sS", + "--fail-with-body", + "--max-time", + "30", + "-X", + "POST", + "-H", + "Accept: application/json", + ] + for key, value in form.items(): + command.extend(["--data-urlencode", f"{key}={value}"]) + command.append(url) + return _curl_json(command, error_prefix=error_prefix, curl_runner=curl_runner) + + +def _curl_json( + command: list[str], + *, + error_prefix: str, + curl_runner: Any = subprocess.run, +) -> str: + try: + result = curl_runner(command, capture_output=True, text=True, timeout=35, check=False) + except FileNotFoundError as exc: + raise RuntimeError("curl is required to run the WHOOP OAuth bootstrap script") from exc + except subprocess.TimeoutExpired as exc: + raise RuntimeError(f"{error_prefix}: curl timed out") from exc + + stdout = str(getattr(result, "stdout", "") or "") + stderr = str(getattr(result, "stderr", "") or "") + returncode = int(getattr(result, "returncode", 1) or 0) + if returncode != 0: + detail = (stdout or stderr).strip() + suffix = f": {detail[:300]}" if detail else "" + raise RuntimeError(f"{error_prefix}: curl exited {returncode}{suffix}") + return stdout + + +def prompt_values(token_payload: dict[str, Any], *, config: BootstrapConfig) -> list[tuple[str, str]]: + return [ + ("WHOOP Client ID", config.client_id), + ("WHOOP Client Secret", config.client_secret), + ("WHOOP Redirect URI", config.redirect_uri), + ("WHOOP Access Token", str(token_payload["access_token"])), + ("WHOOP Refresh Token", str(token_payload["refresh_token"])), + ("WHOOP Access Token Expires At", str(token_payload["expires_at"])), + ("WHOOP Token Scope", str(token_payload.get("scope", "") or "")), + ("WHOOP Token Type", str(token_payload.get("token_type", "bearer") or "bearer")), + ] + + +def render_prompt_values(token_payload: dict[str, Any], *, config: BootstrapConfig) -> str: + lines = ["Paste these values into the matching `truffile deploy` prompts:"] + for label, value in prompt_values(token_payload, config=config): + lines.append(f"{label}:") + lines.append(value) + return "\n".join(lines) + + +class OAuthCallbackServer: + def __init__(self, *, redirect_uri: str) -> None: + parsed = urllib.parse.urlparse(redirect_uri) + self._expected_path = parsed.path or "/" + self._events: queue.Queue[OAuthCallback] = queue.Queue(maxsize=1) + handler_cls = self._build_handler() + host = parsed.hostname or "127.0.0.1" + port = parsed.port or 80 + try: + self._server = ThreadingHTTPServer((host, port), handler_cls) + except OSError as exc: + raise RuntimeError( + f"Could not bind the local WHOOP callback server on {host}:{port}. " + "Make sure the redirect URI is free and another process is not already using that port." + ) from exc + self._thread = threading.Thread(target=self._server.serve_forever, daemon=True) + + def start(self) -> None: + self._thread.start() + + def wait_for_callback(self, *, timeout: float) -> OAuthCallback: + try: + return self._events.get(timeout=timeout) + except queue.Empty as exc: + raise TimeoutError(f"Timed out waiting for WHOOP OAuth redirect after {int(timeout)} seconds") from exc + + def close(self) -> None: + self._server.shutdown() + self._server.server_close() + self._thread.join(timeout=2) + + def _build_handler(self) -> type[BaseHTTPRequestHandler]: + outer = self + + class Handler(BaseHTTPRequestHandler): + def do_GET(self) -> None: # noqa: N802 + parsed = urllib.parse.urlparse(self.path) + if parsed.path != outer._expected_path: + self.send_error(404, "Unexpected callback path") + return + + params = urllib.parse.parse_qs(parsed.query) + callback = OAuthCallback( + code=str(params.get("code", [""])[0] or "").strip(), + state=str(params.get("state", [""])[0] or "").strip(), + error=str(params.get("error", [""])[0] or "").strip(), + error_description=str(params.get("error_description", [""])[0] or "").strip(), + ) + if outer._events.empty(): + outer._events.put(callback) + + self.send_response(200) + self.send_header("Content-Type", "text/plain; charset=utf-8") + self.end_headers() + if callback.error: + message = "WHOOP authorization failed. You can close this window." + else: + message = "WHOOP authorization received. You can close this window and return to the terminal." + self.wfile.write(message.encode("utf-8")) + + def log_message(self, format: str, *args: Any) -> None: # noqa: A003 + return None + + return Handler + + +def authorize_and_print_env(*, open_browser: bool = True) -> int: + config = load_config() + state = generate_state() + auth_url = build_authorization_url(state=state, config=config) + server = OAuthCallbackServer(redirect_uri=config.redirect_uri) + server.start() + try: + print("Before continuing, make sure this redirect URI is saved in the WHOOP dashboard:", flush=True) + print(f" {config.redirect_uri}", flush=True) + print("Click Update App after adding it; an unsaved redirect will be rejected.", flush=True) + print(flush=True) + print( + "The authorization URL includes the OAuth request scope 'offline' so WHOOP returns a refresh token. " + "It may not appear as a dashboard checkbox.", + flush=True, + ) + print(flush=True) + print("WHOOP authorization URL:", flush=True) + print(auth_url, flush=True) + print(flush=True) + + if open_browser: + try: + webbrowser.open(auth_url) + except Exception: + pass + + callback = server.wait_for_callback(timeout=CALLBACK_TIMEOUT_SECONDS) + if callback.error: + description = f": {callback.error_description}" if callback.error_description else "" + raise RuntimeError(f"WHOOP authorization failed with error '{callback.error}'{description}") + if not callback.code: + raise RuntimeError("WHOOP redirect did not include an authorization code") + if callback.state != state: + raise RuntimeError("WHOOP redirect state did not match the request state") + + token_payload = exchange_authorization_code(callback.code, config=config) + profile = fetch_profile(str(token_payload["access_token"])) + print( + f"WHOOP OAuth OK for user_id={profile.get('user_id')} email={profile.get('email')}", + flush=True, + ) + print(flush=True) + print("Run `truffile deploy ./app-store/whoop` and paste the generated values when prompted.", flush=True) + print("Do not commit these token values.", flush=True) + print(flush=True) + print(render_prompt_values(token_payload, config=config), flush=True) + return 0 + finally: + server.close() + + +def main() -> int: + if "BROWSER" in os.environ and os.environ["BROWSER"].strip().lower() == "none": + return authorize_and_print_env(open_browser=False) + return authorize_and_print_env(open_browser=True) + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/app-store/whoop/scripts/run_whoop_prompts.py b/app-store/whoop/scripts/run_whoop_prompts.py new file mode 100644 index 0000000..1e1a71e --- /dev/null +++ b/app-store/whoop/scripts/run_whoop_prompts.py @@ -0,0 +1,485 @@ +#!/usr/bin/env python3 +from __future__ import annotations + +import argparse +import json +import subprocess +import sys +import tempfile +import time +from dataclasses import dataclass +from datetime import datetime, timezone +from pathlib import Path +from typing import Any + + +APP_DIR = Path(__file__).resolve().parents[1] +DEFAULT_PROMPTS = APP_DIR / "prompts" / "whoop_cli_prompts.txt" +DEFAULT_OUT = APP_DIR / "artifacts" / "whoop-cli" + + +@dataclass +class CommandResult: + command: list[str] + returncode: int + stdout: str + stderr: str + error: str | None = None + + +@dataclass +class PromptRun: + result: CommandResult + payload: dict[str, Any] | None + validation_error: str | None + warnings: list[str] + settle_attempts: list[dict[str, Any]] + + +def _slug(value: str) -> str: + return value.lower().replace(" ", "-") + + +def _resolve_app_ref(ref: str, apps: list[dict[str, Any]]) -> dict[str, Any] | None: + ref_s = ref.strip() + ref_l = ref_s.lower() + for app in apps: + if app.get("uuid") == ref_s: + return app + for app in apps: + if str(app.get("name", "")).lower() == ref_l: + return app + for app in apps: + if _slug(str(app.get("name", ""))) == ref_l: + return app + for app in apps: + if ref_l in str(app.get("name", "")).lower(): + return app + return None + + +def _parse_prompt_file(path: Path) -> list[str]: + text = path.read_text(encoding="utf-8") + prompts: list[str] = [] + current: list[str] = [] + for line in text.splitlines(): + if line.strip() == "---": + block = "\n".join(current).strip() + if block: + prompts.append(block) + current = [] + else: + current.append(line) + block = "\n".join(current).strip() + if block: + prompts.append(block) + return prompts + + +def _run(command: list[str], *, timeout: float | None) -> CommandResult: + try: + completed = subprocess.run( + command, + capture_output=True, + text=True, + timeout=timeout, + check=False, + ) + return CommandResult( + command=command, + returncode=completed.returncode, + stdout=completed.stdout, + stderr=completed.stderr, + ) + except FileNotFoundError as exc: + return CommandResult( + command=command, + returncode=127, + stdout="", + stderr="", + error=f"command not found: {command[0]} ({exc})", + ) + except subprocess.TimeoutExpired as exc: + return CommandResult( + command=command, + returncode=124, + stdout=exc.stdout or "", + stderr=exc.stderr or "", + error=f"command timed out after {timeout} seconds", + ) + + +def _load_json(stdout: str) -> tuple[dict[str, Any] | None, str | None]: + try: + payload = json.loads(stdout) + except json.JSONDecodeError as exc: + return None, str(exc) + if not isinstance(payload, dict): + return None, "stdout JSON was not an object" + return payload, None + + +def _write_json(path: Path, payload: dict[str, Any]) -> None: + path.write_text(json.dumps(payload, indent=2, ensure_ascii=False) + "\n", encoding="utf-8") + + +def _content_preview(content: Any, *, limit: int = 180) -> str: + text = " ".join(str(content or "").split()) + if len(text) <= limit: + return text + return text[: limit - 1].rstrip() + "..." + + +def _make_run_dir(base: Path) -> Path: + stamp = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ") + candidate = base / stamp + if not candidate.exists(): + candidate.mkdir(parents=True) + return candidate + for index in range(2, 1000): + candidate = base / f"{stamp}-{index}" + if not candidate.exists(): + candidate.mkdir(parents=True) + return candidate + raise RuntimeError(f"could not create unique run directory under {base}") + + +def _list_apps(truffile: str, *, timeout: float | None) -> tuple[list[dict[str, Any]] | None, str | None]: + result = _run([truffile, "chat", "--quiet", "--json", "--list-apps"], timeout=timeout) + if result.returncode != 0: + detail = result.error or result.stderr.strip() or result.stdout.strip() or "unknown error" + return None, f"`truffile chat --list-apps` failed: {detail}" + payload, error = _load_json(result.stdout) + if error: + return None, f"`truffile chat --list-apps` returned invalid JSON: {error}" + apps = payload.get("apps") if payload else None + if not isinstance(apps, list): + return None, "`truffile chat --list-apps` JSON did not include an apps list" + return [app for app in apps if isinstance(app, dict)], None + + +def _peek_task( + *, + truffile: str, + task_id: str, + timeout: float | None, +) -> tuple[CommandResult, dict[str, Any] | None, str | None]: + command = [truffile, "chat", "--quiet", "--json", "--task-id", task_id] + result = _run(command, timeout=timeout) + if result.returncode != 0: + detail = result.error or result.stderr.strip() or result.stdout.strip() or "unknown error" + return result, None, f"settle command failed: {detail}" + payload, error = _load_json(result.stdout) + if error: + return result, None, f"settle command returned invalid JSON: {error}" + return result, payload, None + + +def _record_settle_attempt( + *, + attempt: int, + result: CommandResult, + payload: dict[str, Any] | None, + error: str | None, +) -> dict[str, Any]: + return { + "attempt": attempt, + "command": result.command, + "exit_code": result.returncode, + "error": error, + "task_id": payload.get("task_id") if payload else None, + "pending_user_response": payload.get("pending_user_response") if payload else None, + "content_preview": _content_preview(payload.get("content") if payload else ""), + "stdout": result.stdout if error else None, + "stderr": result.stderr if error else None, + "command_error": result.error, + } + + +def _validate_payload( + payload: dict[str, Any], + *, + fail_on_pending: bool, +) -> tuple[str | None, list[str]]: + warnings: list[str] = [] + content = str(payload.get("content") or "").strip() + if not content: + return "response content was empty", warnings + if payload.get("pending_user_response") is True: + message = "response is waiting for a follow-up user message" + if fail_on_pending: + return message, warnings + warnings.append(message) + return None, warnings + + +def _run_prompt( + *, + truffile: str, + app_ref: str, + prompt: str, + timeout: float | None, + settle_checks: int, + settle_delay: float, + fail_on_pending: bool, +) -> PromptRun: + prompt_path: Path | None = None + try: + with tempfile.NamedTemporaryFile("w", encoding="utf-8", suffix=".txt", delete=False) as handle: + handle.write(prompt) + handle.write("\n") + prompt_path = Path(handle.name) + command = [ + truffile, + "chat", + "--quiet", + "--json", + "--app", + app_ref, + "--prompt-file", + str(prompt_path), + ] + result = _run(command, timeout=timeout) + finally: + if prompt_path is not None: + try: + prompt_path.unlink() + except OSError: + pass + + if result.returncode != 0: + detail = result.error or result.stderr.strip() or result.stdout.strip() or "unknown error" + return PromptRun(result, None, f"command failed: {detail}", [], []) + payload, error = _load_json(result.stdout) + if error: + return PromptRun(result, None, f"invalid JSON response: {error}", [], []) + + settle_attempts: list[dict[str, Any]] = [] + warnings: list[str] = [] + task_id = str(payload.get("task_id") or "") + for attempt in range(1, max(0, settle_checks) + 1): + content = str(payload.get("content") or "").strip() + pending = payload.get("pending_user_response") is True + if content and not pending: + break + if not task_id: + break + if settle_delay > 0: + time.sleep(settle_delay) + settle_result, settle_payload, settle_error = _peek_task( + truffile=truffile, + task_id=task_id, + timeout=timeout, + ) + settle_attempts.append( + _record_settle_attempt( + attempt=attempt, + result=settle_result, + payload=settle_payload, + error=settle_error, + ) + ) + if settle_error: + warnings.append(f"settle check {attempt} failed: {settle_error}") + break + if settle_payload is not None: + payload = settle_payload + + validation_error, validation_warnings = _validate_payload(payload, fail_on_pending=fail_on_pending) + warnings.extend(validation_warnings) + return PromptRun(result, payload, validation_error, warnings, settle_attempts) + + +def _failure_artifact( + *, + prompt_index: int, + prompt: str, + result: CommandResult, + validation_error: str, + parsed_payload: dict[str, Any] | None, + warnings: list[str], + settle_attempts: list[dict[str, Any]], +) -> dict[str, Any]: + return { + "status": "error", + "prompt_index": prompt_index, + "prompt": prompt, + "validation_error": validation_error, + "warnings": warnings, + "parsed_payload": parsed_payload, + "settle_attempts": settle_attempts, + "command": result.command, + "exit_code": result.returncode, + "stdout": result.stdout, + "stderr": result.stderr, + "command_error": result.error, + } + + +def _summary_row( + *, + prompt_index: int, + prompt: str, + result: CommandResult, + payload: dict[str, Any] | None, + validation_error: str | None, + warnings: list[str], + settle_attempts: list[dict[str, Any]], + artifact: Path, +) -> dict[str, Any]: + return { + "prompt_index": prompt_index, + "exit_code": result.returncode, + "ok": validation_error is None, + "task_id": payload.get("task_id") if payload else None, + "attached_apps": payload.get("attached_apps") if payload else None, + "tool_calls": payload.get("tool_calls") if payload else None, + "pending_user_response": payload.get("pending_user_response") if payload else None, + "content_preview": _content_preview(payload.get("content") if payload else ""), + "prompt_preview": _content_preview(prompt), + "artifact": str(artifact), + "error": validation_error, + "warnings": warnings, + "settle_attempts": len(settle_attempts), + } + + +def build_parser() -> argparse.ArgumentParser: + parser = argparse.ArgumentParser( + description="Run prompt-file tests against a deployed WHOOP Truffle app.", + ) + parser.add_argument( + "--prompts", + type=Path, + default=DEFAULT_PROMPTS, + help=f"prompt file to run; blocks are separated by lines containing only --- (default: {DEFAULT_PROMPTS})", + ) + parser.add_argument("--app", default="whoop", help="Truffle app name, slug, or uuid to attach") + parser.add_argument("--out", type=Path, default=DEFAULT_OUT, help=f"artifact root (default: {DEFAULT_OUT})") + parser.add_argument("--truffile", default="truffile", help="truffile executable path (default: truffile)") + parser.add_argument( + "--timeout", + type=float, + default=None, + help="per truffile command timeout in seconds (default: no subprocess timeout)", + ) + parser.add_argument( + "--settle-checks", + type=int, + default=1, + help="number of --task-id polls after a pending or empty response before starting the next prompt (default: 1)", + ) + parser.add_argument( + "--settle-delay", + type=float, + default=0.5, + help="seconds to wait before each settle poll (default: 0.5)", + ) + parser.add_argument( + "--fail-on-pending", + action="store_true", + help="treat pending_user_response=true as a failure even when response content is present", + ) + parser.add_argument("--fail-fast", action="store_true", help="stop after the first failed prompt") + return parser + + +def main(argv: list[str] | None = None) -> int: + args = build_parser().parse_args(argv) + prompt_path = args.prompts.expanduser() + if not prompt_path.is_file(): + print(f"prompt file not found: {prompt_path}", file=sys.stderr) + return 2 + + try: + prompts = _parse_prompt_file(prompt_path) + except OSError as exc: + print(f"could not read prompt file: {exc}", file=sys.stderr) + return 2 + if not prompts: + print(f"prompt file did not contain any prompts: {prompt_path}", file=sys.stderr) + return 2 + + apps, app_error = _list_apps(args.truffile, timeout=args.timeout) + if app_error: + print(app_error, file=sys.stderr) + return 1 + matched_app = _resolve_app_ref(args.app, apps or []) + if matched_app is None: + app_names = ", ".join(str(app.get("name") or app.get("uuid") or "") for app in apps or []) + print(f"could not find app matching {args.app!r}. Installed apps: {app_names}", file=sys.stderr) + return 1 + + run_dir = _make_run_dir(args.out.expanduser()) + summary_path = run_dir / "summary.jsonl" + app_name = str(matched_app.get("name") or args.app) + app_uuid = str(matched_app.get("uuid") or "") + print(f"WHOOP app: {app_name}" + (f" ({app_uuid})" if app_uuid else "")) + print(f"Prompts: {len(prompts)}") + print(f"Artifacts: {run_dir}") + + failures = 0 + with summary_path.open("w", encoding="utf-8") as summary: + for index, prompt in enumerate(prompts, start=1): + print(f"[{index}/{len(prompts)}] running: {_content_preview(prompt, limit=90)}") + prompt_run = _run_prompt( + truffile=args.truffile, + app_ref=args.app, + prompt=prompt, + timeout=args.timeout, + settle_checks=args.settle_checks, + settle_delay=args.settle_delay, + fail_on_pending=args.fail_on_pending, + ) + result = prompt_run.result + payload = prompt_run.payload + validation_error = prompt_run.validation_error + artifact_path = run_dir / f"{index:03d}.json" + if validation_error is None and payload is not None: + _write_json(artifact_path, payload) + else: + failures += 1 + _write_json( + artifact_path, + _failure_artifact( + prompt_index=index, + prompt=prompt, + result=result, + validation_error=validation_error or "unknown validation error", + parsed_payload=payload, + warnings=prompt_run.warnings, + settle_attempts=prompt_run.settle_attempts, + ), + ) + + row = _summary_row( + prompt_index=index, + prompt=prompt, + result=result, + payload=payload, + validation_error=validation_error, + warnings=prompt_run.warnings, + settle_attempts=prompt_run.settle_attempts, + artifact=artifact_path, + ) + summary.write(json.dumps(row, ensure_ascii=False) + "\n") + summary.flush() + if validation_error is None: + print(f"[{index}/{len(prompts)}] ok: {_content_preview(payload.get('content'), limit=90)}") + for warning in prompt_run.warnings: + print(f"[{index}/{len(prompts)}] warning: {warning}", file=sys.stderr) + else: + print(f"[{index}/{len(prompts)}] failed: {validation_error}", file=sys.stderr) + if args.fail_fast: + break + + print(f"Summary: {summary_path}") + if failures: + print(f"Failed prompts: {failures}", file=sys.stderr) + return 1 + print("All prompts completed.") + return 0 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/app-store/whoop/tests/conftest.py b/app-store/whoop/tests/conftest.py new file mode 100644 index 0000000..3183229 --- /dev/null +++ b/app-store/whoop/tests/conftest.py @@ -0,0 +1,7 @@ +import sys +from pathlib import Path + + +_app_dir = Path(__file__).resolve().parent.parent +if str(_app_dir) not in sys.path: + sys.path.insert(0, str(_app_dir)) diff --git a/app-store/whoop/tests/test_whoop_app_shells.py b/app-store/whoop/tests/test_whoop_app_shells.py new file mode 100644 index 0000000..24e0774 --- /dev/null +++ b/app-store/whoop/tests/test_whoop_app_shells.py @@ -0,0 +1,119 @@ +from __future__ import annotations + +import unittest +import sys +from pathlib import Path +from unittest.mock import AsyncMock, patch + + +_app_dir = Path(__file__).resolve().parent.parent +if str(_app_dir) not in sys.path: + sys.path.insert(0, str(_app_dir)) + +from truffile.app_runtime import AppAuthError +from truffile.app_runtime.testing import AppHarness, FakeBackgroundRuntime + +from whoop_background import app as whoop_bg_app +from whoop_bg_worker import BgRunResult, PreparedSubmission +from whoop_foreground import WhoopForegroundApp + + +class _BackgroundWorkerStub: + def __init__(self, results: list[BgRunResult]) -> None: + self._results = list(results) + self.close_calls = 0 + + async def verify(self) -> tuple[bool, str]: + return True, "WHOOP background verify OK" + + async def run_cycle(self) -> BgRunResult: + if self._results: + return self._results.pop(0) + return BgRunResult() + + async def close(self) -> None: + self.close_calls += 1 + + +class _ForegroundClientStub: + async def verify(self) -> tuple[bool, str]: + return False, "WHOOP access token is missing" + + async def close(self) -> None: + return None + + +class TestWhoopAppShells(unittest.IsolatedAsyncioTestCase): + async def asyncSetUp(self) -> None: + whoop_bg_app.reset_for_test() + + async def asyncTearDown(self) -> None: + whoop_bg_app.reset_for_test() + + async def test_background_harness_captures_submission(self) -> None: + worker = _BackgroundWorkerStub( + [ + BgRunResult( + submissions=[ + PreparedSubmission( + text="WHOOP recovery scored: 72% recovery, RHR 54 bpm.", + priority=1, + ) + ] + ) + ] + ) + + with patch.object(whoop_bg_app, "build_worker", return_value=worker): + harness = AppHarness(bg_app=whoop_bg_app, logger_names=["whoop.background"]) + result = await harness.run_bg(cycles=1) + + self.assertTrue(result.success) + self.assertEqual(len(result.submissions), 1) + self.assertIn("WHOOP recovery scored", result.submissions[0]["text"]) + + async def test_background_runtime_reports_repeated_auth_failure(self) -> None: + worker = _BackgroundWorkerStub( + [ + BgRunResult(auth_error="WHOOP rejected the installed credentials"), + BgRunResult(auth_error="WHOOP rejected the installed credentials"), + BgRunResult(auth_error="WHOOP rejected the installed credentials"), + ] + ) + + with patch.object(whoop_bg_app, "build_worker", return_value=worker): + runtime = FakeBackgroundRuntime(whoop_bg_app, cycles=3) + runtime.run() + + self.assertEqual(len(runtime.all_reported_errors), 1) + self.assertIn("whoop authentication failure", runtime.all_reported_errors[0].error_message) + + async def test_reset_for_test_closes_worker(self) -> None: + worker = _BackgroundWorkerStub([BgRunResult()]) + + with patch.object(whoop_bg_app, "build_worker", return_value=worker): + runtime = FakeBackgroundRuntime(whoop_bg_app, cycles=1) + runtime.run() + + whoop_bg_app.reset_for_test() + + self.assertGreaterEqual(worker.close_calls, 1) + + async def test_foreground_status_raises_auth_error_for_missing_token(self) -> None: + app = WhoopForegroundApp(client=_ForegroundClientStub()) # type: ignore[arg-type] + + with patch.object(app._error_reporter, "report_foreground_exception", new=AsyncMock()): + with self.assertRaisesRegex(AppAuthError, "WHOOP access token is missing"): + await app.invoke_tool("whoop_status") + + async def test_foreground_bad_limit_returns_tool_error(self) -> None: + app = WhoopForegroundApp(client=_ForegroundClientStub()) # type: ignore[arg-type] + + result = await app.invoke_tool("list_sleep", limit=0) + + self.assertEqual(result["status"], "error") + self.assertIn("limit must be between 1 and 25", result["message"]) + + +if __name__ == "__main__": + unittest.main() diff --git a/app-store/whoop/truffile.yaml b/app-store/whoop/truffile.yaml new file mode 100644 index 0000000..bcc8d26 --- /dev/null +++ b/app-store/whoop/truffile.yaml @@ -0,0 +1,91 @@ +metadata: + name: WHOOP + bundle_id: org.truffle.whoop.bridge + description: | + Read your WHOOP profile, recovery, sleep, cycle, and workout data + through a Truffle-managed WHOOP bridge. + During install, connect a WHOOP developer app and authorize access + to your WHOOP account. + icon_file: ./icon.png + foreground: + process: + cmd: + - python + - whoop_foreground.py + working_directory: / + environment: + PYTHONUNBUFFERED: "1" + +steps: + - name: Install Python dependencies + type: bash + run: | + pip install --no-cache-dir "httpx>=0.27.0" + + - name: Copy application files + type: files + files: + - source: ./config.py + destination: ./config.py + - source: ./whoop_auth.py + destination: ./whoop_auth.py + - source: ./whoop_client.py + destination: ./whoop_client.py + - source: ./whoop_foreground.py + destination: ./whoop_foreground.py + + - name: Configure WHOOP OAuth tokens + type: text + ui_state_on_show: user_interaction_ready + ui_state_on_complete: move_to_background + content: | + Run `python app-store/whoop/scripts/bootstrap_whoop_oauth.py` locally before deploy. + Paste each generated value below. + + This manual token bootstrap is temporary for the app-store PR. + For public release, Truffle should provide the official WHOOP developer app + and callback URL. + + The OAuth request includes the `offline` scope so WHOOP returns a refresh token. + It may not appear as a dashboard checkbox. + fields: + - name: client_id + label: WHOOP Client ID + type: text + env: WHOOP_CLIENT_ID + - name: client_secret + label: WHOOP Client Secret + type: password + env: WHOOP_CLIENT_SECRET + - name: redirect_uri + label: WHOOP Redirect URI + type: text + default: http://127.0.0.1:8765/callback + env: WHOOP_REDIRECT_URI + - name: access_token + label: WHOOP Access Token + type: password + env: WHOOP_ACCESS_TOKEN + - name: refresh_token + label: WHOOP Refresh Token + type: password + env: WHOOP_REFRESH_TOKEN + - name: access_token_expires_at + label: WHOOP Access Token Expires At + type: text + env: WHOOP_ACCESS_TOKEN_EXPIRES_AT + - name: token_scope + label: WHOOP Token Scope + type: text + default: offline read:profile read:body_measurement read:cycles read:recovery read:sleep read:workout + env: WHOOP_TOKEN_SCOPE + - name: token_type + label: WHOOP Token Type + type: text + default: bearer + env: WHOOP_TOKEN_TYPE + validator: + type: bash + run: python ./whoop_foreground.py --verify + timeout: 120 + error_message: Could not verify WHOOP OAuth credentials. diff --git a/app-store/whoop/whoop_auth.py b/app-store/whoop/whoop_auth.py new file mode 100644 index 0000000..91157db --- /dev/null +++ b/app-store/whoop/whoop_auth.py @@ -0,0 +1,164 @@ +"""WHOOP OAuth token helpers for manual-token Truffle installs.""" + +from __future__ import annotations + +from datetime import datetime, timezone +import time +from typing import Any, Callable + +from truffile.app_runtime import AppAuthError, OAuth + +from config import WhoopConfig + + +class WhoopOAuth(OAuth): + APP_VAR_KEY = "whoop_oauth" + + def __init__( + self, + config: WhoopConfig | None = None, + *, + read_only: bool = False, + time_fn: Callable[[], float] | None = None, + ) -> None: + self._config = config or WhoopConfig.from_env() + self._time_fn = time_fn or time.time + super().__init__(self._config.token_store_path, read_only=read_only) + + @staticmethod + def token_from_payload(payload: dict[str, Any]) -> str: + return str(payload.get("access_token", "") or "").strip() + + def config_errors(self) -> list[str]: + errors: list[str] = [] + if not self._config.client_id: + errors.append("WHOOP_CLIENT_ID is missing") + if not self._config.client_secret: + errors.append("WHOOP_CLIENT_SECRET is missing") + if not self._config.redirect_uri: + errors.append("WHOOP_REDIRECT_URI is missing") + payload = self.get_oauth_payload() or {} + if not self.token_from_payload(payload): + errors.append("WHOOP access token is missing") + if not str(payload.get("refresh_token", "") or "").strip(): + errors.append("WHOOP refresh token is missing") + return errors + + def verify(self) -> tuple[bool, str]: + errors = self.config_errors() + if errors: + return False, "; ".join(errors) + return True, "WHOOP OAuth credentials loaded" + + def get_client_id(self) -> str: + return self._config.client_id + + def get_client_secret(self) -> str: + return self._config.client_secret + + def get_redirect_uri(self) -> str: + return self._config.redirect_uri + + def get_refresh_token(self) -> str: + payload = self.get_oauth_payload() or {} + token = str(payload.get("refresh_token", "") or "").strip() + if not token: + raise AppAuthError("WHOOP refresh token missing") + return token + + def can_refresh(self) -> bool: + return bool( + self._config.client_id + and self._config.client_secret + and str((self.get_oauth_payload() or {}).get("refresh_token", "") or "").strip() + ) + + def get_auth_headers(self) -> dict[str, str]: + token = self.get_access_token() + if not token: + return {} + return {"Authorization": f"Bearer {token}"} + + def token_expires_soon(self, *, leeway_seconds: float = 120.0) -> bool: + expires_at_value = self._token_expires_at_value() + if expires_at_value is None: + return False + return self._time_fn() + leeway_seconds >= expires_at_value + + def remember_token_response( + self, + payload: dict[str, Any], + *, + now: float | None = None, + ) -> dict[str, Any]: + current = self.get_oauth_payload() or {} + merged = dict(current) + merged.update({key: value for key, value in payload.items() if value is not None}) + if "expires_in" in payload: + try: + merged["expires_at"] = float(now if now is not None else self._time_fn()) + float(payload["expires_in"]) + except (TypeError, ValueError): + merged.pop("expires_at", None) + merged["token_type"] = str(merged.get("token_type", "bearer") or "bearer").strip() or "bearer" + self.save_oauth_payload(merged) + return merged + + def status(self) -> dict[str, Any]: + payload = self.get_oauth_payload() or {} + expires_at_value = self._token_expires_at_value(payload) + seconds_remaining = None + expires_at_utc = None + token_expired = None + token_expires_soon = None + if expires_at_value is not None: + now = self._time_fn() + seconds_remaining = max(0, int(expires_at_value - now)) + expires_at_utc = datetime.fromtimestamp(expires_at_value, tz=timezone.utc).isoformat() + token_expired = now >= expires_at_value + token_expires_soon = now + 120.0 >= expires_at_value + return { + "client_id_configured": bool(self._config.client_id), + "client_secret_configured": bool(self._config.client_secret), + "redirect_uri_configured": bool(self._config.redirect_uri), + "access_token_present": bool(self.token_from_payload(payload)), + "refresh_token_present": bool(str(payload.get("refresh_token", "") or "").strip()), + "token_expires_at": payload.get("expires_at"), + "token_expires_at_unix": expires_at_value, + "token_expires_at_utc": expires_at_utc, + "token_seconds_remaining": seconds_remaining, + "token_expired": token_expired, + "token_expires_soon": token_expires_soon, + "scope": str(payload.get("scope", "") or "").strip(), + "token_type": str(payload.get("token_type", "bearer") or "bearer").strip() or "bearer", + "token_store_path": str(self.token_file), + } + + def get_oauth_payload(self) -> dict[str, Any] | None: + payload = super().get_oauth_payload() + if payload: + return payload + env_payload = self._payload_from_env() + if env_payload and self.token_from_payload(env_payload) and not self._read_only: + self.save_oauth_payload(env_payload) + return env_payload + + def _payload_from_env(self) -> dict[str, Any] | None: + payload: dict[str, Any] = {} + if self._config.access_token: + payload["access_token"] = self._config.access_token + if self._config.refresh_token: + payload["refresh_token"] = self._config.refresh_token + if self._config.access_token_expires_at is not None: + payload["expires_at"] = self._config.access_token_expires_at + if self._config.token_scope: + payload["scope"] = self._config.token_scope + if self._config.token_type: + payload["token_type"] = self._config.token_type + return payload or None + + def _token_expires_at_value(self, payload: dict[str, Any] | None = None) -> float | None: + payload = payload if payload is not None else self.get_oauth_payload() or {} + try: + return float(payload.get("expires_at")) + except (TypeError, ValueError): + return None diff --git a/app-store/whoop/whoop_background.py b/app-store/whoop/whoop_background.py new file mode 100644 index 0000000..cc06249 --- /dev/null +++ b/app-store/whoop/whoop_background.py @@ -0,0 +1,100 @@ +# Draft background app shell. It is intentionally not enabled in truffile.yaml +# for the foreground-only PR. +"""Background WHOOP app shell.""" + +from __future__ import annotations + +import atexit +import asyncio +import threading +from typing import Any + +from truffile.app_runtime import BackgroundWorkerApp + +from whoop_bg_worker import BgRunResult, WhoopBackgroundWorker + + +class WhoopBackgroundApp(BackgroundWorkerApp[WhoopBackgroundWorker, BgRunResult]): + def __init__(self) -> None: + super().__init__("whoop", logger_name="whoop.background") + self._loop: asyncio.AbstractEventLoop | None = None + + def _run(self, coro: Any) -> Any: + try: + asyncio.get_running_loop() + except RuntimeError: + if self._loop is None or self._loop.is_closed(): + self._loop = asyncio.new_event_loop() + return self._loop.run_until_complete(coro) + + result: dict[str, Any] = {} + error: dict[str, BaseException] = {} + + def _runner() -> None: + try: + result["value"] = asyncio.run(coro) + except BaseException as exc: # noqa: BLE001 + error["exc"] = exc + + thread = threading.Thread(target=_runner, daemon=True) + thread.start() + thread.join() + if "exc" in error: + raise error["exc"] + return result.get("value") + + def build_worker(self) -> WhoopBackgroundWorker: + return WhoopBackgroundWorker() + + def verify_worker(self, worker: WhoopBackgroundWorker) -> tuple[bool, str]: + return self._run(worker.verify()) + + def run_cycle(self, worker: WhoopBackgroundWorker) -> BgRunResult: + return self._run(worker.run_cycle()) + + def handle_cycle_result(self, ctx: object, result: BgRunResult) -> None: + if result.auth_error: + self.report_auth_failure(ctx, result.auth_error) + return + + self.reset_auth_failures() + if result.error: + self.logger.error("WHOOP background cycle failed: %s", result.error) + return + + if not result.submissions: + self.logger.info("WHOOP background cycle produced no new signals") + return + + for submission in result.submissions: + self.submit_text( + ctx, + content=submission.text, + uris=submission.uris, + priority=submission.priority, + ) + + def reset_for_test(self) -> None: + worker = getattr(self, "_worker", None) + if worker is not None: + try: + self._run(worker.close()) + except Exception: + self.logger.exception("Failed to close WHOOP background worker during reset") + super().reset_for_test() + if self._loop is not None and not self._loop.is_closed(): + self._loop.close() + self._loop = None + + def cleanup(self) -> None: + self.reset_for_test() + + +app = WhoopBackgroundApp() + + +atexit.register(app.cleanup) + + +if __name__ == "__main__": + app.main() diff --git a/app-store/whoop/whoop_bg_worker.py b/app-store/whoop/whoop_bg_worker.py new file mode 100644 index 0000000..bc8259d --- /dev/null +++ b/app-store/whoop/whoop_bg_worker.py @@ -0,0 +1,457 @@ +# Draft background worker. It is intentionally not enabled in truffile.yaml +# for the foreground-only PR. +"""Background WHOOP worker that prepares ambient health signals.""" + +from __future__ import annotations + +import hashlib +import json +from dataclasses import dataclass, field +from typing import Any + +from truffile.app_runtime import AppAuthError +from truffle.app.background_pb2 import BackgroundContext + +from whoop_client import WhoopApiError, WhoopClient + +LOW_RECOVERY_THRESHOLD = 34.0 +LOW_SLEEP_PERFORMANCE_THRESHOLD = 70.0 +HIGH_STRAIN_THRESHOLD = 14.0 +SHORT_SLEEP_MILLI = 6 * 60 * 60 * 1000 + +_PRIORITY_LOW = getattr(BackgroundContext, "PRIORITY_LOW", getattr(BackgroundContext, "PRIORITY_DEFAULT", 0)) +_PRIORITY_DEFAULT = getattr(BackgroundContext, "PRIORITY_DEFAULT", getattr(BackgroundContext, "PRIORITY_HIGH", 1)) +_PRIORITY_HIGH = getattr(BackgroundContext, "PRIORITY_HIGH", _PRIORITY_DEFAULT) + + +@dataclass(frozen=True, slots=True) +class PreparedSubmission: + text: str + uris: tuple[str, ...] = () + priority: int = _PRIORITY_DEFAULT + + +@dataclass(frozen=True, slots=True) +class BgRunResult: + submissions: list[PreparedSubmission] = field(default_factory=list) + auth_error: str | None = None + error: str | None = None + + +class WhoopBackgroundWorker: + def __init__(self, *, client: Any | None = None) -> None: + self._client = client or WhoopClient() + self._seeded = False + self._seen_fingerprints: set[str] = set() + self._last_mismatch_key: str | None = None + + async def close(self) -> None: + close = getattr(self._client, "close", None) + if callable(close): + await close() + + async def verify(self) -> tuple[bool, str]: + try: + return await self._client.verify() + except Exception as exc: + return False, f"WHOOP background verification failed: {exc}" + + async def run_cycle(self) -> BgRunResult: + try: + summary = await self._client.get_recent_summary() + except AppAuthError as exc: + return BgRunResult(auth_error=str(exc)) + except WhoopApiError as exc: + if exc.status_code in {401, 403}: + return BgRunResult(auth_error=str(exc)) + return BgRunResult(error=f"WHOOP API error: HTTP {exc.status_code}") + except Exception as exc: + return BgRunResult(error=str(exc)) + + if not self._seeded: + self._seed(summary) + self._seeded = True + snapshot = self._build_snapshot(summary) + if not snapshot: + return BgRunResult() + return BgRunResult(submissions=[PreparedSubmission(text=snapshot, priority=_PRIORITY_LOW)]) + + submissions = self._build_changed_submissions(summary) + return BgRunResult(submissions=submissions) + + def _seed(self, summary: dict[str, Any]) -> None: + for kind, item in self._iter_items(summary): + fp = self._fingerprint(kind, item) + if fp: + self._seen_fingerprints.add(fp) + + recovery = self._dict_or_none(summary.get("latest_recovery")) + cycle = self._dict_or_none(summary.get("latest_cycle")) + self._last_mismatch_key = self._mismatch_key(recovery, cycle) + + def _build_changed_submissions(self, summary: dict[str, Any]) -> list[PreparedSubmission]: + submissions: list[PreparedSubmission] = [] + for kind, item in self._iter_items(summary): + fp = self._fingerprint(kind, item) + if not fp or fp in self._seen_fingerprints: + continue + self._seen_fingerprints.add(fp) + submission = self._submission_for(kind, item) + if submission is not None: + submissions.append(submission) + + recovery = self._dict_or_none(summary.get("latest_recovery")) + cycle = self._dict_or_none(summary.get("latest_cycle")) + mismatch_key = self._mismatch_key(recovery, cycle) + if mismatch_key and mismatch_key != self._last_mismatch_key: + self._last_mismatch_key = mismatch_key + submission = self._build_recovery_strain_mismatch(recovery, cycle) + if submission is not None: + submissions.append(submission) + + return submissions + + def _iter_items(self, summary: dict[str, Any]) -> list[tuple[str, dict[str, Any]]]: + items: list[tuple[str, dict[str, Any]]] = [] + for kind, key in ( + ("recovery", "latest_recovery"), + ("sleep", "latest_sleep"), + ("cycle", "latest_cycle"), + ): + item = self._dict_or_none(summary.get(key)) + if item and self._is_scored(item): + items.append((kind, item)) + + workouts = summary.get("recent_workouts") + if isinstance(workouts, list): + for workout in workouts: + item = self._dict_or_none(workout) + if item and self._is_scored(item): + items.append(("workout", item)) + return items + + def _submission_for(self, kind: str, item: dict[str, Any]) -> PreparedSubmission | None: + if kind == "recovery": + return self._build_recovery_submission(item) + if kind == "sleep": + return self._build_sleep_submission(item) + if kind == "workout": + return self._build_workout_submission(item) + if kind == "cycle": + return self._build_cycle_submission(item) + return None + + def _build_snapshot(self, summary: dict[str, Any]) -> str: + parts: list[str] = ["WHOOP current snapshot:"] + + recovery = self._dict_or_none(summary.get("latest_recovery")) + if recovery and self._is_scored(recovery): + score = self._dict_or_none(recovery.get("score")) or {} + parts.append( + self._compact_sentence( + "recovery", + [ + self._fmt_percent(score.get("recovery_score")), + self._fmt_bpm("RHR", score.get("resting_heart_rate")), + self._fmt_number("HRV", score.get("hrv_rmssd_milli"), "ms", decimals=1), + ], + ) + ) + + sleep = self._dict_or_none(summary.get("latest_sleep")) + if sleep and self._is_scored(sleep): + score = self._dict_or_none(sleep.get("score")) or {} + stage = self._dict_or_none(score.get("stage_summary")) or {} + parts.append( + self._compact_sentence( + "sleep", + [ + self._fmt_percent(score.get("sleep_performance_percentage"), label="performance"), + self._fmt_duration(self._as_float(stage.get("total_in_bed_time_milli")), label="in bed"), + self._fmt_duration(self._sleep_time_milli(stage), label="asleep"), + ], + ) + ) + + cycle = self._dict_or_none(summary.get("latest_cycle")) + if cycle and self._is_scored(cycle): + score = self._dict_or_none(cycle.get("score")) or {} + parts.append( + self._compact_sentence( + "day strain", + [ + self._fmt_number(None, score.get("strain"), None, decimals=1), + self._fmt_bpm("avg HR", score.get("average_heart_rate")), + ], + ) + ) + + workouts = [item for kind, item in self._iter_items(summary) if kind == "workout"] + if workouts: + workout = workouts[0] + score = self._dict_or_none(workout.get("score")) or {} + parts.append( + self._compact_sentence( + "latest workout", + [ + str(workout.get("sport_name") or "workout"), + self._fmt_number(None, score.get("strain"), "strain", decimals=1), + ], + ) + ) + + if len(parts) == 1: + return "" + return " ".join(parts) + + def _build_recovery_submission(self, recovery: dict[str, Any]) -> PreparedSubmission | None: + score = self._dict_or_none(recovery.get("score")) + if not score: + return None + + recovery_score = self._as_float(score.get("recovery_score")) + parts = [ + self._fmt_percent(recovery_score, label="recovery"), + self._fmt_bpm("RHR", score.get("resting_heart_rate")), + self._fmt_number("HRV", score.get("hrv_rmssd_milli"), "ms", decimals=1), + self._fmt_percent(score.get("spo2_percentage"), label="SpO2"), + self._fmt_number("skin temp", score.get("skin_temp_celsius"), "C", decimals=1), + ] + content = f"WHOOP recovery scored: {self._join_parts(parts)}." + priority = _PRIORITY_HIGH if recovery_score is not None and recovery_score < LOW_RECOVERY_THRESHOLD else _PRIORITY_DEFAULT + return PreparedSubmission(text=content, priority=priority) + + def _build_sleep_submission(self, sleep: dict[str, Any]) -> PreparedSubmission | None: + score = self._dict_or_none(sleep.get("score")) + if not score: + return None + stage = self._dict_or_none(score.get("stage_summary")) or {} + + asleep_milli = self._sleep_time_milli(stage) + performance = self._as_float(score.get("sleep_performance_percentage")) + parts = [ + self._fmt_percent(performance, label="performance"), + self._fmt_percent(score.get("sleep_consistency_percentage"), label="consistency"), + self._fmt_percent(score.get("sleep_efficiency_percentage"), label="efficiency"), + self._fmt_duration(asleep_milli, label="asleep"), + self._fmt_duration(self._as_float(stage.get("total_in_bed_time_milli")), label="in bed"), + self._fmt_duration(self._as_float(stage.get("total_rem_sleep_time_milli")), label="REM"), + self._fmt_duration(self._as_float(stage.get("total_slow_wave_sleep_time_milli")), label="SWS"), + self._fmt_duration(self._as_float(stage.get("total_awake_time_milli")), label="awake"), + self._fmt_int(stage.get("disturbance_count"), label="disturbances"), + ] + content = f"WHOOP sleep scored: {self._join_parts(parts)}." + high = (performance is not None and performance < LOW_SLEEP_PERFORMANCE_THRESHOLD) or ( + asleep_milli is not None and asleep_milli < SHORT_SLEEP_MILLI + ) + return PreparedSubmission(text=content, priority=_PRIORITY_HIGH if high else _PRIORITY_DEFAULT) + + def _build_workout_submission(self, workout: dict[str, Any]) -> PreparedSubmission | None: + score = self._dict_or_none(workout.get("score")) + if not score: + return None + + strain = self._as_float(score.get("strain")) + duration = self._duration_between(workout.get("start"), workout.get("end")) + parts = [ + str(workout.get("sport_name") or "workout"), + self._fmt_duration(duration, label=None), + self._fmt_number(None, strain, "strain", decimals=1), + self._fmt_bpm("avg HR", score.get("average_heart_rate")), + self._fmt_bpm("max HR", score.get("max_heart_rate")), + self._fmt_number(None, score.get("kilojoule"), "kJ", decimals=0), + self._fmt_percent(score.get("percent_recorded"), label="recorded"), + self._fmt_distance(score.get("distance_meter")), + self._format_zones(self._dict_or_none(score.get("zone_durations"))), + ] + content = f"WHOOP workout: {self._join_parts(parts)}." + priority = _PRIORITY_HIGH if strain is not None and strain >= HIGH_STRAIN_THRESHOLD else _PRIORITY_DEFAULT + return PreparedSubmission(text=content, priority=priority) + + def _build_cycle_submission(self, cycle: dict[str, Any]) -> PreparedSubmission | None: + score = self._dict_or_none(cycle.get("score")) + if not score: + return None + + open_label = "open cycle" if not cycle.get("end") else "closed cycle" + parts = [ + self._fmt_number("day strain", score.get("strain"), None, decimals=1), + self._fmt_bpm("avg HR", score.get("average_heart_rate")), + self._fmt_bpm("max HR", score.get("max_heart_rate")), + self._fmt_number(None, score.get("kilojoule"), "kJ", decimals=0), + open_label, + ] + return PreparedSubmission(text=f"WHOOP cycle strain updated: {self._join_parts(parts)}.") + + def _build_recovery_strain_mismatch( + self, + recovery: dict[str, Any] | None, + cycle: dict[str, Any] | None, + ) -> PreparedSubmission | None: + if not recovery or not cycle: + return None + recovery_score = self._as_float((self._dict_or_none(recovery.get("score")) or {}).get("recovery_score")) + strain = self._as_float((self._dict_or_none(cycle.get("score")) or {}).get("strain")) + if recovery_score is None or strain is None: + return None + if recovery_score >= LOW_RECOVERY_THRESHOLD or strain < HIGH_STRAIN_THRESHOLD: + return None + content = f"WHOOP load/recovery mismatch: {recovery_score:.0f}% recovery with {strain:.1f} day strain." + return PreparedSubmission(text=content, priority=_PRIORITY_HIGH) + + def _mismatch_key(self, recovery: dict[str, Any] | None, cycle: dict[str, Any] | None) -> str | None: + mismatch = self._build_recovery_strain_mismatch(recovery, cycle) + if mismatch is None: + return None + recovery_score = (self._dict_or_none(recovery.get("score")) or {}).get("recovery_score") if recovery else None + strain = (self._dict_or_none(cycle.get("score")) or {}).get("strain") if cycle else None + return f"{recovery.get('cycle_id')}:{cycle.get('id')}:{recovery_score}:{strain}" if recovery and cycle else None + + def _fingerprint(self, kind: str, item: dict[str, Any]) -> str: + if kind == "recovery": + score = self._dict_or_none(item.get("score")) or {} + basis = { + "kind": kind, + "cycle_id": item.get("cycle_id"), + "updated_at": item.get("updated_at"), + "recovery_score": score.get("recovery_score"), + "resting_heart_rate": score.get("resting_heart_rate"), + "hrv_rmssd_milli": score.get("hrv_rmssd_milli"), + } + elif kind in {"sleep", "workout"}: + score = self._dict_or_none(item.get("score")) or {} + basis = {"kind": kind, "id": item.get("id"), "updated_at": item.get("updated_at"), "score": score} + elif kind == "cycle": + score = self._dict_or_none(item.get("score")) or {} + basis = { + "kind": kind, + "id": item.get("id"), + "updated_at": item.get("updated_at"), + "strain": score.get("strain"), + "average_heart_rate": score.get("average_heart_rate"), + "max_heart_rate": score.get("max_heart_rate"), + } + else: + basis = {"kind": kind, "item": item} + raw = json.dumps(basis, sort_keys=True, separators=(",", ":"), default=str) + return hashlib.sha256(raw.encode("utf-8")).hexdigest() + + @staticmethod + def _is_scored(item: dict[str, Any]) -> bool: + return str(item.get("score_state") or "").upper() == "SCORED" and isinstance(item.get("score"), dict) + + @staticmethod + def _dict_or_none(value: Any) -> dict[str, Any] | None: + return value if isinstance(value, dict) else None + + @staticmethod + def _as_float(value: Any) -> float | None: + try: + return float(value) + except (TypeError, ValueError): + return None + + @staticmethod + def _fmt_percent(value: Any, *, label: str | None = None) -> str: + numeric = WhoopBackgroundWorker._as_float(value) + if numeric is None: + return "" + rendered = f"{numeric:.0f}%" + return f"{rendered} {label}" if label else rendered + + @staticmethod + def _fmt_bpm(label: str, value: Any) -> str: + numeric = WhoopBackgroundWorker._as_float(value) + if numeric is None: + return "" + return f"{label} {numeric:.0f} bpm" + + @staticmethod + def _fmt_number(label: str | None, value: Any, unit: str | None, *, decimals: int) -> str: + numeric = WhoopBackgroundWorker._as_float(value) + if numeric is None: + return "" + rendered = f"{numeric:.{decimals}f}" + if unit: + rendered = f"{rendered} {unit}" + return f"{label} {rendered}" if label else rendered + + @staticmethod + def _fmt_int(value: Any, *, label: str) -> str: + numeric = WhoopBackgroundWorker._as_float(value) + if numeric is None: + return "" + return f"{numeric:.0f} {label}" + + @staticmethod + def _fmt_duration(value: Any, *, label: str | None) -> str: + milli = WhoopBackgroundWorker._as_float(value) + if milli is None: + return "" + total_minutes = max(0, int(round(milli / 60_000))) + hours, minutes = divmod(total_minutes, 60) + if hours: + rendered = f"{hours}h{minutes:02d}m" + else: + rendered = f"{minutes}m" + return f"{rendered} {label}" if label else rendered + + @staticmethod + def _fmt_distance(value: Any) -> str: + meters = WhoopBackgroundWorker._as_float(value) + if meters is None: + return "" + if meters >= 1000: + return f"{meters / 1000:.2f} km" + return f"{meters:.0f} m" + + @staticmethod + def _duration_between(start: Any, end: Any) -> float | None: + from datetime import datetime + + if not isinstance(start, str) or not isinstance(end, str): + return None + try: + start_dt = datetime.fromisoformat(start.replace("Z", "+00:00")) + end_dt = datetime.fromisoformat(end.replace("Z", "+00:00")) + except ValueError: + return None + return max(0.0, (end_dt - start_dt).total_seconds() * 1000) + + @staticmethod + def _sleep_time_milli(stage: dict[str, Any]) -> float | None: + values = [ + WhoopBackgroundWorker._as_float(stage.get("total_light_sleep_time_milli")), + WhoopBackgroundWorker._as_float(stage.get("total_slow_wave_sleep_time_milli")), + WhoopBackgroundWorker._as_float(stage.get("total_rem_sleep_time_milli")), + ] + present = [value for value in values if value is not None] + if not present: + return None + return sum(present) + + @staticmethod + def _format_zones(zones: dict[str, Any] | None) -> str: + if not zones: + return "" + ordered = [ + ("Z5", zones.get("zone_five_milli")), + ("Z4", zones.get("zone_four_milli")), + ("Z3", zones.get("zone_three_milli")), + ("Z2", zones.get("zone_two_milli")), + ] + rendered = [ + f"{label} {WhoopBackgroundWorker._fmt_duration(value, label=None)}" + for label, value in ordered + if WhoopBackgroundWorker._as_float(value) + ] + return "zones " + ", ".join(rendered) if rendered else "" + + @staticmethod + def _join_parts(parts: list[str]) -> str: + return ", ".join(part for part in parts if part) + + @staticmethod + def _compact_sentence(label: str, parts: list[str]) -> str: + body = WhoopBackgroundWorker._join_parts(parts) + return f"{label}: {body}." if body else "" diff --git a/app-store/whoop/whoop_client.py b/app-store/whoop/whoop_client.py new file mode 100644 index 0000000..f839a35 --- /dev/null +++ b/app-store/whoop/whoop_client.py @@ -0,0 +1,348 @@ +"""Thin async WHOOP client with token refresh support.""" + +from __future__ import annotations + +import asyncio +import time +from typing import Any, Callable +from urllib.parse import urlencode + +import httpx + +from truffile.app_runtime import AppAuthError, AppRuntimeFailure, HttpTransport + +from config import WhoopConfig +from whoop_auth import WhoopOAuth + + +class WhoopApiError(AppRuntimeFailure): + def __init__( + self, + message: str, + *, + status_code: int, + response_text: str = "", + ) -> None: + super().__init__(message) + self.status_code = status_code + self.response_text = response_text + + +class _HttpxTransport: + def __init__(self, *, timeout: float = 30.0) -> None: + self._client = httpx.AsyncClient(timeout=timeout) + + async def request( + self, + method: str, + url: str, + *, + params: dict[str, Any] | None = None, + json: dict[str, Any] | None = None, + headers: dict[str, str] | None = None, + content: str | None = None, + ) -> httpx.Response: + return await self._client.request( + method=method.upper(), + url=url, + params=params, + json=json, + headers=headers, + content=content, + ) + + async def close(self) -> None: + await self._client.aclose() + + +def _mask_token(token: str) -> str: + cleaned = token.strip() + if len(cleaned) <= 8: + return cleaned[:2] + "..." if cleaned else "none" + return f"{cleaned[:4]}...{cleaned[-4:]}" + + +class WhoopClient: + def __init__( + self, + *, + config: WhoopConfig | None = None, + auth: WhoopOAuth | None = None, + http: HttpTransport | None = None, + time_fn: Callable[[], float] | None = None, + ) -> None: + self._config = config or WhoopConfig.from_env() + self._time_fn = time_fn or time.time + self._auth = auth or WhoopOAuth(self._config, time_fn=self._time_fn) + self._http = http or _HttpxTransport() + self._refresh_lock = asyncio.Lock() + + @property + def auth(self) -> WhoopOAuth: + return self._auth + + async def close(self) -> None: + try: + await self._http.close() + except Exception: + pass + + async def verify(self) -> tuple[bool, str]: + errors = self._auth.config_errors() + if errors: + return False, "; ".join(errors) + try: + profile = await self.get_profile_basic() + except AppAuthError as exc: + return False, f"WHOOP verification failed: {exc}" + except WhoopApiError as exc: + if exc.status_code: + return False, f"WHOOP verification failed: HTTP {exc.status_code} {exc}" + return False, f"WHOOP verification failed: {exc}" + except Exception as exc: + return False, f"WHOOP verification failed: {exc}" + + masked = _mask_token(self._auth.get_access_token()) + return True, ( + "WHOOP credentials verified. " + f"user_id={profile.get('user_id')} " + f"email={profile.get('email')} " + f"token={masked}" + ) + + async def get_profile_basic(self) -> dict[str, Any]: + return await self._request("GET", "/v2/user/profile/basic") + + async def get_body_measurements(self) -> dict[str, Any]: + return await self._request("GET", "/v2/user/measurement/body") + + async def list_cycles( + self, + *, + limit: int | None = None, + start: str | None = None, + end: str | None = None, + next_token: str | None = None, + ) -> dict[str, Any]: + return await self._request( + "GET", + "/v2/cycle", + params={ + "limit": limit, + "start": start, + "end": end, + "nextToken": next_token, + }, + ) + + async def get_cycle_by_id(self, cycle_id: int) -> dict[str, Any]: + return await self._request("GET", f"/v2/cycle/{cycle_id}") + + async def list_recovery( + self, + *, + limit: int | None = None, + start: str | None = None, + end: str | None = None, + next_token: str | None = None, + ) -> dict[str, Any]: + return await self._request( + "GET", + "/v2/recovery", + params={ + "limit": limit, + "start": start, + "end": end, + "nextToken": next_token, + }, + ) + + async def get_recovery_for_cycle(self, cycle_id: int) -> dict[str, Any]: + return await self._request("GET", f"/v2/cycle/{cycle_id}/recovery") + + async def list_sleep( + self, + *, + limit: int | None = None, + start: str | None = None, + end: str | None = None, + next_token: str | None = None, + ) -> dict[str, Any]: + return await self._request( + "GET", + "/v2/activity/sleep", + params={ + "limit": limit, + "start": start, + "end": end, + "nextToken": next_token, + }, + ) + + async def get_sleep_by_id(self, sleep_id: str) -> dict[str, Any]: + return await self._request("GET", f"/v2/activity/sleep/{sleep_id}") + + async def get_sleep_for_cycle(self, cycle_id: int) -> dict[str, Any]: + return await self._request("GET", f"/v2/cycle/{cycle_id}/sleep") + + async def list_workouts( + self, + *, + limit: int | None = None, + start: str | None = None, + end: str | None = None, + next_token: str | None = None, + ) -> dict[str, Any]: + return await self._request( + "GET", + "/v2/activity/workout", + params={ + "limit": limit, + "start": start, + "end": end, + "nextToken": next_token, + }, + ) + + async def get_workout_by_id(self, workout_id: str) -> dict[str, Any]: + return await self._request("GET", f"/v2/activity/workout/{workout_id}") + + async def get_recent_summary(self) -> dict[str, Any]: + profile = await self.get_profile_basic() + body = await self.get_body_measurements() + cycles = await self.list_cycles(limit=1) + latest_cycle = self._first_record(cycles) + latest_recovery = None + latest_sleep = None + if isinstance(latest_cycle, dict) and latest_cycle.get("id") is not None: + cycle_id = int(latest_cycle["id"]) + latest_recovery = await self.get_recovery_for_cycle(cycle_id) + latest_sleep = await self.get_sleep_for_cycle(cycle_id) + workouts = await self.list_workouts(limit=3) + return { + "profile": profile, + "body_measurements": body, + "latest_cycle": latest_cycle, + "latest_recovery": latest_recovery, + "latest_sleep": latest_sleep, + "recent_workouts": workouts.get("records", []), + "recent_workouts_next_token": workouts.get("next_token"), + } + + def auth_status(self) -> dict[str, Any]: + return self._auth.status() + + async def _request( + self, + method: str, + path: str, + *, + params: dict[str, Any] | None = None, + retry_on_auth: bool = True, + ) -> dict[str, Any]: + if not self._auth.get_access_token() and self._auth.can_refresh(): + await self._refresh_access_token() + + if self._auth.token_expires_soon(): + await self._refresh_access_token() + + response = await self._send_request(method, path, params=params) + if response.status_code == 401 and retry_on_auth: + await self._refresh_access_token(force=True) + response = await self._send_request(method, path, params=params) + + if response.status_code in {401, 403}: + raise AppAuthError( + "WHOOP rejected the installed credentials. Reinstall the app with fresh WHOOP tokens." + ) + + if not response.is_success: + raise WhoopApiError( + f"WHOOP API request failed for {path}", + status_code=response.status_code, + response_text=self._response_text(response), + ) + + data = response.json() + if not isinstance(data, dict): + raise AppRuntimeFailure(f"WHOOP API returned a non-object payload for {path}") + return data + + async def _send_request( + self, + method: str, + path: str, + *, + params: dict[str, Any] | None = None, + ) -> Any: + clean_params = {key: value for key, value in (params or {}).items() if value is not None} + headers = { + "Accept": "application/json", + } + headers.update(self._auth.get_auth_headers()) + return await self._http.request( + method.upper(), + f"{self._config.api_base.rstrip('/')}{path}", + params=clean_params or None, + headers=headers, + ) + + async def _refresh_access_token(self, *, force: bool = False) -> dict[str, Any]: + async with self._refresh_lock: + if not force and self._auth.get_access_token() and not self._auth.token_expires_soon(): + payload = self._auth.get_oauth_payload() + return payload if payload is not None else {} + + if not self._auth.can_refresh(): + raise AppAuthError( + "WHOOP refresh requires WHOOP_CLIENT_ID, WHOOP_CLIENT_SECRET, and WHOOP_REFRESH_TOKEN." + ) + + refresh_payload = { + "grant_type": "refresh_token", + "refresh_token": self._auth.get_refresh_token(), + "client_id": self._auth.get_client_id(), + "client_secret": self._auth.get_client_secret(), + "scope": "offline", + } + response = await self._http.request( + "POST", + self._config.token_url, + headers={ + "Accept": "application/json", + "Content-Type": "application/x-www-form-urlencoded", + }, + content=urlencode(refresh_payload), + ) + + if response.status_code in {401, 403}: + raise AppAuthError( + "WHOOP token refresh failed. Check the client credentials and refresh token." + ) + + if not response.is_success: + raise WhoopApiError( + "WHOOP token refresh failed", + status_code=response.status_code, + response_text=self._response_text(response), + ) + + data = response.json() + if not isinstance(data, dict) or not str(data.get("access_token", "") or "").strip(): + raise AppRuntimeFailure("WHOOP token refresh returned an invalid payload") + return self._auth.remember_token_response(data, now=self._time_fn()) + + @staticmethod + def _first_record(payload: dict[str, Any]) -> dict[str, Any] | None: + records = payload.get("records") + if not isinstance(records, list) or not records: + return None + first = records[0] + return first if isinstance(first, dict) else None + + @staticmethod + def _response_text(response: Any) -> str: + try: + return str(response.text or "") + except Exception: + return "" diff --git a/app-store/whoop/whoop_foreground.py b/app-store/whoop/whoop_foreground.py new file mode 100644 index 0000000..b2da2c3 --- /dev/null +++ b/app-store/whoop/whoop_foreground.py @@ -0,0 +1,386 @@ +"""Foreground WHOOP app exposing read-only health data tools.""" + +from __future__ import annotations + +import argparse +import atexit +import asyncio +import sys +from typing import Any + +from truffile.app_runtime import ForegroundApp, ToolSpec, err, ok, phosphor_icon_url +from truffile.app_runtime.errors import AppAuthError + +from whoop_client import WhoopApiError, WhoopClient + + +WHOOP_CONTEXT_NOTE = ( + "WHOOP strain is scored from 0-21; this app treats strain >=14 as high for alerts. " + "Recovery is tied to the scored sleep/recovery period for a cycle, while an open cycle's " + "strain is current-day load. Stick to WHOOP data and avoid medical claims." +) + + +def _validate_limit(limit: int) -> None: + if limit < 1 or limit > 25: + raise ValueError("limit must be between 1 and 25") + + +class WhoopForegroundApp(ForegroundApp): + def __init__(self, *, client: WhoopClient | None = None) -> None: + super().__init__("whoop", logger_name="whoop.foreground") + self._client = client + self._register_tools() + + def _get_client(self) -> WhoopClient: + if self._client is None: + self._client = WhoopClient() + return self._client + + async def aclose(self) -> None: + client = self._client + if client is None: + return + self._client = None + await client.close() + + async def _tool_error(self, exc: BaseException) -> dict[str, Any]: + if isinstance(exc, AppAuthError): + raise exc + if isinstance(exc, WhoopApiError): + return err( + f"WHOOP API error: {exc.status_code}", + kind="http", + status_code=exc.status_code, + response=exc.response_text[:1500], + ) + return err(str(exc)) + + @staticmethod + def _list_result(label: str, payload: dict[str, Any]) -> dict[str, Any]: + records = payload.get("records", []) + count = len(records) if isinstance(records, list) else 0 + return ok( + f"Fetched {count} {label}", + records=records, + count=count, + next_token=payload.get("next_token"), + ) + + def _register_tools(self) -> None: + @self.tool( + ToolSpec( + name="whoop_status", + description="Verify WHOOP connectivity and show installed token status, including human-readable token expiry.", + icon=phosphor_icon_url("heartbeat"), + readonly=True, + ) + ) + async def whoop_status() -> dict[str, Any]: + try: + client = self._get_client() + ok_state, message = await client.verify() + if not ok_state: + raise AppAuthError(message) + profile = await client.get_profile_basic() + return ok(message, auth=client.auth_status(), profile=profile) + except Exception as exc: + return await self._tool_error(exc) + + @self.tool( + ToolSpec( + name="get_profile_basic", + description="Fetch the WHOOP user's basic profile.", + icon=phosphor_icon_url("user-circle"), + readonly=True, + ) + ) + async def get_profile_basic() -> dict[str, Any]: + try: + return ok("WHOOP profile fetched", profile=await self._get_client().get_profile_basic()) + except Exception as exc: + return await self._tool_error(exc) + + @self.tool( + ToolSpec( + name="get_body_measurements", + description="Fetch the WHOOP user's body measurements and max heart rate.", + icon=phosphor_icon_url("ruler"), + readonly=True, + ) + ) + async def get_body_measurements() -> dict[str, Any]: + try: + return ok( + "WHOOP body measurements fetched", + measurements=await self._get_client().get_body_measurements(), + ) + except Exception as exc: + return await self._tool_error(exc) + + @self.tool( + ToolSpec( + name="list_cycles", + description=( + "List recent WHOOP cycles with optional time range filters. " + f"{WHOOP_CONTEXT_NOTE}" + ), + icon=phosphor_icon_url("repeat"), + readonly=True, + ) + ) + async def list_cycles( + limit: int = 10, + start: str | None = None, + end: str | None = None, + next_token: str | None = None, + ) -> dict[str, Any]: + try: + _validate_limit(limit) + payload = await self._get_client().list_cycles( + limit=limit, + start=start, + end=end, + next_token=next_token, + ) + return self._list_result("cycles", payload) + except Exception as exc: + return await self._tool_error(exc) + + @self.tool( + ToolSpec( + name="get_cycle_by_id", + description=( + "Fetch a single WHOOP cycle by numeric cycle id. " + f"{WHOOP_CONTEXT_NOTE}" + ), + icon=phosphor_icon_url("clock-counter-clockwise"), + readonly=True, + ) + ) + async def get_cycle_by_id(cycle_id: int) -> dict[str, Any]: + try: + return ok("WHOOP cycle fetched", cycle=await self._get_client().get_cycle_by_id(cycle_id)) + except Exception as exc: + return await self._tool_error(exc) + + @self.tool( + ToolSpec( + name="list_recovery", + description=( + "List recent WHOOP recovery records with optional time range filters. " + f"{WHOOP_CONTEXT_NOTE}" + ), + icon=phosphor_icon_url("battery-high"), + readonly=True, + ) + ) + async def list_recovery( + limit: int = 10, + start: str | None = None, + end: str | None = None, + next_token: str | None = None, + ) -> dict[str, Any]: + try: + _validate_limit(limit) + payload = await self._get_client().list_recovery( + limit=limit, + start=start, + end=end, + next_token=next_token, + ) + return self._list_result("recovery records", payload) + except Exception as exc: + return await self._tool_error(exc) + + @self.tool( + ToolSpec( + name="get_recovery_for_cycle", + description=( + "Fetch the WHOOP recovery record associated with a cycle id. " + f"{WHOOP_CONTEXT_NOTE}" + ), + icon=phosphor_icon_url("battery-charging"), + readonly=True, + ) + ) + async def get_recovery_for_cycle(cycle_id: int) -> dict[str, Any]: + try: + return ok( + "WHOOP recovery fetched", + recovery=await self._get_client().get_recovery_for_cycle(cycle_id), + ) + except Exception as exc: + return await self._tool_error(exc) + + @self.tool( + ToolSpec( + name="list_sleep", + description=( + "List recent WHOOP sleep records with optional time range filters. " + "Use this for sleep performance, efficiency, REM, SWS, awake time, and sleep duration comparisons." + ), + icon=phosphor_icon_url("moon-stars"), + readonly=True, + ) + ) + async def list_sleep( + limit: int = 10, + start: str | None = None, + end: str | None = None, + next_token: str | None = None, + ) -> dict[str, Any]: + try: + _validate_limit(limit) + payload = await self._get_client().list_sleep( + limit=limit, + start=start, + end=end, + next_token=next_token, + ) + return self._list_result("sleep records", payload) + except Exception as exc: + return await self._tool_error(exc) + + @self.tool( + ToolSpec( + name="get_sleep_by_id", + description="Fetch a WHOOP sleep record by sleep UUID, including sleep stages and sleep score details.", + icon=phosphor_icon_url("bed"), + readonly=True, + ) + ) + async def get_sleep_by_id(sleep_id: str) -> dict[str, Any]: + try: + return ok("WHOOP sleep fetched", sleep=await self._get_client().get_sleep_by_id(sleep_id)) + except Exception as exc: + return await self._tool_error(exc) + + @self.tool( + ToolSpec( + name="get_sleep_for_cycle", + description="Fetch the WHOOP sleep record associated with a cycle id, including sleep stages and sleep score details.", + icon=phosphor_icon_url("moon"), + readonly=True, + ) + ) + async def get_sleep_for_cycle(cycle_id: int) -> dict[str, Any]: + try: + return ok("WHOOP cycle sleep fetched", sleep=await self._get_client().get_sleep_for_cycle(cycle_id)) + except Exception as exc: + return await self._tool_error(exc) + + @self.tool( + ToolSpec( + name="list_workouts", + description=( + "List recent WHOOP workouts with optional time range filters. " + "Use WHOOP strain, duration, heart rate, zones, and energy as data; avoid medical claims." + ), + icon=phosphor_icon_url("barbell"), + readonly=True, + ) + ) + async def list_workouts( + limit: int = 10, + start: str | None = None, + end: str | None = None, + next_token: str | None = None, + ) -> dict[str, Any]: + try: + _validate_limit(limit) + payload = await self._get_client().list_workouts( + limit=limit, + start=start, + end=end, + next_token=next_token, + ) + return self._list_result("workouts", payload) + except Exception as exc: + return await self._tool_error(exc) + + @self.tool( + ToolSpec( + name="get_workout_by_id", + description=( + "Fetch a WHOOP workout by workout UUID. " + "Use WHOOP strain, duration, heart rate, zones, and energy as data; avoid medical claims." + ), + icon=phosphor_icon_url("person-simple-run"), + readonly=True, + ) + ) + async def get_workout_by_id(workout_id: str) -> dict[str, Any]: + try: + return ok( + "WHOOP workout fetched", + workout=await self._get_client().get_workout_by_id(workout_id), + ) + except Exception as exc: + return await self._tool_error(exc) + + @self.tool( + ToolSpec( + name="get_recent_whoop_summary", + description=( + "Fetch a compact WHOOP snapshot: profile, body measurements, latest cycle, latest recovery, " + f"latest sleep, and recent workouts. {WHOOP_CONTEXT_NOTE}" + ), + icon=phosphor_icon_url("chart-line"), + readonly=True, + ) + ) + async def get_recent_whoop_summary() -> dict[str, Any]: + try: + return ok("WHOOP summary fetched", summary=await self._get_client().get_recent_summary()) + except Exception as exc: + return await self._tool_error(exc) + + +app = WhoopForegroundApp() + + +def _cleanup() -> None: + try: + asyncio.run(app.aclose()) + except Exception: + pass + + +atexit.register(_cleanup) + + +async def _verify_async() -> int: + client = WhoopClient() + try: + ok_state, message = await client.verify() + finally: + await client.close() + print(message, flush=True) + return 0 if ok_state else 1 + + +def verify() -> int: + return asyncio.run(_verify_async()) + + +def _parse_args(argv: list[str] | None = None) -> argparse.Namespace: + parser = argparse.ArgumentParser(description="WHOOP foreground app") + parser.add_argument("--verify", action="store_true", help="Verify the installed WHOOP OAuth credentials") + return parser.parse_args(argv) + + +def main(argv: list[str] | None = None) -> int: + try: + args = _parse_args(argv) + if args.verify: + return verify() + app.run() + return 0 + except Exception as exc: + print(str(exc), file=sys.stderr, flush=True) + return 1 + + +if __name__ == "__main__": + raise SystemExit(main())