Skip to content

Commit 5cd680b

Browse files
committed
system table naming
1 parent 23adfc2 commit 5cd680b

11 files changed

Lines changed: 135 additions & 130 deletions

File tree

crates/client-api/src/routes/database.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -230,8 +230,8 @@ pub struct CallFromDatabaseParams {
230230
pub struct CallFromDatabaseQuery {
231231
/// Hex-encoded [`Identity`] of the sending database.
232232
sender_identity: String,
233-
/// The inter-database message ID from the sender's st_msg_id.
234-
/// Used for at-most-once delivery via `st_inbound_msg_id`.
233+
/// The inter-database message ID from the sender's st_outbound_msg.
234+
/// Used for at-most-once delivery via `st_inbound_msg`.
235235
msg_id: u64,
236236
}
237237

@@ -241,9 +241,9 @@ pub struct CallFromDatabaseQuery {
241241
///
242242
/// Required query params:
243243
/// - `sender_identity` — hex-encoded identity of the sending database.
244-
/// - `msg_id` — the inter-database message ID from the sender's st_msg_id.
244+
/// - `msg_id` — the inter-database message ID from the sender's st_outbound_msg.
245245
///
246-
/// Before invoking the reducer, the receiver checks `st_inbound_msg_id`.
246+
/// Before invoking the reducer, the receiver checks `st_inbound_msg`.
247247
/// If the incoming `msg_id` is ≤ the last delivered msg_id for `sender_identity`,
248248
/// the call is a duplicate and 200 OK is returned immediately without running the reducer.
249249
/// Otherwise the reducer is invoked, the dedup index is updated atomically in the same

crates/core/src/host/host_controller.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -889,7 +889,7 @@ struct Host {
889889
/// Handle to the task responsible for cleaning up old views.
890890
/// The task is aborted when [`Host`] is dropped.
891891
view_cleanup_task: AbortHandle,
892-
/// IDC runtime: delivers outbound inter-database messages from `st_msg_id`.
892+
/// IDC runtime: delivers outbound inter-database messages from `st_outbound_msg`.
893893
/// Stopped when [`Host`] is dropped.
894894
_idc_runtime: IdcRuntime,
895895
}

crates/core/src/host/idc_runtime.rs

Lines changed: 21 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,20 @@
11
/// Inter-Database Communication (IDC) Runtime
22
///
33
/// Background tokio task that:
4-
/// 1. Loads undelivered entries from `st_msg_id` on startup, resolving delivery data from outbox tables.
4+
/// 1. Loads undelivered entries from `st_outbound_msg` on startup, resolving delivery data from outbox tables.
55
/// 2. Accepts immediate notifications via an mpsc channel when new outbox rows are inserted.
66
/// 3. Delivers each message in msg_id order via HTTP POST to
77
/// `http://localhost:80/v1/database/{target_db}/call-from-database/{reducer}?sender_identity=<hex>&msg_id=<n>`
88
/// 4. On transport errors (network, 5xx, 4xx except 422/402): retries infinitely with exponential
99
/// backoff, blocking only the affected target database (other targets continue unaffected).
1010
/// 5. On reducer errors (HTTP 422) or budget exceeded (HTTP 402): calls the configured
11-
/// `on_result_reducer` (read from the outbox table's schema) and deletes the st_msg_id row.
11+
/// `on_result_reducer` (read from the outbox table's schema) and deletes the st_outbound_msg row.
1212
/// 6. Enforces sequential delivery per target database: msg N+1 is only delivered after N is done.
1313
use crate::db::relational_db::RelationalDB;
1414
use crate::host::module_host::WeakModuleHost;
1515
use crate::host::FunctionArgs;
1616
use spacetimedb_datastore::execution_context::Workload;
17-
use spacetimedb_datastore::system_tables::{StMsgIdRow, ST_MSG_ID_ID};
17+
use spacetimedb_datastore::system_tables::{StOutboundMsgRow, ST_OUTBOUND_MSG_ID};
1818
use spacetimedb_datastore::traits::IsolationLevel;
1919
use spacetimedb_lib::{AlgebraicValue, Identity};
2020
use spacetimedb_primitives::{ColId, TableId};
@@ -207,19 +207,19 @@ async fn run_idc_loop(
207207

208208
/// Decode the delivery outcome into `(result_status, result_payload)` for recording.
209209
fn outcome_to_result(outcome: &DeliveryOutcome) -> (u8, String) {
210-
use spacetimedb_datastore::system_tables::st_inbound_msg_id_result_status;
210+
use spacetimedb_datastore::system_tables::st_inbound_msg_result_status;
211211
match outcome {
212-
DeliveryOutcome::Success => (st_inbound_msg_id_result_status::SUCCESS, String::new()),
213-
DeliveryOutcome::ReducerError(msg) => (st_inbound_msg_id_result_status::REDUCER_ERROR, msg.clone()),
212+
DeliveryOutcome::Success => (st_inbound_msg_result_status::SUCCESS, String::new()),
213+
DeliveryOutcome::ReducerError(msg) => (st_inbound_msg_result_status::REDUCER_ERROR, msg.clone()),
214214
DeliveryOutcome::BudgetExceeded => (
215-
st_inbound_msg_id_result_status::REDUCER_ERROR,
215+
st_inbound_msg_result_status::REDUCER_ERROR,
216216
"budget exceeded".to_string(),
217217
),
218218
DeliveryOutcome::TransportError(_) => unreachable!("transport errors never finalize"),
219219
}
220220
}
221221

222-
/// Finalize a delivered message: call the on_result reducer (if any), then delete from ST_MSG_ID.
222+
/// Finalize a delivered message: call the on_result reducer (if any), then delete from ST_OUTBOUND_MSG.
223223
async fn finalize_message(
224224
db: &RelationalDB,
225225
module_host: &WeakModuleHost,
@@ -288,25 +288,28 @@ async fn finalize_message(
288288
delete_message(db, msg.msg_id);
289289
}
290290

291-
/// Load all messages from ST_MSG_ID into the per-target queues, resolving delivery data
291+
/// Load all messages from ST_OUTBOUND_MSG into the per-target queues, resolving delivery data
292292
/// from the corresponding outbox table rows.
293293
///
294-
/// A row's presence in ST_MSG_ID means it has not yet been processed.
294+
/// A row's presence in ST_OUTBOUND_MSG means it has not yet been processed.
295295
/// Messages already in a target's queue (by msg_id) are not re-added.
296296
fn load_pending_into_targets(db: &RelationalDB, targets: &mut HashMap<Identity, TargetState>) {
297297
let tx = db.begin_tx(Workload::Internal);
298298

299-
let st_msg_id_rows: Vec<StMsgIdRow> = db
300-
.iter(&tx, ST_MSG_ID_ID)
301-
.map(|iter| iter.filter_map(|row_ref| StMsgIdRow::try_from(row_ref).ok()).collect())
299+
let st_outbound_msg_rows: Vec<StOutboundMsgRow> = db
300+
.iter(&tx, ST_OUTBOUND_MSG_ID)
301+
.map(|iter| {
302+
iter.filter_map(|row_ref| StOutboundMsgRow::try_from(row_ref).ok())
303+
.collect()
304+
})
302305
.unwrap_or_else(|e| {
303-
log::error!("idc_runtime: failed to read st_msg_id: {e}");
306+
log::error!("idc_runtime: failed to read st_outbound_msg: {e}");
304307
Vec::new()
305308
});
306309

307-
let mut pending: Vec<PendingMessage> = Vec::with_capacity(st_msg_id_rows.len());
310+
let mut pending: Vec<PendingMessage> = Vec::with_capacity(st_outbound_msg_rows.len());
308311

309-
for st_row in st_msg_id_rows {
312+
for st_row in st_outbound_msg_rows {
310313
let outbox_table_id = TableId(st_row.outbox_table_id);
311314

312315
// Read the outbox table schema for reducer name and on_result_reducer.
@@ -432,10 +435,10 @@ async fn attempt_delivery(
432435
}
433436
}
434437

435-
/// Delete a message from ST_MSG_ID within a new transaction.
438+
/// Delete a message from ST_OUTBOUND_MSG within a new transaction.
436439
fn delete_message(db: &RelationalDB, msg_id: u64) {
437440
let mut tx = db.begin_mut_tx(IsolationLevel::Serializable, Workload::Internal);
438-
if let Err(e) = tx.delete_msg_id(msg_id) {
441+
if let Err(e) = tx.delete_outbound_msg(msg_id) {
439442
log::error!("idc_runtime: failed to delete msg_id={msg_id}: {e}");
440443
let _ = db.rollback_mut_tx(tx);
441444
return;

crates/core/src/host/instance_env.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -432,16 +432,16 @@ impl InstanceEnv {
432432
Ok(())
433433
}
434434

435-
/// Enqueue an outbox row into ST_MSG_ID atomically within the current transaction,
435+
/// Enqueue an outbox row into ST_OUTBOUND_MSG atomically within the current transaction,
436436
/// and notify the IDC runtime so it delivers without waiting for the next poll cycle.
437437
///
438438
/// Outbox tables follow the naming convention `__outbox_<reducer>` and have:
439-
/// - Col 0: auto-inc primary key (u64) — stored as `row_id` in ST_MSG_ID.
439+
/// - Col 0: auto-inc primary key (u64) — stored as `row_id` in ST_OUTBOUND_MSG.
440440
/// - Col 1: target database Identity (stored as U256).
441441
/// - Remaining cols: args for the remote reducer.
442442
///
443443
/// The `on_result_reducer` and delivery data are resolved at delivery time from the
444-
/// outbox table's schema and row, so ST_MSG_ID only stores the minimal reference.
444+
/// outbox table's schema and row, so ST_OUTBOUND_MSG only stores the minimal reference.
445445
fn enqueue_outbox_row(
446446
&self,
447447
_stdb: &RelationalDB,
@@ -454,7 +454,7 @@ impl InstanceEnv {
454454
let row_ref = tx.get(table_id, row_ptr).map_err(DBError::from)?.unwrap();
455455
let pv = row_ref.to_product_value();
456456

457-
// Col 0 is the auto-inc primary key — this is the row_id we store in ST_MSG_ID.
457+
// Col 0 is the auto-inc primary key — this is the row_id we store in ST_OUTBOUND_MSG.
458458
let row_id = match pv.elements.first() {
459459
Some(AlgebraicValue::U64(id)) => *id,
460460
other => {
@@ -466,7 +466,7 @@ impl InstanceEnv {
466466
}
467467
};
468468

469-
tx.insert_st_msg_id(table_id.0, row_id).map_err(DBError::from)?;
469+
tx.insert_st_outbound_msg(table_id.0, row_id).map_err(DBError::from)?;
470470

471471
// Wake the IDC runtime immediately so it doesn't wait for the next poll cycle.
472472
let _ = self.idc_sender.send(());

crates/core/src/host/module_host.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -627,7 +627,7 @@ pub struct CallReducerParams {
627627
/// If set, enables at-most-once delivery semantics for database-to-database calls.
628628
///
629629
/// The tuple is `(sender_database_identity, sender_msg_id)`.
630-
/// Before running the reducer, `st_inbound_msg_id` is consulted:
630+
/// Before running the reducer, `st_inbound_msg` is consulted:
631631
/// if `sender_msg_id` ≤ the stored last-delivered msg_id for the sender,
632632
/// the call is a duplicate and returns [`ReducerCallError::Deduplicated`].
633633
/// Otherwise the reducer runs, and the stored msg_id is updated atomically
@@ -1670,7 +1670,7 @@ impl ModuleHost {
16701670
/// Variant of [`Self::call_reducer`] for database-to-database calls.
16711671
///
16721672
/// Behaves identically to `call_reducer`, except that it enforces at-most-once
1673-
/// delivery using the `st_inbound_msg_id` dedup index.
1673+
/// delivery using the `st_inbound_msg` dedup index.
16741674
/// Before invoking the reducer, the receiver checks whether
16751675
/// `sender_msg_id` ≤ the last delivered msg_id for `sender_database_identity`.
16761676
/// If so, the call is a duplicate and [`ReducerOutcome::Deduplicated`] is returned

crates/core/src/host/wasm_common/module_host_actor.rs

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ use spacetimedb_datastore::db_metrics::DB_METRICS;
3737
use spacetimedb_datastore::error::{DatastoreError, ViewError};
3838
use spacetimedb_datastore::execution_context::{self, ReducerContext, Workload};
3939
use spacetimedb_datastore::locking_tx_datastore::{FuncCallType, MutTxId, ViewCallInfo};
40-
use spacetimedb_datastore::system_tables::st_inbound_msg_id_result_status;
40+
use spacetimedb_datastore::system_tables::st_inbound_msg_result_status;
4141
use spacetimedb_datastore::traits::{IsolationLevel, Program};
4242
use spacetimedb_execution::pipelined::PipelinedProject;
4343
use spacetimedb_lib::buffer::DecodeError;
@@ -855,13 +855,13 @@ impl InstanceCommon {
855855
// If the incoming msg_id is ≤ the last delivered msg_id for this sender,
856856
// the message is a duplicate; discard it and return the stored result.
857857
if let Some((sender_identity, sender_msg_id)) = dedup_sender {
858-
let stored = tx.get_inbound_msg_id_row(sender_identity);
859-
if stored.as_ref().is_some_and(|r| sender_msg_id <= r.last_msg_id) {
858+
let stored = tx.get_inbound_msg_row(sender_identity);
859+
if stored.as_ref().is_some_and(|r| sender_msg_id <= r.last_outbound_msg) {
860860
let _ = stdb.rollback_mut_tx(tx);
861861
let stored = stored.unwrap();
862862
let outcome = match stored.result_status {
863-
s if s == st_inbound_msg_id_result_status::SUCCESS => ReducerOutcome::Committed,
864-
s if s == st_inbound_msg_id_result_status::REDUCER_ERROR => {
863+
s if s == st_inbound_msg_result_status::SUCCESS => ReducerOutcome::Committed,
864+
s if s == st_inbound_msg_result_status::REDUCER_ERROR => {
865865
ReducerOutcome::Failed(Box::new(stored.result_payload.into()))
866866
}
867867
_ => ReducerOutcome::Deduplicated,
@@ -927,10 +927,10 @@ impl InstanceCommon {
927927
// Atomically update the dedup index so this sender's tx_offset is recorded
928928
// as delivered. This prevents re-processing if the message is retried.
929929
let dedup_res = match dedup_sender {
930-
Some((sender_identity, sender_msg_id)) => tx.upsert_inbound_last_msg_id(
930+
Some((sender_identity, sender_msg_id)) => tx.upsert_inbound_last_msg(
931931
sender_identity,
932932
sender_msg_id,
933-
st_inbound_msg_id_result_status::SUCCESS,
933+
st_inbound_msg_result_status::SUCCESS,
934934
String::new(),
935935
),
936936
None => Ok(()),
@@ -996,16 +996,16 @@ impl InstanceCommon {
996996
let event = commit_and_broadcast_event(&info.subscriptions, client, event, out.tx).event;
997997

998998
// For IDC (database-to-database) calls: if the reducer failed with a user error,
999-
// record the failure in st_inbound_msg_id in a separate tx (since the reducer tx
999+
// record the failure in st_inbound_msg in a separate tx (since the reducer tx
10001000
// was rolled back). This allows the sending database to receive the error on dedup
10011001
// rather than re-running the reducer.
10021002
if let (Some((sender_identity, sender_msg_id)), EventStatus::FailedUser(err)) = (dedup_sender, &event.status) {
10031003
let err_msg = err.clone();
10041004
let mut dedup_tx = stdb.begin_mut_tx(IsolationLevel::Serializable, Workload::Internal);
1005-
if let Err(e) = dedup_tx.upsert_inbound_last_msg_id(
1005+
if let Err(e) = dedup_tx.upsert_inbound_last_msg(
10061006
sender_identity,
10071007
sender_msg_id,
1008-
st_inbound_msg_id_result_status::REDUCER_ERROR,
1008+
st_inbound_msg_result_status::REDUCER_ERROR,
10091009
err_msg,
10101010
) {
10111011
log::error!("IDC: failed to record reducer error in dedup table for sender {sender_identity}: {e}");

crates/datastore/src/locking_tx_datastore/committed_state.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ use crate::{
1919
is_built_in_meta_row, system_tables, table_id_is_reserved, StColumnRow, StConstraintData, StConstraintRow,
2020
StFields, StIndexRow, StSequenceFields, StSequenceRow, StTableFields, StTableRow, StViewRow, SystemTable,
2121
ST_CLIENT_ID, ST_CLIENT_IDX, ST_COLUMN_ID, ST_COLUMN_IDX, ST_COLUMN_NAME, ST_CONSTRAINT_ID, ST_CONSTRAINT_IDX,
22-
ST_CONSTRAINT_NAME, ST_INBOUND_MSG_ID_ID, ST_INDEX_ID, ST_INDEX_IDX, ST_INDEX_NAME, ST_MODULE_ID,
22+
ST_CONSTRAINT_NAME, ST_INBOUND_MSG_ID, ST_INDEX_ID, ST_INDEX_IDX, ST_INDEX_NAME, ST_MODULE_ID,
2323
ST_MODULE_IDX, ST_ROW_LEVEL_SECURITY_ID, ST_ROW_LEVEL_SECURITY_IDX, ST_SCHEDULED_ID, ST_SCHEDULED_IDX,
2424
ST_SEQUENCE_ID, ST_SEQUENCE_IDX, ST_SEQUENCE_NAME, ST_TABLE_ID, ST_TABLE_IDX, ST_VAR_ID, ST_VAR_IDX,
2525
ST_VIEW_ARG_ID, ST_VIEW_ARG_IDX,
@@ -30,8 +30,8 @@ use crate::{
3030
locking_tx_datastore::ViewCallInfo,
3131
system_tables::{
3232
ST_COLUMN_ACCESSOR_ID, ST_COLUMN_ACCESSOR_IDX, ST_CONNECTION_CREDENTIALS_ID, ST_CONNECTION_CREDENTIALS_IDX,
33-
ST_EVENT_TABLE_ID, ST_EVENT_TABLE_IDX, ST_INBOUND_MSG_ID_IDX, ST_INDEX_ACCESSOR_ID, ST_INDEX_ACCESSOR_IDX,
34-
ST_MSG_ID_ID, ST_MSG_ID_IDX, ST_TABLE_ACCESSOR_ID, ST_TABLE_ACCESSOR_IDX, ST_VIEW_COLUMN_ID,
33+
ST_EVENT_TABLE_ID, ST_EVENT_TABLE_IDX, ST_INBOUND_MSG_IDX, ST_INDEX_ACCESSOR_ID, ST_INDEX_ACCESSOR_IDX,
34+
ST_OUTBOUND_MSG_ID, ST_OUTBOUND_MSG_IDX, ST_TABLE_ACCESSOR_ID, ST_TABLE_ACCESSOR_IDX, ST_VIEW_COLUMN_ID,
3535
ST_VIEW_COLUMN_IDX, ST_VIEW_ID, ST_VIEW_IDX, ST_VIEW_PARAM_ID, ST_VIEW_PARAM_IDX, ST_VIEW_SUB_ID,
3636
ST_VIEW_SUB_IDX,
3737
},
@@ -478,8 +478,8 @@ impl CommittedState {
478478
self.create_table(ST_TABLE_ACCESSOR_ID, schemas[ST_TABLE_ACCESSOR_IDX].clone());
479479
self.create_table(ST_INDEX_ACCESSOR_ID, schemas[ST_INDEX_ACCESSOR_IDX].clone());
480480
self.create_table(ST_COLUMN_ACCESSOR_ID, schemas[ST_COLUMN_ACCESSOR_IDX].clone());
481-
self.create_table(ST_INBOUND_MSG_ID_ID, schemas[ST_INBOUND_MSG_ID_IDX].clone());
482-
self.create_table(ST_MSG_ID_ID, schemas[ST_MSG_ID_IDX].clone());
481+
self.create_table(ST_INBOUND_MSG_ID, schemas[ST_INBOUND_MSG_IDX].clone());
482+
self.create_table(ST_OUTBOUND_MSG_ID, schemas[ST_OUTBOUND_MSG_IDX].clone());
483483

484484
// Insert the sequences into `st_sequences`
485485
let (st_sequences, blob_store, pool) =

0 commit comments

Comments
 (0)