Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
419 changes: 169 additions & 250 deletions CLAUDE.md

Large diffs are not rendered by default.

10 changes: 6 additions & 4 deletions docs/Runtime_Execution_Paths.md
Original file line number Diff line number Diff line change
Expand Up @@ -71,15 +71,17 @@ graph TB

The handler automatically detects the deployment mode using environment variables:

| Environment | RUNPOD_POD_ID | FLASH_* vars | Mode Detected |
|-------------|---------------|--------------|---------------|
| Environment | RUNPOD_ENDPOINT_ID | FLASH_* vars | Mode Detected |
|-------------|-------------------|--------------|---------------|
| Local dev | ❌ Not set | ❌ Not set | Live Serverless only |
| Live Serverless | ✅ Set | ❌ Not set | Live Serverless |
| Flash Mothership | ✅ Set | ✅ FLASH_IS_MOTHERSHIP=true | Flash Deployed |
| Flash LB Endpoint | ✅ Set | ✅ FLASH_ENDPOINT_TYPE=lb | Flash Deployed |
| Flash QB Endpoint | ✅ Set | ✅ FLASH_ENDPOINT_TYPE=qb | Flash Deployed |
| Flash Child | ✅ Set | ✅ FLASH_RESOURCE_NAME | Flash Deployed |

Flash-specific environment variables:
- `FLASH_IS_MOTHERSHIP=true` - Set for mothership endpoints
- `FLASH_ENDPOINT_TYPE=lb` - Set for load-balanced endpoints
- `FLASH_ENDPOINT_TYPE=qb` - Set for queue-based endpoints
- `FLASH_RESOURCE_NAME` - Specifies resource config name

## Request Format Differences
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ dependencies = [
"huggingface_hub>=0.32.0",
"fastapi>=0.115.0",
"uvicorn[standard]>=0.34.0",
"runpod-flash",
"runpod-flash>=1.4.0",
]

[dependency-groups]
Expand Down
4 changes: 2 additions & 2 deletions src/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,6 @@
"""Default timeout in seconds for cross-endpoint HTTP requests."""

DEFAULT_TARBALL_UNPACK_ATTEMPTS = 3
"""Number of times the mothership CPU will attempt to unpack the worker-flash tarball from mounted volume"""
"""Number of times the Flash-deployed endpoint will attempt to unpack the worker-flash tarball from mounted volume."""
DEFAULT_TARBALL_UNPACK_INTERVAL = 30
"""Time in seconds mothership CPU endpoint will wait between tarball unpack attempts"""
"""Time in seconds the Flash-deployed endpoint will wait between tarball unpack attempts."""
138 changes: 122 additions & 16 deletions src/handler.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
from typing import Dict, Any
import importlib.util
import logging
import os
from pathlib import Path
from typing import Any, Dict, Optional

from runpod_flash.protos.remote_execution import FunctionRequest, FunctionResponse
from remote_executor import RemoteExecutor
from logger import setup_logging
from unpack_volume import maybe_unpack

Expand All @@ -12,25 +14,129 @@
# This is a no-op for Live Serverless and local development
maybe_unpack()

logger = logging.getLogger(__name__)

async def handler(event: Dict[str, Any]) -> Dict[str, Any]:
"""
RunPod serverless function handler with dependency installation.

def _load_generated_handler() -> Optional[Any]:
"""Load Flash-generated handler if available (deployed QB mode).

Checks for a handler_<resource_name>.py file generated by the flash
build pipeline. These handlers accept plain JSON input without
FunctionRequest/cloudpickle serialization.

Returns:
Handler function if generated handler found, None otherwise.
"""
output: FunctionResponse
resource_name = os.getenv("FLASH_RESOURCE_NAME")
if not resource_name:
return None

handler_file = Path(f"/app/handler_{resource_name}.py")

if not handler_file.resolve().is_relative_to(Path("/app").resolve()):
logger.warning(
"FLASH_RESOURCE_NAME '%s' resolves outside /app. "
"Falling back to FunctionRequest handler.",
resource_name,
)
return None

if not handler_file.exists():
logger.warning(
"Generated handler file %s not found for resource '%s'. "
"The build artifact may be incomplete. "
"Falling back to FunctionRequest handler.",
handler_file,
resource_name,
)
return None

spec = importlib.util.spec_from_file_location(f"handler_{resource_name}", handler_file)
if not spec or not spec.loader:
logger.warning("Failed to create module spec for %s", handler_file)
return None

mod = importlib.util.module_from_spec(spec)
try:
executor = RemoteExecutor()
input_data = FunctionRequest(**event.get("input", {}))
output = await executor.ExecuteFunction(input_data)

except Exception as error:
output = FunctionResponse(
success=False,
error=f"Error in handler: {str(error)}",
spec.loader.exec_module(mod)
except ImportError as e:
logger.warning(
"Generated handler %s failed to import (missing dependency: %s). "
"Deploy with --use-local-flash to include latest runpod_flash. "
"Falling back to FunctionRequest handler.",
handler_file,
e,
)
return None
except SyntaxError as e:
logger.error(
"Generated handler %s has a syntax error: %s. "
"This indicates a bug in the flash build pipeline. "
"Falling back to FunctionRequest handler.",
handler_file,
e,
)
return None
except Exception as e:
logger.error(
"Generated handler %s failed to load unexpectedly: %s (%s). "
"Falling back to FunctionRequest handler.",
handler_file,
e,
type(e).__name__,
exc_info=True,
)
return None

generated = getattr(mod, "handler", None)
if generated is None:
logger.warning(
"Generated handler %s loaded but has no 'handler' attribute. "
"Ensure the flash build pipeline generates a 'handler' function. "
"Falling back to FunctionRequest handler.",
handler_file,
)
return None

if not callable(generated):
logger.warning(
"Generated handler %s has a 'handler' attribute but it is not callable (%s). "
"Falling back to FunctionRequest handler.",
handler_file,
type(generated).__name__,
)
return None

logger.info("Loaded generated handler from %s", handler_file)
return generated


# Try generated handler first (plain JSON mode for deployed QB endpoints)
_generated = _load_generated_handler()

if _generated:
handler = _generated
else:
# Fallback: original FunctionRequest handler (backward compatible)
from runpod_flash.protos.remote_execution import FunctionRequest, FunctionResponse
from remote_executor import RemoteExecutor

async def handler(event: Dict[str, Any]) -> Dict[str, Any]:
"""RunPod serverless function handler with dependency installation."""
output: FunctionResponse

try:
executor = RemoteExecutor()
input_data = FunctionRequest(**event.get("input", {}))
output = await executor.ExecuteFunction(input_data)

except Exception as error:
output = FunctionResponse(
success=False,
error=f"Error in handler: {str(error)}",
)

return output.model_dump() # type: ignore[no-any-return]
return output.model_dump() # type: ignore[no-any-return]


# Start the RunPod serverless handler (only available on RunPod platform)
Expand Down
104 changes: 68 additions & 36 deletions src/lb_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,25 +3,26 @@
This handler provides a FastAPI application for the Load Balancer runtime.
It supports:
- /ping: Health check endpoint (required by RunPod Load Balancer)
- /execute: Remote function execution via HTTP POST (queue-based mode)
- User's FastAPI app routes (mothership mode)
- /execute: Remote function execution via HTTP POST (QB endpoint mode)
- User's FastAPI app routes (LB endpoint mode)

The handler uses worker-flash's RemoteExecutor for function execution.

Mothership Mode (FLASH_IS_MOTHERSHIP=true):
- Imports user's FastAPI application from FLASH_MAIN_FILE
- Loads the app object from FLASH_APP_VARIABLE
LB Endpoint Mode (FLASH_ENDPOINT_TYPE=lb):
- Auto-discovers generated handler from FLASH_RESOURCE_NAME
- Loads handler_{resource_name}.py with FastAPI app
- Preserves all user routes and middleware
- Adds /ping health check endpoint

Queue-Based Mode (FLASH_IS_MOTHERSHIP not set or false):
QB Endpoint Mode (FLASH_ENDPOINT_TYPE not set or not "lb"):
- Creates generic FastAPI app with /execute endpoint
- Uses RemoteExecutor for function execution
"""

import importlib.util
import logging
import os
from pathlib import Path
from typing import Any, Dict

from fastapi import FastAPI
Expand All @@ -42,67 +43,98 @@
from runpod_flash.protos.remote_execution import FunctionRequest, FunctionResponse # noqa: E402
from remote_executor import RemoteExecutor # noqa: E402

# Determine mode based on environment variables
is_mothership = os.getenv("FLASH_IS_MOTHERSHIP") == "true"

if is_mothership:
# Mothership mode: Import user's FastAPI application
try:
main_file = os.getenv("FLASH_MAIN_FILE", "main.py")
app_variable = os.getenv("FLASH_APP_VARIABLE", "app")
def _is_lb_endpoint() -> bool:
"""Determine if this endpoint runs in LB mode (serves user FastAPI routes)."""
return os.getenv("FLASH_ENDPOINT_TYPE") == "lb"

logger.info(f"Mothership mode: Importing {app_variable} from {main_file}")

# Dynamic import of user's module
spec = importlib.util.spec_from_file_location("user_main", main_file)
if spec is None or spec.loader is None:
raise ImportError(f"Cannot find or load {main_file}")
def _discover_lb_app(handler_dir: str = "/app") -> FastAPI:
"""Auto-discover and load the generated LB handler's FastAPI app.

user_module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(user_module)
Derives handler path from FLASH_RESOURCE_NAME and imports the module.

# Get the FastAPI app from user's module
if not hasattr(user_module, app_variable):
raise AttributeError(f"Module {main_file} does not have '{app_variable}' attribute")
Args:
handler_dir: Base directory for handler files (default /app).

app = getattr(user_module, app_variable)
Returns:
FastAPI app from the generated handler.

if not isinstance(app, FastAPI):
raise TypeError(
f"Expected FastAPI instance, got {type(app).__name__} for {app_variable}"
)
Raises:
RuntimeError: If FLASH_RESOURCE_NAME is not set or resolves outside handler_dir.
FileNotFoundError: If the handler file does not exist.
ImportError: If the handler module cannot produce a valid spec.
AttributeError: If the handler module lacks an 'app' attribute.
TypeError: If the 'app' attribute is not a FastAPI instance.
"""
resource_name = os.getenv("FLASH_RESOURCE_NAME")
if not resource_name:
raise RuntimeError("FLASH_RESOURCE_NAME not set. Cannot discover generated LB handler.")

handler_file = f"{handler_dir}/handler_{resource_name}.py"

handler_path = Path(handler_file)
if not handler_path.resolve().is_relative_to(Path(handler_dir).resolve()):
raise RuntimeError(f"FLASH_RESOURCE_NAME '{resource_name}' resolves outside {handler_dir}")

app_variable = "app"

spec = importlib.util.spec_from_file_location("user_main", handler_file)
if spec is None or spec.loader is None:
raise ImportError(f"Cannot find or load {handler_file}")

user_module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(user_module)

logger.info(f"Successfully imported FastAPI app '{app_variable}' from {main_file}")
if not hasattr(user_module, app_variable):
raise AttributeError(f"Module {handler_file} does not have '{app_variable}' attribute")

discovered_app = getattr(user_module, app_variable)

if not isinstance(discovered_app, FastAPI):
raise TypeError(
f"Expected FastAPI instance, got {type(discovered_app).__name__} for {app_variable}"
)

return discovered_app


is_lb_endpoint = _is_lb_endpoint()

if is_lb_endpoint:
# LB endpoint mode: Auto-discover generated handler from FLASH_RESOURCE_NAME
try:
app = _discover_lb_app()
logger.info("Successfully imported FastAPI app for LB endpoint")

# Add /ping endpoint if not already present
# Check if /ping route already exists to avoid adding a duplicate health check endpoint
ping_exists = any(getattr(route, "path", None) == "/ping" for route in app.routes)

if not ping_exists:

@app.get("/ping")
async def ping_mothership() -> Dict[str, Any]:
"""Health check endpoint for mothership (added by framework)."""
async def ping_lb() -> Dict[str, Any]:
"""Health check endpoint for LB (added by framework)."""
return {
"status": "healthy",
"endpoint": "mothership",
"endpoint": "lb",
"id": os.getenv("RUNPOD_ENDPOINT_ID", "unknown"),
}

logger.info("Added /ping endpoint to user's FastAPI app")

except Exception as error:
logger.error(f"Failed to initialize mothership mode: {error}", exc_info=True)
logger.error("Failed to initialize LB endpoint mode: %s", error, exc_info=True)
raise

else:
# Queue-based mode: Create generic Load Balancer handler app
app = FastAPI(title="Load Balancer Handler")
logger.info("Queue-based mode: Using generic Load Balancer handler")
logger.info("QB endpoint mode: Using generic Load Balancer handler")


# Queue-based mode endpoints
if not is_mothership:
if not is_lb_endpoint:

@app.get("/ping")
async def ping() -> Dict[str, Any]:
Expand Down
17 changes: 13 additions & 4 deletions src/manifest_reconciliation.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ def is_flash_deployment() -> bool:
endpoint_id = os.getenv("RUNPOD_ENDPOINT_ID")
is_flash = any(
[
os.getenv("FLASH_IS_MOTHERSHIP") == "true",
os.getenv("FLASH_ENDPOINT_TYPE") in ("lb", "qb"),
os.getenv("FLASH_RESOURCE_NAME"),
]
)
Expand Down Expand Up @@ -78,8 +78,9 @@ def _is_manifest_stale(
if is_stale:
logger.debug(f"Manifest is stale: {age_seconds:.0f}s old (TTL: {ttl_seconds}s)")
return is_stale
except OSError:
return True # Error reading file, consider stale
except OSError as e:
logger.debug("Cannot stat manifest file %s: %s. Treating as stale.", manifest_path, e)
return True


async def _fetch_and_save_manifest(
Expand Down Expand Up @@ -112,8 +113,16 @@ async def _fetch_and_save_manifest(
logger.info("Manifest refreshed from State Manager")
return True

except (OSError, ConnectionError, TimeoutError) as e:
logger.warning("Failed to refresh manifest from State Manager: %s", e)
return False
except Exception as e:
logger.warning(f"Failed to refresh manifest from State Manager: {e}")
logger.error(
"Unexpected error refreshing manifest from State Manager: %s (%s)",
e,
type(e).__name__,
exc_info=True,
)
return False


Expand Down
Loading
Loading