Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 10 additions & 8 deletions surfsense_backend/app/app.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import asyncio
import gc
import logging
import time
from collections import defaultdict
Expand Down Expand Up @@ -212,27 +213,28 @@ def _enable_slow_callback_logging(threshold_sec: float = 0.5) -> None:

@asynccontextmanager
async def lifespan(app: FastAPI):
# Enable slow-callback detection (set PERF_DEBUG=1 env var to activate)
# Tune GC: lower gen-2 threshold so long-lived garbage is collected
# sooner (default 700/10/10 → 700/10/5). This reduces peak RSS
# with minimal CPU overhead.
gc.set_threshold(700, 10, 5)

_enable_slow_callback_logging(threshold_sec=0.5)
# Not needed if you setup a migration system like Alembic
await create_db_and_tables()
# Setup LangGraph checkpointer tables for conversation persistence
await setup_checkpointer_tables()
# Initialize LLM Router for Auto mode load balancing
initialize_llm_router()
# Initialize Image Generation Router for Auto mode load balancing
initialize_image_gen_router()
# Seed Surfsense documentation (with timeout so a slow embedding API
# doesn't block startup indefinitely and make the container unresponsive)
try:
await asyncio.wait_for(seed_surfsense_docs(), timeout=120)
except TimeoutError:
logging.getLogger(__name__).warning(
"Surfsense docs seeding timed out after 120s — skipping. "
"Docs will be indexed on the next restart."
)

log_system_snapshot("startup_complete")

yield
# Cleanup: close checkpointer connection on shutdown

await close_checkpointer()


Expand Down
9 changes: 8 additions & 1 deletion surfsense_backend/app/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -1856,7 +1856,14 @@ def is_valid(self) -> bool:
return not self.is_expired and not self.is_revoked


engine = create_async_engine(DATABASE_URL)
engine = create_async_engine(
DATABASE_URL,
pool_size=30,
max_overflow=150,
pool_recycle=1800,
pool_pre_ping=True,
pool_timeout=30,
)
async_session_maker = async_sessionmaker(engine, expire_on_commit=False)


Expand Down
115 changes: 53 additions & 62 deletions surfsense_backend/app/routes/documents_routes.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,8 @@ async def create_documents_file_upload(

Requires DOCUMENTS_CREATE permission.
"""
import os
import tempfile
from datetime import datetime

from app.db import DocumentStatus
Expand All @@ -143,7 +145,6 @@ async def create_documents_file_upload(
from app.utils.document_converters import generate_unique_identifier_hash

try:
# Check permission
await check_permission(
session,
user,
Expand Down Expand Up @@ -179,69 +180,64 @@ async def create_documents_file_upload(
f"exceeds the {MAX_TOTAL_SIZE_BYTES // (1024 * 1024)} MB limit.",
)

# ===== Read all files concurrently to avoid blocking the event loop =====
async def _read_and_save(file: UploadFile) -> tuple[str, str, int]:
"""Read upload content and write to temp file off the event loop."""
content = await file.read()
file_size = len(content)
filename = file.filename or "unknown"

if file_size > MAX_FILE_SIZE_BYTES:
raise HTTPException(
status_code=413,
detail=f"File '{filename}' ({file_size / (1024 * 1024):.1f} MB) "
f"exceeds the {MAX_FILE_SIZE_BYTES // (1024 * 1024)} MB per-file limit.",
)

def _write_temp() -> str:
with tempfile.NamedTemporaryFile(
delete=False, suffix=os.path.splitext(filename)[1]
) as tmp:
tmp.write(content)
return tmp.name

temp_path = await asyncio.to_thread(_write_temp)
return temp_path, filename, file_size

saved_files = await asyncio.gather(*(_read_and_save(f) for f in files))

actual_total_size = sum(size for _, _, size in saved_files)
if actual_total_size > MAX_TOTAL_SIZE_BYTES:
for temp_path, _, _ in saved_files:
os.unlink(temp_path)
raise HTTPException(
status_code=413,
detail=f"Total upload size ({actual_total_size / (1024 * 1024):.1f} MB) "
f"exceeds the {MAX_TOTAL_SIZE_BYTES // (1024 * 1024)} MB limit.",
)

# ===== PHASE 1: Create pending documents for all files =====
created_documents: list[Document] = []
files_to_process: list[
tuple[Document, str, str]
] = [] # (document, temp_path, filename)
files_to_process: list[tuple[Document, str, str]] = []
skipped_duplicates = 0
duplicate_document_ids: list[int] = []
actual_total_size = 0

# ===== PHASE 1: Create pending documents for all files =====
# This makes ALL documents visible in the UI immediately with pending status
for file in files:
for temp_path, filename, file_size in saved_files:
try:
import os
import tempfile

# Save file to temp location
with tempfile.NamedTemporaryFile(
delete=False, suffix=os.path.splitext(file.filename or "")[1]
) as temp_file:
temp_path = temp_file.name

content = await file.read()
file_size = len(content)

if file_size > MAX_FILE_SIZE_BYTES:
os.unlink(temp_path)
raise HTTPException(
status_code=413,
detail=f"File '{file.filename}' ({file_size / (1024 * 1024):.1f} MB) "
f"exceeds the {MAX_FILE_SIZE_BYTES // (1024 * 1024)} MB per-file limit.",
)

actual_total_size += file_size
if actual_total_size > MAX_TOTAL_SIZE_BYTES:
os.unlink(temp_path)
raise HTTPException(
status_code=413,
detail=f"Total upload size ({actual_total_size / (1024 * 1024):.1f} MB) "
f"exceeds the {MAX_TOTAL_SIZE_BYTES // (1024 * 1024)} MB limit.",
)

with open(temp_path, "wb") as f:
f.write(content)

# Generate unique identifier for deduplication check
unique_identifier_hash = generate_unique_identifier_hash(
DocumentType.FILE, file.filename or "unknown", search_space_id
DocumentType.FILE, filename, search_space_id
)

# Check if document already exists (by unique identifier)
existing = await check_document_by_unique_identifier(
session, unique_identifier_hash
)
if existing:
if DocumentStatus.is_state(existing.status, DocumentStatus.READY):
# True duplicate — content already indexed, skip
os.unlink(temp_path)
skipped_duplicates += 1
duplicate_document_ids.append(existing.id)
continue

# Existing document is stuck (failed/pending/processing)
# Reset it to pending and re-dispatch for processing
existing.status = DocumentStatus.pending()
existing.content = "Processing..."
existing.document_metadata = {
Expand All @@ -251,50 +247,45 @@ async def create_documents_file_upload(
}
existing.updated_at = get_current_timestamp()
created_documents.append(existing)
files_to_process.append(
(existing, temp_path, file.filename or "unknown")
)
files_to_process.append((existing, temp_path, filename))
continue

# Create pending document (visible immediately in UI via ElectricSQL)
document = Document(
search_space_id=search_space_id,
title=file.filename or "Uploaded File",
title=filename if filename != "unknown" else "Uploaded File",
document_type=DocumentType.FILE,
document_metadata={
"FILE_NAME": file.filename,
"FILE_NAME": filename,
"file_size": file_size,
"upload_time": datetime.now().isoformat(),
},
content="Processing...", # Placeholder until processed
content_hash=unique_identifier_hash, # Temporary, updated when ready
content="Processing...",
content_hash=unique_identifier_hash,
unique_identifier_hash=unique_identifier_hash,
embedding=None,
status=DocumentStatus.pending(), # Shows "pending" in UI
status=DocumentStatus.pending(),
updated_at=get_current_timestamp(),
created_by_id=str(user.id),
)
session.add(document)
created_documents.append(document)
files_to_process.append(
(document, temp_path, file.filename or "unknown")
)
files_to_process.append((document, temp_path, filename))

except HTTPException:
raise
except Exception as e:
os.unlink(temp_path)
raise HTTPException(
status_code=422,
detail=f"Failed to process file {file.filename}: {e!s}",
detail=f"Failed to process file {filename}: {e!s}",
) from e

# Commit all pending documents - they appear in UI immediately via ElectricSQL
if created_documents:
await session.commit()
# Refresh to get generated IDs
for doc in created_documents:
await session.refresh(doc)

# ===== PHASE 2: Dispatch tasks for each file =====
# Each task will update document status: pending → processing → ready/failed
for document, temp_path, filename in files_to_process:
await dispatcher.dispatch_file_processing(
document_id=document.id,
Expand Down
4 changes: 4 additions & 0 deletions surfsense_backend/app/services/llm_router_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import time
from typing import Any

import litellm
from langchain_core.callbacks import CallbackManagerForLLMRun
from langchain_core.exceptions import ContextOverflowError
from langchain_core.language_models import BaseChatModel
Expand All @@ -29,6 +30,9 @@

from app.utils.perf import get_perf_logger

litellm.json_logs = False
litellm.store_audit_logs = False

logger = logging.getLogger(__name__)

_CONTEXT_OVERFLOW_PATTERNS = re.compile(
Expand Down
7 changes: 7 additions & 0 deletions surfsense_backend/app/services/llm_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,13 @@
# Configure litellm to automatically drop unsupported parameters
litellm.drop_params = True

# Memory controls: prevent unbounded internal accumulation
litellm.telemetry = False
litellm.cache = None
litellm.success_callback = []
litellm.failure_callback = []
litellm.input_callback = []

logger = logging.getLogger(__name__)


Expand Down
27 changes: 27 additions & 0 deletions surfsense_backend/app/tasks/celery_tasks/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1,28 @@
"""Celery tasks package."""

from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine
from sqlalchemy.pool import NullPool

from app.config import config

_celery_engine = None
_celery_session_maker = None


def get_celery_session_maker() -> async_sessionmaker:
"""Return a shared async session maker for Celery tasks.

A single NullPool engine is created per worker process and reused
across all task invocations to avoid leaking engine objects.
"""
global _celery_engine, _celery_session_maker
if _celery_session_maker is None:
_celery_engine = create_async_engine(
config.DATABASE_URL,
poolclass=NullPool,
echo=False,
)
_celery_session_maker = async_sessionmaker(
_celery_engine, expire_on_commit=False
)
return _celery_session_maker
19 changes: 1 addition & 18 deletions surfsense_backend/app/tasks/celery_tasks/connector_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,8 @@
import logging
import traceback

from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine
from sqlalchemy.pool import NullPool

from app.celery_app import celery_app
from app.config import config
from app.tasks.celery_tasks import get_celery_session_maker

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -42,20 +39,6 @@ def _handle_greenlet_error(e: Exception, task_name: str, connector_id: int) -> N
)


def get_celery_session_maker():
"""
Create a new async session maker for Celery tasks.
This is necessary because Celery tasks run in a new event loop,
and the default session maker is bound to the main app's event loop.
"""
engine = create_async_engine(
config.DATABASE_URL,
poolclass=NullPool, # Don't use connection pooling for Celery tasks
echo=False,
)
return async_sessionmaker(engine, expire_on_commit=False)


@celery_app.task(name="index_slack_messages", bind=True)
def index_slack_messages_task(
self,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,13 @@

from sqlalchemy import delete, select
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine
from sqlalchemy.orm import selectinload
from sqlalchemy.pool import NullPool

from app.celery_app import celery_app
from app.config import config
from app.db import Document
from app.services.llm_service import get_user_long_context_llm
from app.services.task_logging_service import TaskLoggingService
from app.tasks.celery_tasks import get_celery_session_maker
from app.utils.document_converters import (
create_document_chunks,
generate_document_summary,
Expand All @@ -21,16 +19,6 @@
logger = logging.getLogger(__name__)


def get_celery_session_maker():
"""Create async session maker for Celery tasks."""
engine = create_async_engine(
config.DATABASE_URL,
poolclass=NullPool,
echo=False,
)
return async_sessionmaker(engine, expire_on_commit=False)


@celery_app.task(name="reindex_document", bind=True)
def reindex_document_task(self, document_id: int, user_id: str):
"""
Expand Down
Loading
Loading