Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 45 additions & 6 deletions core/api_routers/source_groups.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,17 @@
import logging

from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy.orm import Session
from typing import List, Optional, Any, Dict
from pydantic import BaseModel
from uuid import UUID

from core.auth import get_current_user
from core.db import get_db
from core.db.models import SourceGroup

router = APIRouter(prefix="/source_groups", tags=["source_groups"])
logger = logging.getLogger(__name__)

# Pydantic items
class SourceGroupBase(BaseModel):
Expand All @@ -26,23 +30,51 @@ class SourceGroupRead(SourceGroupBase):
class Config:
orm_mode = True


def require_source_group_write_access(current_user: Any = Depends(get_current_user)):
role = getattr(current_user, "role", None)
roles = getattr(current_user, "roles", None)

if isinstance(current_user, dict):
role = role or current_user.get("role")
roles = roles or current_user.get("roles")

if role in {"admin", "editor"}:
return current_user

if isinstance(roles, (list, set, tuple)) and any(r in {"admin", "editor"} for r in roles):
return current_user

raise HTTPException(status_code=403, detail="Insufficient permissions")
Comment on lines +34 to +48
Copy link

Copilot AI Apr 11, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

require_source_group_write_access checks for roles {"admin", "editor"}, but the codebase’s defined roles are ["admin", "manager", "triage", "analyst"] (see core/api_routers/auth.py VALID_ROLES). As written, no user can ever have the "editor" role and non-admin roles will be blocked from writes even if they have permissions via role_permissions. Consider reusing the existing core.auth.require_permission(page, "write") mechanism (and adding a page entry for source groups if needed) or updating the role list/permission model consistently across the app.

Copilot uses AI. Check for mistakes.

@router.get("/", response_model=List[SourceGroupRead])
def list_source_groups(db: Session = Depends(get_db)):
def list_source_groups(
db: Session = Depends(get_db),
current_user: Any = Depends(get_current_user)
):
Comment on lines 50 to +54
Copy link

Copilot AI Apr 11, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Auth/authorization behavior changed for these endpoints (now requires authentication, and writes require elevated access), but there are no API router tests for source_groups. Add tests similar to core/tests/test_api_routers/test_jobs.py that cover: unauthenticated requests => 401, authenticated but insufficient role => 403 on POST/PUT, and permitted role => success.

Copilot uses AI. Check for mistakes.
return db.query(SourceGroup).all()

@router.get("/{group_id}", response_model=SourceGroupRead)
def get_source_group(group_id: UUID, db: Session = Depends(get_db)):
def get_source_group(
group_id: UUID,
db: Session = Depends(get_db),
current_user: Any = Depends(get_current_user)
):
sg = db.query(SourceGroup).filter(SourceGroup.id == group_id).first()
if not sg:
raise HTTPException(status_code=404, detail="SourceGroup not found")
return sg

@router.post("/", response_model=SourceGroupRead)
def create_source_group(sg: SourceGroupCreate, db: Session = Depends(get_db)):
def create_source_group(
sg: SourceGroupCreate,
db: Session = Depends(get_db),
current_user: Any = Depends(require_source_group_write_access)
):
existing = db.query(SourceGroup).filter(SourceGroup.slug == sg.slug).first()
if existing:
raise HTTPException(status_code=400, detail="SourceGroup with this slug already exists")

db_sg = SourceGroup(
slug=sg.slug,
description=sg.description,
Expand All @@ -51,17 +83,24 @@ def create_source_group(sg: SourceGroupCreate, db: Session = Depends(get_db)):
db.add(db_sg)
db.commit()
db.refresh(db_sg)
logger.info("source_group_created", extra={"source_group_id": str(db_sg.id), "actor": str(getattr(current_user, "id", None) or current_user.get("id") if isinstance(current_user, dict) else None)})
Copy link

Copilot AI Apr 11, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The audit log actor field is always None because get_current_user() returns a dict with user_id, not id. Use current_user.get("user_id") (or username) and consider extracting it into a local variable for readability.

Copilot uses AI. Check for mistakes.
return db_sg

@router.put("/{group_id}", response_model=SourceGroupRead)
def update_source_group(group_id: UUID, sg: SourceGroupCreate, db: Session = Depends(get_db)):
def update_source_group(
group_id: UUID,
sg: SourceGroupCreate,
db: Session = Depends(get_db),
current_user: Any = Depends(require_source_group_write_access)
):
db_sg = db.query(SourceGroup).filter(SourceGroup.id == group_id).first()
if not db_sg:
raise HTTPException(status_code=404, detail="SourceGroup not found")

db_sg.slug = sg.slug
db_sg.description = sg.description
db_sg.config = sg.config
db.commit()
db.refresh(db_sg)
logger.info("source_group_updated", extra={"source_group_id": str(db_sg.id), "actor": str(getattr(current_user, "id", None) or current_user.get("id") if isinstance(current_user, dict) else None)})
Copy link

Copilot AI Apr 11, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same issue as above: get_current_user() provides user_id, not id, so this audit log records actor=None. Use current_user.get("user_id") (or username) to make the audit trail reliable.

Copilot uses AI. Check for mistakes.
return db_sg