Skip to content

Commit 4020c78

Browse files
DBSQLOperation Refactoring (1 of 3) (#163)
* Refactoring: Merge CompleteOperationHelper into DBSQLOperation Signed-off-by: Levko Kravets <levko.ne@gmail.com> * Refactoring: Merge SchemaHelper into DBSQLOperation Signed-off-by: Levko Kravets <levko.ne@gmail.com> * Update tests Signed-off-by: Levko Kravets <levko.ne@gmail.com> --------- Signed-off-by: Levko Kravets <levko.ne@gmail.com>
1 parent 824f21a commit 4020c78

7 files changed

Lines changed: 153 additions & 200 deletions

File tree

lib/DBSQLOperation/CompleteOperationHelper.ts

Lines changed: 0 additions & 52 deletions
This file was deleted.

lib/DBSQLOperation/SchemaHelper.ts

Lines changed: 0 additions & 70 deletions
This file was deleted.

lib/DBSQLOperation/index.ts

Lines changed: 87 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -11,14 +11,21 @@ import {
1111
TOperationHandle,
1212
TTableSchema,
1313
TSparkDirectResults,
14+
TGetResultSetMetadataResp,
15+
TSparkRowSetType,
16+
TCloseOperationResp,
1417
} from '../../thrift/TCLIService_types';
1518
import Status from '../dto/Status';
1619
import OperationStatusHelper from './OperationStatusHelper';
17-
import SchemaHelper from './SchemaHelper';
1820
import FetchResultsHelper from './FetchResultsHelper';
19-
import CompleteOperationHelper from './CompleteOperationHelper';
2021
import IDBSQLLogger, { LogLevel } from '../contracts/IDBSQLLogger';
2122
import OperationStateError, { OperationStateErrorCode } from '../errors/OperationStateError';
23+
import IOperationResult from '../result/IOperationResult';
24+
import JsonResult from '../result/JsonResult';
25+
import ArrowResult from '../result/ArrowResult';
26+
import CloudFetchResult from '../result/CloudFetchResult';
27+
import { definedOrError } from '../utils';
28+
import HiveDriverError from '../errors/HiveDriverError';
2229

2330
const defaultMaxRows = 100000;
2431

@@ -37,11 +44,17 @@ export default class DBSQLOperation implements IOperation {
3744

3845
private readonly _status: OperationStatusHelper;
3946

40-
private readonly _schema: SchemaHelper;
41-
4247
private readonly _data: FetchResultsHelper;
4348

44-
private readonly _completeOperation: CompleteOperationHelper;
49+
private readonly closeOperation?: TCloseOperationResp;
50+
51+
private closed: boolean = false;
52+
53+
private cancelled: boolean = false;
54+
55+
private metadata?: TGetResultSetMetadataResp;
56+
57+
private resultHandler?: IOperationResult;
4558

4659
constructor(
4760
driver: HiveDriver,
@@ -56,18 +69,14 @@ export default class DBSQLOperation implements IOperation {
5669
const useOnlyPrefetchedResults = Boolean(directResults?.closeOperation);
5770

5871
this._status = new OperationStatusHelper(this.driver, this.operationHandle, directResults?.operationStatus);
59-
this._schema = new SchemaHelper(this.driver, this.operationHandle, directResults?.resultSetMetadata);
72+
this.metadata = directResults?.resultSetMetadata;
6073
this._data = new FetchResultsHelper(
6174
this.driver,
6275
this.operationHandle,
6376
[directResults?.resultSet],
6477
useOnlyPrefetchedResults,
6578
);
66-
this._completeOperation = new CompleteOperationHelper(
67-
this.driver,
68-
this.operationHandle,
69-
directResults?.closeOperation,
70-
);
79+
this.closeOperation = directResults?.closeOperation;
7180
this.logger.log(LogLevel.debug, `Operation created with id: ${this.getId()}`);
7281
}
7382

@@ -115,7 +124,7 @@ export default class DBSQLOperation implements IOperation {
115124
await this.waitUntilReady(options);
116125

117126
const [resultHandler, data] = await Promise.all([
118-
this._schema.getResultHandler(),
127+
this.getResultHandler(),
119128
this._data.fetch(options?.maxRows || defaultMaxRows),
120129
]);
121130

@@ -145,12 +154,18 @@ export default class DBSQLOperation implements IOperation {
145154
* @throws {StatusError}
146155
*/
147156
public async cancel(): Promise<Status> {
148-
if (this._completeOperation.closed || this._completeOperation.cancelled) {
157+
if (this.closed || this.cancelled) {
149158
return Status.success();
150159
}
151160

152161
this.logger?.log(LogLevel.debug, `Cancelling operation with id: ${this.getId()}`);
153-
const result = this._completeOperation.cancel();
162+
163+
const response = await this.driver.cancelOperation({
164+
operationHandle: this.operationHandle,
165+
});
166+
Status.assert(response.status);
167+
this.cancelled = true;
168+
const result = new Status(response.status);
154169

155170
// Cancelled operation becomes unusable, similarly to being closed
156171
this.onClose?.();
@@ -162,12 +177,20 @@ export default class DBSQLOperation implements IOperation {
162177
* @throws {StatusError}
163178
*/
164179
public async close(): Promise<Status> {
165-
if (this._completeOperation.closed || this._completeOperation.cancelled) {
180+
if (this.closed || this.cancelled) {
166181
return Status.success();
167182
}
168183

169184
this.logger?.log(LogLevel.debug, `Closing operation with id: ${this.getId()}`);
170-
const result = await this._completeOperation.close();
185+
186+
const response =
187+
this.closeOperation ??
188+
(await this.driver.closeOperation({
189+
operationHandle: this.operationHandle,
190+
}));
191+
Status.assert(response.status);
192+
this.closed = true;
193+
const result = new Status(response.status);
171194

172195
this.onClose?.();
173196
return result;
@@ -180,7 +203,7 @@ export default class DBSQLOperation implements IOperation {
180203

181204
public async hasMoreRows(): Promise<boolean> {
182205
// If operation is closed or cancelled - we should not try to get data from it
183-
if (this._completeOperation.closed || this._completeOperation.cancelled) {
206+
if (this.closed || this.cancelled) {
184207
return false;
185208
}
186209

@@ -190,7 +213,7 @@ export default class DBSQLOperation implements IOperation {
190213
}
191214

192215
// If we fetched all the data from server - check if there's anything buffered in result handler
193-
const resultHandler = await this._schema.getResultHandler();
216+
const resultHandler = await this.getResultHandler();
194217
return resultHandler.hasPendingData();
195218
}
196219

@@ -204,14 +227,15 @@ export default class DBSQLOperation implements IOperation {
204227
await this.waitUntilReady(options);
205228

206229
this.logger?.log(LogLevel.debug, `Fetching schema for operation with id: ${this.getId()}`);
207-
return this._schema.fetch();
230+
const metadata = await this.fetchMetadata();
231+
return metadata.schema ?? null;
208232
}
209233

210234
private async failIfClosed(): Promise<void> {
211-
if (this._completeOperation.closed) {
235+
if (this.closed) {
212236
throw new OperationStateError(OperationStateErrorCode.Closed);
213237
}
214-
if (this._completeOperation.cancelled) {
238+
if (this.cancelled) {
215239
throw new OperationStateError(OperationStateErrorCode.Canceled);
216240
}
217241
}
@@ -222,13 +246,53 @@ export default class DBSQLOperation implements IOperation {
222246
} catch (error) {
223247
if (error instanceof OperationStateError) {
224248
if (error.errorCode === OperationStateErrorCode.Canceled) {
225-
this._completeOperation.cancelled = true;
249+
this.cancelled = true;
226250
}
227251
if (error.errorCode === OperationStateErrorCode.Closed) {
228-
this._completeOperation.closed = true;
252+
this.closed = true;
229253
}
230254
}
231255
throw error;
232256
}
233257
}
258+
259+
private async fetchMetadata() {
260+
if (!this.metadata) {
261+
const metadata = await this.driver.getResultSetMetadata({
262+
operationHandle: this.operationHandle,
263+
});
264+
Status.assert(metadata.status);
265+
this.metadata = metadata;
266+
}
267+
268+
return this.metadata;
269+
}
270+
271+
private async getResultHandler(): Promise<IOperationResult> {
272+
const metadata = await this.fetchMetadata();
273+
const resultFormat = definedOrError(metadata.resultFormat);
274+
275+
if (!this.resultHandler) {
276+
switch (resultFormat) {
277+
case TSparkRowSetType.COLUMN_BASED_SET:
278+
this.resultHandler = new JsonResult(metadata.schema);
279+
break;
280+
case TSparkRowSetType.ARROW_BASED_SET:
281+
this.resultHandler = new ArrowResult(metadata.schema, metadata.arrowSchema);
282+
break;
283+
case TSparkRowSetType.URL_BASED_SET:
284+
this.resultHandler = new CloudFetchResult(metadata.schema);
285+
break;
286+
default:
287+
this.resultHandler = undefined;
288+
break;
289+
}
290+
}
291+
292+
if (!this.resultHandler) {
293+
throw new HiveDriverError(`Unsupported result format: ${TSparkRowSetType[resultFormat]}`);
294+
}
295+
296+
return this.resultHandler;
297+
}
234298
}

tests/e2e/arrow.test.js

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ describe('Arrow support', () => {
7575
const result = await operation.fetchAll();
7676
expect(result).to.deep.equal(expectedColumn);
7777

78-
const resultHandler = await operation._schema.getResultHandler();
78+
const resultHandler = await operation.getResultHandler();
7979
expect(resultHandler).to.be.not.instanceof(ArrowResult);
8080

8181
await operation.close();
@@ -92,7 +92,7 @@ describe('Arrow support', () => {
9292
const result = await operation.fetchAll();
9393
expect(fixArrowResult(result)).to.deep.equal(expectedArrow);
9494

95-
const resultHandler = await operation._schema.getResultHandler();
95+
const resultHandler = await operation.getResultHandler();
9696
expect(resultHandler).to.be.instanceof(ArrowResult);
9797

9898
await operation.close();
@@ -109,7 +109,7 @@ describe('Arrow support', () => {
109109
const result = await operation.fetchAll();
110110
expect(fixArrowResult(result)).to.deep.equal(expectedArrowNativeTypes);
111111

112-
const resultHandler = await operation._schema.getResultHandler();
112+
const resultHandler = await operation.getResultHandler();
113113
expect(resultHandler).to.be.instanceof(ArrowResult);
114114

115115
await operation.close();
@@ -129,7 +129,7 @@ describe('Arrow support', () => {
129129
`);
130130

131131
// We use some internals here to check that server returned response with multiple batches
132-
const resultHandler = await operation._schema.getResultHandler();
132+
const resultHandler = await operation.getResultHandler();
133133
expect(resultHandler).to.be.instanceof(ArrowResult);
134134

135135
const rawData = await operation._data.fetch(rowsCount);

tests/e2e/cloudfetch.test.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ describe('CloudFetch', () => {
5656
await operation.finished();
5757

5858
// Check if we're actually getting data via CloudFetch
59-
const resultHandler = await operation._schema.getResultHandler();
59+
const resultHandler = await operation.getResultHandler();
6060
expect(resultHandler).to.be.instanceOf(CloudFetchResult);
6161

6262
// Fetch first chunk and check if result handler behaves properly.

0 commit comments

Comments
 (0)