|
3 | 3 | This handler provides a FastAPI application for the Load Balancer runtime. |
4 | 4 | It supports: |
5 | 5 | - /ping: Health check endpoint (required by RunPod Load Balancer) |
6 | | -- /execute: Remote function execution via HTTP POST (queue-based mode) |
7 | | -- User's FastAPI app routes (mothership mode) |
| 6 | +- /execute: Remote function execution via HTTP POST (QB endpoint mode) |
| 7 | +- User's FastAPI app routes (LB endpoint mode) |
8 | 8 |
|
9 | 9 | The handler uses worker-flash's RemoteExecutor for function execution. |
10 | 10 |
|
11 | | -Mothership Mode (FLASH_IS_MOTHERSHIP=true): |
12 | | -- Imports user's FastAPI application from FLASH_MAIN_FILE |
13 | | -- Loads the app object from FLASH_APP_VARIABLE |
| 11 | +LB Endpoint Mode (FLASH_ENDPOINT_TYPE=lb): |
| 12 | +- Auto-discovers generated handler from FLASH_RESOURCE_NAME |
| 13 | +- Loads handler_{resource_name}.py with FastAPI app |
14 | 14 | - Preserves all user routes and middleware |
15 | 15 | - Adds /ping health check endpoint |
16 | 16 |
|
17 | | -Queue-Based Mode (FLASH_IS_MOTHERSHIP not set or false): |
| 17 | +QB Endpoint Mode (FLASH_ENDPOINT_TYPE not set or not "lb"): |
18 | 18 | - Creates generic FastAPI app with /execute endpoint |
19 | 19 | - Uses RemoteExecutor for function execution |
20 | 20 | """ |
21 | 21 |
|
22 | 22 | import importlib.util |
23 | 23 | import logging |
24 | 24 | import os |
| 25 | +from pathlib import Path |
25 | 26 | from typing import Any, Dict |
26 | 27 |
|
27 | 28 | from fastapi import FastAPI |
|
42 | 43 | from runpod_flash.protos.remote_execution import FunctionRequest, FunctionResponse # noqa: E402 |
43 | 44 | from remote_executor import RemoteExecutor # noqa: E402 |
44 | 45 |
|
45 | | -# Determine mode based on environment variables |
46 | | -is_mothership = os.getenv("FLASH_IS_MOTHERSHIP") == "true" |
47 | 46 |
|
48 | | -if is_mothership: |
49 | | - # Mothership mode: Import user's FastAPI application |
50 | | - try: |
51 | | - main_file = os.getenv("FLASH_MAIN_FILE", "main.py") |
52 | | - app_variable = os.getenv("FLASH_APP_VARIABLE", "app") |
| 47 | +def _is_lb_endpoint() -> bool: |
| 48 | + """Determine if this endpoint runs in LB mode (serves user FastAPI routes).""" |
| 49 | + return os.getenv("FLASH_ENDPOINT_TYPE") == "lb" |
53 | 50 |
|
54 | | - logger.info(f"Mothership mode: Importing {app_variable} from {main_file}") |
55 | 51 |
|
56 | | - # Dynamic import of user's module |
57 | | - spec = importlib.util.spec_from_file_location("user_main", main_file) |
58 | | - if spec is None or spec.loader is None: |
59 | | - raise ImportError(f"Cannot find or load {main_file}") |
| 52 | +def _discover_lb_app(handler_dir: str = "/app") -> FastAPI: |
| 53 | + """Auto-discover and load the generated LB handler's FastAPI app. |
60 | 54 |
|
61 | | - user_module = importlib.util.module_from_spec(spec) |
62 | | - spec.loader.exec_module(user_module) |
| 55 | + Derives handler path from FLASH_RESOURCE_NAME and imports the module. |
63 | 56 |
|
64 | | - # Get the FastAPI app from user's module |
65 | | - if not hasattr(user_module, app_variable): |
66 | | - raise AttributeError(f"Module {main_file} does not have '{app_variable}' attribute") |
| 57 | + Args: |
| 58 | + handler_dir: Base directory for handler files (default /app). |
67 | 59 |
|
68 | | - app = getattr(user_module, app_variable) |
| 60 | + Returns: |
| 61 | + FastAPI app from the generated handler. |
69 | 62 |
|
70 | | - if not isinstance(app, FastAPI): |
71 | | - raise TypeError( |
72 | | - f"Expected FastAPI instance, got {type(app).__name__} for {app_variable}" |
73 | | - ) |
| 63 | + Raises: |
| 64 | + RuntimeError: If FLASH_RESOURCE_NAME is not set or resolves outside handler_dir. |
| 65 | + FileNotFoundError: If the handler file does not exist. |
| 66 | + ImportError: If the handler module cannot produce a valid spec. |
| 67 | + AttributeError: If the handler module lacks an 'app' attribute. |
| 68 | + TypeError: If the 'app' attribute is not a FastAPI instance. |
| 69 | + """ |
| 70 | + resource_name = os.getenv("FLASH_RESOURCE_NAME") |
| 71 | + if not resource_name: |
| 72 | + raise RuntimeError("FLASH_RESOURCE_NAME not set. Cannot discover generated LB handler.") |
| 73 | + |
| 74 | + handler_file = f"{handler_dir}/handler_{resource_name}.py" |
| 75 | + |
| 76 | + handler_path = Path(handler_file) |
| 77 | + if not handler_path.resolve().is_relative_to(Path(handler_dir).resolve()): |
| 78 | + raise RuntimeError(f"FLASH_RESOURCE_NAME '{resource_name}' resolves outside {handler_dir}") |
| 79 | + |
| 80 | + app_variable = "app" |
| 81 | + |
| 82 | + spec = importlib.util.spec_from_file_location("user_main", handler_file) |
| 83 | + if spec is None or spec.loader is None: |
| 84 | + raise ImportError(f"Cannot find or load {handler_file}") |
| 85 | + |
| 86 | + user_module = importlib.util.module_from_spec(spec) |
| 87 | + spec.loader.exec_module(user_module) |
74 | 88 |
|
75 | | - logger.info(f"Successfully imported FastAPI app '{app_variable}' from {main_file}") |
| 89 | + if not hasattr(user_module, app_variable): |
| 90 | + raise AttributeError(f"Module {handler_file} does not have '{app_variable}' attribute") |
| 91 | + |
| 92 | + discovered_app = getattr(user_module, app_variable) |
| 93 | + |
| 94 | + if not isinstance(discovered_app, FastAPI): |
| 95 | + raise TypeError( |
| 96 | + f"Expected FastAPI instance, got {type(discovered_app).__name__} for {app_variable}" |
| 97 | + ) |
| 98 | + |
| 99 | + return discovered_app |
| 100 | + |
| 101 | + |
| 102 | +is_lb_endpoint = _is_lb_endpoint() |
| 103 | + |
| 104 | +if is_lb_endpoint: |
| 105 | + # LB endpoint mode: Auto-discover generated handler from FLASH_RESOURCE_NAME |
| 106 | + try: |
| 107 | + app = _discover_lb_app() |
| 108 | + logger.info("Successfully imported FastAPI app for LB endpoint") |
76 | 109 |
|
77 | 110 | # Add /ping endpoint if not already present |
78 | | - # Check if /ping route already exists to avoid adding a duplicate health check endpoint |
79 | 111 | ping_exists = any(getattr(route, "path", None) == "/ping" for route in app.routes) |
80 | 112 |
|
81 | 113 | if not ping_exists: |
82 | 114 |
|
83 | 115 | @app.get("/ping") |
84 | | - async def ping_mothership() -> Dict[str, Any]: |
85 | | - """Health check endpoint for mothership (added by framework).""" |
| 116 | + async def ping_lb() -> Dict[str, Any]: |
| 117 | + """Health check endpoint for LB (added by framework).""" |
86 | 118 | return { |
87 | 119 | "status": "healthy", |
88 | | - "endpoint": "mothership", |
| 120 | + "endpoint": "lb", |
89 | 121 | "id": os.getenv("RUNPOD_ENDPOINT_ID", "unknown"), |
90 | 122 | } |
91 | 123 |
|
92 | 124 | logger.info("Added /ping endpoint to user's FastAPI app") |
93 | 125 |
|
94 | 126 | except Exception as error: |
95 | | - logger.error(f"Failed to initialize mothership mode: {error}", exc_info=True) |
| 127 | + logger.error("Failed to initialize LB endpoint mode: %s", error, exc_info=True) |
96 | 128 | raise |
97 | 129 |
|
98 | 130 | else: |
99 | 131 | # Queue-based mode: Create generic Load Balancer handler app |
100 | 132 | app = FastAPI(title="Load Balancer Handler") |
101 | | - logger.info("Queue-based mode: Using generic Load Balancer handler") |
| 133 | + logger.info("QB endpoint mode: Using generic Load Balancer handler") |
102 | 134 |
|
103 | 135 |
|
104 | 136 | # Queue-based mode endpoints |
105 | | -if not is_mothership: |
| 137 | +if not is_lb_endpoint: |
106 | 138 |
|
107 | 139 | @app.get("/ping") |
108 | 140 | async def ping() -> Dict[str, Any]: |
|
0 commit comments