Skip to content

Commit 74802eb

Browse files
Update typescript sdk to use v3 websocket api (#4784)
Reopening #4762 which was approved but merged prematurely.
1 parent f672ae2 commit 74802eb

8 files changed

Lines changed: 531 additions & 58 deletions

crates/bindings-typescript/src/lib/binary_writer.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,12 @@ export default class BinaryWriter {
9393
this.offset += 1;
9494
}
9595

96+
writeBytes(value: Uint8Array): void {
97+
this.expandBuffer(value.length);
98+
new Uint8Array(this.buffer.buffer, this.offset, value.length).set(value);
99+
this.offset += value.length;
100+
}
101+
96102
writeI8(value: number): void {
97103
this.expandBuffer(1);
98104
this.view.setInt8(this.offset, value);

crates/bindings-typescript/src/sdk/db_connection_impl.ts

Lines changed: 145 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import { ConnectionId, ProductBuilder, ProductType } from '../';
22
import { AlgebraicType, type ComparablePrimitive } from '../';
3-
import { BinaryReader } from '../';
4-
import { BinaryWriter } from '../';
3+
import BinaryReader from '../lib/binary_reader.ts';
4+
import BinaryWriter from '../lib/binary_writer.ts';
55
import {
66
BsatnRowList,
77
ClientMessage,
@@ -60,6 +60,18 @@ import type { ProceduresView } from './procedures.ts';
6060
import type { Values } from '../lib/type_util.ts';
6161
import type { TransactionUpdate } from './client_api/types.ts';
6262
import { InternalError, SenderError } from '../lib/errors.ts';
63+
import {
64+
normalizeWsProtocol,
65+
PREFERRED_WS_PROTOCOLS,
66+
V2_WS_PROTOCOL,
67+
V3_WS_PROTOCOL,
68+
type NegotiatedWsProtocol,
69+
} from './websocket_protocols';
70+
import {
71+
countClientMessagesForV3Frame,
72+
encodeClientMessagesV3,
73+
forEachServerMessageV3,
74+
} from './websocket_v3_frames.ts';
6375

6476
export {
6577
DbConnectionBuilder,
@@ -117,6 +129,9 @@ const CLIENT_MESSAGE_CALL_REDUCER_TAG =
117129
getClientMessageVariantTag('CallReducer');
118130
const CLIENT_MESSAGE_CALL_PROCEDURE_TAG =
119131
getClientMessageVariantTag('CallProcedure');
132+
// Keep individual v3 frames bounded so one burst does not monopolize the send
133+
// path or create very large websocket writes.
134+
const MAX_V3_OUTBOUND_FRAME_BYTES = 256 * 1024;
120135

121136
export class DbConnectionImpl<RemoteModule extends UntypedRemoteModule>
122137
implements DbContext<RemoteModule>
@@ -172,6 +187,8 @@ export class DbConnectionImpl<RemoteModule extends UntypedRemoteModule>
172187
#inboundQueueOffset = 0;
173188
#isDrainingInboundQueue = false;
174189
#outboundQueue: Uint8Array[] = [];
190+
#isOutboundFlushScheduled = false;
191+
#negotiatedWsProtocol: NegotiatedWsProtocol = V2_WS_PROTOCOL;
175192
#subscriptionManager = new SubscriptionManager<RemoteModule>();
176193
#remoteModule: RemoteModule;
177194
#reducerCallbacks = new Map<
@@ -198,6 +215,7 @@ export class DbConnectionImpl<RemoteModule extends UntypedRemoteModule>
198215
#sourceNameToTableDef: Record<string, Values<RemoteModule['tables']>>;
199216
#messageReader = new BinaryReader(new Uint8Array());
200217
#rowListReader = new BinaryReader(new Uint8Array());
218+
#clientFrameEncoder = new BinaryWriter(1024);
201219
#boundSubscriptionBuilder!: () => SubscriptionBuilderImpl<RemoteModule>;
202220
#boundDisconnect!: () => void;
203221

@@ -296,7 +314,7 @@ export class DbConnectionImpl<RemoteModule extends UntypedRemoteModule>
296314
this.wsPromise = createWSFn({
297315
url,
298316
nameOrAddress,
299-
wsProtocol: 'v2.bsatn.spacetimedb',
317+
wsProtocol: [...PREFERRED_WS_PROTOCOLS],
300318
authToken: token,
301319
compression: compression,
302320
lightMode: lightMode,
@@ -595,23 +613,99 @@ export class DbConnectionImpl<RemoteModule extends UntypedRemoteModule>
595613
}
596614

597615
#flushOutboundQueue(wsResolved: WebsocketAdapter): void {
616+
if (this.#negotiatedWsProtocol === V3_WS_PROTOCOL) {
617+
this.#flushOutboundQueueV3(wsResolved);
618+
return;
619+
}
620+
this.#flushOutboundQueueV2(wsResolved);
621+
}
622+
623+
#flushOutboundQueueV2(wsResolved: WebsocketAdapter): void {
598624
const pending = this.#outboundQueue.splice(0);
599625
for (const message of pending) {
600626
wsResolved.send(message);
601627
}
602628
}
603629

630+
#flushOutboundQueueV3(wsResolved: WebsocketAdapter): void {
631+
if (this.#outboundQueue.length === 0) {
632+
return;
633+
}
634+
635+
// Emit at most one bounded frame per flush. If more encoded v2 messages
636+
// remain in the queue, they are sent by a later scheduled flush so inbound
637+
// traffic and other tasks get a chance to run between websocket writes.
638+
const batchSize = countClientMessagesForV3Frame(
639+
this.#outboundQueue,
640+
MAX_V3_OUTBOUND_FRAME_BYTES
641+
);
642+
wsResolved.send(
643+
encodeClientMessagesV3(
644+
this.#clientFrameEncoder,
645+
this.#outboundQueue,
646+
batchSize
647+
)
648+
);
649+
650+
if (batchSize === this.#outboundQueue.length) {
651+
this.#outboundQueue.length = 0;
652+
return;
653+
}
654+
655+
this.#outboundQueue.copyWithin(0, batchSize);
656+
this.#outboundQueue.length -= batchSize;
657+
if (this.#outboundQueue.length > 0) {
658+
this.#scheduleDeferredOutboundFlush();
659+
}
660+
}
661+
662+
#scheduleOutboundFlush(): void {
663+
this.#scheduleOutboundFlushWith('microtask');
664+
}
665+
666+
#scheduleDeferredOutboundFlush(): void {
667+
this.#scheduleOutboundFlushWith('next-task');
668+
}
669+
670+
#scheduleOutboundFlushWith(schedule: 'microtask' | 'next-task'): void {
671+
if (this.#isOutboundFlushScheduled) {
672+
return;
673+
}
674+
675+
this.#isOutboundFlushScheduled = true;
676+
const flush = () => {
677+
this.#isOutboundFlushScheduled = false;
678+
if (this.ws && this.isActive) {
679+
this.#flushOutboundQueue(this.ws);
680+
}
681+
};
682+
683+
// The first v3 flush stays on the current turn so same-tick sends coalesce.
684+
// Follow-up flushes after a size-capped frame yield to the next task so we
685+
// do not sit in a tight send loop while inbound websocket work is waiting.
686+
if (schedule === 'next-task') {
687+
setTimeout(flush, 0);
688+
} else {
689+
queueMicrotask(flush);
690+
}
691+
}
692+
604693
#reducerArgsEncoder = new BinaryWriter(1024);
605694
#clientMessageEncoder = new BinaryWriter(1024);
606695
#sendEncodedMessage(encoded: Uint8Array, describe: () => string): void {
696+
stdbLogger('trace', describe);
607697
if (this.ws && this.isActive) {
608-
if (this.#outboundQueue.length) this.#flushOutboundQueue(this.ws);
698+
if (this.#negotiatedWsProtocol === V2_WS_PROTOCOL) {
699+
if (this.#outboundQueue.length) this.#flushOutboundQueue(this.ws);
700+
this.ws.send(encoded);
701+
return;
702+
}
609703

610-
stdbLogger('trace', describe);
611-
this.ws.send(encoded);
704+
this.#outboundQueue.push(encoded.slice());
705+
this.#scheduleOutboundFlush();
612706
} else {
613-
stdbLogger('trace', describe);
614-
// use slice() to copy, in case the clientMessageEncoder's buffer gets used
707+
// Use slice() to copy, in case the clientMessageEncoder's buffer gets reused
708+
// before the connection opens or before a v3 microbatch flush runs.
615709
this.#outboundQueue.push(encoded.slice());
616710
}
617711
}
@@ -681,6 +775,9 @@ export class DbConnectionImpl<RemoteModule extends UntypedRemoteModule>
681775
* Handles WebSocket onOpen event.
682776
*/
683777
#handleOnOpen(): void {
778+
if (this.ws) {
779+
this.#negotiatedWsProtocol = normalizeWsProtocol(this.ws.protocol);
780+
}
684781
this.isActive = true;
685782
if (this.ws) {
686783
this.#flushOutboundQueue(this.ws);
@@ -728,10 +825,17 @@ export class DbConnectionImpl<RemoteModule extends UntypedRemoteModule>
728825
);
729826
}
730827

731-
#processMessage(data: Uint8Array): void {
732-
const reader = this.#messageReader;
733-
reader.reset(data);
734-
const serverMessage = ServerMessage.deserialize(reader);
828+
#dispatchPendingCallbacks(callbacks: readonly PendingCallback[]): void {
829+
stdbLogger(
830+
'trace',
831+
() => `Calling ${callbacks.length} triggered row callbacks`
832+
);
833+
for (const callback of callbacks) {
834+
callback.cb();
835+
}
836+
}
837+
838+
#processServerMessage(serverMessage: ServerMessage): void {
735839
stdbLogger(
736840
'trace',
737841
() => `Processing server message: ${stringify(serverMessage)}`
@@ -769,13 +873,7 @@ export class DbConnectionImpl<RemoteModule extends UntypedRemoteModule>
769873
const callbacks = this.#applyTableUpdates(tableUpdates, eventContext);
770874
const { event: _, ...subscriptionEventContext } = eventContext;
771875
subscription.emitter.emit('applied', subscriptionEventContext);
772-
stdbLogger(
773-
'trace',
774-
() => `Calling ${callbacks.length} triggered row callbacks`
775-
);
776-
for (const callback of callbacks) {
777-
callback.cb();
778-
}
876+
this.#dispatchPendingCallbacks(callbacks);
779877
break;
780878
}
781879
case 'UnsubscribeApplied': {
@@ -801,13 +899,7 @@ export class DbConnectionImpl<RemoteModule extends UntypedRemoteModule>
801899
const { event: _, ...subscriptionEventContext } = eventContext;
802900
subscription.emitter.emit('end', subscriptionEventContext);
803901
this.#subscriptionManager.subscriptions.delete(querySetId);
804-
stdbLogger(
805-
'trace',
806-
() => `Calling ${callbacks.length} triggered row callbacks`
807-
);
808-
for (const callback of callbacks) {
809-
callback.cb();
810-
}
902+
this.#dispatchPendingCallbacks(callbacks);
811903
break;
812904
}
813905
case 'SubscriptionError': {
@@ -861,13 +953,7 @@ export class DbConnectionImpl<RemoteModule extends UntypedRemoteModule>
861953
eventContext,
862954
serverMessage.value
863955
);
864-
stdbLogger(
865-
'trace',
866-
() => `Calling ${callbacks.length} triggered row callbacks`
867-
);
868-
for (const callback of callbacks) {
869-
callback.cb();
870-
}
956+
this.#dispatchPendingCallbacks(callbacks);
871957
break;
872958
}
873959
case 'ReducerResult': {
@@ -899,13 +985,7 @@ export class DbConnectionImpl<RemoteModule extends UntypedRemoteModule>
899985
eventContext,
900986
result.value.transactionUpdate
901987
);
902-
stdbLogger(
903-
'trace',
904-
() => `Calling ${callbacks.length} triggered row callbacks`
905-
);
906-
for (const callback of callbacks) {
907-
callback.cb();
908-
}
988+
this.#dispatchPendingCallbacks(callbacks);
909989
}
910990
this.#reducerCallInfo.delete(requestId);
911991
const cb = this.#reducerCallbacks.get(requestId);
@@ -934,6 +1014,31 @@ export class DbConnectionImpl<RemoteModule extends UntypedRemoteModule>
9341014
}
9351015
}
9361016

1017+
#processV2Message(data: Uint8Array): void {
1018+
const reader = this.#messageReader;
1019+
reader.reset(data);
1020+
this.#processServerMessage(ServerMessage.deserialize(reader));
1021+
}
1022+
1023+
#processMessage(data: Uint8Array): void {
1024+
if (this.#negotiatedWsProtocol !== V3_WS_PROTOCOL) {
1025+
this.#processV2Message(data);
1026+
return;
1027+
}
1028+
1029+
const messageCount = forEachServerMessageV3(
1030+
this.#messageReader,
1031+
data,
1032+
serverMessage => {
1033+
this.#processServerMessage(serverMessage);
1034+
}
1035+
);
1036+
stdbLogger(
1037+
'trace',
1038+
() => `Processing server v3 payload with ${messageCount} message(s)`
1039+
);
1040+
}
1041+
9371042
/**
9381043
* Handles WebSocket onMessage event.
9391044
* @param wsMessage MessageEvent object.

crates/bindings-typescript/src/sdk/websocket_decompress_adapter.ts

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ import { decompress } from './decompress';
22
import { resolveWS } from './ws';
33

44
export interface WebsocketAdapter {
5+
readonly protocol: string;
56
send(msg: Uint8Array): void;
67
close(): void;
78

@@ -12,6 +13,10 @@ export interface WebsocketAdapter {
1213
}
1314

1415
export class WebsocketDecompressAdapter implements WebsocketAdapter {
16+
get protocol(): string {
17+
return this.#ws.protocol;
18+
}
19+
1520
set onclose(handler: (ev: CloseEvent) => void) {
1621
this.#ws.onclose = handler;
1722
}
@@ -73,7 +78,7 @@ export class WebsocketDecompressAdapter implements WebsocketAdapter {
7378
confirmedReads,
7479
}: {
7580
url: URL;
76-
wsProtocol: string;
81+
wsProtocol: string | string[];
7782
nameOrAddress: string;
7883
authToken?: string;
7984
compression: 'gzip' | 'none';
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
import { stdbLogger } from './logger.ts';
2+
3+
export const V2_WS_PROTOCOL = 'v2.bsatn.spacetimedb';
4+
export const V3_WS_PROTOCOL = 'v3.bsatn.spacetimedb';
5+
export const PREFERRED_WS_PROTOCOLS = [V3_WS_PROTOCOL, V2_WS_PROTOCOL] as const;
6+
7+
export type NegotiatedWsProtocol =
8+
| typeof V2_WS_PROTOCOL
9+
| typeof V3_WS_PROTOCOL;
10+
11+
export function normalizeWsProtocol(protocol: string): NegotiatedWsProtocol {
12+
if (protocol === V3_WS_PROTOCOL) {
13+
return V3_WS_PROTOCOL;
14+
}
15+
// We treat an empty negotiated subprotocol as legacy v2 for compatibility.
16+
if (protocol === '' || protocol === V2_WS_PROTOCOL) {
17+
return V2_WS_PROTOCOL;
18+
}
19+
20+
stdbLogger(
21+
'warn',
22+
`Unexpected websocket subprotocol "${protocol}", falling back to ${V2_WS_PROTOCOL}.`
23+
);
24+
return V2_WS_PROTOCOL;
25+
}

0 commit comments

Comments
 (0)