Skip to content

Commit 013f305

Browse files
committed
Add telemetry integration into driver components
This is part 6 of 7 in the telemetry implementation stack. Integration Points: - DBSQLClient: Telemetry lifecycle management and configuration - DBSQLOperation: Statement event emissions - DBSQLSession: Session ID propagation - CloudFetchResultHandler: Chunk download events - IDBSQLClient: ConnectionOptions override support DBSQLClient Integration: - initializeTelemetry(): Initialize all telemetry components - Feature flag check via FeatureFlagCache - Create TelemetryClientProvider, EventEmitter, MetricsAggregator, Exporter - Wire event listeners between emitter and aggregator - Cleanup on close(): Flush metrics, release clients, release feature flag context - Override support via ConnectionOptions.telemetryEnabled Event Emission Points: - connection.open: On successful openSession() with driver config - statement.start: In DBSQLOperation constructor - statement.complete: In DBSQLOperation.close() - cloudfetch.chunk: In CloudFetchResultHandler.downloadLink() - error: In DBSQLOperation.emitErrorEvent() with terminal classification Session ID Propagation: - DBSQLSession passes sessionId to DBSQLOperation constructor - All events include sessionId for correlation - Statement events include both sessionId and statementId Error Handling: - All telemetry code wrapped in try-catch - All exceptions logged at LogLevel.debug ONLY - Driver NEVER throws due to telemetry failures - Zero impact on driver operations Configuration Override: - ConnectionOptions.telemetryEnabled overrides config - Per-connection control for testing - Respects feature flag when override not specified Testing: - Integration test suite: 11 comprehensive E2E tests - Tests verify full telemetry flow: connection → statement → export - Tests verify feature flag behavior - Tests verify driver works when telemetry fails - Tests verify no exceptions propagate Dependencies: - Builds on all previous layers [1/7] through [5/7] - Completes the telemetry data flow pipeline
1 parent 44185e4 commit 013f305

File tree

6 files changed

+710
-4
lines changed

6 files changed

+710
-4
lines changed

lib/DBSQLClient.ts

Lines changed: 202 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import thrift from 'thrift';
22
import Int64 from 'node-int64';
3+
import os from 'os';
34

45
import { EventEmitter } from 'events';
56
import TCLIService from '../thrift/TCLIService';
@@ -23,6 +24,14 @@ import IDBSQLLogger, { LogLevel } from './contracts/IDBSQLLogger';
2324
import DBSQLLogger from './DBSQLLogger';
2425
import CloseableCollection from './utils/CloseableCollection';
2526
import IConnectionProvider from './connection/contracts/IConnectionProvider';
27+
import FeatureFlagCache from './telemetry/FeatureFlagCache';
28+
import TelemetryClientProvider from './telemetry/TelemetryClientProvider';
29+
import TelemetryEventEmitter from './telemetry/TelemetryEventEmitter';
30+
import MetricsAggregator from './telemetry/MetricsAggregator';
31+
import DatabricksTelemetryExporter from './telemetry/DatabricksTelemetryExporter';
32+
import { CircuitBreakerRegistry } from './telemetry/CircuitBreaker';
33+
import { DriverConfiguration } from './telemetry/types';
34+
import driverVersion from './version';
2635

2736
function prependSlash(str: string): string {
2837
if (str.length > 0 && str.charAt(0) !== '/') {
@@ -67,6 +76,19 @@ export default class DBSQLClient extends EventEmitter implements IDBSQLClient, I
6776

6877
private readonly sessions = new CloseableCollection<DBSQLSession>();
6978

79+
// Telemetry components (instance-based, NOT singletons)
80+
private host?: string;
81+
82+
private featureFlagCache?: FeatureFlagCache;
83+
84+
private telemetryClientProvider?: TelemetryClientProvider;
85+
86+
private telemetryEmitter?: TelemetryEventEmitter;
87+
88+
private telemetryAggregator?: MetricsAggregator;
89+
90+
private circuitBreakerRegistry?: CircuitBreakerRegistry;
91+
7092
private static getDefaultLogger(): IDBSQLLogger {
7193
if (!this.defaultLogger) {
7294
this.defaultLogger = new DBSQLLogger();
@@ -93,6 +115,15 @@ export default class DBSQLClient extends EventEmitter implements IDBSQLClient, I
93115
cloudFetchSpeedThresholdMBps: 0.1,
94116

95117
useLZ4Compression: true,
118+
119+
// Telemetry defaults
120+
telemetryEnabled: false, // Initially disabled for safe rollout
121+
telemetryBatchSize: 100,
122+
telemetryFlushIntervalMs: 5000,
123+
telemetryMaxRetries: 3,
124+
telemetryAuthenticatedExport: true,
125+
telemetryCircuitBreakerThreshold: 5,
126+
telemetryCircuitBreakerTimeout: 60000, // 1 minute
96127
};
97128
}
98129

@@ -151,6 +182,124 @@ export default class DBSQLClient extends EventEmitter implements IDBSQLClient, I
151182
return new HttpConnection(this.getConnectionOptions(options), this);
152183
}
153184

185+
/**
186+
* Extract workspace ID from hostname.
187+
* @param host - The host string (e.g., "workspace-id.cloud.databricks.com")
188+
* @returns Workspace ID or host if extraction fails
189+
*/
190+
private extractWorkspaceId(host: string): string {
191+
// Extract workspace ID from hostname (first segment before first dot)
192+
const parts = host.split('.');
193+
return parts.length > 0 ? parts[0] : host;
194+
}
195+
196+
/**
197+
* Build driver configuration for telemetry reporting.
198+
* @returns DriverConfiguration object with current driver settings
199+
*/
200+
private buildDriverConfiguration(): DriverConfiguration {
201+
return {
202+
driverVersion,
203+
driverName: '@databricks/sql',
204+
nodeVersion: process.version,
205+
platform: process.platform,
206+
osVersion: os.release(),
207+
208+
// Feature flags
209+
cloudFetchEnabled: this.config.useCloudFetch ?? false,
210+
lz4Enabled: this.config.useLZ4Compression ?? false,
211+
arrowEnabled: this.config.arrowEnabled ?? false,
212+
directResultsEnabled: true, // Direct results always enabled
213+
214+
// Configuration values
215+
socketTimeout: this.config.socketTimeout ?? 0,
216+
retryMaxAttempts: this.config.retryMaxAttempts ?? 0,
217+
cloudFetchConcurrentDownloads: this.config.cloudFetchConcurrentDownloads ?? 0,
218+
};
219+
}
220+
221+
/**
222+
* Initialize telemetry components if enabled.
223+
* CRITICAL: All errors swallowed and logged at LogLevel.debug ONLY.
224+
* Driver NEVER throws exceptions due to telemetry.
225+
*/
226+
private async initializeTelemetry(): Promise<void> {
227+
if (!this.host) {
228+
return;
229+
}
230+
231+
try {
232+
// Create feature flag cache instance
233+
this.featureFlagCache = new FeatureFlagCache(this);
234+
this.featureFlagCache.getOrCreateContext(this.host);
235+
236+
// Check if telemetry enabled via feature flag
237+
const enabled = await this.featureFlagCache.isTelemetryEnabled(this.host);
238+
if (!enabled) {
239+
this.logger.log(LogLevel.debug, 'Telemetry disabled via feature flag');
240+
return;
241+
}
242+
243+
// Create telemetry components (all instance-based)
244+
this.telemetryClientProvider = new TelemetryClientProvider(this);
245+
this.telemetryEmitter = new TelemetryEventEmitter(this);
246+
247+
// Get or create telemetry client for this host (increments refCount)
248+
this.telemetryClientProvider.getOrCreateClient(this.host);
249+
250+
// Create circuit breaker registry and exporter
251+
this.circuitBreakerRegistry = new CircuitBreakerRegistry(this);
252+
const exporter = new DatabricksTelemetryExporter(this, this.host, this.circuitBreakerRegistry);
253+
this.telemetryAggregator = new MetricsAggregator(this, exporter);
254+
255+
// Wire up event listeners
256+
this.telemetryEmitter.on('telemetry.connection.open', (event) => {
257+
try {
258+
this.telemetryAggregator?.processEvent(event);
259+
} catch (error: any) {
260+
this.logger.log(LogLevel.debug, `Error processing connection.open event: ${error.message}`);
261+
}
262+
});
263+
264+
this.telemetryEmitter.on('telemetry.statement.start', (event) => {
265+
try {
266+
this.telemetryAggregator?.processEvent(event);
267+
} catch (error: any) {
268+
this.logger.log(LogLevel.debug, `Error processing statement.start event: ${error.message}`);
269+
}
270+
});
271+
272+
this.telemetryEmitter.on('telemetry.statement.complete', (event) => {
273+
try {
274+
this.telemetryAggregator?.processEvent(event);
275+
} catch (error: any) {
276+
this.logger.log(LogLevel.debug, `Error processing statement.complete event: ${error.message}`);
277+
}
278+
});
279+
280+
this.telemetryEmitter.on('telemetry.cloudfetch.chunk', (event) => {
281+
try {
282+
this.telemetryAggregator?.processEvent(event);
283+
} catch (error: any) {
284+
this.logger.log(LogLevel.debug, `Error processing cloudfetch.chunk event: ${error.message}`);
285+
}
286+
});
287+
288+
this.telemetryEmitter.on('telemetry.error', (event) => {
289+
try {
290+
this.telemetryAggregator?.processEvent(event);
291+
} catch (error: any) {
292+
this.logger.log(LogLevel.debug, `Error processing error event: ${error.message}`);
293+
}
294+
});
295+
296+
this.logger.log(LogLevel.debug, 'Telemetry initialized successfully');
297+
} catch (error: any) {
298+
// Swallow all telemetry initialization errors
299+
this.logger.log(LogLevel.debug, `Telemetry initialization failed: ${error.message}`);
300+
}
301+
}
302+
154303
/**
155304
* Connects DBSQLClient to endpoint
156305
* @public
@@ -172,11 +321,19 @@ export default class DBSQLClient extends EventEmitter implements IDBSQLClient, I
172321
}
173322
}
174323

324+
// Store host for telemetry
325+
this.host = options.host;
326+
175327
// Store enableMetricViewMetadata configuration
176328
if (options.enableMetricViewMetadata !== undefined) {
177329
this.config.enableMetricViewMetadata = options.enableMetricViewMetadata;
178330
}
179331

332+
// Override telemetry config if provided in options
333+
if (options.telemetryEnabled !== undefined) {
334+
this.config.telemetryEnabled = options.telemetryEnabled;
335+
}
336+
180337
this.authProvider = this.createAuthProvider(options, authProvider);
181338

182339
this.connectionProvider = this.createConnectionProvider(options);
@@ -210,6 +367,11 @@ export default class DBSQLClient extends EventEmitter implements IDBSQLClient, I
210367
this.emit('timeout');
211368
});
212369

370+
// Initialize telemetry if enabled
371+
if (this.config.telemetryEnabled) {
372+
await this.initializeTelemetry();
373+
}
374+
213375
return this;
214376
}
215377

@@ -245,12 +407,52 @@ export default class DBSQLClient extends EventEmitter implements IDBSQLClient, I
245407
serverProtocolVersion: response.serverProtocolVersion,
246408
});
247409
this.sessions.add(session);
410+
411+
// Emit connection.open telemetry event
412+
if (this.telemetryEmitter && this.host) {
413+
try {
414+
const workspaceId = this.extractWorkspaceId(this.host);
415+
const driverConfig = this.buildDriverConfiguration();
416+
this.telemetryEmitter.emitConnectionOpen({
417+
sessionId: session.id,
418+
workspaceId,
419+
driverConfig,
420+
});
421+
} catch (error: any) {
422+
// CRITICAL: All telemetry exceptions swallowed
423+
this.logger.log(LogLevel.debug, `Error emitting connection.open event: ${error.message}`);
424+
}
425+
}
426+
248427
return session;
249428
}
250429

251430
public async close(): Promise<void> {
252431
await this.sessions.closeAll();
253432

433+
// Cleanup telemetry
434+
if (this.host) {
435+
try {
436+
// Step 1: Flush any pending metrics
437+
if (this.telemetryAggregator) {
438+
await this.telemetryAggregator.flush();
439+
}
440+
441+
// Step 2: Release telemetry client (decrements ref count, closes if last)
442+
if (this.telemetryClientProvider) {
443+
await this.telemetryClientProvider.releaseClient(this.host);
444+
}
445+
446+
// Step 3: Release feature flag context (decrements ref count)
447+
if (this.featureFlagCache) {
448+
this.featureFlagCache.releaseContext(this.host);
449+
}
450+
} catch (error: any) {
451+
// Swallow all telemetry cleanup errors
452+
this.logger.log(LogLevel.debug, `Telemetry cleanup error: ${error.message}`);
453+
}
454+
}
455+
254456
this.client = undefined;
255457
this.connectionProvider = undefined;
256458
this.authProvider = undefined;

0 commit comments

Comments
 (0)