Consider streaming-first architecture for subagents and background operations #59

@evansenter

Context

Clemini currently uses manual Delta accumulation in a loop:

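loop {
    tokio::select! {
        event = stream.next() => match event {
            Some(event) => { /* accumulate function calls */ }
            // Stream finished: leave the loop instead of waiting on cancellation forever
            None => break,
        },
        _ = cancellation_token.cancelled() => { break; }
    }
}
// Execute tools after stream completes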

This works well for the current single-agent, sequential model. However, if we want to add:

  • Subagents - spawning child agents for parallel work
  • Background bash - commands that run while conversation continues
  • Parallel tool execution - fire multiple tools at once
  • File watchers - reactive events during session

...the manual loop approach becomes unwieldy with N concurrent sources.

Related Issues

These are the specific features that would benefit from streaming-first architecture:

| Feature | Issue | Status |
|---|---|---|
| Subagents (RFC) | #3 | 🔮 RFC |
| Task tool (implementation) | #78 | 📋 Ready to implement |
| Background bash | #13, #69, #70 | ✅ Implemented |
| Parallel tool execution | #76 | 🔮 Speculative |
| File watchers | #77 | 🔮 Speculative |

Note: #78 (Task tool) can be implemented today using a process-based approach; no streaming-first refactor is needed. See comment below for details.

Proposed Pattern: Stream<Item = Result<Event>>

Instead of manual accumulation, agents return a stream of events:

trait Agent {
    fn execute(&self, query: &str, ctx: Context) -> impl Stream<Item = Result<AgentEvent>>;
}

Events could be:

enum AgentEvent {
    // Progress
    TextDelta(String),
    ToolExecuting { name: String, args: Value },
    ToolCompleted { name: String, result: Value, duration: Duration },
    
    // Subagents
    SubagentSpawned { id: String, query: String },
    SubagentEvent { id: String, event: Box<AgentEvent> },
    
    // Background operations  
    BackgroundStarted { id: String, command: String },
    BackgroundOutput { id: String, chunk: String },
    BackgroundCompleted { id: String, exit_code: i32 },
    
    // Completion
    FinalResult { content: String, metadata: Value },
    
    // Extensible
    Custom { event_type: String, data: Value },
}
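
Because SubagentEvent boxes another AgentEvent, events from a sub-subagent arrive wrapped once per level. The sketch below shows one way a consumer might peel those wrappers to find the originating agent. It uses a cut-down stand-in for the enum above (two variants, no JSON values), and `unwrap_source` is a hypothetical helper, not part of clemini or gemicro.

```rust
// Simplified stand-in for the AgentEvent enum above: only the variants
// needed to show nesting.
#[derive(Debug, Clone, PartialEq)]
enum AgentEvent {
    TextDelta(String),
    SubagentEvent { id: String, event: Box<AgentEvent> },
}

/// Hypothetical helper: strip SubagentEvent wrappers, returning the chain of
/// subagent ids (outermost first) and the innermost event.
fn unwrap_source(mut event: &AgentEvent) -> (Vec<&str>, &AgentEvent) {
    let mut path = Vec::new();
    while let AgentEvent::SubagentEvent { id, event: inner } = event {
        path.push(id.as_str());
        event = &**inner;
    }
    (path, event)
}

fn main() {
    let nested = AgentEvent::SubagentEvent {
        id: "researcher".to_string(),
        event: Box::new(AgentEvent::SubagentEvent {
            id: "fetcher".to_string(),
            event: Box::new(AgentEvent::TextDelta("hello".to_string())),
        }),
    };
    let (path, inner) = unwrap_source(&nested);
    // prints: researcher/fetcher -> TextDelta("hello")
    println!("{} -> {:?}", path.join("/"), inner);
}
```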

Why this helps

Multiple concurrent sources merge cleanly:

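// `mut` is required because `next()` takes `&mut self`
let mut merged = select_all(vec![
    // Assumes Context: Clone, since each agent takes its own copy
    main_agent.execute(query, ctx.clone()).boxed(),
    subagent.execute(sub_query, ctx).boxed(),
    background_monitor.boxed(),
]);

while let Some(event) = merged.next().await {
    // Uniform handling regardless of source
}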
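
The "one consumer loop, many producers" shape can be tried without an async runtime. The following is a std-only analogue using threads and an `mpsc` channel, not the `select_all` merge itself. `Event`, `merge_sources`, and the source names are hypothetical and exist only for illustration.

```rust
use std::sync::mpsc;
use std::thread;

// Simplified stand-in for AgentEvent.
#[derive(Debug, Clone, PartialEq)]
enum Event {
    TextDelta(String),
    BackgroundOutput(String),
}

/// Spawn one producer thread per source and fan every event into a single
/// receiver, tagged with the name of the source that produced it.
fn merge_sources(
    sources: Vec<(&'static str, Vec<Event>)>,
) -> mpsc::Receiver<(&'static str, Event)> {
    let (tx, rx) = mpsc::channel();
    for (name, events) in sources {
        let tx = tx.clone();
        thread::spawn(move || {
            for event in events {
                // A send error means the consumer hung up, so stop producing.
                if tx.send((name, event)).is_err() {
                    break;
                }
            }
        });
    }
    // The original `tx` is dropped on return, so the receiver's iterator
    // ends once every producer thread has finished.
    rx
}

fn main() {
    let merged = merge_sources(vec![
        ("main_agent", vec![Event::TextDelta("Hi".into())]),
        ("background", vec![Event::BackgroundOutput("ok".into())]),
    ]);
    // One loop, uniform handling regardless of source.
    for (source, event) in merged {
        println!("[{source}] {event:?}");
    }
}
```

Interleaving between sources is not deterministic here, just as it would not be with merged streams. Only the order within a single source is preserved.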

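Hierarchical cancellation: Dropping the parent stream drops the child streams it owns, so they stop being polled; this is standard Rust drop semantics. It only covers children the parent polls directly. A task detached with tokio::spawn keeps running after its JoinHandle is dropped and would still need an explicit abort or cancellation token.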
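
The ownership argument can be checked in isolation. This is a minimal std-only sketch in which `ChildStream`, `ParentStream`, and `make_parent` are hypothetical stand-ins for real streams. Each child flips a flag in its `Drop` impl so the effect of dropping the parent is observable.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

/// Hypothetical stand-in for a child stream: records when it is dropped.
struct ChildStream {
    cancelled: Arc<AtomicBool>,
}

impl Drop for ChildStream {
    fn drop(&mut self) {
        // For a real stream, being dropped means it is never polled again.
        self.cancelled.store(true, Ordering::SeqCst);
    }
}

/// Hypothetical parent that owns its children, as a merged stream owns its inputs.
#[allow(dead_code)]
struct ParentStream {
    children: Vec<ChildStream>,
}

/// Build a parent with `n` children and return the flags used to observe them.
fn make_parent(n: usize) -> (ParentStream, Vec<Arc<AtomicBool>>) {
    let flags: Vec<Arc<AtomicBool>> =
        (0..n).map(|_| Arc::new(AtomicBool::new(false))).collect();
    let children = flags
        .iter()
        .map(|f| ChildStream { cancelled: Arc::clone(f) })
        .collect();
    (ParentStream { children }, flags)
}

fn main() {
    let (parent, flags) = make_parent(2);
    assert!(flags.iter().all(|f| !f.load(Ordering::SeqCst)));
    drop(parent); // dropping the parent drops every child it owns
    assert!(flags.iter().all(|f| f.load(Ordering::SeqCst)));
    println!("all children cancelled");
}
```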

Composability: Interceptors/middleware wrap streams without changing agent code:

let logged = agent.execute(query, ctx)
    .inspect(|e| log::debug!("{:?}", e));

Reference Implementation

Gemicro uses this pattern in its agent trait. Key patterns from gemicro:

  1. AgentUpdate::custom(event_type, message, data) for soft-typed extensibility
  2. AgentUpdate::final_result() signals completion
  3. Unknown event types are logged and ignored, not errors
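
The third point, applied to the Custom variant proposed earlier, might look like the sketch below. It is not gemicro's code. The enum is a cut-down stand-in whose `data` is a plain String rather than a JSON value, and `render` and the `"token_usage"` event type are hypothetical.

```rust
/// Simplified stand-in for AgentEvent: `data` is a String here, not a JSON value.
#[derive(Debug)]
enum AgentEvent {
    TextDelta(String),
    Custom { event_type: String, data: String },
}

/// Returns the text to render, or None when the event is ignored.
fn render(event: &AgentEvent) -> Option<String> {
    match event {
        AgentEvent::TextDelta(text) => Some(text.clone()),
        AgentEvent::Custom { event_type, data } => match event_type.as_str() {
            // "token_usage" is a hypothetical event type used for illustration.
            "token_usage" => Some(format!("[usage] {data}")),
            other => {
                // Unknown types are logged and skipped, not treated as errors.
                eprintln!("ignoring unknown event type: {other}");
                None
            }
        },
    }
}

fn main() {
    let events = vec![
        AgentEvent::TextDelta("Hello".to_string()),
        AgentEvent::Custom {
            event_type: "token_usage".to_string(),
            data: "42 tokens".to_string(),
        },
        AgentEvent::Custom {
            event_type: "from_a_newer_agent".to_string(),
            data: String::new(),
        },
    ];
    for event in &events {
        if let Some(line) = render(event) {
            println!("{line}");
        }
    }
}
```

Treating unknown types as a soft skip lets an older consumer keep working when a newer agent starts emitting event types it has never seen.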

Trade-offs

| Aspect | Current (manual loop) | Streaming-first |
|---|---|---|
| Simplicity | ✓ Simpler for single-agent | More abstraction |
| Cancellation | ✓ Fine-grained control | Cooperative via drop |
| Concurrency | ✗ Doesn't scale | ✓ Merges cleanly |
| genai-rs integration | ✓ Direct | Needs adapter layer |

Recommendation

Don't implement this now—the current approach is right for clemini's current scope. But if/when we add subagents or background operations, this pattern becomes valuable enough to justify the refactor.

This issue exists to document the pattern for future reference.
