OpenAI SDK Stateful Agent Example #109
…ol for Stateful Conversation Management via SQLAlchemy engine (#316)

Adds `AsyncDatabricksSession`, a session storage implementation for the OpenAI Agents SDK that persists conversation history to Databricks Lakebase. This class subclasses [OpenAI's SQLAlchemySession](https://openai.github.io/openai-agents-python/sessions/sqlalchemy_session/) ([original code](https://github.com/openai/openai-agents-python/blob/main/src/agents/extensions/memory/sqlalchemy_session.py)) to inherit all SQL logic while adding Lakebase-specific features:

- Automatic OAuth token rotation via SQLAlchemy's `do_connect` event
- Instance name resolution and username inference from `_LakebasePoolBase`

More on the Session protocol: https://openai.github.io/openai-agents-python/ref/memory/session/#agents.memory.session.Session

Connection pooling uses SQLAlchemy's default `QueuePool` (`pool_size=5`, `max_overflow=10`).

Usage:

```python
from databricks_openai.agents import AsyncDatabricksSession

session = AsyncDatabricksSession(
    session_id=session_id,
    instance_name=LAKEBASE_INSTANCE_NAME,
)
result = Runner.run_streamed(agent, input=messages, session=session)
```

Example queries:

```bash
curl -X POST http://localhost:8000/invocations \
  -H "Content-Type: application/json" \
  -d '{"input": [{"role": "user", "content": "Hello I live in SF!"}]}'
```

returns a response with a session id:

```json
{"object":"response","output":[{"type":"message","id":"__fake_id__","content":[{"annotations":[],"text":"Hi! What part of San Francisco are you in, and what are you looking for—recommendations (food/coffee, parks, things to do), help planning a day, or something else?","type":"output_text","logprobs":[]}],"role":"assistant","status":"completed","provider_data":{"model":"databricks-gpt-5-2","response_id":"chatcmpl-D5Ki6f7TKBNrVVfuxDL0Lu16YCQjz"}}],"custom_outputs":{"session_id":"fd57ff2c-1d66-4da3-ba28-3216d4e6d86e"}}
```

A follow-up stateful question:

```bash
curl -X POST http://localhost:8000/invocations \
  -H "Content-Type: application/json" \
  -d '{
    "input": [{"role": "user", "content": "What city did I say I live in?"}],
    "custom_inputs": {"session_id": "fd57ff2c-1d66-4da3-ba28-3216d4e6d86e"}
  }'
```

gives us:

```json
{"object":"response","output":[{"type":"message","id":"__fake_id__","content":[{"annotations":[],"text":"You said you live in SF (San Francisco).","type":"output_text","logprobs":[]}],"role":"assistant","status":"completed","provider_data":{"model":"databricks-gpt-5-2","response_id":"chatcmpl-D5KiakiUvMW2weIeLXUuPesxyNpwv"}}],"custom_outputs":{"session_id":"fd57ff2c-1d66-4da3-ba28-3216d4e6d86e"}}
```

Testing: unit + integration tests

Sample agent: [OpenAI MemorySession Stateful Agent Example](databricks/app-templates#109)

Sample app: https://eng-ml-agent-platform.staging.cloud.databricks.com/apps/j-openai-stateful?o=2850744067564480

<img width="1014" height="886" alt="image" src="https://github.com/user-attachments/assets/b53a3567-7cc2-4967-b8b0-ace481940f2a" />
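As a hedged illustration of the token-rotation feature mentioned above: SQLAlchemy's `do_connect` event fires each time the pool opens a new physical connection, so refreshing credentials inside the hook keeps pooled connections authenticating with a current token. The sketch below is not the `AsyncDatabricksSession` implementation; it runs against in-memory SQLite for portability, and `fetch_fresh_token` is a hypothetical stand-in for a Databricks OAuth token fetch.

```python
# Sketch only: SQLAlchemy's "do_connect" event as a credential-rotation hook.
# Demonstrated with in-memory SQLite so it runs anywhere; for Lakebase the
# engine URL would be the Postgres endpoint and the hook would set
# cparams["password"] to a freshly fetched OAuth token.
import itertools

from sqlalchemy import create_engine, event, text

_counter = itertools.count()


def fetch_fresh_token() -> str:
    # Hypothetical stand-in for fetching a Databricks OAuth token.
    return f"oauth-token-{next(_counter)}"


engine = create_engine("sqlite://")
tokens_used = []


@event.listens_for(engine, "do_connect")
def _inject_credentials(dialect, conn_rec, cargs, cparams):
    # Fires for every NEW pooled connection, so each physical connection
    # authenticates with a token fetched at connect time.
    # With Postgres/Lakebase: cparams["password"] = fetch_fresh_token()
    tokens_used.append(fetch_fresh_token())


with engine.connect() as conn:
    conn.execute(text("select 1"))

print(tokens_used)  # one token recorded per new pooled connection
```

Because the hook runs per physical connection rather than per checkout, long-lived pooled connections keep their original credentials; the rotation guarantees only that *new* connections never start with an expired token.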
| "openai-agents>=0.4.1", | ||
| "python-dotenv", | ||
| "uuid-utils>=0.10.0", | ||
| "databricks-openai[memory]>=0.11.1", |
| ) | ||
|
|
||
| # Lakebase instance name for persistent session storage | ||
| LAKEBASE_INSTANCE_NAME = os.environ.get("LAKEBASE_INSTANCE_NAME", "lakebase") |
Do we want to add any validation here, like we did in the langgraph `agent.py`, for the Lakebase instance name?
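A hypothetical sketch of the validation being suggested (the helper name and error message are assumptions, mirroring what the langgraph `agent.py` reportedly does): fail fast at startup if the env var is unset or blank, instead of silently falling back to a default.

```python
# Hypothetical validation sketch; not code from this PR.
import os


def get_lakebase_instance_name() -> str:
    # Read and validate the Lakebase instance name from the environment.
    name = os.environ.get("LAKEBASE_INSTANCE_NAME", "").strip()
    if not name:
        raise ValueError(
            "LAKEBASE_INSTANCE_NAME is not set; configure it with the name "
            "of your Databricks Lakebase instance before starting the agent."
        )
    return name
```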
dhruv0811 left a comment:
Looks good overall!
Do we want to add this template to the sync script as well? I can see that the skills are synced; maybe the changes to the sync script didn't get added to this PR?
Diff excerpt from `databricks.yml`:

```yaml
name: /Users/${workspace.current_user.userName}/${bundle.name}-${bundle.target}

apps:
  agent_openai_agents_sdk_stateful_memory:
```
Should we also have a Lakebase instance in here? Additionally, can we also add a `databricks.yml` for the long- and short-term langgraph examples?
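A hypothetical sketch of what declaring a Lakebase instance in the bundle might look like. The resource type and field names below are assumptions and should be verified against the current Databricks Asset Bundles schema before use:

```yaml
# Hypothetical sketch only; verify field names against the DABs schema.
resources:
  database_instances:
    lakebase:
      name: ${bundle.name}-lakebase
      capacity: CU_1
```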
Diff excerpt from the agent code:

```python
# user_workspace_client = get_user_workspace_client()

# Create session for persistent conversation history with your Databricks Lakebase instance
session = AsyncDatabricksSession(
```
Can we clarify that this example is short-term memory only? We can extend this in the future to include context compression, like https://openai.github.io/openai-agents-python/sessions/#openai-responses-compaction-sessions
From the README:

> This template uses OpenAI Agents SDK [Sessions](https://openai.github.io/openai-agents-python/sessions/) to automatically manage conversation history. Sessions store conversation history for a specific session, allowing agents to maintain context without requiring explicit manual memory management. This is particularly useful for building chat applications or multi-turn conversations where you want the agent to remember previous interactions.
>
> How it works:
> - **Before each run**: The session retrieves prior conversation history and prepends it to the input
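The "prepend history" behavior described above can be illustrated with a toy in-memory store. This is not the real `Session` protocol (which is async; see the `agents.memory.session.Session` reference linked earlier), just a sketch of the read-prepend-run-persist cycle:

```python
# Toy sketch of the session cycle: read history, prepend it to new input,
# then persist both the new input and the run's output.
class ToySession:
    def __init__(self):
        self._items = []

    def get_items(self):
        return list(self._items)

    def add_items(self, items):
        self._items.extend(items)


def run_turn(session, new_input):
    # Before the run: prior history is prepended to the new input.
    model_input = session.get_items() + new_input
    # (Model call elided.) After the run: input and output are persisted.
    output = [{"role": "assistant", "content": f"echo: {new_input[-1]['content']}"}]
    session.add_items(new_input + output)
    return model_input, output


s = ToySession()
run_turn(s, [{"role": "user", "content": "Hello I live in SF!"}])
model_input, _ = run_turn(s, [{"role": "user", "content": "What city did I say I live in?"}])
print(len(model_input))  # 3: two stored items plus the new user message
```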
Is the conversation history that is retrieved deduped against the incoming items? Because clients (ours included) will send back the whole conversation.
I see; it looks like deduplication only happens for tool calls, not for duplicate conversation messages. This is called during `Runner.run`, here in the runner: https://github.com/openai/openai-agents-python/blob/main/src/agents/run.py#L484

which uses the deduplicating logic here: https://github.com/openai/openai-agents-python/blob/main/src/agents/run_internal/items.py#L167

Should we update our code to only pass the latest message to `Runner.run`, like:

```python
latest_message = request.input[-1].model_dump()
result = await Runner.run(agent, [latest_message], session=session)
```

instead of what we currently do:

```python
messages = [i.model_dump() for i in request.input]
result = await Runner.run(agent, messages, session=session)
```

This would also match the OpenAI example, which passes a single message instead of the full history when a session is provided, since the runner pulls the history from the session: https://openai.github.io/openai-agents-python/sessions/#sqlalchemy-sessions
Added deduplication logic in utils. The session's message count is always one less than the client's total, since the client's messages include all previous turns plus the new message.
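The deduplication idea discussed in this thread can be sketched as follows. This is an illustration, not the actual utils code: because the client resends the full conversation each turn while the session already holds everything except the newest items, only the tail the session has not yet seen needs to be forwarded to the runner.

```python
# Sketch of the dedup rule: forward only messages the session lacks.
def new_messages_only(client_messages, session_item_count):
    # The first `session_item_count` client messages duplicate stored history.
    return client_messages[session_item_count:]


history = [
    {"role": "user", "content": "Hello I live in SF!"},
    {"role": "assistant", "content": "Hi!"},
    {"role": "user", "content": "What city did I say I live in?"},
]
# The session already holds the first two items, so only the newest
# user message is forwarded (the "always one less" case from the thread).
print(new_messages_only(history, 2))
```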
bbqiu left a comment:
LGTM with one nit: let's rename this to `...sdk-short-term-memory` to match the langgraph examples.
…gents-sdk-short-term-memory

OpenAI AsyncDatabricksSession Stateful Agent Example, using the session protocol class implemented in databricks/databricks-ai-bridge#316.