Skip to content

Commit f181fce

Browse files
Fix anonymous view subscription cleanup (#4646)
# Description of Changes Fixes a bug during view cleanup where we would drop anonymous views that had live subscribers. The reason for this bug is that the `st_view_sub` does not have a single entry for an anonymous view. Rather it has multiple entries - one per identity subscribed. However the code was assuming only a single entry per anonymous view, and so when view cleanup ran, we would only look at the first entry in `st_view_sub` for an anonymous view, and if there were no subscribers, we would drop the view's backing table and read set. But there could still have been other clients subscribed to that view, at which point they would stop receiving updates. This change fixes the bug by making sure there aren't any rows in `st_view_sub` with live subscribers before dropping the anonymous view's backing table and read set. The better fix would be to update `st_view_sub` so that there's only ever at most one row per anonymous view. Unfortunately `st_view_sub` mixes both anonymous and sender-scoped views, and the `sender` column is of type `Identity`, not `Option<Identity>`. Instead of adding a new system table to correct this, I chose to keep using `st_view_sub` and delay this refactor until we add support for view parameters. # API and ABI breaking changes None # Expected complexity level and risk 1.5 # Testing Added regression tests that assert on the underlying system table rows. A smoketest was **not** added because cleanup runs periodically and currently there's no way to force a clean up.
1 parent 635f9d8 commit f181fce

4 files changed

Lines changed: 318 additions & 51 deletions

File tree

crates/core/src/db/relational_db.rs

Lines changed: 120 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2349,12 +2349,12 @@ mod tests {
23492349
use std::fs::OpenOptions;
23502350
use std::path::PathBuf;
23512351
use std::rc::Rc;
2352-
use std::time::Instant;
2352+
use std::time::{Duration, Instant};
23532353

23542354
use super::tests_utils::begin_mut_tx;
23552355
use super::*;
23562356
use crate::db::relational_db::tests_utils::{
2357-
begin_tx, insert, make_snapshot, with_auto_commit, with_read_only, TestDB,
2357+
begin_tx, create_view_for_test, insert, make_snapshot, with_auto_commit, with_read_only, TestDB,
23582358
};
23592359
use anyhow::bail;
23602360
use bytes::Bytes;
@@ -2518,6 +2518,15 @@ mod tests {
25182518
Ok((view_id, table_id, module_def.clone(), view_def.clone()))
25192519
}
25202520

2521+
fn setup_anonymous_view(stdb: &TestDB) -> ResultTest<(ViewId, TableId)> {
2522+
Ok(create_view_for_test(
2523+
stdb,
2524+
"my_anonymous_view",
2525+
&[("b", AlgebraicType::U8)],
2526+
true,
2527+
)?)
2528+
}
2529+
25212530
fn insert_view_row(stdb: &TestDB, view_id: ViewId, table_id: TableId, sender: Identity, v: u8) -> ResultTest<()> {
25222531
let row_pv = |v: u8| product![v];
25232532

@@ -2543,6 +2552,22 @@ mod tests {
25432552
.collect()
25442553
}
25452554

2555+
fn project_anonymous_views(stdb: &TestDB, table_id: TableId) -> Vec<ProductValue> {
2556+
let tx = begin_tx(stdb);
2557+
2558+
stdb.iter(&tx, table_id)
2559+
.unwrap()
2560+
.map(|row| row.to_product_value())
2561+
.collect()
2562+
}
2563+
2564+
fn update_last_called(stdb: &TestDB, view_id: ViewId, sender: Identity, last_called: Timestamp) -> ResultTest<()> {
2565+
let mut tx = begin_mut_tx(stdb);
2566+
tx.update_view_timestamp_at(view_id, ArgId::SENTINEL, sender, last_called)?;
2567+
stdb.commit_tx(tx)?;
2568+
Ok(())
2569+
}
2570+
25462571
#[test]
25472572
fn test_view_tables_are_ephemeral_in_commitlog() -> ResultTest<()> {
25482573
let stdb = TestDB::durable_without_snapshot_repo()?;
@@ -2723,6 +2748,99 @@ mod tests {
27232748

27242749
Ok(())
27252750
}
2751+
2752+
/// Regression test for anonymous-view cleanup.
2753+
///
2754+
/// If one subscriber row has expired but another subscriber for the same anonymous
2755+
/// view is still live, cleanup must delete only the stale bookkeeping row and keep
2756+
/// the shared materialized backing table intact.
2757+
#[test]
2758+
fn test_anonymous_view_cleanup_keeps_rows_for_live_subscribers() -> ResultTest<()> {
2759+
let stdb = TestDB::durable()?;
2760+
let (view_id, table_id) = setup_anonymous_view(&stdb)?;
2761+
2762+
let stale_sender = Identity::ONE;
2763+
let live_sender = Identity::ZERO;
2764+
2765+
let mut tx = begin_mut_tx(&stdb);
2766+
tx.subscribe_view(view_id, ArgId::SENTINEL, stale_sender)?;
2767+
tx.subscribe_view(view_id, ArgId::SENTINEL, live_sender)?;
2768+
stdb.materialize_anonymous_view(&mut tx, table_id, vec![product![42u8]])?;
2769+
stdb.commit_tx(tx)?;
2770+
2771+
let mut tx = begin_mut_tx(&stdb);
2772+
tx.unsubscribe_view(view_id, ArgId::SENTINEL, stale_sender)?;
2773+
stdb.commit_tx(tx)?;
2774+
2775+
// Make one row definitely expired without relying on wall-clock sleeps.
2776+
update_last_called(&stdb, view_id, stale_sender, Timestamp::UNIX_EPOCH)?;
2777+
2778+
let mut tx = begin_mut_tx(&stdb);
2779+
tx.update_view_timestamp(view_id, ArgId::SENTINEL, live_sender)?;
2780+
stdb.commit_tx(tx)?;
2781+
2782+
// Cleanup should remove only the stale subscriber row and keep the shared
2783+
// anonymous materialization because another subscriber is still live.
2784+
let mut tx = begin_mut_tx(&stdb);
2785+
let (_cleaned, _total_expired) = tx.clear_expired_views(Duration::from_secs(1), VIEW_CLEANUP_BUDGET)?;
2786+
stdb.commit_tx(tx)?;
2787+
2788+
assert_eq!(
2789+
project_anonymous_views(&stdb, table_id),
2790+
vec![product![42u8]],
2791+
"anonymous view rows should survive cleanup while another identity is still subscribed"
2792+
);
2793+
2794+
let tx = begin_mut_tx(&stdb);
2795+
let st_after = tx.lookup_st_view_subs(view_id)?;
2796+
assert_eq!(st_after.len(), 1);
2797+
assert_eq!(st_after[0].identity.0, live_sender);
2798+
assert!(st_after[0].has_subscribers);
2799+
assert_eq!(st_after[0].num_subscribers, 1);
2800+
2801+
Ok(())
2802+
}
2803+
2804+
/// Regression test for anonymous-view cleanup.
2805+
///
2806+
/// Once the final subscriber row for an anonymous view has expired, cleanup must
2807+
/// remove both the stale bookkeeping row and the shared materialized backing table.
2808+
#[test]
2809+
fn test_anonymous_view_cleanup_clears_rows_when_unused() -> ResultTest<()> {
2810+
let stdb = TestDB::durable()?;
2811+
let (view_id, table_id) = setup_anonymous_view(&stdb)?;
2812+
2813+
let sender = Identity::ONE;
2814+
2815+
let mut tx = begin_mut_tx(&stdb);
2816+
tx.subscribe_view(view_id, ArgId::SENTINEL, sender)?;
2817+
stdb.materialize_anonymous_view(&mut tx, table_id, vec![product![42u8]])?;
2818+
stdb.commit_tx(tx)?;
2819+
2820+
let mut tx = begin_mut_tx(&stdb);
2821+
tx.unsubscribe_view(view_id, ArgId::SENTINEL, sender)?;
2822+
stdb.commit_tx(tx)?;
2823+
2824+
// Mark the unsubscribed row as expired so cleanup can process it immediately.
2825+
update_last_called(&stdb, view_id, sender, Timestamp::UNIX_EPOCH)?;
2826+
2827+
// With no remaining subscriber rows, cleanup should drop the shared
2828+
// anonymous materialization and remove the bookkeeping row.
2829+
let mut tx = begin_mut_tx(&stdb);
2830+
let (_cleaned, _total_expired) = tx.clear_expired_views(Duration::from_secs(1), VIEW_CLEANUP_BUDGET)?;
2831+
stdb.commit_tx(tx)?;
2832+
2833+
assert!(
2834+
project_anonymous_views(&stdb, table_id).is_empty(),
2835+
"anonymous view rows should be cleared once no entries remain"
2836+
);
2837+
2838+
let tx = begin_mut_tx(&stdb);
2839+
let st_after = tx.lookup_st_view_subs(view_id)?;
2840+
assert!(st_after.is_empty());
2841+
2842+
Ok(())
2843+
}
27262844
#[test]
27272845
fn test_table_name() -> ResultTest<()> {
27282846
let stdb = TestDB::durable()?;

crates/core/src/host/module_host.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1884,7 +1884,12 @@ impl ModuleHost {
18841884
let table_id = st_view_row.table_id.ok_or(ViewCallError::TableDoesNotExist(view_id))?;
18851885
let is_anonymous = st_view_row.is_anonymous;
18861886
let sender = if is_anonymous { None } else { Some(caller) };
1887-
if !tx.is_view_materialized(view_id, ArgId::SENTINEL, caller)? {
1887+
let is_materialized = if is_anonymous {
1888+
tx.is_anonymous_view_materialized(view_id)?
1889+
} else {
1890+
tx.is_view_materialized(view_id, ArgId::SENTINEL, caller)?
1891+
};
1892+
if !is_materialized {
18881893
let (res, trapped) =
18891894
Self::call_view(instance, tx, &view_name, view_id, table_id, Nullary, caller, sender)?;
18901895
tx = res.tx;

crates/core/src/host/wasm_common/module_host_actor.rs

Lines changed: 151 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -688,44 +688,7 @@ impl InstanceCommon {
688688
tx: MutTxId,
689689
inst: &mut I,
690690
) -> Result<(ViewCallResult, bool), anyhow::Error> {
691-
let views = self.info.module_def.views().collect::<Vec<_>>();
692-
let owner_identity = self.info.owner_identity;
693-
694-
let mut view_calls = Vec::new();
695-
696-
for view in views {
697-
let ViewDef {
698-
name: view_name,
699-
is_anonymous,
700-
fn_ptr,
701-
product_type_ref,
702-
..
703-
} = view;
704-
705-
let st_view = tx
706-
.view_from_name(view_name)?
707-
.ok_or_else(|| anyhow::anyhow!("view {} not found in database", &view_name))?;
708-
709-
let view_id = st_view.view_id;
710-
let table_id = st_view
711-
.table_id
712-
.ok_or_else(|| anyhow::anyhow!("view {} does not have a backing table in database", &view_name))?;
713-
714-
for sub in tx.lookup_st_view_subs(view_id)? {
715-
view_calls.push(CallViewParams {
716-
view_name: view_name.clone(),
717-
view_id,
718-
table_id,
719-
fn_ptr: *fn_ptr,
720-
caller: owner_identity,
721-
sender: if *is_anonymous { None } else { Some(sub.identity.into()) },
722-
args: ArgsTuple::nullary(),
723-
row_type: *product_type_ref,
724-
timestamp: Timestamp::now(),
725-
});
726-
}
727-
}
728-
691+
let view_calls = collect_subscribed_view_calls(&tx, &self.info.module_def, self.info.owner_identity)?;
729692
Ok(self.execute_view_calls(tx, view_calls, inst))
730693
}
731694

@@ -1370,6 +1333,68 @@ impl InstanceCommon {
13701333
}
13711334
}
13721335

1336+
fn collect_subscribed_view_calls(
1337+
tx: &MutTxId,
1338+
module_def: &ModuleDef,
1339+
owner_identity: Identity,
1340+
) -> Result<Vec<CallViewParams>, anyhow::Error> {
1341+
let mut view_calls = Vec::new();
1342+
1343+
for view in module_def.views() {
1344+
let ViewDef {
1345+
name: view_name,
1346+
is_anonymous,
1347+
fn_ptr,
1348+
product_type_ref,
1349+
..
1350+
} = view;
1351+
1352+
let st_view = tx
1353+
.view_from_name(view_name)?
1354+
.ok_or_else(|| anyhow::anyhow!("view {} not found in database", &view_name))?;
1355+
1356+
let view_id = st_view.view_id;
1357+
let table_id = st_view
1358+
.table_id
1359+
.ok_or_else(|| anyhow::anyhow!("view {} does not have a backing table in database", &view_name))?;
1360+
let subs = tx.lookup_st_view_subs(view_id)?;
1361+
1362+
if *is_anonymous {
1363+
if subs.is_empty() {
1364+
continue;
1365+
}
1366+
view_calls.push(CallViewParams {
1367+
view_name: view_name.clone(),
1368+
view_id,
1369+
table_id,
1370+
fn_ptr: *fn_ptr,
1371+
caller: owner_identity,
1372+
sender: None,
1373+
args: ArgsTuple::nullary(),
1374+
row_type: *product_type_ref,
1375+
timestamp: Timestamp::now(),
1376+
});
1377+
continue;
1378+
}
1379+
1380+
for sub in subs {
1381+
view_calls.push(CallViewParams {
1382+
view_name: view_name.clone(),
1383+
view_id,
1384+
table_id,
1385+
fn_ptr: *fn_ptr,
1386+
caller: owner_identity,
1387+
sender: Some(sub.identity.into()),
1388+
args: ArgsTuple::nullary(),
1389+
row_type: *product_type_ref,
1390+
timestamp: Timestamp::now(),
1391+
});
1392+
}
1393+
}
1394+
1395+
Ok(view_calls)
1396+
}
1397+
13731398
/// Pre-fetched VM metrics counters for all reducers and views in a module.
13741399
/// Anonymous views have lazily fetched metrics counters.
13751400
struct AllVmMetrics {
@@ -1712,3 +1737,91 @@ impl InstanceOp for ProcedureOp {
17121737
FuncCallType::Procedure
17131738
}
17141739
}
1740+
1741+
#[cfg(test)]
1742+
mod tests {
1743+
use super::collect_subscribed_view_calls;
1744+
use crate::db::relational_db::tests_utils::{begin_mut_tx, TestDB};
1745+
use spacetimedb_lib::db::raw_def::v9::RawModuleDefV9Builder;
1746+
use spacetimedb_lib::{AlgebraicType, Identity, ProductType};
1747+
use spacetimedb_primitives::ArgId;
1748+
use spacetimedb_sats::raw_identifier::RawIdentifier;
1749+
use spacetimedb_schema::def::ModuleDef;
1750+
1751+
fn module_def_for_view(name: &str, is_anonymous: bool) -> ModuleDef {
1752+
let mut builder = RawModuleDefV9Builder::new();
1753+
let name = RawIdentifier::new(name);
1754+
let type_ref = builder.add_algebraic_type(
1755+
[],
1756+
name.clone(),
1757+
AlgebraicType::Product(ProductType::from_iter([("x", AlgebraicType::U8)])),
1758+
true,
1759+
);
1760+
1761+
builder.add_view(
1762+
name.clone(),
1763+
0,
1764+
true,
1765+
is_anonymous,
1766+
ProductType::unit(),
1767+
AlgebraicType::array(AlgebraicType::Ref(type_ref)),
1768+
);
1769+
1770+
builder.finish().try_into().expect("test module def should be valid")
1771+
}
1772+
1773+
/// Regression test for evaluating anonymous views.
1774+
///
1775+
/// Anonymous views have one shared materialization,
1776+
/// so we should only re-evaluate once even if there are multiple subscribers.
1777+
#[test]
1778+
fn test_dedup_anonymous_view_calls() -> anyhow::Result<()> {
1779+
let stdb = TestDB::in_memory()?;
1780+
let module_def = module_def_for_view("anonymous_view", true);
1781+
let view_def = module_def.view("anonymous_view").expect("view should exist");
1782+
1783+
let mut tx = begin_mut_tx(&stdb);
1784+
let (view_id, _table_id) = stdb.create_view(&mut tx, &module_def, view_def)?;
1785+
tx.subscribe_view(view_id, ArgId::SENTINEL, Identity::ZERO)?;
1786+
tx.subscribe_view(view_id, ArgId::SENTINEL, Identity::ONE)?;
1787+
1788+
// Two subscriber rows exist, but anonymous views should still be reevaluated once
1789+
// because they share a single materialization.
1790+
let calls = collect_subscribed_view_calls(&tx, &module_def, Identity::ZERO)?;
1791+
1792+
assert_eq!(
1793+
calls.len(),
1794+
1,
1795+
"anonymous views should only be reevaluated once even with multiple subscriber rows"
1796+
);
1797+
assert_eq!(calls[0].view_id, view_id);
1798+
assert_eq!(calls[0].sender, None);
1799+
Ok(())
1800+
}
1801+
1802+
/// Regression test for evaluating sender-scoped views.
1803+
///
1804+
/// These views have separate materializations per sender,
1805+
/// so reevaluation must emit one call per subscribed sender.
1806+
#[test]
1807+
fn test_distinct_sender_scoped_view_calls() -> anyhow::Result<()> {
1808+
let stdb = TestDB::in_memory()?;
1809+
let module_def = module_def_for_view("sender_view", false);
1810+
let view_def = module_def.view("sender_view").expect("view should exist");
1811+
1812+
let mut tx = begin_mut_tx(&stdb);
1813+
let (view_id, _table_id) = stdb.create_view(&mut tx, &module_def, view_def)?;
1814+
tx.subscribe_view(view_id, ArgId::SENTINEL, Identity::ZERO)?;
1815+
tx.subscribe_view(view_id, ArgId::SENTINEL, Identity::ONE)?;
1816+
1817+
// Sender-backed views keep one materialization per sender, so reevaluation must
1818+
// preserve both callers.
1819+
let calls = collect_subscribed_view_calls(&tx, &module_def, Identity::ZERO)?;
1820+
let senders: Vec<_> = calls.iter().filter_map(|call| call.sender).collect();
1821+
1822+
assert_eq!(calls.len(), 2, "sender views should still reevaluate once per sender");
1823+
assert!(senders.contains(&Identity::ZERO));
1824+
assert!(senders.contains(&Identity::ONE));
1825+
Ok(())
1826+
}
1827+
}

0 commit comments

Comments
 (0)