Skip to content

Commit dfbb103

Browse files
committed
perf: Stream BigQuery results to Cloud Storage to prevent OOM errors
Refactored the BigQuery to Google Cloud Storage export process to use streams instead of loading the entire result set into a single in-memory array. This resolves potential Out-Of-Memory (OOM) errors in Cloud Run and significantly improves overall memory efficiency for large exports.

- Updated `infra/dataform-service/src/index.js` to utilize `bigquery.queryResultsStream()`.
- Refactored `StorageUpload.exportToJson` in `infra/dataform-service/src/storage.js` to accept a stream.
- Implemented a custom `Transform` stream to efficiently format object chunks into a proper JSON array structure while buffering in batches of 1000 for high performance.
- Removed the unused, memory-bound `Readable` initialization from the `StorageUpload` constructor.
1 parent 66edb2b commit dfbb103

File tree

2 files changed

+57
-12
lines changed

2 files changed

+57
-12
lines changed

infra/dataform-service/src/index.js

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -95,9 +95,9 @@ async function handleExport (req, res) {
9595
console.info('Cloud Storage export')
9696
console.log(query, config)
9797

98-
const data = await bigquery.queryResults(query)
98+
const stream = await bigquery.queryResultsStream(query)
9999
const storage = new StorageUpload(config.bucket)
100-
await storage.exportToJson(data, config.name)
100+
await storage.exportToJson(stream, config.name)
101101
} else if (destination === 'firestore') {
102102
console.info('Firestore export')
103103
const jobName = `projects/${projectId}/locations/${location}/jobs/${jobId}`

infra/dataform-service/src/storage.js

Lines changed: 55 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,30 +1,75 @@
11
import { Storage } from '@google-cloud/storage'
2-
import { Readable } from 'stream'
2+
import { Transform } from 'stream'
33
import zlib from 'zlib'
44

55
const storage = new Storage()
66

77
export class StorageUpload {
88
constructor (bucket) {
99
this.bucket = bucket
10-
this.stream = new Readable({
11-
objectMode: true,
12-
read () {}
13-
})
1410
}
1511

16-
async exportToJson (data, fileName) {
12+
async exportToJson (stream, fileName) {
1713
const bucket = storage.bucket(this.bucket)
1814
const file = bucket.file(fileName)
1915

20-
const jsonData = JSON.stringify(data)
21-
this.stream.push(jsonData)
22-
this.stream.push(null)
16+
let first = true
17+
let batch = []
18+
const BATCH_SIZE = 1000
19+
20+
const jsonTransform = new Transform({
21+
writableObjectMode: true,
22+
transform (chunk, encoding, callback) {
23+
batch.push(chunk)
24+
if (batch.length >= BATCH_SIZE) {
25+
let str = ''
26+
if (first) {
27+
str = '[\n ' + JSON.stringify(batch[0])
28+
for (let i = 1; i < batch.length; i++) {
29+
str += ',\n ' + JSON.stringify(batch[i])
30+
}
31+
first = false
32+
} else {
33+
for (let i = 0; i < batch.length; i++) {
34+
str += ',\n ' + JSON.stringify(batch[i])
35+
}
36+
}
37+
batch = []
38+
callback(null, str)
39+
} else {
40+
callback()
41+
}
42+
},
43+
flush (callback) {
44+
let str = ''
45+
if (batch.length > 0) {
46+
if (first) {
47+
str = '[\n ' + JSON.stringify(batch[0])
48+
for (let i = 1; i < batch.length; i++) {
49+
str += ',\n ' + JSON.stringify(batch[i])
50+
}
51+
first = false
52+
} else {
53+
for (let i = 0; i < batch.length; i++) {
54+
str += ',\n ' + JSON.stringify(batch[i])
55+
}
56+
}
57+
}
58+
59+
if (first) {
60+
str += '[]'
61+
} else {
62+
str += '\n]'
63+
}
64+
callback(null, str)
65+
}
66+
})
2367

2468
const gzip = zlib.createGzip()
2569

2670
await new Promise((resolve, reject) => {
27-
this.stream
71+
stream
72+
.pipe(jsonTransform)
2873
.pipe(gzip)
2974
.pipe(file.createWriteStream({
3075
metadata: {

0 commit comments

Comments (0)