Skip to content

Commit 9b03de3

Browse files
Make sure that DBSQLOperation.fetchChunk returns chunks of requested size (#200)
* Refactoring: Introduce concept of results provider; convert FetchResultsHelper into provider of TRowSet
  Signed-off-by: Levko Kravets <levko.ne@gmail.com>
* Convert Json/Arrow/CloudFetch result handlers to implement result provider interface
  Signed-off-by: Levko Kravets <levko.ne@gmail.com>
* Refine the code and update tests
  Signed-off-by: Levko Kravets <levko.ne@gmail.com>
* Make sure that DBSQLOperation.fetchChunk returns chunks of requested size
  Signed-off-by: Levko Kravets <levko.ne@gmail.com>
* Add option to disable result buffering & slicing
  Signed-off-by: Levko Kravets <levko.ne@gmail.com>
* Update existing tests
  Signed-off-by: Levko Kravets <levko.ne@gmail.com>
* Add tests for ResultSlicer
  Signed-off-by: Levko Kravets <levko.ne@gmail.com>
* Refine code
  Signed-off-by: Levko Kravets <levko.ne@gmail.com>
* Add more tests
  Signed-off-by: Levko Kravets <levko.ne@gmail.com>
---------
Signed-off-by: Levko Kravets <levko.ne@gmail.com>
1 parent 8e6098a commit 9b03de3

8 files changed

Lines changed: 272 additions & 45 deletions

File tree

lib/DBSQLOperation/index.ts

Lines changed: 26 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import RowSetProvider from '../result/RowSetProvider';
2323
import JsonResultHandler from '../result/JsonResultHandler';
2424
import ArrowResultHandler from '../result/ArrowResultHandler';
2525
import CloudFetchResultHandler from '../result/CloudFetchResultHandler';
26+
import ResultSlicer from '../result/ResultSlicer';
2627
import { definedOrError } from '../utils';
2728
import HiveDriverError from '../errors/HiveDriverError';
2829
import IClientContext from '../contracts/IClientContext';
@@ -68,7 +69,7 @@ export default class DBSQLOperation implements IOperation {
6869

6970
private hasResultSet: boolean = false;
7071

71-
private resultHandler?: IResultsProvider<Array<any>>;
72+
private resultHandler?: ResultSlicer<any>;
7273

7374
constructor({ handle, directResults, context }: DBSQLOperationConstructorOptions) {
7475
this.operationHandle = handle;
@@ -107,9 +108,17 @@ export default class DBSQLOperation implements IOperation {
107108
*/
108109
public async fetchAll(options?: FetchOptions): Promise<Array<object>> {
109110
const data: Array<Array<object>> = [];
111+
112+
const fetchChunkOptions = {
113+
...options,
114+
// Tell slicer to return raw chunks. We're going to process all of them anyway,
115+
// so no need to additionally buffer and slice chunks returned by server
116+
disableBuffering: true,
117+
};
118+
110119
do {
111120
// eslint-disable-next-line no-await-in-loop
112-
const chunk = await this.fetchChunk(options);
121+
const chunk = await this.fetchChunk(fetchChunkOptions);
113122
data.push(chunk);
114123
} while (await this.hasMoreRows()); // eslint-disable-line no-await-in-loop
115124
this.context.getLogger().log(LogLevel.debug, `Fetched all data from operation with id: ${this.getId()}`);
@@ -138,7 +147,10 @@ export default class DBSQLOperation implements IOperation {
138147
const resultHandler = await this.getResultHandler();
139148
await this.failIfClosed();
140149

141-
const result = resultHandler.fetchNext({ limit: options?.maxRows || defaultMaxRows });
150+
const result = resultHandler.fetchNext({
151+
limit: options?.maxRows || defaultMaxRows,
152+
disableBuffering: options?.disableBuffering,
153+
});
142154
await this.failIfClosed();
143155

144156
this.context
@@ -335,24 +347,28 @@ export default class DBSQLOperation implements IOperation {
335347
return this.metadata;
336348
}
337349

338-
private async getResultHandler(): Promise<IResultsProvider<Array<any>>> {
350+
private async getResultHandler(): Promise<ResultSlicer<any>> {
339351
const metadata = await this.fetchMetadata();
340352
const resultFormat = definedOrError(metadata.resultFormat);
341353

342354
if (!this.resultHandler) {
355+
let resultSource: IResultsProvider<Array<any>> | undefined;
356+
343357
switch (resultFormat) {
344358
case TSparkRowSetType.COLUMN_BASED_SET:
345-
this.resultHandler = new JsonResultHandler(this.context, this._data, metadata.schema);
359+
resultSource = new JsonResultHandler(this.context, this._data, metadata.schema);
346360
break;
347361
case TSparkRowSetType.ARROW_BASED_SET:
348-
this.resultHandler = new ArrowResultHandler(this.context, this._data, metadata.schema, metadata.arrowSchema);
362+
resultSource = new ArrowResultHandler(this.context, this._data, metadata.schema, metadata.arrowSchema);
349363
break;
350364
case TSparkRowSetType.URL_BASED_SET:
351-
this.resultHandler = new CloudFetchResultHandler(this.context, this._data, metadata.schema);
352-
break;
353-
default:
354-
this.resultHandler = undefined;
365+
resultSource = new CloudFetchResultHandler(this.context, this._data, metadata.schema);
355366
break;
367+
// no default
368+
}
369+
370+
if (resultSource) {
371+
this.resultHandler = new ResultSlicer(this.context, resultSource);
356372
}
357373
}
358374

lib/contracts/IOperation.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,9 @@ export interface FinishedOptions extends WaitUntilReadyOptions {
1414

1515
export interface FetchOptions extends WaitUntilReadyOptions {
1616
maxRows?: number;
17+
// Disables the internal buffer used to ensure a consistent chunk size.
18+
// When set to `true`, returned chunks size may vary (and may differ from `maxRows`)
19+
disableBuffering?: boolean;
1720
}
1821

1922
export interface GetSchemaOptions extends WaitUntilReadyOptions {

lib/result/ResultSlicer.ts

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
import IClientContext from '../contracts/IClientContext';
2+
import IResultsProvider, { ResultsProviderFetchNextOptions } from './IResultsProvider';
3+
4+
export interface ResultSlicerFetchNextOptions extends ResultsProviderFetchNextOptions {
5+
// Setting this to `true` will disable slicer, and it will return unprocessed chunks
6+
// from underlying results provider
7+
disableBuffering?: boolean;
8+
}
9+
10+
export default class ResultSlicer<T> implements IResultsProvider<Array<T>> {
11+
private readonly context: IClientContext;
12+
13+
private readonly source: IResultsProvider<Array<T>>;
14+
15+
private remainingResults: Array<T> = [];
16+
17+
constructor(context: IClientContext, source: IResultsProvider<Array<T>>) {
18+
this.context = context;
19+
this.source = source;
20+
}
21+
22+
public async hasMore(): Promise<boolean> {
23+
if (this.remainingResults.length > 0) {
24+
return true;
25+
}
26+
return this.source.hasMore();
27+
}
28+
29+
public async fetchNext(options: ResultSlicerFetchNextOptions): Promise<Array<T>> {
30+
// If we're asked to not use buffer - first try to return whatever we have in buffer.
31+
// If buffer is empty - just proxy the call to underlying results provider
32+
if (options.disableBuffering) {
33+
if (this.remainingResults.length > 0) {
34+
const result = this.remainingResults;
35+
this.remainingResults = [];
36+
return result;
37+
}
38+
39+
return this.source.fetchNext(options);
40+
}
41+
42+
const result: Array<Array<T>> = [];
43+
let resultsCount = 0;
44+
45+
// First, use remaining items from the previous fetch
46+
if (this.remainingResults.length > 0) {
47+
result.push(this.remainingResults);
48+
resultsCount += this.remainingResults.length;
49+
this.remainingResults = [];
50+
}
51+
52+
// Fetch items from source results provider until we reach a requested count
53+
while (resultsCount < options.limit) {
54+
// eslint-disable-next-line no-await-in-loop
55+
const chunk = await this.source.fetchNext(options);
56+
if (chunk.length === 0) {
57+
break;
58+
}
59+
60+
result.push(chunk);
61+
resultsCount += chunk.length;
62+
}
63+
64+
// If we collected more results than requested, slice the excess items and store them for the next time
65+
if (resultsCount > options.limit) {
66+
const lastChunk = result.pop() ?? [];
67+
const neededCount = options.limit - (resultsCount - lastChunk.length);
68+
result.push(lastChunk.splice(0, neededCount));
69+
this.remainingResults = lastChunk;
70+
}
71+
72+
return result.flat();
73+
}
74+
}

tests/e2e/arrow.test.js

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ const config = require('./utils/config');
44
const logger = require('./utils/logger')(config.logger);
55
const { DBSQLClient } = require('../..');
66
const ArrowResultHandler = require('../../dist/result/ArrowResultHandler').default;
7+
const ResultSlicer = require('../../dist/result/ResultSlicer').default;
78

89
const fixtures = require('../fixtures/compatibility');
910
const { expected: expectedColumn } = require('../fixtures/compatibility/column');
@@ -81,7 +82,8 @@ describe('Arrow support', () => {
8182
expect(result).to.deep.equal(expectedColumn);
8283

8384
const resultHandler = await operation.getResultHandler();
84-
expect(resultHandler).to.be.not.instanceof(ArrowResultHandler);
85+
expect(resultHandler).to.be.instanceof(ResultSlicer);
86+
expect(resultHandler.source).to.be.not.instanceof(ArrowResultHandler);
8587

8688
await operation.close();
8789
},
@@ -100,7 +102,8 @@ describe('Arrow support', () => {
100102
expect(fixArrowResult(result)).to.deep.equal(expectedArrow);
101103

102104
const resultHandler = await operation.getResultHandler();
103-
expect(resultHandler).to.be.instanceof(ArrowResultHandler);
105+
expect(resultHandler).to.be.instanceof(ResultSlicer);
106+
expect(resultHandler.source).to.be.instanceof(ArrowResultHandler);
104107

105108
await operation.close();
106109
},
@@ -120,7 +123,8 @@ describe('Arrow support', () => {
120123
expect(fixArrowResult(result)).to.deep.equal(expectedArrowNativeTypes);
121124

122125
const resultHandler = await operation.getResultHandler();
123-
expect(resultHandler).to.be.instanceof(ArrowResultHandler);
126+
expect(resultHandler).to.be.instanceof(ResultSlicer);
127+
expect(resultHandler.source).to.be.instanceof(ArrowResultHandler);
124128

125129
await operation.close();
126130
},
@@ -145,7 +149,8 @@ describe('Arrow support', () => {
145149

146150
// We use some internals here to check that server returned response with multiple batches
147151
const resultHandler = await operation.getResultHandler();
148-
expect(resultHandler).to.be.instanceof(ArrowResultHandler);
152+
expect(resultHandler).to.be.instanceof(ResultSlicer);
153+
expect(resultHandler.source).to.be.instanceof(ArrowResultHandler);
149154

150155
sinon.spy(operation._data, 'fetchNext');
151156

tests/e2e/batched_fetch.test.js

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,9 @@ describe('Data fetching', () => {
3838
try {
3939
// set `maxRows` to null to disable direct results so all the data are fetched through `driver.fetchResults`
4040
const operation = await session.executeStatement(query, { maxRows: null });
41-
let chunkedOp = await operation.fetchChunk({ maxRows: 10 }).catch((error) => logger(error));
41+
let chunkedOp = await operation
42+
.fetchChunk({ maxRows: 10, disableBuffering: true })
43+
.catch((error) => logger(error));
4244
expect(chunkedOp.length).to.be.equal(10);
4345
// we explicitly requested only one chunk
4446
expect(session.context.driver.fetchResults.callCount).to.equal(1);
@@ -47,6 +49,33 @@ describe('Data fetching', () => {
4749
}
4850
});
4951

52+
it('fetch chunks should respect maxRows', async () => {
53+
const session = await openSession({ arrowEnabled: false });
54+
55+
const chunkSize = 300;
56+
const lastChunkSize = 100; // 1000 % chunkSize
57+
58+
try {
59+
const operation = await session.executeStatement(query, { maxRows: 500 });
60+
61+
let hasMoreRows = true;
62+
let chunkCount = 0;
63+
64+
while (hasMoreRows) {
65+
let chunkedOp = await operation.fetchChunk({ maxRows: 300 });
66+
chunkCount += 1;
67+
hasMoreRows = await operation.hasMoreRows();
68+
69+
const isLastChunk = !hasMoreRows;
70+
expect(chunkedOp.length).to.be.equal(isLastChunk ? lastChunkSize : chunkSize);
71+
}
72+
73+
expect(chunkCount).to.be.equal(4); // 1000 = 3*300 + 1*100
74+
} finally {
75+
await session.close();
76+
}
77+
});
78+
5079
it('fetch all should fetch all records', async () => {
5180
const session = await openSession({ arrowEnabled: false });
5281
sinon.spy(session.context.driver, 'fetchResults');

tests/e2e/cloudfetch.test.js

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ const config = require('./utils/config');
44
const logger = require('./utils/logger')(config.logger);
55
const { DBSQLClient } = require('../..');
66
const CloudFetchResultHandler = require('../../dist/result/CloudFetchResultHandler').default;
7+
const ResultSlicer = require('../../dist/result/ResultSlicer').default;
78

89
async function openSession(customConfig) {
910
const client = new DBSQLClient();
@@ -51,31 +52,34 @@ describe('CloudFetch', () => {
5152

5253
// Check if we're actually getting data via CloudFetch
5354
const resultHandler = await operation.getResultHandler();
54-
expect(resultHandler).to.be.instanceOf(CloudFetchResultHandler);
55+
expect(resultHandler).to.be.instanceof(ResultSlicer);
56+
expect(resultHandler.source).to.be.instanceOf(CloudFetchResultHandler);
57+
58+
const cfResultHandler = resultHandler.source;
5559

5660
// Fetch first chunk and check if result handler behaves properly.
5761
// With the count of rows we queried, there should be at least one row set,
5862
// containing 8 result links. After fetching the first chunk,
5963
// result handler should download 5 of them and schedule the rest
60-
expect(await resultHandler.hasMore()).to.be.false;
61-
expect(resultHandler.pendingLinks.length).to.be.equal(0);
62-
expect(resultHandler.downloadedBatches.length).to.be.equal(0);
64+
expect(await cfResultHandler.hasMore()).to.be.false;
65+
expect(cfResultHandler.pendingLinks.length).to.be.equal(0);
66+
expect(cfResultHandler.downloadedBatches.length).to.be.equal(0);
6367

6468
sinon.spy(operation._data, 'fetchNext');
6569

66-
const chunk = await operation.fetchChunk({ maxRows: 100000 });
70+
const chunk = await operation.fetchChunk({ maxRows: 100000, disableBuffering: true });
6771
// Count links returned from server
6872
const resultSet = await operation._data.fetchNext.firstCall.returnValue;
6973
const resultLinksCount = resultSet?.resultLinks?.length ?? 0;
7074

71-
expect(await resultHandler.hasMore()).to.be.true;
75+
expect(await cfResultHandler.hasMore()).to.be.true;
7276
// expected batches minus first 5 already fetched
73-
expect(resultHandler.pendingLinks.length).to.be.equal(resultLinksCount - cloudFetchConcurrentDownloads);
74-
expect(resultHandler.downloadedBatches.length).to.be.equal(cloudFetchConcurrentDownloads - 1);
77+
expect(cfResultHandler.pendingLinks.length).to.be.equal(resultLinksCount - cloudFetchConcurrentDownloads);
78+
expect(cfResultHandler.downloadedBatches.length).to.be.equal(cloudFetchConcurrentDownloads - 1);
7579

7680
let fetchedRowCount = chunk.length;
7781
while (await operation.hasMoreRows()) {
78-
const chunk = await operation.fetchChunk({ maxRows: 100000 });
82+
const chunk = await operation.fetchChunk({ maxRows: 100000, disableBuffering: true });
7983
fetchedRowCount += chunk.length;
8084
}
8185

0 commit comments

Comments
 (0)