Skip to content

Commit 904471b

Browse files
committed
outbox macro change
1 parent a337bec commit 904471b

14 files changed

Lines changed: 314 additions & 60 deletions

File tree

crates/bindings-macro/src/table.rs

Lines changed: 54 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -25,15 +25,26 @@ pub(crate) struct TableArgs {
2525
outbox: Option<OutboxArg>,
2626
}
2727

28-
/// Parsed from `outbox(remote_reducer_fn)` optionally followed by `on_result(local_reducer_fn)`.
28+
/// Parsed from `outbox(remote_reducer_fn, on_result = local_reducer_fn)`.
29+
///
30+
/// For backwards compatibility, `on_result(local_reducer_fn)` is also accepted as a sibling
31+
/// table attribute and attached to the previously-declared outbox.
2932
struct OutboxArg {
3033
span: Span,
31-
/// Path to the remote-side reducer function (used only for its name via `FnInfo::NAME`).
34+
/// Path to the remote-side reducer function, used only for its final path segment.
3235
remote_reducer: Path,
3336
/// Path to the local `on_result` reducer, if any.
3437
on_result_reducer: Option<Path>,
3538
}
3639

40+
fn path_tail_name(path: &Path) -> LitStr {
41+
let segment = path
42+
.segments
43+
.last()
44+
.expect("syn::Path should always contain at least one segment");
45+
LitStr::new(&segment.ident.to_string(), segment.ident.span())
46+
}
47+
3748
enum TableAccess {
3849
Public(Span),
3950
Private(Span),
@@ -268,18 +279,44 @@ impl ScheduledArg {
268279
}
269280

270281
impl OutboxArg {
271-
/// Parse `outbox(remote_reducer_path)`.
282+
/// Parse `outbox(remote_reducer_path, on_result = local_reducer_path)`.
272283
///
273-
/// `on_result` is parsed separately via `parse_single_path_meta` and attached afterwards.
284+
/// For backwards compatibility, `on_result` may also be parsed separately via
285+
/// `parse_single_path_meta` and attached afterwards.
274286
fn parse_meta(meta: ParseNestedMeta) -> syn::Result<Self> {
275287
let span = meta.path.span();
276288
let mut remote_reducer: Option<Path> = None;
289+
let mut on_result_reducer: Option<Path> = None;
277290

278291
meta.parse_nested_meta(|meta| {
279-
if meta.input.peek(syn::Token![=]) || meta.input.peek(syn::token::Paren) {
280-
Err(meta.error("outbox takes a single function path, e.g. `outbox(my_remote_reducer)`"))
292+
if meta.input.peek(Token![=]) {
293+
if meta.path.is_ident("on_result") {
294+
check_duplicate_msg(
295+
&on_result_reducer,
296+
&meta,
297+
"can only specify one on_result reducer",
298+
)?;
299+
on_result_reducer = Some(meta.value()?.parse()?);
300+
Ok(())
301+
} else {
302+
Err(meta.error(
303+
"outbox only supports `on_result = my_local_reducer` as a named argument",
304+
))
305+
}
306+
} else if meta.input.peek(syn::token::Paren) {
307+
Err(meta.error(
308+
"outbox expects a remote reducer path and optional `on_result = my_local_reducer`, e.g. `outbox(my_remote_reducer, on_result = my_local_reducer)`",
309+
))
310+
} else if meta.path.is_ident("on_result") {
311+
Err(meta.error(
312+
"outbox `on_result` must use `=`, e.g. `outbox(my_remote_reducer, on_result = my_local_reducer)`",
313+
))
281314
} else {
282-
check_duplicate_msg(&remote_reducer, &meta, "can only specify one remote reducer for outbox")?;
315+
check_duplicate_msg(
316+
&remote_reducer,
317+
&meta,
318+
"can only specify one remote reducer for outbox",
319+
)?;
283320
remote_reducer = Some(meta.path);
284321
Ok(())
285322
}
@@ -291,7 +328,7 @@ impl OutboxArg {
291328
Ok(Self {
292329
span,
293330
remote_reducer,
294-
on_result_reducer: None,
331+
on_result_reducer,
295332
})
296333
}
297334

@@ -1209,19 +1246,24 @@ pub(crate) fn table_impl(mut args: TableArgs, item: &syn::DeriveInput) -> syn::R
12091246
));
12101247
}
12111248

1212-
let remote_reducer = &ob.remote_reducer;
1213-
let on_result_reducer_name = match &ob.on_result_reducer {
1214-
Some(r) => quote!(Some(<#r as spacetimedb::rt::FnInfo>::NAME)),
1249+
let remote_reducer_name = path_tail_name(&ob.remote_reducer);
1250+
let on_result_reducer_name = match ob.on_result_reducer.as_ref().map(path_tail_name) {
1251+
Some(r) => quote!(Some(#r)),
12151252
None => quote!(None),
12161253
};
12171254
let desc = quote!(spacetimedb::table::OutboxDesc {
1218-
remote_reducer_name: <#remote_reducer as spacetimedb::rt::FnInfo>::NAME,
1255+
remote_reducer_name: #remote_reducer_name,
12191256
on_result_reducer_name: #on_result_reducer_name,
12201257
});
12211258

12221259
let primary_key_ty = primary_key_column.ty;
1260+
let callback_typecheck = ob
1261+
.on_result_reducer
1262+
.as_ref()
1263+
.map(|r| quote!(spacetimedb::rt::outbox_typecheck::<#original_struct_ident>(#r);));
12231264
let typecheck = quote! {
12241265
spacetimedb::rt::assert_outbox_table_primary_key::<#primary_key_ty>();
1266+
#callback_typecheck
12251267
};
12261268

12271269
Ok((desc, typecheck))

crates/core/src/host/host_controller.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ use spacetimedb_schema::def::{ModuleDef, RawModuleDefVersion};
4444
use spacetimedb_table::page_pool::PagePool;
4545
use std::future::Future;
4646
use std::ops::Deref;
47+
use std::sync::atomic::{AtomicU16, Ordering};
4748
use std::sync::Arc;
4849
use std::time::Duration;
4950
use tokio::sync::{watch, OwnedRwLockReadGuard, OwnedRwLockWriteGuard, RwLock as AsyncRwLock};
@@ -118,6 +119,8 @@ pub struct HostController {
118119
db_cores: JobCores,
119120
/// The pool of buffers used to build `BsatnRowList`s in subscriptions.
120121
pub bsatn_rlb_pool: BsatnRowListBuilderPool,
122+
/// HTTP port used by local IDC delivery.
123+
idc_http_port: Arc<AtomicU16>,
121124
}
122125

123126
pub(crate) struct HostRuntimes {
@@ -233,6 +236,7 @@ impl HostController {
233236
page_pool: PagePool::new(default_config.page_pool_max_size),
234237
bsatn_rlb_pool: BsatnRowListBuilderPool::new(),
235238
db_cores,
239+
idc_http_port: Arc::new(AtomicU16::new(80)),
236240
}
237241
}
238242

@@ -241,6 +245,10 @@ impl HostController {
241245
self.program_storage = ps;
242246
}
243247

248+
pub fn set_idc_http_port(&self, port: u16) {
249+
self.idc_http_port.store(port, Ordering::Relaxed);
250+
}
251+
244252
/// Get a [`ModuleHost`] managed by this controller, or launch it from
245253
/// persistent state.
246254
///
@@ -1117,6 +1125,7 @@ impl Host {
11171125
replica_ctx.relational_db().clone(),
11181126
IdcActorConfig {
11191127
sender_identity: replica_ctx.database_identity,
1128+
http_port: host_controller.idc_http_port.load(Ordering::Relaxed),
11201129
},
11211130
module_host.downgrade(),
11221131
);

crates/core/src/host/idc_actor.rs

Lines changed: 43 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
/// 1. Loads undelivered entries from `st_outbound_msg` on startup, resolving delivery data from outbox tables.
55
/// 2. Accepts immediate notifications via an mpsc channel when new outbox rows are inserted.
66
/// 3. Delivers each message in msg_id order via HTTP POST to
7-
/// `http://localhost:80/v1/database/{target_db}/call-from-database/{reducer}?sender_identity=<hex>&msg_id=<n>`
7+
/// `http://localhost:{http_port}/v1/database/{target_db}/call-from-database/{reducer}?sender_identity=<hex>&msg_id=<n>`
88
/// 4. On transport errors (network, 5xx, 4xx except 422/402): retries infinitely with exponential
99
/// backoff, blocking only the affected target database (other targets continue unaffected).
1010
/// 5. On reducer errors (HTTP 422) or budget exceeded (HTTP 402): calls the configured
@@ -16,14 +16,13 @@ use crate::host::FunctionArgs;
1616
use spacetimedb_datastore::execution_context::Workload;
1717
use spacetimedb_datastore::system_tables::{StOutboundMsgRow, ST_OUTBOUND_MSG_ID};
1818
use spacetimedb_datastore::traits::IsolationLevel;
19-
use spacetimedb_lib::{AlgebraicValue, Identity};
19+
use spacetimedb_lib::{AlgebraicValue, Identity, ProductValue};
2020
use spacetimedb_primitives::{ColId, TableId};
2121
use std::collections::{HashMap, VecDeque};
2222
use std::sync::Arc;
2323
use std::time::{Duration, Instant};
2424
use tokio::sync::mpsc;
2525

26-
const IDC_HTTP_PORT: u16 = 80;
2726
const INITIAL_BACKOFF: Duration = Duration::from_millis(100);
2827
const MAX_BACKOFF: Duration = Duration::from_secs(30);
2928
/// How long to wait before polling again when there is no work.
@@ -38,6 +37,7 @@ pub type IdcActorSender = mpsc::UnboundedSender<()>;
3837
/// The identity of this (sender) database, set when the IDC actor is started.
3938
pub struct IdcActorConfig {
4039
pub sender_identity: Identity,
40+
pub http_port: u16,
4141
}
4242

4343
/// A handle that, when dropped, stops the IDC actor background task.
@@ -86,6 +86,7 @@ struct PendingMessage {
8686
target_db_identity: Identity,
8787
target_reducer: String,
8888
args_bsatn: Vec<u8>,
89+
request_row: ProductValue,
8990
/// From the outbox table's `TableSchema::on_result_reducer`.
9091
on_result_reducer: Option<String>,
9192
}
@@ -171,7 +172,7 @@ async fn run_idc_loop(
171172
log::warn!(
172173
"idc_actor: transport error delivering msg_id={} to {}: {reason}",
173174
msg.msg_id,
174-
hex::encode(msg.target_db_identity.to_byte_array()),
175+
msg.target_db_identity.to_hex(),
175176
);
176177
state.record_transport_error();
177178
// Do NOT pop the front — keep retrying this message for this target.
@@ -246,19 +247,23 @@ async fn finalize_message(
246247
return;
247248
};
248249

249-
// Encode (result_payload: String) as BSATN args.
250-
// The on_result reducer is expected to accept a single String argument.
251-
let args_bytes = match spacetimedb_sats::bsatn::to_vec(&result_payload) {
252-
Ok(b) => b,
253-
Err(e) => {
254-
log::error!(
255-
"idc_actor: failed to encode on_result args for msg_id={}: {e}",
256-
msg.msg_id
257-
);
258-
delete_message(db, msg.msg_id);
259-
return;
260-
}
250+
// Encode `(request_row: OutboxRow, result: Result<(), String>)` as BSATN args.
251+
let result = if result_payload.is_empty() {
252+
Ok::<(), String>(())
253+
} else {
254+
Err::<(), String>(result_payload)
261255
};
256+
let mut args_bytes = Vec::new();
257+
if let Err(e) = spacetimedb_sats::bsatn::to_writer(&mut args_bytes, &msg.request_row)
258+
.and_then(|_| spacetimedb_sats::bsatn::to_writer(&mut args_bytes, &result))
259+
{
260+
log::error!(
261+
"idc_actor: failed to encode on_result args for msg_id={}: {e}",
262+
msg.msg_id
263+
);
264+
delete_message(db, msg.msg_id);
265+
return;
266+
}
262267

263268
let caller_identity = Identity::ZERO; // system call
264269
let result = host
@@ -366,12 +371,24 @@ fn load_pending_into_targets(db: &RelationalDB, targets: &mut HashMap<Identity,
366371

367372
let pv = outbox_row_ref.to_product_value();
368373

369-
// Col 1: target_db_identity (Identity stored as U256).
370-
let target_db_identity = match pv.elements.get(1) {
371-
Some(AlgebraicValue::U256(u)) => Identity::from_u256(**u),
374+
// Col 1: target_db_identity stored as SATS `Identity`,
375+
// i.e. the product wrapper `(__identity__: U256)`.
376+
let target_db_identity: Identity = match pv.elements.get(1) {
377+
Some(AlgebraicValue::Product(identity_pv)) if identity_pv.elements.len() == 1 => {
378+
match &identity_pv.elements[0] {
379+
AlgebraicValue::U256(u) => Identity::from_u256(**u),
380+
other => {
381+
log::error!(
382+
"idc_actor: outbox row col 1 expected Identity inner U256, got {other:?} (msg_id={})",
383+
st_row.msg_id,
384+
);
385+
continue;
386+
}
387+
}
388+
}
372389
other => {
373390
log::error!(
374-
"idc_actor: outbox row col 1 expected U256 (Identity), got {other:?} (msg_id={})",
391+
"idc_actor: outbox row col 1 expected Identity wrapper, got {other:?} (msg_id={})",
375392
st_row.msg_id,
376393
);
377394
continue;
@@ -392,6 +409,7 @@ fn load_pending_into_targets(db: &RelationalDB, targets: &mut HashMap<Identity,
392409
target_db_identity,
393410
target_reducer,
394411
args_bsatn,
412+
request_row: pv,
395413
on_result_reducer,
396414
});
397415
}
@@ -412,17 +430,13 @@ fn load_pending_into_targets(db: &RelationalDB, targets: &mut HashMap<Identity,
412430
}
413431

414432
/// Attempt a single HTTP delivery of a message.
415-
async fn attempt_delivery(
416-
client: &reqwest::Client,
417-
config: &IdcActorConfig,
418-
msg: &PendingMessage,
419-
) -> DeliveryOutcome {
420-
let target_db_hex = hex::encode(msg.target_db_identity.to_byte_array());
421-
let sender_hex = hex::encode(config.sender_identity.to_byte_array());
433+
async fn attempt_delivery(client: &reqwest::Client, config: &IdcActorConfig, msg: &PendingMessage) -> DeliveryOutcome {
434+
let target_db_hex = msg.target_db_identity.to_hex();
435+
let sender_hex = config.sender_identity.to_hex();
422436

423437
let url = format!(
424-
"http://localhost:{IDC_HTTP_PORT}/v1/database/{target_db_hex}/call-from-database/{}?sender_identity={sender_hex}&msg_id={}",
425-
msg.target_reducer, msg.msg_id,
438+
"http://localhost:{}/v1/database/{target_db_hex}/call-from-database/{}?sender_identity={sender_hex}&msg_id={}",
439+
config.http_port, msg.target_reducer, msg.msg_id,
426440
);
427441

428442
let result = client

crates/core/src/host/v8/mod.rs

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1188,13 +1188,15 @@ async fn spawn_instance_worker(
11881188
send_worker_reply("call_reducer", reply_tx, res, trapped);
11891189
should_exit = trapped;
11901190
}
1191-
JsWorkerRequest::CallReducerDeleteOutboundOnSuccess { reply_tx, params, msg_id } => {
1192-
let (res, trapped) = instance_common.call_reducer_with_tx_and_success_action(
1193-
None,
1194-
params,
1195-
&mut inst,
1196-
|tx| tx.delete_outbound_msg(msg_id).map_err(anyhow::Error::from),
1197-
);
1191+
JsWorkerRequest::CallReducerDeleteOutboundOnSuccess {
1192+
reply_tx,
1193+
params,
1194+
msg_id,
1195+
} => {
1196+
let (res, trapped) =
1197+
instance_common.call_reducer_with_tx_and_success_action(None, params, &mut inst, |tx| {
1198+
tx.delete_outbound_msg(msg_id).map_err(anyhow::Error::from)
1199+
});
11981200
worker_trapped.store(trapped, Ordering::Relaxed);
11991201
send_worker_reply("call_reducer", reply_tx, res, trapped);
12001202
should_exit = trapped;

crates/core/src/host/wasm_common/module_host_actor.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -472,7 +472,11 @@ impl<T: WasmInstance> WasmModuleInstance<T> {
472472
res
473473
}
474474

475-
pub fn call_reducer_delete_outbound_on_success(&mut self, params: CallReducerParams, msg_id: u64) -> ReducerCallResult {
475+
pub fn call_reducer_delete_outbound_on_success(
476+
&mut self,
477+
params: CallReducerParams,
478+
msg_id: u64,
479+
) -> ReducerCallResult {
476480
let (res, trapped) = crate::callgrind_flag::invoke_allowing_callgrind(|| {
477481
self.common
478482
.call_reducer_with_tx_and_success_action(None, params, &mut self.instance, |tx| {

crates/datastore/src/locking_tx_datastore/committed_state.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,10 @@ use crate::{
1919
is_built_in_meta_row, system_tables, table_id_is_reserved, StColumnRow, StConstraintData, StConstraintRow,
2020
StFields, StIndexRow, StSequenceFields, StSequenceRow, StTableFields, StTableRow, StViewRow, SystemTable,
2121
ST_CLIENT_ID, ST_CLIENT_IDX, ST_COLUMN_ID, ST_COLUMN_IDX, ST_COLUMN_NAME, ST_CONSTRAINT_ID, ST_CONSTRAINT_IDX,
22-
ST_CONSTRAINT_NAME, ST_INBOUND_MSG_ID, ST_INDEX_ID, ST_INDEX_IDX, ST_INDEX_NAME, ST_MODULE_ID,
23-
ST_MODULE_IDX, ST_ROW_LEVEL_SECURITY_ID, ST_ROW_LEVEL_SECURITY_IDX, ST_SCHEDULED_ID, ST_SCHEDULED_IDX,
24-
ST_SEQUENCE_ID, ST_SEQUENCE_IDX, ST_SEQUENCE_NAME, ST_TABLE_ID, ST_TABLE_IDX, ST_VAR_ID, ST_VAR_IDX,
25-
ST_VIEW_ARG_ID, ST_VIEW_ARG_IDX,
22+
ST_CONSTRAINT_NAME, ST_INBOUND_MSG_ID, ST_INDEX_ID, ST_INDEX_IDX, ST_INDEX_NAME, ST_MODULE_ID, ST_MODULE_IDX,
23+
ST_ROW_LEVEL_SECURITY_ID, ST_ROW_LEVEL_SECURITY_IDX, ST_SCHEDULED_ID, ST_SCHEDULED_IDX, ST_SEQUENCE_ID,
24+
ST_SEQUENCE_IDX, ST_SEQUENCE_NAME, ST_TABLE_ID, ST_TABLE_IDX, ST_VAR_ID, ST_VAR_IDX, ST_VIEW_ARG_ID,
25+
ST_VIEW_ARG_IDX,
2626
},
2727
traits::{EphemeralTables, TxData},
2828
};

crates/datastore/src/locking_tx_datastore/mut_tx.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2697,7 +2697,6 @@ impl MutTxId {
26972697
})
26982698
}
26992699

2700-
27012700
pub fn insert_via_serialize_bsatn<'a, T: Serialize>(
27022701
&'a mut self,
27032702
table_id: TableId,

crates/datastore/src/system_tables.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -710,7 +710,10 @@ fn system_module_def() -> ModuleDef {
710710

711711
let st_outbound_msg_type = builder.add_type::<StOutboundMsgRow>();
712712
builder
713-
.build_table(ST_OUTBOUND_MSG_NAME, *st_outbound_msg_type.as_ref().expect("should be ref"))
713+
.build_table(
714+
ST_OUTBOUND_MSG_NAME,
715+
*st_outbound_msg_type.as_ref().expect("should be ref"),
716+
)
714717
.with_type(TableType::System)
715718
.with_auto_inc_primary_key(StOutboundMsgFields::MsgId)
716719
.with_index_no_accessor_name(btree(StOutboundMsgFields::MsgId));

0 commit comments

Comments
 (0)