Skip to content

Commit 3e80d52

Browse files
committed
fix: stop stream failover replay after output
1 parent 28fd4e8 commit 3e80d52

2 files changed

Lines changed: 47 additions & 12 deletions

File tree

lib/request/stream-failover.ts

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,11 @@ async function readChunkWithSoftHardTimeout(
105105
*
106106
* The returned Response streams bytes from the initialResponse body and, when the stream stalls or errors before any bytes have been emitted, will attempt up to `maxFailovers` failovers by calling `getFallbackResponse(attempt, emittedBytes)`. On each successful failover a textual marker is injected into the stream identifying the failover attempt (and `requestInstanceId` when provided). The function performs best-effort cleanup of underlying readers and enforces soft/hard read timeouts as configured via `options`.
107107
*
108-
* Concurrency assumptions: the implementation expects a single consumer reading the returned Response body; callers must not concurrently read the same stream body from multiple consumers. Filesystem/platform note: behavior is platform-agnostic; no filesystem access is performed (Windows-specific filesystem semantics do not apply). Token redaction: any request identifiers embedded in the injected marker are limited to the normalized `requestInstanceId` (trimmed and truncated to 64 chars) to avoid leaking long tokens.
108+
* Concurrency assumptions: the implementation expects a single consumer reading the returned Response body; callers must not concurrently read the same stream body from multiple consumers. Filesystem/platform note: behavior is platform-agnostic; no filesystem access is performed (Windows-specific filesystem semantics do not apply). Token redaction: any request identifiers embedded in the injected marker are limited to the normalized `requestInstanceId` (trimmed and truncated to 64 chars) to avoid leaking long tokens.
109+
*
110+
* Failover safety: once the wrapper has already emitted bytes from the primary stream,
111+
* it will stop attempting fallback replays. Reissuing the upstream request after partial
112+
* output can duplicate streamed text and any side-effectful tool activity.
109113
*
110114
* @param initialResponse - The original Response whose body will be streamed and monitored for stalls/errors.
111115
* @param getFallbackResponse - Async function invoked for each failover attempt with the 1-based attempt number and total emitted bytes; should return a Response with a streaming body to switch to, or `null`/a Response without a body to indicate no fallback.
@@ -162,6 +166,9 @@ export function withStreamingFailover(
162166
if (failoverAttempt >= maxFailovers) {
163167
return false;
164168
}
169+
if (emittedBytes > 0) {
170+
return false;
171+
}
165172
failoverAttempt += 1;
166173
const fallback = await getFallbackResponse(failoverAttempt, emittedBytes);
167174
if (!fallback?.body) {

test/stream-failover.test.ts

Lines changed: 39 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,21 @@ function makeStallingResponse(): Response {
1818
);
1919
}
2020

21+
function makeIdleResponse(): Response {
22+
return new Response(
23+
new ReadableStream<Uint8Array>({
24+
start() {
25+
// Intentionally never enqueue or close: the stream stays idle so the stall timeout fires with zero bytes emitted.
26+
},
27+
}),
28+
{
29+
headers: {
30+
"content-type": "text/event-stream",
31+
},
32+
},
33+
);
34+
}
35+
2136
function makeSseResponse(payload: string): Response {
2237
return new Response(
2338
new ReadableStream<Uint8Array>({
@@ -52,15 +67,14 @@ describe("stream failover", () => {
5267
it("switches to fallback stream when primary stalls", async () => {
5368
vi.useFakeTimers();
5469
const fallback = vi.fn(async () => makeSseResponse("data: second\n\n"));
55-
const response = withStreamingFailover(makeStallingResponse(), fallback, {
70+
const response = withStreamingFailover(makeIdleResponse(), fallback, {
5671
maxFailovers: 1,
5772
stallTimeoutMs: 10,
5873
});
5974

6075
const textPromise = response.text();
6176
await vi.advanceTimersByTimeAsync(1_200);
6277
const text = await textPromise;
63-
expect(text).toContain("data: first");
6478
expect(text).toContain("codex-multi-auth failover 1");
6579
expect(text).toContain("data: second");
6680
expect(fallback).toHaveBeenCalledTimes(1);
@@ -69,7 +83,7 @@ describe("stream failover", () => {
6983
it("includes request id marker when provided", async () => {
7084
vi.useFakeTimers();
7185
const response = withStreamingFailover(
72-
makeStallingResponse(),
86+
makeIdleResponse(),
7387
async () => makeSseResponse("data: fallback\n\n"),
7488
{
7589
maxFailovers: 1,
@@ -87,7 +101,7 @@ describe("stream failover", () => {
87101
it("errors when fallback is unavailable", async () => {
88102
vi.useFakeTimers();
89103
const response = withStreamingFailover(
90-
makeStallingResponse(),
104+
makeIdleResponse(),
91105
async () => null,
92106
{ maxFailovers: 1, stallTimeoutMs: 10 },
93107
);
@@ -101,7 +115,7 @@ describe("stream failover", () => {
101115
it("propagates fallback provider exceptions deterministically", async () => {
102116
vi.useFakeTimers();
103117
const response = withStreamingFailover(
104-
makeStallingResponse(),
118+
makeIdleResponse(),
105119
async () => {
106120
throw new Error("fallback exploded");
107121
},
@@ -114,7 +128,7 @@ describe("stream failover", () => {
114128
await assertion;
115129
});
116130

117-
it("calls fallback exactly once when read-error and timeout race", async () => {
131+
it("does not trigger fallback when read-error and timeout race after bytes emitted", async () => {
118132
vi.useFakeTimers();
119133
const raceResponse = new Response(
120134
new ReadableStream<Uint8Array>({
@@ -139,13 +153,27 @@ describe("stream failover", () => {
139153
});
140154

141155
const textPromise = response.text();
156+
const assertion = expect(textPromise).rejects.toThrow("primary read failure");
142157
await vi.advanceTimersByTimeAsync(1_200);
143-
const text = await textPromise;
158+
await assertion;
144159

145-
expect(fallback).toHaveBeenCalledTimes(1);
146-
expect((text.match(/codex-multi-auth failover 1/g) ?? []).length).toBe(1);
147-
expect(text).toContain("data: first");
148-
expect(text).toContain("data: fallback");
160+
expect(fallback).not.toHaveBeenCalled();
161+
});
162+
163+
it("does not replay after bytes have already been emitted", async () => {
164+
vi.useFakeTimers();
165+
const fallback = vi.fn(async () => makeSseResponse("data: fallback\n\n"));
166+
const response = withStreamingFailover(makeStallingResponse(), fallback, {
167+
maxFailovers: 1,
168+
stallTimeoutMs: 10,
169+
});
170+
171+
const textPromise = response.text();
172+
const assertion = expect(textPromise).rejects.toThrow("SSE stream stalled");
173+
await vi.advanceTimersByTimeAsync(1_200);
174+
await assertion;
175+
176+
expect(fallback).not.toHaveBeenCalled();
149177
});
150178

151179
it("releases underlying reader when wrapped stream is cancelled", async () => {

0 commit comments

Comments
 (0)