Skip to content

Commit e69ee4b

Browse files
DBSQLOperation Refactoring (2 of 3) (#185)
Refactoring: Merge OperationStatusHelper into DBSQLOperation Signed-off-by: Levko Kravets <levko.ne@gmail.com>
1 parent 4f32d5e commit e69ee4b

3 files changed

Lines changed: 132 additions & 160 deletions

File tree

lib/DBSQLOperation/OperationStatusHelper.ts

Lines changed: 0 additions & 118 deletions
This file was deleted.

lib/DBSQLOperation/index.ts

Lines changed: 106 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,9 @@ import {
1414
TGetResultSetMetadataResp,
1515
TSparkRowSetType,
1616
TCloseOperationResp,
17+
TOperationState,
1718
} from '../../thrift/TCLIService_types';
1819
import Status from '../dto/Status';
19-
import OperationStatusHelper from './OperationStatusHelper';
2020
import FetchResultsHelper from './FetchResultsHelper';
2121
import IDBSQLLogger, { LogLevel } from '../contracts/IDBSQLLogger';
2222
import OperationStateError, { OperationStateErrorCode } from '../errors/OperationStateError';
@@ -33,6 +33,14 @@ interface DBSQLOperationConstructorOptions {
3333
logger: IDBSQLLogger;
3434
}
3535

36+
async function delay(ms?: number): Promise<void> {
37+
return new Promise((resolve) => {
38+
setTimeout(() => {
39+
resolve();
40+
}, ms);
41+
});
42+
}
43+
3644
export default class DBSQLOperation implements IOperation {
3745
private readonly driver: HiveDriver;
3846

@@ -42,8 +50,6 @@ export default class DBSQLOperation implements IOperation {
4250

4351
public onClose?: () => void;
4452

45-
private readonly _status: OperationStatusHelper;
46-
4753
private readonly _data: FetchResultsHelper;
4854

4955
private readonly closeOperation?: TCloseOperationResp;
@@ -54,6 +60,14 @@ export default class DBSQLOperation implements IOperation {
5460

5561
private metadata?: TGetResultSetMetadataResp;
5662

63+
private state: number = TOperationState.INITIALIZED_STATE;
64+
65+
// Once operation is finished or fails - cache status response, because subsequent calls
66+
// to `getOperationStatus()` may fail with irrelevant errors, e.g. HTTP 404
67+
private operationStatus?: TGetOperationStatusResp;
68+
69+
private hasResultSet: boolean = false;
70+
5771
private resultHandler?: IOperationResult;
5872

5973
constructor(
@@ -68,7 +82,11 @@ export default class DBSQLOperation implements IOperation {
6882

6983
const useOnlyPrefetchedResults = Boolean(directResults?.closeOperation);
7084

71-
this._status = new OperationStatusHelper(this.driver, this.operationHandle, directResults?.operationStatus);
85+
this.hasResultSet = operationHandle.hasResultSet;
86+
if (directResults?.operationStatus) {
87+
this.processOperationStatusResponse(directResults.operationStatus);
88+
}
89+
7290
this.metadata = directResults?.resultSetMetadata;
7391
this._data = new FetchResultsHelper(
7492
this.driver,
@@ -117,7 +135,7 @@ export default class DBSQLOperation implements IOperation {
117135
public async fetchChunk(options?: FetchOptions): Promise<Array<object>> {
118136
await this.failIfClosed();
119137

120-
if (!this._status.hasResultSet) {
138+
if (!this.hasResultSet) {
121139
return [];
122140
}
123141

@@ -146,7 +164,17 @@ export default class DBSQLOperation implements IOperation {
146164
public async status(progress: boolean = false): Promise<TGetOperationStatusResp> {
147165
await this.failIfClosed();
148166
this.logger?.log(LogLevel.debug, `Fetching status for operation with id: ${this.getId()}`);
149-
return this._status.status(progress);
167+
168+
if (this.operationStatus) {
169+
return this.operationStatus;
170+
}
171+
172+
const response = await this.driver.getOperationStatus({
173+
operationHandle: this.operationHandle,
174+
getProgressUpdate: progress,
175+
});
176+
177+
return this.processOperationStatusResponse(response);
150178
}
151179

152180
/**
@@ -220,7 +248,7 @@ export default class DBSQLOperation implements IOperation {
220248
public async getSchema(options?: GetSchemaOptions): Promise<TTableSchema | null> {
221249
await this.failIfClosed();
222250

223-
if (!this._status.hasResultSet) {
251+
if (!this.hasResultSet) {
224252
return null;
225253
}
226254

@@ -247,18 +275,58 @@ export default class DBSQLOperation implements IOperation {
247275
}
248276

249277
private async waitUntilReady(options?: WaitUntilReadyOptions) {
250-
try {
251-
await this._status.waitUntilReady(options);
252-
} catch (error) {
253-
if (error instanceof OperationStateError) {
254-
if (error.errorCode === OperationStateErrorCode.Canceled) {
278+
if (this.state === TOperationState.FINISHED_STATE) {
279+
return;
280+
}
281+
282+
let isReady = false;
283+
284+
while (!isReady) {
285+
// eslint-disable-next-line no-await-in-loop
286+
const response = await this.status(Boolean(options?.progress));
287+
288+
if (options?.callback) {
289+
// eslint-disable-next-line no-await-in-loop
290+
await Promise.resolve(options.callback(response));
291+
}
292+
293+
switch (response.operationState) {
294+
// For these states do nothing and continue waiting
295+
case TOperationState.INITIALIZED_STATE:
296+
case TOperationState.PENDING_STATE:
297+
case TOperationState.RUNNING_STATE:
298+
break;
299+
300+
// Operation is completed, so exit the loop
301+
case TOperationState.FINISHED_STATE:
302+
isReady = true;
303+
break;
304+
305+
// Operation was cancelled, so set a flag and exit the loop (throw an error)
306+
case TOperationState.CANCELED_STATE:
255307
this.cancelled = true;
256-
}
257-
if (error.errorCode === OperationStateErrorCode.Closed) {
308+
throw new OperationStateError(OperationStateErrorCode.Canceled, response);
309+
310+
// Operation was closed, so set a flag and exit the loop (throw an error)
311+
case TOperationState.CLOSED_STATE:
258312
this.closed = true;
259-
}
313+
throw new OperationStateError(OperationStateErrorCode.Closed, response);
314+
315+
// Error states - throw and exit the loop
316+
case TOperationState.ERROR_STATE:
317+
throw new OperationStateError(OperationStateErrorCode.Error, response);
318+
case TOperationState.TIMEDOUT_STATE:
319+
throw new OperationStateError(OperationStateErrorCode.Timeout, response);
320+
case TOperationState.UKNOWN_STATE:
321+
default:
322+
throw new OperationStateError(OperationStateErrorCode.Unknown, response);
323+
}
324+
325+
// If not ready yet - make some delay before the next status requests
326+
if (!isReady) {
327+
// eslint-disable-next-line no-await-in-loop
328+
await delay(100);
260329
}
261-
throw error;
262330
}
263331
}
264332

@@ -301,4 +369,26 @@ export default class DBSQLOperation implements IOperation {
301369

302370
return this.resultHandler;
303371
}
372+
373+
private processOperationStatusResponse(response: TGetOperationStatusResp) {
374+
Status.assert(response.status);
375+
376+
this.state = response.operationState ?? this.state;
377+
378+
if (typeof response.hasResultSet === 'boolean') {
379+
this.hasResultSet = response.hasResultSet;
380+
}
381+
382+
const isInProgress = [
383+
TOperationState.INITIALIZED_STATE,
384+
TOperationState.PENDING_STATE,
385+
TOperationState.RUNNING_STATE,
386+
].includes(this.state);
387+
388+
if (!isInProgress) {
389+
this.operationStatus = response;
390+
}
391+
392+
return response;
393+
}
304394
}

0 commit comments

Comments
 (0)