Skip to content

Commit 37853a2

Browse files
committed
some refactoring
1 parent 3011ffd commit 37853a2

7 files changed

Lines changed: 132 additions & 90 deletions

File tree

crates/client-api/src/routes/database.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,7 @@ fn map_reducer_error(e: ReducerCallError, reducer: &str) -> (StatusCode, String)
103103
log::debug!("Attempt to call {lifecycle:?} lifecycle reducer {reducer}");
104104
StatusCode::BAD_REQUEST
105105
}
106+
ReducerCallError::OutOfOrderInboundMessage { .. } => StatusCode::BAD_REQUEST,
106107
};
107108

108109
log::debug!("Error while invoking reducer {e:#}");
@@ -244,7 +245,7 @@ pub struct CallFromDatabaseQuery {
244245
/// - `msg_id` — the inter-database message ID from the sender's st_outbound_msg.
245246
///
246247
/// Before invoking the reducer, the receiver checks `st_inbound_msg`.
247-
/// If the incoming `msg_id` is the last delivered msg_id for `sender_identity`,
248+
/// If the incoming `msg_id` is the last delivered msg_id for `sender_identity`,
248249
/// the call is a duplicate and 200 OK is returned immediately without running the reducer.
249250
/// Otherwise the reducer is invoked, the dedup index is updated atomically in the same
250251
/// transaction, and an acknowledgment is returned on success.

crates/core/src/host/idc_actor.rs

Lines changed: 46 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -13,12 +13,14 @@
1313
use crate::db::relational_db::RelationalDB;
1414
use crate::energy::EnergyQuanta;
1515
use crate::host::module_host::{CallReducerParams, ModuleInfo, WeakModuleHost};
16-
use crate::host::{FunctionArgs, ReducerCallResult, ReducerOutcome};
16+
use crate::host::{FunctionArgs, ReducerCallError, ReducerCallResult, ReducerOutcome};
1717
use anyhow::anyhow;
1818
use bytes::Bytes;
1919
use spacetimedb_datastore::execution_context::{ReducerContext, Workload};
2020
use spacetimedb_datastore::locking_tx_datastore::MutTxId;
21-
use spacetimedb_datastore::system_tables::{st_inbound_msg_result_status, StOutboundMsgRow, ST_OUTBOUND_MSG_ID};
21+
use spacetimedb_datastore::system_tables::{
22+
StInboundMsgResultStatus, StOutboundMsgRow, ST_OUTBOUND_MSG_ID,
23+
};
2224
use spacetimedb_datastore::traits::IsolationLevel;
2325
use spacetimedb_lib::{AlgebraicValue, Identity, ProductValue};
2426
use spacetimedb_primitives::{ColId, TableId};
@@ -215,12 +217,12 @@ async fn run_idc_loop(
215217
}
216218

217219
/// Decode the delivery outcome into `(result_status, result_payload)` for recording.
218-
fn outcome_to_result(outcome: &DeliveryOutcome) -> (u8, Bytes) {
220+
fn outcome_to_result(outcome: &DeliveryOutcome) -> (StInboundMsgResultStatus, Bytes) {
219221
match outcome {
220-
DeliveryOutcome::Success(payload) => (st_inbound_msg_result_status::SUCCESS, payload.clone()),
221-
DeliveryOutcome::ReducerError(msg) => (st_inbound_msg_result_status::REDUCER_ERROR, Bytes::from(msg.clone())),
222+
DeliveryOutcome::Success(payload) => (StInboundMsgResultStatus::Success, payload.clone()),
223+
DeliveryOutcome::ReducerError(msg) => (StInboundMsgResultStatus::ReducerError, Bytes::from(msg.clone())),
222224
DeliveryOutcome::BudgetExceeded => (
223-
st_inbound_msg_result_status::REDUCER_ERROR,
225+
StInboundMsgResultStatus::ReducerError,
224226
Bytes::from("budget exceeded".to_string()),
225227
),
226228
DeliveryOutcome::TransportError(_) => unreachable!("transport errors never finalize"),
@@ -229,6 +231,11 @@ fn outcome_to_result(outcome: &DeliveryOutcome) -> (u8, Bytes) {
229231

230232
pub(crate) type ReducerSuccessAction = Box<dyn FnOnce(&mut MutTxId, &Option<Bytes>) -> anyhow::Result<()> + Send>;
231233

234+
#[derive(Debug, Clone, Copy)]
235+
pub enum ReducerSuccessActionKind {
236+
DeleteOutboundMsg(u64),
237+
}
238+
232239
fn reducer_workload(module: &ModuleInfo, params: &CallReducerParams) -> Workload {
233240
let reducer_def = module.module_def.reducer_by_id(params.reducer_id);
234241
Workload::Reducer(ReducerContext {
@@ -242,23 +249,15 @@ fn reducer_workload(module: &ModuleInfo, params: &CallReducerParams) -> Workload
242249

243250
fn duplicate_result_from_row(row: spacetimedb_datastore::system_tables::StInboundMsgRow) -> ReducerCallResult {
244251
let outcome = match row.result_status {
245-
st_inbound_msg_result_status::SUCCESS => ReducerOutcome::Committed,
246-
st_inbound_msg_result_status::REDUCER_ERROR => ReducerOutcome::Failed(Box::new(
252+
StInboundMsgResultStatus::Success => ReducerOutcome::Committed,
253+
StInboundMsgResultStatus::ReducerError => ReducerOutcome::Failed(Box::new(
247254
String::from_utf8_lossy(&row.result_payload).into_owned().into_boxed_str(),
248255
)),
249-
status => {
250-
log::warn!(
251-
"IDC: unexpected inbound dedup result_status={} for sender {}",
252-
status,
253-
Identity::from(row.database_identity)
254-
);
255-
ReducerOutcome::Failed(Box::new("unexpected inbound dedup result status".into()))
256-
}
257256
};
258257

259258
ReducerCallResult {
260259
outcome,
261-
reducer_return_value: (row.result_status == st_inbound_msg_result_status::SUCCESS).then_some(row.result_payload),
260+
reducer_return_value: (row.result_status == StInboundMsgResultStatus::Success).then_some(row.result_payload),
262261
energy_used: EnergyQuanta::ZERO,
263262
execution_duration: Duration::ZERO,
264263
tx_offset: None,
@@ -270,7 +269,7 @@ fn record_failed_inbound_result(db: &RelationalDB, sender_identity: Identity, se
270269
if let Err(e) = dedup_tx.upsert_inbound_last_msg(
271270
sender_identity,
272271
sender_msg_id,
273-
st_inbound_msg_result_status::REDUCER_ERROR,
272+
StInboundMsgResultStatus::ReducerError,
274273
error.to_string().into(),
275274
) {
276275
log::error!("IDC: failed to record reducer error in dedup table for sender {sender_identity}: {e}");
@@ -287,16 +286,25 @@ pub(crate) fn call_reducer_from_database<F>(
287286
sender_identity: Identity,
288287
sender_msg_id: u64,
289288
call_reducer: F,
290-
) -> (ReducerCallResult, bool)
289+
) -> Result<(ReducerCallResult, bool), ReducerCallError>
291290
where
292291
F: FnOnce(Option<MutTxId>, CallReducerParams, ReducerSuccessAction) -> (ReducerCallResult, bool),
293292
{
294293
let tx = db.begin_mut_tx(IsolationLevel::Serializable, reducer_workload(module, &params));
295-
if let Some(row) = tx.get_inbound_msg_row(sender_identity)
296-
&& sender_msg_id <= row.last_outbound_msg
297-
{
298-
let _ = db.rollback_mut_tx(tx);
299-
return (duplicate_result_from_row(row), false);
294+
if let Some(row) = tx.get_inbound_msg_row(sender_identity) {
295+
if sender_msg_id == row.last_outbound_msg {
296+
let _ = db.rollback_mut_tx(tx);
297+
return Ok((duplicate_result_from_row(row), false));
298+
}
299+
if sender_msg_id < row.last_outbound_msg {
300+
let expected = row.last_outbound_msg + 1;
301+
let _ = db.rollback_mut_tx(tx);
302+
return Err(ReducerCallError::OutOfOrderInboundMessage {
303+
sender: sender_identity,
304+
expected,
305+
received: sender_msg_id,
306+
});
307+
}
300308
}
301309

302310
let (result, trapped) = call_reducer(
@@ -306,7 +314,7 @@ where
306314
tx.upsert_inbound_last_msg(
307315
sender_identity,
308316
sender_msg_id,
309-
st_inbound_msg_result_status::SUCCESS,
317+
StInboundMsgResultStatus::Success,
310318
reducer_return_value.clone().unwrap_or_default(),
311319
)
312320
.map_err(anyhow::Error::from)
@@ -317,14 +325,14 @@ where
317325
record_failed_inbound_result(db, sender_identity, sender_msg_id, err);
318326
}
319327

320-
(result, trapped)
328+
Ok((result, trapped))
321329
}
322330

323-
pub(crate) fn call_reducer_delete_outbound_on_success<F>(
331+
pub(crate) fn call_reducer_with_success_action<F>(
324332
module: &ModuleInfo,
325333
db: &RelationalDB,
326334
params: CallReducerParams,
327-
msg_id: u64,
335+
action: ReducerSuccessActionKind,
328336
call_reducer: F,
329337
) -> (ReducerCallResult, bool)
330338
where
@@ -335,7 +343,11 @@ where
335343
Some(tx),
336344
params,
337345
Box::new(move |tx, _reducer_return_value| {
338-
tx.delete_outbound_msg(msg_id).map_err(|e| anyhow!(e))
346+
match action {
347+
ReducerSuccessActionKind::DeleteOutboundMsg(msg_id) => {
348+
tx.delete_outbound_msg(msg_id).map_err(|e| anyhow!(e))
349+
}
350+
}
339351
}),
340352
)
341353
}
@@ -348,7 +360,7 @@ async fn finalize_message(
348360
db: &RelationalDB,
349361
module_host: &WeakModuleHost,
350362
msg: &PendingMessage,
351-
result_status: u8,
363+
result_status: StInboundMsgResultStatus,
352364
result_payload: Bytes,
353365
) {
354366
// Call the on_result reducer if configured.
@@ -373,11 +385,11 @@ async fn finalize_message(
373385
return;
374386
}
375387
match result_status {
376-
st_inbound_msg_result_status::SUCCESS => {
388+
StInboundMsgResultStatus::Success => {
377389
args_bytes.push(0);
378390
args_bytes.extend_from_slice(&result_payload);
379391
}
380-
st_inbound_msg_result_status::REDUCER_ERROR => {
392+
StInboundMsgResultStatus::ReducerError => {
381393
let err = String::from_utf8_lossy(&result_payload).into_owned();
382394
if let Err(e) = spacetimedb_sats::bsatn::to_writer(&mut args_bytes, &Err::<(), String>(err)) {
383395
log::error!(
@@ -388,24 +400,19 @@ async fn finalize_message(
388400
return;
389401
}
390402
}
391-
status => {
392-
log::error!("idc_actor: unexpected result status {status} for msg_id={}", msg.msg_id);
393-
delete_message(db, msg.msg_id);
394-
return;
395-
}
396403
}
397404

398405
let caller_identity = Identity::ZERO; // system call
399406
let result = host
400-
.call_reducer_delete_outbound_on_success(
407+
.call_reducer_with_success_action(
401408
caller_identity,
402409
None, // no connection_id
403410
None, // no client sender
404411
None, // no request_id
405412
None, // no timer
406413
on_result_reducer,
407414
FunctionArgs::Bsatn(bytes::Bytes::from(args_bytes)),
408-
msg.msg_id,
415+
ReducerSuccessActionKind::DeleteOutboundMsg(msg.msg_id),
409416
)
410417
.await;
411418

crates/core/src/host/module_host.rs

Lines changed: 16 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -946,6 +946,12 @@ pub enum ReducerCallError {
946946
ScheduleReducerNotFound,
947947
#[error("can't directly call special {0:?} lifecycle reducer")]
948948
LifecycleReducer(Lifecycle),
949+
#[error("out-of-order inbound message id {received} from {sender}; expected {expected}")]
950+
OutOfOrderInboundMessage {
951+
sender: Identity,
952+
expected: u64,
953+
received: u64,
954+
},
949955
}
950956

951957
#[derive(Debug, PartialEq, Eq)]
@@ -1605,7 +1611,7 @@ impl ModuleHost {
16051611
.await?
16061612
}
16071613

1608-
async fn call_reducer_delete_outbound_on_success_inner(
1614+
async fn call_reducer_with_success_action_inner(
16091615
&self,
16101616
caller_identity: Identity,
16111617
caller_connection_id: Option<ConnectionId>,
@@ -1615,7 +1621,7 @@ impl ModuleHost {
16151621
reducer_id: ReducerId,
16161622
reducer_def: &ReducerDef,
16171623
args: FunctionArgs,
1618-
msg_id: u64,
1624+
action: crate::host::idc_actor::ReducerSuccessActionKind,
16191625
) -> Result<ReducerCallResult, ReducerCallError> {
16201626
let args = args
16211627
.into_tuple_for_def(&self.info.module_def, reducer_def)
@@ -1634,9 +1640,9 @@ impl ModuleHost {
16341640

16351641
self.call(
16361642
&reducer_def.name,
1637-
(call_reducer_params, msg_id),
1638-
async |(p, msg_id), inst| Ok(inst.call_reducer_delete_outbound_on_success(p, msg_id)),
1639-
async |(p, msg_id), inst| inst.call_reducer_delete_outbound_on_success(p, msg_id).await,
1643+
(call_reducer_params, action),
1644+
async |(p, action), inst| Ok(inst.call_reducer_with_success_action(p, action)),
1645+
async |(p, action), inst| inst.call_reducer_with_success_action(p, action).await,
16401646
)
16411647
.await?
16421648
}
@@ -1691,7 +1697,7 @@ impl ModuleHost {
16911697
res
16921698
}
16931699

1694-
pub async fn call_reducer_delete_outbound_on_success(
1700+
pub async fn call_reducer_with_success_action(
16951701
&self,
16961702
caller_identity: Identity,
16971703
caller_connection_id: Option<ConnectionId>,
@@ -1700,7 +1706,7 @@ impl ModuleHost {
17001706
timer: Option<Instant>,
17011707
reducer_name: &str,
17021708
args: FunctionArgs,
1703-
msg_id: u64,
1709+
action: crate::host::idc_actor::ReducerSuccessActionKind,
17041710
) -> Result<ReducerCallResult, ReducerCallError> {
17051711
let res = async {
17061712
let (reducer_id, reducer_def) = self
@@ -1716,7 +1722,7 @@ impl ModuleHost {
17161722
return Err(ReducerCallError::NoSuchReducer);
17171723
}
17181724

1719-
self.call_reducer_delete_outbound_on_success_inner(
1725+
self.call_reducer_with_success_action_inner(
17201726
caller_identity,
17211727
caller_connection_id,
17221728
client,
@@ -1725,7 +1731,7 @@ impl ModuleHost {
17251731
reducer_id,
17261732
reducer_def,
17271733
args,
1728-
msg_id,
1734+
action,
17291735
)
17301736
.await
17311737
}
@@ -1795,7 +1801,7 @@ impl ModuleHost {
17951801
&reducer_def.name,
17961802
(call_reducer_params, sender_database_identity, sender_msg_id),
17971803
async |(p, sender_database_identity, sender_msg_id), inst| {
1798-
Ok(inst.call_reducer_from_database(p, sender_database_identity, sender_msg_id))
1804+
inst.call_reducer_from_database(p, sender_database_identity, sender_msg_id)
17991805
},
18001806
async |(p, sender_database_identity, sender_msg_id), inst| {
18011807
inst.call_reducer_from_database(p, sender_database_identity, sender_msg_id)

0 commit comments

Comments
 (0)