Skip to content

Commit 66085ec

Browse files
committed
fix: cap request retries and stream failover
1 parent ce56f72 commit 66085ec

4 files changed

Lines changed: 221 additions & 14 deletions

File tree

index.ts

Lines changed: 75 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,11 @@ import {
151151
parseEnvInt,
152152
parseFailoverMode,
153153
} from "./lib/request/failover-config.js";
154+
import {
155+
buildStreamFailoverCandidateOrder,
156+
capStreamFailoverMax,
157+
computeOutboundRequestAttemptBudget,
158+
} from "./lib/request/request-attempt-budget.js";
154159
import {
155160
evaluateFailurePolicy,
156161
type FailoverMode,
@@ -732,8 +737,10 @@ let sessionAffinityWriteVersion = 0;
732737
);
733738
const streamFailoverMax = Math.max(
734739
0,
735-
parseEnvInt(process.env.CODEX_AUTH_STREAM_FAILOVER_MAX) ??
736-
STREAM_FAILOVER_MAX_BY_MODE[failoverMode],
740+
capStreamFailoverMax(
741+
parseEnvInt(process.env.CODEX_AUTH_STREAM_FAILOVER_MAX) ??
742+
STREAM_FAILOVER_MAX_BY_MODE[failoverMode],
743+
),
737744
);
738745
const streamFailoverSoftTimeoutMs = Math.max(
739746
1_000,
@@ -919,11 +926,36 @@ let sessionAffinityWriteVersion = 0;
919926
);
920927
runtimeMetrics.lastRequestAt = Date.now();
921928

922-
const abortSignal = requestInit?.signal ?? init?.signal ?? null;
923-
const sleep = createAbortableSleep(abortSignal);
929+
const abortSignal = requestInit?.signal ?? init?.signal ?? null;
930+
const sleep = createAbortableSleep(abortSignal);
931+
const maxOutboundRequestAttempts =
932+
computeOutboundRequestAttemptBudget({
933+
accountCount: accountManager.getAccountCount(),
934+
maxSameAccountRetries,
935+
emptyResponseMaxRetries,
936+
streamFailoverMax,
937+
});
938+
let outboundRequestAttemptsRemaining =
939+
maxOutboundRequestAttempts;
940+
const tryConsumeOutboundRequestAttempt = (
941+
reason: "primary" | "stream-failover",
942+
accountIndex: number,
943+
): boolean => {
944+
if (outboundRequestAttemptsRemaining <= 0) {
945+
runtimeMetrics.lastError =
946+
`Request attempt budget exhausted after ${maxOutboundRequestAttempts} outbound request(s)`;
947+
logWarn(
948+
`Stopping ${reason} replay after exhausting ${maxOutboundRequestAttempts} outbound request(s) on account ${accountIndex + 1}.`,
949+
);
950+
return false;
951+
}
952+
953+
outboundRequestAttemptsRemaining -= 1;
954+
return true;
955+
};
924956

925-
let allRateLimitedRetries = 0;
926-
let emptyResponseRetries = 0;
957+
let allRateLimitedRetries = 0;
958+
let emptyResponseRetries = 0;
927959
const attemptedUnsupportedFallbackModels = new Set<string>();
928960
if (model) {
929961
attemptedUnsupportedFallbackModels.add(model);
@@ -1293,6 +1325,28 @@ let sessionAffinityWriteVersion = 0;
12931325
let successAccountForResponse = account;
12941326
let successEntitlementAccountKey = entitlementAccountKey;
12951327
while (true) {
1328+
if (
1329+
!tryConsumeOutboundRequestAttempt(
1330+
"primary",
1331+
account.index,
1332+
)
1333+
) {
1334+
runtimeMetrics.failedRequests++;
1335+
const lastErrorDetail = runtimeMetrics.lastError;
1336+
const message = lastErrorDetail
1337+
? `${lastErrorDetail}. Try again after the current retries settle.`
1338+
: "Request attempt budget exhausted. Try again shortly.";
1339+
return new Response(
1340+
JSON.stringify({ error: { message } }),
1341+
{
1342+
status: 503,
1343+
headers: {
1344+
"content-type": "application/json; charset=utf-8",
1345+
},
1346+
},
1347+
);
1348+
}
1349+
12961350
let response: Response;
12971351
const fetchStart = performance.now();
12981352

@@ -2002,13 +2056,13 @@ let sessionAffinityWriteVersion = 0;
20022056
runtimeMetrics.cumulativeLatencyMs += fetchLatencyMs;
20032057
let responseForSuccess = response;
20042058
if (isStreaming) {
2005-
const streamFallbackCandidateOrder = [
2006-
account.index,
2007-
...accountManager
2008-
.getAccountsSnapshot()
2009-
.map((candidate) => candidate.index)
2010-
.filter((index) => index !== account.index),
2011-
];
2059+
const streamFallbackCandidateOrder =
2060+
buildStreamFailoverCandidateOrder(
2061+
account.index,
2062+
accountSnapshotList.map(
2063+
(candidate) => candidate.index,
2064+
),
2065+
);
20122066
responseForSuccess = withStreamingFailover(
20132067
response,
20142068
async (failoverAttempt, emittedBytes) => {
@@ -2125,6 +2179,14 @@ let sessionAffinityWriteVersion = 0;
21252179
) {
21262180
continue;
21272181
}
2182+
if (
2183+
!tryConsumeOutboundRequestAttempt(
2184+
"stream-failover",
2185+
fallbackAccount.index,
2186+
)
2187+
) {
2188+
return null;
2189+
}
21282190
fallbackAccount.accountId = fallbackAccountId;
21292191
if (
21302192
!hadFallbackAccountId &&
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
const MAX_TOTAL_OUTBOUND_REQUEST_ATTEMPTS = 6;
2+
const MAX_STREAM_FAILOVERS = 1;
3+
const MAX_STREAM_FAILOVER_CANDIDATES = 2;
4+
5+
export function capStreamFailoverMax(value: number): number {
6+
return Math.max(
7+
0,
8+
Math.min(MAX_STREAM_FAILOVERS, Math.floor(Number.isFinite(value) ? value : 0)),
9+
);
10+
}
11+
12+
export function computeOutboundRequestAttemptBudget(params: {
13+
accountCount: number;
14+
maxSameAccountRetries: number;
15+
emptyResponseMaxRetries: number;
16+
streamFailoverMax: number;
17+
}): number {
18+
const accountCount = Math.max(1, Math.floor(params.accountCount));
19+
const maxSameAccountRetries = Math.max(
20+
0,
21+
Math.floor(params.maxSameAccountRetries),
22+
);
23+
const emptyResponseMaxRetries = Math.max(
24+
0,
25+
Math.floor(params.emptyResponseMaxRetries),
26+
);
27+
const streamFailoverMax = capStreamFailoverMax(params.streamFailoverMax);
28+
29+
return Math.max(
30+
1,
31+
Math.min(
32+
accountCount +
33+
maxSameAccountRetries +
34+
emptyResponseMaxRetries +
35+
streamFailoverMax,
36+
MAX_TOTAL_OUTBOUND_REQUEST_ATTEMPTS,
37+
),
38+
);
39+
}
40+
41+
export function buildStreamFailoverCandidateOrder(
42+
primaryIndex: number,
43+
accountIndices: number[],
44+
): number[] {
45+
const order: number[] = [primaryIndex];
46+
47+
for (const index of accountIndices) {
48+
if (!Number.isFinite(index) || index === primaryIndex || order.includes(index)) {
49+
continue;
50+
}
51+
order.push(index);
52+
if (order.length >= MAX_STREAM_FAILOVER_CANDIDATES) {
53+
break;
54+
}
55+
}
56+
57+
return order;
58+
}

test/index-retry.test.ts

Lines changed: 57 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -461,7 +461,6 @@ describe("OpenAIAuthPlugin rate-limit retry", () => {
461461
tui: { showToast: vi.fn() },
462462
auth: { set: vi.fn() },
463463
} as any;
464-
465464
const plugin = await OpenAIAuthPlugin({ client });
466465

467466
const getAuth = async () => ({
@@ -486,6 +485,63 @@ describe("OpenAIAuthPlugin rate-limit retry", () => {
486485
});
487486
});
488487

488+
it("stops after the bounded outbound request budget even when more accounts are available", async () => {
489+
const accounts = Array.from({ length: 8 }, (_, index) =>
490+
createMockAccount({
491+
index,
492+
accountId: `account-${index + 1}`,
493+
email: `user${index + 1}@example.com`,
494+
refreshToken: `refresh-token-${index + 1}`,
495+
access: `access-token-account-${index + 1}`,
496+
}),
497+
);
498+
accountManagerState.accounts = accounts;
499+
accountManagerState.accountSelections = [...accounts];
500+
501+
const fetchMock = vi.fn().mockImplementation(() =>
502+
Promise.resolve(
503+
new Response(
504+
JSON.stringify({
505+
error: { code: "server_error", message: "temporary outage" },
506+
}),
507+
{
508+
status: 500,
509+
headers: { "content-type": "application/json" },
510+
},
511+
),
512+
),
513+
);
514+
globalThis.fetch = fetchMock as any;
515+
516+
const { OpenAIAuthPlugin } = await import("../index.js");
517+
const client = {
518+
tui: { showToast: vi.fn() },
519+
auth: { set: vi.fn() },
520+
} as any;
521+
const plugin = await OpenAIAuthPlugin({ client });
522+
523+
const getAuth = async () => ({
524+
type: "oauth" as const,
525+
access: "a",
526+
refresh: "r",
527+
expires: Date.now() + 60_000,
528+
multiAccount: true,
529+
});
530+
531+
const sdk = (await plugin.auth.loader(getAuth, { options: {}, models: {} })) as any;
532+
const response = await sdk.fetch("https://example.com", {});
533+
const payload = await response.json();
534+
535+
expect(fetchMock).toHaveBeenCalledTimes(6);
536+
expect(response.status).toBe(503);
537+
expect(payload).toEqual({
538+
error: {
539+
message:
540+
"Request attempt budget exhausted after 6 outbound request(s). Try again after the current retries settle.",
541+
},
542+
});
543+
});
544+
489545
it("rebuilds request headers after rotating to the next workspace", async () => {
490546
const account = createMockAccount({
491547
workspaces: [
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
import { describe, expect, it } from "vitest";
2+
import {
3+
buildStreamFailoverCandidateOrder,
4+
capStreamFailoverMax,
5+
computeOutboundRequestAttemptBudget,
6+
} from "../lib/request/request-attempt-budget.js";
7+
8+
describe("request attempt budget", () => {
9+
it("caps stream failover to a single retry", () => {
10+
expect(capStreamFailoverMax(0)).toBe(0);
11+
expect(capStreamFailoverMax(1)).toBe(1);
12+
expect(capStreamFailoverMax(3)).toBe(1);
13+
});
14+
15+
it("caps outbound request budgets for large account pools", () => {
16+
expect(
17+
computeOutboundRequestAttemptBudget({
18+
accountCount: 12,
19+
maxSameAccountRetries: 2,
20+
emptyResponseMaxRetries: 2,
21+
streamFailoverMax: 3,
22+
}),
23+
).toBe(6);
24+
});
25+
26+
it("keeps the primary stream account plus at most one alternate", () => {
27+
expect(
28+
buildStreamFailoverCandidateOrder(2, [2, 5, 2, 7, 9]),
29+
).toEqual([2, 5]);
30+
});
31+
});

0 commit comments

Comments
 (0)