Skip to content

Commit a4e4aa7

Browse files
[PECO-1259] Implement retry behavior for CloudFetch (#211)
* [PECO-1259] Implement retry behavior for CloudFetch + refactoring — Signed-off-by: Levko Kravets <levko.ne@gmail.com>
* Update tests — Signed-off-by: Levko Kravets <levko.ne@gmail.com>
1 parent c85629b commit a4e4aa7

9 files changed

Lines changed: 193 additions & 93 deletions

File tree

lib/connection/connections/HttpConnection.ts

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,16 @@
11
import thrift from 'thrift';
22
import https from 'https';
33
import http from 'http';
4-
import { HeadersInit } from 'node-fetch';
4+
import { HeadersInit, Response } from 'node-fetch';
55
import { ProxyAgent } from 'proxy-agent';
66

77
import IConnectionProvider from '../contracts/IConnectionProvider';
88
import IConnectionOptions, { ProxyOptions } from '../contracts/IConnectionOptions';
99
import IClientContext from '../../contracts/IClientContext';
1010

1111
import ThriftHttpConnection from './ThriftHttpConnection';
12+
import IRetryPolicy from '../contracts/IRetryPolicy';
13+
import HttpRetryPolicy from './HttpRetryPolicy';
1214

1315
export default class HttpConnection implements IConnectionProvider {
1416
private readonly options: IConnectionOptions;
@@ -102,6 +104,7 @@ export default class HttpConnection implements IConnectionProvider {
102104
url: `${options.https ? 'https' : 'http'}://${options.host}:${options.port}${options.path ?? '/'}`,
103105
transport: thrift.TBufferedTransport,
104106
protocol: thrift.TBinaryProtocol,
107+
getRetryPolicy: () => this.getRetryPolicy(),
105108
},
106109
{
107110
agent,
@@ -116,4 +119,8 @@ export default class HttpConnection implements IConnectionProvider {
116119

117120
return this.connection;
118121
}
122+
123+
public async getRetryPolicy(): Promise<IRetryPolicy<Response>> {
124+
return new HttpRetryPolicy(this.context);
125+
}
119126
}
Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
import { Response } from 'node-fetch';
2+
import IRetryPolicy, { ShouldRetryResult, RetryableOperation } from '../contracts/IRetryPolicy';
3+
import IClientContext, { ClientConfig } from '../../contracts/IClientContext';
4+
import RetryError, { RetryErrorCode } from '../../errors/RetryError';
5+
6+
function getRetryDelay(attempt: number, config: ClientConfig): number {
7+
const scale = Math.max(1, 1.5 ** (attempt - 1)); // ensure scale >= 1
8+
return Math.min(config.retryDelayMin * scale, config.retryDelayMax);
9+
}
10+
11+
function delay(milliseconds: number): Promise<void> {
12+
return new Promise<void>((resolve) => {
13+
setTimeout(() => resolve(), milliseconds);
14+
});
15+
}
16+
17+
export default class HttpRetryPolicy implements IRetryPolicy<Response> {
18+
private context: IClientContext;
19+
20+
private readonly startTime: number; // in milliseconds
21+
22+
private attempt: number;
23+
24+
constructor(context: IClientContext) {
25+
this.context = context;
26+
this.startTime = Date.now();
27+
this.attempt = 0;
28+
}
29+
30+
public async shouldRetry(response: Response): Promise<ShouldRetryResult> {
31+
if (!response.ok) {
32+
switch (response.status) {
33+
// On these status codes it's safe to retry the request. However,
34+
// both error codes mean that server is overwhelmed or even down.
35+
// Therefore, we need to add some delay between attempts so
36+
// server can recover and more likely handle next request
37+
case 429: // Too Many Requests
38+
case 503: // Service Unavailable
39+
this.attempt += 1;
40+
41+
const clientConfig = this.context.getConfig();
42+
43+
// Delay interval depends on current attempt - the more attempts we do
44+
// the longer the interval will be
45+
// TODO: Respect `Retry-After` header (PECO-729)
46+
const retryDelay = getRetryDelay(this.attempt, clientConfig);
47+
48+
const attemptsExceeded = this.attempt >= clientConfig.retryMaxAttempts;
49+
if (attemptsExceeded) {
50+
throw new RetryError(RetryErrorCode.AttemptsExceeded, response);
51+
}
52+
53+
const timeoutExceeded = Date.now() - this.startTime + retryDelay >= clientConfig.retriesTimeout;
54+
if (timeoutExceeded) {
55+
throw new RetryError(RetryErrorCode.TimeoutExceeded, response);
56+
}
57+
58+
return { shouldRetry: true, retryAfter: retryDelay };
59+
60+
// TODO: Here we should handle other error types (see PECO-730)
61+
62+
// no default
63+
}
64+
}
65+
66+
return { shouldRetry: false };
67+
}
68+
69+
public async invokeWithRetry(operation: RetryableOperation<Response>): Promise<Response> {
70+
for (;;) {
71+
const response = await operation(); // eslint-disable-line no-await-in-loop
72+
const status = await this.shouldRetry(response); // eslint-disable-line no-await-in-loop
73+
if (!status.shouldRetry) {
74+
return response;
75+
}
76+
await delay(status.retryAfter); // eslint-disable-line no-await-in-loop
77+
}
78+
}
79+
}

lib/connection/connections/ThriftHttpConnection.ts

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import { TBinaryProtocol, TBufferedTransport, Thrift, TProtocol, TProtocolConstr
99
import fetch, { RequestInit, HeadersInit, Response, FetchError } from 'node-fetch';
1010
// @ts-expect-error TS7016: Could not find a declaration file for module
1111
import InputBufferUnderrunError from 'thrift/lib/nodejs/lib/thrift/input_buffer_underrun_error';
12+
import IRetryPolicy from '../contracts/IRetryPolicy';
1213

1314
export class THTTPException extends Thrift.TApplicationException {
1415
public readonly statusCode: unknown;
@@ -31,6 +32,7 @@ interface ThriftHttpConnectionOptions {
3132
url: string;
3233
transport?: TTransportType;
3334
protocol?: TProtocolConstructor;
35+
getRetryPolicy(): Promise<IRetryPolicy<Response>>;
3436
}
3537

3638
// This type describes a shape of internals of Thrift client object.
@@ -56,6 +58,8 @@ export default class ThriftHttpConnection extends EventEmitter {
5658
// This field is used by Thrift internally, so name and type are important
5759
private readonly protocol: TProtocolConstructor;
5860

61+
private readonly getRetryPolicy: () => Promise<IRetryPolicy<Response>>;
62+
5963
// thrift.createClient sets this field internally
6064
public client?: ThriftClient;
6165

@@ -65,6 +69,7 @@ export default class ThriftHttpConnection extends EventEmitter {
6569
this.config = config;
6670
this.transport = options.transport ?? TBufferedTransport;
6771
this.protocol = options.protocol ?? TBinaryProtocol;
72+
this.getRetryPolicy = options.getRetryPolicy;
6873
}
6974

7075
public setHeaders(headers: HeadersInit) {
@@ -87,7 +92,11 @@ export default class ThriftHttpConnection extends EventEmitter {
8792
body: data,
8893
};
8994

90-
fetch(this.url, requestConfig)
95+
this.getRetryPolicy()
96+
.then((retryPolicy) => {
97+
const makeRequest = () => fetch(this.url, requestConfig);
98+
return retryPolicy.invokeWithRetry(makeRequest);
99+
})
91100
.then((response) => {
92101
if (response.status !== 200) {
93102
throw new THTTPException(response);
Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,13 @@
11
import http from 'http';
2-
import { HeadersInit } from 'node-fetch';
2+
import { HeadersInit, Response } from 'node-fetch';
3+
import IRetryPolicy from './IRetryPolicy';
34

45
export default interface IConnectionProvider {
56
getThriftConnection(): Promise<any>;
67

78
getAgent(): Promise<http.Agent>;
89

910
setHeaders(headers: HeadersInit): void;
11+
12+
getRetryPolicy(): Promise<IRetryPolicy<Response>>;
1013
}
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
export type ShouldRetryResult =
2+
| {
3+
shouldRetry: false;
4+
}
5+
| {
6+
shouldRetry: true;
7+
retryAfter: number; // in milliseconds
8+
};
9+
10+
export type RetryableOperation<R> = () => Promise<R>;
11+
12+
export default interface IRetryPolicy<R> {
13+
shouldRetry(response: R): Promise<ShouldRetryResult>;
14+
15+
invokeWithRetry(operation: RetryableOperation<R>): Promise<R>;
16+
}

lib/errors/RetryError.ts

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
export enum RetryErrorCode {
2+
AttemptsExceeded = 'ATTEMPTS_EXCEEDED',
3+
TimeoutExceeded = 'TIMEOUT_EXCEEDED',
4+
}
5+
6+
const errorMessages: Record<RetryErrorCode, string> = {
7+
[RetryErrorCode.AttemptsExceeded]: 'Max retry count exceeded',
8+
[RetryErrorCode.TimeoutExceeded]: 'Retry timeout exceeded',
9+
};
10+
11+
export default class RetryError extends Error {
12+
public readonly errorCode: RetryErrorCode;
13+
14+
public readonly payload: unknown;
15+
16+
constructor(errorCode: RetryErrorCode, payload?: unknown) {
17+
super(errorMessages[errorCode]);
18+
this.errorCode = errorCode;
19+
this.payload = payload;
20+
}
21+
}

lib/hive/Commands/BaseCommand.ts

Lines changed: 17 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -1,23 +1,8 @@
1-
import { Thrift } from 'thrift';
1+
import { Response } from 'node-fetch';
22
import TCLIService from '../../../thrift/TCLIService';
33
import HiveDriverError from '../../errors/HiveDriverError';
4-
import IClientContext, { ClientConfig } from '../../contracts/IClientContext';
5-
6-
interface CommandExecutionInfo {
7-
startTime: number; // in milliseconds
8-
attempt: number;
9-
}
10-
11-
function getRetryDelay(attempt: number, config: ClientConfig): number {
12-
const scale = Math.max(1, 1.5 ** (attempt - 1)); // ensure scale >= 1
13-
return Math.min(config.retryDelayMin * scale, config.retryDelayMax);
14-
}
15-
16-
function delay(milliseconds: number): Promise<void> {
17-
return new Promise<void>((resolve) => {
18-
setTimeout(() => resolve(), milliseconds);
19-
});
20-
}
4+
import RetryError, { RetryErrorCode } from '../../errors/RetryError';
5+
import IClientContext from '../../contracts/IClientContext';
216

227
export default abstract class BaseCommand {
238
protected client: TCLIService.Client;
@@ -29,57 +14,23 @@ export default abstract class BaseCommand {
2914
this.context = context;
3015
}
3116

32-
protected executeCommand<Response>(request: object, command: Function | void): Promise<Response> {
33-
return this.invokeWithErrorHandling<Response>(request, command, { startTime: Date.now(), attempt: 0 });
34-
}
35-
36-
private async invokeWithErrorHandling<Response>(
37-
request: object,
38-
command: Function | void,
39-
info: CommandExecutionInfo,
40-
): Promise<Response> {
17+
protected async executeCommand<Response>(request: object, command: Function | void): Promise<Response> {
4118
try {
4219
return await this.invokeCommand<Response>(request, command);
4320
} catch (error) {
44-
if (error instanceof Thrift.TApplicationException) {
45-
if ('statusCode' in error) {
46-
switch (error.statusCode) {
47-
// On these status codes it's safe to retry the request. However,
48-
// both error codes mean that server is overwhelmed or even down.
49-
// Therefore, we need to add some delay between attempts so
50-
// server can recover and more likely handle next request
51-
case 429: // Too Many Requests
52-
case 503: // Service Unavailable
53-
info.attempt += 1;
54-
55-
const clientConfig = this.context.getConfig();
56-
57-
// Delay interval depends on current attempt - the more attempts we do
58-
// the longer the interval will be
59-
// TODO: Respect `Retry-After` header (PECO-729)
60-
const retryDelay = getRetryDelay(info.attempt, clientConfig);
61-
62-
const attemptsExceeded = info.attempt >= clientConfig.retryMaxAttempts;
63-
if (attemptsExceeded) {
64-
throw new HiveDriverError(
65-
`Hive driver: ${error.statusCode} when connecting to resource. Max retry count exceeded.`,
66-
);
67-
}
68-
69-
const timeoutExceeded = Date.now() - info.startTime + retryDelay >= clientConfig.retriesTimeout;
70-
if (timeoutExceeded) {
71-
throw new HiveDriverError(
72-
`Hive driver: ${error.statusCode} when connecting to resource. Retry timeout exceeded.`,
73-
);
74-
}
75-
76-
await delay(retryDelay);
77-
return this.invokeWithErrorHandling(request, command, info);
78-
79-
// TODO: Here we should handle other error types (see PECO-730)
80-
81-
// no default
82-
}
21+
if (error instanceof RetryError) {
22+
const statusCode = error.payload instanceof Response ? error.payload.status : undefined;
23+
24+
switch (error.errorCode) {
25+
case RetryErrorCode.AttemptsExceeded:
26+
throw new HiveDriverError(
27+
`Hive driver: ${statusCode ?? 'Error'} when connecting to resource. Max retry count exceeded.`,
28+
);
29+
case RetryErrorCode.TimeoutExceeded:
30+
throw new HiveDriverError(
31+
`Hive driver: ${statusCode ?? 'Error'} when connecting to resource. Retry timeout exceeded.`,
32+
);
33+
// no default
8334
}
8435
}
8536

lib/result/CloudFetchResultHandler.ts

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -62,10 +62,9 @@ export default class CloudFetchResultHandler implements IResultsProvider<Array<B
6262
private async fetch(url: RequestInfo, init?: RequestInit) {
6363
const connectionProvider = await this.context.getConnectionProvider();
6464
const agent = await connectionProvider.getAgent();
65+
const retryPolicy = await connectionProvider.getRetryPolicy();
6566

66-
return fetch(url, {
67-
agent,
68-
...init,
69-
});
67+
const requestConfig: RequestInit = { agent, ...init };
68+
return retryPolicy.invokeWithRetry(() => fetch(url, requestConfig));
7069
}
7170
}

0 commit comments

Comments
 (0)