Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
376 changes: 159 additions & 217 deletions CLAUDE.md

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions src/cache_sync_manager.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import os
import logging
import asyncio
import tempfile
from datetime import datetime
from pathlib import Path
from typing import Optional
from rp_logger_adapter import get_flash_logger
from constants import NAMESPACE, CACHE_DIR, VOLUME_CACHE_PATH
from subprocess_utils import run_logged_subprocess

Expand All @@ -13,7 +13,7 @@ class CacheSyncManager:
"""Manages async fire-and-forget cache synchronization to network volume."""

def __init__(self):
self.logger = logging.getLogger(f"{NAMESPACE}.{__name__.split('.')[-1]}")
self.logger = get_flash_logger(f"{NAMESPACE}.{__name__.split('.')[-1]}")
self._should_sync_cached: Optional[bool] = None
self._endpoint_id = os.environ.get("RUNPOD_ENDPOINT_ID")
self._baseline_time: Optional[float] = None
Expand Down
4 changes: 2 additions & 2 deletions src/dependency_installer.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import os
import logging
import asyncio
import platform
from typing import List

from runpod_flash.protos.remote_execution import FunctionResponse
from rp_logger_adapter import get_flash_logger
from constants import LARGE_SYSTEM_PACKAGES, NAMESPACE
from subprocess_utils import run_logged_subprocess

Expand All @@ -13,7 +13,7 @@ class DependencyInstaller:
"""Handles installation of system and Python dependencies."""

def __init__(self):
self.logger = logging.getLogger(f"{NAMESPACE}.{__name__.split('.')[-1]}")
self.logger = get_flash_logger(f"{NAMESPACE}.{__name__.split('.')[-1]}")
self._nala_available = None # Cache nala availability check
self._is_docker = None # Cache Docker environment detection

Expand Down
4 changes: 2 additions & 2 deletions src/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@

from runpod_flash.protos.remote_execution import FunctionRequest, FunctionResponse
from remote_executor import RemoteExecutor
from logger import setup_logging
from rp_logger_adapter import setup_flash_logging
from unpack_volume import maybe_unpack

# Initialize logging configuration
setup_logging()
setup_flash_logging()

# Unpack Flash deployment artifacts if running in Flash mode
# This is a no-op for Live Serverless and local development
Expand Down
10 changes: 7 additions & 3 deletions src/lb_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,16 @@

from fastapi import FastAPI

from logger import setup_logging
from rp_logger_adapter import setup_flash_logging, get_flash_logger
from unpack_volume import maybe_unpack

# Suppress noisy third-party loggers (matches runpod-python pattern)
logging.getLogger("urllib3").setLevel(logging.WARNING)
logging.getLogger("uvicorn").setLevel(logging.WARNING)

# Initialize logging configuration
setup_logging()
logger = logging.getLogger(__name__)
setup_flash_logging()
logger = get_flash_logger(__name__)
Comment on lines +37 to +38
Copy link

Copilot AI Feb 13, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The PR description mentions "Suppress urllib3 and uvicorn loggers to reduce console noise" and "Set to WARNING level in lb_handler at module import time", but this functionality is not implemented in the code changes. There is no code that suppresses these loggers or sets their levels to WARNING.

Copilot uses AI. Check for mistakes.

# Unpack Flash deployment artifacts if running in Flash mode
# This is a no-op for Live Serverless and local development
Expand Down
166 changes: 86 additions & 80 deletions src/log_streamer.py
Original file line number Diff line number Diff line change
@@ -1,23 +1,80 @@
"""
Centralized log streaming system for capturing and streaming logs to FunctionResponse.stdout.
Centralized log streaming system for capturing stdout to FunctionResponse.stdout.

This module provides thread-safe log buffering and streaming capabilities to ensure
This module provides thread-safe output buffering and streaming capabilities to ensure
all system logs (dependency installation, workspace setup, etc.) are visible in the
remote execution response.
remote execution response. It captures stdout directly rather than using logging handlers,
since RunPodLogger uses print() internally.
"""

import logging
import sys
import threading
from collections import deque
from typing import Optional, Deque, Callable

from logger import get_log_format

class LogCapturingWriter:
    """
    Write-through stdout wrapper that captures output while maintaining console visibility.

    This class intercepts stdout writes, buffers complete lines, and forwards all output
    to the original stdout. Attributes not defined here (e.g. ``encoding``, ``buffer``,
    ``fileno``, ``writelines``) are delegated to the wrapped stream so code that probes
    ``sys.stdout`` for file-object features keeps working after the swap.
    """

    def __init__(self, original_stdout, log_streamer: "LogStreamer"):
        """
        Initialize the capturing writer.

        Args:
            original_stdout: The original sys.stdout
            log_streamer: The LogStreamer instance to buffer lines to
        """
        self.original_stdout = original_stdout
        self.log_streamer = log_streamer
        self._line_buffer = ""  # trailing partial (newline-less) text awaiting completion
        self._lock = threading.Lock()  # serializes concurrent writers

    def write(self, text: str) -> int:
        """
        Write text to stdout, capturing complete lines.

        Args:
            text: Text to write

        Returns:
            Number of characters written
        """
        with self._lock:
            # Write to original stdout immediately (write-through)
            self.original_stdout.write(text)

            # Buffer incomplete lines
            self._line_buffer += text

            # Hand each complete line to the streamer; keep the remainder buffered
            while "\n" in self._line_buffer:
                line, self._line_buffer = self._line_buffer.split("\n", 1)
                if line:  # Don't add empty lines
                    self.log_streamer.add_log_entry(line)

            return len(text)

    def flush(self) -> None:
        """Flush the underlying stdout (any trailing partial line stays buffered)."""
        with self._lock:
            self.original_stdout.flush()

    def isatty(self) -> bool:
        """Check if original stdout is a TTY."""
        try:
            return bool(self.original_stdout.isatty())
        except (AttributeError, TypeError):
            return False

    def __getattr__(self, name):
        """
        Delegate any other attribute access to the wrapped stream.

        Only invoked when normal lookup fails, so it never shadows the
        methods defined above. Keeps consumers that expect a full
        file-like object (encoding, fileno, writelines, ...) working.
        """
        return getattr(self.original_stdout, name)


class LogStreamer:
"""
Thread-safe log streaming system that captures logs and makes them available
for streaming to FunctionResponse.stdout.
Thread-safe log streaming system that captures stdout and buffers complete lines.
"""

def __init__(self, max_buffer_size: int = 1000):
Expand All @@ -29,69 +86,53 @@ def __init__(self, max_buffer_size: int = 1000):
"""
self._buffer: Deque[str] = deque(maxlen=max_buffer_size)
self._lock = threading.Lock()
self._handler: Optional[StreamingHandler] = None
self._original_level: Optional[int] = None
self._writer: Optional[LogCapturingWriter] = None
self._original_stdout: Optional[object] = None
self._callback: Optional[Callable[[str], None]] = None

def start_streaming(
self,
level: int = logging.INFO,
level: int = 20, # INFO level (unused, kept for compatibility)
callback: Optional[Callable[[str], None]] = None,
) -> None:
"""
Start capturing logs and streaming them to buffer.
Start capturing stdout.

Args:
level: Minimum log level to capture (DEBUG, INFO, WARNING, ERROR)
callback: Optional callback function called for each log entry
level: Log level (unused, kept for compatibility with previous API)
callback: Optional callback function called for each log line
"""
with self._lock:
if self._handler is not None:
if self._writer is not None:
return # Already streaming

self._callback = callback

# Create and configure streaming handler
self._handler = StreamingHandler(self)
self._handler.setLevel(level)

# Use same format as main logging
formatter = logging.Formatter(get_log_format(level))
self._handler.setFormatter(formatter)

# Add to root logger
root_logger = logging.getLogger()
self._original_level = root_logger.level
root_logger.addHandler(self._handler)

# Ensure we capture logs at the requested level
if root_logger.level > level:
root_logger.setLevel(level)
# Save original stdout and replace with capturing writer
self._original_stdout = sys.stdout
self._writer = LogCapturingWriter(self._original_stdout, self)
sys.stdout = self._writer

def stop_streaming(self) -> None:
"""Stop capturing logs and clean up handler."""
"""Stop capturing stdout and restore original."""
with self._lock:
if self._handler is None:
if self._writer is None:
return # Not streaming

# Remove handler from root logger
root_logger = logging.getLogger()
root_logger.removeHandler(self._handler)

# Restore original log level
if self._original_level is not None:
root_logger.setLevel(self._original_level)
# Restore original stdout
if self._original_stdout is not None:
sys.stdout = self._original_stdout

self._handler = None
self._original_level = None
self._writer = None
self._original_stdout = None
self._callback = None

def add_log_entry(self, log_entry: str) -> None:
"""
Add a log entry to the buffer.

Args:
log_entry: Formatted log entry to add
log_entry: Complete log line to add
"""
with self._lock:
self._buffer.append(log_entry)
Expand Down Expand Up @@ -141,41 +182,6 @@ def has_logs(self) -> bool:
return len(self._buffer) > 0


class StreamingHandler(logging.Handler):
    """Logging handler that forwards each formatted record to a LogStreamer buffer."""

    def __init__(self, log_streamer: LogStreamer):
        """
        Initialize the streaming handler.

        Args:
            log_streamer: LogStreamer instance to send logs to
        """
        super().__init__()
        self.log_streamer = log_streamer

    def emit(self, record: logging.LogRecord) -> None:
        """
        Emit a log record to the log streamer.

        Args:
            record: The log record to emit
        """
        try:
            # Format once and push straight into the streamer's buffer
            self.log_streamer.add_log_entry(self.format(record))
        except Exception:
            # Per Python logging convention, a failing handler must not
            # propagate — report via handleError instead
            self.handleError(record)


# Global log streamer instance for convenience
_global_streamer: Optional[LogStreamer] = None
_streamer_lock = threading.Lock()
Expand All @@ -197,14 +203,14 @@ def get_global_log_streamer() -> LogStreamer:


def start_log_streaming(
level: int = logging.INFO, callback: Optional[Callable[[str], None]] = None
level: int = 20, callback: Optional[Callable[[str], None]] = None
) -> LogStreamer:
"""
Convenience function to start log streaming with the global streamer.

Args:
level: Minimum log level to capture
callback: Optional callback for each log entry
level: Minimum log level (unused, kept for compatibility)
callback: Optional callback for each log line

Returns:
The global LogStreamer instance
Expand Down
Loading
Loading