Skip to content

Commit 7745fd7

Browse files
committed
Preserve IDC reducer return values
1 parent 1437e92 commit 7745fd7

9 files changed

Lines changed: 66 additions & 68 deletions

File tree

crates/bindings/src/rt.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -538,11 +538,12 @@ pub const fn assert_outbox_table_primary_key<T: OutboxTablePrimaryKey>() {}
538538

539539
/// Verify at compile time that a function has the correct signature for an outbox `on_result` reducer.
540540
///
541-
/// The reducer must accept `(OutboxRow, Result<(), String>)` as its user-supplied arguments:
542-
/// `fn on_result(ctx: &ReducerContext, request: OutboxRow, result: Result<(), String>)`
543-
pub const fn outbox_typecheck<'de, OutboxRow>(_x: impl Reducer<'de, (OutboxRow, Result<(), String>)>)
541+
/// The reducer must accept `(OutboxRow, Result<T, String>)` as its user-supplied arguments:
542+
/// `fn on_result(ctx: &ReducerContext, request: OutboxRow, result: Result<T, String>)`
543+
pub const fn outbox_typecheck<'de, OutboxRow, T>(_x: impl Reducer<'de, (OutboxRow, Result<T, String>)>)
544544
where
545545
OutboxRow: spacetimedb_lib::SpacetimeType + Serialize + Deserialize<'de>,
546+
T: spacetimedb_lib::SpacetimeType + Serialize + Deserialize<'de>,
546547
{
547548
core::mem::forget(_x);
548549
}

crates/client-api/src/routes/database.rs

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -310,15 +310,26 @@ pub async fn call_from_database<S: ControlStateDelegate + NodeDelegate>(
310310
match result {
311311
Ok(rcr) => {
312312
let (status, body) = match rcr.outcome {
313-
ReducerOutcome::Committed => (StatusCode::OK, "".into()),
313+
ReducerOutcome::Committed => (
314+
StatusCode::OK,
315+
axum::body::Body::from(rcr.reducer_return_value.unwrap_or_default()),
316+
),
314317
// 422 = reducer ran but returned Err; the IDC actor uses this to distinguish
315318
// reducer failures from transport errors (which it retries).
316-
ReducerOutcome::Failed(errmsg) => (StatusCode::UNPROCESSABLE_ENTITY, *errmsg),
319+
ReducerOutcome::Failed(errmsg) => {
320+
(
321+
StatusCode::UNPROCESSABLE_ENTITY,
322+
axum::body::Body::from(errmsg.to_string()),
323+
)
324+
}
317325
ReducerOutcome::BudgetExceeded => {
318326
log::warn!(
319327
"Node's energy budget exceeded for identity: {owner_identity} while executing {reducer}"
320328
);
321-
(StatusCode::PAYMENT_REQUIRED, "Module energy budget exhausted.".into())
329+
(
330+
StatusCode::PAYMENT_REQUIRED,
331+
axum::body::Body::from("Module energy budget exhausted."),
332+
)
322333
}
323334
};
324335
Ok((

crates/core/src/host/host_controller.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ use crate::util::jobs::{AllocatedJobCore, JobCores};
2323
use crate::worker_metrics::WORKER_METRICS;
2424
use anyhow::{anyhow, bail, Context};
2525
use async_trait::async_trait;
26+
use bytes::Bytes;
2627
use durability::{Durability, EmptyHistory};
2728
use log::{info, trace, warn};
2829
use parking_lot::Mutex;
@@ -139,6 +140,7 @@ impl HostRuntimes {
139140
#[derive(Debug)]
140141
pub struct ReducerCallResult {
141142
pub outcome: ReducerOutcome,
143+
pub reducer_return_value: Option<Bytes>,
142144
pub energy_used: EnergyQuanta,
143145
pub execution_duration: Duration,
144146
pub tx_offset: Option<TransactionOffset>,

crates/core/src/host/idc_actor.rs

Lines changed: 33 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,9 @@
1313
use crate::db::relational_db::RelationalDB;
1414
use crate::host::module_host::WeakModuleHost;
1515
use crate::host::FunctionArgs;
16+
use bytes::Bytes;
1617
use spacetimedb_datastore::execution_context::Workload;
17-
use spacetimedb_datastore::system_tables::{StOutboundMsgRow, ST_OUTBOUND_MSG_ID};
18+
use spacetimedb_datastore::system_tables::{st_inbound_msg_result_status, StOutboundMsgRow, ST_OUTBOUND_MSG_ID};
1819
use spacetimedb_datastore::traits::IsolationLevel;
1920
use spacetimedb_lib::{AlgebraicValue, Identity, ProductValue};
2021
use spacetimedb_primitives::{ColId, TableId};
@@ -130,7 +131,7 @@ impl TargetState {
130131
/// Outcome of a delivery attempt.
131132
enum DeliveryOutcome {
132133
/// Reducer succeeded (HTTP 200).
133-
Success,
134+
Success(Bytes),
134135
/// Reducer ran but returned Err (HTTP 422).
135136
ReducerError(String),
136137
/// Budget exceeded (HTTP 402).
@@ -211,14 +212,13 @@ async fn run_idc_loop(
211212
}
212213

213214
/// Decode the delivery outcome into `(result_status, result_payload)` for recording.
214-
fn outcome_to_result(outcome: &DeliveryOutcome) -> (u8, String) {
215-
use spacetimedb_datastore::system_tables::st_inbound_msg_result_status;
215+
fn outcome_to_result(outcome: &DeliveryOutcome) -> (u8, Bytes) {
216216
match outcome {
217-
DeliveryOutcome::Success => (st_inbound_msg_result_status::SUCCESS, String::new()),
218-
DeliveryOutcome::ReducerError(msg) => (st_inbound_msg_result_status::REDUCER_ERROR, msg.clone()),
217+
DeliveryOutcome::Success(payload) => (st_inbound_msg_result_status::SUCCESS, payload.clone()),
218+
DeliveryOutcome::ReducerError(msg) => (st_inbound_msg_result_status::REDUCER_ERROR, Bytes::from(msg.clone())),
219219
DeliveryOutcome::BudgetExceeded => (
220220
st_inbound_msg_result_status::REDUCER_ERROR,
221-
"budget exceeded".to_string(),
221+
Bytes::from("budget exceeded".to_string()),
222222
),
223223
DeliveryOutcome::TransportError(_) => unreachable!("transport errors never finalize"),
224224
}
@@ -232,8 +232,8 @@ async fn finalize_message(
232232
db: &RelationalDB,
233233
module_host: &WeakModuleHost,
234234
msg: &PendingMessage,
235-
_result_status: u8,
236-
result_payload: String,
235+
result_status: u8,
236+
result_payload: Bytes,
237237
) {
238238
// Call the on_result reducer if configured.
239239
if let Some(on_result_reducer) = &msg.on_result_reducer {
@@ -247,23 +247,37 @@ async fn finalize_message(
247247
return;
248248
};
249249

250-
// Encode `(request_row: OutboxRow, result: Result<(), String>)` as BSATN args.
251-
let result = if result_payload.is_empty() {
252-
Ok::<(), String>(())
253-
} else {
254-
Err::<(), String>(result_payload)
255-
};
256250
let mut args_bytes = Vec::new();
257-
if let Err(e) = spacetimedb_sats::bsatn::to_writer(&mut args_bytes, &msg.request_row)
258-
.and_then(|_| spacetimedb_sats::bsatn::to_writer(&mut args_bytes, &result))
259-
{
251+
if let Err(e) = spacetimedb_sats::bsatn::to_writer(&mut args_bytes, &msg.request_row) {
260252
log::error!(
261253
"idc_actor: failed to encode on_result args for msg_id={}: {e}",
262254
msg.msg_id
263255
);
264256
delete_message(db, msg.msg_id);
265257
return;
266258
}
259+
match result_status {
260+
st_inbound_msg_result_status::SUCCESS => {
261+
args_bytes.push(0);
262+
args_bytes.extend_from_slice(&result_payload);
263+
}
264+
st_inbound_msg_result_status::REDUCER_ERROR => {
265+
let err = String::from_utf8_lossy(&result_payload).into_owned();
266+
if let Err(e) = spacetimedb_sats::bsatn::to_writer(&mut args_bytes, &Err::<(), String>(err)) {
267+
log::error!(
268+
"idc_actor: failed to encode on_result error args for msg_id={}: {e}",
269+
msg.msg_id
270+
);
271+
delete_message(db, msg.msg_id);
272+
return;
273+
}
274+
}
275+
status => {
276+
log::error!("idc_actor: unexpected result status {status} for msg_id={}", msg.msg_id);
277+
delete_message(db, msg.msg_id);
278+
return;
279+
}
280+
}
267281

268282
let caller_identity = Identity::ZERO; // system call
269283
let result = host
@@ -452,7 +466,7 @@ async fn attempt_delivery(client: &reqwest::Client, config: &IdcActorConfig, msg
452466
let status = resp.status();
453467
if status.is_success() {
454468
// HTTP 200: reducer committed successfully.
455-
DeliveryOutcome::Success
469+
DeliveryOutcome::Success(resp.bytes().await.unwrap_or_default())
456470
} else if status.as_u16() == 422 {
457471
// HTTP 422: reducer ran but returned Err(...).
458472
let body = resp.text().await.unwrap_or_default();

crates/core/src/host/wasm_common/module_host_actor.rs

Lines changed: 4 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ use super::instrumentation::CallTimes;
22
use super::*;
33
use crate::client::ClientActorId;
44
use crate::database_logger;
5-
use crate::energy::{EnergyMonitor, EnergyQuanta, FunctionBudget, FunctionFingerprint};
5+
use crate::energy::{EnergyMonitor, FunctionBudget, FunctionFingerprint};
66
use crate::error::DBError;
77
use crate::host::host_controller::CallProcedureReturn;
88
use crate::host::instance_env::{InstanceEnv, TxSlot};
@@ -880,41 +880,6 @@ impl InstanceCommon {
880880
let workload = Workload::Reducer(ReducerContext::from(op.clone()));
881881
let tx = tx.unwrap_or_else(|| stdb.begin_mut_tx(IsolationLevel::Serializable, workload));
882882

883-
// Check the dedup index atomically within this transaction.
884-
// If the incoming msg_id is ≤ the last delivered msg_id for this sender,
885-
// the message is a duplicate; discard it and return the stored result.
886-
if let Some((sender_identity, sender_msg_id)) = dedup_sender {
887-
let stored = tx.get_inbound_msg_row(sender_identity);
888-
if stored.as_ref().is_some_and(|r| sender_msg_id <= r.last_outbound_msg) {
889-
let _ = stdb.rollback_mut_tx(tx);
890-
let stored = stored.unwrap();
891-
let outcome = match stored.result_status {
892-
s if s == st_inbound_msg_result_status::SUCCESS => ReducerOutcome::Committed,
893-
s if s == st_inbound_msg_result_status::REDUCER_ERROR => {
894-
ReducerOutcome::Failed(Box::new(stored.result_payload.into()))
895-
}
896-
_ => {
897-
log::warn!(
898-
"IDC: unexpected inbound dedup result_status={} for sender {}, msg_id={}",
899-
stored.result_status,
900-
sender_identity,
901-
sender_msg_id
902-
);
903-
ReducerOutcome::Committed
904-
}
905-
};
906-
return (
907-
ReducerCallResult {
908-
outcome,
909-
energy_used: EnergyQuanta::ZERO,
910-
execution_duration: Duration::ZERO,
911-
tx_offset: None,
912-
},
913-
false,
914-
);
915-
}
916-
}
917-
918883
let mut tx_slot = inst.tx_slot();
919884

920885
let vm_metrics = self.vm_metrics.get_for_reducer_id(reducer_id);
@@ -969,7 +934,7 @@ impl InstanceCommon {
969934
sender_identity,
970935
sender_msg_id,
971936
st_inbound_msg_result_status::SUCCESS,
972-
String::new(),
937+
return_value.clone().unwrap_or_default(),
973938
),
974939
None => Ok(()),
975940
};
@@ -1049,7 +1014,7 @@ impl InstanceCommon {
10491014
sender_identity,
10501015
sender_msg_id,
10511016
st_inbound_msg_result_status::REDUCER_ERROR,
1052-
err_msg,
1017+
err_msg.into(),
10531018
) {
10541019
log::error!("IDC: failed to record reducer error in dedup table for sender {sender_identity}: {e}");
10551020
let _ = stdb.rollback_mut_tx(dedup_tx);
@@ -1060,6 +1025,7 @@ impl InstanceCommon {
10601025

10611026
let res = ReducerCallResult {
10621027
outcome: ReducerOutcome::from(&event.status),
1028+
reducer_return_value: event.reducer_return_value.clone(),
10631029
energy_used: energy_quanta_used,
10641030
execution_duration: total_duration,
10651031
tx_offset: Some(tx_offset),

crates/datastore/src/locking_tx_datastore/mut_tx.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ use crate::{
3535
locking_tx_datastore::state_view::ScanOrIndex,
3636
traits::{InsertFlags, RowTypeForTable, TxData, UpdateFlags},
3737
};
38+
use bytes::Bytes;
3839
use core::{cell::RefCell, iter, mem, ops::RangeBounds};
3940
use itertools::Either;
4041
use smallvec::SmallVec;
@@ -2636,7 +2637,7 @@ impl MutTxId {
26362637
sender_identity: Identity,
26372638
last_outbound_msg: u64,
26382639
result_status: u8,
2639-
result_payload: String,
2640+
result_payload: Bytes,
26402641
) -> Result<()> {
26412642
// Delete the existing row if present.
26422643
self.delete_col_eq(

crates/datastore/src/system_tables.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
//! - Use [`st_fields_enum`] to define its column enum.
1212
//! - Register its schema in [`system_module_def`], making sure to call `validate_system_table` at the end of the function.
1313
14+
use bytes::Bytes;
1415
use spacetimedb_data_structures::map::{HashCollectionExt as _, HashMap};
1516
use spacetimedb_lib::db::auth::{StAccess, StTableType};
1617
use spacetimedb_lib::db::raw_def::v9::{btree, RawSql};
@@ -1936,8 +1937,10 @@ pub struct StInboundMsgRow {
19361937
pub last_outbound_msg: u64,
19371938
/// See [st_inbound_msg_result_status] for values.
19381939
pub result_status: u8,
1939-
/// Error message if result_status is REDUCER_ERROR, empty otherwise.
1940-
pub result_payload: String,
1940+
/// Reducer return payload encoded as raw bytes.
1941+
/// For `SUCCESS`, this stores the committed reducer return value.
1942+
/// For `REDUCER_ERROR`, this stores the error message bytes.
1943+
pub result_payload: Bytes,
19411944
}
19421945

19431946
impl TryFrom<RowRef<'_>> for StInboundMsgRow {

crates/lib/src/db/raw_def/v10.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -280,7 +280,7 @@ pub struct RawColumnDefaultValueV10 {
280280
///
281281
/// The `remote_reducer` is the name of the reducer to call on the target database.
282282
/// If `on_result_reducer` is set, that local reducer is called when delivery completes,
283-
/// with signature `fn on_result(ctx: &ReducerContext, request: OutboxRow, result: Result<(), String>)`.
283+
/// with signature `fn on_result(ctx: &ReducerContext, request: OutboxRow, result: Result<T, String>)`.
284284
#[derive(Debug, Clone, SpacetimeType)]
285285
#[sats(crate = crate)]
286286
#[cfg_attr(feature = "test", derive(PartialEq, Eq, PartialOrd, Ord))]

crates/schema/src/def.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -723,7 +723,7 @@ pub struct OutboxDef {
723723

724724
/// The local reducer called with the delivery result, if any.
725725
///
726-
/// Signature: `fn on_result(ctx: &ReducerContext, request: OutboxRow, result: Result<(), String>)`.
726+
/// Signature: `fn on_result(ctx: &ReducerContext, request: OutboxRow, result: Result<T, String>)`.
727727
pub on_result_reducer: Option<Identifier>,
728728
}
729729

0 commit comments

Comments
 (0)