Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 73 additions & 6 deletions packages/provider-codex/src/access-token-cache.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { refreshCodexAccessToken } from './auth/oauth.ts';
import { CodexOAuthSessionTerminatedError, refreshCodexAccessToken } from './auth/oauth.ts';
import { readCodexUpstreamState, type CodexAccessTokenEntry, type CodexUpstreamState } from './state.ts';
import { getProviderRepo, type Fetcher } from '@floway-dev/provider';

Expand Down Expand Up @@ -76,14 +76,34 @@ export const invalidateCodexAccessToken = async (
accountId: string,
): Promise<void> => { await persistAccessToken(upstreamId, accountId, null, 'invalidateCodexAccessToken'); };

// Refresh-token rotation is deliberately not folded in here: the caller's
// CAS hook for refresh_token has to coordinate with terminal-state
// transitions and lives upstream of this function. `mintCodexAccessToken`
// below is the standard implementation of that callback.
// Reads, mints, and persists. The mint callback is responsible for routing
// the rotated refresh_token through the upstream's CAS hook;
// `mintCodexAccessToken` below is the standard implementation.
//
// Refresh-race recovery: when the mint throws `invalid_grant`, it might mean
// either (a) the refresh_token is genuinely revoked, or (b) a sibling worker
// raced us, won the rotation, and our copy is now stale.
// `recoverFromRefreshRace` distinguishes by re-reading state for the same
// account slot and comparing the refresh token we used against what is now
// stored. If a sibling rotated, we return their freshly-minted access token
// — the caller treats it as a normal cache hit. If the stored value hasn't
// moved, we re-raise the original error so the data-plane / control-plane
// caller flips the row to `refresh_failed`. Mirrors sub2api
// `oauth_refresh_api.go:tryRecoverFromRefreshRace` (lines 173-193). All
// other terminal codes (`app_session_terminated`, `invalid_refresh_token`,
// `invalid_client`, `unauthorized_client`, `access_denied`) signal
// credential death under any race scenario and skip recovery.
export const ensureCodexAccessToken = async (
upstreamId: string,
accountId: string,
mint: (refreshToken: string) => Promise<CodexAccessTokenEntry>,
): Promise<CodexAccessTokenEntry> => await ensureCodexAccessTokenInner(upstreamId, accountId, mint, true);

const ensureCodexAccessTokenInner = async (
upstreamId: string,
accountId: string,
mint: (refreshToken: string) => Promise<CodexAccessTokenEntry>,
recoveryAllowed: boolean,
): Promise<CodexAccessTokenEntry> => {
const fresh = await getProviderRepo().upstreams.getById(upstreamId);
if (!fresh) throw new Error(`Codex upstream ${upstreamId} not found`);
Expand All @@ -93,11 +113,58 @@ export const ensureCodexAccessToken = async (
if (account.accessToken && isAccessTokenFresh(account.accessToken)) {
return account.accessToken;
}
const minted = await mint(account.refresh_token);

let minted;
try {
minted = await mint(account.refresh_token);
} catch (err) {
if (err instanceof CodexOAuthSessionTerminatedError && err.code === 'invalid_grant' && recoveryAllowed) {
const recovered = await recoverFromRefreshRace(upstreamId, accountId, account.refresh_token, mint);
if (recovered) return recovered;
}
throw err;
}
await persistAccessToken(upstreamId, accountId, minted, 'ensureCodexAccessToken');
return minted;
};

// `invalid_grant` ambiguity: dead refresh token, or a sibling worker raced
// us and we hold the rotated-out copy. Re-read state for the same
// `accountId` slot and compare. The "sibling rotated but no cached access
// token yet" subcase (e.g. a concurrent `invalidateCodexAccessToken`
// cleared it) re-enters the refresh flow once with the fresh RT in hand;
// the depth guard prevents runaway recursion if recovery itself observes a
// stale view. Returns `null` when the original error should be re-raised as
// a real session termination.
const recoverFromRefreshRace = async (
upstreamId: string,
accountId: string,
usedRefreshToken: string,
mint: (refreshToken: string) => Promise<CodexAccessTokenEntry>,
): Promise<CodexAccessTokenEntry | null> => {
const reread = await getProviderRepo().upstreams.getById(upstreamId);
if (!reread) return null;
const rereadState = readCodexUpstreamState(reread.state);
const rereadAccount = rereadState.accounts.find(a => a.chatgptAccountId === accountId);
if (!rereadAccount) return null;
if (rereadAccount.state !== 'active') return null;
if (rereadAccount.refresh_token === usedRefreshToken) return null;
console.info(
`Codex refresh-race recovered for upstream ${upstreamId} account ${accountId}: sibling rotated, using their access token`,
);
if (rereadAccount.accessToken && isAccessTokenFresh(rereadAccount.accessToken)) {
return rereadAccount.accessToken;
}
// Sibling rotated the refresh token but no usable access token sits in
// state — most likely an `invalidateCodexAccessToken` ran between the
// sibling's rotation and our re-read. Re-enter the refresh flow once with
// the live RT; the re-entrant call sees the rotated row and goes straight
// through the standard mint path. The depth guard suppresses a second
// recovery attempt — if `invalid_grant` strikes again the refresh token
// really is dead and we want the terminal flip.
return await ensureCodexAccessTokenInner(upstreamId, accountId, mint, false);
};

// Mints a fresh access token via /oauth/token and routes the rotated
// refresh_token through the caller's CAS hook. Awaiting the rotation
// persistence (rather than fire-and-forget) is deliberate: under concurrent
Expand Down
41 changes: 41 additions & 0 deletions packages/provider-codex/src/access-token-cache_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import {
putCodexAccessToken,
type CodexAccessTokenEntry,
} from './access-token-cache.ts';
import { CodexOAuthSessionTerminatedError } from './auth/oauth.ts';
import type { CodexUpstreamState } from './state.ts';
import { initProviderRepo, type UpstreamRecord } from '@floway-dev/provider';

Expand Down Expand Up @@ -179,4 +180,44 @@ describe('ensureCodexAccessToken', () => {
await expect(ensureCodexAccessToken(upstreamId, accountId, mint)).rejects.toThrow(/oauth boom/);
expect(saveStateSpy).not.toHaveBeenCalled();
});

test('invalid_grant with a sibling rotation in flight → returns the sibling-minted access token, no persist', async () => {
// Simulate the race: between our pre-mint getById and the upstream
// rejecting our refresh_token, a sibling worker won the rotation and
// CAS-wrote rt_v2 + at_sibling. Re-read on recovery observes the new
// pair scoped to the same accountId; we should return it instead of
// destroying a working credential.
const siblingEntry: CodexAccessTokenEntry = { token: 'at_sibling', expiresAt: farFutureMs, refreshedAt: 'sibling' };
getByIdSpy.mockImplementationOnce(async () => current).mockImplementationOnce(async () => {
current = makeRecord({ accounts: [{ ...baseAccount, refresh_token: 'rt_v2', accessToken: siblingEntry }] });
return current;
});
const mint = vi.fn().mockRejectedValue(new CodexOAuthSessionTerminatedError({ code: 'invalid_grant', message: 'replayed' }));

const out = await ensureCodexAccessToken(upstreamId, accountId, mint);
expect(out).toEqual(siblingEntry);
expect(mint).toHaveBeenCalledTimes(1);
// Recovery returns the sibling's cached token; no fresh persist from us.
expect(saveStateSpy).not.toHaveBeenCalled();
});

test('invalid_grant with stored RT unchanged → rethrows for the caller to flip to terminal', async () => {
// Same RT on re-read means no sibling rotated; the refresh_token really
// is dead. The cache surfaces the original error; the data-plane / control-
// plane caller is responsible for the terminal-state flip.
const mint = vi.fn().mockRejectedValue(new CodexOAuthSessionTerminatedError({ code: 'invalid_grant', message: 'revoked' }));
await expect(ensureCodexAccessToken(upstreamId, accountId, mint)).rejects.toBeInstanceOf(CodexOAuthSessionTerminatedError);
expect(mint).toHaveBeenCalledTimes(1);
expect(saveStateSpy).not.toHaveBeenCalled();
});

test('app_session_terminated never attempts race recovery — single getById, original error rethrown', async () => {
// Terminal codes other than invalid_grant signal credential death under
// any race scenario; the cache must not re-read state to second-guess
// them. Assert via the absence of a second getById call.
const mint = vi.fn().mockRejectedValue(new CodexOAuthSessionTerminatedError({ code: 'app_session_terminated', message: 'gone' }));
await expect(ensureCodexAccessToken(upstreamId, accountId, mint)).rejects.toBeInstanceOf(CodexOAuthSessionTerminatedError);
expect(getByIdSpy).toHaveBeenCalledTimes(1);
expect(saveStateSpy).not.toHaveBeenCalled();
});
});
58 changes: 46 additions & 12 deletions packages/provider-codex/src/auth/oauth.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,49 @@ export interface CodexOAuthTokens {

// Terminal error: refresh_token is dead, operator must re-import. Distinct
// from generic OAuth 4xx so callers can react to session-termination
// separately from a transient upstream message.
// separately from a transient upstream message. `code` carries the raw OAuth
// `error` value (`invalid_grant`, `app_session_terminated`, etc.) so the
// refresh-race recovery in the access-token cache can single out
// `invalid_grant` — the only terminal code that might mean "a sibling
// worker just rotated the refresh token, and our copy is stale" — from
// codes that signal genuine credential death under any race scenario.
export class CodexOAuthSessionTerminatedError extends Error {
constructor(public readonly upstreamMessage: string) {
super(`Codex OAuth session terminated: ${upstreamMessage}`);
readonly code: string;
readonly upstreamMessage: string;
constructor(args: { code: string; message: string }) {
super(`Codex OAuth session terminated: ${args.message}`);
this.name = 'CodexOAuthSessionTerminatedError';
this.code = args.code;
this.upstreamMessage = args.message;
}
}

// Terminal codes accepted on the authorization-code exchange. `invalid_grant`
// here typically means the operator pasted a stale or wrong callback URL,
// which is recoverable by restarting the PKCE flow rather than re-importing,
// so it stays out of this set.
const EXCHANGE_TERMINAL_OAUTH_CODES: ReadonlySet<string> = new Set([
'app_session_terminated',
]);

// Terminal codes on the refresh path: every one of these signals a dead
// refresh_token that only operator re-import recovers. Aligned with
// sub2api's `isNonRetryableRefreshError`
// (backend/internal/service/token_refresh_service.go:429-451), which shares
// the same list across OpenAI/Claude/Gemini OAuth — Codex is OpenAI OAuth,
// so the set carries over verbatim. `invalid_grant` is included even though
// the refresh-race recovery in access-token-cache.ts may re-classify it
// when a sibling rotation is detected; from the OAuth wire's perspective
// it is still a terminal signal.
const REFRESH_TERMINAL_OAUTH_CODES: ReadonlySet<string> = new Set([
'app_session_terminated',
'invalid_grant',
'invalid_refresh_token',
'invalid_client',
'unauthorized_client',
'access_denied',
]);

const codexTokenRequest = async (
body: URLSearchParams,
terminalCodes: ReadonlySet<string>,
Expand Down Expand Up @@ -65,7 +100,7 @@ const codexTokenRequest = async (
if (message === null && typeof root?.detail === 'string') message = root.detail as string;
message ??= rawText.slice(0, 256);
if (code && terminalCodes.has(code)) {
throw new CodexOAuthSessionTerminatedError(message);
throw new CodexOAuthSessionTerminatedError({ code, message });
}
throw new Error(`Codex OAuth /token returned ${response.status}: ${message}`);
}
Expand Down Expand Up @@ -101,7 +136,7 @@ export const exchangeCodexAuthorizationCode = async (opts: { code: string; codeV
// exchange typically means the operator pasted a stale or wrong callback
// URL, which is recoverable by restarting the PKCE flow rather than
// re-importing.
return await codexTokenRequest(body, new Set(['app_session_terminated']), directFetcher);
return await codexTokenRequest(body, EXCHANGE_TERMINAL_OAUTH_CODES, directFetcher);
};

// `fetcher` is required because the refresh has an associated upstream
Expand All @@ -114,11 +149,10 @@ export const refreshCodexAccessToken = async (refreshToken: string, fetcher: Fet
client_id: CODEX_CLIENT_ID,
scope: CODEX_OAUTH_SCOPE,
});
// OAuth `invalid_grant` on the refresh path is unambiguous — the
// refresh_token has been replayed, revoked, or expired. Same recovery as
// `app_session_terminated`: the operator must re-import a fresh auth.json.
// The error text varies ("Your refresh token has already been used to
// generate a new access token", "Token is no longer valid", etc.); the code
// is the stable signal.
return await codexTokenRequest(body, new Set(['app_session_terminated', 'invalid_grant']), fetcher);
// OAuth `invalid_grant` on the refresh path is ambiguous on its own — it
// can mean a genuinely revoked/expired refresh_token, *or* that a sibling
// worker raced us, won the rotation, and our copy is now stale. The
// access-token cache's `recoverFromRefreshRace` distinguishes by re-reading
// upstream state; the other codes here always mean credential death.
return await codexTokenRequest(body, REFRESH_TERMINAL_OAUTH_CODES, fetcher);
};