Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 3 additions & 72 deletions .github/workflows/stack-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -185,76 +185,7 @@ jobs:
path: /tmp/all-images.tar.zst
retention-days: 1

# Three parallel test jobs
backend-integration:
name: Backend Integration Tests
needs: [build-images]
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v6

- name: Cache and load Docker images
uses: ./.github/actions/docker-cache
with:
images: ${{ env.MONGO_IMAGE }} ${{ env.REDIS_IMAGE }} ${{ env.KAFKA_IMAGE }} ${{ env.ZOOKEEPER_IMAGE }} ${{ env.SCHEMA_REGISTRY_IMAGE }}

- name: Download built images
uses: actions/download-artifact@v7
with:
name: docker-images
path: /tmp

- name: Load built images
run: zstd -d -c /tmp/all-images.tar.zst | docker load

- name: Setup k3s
uses: ./.github/actions/k3s-setup

- name: Use test environment config
run: cp backend/.env.test backend/.env

- name: Start stack
run: ./deploy.sh dev --wait

- name: Run integration tests
timeout-minutes: 10
run: |
docker compose exec -T -e TEST_RUN_ID=integration backend \
uv run pytest tests/integration -v -rs \
--durations=0 \
--cov=app \
--cov-report=xml:coverage-integration.xml \
--cov-report=term

- name: Copy coverage
if: always()
run: docker compose cp backend:/app/coverage-integration.xml backend/coverage-integration.xml || true

- name: Upload coverage to Codecov
uses: codecov/codecov-action@v5
if: always()
with:
token: ${{ secrets.CODECOV_TOKEN }}
files: backend/coverage-integration.xml
flags: backend-integration
name: backend-integration-coverage
fail_ci_if_error: false

- name: Collect logs on failure
if: failure()
run: |
mkdir -p logs
docker compose logs > logs/docker-compose.log 2>&1
docker compose logs backend > logs/backend.log 2>&1
docker compose logs kafka > logs/kafka.log 2>&1

- name: Upload logs
if: failure()
uses: actions/upload-artifact@v6
with:
name: backend-integration-logs
path: logs/

# Parallel test jobs (backend-e2e, frontend-e2e)
backend-e2e:
name: Backend E2E Tests
needs: [build-images]
Expand Down Expand Up @@ -289,9 +220,9 @@ jobs:
run: docker compose exec -T backend uv run python scripts/seed_users.py

- name: Run E2E tests
timeout-minutes: 10
timeout-minutes: 15
run: |
docker compose exec -T -e TEST_RUN_ID=e2e backend \
docker compose exec -T backend \
uv run pytest tests/e2e -v -rs \
--durations=0 \
--cov=app \
Expand Down
4 changes: 2 additions & 2 deletions backend/app/api/routes/admin/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ async def export_events_csv(
) -> StreamingResponse:
try:
export_filter = EventFilter(
event_types=[str(et) for et in event_types] if event_types else None,
event_types=event_types,
start_time=start_time,
end_time=end_time,
)
Expand Down Expand Up @@ -107,7 +107,7 @@ async def export_events_json(
"""Export events as JSON with comprehensive filtering."""
try:
export_filter = EventFilter(
event_types=[str(et) for et in event_types] if event_types else None,
event_types=event_types,
aggregate_id=aggregate_id,
correlation_id=correlation_id,
user_id=user_id,
Expand Down
52 changes: 23 additions & 29 deletions backend/app/api/routes/admin/users.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,19 @@
from app.api.dependencies import admin_user
from app.db.repositories.admin.admin_user_repository import AdminUserRepository
from app.domain.enums.user import UserRole
from app.domain.rate_limit import UserRateLimit
from app.domain.rate_limit import RateLimitRule, UserRateLimit
from app.domain.user import UserUpdate as DomainUserUpdate
from app.schemas_pydantic.admin_user_overview import (
AdminUserOverview,
DerivedCounts,
RateLimitSummary,
)
from app.schemas_pydantic.events import EventResponse, EventStatistics
from app.schemas_pydantic.events import EventStatistics
from app.schemas_pydantic.user import (
DeleteUserResponse,
MessageResponse,
PasswordResetRequest,
RateLimitUpdateRequest,
RateLimitUpdateResponse,
UserCreate,
UserListResponse,
Expand All @@ -27,7 +28,6 @@
UserUpdate,
)
from app.services.admin import AdminUserService
from app.services.rate_limit_service import RateLimitService

router = APIRouter(
prefix="/admin/users", tags=["admin", "users"], route_class=DishkaRoute, dependencies=[Depends(admin_user)]
Expand All @@ -38,7 +38,6 @@
async def list_users(
admin: Annotated[UserResponse, Depends(admin_user)],
admin_user_service: FromDishka[AdminUserService],
rate_limit_service: FromDishka[RateLimitService],
limit: int = Query(default=100, le=1000),
offset: int = Query(default=0, ge=0),
search: str | None = None,
Expand All @@ -51,24 +50,8 @@ async def list_users(
search=search,
role=role,
)

summaries = await rate_limit_service.get_user_rate_limit_summaries([u.user_id for u in result.users])
user_responses: list[UserResponse] = []
for user in result.users:
user_response = UserResponse.model_validate(user)
summary = summaries.get(user.user_id)
if summary:
user_response = user_response.model_copy(
update={
"bypass_rate_limit": summary.bypass_rate_limit,
"global_multiplier": summary.global_multiplier,
"has_custom_limits": summary.has_custom_limits,
}
)
user_responses.append(user_response)

return UserListResponse(
users=user_responses,
users=[UserResponse.model_validate(u) for u in result.users],
total=result.total,
offset=result.offset,
limit=result.limit,
Expand Down Expand Up @@ -119,7 +102,7 @@ async def get_user_overview(
stats=EventStatistics.model_validate(domain.stats),
derived_counts=DerivedCounts.model_validate(domain.derived_counts),
rate_limit_summary=RateLimitSummary.model_validate(domain.rate_limit_summary),
recent_events=[EventResponse.model_validate(e).model_dump() for e in domain.recent_events],
recent_events=domain.recent_events,
)


Expand Down Expand Up @@ -165,13 +148,19 @@ async def delete_user(
if admin.user_id == user_id:
raise HTTPException(status_code=400, detail="Cannot delete your own account")

deleted_counts = await admin_user_service.delete_user(
result = await admin_user_service.delete_user(
admin_username=admin.username, user_id=user_id, cascade=cascade
)
if deleted_counts.get("user", 0) == 0:
raise HTTPException(status_code=500, detail="Failed to delete user")

return DeleteUserResponse(message=f"User {user_id} deleted successfully", deleted_counts=deleted_counts)
return DeleteUserResponse(
message=f"User {user_id} deleted successfully",
user_deleted=result.user_deleted,
executions=result.executions,
saved_scripts=result.saved_scripts,
notifications=result.notifications,
user_settings=result.user_settings,
events=result.events,
sagas=result.sagas,
)


@router.post("/{user_id}/reset-password", response_model=MessageResponse)
Expand Down Expand Up @@ -204,10 +193,15 @@ async def update_user_rate_limits(
admin: Annotated[UserResponse, Depends(admin_user)],
admin_user_service: FromDishka[AdminUserService],
user_id: str,
rate_limit_config: UserRateLimit,
request: RateLimitUpdateRequest,
) -> RateLimitUpdateResponse:
config = UserRateLimit(
user_id=user_id,
rules=[RateLimitRule(**r.model_dump()) for r in request.rules],
**request.model_dump(exclude={"rules"}),
)
result = await admin_user_service.update_user_rate_limits(
admin_username=admin.username, user_id=user_id, config=rate_limit_config
admin_username=admin.username, user_id=user_id, config=config
)
return RateLimitUpdateResponse.model_validate(result)

Expand Down
42 changes: 20 additions & 22 deletions backend/app/api/routes/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,21 +11,23 @@
from app.core.correlation import CorrelationContext
from app.core.utils import get_client_ip
from app.domain.enums.common import SortOrder
from app.domain.enums.events import EventType
from app.domain.enums.user import UserRole
from app.domain.events.event_models import EventFilter
from app.domain.events.typed import BaseEvent, EventMetadata
from app.domain.events.typed import BaseEvent, DomainEvent, EventMetadata
from app.schemas_pydantic.events import (
DeleteEventResponse,
EventAggregationRequest,
EventFilterRequest,
EventListResponse,
EventResponse,
EventStatistics,
PublishEventRequest,
PublishEventResponse,
ReplayAggregateResponse,
)
from app.schemas_pydantic.user import UserResponse
from app.services.event_service import EventService
from app.services.execution_service import ExecutionService
from app.services.kafka_event_service import KafkaEventService
from app.settings import Settings

Expand All @@ -37,10 +39,16 @@ async def get_execution_events(
execution_id: str,
current_user: Annotated[UserResponse, Depends(current_user)],
event_service: FromDishka[EventService],
execution_service: FromDishka[ExecutionService],
include_system_events: bool = Query(False, description="Include system-generated events"),
limit: int = Query(100, ge=1, le=1000),
skip: int = Query(0, ge=0),
) -> EventListResponse:
# Check execution ownership first (before checking events)
execution = await execution_service.get_execution_result(execution_id)
if execution.user_id and execution.user_id != current_user.user_id and current_user.role != UserRole.ADMIN:
raise HTTPException(status_code=403, detail="Access denied")

Comment on lines +47 to +51
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Handle ExecutionNotFoundError to return 404 instead of 500.

The get_execution_result method raises ExecutionNotFoundError when the execution doesn't exist (per the relevant code snippet). This exception will propagate as an unhandled 500 error. Consider catching it and returning a proper 404 response.

🐛 Proposed fix
+from app.domain.exceptions import ExecutionNotFoundError
+
 `@router.get`("/executions/{execution_id}/events", response_model=EventListResponse)
 async def get_execution_events(
     execution_id: str,
     current_user: Annotated[UserResponse, Depends(current_user)],
     event_service: FromDishka[EventService],
     execution_service: FromDishka[ExecutionService],
     include_system_events: bool = Query(False, description="Include system-generated events"),
     limit: int = Query(100, ge=1, le=1000),
     skip: int = Query(0, ge=0),
 ) -> EventListResponse:
     # Check execution ownership first (before checking events)
+    try:
+        execution = await execution_service.get_execution_result(execution_id)
+    except ExecutionNotFoundError:
+        raise HTTPException(status_code=404, detail="Execution not found")
-    execution = await execution_service.get_execution_result(execution_id)
     if execution.user_id and execution.user_id != current_user.user_id and current_user.role != UserRole.ADMIN:
         raise HTTPException(status_code=403, detail="Access denied")
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
# Check execution ownership first (before checking events)
execution = await execution_service.get_execution_result(execution_id)
if execution.user_id and execution.user_id != current_user.user_id and current_user.role != UserRole.ADMIN:
raise HTTPException(status_code=403, detail="Access denied")
from app.domain.exceptions import ExecutionNotFoundError
# Check execution ownership first (before checking events)
try:
execution = await execution_service.get_execution_result(execution_id)
except ExecutionNotFoundError:
raise HTTPException(status_code=404, detail="Execution not found")
if execution.user_id and execution.user_id != current_user.user_id and current_user.role != UserRole.ADMIN:
raise HTTPException(status_code=403, detail="Access denied")
🤖 Prompt for AI Agents
In `@backend/app/api/routes/events.py` around lines 47 - 51, When calling
execution_service.get_execution_result(execution_id) handle the
ExecutionNotFoundError so it returns a 404 HTTPException instead of bubbling up
as a 500; wrap the call in a try/except that catches ExecutionNotFoundError and
raises HTTPException(status_code=404, detail="Execution not found") before
performing the existing ownership check that uses execution.user_id,
current_user and UserRole.ADMIN.

result = await event_service.get_execution_events(
execution_id=execution_id,
user_id=current_user.user_id,
Expand All @@ -53,10 +61,8 @@ async def get_execution_events(
if result is None:
raise HTTPException(status_code=403, detail="Access denied")

event_responses = [EventResponse.model_validate(event) for event in result.events]

return EventListResponse(
events=event_responses,
events=result.events,
total=result.total,
limit=limit,
skip=skip,
Expand All @@ -68,7 +74,7 @@ async def get_execution_events(
async def get_user_events(
current_user: Annotated[UserResponse, Depends(current_user)],
event_service: FromDishka[EventService],
event_types: List[str] | None = Query(None),
event_types: List[EventType] | None = Query(None),
start_time: datetime | None = Query(None),
end_time: datetime | None = Query(None),
limit: int = Query(100, ge=1, le=1000),
Expand All @@ -86,10 +92,8 @@ async def get_user_events(
sort_order=sort_order,
)

event_responses = [EventResponse.model_validate(event) for event in result.events]

return EventListResponse(
events=event_responses, total=result.total, limit=limit, skip=skip, has_more=result.has_more
events=result.events, total=result.total, limit=limit, skip=skip, has_more=result.has_more
)


Expand All @@ -100,7 +104,7 @@ async def query_events(
event_service: FromDishka[EventService],
) -> EventListResponse:
event_filter = EventFilter(
event_types=[str(et) for et in filter_request.event_types] if filter_request.event_types else None,
event_types=filter_request.event_types,
aggregate_id=filter_request.aggregate_id,
correlation_id=filter_request.correlation_id,
user_id=filter_request.user_id,
Expand All @@ -121,10 +125,8 @@ async def query_events(
if result is None:
raise HTTPException(status_code=403, detail="Cannot query other users' events")

event_responses = [EventResponse.model_validate(event) for event in result.events]

return EventListResponse(
events=event_responses, total=result.total, limit=result.limit, skip=result.skip, has_more=result.has_more
events=result.events, total=result.total, limit=result.limit, skip=result.skip, has_more=result.has_more
)


Expand All @@ -146,10 +148,8 @@ async def get_events_by_correlation(
skip=skip,
)

event_responses = [EventResponse.model_validate(event) for event in result.events]

return EventListResponse(
events=event_responses,
events=result.events,
total=result.total,
limit=limit,
skip=skip,
Expand Down Expand Up @@ -177,10 +177,8 @@ async def get_current_request_events(
skip=skip,
)

event_responses = [EventResponse.model_validate(event) for event in result.events]

return EventListResponse(
events=event_responses,
events=result.events,
total=result.total,
limit=limit,
skip=skip,
Expand Down Expand Up @@ -212,15 +210,15 @@ async def get_event_statistics(
return EventStatistics.model_validate(stats)


@router.get("/{event_id}", response_model=EventResponse)
@router.get("/{event_id}", response_model=DomainEvent)
async def get_event(
event_id: str, current_user: Annotated[UserResponse, Depends(current_user)], event_service: FromDishka[EventService]
) -> EventResponse:
) -> DomainEvent:
"""Get a specific event by ID"""
event = await event_service.get_event(event_id=event_id, user_id=current_user.user_id, user_role=current_user.role)
if event is None:
raise HTTPException(status_code=404, detail="Event not found")
return EventResponse.model_validate(event)
return event


@router.post("/publish", response_model=PublishEventResponse)
Expand Down
Loading
Loading