Idempotent Operations & Multi-Agent Coordination
Problem Statement
The dialectical framework performs multi-step graph operations that can fail mid-execution. Currently, each save() and connect() call auto-commits independently to the graph database. If an operation fails partway through:
- Orphaned nodes: Partial graph state with no way to detect or clean up
- Duplicate work: Re-running creates duplicates instead of resuming
- No authorship: Can't distinguish "my incomplete work" from "someone else's work"
- Wasted LLM calls: Expensive AI reasoning is lost on failure
Example: Action-Reflection Failure
think_action_reflection.py performs 22+ sequential save/connect operations:
1. problem_rationale.save() ✓ committed
2. ac_re_wu.save() ✓ committed
3. ac_re_wu.rationales.connect() ✓ committed
4. [loop] component.save() ✓ committed (×6)
5. [loop] manager.connect() ✓ committed (×6)
6. transformation.save() ✓ committed
7. transformation.ac_re.connect() ✓ committed
8. wu.transformation.connect() ✓ committed
9. rationale1.save() ✓ committed
10. transition1.save() ✗ FAILURE HERE
─────────────────────────────────
11. transition1.source.connect() ✗ never executed
12. transition1.target.connect() ✗ never executed
... (more operations)
Result: Graph contains partial transformation with no transitions. Re-running would create duplicate transformation, rationales, and components.
Design Goals
- Resumability: Detect incomplete operations and complete them
- Idempotency: Re-running same operation produces same result (no duplicates)
- Multi-agent support: Each agent tracks its own work independently
- Minimal overhead: Don't complicate simple operations
- Graph-native: Use the graph DB itself as the state tracker (no external dependencies)
Proposed Solution: Operation Tracking Pattern
Core Concept
Before doing work, create an Operation node that tracks the unit of work. Link all created artifacts to it. Mark complete when done. On failure/restart, query pending operations and resume.
┌─────────────────────────────────────────────────────┐
│ 1. Create Operation(status="pending") │
│ 2. Do LLM work (expensive, cache results on op) │
│ 3. Create nodes, link each to Operation │
│ 4. Create relationships │ ← Failure here?
│ 5. Mark Operation(status="complete") │ Query op.artifacts,
└─────────────────────────────────────────────────────┘ complete what's missing
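The five steps above can be sketched as a context manager. This is a hypothetical helper, not existing code; it assumes the `start`, `complete`, and `fail` methods proposed for `OperationManager` below:

```python
# Hypothetical wrapper around the proposed OperationManager lifecycle:
# start/resume on entry, mark complete on clean exit, mark failed on exception.
from contextlib import contextmanager


@contextmanager
def tracked_operation(op_manager, agent_id, operation_type, target_uid, target_type):
    """Run a unit of work under operation tracking."""
    operation = op_manager.start(agent_id, operation_type, target_uid, target_type)
    try:
        yield operation
    except Exception as exc:
        # Record the failure so a later run can find and resume/clean up the work
        op_manager.fail(operation, str(exc))
        raise
    else:
        op_manager.complete(operation)
```

With a wrapper like this, the caller's try/except bookkeeping collapses into a single `with tracked_operation(...) as op:` block.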
Operation Node
```python
class Operation(BaseNode):
    """
    Tracks a unit of work for resumability and authorship.

    The Operation node lives in the same graph database as domain nodes,
    providing atomic state tracking without external dependencies.
    """

    # What kind of work
    operation_type: str  # "action_reflection", "synthesis", "polarity", etc.

    # Lifecycle
    status: str = "pending"  # pending → complete or failed
    started_at: datetime  # When the operation began
    completed_at: Optional[datetime]  # When the operation finished (if complete)

    # Authorship
    created_by: str  # Agent/app identifier (e.g., "agent-123", "app-main")

    # Target
    target_uid: str  # The node being operated on (e.g., WisdomUnit uid)
    target_type: str  # The type of the target node (e.g., "WisdomUnit")

    # Cached LLM results (to avoid re-running expensive calls on resume)
    cached_results: Optional[str]  # JSON-serialized LLM response data

    # All nodes created during this operation
    artifacts: ClassVar[RelationshipManager[BaseNode]] = RelationshipTo(
        "BaseNode",
        "CREATED_ARTIFACT",
    )
```

Idempotency Key
Operations are uniquely identified by the combination:
idempotency_key = hash(created_by + operation_type + target_uid)
This ensures:
- Same agent doing same operation on same target → finds existing operation
- Different agent doing same operation → creates separate operation
- Same agent doing different operation → creates separate operation
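A sketch of how that key could be computed. The choice of sha256 and the `|` separator are assumptions; any stable hash over the three fields works:

```python
import hashlib


def idempotency_key(created_by: str, operation_type: str, target_uid: str) -> str:
    """Deterministic key: the same (agent, operation, target) triple always hashes the same."""
    raw = f"{created_by}|{operation_type}|{target_uid}"
    return hashlib.sha256(raw.encode("utf-8")).hexdigest()
```

In practice the proposal's `find_pending` query achieves the same lookup by matching on the three fields directly, so an explicit key column is optional.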
Implementation Plan
Phase 1: Operation Node Foundation
1.1 Create Operation node class
```python
# src/dialectical_framework/graph/nodes/operation.py
from __future__ import annotations

from datetime import datetime
from typing import ClassVar, Optional

from dialectical_framework.graph.nodes.base_node import BaseNode
from dialectical_framework.graph.relationship_manager import (
    RelationshipManager,
    RelationshipTo,
)


class Operation(BaseNode):
    """Tracks a unit of work for resumability and authorship."""

    operation_type: str
    status: str = "pending"  # pending, complete, failed
    started_at: datetime
    completed_at: Optional[datetime] = None
    created_by: str
    target_uid: str
    target_type: str
    cached_results: Optional[str] = None  # JSON serialized
    error_message: Optional[str] = None  # If status=failed

    artifacts: ClassVar[RelationshipManager[BaseNode]] = RelationshipTo(
        "BaseNode",
        "CREATED_ARTIFACT",
        cardinality=(0, None),
    )
```

1.2 Create OperationManager service
```python
# src/dialectical_framework/graph/operation_manager.py
from __future__ import annotations

import json
from datetime import datetime
from typing import Any, Optional, TYPE_CHECKING, Union

from dependency_injector.wiring import Provide, inject
from gqlalchemy import Memgraph, Neo4j

from dialectical_framework.enums.di import DI
from dialectical_framework.graph.nodes.operation import Operation

if TYPE_CHECKING:
    from dialectical_framework.graph.nodes.base_node import BaseNode


class OperationManager:
    """Manages operation lifecycle for idempotent, resumable work."""

    @inject
    def find_pending(
        self,
        agent_id: str,
        operation_type: str,
        target_uid: str,
        graph_db: Union[Memgraph, Neo4j] = Provide[DI.graph_db],
    ) -> Optional[Operation]:
        """Find an incomplete operation for this agent/type/target."""
        query = """
            MATCH (op:Operation {
                created_by: $agent_id,
                operation_type: $op_type,
                target_uid: $target_uid,
                status: 'pending'
            })
            RETURN op
            LIMIT 1
        """
        results = list(graph_db.execute_and_fetch(query, {
            "agent_id": agent_id,
            "op_type": operation_type,
            "target_uid": target_uid,
        }))
        return results[0]["op"] if results else None

    def start(
        self,
        agent_id: str,
        operation_type: str,
        target_uid: str,
        target_type: str,
    ) -> Operation:
        """Start a new operation (or return the existing pending one)."""
        # Check for an existing pending operation first (idempotency)
        existing = self.find_pending(agent_id, operation_type, target_uid)
        if existing:
            return existing

        # Create a new operation
        operation = Operation(
            operation_type=operation_type,
            status="pending",
            started_at=datetime.utcnow(),
            created_by=agent_id,
            target_uid=target_uid,
            target_type=target_type,
        )
        operation.save()
        return operation

    def cache_results(self, operation: Operation, results: dict[str, Any]) -> None:
        """Cache LLM results on the operation for resume capability."""
        operation.cached_results = json.dumps(results)
        operation.save()

    def get_cached_results(self, operation: Operation) -> Optional[dict[str, Any]]:
        """Retrieve cached LLM results."""
        if operation.cached_results:
            return json.loads(operation.cached_results)
        return None

    def track_artifact(self, operation: Operation, node: BaseNode) -> None:
        """Link a created node to this operation."""
        operation.artifacts.connect(node)

    def complete(self, operation: Operation) -> None:
        """Mark the operation as successfully completed."""
        operation.status = "complete"
        operation.completed_at = datetime.utcnow()
        operation.save()

    def fail(self, operation: Operation, error: str) -> None:
        """Mark the operation as failed with an error message."""
        operation.status = "failed"
        operation.completed_at = datetime.utcnow()
        operation.error_message = error
        operation.save()

    @inject
    def get_artifacts_by_type(
        self,
        operation: Operation,
        node_type: str,
        graph_db: Union[Memgraph, Neo4j] = Provide[DI.graph_db],
    ) -> list[BaseNode]:
        """Get all artifacts of a specific type from this operation."""
        query = """
            MATCH (op:Operation {uid: $op_uid})-[:CREATED_ARTIFACT]->(n)
            WHERE $node_type IN labels(n)
            RETURN n
        """
        results = list(graph_db.execute_and_fetch(query, {
            "op_uid": operation.uid,
            "node_type": node_type,
        }))
        return [r["n"] for r in results]

    @inject
    def cleanup_failed(
        self,
        agent_id: str,
        graph_db: Union[Memgraph, Neo4j] = Provide[DI.graph_db],
    ) -> int:
        """Clean up failed operations and their orphaned artifacts."""
        # count(DISTINCT op) so operations with multiple artifacts
        # don't inflate the deleted count
        query = """
            MATCH (op:Operation {created_by: $agent_id, status: 'failed'})
            OPTIONAL MATCH (op)-[:CREATED_ARTIFACT]->(artifact)
            DETACH DELETE op, artifact
            RETURN count(DISTINCT op) AS deleted_count
        """
        results = list(graph_db.execute_and_fetch(query, {"agent_id": agent_id}))
        return results[0]["deleted_count"] if results else 0
```

Phase 2: Integration with Existing Operations
2.1 Update ThinkActionReflection
```python
# Simplified example of the updated think() method
async def think(
    self,
    focus: WheelSegment,
    agent_id: str = "default",
) -> list[Transition]:
    wu = self._wheel.wisdom_unit_at(focus)
    op_manager = OperationManager()

    # Start or resume the operation
    operation = op_manager.start(
        agent_id=agent_id,
        operation_type="action_reflection",
        target_uid=wu.uid,
        target_type="WisdomUnit",
    )
    try:
        # Check for cached LLM results (resume case)
        cached = op_manager.get_cached_results(operation)
        if cached:
            dc_deck_dto = DialecticalComponentsDeckDto(**cached["dc_deck"])
            reciprocal_sol_dto = ReciprocalSolutionDto(**cached["reciprocal"])
        else:
            # Do LLM work (expensive)
            dc_deck_dto, reciprocal_sol_dto = await asyncio.gather(
                self.action_reflection(focus=wu),
                self.reciprocal_solution(focus=wu),
            )
            # Cache results immediately
            op_manager.cache_results(operation, {
                "dc_deck": dc_deck_dto.dict(),
                "reciprocal": reciprocal_sol_dto.dict(),
            })

        # Check what artifacts already exist (resume case)
        existing_transformations = op_manager.get_artifacts_by_type(
            operation, "Transformation"
        )
        existing_transitions = op_manager.get_artifacts_by_type(
            operation, "Transition"
        )

        # Create only what's missing...
        if not existing_transformations:
            transformation = Transformation()
            transformation.save()
            op_manager.track_artifact(operation, transformation)
        else:
            transformation = existing_transformations[0]

        # ... continue with remaining work, tracking each artifact ...

        # Mark complete when all done
        op_manager.complete(operation)
        return transitions
    except Exception as e:
        op_manager.fail(operation, str(e))
        raise
```

2.2 Update other operations similarly
Apply the same pattern to:
- think_polarity.py - Polarity analysis
- think_causality.py - Causality analysis
- think_synthesis.py - Synthesis generation
- think_constructive_convergence_auditor.py - Auditing
Phase 3: Multi-Agent Query Support
3.1 Add query methods for agent coordination
```python
# Additional methods on OperationManager

@inject
def get_my_pending_operations(
    self,
    agent_id: str,
    graph_db: Union[Memgraph, Neo4j] = Provide[DI.graph_db],
) -> list[Operation]:
    """Get all pending operations for this agent."""
    query = """
        MATCH (op:Operation {created_by: $agent_id, status: 'pending'})
        RETURN op
        ORDER BY op.started_at
    """
    results = list(graph_db.execute_and_fetch(query, {"agent_id": agent_id}))
    return [r["op"] for r in results]

@inject
def is_target_being_worked_on(
    self,
    target_uid: str,
    operation_type: str,
    exclude_agent: Optional[str] = None,
    graph_db: Union[Memgraph, Neo4j] = Provide[DI.graph_db],
) -> bool:
    """Check if another agent is working on this target."""
    query = """
        MATCH (op:Operation {
            target_uid: $target_uid,
            operation_type: $op_type,
            status: 'pending'
        })
        WHERE op.created_by <> $exclude_agent OR $exclude_agent IS NULL
        RETURN count(op) > 0 AS in_progress
    """
    results = list(graph_db.execute_and_fetch(query, {
        "target_uid": target_uid,
        "op_type": operation_type,
        "exclude_agent": exclude_agent,
    }))
    return results[0]["in_progress"] if results else False
```

Usage Examples
Basic Usage (Single Agent)
```python
# Agent does work, fails, resumes automatically

# First run - fails after creating transformation
await consultant.think(focus=segment, agent_id="agent-1")
# Creates: Operation(pending), Transformation, but crashes before Transitions

# Second run - resumes from cached LLM results
await consultant.think(focus=segment, agent_id="agent-1")
# Finds pending Operation, uses cached results, creates missing Transitions
# Marks Operation complete
```

Multi-Agent Coordination
```python
# Two agents working on different WUs
agent_a_task = consultant_a.think(focus=segment_1, agent_id="agent-a")
agent_b_task = consultant_b.think(focus=segment_2, agent_id="agent-b")
await asyncio.gather(agent_a_task, agent_b_task)
# Each agent tracks its own operations independently
# No collision even if working on the same wheel
```

Checking for Conflicts
```python
# Before starting expensive work, check if someone else is already on it
op_manager = OperationManager()
if op_manager.is_target_being_worked_on(wu.uid, "action_reflection", exclude_agent="me"):
    # Another agent is already processing this WU
    # Could wait, skip, or coordinate
    pass
```

Cleanup Failed Operations
```python
# Periodic cleanup of failed operations
op_manager = OperationManager()
deleted = op_manager.cleanup_failed(agent_id="agent-1")
print(f"Cleaned up {deleted} failed operations")
```

Migration Strategy
Phase 1: Non-Breaking Addition
- Add Operation node and OperationManager (no changes to existing code)
- Add tests for the operation lifecycle
Phase 2: Opt-In Integration
- Add agent_id parameter to consultant methods (default "default")
- Integrate OperationManager into ThinkActionReflection behind a feature flag
- Test with a single operation type
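One way the Phase 2 opt-in flag could be gated. This is a sketch; the DF_OPERATION_TRACKING environment variable is a hypothetical name, not an existing setting:

```python
import os


def operation_tracking_enabled() -> bool:
    """Read the opt-in flag; defaults to off so existing behavior is unchanged."""
    # Hypothetical variable name; any config mechanism the project already uses would do
    return os.environ.get("DF_OPERATION_TRACKING", "0").lower() in ("1", "true", "yes")
```

Defaulting to off keeps Phase 2 genuinely non-breaking: un-flagged deployments run the current auto-commit path untouched.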
Phase 3: Full Rollout
- Remove feature flag, enable for all operations
- Integrate into remaining consultant classes
- Add multi-agent coordination queries
- Document agent ID conventions
Graph Schema Changes
New Node Type
```
(:Operation {
    uid: String,               -- Unique identifier (UUID)
    operation_type: String,    -- "action_reflection", "synthesis", etc.
    status: String,            -- "pending", "complete", "failed"
    started_at: DateTime,
    completed_at: DateTime?,
    created_by: String,        -- Agent identifier
    target_uid: String,        -- Target node UID
    target_type: String,       -- Target node type
    cached_results: String?,   -- JSON-serialized LLM results
    error_message: String?     -- Error if failed
})
```

New Relationship Type

```
(:Operation)-[:CREATED_ARTIFACT]->(:BaseNode)
```

Index Recommendations

```
CREATE INDEX ON :Operation(created_by, status);
CREATE INDEX ON :Operation(target_uid, operation_type, status);
```

Considerations
What This Solves
- Partial failures: Resume from where you left off
- Duplicate prevention: Idempotent by checking existing operation
- LLM cost savings: Cached results survive failures
- Multi-agent: Each agent's work is isolated and trackable
- Debugging: Clear audit trail of what each agent did
What This Doesn't Solve
- Concurrent writes to same node: Still need application-level coordination
- Cross-agent dependencies: If agent B depends on agent A's output, need separate coordination
- Long-running operation timeouts: No built-in timeout/stale detection (could add)
Future Enhancements
- Timeout detection: Mark operations stale after N minutes
- Operation dependencies: Model "this op depends on that op completing"
- Distributed locking: Prevent concurrent work on same target
- Operation history: Keep completed operations for audit trail (vs deleting)
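For the timeout-detection enhancement above, staleness could be a pure time comparison over the started_at field. A minimal sketch; the 30-minute default and the helper name are assumptions:

```python
from datetime import datetime, timedelta


def is_stale(started_at: datetime, now: datetime, timeout_minutes: int = 30) -> bool:
    """A pending operation older than the timeout is considered stale and may be reclaimed."""
    return now - started_at > timedelta(minutes=timeout_minutes)
```

A periodic job could combine this with find_pending-style queries to mark stale operations failed, letting cleanup_failed reclaim their artifacts.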
References
- Saga Pattern: Distributed transaction management in microservices
- Idempotency Keys: Stripe, AWS - ensuring exactly-once processing
- Event Sourcing: Store events, derive state, replay on failure
- Kubernetes Controller Pattern: Desired state vs actual state reconciliation