Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
124 changes: 108 additions & 16 deletions src-tauri/src/services/chat_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,42 @@ fn needs_visual_guide(content: &str) -> bool {
VISUAL_KEYWORDS.iter().any(|kw| lower.contains(kw))
}

#[derive(Debug, Clone, serde::Serialize)]
#[serde(rename_all = "camelCase")]
pub struct TokenUsage {
pub prompt_tokens: u32,
pub completion_tokens: u32,
pub total_tokens: u32,
}

#[derive(Debug, Clone, serde::Serialize)]
#[serde(rename_all = "camelCase")]
struct ChatUsageEvent {
session_id: String,
usage: TokenUsage,
}

#[derive(Debug, Default)]
struct UsageAccumulator {
prompt_tokens: u32,
completion_tokens: u32,
}

impl UsageAccumulator {
fn finish(self) -> Option<TokenUsage> {
let total = self.prompt_tokens + self.completion_tokens;
if total == 0 {
None
} else {
Some(TokenUsage {
prompt_tokens: self.prompt_tokens,
completion_tokens: self.completion_tokens,
total_tokens: total,
})
}
}
}

pub async fn get_messages(db: &SqlitePool, session_id: &str) -> AppResult<Vec<Message>> {
let messages = sqlx::query_as::<_, Message>(
"SELECT id, session_id, role, content, created_at FROM messages \
Expand Down Expand Up @@ -280,7 +316,7 @@ async fn send_message_inner(
if provider.uses_anthropic_format() { "anthropic" } else { "openai" },
);

let assistant_output = if provider.uses_anthropic_format() {
let (assistant_output, token_usage) = if provider.uses_anthropic_format() {
send_anthropic(
history,
model,
Expand All @@ -292,17 +328,29 @@ async fn send_message_inner(
)
.await?
} else {
let supports_stream_usage = provider.provider_type == "openai";
send_openai_compatible(
&provider.base_url,
model,
provider.api_key.as_deref(),
supports_stream_usage,
history,
&on_token,
&cancel_token,
)
.await?
};

if let Some(usage) = token_usage {
let _ = app_handle.emit(
"chat-usage",
ChatUsageEvent {
session_id: session_id.to_string(),
usage,
},
);
}

let assistant_message = Message {
id: Uuid::new_v4().to_string(),
session_id: session_id.to_string(),
Expand Down Expand Up @@ -330,10 +378,11 @@ async fn send_openai_compatible(
base_url: &str,
model: &str,
api_key: Option<&str>,
include_usage: bool,
history: Vec<Message>,
on_token: &Channel<String>,
cancel_token: &CancellationToken,
) -> AppResult<String> {
) -> AppResult<(String, Option<TokenUsage>)> {
let client = http_client::streaming_client()?;
let endpoint = format!("{}/chat/completions", base_url.trim_end_matches('/'));

Expand All @@ -342,7 +391,7 @@ async fn send_openai_compatible(
.map(|m| serde_json::json!({ "role": m.role, "content": m.content }))
.collect();

let payload = serde_json::json!({
let mut payload = serde_json::json!({
"model": model,
"messages": messages,
"temperature": 0.2,
Expand All @@ -351,6 +400,10 @@ async fn send_openai_compatible(
"stream": true,
});

if include_usage {
payload["stream_options"] = serde_json::json!({ "include_usage": true });
}

// Lazy system prompt: only inject full preview guide when user asks for visuals
let last_user_content = history.iter().rev().find(|m| m.role == "user").map(|m| m.content.as_str()).unwrap_or("");
let system_instructions = if needs_visual_guide(last_user_content) {
Expand Down Expand Up @@ -412,7 +465,7 @@ async fn send_anthropic(
base_url: &str,
on_token: &Channel<String>,
cancel_token: &CancellationToken,
) -> AppResult<String> {
) -> AppResult<(String, Option<TokenUsage>)> {
let client = http_client::streaming_client()?;

let (system_msgs, chat_msgs): (Vec<_>, Vec<_>) =
Expand Down Expand Up @@ -531,7 +584,7 @@ async fn send_anthropic(
return Err(AppError::Http(format!("Anthropic {status}: {body}")));
}

let output = stream_anthropic_sse(response, on_token, cancel_token).await?;
let (output, usage) = stream_anthropic_sse(response, on_token, cancel_token).await?;

// Fallback: some gateways return message_start → message_stop without any
// content_block events for certain models. Retry non-streaming.
Expand Down Expand Up @@ -569,23 +622,24 @@ async fn send_anthropic(
.and_then(Value::as_str)
{
let _ = on_token.send(text.to_string());
return Ok(text.to_string());
return Ok((text.to_string(), usage));
}

return Ok(String::new());
return Ok((String::new(), usage));
}

Ok(output)
Ok((output, usage))
}

async fn stream_openai_sse(
response: reqwest::Response,
on_token: &Channel<String>,
cancel_token: &CancellationToken,
) -> AppResult<String> {
) -> AppResult<(String, Option<TokenUsage>)> {
let mut stream = response.bytes_stream();
let mut line_buffer = String::new();
let mut output = String::new();
let mut usage = UsageAccumulator::default();

loop {
tokio::select! {
Expand All @@ -604,8 +658,8 @@ async fn stream_openai_sse(
line.pop();
}

if parse_openai_sse_line(&line, on_token, &mut output)? {
return Ok(output);
if parse_openai_sse_line(&line, on_token, &mut output, &mut usage)? {
return Ok((output, usage.finish()));
}
}
}
Expand All @@ -617,16 +671,17 @@ async fn stream_openai_sse(
}

if !line_buffer.is_empty() {
parse_openai_sse_line(&line_buffer, on_token, &mut output)?;
parse_openai_sse_line(&line_buffer, on_token, &mut output, &mut usage)?;
}

Ok(output)
Ok((output, usage.finish()))
}

fn parse_openai_sse_line(
line: &str,
on_token: &Channel<String>,
output: &mut String,
usage: &mut UsageAccumulator,
) -> AppResult<bool> {
let trimmed = line.trim();
if trimmed.is_empty() {
Expand All @@ -642,6 +697,16 @@ fn parse_openai_sse_line(
}

let value: Value = serde_json::from_str(payload)?;

if let Some(u) = value.get("usage") {
if let Some(pt) = u.get("prompt_tokens").and_then(Value::as_u64) {
usage.prompt_tokens = pt as u32;
}
if let Some(ct) = u.get("completion_tokens").and_then(Value::as_u64) {
usage.completion_tokens = ct as u32;
}
}

if let Some(token) = value
.get("choices")
.and_then(Value::as_array)
Expand All @@ -661,14 +726,15 @@ async fn stream_anthropic_sse(
response: reqwest::Response,
on_token: &Channel<String>,
cancel_token: &CancellationToken,
) -> AppResult<String> {
) -> AppResult<(String, Option<TokenUsage>)> {
let mut stream = response.bytes_stream();
let mut line_buffer = String::new();
let mut output = String::new();
// Track the most recent `event:` line so we can use it when parsing the
// subsequent `data:` line. Some gateways omit the `"type"` field from
// the JSON payload, so we fall back to the SSE event name.
let mut current_event = String::new();
let mut usage = UsageAccumulator::default();
let mut message_stop_received = false;

'outer: loop {
Expand All @@ -688,7 +754,13 @@ async fn stream_anthropic_sse(
line.pop();
}

if parse_anthropic_sse_line(&line, &mut current_event, on_token, &mut output)? {
if parse_anthropic_sse_line(
&line,
&mut current_event,
on_token,
&mut output,
&mut usage,
)? {
message_stop_received = true;
break 'outer;
}
Expand All @@ -707,7 +779,7 @@ async fn stream_anthropic_sse(
));
}

Ok(output)
Ok((output, usage.finish()))
}

/// Parse a single SSE line from an Anthropic-format stream.
Expand All @@ -720,6 +792,7 @@ fn parse_anthropic_sse_line(
current_event: &mut String,
on_token: &Channel<String>,
output: &mut String,
usage: &mut UsageAccumulator,
) -> AppResult<bool> {
let trimmed = line.trim();
if trimmed.is_empty() {
Expand Down Expand Up @@ -752,6 +825,25 @@ fn parse_anthropic_sse_line(
.unwrap_or(current_event.as_str());

match event_type {
"message_start" => {
if let Some(pt) = value
.get("message")
.and_then(|m| m.get("usage"))
.and_then(|u| u.get("input_tokens"))
.and_then(Value::as_u64)
{
usage.prompt_tokens = pt as u32;
}
}
"message_delta" => {
if let Some(ct) = value
.get("usage")
.and_then(|u| u.get("output_tokens"))
.and_then(Value::as_u64)
{
usage.completion_tokens = ct as u32;
}
}
"content_block_delta" => {
if let Some(token) = value
.get("delta")
Expand Down
10 changes: 8 additions & 2 deletions src/components/layout/AppShell.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,11 @@ import { useUIStore } from '@/stores/useUIStore';
import { useAgentStore } from '@/stores/useAgentStore';
import { SettingsModal } from '@/components/settings/SettingsModal';
import { ExcalidrawCanvas } from '@/components/canvas/ExcalidrawCanvas';
import { AgentConfig, AgentRunWithTools, AgentType, Message, PermissionRequest, Project, Provider, ProviderModelConfig, Session, ToolCall } from '@/types';
import { AgentConfig, AgentRunWithTools, AgentType, ChatUsageEvent, Message, PermissionRequest, Project, Provider, ProviderModelConfig, Session, ToolCall } from '@/types';
import { cn } from '@/lib/utils';

export const AppShell: React.FC = () => {
const { addMessage, appendStreamToken, setStreaming, clearStreaming, setMessages } = useChatStore();
const { addMessage, appendStreamToken, setStreaming, clearStreaming, setMessages, addTokenUsage } = useChatStore();
const setProjects = useProjectStore((s) => s.setProjects);
const addProject = useProjectStore((s) => s.addProject);
const setActiveProjectId = useProjectStore((s) => s.setActiveProjectId);
Expand Down Expand Up @@ -182,6 +182,10 @@ export const AppShell: React.FC = () => {
clearStreaming();
});

const unlistenChatUsage = await listen<ChatUsageEvent>('chat-usage', (event) => {
addTokenUsage(event.payload.sessionId, event.payload.usage);
});

const unlistenAgentStarted = await listen<{
agentRunId: string;
agentType: string;
Expand Down Expand Up @@ -326,6 +330,7 @@ export const AppShell: React.FC = () => {
localUnlisten.push(
unlistenChatDone,
unlistenChatError,
unlistenChatUsage,
unlistenAgentStarted,
unlistenAgentToken,
unlistenAgentToolCall,
Expand All @@ -348,6 +353,7 @@ export const AppShell: React.FC = () => {
};
}, [
clearStreaming,
addTokenUsage,
addAgentRun,
appendAgentToken,
flushThinkingBlock,
Expand Down
13 changes: 13 additions & 0 deletions src/components/layout/ChatHeader.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,14 @@ export const ChatHeader: React.FC<ChatHeaderProps> = ({ onToggleLeftSidebar }) =
const mainView = useUIStore((s) => s.mainView);
const setMainView = useUIStore((s) => s.setMainView);
const { activeProjectId } = useProjectStore();
const sessionUsage = useChatStore((s) => s.sessionUsage);

const { theme, toggleTheme } = useUIStore();
const activeSession = sessions.find(s => s.id === activeSessionId);
const currentUsage = activeSessionId ? sessionUsage[activeSessionId] : undefined;

const formatTokens = (n: number) =>
n >= 1000 ? `${(n / 1000).toFixed(1)}k` : String(n);

const [isRenaming, setIsRenaming] = useState(false);
const [renameValue, setRenameValue] = useState('');
Expand Down Expand Up @@ -165,6 +170,14 @@ export const ChatHeader: React.FC<ChatHeaderProps> = ({ onToggleLeftSidebar }) =
<PencilSimple size={14} />
</button>
)}
{currentUsage && (
<span
className="text-[10px] font-medium px-1.5 py-0.5 rounded-md bg-[var(--surface-2)] text-[var(--text-muted)] border border-[var(--border)] whitespace-nowrap"
title={`↑ ${formatTokens(currentUsage.promptTokens)} prompt · ↓ ${formatTokens(currentUsage.completionTokens)} completion`}
>
{formatTokens(currentUsage.totalTokens)} tokens
</span>
)}
</>
)}
</div>
Expand Down
Loading
Loading