Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
110 changes: 4 additions & 106 deletions app/api/dependencies.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
from fastapi.security import APIKeyHeader, OAuth2PasswordBearer
from jose import JWTError, jwt
from sqlalchemy import select
from sqlalchemy.exc import IntegrityError
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import Session, joinedload, selectinload

Expand All @@ -33,7 +32,6 @@
)
from app.models.forge_api_key import ForgeApiKey
from app.models.user import User
from app.services.provider_service import create_default_tensorblock_provider_for_user

logger = get_logger(name="dependencies")

Expand Down Expand Up @@ -402,111 +400,11 @@ async def get_current_user_from_clerk(
.filter(User.clerk_user_id == clerk_user_id)
)
user = result.scalar_one_or_none()

# User doesn't exist yet, create one
if not user:
# Fetch user data from Clerk API
# https://clerk.com/docs/reference/backend-api/tag/users/get/users/%7Buser_id%7D
if not CLERK_API_KEY:
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Clerk API key not configured",
)

# Call Clerk API to get user info
url = f"{CLERK_API_URL}/users/{clerk_user_id}"

try:
async with aiohttp.ClientSession() as session:
async with session.get(url, headers={"Authorization": f"Bearer {CLERK_API_KEY}"}) as response:
response.raise_for_status()
user_data = await response.json()

# Extract email address
email = None
if user_data.get("primary_email_address_id") and user_data.get(
"email_addresses"
):
for email_obj in user_data.get("email_addresses", []):
if email_obj["id"] == user_data["primary_email_address_id"]:
email = email_obj.get("email_address")
break
if email is None:
raise ValueError("No email found in Clerk user data")

# Use email as username directly
username = email
except Exception as e:
logger.exception(f"Error fetching Clerk user data: {e}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Failed to fetch user data from Clerk",
)

# Check if user exists with this email
result = await db.execute(select(User).filter(User.email == email))
existing_user = result.scalar_one_or_none()
if existing_user:
# Link existing user to Clerk ID
try:
existing_user.clerk_user_id = clerk_user_id
await db.commit()
return existing_user
except IntegrityError:
# Another request might have already linked this user or created a new one
await db.rollback()
# Retry the query to get the user
result = await db.execute(
select(User).filter(User.clerk_user_id == clerk_user_id)
)
user = result.scalar_one_or_none()
if user:
return user
# If still no user, continue with creation attempt

# Create new user
try:
user = User(
email=email,
username=username,
clerk_user_id=clerk_user_id,
is_active=True,
hashed_password="", # Clerk handles authentication
)
db.add(user)
await db.commit()
await db.refresh(user)

# Create default TensorBlock provider for the new user
try:
await create_default_tensorblock_provider_for_user(user.id, db)
except Exception as e:
# Log error but don't fail user creation
logger.warning(
f"Failed to create default TensorBlock provider for user {user.id}: {e}"
)

return user
except IntegrityError as e:
# Handle race condition: another request might have created the user
await db.rollback()
if "users_clerk_user_id_key" in str(e) or "clerk_user_id" in str(e):
# Retry the query to get the user that was created by another request
result = await db.execute(
select(User).filter(User.clerk_user_id == clerk_user_id)
)
user = result.scalar_one_or_none()
if user:
return user
else:
# This shouldn't happen, but handle it gracefully
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Failed to create or retrieve user due to database constraint",
)
else:
# Re-raise other integrity errors
raise
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="User not found",
)

return user

Expand Down
8 changes: 5 additions & 3 deletions app/api/routes/users.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,10 @@ async def create_user(
user_in: UserCreate, db: AsyncSession = Depends(get_async_db)
) -> Any:
"""
Create a new user.
Create a new user for cli usage.

This is a deprecated endpoint and should not be used in staging/production.
Users should be created via the Clerk webhook.
"""
# Check if email already exists
result = await db.execute(
Expand Down Expand Up @@ -52,7 +55,6 @@ async def create_user(
hashed_password=hashed_password,
)
db.add(db_user)
await db.commit()
await db.refresh(db_user)

# Create default TensorBlock provider for the new user
Expand All @@ -66,7 +68,7 @@ async def create_user(
"error": str(e),
}
})

await db.commit()
return db_user


Expand Down
62 changes: 60 additions & 2 deletions app/api/routes/webhooks.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

from fastapi import APIRouter, Depends, HTTPException, Request, status
import stripe
from sqlalchemy.exc import IntegrityError
from sqlalchemy import update, delete, select
from sqlalchemy.ext.asyncio import AsyncSession
from svix import Webhook, WebhookVerificationError
Expand All @@ -14,6 +15,7 @@
from app.models.user import User
from app.models.admin_users import AdminUsers
from app.services.wallet_service import WalletService
from app.services.provider_service import create_default_tensorblock_provider_for_user

logger = get_logger(name="webhooks")

Expand All @@ -27,12 +29,17 @@
@router.post("/clerk")
async def clerk_webhook_handler(request: Request, db: AsyncSession = Depends(get_async_db)):
"""
Handle Clerk webhooks for user events.
Handle Clerk webhooks for user/organization membership events.

Key events to handle:
# Organization membership events
- organizationMembership.created: Add user to admin users table
- organizationMembership.updated: Update user in admin users table
- organizationMembership.deleted: Remove user from admin users table

# User events
- user.created: Upsert user record
- user.updated: Upsert user record
"""
# Get the request body
payload = await request.body()
Expand Down Expand Up @@ -71,10 +78,12 @@ async def clerk_webhook_handler(request: Request, db: AsyncSession = Depends(get
event_type = event_data.get("type")
logger.info(f"Received Clerk webhook: {event_type}")

if event_type == "organizationMembership.created" or event_type == "organizationMembership.updated":
if event_type in ["organizationMembership.created", "organizationMembership.updated"]:
await handle_organization_membership_created(event_data, db)
elif event_type == "organizationMembership.deleted":
await handle_organization_membership_deleted(event_data, db)
elif event_type in ["user.created", "user.updated"]:
await handle_clerk_user_created(event_data, db)
else:
logger.warning(f"Unhandled Clerk event type: {event_type}")
except json.JSONDecodeError:
Expand Down Expand Up @@ -122,6 +131,55 @@ async def handle_organization_membership_deleted(event_data: dict, db: AsyncSess
await db.commit()


async def handle_clerk_user_created(event_data: dict, db: AsyncSession):
data = event_data['data']
clerk_user_id = data['id']

# extract the primary email address
if not data.get('primary_email_address_id') or not data.get('email_addresses'):
logger.error(f"No primary email address or email addresses found for user {clerk_user_id}")
raise HTTPException(status_code=400, detail="No primary email address or email addresses found for user")

email = None
primary_email_address_id = data['primary_email_address_id']
for email_address in data['email_addresses']:
if email_address['id'] == primary_email_address_id:
email = email_address['email_address']
break

if not email:
logger.error(f"No email address found for user {clerk_user_id}")
raise HTTPException(status_code=400, detail="No email address found for user")

# upsert user record
try:
result = await db.execute(
insert(User).values(
email=email,
username=email, # Use email as username
clerk_user_id=clerk_user_id,
is_active=True,
hashed_password="", # Clerk handles authentication
).on_conflict_do_update(
index_elements=[User.clerk_user_id],
set_=dict(
email=email,
username=email,
is_active=True,
hashed_password="", # Clerk handles authentication
)
).returning(User.id)
)
user_id = result.scalar_one()
await create_default_tensorblock_provider_for_user(user_id, db)
await db.commit()
except IntegrityError:
logger.exception("Error upserting user record for clerk user")
raise HTTPException(status_code=400, detail="Error upserting user record for clerk user")

logger.info(f"Upserted user record for clerk user {clerk_user_id}/{email}")


@router.post("/stripe")
async def stripe_webhook_handler(request: Request, db: AsyncSession = Depends(get_async_db)):
"""
Expand Down
14 changes: 3 additions & 11 deletions app/services/provider_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -859,15 +859,7 @@ async def create_default_tensorblock_provider_for_user(
)

db.add(provider_key)
await db.commit()

logger.info(f"Created default TensorBlock provider for user {user_id}")

except Exception as e:
await db.rollback()
logger.error(
"Error creating default TensorBlock provider for user {}: {}",
user_id,
e,
)
# Don't raise the exception - this is optional functionality
except Exception:
logger.exception(f"Error creating default TensorBlock provider for user {user_id}")
raise
Loading