Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
13ca5f0
Added constructor-based DI for VariableAssigner v2 and wired the node…
laipz8200 Jan 4, 2026
3ee34c8
Updated to avoid the dict path and let the type checker enforce const…
laipz8200 Jan 4, 2026
cf98aa7
A new command for updating variables. (vibe-kanban 00377ffe)
laipz8200 Jan 4, 2026
0dd1f93
Add a new persistent storage for handling Conversation Variables. (v…
laipz8200 Jan 4, 2026
2e9b37e
- fix(lint): replace `outputs.keys()` iteration in `api/core/app/laye…
laipz8200 Jan 4, 2026
6fdda43
- feat(graph-engine): enforce variable update value types with Segmen…
laipz8200 Jan 4, 2026
3204369
- refactor(variable-assigner-v1): inline updated variable payload and…
laipz8200 Jan 4, 2026
c45590b
- refactor(graph-engine): switch VariableUpdate.value to VariableUnio…
laipz8200 Jan 5, 2026
dca3149
- refactor(variable-assigner): restore common_helpers updated-variabl…
laipz8200 Jan 5, 2026
f611c82
- refactor(variable-assigner): drop updated outputs now that persiste…
laipz8200 Jan 5, 2026
6430cdc
- chore(lint): run `make lint` (fails: dotenv-linter module missing a…
laipz8200 Jan 5, 2026
ee12c40
Revert "- refactor(graph-engine): switch VariableUpdate.value to Vari…
laipz8200 Jan 5, 2026
1a1c6f8
Revert "- feat(graph-engine): enforce variable update value types wit…
laipz8200 Jan 5, 2026
f409218
Revert "A new command for updating variables. (vibe-kanban 00377ffe)"
laipz8200 Jan 5, 2026
f99e6be
- refactor(services): move ConversationVariableUpdaterImpl and factor…
laipz8200 Jan 5, 2026
aca3e78
- chore(lint): run `make lint` (fails: import linter error and dotenv…
laipz8200 Jan 5, 2026
3ed9743
- chore(import-linter): remove obsolete ignore for variable_assigner …
laipz8200 Jan 5, 2026
cd8a75e
fix(core): guard conversation variable flush when no updates
laipz8200 Jan 5, 2026
eada52b
fix(core): use system variable conversation_id and add custom updater…
laipz8200 Jan 6, 2026
4b73fce
fix(services): use Exception for conversation variable not found
laipz8200 Jan 6, 2026
6ad378a
update pr
tomerqodo Jan 21, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion api/.importlinter
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ ignore_imports =
core.workflow.nodes.llm.llm_utils -> extensions.ext_database
core.workflow.nodes.llm.node -> extensions.ext_database
core.workflow.nodes.tool.tool_node -> extensions.ext_database
core.workflow.nodes.variable_assigner.common.impl -> extensions.ext_database
core.workflow.graph_engine.command_channels.redis_channel -> extensions.ext_redis
core.workflow.graph_engine.manager -> extensions.ext_redis
core.workflow.nodes.knowledge_retrieval.knowledge_retrieval_node -> extensions.ext_redis
Expand Down
4 changes: 4 additions & 0 deletions api/core/app/apps/advanced_chat/app_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
QueueTextChunkEvent,
)
from core.app.features.annotation_reply.annotation_reply import AnnotationReplyFeature
from core.app.layers.conversation_variable_persist_layer import ConversationVariablePersistenceLayer
from core.moderation.base import ModerationError
from core.moderation.input_moderation import InputModeration
from core.variables.variables import VariableUnion
Expand All @@ -40,6 +41,7 @@
from models.enums import UserFrom
from models.model import App, Conversation, Message, MessageAnnotation
from models.workflow import ConversationVariable
from services.conversation_variable_updater import conversation_variable_updater_factory

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -199,6 +201,8 @@ def run(self):
trace_manager=self.application_generate_entity.trace_manager,
)

conversation_variable_layer = ConversationVariablePersistenceLayer(conversation_variable_updater_factory())
workflow_entry.graph_engine.layer(conversation_variable_layer)
workflow_entry.graph_engine.layer(persistence_layer)
for layer in self._graph_engine_layers:
workflow_entry.graph_engine.layer(layer)
Expand Down
56 changes: 56 additions & 0 deletions api/core/app/layers/conversation_variable_persist_layer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
import logging

from core.variables import Variable
from core.workflow.constants import CONVERSATION_VARIABLE_NODE_ID
from core.workflow.conversation_variable_updater import ConversationVariableUpdater
from core.workflow.enums import NodeType
from core.workflow.graph_engine.layers.base import GraphEngineLayer
from core.workflow.graph_events import GraphEngineEvent, NodeRunSucceededEvent
from core.workflow.nodes.variable_assigner.common import helpers as common_helpers

logger = logging.getLogger(__name__)


class ConversationVariablePersistenceLayer(GraphEngineLayer):
def __init__(self, conversation_variable_updater: ConversationVariableUpdater) -> None:
super().__init__()
self._conversation_variable_updater = conversation_variable_updater

def on_graph_start(self) -> None:
pass

def on_event(self, event: GraphEngineEvent) -> None:
if not isinstance(event, NodeRunSucceededEvent):
return
if event.node_type != NodeType.VARIABLE_ASSIGNER:
return
if self.graph_runtime_state is None:
return

updated_variables = common_helpers.get_updated_variables(event.node_run_result.process_data) or []
if not updated_variables:
return

conversation_id = self.graph_runtime_state.system_variable.conversation_id
if conversation_id is None:
return

for item in updated_variables:
selector = item.selector
if len(selector) < 2:
logger.warning("Conversation variable selector invalid. selector=%s", selector)
continue
if selector[0] != CONVERSATION_VARIABLE_NODE_ID:
continue
variable = self.graph_runtime_state.variable_pool.get(selector)
if not isinstance(variable, Variable):
logger.warning(
"Conversation variable not found in variable pool. selector=%s",
selector,
)
continue
self._conversation_variable_updater.update(conversation_id=conversation_id, variable=variable)
self._conversation_variable_updater.flush()

def on_graph_end(self, error: Exception | None) -> None:
pass
1 change: 0 additions & 1 deletion api/core/workflow/nodes/node_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,6 @@ def create_node(self, node_config: dict[str, object]) -> Node:
code_providers=self._code_providers,
code_limits=self._code_limits,
)

if node_type == NodeType.TEMPLATE_TRANSFORM:
return TemplateTransformNode(
id=node_id,
Expand Down
21 changes: 2 additions & 19 deletions api/core/workflow/nodes/variable_assigner/v1/node.py
Original file line number Diff line number Diff line change
@@ -1,45 +1,37 @@
from collections.abc import Callable, Mapping, Sequence
from typing import TYPE_CHECKING, Any, TypeAlias
from collections.abc import Mapping, Sequence
from typing import TYPE_CHECKING, Any

from core.variables import SegmentType, Variable
from core.workflow.constants import CONVERSATION_VARIABLE_NODE_ID
from core.workflow.conversation_variable_updater import ConversationVariableUpdater
from core.workflow.entities import GraphInitParams
from core.workflow.enums import NodeType, WorkflowNodeExecutionStatus
from core.workflow.node_events import NodeRunResult
from core.workflow.nodes.base.node import Node
from core.workflow.nodes.variable_assigner.common import helpers as common_helpers
from core.workflow.nodes.variable_assigner.common.exc import VariableOperatorNodeError

from ..common.impl import conversation_variable_updater_factory
from .node_data import VariableAssignerData, WriteMode

if TYPE_CHECKING:
from core.workflow.runtime import GraphRuntimeState


_CONV_VAR_UPDATER_FACTORY: TypeAlias = Callable[[], ConversationVariableUpdater]


class VariableAssignerNode(Node[VariableAssignerData]):
node_type = NodeType.VARIABLE_ASSIGNER
_conv_var_updater_factory: _CONV_VAR_UPDATER_FACTORY

def __init__(
self,
id: str,
config: Mapping[str, Any],
graph_init_params: "GraphInitParams",
graph_runtime_state: "GraphRuntimeState",
conv_var_updater_factory: _CONV_VAR_UPDATER_FACTORY = conversation_variable_updater_factory,
):
super().__init__(
id=id,
config=config,
graph_init_params=graph_init_params,
graph_runtime_state=graph_runtime_state,
)
self._conv_var_updater_factory = conv_var_updater_factory

@classmethod
def version(cls) -> str:
Expand Down Expand Up @@ -96,16 +88,7 @@ def _run(self) -> NodeRunResult:
# Over write the variable.
self.graph_runtime_state.variable_pool.add(assigned_variable_selector, updated_variable)

# TODO: Move database operation to the pipeline.
# Update conversation variable.
conversation_id = self.graph_runtime_state.variable_pool.get(["sys", "conversation_id"])
if not conversation_id:
raise VariableOperatorNodeError("conversation_id not found")
conv_var_updater = self._conv_var_updater_factory()
conv_var_updater.update(conversation_id=conversation_id.text, variable=updated_variable)
conv_var_updater.flush()
updated_variables = [common_helpers.variable_to_processed_data(assigned_variable_selector, updated_variable)]

return NodeRunResult(
status=WorkflowNodeExecutionStatus.SUCCEEDED,
inputs={
Expand Down
41 changes: 19 additions & 22 deletions api/core/workflow/nodes/variable_assigner/v2/node.py
Original file line number Diff line number Diff line change
@@ -1,31 +1,31 @@
import json
from collections.abc import Mapping, MutableMapping, Sequence
from typing import Any, cast
from typing import TYPE_CHECKING, Any

from core.app.entities.app_invoke_entities import InvokeFrom
from core.variables import SegmentType, Variable
from core.variables.consts import SELECTORS_LENGTH
from core.workflow.constants import CONVERSATION_VARIABLE_NODE_ID
from core.workflow.conversation_variable_updater import ConversationVariableUpdater
from core.workflow.enums import NodeType, WorkflowNodeExecutionStatus
from core.workflow.node_events import NodeRunResult
from core.workflow.nodes.base.node import Node
from core.workflow.nodes.variable_assigner.common import helpers as common_helpers
from core.workflow.nodes.variable_assigner.common.exc import VariableOperatorNodeError
from core.workflow.nodes.variable_assigner.common.impl import conversation_variable_updater_factory

from . import helpers
from .entities import VariableAssignerNodeData, VariableOperationItem
from .enums import InputType, Operation
from .exc import (
ConversationIDNotFoundError,
InputTypeNotSupportedError,
InvalidDataError,
InvalidInputValueError,
OperationNotSupportedError,
VariableNotFoundError,
)

if TYPE_CHECKING:
from core.workflow.entities import GraphInitParams
from core.workflow.runtime import GraphRuntimeState


def _target_mapping_from_item(mapping: MutableMapping[str, Sequence[str]], node_id: str, item: VariableOperationItem):
selector_node_id = item.variable_selector[0]
Expand Down Expand Up @@ -53,6 +53,20 @@ def _source_mapping_from_item(mapping: MutableMapping[str, Sequence[str]], node_
class VariableAssignerNode(Node[VariableAssignerNodeData]):
node_type = NodeType.VARIABLE_ASSIGNER

def __init__(
self,
id: str,
config: Mapping[str, Any],
graph_init_params: "GraphInitParams",
graph_runtime_state: "GraphRuntimeState",
):
super().__init__(
id=id,
config=config,
graph_init_params=graph_init_params,
graph_runtime_state=graph_runtime_state,
)

def blocks_variable_output(self, variable_selectors: set[tuple[str, ...]]) -> bool:
"""
Check if this Variable Assigner node blocks the output of specific variables.
Expand All @@ -70,9 +84,6 @@ def blocks_variable_output(self, variable_selectors: set[tuple[str, ...]]) -> bo

return False

def _conv_var_updater_factory(self) -> ConversationVariableUpdater:
return conversation_variable_updater_factory()

@classmethod
def version(cls) -> str:
return "2"
Expand Down Expand Up @@ -179,26 +190,12 @@ def _run(self) -> NodeRunResult:
# remove the duplicated items first.
updated_variable_selectors = list(set(map(tuple, updated_variable_selectors)))

conv_var_updater = self._conv_var_updater_factory()
# Update variables
for selector in updated_variable_selectors:
variable = self.graph_runtime_state.variable_pool.get(selector)
if not isinstance(variable, Variable):
raise VariableNotFoundError(variable_selector=selector)
process_data[variable.name] = variable.value

if variable.selector[0] == CONVERSATION_VARIABLE_NODE_ID:
conversation_id = self.graph_runtime_state.variable_pool.get(["sys", "conversation_id"])
if not conversation_id:
if self.invoke_from != InvokeFrom.DEBUGGER:
raise ConversationIDNotFoundError
else:
conversation_id = conversation_id.value
conv_var_updater.update(
conversation_id=cast(str, conversation_id),
variable=variable,
)
conv_var_updater.flush()
updated_variables = [
common_helpers.variable_to_processed_data(selector, seg)
for selector in updated_variable_selectors
Expand Down
4 changes: 2 additions & 2 deletions api/core/workflow/runtime/graph_runtime_state_protocol.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from collections.abc import Mapping
from collections.abc import Mapping, Sequence
from typing import Any, Protocol

from core.model_runtime.entities.llm_entities import LLMUsage
Expand All @@ -9,7 +9,7 @@
class ReadOnlyVariablePool(Protocol):
"""Read-only interface for VariablePool."""

def get(self, node_id: str, variable_key: str) -> Segment | None:
def get(self, selector: Sequence[str], /) -> Segment | None:
"""Get a variable value (read-only)."""
...

Expand Down
6 changes: 3 additions & 3 deletions api/core/workflow/runtime/read_only_wrappers.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from __future__ import annotations

from collections.abc import Mapping
from collections.abc import Mapping, Sequence
from copy import deepcopy
from typing import Any

Expand All @@ -18,9 +18,9 @@ class ReadOnlyVariablePoolWrapper:
def __init__(self, variable_pool: VariablePool) -> None:
self._variable_pool = variable_pool

def get(self, node_id: str, variable_key: str) -> Segment | None:
def get(self, selector: Sequence[str], /) -> Segment | None:
"""Return a copy of a variable value if present."""
value = self._variable_pool.get([node_id, variable_key])
value = self._variable_pool.get(selector)
return deepcopy(value) if value is not None else None

def get_all_by_node(self, node_id: str) -> Mapping[str, object]:
Expand Down
2 changes: 1 addition & 1 deletion api/services/conversation_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,13 @@
from core.db.session_factory import session_factory
from core.llm_generator.llm_generator import LLMGenerator
from core.variables.types import SegmentType
from core.workflow.nodes.variable_assigner.common.impl import conversation_variable_updater_factory
from extensions.ext_database import db
from factories import variable_factory
from libs.datetime_utils import naive_utc_now
from libs.infinite_scroll_pagination import InfiniteScrollPagination
from models import Account, ConversationVariable
from models.model import App, Conversation, EndUser, Message
from services.conversation_variable_updater import conversation_variable_updater_factory
from services.errors.conversation import (
ConversationNotExistsError,
ConversationVariableNotExistsError,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,22 +5,24 @@
from extensions.ext_database import db
from models import ConversationVariable

from .exc import VariableOperatorNodeError

class ConversationVariableNotFoundError(Exception):
pass


class ConversationVariableUpdaterImpl:
def update(self, conversation_id: str, variable: Variable):
def update(self, conversation_id: str, variable: Variable) -> None:
stmt = select(ConversationVariable).where(
ConversationVariable.id == variable.id, ConversationVariable.conversation_id == conversation_id
)
with Session(db.engine) as session:
row = session.scalar(stmt)
if not row:
raise VariableOperatorNodeError("conversation variable not found in the database")
row.data = variable.model_dump_json()
session.commit()

def flush(self):
session = Session(db.engine)
row = session.scalar(stmt)
if not row:
raise ConversationVariableNotFoundError("conversation variable not found in the database")
row.data = variable.model_dump_json()
session.commit()
Comment on lines +14 to +23
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

# First, check if the file exists and view the current code
fd conversation_variable_updater.py

Repository: qodo-benchmark/dify-combined-coderabbit

Length of output: 179


🏁 Script executed:

# Search for other session usage patterns in the codebase to understand conventions
rg "Session\(db\.engine\)" --type py -A 5

Repository: qodo-benchmark/dify-combined-coderabbit

Length of output: 50395


🏁 Script executed:

# Check if there's any session management pattern already established
rg "with Session" --type py -A 3 | head -50

Repository: qodo-benchmark/dify-combined-coderabbit

Length of output: 3650


🌐 Web query:

SQLAlchemy 2.x session lifecycle management context manager best practices

💡 Result:

Concise best-practices for SQLAlchemy 2.x session lifecycle (context-manager focused)

  • Create a single sessionmaker (bound to the Engine) at module/app scope and use it to produce Sessions; don’t instantiate Sessions inside low-level business logic. [1][2]
  • Scope Sessions to a short, explicit block using context managers so close() is automatic:
    • Use Session() as a context manager to ensure .close() runs. [1][3]
    • Use session.begin() (or Session.begin() from a sessionmaker) as a nested context to get automatic begin→commit and rollback on exception. Example: with Session.begin() as session: ... . [1][2][3]
  • Prefer explicit transaction boundaries over implicit usage:
    • Rely on session.begin() (or the session.begin() context) rather than letting multiple functions implicitly trigger autobegin. Consider disabling autobegin (Session(..., autobegin=False)) if you must force explicit begin points. [1][2]
  • Use begin_nested() for per-item savepoints (e.g., catch IntegrityError on individual rows without aborting the outer transaction). [3]
  • Handle resources and errors:
    • Context managers will commit on success and rollback on exceptions; they also close/reset the Session when leaving scope. Avoid manual try/except around commit/rollback when using the context manager. [1][2]
  • Be mindful of object state after commit:
    • Objects are expired by default on commit (expire_on_commit); set or override if you need detached objects to retain loaded state. [1][2]
  • For frameworks/requests:
    • Manage the Session lifecycle at the request/module level (open at request start, begin transaction as needed, commit/rollback and close at request end). Keep data-access functions accept a Session parameter instead of creating one. [2]
  • Other notes:
    • session.close() will rollback any open transaction and release connections; use it (or the context manager) to avoid leaked connections. [1][3]

Sources: SQLAlchemy 2.x docs — Session API & Session Basics; Transactions & session transaction patterns. [1][2][3]

References
[1] SQLAlchemy — Session API / sessionmaker.begin() / context manager usage.
[2] SQLAlchemy — Session Basics (scoping, autobegin, expire_on_commit; recommended pattern to pass session into functions).
[3] SQLAlchemy — Transactions & Session.begin_nested(), explicit begin, close/rollback behavior.


🏁 Script executed:

# View the specific file to confirm current code state
sed -n '14,23p' api/services/conversation_variable_updater.py

Repository: qodo-benchmark/dify-combined-coderabbit

Length of output: 589


🏁 Script executed:

# Check if there are other instances of bare Session() without context manager in this file
rg "Session\(db\.engine\)" api/services/conversation_variable_updater.py -A 10

Repository: qodo-benchmark/dify-combined-coderabbit

Length of output: 393


Wrap session creation in a context manager to ensure proper resource cleanup.

Session(db.engine) is created without any cleanup mechanism. If an exception occurs before session.commit(), the session remains open and the connection is leaked back to the pool. Use with Session(db.engine) as session: to automatically close the session and rollback any open transaction, matching the established pattern throughout the codebase.

🔧 Suggested fix
-        session = Session(db.engine)
-        row = session.scalar(stmt)
-        if not row:
-            raise ConversationVariableNotFoundError("conversation variable not found in the database")
-        row.data = variable.model_dump_json()
-        session.commit()
+        with Session(db.engine) as session:
+            row = session.scalar(stmt)
+            if not row:
+                raise ConversationVariableNotFoundError("conversation variable not found in the database")
+            row.data = variable.model_dump_json()
+            session.commit()
🤖 Prompt for AI Agents
In `@api/services/conversation_variable_updater.py` around lines 14 - 23, The
update method creates a Session(db.engine) without cleanup; change it to use a
context manager so the session is always closed/rolled back on exceptions: wrap
the body of update in "with Session(db.engine) as session:" and perform the
select/row lookup, assignment to row.data (using ConversationVariable and
variable.model_dump_json()), and session.commit() inside that block so
ConversationVariableNotFoundError is still raised when no row is found and
resources are cleaned up automatically.


def flush(self) -> None:
pass


Expand Down
Loading