Skip to content

Commit ef1533f

Browse files
committed
format
1 parent 906b762 commit ef1533f

3 files changed

Lines changed: 35 additions & 29 deletions

File tree

app/api/endpoints/analysis_endpoints.py

Lines changed: 18 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,7 @@ async def stream_claim_analysis_exp(
113113
claim = await claim_service.get_claim(claim_id=claim_id, user_id=current_user.id)
114114

115115
session = claim_service._claim_repo._session
116-
116+
117117
await session.rollback()
118118

119119
async def event_generator():
@@ -128,59 +128,57 @@ async def event_generator():
128128
# THE HEALTH CHECK LOOP
129129
# ---------------------------------------------------------
130130
next_event_task = None
131-
131+
132132
while True:
133133
# Only create a new task if we don't already have one waiting
134134
if next_event_task is None:
135135
next_event_task = asyncio.create_task(anext(orchestrator_stream))
136136

137137
# Wait for the task to finish, but only wait 15 seconds
138138
done, pending = await asyncio.wait(
139-
[next_event_task],
140-
timeout=15.0,
141-
return_when=asyncio.FIRST_COMPLETED
139+
[next_event_task], timeout=15.0, return_when=asyncio.FIRST_COMPLETED
142140
)
143-
141+
144142
if next_event_task in done:
145143
# The LLM yielded a chunk! Let's process it.
146144
try:
147145
event = next_event_task.result()
148146
if isinstance(event, dict):
149147
yield f"data: {json.dumps(event)}\n\n"
150-
148+
151149
# Reset the task so we grab the next chunk on the next loop
152-
next_event_task = None
153-
150+
next_event_task = None
151+
154152
except StopAsyncIteration:
155153
# The stream finished normally!
156154
break
157155
except Exception as e:
158156
# If the orchestrator crashed, catch it here
159157
raise e
160-
158+
161159
else:
162160
# The task is in 'pending'. 15 seconds passed, but the LLM is still thinking.
163-
# We yield a heartbeat, but we DO NOT reset next_event_task.
161+
# We yield a heartbeat, but we DO NOT reset next_event_task.
164162
# It will keep running safely in the background on the next loop!
165163
logger.debug("Stream idle for 15s. Sending health check ping...")
166164
yield ": healthcheck\n\n"
167-
165+
168166
yield "data: [DONE]\n\n"
169167

170168
except asyncio.CancelledError:
171169
logger.warning(f"Client disconnected during stream for claim {claim_id}")
172170
raise
173-
171+
174172
# async for event in analysis_orchestrator.analyze_claim_stream(
175-
# claim=claim, user_id=current_user.id, default=False
176-
# ):
177-
# if isinstance(event, dict):
178-
# yield f"data: {json.dumps(event)}\n\n"
173+
# claim=claim, user_id=current_user.id, default=False
174+
# ):
175+
# if isinstance(event, dict):
176+
# yield f"data: {json.dumps(event)}\n\n"
179177

180-
# yield "data: [DONE]\n\n"
178+
# yield "data: [DONE]\n\n"
181179

182180
# except asyncio.CancelledError:
183-
# # THE FIX: The user closed their browser!
181+
# # THE FIX: The user closed their browser!
184182
# logger.info(f"Client disconnected during stream for claim {claim_id}")
185183
# await session.rollback() # Explicitly release the lock!
186184
# raise
@@ -196,7 +194,7 @@ async def event_generator():
196194
# logger.error(f"Force rollback failed: {e}")
197195
# finally:
198196
# await session.close()
199-
197+
200198
# # Fire and forget. FastAPI cannot cancel this!
201199
# asyncio.create_task(force_cleanup())
202200
if next_event_task and not next_event_task.done():

app/repositories/implementations/source_repository.py

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -44,17 +44,21 @@ async def create_with_domain(self, source: SourceModel) -> Optional[SourceModel]
4444
try:
4545
self._session.add(source)
4646
await self._session.commit()
47-
47+
4848
# --- THE FIX ---
4949
# Re-fetch to guarantee it is fully loaded and not expired by the commit
50-
stmt = select(self._model_class).where(self._model_class.id == source.id).options(selectinload(self._model_class.domain))
50+
stmt = (
51+
select(self._model_class)
52+
.where(self._model_class.id == source.id)
53+
.options(selectinload(self._model_class.domain))
54+
)
5155
result = await self._session.execute(stmt)
5256
loaded_source = result.scalar_one()
53-
57+
5458
self._session.expunge(loaded_source)
5559
if loaded_source.domain:
5660
self._session.expunge(loaded_source.domain)
57-
61+
5862
return loaded_source
5963
except Exception as e:
6064
await self._session.rollback()
@@ -65,17 +69,21 @@ async def update(self, source: SourceModel) -> SourceModel:
6569
try:
6670
merged = await self._session.merge(source)
6771
await self._session.commit()
68-
72+
6973
# --- THE FIX ---
7074
# Same treatment: refetch eagerly, then detach
71-
stmt = select(self._model_class).where(self._model_class.id == merged.id).options(selectinload(self._model_class.domain))
75+
stmt = (
76+
select(self._model_class)
77+
.where(self._model_class.id == merged.id)
78+
.options(selectinload(self._model_class.domain))
79+
)
7280
result = await self._session.execute(stmt)
7381
loaded_source = result.scalar_one()
74-
82+
7583
self._session.expunge(loaded_source)
7684
if loaded_source.domain:
7785
self._session.expunge(loaded_source.domain)
78-
86+
7987
return loaded_source
8088
except Exception as e:
8189
await self._session.rollback()

app/services/implementations/serper_web_search_service.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ async def search_and_create_sources(
6464
logger.error(f"Error processing search result: {str(e)}", exc_info=True)
6565
continue
6666
await self.source_repository._session.commit()
67-
67+
6868
return sources
6969

7070
except Exception as e:

0 commit comments

Comments
 (0)