From e71d898339e3c98874813cf4876c278972a7e21b Mon Sep 17 00:00:00 2001 From: Menci Date: Sat, 20 Jun 2026 03:51:58 +0800 Subject: [PATCH] fix(codex): recover from concurrent refresh-token rotation races Under burst load, two workers can both observe a stale access token on the same Codex upstream and both attempt a refresh. OpenAI rotates the refresh_token on every successful /oauth/token call, so exactly one racer wins; the other's request is rejected with `invalid_grant` for trying to redeem the rotated-out copy. The previous flow treated every `invalid_grant` as a dead credential and let the caller flip the account to `refresh_failed` - destroying the working credential a sibling had just rotated, and forcing the operator to re-import. On `invalid_grant`, the access-token cache now re-reads upstream state for the same `chatgpt-account-id` slot and compares the refresh_token it tried against what is now stored. If they differ, a sibling rotated and we return their freshly-minted access token (the caller treats it as a normal cache hit and skips the terminal flip). If they match, we re-raise the original error so the data-plane / control-plane caller flips the row as before. The other refresh-terminal codes - `app_session_terminated`, `invalid_refresh_token`, `invalid_client`, `unauthorized_client`, `access_denied` - bypass recovery entirely; none of them are caused by a rotation race. `CodexOAuthSessionTerminatedError` now carries the raw OAuth `error` value as a `code` field alongside the existing `upstreamMessage` so the recovery branch can single out `invalid_grant` from the catch. `REFRESH_TERMINAL_OAUTH_CODES` is broadened to the audit-aligned set (`app_session_terminated`, `invalid_grant`, `invalid_refresh_token`, `invalid_client`, `unauthorized_client`, `access_denied`) - Codex is OpenAI OAuth, so the list matches sub2api's `isNonRetryableRefreshError` verbatim. 
Sub2api's tryRecoverFromRefreshRace (backend/internal/service/oauth_refresh_api.go:173-193) is the canonical pattern; we apply it to Codex's per-account credential here. The token rotation persistence hook stays awaited; the recovery branch reads from the just-persisted state via the upstream repo and returns the sibling's cached access token directly so no second mint fires from this call site. --- .../provider-codex/src/access-token-cache.ts | 79 +++++++++++++++++-- .../src/access-token-cache_test.ts | 41 ++++++++++ packages/provider-codex/src/auth/oauth.ts | 58 +++++++++++--- 3 files changed, 160 insertions(+), 18 deletions(-) diff --git a/packages/provider-codex/src/access-token-cache.ts b/packages/provider-codex/src/access-token-cache.ts index ac792da9..3b66cd37 100644 --- a/packages/provider-codex/src/access-token-cache.ts +++ b/packages/provider-codex/src/access-token-cache.ts @@ -1,4 +1,4 @@ -import { refreshCodexAccessToken } from './auth/oauth.ts'; +import { CodexOAuthSessionTerminatedError, refreshCodexAccessToken } from './auth/oauth.ts'; import { readCodexUpstreamState, type CodexAccessTokenEntry, type CodexUpstreamState } from './state.ts'; import { getProviderRepo, type Fetcher } from '@floway-dev/provider'; @@ -76,14 +76,34 @@ export const invalidateCodexAccessToken = async ( accountId: string, ): Promise<void> => { await persistAccessToken(upstreamId, accountId, null, 'invalidateCodexAccessToken'); }; -// Refresh-token rotation is deliberately not folded in here: the caller's -// CAS hook for refresh_token has to coordinate with terminal-state -// transitions and lives upstream of this function. `mintCodexAccessToken` -// below is the standard implementation of that callback. +// Reads, mints, and persists. The mint callback is responsible for routing +// the rotated refresh_token through the upstream's CAS hook; +// `mintCodexAccessToken` below is the standard implementation.
+// +// Refresh-race recovery: when the mint throws `invalid_grant`, it might mean +// either (a) the refresh_token is genuinely revoked, or (b) a sibling worker +// raced us, won the rotation, and our copy is now stale. +// `recoverFromRefreshRace` distinguishes by re-reading state for the same +// account slot and comparing the refresh token we used against what is now +// stored. If a sibling rotated, we return their freshly-minted access token +// — the caller treats it as a normal cache hit. If the stored value hasn't +// moved, we re-raise the original error so the data-plane / control-plane +// caller flips the row to `refresh_failed`. Mirrors sub2api +// `oauth_refresh_api.go:tryRecoverFromRefreshRace` (lines 173-193). All +// other terminal codes (`app_session_terminated`, `invalid_refresh_token`, +// `invalid_client`, `unauthorized_client`, `access_denied`) signal +// credential death under any race scenario and skip recovery. export const ensureCodexAccessToken = async ( upstreamId: string, accountId: string, mint: (refreshToken: string) => Promise<CodexAccessTokenEntry>, +): Promise<CodexAccessTokenEntry> => await ensureCodexAccessTokenInner(upstreamId, accountId, mint, true); + +const ensureCodexAccessTokenInner = async ( + upstreamId: string, + accountId: string, + mint: (refreshToken: string) => Promise<CodexAccessTokenEntry>, + recoveryAllowed: boolean, ): Promise<CodexAccessTokenEntry> => { const fresh = await getProviderRepo().upstreams.getById(upstreamId); if (!fresh) throw new Error(`Codex upstream ${upstreamId} not found`); @@ -93,11 +113,58 @@ export const ensureCodexAccessToken = async ( if (account.accessToken && isAccessTokenFresh(account.accessToken)) { return account.accessToken; } - const minted = await mint(account.refresh_token); + + let minted; + try { + minted = await mint(account.refresh_token); + } catch (err) { + if (err instanceof CodexOAuthSessionTerminatedError && err.code === 'invalid_grant' && recoveryAllowed) { + const recovered = await recoverFromRefreshRace(upstreamId, accountId, account.refresh_token, mint);
+ if (recovered) return recovered; + } + throw err; + } await persistAccessToken(upstreamId, accountId, minted, 'ensureCodexAccessToken'); return minted; }; +// `invalid_grant` ambiguity: dead refresh token, or a sibling worker raced +// us and we hold the rotated-out copy. Re-read state for the same +// `accountId` slot and compare. The "sibling rotated but no cached access +// token yet" subcase (e.g. a concurrent `invalidateCodexAccessToken` +// cleared it) re-enters the refresh flow once with the fresh RT in hand; +// the depth guard prevents runaway recursion if recovery itself observes a +// stale view. Returns `null` when the original error should be re-raised as +// a real session termination. +const recoverFromRefreshRace = async ( + upstreamId: string, + accountId: string, + usedRefreshToken: string, + mint: (refreshToken: string) => Promise<CodexAccessTokenEntry>, +): Promise<CodexAccessTokenEntry | null> => { + const reread = await getProviderRepo().upstreams.getById(upstreamId); + if (!reread) return null; + const rereadState = readCodexUpstreamState(reread.state); + const rereadAccount = rereadState.accounts.find(a => a.chatgptAccountId === accountId); + if (!rereadAccount) return null; + if (rereadAccount.state !== 'active') return null; + if (rereadAccount.refresh_token === usedRefreshToken) return null; + console.info( + `Codex refresh-race recovered for upstream ${upstreamId} account ${accountId}: sibling rotated, using their access token`, + ); + if (rereadAccount.accessToken && isAccessTokenFresh(rereadAccount.accessToken)) { + return rereadAccount.accessToken; + } + // Sibling rotated the refresh token but no usable access token sits in + // state — most likely an `invalidateCodexAccessToken` ran between the + // sibling's rotation and our re-read. Re-enter the refresh flow once with + // the live RT; the re-entrant call sees the rotated row and goes straight + // through the standard mint path.
The depth guard suppresses a second + // recovery attempt — if `invalid_grant` strikes again the refresh token + // really is dead and we want the terminal flip. + return await ensureCodexAccessTokenInner(upstreamId, accountId, mint, false); +}; + // Mints a fresh access token via /oauth/token and routes the rotated // refresh_token through the caller's CAS hook. Awaiting the rotation // persistence (rather than fire-and-forget) is deliberate: under concurrent diff --git a/packages/provider-codex/src/access-token-cache_test.ts b/packages/provider-codex/src/access-token-cache_test.ts index 4b2431e5..b40fdd37 100644 --- a/packages/provider-codex/src/access-token-cache_test.ts +++ b/packages/provider-codex/src/access-token-cache_test.ts @@ -7,6 +7,7 @@ import { putCodexAccessToken, type CodexAccessTokenEntry, } from './access-token-cache.ts'; +import { CodexOAuthSessionTerminatedError } from './auth/oauth.ts'; import type { CodexUpstreamState } from './state.ts'; import { initProviderRepo, type UpstreamRecord } from '@floway-dev/provider'; @@ -179,4 +180,44 @@ describe('ensureCodexAccessToken', () => { await expect(ensureCodexAccessToken(upstreamId, accountId, mint)).rejects.toThrow(/oauth boom/); expect(saveStateSpy).not.toHaveBeenCalled(); }); + + test('invalid_grant with a sibling rotation in flight → returns the sibling-minted access token, no persist', async () => { + // Simulate the race: between our pre-mint getById and the upstream + // rejecting our refresh_token, a sibling worker won the rotation and + // CAS-wrote rt_v2 + at_sibling. Re-read on recovery observes the new + // pair scoped to the same accountId; we should return it instead of + // destroying a working credential. 
+ const siblingEntry: CodexAccessTokenEntry = { token: 'at_sibling', expiresAt: farFutureMs, refreshedAt: 'sibling' }; + getByIdSpy.mockImplementationOnce(async () => current).mockImplementationOnce(async () => { + current = makeRecord({ accounts: [{ ...baseAccount, refresh_token: 'rt_v2', accessToken: siblingEntry }] }); + return current; + }); + const mint = vi.fn().mockRejectedValue(new CodexOAuthSessionTerminatedError({ code: 'invalid_grant', message: 'replayed' })); + + const out = await ensureCodexAccessToken(upstreamId, accountId, mint); + expect(out).toEqual(siblingEntry); + expect(mint).toHaveBeenCalledTimes(1); + // Recovery returns the sibling's cached token; no fresh persist from us. + expect(saveStateSpy).not.toHaveBeenCalled(); + }); + + test('invalid_grant with stored RT unchanged → rethrows for the caller to flip to terminal', async () => { + // Same RT on re-read means no sibling rotated; the refresh_token really + // is dead. The cache surfaces the original error; the data-plane / control- + // plane caller is responsible for the terminal-state flip. + const mint = vi.fn().mockRejectedValue(new CodexOAuthSessionTerminatedError({ code: 'invalid_grant', message: 'revoked' })); + await expect(ensureCodexAccessToken(upstreamId, accountId, mint)).rejects.toBeInstanceOf(CodexOAuthSessionTerminatedError); + expect(mint).toHaveBeenCalledTimes(1); + expect(saveStateSpy).not.toHaveBeenCalled(); + }); + + test('app_session_terminated never attempts race recovery — single getById, original error rethrown', async () => { + // Terminal codes other than invalid_grant signal credential death under + // any race scenario; the cache must not re-read state to second-guess + // them. Assert via the absence of a second getById call. 
+ const mint = vi.fn().mockRejectedValue(new CodexOAuthSessionTerminatedError({ code: 'app_session_terminated', message: 'gone' })); + await expect(ensureCodexAccessToken(upstreamId, accountId, mint)).rejects.toBeInstanceOf(CodexOAuthSessionTerminatedError); + expect(getByIdSpy).toHaveBeenCalledTimes(1); + expect(saveStateSpy).not.toHaveBeenCalled(); + }); }); diff --git a/packages/provider-codex/src/auth/oauth.ts b/packages/provider-codex/src/auth/oauth.ts index 2e539543..fdb413b7 100644 --- a/packages/provider-codex/src/auth/oauth.ts +++ b/packages/provider-codex/src/auth/oauth.ts @@ -17,14 +17,49 @@ export interface CodexOAuthTokens { // Terminal error: refresh_token is dead, operator must re-import. Distinct // from generic OAuth 4xx so callers can react to session-termination -// separately from a transient upstream message. +// separately from a transient upstream message. `code` carries the raw OAuth +// `error` value (`invalid_grant`, `app_session_terminated`, etc.) so the +// refresh-race recovery in the access-token cache can single out +// `invalid_grant` — the only terminal code that might mean "a sibling +// worker just rotated the refresh token, and our copy is stale" — from +// codes that signal genuine credential death under any race scenario. export class CodexOAuthSessionTerminatedError extends Error { - constructor(public readonly upstreamMessage: string) { - super(`Codex OAuth session terminated: ${upstreamMessage}`); + readonly code: string; + readonly upstreamMessage: string; + constructor(args: { code: string; message: string }) { + super(`Codex OAuth session terminated: ${args.message}`); this.name = 'CodexOAuthSessionTerminatedError'; + this.code = args.code; + this.upstreamMessage = args.message; } } +// Terminal codes accepted on the authorization-code exchange. 
`invalid_grant` +// here typically means the operator pasted a stale or wrong callback URL, +// which is recoverable by restarting the PKCE flow rather than re-importing, +// so it stays out of this set. +const EXCHANGE_TERMINAL_OAUTH_CODES: ReadonlySet<string> = new Set([ + 'app_session_terminated', +]); + +// Terminal codes on the refresh path: every one of these signals a dead +// refresh_token that only operator re-import recovers. Aligned with +// sub2api's `isNonRetryableRefreshError` +// (backend/internal/service/token_refresh_service.go:429-451), which shares +// the same list across OpenAI/Claude/Gemini OAuth — Codex is OpenAI OAuth, +// so the set carries over verbatim. `invalid_grant` is included even though +// the refresh-race recovery in access-token-cache.ts may re-classify it +// when a sibling rotation is detected; from the OAuth wire's perspective +// it is still a terminal signal. +const REFRESH_TERMINAL_OAUTH_CODES: ReadonlySet<string> = new Set([ + 'app_session_terminated', + 'invalid_grant', + 'invalid_refresh_token', + 'invalid_client', + 'unauthorized_client', + 'access_denied', +]); + const codexTokenRequest = async ( body: URLSearchParams, terminalCodes: ReadonlySet<string>, @@ -65,7 +100,7 @@ const codexTokenRequest = async ( if (message === null && typeof root?.detail === 'string') message = root.detail as string; message ??= rawText.slice(0, 256); if (code && terminalCodes.has(code)) { - throw new CodexOAuthSessionTerminatedError(message); + throw new CodexOAuthSessionTerminatedError({ code, message }); } throw new Error(`Codex OAuth /token returned ${response.status}: ${message}`); } @@ -101,7 +136,7 @@ export const exchangeCodexAuthorizationCode = async (opts: { code: string; codeV // exchange typically means the operator pasted a stale or wrong callback // URL, which is recoverable by restarting the PKCE flow rather than // re-importing.
- return await codexTokenRequest(body, new Set(['app_session_terminated']), directFetcher); + return await codexTokenRequest(body, EXCHANGE_TERMINAL_OAUTH_CODES, directFetcher); }; // `fetcher` is required because the refresh has an associated upstream @@ -114,11 +149,10 @@ export const refreshCodexAccessToken = async (refreshToken: string, fetcher: Fet client_id: CODEX_CLIENT_ID, scope: CODEX_OAUTH_SCOPE, }); - // OAuth `invalid_grant` on the refresh path is unambiguous — the - // refresh_token has been replayed, revoked, or expired. Same recovery as - // `app_session_terminated`: the operator must re-import a fresh auth.json. - // The error text varies ("Your refresh token has already been used to - // generate a new access token", "Token is no longer valid", etc.); the code - // is the stable signal. - return await codexTokenRequest(body, new Set(['app_session_terminated', 'invalid_grant']), fetcher); + // OAuth `invalid_grant` on the refresh path is ambiguous on its own — it + // can mean a genuinely revoked/expired refresh_token, *or* that a sibling + // worker raced us, won the rotation, and our copy is now stale. The + // access-token cache's `recoverFromRefreshRace` distinguishes by re-reading + // upstream state; the other codes here always mean credential death. + return await codexTokenRequest(body, REFRESH_TERMINAL_OAUTH_CODES, fetcher); };