
Composition Pipeline

skobeltsyn edited this page Mar 28, 2026 · 1 revision

Composition: Pipeline (then)

Overview

A Pipeline chains agents sequentially. The output of each agent becomes the input of the next. You build pipelines with the then infix operator:

A then B then C

The type system enforces connectivity: if agent A produces type A_OUT and agent B accepts type B_IN, then A then B compiles only when A_OUT == B_IN. This gives you a compile-time guarantee that each stage's output type matches the next stage's input type.

A Pipeline<IN, OUT> is itself an invokable function. Call it with pipeline(input) and it returns the final output.
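To make the compile-time guarantee concrete, here is a minimal sketch of how a then operator can tie two stages together through a shared type parameter. SketchAgent and SketchPipeline are hypothetical stand-ins written for this illustration, not the framework's classes, and the real implementation may differ.

```kotlin
// Hypothetical stand-ins for the framework's Agent and Pipeline types.
class SketchAgent<IN, OUT>(val name: String, private val run: (IN) -> OUT) {
    operator fun invoke(input: IN): OUT = run(input)
}

class SketchPipeline<IN, OUT>(private val run: (IN) -> OUT) {
    operator fun invoke(input: IN): OUT = run(input)
}

// Agent + Agent: the shared type parameter B forces the first output to match the second input.
infix fun <A, B, C> SketchAgent<A, B>.then(other: SketchAgent<B, C>): SketchPipeline<A, C> {
    val first = this
    return SketchPipeline<A, C> { input -> other(first(input)) }
}

// Pipeline + Agent: extends an existing chain by one stage.
infix fun <A, B, C> SketchPipeline<A, B>.then(other: SketchAgent<B, C>): SketchPipeline<A, C> {
    val first = this
    return SketchPipeline<A, C> { input -> other(first(input)) }
}

val length = SketchAgent<String, Int>("length") { it.length }
val double = SketchAgent<Int, Int>("double") { it * 2 }
val describe = SketchAgent<Int, String>("describe") { "value=$it" }

// String -> Int -> Int -> String
val sketch = length then double then describe

// Rejected by the compiler, because describe produces String but double accepts Int:
// val broken = describe then double
```

In this sketch, sketch("hello") evaluates to "value=10". The mismatch in the commented-out line is caught because both classes are invariant in their type parameters, so B has to be exactly the same type on both sides.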

Type Signature

The framework provides then overloads for combining agents and pipelines with the other composition primitives. The signatures are:

// Agent + Agent -> Pipeline
infix fun <A, B, C> Agent<A, B>.then(other: Agent<B, C>): Pipeline<A, C>

// Pipeline + Agent -> Pipeline (extends an existing pipeline)
infix fun <A, B, C> Pipeline<A, B>.then(other: Agent<B, C>): Pipeline<A, C>

// Pipeline + Pipeline -> Pipeline (joins two pipelines)
infix fun <A, B, C> Pipeline<A, B>.then(other: Pipeline<B, C>): Pipeline<A, C>

// Agent/Pipeline + Parallel -> Pipeline (fan-out, output becomes List)
infix fun <A, B, C> Agent<A, B>.then(other: Parallel<B, C>): Pipeline<A, List<C>>
infix fun <A, B, C> Pipeline<A, B>.then(other: Parallel<B, C>): Pipeline<A, List<C>>

// Parallel + Agent/Pipeline -> Pipeline (aggregation after fan-out)
infix fun <A, B, C> Parallel<A, B>.then(other: Agent<List<B>, C>): Pipeline<A, C>
infix fun <A, B, C> Parallel<A, B>.then(other: Pipeline<List<B>, C>): Pipeline<A, C>

// Agent/Pipeline + Loop -> Pipeline
infix fun <A, B, C> Agent<A, B>.then(other: Loop<B, C>): Pipeline<A, C>
infix fun <A, B, C> Pipeline<A, B>.then(other: Loop<B, C>): Pipeline<A, C>

// Loop + Agent/Pipeline -> Pipeline
infix fun <A, B, C> Loop<A, B>.then(other: Agent<B, C>): Pipeline<A, C>
infix fun <A, B, C> Loop<A, B>.then(other: Pipeline<B, C>): Pipeline<A, C>

// Agent/Pipeline + Branch -> Pipeline
infix fun <A, B, C> Agent<A, B>.then(other: Branch<B, C>): Pipeline<A, C>
infix fun <A, B, C> Pipeline<A, B>.then(other: Branch<B, C>): Pipeline<A, C>

// Branch + Agent/Pipeline -> Pipeline
infix fun <A, B, C> Branch<A, B>.then(other: Agent<B, C>): Pipeline<A, C>
infix fun <A, B, C> Branch<A, B>.then(other: Pipeline<B, C>): Pipeline<A, C>

// Agent/Pipeline + Forum -> Pipeline
infix fun <A, B, C> Agent<A, B>.then(other: Forum<B, C>): Pipeline<A, C>
infix fun <A, B, C> Pipeline<A, B>.then(other: Forum<B, C>): Pipeline<A, C>
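The List-typed overloads are the least obvious ones in the list above. The sketch below models them with hypothetical Step and ParallelSketch classes (neither is part of the framework) to show why a stage placed after a fan-out has to accept a List. It runs the branches sequentially for simplicity, which is an assumption of the sketch and not a statement about how the framework schedules them.

```kotlin
// Hypothetical stand-ins for illustration only; these are not the framework's classes.
// Step plays the role of both Agent and Pipeline.
class Step<IN, OUT>(private val run: (IN) -> OUT) {
    operator fun invoke(input: IN): OUT = run(input)
}

// A parallel group: several branches that share one input type and one output type.
class ParallelSketch<IN, OUT>(val branches: List<Step<IN, OUT>>)

// Step + Step: plain sequential chaining.
infix fun <A, B, C> Step<A, B>.then(other: Step<B, C>): Step<A, C> {
    val first = this
    return Step<A, C> { input -> other(first(input)) }
}

// Step + Parallel: every branch receives the same value, so the output is List<C>.
// Branches run one after another here; a real fan-out would presumably run them concurrently.
infix fun <A, B, C> Step<A, B>.then(other: ParallelSketch<B, C>): Step<A, List<C>> {
    val first = this
    return Step<A, List<C>> { input ->
        val shared = first(input)
        other.branches.map { branch -> branch(shared) }
    }
}

val prepare = Step<String, String> { it.trim() }
val reviewers = ParallelSketch(
    listOf(
        Step<String, Int> { it.length },
        Step<String, Int> { text -> text.count { ch -> ch == 'a' } }
    )
)
val total = Step<List<Int>, Int> { it.sum() }

// Parsed as (prepare then reviewers) then total, so the aggregator must accept List<Int>.
val fanOut = prepare then reviewers
val aggregated = fanOut then total
```

With these definitions, fanOut(" banana ") yields [6, 3] and aggregated(" banana ") yields 9. Because then is left-associative, the aggregator is attached through the ordinary sequential overload, whose input type is the List produced by the fan-out.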

Basic Example

A two-stage pipeline where a string is uppercased and then given a trailing exclamation mark:

import agents_engine.core.*
import agents_engine.composition.pipeline.then

data class Input(val v: String)
data class Middle(val v: String)
data class Output(val v: String)

val upper = agent<Input, Middle>("upper") {
    skills {
        skill<Input, Middle>("upper") {
            implementedBy { Middle(it.v.uppercase()) }
        }
    }
}

val exclaim = agent<Middle, Output>("exclaim") {
    skills {
        skill<Middle, Output>("exclaim") {
            implementedBy { Output("${it.v}!") }
        }
    }
}

val pipeline = upper then exclaim
val result = pipeline(Input("hello"))
// result == Output("HELLO!")

Longer pipelines chain naturally with type inference:

data class A(val v: Int)
data class B(val v: Int)
data class C(val v: Int)
data class D(val v: Int)

val a = agent<A, B>("a") { skills { skill<A, B>("a") { implementedBy { B(it.v + 1) } } } }
val b = agent<B, C>("b") { skills { skill<B, C>("b") { implementedBy { C(it.v * 2) } } } }
val c = agent<C, D>("c") { skills { skill<C, D>("c") { implementedBy { D(it.v - 3) } } } }

val pipeline = a then b then c
val result = pipeline(A(1))
// A(1) -> B(2) -> C(4) -> D(1)

Mixed Compositions

Pipelines serve as the backbone that connects every other composition primitive. You can embed Parallel, Loop, Branch, and Forum segments directly:

// Parallel inside a pipeline: fan-out then aggregate
val reviewPipeline = prepare then (reviewerA / reviewerB / reviewerC) then aggregator

// Loop inside a pipeline: iterate a middle stage
val loop = refine.loop { result -> if (result.score >= 90) null else result }
val loopPipeline = parse then loop then format

// Branch inside a pipeline: conditional routing
val branch = classifier.branch {
    on<Circle>()    then circleHandler
    on<Rectangle>() then rectangleHandler
}
val branchPipeline = preprocess then branch then postprocess
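The loop snippet above suggests that the lambda returns null to stop and a value to run the stage again. The sketch below models that reading with a hypothetical loopSketch helper over plain functions. Both the helper and the Draft type are assumptions made for illustration, and the framework's actual loop semantics are described on the Loop page.

```kotlin
// Hypothetical result type with a quality score.
data class Draft(val text: String, val score: Int)

// Hypothetical helper, not the framework's API: run `body` once, then keep
// rerunning it on whatever `next` returns until `next` returns null.
fun <IN, OUT> loopSketch(body: (IN) -> OUT, next: (OUT) -> IN?): (IN) -> OUT = { input ->
    var output = body(input)
    while (true) {
        val again = next(output) ?: break
        output = body(again)
    }
    output
}

// Each pass raises the score by 30.
val refine: (Draft) -> Draft = { draft -> draft.copy(score = draft.score + 30) }

// Stop once the score reaches 90, otherwise feed the result back in.
val refineUntilGood = loopSketch(refine) { result -> if (result.score >= 90) null else result }
```

Starting from Draft("spec", 10), this sketch runs refine three times (scores 40, 70, 100) and returns Draft("spec", 100). The sketch has no iteration cap, so a condition that never returns null would loop forever.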

Joining Two Pipelines

You can combine independently defined pipelines into a larger one:

val frontend = specMaster then coderMaster then reviewMaster
val backend  = productionManager then machineManager

val total: Pipeline<SpecAsk, MachineryResult> = frontend then backend

The type system verifies that the output type of frontend matches the input type of backend.

Common Patterns

ETL pipeline -- Extract, transform, load in three clean stages:

val etl = extract then transform then load
val result = etl(DataSource("s3://bucket/file.csv"))

Review chain -- Multiple review stages in sequence:

val reviewed = draft then codeReview then securityReview then finalApproval

Validation pipeline -- Each stage validates or enriches:

val validated = parse then validateSchema then enrichDefaults then freeze

Agent Placement Rules

Each agent instance can participate in exactly one composition. Once an agent is placed in a pipeline, it cannot be reused in another pipeline, parallel, forum, or loop. This prevents accidental shared mutable state. If you need the same logic in two places, create two agent instances:

val a = agent<A, B>("a") {}
val b = agent<B, C>("b") {}

a then b  // a is now placed

// This would throw IllegalArgumentException:
// a then anotherAgent

// Instead, create a new instance:
val a2 = agent<A, B>("a") {}
a2 then anotherAgent  // works fine
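One way such a rule can be enforced is with a placement flag checked at composition time. The sketch below is a hypothetical model: the PlacedOnce class, its claim method, and the error message are invented for illustration and are not the framework's internals. It uses Kotlin's standard require function, which throws IllegalArgumentException when its condition is false.

```kotlin
// Hypothetical model of the single-placement rule; the framework's internals may differ.
class PlacedOnce<IN, OUT>(val name: String, private val run: (IN) -> OUT) {
    private var placed = false

    // Marks this instance as used by a composition; a second call fails.
    fun claim() {
        require(!placed) { "Agent '$name' is already placed in a composition" }
        placed = true
    }

    operator fun invoke(input: IN): OUT = run(input)
}

// Composing claims both operands before building the chained function.
infix fun <A, B, C> PlacedOnce<A, B>.then(other: PlacedOnce<B, C>): (A) -> C {
    val first = this
    first.claim()
    other.claim()
    return { input -> other(first(input)) }
}

val inc = PlacedOnce<Int, Int>("inc") { it + 1 }
val dbl = PlacedOnce<Int, Int>("dbl") { it * 2 }
val neg = PlacedOnce<Int, Int>("neg") { -it }

// Places inc and dbl.
val chain = inc then dbl

// Fails with IllegalArgumentException because inc is already placed.
val reuse = runCatching { inc then neg }

// A fresh instance with the same logic composes without error.
val inc2 = PlacedOnce<Int, Int>("inc") { it + 1 }
val chain2 = inc2 then neg
```

Checking at composition time rather than at invocation time means the mistake surfaces when the pipeline is built, before any input is processed.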

Next: Parallel | Loop | Branch | Forum | While Loops
