Skip to content

Commit 811266f

Browse files
committed
fix st_msg_id syntax
1 parent 06bc9a9 commit 811266f

12 files changed

Lines changed: 180 additions & 90 deletions

File tree

crates/core/src/host/idc_runtime.rs

Lines changed: 127 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,23 @@
11
/// Inter-Database Communication (IDC) Runtime
22
///
33
/// Background tokio task that:
4-
/// 1. Loads Pending entries from `st_msg_id` on startup.
4+
/// 1. Loads undelivered entries from `st_msg_id` on startup, resolving delivery data from outbox tables.
55
/// 2. Accepts immediate notifications via an mpsc channel when new outbox rows are inserted.
66
/// 3. Delivers each message in msg_id order via HTTP POST to
77
/// `http://localhost:80/v1/database/{target_db}/call-from-database/{reducer}?sender_identity=<hex>&msg_id=<n>`
88
/// 4. On transport errors (network, 5xx, 4xx except 422/402): retries infinitely with exponential
99
/// backoff, blocking only the affected target database (other targets continue unaffected).
10-
/// 5. On reducer errors (HTTP 422) or budget exceeded (HTTP 402): records the result, marks DONE,
11-
/// and calls the configured `on_result_reducer` on the local database (if any).
10+
/// 5. On reducer errors (HTTP 422) or budget exceeded (HTTP 402): calls the configured
11+
/// `on_result_reducer` (read from the outbox table's schema) and deletes the st_msg_id row.
1212
/// 6. Enforces sequential delivery per target database: msg N+1 is only delivered after N is done.
1313
use crate::db::relational_db::RelationalDB;
1414
use crate::host::module_host::WeakModuleHost;
1515
use crate::host::FunctionArgs;
1616
use spacetimedb_datastore::execution_context::Workload;
1717
use spacetimedb_datastore::system_tables::{StMsgIdRow, ST_MSG_ID_ID};
1818
use spacetimedb_datastore::traits::IsolationLevel;
19-
use spacetimedb_lib::Identity;
19+
use spacetimedb_lib::{AlgebraicValue, Identity};
20+
use spacetimedb_primitives::{ColId, TableId};
2021
use std::collections::{HashMap, VecDeque};
2122
use std::sync::Arc;
2223
use std::time::{Duration, Instant};
@@ -77,9 +78,22 @@ impl IdcRuntime {
7778
}
7879
}
7980

81+
/// All data needed to deliver a single outbound message, resolved from the outbox table.
82+
#[derive(Clone)]
83+
struct PendingMessage {
84+
msg_id: u64,
85+
outbox_table_id: TableId,
86+
row_id: u64,
87+
target_db_identity: Identity,
88+
target_reducer: String,
89+
args_bsatn: Vec<u8>,
90+
/// From the outbox table's `TableSchema::on_result_reducer`.
91+
on_result_reducer: Option<String>,
92+
}
93+
8094
/// Per-target-database delivery state.
8195
struct TargetState {
82-
queue: VecDeque<StMsgIdRow>,
96+
queue: VecDeque<PendingMessage>,
8397
/// When `Some`, this target is in backoff and should not be retried until this instant.
8498
blocked_until: Option<Instant>,
8599
/// Current backoff duration for this target (doubles on each transport error).
@@ -149,16 +163,16 @@ async fn run_idc_loop(
149163
if !state.is_ready() {
150164
continue;
151165
}
152-
let Some(row) = state.queue.front().cloned() else {
166+
let Some(msg) = state.queue.front().cloned() else {
153167
continue;
154168
};
155-
let outcome = attempt_delivery(&client, &config, &row).await;
169+
let outcome = attempt_delivery(&client, &config, &msg).await;
156170
match outcome {
157171
DeliveryOutcome::TransportError(reason) => {
158172
log::warn!(
159173
"idc_runtime: transport error delivering msg_id={} to {}: {reason}",
160-
row.msg_id,
161-
hex::encode(Identity::from(row.target_db_identity).to_byte_array()),
174+
msg.msg_id,
175+
hex::encode(msg.target_db_identity.to_byte_array()),
162176
);
163177
state.record_transport_error();
164178
// Do NOT pop the front — keep retrying this message for this target.
@@ -168,7 +182,7 @@ async fn run_idc_loop(
168182
state.record_success();
169183
any_delivered = true;
170184
let (result_status, result_payload) = outcome_to_result(&outcome);
171-
finalize_message(&db, &module_host, &row, result_status, result_payload).await;
185+
finalize_message(&db, &module_host, &msg, result_status, result_payload).await;
172186
}
173187
}
174188
}
@@ -214,19 +228,19 @@ fn outcome_to_result(outcome: &DeliveryOutcome) -> (u8, String) {
214228
async fn finalize_message(
215229
db: &RelationalDB,
216230
module_host: &WeakModuleHost,
217-
row: &StMsgIdRow,
231+
msg: &PendingMessage,
218232
_result_status: u8,
219233
result_payload: String,
220234
) {
221235
// Call the on_result reducer if configured.
222-
if !row.on_result_reducer.is_empty() {
236+
if let Some(on_result_reducer) = &msg.on_result_reducer {
223237
let Some(host) = module_host.upgrade() else {
224238
log::warn!(
225239
"idc_runtime: module host gone, cannot call on_result reducer '{}' for msg_id={}",
226-
row.on_result_reducer,
227-
row.msg_id,
240+
on_result_reducer,
241+
msg.msg_id,
228242
);
229-
delete_message(db, row.msg_id);
243+
delete_message(db, msg.msg_id);
230244
return;
231245
};
232246

@@ -237,9 +251,9 @@ async fn finalize_message(
237251
Err(e) => {
238252
log::error!(
239253
"idc_runtime: failed to encode on_result args for msg_id={}: {e}",
240-
row.msg_id
254+
msg.msg_id
241255
);
242-
delete_message(db, row.msg_id);
256+
delete_message(db, msg.msg_id);
243257
return;
244258
}
245259
};
@@ -252,7 +266,7 @@ async fn finalize_message(
252266
None, // no client sender
253267
None, // no request_id
254268
None, // no timer
255-
&row.on_result_reducer,
269+
on_result_reducer,
256270
FunctionArgs::Bsatn(bytes::Bytes::from(args_bytes)),
257271
)
258272
.await;
@@ -261,50 +275,124 @@ async fn finalize_message(
261275
Ok(_) => {
262276
log::debug!(
263277
"idc_runtime: on_result reducer '{}' called for msg_id={}",
264-
row.on_result_reducer,
265-
row.msg_id,
278+
on_result_reducer,
279+
msg.msg_id,
266280
);
267281
}
268282
Err(e) => {
269283
log::error!(
270284
"idc_runtime: on_result reducer '{}' failed for msg_id={}: {e:?}",
271-
row.on_result_reducer,
272-
row.msg_id,
285+
on_result_reducer,
286+
msg.msg_id,
273287
);
274288
}
275289
}
276290
}
277291

278292
// Delete the row regardless of whether on_result succeeded or failed.
279-
delete_message(db, row.msg_id);
293+
delete_message(db, msg.msg_id);
280294
}
281295

282-
/// Load all messages from ST_MSG_ID into the per-target queues.
296+
/// Load all messages from ST_MSG_ID into the per-target queues, resolving delivery data
297+
/// from the corresponding outbox table rows.
283298
///
284-
/// A row's presence in the table means it has not yet been processed.
285-
/// Messages that are already in a target's queue (by msg_id) are not re-added.
299+
/// A row's presence in ST_MSG_ID means it has not yet been processed.
300+
/// Messages already in a target's queue (by msg_id) are not re-added.
286301
fn load_pending_into_targets(db: &RelationalDB, targets: &mut HashMap<Identity, TargetState>) {
287302
let tx = db.begin_tx(Workload::Internal);
288-
let rows: Vec<StMsgIdRow> = db
303+
304+
let st_msg_id_rows: Vec<StMsgIdRow> = db
289305
.iter(&tx, ST_MSG_ID_ID)
290306
.map(|iter| iter.filter_map(|row_ref| StMsgIdRow::try_from(row_ref).ok()).collect())
291307
.unwrap_or_else(|e| {
292-
log::error!("idc_runtime: failed to read pending messages: {e}");
308+
log::error!("idc_runtime: failed to read st_msg_id: {e}");
293309
Vec::new()
294310
});
311+
312+
let mut pending: Vec<PendingMessage> = Vec::with_capacity(st_msg_id_rows.len());
313+
314+
for st_row in st_msg_id_rows {
315+
let outbox_table_id = TableId(st_row.outbox_table_id);
316+
317+
// Read the outbox table schema for reducer name and on_result_reducer.
318+
let schema = match db.schema_for_table(&tx, outbox_table_id) {
319+
Ok(s) => s,
320+
Err(e) => {
321+
log::error!(
322+
"idc_runtime: cannot find schema for outbox table {:?} (msg_id={}): {e}",
323+
outbox_table_id,
324+
st_row.msg_id,
325+
);
326+
continue;
327+
}
328+
};
329+
330+
let table_name = schema.table_name.to_string();
331+
let target_reducer = table_name
332+
.strip_prefix("__outbox_")
333+
.unwrap_or(&table_name)
334+
.to_string();
335+
let on_result_reducer = schema.on_result_reducer.clone();
336+
337+
// Look up the outbox row by its auto-inc PK (col 0) to get target identity and args.
338+
let outbox_row = db
339+
.iter_by_col_eq(&tx, outbox_table_id, ColId(0), &AlgebraicValue::U64(st_row.row_id))
340+
.ok()
341+
.and_then(|mut iter| iter.next());
342+
343+
let Some(outbox_row_ref) = outbox_row else {
344+
log::error!(
345+
"idc_runtime: outbox row not found in table {:?} for row_id={} (msg_id={})",
346+
outbox_table_id,
347+
st_row.row_id,
348+
st_row.msg_id,
349+
);
350+
continue;
351+
};
352+
353+
let pv = outbox_row_ref.to_product_value();
354+
355+
// Col 1: target_db_identity (Identity stored as U256).
356+
let target_db_identity = match pv.elements.get(1) {
357+
Some(AlgebraicValue::U256(u)) => Identity::from_u256(**u),
358+
other => {
359+
log::error!(
360+
"idc_runtime: outbox row col 1 expected U256 (Identity), got {other:?} (msg_id={})",
361+
st_row.msg_id,
362+
);
363+
continue;
364+
}
365+
};
366+
367+
// Cols 2+: args for the remote reducer.
368+
let args_bsatn = pv.elements[2..].iter().fold(Vec::new(), |mut acc, elem| {
369+
spacetimedb_sats::bsatn::to_writer(&mut acc, elem)
370+
.expect("writing outbox row args to BSATN should never fail");
371+
acc
372+
});
373+
374+
pending.push(PendingMessage {
375+
msg_id: st_row.msg_id,
376+
outbox_table_id,
377+
row_id: st_row.row_id,
378+
target_db_identity,
379+
target_reducer,
380+
args_bsatn,
381+
on_result_reducer,
382+
});
383+
}
384+
295385
drop(tx);
296386

297387
// Sort by msg_id ascending so delivery order is preserved.
298-
let mut sorted = rows;
299-
sorted.sort_by_key(|r| r.msg_id);
388+
pending.sort_by_key(|m| m.msg_id);
300389

301-
for row in sorted {
302-
let target_id = Identity::from(row.target_db_identity);
303-
let state = targets.entry(target_id).or_insert_with(TargetState::new);
390+
for msg in pending {
391+
let state = targets.entry(msg.target_db_identity).or_insert_with(TargetState::new);
304392
// Only add if not already in the queue (avoid duplicates after reload).
305-
let already_queued = state.queue.iter().any(|r| r.msg_id == row.msg_id);
393+
let already_queued = state.queue.iter().any(|m| m.msg_id == msg.msg_id);
306394
if !already_queued {
307-
state.queue.push_back(row);
395+
state.queue.push_back(msg);
308396
}
309397
}
310398
}
@@ -313,21 +401,20 @@ fn load_pending_into_targets(db: &RelationalDB, targets: &mut HashMap<Identity,
313401
async fn attempt_delivery(
314402
client: &reqwest::Client,
315403
config: &IdcRuntimeConfig,
316-
row: &StMsgIdRow,
404+
msg: &PendingMessage,
317405
) -> DeliveryOutcome {
318-
let target_identity = Identity::from(row.target_db_identity);
319-
let target_db_hex = hex::encode(target_identity.to_byte_array());
406+
let target_db_hex = hex::encode(msg.target_db_identity.to_byte_array());
320407
let sender_hex = hex::encode(config.sender_identity.to_byte_array());
321408

322409
let url = format!(
323410
"http://localhost:{IDC_HTTP_PORT}/v1/database/{target_db_hex}/call-from-database/{}?sender_identity={sender_hex}&msg_id={}",
324-
row.target_reducer, row.msg_id,
411+
msg.target_reducer, msg.msg_id,
325412
);
326413

327414
let result = client
328415
.post(&url)
329416
.header("Content-Type", "application/octet-stream")
330-
.body(row.args_bsatn.clone())
417+
.body(msg.args_bsatn.clone())
331418
.send()
332419
.await;
333420

crates/core/src/host/instance_env.rs

Lines changed: 14 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -435,10 +435,13 @@ impl InstanceEnv {
435435
/// Enqueue an outbox row into ST_MSG_ID atomically within the current transaction,
436436
/// and notify the IDC runtime so it delivers without waiting for the next poll cycle.
437437
///
438-
/// Outbox tables follow the naming convention `__outbox_<reducer>`.
439-
/// The first column (col 0) must hold the target database Identity (BSATN-encoded as U256).
440-
/// The remaining bytes of the row's BSATN encoding are passed as `args_bsatn` to the
441-
/// remote reducer.
438+
/// Outbox tables follow the naming convention `__outbox_<reducer>` and have:
439+
/// - Col 0: auto-inc primary key (u64) — stored as `row_id` in ST_MSG_ID.
440+
/// - Col 1: target database Identity (stored as U256).
441+
/// - Remaining cols: args for the remote reducer.
442+
///
443+
/// The `on_result_reducer` and delivery data are resolved at delivery time from the
444+
/// outbox table's schema and row, so ST_MSG_ID only stores the minimal reference.
442445
#[cold]
443446
#[inline(never)]
444447
fn enqueue_outbox_row(
@@ -450,34 +453,22 @@ impl InstanceEnv {
450453
) -> Result<(), NodesError> {
451454
use spacetimedb_datastore::locking_tx_datastore::state_view::StateView as _;
452455

453-
// Get table name to extract reducer name.
454-
let schema = tx.schema_for_table(table_id).map_err(DBError::from)?;
455-
let table_name = schema.table_name.to_string();
456-
let reducer_name = table_name.strip_prefix("__outbox_").unwrap_or(&table_name).to_string();
457-
458456
let row_ref = tx.get(table_id, row_ptr).map_err(DBError::from)?.unwrap();
459-
460457
let pv = row_ref.to_product_value();
461458

462-
// Col 0 must be the target database Identity, stored as AlgebraicValue::U256.
463-
let target_db_identity = match pv.elements.first() {
464-
Some(AlgebraicValue::U256(u)) => Identity::from_u256(**u),
459+
// Col 0 is the auto-inc primary key — this is the row_id we store in ST_MSG_ID.
460+
let row_id = match pv.elements.first() {
461+
Some(AlgebraicValue::U64(id)) => *id,
465462
other => {
463+
let schema = tx.schema_for_table(table_id).map_err(DBError::from)?;
466464
return Err(NodesError::Internal(Box::new(DBError::Other(anyhow::anyhow!(
467-
"outbox table {table_name}: expected col 0 to be U256 (Identity), got {other:?}"
465+
"outbox table {}: expected col 0 to be U64 (auto-inc PK), got {other:?}",
466+
schema.table_name
468467
)))));
469468
}
470469
};
471470

472-
// Remaining elements are the args for the remote reducer.
473-
let args_bsatn = pv.elements[1..].iter().fold(Vec::new(), |mut acc, elem| {
474-
bsatn::to_writer(&mut acc, elem).expect("writing outbox row args to BSATN should never fail");
475-
acc
476-
});
477-
478-
// TODO: populate on_result_reducer from TableDef once that field is added.
479-
tx.insert_st_msg_id(table_id.0, target_db_identity, reducer_name, args_bsatn, String::new())
480-
.map_err(DBError::from)?;
471+
tx.insert_st_msg_id(table_id.0, row_id).map_err(DBError::from)?;
481472

482473
// Wake the IDC runtime immediately so it doesn't wait for the next poll cycle.
483474
let _ = self.idc_sender.send(());

crates/core/src/sql/execute.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -340,6 +340,7 @@ pub(crate) mod tests {
340340
None,
341341
false,
342342
None,
343+
None,
343344
),
344345
)?;
345346
let schema = db.schema_for_table_mut(tx, table_id)?;

crates/datastore/src/locking_tx_datastore/datastore.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1627,6 +1627,7 @@ mod tests {
16271627
pk,
16281628
false,
16291629
None,
1630+
None,
16301631
)
16311632
}
16321633

0 commit comments

Comments
 (0)