From 415d08f2558322afa7f034107354ec7d1b668081 Mon Sep 17 00:00:00 2001 From: wucm667 Date: Fri, 29 May 2026 10:25:26 +0800 Subject: [PATCH] fix(scheduler): add sticky health escape --- backend/internal/config/config.go | 27 ++ backend/internal/config/config_test.go | 24 ++ .../service/openai_account_scheduler.go | 105 ++++++-- .../service/openai_account_scheduler_test.go | 250 ++++++++++++++++++ deploy/config.example.yaml | 8 + 5 files changed, 399 insertions(+), 15 deletions(-) diff --git a/backend/internal/config/config.go b/backend/internal/config/config.go index f689c2a908e..a50dd5ee829 100644 --- a/backend/internal/config/config.go +++ b/backend/internal/config/config.go @@ -716,6 +716,8 @@ type GatewayConfig struct { OpenAIPassthroughAllowTimeoutHeaders bool `mapstructure:"openai_passthrough_allow_timeout_headers"` // OpenAIWS: OpenAI Responses WebSocket 配置(默认开启,可按需回滚到 HTTP) OpenAIWS GatewayOpenAIWSConfig `mapstructure:"openai_ws"` + // OpenAIScheduler: OpenAI 高级调度器粘性逃逸配置 + OpenAIScheduler GatewayOpenAISchedulerConfig `mapstructure:"openai_scheduler"` // OpenAIHTTP2: OpenAI HTTP 上游协议策略(默认启用 HTTP/2,可按代理能力回退 HTTP/1.1) OpenAIHTTP2 GatewayOpenAIHTTP2Config `mapstructure:"openai_http2"` // ImageConcurrency: 图片生成独立并发限制配置(默认关闭) @@ -948,6 +950,16 @@ type GatewayOpenAIWSSchedulerScoreWeights struct { TTFT float64 `mapstructure:"ttft"` } +// GatewayOpenAISchedulerConfig OpenAI 高级调度器配置。 +type GatewayOpenAISchedulerConfig struct { + // StickyEscapeEnabled: 是否允许 session_hash sticky 在账号健康度劣化时临时逃逸 + StickyEscapeEnabled bool `mapstructure:"sticky_escape_enabled"` + // StickyEscapeTTFTMs: TTFT EWMA 超过该阈值时跳过 sticky + StickyEscapeTTFTMs int `mapstructure:"sticky_escape_ttft_ms"` + // StickyEscapeErrorRate: 错误率 EWMA 超过该阈值时跳过 sticky + StickyEscapeErrorRate float64 `mapstructure:"sticky_escape_error_rate"` +} + // GatewayUsageRecordConfig 使用量记录异步队列配置 type GatewayUsageRecordConfig struct { // WorkerCount: worker 初始数量(自动扩缩容开启时作为初始并发上限) @@ -1369,6 +1381,15 @@ func 
load(allowMissingJWTSecret bool) (*Config, error) {
 	if err := viper.Unmarshal(&cfg); err != nil {
 		return nil, fmt.Errorf("unmarshal config error: %w", err)
 	}
+	if !viper.IsSet("gateway.openai_scheduler.sticky_escape_ttft_ms") { // default only when the key is absent
+		cfg.Gateway.OpenAIScheduler.StickyEscapeTTFTMs = 15000
+	}
+	if !viper.IsSet("gateway.openai_scheduler.sticky_escape_error_rate") { // an explicit 0 stays 0 and is left to Validate
+		cfg.Gateway.OpenAIScheduler.StickyEscapeErrorRate = 0.5
+	}
+	if !viper.IsSet("gateway.openai_scheduler.sticky_escape_enabled") { // escape is on unless explicitly configured
+		cfg.Gateway.OpenAIScheduler.StickyEscapeEnabled = true
+	}
 
 	cfg.RunMode = NormalizeRunMode(cfg.RunMode)
 	cfg.Server.Mode = strings.ToLower(strings.TrimSpace(cfg.Server.Mode))
@@ -2603,6 +2624,12 @@ func (c *Config) Validate() error {
 	if weightSum <= 0 {
 		return fmt.Errorf("gateway.openai_ws.scheduler_score_weights must not all be zero")
 	}
+	if c.Gateway.OpenAIScheduler.StickyEscapeTTFTMs <= 0 {
+		return fmt.Errorf("gateway.openai_scheduler.sticky_escape_ttft_ms must be positive")
+	}
+	if rate := c.Gateway.OpenAIScheduler.StickyEscapeErrorRate; !(rate >= 0 && rate <= 1) { // positive range check also rejects NaN
+		return fmt.Errorf("gateway.openai_scheduler.sticky_escape_error_rate must be between 0 and 1")
+	}
 	if c.Gateway.MaxLineSize < 0 {
 		return fmt.Errorf("gateway.max_line_size must be non-negative")
 	}
diff --git a/backend/internal/config/config_test.go b/backend/internal/config/config_test.go
index 1eae5ed9595..d2d44b231ae 100644
--- a/backend/internal/config/config_test.go
+++ b/backend/internal/config/config_test.go
@@ -110,6 +110,15 @@ func TestLoadDefaultOpenAIWSConfig(t *testing.T) {
 	if cfg.Gateway.OpenAIWS.StickySessionTTLSeconds != 3600 {
 		t.Fatalf("Gateway.OpenAIWS.StickySessionTTLSeconds = %d, want 3600", cfg.Gateway.OpenAIWS.StickySessionTTLSeconds)
 	}
+	if !cfg.Gateway.OpenAIScheduler.StickyEscapeEnabled {
+		t.Fatalf("Gateway.OpenAIScheduler.StickyEscapeEnabled = false, want true")
+	}
+	if cfg.Gateway.OpenAIScheduler.StickyEscapeTTFTMs != 15000 {
+
t.Fatalf("Gateway.OpenAIScheduler.StickyEscapeTTFTMs = %d, want 15000", cfg.Gateway.OpenAIScheduler.StickyEscapeTTFTMs) + } + if cfg.Gateway.OpenAIScheduler.StickyEscapeErrorRate != 0.5 { + t.Fatalf("Gateway.OpenAIScheduler.StickyEscapeErrorRate = %v, want 0.5", cfg.Gateway.OpenAIScheduler.StickyEscapeErrorRate) + } if !cfg.Gateway.OpenAIWS.SessionHashReadOldFallback { t.Fatalf("Gateway.OpenAIWS.SessionHashReadOldFallback = false, want true") } @@ -1705,6 +1714,21 @@ func TestValidateConfig_OpenAIWSRules(t *testing.T) { }, wantErr: "gateway.openai_ws.scheduler_score_weights must not all be zero", }, + { + name: "sticky_escape_ttft_ms 必须为正数", + mutate: func(c *Config) { c.Gateway.OpenAIScheduler.StickyEscapeTTFTMs = 0 }, + wantErr: "gateway.openai_scheduler.sticky_escape_ttft_ms", + }, + { + name: "sticky_escape_error_rate 不能小于 0", + mutate: func(c *Config) { c.Gateway.OpenAIScheduler.StickyEscapeErrorRate = -0.1 }, + wantErr: "gateway.openai_scheduler.sticky_escape_error_rate", + }, + { + name: "sticky_escape_error_rate 不能大于 1", + mutate: func(c *Config) { c.Gateway.OpenAIScheduler.StickyEscapeErrorRate = 1.1 }, + wantErr: "gateway.openai_scheduler.sticky_escape_error_rate", + }, } for _, tc := range cases { diff --git a/backend/internal/service/openai_account_scheduler.go b/backend/internal/service/openai_account_scheduler.go index 1eca08b128d..777dd530041 100644 --- a/backend/internal/service/openai_account_scheduler.go +++ b/backend/internal/service/openai_account_scheduler.go @@ -41,6 +41,7 @@ type OpenAIAccountScheduleRequest struct { GroupID *int64 SessionHash string StickyAccountID int64 + PreserveStickyBinding bool PreviousResponseID string RequestedModel string RequiredTransport OpenAIUpstreamTransport @@ -241,6 +242,12 @@ type defaultOpenAIAccountScheduler struct { stats *openAIAccountRuntimeStats } +type openAIStickyEscapeConfig struct { + enabled bool + ttftMs float64 + errorRate float64 +} + func newDefaultOpenAIAccountScheduler(service 
*OpenAIGatewayService, stats *openAIAccountRuntimeStats) OpenAIAccountScheduler { if stats == nil { stats = newOpenAIAccountRuntimeStats() @@ -296,7 +303,7 @@ func (s *defaultOpenAIAccountScheduler) Select( } } - selection, err := s.selectBySessionHash(ctx, req) + selection, escapedSticky, err := s.selectBySessionHash(ctx, req) if err != nil { return nil, decision, err } @@ -307,6 +314,9 @@ func (s *defaultOpenAIAccountScheduler) Select( decision.SelectedAccountType = selection.Account.Type return selection, decision, nil } + if escapedSticky { + req.PreserveStickyBinding = true + } selection, candidateCount, topK, loadSkew, err := s.selectByLoadBalance(ctx, req) decision.Layer = openAIAccountScheduleLayerLoadBalance @@ -326,10 +336,10 @@ func (s *defaultOpenAIAccountScheduler) Select( func (s *defaultOpenAIAccountScheduler) selectBySessionHash( ctx context.Context, req OpenAIAccountScheduleRequest, -) (*AccountSelectionResult, error) { +) (*AccountSelectionResult, bool, error) { sessionHash := strings.TrimSpace(req.SessionHash) if sessionHash == "" || s == nil || s.service == nil || s.service.cache == nil { - return nil, nil + return nil, false, nil } accountID := req.StickyAccountID @@ -337,38 +347,48 @@ func (s *defaultOpenAIAccountScheduler) selectBySessionHash( var err error accountID, err = s.service.getStickySessionAccountID(ctx, req.GroupID, sessionHash) if err != nil || accountID <= 0 { - return nil, nil + return nil, false, nil } } if accountID <= 0 { - return nil, nil + return nil, false, nil } if req.ExcludedIDs != nil { if _, excluded := req.ExcludedIDs[accountID]; excluded { - return nil, nil + return nil, false, nil } } account, err := s.service.getSchedulableAccount(ctx, accountID) if err != nil || account == nil { _ = s.service.deleteStickySessionAccountID(ctx, req.GroupID, sessionHash) - return nil, nil + return nil, false, nil } if shouldClearStickySession(account, req.RequestedModel) || !account.IsOpenAI() || !account.IsSchedulable() { _ = 
s.service.deleteStickySessionAccountID(ctx, req.GroupID, sessionHash) - return nil, nil + return nil, false, nil } if !s.isAccountRequestCompatible(ctx, account, req) { - return nil, nil + return nil, false, nil } if !s.isAccountTransportCompatible(account, req.RequiredTransport) { _ = s.service.deleteStickySessionAccountID(ctx, req.GroupID, sessionHash) - return nil, nil + return nil, false, nil } account = s.service.recheckSelectedOpenAIAccountFromDB(ctx, account, req.RequestedModel, req.RequireCompact, req.RequiredCapability) if account == nil || !s.isAccountTransportCompatible(account, req.RequiredTransport) { _ = s.service.deleteStickySessionAccountID(ctx, req.GroupID, sessionHash) - return nil, nil + return nil, false, nil + } + escapeCfg := s.service.openAIStickyEscapeConfig() + if reason, errorRate, ttft, shouldEscape := s.shouldEscapeStickyAccount(accountID, escapeCfg); shouldEscape { + slog.Info("sticky_escape_triggered", + "account_id", accountID, + "reason", reason, + "error_rate", errorRate, + "ttft", ttft, + ) + return nil, true, nil } result, acquireErr := s.service.tryAcquireAccountSlot(ctx, accountID, account.Concurrency) @@ -378,12 +398,22 @@ func (s *defaultOpenAIAccountScheduler) selectBySessionHash( Account: account, Acquired: true, ReleaseFunc: result.ReleaseFunc, - }, nil + }, false, nil } cfg := s.service.schedulingConfig() // WaitPlan.MaxConcurrency 使用 Concurrency(非 EffectiveLoadFactor),因为 WaitPlan 控制的是 Redis 实际并发槽位等待。 if s.service.concurrencyService != nil { + if escapeCfg.enabled && acquireErr == nil && result != nil && !result.Acquired { + errorRate, ttft, _ := s.stats.snapshot(accountID) + slog.Info("sticky_escape_triggered", + "account_id", accountID, + "reason", "concurrency_full", + "error_rate", errorRate, + "ttft", ttft, + ) + return nil, true, nil + } return &AccountSelectionResult{ Account: account, WaitPlan: &AccountWaitPlan{ @@ -392,9 +422,23 @@ func (s *defaultOpenAIAccountScheduler) selectBySessionHash( Timeout: 
cfg.StickySessionWaitTimeout,
 				MaxWaiting:     cfg.StickySessionMaxWaiting,
 			},
-		}, nil
+		}, false, nil
+	}
+	return nil, false, nil
+}
+
+func (s *defaultOpenAIAccountScheduler) shouldEscapeStickyAccount(accountID int64, cfg openAIStickyEscapeConfig) (reason string, errorRate float64, ttft float64, shouldEscape bool) {
+	if !cfg.enabled || s == nil || s.stats == nil || accountID <= 0 {
+		return "", 0, 0, false
+	}
+	errorRate, ttft, hasTTFT := s.stats.snapshot(accountID)
+	if hasTTFT && ttft > cfg.ttftMs {
+		return "ttft", errorRate, ttft, true
+	}
+	if errorRate > cfg.errorRate {
+		return "error_rate", errorRate, ttft, true
 	}
-	return nil, nil
+	return "", errorRate, ttft, false
 }
 
 type openAIAccountCandidateScore struct {
@@ -810,7 +854,7 @@ func (s *defaultOpenAIAccountScheduler) tryAcquireOpenAISelectionOrder(
 			return nil, compactBlocked, acquireErr
 		}
 		if result != nil && result.Acquired {
-			if req.SessionHash != "" {
+			if req.SessionHash != "" && !req.PreserveStickyBinding {
 				_ = s.service.BindStickySession(ctx, req.GroupID, req.SessionHash, fresh.ID)
 			}
 			return &AccountSelectionResult{
@@ -1305,6 +1349,42 @@ func (s *OpenAIGatewayService) openAIWSLBTopK() int {
 	return 7
 }
 
+// openAIStickyEscapeConfig resolves the sticky-escape settings used by the
+// session-hash scheduling layer. Built-in defaults (enabled, 15000 ms TTFT,
+// 0.5 error rate) are returned when the service or its config is nil, or when
+// both thresholds are still zero, i.e. the section was never populated.
+// Otherwise the configured switch is honored and each threshold is taken from
+// config unless it is out of range (TTFT <= 0, error rate outside [0, 1] or NaN).
+func (s *OpenAIGatewayService) openAIStickyEscapeConfig() openAIStickyEscapeConfig {
+	const (
+		defaultTTFTMs    = 15000
+		defaultErrorRate = 0.5
+	)
+	resolved := openAIStickyEscapeConfig{
+		enabled:   true,
+		ttftMs:    defaultTTFTMs,
+		errorRate: defaultErrorRate,
+	}
+	if s == nil || s.cfg == nil {
+		return resolved
+	}
+	raw := s.cfg.Gateway.OpenAIScheduler
+	if raw.StickyEscapeTTFTMs == 0 && raw.StickyEscapeErrorRate == 0 {
+		// Both thresholds at their zero value: treat the section as unpopulated
+		// and keep escape enabled with the defaults, whatever the switch says.
+		return resolved
+	}
+	resolved.enabled = raw.StickyEscapeEnabled
+	if raw.StickyEscapeTTFTMs > 0 {
+		resolved.ttftMs = float64(raw.StickyEscapeTTFTMs)
+	}
+	// Written as a positive range check so that NaN falls back to the default too.
+	if raw.StickyEscapeErrorRate >= 0 && raw.StickyEscapeErrorRate <= 1 {
+		resolved.errorRate = raw.StickyEscapeErrorRate
+	}
+	return resolved
+}
+
 func (s
*OpenAIGatewayService) openAIWSSchedulerWeights() GatewayOpenAIWSSchedulerScoreWeightsView { if s != nil && s.cfg != nil { return GatewayOpenAIWSSchedulerScoreWeightsView{ diff --git a/backend/internal/service/openai_account_scheduler_test.go b/backend/internal/service/openai_account_scheduler_test.go index fedf7e9c11a..76ec8da3b48 100644 --- a/backend/internal/service/openai_account_scheduler_test.go +++ b/backend/internal/service/openai_account_scheduler_test.go @@ -948,6 +948,9 @@ func TestOpenAIGatewayService_SelectAccountWithScheduler_SessionStickyBusyKeepsS cfg := &config.Config{} cfg.Gateway.Scheduling.StickySessionMaxWaiting = 2 cfg.Gateway.Scheduling.StickySessionWaitTimeout = 45 * time.Second + cfg.Gateway.OpenAIScheduler.StickyEscapeEnabled = false + cfg.Gateway.OpenAIScheduler.StickyEscapeTTFTMs = 15000 + cfg.Gateway.OpenAIScheduler.StickyEscapeErrorRate = 0.5 cfg.Gateway.OpenAIWS.Enabled = true cfg.Gateway.OpenAIWS.APIKeyEnabled = true cfg.Gateway.OpenAIWS.OAuthEnabled = true @@ -996,6 +999,253 @@ func TestOpenAIGatewayService_SelectAccountWithScheduler_SessionStickyBusyKeepsS require.True(t, decision.StickySessionHit) } +func TestOpenAIGatewayService_SelectAccountWithScheduler_SessionStickyEscapeByTTFT(t *testing.T) { + ctx := context.Background() + groupID := int64(10101) + accounts := []Account{ + { + ID: 21101, + Platform: PlatformOpenAI, + Type: AccountTypeAPIKey, + Status: StatusActive, + Schedulable: true, + Concurrency: 1, + Priority: 0, + }, + { + ID: 21102, + Platform: PlatformOpenAI, + Type: AccountTypeAPIKey, + Status: StatusActive, + Schedulable: true, + Concurrency: 1, + Priority: 1, + }, + } + cache := &schedulerTestGatewayCache{sessionBindings: map[string]int64{"openai:session_hash_sticky_ttft": 21101}} + cfg := &config.Config{} + cfg.Gateway.OpenAIScheduler.StickyEscapeEnabled = true + cfg.Gateway.OpenAIScheduler.StickyEscapeTTFTMs = 15000 + cfg.Gateway.OpenAIScheduler.StickyEscapeErrorRate = 0.5 + concurrencyCache := 
schedulerTestConcurrencyCache{acquireResults: map[int64]bool{21102: true}} + svc := &OpenAIGatewayService{ + accountRepo: schedulerTestOpenAIAccountRepo{accounts: accounts}, + cache: cache, + cfg: cfg, + rateLimitService: newOpenAIAdvancedSchedulerRateLimitService("true"), + concurrencyService: NewConcurrencyService(concurrencyCache), + openaiAccountStats: newOpenAIAccountRuntimeStats(), + } + fastTTFT := 14999 + svc.openaiAccountStats.report(21101, true, &fastTTFT) + stableTTFT := 14999 + svc.openaiAccountStats.report(21101, true, &stableTTFT) + + selection, decision, err := svc.SelectAccountWithScheduler(ctx, &groupID, "", "session_hash_sticky_ttft", "gpt-5.1", nil, OpenAIUpstreamTransportAny, false) + require.NoError(t, err) + require.NotNil(t, selection) + require.NotNil(t, selection.Account) + require.Equal(t, int64(21101), selection.Account.ID) + require.Equal(t, openAIAccountScheduleLayerSessionSticky, decision.Layer) + require.True(t, decision.StickySessionHit) + if selection.ReleaseFunc != nil { + selection.ReleaseFunc() + } + + slowTTFT := 20000 + for i := 0; i < 3; i++ { + svc.openaiAccountStats.report(21101, true, &slowTTFT) + } + + selection, decision, err = svc.SelectAccountWithScheduler(ctx, &groupID, "", "session_hash_sticky_ttft", "gpt-5.1", nil, OpenAIUpstreamTransportAny, false) + require.NoError(t, err) + require.NotNil(t, selection) + require.NotNil(t, selection.Account) + require.Equal(t, int64(21102), selection.Account.ID) + require.Equal(t, openAIAccountScheduleLayerLoadBalance, decision.Layer) + require.False(t, decision.StickySessionHit) + require.Equal(t, int64(21101), cache.sessionBindings["openai:session_hash_sticky_ttft"]) + if selection.ReleaseFunc != nil { + selection.ReleaseFunc() + } +} + +func TestOpenAIGatewayService_SelectAccountWithScheduler_SessionStickyEscapeByErrorRate(t *testing.T) { + ctx := context.Background() + groupID := int64(10102) + accounts := []Account{ + {ID: 21201, Platform: PlatformOpenAI, Type: 
AccountTypeAPIKey, Status: StatusActive, Schedulable: true, Concurrency: 1, Priority: 0}, + {ID: 21202, Platform: PlatformOpenAI, Type: AccountTypeAPIKey, Status: StatusActive, Schedulable: true, Concurrency: 1, Priority: 1}, + } + cache := &schedulerTestGatewayCache{sessionBindings: map[string]int64{"openai:session_hash_sticky_error_rate": 21201}} + cfg := &config.Config{} + cfg.Gateway.OpenAIScheduler.StickyEscapeEnabled = true + cfg.Gateway.OpenAIScheduler.StickyEscapeTTFTMs = 15000 + cfg.Gateway.OpenAIScheduler.StickyEscapeErrorRate = 0.5 + svc := &OpenAIGatewayService{ + accountRepo: schedulerTestOpenAIAccountRepo{accounts: accounts}, + cache: cache, + cfg: cfg, + rateLimitService: newOpenAIAdvancedSchedulerRateLimitService("true"), + concurrencyService: NewConcurrencyService(schedulerTestConcurrencyCache{acquireResults: map[int64]bool{21202: true}}), + openaiAccountStats: newOpenAIAccountRuntimeStats(), + } + for i := 0; i < 3; i++ { + svc.openaiAccountStats.report(21201, false, nil) + } + selection, decision, err := svc.SelectAccountWithScheduler(ctx, &groupID, "", "session_hash_sticky_error_rate", "gpt-5.1", nil, OpenAIUpstreamTransportAny, false) + require.NoError(t, err) + require.NotNil(t, selection) + require.NotNil(t, selection.Account) + require.Equal(t, int64(21201), selection.Account.ID) + require.Equal(t, openAIAccountScheduleLayerSessionSticky, decision.Layer) + require.True(t, decision.StickySessionHit) + if selection.ReleaseFunc != nil { + selection.ReleaseFunc() + } + for i := 0; i < 2; i++ { + svc.openaiAccountStats.report(21201, false, nil) + } + + selection, decision, err = svc.SelectAccountWithScheduler(ctx, &groupID, "", "session_hash_sticky_error_rate", "gpt-5.1", nil, OpenAIUpstreamTransportAny, false) + require.NoError(t, err) + require.NotNil(t, selection) + require.NotNil(t, selection.Account) + require.Equal(t, int64(21202), selection.Account.ID) + require.Equal(t, openAIAccountScheduleLayerLoadBalance, decision.Layer) + 
require.False(t, decision.StickySessionHit) + require.Equal(t, int64(21201), cache.sessionBindings["openai:session_hash_sticky_error_rate"]) + if selection.ReleaseFunc != nil { + selection.ReleaseFunc() + } +} + +func TestOpenAIGatewayService_SelectAccountWithScheduler_SessionStickyBusyEscapes(t *testing.T) { + ctx := context.Background() + groupID := int64(10103) + accounts := []Account{ + {ID: 21301, Platform: PlatformOpenAI, Type: AccountTypeAPIKey, Status: StatusActive, Schedulable: true, Concurrency: 1, Priority: 0}, + {ID: 21302, Platform: PlatformOpenAI, Type: AccountTypeAPIKey, Status: StatusActive, Schedulable: true, Concurrency: 1, Priority: 1}, + } + cache := &schedulerTestGatewayCache{sessionBindings: map[string]int64{"openai:session_hash_sticky_busy_escape": 21301}} + cfg := &config.Config{} + cfg.Gateway.OpenAIScheduler.StickyEscapeEnabled = true + cfg.Gateway.OpenAIScheduler.StickyEscapeTTFTMs = 15000 + cfg.Gateway.OpenAIScheduler.StickyEscapeErrorRate = 0.5 + cfg.Gateway.Scheduling.StickySessionMaxWaiting = 2 + cfg.Gateway.Scheduling.StickySessionWaitTimeout = 45 * time.Second + concurrencyCache := schedulerTestConcurrencyCache{ + acquireResults: map[int64]bool{21301: false, 21302: true}, + waitCounts: map[int64]int{21301: 999}, + loadMap: map[int64]*AccountLoadInfo{ + 21301: {AccountID: 21301, LoadRate: 95, WaitingCount: 9}, + 21302: {AccountID: 21302, LoadRate: 1, WaitingCount: 0}, + }, + } + svc := &OpenAIGatewayService{ + accountRepo: schedulerTestOpenAIAccountRepo{accounts: accounts}, + cache: cache, + cfg: cfg, + rateLimitService: newOpenAIAdvancedSchedulerRateLimitService("true"), + concurrencyService: NewConcurrencyService(concurrencyCache), + } + + selection, decision, err := svc.SelectAccountWithScheduler(ctx, &groupID, "", "session_hash_sticky_busy_escape", "gpt-5.1", nil, OpenAIUpstreamTransportAny, false) + require.NoError(t, err) + require.NotNil(t, selection) + require.NotNil(t, selection.Account) + require.Equal(t, int64(21302), 
selection.Account.ID) + require.Nil(t, selection.WaitPlan) + require.Equal(t, openAIAccountScheduleLayerLoadBalance, decision.Layer) + require.False(t, decision.StickySessionHit) + if selection.ReleaseFunc != nil { + selection.ReleaseFunc() + } +} + +func TestOpenAIGatewayService_SelectAccountWithScheduler_SessionStickyEscapeDisabledKeepsLegacyBehavior(t *testing.T) { + ctx := context.Background() + groupID := int64(10104) + accounts := []Account{ + {ID: 21401, Platform: PlatformOpenAI, Type: AccountTypeAPIKey, Status: StatusActive, Schedulable: true, Concurrency: 1, Priority: 0}, + {ID: 21402, Platform: PlatformOpenAI, Type: AccountTypeAPIKey, Status: StatusActive, Schedulable: true, Concurrency: 1, Priority: 1}, + } + cache := &schedulerTestGatewayCache{sessionBindings: map[string]int64{"openai:session_hash_sticky_disabled": 21401}} + cfg := &config.Config{} + cfg.Gateway.OpenAIScheduler.StickyEscapeEnabled = false + cfg.Gateway.OpenAIScheduler.StickyEscapeTTFTMs = 15000 + cfg.Gateway.OpenAIScheduler.StickyEscapeErrorRate = 0.5 + cfg.Gateway.Scheduling.StickySessionMaxWaiting = 2 + cfg.Gateway.Scheduling.StickySessionWaitTimeout = 45 * time.Second + concurrencyCache := schedulerTestConcurrencyCache{ + acquireResults: map[int64]bool{21401: false, 21402: true}, + waitCounts: map[int64]int{21401: 999}, + } + svc := &OpenAIGatewayService{ + accountRepo: schedulerTestOpenAIAccountRepo{accounts: accounts}, + cache: cache, + cfg: cfg, + rateLimitService: newOpenAIAdvancedSchedulerRateLimitService("true"), + concurrencyService: NewConcurrencyService(concurrencyCache), + openaiAccountStats: newOpenAIAccountRuntimeStats(), + } + slowTTFT := 20000 + svc.openaiAccountStats.report(21401, true, &slowTTFT) + for i := 0; i < 5; i++ { + svc.openaiAccountStats.report(21401, false, nil) + } + + selection, decision, err := svc.SelectAccountWithScheduler(ctx, &groupID, "", "session_hash_sticky_disabled", "gpt-5.1", nil, OpenAIUpstreamTransportAny, false) + require.NoError(t, err) + 
require.NotNil(t, selection) + require.NotNil(t, selection.Account) + require.Equal(t, int64(21401), selection.Account.ID) + require.NotNil(t, selection.WaitPlan) + require.Equal(t, int64(21401), selection.WaitPlan.AccountID) + require.Equal(t, openAIAccountScheduleLayerSessionSticky, decision.Layer) + require.True(t, decision.StickySessionHit) +} + +func TestDefaultOpenAIAccountScheduler_ShouldEscapeStickyAccount_ThresholdBoundary(t *testing.T) { + stats := newOpenAIAccountRuntimeStats() + accountID := int64(21501) + ttft := 15000 + stats.report(accountID, true, &ttft) + stats.report(accountID, false, nil) + stats.report(accountID, true, nil) + scheduler := &defaultOpenAIAccountScheduler{stats: stats} + + reason, errorRate, observedTTFT, shouldEscape := scheduler.shouldEscapeStickyAccount(accountID, openAIStickyEscapeConfig{ + enabled: true, + ttftMs: 15000, + errorRate: 0.5, + }) + require.False(t, shouldEscape) + require.Empty(t, reason) + require.InDelta(t, 0.16, errorRate, 1e-9) + require.InDelta(t, 15000, observedTTFT, 1e-9) + + for i := 0; i < 4; i++ { + stats.report(accountID, false, nil) + } + reason, errorRate, _, shouldEscape = scheduler.shouldEscapeStickyAccount(accountID, openAIStickyEscapeConfig{ + enabled: true, + ttftMs: 15000, + errorRate: 1, + }) + require.False(t, shouldEscape) + require.Empty(t, reason) + reason, errorRate, observedTTFT, shouldEscape = scheduler.shouldEscapeStickyAccount(accountID, openAIStickyEscapeConfig{ + enabled: true, + ttftMs: 15000, + errorRate: errorRate, + }) + require.False(t, shouldEscape) + require.Empty(t, reason) + require.InDelta(t, 0.655936, errorRate, 1e-9) + require.InDelta(t, 15000, observedTTFT, 1e-9) +} + func TestOpenAIGatewayService_SelectAccountWithScheduler_SessionSticky_ForceHTTP(t *testing.T) { ctx := context.Background() groupID := int64(1010) diff --git a/deploy/config.example.yaml b/deploy/config.example.yaml index 31b38a19b96..35c76964230 100644 --- a/deploy/config.example.yaml +++ 
b/deploy/config.example.yaml @@ -320,6 +320,14 @@ gateway: queue: 0.7 error_rate: 0.8 ttft: 0.5 + # OpenAI 高级调度器补充配置 + openai_scheduler: + # 是否允许 session_hash sticky 在账号健康度恶化时临时逃逸;false 可一键回退旧行为 + sticky_escape_enabled: true + # TTFT EWMA 超过该阈值(毫秒)时跳过 sticky,默认 15s,避免轻微抖动就逃逸 + sticky_escape_ttft_ms: 15000 + # 错误率 EWMA 超过该阈值时跳过 sticky,默认 0.5,仅在明显降级时触发 + sticky_escape_error_rate: 0.5 # OpenAI HTTP upstream protocol strategy. # OpenAI HTTP 上游协议策略(默认 HTTP/2;代理明确不兼容时可临时回退 HTTP/1.1)。 openai_http2: