Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 27 additions & 7 deletions src/dedalus_labs/lib/mcp/request.py
Original file line number Diff line number Diff line change
Expand Up @@ -165,35 +165,55 @@ def _encrypt_credentials(
return EncryptedCredentials(**encrypted)


def _server_connection_name(name: str) -> str:
"""Derive the connection name for a server from its slug or name.

Connection names use dashes instead of slashes.
E.g. "dedalus-labs/gmail-mcp" -> "dedalus-labs-gmail-mcp".

Args:
name: Server slug, URL, or name string.

Returns:
Connection name with slashes replaced by dashes.

"""
return name.replace("/", "-")


def _embed_credentials(
servers: List[MCPServerItem],
encrypted: EncryptedCredentials,
) -> List[MCPServerSpec]:
"""Embed encrypted credentials into each server spec.

Converts slug strings to full specs and adds credentials to all servers.
Each server receives only its own credentials, matched by connection name.

Args:
servers: Serialized MCP servers (slug strings or spec dicts).
encrypted: EncryptedCredentials instance.

Returns:
List of MCPServerSpec dicts with credentials embedded.
List of MCPServerSpec dicts with per-server credentials embedded.

"""
creds_dict = encrypted.to_dict()
all_creds = encrypted.to_dict()
result: List[MCPServerSpec] = []

for server in servers:
if isinstance(server, str):
name = server
conn_name = _server_connection_name(name)
server_creds = {k: v for k, v in all_creds.items() if k == conn_name} or None
if server.startswith(("http://", "https://")):
result.append({"url": server, "name": server, "credentials": creds_dict})
result.append({"url": server, "name": name, "credentials": server_creds})
else:
result.append({"slug": server, "name": server, "credentials": creds_dict})
result.append({"slug": server, "name": name, "credentials": server_creds})
elif isinstance(server, dict):
# Existing spec -> add name (if missing) and credentials
name = server.get("name") or server.get("slug") or server.get("url") or ""
spec: MCPServerSpec = {**server, "name": name, "credentials": creds_dict}
conn_name = _server_connection_name(name)
server_creds = {k: v for k, v in all_creds.items() if k == conn_name} or None
spec: MCPServerSpec = {**server, "name": name, "credentials": server_creds}
result.append(spec)

return result
Loading