Skip to content

Commit 99ca2cc

Browse files
committed
wip
1 parent 50dd552 commit 99ca2cc

File tree

14 files changed

+761
-442
lines changed

14 files changed

+761
-442
lines changed

src/Agents/Agent.php

Lines changed: 84 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -14,21 +14,28 @@
1414
use Cortex\Memory\ChatMemory;
1515
use UnexpectedValueException;
1616
use Cortex\Contracts\Pipeable;
17+
use Cortex\Events\PipelineEnd;
18+
use Cortex\Events\AgentStepEnd;
1719
use Cortex\LLM\Data\ChatResult;
20+
use Cortex\Events\PipelineError;
1821
use Cortex\LLM\Enums\ToolChoice;
22+
use Cortex\Events\AgentStepError;
23+
use Cortex\Events\AgentStepStart;
1924
use Cortex\LLM\Contracts\Message;
2025
use Cortex\Memory\Contracts\Store;
2126
use Cortex\Pipeline\RuntimeConfig;
2227
use Cortex\Support\Traits\CanPipe;
2328
use Cortex\Agents\Stages\AppendUsage;
2429
use Cortex\LLM\Data\ChatStreamResult;
30+
use Cortex\Events\Contracts\AgentEvent;
2531
use Cortex\Exceptions\GenericException;
2632
use Cortex\Memory\Stores\InMemoryStore;
2733
use Cortex\Agents\Stages\HandleToolCalls;
2834
use Cortex\JsonSchema\Types\ObjectSchema;
2935
use Cortex\LLM\Enums\StructuredOutputMode;
3036
use Cortex\JsonSchema\Contracts\JsonSchema;
3137
use Cortex\LLM\Data\Messages\SystemMessage;
38+
use Cortex\Support\Traits\DispatchesEvents;
3239
use Illuminate\Contracts\Support\Arrayable;
3340
use Cortex\Agents\Stages\AddMessageToMemory;
3441
use Cortex\LLM\Contracts\LLM as LLMContract;
@@ -41,6 +48,7 @@
4148
class Agent implements Pipeable
4249
{
4350
use CanPipe;
51+
use DispatchesEvents;
4452

4553
protected LLMContract $llm;
4654

@@ -95,19 +103,17 @@ public function __construct(
95103
public function pipeline(bool $shouldParseOutput = true): Pipeline
96104
{
97105
$tools = Utils::toToolCollection($this->getTools());
98-
99-
return $this->executionPipeline($shouldParseOutput)
100-
->when(
101-
$tools->isNotEmpty(),
102-
fn(Pipeline $pipeline): Pipeline => $pipeline->pipe(
103-
new HandleToolCalls(
104-
$tools,
105-
$this->memory,
106-
$this->executionPipeline($shouldParseOutput),
107-
$this->maxSteps,
108-
),
109-
),
110-
);
106+
$executionPipeline = $this->executionPipeline($shouldParseOutput);
107+
108+
return $executionPipeline->when(
109+
$tools->isNotEmpty(),
110+
fn(Pipeline $pipeline): Pipeline => $pipeline->pipe(new HandleToolCalls(
111+
$tools,
112+
$this->memory,
113+
$executionPipeline,
114+
$this->maxSteps,
115+
)),
116+
);
111117
}
112118

113119
/**
@@ -116,13 +122,28 @@ public function pipeline(bool $shouldParseOutput = true): Pipeline
116122
public function executionPipeline(bool $shouldParseOutput = true): Pipeline
117123
{
118124
return $this->prompt
125+
->pipe(function ($payload, RuntimeConfig $config, $next) {
126+
$this->dispatchEvent(new AgentStepStart($this, 1));
127+
128+
return $next($payload, $config);
129+
})
119130
->pipe($this->llm->shouldParseOutput($shouldParseOutput))
120131
->pipe(new AddMessageToMemory($this->memory))
121132
->pipe(new AppendUsage($this->usage))
122133
->pipe(function ($payload, RuntimeConfig $config, $next) {
123134
$config->context->set('execution_step', $config->context->get('execution_step', 0) + 1);
135+
$this->dispatchEvent(new AgentStepEnd($this, $config->context->get('execution_step')));
124136

125137
return $next($payload, $config);
138+
})
139+
->onError(function (PipelineError $event): void {
140+
$this->dispatchEvent(
141+
new AgentStepError(
142+
$this,
143+
$event->config->context->get('execution_step'),
144+
$event->exception,
145+
),
146+
);
126147
});
127148
}
128149

@@ -218,6 +239,35 @@ public function getUsage(): Usage
218239
return $this->usage;
219240
}
220241

242+
public function getRuntimeConfig(): ?RuntimeConfig
243+
{
244+
return $this->runtimeConfig;
245+
}
246+
247+
/**
248+
* Register a listener for the start of an agent step.
249+
*/
250+
public function onStepStart(Closure $listener): self
251+
{
252+
return $this->on(AgentStepStart::class, $listener);
253+
}
254+
255+
/**
256+
* Register a listener for the end of an agent step.
257+
*/
258+
public function onStepEnd(Closure $listener): self
259+
{
260+
return $this->on(AgentStepEnd::class, $listener);
261+
}
262+
263+
/**
264+
* Register a listener for the error of an agent step.
265+
*/
266+
public function onStepError(Closure $listener): self
267+
{
268+
return $this->on(AgentStepError::class, $listener);
269+
}
270+
221271
/**
222272
* @param array<int, \Cortex\LLM\Contracts\Message> $messages
223273
* @param array<string, mixed> $input
@@ -230,26 +280,24 @@ protected function invokePipeline(
230280
?RuntimeConfig $config = null,
231281
bool $streaming = false,
232282
): ChatResult|ChatStreamResult {
233-
$this->memory->setVariables([
234-
...$this->initialPromptVariables,
235-
...$input,
236-
]);
237-
238-
$messages = $this->memory->getMessages()->merge($messages);
239-
$this->memory->setMessages($messages);
240-
241-
$pipeline = $streaming
242-
? $this->pipeline->enableStreaming()
243-
: $this->pipeline;
244-
245-
return $pipeline->pipe(function ($payload, RuntimeConfig $config, $next) {
246-
$this->runtimeConfig = $config;
247-
248-
return $next($payload, $config);
249-
})->invoke([
283+
$this->memory
284+
->setMessages($this->memory->getMessages()->merge($messages))
285+
->setVariables([
286+
...$this->initialPromptVariables,
287+
...$input,
288+
]);
289+
290+
$payload = [
250291
...$input,
251292
'messages' => $this->memory->getMessages(),
252-
], $config);
293+
];
294+
295+
return $this->pipeline
296+
->enableStreaming($streaming)
297+
->onEnd(function (PipelineEnd $event): void {
298+
$this->runtimeConfig = $event->config;
299+
})
300+
->invoke($payload, $config);
253301
}
254302

255303
/**
@@ -349,4 +397,9 @@ protected static function buildOutput(ObjectSchema|array|string|null $output): O
349397

350398
return $output;
351399
}
400+
401+
protected function eventBelongsToThisInstance(object $event): bool
402+
{
403+
return $event instanceof AgentEvent && $event->agent === $this;
404+
}
352405
}

src/Agents/Stages/AddMessageToMemory.php

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ public function handlePipeable(mixed $payload, RuntimeConfig $config, Closure $n
3232

3333
if ($message !== null) {
3434
$this->memory->addMessage($message);
35+
$config->context->set('message_history', $this->memory->getMessages());
3536
}
3637

3738
return $next($payload, $config);

src/Events/AgentStepEnd.php

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace Cortex\Events;
6+
7+
use Cortex\Agents\Agent;
8+
use Cortex\Events\Contracts\AgentEvent;
9+
10+
readonly class AgentStepEnd implements AgentEvent
11+
{
12+
public function __construct(
13+
public Agent $agent,
14+
public int $step,
15+
) {}
16+
}

src/Events/AgentStepError.php

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace Cortex\Events;
6+
7+
use Throwable;
8+
use Cortex\Agents\Agent;
9+
use Cortex\Events\Contracts\AgentEvent;
10+
11+
readonly class AgentStepError implements AgentEvent
12+
{
13+
public function __construct(
14+
public Agent $agent,
15+
public int $step,
16+
public Throwable $exception,
17+
) {}
18+
}

src/Events/AgentStepStart.php

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace Cortex\Events;
6+
7+
use Cortex\Agents\Agent;
8+
use Cortex\Events\Contracts\AgentEvent;
9+
10+
readonly class AgentStepStart implements AgentEvent
11+
{
12+
public function __construct(
13+
public Agent $agent,
14+
public int $step,
15+
) {}
16+
}
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace Cortex\Events\Contracts;
6+
7+
use Cortex\Agents\Agent;
8+
9+
interface AgentEvent
10+
{
11+
public Agent $agent { get; }
12+
}

src/Http/Controllers/AgentsController.php

Lines changed: 23 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,9 @@
88
use Throwable;
99
use Cortex\Cortex;
1010
use Illuminate\Http\Request;
11+
use Cortex\Events\AgentStepEnd;
12+
use Cortex\Events\AgentStepError;
13+
use Cortex\Events\AgentStepStart;
1114
use Illuminate\Http\JsonResponse;
1215
use Illuminate\Routing\Controller;
1316
use Symfony\Component\HttpFoundation\StreamedResponse;
@@ -18,23 +21,33 @@ public function invoke(string $agent, Request $request): JsonResponse
1821
{
1922
try {
2023
$agent = Cortex::agent($agent);
24+
$agent->onStepStart(function (AgentStepStart $event): void {
25+
// dump(sprintf('step start: %d', $event->step));
26+
});
27+
$agent->onStepEnd(function (AgentStepEnd $event): void {
28+
// dump(sprintf('step end: %d', $event->step));
29+
});
30+
$agent->onStepError(function (AgentStepError $event): void {
31+
// dump(sprintf('step error: %d, %s', $event->step, $event->exception->getMessage()));
32+
});
2133
$result = $agent->invoke(input: $request->all());
2234
} catch (Throwable $e) {
2335
return response()->json([
2436
'error' => $e->getMessage(),
2537
], 500);
2638
}
2739

28-
dd([
29-
'result' => $result->toArray(),
30-
'memory' => $agent->getMemory()->getMessages()->toArray(),
31-
'total_usage' => $agent->getUsage()->toArray(),
32-
]);
40+
// dd([
41+
// 'result' => $result->toArray(),
42+
// 'memory' => $agent->getMemory()->getMessages()->toArray(),
43+
// 'total_usage' => $agent->getUsage()->toArray(),
44+
// ]);
3345

3446
return response()->json([
3547
'result' => $result,
36-
'memory' => $agent->getMemory()->getMessages()->toArray(),
37-
'total_usage' => $agent->getUsage()->toArray(),
48+
'config' => $agent->getRuntimeConfig()?->toArray(),
49+
// 'memory' => $agent->getMemory()->getMessages()->toArray(),
50+
// 'total_usage' => $agent->getUsage()->toArray(),
3851
]);
3952
}
4053

@@ -43,6 +56,9 @@ public function stream(string $agent, Request $request): StreamedResponse
4356
$result = Cortex::agent($agent)->stream(input: $request->all());
4457

4558
try {
59+
// foreach ($result->flatten(1) as $chunk) {
60+
// dump(sprintf('%s: %s', $chunk->type->value, $chunk->message->content));
61+
// }
4662
return $result->streamResponse();
4763
} catch (Exception $e) {
4864
dd($e);

src/Pipeline.php

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
use Cortex\Events\PipelineStart;
1717
use Cortex\Contracts\OutputParser;
1818
use Cortex\Pipeline\RuntimeConfig;
19+
use Cortex\Events\Contracts\StageEvent;
1920
use Illuminate\Support\Traits\Dumpable;
2021
use Cortex\Events\Contracts\PipelineEvent;
2122
use Cortex\Support\Traits\DispatchesEvents;
@@ -122,10 +123,10 @@ public function handlePipeable(mixed $payload, RuntimeConfig $config, Closure $n
122123
/**
123124
* Enable streaming for all LLMs in the pipeline.
124125
*/
125-
public function enableStreaming(): self
126+
public function enableStreaming(bool $streaming = true): self
126127
{
127128
foreach ($this->getStages() as $stage) {
128-
$this->setLLMStreaming($stage, true);
129+
$this->setLLMStreaming($stage, $streaming);
129130
}
130131

131132
return $this;
@@ -136,11 +137,7 @@ public function enableStreaming(): self
136137
*/
137138
public function disableStreaming(): self
138139
{
139-
foreach ($this->getStages() as $stage) {
140-
$this->setLLMStreaming($stage, false);
141-
}
142-
143-
return $this;
140+
return $this->enableStreaming(false);
144141
}
145142

146143
/**
@@ -156,7 +153,10 @@ public function output(OutputParser $parser): self
156153
*/
157154
protected function eventBelongsToThisInstance(object $event): bool
158155
{
159-
return $event instanceof PipelineEvent && $event->pipeline === $this;
156+
return match (true) {
157+
$event instanceof PipelineEvent, $event instanceof StageEvent => $event->pipeline === $this,
158+
default => false,
159+
};
160160
}
161161

162162
/**

tests/Unit/LLM/Drivers/Anthropic/AnthropicChatTest.php

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
use Cortex\Cortex;
88
use Cortex\LLM\Data\Usage;
99
use Cortex\Attributes\Tool;
10+
use Cortex\Tools\SchemaTool;
1011
use Cortex\JsonSchema\Schema;
1112
use Cortex\LLM\Data\ToolCall;
1213
use Cortex\LLM\Data\ChatResult;
@@ -172,7 +173,7 @@
172173
[
173174
'type' => 'tool_use',
174175
'id' => 'call_123',
175-
'name' => 'Person',
176+
'name' => SchemaTool::NAME,
176177
'input' => [
177178
'name' => 'John Doe',
178179
'age' => 30,
@@ -212,7 +213,7 @@
212213
&& $parameters['messages'][0]['role'] === 'user'
213214
&& $parameters['messages'][0]['content'] === 'Tell me about a person'
214215
&& $parameters['tools'][0]['type'] === 'custom'
215-
&& $parameters['tools'][0]['name'] === 'Person'
216+
&& $parameters['tools'][0]['name'] === SchemaTool::NAME
216217
&& $parameters['tools'][0]['input_schema']['properties']['name']['type'] === 'string'
217218
&& $parameters['tools'][0]['input_schema']['properties']['age']['type'] === 'integer';
218219
});

0 commit comments

Comments
 (0)