feat: Add conversation variable persistence layer #4
… factory to pass the ConversationVariableUpdater factory (the only non-VariablePool dependency), plus a unit test to verify the injection path.
- `api/core/workflow/nodes/variable_assigner/v2/node.py` adds a kw-only `conv_var_updater_factory` dependency (defaulting to `conversation_variable_updater_factory`) and stores it for use in `_run`.
- `api/core/workflow/nodes/node_factory.py` now injects the factory when creating VariableAssigner v2 nodes.
- `api/tests/unit_tests/core/workflow/nodes/variable_assigner/v2/test_variable_assigner_v2.py` adds a test asserting the factory is injected.
Tests not run.
Next steps (optional):
1) `make lint`
2) `make type-check`
3) `uv run --project api --dev dev/pytest/pytest_unit_tests.sh`
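The injection path described in this commit can be pictured with a minimal sketch. Only the names `conv_var_updater_factory` and `conversation_variable_updater_factory` come from the commit message; the class names, the `NoopUpdater` stand-in, and the constructor signature are assumptions made for illustration and are not the project's actual code.

```python
from typing import Callable, Protocol


class ConversationVariableUpdater(Protocol):
    """Assumed shape of the updater protocol (update + flush)."""

    def update(self, conversation_id: str, variable: object) -> None: ...

    def flush(self) -> None: ...


class NoopUpdater:
    """Hypothetical stand-in; the real default would talk to the database."""

    def update(self, conversation_id: str, variable: object) -> None:
        pass

    def flush(self) -> None:
        pass


def conversation_variable_updater_factory() -> ConversationVariableUpdater:
    # Stand-in for the default factory named in the commit message.
    return NoopUpdater()


class VariableAssignerNodeSketch:
    """Illustrative node: only the injection path is modelled here."""

    def __init__(
        self,
        node_id: str,
        *,  # everything after this marker must be passed by keyword
        conv_var_updater_factory: Callable[
            [], ConversationVariableUpdater
        ] = conversation_variable_updater_factory,
    ) -> None:
        self.node_id = node_id
        # Stored now, called later when the node runs.
        self._conv_var_updater_factory = conv_var_updater_factory

    def _run(self) -> ConversationVariableUpdater:
        # The factory is invoked lazily, so tests can swap in a fake.
        return self._conv_var_updater_factory()
```

Making the dependency keyword-only keeps the positional constructor arguments unchanged, so a node factory can pass the updater factory without disturbing other call sites.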
…ructor args.
- `api/core/workflow/nodes/node_factory.py` now directly instantiates `VariableAssignerNode` with the injected dependency, and uses a direct call for all other nodes.
No tests run.
Add a new command for GraphEngine to update a group of variables. This command takes a group of variable selectors and their new values. When the engine receives the command, it updates the corresponding variables in the variable pool. If a variable does not exist, the engine adds it; if it does exist, the engine overwrites it. The two cases are handled identically and do not need to be distinguished.
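A minimal sketch of the add-or-overwrite behaviour described above. The names `VariableUpdate` and `UpdateVariablesCommand` appear in later commit messages, but their fields, the `VariablePoolSketch` class, and the `handle_update_variables` function are hypothetical and chosen only to illustrate the semantics.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, Sequence, Tuple

Selector = Tuple[str, ...]


@dataclass(frozen=True)
class VariableUpdate:
    """One selector/value pair (field names are assumptions)."""

    selector: Selector
    value: Any


@dataclass(frozen=True)
class UpdateVariablesCommand:
    """A group of updates sent to the engine as a single command."""

    updates: Sequence[VariableUpdate]


@dataclass
class VariablePoolSketch:
    """Hypothetical stand-in for the engine's variable pool."""

    _store: Dict[Selector, Any] = field(default_factory=dict)

    def add(self, selector: Selector, value: Any) -> None:
        # A plain dict assignment adds a new key or overwrites an existing one.
        self._store[tuple(selector)] = value

    def get(self, selector: Selector) -> Any:
        return self._store.get(tuple(selector))


def handle_update_variables(
    pool: VariablePoolSketch, command: UpdateVariablesCommand
) -> None:
    # No existence check: "add" and "overwrite" take the same code path.
    for update in command.updates:
        pool.add(update.selector, update.value)
```

Because the handler never branches on whether a selector is already present, the "treated the same" requirement holds by construction.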
…be-kanban 0941477f) Create a new persistence layer for the Graph Engine. This layer receives a ConversationVariableUpdater upon initialization, which is used to persist the received ConversationVariables to the database. It can retrieve the currently processing ConversationId from the engine's variable pool. It captures the successful execution event of each node and determines whether the type of this node is VariableAssigner(v1 and v2). If so, it retrieves the variable name and value that need to be updated from the node's outputs. This layer is only used in the Advanced Chat. It should be placed outside of Core.Workflow package.
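The behaviour requested above might look roughly like the following sketch. Everything here is an assumption made for illustration: the event class, the node-type strings, the `("sys", "conversation_id")` selector, and the dictionary standing in for the variable pool are hypothetical and do not reflect the project's real types.

```python
from dataclasses import dataclass, field
from typing import Any, List, Mapping, Tuple

# Hypothetical node-type labels for VariableAssigner v1 and v2.
VARIABLE_ASSIGNER_TYPES = {"variable_assigner_v1", "variable_assigner_v2"}
# Assumed selector under which the pool exposes the conversation id.
CONVERSATION_ID_SELECTOR = ("sys", "conversation_id")


@dataclass
class NodeRunSucceeded:
    """Hypothetical event emitted when a node finishes successfully."""

    node_type: str
    outputs: Mapping[str, Any]


@dataclass
class RecordingUpdater:
    """Test double that records calls instead of writing to a database."""

    updates: List[Tuple[str, Any]] = field(default_factory=list)
    flushes: int = 0

    def update(self, conversation_id: str, variable: Any) -> None:
        self.updates.append((conversation_id, variable))

    def flush(self) -> None:
        self.flushes += 1


class ConversationVariablePersistLayerSketch:
    """Listens to engine events and persists VariableAssigner results."""

    def __init__(self, updater: Any, variable_pool: Mapping[Tuple[str, ...], Any]):
        self._updater = updater
        self._pool = variable_pool

    def on_event(self, event: Any) -> None:
        # Only successful runs of VariableAssigner nodes are of interest.
        if not isinstance(event, NodeRunSucceeded):
            return
        if event.node_type not in VARIABLE_ASSIGNER_TYPES:
            return
        conversation_id = self._pool.get(CONVERSATION_ID_SELECTOR)
        if conversation_id is None:
            return
        for name, value in event.outputs.items():
            self._updater.update(conversation_id, (name, value))
        self._updater.flush()
```

Keeping the layer outside the workflow core means the nodes only need to report what changed, while the decision to write to the database lives with the application that attaches the layer.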
…rs/conversation_variable_persist_layer.py` to satisfy SIM118
- chore(lint): run `make lint` (passes; warnings about missing RECORD during venv package uninstall)
- chore(type-check): run `make type-check` (fails: 1275 errors for missing type stubs like `opentelemetry`, `click`, `sqlalchemy`, `flask`, `pydantic`, `pydantic_settings`)
…tType validation and casting
- test(graph-engine): update VariableUpdate usages to include value_type in command tests
… drop common_helpers usage
- refactor(variable-assigner-v2): inline updated variable payload and drop common_helpers usage
Tests not run.
…n and remove value type validation
- test(graph-engine): update UpdateVariablesCommand tests to pass concrete Variable instances
- fix(graph-engine): align VariableUpdate values with selector before adding to VariablePool
Tests not run.
…e handling for v1/v2 process_data
- refactor(app-layer): read updated variables from process_data in conversation variable persistence layer
- test(app-layer): adapt persistence layer tests to use common_helpers updated-variable payloads
Tests not run.
…nce reads from process_data
…fter venv changes)
- chore(type-check): run `make type-check` (fails: 1275 missing type stubs across dependencies)
Details:
- `make lint` fails with `ModuleNotFoundError: No module named 'dotenv_linter.cli'`.
- `make type-check` fails with missing stubs for `opentelemetry`, `click`, `sqlalchemy`, `flask`, `pydantic`, `pydantic_settings`, etc.
…ableUnion and remove value type validation" This reverts commit 5ebc87a.
…h SegmentType validation and casting" This reverts commit 3edd525.
This reverts commit 67007f6.
…y out of core.workflow into `api/services/conversation_variable_updater.py`
- refactor(app): update advanced chat app runner and conversation service to import the new updater factory
Tests not run.
…-linter module missing)
- chore(type-check): run `make type-check` (fails: 1275 missing type stubs)
Details:
- `make lint` reports: `No matches for ignored import core.workflow.nodes.variable_assigner.common.impl -> extensions.ext_database` and ends with `ModuleNotFoundError: No module named 'dotenv_linter.cli'`.
- `make type-check` fails with missing type stubs for `opentelemetry`, `click`, `sqlalchemy`, `flask`, `pydantic`, `pydantic_settings`, etc.
…impl import in `api/.importlinter`
Greptile Overview
Greptile Summary
This PR refactors conversation variable persistence by extracting database operations from workflow nodes into a dedicated persistence layer, following clean architecture principles. The variable assigner nodes (v1 and v2) were updated to remove inline database calls, delegating persistence to …
Major Changes:
Critical Issues:
Confidence Score: 1/5
Important Files Changed
Sequence Diagram
sequenceDiagram
participant User
participant AppRunner as AdvancedChatAppRunner
participant Layer as ConversationVariablePersistenceLayer
participant Node as VariableAssignerNode
participant Updater as ConversationVariableUpdaterImpl
participant DB as Database
User->>AppRunner: Run workflow with conversation
AppRunner->>AppRunner: Initialize conversation variables from DB
AppRunner->>Layer: Create persistence layer with updater
AppRunner->>AppRunner: Attach layer to workflow engine
AppRunner->>Node: Execute variable assigner node
Node->>Node: Update variable in variable pool
Node->>Node: Add updated variable to process_data
Node-->>Layer: Emit NodeRunSucceededEvent
Layer->>Layer: Check if event is VariableAssigner
Layer->>Layer: Extract updated variables from process_data
Layer->>Layer: Filter for conversation variables only
loop For each conversation variable
Layer->>Updater: update(conversation_id, variable)
Updater->>DB: Create Session (without context manager!)
Updater->>DB: SELECT ConversationVariable
Updater->>DB: UPDATE data field
Updater->>DB: COMMIT (session not closed!)
Layer->>Updater: flush()
Note over Updater: flush() is a no-op
end
Note over Updater,DB: Session leak: Session never closed!
    session = Session(db.engine)
    row = session.scalar(stmt)
    if not row:
        raise ConversationVariableNotFoundError("conversation variable not found in the database")
    row.data = variable.model_dump_json()
    session.commit()
Session not used as context manager, causing resource leak
-   session = Session(db.engine)
-   row = session.scalar(stmt)
-   if not row:
-       raise ConversationVariableNotFoundError("conversation variable not found in the database")
-   row.data = variable.model_dump_json()
-   session.commit()
+   with Session(db.engine, expire_on_commit=False) as session:
+       row = session.scalar(stmt)
+       if not row:
+           raise ConversationVariableNotFoundError("conversation variable not found in the database")
+       row.data = variable.model_dump_json()
+       session.commit()
Prompt To Fix With AI
This is a comment left during a code review.
Path: api/services/conversation_variable_updater.py
Line: 18:23
Comment:
Session not used as context manager, causing resource leak
```suggestion
with Session(db.engine, expire_on_commit=False) as session:
    row = session.scalar(stmt)
    if not row:
        raise ConversationVariableNotFoundError("conversation variable not found in the database")
    row.data = variable.model_dump_json()
    session.commit()
```
How can I resolve this? If you propose a fix, please make it concise.

    self._conversation_variable_updater.update(conversation_id=conversation_id, variable=variable)
    self._conversation_variable_updater.flush()
flush() called after each update() defeats batching purpose. The ConversationVariableUpdater protocol allows batching, but calling flush immediately after each update negates any batching benefits and creates unnecessary database commits in a loop.
Prompt To Fix With AI
This is a comment left during a code review.
Path: api/core/app/layers/conversation_variable_persist_layer.py
Line: 52:53
Comment:
`flush()` called after each `update()` defeats batching purpose. The `ConversationVariableUpdater` protocol allows batching, but calling flush immediately after each update negates any batching benefits and creates unnecessary database commits in a loop.
How can I resolve this? If you propose a fix, please make it concise.
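One way to address the batching concern raised in this comment is to queue every update and call `flush()` once after the loop. The sketch below assumes an updater that defers its writes until `flush()`; per the sequence diagram, the updater in this PR commits inside `update()` and treats `flush()` as a no-op, so the updater would need to change as well. `BufferingUpdater` and `persist_variables` are hypothetical names, and the commit is simulated with an in-memory list.

```python
from typing import Any, Iterable, List, Tuple


class BufferingUpdater:
    """Hypothetical updater that queues writes and commits them on flush()."""

    def __init__(self) -> None:
        self._pending: List[Tuple[str, Any]] = []
        self.persisted: List[Tuple[str, Any]] = []
        self.commits = 0  # counts simulated database commits

    def update(self, conversation_id: str, variable: Any) -> None:
        # Nothing is written yet; the change is only queued.
        self._pending.append((conversation_id, variable))

    def flush(self) -> None:
        if not self._pending:
            return
        # Stand-in for a single transaction covering all queued updates.
        self.persisted.extend(self._pending)
        self._pending.clear()
        self.commits += 1


def persist_variables(
    updater: BufferingUpdater, conversation_id: str, variables: Iterable[Any]
) -> None:
    for variable in variables:
        updater.update(conversation_id=conversation_id, variable=variable)
    # A single flush after the loop yields one commit for the whole batch.
    updater.flush()
```

With three variables this produces one simulated commit instead of three, which is the saving the review comment points to.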
Benchmark PR from agentic-review-benchmarks#4