Skip to content

Commit 68022eb

Browse files
Refresh views during one-shot schedule cleanup (#4639)
# Description of Changes Fixes a bug where clients subscribed through views could miss delete updates when a one-shot scheduled reducer or procedure completed and the scheduler automatically deleted the corresponding schedule row. This patch ensures that scheduled-row cleanup refreshes any stale views before commit/broadcast. Note, this only affected one-shot schedules. Interval schedules do not delete their schedule row after each run, so they were not going through this path. # API and ABI breaking changes None # Expected complexity level and risk 1.5 # Testing Added regression coverage for: - one-shot scheduled reducer: table + view subscription sees insert then delete - one-shot scheduled procedure: table + view subscription sees insert then delete - failing scheduled reducer: cleanup still refreshes the dependent view delete - observe transitive scheduled table updates through join The one-shot tests now assert on the scheduled table and dependent view in the same subscription updates, which proves the delete comes from the automatic cleanup transaction itself rather than a later manual write.
1 parent 3fe50e7 commit 68022eb

7 files changed

Lines changed: 362 additions & 184 deletions

File tree

crates/core/src/host/module_host.rs

Lines changed: 89 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -1904,51 +1904,91 @@ impl ModuleHost {
19041904
Ok((tx, false))
19051905
}
19061906

1907+
/// Refreshes every view made stale by `tx`.
1908+
///
1909+
/// The returned transaction contains both the original writes and any backing-table
1910+
/// updates produced by re-materializing the affected views.
19071911
pub fn call_views_with_tx<I: WasmInstance>(
19081912
tx: MutTxId,
19091913
instance: &mut RefInstance<'_, I>,
19101914
caller: Identity,
1911-
) -> Result<(ViewCallResult, bool), ViewCallError> {
1912-
let mut out = ViewCallResult::default(tx);
1915+
) -> (ViewCallResult, bool) {
1916+
Self::call_views_with_tx_at(tx, instance, caller, Timestamp::now())
1917+
}
1918+
1919+
/// Refreshes every view made stale by `tx`.
1920+
///
1921+
/// This is the shared host-side path for finishing a transaction after writes have
1922+
/// invalidated one or more materialized views.
1923+
pub fn call_views_with_tx_at<I: WasmInstance>(
1924+
tx: MutTxId,
1925+
instance: &mut RefInstance<'_, I>,
1926+
caller: Identity,
1927+
timestamp: Timestamp,
1928+
) -> (ViewCallResult, bool) {
1929+
let mut tx = tx;
19131930
let module_def = &instance.common.info().module_def;
1931+
let mut outcome = ViewOutcome::Success;
1932+
let mut energy_used = FunctionBudget::ZERO;
1933+
let mut total_duration = Duration::ZERO;
1934+
let mut abi_duration = Duration::ZERO;
19141935
let mut trapped = false;
1915-
use FunctionArgs::Nullary;
19161936
for ViewCallInfo {
19171937
view_id,
19181938
table_id,
19191939
fn_ptr,
19201940
sender,
1921-
} in out.tx.views_for_refresh().cloned().collect::<Vec<_>>()
1941+
} in tx.views_for_refresh().cloned().collect::<Vec<_>>()
19221942
{
1923-
let view_def = module_def
1924-
.get_view_by_id(fn_ptr, sender.is_none())
1925-
.ok_or(ViewCallError::NoSuchView)?;
1943+
let Some(view_def) = module_def.get_view_by_id(fn_ptr, sender.is_none()) else {
1944+
outcome = ViewOutcome::Failed(format!("view with fn_ptr `{fn_ptr}` not found"));
1945+
break;
1946+
};
1947+
let args = match FunctionArgs::Nullary.into_tuple_for_def(module_def, view_def) {
1948+
Ok(args) => args,
1949+
Err(err) => {
1950+
outcome = ViewOutcome::Failed(format!("failed to build view args: {err}"));
1951+
break;
1952+
}
1953+
};
19261954

1927-
let (result, trap) = Self::call_view(
1955+
let (result, trap) = Self::call_view_inner(
19281956
instance,
1929-
out.tx,
1957+
tx,
19301958
&view_def.name,
19311959
view_id,
19321960
table_id,
1933-
Nullary,
1961+
view_def.fn_ptr,
19341962
caller,
19351963
sender,
1936-
)?;
1964+
args,
1965+
view_def.product_type_ref,
1966+
timestamp,
1967+
);
19371968

19381969
// Increment execution stats
1939-
out.tx = result.tx;
1940-
out.outcome = result.outcome;
1941-
out.energy_used += result.energy_used;
1942-
out.total_duration += result.total_duration;
1943-
out.abi_duration += result.abi_duration;
1970+
tx = result.tx;
1971+
outcome = result.outcome;
1972+
energy_used += result.energy_used;
1973+
total_duration += result.total_duration;
1974+
abi_duration += result.abi_duration;
19441975
trapped |= trap;
19451976

19461977
// Terminate early if execution failed
1947-
if !matches!(out.outcome, ViewOutcome::Success) || trapped {
1978+
if !matches!(outcome, ViewOutcome::Success) || trapped {
19481979
break;
19491980
}
19501981
}
1951-
Ok((out, trapped))
1982+
(
1983+
ViewCallResult {
1984+
outcome,
1985+
tx,
1986+
energy_used,
1987+
total_duration,
1988+
abi_duration,
1989+
},
1990+
trapped,
1991+
)
19521992
}
19531993

19541994
fn call_view<I: WasmInstance>(
@@ -1960,6 +2000,30 @@ impl ModuleHost {
19602000
args: FunctionArgs,
19612001
caller: Identity,
19622002
sender: Option<Identity>,
2003+
) -> Result<(ViewCallResult, bool), ViewCallError> {
2004+
Self::call_view_at(
2005+
instance,
2006+
tx,
2007+
view_name,
2008+
view_id,
2009+
table_id,
2010+
args,
2011+
caller,
2012+
sender,
2013+
Timestamp::now(),
2014+
)
2015+
}
2016+
2017+
fn call_view_at<I: WasmInstance>(
2018+
instance: &mut RefInstance<'_, I>,
2019+
tx: MutTxId,
2020+
view_name: &Identifier,
2021+
view_id: ViewId,
2022+
table_id: TableId,
2023+
args: FunctionArgs,
2024+
caller: Identity,
2025+
sender: Option<Identity>,
2026+
timestamp: Timestamp,
19632027
) -> Result<(ViewCallResult, bool), ViewCallError> {
19642028
let module_def = &instance.common.info().module_def;
19652029
let view_def = module_def.view(view_name).ok_or(ViewCallError::NoSuchView)?;
@@ -1969,21 +2033,9 @@ impl ModuleHost {
19692033
.into_tuple_for_def(module_def, view_def)
19702034
.map_err(InvalidViewArguments)?;
19712035

1972-
match Self::call_view_inner(
1973-
instance, tx, view_name, view_id, table_id, fn_ptr, caller, sender, args, row_type,
1974-
) {
1975-
err @ Err(ViewCallError::NoSuchView) => {
1976-
let _log_message = no_such_function_log_message("view", view_name);
1977-
// self.inject_logs(LogLevel::Error, view_name, &log_message);
1978-
err
1979-
}
1980-
err @ Err(ViewCallError::Args(_)) => {
1981-
let _log_message = args_error_log_message("view", view_name);
1982-
// self.inject_logs(LogLevel::Error, view_name, &log_message);
1983-
err
1984-
}
1985-
res => res,
1986-
}
2036+
Ok(Self::call_view_inner(
2037+
instance, tx, view_name, view_id, table_id, fn_ptr, caller, sender, args, row_type, timestamp,
2038+
))
19872039
}
19882040

19892041
fn call_view_inner<I: WasmInstance>(
@@ -1997,10 +2049,11 @@ impl ModuleHost {
19972049
sender: Option<Identity>,
19982050
args: ArgsTuple,
19992051
row_type: AlgebraicTypeRef,
2000-
) -> Result<(ViewCallResult, bool), ViewCallError> {
2052+
timestamp: Timestamp,
2053+
) -> (ViewCallResult, bool) {
20012054
let view_name = name.clone();
20022055
let params = CallViewParams {
2003-
timestamp: Timestamp::now(),
2056+
timestamp,
20042057
view_name,
20052058
view_id,
20062059
table_id,
@@ -2011,7 +2064,7 @@ impl ModuleHost {
20112064
row_type,
20122065
};
20132066

2014-
Ok(instance.common.call_view_with_tx(tx, params, instance.instance))
2067+
instance.common.call_view_with_tx(tx, params, instance.instance)
20152068
}
20162069

20172070
pub async fn init_database(&self, program: Program) -> Result<Option<ReducerCallResult>, InitDatabaseError> {

crates/core/src/host/scheduler.rs

Lines changed: 72 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -379,21 +379,6 @@ pub(super) async fn call_scheduled_function(
379379
};
380380
let db = &**module_info.relational_db();
381381

382-
let delete_scheduled_function_row = |tx: Option<MutTxId>, timestamp: Option<_>| {
383-
id.and_then(|id| {
384-
let (timestamp, instant) = timestamp.unwrap_or_else(|| (Timestamp::now(), Instant::now()));
385-
let tx = tx.unwrap_or_else(|| db.begin_mut_tx(IsolationLevel::Serializable, Workload::Internal));
386-
let schedule_at = delete_scheduled_function_row_with_tx(module_info, db, tx, id)?;
387-
let ScheduleAt::Interval(dur) = schedule_at else {
388-
return None;
389-
};
390-
Some(Reschedule {
391-
at_ts: schedule_at.to_timestamp_from(timestamp),
392-
at_real: instant + dur.to_duration_abs(),
393-
})
394-
})
395-
};
396-
397382
enum Function {
398383
Reducer(CallScheduledFunctionResult, bool),
399384
Procedure {
@@ -416,7 +401,7 @@ pub(super) async fn call_scheduled_function(
416401
Err(err) => {
417402
// All we can do here is log an error.
418403
log::error!("could not determine scheduled function or its parameters: {err:#}");
419-
let reschedule = delete_scheduled_function_row(Some(tx), None);
404+
let reschedule = delete_scheduled_function_row(module_info, db, id, Some(tx), None, inst_common, inst);
420405
return (CallScheduledFunctionResult { reschedule }, false);
421406
}
422407
};
@@ -452,7 +437,7 @@ pub(super) async fn call_scheduled_function(
452437
let result = panic::catch_unwind(panic::AssertUnwindSafe(|| {
453438
inst_common.call_reducer_with_tx(Some(tx), params, inst)
454439
}));
455-
let reschedule = delete_scheduled_function_row(None, None);
440+
let reschedule = delete_scheduled_function_row(module_info, db, id, None, None, inst_common, inst);
456441
// Currently, we drop the return value from the function call. In the future,
457442
// we might want to handle it somehow.
458443
let trapped = match result {
@@ -463,7 +448,15 @@ pub(super) async fn call_scheduled_function(
463448
}
464449
CallParams::Procedure(params) => {
465450
// Delete scheduled row.
466-
let reschedule = delete_scheduled_function_row(Some(tx), Some((timestamp, instant)));
451+
let reschedule = delete_scheduled_function_row(
452+
module_info,
453+
db,
454+
id,
455+
Some(tx),
456+
Some((timestamp, instant)),
457+
inst_common,
458+
inst,
459+
);
467460
Function::Procedure { params, reschedule }
468461
}
469462
}
@@ -492,11 +485,47 @@ pub(super) async fn call_scheduled_function(
492485
}
493486
}
494487

488+
/// Deletes a scheduled-row entry after its function runs, reusing `tx` when one is already
489+
/// open and otherwise creating an internal transaction for the cleanup.
490+
///
491+
/// One-shot schedules are deleted and committed immediately. Interval schedules are not
492+
/// deleted here; instead their `ScheduleAt` is returned to the caller as a `Reschedule`.
493+
fn delete_scheduled_function_row(
494+
module_info: &ModuleInfo,
495+
db: &RelationalDB,
496+
id: Option<ScheduledFunctionId>,
497+
tx: Option<MutTxId>,
498+
timestamp: Option<(Timestamp, Instant)>,
499+
inst_common: &mut InstanceCommon,
500+
inst: &mut impl WasmInstance,
501+
) -> Option<Reschedule> {
502+
id.and_then(|id| {
503+
let (timestamp, instant) = timestamp.unwrap_or_else(|| (Timestamp::now(), Instant::now()));
504+
let tx = tx.unwrap_or_else(|| db.begin_mut_tx(IsolationLevel::Serializable, Workload::Internal));
505+
let schedule_at = delete_scheduled_function_row_with_tx(module_info, db, tx, id, inst_common, inst)?;
506+
let ScheduleAt::Interval(dur) = schedule_at else {
507+
return None;
508+
};
509+
Some(Reschedule {
510+
at_ts: schedule_at.to_timestamp_from(timestamp),
511+
at_real: instant + dur.to_duration_abs(),
512+
})
513+
})
514+
}
515+
516+
/// Deletes a scheduled-row entry inside an existing mutable transaction.
517+
///
518+
/// If the row describes a one-shot schedule, this also refreshes any stale views and
519+
/// broadcasts the resulting delete in the same transaction.
520+
///
521+
/// Interval schedules are left in place and returned to the caller for rescheduling.
495522
fn delete_scheduled_function_row_with_tx(
496523
module_info: &ModuleInfo,
497524
db: &RelationalDB,
498525
mut tx: MutTxId,
499526
id: ScheduledFunctionId,
527+
inst_common: &mut InstanceCommon,
528+
inst: &mut impl WasmInstance,
500529
) -> Option<ScheduleAt> {
501530
if let Ok(Some(schedule_row)) = get_schedule_row_mut(&tx, db, id) {
502531
match read_schedule_at(&schedule_row, id.at_column) {
@@ -508,7 +537,7 @@ fn delete_scheduled_function_row_with_tx(
508537
let row_ptr = schedule_row.pointer();
509538
db.delete(&mut tx, id.table_id, [row_ptr]);
510539

511-
commit_and_broadcast_deletion_event(tx, module_info);
540+
refresh_views_then_commit_and_broadcast(tx, module_info, inst_common, inst);
512541
}
513542
_ => {
514543
log::debug!(
@@ -522,13 +551,30 @@ fn delete_scheduled_function_row_with_tx(
522551
None
523552
}
524553

525-
fn commit_and_broadcast_deletion_event(tx: MutTxId, module_info: &ModuleInfo) {
554+
/// Refreshes stale views, commits transaction, and broadcasts subscription updates.
555+
fn refresh_views_then_commit_and_broadcast(
556+
tx: MutTxId,
557+
module_info: &ModuleInfo,
558+
inst_common: &mut InstanceCommon,
559+
inst: &mut impl WasmInstance,
560+
) {
561+
let timestamp = Timestamp::now();
562+
let (view_result, trapped) = inst_common.call_views_with_tx(tx, module_info.database_identity, inst, timestamp);
563+
let mut status = match view_result.outcome {
564+
crate::host::module_host::ViewOutcome::Success => EventStatus::Committed(DatabaseUpdate::default()),
565+
crate::host::module_host::ViewOutcome::Failed(err) => EventStatus::FailedInternal(err),
566+
crate::host::module_host::ViewOutcome::BudgetExceeded => EventStatus::OutOfEnergy,
567+
};
568+
if trapped && matches!(status, EventStatus::Committed(_)) {
569+
status = EventStatus::FailedInternal("The instance encountered a fatal error.".into());
570+
}
571+
526572
let event = ModuleEvent {
527-
timestamp: Timestamp::now(),
573+
timestamp,
528574
caller_identity: module_info.database_identity,
529575
caller_connection_id: None,
530576
function_call: ModuleFunctionCall::default(),
531-
status: EventStatus::Committed(DatabaseUpdate::default()),
577+
status,
532578
reducer_return_value: None,
533579
//Keeping them 0 as it is internal transaction, not by reducer
534580
energy_quanta_used: EnergyQuanta { quanta: 0 },
@@ -537,7 +583,10 @@ fn commit_and_broadcast_deletion_event(tx: MutTxId, module_info: &ModuleInfo) {
537583
timer: None,
538584
};
539585

540-
if let Err(e) = module_info.subscriptions.commit_and_broadcast_event(None, event, tx) {
586+
if let Err(e) = module_info
587+
.subscriptions
588+
.commit_and_broadcast_event(None, event, view_result.tx)
589+
{
541590
log::error!("Failed to broadcast deletion event: {e:#}");
542591
}
543592
}

crates/core/src/host/wasm_common/module_host_actor.rs

Lines changed: 6 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -947,7 +947,7 @@ impl InstanceCommon {
947947

948948
// Only re-evaluate and update views if the reducer's execution was successful
949949
let (out, trapped) = if !trapped && matches!(status, EventStatus::Committed(_)) {
950-
self.call_views_with_tx(tx, caller_identity, &info.module_def, inst, timestamp)
950+
self.call_views_with_tx(tx, caller_identity, inst, timestamp)
951951
} else {
952952
(ViewCallResult::default(tx), trapped)
953953
};
@@ -1315,32 +1315,14 @@ impl InstanceCommon {
13151315
&mut self,
13161316
tx: MutTxId,
13171317
caller: Identity,
1318-
module_def: &ModuleDef,
13191318
inst: &mut I,
13201319
timestamp: Timestamp,
13211320
) -> (ViewCallResult, bool) {
1322-
let view_calls = tx
1323-
.views_for_refresh()
1324-
.map(|info| {
1325-
let view_def = module_def
1326-
.get_view_by_id(info.fn_ptr, info.sender.is_none())
1327-
.unwrap_or_else(|| panic!("view with fn_ptr `{}` not found", info.fn_ptr));
1328-
1329-
CallViewParams {
1330-
view_name: view_def.name.clone(),
1331-
view_id: info.view_id,
1332-
table_id: info.table_id,
1333-
fn_ptr: view_def.fn_ptr,
1334-
caller,
1335-
sender: info.sender,
1336-
args: ArgsTuple::nullary(),
1337-
row_type: view_def.product_type_ref,
1338-
timestamp,
1339-
}
1340-
})
1341-
.collect::<Vec<_>>();
1342-
1343-
self.execute_view_calls(tx, view_calls, inst)
1321+
let mut instance = RefInstance {
1322+
common: self,
1323+
instance: inst,
1324+
};
1325+
ModuleHost::call_views_with_tx_at(tx, &mut instance, caller, timestamp)
13441326
}
13451327

13461328
/// Executes view calls and accumulate results.

0 commit comments

Comments
 (0)