diff --git a/lib/storage.ts b/lib/storage.ts index d3c51ee..6375c61 100644 --- a/lib/storage.ts +++ b/lib/storage.ts @@ -5,12 +5,11 @@ import type { ReadableStream } from 'node:stream/web' import type { Database, StorageLocation } from './db' import type { Env } from './schemas' import { randomUUID } from 'node:crypto' -import { once } from 'node:events' import { createReadStream, createWriteStream } from 'node:fs' import fs from 'node:fs/promises' import { Agent } from 'node:https' import path from 'node:path' -import { PassThrough, Readable } from 'node:stream' +import { Readable } from 'node:stream' import { pipeline } from 'node:stream/promises' import { createSingletonPromise } from '@antfu/utils' import { @@ -224,65 +223,20 @@ export class Storage { .execute() try { - if (storageLocation.mergedAt || storageLocation.mergeStartedAt) - return await this.downloadFromCacheEntryLocation(storageLocation) + // Already merged: serve the single merged blob directly. (Under the v2 + // protocol an already-merged entry is handed out as a presigned URL by + // getCacheEntryWithDownloadUrl and never reaches this path.) + if (storageLocation.mergedAt) + return await this.adapter.createDownloadStream(`${storageLocation.folderName}/merged`) await this.ensurePartsExist(storageLocation) - await this.db - .updateTable('storage_locations') - .set({ - mergeStartedAt: Date.now(), - }) - .where('id', '=', storageLocation.id) - .execute() + // First download of an unmerged entry kicks off the merge in the + // background. The client download below is never coupled to it. + if (!storageLocation.mergeStartedAt) await this.tryStartBackgroundMerge(storageLocation) - const responseStream = new PassThrough() - const mergerStream = new PassThrough() - - const mergePromise = this.adapter - .uploadStream(`${storageLocation.folderName}/merged`, mergerStream) - .then(async () => { - await this.db - .updateTable('storage_locations') - .set({ - mergedAt: Date.now(), - }) - .where('id', '=', storageLocation.id) - .execute() - await this.db.transaction().execute(async (tx) => { - await tx - .updateTable('storage_locations') - .set({ - partsDeletedAt: Date.now(), - }) - .where('id', '=', storageLocation.id) - .execute() - await this.adapter.deleteFolder(`${storageLocation.folderName}/parts`) - }) - }) - .catch(async () => { - await this.db - .updateTable('storage_locations') - .set({ - mergedAt: null, - mergeStartedAt: null, - }) - .where('id', '=', storageLocation.id) - .execute() - mergerStream.destroy() - }) - this.mergeStreamPromises.add(mergePromise) - mergePromise.finally(() => this.mergeStreamPromises.delete(mergePromise)) - - this.pumpPartsToStreams(storageLocation, responseStream, mergerStream).catch((err) => { - responseStream.destroy(err) - mergerStream.destroy(err) - if (err instanceof ObjectNotFoundError) - logger.warn(`Stale cache entry ${cacheEntryId}: ${err.message}`) - }) - - return responseStream + // Always serve the client straight from the parts at full speed. + return Readable.from(this.streamParts(storageLocation)) } catch (err) { if (err instanceof ObjectNotFoundError) { logger.warn(`Stale cache entry ${cacheEntryId}: ${err.message}`) @@ -292,40 +246,61 @@ export class Storage { } } + /** + * Merge an unmerged entry's parts into a single `merged` blob in the + * background, reading its own part streams so it never throttles the client + * download. Parts are left in place for the `cleanup:parts` task to remove — + * deleting them inline would race readers still streaming the parts. + */ + private async tryStartBackgroundMerge(location: StorageLocation) { + // Atomically claim the merge: only the request that flips mergeStartedAt + // from NULL runs it, so concurrent downloads (across replicas) don't start + // duplicate merges. + const claim = await this.db + .updateTable('storage_locations') + .set({ + mergeStartedAt: Date.now(), + }) + .where('id', '=', location.id) + .where('mergeStartedAt', 'is', null) + .where('mergedAt', 'is', null) + .executeTakeFirst() + if (claim.numUpdatedRows === 0n) return + + const mergePromise = this.adapter + .uploadStream(`${location.folderName}/merged`, Readable.from(this.streamParts(location))) + .then(async () => { + await this.db + .updateTable('storage_locations') + .set({ + mergedAt: Date.now(), + }) + .where('id', '=', location.id) + .execute() + }) + .catch(async (err: unknown) => { + logger.warn( + `Failed to merge cache entry ${location.folderName}: ${err instanceof Error ? err.message : String(err)}`, + ) + // Release the claim so cleanup:merges / a later download can retry. + await this.db + .updateTable('storage_locations') + .set({ + mergeStartedAt: null, + }) + .where('id', '=', location.id) + .execute() + }) + this.mergeStreamPromises.add(mergePromise) + void mergePromise.finally(() => this.mergeStreamPromises.delete(mergePromise)) + } + private async ensurePartsExist(location: StorageLocation) { const partsFolder = `${location.folderName}/parts` const actualPartCount = await this.adapter.countFilesInFolder(partsFolder) if (actualPartCount < location.partCount) throw new ObjectNotFoundError(partsFolder) } - private async downloadFromCacheEntryLocation(location: StorageLocation) { - if (location.mergedAt) return this.adapter.createDownloadStream(`${location.folderName}/merged`) - - await this.ensurePartsExist(location) - return Readable.from(this.streamParts(location)) - } - - private async pumpPartsToStreams( - location: StorageLocation, - responseStream: PassThrough, - mergerStream: PassThrough, - ) { - if (location.partsDeletedAt) throw new Error('No parts to feed') - - for await (const chunk of this.streamParts(location)) { - const responseWantsMore = responseStream.write(chunk) - const mergerWantsMore = mergerStream.write(chunk) - - if (!responseWantsMore) await once(responseStream, 'drain') - if (!mergerWantsMore) await once(mergerStream, 'drain') - } - - responseStream.end() - mergerStream.end() - - await globalThis.gc?.() - } - private async *streamParts(location: StorageLocation) { if (location.partsDeletedAt) throw new Error('No parts to feed for location with deleted parts') diff --git a/tests/concurrent-download.test.ts b/tests/concurrent-download.test.ts new file mode 100644 index 0000000..995459e --- /dev/null +++ b/tests/concurrent-download.test.ts @@ -0,0 +1,79 @@ +import { Buffer } from 'node:buffer' +import crypto from 'node:crypto' +import { Readable } from 'node:stream' + +import { describe, expect, test } from 'vitest' +import { getStorage } from '~/lib/storage' + +const SCOPE = 'refs/heads/main' +const REPO_ID = '123' +const VERSION = 'concurrent-test-version' + +async function drain(stream: Readable | undefined) { + if (!stream) throw new Error('download returned no stream (cache miss)') + const chunks: Buffer[] = [] + for await (const chunk of stream) chunks.push(Buffer.from(chunk)) + return Buffer.concat(chunks) +} + +describe('concurrent downloads of the same unmerged entry', () => { + test( + 'concurrent readers all get the full payload and the merge keeps parts (no inline delete)', + { timeout: 30_000 }, + async () => { + const storage = await getStorage() + + // Build a fresh multi-part (unmerged) entry directly via the storage API. + const key = `concurrent-${crypto.randomUUID()}` + const part0 = crypto.randomBytes(512 * 1024) + const part1 = crypto.randomBytes(512 * 1024) + const expected = Buffer.concat([part0, part1]) + + const upload = await storage.createUpload({ + key, + version: VERSION, + scope: SCOPE, + repoId: REPO_ID, + }) + if (!upload) throw new Error('createUpload returned nothing') + await storage.uploadPart(upload.id, 0, Readable.toWeb(Readable.from(part0))) + await storage.uploadPart(upload.id, 1, Readable.toWeb(Readable.from(part1))) + await storage.completeUpload({ key, version: VERSION, scope: SCOPE, repoId: REPO_ID }) + + const matched = await storage.matchCacheEntry({ + keys: [key], + version: VERSION, + scopes: [SCOPE], + repoId: REPO_ID, + }) + const cacheEntryId = matched?.match.id + if (!cacheEntryId) throw new Error('cache entry not found after completeUpload') + + // Fire many concurrent downloads of the still-unmerged entry. Each must + // receive the full payload, served directly from the parts. + const results = await Promise.all( + Array.from({ length: 8 }, async () => drain(await storage.download(cacheEntryId))), + ) + for (const body of results) expect(body.compare(expected)).toBe(0) + + await storage.waitForOngoingMerges() + + // The merge must NOT delete the parts inline — that is the deletion race + // (deleting parts out from under in-flight readers). Deletion is the + // `cleanup:parts` task's job, so the parts must still be present here. + // The original inline-delete code wipes them on first download, so this + // asserts the fix. `folderName` is the upload id (see createUpload). + const partsRemaining = await storage.adapter.countFilesInFolder(`${upload.id}/parts`) + expect(partsRemaining).toBe(2) + + // The single background merge must have produced a valid merged blob, + // and an already-merged entry must serve it. + const afterMerge = await drain(await storage.download(cacheEntryId)) + expect(afterMerge.compare(expected)).toBe(0) + const mergedBlob = await drain( + await storage.adapter.createDownloadStream(`${upload.id}/merged`), + ) + expect(mergedBlob.compare(expected)).toBe(0) + }, + ) +})