From 6785fc5345a3cd6ee84dee58aa717199a0cab603 Mon Sep 17 00:00:00 2001 From: Peter Svensson Date: Wed, 24 Jun 2026 08:50:45 +0200 Subject: [PATCH] fix: decouple client download from merge and stop deleting parts under readers Two defects in Storage.download() for unmerged entries: 1. The first download of an unmerged entry tees the parts into both the client response and the /merged re-upload through one backpressured pipe (pumpPartsToStreams). Because the merge upload runs at queueSize:1, partSize:5MB, the client is throttled to a serial 5MB PUT loop -- a ~700MB restore stalls at ~124MiB and crawls at ~3-4 MB/s (observed ~3m40s). 2. On merge completion the parts folder is deleted inline. Concurrent downloads still streaming those parts then hit ObjectNotFoundError (HTTP 500), which clients treat as a miss, causing re-upload churn and entries that never stay merged. Fix: serve the client directly from streamParts() at full speed and run the merge as an independent background stream (its own part reads), so the client is never coupled to the merge. Claim the merge atomically (UPDATE ... WHERE mergeStartedAt IS NULL AND mergedAt IS NULL) so concurrent downloads (including those on other replicas) don't start duplicate merges. Stop deleting parts inline -- the existing hourly cleanup:parts task already removes parts of merged entries, so the merge itself no longer cuts off in-flight readers. Adds tests/concurrent-download.test.ts (fails on the inline-delete code, passes after). Related to #235 (which addresses defect 2 via a parts-delete grace period but leaves the defect-1 download throttling unaddressed). 
--- lib/storage.ts | 145 +++++++++++++----------------- tests/concurrent-download.test.ts | 79 ++++++++++++++++ 2 files changed, 139 insertions(+), 85 deletions(-) create mode 100644 tests/concurrent-download.test.ts diff --git a/lib/storage.ts b/lib/storage.ts index d3c51ee..6375c61 100644 --- a/lib/storage.ts +++ b/lib/storage.ts @@ -5,12 +5,11 @@ import type { ReadableStream } from 'node:stream/web' import type { Database, StorageLocation } from './db' import type { Env } from './schemas' import { randomUUID } from 'node:crypto' -import { once } from 'node:events' import { createReadStream, createWriteStream } from 'node:fs' import fs from 'node:fs/promises' import { Agent } from 'node:https' import path from 'node:path' -import { PassThrough, Readable } from 'node:stream' +import { Readable } from 'node:stream' import { pipeline } from 'node:stream/promises' import { createSingletonPromise } from '@antfu/utils' import { @@ -224,65 +223,20 @@ export class Storage { .execute() try { - if (storageLocation.mergedAt || storageLocation.mergeStartedAt) - return await this.downloadFromCacheEntryLocation(storageLocation) + // Already merged: serve the single merged blob directly. (Under the v2 + // protocol an already-merged entry is handed out as a presigned URL by + // getCacheEntryWithDownloadUrl and never reaches this path.) + if (storageLocation.mergedAt) + return await this.adapter.createDownloadStream(`${storageLocation.folderName}/merged`) await this.ensurePartsExist(storageLocation) - await this.db - .updateTable('storage_locations') - .set({ - mergeStartedAt: Date.now(), - }) - .where('id', '=', storageLocation.id) - .execute() + // First download of an unmerged entry kicks off the merge in the + // background. The client download below is never coupled to it. 
+ if (!storageLocation.mergeStartedAt) await this.tryStartBackgroundMerge(storageLocation) - const responseStream = new PassThrough() - const mergerStream = new PassThrough() - - const mergePromise = this.adapter - .uploadStream(`${storageLocation.folderName}/merged`, mergerStream) - .then(async () => { - await this.db - .updateTable('storage_locations') - .set({ - mergedAt: Date.now(), - }) - .where('id', '=', storageLocation.id) - .execute() - await this.db.transaction().execute(async (tx) => { - await tx - .updateTable('storage_locations') - .set({ - partsDeletedAt: Date.now(), - }) - .where('id', '=', storageLocation.id) - .execute() - await this.adapter.deleteFolder(`${storageLocation.folderName}/parts`) - }) - }) - .catch(async () => { - await this.db - .updateTable('storage_locations') - .set({ - mergedAt: null, - mergeStartedAt: null, - }) - .where('id', '=', storageLocation.id) - .execute() - mergerStream.destroy() - }) - this.mergeStreamPromises.add(mergePromise) - mergePromise.finally(() => this.mergeStreamPromises.delete(mergePromise)) - - this.pumpPartsToStreams(storageLocation, responseStream, mergerStream).catch((err) => { - responseStream.destroy(err) - mergerStream.destroy(err) - if (err instanceof ObjectNotFoundError) - logger.warn(`Stale cache entry ${cacheEntryId}: ${err.message}`) - }) - - return responseStream + // Always serve the client straight from the parts at full speed. + return Readable.from(this.streamParts(storageLocation)) } catch (err) { if (err instanceof ObjectNotFoundError) { logger.warn(`Stale cache entry ${cacheEntryId}: ${err.message}`) @@ -292,40 +246,61 @@ export class Storage { } } + /** + * Merge an unmerged entry's parts into a single `merged` blob in the + * background, reading its own part streams so it never throttles the client + * download. Parts are left in place for the `cleanup:parts` task to remove — + * deleting them inline would race readers still streaming the parts. 
+ */ + private async tryStartBackgroundMerge(location: StorageLocation) { + // Atomically claim the merge: only the request that flips mergeStartedAt + // from NULL runs it, so concurrent downloads (across replicas) don't start + // duplicate merges. + const claim = await this.db + .updateTable('storage_locations') + .set({ + mergeStartedAt: Date.now(), + }) + .where('id', '=', location.id) + .where('mergeStartedAt', 'is', null) + .where('mergedAt', 'is', null) + .executeTakeFirst() + if (claim.numUpdatedRows === 0n) return + + const mergePromise = this.adapter + .uploadStream(`${location.folderName}/merged`, Readable.from(this.streamParts(location))) + .then(async () => { + await this.db + .updateTable('storage_locations') + .set({ + mergedAt: Date.now(), + }) + .where('id', '=', location.id) + .execute() + }) + .catch(async (err: unknown) => { + logger.warn( + `Failed to merge cache entry ${location.folderName}: ${err instanceof Error ? err.message : String(err)}`, + ) + // Release the claim so cleanup:merges / a later download can retry. 
+ await this.db + .updateTable('storage_locations') + .set({ + mergeStartedAt: null, + }) + .where('id', '=', location.id) + .execute() + }) + this.mergeStreamPromises.add(mergePromise) + void mergePromise.finally(() => this.mergeStreamPromises.delete(mergePromise)) + } + private async ensurePartsExist(location: StorageLocation) { const partsFolder = `${location.folderName}/parts` const actualPartCount = await this.adapter.countFilesInFolder(partsFolder) if (actualPartCount < location.partCount) throw new ObjectNotFoundError(partsFolder) } - private async downloadFromCacheEntryLocation(location: StorageLocation) { - if (location.mergedAt) return this.adapter.createDownloadStream(`${location.folderName}/merged`) - - await this.ensurePartsExist(location) - return Readable.from(this.streamParts(location)) - } - - private async pumpPartsToStreams( - location: StorageLocation, - responseStream: PassThrough, - mergerStream: PassThrough, - ) { - if (location.partsDeletedAt) throw new Error('No parts to feed') - - for await (const chunk of this.streamParts(location)) { - const responseWantsMore = responseStream.write(chunk) - const mergerWantsMore = mergerStream.write(chunk) - - if (!responseWantsMore) await once(responseStream, 'drain') - if (!mergerWantsMore) await once(mergerStream, 'drain') - } - - responseStream.end() - mergerStream.end() - - await globalThis.gc?.() - } - private async *streamParts(location: StorageLocation) { if (location.partsDeletedAt) throw new Error('No parts to feed for location with deleted parts') diff --git a/tests/concurrent-download.test.ts b/tests/concurrent-download.test.ts new file mode 100644 index 0000000..995459e --- /dev/null +++ b/tests/concurrent-download.test.ts @@ -0,0 +1,79 @@ +import { Buffer } from 'node:buffer' +import crypto from 'node:crypto' +import { Readable } from 'node:stream' + +import { describe, expect, test } from 'vitest' +import { getStorage } from '~/lib/storage' + +const SCOPE = 'refs/heads/main' +const REPO_ID 
= '123' +const VERSION = 'concurrent-test-version' + +async function drain(stream: Readable | undefined) { + if (!stream) throw new Error('download returned no stream (cache miss)') + const chunks: Buffer[] = [] + for await (const chunk of stream) chunks.push(Buffer.from(chunk)) + return Buffer.concat(chunks) +} + +describe('concurrent downloads of the same unmerged entry', () => { + test( + 'concurrent readers all get the full payload and the merge keeps parts (no inline delete)', + { timeout: 30_000 }, + async () => { + const storage = await getStorage() + + // Build a fresh multi-part (unmerged) entry directly via the storage API. + const key = `concurrent-${crypto.randomUUID()}` + const part0 = crypto.randomBytes(512 * 1024) + const part1 = crypto.randomBytes(512 * 1024) + const expected = Buffer.concat([part0, part1]) + + const upload = await storage.createUpload({ + key, + version: VERSION, + scope: SCOPE, + repoId: REPO_ID, + }) + if (!upload) throw new Error('createUpload returned nothing') + await storage.uploadPart(upload.id, 0, Readable.toWeb(Readable.from(part0))) + await storage.uploadPart(upload.id, 1, Readable.toWeb(Readable.from(part1))) + await storage.completeUpload({ key, version: VERSION, scope: SCOPE, repoId: REPO_ID }) + + const matched = await storage.matchCacheEntry({ + keys: [key], + version: VERSION, + scopes: [SCOPE], + repoId: REPO_ID, + }) + const cacheEntryId = matched?.match.id + if (!cacheEntryId) throw new Error('cache entry not found after completeUpload') + + // Fire many concurrent downloads of the still-unmerged entry. Each must + // receive the full payload, served directly from the parts. 
+ const results = await Promise.all( + Array.from({ length: 8 }, async () => drain(await storage.download(cacheEntryId))), + ) + for (const body of results) expect(body.compare(expected)).toBe(0) + + await storage.waitForOngoingMerges() + + // The merge must NOT delete the parts inline — that is the deletion race + // (deleting parts out from under in-flight readers). Deletion is the + // `cleanup:parts` task's job, so the parts must still be present here. + // The original inline-delete code wipes them on first download, so this + // asserts the fix. `folderName` is the upload id (see createUpload). + const partsRemaining = await storage.adapter.countFilesInFolder(`${upload.id}/parts`) + expect(partsRemaining).toBe(2) + + // The single background merge must have produced a valid merged blob, + // and an already-merged entry must serve it. + const afterMerge = await drain(await storage.download(cacheEntryId)) + expect(afterMerge.compare(expected)).toBe(0) + const mergedBlob = await drain( + await storage.adapter.createDownloadStream(`${upload.id}/merged`), + ) + expect(mergedBlob.compare(expected)).toBe(0) + }, + ) +})