feat: Add conversation variable persistence layer #4
… factory to pass the ConversationVariableUpdater factory (the only non-VariablePool dependency), plus a unit test to verify the injection path.
- `api/core/workflow/nodes/variable_assigner/v2/node.py` adds a kw-only `conv_var_updater_factory` dependency (defaulting to `conversation_variable_updater_factory`) and stores it for use in `_run`.
- `api/core/workflow/nodes/node_factory.py` now injects the factory when creating VariableAssigner v2 nodes.
- `api/tests/unit_tests/core/workflow/nodes/variable_assigner/v2/test_variable_assigner_v2.py` adds a test asserting the factory is injected.
Tests not run. Next steps (optional): 1) `make lint` 2) `make type-check` 3) `uv run --project api --dev dev/pytest/pytest_unit_tests.sh`
…ructor args.
- `api/core/workflow/nodes/node_factory.py` now directly instantiates `VariableAssignerNode` with the injected dependency, and uses a direct call for all other nodes.
No tests run.
Add a new command for GraphEngine to update a group of variables. This command takes a group of variable selectors and new values. When the engine receives the command, it will update the corresponding variable in the variable pool. If it does not exist, it will add it; if it does, it will overwrite it. Both behaviors should be treated the same and do not need to be distinguished.
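A sketch of what such a command might look like (class and field names here are assumptions for illustration, not the PR's actual API): the command carries selector/value pairs, and the handler applies them upsert-style to the pool, treating add and overwrite identically.

```python
from dataclasses import dataclass
from typing import Any, Sequence


@dataclass(frozen=True)
class VariableUpdate:
    # Selector addressing a variable in the pool, e.g. ("conversation", "topic").
    selector: tuple[str, ...]
    value: Any


@dataclass(frozen=True)
class UpdateVariablesCommand:
    updates: Sequence[VariableUpdate]


class VariablePool:
    """Toy stand-in for the engine's variable pool (upsert semantics)."""

    def __init__(self) -> None:
        self._vars: dict[tuple[str, ...], Any] = {}

    def add(self, selector: tuple[str, ...], value: Any) -> None:
        # Add-or-overwrite: both cases are treated identically, as the
        # command's contract requires.
        self._vars[tuple(selector)] = value

    def get(self, selector: tuple[str, ...]) -> Any:
        return self._vars.get(tuple(selector))


def handle_update_variables(pool: VariablePool, cmd: UpdateVariablesCommand) -> None:
    for update in cmd.updates:
        pool.add(update.selector, update.value)
```

Because a plain dict assignment already has add-or-overwrite semantics, the handler never needs to distinguish the two cases.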
…be-kanban 0941477f) Create a new persistence layer for the Graph Engine. This layer receives a ConversationVariableUpdater upon initialization, which is used to persist the received ConversationVariables to the database. It can retrieve the currently processing conversation ID from the engine's variable pool. It captures the successful execution event of each node and determines whether the node is a VariableAssigner (v1 or v2). If so, it retrieves the variable names and values that need to be updated from the node's outputs. This layer is only used in Advanced Chat and should be placed outside of the core.workflow package.
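A minimal skeleton of such a layer (all names are illustrative; the `node_type`, `conversation_id`, and `updated_variables` attributes on the event are stand-ins for the real event/result shape, not the PR's actual types):

```python
from typing import Any, Protocol


class ConversationVariableUpdater(Protocol):
    def update(self, conversation_id: str, variable: Any) -> None: ...
    def flush(self) -> None: ...


class ConversationVariablePersistenceLayer:
    """Sketch: persist conversation variables on node-success events."""

    # Illustrative node-type tags covering VariableAssigner v1 and v2.
    VARIABLE_ASSIGNER_TYPES = {"variable-assigner", "assigner"}

    def __init__(self, updater: ConversationVariableUpdater) -> None:
        self._updater = updater

    def on_event(self, event: Any) -> None:
        # Only react to successful runs of VariableAssigner (v1/v2) nodes.
        if getattr(event, "node_type", None) not in self.VARIABLE_ASSIGNER_TYPES:
            return
        conversation_id = getattr(event, "conversation_id", None)
        if conversation_id is None:
            return
        for variable in getattr(event, "updated_variables", []):
            self._updater.update(conversation_id=conversation_id, variable=variable)
        self._updater.flush()
```

The updater is injected at construction time, which keeps the layer testable with a recording stub and keeps DB concerns out of core.workflow.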
…rs/conversation_variable_persist_layer.py` to satisfy SIM118
- chore(lint): run `make lint` (passes; warnings about missing RECORD during venv package uninstall)
- chore(type-check): run `make type-check` (fails: 1275 errors for missing type stubs like `opentelemetry`, `click`, `sqlalchemy`, `flask`, `pydantic`, `pydantic_settings`)
…tType validation and casting
- test(graph-engine): update VariableUpdate usages to include value_type in command tests
… drop common_helpers usage
- refactor(variable-assigner-v2): inline updated variable payload and drop common_helpers usage
Tests not run.
…n and remove value type validation
- test(graph-engine): update UpdateVariablesCommand tests to pass concrete Variable instances
- fix(graph-engine): align VariableUpdate values with selector before adding to VariablePool
Tests not run.
…e handling for v1/v2 process_data
- refactor(app-layer): read updated variables from process_data in conversation variable persistence layer
- test(app-layer): adapt persistence layer tests to use common_helpers updated-variable payloads
Tests not run.
…nce reads from process_data
…fter venv changes)
- chore(type-check): run `make type-check` (fails: 1275 missing type stubs across dependencies)
Details:
- `make lint` fails with `ModuleNotFoundError: No module named 'dotenv_linter.cli'`.
- `make type-check` fails with missing stubs for `opentelemetry`, `click`, `sqlalchemy`, `flask`, `pydantic`, `pydantic_settings`, etc.
…ableUnion and remove value type validation" This reverts commit 5ebc87a.
…h SegmentType validation and casting" This reverts commit 3edd525.
This reverts commit 67007f6.
…y out of core.workflow into `api/services/conversation_variable_updater.py`
- refactor(app): update advanced chat app runner and conversation service to import the new updater factory
Tests not run.
…-linter module missing)
- chore(type-check): run `make type-check` (fails: 1275 missing type stubs)
Details:
- `make lint` reports: `No matches for ignored import core.workflow.nodes.variable_assigner.common.impl -> extensions.ext_database` and ends with `ModuleNotFoundError: No module named 'dotenv_linter.cli'`.
- `make type-check` fails with missing type stubs for `opentelemetry`, `click`, `sqlalchemy`, `flask`, `pydantic`, `pydantic_settings`, etc.
…impl import in `api/.importlinter`
```python
    return

updated_variables = common_helpers.get_updated_variables(event.node_run_result.process_data) or []
if not updated_variables:
    return

conversation_id = self.graph_runtime_state.system_variable.conversation_id
if conversation_id is None:
    return

for item in updated_variables:
    selector = item.selector
    if len(selector) < 2:
        logger.warning("Conversation variable selector invalid. selector=%s", selector)
        continue
    if selector[0] != CONVERSATION_VARIABLE_NODE_ID:
        continue
    variable = self.graph_runtime_state.variable_pool.get(selector)
    if not isinstance(variable, Variable):
        logger.warning(
            "Conversation variable not found in variable pool. selector=%s",
            selector,
        )
        continue
    self._conversation_variable_updater.update(conversation_id=conversation_id, variable=variable)
    self._conversation_variable_updater.flush()
```
🔴 Conversation variable DB persistence is completely broken after this refactoring. `ConversationVariablePersistenceLayer.on_event()` runs in the dispatcher thread (a plain `threading.Thread` at `dispatcher.py:74` with no Flask app context), but `ConversationVariableUpdaterImpl.update()` calls `Session(db.engine)`, where `db.engine` requires Flask `current_app` -- this raises `RuntimeError` on every invocation. The exception is then silently swallowed by `EventManager._notify_layers()` (`event_manager.py:183-186`), so the node reports SUCCESS while the DB write never happens. Fix by passing a pre-resolved sessionmaker to the layer (as `WorkflowPersistenceLayer` does) or pushing Flask app context in the dispatcher thread.
Extended reasoning...
What the bug is
This PR moves conversation variable DB persistence from inside the variable assigner node's `_run()` method (which executes in the worker thread) to a new `ConversationVariablePersistenceLayer` that runs via `on_event()`. The problem is twofold:
1. Missing Flask app context: the layer's `on_event()` is called from the dispatcher thread, which is created as a plain `threading.Thread` at `dispatcher.py:74` with no Flask application context. The worker threads explicitly use `preserve_flask_contexts()` (`worker.py:134-138`), but the dispatcher does not. `ConversationVariableUpdaterImpl.update()` calls `Session(db.engine)`, where `db` is a Flask-SQLAlchemy instance that requires `current_app` to resolve the engine. Without app context, this raises `RuntimeError("Working outside of application context")` on every single invocation.
2. Silent error swallowing: `EventManager._notify_layers()` at `event_manager.py:182-186` wraps all `layer.on_event()` calls in `try/except Exception`, which catches and logs but does not re-raise. The comment even says: "Layer exceptions are caught and logged to prevent disrupting collection." This means the `RuntimeError` (and any other DB failure) is silently swallowed.
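The context problem can be reproduced in miniature with a `threading.local` standing in for Flask's app-context stack (no Flask required; the `AppContext` and `current_app` names mimic Flask but are toy stand-ins): a plain thread sees no context unless one is pushed inside that thread, which is exactly the second fix option mentioned above.

```python
import threading

# Toy stand-in for Flask's app-context stack (context is thread-local).
_ctx = threading.local()


class AppContext:
    """Mimics flask.Flask.app_context(): push on enter, pop on exit."""

    def __init__(self, app_name: str) -> None:
        self.app_name = app_name

    def __enter__(self) -> "AppContext":
        _ctx.app = self.app_name
        return self

    def __exit__(self, *exc: object) -> None:
        del _ctx.app


def current_app() -> str:
    # Mimics flask.current_app: raises outside an app context.
    try:
        return _ctx.app
    except AttributeError:
        raise RuntimeError("Working outside of application context") from None


def run_in_thread(target) -> list:
    """Run target in a fresh thread, collecting its result or exception."""
    results: list = []

    def wrapper() -> None:
        try:
            results.append(target())
        except RuntimeError as e:
            results.append(e)

    t = threading.Thread(target=wrapper)
    t.start()
    t.join()
    return results


# A plain thread (like the dispatcher) has no context and fails:
plain = run_in_thread(current_app)


# Pushing the context inside the thread (one of the proposed fixes) succeeds:
def with_context() -> str:
    with AppContext("dify-api"):
        return current_app()


fixed = run_in_thread(with_context)
```

The same mechanism explains why the worker threads (which push context via `preserve_flask_contexts()`) succeed while the dispatcher thread does not.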
The specific code path
The call chain is:

- Worker thread (HAS Flask context) executes the variable assigner node, which now only updates the in-memory variable pool and emits `NodeRunSucceededEvent`
- Worker puts `NodeRunSucceededEvent` into `_event_queue`
- Dispatcher thread (NO Flask context, plain `threading.Thread` at `dispatcher.py:74`) dequeues the event
- `event_handler.dispatch(event)` -> `event_collector.collect(event)` -> `_notify_layers(event)`
- `ConversationVariablePersistenceLayer.on_event()` -> `updater.update()` -> `Session(db.engine)`
- `db.engine` requires `current_app` -> `RuntimeError`
- `_notify_layers` catches the exception and logs it (`event_manager.py:185-186`)
Why existing code doesn't prevent it
Other layers like `WorkflowPersistenceLayer` and `PauseStatePersistenceLayer` avoid this problem by accepting pre-resolved sessionmaker instances at construction time, so they never need to access `db.engine` at call time. The new `ConversationVariableUpdaterImpl` resolves `db.engine` lazily at call time, which fails without Flask context. The blanket exception handler in `_notify_layers` was designed to prevent one broken layer from disrupting the event pipeline -- a reasonable design -- but it turns a hard failure into silent data loss.
Impact
This is a critical regression. In the old code, if `conv_var_updater.update()` failed for any reason (connection error, constraint violation, missing variable), the exception propagated from `_run()` and the node would fail with `NodeRunFailedEvent`, visible to the user. In the new code:
- The variable assigner node always reports SUCCESS (the in-memory pool update works fine)
- The DB persistence silently fails every time (RuntimeError from missing Flask context)
- The user sees a successful workflow execution
- On the next conversation turn, the variable reverts to its previous value
- This is silent data loss that would be extremely difficult to debug
Step-by-step proof
1. User runs an advanced chat workflow containing a Variable Assigner node that updates a conversation variable
2. The worker thread (with Flask context) runs the node. The node updates the in-memory variable pool and returns `NodeRunResult(status=SUCCEEDED, process_data={...with updated_variables...})`
3. The worker emits `NodeRunSucceededEvent` into the event queue
4. The dispatcher thread (without Flask context) picks up the event and calls `_notify_layers(event)`
5. `ConversationVariablePersistenceLayer.on_event()` runs, finds the updated variable, and calls `self._conversation_variable_updater.update(conversation_id=..., variable=...)`
6. Inside `update()`, `Session(db.engine)` is called. `db` is `flask_sqlalchemy.SQLAlchemy` (`models/engine.py:25`). Accessing `db.engine` goes through Flask-SQLAlchemy 3.x internals that require `current_app` to look up the cached engine
7. Since the dispatcher thread has no Flask app context, this raises `RuntimeError("Working outside of application context")`
8. `_notify_layers` catches the exception at `event_manager.py:185` and only logs it
9. The workflow completes, reporting success to the user
10. The conversation variable was never persisted to the database
How to fix
The fix should follow the same pattern as WorkflowPersistenceLayer: pass a pre-resolved sessionmaker (or Session factory) to ConversationVariablePersistenceLayer at construction time, rather than relying on db.engine resolution at call time. Alternatively, the dispatcher thread could be given Flask app context (like the worker threads), but the sessionmaker approach is cleaner and already established as a pattern in this codebase. Additionally, consider whether the layer should propagate critical persistence failures rather than having them silently swallowed.
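A sketch of the sessionmaker-injection pattern described above, using a stub session factory in place of a real SQLAlchemy `sessionmaker(bind=engine)` so that the shape of the dependency, not the ORM calls, is the point (`session.save(...)` is a hypothetical stand-in for the real UPDATE logic):

```python
from contextlib import AbstractContextManager
from typing import Callable


class ConversationVariableUpdaterImpl:
    """Sketch: resolve the session factory at construction, not at call time.

    `session_factory` plays the role of a pre-resolved SQLAlchemy
    sessionmaker; resolving it once at wiring time (while Flask app context
    is still available) means update() never has to touch db.engine.
    """

    def __init__(self, session_factory: Callable[[], AbstractContextManager]) -> None:
        self._session_factory = session_factory

    def update(self, conversation_id: str, variable: object) -> None:
        # The context manager guarantees the session is closed on every
        # path, which also addresses the leak flagged in the next comment.
        with self._session_factory() as session:
            session.save(conversation_id, variable)  # stand-in for the real UPDATE

    def flush(self) -> None:
        pass
```

Wiring would then construct the updater with `sessionmaker(bind=db.engine)` once, in the app layer, and hand the updater to the persistence layer.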
```python
session = Session(db.engine)
row = session.scalar(stmt)
if not row:
    raise ConversationVariableNotFoundError("conversation variable not found in the database")
row.data = variable.model_dump_json()
session.commit()
```
🔴 Bug: `ConversationVariableUpdaterImpl.update` creates a SQLAlchemy `Session(db.engine)` but never closes it. The old code used `with Session(db.engine) as session:`, which ensured cleanup, but this was changed to a bare assignment. Every call leaks a database connection, which will exhaust the connection pool in a long-running server.
Extended reasoning...
What the bug is
In `api/services/conversation_variable_updater.py`, the `update()` method on line 18 creates a SQLAlchemy session via `session = Session(db.engine)` but never closes it. The previous implementation used `with Session(db.engine) as session:`, a context manager that automatically calls `session.close()` when the block exits, whether normally or via exception.
The specific code path
The diff shows the old code:

```python
with Session(db.engine) as session:
    row = session.scalar(stmt)
    if not row:
        raise VariableOperatorNodeError(...)
    row.data = variable.model_dump_json()
    session.commit()
```

was replaced with:

```python
session = Session(db.engine)
row = session.scalar(stmt)
if not row:
    raise ConversationVariableNotFoundError(...)
row.data = variable.model_dump_json()
session.commit()
```

The session is never closed on any path: (1) the success path after `commit()`, (2) when `ConversationVariableNotFoundError` is raised at line 23, or (3) if `session.scalar()` or `session.commit()` raise unexpected exceptions.
Why existing code does not prevent it
There is no try/finally block, no context manager, and no explicit `session.close()` call anywhere in the method. The session object will only be cleaned up when the Python garbage collector collects it, which is non-deterministic and unreliable, especially in long-running server processes where reference cycles can delay collection indefinitely.
Step-by-step proof
1. `ConversationVariablePersistenceLayer.on_event()` calls `self._conversation_variable_updater.update(conversation_id=conversation_id, variable=variable)` (line 51 of `conversation_variable_persist_layer.py`).
2. This invokes `ConversationVariableUpdaterImpl.update()` in `conversation_variable_updater.py`.
3. Line 18: `session = Session(db.engine)` creates a new session, checking out a connection from the pool.
4. Line 19: `session.scalar(stmt)` executes a query.
5. Lines 20-23: either raises `ConversationVariableNotFoundError` (session not closed) or sets `row.data` and calls `session.commit()` (session still not closed).
6. The method returns. The local variable `session` goes out of scope, but the session is not closed; the connection remains checked out from the pool.
7. The same leak occurs via `conversation_service.py` line 282, where `updater.update(conversation_id, updated_variable)` is called.
Impact
Every conversation variable update leaks a database connection. In production, this will gradually exhaust the SQLAlchemy connection pool, leading to hangs or errors when the application can no longer obtain database connections. This is a regression introduced by this PR.
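The exhaustion mechanism can be shown with a toy fixed-size pool standing in for SQLAlchemy's QueuePool (the class, its size, and the timeout value are all illustrative): once leaked checkouts consume the pool, every further checkout times out.

```python
import queue


class ToyPool:
    """Fixed-size pool; checkout blocks up to `timeout` seconds,
    loosely mimicking SQLAlchemy's QueuePool with pool_timeout."""

    def __init__(self, size: int, timeout: float) -> None:
        self._free: queue.Queue = queue.Queue()
        self._timeout = timeout
        for i in range(size):
            self._free.put(f"conn-{i}")

    def checkout(self) -> str:
        try:
            return self._free.get(timeout=self._timeout)
        except queue.Empty:
            raise TimeoutError("connection pool exhausted") from None

    def checkin(self, conn: str) -> None:
        self._free.put(conn)


pool = ToyPool(size=2, timeout=0.05)

# Leaky pattern (like the bare Session(db.engine) with no close):
# connections are checked out and never returned to the pool.
leaked = [pool.checkout() for _ in range(2)]

exhausted = False
try:
    pool.checkout()  # pool is empty, so this checkout times out
except TimeoutError:
    exhausted = True
```

In production the failure is slower but identical in kind: each unreturned session holds a pooled connection until the whole pool is drained.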
Fix
Restore the context manager pattern:
```python
with Session(db.engine) as session:
    row = session.scalar(stmt)
    if not row:
        raise ConversationVariableNotFoundError(...)
    row.data = variable.model_dump_json()
    session.commit()
```

```python
        selector,
    )
    continue
self._conversation_variable_updater.update(conversation_id=conversation_id, variable=variable)
self._conversation_variable_updater.flush()
```
🟡 `flush()` is called inside the for loop (line 53), so it executes once per variable instead of once after all updates are processed. The `ConversationVariableUpdater` protocol explicitly documents that implementations may batch updates and that `flush()` persists buffered changes; calling it per-variable defeats batching by design. Move `self._conversation_variable_updater.flush()` outside the for loop.
Extended reasoning...
What the bug is
In `ConversationVariablePersistenceLayer.on_event()`, the call to `self._conversation_variable_updater.flush()` on line 53 is inside the `for item in updated_variables` loop (lines 38-53). This means `flush()` is called N times for N conversation variables, instead of once after all variables have been updated.
How it manifests
Consider a VariableAssigner node that updates 3 conversation variables in a single execution. The current code path is:
- Loop iteration 1: `update(var_1)` then `flush()`
- Loop iteration 2: `update(var_2)` then `flush()`
- Loop iteration 3: `update(var_3)` then `flush()`

The correct behavior (matching the old v2 node code that this layer replaces) would be:

- `update(var_1)`
- `update(var_2)`
- `update(var_3)`
- `flush()` -- once, after all updates
Why existing code does not prevent it
The current `ConversationVariableUpdaterImpl.flush()` is a no-op (it just does `pass`), and `update()` already calls `session.commit()` per variable. So in the current implementation there is no observable behavioral difference. However, this contradicts the protocol contract defined in `core/workflow/conversation_variable_updater.py` (lines 14-16), which explicitly states: "Implementations may choose to batch updates. If batching is used, the flush method should be implemented to persist buffered changes."
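To make the contract concrete, here is a hypothetical batching implementation of the protocol (class and attribute names are illustrative): with `flush()` inside the loop it degenerates into one write per variable, while flushing after the loop yields a single batched write.

```python
class BatchingUpdater:
    """Hypothetical batching ConversationVariableUpdater: update() buffers,
    flush() persists the whole buffer in one write."""

    def __init__(self) -> None:
        self._buffer: list = []
        self.writes: list = []  # each element represents one persisted batch

    def update(self, conversation_id: str, variable: object) -> None:
        self._buffer.append((conversation_id, variable))

    def flush(self) -> None:
        if self._buffer:
            self.writes.append(list(self._buffer))
            self._buffer.clear()


variables = ["a", "b", "c"]

# Current (buggy) shape: flush inside the loop -> three single-item writes.
buggy = BatchingUpdater()
for v in variables:
    buggy.update("c1", v)
    buggy.flush()

# Corrected shape: flush after the loop -> one batched write of three items.
fixed = BatchingUpdater()
for v in variables:
    fixed.update("c1", v)
fixed.flush()
```

With the no-op `flush()` of the current impl both shapes behave identically, which is exactly why the misplacement is invisible today.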
Impact
Today the impact is negligible since `flush()` is a no-op. However, if anyone implements a batching `ConversationVariableUpdater` (as the protocol explicitly envisions), calling `flush()` per-variable would either: (a) defeat the purpose of batching entirely by flushing after each individual update, or (b) cause unexpected intermediate commits when only a final commit was intended. The old v2 code in `nodes/variable_assigner/v2/node.py` correctly called `conv_var_updater.flush()` once outside the loop -- this PR's refactoring inadvertently moved it inside.
How to fix
Move line 53 (`self._conversation_variable_updater.flush()`) outside and below the for loop, so that every `update()` runs first and `flush()` is called exactly once after the loop completes.
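A runnable miniature of that corrected structure (the function form and dict-based inputs are illustrative, not the PR's actual code), with `flush()` after the loop:

```python
import logging

logging.basicConfig()
logger = logging.getLogger(__name__)

CONVERSATION_VARIABLE_NODE_ID = "conversation"


def persist_updated_variables(updated_variables, variable_pool, updater, conversation_id):
    """Corrected structure: flush once, after the loop (a reconstruction)."""
    for item in updated_variables:
        selector = item["selector"]
        if len(selector) < 2 or selector[0] != CONVERSATION_VARIABLE_NODE_ID:
            continue
        variable = variable_pool.get(tuple(selector))
        if variable is None:
            logger.warning("Conversation variable not found. selector=%s", selector)
            continue
        updater.update(conversation_id=conversation_id, variable=variable)
    updater.flush()  # once, after all updates -- not per variable
```

A test driving this with two conversation variables would assert two `update()` calls but only one `flush()`, catching the regression the single-variable test misses.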
Note that the test `test_persists_conversation_variables_from_assigner_output` only exercises a single variable, so its `updater.flush.assert_called_once()` assertion passes even with the current buggy code. A test with multiple conversation variables would reveal that `flush()` is called multiple times.
Benchmark PR from agentic-review-benchmarks#4