-
Notifications
You must be signed in to change notification settings - Fork 15
Expand file tree
/
Copy pathperformance-observer.ts
More file actions
110 lines (92 loc) · 2.72 KB
/
performance-observer.ts
File metadata and controls
110 lines (92 loc) · 2.72 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
import {
type PerformanceEntry,
PerformanceObserver,
type PerformanceObserverEntryList,
performance,
} from 'node:perf_hooks';
import type { AppendableSink } from './wal.js';
/** Performance entry types this module observes and drains from the timeline. */
const OBSERVED_TYPES = ['mark', 'measure'] as const;

/**
 * Union of the observed entry types, derived from {@link OBSERVED_TYPES} so
 * the tuple stays the single source of truth.
 */
type ObservedEntryType = (typeof OBSERVED_TYPES)[number];

/** Pending-entry count that triggers a flush when no `flushThreshold` option is supplied. */
export const DEFAULT_FLUSH_THRESHOLD = 20;
/**
 * Construction options for `PerformanceObserverSink`.
 *
 * `T` is the item type produced by `encode` and accepted by `sink`.
 */
export type PerformanceObserverOptions<T> = {
/** Destination of the encoded items; each item is passed to `sink.append`. */
sink: AppendableSink<T>;
/** Converts one performance entry into the array of items to append. */
encode: (entry: PerformanceEntry) => T[];
/** Forwarded as the `buffered` option of `PerformanceObserver.observe`; treated as `false` when omitted. */
buffered?: boolean;
/** Number of counted pending entries at which a flush is triggered; falls back to `DEFAULT_FLUSH_THRESHOLD` when omitted. */
flushThreshold?: number;
};
/**
 * Drains `mark` and `measure` entries from Node's global performance timeline
 * into an append-only sink.
 *
 * The `PerformanceObserver` created by {@link PerformanceObserverSink.subscribe}
 * is only a trigger: its callback counts incoming entries and calls
 * {@link PerformanceObserverSink.flush} once the count reaches the threshold.
 * The entries themselves are read in `flush()` via
 * `performance.getEntriesByType`, using a per-type cursor that remembers how
 * many entries of that type were already written.
 *
 * NOTE(review): the cursor is an index into the array returned by
 * `performance.getEntriesByType`. If the timeline is shrunk elsewhere (e.g.
 * `performance.clearMarks()` / `clearMeasures()`), the cursor is not adjusted
 * and newer entries may be skipped — confirm callers never clear the timeline
 * while an instance is in use.
 */
export class PerformanceObserverSink<T> {
  readonly #encode: (entry: PerformanceEntry) => T[];
  readonly #buffered: boolean;
  readonly #flushThreshold: number;
  readonly #sink: AppendableSink<T>;
  #observer: PerformanceObserver | undefined;
  // Entries counted by the observer callback since the last successful flush.
  #pendingCount = 0;
  // "Cursor" per type: how many entries of the global buffer were already written.
  readonly #written: Map<ObservedEntryType, number>;

  /**
   * @param options Sink, encoder and optional tuning. `buffered` defaults to
   *   `false`, `flushThreshold` to `DEFAULT_FLUSH_THRESHOLD`.
   */
  constructor(options: PerformanceObserverOptions<T>) {
    const { encode, sink, buffered, flushThreshold } = options;
    this.#encode = encode;
    this.#written = new Map<ObservedEntryType, number>(
      OBSERVED_TYPES.map(t => [t, 0]),
    );
    this.#sink = sink;
    this.#buffered = buffered ?? false;
    this.#flushThreshold = flushThreshold ?? DEFAULT_FLUSH_THRESHOLD;
  }

  /**
   * Encodes a single performance entry with the encoder given at construction.
   *
   * @param entry Entry to convert.
   * @returns The items the encoder produced for `entry`.
   */
  encode(entry: PerformanceEntry): T[] {
    return this.#encode(entry);
  }

  /**
   * Starts observing `mark` and `measure` entries. Does nothing when already
   * subscribed.
   */
  subscribe(): void {
    if (this.#observer) {
      return;
    }

    // Only used to trigger the flush - it does not process the entries, it just counts them.
    this.#observer = new PerformanceObserver(
      (list: PerformanceObserverEntryList) => {
        const batchCount = OBSERVED_TYPES.reduce(
          (n, t) => n + list.getEntriesByType(t).length,
          0,
        );
        this.#pendingCount += batchCount;
        if (this.#pendingCount >= this.#flushThreshold) {
          this.flush();
        }
      },
    );

    this.#observer.observe({
      entryTypes: OBSERVED_TYPES,
      buffered: this.#buffered,
    });
  }

  /**
   * Writes every not-yet-written `mark` and `measure` entry of the global
   * timeline to the sink and resets the pending counter. Does nothing when
   * not subscribed.
   *
   * The cursor advances after each entry whose items were all appended, so a
   * sink failure part-way through a batch does not cause already written
   * entries to be appended again on the next flush.
   *
   * @throws Error with the original failure as `cause` when encoding or
   *   appending fails; the pending counter is left untouched in that case.
   */
  flush(): void {
    if (!this.#observer) {
      return;
    }

    for (const type of OBSERVED_TYPES) {
      const start = this.#written.get(type) ?? 0;
      const fresh = performance.getEntriesByType(type).slice(start);
      try {
        // Encode the whole batch up front: an encoder failure then writes
        // nothing for this type.
        const encoded = fresh.map(entry => this.encode(entry));
        encoded.forEach((items, index) => {
          items.forEach(item => this.#sink.append(item));
          // This entry is fully written; move the cursor past it right away.
          this.#written.set(type, start + index + 1);
        });
      } catch (error) {
        throw new Error(
          'PerformanceObserverSink failed to write items to sink.',
          { cause: error },
        );
      }
    }

    this.#pendingCount = 0;
  }

  /**
   * Disconnects the observer. Does nothing when not subscribed. Entries that
   * have not been flushed yet are not written by this call.
   */
  unsubscribe(): void {
    if (!this.#observer) {
      return;
    }
    this.#observer.disconnect();
    this.#observer = undefined;
  }

  /** @returns `true` while an observer is attached. */
  isSubscribed(): boolean {
    return this.#observer !== undefined;
  }
}