Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 29 additions & 1 deletion src/mcp/server/auth/handlers/register.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from typing import Any
from uuid import uuid4

from pydantic import BaseModel, ValidationError
from pydantic import AnyUrl, BaseModel, ValidationError
from starlette.requests import Request
from starlette.responses import Response

Expand All @@ -18,12 +18,29 @@
# provider from what we use in the HTTP handler
RegistrationRequest = OAuthClientMetadata

LOOPBACK_REDIRECT_HOSTS = ("localhost", "127.0.0.1", "[::1]")


class RegistrationErrorResponse(BaseModel):
error: RegistrationErrorCode
error_description: str | None


def validate_redirect_uri(uri: AnyUrl) -> None:
"""Validate a dynamic client registration redirect URI."""
if uri.scheme != "https" and uri.host not in LOOPBACK_REDIRECT_HOSTS:
raise RegistrationError(
error="invalid_redirect_uri",
error_description="redirect_uris must use HTTPS or target a loopback host",
)

if uri.fragment:
raise RegistrationError(
error="invalid_redirect_uri",
error_description="redirect_uris must not include a fragment",
)


@dataclass
class RegistrationHandler:
provider: OAuthAuthorizationServerProvider[Any, Any, Any]
Expand All @@ -35,6 +52,9 @@ async def handle(self, request: Request) -> Response:
body = await request.body()
client_metadata = OAuthClientMetadata.model_validate_json(body)

for redirect_uri in client_metadata.redirect_uris or ():
validate_redirect_uri(redirect_uri)

# Scope validation is handled below
except ValidationError as validation_error:
return PydanticJSONResponse(
Expand All @@ -44,6 +64,14 @@ async def handle(self, request: Request) -> Response:
),
status_code=400,
)
except RegistrationError as registration_error:
return PydanticJSONResponse(
content=RegistrationErrorResponse(
error=registration_error.error,
error_description=registration_error.error_description,
),
status_code=400,
)

client_id = str(uuid4())

Expand Down
7 changes: 0 additions & 7 deletions tests/interaction/_requirements.py
Original file line number Diff line number Diff line change
Expand Up @@ -2713,13 +2713,6 @@ def __post_init__(self) -> None:
),
transports=("streamable-http",),
note="Auth is enforced at the HTTP layer; the bundled AS is an ASGI app.",
divergence=Divergence(
note=(
"Not enforced: the registration handler models redirect_uris as AnyUrl with no scheme or "
"host check, so http://evil.example/callback is accepted and registered. The spec's "
"localhost-or-HTTPS rule is left to the provider implementation."
),
),
),
"hosting:auth:as:token-cache-headers": Requirement(
source="sdk",
Expand Down
43 changes: 35 additions & 8 deletions tests/interaction/auth/test_as_handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -279,22 +279,49 @@ async def test_authorize_with_an_unregistered_redirect_uri_is_rejected_directly(


@requirement("hosting:auth:as:redirect-uri-scheme")
async def test_a_non_loopback_http_redirect_uri_is_accepted_at_registration(
@pytest.mark.parametrize(
"redirect_uri",
[
"http://evil.example/callback",
"javascript:alert(1)",
"data:text/html,<script>alert(1)</script>",
"file:///etc/passwd",
"https://client.example.com/callback#fragment",
],
)
async def test_registration_rejects_redirect_uris_without_https_or_loopback(
as_app: tuple[httpx.AsyncClient, InMemoryAuthorizationServerProvider],
redirect_uri: str,
) -> None:
"""A registration carrying a non-HTTPS, non-loopback redirect URI is accepted.
"""The registration endpoint rejects redirect URIs without HTTPS or a loopback host."""
http, provider = as_app
body = oauth_client_metadata().model_dump(mode="json", exclude_none=True)
body["redirect_uris"] = [redirect_uri]

The spec requires every redirect URI to be either HTTPS or a loopback host; the bundled
registration handler does not enforce this and registers `http://evil.example/callback`
successfully. See the divergence on the requirement.
"""
response = await http.post("/register", json=body)

assert response.status_code == 400
assert response.json()["error"] == "invalid_redirect_uri"
assert provider.clients == {}


@requirement("hosting:auth:as:redirect-uri-scheme")
async def test_registration_allows_https_and_loopback_redirect_uris(
as_app: tuple[httpx.AsyncClient, InMemoryAuthorizationServerProvider],
) -> None:
"""HTTPS redirect URIs and loopback HTTP redirect URIs remain valid."""
http, provider = as_app
body = oauth_client_metadata().model_dump(mode="json", exclude_none=True)
body["redirect_uris"] = ["http://evil.example/callback"]
body["redirect_uris"] = [
"https://client.example.com/callback?next=%2Fhome",
"http://localhost:3000/callback",
"http://127.0.0.1:3000/callback",
"http://[::1]:3000/callback",
]

response = await http.post("/register", json=body)

assert response.status_code == 201
info = OAuthClientInformationFull.model_validate_json(response.content)
assert [str(u) for u in (info.redirect_uris or [])] == ["http://evil.example/callback"]
assert [str(u) for u in (info.redirect_uris or [])] == body["redirect_uris"]
assert info.client_id in provider.clients
Loading