Conversation
# ensuring fresh tokens are injected via do_connect event.
engine = create_async_engine(
    url,
    pool_recycle=DEFAULT_POOL_RECYCLE_SECONDS,
Connection pooling happens here via SQLAlchemy's default QueuePool, and the pool_recycle=2700 setting ensures connections are recycled every 45 minutes (before the 50-min token cache expires), at which point the do_connect event injects a fresh token.
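The recycle-before-expiry flow described above can be sketched as follows; this is a minimal illustration using a local SQLite URL and a hypothetical `get_token` callable, not the PR's actual helpers:

```python
from sqlalchemy import create_engine, event, text

DEFAULT_POOL_RECYCLE_SECONDS = 2700  # 45 min, safely under the 50-min token cache

def make_engine(url, get_token):
    """Engine whose newly opened connections always carry a fresh token."""
    engine = create_engine(url, pool_recycle=DEFAULT_POOL_RECYCLE_SECONDS)

    # do_connect fires every time the pool opens a new DBAPI connection,
    # including replacements created after pool_recycle retires an old one.
    @event.listens_for(engine, "do_connect")
    def _inject_token(dialect, conn_rec, cargs, cparams):
        token = get_token()  # fresh, or served from the token cache
        if not url.startswith("sqlite"):
            cparams["password"] = token  # Postgres/Lakebase token auth

    return engine

if __name__ == "__main__":
    calls = []
    eng = make_engine("sqlite://", lambda: calls.append("x") or "tok")
    with eng.connect() as conn:
        conn.execute(text("SELECT 1"))
    print(len(calls))  # → 1 (one pooled connection was opened with a token)
```

With `pool_recycle` shorter than the token TTL, a stale token can never be handed to a new connection: the connection is discarded first, and `do_connect` supplies a fresh one on replacement.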
@@ -0,0 +1,234 @@
"""
do we want this to be importable from databricks_openai.agents?
sure, I'll make this importable so instead of:
from databricks_openai.agents.session import MemorySession
import path will look like:
from databricks_openai.agents import MemorySession
DEFAULT_DATABASE = "databricks_postgres"


class _LakebaseCredentials(_LakebasePoolBase):
does it make sense to rename _LakebasePoolBase?
also could you remind me why we didn't have the cache lock as an attribute of the _LakebasePoolBase instance?
we had separate locks (sync vs async) for the sync vs async LakebasePools we implemented, so each of the subclasses adds their own cache lock
I'll rename _LakebasePoolBase to _LakebaseBase as it's a more generic class for resolving Lakebase host/username/token caching - the actual pooling logic is implemented in the subclasses LakebasePool/AsyncLakebasePool
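The split described here (shared credential/token caching in the base, a per-subclass lock because sync and async pools need different lock types) might look like this sketch; everything beyond the class names `_LakebaseBase`, `LakebasePool`, and `AsyncLakebasePool` is hypothetical, and the token fetch is stubbed:

```python
import asyncio
import threading
import time

class _LakebaseBase:
    """Resolves Lakebase host/username and caches tokens; no pooling here."""

    def __init__(self, token_ttl=3000):  # 50 min, per the discussion above
        self._token = None
        self._token_expiry = 0.0
        self._token_ttl = token_ttl

    def _fetch_token(self):
        # Stand-in for the real Databricks credential call
        return "token-%d" % time.time()

    def _refresh_if_needed(self):
        now = time.monotonic()
        if self._token is None or now >= self._token_expiry:
            self._token = self._fetch_token()
            self._token_expiry = now + self._token_ttl
        return self._token

class LakebasePool(_LakebaseBase):
    """Sync pool: guards the cache with a threading.Lock."""

    def __init__(self, **kw):
        super().__init__(**kw)
        self._lock = threading.Lock()

    def get_token(self):
        with self._lock:
            return self._refresh_if_needed()

class AsyncLakebasePool(_LakebaseBase):
    """Async pool: guards the cache with an asyncio.Lock."""

    def __init__(self, **kw):
        super().__init__(**kw)
        self._lock = asyncio.Lock()

    async def get_token(self):
        async with self._lock:
            return self._refresh_if_needed()
```

Keeping the lock out of the base class avoids a sync `threading.Lock` blocking the event loop in the async subclass, which is the rationale given above for each subclass owning its own lock.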
return token


class MemorySession(SQLAlchemySession):
pls correct me if i'm wrong, but i think this is async only?
can we leave some clarification about this in the docstrings / throw a helpful error if someone tries to run this synchronously
yes, it's async only - taking a look at the source code, all of these classes implement the async interface (since they follow the session protocol):
https://github.com/openai/openai-agents-python/blob/main/src/agents/memory/session.py
i'll cover this in unit tests/rename to AsyncDatabricksSession to make it clearer
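For reference, the Session protocol methods in the linked file are all coroutines; a minimal in-memory sketch (hypothetical, not the PR's implementation) makes the async-only contract concrete:

```python
import asyncio

class InMemorySession:
    """Minimal async-only sketch of the OpenAI Agents SDK Session protocol."""

    def __init__(self, session_id):
        self.session_id = session_id
        self._items = []

    async def get_items(self, limit=None):
        # limit=None returns the full history, newest-last
        return self._items[-limit:] if limit else list(self._items)

    async def add_items(self, items):
        self._items.extend(items)

    async def pop_item(self):
        # Removes and returns the most recent item, if any
        return self._items.pop() if self._items else None

    async def clear_session(self):
        self._items.clear()

async def demo():
    s = InMemorySession("conv-1")
    await s.add_items([{"role": "user", "content": "hi"}])
    return await s.get_items()

print(asyncio.run(demo()))  # → [{'role': 'user', 'content': 'hi'}]
```

Calling any of these methods without `await` just returns a coroutine object, which is why a docstring note (or a rename like AsyncDatabricksSession) helps steer users away from sync misuse.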
)


# Attach event to inject Lakebase token before each connection
# Note: do_connect fires on sync_engine even for async operations
we are creating an async engine right? is this an old comment
yes, but the async engine is a wrapper around a sync engine:
there is not yet an “async” version of a SQLAlchemy event handler
"Events can be registered at the instance level (e.g. a specific AsyncEngine instance) by associating the event with the sync attribute that refers to the proxied object. For example to register the PoolEvents.connect() event against an AsyncEngine instance, use its AsyncEngine.sync_engine attribute as target."
link: https://docs.sqlalchemy.org/en/20/orm/extensions/asyncio.html#using-events-with-the-asyncio-extension
return self._credentials.username


@property
def connection_url(self) -> str:
nit: do we wanna reuse this in _create_engine func above?
token_cache_duration_seconds=token_cache_duration_seconds,
)


engine = self._create_engine(**engine_kwargs)
iiuc, this can potentially be run on every new conversation right?
is there any way to reuse an engine across sessions? if not, should we try to make these operations async via event loops
return token


class AsyncDatabricksSession(SQLAlchemySession):
from talking to the research team working on DBRA, they actually have a very similar snippet as us to manage a SQLAlchemy connection to lakebase: https://sourcegraph.prod.databricks-corp.com/databricks-eng/universe/-/blob/research/aroll/app/aroll_app/db/connection.py?L162-182
would it make sense for us to further abstract this by providing a similar AsyncLakebaseSQLAlchemy / LakebaseSQLAlchemy class?
discussed offline but I'll refactor such that:
- in db-ai-bridge we add AsyncLakebaseSqlAlchemy support for creating SQLAlchemy engines (similar to https://sourcegraph.prod.databricks-corp.com/databricks-eng/universe/-/blob/research/aroll/app/aroll_app/db/connection.py?L185)
- in db-openai we subclass SQLAlchemySession in AsyncDatabricksSession (session protocol is async + specific to openai agents sdk) and pass in engine from AsyncLakebaseSqlAlchemy

this will create much cleaner separation of concerns for future frameworks to reuse any sqlalchemy engines etc!
# Class-level cache for AsyncLakebaseSQLAlchemy instances, keyed by instance_name.
# This allows multiple AsyncDatabricksSession instances to share a single engine/pool.
_lakebase_sql_alchemy_cache: dict[str, AsyncLakebaseSQLAlchemy] = {}
_lakebase_sql_alchemy_cache_lock = Lock()
thoughts on the class-level cache for AsyncLakebaseSQLAlchemy engines keyed by instance_name?
this is so we reuse a single SQLAlchemy engine / pool per Lakebase instance, avoiding repeated pool creation, TCP handshakes, and auth setup.
sessions are still created per Runner.run(), but engines are shared
this approach looks good to me to minimize IO. two comments:
- we may want to include a param for a func for customers to customize the cache key. currently, diff engine kwargs for the same instance name will be ignored
- let's also call this out in the docstring and add a param to optionally disable this engine caching
the best case would be include engine kwargs + instance name in the cache key
sounds good - going to create a cache key that takes into consideration both instance name + engine kwargs, as well as the ability to not cache the engines (but defaults to caching)
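The agreed caching behavior can be sketched like this; `get_engine` and `_create_engine` are hypothetical stand-ins for the PR's internals, and the returned object is stubbed:

```python
import json
import threading

_engine_cache = {}
_engine_cache_lock = threading.Lock()

def _cache_key(instance_name, engine_kwargs):
    # Include kwargs in the key so different settings for the same
    # instance don't silently share one engine (the concern raised above).
    return (instance_name, json.dumps(engine_kwargs, sort_keys=True, default=str))

def _create_engine(instance_name, **engine_kwargs):
    # Stand-in for real engine construction (pool setup, auth, etc.)
    return object()

def get_engine(instance_name, cache=True, **engine_kwargs):
    """Return a shared engine per (instance, kwargs); caching can be disabled."""
    if not cache:
        return _create_engine(instance_name, **engine_kwargs)
    key = _cache_key(instance_name, engine_kwargs)
    with _engine_cache_lock:
        if key not in _engine_cache:
            _engine_cache[key] = _create_engine(instance_name, **engine_kwargs)
        return _engine_cache[key]
```

With this shape, sessions created per `Runner.run()` reuse one engine/pool per distinct (instance_name, kwargs) pair, while `cache=False` gives callers a private engine.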
@@ -6,11 +6,15 @@
import uuid
(ok for followup PR) we should probably think about separating this file into a few separate ones since it's getting quite long
model="databricks-claude-3-7-sonnet",
messages=[{"role": "user", "content": "hi"}],
tools=tools,
tools=cast(Any, tools),
did we delete this change from the diff? i think we still need it cc @fanzeyi who ran into a bug that this was fixing earlier
added this back here and included unit tests to make sure the non-list inputs are handled gracefully!
]


[project.optional-dependencies]
memory = [
can we update the CI job for this memory extra too
bbqiu left a comment:
overall LGTM, please address all comments and this looks ready to merge!
bbqiu left a comment:
lgtm, feel free to merge after addressing comments!
OpenAI AsyncDatabricksSession Stateful Agent Example using session protocol class implemented in databricks/databricks-ai-bridge#316
* openai agents stateful example
* add session id to outputs
* update example w/ asyncdatabrickssession
* package release agent updates
* use uuid7 for example
* pr review updates
* add openai agent memory skill
* add to openai templates sync script
* run python sync skills
* databricks yml and use chatcontext convo id
* sanitize mcp tool output items https://github.com/databricks/app-templates/pull/119/changes
* deduplicate input logic
* update sanitize mcp handler to be more defensive
* rename from agent-openai-agents-sdk-stateful-memory to agent-openai-agents-sdk-short-term-memory
Adds AsyncDatabricksSession, a session storage implementation for the OpenAI Agents SDK that persists conversation history to Databricks Lakebase.
This class subclasses OpenAI's original SQLAlchemySession code to inherit all SQL logic while adding Lakebase-specific features:
More on Session protocol:
https://openai.github.io/openai-agents-python/ref/memory/session/#agents.memory.session.Session
Using SQLAlchemy's default QueuePool (pool_size=5, max_overflow=10) for connection pooling
Usage:
Example queries:
returns responses with session id:
follow-up stateful question:
gives us:
testing:
unit + integration tests
sample agent: OpenAI MemorySession Stateful Agent Example

sample app: https://eng-ml-agent-platform.staging.cloud.databricks.com/apps/j-openai-stateful?o=2850744067564480