Skip to content

Commit a610c60

Browse files
committed
wip
1 parent 4d9a484 commit a610c60

File tree

18 files changed

+399
-46
lines changed

18 files changed

+399
-46
lines changed

src/Console/ChatPrompt.php

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -407,6 +407,8 @@ protected function processAgentResponse(string $userInput): void
407407

408408
// Stream response in real-time
409409
foreach ($result as $chunk) {
410+
$textSoFar = $chunk->textSoFar();
411+
410412
// Handle tool calls in debug mode
411413
if ($this->debug) {
412414
match ($chunk->type) {
@@ -417,9 +419,9 @@ protected function processAgentResponse(string $userInput): void
417419
};
418420
}
419421

420-
if ($chunk->type === ChunkType::TextDelta && $chunk->contentSoFar !== '') {
421-
$fullResponse = $chunk->contentSoFar;
422-
$this->streamingContent = $chunk->contentSoFar;
422+
if ($chunk->type === ChunkType::TextDelta && $textSoFar !== null) {
423+
$fullResponse = $textSoFar;
424+
$this->streamingContent = $textSoFar;
423425

424426
// Auto-scroll to bottom during streaming if enabled
425427
if ($this->autoScroll) {
@@ -430,9 +432,9 @@ protected function processAgentResponse(string $userInput): void
430432
$this->throttledRender();
431433
}
432434

433-
if ($chunk->isFinal && $chunk->contentSoFar !== '') {
434-
$fullResponse = $chunk->contentSoFar;
435-
$this->streamingContent = $chunk->contentSoFar;
435+
if ($chunk->isFinal && $textSoFar !== null) {
436+
$fullResponse = $textSoFar;
437+
$this->streamingContent = $textSoFar;
436438
// Force render for final chunk (not throttled)
437439
$this->render();
438440
}

src/LLM/AbstractLLM.php

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,13 @@ public function __construct(
103103
}
104104
}
105105

106+
public function stream(
107+
MessageCollection|Message|array|string $messages,
108+
array $additionalParameters = [],
109+
): ChatStreamResult {
110+
return $this->withStreaming(true)->invoke($messages, $additionalParameters);
111+
}
112+
106113
public function handlePipeable(mixed $payload, RuntimeConfig $config, Closure $next): mixed
107114
{
108115
$this->shouldParseOutput($config->context->shouldParseOutput());

src/LLM/Contracts/LLM.php

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,19 @@ public function invoke(
3131
array $additionalParameters = [],
3232
): ChatResult|ChatStreamResult;
3333

34+
/**
35+
* Convenience method to stream the LLM response.
36+
*
37+
* @param \Cortex\LLM\Data\Messages\MessageCollection|\Cortex\LLM\Contracts\Message|array<int, \Cortex\LLM\Contracts\Message>|string $messages
38+
* @param array<string, mixed> $additionalParameters
39+
*
40+
* @return \Cortex\LLM\Data\ChatStreamResult<\Cortex\LLM\Data\ChatGenerationChunk>
41+
*/
42+
public function stream(
43+
MessageCollection|Message|array|string $messages,
44+
array $additionalParameters = [],
45+
): ChatStreamResult;
46+
3447
/**
3548
* Specify the tools to use for the LLM.
3649
*

src/LLM/Data/ChatGenerationChunk.php

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,17 +8,21 @@
88
use DateTimeImmutable;
99
use DateTimeInterface;
1010
use Cortex\LLM\Enums\ChunkType;
11+
use Cortex\LLM\Contracts\Content;
1112
use Cortex\LLM\Enums\FinishReason;
1213
use Cortex\LLM\Data\Messages\ToolMessage;
1314
use Illuminate\Contracts\Support\Arrayable;
1415
use Cortex\LLM\Data\Messages\AssistantMessage;
16+
use Cortex\LLM\Data\Messages\Content\TextContent;
17+
use Cortex\LLM\Data\Messages\Content\ReasoningContent;
1518

1619
/**
1720
* @implements Arrayable<string, mixed>
1821
*/
1922
readonly class ChatGenerationChunk implements Arrayable
2023
{
2124
/**
25+
* @param array<\Cortex\LLM\Contracts\Content> $contentSoFar
2226
* @param array<string, mixed>|null $rawChunk
2327
* @param array<string, mixed> $metadata
2428
*/
@@ -29,7 +33,7 @@ public function __construct(
2933
public DateTimeInterface $createdAt = new DateTimeImmutable(),
3034
public ?FinishReason $finishReason = null,
3135
public ?Usage $usage = null,
32-
public string $contentSoFar = '',
36+
public array $contentSoFar = [],
3337
public bool $isFinal = false,
3438
public mixed $parsedOutput = null,
3539
public ?string $outputParserError = null,
@@ -48,6 +52,30 @@ public function text(): ?string
4852
return $this->message->text();
4953
}
5054

55+
/**
56+
* Get the text content that has been streamed so far.
57+
*/
58+
public function textSoFar(): ?string
59+
{
60+
/** @var \Cortex\LLM\Data\Messages\Content\TextContent|null $textContent */
61+
$textContent = collect($this->contentSoFar)
62+
->first(fn(Content $content): bool => $content instanceof TextContent);
63+
64+
return $textContent?->text;
65+
}
66+
67+
/**
68+
* Get the reasoning content that has been streamed so far.
69+
*/
70+
public function reasoningSoFar(): ?string
71+
{
72+
/** @var \Cortex\LLM\Data\Messages\Content\ReasoningContent|null $reasoningContent */
73+
$reasoningContent = collect($this->contentSoFar)
74+
->first(fn(Content $content): bool => $content instanceof ReasoningContent);
75+
76+
return $reasoningContent?->reasoning;
77+
}
78+
5179
public function cloneWithParsedOutput(mixed $parsedOutput): self
5280
{
5381
return new self(

src/LLM/Data/ChatStreamResult.php

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
use Cortex\Events\RuntimeConfigStreamChunk;
1313
use Cortex\LLM\Data\Messages\AssistantMessage;
1414
use Cortex\LLM\Data\Concerns\HasStreamResponses;
15+
use Cortex\LLM\Data\Messages\Content\TextContent;
1516

1617
/**
1718
* @extends LazyCollection<int, \Cortex\LLM\Data\ChatGenerationChunk>
@@ -93,7 +94,7 @@ public static function fake(?string $string = null, ?ToolCallCollection $toolCal
9394
completionTokens: $index,
9495
totalTokens: $index,
9596
),
96-
contentSoFar: $contentSoFar,
97+
contentSoFar: [new TextContent($contentSoFar)],
9798
isFinal: $isFinal,
9899
);
99100

src/LLM/Data/Messages/Content/ReasoningContent.php

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,4 +10,11 @@ public function __construct(
1010
public string $id,
1111
public string $reasoning,
1212
) {}
13+
14+
public function append(string $reasoning): self
15+
{
16+
$this->reasoning .= $reasoning;
17+
18+
return $this;
19+
}
1320
}

src/LLM/Data/Messages/Content/TextContent.php

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,4 +27,15 @@ public function replaceVariables(array $variables): static
2727

2828
return new self($this->getCompiler()->compile($this->text, $variables));
2929
}
30+
31+
public function append(string $text): self
32+
{
33+
if ($this->text === null) {
34+
return new self($text);
35+
}
36+
37+
$this->text .= $text;
38+
39+
return $this;
40+
}
3041
}

src/LLM/Drivers/Anthropic/Concerns/MapStreamResponse.php

Lines changed: 60 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,13 @@
1212
use Cortex\LLM\Data\ChatGenerationChunk;
1313
use Cortex\LLM\Data\Messages\AssistantMessage;
1414
use Cortex\SDK\Anthropic\Contracts\StreamEvent;
15+
use Cortex\LLM\Data\Messages\Content\TextContent;
1516
use Cortex\SDK\Anthropic\Data\Messages\MessageStream;
17+
use Cortex\LLM\Data\Messages\Content\ReasoningContent;
1618
use Cortex\SDK\Anthropic\Data\Messages\Streaming\TextDelta;
1719
use Cortex\SDK\Anthropic\Data\Messages\Streaming\ThinkingDelta;
1820
use Cortex\SDK\Anthropic\Data\Messages\Streaming\InputJsonDelta;
21+
use Cortex\SDK\Anthropic\Data\Messages\Streaming\SignatureDelta;
1922
use Cortex\SDK\Anthropic\Data\Messages\Streaming\Events\MessageStop;
2023
use Cortex\SDK\Anthropic\Data\Messages\Streaming\Events\MessageDelta;
2124
use Cortex\SDK\Anthropic\Data\Messages\Streaming\Events\MessageStart;
@@ -40,7 +43,9 @@ protected function mapStreamResponse(MessageStream $response): ChatStreamResult
4043
return new ChatStreamResult(function () use ($response): Generator {
4144
yield from $this->streamBuffer?->drain() ?? [];
4245

43-
$contentSoFar = '';
46+
$contentSoFar = [];
47+
$currentTextContent = null;
48+
$currentReasoningContent = null;
4449

4550
/** @var \Cortex\SDK\Anthropic\Contracts\StreamEvent $event */
4651
foreach ($response as $event) {
@@ -58,13 +63,59 @@ protected function mapStreamResponse(MessageStream $response): ChatStreamResult
5863
? $this->mapFinishReason($event->stopReason)
5964
: null;
6065

61-
$contentSoFar .= $event instanceof ContentBlockDelta
62-
? ($event->delta instanceof TextDelta
63-
? $event->delta->text
64-
: ($event->delta instanceof InputJsonDelta
65-
? $event->delta->partialJson
66-
: $event->delta->thinking))
67-
: '';
66+
// Handle ContentBlockStart - initialize new content objects
67+
if ($event instanceof ContentBlockStart) {
68+
if ($event->contentBlock instanceof TextContentBlock) {
69+
$currentTextContent = new TextContent($event->contentBlock->text);
70+
} elseif ($event->contentBlock instanceof ThinkingContentBlock) {
71+
$currentReasoningContent = new ReasoningContent(
72+
$event->contentBlock->signature,
73+
$event->contentBlock->thinking,
74+
);
75+
} elseif ($event->contentBlock instanceof RedactedThinkingContentBlock) {
76+
// For redacted thinking, we might use the data field as id
77+
$currentReasoningContent = new ReasoningContent(
78+
$event->contentBlock->data,
79+
$event->contentBlock->text,
80+
);
81+
}
82+
}
83+
84+
// Handle ContentBlockDelta - append incremental content
85+
if ($event instanceof ContentBlockDelta) {
86+
if ($event->delta instanceof TextDelta && $currentTextContent !== null) {
87+
$currentTextContent = $currentTextContent->append($event->delta->text);
88+
} elseif ($event->delta instanceof ThinkingDelta && $currentReasoningContent !== null) {
89+
$currentReasoningContent = $currentReasoningContent->append($event->delta->thinking);
90+
} elseif ($event->delta instanceof SignatureDelta && $currentReasoningContent !== null) {
91+
// Update the ID if signature delta comes after content block start
92+
$currentReasoningContent = new ReasoningContent(
93+
$event->delta->signature,
94+
$currentReasoningContent->reasoning,
95+
);
96+
}
97+
}
98+
99+
// Handle ContentBlockStop - finalize and add to contentSoFar
100+
if ($event instanceof ContentBlockStop) {
101+
if ($currentTextContent !== null) {
102+
$contentSoFar[] = $currentTextContent;
103+
$currentTextContent = null;
104+
} elseif ($currentReasoningContent !== null) {
105+
$contentSoFar[] = $currentReasoningContent;
106+
$currentReasoningContent = null;
107+
}
108+
}
109+
110+
// Build current contentSoFar array including partial content being built
111+
// Clone content objects to avoid mutating references in previous chunks
112+
$currentContentSoFar = [...$contentSoFar];
113+
114+
if ($currentTextContent !== null) {
115+
$currentContentSoFar[] = new TextContent($currentTextContent->text);
116+
} elseif ($currentReasoningContent !== null) {
117+
$currentContentSoFar[] = new ReasoningContent($currentReasoningContent->id, $currentReasoningContent->reasoning);
118+
}
68119

69120
$id = $event instanceof MessageStart
70121
? $event->message->id
@@ -95,7 +146,7 @@ protected function mapStreamResponse(MessageStream $response): ChatStreamResult
95146
createdAt: $meta->createdAt ?? new DateTimeImmutable(),
96147
finishReason: $finishReason,
97148
usage: $usage,
98-
contentSoFar: $contentSoFar,
149+
contentSoFar: $currentContentSoFar,
99150
isFinal: $usage !== null,
100151
rawChunk: $this->includeRaw ? $event->raw() : null,
101152
);

src/LLM/Drivers/OpenAI/Chat/Concerns/MapsStreamResponse.php

Lines changed: 27 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
use Cortex\OutputParsers\JsonOutputParser;
2222
use Cortex\Exceptions\OutputParserException;
2323
use Cortex\LLM\Data\Messages\AssistantMessage;
24+
use Cortex\LLM\Data\Messages\Content\TextContent;
2425
use OpenAI\Responses\Chat\CreateStreamedResponseChoice;
2526

2627
/** @mixin \Cortex\LLM\Drivers\OpenAI\Chat\OpenAIChat */
@@ -36,7 +37,8 @@ trait MapsStreamResponse
3637
protected function mapStreamResponse(StreamResponse $response): ChatStreamResult
3738
{
3839
return new ChatStreamResult(function () use ($response): Generator {
39-
$contentSoFar = '';
40+
$contentSoFar = [];
41+
$currentTextContent = null;
4042
$toolCallsSoFar = [];
4143
$isActiveText = false;
4244
$finishReason = null;
@@ -68,8 +70,14 @@ protected function mapStreamResponse(StreamResponse $response): ChatStreamResult
6870
$usage,
6971
);
7072

71-
// Now update content and tool call tracking
72-
$contentSoFar .= $choice->delta->content;
73+
// Handle text content - initialize or append
74+
if ($choice->delta->content !== null) {
75+
if ($currentTextContent === null) {
76+
$currentTextContent = new TextContent($choice->delta->content);
77+
} else {
78+
$currentTextContent = $currentTextContent->append($choice->delta->content);
79+
}
80+
}
7381

7482
// Track tool calls across chunks
7583
foreach ($choice->delta->toolCalls as $toolCall) {
@@ -158,11 +166,25 @@ protected function mapStreamResponse(StreamResponse $response): ChatStreamResult
158166
}
159167
}
160168

169+
// Finalize text content if this is the end
170+
if ($usage !== null && $currentTextContent !== null) {
171+
$contentSoFar[] = $currentTextContent;
172+
$currentTextContent = null;
173+
}
174+
175+
// Build current contentSoFar array including partial content being built
176+
// Clone TextContent to avoid mutating references in previous chunks
177+
$currentContentSoFar = [...$contentSoFar];
178+
179+
if ($currentTextContent !== null) {
180+
$currentContentSoFar[] = new TextContent($currentTextContent->text);
181+
}
182+
161183
$chatGenerationChunk = new ChatGenerationChunk(
162184
type: $chunkType,
163185
id: $chunk->id,
164186
message: new AssistantMessage(
165-
content: $choice->delta->content ?? null,
187+
content: $chunk->choices !== [] ? ($chunk->choices[0]->delta->content ?? null) : null,
166188
toolCalls: $accumulatedToolCallsSoFar ?? null,
167189
metadata: new ResponseMetadata(
168190
id: $chunk->id,
@@ -175,7 +197,7 @@ protected function mapStreamResponse(StreamResponse $response): ChatStreamResult
175197
createdAt: DateTimeImmutable::createFromFormat('U', (string) $chunk->created),
176198
finishReason: $usage !== null ? $finishReason : null,
177199
usage: $usage,
178-
contentSoFar: $contentSoFar,
200+
contentSoFar: $currentContentSoFar,
179201
isFinal: $usage !== null,
180202
rawChunk: $this->includeRaw ? $chunk->toArray() : null,
181203
);

0 commit comments

Comments
 (0)