diff --git a/app/api/dependencies.py b/app/api/dependencies.py index bb6b308..abf03d6 100644 --- a/app/api/dependencies.py +++ b/app/api/dependencies.py @@ -13,7 +13,6 @@ from fastapi.security import APIKeyHeader, OAuth2PasswordBearer from jose import JWTError, jwt from sqlalchemy import select -from sqlalchemy.exc import IntegrityError from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.orm import Session, joinedload, selectinload @@ -33,7 +32,6 @@ ) from app.models.forge_api_key import ForgeApiKey from app.models.user import User -from app.services.provider_service import create_default_tensorblock_provider_for_user logger = get_logger(name="dependencies") @@ -402,111 +400,11 @@ async def get_current_user_from_clerk( .filter(User.clerk_user_id == clerk_user_id) ) user = result.scalar_one_or_none() - - # User doesn't exist yet, create one if not user: - # Fetch user data from Clerk API - # https://clerk.com/docs/reference/backend-api/tag/users/get/users/%7Buser_id%7D - if not CLERK_API_KEY: - raise HTTPException( - status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, - detail="Clerk API key not configured", - ) - - # Call Clerk API to get user info - url = f"{CLERK_API_URL}/users/{clerk_user_id}" - - try: - async with aiohttp.ClientSession() as session: - async with session.get(url, headers={"Authorization": f"Bearer {CLERK_API_KEY}"}) as response: - response.raise_for_status() - user_data = await response.json() - - # Extract email address - email = None - if user_data.get("primary_email_address_id") and user_data.get( - "email_addresses" - ): - for email_obj in user_data.get("email_addresses", []): - if email_obj["id"] == user_data["primary_email_address_id"]: - email = email_obj.get("email_address") - break - if email is None: - raise ValueError("No email found in Clerk user data") - - # Use email as username directly - username = email - except Exception as e: - logger.exception(f"Error fetching Clerk user data: {e}") - raise HTTPException( - status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, - detail="Failed to fetch user data from Clerk", - ) - - # Check if user exists with this email - result = await db.execute(select(User).filter(User.email == email)) - existing_user = result.scalar_one_or_none() - if existing_user: - # Link existing user to Clerk ID - try: - existing_user.clerk_user_id = clerk_user_id - await db.commit() - return existing_user - except IntegrityError: - # Another request might have already linked this user or created a new one - await db.rollback() - # Retry the query to get the user - result = await db.execute( - select(User).filter(User.clerk_user_id == clerk_user_id) - ) - user = result.scalar_one_or_none() - if user: - return user - # If still no user, continue with creation attempt - - # Create new user - try: - user = User( - email=email, - username=username, - clerk_user_id=clerk_user_id, - is_active=True, - hashed_password="", # Clerk handles authentication - ) - db.add(user) - await db.commit() - await db.refresh(user) - - # Create default TensorBlock provider for the new user - try: - await create_default_tensorblock_provider_for_user(user.id, db) - except Exception as e: - # Log error but don't fail user creation - logger.warning( - f"Failed to create default TensorBlock provider for user {user.id}: {e}" - ) - - return user - except IntegrityError as e: - # Handle race condition: another request might have created the user - await db.rollback() - if "users_clerk_user_id_key" in str(e) or "clerk_user_id" in str(e): - # Retry the query to get the user that was created by another request - result = await db.execute( - select(User).filter(User.clerk_user_id == clerk_user_id) - ) - user = result.scalar_one_or_none() - if user: - return user - else: - # This shouldn't happen, but handle it gracefully - raise HTTPException( - status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, - detail="Failed to create or retrieve user due to database constraint", - ) - else: - # Re-raise other integrity errors - raise + raise HTTPException( + status_code=status.HTTP_401_UNAUTHORIZED, + detail="User not found", + ) return user diff --git a/app/api/routes/users.py b/app/api/routes/users.py index 8ae56cb..887a9f6 100644 --- a/app/api/routes/users.py +++ b/app/api/routes/users.py @@ -22,7 +22,10 @@ async def create_user( user_in: UserCreate, db: AsyncSession = Depends(get_async_db) ) -> Any: """ - Create a new user. + Create a new user for cli usage. + + This is a deprecated endpoint and should not be used in staging/production. + Users should be created via the Clerk webhook. """ # Check if email already exists result = await db.execute( @@ -52,7 +55,6 @@ async def create_user( hashed_password=hashed_password, ) db.add(db_user) - await db.commit() await db.refresh(db_user) # Create default TensorBlock provider for the new user @@ -66,7 +68,7 @@ async def create_user( "error": str(e), } }) - + await db.commit() return db_user diff --git a/app/api/routes/webhooks.py b/app/api/routes/webhooks.py index 6cd7068..ee792e9 100644 --- a/app/api/routes/webhooks.py +++ b/app/api/routes/webhooks.py @@ -3,6 +3,7 @@ from fastapi import APIRouter, Depends, HTTPException, Request, status import stripe +from sqlalchemy.exc import IntegrityError from sqlalchemy import update, delete, select from sqlalchemy.ext.asyncio import AsyncSession from svix import Webhook, WebhookVerificationError @@ -14,6 +15,7 @@ from app.models.user import User from app.models.admin_users import AdminUsers from app.services.wallet_service import WalletService +from app.services.provider_service import create_default_tensorblock_provider_for_user logger = get_logger(name="webhooks") @@ -27,12 +29,17 @@ @router.post("/clerk") async def clerk_webhook_handler(request: Request, db: AsyncSession = Depends(get_async_db)): """ - Handle Clerk webhooks for user events. + Handle Clerk webhooks for user/organization membership events. Key events to handle: + # Organization membership events - organizationMembership.created: Add user to admin users table - organizationMembership.updated: Update user in admin users table - organizationMembership.deleted: Remove user from admin users table + + # User events + - user.created: Upsert user record + - user.updated: Upsert user record """ # Get the request body payload = await request.body() @@ -71,10 +78,12 @@ async def clerk_webhook_handler(request: Request, db: AsyncSession = Depends(get event_type = event_data.get("type") logger.info(f"Received Clerk webhook: {event_type}") - if event_type == "organizationMembership.created" or event_type == "organizationMembership.updated": + if event_type in ["organizationMembership.created", "organizationMembership.updated"]: await handle_organization_membership_created(event_data, db) elif event_type == "organizationMembership.deleted": await handle_organization_membership_deleted(event_data, db) + elif event_type in ["user.created", "user.updated"]: + await handle_clerk_user_created(event_data, db) else: logger.warning(f"Unhandled Clerk event type: {event_type}") except json.JSONDecodeError: @@ -122,6 +131,55 @@ async def handle_organization_membership_deleted(event_data: dict, db: AsyncSess await db.commit() +async def handle_clerk_user_created(event_data: dict, db: AsyncSession): + data = event_data['data'] + clerk_user_id = data['id'] + + # extract the primary email address + if not data.get('primary_email_address_id') or not data.get('email_addresses'): + logger.error(f"No primary email address or email addresses found for user {clerk_user_id}") + raise HTTPException(status_code=400, detail="No primary email address or email addresses found for user") + + email = None + primary_email_address_id = data['primary_email_address_id'] + for email_address in data['email_addresses']: + if email_address['id'] == primary_email_address_id: + email = email_address['email_address'] + break + + if not email: + logger.error(f"No email address found for user {clerk_user_id}") + raise HTTPException(status_code=400, detail="No email address found for user") + + # upsert user record + try: + result = await db.execute( + insert(User).values( + email=email, + username=email, # Use email as username + clerk_user_id=clerk_user_id, + is_active=True, + hashed_password="", # Clerk handles authentication + ).on_conflict_do_update( + index_elements=[User.clerk_user_id], + set_=dict( + email=email, + username=email, + is_active=True, + hashed_password="", # Clerk handles authentication + ) + ).returning(User.id) + ) + user_id = result.scalar_one() + await create_default_tensorblock_provider_for_user(user_id, db) + await db.commit() + except IntegrityError: + logger.exception("Error upserting user record for clerk user") + raise HTTPException(status_code=400, detail="Error upserting user record for clerk user") + + logger.info(f"Upserted user record for clerk user {clerk_user_id}/{email}") + + @router.post("/stripe") async def stripe_webhook_handler(request: Request, db: AsyncSession = Depends(get_async_db)): """ diff --git a/app/services/provider_service.py b/app/services/provider_service.py index 7c9e763..ff2446e 100644 --- a/app/services/provider_service.py +++ b/app/services/provider_service.py @@ -859,15 +859,7 @@ async def create_default_tensorblock_provider_for_user( ) db.add(provider_key) - await db.commit() - logger.info(f"Created default TensorBlock provider for user {user_id}") - - except Exception as e: - await db.rollback() - logger.error( - "Error creating default TensorBlock provider for user {}: {}", - user_id, - e, - ) - # Don't raise the exception - this is optional functionality + except Exception: + logger.exception(f"Error creating default TensorBlock provider for user {user_id}") + raise