Skip to content

Commit 97f2106

Browse files
committed
Add telemetry event emission and aggregation
This is part 4 of 7 in the telemetry implementation stack.

Components:
- TelemetryEventEmitter: Event-based telemetry emission using Node.js EventEmitter
- MetricsAggregator: Per-statement aggregation with batch processing

TelemetryEventEmitter:
- Event-driven architecture using Node.js EventEmitter
- Type-safe event emission methods
- Respects telemetryEnabled configuration flag
- All exceptions swallowed and logged at debug level
- Zero impact when disabled

Event Types:
- connection.open: On successful connection
- statement.start: On statement execution
- statement.complete: On statement finish
- cloudfetch.chunk: On chunk download
- error: On exception with terminal classification

MetricsAggregator:
- Per-statement aggregation by statement_id
- Connection events emitted immediately (no aggregation)
- Statement events buffered until completeStatement() called
- Terminal exceptions flushed immediately
- Retryable exceptions buffered until statement complete
- Batch size (default 100) triggers flush
- Periodic timer (default 5s) triggers flush

Batching Strategy:
- Optimizes export efficiency
- Reduces HTTP overhead
- Smart flushing based on error criticality
- Memory efficient with bounded buffers

Testing:
- 31 comprehensive unit tests for TelemetryEventEmitter
- 32 comprehensive unit tests for MetricsAggregator
- 100% function coverage, >90% line/branch coverage
- Tests verify exception swallowing
- Tests verify debug-only logging

Dependencies:
- Builds on [1/7] Types, [2/7] Infrastructure, [3/7] Client Management
1 parent 68652de commit 97f2106

File tree

4 files changed

+2193
-0
lines changed

4 files changed

+2193
-0
lines changed

lib/telemetry/MetricsAggregator.ts

Lines changed: 377 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,377 @@
1+
/**
2+
* Copyright (c) 2025 Databricks Contributors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
import IClientContext from '../contracts/IClientContext';
18+
import { LogLevel } from '../contracts/IDBSQLLogger';
19+
import {
20+
TelemetryEvent,
21+
TelemetryEventType,
22+
TelemetryMetric,
23+
DEFAULT_TELEMETRY_CONFIG,
24+
} from './types';
25+
import DatabricksTelemetryExporter from './DatabricksTelemetryExporter';
26+
import ExceptionClassifier from './ExceptionClassifier';
27+
28+
/**
 * Mutable per-statement accumulator used while a statement is in flight.
 *
 * One record is kept per statement id; it collects the values reported by the
 * statement's telemetry events until the statement is completed and turned
 * into metrics.
 */
interface StatementTelemetryDetails {
  // --- Identity -----------------------------------------------------------

  /** Identifier of the statement this record aggregates. */
  statementId: string;

  /** Session the statement belongs to, taken from the event that created the record. */
  sessionId: string;

  /** Workspace identifier, when the creating event carried one. */
  workspaceId?: string;

  // --- Execution ----------------------------------------------------------

  /** Operation type reported by the statement-start event. */
  operationType?: string;

  /** Timestamp of the first event seen for the statement; replaced by the statement-start timestamp. */
  startTime: number;

  /** Latency reported by the statement-complete event. */
  executionLatencyMs?: number;

  /** Result format reported by the statement-complete event. */
  resultFormat?: string;

  // --- Result download counters --------------------------------------------

  /** Number of result chunks recorded for the statement. */
  chunkCount: number;

  /** Number of bytes recorded as downloaded for the statement. */
  bytesDownloaded: number;

  /** Number of polls recorded for the statement. */
  pollCount: number;

  /** Compression flag from the most recent chunk event that specified one. */
  compressionEnabled?: boolean;

  // --- Errors ---------------------------------------------------------------

  /** Error events buffered for this statement until it is completed. */
  errors: TelemetryEvent[];
}
45+
46+
/**
47+
* Aggregates telemetry events by statement_id and manages batching/flushing.
48+
*
49+
* Features:
50+
* - Aggregates events by statement_id
51+
* - Connection events emitted immediately (no aggregation)
52+
* - Statement events buffered until completeStatement() called
53+
* - Terminal exceptions flushed immediately
54+
* - Retryable exceptions buffered until statement complete
55+
* - Batch size and periodic timer trigger flushes
56+
* - CRITICAL: All exceptions swallowed and logged at LogLevel.debug ONLY
57+
* - CRITICAL: NO console logging
58+
*
59+
* Follows JDBC TelemetryCollector.java:29-30 pattern.
60+
*/
61+
export default class MetricsAggregator {
62+
private statementMetrics: Map<string, StatementTelemetryDetails> = new Map();
63+
64+
private pendingMetrics: TelemetryMetric[] = [];
65+
66+
private flushTimer: NodeJS.Timeout | null = null;
67+
68+
private batchSize: number;
69+
70+
private flushIntervalMs: number;
71+
72+
constructor(
73+
private context: IClientContext,
74+
private exporter: DatabricksTelemetryExporter
75+
) {
76+
try {
77+
const config = context.getConfig();
78+
this.batchSize = config.telemetryBatchSize ?? DEFAULT_TELEMETRY_CONFIG.batchSize;
79+
this.flushIntervalMs = config.telemetryFlushIntervalMs ?? DEFAULT_TELEMETRY_CONFIG.flushIntervalMs;
80+
81+
// Start periodic flush timer
82+
this.startFlushTimer();
83+
} catch (error: any) {
84+
// CRITICAL: All exceptions swallowed and logged at debug level ONLY
85+
const logger = this.context.getLogger();
86+
logger.log(LogLevel.debug, `MetricsAggregator constructor error: ${error.message}`);
87+
88+
// Initialize with default values
89+
this.batchSize = DEFAULT_TELEMETRY_CONFIG.batchSize;
90+
this.flushIntervalMs = DEFAULT_TELEMETRY_CONFIG.flushIntervalMs;
91+
}
92+
}
93+
94+
/**
95+
* Process a telemetry event. Never throws.
96+
*
97+
* @param event - The telemetry event to process
98+
*/
99+
processEvent(event: TelemetryEvent): void {
100+
const logger = this.context.getLogger();
101+
102+
try {
103+
// Connection events are emitted immediately (no aggregation)
104+
if (event.eventType === TelemetryEventType.CONNECTION_OPEN) {
105+
this.processConnectionEvent(event);
106+
return;
107+
}
108+
109+
// Error events - check if terminal or retryable
110+
if (event.eventType === TelemetryEventType.ERROR) {
111+
this.processErrorEvent(event);
112+
return;
113+
}
114+
115+
// Statement events - buffer until complete
116+
if (event.statementId) {
117+
this.processStatementEvent(event);
118+
}
119+
} catch (error: any) {
120+
// CRITICAL: All exceptions swallowed and logged at debug level ONLY
121+
logger.log(LogLevel.debug, `MetricsAggregator.processEvent error: ${error.message}`);
122+
}
123+
}
124+
125+
/**
126+
* Process connection event (emit immediately)
127+
*/
128+
private processConnectionEvent(event: TelemetryEvent): void {
129+
const metric: TelemetryMetric = {
130+
metricType: 'connection',
131+
timestamp: event.timestamp,
132+
sessionId: event.sessionId,
133+
workspaceId: event.workspaceId,
134+
driverConfig: event.driverConfig,
135+
};
136+
137+
this.addPendingMetric(metric);
138+
}
139+
140+
/**
141+
* Process error event (terminal errors flushed immediately, retryable buffered)
142+
*/
143+
private processErrorEvent(event: TelemetryEvent): void {
144+
const logger = this.context.getLogger();
145+
146+
// Create error object for classification
147+
const error: any = new Error(event.errorMessage || 'Unknown error');
148+
error.name = event.errorName || 'UnknownError';
149+
150+
// Check if terminal using isTerminal field or ExceptionClassifier
151+
const isTerminal = event.isTerminal ?? ExceptionClassifier.isTerminal(error);
152+
153+
if (isTerminal) {
154+
// Terminal error - flush immediately
155+
logger.log(LogLevel.debug, `Terminal error detected - flushing immediately`);
156+
157+
// If associated with a statement, complete and flush it
158+
if (event.statementId && this.statementMetrics.has(event.statementId)) {
159+
const details = this.statementMetrics.get(event.statementId)!;
160+
details.errors.push(event);
161+
this.completeStatement(event.statementId);
162+
} else {
163+
// Standalone error - emit immediately
164+
const metric: TelemetryMetric = {
165+
metricType: 'error',
166+
timestamp: event.timestamp,
167+
sessionId: event.sessionId,
168+
statementId: event.statementId,
169+
workspaceId: event.workspaceId,
170+
errorName: event.errorName,
171+
errorMessage: event.errorMessage,
172+
};
173+
this.addPendingMetric(metric);
174+
}
175+
176+
// Flush immediately for terminal errors
177+
this.flush();
178+
} else if (event.statementId) {
179+
// Retryable error - buffer until statement complete
180+
const details = this.getOrCreateStatementDetails(event);
181+
details.errors.push(event);
182+
}
183+
}
184+
185+
/**
186+
* Process statement event (buffer until complete)
187+
*/
188+
private processStatementEvent(event: TelemetryEvent): void {
189+
const details = this.getOrCreateStatementDetails(event);
190+
191+
switch (event.eventType) {
192+
case TelemetryEventType.STATEMENT_START:
193+
details.operationType = event.operationType;
194+
details.startTime = event.timestamp;
195+
break;
196+
197+
case TelemetryEventType.STATEMENT_COMPLETE:
198+
details.executionLatencyMs = event.latencyMs;
199+
details.resultFormat = event.resultFormat;
200+
details.chunkCount = event.chunkCount ?? 0;
201+
details.bytesDownloaded = event.bytesDownloaded ?? 0;
202+
details.pollCount = event.pollCount ?? 0;
203+
break;
204+
205+
case TelemetryEventType.CLOUDFETCH_CHUNK:
206+
details.chunkCount += 1;
207+
details.bytesDownloaded += event.bytes ?? 0;
208+
if (event.compressed !== undefined) {
209+
details.compressionEnabled = event.compressed;
210+
}
211+
break;
212+
213+
default:
214+
// Unknown event type - ignore
215+
break;
216+
}
217+
}
218+
219+
/**
220+
* Get or create statement details for the given event
221+
*/
222+
private getOrCreateStatementDetails(event: TelemetryEvent): StatementTelemetryDetails {
223+
const statementId = event.statementId!;
224+
225+
if (!this.statementMetrics.has(statementId)) {
226+
this.statementMetrics.set(statementId, {
227+
statementId,
228+
sessionId: event.sessionId!,
229+
workspaceId: event.workspaceId,
230+
startTime: event.timestamp,
231+
chunkCount: 0,
232+
bytesDownloaded: 0,
233+
pollCount: 0,
234+
errors: [],
235+
});
236+
}
237+
238+
return this.statementMetrics.get(statementId)!;
239+
}
240+
241+
/**
242+
* Complete a statement and prepare it for flushing. Never throws.
243+
*
244+
* @param statementId - The statement ID to complete
245+
*/
246+
completeStatement(statementId: string): void {
247+
const logger = this.context.getLogger();
248+
249+
try {
250+
const details = this.statementMetrics.get(statementId);
251+
if (!details) {
252+
return;
253+
}
254+
255+
// Create statement metric
256+
const metric: TelemetryMetric = {
257+
metricType: 'statement',
258+
timestamp: details.startTime,
259+
sessionId: details.sessionId,
260+
statementId: details.statementId,
261+
workspaceId: details.workspaceId,
262+
latencyMs: details.executionLatencyMs,
263+
resultFormat: details.resultFormat,
264+
chunkCount: details.chunkCount,
265+
bytesDownloaded: details.bytesDownloaded,
266+
pollCount: details.pollCount,
267+
};
268+
269+
this.addPendingMetric(metric);
270+
271+
// Add buffered error metrics
272+
for (const errorEvent of details.errors) {
273+
const errorMetric: TelemetryMetric = {
274+
metricType: 'error',
275+
timestamp: errorEvent.timestamp,
276+
sessionId: details.sessionId,
277+
statementId: details.statementId,
278+
workspaceId: details.workspaceId,
279+
errorName: errorEvent.errorName,
280+
errorMessage: errorEvent.errorMessage,
281+
};
282+
this.addPendingMetric(errorMetric);
283+
}
284+
285+
// Remove from map
286+
this.statementMetrics.delete(statementId);
287+
} catch (error: any) {
288+
// CRITICAL: All exceptions swallowed and logged at debug level ONLY
289+
logger.log(LogLevel.debug, `MetricsAggregator.completeStatement error: ${error.message}`);
290+
}
291+
}
292+
293+
/**
294+
* Add a metric to pending batch and flush if batch size reached
295+
*/
296+
private addPendingMetric(metric: TelemetryMetric): void {
297+
this.pendingMetrics.push(metric);
298+
299+
// Check if batch size reached
300+
if (this.pendingMetrics.length >= this.batchSize) {
301+
this.flush();
302+
}
303+
}
304+
305+
/**
306+
* Flush all pending metrics to exporter. Never throws.
307+
*/
308+
flush(): void {
309+
const logger = this.context.getLogger();
310+
311+
try {
312+
if (this.pendingMetrics.length === 0) {
313+
return;
314+
}
315+
316+
const metricsToExport = [...this.pendingMetrics];
317+
this.pendingMetrics = [];
318+
319+
logger.log(LogLevel.debug, `Flushing ${metricsToExport.length} telemetry metrics`);
320+
321+
// Export metrics (exporter.export never throws)
322+
this.exporter.export(metricsToExport);
323+
} catch (error: any) {
324+
// CRITICAL: All exceptions swallowed and logged at debug level ONLY
325+
logger.log(LogLevel.debug, `MetricsAggregator.flush error: ${error.message}`);
326+
}
327+
}
328+
329+
/**
330+
* Start the periodic flush timer
331+
*/
332+
private startFlushTimer(): void {
333+
const logger = this.context.getLogger();
334+
335+
try {
336+
if (this.flushTimer) {
337+
clearInterval(this.flushTimer);
338+
}
339+
340+
this.flushTimer = setInterval(() => {
341+
this.flush();
342+
}, this.flushIntervalMs);
343+
344+
// Prevent timer from keeping Node.js process alive
345+
this.flushTimer.unref();
346+
} catch (error: any) {
347+
// CRITICAL: All exceptions swallowed and logged at debug level ONLY
348+
logger.log(LogLevel.debug, `MetricsAggregator.startFlushTimer error: ${error.message}`);
349+
}
350+
}
351+
352+
/**
353+
* Close the aggregator and flush remaining metrics. Never throws.
354+
*/
355+
close(): void {
356+
const logger = this.context.getLogger();
357+
358+
try {
359+
// Stop flush timer
360+
if (this.flushTimer) {
361+
clearInterval(this.flushTimer);
362+
this.flushTimer = null;
363+
}
364+
365+
// Complete any remaining statements
366+
for (const statementId of this.statementMetrics.keys()) {
367+
this.completeStatement(statementId);
368+
}
369+
370+
// Final flush
371+
this.flush();
372+
} catch (error: any) {
373+
// CRITICAL: All exceptions swallowed and logged at debug level ONLY
374+
logger.log(LogLevel.debug, `MetricsAggregator.close error: ${error.message}`);
375+
}
376+
}
377+
}

0 commit comments

Comments
 (0)