89 changes: 59 additions & 30 deletions backend/openmlr/agent/llm.py
@@ -6,7 +6,7 @@
from collections.abc import AsyncGenerator

from ..config import AgentConfig
from .types import LLMResult, ToolCall
from .types import LLMResult, ThinkingChunk, ToolCall


class LLMProvider:
@@ -112,6 +112,18 @@ def _is_anthropic_model(model_name: str) -> bool:
OpenRouter-routed Claude models use the OpenAI-compatible path."""
return model_name.lower().startswith("anthropic/")

@staticmethod
def _supports_thinking(model_name: str) -> bool:
"""Check if an Anthropic model supports extended thinking (Claude 3.7+, Claude 4+)."""
normalized = LLMProvider._normalize_model(model_name).lower()
thinking_patterns = [
"claude-3-7",
"claude-3.7", # Claude 3.7 Sonnet
"claude-sonnet-4",
"claude-opus-4", # Claude 4 family
]
return any(p in normalized for p in thinking_patterns)

@staticmethod
def _uses_anthropic_format(model_name: str, custom_providers: list | None = None) -> bool:
"""Check if model uses Anthropic message format (native Anthropic, OpenCode Go Anthropic, or custom provider with anthropic-sdk)."""
@@ -139,7 +151,7 @@ async def generate_stream(
messages: list[dict],
config: AgentConfig,
tools: list[dict] | None = None,
) -> AsyncGenerator[str | ToolCall | dict, None]:
) -> AsyncGenerator[str | ToolCall | ThinkingChunk | dict, None]:
async for chunk in LLMProvider._stream_with_retry(messages, config, tools):
yield chunk

@@ -217,7 +229,7 @@ async def _stream_with_retry(
messages: list[dict],
config: AgentConfig,
tools: list[dict] | None = None,
) -> AsyncGenerator[str | ToolCall | dict, None]:
) -> AsyncGenerator[str | ToolCall | ThinkingChunk | dict, None]:
last_error = None
for attempt in range(3):
try:
@@ -319,7 +331,7 @@ async def _stream_openai(
messages: list[dict],
config: AgentConfig,
tools: list[dict] | None,
) -> AsyncGenerator[str | ToolCall | dict, None]:
) -> AsyncGenerator[str | ToolCall | ThinkingChunk | dict, None]:
client = LLMProvider._openai_client(config)
model = LLMProvider._normalize_model(config.model_name, config.custom_providers)

@@ -358,6 +370,11 @@ async def _stream_openai(
if delta is None:
continue

# Reasoning content (OpenAI o1/o3 reasoning models)
reasoning = getattr(delta, "reasoning_content", None)
if reasoning:
yield ThinkingChunk(text=reasoning)

# Text content
if delta.content:
yield delta.content
@@ -511,16 +528,15 @@ def _anthropic_client(config: AgentConfig):
return AsyncAnthropic(api_key=os.environ.get("ANTHROPIC_API_KEY"))

@staticmethod
async def _call_anthropic(
messages: list[dict],
config: AgentConfig,
def _build_anthropic_params(
model: str,
chat_msgs: list[dict],
system_prompt: str,
tools: list[dict] | None,
) -> LLMResult:
model = LLMProvider._normalize_model(config.model_name, config.custom_providers)
client = LLMProvider._anthropic_client(config)
system_prompt, chat_msgs = LLMProvider._to_anthropic_messages(messages)

params = {"model": model, "messages": chat_msgs, "max_tokens": 4096}
model_name: str,
) -> dict:
"""Build the params dict shared by _call_anthropic and _stream_anthropic."""
params: dict = {"model": model, "messages": chat_msgs, "max_tokens": 4096}
if system_prompt:
params["system"] = [
{
@@ -532,13 +548,34 @@ async def _call_anthropic(
anthropic_tools = LLMProvider._anthropic_tool_param(tools)
if anthropic_tools:
params["tools"] = anthropic_tools

# Enable extended thinking for compatible models (Claude 3.7+, Claude 4+)
if LLMProvider._supports_thinking(model_name):
params["max_tokens"] = 16000
params["thinking"] = {"type": "enabled", "budget_tokens": 10000}
params["extra_headers"] = {"anthropic-beta": "prompt-caching-2024-07-31"}
return params

@staticmethod
async def _call_anthropic(
messages: list[dict],
config: AgentConfig,
tools: list[dict] | None,
) -> LLMResult:
model = LLMProvider._normalize_model(config.model_name, config.custom_providers)
client = LLMProvider._anthropic_client(config)
system_prompt, chat_msgs = LLMProvider._to_anthropic_messages(messages)

params = LLMProvider._build_anthropic_params(
model, chat_msgs, system_prompt, tools, config.model_name
)
response = await client.messages.create(**params)

tool_calls = []
text_content = ""
for block in response.content:
if block.type == "thinking":
# Thinking blocks are not included in the response text
continue
if block.type == "text":
text_content += block.text
elif block.type == "tool_use":
@@ -564,29 +601,21 @@ async def _stream_anthropic(
messages: list[dict],
config: AgentConfig,
tools: list[dict] | None,
) -> AsyncGenerator[str | ToolCall | dict, None]:
) -> AsyncGenerator[str | ToolCall | ThinkingChunk | dict, None]:
model = LLMProvider._normalize_model(config.model_name, config.custom_providers)
client = LLMProvider._anthropic_client(config)
system_prompt, chat_msgs = LLMProvider._to_anthropic_messages(messages)

params = {"model": model, "messages": chat_msgs, "max_tokens": 4096}
if system_prompt:
params["system"] = [
{
"type": "text",
"text": system_prompt,
"cache_control": {"type": "ephemeral"},
}
]
anthropic_tools = LLMProvider._anthropic_tool_param(tools)
if anthropic_tools:
params["tools"] = anthropic_tools

params["extra_headers"] = {"anthropic-beta": "prompt-caching-2024-07-31"}
params = LLMProvider._build_anthropic_params(
model, chat_msgs, system_prompt, tools, config.model_name
)
async with client.messages.stream(**params) as stream:
async for event in stream:
if event.type == "content_block_delta":
if event.delta.type == "text_delta":
if event.delta.type == "thinking_delta":
# Extended thinking content
yield ThinkingChunk(text=event.delta.thinking)
elif event.delta.type == "text_delta":
yield event.delta.text

if event.type == "message_delta" and event.usage:
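A quick way to read the llm.py changes above: _build_anthropic_params now centralizes the request parameters that _call_anthropic and _stream_anthropic previously assembled separately, and it is also the place where extended thinking is switched on. The sketch below is illustrative only, not code from the PR; the import path and the model id are assumptions, and the asserted values simply mirror the constants in the diff (16000 max tokens, a 10000-token thinking budget, and the prompt-caching beta header).

```python
# Illustrative sketch only (not part of the PR): checking what the shared
# builder returns for a thinking-capable model. The import path and model id
# are assumptions; the asserted values mirror the constants in the diff.
from openmlr.agent.llm import LLMProvider  # assumed importable module path

chat_msgs = [{"role": "user", "content": "Summarize the latest training run."}]
system_prompt = "You are a careful ML research assistant."

params = LLMProvider._build_anthropic_params(
    model="claude-sonnet-4-20250514",       # assumed normalized model id
    chat_msgs=chat_msgs,
    system_prompt=system_prompt,
    tools=None,
    model_name="claude-sonnet-4-20250514",
)

# Claude 3.7+/4 models get a larger completion budget plus extended thinking.
assert params["max_tokens"] == 16000
assert params["thinking"] == {"type": "enabled", "budget_tokens": 10000}
assert params["extra_headers"] == {"anthropic-beta": "prompt-caching-2024-07-31"}
```

For a non-thinking model id, the same call should leave max_tokens at 4096 and omit the thinking key, per the guard in _supports_thinking.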
48 changes: 46 additions & 2 deletions backend/openmlr/agent/loop.py
@@ -2,13 +2,14 @@

import asyncio
import json
import time
import traceback

from ..config import AgentConfig
from .doom_loop import detect_doom_loop
from .llm import LLMProvider
from .session import Session
from .types import AgentEvent, LLMResult, Message, OpType, Submission, ToolCall
from .types import AgentEvent, LLMResult, Message, OpType, Submission, ThinkingChunk, ToolCall


def _append_hint_to_last_user_msg(messages: list[Message], hint: str) -> None:
@@ -319,12 +320,35 @@ async def _stream_llm_call(
content_buffer = ""
tool_calls: list[ToolCall] = []
usage_data = None
thinking_started: float | None = None
was_thinking = False

async for chunk in LLMProvider.generate_stream(messages, session.config, tools):
if session.is_cancelled():
return None

if isinstance(chunk, str):
if isinstance(chunk, ThinkingChunk):
# Extended thinking / reasoning content
if thinking_started is None:
thinking_started = time.time()
was_thinking = True
await session.emit(
AgentEvent(
event_type="thinking_chunk",
data={"chunk": chunk.text},
)
)
elif isinstance(chunk, str):
# Transition from thinking to text — emit thinking_end
if was_thinking:
duration = time.time() - thinking_started if thinking_started else 0
await session.emit(
AgentEvent(
event_type="thinking_end",
data={"duration_seconds": round(duration, 1)},
)
)
was_thinking = False
content_buffer += chunk
await session.emit(
AgentEvent(
@@ -333,6 +357,16 @@
)
)
elif isinstance(chunk, ToolCall):
# Transition from thinking to tool call — emit thinking_end
if was_thinking:
duration = time.time() - thinking_started if thinking_started else 0
await session.emit(
AgentEvent(
event_type="thinking_end",
data={"duration_seconds": round(duration, 1)},
)
)
was_thinking = False
tool_calls.append(chunk)
await session.emit(
AgentEvent(
@@ -350,6 +384,16 @@
if chunk.get("event") == "usage":
usage_data = chunk.get("usage")

# If thinking was still active at end of stream (no text/tool followed), close it
if was_thinking and thinking_started:
duration = time.time() - thinking_started
await session.emit(
AgentEvent(
event_type="thinking_end",
data={"duration_seconds": round(duration, 1)},
)
)

if content_buffer or tool_calls:
await session.emit(AgentEvent(event_type="assistant_stream_end"))

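The loop.py changes above track when thinking output starts, emit thinking_chunk events while it streams, and emit a single thinking_end event (with a rounded duration) on the transition to text, to a tool call, or at the end of the stream. Below is a self-contained sketch of that lifecycle, using a fake stream and print statements in place of LLMProvider.generate_stream and session.emit; all names here are illustrative, not the PR's code.

```python
# Self-contained sketch (illustrative names, not the PR's code) of the
# thinking_chunk / thinking_end bookkeeping that _stream_llm_call performs.
import asyncio
import time
from collections.abc import AsyncGenerator
from dataclasses import dataclass


@dataclass
class ThinkingChunk:
    text: str


async def fake_stream() -> AsyncGenerator[str | ThinkingChunk, None]:
    # Stand-in for LLMProvider.generate_stream: thinking first, then text.
    yield ThinkingChunk(text="Planning the experiment,")
    await asyncio.sleep(0.1)
    yield ThinkingChunk(text=" comparing optimizers.")
    yield "Here is the plan:"


async def consume() -> None:
    thinking_started: float | None = None
    was_thinking = False
    async for chunk in fake_stream():
        if isinstance(chunk, ThinkingChunk):
            if thinking_started is None:
                thinking_started = time.time()
            was_thinking = True
            print("thinking_chunk:", chunk.text)  # loop.py emits an AgentEvent here
        else:
            if was_thinking:
                # Transition out of thinking: close it with a duration, once.
                duration = time.time() - thinking_started if thinking_started else 0
                print(f"thinking_end: {round(duration, 1)}s")
                was_thinking = False
            print("assistant_chunk:", chunk)


asyncio.run(consume())
```

Running it prints two thinking_chunk lines, one thinking_end of roughly 0.1s, then the assistant text, which matches the event order the frontend handlers below expect.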
7 changes: 7 additions & 0 deletions backend/openmlr/agent/types.py
@@ -6,6 +6,13 @@
from typing import Any


@dataclass
class ThinkingChunk:
"""A chunk of thinking/reasoning content from the LLM."""

text: str


@dataclass
class ToolCall:
"""A tool call requested by the LLM."""
53 changes: 51 additions & 2 deletions frontend/src/App.tsx
@@ -117,7 +117,7 @@
const { uuid: routeUuid } = useParams<{ uuid: string }>();

const [messages, setMessages] = useState<Message[]>([]);
const [approvalEvent, setApprovalEvent] = useState<any>(null);

GitHub Actions / Frontend Lint warning on line 120: Unexpected any. Specify a different type
const [conversations, setConversations] = useState<Conversation[]>([]);
const [currentConvUuid, setCurrentConvUuid] = useState<string | null>(routeUuid || null);
const [convStatuses, setConvStatuses] = useState<Record<string, ConvStatus>>({});
@@ -130,8 +130,8 @@
const [viewingReport, setViewingReport] = useState<Resource | null>(null);
const [inputMode, setInputMode] = useState<Mode>('plan');
const [inputText, setInputText] = useState('');
const [computeNodes, setComputeNodes] = useState<any[]>([]);

GitHub Actions / Frontend Lint warning on line 133: Unexpected any. Specify a different type
const [activeCompute, setActiveCompute] = useState<any>(null);

GitHub Actions / Frontend Lint warning on line 134: Unexpected any. Specify a different type
const [projects, setProjects] = useState<Project[]>([]);
const [activeProject, setActiveProject] = useState<Project | null>(null);
const [showProjectModal, setShowProjectModal] = useState(false);
@@ -272,7 +272,7 @@
}
};
init();
}, []);

GitHub Actions / Frontend Lint warning on line 275: React Hook useEffect has missing dependencies: 'currentConvUuid', 'loadComputeNodes', 'loadConversations', 'loadMcpServers', 'navigate', 'routeUuid', 'setModel', and 'switchConv'. Either include them or remove the dependency array. If 'setModel' changes too often, find the parent component that defines it and wrap that definition in useCallback

// Reload conversations when activeProject changes and auto-select the first
useEffect(() => {
@@ -291,14 +291,14 @@
setResources([]);
}
});
}, [activeProject, loadConversations]);

GitHub Actions / Frontend Lint warning on line 294: React Hook useEffect has missing dependencies: 'navigate' and 'switchConv'. Either include them or remove the dependency array

// Handle navigation to a different conversation via URL change
useEffect(() => {
if (routeUuid && routeUuid !== currentConvUuid) {
switchConv(routeUuid);
}
}, [routeUuid]);

GitHub Actions / Frontend Lint warning on line 301: React Hook useEffect has missing dependencies: 'currentConvUuid' and 'switchConv'. Either include them or remove the dependency array

const setCurrentConvStatus = useCallback((status: ConvStatus) => {
setCurrentConvUuid((uuid) => {
@@ -327,8 +327,8 @@
setApprovalEvent(null); setQuestionsPayload(null); setTodoApprovalPayload(null);

// Load persisted tasks and resources from database
setTasks(data.tasks?.map((t: any) => ({ title: t.title, status: t.status })) || []);

GitHub Actions / Frontend Lint warning on line 330: Unexpected any. Specify a different type
setResources(data.resources?.map((r: any) => ({

GitHub Actions / Frontend Lint warning on line 331: Unexpected any. Specify a different type
title: r.title,
url: r.url || '',
type: r.type,
@@ -340,7 +340,7 @@
setRightPanelOpen(true);
}

setMessages(data.messages?.map((m: any) => {

GitHub Actions / Frontend Lint warning on line 343: Unexpected any. Specify a different type
if (m.role === 'tool') {
const meta = m.metadata || {};
return { id: nextId(), role: 'tool' as const, content: '', metadata: { tool: meta.tool || 'tool', args: '', output: m.content, outputSuccess: meta.success !== false } };
@@ -413,7 +413,7 @@
// Guard: only apply if this is still the active conversation
if (uuid !== currentConvUuidRef.current) return;
if (data.messages) {
setMessages(data.messages.map((m: any) => {

GitHub Actions / Frontend Lint warning on line 416: Unexpected any. Specify a different type
if (m.role === 'tool') {
const meta = m.metadata || {};
return { id: nextId(), role: 'tool' as const, content: '', metadata: { tool: meta.tool || 'tool', args: '', output: m.content, outputSuccess: meta.success !== false } };
@@ -480,12 +480,51 @@
return [...prev, { id: nextId(), role: 'system', content: '::thinking::' }];
});
break;
case 'thinking_chunk':
case 'thinking_end': {
if (event_type === 'thinking_chunk') {
const tchunk = data?.chunk || '';
if (!tchunk) break;
setMessages((prev) => {
// Remove plain ::thinking:: indicator if present
let msgs = prev;
if (msgs.length > 0 && msgs[msgs.length - 1].content === '::thinking::') msgs = msgs.slice(0, -1);
// Append to existing thinking message or create new one
const last = msgs[msgs.length - 1];
if (last?.role === 'system' && last.content === '::thinking_content::') {
const updated = [...msgs];
updated[updated.length - 1] = { ...last, thinking: (last.thinking || '') + tchunk };
return updated;
}
return [...msgs, { id: nextId(), role: 'system', content: '::thinking_content::', thinking: tchunk }];
});
} else {
const duration = data?.duration_seconds || 0;
setMessages((prev) => {
const idx = findLastIndex(prev, (m: Message) => m.role === 'system' && m.content === '::thinking_content::');
if (idx >= 0) {
const updated = [...prev];
updated[idx] = { ...updated[idx], thinkingDuration: duration };
return updated;
}
return prev;
});
}
break;
}
case 'assistant_chunk': {
const chunk = data?.chunk || data?.content || '';
if (!chunk) break;
setMessages((prev) => {
let msgs = prev;
// Remove plain ::thinking:: indicator if present
if (msgs.length > 0 && msgs[msgs.length - 1].content === '::thinking::') msgs = msgs.slice(0, -1);
// Collapse thinking block when reply starts
const thinkIdx = findLastIndex(msgs, (m: Message) => m.role === 'system' && m.content === '::thinking_content::' && !m.thinkingCollapsed);
if (thinkIdx >= 0) {
msgs = [...msgs];
msgs[thinkIdx] = { ...msgs[thinkIdx], thinkingCollapsed: true };
}
const last = msgs[msgs.length - 1];
if (last?.role === 'assistant' && last.streaming) {
const updated = [...msgs]; updated[updated.length - 1] = { ...last, content: last.content + chunk }; return updated;
@@ -512,7 +551,13 @@
break;
case 'tool_call':
setMessages((prev) => {
const msgs = prev.filter((m) => !(m.role === 'system' && m.content === '::thinking::'));
let msgs = prev.filter((m) => !(m.role === 'system' && m.content === '::thinking::'));
// Collapse thinking block when tool call arrives
const thinkIdx = findLastIndex(msgs, (m: Message) => m.role === 'system' && m.content === '::thinking_content::' && !m.thinkingCollapsed);
if (thinkIdx >= 0) {
msgs = [...msgs];
msgs[thinkIdx] = { ...msgs[thinkIdx], thinkingCollapsed: true };
}
return [...msgs, { id: nextId(), role: 'tool', content: '', metadata: { tool: data?.tool ?? '', tool_call_id: data?.id, args: typeof data?.arguments === 'string' ? data.arguments.slice(0, 120) : JSON.stringify(data?.arguments ?? {}).slice(0, 120) } }];
});
break;
@@ -608,7 +653,11 @@
// Cancel any pending job_complete reload — SSE events already updated state
if (reloadTimerRef.current) { clearTimeout(reloadTimerRef.current); reloadTimerRef.current = null; }
setMessages((prev) => {
const c = prev.filter((m) => !(m.role === 'system' && m.content === '::thinking::'));
// Remove plain thinking indicator, collapse any uncollapsed thinking blocks
const c = prev
.filter((m) => !(m.role === 'system' && m.content === '::thinking::'))
.map((m) => (m.role === 'system' && m.content === '::thinking_content::' && !m.thinkingCollapsed)
? { ...m, thinkingCollapsed: true } : m);
const last = c[c.length - 1];
setCurrentConvStatus(last?.role === 'assistant' && last.content.trim().endsWith('?') ? 'waiting_input' : 'idle');
return c;
4 changes: 2 additions & 2 deletions frontend/src/components/InputArea.tsx
@@ -250,8 +250,8 @@ export const InputArea = React.memo(function InputArea({ disabled, showStop, mod
/>
</div>

{/* Stop button */}
{showStop && (
{/* Stop button — only during active processing (not waiting_input/waiting_approval) */}
{showStop && disabled && (
<button
className="h-11 w-11 rounded-lg flex items-center justify-center bg-error text-white hover:opacity-90 transition-all shrink-0"
onClick={onStop}