Skip to content

Commit 9e97081

Browse files
committed
wip
1 parent 15534eb commit 9e97081

File tree

11 files changed

+96
-67
lines changed

11 files changed

+96
-67
lines changed

src/Agents/Agent.php

Lines changed: 6 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -375,28 +375,21 @@ protected function invokePipeline(
375375
'messages' => $this->memory->getMessages(),
376376
];
377377

378-
$pipeline = $this->pipeline
378+
return $this->pipeline
379379
->enableStreaming($streaming)
380380
->onStart(function (PipelineStart $event): void {
381381
$this->withRuntimeConfig($event->config);
382382
$this->dispatchEvent(new AgentStart($this, $this->runtimeConfig));
383383
})
384-
->when($streaming, function (Pipeline $pipeline): Pipeline {
385-
return $pipeline->onLastLLMStreamEnd(function (): void {
386-
$this->dispatchEvent(new AgentEnd($this, $this->runtimeConfig));
387-
});
388-
}, function (Pipeline $pipeline): Pipeline {
389-
return $pipeline->onEnd(function (PipelineEnd $event): void {
390-
$this->withRuntimeConfig($event->config);
391-
$this->dispatchEvent(new AgentEnd($this, $this->runtimeConfig));
392-
});
384+
->onEnd(function (PipelineEnd $event): void {
385+
// $this->withRuntimeConfig($event->config);
386+
$this->dispatchEvent(new AgentEnd($this, $this->runtimeConfig));
393387
})
394388
->onError(function (PipelineError $event): void {
395389
$this->withRuntimeConfig($event->config);
396390
$this->dispatchEvent(new AgentStepError($this, $event->exception, $event->config));
397-
});
398-
399-
return $pipeline->invoke($payload, $config);
391+
})
392+
->invoke($payload, $config);
400393
}
401394

402395
/**

src/Agents/Stages/HandleToolCalls.php

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@
66

77
use Closure;
88
use Cortex\Pipeline;
9-
use Cortex\Agents\Data\Step;
109
use Cortex\Contracts\Pipeable;
1110
use Cortex\LLM\Data\ChatResult;
1211
use Cortex\Contracts\ChatMemory;

src/Agents/Stages/TrackAgentStepEnd.php

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,8 @@
55
namespace Cortex\Agents\Stages;
66

77
use Closure;
8-
use DateTimeImmutable;
98
use Cortex\Agents\Agent;
109
use Cortex\Contracts\Pipeable;
11-
use Cortex\Events\AgentStepEnd;
1210
use Cortex\LLM\Data\ChatResult;
1311
use Cortex\LLM\Enums\ChunkType;
1412
use Cortex\Pipeline\RuntimeConfig;

src/Agents/Stages/TrackAgentStepStart.php

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,9 @@
55
namespace Cortex\Agents\Stages;
66

77
use Closure;
8-
use DateTimeImmutable;
98
use Cortex\Agents\Agent;
10-
use Cortex\Agents\Data\Step;
119
use Cortex\Contracts\Pipeable;
1210
use Cortex\LLM\Enums\ChunkType;
13-
use Cortex\Events\AgentStepStart;
1411
use Cortex\Pipeline\RuntimeConfig;
1512
use Cortex\Support\Traits\CanPipe;
1613
use Cortex\LLM\Data\ChatGenerationChunk;

src/Http/Controllers/AgentsController.php

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@ public function stream(string $agent, Request $request): void// : StreamedRespon
8989
$agent->onStepError(function (AgentStepError $event): void {
9090
dump('-- step error --');
9191
});
92-
$agent->onChunk(function (ChatModelStream $event): void {
92+
// $agent->onChunk(function (ChatModelStream $event): void {
9393
// dump($event->chunk->type->value);
9494
// $toolCalls = $event->chunk->message->toolCalls;
9595

@@ -98,14 +98,13 @@ public function stream(string $agent, Request $request): void// : StreamedRespon
9898
// } else {
9999
// dump(sprintf('chunk: %s', $event->chunk->message->content));
100100
// }
101-
102-
});
101+
// });
103102
$result = $agent->stream(input: $request->all());
104103

105-
// dd(iterator_to_array($result->flatten(1)));
104+
// dd(iterator_to_array($result, false));
106105

107106
try {
108-
foreach ($result->flatten(1) as $chunk) {
107+
foreach ($result as $chunk) {
109108
dump($chunk->type->value);
110109
// dump(sprintf('%s: %s', $chunk->type->value, $chunk->message->content));
111110
}

src/LLM/AbstractLLM.php

Lines changed: 24 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
namespace Cortex\LLM;
66

77
use Closure;
8+
use Generator;
89
use BackedEnum;
910
use Cortex\Pipeline;
1011
use Cortex\Support\Utils;
@@ -122,16 +123,11 @@ public function handlePipeable(mixed $payload, RuntimeConfig $config, Closure $n
122123
// Otherwise, we return the message as is.
123124
return $result instanceof ChatStreamResult
124125
? new ChatStreamResult(function () use ($result, $config, $next) {
125-
foreach ($result->flatten(1) as $chunk) {
126+
foreach ($result as $chunk) {
126127
try {
127128
$chunk = $next($chunk, $config);
128129

129-
$config->dispatchEvent(
130-
event: new RuntimeConfigStreamChunk($config, $chunk),
131-
dispatchToGlobalDispatcher: false,
132-
);
133-
134-
yield $chunk;
130+
yield from $this->flattenAndYield($chunk, $config, dispatchEvents: true);
135131
} catch (OutputParserException) {
136132
// Ignore any parsing errors and continue
137133
}
@@ -140,6 +136,27 @@ public function handlePipeable(mixed $payload, RuntimeConfig $config, Closure $n
140136
: $next($result, $config);
141137
}
142138

139+
protected function flattenAndYield(mixed $content, RuntimeConfig $config, bool $dispatchEvents = false): Generator
140+
{
141+
if ($content instanceof ChatStreamResult) {
142+
// When flattening a nested stream, don't dispatch events here
143+
// The inner stream's AbstractLLM already dispatched them
144+
foreach ($content as $chunk) {
145+
yield from $this->flattenAndYield($chunk, $config, dispatchEvents: false);
146+
}
147+
} else {
148+
// Only dispatch events when we're at the top level (not flattening nested streams)
149+
if ($dispatchEvents && $content instanceof ChatGenerationChunk) {
150+
$config->dispatchEvent(
151+
event: new RuntimeConfigStreamChunk($config, $content),
152+
dispatchToGlobalDispatcher: false,
153+
);
154+
}
155+
156+
yield $content;
157+
}
158+
}
159+
143160
public function output(OutputParser $parser): Pipeline
144161
{
145162
return $this->pipe($parser);

src/Pipeline.php

Lines changed: 22 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -12,11 +12,9 @@
1212
use Cortex\LLM\Contracts\LLM;
1313
use Cortex\Contracts\Pipeable;
1414
use Cortex\Events\PipelineEnd;
15-
use Cortex\LLM\Enums\ChunkType;
1615
use Cortex\Events\PipelineError;
1716
use Cortex\Events\PipelineStart;
1817
use Cortex\Contracts\OutputParser;
19-
use Cortex\Events\ChatModelStream;
2018
use Cortex\Pipeline\RuntimeConfig;
2119
use Cortex\Events\ChatModelStreamEnd;
2220
use Cortex\Events\Contracts\StageEvent;
@@ -43,6 +41,8 @@ class Pipeline implements Pipeable
4341
*/
4442
protected ?RuntimeConfig $config = null;
4543

44+
protected bool $streaming = false;
45+
4646
public function __construct(Pipeable|Closure ...$stages)
4747
{
4848
$this->stages = $stages;
@@ -165,15 +165,9 @@ public function output(OutputParser $parser): self
165165
return $this->pipe($parser);
166166
}
167167

168-
/**
169-
* Check if an event belongs to this pipeline instance.
170-
*/
171-
protected function eventBelongsToThisInstance(object $event): bool
168+
public function isStreaming(): bool
172169
{
173-
return match (true) {
174-
$event instanceof PipelineEvent, $event instanceof StageEvent => $event->pipeline === $this,
175-
default => false,
176-
};
170+
return $this->streaming;
177171
}
178172

179173
/**
@@ -189,6 +183,10 @@ public function onStart(Closure $listener): self
189183
*/
190184
public function onEnd(Closure $listener): self
191185
{
186+
if ($this->streaming) {
187+
return $this->onLastLLMStreamEnd($listener);
188+
}
189+
192190
return $this->on(PipelineEnd::class, $listener);
193191
}
194192

@@ -224,6 +222,17 @@ public function onStageError(Closure $listener): self
224222
return $this->on(StageError::class, $listener);
225223
}
226224

225+
/**
226+
* Check if an event belongs to this pipeline instance.
227+
*/
228+
protected function eventBelongsToThisInstance(object $event): bool
229+
{
230+
return match (true) {
231+
$event instanceof PipelineEvent, $event instanceof StageEvent => $event->pipeline === $this,
232+
default => false,
233+
};
234+
}
235+
227236
/**
228237
* Create the callable for the current stage.
229238
*/
@@ -281,24 +290,8 @@ protected function setLLMStreaming(mixed $stage, bool $streaming = true): void
281290
$this->setLLMStreaming($subStage, $streaming);
282291
}
283292
}
284-
}
285293

286-
public function onLLMStreamStepEnd(Closure $listener): self
287-
{
288-
$this->onStageEnd(function (StageEnd $event) use ($listener): void {
289-
if ($event->stage instanceof LLM && $event->stage->isStreaming()) {
290-
// $event->stage->onStreamEnd(function (ChatModelStreamEnd $streamEndEvent) use ($listener): void {
291-
// $listener($streamEndEvent);
292-
// });
293-
$event->stage->onStream(function (ChatModelStream $streamEvent) use ($listener): void {
294-
if ($streamEvent->chunk->type === ChunkType::StepEnd) {
295-
$listener($streamEvent);
296-
}
297-
});
298-
}
299-
});
300-
301-
return $this;
294+
$this->streaming = $streaming;
302295
}
303296

304297
/**
@@ -323,11 +316,11 @@ public function onLastLLMStreamEnd(Closure $listener): self
323316

324317
// Register listener on the LLM instance for ChatModelStreamEnd events
325318
// AbstractLLM implements onStreamEnd, so we can call it via method_exists check
326-
$currentLLM->onStreamEnd(function (ChatModelStreamEnd $streamEndEvent) use ($listener, &$streamEndDispatched, &$lastLLM, $currentLLM): void {
319+
$currentLLM->onStreamEnd(function (ChatModelStreamEnd $streamEndEvent) use ($listener, &$streamEndDispatched, &$lastLLM, $currentLLM, $event): void {
327320
// Only dispatch if this is the last LLM we tracked and it hasn't been dispatched yet
328321
if (! $streamEndDispatched && $streamEndEvent->llm === $lastLLM && $streamEndEvent->llm === $currentLLM) {
329322
$streamEndDispatched = true;
330-
$listener($streamEndEvent);
323+
$listener(new PipelineEnd($this, $event->payload, $event->config, $event->result));
331324
}
332325
});
333326
}

src/Pipeline/RuntimeConfig.php

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,9 @@ public function __construct(
2828
$this->runId = Str::uuid7()->toString();
2929
}
3030

31-
public function onStreamChunk(Closure $listener): self
31+
public function onStreamChunk(Closure $listener, bool $once = true): self
3232
{
33-
return $this->on(RuntimeConfigStreamChunk::class, $listener);
33+
return $this->on(RuntimeConfigStreamChunk::class, $listener, $once);
3434
}
3535

3636
/**

src/Support/Traits/DispatchesEvents.php

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,8 +61,12 @@ public function dispatchEvent(object $event, bool $dispatchToGlobalDispatcher =
6161
/**
6262
* Register an instance-specific listener.
6363
*/
64-
public function on(string $eventClass, Closure $listener): static
64+
public function on(string $eventClass, Closure $listener, bool $once = false): static
6565
{
66+
if ($once && $this->hasListener($eventClass)) {
67+
return $this;
68+
}
69+
6670
if (! isset($this->instanceListeners[$eventClass])) {
6771
$this->instanceListeners[$eventClass] = [];
6872
}
@@ -72,6 +76,19 @@ public function on(string $eventClass, Closure $listener): static
7276
return $this;
7377
}
7478

79+
public function hasListener(string $eventClass): bool
80+
{
81+
return isset($this->instanceListeners[$eventClass]) && count($this->instanceListeners[$eventClass]) > 0;
82+
}
83+
84+
/**
85+
* Register an instance-specific listener only if it hasn't been registered yet.
86+
*/
87+
public function once(string $eventClass, Closure $listener): static
88+
{
89+
return $this->on($eventClass, $listener, once: true);
90+
}
91+
7592
/**
7693
* Check if an event belongs to this instance.
7794
*

tests/Unit/Agents/AgentTest.php

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,10 @@
99
use Cortex\Agents\Data\Step;
1010
use Cortex\Events\AgentStart;
1111
use Cortex\JsonSchema\Schema;
12+
use Cortex\Events\AgentStepEnd;
1213
use Cortex\LLM\Data\ChatResult;
1314
use Cortex\LLM\Enums\ChunkType;
15+
use Cortex\Events\AgentStepStart;
1416
use Cortex\LLM\Data\ChatStreamResult;
1517
use Cortex\LLM\Data\ToolCallCollection;
1618
use Cortex\LLM\Data\ChatGenerationChunk;
@@ -637,9 +639,9 @@ function (int $x, int $y): int {
637639
->and($endCalled)->toBe(1, 'AgentEnd should be dispatched exactly once');
638640
});
639641

640-
test('it dispatches AgentStart and AgentEnd events only once when streaming with tool calls and multiple steps', function (): void {
642+
test('it dispatches AgentStart, AgentEnd, AgentStepStart, and AgentStepEnd events only once when streaming with tool calls and multiple steps', function (): void {
641643
// This test verifies that even with multiple steps (tool call + final response),
642-
// AgentStart and AgentEnd are only dispatched once
644+
// AgentStart, AgentEnd, AgentStepStart, and AgentStepEnd are only dispatched exactly once.
643645
$multiplyTool = tool(
644646
'multiply',
645647
'Multiply two numbers together',
@@ -666,6 +668,8 @@ function (int $x, int $y): int {
666668

667669
$startCalled = 0;
668670
$endCalled = 0;
671+
$stepStartCalled = 0;
672+
$stepEndCalled = 0;
669673

670674
$agent->onStart(function (AgentStart $event) use ($agent, &$startCalled): void {
671675
$startCalled++;
@@ -677,6 +681,16 @@ function (int $x, int $y): int {
677681
expect($event->agent)->toBe($agent);
678682
});
679683

684+
$agent->onStepStart(function (AgentStepStart $event) use ($agent, &$stepStartCalled): void {
685+
$stepStartCalled++;
686+
expect($event->agent)->toBe($agent);
687+
});
688+
689+
$agent->onStepEnd(function (AgentStepEnd $event) use ($agent, &$stepEndCalled): void {
690+
$stepEndCalled++;
691+
expect($event->agent)->toBe($agent);
692+
});
693+
680694
$result = $agent->stream(input: [
681695
'query' => 'What is 3 times 4?',
682696
]);
@@ -695,7 +709,9 @@ function (int $x, int $y): int {
695709
// Verify final counts after stream is fully consumed
696710
expect($chunkCount)->toBeGreaterThan(0, 'Should have consumed some chunks')
697711
->and($startCalled)->toBe(1, 'AgentStart should be dispatched exactly once even with multiple steps')
698-
->and($endCalled)->toBe(1, 'AgentEnd should be dispatched exactly once even with multiple steps');
712+
->and($endCalled)->toBe(1, 'AgentEnd should be dispatched exactly once even with multiple steps')
713+
->and($stepStartCalled)->toBe(2, 'AgentStepStart should be dispatched exactly twice even with multiple steps')
714+
->and($stepEndCalled)->toBe(2, 'AgentStepEnd should be dispatched exactly twice even with multiple steps');
699715

700716
// Verify runtime config shows multiple steps
701717
$runtimeConfig = $agent->getRuntimeConfig();

0 commit comments

Comments
 (0)