
Test/async migration all steps#671

Open
yashkrishan wants to merge 27 commits into main from test/async-migration-all-steps

Conversation

Collaborator

@yashkrishan yashkrishan commented Mar 2, 2026

Summary by CodeRabbit

  • New Features

    • Async streaming and task orchestration for conversations, async DB/Redis paths, and non-blocking auth/GitHub flows with graceful async startup/shutdown.
  • Performance Improvements

    • Configurable worker pool/timeouts, improved task routing, and new benchmark scripts to measure latency and throughput.
  • Bug Fixes

    • Fork-safety for Git operations, safer crash handling, and more robust Redis/session and usage checks.
  • Documentation

    • Added an async migration plan and multiple benchmarking tools.

…Manager)

- Add AsyncRedisStreamManager (redis.asyncio) for FastAPI routes
- Add AsyncSessionService with scan_iter instead of keys()
- conversation_routing: async_ensure_unique_run_id, async start_celery_task_and_stream/wait
- Routers use async Redis/session deps; stop_generation uses async when injected
- Lifespan: create AsyncRedisStreamManager on startup, aclose on shutdown
- Sync RedisStreamManager.wait_for_task_start: require_running=True (correctness)
- ConversationController/Service accept optional async_redis_manager, async_session_service

Made-with: Cursor
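The async_ensure_unique_run_id helper described above can be sketched roughly as follows. This is a minimal illustration with an in-memory stand-in for redis.asyncio; the key format and TTL are assumptions, not the PR's actual values:

```python
import asyncio
import uuid


class FakeAsyncRedis:
    """In-memory stand-in for redis.asyncio.Redis (only what the sketch needs)."""

    def __init__(self):
        self._data = {}

    async def set(self, key, value, nx=False, ex=None):
        # nx=True: only set if the key does not already exist (SET NX semantics)
        if nx and key in self._data:
            return None
        self._data[key] = value
        return True


async def async_ensure_unique_run_id(redis, conversation_id, run_id, ttl=300):
    """Reserve run_id for a conversation; mint a fresh one on collision."""
    while True:
        key = f"run_reservation:{conversation_id}:{run_id}"  # hypothetical key layout
        if await redis.set(key, "1", nx=True, ex=ttl):
            return run_id
        run_id = uuid.uuid4().hex  # collision: try a new id
```

The SET NX reservation is atomic on a real Redis server, so two concurrent requests can never claim the same run id.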
- usage_service: get_usage_data(session, ...) and check_usage_limit(user_id, session) with async select/execute
- usage_controller: get_user_usage accepts session, passes to get_usage_data
- usage_router: inject get_async_db, pass async_db to get_user_usage
- conversations_router + api/router: pass async_db to check_usage_limit, remove dead if-not-checked blocks
- scripts/benchmark_usage_check.py: benchmark POST /conversations for before/after comparison

Made-with: Cursor
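The shape of the check_usage_limit(user_id, session) change can be illustrated with a stand-in session object; the limit value and query text here are hypothetical:

```python
import asyncio


class FakeResult:
    """Mimics the result object returned by AsyncSession.execute()."""

    def __init__(self, value):
        self._value = value

    def scalar(self):
        return self._value


class FakeAsyncSession:
    """Stand-in for sqlalchemy.ext.asyncio.AsyncSession with a canned count."""

    def __init__(self, count):
        self._count = count

    async def execute(self, query):
        return FakeResult(self._count)


DAILY_LIMIT = 50  # hypothetical limit


async def check_usage_limit(user_id: str, session) -> bool:
    # The migrated service awaits select/execute instead of blocking the event loop
    result = await session.execute("SELECT count(*) FROM usage WHERE user_id = :uid")
    return result.scalar() < DAILY_LIMIT
```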
- Add AsyncChatHistoryService with async get_session_history, flush_message_buffer, save_partial_ai_message
- ConversationService.create() accepts optional async_db; when set, uses AsyncChatHistoryService on FastAPI path
- ConversationController passes async_db to create() for async history
- Celery path unchanged (no async_db) and continues using sync ChatHistoryService
- GitHub service: add connect/read timeouts and HTTPException for timeout/request errors

Made-with: Cursor
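The GitHub-service timeout change follows the usual translate-transport-errors pattern; sketched here with a stdlib stand-in for fastapi.HTTPException and builtin exceptions in place of httpx's, since the exact handlers in the PR are not shown:

```python
class HTTPException(Exception):
    """Minimal stand-in for fastapi.HTTPException."""

    def __init__(self, status_code: int, detail: str):
        self.status_code = status_code
        self.detail = detail
        super().__init__(detail)


def call_github(do_request):
    """Map low-level timeout/transport failures to explicit HTTP errors."""
    try:
        return do_request()
    except TimeoutError as e:
        raise HTTPException(504, "GitHub request timed out") from e
    except ConnectionError as e:
        raise HTTPException(502, "GitHub request failed") from e
```

Raising with `from e` preserves the original exception as `__cause__`, which the later review comments also ask for.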
- Add AsyncShareChatService in access_service.py (share_chat, get_shared_emails, remove_access) using AsyncSession
- conversations_router: share/access endpoints use get_async_db and AsyncShareChatService
- Add AsyncUserService in user_service.py (get_user_by_uid, get_user_id_by_email, get_user_by_email, get_user_ids_by_emails, create_user, update_last_login)
- api/router: get_api_key_user uses AsyncUserService(async_db) for admin-secret user lookup
- Add scripts/benchmark_share_access.py for POST /share, GET /shared-emails, DELETE /access

Made-with: Cursor
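The benchmark scripts added in this PR presumably time repeated requests and report latency statistics; a minimal sketch of that measurement loop, with a short sleep standing in for the real HTTP call to the endpoint:

```python
import asyncio
import statistics
import time


async def fake_request() -> int:
    await asyncio.sleep(0.001)  # stand-in for an httpx call to POST /share etc.
    return 200


async def benchmark(n: int = 20) -> dict:
    latencies = []
    for _ in range(n):
        start = time.perf_counter()
        status = await fake_request()
        latencies.append(time.perf_counter() - start)
        assert status == 200
    return {"p50": statistics.median(latencies), "max": max(latencies)}
```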
…ned_user_repos)

2d:
- auth_router: inject get_async_db, use AsyncUserService(async_db) in signup, sso_login, get_my_account
- await get_user_by_uid, get_user_by_email, update_last_login

2e:
- github_service: get_combined_user_repos and get_repos_for_user accept optional async_session; use select() when provided
- code_provider_controller.get_user_repos and github_router pass async_db to get_combined_user_repos

Made-with: Cursor
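The optional-async_session pattern in 2e keeps a single entry point with two code paths; a rough sketch with stand-in session classes (the query strings are placeholders):

```python
import asyncio


class FakeAsyncSession:
    async def execute(self, query):
        return ["repo-a", "repo-b"]  # canned rows


class FakeSyncSession:
    def query(self, model):
        return ["repo-a", "repo-b"]


async def get_repos_for_user(user_id, sync_session, async_session=None):
    """Await a select() when an async session was injected by the router;
    otherwise fall back to the existing blocking query (Celery path)."""
    if async_session is not None:
        return await async_session.execute("select ... where user_id = :uid")
    return sync_session.query("Repo")
```

This lets FastAPI routes pass async_db while background workers keep the unchanged sync path.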
…clients

Step 3 - Async Redis for non-streaming:
- tunnel_service: async Redis client, get/set_workspace_tunnel_record_async,
  list_user_tunnels_async (scan_iter); sync list_user_tunnels uses scan_iter;
  workspace record TTL
- tunnel_router: use get_workspace_tunnel_record_async
- github_service: lazy async Redis cache for get_project_structure_async
- branch_cache: get_branches_async; controller get_branch_list uses it
- benchmark_tunnel_github.py, docs/async-migration-plan.md
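scan_iter replaces KEYS because it walks the keyspace incrementally instead of blocking Redis on one O(N) command; with redis.asyncio it is an async generator, roughly like this sketch (the key layout is an assumption):

```python
import asyncio


class FakeAsyncRedis:
    """In-memory stand-in exposing scan_iter the way redis.asyncio does."""

    def __init__(self, keys):
        self._keys = keys

    async def scan_iter(self, match: str = "*"):
        prefix = match.rstrip("*")
        for key in self._keys:
            if key.startswith(prefix):
                yield key  # the real client yields keys cursor-batch by batch


async def list_user_tunnels(redis, user_id: str):
    # "tunnel:<user>:<id>" is a hypothetical key layout for the sketch
    return [k async for k in redis.scan_iter(match=f"tunnel:{user_id}:*")]
```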

Step 4 - Async HTTP clients:
- auth_service: login_async with httpx.AsyncClient; sync login with httpx.Client
- auth_router: login calls login_async; send_slack_message uses httpx.AsyncClient
- parse_webhook_helper: send_slack_notification uses httpx.AsyncClient
- linear_client: execute_query_async, get_issue_async, update_issue_async,
  comment_create_async (timeout 10s/30s); Linear tools use async methods
- email_helper: resend.Emails.send via asyncio.to_thread
- posthog_helper: send_event uses run_in_executor when loop running
- auth_service_test: patch httpx.Client for login tests

Made-with: Cursor
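Offloading a blocking SDK call (as done here for resend.Emails.send) is a one-liner with asyncio.to_thread; a sketch with a simulated blocking function in place of the real SDK:

```python
import asyncio
import time


def send_email_sync(to: str, subject: str) -> dict:
    """Stand-in for a blocking SDK call such as resend.Emails.send."""
    time.sleep(0.01)  # simulated network I/O
    return {"to": to, "status": "sent"}


async def send_email(to: str, subject: str) -> dict:
    # Runs the blocking call in the default thread pool, keeping the loop free
    return await asyncio.to_thread(send_email_sync, to, subject)
```

asyncio.to_thread (Python 3.9+) is equivalent to loop.run_in_executor(None, ...) but also propagates contextvars.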
- auth_service: use httpx.Timeout(connect=, read=) to fix TypeError
- linear_client: same httpx.Timeout fix
- auth_router: move imports to top (E402), consolidate AuthProviderCreate
- github_service: remove f-string prefix where no placeholders (F541)
- branch_cache: guard redis.from_url when redis_url is None
- parse_webhook_helper: use logger instead of print
- email_helper: set resend.api_key only when api_key is set

Made-with: Cursor
- auth_service: catch HTTPStatusError/HTTPError, raise HTTPException with status
- branch_cache: add aclose() for async and sync Redis clients
- github_service: close_github_async_redis_cache() and call from app shutdown
- github_service: guard project_structure Redis read/write with try/except

Made-with: Cursor
- auth_service: wrap client.post in try so transport errors raise HTTPException
- auth_service: log only status_code for HTTPStatusError, generic msg for HTTPError
- github_service: add asyncio.Lock for _get_async_redis_cache to fix race

Made-with: Cursor
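The race fixed in _get_async_redis_cache is the classic lazy-init race: two coroutines both see None and each create a client. Double-checked locking with asyncio.Lock closes it; sketched here with a plain object standing in for the Redis client:

```python
import asyncio

_cache = None
_cache_lock = asyncio.Lock()


async def get_async_redis_cache():
    """Double-checked lazy init: only one coroutine ever creates the client,
    even when many await this concurrently."""
    global _cache
    if _cache is None:
        async with _cache_lock:
            if _cache is None:  # re-check after acquiring the lock
                _cache = object()  # stand-in for redis.asyncio.from_url(...)
    return _cache
```

The outer check keeps the fast path lock-free once the client exists; the inner re-check handles coroutines that were queued on the lock while another finished initializing.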
…ool)

- Lazy-import GitPython in parsing_helper, parsing_service, github_service,
  local_repo_service, local_provider, change_detection_tool to avoid loading
  git at import time (fork-unsafe in gunicorn/Celery workers on macOS).
- parsing_helper: _fetch_github_branch_head_sha_http for commit-status check
  (HTTP-only, no GitPython) so parsing-status endpoint is safe in forked workers.
- Celery: on Darwin use solo pool and set multiprocessing start method to
  spawn to avoid SIGSEGV in forked workers; prefork unchanged on Linux.
- git_safe: SIGSEGV handler uses async-signal-safe os.write/os._exit only.
- parsing_controller: remove debug instrumentation.

Made-with: Cursor
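The lazy-import pattern defers loading git until a code path actually needs it, so forked workers that never touch git never initialize it. A generic sketch of the idea (demonstrated with a stdlib module so it runs anywhere; the commented call site is hypothetical):

```python
import importlib
import sys


def lazy_import(name: str):
    """Import a module on first use rather than at file import time."""
    mod = sys.modules.get(name)
    if mod is None:
        mod = importlib.import_module(name)
    return mod


# In the real code this would guard GitPython at the call site, e.g.:
#     Repo = lazy_import("git").Repo
#     repo = Repo.clone_from(url, path)
```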
Contributor

coderabbitai Bot commented Mar 2, 2026

Note

Reviews paused

It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the reviews.auto_review.auto_pause_after_reviewed_commits setting.

Use the following commands to manage reviews:

  • @coderabbitai resume to resume automatic reviews.
  • @coderabbitai review to trigger a single review.


Walkthrough

Migrates many sync flows to async: adds AsyncRedisStreamManager, AsyncSession/AsyncUserService, async DB/Redis paths across routers and services, replaces blocking HTTP with httpx (sync+async), lazy-loads GitPython, updates Celery config, and adds docs and benchmark scripts.

Changes

| Cohort / File(s) | Summary |
|---|---|
| **API & App startup**<br>`app/api/router.py`, `app/main.py` | Routes and DI now include AsyncSession/AsyncRedisStreamManager; get_api_key_user and message endpoints use async DB and async Redis manager; app startup/shutdown initialize and close the async Redis stream manager. |
| **Async Redis & Routing utils**<br>`app/modules/conversations/utils/redis_streaming.py`, `app/modules/conversations/utils/conversation_routing.py`, `app/modules/conversations/conversation_deps.py` | Adds AsyncRedisStreamManager, retry helpers, _reservation_key, and async_ensure_unique_run_id; converts start_celery_task_and_stream/start_celery_task_and_wait to async; adds DI helpers for async Redis/session. |
| **Conversations & Sessions**<br>`app/modules/conversations/.../conversation_service.py`, `.../conversation_controller.py`, `.../session/session_service.py`, `.../conversations_router.py` | Wires async_redis_manager and AsyncSessionService through controller/service/router; adds async history/session dispatch and async session APIs; replaces sync Redis/session interactions with async equivalents. |
| **Auth & Users**<br>`app/modules/auth/*`, `app/modules/users/user_service.py`, `app/modules/auth/tests/auth_service_test.py` | Introduces AsyncUserService, migrates auth flows to async (e.g., login_async), swaps requests → httpx (sync+async), and updates tests to mock the httpx client. |
| **Chat history & access**<br>`app/modules/intelligence/memory/chat_history_service.py`, `app/modules/conversations/access/access_service.py` | Adds AsyncChatHistoryService and AsyncShareChatService with async DB queries, buffering/flush semantics, and router wiring for async access endpoints. |
| **Code provider & GitHub**<br>`app/modules/code_provider/...`, `app/modules/code_provider/github/github_service.py` | Adds async branch cache paths and async token/DB support (async_get_github_oauth_token); updates controllers/routers to accept and forward AsyncSession. |
| **Lazy GitPython loading**<br>`app/modules/code_provider/local_repo/*`, `app/modules/parsing/graph_construction/*`, `app/modules/intelligence/tools/change_detection/*` | Defers GitPython imports with runtime helpers and changes Repo typing to Any to avoid fork-safety SIGSEGVs. |
| **Tunnel & Usage**<br>`app/modules/tunnel/*`, `app/modules/usage/*` | Adds async Redis workspace-tunnel ops and WORKSPACE_TUNNEL_RECORD_TTL; migrates usage service/controller/router to accept AsyncSession and run async queries. |
| **Linear & tools**<br>`app/modules/intelligence/tools/linear_tools/*` | Adds an httpx-based async GraphQL client with execute_query_async, async issue/update/comment methods, and client factories. |
| **Utilities & webhooks**<br>`app/modules/utils/*.py` | Replaces blocking HTTP with httpx async clients or offloads sync SDK calls to threads (email, Slack, PostHog); adds timeouts and logging. |
| **Celery & infra**<br>`app/celery/celery_app.py` | macOS multiprocessing spawn handling, configurable worker_pool/timeouts via env var, visibility_timeout aligned to the task limit, and expanded task routing. |
| **Docs & benchmarks**<br>`docs/async-migration-plan.md`, `scripts/*benchmark*.py` | Adds the async migration plan and benchmark scripts for share/access, tunnel/GitHub, and usage endpoints. |

Sequence Diagram(s)

sequenceDiagram
    participant Client as Client
    participant API as FastAPI
    participant Auth as AuthService
    participant DB as AsyncDB
    participant AsyncRedis as AsyncRedisMgr
    participant Celery as CeleryWorker

    Client->>API: POST /conversations/{id}/message
    API->>Auth: await validate token / fetch user (AsyncUserService)
    API->>DB: await UsageService.check_usage_limit(user_id, async_db)
    API->>AsyncRedis: await async_ensure_unique_run_id(conversation_id, run_id, async_redis)
    API->>AsyncRedis: await set_task_status / publish_event
    API->>Celery: await start_celery_task_and_stream(..., async_redis_manager)
    Celery->>AsyncRedis: await wait_for_task_start / publish events
    Celery->>DB: await async DB operations (AsyncSession)
    Celery->>AsyncRedis: await publish end event
    API-->>Client: StreamingResponse or ChatMessageResponse
sequenceDiagram
    participant FastAPI as FastAPI
    participant Startup as Startup
    participant AsyncRedisMgr as AsyncRedisMgr
    participant Redis as Redis

    FastAPI->>Startup: on_event(startup)
    Startup->>AsyncRedisMgr: initialize/connect (redis.asyncio)
    AsyncRedisMgr->>Redis: connect
    Redis-->>AsyncRedisMgr: ready
    AsyncRedisMgr-->>FastAPI: store on app.state
    FastAPI->>FastAPI: serve requests
    FastAPI->>Startup: on_event(shutdown)
    Startup->>AsyncRedisMgr: await aclose()
    AsyncRedisMgr->>Redis: disconnect
    AsyncRedisMgr-->>FastAPI: closed

Estimated code review effort

🎯 5 (Critical) | ⏱️ ~120 minutes

Possibly related PRs

Suggested reviewers

  • dhirenmathur
  • nndn

Poem

🐰
I hopped through code and made it light and quick,
Streams whisper softly now — no blocking trick,
Git sleeps until called, Redis hums async tunes,
Celery spawns tidy on macOS moons,
Hooray — the rabbit danced beneath the code-lit moon.

🚥 Pre-merge checks | ✅ 2 | ❌ 1

❌ Failed checks (1 warning)

| Check name | Status | Explanation | Resolution |
|---|---|---|---|
| Docstring Coverage | ⚠️ Warning | Docstring coverage is 50.51%, below the required threshold of 80.00%. | Write docstrings for the functions missing them to satisfy the coverage threshold. |
✅ Passed checks (2 passed)
| Check name | Status | Explanation |
|---|---|---|
| Description Check | ✅ Passed | Check skipped - CodeRabbit’s high-level summary is enabled. |
| Title check | ✅ Passed | The title 'Test/async migration all steps' accurately describes the main objective of this PR, which comprehensively migrates the codebase from synchronous to asynchronous patterns across all identified steps in the async-migration-plan.md. |

✏️ Tip: You can configure your own custom pre-merge checks in the settings.


Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.


Comment @coderabbitai help to get the list of available commands and usage tips.

@yashkrishan yashkrishan requested review from dhirenmathur and nndn March 2, 2026 07:23
Contributor

@coderabbitai coderabbitai Bot left a comment


Actionable comments posted: 13

Note

Due to the large number of review comments, Critical, Major severity comments were prioritized as inline comments.

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (6)
app/modules/auth/tests/auth_service_test.py (1)

49-50: ⚠️ Potential issue | 🔴 Critical

Error-path tests assert outdated exception types and mock incorrect exceptions.

The AuthService.login method now translates all httpx failures into HTTPException:

  • httpx.HTTPStatusError → HTTPException with upstream status code (lines 36-47)
  • httpx.HTTPError (including ConnectError, TimeoutException) → HTTPException(502) (lines 48-53)

However, the tests still assert against the old behavior:

  • Lines 84, 103: Expect generic Exception but service raises HTTPException
  • Lines 119, 131: Expect raw httpx.ConnectError and httpx.TimeoutException but service raises HTTPException(502)
  • Lines 49-50, 91-94: Mock with generic Exception instead of httpx.HTTPStatusError, preventing proper error-translation validation

These tests will fail at runtime and do not validate the intended error-handling contract.

🧪 Proposed test updates
-        mock_response.raise_for_status.side_effect = Exception(mock_response.json())
+        request = httpx.Request("POST", "https://identitytoolkit.googleapis.com")
+        response = httpx.Response(
+            400,
+            request=request,
+            json={"error": {"message": "INVALID_PASSWORD"}},
+        )
+        mock_response.raise_for_status.side_effect = httpx.HTTPStatusError(
+            "400 from upstream",
+            request=request,
+            response=response,
+        )
@@
-                with pytest.raises(Exception) as exc_info:
+                with pytest.raises(HTTPException) as exc_info:
                     auth_service.login("test@example.com", "invalid_password")
-                assert "INVALID_PASSWORD" in str(exc_info.value)
+                assert exc_info.value.status_code == 400
+                assert "INVALID_PASSWORD" in str(exc_info.value.detail)
@@
-            mock_response.raise_for_status.side_effect = Exception(
-                mock_response.json()
-            )
+            request = httpx.Request("POST", "https://identitytoolkit.googleapis.com")
+            response = httpx.Response(
+                400,
+                request=request,
+                json={"error": {"message": "MISSING_EMAIL"}},
+            )
+            mock_response.raise_for_status.side_effect = httpx.HTTPStatusError(
+                "400 from upstream",
+                request=request,
+                response=response,
+            )
@@
-                with pytest.raises(Exception) as exc_info:
+                with pytest.raises(HTTPException) as exc_info:
                     auth_service.login("", "")
-                assert "MISSING_EMAIL" in str(exc_info.value)
+                assert exc_info.value.status_code == 400
+                assert "MISSING_EMAIL" in str(exc_info.value.detail)
@@
-                with pytest.raises(httpx.ConnectError):
+                with pytest.raises(HTTPException) as exc_info:
                     auth_service.login("test@example.com", "password")
+                assert exc_info.value.status_code == 502
@@
-                with pytest.raises(httpx.TimeoutException):
+                with pytest.raises(HTTPException) as exc_info:
                     auth_service.login("test@example.com", "password")
+                assert exc_info.value.status_code == 502

Also applies to: 83-85, 91-93, 102-104, 118-134

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@app/modules/auth/tests/auth_service_test.py` around lines 49 - 50, Tests in
auth_service_test.py still mock and assert generic Exceptions instead of the
httpx exceptions that AuthService.login now translates into HTTPException;
update the mocks to raise httpx.HTTPStatusError for upstream error-paths (use a
Response and Request mock to construct the HTTPStatusError) and raise
httpx.ConnectError / httpx.ReadTimeout for connection/timeout paths, then change
assertions to expect fastapi.HTTPException with the upstream status code for
HTTPStatusError cases and HTTPException(status_code=502) for httpx.HTTPError
cases; also adjust the mock of mock_response.raise_for_status to simulate
raising httpx.HTTPStatusError rather than a generic Exception so the
error-translation logic in AuthService.login is exercised.
app/modules/parsing/graph_construction/parsing_helper.py (4)

2488-2493: ⚠️ Potential issue | 🔴 Critical

Repo used in isinstance check without lazy import in extract_repository_metadata.

Line 2489 uses isinstance(repo, Repo) but Repo is not in scope at this point.

Proposed fix
     def extract_repository_metadata(self, repo):
+        _, _, Repo = _get_git_imports()
         if isinstance(repo, Repo):
             metadata = ParseHelper.extract_local_repo_metadata(repo)
         else:
             metadata = ParseHelper.extract_remote_repo_metadata(repo)
         return metadata
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@app/modules/parsing/graph_construction/parsing_helper.py` around lines 2488 -
2493, The isinstance check in extract_repository_metadata references Repo which
isn't imported; fix by ensuring Repo is available via a local (lazy) import
inside extract_repository_metadata (or add a top-level import) so the
isinstance(repo, Repo) check works; update the function
extract_repository_metadata to import Repo before the isinstance call and then
call ParseHelper.extract_local_repo_metadata(repo) or
ParseHelper.extract_remote_repo_metadata(repo) as currently implemented.

1103-1118: ⚠️ Potential issue | 🔴 Critical

Repo used in isinstance checks without lazy import.

Lines 1103 and 1112 use isinstance(repo, Repo) but Repo may not be imported. While line 1261 shows the correct pattern (importing before use), these earlier checks can fail.

Proposed fix
+        _, _, Repo = _get_git_imports()
         if repo_manager_path:
             # RepoManager-cached remote repo - DON'T set repo_path (it's a cached remote, not true local)
             repo_path = None
             ...
         elif isinstance(repo, Repo):
             # Local repository - use full path from Repo object
             ...
         elif isinstance(repo_details, Repo):
             # Alternative: repo_details is the Repo object (non-dev mode)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@app/modules/parsing/graph_construction/parsing_helper.py` around lines 1103 -
1118, The isinstance checks use the class Repo before it's guaranteed to be
imported, causing NameError; update the code in ParsingHelper to either import
Repo at top-level or perform a local/lazy import before the checks (e.g., ensure
`from git import Repo` or equivalent is executed before using `isinstance(repo,
Repo)` and `isinstance(repo_details, Repo)`), and keep the existing logger
messages and variable assignments (repo_path, full_name) unchanged so the
behavior remains the same.

1891-1908: ⚠️ Potential issue | 🔴 Critical

Critical: Repo used without lazy import - will cause NameError.

Line 1893 calls Repo.clone_from(...) directly, but Repo is not imported at module level (it's lazy-loaded). This will raise NameError: name 'Repo' is not defined when this code path is executed.

Proposed fix
             try:
                 # Clone as bare repository
+                _, _, Repo = _get_git_imports()
                 Repo.clone_from(
                     clone_url,
                     str(bare_repo_path),
                     bare=True,
                     mirror=True,  # Mirror for full fidelity
                 )
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@app/modules/parsing/graph_construction/parsing_helper.py` around lines 1891 -
1908, The code calls Repo.clone_from (the Repo symbol) inside the try block but
Repo is not imported, causing NameError; fix by adding a proper import for
GitPython (e.g. from git import Repo) before use—preferably a local/lazy import
placed immediately above the Repo.clone_from call in parsing_helper.py (inside
the function or try block) or add a guarded top-level import and fallback to
local import to preserve lazy-loading; update any logging or exception handling
around Repo.clone_from to reflect the import placement if needed.

481-486: ⚠️ Potential issue | 🔴 Critical

Repo used in isinstance check without lazy import.

Line 483 uses isinstance(repo, Repo) but Repo is not imported at module level. This will cause a NameError.

Proposed fix
     async def _clone_repository_with_auth(self, repo, branch, target_dir, user_id):
         ...
+        _, _, Repo = _get_git_imports()
         repo_name = (
             repo.working_tree_dir
             if isinstance(repo, Repo)
             else getattr(repo, "full_name", "unknown")
         )
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@app/modules/parsing/graph_construction/parsing_helper.py` around lines 481 -
486, The isinstance(repo, Repo) check will raise NameError because Repo isn't
imported; fix by performing a lazy import just before the check (e.g., try: from
git import Repo except ImportError: Repo = None) and then use isinstance(repo,
Repo) only when Repo is not None, otherwise fall back to getattr(repo,
"full_name", "unknown"); update the code around the repo_name assignment (the
repo_name variable and the isinstance(repo, Repo) check) to include this local
import and safe fallback.
app/modules/intelligence/tools/linear_tools/linear_client.py (1)

218-232: ⚠️ Potential issue | 🟡 Minor

Sync call in async method will block the event loop.

SecretStorageHandler.get_secret is a synchronous method that makes blocking I/O calls to Google Cloud Secret Manager (via google.cloud.secretmanager). Calling it directly in an async method without using an executor will block the event loop and degrade performance.

Wrap the call in a thread pool executor using asyncio.to_thread() or loop.run_in_executor(), or use an async-compatible secret manager client.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@app/modules/intelligence/tools/linear_tools/linear_client.py` around lines
218 - 232, The _get_api_key_from_secrets async method currently calls the
synchronous SecretStorageHandler.get_secret (which performs blocking I/O)
directly; change the call to run off the event loop (e.g., wrap
SecretStorageHandler.get_secret(...) in asyncio.to_thread(...) or
loop.run_in_executor(...)) so the blocking Google Cloud Secret Manager I/O does
not block the event loop, await the to_thread call, and preserve the same
try/except/return None semantics around the awaited result.
🟡 Minor comments (8)
app/modules/utils/parse_webhook_helper.py-34-35 (1)

34-35: ⚠️ Potential issue | 🟡 Minor

Avoid broad exception catch and use exception logging to preserve traceback context.

The current code catches all Exception types and logs only the string representation, which loses traceback details that are essential for debugging. Since httpx is used for the HTTP request, expected HTTP failures should be caught specifically, while unexpected errors should be logged with full context.

Use logger.exception() instead of logger.warning() to preserve the traceback, and narrow the catch to httpx.HTTPError for expected failures:

Proposed fix
-        except Exception as e:
-            logger.warning("Error sending message to Slack: %s", e)
+        except httpx.HTTPError:
+            logger.exception("Error sending message to Slack")
+        except Exception:
+            logger.exception("Unexpected error while sending Slack notification")
+            raise
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@app/modules/utils/parse_webhook_helper.py` around lines 34 - 35, Replace the
broad "except Exception as e" block in parse_webhook_helper.py with a targeted
catch for httpx.HTTPError and a separate catch-all that preserves traceback:
catch httpx.HTTPError as e and log the expected HTTP failure (using
logger.warning or logger.error) including the exception message, then add an
"except Exception:" block that calls logger.exception("Unexpected error sending
message to Slack") to preserve the full traceback; ensure you import httpx if
not already and keep using the existing logger symbol.
app/modules/utils/posthog_helper.py-50-53 (1)

50-53: ⚠️ Potential issue | 🟡 Minor

Snapshot properties before executor handoff and pass args directly to avoid mutable closure.

At line 52, the lambda captures a reference to the mutable properties dict. If callers mutate the dict after send_event() returns but before the queued thread task executes, the captured reference will reflect those mutations, sending altered payloads. Create a snapshot using dict(properties) and pass all arguments directly to run_in_executor.

Suggested fix
         try:
             loop = asyncio.get_running_loop()
             # Fire-and-forget: run sync capture in thread pool
+            properties_snapshot = dict(properties)
             loop.run_in_executor(
                 None,
-                lambda: self._capture_sync(user_id, event_name, properties),
+                self._capture_sync,
+                user_id,
+                event_name,
+                properties_snapshot,
             )
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@app/modules/utils/posthog_helper.py` around lines 50 - 53, The lambda passed
to loop.run_in_executor in send_event captures the mutable properties dict by
reference; create an immutable snapshot (e.g., properties_copy =
dict(properties) or properties.copy()) inside send_event before scheduling and
call loop.run_in_executor with the target function self._capture_sync and the
explicit args (user_id, event_name, properties_copy) instead of using a lambda
to avoid the mutable closure; update send_event to use run_in_executor(None,
self._capture_sync, user_id, event_name, properties_copy).
app/modules/auth/auth_service.py-40-47 (1)

40-47: ⚠️ Potential issue | 🟡 Minor

Preserve exception causality when re-raising HTTPException.

At Line 44 and Line 81, re-raising without from e drops causal context from the upstream HTTP error handling path.

🧭 Proposed fix
-                raise HTTPException(
+                raise HTTPException(
                     status_code=e.response.status_code,
                     detail=detail,
-                )
+                ) from e
@@
-            raise HTTPException(
+            raise HTTPException(
                 status_code=e.response.status_code,
                 detail=detail,
-            )
+            ) from e

Also applies to: 77-84

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@app/modules/auth/auth_service.py` around lines 40 - 47, The HTTPException
raised in the exception handlers currently discards the original exception
context; update both raise statements that construct HTTPException (the ones
inside the except handling the upstream client error where detail =
e.response.json() / e.response.text and the similar handler later in the file)
to re-raise with explicit causality by appending "from e" so the original
exception (e) is preserved as the __cause__; keep the same status_code and
detail values but change "raise HTTPException(...)" to "raise HTTPException(...)
from e".
app/modules/code_provider/branch_cache.py-317-320 (1)

317-320: ⚠️ Potential issue | 🟡 Minor

Avoid silent except ...: pass in cache diagnostics.

At Line 319, swallowing decode errors hides malformed cache payload issues during debugging.

🛠️ Proposed fix
-                    except Exception:
-                        pass
+                    except Exception as e:
+                        logger.warning(
+                            "BranchCache: failed to decode cached branches for key=%s: %s",
+                            key,
+                            e,
+                        )
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@app/modules/code_provider/branch_cache.py` around lines 317 - 320, The code
currently swallows all exceptions when decoding cached_data into branches
(variables branches, cached_data, branch_count), which hides malformed cache
payloads; replace the bare except with specific exception handling (e.g., catch
json.JSONDecodeError and TypeError), log a warning including context (cache
key/identifier if available, truncated cached_data or its length, and the
exception message) using the module logger, and then fall back to a safe state
(e.g., set branch_count = 0 or treat cache as miss); optionally remove/refresh
the bad cache entry after logging to avoid repeated failures.
docs/async-migration-plan.md-74-75 (1)

74-75: ⚠️ Potential issue | 🟡 Minor

Add language identifiers to fenced code blocks

These fences are missing a language tag, which trips markdownlint (MD040). Please annotate them (for example text, python, bash, or json) to keep docs lint-clean.

Also applies to: 99-100, 120-121, 137-138, 152-153, 183-184, 298-299

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@docs/async-migration-plan.md` around lines 74 - 75, Several fenced code
blocks in async-migration-plan.md are missing language identifiers (causing
markdownlint MD040); update each triple-backtick fence (e.g., the block starting
at the "Measures:" section and also the blocks around lines noted: 99-100,
120-121, 137-138, 152-153, 183-184, 298-299) to include an appropriate language
tag such as ```text, ```bash, ```json or ```python so the linter recognizes the
language for each fenced code block.
app/modules/parsing/graph_construction/parsing_helper.py-43-66 (1)

43-66: ⚠️ Potential issue | 🟡 Minor

URL scheme and error handling concerns in _fetch_github_branch_head_sha_http.

Several issues:

  1. Redundant token fallback (Line 57): token is already set from CODE_PROVIDER_TOKEN on Line 51, so the fallback on Line 57 is a no-op.

  2. URL scheme not validated (Lines 58, 62): Static analysis flags a potential security concern: the URL is constructed dynamically but its scheme is never validated. The hardcoded https://api.github.com prefix mitigates this, but explicit validation adds defense-in-depth.

  3. Silent exception swallowing (Lines 65-66): Catching all exceptions and returning None hides failures. Consider logging the exception for observability.

Proposed fix
 def _fetch_github_branch_head_sha_http(repo_name: str, branch_name: str) -> Optional[str]:
     """
     Fetch the HEAD commit SHA for a GitHub branch using only HTTP (no GitPython/PyGithub).
     Safe to call from forked processes (gunicorn workers) where GitPython causes SIGSEGV.
     """
     try:
         url = f"https://api.github.com/repos/{repo_name}/branches/{branch_name}"
         token_list = os.getenv("GH_TOKEN_LIST", "").strip()
         token = os.getenv("CODE_PROVIDER_TOKEN")
         if token_list:
             parts = [p.strip() for p in token_list.replace("\n", ",").split(",") if p.strip()]
             if parts:
                 token = token or parts[0]
-        if not token:
-            token = os.getenv("CODE_PROVIDER_TOKEN")
         req = urllib.request.Request(url)
         req.add_header("Accept", "application/vnd.github.v3+json")
         if token:
             req.add_header("Authorization", f"Bearer {token}")
         with urllib.request.urlopen(req, timeout=15) as resp:
             data = json.loads(resp.read().decode())
         return (data.get("commit") or {}).get("sha")
-    except Exception:
+    except Exception as e:
+        logger.debug(f"Failed to fetch branch head SHA for {repo_name}/{branch_name}: {e}")
         return None
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@app/modules/parsing/graph_construction/parsing_helper.py` around lines 43 -
66, In _fetch_github_branch_head_sha_http, remove the redundant re-assignment of
token (the second os.getenv("CODE_PROVIDER_TOKEN") fallback) and instead pick
token from CODE_PROVIDER_TOKEN or the first entry of GH_TOKEN_LIST; validate the
constructed url (the variable url) before use by asserting it begins with
"https://" to prevent non-HTTPS schemes; and replace the bare except: return
None with exception-aware handling — catch specific exceptions (e.g.,
urllib.error.URLError, json.JSONDecodeError, Exception as e) and log the
error/exception detail via the module logger or processLogger before returning
None so failures are observable.
app/modules/auth/auth_router.py-14-14 (1)

14-14: ⚠️ Potential issue | 🟡 Minor

Duplicate logger initialization.

The logger is initialized twice in this file (lines 14 and 40-41). Remove one of the duplicate definitions.

Suggested fix

Remove the duplicate at lines 40-41:

 from app.modules.utils.posthog_helper import PostHogClient
 from app.modules.utils.email_helper import is_personal_email_domain

-logger = logging.getLogger(__name__)
-
 SLACK_WEBHOOK_URL = os.getenv("SLACK_WEBHOOK_URL", None)

Also applies to: 40-41

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@app/modules/auth/auth_router.py` at line 14, The file declares logger =
logging.getLogger(__name__) twice; remove the duplicate declaration (the later
one in the file) so only the initial logger variable remains; locate the second
logger assignment and delete it, ensuring all uses of logger continue to
reference the single top-level logger variable (logger) defined near the module
top.
app/modules/usage/usage_service.py-26-41 (1)

26-41: ⚠️ Potential issue | 🟡 Minor

The query duplicates message counts if a single conversation has multiple agents in the agent_ids array.

When func.unnest(Conversation.agent_ids) expands an array like ["agent_A", "agent_B"], each message in that conversation is counted once per agent. For a conversation with 2 agents and 10 messages, the sum becomes 20 instead of 10. This inflates total_human_messages used for subscription limit checking (line 95).

If each conversation has exactly one agent, this is not an issue. If conversations can have multiple agents, either:

  • Use DISTINCT ON to count unique messages per conversation first
  • Or change the data model to have a separate agent-message relationship if per-agent attribution is needed
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@app/modules/usage/usage_service.py` around lines 26 - 41, The current query
using func.unnest(Conversation.agent_ids) in stmt causes each Message to be
counted once per agent, inflating totals; to fix it, change the query logic in
the function that builds stmt so messages are first deduplicated per
conversation (e.g., count distinct Message.id per Conversation.id) or aggregate
by Conversation.id before unnesting agents, then join the per-conversation
message counts to the unnested agents; update the select/group_by so you
aggregate message_count as COUNT(DISTINCT Message.id) or compute a subquery that
selects Conversation.id and message_count and then unnest Conversation.agent_ids
to attribute that single message_count per conversation to each agent, ensuring
total_human_messages reflects unique messages only.
🧹 Nitpick comments (14)
app/main.py (1)

208-224: LGTM with minor improvement: Remove redundant exception object from logging.exception.

The startup logic correctly initializes AsyncRedisStreamManager and fails fast if unavailable. One minor issue: logging.exception() already captures exception info automatically, so passing e as an argument is redundant.

♻️ Remove redundant exception argument
         except Exception as e:
             logger.exception(
-                "AsyncRedisStreamManager failed to initialize (redis.asyncio required): %s",
-                e,
+                "AsyncRedisStreamManager failed to initialize (redis.asyncio required)"
             )
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@app/main.py` around lines 208 - 224, The logger.exception call in the
AsyncRedisStreamManager initialization is passing the exception object
redundantly; update the except block around AsyncRedisStreamManager
import/instantiation so logger.exception only takes the message (e.g., change
logger.exception("AsyncRedisStreamManager failed to initialize (redis.asyncio
required): %s", e) to logger.exception("AsyncRedisStreamManager failed to
initialize (redis.asyncio required)")) while keeping the subsequent raise
RuntimeError from e and leaving AsyncRedisStreamManager and
app.state.async_redis_stream_manager usage unchanged.
app/modules/code_provider/local_repo/local_repo_service.py (1)

55-55: Return type changed to Any - consider documenting.

The return type change from git.Repo to Any is necessary for the lazy import pattern but loses type information. Consider adding a docstring note that the return type is actually git.Repo at runtime.

📝 Add type hint documentation
     def get_repo(self, repo_path: str) -> Any:
+        """Get git repository object.
+        
+        Returns:
+            git.Repo: The repository object (typed as Any to support lazy import)
+        """
         if not os.path.exists(repo_path):

Also applies to: 60-61

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@app/modules/code_provider/local_repo/local_repo_service.py` at line 55,
Update the get_repo function's signature/docstring to document that although the
return type is annotated as Any (due to lazy import), the object is actually an
instance of git.Repo at runtime; add a brief note in the get_repo docstring
mentioning "Returns: git.Repo at runtime (annotated as Any for lazy import)" and
do the same for any other methods in this module that were changed to Any (the
ones noted on lines ~60-61) so callers and IDEs understand the runtime type.
app/modules/intelligence/memory/chat_history_service.py (1)

261-278: Unused parameters message_type and sender_id in add_message_chunk.

These parameters are accepted but never used in the method body. The sync version (lines 70-85) has the same unused parameters. If these are intentionally kept for API consistency with flush_message_buffer, consider adding a brief comment or prefixing with underscore to indicate they're intentionally unused.

♻️ Option 1: Prefix unused params with underscore
     def add_message_chunk(
         self,
         conversation_id: str,
         content: str,
-        message_type: MessageType,
-        sender_id: Optional[str] = None,
+        _message_type: MessageType,
+        _sender_id: Optional[str] = None,
         citations: Optional[List[str]] = None,
     ) -> None:
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@app/modules/intelligence/memory/chat_history_service.py` around lines 261 -
278, The add_message_chunk method accepts message_type and sender_id but never
uses them; update the signature to mark them as intentionally unused (e.g.,
rename to _message_type and _sender_id) or add a succinct comment inside
add_message_chunk (and the sync counterpart) stating they are kept for API
consistency with flush_message_buffer; ensure references to add_message_chunk
and the sync version in chat_history_service.py are updated consistently so
linters and readers understand these params are intentionally unused.
app/modules/intelligence/tools/change_detection/change_detection_tool.py (1)

657-657: Consider caching the class reference to avoid repeated imports.

Calling _get_local_repo_service() inside isinstance() will import the module on every check. While functionally correct, caching the result would be more efficient if this code path is hit frequently.

♻️ Optional optimization
+            LocalRepoService = _get_local_repo_service()
-            elif isinstance(code_service.service_instance, _get_local_repo_service()):
+            elif isinstance(code_service.service_instance, LocalRepoService):
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@app/modules/intelligence/tools/change_detection/change_detection_tool.py` at
line 657, The isinstance check calls _get_local_repo_service() each time,
causing repeated imports; cache the class reference once (e.g., fetch
_get_local_repo_service() into a module- or function-level variable like
local_repo_service_class) and replace isinstance(code_service.service_instance,
_get_local_repo_service()) with isinstance(code_service.service_instance,
local_repo_service_class) so the import/lookup happens only once while
preserving behavior of the _get_local_repo_service factory.
scripts/benchmark_tunnel_github.py (1)

51-55: method parameter is currently unused in _run_concurrent.

Either remove it or route request dispatch by method for clarity and future reuse.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@scripts/benchmark_tunnel_github.py` around lines 51 - 55, The _run_concurrent
function currently accepts a method parameter that is never used; either remove
the unused parameter from the signature or wire it into the request dispatch so
different HTTP methods are honored. Locate the _run_concurrent definition and
either (A) drop the method: str parameter and update all callers to stop passing
it, or (B) use the method value when sending requests (e.g., branch on method or
call the appropriate request helper) so that GET/POST/etc. are dispatched
correctly; update any helper/functions that build or send requests to accept and
propagate method accordingly.
app/modules/parsing/graph_construction/parsing_helper.py (1)

2353-2353: Unused variable GitCommandError.

The unpacked variable is not used in this function. Prefix with underscore to indicate intentional discard.

-        GitCommandError, InvalidGitRepositoryError, Repo = _get_git_imports()
+        _, InvalidGitRepositoryError, Repo = _get_git_imports()
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@app/modules/parsing/graph_construction/parsing_helper.py` at line 2353, The
unpacked variable GitCommandError returned from _get_git_imports() is unused;
update the unpacking to indicate intentional discard by prefixing it with an
underscore (e.g., _GitCommandError, InvalidGitRepositoryError, Repo =
_get_git_imports()) so linters know the value is intentionally ignored while
keeping InvalidGitRepositoryError and Repo unchanged.
app/modules/users/user_service.py (1)

193-195: Consider using logger.exception for better stack traces.

Throughout the AsyncUserService, logger.error is used in exception handlers. Using logger.exception would automatically include the stack trace, improving debuggability.

Example fix (apply similar pattern to other handlers)
         except Exception as e:
-            logger.error("Error fetching user: %s", e)
+            logger.exception("Error fetching user: %s", e)
             return None

Also applies to: 205-207, 214-219, 229-231, 251-253, 278-280

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@app/modules/users/user_service.py` around lines 193 - 195, Replace uses of
logger.error in AsyncUserService exception handlers with logger.exception so
stack traces are recorded; locate the except blocks in methods such as the user
fetch handler (the block catching Exception as e where logger.error("Error
fetching user: %s", e) is called) and change to logger.exception with the same
message text (e.g., logger.exception("Error fetching user: %s", e)) and apply
the same change to the other except blocks referenced (lines around 205-207,
214-219, 229-231, 251-253, 278-280) to ensure all exceptions log full
tracebacks.
app/modules/conversations/access/access_service.py (2)

197-199: Preserve exception chain with raise ... from e.

When re-raising a custom exception from a caught exception, use raise ... from e to maintain the exception chain for better debugging.

Suggested fix
         except Exception as e:
             await self.session.rollback()
-            raise ShareChatServiceError(f"Failed to update shared chat: {str(e)}")
+            raise ShareChatServiceError(f"Failed to update shared chat: {str(e)}") from e
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@app/modules/conversations/access/access_service.py` around lines 197 - 199,
The except block in access_service.py currently swallows the original traceback
when re-raising a ShareChatServiceError; modify the exception handling in the
method that contains "await self.session.rollback()" so that you re-raise the
custom exception with exception chaining (use "raise
ShareChatServiceError(f'Failed to update shared chat: {str(e)}') from e") to
preserve the original exception context; keep the rollback call and message
intact but append "from e" when raising ShareChatServiceError.

143-149: Use explicit Optional type annotation.

PEP 484 prohibits implicit Optional. The parameter should be explicitly annotated as Optional[List[str]].

Suggested fix
     async def share_chat(
         self,
         conversation_id: str,
         user_id: str,
-        recipient_emails: List[str] = None,
+        recipient_emails: Optional[List[str]] = None,
         visibility: Visibility = None,
     ) -> str:

And add Optional to the imports:

-from typing import List
+from typing import List, Optional
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@app/modules/conversations/access/access_service.py` around lines 143 - 149,
The function share_chat has implicitly optional params; update its signature to
use explicit Optional types: change recipient_emails: List[str] = None to
recipient_emails: Optional[List[str]] = None (and consider making visibility:
Optional[Visibility] = None if it can be omitted) and add Optional to the module
imports so the type annotation is PEP 484 compliant; ensure you update any
references or type hints that import Visibility or use recipient_emails in
share_chat accordingly.
app/modules/intelligence/tools/linear_tools/linear_client.py (1)

50-57: Consider custom exception classes for better error handling.

Using generic Exception makes it harder for callers to handle specific error cases. Consider creating custom exceptions like LinearAPIError and LinearGraphQLError.

Example implementation
class LinearAPIError(Exception):
    """Raised when Linear API returns a non-200 status code."""
    def __init__(self, status_code: int, message: str):
        self.status_code = status_code
        super().__init__(f"Linear API error ({status_code}): {message}")

class LinearGraphQLError(Exception):
    """Raised when Linear API returns GraphQL errors."""
    def __init__(self, errors: list):
        self.errors = errors
        super().__init__(f"GraphQL errors: {errors}")
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@app/modules/intelligence/tools/linear_tools/linear_client.py` around lines 50
- 57, Replace the generic Exception raises in linear_client.py with custom
exception classes: add two new classes LinearAPIError (accepting status_code and
message) and LinearGraphQLError (accepting the errors list) in this module, then
raise LinearAPIError instead of Exception when the HTTP response status is
non-200 (the block that currently raises f"Request failed...") and raise
LinearGraphQLError where the code now checks if "errors" in result (the block
that currently raises f"GraphQL errors..."); ensure these classes expose the
status_code or errors attributes so callers can handle them programmatically.
app/modules/usage/usage_service.py (1)

55-56: Redundant exception object in logger.exception call.

logger.exception automatically includes the exception information and stack trace. Passing e explicitly is redundant.

Suggested fix
-            logger.exception("Failed to fetch usage data: %s", e)
+            logger.exception("Failed to fetch usage data")
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@app/modules/usage/usage_service.py` around lines 55 - 56, Remove the
redundant explicit exception argument from the logger.exception call in
usage_service.py: change the call logger.exception("Failed to fetch usage data:
%s", e) to simply logger.exception("Failed to fetch usage data") so the
exception and stack trace are logged automatically, and keep the existing raise
Exception("Failed to fetch usage data") from e to preserve chaining.
app/modules/conversations/conversations_router.py (2)

512-514: Same exception handling improvement needed.

Apply the same fix as above for consistency.

Suggested fix
         except Exception as e:
-            logger.error(f"Access denied for conversation {conversation_id}: {str(e)}")
-            raise HTTPException(status_code=403, detail="Access denied to conversation")
+            logger.exception("Access denied for conversation %s", conversation_id)
+            raise HTTPException(status_code=403, detail="Access denied to conversation") from e
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@app/modules/conversations/conversations_router.py` around lines 512 - 514,
The except block currently logs a stringified error without full traceback;
change it to log the exception details (use logger.exception(...) or
logger.error(..., exc_info=True)) so the full stacktrace and error context for
conversation_id are recorded, then re-raise the same
HTTPException(status_code=403, detail="Access denied to conversation"); update
the except handler around the code that references conversation_id and logger to
include exc_info so troubleshooting matches other fixes.

484-486: Improve exception handling for access denied errors.

The exception handling should use logger.exception for automatic stack trace inclusion and raise ... from e to preserve the exception chain.

Suggested fix
         except Exception as e:
-            logger.error(f"Access denied for conversation {conversation_id}: {str(e)}")
-            raise HTTPException(status_code=403, detail="Access denied to conversation")
+            logger.exception("Access denied for conversation %s", conversation_id)
+            raise HTTPException(status_code=403, detail="Access denied to conversation") from e
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@app/modules/conversations/conversations_router.py` around lines 484 - 486,
Replace the generic logger.error and bare raise with proper exception chaining
and stack logging: change the logger call from logger.error(...) to
logger.exception(...) so the stack trace is captured, and re-raise the
HTTPException using "raise HTTPException(status_code=403, detail='Access denied
to conversation') from e" to preserve the original exception chain (update the
except block that currently references conversation_id, logger.error and raises
HTTPException).
app/modules/auth/auth_router.py (1)

89-93: Mixed sync and async session usage is transitional.

The endpoint uses both db: Session (for UnifiedAuthService) and async_db: AsyncSession (for AsyncUserService). This appears to be a transitional pattern during the async migration. Consider tracking this as technical debt to eventually migrate UnifiedAuthService to async as well.

Also applies to: 150-151

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@app/modules/auth/auth_router.py` around lines 89 - 93, The signup endpoint is
mixing sync and async DB sessions (signup uses db: Session from get_db for
UnifiedAuthService and async_db: AsyncSession from get_async_db for
AsyncUserService); fix by removing the mixed usage: either convert the
UnifiedAuthService methods invoked in signup into async variants (e.g., create
an AsyncUnifiedAuthService or make UnifiedAuthService methods async) and then
change the signup signature to accept only async_db: AsyncSession (drop db and
get_db), or if not ready, centralize/annotate this as technical debt by adding a
clear TODO and wrapper that ensures all calls use the same session type; update
all call sites in signup and the other noted endpoint(s) to use the unified
service name (UnifiedAuthService or AsyncUnifiedAuthService) and the single
session dependency (get_async_db) consistently.
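One transitional shape for the suggested AsyncUnifiedAuthService is a thin async facade that runs the existing sync service off the event loop until a native async migration lands. The class and method names here are illustrative, not the project's actual API:

```python
import asyncio


class UnifiedAuthService:
    """Stand-in for the existing sync service."""

    def signup(self, uid: str) -> str:
        # Placeholder for blocking DB/auth work.
        return f"signed-up:{uid}"


class AsyncUnifiedAuthService:
    """Async facade: endpoints depend only on this (plus get_async_db),
    while the sync implementation runs in a worker thread."""

    def __init__(self, inner: UnifiedAuthService) -> None:
        self._inner = inner

    async def signup(self, uid: str) -> str:
        # asyncio.to_thread keeps the blocking call off the event loop.
        return await asyncio.to_thread(self._inner.signup, uid)


result = asyncio.run(AsyncUnifiedAuthService(UnifiedAuthService()).signup("u1"))
```

This lets the endpoint signature drop the sync `db: Session` dependency now, while the real async conversion proceeds behind the facade.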

ℹ️ Review info

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 2f38075 and 907a1f9.

📒 Files selected for processing (42)
  • app/api/router.py
  • app/celery/celery_app.py
  • app/main.py
  • app/modules/auth/auth_router.py
  • app/modules/auth/auth_service.py
  • app/modules/auth/tests/auth_service_test.py
  • app/modules/code_provider/branch_cache.py
  • app/modules/code_provider/code_provider_controller.py
  • app/modules/code_provider/git_safe.py
  • app/modules/code_provider/github/github_router.py
  • app/modules/code_provider/github/github_service.py
  • app/modules/code_provider/local_repo/local_provider.py
  • app/modules/code_provider/local_repo/local_repo_service.py
  • app/modules/conversations/access/access_service.py
  • app/modules/conversations/conversation/conversation_controller.py
  • app/modules/conversations/conversation/conversation_service.py
  • app/modules/conversations/conversation_deps.py
  • app/modules/conversations/conversations_router.py
  • app/modules/conversations/session/session_service.py
  • app/modules/conversations/utils/conversation_routing.py
  • app/modules/conversations/utils/redis_streaming.py
  • app/modules/intelligence/memory/chat_history_service.py
  • app/modules/intelligence/tools/change_detection/change_detection_tool.py
  • app/modules/intelligence/tools/linear_tools/get_linear_issue_tool.py
  • app/modules/intelligence/tools/linear_tools/linear_client.py
  • app/modules/intelligence/tools/linear_tools/update_linear_issue_tool.py
  • app/modules/parsing/graph_construction/parsing_controller.py
  • app/modules/parsing/graph_construction/parsing_helper.py
  • app/modules/parsing/graph_construction/parsing_service.py
  • app/modules/tunnel/tunnel_router.py
  • app/modules/tunnel/tunnel_service.py
  • app/modules/usage/usage_controller.py
  • app/modules/usage/usage_router.py
  • app/modules/usage/usage_service.py
  • app/modules/users/user_service.py
  • app/modules/utils/email_helper.py
  • app/modules/utils/parse_webhook_helper.py
  • app/modules/utils/posthog_helper.py
  • docs/async-migration-plan.md
  • scripts/benchmark_share_access.py
  • scripts/benchmark_tunnel_github.py
  • scripts/benchmark_usage_check.py
💤 Files with no reviewable changes (1)
  • app/modules/parsing/graph_construction/parsing_controller.py

Comment thread app/modules/auth/auth_router.py Outdated
Comment thread app/modules/auth/auth_service.py Outdated
Comment thread app/modules/code_provider/github/github_service.py Outdated
Comment thread app/modules/conversations/utils/conversation_routing.py Outdated
Comment thread app/modules/conversations/utils/redis_streaming.py
Comment thread app/modules/utils/email_helper.py Outdated
Comment thread app/modules/utils/posthog_helper.py
Comment thread scripts/benchmark_share_access.py
Comment thread scripts/benchmark_tunnel_github.py
Comment thread scripts/benchmark_usage_check.py

@coderabbitai coderabbitai Bot left a comment


Actionable comments posted: 2

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (2)
app/modules/conversations/conversation/conversation_service.py (1)

1222-1243: ⚠️ Potential issue | 🔴 Critical

Fix critical syntax error: incomplete method call preventing file from parsing.

Line 1224 (and 1289) contains an incomplete self._history_add_message_chunk( call with no arguments or closing parenthesis, causing a SyntaxError. The code then incorrectly bypasses the async history routing wrapper by calling self.history_manager.add_message_chunk() directly. Additionally, the direct call passes a tool_calls parameter that the wrapper method does not support.

Replace both instances with proper calls to self._history_add_message_chunk() using the wrapper's supported parameters (remove the tool_calls argument).

Proposed fix
-                    self._history_add_message_chunk(
                     # Accumulate tool_calls from each chunk
                     if chunk.tool_calls:
                         for tool_call in chunk.tool_calls:
@@
                     # Capture thinking content if present
                     if chunk.thinking:
                         accumulated_thinking = chunk.thinking
-                    self.history_manager.add_message_chunk(
+                    self._history_add_message_chunk(
                         conversation_id,
                         chunk.response,
                         MessageType.AI_GENERATED,
                         citations=chunk.citations,
-                        tool_calls=accumulated_tool_calls if chunk.tool_calls else None,
                     )

Also applies to: 1289-1310

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@app/modules/conversations/conversation/conversation_service.py` around lines
1222 - 1243, There is a stray incomplete call to
self._history_add_message_chunk( which causes a SyntaxError and bypasses the
async wrapper by calling self.history_manager.add_message_chunk(...) with an
unsupported tool_calls argument; remove the incomplete call and replace the
direct history_manager.add_message_chunk call with a proper call to
self._history_add_message_chunk(conversation_id, chunk.response,
MessageType.AI_GENERATED, citations=chunk.citations,
thinking=accumulated_thinking) (i.e., call the wrapper method with the supported
parameters and do not pass tool_calls).
app/modules/conversations/utils/conversation_routing.py (1)

270-299: ⚠️ Potential issue | 🟠 Major

Non-streaming wait can block indefinitely

Line 298 awaits stream collection with no timeout. If the worker never emits an end event, this request will hang indefinitely, consuming resources and timing out only at the infrastructure level.

🔧 Suggested fix (bounded wait + 504)
-        loop = asyncio.get_event_loop()
-        with ThreadPoolExecutor() as executor:
-            events = await loop.run_in_executor(executor, collect_from_stream)
+        loop = asyncio.get_running_loop()
+        try:
+            events = await asyncio.wait_for(
+                loop.run_in_executor(None, collect_from_stream),
+                timeout=120,
+            )
+        except asyncio.TimeoutError as exc:
+            raise HTTPException(
+                status_code=504,
+                detail="Timed out waiting for task output stream",
+            ) from exc
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@app/modules/conversations/utils/conversation_routing.py` around lines 270 -
299, The await on loop.run_in_executor for collect_from_stream can hang forever
if no "end" event arrives; wrap the executor call with a bounded timeout (e.g.,
asyncio.wait_for) and handle asyncio.TimeoutError by adding an error/end event
and returning a 504-like error status; update the code around
collect_from_stream / loop.run_in_executor (referencing collect_from_stream,
loop.run_in_executor, conversation_id, run_id) to catch the timeout, append an
{"type":"end","status":"timeout","message":"Stream timed out"} event (or
equivalent) to events, and ensure the caller responds with a 504 status when the
timeout occurs.
♻️ Duplicate comments (1)
app/modules/conversations/utils/conversation_routing.py (1)

65-78: ⚠️ Potential issue | 🟠 Major

async_ensure_unique_run_id still has a TOCTOU race

Line 74 does a non-atomic exists() check loop; concurrent requests can still pick the same run_id. This repeats the previously reported issue.

🔧 Suggested fix (atomic reservation key)
+import uuid
+
 async def async_ensure_unique_run_id(
     conversation_id: str, run_id: str, async_redis: AsyncRedisStreamManager
 ) -> str:
@@
-    original_run_id = run_id
-    counter = 1
-    key = async_redis.stream_key(conversation_id, run_id)
-    while await async_redis.redis_client.exists(key):
-        run_id = f"{original_run_id}-{counter}"
-        counter += 1
-        key = async_redis.stream_key(conversation_id, run_id)
-    return run_id
+    original_run_id = run_id
+    counter = 0
+    claim_token = str(uuid.uuid4())
+    claim_ttl_seconds = 60
+
+    while True:
+        candidate = original_run_id if counter == 0 else f"{original_run_id}-{counter}"
+        reservation_key = f"{async_redis.stream_key(conversation_id, candidate)}:claim"
+        claimed = await async_redis.redis_client.set(
+            reservation_key, claim_token, nx=True, ex=claim_ttl_seconds
+        )
+        if claimed:
+            return candidate
+        counter += 1
#!/bin/bash
# Verify the uniqueness helpers are still using non-atomic exists loops.
rg -n -C3 'def ensure_unique_run_id|def async_ensure_unique_run_id|exists\(|set\(' app/modules/conversations/utils/conversation_routing.py
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@app/modules/conversations/utils/conversation_routing.py` around lines 65 -
78, async_ensure_unique_run_id currently does a TOCTOU loop using
async_redis.redis_client.exists and can return duplicate run_ids under
concurrency; replace the non-atomic exists loop with an atomic reservation
attempt by trying to create a reservation key for each candidate run_id (use
async_redis.stream_key(conversation_id, run_id) as the reservation key) with a
Redis SET NX (or setnx) and a short TTL/expiry; if the SET NX succeeds return
that run_id, otherwise increment the counter and retry until a reservation is
acquired. Ensure you call the atomic set on async_redis.redis_client (not
exists) and keep the short expiration to avoid leaked reservations.
🧹 Nitpick comments (1)
app/modules/intelligence/memory/chat_history_service.py (1)

246-246: Fix message_buffer type annotation mismatch.

Dict[str, Dict[str, str]] does not match the values stored (e.g., citations is a list), so static checks here are currently misleading.

Proposed fix
-        self.message_buffer: Dict[str, Dict[str, str]] = {}
+        self.message_buffer: Dict[str, Dict[str, Any]] = {}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@app/modules/intelligence/memory/chat_history_service.py` at line 246, The
type annotation for self.message_buffer is too narrow (Dict[str, Dict[str,
str]]) and conflicts with stored values like lists (e.g., "citations"); change
the annotation to reflect heterogeneous entry values such as Dict[str, Dict[str,
Any]] (or Dict[str, Any] if entries vary widely), update the import to include
Any from typing, and ensure any code using message_buffer is typed/accessed
accordingly (refer to the message_buffer attribute on the ChatHistoryService
class to locate the change).
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@app/modules/conversations/conversation/conversation_service.py`:
- Around line 1731-1737: The stop_generation flow mixes async and sync Redis
managers causing blocking I/O and potential state divergence; in the
stop_generation function, when self.async_redis_manager is present, replace uses
of self.redis_manager.get_task_status(...), get_snapshot(...), cancel_task(...),
and clear_task(...) with their async counterparts on self.async_redis_manager
(await self.async_redis_manager.get_task_status(...), await
self.async_redis_manager.get_snapshot(...), await
self.async_redis_manager.cancel_task(...), await
self.async_redis_manager.clear_task(...)) and keep the existing await for
get_task_id so all status/snapshot/cancel/clear calls follow the same async
path.

In `@app/modules/intelligence/memory/chat_history_service.py`:
- Around line 290-333: The async buffer currently only stores "content" and
"citations", causing loss of Message metadata like tool_calls and thinking;
update add_message_chunk to also buffer tool_calls and thinking on
self.message_buffer[conversation_id] (initialize keys the same way as "content"
and "citations"), accumulate/merge/extend these fields when new chunks arrive,
and then in flush_message_buffer read and persist those buffered fields into the
Message instance (serialize/format tool_calls and thinking the same way the
synchronous path does before assigning to Message.tool_calls and
Message.thinking); apply the same changes for the other async buffering code
block referenced around the 362-383 region so all async paths preserve
tool_calls and thinking consistently.
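The buffering fix above can be sketched with a minimal stand-in. ChatBuffer, add_chunk, and flush are hypothetical names introduced here for illustration; the real service persists the flushed fields onto a Message model rather than returning a dict:

```python
class ChatBuffer:
    """Accumulates streamed chunks per conversation, including metadata."""

    def __init__(self):
        self.message_buffer = {}

    def add_chunk(self, conv_id, content, citations=None,
                  tool_calls=None, thinking=""):
        # Initialize tool_calls/thinking the same way as content/citations,
        # so no metadata is dropped on the async path.
        buf = self.message_buffer.setdefault(
            conv_id,
            {"content": "", "citations": [], "tool_calls": [], "thinking": ""},
        )
        buf["content"] += content
        if citations:
            buf["citations"].extend(citations)
        if tool_calls:
            buf["tool_calls"].extend(tool_calls)
        if thinking:
            buf["thinking"] += thinking

    def flush(self, conv_id):
        # Returns the full buffered entry (or None) and clears it.
        return self.message_buffer.pop(conv_id, None)
```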

---

Outside diff comments:
In `@app/modules/conversations/conversation/conversation_service.py`:
- Around line 1222-1243: There is a stray incomplete call to
self._history_add_message_chunk( which causes a SyntaxError and bypasses the
async wrapper by calling self.history_manager.add_message_chunk(...) with an
unsupported tool_calls argument; remove the incomplete call and replace the
direct history_manager.add_message_chunk call with a proper call to
self._history_add_message_chunk(conversation_id, chunk.response,
MessageType.AI_GENERATED, citations=chunk.citations,
thinking=accumulated_thinking) (i.e., call the wrapper method with the supported
parameters and do not pass tool_calls).

In `@app/modules/conversations/utils/conversation_routing.py`:
- Around line 270-299: The await on loop.run_in_executor for collect_from_stream
can hang forever if no "end" event arrives; wrap the executor call with a
bounded timeout (e.g., asyncio.wait_for) and handle asyncio.TimeoutError by
adding an error/end event and returning a 504-like error status; update the code
around collect_from_stream / loop.run_in_executor (referencing
collect_from_stream, loop.run_in_executor, conversation_id, run_id) to catch the
timeout, append an {"type":"end","status":"timeout","message":"Stream timed
out"} event (or equivalent) to events, and ensure the caller responds with a 504
status when the timeout occurs.
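The timeout guard described above can be sketched as follows. STREAM_TIMEOUT_SECONDS, collect_with_timeout, and the stand-in collect_from_stream are assumptions for illustration; the real collect_from_stream blocks on a Redis stream:

```python
import asyncio
import time

STREAM_TIMEOUT_SECONDS = 30  # assumed budget; tune to the deployment


def collect_from_stream(delay: float) -> list:
    # Stand-in for the blocking stream reader; sleeps instead of reading Redis.
    time.sleep(delay)
    return [{"type": "end", "status": "ok"}]


async def collect_with_timeout(delay: float, timeout: float) -> tuple:
    loop = asyncio.get_running_loop()
    events: list = []
    try:
        # asyncio.wait_for bounds the otherwise-unbounded executor await.
        events = await asyncio.wait_for(
            loop.run_in_executor(None, collect_from_stream, delay),
            timeout=timeout,
        )
        return events, 200
    except asyncio.TimeoutError:
        # Append a terminal event and signal a gateway timeout to the caller.
        events.append(
            {"type": "end", "status": "timeout", "message": "Stream timed out"}
        )
        return events, 504
```

Note that the executor thread keeps running after the timeout; the guard only caps how long the request handler waits.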

---

Duplicate comments:
In `@app/modules/conversations/utils/conversation_routing.py`:
- Around line 65-78: async_ensure_unique_run_id currently does a TOCTOU loop
using async_redis.redis_client.exists and can return duplicate run_ids under
concurrency; replace the non-atomic exists loop with an atomic reservation
attempt by trying to create a reservation key for each candidate run_id (use
async_redis.stream_key(conversation_id, run_id) as the reservation key) with a
Redis SET NX (or setnx) and a short TTL/expiry; if the SET NX succeeds return
that run_id, otherwise increment the counter and retry until a reservation is
acquired. Ensure you call the atomic set on async_redis.redis_client (not
exists) and keep the short expiration to avoid leaked reservations.

---

Nitpick comments:
In `@app/modules/intelligence/memory/chat_history_service.py`:
- Line 246: The type annotation for self.message_buffer is too narrow (Dict[str,
Dict[str, str]]) and conflicts with stored values like lists (e.g.,
"citations"); change the annotation to reflect heterogeneous entry values such
as Dict[str, Dict[str, Any]] (or Dict[str, Any] if entries vary widely), update
the import to include Any from typing, and ensure any code using message_buffer
is typed/accessed accordingly (refer to the message_buffer attribute on the
ChatHistoryService class to locate the change).

ℹ️ Review info

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 907a1f9 and 615ee4f.

📒 Files selected for processing (4)
  • app/celery/celery_app.py
  • app/modules/conversations/conversation/conversation_service.py
  • app/modules/conversations/utils/conversation_routing.py
  • app/modules/intelligence/memory/chat_history_service.py
🚧 Files skipped from review as they are similar to previous changes (1)
  • app/celery/celery_app.py

Comment thread app/modules/conversations/conversation/conversation_service.py
Comment thread app/modules/intelligence/memory/chat_history_service.py

@coderabbitai coderabbitai Bot left a comment


Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (2)
app/modules/conversations/conversation/conversation_service.py (2)

1236-1254: ⚠️ Potential issue | 🔴 Critical

Route AI chunk buffering through the async-aware history path.

When async_history_manager is active, chunks are still buffered via self.history_manager, but flush uses _history_flush_message_buffer(...). This can flush the wrong buffer and drop persisted AI output/tool metadata.

Proposed fix
@@
-                    self.history_manager.add_message_chunk(
-                        conversation_id,
-                        chunk.response,
-                        MessageType.AI_GENERATED,
-                        citations=chunk.citations,
-                        tool_calls=accumulated_tool_calls if chunk.tool_calls else None,
-                    )
+                    self._history_add_message_chunk(
+                        conversation_id,
+                        chunk.response,
+                        MessageType.AI_GENERATED,
+                        citations=chunk.citations,
+                    )
@@
-                    self.history_manager.add_message_chunk(
-                        conversation_id,
-                        chunk.response,
-                        MessageType.AI_GENERATED,
-                        citations=chunk.citations,
-                        tool_calls=accumulated_tool_calls if chunk.tool_calls else None,
-                    )
+                    self._history_add_message_chunk(
+                        conversation_id,
+                        chunk.response,
+                        MessageType.AI_GENERATED,
+                        citations=chunk.citations,
+                    )

Also applies to: 1302-1320

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@app/modules/conversations/conversation/conversation_service.py` around lines
1236 - 1254, The code is adding AI chunks via
self.history_manager.add_message_chunk but always calling await
self._history_flush_message_buffer(...), which ignores async_history_manager and
can flush the wrong buffer; update the flow to detect if
self.async_history_manager is active and, in that case, call
self.async_history_manager.add_message_chunk(...) for buffering and use the
async-aware flush path (e.g., await
self._history_flush_message_buffer_async(...) or await
self.async_history_manager.flush_message_buffer(...) — whichever async flush API
exists) instead of self._history_flush_message_buffer; make the same change for
the other occurrence around the symbols at lines ~1302-1320 so both
add_message_chunk and the corresponding flush use the same (sync vs async)
history manager consistently.

1774-1776: ⚠️ Potential issue | 🟠 Major

Replace blocking sleep with async sleep in async def stop_generation.

The time.sleep(0.5) at line 1775 blocks the event loop in this async function. Replace with await asyncio.sleep(0.5).

Proposed fix
-                time.sleep(0.5)
+                await asyncio.sleep(0.5)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@app/modules/conversations/conversation/conversation_service.py` around lines
1774 - 1776, In async def stop_generation replace the blocking call
time.sleep(0.5) with await asyncio.sleep(0.5) so the event loop isn't blocked;
ensure asyncio is imported at top if not already and update the call in the
stop_generation function (search for the time.sleep(0.5) line) to use await
asyncio.sleep(0.5).
♻️ Duplicate comments (1)
app/modules/conversations/conversation/conversation_service.py (1)

1758-1779: ⚠️ Potential issue | 🟠 Major

Keep task-status checks on the same Redis manager path.

stop_generation still checks status with sync self.redis_manager.get_task_status(...) even when async manager is configured. This reintroduces mixed I/O paths and potential state divergence.

#!/bin/bash
# Verify stop_generation still mixes async/sync Redis status calls.
rg -n -C4 'stop_generation|get_task_status\(' app/modules/conversations/conversation/conversation_service.py
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@app/modules/conversations/conversation/conversation_service.py` around lines
1758 - 1779, In stop_generation, avoid mixing sync/async Redis calls: replace
the direct call to self.redis_manager.get_task_status(...) with an awaitable
path that uses self.async_redis_manager.get_task_status(...) when
self.async_redis_manager is set (await
self.async_redis_manager.get_task_status(conversation_id, run_id)), otherwise
call the synchronous self.redis_manager.get_task_status(...) as before; ensure
the code branches consistently for both setting cancellation (set_cancellation)
and checking task_status so all status reads/writes use the same manager
(reference async_redis_manager, redis_manager.get_task_status,
async_redis_manager.get_task_status, and the stop_generation method).
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Outside diff comments:
In `@app/modules/conversations/conversation/conversation_service.py`:
- Around line 1236-1254: The code is adding AI chunks via
self.history_manager.add_message_chunk but always calling await
self._history_flush_message_buffer(...), which ignores async_history_manager and
can flush the wrong buffer; update the flow to detect if
self.async_history_manager is active and, in that case, call
self.async_history_manager.add_message_chunk(...) for buffering and use the
async-aware flush path (e.g., await
self._history_flush_message_buffer_async(...) or await
self.async_history_manager.flush_message_buffer(...) — whichever async flush API
exists) instead of self._history_flush_message_buffer; make the same change for
the other occurrence around the symbols at lines ~1302-1320 so both
add_message_chunk and the corresponding flush use the same (sync vs async)
history manager consistently.
- Around line 1774-1776: In async def stop_generation replace the blocking call
time.sleep(0.5) with await asyncio.sleep(0.5) so the event loop isn't blocked;
ensure asyncio is imported at top if not already and update the call in the
stop_generation function (search for the time.sleep(0.5) line) to use await
asyncio.sleep(0.5).

---

Duplicate comments:
In `@app/modules/conversations/conversation/conversation_service.py`:
- Around line 1758-1779: In stop_generation, avoid mixing sync/async Redis
calls: replace the direct call to self.redis_manager.get_task_status(...) with
an awaitable path that uses self.async_redis_manager.get_task_status(...) when
self.async_redis_manager is set (await
self.async_redis_manager.get_task_status(conversation_id, run_id)), otherwise
call the synchronous self.redis_manager.get_task_status(...) as before; ensure
the code branches consistently for both setting cancellation (set_cancellation)
and checking task_status so all status reads/writes use the same manager
(reference async_redis_manager, redis_manager.get_task_status,
async_redis_manager.get_task_status, and the stop_generation method).
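The consistent-branching advice can be funneled through one helper, sketched here with stub managers standing in for the real sync/async RedisStreamManager classes (_get_task_status is a hypothetical method name):

```python
import asyncio


class SyncRedisManager:
    def get_task_status(self, conversation_id, run_id):
        return "running-sync"


class AsyncRedisManager:
    async def get_task_status(self, conversation_id, run_id):
        return "running-async"


class ConversationService:
    def __init__(self, redis_manager, async_redis_manager=None):
        self.redis_manager = redis_manager
        self.async_redis_manager = async_redis_manager

    async def _get_task_status(self, conversation_id, run_id):
        # Single chokepoint: every status read in stop_generation goes
        # through here, so the sync and async paths can never diverge.
        if self.async_redis_manager is not None:
            return await self.async_redis_manager.get_task_status(
                conversation_id, run_id
            )
        return self.redis_manager.get_task_status(conversation_id, run_id)
```

The same chokepoint pattern extends naturally to set_cancellation, get_snapshot, cancel_task, and clear_task.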

ℹ️ Review info

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 615ee4f and 9911843.

📒 Files selected for processing (1)
  • app/modules/conversations/conversation/conversation_service.py

…, auth/timeout/posthog/routing/redis fixes

Made-with: Cursor

@coderabbitai coderabbitai Bot left a comment


Actionable comments posted: 8

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
app/modules/auth/auth_router.py (1)

73-86: ⚠️ Potential issue | 🟡 Minor

ValueError branch is unreachable after switching to login_async.

Line 74 now calls login_async, which raises HTTPException on auth failures; Line 77 won’t execute. Also, Line 83 stringifies HTTPException and drops structured detail.

♻️ Proposed fix
-        except ValueError:
-            return JSONResponse(
-                content={"error": "Invalid email or password"}, status_code=401
-            )
         except HTTPException as he:
             return JSONResponse(
-                content={"error": f"HTTP Error: {str(he)}"}, status_code=he.status_code
+                content={"error": he.detail}, status_code=he.status_code
             )
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@app/modules/auth/auth_router.py` around lines 73 - 86, The except ValueError
branch is dead after switching to auth_handler.login_async; remove that except
block and consolidate error handling by catching HTTPException and returning its
structured detail instead of stringifying it: in the except HTTPException as he:
handler for auth_handler.login_async, create the JSONResponse using he.detail
(falling back to str(he) if detail is missing) and preserve he.status_code; keep
the generic except Exception as before for other errors. Refer to
auth_handler.login_async, HTTPException, he.detail, and JSONResponse when making
the changes.
♻️ Duplicate comments (1)
app/modules/conversations/utils/redis_streaming.py (1)

402-405: ⚠️ Potential issue | 🟠 Major

Add socket timeouts to async Redis client initialization.

Line 402–405 still creates AsyncRedis without socket_connect_timeout / socket_timeout, so stalled Redis connections can hang awaited operations.

🔧 Proposed fix
         self.redis_client: AsyncRedis = AsyncRedis.from_url(
             config.get_redis_url(),
             max_connections=max_connections,
+            socket_connect_timeout=10,
+            socket_timeout=30,
+            decode_responses=False,
         )
#!/bin/bash
# Verify sync vs async Redis client timeout parity and installed redis version constraints.
fd redis_streaming.py -t f -x sed -n '14,30p' {}
fd redis_streaming.py -t f -x sed -n '395,410p' {}
rg -n 'redis([<>=!~ ]|$)' pyproject.toml requirements.txt setup.py
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@app/modules/conversations/utils/redis_streaming.py` around lines 402 - 405,
The AsyncRedis client created in AsyncRedis.from_url (assigned to
self.redis_client) lacks socket_connect_timeout and socket_timeout, allowing
stalled Redis connections to hang awaited operations; update the
AsyncRedis.from_url call that uses config.get_redis_url() to pass explicit
socket_connect_timeout and socket_timeout (choose sensible defaults consistent
with the synchronous Redis client elsewhere in the project or config values),
and ensure these timeouts are configurable via existing config helpers so the
constructor reads them rather than hardcoding values.
🧹 Nitpick comments (2)
app/modules/auth/auth_router.py (1)

40-40: Remove duplicate logger initialization.

Line 40 repeats the same assignment already done at Line 14; keep one declaration.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@app/modules/auth/auth_router.py` at line 40, There is a duplicate
initialization of the module logger: remove the redundant "logger =
logging.getLogger(__name__)" declaration so only a single module-level logger
remains; locate both occurrences of the variable name logger in auth_router.py
(the one at the top-level and the duplicate later) and delete the later
duplicate, keeping the original declaration used by the module.
app/modules/auth/auth_service.py (1)

44-47: Preserve original exception cause when re-raising HTTPException.

Line 44 and Line 81 should chain with from e so traceback clearly shows the upstream failure path.

♻️ Proposed fix
-                raise HTTPException(
+                raise HTTPException(
                     status_code=e.response.status_code,
                     detail=detail,
-                )
+                ) from e
@@
-            raise HTTPException(
+            raise HTTPException(
                 status_code=e.response.status_code,
                 detail=detail,
-            )
+            ) from e

Also applies to: 81-84

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@app/modules/auth/auth_service.py` around lines 44 - 47, The HTTPException
raises that use the caught exception variable e (e.g., raise
HTTPException(status_code=e.response.status_code, detail=detail)) should
preserve the original cause by chaining the exception; change both occurrences
(the one using status_code=e.response.status_code and the other at the later
raise) to use "raise HTTPException(... ) from e" so the traceback includes the
upstream error.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@app/modules/auth/auth_router.py`:
- Around line 61-65: The Slack webhook post currently ignores non-2xx responses;
in the block inside the try in the function/method where httpx.AsyncClient is
used (the code around the client.post call in auth_router.py), capture the
response from await client.post(...), call response.raise_for_status() to raise
for non-2xx statuses, and update the except to catch httpx.HTTPStatusError
(and/or Exception) so failures are logged and handled (e.g., in logger.warning)
rather than silently ignored.

In `@app/modules/code_provider/github/github_service.py`:
- Around line 57-66: The shared async Redis client is set/cleared without
consistent locking causing races between _get_async_redis_cache() and
close_github_async_redis_cache(); fix by using _async_redis_cache_lock in the
shutdown path and ensure all mutations/read-checks occur under that lock: in
close_github_async_redis_cache() acquire _async_redis_cache_lock before
closing/awaiting the client and setting _async_redis_cache = None, and adjust
_get_async_redis_cache() to perform the initial existence check and the
assignment/return while holding the same _async_redis_cache_lock so no coroutine
can read a client while it’s being closed. Also apply the same locking pattern
to the analogous sync path referenced around lines 77-85.
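The locking pattern can be sketched with a stand-in client. FakeAsyncRedis and the function names are hypothetical; the real code holds a redis.asyncio.Redis behind module-level _async_redis_cache and _async_redis_cache_lock:

```python
import asyncio
from typing import Optional


class FakeAsyncRedis:
    # Stand-in for redis.asyncio.Redis; only the lifecycle matters here.
    def __init__(self):
        self.closed = False

    async def aclose(self):
        self.closed = True


_async_redis_cache: Optional[FakeAsyncRedis] = None
_async_redis_cache_lock = asyncio.Lock()


async def get_async_redis_cache() -> FakeAsyncRedis:
    global _async_redis_cache
    # Check-and-create under the lock, so a concurrent shutdown can
    # never hand out a client that is mid-close.
    async with _async_redis_cache_lock:
        if _async_redis_cache is None:
            _async_redis_cache = FakeAsyncRedis()
        return _async_redis_cache


async def close_async_redis_cache() -> None:
    global _async_redis_cache
    # Close and clear under the same lock as creation.
    async with _async_redis_cache_lock:
        if _async_redis_cache is not None:
            await _async_redis_cache.aclose()
            _async_redis_cache = None
```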
- Around line 82-84: The shutdown block around await _async_redis_cache.aclose()
currently swallows all exceptions; change it to either let errors propagate
(remove the broad try/except) or catch specific Redis errors and log them at
error level and re-raise; for example, replace the blanket except Exception as
e: logger.warning(...) with either no except so failures bubble from
_async_redis_cache.aclose(), or catch redis.exceptions.RedisError (or the
library-specific exception type used in your project) and call
logger.exception("Failed to close GitHub async Redis cache") and then re-raise
the exception so critical shutdown failures are not silently ignored.
- Line 1183: The call to _get_git_module() may raise, leaving the local name git
unbound and causing later except clauses that reference
git.InvalidGitRepositoryError or git.GitCommandError to raise UnboundLocalError;
wrap the _get_git_module() call in a try/except ImportError (or add an immediate
except ImportError) so you handle import failures before any except blocks that
reference git, e.g., ensure git is assigned or return/raise a clear error in the
ImportError handler prior to using git in except git.InvalidGitRepositoryError /
except git.GitCommandError handlers.
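A minimal sketch of the import guard: describe_repo is a hypothetical caller added for illustration, and the point is binding git before any except clause that names it so a failed import cannot resurface as UnboundLocalError:

```python
import importlib
import logging

logger = logging.getLogger(__name__)


def _get_git_module(name: str = "git"):
    # Lazy import preserves fork-safety; may raise ImportError.
    return importlib.import_module(name)


def describe_repo(path: str, module_name: str = "git") -> str:
    # Handle the import failure first, so the later except clauses that
    # reference `git` never see an unbound name.
    try:
        git = _get_git_module(module_name)
    except ImportError:
        logger.warning("GitPython unavailable; skipping repo inspection")
        return "git-unavailable"
    try:
        repo = git.Repo(path)
        return repo.active_branch.name
    except git.InvalidGitRepositoryError:
        return "not-a-repo"
```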

In `@app/modules/conversations/conversation/conversation_service.py`:
- Around line 1270-1272: The flush routine _history_flush_message_buffer
currently calls history_manager.add_message_chunk directly (occurring near the
blocks around add_message_chunk at the earlier and later occurrences) which
bypasses the async dispatcher when async history is enabled and can drop AI
messages; update _history_flush_message_buffer to detect async mode (e.g., via
self.history_manager.is_async or the same flag used to route to async history)
and, when async is enabled, call the async enqueue/write path (the history
dispatcher method or an awaited add_message_chunk_async method) instead of
history_manager.add_message_chunk; apply the same change to the other
chunk-write sites noted (the other occurrences around the same method, including
the later block at the referenced 1336-1338 region) so all chunk writes go
through the async dispatcher when async history is active.
- Line 8: The module uses the type annotation Dict[str, Any] but Any is not
imported from typing; update the top import statement (the typing import that
currently lists AsyncGenerator, Callable, Dict, List, Optional, Union) to also
include Any so the annotations (e.g., the Dict[str, Any] usages in this file)
are valid.

In `@app/modules/conversations/utils/conversation_routing.py`:
- Around line 84-93: The reservation loop using _reservation_key +
async_redis.redis_client.set(nx=True, ex=RUN_ID_RESERVATION_TTL) can still
re-use a run_id whose actual stream is still active after the reservation TTL
expires; to fix, after a successful SET (i.e., when claimed is truthy) verify
that the corresponding stream/active-run key for run_id does not already exist
(use async_redis.redis_client.exists or an appropriate stream key check) and if
it does exist delete the reservation key and continue the loop; for stronger
guarantees consider replacing the two-step check with an atomic Lua script that
checks for an existing stream and sets the reservation in one operation
(reference symbols: _reservation_key, run_id, original_run_id,
async_redis.redis_client.set, RUN_ID_RESERVATION_TTL).
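A sketch of the stronger Lua variant. RESERVE_RUN_ID_LUA and reserve_run_id are hypothetical names, and the sketch assumes a redis.asyncio client whose eval takes (script, numkeys, *keys_and_args):

```python
import inspect

# Lua: succeed only if neither an active stream nor an existing reservation
# is present; the check and the SET happen in one server-side operation, so
# there is no check-then-set window for a concurrent request to slip through.
RESERVE_RUN_ID_LUA = """
if redis.call('EXISTS', KEYS[1]) == 1 or redis.call('EXISTS', KEYS[2]) == 1 then
    return 0
end
redis.call('SET', KEYS[2], ARGV[1], 'EX', tonumber(ARGV[2]))
return 1
"""


async def reserve_run_id(redis_client, stream_key: str, reservation_key: str,
                         claim_token: str, ttl_seconds: int = 60) -> bool:
    # redis_client is assumed to be a redis.asyncio.Redis instance.
    result = await redis_client.eval(
        RESERVE_RUN_ID_LUA, 2, stream_key, reservation_key,
        claim_token, ttl_seconds,
    )
    return result == 1
```

The caller loops over candidate run_ids exactly as in the non-Lua reservation, but each attempt is now a single atomic round trip.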

In `@app/modules/utils/posthog_helper.py`:
- Around line 50-53: The code schedules a background call via
loop.run_in_executor passing the mutable properties dict by reference to
self._capture_sync; create a snapshot copy of properties before scheduling
(e.g., props = dict(properties) or use copy.deepcopy(properties) if nested
structures must be preserved) and pass that snapshot into the lambda instead of
the original variable so mutations after send_event returns cannot alter the
payload; update the call site around loop.run_in_executor / self._capture_sync
to use the snapshot variable.
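The snapshot fix can be sketched as follows; PostHogClient here is a stand-in recording payloads for inspection, not the real SDK:

```python
import asyncio


class PostHogClient:
    def __init__(self):
        self.captured = []

    def _capture_sync(self, event: str, properties: dict) -> None:
        # Blocking SDK call in the real code; recorded here instead.
        self.captured.append((event, properties))

    def send_event(self, event: str, properties: dict):
        # Shallow snapshot taken before scheduling, so mutations made by
        # the caller after send_event returns cannot alter the payload.
        # Use copy.deepcopy if the payload nests mutable structures.
        props = dict(properties)
        loop = asyncio.get_running_loop()
        return loop.run_in_executor(None, self._capture_sync, event, props)
```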

---

Outside diff comments:
In `@app/modules/auth/auth_router.py`:
- Around line 73-86: The except ValueError branch is dead after switching to
auth_handler.login_async; remove that except block and consolidate error
handling by catching HTTPException and returning its structured detail instead
of stringifying it: in the except HTTPException as he: handler for
auth_handler.login_async, create the JSONResponse using he.detail (falling back
to str(he) if detail is missing) and preserve he.status_code; keep the generic
except Exception as before for other errors. Refer to
auth_handler.login_async, HTTPException, he.detail, and JSONResponse when making
the changes.

---

Duplicate comments:
In `@app/modules/conversations/utils/redis_streaming.py`:
- Around line 402-405: The AsyncRedis client created in AsyncRedis.from_url
(assigned to self.redis_client) lacks socket_connect_timeout and socket_timeout,
allowing stalled Redis connections to hang awaited operations; update the
AsyncRedis.from_url call that uses config.get_redis_url() to pass explicit
socket_connect_timeout and socket_timeout (choose sensible defaults consistent
with the synchronous Redis client elsewhere in the project or config values),
and ensure these timeouts are configurable via existing config helpers so the
constructor reads them rather than hardcoding values.

---

Nitpick comments:
In `@app/modules/auth/auth_router.py`:
- Line 40: There is a duplicate initialization of the module logger: remove the
redundant "logger = logging.getLogger(__name__)" declaration so only a single
module-level logger remains; locate both occurrences of the variable name logger
in auth_router.py (the one at the top-level and the duplicate later) and delete
the later duplicate, keeping the original declaration used by the module.

In `@app/modules/auth/auth_service.py`:
- Around line 44-47: The HTTPException raises that use the caught exception
variable e (e.g., raise HTTPException(status_code=e.response.status_code,
detail=detail)) should preserve the original cause by chaining the exception;
change both occurrences (the one using status_code=e.response.status_code and
the other at the later raise) to use "raise HTTPException(... ) from e" so the
traceback includes the upstream error.

ℹ️ Review info

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 9911843 and 7927ef7.

📒 Files selected for processing (8)
  • app/modules/auth/auth_router.py
  • app/modules/auth/auth_service.py
  • app/modules/code_provider/github/github_service.py
  • app/modules/conversations/conversation/conversation_service.py
  • app/modules/conversations/utils/conversation_routing.py
  • app/modules/conversations/utils/redis_streaming.py
  • app/modules/intelligence/memory/chat_history_service.py
  • app/modules/utils/posthog_helper.py

Comment thread app/modules/auth/auth_router.py
Comment thread app/modules/code_provider/github/github_service.py
Comment thread app/modules/code_provider/github/github_service.py
Comment thread app/modules/code_provider/github/github_service.py Outdated
Comment thread app/modules/conversations/conversation/conversation_service.py Outdated
Comment thread app/modules/conversations/conversation/conversation_service.py
Comment thread app/modules/conversations/utils/conversation_routing.py
Comment thread app/modules/utils/posthog_helper.py
… timeout helper, run_id stream check

Made-with: Cursor

@coderabbitai coderabbitai Bot left a comment


Actionable comments posted: 2

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
app/modules/conversations/utils/conversation_routing.py (1)

49-66: ⚠️ Potential issue | 🟡 Minor

Sync ensure_unique_run_id is unused dead code and should be removed.

The sync function uses a non-atomic exists() loop that could race, but it has no active call sites in the codebase—only the async version is used. If this function is not intended for future use (e.g., Celery tasks), remove it to avoid confusion. If it may be used, align it with the async reservation strategy already implemented.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@app/modules/conversations/utils/conversation_routing.py` around lines 49 -
66, The function ensure_unique_run_id is unused and unsafe (non-atomic exists()
loop) — remove this dead sync implementation to avoid confusion; delete the
ensure_unique_run_id function and any associated imports of
RedisStreamManager/stream_key in conversation_routing.py, or if you intend to
keep sync support, replace it with a reservation approach consistent with the
async counterpart (mirror the async reservation logic and use atomic redis
operations instead of redis_client.exists in ensure_unique_run_id).
🧹 Nitpick comments (3)
app/modules/conversations/conversation/conversation_service.py (1)

1795-1795: Replace blocking time.sleep with await asyncio.sleep.

time.sleep(0.5) blocks the event loop in this async method. Use await asyncio.sleep(0.5) to allow other coroutines to run during the wait.

♻️ Proposed fix
-                time.sleep(0.5)
+                await asyncio.sleep(0.5)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@app/modules/conversations/conversation/conversation_service.py` at line 1795,
In the async method that currently calls time.sleep(0.5) replace that blocking
call with await asyncio.sleep(0.5) so the event loop is not blocked; add an
import for asyncio at the top of the module if missing and remove the time
import (or other time.sleep usages) if it becomes unused; ensure the change is
applied to the exact occurrence of time.sleep(0.5) in conversation_service.py so
the coroutine yields correctly.
app/modules/utils/email_helper.py (1)

199-203: Consider simplifying the try block.

The static analysis flags a stylistic issue (TRY300). Since _send_with_resend already logs exceptions before re-raising, this catch block purely exists to swallow failures for fire-and-forget alerts, which is acceptable. A minor simplification:

Optional simplification
         try:
-            email = await _send_with_resend(params)
-            return email
+            return await _send_with_resend(params)
         except Exception:
             return None
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@app/modules/utils/email_helper.py` around lines 199-203: Return the awaited result of _send_with_resend(params) directly instead of binding it to a temporary first, i.e. replace the two lines `email = await _send_with_resend(params)` / `return email` with the single line `return await _send_with_resend(params)`. Keep the except clause: _send_with_resend already logs and re-raises, so the catch exists only to swallow failures for fire-and-forget alerts.
app/modules/conversations/utils/conversation_routing.py (1)

319-321: Avoid allocating a fresh thread pool per request.

Creating ThreadPoolExecutor() in-request is expensive under concurrency. Prefer the loop’s shared executor for this single blocking call.

♻️ Proposed change
-        loop = asyncio.get_event_loop()
-        with ThreadPoolExecutor() as executor:
-            events = await loop.run_in_executor(executor, collect_from_stream)
+        loop = asyncio.get_running_loop()
+        events = await loop.run_in_executor(None, collect_from_stream)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@app/modules/conversations/utils/conversation_routing.py` around lines 319-321: Replace the per-request ThreadPoolExecutor allocation with the event loop's shared executor: get the running loop via asyncio.get_running_loop() and call loop.run_in_executor(None, collect_from_stream) instead of creating a ThreadPoolExecutor(). Passing None uses the loop's default shared executor for the blocking collect_from_stream call and avoids expensive per-request thread pool creation.
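The suggested shape, as a self-contained sketch (collect_from_stream is stubbed here): passing None as the executor routes the blocking call through the loop's shared default thread pool, which is created once and reused, instead of building and tearing down a fresh ThreadPoolExecutor on every request.

```python
import asyncio
import time

def collect_from_stream() -> list:
    # Stub for the blocking stream read; simulates I/O with a short sleep.
    time.sleep(0.05)
    return ["event-1", "event-2"]

async def handle_request() -> list:
    # executor=None -> the event loop's shared default ThreadPoolExecutor,
    # so no per-request pool is allocated for this single blocking call.
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, collect_from_stream)

events = asyncio.run(handle_request())
```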
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@app/modules/code_provider/github/github_service.py`:
- Around lines 399-407: The async branch currently logs the decryption failure using logger.error (including uid, error, len(raw_token)) and then immediately returns None. To match the sync behavior, remove the early return so the code falls through and attempts the legacy provider_info/system token fallback instead; i.e., keep the logger.error call but do not return in the exception handler (the same block that contains the logger.error and the return None), allowing the subsequent logic that reads provider_info to run.

In `@app/modules/conversations/utils/conversation_routing.py`:
- Around lines 206-208: The current calls to async_redis_manager.wait_for_task_start(..., require_running=True) can block for the full timeout if a task moves rapidly from queued to completed/error, or only transiently hits running. Change both occurrences to require_running=False, then immediately inspect the returned task status (or poll briefly with a small timeout) so completed/error states are handled without waiting 30s for a running state; i.e., replace require_running=True with require_running=False in the wait_for_task_start calls and add a short follow-up check on the returned object's status field to branch to completed/error handling, continuing to wait only if truly needed.
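The intent can be sketched as follows; the manager object, the wait_for_task_start signature, and its return shape are hypothetical stand-ins for the project's AsyncRedisStreamManager, used only to show the branching:

```python
import asyncio

async def start_and_route(manager, task_id: str, timeout: float = 30.0) -> str:
    # require_running=False: return on the first observed state rather than
    # waiting the full timeout for a possibly transient 'running' state.
    info = await manager.wait_for_task_start(
        task_id, timeout=timeout, require_running=False
    )
    status = (info or {}).get("status")
    if status in ("completed", "error"):
        # Task raced straight past 'running': handle the terminal state
        # now instead of burning the 30-second timeout.
        return f"terminal:{status}"
    return "streaming"  # queued/running: proceed to stream events

class _FakeManager:
    # Minimal stand-in mimicking the assumed wait_for_task_start signature.
    def __init__(self, status: str):
        self._status = status
    async def wait_for_task_start(self, task_id, timeout, require_running):
        return {"status": self._status}

fast_finish = asyncio.run(start_and_route(_FakeManager("completed"), "t1"))
running = asyncio.run(start_and_route(_FakeManager("running"), "t2"))
```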

---

Outside diff comments:
In `@app/modules/conversations/utils/conversation_routing.py`:
- Around lines 49-66: The function ensure_unique_run_id is unused and unsafe (a non-atomic exists() loop). Remove this dead sync implementation to avoid confusion: delete ensure_unique_run_id and any associated imports of RedisStreamManager/stream_key in conversation_routing.py. If you intend to keep sync support instead, replace it with a reservation approach consistent with the async counterpart (mirror the async reservation logic and use atomic Redis operations instead of redis_client.exists).

---

Nitpick comments:
In `@app/modules/conversations/conversation/conversation_service.py`:
- Line 1795: In the async method that currently calls time.sleep(0.5), replace the blocking call with await asyncio.sleep(0.5) so the event loop is not blocked. Add an import for asyncio at the top of the module if missing, and remove the time import if no other usages remain. Apply the change to the exact occurrence of time.sleep(0.5) in conversation_service.py so the coroutine yields correctly.

In `@app/modules/conversations/utils/conversation_routing.py`:
- Around lines 319-321: Replace the per-request ThreadPoolExecutor allocation with the event loop's shared executor: get the running loop via asyncio.get_running_loop() and call loop.run_in_executor(None, collect_from_stream) instead of creating a ThreadPoolExecutor(). Passing None uses the loop's default shared executor for the blocking collect_from_stream call and avoids expensive per-request thread pool creation.

In `@app/modules/utils/email_helper.py`:
- Around lines 199-203: Return the awaited result of _send_with_resend(params) directly instead of binding it to a temporary first, i.e. replace the two lines with the single line `return await _send_with_resend(params)`. Keep the except clause: _send_with_resend already logs and re-raises, so the catch exists only to swallow failures for fire-and-forget alerts.

ℹ️ Review info

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 7927ef7 and ca44d43.

📒 Files selected for processing (4)
  • app/modules/code_provider/github/github_service.py
  • app/modules/conversations/conversation/conversation_service.py
  • app/modules/conversations/utils/conversation_routing.py
  • app/modules/utils/email_helper.py

@sonarqubecloud

sonarqubecloud Bot commented Mar 2, 2026

Quality Gate failed

Failed conditions
3 Security Hotspots
7.2% Duplication on New Code (required ≤ 3%)
D Reliability Rating on New Code (required ≥ A)
B Security Rating on New Code (required ≥ A)

See analysis details on SonarQube Cloud

