Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
145 changes: 60 additions & 85 deletions lib/storage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,11 @@ import type { ReadableStream } from 'node:stream/web'
import type { Database, StorageLocation } from './db'
import type { Env } from './schemas'
import { randomUUID } from 'node:crypto'
import { once } from 'node:events'
import { createReadStream, createWriteStream } from 'node:fs'
import fs from 'node:fs/promises'
import { Agent } from 'node:https'
import path from 'node:path'
import { PassThrough, Readable } from 'node:stream'
import { Readable } from 'node:stream'
import { pipeline } from 'node:stream/promises'
import { createSingletonPromise } from '@antfu/utils'
import {
Expand Down Expand Up @@ -224,65 +223,20 @@ export class Storage {
.execute()

try {
if (storageLocation.mergedAt || storageLocation.mergeStartedAt)
return await this.downloadFromCacheEntryLocation(storageLocation)
// Already merged: serve the single merged blob directly. (Under the v2
// protocol an already-merged entry is handed out as a presigned URL by
// getCacheEntryWithDownloadUrl and never reaches this path.)
if (storageLocation.mergedAt)
return await this.adapter.createDownloadStream(`${storageLocation.folderName}/merged`)

await this.ensurePartsExist(storageLocation)

await this.db
.updateTable('storage_locations')
.set({
mergeStartedAt: Date.now(),
})
.where('id', '=', storageLocation.id)
.execute()
// First download of an unmerged entry kicks off the merge in the
// background. The client download below is never coupled to it.
if (!storageLocation.mergeStartedAt) await this.tryStartBackgroundMerge(storageLocation)

const responseStream = new PassThrough()
const mergerStream = new PassThrough()

const mergePromise = this.adapter
.uploadStream(`${storageLocation.folderName}/merged`, mergerStream)
.then(async () => {
await this.db
.updateTable('storage_locations')
.set({
mergedAt: Date.now(),
})
.where('id', '=', storageLocation.id)
.execute()
await this.db.transaction().execute(async (tx) => {
await tx
.updateTable('storage_locations')
.set({
partsDeletedAt: Date.now(),
})
.where('id', '=', storageLocation.id)
.execute()
await this.adapter.deleteFolder(`${storageLocation.folderName}/parts`)
})
})
.catch(async () => {
await this.db
.updateTable('storage_locations')
.set({
mergedAt: null,
mergeStartedAt: null,
})
.where('id', '=', storageLocation.id)
.execute()
mergerStream.destroy()
})
this.mergeStreamPromises.add(mergePromise)
mergePromise.finally(() => this.mergeStreamPromises.delete(mergePromise))

this.pumpPartsToStreams(storageLocation, responseStream, mergerStream).catch((err) => {
responseStream.destroy(err)
mergerStream.destroy(err)
if (err instanceof ObjectNotFoundError)
logger.warn(`Stale cache entry ${cacheEntryId}: ${err.message}`)
})

return responseStream
// Always serve the client straight from the parts at full speed.
return Readable.from(this.streamParts(storageLocation))
} catch (err) {
if (err instanceof ObjectNotFoundError) {
logger.warn(`Stale cache entry ${cacheEntryId}: ${err.message}`)
Expand All @@ -292,40 +246,61 @@ export class Storage {
}
}

/**
 * Merge an unmerged entry's parts into a single `merged` blob in the
 * background, reading its own part streams so it never throttles the client
 * download. Parts are left in place for the `cleanup:parts` task to remove —
 * deleting them inline would race readers still streaming the parts.
 *
 * Resolves as soon as the claim attempt has finished; the merge itself keeps
 * running and is tracked in `mergeStreamPromises`. The tracked promise never
 * rejects: every failure is logged and, where possible, the claim is released.
 *
 * @param location Storage location whose parts should be merged.
 */
private async tryStartBackgroundMerge(location: StorageLocation) {
  // Atomically claim the merge: only the request that flips mergeStartedAt
  // from NULL runs it, so concurrent downloads (across replicas) don't start
  // duplicate merges.
  const claim = await this.db
    .updateTable('storage_locations')
    .set({
      mergeStartedAt: Date.now(),
    })
    .where('id', '=', location.id)
    .where('mergeStartedAt', 'is', null)
    .where('mergedAt', 'is', null)
    .executeTakeFirst()
  if (claim.numUpdatedRows === 0n) return

  const describeError = (err: unknown) => (err instanceof Error ? err.message : String(err))

  const runMerge = async (): Promise<void> => {
    try {
      // Inside the try so that a synchronous throw from the adapter also
      // releases the claim instead of leaving it stuck.
      await this.adapter.uploadStream(
        `${location.folderName}/merged`,
        Readable.from(this.streamParts(location)),
      )
      await this.db
        .updateTable('storage_locations')
        .set({
          mergedAt: Date.now(),
        })
        .where('id', '=', location.id)
        .execute()
    } catch (err: unknown) {
      logger.warn(`Failed to merge cache entry ${location.folderName}: ${describeError(err)}`)
      try {
        // Release the claim so cleanup:merges / a later download can retry.
        await this.db
          .updateTable('storage_locations')
          .set({
            mergeStartedAt: null,
          })
          .where('id', '=', location.id)
          .execute()
      } catch (releaseErr: unknown) {
        // Swallow after logging: nobody awaits this promise directly, so a
        // rejection here would surface as an unhandled rejection.
        logger.warn(
          `Failed to release merge claim for cache entry ${location.folderName}: ${describeError(releaseErr)}`,
        )
      }
    }
  }

  const mergePromise = runMerge()
  this.mergeStreamPromises.add(mergePromise)
  void mergePromise.finally(() => this.mergeStreamPromises.delete(mergePromise))
}

/**
 * Check that the entry's parts folder still contains enough files.
 *
 * Counts the files under `<folderName>/parts` through the adapter and
 * compares the result with `location.partCount`.
 *
 * @param location Storage location whose parts folder is inspected.
 * @throws ObjectNotFoundError (carrying the parts folder path) when fewer
 *   files are present than `location.partCount`.
 */
private async ensurePartsExist(location: StorageLocation) {
  const folder = `${location.folderName}/parts`
  const presentCount = await this.adapter.countFilesInFolder(folder)
  const hasMissingParts = location.partCount > presentCount
  if (hasMissingParts) {
    throw new ObjectNotFoundError(folder)
  }
}

/**
 * Open a download stream for a storage location.
 *
 * When `mergedAt` is set, the adapter's download stream for
 * `<folderName>/merged` is returned. Otherwise the parts are verified with
 * `ensurePartsExist` and a Readable built from `streamParts` is returned.
 *
 * @param location Storage location to read from.
 * @returns A stream of the entry's content.
 */
private async downloadFromCacheEntryLocation(location: StorageLocation) {
  if (!location.mergedAt) {
    // Not merged yet: make sure no part is missing before streaming them.
    await this.ensurePartsExist(location)
    return Readable.from(this.streamParts(location))
  }

  const mergedObjectName = `${location.folderName}/merged`
  return this.adapter.createDownloadStream(mergedObjectName)
}

/**
 * Copy every chunk yielded by `streamParts` into both the response stream and
 * the merger stream, then end both streams.
 *
 * Each chunk is written to the response stream first and the merger stream
 * second. Any stream whose `write` returned false is then awaited for its
 * 'drain' event, one after the other in that same order, before the next
 * chunk is read.
 *
 * @param location Storage location whose parts are read.
 * @param responseStream Stream handed to the client.
 * @param mergerStream Stream feeding the merged upload.
 * @throws Error when `location.partsDeletedAt` is set.
 */
private async pumpPartsToStreams(
  location: StorageLocation,
  responseStream: PassThrough,
  mergerStream: PassThrough,
) {
  if (location.partsDeletedAt) throw new Error('No parts to feed')

  const sinks = [responseStream, mergerStream]

  for await (const chunk of this.streamParts(location)) {
    // Write to every sink before waiting on any of them, remembering which
    // ones signalled backpressure.
    const saturated = sinks.filter((sink) => !sink.write(chunk))
    for (const sink of saturated) {
      await once(sink, 'drain')
    }
  }

  for (const sink of sinks) {
    sink.end()
  }

  // `gc` is only defined when the runtime exposes it; otherwise this is a no-op.
  await globalThis.gc?.()
}

private async *streamParts(location: StorageLocation) {
if (location.partsDeletedAt) throw new Error('No parts to feed for location with deleted parts')

Expand Down
79 changes: 79 additions & 0 deletions tests/concurrent-download.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
import { Buffer } from 'node:buffer'
import crypto from 'node:crypto'
import { Readable } from 'node:stream'

import { describe, expect, test } from 'vitest'
import { getStorage } from '~/lib/storage'

const SCOPE = 'refs/heads/main'
const REPO_ID = '123'
const VERSION = 'concurrent-test-version'

/**
 * Read a stream to completion and return everything it produced as a single
 * Buffer.
 *
 * @param stream Stream to consume; `undefined` is treated as a cache miss.
 * @returns The concatenation of all chunks, each converted with `Buffer.from`.
 * @throws Error when no stream was supplied.
 */
async function drain(stream: Readable | undefined) {
  if (!stream) {
    throw new Error('download returned no stream (cache miss)')
  }

  const collected: Buffer[] = []
  for await (const piece of stream) {
    const asBuffer = Buffer.from(piece)
    collected.push(asBuffer)
  }

  return Buffer.concat(collected)
}

describe('concurrent downloads of the same unmerged entry', () => {
  test(
    'concurrent readers all get the full payload and the merge keeps parts (no inline delete)',
    { timeout: 30_000 },
    async () => {
      const storage = await getStorage()

      // Create a brand-new two-part entry through the storage API; it stays
      // unmerged until something downloads it.
      const key = `concurrent-${crypto.randomUUID()}`
      const partPayloads = [crypto.randomBytes(512 * 1024), crypto.randomBytes(512 * 1024)]
      const expected = Buffer.concat(partPayloads)

      const upload = await storage.createUpload({
        key,
        version: VERSION,
        scope: SCOPE,
        repoId: REPO_ID,
      })
      if (!upload) throw new Error('createUpload returned nothing')
      for (const [partIndex, payload] of partPayloads.entries()) {
        await storage.uploadPart(upload.id, partIndex, Readable.toWeb(Readable.from(payload)))
      }
      await storage.completeUpload({ key, version: VERSION, scope: SCOPE, repoId: REPO_ID })

      const lookup = await storage.matchCacheEntry({
        keys: [key],
        version: VERSION,
        scopes: [SCOPE],
        repoId: REPO_ID,
      })
      const cacheEntryId = lookup?.match.id
      if (!cacheEntryId) throw new Error('cache entry not found after completeUpload')

      // Start several downloads of the unmerged entry at once; every reader
      // has to end up with the complete payload.
      const readerCount = 8
      const downloadBody = async () => drain(await storage.download(cacheEntryId))
      const bodies = await Promise.all(Array.from({ length: readerCount }, () => downloadBody()))
      bodies.forEach((body) => {
        expect(body.compare(expected)).toBe(0)
      })

      await storage.waitForOngoingMerges()

      // Merging must leave the parts alone: removing them inline would pull
      // them out from under readers that are still streaming. Removal belongs
      // to the `cleanup:parts` task, so both parts have to be there still.
      // `folderName` is the upload id (see createUpload).
      const partsFolder = `${upload.id}/parts`
      const partsRemaining = await storage.adapter.countFilesInFolder(partsFolder)
      expect(partsRemaining).toBe(2)

      // The background merge has to have written a correct merged blob, and a
      // download of the now-merged entry has to return the same bytes.
      const bodyAfterMerge = await drain(await storage.download(cacheEntryId))
      expect(bodyAfterMerge.compare(expected)).toBe(0)

      const mergedStream = await storage.adapter.createDownloadStream(`${upload.id}/merged`)
      const mergedBlob = await drain(mergedStream)
      expect(mergedBlob.compare(expected)).toBe(0)
    },
  )
})