Skip to content

Commit c5ebffd

Browse files
committed
fix: address remaining PR 387 runtime regressions
1 parent e861d3d commit c5ebffd

12 files changed

Lines changed: 459 additions & 96 deletions

index.ts

Lines changed: 38 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -443,14 +443,17 @@ let sessionAffinityWriteVersion = 0;
443443
});
444444
const syncRuntimeObservability = (requestId: string | null): void => {
445445
mutateRuntimeObservabilitySnapshot((snapshot) => {
446+
const now = Date.now();
447+
const poolRemaining = getPoolExhaustionCooldownRemaining(now);
448+
const burstRemaining = getServerBurstCooldownRemaining(now);
446449
snapshot.currentRequestId = requestId;
447450
snapshot.poolExhaustionCooldownUntil =
448-
getPoolExhaustionCooldownRemaining() > 0
449-
? Date.now() + getPoolExhaustionCooldownRemaining()
451+
poolRemaining > 0
452+
? now + poolRemaining
450453
: null;
451454
snapshot.serverBurstCooldownUntil =
452-
getServerBurstCooldownRemaining() > 0
453-
? Date.now() + getServerBurstCooldownRemaining()
455+
burstRemaining > 0
456+
? now + burstRemaining
454457
: null;
455458
snapshot.responsesRequests = runtimeMetrics.responsesRequests;
456459
snapshot.authRefreshRequests = runtimeMetrics.authRefreshRequests;
@@ -1980,18 +1983,26 @@ let sessionAffinityWriteVersion = 0;
19801983
const serverRetryAfterMs = parseRetryAfterHintMs(
19811984
response.headers,
19821985
);
1983-
const policy = evaluateFailurePolicy(
1984-
{
1985-
kind: "server",
1986-
failoverMode,
1987-
serverRetryAfterMs:
1988-
serverRetryAfterMs ?? undefined,
1989-
},
1990-
{ serverCooldownMs: serverErrorCooldownMs },
1991-
);
1992-
// Overload-type server errors (502 Bad Gateway, 503 Service
1993-
// Unavailable, 529 Overloaded) signal upstream capacity
1994-
// pressure. Notify the quota scheduler so it can proactively
1986+
const policy = evaluateFailurePolicy(
1987+
{
1988+
kind: "server",
1989+
failoverMode,
1990+
serverRetryAfterMs:
1991+
serverRetryAfterMs ?? undefined,
1992+
},
1993+
{ serverCooldownMs: serverErrorCooldownMs },
1994+
);
1995+
const serverBurstCooldownUntil = recordServerBurstFailure(
1996+
account.index,
1997+
);
1998+
if (serverBurstCooldownUntil > 0) {
1999+
runtimeMetrics.lastError =
2000+
"Repeated upstream server errors across the account pool";
2001+
syncRuntimeObservability(requestTraceId);
2002+
}
2003+
// Overload-type server errors (502 Bad Gateway, 503 Service
2004+
// Unavailable, 529 Overloaded) signal upstream capacity
2005+
// pressure. Notify the quota scheduler so it can proactively
19952006
// defer subsequent requests for this quota key, mirroring the
19962007
// 429 handler's scheduler awareness.
19972008
if (
@@ -2441,12 +2452,14 @@ let sessionAffinityWriteVersion = 0;
24412452
if (fallbackAccount.index !== account.index) {
24422453
runtimeMetrics.streamFailoverCrossAccountRecoveries += 1;
24432454
runtimeMetrics.accountRotations += 1;
2444-
if (!responseContinuationEnabled) {
2445-
sessionAffinityStore?.remember(
2446-
sessionAffinityKey,
2447-
fallbackAccount.index,
2448-
);
2449-
}
2455+
if (!responseContinuationEnabled) {
2456+
sessionAffinityStore?.rememberWithVersion(
2457+
sessionAffinityKey,
2458+
fallbackAccount.index,
2459+
Date.now(),
2460+
sessionAffinityVersion,
2461+
);
2462+
}
24502463
}
24512464

24522465
logInfo(
@@ -2643,9 +2656,11 @@ let sessionAffinityWriteVersion = 0;
26432656
!responseContinuationEnabled ||
26442657
(!isStreaming && !storedResponseIdForSuccess)
26452658
) {
2646-
sessionAffinityStore?.remember(
2659+
sessionAffinityStore?.rememberWithVersion(
26472660
sessionAffinityKey,
26482661
successAccountForResponse.index,
2662+
Date.now(),
2663+
sessionAffinityVersion,
26492664
);
26502665
}
26512666
runtimeMetrics.successfulRequests++;

lib/request/request-resilience.ts

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,11 @@ export function getPoolExhaustionCooldownRemaining(now = Date.now()): number {
2626

2727
export function armPoolExhaustionCooldown(waitMs: number, now = Date.now()): number {
2828
const bounded = Math.max(POOL_EXHAUSTION_COOLDOWN_MS, Math.floor(waitMs));
29-
poolExhaustionCooldownUntil = now + bounded;
29+
const nextExpiry = now + bounded;
30+
poolExhaustionCooldownUntil = Math.max(
31+
poolExhaustionCooldownUntil ?? 0,
32+
nextExpiry,
33+
);
3034
return poolExhaustionCooldownUntil;
3135
}
3236

@@ -45,7 +49,14 @@ export function recordServerBurstFailure(
4549
accountIndex: number,
4650
now = Date.now(),
4751
): number {
48-
if (now - serverBurstState.windowStartedAt > SERVER_BURST_COOLDOWN_MS) {
52+
if (serverBurstState.cooldownUntil && serverBurstState.cooldownUntil > now) {
53+
return serverBurstState.cooldownUntil;
54+
}
55+
if (
56+
(serverBurstState.cooldownUntil === null ||
57+
serverBurstState.cooldownUntil <= now) &&
58+
now - serverBurstState.windowStartedAt > SERVER_BURST_COOLDOWN_MS
59+
) {
4960
serverBurstState = {
5061
windowStartedAt: now,
5162
accountIndices: new Set<number>(),

lib/runtime/runtime-observability.ts

Lines changed: 81 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { existsSync, promises as fs } from "node:fs";
1+
import { existsSync, readFileSync, promises as fs } from "node:fs";
22
import { join } from "node:path";
33
import { getCodexMultiAuthDir } from "../runtime-paths.js";
44

@@ -48,6 +48,7 @@ export interface RuntimeObservabilitySnapshot {
4848
const SNAPSHOT_FILE_NAME = "runtime-observability.json";
4949
const PERSIST_RUNTIME_SNAPSHOT = process.env.VITEST !== "true";
5050
const RUNTIME_OBSERVABILITY_SNAPSHOT_VERSION = 1;
51+
const RETRYABLE_SNAPSHOT_ERRORS = new Set(["EBUSY", "EPERM"]);
5152

5253
let snapshotState: RuntimeObservabilitySnapshot | null = null;
5354
let pendingWrite: Promise<void> | null = null;
@@ -99,46 +100,102 @@ function createDefaultSnapshot(): RuntimeObservabilitySnapshot {
99100
};
100101
}
101102

103+
function normalizePersistedSnapshot(
104+
parsed: Partial<RuntimeObservabilitySnapshot> | null,
105+
): RuntimeObservabilitySnapshot | null {
106+
if (!parsed || typeof parsed !== "object") {
107+
return null;
108+
}
109+
if (
110+
typeof parsed.version === "number" &&
111+
parsed.version !== RUNTIME_OBSERVABILITY_SNAPSHOT_VERSION
112+
) {
113+
return null;
114+
}
115+
const base = createDefaultSnapshot();
116+
return {
117+
...base,
118+
...parsed,
119+
version: RUNTIME_OBSERVABILITY_SNAPSHOT_VERSION,
120+
runtimeMetrics: {
121+
...base.runtimeMetrics,
122+
...(parsed.runtimeMetrics ?? {}),
123+
},
124+
};
125+
}
126+
127+
function loadPersistedRuntimeObservabilitySnapshotSync(): RuntimeObservabilitySnapshot | null {
128+
const path = getSnapshotPath();
129+
if (!existsSync(path)) {
130+
return null;
131+
}
132+
try {
133+
const raw = readFileSync(path, "utf-8");
134+
const parsed = JSON.parse(raw) as Partial<RuntimeObservabilitySnapshot> | null;
135+
return normalizePersistedSnapshot(parsed);
136+
} catch {
137+
return null;
138+
}
139+
}
140+
141+
function ensureSnapshotState(): RuntimeObservabilitySnapshot {
142+
if (!snapshotState) {
143+
snapshotState =
144+
(PERSIST_RUNTIME_SNAPSHOT
145+
? loadPersistedRuntimeObservabilitySnapshotSync()
146+
: null) ?? createDefaultSnapshot();
147+
}
148+
return snapshotState;
149+
}
150+
102151
async function writeSnapshot(snapshot: RuntimeObservabilitySnapshot): Promise<void> {
103152
const dir = getCodexMultiAuthDir();
104153
const path = getSnapshotPath();
105154
await fs.mkdir(dir, { recursive: true });
106-
const tempPath = `${path}.${process.pid}.${Date.now()}.tmp`;
107-
let moved = false;
108-
try {
109-
await fs.writeFile(tempPath, JSON.stringify(snapshot, null, 2), "utf-8");
110-
await fs.rename(tempPath, path);
111-
moved = true;
112-
} finally {
113-
if (!moved) {
114-
try {
115-
await fs.unlink(tempPath);
116-
} catch {
117-
// Best-effort cleanup for interrupted writes.
155+
let lastError: unknown = null;
156+
for (let attempt = 0; attempt < 3; attempt += 1) {
157+
const tempPath = `${path}.${process.pid}.${Date.now()}.${attempt}.tmp`;
158+
let moved = false;
159+
try {
160+
await fs.writeFile(tempPath, JSON.stringify(snapshot, null, 2), "utf-8");
161+
await fs.rename(tempPath, path);
162+
moved = true;
163+
return;
164+
} catch (error) {
165+
lastError = error;
166+
const code = (error as NodeJS.ErrnoException | undefined)?.code ?? "";
167+
if (!RETRYABLE_SNAPSHOT_ERRORS.has(code) || attempt >= 2) {
168+
throw error;
169+
}
170+
} finally {
171+
if (!moved) {
172+
try {
173+
await fs.unlink(tempPath);
174+
} catch {
175+
// Best-effort cleanup for interrupted writes.
176+
}
118177
}
119178
}
120179
}
180+
if (lastError) {
181+
throw lastError;
182+
}
121183
}
122184

123185
export function getRuntimeObservabilitySnapshot(): RuntimeObservabilitySnapshot {
124-
if (!snapshotState) {
125-
snapshotState = createDefaultSnapshot();
126-
}
127-
return structuredClone(snapshotState);
186+
return structuredClone(ensureSnapshotState());
128187
}
129188

130189
export function mutateRuntimeObservabilitySnapshot(
131190
mutator: (snapshot: RuntimeObservabilitySnapshot) => void,
132191
): void {
133-
if (!snapshotState) {
134-
snapshotState = createDefaultSnapshot();
135-
}
136-
mutator(snapshotState);
137-
snapshotState.updatedAt = Date.now();
192+
const snapshot = ensureSnapshotState();
193+
mutator(snapshot);
194+
snapshot.updatedAt = Date.now();
138195
if (!PERSIST_RUNTIME_SNAPSHOT) {
139196
return;
140197
}
141-
const nextSnapshot = structuredClone(snapshotState);
198+
const nextSnapshot = structuredClone(snapshot);
142199
pendingWrite = (pendingWrite ?? Promise.resolve())
143200
.catch(() => undefined)
144201
.then(() => writeSnapshot(nextSnapshot));
@@ -152,25 +209,7 @@ export async function loadPersistedRuntimeObservabilitySnapshot(): Promise<Runti
152209
try {
153210
const raw = await fs.readFile(path, "utf-8");
154211
const parsed = JSON.parse(raw) as Partial<RuntimeObservabilitySnapshot> | null;
155-
if (!parsed || typeof parsed !== "object") {
156-
return null;
157-
}
158-
if (
159-
typeof parsed.version === "number" &&
160-
parsed.version !== RUNTIME_OBSERVABILITY_SNAPSHOT_VERSION
161-
) {
162-
return null;
163-
}
164-
const base = createDefaultSnapshot();
165-
return {
166-
...base,
167-
...parsed,
168-
version: RUNTIME_OBSERVABILITY_SNAPSHOT_VERSION,
169-
runtimeMetrics: {
170-
...base.runtimeMetrics,
171-
...(parsed.runtimeMetrics ?? {}),
172-
},
173-
};
212+
return normalizePersistedSnapshot(parsed);
174213
} catch {
175214
return null;
176215
}

lib/runtime/runtime-services.ts

Lines changed: 6 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -37,25 +37,20 @@ export async function ensureLiveAccountSyncState<
3737
let liveAccountSyncConfigKey = params.currentConfigKey ?? null;
3838

3939
if (!params.enabled) {
40-
if (liveAccountSync) {
41-
liveAccountSync.stop();
42-
liveAccountSync = null;
43-
liveAccountSyncPath = null;
44-
liveAccountSyncConfigKey = null;
45-
}
40+
liveAccountSync?.stop();
4641
return {
47-
liveAccountSync,
48-
liveAccountSyncPath,
49-
liveAccountSyncConfigKey,
42+
liveAccountSync: null,
43+
liveAccountSyncPath: null,
44+
liveAccountSyncConfigKey: null,
5045
};
5146
}
5247

5348
const nextConfigKey = params.configKey ?? null;
5449
if (
5550
liveAccountSync &&
5651
nextConfigKey !== null &&
57-
liveAccountSyncConfigKey !== null &&
58-
liveAccountSyncConfigKey !== nextConfigKey
52+
(liveAccountSyncConfigKey === null ||
53+
liveAccountSyncConfigKey !== nextConfigKey)
5954
) {
6055
liveAccountSync.stop();
6156
liveAccountSync = null;

0 commit comments

Comments
 (0)