-
Notifications
You must be signed in to change notification settings - Fork 102
fix(stream): decode SSE/hex audio for --stream and handle EPIPE #55
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -6,6 +6,7 @@ import { musicEndpoint } from '../../client/endpoints'; | |
| import { formatOutput, detectOutputFormat } from '../../output/formatter'; | ||
| import { saveAudioOutput } from '../../output/audio'; | ||
| import { readTextFromPathOrStdin } from '../../utils/fs'; | ||
| import { pipeAudioSseToStdout } from '../../utils/audio-stream'; | ||
| import type { Config } from '../../config/schema'; | ||
| import type { GlobalFlags } from '../../types/flags'; | ||
| import type { MusicRequest, MusicResponse } from '../../types/api'; | ||
|
|
@@ -149,14 +150,14 @@ export default defineCommand({ | |
|
|
||
| if (flags.stream) { | ||
| const res = await request(config, { url, method: 'POST', body, stream: true }); | ||
| const reader = res.body?.getReader(); | ||
| if (!reader) throw new CLIError('No response body', ExitCode.GENERAL); | ||
| while (true) { | ||
| const { done, value } = await reader.read(); | ||
| if (done) break; | ||
| process.stdout.write(value); | ||
| try { | ||
| await pipeAudioSseToStdout(res.body); | ||
| } catch (err) { | ||
| if (err instanceof Error && err.message === 'No response body') { | ||
| throw new CLIError('No response body', ExitCode.GENERAL); | ||
| } | ||
| throw err; | ||
| } | ||
|
Comment on lines
+153
to
160
|
||
| reader.releaseLock(); | ||
| return; | ||
| } | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -6,6 +6,7 @@ import { speechEndpoint } from '../../client/endpoints'; | |
| import { detectOutputFormat, formatOutput } from '../../output/formatter'; | ||
| import { saveAudioOutput } from '../../output/audio'; | ||
| import { readTextFromPathOrStdin } from '../../utils/fs'; | ||
| import { pipeAudioSseToStdout } from '../../utils/audio-stream'; | ||
| import type { Config } from '../../config/schema'; | ||
| import type { GlobalFlags } from '../../types/flags'; | ||
| import type { SpeechRequest, SpeechResponse } from '../../types/api'; | ||
|
|
@@ -98,14 +99,14 @@ export default defineCommand({ | |
|
|
||
| if (flags.stream) { | ||
| const res = await request(config, { url, method: 'POST', body, stream: true }); | ||
| const reader = res.body?.getReader(); | ||
| if (!reader) throw new CLIError('No response body', ExitCode.GENERAL); | ||
| while (true) { | ||
| const { done, value } = await reader.read(); | ||
| if (done) break; | ||
| process.stdout.write(value); | ||
| try { | ||
| await pipeAudioSseToStdout(res.body); | ||
| } catch (err) { | ||
|
Comment on lines
100
to
+104
|
||
| if (err instanceof Error && err.message === 'No response body') { | ||
| throw new CLIError('No response body', ExitCode.GENERAL); | ||
| } | ||
| throw err; | ||
|
Comment on lines
+102
to
+108
|
||
| } | ||
| reader.releaseLock(); | ||
| return; | ||
| } | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,88 @@ | ||
| /** | ||
| * Helpers for piping streamed TTS / music responses to stdout as raw audio. | ||
| * | ||
| * The MiniMax streaming endpoints return a Server-Sent Events stream of JSON | ||
| * envelopes whose `data.audio` field is a hex-encoded chunk of the target | ||
| * audio format. The `--stream` CLI flag is documented as writing *raw audio* | ||
| * to stdout (so it can be piped directly into players such as `mpv -`), so | ||
| * this helper parses the SSE frames, decodes the hex payloads, and writes | ||
| * the decoded bytes to stdout. | ||
| */ | ||
|
|
||
| /** | ||
| * Install a one-shot EPIPE handler on stdout so that downstream consumers | ||
| * closing the pipe early (e.g. `... | head`, or a player that exits) does | ||
| * not crash the process with an unhandled `'error'` event. | ||
| */ | ||
| export function installStdoutEpipeHandler(): void { | ||
| process.stdout.on('error', (err: NodeJS.ErrnoException) => { | ||
| if (err && err.code === 'EPIPE') { | ||
| process.exit(0); | ||
| } | ||
| throw err; | ||
| }); | ||
|
Comment on lines
+44
to
+52
|
||
| } | ||
|
|
||
| /** | ||
| * Consume a fetch-style ReadableStream of SSE bytes and write the decoded | ||
| * raw audio bytes (from `data.audio` hex fields) to stdout. | ||
| */ | ||
| export async function pipeAudioSseToStdout( | ||
| body: ReadableStream<Uint8Array> | null | undefined, | ||
| ): Promise<void> { | ||
| const reader = body?.getReader(); | ||
| if (!reader) { | ||
| throw new Error('No response body'); | ||
| } | ||
|
|
||
| installStdoutEpipeHandler(); | ||
|
|
||
| const decoder = new TextDecoder(); | ||
| let buffer = ''; | ||
|
|
||
| try { | ||
| while (true) { | ||
| const { done, value } = await reader.read(); | ||
| if (done) break; | ||
| buffer += decoder.decode(value, { stream: true }); | ||
|
|
||
| // SSE events are separated by blank lines. | ||
| let sep: number; | ||
| while ((sep = buffer.indexOf('\n\n')) >= 0) { | ||
| const event = buffer.slice(0, sep); | ||
| buffer = buffer.slice(sep + 2); | ||
| writeEvent(event); | ||
| } | ||
|
||
| } | ||
|
|
||
| // Flush any trailing event without a terminating blank line. | ||
| buffer += decoder.decode(); | ||
| if (buffer.length > 0) { | ||
| writeEvent(buffer); | ||
| } | ||
| } finally { | ||
| reader.releaseLock(); | ||
| } | ||
| } | ||
|
|
||
| function writeEvent(event: string): void { | ||
| for (const rawLine of event.split('\n')) { | ||
| if (!rawLine.startsWith('data:')) continue; | ||
| // Per SSE spec, an optional single space after `data:` should be stripped. | ||
| const payload = rawLine.slice(5).replace(/^ /, ''); | ||
| if (!payload || payload === '[DONE]') continue; | ||
|
|
||
| let parsed: { data?: { audio?: string } }; | ||
| try { | ||
| parsed = JSON.parse(payload); | ||
| } catch { | ||
| // Non-JSON keepalive or comment — skip. | ||
| continue; | ||
|
||
| } | ||
|
|
||
| const hex = parsed?.data?.audio; | ||
| if (typeof hex === 'string' && hex.length > 0) { | ||
| process.stdout.write(Buffer.from(hex, 'hex')); | ||
| } | ||
|
||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The
--streampath now decodes SSE->JSON->hex->bytes, but there’s no automated test coverage for this behavior. Consider adding a test that serves an SSE response via the existing mock server helper and verifies the generated bytes written to stdout formusic generate --stream(including chunk-boundary buffering).