Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 43 additions & 19 deletions rest/python/server/dependencies.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@

import config
import db
from exceptions import UcpVersionError
from fastapi import Depends
from fastapi import Header
from fastapi import HTTPException
Expand All @@ -36,6 +37,7 @@
from services.checkout_service import CheckoutService
from services.fulfillment_service import FulfillmentService
from sqlalchemy.ext.asyncio import AsyncSession
from ucp_version import parse_ucp_version


class CommonHeaders(BaseModel):
Expand Down Expand Up @@ -63,39 +65,61 @@ async def common_headers(
)


def _version_error_detail(code: str, message: str) -> dict:
"""Build a UCP-shaped error detail payload for version failures."""
return {
"status": "error",
"errors": [
{
"code": code,
"message": message,
"severity": "critical",
}
],
}


async def validate_ucp_headers(ucp_agent: str):
"""Validate UCP headers and version negotiation."""
server_version = config.get_server_version()
agent_version = server_version # Default to server version if not specified
try:
server_date = parse_ucp_version(server_version)
except UcpVersionError as exc:
raise HTTPException(
status_code=500, detail=exc.to_detail()
) from exc

# Default to server version if UCP-Agent omits version=.
agent_version = server_version
agent_date = server_date

# Use regex to extract version more robustly.
# We look for 'version=' either at the start or after a semicolon,
# allowing for whitespace.
# Matches: version="1.2.3" or version=1.2.3
# Matches: version="2026-01-23" or version=2026-01-23
match = re.search(
r"(?:^|;)\s*version=(?:\"([^\"]+)\"|([^;]+))", ucp_agent, re.IGNORECASE
)
if match:
# Group 1 is quoted value, Group 2 is unquoted value
agent_version = match.group(1) or match.group(2)
agent_version = agent_version.strip()

if agent_version > server_version:
agent_version = (match.group(1) or match.group(2)).strip()
try:
agent_date = parse_ucp_version(agent_version)
except UcpVersionError as exc:
raise HTTPException(
status_code=400, detail=exc.to_detail()
) from exc

if agent_date > server_date:
raise HTTPException(
status_code=400,
detail={
"status": "error",
"errors": [
{
"code": "VERSION_UNSUPPORTED",
"message": (
f"Version {agent_version} is not supported. This merchant"
f" implements version {server_version}."
),
"severity": "critical",
}
],
},
detail=_version_error_detail(
"VERSION_UNSUPPORTED",
(
f"Version {agent_version} is not supported. This merchant"
f" implements version {server_version}."
),
),
)


Expand Down
23 changes: 23 additions & 0 deletions rest/python/server/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,3 +76,26 @@ class InvalidRequestError(UcpError):
def __init__(self, message: str):
"""Initialize InvalidRequestError."""
super().__init__(message, code="INVALID_REQUEST", status_code=400)


class UcpVersionError(UcpError):
"""Raised when a UCP version string is invalid or unsupported."""

def __init__(
self, message: str, code: str = "VERSION_INVALID_FORMAT"
):
"""Initialize UcpVersionError."""
super().__init__(message, code=code, status_code=400)

def to_detail(self) -> dict:
"""Return an error payload matching UCP REST error shape."""
return {
"status": "error",
"errors": [
{
"code": self.code,
"message": self.message,
"severity": "critical",
}
],
}
54 changes: 54 additions & 0 deletions rest/python/server/ucp_version.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
# Copyright 2026 UCP Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""UCP version string parsing (YYYY-MM-DD)."""

import datetime
import re

from exceptions import UcpVersionError

_UCP_VERSION_RE = re.compile(r"^\d{4}-\d{2}-\d{2}$")


def parse_ucp_version(version: str) -> datetime.date:
"""Parse a UCP version string in YYYY-MM-DD format.

Args:
version: The version string to parse.

Returns:
A datetime.date representing the version.

Raises:
TypeError: If version is not a string.
UcpVersionError: If the string is not a valid YYYY-MM-DD calendar date.
"""
if not isinstance(version, str):
raise TypeError(f"Version must be a string, got {type(version).__name__}.")

version = version.strip()
if not _UCP_VERSION_RE.fullmatch(version):
raise UcpVersionError(
f"Version '{version}' is invalid. Expected YYYY-MM-DD.",
code="VERSION_INVALID_FORMAT",
)

try:
return datetime.date.fromisoformat(version)
except ValueError as exc:
raise UcpVersionError(
f"Version '{version}' is invalid. Expected YYYY-MM-DD.",
code="VERSION_INVALID_FORMAT",
) from exc