Skip to content

Commit f036abd

Browse files
committed
hacky macro
1 parent 811266f commit f036abd

10 files changed

Lines changed: 346 additions & 12 deletions

File tree

crates/bindings-macro/src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,8 @@ mod sym {
132132
symbol!(update);
133133
symbol!(default);
134134
symbol!(event);
135+
symbol!(outbox);
136+
symbol!(on_result);
135137

136138
symbol!(u8);
137139
symbol!(i8);

crates/bindings-macro/src/table.rs

Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,16 @@ pub(crate) struct TableArgs {
2222
accessor: Ident,
2323
indices: Vec<IndexArg>,
2424
event: Option<Span>,
25+
outbox: Option<OutboxArg>,
26+
}
27+
28+
/// Parsed from `outbox(remote_reducer_fn)` optionally followed by `on_result(local_reducer_fn)`.
29+
struct OutboxArg {
30+
span: Span,
31+
/// Path to the remote-side reducer function (used only for its name via `FnInfo::NAME`).
32+
remote_reducer: Path,
33+
/// Path to the local `on_result` reducer, if any.
34+
on_result_reducer: Option<Path>,
2535
}
2636

2737
enum TableAccess {
@@ -82,6 +92,7 @@ impl TableArgs {
8292
let mut name: Option<LitStr> = None;
8393
let mut indices = Vec::new();
8494
let mut event = None;
95+
let mut outbox: Option<OutboxArg> = None;
8596
syn::meta::parser(|meta| {
8697
match_meta!(match meta {
8798
sym::public => {
@@ -149,6 +160,30 @@ If you're migrating from SpacetimeDB 1.*, replace `name = {sym}` with `accessor
149160
check_duplicate(&event, &meta)?;
150161
event = Some(meta.path.span());
151162
}
163+
sym::outbox => {
164+
check_duplicate_msg(&outbox, &meta, "already specified outbox")?;
165+
outbox = Some(OutboxArg::parse_meta(meta)?);
166+
}
167+
sym::on_result => {
168+
// `on_result` must be specified alongside `outbox`.
169+
// We parse it here and attach it to the outbox arg below.
170+
let span = meta.path.span();
171+
let on_result_path = OutboxArg::parse_single_path_meta(meta)?;
172+
match &mut outbox {
173+
Some(ob) => {
174+
if ob.on_result_reducer.is_some() {
175+
return Err(syn::Error::new(span, "already specified on_result"));
176+
}
177+
ob.on_result_reducer = Some(on_result_path);
178+
}
179+
None => {
180+
return Err(syn::Error::new(
181+
span,
182+
"on_result requires outbox to be specified first: `outbox(remote_reducer), on_result(local_reducer)`",
183+
))
184+
}
185+
}
186+
}
152187
});
153188
Ok(())
154189
})
@@ -188,6 +223,7 @@ If you're migrating from SpacetimeDB 1.*, replace `name = {name_str_value:?}` wi
188223
indices,
189224
name,
190225
event,
226+
outbox,
191227
})
192228
}
193229
}
@@ -231,6 +267,64 @@ impl ScheduledArg {
231267
}
232268
}
233269

270+
impl OutboxArg {
271+
/// Parse `outbox(remote_reducer_path)`.
272+
///
273+
/// `on_result` is parsed separately via `parse_single_path_meta` and attached afterwards.
274+
fn parse_meta(meta: ParseNestedMeta) -> syn::Result<Self> {
275+
let span = meta.path.span();
276+
let mut remote_reducer: Option<Path> = None;
277+
278+
meta.parse_nested_meta(|meta| {
279+
if meta.input.peek(syn::Token![=]) || meta.input.peek(syn::token::Paren) {
280+
Err(meta.error("outbox takes a single function path, e.g. `outbox(my_remote_reducer)`"))
281+
} else {
282+
check_duplicate_msg(
283+
&remote_reducer,
284+
&meta,
285+
"can only specify one remote reducer for outbox",
286+
)?;
287+
remote_reducer = Some(meta.path);
288+
Ok(())
289+
}
290+
})?;
291+
292+
let remote_reducer = remote_reducer.ok_or_else(|| {
293+
syn::Error::new(span, "outbox requires a remote reducer: `outbox(my_remote_reducer)`")
294+
})?;
295+
296+
Ok(Self {
297+
span,
298+
remote_reducer,
299+
on_result_reducer: None,
300+
})
301+
}
302+
303+
/// Parse `on_result(local_reducer_path)` and return the path.
304+
fn parse_single_path_meta(meta: ParseNestedMeta) -> syn::Result<Path> {
305+
let span = meta.path.span();
306+
let mut result: Option<Path> = None;
307+
308+
meta.parse_nested_meta(|meta| {
309+
if meta.input.peek(syn::Token![=]) || meta.input.peek(syn::token::Paren) {
310+
Err(meta.error("on_result takes a single function path, e.g. `on_result(my_local_reducer)`"))
311+
} else {
312+
check_duplicate_msg(
313+
&result,
314+
&meta,
315+
"can only specify one on_result reducer",
316+
)?;
317+
result = Some(meta.path);
318+
Ok(())
319+
}
320+
})?;
321+
322+
result.ok_or_else(|| {
323+
syn::Error::new(span, "on_result requires a local reducer: `on_result(my_local_reducer)`")
324+
})
325+
}
326+
}
327+
234328
impl IndexArg {
235329
fn parse_meta(meta: ParseNestedMeta) -> syn::Result<Self> {
236330
let mut accessor = None;

crates/bindings/src/rt.rs

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -533,6 +533,17 @@ impl<T: SpacetimeType> TableColumn for T {}
533533
/// Assert that the primary_key column of a scheduled table is a u64.
534534
pub const fn assert_scheduled_table_primary_key<T: ScheduledTablePrimaryKey>() {}
535535

536+
/// Verify at compile time that a function has the correct signature for an outbox `on_result` reducer.
537+
///
538+
/// The reducer must accept `(OutboxRow, Result<(), String>)` as its user-supplied arguments:
539+
/// `fn on_result(ctx: &ReducerContext, request: OutboxRow, result: Result<(), String>)`
540+
pub const fn outbox_typecheck<'de, OutboxRow>(_x: impl Reducer<'de, (OutboxRow, Result<(), String>)>)
541+
where
542+
OutboxRow: spacetimedb_lib::SpacetimeType + Serialize + Deserialize<'de>,
543+
{
544+
core::mem::forget(_x);
545+
}
546+
536547
mod sealed {
537548
pub trait Sealed {}
538549
}
@@ -763,6 +774,14 @@ pub fn register_table<T: Table>() {
763774

764775
table.finish();
765776

777+
if let Some(outbox) = T::OUTBOX {
778+
module.inner.add_outbox(
779+
T::TABLE_NAME,
780+
outbox.remote_reducer_name,
781+
outbox.on_result_reducer_name,
782+
);
783+
}
784+
766785
module.inner.add_explicit_names(T::explicit_names());
767786
})
768787
}

crates/bindings/src/table.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,7 @@ pub trait TableInternal: Sized {
134134
const SEQUENCES: &'static [u16];
135135
const SCHEDULE: Option<ScheduleDesc<'static>> = None;
136136
const IS_EVENT: bool = false;
137+
const OUTBOX: Option<OutboxDesc<'static>> = None;
137138

138139
/// Returns the ID of this table.
139140
fn table_id() -> TableId;
@@ -161,6 +162,14 @@ pub struct ScheduleDesc<'a> {
161162
pub scheduled_at_column: u16,
162163
}
163164

165+
/// Describes the outbox configuration of a table, for inter-database communication.
166+
pub struct OutboxDesc<'a> {
167+
/// The name of the remote reducer to invoke on the target database.
168+
pub remote_reducer_name: &'a str,
169+
/// The local reducer to call with the delivery result, if any.
170+
pub on_result_reducer_name: Option<&'a str>,
171+
}
172+
164173
#[derive(Debug, Clone)]
165174
pub struct ColumnDefault {
166175
pub col_id: u16,

crates/lib/src/db/raw_def/v10.rs

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,13 @@ pub enum RawModuleDefV10Section {
8989

9090
/// Names provided explicitly by the user that do not follow from the case conversion policy.
9191
ExplicitNames(ExplicitNames),
92+
93+
/// Outbox table definitions.
94+
///
95+
/// Each entry marks a table as an outbox table for inter-database communication,
96+
/// specifying the remote reducer to call and optionally a local callback reducer.
97+
/// New variant — old modules simply omit this section; old servers skip it.
98+
Outboxes(Vec<RawOutboxDefV10>),
9299
}
93100

94101
#[derive(Debug, Clone, Copy, Default, SpacetimeType)]
@@ -264,6 +271,31 @@ pub struct RawColumnDefaultValueV10 {
264271
pub value: Box<[u8]>,
265272
}
266273

274+
/// Marks a table as an outbox table for inter-database communication.
275+
///
276+
/// The table must have:
277+
/// - Col 0: `u64` with `#[primary_key] #[auto_inc]` — the row ID stored in `st_msg_id`.
278+
/// - Col 1: `Identity` (encoded as U256) — the target database identity.
279+
/// - Remaining cols: arguments forwarded verbatim to the remote reducer.
280+
///
281+
/// The `remote_reducer` is the name of the reducer to call on the target database.
282+
/// If `on_result_reducer` is set, that local reducer is called when delivery completes,
283+
/// with signature `fn on_result(ctx: &ReducerContext, request: OutboxRow, result: Result<(), String>)`.
284+
#[derive(Debug, Clone, SpacetimeType)]
285+
#[sats(crate = crate)]
286+
#[cfg_attr(feature = "test", derive(PartialEq, Eq, PartialOrd, Ord))]
287+
pub struct RawOutboxDefV10 {
288+
/// The `source_name` of the outbox table (as given in `accessor = ...`).
289+
pub table_name: RawIdentifier,
290+
291+
/// The name of the reducer to call on the target database.
292+
pub remote_reducer: RawIdentifier,
293+
294+
/// The name of the local reducer to call with the delivery result.
295+
/// If `None`, no callback is made after delivery.
296+
pub on_result_reducer: Option<RawIdentifier>,
297+
}
298+
267299
/// A reducer definition.
268300
#[derive(Debug, Clone, SpacetimeType)]
269301
#[sats(crate = crate)]
@@ -584,6 +616,14 @@ impl RawModuleDefV10 {
584616
.expect("Tables section must exist for tests")
585617
}
586618

619+
/// Get the outboxes section, if present.
620+
pub fn outboxes(&self) -> Option<&Vec<RawOutboxDefV10>> {
621+
self.sections.iter().find_map(|s| match s {
622+
RawModuleDefV10Section::Outboxes(outboxes) => Some(outboxes),
623+
_ => None,
624+
})
625+
}
626+
587627
// Get the row-level security section, if present.
588628
pub fn row_level_security(&self) -> Option<&Vec<RawRowLevelSecurityDefV10>> {
589629
self.sections.iter().find_map(|s| match s {
@@ -785,6 +825,24 @@ impl RawModuleDefV10Builder {
785825
}
786826
}
787827

828+
/// Get mutable access to the outboxes section, creating it if missing.
829+
fn outboxes_mut(&mut self) -> &mut Vec<RawOutboxDefV10> {
830+
let idx = self
831+
.module
832+
.sections
833+
.iter()
834+
.position(|s| matches!(s, RawModuleDefV10Section::Outboxes(_)))
835+
.unwrap_or_else(|| {
836+
self.module.sections.push(RawModuleDefV10Section::Outboxes(Vec::new()));
837+
self.module.sections.len() - 1
838+
});
839+
840+
match &mut self.module.sections[idx] {
841+
RawModuleDefV10Section::Outboxes(outboxes) => outboxes,
842+
_ => unreachable!("Just ensured Outboxes section exists"),
843+
}
844+
}
845+
788846
/// Get mutable access to the case conversion policy, creating it if missing.
789847
fn explicit_names_mut(&mut self) -> &mut ExplicitNames {
790848
let idx = self
@@ -1040,6 +1098,24 @@ impl RawModuleDefV10Builder {
10401098
});
10411099
}
10421100

1101+
/// Register an outbox table for inter-database communication.
1102+
///
1103+
/// `table_name` is the `source_name` of the table (i.e. `accessor =` value).
1104+
/// `remote_reducer` is the reducer to call on the target database.
1105+
/// `on_result_reducer` is an optional local reducer called with the delivery result.
1106+
pub fn add_outbox(
1107+
&mut self,
1108+
table_name: impl Into<RawIdentifier>,
1109+
remote_reducer: impl Into<RawIdentifier>,
1110+
on_result_reducer: Option<impl Into<RawIdentifier>>,
1111+
) {
1112+
self.outboxes_mut().push(RawOutboxDefV10 {
1113+
table_name: table_name.into(),
1114+
remote_reducer: remote_reducer.into(),
1115+
on_result_reducer: on_result_reducer.map(Into::into),
1116+
});
1117+
}
1118+
10431119
/// Add a row-level security policy to the module.
10441120
///
10451121
/// The `sql` expression should be a valid SQL expression that will be used to filter rows.

0 commit comments

Comments
 (0)