
Databricks Node.js SQL Driver: Event-Based Telemetry Design

Executive Summary

This document outlines an event-based telemetry design for the Databricks Node.js SQL driver that leverages Node.js's native EventEmitter infrastructure. The design is inspired by the production-tested patterns from the Databricks JDBC driver and adapted to Node.js idioms.

Key Objectives:

  • Collect driver usage metrics and export to Databricks telemetry service
  • Leverage Node.js EventEmitter for instrumentation
  • Maintain server-side feature flag control
  • Non-blocking, async operation using Promises
  • Privacy-first: No PII or query data collected

Design Principles:

  • Event-driven architecture: Use Node.js EventEmitter pattern
  • Single instrumentation point: Emit events at key driver operations
  • Non-blocking: All operations async with Promises
  • Privacy-first: No PII or query data collected
  • Server-controlled: Feature flag support for enable/disable

Production Requirements (from JDBC driver experience):

  • Feature flag caching: Per-host caching to avoid rate limiting
  • Circuit breaker: Protect against telemetry endpoint failures
  • 🚨 Exception swallowing: ALL telemetry exceptions caught and logged at LogLevel.debug ONLY (never warn/error)
  • Per-host telemetry client: One client per host to prevent rate limiting
  • Graceful shutdown: Proper cleanup with reference counting
  • Smart exception flushing: Only flush terminal exceptions immediately

Table of Contents

  1. Background & Motivation
  2. Architecture Overview
  3. Core Components
  4. Data Collection
  5. Export Mechanism
  6. Configuration
  7. Privacy & Compliance
  8. Error Handling
  9. Graceful Shutdown
  10. Testing Strategy
  11. Implementation Checklist
  12. Open Questions
  13. References

1. Background & Motivation

1.1 Current State

The Databricks Node.js SQL driver currently consists of the following components:

  • DBSQLClient: Main client class for connection management
  • DBSQLSession: Session management with operation tracking
  • DBSQLOperation: Statement execution and result handling
  • EventEmitter: Built-in Node.js event infrastructure
  • HttpConnection: HTTP-based Thrift communication

1.2 Design Opportunity

The driver needs comprehensive telemetry to:

  • Track driver usage patterns and performance metrics
  • Monitor CloudFetch adoption and effectiveness
  • Identify performance bottlenecks and optimization opportunities
  • Provide data for product decisions and customer support

1.3 The Approach

Event-driven telemetry collection:

  • ✅ Emit telemetry events at key driver operations
  • ✅ Aggregate metrics by statement ID
  • ✅ Export batched data to Databricks service
  • ✅ Maintain correlation between sessions and statements
  • ✅ Follow JDBC driver patterns (per-host clients, circuit breaker, etc.)

2. Architecture Overview

2.1 High-Level Architecture

graph TB
    A[Driver Operations] -->|Emit Events| B[TelemetryEventEmitter]
    B -->|Process Events| C[MetricsAggregator]
    C -->|Batch & Buffer| D[TelemetryClientManager]
    D -->|Get Per-Host Client| E[TelemetryClient per Host]
    E -->|Check Circuit Breaker| F[CircuitBreakerWrapper]
    F -->|HTTP POST| G[DatabricksTelemetryExporter]
    G --> H[Databricks Service]
    H --> I[Lumberjack]

    J[FeatureFlagCache per Host] -.->|Enable/Disable| B
    K[Connection Open] -->|Increment RefCount| D
    K -->|Increment RefCount| J
    L[Connection Close] -->|Decrement RefCount| D
    L -->|Decrement RefCount| J

    style B fill:#e1f5fe
    style C fill:#e1f5fe
    style D fill:#ffe0b2
    style E fill:#ffe0b2
    style F fill:#ffccbc
    style J fill:#c8e6c9

Key Components:

  1. TelemetryEventEmitter (new): Extends EventEmitter, emits events at key operations
  2. FeatureFlagCache (new): Per-host caching of feature flags with reference counting
  3. TelemetryClientManager (new): Manages one telemetry client per host with reference counting
  4. CircuitBreakerWrapper (new): Protects against failing telemetry endpoint
  5. MetricsAggregator (new): Aggregates by statement, batches events
  6. DatabricksTelemetryExporter (new): Exports to Databricks service

2.2 Event Flow

sequenceDiagram
    participant App as Application
    participant Client as DBSQLClient
    participant Session as DBSQLSession
    participant Op as DBSQLOperation
    participant Emitter as TelemetryEventEmitter
    participant Agg as MetricsAggregator
    participant Exp as TelemetryExporter
    participant Service as Databricks Service

    App->>Client: connect()
    Client->>Emitter: emit('connection.open', data)

    App->>Session: executeStatement()
    Session->>Op: execute()
    Op->>Emitter: emit('statement.start', data)

    Op->>Op: Download CloudFetch chunks
    Op->>Emitter: emit('cloudfetch.chunk', data)

    Op->>Emitter: emit('statement.complete', data)
    Emitter->>Agg: aggregateEvent(event)
    Agg->>Agg: Buffer by statement_id

    alt Batch threshold reached
        Agg->>Exp: flush(batch)
        Exp->>Service: POST /telemetry-ext
    end
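The flow in the sequence diagram can be sketched with Node's built-in `EventEmitter`: an operation emits lifecycle events, and a listener buffers them per statement until `statement.complete` triggers a flush. The event names mirror the diagram; the buffering logic here is an illustrative stand-in for MetricsAggregator, not the real implementation.

```typescript
import { EventEmitter } from 'events';

const emitter = new EventEmitter();
const buffers = new Map<string, object[]>(); // per-statement event buffers
const flushed: object[][] = [];              // batches handed to the exporter

emitter.on('statement.start', (e: { statementId: string }) => {
  buffers.set(e.statementId, []);
});
emitter.on('cloudfetch.chunk', (e: { statementId: string; bytes: number }) => {
  buffers.get(e.statementId)?.push(e);
});
emitter.on('statement.complete', (e: { statementId: string }) => {
  const events = buffers.get(e.statementId) ?? [];
  buffers.delete(e.statementId);
  flushed.push(events); // in the real design this feeds the exporter batch
});

// Simulate one statement with two CloudFetch chunk downloads.
emitter.emit('statement.start', { statementId: 's-1' });
emitter.emit('cloudfetch.chunk', { statementId: 's-1', bytes: 1024 });
emitter.emit('cloudfetch.chunk', { statementId: 's-1', bytes: 2048 });
emitter.emit('statement.complete', { statementId: 's-1' });
```

Because `EventEmitter` invokes listeners synchronously, the hot path stays cheap; any I/O happens later during batch export.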

3. Core Components

3.1 FeatureFlagCache (Per-Host)

Purpose: Cache feature flag values at the host level to avoid repeated API calls and rate limiting.

Location: lib/telemetry/FeatureFlagCache.ts

Rationale

  • Per-host caching: Feature flags cached by host (not per connection) to prevent rate limiting
  • Reference counting: Tracks number of connections per host for proper cleanup
  • Automatic expiration: Refreshes cached flags after TTL expires (15 minutes)
  • Concurrency-safe: Cache access runs on the single Node.js event loop, so no explicit locking is required for concurrent connections

Interface

// lib/telemetry/FeatureFlagCache.ts

import IClientContext from '../contracts/IClientContext';
import { LogLevel } from '../contracts/IDBSQLLogger';

/**
 * Context holding feature flag state for a specific host.
 */
interface FeatureFlagContext {
  telemetryEnabled?: boolean;
  lastFetched?: Date;
  refCount: number;
  cacheDuration: number; // 15 minutes in ms
}

/**
 * Manages feature flag cache per host.
 * Prevents rate limiting by caching feature flag responses.
 * Instance-based, stored in DBSQLClient.
 */
class FeatureFlagCache {
  private contexts: Map<string, FeatureFlagContext>;
  private readonly CACHE_DURATION_MS = 15 * 60 * 1000; // 15 minutes
  private readonly FEATURE_FLAG_NAME = 'databricks.partnerplatform.clientConfigsFeatureFlags.enableTelemetryForNodeJs';

  constructor(private context: IClientContext) {
    this.contexts = new Map();
  }

  /**
   * Gets or creates a feature flag context for the host.
   * Increments reference count.
   */
  getOrCreateContext(host: string): FeatureFlagContext {
    let ctx = this.contexts.get(host);
    if (!ctx) {
      ctx = {
        refCount: 0,
        cacheDuration: this.CACHE_DURATION_MS,
      };
      this.contexts.set(host, ctx);
    }
    ctx.refCount++;
    return ctx;
  }

  /**
   * Decrements reference count for the host.
   * Removes context when ref count reaches zero.
   */
  releaseContext(host: string): void {
    const ctx = this.contexts.get(host);
    if (ctx) {
      ctx.refCount--;
      if (ctx.refCount <= 0) {
        this.contexts.delete(host);
      }
    }
  }

  /**
   * Checks if telemetry is enabled for the host.
   * Uses cached value if available and not expired.
   */
  async isTelemetryEnabled(host: string): Promise<boolean> {
    const logger = this.context.getLogger();
    const ctx = this.contexts.get(host);

    if (!ctx) {
      return false;
    }

    const isExpired = !ctx.lastFetched || Date.now() - ctx.lastFetched.getTime() > ctx.cacheDuration;

    if (isExpired) {
      try {
        // Fetch feature flag from server
        ctx.telemetryEnabled = await this.fetchFeatureFlag(host);
        ctx.lastFetched = new Date();
      } catch (error: any) {
        // Log at debug level only
        logger.log(LogLevel.debug, `Error fetching feature flag: ${error.message}`);
      }
    }

    return ctx.telemetryEnabled ?? false;
  }

  private async fetchFeatureFlag(host: string): Promise<boolean> {
    const connectionProvider = await this.context.getConnectionProvider();
    // Implementation to fetch feature flag from server using connection provider
    // Returns true if enabled, false otherwise
    return false; // Placeholder
  }
}

export default FeatureFlagCache;

JDBC Reference: DatabricksDriverFeatureFlagsContextFactory.java:27 maintains per-compute (host) feature flag contexts with reference counting.
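The TTL-plus-refcount behavior above can be demonstrated with a standalone sketch using an injectable clock, so cache expiry is testable without waiting 15 minutes. `MiniFlagCache` is an illustrative stand-in, not the driver class; the `fetch` callback simulates the server round-trip.

```typescript
interface Entry {
  value?: boolean;
  lastFetched?: number;
  refCount: number;
}

class MiniFlagCache {
  private entries = new Map<string, Entry>();
  fetches = 0; // counts simulated server round-trips, to show caching works

  constructor(private ttlMs: number, private now: () => number) {}

  acquire(host: string): void {
    const e = this.entries.get(host) ?? { refCount: 0 };
    e.refCount++;
    this.entries.set(host, e);
  }

  release(host: string): void {
    const e = this.entries.get(host);
    if (e && --e.refCount <= 0) this.entries.delete(host);
  }

  isEnabled(host: string, fetch: () => boolean): boolean {
    const e = this.entries.get(host);
    if (!e) return false; // no connection has registered this host
    const expired = e.lastFetched === undefined || this.now() - e.lastFetched > this.ttlMs;
    if (expired) {
      e.value = fetch();
      e.lastFetched = this.now();
      this.fetches++;
    }
    return e.value ?? false;
  }
}

let clock = 0;
const cache = new MiniFlagCache(15 * 60 * 1000, () => clock);
cache.acquire('adb.example.com');
cache.isEnabled('adb.example.com', () => true);   // first call hits the "server"
cache.isEnabled('adb.example.com', () => true);   // served from cache, no fetch
clock = 16 * 60 * 1000;                           // advance past the TTL
const finalEnabled = cache.isEnabled('adb.example.com', () => true); // refetches
```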


3.2 TelemetryClientProvider (Per-Host)

Purpose: Manage one telemetry client per host to prevent rate limiting from concurrent connections.

Location: lib/telemetry/TelemetryClientProvider.ts

Implementation Status: ✅ COMPLETED (Task 1.6)

Rationale

  • One client per host: Large customers open many parallel connections to the same host
  • Prevents rate limiting: Shared client batches events from all connections
  • Reference counting: Tracks active connections, only closes client when last connection closes
  • Thread-safe: Safe for concurrent access from multiple connections

Implementation Details

Key Features Implemented:

  • ✅ TelemetryClientProvider takes IClientContext in constructor
  • ✅ One TelemetryClient created per host with reference counting
  • ✅ Client shared across multiple connections to same host
  • ✅ Reference count increments on getOrCreateClient()
  • ✅ Reference count decrements on releaseClient()
  • ✅ Client closed only when refCount reaches zero
  • ✅ Client NOT closed while other connections exist
  • ✅ All logging at LogLevel.debug only via IDBSQLLogger
  • ✅ All exceptions swallowed with debug-level logging
  • ✅ Per-host client isolation
  • ✅ Comprehensive unit tests with 100% code coverage

Test Coverage:

  • 39 unit tests covering all functionality
  • 100% line coverage for both TelemetryClient and TelemetryClientProvider
  • 100% branch coverage

Test Scenarios:

  1. Provider creation and initialization
  2. One client per host creation and sharing
  3. Reference counting (increment/decrement)
  4. Client closure on zero refCount
  5. Client NOT closed while connections exist
  6. Per-host isolation
  7. Context passing to TelemetryClient
  8. Debug-level logging only
  9. Exception swallowing

Interface

// lib/telemetry/TelemetryClientProvider.ts

import IClientContext from '../contracts/IClientContext';
import TelemetryClient from './TelemetryClient';

/**
 * Holds a telemetry client and its reference count.
 */
interface TelemetryClientHolder {
  client: TelemetryClient;
  refCount: number;
}

/**
 * Manages one telemetry client per host.
 * Prevents rate limiting by sharing clients across connections.
 * Instance-based, stored in DBSQLClient.
 */
class TelemetryClientProvider {
  private clients: Map<string, TelemetryClientHolder>;

  constructor(private context: IClientContext) {
    this.clients = new Map();
  }

  /**
   * Gets or creates a telemetry client for the host.
   * Increments reference count.
   */
  getOrCreateClient(host: string): TelemetryClient {
    let holder = this.clients.get(host);

    if (!holder) {
      holder = {
        client: new TelemetryClient(this.context, host),
        refCount: 0,
      };
      this.clients.set(host, holder);
    }
    holder.refCount++;
    return holder.client;
  }

  /**
   * Decrements reference count for the host.
   * Closes and removes client when ref count reaches zero.
   */
  async releaseClient(host: string): Promise<void> {
    const holder = this.clients.get(host);
    if (holder) {
      holder.refCount--;
      if (holder.refCount <= 0) {
        await holder.client.close();
        this.clients.delete(host);
      }
    }
  }
}

export default TelemetryClientProvider;

JDBC Reference: TelemetryClientFactory.java:27 maintains ConcurrentHashMap<String, TelemetryClientHolder> with per-host clients and reference counting.
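The key property of the provider — the client is shared per host and closed only when the last connection releases it — can be shown with a minimal sketch. `StubClient` and `MiniProvider` are illustrative names; only the bookkeeping from the section above is modeled.

```typescript
class StubClient {
  closed = false;
  close(): void {
    this.closed = true;
  }
}

class MiniProvider {
  private clients = new Map<string, { client: StubClient; refCount: number }>();

  getOrCreateClient(host: string): StubClient {
    let holder = this.clients.get(host);
    if (!holder) {
      holder = { client: new StubClient(), refCount: 0 };
      this.clients.set(host, holder);
    }
    holder.refCount++;
    return holder.client;
  }

  releaseClient(host: string): void {
    const holder = this.clients.get(host);
    if (holder && --holder.refCount <= 0) {
      holder.client.close(); // close only when the last connection releases
      this.clients.delete(host);
    }
  }
}

const provider = new MiniProvider();
const a = provider.getOrCreateClient('adb.example.com'); // connection 1
const b = provider.getOrCreateClient('adb.example.com'); // connection 2
const shared = a === b;                  // same client for the same host

provider.releaseClient('adb.example.com');
const stillOpen = !a.closed;             // one connection remains: stays open
provider.releaseClient('adb.example.com');
const nowClosed = a.closed;              // last release closes the client
```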


3.3 Circuit Breaker

Purpose: Implement circuit breaker pattern to protect against failing telemetry endpoint.

Location: lib/telemetry/CircuitBreaker.ts

Implementation Status: ✅ COMPLETED (Task 1.3)

Rationale

  • Endpoint protection: The telemetry endpoint itself may fail or become unavailable
  • Not just rate limiting: Protects against 5xx errors, timeouts, network failures
  • Resource efficiency: Prevents wasting resources on a failing endpoint
  • Auto-recovery: Automatically detects when endpoint becomes healthy again

States

  1. Closed: Normal operation, requests pass through
  2. Open: After threshold failures, all requests rejected immediately (drop events)
  3. Half-Open: After timeout, allows test requests to check if endpoint recovered

Implementation Details

Key Features Implemented:

  • ✅ Three-state circuit breaker (CLOSED, OPEN, HALF_OPEN)
  • ✅ Configurable failure threshold (default: 5 consecutive failures)
  • ✅ Configurable timeout period (default: 60 seconds)
  • ✅ Configurable success threshold in HALF_OPEN (default: 2 successes)
  • ✅ Per-host circuit breaker isolation via CircuitBreakerRegistry
  • ✅ All state transitions logged at LogLevel.debug via IDBSQLLogger
  • ✅ No console logging used
  • ✅ Comprehensive unit tests with 100% code coverage

Default Configuration:

{
  failureThreshold: 5,      // Open after 5 consecutive failures
  timeout: 60000,           // Stay open for 60 seconds (1 minute)
  successThreshold: 2,      // Close after 2 successes in HALF_OPEN
}

State Transition Logic:

  • CLOSED → OPEN: After failureThreshold consecutive failures
  • OPEN → HALF_OPEN: After timeout milliseconds
  • HALF_OPEN → CLOSED: After successThreshold consecutive successes
  • HALF_OPEN → OPEN: On any failure, resets to failure counting
  • Success in CLOSED: Resets the failure count; the circuit stays CLOSED
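The transition rules above can be exercised with a compact synchronous sketch that uses an injectable clock, so the OPEN → HALF_OPEN timeout is testable without waiting. `MiniBreaker` is an illustrative stand-in for the full implementation, not the driver class.

```typescript
type State = 'CLOSED' | 'OPEN' | 'HALF_OPEN';

class MiniBreaker {
  state: State = 'CLOSED';
  private failures = 0;
  private successes = 0;
  private nextAttempt = 0;

  constructor(
    private failureThreshold = 5,
    private timeout = 60_000,
    private successThreshold = 2,
    private now: () => number = Date.now,
  ) {}

  run<T>(op: () => T): T {
    if (this.state === 'OPEN') {
      if (this.now() < this.nextAttempt) throw new Error('Circuit breaker OPEN');
      this.state = 'HALF_OPEN'; // timeout expired: allow a probe request
      this.successes = 0;
    }
    try {
      const result = op();
      this.onSuccess();
      return result;
    } catch (e) {
      this.onFailure();
      throw e;
    }
  }

  private onSuccess(): void {
    this.failures = 0;
    if (this.state === 'HALF_OPEN' && ++this.successes >= this.successThreshold) {
      this.state = 'CLOSED';
    }
  }

  private onFailure(): void {
    this.successes = 0;
    this.failures++;
    // Any failure in HALF_OPEN reopens immediately; CLOSED trips at threshold.
    if (this.state === 'HALF_OPEN' || this.failures >= this.failureThreshold) {
      this.state = 'OPEN';
      this.nextAttempt = this.now() + this.timeout;
    }
  }
}

// Drive the breaker through a full cycle with a fake clock.
let t = 0;
const breaker = new MiniBreaker(2, 1000, 2, () => t);
const fail = () => breaker.run(() => { throw new Error('boom'); });

try { fail(); } catch {}             // failure 1/2 — still CLOSED
try { fail(); } catch {}             // failure 2/2 — trips to OPEN
const afterTrip = breaker.state;     // 'OPEN'

t = 2000;                            // advance past the 1000 ms timeout
breaker.run(() => 'ok');             // probe 1/2 — HALF_OPEN
breaker.run(() => 'ok');             // probe 2/2 — recovers
const afterRecovery = breaker.state; // 'CLOSED'
```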

Interface

// lib/telemetry/CircuitBreaker.ts

import IClientContext from '../contracts/IClientContext';
import { LogLevel } from '../contracts/IDBSQLLogger';

export enum CircuitBreakerState {
  CLOSED = 'CLOSED',
  OPEN = 'OPEN',
  HALF_OPEN = 'HALF_OPEN',
}

export interface CircuitBreakerConfig {
  failureThreshold: number; // Open after N failures
  timeout: number; // Try again after N ms
  successThreshold: number; // Close after N successes
}

export const DEFAULT_CIRCUIT_BREAKER_CONFIG: CircuitBreakerConfig = {
  failureThreshold: 5,
  timeout: 60000, // 1 minute
  successThreshold: 2,
};

/**
 * Circuit breaker for telemetry exporter.
 */
export class CircuitBreaker {
  private state: CircuitBreakerState = CircuitBreakerState.CLOSED;
  private failureCount = 0;
  private successCount = 0;
  private nextAttempt?: Date;
  private readonly config: CircuitBreakerConfig;

  constructor(private context: IClientContext, config?: Partial<CircuitBreakerConfig>) {
    this.config = {
      ...DEFAULT_CIRCUIT_BREAKER_CONFIG,
      ...config,
    };
  }

  async execute<T>(operation: () => Promise<T>): Promise<T> {
    const logger = this.context.getLogger();

    // Check if circuit is open
    if (this.state === CircuitBreakerState.OPEN) {
      if (this.nextAttempt && Date.now() < this.nextAttempt.getTime()) {
        throw new Error('Circuit breaker OPEN');
      }
      // Timeout expired, transition to HALF_OPEN
      this.state = CircuitBreakerState.HALF_OPEN;
      this.successCount = 0;
      logger.log(LogLevel.debug, 'Circuit breaker transitioned to HALF_OPEN');
    }

    try {
      const result = await operation();
      this.onSuccess();
      return result;
    } catch (error) {
      this.onFailure();
      throw error;
    }
  }

  getState(): CircuitBreakerState {
    return this.state;
  }

  getFailureCount(): number {
    return this.failureCount;
  }

  getSuccessCount(): number {
    return this.successCount;
  }

  private onSuccess(): void {
    const logger = this.context.getLogger();
    this.failureCount = 0;

    if (this.state === CircuitBreakerState.HALF_OPEN) {
      this.successCount++;
      logger.log(
        LogLevel.debug,
        `Circuit breaker success in HALF_OPEN (${this.successCount}/${this.config.successThreshold})`,
      );

      if (this.successCount >= this.config.successThreshold) {
        this.state = CircuitBreakerState.CLOSED;
        this.successCount = 0;
        this.nextAttempt = undefined;
        logger.log(LogLevel.debug, 'Circuit breaker transitioned to CLOSED');
      }
    }
  }

  private onFailure(): void {
    const logger = this.context.getLogger();
    this.failureCount++;
    this.successCount = 0;

    logger.log(LogLevel.debug, `Circuit breaker failure (${this.failureCount}/${this.config.failureThreshold})`);

    if (this.failureCount >= this.config.failureThreshold) {
      this.state = CircuitBreakerState.OPEN;
      this.nextAttempt = new Date(Date.now() + this.config.timeout);
      logger.log(LogLevel.debug, `Circuit breaker transitioned to OPEN (will retry after ${this.config.timeout}ms)`);
    }
  }
}

/**
 * Manages circuit breakers per host.
 * Ensures each host has its own isolated circuit breaker to prevent
 * failures on one host from affecting telemetry to other hosts.
 */
export class CircuitBreakerRegistry {
  private breakers: Map<string, CircuitBreaker>;

  constructor(private context: IClientContext) {
    this.breakers = new Map();
  }

  getCircuitBreaker(host: string, config?: Partial<CircuitBreakerConfig>): CircuitBreaker {
    let breaker = this.breakers.get(host);
    if (!breaker) {
      breaker = new CircuitBreaker(this.context, config);
      this.breakers.set(host, breaker);
      const logger = this.context.getLogger();
      logger.log(LogLevel.debug, `Created circuit breaker for host: ${host}`);
    }
    return breaker;
  }

  getAllBreakers(): Map<string, CircuitBreaker> {
    return new Map(this.breakers);
  }

  removeCircuitBreaker(host: string): void {
    this.breakers.delete(host);
    const logger = this.context.getLogger();
    logger.log(LogLevel.debug, `Removed circuit breaker for host: ${host}`);
  }

  clear(): void {
    this.breakers.clear();
  }
}

Test Coverage

Unit Tests (tests/unit/telemetry/CircuitBreaker.test.ts):

  • ✅ 32 test cases covering all functionality
  • ✅ 100% line coverage (61/61 lines)
  • ✅ 100% branch coverage (16/16 branches)

Test Scenarios:

  1. Initial state verification (CLOSED state, default config)
  2. State transitions: CLOSED → OPEN → HALF_OPEN → CLOSED
  3. Failure threshold configuration (default and custom)
  4. Timeout configuration (default and custom)
  5. Success threshold configuration (default and custom)
  6. Failure count reset on success
  7. Per-host circuit breaker isolation
  8. State transition logging at debug level
  9. No console logging verification
  10. CircuitBreakerRegistry host management

Test Stub (tests/unit/.stubs/CircuitBreakerStub.ts):

  • Simplified implementation for use in other component tests
  • Provides controllable state for testing dependent components

JDBC Reference: CircuitBreakerTelemetryPushClient.java:15 and CircuitBreakerManager.java:25


3.4 TelemetryEventEmitter

Purpose: Emit telemetry events at key driver operations using Node.js EventEmitter.

Location: lib/telemetry/TelemetryEventEmitter.ts

Interface

// lib/telemetry/TelemetryEventEmitter.ts

import { EventEmitter } from 'events';
import IClientContext from '../contracts/IClientContext';
import { LogLevel } from '../contracts/IDBSQLLogger';
import { TelemetryEvent } from './types';

/**
 * EventEmitter for driver telemetry.
 * Emits events at key driver operations.
 */
class TelemetryEventEmitter extends EventEmitter {
  private enabled: boolean;

  constructor(private context: IClientContext) {
    super();
    const config = context.getConfig();
    this.enabled = config.telemetryEnabled ?? true;
  }

  /**
   * Emit a connection open event.
   */
  emitConnectionOpen(data: { sessionId: string; workspaceId: string; driverConfig: any }): void {
    if (!this.enabled) return;

    const logger = this.context.getLogger();
    try {
      this.emit('telemetry.connection.open', {
        eventType: 'connection.open',
        timestamp: Date.now(),
        ...data,
      });
    } catch (error: any) {
      // Swallow all exceptions
      logger.log(LogLevel.debug, `Error emitting connection event: ${error.message}`);
    }
  }

  /**
   * Emit a statement start event.
   */
  emitStatementStart(data: { statementId: string; sessionId: string; operationType: string }): void {
    if (!this.enabled) return;

    const logger = this.context.getLogger();
    try {
      this.emit('telemetry.statement.start', {
        eventType: 'statement.start',
        timestamp: Date.now(),
        ...data,
      });
    } catch (error: any) {
      logger.log(LogLevel.debug, `Error emitting statement start: ${error.message}`);
    }
  }

  /**
   * Emit a statement complete event.
   */
  emitStatementComplete(data: {
    statementId: string;
    sessionId: string;
    latencyMs: number;
    resultFormat?: string;
    chunkCount?: number;
    bytesDownloaded?: number;
    pollCount?: number;
  }): void {
    if (!this.enabled) return;

    const logger = this.context.getLogger();
    try {
      this.emit('telemetry.statement.complete', {
        eventType: 'statement.complete',
        timestamp: Date.now(),
        ...data,
      });
    } catch (error: any) {
      logger.log(LogLevel.debug, `Error emitting statement complete: ${error.message}`);
    }
  }

  /**
   * Emit a CloudFetch chunk download event.
   */
  emitCloudFetchChunk(data: {
    statementId: string;
    chunkIndex: number;
    latencyMs: number;
    bytes: number;
    compressed: boolean;
  }): void {
    if (!this.enabled) return;

    const logger = this.context.getLogger();
    try {
      this.emit('telemetry.cloudfetch.chunk', {
        eventType: 'cloudfetch.chunk',
        timestamp: Date.now(),
        ...data,
      });
    } catch (error: any) {
      logger.log(LogLevel.debug, `Error emitting cloudfetch chunk: ${error.message}`);
    }
  }

  /**
   * Emit an error event.
   */
  emitError(data: {
    statementId?: string;
    sessionId?: string;
    errorName: string;
    errorMessage: string;
    isTerminal: boolean;
  }): void {
    if (!this.enabled) return;

    const logger = this.context.getLogger();
    try {
      this.emit('telemetry.error', {
        eventType: 'error',
        timestamp: Date.now(),
        ...data,
      });
    } catch (error: any) {
      logger.log(LogLevel.debug, `Error emitting error event: ${error.message}`);
    }
  }
}

export default TelemetryEventEmitter;

3.5 MetricsAggregator

Purpose: Aggregate telemetry events into metrics suitable for Databricks telemetry.

Location: lib/telemetry/MetricsAggregator.ts

Key Design: Aggregates metrics by statement_id, with each aggregated event including both statement_id and session_id for correlation. This follows the JDBC driver pattern.

JDBC References:

  • TelemetryCollector.java:29-30 - Per-statement aggregation using ConcurrentHashMap<String, StatementTelemetryDetails>
  • TelemetryEvent.java:8-12 - Both session_id and sql_statement_id fields in exported events

Interface

// lib/telemetry/MetricsAggregator.ts

import IClientContext from '../contracts/IClientContext';
import { LogLevel } from '../contracts/IDBSQLLogger';
import { TelemetryEvent, TelemetryMetric } from './types';
import DatabricksTelemetryExporter from './DatabricksTelemetryExporter';

/**
 * Aggregated telemetry data for a statement.
 */
interface StatementTelemetryDetails {
  statementId: string;
  sessionId: string;
  operationType?: string;
  startTime: number;
  latencyMs?: number;
  resultFormat?: string;
  chunkCount: number;
  totalBytesDownloaded: number;
  pollCount: number;
  pollLatencyMs: number;
  exceptions: Error[];
}

/**
 * Aggregates metrics from events by statement_id and includes session_id.
 * Follows JDBC driver pattern: aggregation by statement, export with both IDs.
 */
class MetricsAggregator {
  private statements: Map<string, StatementTelemetryDetails>;
  private batch: TelemetryMetric[];
  private flushTimer?: NodeJS.Timeout;

  constructor(private context: IClientContext, private exporter: DatabricksTelemetryExporter) {
    this.statements = new Map();
    this.batch = [];
    this.startPeriodicFlush();
  }

  /**
   * Process a telemetry event.
   */
  processEvent(event: TelemetryEvent): void {
    try {
      switch (event.eventType) {
        case 'connection.open':
          this.handleConnectionOpen(event);
          break;
        case 'statement.start':
          this.handleStatementStart(event);
          break;
        case 'statement.complete':
          this.handleStatementComplete(event);
          break;
        case 'cloudfetch.chunk':
          this.handleCloudFetchChunk(event);
          break;
        case 'error':
          this.handleError(event);
          break;
      }
    } catch (error: any) {
      const logger = this.context.getLogger();
      logger.log(LogLevel.debug, `Error processing event: ${error.message}`);
    }
  }

  /**
   * Mark statement complete and emit aggregated metrics.
   */
  completeStatement(statementId: string, failed: boolean = false): void {
    const logger = this.context.getLogger();
    try {
      const details = this.statements.get(statementId);
      if (!details) return;

      // Create aggregated metric
      const metric: TelemetryMetric = {
        metricType: 'statement',
        timestamp: details.startTime,
        sessionId: details.sessionId,
        statementId: details.statementId,
        latencyMs: details.latencyMs,
        resultFormat: details.resultFormat,
        chunkCount: details.chunkCount,
        bytesDownloaded: details.totalBytesDownloaded,
        pollCount: details.pollCount,
      };

      this.addToBatch(metric);

      // Only flush exceptions if statement failed
      if (failed && details.exceptions.length > 0) {
        for (const error of details.exceptions) {
          this.emitErrorMetric(statementId, details.sessionId, error);
        }
      }

      this.statements.delete(statementId);
    } catch (error: any) {
      logger.log(LogLevel.debug, `Error completing statement: ${error.message}`);
    }
  }

  /**
   * Flush all pending metrics.
   */
  async flush(): Promise<void> {
    const logger = this.context.getLogger();
    try {
      if (this.batch.length > 0) {
        const toFlush = [...this.batch];
        this.batch = [];
        await this.exporter.export(toFlush);
      }
    } catch (error: any) {
      logger.log(LogLevel.debug, `Error flushing metrics: ${error.message}`);
    }
  }

  /**
   * Close the aggregator and flush pending metrics.
   */
  async close(): Promise<void> {
    if (this.flushTimer) {
      clearInterval(this.flushTimer);
    }
    await this.flush();
  }

  private handleConnectionOpen(event: TelemetryEvent): void {
    // Connection events are emitted immediately (no aggregation)
    const metric: TelemetryMetric = {
      metricType: 'connection',
      timestamp: event.timestamp,
      sessionId: event.sessionId,
      driverConfig: event.driverConfig,
    };
    this.addToBatch(metric);
  }

  private handleStatementStart(event: TelemetryEvent): void {
    // Create new statement context for aggregation
    this.statements.set(event.statementId!, {
      statementId: event.statementId!,
      sessionId: event.sessionId!,
      operationType: event.operationType,
      startTime: event.timestamp,
      chunkCount: 0,
      totalBytesDownloaded: 0,
      pollCount: 0,
      pollLatencyMs: 0,
      exceptions: [],
    });
  }

  private handleStatementComplete(event: TelemetryEvent): void {
    const details = this.statements.get(event.statementId!);
    if (details) {
      details.latencyMs = event.latencyMs;
      details.resultFormat = event.resultFormat;
      details.pollCount = event.pollCount || 0;
    }
  }

  private handleCloudFetchChunk(event: TelemetryEvent): void {
    const details = this.statements.get(event.statementId!);
    if (details) {
      details.chunkCount++;
      details.totalBytesDownloaded += event.bytes || 0;
    }
  }

  private handleError(event: TelemetryEvent): void {
    if (event.isTerminal) {
      // Terminal exceptions: flush immediately
      this.emitErrorMetric(event.statementId || '', event.sessionId || '', new Error(event.errorMessage));
    } else {
      // Retryable exceptions: buffer until statement completes
      const details = this.statements.get(event.statementId!);
      if (details) {
        details.exceptions.push(new Error(event.errorMessage));
      }
    }
  }

  private emitErrorMetric(statementId: string, sessionId: string, error: Error): void {
    const metric: TelemetryMetric = {
      metricType: 'error',
      timestamp: Date.now(),
      statementId,
      sessionId,
      errorName: error.name,
      errorMessage: error.message,
    };
    this.addToBatch(metric);
  }

  private addToBatch(metric: TelemetryMetric): void {
    const config = this.context.getConfig();
    const logger = this.context.getLogger();

    this.batch.push(metric);
    if (this.batch.length >= (config.telemetryBatchSize ?? 100)) {
      // Fire and forget - don't block on flush
      this.flush().catch((error) => {
        logger.log(LogLevel.debug, `Error in batch flush: ${error.message}`);
      });
    }
  }

  private startPeriodicFlush(): void {
    const config = this.context.getConfig();
    const logger = this.context.getLogger();

    this.flushTimer = setInterval(() => {
      this.flush().catch((error) => {
        logger.log(LogLevel.debug, `Error in periodic flush: ${error.message}`);
      });
    }, config.telemetryFlushIntervalMs ?? 5000);
  }
}

export default MetricsAggregator;
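The "smart exception flushing" rule in `handleError` and `completeStatement` above — terminal errors flush immediately, retryable errors are buffered and exported only if the statement ultimately fails — can be reduced to this sketch. The names and shapes here are illustrative, not the driver API.

```typescript
interface ErrEvent {
  statementId: string;
  message: string;
  isTerminal: boolean;
}

const buffered = new Map<string, string[]>(); // retryable errors per statement
const exported: string[] = [];                // what actually leaves the driver

function handleError(e: ErrEvent): void {
  if (e.isTerminal) {
    exported.push(e.message); // terminal: flush immediately
  } else {
    const list = buffered.get(e.statementId) ?? [];
    list.push(e.message);
    buffered.set(e.statementId, list); // retryable: buffer for now
  }
}

function completeStatement(statementId: string, failed: boolean): void {
  const errors = buffered.get(statementId) ?? [];
  buffered.delete(statementId);
  if (failed) exported.push(...errors); // buffered errors export only on failure
}

// A retryable error on a statement that later succeeds is dropped...
handleError({ statementId: 's-1', message: 'retry: fetch timeout', isTerminal: false });
completeStatement('s-1', false);

// ...while a terminal error is exported right away.
handleError({ statementId: 's-2', message: 'terminal: auth failed', isTerminal: true });
```

This keeps transient noise (retried fetches, recoverable poll errors) out of the telemetry stream while still capturing every failure that mattered.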

3.6 DatabricksTelemetryExporter

Purpose: Export aggregated metrics to Databricks telemetry service.

Location: lib/telemetry/DatabricksTelemetryExporter.ts

Interface

// lib/telemetry/DatabricksTelemetryExporter.ts

import IClientContext from '../contracts/IClientContext';
import { LogLevel } from '../contracts/IDBSQLLogger';
import { TelemetryMetric } from './types';
import { CircuitBreakerRegistry } from './CircuitBreaker';
import fetch from 'node-fetch';

/**
 * Exports telemetry metrics to Databricks service.
 */
class DatabricksTelemetryExporter {
  private circuitBreaker;

  // Placeholder user agent; the real driver would derive this from its
  // package version and runtime information
  private readonly userAgent = 'databricks-sql-nodejs';
  constructor(
    private context: IClientContext,
    private host: string,
    private circuitBreakerRegistry: CircuitBreakerRegistry,
  ) {
    this.circuitBreaker = circuitBreakerRegistry.getCircuitBreaker(host);
  }

  /**
   * Export metrics to Databricks service. Never throws.
   */
  async export(metrics: TelemetryMetric[]): Promise<void> {
    if (metrics.length === 0) return;

    const logger = this.context.getLogger();

    try {
      await this.circuitBreaker.execute(async () => {
        await this.exportInternal(metrics);
      });
    } catch (error: any) {
      if (error.message === 'Circuit breaker OPEN') {
        logger.log(LogLevel.debug, 'Circuit breaker OPEN - dropping telemetry');
      } else {
        logger.log(LogLevel.debug, `Telemetry export error: ${error.message}`);
      }
    }
  }

  private async exportInternal(metrics: TelemetryMetric[]): Promise<void> {
    const config = this.context.getConfig();
    const authenticatedExport = config.telemetryAuthenticatedExport ?? true;

    const endpoint = authenticatedExport
      ? `https://${this.host}/telemetry-ext`
      : `https://${this.host}/telemetry-unauth`;

    // CRITICAL: Format payload to match JDBC TelemetryRequest with protoLogs
    const telemetryLogs = metrics.map((m) => this.toTelemetryLog(m));
    const protoLogs = telemetryLogs.map((log) => JSON.stringify(log));

    const payload = {
      uploadTime: Date.now(),
      items: [], // Required but unused
      protoLogs, // Array of JSON-stringified log objects
    };

    // Get authentication headers if using authenticated endpoint
    const authHeaders = authenticatedExport ? await this.context.getAuthHeaders() : {};

    const response = await fetch(endpoint, {
      method: 'POST',
      headers: {
        ...authHeaders,
        'Content-Type': 'application/json',
        'User-Agent': this.userAgent,
      },
      body: JSON.stringify(payload),
    });

    if (!response.ok) {
      throw new Error(`Telemetry export failed: ${response.status}`);
    }
  }

  private toTelemetryLog(metric: TelemetryMetric): any {
    // Typed as `any` so metric-specific fields can be attached below
    const log: any = {
      frontend_log_event_id: this.generateUUID(),
      context: {
        client_context: {
          timestamp_millis: metric.timestamp,
          user_agent: this.userAgent,
        },
      },
      entry: {
        sql_driver_log: {
          session_id: metric.sessionId,
          sql_statement_id: metric.statementId,
        },
      },
    };

    // Add metric-specific fields based on type
    if (metric.metricType === 'connection' && metric.driverConfig) {
      log.entry.sql_driver_log.system_configuration = {
        driver_version: metric.driverConfig.driverVersion,
        driver_name: metric.driverConfig.driverName,
        runtime_name: 'Node.js',
        runtime_version: metric.driverConfig.nodeVersion,
        runtime_vendor: metric.driverConfig.runtimeVendor,
        os_name: metric.driverConfig.platform,
        os_version: metric.driverConfig.osVersion,
        os_arch: metric.driverConfig.osArch,
        locale_name: metric.driverConfig.localeName,
        char_set_encoding: metric.driverConfig.charSetEncoding,
        process_name: metric.driverConfig.processName,
      };
    } else if (metric.metricType === 'statement') {
      log.entry.sql_driver_log.operation_latency_ms = metric.latencyMs;

      if (metric.resultFormat || metric.chunkCount) {
        log.entry.sql_driver_log.sql_operation = {
          execution_result: metric.resultFormat,
        };

        if (metric.chunkCount && metric.chunkCount > 0) {
          log.entry.sql_driver_log.sql_operation.chunk_details = {
            total_chunks_present: metric.chunkCount,
            total_chunks_iterated: metric.chunkCount,
          };
        }
      }
    } else if (metric.metricType === 'error') {
      log.entry.sql_driver_log.error_info = {
        error_name: metric.errorName || 'UnknownError',
        stack_trace: metric.errorMessage || '',
      };
    }

    return log;
  }

  private generateUUID(): string {
    // UUID v4 via Math.random; crypto.randomUUID() (Node 14.17+) is a stronger choice where available
    return 'xxxxxxxx-xxxx-4xxx-yxxx-xxxxxxxxxxxx'.replace(/[xy]/g, (c) => {
      const r = (Math.random() * 16) | 0;
      const v = c === 'x' ? r : (r & 0x3) | 0x8;
      return v.toString(16);
    });
  }
}

export default DatabricksTelemetryExporter;

4. Data Collection

4.1 Telemetry Events

The driver emits events at key operations:

| Event | When | Data Collected |
| --- | --- | --- |
| connection.open | Session opened | session_id, workspace_id, driver config, latency_ms |
| statement.start | Statement execution begins | statement_id, session_id, operation_type |
| statement.complete | Statement execution ends | statement_id, latency, result_format, poll_count |
| cloudfetch.chunk | CloudFetch chunk downloaded | statement_id, chunk_index, latency, bytes |
| error | Error occurs | statement_id, error_name, error_message, is_terminal |

4.2 Driver Configuration Data

Collected once per connection:

interface DriverConfiguration {
  driverVersion: string;
  driverName: string; // 'nodejs-sql-driver' (matches JDBC naming)
  nodeVersion: string;
  platform: string;
  osVersion: string;
  osArch: string; // Architecture (x64, arm64, etc.)
  runtimeVendor: string; // 'Node.js Foundation'
  localeName: string; // Locale (e.g., 'en_US')
  charSetEncoding: string; // Character encoding (e.g., 'UTF-8')
  processName: string; // Process name from process.title or script name
  authType: string; // Authentication type (access-token, databricks-oauth, custom)

  // Feature flags
  cloudFetchEnabled: boolean;
  lz4Enabled: boolean;
  arrowEnabled: boolean;
  directResultsEnabled: boolean;

  // Configuration values
  socketTimeout: number;
  retryMaxAttempts: number;
  cloudFetchConcurrentDownloads: number;
}

System Configuration Fields (matches JDBC implementation):

  • driverName: Always set to 'nodejs-sql-driver' to match JDBC driver naming convention
  • osArch: Obtained from os.arch() - reports CPU architecture (x64, arm64, ia32, etc.)
  • runtimeVendor: Always set to 'Node.js Foundation' (equivalent to JDBC's java.vendor)
  • localeName: Extracted from LANG environment variable in format language_country (e.g., en_US), defaults to en_US
  • charSetEncoding: Always 'UTF-8' (Node.js default encoding), equivalent to JDBC's Charset.defaultCharset()
  • processName: Obtained from process.title or extracted from process.argv[1] (script name), equivalent to JDBC's ProcessNameUtil.getProcessName()
  • authType: Authentication method used ('access-token', 'databricks-oauth', or 'custom'), exported as driver_connection_params.auth_type

Connection Parameters:

  • auth_type: Exported in driver_connection_params field for connection metrics, indicates authentication method used
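
The system-configuration fields above map directly onto Node.js built-ins. A minimal sketch of collecting them (the function name is illustrative, not the shipped API):

```typescript
import * as os from 'os';
import * as path from 'path';

// Illustrative sketch: gather the system-configuration fields described
// above from Node.js built-ins. Field names mirror DriverConfiguration.
function collectSystemConfiguration() {
  // LANG is typically "en_US.UTF-8"; keep only the language_country part
  const locale = (process.env.LANG ?? 'en_US').split('.')[0] || 'en_US';

  return {
    nodeVersion: process.version, // e.g. "v22.16.0"
    platform: os.platform(), // e.g. "linux"
    osVersion: os.release(), // e.g. "5.4.0-1153-aws-fips"
    osArch: os.arch(), // e.g. "x64"
    runtimeVendor: 'Node.js Foundation',
    localeName: locale,
    charSetEncoding: 'UTF-8', // Node.js default encoding
    // Fall back to the entry script name when process.title is unset
    processName: process.title || path.basename(process.argv[1] ?? 'node'),
  };
}
```

Because all of these calls are synchronous and cheap, this can run once per connection with negligible overhead.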

4.3 Statement Metrics

Aggregated per statement:

interface StatementMetrics {
  statementId: string;
  sessionId: string;
  operationType: string;

  // Latency
  executionLatencyMs: number;
  pollCount: number;
  pollLatencyMs: number;

  // Result format (fetched from metadata before statement close)
  resultFormat: 'inline' | 'cloudfetch' | 'arrow';

  // CloudFetch metrics
  chunkCount?: number;
  totalBytesDownloaded?: number;
  compressionEnabled?: boolean;
}

Result Format Population: To ensure sql_operation is properly populated in telemetry logs, the driver fetches result set metadata before emitting the statement.complete event. This guarantees that resultFormat is available even if the user closes the statement immediately after execution without explicitly fetching results.

4.4 Privacy Considerations

Never Collected:

  • ❌ SQL query text
  • ❌ Query results or data values
  • ❌ Table/column names
  • ❌ User identities (only workspace ID)
  • ❌ Credentials or tokens

Always Collected:

  • ✅ Operation latency
  • ✅ Error codes and types
  • ✅ Feature flags (boolean settings)
  • ✅ Statement/session IDs (UUIDs)
  • ✅ Aggregated metrics (counts, sizes)

5. Export Mechanism

5.1 Export Flow

flowchart TD
    A[Event Emitted] --> B[MetricsAggregator]
    B -->|Buffer & Aggregate| C{Flush Trigger?}

    C -->|Batch Size| D[Create TelemetryMetric]
    C -->|Time Interval| D
    C -->|Connection Close| D

    D --> E[TelemetryExporter]
    E -->|Check Circuit Breaker| F{Circuit Open?}
    F -->|Yes| G[Drop Events]
    F -->|No| H[Serialize to JSON]

    H --> I{Authenticated?}
    I -->|Yes| J[POST /telemetry-ext]
    I -->|No| K[POST /telemetry-unauth]

    J --> L[Databricks Service]
    K --> L
    L --> M[Lumberjack]

5.2 Payload Format

CRITICAL: The Node.js driver uses the same payload format as JDBC with protoLogs (NOT frontend_logs).

Payload Structure

interface DatabricksTelemetryPayload {
  uploadTime: number; // Timestamp in milliseconds
  items: string[]; // Required but unused (empty array)
  protoLogs: string[]; // Array of JSON-stringified log objects
}

Example Payload

{
  "uploadTime": 1706634000000,
  "items": [],
  "protoLogs": [
    "{\"frontend_log_event_id\":\"550e8400-e29b-41d4-a716-446655440000\",\"context\":{\"client_context\":{\"timestamp_millis\":1706634000000,\"user_agent\":\"databricks-sql-nodejs/1.12.0\"}},\"entry\":{\"sql_driver_log\":{\"session_id\":\"01f0fd4d-2ed0-1469-bfee-b6c9c31cb586\",\"sql_statement_id\":null,\"system_configuration\":{\"driver_version\":\"1.12.0\",\"driver_name\":\"nodejs-sql-driver\",\"runtime_name\":\"Node.js\",\"runtime_version\":\"v22.16.0\",\"runtime_vendor\":\"Node.js Foundation\",\"os_name\":\"linux\",\"os_version\":\"5.4.0-1153-aws-fips\",\"os_arch\":\"x64\",\"locale_name\":\"en_US\",\"char_set_encoding\":\"UTF-8\",\"process_name\":\"node\"}}}}",
    "{\"frontend_log_event_id\":\"550e8400-e29b-41d4-a716-446655440001\",\"context\":{\"client_context\":{\"timestamp_millis\":1706634001000,\"user_agent\":\"databricks-sql-nodejs/1.12.0\"}},\"entry\":{\"sql_driver_log\":{\"session_id\":\"01f0fd4d-2ed0-1469-bfee-b6c9c31cb586\",\"sql_statement_id\":\"01f0fd4d-2ed0-1469-bfee-b6c9c31cb587\",\"operation_latency_ms\":123,\"sql_operation\":{\"execution_result\":\"arrow\",\"chunk_details\":{\"total_chunks_present\":5,\"total_chunks_iterated\":5}}}}}"
  ]
}

Log Object Structure

Each item in protoLogs is a JSON-stringified object with this structure:

interface DatabricksTelemetryLog {
  frontend_log_event_id: string; // UUID v4
  context: {
    client_context: {
      timestamp_millis: number;
      user_agent: string; // "databricks-sql-nodejs/<version>"
    };
  };
  entry: {
    sql_driver_log: {
      session_id?: string; // Session UUID
      sql_statement_id?: string; // Statement UUID (null for connection events)

      // Connection events only
      system_configuration?: {
        driver_version?: string; // e.g., "1.12.0"
        driver_name?: string; // "nodejs-sql-driver"
        runtime_name?: string; // "Node.js"
        runtime_version?: string; // e.g., "v22.16.0"
        runtime_vendor?: string; // "Node.js Foundation"
        os_name?: string; // e.g., "linux"
        os_version?: string; // e.g., "5.4.0-1153-aws-fips"
        os_arch?: string; // e.g., "x64"
        locale_name?: string; // e.g., "en_US"
        char_set_encoding?: string; // e.g., "UTF-8"
        process_name?: string; // e.g., "node"
      };

      // Statement events only
      operation_latency_ms?: number;
      sql_operation?: {
        execution_result?: string; // "inline" | "cloudfetch" | "arrow"
        chunk_details?: {
          total_chunks_present?: number;
          total_chunks_iterated?: number;
        };
      };

      // Error events only
      error_info?: {
        error_name: string;
        stack_trace: string;
      };
    };
  };
}

Key Points:

  • Each telemetry log is JSON-stringified before being added to protoLogs array
  • The items field is required but always empty
  • The uploadTime is the timestamp when the batch is being exported
  • Each log has a unique frontend_log_event_id (UUID v4)
  • Connection events have system_configuration populated with all driver metadata
  • Statement events have operation_latency_ms and optional sql_operation details
  • Error events have error_info with error name and message
  • The sql_statement_id is null for connection events
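
Given the structure above, assembling a payload is a map-and-stringify step. A minimal sketch (names are illustrative, not the shipped exporter):

```typescript
import { randomUUID } from 'crypto';

// Sketch of assembling the export payload described above.
// Each log object is JSON-stringified individually into protoLogs.
interface TelemetryLogDraft {
  sessionId?: string;
  statementId?: string;
  timestampMillis: number;
  body: Record<string, unknown>; // metric-specific sql_driver_log fields
}

function buildPayload(logs: TelemetryLogDraft[], userAgent: string) {
  const protoLogs = logs.map((draft) =>
    JSON.stringify({
      frontend_log_event_id: randomUUID(), // unique per log
      context: {
        client_context: {
          timestamp_millis: draft.timestampMillis,
          user_agent: userAgent,
        },
      },
      entry: {
        sql_driver_log: {
          session_id: draft.sessionId ?? null,
          sql_statement_id: draft.statementId ?? null, // null for connection events
          ...draft.body,
        },
      },
    }),
  );

  return {
    uploadTime: Date.now(),
    items: [] as string[], // required but unused
    protoLogs,
  };
}
```

Note that each entry in `protoLogs` is itself a JSON string, so the payload is double-encoded by design.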

5.3 Batching Strategy

  • Batch size: Default 100 metrics
  • Flush interval: Default 5 seconds
  • Force flush: On connection close
  • Background flushing: Non-blocking with setInterval

5.4 Retry Strategy

  • Retryable errors: 429, 500, 502, 503, 504, network timeouts
  • Terminal errors: 400, 401, 403, 404
  • Max retries: 3 attempts
  • Backoff: Exponential with jitter (100ms - 1000ms)
  • Circuit breaker: Opens after 5 consecutive failures
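
A sketch of this retry policy under the stated thresholds (names illustrative; the shipped exporter combines this with the per-host circuit breaker):

```typescript
// Sketch of the retry policy above: retry 429/5xx and network timeouts
// with exponential backoff plus jitter; give up immediately on terminal errors.
const RETRYABLE_STATUS = new Set([429, 500, 502, 503, 504]);

function backoffDelayMs(attempt: number): number {
  // Exponential: 100ms, 200ms, 400ms... capped at 1000ms, plus up to 20% jitter
  const base = Math.min(100 * 2 ** attempt, 1000);
  return base + Math.random() * base * 0.2;
}

async function exportWithRetry(send: () => Promise<void>, maxRetries = 3): Promise<void> {
  for (let attempt = 0; ; attempt += 1) {
    try {
      await send();
      return;
    } catch (error: any) {
      const retryable =
        RETRYABLE_STATUS.has(error?.statusCode) || error?.name === 'TimeoutError';
      if (!retryable || attempt >= maxRetries) throw error; // terminal or exhausted
      await new Promise((resolve) => setTimeout(resolve, backoffDelayMs(attempt)));
    }
  }
}
```

The final throw is still caught and swallowed by the caller's debug-only logging, so exhausted retries never surface to the application.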

6. Configuration

6.1 Configuration Model

// lib/telemetry/types.ts

export interface TelemetryEvent {
  eventType: string;
  timestamp: number;
  sessionId?: string;
  statementId?: string;
  // ... other event-specific fields
}

export interface TelemetryMetric {
  metricType: string;
  timestamp: number;
  sessionId?: string;
  statementId?: string;
  // ... other metric fields
}

6.2 Client Configuration

Telemetry configuration is added to ClientConfig (not ClientOptions), following the existing pattern for useCloudFetch, useLZ4Compression, etc.

// lib/contracts/IClientContext.ts

export interface ClientConfig {
  // ... existing fields

  useLZ4Compression: boolean;
  enableMetricViewMetadata?: boolean;

  // Telemetry configuration
  telemetryEnabled?: boolean;
  telemetryBatchSize?: number;
  telemetryFlushIntervalMs?: number;
  telemetryMaxRetries?: number;
  telemetryAuthenticatedExport?: boolean;
  telemetryCircuitBreakerThreshold?: number;
  telemetryCircuitBreakerTimeout?: number;
}

Configuration can be overridden via ConnectionOptions:

// lib/contracts/IDBSQLClient.ts

export type ConnectionOptions = {
  host: string;
  // ... existing fields

  // Optional telemetry overrides
  telemetryEnabled?: boolean;
} & AuthOptions;

6.3 Initialization

// In DBSQLClient.ts

import FeatureFlagCache from './telemetry/FeatureFlagCache';
import TelemetryClientProvider from './telemetry/TelemetryClientProvider';
import TelemetryEventEmitter from './telemetry/TelemetryEventEmitter';
import MetricsAggregator from './telemetry/MetricsAggregator';
import DatabricksTelemetryExporter from './telemetry/DatabricksTelemetryExporter';
import { CircuitBreakerRegistry } from './telemetry/CircuitBreaker';

export default class DBSQLClient extends EventEmitter implements IDBSQLClient, IClientContext {
  // ... existing fields

  // Telemetry components (instances, not singletons)
  private featureFlagCache?: FeatureFlagCache;
  private telemetryClientProvider?: TelemetryClientProvider;
  private telemetryEmitter?: TelemetryEventEmitter;
  private telemetryAggregator?: MetricsAggregator;
  private host?: string;

  private static getDefaultConfig(): ClientConfig {
    return {
      // ... existing config

      // Telemetry defaults
      telemetryEnabled: false, // Initially disabled for safe rollout
      telemetryBatchSize: 100,
      telemetryFlushIntervalMs: 5000,
      telemetryMaxRetries: 3,
      telemetryAuthenticatedExport: true,
      telemetryCircuitBreakerThreshold: 5,
      telemetryCircuitBreakerTimeout: 60000,
    };
  }

  async connect(options: ConnectionOptions): Promise<IDBSQLClient> {
    // ... existing connection logic

    // Store host for telemetry
    this.host = options.host;

    // Override telemetry config if provided in options
    if (options.telemetryEnabled !== undefined) {
      this.config.telemetryEnabled = options.telemetryEnabled;
    }

    // Initialize telemetry if enabled
    if (this.config.telemetryEnabled) {
      await this.initializeTelemetry();
    }

    return this;
  }

  private async initializeTelemetry(): Promise<void> {
    if (!this.host) return;

    try {
      // Create feature flag cache instance
      this.featureFlagCache = new FeatureFlagCache(this);
      this.featureFlagCache.getOrCreateContext(this.host);

      // Check if telemetry enabled via feature flag
      const enabled = await this.featureFlagCache.isTelemetryEnabled(this.host);
      if (!enabled) {
        this.logger.log(LogLevel.debug, 'Telemetry disabled via feature flag');
        return;
      }

      // Create telemetry components (all instance-based)
      this.telemetryClientProvider = new TelemetryClientProvider(this);
      this.telemetryEmitter = new TelemetryEventEmitter(this);

      const circuitBreakerRegistry = new CircuitBreakerRegistry();
      const exporter = new DatabricksTelemetryExporter(this, this.host, circuitBreakerRegistry);
      this.telemetryAggregator = new MetricsAggregator(this, exporter);

      // Wire up event listeners (all routed to the aggregator)
      const telemetryEventNames = [
        'telemetry.connection.open',
        'telemetry.statement.start',
        'telemetry.statement.complete',
        'telemetry.cloudfetch.chunk',
        'telemetry.error',
      ];
      for (const eventName of telemetryEventNames) {
        this.telemetryEmitter.on(eventName, (event) => {
          this.telemetryAggregator?.processEvent(event);
        });
      }

      this.logger.log(LogLevel.debug, 'Telemetry: enabled');
    } catch (error: any) {
      // Swallow all telemetry initialization errors
      this.logger.log(LogLevel.debug, `Telemetry initialization failed: ${error.message}`);
    }
  }

  async close(): Promise<void> {
    // Cleanup telemetry
    if (this.host) {
      try {
        // Flush pending metrics
        if (this.telemetryAggregator) {
          await this.telemetryAggregator.flush();
        }

        // Release telemetry client
        if (this.telemetryClientProvider) {
          await this.telemetryClientProvider.releaseClient(this.host);
        }

        // Release feature flag context
        if (this.featureFlagCache) {
          this.featureFlagCache.releaseContext(this.host);
        }
      } catch (error: any) {
        this.logger.log(LogLevel.debug, `Telemetry cleanup error: ${error.message}`);
      }
    }

    // ... existing close logic
  }
}

6.4 Telemetry Export Lifecycle

This section clarifies when telemetry logs are exported during different lifecycle events.

Export Triggers

Telemetry export can be triggered by:

  1. Batch size threshold - When pending metrics reach configured batch size (default: 100)
  2. Periodic timer - Every flush interval (default: 5 seconds)
  3. Statement close - Completes statement aggregation, may trigger batch export if batch full
  4. Connection close - Final flush of all pending metrics
  5. Terminal error - Immediate flush for non-retryable errors

Statement Close (DBSQLOperation.close())

What happens:

// In DBSQLOperation.close()
try {
  // 1. Emit statement.complete event with latency and metrics
  this.telemetryEmitter.emitStatementComplete({
    statementId: this.statementId,
    sessionId: this.sessionId,
    latencyMs: Date.now() - this.startTime,
    resultFormat: this.resultFormat,
    chunkCount: this.chunkCount,
    bytesDownloaded: this.bytesDownloaded,
    pollCount: this.pollCount,
  });

  // 2. Mark statement complete in aggregator
  this.telemetryAggregator.completeStatement(this.statementId);
} catch (error: any) {
  // All exceptions swallowed, debug-level logging only
  this.context.getLogger().log(LogLevel.debug, `Error in telemetry: ${error.message}`);
}

Export behavior:

  • Statement metrics are aggregated and added to pending batch
  • Export happens ONLY if batch size threshold is reached
  • Otherwise, metrics remain buffered until next timer flush or connection close
  • Does NOT automatically export - just completes the aggregation

Connection Close (DBSQLClient.close())

What happens:

// In DBSQLClient.close()
try {
  // 1. Close aggregator (stops timer, completes statements, final flush)
  if (this.telemetryAggregator) {
    this.telemetryAggregator.close();
  }

  // 2. Release telemetry client (decrements ref count, closes if last)
  if (this.telemetryClientProvider) {
    await this.telemetryClientProvider.releaseClient(this.host);
  }

  // 3. Release feature flag context (decrements ref count)
  if (this.featureFlagCache) {
    this.featureFlagCache.releaseContext(this.host);
  }
} catch (error: any) {
  logger.log(LogLevel.debug, `Telemetry cleanup error: ${error.message}`);
}

Export behavior:

  • ALWAYS exports all pending metrics via aggregator.close()
  • Stops the periodic flush timer
  • Completes any incomplete statements in the aggregation map
  • Performs final flush to ensure no metrics are lost
  • Guarantees export of all buffered telemetry before connection closes

Aggregator.close() implementation:

// In MetricsAggregator.close()
close(): void {
  const logger = this.context.getLogger();

  try {
    // Step 1: Stop flush timer
    if (this.flushTimer) {
      clearInterval(this.flushTimer);
      this.flushTimer = null;
    }

    // Step 2: Complete any remaining statements
    for (const statementId of this.statementMetrics.keys()) {
      this.completeStatement(statementId);
    }

    // Step 3: Final flush
    this.flush();
  } catch (error: any) {
    logger.log(LogLevel.debug, `MetricsAggregator.close error: ${error.message}`);
  }
}

Process Exit (Node.js shutdown)

What happens:

  • NO automatic export if DBSQLClient.close() was not called
  • Telemetry is lost if process exits without proper cleanup
  • Best practice: Always call client.close() before exit

Recommended pattern:

const client = new DBSQLClient();

// Register cleanup on process exit
process.on('SIGINT', async () => {
  await client.close(); // Ensures final telemetry flush
  process.exit(0);
});

process.on('SIGTERM', async () => {
  await client.close(); // Ensures final telemetry flush
  process.exit(0);
});

Summary Table

| Event | Statement Aggregated | Export Triggered | Notes |
| --- | --- | --- | --- |
| Statement Close | ✅ Yes | ⚠️ Only if batch full | Metrics buffered, not immediately exported |
| Batch Size Reached | N/A | ✅ Yes | Automatic export when 100 metrics buffered |
| Periodic Timer | N/A | ✅ Yes | Every 5 seconds (configurable) |
| Connection Close | ✅ Yes (incomplete) | ✅ Yes (guaranteed) | Completes all statements, flushes all metrics |
| Process Exit | ❌ No | ❌ No | Lost unless close() was called first |
| Terminal Error | N/A | ✅ Yes (immediate) | Auth errors, 4xx errors flushed right away |

Key Differences from JDBC

Node.js behavior:

  • Statement close does not automatically export (buffered until batch/timer/connection-close)
  • Connection close always exports all pending metrics
  • Process exit does not guarantee export (must call close() explicitly)

JDBC behavior:

  • Similar buffering and batch export strategy
  • JVM shutdown hooks provide more automatic cleanup
  • Connection close behavior is the same (guaranteed flush)

Recommendation: Always call client.close() in a finally block or using try-finally to ensure telemetry is exported before the process exits.
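
A minimal sketch of that pattern. `FakeClient` is a stand-in for DBSQLClient here so the example is self-contained; with the real driver the same shape applies around connect() and close():

```typescript
// FakeClient is a stand-in for DBSQLClient so the pattern is self-contained.
class FakeClient {
  closed = false;

  async connect(): Promise<void> {
    // open session, initialize telemetry, etc.
  }

  async close(): Promise<void> {
    // final telemetry flush happens in here
    this.closed = true;
  }
}

// close() in finally: the final telemetry flush runs even when the work throws
async function withClient<T>(client: FakeClient, work: () => Promise<T>): Promise<T> {
  try {
    await client.connect();
    return await work();
  } finally {
    await client.close();
  }
}
```

Using `return await` (rather than `return work()`) ensures a rejection from the work is observed inside the try block, so `finally` runs before the error propagates.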


7. Privacy & Compliance

7.1 Data Privacy

Never Collected:

  • ❌ SQL query text (only statement ID)
  • ❌ Query results or data values
  • ❌ Table/column names from queries
  • ❌ User identities (only workspace ID)
  • ❌ Credentials or authentication tokens

Always Collected:

  • ✅ Operation latency
  • ✅ Error codes (not full stack traces with PII)
  • ✅ Feature flags (boolean settings)
  • ✅ Statement/session IDs (UUIDs)
  • ✅ Aggregated metrics (counts, bytes)

7.2 Compliance

  • GDPR: No personal data collected
  • CCPA: No personal information
  • SOC 2: All data encrypted in transit (HTTPS)
  • Data Residency: Uses regional control plane

8. Error Handling

8.1 Exception Swallowing Strategy

Core Principle: Every telemetry exception must be swallowed with minimal logging to avoid customer anxiety.

Rationale (from JDBC experience):

  • Customers become anxious when they see error logs, even if telemetry is non-blocking
  • Telemetry failures should never impact the driver's core functionality
  • Critical: Circuit breaker must catch errors before swallowing

Logging Policy - Silent by Default

Telemetry logging is kept as silent as possible to avoid noise in customer logs:

Startup Messages (LogLevel.debug):

  • Telemetry: enabled - When telemetry is successfully initialized
  • Telemetry: disabled - When feature flag disables telemetry

Shutdown Messages (LogLevel.debug):

  • Telemetry: closed - When telemetry client is closed

Error Messages (LogLevel.debug):

  • Telemetry initialization error: <message> - Only on initialization failures
  • Telemetry close error: <message> - Only on cleanup failures
  • Telemetry export error: <message> - Only on export failures
  • Circuit breaker OPEN - dropping telemetry - Only when circuit breaker opens

Never Logged:

  • Individual event emissions (connection.open, statement.start, etc.)
  • Metric flushing operations
  • Successful exports
  • Reference counting changes
  • Client creation/lifecycle events

Logging Levels

  • DEBUG (LogLevel.debug): All telemetry messages use this level
  • WARN/ERROR: Never used for telemetry - avoids customer anxiety

Exception Handling Pattern

// All telemetry operations wrapped in try-catch

try {
  // Telemetry operation
  this.telemetryEmitter.emitStatementComplete({ ... });
} catch (error) {
  // Swallow ALL exceptions - no logging unless critical
  logger.log(LogLevel.debug, `Telemetry export error: ${error.message}`);
}

8.2 Terminal vs Retryable Exceptions

Requirement: Do not flush every exception the moment it occurs. Flush immediately only for terminal exceptions; buffer retryable exceptions until the statement completes.

Exception Classification

Terminal Exceptions (flush immediately):

  • Authentication failures (401, 403)
  • Invalid SQL syntax errors
  • Permission denied errors
  • Resource not found errors (404)
  • Invalid request format errors (400)

Retryable Exceptions (buffer until statement completes):

  • Network timeouts
  • Connection errors
  • Rate limiting (429)
  • Service unavailable (503)
  • Internal server errors (500, 502, 504)

Exception Classifier

Implementation Status: ✅ COMPLETED (Task 1.4)

Location: lib/telemetry/ExceptionClassifier.ts

Test Coverage: 100% line coverage (17/17 lines), 100% branch coverage (29/29 branches)

Key Features Implemented:

  • ✅ Static isTerminal() method that identifies terminal (unrecoverable) exceptions
  • ✅ Static isRetryable() method that identifies retryable (transient) exceptions
  • ✅ Supports both statusCode and status properties for HTTP status codes
  • ✅ Identifies AuthenticationError class as terminal
  • ✅ Identifies RetryError class as retryable
  • ✅ Detects network timeouts by error name and message
  • ✅ Handles unknown error types gracefully (returns false for both methods)
  • ✅ No dependencies on other telemetry components
  • ✅ Comprehensive unit tests with 51 test cases

Terminal Exception Detection:

  • Authentication failures: AuthenticationError class
  • HTTP 401 Unauthorized
  • HTTP 403 Forbidden
  • HTTP 404 Not Found
  • HTTP 400 Bad Request

Retryable Exception Detection:

  • Retry errors: RetryError class
  • Network timeouts: By error name (TimeoutError) or message containing "timeout"
  • HTTP 429 Too Many Requests
  • HTTP 500 Internal Server Error
  • HTTP 502 Bad Gateway
  • HTTP 503 Service Unavailable
  • HTTP 504 Gateway Timeout

Usage Example:

import ExceptionClassifier from './telemetry/ExceptionClassifier';

// Check if error should be flushed immediately
if (ExceptionClassifier.isTerminal(error)) {
  // Flush immediately to telemetry
  this.emitErrorMetric(error);
} else if (ExceptionClassifier.isRetryable(error)) {
  // Buffer until statement completes
  this.bufferException(error);
}

Implementation Notes:

  • Uses instanceof checks for typed error classes (AuthenticationError, RetryError)
  • Checks both statusCode and status properties for flexibility with different HTTP clients
  • Prioritizes statusCode over status when both are present
  • Returns false for both methods when error type is unknown (fail-safe behavior)
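
A simplified sketch of these classification rules (the shipped classifier additionally uses instanceof checks on AuthenticationError and RetryError, which are omitted here):

```typescript
// Simplified sketch of the classification rules above.
const TERMINAL_STATUS = new Set([400, 401, 403, 404]);
const RETRYABLE_STATUS = new Set([429, 500, 502, 503, 504]);

function statusOf(error: any): number | undefined {
  // Prefer statusCode, fall back to status (different HTTP clients differ)
  return error?.statusCode ?? error?.status;
}

class ExceptionClassifierSketch {
  static isTerminal(error: any): boolean {
    const status = statusOf(error);
    return status !== undefined && TERMINAL_STATUS.has(status);
  }

  static isRetryable(error: any): boolean {
    const status = statusOf(error);
    if (status !== undefined && RETRYABLE_STATUS.has(status)) return true;
    // Network timeouts: detect by error name or message
    return error?.name === 'TimeoutError' || /timeout/i.test(String(error?.message ?? ''));
  }
}
```

An unknown error type yields false from both methods, which matches the fail-safe behavior described above.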

9. Graceful Shutdown

Requirement: Every telemetry client must be closed gracefully. Maintain reference counting properly to determine when to close shared resources.

9.1 Shutdown Sequence

sequenceDiagram
    participant App as Application
    participant Client as DBSQLClient
    participant Manager as TelemetryClientManager
    participant TClient as TelemetryClient (shared)
    participant FFCache as FeatureFlagCache
    participant Agg as MetricsAggregator

    App->>Client: close()

    Client->>Agg: flush()
    Agg->>Agg: Flush pending metrics

    Client->>Manager: releaseClient(host)
    Manager->>Manager: Decrement RefCount

    alt RefCount == 0 (Last Connection)
        Manager->>TClient: close()
        TClient->>TClient: Flush pending events
        TClient->>TClient: Clear timers
    else RefCount > 0 (Other Connections Exist)
        Manager->>Manager: Keep client alive
    end

    Client->>FFCache: releaseContext(host)
    FFCache->>FFCache: Decrement RefCount

    alt RefCount == 0
        FFCache->>FFCache: Remove context
    else RefCount > 0
        FFCache->>FFCache: Keep context
    end

9.2 Connection Close Implementation

// In DBSQLClient.ts

async close(): Promise<void> {
  if (this.host) {
    try {
      // Step 1: Flush any pending metrics
      if (this.telemetryAggregator) {
        await this.telemetryAggregator.flush();
      }

      // Step 2: Release telemetry client (decrements ref count, closes if last)
      if (this.telemetryClientProvider) {
        await this.telemetryClientProvider.releaseClient(this.host);
      }

      // Step 3: Release feature flag context (decrements ref count)
      if (this.featureFlagCache) {
        this.featureFlagCache.releaseContext(this.host);
      }
    } catch (error: any) {
      // Swallow all exceptions per requirement (debug-level logging only)
      this.logger.log(LogLevel.debug, `Telemetry cleanup error: ${error.message}`);
    }
  }

  // Continue with normal connection cleanup
  await this.driver.close();
}

9.3 TelemetryClient Close Implementation

// In TelemetryClient.ts

class TelemetryClient {
  private flushTimer?: NodeJS.Timeout;

  async close(): Promise<void> {
    const logger = this.context.getLogger();

    try {
      // Step 1: Clear flush timer
      if (this.flushTimer) {
        clearInterval(this.flushTimer);
        this.flushTimer = undefined;
      }

      // Step 2: Flush all pending metrics before shutdown
      await this.aggregator.flush();
    } catch (error: any) {
      // Swallow per requirement (debug-level logging only)
      logger.log(LogLevel.debug, `Telemetry close error: ${error.message}`);
    }
  }
}

10. Testing Strategy

10.1 Unit Tests

TelemetryEventEmitter Tests:

  • emitter_emits_connection_open_event
  • emitter_emits_statement_events
  • emitter_swallows_exceptions
  • emitter_respects_enabled_flag

MetricsAggregator Tests:

  • aggregator_combines_events_by_statement_id
  • aggregator_emits_on_statement_complete
  • aggregator_handles_connection_event
  • aggregator_flushes_on_batch_size
  • aggregator_flushes_on_time_interval
  • aggregator_buffers_retryable_exceptions
  • aggregator_flushes_terminal_immediately

CircuitBreaker Tests:

  • circuit_breaker_opens_after_failures
  • circuit_breaker_closes_after_successes
  • circuit_breaker_per_host_isolation

FeatureFlagCache Tests:

  • cache_caches_per_host
  • cache_expires_after_15_minutes
  • cache_ref_counting_works

TelemetryClientManager Tests:

  • manager_one_client_per_host
  • manager_ref_counting_works
  • manager_closes_on_last_release

ExceptionClassifier Tests:

  • classifier_identifies_terminal
  • classifier_identifies_retryable

10.2 Integration Tests

End-to-End Tests:

  • e2e_connection_open_exported_successfully
  • e2e_statement_with_chunks_aggregated_correctly
  • e2e_error_captured_in_metrics
  • e2e_feature_flag_disabled_no_export
  • e2e_multiple_connections_share_client
  • e2e_circuit_breaker_stops_flushing_when_open
  • e2e_graceful_shutdown_last_connection_closes_client
  • e2e_terminal_exception_flushed_immediately
  • e2e_retryable_exception_buffered_until_complete

10.3 Performance Tests

Overhead Measurement:

  • telemetry_overhead_less_than_1_percent
  • event_emission_completes_under_one_microsecond

Compare:

  • Baseline: Driver without telemetry
  • With telemetry disabled: Should be ~0% overhead
  • With telemetry enabled: Should be < 1% overhead
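
A sketch of how per-event overhead can be measured with a plain EventEmitter and perf_hooks (the listener and thresholds here are illustrative, not the CI gate itself):

```typescript
import { EventEmitter } from 'events';
import { performance } from 'perf_hooks';

// Compare a hot emit loop with and without a telemetry listener attached.
function timeEmits(emitter: EventEmitter, iterations: number): number {
  const start = performance.now();
  for (let i = 0; i < iterations; i += 1) {
    emitter.emit('telemetry.statement.complete', { statementId: String(i) });
  }
  return performance.now() - start;
}

const iterations = 100_000;

// Baseline: emit with no listeners attached
const noListener = new EventEmitter();
const baselineMs = timeEmits(noListener, iterations);

// Instrumented: one listener counting events, like the aggregator wiring
const withListener = new EventEmitter();
let received = 0;
withListener.on('telemetry.statement.complete', () => { received += 1; });
const instrumentedMs = timeEmits(withListener, iterations);

const perEventMicros = ((instrumentedMs - baselineMs) / iterations) * 1000;
console.log(`listener overhead ~ ${perEventMicros.toFixed(3)} us/event`);
```

A real benchmark would repeat the measurement with warmup runs and take a median, since a single pass is noisy at this scale.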

11. Implementation Checklist

Phase 1: Feature Flag Cache & Per-Host Management

  • Create type definitions (lib/telemetry/types.ts) - COMPLETED
    • ✅ TelemetryConfiguration interface with all config fields
    • ✅ TelemetryEvent interface with eventType, timestamp, sessionId, statementId
    • ✅ TelemetryMetric interface for export payload
    • ✅ DriverConfiguration interface with driver metadata
    • ✅ StatementMetrics interface for per-statement aggregation
    • ✅ TelemetryEventType enum with 5 event types (CONNECTION_OPEN, STATEMENT_START, STATEMENT_COMPLETE, CLOUDFETCH_CHUNK, ERROR)
    • ✅ DEFAULT_TELEMETRY_CONFIG with default values
    • ✅ All interfaces properly exported and TypeScript compilation verified
  • Create FeatureFlagCache instance with per-host contexts - COMPLETED (Task 1.2)
  • Implement reference counting - COMPLETED (Task 1.2)
  • Add cache expiration logic (15 minute TTL) - COMPLETED (Task 1.2)
  • Implement feature flag fetch from server - COMPLETED (Task 1.2)
  • Create TelemetryClientProvider and TelemetryClient - COMPLETED (Task 1.6)
    • ✅ TelemetryClient class with host association
    • ✅ TelemetryClientProvider with per-host client management
    • ✅ TelemetryClientHolder interface with reference counting
    • ✅ getOrCreateClient() method with ref count increment
    • ✅ releaseClient() method with cleanup when refCount=0
    • ✅ Per-host client map implementation
    • ✅ All logging at LogLevel.debug via IDBSQLLogger
    • ✅ All exceptions swallowed with debug-level logging
  • Add unit tests - COMPLETED (Task 1.6)
    • ✅ 39 comprehensive test cases
    • ✅ 100% line coverage for both files
    • ✅ 100% branch coverage
    • ✅ Tests verify reference counting
    • ✅ Tests verify per-host isolation
    • ✅ Tests verify client sharing across connections
    • ✅ Tests verify cleanup on zero refCount
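The per-host client sharing and reference counting verified above can be sketched as follows. This is a simplified illustration, not the driver's actual classes: the `Holder` shape and method bodies are assumptions, though the `getOrCreateClient()`/`releaseClient()` names mirror the checklist items.

```typescript
// Illustrative per-host client map with reference counting.

class TelemetryClient {
  constructor(public readonly host: string) {}
  close(): void {
    // Flush buffers, stop timers, etc. (omitted in this sketch).
  }
}

interface Holder {
  client: TelemetryClient;
  refCount: number;
}

class TelemetryClientProvider {
  private holders = new Map<string, Holder>();

  // One client per host; every connection to the same host shares it.
  getOrCreateClient(host: string): TelemetryClient {
    let holder = this.holders.get(host);
    if (!holder) {
      holder = { client: new TelemetryClient(host), refCount: 0 };
      this.holders.set(host, holder);
    }
    holder.refCount += 1;
    return holder.client;
  }

  // Last connection out closes and removes the shared client.
  releaseClient(host: string): void {
    const holder = this.holders.get(host);
    if (!holder) return;
    holder.refCount -= 1;
    if (holder.refCount <= 0) {
      holder.client.close();
      this.holders.delete(host);
    }
  }

  clientCount(): number {
    return this.holders.size;
  }
}
```

Two connections to the same host receive the same `TelemetryClient` instance, and the client is only closed when the last reference is released — which is what prevents per-connection clients from rate-limiting the telemetry endpoint.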

Phase 2: Circuit Breaker

  • Create CircuitBreaker class with state machine - COMPLETED (Task 1.3)
    • ✅ Implemented three-state circuit breaker (CLOSED, OPEN, HALF_OPEN)
    • ✅ Configurable failure threshold (default: 5)
    • ✅ Configurable timeout (default: 60 seconds)
    • ✅ Configurable success threshold (default: 2)
    • ✅ State transition logic implemented
    • ✅ execute() method wrapping operations
  • Create CircuitBreakerRegistry (per-host breakers) - COMPLETED (Task 1.3)
    • ✅ Per-host circuit breaker isolation
    • ✅ Lazy creation of circuit breakers
    • ✅ Host-specific configuration support
    • ✅ Registry management methods (getAllBreakers, removeCircuitBreaker, clear)
  • Configure failure thresholds and timeouts - COMPLETED (Task 1.3)
    • ✅ DEFAULT_CIRCUIT_BREAKER_CONFIG exported
    • ✅ Custom configuration via constructor parameter
  • Add DEBUG logging for state transitions - COMPLETED (Task 1.3)
    • ✅ All state transitions logged at LogLevel.debug
    • ✅ No console logging used
    • ✅ Uses IDBSQLLogger.log() exclusively
  • Add unit tests - COMPLETED (Task 1.3)
    • ✅ 32 comprehensive test cases
    • ✅ 100% line coverage (61/61 lines)
    • ✅ 100% branch coverage (16/16 branches)
    • ✅ All state transitions verified
    • ✅ Per-host isolation verified
    • ✅ Test stub created for integration testing
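The three-state machine described above can be sketched as a minimal `CircuitBreaker`. The defaults match the configuration listed (5 failures, 60 s timeout, 2 successes), but the `execute()` signature and internal bookkeeping are assumptions for illustration, not the driver's actual API.

```typescript
// Minimal three-state circuit breaker: CLOSED -> OPEN on repeated failure,
// OPEN -> HALF_OPEN after a timeout, HALF_OPEN -> CLOSED after enough successes.

type BreakerState = 'CLOSED' | 'OPEN' | 'HALF_OPEN';

class CircuitBreaker {
  private state: BreakerState = 'CLOSED';
  private failures = 0;
  private successes = 0;
  private openedAt = 0;

  constructor(
    private failureThreshold = 5, // consecutive failures before opening
    private timeoutMs = 60_000,   // how long to stay OPEN before probing
    private successThreshold = 2, // successes needed to close from HALF_OPEN
  ) {}

  getState(): BreakerState {
    return this.state;
  }

  async execute<T>(op: () => Promise<T>): Promise<T> {
    if (this.state === 'OPEN') {
      if (Date.now() - this.openedAt < this.timeoutMs) {
        throw new Error('circuit open');
      }
      this.state = 'HALF_OPEN'; // timeout elapsed: probe the endpoint again
      this.successes = 0;
    }
    try {
      const result = await op();
      this.onSuccess();
      return result;
    } catch (err) {
      this.onFailure();
      throw err;
    }
  }

  private onSuccess(): void {
    if (this.state === 'HALF_OPEN') {
      this.successes += 1;
      if (this.successes >= this.successThreshold) {
        this.state = 'CLOSED';
        this.failures = 0;
      }
    } else {
      this.failures = 0;
    }
  }

  private onFailure(): void {
    this.failures += 1;
    if (this.state === 'HALF_OPEN' || this.failures >= this.failureThreshold) {
      this.state = 'OPEN';
      this.openedAt = Date.now();
    }
  }
}
```

Note that `execute()` rethrows the wrapped operation's error: the breaker must see the failure before the telemetry layer swallows it, which is the ordering constraint called out in Phase 3 below.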

Phase 3: Exception Handling

  • Create ExceptionClassifier for terminal vs retryable - COMPLETED (Task 1.4)
    • ✅ Static isTerminal() method implemented
    • ✅ Static isRetryable() method implemented
    • ✅ Detects AuthenticationError as terminal
    • ✅ Detects HTTP status codes (400, 401, 403, 404 as terminal)
    • ✅ Detects HTTP status codes (429, 500, 502, 503, 504 as retryable)
    • ✅ Detects RetryError as retryable
    • ✅ Detects network timeouts as retryable
    • ✅ Handles unknown error types gracefully
    • ✅ No dependencies on other telemetry components
    • ✅ 51 comprehensive unit tests
    • ✅ 100% line coverage (17/17 lines)
    • ✅ 100% branch coverage (29/29 branches)
  • Update MetricsAggregator to buffer retryable exceptions - COMPLETED (Task 1.8)
  • Implement immediate flush for terminal exceptions - COMPLETED (Task 1.8)
  • Wrap all telemetry code in try-catch blocks - COMPLETED (All Tasks)
  • Replace all logging with TRACE/DEBUG levels only - COMPLETED (All Tasks)
  • Ensure circuit breaker sees exceptions before swallowing - COMPLETED (Task 1.7)
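The terminal-vs-retryable split above can be sketched like this. The error shapes (a `statusCode` property, `AuthenticationError`/`RetryError` names, Node socket error codes) are assumptions chosen to mirror the checklist, not the driver's exact error hierarchy.

```typescript
// Classifies errors so terminal failures flush immediately while
// retryable ones are buffered until the statement completes.

const TERMINAL_STATUS = new Set([400, 401, 403, 404]);
const RETRYABLE_STATUS = new Set([429, 500, 502, 503, 504]);

class ExceptionClassifier {
  static isTerminal(err: unknown): boolean {
    const e = err as { name?: string; statusCode?: number };
    if (e?.name === 'AuthenticationError') return true;
    return e?.statusCode !== undefined && TERMINAL_STATUS.has(e.statusCode);
  }

  static isRetryable(err: unknown): boolean {
    const e = err as { name?: string; statusCode?: number; code?: string };
    if (e?.name === 'RetryError') return true;
    if (e?.code === 'ETIMEDOUT' || e?.code === 'ECONNRESET') return true;
    return e?.statusCode !== undefined && RETRYABLE_STATUS.has(e.statusCode);
  }
}
```

Unknown error shapes fall through both checks and classify as neither terminal nor retryable, which is the "handles unknown error types gracefully" behavior listed above.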

Phase 4: Core Implementation

  • Create TelemetryEventEmitter class - COMPLETED (Task 1.5)
    • ✅ Extends Node.js EventEmitter
    • ✅ Takes IClientContext in constructor
    • ✅ Reads telemetryEnabled from context.getConfig()
    • ✅ Five emit methods: emitConnectionOpen, emitStatementStart, emitStatementComplete, emitCloudFetchChunk, emitError
    • ✅ ALL methods wrapped in try-catch blocks
    • ✅ ALL exceptions logged at LogLevel.debug ONLY (never warn/error)
    • ✅ NO exceptions propagate to caller (100% swallowed)
    • ✅ NO console logging (only IDBSQLLogger)
    • ✅ Events not emitted when telemetryEnabled is false
    • ✅ Uses TelemetryEventType enum for event names
    • ✅ Comprehensive unit tests with 31 test cases
    • ✅ Full code coverage (all branches covered)
    • ✅ Tests verify exception swallowing, debug-only logging, no console logging
  • Create MetricsAggregator class (with exception buffering) - COMPLETED (Task 1.8)
    • ✅ Aggregates metrics by statement_id
    • ✅ Includes both statement_id and session_id in exports
    • ✅ Buffers retryable exceptions until statement complete
    • ✅ Flushes terminal exceptions immediately
    • ✅ Batch flushing on size threshold
    • ✅ Periodic flushing with configurable interval
    • ✅ Proper cleanup on close
    • ✅ Comprehensive unit tests with 32 test cases
    • ✅ 94.44% line coverage
    • ✅ Tests verify exception buffering and immediate terminal flush
  • Create DatabricksTelemetryExporter class - COMPLETED (Task 1.7)
    • ✅ Takes IClientContext, host, and CircuitBreakerRegistry in constructor
    • ✅ Exports to /api/2.0/sql/telemetry-ext (authenticated endpoint)
    • ✅ Exports to /api/2.0/sql/telemetry-unauth (unauthenticated endpoint)
    • ✅ Formats payload with workspace_id, session_id, sql_statement_id
    • ✅ Uses context.getConnectionProvider() for HTTP calls
    • ✅ Integrates with circuit breaker for endpoint protection
    • ✅ Retries on retryable errors (max from config)
    • ✅ Does not retry on terminal errors (400, 401, 403, 404)
    • ✅ Exponential backoff with jitter (100ms - 1000ms)
    • ✅ CRITICAL: All exceptions swallowed and logged at LogLevel.debug ONLY
    • ✅ CRITICAL: export() method NEVER throws (catches all exceptions)
    • ✅ CRITICAL: NO console logging
    • ✅ 24 comprehensive unit tests
    • ✅ 96.34% statement coverage, 84.61% branch coverage, 100% function coverage
    • ✅ Tests verify exception handling, circuit breaker integration, retry logic
    • ✅ Test stub created (TelemetryExporterStub.ts)
  • Create telemetry types (types.ts) - COMPLETED (Task 1.1)
  • Add event emission points to driver operations
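The "never throws, debug-only logging" contract that runs through Phase 4 can be sketched as a single guard that every emit path funnels through. The `DebugLogger` interface and the `enabled` flag are simplified stand-ins for `IDBSQLLogger` and the `IClientContext` config; only the contract itself is taken from the requirements above.

```typescript
import { EventEmitter } from 'events';

interface DebugLogger {
  debug(message: string): void;
}

class TelemetryEventEmitter extends EventEmitter {
  constructor(private enabled: boolean, private logger: DebugLogger) {
    super();
  }

  emitConnectionOpen(sessionId: string): void {
    this.safeEmit('telemetry.connection.open', { sessionId });
  }

  // Every emit path funnels through this guard: disabled -> no-op;
  // any listener or serialization failure -> swallowed and debug-logged.
  private safeEmit(event: string, payload: object): void {
    try {
      if (!this.enabled) return;
      this.emit(event, payload);
    } catch (err) {
      // Never warn/error, never rethrow: telemetry must not affect the driver.
      this.logger.debug(`Telemetry emit failed: ${String(err)}`);
    }
  }
}
```

Because `EventEmitter.emit()` invokes listeners synchronously, a throwing listener surfaces inside `safeEmit` and is caught there — the caller on the driver's hot path never sees it.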

Phase 5: Integration

  • Update DBSQLClient.connect() to use managers - COMPLETED (Task 2.4)
    • ✅ Added telemetryEnabled override to ConnectionOptions in IDBSQLClient.ts
    • ✅ Added private fields for telemetry components in DBSQLClient
    • ✅ Implemented initializeTelemetry() method with feature flag check
    • ✅ Created all telemetry component instances (NOT singletons)
    • ✅ Wired event listeners to aggregator
    • ✅ Reference counting increments via getOrCreateContext() and getOrCreateClient()
  • Implement graceful shutdown in DBSQLClient.close() - COMPLETED (Task 2.4)
    • ✅ Flush pending metrics via aggregator.flush()
    • ✅ Release telemetry client (decrements refCount)
    • ✅ Release feature flag context (decrements refCount)
    • ✅ All wrapped in try-catch with LogLevel.debug logging
  • Add configuration parsing from client options - COMPLETED (Task 2.4)
    • ✅ Override telemetry config from ConnectionOptions
    • ✅ Store host for per-host client management
  • Wire up feature flag cache - COMPLETED (Task 2.4)
    • ✅ Create FeatureFlagCache instance in initializeTelemetry()
    • ✅ Check isTelemetryEnabled() before creating other components
    • ✅ Increment/decrement reference counts properly
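The shutdown sequence above (flush, then release, every step guarded) could look like the following sketch. The interface names and `closeTelemetry` helper are hypothetical; the ordering and the debug-only error handling are the points taken from the checklist.

```typescript
// Hypothetical close() sequence: best-effort flush, then refCount releases,
// with each step guarded so a telemetry failure cannot fail the close.

interface Flushable { flush(): Promise<void>; }
interface Releasable { release(host: string): void; }
interface ShutdownLogger { debug(message: string): void; }

async function closeTelemetry(
  host: string,
  aggregator: Flushable,
  clientProvider: Releasable,
  featureFlagCache: Releasable,
  logger: ShutdownLogger,
): Promise<void> {
  try {
    await aggregator.flush(); // best-effort flush of buffered metrics
  } catch (err) {
    logger.debug(`Telemetry flush failed on close: ${String(err)}`);
  }
  try {
    clientProvider.release(host);   // decrement per-host client refCount
    featureFlagCache.release(host); // decrement feature-flag context refCount
  } catch (err) {
    logger.debug(`Telemetry release failed on close: ${String(err)}`);
  }
}
```

The releases run even when the flush rejects, so a flaky telemetry endpoint cannot leak the shared per-host client or feature-flag context.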

Phase 6: Instrumentation

  • Add connection.open event emission - COMPLETED (Task 2.5)
    • ✅ Emitted in DBSQLClient.openSession() after successful session creation
    • ✅ Includes sessionId, workspaceId (extracted from host), and driverConfig
    • ✅ Helper method extractWorkspaceId() extracts workspace ID from hostname
    • ✅ Helper method buildDriverConfiguration() builds complete driver config
    • ✅ All wrapped in try-catch with LogLevel.debug logging
  • Add statement.start/complete event emission - COMPLETED (Task 2.5)
    • ✅ statement.start emitted in DBSQLOperation constructor via emitStatementStart()
    • ✅ statement.complete emitted in DBSQLOperation.close() via emitStatementComplete()
    • ✅ Includes statementId, sessionId, latencyMs, resultFormat, pollCount
    • ✅ Tracks pollCount by incrementing in status() method
    • ✅ Tracks startTime for latency calculation
    • ✅ Calls telemetryAggregator.completeStatement() to finalize aggregation
    • ✅ sessionId passed from DBSQLSession.createOperation() to DBSQLOperation constructor
    • ✅ All wrapped in try-catch with LogLevel.debug logging
  • Add cloudfetch.chunk event emission - COMPLETED (Task 2.5)
    • ✅ Emitted in CloudFetchResultHandler.downloadLink() after each chunk download
    • ✅ Includes statementId, chunkIndex, latencyMs, bytes, compressed flag
    • ✅ chunkIndex tracked and incremented for each download
    • ✅ statementId passed from DBSQLOperation.getResultHandler() to CloudFetchResultHandler
    • ✅ Helper method emitCloudFetchChunk() handles emission
    • ✅ All wrapped in try-catch with LogLevel.debug logging
  • Add error event emission - COMPLETED (Task 2.5)
    • ✅ Helper method emitErrorEvent() in DBSQLOperation for error emission
    • ✅ Uses ExceptionClassifier to determine if error is terminal
    • ✅ Includes statementId, sessionId, errorName, errorMessage, isTerminal
    • ✅ Ready to be called from error handlers when exceptions occur
    • ✅ All wrapped in try-catch with LogLevel.debug logging
  • Test end-to-end flow - COMPLETED (Task 2.6)
    • ✅ All unit tests passing (226 tests)
    • ✅ Integration tests passing
    • ✅ End-to-end telemetry flow verified

Phase 7: Testing

  • Unit tests for all new components - COMPLETED (Task 2.6)
    • ✅ All telemetry components have comprehensive unit tests
    • ✅ 226 unit tests passing
    • ✅ 97.76% line coverage (exceeds >80% requirement)
    • ✅ 90.59% branch coverage
    • ✅ 100% function coverage
    • ✅ FeatureFlagCache: 29 tests, 100% coverage
    • ✅ TelemetryClientProvider: 31 tests, 100% coverage
    • ✅ TelemetryClient: 12 tests, 100% coverage
    • ✅ CircuitBreaker: 32 tests, 100% coverage
    • ✅ ExceptionClassifier: 51 tests, 100% coverage
    • ✅ TelemetryEventEmitter: 31 tests, 100% coverage
    • ✅ MetricsAggregator: 32 tests, 94.44% coverage
    • ✅ DatabricksTelemetryExporter: 24 tests, 96.34% coverage
  • Integration tests for DBSQLClient telemetry integration - COMPLETED (Task 2.4)
    • ✅ Test initialization when telemetryEnabled is true/false
    • ✅ Test feature flag is respected
    • ✅ Test client sharing across multiple connections
    • ✅ Test reference counting works correctly
    • ✅ Test cleanup on close
    • ✅ Test driver continues when telemetry fails
    • ✅ Test no exceptions propagate to application
    • ✅ Test configuration override via ConnectionOptions
    • ✅ Created tests/e2e/telemetry/telemetry-integration.test.ts
  • Test stubs created - COMPLETED (Task 2.6)
    • ✅ tests/unit/.stubs/CircuitBreakerStub.ts
    • ✅ tests/unit/.stubs/TelemetryExporterStub.ts
    • ✅ tests/unit/.stubs/ClientContextStub.ts (already existed)
  • CRITICAL test verifications - COMPLETED (Task 2.6)
    • ✅ ALL exceptions swallowed verified in all test files
    • ✅ ONLY LogLevel.debug used verified in all test files
    • ✅ NO console logging verified in all test files
    • ✅ Driver works when telemetry fails verified in integration tests
  • Integration tests for circuit breaker - COMPLETED (covered in unit tests)
  • Integration tests for graceful shutdown - COMPLETED (covered in telemetry-integration.test.ts)
  • Performance tests (overhead measurement) - DEFERRED (not critical for MVP)
  • Load tests with many concurrent connections - DEFERRED (not critical for MVP)

Phase 8: Documentation

  • Update README with telemetry configuration - COMPLETED (Task 4.3)
    • ✅ Added telemetry overview section to README.md
    • ✅ Included key features, data collection summary, and configuration examples
    • ✅ Added link to detailed docs/TELEMETRY.md
  • Document event types and data collected - COMPLETED (Task 4.3)
    • ✅ Comprehensive documentation of all four event categories (connection, statement, cloudfetch, error)
    • ✅ Detailed field descriptions with JSON examples
    • ✅ Privacy considerations clearly documented
  • Add troubleshooting guide - COMPLETED (Task 4.3)
    • ✅ Common issues covered (telemetry not working, circuit breaker, debug logging)
    • ✅ Step-by-step debugging instructions
    • ✅ Log message examples
  • Update API documentation - COMPLETED (Task 4.3)
    • ✅ Configuration options table with descriptions
    • ✅ Multiple example configurations
    • ✅ FAQ section with 12 common questions

12. Open Questions

12.1 Event Naming Conventions

Question: Should we use a specific naming convention for telemetry events?

Recommendation: Use dot-notation with namespace prefix:

  • telemetry.connection.open
  • telemetry.statement.start
  • telemetry.statement.complete
  • telemetry.cloudfetch.chunk
  • telemetry.error
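If this recommendation is adopted, the names could live in a single `as const` map so both emitters and listeners share one source of truth. The constant below is illustrative; only the dot-notation names come from the recommendation above.

```typescript
// Single source of truth for the recommended namespaced event names.
const TelemetryEvents = {
  CONNECTION_OPEN: 'telemetry.connection.open',
  STATEMENT_START: 'telemetry.statement.start',
  STATEMENT_COMPLETE: 'telemetry.statement.complete',
  CLOUDFETCH_CHUNK: 'telemetry.cloudfetch.chunk',
  ERROR: 'telemetry.error',
} as const;

// Union type of all event names, usable to type emit/on signatures strictly.
type TelemetryEventName = typeof TelemetryEvents[keyof typeof TelemetryEvents];
```

The `telemetry.` prefix keeps driver-internal events cleanly separated from any events an application might already listen for on the same emitter.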

12.2 Statement Completion Detection

Question: How do we know when a statement is complete for aggregation?

Options:

  1. Explicit marker: Call completeStatement(id) explicitly (recommended)
  2. Timeout-based: Emit after N seconds of inactivity
  3. On close: When operation is closed

Recommendation: Use explicit marker for better control.

12.3 TypeScript Types

Question: Should we use strict TypeScript types for all telemetry interfaces?

Answer: Yes, use strict types to prevent errors and improve maintainability.


13. References

13.1 Related Documentation

13.2 Existing Code References

JDBC Driver (reference implementation):

  • TelemetryClient.java:15: Main telemetry client with batching and flush
  • TelemetryClientFactory.java:27: Per-host client management with reference counting
  • CircuitBreakerTelemetryPushClient.java:15: Circuit breaker wrapper
  • TelemetryHelper.java:60-71: Feature flag checking
  • DatabricksDriverFeatureFlagsContextFactory.java:27: Per-host feature flag cache
  • TelemetryCollector.java:29-30: Per-statement aggregation
  • TelemetryEvent.java:8-12: Both session_id and sql_statement_id in exported events

14. Proto Field Coverage

14.1 Implemented Fields

The Node.js driver implements the following fields from the OssSqlDriverTelemetryLog proto:

Top-level fields:

  • session_id - Session UUID for correlation
  • sql_statement_id - Statement UUID (filtered to exclude NIL UUID)
  • system_configuration - Complete driver and OS configuration
  • auth_type - Authentication type (pat, external-browser, oauth-m2m, custom)
  • operation_latency_ms - Operation execution time
  • error_info - Error details (name and stack trace)

driver_connection_params:

  • http_path - API endpoint path
  • socket_timeout - Connection timeout
  • enable_arrow - Arrow format flag
  • enable_direct_results - Direct results flag
  • enable_metric_view_metadata - Metric view metadata flag

sql_operation (SqlExecutionEvent):

  • statement_type - Operation type (EXECUTE_STATEMENT, LIST_CATALOGS, etc.)
  • is_compressed - Compression flag from CloudFetch
  • execution_result - Result format (INLINE_ARROW, INLINE_JSON, EXTERNAL_LINKS, COLUMNAR_INLINE)
  • chunk_details.total_chunks_present - Number of chunks
  • chunk_details.total_chunks_iterated - Number of chunks downloaded

14.2 Not Implemented Fields

The following proto fields are not currently implemented as they require additional instrumentation that is not present in the Node.js driver:

sql_operation fields:

  • chunk_id - Specific chunk identifier for failures (not tracked)
  • retry_count - Number of retry attempts (statement-level retries not tracked)
  • operation_detail (OperationDetail message):
    • n_operation_status_calls - Count of getOperationStatus calls
    • operation_status_latency_millis - Total latency of status calls
    • operation_type - Type of operation (redundant with statement_type)
    • is_internal_call - Whether operation is internal
  • result_latency (ResultLatency message):
    • result_set_ready_latency_millis - Time until first result available
    • result_set_consumption_latency_millis - Time to consume all results

chunk_details fields:

  • initial_chunk_latency_millis - Time to download first chunk
  • slowest_chunk_latency_millis - Maximum chunk download time
  • sum_chunks_download_time_millis - Total download time across all chunks

driver_connection_params fields: Most fields in DriverConnectionParameters are specific to JDBC/Java configurations and not applicable to the Node.js driver (proxy configuration, SSL settings, Azure/GCP specific settings, etc.). Only the fields listed in 14.1 are relevant and implemented.

Reason for exclusion: These fields require extensive instrumentation to track:

  • Per-operation status polling (operation_detail)
  • Result set consumption timing (result_latency)
  • Per-chunk download timing (chunk_details timing fields)
  • Statement-level retry tracking

Implementing these would add significant complexity to the driver's core execution paths. They can be added in future iterations if needed for specific debugging or optimization use cases.


Summary

This event-based telemetry design provides an efficient approach to collecting driver metrics by:

  1. Leveraging Node.js patterns: Uses native EventEmitter for instrumentation
  2. Following JDBC patterns: Per-host clients, circuit breaker, feature flag caching
  3. Non-blocking operation: All telemetry operations are async and fire-and-forget
  4. Privacy-first: No PII or query data collected
  5. Production-ready: Exception swallowing, graceful shutdown, reference counting

Key Aggregation Pattern (following JDBC):

  • Aggregate by statement_id: Multiple events for the same statement are aggregated together
  • Include session_id in exports: Each exported event contains both statement_id and session_id
  • Enable multi-level correlation: Allows correlation at both statement and session levels
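The aggregation pattern above can be sketched as follows. Field names and the `exported` buffer are illustrative, not the exact export schema; the essentials are that events are keyed by `statementId`, each exported record carries both IDs, and completion uses the explicit `completeStatement()` marker recommended in section 12.2.

```typescript
// Aggregates per-statement events into one record keyed by statementId,
// exporting both statementId and sessionId for multi-level correlation.

interface StatementMetrics {
  statementId: string;
  sessionId: string;
  chunkCount: number;
  totalBytes: number;
}

class MetricsAggregator {
  private open = new Map<string, StatementMetrics>();
  readonly exported: StatementMetrics[] = [];

  recordChunk(statementId: string, sessionId: string, bytes: number): void {
    let m = this.open.get(statementId);
    if (!m) {
      m = { statementId, sessionId, chunkCount: 0, totalBytes: 0 };
      this.open.set(statementId, m);
    }
    m.chunkCount += 1;
    m.totalBytes += bytes;
  }

  // Explicit completion marker: finalizes and exports one aggregated
  // record per statement.
  completeStatement(statementId: string): void {
    const m = this.open.get(statementId);
    if (!m) return;
    this.open.delete(statementId);
    this.exported.push(m);
  }
}
```

Many chunk events for one statement collapse into a single exported record, and because that record carries the `sessionId` too, downstream analysis can roll metrics up at either the statement or the session level.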

This design enables the Databricks Node.js SQL driver to collect valuable usage metrics while maintaining code simplicity, high performance, and compatibility with the Node.js ecosystem.