[Feat] SSE 이벤트 인프라 구현#11
Conversation
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
|
Warning Rate limit exceeded
You’ve run out of usage credits. Purchase more in the billing tab. ⌛ How to resolve this issue?After the wait time has elapsed, a review can be triggered using the We recommend that you space out your commits to avoid hitting the rate limit. 🚦 How do rate limits work?CodeRabbit enforces hourly rate limits for each developer per organization. Our paid plans have higher rate limits than the trial, open-source and free plans. In all cases, we re-allow further reviews after a brief timeout. Please see our FAQ for further information. ℹ️ Review info⚙️ Run configurationConfiguration used: Organization UI Review profile: CHILL Plan: Pro Run ID: 📒 Files selected for processing (3)
📝 WalkthroughWalkthroughSSE 이벤트 타입과 Pydantic 모델을 추가하고, asyncio.Queue 기반의 SSEEmitter를 구현해 이벤트 발행, 종료 신호, 그리고 비동기 제너레이터 형태의 스트리밍을 제공합니다. ChangesSSE 이벤트 인프라
Sequence Diagram(s)sequenceDiagram
participant Client
participant SSEEmitter
participant Queue
participant Consumer
Client->>SSEEmitter: emit(event, data)
SSEEmitter->>Queue: put(SSEEvent)
Client->>SSEEmitter: close()
SSEEmitter->>Queue: put(None)
Consumer->>SSEEmitter: stream()
loop until None
SSEEmitter->>Queue: get()
SSEEmitter->>Consumer: yield to_sse()
end
전체 요약SSE 기반 실시간 이벤트 스트리밍 인프라의 기초 계층을 구현합니다. 이벤트 타입과 데이터 모델, 비동기 큐 기반 이미터를 제공하여 LangGraph 실행 중 발생하는 이벤트를 클라이언트에 실시간으로 전달할 수 있도록 합니다. 코드 리뷰 예상 시간🎯 3 (Moderate) | ⏱️ ~20분 시
🚥 Pre-merge checks | ✅ 5✅ Passed checks (5 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Actionable comments posted: 2
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@src/proovy_agent/common/sse/emitter.py`:
- Around line 18-28: The stream currently puts a sentinel None in close() and
then stops immediately in stream(), which allows a race where emit() can enqueue
events after close() and those events are lost; add a closed state and guard to
prevent emits after close: introduce an instance flag (e.g., self._closed) and
an asyncio.Lock (e.g., self._close_lock), set self._closed = True inside close()
while holding the lock and then put the sentinel, and modify emit(...) to
acquire the same lock or check self._closed and raise/ignore if True so no new
events are enqueued after close; keep stream() behavior of breaking on the
sentinel (None) so shutdown remains deterministic.
- Line 22: Add a return type hint to the async method stream: change its
signature to declare AsyncIterator[dict[str, str]] so it matches the other typed
methods (__init__, emit, close) and complies with the project's Python 3.12+
mandatory type-hinting guideline; ensure you import or reference
typing.AsyncIterator (or from typing import AsyncIterator) if not already
imported and update the stream method signature to use AsyncIterator[dict[str,
str]].
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: daa20db6-45c7-4e74-b14e-5a2f5a436d08
📒 Files selected for processing (2)
src/proovy_agent/common/sse/emitter.pysrc/proovy_agent/common/sse/events.py
- close() 호출 후 emit()이 이벤트를 enqueue하는 레이스 컨디션 수정 (_closed 플래그 + asyncio.Lock으로 close 이후 emit 차단) - close() 중복 호출 시 sentinel 이중 삽입 방지 - stream() 반환 타입 힌트 AsyncIterator[dict[str, str]] 추가 Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
chowon442
left a comment
There was a problem hiding this comment.
인프라여도 테스트 코드 작성해주시면 좋을 것 같아요! test_emit_and_stream(), test_close_idempotent(), test_emit_after_close_ignored() 같은 것들 정도는 SSEEmitter가 순수 async 로직이라 외부 의존성이 없어서 쉽게 추가할 수 있을 것 같습니다.
마지막으로 주석들은 한국어로 작성해주시면 좋을 것 같아요~
| """Collects SSE events and streams them via an async queue.""" | ||
|
|
||
| def __init__(self) -> None: | ||
| self._queue: asyncio.Queue[SSEEvent | None] = asyncio.Queue() |
There was a problem hiding this comment.
asyncio.Queue()에 maxsize를 설정해서 무제한 메모리 증가를 막아주면 좋을 것 같아요!
There was a problem hiding this comment.
maxsize=256으로 설정했습니다!
| async def close(self) -> None: | ||
| """Signal stream end.""" | ||
| async with self._close_lock: | ||
| if self._closed: | ||
| return | ||
| self._closed = True | ||
| await self._queue.put(None) |
There was a problem hiding this comment.
close() 이후 emit()이 silent drop이라서 logging.debug 있으면 디버깅하기 좋을 것 같아요!
|
|
||
| def to_sse(self) -> dict: | ||
| """Return dict compatible with sse-starlette ServerSentEvent.""" | ||
| import json |
There was a problem hiding this comment.
to_sse() 안에서 import하는게 맞는 걸까요?
- 큐 최대 크기 제한 (maxsize=256) 으로 무제한 메모리 증가 방지 - close() 후 emit() 무시 시 logging.debug 로 디버깅 정보 기록 - events.py의 import json을 모듈 상단으로 이동 - 한국어 docstring 적용 - SSEEmitter 단위 테스트 추가 - test_emit_and_stream: 이벤트 순서 검증 - test_close_idempotent: close() 중복 호출 안전성 검증 - test_emit_after_close_ignored: close 후 emit 무시 검증 Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
📌 관련 이슈
🏷️ PR 타입
📝 작업 내용
src/proovy_agent/common/sse/events.py구현 — 이벤트 타입 정의 (SSEEvent,EventType)src/proovy_agent/common/sse/emitter.py구현 — async 큐 기반 이벤트 발행 클래스 (SSEEmitter)stream()제너레이터로EventSourceResponse에 바로 연결 가능page_start,solve_progress,token,tool_start,tool_result,image_placeholder,image_result,node_result,credit_settled,error📸 스크린샷
✅ 체크리스트
📎 기타 참고사항
Summary by CodeRabbit