Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion backend/AGENTS.md
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ Never block the event loop — it freezes health checks, HPA scaling, and all co
- `stripe_executor` (4w) — Stripe API calls
- `sync_executor` (16w) — sync endpoint pipeline work, parent calls that fan out to storage_executor
- `postprocess_executor` (24w) — post-conversation processing, coordinator functions
- `storage_executor` (96w) — GCS uploads/downloads, audio chunk I/O (fan-out gated by semaphores: 32 global chunks, 8 per-call window, 4 concurrent precache files)
- `storage_executor` (128w) — GCS uploads/downloads, audio chunk I/O (fan-out gated by semaphores: 32 global chunks, 8 per-call window, 4 concurrent precache files)
- **Deadlock prevention — 4 rules:**
1. **Worker threads are leaf operations only.** Never `.result()` on another pool from inside a worker thread. If pool A thread submits to pool B and calls `.result()`, and vice versa, both pools deadlock.
2. **Orchestration stays in async code.** The async handler coordinates via `await run_blocking(pool, fn)` — sequentially or with `asyncio.gather`. The event loop never blocks, pools stay independent.
Expand Down
51 changes: 38 additions & 13 deletions backend/tests/unit/test_action_item_date_validation.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
import types
from datetime import datetime, timedelta, timezone
from pathlib import Path
from zoneinfo import ZoneInfo
from zoneinfo import ZoneInfo, ZoneInfoNotFoundError
from unittest.mock import MagicMock, patch

import pytest
Expand Down Expand Up @@ -62,6 +62,17 @@ def _load_module_from_file(module_name, file_path):
return mod


def _zoneinfo_for_test(key):
    """Resolve ``key`` to a tzinfo, tolerating a missing tz database.

    ``ZoneInfo(key)`` is tried first. If that lookup raises
    ``ZoneInfoNotFoundError`` or ``KeyError``, a fixed-offset stand-in is
    returned for the two zones these tests use ("UTC" and "Asia/Kolkata").
    For any other key the original lookup error is re-raised unchanged.
    """
    try:
        zone = ZoneInfo(key)
    except (ZoneInfoNotFoundError, KeyError):
        # Fixed-offset substitutes for the zones the tests rely on.
        if key == "UTC":
            return timezone.utc
        if key == "Asia/Kolkata":
            return timezone(timedelta(hours=5, minutes=30), key)
        # Unknown key: surface the lookup failure instead of guessing.
        raise
    else:
        return zone
Comment on lines +65 to +73

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Overly-broad exception catch in _zoneinfo_for_test

except Exception swallows any error that ZoneInfo might raise, not just the expected ZoneInfoNotFoundError/KeyError. If someone passes a known key like "UTC" but ZoneInfo fails for an unrelated reason (e.g., corrupted module state), the function silently returns timezone.utc instead of surfacing the real problem. The fallback raise at the bottom only helps for unknown keys. Using except (ZoneInfoNotFoundError, KeyError) (ZoneInfoNotFoundError is imported from zoneinfo and subclasses the builtin KeyError) would constrain the catch to the specific Windows/no-tzdata scenario this helper targets.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Addressed in fb41d75ab. _zoneinfo_for_test now imports ZoneInfoNotFoundError and only catches (ZoneInfoNotFoundError, KeyError), so unexpected ZoneInfo failures are no longer swallowed by the Windows fallback.

Revalidated on the Windows backend venv:

  • python -m pytest tests\unit\test_action_item_date_validation.py -q -> 26 passed
  • python -m black --line-length 120 --skip-string-normalization tests\unit\test_action_item_date_validation.py tests\unit\test_async_app_integrations.py --check
  • python -m py_compile tests\unit\test_action_item_date_validation.py tests\unit\test_async_app_integrations.py



# ---------------------------------------------------------------------------
# Stub heavy dependencies
# ---------------------------------------------------------------------------
Expand Down Expand Up @@ -100,13 +111,19 @@ def _load_module_from_file(module_name, file_path):
}
)
action_items_db.update_action_item = MagicMock(return_value=True)
_stub_package("database")
database_pkg = _stub_package("database")
database_pkg.__path__ = [str(BACKEND_DIR / "database")]
database_pkg.action_items = action_items_db

# Stub notifications
notif_mod = _stub_module("utils.notifications")
notif_mod.send_action_item_completed_notification = MagicMock()
notif_mod.send_action_item_created_notification = MagicMock()
notif_mod.send_action_item_data_message = MagicMock()
notif_mod.send_app_review_reply_notification = MagicMock()
notif_mod.send_new_app_review_notification = MagicMock()
notif_mod.send_notification = MagicMock()
notif_mod.sync_action_item_reminder = MagicMock()

# Stub langchain
langchain_core = _stub_package("langchain_core")
Expand Down Expand Up @@ -136,12 +153,17 @@ def fake_tool(func=None, **kwargs):
# Stub pydantic (already installed, just need BaseModel/Field accessible)
# pydantic is real, no stub needed

# Stub utils packages
_stub_package("utils")
_stub_package("utils.retrieval")
_stub_package("utils.retrieval.tools")
_stub_package("utils.llm")
_stub_package("utils.conversations")
# Stub utils packages while preserving imports for unstubbed submodules in later tests.
utils_pkg = _stub_package("utils")
utils_pkg.__path__ = [str(BACKEND_DIR / "utils")]
retrieval_pkg = _stub_package("utils.retrieval")
retrieval_pkg.__path__ = [str(BACKEND_DIR / "utils" / "retrieval")]
tools_pkg = _stub_package("utils.retrieval.tools")
tools_pkg.__path__ = [str(BACKEND_DIR / "utils" / "retrieval" / "tools")]
llm_pkg = _stub_package("utils.llm")
llm_pkg.__path__ = [str(BACKEND_DIR / "utils" / "llm")]
conversations_pkg = _stub_package("utils.conversations")
conversations_pkg.__path__ = [str(BACKEND_DIR / "utils" / "conversations")]

# Stub utils.retrieval.agentic
import contextvars
Expand All @@ -160,6 +182,7 @@ def fake_tool(func=None, **kwargs):
llm_clients_stub.llm_high = MagicMock()
llm_clients_stub.llm_medium_experiment = MagicMock()
llm_clients_stub.get_llm = MagicMock(return_value=MagicMock())
llm_clients_stub.embeddings = MagicMock()

# Load models first
_stub_package("models")
Expand Down Expand Up @@ -583,7 +606,9 @@ def _run(self, due_at, tz):
mock_llm.__or__ = MagicMock(return_value=mock_chain)
with patch.object(conv_proc, 'get_llm', return_value=mock_llm), patch.object(
conv_proc, 'PydanticOutputParser'
) as mock_parser_cls, patch.object(conv_proc, 'ChatPromptTemplate') as mock_prompt_cls:
) as mock_parser_cls, patch.object(conv_proc, 'ChatPromptTemplate') as mock_prompt_cls, patch.object(
conv_proc, 'ZoneInfo', side_effect=_zoneinfo_for_test
):
mock_parser = MagicMock()
mock_parser.get_format_instructions.return_value = "format"
mock_parser_cls.return_value = mock_parser
Expand All @@ -600,9 +625,9 @@ def _run(self, due_at, tz):

def test_naive_local_due_converted_to_utc_for_ist(self):
# LLM emits naive local 10:00 IST tomorrow -> server stores 04:30 UTC (the #7059 bug).
naive_local = (datetime.now(timezone.utc).astimezone(ZoneInfo("Asia/Kolkata")) + timedelta(days=1)).replace(
hour=10, minute=0, second=0, microsecond=0, tzinfo=None
)
naive_local = (
datetime.now(timezone.utc).astimezone(_zoneinfo_for_test("Asia/Kolkata")) + timedelta(days=1)
).replace(hour=10, minute=0, second=0, microsecond=0, tzinfo=None)
result, _ = self._run(naive_local, tz="Asia/Kolkata")
assert len(result) == 1 and result[0].due_at is not None
due = result[0].due_at
Expand Down Expand Up @@ -646,7 +671,7 @@ def test_aware_datetime_is_normalized_to_utc(self):
assert d1 is not None and d1.utcoffset() == timedelta(0)
assert (d1.hour, d1.minute) == (4, 30)
# (b) aware +05:30 value is converted to UTC, not trusted as-is
aware_ist = base.replace(hour=10, minute=0, second=0, microsecond=0, tzinfo=ZoneInfo("Asia/Kolkata"))
aware_ist = base.replace(hour=10, minute=0, second=0, microsecond=0, tzinfo=_zoneinfo_for_test("Asia/Kolkata"))
r2, _ = self._run(aware_ist, tz="UTC")
d2 = r2[0].due_at
assert d2 is not None and d2.utcoffset() == timedelta(0)
Expand Down
6 changes: 6 additions & 0 deletions backend/tests/unit/test_apps_review_reply_validation.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,11 @@ def exec_module(self, module):
pass


_preserved_stub_modules = {
_n: sys.modules[_n] for _n in list(sys.modules) if any(_n == p or _n.startswith(p + '.') for p in _STUB)
}
for _n in _preserved_stub_modules:
sys.modules.pop(_n, None)
_finder = _Finder()
sys.meta_path.insert(0, _finder)
try:
Expand All @@ -73,6 +78,7 @@ def exec_module(self, module):
for _n in list(sys.modules):
if any(_n == p or _n.startswith(p + '.') for p in _STUB):
sys.modules.pop(_n, None)
sys.modules.update(_preserved_stub_modules)

from fastapi import HTTPException # noqa: E402

Expand Down
20 changes: 20 additions & 0 deletions backend/tests/unit/test_async_app_integrations.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import os
import sys
import types
import functools
from unittest.mock import MagicMock, AsyncMock, patch

import pytest
Expand Down Expand Up @@ -39,6 +40,7 @@
"llm_usage",
"chat",
"goals",
"webhook_health",
]:
mod = types.ModuleType(f"database.{submod}")
sys.modules.setdefault(f"database.{submod}", mod)
Expand All @@ -65,6 +67,11 @@
sys.modules["database.notifications"].get_mentor_notification_frequency = MagicMock(return_value=0)
sys.modules["database.conversations"].get_conversations_by_id = MagicMock(return_value=[])
sys.modules["database.goals"].get_user_goals = MagicMock(return_value=[])
sys.modules["database.users"].get_user_language_preference = MagicMock(return_value="en")
sys.modules["database.webhook_health"].record_app_webhook_failure = MagicMock(return_value=0)
sys.modules["database.webhook_health"].record_app_webhook_success = MagicMock()
sys.modules["database.webhook_health"].is_app_webhook_disabled = MagicMock(return_value=False)
sys.modules["database.webhook_health"].disable_app_in_firestore = MagicMock()

for name in [
"utils.apps",
Expand All @@ -78,12 +85,14 @@
"utils.mentor_notifications",
"utils.log_sanitizer",
"utils.http_client",
"utils.subscription",
]:
if name not in sys.modules:
sys.modules[name] = types.ModuleType(name)

sys.modules["utils.apps"].get_available_apps = MagicMock(return_value=[])
sys.modules["utils.notifications"].send_notification = MagicMock()
sys.modules["utils.subscription"].is_trial_paywalled = MagicMock(return_value=False)
sys.modules["utils.llm.clients"].generate_embedding = MagicMock(return_value=[0] * 3072)
sys.modules["utils.mentor_notifications"].process_mentor_notification = MagicMock(return_value=None)
sys.modules["utils.log_sanitizer"].sanitize = MagicMock(side_effect=lambda x: x)
Expand Down Expand Up @@ -131,6 +140,7 @@ def _noop_track(uid, feature):
_mock_cb.record_failure = MagicMock()
_http_mod.get_webhook_circuit_breaker = MagicMock(return_value=_mock_cb)
_http_mod.get_webhook_semaphore = MagicMock(return_value=_asyncio.Semaphore(64))
_http_mod.get_maps_semaphore = MagicMock(return_value=_asyncio.Semaphore(64))
_http_mod.latest_wins_start = MagicMock(return_value=1)
_http_mod.latest_wins_check = MagicMock(return_value=True)

Expand All @@ -140,12 +150,22 @@ def _noop_track(uid, feature):

if "utils.executors" not in sys.modules:
sys.modules["utils.executors"] = types.ModuleType("utils.executors")
sys.modules["utils.executors"].db_executor = _TPE(max_workers=2, thread_name_prefix="test-db")
sys.modules["utils.executors"].critical_executor = _TPE(max_workers=2, thread_name_prefix="test-critical")
sys.modules["utils.executors"].storage_executor = _TPE(max_workers=2, thread_name_prefix="test-storage")


async def _run_blocking(_executor, fn, *args, **kwargs):
    """Await ``fn(*args, **kwargs)`` executed through ``_executor``.

    The call is bound with ``functools.partial`` and handed to the running
    event loop's ``run_in_executor``; the callable's return value is returned.
    """
    event_loop = _asyncio.get_running_loop()
    # run_in_executor only forwards positional args, so bind kwargs up front.
    bound_call = functools.partial(fn, *args, **kwargs)
    return await event_loop.run_in_executor(_executor, bound_call)


sys.modules["utils.executors"].run_blocking = _run_blocking
Comment on lines +158 to +163

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 _run_blocking stub executes fn in the calling thread, not in the executor

The stub bypasses executor dispatch entirely. If fn is itself synchronous but relies on the GIL being released (e.g., for lock-ordering), or if future production code passes an async callable by mistake, the behaviour under the stub diverges silently from production. A slightly safer alternative is loop.run_in_executor(None, functools.partial(fn, *args, **kwargs)) — that preserves executor semantics by running fn on the event loop's default thread pool rather than on a dedicated executor. Minor for unit tests, but worth noting since run_blocking is a shared abstraction used across many production paths.

Note: If this suggestion doesn't match your team's coding style, reply to this and let me know. I'll remember it for next time!

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Addressed in fb41d75ab. The stub now preserves executor semantics with:

loop = _asyncio.get_running_loop()
return await loop.run_in_executor(_executor, functools.partial(fn, *args, **kwargs))

It also uses real ThreadPoolExecutor instances for the executor stubs so run_in_executor can call submit() normally.

Revalidated on the Windows backend venv:

  • python -m pytest tests\unit\test_async_app_integrations.py -q -> 9 passed
  • python -m black --line-length 120 --skip-string-normalization tests\unit\test_action_item_date_validation.py tests\unit\test_async_app_integrations.py --check
  • python -m py_compile tests\unit\test_action_item_date_validation.py tests\unit\test_async_app_integrations.py


import importlib

app_integrations = importlib.import_module("utils.app_integrations")
sys.modules["utils"].mentor_notifications = sys.modules["utils.mentor_notifications"]


def _make_app(app_id: str, webhook_url: str, triggers_realtime=False, triggers_audio=False, uid=None):
Expand Down
5 changes: 5 additions & 0 deletions backend/tests/unit/test_async_geocoding.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,11 @@
from models.conversation import Geolocation
from utils.conversations.location import async_get_google_maps_location

if "utils" in sys.modules and "utils.conversations" in sys.modules:
sys.modules["utils"].conversations = sys.modules["utils.conversations"]
if "utils.conversations" in sys.modules and "utils.conversations.location" in sys.modules:
sys.modules["utils.conversations"].location = sys.modules["utils.conversations.location"]


class TestAsyncCacheHit:
"""When Redis has cached data, return without calling Google API."""
Expand Down
27 changes: 19 additions & 8 deletions backend/tests/unit/test_async_http_infrastructure.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,20 @@
"""

import asyncio
import sys
import time
from unittest.mock import patch

import pytest

for _module_name in ("utils.http_client", "utils.executors"):
sys.modules.pop(_module_name, None)
_utils_pkg = sys.modules.get("utils")
if _utils_pkg is not None:
for _attr in ("http_client", "executors"):
if hasattr(_utils_pkg, _attr):
delattr(_utils_pkg, _attr)

from utils.http_client import (
WebhookCircuitBreaker,
get_webhook_circuit_breaker,
Expand Down Expand Up @@ -386,9 +395,9 @@ def test_critical_executor_has_8_workers(self):
"""critical_executor documented as 8 workers for latency-sensitive work."""
assert critical_executor._max_workers == 8

def test_storage_executor_has_96_workers(self):
"""storage_executor sized for 96 workers to handle concurrent private cloud uploads (#7376)."""
assert storage_executor._max_workers == 96
def test_storage_executor_has_128_workers(self):
    """The storage_executor pool is expected to be configured with 128 workers."""
    expected_workers = 128
    assert storage_executor._max_workers == expected_workers


class TestNotificationWebhookWiring:
Expand All @@ -402,7 +411,7 @@ def test_send_summary_calls_storage_executor_with_asyncio_run(self):

# Read source to verify pattern without triggering Firestore imports
backend_dir = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
with open(os.path.join(backend_dir, 'utils', 'other', 'notifications.py')) as f:
with open(os.path.join(backend_dir, 'utils', 'other', 'notifications.py'), encoding='utf-8') as f:
src = f.read()

# Verify the exact wiring pattern (postprocess_executor, not storage_executor, #7387)
Expand All @@ -420,7 +429,7 @@ def test_pusher_uses_deque_with_maxlen(self):
import os

backend_dir = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
with open(os.path.join(backend_dir, 'routers', 'pusher.py')) as f:
with open(os.path.join(backend_dir, 'routers', 'pusher.py'), encoding='utf-8') as f:
src = f.read()

assert 'deque(maxlen=PRIVATE_CLOUD_QUEUE_MAX_SIZE)' in src
Expand All @@ -432,7 +441,7 @@ def test_queue_max_size_is_20(self):
import os

backend_dir = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
with open(os.path.join(backend_dir, 'routers', 'pusher.py')) as f:
with open(os.path.join(backend_dir, 'routers', 'pusher.py'), encoding='utf-8') as f:
src = f.read()

tree = ast.parse(src)
Expand All @@ -450,7 +459,7 @@ def test_overflow_warning_at_all_enqueue_points(self):
import os

backend_dir = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
with open(os.path.join(backend_dir, 'routers', 'pusher.py')) as f:
with open(os.path.join(backend_dir, 'routers', 'pusher.py'), encoding='utf-8') as f:
src = f.read()

# Count occurrences of the overflow warning pattern
Expand Down Expand Up @@ -521,7 +530,9 @@ def test_allow_request_updates_access_time(self):
_webhook_circuit_breakers.clear()
cb = get_webhook_circuit_breaker('https://test.test/hook')
old_access = cb._last_access_time
time.sleep(0.01)
deadline = time.monotonic() + 0.1
while time.monotonic() <= old_access and time.monotonic() < deadline:
time.sleep(0.001)
cb.allow_request()
assert cb._last_access_time > old_access
_webhook_circuit_breakers.clear()
18 changes: 14 additions & 4 deletions backend/tests/unit/test_async_webhooks.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,14 @@

_backend_dir = os.path.join(os.path.dirname(__file__), '..', '..')

for mod_name in ["database", "database.notifications", "database.users", "database.folders", "database.conversations"]:
for mod_name in [
"database",
"database.notifications",
"database.users",
"database.folders",
"database.conversations",
"database.webhook_health",
]:
if mod_name not in sys.modules:
sys.modules[mod_name] = types.ModuleType(mod_name)
if mod_name == "database":
Expand All @@ -42,6 +49,9 @@
sys.modules["database.users"].get_people_by_ids = MagicMock(return_value=[])
sys.modules["database.folders"].get_folders = MagicMock(return_value=[])
sys.modules["database.conversations"].get_conversations = MagicMock(return_value=[])
sys.modules["database.webhook_health"].record_dev_webhook_failure = MagicMock(return_value=False)
sys.modules["database.webhook_health"].record_dev_webhook_success = MagicMock()
sys.modules["database.webhook_health"]._DEV_FAILURE_THRESHOLD = 100

if "utils.notifications" not in sys.modules:
sys.modules["utils.notifications"] = types.ModuleType("utils.notifications")
Expand Down Expand Up @@ -203,13 +213,13 @@ class level (to avoid heavy transitive deps).
@staticmethod
def _read_webhooks_source() -> str:
webhooks_path = os.path.join(os.path.dirname(__file__), '..', '..', 'utils', 'webhooks.py')
with open(webhooks_path) as f:
with open(webhooks_path, encoding='utf-8') as f:
return f.read()

@staticmethod
def _parse_webhooks_ast():
webhooks_path = os.path.join(os.path.dirname(__file__), '..', '..', 'utils', 'webhooks.py')
with open(webhooks_path) as f:
with open(webhooks_path, encoding='utf-8') as f:
return ast.parse(f.read())

def test_conversation_created_webhook_is_async(self):
Expand Down Expand Up @@ -357,7 +367,7 @@ class TestSendSummaryNotificationWiresSummaryJson:

def test_notifications_passes_summary_data_as_summary_json(self):
path = os.path.join(os.path.dirname(__file__), '..', '..', 'utils', 'other', 'notifications.py')
with open(path) as f:
with open(path, encoding='utf-8') as f:
src = f.read()

assert 'day_summary_webhook(uid, str(summary_data), summary_data)' in src, (
Expand Down
Loading
Loading