Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[project]
name = "north-mcp-python-sdk"
version = "0.3.1"
version = "0.3.2"
description = "Add your description here"
readme = "README.md"
authors = [{ name = "Raphael Cristal", email = "raphael@cohere.com" }]
Expand Down
57 changes: 51 additions & 6 deletions src/north_mcp_python_sdk/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,15 @@ def get_authenticated_user() -> AuthenticatedNorthUser:
except ValidationError as e:
raise Exception(f"Failed to validate claims: {e}") from e

if (
access_token.token == ""
Comment thread
speterkins marked this conversation as resolved.
and claims.email is None
and not claims.connector_access_tokens
):
raise Exception(
"Access token not found in context. Cannot construct AuthenticatedNorthUser."
)

return AuthenticatedNorthUser(claims.connector_access_tokens, claims.email)


Expand Down Expand Up @@ -265,6 +274,9 @@ def _parse_connector_tokens(self, header_value: str) -> dict[str, str]:

return tokens

def _auth_is_configured(self) -> bool:
return bool(self._server_secret or self._trusted_issuers)

def _validate_server_secret(self, provided_secret: str | None) -> None:
"""Validate server secret matches expected value."""
if provided_secret:
Expand Down Expand Up @@ -337,6 +349,8 @@ def _create_authenticated_user(
AuthCredentials(),
AuthenticatedUser(
auth_info=AccessToken(
# FastMCP exposes auth context through AccessToken, so we currently
# lean on it as the carrier for North request context as well.
token=user_id_token or "",
client_id=email or "",
scopes=[],
Expand Down Expand Up @@ -460,6 +474,28 @@ async def authenticate(
headers_debug = {k: v for k, v in conn.headers.items()}
self.logger.debug("Request headers: %s", headers_debug)

if not self._auth_is_configured():
if self._has_x_north_headers(conn):
self.logger.debug(
"No auth configured, but X-North headers are present; parsing request context without enforcing authentication"
)
return await self._authenticate_x_north_headers(conn)

if conn.headers.get("Authorization"):
self.logger.debug(
"No auth configured, but Authorization header is present; parsing legacy request context without enforcing authentication"
)
return await self._authenticate_legacy_bearer(conn)
Comment thread
speterkins marked this conversation as resolved.

self.logger.debug(
"No server secret or trusted issuer configuration present and no auth headers provided; skipping authentication"
)
return self._create_authenticated_user(
email=None,
connector_access_tokens={},
user_id_token=None,
)

# Check for X-North headers first (preferred)
if self._has_x_north_headers(conn):
return await self._authenticate_x_north_headers(conn)
Expand All @@ -470,16 +506,25 @@ async def authenticate(
def _verify_token_signature(
self, raw_token: str, decoded_token: dict[str, Any]
) -> None:
self.logger.debug(
"Verifying user ID token signature against trusted issuers"
)
issuer = decoded_token.get("iss")

if self._trusted_issuers and issuer in self._trusted_issuers:
self._verify_token_signature_from_issuer(
raw_token=raw_token,
issuer=issuer,
)
return

if not issuer:
raise AuthenticationError("Token missing issuer")
raise AuthenticationError(f"Untrusted issuer: {issuer}")

if issuer not in self._trusted_issuers:
raise AuthenticationError(f"Untrusted issuer: {issuer}")

def _verify_token_signature_from_issuer(
self, *, raw_token: str, issuer: str
) -> None:
self.logger.debug(
"Verifying user ID token signature against trusted issuers"
)
openid_config_req = urllib.request.Request(
url=issuer.rstrip("/") + "/.well-known/openid-configuration"
)
Expand Down
34 changes: 26 additions & 8 deletions tests/test_north_mcp_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,25 @@ async def test_client(app: NorthMCPServer):
yield client


@pytest.fixture
def app_with_auth() -> NorthMCPServer:
return NorthMCPServer(server_secret="secret")


@pytest_asyncio.fixture
async def auth_test_client(app_with_auth: NorthMCPServer):
asgi_app = app_with_auth.http_app(transport="streamable-http")
async with LifespanManager(asgi_app) as manager:
async with httpx.AsyncClient(
transport=httpx.ASGITransport(app=manager.app),
base_url="https://mcptest.com",
) as client:
yield client


@pytest.mark.asyncio
async def test_missing_auth_header(test_client: httpx.AsyncClient):
result = await test_client.post(
async def test_missing_auth_header(auth_test_client: httpx.AsyncClient):
result = await auth_test_client.post(
"/mcp",
json={"jsonrpc": "2.0", "id": 1, "method": "initialize", "params": {}},
)
Expand All @@ -38,8 +54,8 @@ async def test_missing_auth_header(test_client: httpx.AsyncClient):


@pytest.mark.asyncio
async def test_invalid_auth_header(test_client: httpx.AsyncClient):
result = await test_client.post(
async def test_invalid_auth_header(auth_test_client: httpx.AsyncClient):
result = await auth_test_client.post(
"/mcp",
headers={"Authorization": "Bearer Invalid"},
json={"jsonrpc": "2.0", "id": 1, "method": "initialize", "params": {}},
Expand All @@ -50,9 +66,9 @@ async def test_invalid_auth_header(test_client: httpx.AsyncClient):

@pytest.mark.asyncio
async def test_missing_token_returns_unauthorized(
test_client: httpx.AsyncClient,
auth_test_client: httpx.AsyncClient,
):
result = await test_client.post(
result = await auth_test_client.post(
"/mcp",
headers={"Authorization": ""},
json={"jsonrpc": "2.0", "id": 1, "method": "initialize", "params": {}},
Expand All @@ -62,8 +78,10 @@ async def test_missing_token_returns_unauthorized(


@pytest.mark.asyncio
async def test_invalid_base64_auth_header(test_client: httpx.AsyncClient):
result = await test_client.post(
async def test_invalid_base64_auth_header(
auth_test_client: httpx.AsyncClient,
):
result = await auth_test_client.post(
"/mcp",
headers={"Authorization": "invalid_base64"},
json={"jsonrpc": "2.0", "id": 1, "method": "initialize", "params": {}},
Expand Down
24 changes: 21 additions & 3 deletions tests/test_north_token_verifier.py
Original file line number Diff line number Diff line change
Expand Up @@ -181,8 +181,10 @@ def echo(message: str) -> str:
yield client

@pytest.mark.asyncio
async def test_mcp_route_requires_auth(self, fastmcp_with_north_auth):
"""Test that MCP routes require authentication."""
async def test_mcp_route_allows_requests_without_auth_by_default(
self, fastmcp_with_north_auth
):
"""Test that MCP routes do not require auth when no auth is configured."""
response = await fastmcp_with_north_auth.post(
"/mcp",
json={
Expand All @@ -192,7 +194,7 @@ async def test_mcp_route_requires_auth(self, fastmcp_with_north_auth):
"params": {},
},
)
assert response.status_code == 401
assert response.status_code != 401

@pytest.mark.asyncio
async def test_mcp_route_with_valid_auth(self, fastmcp_with_north_auth):
Expand Down Expand Up @@ -231,6 +233,22 @@ async def test_server_secret_validation(self, fastmcp_with_secret):
)
assert response.status_code == 401

@pytest.mark.asyncio
async def test_server_secret_auth_requires_headers(
self, fastmcp_with_secret
):
"""Test that MCP routes require auth when server secret auth is configured."""
response = await fastmcp_with_secret.post(
"/mcp",
json={
"jsonrpc": "2.0",
"id": 1,
"method": "initialize",
"params": {},
},
)
assert response.status_code == 401

# With wrong secret - should fail
auth_header = self.create_auth_header(server_secret="wrong-secret")
response = await fastmcp_with_secret.post(
Expand Down
26 changes: 20 additions & 6 deletions tests/test_x_north_auth_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -297,21 +297,35 @@ async def test_x_north_whitespace_only_headers_treated_as_absent():
}
conn = create_mock_connection(headers)

# Should fall back to legacy auth since X-North headers are effectively absent
with pytest.raises(AuthenticationError, match="invalid authorization"):
await backend.authenticate(conn)
# Open auth should treat effectively-empty North headers the same as no headers.
auth_response = await backend.authenticate(conn)
if auth_response is None:
raise ValueError("Authentication response is None")
_, user = auth_response

assert isinstance(user, AuthenticatedUser)
assert user.access_token.token == ""
assert user.access_token.claims["email"] is None
assert user.access_token.claims["connector_access_tokens"] == {}


@pytest.mark.asyncio
async def test_no_auth_headers_present():
"""Test error when no authentication headers are provided at all."""
"""Test open auth returns empty context when no auth headers are provided."""
backend = NorthAuthBackend()

headers = {}
conn = create_mock_connection(headers)

with pytest.raises(AuthenticationError, match="invalid authorization"):
await backend.authenticate(conn)
auth_response = await backend.authenticate(conn)
if auth_response is None:
raise ValueError("Authentication response is None")
_, user = auth_response

assert isinstance(user, AuthenticatedUser)
assert user.access_token.token == ""
assert user.access_token.claims["email"] is None
assert user.access_token.claims["connector_access_tokens"] == {}


@pytest.mark.asyncio
Expand Down
60 changes: 57 additions & 3 deletions tests/test_x_north_trusted_issuers.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@ def create_mock_connection(headers: dict[str, str]) -> Mock:


def create_x_north_headers_with_issuer(
email: str = "test@company.com", issuer: str = "https://example.okta.com"
email: str = "test@company.com",
issuer: str = "https://example.okta.com",
include_server_secret: bool = True,
) -> dict[str, str]:
"""Helper to create X-North headers with specific issuer."""
user_id_token = jwt.encode(
Expand All @@ -36,11 +38,13 @@ def create_x_north_headers_with_issuer(
.rstrip("=")
)

return {
headers = {
"X-North-ID-Token": user_id_token,
"X-North-Connector-Tokens": connector_tokens_b64,
"X-North-Server-Secret": "server_secret",
}
if include_server_secret:
headers["X-North-Server-Secret"] = "server_secret"
return headers


@pytest.mark.asyncio
Expand All @@ -66,6 +70,56 @@ async def test_x_north_headers_without_trusted_issuers():
assert user.access_token.claims["email"] == "test@company.com"


@pytest.mark.asyncio
async def test_auth_without_configuration_allows_missing_headers():
backend = NorthAuthBackend()
conn = create_mock_connection({})

auth_response = await backend.authenticate(conn)
if auth_response is None:
raise ValueError("Authentication response is None")
_, user = auth_response

assert isinstance(user, AuthenticatedUser)
assert user.access_token.token == ""
assert user.access_token.claims["email"] is None


@pytest.mark.asyncio
async def test_auth_without_configuration_still_parses_x_north_headers():
backend = NorthAuthBackend()
headers = create_x_north_headers_with_issuer(
email="test@company.com",
issuer="https://untrusted.example.com",
include_server_secret=False,
)
conn = create_mock_connection(headers)

auth_response = await backend.authenticate(conn)
if auth_response is None:
raise ValueError("Authentication response is None")
_, user = auth_response

assert isinstance(user, AuthenticatedUser)
assert user.access_token.claims["email"] == "test@company.com"
assert user.access_token.claims["connector_access_tokens"] == {
"google": "token123"
}


@pytest.mark.asyncio
async def test_trusted_issuers_require_auth_headers():
backend = NorthAuthBackend(
trusted_issuers=["https://example.okta.com"],
)
conn = create_mock_connection({})

with pytest.raises(
AuthenticationError, match="invalid authorization header"
):
await backend.authenticate(conn)


@pytest.mark.asyncio
async def test_x_north_headers_trusted_issuers_missing_issuer():
"""Test X-North headers reject tokens missing issuer when trusted issuers configured."""
Expand Down
Loading