Skip to content

Commit 3011ffd

Browse files
committed
simplify
1 parent 7745fd7 commit 3011ffd

5 files changed

Lines changed: 242 additions & 92 deletions

File tree

crates/core/src/host/idc_actor.rs

Lines changed: 119 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,10 +11,13 @@
1111
/// `on_result_reducer` (read from the outbox table's schema) and deletes the st_outbound_msg row.
1212
/// 6. Enforces sequential delivery per target database: msg N+1 is only delivered after N is done.
1313
use crate::db::relational_db::RelationalDB;
14-
use crate::host::module_host::WeakModuleHost;
15-
use crate::host::FunctionArgs;
14+
use crate::energy::EnergyQuanta;
15+
use crate::host::module_host::{CallReducerParams, ModuleInfo, WeakModuleHost};
16+
use crate::host::{FunctionArgs, ReducerCallResult, ReducerOutcome};
17+
use anyhow::anyhow;
1618
use bytes::Bytes;
17-
use spacetimedb_datastore::execution_context::Workload;
19+
use spacetimedb_datastore::execution_context::{ReducerContext, Workload};
20+
use spacetimedb_datastore::locking_tx_datastore::MutTxId;
1821
use spacetimedb_datastore::system_tables::{st_inbound_msg_result_status, StOutboundMsgRow, ST_OUTBOUND_MSG_ID};
1922
use spacetimedb_datastore::traits::IsolationLevel;
2023
use spacetimedb_lib::{AlgebraicValue, Identity, ProductValue};
@@ -224,6 +227,119 @@ fn outcome_to_result(outcome: &DeliveryOutcome) -> (u8, Bytes) {
224227
}
225228
}
226229

230+
pub(crate) type ReducerSuccessAction = Box<dyn FnOnce(&mut MutTxId, &Option<Bytes>) -> anyhow::Result<()> + Send>;
231+
232+
fn reducer_workload(module: &ModuleInfo, params: &CallReducerParams) -> Workload {
233+
let reducer_def = module.module_def.reducer_by_id(params.reducer_id);
234+
Workload::Reducer(ReducerContext {
235+
name: reducer_def.name.clone(),
236+
caller_identity: params.caller_identity,
237+
caller_connection_id: params.caller_connection_id,
238+
timestamp: params.timestamp,
239+
arg_bsatn: params.args.get_bsatn().clone(),
240+
})
241+
}
242+
243+
fn duplicate_result_from_row(row: spacetimedb_datastore::system_tables::StInboundMsgRow) -> ReducerCallResult {
244+
let outcome = match row.result_status {
245+
st_inbound_msg_result_status::SUCCESS => ReducerOutcome::Committed,
246+
st_inbound_msg_result_status::REDUCER_ERROR => ReducerOutcome::Failed(Box::new(
247+
String::from_utf8_lossy(&row.result_payload).into_owned().into_boxed_str(),
248+
)),
249+
status => {
250+
log::warn!(
251+
"IDC: unexpected inbound dedup result_status={} for sender {}",
252+
status,
253+
Identity::from(row.database_identity)
254+
);
255+
ReducerOutcome::Failed(Box::new("unexpected inbound dedup result status".into()))
256+
}
257+
};
258+
259+
ReducerCallResult {
260+
outcome,
261+
reducer_return_value: (row.result_status == st_inbound_msg_result_status::SUCCESS).then_some(row.result_payload),
262+
energy_used: EnergyQuanta::ZERO,
263+
execution_duration: Duration::ZERO,
264+
tx_offset: None,
265+
}
266+
}
267+
268+
fn record_failed_inbound_result(db: &RelationalDB, sender_identity: Identity, sender_msg_id: u64, error: &str) {
269+
let mut dedup_tx = db.begin_mut_tx(IsolationLevel::Serializable, Workload::Internal);
270+
if let Err(e) = dedup_tx.upsert_inbound_last_msg(
271+
sender_identity,
272+
sender_msg_id,
273+
st_inbound_msg_result_status::REDUCER_ERROR,
274+
error.to_string().into(),
275+
) {
276+
log::error!("IDC: failed to record reducer error in dedup table for sender {sender_identity}: {e}");
277+
let _ = db.rollback_mut_tx(dedup_tx);
278+
} else if let Err(e) = db.commit_tx(dedup_tx) {
279+
log::error!("IDC: failed to commit dedup error record for sender {sender_identity}: {e}");
280+
}
281+
}
282+
283+
pub(crate) fn call_reducer_from_database<F>(
284+
module: &ModuleInfo,
285+
db: &RelationalDB,
286+
params: CallReducerParams,
287+
sender_identity: Identity,
288+
sender_msg_id: u64,
289+
call_reducer: F,
290+
) -> (ReducerCallResult, bool)
291+
where
292+
F: FnOnce(Option<MutTxId>, CallReducerParams, ReducerSuccessAction) -> (ReducerCallResult, bool),
293+
{
294+
let tx = db.begin_mut_tx(IsolationLevel::Serializable, reducer_workload(module, &params));
295+
if let Some(row) = tx.get_inbound_msg_row(sender_identity)
296+
&& sender_msg_id <= row.last_outbound_msg
297+
{
298+
let _ = db.rollback_mut_tx(tx);
299+
return (duplicate_result_from_row(row), false);
300+
}
301+
302+
let (result, trapped) = call_reducer(
303+
Some(tx),
304+
params,
305+
Box::new(move |tx, reducer_return_value| {
306+
tx.upsert_inbound_last_msg(
307+
sender_identity,
308+
sender_msg_id,
309+
st_inbound_msg_result_status::SUCCESS,
310+
reducer_return_value.clone().unwrap_or_default(),
311+
)
312+
.map_err(anyhow::Error::from)
313+
}),
314+
);
315+
316+
if let ReducerOutcome::Failed(err) = &result.outcome {
317+
record_failed_inbound_result(db, sender_identity, sender_msg_id, err);
318+
}
319+
320+
(result, trapped)
321+
}
322+
323+
pub(crate) fn call_reducer_delete_outbound_on_success<F>(
324+
module: &ModuleInfo,
325+
db: &RelationalDB,
326+
params: CallReducerParams,
327+
msg_id: u64,
328+
call_reducer: F,
329+
) -> (ReducerCallResult, bool)
330+
where
331+
F: FnOnce(Option<MutTxId>, CallReducerParams, ReducerSuccessAction) -> (ReducerCallResult, bool),
332+
{
333+
let tx = db.begin_mut_tx(IsolationLevel::Serializable, reducer_workload(module, &params));
334+
call_reducer(
335+
Some(tx),
336+
params,
337+
Box::new(move |tx, _reducer_return_value| {
338+
tx.delete_outbound_msg(msg_id).map_err(|e| anyhow!(e))
339+
}),
340+
)
341+
}
342+
227343
/// Finalize a delivered message: call the on_result reducer (if any), then delete from ST_OUTBOUND_MSG.
228344
///
229345
/// On the happy path, `on_result_reducer` success and deletion of `st_outbound_msg`

crates/core/src/host/module_host.rs

Lines changed: 12 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -624,16 +624,6 @@ pub struct CallReducerParams {
624624
pub timer: Option<Instant>,
625625
pub reducer_id: ReducerId,
626626
pub args: ArgsTuple,
627-
/// If set, enables at-most-once delivery semantics for database-to-database calls.
628-
///
629-
/// The tuple is `(sender_database_identity, sender_msg_id)`.
630-
/// Before running the reducer, `st_inbound_msg` is consulted:
631-
/// if `sender_msg_id` ≤ the stored last-delivered msg_id for the sender,
632-
/// the call is a duplicate and returns the stored committed or failed result
633-
/// without running the reducer again.
634-
/// Otherwise the reducer runs, and the stored msg_id is updated atomically
635-
/// within the same transaction.
636-
pub dedup_sender: Option<(Identity, u64)>,
637627
}
638628

639629
impl CallReducerParams {
@@ -654,7 +644,6 @@ impl CallReducerParams {
654644
timer: None,
655645
reducer_id,
656646
args,
657-
dedup_sender: None,
658647
}
659648
}
660649
}
@@ -1578,7 +1567,6 @@ impl ModuleHost {
15781567
timer,
15791568
reducer_id,
15801569
args,
1581-
dedup_sender: None,
15821570
})
15831571
}
15841572

@@ -1606,7 +1594,6 @@ impl ModuleHost {
16061594
timer,
16071595
reducer_id,
16081596
args,
1609-
dedup_sender: None,
16101597
};
16111598

16121599
self.call(
@@ -1643,7 +1630,6 @@ impl ModuleHost {
16431630
timer,
16441631
reducer_id,
16451632
args,
1646-
dedup_sender: None,
16471633
};
16481634

16491635
self.call(
@@ -1793,27 +1779,28 @@ impl ModuleHost {
17931779
return Err(ReducerCallError::NoSuchReducer);
17941780
}
17951781

1796-
let args = args
1797-
.into_tuple_for_def(&self.info.module_def, reducer_def)
1798-
.map_err(InvalidReducerArguments)?;
1799-
let caller_connection_id = caller_connection_id.unwrap_or(ConnectionId::ZERO);
1800-
let call_reducer_params = CallReducerParams {
1801-
timestamp: Timestamp::now(),
1782+
let call_reducer_params = Self::call_reducer_params(
1783+
&self.info,
18021784
caller_identity,
18031785
caller_connection_id,
18041786
client,
18051787
request_id,
18061788
timer,
18071789
reducer_id,
1790+
reducer_def,
18081791
args,
1809-
dedup_sender: Some((sender_database_identity, sender_msg_id)),
1810-
};
1792+
)?;
18111793

18121794
self.call(
18131795
&reducer_def.name,
1814-
call_reducer_params,
1815-
async |p, inst| Ok(inst.call_reducer(p)),
1816-
async |p, inst| inst.call_reducer(p).await,
1796+
(call_reducer_params, sender_database_identity, sender_msg_id),
1797+
async |(p, sender_database_identity, sender_msg_id), inst| {
1798+
Ok(inst.call_reducer_from_database(p, sender_database_identity, sender_msg_id))
1799+
},
1800+
async |(p, sender_database_identity, sender_msg_id), inst| {
1801+
inst.call_reducer_from_database(p, sender_database_identity, sender_msg_id)
1802+
.await
1803+
},
18171804
)
18181805
.await?
18191806
}

crates/core/src/host/scheduler.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -443,7 +443,7 @@ pub(super) async fn call_scheduled_function(
443443
// print their message and backtrace when they occur, so we don't need to do
444444
// anything with the error payload.
445445
let result = panic::catch_unwind(panic::AssertUnwindSafe(|| {
446-
inst_common.call_reducer_with_tx(Some(tx), params, inst)
446+
inst_common.call_reducer_with_tx(Some(tx), params, inst, |_tx, _ret| Ok(()))
447447
}));
448448
let reschedule = delete_scheduled_function_row(module_info, db, id, None, None, inst_common, inst);
449449
// Currently, we drop the return value from the function call. In the future,

crates/core/src/host/v8/mod.rs

Lines changed: 61 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -586,6 +586,13 @@ enum JsWorkerRequest {
586586
reply_tx: JsReplyTx<ReducerCallResult>,
587587
params: CallReducerParams,
588588
},
589+
/// See [`JsInstance::call_reducer_from_database`].
590+
CallReducerFromDatabase {
591+
reply_tx: JsReplyTx<ReducerCallResult>,
592+
params: CallReducerParams,
593+
sender_identity: Identity,
594+
sender_msg_id: u64,
595+
},
589596
/// See [`JsInstance::call_reducer_delete_outbound_on_success`].
590597
CallReducerDeleteOutboundOnSuccess {
591598
reply_tx: JsReplyTx<ReducerCallResult>,
@@ -633,7 +640,7 @@ enum JsWorkerRequest {
633640
},
634641
}
635642

636-
static_assert_size!(CallReducerParams, 256);
643+
static_assert_size!(CallReducerParams, 192);
637644

638645
fn send_worker_reply<T>(ctx: &str, reply_tx: JsReplyTx<T>, value: T, trapped: bool) {
639646
if reply_tx.send(JsWorkerReply { value, trapped }).is_err() {
@@ -908,6 +915,25 @@ impl JsInstanceLane {
908915
.map_err(|_| ReducerCallError::WorkerError(instance_lane_worker_error("call_reducer")))
909916
}
910917

918+
pub async fn call_reducer_from_database(
919+
&self,
920+
params: CallReducerParams,
921+
sender_identity: Identity,
922+
sender_msg_id: u64,
923+
) -> Result<ReducerCallResult, ReducerCallError> {
924+
self.run_once("call_reducer", |inst: JsInstance| async move {
925+
inst.send_request(|reply_tx| JsWorkerRequest::CallReducerFromDatabase {
926+
reply_tx,
927+
params,
928+
sender_identity,
929+
sender_msg_id,
930+
})
931+
.await
932+
})
933+
.await
934+
.map_err(|_| ReducerCallError::WorkerError(instance_lane_worker_error("call_reducer")))
935+
}
936+
911937
pub async fn call_reducer_delete_outbound_on_success(
912938
&self,
913939
params: CallReducerParams,
@@ -1166,7 +1192,15 @@ async fn spawn_instance_worker(
11661192
let mut requests_since_heap_check = 0u64;
11671193
let mut last_heap_check_at = Instant::now();
11681194
for request in request_rx.iter() {
1169-
let mut call_reducer = |tx, params| instance_common.call_reducer_with_tx(tx, params, &mut inst);
1195+
let info_for_idc = instance_common.info();
1196+
let db_for_idc = inst.replica_ctx().relational_db().clone();
1197+
let mut call_reducer_with_success =
1198+
|tx, params, on_success: crate::host::idc_actor::ReducerSuccessAction| {
1199+
instance_common.call_reducer_with_tx(tx, params, &mut inst, on_success)
1200+
};
1201+
let mut call_reducer = |tx, params| {
1202+
call_reducer_with_success(tx, params, Box::new(|_tx, _ret| Ok(())))
1203+
};
11701204
let mut should_exit = false;
11711205

11721206
core_pinner.pin_if_changed();
@@ -1188,15 +1222,36 @@ async fn spawn_instance_worker(
11881222
send_worker_reply("call_reducer", reply_tx, res, trapped);
11891223
should_exit = trapped;
11901224
}
1225+
JsWorkerRequest::CallReducerFromDatabase {
1226+
reply_tx,
1227+
params,
1228+
sender_identity,
1229+
sender_msg_id,
1230+
} => {
1231+
let (res, trapped) = crate::host::idc_actor::call_reducer_from_database(
1232+
info_for_idc.as_ref(),
1233+
db_for_idc.as_ref(),
1234+
params,
1235+
sender_identity,
1236+
sender_msg_id,
1237+
|tx, params, on_success| call_reducer_with_success(tx, params, on_success),
1238+
);
1239+
worker_trapped.store(trapped, Ordering::Relaxed);
1240+
send_worker_reply("call_reducer", reply_tx, res, trapped);
1241+
should_exit = trapped;
1242+
}
11911243
JsWorkerRequest::CallReducerDeleteOutboundOnSuccess {
11921244
reply_tx,
11931245
params,
11941246
msg_id,
11951247
} => {
1196-
let (res, trapped) =
1197-
instance_common.call_reducer_with_tx_and_success_action(None, params, &mut inst, |tx| {
1198-
tx.delete_outbound_msg(msg_id).map_err(anyhow::Error::from)
1199-
});
1248+
let (res, trapped) = crate::host::idc_actor::call_reducer_delete_outbound_on_success(
1249+
info_for_idc.as_ref(),
1250+
db_for_idc.as_ref(),
1251+
params,
1252+
msg_id,
1253+
|tx, params, on_success| call_reducer_with_success(tx, params, on_success),
1254+
);
12001255
worker_trapped.store(trapped, Ordering::Relaxed);
12011256
send_worker_reply("call_reducer", reply_tx, res, trapped);
12021257
should_exit = trapped;

0 commit comments

Comments
 (0)