Skip to content

Commit 46c3586

Browse files
Refine auth credentials refresh flow (#187)
Signed-off-by: Levko Kravets <levko.ne@gmail.com>
1 parent e932107 commit 46c3586

13 files changed

Lines changed: 248 additions & 369 deletions

File tree

lib/DBSQLClient.ts

Lines changed: 53 additions & 76 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import thrift, { HttpHeaders } from 'thrift';
1+
import thrift from 'thrift';
22

33
import { EventEmitter } from 'events';
44
import TCLIService from '../thrift/TCLIService';
@@ -13,12 +13,13 @@ import HttpConnection from './connection/connections/HttpConnection';
1313
import IConnectionOptions from './connection/contracts/IConnectionOptions';
1414
import Status from './dto/Status';
1515
import HiveDriverError from './errors/HiveDriverError';
16-
import { areHeadersEqual, buildUserAgentString, definedOrError } from './utils';
16+
import { buildUserAgentString, definedOrError } from './utils';
1717
import PlainHttpAuthentication from './connection/auth/PlainHttpAuthentication';
1818
import DatabricksOAuth from './connection/auth/DatabricksOAuth';
1919
import IDBSQLLogger, { LogLevel } from './contracts/IDBSQLLogger';
2020
import DBSQLLogger from './DBSQLLogger';
2121
import CloseableCollection from './utils/CloseableCollection';
22+
import IConnectionProvider from './connection/contracts/IConnectionProvider';
2223

2324
function prependSlash(str: string): string {
2425
if (str.length > 0 && str.charAt(0) !== '/') {
@@ -41,13 +42,11 @@ function getInitialNamespaceOptions(catalogName?: string, schemaName?: string) {
4142
}
4243

4344
export default class DBSQLClient extends EventEmitter implements IDBSQLClient {
44-
private client: TCLIService.Client | null = null;
45+
private connectionProvider?: IConnectionProvider;
4546

46-
private authProvider: IAuthentication | null = null;
47+
private authProvider?: IAuthentication;
4748

48-
private connectionOptions: ConnectionOptions | null = null;
49-
50-
private additionalHeaders: HttpHeaders = {};
49+
private client?: TCLIService.Client;
5150

5251
private readonly logger: IDBSQLLogger;
5352

@@ -61,30 +60,14 @@ export default class DBSQLClient extends EventEmitter implements IDBSQLClient {
6160
this.logger.log(LogLevel.info, 'Created DBSQLClient');
6261
}
6362

64-
private getConnectionOptions(options: ConnectionOptions, headers: HttpHeaders): IConnectionOptions {
65-
const {
66-
host,
67-
port,
68-
path,
69-
clientId,
70-
authType,
71-
// @ts-expect-error TS2339: Property 'token' does not exist on type 'ConnectionOptions'
72-
token,
73-
// @ts-expect-error TS2339: Property 'persistence' does not exist on type 'ConnectionOptions'
74-
persistence,
75-
// @ts-expect-error TS2339: Property 'provider' does not exist on type 'ConnectionOptions'
76-
provider,
77-
...otherOptions
78-
} = options;
79-
63+
private getConnectionOptions(options: ConnectionOptions): IConnectionOptions {
8064
return {
81-
host,
82-
port: port || 443,
83-
path: prependSlash(path),
65+
host: options.host,
66+
port: options.port || 443,
67+
path: prependSlash(options.path),
8468
https: true,
85-
...otherOptions,
69+
socketTimeout: options.socketTimeout,
8670
headers: {
87-
...headers,
8871
'User-Agent': buildUserAgentString(options.clientId),
8972
},
9073
};
@@ -128,7 +111,38 @@ export default class DBSQLClient extends EventEmitter implements IDBSQLClient {
128111
*/
129112
public async connect(options: ConnectionOptions, authProvider?: IAuthentication): Promise<IDBSQLClient> {
130113
this.authProvider = this.getAuthProvider(options, authProvider);
131-
this.connectionOptions = options;
114+
115+
this.connectionProvider = new HttpConnection(this.getConnectionOptions(options));
116+
117+
const thriftConnection = await this.connectionProvider.getThriftConnection();
118+
119+
thriftConnection.on('error', (error: Error) => {
120+
// Error.stack already contains error type and message, so log stack if available,
121+
// otherwise fall back to just error type + message
122+
this.logger.log(LogLevel.error, error.stack || `${error.name}: ${error.message}`);
123+
try {
124+
this.emit('error', error);
125+
} catch (e) {
126+
// EventEmitter will throw unhandled error when emitting 'error' event.
127+
// Since we already logged it few lines above, just suppress this behaviour
128+
}
129+
});
130+
131+
thriftConnection.on('reconnecting', (params: { delay: number; attempt: number }) => {
132+
this.logger.log(LogLevel.debug, `Reconnecting, params: ${JSON.stringify(params)}`);
133+
this.emit('reconnecting', params);
134+
});
135+
136+
thriftConnection.on('close', () => {
137+
this.logger.log(LogLevel.debug, 'Closing connection.');
138+
this.emit('close');
139+
});
140+
141+
thriftConnection.on('timeout', () => {
142+
this.logger.log(LogLevel.debug, 'Connection timed out.');
143+
this.emit('timeout');
144+
});
145+
132146
return this;
133147
}
134148

@@ -158,65 +172,28 @@ export default class DBSQLClient extends EventEmitter implements IDBSQLClient {
158172
}
159173

160174
private async getClient() {
161-
if (!this.connectionOptions || !this.authProvider) {
175+
if (!this.connectionProvider) {
162176
throw new HiveDriverError('DBSQLClient: not connected');
163177
}
164178

165-
const authHeaders = await this.authProvider.authenticate();
166-
// When auth headers change - recreate client. Thrift library does not provide API for updating
167-
// changed options, therefore we have to recreate both connection and client to apply new headers
168-
if (!this.client || !areHeadersEqual(this.additionalHeaders, authHeaders)) {
179+
if (!this.client) {
169180
this.logger.log(LogLevel.info, 'DBSQLClient: initializing thrift client');
170-
this.additionalHeaders = authHeaders;
171-
const connectionOptions = this.getConnectionOptions(this.connectionOptions, this.additionalHeaders);
181+
this.client = this.thrift.createClient(TCLIService, await this.connectionProvider.getThriftConnection());
182+
}
172183

173-
const connection = await this.createConnection(connectionOptions);
174-
this.client = this.thrift.createClient(TCLIService, connection.getConnection());
184+
if (this.authProvider) {
185+
const authHeaders = await this.authProvider.authenticate();
186+
this.connectionProvider.setHeaders(authHeaders);
175187
}
176188

177189
return this.client;
178190
}
179191

180-
private async createConnection(options: IConnectionOptions) {
181-
const connectionProvider = new HttpConnection();
182-
const connection = await connectionProvider.connect(options);
183-
const thriftConnection = connection.getConnection();
184-
185-
thriftConnection.on('error', (error: Error) => {
186-
// Error.stack already contains error type and message, so log stack if available,
187-
// otherwise fall back to just error type + message
188-
this.logger.log(LogLevel.error, error.stack || `${error.name}: ${error.message}`);
189-
try {
190-
this.emit('error', error);
191-
} catch (e) {
192-
// EventEmitter will throw unhandled error when emitting 'error' event.
193-
// Since we already logged it few lines above, just suppress this behaviour
194-
}
195-
});
196-
197-
thriftConnection.on('reconnecting', (params: { delay: number; attempt: number }) => {
198-
this.logger.log(LogLevel.debug, `Reconnecting, params: ${JSON.stringify(params)}`);
199-
this.emit('reconnecting', params);
200-
});
201-
202-
thriftConnection.on('close', () => {
203-
this.logger.log(LogLevel.debug, 'Closing connection.');
204-
this.emit('close');
205-
});
206-
207-
thriftConnection.on('timeout', () => {
208-
this.logger.log(LogLevel.debug, 'Connection timed out.');
209-
this.emit('timeout');
210-
});
211-
212-
return connection;
213-
}
214-
215192
public async close(): Promise<void> {
216193
await this.sessions.closeAll();
217194

218-
this.client = null;
219-
this.authProvider = null;
220-
this.connectionOptions = null;
195+
this.client = undefined;
196+
this.connectionProvider = undefined;
197+
this.authProvider = undefined;
221198
}
222199
}

lib/connection/auth/DatabricksOAuth/index.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { HttpHeaders } from 'thrift';
1+
import { HeadersInit } from 'node-fetch';
22
import IAuthentication from '../../contracts/IAuthentication';
33
import IDBSQLLogger from '../../../contracts/IDBSQLLogger';
44
import OAuthPersistence, { OAuthPersistenceCache } from './OAuthPersistence';
@@ -9,7 +9,7 @@ interface DatabricksOAuthOptions extends OAuthManagerOptions {
99
scopes?: OAuthScopes;
1010
logger?: IDBSQLLogger;
1111
persistence?: OAuthPersistence;
12-
headers?: HttpHeaders;
12+
headers?: HeadersInit;
1313
}
1414

1515
export default class DatabricksOAuth implements IAuthentication {
@@ -27,7 +27,7 @@ export default class DatabricksOAuth implements IAuthentication {
2727
this.manager = OAuthManager.getManager(this.options);
2828
}
2929

30-
public async authenticate(): Promise<HttpHeaders> {
30+
public async authenticate(): Promise<HeadersInit> {
3131
const { host, scopes, headers } = this.options;
3232

3333
const persistence = this.options.persistence ?? this.defaultPersistence;

lib/connection/auth/PlainHttpAuthentication.ts

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,26 +1,26 @@
1-
import { HttpHeaders } from 'thrift';
1+
import { HeadersInit } from 'node-fetch';
22
import IAuthentication from '../contracts/IAuthentication';
33

44
interface PlainHttpAuthenticationOptions {
55
username?: string;
66
password?: string;
7-
headers?: HttpHeaders;
7+
headers?: HeadersInit;
88
}
99

1010
export default class PlainHttpAuthentication implements IAuthentication {
1111
private readonly username: string;
1212

1313
private readonly password: string;
1414

15-
private readonly headers: HttpHeaders;
15+
private readonly headers: HeadersInit;
1616

1717
constructor(options: PlainHttpAuthenticationOptions) {
1818
this.username = options?.username || 'anonymous';
1919
this.password = options?.password ?? 'anonymous';
2020
this.headers = options?.headers || {};
2121
}
2222

23-
public async authenticate(): Promise<HttpHeaders> {
23+
public async authenticate(): Promise<HeadersInit> {
2424
return {
2525
...this.headers,
2626
Authorization: `Bearer ${this.password}`,
Lines changed: 43 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,36 @@
11
import thrift from 'thrift';
22
import https from 'https';
33
import http from 'http';
4+
import { HeadersInit } from 'node-fetch';
45

5-
import IThriftConnection from '../contracts/IThriftConnection';
66
import IConnectionProvider from '../contracts/IConnectionProvider';
77
import IConnectionOptions from '../contracts/IConnectionOptions';
88
import globalConfig from '../../globalConfig';
99

1010
import ThriftHttpConnection from './ThriftHttpConnection';
1111

12-
export default class HttpConnection implements IConnectionProvider, IThriftConnection {
13-
private connection: any;
12+
export default class HttpConnection implements IConnectionProvider {
13+
private readonly options: IConnectionOptions;
14+
15+
private headers: HeadersInit = {};
16+
17+
private connection?: ThriftHttpConnection;
18+
19+
constructor(options: IConnectionOptions) {
20+
this.options = options;
21+
}
22+
23+
public setHeaders(headers: HeadersInit) {
24+
this.headers = headers;
25+
this.connection?.setHeaders({
26+
...this.options.headers,
27+
...this.headers,
28+
});
29+
}
30+
31+
private async getAgent(): Promise<http.Agent> {
32+
const { options } = this;
1433

15-
private async getAgent(options: IConnectionOptions): Promise<http.Agent> {
1634
const httpAgentOptions: http.AgentOptions = {
1735
keepAlive: true,
1836
maxSockets: 5,
@@ -32,30 +50,28 @@ export default class HttpConnection implements IConnectionProvider, IThriftConne
3250
return options.https ? new https.Agent(httpsAgentOptions) : new http.Agent(httpAgentOptions);
3351
}
3452

35-
async connect(options: IConnectionOptions): Promise<IThriftConnection> {
36-
const agent = await this.getAgent(options);
37-
38-
this.connection = new ThriftHttpConnection(
39-
{
40-
url: `${options.https ? 'https' : 'http'}://${options.host}:${options.port}${options.path ?? '/'}`,
41-
transport: thrift.TBufferedTransport,
42-
protocol: thrift.TBinaryProtocol,
43-
},
44-
{
45-
agent,
46-
timeout: options.socketTimeout ?? globalConfig.socketTimeout,
47-
headers: options.headers,
48-
},
49-
);
50-
51-
return this;
52-
}
53+
public async getThriftConnection(): Promise<any> {
54+
if (!this.connection) {
55+
const { options } = this;
56+
const agent = await this.getAgent();
5357

54-
getConnection() {
55-
return this.connection;
56-
}
58+
this.connection = new ThriftHttpConnection(
59+
{
60+
url: `${options.https ? 'https' : 'http'}://${options.host}:${options.port}${options.path ?? '/'}`,
61+
transport: thrift.TBufferedTransport,
62+
protocol: thrift.TBinaryProtocol,
63+
},
64+
{
65+
agent,
66+
timeout: options.socketTimeout ?? globalConfig.socketTimeout,
67+
headers: {
68+
...options.headers,
69+
...this.headers,
70+
},
71+
},
72+
);
73+
}
5774

58-
isConnected(): boolean {
59-
return !!this.connection;
75+
return this.connection;
6076
}
6177
}

lib/connection/connections/ThriftHttpConnection.ts

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66

77
import { EventEmitter } from 'events';
88
import { TBinaryProtocol, TBufferedTransport, Thrift, TProtocol, TProtocolConstructor, TTransport } from 'thrift';
9-
import fetch, { RequestInit, Response, FetchError } from 'node-fetch';
9+
import fetch, { RequestInit, HeadersInit, Response, FetchError } from 'node-fetch';
1010
// @ts-expect-error TS7016: Could not find a declaration file for module
1111
import InputBufferUnderrunError from 'thrift/lib/nodejs/lib/thrift/input_buffer_underrun_error';
1212

@@ -48,7 +48,7 @@ type ThriftClient = {
4848
export default class ThriftHttpConnection extends EventEmitter {
4949
private readonly url: string;
5050

51-
private readonly config: RequestInit;
51+
private config: RequestInit;
5252

5353
// This field is used by Thrift internally, so name and type are important
5454
private readonly transport: TTransportType;
@@ -67,6 +67,13 @@ export default class ThriftHttpConnection extends EventEmitter {
6767
this.protocol = options.protocol ?? TBinaryProtocol;
6868
}
6969

70+
public setHeaders(headers: HeadersInit) {
71+
this.config = {
72+
...this.config,
73+
headers,
74+
};
75+
}
76+
7077
public write(data: Buffer, seqId: number) {
7178
const requestConfig: RequestInit = {
7279
...this.config,
Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
1-
import { HttpHeaders } from 'thrift';
1+
import { HeadersInit } from 'node-fetch';
22

33
export default interface IAuthentication {
4-
authenticate(): Promise<HttpHeaders>;
4+
authenticate(): Promise<HeadersInit>;
55
}
Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
1-
import IConnectionOptions from './IConnectionOptions';
2-
import IThriftConnection from './IThriftConnection';
1+
import { HeadersInit } from 'node-fetch';
32

43
export default interface IConnectionProvider {
5-
connect(options: IConnectionOptions): Promise<IThriftConnection>;
4+
getThriftConnection(): Promise<any>;
5+
6+
setHeaders(headers: HeadersInit): void;
67
}

lib/connection/contracts/IThriftConnection.ts

Lines changed: 0 additions & 5 deletions
This file was deleted.

0 commit comments

Comments
 (0)