Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 21 additions & 33 deletions backend/mcp_server/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,11 @@
import jwt

from core.config import get_settings
from core.custom_exceptions import RepositoryError
from core.enhanced_logging import get_logger
from mcp_server.permissions import DefaultPermissionSets, MCPPermissions
from rag_solution.core.exceptions import DomainError
from rag_solution.file_management.database import get_db
from rag_solution.models.user import User
from rag_solution.schemas.user_schema import UserOutput
from rag_solution.services.user_service import UserService

# SPIFFE imports are optional - used only when SPIFFE auth is configured
Expand Down Expand Up @@ -332,50 +332,38 @@ async def _authenticate_trusted_user(self, user_email: str) -> MCPAuthContext:
This is used when the MCP server is behind a trusted proxy
that has already authenticated the user.

Uses UserService.get_or_create_by_email() to properly look up users
by email address first, avoiding duplicate user creation when the
existing user has a different ibm_id.

Args:
user_email: The authenticated user's email from trusted proxy

Returns:
MCPAuthContext with user identity
"""
db_gen = None
db_session = None
try:
db_gen = get_db()
db_session = next(db_gen)
settings = get_settings()

# First, try to find existing user by email (not ibm_id)
# This handles the case where user was created with a different ibm_id
existing_user = db_session.query(User).filter(User.email == user_email).first()

# Type annotation for user - can be User model or UserOutput schema
user: User | UserOutput | None = None

if existing_user:
user = existing_user
logger.debug("Found existing user by email: %s (id=%s)", user_email, user.id)
else:
# Create new user if not found by email
user_service = UserService(db_session, settings)
user = user_service.get_or_create_user_by_fields(
ibm_id=user_email, # Use email as ibm_id for new trusted proxy users
email=user_email,
name=user_email.split("@")[0], # Extract name from email
role="user",
)
logger.debug("Created new user for trusted proxy: %s (id=%s)", user_email, user.id)
# Use service layer for proper email-based lookup
# This handles existing users AND creates new ones if needed
user_service = UserService(db_session, settings)
user = user_service.get_or_create_by_email(email=user_email)

if user:
return MCPAuthContext(
user_id=user.id,
username=user.email,
is_authenticated=True,
auth_method="trusted_proxy",
permissions=list(DefaultPermissionSets.TRUSTED_PROXY),
metadata={"source": "trusted_proxy"},
)
except Exception as e:
logger.debug("Authenticated user via trusted proxy: %s (id=%s)", user_email, user.id)

return MCPAuthContext(
user_id=user.id,
username=user.email,
is_authenticated=True,
auth_method="trusted_proxy",
permissions=list(DefaultPermissionSets.TRUSTED_PROXY),
metadata={"source": "trusted_proxy"},
)
except (DomainError, RepositoryError) as e:
logger.warning("Trusted user lookup failed: %s", e)
finally:
# Ensure proper cleanup of database session
Expand Down
24 changes: 24 additions & 0 deletions backend/rag_solution/repository/user_repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,30 @@ def get_by_ibm_id(self, ibm_id: str) -> UserOutput:
logger.error(f"Error getting user by IBM ID {ibm_id}: {e!s}")
raise RepositoryError(f"Failed to get user by IBM ID: {e!s}") from e

def get_by_email(self, email: str) -> UserOutput:
"""Fetches user by email address.

Args:
email: The email address to look up

Returns:
UserOutput: The user with the given email

Raises:
NotFoundError: If user not found
RepositoryError: For database errors
"""
try:
user = self.db.query(User).filter(User.email == email).options(joinedload(User.teams)).first()
if not user:
raise NotFoundError("User", identifier=f"email={email}")
return UserOutput.model_validate(user)
except NotFoundError:
raise
except Exception as e:
logger.error(f"Error getting user by email {email}: {e!s}")
raise RepositoryError(f"Failed to get user by email: {e!s}") from e

def update(self, user_id: UUID4, user_update: UserInput) -> UserOutput:
"""Updates user data with validation.

Expand Down
75 changes: 75 additions & 0 deletions backend/rag_solution/services/user_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,81 @@ def get_or_create_user_by_fields(self, ibm_id: str, email: EmailStr, name: str,
UserInput(ibm_id=ibm_id, email=email, name=name, role=role, preferred_provider_id=None)
)

def get_user_by_email(self, email: str) -> UserOutput:
"""Gets user by email address.

Args:
email: The email address to look up

Returns:
UserOutput: The user with the given email

Raises:
NotFoundError: If user not found
"""
logger.info("Fetching user with email: %s", email)
return self.user_repository.get_by_email(email)

def get_or_create_by_email(self, email: EmailStr, name: str | None = None, role: str = "user") -> UserOutput:
"""Gets existing user by email or creates new one.

This method first looks up the user by email address. If found, returns
the existing user. If not found, creates a new user using the email
as both the email and ibm_id (for trusted proxy scenarios).

This is the preferred method for trusted proxy authentication where
user identity comes from an email header.

Args:
email: The email address (used for lookup and as ibm_id for new users)
name: Optional display name (defaults to email prefix if not provided)
role: User role (defaults to "user")

Returns:
UserOutput: The existing or newly created user with all defaults initialized

Note:
For existing users, ensures required defaults (templates, parameters)
are present, reinitializing them if necessary.
"""
try:
existing_user = self.user_repository.get_by_email(email)

# Defensive check: Ensure user has required defaults
templates = self.prompt_template_service.get_user_templates(existing_user.id)

if not templates or len(templates) < MIN_REQUIRED_TEMPLATES:
logger.warning(
"User %s (email=%s) exists but missing defaults - attempting recovery...",
existing_user.id,
email,
)
try:
_, reinit_templates, parameters = self.user_provider_service.initialize_user_defaults(
existing_user.id
)
logger.info(
"✅ Successfully recovered user %s: %d templates, %s parameters",
existing_user.id,
len(reinit_templates),
"created" if parameters else "failed",
)
except Exception as e:
logger.error("❌ Failed to recover user %s: %s", existing_user.id, str(e))
raise ValidationError(
f"User {existing_user.id} missing required defaults and recovery failed: {e}",
field="user_initialization",
) from e

return existing_user
except NotFoundError:
# User doesn't exist by email, create with email as ibm_id
display_name = name if name else email.split("@")[0]
logger.info("Creating new user for email: %s", email)
return self.create_user(
UserInput(ibm_id=email, email=email, name=display_name, role=role, preferred_provider_id=None)
)

def get_or_create_user(self, user_input: UserInput) -> UserOutput:
"""Gets existing user or creates new one, ensuring all required defaults exist.

Expand Down
Loading