Skip to content

Commit 57c21d7

Browse files
DBSQLOperation Refactoring (3 of 3) (#198)
* Refactoring: Introduce concept of results provider; convert FetchResultsHelper into provider of TRowSet

  Signed-off-by: Levko Kravets <levko.ne@gmail.com>

* Convert Json/Arrow/CloudFetch result handlers to implement result provider interface

  Signed-off-by: Levko Kravets <levko.ne@gmail.com>

* Refine the code and update tests

  Signed-off-by: Levko Kravets <levko.ne@gmail.com>

---------

Signed-off-by: Levko Kravets <levko.ne@gmail.com>
1 parent e169f69 commit 57c21d7

15 files changed

Lines changed: 254 additions & 197 deletions

lib/DBSQLOperation/index.ts

Lines changed: 16 additions & 23 deletions
Original file line number | Diff line number | Diff line change
@@ -16,13 +16,13 @@ import {
1616
TOperationState,
1717
} from '../../thrift/TCLIService_types';
1818
import Status from '../dto/Status';
19-
import FetchResultsHelper from './FetchResultsHelper';
2019
import { LogLevel } from '../contracts/IDBSQLLogger';
2120
import OperationStateError, { OperationStateErrorCode } from '../errors/OperationStateError';
22-
import IOperationResult from '../result/IOperationResult';
23-
import JsonResult from '../result/JsonResult';
24-
import ArrowResult from '../result/ArrowResult';
25-
import CloudFetchResult from '../result/CloudFetchResult';
21+
import IResultsProvider from '../result/IResultsProvider';
22+
import RowSetProvider from '../result/RowSetProvider';
23+
import JsonResultHandler from '../result/JsonResultHandler';
24+
import ArrowResultHandler from '../result/ArrowResultHandler';
25+
import CloudFetchResultHandler from '../result/CloudFetchResultHandler';
2626
import { definedOrError } from '../utils';
2727
import HiveDriverError from '../errors/HiveDriverError';
2828
import IClientContext from '../contracts/IClientContext';
@@ -50,7 +50,7 @@ export default class DBSQLOperation implements IOperation {
5050

5151
public onClose?: () => void;
5252

53-
private readonly _data: FetchResultsHelper;
53+
private readonly _data: RowSetProvider;
5454

5555
private readonly closeOperation?: TCloseOperationResp;
5656

@@ -68,7 +68,7 @@ export default class DBSQLOperation implements IOperation {
6868

6969
private hasResultSet: boolean = false;
7070

71-
private resultHandler?: IOperationResult;
71+
private resultHandler?: IResultsProvider<Array<any>>;
7272

7373
constructor({ handle, directResults, context }: DBSQLOperationConstructorOptions) {
7474
this.operationHandle = handle;
@@ -82,7 +82,7 @@ export default class DBSQLOperation implements IOperation {
8282
}
8383

8484
this.metadata = directResults?.resultSetMetadata;
85-
this._data = new FetchResultsHelper(
85+
this._data = new RowSetProvider(
8686
this.context,
8787
this.operationHandle,
8888
[directResults?.resultSet],
@@ -135,14 +135,12 @@ export default class DBSQLOperation implements IOperation {
135135

136136
await this.waitUntilReady(options);
137137

138-
const [resultHandler, data] = await Promise.all([
139-
this.getResultHandler(),
140-
this._data.fetch(options?.maxRows || defaultMaxRows),
141-
]);
138+
const resultHandler = await this.getResultHandler();
139+
await this.failIfClosed();
142140

141+
const result = resultHandler.fetchNext({ limit: options?.maxRows || defaultMaxRows });
143142
await this.failIfClosed();
144143

145-
const result = await resultHandler.getValue(data ? [data] : []);
146144
this.context
147145
.getLogger()
148146
.log(
@@ -234,14 +232,9 @@ export default class DBSQLOperation implements IOperation {
234232
return false;
235233
}
236234

237-
// Return early if there are still data available for fetching
238-
if (this._data.hasMoreRows) {
239-
return true;
240-
}
241-
242235
// If we fetched all the data from server - check if there's anything buffered in result handler
243236
const resultHandler = await this.getResultHandler();
244-
return resultHandler.hasPendingData();
237+
return resultHandler.hasMore();
245238
}
246239

247240
public async getSchema(options?: GetSchemaOptions): Promise<TTableSchema | null> {
@@ -342,20 +335,20 @@ export default class DBSQLOperation implements IOperation {
342335
return this.metadata;
343336
}
344337

345-
private async getResultHandler(): Promise<IOperationResult> {
338+
private async getResultHandler(): Promise<IResultsProvider<Array<any>>> {
346339
const metadata = await this.fetchMetadata();
347340
const resultFormat = definedOrError(metadata.resultFormat);
348341

349342
if (!this.resultHandler) {
350343
switch (resultFormat) {
351344
case TSparkRowSetType.COLUMN_BASED_SET:
352-
this.resultHandler = new JsonResult(this.context, metadata.schema);
345+
this.resultHandler = new JsonResultHandler(this.context, this._data, metadata.schema);
353346
break;
354347
case TSparkRowSetType.ARROW_BASED_SET:
355-
this.resultHandler = new ArrowResult(this.context, metadata.schema, metadata.arrowSchema);
348+
this.resultHandler = new ArrowResultHandler(this.context, this._data, metadata.schema, metadata.arrowSchema);
356349
break;
357350
case TSparkRowSetType.URL_BASED_SET:
358-
this.resultHandler = new CloudFetchResult(this.context, metadata.schema);
351+
this.resultHandler = new CloudFetchResultHandler(this.context, this._data, metadata.schema);
359352
break;
360353
default:
361354
this.resultHandler = undefined;
lib/result/ArrowResult.ts → lib/result/ArrowResultHandler.ts

Lines changed: 22 additions & 14 deletions
Original file line number | Diff line number | Diff line change
@@ -13,36 +13,46 @@ import {
1313
} from 'apache-arrow';
1414
import { TRowSet, TTableSchema, TColumnDesc } from '../../thrift/TCLIService_types';
1515
import IClientContext from '../contracts/IClientContext';
16-
import IOperationResult from './IOperationResult';
16+
import IResultsProvider, { ResultsProviderFetchNextOptions } from './IResultsProvider';
1717
import { getSchemaColumns, convertThriftValue } from './utils';
1818

1919
const { isArrowBigNumSymbol, bigNumToBigInt } = arrowUtils;
2020

2121
type ArrowSchema = Schema<TypeMap>;
2222
type ArrowSchemaField = Field<DataType<Type, TypeMap>>;
2323

24-
export default class ArrowResult implements IOperationResult {
24+
export default class ArrowResultHandler implements IResultsProvider<Array<any>> {
2525
protected readonly context: IClientContext;
2626

27+
private readonly source: IResultsProvider<TRowSet | undefined>;
28+
2729
private readonly schema: Array<TColumnDesc>;
2830

2931
private readonly arrowSchema?: Buffer;
3032

31-
constructor(context: IClientContext, schema?: TTableSchema, arrowSchema?: Buffer) {
33+
constructor(
34+
context: IClientContext,
35+
source: IResultsProvider<TRowSet | undefined>,
36+
schema?: TTableSchema,
37+
arrowSchema?: Buffer,
38+
) {
3239
this.context = context;
40+
this.source = source;
3341
this.schema = getSchemaColumns(schema);
3442
this.arrowSchema = arrowSchema;
3543
}
3644

37-
async hasPendingData() {
38-
return false;
45+
public async hasMore() {
46+
return this.source.hasMore();
3947
}
4048

41-
async getValue(data?: Array<TRowSet>) {
42-
if (this.schema.length === 0 || !this.arrowSchema || !data) {
49+
public async fetchNext(options: ResultsProviderFetchNextOptions) {
50+
if (this.schema.length === 0 || !this.arrowSchema) {
4351
return [];
4452
}
4553

54+
const data = await this.source.fetchNext(options);
55+
4656
const batches = await this.getBatches(data);
4757
if (batches.length === 0) {
4858
return [];
@@ -52,15 +62,13 @@ export default class ArrowResult implements IOperationResult {
5262
return this.getRows(table.schema, table.toArray());
5363
}
5464

55-
protected async getBatches(data: Array<TRowSet>): Promise<Array<Buffer>> {
65+
protected async getBatches(rowSet?: TRowSet): Promise<Array<Buffer>> {
5666
const result: Array<Buffer> = [];
5767

58-
data.forEach((rowSet) => {
59-
rowSet.arrowBatches?.forEach((arrowBatch) => {
60-
if (arrowBatch.batch) {
61-
result.push(arrowBatch.batch);
62-
}
63-
});
68+
rowSet?.arrowBatches?.forEach((arrowBatch) => {
69+
if (arrowBatch.batch) {
70+
result.push(arrowBatch.batch);
71+
}
6472
});
6573

6674
return result;
lib/result/CloudFetchResult.ts → lib/result/CloudFetchResultHandler.ts

Lines changed: 13 additions & 11 deletions
Original file line number | Diff line number | Diff line change
@@ -2,29 +2,31 @@ import { Buffer } from 'buffer';
22
import fetch, { RequestInfo, RequestInit } from 'node-fetch';
33
import { TRowSet, TSparkArrowResultLink, TTableSchema } from '../../thrift/TCLIService_types';
44
import IClientContext from '../contracts/IClientContext';
5-
import ArrowResult from './ArrowResult';
5+
import IResultsProvider from './IResultsProvider';
6+
import ArrowResultHandler from './ArrowResultHandler';
67
import globalConfig from '../globalConfig';
78

8-
export default class CloudFetchResult extends ArrowResult {
9+
export default class CloudFetchResultHandler extends ArrowResultHandler {
910
private pendingLinks: Array<TSparkArrowResultLink> = [];
1011

1112
private downloadedBatches: Array<Buffer> = [];
1213

13-
constructor(context: IClientContext, schema?: TTableSchema) {
14+
constructor(context: IClientContext, source: IResultsProvider<TRowSet | undefined>, schema?: TTableSchema) {
1415
// Arrow schema returned in metadata is not needed for CloudFetch results:
1516
// each batch already contains schema and could be decoded as is
16-
super(context, schema, Buffer.alloc(0));
17+
super(context, source, schema, Buffer.alloc(0));
1718
}
1819

19-
async hasPendingData() {
20-
return this.pendingLinks.length > 0 || this.downloadedBatches.length > 0;
20+
public async hasMore() {
21+
if (this.pendingLinks.length > 0 || this.downloadedBatches.length > 0) {
22+
return true;
23+
}
24+
return super.hasMore();
2125
}
2226

23-
protected async getBatches(data: Array<TRowSet>): Promise<Array<Buffer>> {
24-
data.forEach((item) => {
25-
item.resultLinks?.forEach((link) => {
26-
this.pendingLinks.push(link);
27-
});
27+
protected async getBatches(data?: TRowSet): Promise<Array<Buffer>> {
28+
data?.resultLinks?.forEach((link) => {
29+
this.pendingLinks.push(link);
2830
});
2931

3032
if (this.downloadedBatches.length === 0) {

lib/result/IOperationResult.ts

Lines changed: 0 additions & 7 deletions
This file was deleted.

lib/result/IResultsProvider.ts

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
export interface ResultsProviderFetchNextOptions {
2+
limit: number;
3+
}
4+
5+
export default interface IResultsProvider<T> {
6+
fetchNext(options: ResultsProviderFetchNextOptions): Promise<T>;
7+
8+
hasMore(): Promise<boolean>;
9+
}
lib/result/JsonResult.ts → lib/result/JsonResultHandler.ts

Lines changed: 16 additions & 12 deletions
Original file line number | Diff line number | Diff line change
@@ -1,34 +1,38 @@
11
import { ColumnCode } from '../hive/Types';
22
import { TRowSet, TTableSchema, TColumn, TColumnDesc } from '../../thrift/TCLIService_types';
33
import IClientContext from '../contracts/IClientContext';
4-
import IOperationResult from './IOperationResult';
4+
import IResultsProvider, { ResultsProviderFetchNextOptions } from './IResultsProvider';
55
import { getSchemaColumns, convertThriftValue } from './utils';
66

7-
export default class JsonResult implements IOperationResult {
7+
export default class JsonResultHandler implements IResultsProvider<Array<any>> {
88
private readonly context: IClientContext;
99

10+
private readonly source: IResultsProvider<TRowSet | undefined>;
11+
1012
private readonly schema: Array<TColumnDesc>;
1113

12-
constructor(context: IClientContext, schema?: TTableSchema) {
14+
constructor(context: IClientContext, source: IResultsProvider<TRowSet | undefined>, schema?: TTableSchema) {
1315
this.context = context;
16+
this.source = source;
1417
this.schema = getSchemaColumns(schema);
1518
}
1619

17-
async hasPendingData() {
18-
return false;
20+
public async hasMore() {
21+
return this.source.hasMore();
1922
}
2023

21-
async getValue(data?: Array<TRowSet>): Promise<Array<object>> {
22-
if (this.schema.length === 0 || !data) {
24+
public async fetchNext(options: ResultsProviderFetchNextOptions) {
25+
if (this.schema.length === 0) {
2326
return [];
2427
}
2528

26-
return data.reduce((result: Array<any>, rowSet: TRowSet) => {
27-
const columns = rowSet.columns || [];
28-
const rows = this.getRows(columns, this.schema);
29+
const data = await this.source.fetchNext(options);
30+
if (!data) {
31+
return [];
32+
}
2933

30-
return result.concat(rows);
31-
}, []);
34+
const columns = data.columns || [];
35+
return this.getRows(columns, this.schema);
3236
}
3337

3438
private getRows(columns: Array<TColumn>, descriptors: Array<TColumnDesc>): Array<any> {
lib/DBSQLOperation/FetchResultsHelper.ts → lib/result/RowSetProvider.ts

Lines changed: 8 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -8,6 +8,7 @@ import {
88
import { ColumnCode, FetchType, Int64 } from '../hive/Types';
99
import Status from '../dto/Status';
1010
import IClientContext from '../contracts/IClientContext';
11+
import IResultsProvider, { ResultsProviderFetchNextOptions } from './IResultsProvider';
1112

1213
function checkIfOperationHasMoreRows(response: TFetchResultsResp): boolean {
1314
if (response.hasMoreRows) {
@@ -35,7 +36,7 @@ function checkIfOperationHasMoreRows(response: TFetchResultsResp): boolean {
3536
return (columnValue?.values?.length || 0) > 0;
3637
}
3738

38-
export default class FetchResultsHelper {
39+
export default class RowSetProvider implements IResultsProvider<TRowSet | undefined> {
3940
private readonly context: IClientContext;
4041

4142
private readonly operationHandle: TOperationHandle;
@@ -79,7 +80,7 @@ export default class FetchResultsHelper {
7980
return response.results;
8081
}
8182

82-
public async fetch(maxRows: number) {
83+
public async fetchNext({ limit }: ResultsProviderFetchNextOptions) {
8384
const prefetchedResponse = this.prefetchedResults.shift();
8485
if (prefetchedResponse) {
8586
return this.processFetchResponse(prefetchedResponse);
@@ -89,10 +90,14 @@ export default class FetchResultsHelper {
8990
const response = await driver.fetchResults({
9091
operationHandle: this.operationHandle,
9192
orientation: this.fetchOrientation,
92-
maxRows: new Int64(maxRows),
93+
maxRows: new Int64(limit),
9394
fetchType: FetchType.Data,
9495
});
9596

9697
return this.processFetchResponse(response);
9798
}
99+
100+
public async hasMore() {
101+
return this.hasMoreRows;
102+
}
98103
}

tests/e2e/arrow.test.js

Lines changed: 12 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -1,8 +1,9 @@
11
const { expect } = require('chai');
2+
const sinon = require('sinon');
23
const config = require('./utils/config');
34
const logger = require('./utils/logger')(config.logger);
45
const { DBSQLClient } = require('../..');
5-
const ArrowResult = require('../../dist/result/ArrowResult').default;
6+
const ArrowResultHandler = require('../../dist/result/ArrowResultHandler').default;
67
const globalConfig = require('../../dist/globalConfig').default;
78

89
const fixtures = require('../fixtures/compatibility');
@@ -76,7 +77,7 @@ describe('Arrow support', () => {
7677
expect(result).to.deep.equal(expectedColumn);
7778

7879
const resultHandler = await operation.getResultHandler();
79-
expect(resultHandler).to.be.not.instanceof(ArrowResult);
80+
expect(resultHandler).to.be.not.instanceof(ArrowResultHandler);
8081

8182
await operation.close();
8283
}),
@@ -93,7 +94,7 @@ describe('Arrow support', () => {
9394
expect(fixArrowResult(result)).to.deep.equal(expectedArrow);
9495

9596
const resultHandler = await operation.getResultHandler();
96-
expect(resultHandler).to.be.instanceof(ArrowResult);
97+
expect(resultHandler).to.be.instanceof(ArrowResultHandler);
9798

9899
await operation.close();
99100
}),
@@ -110,7 +111,7 @@ describe('Arrow support', () => {
110111
expect(fixArrowResult(result)).to.deep.equal(expectedArrowNativeTypes);
111112

112113
const resultHandler = await operation.getResultHandler();
113-
expect(resultHandler).to.be.instanceof(ArrowResult);
114+
expect(resultHandler).to.be.instanceof(ArrowResultHandler);
114115

115116
await operation.close();
116117
}),
@@ -130,14 +131,18 @@ describe('Arrow support', () => {
130131

131132
// We use some internals here to check that server returned response with multiple batches
132133
const resultHandler = await operation.getResultHandler();
133-
expect(resultHandler).to.be.instanceof(ArrowResult);
134+
expect(resultHandler).to.be.instanceof(ArrowResultHandler);
134135

135-
const rawData = await operation._data.fetch(rowsCount);
136+
sinon.spy(operation._data, 'fetchNext');
137+
138+
const result = await resultHandler.fetchNext({ limit: rowsCount });
139+
140+
expect(operation._data.fetchNext.callCount).to.be.eq(1);
141+
const rawData = await operation._data.fetchNext.firstCall.returnValue;
136142
// We don't know exact count of batches returned, it depends on server's configuration,
137143
// but with much enough rows there should be more than one result batch
138144
expect(rawData.arrowBatches?.length).to.be.gt(1);
139145

140-
const result = await resultHandler.getValue([rawData]);
141146
expect(result.length).to.be.eq(rowsCount);
142147
});
143148
});

0 commit comments

Comments
 (0)