feat: Add conversation variable persistence layer #4
… factory to pass the ConversationVariableUpdater factory (the only non-VariablePool dependency), plus a unit test to verify the injection path.
- `api/core/workflow/nodes/variable_assigner/v2/node.py` adds a kw-only `conv_var_updater_factory` dependency (defaulting to `conversation_variable_updater_factory`) and stores it for use in `_run`.
- `api/core/workflow/nodes/node_factory.py` now injects the factory when creating VariableAssigner v2 nodes.
- `api/tests/unit_tests/core/workflow/nodes/variable_assigner/v2/test_variable_assigner_v2.py` adds a test asserting the factory is injected.

Tests not run. Next steps (optional):
1. `make lint`
2. `make type-check`
3. `uv run --project api --dev dev/pytest/pytest_unit_tests.sh`
…ructor args.
- `api/core/workflow/nodes/node_factory.py` now directly instantiates `VariableAssignerNode` with the injected dependency, and uses a direct call for all other nodes.

No tests run.
Add a new command for GraphEngine that updates a group of variables. The command carries a group of variable selectors and their new values. When the engine receives it, it updates the corresponding variables in the variable pool: a variable that does not exist is added, and one that does exist is overwritten. Both behaviors are treated the same and need not be distinguished.
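The upsert semantics described above can be sketched as follows; `UpdateVariablesCommand`, `VariableUpdate`, and the dict-based pool are illustrative stand-ins, not the engine's real types:

```python
from dataclasses import dataclass, field


@dataclass(frozen=True)
class VariableUpdate:
    selector: tuple[str, ...]  # e.g. ("conversation", "counter")
    value: object


@dataclass
class UpdateVariablesCommand:
    updates: list[VariableUpdate] = field(default_factory=list)


def apply_command(pool: dict[tuple[str, ...], object], command: UpdateVariablesCommand) -> None:
    # Upsert semantics: add when missing, overwrite when present --
    # the two cases are intentionally not distinguished.
    for update in command.updates:
        pool[update.selector] = update.value
```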
…be-kanban 0941477f) Create a new persistence layer for the Graph Engine. The layer receives a ConversationVariableUpdater upon initialization, which it uses to persist received ConversationVariables to the database. It can retrieve the ConversationId currently being processed from the engine's variable pool. It captures each node's successful-execution event and checks whether the node is a VariableAssigner (v1 or v2); if so, it retrieves the variable name and value to update from the node's outputs. This layer is only used in Advanced Chat and should be placed outside of the Core.Workflow package.
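A minimal sketch of the described layer, using simplified stand-ins (`NodeSucceededEvent`, `Updater`, string node types, and the assigner-type names) rather than the real engine interfaces:

```python
from dataclasses import dataclass, field
from typing import Protocol

# Assumed identifiers for the v1 and v2 assigner node types, not the real enum values.
ASSIGNER_TYPES = {"variable-assigner", "assigner"}


@dataclass
class NodeSucceededEvent:
    node_type: str
    outputs: dict[str, object] = field(default_factory=dict)


class Updater(Protocol):
    def update(self, conversation_id: str, name: str, value: object) -> None: ...
    def flush(self) -> None: ...


class ConversationVariablePersistenceLayer:
    def __init__(self, updater: Updater, conversation_id: str) -> None:
        self._updater = updater
        self._conversation_id = conversation_id

    def on_event(self, event: NodeSucceededEvent) -> None:
        # Only successful VariableAssigner nodes trigger persistence.
        if event.node_type not in ASSIGNER_TYPES:
            return
        for name, value in event.outputs.items():
            self._updater.update(self._conversation_id, name, value)
        self._updater.flush()
```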
…rs/conversation_variable_persist_layer.py` to satisfy SIM118
- chore(lint): run `make lint` (passes; warnings about missing RECORD during venv package uninstall)
- chore(type-check): run `make type-check` (fails: 1275 errors for missing type stubs like `opentelemetry`, `click`, `sqlalchemy`, `flask`, `pydantic`, `pydantic_settings`)
…tType validation and casting
- test(graph-engine): update VariableUpdate usages to include value_type in command tests
… drop common_helpers usage
- refactor(variable-assigner-v2): inline updated variable payload and drop common_helpers usage

Tests not run.
…n and remove value type validation
- test(graph-engine): update UpdateVariablesCommand tests to pass concrete Variable instances
- fix(graph-engine): align VariableUpdate values with selector before adding to VariablePool

Tests not run.
…e handling for v1/v2 process_data
- refactor(app-layer): read updated variables from process_data in conversation variable persistence layer
- test(app-layer): adapt persistence layer tests to use common_helpers updated-variable payloads

Tests not run.
…nce reads from process_data
…fter venv changes)
- chore(type-check): run `make type-check` (fails: 1275 missing type stubs across dependencies)

Details:
- `make lint` fails with `ModuleNotFoundError: No module named 'dotenv_linter.cli'`.
- `make type-check` fails with missing stubs for `opentelemetry`, `click`, `sqlalchemy`, `flask`, `pydantic`, `pydantic_settings`, etc.
…ableUnion and remove value type validation"

This reverts commit 5ebc87a.
…h SegmentType validation and casting"

This reverts commit 3edd525.
This reverts commit 67007f6.
…y out of core.workflow into `api/services/conversation_variable_updater.py`
- refactor(app): update advanced chat app runner and conversation service to import the new updater factory

Tests not run.
…-linter module missing)
- chore(type-check): run `make type-check` (fails: 1275 missing type stubs)

Details:
- `make lint` reports: `No matches for ignored import core.workflow.nodes.variable_assigner.common.impl -> extensions.ext_database` and ends with `ModuleNotFoundError: No module named 'dotenv_linter.cli'`.
- `make type-check` fails with missing type stubs for `opentelemetry`, `click`, `sqlalchemy`, `flask`, `pydantic`, `pydantic_settings`, etc.
…impl import in `api/.importlinter`
Code Review by Qodo
    def update(self, conversation_id: str, variable: Variable) -> None:
        stmt = select(ConversationVariable).where(
            ConversationVariable.id == variable.id, ConversationVariable.conversation_id == conversation_id
        )
        with Session(db.engine) as session:
            row = session.scalar(stmt)
            if not row:
                raise VariableOperatorNodeError("conversation variable not found in the database")
            row.data = variable.model_dump_json()
            session.commit()

    def flush(self):
        session = Session(db.engine)
        row = session.scalar(stmt)
        if not row:
            raise ConversationVariableNotFoundError("conversation variable not found in the database")
        row.data = variable.model_dump_json()
        session.commit()
1. `Session(db.engine)` without context manager 📘 Rule violation ⛯ Reliability
- `ConversationVariableUpdaterImpl.update()` opens a SQLAlchemy Session without a `with Session(...) as session:` context manager, so the session lifecycle is not reliably closed.
- This violates the session management requirement and can lead to connection/resource leaks under load.
Agent prompt
## Issue description
`ConversationVariableUpdaterImpl.update()` creates a SQLAlchemy `Session` without a context manager, violating the required pattern and risking session leaks.
## Issue Context
Compliance requires `with Session(db.engine, expire_on_commit=False) as session:` for session lifecycle management.
## Fix Focus Areas
- api/services/conversation_variable_updater.py[14-23]
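To see why the context-manager form matters, here is a minimal stdlib-only demonstration: `__exit__` runs even when the body raises, so the session is always closed. `FakeSession` is a stand-in for `sqlalchemy.orm.Session`; the real fix is the pattern the compliance note requires, `with Session(db.engine, expire_on_commit=False) as session:`.

```python
class FakeSession:
    """Stand-in for sqlalchemy.orm.Session to show context-manager cleanup."""

    def __init__(self) -> None:
        self.closed = False

    def __enter__(self) -> "FakeSession":
        return self

    def __exit__(self, exc_type, exc, tb) -> None:
        # Runs on both normal exit and exception -- this is what the
        # bare `session = Session(db.engine)` form does not guarantee.
        self.closed = True


def update_with_context_manager(session: FakeSession) -> None:
    with session:
        # Simulate a lookup failure mid-transaction.
        raise RuntimeError("row not found")
```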
    class ConversationVariableNotFoundError(Exception):
        pass


    class ConversationVariableUpdaterImpl:
        def update(self, conversation_id: str, variable: Variable) -> None:
            stmt = select(ConversationVariable).where(
                ConversationVariable.id == variable.id, ConversationVariable.conversation_id == conversation_id
            )
            with Session(db.engine) as session:
                row = session.scalar(stmt)
                if not row:
                    raise VariableOperatorNodeError("conversation variable not found in the database")
                row.data = variable.model_dump_json()
                session.commit()

        def flush(self):
            session = Session(db.engine)
            row = session.scalar(stmt)
            if not row:
                raise ConversationVariableNotFoundError("conversation variable not found in the database")
            row.data = variable.model_dump_json()
2. Non-domain exception raised 📘 Rule violation ✓ Correctness
- `ConversationVariableUpdaterImpl.update()` raises `ConversationVariableNotFoundError`, which is defined locally in `services/conversation_variable_updater.py` instead of in the domain exception modules (`services/errors` or `core/errors`).
- This reduces consistency in error translation at controller boundaries and violates the project's domain-exception requirement.
Agent prompt
## Issue description
A locally-defined exception (`ConversationVariableNotFoundError`) is raised from a service module, instead of using the project’s domain exception locations.
## Issue Context
Business logic should raise exceptions defined under `services/errors` or `core/errors` so controllers can translate them consistently.
## Fix Focus Areas
- api/services/conversation_variable_updater.py[9-22]
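A sketch of the suggested relocation; the exact module path under `services/errors` is an assumption about the project's layout:

```python
# Hypothetical api/services/errors/conversation_variable.py -- the exception
# lives in the shared errors package so controllers can translate it.
class ConversationVariableNotFoundError(Exception):
    """Raised when a conversation variable row is missing from the database."""


# The service module then imports it instead of defining it locally, e.g.:
# from services.errors.conversation_variable import ConversationVariableNotFoundError
```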
    if not row:
        raise ConversationVariableNotFoundError("conversation variable not found in the database")
    row.data = variable.model_dump_json()
3. DB error lacks context 📘 Rule violation ✧ Quality
- The raised message `conversation variable not found in the database` does not include key debugging context such as `conversation_id` and `variable.id`.
- This makes production debugging and incident reconstruction harder because the error does not identify which record lookup failed.
Agent prompt
## Issue description
The raised database error lacks actionable context (no `conversation_id` / `variable.id`), making debugging and incident analysis difficult.
## Issue Context
The compliance rule expects meaningful, contextual errors and explicit handling of failure points.
## Fix Focus Areas
- api/services/conversation_variable_updater.py[20-22]
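One way to attach the missing context, sketched with a hypothetical helper (`raise_not_found` is illustrative, not part of the codebase):

```python
class ConversationVariableNotFoundError(Exception):
    pass


def raise_not_found(conversation_id: str, variable_id: str) -> None:
    # Include the identifiers that failed the lookup so logs can be
    # correlated with a specific conversation and variable.
    raise ConversationVariableNotFoundError(
        f"conversation variable not found in the database "
        f"(conversation_id={conversation_id}, variable_id={variable_id})"
    )
```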
    def __init__(
        self,
        id: str,
        config: Mapping[str, Any],
        graph_init_params: "GraphInitParams",
        graph_runtime_state: "GraphRuntimeState",
    ):
4. `__init__` missing `-> None` 📘 Rule violation ✓ Correctness
- `VariableAssignerNode.__init__` in v2 was added without an explicit return type annotation.
- This violates the requirement that all Python function definitions include parameter and return type annotations using modern syntax.
Agent prompt
## Issue description
A newly-added `__init__` method is missing an explicit return type annotation (`-> None`).
## Issue Context
The repo requires modern Python 3.12+ type annotations for all function parameters and return values.
## Fix Focus Areas
- api/core/workflow/nodes/variable_assigner/v2/node.py[56-62]
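The fix itself is small: annotate `__init__`'s return type. A generic example (the class here is illustrative, not the real node):

```python
class Example:
    # An explicit `-> None` on __init__ satisfies the annotation rule and
    # lets type checkers verify the constructor body.
    def __init__(self, name: str) -> None:
        self.name = name
```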
    def _build_graph_runtime_state(
        variable_pool: MockReadOnlyVariablePool,
        conversation_id: str | None = None,
    ) -> ReadOnlyGraphRuntimeState:
        graph_runtime_state = Mock(spec=ReadOnlyGraphRuntimeState)
        graph_runtime_state.variable_pool = variable_pool
        graph_runtime_state.system_variable = SystemVariable(conversation_id=conversation_id).as_view()
        return graph_runtime_state


    def _build_node_run_succeeded_event(
        *,
        node_type: NodeType,
        outputs: dict[str, object] | None = None,
        process_data: dict[str, object] | None = None,
    ) -> NodeRunSucceededEvent:
        return NodeRunSucceededEvent(
            id="node-exec-id",
            node_id="assigner",
            node_type=node_type,
            start_at=datetime.utcnow(),
            node_run_result=NodeRunResult(
                status=WorkflowNodeExecutionStatus.SUCCEEDED,
                outputs=outputs or {},
                process_data=process_data or {},
            ),
        )


    def test_persists_conversation_variables_from_assigner_output():
5. Test funcs lack annotations 📘 Rule violation ✓ Correctness
- New helper/test functions in `test_conversation_variable_persist_layer.py` are defined without return type annotations (e.g., `def test_persists_conversation_variables_from_assigner_output():`).
- This violates the requirement that all Python function definitions include parameter and return type annotations.
Agent prompt
## Issue description
New pytest test functions were added without return type annotations, violating the project requirement for fully-annotated Python functions.
## Issue Context
Even for tests, the rule requires parameter and return type annotations using modern Python 3.12+ syntax (e.g., `-> None`).
## Fix Focus Areas
- api/tests/unit_tests/core/app/layers/test_conversation_variable_persist_layer.py[34-110]
    for item in updated_variables:
        selector = item.selector
        if len(selector) < 2:
            logger.warning("Conversation variable selector invalid. selector=%s", selector)
            continue
        if selector[0] != CONVERSATION_VARIABLE_NODE_ID:
            continue
        variable = self.graph_runtime_state.variable_pool.get(selector)
        if not isinstance(variable, Variable):
            logger.warning(
                "Conversation variable not found in variable pool. selector=%s",
                selector,
            )
            continue
        self._conversation_variable_updater.update(conversation_id=conversation_id, variable=variable)
        self._conversation_variable_updater.flush()
6. No audit log on update 📘 Rule violation ✧ Quality
- The new conversation-variable persistence flow performs database writes via `self._conversation_variable_updater.update(...)` without any audit log entry indicating who performed the change, what changed, and whether it succeeded.
- This makes it difficult to reconstruct or investigate changes to conversation variables as part of an audit trail.
Agent prompt
## Issue description
Conversation variable writes occur without creating an audit trail record containing actor identity, action details, and outcome.
## Issue Context
Compliance requires critical write actions to be logged with enough context to reconstruct events.
## Fix Focus Areas
- api/core/app/layers/conversation_variable_persist_layer.py[34-53]
- api/services/conversation_variable_updater.py[14-23]
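One possible shape for such an audit record, sketched with stdlib logging; the field set (actor, outcome) and the `persist_with_audit` helper are assumptions about what the audit rule expects, not existing code:

```python
import logging
from typing import Callable

logger = logging.getLogger(__name__)


def persist_with_audit(
    update: Callable[[str, str, object], None],
    conversation_id: str,
    name: str,
    value: object,
    actor: str,
) -> None:
    # Log both success and failure so the change can be reconstructed later.
    try:
        update(conversation_id, name, value)
    except Exception:
        logger.exception(
            "audit: conversation_variable_update failed actor=%s conversation_id=%s name=%s",
            actor, conversation_id, name,
        )
        raise
    logger.info(
        "audit: conversation_variable_update ok actor=%s conversation_id=%s name=%s",
        actor, conversation_id, name,
    )
```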
    conversation_variable_layer = ConversationVariablePersistenceLayer(conversation_variable_updater_factory())
    workflow_entry.graph_engine.layer(conversation_variable_layer)
    workflow_entry.graph_engine.layer(persistence_layer)
7. Shared updater across threads 🐞 Bug ⛯ Reliability
- The graph engine's EventManager notifies layers outside the collection lock, so `layer.on_event()` may be invoked concurrently from multiple worker threads.
- The `ConversationVariableUpdater` protocol explicitly warns that instances may not be thread-safe, but the PR creates exactly one updater instance and shares it for the whole graph run via the new layer.
Agent prompt
## Issue description
Graph engine layer callbacks can run concurrently, but the PR shares a single `ConversationVariableUpdater` instance for the whole graph execution. The updater protocol warns instances may not be thread-safe.
## Issue Context
`EventManager.collect()` calls `_notify_layers()` outside locks; multiple worker threads can collect events.
## Fix Focus Areas
- api/core/app/apps/advanced_chat/app_runner.py[204-206]
- api/core/app/layers/conversation_variable_persist_layer.py[14-17]
## Suggested change
Option A (recommended):
- Change `ConversationVariablePersistenceLayer` to accept `conversation_variable_updater_factory: Callable[[], ConversationVariableUpdater]`.
- Inside `on_event()`, create a fresh updater instance (per event), call `update()` for all relevant variables, then call `flush()` once.
Option B:
- Keep a single instance but guard `update/flush` with a `threading.Lock` inside the layer.
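Option A can be sketched like this; `PersistenceLayer` and `Updater` are simplified stand-ins for the real layer and protocol:

```python
from typing import Callable, Protocol


class Updater(Protocol):
    def update(self, conversation_id: str, name: str, value: object) -> None: ...
    def flush(self) -> None: ...


class PersistenceLayer:
    # The layer holds a factory rather than an instance, so each event gets a
    # fresh (possibly non-thread-safe) updater and nothing is shared across
    # concurrently running on_event() calls.
    def __init__(self, updater_factory: Callable[[], Updater]) -> None:
        self._updater_factory = updater_factory

    def on_event(self, conversation_id: str, updates: dict[str, object]) -> None:
        updater = self._updater_factory()  # fresh instance per event
        for name, value in updates.items():
            updater.update(conversation_id, name, value)
        updater.flush()  # flushed once per event, after all updates
```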
    for item in updated_variables:
        selector = item.selector
        if len(selector) < 2:
            logger.warning("Conversation variable selector invalid. selector=%s", selector)
            continue
        if selector[0] != CONVERSATION_VARIABLE_NODE_ID:
            continue
        variable = self.graph_runtime_state.variable_pool.get(selector)
        if not isinstance(variable, Variable):
            logger.warning(
                "Conversation variable not found in variable pool. selector=%s",
                selector,
            )
            continue
        self._conversation_variable_updater.update(conversation_id=conversation_id, variable=variable)
        self._conversation_variable_updater.flush()
8. Partial/hidden persistence failures 🐞 Bug ✓ Correctness
- The persistence layer performs `update()`/`flush()` in a loop without per-item error handling; if one variable update raises, remaining variables in that event won't be persisted.
- Layer exceptions are swallowed by the EventManager (logged only), so workflows can succeed while conversation-variable persistence is incomplete, creating hard-to-debug state divergence between in-memory state and the DB.
Agent prompt
## Issue description
A single failing conversation-variable update aborts the rest of the updates for that event, and the graph continues because EventManager swallows layer exceptions. This can yield partial persistence and silent data divergence.
## Issue Context
`ConversationVariablePersistenceLayer.on_event()` loops and calls `update()`/`flush()` without handling exceptions.
## Fix Focus Areas
- api/core/app/layers/conversation_variable_persist_layer.py[38-54]
- api/core/workflow/graph_engine/event_management/event_manager.py[182-186]
## Suggested change
- Wrap persistence for each variable in try/except, log an error including `conversation_id` and `selector`, and continue with the next item.
- Call `flush()` once after the loop (especially if you move to a buffered updater).
- Optionally: add a warning log when `conversation_id is None` to improve observability of skipped persistence.
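The suggested per-item isolation can be sketched as follows; `persist_all` and its argument shapes are illustrative, not the layer's real API:

```python
import logging
from typing import Any

logger = logging.getLogger(__name__)


def persist_all(
    updater: Any,
    conversation_id: str,
    items: list[tuple[tuple[str, ...], object]],
) -> None:
    for selector, variable in items:
        try:
            updater.update(conversation_id, variable)
        except Exception:
            # One failing update is logged with its identifying context and
            # skipped, so the remaining variables still persist.
            logger.exception(
                "failed to persist conversation variable conversation_id=%s selector=%s",
                conversation_id, selector,
            )
    # flush() runs once after the loop, matching the buffered-updater advice.
    updater.flush()
```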
Benchmark PR from agentic-review-benchmarks#4