Skip to content

Commit f67d49d

Browse files
Changed protocol parser to support chunked rowsets with compression #88
1 parent c817758 commit f67d49d

5 files changed

Lines changed: 67 additions & 34 deletions

File tree

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -129,3 +129,4 @@ tools/
129129
test/assets/testing-*.db
130130
test/assets/1brc/1brc_*.csv
131131
test/assets/1brc/1brc_*.sql
132+
reports/test-report.html

bun.lockb

-322 Bytes
Binary file not shown.

package.json

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,6 @@
4444
"homepage": "https://github.com/sqlitecloud/sqlitecloud-js#readme",
4545
"dependencies": {
4646
"eventemitter3": "^5.0.1",
47-
"lz4js": "^0.2.0",
4847
"socket.io": "^4.7.5",
4948
"socket.io-client": "^4.7.5"
5049
},

src/drivers/connection-tls.ts

Lines changed: 28 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@ import {
2020

2121
import * as tls from 'tls'
2222

23+
import fs from 'fs'
24+
2325
/**
2426
* Implementation of SQLiteCloudConnection that connects to the database using specific tls APIs
2527
* that connect to native sockets or tls sockets and communicates via raw, binary protocol.
@@ -140,11 +142,14 @@ export class SQLiteCloudTlsConnection extends SQLiteCloudConnection {
140142
// callback to be called when a command is finished processing
141143
private processCallback?: ResultsCallback
142144

145+
private pendingChunks: Buffer[] = []
146+
143147
/** Handles data received in response to an outbound command sent by processCommands */
144148
private processCommandsData(data: Buffer) {
145149
try {
146150
// append data to buffer as it arrives
147151
if (data.length && data.length > 0) {
152+
// console.debug(`processCommandsData - received ${data.length} bytes`)
148153
this.buffer = Buffer.concat([this.buffer, data])
149154
}
150155

@@ -160,22 +165,31 @@ export class SQLiteCloudTlsConnection extends SQLiteCloudConnection {
160165
bufferString = bufferString.substring(0, 100) + '...' + bufferString.substring(bufferString.length - 40)
161166
}
162167
const elapsedMs = new Date().getTime() - this.startedOn.getTime()
163-
console.debug(`<- ${bufferString} (${elapsedMs}ms)`)
168+
console.debug(`<- ${bufferString} (${bufferString.length} bytes, ${elapsedMs}ms)`)
164169
}
165170

166171
// need to decompress this buffer before decoding?
167172
if (dataType === CMD_COMPRESSED) {
168-
;({ buffer: this.buffer, dataType } = decompressBuffer(this.buffer))
169-
}
170-
171-
if (dataType !== CMD_ROWSET_CHUNK) {
172-
const { data } = popData(this.buffer)
173-
this.processCommandsFinish?.call(this, null, data)
173+
const decompressResults = decompressBuffer(this.buffer)
174+
if (decompressResults.dataType === CMD_ROWSET_CHUNK) {
175+
this.pendingChunks.push(decompressResults.buffer)
176+
this.buffer = decompressResults.remainingBuffer
177+
this.processCommandsData(Buffer.alloc(0))
178+
return
179+
} else {
180+
const { data } = popData(decompressResults.buffer)
181+
this.processCommandsFinish?.call(this, null, data)
182+
}
174183
} else {
175-
// check if rowset received the ending chunk in which case it can be unpacked
176-
if (bufferEndsWith(this.buffer, ROWSET_CHUNKS_END)) {
177-
const parsedData = parseRowsetChunks([this.buffer])
178-
this.processCommandsFinish?.call(this, null, parsedData)
184+
if (dataType !== CMD_ROWSET_CHUNK) {
185+
const { data } = popData(this.buffer)
186+
this.processCommandsFinish?.call(this, null, data)
187+
} else {
188+
const completeChunk = bufferEndsWith(this.buffer, ROWSET_CHUNKS_END)
189+
if (completeChunk) {
190+
const parsedData = parseRowsetChunks([...this.pendingChunks, this.buffer])
191+
this.processCommandsFinish?.call(this, null, parsedData)
192+
}
179193
}
180194
}
181195
}
@@ -188,7 +202,8 @@ export class SQLiteCloudTlsConnection extends SQLiteCloudConnection {
188202
}
189203
}
190204
} catch (error) {
191-
console.assert(error instanceof Error)
205+
console.error(`processCommandsData - error: ${error}`)
206+
console.assert(error instanceof Error, 'An error occoured while processing data')
192207
if (error instanceof Error) {
193208
this.processCommandsFinish?.call(this, error)
194209
}
@@ -210,6 +225,7 @@ export class SQLiteCloudTlsConnection extends SQLiteCloudConnection {
210225
}
211226

212227
this.buffer = Buffer.alloc(0)
228+
this.pendingChunks = []
213229
}
214230

215231
/** Disconnect immediately, release connection, no events. */

src/drivers/protocol.ts

Lines changed: 38 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,11 @@
55
import { SQLiteCloudError, type SQLCloudRowsetMetadata, type SQLiteCloudDataTypes } from './types'
66
import { SQLiteCloudRowset } from './rowset'
77

8+
import fs from 'fs'
89
const lz4 = require('lz4js')
910

11+
import lz4bis from 'lz4'
12+
1013
// The server communicates with clients via commands defined in
1114
// SQLiteCloud Server Protocol (SCSP), see more at:
1215
// https://github.com/sqlitecloud/sdk/blob/master/PROTOCOL.md
@@ -47,32 +50,39 @@ export function parseCommandLength(data: Buffer): number {
4750
}
4851

4952
/** Receive a compressed buffer, decompress with lz4, return the decompressed buffer, its datatype and any remaining bytes that follow the command */
50-
export function decompressBuffer(buffer: Buffer): { buffer: Buffer; dataType: string } {
53+
export function decompressBuffer(buffer: Buffer): { buffer: Buffer; dataType: string; remainingBuffer: Buffer } {
54+
// https://github.com/sqlitecloud/sdk/blob/master/PROTOCOL.md#scsp-compression
55+
// jest test/database.test.ts -t "select large result set"
56+
57+
// starts with %<commandLength> <compressed> <uncompressed>
5158
const spaceIndex = buffer.indexOf(' ')
52-
buffer = buffer.subarray(spaceIndex + 1)
59+
const commandLength = parseInt(buffer.subarray(1, spaceIndex).toString('utf8'))
5360

54-
// extract compressed size
55-
const compressedSize = parseInt(buffer.subarray(0, buffer.indexOf(' ') + 1).toString('utf8'))
56-
buffer = buffer.subarray(buffer.indexOf(' ') + 1)
61+
let commandBuffer = buffer.subarray(spaceIndex + 1, spaceIndex + 1 + commandLength)
62+
const remainingBuffer = buffer.subarray(spaceIndex + 1 + commandLength)
5763

58-
// extract decompressed size
59-
const decompressedSize = parseInt(buffer.subarray(0, buffer.indexOf(' ') + 1).toString('utf8'))
60-
buffer = buffer.subarray(buffer.indexOf(' ') + 1)
64+
// extract compressed + decompressed size
65+
const compressedSize = parseInt(commandBuffer.subarray(0, commandBuffer.indexOf(' ') + 1).toString('utf8'))
66+
commandBuffer = commandBuffer.subarray(commandBuffer.indexOf(' ') + 1)
67+
const decompressedSize = parseInt(commandBuffer.subarray(0, commandBuffer.indexOf(' ') + 1).toString('utf8'))
68+
commandBuffer = commandBuffer.subarray(commandBuffer.indexOf(' ') + 1)
6169

6270
// extract compressed dataType
63-
const dataType = buffer.subarray(0, 1).toString('utf8')
64-
const decompressedBuffer = Buffer.alloc(decompressedSize)
65-
const compressedBuffer = buffer.subarray(buffer.length - compressedSize)
71+
const dataType = commandBuffer.subarray(0, 1).toString('utf8')
72+
let decompressedBuffer = Buffer.alloc(decompressedSize)
73+
const compressedBuffer = commandBuffer.subarray(commandBuffer.length - compressedSize)
6674

6775
// lz4js library is javascript and doesn't have types so we silence the type check
6876
// eslint-disable-next-line
6977
const decompressionResult: number = lz4.decompressBlock(compressedBuffer, decompressedBuffer, 0, compressedSize, 0)
70-
buffer = Buffer.concat([buffer.subarray(0, buffer.length - compressedSize), decompressedBuffer])
78+
// the entire command is composed of the header (which is not compressed) + the decompressed block
79+
decompressedBuffer = Buffer.concat([commandBuffer.subarray(0, commandBuffer.length - compressedSize), decompressedBuffer])
80+
7181
if (decompressionResult <= 0 || decompressionResult !== decompressedSize) {
7282
throw new Error(`lz4 decompression error at offset ${decompressionResult}`)
7383
}
7484

75-
return { buffer, dataType }
85+
return { buffer: decompressedBuffer, dataType, remainingBuffer }
7686
}
7787

7888
/** Parse error message or extended error message */
@@ -136,7 +146,7 @@ export function parseRowsetHeader(buffer: Buffer): { index: number; metadata: SQ
136146
// extract rowset header
137147
const { data, fwdBuffer } = popIntegers(buffer, 3)
138148

139-
return {
149+
const result = {
140150
index,
141151
metadata: {
142152
version: data[0],
@@ -146,6 +156,9 @@ export function parseRowsetHeader(buffer: Buffer): { index: number; metadata: SQ
146156
},
147157
fwdBuffer
148158
}
159+
160+
// console.debug(`parseRowsetHeader`, result)
161+
return result
149162
}
150163

151164
/** Extract column names and, optionally, more metadata out of a rowset's header */
@@ -218,7 +231,7 @@ export function parseRowsetChunks(buffers: Buffer[]): SQLiteCloudRowset {
218231

219232
// validate and skip data type
220233
const dataType = buffer.subarray(0, 1).toString()
221-
console.assert(dataType === CMD_ROWSET_CHUNK)
234+
if (dataType !== CMD_ROWSET_CHUNK) throw new Error(`parseRowsetChunks - dataType: ${dataType} should be CMD_ROWSET_CHUNK`)
222235
buffer = buffer.subarray(buffer.indexOf(' ') + 1)
223236

224237
while (buffer.length > 0 && !bufferStartsWith(buffer, ROWSET_CHUNKS_END)) {
@@ -268,23 +281,25 @@ export function popData(buffer: Buffer): { data: SQLiteCloudDataTypes | SQLiteCl
268281

269282
// first character is the data type
270283
console.assert(buffer && buffer instanceof Buffer)
271-
const dataType: string = buffer.subarray(0, 1).toString('utf8')
272-
console.assert(dataType !== CMD_COMPRESSED, "Compressed data shouldn't be decompressed before parsing")
273-
console.assert(dataType !== CMD_ROWSET_CHUNK, 'Chunked data should be parsed by parseRowsetChunks')
284+
let dataType: string = buffer.subarray(0, 1).toString('utf8')
285+
if (dataType == CMD_COMPRESSED) throw new Error('Compressed data should be decompressed before parsing')
286+
if (dataType == CMD_ROWSET_CHUNK) throw new Error('Chunked data should be parsed by parseRowsetChunks')
274287

275288
let spaceIndex = buffer.indexOf(' ')
276289
if (spaceIndex === -1) {
277290
spaceIndex = buffer.length - 1
278291
}
279292

280-
let commandEnd = -1
293+
let commandEnd = -1,
294+
commandLength = -1
281295
if (dataType === CMD_INT || dataType === CMD_FLOAT || dataType === CMD_NULL) {
282296
commandEnd = spaceIndex + 1
283297
} else {
284-
const commandLength = parseInt(buffer.subarray(1, spaceIndex).toString())
298+
commandLength = parseInt(buffer.subarray(1, spaceIndex).toString())
285299
commandEnd = spaceIndex + 1 + commandLength
286300
}
287301

302+
// console.debug(`popData - dataType: ${dataType}, spaceIndex: ${spaceIndex}, commandLength: ${commandLength}, commandEnd: ${commandEnd}`)
288303
switch (dataType) {
289304
case CMD_INT:
290305
return popResults(parseInt(buffer.subarray(1, spaceIndex).toString()))
@@ -313,7 +328,9 @@ export function popData(buffer: Buffer): { data: SQLiteCloudDataTypes | SQLiteCl
313328
break
314329
}
315330

316-
throw new TypeError(`Data type: ${dataType} is not defined in SCSP`)
331+
const msg = `popData - Data type: ${Number(dataType)} '${dataType}' is not defined in SCSP, spaceIndex: ${spaceIndex}, commandLength: ${commandLength}, commandEnd: ${commandEnd}`
332+
console.error(msg)
333+
throw new TypeError(msg)
317334
}
318335

319336
/** Format a command to be sent via SCSP protocol */

0 commit comments

Comments
 (0)