|
5 | 5 | namespace Cortex\Agents\Stages; |
6 | 6 |
|
7 | 7 | use Closure; |
| 8 | +use Generator; |
8 | 9 | use Cortex\Pipeline; |
9 | 10 | use Cortex\Contracts\Pipeable; |
10 | 11 | use Cortex\LLM\Data\ChatResult; |
| 12 | +use Cortex\LLM\Enums\ChunkType; |
11 | 13 | use Cortex\Contracts\ChatMemory; |
12 | 14 | use Cortex\Pipeline\RuntimeConfig; |
13 | 15 | use Cortex\Support\Traits\CanPipe; |
@@ -35,65 +37,118 @@ public function __construct( |
35 | 37 | ) {} |
36 | 38 |
|
37 | 39 | public function handlePipeable(mixed $payload, RuntimeConfig $config, Closure $next): mixed |
| 40 | + { |
| 41 | + return match (true) { |
| 42 | + $payload instanceof ChatGenerationChunk && $payload->type === ChunkType::ToolInputEnd => $this->handleStreamingChunk($payload, $config, $next), |
| 43 | + $payload instanceof ChatStreamResult => $this->handleStreamingResult($payload), |
| 44 | + default => $this->handleNonStreaming($payload, $config, $next), |
| 45 | + }; |
| 46 | + } |
| 47 | + |
| 48 | + /** |
| 49 | + * Handle streaming chunks (individual ChatGenerationChunk objects). |
| 50 | + */ |
| 51 | + protected function handleStreamingChunk(ChatGenerationChunk $chunk, RuntimeConfig $config, Closure $next): mixed |
| 52 | + { |
| 53 | + $processedChunk = $next($chunk, $config); |
| 54 | + |
| 55 | + // Process tool calls if needed |
| 56 | + if ($chunk->message->hasToolCalls() && $this->currentStep++ < $this->maxSteps) { |
| 57 | + $nestedPayload = $this->processToolCalls($chunk, $config); |
| 58 | + |
| 59 | + if ($nestedPayload !== null) { |
| 60 | + // Return stream with ToolInputEnd chunk + nested stream |
| 61 | + // AbstractLLM will yield from this stream |
| 62 | + return new ChatStreamResult(function () use ($processedChunk, $nestedPayload): Generator { |
| 63 | + if ($processedChunk instanceof ChatGenerationChunk) { |
| 64 | + yield $processedChunk; |
| 65 | + } |
| 66 | + |
| 67 | + if ($nestedPayload instanceof ChatStreamResult) { |
| 68 | + foreach ($nestedPayload as $nestedChunk) { |
| 69 | + yield $nestedChunk; |
| 70 | + } |
| 71 | + } |
| 72 | + }); |
| 73 | + } |
| 74 | + } |
| 75 | + |
| 76 | + return $processedChunk; |
| 77 | + } |
| 78 | + |
| 79 | + /** |
| 80 | + * Handle streaming results (ChatStreamResult from nested pipeline). |
| 81 | + */ |
| 82 | + protected function handleStreamingResult(ChatStreamResult $result): ChatStreamResult |
| 83 | + { |
| 84 | + // This happens when we return a nested stream - AbstractLLM will handle it |
| 85 | + return $result; |
| 86 | + } |
| 87 | + |
| 88 | + /** |
| 89 | + * Handle non-streaming payloads (ChatResult, ChatGeneration, etc.). |
| 90 | + */ |
| 91 | + protected function handleNonStreaming(mixed $payload, RuntimeConfig $config, Closure $next): mixed |
38 | 92 | { |
39 | 93 | $generation = $this->getGeneration($payload); |
40 | 94 |
|
41 | 95 | while ($generation?->message?->hasToolCalls() && $this->currentStep++ < $this->maxSteps) { |
42 | | - // Get the results of the tool calls, represented as tool messages. |
43 | | - $toolMessages = $generation->message->toolCalls->invokeAsToolMessages($this->tools); |
44 | | - |
45 | | - // If there are any tool messages, add them to the memory. |
46 | | - // And send them to the execution pipeline to get a new generation. |
47 | | - if ($toolMessages->isNotEmpty()) { |
48 | | - // @phpstan-ignore argument.type |
49 | | - $toolMessages->each(fn(ToolMessage $message) => $this->memory->addMessage($message)); |
50 | | - |
51 | | - // Track the next step before making the LLM call |
52 | | - $config->context->addNextStep(); |
53 | | - |
54 | | - // Send the tool messages to the execution stages to get a new generation. |
55 | | - // Create a temporary pipeline from the execution stages. |
56 | | - // Since this is a new Pipeline instance, its Pipeline events won't trigger |
57 | | - // the main pipeline's callbacks due to eventBelongsToThisInstance filtering. |
58 | | - $nestedPipeline = new Pipeline(...$this->executionStages); |
59 | | - |
60 | | - // Enable streaming on the nested pipeline if the config has streaming enabled |
61 | | - if ($config->streaming) { |
62 | | - $nestedPipeline->enableStreaming(); |
63 | | - } |
64 | | - |
65 | | - $payload = $nestedPipeline->invoke([ |
66 | | - 'messages' => $this->memory->getMessages(), |
67 | | - ...$this->memory->getVariables(), |
68 | | - ], $config); |
69 | | - |
70 | | - // If the payload is a stream result, append any stream buffer chunks |
71 | | - // (like StepStart/StepEnd) that were pushed during the nested pipeline execution |
72 | | - if ($payload instanceof ChatStreamResult) { |
73 | | - $payload = $payload->appendStreamBuffer($config); |
74 | | - } |
75 | | - |
76 | | - // Update the generation so that the loop can check the new generation for tool calls. |
77 | | - $generation = $this->getGeneration($payload); |
| 96 | + $nestedPayload = $this->processToolCalls($generation, $config); |
| 97 | + |
| 98 | + if ($nestedPayload !== null) { |
| 99 | + // Update the generation so that the loop can check the new generation for tool calls |
| 100 | + $generation = $this->getGeneration($nestedPayload); |
| 101 | + $payload = $nestedPayload; |
78 | 102 | } |
79 | 103 | } |
80 | 104 |
|
81 | | - // The final step is already properly set - no need to update it |
82 | | - // If it has tool calls, they were set in the while loop |
83 | | - // If it doesn't have tool calls, it was initialized with an empty ToolCallCollection |
84 | | - |
85 | 105 | return $next($payload, $config); |
86 | 106 | } |
87 | 107 |
|
| 108 | + /** |
| 109 | + * Process tool calls and return the nested pipeline result. |
| 110 | + * |
| 111 | + * @return ChatResult|ChatStreamResult|null Returns null if no tool calls to process |
| 112 | + */ |
| 113 | + protected function processToolCalls(ChatGeneration|ChatGenerationChunk $generation, RuntimeConfig $config): ChatResult|ChatStreamResult|null |
| 114 | + { |
| 115 | + $toolMessages = $generation->message->toolCalls->invokeAsToolMessages($this->tools); |
| 116 | + |
| 117 | + if ($toolMessages->isEmpty()) { |
| 118 | + return null; |
| 119 | + } |
| 120 | + |
| 121 | + // @phpstan-ignore argument.type |
| 122 | + $toolMessages->each(fn(ToolMessage $message) => $this->memory->addMessage($message)); |
| 123 | + |
| 124 | + $config->context->addNextStep(); |
| 125 | + |
| 126 | + $nestedPipeline = new Pipeline(...$this->executionStages) |
| 127 | + ->enableStreaming($config->streaming); |
| 128 | + |
| 129 | + $nestedPayload = $nestedPipeline->invoke([ |
| 130 | + 'messages' => $this->memory->getMessages(), |
| 131 | + ...$this->memory->getVariables(), |
| 132 | + ], $config); |
| 133 | + |
| 134 | + // If the payload is a stream result, append any stream buffer chunks |
| 135 | + // (like StepStart/StepEnd) that were pushed during the nested pipeline execution |
| 136 | + if ($nestedPayload instanceof ChatStreamResult) { |
| 137 | + return $nestedPayload->appendStreamBuffer($config); |
| 138 | + } |
| 139 | + |
| 140 | + return $nestedPayload; |
| 141 | + } |
| 142 | + |
88 | 143 | /** |
89 | 144 | * Get the generation from the payload. |
90 | 145 | */ |
91 | 146 | protected function getGeneration(mixed $payload): ChatGeneration|ChatGenerationChunk|null |
92 | 147 | { |
93 | 148 | return match (true) { |
94 | 149 | $payload instanceof ChatGeneration => $payload, |
95 | | - // When streaming, only the final chunk will contain the completed tool calls and content. |
96 | | - $payload instanceof ChatGenerationChunk && $payload->isFinal => $payload, |
| 150 | + // When streaming, We need to wait for the ToolInputEnd chunk to get the completed tool calls and content. |
| 151 | + $payload instanceof ChatGenerationChunk && $payload->type === ChunkType::ToolInputEnd => $payload, |
97 | 152 | $payload instanceof ChatResult => $payload->generation, |
98 | 153 | default => null, |
99 | 154 | }; |
|
0 commit comments