Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
245 changes: 224 additions & 21 deletions src/sap_cloud_sdk/extensibility/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

from __future__ import annotations

import asyncio
import itertools
import json
import logging
Expand All @@ -15,6 +16,7 @@

from sap_cloud_sdk.core.telemetry import Module, Operation
from sap_cloud_sdk.core.telemetry.metrics_decorator import record_metrics
from sap_cloud_sdk.agentgateway import create_client as create_agw_client
from sap_cloud_sdk.extensibility._models import (
DEFAULT_EXTENSION_CAPABILITY_ID,
ExtensionCapabilityImplementation,
Expand All @@ -38,6 +40,7 @@

_EXECUTE_WORKFLOW_TOOL_NAME = "execute_workflow"
_GET_EXECUTION_TOOL_NAME = "get_execution"
_N8N_MCP_SERVER_NAME = "sap.btpn8n:apiResource:ManagedN8nMcpServer:v1"

_JSONRPC_VERSION = "2.0"

Expand Down Expand Up @@ -220,34 +223,34 @@ def call_hook(
hook_config: HookConfig,
) -> Optional[Message]:
"""Call a hook's MCP endpoint and poll until completion.

Executes the workflow via ``execute-workflow``, then polls
``get-execution`` every 500 ms until the execution succeeds, fails,
or ``hook.timeout`` seconds elapse.

This method is transport-agnostic: regardless of how extension
metadata was fetched (backend, local file, or no-op),
the actual hook invocation is always a direct HTTP call to the
URL embedded in the :class:`Hook` object.

Args:
hook: Hook configuration (workflow ID, method, timeout).
hook_config: Hook invocation configuration (endpoint URL, auth token, optional payload).

Returns:
Parsed ``Message`` from the last executed workflow node, or ``None``
if the hook completed successfully but produced no message.

Raises:
TransportError: On HTTP errors, terminal execution failures, or timeout.

Example:
```python
from sap_cloud_sdk.extensibility import create_client

client = create_client("sap.ai:agent:myAgent:v1")
impl = client.get_extension_capability_implementation(tenant="tenant-abc")

if impl.hooks:
hook = impl.hooks[0]
result = client.call_hook(
Expand All @@ -262,13 +265,13 @@ def call_hook(
"""
headers = {**_JSONRPC_HEADERS}
inject(headers)

message_payload: dict[str, Any] = {}
if hook_config.payload is not None:
model_dump = getattr(hook_config.payload, "model_dump", None)
if callable(model_dump):
message_payload = cast(dict[str, Any], model_dump(exclude_none=True))

# 1. Execute workflow
execute_workflow_arguments = {
"workflowId": hook.n8n_workflow_config.workflow_id,
Expand All @@ -282,7 +285,7 @@ def call_hook(
},
},
}

try:
with httpx.Client(
headers={"Authorization": f"Bearer {hook_config.auth_token}"},
Expand All @@ -301,24 +304,24 @@ def call_hook(
raise TransportError(
f"HTTP request to hook MCP endpoint failed: {exc}"
) from exc

try:
data = _extract_tool_result(_parse_response(tool_resp))
except TransportError:
raise
except Exception as exc:
raise TransportError(f"Could not parse hook response: {exc}") from exc

status = data.get("status", "")

# 2. Fail fast on terminal statuses from execute-workflow
if status in _EXECUTE_TERMINAL_STATUSES:
error_msg = data.get("error", "")
raise TransportError(
f"Workflow execution failed with status {status!r}"
+ (f": {error_msg}" if error_msg else "")
)

# 3. Return immediately if execution completed synchronously
if status == "success":
try:
Expand All @@ -336,20 +339,20 @@ def call_hook(
raise TransportError(
f"Failed to extract response from last executed node: {exc}"
) from exc

# 4. Poll get-execution for running/new/waiting/started
execution_id = data.get("executionId")
get_execution_arguments = {
"workflowId": hook.n8n_workflow_config.workflow_id,
"executionId": str(execution_id),
"includeData": True,
}

deadline = time.monotonic() + hook.timeout
last_status = status
while time.monotonic() < deadline:
time.sleep(_HOOK_POLL_INTERVAL)

try:
with httpx.Client(
headers={"Authorization": f"Bearer {hook_config.auth_token}"},
Expand All @@ -368,13 +371,215 @@ def call_hook(
raise TransportError(
f"HTTP request to hook MCP endpoint failed: {exc}"
) from exc

try:
data = _extract_tool_result(_parse_response(tool_resp))
except TransportError:
raise
except Exception as exc:
raise TransportError(f"Could not parse hook response: {exc}") from exc

last_status = data.get("execution", {}).get("status", "") or data.get(
"status", ""
)

if last_status == "success":
try:
result_data = data.get("data", {}).get("resultData", {})
last_node = result_data.get("lastNodeExecuted", "")
response_json = (
result_data.get("runData", {})
.get(last_node, [{}])[0]
.get("data", {})
.get("main", [[{}]])[0][0]
.get("json", {})
)
return Message(**response_json)
except (KeyError, IndexError, TypeError, ValidationError) as exc:
raise TransportError(
f"Failed to extract response from last executed node: {exc}"
) from exc

if last_status in _EXECUTION_TERMINAL_STATUSES:
error_msg = data.get("error", "")
raise TransportError(
f"Workflow execution failed with status {last_status!r}"
+ (f": {error_msg}" if error_msg else "")
)

# Continue polling for: running, waiting, new, unknown

raise TransportError(
f"Workflow execution timed out after {hook.timeout}s. "
f"Last status: {last_status!r}"
)


@record_metrics(
Module.EXTENSIBILITY,
Operation.EXTENSIBILITY_CALL_HOOK,
)
async def call_hook(
self,
hook: Hook,
user_token: Optional[str] = None,
message: Optional[Any] = None,
headers: Optional[dict] = None,
tenant_subdomain: Optional[str] = None
) -> Optional[Message]:
"""Call a hook via Agent Gateway MCP tool invocation.

Discovers the N8N MCP tools via Agent Gateway, executes the workflow via
``execute_workflow``, then polls ``get_execution`` every 500 ms until the
execution succeeds, fails, or ``hook.timeout`` seconds elapse.

Auth and endpoint resolution are handled internally by the AGW client —
no manual token or URL configuration is required.

Args:
hook: Hook configuration (workflow ID, method, timeout).
agw_client: Configured Agent Gateway client used for tool discovery
and invocation.

Returns:
Parsed ``Message`` from the last executed workflow node, or ``None``
if the hook completed successfully but produced no message.

Raises:
TransportError: On AGW tool call errors, terminal execution failures,
or timeout.

Example:
```python
from sap_cloud_sdk.extensibility import call_hook
from sap_cloud_sdk.agentgateway import create_client as create_agw_client

agw_client = create_agw_client(tenant_subdomain="my-tenant")

result = await call_hook(
hook=impl.hooks[0],
agw_client=agw_client,
)
```
"""
# 1. Create AGW client for the given tenant subdomain.
agw_client = None
agw_client = create_agw_client(tenant_subdomain)

# 2. Discover MCP tools — AGW resolves N8N GTID and handles auth internally
# TODO: Cache the list of mcp tools for performance.
tools = await agw_client.list_mcp_tools(user_token=user_token or None)

execute_tool = next(
(
t for t in tools
if t.name == _EXECUTE_WORKFLOW_TOOL_NAME and t.server_name == _N8N_MCP_SERVER_NAME
),
None,
)
if execute_tool is None:
raise TransportError(
f"MCP tool '{_EXECUTE_WORKFLOW_TOOL_NAME}' on server '{_N8N_MCP_SERVER_NAME}' "
"not found via Agent Gateway."
)

get_exec_tool = next(
(
t for t in tools
if t.name == _GET_EXECUTION_TOOL_NAME and t.server_name == _N8N_MCP_SERVER_NAME
),
None,
)
if get_exec_tool is None:
raise TransportError(
f"MCP tool '{_GET_EXECUTION_TOOL_NAME}' on server '{_N8N_MCP_SERVER_NAME}' "
"not found via Agent Gateway."
)

# 3. Execute workflow
message_body = message.model_dump(mode="json") if message is not None else {}
execute_arguments = {
"workflowId": hook.n8n_workflow_config.workflow_id,
"inputs": {
"type": "webhook",
"webhookData": {
"method": hook.n8n_workflow_config.method,
"query": {},
"body": message_body,
"headers": headers or {},
},
},
}
try:
result_str = await agw_client.call_mcp_tool(
execute_tool,
user_token=user_token or None,
**execute_arguments,
)
except Exception as exc:
raise TransportError(
f"AGW tool call for '{_EXECUTE_WORKFLOW_TOOL_NAME}' failed: {exc}"
) from exc

try:
data = json.loads(result_str)
except Exception as exc:
raise TransportError(f"Could not parse hook response: {exc}") from exc

status = data.get("status", "")

if status in _EXECUTE_TERMINAL_STATUSES:
error_msg = data.get("error", "")
raise TransportError(
f"Workflow execution failed with status {status!r}"
+ (f": {error_msg}" if error_msg else "")
)

if status == "success":
try:
result_data = data.get("data", {}).get("resultData", {})
last_node = result_data.get("lastNodeExecuted", "")
response_json = (
result_data.get("runData", {})
.get(last_node, [{}])[0]
.get("data", {})
.get("main", [[{}]])[0][0]
.get("json", {})
)
return Message(**response_json)
except (KeyError, IndexError, TypeError, ValidationError) as exc:
raise TransportError(
f"Failed to extract response from last executed node: {exc}"
) from exc

# 4. Poll get_execution for running/new/waiting/started
execution_id = data.get("executionId")
deadline = time.monotonic() + hook.timeout
last_status = status

while time.monotonic() < deadline:
await asyncio.sleep(_HOOK_POLL_INTERVAL)

try:
get_execution_arguments = {
"workflowId": hook.n8n_workflow_config.workflow_id,
"executionId": str(execution_id),
"includeData": True,
}
result_str = await agw_client.call_mcp_tool(
get_exec_tool,
user_token=user_token or None,
**get_execution_arguments,
)
except Exception as exc:
raise TransportError(
f"AGW tool call for '{_GET_EXECUTION_TOOL_NAME}' failed: {exc}"
) from exc

try:
data = json.loads(result_str)
except Exception as exc:
raise TransportError(f"Could not parse hook response: {exc}") from exc

last_status = data.get("execution", {}).get("status", "") or data.get(
"status", ""
Expand Down Expand Up @@ -404,8 +609,6 @@ def call_hook(
+ (f": {error_msg}" if error_msg else "")
)

# Continue polling for: running, waiting, new, unknown

raise TransportError(
f"Workflow execution timed out after {hook.timeout}s. "
f"Last status: {last_status!r}"
Expand Down
Loading
Loading