Skip to content

Commit 911e5b0

Browse files
Implement custom HTTP client for thrift (#183)
* Implement custom HttpConnection for Thrift library Signed-off-by: Levko Kravets <levko.ne@gmail.com> * Stop patching `thrift` library and remove `patch-package` Signed-off-by: Levko Kravets <levko.ne@gmail.com> --------- Signed-off-by: Levko Kravets <levko.ne@gmail.com>
1 parent 643aad3 commit 911e5b0

9 files changed

Lines changed: 307 additions & 667 deletions

File tree

lib/DBSQLClient.ts

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -80,14 +80,12 @@ export default class DBSQLClient extends EventEmitter implements IDBSQLClient {
8080
return {
8181
host,
8282
port: port || 443,
83-
options: {
84-
path: prependSlash(path),
85-
https: true,
86-
...otherOptions,
87-
headers: {
88-
...headers,
89-
'User-Agent': buildUserAgentString(options.clientId),
90-
},
83+
path: prependSlash(path),
84+
https: true,
85+
...otherOptions,
86+
headers: {
87+
...headers,
88+
'User-Agent': buildUserAgentString(options.clientId),
9189
},
9290
};
9391
}
Lines changed: 31 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -1,50 +1,52 @@
11
import thrift from 'thrift';
22
import https from 'https';
3-
import http, { IncomingMessage } from 'http';
3+
import http from 'http';
44

55
import IThriftConnection from '../contracts/IThriftConnection';
66
import IConnectionProvider from '../contracts/IConnectionProvider';
7-
import IConnectionOptions, { Options } from '../contracts/IConnectionOptions';
7+
import IConnectionOptions from '../contracts/IConnectionOptions';
88
import globalConfig from '../../globalConfig';
99

10-
type NodeOptions = {
11-
ca?: Buffer | string;
12-
cert?: Buffer | string;
13-
key?: Buffer | string;
14-
rejectUnauthorized?: boolean;
15-
};
10+
import ThriftHttpConnection from './ThriftHttpConnection';
1611

1712
export default class HttpConnection implements IConnectionProvider, IThriftConnection {
18-
private thrift = thrift;
19-
2013
private connection: any;
2114

22-
async connect(options: IConnectionOptions): Promise<IThriftConnection> {
23-
const agentOptions: http.AgentOptions = {
15+
private async getAgent(options: IConnectionOptions): Promise<http.Agent> {
16+
const httpAgentOptions: http.AgentOptions = {
2417
keepAlive: true,
2518
maxSockets: 5,
2619
keepAliveMsecs: 10000,
27-
timeout: options.options?.socketTimeout ?? globalConfig.socketTimeout,
20+
timeout: options.socketTimeout ?? globalConfig.socketTimeout,
21+
};
22+
23+
const httpsAgentOptions: https.AgentOptions = {
24+
...httpAgentOptions,
25+
minVersion: 'TLSv1.2',
26+
rejectUnauthorized: false,
27+
ca: options.ca,
28+
cert: options.cert,
29+
key: options.key,
2830
};
2931

30-
const agent = options.options?.https
31-
? new https.Agent({ ...agentOptions, minVersion: 'TLSv1.2' })
32-
: new http.Agent(agentOptions);
32+
return options.https ? new https.Agent(httpsAgentOptions) : new http.Agent(httpAgentOptions);
33+
}
3334

34-
const thriftOptions = {
35-
transport: thrift.TBufferedTransport,
36-
protocol: thrift.TBinaryProtocol,
37-
...options.options,
38-
nodeOptions: {
35+
async connect(options: IConnectionOptions): Promise<IThriftConnection> {
36+
const agent = await this.getAgent(options);
37+
38+
this.connection = new ThriftHttpConnection(
39+
{
40+
url: `${options.https ? 'https' : 'http'}://${options.host}:${options.port}${options.path ?? '/'}`,
41+
transport: thrift.TBufferedTransport,
42+
protocol: thrift.TBinaryProtocol,
43+
},
44+
{
3945
agent,
40-
...this.getNodeOptions(options.options || {}),
41-
...(options.options?.nodeOptions || {}),
42-
timeout: options.options?.socketTimeout ?? globalConfig.socketTimeout,
46+
timeout: options.socketTimeout ?? globalConfig.socketTimeout,
47+
headers: options.headers,
4348
},
44-
};
45-
46-
this.connection = this.thrift.createHttpConnection(options.host, options.port, thriftOptions);
47-
this.addCookieHandler();
49+
);
4850

4951
return this;
5052
}
@@ -54,47 +56,6 @@ export default class HttpConnection implements IConnectionProvider, IThriftConne
5456
}
5557

5658
isConnected(): boolean {
57-
if (this.connection) {
58-
return true;
59-
}
60-
return false;
61-
}
62-
63-
private getNodeOptions(options: Options): object {
64-
const { ca, cert, key, https: useHttps } = options;
65-
const nodeOptions: NodeOptions = {};
66-
67-
if (ca) {
68-
nodeOptions.ca = ca;
69-
}
70-
if (cert) {
71-
nodeOptions.cert = cert;
72-
}
73-
if (key) {
74-
nodeOptions.key = key;
75-
}
76-
77-
if (useHttps) {
78-
nodeOptions.rejectUnauthorized = false;
79-
}
80-
81-
return nodeOptions;
82-
}
83-
84-
private addCookieHandler() {
85-
const { responseCallback } = this.connection;
86-
87-
this.connection.responseCallback = (response: IncomingMessage, ...rest: Array<unknown>) => {
88-
if (Array.isArray(response.headers['set-cookie'])) {
89-
const cookie = [this.connection.nodeOptions.headers.cookie];
90-
91-
this.connection.nodeOptions.headers.cookie = cookie
92-
.concat(response.headers['set-cookie'])
93-
.filter(Boolean)
94-
.join(';');
95-
}
96-
97-
responseCallback.call(this.connection, response, ...rest);
98-
};
59+
return !!this.connection;
9960
}
10061
}
Lines changed: 161 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,161 @@
1+
/**
2+
This file is created using node_modules/thrift/lib/nodejs/lib/thrift/http_connection.js as an example
3+
4+
The code relies on thrift internals, so be careful when upgrading `thrift` library
5+
*/
6+
7+
import { EventEmitter } from 'events';
8+
import { TBinaryProtocol, TBufferedTransport, Thrift, TProtocol, TProtocolConstructor, TTransport } from 'thrift';
9+
import fetch, { RequestInit, Response, FetchError } from 'node-fetch';
10+
// @ts-expect-error TS7016: Could not find a declaration file for module
11+
import InputBufferUnderrunError from 'thrift/lib/nodejs/lib/thrift/input_buffer_underrun_error';
12+
13+
export class THTTPException extends Thrift.TApplicationException {
14+
public readonly statusCode: unknown;
15+
16+
public readonly response: Response;
17+
18+
constructor(response: Response) {
19+
super(
20+
Thrift.TApplicationExceptionType.PROTOCOL_ERROR,
21+
`Received a response with a bad HTTP status code: ${response.status}`,
22+
);
23+
this.statusCode = response.status;
24+
this.response = response;
25+
}
26+
}
27+
28+
type TTransportType = typeof TBufferedTransport;
29+
30+
interface ThriftHttpConnectionOptions {
31+
url: string;
32+
transport?: TTransportType;
33+
protocol?: TProtocolConstructor;
34+
}
35+
36+
// This type describes a shape of internals of Thrift client object.
37+
// It is not perfect good enough for our needs
38+
type ThriftClient = {
39+
// Internal map of callbacks of running requests. Once request is completed (either successfully or not) -
40+
// callback should be removed from it
41+
_reqs: Record<number, (error: unknown, response?: unknown) => void>;
42+
} & {
43+
// For each client's public method Foo there are two private ones: send_Foo and recv_Foo.
44+
// We have to access recv_Foo ones to properly parse the response
45+
[key: string]: (input: TProtocol, mtype: Thrift.MessageType, seqId: number) => void;
46+
};
47+
48+
export default class ThriftHttpConnection extends EventEmitter {
49+
private readonly url: string;
50+
51+
private readonly config: RequestInit;
52+
53+
// This field is used by Thrift internally, so name and type are important
54+
private readonly transport: TTransportType;
55+
56+
// This field is used by Thrift internally, so name and type are important
57+
private readonly protocol: TProtocolConstructor;
58+
59+
// thrift.createClient sets this field internally
60+
public client?: ThriftClient;
61+
62+
constructor(options: ThriftHttpConnectionOptions, config: RequestInit = {}) {
63+
super();
64+
this.url = options.url;
65+
this.config = config;
66+
this.transport = options.transport ?? TBufferedTransport;
67+
this.protocol = options.protocol ?? TBinaryProtocol;
68+
}
69+
70+
public write(data: Buffer, seqId: number) {
71+
const requestConfig: RequestInit = {
72+
...this.config,
73+
method: 'POST',
74+
headers: {
75+
...this.config.headers,
76+
Connection: 'keep-alive',
77+
'Content-Length': `${data.length}`,
78+
'Content-Type': 'application/x-thrift',
79+
},
80+
body: data,
81+
};
82+
83+
fetch(this.url, requestConfig)
84+
.then((response) => {
85+
if (response.status !== 200) {
86+
throw new THTTPException(response);
87+
}
88+
89+
return response.buffer();
90+
})
91+
.then((buffer) => {
92+
this.transport.receiver((transportWithData) => this.handleThriftResponse(transportWithData), seqId)(buffer);
93+
})
94+
.catch((error) => {
95+
if (error instanceof FetchError) {
96+
if (error.type === 'request-timeout') {
97+
error = new Thrift.TApplicationException(
98+
Thrift.TApplicationExceptionType.PROTOCOL_ERROR,
99+
'Request timed out',
100+
);
101+
}
102+
}
103+
104+
const defaultErrorHandler = (err: unknown) => {
105+
this.emit('error', err);
106+
};
107+
108+
if (this.client) {
109+
const callback = this.client._reqs[seqId] ?? defaultErrorHandler;
110+
delete this.client._reqs[seqId];
111+
callback(error);
112+
} else {
113+
defaultErrorHandler(error);
114+
}
115+
});
116+
}
117+
118+
private handleThriftResponse(transportWithData: TTransport) {
119+
if (!this.client) {
120+
throw new Thrift.TApplicationException(Thrift.TApplicationExceptionType.INTERNAL_ERROR, 'Client not available');
121+
}
122+
123+
const Protocol = this.protocol;
124+
const proto = new Protocol(transportWithData);
125+
try {
126+
// eslint-disable-next-line no-constant-condition
127+
while (true) {
128+
const header = proto.readMessageBegin();
129+
const dummySeqId = header.rseqid * -1;
130+
const { client } = this;
131+
132+
client._reqs[dummySeqId] = (err, success) => {
133+
transportWithData.commitPosition();
134+
const clientCallback = client._reqs[header.rseqid];
135+
delete client._reqs[header.rseqid];
136+
if (clientCallback) {
137+
process.nextTick(() => {
138+
clientCallback(err, success);
139+
});
140+
}
141+
};
142+
143+
if (client[`recv_${header.fname}`]) {
144+
client[`recv_${header.fname}`](proto, header.mtype, dummySeqId);
145+
} else {
146+
delete client._reqs[dummySeqId];
147+
throw new Thrift.TApplicationException(
148+
Thrift.TApplicationExceptionType.WRONG_METHOD_NAME,
149+
'Received a response to an unknown RPC function',
150+
);
151+
}
152+
}
153+
} catch (error) {
154+
if (error instanceof InputBufferUnderrunError) {
155+
transportWithData.rollbackPosition();
156+
} else {
157+
throw error;
158+
}
159+
}
160+
}
161+
}
Lines changed: 8 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,26 +1,14 @@
1-
import { HttpHeaders } from 'thrift';
1+
import { HeadersInit } from 'node-fetch';
22

3-
export type Options = {
4-
socketTimeout?: number;
5-
username?: string;
6-
password?: string;
7-
ssl?: boolean;
8-
https?: boolean;
9-
debug?: boolean;
10-
max_attempts?: number;
11-
retry_max_delay?: number;
12-
connect_timeout?: number;
13-
timeout?: number;
14-
headers?: HttpHeaders;
3+
export default interface IConnectionOptions {
4+
host: string;
5+
port: number;
156
path?: string;
7+
https?: boolean;
8+
headers?: HeadersInit;
9+
socketTimeout?: number;
10+
1611
ca?: Buffer | string;
1712
cert?: Buffer | string;
1813
key?: Buffer | string;
19-
[key: string]: any;
20-
};
21-
22-
export default interface IConnectionOptions {
23-
host: string;
24-
port: number;
25-
options?: Options;
2614
}

0 commit comments

Comments
 (0)