Skip to content

Commit 408e54f

Browse files
Fix disconnects breaking view updates for other connections (#4607)
# Description of Changes Fixes a bug in client disconnect logic that would mark a client's views as dropped(unsubscribed). However it was marking the identity's views as dropped, not the connection. So if an identity had multiple connections open, each subscribing to different views, and one of them disconnected, the subscriptions for the other connections would break. The observed behavior would be that they would stop receiving subscription updates. This could potentially lead to their client cache getting into a corrupted state. Now, instead of dropping all of the views for a particular identity on disconnect, we drop only the views for that particular connection. And when I say drop, what I really mean is decrement. A view is not dropped completely unless it no longer had any subscribers. # API and ABI breaking changes None # Expected complexity level and risk 2 # Testing Regression smoketest was added
1 parent 6eaf06b commit 408e54f

9 files changed

Lines changed: 120 additions & 37 deletions

File tree

crates/client-api/src/routes/database.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -184,8 +184,7 @@ pub async fn call<S: ControlStateDelegate + NodeDelegate>(
184184
};
185185

186186
module
187-
// We don't clear views or procedures after reducer calls
188-
.call_identity_disconnected(caller_identity, connection_id, false)
187+
.call_identity_disconnected(caller_identity, connection_id)
189188
.await
190189
.map_err(client_disconnected_error_to_response)?;
191190

crates/core/src/host/host_controller.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1012,7 +1012,7 @@ impl Host {
10121012
// No need to clear view tables here since we do it in `clear_all_clients`.
10131013
for (identity, connection_id) in connected_clients {
10141014
module_host
1015-
.call_identity_disconnected(identity, connection_id, false)
1015+
.call_identity_disconnected(identity, connection_id)
10161016
.await
10171017
.with_context(|| {
10181018
format!(

crates/core/src/host/module_host.rs

Lines changed: 4 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1251,7 +1251,6 @@ impl ModuleHost {
12511251
client_id.identity,
12521252
client_id.connection_id,
12531253
info,
1254-
true,
12551254
call_reducer,
12561255
trapped_slot,
12571256
)
@@ -1293,7 +1292,6 @@ impl ModuleHost {
12931292
caller_identity: Identity,
12941293
caller_connection_id: ConnectionId,
12951294
info: &ModuleInfo,
1296-
drop_view_subscribers: bool,
12971295
call_reducer: impl FnOnce(Option<MutTxId>, CallReducerParams) -> (ReducerCallResult, bool),
12981296
trapped_slot: &mut bool,
12991297
) -> Result<(), ReducerCallError> {
@@ -1309,20 +1307,10 @@ impl ModuleHost {
13091307

13101308
let workload = || Workload::reducer_no_args(reducer_name.clone(), caller_identity, caller_connection_id);
13111309

1312-
// Decrement the number of subscribers for each view this caller is subscribed to
1313-
let dec_view_subscribers = |tx: &mut MutTxId| {
1314-
if drop_view_subscribers && let Err(err) = tx.unsubscribe_views(caller_identity) {
1315-
log::error!("`call_identity_disconnected`: failed to delete client view data: {err}");
1316-
}
1317-
};
1318-
13191310
// A fallback transaction that deletes the client from `st_client`.
13201311
let database_identity = stdb.database_identity();
13211312
let fallback = || {
13221313
stdb.with_auto_commit(workload(), |mut_tx| {
1323-
1324-
dec_view_subscribers(mut_tx);
1325-
13261314
if !is_client_exist(mut_tx) {
13271315
// The client is already gone. Nothing to do.
13281316
log::debug!(
@@ -1348,9 +1336,7 @@ impl ModuleHost {
13481336
};
13491337

13501338
if let Some((reducer_id, reducer_def)) = reducer_lookup {
1351-
let mut mut_tx = stdb.begin_mut_tx(IsolationLevel::Serializable, workload());
1352-
1353-
dec_view_subscribers(&mut mut_tx);
1339+
let mut_tx = stdb.begin_mut_tx(IsolationLevel::Serializable, workload());
13541340

13551341
if !is_client_exist(&mut_tx) {
13561342
// The client is already gone. Nothing to do.
@@ -1430,13 +1416,12 @@ impl ModuleHost {
14301416
&self,
14311417
caller_identity: Identity,
14321418
caller_connection_id: ConnectionId,
1433-
drop_view_subscribers: bool,
14341419
) -> Result<(), ReducerCallError> {
14351420
self.call(
14361421
"call_identity_disconnected",
1437-
(caller_identity, caller_connection_id, drop_view_subscribers),
1438-
async |(a, b, c), inst| inst.call_identity_disconnected(a, b, c),
1439-
async |(a, b, c), inst| inst.call_identity_disconnected(a, b, c).await,
1422+
(caller_identity, caller_connection_id),
1423+
async |(a, b), inst| inst.call_identity_disconnected(a, b),
1424+
async |(a, b), inst| inst.call_identity_disconnected(a, b).await,
14401425
)
14411426
.await?
14421427
}

crates/core/src/host/v8/mod.rs

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -410,11 +410,10 @@ impl JsInstance {
410410
&mut self,
411411
caller_identity: Identity,
412412
caller_connection_id: ConnectionId,
413-
drop_view_subscribers: bool,
414413
) -> Result<(), ReducerCallError> {
415414
self.send_recv(
416415
JsWorkerReply::into_call_identity_disconnected,
417-
JsWorkerRequest::CallIdentityDisconnected(caller_identity, caller_connection_id, drop_view_subscribers),
416+
JsWorkerRequest::CallIdentityDisconnected(caller_identity, caller_connection_id),
418417
)
419418
.await
420419
}
@@ -505,7 +504,7 @@ enum JsWorkerRequest {
505504
/// See [`JsInstance::call_identity_connected`].
506505
CallIdentityConnected(ConnectionAuthCtx, ConnectionId),
507506
/// See [`JsInstance::call_identity_disconnected`].
508-
CallIdentityDisconnected(Identity, ConnectionId, bool),
507+
CallIdentityDisconnected(Identity, ConnectionId),
509508
/// See [`JsInstance::disconnect_client`].
510509
DisconnectClient(ClientActorId),
511510
/// See [`JsInstance::init_database`].
@@ -718,17 +717,12 @@ async fn spawn_instance_worker(
718717
call_identity_connected(caller_auth, caller_connection_id, info, call_reducer, &mut trapped);
719718
reply("call_identity_connected", CallIdentityConnected(res), trapped);
720719
}
721-
JsWorkerRequest::CallIdentityDisconnected(
722-
caller_identity,
723-
caller_connection_id,
724-
drop_view_subcribers,
725-
) => {
720+
JsWorkerRequest::CallIdentityDisconnected(caller_identity, caller_connection_id) => {
726721
let mut trapped = false;
727722
let res = ModuleHost::call_identity_disconnected_inner(
728723
caller_identity,
729724
caller_connection_id,
730725
info,
731-
drop_view_subcribers,
732726
call_reducer,
733727
&mut trapped,
734728
);

crates/core/src/host/wasm_common/module_host_actor.rs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -488,7 +488,6 @@ impl<T: WasmInstance> WasmModuleInstance<T> {
488488
&mut self,
489489
caller_identity: Identity,
490490
caller_connection_id: ConnectionId,
491-
drop_view_subscribers: bool,
492491
) -> Result<(), ReducerCallError> {
493492
let module = &self.common.info.clone();
494493
let call_reducer = |tx, params| self.call_reducer_with_tx(tx, params);
@@ -497,7 +496,6 @@ impl<T: WasmInstance> WasmModuleInstance<T> {
497496
caller_identity,
498497
caller_connection_id,
499498
module,
500-
drop_view_subscribers,
501499
call_reducer,
502500
&mut trapped,
503501
);

crates/core/src/subscription/module_subscription_actor.rs

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1560,8 +1560,24 @@ impl ModuleSubscriptions {
15601560
}
15611561

15621562
pub fn remove_subscriber(&self, client_id: ClientActorId) {
1563-
let mut subscriptions = self.subscriptions.write();
1564-
subscriptions.remove_all_subscriptions(&(client_id.identity, client_id.connection_id));
1563+
let removed_queries = {
1564+
let mut subscriptions = self.subscriptions.write();
1565+
subscriptions.remove_all_subscriptions(&(client_id.identity, client_id.connection_id))
1566+
};
1567+
1568+
if removed_queries.is_empty() {
1569+
return;
1570+
}
1571+
1572+
// TODO(perf): Removing a subscriber is currently O(subscribed_queries).
1573+
// Instead we should maintain an index to make this O(subscribed_views).
1574+
if let Err(err) = self.unsubscribe_views(&removed_queries, client_id.identity) {
1575+
log::error!(
1576+
"failed to unsubscribe views for disconnected client ({}, {}): {err}",
1577+
client_id.identity,
1578+
client_id.connection_id
1579+
);
1580+
}
15651581
}
15661582

15671583
/// Rolls back `tx` and returns the offset as it was before `tx`.

crates/core/src/subscription/module_subscription_manager.rs

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1203,10 +1203,20 @@ impl SubscriptionManager {
12031203
/// If a query no longer has any subscribers,
12041204
/// it is removed from the index along with its table ids.
12051205
#[tracing::instrument(level = "trace", skip_all)]
1206-
pub fn remove_all_subscriptions(&mut self, client: &ClientId) {
1206+
pub fn remove_all_subscriptions(&mut self, client: &ClientId) -> Vec<Query> {
1207+
let mut removed_query_hashes = HashSet::new();
1208+
if let Some(client_info) = self.clients.get(client) {
1209+
removed_query_hashes.extend(client_info.legacy_subscriptions.iter().copied());
1210+
removed_query_hashes.extend(client_info.subscription_ref_count.keys().copied());
1211+
}
1212+
let removed_queries = removed_query_hashes
1213+
.into_iter()
1214+
.filter_map(|hash| self.queries.get(&hash).map(|q| q.query.clone()))
1215+
.collect::<Vec<_>>();
1216+
12071217
self.remove_legacy_subscriptions(client);
12081218
let Some(client_info) = self.remove_client_and_inform_send_worker(*client) else {
1209-
return;
1219+
return removed_queries;
12101220
};
12111221

12121222
debug_assert!(client_info.legacy_subscriptions.is_empty());
@@ -1252,6 +1262,8 @@ impl SubscriptionManager {
12521262
for query_hash in queries_to_remove {
12531263
self.queries.remove(&query_hash);
12541264
}
1265+
1266+
removed_queries
12551267
}
12561268

12571269
/// Find the queries that need to be evaluated for this table update.

crates/smoketests/modules/views-sql/src/lib.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,16 @@ pub fn add_player_level(ctx: &ReducerContext, id: u64, level: u64) {
2424
ctx.db.player_level().insert(PlayerState { id, level });
2525
}
2626

27+
#[spacetimedb::reducer]
28+
pub fn set_player_state(ctx: &ReducerContext, id: u64, level: u64) {
29+
if let Some(mut row) = ctx.db.player_state().id().find(id) {
30+
row.level = level;
31+
ctx.db.player_state().id().update(row);
32+
} else {
33+
ctx.db.player_state().insert(PlayerState { id, level });
34+
}
35+
}
36+
2737
#[spacetimedb::view(accessor = my_player_and_level, public)]
2838
pub fn my_player_and_level(ctx: &AnonymousViewContext) -> Option<PlayerState> {
2939
ctx.db.player_level().id().find(0)

crates/smoketests/tests/smoketests/views.rs

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -553,6 +553,75 @@ fn test_typescript_procedure_triggers_subscription_updates() {
553553
);
554554
}
555555

556+
#[test]
557+
fn test_disconnect_does_not_break_sender_view() {
558+
let test = Smoketest::builder().precompiled_module("views-sql").build();
559+
560+
test.call("set_player_state", &["42", "1"]).unwrap();
561+
562+
// Two connections subscribe to the same view.
563+
let sub_keep = test.subscribe_background(&["SELECT * FROM player"], 2).unwrap();
564+
let sub_drop = test.subscribe_background(&["SELECT * FROM player"], 1).unwrap();
565+
566+
// Both connections should receive the first update.
567+
// After one connection disconnects, the other should still receive updates.
568+
test.call("set_player_state", &["42", "2"]).unwrap();
569+
let _ = sub_drop.collect().unwrap();
570+
test.call("set_player_state", &["42", "3"]).unwrap();
571+
572+
let events = sub_keep.collect().unwrap();
573+
574+
assert_eq!(events.len(), 2, "Expected two updates for player, got: {events:?}");
575+
let inserts = events[1]["player"]["inserts"]
576+
.as_array()
577+
.expect("Expected inserts array on player update");
578+
assert!(
579+
inserts
580+
.iter()
581+
.any(|row| row["id"] == json!(42) && row["level"] == json!(3)),
582+
"Expected player id=42 level=3 insert after disconnect, got: {events:?}"
583+
);
584+
}
585+
586+
#[test]
587+
fn test_disconnect_does_not_break_anonymous_view() {
588+
let test = Smoketest::builder().precompiled_module("views-sql").build();
589+
590+
// Seed a row in the anonymous-view source table.
591+
test.call("add_player_level", &["0", "2"]).unwrap();
592+
593+
// Two connections subscribe to the same anonymous view.
594+
let sub_keep = test
595+
.subscribe_background(&["SELECT * FROM player_and_level"], 2)
596+
.unwrap();
597+
let sub_drop = test
598+
.subscribe_background(&["SELECT * FROM player_and_level"], 1)
599+
.unwrap();
600+
601+
// Both connections should receive the first update.
602+
// After one connection disconnects, the other should still receive updates.
603+
test.call("add_player_level", &["1", "2"]).unwrap();
604+
let _ = sub_drop.collect().unwrap();
605+
test.call("add_player_level", &["2", "2"]).unwrap();
606+
607+
let events = sub_keep.collect().unwrap();
608+
609+
assert_eq!(
610+
events.len(),
611+
2,
612+
"Expected two updates for player_and_level, got: {events:?}"
613+
);
614+
let inserts = events[1]["player_and_level"]["inserts"]
615+
.as_array()
616+
.expect("Expected inserts array on player_and_level update");
617+
assert!(
618+
inserts
619+
.iter()
620+
.any(|row| row["id"] == json!(2) && row["level"] == json!(2)),
621+
"Expected player id=2 level=2 insert after disconnect, got: {events:?}"
622+
);
623+
}
624+
556625
#[test]
557626
fn test_typescript_query_builder_view_query() {
558627
require_pnpm!();

0 commit comments

Comments
 (0)