From b5c6fe37177c8d6f960a221e07ad5beb9ff2d674 Mon Sep 17 00:00:00 2001 From: Yesudeep Mangalapilly Date: Sat, 31 Jan 2026 17:30:07 -0800 Subject: [PATCH 1/2] feat(py): implement Reflection API v2 with WebSocket and JSON-RPC 2.0 This implements the Reflection API v2 for Python following RFC #4211. Architecture Change: - V1 (HTTP server): Runtime hosts an HTTP server, CLI/DevUI connect to it - V2 (WebSocket client): CLI hosts a WebSocket server, Runtimes connect outbound The v2 API reverses the connection direction, allowing better support for bidirectional actions and environments where binding a port is impractical. Module Reorganization: - reflection.py: New v2 WebSocket client (primary when enabled) - reflection_v1.py: Existing HTTP server implementation (default) V2 Implementation: - ReflectionClientV2 connects to a runtime manager via WebSocket - JSON-RPC 2.0 protocol for all communication - Supports: listActions, runAction, cancelAction, listValues - Streaming: runActionState and streamChunk notifications - Auto-reconnection with exponential backoff (1s to 60s max) - Proper task-based cancellation via asyncio.current_task().cancel() Activation: - V2 is activated when GENKIT_REFLECTION_V2_SERVER env var is set - V1 remains the default when env var is not set Dependencies: - websockets>=15.0 as core dependency Tests: - 25 tests for v2 WebSocket client - 7 tests for v1 HTTP server See: RFC #4211 --- py/packages/genkit/pyproject.toml | 1 + .../genkit/src/genkit/ai/_base_async.py | 151 ++- .../genkit/src/genkit/core/constants.py | 4 + .../genkit/src/genkit/core/reflection.py | 1091 ++++++++++------- .../genkit/src/genkit/core/reflection_v1.py | 648 ++++++++++ .../genkit/core/endpoints/reflection_test.py | 536 +++++--- .../core/endpoints/reflection_v1_test.py | 220 ++++ py/samples/multi-server/src/main.py | 2 +- py/uv.lock | 2 + 9 files changed, 1942 insertions(+), 713 deletions(-) create mode 100644 py/packages/genkit/src/genkit/core/reflection_v1.py create mode 100644 py/packages/genkit/tests/genkit/core/endpoints/reflection_v1_test.py diff --git a/py/packages/genkit/pyproject.toml b/py/packages/genkit/pyproject.toml index 000e89053e..ad23cf023b 100644 --- a/py/packages/genkit/pyproject.toml +++ b/py/packages/genkit/pyproject.toml @@ -54,6 +54,7 @@ dependencies = [ "uvloop>=0.21.0; sys_platform != 'win32'", "anyio>=4.9.0", "opentelemetry-instrumentation-logging>=0.60b1", + "websockets>=15.0", ] description = "Genkit AI Framework" license = "Apache-2.0" diff --git a/py/packages/genkit/src/genkit/ai/_base_async.py b/py/packages/genkit/src/genkit/ai/_base_async.py index 9e59f5fc35..b07bbfcca7 100644 --- a/py/packages/genkit/src/genkit/ai/_base_async.py +++ b/py/packages/genkit/src/genkit/ai/_base_async.py @@ -30,7 +30,11 @@ from genkit.core.environment import is_dev_environment from genkit.core.logging import get_logger from genkit.core.plugin import Plugin -from genkit.core.reflection import create_reflection_asgi_app +from genkit.core.reflection import ( + ReflectionClientV2, + get_reflection_v2_url, +) +from genkit.core.reflection_v1 import create_reflection_asgi_app from genkit.core.registry import Registry from genkit.web.manager._ports import find_free_port_sync @@ -145,8 +149,6 @@ async def run_user_coro_wrapper() -> None: finally: user_task_finished_event.set() - reflection_server = _make_reflection_server(self.registry, server_spec) - # Setup signal handlers for graceful shutdown (parity with JS) # Actually, anyio.run handles Ctrl+C (SIGINT) by raising KeyboardInterrupt/CancelledError @@ -162,65 +164,36 @@ async def handle_sigterm(tg_to_cancel: anyio.abc.TaskGroup) -> None: # type: ig return try: - # Use lazy_write=True to prevent race condition where file exists before server is up - async with RuntimeManager(server_spec, lazy_write=True) as runtime_manager: - # We use anyio.TaskGroup because it is compatible with - # asyncio's event loop and works with Python 3.10 - # (asyncio.TaskGroup was added in 3.11, and we can switch to - # that when we drop support for 3.10). + # Check if Reflection API v2 is enabled + v2_url = get_reflection_v2_url() + + if v2_url: + # Reflection API v2: Use WebSocket client connecting to runtime manager + client = ReflectionClientV2(self.registry, v2_url) + async with anyio.create_task_group() as tg: - # Start reflection server in the background. - tg.start_soon(reflection_server.serve, name='genkit-reflection-server') - await logger.ainfo(f'Started Genkit reflection server at {server_spec.url}') + # Start v2 client in background (handles its own reconnection) + tg.start_soon(client.run, name='genkit-reflection-v2-client') + await logger.ainfo(f'Started Genkit Reflection v2 client connecting to {v2_url}') # Start SIGTERM handler tg.start_soon(handle_sigterm, tg, name='genkit-sigterm-handler') - # Wait for server to be responsive - # We need to loop and poll the health endpoint or wait for uvicorn to be ready - # Since uvicorn run is blocking (but we are in a task), we can't easily hook into its startup - # unless we use uvicorn's server object directly which we do. - # reflection_server.started is set when uvicorn starts. - - # Simple polling loop - - max_retries = 20 # 2 seconds total roughly - for _i in range(max_retries): - try: - # TODO(#4334): Use async http client if available to avoid blocking loop? - # But we are in dev mode, so maybe okay. - # Actually we should use anyio.to_thread to avoid blocking event loop - # or assume standard lib urllib is fast enough for localhost. - - # Using sync urllib in async loop blocks the loop! - # We must use anyio.to_thread or a non-blocking check. - # But let's check if reflection_server object has a 'started' flag we can trust. - # uvicorn.Server has 'started' attribute but it might be internal state. - - # Let's stick to simple polling with to_thread for safety - def check_health() -> bool: - health_url = f'{server_spec.url}/api/__health' - with urllib.request.urlopen(health_url, timeout=0.5) as response: - return response.status == 200 - - is_healthy = await anyio.to_thread.run_sync(check_health) # type: ignore[attr-defined] - if is_healthy: - break - except Exception: - await anyio.sleep(0.1) - else: - logger.warning(f'Reflection server at {server_spec.url} did not become healthy in time.') - - # Now write the file (or verify it persisted) - _ = runtime_manager.write_runtime_file() - - # Start the (potentially short-lived) user coroutine wrapper + # Start the user coroutine tg.start_soon(run_user_coro_wrapper, name='genkit-user-coroutine') await logger.ainfo('Started Genkit user coroutine') # Block here until the task group is canceled (e.g. Ctrl+C) - # or a task raises an unhandled exception. It should not - # exit just because the user coroutine finishes. + # or a task raises an unhandled exception + + else: + # Reflection API v1: Start HTTP server + await _run_v1_reflection_server( + registry=self.registry, + server_spec=server_spec, + handle_sigterm=handle_sigterm, + run_user_coro_wrapper=run_user_coro_wrapper, + ) except anyio.get_cancelled_exc_class(): logger.info('Development server task group cancelled (e.g., Ctrl+C).') @@ -242,6 +215,78 @@ def check_health() -> bool: return anyio.run(dev_runner) +async def _run_v1_reflection_server( + registry: Registry, + server_spec: ServerSpec, + handle_sigterm: Any, # noqa: ANN401 - callback type is complex + run_user_coro_wrapper: Any, # noqa: ANN401 - callback type is complex +) -> None: + """Run the Reflection API v1 HTTP server with health checking. + + This function encapsulates all V1 server startup logic including: + - Creating and starting the uvicorn server + - Managing the runtime file lifecycle + - Polling for server health before writing the runtime file + - Starting the user coroutine and SIGTERM handler + + Args: + registry: The Genkit registry. + server_spec: Server specification (host, port, scheme). + handle_sigterm: Callback to handle SIGTERM signals. + run_user_coro_wrapper: The user's coroutine wrapped for execution. + """ + reflection_server = _make_reflection_server(registry, server_spec) + + # Use lazy_write=True to prevent race condition where file exists before server is up + async with RuntimeManager(server_spec, lazy_write=True) as runtime_manager: + async with anyio.create_task_group() as tg: + # Start reflection server in the background. + tg.start_soon(reflection_server.serve, name='genkit-reflection-server') + await logger.ainfo(f'Started Genkit reflection server at {server_spec.url}') + + # Start SIGTERM handler + tg.start_soon(handle_sigterm, tg, name='genkit-sigterm-handler') + + # Poll for server readiness before writing runtime file + await _wait_for_server_health(server_spec) + + # Now write the runtime file + _ = runtime_manager.write_runtime_file() + + # Start the user coroutine + tg.start_soon(run_user_coro_wrapper, name='genkit-user-coroutine') + await logger.ainfo('Started Genkit user coroutine') + + # Block here until the task group is canceled (e.g. Ctrl+C) + # or a task raises an unhandled exception + + +async def _wait_for_server_health(server_spec: ServerSpec, max_retries: int = 20) -> None: + """Wait for the reflection server to become healthy. + + Polls the health endpoint until it responds successfully or max retries reached. + + Args: + server_spec: Server specification with URL. + max_retries: Maximum number of retry attempts (default 20, ~2 seconds total). + """ + for _i in range(max_retries): + try: + + def check_health() -> bool: + health_url = f'{server_spec.url}/api/__health' + with urllib.request.urlopen(health_url, timeout=0.5) as response: + return response.status == 200 + + is_healthy = await anyio.to_thread.run_sync(check_health) # type: ignore[attr-defined] + if is_healthy: + return + except Exception: + await anyio.sleep(0.1) + + logger.warning(f'Reflection server at {server_spec.url} did not become healthy in time.') + + def _make_reflection_server(registry: Registry, spec: ServerSpec) -> uvicorn.Server: """Make a reflection server for the given registry and spec. diff --git a/py/packages/genkit/src/genkit/core/constants.py b/py/packages/genkit/src/genkit/core/constants.py index 77cbb4c17d..a76e21b14f 100644 --- a/py/packages/genkit/src/genkit/core/constants.py +++ b/py/packages/genkit/src/genkit/core/constants.py @@ -23,3 +23,7 @@ GENKIT_VERSION = DEFAULT_GENKIT_VERSION GENKIT_CLIENT_HEADER = f'genkit-python/{DEFAULT_GENKIT_VERSION}' + +# Reflection API specification version. +# This should match the value in JS (genkit-tools). +GENKIT_REFLECTION_API_SPEC_VERSION = 1 diff --git a/py/packages/genkit/src/genkit/core/reflection.py b/py/packages/genkit/src/genkit/core/reflection.py index ace57ef2dd..8a4f9df72e 100644 --- a/py/packages/genkit/src/genkit/core/reflection.py +++ b/py/packages/genkit/src/genkit/core/reflection.py @@ -14,568 +14,729 @@ # # SPDX-License-Identifier: Apache-2.0 -"""Development API for inspecting and interacting with Genkit. - -This module provides a reflection API server for inspection and interaction -during development. It exposes endpoints for health checks, action discovery, -and action execution. - -## Caveats - -The reflection API server predates the flows server implementation and differs -in the protocol it uses to interface with the Dev UI. The streaming protocol -uses unadorned JSON per streamed chunk. This may change in the future to use -Server-Sent Events (SSE). - -## Key endpoints - - | Method | Path | Handler | - |--------|---------------------|-----------------------| - | GET | /api/__health | Health check | - | GET | /api/actions | List actions | - | POST | /api/__quitquitquit | Trigger shutdown | - | POST | /api/notify | Handle notification | - | POST | /api/runAction | Run action (streaming)| +"""Reflection API v2 client using WebSocket and JSON-RPC 2.0. + +This module implements a WebSocket-based client that connects to a Genkit +runtime manager server. Unlike v1 which starts an HTTP server, v2 acts as +a client connecting to a centralized manager. + +Key Concepts (ELI5):: + + ┌─────────────────────┬────────────────────────────────────────────────┐ + │ Concept │ ELI5 Explanation │ + ├─────────────────────┼────────────────────────────────────────────────┤ + │ Reflection API v1 │ Genkit starts a server, tools connect to it. │ + │ │ Like opening a shop and waiting for customers. │ + ├─────────────────────┼────────────────────────────────────────────────┤ + │ Reflection API v2 │ Genkit connects to a manager as a client. │ + │ │ Like calling the headquarters to report in. │ + ├─────────────────────┼────────────────────────────────────────────────┤ + │ JSON-RPC 2.0 │ A simple protocol for remote procedure calls. │ + │ │ Like a structured phone conversation. │ + ├─────────────────────┼────────────────────────────────────────────────┤ + │ WebSocket │ A persistent two-way connection. │ + │ │ Like keeping a phone line open. │ + └─────────────────────┴────────────────────────────────────────────────┘ + +Architecture Comparison:: + + ┌─────────────────────────────────────────────────────────────────────────┐ + │ Reflection API V1 (HTTP Server) │ + └─────────────────────────────────────────────────────────────────────────┘ + + Genkit Runtime starts an HTTP server; CLI/DevUI connect to it: + + ┌─────────────────────┐ ┌─────────────────────┐ + │ Genkit CLI │ │ Dev UI │ + │ (Client) │ │ (Client) │ + └──────────┬──────────┘ └──────────┬──────────┘ + │ │ + │ HTTP Requests │ + │ (GET /api/actions, etc) │ + │ │ + ▼ ▼ + ┌───────────────────────────────────────────────────────┐ + │ Genkit Runtime │ + │ ┌────────────────────┐ │ + │ │ HTTP Server │ │ + │ │ (port 3100) │ │ + │ └────────────────────┘ │ + │ ┌────────────────────┐ │ + │ │ Registry │ │ + │ │ (Actions, Flows) │ │ + │ └────────────────────┘ │ + └───────────────────────────────────────────────────────┘ + + Discovery: Runtime writes file to ~/.genkit/{runtimeId}.runtime.json + Connection: CLI reads file, finds port, connects via HTTP + + ┌─────────────────────────────────────────────────────────────────────────┐ + │ Reflection API V2 (WebSocket) │ + └─────────────────────────────────────────────────────────────────────────┘ + + CLI acts as WebSocket server; Genkit Runtimes connect as clients: + + ┌───────────────────────────────────────────────────────┐ + │ Runtime Manager │ + │ (CLI WebSocket Server) │ + │ ┌────────────────────┐ │ + │ │ WebSocket Server │ │ + │ │ (port 4100) │ │ + │ └────────────────────┘ │ + │ ┌────────────────────┐ │ + │ │ Dev UI │ │ + │ └────────────────────┘ │ + └───────────────────────────────────────────────────────┘ + ▲ ▲ ▲ + │ │ │ + WebSocket │ │ │ WebSocket + Connect │ │ │ Connect + │ │ │ + ┌───────────────┴───┐ ┌─────┴─────┐ ┌───┴───────────────┐ + │ Genkit Runtime │ │ Runtime │ │ Genkit Runtime │ + │ (Python app) │ │ (JS app) │ │ (Go app) │ + └───────────────────┘ └───────────┘ └───────────────────┘ + + Discovery: Runtime reads GENKIT_REFLECTION_V2_SERVER env var + Connection: Runtime connects outbound to Manager via WebSocket + +Data Flow (V2):: + + Genkit Runtime Runtime Manager Server + │ │ + │ ──── WebSocket Connect ────► │ + │ │ + │ ──── register (JSON-RPC) ────► │ + │ │ + │ ◄──── configure notification ── │ + │ │ + │ ◄──── listActions request ──── │ + │ ──── response with actions ────► │ + │ │ + │ ◄──── runAction request ──── │ + │ ──── runActionState notif ────► │ (sends traceId early) + │ ──── streamChunk notification ──► │ (if streaming) + │ ──── response with result ────► │ + │ │ + │ ◄──── cancelAction request ──── │ + │ ──── response (cancelled) ────► │ + │ │ + +Protocol Methods (V2):: + + ┌──────────────────┬─────────────────┬─────────┬─────────────────────────┐ + │ Method │ Direction │ Type │ Description │ + ├──────────────────┼─────────────────┼─────────┼─────────────────────────┤ + │ register │ Runtime→Manager │ Notif │ Register runtime info │ + │ configure │ Manager→Runtime │ Notif │ Push config (telemetry) │ + │ listActions │ Manager→Runtime │ Request │ List available actions │ + │ listValues │ Manager→Runtime │ Request │ List values by type │ + │ runAction │ Manager→Runtime │ Request │ Execute an action │ + │ runActionState │ Runtime→Manager │ Notif │ Send traceId early │ + │ streamChunk │ Runtime→Manager │ Notif │ Stream output chunk │ + │ cancelAction │ Manager→Runtime │ Request │ Cancel running action │ + └──────────────────┴─────────────────┴─────────┴─────────────────────────┘ + +Environment Variables: + GENKIT_REFLECTION_V2_SERVER: WebSocket URL to connect to (e.g., ws://localhost:4100) + GENKIT_TELEMETRY_SERVER: Optional telemetry server URL + +Example: + >>> import asyncio + >>> from genkit.core.reflection import ReflectionClientV2 + >>> async def main(): + ... client = ReflectionClientV2(registry, 'ws://localhost:4100') + ... await client.run() + +See Also: + - RFC: https://github.com/firebase/genkit/pull/4211 + - V1 HTTP server implementation: genkit.core.reflection_v1 """ from __future__ import annotations import asyncio import json -from collections.abc import AsyncGenerator, Callable -from typing import Any, cast - -from starlette.applications import Starlette -from starlette.middleware import Middleware -from starlette.middleware.cors import CORSMiddleware -from starlette.requests import Request -from starlette.responses import JSONResponse, StreamingResponse -from starlette.routing import Route - -from genkit.codec import dump_dict, dump_json -from genkit.core.action import Action +import os +import traceback +from collections.abc import Callable +from dataclasses import dataclass, field +from typing import TYPE_CHECKING, Any + +import websockets + +from genkit.codec import dump_dict from genkit.core.action.types import ActionKind -from genkit.core.constants import DEFAULT_GENKIT_VERSION +from genkit.core.constants import ( + DEFAULT_GENKIT_VERSION, + GENKIT_REFLECTION_API_SPEC_VERSION, +) from genkit.core.error import get_reflection_json from genkit.core.logging import get_logger -from genkit.core.registry import Registry -from genkit.web.manager.signals import terminate_all_servers -from genkit.web.requests import ( - is_streaming_requested, -) -from genkit.web.typing import ( - Application, - StartupHandler, -) + +if TYPE_CHECKING: + from genkit.core.action import Action + from genkit.core.registry import Registry logger = get_logger(__name__) +# Environment variable for v2 server URL +GENKIT_REFLECTION_V2_SERVER_ENV = 'GENKIT_REFLECTION_V2_SERVER' -def _list_registered_actions(registry: Registry) -> dict[str, Action]: - """Return all locally registered actions keyed as `//`.""" - registered: dict[str, Action] = {} - for kind in ActionKind.__members__.values(): - for name, action in registry.get_actions_by_kind(kind).items(): - registered[f'/{kind.value}/{name}'] = action - return registered +@dataclass +class JsonRpcRequest: + """JSON-RPC 2.0 request or notification.""" -def _build_actions_payload( - *, - registered_actions: dict[str, Action], - plugin_metas: list[Any], -) -> dict[str, dict[str, Any]]: - """Build payload for GET /api/actions.""" - actions: dict[str, dict[str, Any]] = {} + jsonrpc: str = '2.0' + method: str = '' + params: dict[str, Any] | list[Any] | None = None + id: int | str | None = None - # 1) Registered actions (flows/tools/etc). - for key, action in registered_actions.items(): - actions[key] = { - 'key': key, - 'name': action.name, - 'type': action.kind.value, - 'description': action.description, - 'inputSchema': action.input_schema, - 'outputSchema': action.output_schema, - 'metadata': action.metadata, - } + def to_dict(self) -> dict[str, Any]: + """Convert to dictionary for JSON serialization.""" + result: dict[str, Any] = {'jsonrpc': self.jsonrpc, 'method': self.method} + if self.params is not None: + result['params'] = self.params + if self.id is not None: + result['id'] = self.id + return result - # 2) Plugin-advertised actions (may not be registered yet). - for meta in plugin_metas or []: - try: - key = f'/{meta.kind.value}/{meta.name}' - except Exception as exc: - # Defensive: skip unexpected plugin metadata objects. - logger.warning('Skipping invalid plugin action metadata', error=str(exc)) - continue - advertised = { - 'key': key, - 'name': meta.name, - 'type': meta.kind.value, - 'description': getattr(meta, 'description', None), - 'inputSchema': getattr(meta, 'input_json_schema', None), - 'outputSchema': getattr(meta, 'output_json_schema', None), - 'metadata': getattr(meta, 'metadata', None), - } +@dataclass +class JsonRpcError: + """JSON-RPC 2.0 error object.""" - if key not in actions: - actions[key] = advertised - continue + code: int + message: str + data: Any = None - # Merge into the existing (registered) action entry; prefer registered data. - existing = actions[key] + def to_dict(self) -> dict[str, Any]: + """Convert to dictionary for JSON serialization.""" + result: dict[str, Any] = {'code': self.code, 'message': self.message} + if self.data is not None: + result['data'] = self.data + return result - if not existing.get('description') and advertised.get('description'): - existing['description'] = advertised['description'] - if not existing.get('inputSchema') and advertised.get('inputSchema'): - existing['inputSchema'] = advertised['inputSchema'] +@dataclass +class JsonRpcResponse: + """JSON-RPC 2.0 response.""" - if not existing.get('outputSchema') and advertised.get('outputSchema'): - existing['outputSchema'] = advertised['outputSchema'] + jsonrpc: str = '2.0' + result: Any = None + error: JsonRpcError | None = None + id: int | str | None = None - existing_meta = existing.get('metadata') or {} - advertised_meta = advertised.get('metadata') or {} - if isinstance(existing_meta, dict) and isinstance(advertised_meta, dict): - # Prefer registered action metadata on key conflicts. - existing['metadata'] = {**advertised_meta, **existing_meta} + def to_dict(self) -> dict[str, Any]: + """Convert to dictionary for JSON serialization.""" + response: dict[str, Any] = {'jsonrpc': self.jsonrpc, 'id': self.id} + if self.error is not None: + response['error'] = self.error.to_dict() + else: + response['result'] = self.result + return response - return actions +@dataclass +class ActiveAction: + """Represents an in-flight action that can be cancelled.""" -def create_reflection_asgi_app( - registry: Registry, - on_app_startup: StartupHandler | None = None, - on_app_shutdown: StartupHandler | None = None, - version: str = DEFAULT_GENKIT_VERSION, - _encoding: str = 'utf-8', -) -> Application: - """Create and return a ASGI application for the Genkit reflection API. + cancel: Callable[[], None] + start_time: float + trace_id: str + task: asyncio.Task[Any] | None = None - Caveats: - The reflection API server predates the flows server implementation and - differs in the protocol it uses to interface with the Dev UI. The - streaming protocol uses unadorned JSON per streamed chunk. This may - change in the future to use Server-Sent Events (SSE). +@dataclass +class ActiveActionsMap: + """Thread-safe map of active actions.""" - Key endpoints: + _actions: dict[str, ActiveAction] = field(default_factory=dict) + _lock: asyncio.Lock = field(default_factory=asyncio.Lock) - | Method | Path | Handler | - |--------|---------------------|-----------------------| - | GET | /api/__health | Health check | - | GET | /api/actions | List actions | - | POST | /api/__quitquitquit | Trigger shutdown | - | POST | /api/notify | Handle notification | - | POST | /api/runAction | Run action (streaming)| + async def set(self, trace_id: str, action: ActiveAction) -> None: + """Add an active action.""" + async with self._lock: + self._actions[trace_id] = action - Args: - registry: The registry to use for the reflection server. - on_app_startup: Optional callback to execute when the app's - lifespan starts. Must be an async function. - on_app_shutdown: Optional callback to execute when the app's - lifespan ends. Must be an async function. - version: The version string to use when setting the value of - the X-GENKIT-VERSION HTTP header. - encoding: The text encoding to use; default 'utf-8'. + async def get(self, trace_id: str) -> ActiveAction | None: + """Get an active action by trace ID.""" + async with self._lock: + return self._actions.get(trace_id) - Returns: - An ASGI application configured with the given registry. - """ + async def delete(self, trace_id: str) -> None: + """Remove an active action.""" + async with self._lock: + self._actions.pop(trace_id, None) - async def handle_health_check(_request: Request) -> JSONResponse: - """Handle health check requests. - Args: - _request: The Starlette request object (unused). +class ReflectionClientV2: + """Reflection API v2 client using WebSocket and JSON-RPC 2.0. - Returns: - A JSON response with status code 200. - """ - return JSONResponse(content={'status': 'OK'}) + This client connects to a Genkit runtime manager server and handles + requests for listing actions, running actions, and other reflection + operations. - async def handle_terminate(_request: Request) -> JSONResponse: - """Handle the quit endpoint. + Attributes: + registry: The Genkit registry containing actions and values. + url: The WebSocket URL to connect to. + active_actions: Map of currently running actions for cancellation. + """ - Args: - _request: The Starlette request object (unused). + def __init__( + self, + registry: Registry, + url: str, + *, + version: str = DEFAULT_GENKIT_VERSION, + configured_envs: list[str] | None = None, + ) -> None: + """Initialize the Reflection v2 client. - Returns: - An empty JSON response with status code 200. + Args: + registry: The Genkit registry. + url: WebSocket URL to connect to. + version: Genkit version string. + configured_envs: List of configured environments. """ - await logger.ainfo('Shutting down servers...') - terminate_all_servers() - return JSONResponse(content={'status': 'OK'}) + self._registry = registry + self._url = url + self._version = version + self._configured_envs = configured_envs or ['dev'] + self._ws: Any = None # WebSocket connection + self._active_actions = ActiveActionsMap() + self._running = False + self._reconnect_delay = 1.0 # seconds + self._max_reconnect_delay = 60.0 # maximum delay for exponential backoff + + @property + def runtime_id(self) -> str: + """Generate a unique runtime ID based on process ID.""" + return str(os.getpid()) + + async def run(self) -> None: + """Run the reflection client with automatic reconnection. + + This method will continuously try to connect to the server and + handle messages. If the connection drops, it will attempt to + reconnect after a delay. + """ + self._running = True + logger.info(f'Connecting to Reflection v2 server: {self._url}') - async def handle_list_actions(_request: Request) -> JSONResponse: - """Handle the request for listing available actions. + while self._running: + try: + async with websockets.connect(self._url) as ws: + self._ws = ws + self._reconnect_delay = 1.0 # Reset delay on successful connection + logger.info('Connected to Reflection v2 server') + + # Register immediately upon connection + await self._register() + + # Handle messages + async for message in ws: + if isinstance(message, bytes): + message = message.decode('utf-8') + asyncio.create_task(self._handle_message(message)) + + except asyncio.CancelledError: + logger.debug('Reflection v2 client cancelled') + break + except Exception as e: + delay = self._reconnect_delay + logger.debug(f'Failed to connect to Reflection v2 server, retrying in {delay:.1f}s: {e}') + self._ws = None + await asyncio.sleep(self._reconnect_delay) + self._reconnect_delay = min(self._reconnect_delay * 2, self._max_reconnect_delay) + + self._running = False + logger.info('Disconnected from Reflection v2 server') + + async def stop(self) -> None: + """Stop the reflection client.""" + self._running = False + if self._ws: + await self._ws.close() + + async def _register(self) -> None: + """Send registration message to the server.""" + request = JsonRpcRequest( + method='register', + params={ + 'id': self.runtime_id, + 'name': self.runtime_id, + 'pid': os.getpid(), + 'genkitVersion': f'python/{self._version}', + 'reflectionApiSpecVersion': GENKIT_REFLECTION_API_SPEC_VERSION, + 'envs': self._configured_envs, + }, + ) + await self._send(request.to_dict()) - Args: - _request: The Starlette request object (unused). + async def _send(self, message: dict[str, Any]) -> None: + """Send a message to the server.""" + if self._ws is None: + raise RuntimeError('WebSocket not connected') - Returns: - A JSON response containing all serializable actions. - """ - registered = _list_registered_actions(registry) - metas = await registry.list_actions() - actions = _build_actions_payload(registered_actions=registered, plugin_metas=metas) - - return JSONResponse( - content=actions, - status_code=200, - headers={'x-genkit-version': version}, - ) + data = json.dumps(message) + logger.debug(f'Sending v2 message: {data}') + await self._ws.send(data) - async def handle_list_values(request: Request) -> JSONResponse: - """Handle the request for listing registered values. + async def _handle_message(self, data: str) -> None: + """Handle an incoming message from the server.""" + logger.debug(f'Received v2 message: {data}') - Args: - request: The Starlette request object. + try: + msg = json.loads(data) + except json.JSONDecodeError as e: + logger.error(f'Failed to parse JSON-RPC message: {e}') + return + + method = msg.get('method', '') + msg_id = msg.get('id') + + if method: + if msg_id is not None: + # Request (has ID, expects response) + await self._handle_request(msg) + else: + # Notification (no ID, no response expected) + await self._handle_notification(msg) + elif msg_id is not None: + # Response to a request we sent + logger.debug(f'Received response for id={msg_id}') - Returns: - A JSON response containing value names. - """ - kind = request.query_params.get('type') - if not kind: - return JSONResponse(content='Query parameter "type" is required.', status_code=400) + async def _handle_request(self, msg: dict[str, Any]) -> None: + """Handle an incoming JSON-RPC request.""" + method = msg.get('method', '') + params = msg.get('params', {}) + msg_id = msg.get('id') - if kind != 'defaultModel': - return JSONResponse( - content=f"'type' {kind} is not supported. Only 'defaultModel' is supported", status_code=400 + result: Any = None + error: JsonRpcError | None = None + + try: + if method == 'listActions': + result = await self._handle_list_actions() + elif method == 'listValues': + result = await self._handle_list_values(params) + elif method == 'runAction': + # runAction handles its own response + await self._handle_run_action(msg) + return + elif method == 'cancelAction': + result, error = await self._handle_cancel_action(params) + else: + error = JsonRpcError( + code=-32601, + message=f'Method not found: {method}', + ) + except Exception as e: + logger.exception(f'Error handling request {method}') + error = JsonRpcError( + code=-32000, + message=str(e), + data={'stack': traceback.format_exc()}, ) - values = registry.list_values(kind) - return JSONResponse(content=values, status_code=200) + # Send response + response = JsonRpcResponse( + id=msg_id, + result=result, + error=error, + ) + await self._send(response.to_dict()) + + async def _handle_notification(self, msg: dict[str, Any]) -> None: + """Handle an incoming JSON-RPC notification.""" + method = msg.get('method', '') + params = msg.get('params', {}) - async def handle_list_envs(_request: Request) -> JSONResponse: - """Handle the request for listing environments. + if method == 'configure': + await self._handle_configure(params) + else: + logger.debug(f'Unknown notification: {method}') - Args: - _request: The Starlette request object (unused). + async def _handle_list_actions(self) -> dict[str, dict[str, Any]]: + """Handle listActions request. Returns: - A JSON response containing environments. + Dictionary of action descriptors keyed by action key. """ - return JSONResponse(content=['dev'], status_code=200) + actions: dict[str, dict[str, Any]] = {} - async def handle_notify(_request: Request) -> JSONResponse: - """Handle the notification endpoint. + # Get registered actions + for kind in ActionKind.__members__.values(): + for name, action in self._registry.get_actions_by_kind(kind).items(): + key = f'/{kind.value}/{name}' + actions[key] = self._action_to_desc(action, key) - Args: - _request: The Starlette request object (unused). + # Get plugin-advertised actions + metas = await self._registry.list_actions() + for meta in metas or []: + try: + key = f'/{meta.kind.value}/{meta.name}' + if key not in actions: + actions[key] = { + 'key': key, + 'name': meta.name, + 'type': meta.kind.value, + 'description': getattr(meta, 'description', None), + 'inputSchema': getattr(meta, 'input_json_schema', None), + 'outputSchema': getattr(meta, 'output_json_schema', None), + 'metadata': getattr(meta, 'metadata', None), + } + except Exception as e: + logger.warning(f'Skipping invalid plugin action metadata: {e}') - Returns: - An empty JSON response with status code 200. - """ - return JSONResponse( - content={}, - status_code=200, - headers={'x-genkit-version': version}, - ) + return actions - # Map of active actions indexed by trace ID for cancellation support. - active_actions: dict[str, asyncio.Task[Any]] = {} + def _action_to_desc(self, action: Action, key: str) -> dict[str, Any]: + """Convert an Action to an action descriptor dictionary.""" + return { + 'key': key, + 'name': action.name, + 'type': action.kind.value, + 'description': action.description, + 'inputSchema': action.input_schema, + 'outputSchema': action.output_schema, + 'metadata': action.metadata, + } - async def handle_cancel_action(request: Request) -> JSONResponse: - """Handle the cancelAction endpoint. + async def _handle_list_values(self, params: dict[str, Any]) -> list[str]: + """Handle listValues request. Args: - request: The Starlette request object. + params: Request parameters containing 'type'. Returns: - A JSON response. - """ - try: - payload = await request.json() - trace_id = payload.get('traceId') - if not trace_id: - return JSONResponse(content={'error': 'traceId is required'}, status_code=400) - - task = active_actions.get(trace_id) - if task: - _ = task.cancel() - return JSONResponse(content={'message': 'Action cancelled'}, status_code=200) - else: - return JSONResponse(content={'message': 'Action not found or already completed'}, status_code=404) - except Exception as e: - logger.error(f'Error cancelling action: {e}', exc_info=True) - return JSONResponse( - content={'error': 'An unexpected error occurred while cancelling the action.'}, - status_code=500, - ) - - async def handle_run_action( - request: Request, - ) -> JSONResponse | StreamingResponse: - """Handle the runAction endpoint for executing registered actions. + List of value names. - Flow: - 1. Reads and validates the request payload - 2. Looks up the requested action - 3. Executes the action with the provided input - 4. Returns the action result as JSON with trace ID + Raises: + ValueError: If type parameter is missing or unsupported. + """ + value_type = params.get('type') - Args: - request: The Starlette request object. + if not value_type: + raise ValueError("The 'type' parameter is required for listValues.") - Returns: - A JSON or StreamingResponse with the action result, or an error - response. - """ - # Get the action using async resolve. - payload = await request.json() - action = await registry.resolve_action_by_key(payload['key']) - if action is None: - return JSONResponse( - content={'error': f'Action not found: {payload["key"]}'}, - status_code=404, + if value_type != 'defaultModel': + raise ValueError( + f"Value type '{value_type}' is not supported. " + "Only 'defaultModel' is currently supported." ) - # Run the action. - context = payload.get('context', {}) - action_input = payload.get('input') - stream = is_streaming_requested(request) + return self._registry.list_values(value_type) - # Wrap execution to track the task for cancellation support - task = asyncio.current_task() + async def _handle_run_action(self, msg: dict[str, Any]) -> None: + """Handle runAction request with streaming support. - def on_trace_start(trace_id: str) -> None: - if task: - active_actions[trace_id] = task + This method handles its own response sending since it needs to + send intermediate notifications for streaming and telemetry. - handler = run_streaming_action if stream else run_standard_action + Args: + msg: The JSON-RPC request message. + """ + msg_id = msg.get('id') + params = msg.get('params', {}) - try: - return await handler(action, payload, action_input, context, version, on_trace_start) - except asyncio.CancelledError: - logger.info('Action execution cancelled.') - # Can't really send response if cancelled? Starlette/uvicorn closes connection? - # Just raise. - raise - - async def run_streaming_action( - action: Action, - payload: dict[str, Any], - _action_input: object, - context: dict[str, Any], - version: str, - on_trace_start: Callable[[str], None], - ) -> StreamingResponse: - """Handle streaming action execution with early header flushing. - - Uses early header flushing to send X-Genkit-Trace-Id immediately when - the trace starts, enabling the Dev UI to subscribe to SSE for real-time - trace updates. + key = params.get('key', '') + action_input = params.get('input') + context = params.get('context', {}) + stream = params.get('stream', False) - Args: - action: The action to execute. - payload: Request payload with input data. - action_input: The input for the action. - context: Execution context. - version: The Genkit version header value. - on_trace_start: Callback for trace start. + # Look up action + action = await self._registry.resolve_action_by_key(key) + if action is None: + await self._send_error(msg_id, -32602, f'Action not found: {key}') + return - Returns: - A StreamingResponse with JSON chunks containing result or error - events. - """ - # Use a queue to pass chunks from the callback to the generator - chunk_queue: asyncio.Queue[str | None] = asyncio.Queue() + # Get the current task to allow for cancellation + current_task = asyncio.current_task() - # Event to signal when trace ID is available - trace_id_event: asyncio.Event = asyncio.Event() + # Track trace ID for telemetry run_trace_id: str | None = None + sent_trace_ids: set[str] = set() - def wrapped_on_trace_start(tid: str) -> None: + async def on_trace_start(tid: str) -> None: nonlocal run_trace_id + if tid in sent_trace_ids: + return + sent_trace_ids.add(tid) run_trace_id = tid - on_trace_start(tid) - trace_id_event.set() # Signal that trace ID is ready - - async def run_action_task() -> None: - """Run the action and put chunks on the queue.""" - try: - def send_chunk(chunk: Any) -> None: # noqa: ANN401 - """Callback that puts chunks on the queue.""" - out = dump_json(chunk) - chunk_queue.put_nowait(f'{out}\n') + # Register active action with task cancellation + # Wrap cancel() in lambda to discard bool return value (expected: () -> None) + def cancel_fn() -> None: + if current_task: + _ = current_task.cancel() + + await self._active_actions.set( + tid, + ActiveAction( + cancel=cancel_fn, + start_time=asyncio.get_event_loop().time(), + trace_id=tid, + task=current_task, + ), + ) - output = await action.arun_raw( - raw_input=payload.get('input'), - on_chunk=send_chunk, - context=context, - on_trace_start=wrapped_on_trace_start, + # Send runActionState notification + notification = JsonRpcRequest( + method='runActionState', + params={ + 'requestId': msg_id, + 'state': {'traceId': tid}, + }, + ) + await self._send(notification.to_dict()) + + # Streaming callback + async def send_chunk(chunk: Any) -> None: # noqa: ANN401 + if stream: + notification = JsonRpcRequest( + method='streamChunk', + params={ + 'requestId': msg_id, + 'chunk': dump_dict(chunk), + }, ) - final_response = { + await self._send(notification.to_dict()) + + # Set up synchronous wrapper for on_trace_start + # (action.arun_raw expects sync callback currently) + def sync_on_trace_start(tid: str) -> None: + asyncio.create_task(on_trace_start(tid)) + + # Synchronous chunk callback wrapper + def sync_send_chunk(chunk: Any) -> None: # noqa: ANN401 + asyncio.create_task(send_chunk(chunk)) + + try: + output = await action.arun_raw( + raw_input=action_input, + on_chunk=sync_send_chunk if stream else None, + context=context, + on_trace_start=sync_on_trace_start, + ) + + # Clean up active action + if run_trace_id: + await self._active_actions.delete(run_trace_id) + + # Send success response + await self._send_response( + msg_id, + { 'result': dump_dict(output.response), 'telemetry': {'traceId': output.trace_id}, - } - chunk_queue.put_nowait(json.dumps(final_response)) + }, + ) - except Exception as e: - error_response = get_reflection_json(e).model_dump(by_alias=True) - # Log with exc_info for pretty exception output via rich/structlog - logger.exception('Error streaming action', exc_info=e) - # Error response also should not have trailing newline (final message) - chunk_queue.put_nowait(json.dumps(error_response)) - # Ensure trace_id_event is set even on error - trace_id_event.set() - - finally: - if not trace_id_event.is_set(): - trace_id_event.set() - # Signal end of stream - chunk_queue.put_nowait(None) - if run_trace_id: - _ = active_actions.pop(run_trace_id, None) - - # Start the action task immediately so trace ID becomes available ASAP - action_task = asyncio.create_task(run_action_task()) - - # Wait for trace ID before returning response - this enables early header flushing - _ = await trace_id_event.wait() - - # Now we have the trace ID, include it in headers - headers = { - 'x-genkit-version': version, - 'Transfer-Encoding': 'chunked', - } - if run_trace_id: - headers['X-Genkit-Trace-Id'] = run_trace_id # pyright: ignore[reportUnreachable] + except asyncio.CancelledError: + logger.info(f'Action {key} with traceId {run_trace_id} was cancelled.') + if run_trace_id: + await self._active_actions.delete(run_trace_id) + await self._send_error( + msg_id, + -32000, + 'Action was cancelled by request.', + data={'traceId': run_trace_id} if run_trace_id else None, + ) - async def stream_generator() -> AsyncGenerator[str, None]: - """Yield chunks from the queue as they arrive.""" - try: - while True: - chunk = await chunk_queue.get() - if chunk is None: - break - yield chunk - finally: - # Cancel task if still running (no-op if already done) - _ = action_task.cancel() - - return StreamingResponse( - stream_generator(), - # Reflection server uses text/plain for streaming (not SSE format) - # to match Go implementation - media_type='text/plain', - headers=headers, - ) + except Exception as e: + logger.exception(f'Error running action {key}') - async def run_standard_action( - action: Action, - payload: dict[str, Any], - _action_input: object, - context: dict[str, Any], - version: str, - on_trace_start: Callable[[str], None], - ) -> StreamingResponse: - """Handle standard (non-streaming) action execution with early header flushing. + # Clean up active action + if run_trace_id: + await self._active_actions.delete(run_trace_id) - Uses StreamingResponse to enable sending the X-Genkit-Trace-Id header - immediately when the trace starts, allowing the Dev UI to subscribe to - the SSE stream for real-time trace updates. + # Send error response + error_data = get_reflection_json(e).model_dump(by_alias=True) + if run_trace_id: + error_data.setdefault('details', {})['traceId'] = run_trace_id + + await self._send_error( + msg_id, + -32000, + str(e), + data=error_data, + ) + + async def _handle_cancel_action(self, params: dict[str, Any]) -> tuple[dict[str, str] | None, JsonRpcError | None]: + """Handle cancelAction request. Args: - action: The action to execute. - payload: Request payload with input data. - action_input: The input for the action. - context: Execution context. - version: The Genkit version header value. - on_trace_start: Callback for trace start. + params: Request parameters containing 'traceId'. Returns: - A StreamingResponse that flushes headers early. + Tuple of (result, error). """ - # Event to signal when trace ID is available - trace_id_event: asyncio.Event = asyncio.Event() - run_trace_id: str | None = None - action_result: dict[str, Any] | None = None - action_error: Exception | None = None + trace_id = params.get('traceId', '') - def wrapped_on_trace_start(tid: str) -> None: - nonlocal run_trace_id - run_trace_id = tid - on_trace_start(tid) - trace_id_event.set() # Signal that trace ID is ready + if not trace_id: + return None, JsonRpcError(code=-32602, message='traceId is required') - async def run_action_and_get_result() -> None: - nonlocal action_result, action_error - try: - output = await action.arun_raw( - raw_input=payload.get('input'), - context=context, - on_trace_start=wrapped_on_trace_start, - ) - action_result = { - 'result': dump_dict(output.response), - 'telemetry': {'traceId': output.trace_id}, - } - except Exception as e: - action_error = e - finally: - if not trace_id_event.is_set(): - trace_id_event.set() - - # Start the action immediately so trace ID becomes available ASAP - action_task = asyncio.create_task(run_action_and_get_result()) - - # Wait for trace ID before returning response - _ = await trace_id_event.wait() - - # Now return streaming response - headers will include trace ID - async def body_generator() -> AsyncGenerator[bytes, None]: - # Wait for action to complete - await action_task - - if action_error: - error_response = get_reflection_json(action_error).model_dump(by_alias=True) - # Log with exc_info for pretty exception output via rich/structlog - logger.exception('Error executing action', exc_info=action_error) - yield json.dumps(error_response).encode('utf-8') - else: - yield json.dumps(action_result).encode('utf-8') + action = await self._active_actions.get(trace_id) + if action is None: + return None, JsonRpcError( + code=-32004, # JSON-RPC implementation-defined server error + message='Action not found or already completed', + ) - if run_trace_id: - _ = active_actions.pop(run_trace_id, None) + # Cancel the action + action.cancel() + await self._active_actions.delete(trace_id) - headers = { - 'x-genkit-version': version, - } - if run_trace_id: - headers['X-Genkit-Trace-Id'] = run_trace_id # pyright: ignore[reportUnreachable] + return {'message': 'Action cancelled'}, None + + async def _handle_configure(self, params: dict[str, Any]) -> None: + """Handle configure notification. - return StreamingResponse( - body_generator(), - media_type='application/json', - headers=headers, + Args: + params: Configuration parameters. + """ + telemetry_url = params.get('telemetryServerUrl', '') + + if not os.environ.get('GENKIT_TELEMETRY_SERVER') and telemetry_url: + # TODO(#4401): Implement telemetry server URL configuration + logger.debug(f'Telemetry server URL configured: {telemetry_url}') + + async def _send_response(self, msg_id: int | str | None, result: object) -> None: + """Send a success response.""" + response = JsonRpcResponse(id=msg_id, result=result) + await self._send(response.to_dict()) + + async def _send_error( + self, + msg_id: int | str | None, + code: int, + message: str, + data: object = None, + ) -> None: + """Send an error response.""" + response = JsonRpcResponse( + id=msg_id, + error=JsonRpcError(code=code, message=message, data=data), ) + await self._send(response.to_dict()) - app = Starlette( - routes=[ - Route('/api/__health', handle_health_check, methods=['GET']), - Route('/api/__quitquitquit', handle_terminate, methods=['GET', 'POST']), # Support both for parity - Route('/api/actions', handle_list_actions, methods=['GET']), - Route('/api/values', handle_list_values, methods=['GET']), - Route('/api/envs', handle_list_envs, methods=['GET']), - Route('/api/notify', handle_notify, methods=['POST']), - Route('/api/runAction', handle_run_action, methods=['POST']), - Route('/api/cancelAction', handle_cancel_action, methods=['POST']), - ], - middleware=[ - Middleware( - CORSMiddleware, # type: ignore[arg-type] - allow_origins=['*'], - allow_methods=['*'], - allow_headers=['*'], - expose_headers=['X-Genkit-Trace-Id', 'X-Genkit-Span-Id', 'x-genkit-version'], - ) - ], - on_startup=[on_app_startup] if on_app_startup else [], - on_shutdown=[on_app_shutdown] if on_app_shutdown else [], - ) - app.active_actions = active_actions # type: ignore[attr-defined] - return cast(Application, app) + +def is_reflection_v2_enabled() -> bool: + """Check if Reflection API v2 is enabled. + + Returns: + True if GENKIT_REFLECTION_V2_SERVER is set, False otherwise. + """ + return bool(os.environ.get(GENKIT_REFLECTION_V2_SERVER_ENV)) + + +def get_reflection_v2_url() -> str | None: + """Get the Reflection API v2 server URL. + + Returns: + The WebSocket URL if set, None otherwise. + """ + return os.environ.get(GENKIT_REFLECTION_V2_SERVER_ENV) diff --git a/py/packages/genkit/src/genkit/core/reflection_v1.py b/py/packages/genkit/src/genkit/core/reflection_v1.py new file mode 100644 index 0000000000..eaf9edfae3 --- /dev/null +++ b/py/packages/genkit/src/genkit/core/reflection_v1.py @@ -0,0 +1,648 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# SPDX-License-Identifier: Apache-2.0 + +"""Reflection API v1: HTTP-based server for Genkit development tools. + +This module provides an HTTP reflection server that exposes endpoints for +health checks, action discovery, and action execution. The CLI and Dev UI +connect to this server as HTTP clients. + +Key Concepts (ELI5):: + + ┌─────────────────────┬────────────────────────────────────────────────┐ + │ Concept │ ELI5 Explanation │ + ├─────────────────────┼────────────────────────────────────────────────┤ + │ Reflection Server │ Genkit starts a mini web server that lets │ + │ │ tools peek inside and run your AI flows. │ + ├─────────────────────┼────────────────────────────────────────────────┤ + │ Action │ Any registered flow, tool, or model that can │ + │ │ be discovered and executed via this API. │ + ├─────────────────────┼────────────────────────────────────────────────┤ + │ Streaming Response │ Results sent piece by piece, like watching │ + │ │ a video stream instead of downloading first. │ + └─────────────────────┴────────────────────────────────────────────────┘ + +Architecture:: + + ┌─────────────────────┐ ┌─────────────────────┐ + │ Genkit CLI │ │ Dev UI │ + │ (HTTP Client) │ │ (HTTP Client) │ + └──────────┬──────────┘ └──────────┬──────────┘ + │ │ + │ HTTP Requests │ + │ (GET /api/actions, etc) │ + │ │ + ▼ ▼ + ┌───────────────────────────────────────────────────────┐ + │ Genkit Runtime │ + │ ┌────────────────────┐ │ + │ │ HTTP Server │ │ + │ │ (port 3100) │ │ + │ └────────────────────┘ │ + │ ┌────────────────────┐ │ + │ │ Registry │ │ + │ │ (Actions, Flows) │ │ + │ └────────────────────┘ │ + └───────────────────────────────────────────────────────┘ + + Discovery: Runtime writes file to ~/.genkit/{runtimeId}.runtime.json + Connection: CLI reads file, finds port, connects via HTTP + +Data Flow:: + + CLI / Dev UI Reflection Server + │ │ + │ ──── GET /api/__health ────► │ + │ ◄──── {"status": "OK"} ──── │ + │ │ + │ ──── GET /api/actions ────► │ + │ ◄──── {actions dict} ──── │ + │ │ + │ ──── POST /api/runAction ────► │ (key, input, context) + │ ◄──── X-Genkit-Trace-Id header ── │ (early flush) + │ ◄──── stream chunks ──── │ (if streaming) + │ ◄──── final result ──── │ + │ │ + │ ──── POST /api/cancelAction ──► │ (traceId) + │ ◄──── {"message": "..."} ──── │ + │ │ + +Key Endpoints:: + + ┌────────┬─────────────────────┬───────────────────────────────────────┐ + │ Method │ Path │ Description │ + ├────────┼─────────────────────┼───────────────────────────────────────┤ + │ GET │ /api/__health │ Health check │ + │ GET │ /api/actions │ List all registered actions │ + │ GET │ /api/values │ List values by type │ + │ GET │ /api/envs │ List configured environments │ + │ POST │ /api/runAction │ Execute action (supports streaming) │ + │ POST │ /api/cancelAction │ Cancel a running action by traceId │ + │ POST │ /api/notify │ Handle notifications │ + │ POST │ /api/__quitquitquit │ Trigger graceful shutdown │ + └────────┴─────────────────────┴───────────────────────────────────────┘ + +Caveats: + The reflection API server predates the flows server implementation and + differs in the protocol it uses to interface with the Dev UI. The + streaming protocol uses unadorned JSON per streamed chunk. This may + change in the future to use Server-Sent Events (SSE). + +See Also: + - V2 WebSocket implementation: genkit.core.reflection +""" + +from __future__ import annotations + +import asyncio +import json +from collections.abc import AsyncGenerator, Callable +from typing import Any, cast + +from starlette.applications import Starlette +from starlette.middleware import Middleware +from starlette.middleware.cors import CORSMiddleware +from starlette.requests import Request +from starlette.responses import JSONResponse, StreamingResponse +from starlette.routing import Route + +from genkit.codec import dump_dict, dump_json +from genkit.core.action import Action +from genkit.core.action.types import ActionKind +from genkit.core.constants import DEFAULT_GENKIT_VERSION +from genkit.core.error import get_reflection_json +from genkit.core.logging import get_logger +from genkit.core.registry import Registry +from genkit.web.manager.signals import terminate_all_servers +from genkit.web.requests import ( + is_streaming_requested, +) +from genkit.web.typing import ( + Application, + StartupHandler, +) + +logger = get_logger(__name__) + + +def _list_registered_actions(registry: Registry) -> dict[str, Action]: + """Return all locally registered actions keyed as `//`.""" + registered: dict[str, Action] = {} + for kind in ActionKind.__members__.values(): + for name, action in registry.get_actions_by_kind(kind).items(): + registered[f'/{kind.value}/{name}'] = action + return registered + + +def _build_actions_payload( + *, + registered_actions: dict[str, Action], + plugin_metas: list[Any], +) -> dict[str, dict[str, Any]]: + """Build payload for GET /api/actions.""" + actions: dict[str, dict[str, Any]] = {} + + # 1) Registered actions (flows/tools/etc). + for key, action in registered_actions.items(): + actions[key] = { + 'key': key, + 'name': action.name, + 'type': action.kind.value, + 'description': action.description, + 'inputSchema': action.input_schema, + 'outputSchema': action.output_schema, + 'metadata': action.metadata, + } + + # 2) Plugin-advertised actions (may not be registered yet). + for meta in plugin_metas or []: + try: + key = f'/{meta.kind.value}/{meta.name}' + except Exception as exc: + # Defensive: skip unexpected plugin metadata objects. + logger.warning('Skipping invalid plugin action metadata', error=str(exc)) + continue + + advertised = { + 'key': key, + 'name': meta.name, + 'type': meta.kind.value, + 'description': getattr(meta, 'description', None), + 'inputSchema': getattr(meta, 'input_json_schema', None), + 'outputSchema': getattr(meta, 'output_json_schema', None), + 'metadata': getattr(meta, 'metadata', None), + } + + if key not in actions: + actions[key] = advertised + continue + + # Merge into the existing (registered) action entry; prefer registered data. + existing = actions[key] + + if not existing.get('description') and advertised.get('description'): + existing['description'] = advertised['description'] + + if not existing.get('inputSchema') and advertised.get('inputSchema'): + existing['inputSchema'] = advertised['inputSchema'] + + if not existing.get('outputSchema') and advertised.get('outputSchema'): + existing['outputSchema'] = advertised['outputSchema'] + + existing_meta = existing.get('metadata') or {} + advertised_meta = advertised.get('metadata') or {} + if isinstance(existing_meta, dict) and isinstance(advertised_meta, dict): + # Prefer registered action metadata on key conflicts. + existing['metadata'] = {**advertised_meta, **existing_meta} + + return actions + + +def create_reflection_asgi_app( + registry: Registry, + on_app_startup: StartupHandler | None = None, + on_app_shutdown: StartupHandler | None = None, + version: str = DEFAULT_GENKIT_VERSION, + _encoding: str = 'utf-8', +) -> Application: + """Create and return a ASGI application for the Genkit reflection API. + + Caveats: + + The reflection API server predates the flows server implementation and + differs in the protocol it uses to interface with the Dev UI. The + streaming protocol uses unadorned JSON per streamed chunk. This may + change in the future to use Server-Sent Events (SSE). + + Key endpoints: + + | Method | Path | Handler | + |--------|---------------------|-----------------------| + | GET | /api/__health | Health check | + | GET | /api/actions | List actions | + | POST | /api/__quitquitquit | Trigger shutdown | + | POST | /api/notify | Handle notification | + | POST | /api/runAction | Run action (streaming)| + + Args: + registry: The registry to use for the reflection server. + on_app_startup: Optional callback to execute when the app's + lifespan starts. Must be an async function. + on_app_shutdown: Optional callback to execute when the app's + lifespan ends. Must be an async function. + version: The version string to use when setting the value of + the X-GENKIT-VERSION HTTP header. + encoding: The text encoding to use; default 'utf-8'. + + Returns: + An ASGI application configured with the given registry. + """ + + async def handle_health_check(_request: Request) -> JSONResponse: + """Handle health check requests. + + Args: + _request: The Starlette request object (unused). + + Returns: + A JSON response with status code 200. + """ + return JSONResponse(content={'status': 'OK'}) + + async def handle_terminate(_request: Request) -> JSONResponse: + """Handle the quit endpoint. + + Args: + _request: The Starlette request object (unused). + + Returns: + An empty JSON response with status code 200. + """ + await logger.ainfo('Shutting down servers...') + terminate_all_servers() + return JSONResponse(content={'status': 'OK'}) + + async def handle_list_actions(_request: Request) -> JSONResponse: + """Handle the request for listing available actions. + + Args: + _request: The Starlette request object (unused). + + Returns: + A JSON response containing all serializable actions. + """ + registered = _list_registered_actions(registry) + metas = await registry.list_actions() + actions = _build_actions_payload(registered_actions=registered, plugin_metas=metas) + + return JSONResponse( + content=actions, + status_code=200, + headers={'x-genkit-version': version}, + ) + + async def handle_list_values(request: Request) -> JSONResponse: + """Handle the request for listing registered values. + + Args: + request: The Starlette request object. + + Returns: + A JSON response containing value names. + """ + kind = request.query_params.get('type') + if not kind: + return JSONResponse(content='Query parameter "type" is required.', status_code=400) + + if kind != 'defaultModel': + return JSONResponse( + content=f"'type' {kind} is not supported. Only 'defaultModel' is supported", status_code=400 + ) + + values = registry.list_values(kind) + return JSONResponse(content=values, status_code=200) + + async def handle_list_envs(_request: Request) -> JSONResponse: + """Handle the request for listing environments. + + Args: + _request: The Starlette request object (unused). + + Returns: + A JSON response containing environments. + """ + return JSONResponse(content=['dev'], status_code=200) + + async def handle_notify(_request: Request) -> JSONResponse: + """Handle the notification endpoint. + + Args: + _request: The Starlette request object (unused). + + Returns: + An empty JSON response with status code 200. + """ + return JSONResponse( + content={}, + status_code=200, + headers={'x-genkit-version': version}, + ) + + # Map of active actions indexed by trace ID for cancellation support. + active_actions: dict[str, asyncio.Task[Any]] = {} + + async def handle_cancel_action(request: Request) -> JSONResponse: + """Handle the cancelAction endpoint. + + Args: + request: The Starlette request object. + + Returns: + A JSON response. + """ + try: + payload = await request.json() + trace_id = payload.get('traceId') + if not trace_id: + return JSONResponse(content={'error': 'traceId is required'}, status_code=400) + + task = active_actions.get(trace_id) + if task: + _ = task.cancel() + return JSONResponse(content={'message': 'Action cancelled'}, status_code=200) + else: + return JSONResponse(content={'message': 'Action not found or already completed'}, status_code=404) + except Exception as e: + logger.error(f'Error cancelling action: {e}', exc_info=True) + return JSONResponse( + content={'error': 'An unexpected error occurred while cancelling the action.'}, + status_code=500, + ) + + async def handle_run_action( + request: Request, + ) -> JSONResponse | StreamingResponse: + """Handle the runAction endpoint for executing registered actions. + + Flow: + 1. Reads and validates the request payload + 2. Looks up the requested action + 3. Executes the action with the provided input + 4. Returns the action result as JSON with trace ID + + Args: + request: The Starlette request object. + + Returns: + A JSON or StreamingResponse with the action result, or an error + response. + """ + # Get the action using async resolve. + payload = await request.json() + action = await registry.resolve_action_by_key(payload['key']) + if action is None: + return JSONResponse( + content={'error': f'Action not found: {payload["key"]}'}, + status_code=404, + ) + + # Run the action. + context = payload.get('context', {}) + action_input = payload.get('input') + stream = is_streaming_requested(request) + + # Wrap execution to track the task for cancellation support + task = asyncio.current_task() + + def on_trace_start(trace_id: str) -> None: + if task: + active_actions[trace_id] = task + + handler = run_streaming_action if stream else run_standard_action + + try: + return await handler(action, payload, action_input, context, version, on_trace_start) + except asyncio.CancelledError: + logger.info('Action execution cancelled.') + # Can't really send response if cancelled? Starlette/uvicorn closes connection? + # Just raise. + raise + + async def run_streaming_action( + action: Action, + payload: dict[str, Any], + _action_input: object, + context: dict[str, Any], + version: str, + on_trace_start: Callable[[str], None], + ) -> StreamingResponse: + """Handle streaming action execution with early header flushing. + + Uses early header flushing to send X-Genkit-Trace-Id immediately when + the trace starts, enabling the Dev UI to subscribe to SSE for real-time + trace updates. + + Args: + action: The action to execute. + payload: Request payload with input data. + action_input: The input for the action. + context: Execution context. + version: The Genkit version header value. + on_trace_start: Callback for trace start. + + Returns: + A StreamingResponse with JSON chunks containing result or error + events. + """ + # Use a queue to pass chunks from the callback to the generator + chunk_queue: asyncio.Queue[str | None] = asyncio.Queue() + + # Event to signal when trace ID is available + trace_id_event: asyncio.Event = asyncio.Event() + run_trace_id: str | None = None + + def wrapped_on_trace_start(tid: str) -> None: + nonlocal run_trace_id + run_trace_id = tid + on_trace_start(tid) + trace_id_event.set() # Signal that trace ID is ready + + async def run_action_task() -> None: + """Run the action and put chunks on the queue.""" + try: + + def send_chunk(chunk: Any) -> None: # noqa: ANN401 + """Callback that puts chunks on the queue.""" + out = dump_json(chunk) + chunk_queue.put_nowait(f'{out}\n') + + output = await action.arun_raw( + raw_input=payload.get('input'), + on_chunk=send_chunk, + context=context, + on_trace_start=wrapped_on_trace_start, + ) + final_response = { + 'result': dump_dict(output.response), + 'telemetry': {'traceId': output.trace_id}, + } + chunk_queue.put_nowait(json.dumps(final_response)) + + except Exception as e: + error_response = get_reflection_json(e).model_dump(by_alias=True) + # Log with exc_info for pretty exception output via rich/structlog + logger.exception('Error streaming action', exc_info=e) + # Error response also should not have trailing newline (final message) + chunk_queue.put_nowait(json.dumps(error_response)) + # Ensure trace_id_event is set even on error + trace_id_event.set() + + finally: + if not trace_id_event.is_set(): + trace_id_event.set() + # Signal end of stream + chunk_queue.put_nowait(None) + if run_trace_id: + _ = active_actions.pop(run_trace_id, None) + + # Start the action task immediately so trace ID becomes available ASAP + action_task = asyncio.create_task(run_action_task()) + + # Wait for trace ID before returning response - this enables early header flushing + _ = await trace_id_event.wait() + + # Now we have the trace ID, include it in headers + headers = { + 'x-genkit-version': version, + 'Transfer-Encoding': 'chunked', + } + if run_trace_id: + headers['X-Genkit-Trace-Id'] = run_trace_id # pyright: ignore[reportUnreachable] + + async def stream_generator() -> AsyncGenerator[str, None]: + """Yield chunks from the queue as they arrive.""" + try: + while True: + chunk = await chunk_queue.get() + if chunk is None: + break + yield chunk + finally: + # Cancel task if still running (no-op if already done) + _ = action_task.cancel() + + return StreamingResponse( + stream_generator(), + # Reflection server uses text/plain for streaming (not SSE format) + # to match Go implementation + media_type='text/plain', + headers=headers, + ) + + async def run_standard_action( + action: Action, + payload: dict[str, Any], + _action_input: object, + context: dict[str, Any], + version: str, + on_trace_start: Callable[[str], None], + ) -> StreamingResponse: + """Handle standard (non-streaming) action execution with early header flushing. + + Uses StreamingResponse to enable sending the X-Genkit-Trace-Id header + immediately when the trace starts, allowing the Dev UI to subscribe to + the SSE stream for real-time trace updates. + + Args: + action: The action to execute. + payload: Request payload with input data. + action_input: The input for the action. + context: Execution context. + version: The Genkit version header value. + on_trace_start: Callback for trace start. + + Returns: + A StreamingResponse that flushes headers early. + """ + # Event to signal when trace ID is available + trace_id_event: asyncio.Event = asyncio.Event() + run_trace_id: str | None = None + action_result: dict[str, Any] | None = None + action_error: Exception | None = None + + def wrapped_on_trace_start(tid: str) -> None: + nonlocal run_trace_id + run_trace_id = tid + on_trace_start(tid) + trace_id_event.set() # Signal that trace ID is ready + + async def run_action_and_get_result() -> None: + nonlocal action_result, action_error + try: + output = await action.arun_raw( + raw_input=payload.get('input'), + context=context, + on_trace_start=wrapped_on_trace_start, + ) + action_result = { + 'result': dump_dict(output.response), + 'telemetry': {'traceId': output.trace_id}, + } + except Exception as e: + action_error = e + finally: + if not trace_id_event.is_set(): + trace_id_event.set() + + # Start the action immediately so trace ID becomes available ASAP + action_task = asyncio.create_task(run_action_and_get_result()) + + # Wait for trace ID before returning response + _ = await trace_id_event.wait() + + # Now return streaming response - headers will include trace ID + async def body_generator() -> AsyncGenerator[bytes, None]: + # Wait for action to complete + await action_task + + if action_error: + error_response = get_reflection_json(action_error).model_dump(by_alias=True) + # Log with exc_info for pretty exception output via rich/structlog + logger.exception('Error executing action', exc_info=action_error) + yield json.dumps(error_response).encode('utf-8') + else: + yield json.dumps(action_result).encode('utf-8') + + if run_trace_id: + _ = active_actions.pop(run_trace_id, None) + + headers = { + 'x-genkit-version': version, + } + if run_trace_id: + headers['X-Genkit-Trace-Id'] = run_trace_id # pyright: ignore[reportUnreachable] + + return StreamingResponse( + body_generator(), + media_type='application/json', + headers=headers, + ) + + app = Starlette( + routes=[ + Route('/api/__health', handle_health_check, methods=['GET']), + Route('/api/__quitquitquit', handle_terminate, methods=['GET', 'POST']), # Support both for parity + Route('/api/actions', handle_list_actions, methods=['GET']), + Route('/api/values', handle_list_values, methods=['GET']), + Route('/api/envs', handle_list_envs, methods=['GET']), + Route('/api/notify', handle_notify, methods=['POST']), + Route('/api/runAction', handle_run_action, methods=['POST']), + Route('/api/cancelAction', handle_cancel_action, methods=['POST']), + ], + middleware=[ + Middleware( + CORSMiddleware, # type: ignore[arg-type] + allow_origins=['*'], + allow_methods=['*'], + allow_headers=['*'], + expose_headers=['X-Genkit-Trace-Id', 'X-Genkit-Span-Id', 'x-genkit-version'], + ) + ], + on_startup=[on_app_startup] if on_app_startup else [], + on_shutdown=[on_app_shutdown] if on_app_shutdown else [], + ) + app.active_actions = active_actions # type: ignore[attr-defined] + return cast(Application, app) diff --git a/py/packages/genkit/tests/genkit/core/endpoints/reflection_test.py b/py/packages/genkit/tests/genkit/core/endpoints/reflection_test.py index 97c8422942..ac56984be7 100644 --- a/py/packages/genkit/tests/genkit/core/endpoints/reflection_test.py +++ b/py/packages/genkit/tests/genkit/core/endpoints/reflection_test.py @@ -14,207 +14,355 @@ # # SPDX-License-Identifier: Apache-2.0 -"""Tests for the reflection API server. +"""Tests for the Reflection API v2 module. -This module contains unit tests for the ASGI-based reflection API server -which provides endpoints for inspecting and interacting with Genkit during -development. +This module contains unit tests for the WebSocket-based Reflection API v2 +client which connects to a runtime manager server. Test coverage includes: -- Health check endpoint (/api/__health) -- Listing registered actions (/api/actions) -- Notification endpoint (/api/notify) -- Action execution with various scenarios (/api/runAction): - - Standard action execution - - Streaming action execution - - Error handling when action not found - - Context passing to actions - -The tests use an ASGI client with mocked Registry to isolate and verify -each endpoint's behavior. +- JSON-RPC message structures (request, response, error) +- Helper functions (is_reflection_v2_enabled, get_reflection_v2_url) +- Active actions map operations +- Action descriptor generation """ from __future__ import annotations -from collections.abc import AsyncIterator, Awaitable, Callable -from typing import Any, cast -from unittest.mock import ANY, AsyncMock, MagicMock, patch +import os +from unittest.mock import AsyncMock, MagicMock, patch import pytest -import pytest_asyncio -from httpx import ASGITransport, AsyncClient -from genkit.core.reflection import create_reflection_asgi_app -from genkit.core.registry import Registry - - -@pytest.fixture -def mock_registry() -> MagicMock: - """Create a mock Registry for testing.""" - return MagicMock(spec=Registry) - - -@pytest_asyncio.fixture -async def asgi_client(mock_registry: MagicMock) -> AsyncIterator[AsyncClient]: - """Create an ASGI test client with a mock registry. - - Args: - mock_registry: A mock Registry object. - - Returns: - An AsyncClient configured to make requests to the test ASGI app. - """ - app = create_reflection_asgi_app(mock_registry) - transport = ASGITransport(app=app) # type: ignore[arg-type] - client = AsyncClient(transport=transport, base_url='http://test') - try: - yield client - finally: - await client.aclose() - - -@pytest.mark.asyncio -async def test_health_check(asgi_client: AsyncClient) -> None: - """Test that the health check endpoint returns 200 OK.""" - response = await asgi_client.get('/api/__health') - assert response.status_code == 200 - - -@pytest.mark.asyncio -async def test_list_actions(asgi_client: AsyncClient, mock_registry: MagicMock) -> None: - """Test that the actions list endpoint returns registered actions.""" - from genkit.core.action import ActionMetadata - from genkit.core.action.types import ActionKind - - # Mock the async list_actions method to return a list of ActionMetadata - async def mock_list_actions_async(allowed_kinds: list[ActionKind] | None = None) -> list[ActionMetadata]: - return [ - ActionMetadata( - kind=ActionKind.CUSTOM, - name='action1', - ) - ] - - mock_registry.list_actions = mock_list_actions_async - response = await asgi_client.get('/api/actions') - assert response.status_code == 200 - result = response.json() - assert '/custom/action1' in result - assert result['/custom/action1']['name'] == 'action1' - assert result['/custom/action1']['type'] == 'custom' - - -@pytest.mark.asyncio -async def test_notify_endpoint(asgi_client: AsyncClient) -> None: - """Test that the notify endpoint returns 200 OK.""" - response = await asgi_client.post('/api/notify') - assert response.status_code == 200 - - -@pytest.mark.asyncio -async def test_run_action_not_found(asgi_client: AsyncClient, mock_registry: MagicMock) -> None: - """Test that requesting a non-existent action returns a 404 error.""" - - async def mock_resolve_action_by_key(key: str) -> None: - return None - - mock_registry.resolve_action_by_key = mock_resolve_action_by_key - response = await asgi_client.post( - '/api/runAction', - json={'key': 'non_existent_action', 'input': {'data': 'test'}}, - ) - assert response.status_code == 404 - assert 'error' in response.json() - - -@pytest.mark.asyncio -async def test_run_action_standard(asgi_client: AsyncClient, mock_registry: MagicMock) -> None: - """Test that a standard (non-streaming) action works correctly.""" - mock_action = AsyncMock() - mock_output = MagicMock() - mock_output.response = {'result': 'success'} - mock_output.trace_id = 'test_trace_id' - mock_action.arun_raw.return_value = mock_output - - async def mock_resolve_action_by_key(key: str) -> AsyncMock: - return mock_action - - mock_registry.resolve_action_by_key = mock_resolve_action_by_key - - response = await asgi_client.post('/api/runAction', json={'key': 'test_action', 'input': {'data': 'test'}}) - - assert response.status_code == 200 - response_data = response.json() - assert 'result' in response_data - assert 'telemetry' in response_data - assert response_data['telemetry']['traceId'] == 'test_trace_id' - mock_action.arun_raw.assert_called_once_with(raw_input={'data': 'test'}, context={}, on_trace_start=ANY) - - -@pytest.mark.asyncio -async def test_run_action_with_context(asgi_client: AsyncClient, mock_registry: MagicMock) -> None: - """Test that an action with context works correctly.""" - mock_action = AsyncMock() - mock_output = MagicMock() - mock_output.response = {'result': 'success'} - mock_output.trace_id = 'test_trace_id' - mock_action.arun_raw.return_value = mock_output - - async def mock_resolve_action_by_key(key: str) -> AsyncMock: - return mock_action - - mock_registry.resolve_action_by_key = mock_resolve_action_by_key - - response = await asgi_client.post( - '/api/runAction', - json={ - 'key': 'test_action', - 'input': {'data': 'test'}, - 'context': {'user': 'test_user'}, - }, - ) - - assert response.status_code == 200 - mock_action.arun_raw.assert_called_once_with( - raw_input={'data': 'test'}, - context={'user': 'test_user'}, - on_trace_start=ANY, - ) - - -@pytest.mark.asyncio -@patch('genkit.core.reflection.is_streaming_requested') -async def test_run_action_streaming( - mock_is_streaming: MagicMock, - asgi_client: AsyncClient, - mock_registry: MagicMock, -) -> None: - """Test that streaming actions work correctly.""" - mock_is_streaming.return_value = True - mock_action = AsyncMock() - - async def mock_streaming( - raw_input: object, - on_chunk: object | None = None, - context: object | None = None, - **kwargs: Any, # noqa: ANN401 - ) -> MagicMock: - if on_chunk: - on_chunk_fn = cast(Callable[[object], Awaitable[None]], on_chunk) - await on_chunk_fn({'chunk': 1}) - await on_chunk_fn({'chunk': 2}) - mock_output = MagicMock() - mock_output.response = {'final': 'result'} - mock_output.trace_id = 'stream_trace_id' - return mock_output - - mock_action.arun_raw.side_effect = mock_streaming - mock_registry.resolve_action_by_key.return_value = mock_action - - response = await asgi_client.post( - '/api/runAction?stream=true', - json={'key': 'test_action', 'input': {'data': 'test'}}, - ) - - assert response.status_code == 200 - assert mock_is_streaming.called +from genkit.core.reflection import ( + ActiveAction, + ActiveActionsMap, + JsonRpcError, + JsonRpcRequest, + JsonRpcResponse, + ReflectionClientV2, + get_reflection_v2_url, + is_reflection_v2_enabled, +) + + +class TestJsonRpcRequest: + """Tests for JsonRpcRequest class.""" + + def test_notification_to_dict(self) -> None: + """Test that a notification (no ID) serializes correctly.""" + request = JsonRpcRequest(method='notify', params={'data': 'test'}) + result = request.to_dict() + + assert result == { + 'jsonrpc': '2.0', + 'method': 'notify', + 'params': {'data': 'test'}, + } + + def test_request_with_id_to_dict(self) -> None: + """Test that a request with ID serializes correctly.""" + request = JsonRpcRequest(method='call', params=['arg1'], id=42) + result = request.to_dict() + + assert result == { + 'jsonrpc': '2.0', + 'method': 'call', + 'params': ['arg1'], + 'id': 42, + } + + def test_request_minimal_to_dict(self) -> None: + """Test that a minimal request (no params, no ID) serializes correctly.""" + request = JsonRpcRequest(method='ping') + result = request.to_dict() + + assert result == { + 'jsonrpc': '2.0', + 'method': 'ping', + } + + +class TestJsonRpcResponse: + """Tests for JsonRpcResponse class.""" + + def test_success_response_to_dict(self) -> None: + """Test that a success response serializes correctly.""" + response = JsonRpcResponse(result={'status': 'ok'}, id=1) + result = response.to_dict() + + assert result == { + 'jsonrpc': '2.0', + 'result': {'status': 'ok'}, + 'id': 1, + } + + def test_error_response_to_dict(self) -> None: + """Test that an error response serializes correctly.""" + error = JsonRpcError(code=-32600, message='Invalid Request') + response = JsonRpcResponse(error=error, id=2) + result = response.to_dict() + + assert result == { + 'jsonrpc': '2.0', + 'error': {'code': -32600, 'message': 'Invalid Request'}, + 'id': 2, + } + + def test_error_with_data_to_dict(self) -> None: + """Test that an error with additional data serializes correctly.""" + error = JsonRpcError(code=-32000, message='Custom Error', data={'detail': 'extra info'}) + response = JsonRpcResponse(error=error, id=3) + result = response.to_dict() + + assert result == { + 'jsonrpc': '2.0', + 'error': { + 'code': -32000, + 'message': 'Custom Error', + 'data': {'detail': 'extra info'}, + }, + 'id': 3, + } + + +class TestJsonRpcError: + """Tests for JsonRpcError class.""" + + def test_basic_error_to_dict(self) -> None: + """Test that a basic error serializes correctly.""" + error = JsonRpcError(code=-32601, message='Method not found') + result = error.to_dict() + + assert result == { + 'code': -32601, + 'message': 'Method not found', + } + + def test_error_with_data_to_dict(self) -> None: + """Test that an error with data serializes correctly.""" + error = JsonRpcError(code=-32000, message='Server Error', data={'stack': 'trace'}) + result = error.to_dict() + + assert result == { + 'code': -32000, + 'message': 'Server Error', + 'data': {'stack': 'trace'}, + } + + +class TestActiveActionsMap: + """Tests for ActiveActionsMap class.""" + + @pytest.mark.asyncio + async def test_set_and_get(self) -> None: + """Test that actions can be set and retrieved.""" + actions_map = ActiveActionsMap() + action = ActiveAction( + cancel=lambda: None, + start_time=1000.0, + trace_id='trace-123', + ) + + await actions_map.set('trace-123', action) + result = await actions_map.get('trace-123') + + assert result is not None + assert result is action + assert result.trace_id == 'trace-123' + + @pytest.mark.asyncio + async def test_get_nonexistent(self) -> None: + """Test that getting a nonexistent action returns None.""" + actions_map = ActiveActionsMap() + result = await actions_map.get('nonexistent') + assert result is None + + @pytest.mark.asyncio + async def test_delete(self) -> None: + """Test that actions can be deleted.""" + actions_map = ActiveActionsMap() + action = ActiveAction( + cancel=lambda: None, + start_time=1000.0, + trace_id='trace-456', + ) + + await actions_map.set('trace-456', action) + await actions_map.delete('trace-456') + result = await actions_map.get('trace-456') + + assert result is None + + @pytest.mark.asyncio + async def test_delete_nonexistent(self) -> None: + """Test that deleting a nonexistent action doesn't raise an error.""" + actions_map = ActiveActionsMap() + # Should not raise + await actions_map.delete('nonexistent') + + +class TestHelperFunctions: + """Tests for helper functions.""" + + def test_is_reflection_v2_enabled_false(self) -> None: + """Test that v2 is disabled when env var is not set.""" + with patch.dict(os.environ, {}, clear=True): + # Remove the env var if it exists + os.environ.pop('GENKIT_REFLECTION_V2_SERVER', None) + result = is_reflection_v2_enabled() + assert result is False + + def test_is_reflection_v2_enabled_true(self) -> None: + """Test that v2 is enabled when env var is set.""" + with patch.dict(os.environ, {'GENKIT_REFLECTION_V2_SERVER': 'ws://localhost:4100'}): + result = is_reflection_v2_enabled() + assert result is True + + def test_get_reflection_v2_url_not_set(self) -> None: + """Test that URL is None when env var is not set.""" + with patch.dict(os.environ, {}, clear=True): + os.environ.pop('GENKIT_REFLECTION_V2_SERVER', None) + result = get_reflection_v2_url() + assert result is None + + def test_get_reflection_v2_url_set(self) -> None: + """Test that URL is returned when env var is set.""" + with patch.dict(os.environ, {'GENKIT_REFLECTION_V2_SERVER': 'ws://localhost:4100'}): + result = get_reflection_v2_url() + assert result == 'ws://localhost:4100' + + +class TestReflectionClientV2: + """Tests for ReflectionClientV2 class.""" + + def test_runtime_id(self) -> None: + """Test that runtime_id is based on process ID.""" + mock_registry = MagicMock() + client = ReflectionClientV2(mock_registry, 'ws://localhost:4100') + + # runtime_id should be the process ID as a string + assert client.runtime_id == str(os.getpid()) + + def test_default_configured_envs(self) -> None: + """Test that default configured_envs is ['dev'].""" + mock_registry = MagicMock() + client = ReflectionClientV2(mock_registry, 'ws://localhost:4100') + + assert client._configured_envs == ['dev'] + + def test_custom_configured_envs(self) -> None: + """Test that custom configured_envs can be set.""" + mock_registry = MagicMock() + client = ReflectionClientV2(mock_registry, 'ws://localhost:4100', configured_envs=['prod', 'staging']) + + assert client._configured_envs == ['prod', 'staging'] + + @pytest.mark.asyncio + async def test_stop(self) -> None: + """Test that stop() sets running to False and closes WebSocket.""" + mock_registry = MagicMock() + client = ReflectionClientV2(mock_registry, 'ws://localhost:4100') + client._running = True + client._ws = AsyncMock() + + await client.stop() + + assert client._running is False + client._ws.close.assert_called_once() + + @pytest.mark.asyncio + async def test_handle_list_actions(self) -> None: + """Test that _handle_list_actions returns action descriptors.""" + from genkit.core.action import Action + from genkit.core.action.types import ActionKind + + mock_registry = MagicMock() + + # Mock action + mock_action = MagicMock(spec=Action) + mock_action.name = 'test_action' + mock_action.kind = ActionKind.TOOL + mock_action.description = 'A test tool' + mock_action.input_schema = {'type': 'object'} + mock_action.output_schema = {'type': 'string'} + mock_action.metadata = {'key': 'value'} + + # Mock registry methods + mock_registry.get_actions_by_kind.return_value = {'test_action': mock_action} + mock_registry.list_actions = AsyncMock(return_value=[]) + + client = ReflectionClientV2(mock_registry, 'ws://localhost:4100') + result = await client._handle_list_actions() + + # Should have the action keyed by /{kind}/{name} + assert '/tool/test_action' in result + assert result['/tool/test_action']['name'] == 'test_action' + assert result['/tool/test_action']['type'] == 'tool' + assert result['/tool/test_action']['description'] == 'A test tool' + + @pytest.mark.asyncio + async def test_handle_list_values(self) -> None: + """Test that _handle_list_values returns value list.""" + mock_registry = MagicMock() + mock_registry.list_values.return_value = ['model1', 'model2'] + + client = ReflectionClientV2(mock_registry, 'ws://localhost:4100') + result = await client._handle_list_values({'type': 'defaultModel'}) + + assert result == ['model1', 'model2'] + mock_registry.list_values.assert_called_once_with('defaultModel') + + @pytest.mark.asyncio + async def test_handle_cancel_action_missing_trace_id(self) -> None: + """Test that cancel action returns error when traceId is missing.""" + mock_registry = MagicMock() + client = ReflectionClientV2(mock_registry, 'ws://localhost:4100') + + result, error = await client._handle_cancel_action({}) + + assert result is None + assert error is not None + assert error.code == -32602 + assert 'traceId' in error.message + + @pytest.mark.asyncio + async def test_handle_cancel_action_not_found(self) -> None: + """Test that cancel action returns error when action not found.""" + mock_registry = MagicMock() + client = ReflectionClientV2(mock_registry, 'ws://localhost:4100') + + result, error = await client._handle_cancel_action({'traceId': 'nonexistent'}) + + assert result is None + assert error is not None + assert error.code == -32004 # JSON-RPC implementation-defined server error + + @pytest.mark.asyncio + async def test_handle_cancel_action_success(self) -> None: + """Test that cancel action works correctly.""" + mock_registry = MagicMock() + client = ReflectionClientV2(mock_registry, 'ws://localhost:4100') + + # Add an active action + cancel_called = [] + + def cancel_fn() -> None: + cancel_called.append(True) + + action = ActiveAction( + cancel=cancel_fn, + start_time=1000.0, + trace_id='trace-to-cancel', + ) + await client._active_actions.set('trace-to-cancel', action) + + result, error = await client._handle_cancel_action({'traceId': 'trace-to-cancel'}) + + assert error is None + assert result is not None + assert 'message' in result + assert cancel_called == [True] + + # Action should be removed from active actions + remaining = await client._active_actions.get('trace-to-cancel') + assert remaining is None diff --git a/py/packages/genkit/tests/genkit/core/endpoints/reflection_v1_test.py b/py/packages/genkit/tests/genkit/core/endpoints/reflection_v1_test.py new file mode 100644 index 0000000000..8873da40bb --- /dev/null +++ b/py/packages/genkit/tests/genkit/core/endpoints/reflection_v1_test.py @@ -0,0 +1,220 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# SPDX-License-Identifier: Apache-2.0 + +"""Tests for the reflection API server. + +This module contains unit tests for the ASGI-based reflection API server +which provides endpoints for inspecting and interacting with Genkit during +development. + +Test coverage includes: +- Health check endpoint (/api/__health) +- Listing registered actions (/api/actions) +- Notification endpoint (/api/notify) +- Action execution with various scenarios (/api/runAction): + - Standard action execution + - Streaming action execution + - Error handling when action not found + - Context passing to actions + +The tests use an ASGI client with mocked Registry to isolate and verify +each endpoint's behavior. +""" + +from __future__ import annotations + +from collections.abc import AsyncIterator, Awaitable, Callable +from typing import Any, cast +from unittest.mock import ANY, AsyncMock, MagicMock, patch + +import pytest +import pytest_asyncio +from httpx import ASGITransport, AsyncClient + +from genkit.core.reflection_v1 import create_reflection_asgi_app +from genkit.core.registry import Registry + + +@pytest.fixture +def mock_registry() -> MagicMock: + """Create a mock Registry for testing.""" + return MagicMock(spec=Registry) + + +@pytest_asyncio.fixture +async def asgi_client(mock_registry: MagicMock) -> AsyncIterator[AsyncClient]: + """Create an ASGI test client with a mock registry. + + Args: + mock_registry: A mock Registry object. + + Returns: + An AsyncClient configured to make requests to the test ASGI app. + """ + app = create_reflection_asgi_app(mock_registry) + transport = ASGITransport(app=app) # type: ignore[arg-type] + client = AsyncClient(transport=transport, base_url='http://test') + try: + yield client + finally: + await client.aclose() + + +@pytest.mark.asyncio +async def test_health_check(asgi_client: AsyncClient) -> None: + """Test that the health check endpoint returns 200 OK.""" + response = await asgi_client.get('/api/__health') + assert response.status_code == 200 + + +@pytest.mark.asyncio +async def test_list_actions(asgi_client: AsyncClient, mock_registry: MagicMock) -> None: + """Test that the actions list endpoint returns registered actions.""" + from genkit.core.action import ActionMetadata + from genkit.core.action.types import ActionKind + + # Mock the async list_actions method to return a list of ActionMetadata + async def mock_list_actions_async(allowed_kinds: list[ActionKind] | None = None) -> list[ActionMetadata]: + return [ + ActionMetadata( + kind=ActionKind.CUSTOM, + name='action1', + ) + ] + + mock_registry.list_actions = mock_list_actions_async + response = await asgi_client.get('/api/actions') + assert response.status_code == 200 + result = response.json() + assert '/custom/action1' in result + assert result['/custom/action1']['name'] == 'action1' + assert result['/custom/action1']['type'] == 'custom' + + +@pytest.mark.asyncio +async def test_notify_endpoint(asgi_client: AsyncClient) -> None: + """Test that the notify endpoint returns 200 OK.""" + response = await asgi_client.post('/api/notify') + assert response.status_code == 200 + + +@pytest.mark.asyncio +async def test_run_action_not_found(asgi_client: AsyncClient, mock_registry: MagicMock) -> None: + """Test that requesting a non-existent action returns a 404 error.""" + + async def mock_resolve_action_by_key(key: str) -> None: + return None + + mock_registry.resolve_action_by_key = mock_resolve_action_by_key + response = await asgi_client.post( + '/api/runAction', + json={'key': 'non_existent_action', 'input': {'data': 'test'}}, + ) + assert response.status_code == 404 + assert 'error' in response.json() + + +@pytest.mark.asyncio +async def test_run_action_standard(asgi_client: AsyncClient, mock_registry: MagicMock) -> None: + """Test that a standard (non-streaming) action works correctly.""" + mock_action = AsyncMock() + mock_output = MagicMock() + mock_output.response = {'result': 'success'} + mock_output.trace_id = 'test_trace_id' + mock_action.arun_raw.return_value = mock_output + + async def mock_resolve_action_by_key(key: str) -> AsyncMock: + return mock_action + + mock_registry.resolve_action_by_key = mock_resolve_action_by_key + + response = await asgi_client.post('/api/runAction', json={'key': 'test_action', 'input': {'data': 'test'}}) + + assert response.status_code == 200 + response_data = response.json() + assert 'result' in response_data + assert 'telemetry' in response_data + assert response_data['telemetry']['traceId'] == 'test_trace_id' + mock_action.arun_raw.assert_called_once_with(raw_input={'data': 'test'}, context={}, on_trace_start=ANY) + + +@pytest.mark.asyncio +async def test_run_action_with_context(asgi_client: AsyncClient, mock_registry: MagicMock) -> None: + """Test that an action with context works correctly.""" + mock_action = AsyncMock() + mock_output = MagicMock() + mock_output.response = {'result': 'success'} + mock_output.trace_id = 'test_trace_id' + mock_action.arun_raw.return_value = mock_output + + async def mock_resolve_action_by_key(key: str) -> AsyncMock: + return mock_action + + mock_registry.resolve_action_by_key = mock_resolve_action_by_key + + response = await asgi_client.post( + '/api/runAction', + json={ + 'key': 'test_action', + 'input': {'data': 'test'}, + 'context': {'user': 'test_user'}, + }, + ) + + assert response.status_code == 200 + mock_action.arun_raw.assert_called_once_with( + raw_input={'data': 'test'}, + context={'user': 'test_user'}, + on_trace_start=ANY, + ) + + +@pytest.mark.asyncio +@patch('genkit.core.reflection_v1.is_streaming_requested') +async def test_run_action_streaming( + mock_is_streaming: MagicMock, + asgi_client: AsyncClient, + mock_registry: MagicMock, +) -> None: + """Test that streaming actions work correctly.""" + mock_is_streaming.return_value = True + mock_action = AsyncMock() + + async def mock_streaming( + raw_input: object, + on_chunk: object | None = None, + context: object | None = None, + **kwargs: Any, # noqa: ANN401 + ) -> MagicMock: + if on_chunk: + on_chunk_fn = cast(Callable[[object], Awaitable[None]], on_chunk) + await on_chunk_fn({'chunk': 1}) + await on_chunk_fn({'chunk': 2}) + mock_output = MagicMock() + mock_output.response = {'final': 'result'} + mock_output.trace_id = 'stream_trace_id' + return mock_output + + mock_action.arun_raw.side_effect = mock_streaming + mock_registry.resolve_action_by_key.return_value = mock_action + + response = await asgi_client.post( + '/api/runAction?stream=true', + json={'key': 'test_action', 'input': {'data': 'test'}}, + ) + + assert response.status_code == 200 + assert mock_is_streaming.called diff --git a/py/samples/multi-server/src/main.py b/py/samples/multi-server/src/main.py index 81aee52f87..dd5bd055d0 100755 --- a/py/samples/multi-server/src/main.py +++ b/py/samples/multi-server/src/main.py @@ -84,7 +84,7 @@ from genkit.aio.loop import run_loop from genkit.core.environment import is_dev_environment from genkit.core.logging import get_logger -from genkit.core.reflection import create_reflection_asgi_app +from genkit.core.reflection_v1 import create_reflection_asgi_app from genkit.core.registry import Registry from genkit.web.manager import ( AbstractBaseServer, diff --git a/py/uv.lock b/py/uv.lock index 2b43ba65a3..0c3f2c41de 100644 --- a/py/uv.lock +++ b/py/uv.lock @@ -2060,6 +2060,7 @@ dependencies = [ { name = "typing-extensions" }, { name = "uvicorn" }, { name = "uvloop", marker = "sys_platform != 'win32'" }, + { name = "websockets" }, ] [package.optional-dependencies] @@ -2115,6 +2116,7 @@ requires-dist = [ { name = "typing-extensions", specifier = ">=4.0" }, { name = "uvicorn", specifier = ">=0.34.0" }, { name = "uvloop", marker = "sys_platform != 'win32'", specifier = ">=0.21.0" }, + { name = "websockets", specifier = ">=15.0" }, ] provides-extras = ["dev-local-vectorstore", "flask", "google-cloud", "google-genai", "ollama", "openai", "vertex-ai"] From b3ba0959bba9eac315d275878c96924e332ff50b Mon Sep 17 00:00:00 2001 From: Yesudeep Mangalapilly Date: Thu, 5 Feb 2026 17:08:41 -0800 Subject: [PATCH 2/2] fix(py): update imports for v1/v2 reflection API split - Move create_reflection_asgi_app import from reflection to reflection_v1 in _base_async.py - Update test files to import from correct modules - Add mock for resolve_actions_by_kind in v1 test to match async signature This fixes import errors after the module split and ensures tests pass with the async _list_registered_actions function. --- py/packages/genkit/src/genkit/ai/_base_async.py | 2 +- py/packages/genkit/src/genkit/core/reflection.py | 9 +++------ .../tests/genkit/core/endpoints/reflection_test.py | 4 ++-- .../tests/genkit/core/endpoints/reflection_v1_test.py | 7 ++++++- py/pyproject.toml | 2 +- 5 files changed, 13 insertions(+), 11 deletions(-) diff --git a/py/packages/genkit/src/genkit/ai/_base_async.py b/py/packages/genkit/src/genkit/ai/_base_async.py index 8f849c65b2..1f549beb14 100644 --- a/py/packages/genkit/src/genkit/ai/_base_async.py +++ b/py/packages/genkit/src/genkit/ai/_base_async.py @@ -30,7 +30,7 @@ from genkit.core.environment import is_dev_environment from genkit.core.logging import get_logger from genkit.core.plugin import Plugin -from genkit.core.reflection import create_reflection_asgi_app +from genkit.core.reflection_v1 import create_reflection_asgi_app from genkit.core.registry import Registry from genkit.web.manager._ports import find_free_port_sync diff --git a/py/packages/genkit/src/genkit/core/reflection.py b/py/packages/genkit/src/genkit/core/reflection.py index 8a4f9df72e..2e0839d148 100644 --- a/py/packages/genkit/src/genkit/core/reflection.py +++ b/py/packages/genkit/src/genkit/core/reflection.py @@ -467,9 +467,9 @@ async def _handle_list_actions(self) -> dict[str, dict[str, Any]]: """ actions: dict[str, dict[str, Any]] = {} - # Get registered actions + # Get registered actions (using resolve to trigger lazy loading) for kind in ActionKind.__members__.values(): - for name, action in self._registry.get_actions_by_kind(kind).items(): + for name, action in (await self._registry.resolve_actions_by_kind(kind)).items(): key = f'/{kind.value}/{name}' actions[key] = self._action_to_desc(action, key) @@ -523,10 +523,7 @@ async def _handle_list_values(self, params: dict[str, Any]) -> list[str]: raise ValueError("The 'type' parameter is required for listValues.") if value_type != 'defaultModel': - raise ValueError( - f"Value type '{value_type}' is not supported. " - "Only 'defaultModel' is currently supported." - ) + raise ValueError(f"Value type '{value_type}' is not supported. Only 'defaultModel' is currently supported.") return self._registry.list_values(value_type) diff --git a/py/packages/genkit/tests/genkit/core/endpoints/reflection_test.py b/py/packages/genkit/tests/genkit/core/endpoints/reflection_test.py index 02e7539f01..af5e6e0f0f 100644 --- a/py/packages/genkit/tests/genkit/core/endpoints/reflection_test.py +++ b/py/packages/genkit/tests/genkit/core/endpoints/reflection_test.py @@ -46,7 +46,7 @@ from genkit.core.action import ActionMetadata from genkit.core.action.types import ActionKind -from genkit.core.reflection import create_reflection_asgi_app +from genkit.core.reflection_v1 import create_reflection_asgi_app from genkit.core.registry import Registry @@ -188,7 +188,7 @@ async def mock_resolve_action_by_key(key: str) -> AsyncMock: @pytest.mark.asyncio -@patch('genkit.core.reflection.is_streaming_requested') +@patch('genkit.core.reflection_v1.is_streaming_requested') async def test_run_action_streaming( mock_is_streaming: MagicMock, asgi_client: AsyncClient, diff --git a/py/packages/genkit/tests/genkit/core/endpoints/reflection_v1_test.py b/py/packages/genkit/tests/genkit/core/endpoints/reflection_v1_test.py index 8873da40bb..22e2612947 100644 --- a/py/packages/genkit/tests/genkit/core/endpoints/reflection_v1_test.py +++ b/py/packages/genkit/tests/genkit/core/endpoints/reflection_v1_test.py @@ -65,7 +65,7 @@ async def asgi_client(mock_registry: MagicMock) -> AsyncIterator[AsyncClient]: An AsyncClient configured to make requests to the test ASGI app. """ app = create_reflection_asgi_app(mock_registry) - transport = ASGITransport(app=app) # type: ignore[arg-type] + transport = ASGITransport(app=app) client = AsyncClient(transport=transport, base_url='http://test') try: yield client @@ -95,7 +95,12 @@ async def mock_list_actions_async(allowed_kinds: list[ActionKind] | None = None) ) ] + # Mock resolve_actions_by_kind to return empty dict (no registered actions in this test) + async def mock_resolve_actions_by_kind(kind: ActionKind) -> dict: + return {} + mock_registry.list_actions = mock_list_actions_async + mock_registry.resolve_actions_by_kind = mock_resolve_actions_by_kind response = await asgi_client.get('/api/actions') assert response.status_code == 200 result = response.json() diff --git a/py/pyproject.toml b/py/pyproject.toml index 6d718aef8d..cbd6260b0d 100644 --- a/py/pyproject.toml +++ b/py/pyproject.toml @@ -75,7 +75,7 @@ dev = [ lint = [ "bandit>=1.7.0", "deptry>=0.22.0", - "litestar>=2.0.0", # For web/typing.py type resolution + "litestar>=2.0.0", # For web/typing.py type resolution "mypy>=1.14.0", "pip-audit>=2.7.0", "pypdf>=6.6.2",