diff --git a/messages/en/quota.json b/messages/en/quota.json index 3d4e4cc40..124ab271c 100644 --- a/messages/en/quota.json +++ b/messages/en/quota.json @@ -153,6 +153,9 @@ "label": "Monthly Cost", "resetAt": "Resets at" }, + "costTotal": { + "label": "Total Cost" + }, "concurrentSessions": { "label": "Concurrent Sessions" }, @@ -183,6 +186,7 @@ "costDaily": "Daily Quota", "costWeekly": "Weekly Quota", "costMonthly": "Monthly Quota", + "costTotal": "Total Quota", "concurrentSessions": "Concurrent Limit", "status": "Status", "actions": "Actions" @@ -235,6 +239,11 @@ "placeholder": "Unlimited", "current": "Current usage: {currency}{current} / {currency}{limit}" }, + "limitTotalUsd": { + "label": "Total Quota (USD)", + "placeholder": "Unlimited", + "current": "Current usage: {currency}{current} / {currency}{limit}" + }, "concurrentSessions": { "label": "Concurrent Session Quota", "placeholder": "0 = Unlimited", diff --git a/messages/ja/quota.json b/messages/ja/quota.json index 2a09ce1e8..bb6c51510 100644 --- a/messages/ja/quota.json +++ b/messages/ja/quota.json @@ -130,6 +130,9 @@ "label": "月次コスト", "resetAt": "リセット時刻" }, + "costTotal": { + "label": "総コスト" + }, "concurrentSessions": { "label": "同時セッション" }, @@ -160,6 +163,7 @@ "costDaily": "日次クォータ", "costWeekly": "週次クォータ", "costMonthly": "月次クォータ", + "costTotal": "総クォータ", "concurrentSessions": "同時制限", "status": "ステータス", "actions": "アクション" @@ -212,6 +216,11 @@ "placeholder": "無制限", "current": "現在使用: {currency}{current} / {currency}{limit}" }, + "limitTotalUsd": { + "label": "総クォータ (USD)", + "placeholder": "無制限", + "current": "現在使用: {currency}{current} / {currency}{limit}" + }, "concurrentSessions": { "label": "同時セッションクォータ", "placeholder": "0 = 無制限", diff --git a/messages/ru/quota.json b/messages/ru/quota.json index 6b23d09c7..0ebe777cc 100644 --- a/messages/ru/quota.json +++ b/messages/ru/quota.json @@ -153,6 +153,9 @@ "label": "Ежемесячные расходы", "resetAt": "Сброс в" }, + "costTotal": { + "label": "Общие расходы" + }, 
"concurrentSessions": { "label": "Параллельные сессии" }, @@ -183,6 +186,7 @@ "costDaily": "Дневная квота", "costWeekly": "Еженедельная квота", "costMonthly": "Ежемесячная квота", + "costTotal": "Общая квота", "concurrentSessions": "Лимит параллельных", "status": "Статус", "actions": "Действия" @@ -235,6 +239,11 @@ "placeholder": "Неограниченно", "current": "Использовано: {currency}{current} из {currency}{limit}" }, + "limitTotalUsd": { + "label": "Общая квота (USD)", + "placeholder": "Неограниченно", + "current": "Использовано: {currency}{current} из {currency}{limit}" + }, "concurrentSessions": { "label": "Квота параллельных сессий", "placeholder": "0 = без ограничений", diff --git a/messages/zh-CN/quota.json b/messages/zh-CN/quota.json index 52227fa68..63918d285 100644 --- a/messages/zh-CN/quota.json +++ b/messages/zh-CN/quota.json @@ -153,6 +153,9 @@ "label": "月消费", "resetAt": "重置于" }, + "costTotal": { + "label": "总消费" + }, "concurrentSessions": { "label": "并发 Session" }, @@ -183,6 +186,7 @@ "costDaily": "每日限额", "costWeekly": "周限额", "costMonthly": "月限额", + "costTotal": "总限额", "concurrentSessions": "并发限制", "status": "状态", "actions": "操作" @@ -235,6 +239,11 @@ "placeholder": "不限制", "current": "当前已用: {currency}{current} / {currency}{limit}" }, + "limitTotalUsd": { + "label": "总限额(USD)", + "placeholder": "不限制", + "current": "当前已用: {currency}{current} / {currency}{limit}" + }, "concurrentSessions": { "label": "并发 Session 限额", "placeholder": "0 = 不限制", diff --git a/messages/zh-TW/quota.json b/messages/zh-TW/quota.json index acac4bc86..bfa7a99fc 100644 --- a/messages/zh-TW/quota.json +++ b/messages/zh-TW/quota.json @@ -128,6 +128,9 @@ "label": "月消費", "resetAt": "重置於" }, + "costTotal": { + "label": "總消費" + }, "concurrentSessions": { "label": "並發 Session" }, @@ -158,6 +161,7 @@ "costDaily": "每日限額", "costWeekly": "周限額", "costMonthly": "月限額", + "costTotal": "總限額", "concurrentSessions": "並發限制", "status": "狀態", "actions": "操作" @@ -210,6 +214,11 @@ "placeholder": "不限制", 
"current": "當前已用: {currency}{current} / {currency}{limit}" }, + "limitTotalUsd": { + "label": "總限額 (USD)", + "placeholder": "不限制", + "current": "當前已用: {currency}{current} / {currency}{limit}" + }, "concurrentSessions": { "label": "並發 Session 限額", "placeholder": "0 = 不限制", diff --git a/src/actions/key-quota.ts b/src/actions/key-quota.ts index 3a13bfd39..4578bc30e 100644 --- a/src/actions/key-quota.ts +++ b/src/actions/key-quota.ts @@ -21,6 +21,7 @@ export interface KeyQuotaItem { limit: number | null; mode?: "fixed" | "rolling"; time?: string; + resetAt?: Date; } export interface KeyQuotaUsageResult { @@ -164,6 +165,7 @@ export async function getKeyQuotaUsage(keyId: number): Promise > { @@ -958,6 +958,7 @@ export async function getKeyLimitUsage(keyId: number): Promise< costTotal: { current: totalCost, limit: key.limitTotalUsd ?? null, + resetAt: costResetAt ?? undefined, }, concurrentSessions: { current: concurrentSessions, diff --git a/src/actions/providers.ts b/src/actions/providers.ts index 918a694a8..4a41bdc52 100644 --- a/src/actions/providers.ts +++ b/src/actions/providers.ts @@ -327,6 +327,7 @@ export async function getProviders(): Promise { limitWeeklyUsd: provider.limitWeeklyUsd, limitMonthlyUsd: provider.limitMonthlyUsd, limitTotalUsd: provider.limitTotalUsd, + totalCostResetAt: provider.totalCostResetAt, limitConcurrentSessions: provider.limitConcurrentSessions, maxRetryAttempts: provider.maxRetryAttempts, circuitBreakerFailureThreshold: provider.circuitBreakerFailureThreshold, @@ -1259,6 +1260,15 @@ export async function resetProviderTotalUsage(providerId: number): Promise > { @@ -2713,7 +2724,9 @@ export async function getProviderLimitUsage(providerId: number): Promise< getTimeRangeForPeriodWithMode, } = await import("@/lib/rate-limit/time-utils"); const { RateLimitService } = await import("@/lib/rate-limit"); - const { sumProviderCostInTimeRange } = await import("@/repository/statistics"); + const { sumProviderCostInTimeRange, sumProviderTotalCost } = 
await import( + "@/repository/statistics" + ); const limit5hResetMode = provider.limit5hResetMode ?? "rolling"; // 计算各周期的时间范围 @@ -2732,15 +2745,23 @@ export async function getProviderLimitUsage(providerId: number): Promise< ]); // 获取金额消费(直接查询数据库,确保配额显示与 DB 一致) - const [cost5h, costDaily, costWeekly, costMonthly, concurrentSessions] = await Promise.all([ - limit5hResetMode === "fixed" - ? RateLimitService.getCurrentCost(providerId, "provider", "5h", undefined, limit5hResetMode) - : sumProviderCostInTimeRange(providerId, range5h.startTime, range5h.endTime), - sumProviderCostInTimeRange(providerId, rangeDaily.startTime, rangeDaily.endTime), - sumProviderCostInTimeRange(providerId, rangeWeekly.startTime, rangeWeekly.endTime), - sumProviderCostInTimeRange(providerId, rangeMonthly.startTime, rangeMonthly.endTime), - SessionTracker.getProviderSessionCount(providerId), - ]); + const [cost5h, costDaily, costWeekly, costMonthly, totalCost, concurrentSessions] = + await Promise.all([ + limit5hResetMode === "fixed" + ? RateLimitService.getCurrentCost( + providerId, + "provider", + "5h", + undefined, + limit5hResetMode + ) + : sumProviderCostInTimeRange(providerId, range5h.startTime, range5h.endTime), + sumProviderCostInTimeRange(providerId, rangeDaily.startTime, rangeDaily.endTime), + sumProviderCostInTimeRange(providerId, rangeWeekly.startTime, rangeWeekly.endTime), + sumProviderCostInTimeRange(providerId, rangeMonthly.startTime, rangeMonthly.endTime), + sumProviderTotalCost(providerId, provider.totalCostResetAt), + SessionTracker.getProviderSessionCount(providerId), + ]); // 获取重置时间信息 const resetDaily = await getResetInfoWithMode( @@ -2779,6 +2800,11 @@ export async function getProviderLimitUsage(providerId: number): Promise< limit: provider.limitMonthlyUsd, resetAt: resetMonthly.resetAt!, }, + limitTotalUsd: { + current: totalCost, + limit: provider.limitTotalUsd ?? null, + resetAt: provider.totalCostResetAt ?? 
undefined, + }, concurrentSessions: { current: concurrentSessions, limit: provider.limitConcurrentSessions || 0, @@ -2800,6 +2826,7 @@ export type ProviderLimitUsageData = { costDaily: { current: number; limit: number | null; resetAt?: Date }; costWeekly: { current: number; limit: number | null; resetAt: Date }; costMonthly: { current: number; limit: number | null; resetAt: Date }; + limitTotalUsd: { current: number; limit: number | null; resetAt?: Date }; concurrentSessions: { current: number; limit: number }; }; @@ -2820,6 +2847,8 @@ export async function getProviderLimitUsageBatch( limitDailyUsd?: number | null; limitWeeklyUsd?: number | null; limitMonthlyUsd?: number | null; + limitTotalUsd?: number | null; + totalCostResetAt?: Date | null; limitConcurrentSessions?: number | null; }> ): Promise> { @@ -2845,7 +2874,9 @@ export async function getProviderLimitUsageBatch( getTimeRangeForPeriodWithMode, } = await import("@/lib/rate-limit/time-utils"); const { RateLimitService } = await import("@/lib/rate-limit"); - const { sumProviderCostInTimeRange } = await import("@/repository/statistics"); + const { sumProviderCostInTimeRange, sumProviderTotalCost } = await import( + "@/repository/statistics" + ); const providerIds = providers.map((p) => p.id); @@ -2871,7 +2902,7 @@ export async function getProviderLimitUsageBatch( ); // 并行查询该供应商的各周期消费(直接查询数据库) - const [cost5h, resetAt5h, costDaily, costWeekly, costMonthly] = await Promise.all([ + const [cost5h, resetAt5h, costDaily, costWeekly, costMonthly, totalCost] = await Promise.all([ limit5hResetMode === "fixed" ? 
RateLimitService.getCurrentCost( provider.id, @@ -2887,6 +2918,7 @@ export async function getProviderLimitUsageBatch( sumProviderCostInTimeRange(provider.id, rangeDaily.startTime, rangeDaily.endTime), sumProviderCostInTimeRange(provider.id, rangeWeekly.startTime, rangeWeekly.endTime), sumProviderCostInTimeRange(provider.id, rangeMonthly.startTime, rangeMonthly.endTime), + sumProviderTotalCost(provider.id, provider.totalCostResetAt ?? null), ]); const sessionCount = sessionCountMap.get(provider.id) || 0; @@ -2926,6 +2958,11 @@ export async function getProviderLimitUsageBatch( limit: provider.limitMonthlyUsd ?? null, resetAt: resetMonthly.resetAt!, }, + limitTotalUsd: { + current: totalCost, + limit: provider.limitTotalUsd ?? null, + resetAt: provider.totalCostResetAt ?? undefined, + }, concurrentSessions: { current: sessionCount, limit: provider.limitConcurrentSessions || 0, diff --git a/src/app/[locale]/dashboard/_components/dashboard-header.tsx b/src/app/[locale]/dashboard/_components/dashboard-header.tsx index 55d13cbed..b9caf6e16 100644 --- a/src/app/[locale]/dashboard/_components/dashboard-header.tsx +++ b/src/app/[locale]/dashboard/_components/dashboard-header.tsx @@ -1,4 +1,4 @@ -import { useTranslations } from "next-intl"; +import { getTranslations } from "next-intl/server"; import { VersionUpdateNotifier } from "@/components/customs/version-update-notifier"; import { Button } from "@/components/ui/button"; import { LanguageSwitcher } from "@/components/ui/language-switcher"; @@ -11,10 +11,11 @@ import { UserMenu } from "./user-menu"; interface DashboardHeaderProps { session: AuthSession | null; + locale: string; } -export function DashboardHeader({ session }: DashboardHeaderProps) { - const t = useTranslations("dashboard.nav"); +export async function DashboardHeader({ session, locale }: DashboardHeaderProps) { + const t = await getTranslations({ locale, namespace: "dashboard.nav" }); const isAdmin = session?.user.role === "admin"; const NAV_ITEMS: 
(DashboardNavItem & { adminOnly?: boolean })[] = [ diff --git a/src/app/[locale]/dashboard/_components/dashboard-sections.tsx b/src/app/[locale]/dashboard/_components/dashboard-sections.tsx index 8f514949a..8c29560cc 100644 --- a/src/app/[locale]/dashboard/_components/dashboard-sections.tsx +++ b/src/app/[locale]/dashboard/_components/dashboard-sections.tsx @@ -32,7 +32,13 @@ export async function DashboardStatisticsSection() { ); } -export async function DashboardLeaderboardSection({ isAdmin }: { isAdmin: boolean }) { +export async function DashboardLeaderboardSection({ + isAdmin, + locale, +}: { + isAdmin: boolean; + locale: string; +}) { const systemSettings = await getCachedSystemSettings(); const canViewLeaderboard = isAdmin || systemSettings.allowGlobalUsageView; @@ -40,7 +46,7 @@ export async function DashboardLeaderboardSection({ isAdmin }: { isAdmin: boolea return null; } - const t = await getTranslations("dashboard"); + const t = await getTranslations({ locale, namespace: "dashboard" }); return (
diff --git a/src/app/[locale]/dashboard/_components/user/key-limit-usage.tsx b/src/app/[locale]/dashboard/_components/user/key-limit-usage.tsx index fe343bc3b..22af74c87 100644 --- a/src/app/[locale]/dashboard/_components/user/key-limit-usage.tsx +++ b/src/app/[locale]/dashboard/_components/user/key-limit-usage.tsx @@ -18,7 +18,7 @@ interface LimitUsageData { costDaily: { current: number; limit: number | null }; costWeekly: { current: number; limit: number | null }; costMonthly: { current: number; limit: number | null }; - costTotal: { current: number; limit: number | null }; + costTotal: { current: number; limit: number | null; resetAt?: Date }; concurrentSessions: { current: number; limit: number }; } diff --git a/src/app/[locale]/dashboard/audit-logs/page.tsx b/src/app/[locale]/dashboard/audit-logs/page.tsx index 8ede2bbc3..2f41d9af8 100644 --- a/src/app/[locale]/dashboard/audit-logs/page.tsx +++ b/src/app/[locale]/dashboard/audit-logs/page.tsx @@ -17,7 +17,7 @@ export default async function AuditLogsPage({ params }: { params: Promise<{ loca return redirect({ href: "/dashboard", locale }); } - const t = await getTranslations("auditLogs"); + const t = await getTranslations({ locale, namespace: "auditLogs" }); return (
diff --git a/src/app/[locale]/dashboard/availability/page.tsx b/src/app/[locale]/dashboard/availability/page.tsx index c00f17210..26801cbd8 100644 --- a/src/app/[locale]/dashboard/availability/page.tsx +++ b/src/app/[locale]/dashboard/availability/page.tsx @@ -10,8 +10,13 @@ import { AvailabilityDashboardSkeleton } from "./_components/availability-skelet export const dynamic = "force-dynamic"; -export default async function AvailabilityPage() { - const t = await getTranslations("dashboard"); +export default async function AvailabilityPage({ + params, +}: { + params: Promise<{ locale: string }>; +}) { + const { locale } = await params; + const t = await getTranslations({ locale, namespace: "dashboard" }); const session = await getSession(); // Only admin can access availability monitoring diff --git a/src/app/[locale]/dashboard/layout.tsx b/src/app/[locale]/dashboard/layout.tsx index 2fe637f75..3346f8995 100644 --- a/src/app/[locale]/dashboard/layout.tsx +++ b/src/app/[locale]/dashboard/layout.tsx @@ -28,7 +28,7 @@ export default async function DashboardLayout({ return (
- + {children}
diff --git a/src/app/[locale]/dashboard/leaderboard/page.tsx b/src/app/[locale]/dashboard/leaderboard/page.tsx index 97d518fc6..83288b6d5 100644 --- a/src/app/[locale]/dashboard/leaderboard/page.tsx +++ b/src/app/[locale]/dashboard/leaderboard/page.tsx @@ -10,8 +10,9 @@ import { LeaderboardView } from "./_components/leaderboard-view"; export const dynamic = "force-dynamic"; -export default async function LeaderboardPage() { - const t = await getTranslations("dashboard"); +export default async function LeaderboardPage({ params }: { params: Promise<{ locale: string }> }) { + const { locale } = await params; + const t = await getTranslations({ locale, namespace: "dashboard" }); // 获取用户 session 和系统设置 const session = await getSession(); const systemSettings = await getSystemSettings(); diff --git a/src/app/[locale]/dashboard/my-quota/page.tsx b/src/app/[locale]/dashboard/my-quota/page.tsx index 19d9d2f0d..d6d3d9e9b 100644 --- a/src/app/[locale]/dashboard/my-quota/page.tsx +++ b/src/app/[locale]/dashboard/my-quota/page.tsx @@ -9,13 +9,13 @@ export const dynamic = "force-dynamic"; export default async function MyQuotaPage({ params }: { params: Promise<{ locale: string }> }) { // Await params to ensure locale is available in the async context - await params; + const { locale } = await params; const [quotaResult, systemSettings, tNav, tCommon] = await Promise.all([ getMyQuota(), getSystemSettings(), - getTranslations("dashboard.nav"), - getTranslations("common"), + getTranslations({ locale, namespace: "dashboard.nav" }), + getTranslations({ locale, namespace: "common" }), ]); // Handle error state diff --git a/src/app/[locale]/dashboard/providers/page.tsx b/src/app/[locale]/dashboard/providers/page.tsx index 2e6a7285a..f287b8b12 100644 --- a/src/app/[locale]/dashboard/providers/page.tsx +++ b/src/app/[locale]/dashboard/providers/page.tsx @@ -30,7 +30,7 @@ export default async function DashboardProvidersPage({ // TypeScript: session is guaranteed to be non-null after the 
redirect check const currentUser = session!.user; - const t = await getTranslations("settings"); + const t = await getTranslations({ locale, namespace: "settings" }); const providers = await getProviders(); return ( diff --git a/src/app/[locale]/dashboard/quotas/keys/_components/edit-key-quota-dialog.tsx b/src/app/[locale]/dashboard/quotas/keys/_components/edit-key-quota-dialog.tsx index b7ab7e873..aa76db359 100644 --- a/src/app/[locale]/dashboard/quotas/keys/_components/edit-key-quota-dialog.tsx +++ b/src/app/[locale]/dashboard/quotas/keys/_components/edit-key-quota-dialog.tsx @@ -32,6 +32,7 @@ interface KeyQuota { costDaily: { current: number; limit: number | null; resetAt?: Date }; costWeekly: { current: number; limit: number | null }; costMonthly: { current: number; limit: number | null }; + costTotal: { current: number; limit: number | null; resetAt?: Date }; concurrentSessions: { current: number; limit: number }; } @@ -79,6 +80,9 @@ export function EditKeyQuotaDialog({ const [limitMonthly, setLimitMonthly] = useState( currentQuota?.costMonthly.limit?.toString() ?? "" ); + const [limitTotal, setLimitTotal] = useState( + currentQuota?.costTotal.limit?.toString() ?? "" + ); const [limitConcurrent, setLimitConcurrent] = useState( currentQuota?.concurrentSessions.limit?.toString() ?? "0" ); @@ -98,6 +102,7 @@ export function EditKeyQuotaDialog({ dailyResetTime: resetTime, limitWeeklyUsd: limitWeekly ? parseFloat(limitWeekly) : null, limitMonthlyUsd: limitMonthly ? parseFloat(limitMonthly) : null, + limitTotalUsd: limitTotal ? parseFloat(limitTotal) : null, limitConcurrentSessions: limitConcurrent ? parseInt(limitConcurrent, 10) : 0, }); @@ -127,6 +132,7 @@ export function EditKeyQuotaDialog({ dailyResetTime: resetTime, limitWeeklyUsd: null, limitMonthlyUsd: null, + limitTotalUsd: null, limitConcurrentSessions: 0, }); @@ -335,6 +341,32 @@ export function EditKeyQuotaDialog({ )}
+ {/* 总限额 */} +
+ + setLimitTotal(e.target.value)} + className="h-9" + /> + {currentQuota?.costTotal.limit && ( +

+ {t("limitTotalUsd.current", { + currency: currencySymbol, + current: Number(currentQuota.costTotal.current).toFixed(4), + limit: Number(currentQuota.costTotal.limit).toFixed(2), + })} +

+ )} +
+ {/* 并发限额 */}
); } -async function ProvidersQuotaContent() { +async function ProvidersQuotaContent({ locale }: { locale: string }) { const [providers, systemSettings] = await Promise.all([ getProvidersWithQuotas(), getSystemSettings(), ]); - const t = await getTranslations("quota.providers"); + const t = await getTranslations({ locale, namespace: "quota.providers" }); return (
diff --git a/src/app/[locale]/dashboard/quotas/users/page.tsx b/src/app/[locale]/dashboard/quotas/users/page.tsx index 3f68ff410..be1ac1f26 100644 --- a/src/app/[locale]/dashboard/quotas/users/page.tsx +++ b/src/app/[locale]/dashboard/quotas/users/page.tsx @@ -129,7 +129,7 @@ export default async function UsersQuotaPage({ params }: { params: Promise<{ loc return redirect({ href: session ? "/dashboard/my-quota" : "/login", locale }); } - const t = await getTranslations("quota.users"); + const t = await getTranslations({ locale, namespace: "quota.users" }); return (
@@ -162,15 +162,15 @@ export default async function UsersQuotaPage({ params }: { params: Promise<{ loc /> }> - +
); } -async function UsersQuotaContent() { +async function UsersQuotaContent({ locale }: { locale: string }) { const [users, systemSettings] = await Promise.all([getUsersWithQuotas(), getSystemSettings()]); - const t = await getTranslations("quota.users"); + const t = await getTranslations({ locale, namespace: "quota.users" }); return (
diff --git a/src/app/[locale]/dashboard/rate-limits/page.tsx b/src/app/[locale]/dashboard/rate-limits/page.tsx index f7bf99e86..18ef80815 100644 --- a/src/app/[locale]/dashboard/rate-limits/page.tsx +++ b/src/app/[locale]/dashboard/rate-limits/page.tsx @@ -17,7 +17,7 @@ export default async function RateLimitsPage({ params }: { params: Promise<{ loc return redirect({ href: "/dashboard", locale }); } - const t = await getTranslations("dashboard.rateLimits"); + const t = await getTranslations({ locale, namespace: "dashboard.rateLimits" }); return (
diff --git a/src/app/[locale]/settings/_lib/nav-items.ts b/src/app/[locale]/settings/_lib/nav-items.ts index 974bb2b39..7171cf634 100644 --- a/src/app/[locale]/settings/_lib/nav-items.ts +++ b/src/app/[locale]/settings/_lib/nav-items.ts @@ -108,8 +108,8 @@ export const SETTINGS_NAV_ITEMS: SettingsNavItem[] = [ ]; // Helper function to get translated nav items -export async function getTranslatedNavItems(): Promise { - const t = await getTranslations("settings"); +export async function getTranslatedNavItems(locale: string): Promise { + const t = await getTranslations({ locale, namespace: "settings" }); return SETTINGS_NAV_ITEMS.map((item) => ({ ...item, label: item.labelKey ? t(item.labelKey) : item.label, diff --git a/src/app/[locale]/settings/client-versions/page.tsx b/src/app/[locale]/settings/client-versions/page.tsx index 3677b52c6..10e9c6e34 100644 --- a/src/app/[locale]/settings/client-versions/page.tsx +++ b/src/app/[locale]/settings/client-versions/page.tsx @@ -21,7 +21,7 @@ export default async function ClientVersionsPage({ // Await params to ensure locale is available in the async context const { locale } = await params; - const t = await getTranslations("settings"); + const t = await getTranslations({ locale, namespace: "settings" }); const session = await getSession(); if (!session || session.user.role !== "admin") { @@ -55,7 +55,7 @@ export default async function ClientVersionsPage({ iconColor="text-[#E25706]" > }> - +
@@ -71,8 +71,8 @@ async function ClientVersionsSettingsContent() { return ; } -async function ClientVersionsStatsContent() { - const t = await getTranslations("settings"); +async function ClientVersionsStatsContent({ locale }: { locale: string }) { + const t = await getTranslations({ locale, namespace: "settings" }); const statsResult = await fetchClientVersionStats(); const stats = statsResult.ok ? statsResult.data : []; diff --git a/src/app/[locale]/settings/config/page.tsx b/src/app/[locale]/settings/config/page.tsx index bbe54df4b..d139c43eb 100644 --- a/src/app/[locale]/settings/config/page.tsx +++ b/src/app/[locale]/settings/config/page.tsx @@ -9,8 +9,13 @@ import { SystemSettingsForm } from "./_components/system-settings-form"; export const dynamic = "force-dynamic"; -export default async function SettingsConfigPage() { - const t = await getTranslations("settings"); +export default async function SettingsConfigPage({ + params, +}: { + params: Promise<{ locale: string }>; +}) { + const { locale } = await params; + const t = await getTranslations({ locale, namespace: "settings" }); return ( <> @@ -20,14 +25,14 @@ export default async function SettingsConfigPage() { icon="settings" /> }> - + ); } -async function SettingsConfigContent() { - const t = await getTranslations("settings"); +async function SettingsConfigContent({ locale }: { locale: string }) { + const t = await getTranslations({ locale, namespace: "settings" }); const settings = await getSystemSettings(); return ( diff --git a/src/app/[locale]/settings/error-rules/page.tsx b/src/app/[locale]/settings/error-rules/page.tsx index b749cfa40..ccb295d84 100644 --- a/src/app/[locale]/settings/error-rules/page.tsx +++ b/src/app/[locale]/settings/error-rules/page.tsx @@ -12,8 +12,9 @@ import { RuleListTable } from "./_components/rule-list-table"; export const dynamic = "force-dynamic"; -export default async function ErrorRulesPage() { - const t = await getTranslations("settings"); +export default async 
function ErrorRulesPage({ params }: { params: Promise<{ locale: string }> }) { + const { locale } = await params; + const t = await getTranslations({ locale, namespace: "settings" }); return ( <> diff --git a/src/app/[locale]/settings/layout.tsx b/src/app/[locale]/settings/layout.tsx index 034e072dd..56369258d 100644 --- a/src/app/[locale]/settings/layout.tsx +++ b/src/app/[locale]/settings/layout.tsx @@ -28,11 +28,11 @@ export default async function SettingsLayout({ } // Get translated navigation items - const translatedNavItems = await getTranslatedNavItems(); + const translatedNavItems = await getTranslatedNavItems(locale); return (
- +
{/* Desktop: Grid layout with sidebar */} @@ -42,7 +42,7 @@ export default async function SettingsLayout({ {/* Content area */} -
+
{/* Tablet: Horizontal nav shown above content */}
diff --git a/src/app/[locale]/settings/logs/page.tsx b/src/app/[locale]/settings/logs/page.tsx index cdad92d44..a274ec394 100644 --- a/src/app/[locale]/settings/logs/page.tsx +++ b/src/app/[locale]/settings/logs/page.tsx @@ -5,8 +5,13 @@ import { LogLevelForm } from "./_components/log-level-form"; export const dynamic = "force-dynamic"; -export default async function SettingsLogsPage() { - const t = await getTranslations("settings"); +export default async function SettingsLogsPage({ + params, +}: { + params: Promise<{ locale: string }>; +}) { + const { locale } = await params; + const t = await getTranslations({ locale, namespace: "settings" }); return ( <> diff --git a/src/app/[locale]/settings/prices/_components/price-list.tsx b/src/app/[locale]/settings/prices/_components/price-list.tsx index d043cf62a..071f51a9b 100644 --- a/src/app/[locale]/settings/prices/_components/price-list.tsx +++ b/src/app/[locale]/settings/prices/_components/price-list.tsx @@ -412,8 +412,8 @@ export function PriceList({
{/* 价格表格 */} -
- +
+
diff --git a/src/app/[locale]/settings/prices/page.tsx b/src/app/[locale]/settings/prices/page.tsx index 6ad27dfcc..ad6c4c2f1 100644 --- a/src/app/[locale]/settings/prices/page.tsx +++ b/src/app/[locale]/settings/prices/page.tsx @@ -11,20 +11,29 @@ import { UploadPriceDialog } from "./_components/upload-price-dialog"; export const dynamic = "force-dynamic"; +type SettingsPricesSearchParams = { + required?: string; + page?: string; + pageSize?: string; + size?: string; + search?: string; + source?: string; + litellmProvider?: string; +}; + interface SettingsPricesPageProps { - searchParams: Promise<{ - required?: string; - page?: string; - pageSize?: string; - size?: string; - search?: string; - source?: string; - litellmProvider?: string; + params: Promise<{ + locale: string; }>; + searchParams: Promise; } -export default async function SettingsPricesPage({ searchParams }: SettingsPricesPageProps) { - const t = await getTranslations("settings"); +export default async function SettingsPricesPage({ + params, + searchParams, +}: SettingsPricesPageProps) { + const { locale } = await params; + const t = await getTranslations({ locale, namespace: "settings" }); return ( <> @@ -34,14 +43,20 @@ export default async function SettingsPricesPage({ searchParams }: SettingsPrice icon="dollar-sign" /> }> - + ); } -async function SettingsPricesContent({ searchParams }: SettingsPricesPageProps) { - const t = await getTranslations("settings"); +async function SettingsPricesContent({ + locale, + searchParams, +}: { + locale: string; + searchParams: Promise; +}) { + const t = await getTranslations({ locale, namespace: "settings" }); const params = await searchParams; // 解析分页参数 diff --git a/src/app/[locale]/settings/providers/page.tsx b/src/app/[locale]/settings/providers/page.tsx index 50e29a289..2d406d78b 100644 --- a/src/app/[locale]/settings/providers/page.tsx +++ b/src/app/[locale]/settings/providers/page.tsx @@ -14,8 +14,13 @@ import { SchedulingRulesDialog } from 
"./_components/scheduling-rules-dialog"; export const dynamic = "force-dynamic"; -export default async function SettingsProvidersPage() { - const t = await getTranslations("settings"); +export default async function SettingsProvidersPage({ + params, +}: { + params: Promise<{ locale: string }>; +}) { + const { locale } = await params; + const t = await getTranslations({ locale, namespace: "settings" }); const session = await getSession(); const providers = await getProviders(); diff --git a/src/app/[locale]/settings/request-filters/page.tsx b/src/app/[locale]/settings/request-filters/page.tsx index f101e005f..cfd27572f 100644 --- a/src/app/[locale]/settings/request-filters/page.tsx +++ b/src/app/[locale]/settings/request-filters/page.tsx @@ -9,8 +9,13 @@ import { RequestFiltersTableSkeleton } from "./_components/request-filters-skele export const dynamic = "force-dynamic"; -export default async function RequestFiltersPage() { - const t = await getTranslations("settings.requestFilters"); +export default async function RequestFiltersPage({ + params, +}: { + params: Promise<{ locale: string }>; +}) { + const { locale } = await params; + const t = await getTranslations({ locale, namespace: "settings.requestFilters" }); return ( <> diff --git a/src/app/[locale]/settings/sensitive-words/page.tsx b/src/app/[locale]/settings/sensitive-words/page.tsx index 89a9a3844..ba80222c1 100644 --- a/src/app/[locale]/settings/sensitive-words/page.tsx +++ b/src/app/[locale]/settings/sensitive-words/page.tsx @@ -11,8 +11,13 @@ import { WordListTable } from "./_components/word-list-table"; export const dynamic = "force-dynamic"; -export default async function SensitiveWordsPage() { - const t = await getTranslations("settings"); +export default async function SensitiveWordsPage({ + params, +}: { + params: Promise<{ locale: string }>; +}) { + const { locale } = await params; + const t = await getTranslations({ locale, namespace: "settings" }); return ( <> diff --git 
a/src/app/[locale]/settings/status-page/page.tsx b/src/app/[locale]/settings/status-page/page.tsx index 74d71a359..3be0a7bf6 100644 --- a/src/app/[locale]/settings/status-page/page.tsx +++ b/src/app/[locale]/settings/status-page/page.tsx @@ -5,8 +5,13 @@ import { loadStatusPageSettings } from "./loader"; export const dynamic = "force-dynamic"; -export default async function StatusPageSettingsPage() { - const t = await getTranslations("settings"); +export default async function StatusPageSettingsPage({ + params, +}: { + params: Promise<{ locale: string }>; +}) { + const { locale } = await params; + const t = await getTranslations({ locale, namespace: "settings" }); const settings = await loadStatusPageSettings(); return ( diff --git a/src/app/[locale]/status/[slug]/page.tsx b/src/app/[locale]/status/[slug]/page.tsx index 208917520..0ad1e3fb7 100644 --- a/src/app/[locale]/status/[slug]/page.tsx +++ b/src/app/[locale]/status/[slug]/page.tsx @@ -34,7 +34,7 @@ export default async function PublicStatusGroupPage({ params: Promise<{ locale: string; slug: string }>; }) { const { locale, slug } = await params; - const t = await getTranslations("settings"); + const t = await getTranslations({ locale, namespace: "settings" }); const { followServerDefaults, initialPayload, diff --git a/src/app/[locale]/usage-doc/layout.tsx b/src/app/[locale]/usage-doc/layout.tsx index 370ef28e9..d4a89a636 100644 --- a/src/app/[locale]/usage-doc/layout.tsx +++ b/src/app/[locale]/usage-doc/layout.tsx @@ -45,7 +45,7 @@ export default async function UsageDocLayout({
{/* 条件渲染头部:已登录显示 DashboardHeader,未登录显示简化版头部 */} {session ? ( - + ) : (
/**
 * Result of {@link combineAbortSignals}: the merged signal plus an explicit
 * teardown hook for the listeners that back it.
 */
export interface CombinedAbortSignal {
  /** Aborts as soon as any one of the source signals aborts. */
  signal: AbortSignal;
  /**
   * Detaches every listener this helper registered on the source signals.
   * Idempotent. A no-op when the native `AbortSignal.any` path was taken.
   */
  cleanup: () => void;
}

const NOOP_CLEANUP = () => {};

/**
 * Combine several AbortSignals into a single signal and return an explicit
 * cleanup function alongside it.
 *
 * The native `AbortSignal.any` is preferred (Node.js 20.3+; listeners are
 * managed by the runtime). The polyfill is used only when the native method
 * is unavailable (for example when Next.js standalone overrides the global
 * AbortSignal).
 *
 * On the polyfill path the caller MUST invoke `cleanup` once the request
 * lifecycle ends. Otherwise the abort listeners left on the source signals keep
 * their closures alive (the combined controller, the detacher list and the
 * source signal references), which prevents the session / request body from
 * being garbage collected -- the same class of leak as the client abort
 * listener fixed in #1113.
 *
 * @param signals Source signals to merge. An empty array yields a signal that
 *   never aborts.
 * @returns The combined signal and its cleanup hook. When a source aborts, the
 *   combined signal aborts with that source's `reason`, on both the native and
 *   the polyfill path.
 */
export function combineAbortSignals(signals: AbortSignal[]): CombinedAbortSignal {
  if ("any" in AbortSignal && typeof AbortSignal.any === "function") {
    return { signal: AbortSignal.any(signals), cleanup: NOOP_CLEANUP };
  }

  const combinedController = new AbortController();
  const detachers: Array<() => void> = [];
  let cleaned = false;

  const cleanup = () => {
    if (cleaned) return;
    cleaned = true;
    for (const detach of detachers) {
      detach();
    }
    detachers.length = 0;
  };

  for (const signal of signals) {
    const abortHandler = () => {
      // Forward the source's reason so `signal.reason` on the combined signal
      // matches what the native AbortSignal.any would expose, instead of
      // always collapsing to a generic AbortError.
      combinedController.abort(signal.reason);
      // Once aborted, the remaining listeners are useless; drop them eagerly.
      cleanup();
    };

    if (signal.aborted) {
      // Already aborted: abort immediately and stop registering listeners.
      abortHandler();
      break;
    }

    signal.addEventListener("abort", abortHandler, { once: true });
    detachers.push(() => {
      signal.removeEventListener("abort", abortHandler);
    });
  }

  return { signal: combinedController.signal, cleanup };
}
emitProxyLangfuseTrace } from "@/lib/langfuse/emit-proxy-trace"; import { logger } from "@/lib/logger"; import { ProxyStatusTracker } from "@/lib/proxy-status-tracker"; import { sanitizeErrorTextForDetail } from "@/lib/utils/upstream-error-detection"; @@ -34,6 +35,25 @@ function stripUpstreamDetailSuffix(message: string): string { return message.replace(/\s+Upstream detail:\s*[\s\S]*$/u, "").trim() || message; } +function getErrorResponseText(error: unknown): string { + if (!(error instanceof ProxyError)) { + return ""; + } + + // Langfuse trace 用于排查上游故障,按产品预期保留原始上游错误主体。 + return error.upstreamError?.rawBody ?? error.upstreamError?.body ?? ""; +} + +function isRequestStreaming(session: ProxySession): boolean { + const requestUrl = session.requestUrl; + + return ( + session.request?.message?.stream === true || + requestUrl?.pathname.includes("streamGenerateContent") || + requestUrl?.searchParams.get("alt") === "sse" + ); +} + function getGenericProxyErrorFallbackMessage( statusCode: number, error: unknown, @@ -184,6 +204,12 @@ export class ProxyErrorHandler { // 构建详细的 402 响应 const response = ProxyErrorHandler.buildRateLimitResponse(error); + ProxyErrorHandler.emitErrorTrace(session, { + error, + errorMessage: logErrorMessage, + statusCode, + }); + // 记录错误到数据库(包含 rate_limit 元数据) await ProxyErrorHandler.logErrorToDatabase( session, @@ -223,8 +249,35 @@ export class ProxyErrorHandler { } } - // 记录错误到数据库(始终记录详细错误消息,包含供应商名称) - await ProxyErrorHandler.logErrorToDatabase(session, logErrorMessage, statusCode, null); + const finalizeErrorResponse = async ( + response: Response, + traceErrorMessage: string, + options: { traceFinalResponseBody?: boolean } = {} + ) => { + const finalResponse = await attachSessionIdToErrorResponse(session.sessionId, response); + let responseText: string | undefined; + if (options.traceFinalResponseBody) { + try { + responseText = await finalResponse.clone().text(); + } catch { + responseText = undefined; + } + } + 
ProxyErrorHandler.emitErrorTrace(session, { + error, + errorMessage: traceErrorMessage, + statusCode: finalResponse.status, + responseText, + }); + // 先发出 trace,再写数据库,避免 DB 持久化失败吞掉本次错误诊断。 + await ProxyErrorHandler.logErrorToDatabase( + session, + logErrorMessage, + finalResponse.status, + null + ); + return finalResponse; + }; // 检测是否有覆写配置(响应体或状态码) // 使用异步版本确保错误规则已加载 @@ -271,15 +324,16 @@ export class ProxyErrorHandler { settings, override: { response: null, statusCode: override.statusCode }, }); - return await attachSessionIdToErrorResponse( - session.sessionId, + return await finalizeErrorResponse( ProxyResponses.buildError( responseStatusCode, finalClientErrorMessage, undefined, undefined, safeRequestId - ) + ), + finalClientErrorMessage, + { traceFinalResponseBody: true } ); } // 两者都无效,返回原始错误(但仍透传 request_id,因为有覆写意图) @@ -289,15 +343,16 @@ export class ProxyErrorHandler { settings, override: { response: null, statusCode: null }, }); - return await attachSessionIdToErrorResponse( - session.sessionId, + return await finalizeErrorResponse( ProxyResponses.buildError( statusCode, finalClientErrorMessage, undefined, undefined, safeRequestId - ) + ), + finalClientErrorMessage, + { traceFinalResponseBody: true } ); } @@ -346,12 +401,13 @@ export class ProxyErrorHandler { overridden: true, }); - return await attachSessionIdToErrorResponse( - session.sessionId, + return await finalizeErrorResponse( new Response(JSON.stringify(responseBody), { status: responseStatusCode, headers: { "Content-Type": "application/json" }, - }) + }), + String(responseBody.error.message), + { traceFinalResponseBody: true } ); } @@ -376,15 +432,16 @@ export class ProxyErrorHandler { override: { response: null, statusCode: override.statusCode }, }); - return await attachSessionIdToErrorResponse( - session.sessionId, + return await finalizeErrorResponse( ProxyResponses.buildError( responseStatusCode, finalClientErrorMessage, undefined, undefined, safeRequestId - ) + ), + finalClientErrorMessage, + 
{ traceFinalResponseBody: true } ); } } @@ -463,15 +520,15 @@ export class ProxyErrorHandler { override: null, }); - return await attachSessionIdToErrorResponse( - session.sessionId, + return await finalizeErrorResponse( ProxyResponses.buildError( statusCode, finalClientErrorMessage, undefined, details, safeRequestId - ) + ), + logErrorMessage ); } @@ -590,6 +647,25 @@ export class ProxyErrorHandler { tracker.endRequest(session.messageContext.user.id, session.messageContext.id); } + private static emitErrorTrace( + session: ProxySession, + data: { error: unknown; errorMessage: string; statusCode: number; responseText?: string } + ): void { + const isStreaming = isRequestStreaming(session); + + emitProxyLangfuseTrace(session, { + responseHeaders: new Headers(), + responseText: data.responseText ?? getErrorResponseText(data.error), + usageMetrics: null, + costUsd: undefined, + statusCode: data.statusCode, + durationMs: Math.max(0, Date.now() - session.startTime), + isStreaming, + sseEventCount: isStreaming ? 
0 : undefined, + errorMessage: data.errorMessage, + }); + } + /** * 从 provider chain 中提取最后一次实际请求的状态码 */ diff --git a/src/app/v1/_lib/proxy/forwarder.ts b/src/app/v1/_lib/proxy/forwarder.ts index 8b6f383f1..a0d7bc66d 100644 --- a/src/app/v1/_lib/proxy/forwarder.ts +++ b/src/app/v1/_lib/proxy/forwarder.ts @@ -52,6 +52,7 @@ import { buildProxyUrl } from "../url"; import { rectifyBillingHeader } from "./billing-header-rectifier"; import { bindClientAbortListener } from "./client-abort-listener"; import { deriveClientSafeUpstreamErrorMessage } from "./client-error-message"; +import { combineAbortSignals } from "./combine-abort-signals"; import { isStandardProxyEndpointPath } from "./endpoint-family-catalog"; import { resolveEndpointPolicy, shouldEnforceStrictEndpointPoolPolicy } from "./endpoint-policy"; import { @@ -102,6 +103,7 @@ type CacheTtlOption = CacheTtlPreference | null | undefined; type ProxySessionWithAttemptRuntime = ProxySession & { clearResponseTimeout?: () => void; responseController?: AbortController; + releaseAgent?: () => void; }; type StreamingHedgeAttempt = { @@ -121,6 +123,8 @@ type StreamingHedgeAttempt = { thresholdTimer: NodeJS.Timeout | null; reader: ReadableStreamDefaultReader | null; response: Response | null; + releaseAgent: (() => void) | null; + agentReleased: boolean; }; type ReactiveRectifierRetryState = { @@ -2706,56 +2710,18 @@ export class ProxyForwarder { } // 2. 
组合双路信号:response + client - let combinedSignal: AbortSignal | undefined; const signals = [responseController.signal]; if (session.clientAbortSignal) { signals.push(session.clientAbortSignal); } - // ⭐ AbortSignal.any 实现(兼容所有环境) - // 原因:Next.js standalone 可能覆盖全局 AbortSignal,导致原生 any 方法不可用 - if ("any" in AbortSignal && typeof AbortSignal.any === "function") { - // 优先使用原生实现(Node.js 20.3+) - combinedSignal = AbortSignal.any(signals); - logger.debug("ProxyForwarder: Using native AbortSignal.any", { - signalCount: signals.length, - }); - } else { - // Polyfill: 手动实现多信号组合逻辑 - logger.debug("ProxyForwarder: Using AbortSignal.any polyfill", { - signalCount: signals.length, - reason: "Native AbortSignal.any not available", - }); - - const combinedController = new AbortController(); - const cleanupHandlers: Array<() => void> = []; - - // 为每个信号添加监听器 - for (const signal of signals) { - // 如果已经有信号中断,立即中断组合信号 - if (signal.aborted) { - combinedController.abort(); - break; - } - - // 监听信号中断事件 - const abortHandler = () => { - // 中断组合信号 - combinedController.abort(); - // 清理所有监听器(避免内存泄漏) - cleanupHandlers.forEach((cleanup) => cleanup()); - }; - - signal.addEventListener("abort", abortHandler, { once: true }); - - // 记录清理函数 - cleanupHandlers.push(() => { - signal.removeEventListener("abort", abortHandler); - }); - } - - combinedSignal = combinedController.signal; - } + // 优先 Node 20.3+ 原生 AbortSignal.any(V8 内部管理 listener,无需手动 cleanup); + // Next.js standalone 覆盖全局时 fallback 到 polyfill,由调用方在请求结束时调用 + // cleanupCombinedSignal 解绑源信号上的 listener,避免持有 session/请求体闭包。 + const { signal: combinedSignal, cleanup: cleanupCombinedSignal } = combineAbortSignals(signals); + logger.debug("ProxyForwarder: Combined abort signals", { + signalCount: signals.length, + }); const init: UndiciFetchOptions = { method: session.method, @@ -2849,6 +2815,9 @@ export class ProxyForwarder { clearTimeout(responseTimeoutId); } + // fetch 失败后可能继续尝试 HTTP/1.1 / 直连 fallback。 + // 这些 fallback 请求仍需响应客户端中断和响应超时,所以 cleanup 
只能在最终失败时执行。 + // Release agent ref count on fetch failure (request never started streaming) const releaseKey = proxyConfig?.cacheKey ?? directConnectionCacheKey; const releaseDispatcherId = proxyConfig?.dispatcherId ?? directConnectionDispatcherId; @@ -2898,6 +2867,7 @@ export class ProxyForwarder { // 抛出 ProxyError 并设置特殊状态码 524(Cloudflare: A Timeout Occurred) // 这样会被归类为 PROVIDER_ERROR,计入熔断器并直接切换供应商 + cleanupCombinedSignal(); throw new ProxyError( `${responseTimeoutType === "streaming_first_byte" ? "供应商首字节响应超时" : "供应商响应超时"}: ${responseTimeoutMs}ms 内未收到数据`, 524, // 524 = A Timeout Occurred (Cloudflare standard) @@ -2943,6 +2913,7 @@ export class ProxyForwarder { ); // 抛出 ProxyError(归类为 PROVIDER_ERROR) + cleanupCombinedSignal(); throw new ProxyError( `供应商流式响应静默超时: ${provider.streamingIdleTimeoutMs}ms 内未收到新数据`, 524, // 524 = A Timeout Occurred @@ -2979,6 +2950,7 @@ export class ProxyForwarder { }); // 客户端中断不应计入熔断器,也不重试,直接抛出错误 + cleanupCombinedSignal(); throw new ProxyError( err.name === "ResponseAborted" ? "Response transmission aborted" @@ -3100,6 +3072,7 @@ export class ProxyForwarder { }); // 抛出 HTTP/1.1 错误,让正常的错误处理流程处理 + cleanupCombinedSignal(); throw http1Error; } } else if (proxyConfig) { @@ -3174,10 +3147,12 @@ export class ProxyForwarder { providerId: provider.id, error: directError, }); + cleanupCombinedSignal(); throw fetchError; // 抛出原始代理错误 } } else { // 不降级,直接抛出代理错误 + cleanupCombinedSignal(); throw new ProxyError("Service temporarily unavailable", 503); } } else { @@ -3215,6 +3190,7 @@ export class ProxyForwarder { bodySize: requestBody ? JSON.stringify(requestBody).length : 0, }); + cleanupCombinedSignal(); throw fetchError; } } else { @@ -3253,6 +3229,7 @@ export class ProxyForwarder { bodySize: requestBody ? 
JSON.stringify(requestBody).length : 0, }); + cleanupCombinedSignal(); throw fetchError; } } @@ -3281,6 +3258,8 @@ export class ProxyForwarder { if (errorReleaseKey && errorReleaseDispatcherId) { getGlobalAgentPool().releaseAgent(errorReleaseKey, errorReleaseDispatcherId); } + // 同上:response-handler 不会跑,polyfill 路径上的源信号 listener 必须在此解绑。 + cleanupCombinedSignal(); } } @@ -3308,14 +3287,19 @@ export class ProxyForwarder { // Attach agent release callback for in-flight reference counting. // response-handler must call this in its finally block after the stream is fully consumed. + // 同时复用此回调作为 combineAbortSignals polyfill 的 cleanup 入口:response-handler 已经 + // 保证在请求结束时(成功/异常)幂等地调用 releaseAgent,把 cleanup 合并到这里就不必再 + // 改造 response-handler 的所有 finally 调用点。两个动作互不影响,cleanup 内部自带 cleaned + // 标志,重复调用安全。 const agentCacheKeyToRelease = proxyConfig?.cacheKey ?? directConnectionCacheKey; const agentDispatcherIdToRelease = proxyConfig?.dispatcherId ?? directConnectionDispatcherId; - if (agentCacheKeyToRelease && agentDispatcherIdToRelease) { - const pool = getGlobalAgentPool(); - sessionWithTimeout.releaseAgent = () => { + const pool = agentCacheKeyToRelease && agentDispatcherIdToRelease ? getGlobalAgentPool() : null; + sessionWithTimeout.releaseAgent = () => { + if (pool && agentCacheKeyToRelease && agentDispatcherIdToRelease) { pool.releaseAgent(agentCacheKeyToRelease, agentDispatcherIdToRelease); - }; - } + } + cleanupCombinedSignal(); + }; return response; } @@ -3432,6 +3416,23 @@ export class ProxyForwarder { return attempt.modelRedirect; }; + const releaseAttemptAgent = (attempt: StreamingHedgeAttempt) => { + if (attempt.agentReleased) return; + const releaseAgent = attempt.releaseAgent; + if (!releaseAgent) return; + attempt.agentReleased = true; + try { + releaseAgent(); + } catch (releaseError) { + logger.debug("ProxyForwarder: hedge attempt releaseAgent failed", { + error: releaseError instanceof Error ? 
releaseError.message : String(releaseError), + sessionId: attempt.session.sessionId ?? null, + providerId: attempt.provider.id, + providerName: attempt.provider.name, + }); + } + }; + const abortAttempt = (attempt: StreamingHedgeAttempt, reason: string) => { if (attempt.settled) return; attempt.settled = true; @@ -3470,6 +3471,7 @@ export class ProxyForwarder { providerName: attempt.provider.name, }); }); + releaseAttemptAgent(attempt); }; const armAttemptThreshold = (attempt: StreamingHedgeAttempt) => { @@ -3574,6 +3576,7 @@ export class ProxyForwarder { .then(async (response) => { if (settled || winnerCommitted || attempt.settled) { const attemptRuntime = attempt.session as ProxySessionWithAttemptRuntime; + attempt.releaseAgent = attemptRuntime.releaseAgent ?? attempt.releaseAgent; try { attemptRuntime.responseController?.abort(new Error("hedge_loser")); } catch (abortError) { @@ -3593,12 +3596,14 @@ export class ProxyForwarder { providerName: attempt.provider.name, }); }); + releaseAttemptAgent(attempt); return; } const attemptRuntime = attempt.session as ProxySessionWithAttemptRuntime; attempt.responseController = attemptRuntime.responseController ?? null; attempt.clearResponseTimeout = attemptRuntime.clearResponseTimeout ?? null; + attempt.releaseAgent = attemptRuntime.releaseAgent ?? 
null; attempt.clearResponseTimeout?.(); attempt.response = response; @@ -4003,6 +4008,8 @@ export class ProxyForwarder { thresholdTimer: null, reader: null, response: null, + releaseAgent: null, + agentReleased: false, }; attempts.add(attempt); @@ -4246,6 +4253,7 @@ export class ProxyForwarder { } | null; clearResponseTimeout?: () => void; responseController?: AbortController; + releaseAgent?: () => void; }; const sourceRuntime = source as ProxySessionWithAttemptRuntime; @@ -4282,6 +4290,7 @@ export class ProxyForwarder { : null; targetState.clearResponseTimeout = sourceRuntime.clearResponseTimeout; targetState.responseController = sourceRuntime.responseController; + targetState.releaseAgent = sourceRuntime.releaseAgent; } private static async clearSessionProviderBinding(session: ProxySession): Promise { diff --git a/src/app/v1/_lib/proxy/response-handler.ts b/src/app/v1/_lib/proxy/response-handler.ts index e2dd19edc..4ac1203b3 100644 --- a/src/app/v1/_lib/proxy/response-handler.ts +++ b/src/app/v1/_lib/proxy/response-handler.ts @@ -1,6 +1,7 @@ import { ResponseFixer } from "@/app/v1/_lib/proxy/response-fixer"; import { AsyncTaskManager } from "@/lib/async-task-manager"; import { getEnvConfig } from "@/lib/config/env.schema"; +import { emitProxyLangfuseTrace } from "@/lib/langfuse/emit-proxy-trace"; import { logger } from "@/lib/logger"; import { requestCloudPriceTableSync } from "@/lib/price-sync/cloud-price-updater"; import { ProxyStatusTracker } from "@/lib/proxy-status-tracker"; @@ -126,49 +127,6 @@ function maybeSetCodexContext1m( } } -/** - * Fire Langfuse trace asynchronously. Non-blocking, error-tolerant. 
- */ -function emitLangfuseTrace( - session: ProxySession, - data: { - responseHeaders: Headers; - responseText: string; - usageMetrics: UsageMetrics | null; - costUsd: string | undefined; - costBreakdown?: CostBreakdown; - statusCode: number; - durationMs: number; - isStreaming: boolean; - sseEventCount?: number; - errorMessage?: string; - } -): void { - if (!process.env.LANGFUSE_PUBLIC_KEY || !process.env.LANGFUSE_SECRET_KEY) return; - - void import("@/lib/langfuse/trace-proxy-request") - .then(({ traceProxyRequest }) => { - void traceProxyRequest({ - session, - responseHeaders: data.responseHeaders, - durationMs: data.durationMs, - statusCode: data.statusCode, - isStreaming: data.isStreaming, - responseText: data.responseText, - usageMetrics: data.usageMetrics, - costUsd: data.costUsd, - costBreakdown: data.costBreakdown, - sseEventCount: data.sseEventCount, - errorMessage: data.errorMessage, - }); - }) - .catch((err) => { - logger.warn("[ResponseHandler] Langfuse trace failed", { - error: err instanceof Error ? 
err.message : String(err), - }); - }); -} - /** * 清理 Response headers 中的传输相关 header * @@ -411,6 +369,70 @@ function isNonBillingUsageEndpoint(session: ProxySession): boolean { return isNonBillingEndpoint(session.getEndpoint()); } +function hasBillableInputCostPerRequest(priceData: { input_cost_per_request?: unknown }): boolean { + const inputCostPerRequest = priceData.input_cost_per_request; + return ( + typeof inputCostPerRequest === "number" && + Number.isFinite(inputCostPerRequest) && + inputCostPerRequest > 0 + ); +} + +async function resolveBillableUsageMetricsForCost( + session: ProxySession, + provider: Provider | null, + usageMetrics: UsageMetrics | null, + statusCode: number, + responseText?: string | null +): Promise { + if (isNonBillingUsageEndpoint(session)) { + return null; + } + + if (statusCode < 200 || statusCode >= 300) { + return null; + } + + if (responseText !== undefined && responseText !== null) { + const detected = detectUpstreamErrorFromSseOrJsonText(responseText, { + maxJsonCharsForMessageCheck: 0, + }); + if (detected.isError) { + logger.warn("[CostCalculation] Skipping billing for fake-200 error payload", { + code: detected.code, + detail: detected.detail, + originalModel: session.getOriginalModel(), + redirectedModel: session.getCurrentModel(), + }); + return null; + } + } + + if (usageMetrics) { + return usageMetrics; + } + + let resolvedPricing: Awaited>; + try { + resolvedPricing = await session.getResolvedPricingByBillingSource(provider); + } catch (error) { + logger.error("[CostCalculation] Failed to resolve per-request pricing, skipping billing", { + error: error instanceof Error ? 
error.message : String(error), + originalModel: session.getOriginalModel(), + redirectedModel: session.getCurrentModel(), + }); + return null; + } + + if (!resolvedPricing?.priceData || !hasBillableInputCostPerRequest(resolvedPricing.priceData)) { + return null; + } + + // 成功响应可能没有 token usage(例如 OpenAI Images),但本地价格表仍可配置按次价格。 + // 这里用显式零 token sentinel 只承载 input_cost_per_request,不新增按图、按 token 等语义。 + return { input_tokens: 0, output_tokens: 0 }; +} + type FinalizeDeferredStreamingResult = { /** * “内部结算用”的状态码。 @@ -984,7 +1006,7 @@ export class ProxyResponseHandler { false // Gemini 非流式透传 ); - emitLangfuseTrace(session, { + emitProxyLangfuseTrace(session, { responseHeaders: response.headers, responseText, usageMetrics: finalizedUsage, @@ -1145,11 +1167,9 @@ export class ProxyResponseHandler { if (sessionWithCleanup.clearResponseTimeout) { sessionWithCleanup.clearResponseTimeout(); } - let usageRecord: Record | null = null; let usageMetrics: UsageMetrics | null = null; const usageResult = parseUsageFromResponseText(responseText, provider.providerType); - usageRecord = usageResult.usageRecord; usageMetrics = usageResult.usageMetrics; const actualServiceTier = parseServiceTierFromResponseText(responseText); const codexPriorityBillingDecision = await resolveCodexPriorityBillingDecision( @@ -1172,8 +1192,13 @@ export class ProxyResponseHandler { // 关键:必须在 normalizeUsageWithSwap 之后再快照 billable 视图, // 否则 updateRequestCostFromUsage / trackCostToRedis 会用未归一化的旧值, // 导致缓存 TTL swap、bucket 归一化等场景下的账单与限流统计错位。 - const billableUsageMetrics = - usageMetrics && !isNonBillingUsageEndpoint(session) ? 
usageMetrics : null; + const billableUsageMetrics = await resolveBillableUsageMetricsForCost( + session, + provider, + usageMetrics, + statusCode, + responseText + ); if (billableUsageMetrics) { maybeSetCodexContext1m(session, provider, billableUsageMetrics.input_tokens); @@ -1221,7 +1246,7 @@ export class ProxyResponseHandler { }); } - if (usageRecord && billableUsageMetrics && messageContext) { + if (billableUsageMetrics && messageContext) { const costUpdateResult = await updateRequestCostFromUsage( messageContext.id, session, @@ -1315,12 +1340,16 @@ export class ProxyResponseHandler { } // 更新 session 使用量到 Redis(用于实时监控) - if (session.sessionId && usageMetrics && session.shouldTrackSessionObservability()) { + if ( + session.sessionId && + (usageMetrics || costUsdStr !== undefined) && + session.shouldTrackSessionObservability() + ) { void SessionManager.updateSessionUsage(session.sessionId, { - inputTokens: usageMetrics.input_tokens, - outputTokens: usageMetrics.output_tokens, - cacheCreationInputTokens: usageMetrics.cache_creation_input_tokens, - cacheReadInputTokens: usageMetrics.cache_read_input_tokens, + inputTokens: usageMetrics?.input_tokens, + outputTokens: usageMetrics?.output_tokens, + cacheCreationInputTokens: usageMetrics?.cache_creation_input_tokens, + cacheReadInputTokens: usageMetrics?.cache_read_input_tokens, costUsd: costUsdStr, status: statusCode >= 200 && statusCode < 300 ? 
"completed" : "error", statusCode: statusCode, @@ -1396,7 +1425,7 @@ export class ProxyResponseHandler { statusCode, }); - emitLangfuseTrace(session, { + emitProxyLangfuseTrace(session, { responseHeaders: response.headers, responseText, usageMetrics, @@ -1883,7 +1912,7 @@ export class ProxyResponseHandler { true // Gemini 流式透传(NDJSON 无 data:/event: 前缀,必须显式告知) ); - emitLangfuseTrace(session, { + emitProxyLangfuseTrace(session, { responseHeaders: response.headers, responseText: allContent, usageMetrics: finalizedUsage, @@ -2336,8 +2365,13 @@ export class ProxyResponseHandler { } } - const billableUsageForCost = - usageForCost && !isNonBillingUsageEndpoint(session) ? usageForCost : null; + const billableUsageForCost = await resolveBillableUsageMetricsForCost( + session, + provider, + usageForCost, + effectiveStatusCode, + allContent + ); const costUpdateResult = await updateRequestCostFromUsage( messageContext.id, @@ -2443,6 +2477,8 @@ export class ProxyResponseHandler { payload.outputTokens = usageForCost.output_tokens; payload.cacheCreationInputTokens = usageForCost.cache_creation_input_tokens; payload.cacheReadInputTokens = usageForCost.cache_read_input_tokens; + } + if (costUsdStr !== undefined) { payload.costUsd = costUsdStr; } @@ -2480,7 +2516,7 @@ export class ProxyResponseHandler { specialSettings: session.getSpecialSettings() ?? undefined, }); - emitLangfuseTrace(session, { + emitProxyLangfuseTrace(session, { responseHeaders: response.headers, responseText: allContent, usageMetrics: usageForCost, @@ -3602,6 +3638,58 @@ export async function finalizeRequestStats( } const priorityServiceTierApplied = codexPriorityBillingDecision?.effectivePriority ?? 
false; if (!usageMetrics) { + const billablePerRequestUsage = await resolveBillableUsageMetricsForCost( + session, + provider, + null, + statusCode, + responseText + ); + let perRequestCostUsd: string | undefined; + + if (billablePerRequestUsage) { + const costUpdateResult = await updateRequestCostFromUsage( + messageContext.id, + session, + billablePerRequestUsage, + provider, + provider.costMultiplier, + session.getContext1mApplied(), + priorityServiceTierApplied, + session.getGroupCostMultiplier() + ); + if (costUpdateResult.resolvedPricing) { + ensurePricingResolutionSpecialSetting(session, costUpdateResult.resolvedPricing); + } + if (costUpdateResult.longContextPricingApplied) { + ensureLongContextPricingAudit(session, costUpdateResult.longContextPricing); + } + + await trackCostToRedis( + session, + billablePerRequestUsage, + priorityServiceTierApplied, + costUpdateResult.resolvedPricing, + costUpdateResult.longContextPricing + ); + perRequestCostUsd = costUpdateResult.costUsd ?? undefined; + } + + if ( + session.sessionId && + perRequestCostUsd !== undefined && + session.shouldTrackSessionObservability() + ) { + void SessionManager.updateSessionUsage(session.sessionId, { + costUsd: perRequestCostUsd, + status: statusCode >= 200 && statusCode < 300 ? "completed" : "error", + statusCode, + ...(errorMessage ? { errorMessage } : {}), + }).catch((error: unknown) => { + logger.error("[ResponseHandler] Failed to update session usage:", error); + }); + } + await updateMessageRequestDetails(messageContext.id, { statusCode: statusCode, ...(errorMessage ? 
{ errorMessage } : {}), @@ -3988,7 +4076,7 @@ async function persistRequestFailure(options: { } // Emit Langfuse trace for error/abort paths - emitLangfuseTrace(session, { + emitProxyLangfuseTrace(session, { responseHeaders: new Headers(), responseText: "", usageMetrics: null, @@ -3996,6 +4084,7 @@ async function persistRequestFailure(options: { statusCode, durationMs: duration, isStreaming: phase === "stream", + sseEventCount: phase === "stream" ? 0 : undefined, errorMessage, }); } diff --git a/src/components/ui/__tests__/language-switcher.test.tsx b/src/components/ui/__tests__/language-switcher.test.tsx new file mode 100644 index 000000000..19af3c606 --- /dev/null +++ b/src/components/ui/__tests__/language-switcher.test.tsx @@ -0,0 +1,176 @@ +/** + * @vitest-environment happy-dom + */ + +import type { ReactNode } from "react"; +import { act } from "react"; +import { createRoot } from "react-dom/client"; +import { afterEach, beforeEach, describe, expect, test, vi } from "vitest"; +import { LanguageSwitcher } from "@/components/ui/language-switcher"; +import type { Locale } from "@/i18n/config"; + +const testState = vi.hoisted(() => ({ + currentLocale: "zh-CN" as Locale, + pathname: "/settings/config", + router: { + push: vi.fn(), + refresh: vi.fn(), + }, +})); + +vi.mock("next-intl", () => ({ + useLocale: () => testState.currentLocale, +})); + +vi.mock("@/i18n/routing", () => ({ + usePathname: () => testState.pathname, + useRouter: () => testState.router, +})); + +vi.mock("@/components/ui/dropdown-menu", () => ({ + DropdownMenu: ({ children }: { children: ReactNode }) =>
{children}
, + DropdownMenuTrigger: ({ children }: { children: ReactNode }) => <>{children}, + DropdownMenuContent: ({ children }: { children: ReactNode }) =>
{children}
, + DropdownMenuItem: ({ children, onClick }: { children: ReactNode; onClick?: () => void }) => ( + + ), +})); + +function render(node: ReactNode) { + const container = document.createElement("div"); + document.body.appendChild(container); + const root = createRoot(container); + + act(() => { + root.render(node); + }); + + return { + container, + rerender: (nextNode: ReactNode) => { + act(() => { + root.render(nextNode); + }); + }, + unmount: () => { + act(() => root.unmount()); + container.remove(); + }, + }; +} + +function click(element: Element) { + act(() => { + element.dispatchEvent(new MouseEvent("mousedown", { bubbles: true })); + element.dispatchEvent(new MouseEvent("mouseup", { bubbles: true })); + element.dispatchEvent(new MouseEvent("click", { bubbles: true })); + }); +} + +describe("LanguageSwitcher", () => { + let view: ReturnType | null = null; + + beforeEach(() => { + window.sessionStorage.clear(); + testState.currentLocale = "zh-CN"; + testState.pathname = "/settings/config"; + testState.router.push.mockReset(); + testState.router.refresh.mockReset(); + }); + + afterEach(() => { + view?.unmount(); + view = null; + }); + + test("refreshes the current route after the locale provider catches up", () => { + view = render(); + + const englishOption = Array.from(view.container.querySelectorAll("button")).find((button) => + button.textContent?.includes("English") + ); + + expect(englishOption).toBeTruthy(); + click(englishOption!); + + expect(testState.router.push).toHaveBeenCalledWith("/settings/config", { locale: "en" }); + expect(testState.router.refresh).not.toHaveBeenCalled(); + + testState.currentLocale = "en"; + view.rerender(); + + expect(testState.router.refresh).toHaveBeenCalledTimes(1); + const trigger = view.container.querySelector( + "button[aria-label='Select language']" + ); + expect(trigger?.disabled).toBe(false); + }); + + test("restores the pending refresh after the switcher remounts during navigation", () => { + view = render(); + + 
const englishOption = Array.from(view.container.querySelectorAll("button")).find((button) => + button.textContent?.includes("English") + ); + + expect(englishOption).toBeTruthy(); + click(englishOption!); + + expect(window.sessionStorage.getItem("cch.pendingLocaleRefresh")).toBe("en"); + + view.unmount(); + view = null; + + testState.currentLocale = "en"; + view = render(); + + expect(testState.router.refresh).toHaveBeenCalledTimes(1); + expect(window.sessionStorage.getItem("cch.pendingLocaleRefresh")).toBeNull(); + }); + + test("keeps the pending refresh after remount when sessionStorage is blocked", () => { + const setItemSpy = vi.spyOn(window.sessionStorage, "setItem").mockImplementation(() => { + throw new Error("blocked storage"); + }); + const consoleErrorSpy = vi.spyOn(console, "error").mockImplementation(() => {}); + + view = render(); + + const englishOption = Array.from(view.container.querySelectorAll("button")).find((button) => + button.textContent?.includes("English") + ); + + expect(englishOption).toBeTruthy(); + click(englishOption!); + + expect(testState.router.push).toHaveBeenCalledWith("/settings/config", { locale: "en" }); + expect(testState.router.refresh).not.toHaveBeenCalled(); + expect(consoleErrorSpy).toHaveBeenCalledWith( + "Failed to persist pending locale refresh target:", + expect.any(Error) + ); + + view.unmount(); + view = null; + setItemSpy.mockRestore(); + + testState.currentLocale = "en"; + view = render(); + + expect(testState.router.refresh).toHaveBeenCalledTimes(1); + + consoleErrorSpy.mockRestore(); + }); + + test("restores a pending refresh from sessionStorage after remount", () => { + window.sessionStorage.setItem("cch.pendingLocaleRefresh", "en"); + testState.currentLocale = "en"; + + view = render(); + + expect(testState.router.refresh).toHaveBeenCalledTimes(1); + expect(window.sessionStorage.getItem("cch.pendingLocaleRefresh")).toBeNull(); + }); +}); diff --git a/src/components/ui/language-switcher.tsx 
b/src/components/ui/language-switcher.tsx index 1f93b3bf3..a7e028d9e 100644 --- a/src/components/ui/language-switcher.tsx +++ b/src/components/ui/language-switcher.tsx @@ -15,11 +15,56 @@ import { normalizePathnameForLocaleNavigation } from "@/i18n/pathname"; import { usePathname, useRouter } from "@/i18n/routing"; import { cn } from "@/lib/utils/index"; +const pendingLocaleRefreshKey = "cch.pendingLocaleRefresh"; +let activePendingLocaleRefreshTarget: Locale | null = null; + interface LanguageSwitcherProps { className?: string; size?: "sm" | "default"; } +function getPendingLocaleRefreshTarget(): Locale | null { + if (typeof window === "undefined") { + return null; + } + + try { + const value = window.sessionStorage.getItem(pendingLocaleRefreshKey); + return locales.some((locale) => locale === value) ? (value as Locale) : null; + } catch (error) { + console.error("Failed to read pending locale refresh target:", error); + return null; + } +} + +function setPendingLocaleRefreshTarget(locale: Locale) { + activePendingLocaleRefreshTarget = locale; + + if (typeof window === "undefined") { + return; + } + + try { + window.sessionStorage.setItem(pendingLocaleRefreshKey, locale); + } catch (error) { + console.error("Failed to persist pending locale refresh target:", error); + } +} + +function clearPendingLocaleRefreshTarget() { + activePendingLocaleRefreshTarget = null; + + if (typeof window === "undefined") { + return; + } + + try { + window.sessionStorage.removeItem(pendingLocaleRefreshKey); + } catch (error) { + console.error("Failed to clear pending locale refresh target:", error); + } +} + /** * LanguageSwitcher Component * @@ -31,6 +76,22 @@ export function LanguageSwitcher({ className, size = "sm" }: LanguageSwitcherPro const router = useRouter(); const pathname = usePathname(); const [isTransitioning, setIsTransitioning] = React.useState(false); + const [pendingLocale, setPendingLocale] = React.useState(null); + + React.useEffect(() => { + const 
storedRefreshTarget = getPendingLocaleRefreshTarget(); + const refreshTarget = pendingLocale ?? activePendingLocaleRefreshTarget ?? storedRefreshTarget; + + if (refreshTarget !== currentLocale) { + return; + } + + // Locale route 已切换后刷新当前 RSC 树,避免布局与服务端标题继续显示旧语言。 + router.refresh(); + clearPendingLocaleRefreshTarget(); + setPendingLocale(null); + setIsTransitioning(false); + }, [currentLocale, pendingLocale, router]); const handleLocaleChange = React.useCallback( (newLocale: Locale) => { @@ -39,11 +100,15 @@ export function LanguageSwitcher({ className, size = "sm" }: LanguageSwitcherPro } setIsTransitioning(true); + setPendingLocale(newLocale); + setPendingLocaleRefreshTarget(newLocale); try { router.push(normalizePathnameForLocaleNavigation(pathname), { locale: newLocale }); } catch (error) { console.error("Failed to switch locale:", error); + clearPendingLocaleRefreshTarget(); + setPendingLocale(null); setIsTransitioning(false); } }, diff --git a/src/lib/langfuse/emit-proxy-trace.ts b/src/lib/langfuse/emit-proxy-trace.ts new file mode 100644 index 000000000..64e4ffe78 --- /dev/null +++ b/src/lib/langfuse/emit-proxy-trace.ts @@ -0,0 +1,51 @@ +import type { UsageMetrics } from "@/app/v1/_lib/proxy/response-handler"; +import type { ProxySession } from "@/app/v1/_lib/proxy/session"; +import { logger } from "@/lib/logger"; +import type { CostBreakdown } from "@/lib/utils/cost-calculation"; + +export interface EmitProxyLangfuseTraceData { + responseHeaders: Headers; + responseText: string; + usageMetrics: UsageMetrics | null; + costUsd: string | undefined; + costBreakdown?: CostBreakdown; + statusCode: number; + durationMs: number; + isStreaming: boolean; + sseEventCount?: number; + errorMessage?: string; +} + +/** + * 异步发送代理请求的 Langfuse trace。 + * + * 这里保持 fire-and-forget,避免观测系统故障影响代理响应。 + */ +export function emitProxyLangfuseTrace( + session: ProxySession, + data: EmitProxyLangfuseTraceData +): void { + if (!process.env.LANGFUSE_PUBLIC_KEY || 
!process.env.LANGFUSE_SECRET_KEY) return; + + void import("@/lib/langfuse/trace-proxy-request") + .then(({ traceProxyRequest }) => { + void traceProxyRequest({ + session, + responseHeaders: data.responseHeaders, + durationMs: data.durationMs, + statusCode: data.statusCode, + isStreaming: data.isStreaming, + responseText: data.responseText, + usageMetrics: data.usageMetrics, + costUsd: data.costUsd, + costBreakdown: data.costBreakdown, + sseEventCount: data.sseEventCount, + errorMessage: data.errorMessage, + }); + }) + .catch((err) => { + logger.warn("[Langfuse] Proxy trace failed", { + error: err instanceof Error ? err.message : String(err), + }); + }); +} diff --git a/src/lib/langfuse/trace-proxy-request.ts b/src/lib/langfuse/trace-proxy-request.ts index ad5f1f547..2d4ec6bc3 100644 --- a/src/lib/langfuse/trace-proxy-request.ts +++ b/src/lib/langfuse/trace-proxy-request.ts @@ -82,6 +82,50 @@ export interface TraceContext { costBreakdown?: CostBreakdown; } +function hasRequestInput(ctx: TraceContext): boolean { + if ( + typeof ctx.session.forwardedRequestBody === "string" && + ctx.session.forwardedRequestBody.trim().length > 0 + ) { + return true; + } + + return Object.keys(ctx.session.request.message ?? {}).length > 0; +} + +function isResponseMissing(ctx: TraceContext): boolean { + if (ctx.responseText) return false; + if (ctx.errorMessage) return true; + if (!hasRequestInput(ctx)) return false; + if (ctx.isStreaming) return ctx.sseEventCount === 0; + + return true; +} + +function buildResponseOutput(ctx: TraceContext): unknown { + if (ctx.responseText) { + return tryParseJsonSafe(ctx.responseText); + } + + const responseMissing = isResponseMissing(ctx); + const output: Record = ctx.isStreaming + ? 
{ streaming: true, sseEventCount: ctx.sseEventCount } + : { statusCode: ctx.statusCode }; + + if (responseMissing) { + output.responseMissing = true; + } + + if (ctx.errorMessage) { + if (ctx.isStreaming) { + output.statusCode = ctx.statusCode; + } + output.errorMessage = ctx.errorMessage; + } + + return output; +} + /** * Send a trace to Langfuse for a completed proxy request. * Fully async and non-blocking. Errors are caught and logged. @@ -138,11 +182,8 @@ export async function traceProxyRequest(ctx: TraceContext): Promise { : session.request.message; // Actual response body - no truncation - const actualResponseBody = ctx.responseText - ? tryParseJsonSafe(ctx.responseText) - : isStreaming - ? { streaming: true, sseEventCount: ctx.sseEventCount } - : { statusCode }; + const actualResponseBody = buildResponseOutput(ctx); + const responseMissing = isResponseMissing(ctx); // Root span metadata (former input/output summaries moved here) const rootSpanMetadata: Record = { @@ -153,6 +194,8 @@ export async function traceProxyRequest(ctx: TraceContext): Promise { providerName: provider?.name, statusCode, durationMs, + errorMessage: ctx.errorMessage, + responseMissing, hasUsage: !!ctx.usageMetrics, costUsd: ctx.costUsd, timingBreakdown, @@ -336,11 +379,7 @@ export async function traceProxyRequest(ctx: TraceContext): Promise { // Generation input/output = raw payload, no truncation const generationInput = actualRequestBody; - const generationOutput = ctx.responseText - ? tryParseJsonSafe(ctx.responseText) - : isStreaming - ? 
{ streaming: true, sseEventCount: ctx.sseEventCount } - : { statusCode }; + const generationOutput = buildResponseOutput(ctx); // Create the LLM generation observation const generation = rootSpan.startObservation( diff --git a/src/lib/utils/quota-helpers.ts b/src/lib/utils/quota-helpers.ts index 074481e92..94f2c5c12 100644 --- a/src/lib/utils/quota-helpers.ts +++ b/src/lib/utils/quota-helpers.ts @@ -10,6 +10,7 @@ export type KeyQuota = { costDaily: { current: number; limit: number | null }; costWeekly: { current: number; limit: number | null }; costMonthly: { current: number; limit: number | null }; + costTotal?: { current: number; limit: number | null }; concurrentSessions: { current: number; limit: number }; } | null; @@ -22,7 +23,7 @@ export type UserQuota = { * 判断密钥是否设置了限额 * * @param quota - 密钥限额数据 - * @returns 是否设置了任意限额(5h/周/月/并发) + * @returns 是否设置了任意限额(5h/日/周/月/总额/并发) */ export function hasKeyQuotaSet(quota: KeyQuota): boolean { if (!quota) return false; @@ -32,6 +33,7 @@ export function hasKeyQuotaSet(quota: KeyQuota): boolean { quota.costDaily.limit || quota.costWeekly.limit || quota.costMonthly.limit || + quota.costTotal?.limit || (quota.concurrentSessions.limit && quota.concurrentSessions.limit > 0) ); } @@ -87,6 +89,9 @@ export function getMaxUsageRate(quota: KeyQuota): number { if (quota.costMonthly.limit) { rates.push(getUsageRate(quota.costMonthly.current, quota.costMonthly.limit)); } + if (quota.costTotal?.limit) { + rates.push(getUsageRate(quota.costTotal.current, quota.costTotal.limit)); + } if (quota.concurrentSessions.limit > 0) { rates.push(getUsageRate(quota.concurrentSessions.current, quota.concurrentSessions.limit)); } diff --git a/src/repository/leaderboard.ts b/src/repository/leaderboard.ts index 18ace83f7..370010496 100644 --- a/src/repository/leaderboard.ts +++ b/src/repository/leaderboard.ts @@ -364,7 +364,7 @@ async function findLeaderboardWithTimezone( .innerJoin(users, and(sql`${usageLedger.userId} = ${users.id}`, 
isNull(users.deletedAt))) .where(and(...whereConditions)) .groupBy(usageLedger.userId, users.name) - .orderBy(desc(sql`sum(${usageLedger.costUsd})`)); + .orderBy(desc(sql`COALESCE(sum(${usageLedger.costUsd}), 0)`)); const baseEntries: LeaderboardEntry[] = rankings.map((entry) => ({ userId: entry.userId, @@ -404,7 +404,7 @@ async function findLeaderboardWithTimezone( .innerJoin(users, and(sql`${usageLedger.userId} = ${users.id}`, isNull(users.deletedAt))) .where(and(...whereConditions)) .groupBy(usageLedger.userId, modelField) - .orderBy(desc(sql`sum(${usageLedger.costUsd})`)); + .orderBy(desc(sql`COALESCE(sum(${usageLedger.costUsd}), 0)`)); const modelStatsByUser = new Map(); for (const row of modelRows) { @@ -696,7 +696,7 @@ async function findProviderLeaderboardWithTimezone( and(...whereConditions.filter((c): c is NonNullable<(typeof whereConditions)[number]> => !!c)) ) .groupBy(usageLedger.finalProviderId, providers.name) - .orderBy(desc(sql`sum(${usageLedger.costUsd})`)); + .orderBy(desc(sql`COALESCE(sum(${usageLedger.costUsd}), 0)`)); const baseEntries: ProviderLeaderboardEntry[] = rankings.map((entry) => { const totalCost = parseFloat(entry.totalCost); @@ -747,7 +747,7 @@ async function findProviderLeaderboardWithTimezone( and(...whereConditions.filter((c): c is NonNullable<(typeof whereConditions)[number]> => !!c)) ) .groupBy(usageLedger.finalProviderId, modelField) - .orderBy(desc(sql`sum(${usageLedger.costUsd})`), desc(sql`count(*)`)); + .orderBy(desc(sql`COALESCE(sum(${usageLedger.costUsd}), 0)`), desc(sql`count(*)`)); const modelStatsByProvider = new Map(); for (const row of modelRows) { @@ -1153,7 +1153,7 @@ async function findModelLeaderboardWithTimezone( .from(usageLedger) .where(and(LEDGER_BILLING_CONDITION, buildDateCondition(period, timezone, dateRange))) .groupBy(modelField) - .orderBy(desc(sql`count(*)`)); // 按请求数排序 + .orderBy(desc(sql`COALESCE(sum(${usageLedger.costUsd}), 0)`), desc(sql`count(*)`)); return rankings .filter((entry) => entry.model 
!== null && entry.model !== "") diff --git a/src/types/provider.ts b/src/types/provider.ts index c5e217e65..3ea7782dd 100644 --- a/src/types/provider.ts +++ b/src/types/provider.ts @@ -464,6 +464,7 @@ export interface ProviderDisplay { limitWeeklyUsd: number | null; limitMonthlyUsd: number | null; limitTotalUsd: number | null; + totalCostResetAt?: Date | null; limitConcurrentSessions: number; // 熔断器配置 maxRetryAttempts: number | null; diff --git a/tests/integration/billing-model-source.test.ts b/tests/integration/billing-model-source.test.ts index d9ef3e2d9..2088d55c1 100644 --- a/tests/integration/billing-model-source.test.ts +++ b/tests/integration/billing-model-source.test.ts @@ -77,7 +77,7 @@ vi.mock("@/lib/proxy-status-tracker", () => ({ }, })); -import { ProxyResponseHandler } from "@/app/v1/_lib/proxy/response-handler"; +import { finalizeRequestStats, ProxyResponseHandler } from "@/app/v1/_lib/proxy/response-handler"; import { ProxySession } from "@/app/v1/_lib/proxy/session"; import { getCachedSystemSettings, invalidateSystemSettingsCache } from "@/lib/config"; import { SessionManager } from "@/lib/session-manager"; @@ -139,12 +139,17 @@ function makeSystemSettings( }; } -function makePriceRecord(modelName: string, priceData: ModelPriceData): ModelPrice { +function makePriceRecord( + modelName: string, + priceData: ModelPriceData, + source: ModelPrice["source"] = "litellm" +): ModelPrice { const now = new Date(); return { id: 1, modelName, priceData, + source, createdAt: now, updatedAt: now, }; @@ -158,6 +163,8 @@ function createSession({ enableHighConcurrencyMode = false, providerOverrides, requestMessage, + requestPath = "/v1/messages", + groupCostMultiplier, }: { originalModel: string; redirectedModel: string; @@ -166,6 +173,8 @@ function createSession({ enableHighConcurrencyMode?: boolean; providerOverrides?: Record; requestMessage?: Record; + requestPath?: string; + groupCostMultiplier?: number; }): ProxySession { const session = new ( ProxySession as 
unknown as { @@ -184,7 +193,7 @@ function createSession({ )({ startTime: Date.now(), method: "POST", - requestUrl: new URL("http://localhost/v1/messages"), + requestUrl: new URL(`http://localhost${requestPath}`), headers: new Headers(), headerLog: "", request: { message: requestMessage ?? {}, log: "(test)", model: redirectedModel }, @@ -196,6 +205,9 @@ function createSession({ session.setOriginalModel(originalModel); session.setSessionId(sessionId); session.setHighConcurrencyModeEnabled(enableHighConcurrencyMode); + if (groupCostMultiplier !== undefined) { + session.setGroupCostMultiplier(groupCostMultiplier); + } const provider = { id: 99, @@ -256,6 +268,33 @@ function createNonStreamResponse( ); } +function createImageEditResponseWithoutUsage(): Response { + return new Response( + JSON.stringify({ + created: 1_776_729_600, + data: [{ b64_json: "test-image-bytes" }], + }), + { + status: 200, + headers: { "content-type": "application/json" }, + } + ); +} + +function createFake200ErrorResponse(): Response { + return new Response( + JSON.stringify({ + error: { + message: "invalid api key", + }, + }), + { + status: 200, + headers: { "content-type": "application/json" }, + } + ); +} + function createStreamResponse(usage: { input_tokens: number; output_tokens: number }): Response { const sseText = `event: message_delta\ndata: ${JSON.stringify({ usage })}\n\n`; const encoder = new TextEncoder(); @@ -295,7 +334,14 @@ async function runScenario({ billingModelSource: SystemSettings["billingModelSource"]; isStream: boolean; enableHighConcurrencyMode?: boolean; -}): Promise<{ dbCostUsd: string; sessionCostUsd: string; rateLimitCost: number }> { +}): Promise<{ + dbCostCalls: number; + dbCostUsd: string; + rateLimitCalls: number; + rateLimitCost: number; + sessionCostCalls: number; + sessionCostUsd: string; +}> { invalidateSystemSettingsCache(); const usage = { input_tokens: 2, output_tokens: 3 }; @@ -367,11 +413,14 @@ async function runScenario({ await drainAsyncTasks(); - 
const dbCostUsd = dbCosts[0] ?? ""; - const sessionCostUsd = sessionCosts[0] ?? ""; - const rateLimitCost = rateLimitCosts[0] ?? Number.NaN; - - return { dbCostUsd, sessionCostUsd, rateLimitCost }; + return { + dbCostCalls: dbCosts.length, + dbCostUsd: dbCosts[0] ?? "", + rateLimitCalls: rateLimitCosts.length, + rateLimitCost: rateLimitCosts[0] ?? Number.NaN, + sessionCostCalls: sessionCosts.length, + sessionCostUsd: sessionCosts[0] ?? "", + }; } describe("Billing model source - Redis session cost vs DB cost", () => { @@ -996,6 +1045,332 @@ describe("Billing model source - Redis session cost vs DB cost", () => { }); }); +describe("模型重定向后的图片按次计费", () => { + async function runImageEditPerRequestScenario( + billingModelSource: SystemSettings["billingModelSource"] + ): Promise<{ + dbCostCalls: number; + dbCostUsd: string; + rateLimitCalls: number; + rateLimitCost: number; + sessionCostCalls: number; + sessionCostUsd: string; + storedBreakdown: Record | undefined; + }> { + invalidateSystemSettingsCache(); + + const originalModel = "gpt-image-2"; + const redirectedModel = "gpt-image-2-all"; + const providerMultiplier = 2; + const groupCostMultiplier = 3; + + vi.mocked(getSystemSettings).mockResolvedValue(makeSystemSettings(billingModelSource)); + vi.mocked(findLatestPriceByModel).mockImplementation(async (modelName: string) => { + if (modelName === originalModel) { + return makePriceRecord(modelName, { input_cost_per_request: 0.01 }, "manual"); + } + if (modelName === redirectedModel) { + return makePriceRecord(modelName, { input_cost_per_request: 0.02 }, "manual"); + } + return null; + }); + + vi.mocked(updateMessageRequestDetails).mockResolvedValue(undefined); + vi.mocked(updateMessageRequestDuration).mockResolvedValue(undefined); + vi.mocked(SessionManager.storeSessionResponse).mockResolvedValue(undefined); + vi.mocked(RateLimitService.trackUserDailyCost).mockResolvedValue(undefined); + vi.mocked(SessionTracker.refreshSession).mockResolvedValue(undefined); + + const 
dbCosts: string[] = []; + let storedBreakdown: Record | undefined; + vi.mocked(updateMessageRequestCostWithBreakdown).mockImplementation( + async (_id: number, costUsd: unknown, breakdown?: Record) => { + dbCosts.push(String(costUsd)); + storedBreakdown = breakdown; + } + ); + + const sessionCosts: string[] = []; + vi.mocked(SessionManager.updateSessionUsage).mockImplementation( + async (_sessionId: string, payload: Record) => { + if (typeof payload.costUsd === "string") { + sessionCosts.push(payload.costUsd); + } + } + ); + + const rateLimitCosts: number[] = []; + vi.mocked(RateLimitService.trackCost).mockImplementation( + async (_keyId: number, _providerId: number, _sessionId: string, costUsd: number) => { + rateLimitCosts.push(costUsd); + } + ); + + const session = createSession({ + originalModel, + redirectedModel, + sessionId: `sess-image-edit-${billingModelSource}`, + messageId: billingModelSource === "original" ? 4000 : 4001, + requestPath: "/v1/images/edits", + providerOverrides: { + providerType: "openai", + url: "https://api.openai.com/v1", + costMultiplier: providerMultiplier, + }, + groupCostMultiplier, + }); + + const clientResponse = await ProxyResponseHandler.dispatch( + session, + createImageEditResponseWithoutUsage() + ); + await clientResponse.text(); + await drainAsyncTasks(); + + return { + dbCostCalls: dbCosts.length, + dbCostUsd: dbCosts[0] ?? "", + rateLimitCalls: rateLimitCosts.length, + rateLimitCost: rateLimitCosts[0] ?? Number.NaN, + sessionCostCalls: sessionCosts.length, + sessionCostUsd: sessionCosts[0] ?? 
"", + storedBreakdown, + }; + } + + it("配置 = original 时命中重定向前模型的本地按次价格并应用倍率", async () => { + const result = await runImageEditPerRequestScenario("original"); + + expect(result.dbCostUsd).toBe("0.06"); + expect(result.sessionCostUsd).toBe("0.06"); + expect(result.rateLimitCost).toBe(0.06); + expect(result.dbCostCalls).toBe(1); + expect(result.sessionCostCalls).toBe(1); + expect(result.rateLimitCalls).toBe(1); + expect(result.storedBreakdown).toMatchObject({ + input: "0.01", + base_total: "0.01", + provider_multiplier: 2, + group_multiplier: 3, + total: "0.06", + }); + }); + + it("配置 = redirected 时命中重定向后模型的本地按次价格并应用倍率", async () => { + const result = await runImageEditPerRequestScenario("redirected"); + + expect(result.dbCostUsd).toBe("0.12"); + expect(result.sessionCostUsd).toBe("0.12"); + expect(result.rateLimitCost).toBe(0.12); + expect(result.dbCostCalls).toBe(1); + expect(result.sessionCostCalls).toBe(1); + expect(result.rateLimitCalls).toBe(1); + expect(result.storedBreakdown).toMatchObject({ + input: "0.02", + base_total: "0.02", + provider_multiplier: 2, + group_multiplier: 3, + total: "0.12", + }); + }); + + it("按次价格为 0 时不进入空 usage 计费写入路径", async () => { + invalidateSystemSettingsCache(); + + const originalModel = "gpt-image-2"; + const redirectedModel = "gpt-image-2-all"; + + vi.mocked(getSystemSettings).mockResolvedValue(makeSystemSettings("original")); + vi.mocked(findLatestPriceByModel).mockImplementation(async (modelName: string) => { + return makePriceRecord(modelName, { input_cost_per_request: 0 }, "manual"); + }); + + vi.mocked(updateMessageRequestDetails).mockResolvedValue(undefined); + vi.mocked(updateMessageRequestDuration).mockResolvedValue(undefined); + vi.mocked(SessionManager.storeSessionResponse).mockResolvedValue(undefined); + vi.mocked(RateLimitService.trackUserDailyCost).mockResolvedValue(undefined); + vi.mocked(SessionTracker.refreshSession).mockResolvedValue(undefined); + 
vi.mocked(updateMessageRequestCostWithBreakdown).mockResolvedValue(undefined); + vi.mocked(SessionManager.updateSessionUsage).mockResolvedValue(undefined); + vi.mocked(RateLimitService.trackCost).mockResolvedValue(undefined); + + const session = createSession({ + originalModel, + redirectedModel, + sessionId: "sess-image-edit-zero-per-request", + messageId: 4002, + requestPath: "/v1/images/edits", + providerOverrides: { + providerType: "openai", + url: "https://api.openai.com/v1", + }, + }); + + const clientResponse = await ProxyResponseHandler.dispatch( + session, + createImageEditResponseWithoutUsage() + ); + await clientResponse.text(); + await drainAsyncTasks(); + + expect(updateMessageRequestCostWithBreakdown).not.toHaveBeenCalled(); + expect(SessionManager.updateSessionUsage).not.toHaveBeenCalled(); + expect(RateLimitService.trackCost).not.toHaveBeenCalled(); + }); + + it("价格查询失败时跳过按次计费且不影响成功响应", async () => { + invalidateSystemSettingsCache(); + + const originalModel = "gpt-image-2"; + const redirectedModel = "gpt-image-2-all"; + + vi.mocked(getSystemSettings).mockResolvedValue(makeSystemSettings("original")); + vi.mocked(findLatestPriceByModel).mockImplementation(async () => { + throw new Error("pricing db unavailable"); + }); + + vi.mocked(updateMessageRequestDetails).mockResolvedValue(undefined); + vi.mocked(updateMessageRequestDuration).mockResolvedValue(undefined); + vi.mocked(SessionManager.storeSessionResponse).mockResolvedValue(undefined); + vi.mocked(RateLimitService.trackUserDailyCost).mockResolvedValue(undefined); + vi.mocked(SessionTracker.refreshSession).mockResolvedValue(undefined); + vi.mocked(updateMessageRequestCostWithBreakdown).mockResolvedValue(undefined); + vi.mocked(SessionManager.updateSessionUsage).mockResolvedValue(undefined); + vi.mocked(RateLimitService.trackCost).mockResolvedValue(undefined); + + const session = createSession({ + originalModel, + redirectedModel, + sessionId: "sess-image-edit-pricing-error", + messageId: 4004, + 
requestPath: "/v1/images/edits", + providerOverrides: { + providerType: "openai", + url: "https://api.openai.com/v1", + }, + }); + + const clientResponse = await ProxyResponseHandler.dispatch( + session, + createImageEditResponseWithoutUsage() + ); + const responseText = await clientResponse.text(); + await drainAsyncTasks(); + + expect(clientResponse.status).toBe(200); + expect(JSON.parse(responseText)).toMatchObject({ + data: [{ b64_json: "test-image-bytes" }], + }); + expect(updateMessageRequestCostWithBreakdown).not.toHaveBeenCalled(); + expect(SessionManager.updateSessionUsage).not.toHaveBeenCalled(); + expect(RateLimitService.trackCost).not.toHaveBeenCalled(); + }); + + it("上游假 200 错误 payload 不触发图片按次计费", async () => { + invalidateSystemSettingsCache(); + + const originalModel = "gpt-image-2"; + const redirectedModel = "gpt-image-2-all"; + + vi.mocked(getSystemSettings).mockResolvedValue(makeSystemSettings("original")); + vi.mocked(findLatestPriceByModel).mockImplementation(async (modelName: string) => { + return makePriceRecord(modelName, { input_cost_per_request: 0.01 }, "manual"); + }); + + vi.mocked(updateMessageRequestDetails).mockResolvedValue(undefined); + vi.mocked(updateMessageRequestDuration).mockResolvedValue(undefined); + vi.mocked(SessionManager.storeSessionResponse).mockResolvedValue(undefined); + vi.mocked(RateLimitService.trackUserDailyCost).mockResolvedValue(undefined); + vi.mocked(SessionTracker.refreshSession).mockResolvedValue(undefined); + vi.mocked(updateMessageRequestCostWithBreakdown).mockResolvedValue(undefined); + vi.mocked(SessionManager.updateSessionUsage).mockResolvedValue(undefined); + vi.mocked(RateLimitService.trackCost).mockResolvedValue(undefined); + + const session = createSession({ + originalModel, + redirectedModel, + sessionId: "sess-image-edit-fake-200-error", + messageId: 4005, + requestPath: "/v1/images/edits", + providerOverrides: { + providerType: "openai", + url: "https://api.openai.com/v1", + }, + }); + + const 
clientResponse = await ProxyResponseHandler.dispatch( + session, + createFake200ErrorResponse() + ); + const responseText = await clientResponse.text(); + await drainAsyncTasks(); + + expect(clientResponse.status).toBe(200); + expect(JSON.parse(responseText)).toMatchObject({ + error: { message: "invalid api key" }, + }); + expect(updateMessageRequestCostWithBreakdown).not.toHaveBeenCalled(); + expect(SessionManager.updateSessionUsage).not.toHaveBeenCalled(); + expect(RateLimitService.trackCost).not.toHaveBeenCalled(); + }); + + it("finalizeRequestStats 的按次计费 session usage 保留 errorMessage", async () => { + invalidateSystemSettingsCache(); + + const originalModel = "gpt-image-2"; + const redirectedModel = "gpt-image-2-all"; + const errorMessage = "fake 200 upstream warning"; + + vi.mocked(getSystemSettings).mockResolvedValue(makeSystemSettings("original")); + vi.mocked(findLatestPriceByModel).mockImplementation(async (modelName: string) => { + return makePriceRecord(modelName, { input_cost_per_request: 0.01 }, "manual"); + }); + + vi.mocked(updateMessageRequestCostWithBreakdown).mockResolvedValue(undefined); + vi.mocked(updateMessageRequestDetails).mockResolvedValue(undefined); + vi.mocked(RateLimitService.trackCost).mockResolvedValue(undefined); + + let sessionUsagePayload: Record | undefined; + vi.mocked(SessionManager.updateSessionUsage).mockImplementation( + async (_sessionId: string, payload: Record) => { + sessionUsagePayload = payload; + } + ); + + const session = createSession({ + originalModel, + redirectedModel, + sessionId: "sess-image-edit-finalize-error-message", + messageId: 4003, + requestPath: "/v1/images/edits", + providerOverrides: { + providerType: "openai", + url: "https://api.openai.com/v1", + }, + }); + + await finalizeRequestStats( + session, + JSON.stringify({ + created: 1_776_729_600, + data: [{ b64_json: "test-image-bytes" }], + }), + 200, + 42, + errorMessage, + 99, + false + ); + + expect(sessionUsagePayload).toMatchObject({ + costUsd: 
"0.01", + status: "completed", + statusCode: 200, + errorMessage, + }); + }); +}); + describe("价格表缺失/查询失败:不计费放行", () => { async function runNoPriceScenario(options: { billingModelSource: SystemSettings["billingModelSource"]; diff --git a/tests/unit/actions/providers-usage.test.ts b/tests/unit/actions/providers-usage.test.ts index 74838c196..f05450f45 100644 --- a/tests/unit/actions/providers-usage.test.ts +++ b/tests/unit/actions/providers-usage.test.ts @@ -17,6 +17,7 @@ import { beforeEach, describe, expect, it, vi } from "vitest"; const getSessionMock = vi.fn(); const findProviderByIdMock = vi.fn(); const sumProviderCostInTimeRangeMock = vi.fn(); +const sumProviderTotalCostMock = vi.fn(); const getProviderSessionCountMock = vi.fn(); const getProviderSessionCountBatchMock = vi.fn(); const getTimeRangeForPeriodMock = vi.fn(); @@ -37,6 +38,8 @@ vi.mock("@/repository/provider", () => ({ vi.mock("@/repository/statistics", () => ({ sumProviderCostInTimeRange: (providerId: number, startTime: Date, endTime: Date) => sumProviderCostInTimeRangeMock(providerId, startTime, endTime), + sumProviderTotalCost: (providerId: number, resetAt?: Date | null) => + sumProviderTotalCostMock(providerId, resetAt), })); vi.mock("@/lib/session-tracker", () => ({ @@ -97,6 +100,8 @@ describe("getProviderLimitUsage", () => { limitDailyUsd: 50, limitWeeklyUsd: 200, limitMonthlyUsd: 500, + limitTotalUsd: 1000, + totalCostResetAt: new Date(nowMs - 3 * 60 * 60 * 1000), limitConcurrentSessions: 5, }; @@ -166,6 +171,7 @@ describe("getProviderLimitUsage", () => { // Default DB costs sumProviderCostInTimeRangeMock.mockResolvedValue(5.5); + sumProviderTotalCostMock.mockResolvedValue(0); }); afterEach(() => { @@ -197,6 +203,14 @@ describe("getProviderLimitUsage", () => { expect(getTimeRangeForPeriodMock).toHaveBeenCalledWith("monthly", undefined); }); + it("should pass total cost reset time to sumProviderTotalCost", async () => { + const { getProviderLimitUsage } = await import("@/actions/providers"); + 
+ await getProviderLimitUsage(1); + + expect(sumProviderTotalCostMock).toHaveBeenCalledWith(1, mockProvider.totalCostResetAt); + }); + it("should call getTimeRangeForPeriodWithMode for daily with provider config", async () => { const { getProviderLimitUsage } = await import("@/actions/providers"); @@ -323,6 +337,8 @@ describe("getProviderLimitUsageBatch", () => { limitDailyUsd: 50, limitWeeklyUsd: 200, limitMonthlyUsd: 500, + limitTotalUsd: 1000, + totalCostResetAt: new Date(nowMs - 3 * 60 * 60 * 1000), limitConcurrentSessions: 5, }, { @@ -334,6 +350,8 @@ describe("getProviderLimitUsageBatch", () => { limitDailyUsd: 100, limitWeeklyUsd: 400, limitMonthlyUsd: 1000, + limitTotalUsd: 2000, + totalCostResetAt: new Date(nowMs - 6 * 60 * 60 * 1000), limitConcurrentSessions: 10, }, ]; @@ -403,6 +421,7 @@ describe("getProviderLimitUsageBatch", () => { get5hWindowResetAtMock.mockResolvedValue(null); sumProviderCostInTimeRangeMock.mockResolvedValue(5.5); + sumProviderTotalCostMock.mockResolvedValue(0); }); afterEach(() => { @@ -433,6 +452,15 @@ describe("getProviderLimitUsageBatch", () => { expect(getTimeRangeForPeriodWithModeMock).toHaveBeenCalledWith("daily", "18:00", "rolling"); }); + it("should pass each provider total reset time to sumProviderTotalCost", async () => { + const { getProviderLimitUsageBatch } = await import("@/actions/providers"); + + await getProviderLimitUsageBatch(mockProviders); + + expect(sumProviderTotalCostMock).toHaveBeenCalledWith(1, mockProviders[0].totalCostResetAt); + expect(sumProviderTotalCostMock).toHaveBeenCalledWith(2, mockProviders[1].totalCostResetAt); + }); + it("should return empty map for empty providers array", async () => { const { getProviderLimitUsageBatch } = await import("@/actions/providers"); diff --git a/tests/unit/i18n/locale-server-translations.test.ts b/tests/unit/i18n/locale-server-translations.test.ts new file mode 100644 index 000000000..2f8e3407e --- /dev/null +++ b/tests/unit/i18n/locale-server-translations.test.ts @@ 
-0,0 +1,43 @@ +import { readdirSync, readFileSync } from "node:fs"; +import { basename, join } from "node:path"; +import { describe, expect, test } from "vitest"; + +function walk(dir: string): string[] { + return readdirSync(dir, { withFileTypes: true }).flatMap((entry) => { + const fullPath = join(dir, entry.name); + if (entry.isDirectory()) { + return walk(fullPath); + } + return /\.(ts|tsx)$/.test(entry.name) ? [fullPath] : []; + }); +} + +function isRouteOrServerChromeFile(filePath: string): boolean { + const fileName = basename(filePath); + + return ( + fileName === "page.tsx" || + fileName === "layout.tsx" || + filePath.endsWith("dashboard-header.tsx") || + filePath.endsWith("dashboard-sections.tsx") || + filePath.endsWith("settings/_lib/nav-items.ts") + ); +} + +describe("locale server translations", () => { + test("route pages and server chrome pass locale explicitly to getTranslations", () => { + const files = walk("src/app/[locale]").filter(isRouteOrServerChromeFile); + const violations = files.flatMap((file) => { + const content = readFileSync(file, "utf8"); + const lines = content.split("\n"); + return [...content.matchAll(/getTranslations\(\s*["']/g)].map((match) => { + const offset = match.index ?? 0; + const lineNumber = content.slice(0, offset).split("\n").length; + const line = lines[lineNumber - 1] ?? 
""; + return `${file}:${lineNumber}: ${line.trim()}`; + }); + }); + + expect(violations).toEqual([]); + }); +}); diff --git a/tests/unit/langfuse/langfuse-trace.test.ts b/tests/unit/langfuse/langfuse-trace.test.ts index 97c1c06e5..fb0871f62 100644 --- a/tests/unit/langfuse/langfuse-trace.test.ts +++ b/tests/unit/langfuse/langfuse-trace.test.ts @@ -513,6 +513,75 @@ describe("traceProxyRequest", () => { }); }); + test("should mark missing non-stream output for error traces", async () => { + const { traceProxyRequest } = await import("@/lib/langfuse/trace-proxy-request"); + + await traceProxyRequest({ + session: createMockSession(), + responseHeaders: new Headers(), + durationMs: 500, + statusCode: 502, + isStreaming: false, + errorMessage: "fetch failed", + }); + + const expectedOutput = { + statusCode: 502, + errorMessage: "fetch failed", + responseMissing: true, + }; + const rootCall = mockStartObservation.mock.calls[0]; + expect(rootCall[1].output).toEqual(expectedOutput); + + const llmCall = mockRootSpan.startObservation.mock.calls.find( + (c: unknown[]) => c[0] === "llm-call" + ); + expect(llmCall[1].output).toEqual(expectedOutput); + expect(mockRootSpan.updateTrace).toHaveBeenCalledWith( + expect.objectContaining({ + output: expectedOutput, + }) + ); + }); + + test("should mark missing non-stream output when request input exists", async () => { + const { traceProxyRequest } = await import("@/lib/langfuse/trace-proxy-request"); + + await traceProxyRequest({ + session: createMockSession({ + request: { + message: { + model: "claude-sonnet-4-20250514", + messages: [{ role: "user", content: "Hello" }], + stream: false, + }, + model: "claude-sonnet-4-20250514", + }, + }), + responseHeaders: new Headers(), + durationMs: 500, + statusCode: 204, + isStreaming: false, + }); + + const expectedOutput = { + statusCode: 204, + responseMissing: true, + }; + const rootCall = mockStartObservation.mock.calls[0]; + expect(rootCall[1].output).toEqual(expectedOutput); + + const 
llmCall = mockRootSpan.startObservation.mock.calls.find( + (c: unknown[]) => c[0] === "llm-call" + ); + expect(llmCall[1].output).toEqual(expectedOutput); + expect(mockRootSpan.updateTrace).toHaveBeenCalledWith( + expect.objectContaining({ + output: expectedOutput, + }) + ); + }); + test("should include costUsd in root span metadata", async () => { const { traceProxyRequest } = await import("@/lib/langfuse/trace-proxy-request"); diff --git a/tests/unit/proxy/combine-abort-signals.test.ts b/tests/unit/proxy/combine-abort-signals.test.ts new file mode 100644 index 000000000..7a86ce975 --- /dev/null +++ b/tests/unit/proxy/combine-abort-signals.test.ts @@ -0,0 +1,90 @@ +import { afterEach, beforeEach, describe, expect, it } from "vitest"; +import { combineAbortSignals } from "@/app/v1/_lib/proxy/combine-abort-signals"; + +type MutableAbortSignal = { any?: unknown }; + +describe("combineAbortSignals", () => { + describe("native AbortSignal.any path", () => { + it("delegates to AbortSignal.any when available and cleanup is noop", () => { + const c1 = new AbortController(); + const c2 = new AbortController(); + + const { signal, cleanup } = combineAbortSignals([c1.signal, c2.signal]); + + expect(signal.aborted).toBe(false); + c1.abort(); + expect(signal.aborted).toBe(true); + + // cleanup should be safe to call (noop) — no listeners owned by us. + expect(() => cleanup()).not.toThrow(); + }); + }); + + describe("polyfill path (AbortSignal.any unavailable)", () => { + let originalAny: unknown; + + beforeEach(() => { + originalAny = (AbortSignal as MutableAbortSignal).any; + // 赋 undefined 让 helper 的 `typeof ... 
=== "function"` check 走 polyfill; + // delete 在部分 V8 版本上对 static 不生效,赋值更可靠。 + (AbortSignal as MutableAbortSignal).any = undefined; + }); + + afterEach(() => { + (AbortSignal as MutableAbortSignal).any = originalAny; + }); + + it("aborts combined signal when any source aborts", () => { + const c1 = new AbortController(); + const c2 = new AbortController(); + + const { signal } = combineAbortSignals([c1.signal, c2.signal]); + expect(signal.aborted).toBe(false); + c2.abort(); + expect(signal.aborted).toBe(true); + }); + + it("source-side abort listeners are detached after cleanup is invoked", () => { + const c1 = new AbortController(); + const c2 = new AbortController(); + + const { signal, cleanup } = combineAbortSignals([c1.signal, c2.signal]); + expect(signal.aborted).toBe(false); + + // 模拟请求正常完成:调用方在 finally 中触发 cleanup。 + cleanup(); + + // 源信号此后再 abort,不应再传播到组合信号(listener 已解绑)。 + c1.abort(); + c2.abort(); + expect(signal.aborted).toBe(false); + }); + + it("auto-cleans listeners when a source aborts (does not require explicit cleanup)", () => { + const c1 = new AbortController(); + const c2 = new AbortController(); + + const { signal, cleanup } = combineAbortSignals([c1.signal, c2.signal]); + c1.abort(); + expect(signal.aborted).toBe(true); + + // 二次 cleanup 必须幂等(请求结束的 finally 仍会调)。 + expect(() => cleanup()).not.toThrow(); + expect(() => cleanup()).not.toThrow(); + }); + + it("immediately aborts and cleans up when a source signal is already aborted", () => { + const c1 = new AbortController(); + c1.abort(); + const c2 = new AbortController(); + + const { signal, cleanup } = combineAbortSignals([c1.signal, c2.signal]); + expect(signal.aborted).toBe(true); + + // 后到的源 abort 不应再触发任何路径(已 cleanup)。 + c2.abort(); + expect(signal.aborted).toBe(true); + expect(() => cleanup()).not.toThrow(); + }); + }); +}); diff --git a/tests/unit/proxy/error-handler-langfuse-trace.test.ts b/tests/unit/proxy/error-handler-langfuse-trace.test.ts new file mode 100644 index 
000000000..80994a904 --- /dev/null +++ b/tests/unit/proxy/error-handler-langfuse-trace.test.ts @@ -0,0 +1,312 @@ +import { beforeEach, describe, expect, test, vi } from "vitest"; + +const mocks = vi.hoisted(() => ({ + emitProxyLangfuseTrace: vi.fn(), + getCachedSystemSettings: vi.fn(async () => ({ + verboseProviderError: false, + passThroughUpstreamErrorMessage: false, + })), + getErrorOverrideAsync: vi.fn(async () => undefined), + updateMessageRequestDetails: vi.fn(async () => undefined), + updateMessageRequestDuration: vi.fn(async () => undefined), + endRequest: vi.fn(), +})); + +vi.mock("@/lib/langfuse/emit-proxy-trace", () => ({ + emitProxyLangfuseTrace: mocks.emitProxyLangfuseTrace, +})); + +vi.mock("@/lib/config/system-settings-cache", () => ({ + getCachedSystemSettings: mocks.getCachedSystemSettings, +})); + +vi.mock("@/repository/message", () => ({ + updateMessageRequestDetails: mocks.updateMessageRequestDetails, + updateMessageRequestDuration: mocks.updateMessageRequestDuration, +})); + +vi.mock("@/lib/proxy-status-tracker", () => ({ + ProxyStatusTracker: { + getInstance: () => ({ + endRequest: mocks.endRequest, + }), + }, +})); + +vi.mock("@/lib/logger", () => ({ + logger: { + debug: vi.fn(), + info: vi.fn(), + warn: vi.fn(), + trace: vi.fn(), + error: vi.fn(), + fatal: vi.fn(), + }, +})); + +vi.mock("@/app/v1/_lib/proxy/errors", async (importOriginal) => { + const actual = await importOriginal(); + return { + ...actual, + getErrorOverrideAsync: mocks.getErrorOverrideAsync, + }; +}); + +import { ProxyErrorHandler } from "@/app/v1/_lib/proxy/error-handler"; +import { ProxyError, RateLimitError } from "@/app/v1/_lib/proxy/errors"; + +function createSession(overrides: Record = {}): any { + const requestMessage = { + model: "claude-sonnet-4-20250514", + messages: [{ role: "user", content: "hello" }], + }; + + return { + sessionId: "s_langfuse_error", + messageContext: { + id: "msg_langfuse_error", + user: { id: 42, name: "test-user" }, + key: { name: 
"test-key" }, + }, + startTime: Date.now() - 25, + method: "POST", + originalFormat: "claude", + request: { + message: requestMessage, + model: requestMessage.model, + log: JSON.stringify(requestMessage), + }, + headers: new Headers({ "user-agent": "vitest" }), + provider: { + id: 7, + name: "provider-a", + providerType: "claude", + swapCacheTtlBilling: false, + }, + getProviderChain: () => [], + getCurrentModel: () => requestMessage.model, + getContext1mApplied: () => false, + getGroupCostMultiplier: () => 1, + getSpecialSettings: () => null, + getEndpoint: () => "/v1/messages", + getRequestSequence: () => 1, + ...overrides, + }; +} + +describe("ProxyErrorHandler.handle - Langfuse error traces", () => { + beforeEach(() => { + vi.clearAllMocks(); + mocks.getCachedSystemSettings.mockResolvedValue({ + verboseProviderError: false, + passThroughUpstreamErrorMessage: false, + }); + mocks.getErrorOverrideAsync.mockResolvedValue(undefined); + }); + + test("emits trace for local request errors without upstream output", async () => { + const session = createSession(); + + await ProxyErrorHandler.handle(session, new ProxyError("Invalid request: missing model", 400)); + + expect(mocks.emitProxyLangfuseTrace).toHaveBeenCalledWith( + session, + expect.objectContaining({ + responseHeaders: expect.any(Headers), + responseText: "", + usageMetrics: null, + costUsd: undefined, + statusCode: 400, + isStreaming: false, + errorMessage: "Invalid request: missing model", + }) + ); + expect(mocks.emitProxyLangfuseTrace.mock.calls[0][1].durationMs).toBeGreaterThanOrEqual(0); + }); + + test("emits trace for thrown network errors without upstream output", async () => { + const session = createSession(); + + await ProxyErrorHandler.handle(session, new Error("fetch failed")); + + expect(mocks.emitProxyLangfuseTrace).toHaveBeenCalledWith( + session, + expect.objectContaining({ + responseText: "", + statusCode: 500, + isStreaming: false, + errorMessage: "fetch failed", + }) + ); + }); + + 
test("emits trace before database persistence failures can abort handling", async () => { + const session = createSession(); + mocks.updateMessageRequestDuration.mockRejectedValueOnce(new Error("db down")); + + await expect(ProxyErrorHandler.handle(session, new Error("fetch failed"))).rejects.toThrow( + "db down" + ); + + expect(mocks.emitProxyLangfuseTrace).toHaveBeenCalledWith( + session, + expect.objectContaining({ + responseText: "", + statusCode: 500, + errorMessage: "fetch failed", + }) + ); + }); + + test("uses upstream raw body as trace output when available", async () => { + const session = createSession(); + const error = new ProxyError("Upstream failed", 502, { + body: "sanitized upstream body", + rawBody: '{"error":{"message":"raw upstream failure"}}', + rawBodyTruncated: false, + providerId: 7, + providerName: "provider-a", + }); + + await ProxyErrorHandler.handle(session, error); + + expect(mocks.emitProxyLangfuseTrace).toHaveBeenCalledWith( + session, + expect.objectContaining({ + responseText: '{"error":{"message":"raw upstream failure"}}', + statusCode: 502, + errorMessage: expect.stringContaining("Upstream failed"), + }) + ); + }); + + test("emits final override response and status after error override is applied", async () => { + const session = createSession(); + mocks.getErrorOverrideAsync.mockResolvedValueOnce({ + statusCode: 429, + response: { + error: { + message: "masked quota message", + type: "rate_limit_error", + }, + }, + }); + + const response = await ProxyErrorHandler.handle( + session, + new ProxyError("Upstream failed", 502, { + rawBody: '{"error":{"message":"raw upstream failure"}}', + providerId: 7, + providerName: "provider-a", + }) + ); + + expect(response.status).toBe(429); + expect(mocks.emitProxyLangfuseTrace).toHaveBeenCalledWith( + session, + expect.objectContaining({ + responseText: expect.stringContaining("masked quota message"), + statusCode: 429, + errorMessage: "masked quota message", + }) + ); + 
expect(mocks.emitProxyLangfuseTrace.mock.calls[0][1].responseText).not.toContain( + "raw upstream failure" + ); + expect(mocks.updateMessageRequestDetails).toHaveBeenCalledWith( + session.messageContext.id, + expect.objectContaining({ + statusCode: 429, + }) + ); + }); + + test("falls back to upstream body when raw body is missing", async () => { + const session = createSession(); + const error = new ProxyError("Upstream failed", 502, { + body: "sanitized upstream body", + rawBodyTruncated: false, + providerId: 7, + providerName: "provider-a", + }); + + await ProxyErrorHandler.handle(session, error); + + expect(mocks.emitProxyLangfuseTrace).toHaveBeenCalledWith( + session, + expect.objectContaining({ + responseText: "sanitized upstream body", + statusCode: 502, + errorMessage: expect.stringContaining("Upstream failed"), + }) + ); + }); + + test("preserves streaming request context for early error traces", async () => { + const requestMessage = { + model: "claude-sonnet-4-20250514", + messages: [{ role: "user", content: "hello" }], + stream: true, + }; + const session = createSession({ + request: { + message: requestMessage, + model: requestMessage.model, + log: JSON.stringify(requestMessage), + }, + }); + + await ProxyErrorHandler.handle(session, new Error("fetch failed")); + + expect(mocks.emitProxyLangfuseTrace).toHaveBeenCalledWith( + session, + expect.objectContaining({ + responseText: "", + statusCode: 500, + isStreaming: true, + sseEventCount: 0, + errorMessage: "fetch failed", + }) + ); + }); + + test("detects Gemini SSE URLs as streaming for early error traces", async () => { + const session = createSession({ + requestUrl: new URL( + "https://generativelanguage.googleapis.com/v1beta/models/gemini-pro:streamGenerateContent?alt=sse" + ), + }); + + await ProxyErrorHandler.handle(session, new Error("fetch failed")); + + expect(mocks.emitProxyLangfuseTrace).toHaveBeenCalledWith( + session, + expect.objectContaining({ + responseText: "", + statusCode: 500, + 
isStreaming: true, + sseEventCount: 0, + errorMessage: "fetch failed", + }) + ); + }); + + test("emits trace for rate limit early returns", async () => { + const session = createSession(); + + await ProxyErrorHandler.handle( + session, + new RateLimitError("rate_limit_error", "limit exceeded", "daily_quota", 12, 20, null) + ); + + expect(mocks.emitProxyLangfuseTrace).toHaveBeenCalledWith( + session, + expect.objectContaining({ + responseText: "", + statusCode: 402, + isStreaming: false, + errorMessage: "limit exceeded", + }) + ); + }); +}); diff --git a/tests/unit/proxy/proxy-forwarder-hedge-first-byte.test.ts b/tests/unit/proxy/proxy-forwarder-hedge-first-byte.test.ts index 0f41408b9..2f8f1d46f 100644 --- a/tests/unit/proxy/proxy-forwarder-hedge-first-byte.test.ts +++ b/tests/unit/proxy/proxy-forwarder-hedge-first-byte.test.ts @@ -127,6 +127,7 @@ import type { Provider } from "@/types/provider"; type AttemptRuntime = { clearResponseTimeout?: () => void; responseController?: AbortController; + releaseAgent?: () => void; }; function createProvider(overrides: Partial = {}): Provider { @@ -545,11 +546,14 @@ describe("ProxyForwarder - first-byte hedge scheduling", () => { const controller1 = new AbortController(); const controller2 = new AbortController(); + const releaseInitialAgent = vi.fn(); + const releaseLoserAgent = vi.fn(); doForward.mockImplementationOnce(async (attemptSession, providerForRequest) => { const runtime = attemptSession as ProxySession & AttemptRuntime; runtime.responseController = controller1; runtime.clearResponseTimeout = vi.fn(); + runtime.releaseAgent = releaseInitialAgent; expect( ModelRedirector.apply(attemptSession as ProxySession, providerForRequest as Provider) ).toBe(true); @@ -564,6 +568,7 @@ describe("ProxyForwarder - first-byte hedge scheduling", () => { const runtime = attemptSession as ProxySession & AttemptRuntime; runtime.responseController = controller2; runtime.clearResponseTimeout = vi.fn(); + runtime.releaseAgent = 
releaseLoserAgent; expect( ModelRedirector.apply(attemptSession as ProxySession, providerForRequest as Provider) ).toBe(true); @@ -601,6 +606,77 @@ describe("ProxyForwarder - first-byte hedge scheduling", () => { billingModel: requestedModel, }); expect(mocks.releaseProviderSession).toHaveBeenCalledWith(fireworks.id, "sess-hedge"); + expect(releaseInitialAgent).toHaveBeenCalledTimes(1); + expect(releaseLoserAgent).not.toHaveBeenCalled(); + } finally { + vi.useRealTimers(); + } + }); + + test("hedge loser 在 releaseAgent 晚到时仍会释放 agent cleanup", async () => { + vi.useFakeTimers(); + + try { + const slow = createProvider({ id: 383, name: "slow", firstByteTimeoutStreamingMs: 100 }); + const fast = createProvider({ id: 206, name: "fast", firstByteTimeoutStreamingMs: 100 }); + const session = createSession(); + setProviderWithSessionRef(session, slow); + session.addProviderToChain(slow, { reason: "initial_selection" }); + + mocks.pickRandomProviderWithExclusion.mockResolvedValueOnce(fast); + + const doForward = vi.spyOn( + ProxyForwarder as unknown as { + doForward: (...args: unknown[]) => Promise; + }, + "doForward" + ); + + const slowController = new AbortController(); + const fastController = new AbortController(); + const releaseSlowAgent = vi.fn(); + const releaseFastAgent = vi.fn(); + + doForward.mockImplementationOnce(async (attemptSession) => { + await new Promise((resolve) => setTimeout(resolve, 180)); + const runtime = attemptSession as ProxySession & AttemptRuntime; + runtime.responseController = slowController; + runtime.clearResponseTimeout = vi.fn(); + runtime.releaseAgent = releaseSlowAgent; + return createStreamingResponse({ + label: "slow", + firstChunkDelayMs: 0, + controller: slowController, + }); + }); + + doForward.mockImplementationOnce(async (attemptSession) => { + const runtime = attemptSession as ProxySession & AttemptRuntime; + runtime.responseController = fastController; + runtime.clearResponseTimeout = vi.fn(); + runtime.releaseAgent = 
releaseFastAgent; + return createStreamingResponse({ + label: "fast", + firstChunkDelayMs: 20, + controller: fastController, + }); + }); + + const responsePromise = ProxyForwarder.send(session); + + await vi.advanceTimersByTimeAsync(100); + expect(doForward).toHaveBeenCalledTimes(2); + + await vi.advanceTimersByTimeAsync(50); + const response = await responsePromise; + expect(await response.text()).toContain('"provider":"fast"'); + expect(releaseSlowAgent).not.toHaveBeenCalled(); + expect(releaseFastAgent).not.toHaveBeenCalled(); + + await vi.advanceTimersByTimeAsync(150); + + expect(releaseSlowAgent).toHaveBeenCalledTimes(1); + expect(releaseFastAgent).not.toHaveBeenCalled(); } finally { vi.useRealTimers(); } diff --git a/tests/unit/proxy/proxy-forwarder-provider-session-release.test.ts b/tests/unit/proxy/proxy-forwarder-provider-session-release.test.ts index 98bf78997..efc8a9028 100644 --- a/tests/unit/proxy/proxy-forwarder-provider-session-release.test.ts +++ b/tests/unit/proxy/proxy-forwarder-provider-session-release.test.ts @@ -109,4 +109,61 @@ describe("ProxyForwarder provider failure session release", () => { expect(failedProviderIds).toEqual([42]); expect(mocks.releaseProviderSession).not.toHaveBeenCalled(); }); + + it("同步 hedge 胜出 shadow session 时保留 releaseAgent cleanup 回调", async () => { + const { ProxyForwarder } = await import("@/app/v1/_lib/proxy/forwarder"); + const forwarderInternals = ProxyForwarder as unknown as { + syncWinningAttemptSession: (target: ProxySession, source: ProxySession) => void; + }; + const clearResponseTimeout = vi.fn(); + const releaseAgent = vi.fn(); + const responseController = new AbortController(); + const setTargetCacheTtlResolved = vi.fn(); + const setTargetContext1mApplied = vi.fn(); + const target = { + request: { message: null, buffer: null, log: null, note: null }, + requestUrl: new URL("https://example.com/v1/messages"), + forwardedRequestBody: null, + providerChain: [], + specialSettings: [], + originalModelName: 
null, + originalUrlPathname: null, + currentModelRedirect: null, + getCacheTtlResolved: vi.fn(() => null), + setCacheTtlResolved: setTargetCacheTtlResolved, + getContext1mApplied: vi.fn(() => false), + setContext1mApplied: setTargetContext1mApplied, + } as unknown as ProxySession; + const source = { + request: { message: "winner", buffer: null, log: null, note: null }, + requestUrl: new URL("https://shadow.example.com/v1/messages"), + forwardedRequestBody: '{"model":"winner"}', + providerChain: [], + specialSettings: [], + originalModelName: "winner-model", + originalUrlPathname: "/v1/messages", + currentModelRedirect: null, + getCacheTtlResolved: vi.fn(() => "5m"), + setCacheTtlResolved: vi.fn(), + getContext1mApplied: vi.fn(() => true), + setContext1mApplied: vi.fn(), + clearResponseTimeout, + responseController, + releaseAgent, + } as unknown as ProxySession; + + forwarderInternals.syncWinningAttemptSession(target, source); + + expect( + (target as ProxySession & { clearResponseTimeout?: () => void }).clearResponseTimeout + ).toBe(clearResponseTimeout); + expect( + (target as ProxySession & { responseController?: AbortController }).responseController + ).toBe(responseController); + expect((target as ProxySession & { releaseAgent?: () => void }).releaseAgent).toBe( + releaseAgent + ); + expect(setTargetCacheTtlResolved).toHaveBeenCalledWith("5m"); + expect(setTargetContext1mApplied).toHaveBeenCalledWith(true); + }); }); diff --git a/tests/unit/repository/leaderboard-provider-metrics.test.ts b/tests/unit/repository/leaderboard-provider-metrics.test.ts index ced8b33ed..2cc659d14 100644 --- a/tests/unit/repository/leaderboard-provider-metrics.test.ts +++ b/tests/unit/repository/leaderboard-provider-metrics.test.ts @@ -691,3 +691,52 @@ describe("Model Leaderboard basis handling", () => { }); }); }); + +describe("Model Leaderboard sort order", () => { + beforeEach(() => { + vi.resetModules(); + selectCallIndex = 0; + chainMocks = []; + mockSelect.mockClear(); + 
mocks.resolveSystemTimezone.mockResolvedValue("UTC"); + mocks.getSystemSettings.mockResolvedValue({ billingModelSource: "redirected" }); + }); + + it("orders by total cost descending with request count as tiebreaker", async () => { + chainMocks = [ + createChainMock([ + { + model: "expensive-low-volume", + totalRequests: 5, + totalCost: "50.0", + totalTokens: 1000, + successRate: 1.0, + }, + { + model: "cheap-high-volume", + totalRequests: 200, + totalCost: "1.0", + totalTokens: 100000, + successRate: 1.0, + }, + ]), + ]; + + const { findDailyModelLeaderboard } = await import("@/repository/leaderboard"); + const result = await findDailyModelLeaderboard(); + + expect(result).toHaveLength(2); + expect(result[0].model).toBe("expensive-low-volume"); + expect(result[0].totalCost).toBe(50); + expect(result[1].model).toBe("cheap-high-volume"); + expect(result[1].totalCost).toBe(1); + + const orderByMock = chainMocks[0].orderBy; + expect(orderByMock).toHaveBeenCalledTimes(1); + + const args = orderByMock.mock.calls[0]; + expect(args).toHaveLength(2); + expect(JSON.stringify(args[0])).toContain("sum"); + expect(JSON.stringify(args[1])).toContain("count"); + }); +}); diff --git a/tests/unit/settings/prices-layout.test.ts b/tests/unit/settings/prices-layout.test.ts new file mode 100644 index 000000000..6359e32ab --- /dev/null +++ b/tests/unit/settings/prices-layout.test.ts @@ -0,0 +1,23 @@ +import fs from "node:fs"; +import path from "node:path"; +import { describe, expect, test } from "vitest"; + +function readProjectFile(...segments: string[]) { + return fs.readFileSync(path.join(process.cwd(), ...segments), "utf8"); +} + +describe("settings prices layout constraints", () => { + test("settings content column can shrink inside the centered page container", () => { + const source = readProjectFile("src/app/[locale]/settings/layout.tsx"); + + expect(source).toContain('className="mx-auto w-full max-w-7xl'); + expect(source).toContain('className="min-w-0 space-y-6"'); + }); + + 
test("price table scrolls horizontally inside its settings section", () => { + const source = readProjectFile("src/app/[locale]/settings/prices/_components/price-list.tsx"); + + expect(source).toMatch(/