Skip to content

Commit 4d68dd6

Browse files
committed
fix macro
1 parent 5cd680b commit 4d68dd6

6 files changed

Lines changed: 91 additions & 51 deletions

File tree

crates/bindings-macro/src/table.rs

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1131,6 +1131,9 @@ pub(crate) fn table_impl(mut args: TableArgs, item: &syn::DeriveInput) -> syn::R
11311131
}
11321132
};
11331133

1134+
// Save a clone before the schedule closure captures `primary_key_column` by move.
1135+
let primary_key_column_for_outbox = primary_key_column.clone();
1136+
11341137
let (schedule, schedule_typecheck) = args
11351138
.scheduled
11361139
.as_ref()
@@ -1189,6 +1192,44 @@ pub(crate) fn table_impl(mut args: TableArgs, item: &syn::DeriveInput) -> syn::R
11891192
.unzip();
11901193
let schedule = schedule.into_iter();
11911194

1195+
let (outbox, outbox_typecheck) = args
1196+
.outbox
1197+
.as_ref()
1198+
.map(|ob| {
1199+
let primary_key_column = primary_key_column_for_outbox.ok_or_else(|| {
1200+
syn::Error::new(
1201+
ob.span,
1202+
"outbox tables must have a `#[primary_key] #[auto_inc]` u64 column at position 0 (the row_id)",
1203+
)
1204+
})?;
1205+
if primary_key_column.index != 0 {
1206+
return Err(syn::Error::new(
1207+
ob.span,
1208+
"outbox tables must have the `#[primary_key] #[auto_inc]` column as the first column (col 0)",
1209+
));
1210+
}
1211+
1212+
let remote_reducer = &ob.remote_reducer;
1213+
let on_result_reducer_name = match &ob.on_result_reducer {
1214+
Some(r) => quote!(Some(<#r as spacetimedb::rt::FnInfo>::NAME)),
1215+
None => quote!(None),
1216+
};
1217+
let desc = quote!(spacetimedb::table::OutboxDesc {
1218+
remote_reducer_name: <#remote_reducer as spacetimedb::rt::FnInfo>::NAME,
1219+
on_result_reducer_name: #on_result_reducer_name,
1220+
});
1221+
1222+
let primary_key_ty = primary_key_column.ty;
1223+
let typecheck = quote! {
1224+
spacetimedb::rt::assert_outbox_table_primary_key::<#primary_key_ty>();
1225+
};
1226+
1227+
Ok((desc, typecheck))
1228+
})
1229+
.transpose()?
1230+
.unzip();
1231+
let outbox = outbox.into_iter();
1232+
11921233
let unique_err = if !unique_columns.is_empty() {
11931234
quote!(spacetimedb::UniqueConstraintViolation)
11941235
} else {
@@ -1222,6 +1263,7 @@ pub(crate) fn table_impl(mut args: TableArgs, item: &syn::DeriveInput) -> syn::R
12221263
#(const PRIMARY_KEY: Option<u16> = Some(#primary_col_id);)*
12231264
const SEQUENCES: &'static [u16] = &[#(#sequence_col_ids),*];
12241265
#(const SCHEDULE: Option<spacetimedb::table::ScheduleDesc<'static>> = Some(#schedule);)*
1266+
#(const OUTBOX: Option<spacetimedb::table::OutboxDesc<'static>> = Some(#outbox);)*
12251267

12261268
#table_id_from_name_func
12271269
#default_fn
@@ -1373,6 +1415,7 @@ pub(crate) fn table_impl(mut args: TableArgs, item: &syn::DeriveInput) -> syn::R
13731415
const _: () = {
13741416
#(let _ = <#field_types as spacetimedb::rt::TableColumn>::_ITEM;)*
13751417
#schedule_typecheck
1418+
#outbox_typecheck
13761419
#default_type_check
13771420
};
13781421

crates/bindings/src/rt.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -533,6 +533,9 @@ impl<T: SpacetimeType> TableColumn for T {}
533533
/// Assert that the primary_key column of a scheduled table is a u64.
534534
pub const fn assert_scheduled_table_primary_key<T: ScheduledTablePrimaryKey>() {}
535535

536+
/// Assert that the primary_key column of an outbox table is a u64.
537+
pub const fn assert_outbox_table_primary_key<T: OutboxTablePrimaryKey>() {}
538+
536539
/// Verify at compile time that a function has the correct signature for an outbox `on_result` reducer.
537540
///
538541
/// The reducer must accept `(OutboxRow, Result<(), String>)` as its user-supplied arguments:
@@ -555,6 +558,13 @@ pub trait ScheduledTablePrimaryKey: sealed::Sealed {}
555558
impl sealed::Sealed for u64 {}
556559
impl ScheduledTablePrimaryKey for u64 {}
557560

561+
#[diagnostic::on_unimplemented(
562+
message = "outbox table primary key must be a `u64`",
563+
label = "should be `u64`, not `{Self}`"
564+
)]
565+
pub trait OutboxTablePrimaryKey: sealed::Sealed {}
566+
impl OutboxTablePrimaryKey for u64 {}
567+
558568
/// Used in the last type parameter of `Reducer` to indicate that the
559569
/// context argument *should* be passed to the reducer logic.
560570
pub struct ContextArg;

crates/core/src/host/idc_runtime.rs

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,11 @@ impl IdcRuntime {
7777
#[derive(Clone)]
7878
struct PendingMessage {
7979
msg_id: u64,
80+
/// Stored for future use (e.g. deleting the outbox row after delivery).
81+
#[allow(dead_code)]
8082
outbox_table_id: TableId,
83+
/// Stored for future use (e.g. deleting the outbox row after delivery).
84+
#[allow(dead_code)]
8185
row_id: u64,
8286
target_db_identity: Identity,
8387
target_reducer: String,
@@ -325,9 +329,19 @@ fn load_pending_into_targets(db: &RelationalDB, targets: &mut HashMap<Identity,
325329
}
326330
};
327331

328-
let table_name = schema.table_name.to_string();
329-
let target_reducer = table_name.strip_prefix("__outbox_").unwrap_or(&table_name).to_string();
330-
let on_result_reducer = schema.on_result_reducer.clone();
332+
let outbox_schema = match schema.outbox.as_ref() {
333+
Some(o) => o,
334+
None => {
335+
log::error!(
336+
"idc_runtime: table {:?} (msg_id={}) is not an outbox table",
337+
schema.table_name,
338+
st_row.msg_id,
339+
);
340+
continue;
341+
}
342+
};
343+
let target_reducer = outbox_schema.remote_reducer.to_string();
344+
let on_result_reducer = outbox_schema.on_result_reducer.as_ref().map(|id| id.to_string());
331345

332346
// Look up the outbox row by its auto-inc PK (col 0) to get target identity and args.
333347
let outbox_row = db

crates/datastore/src/locking_tx_datastore/mut_tx.rs

Lines changed: 0 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,6 @@ use smallvec::SmallVec;
4444
use spacetimedb_data_structures::map::{HashMap, HashSet, IntMap};
4545
use spacetimedb_durability::TxOffset;
4646
use spacetimedb_execution::{dml::MutDatastore, Datastore, DeltaStore, Row};
47-
use spacetimedb_lib::bsatn::ToBsatn;
4847
use spacetimedb_lib::{db::raw_def::v9::RawSql, metrics::ExecutionMetrics, Timestamp};
4948
use spacetimedb_lib::{
5049
db::{auth::StAccess, raw_def::SEQUENCE_ALLOCATION_STEP},
@@ -2777,40 +2776,6 @@ impl MutTxId {
27772776
})
27782777
}
27792778

2780-
/// Returns the table IDs and table names of all tables that had rows inserted
2781-
/// in this transaction and whose name starts with `"__outbox_"`.
2782-
///
2783-
/// Used by the IDC runtime to detect outbox inserts and enqueue ST_OUTBOUND_MSG entries.
2784-
pub fn outbox_insert_table_ids(&self) -> Vec<(TableId, String)> {
2785-
let mut result = Vec::new();
2786-
for table_id in self.tx_state.insert_tables.keys() {
2787-
if let Ok(schema) = self.schema_for_table(*table_id) {
2788-
let name = schema.table_name.to_string();
2789-
if name.starts_with("__outbox_") {
2790-
result.push((*table_id, name));
2791-
}
2792-
}
2793-
}
2794-
result
2795-
}
2796-
2797-
/// Returns the raw BSATN bytes of all rows inserted into `table_id` in this transaction.
2798-
///
2799-
/// Each `Vec<u8>` encodes a full outbox row. The first 32 bytes are the target
2800-
/// database identity (U256 little-endian), and the remaining bytes are the reducer args.
2801-
pub fn outbox_inserts_for_table(&mut self, table_id: TableId) -> Vec<Vec<u8>> {
2802-
let Some(inserted_table) = self.tx_state.insert_tables.get(&table_id) else {
2803-
return Vec::new();
2804-
};
2805-
// Collect blob-store-independent data: we hold a shared ref to insert_tables,
2806-
// so we can't also borrow committed_state mutably for the blob store.
2807-
// Use the blob store embedded in the TxState.
2808-
let blob_store = &self.tx_state.blob_store;
2809-
inserted_table
2810-
.scan_rows(blob_store)
2811-
.filter_map(|row_ref| row_ref.to_bsatn_vec().ok())
2812-
.collect()
2813-
}
28142779

28152780
pub fn insert_via_serialize_bsatn<'a, T: Serialize>(
28162781
&'a mut self,

crates/schema/src/schema.rs

Lines changed: 18 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -201,9 +201,8 @@ pub struct TableSchema {
201201
/// Whether this is an event table.
202202
pub is_event: bool,
203203

204-
/// For outbox tables (`__outbox_<reducer>`): the name of the local reducer to call
205-
/// with the result of the remote reducer invocation. `None` means no callback.
206-
pub on_result_reducer: Option<String>,
204+
/// Outbox configuration if this is an outbox table; `None` for non-outbox tables.
205+
pub outbox: Option<OutboxSchema>,
207206

208207
/// Cache for `row_type_for_table` in the data store.
209208
pub row_type: ProductType,
@@ -231,7 +230,7 @@ impl TableSchema {
231230
primary_key: Option<ColId>,
232231
is_event: bool,
233232
alias: Option<Identifier>,
234-
on_result_reducer: Option<String>,
233+
outbox: Option<OutboxSchema>,
235234
) -> Self {
236235
Self {
237236
row_type: columns_to_row_type(&columns),
@@ -248,7 +247,7 @@ impl TableSchema {
248247
primary_key,
249248
is_event,
250249
alias,
251-
on_result_reducer,
250+
outbox,
252251
}
253252
}
254253

@@ -1041,10 +1040,10 @@ impl Schema for TableSchema {
10411040
.as_ref()
10421041
.map(|schedule| ScheduleSchema::from_module_def(module_def, schedule, table_id, ScheduleId::SENTINEL));
10431042

1044-
let on_result_reducer = outbox
1045-
.as_ref()
1046-
.and_then(|o| o.on_result_reducer.as_ref())
1047-
.map(|r| r.to_string());
1043+
let outbox_schema = outbox.as_ref().map(|o| OutboxSchema {
1044+
remote_reducer: o.remote_reducer.clone(),
1045+
on_result_reducer: o.on_result_reducer.clone(),
1046+
});
10481047

10491048
TableSchema::new(
10501049
table_id,
@@ -1060,7 +1059,7 @@ impl Schema for TableSchema {
10601059
*primary_key,
10611060
*is_event,
10621061
Some(accessor_name.clone()),
1063-
on_result_reducer,
1062+
outbox_schema,
10641063
)
10651064
}
10661065

@@ -1443,6 +1442,15 @@ impl Schema for ScheduleSchema {
14431442
}
14441443
}
14451444

1445+
/// Marks a table as an outbox table for inter-database communication.
1446+
#[derive(Debug, Clone, PartialEq, Eq)]
1447+
pub struct OutboxSchema {
1448+
/// The name of the reducer to invoke on the target database.
1449+
pub remote_reducer: Identifier,
1450+
/// The local reducer called with the delivery result, if any.
1451+
pub on_result_reducer: Option<Identifier>,
1452+
}
1453+
14461454
/// A struct representing the schema of a database index.
14471455
#[derive(Debug, Clone, PartialEq, Eq)]
14481456
pub struct IndexSchema {

crates/table/src/table.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,7 @@ pub struct Table {
108108
is_scheduler: bool,
109109
/// Indicates whether this is an outbox table or not.
110110
///
111-
/// Outbox tables follow the naming convention `__outbox_<reducer>`.
111+
/// Set from `schema.outbox.is_some()`.
112112
/// This is an optimization to avoid checking the schema in e.g., `InstanceEnv::insert`.
113113
is_outbox: bool,
114114
}
@@ -549,7 +549,7 @@ impl Table {
549549
self.is_scheduler
550550
}
551551

552-
/// Returns whether this is an outbox table (`__outbox_<reducer>`).
552+
/// Returns whether this is an outbox table (i.e., `schema.outbox.is_some()`).
553553
pub fn is_outbox(&self) -> bool {
554554
self.is_outbox
555555
}
@@ -2265,7 +2265,7 @@ impl Table {
22652265
squashed_offset: SquashedOffset,
22662266
pointer_map: Option<PointerMap>,
22672267
) -> Self {
2268-
let is_outbox = schema.table_name.starts_with("__outbox_");
2268+
let is_outbox = schema.outbox.is_some();
22692269
Self {
22702270
inner: TableInner {
22712271
row_layout,

0 commit comments

Comments
 (0)