-
Notifications
You must be signed in to change notification settings - Fork 15
Expand file tree
/
Copy pathperformance-observer.ts
More file actions
402 lines (363 loc) · 13.5 KB
/
performance-observer.ts
File metadata and controls
402 lines (363 loc) · 13.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
import {
type PerformanceEntry,
PerformanceObserver,
performance,
} from 'node:perf_hooks';
import type { AppendableSink } from './wal.js';
/**
 * Encoder that converts PerformanceEntry to domain events.
 *
 * Pure function that transforms performance entries into domain events.
 * Should be stateless, synchronous, and have no side effects.
 * Returns a readonly array of encoded items.
 */
export type PerformanceEntryEncoder<F> = (
  entry: PerformanceEntry,
) => readonly F[];
/**
 * Performance entry types monitored by this observer. Only 'mark' and
 * 'measure' entries are tracked, as they represent user-defined performance
 * markers and measurements.
 */
const OBSERVED_TYPES = ['mark', 'measure'] as const;
/** Union of observed entry type names, derived from OBSERVED_TYPES. */
type ObservedEntryType = (typeof OBSERVED_TYPES)[number];
/** Set form of OBSERVED_TYPES for O(1) membership checks. */
const OBSERVED_TYPE_SET: ReadonlySet<ObservedEntryType> = new Set(
  OBSERVED_TYPES,
);
/**
 * Builds a performance-mark name identifying a failed encode attempt.
 *
 * @param error - The value thrown by the encoder (any type)
 * @param entry - The performance entry whose encoding failed
 * @returns A mark name of the form `encode-error:<ErrorName>:<entryName>`
 */
function errorToPerfMark(error: unknown, entry: PerformanceEntry): string {
  // Non-Error throwables get a generic label; an empty entry name falls
  // back to 'unnamed' so the mark stays readable.
  const errorLabel = error instanceof Error ? error.name : 'UnknownError';
  const entryLabel = entry.name ? entry.name : 'unnamed';
  return `encode-error:${errorLabel}:${entryLabel}`;
}
/**
 * Default threshold for triggering queue flushes based on queue length.
 * When the queue length reaches (maxQueueSize - flushThreshold),
 * a flush is triggered to prevent overflow. This provides a buffer zone
 * before hitting the maximum queue capacity.
 * Also used as the batch size: a flush occurs once this many items have
 * been added since the last flush.
 */
export const DEFAULT_FLUSH_THRESHOLD = 20;
/**
 * Default maximum number of items allowed in the queue before entries are dropped.
 * This acts as a memory safety limit to prevent unbounded memory growth
 * in case of sink slowdown or high-frequency performance entries.
 */
export const DEFAULT_MAX_QUEUE_SIZE = 10_000;
/**
 * Validates the flush threshold configuration to ensure sensible bounds.
 *
 * The flush threshold must be a finite number, positive, and cannot exceed
 * the maximum queue size, as it represents a buffer zone within the queue
 * capacity.
 *
 * @param flushThreshold - The threshold value to validate (must be finite and > 0)
 * @param maxQueueSize - The maximum queue size for comparison (flushThreshold <= maxQueueSize)
 * @throws {Error} If flushThreshold is not finite, not positive, or exceeds maxQueueSize
 */
export function validateFlushThreshold(
  flushThreshold: number,
  maxQueueSize: number,
): void {
  // NaN compares false against both bounds below, so it must be rejected
  // explicitly or it would slip through validation silently.
  if (!Number.isFinite(flushThreshold)) {
    throw new Error('flushThreshold must be a finite number');
  }
  if (flushThreshold <= 0) {
    throw new Error('flushThreshold must be > 0');
  }
  if (flushThreshold > maxQueueSize) {
    throw new Error('flushThreshold must be <= maxQueueSize');
  }
}
/**
 * Configuration options for the PerformanceObserverSink.
 *
 * @template T - The type of encoded performance data that will be written to the sink
 */
export type PerformanceObserverOptions<T> = {
  /**
   * The sink where encoded performance entries will be written.
   * Must implement the AppendableSink interface for handling the encoded data.
   */
  sink: AppendableSink<T>;
  /**
   * Function that encodes raw PerformanceEntry objects into domain-specific types.
   * This transformer converts Node.js performance entries into application-specific data structures.
   * Returns a readonly array of encoded items.
   */
  encodePerfEntry: PerformanceEntryEncoder<T>;
  /**
   * Whether to enable buffered observation mode.
   *
   * When true, captures all performance marks and measures that exist in the Node.js
   * performance buffer at the time `subscribe()` is called using `performance.getEntriesByType()`
   * (the native `buffered` option is unreliable in Node.js).
   *
   * @default true
   */
  captureBufferedEntries?: boolean;
  /**
   * Threshold for triggering queue flushes.
   * Flushes occur in two scenarios:
   * 1. When queue length reaches (maxQueueSize - flushThreshold)
   * 2. When the number of items added since last flush reaches flushThreshold
   * Larger values provide more buffer space before hitting capacity limits.
   *
   * @default DEFAULT_FLUSH_THRESHOLD (20)
   */
  flushThreshold?: number;
  /**
   * Maximum number of items allowed in the queue before new entries are dropped.
   * Acts as a memory safety limit to prevent unbounded growth during sink slowdown.
   *
   * @default DEFAULT_MAX_QUEUE_SIZE (10000)
   */
  maxQueueSize?: number;
  /**
   * Whether debug mode is enabled for encode failures.
   * When true, encode failures create performance marks for debugging.
   * Required (no default); callers must opt in or out explicitly.
   */
  debug: boolean;
};
/**
 * A sink implementation that observes Node.js performance entries and forwards them to a configurable sink.
 *
 * This class provides a buffered, memory-safe bridge between Node.js PerformanceObserver
 * and application-specific data sinks. It handles performance entry encoding, queue management,
 * and graceful degradation under high load conditions.
 *
 * Performance entries flow through the following lifecycle:
 *
 * - Queued in Memory 💾
 *   - Items stored in queue (`#queue`) until flushed
 *   - Queue limited by `maxQueueSize` to prevent unbounded growth
 *   - Items remain in queue if sink is closed during flush
 *
 * - Successfully Written 📤
 *   - Items written to sink and counted in `getStats().written`
 *   - Queue cleared after successful batch writes
 *
 * - Item Disposition Scenarios 💥
 *   - **Encode Failure**: ❌ Items lost when `encode()` throws. Creates a perf mark when the `debug` option is enabled.
 *   - **Sink Write Failure**: 💾 Items stay in queue when sink write fails during flush
 *   - **Sink Closed**: 💾 Items stay in queue when sink is closed during flush
 *   - **Proactive Flush Throws**: 💾 Items stay in queue when `flush()` throws during threshold check
 *   - **Final Flush Throws**: 💾 Items stay in queue when `flush()` throws at end of callback
 *   - **Buffered Flush Throws**: 💾 Items stay in queue when buffered entries flush fails
 *   - **Queue Overflow**: ❌ Items dropped when queue reaches `maxQueueSize`
 *
 * @template T - The type of encoded performance data written to the sink
 * @implements {Observer} - Lifecycle management interface
 * @implements {Buffered} - Queue statistics interface
 */
export class PerformanceObserverSink<T> {
  /** Encoder function for transforming PerformanceEntry objects into domain types */
  #encodePerfEntry: PerformanceEntryEncoder<T>;
  /** Whether buffered observation mode is enabled */
  #buffered: boolean;
  /** Threshold for triggering flushes based on queue length proximity to max capacity */
  #flushThreshold: number;
  /** Maximum number of items allowed in queue before dropping new entries (hard memory limit) */
  #maxQueueSize: number;
  /** The target sink where encoded performance data is written */
  #sink: AppendableSink<T>;
  /** Node.js PerformanceObserver instance, undefined when not subscribed */
  #observer: PerformanceObserver | undefined;
  /** Bounded queue storing encoded performance items awaiting flush */
  #queue: T[] = [];
  /** Count of performance entries dropped due to queue overflow */
  #dropped = 0;
  /** Count of performance entries successfully written to sink */
  #written = 0;
  /** Number of items added to queue since last successful flush */
  #addedSinceLastFlush = 0;
  /** Whether debug mode is enabled for encode failures */
  #debug: boolean;
  /**
   * Encodes and enqueues observed entries, flushing proactively near capacity.
   *
   * For each 'mark'/'measure' entry: encode it, then for each encoded item
   * either drop it (queue at maxQueueSize), or flush first when inside the
   * buffer zone (queue length >= maxQueueSize - flushThreshold) and push it.
   * Encode failures count the entry as one drop and, in debug mode, record a
   * performance mark. A final flush runs once flushThreshold items have
   * accumulated since the last flush.
   *
   * @param entries - Raw performance entries from the observer or buffer
   */
  private processPerformanceEntries(entries: PerformanceEntry[]) {
    entries.forEach(entry => {
      if (OBSERVED_TYPE_SET.has(entry.entryType as ObservedEntryType)) {
        try {
          const items = this.encode(entry);
          items.forEach(item => {
            // ❌ MAX QUEUE OVERFLOW — hard limit reached; flushes in the
            // buffer zone already failed or the sink is closed.
            if (this.#queue.length >= this.#maxQueueSize) {
              this.#dropped++; // Items are lost forever
              return;
            }
            // Buffer zone: attempt a flush before pushing so the queue
            // ideally never reaches the hard limit above.
            if (
              this.#queue.length >=
              this.#maxQueueSize - this.#flushThreshold
            ) {
              this.flush();
            }
            this.#queue.push(item);
            this.#addedSinceLastFlush++;
          });
        } catch (error) {
          // ❌ Encode failure: item lost forever as user has to fix encode function.
          // Counted as a single drop even if the entry would have encoded
          // to multiple items — the item count is unknowable here.
          this.#dropped++;
          if (this.#debug) {
            try {
              performance.mark(errorToPerfMark(error, entry));
            } catch {
              // Ignore mark failures to prevent double errors
            }
          }
        }
      }
    });
    // Batch flush: enough items accumulated since the last flush.
    if (this.#addedSinceLastFlush >= this.#flushThreshold) {
      this.flush();
    }
  }
  /**
   * Creates a new PerformanceObserverSink with the specified configuration.
   *
   * @param options - Configuration options for the performance observer sink
   * @throws {Error} If flushThreshold validation fails (must be > 0 and <= maxQueueSize)
   */
  constructor(options: PerformanceObserverOptions<T>) {
    const {
      encodePerfEntry,
      sink,
      captureBufferedEntries,
      flushThreshold = DEFAULT_FLUSH_THRESHOLD,
      maxQueueSize = DEFAULT_MAX_QUEUE_SIZE,
      debug,
    } = options;
    this.#encodePerfEntry = encodePerfEntry;
    this.#sink = sink;
    this.#buffered = captureBufferedEntries ?? true;
    this.#maxQueueSize = maxQueueSize;
    // Validate after maxQueueSize is fixed, since the threshold is bounded by it.
    validateFlushThreshold(flushThreshold, this.#maxQueueSize);
    this.#flushThreshold = flushThreshold;
    this.#debug = debug;
  }
  /**
   * Returns whether debug mode is enabled for encode failures.
   *
   * Debug mode is configured via the `debug` option passed to the
   * PerformanceObserverSink constructor. When enabled, encode failures
   * are recorded as performance marks for debugging.
   *
   * @returns true if debug mode is enabled, false otherwise
   */
  get debug(): boolean {
    return this.#debug;
  }
  /**
   * Returns current queue statistics for monitoring and debugging.
   *
   * Provides insight into the current state of the performance entry queue,
   * useful for monitoring memory usage and processing throughput.
   *
   * @returns Object containing all states and entry counts
   */
  getStats() {
    return {
      isSubscribed: this.isSubscribed(),
      queued: this.#queue.length,
      dropped: this.#dropped,
      written: this.#written,
      maxQueueSize: this.#maxQueueSize,
      flushThreshold: this.#flushThreshold,
      addedSinceLastFlush: this.#addedSinceLastFlush,
      buffered: this.#buffered,
    };
  }
  /**
   * Encodes a raw PerformanceEntry using the configured encoder function.
   *
   * This method delegates to the user-provided encoder function, allowing
   * transformation of Node.js performance entries into application-specific types.
   *
   * @param entry - The raw performance entry to encode
   * @returns Readonly array of encoded items
   */
  encode(entry: PerformanceEntry): readonly T[] {
    return this.#encodePerfEntry(entry);
  }
  /**
   * Starts observing performance entries and forwarding them to the sink.
   *
   * Creates a Node.js PerformanceObserver that monitors 'mark' and 'measure' entries.
   * The observer uses a bounded queue with proactive flushing to manage memory usage.
   * When buffered mode is enabled, existing entries are captured via `performance.getEntriesByType()` instead of the unreliable native `buffered` option.
   * If the sink is closed, items stay in the queue until reopened.
   * Idempotent: calling while already subscribed is a no-op.
   */
  subscribe(): void {
    if (this.#observer) {
      return;
    }
    this.#observer = new PerformanceObserver(list => {
      this.processPerformanceEntries(list.getEntries());
    });
    // Manually capture buffered entries instead of the unreliable native buffered option.
    if (this.#buffered) {
      const existingMarks = performance.getEntriesByType('mark');
      const existingMeasures = performance.getEntriesByType('measure');
      const allEntries = [...existingMarks, ...existingMeasures];
      this.processPerformanceEntries(allEntries);
    }
    this.#observer.observe({
      entryTypes: OBSERVED_TYPES,
      // Note: buffered option intentionally omitted due to unreliability in Node.js.
    });
  }
  /**
   * Flushes all queued performance entries to the sink.
   *
   * Writes all currently queued encoded performance entries to the configured sink.
   * If the sink is closed, flush is a no-op and items stay in the queue until reopened.
   * Items whose append throws are retained in the queue for retry; successfully
   * written items are removed and counted in `getStats().written`.
   */
  flush(): void {
    if (this.#queue.length === 0) {
      return;
    }
    if (this.#sink.isClosed()) {
      return;
    }
    // Process each item in queue; collect failures for retry.
    const failedItems: T[] = [];
    this.#queue.forEach(item => {
      try {
        this.#sink.append(item);
        this.#written++;
      } catch {
        failedItems.push(item);
      }
    });
    // Clear queue but keep failed items for retry
    this.#queue.length = 0;
    this.#queue.push(...failedItems);
    // Retained failures still count toward the next batch-flush trigger.
    this.#addedSinceLastFlush = failedItems.length;
  }
  /**
   * Stops observing performance entries and cleans up resources.
   *
   * Performs a final flush of any remaining queued entries, then disconnects
   * the PerformanceObserver and releases all references.
   *
   * This method is idempotent - safe to call multiple times.
   */
  unsubscribe(): void {
    if (!this.#observer) {
      return;
    }
    this.flush();
    this.#addedSinceLastFlush = 0;
    this.#observer.disconnect();
    this.#observer = undefined;
  }
  /**
   * Checks whether the performance observer is currently active.
   *
   * Returns true if the sink is subscribed and actively observing performance entries.
   * This indicates that a PerformanceObserver instance exists and is connected.
   *
   * @returns true if currently subscribed and observing, false otherwise
   */
  isSubscribed(): boolean {
    return this.#observer !== undefined;
  }
}