Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions everyrow-mcp/deploy/.dockerignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
.git
.github
.venv
__pycache__
*.pyc
.env
*.env
!.env.example
.claude/
.vscode/
*.egg-info
dist/
build/
node_modules/
docs-site/
3 changes: 2 additions & 1 deletion everyrow-mcp/deploy/.env.example
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
EVERYROW_API_KEY=sk-cho-your-api-key-here
SUPABASE_URL=https://your-project.supabase.co
SUPABASE_ANON_KEY=sb_publishable_your-anon-key-here
REDIS_ENCRYPTION_KEY=generate-with-python-cryptography-fernet
MCP_SERVER_URL=https://your-tunnel-url.example.com
REDIS_PASSWORD=change-me-to-a-strong-random-password
12 changes: 10 additions & 2 deletions everyrow-mcp/deploy/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,18 @@ RUN uv sync --package everyrow-mcp --no-dev --no-sources --no-editable
# Stage 2: Slim runtime
FROM python:3.13-slim

RUN groupadd -r mcp && useradd -r -g mcp -d /app -s /sbin/nologin mcp

ENV PATH="/app/.venv/bin:$PATH"
EXPOSE 8000

CMD ["everyrow-mcp", "--http", "--port", "8000", "--host", "0.0.0.0"]

WORKDIR /app
COPY --link --from=build /app/.venv .venv
RUN chown -R mcp:mcp /app

USER mcp

HEALTHCHECK --interval=30s --timeout=5s --retries=3 \
CMD python -c "import urllib.request; urllib.request.urlopen('http://localhost:8000/health')"

CMD ["everyrow-mcp", "--http", "--port", "8000", "--host", "0.0.0.0"]
20 changes: 16 additions & 4 deletions everyrow-mcp/deploy/docker-compose.yaml
Original file line number Diff line number Diff line change
@@ -1,20 +1,25 @@
services:
redis:
image: redis:7-alpine
ports:
- "6379:6379"
command: redis-server --requirepass "${REDIS_PASSWORD:?Set REDIS_PASSWORD}"
# No ports: — only reachable by other services on the Docker network.
healthcheck:
test: ["CMD", "redis-cli", "ping"]
test: ["CMD", "redis-cli", "-a", "${REDIS_PASSWORD}", "ping"]
interval: 5s
timeout: 3s
retries: 3
deploy:
resources:
limits:
memory: 256M
restart: unless-stopped

mcp-server:
build:
context: ../..
dockerfile: everyrow-mcp/deploy/Dockerfile
ports:
- "8000:8000"
- "127.0.0.1:8000:8000"
depends_on:
redis:
condition: service_healthy
Expand All @@ -25,4 +30,11 @@ services:
REDIS_HOST: redis
REDIS_PORT: "6379"
REDIS_DB: "13"
REDIS_PASSWORD: "${REDIS_PASSWORD}"
EVERYROW_API_URL: "https://everyrow.io/api/v0"
TRUST_PROXY_HEADERS: "true" # Behind Cloudflare tunnel
deploy:
resources:
limits:
memory: 512M
restart: unless-stopped
4 changes: 3 additions & 1 deletion everyrow-mcp/src/everyrow_mcp/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@


def _clear_task_state() -> None:
if settings.is_http:
return
if TASK_STATE_FILE.exists():
TASK_STATE_FILE.unlink()

Expand All @@ -33,7 +35,7 @@ async def stdio_lifespan(_server: FastMCP):
raise RuntimeError("Failed to authenticate with everyrow API")
yield SessionContext(client_factory=lambda: client)
except Exception as e:
logging.getLogger(__name__).error(f"everyrow-mcp startup failed: {e!r}")
logging.getLogger(__name__).error("everyrow-mcp startup failed: %r", e)
raise
finally:
_clear_task_state()
Expand Down
97 changes: 67 additions & 30 deletions everyrow-mcp/src/everyrow_mcp/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

import httpx
import jwt as pyjwt
import pydantic
from jwt import PyJWKClient
from mcp.server.auth.provider import (
AccessToken,
Expand Down Expand Up @@ -59,6 +60,10 @@ def __init__(
self._revocation_ttl = revocation_ttl
self._jwks_lock = asyncio.Lock()

@property
def revocation_ttl(self) -> int:
return self._revocation_ttl

@staticmethod
def _token_fingerprint(token: str) -> str:
return hashlib.sha256(token.encode()).hexdigest()
Expand All @@ -75,10 +80,14 @@ async def _get_signing_key(self, token: str):
)

def _decode_jwt(self, token: str, signing_key) -> dict[str, Any]:
# Use the algorithm from the JWKS key (e.g. ES256, RS256) rather
# than hardcoding, since Supabase may use any asymmetric algorithm.
jwk_data = getattr(signing_key, "_jwk_data", None) or {}
alg = jwk_data.get("alg", "RS256")
return pyjwt.decode(
token,
signing_key.key,
algorithms=["RS256", "ES256"],
algorithms=[alg],
issuer=self._issuer,
audience=self._audience,
options={"require": ["exp", "sub", "iss", "aud"]},
Expand Down Expand Up @@ -107,7 +116,7 @@ async def verify_token(self, token: str) -> AccessToken | None:
logger.warning("JWKS fetch timed out (10s)")
return None
except pyjwt.PyJWTError:
logger.debug("JWT verification failed", exc_info=True)
logger.debug("JWT verification failed")
return None


Expand Down Expand Up @@ -191,18 +200,20 @@ def _UNSAFE_decode_server_jwt(token: str) -> dict[str, Any]:
endpoint over HTTPS and was never exposed to the client.
NEVER use this for tokens received from end users.
"""
return pyjwt.decode(token, options={"verify_signature": False})
return pyjwt.decode(
token, options={"verify_signature": False}, algorithms=["RS256"]
)

@staticmethod
def _client_ip(request: Request) -> str:
return get_client_ip(request) or "unknown"

async def _check_rate_limit(self, action: str, client_ip: str) -> None:
rl_key = build_key("ratelimit", action, client_ip)
pipe = self._redis.pipeline()
pipe.incr(rl_key)
pipe.expire(rl_key, settings.registration_rate_window)
count, _ = await pipe.execute()
async with self._redis.pipeline() as pipe:
pipe.incr(rl_key)
pipe.expire(rl_key, settings.registration_rate_window, nx=True)
count, _ = await pipe.execute()
if count > settings.registration_rate_limit:
raise ValueError(f"{action.title()} rate limit exceeded")

Expand Down Expand Up @@ -246,9 +257,10 @@ def _supabase_redirect_url(supabase_verifier: str) -> str:
def _validate_redirect_url(
client: OAuthClientInformationFull, params: AuthorizationParams
) -> None:
if client.redirect_uris:
if str(params.redirect_uri) not in [str(u) for u in client.redirect_uris]:
raise ValueError("redirect_uri does not match any registered URI")
if not client.redirect_uris:
raise ValueError("Client must register at least one redirect_uri")
if str(params.redirect_uri) not in [str(u) for u in client.redirect_uris]:
raise ValueError("redirect_uri does not match any registered URI")

async def _validate_auth_request(
self, request: Request, action: str, state: str | None, *, consume: bool = False
Expand All @@ -272,15 +284,14 @@ async def _validate_auth_request(

async def _validate_client(self, pending: PendingAuth) -> None:
client_info = await self.get_client(pending.client_id)
if client_info is None or (
pending.params.redirect_uri
and client_info.redirect_uris
and str(pending.params.redirect_uri)
if client_info is None:
raise HTTPException(status_code=400, detail="Invalid client")
if pending.params.redirect_uri and (
not client_info.redirect_uris
or str(pending.params.redirect_uri)
not in [str(u) for u in client_info.redirect_uris]
):
raise HTTPException(
status_code=400, detail="Invalid client or redirect_uri"
)
raise HTTPException(status_code=400, detail="Invalid redirect_uri")

async def _validate_supabase_code(
self, code: str, supabase_code_verifier: str
Expand All @@ -289,10 +300,10 @@ async def _validate_supabase_code(
return await self._exchange_supabase_code(
code=code, code_verifier=supabase_code_verifier
)
except Exception:
logger.exception("Failed to exchange Supabase code")
except Exception as exc:
logger.error("Failed to exchange Supabase code: %s", type(exc).__name__)
raise HTTPException(
status_code=500, detail="Failed to authenticate with Supabase"
status_code=500, detail="Authentication failed. Please try again."
)

async def _validate_callback_request(
Expand Down Expand Up @@ -411,11 +422,18 @@ async def load_authorization_code(
if len(authorization_code) > 256:
return None

code_data = await self._redis.getdel(build_key("authcode", authorization_code))
key = build_key("authcode", authorization_code)
# GETDEL atomically consumes the code — no race between concurrent requests.
code_data = await self._redis.getdel(key)
if code_data is None:
return None
code_obj = EveryRowAuthorizationCode.model_validate_json(code_data)
if code_obj.expires_at and code_obj.expires_at < time.time():
return None
if code_obj.client_id != client.client_id:
# Re-store so the legitimate client can still use it.
remaining = max(1, int((code_obj.expires_at or 0) - time.time()))
await self._redis.setex(key, remaining, code_data)
return None
return code_obj

Expand Down Expand Up @@ -470,11 +488,15 @@ async def load_refresh_token(
if len(refresh_token) > 256:
return None

data = await self._redis.getdel(build_key("refresh", refresh_token))
key = build_key("refresh", refresh_token)
# GETDEL atomically consumes the token — no race between concurrent requests.
data = await self._redis.getdel(key)
if data is None:
return None
rt = EveryRowRefreshToken.model_validate_json(data)
if rt.client_id != client.client_id:
# Re-store so the legitimate client can still use it.
await self._redis.setex(key, settings.refresh_token_ttl, data)
return None
return rt

Expand All @@ -485,9 +507,18 @@ async def exchange_refresh_token(
scopes: list[str],
) -> OAuthToken:
final_scopes = self._validate_scopes(scopes, refresh_token)
supa_tokens = await self._refresh_supabase_token(
refresh_token.supabase_refresh_token
)
try:
supa_tokens = await self._refresh_supabase_token(
refresh_token.supabase_refresh_token
)
except Exception:
# Re-store the consumed refresh token so the user isn't locked out.
await self._redis.setex(
name=build_key("refresh", refresh_token.token),
time=settings.refresh_token_ttl,
value=refresh_token.model_dump_json(),
)
raise
return await self._issue_token_response(
access_token=supa_tokens.access_token,
client_id=client.client_id,
Expand All @@ -500,9 +531,11 @@ async def revoke_token(self, token: AccessToken | EveryRowRefreshToken) -> None:
await self._redis.delete(build_key("refresh", token.token))
elif isinstance(token, AccessToken):
fp = SupabaseTokenVerifier._token_fingerprint(token.token)
remaining = max(0, (token.expires_at or 0) - int(time.time())) + 60
ttl = remaining if remaining > 60 else self._token_verifier.revocation_ttl
await self._redis.setex(
name=build_key("revoked", fp),
time=self._token_verifier._revocation_ttl,
time=ttl,
value="1",
)

Expand All @@ -519,10 +552,14 @@ async def _supabase_token_request(
)
resp.raise_for_status()
data = resp.json()
return SupabaseTokenResponse(
access_token=data["access_token"],
refresh_token=data["refresh_token"],
)
try:
return SupabaseTokenResponse.model_validate(data)
except pydantic.ValidationError:
logger.error(
"Supabase token response missing required fields: %s",
sorted(data.keys()),
)
raise ValueError("Invalid token response from identity provider")

async def _exchange_supabase_code(
self, code: str, code_verifier: str
Expand Down
18 changes: 18 additions & 0 deletions everyrow-mcp/src/everyrow_mcp/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,18 @@ class Settings(BaseSettings):

everyrow_api_url: str = Field(default="https://everyrow.io/api/v0")
preview_size: int = Field(default=1000)
max_inline_rows: int = Field(
default=50_000,
description="Maximum number of rows allowed in inline JSON data",
)
max_inline_data_bytes: int = Field(
default=10 * 1024 * 1024,
description="Maximum size in bytes for inline CSV string data (10 MB)",
)
max_schema_properties: int = Field(
default=50,
description="Maximum number of properties allowed in a response schema",
)
token_budget: int = Field(
default=20000,
description="Target token budget per page of inline results",
Expand All @@ -27,6 +39,12 @@ class Settings(BaseSettings):
)
redis_sentinel_master_name: str | None = Field(default=None)

trust_proxy_headers: bool = Field(
default=False,
description="Trust X-Forwarded-For and CF-Connecting-IP headers for client IP. "
"Enable only when behind a trusted reverse proxy (e.g. Cloudflare).",
)

# HTTP-only settings — unused in stdio mode
mcp_server_url: str = Field(default="")
supabase_url: str = Field(default="")
Expand Down
22 changes: 21 additions & 1 deletion everyrow-mcp/src/everyrow_mcp/http_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,19 @@ def configure_http_mode(
mcp_server_url: str,
) -> None:
"""Configure the MCP server for HTTP transport."""
if not no_auth:
missing = []
if not settings.supabase_url:
missing.append("SUPABASE_URL")
if not settings.supabase_anon_key:
missing.append("SUPABASE_ANON_KEY")
if not settings.mcp_server_url:
missing.append("MCP_SERVER_URL")
if missing:
raise RuntimeError(
f"HTTP auth mode requires these environment variables: {', '.join(missing)}"
)

redis_client = get_redis_client()
if no_auth:
lifespan = no_auth_http_lifespan
Expand All @@ -52,7 +65,7 @@ def configure_http_mode(
mcp.settings.port = port

_register_widgets(mcp, mcp_server_url)
_register_routes(mcp, auth_provider if not no_auth else None)
_register_routes(mcp, redis_client, auth_provider if not no_auth else None)
_add_middleware(mcp, redis_client, rate_limit=not no_auth)


Expand All @@ -79,6 +92,7 @@ def _results_ui_http() -> str:

def _register_routes(
mcp: FastMCP,
redis: Redis,
auth_provider: EveryRowAuthProvider | None,
) -> None:
"""Register REST endpoints for widget polling, CSV download, health, and auth."""
Expand All @@ -88,6 +102,12 @@ def _register_routes(
)

async def _health(_request: Request) -> Response:
try:
await redis.ping()
except Exception:
return JSONResponse(
{"status": "unhealthy", "redis": "unreachable"}, status_code=503
)
return JSONResponse({"status": "ok"})

mcp.custom_route("/health", ["GET"])(_health)
Expand Down
Loading