Skip to content

Commit 906b762

Browse files
committed
experiment working, database leaks closed
1 parent 8a0d8b1 commit 906b762

8 files changed

Lines changed: 145 additions & 14 deletions

File tree

app/api/endpoints/analysis_endpoints.py

Lines changed: 80 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -1,4 +1,5 @@
11
import json
2+
import asyncio
23
from fastapi import APIRouter, Depends, HTTPException, Query, Request, status
34
from typing import List
45
from uuid import UUID
@@ -112,24 +113,95 @@ async def stream_claim_analysis_exp(
112113
claim = await claim_service.get_claim(claim_id=claim_id, user_id=current_user.id)
113114

114115
session = claim_service._claim_repo._session
116+
117+
await session.rollback()
115118

116119
async def event_generator():
117120
try:
118121
logger.info(f"Starting analysis stream for claim {claim_id}")
119122
yield f"data: {json.dumps({'type': 'status', 'content': 'Initializing analysis...'})}\n\n"
120123

121-
async for event in analysis_orchestrator.analyze_claim_stream(
124+
orchestrator_stream = analysis_orchestrator.analyze_claim_stream(
122125
claim=claim, user_id=current_user.id, default=False
123-
):
124-
if isinstance(event, dict):
125-
yield f"data: {json.dumps(event)}\n\n"
126+
)
127+
# ---------------------------------------------------------
128+
# THE HEALTH CHECK LOOP
129+
# ---------------------------------------------------------
130+
next_event_task = None
131+
132+
while True:
133+
# Only create a new task if we don't already have one waiting
134+
if next_event_task is None:
135+
next_event_task = asyncio.create_task(anext(orchestrator_stream))
136+
137+
# Wait for the task to finish, but only wait 15 seconds
138+
done, pending = await asyncio.wait(
139+
[next_event_task],
140+
timeout=15.0,
141+
return_when=asyncio.FIRST_COMPLETED
142+
)
143+
144+
if next_event_task in done:
145+
# The LLM yielded a chunk! Let's process it.
146+
try:
147+
event = next_event_task.result()
148+
if isinstance(event, dict):
149+
yield f"data: {json.dumps(event)}\n\n"
150+
151+
# Reset the task so we grab the next chunk on the next loop
152+
next_event_task = None
153+
154+
except StopAsyncIteration:
155+
# The stream finished normally!
156+
break
157+
except Exception as e:
158+
# If the orchestrator crashed, catch it here
159+
raise e
160+
161+
else:
162+
# The task is in 'pending'. 15 seconds passed, but the LLM is still thinking.
163+
# We yield a heartbeat, but we DO NOT reset next_event_task.
164+
# It will keep running safely in the background on the next loop!
165+
logger.debug("Stream idle for 15s. Sending health check ping...")
166+
yield ": healthcheck\n\n"
167+
168+
yield "data: [DONE]\n\n"
169+
170+
except asyncio.CancelledError:
171+
logger.warning(f"Client disconnected during stream for claim {claim_id}")
172+
raise
173+
174+
# async for event in analysis_orchestrator.analyze_claim_stream(
175+
# claim=claim, user_id=current_user.id, default=False
176+
# ):
177+
# if isinstance(event, dict):
178+
# yield f"data: {json.dumps(event)}\n\n"
179+
180+
# yield "data: [DONE]\n\n"
181+
182+
# except asyncio.CancelledError:
183+
# # THE FIX: The user closed their browser!
184+
# logger.info(f"Client disconnected during stream for claim {claim_id}")
185+
# await session.rollback() # Explicitly release the lock!
186+
# raise
126187

127188
except Exception as e:
128189
logger.error(f"Error in analysis stream: {str(e)}", exc_info=True)
129190
yield f"data: {json.dumps({'type': 'error', 'content': str(e)})}\n\n"
130191
finally:
131-
await session.close()
132-
yield "data: [DONE]\n\n"
192+
# async def force_cleanup():
193+
# try:
194+
# await session.rollback()
195+
# except Exception as e:
196+
# logger.error(f"Force rollback failed: {e}")
197+
# finally:
198+
# await session.close()
199+
200+
# # Fire and forget. FastAPI cannot cancel this!
201+
# asyncio.create_task(force_cleanup())
202+
if next_event_task and not next_event_task.done():
203+
logger.debug("Cancelling background orchestrator task...")
204+
next_event_task.cancel()
133205

134206
return StreamingResponse(
135207
event_generator(),
@@ -138,8 +210,8 @@ async def event_generator():
138210
"Cache-Control": "no-cache",
139211
"Connection": "keep-alive",
140212
"X-Accel-Buffering": "no",
141-
"Access-Control-Allow-Origin": "*",
142-
"Access-Control-Allow-Credentials": "true",
213+
# "Access-Control-Allow-Origin": "*",
214+
# "Access-Control-Allow-Credentials": "true",
143215
},
144216
)
145217
except Exception as e:

app/main.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@ async def lifespan(app: FastAPI):
6363
"https://veracity-eval-frontend-git-g2frontend-complex-data-lab.vercel.app",
6464
"https://veracity-eval-frontend-git-g1frontendmod-complex-data-lab.vercel.app",
6565
"https://veracity-eval-frontend-git-g1frontendmod2-complex-data-lab.vercel.app",
66+
"https://veracity-eval-frontend-git-g0frontendmod-complex-data-lab.vercel.app",
6667
],
6768
allow_credentials=True,
6869
allow_methods=["*"],

app/models/database/models.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -145,7 +145,7 @@ class AnalysisModel(Base):
145145

146146
log_probs: Mapped[bytes] = mapped_column(LargeBinary, nullable=True)
147147

148-
claim: Mapped["ClaimModel"] = relationship(back_populates="analyses", doc="Related claim")
148+
claim: Mapped["ClaimModel"] = relationship(back_populates="analyses", doc="Related claim", lazy="selectin")
149149
searches: Mapped[List["SearchModel"]] = relationship(back_populates="analysis", cascade="all, delete-orphan")
150150
feedbacks: Mapped[List["FeedbackModel"]] = relationship(
151151
back_populates="analysis",

app/repositories/implementations/claim_repository.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,9 @@ async def update_status(self, claim_id: UUID, status: ClaimStatus) -> Optional[C
7676

7777
claim.status = status
7878
updated_claim = await self.update(claim)
79+
80+
await self._session.commit()
81+
7982
return updated_claim
8083

8184
except Exception:

app/repositories/implementations/source_repository.py

Lines changed: 24 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -43,10 +43,19 @@ async def create_with_domain(self, source: SourceModel) -> Optional[SourceModel]
4343
"""Create a source with its domain relationship."""
4444
try:
4545
self._session.add(source)
46-
await self._session.flush()
47-
await self._session.refresh(source, ["domain"])
4846
await self._session.commit()
49-
return source
47+
48+
# --- THE FIX ---
49+
# Re-fetch to guarantee it is fully loaded and not expired by the commit
50+
stmt = select(self._model_class).where(self._model_class.id == source.id).options(selectinload(self._model_class.domain))
51+
result = await self._session.execute(stmt)
52+
loaded_source = result.scalar_one()
53+
54+
self._session.expunge(loaded_source)
55+
if loaded_source.domain:
56+
self._session.expunge(loaded_source.domain)
57+
58+
return loaded_source
5059
except Exception as e:
5160
await self._session.rollback()
5261
raise e
@@ -56,7 +65,18 @@ async def update(self, source: SourceModel) -> SourceModel:
5665
try:
5766
merged = await self._session.merge(source)
5867
await self._session.commit()
59-
return merged
68+
69+
# --- THE FIX ---
70+
# Same treatment: refetch eagerly, then detach
71+
stmt = select(self._model_class).where(self._model_class.id == merged.id).options(selectinload(self._model_class.domain))
72+
result = await self._session.execute(stmt)
73+
loaded_source = result.scalar_one()
74+
75+
self._session.expunge(loaded_source)
76+
if loaded_source.domain:
77+
self._session.expunge(loaded_source.domain)
78+
79+
return loaded_source
6080
except Exception as e:
6181
await self._session.rollback()
6282
raise e

app/services/analysis_orchestrator.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,13 +104,17 @@ async def _generate_analysis(
104104
)
105105
current_analysis = await self._analysis_repo.create(initial_analysis)
106106

107+
await self._analysis_repo._session.commit()
108+
107109
yield {"type": "status", "content": "Searching for relevant sources..."}
108110

109111
query = self._query_initial(claim_text, language)
110112
messages = [LLMMessage(role="user", content=query)]
111113
all_sources = []
112114
for turns in range(MAX_NUM_TURNS):
113115

116+
await self._analysis_repo._session.rollback()
117+
114118
response = await self._llm.generate_response(messages)
115119

116120
main_agent_message = response.text
@@ -134,12 +138,17 @@ async def _generate_analysis(
134138
updated_at=datetime.now(UTC),
135139
)
136140
current_search = await self._search_repo.create(initial_search)
141+
142+
await self._search_repo._session.commit()
143+
137144
sources = await self._web_search.search_and_create_sources(
138145
claim_text=search_request_match.matched_content, search_id=current_search.id, language=language
139146
)
140147

141148
all_sources += sources
142149

150+
await self._analysis_repo._session.rollback()
151+
143152
search_response = self._web_search.format_sources_for_prompt(sources, language)
144153

145154
if language == "english":
@@ -196,6 +205,8 @@ async def _generate_analysis(
196205
analysis_text = []
197206
log_probs = []
198207

208+
await self._analysis_repo._session.rollback()
209+
199210
async for chunk in self._llm.generate_stream(messages):
200211
if not chunk.is_complete:
201212
analysis_text.append(chunk.text)
@@ -244,6 +255,8 @@ async def _generate_analysis(
244255

245256
if not default:
246257

258+
await self._analysis_repo._session.rollback()
259+
247260
con_score = await self._generate_logprob_confidence_score(log_probs=log_probs)
248261
logger.info(con_score)
249262
current_analysis.confidence_score = float(con_score)
@@ -515,6 +528,7 @@ async def analyze_claim_stream(
515528
self._analysis_state.current_claim = claim
516529

517530
await self._claim_repo.update_status(claim.id, ClaimStatus.analyzing)
531+
518532
yield {"type": "status", "content": "Starting analysis..."}
519533

520534
# Generate analysis

app/services/implementations/serper_web_search_service.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,8 @@ async def search_and_create_sources(
6363
except Exception as e:
6464
logger.error(f"Error processing search result: {str(e)}", exc_info=True)
6565
continue
66-
66+
await self.source_repository._session.commit()
67+
6768
return sources
6869

6970
except Exception as e:

infrastructure/terraform/main.tf

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -374,6 +374,12 @@ resource "kubernetes_service" "misinformation_mitigation_api" {
374374
metadata {
375375
name = "misinformation-mitigation-api"
376376
namespace = kubernetes_namespace.misinformation_mitigation.metadata[0].name
377+
378+
annotations = {
379+
"cloud.google.com/backend-config" = jsonencode({
380+
"default" = "api-timeout-config"
381+
})
382+
}
377383
}
378384

379385
spec {
@@ -391,6 +397,20 @@ resource "kubernetes_service" "misinformation_mitigation_api" {
391397

392398

393399
# Load Balancer and DNS
400+
resource "kubernetes_manifest" "api_backend_config" {
401+
manifest = {
402+
apiVersion = "cloud.google.com/v1"
403+
kind = "BackendConfig"
404+
metadata = {
405+
name = "api-timeout-config"
406+
namespace = kubernetes_namespace.misinformation_mitigation.metadata[0].name
407+
}
408+
spec = {
409+
timeoutSec = 600 # 600 seconds = 10 minutes
410+
}
411+
}
412+
}
413+
394414
resource "google_compute_global_address" "misinformation_mitigation_api_ip" {
395415
name = "misinformation-mitigation-api-ip"
396416
}

0 commit comments

Comments
 (0)