Skip to content

Commit 5c5b87f

Browse files
[PECO-953] Optimize CloudFetchResultHandler memory consumption (#204)
* Refactoring: Introduce concept of results provider; convert FetchResultsHelper into provider of TRowSet
  Signed-off-by: Levko Kravets <levko.ne@gmail.com>
* Convert Json/Arrow/CloudFetch result handlers to implement result provider interface
  Signed-off-by: Levko Kravets <levko.ne@gmail.com>
* Refine the code and update tests
  Signed-off-by: Levko Kravets <levko.ne@gmail.com>
* Make sure that DBSQLOperation.fetchChunk returns chunks of requested size
  Signed-off-by: Levko Kravets <levko.ne@gmail.com>
* Add option to disable result buffering & slicing
  Signed-off-by: Levko Kravets <levko.ne@gmail.com>
* Update existing tests
  Signed-off-by: Levko Kravets <levko.ne@gmail.com>
* Add tests for ResultSlicer
  Signed-off-by: Levko Kravets <levko.ne@gmail.com>
* Refine code
  Signed-off-by: Levko Kravets <levko.ne@gmail.com>
* Add more tests
  Signed-off-by: Levko Kravets <levko.ne@gmail.com>
* Optimize CloudFetchResultHandler memory consumption
  Signed-off-by: Levko Kravets <levko.ne@gmail.com>
* Add and update tests
  Signed-off-by: Levko Kravets <levko.ne@gmail.com>
---------
Signed-off-by: Levko Kravets <levko.ne@gmail.com>
1 parent 69d88b8 commit 5c5b87f

17 files changed

Lines changed: 460 additions & 341 deletions

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ node_modules
55
.nyc_output
66
coverage_e2e
77
coverage_unit
8+
.clinic
89
*.code-workspace
910
dist
1011
*.DS_Store

.prettierignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ node_modules
55
.nyc_output
66
coverage_e2e
77
coverage_unit
8+
.clinic
89

910
dist
1011
thrift

lib/DBSQLOperation/index.ts

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import RowSetProvider from '../result/RowSetProvider';
2323
import JsonResultHandler from '../result/JsonResultHandler';
2424
import ArrowResultHandler from '../result/ArrowResultHandler';
2525
import CloudFetchResultHandler from '../result/CloudFetchResultHandler';
26+
import ArrowResultConverter from '../result/ArrowResultConverter';
2627
import ResultSlicer from '../result/ResultSlicer';
2728
import { definedOrError } from '../utils';
2829
import HiveDriverError from '../errors/HiveDriverError';
@@ -377,10 +378,18 @@ export default class DBSQLOperation implements IOperation {
377378
resultSource = new JsonResultHandler(this.context, this._data, metadata.schema);
378379
break;
379380
case TSparkRowSetType.ARROW_BASED_SET:
380-
resultSource = new ArrowResultHandler(this.context, this._data, metadata.schema, metadata.arrowSchema);
381+
resultSource = new ArrowResultConverter(
382+
this.context,
383+
new ArrowResultHandler(this.context, this._data, metadata.arrowSchema),
384+
metadata.schema,
385+
);
381386
break;
382387
case TSparkRowSetType.URL_BASED_SET:
383-
resultSource = new CloudFetchResultHandler(this.context, this._data, metadata.schema);
388+
resultSource = new ArrowResultConverter(
389+
this.context,
390+
new CloudFetchResultHandler(this.context, this._data),
391+
metadata.schema,
392+
);
384393
break;
385394
// no default
386395
}

lib/result/ArrowResultConverter.ts

Lines changed: 182 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,182 @@
1+
import { Buffer } from 'buffer';
2+
import {
3+
Table,
4+
Schema,
5+
Field,
6+
TypeMap,
7+
DataType,
8+
Type,
9+
StructRow,
10+
MapRow,
11+
Vector,
12+
RecordBatch,
13+
RecordBatchReader,
14+
util as arrowUtils,
15+
} from 'apache-arrow';
16+
import { TTableSchema, TColumnDesc } from '../../thrift/TCLIService_types';
17+
import IClientContext from '../contracts/IClientContext';
18+
import IResultsProvider, { ResultsProviderFetchNextOptions } from './IResultsProvider';
19+
import { getSchemaColumns, convertThriftValue } from './utils';
20+
21+
const { isArrowBigNumSymbol, bigNumToBigInt } = arrowUtils;
22+
23+
type ArrowSchema = Schema<TypeMap>;
24+
type ArrowSchemaField = Field<DataType<Type, TypeMap>>;
25+
26+
/**
 * Results provider that turns Arrow-encoded buffers, obtained from an upstream
 * provider, into arrays of plain JS row objects.
 *
 * Each `fetchNext` call returns the rows of a single Arrow record batch. Values
 * are first converted from Arrow-native representations to plain JS values and
 * then cast according to the Thrift column descriptors of the result schema.
 */
export default class ArrowResultConverter implements IResultsProvider<Array<any>> {
  protected readonly context: IClientContext;

  // Upstream provider of Arrow buffers consumed by `RecordBatchReader`
  private readonly source: IResultsProvider<Array<Buffer>>;

  // Thrift column descriptors used for the final type cast of every row
  private readonly schema: Array<TColumnDesc>;

  // Iterator over record batches of the most recently fetched set of buffers
  private reader?: IterableIterator<RecordBatch<TypeMap>>;

  // Record batch read ahead of time; it is what the next `fetchNext` call returns
  private pendingRecordBatch?: RecordBatch<TypeMap>;

  /**
   * @param context client context (stored, not used by this class directly)
   * @param source upstream provider that yields arrays of Arrow buffers
   * @param schema Thrift table schema; its columns are extracted with `getSchemaColumns`
   */
  constructor(context: IClientContext, source: IResultsProvider<Array<Buffer>>, schema?: TTableSchema) {
    this.context = context;
    this.source = source;
    this.schema = getSchemaColumns(schema);
  }

  /**
   * Tells whether a subsequent `fetchNext` call may return rows.
   *
   * @returns `false` when the schema has no columns; `true` when a record batch
   *   is already prefetched; otherwise whatever the upstream provider reports
   */
  public async hasMore() {
    if (this.schema.length === 0) {
      return false;
    }
    if (this.pendingRecordBatch) {
      return true;
    }
    return this.source.hasMore();
  }

  /**
   * Returns rows of the next available record batch.
   *
   * @param options passed through to the upstream provider when more buffers are needed
   * @returns converted rows, or an empty array when the schema has no columns
   *   or the upstream provider returned no buffers
   */
  public async fetchNext(options: ResultsProviderFetchNextOptions) {
    if (this.schema.length === 0) {
      return [];
    }

    // eslint-disable-next-line no-constant-condition
    while (true) {
      // It's not possible to know if iterator has more items until trying
      // to get the next item. But we need to know if iterator is empty right
      // after getting the next item. Therefore, after creating the iterator,
      // we get one item more and store it in `pendingRecordBatch`. Next time,
      // we use that stored item, and prefetch the next one. Prefetched item
      // is therefore the next item we are going to return, so it can be used
      // to know if we actually can return anything next time
      const recordBatch = this.pendingRecordBatch;
      this.pendingRecordBatch = this.prefetch();

      if (recordBatch) {
        const table = new Table(recordBatch);
        return this.getRows(table.schema, table.toArray());
      }

      // Current reader is exhausted (or was never created) - ask upstream for more data
      // eslint-disable-next-line no-await-in-loop
      const batches = await this.source.fetchNext(options);
      if (batches.length === 0) {
        this.reader = undefined;
        break;
      }

      // Start iterating the new set of buffers and immediately read one batch ahead;
      // the next loop iteration picks it up as `recordBatch`
      const reader = RecordBatchReader.from<TypeMap>(batches);
      this.reader = reader[Symbol.iterator]();
      this.pendingRecordBatch = this.prefetch();
    }

    return [];
  }

  /**
   * Reads the next record batch from the current reader, if any.
   * Drops the reader once it reports completion or yields no value.
   *
   * @returns the next record batch, or `undefined` when nothing is available
   */
  private prefetch(): RecordBatch<TypeMap> | undefined {
    const item = this.reader?.next() ?? { done: true, value: undefined };

    if (item.done || item.value === undefined) {
      this.reader = undefined;
      return undefined;
    }

    return item.value;
  }

  /**
   * Converts Arrow rows to plain JS records cast to the original Thrift types.
   *
   * @param schema Arrow schema describing the fields of each row
   * @param rows Arrow rows to convert
   */
  private getRows(schema: ArrowSchema, rows: Array<StructRow | MapRow>): Array<any> {
    return rows.map((row) => {
      // First, convert native Arrow values to corresponding plain JS objects
      const record = this.convertArrowTypes(row, undefined, schema.fields);
      // Second, cast all the values to original Thrift types
      return this.convertThriftTypes(record);
    });
  }

  /**
   * Picks a child field by its name; when no field has that name, falls back
   * to the field at the given position. Child names of nested Arrow types are
   * a producer convention, so a positional fallback keeps nested values typed
   * even when different names are used.
   * When several fields share the name, the last one wins.
   */
  private static findChildField(
    fields: Array<ArrowSchemaField>,
    name: string,
    fallbackIndex: number,
  ): ArrowSchemaField | undefined {
    let found: ArrowSchemaField | undefined;
    for (const field of fields) {
      if (field.name === name) {
        found = field;
      }
    }
    return found ?? fields[fallbackIndex];
  }

  /**
   * Recursively converts an Arrow-native value to a plain JS value.
   *
   * @param value value to convert; `null` is returned as is
   * @param valueType Arrow type of the value, when known
   * @param fields child fields of the value's type (struct members, map entries, list element)
   */
  private convertArrowTypes(value: any, valueType: DataType | undefined, fields: Array<ArrowSchemaField> = []): any {
    if (value === null) {
      return value;
    }

    // Convert structures to plain JS object and process all its fields recursively
    if (value instanceof StructRow) {
      // Name lookup is only needed for structures, so it is built here rather than for every value
      const fieldsMap = new Map<string, ArrowSchemaField>();
      for (const field of fields) {
        fieldsMap.set(field.name, field);
      }

      const result = value.toJSON();
      for (const key of Object.keys(result)) {
        const field = fieldsMap.get(key);
        result[key] = this.convertArrowTypes(result[key], field?.type, field?.type.children || []);
      }
      return result;
    }
    if (value instanceof MapRow) {
      const result = value.toJSON();
      // Map type consists of its key and value types. We need only value type here, key will be cast to string anyway.
      // The only child of a map type describes its entries; entries in turn have key and value children
      const entries = ArrowResultConverter.findChildField(fields, 'entries', 0);
      const field = ArrowResultConverter.findChildField(entries?.type.children ?? [], 'value', 1);
      for (const key of Object.keys(result)) {
        result[key] = this.convertArrowTypes(result[key], field?.type, field?.type.children || []);
      }
      return result;
    }

    // Convert lists to JS array and process items recursively
    if (value instanceof Vector) {
      const result = value.toJSON();
      // Array type contains the only child which defines a type of each array's element
      const field = ArrowResultConverter.findChildField(fields, 'element', 0);
      return result.map((item) => this.convertArrowTypes(item, field?.type, field?.type.children || []));
    }

    if (DataType.isTimestamp(valueType)) {
      return new Date(value);
    }

    // Convert big number values to BigInt
    // Decimals are also represented as big numbers in Arrow, so additionally process them (convert to float)
    if (value instanceof Object && value[isArrowBigNumSymbol]) {
      const result = bigNumToBigInt(value);
      if (DataType.isDecimal(valueType)) {
        return Number(result) / 10 ** valueType.scale;
      }
      return result;
    }

    // Convert binary data to Buffer
    if (value instanceof Uint8Array) {
      return Buffer.from(value);
    }

    // Return other values as is (except for native bigint which is converted to number)
    return typeof value === 'bigint' ? Number(value) : value;
  }

  /**
   * Builds a record that contains exactly the columns of the Thrift schema,
   * each cast with `convertThriftValue` using the column's primitive type
   * descriptor. `null` values are kept as `null` without casting.
   *
   * @param record plain JS record keyed by column name
   */
  private convertThriftTypes(record: Record<string, any>): any {
    const result: Record<string, any> = {};

    this.schema.forEach((column) => {
      const typeDescriptor = column.typeDesc.types[0]?.primitiveEntry;
      const field = column.columnName;
      const value = record[field];
      result[field] = value === null ? null : convertThriftValue(typeDescriptor, value);
    });

    return result;
  }
}

lib/result/ArrowResultHandler.ts

Lines changed: 13 additions & 125 deletions
Original file line numberDiff line numberDiff line change
@@ -1,158 +1,46 @@
11
import { Buffer } from 'buffer';
2-
import {
3-
tableFromIPC,
4-
Schema,
5-
Field,
6-
TypeMap,
7-
DataType,
8-
Type,
9-
StructRow,
10-
MapRow,
11-
Vector,
12-
util as arrowUtils,
13-
} from 'apache-arrow';
14-
import { TRowSet, TTableSchema, TColumnDesc } from '../../thrift/TCLIService_types';
2+
import { TRowSet } from '../../thrift/TCLIService_types';
153
import IClientContext from '../contracts/IClientContext';
164
import IResultsProvider, { ResultsProviderFetchNextOptions } from './IResultsProvider';
17-
import { getSchemaColumns, convertThriftValue } from './utils';
185

19-
const { isArrowBigNumSymbol, bigNumToBigInt } = arrowUtils;
20-
21-
type ArrowSchema = Schema<TypeMap>;
22-
type ArrowSchemaField = Field<DataType<Type, TypeMap>>;
23-
24-
export default class ArrowResultHandler implements IResultsProvider<Array<any>> {
6+
export default class ArrowResultHandler implements IResultsProvider<Array<Buffer>> {
257
protected readonly context: IClientContext;
268

279
private readonly source: IResultsProvider<TRowSet | undefined>;
2810

29-
private readonly schema: Array<TColumnDesc>;
30-
3111
private readonly arrowSchema?: Buffer;
3212

33-
constructor(
34-
context: IClientContext,
35-
source: IResultsProvider<TRowSet | undefined>,
36-
schema?: TTableSchema,
37-
arrowSchema?: Buffer,
38-
) {
13+
constructor(context: IClientContext, source: IResultsProvider<TRowSet | undefined>, arrowSchema?: Buffer) {
3914
this.context = context;
4015
this.source = source;
41-
this.schema = getSchemaColumns(schema);
4216
this.arrowSchema = arrowSchema;
4317
}
4418

4519
public async hasMore() {
20+
if (!this.arrowSchema) {
21+
return false;
22+
}
4623
return this.source.hasMore();
4724
}
4825

4926
public async fetchNext(options: ResultsProviderFetchNextOptions) {
50-
if (this.schema.length === 0 || !this.arrowSchema) {
51-
return [];
52-
}
53-
54-
const data = await this.source.fetchNext(options);
55-
56-
const batches = await this.getBatches(data);
57-
if (batches.length === 0) {
27+
if (!this.arrowSchema) {
5828
return [];
5929
}
6030

61-
const table = tableFromIPC<TypeMap>([this.arrowSchema, ...batches]);
62-
return this.getRows(table.schema, table.toArray());
63-
}
64-
65-
protected async getBatches(rowSet?: TRowSet): Promise<Array<Buffer>> {
66-
const result: Array<Buffer> = [];
31+
const rowSet = await this.source.fetchNext(options);
6732

33+
const batches: Array<Buffer> = [];
6834
rowSet?.arrowBatches?.forEach((arrowBatch) => {
6935
if (arrowBatch.batch) {
70-
result.push(arrowBatch.batch);
36+
batches.push(arrowBatch.batch);
7137
}
7238
});
7339

74-
return result;
75-
}
76-
77-
private getRows(schema: ArrowSchema, rows: Array<StructRow | MapRow>): Array<any> {
78-
return rows.map((row) => {
79-
// First, convert native Arrow values to corresponding plain JS objects
80-
const record = this.convertArrowTypes(row, undefined, schema.fields);
81-
// Second, cast all the values to original Thrift types
82-
return this.convertThriftTypes(record);
83-
});
84-
}
85-
86-
private convertArrowTypes(value: any, valueType: DataType | undefined, fields: Array<ArrowSchemaField> = []): any {
87-
if (value === null) {
88-
return value;
89-
}
90-
91-
const fieldsMap: Record<string, ArrowSchemaField> = {};
92-
for (const field of fields) {
93-
fieldsMap[field.name] = field;
94-
}
95-
96-
// Convert structures to plain JS object and process all its fields recursively
97-
if (value instanceof StructRow) {
98-
const result = value.toJSON();
99-
for (const key of Object.keys(result)) {
100-
const field: ArrowSchemaField | undefined = fieldsMap[key];
101-
result[key] = this.convertArrowTypes(result[key], field?.type, field?.type.children || []);
102-
}
103-
return result;
104-
}
105-
if (value instanceof MapRow) {
106-
const result = value.toJSON();
107-
// Map type consists of its key and value types. We need only value type here, key will be cast to string anyway
108-
const field = fieldsMap.entries?.type.children.find((item) => item.name === 'value');
109-
for (const key of Object.keys(result)) {
110-
result[key] = this.convertArrowTypes(result[key], field?.type, field?.type.children || []);
111-
}
112-
return result;
113-
}
114-
115-
// Convert lists to JS array and process items recursively
116-
if (value instanceof Vector) {
117-
const result = value.toJSON();
118-
// Array type contains the only child which defines a type of each array's element
119-
const field = fieldsMap.element;
120-
return result.map((item) => this.convertArrowTypes(item, field?.type, field?.type.children || []));
121-
}
122-
123-
if (DataType.isTimestamp(valueType)) {
124-
return new Date(value);
125-
}
126-
127-
// Convert big number values to BigInt
128-
// Decimals are also represented as big numbers in Arrow, so additionally process them (convert to float)
129-
if (value instanceof Object && value[isArrowBigNumSymbol]) {
130-
const result = bigNumToBigInt(value);
131-
if (DataType.isDecimal(valueType)) {
132-
return Number(result) / 10 ** valueType.scale;
133-
}
134-
return result;
135-
}
136-
137-
// Convert binary data to Buffer
138-
if (value instanceof Uint8Array) {
139-
return Buffer.from(value);
40+
if (batches.length === 0) {
41+
return [];
14042
}
14143

142-
// Return other values as is
143-
return typeof value === 'bigint' ? Number(value) : value;
144-
}
145-
146-
private convertThriftTypes(record: Record<string, any>): any {
147-
const result: Record<string, any> = {};
148-
149-
this.schema.forEach((column) => {
150-
const typeDescriptor = column.typeDesc.types[0]?.primitiveEntry;
151-
const field = column.columnName;
152-
const value = record[field];
153-
result[field] = value === null ? null : convertThriftValue(typeDescriptor, value);
154-
});
155-
156-
return result;
44+
return [this.arrowSchema, ...batches];
15745
}
15846
}

0 commit comments

Comments
 (0)