Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 32 additions & 18 deletions src/sap_cloud_sdk/agentgateway/_customer.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,17 @@

logger = logging.getLogger(__name__)

# Environment variable to override default credential path
# Environment variable to override default credential path (points directly to credentials file)
_CREDENTIALS_PATH_ENV = "AGW_CREDENTIALS_PATH"

# Default credential path for Kyma production deployments
_CREDENTIALS_DEFAULT_PATH = "/etc/ums/credentials/credentials"
# servicebinding.io: scan $SERVICE_BINDING_ROOT for a binding whose 'type' file equals the expected type
_SERVICE_BINDING_ROOT_ENV = "SERVICE_BINDING_ROOT"
_BINDING_TYPE = "integration-credentials"
_BINDING_TYPE_FILE = "type"
_CREDENTIALS_FILE = "credentials"

# Kyma default when SERVICE_BINDING_ROOT is not set
_DEFAULT_BINDING_ROOT = "/bindings"

# Resource URN for Agent Gateway token scope (hardcoded - production value)
_AGW_RESOURCE_URN = "urn:sap:identity:application:provider:name:agent-gateway"
Expand All @@ -58,32 +64,42 @@ class _CredentialFields:
GATEWAY_URL = "gatewayUrl"
INTEGRATION_DEPENDENCIES = "integrationDependencies"
ORD_ID = "ordId"
DATA = "data"
GLOBAL_TENANT_ID = "globalTenantId"


def detect_customer_agent_credentials() -> str | None:
"""Check if customer agent credentials file exists.

Checks for credential file in the following order:
1. Path specified in AGW_CREDENTIALS_PATH env var
2. Default mounted path: /etc/ums/credentials/credentials
1. Path specified in AGW_CREDENTIALS_PATH env var (explicit override, points to file directly)
2. $SERVICE_BINDING_ROOT (or /bindings if unset): scans all subdirectories for one whose
'type' file contains 'integration-credentials', then reads 'credentials' from that directory

Returns:
Path to credentials file if found, None otherwise.
"""
# Check env var first (path may be customized)
# 1. Explicit override via env var
path_from_env = os.environ.get(_CREDENTIALS_PATH_ENV)
if path_from_env and os.path.isfile(path_from_env):
logger.debug("Customer credentials found at env var path: %s", path_from_env)
return path_from_env

# Check default mounted path
if os.path.isfile(_CREDENTIALS_DEFAULT_PATH):
logger.debug(
"Customer credentials found at default path: %s", _CREDENTIALS_DEFAULT_PATH
)
return _CREDENTIALS_DEFAULT_PATH
# 2. servicebinding.io: scan $SERVICE_BINDING_ROOT for a binding whose 'type' file equals _BINDING_TYPE
sbr = os.environ.get(_SERVICE_BINDING_ROOT_ENV, _DEFAULT_BINDING_ROOT)
if sbr and os.path.isdir(sbr):
for entry in os.scandir(sbr):
if not entry.is_dir():
continue
type_file = os.path.join(entry.path, _BINDING_TYPE_FILE)
if not os.path.isfile(type_file):
continue
with open(type_file) as f:
if f.read().strip() != _BINDING_TYPE:
continue
credentials_path = os.path.join(entry.path, _CREDENTIALS_FILE)
if os.path.isfile(credentials_path):
logger.debug("Customer credentials found via servicebinding.io type scan: %s", credentials_path)
return credentials_path

return None

Expand Down Expand Up @@ -128,16 +144,14 @@ def load_customer_credentials(path: str) -> CustomerCredentials:
if _CredentialFields.INTEGRATION_DEPENDENCIES not in data:
raise AgentGatewaySDKError(
"Credentials file missing required field: integrationDependencies. "
'Expected format: [{"ordId": "...", "data": {"globalTenantId": "..."}}]'
'Expected format: [{"ordId": "...", "globalTenantId": "..."}]'
)

try:
integration_deps = [
IntegrationDependency(
ord_id=dep[_CredentialFields.ORD_ID],
global_tenant_id=dep[_CredentialFields.DATA][
_CredentialFields.GLOBAL_TENANT_ID
],
global_tenant_id=dep[_CredentialFields.GLOBAL_TENANT_ID],
)
for dep in data[_CredentialFields.INTEGRATION_DEPENDENCIES]
]
Expand All @@ -148,7 +162,7 @@ def load_customer_credentials(path: str) -> CustomerCredentials:
except (KeyError, TypeError) as e:
raise AgentGatewaySDKError(
f"Failed to parse integrationDependencies: {e}. "
'Expected format: [{"ordId": "...", "data": {"globalTenantId": "..."}}]'
'Expected format: [{"ordId": "...", "globalTenantId": "..."}]'
)

return CustomerCredentials(
Expand Down
114 changes: 78 additions & 36 deletions tests/agentgateway/unit/test_customer.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,11 @@
call_mcp_tool_customer,
_build_mcp_url,
_CREDENTIALS_PATH_ENV,
_CREDENTIALS_DEFAULT_PATH,
_SERVICE_BINDING_ROOT_ENV,
_BINDING_TYPE,
_BINDING_TYPE_FILE,
_CREDENTIALS_FILE,
_DEFAULT_BINDING_ROOT,
)
from sap_cloud_sdk.agentgateway._models import (
CustomerCredentials,
Expand All @@ -34,6 +38,15 @@
class TestDetectCustomerAgentCredentials:
"""Tests for customer agent credential detection."""

def _make_binding(self, parent, name="my-binding"):
"""Create a servicebinding.io-compliant binding directory under parent."""
binding_dir = parent / name
binding_dir.mkdir(parents=True, exist_ok=True)
(binding_dir / _BINDING_TYPE_FILE).write_text(_BINDING_TYPE)
creds_file = binding_dir / _CREDENTIALS_FILE
creds_file.write_text('{"clientid": "test"}')
return creds_file

def test_detect_from_env_var_path(self, tmp_path):
"""Detect credentials from path specified in environment variable."""
creds_file = tmp_path / "credentials.json"
Expand All @@ -43,53 +56,82 @@ def test_detect_from_env_var_path(self, tmp_path):
result = detect_customer_agent_credentials()
assert result == str(creds_file)

def test_detect_from_env_var_path_file_not_exists(self):
def test_detect_from_env_var_path_file_not_exists(self, tmp_path):
"""Return None when env var path doesn't exist."""
with patch.dict(os.environ, {_CREDENTIALS_PATH_ENV: "/nonexistent/path"}):
env = {_CREDENTIALS_PATH_ENV: "/nonexistent/path", _SERVICE_BINDING_ROOT_ENV: str(tmp_path)}
with patch.dict(os.environ, env, clear=False):
result = detect_customer_agent_credentials()
assert result is None

def test_detect_from_default_path(self):
"""Detect credentials from default mounted path."""
with patch.dict(os.environ, {}, clear=False):
# Remove env var if present
def test_detect_from_service_binding_root(self, tmp_path):
"""Detect credentials by scanning SERVICE_BINDING_ROOT for a binding with matching type file."""
creds_file = self._make_binding(tmp_path)

env = {_SERVICE_BINDING_ROOT_ENV: str(tmp_path)}
with patch.dict(os.environ, env, clear=False):
os.environ.pop(_CREDENTIALS_PATH_ENV, None)
result = detect_customer_agent_credentials()
assert result == str(creds_file)

with patch("os.path.isfile") as mock_isfile:
mock_isfile.side_effect = lambda p: p == _CREDENTIALS_DEFAULT_PATH
def test_detect_from_default_path(self, tmp_path):
"""Detect credentials from default /bindings root when SERVICE_BINDING_ROOT is not set."""
creds_file = self._make_binding(tmp_path)

with patch.dict(os.environ, {}, clear=False):
os.environ.pop(_CREDENTIALS_PATH_ENV, None)
os.environ.pop(_SERVICE_BINDING_ROOT_ENV, None)
with patch("sap_cloud_sdk.agentgateway._customer._DEFAULT_BINDING_ROOT", str(tmp_path)):
result = detect_customer_agent_credentials()
assert result == _CREDENTIALS_DEFAULT_PATH
assert result == str(creds_file)

def test_no_credentials_returns_none(self):
"""Return None when no credentials are found."""
with patch.dict(os.environ, {}, clear=False):
def test_skips_binding_with_wrong_type(self, tmp_path):
"""Ignore binding directories whose type file does not match."""
wrong_dir = tmp_path / "other-binding"
wrong_dir.mkdir()
(wrong_dir / _BINDING_TYPE_FILE).write_text("something-else")
(wrong_dir / _CREDENTIALS_FILE).write_text('{"clientid": "wrong"}')

env = {_SERVICE_BINDING_ROOT_ENV: str(tmp_path)}
with patch.dict(os.environ, env, clear=False):
os.environ.pop(_CREDENTIALS_PATH_ENV, None)
result = detect_customer_agent_credentials()
assert result is None

with patch("os.path.isfile", return_value=False):
result = detect_customer_agent_credentials()
assert result is None
def test_service_binding_root_takes_priority_over_default(self, tmp_path):
"""SERVICE_BINDING_ROOT is checked before the hardcoded /bindings fallback."""
sbr_dir = tmp_path / "sbr"
sbr_dir.mkdir()
creds_file = self._make_binding(sbr_dir)

def test_env_var_takes_priority_over_default(self, tmp_path):
"""Env var path should take priority over default path."""
creds_file = tmp_path / "custom_credentials.json"
creds_file.write_text('{"clientid": "custom"}')
with patch.dict(os.environ, {_SERVICE_BINDING_ROOT_ENV: str(sbr_dir)}, clear=False):
os.environ.pop(_CREDENTIALS_PATH_ENV, None)
result = detect_customer_agent_credentials()
assert result == str(creds_file)

with patch.dict(os.environ, {_CREDENTIALS_PATH_ENV: str(creds_file)}):
# Even if default path exists, env var should be used
with patch("os.path.isfile") as mock_isfile:
def test_no_credentials_returns_none(self, tmp_path):
"""Return None when no binding with matching type is found."""
env = {_SERVICE_BINDING_ROOT_ENV: str(tmp_path)}
with patch.dict(os.environ, env, clear=False):
os.environ.pop(_CREDENTIALS_PATH_ENV, None)
result = detect_customer_agent_credentials()
assert result is None

def isfile_side_effect(path):
if path == str(creds_file):
return True
if path == _CREDENTIALS_DEFAULT_PATH:
return True
return False
def test_env_var_takes_priority_over_service_binding_root(self, tmp_path):
"""AGW_CREDENTIALS_PATH env var takes priority over SERVICE_BINDING_ROOT."""
creds_file = tmp_path / "custom_credentials.json"
creds_file.write_text('{"clientid": "custom"}')

mock_isfile.side_effect = isfile_side_effect
sbr_dir = tmp_path / "sbr"
sbr_dir.mkdir()
self._make_binding(sbr_dir)

result = detect_customer_agent_credentials()
assert result == str(creds_file)
env = {
_CREDENTIALS_PATH_ENV: str(creds_file),
_SERVICE_BINDING_ROOT_ENV: str(sbr_dir),
}
with patch.dict(os.environ, env, clear=False):
result = detect_customer_agent_credentials()
assert result == str(creds_file)


# ============================================================
Expand All @@ -112,7 +154,7 @@ def test_loads_valid_credentials(self, tmp_path):
"integrationDependencies": [
{
"ordId": "sap.test:apiResource:demo:v1",
"data": {"globalTenantId": "123"},
"globalTenantId": "123",
},
],
}
Expand Down Expand Up @@ -172,11 +214,11 @@ def test_loads_integration_dependencies(self, tmp_path):
"integrationDependencies": [
{
"ordId": "sap.mcpbuilder:apiResource:cost-center:v1",
"data": {"globalTenantId": "250695"},
"globalTenantId": "250695",
},
{
"ordId": "sap.flights:mcpServer:v1",
"data": {"globalTenantId": "892451733"},
"globalTenantId": "892451733",
},
],
}
Expand Down Expand Up @@ -222,7 +264,7 @@ def test_raises_on_invalid_integration_dependencies_format(self, tmp_path):
"privateKey": "-----BEGIN PRIVATE KEY-----\ntest\n-----END PRIVATE KEY-----",
"gatewayUrl": "https://agw.example.com",
"integrationDependencies": [
{"ordId": "missing-data-field"}, # Missing 'data' key
{"ordId": "missing-global-tenant-id-field"}, # Missing 'globalTenantId' key
],
}
creds_file.write_text(json.dumps(creds_data))
Expand Down
Loading