Skip to content
5 changes: 4 additions & 1 deletion python/packages/ag-ui/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,10 @@ A frontend can then hydrate the latest stored snapshot for the scoped thread:

Endpoint configuration requires `snapshot_scope_resolver` whenever a snapshot store is configured, including when
the store is already set on a pre-wrapped `AgentFrameworkAgent` or `AgentFrameworkWorkflow`. The resolver returns
the application-defined Snapshot Scope used with the AG-UI Thread id as the storage key.
the application-defined Snapshot Scope used with the AG-UI Thread id as the storage key. When using
`AgentFrameworkWorkflow(workflow_factory=...)`, the same resolver also scopes the in-memory workflow cache even
without a snapshot store; provide it in multi-user deployments so two users who submit the same `threadId` do not
share a live `Workflow` instance.

AG-UI Thread ids identify AG-UI Threads; they do not authorize snapshot access. Do not treat a thread id as a bearer
credential or tenant boundary. Production applications must authenticate and authorize every AG-UI endpoint request
Expand Down
5 changes: 2 additions & 3 deletions python/packages/ag-ui/agent_framework_ag_ui/_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,13 +136,12 @@ async def agent_endpoint(request_body: AGUIRequest) -> StreamingResponse:
"""
try:
input_data = request_body.model_dump(exclude_none=True)
snapshot_persistence_active = False
if snapshot_scope_resolver is not None and _get_snapshot_store(protocol_runner) is not None:
snapshot_persistence_active = _get_snapshot_store(protocol_runner) is not None
if snapshot_scope_resolver is not None:
snapshot_scope = snapshot_scope_resolver(request_body)
if isawaitable(snapshot_scope):
snapshot_scope = await snapshot_scope
input_data[_SNAPSHOT_SCOPE_INPUT_KEY] = snapshot_scope
snapshot_persistence_active = True
if default_state:
if snapshot_persistence_active:
# Defer default application to the runner so defaults only fill keys
Expand Down
5 changes: 3 additions & 2 deletions python/packages/ag-ui/agent_framework_ag_ui/_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -232,8 +232,9 @@ def __init__(
self.workflow = workflow
self._workflow_factory = workflow_factory
# Cache keyed by (snapshot_scope, thread_id): the Snapshot Scope is the
# authorization boundary, so the same thread id under different scopes
# must never share an in-memory workflow instance.
# authorization boundary for both snapshots and in-memory workflow_factory
# instances, so the same thread id under different scopes must never share
# mutable workflow state.
self._workflow_by_thread: dict[tuple[str | None, str], Workflow] = {}
self.name = name if name is not None else getattr(workflow, "name", "workflow")
self.description = description if description is not None else getattr(workflow, "description", "")
Expand Down
46 changes: 46 additions & 0 deletions python/packages/ag-ui/tests/ag_ui/test_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -1842,3 +1842,49 @@ def factory(thread_id: str) -> Any:

runner.clear_thread_workflow("thread-1")
assert runner._resolve_workflow("thread-1", "tenant-b") is not workflow_b


async def test_workflow_factory_cache_is_scoped_by_resolver_without_snapshot_store():
"""Snapshot Scope resolver scopes live workflow_factory instances even without snapshot persistence."""

@executor(id="responder")
async def responder(message: Any, ctx: WorkflowContext) -> None:
del message
await ctx.yield_output("Workflow response")

def factory(thread_id: str) -> Any:
del thread_id
return WorkflowBuilder(start_executor=responder).build()

app = FastAPI()
runner = AgentFrameworkWorkflow(workflow_factory=factory)
add_agent_framework_fastapi_endpoint(
app,
runner,
path="/workflow",
snapshot_scope_resolver=lambda request: request.forwarded_props["tenant"],
)
client = TestClient(app)

response_a = client.post(
"/workflow",
json={
"thread_id": "thread-1",
"messages": [{"role": "user", "content": "Hello tenant A"}],
"forwardedProps": {"tenant": "tenant-a"},
},
)
response_b = client.post(
"/workflow",
json={
"thread_id": "thread-1",
"messages": [{"role": "user", "content": "Hello tenant B"}],
"forwardedProps": {"tenant": "tenant-b"},
},
)

assert response_a.status_code == 200
assert response_b.status_code == 200
workflow_a = runner._resolve_workflow("thread-1", "tenant-a") # pyright: ignore[reportPrivateUsage]
workflow_b = runner._resolve_workflow("thread-1", "tenant-b") # pyright: ignore[reportPrivateUsage]
assert workflow_a is not workflow_b
25 changes: 25 additions & 0 deletions python/packages/foundry_hosting/README.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,28 @@
# Foundry Hosting

This package provides the integration of Agent Framework agents and workflows with the Foundry Agent Server, which can be hosted on Foundry infrastructure.

## Security: Hosted Workflow Session Isolation

Hosted workflow checkpoints are mutable session state. `previous_response_id`, `conversation_id`, and `response_id`
identify checkpoint records; they do not authorize checkpoint access by themselves. `ResponsesHostServer` therefore
stamps each hosted workflow checkpoint directory with a hosted session identity context and rejects later resume/write
attempts whose resolved identity does not match.

By default, `ResponsesHostServer` reads the Foundry platform-provided isolation keys from `ResponseContext.isolation`.
If you host workflows outside the Foundry platform, provide `hosted_session_context_resolver=...` that returns a
`HostedSessionContext` derived from your authenticated user and chat/tenant boundary. Do not derive this context only
from untrusted request body fields.

`strict_session_isolation=True` is the default. This rejects hosted workflow checkpoint requests when no identity
context is available. Local-only tests or demos can set `strict_session_isolation=False`, but production multi-user
deployments should keep strict mode enabled and configure a real provider.

### Approval-handle isolation binding

MCP approval handles (`approval_request_id`) are bearer capabilities: anyone who presents one can redeem the
approval-gated tool call. `ResponsesHostServer` therefore binds each saved approval to the same hosted session identity
(`HostedSessionContext`) resolved above, and rejects redemption whose resolved identity does not match the one the
approval was created under — so a handle issued to one user/conversation cannot be replayed from another. When no
identity is available (e.g. single-tenant or local development without isolation headers), the approval binds to the
empty identity and round-trips normally, so approval flows work the same with or without isolation headers.
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,20 @@
import importlib.metadata

from ._invocations import InvocationsHostServer
from ._responses import ResponsesHostServer
from ._responses import (
HostedSessionContext,
HostedSessionContextResolver,
ResponsesHostServer,
)

try:
__version__ = importlib.metadata.version(__name__)
except importlib.metadata.PackageNotFoundError:
__version__ = "0.0.0"

__all__ = ["InvocationsHostServer", "ResponsesHostServer"]
__all__ = [
"HostedSessionContext",
"HostedSessionContextResolver",
"InvocationsHostServer",
"ResponsesHostServer",
]
Loading
Loading