Skip to content

Commit 7e84ae4

Browse files
committed
st_databases_tx_offset
1 parent afd3f35 commit 7e84ae4

2 files changed

Lines changed: 48 additions & 9 deletions

File tree

crates/datastore/src/locking_tx_datastore/committed_state.rs

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -19,20 +19,20 @@ use crate::{
1919
is_built_in_meta_row, system_tables, table_id_is_reserved, StColumnRow, StConstraintData, StConstraintRow,
2020
StFields, StIndexRow, StSequenceFields, StSequenceRow, StTableFields, StTableRow, StViewRow, SystemTable,
2121
ST_CLIENT_ID, ST_CLIENT_IDX, ST_COLUMN_ID, ST_COLUMN_IDX, ST_COLUMN_NAME, ST_CONSTRAINT_ID, ST_CONSTRAINT_IDX,
22-
ST_CONSTRAINT_NAME, ST_INDEX_ID, ST_INDEX_IDX, ST_INDEX_NAME, ST_MODULE_ID, ST_MODULE_IDX,
23-
ST_ROW_LEVEL_SECURITY_ID, ST_ROW_LEVEL_SECURITY_IDX, ST_SCHEDULED_ID, ST_SCHEDULED_IDX, ST_SEQUENCE_ID,
24-
ST_SEQUENCE_IDX, ST_SEQUENCE_NAME, ST_TABLE_ID, ST_TABLE_IDX, ST_VAR_ID, ST_VAR_IDX, ST_VIEW_ARG_ID,
25-
ST_VIEW_ARG_IDX,
22+
ST_CONSTRAINT_NAME, ST_DATABASES_TX_OFFSET_ID, ST_INDEX_ID, ST_INDEX_IDX, ST_INDEX_NAME, ST_MODULE_ID,
23+
ST_MODULE_IDX, ST_ROW_LEVEL_SECURITY_ID, ST_ROW_LEVEL_SECURITY_IDX, ST_SCHEDULED_ID, ST_SCHEDULED_IDX,
24+
ST_SEQUENCE_ID, ST_SEQUENCE_IDX, ST_SEQUENCE_NAME, ST_TABLE_ID, ST_TABLE_IDX, ST_VAR_ID, ST_VAR_IDX,
25+
ST_VIEW_ARG_ID, ST_VIEW_ARG_IDX,
2626
},
2727
traits::{EphemeralTables, TxData},
2828
};
2929
use crate::{
3030
locking_tx_datastore::ViewCallInfo,
3131
system_tables::{
3232
ST_COLUMN_ACCESSOR_ID, ST_COLUMN_ACCESSOR_IDX, ST_CONNECTION_CREDENTIALS_ID, ST_CONNECTION_CREDENTIALS_IDX,
33-
ST_EVENT_TABLE_ID, ST_EVENT_TABLE_IDX, ST_INDEX_ACCESSOR_ID, ST_INDEX_ACCESSOR_IDX, ST_TABLE_ACCESSOR_ID,
34-
ST_TABLE_ACCESSOR_IDX, ST_VIEW_COLUMN_ID, ST_VIEW_COLUMN_IDX, ST_VIEW_ID, ST_VIEW_IDX, ST_VIEW_PARAM_ID,
35-
ST_VIEW_PARAM_IDX, ST_VIEW_SUB_ID, ST_VIEW_SUB_IDX,
33+
ST_DATABASES_TX_OFFSET_IDX, ST_EVENT_TABLE_ID, ST_EVENT_TABLE_IDX, ST_INDEX_ACCESSOR_ID, ST_INDEX_ACCESSOR_IDX,
34+
ST_TABLE_ACCESSOR_ID, ST_TABLE_ACCESSOR_IDX, ST_VIEW_COLUMN_ID, ST_VIEW_COLUMN_IDX, ST_VIEW_ID, ST_VIEW_IDX,
35+
ST_VIEW_PARAM_ID, ST_VIEW_PARAM_IDX, ST_VIEW_SUB_ID, ST_VIEW_SUB_IDX,
3636
},
3737
};
3838
use anyhow::anyhow;
@@ -477,6 +477,7 @@ impl CommittedState {
477477
self.create_table(ST_TABLE_ACCESSOR_ID, schemas[ST_TABLE_ACCESSOR_IDX].clone());
478478
self.create_table(ST_INDEX_ACCESSOR_ID, schemas[ST_INDEX_ACCESSOR_IDX].clone());
479479
self.create_table(ST_COLUMN_ACCESSOR_ID, schemas[ST_COLUMN_ACCESSOR_IDX].clone());
480+
self.create_table(ST_DATABASES_TX_OFFSET_ID, schemas[ST_DATABASES_TX_OFFSET_IDX].clone());
480481

481482
// Insert the sequences into `st_sequences`
482483
let (st_sequences, blob_store, pool) =

crates/datastore/src/system_tables.rs

Lines changed: 40 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,9 @@ pub const ST_INDEX_ACCESSOR_ID: TableId = TableId(19);
8989
/// The static ID of the table that maps canonical column names to accessor names
9090
pub const ST_COLUMN_ACCESSOR_ID: TableId = TableId(20);
9191

92+
/// The static ID of the table that tracks the last tx offset send by other databases.
93+
pub const ST_DATABASES_TX_OFFSET_ID: TableId = TableId(21);
94+
9295
pub(crate) const ST_CONNECTION_CREDENTIALS_NAME: &str = "st_connection_credentials";
9396
pub const ST_TABLE_NAME: &str = "st_table";
9497
pub const ST_COLUMN_NAME: &str = "st_column";
@@ -109,6 +112,7 @@ pub(crate) const ST_EVENT_TABLE_NAME: &str = "st_event_table";
109112
pub(crate) const ST_TABLE_ACCESSOR_NAME: &str = "st_table_accessor";
110113
pub(crate) const ST_INDEX_ACCESSOR_NAME: &str = "st_index_accessor";
111114
pub(crate) const ST_COLUMN_ACCESSOR_NAME: &str = "st_column_accessor";
115+
pub(crate) const ST_DATABASES_TX_OFFSET_NAME: &str = "st_databases_tx_offset";
112116
/// Reserved range of sequence values used for system tables.
113117
///
114118
/// Ids for user-created tables will start at `ST_RESERVED_SEQUENCE_RANGE`.
@@ -182,7 +186,7 @@ pub fn is_built_in_meta_row(table_id: TableId, row: &ProductValue) -> Result<boo
182186
let row: StEventTableRow = to_typed_row(row)?;
183187
table_id_is_reserved(row.table_id)
184188
}
185-
ST_TABLE_ACCESSOR_ID | ST_INDEX_ACCESSOR_ID | ST_COLUMN_ACCESSOR_ID => false,
189+
ST_TABLE_ACCESSOR_ID | ST_INDEX_ACCESSOR_ID | ST_COLUMN_ACCESSOR_ID | ST_DATABASES_TX_OFFSET_ID => false,
186190
TableId(..ST_RESERVED_SEQUENCE_RANGE) => {
187191
log::warn!("Unknown system table {table_id:?}");
188192
false
@@ -205,7 +209,7 @@ pub enum SystemTable {
205209
st_table_accessor,
206210
}
207211

208-
pub fn system_tables() -> [TableSchema; 20] {
212+
pub fn system_tables() -> [TableSchema; 21] {
209213
[
210214
// The order should match the `id` of the system table, that start with [ST_TABLE_IDX].
211215
st_table_schema(),
@@ -228,6 +232,7 @@ pub fn system_tables() -> [TableSchema; 20] {
228232
st_table_accessor_schema(),
229233
st_index_accessor_schema(),
230234
st_column_accessor_schema(),
235+
st_databases_tx_offset_schema(),
231236
]
232237
}
233238

@@ -276,6 +281,7 @@ pub(crate) const ST_EVENT_TABLE_IDX: usize = 16;
276281
pub(crate) const ST_TABLE_ACCESSOR_IDX: usize = 17;
277282
pub(crate) const ST_INDEX_ACCESSOR_IDX: usize = 18;
278283
pub(crate) const ST_COLUMN_ACCESSOR_IDX: usize = 19;
284+
pub(crate) const ST_DATABASES_TX_OFFSET_IDX: usize = 20;
279285

280286
macro_rules! st_fields_enum {
281287
($(#[$attr:meta])* enum $ty_name:ident { $($name:expr, $var:ident = $discr:expr,)* }) => {
@@ -450,6 +456,11 @@ st_fields_enum!(enum StColumnAccessorFields {
450456
"accessor_name", AccessorName = 2,
451457
});
452458

459+
st_fields_enum!(enum StDatabasesTxOffsetFields {
460+
"database_identity", DatabaseIdentity = 0,
461+
"tx_offset", TxOffset = 1,
462+
});
463+
453464
/// Helper method to check that a system table has the correct fields.
454465
/// Does not check field types since those aren't included in `StFields` types.
455466
/// If anything in here is not true, the system is completely broken, so it's fine to assert.
@@ -668,6 +679,17 @@ fn system_module_def() -> ModuleDef {
668679
.with_unique_constraint(st_column_accessor_table_alias_cols)
669680
.with_index_no_accessor_name(btree(st_column_accessor_table_alias_cols));
670681

682+
let st_databases_tx_offset_type = builder.add_type::<StDatabasesTxOffsetRow>();
683+
builder
684+
.build_table(
685+
ST_DATABASES_TX_OFFSET_NAME,
686+
*st_databases_tx_offset_type.as_ref().expect("should be ref"),
687+
)
688+
.with_type(TableType::System)
689+
.with_unique_constraint(StDatabasesTxOffsetFields::DatabaseIdentity)
690+
.with_index_no_accessor_name(btree(StDatabasesTxOffsetFields::DatabaseIdentity))
691+
.with_primary_key(StDatabasesTxOffsetFields::DatabaseIdentity);
692+
671693
let result = builder
672694
.finish()
673695
.try_into()
@@ -693,6 +715,7 @@ fn system_module_def() -> ModuleDef {
693715
validate_system_table::<StTableAccessorFields>(&result, ST_TABLE_ACCESSOR_NAME);
694716
validate_system_table::<StIndexAccessorFields>(&result, ST_INDEX_ACCESSOR_NAME);
695717
validate_system_table::<StColumnAccessorFields>(&result, ST_COLUMN_ACCESSOR_NAME);
718+
validate_system_table::<StDatabasesTxOffsetFields>(&result, ST_DATABASES_TX_OFFSET_NAME);
696719

697720
result
698721
}
@@ -741,6 +764,7 @@ lazy_static::lazy_static! {
741764
m.insert("st_index_accessor_accessor_name_key", ConstraintId(23));
742765
m.insert("st_column_accessor_table_name_col_name_key", ConstraintId(24));
743766
m.insert("st_column_accessor_table_name_accessor_name_key", ConstraintId(25));
767+
m.insert("st_databases_tx_offset_database_identity_key", ConstraintId(26));
744768
m
745769
};
746770
}
@@ -779,6 +803,7 @@ lazy_static::lazy_static! {
779803
m.insert("st_index_accessor_accessor_name_idx_btree", IndexId(27));
780804
m.insert("st_column_accessor_table_name_col_name_idx_btree", IndexId(28));
781805
m.insert("st_column_accessor_table_name_accessor_name_idx_btree", IndexId(29));
806+
m.insert("st_databases_tx_offset_database_identity_idx_btree", IndexId(30));
782807
m
783808
};
784809
}
@@ -940,6 +965,10 @@ fn st_column_accessor_schema() -> TableSchema {
940965
st_schema(ST_COLUMN_ACCESSOR_NAME, ST_COLUMN_ACCESSOR_ID)
941966
}
942967

968+
fn st_databases_tx_offset_schema() -> TableSchema {
969+
st_schema(ST_DATABASES_TX_OFFSET_NAME, ST_DATABASES_TX_OFFSET_ID)
970+
}
971+
943972
/// If `table_id` refers to a known system table, return its schema.
944973
///
945974
/// Used when restoring from a snapshot; system tables are reinstantiated with this schema,
@@ -968,6 +997,7 @@ pub(crate) fn system_table_schema(table_id: TableId) -> Option<TableSchema> {
968997
ST_TABLE_ACCESSOR_ID => Some(st_table_accessor_schema()),
969998
ST_INDEX_ACCESSOR_ID => Some(st_index_accessor_schema()),
970999
ST_COLUMN_ACCESSOR_ID => Some(st_column_accessor_schema()),
1000+
ST_DATABASES_TX_OFFSET_ID => Some(st_databases_tx_offset_schema()),
9711001
_ => None,
9721002
}
9731003
}
@@ -1859,6 +1889,14 @@ impl From<StColumnAccessorRow> for ProductValue {
18591889
}
18601890
}
18611891

1892+
/// System Table [ST_DATABASES_TX_OFFST_NAME]
1893+
#[derive(Debug, Clone, PartialEq, Eq, SpacetimeType)]
1894+
#[sats(crate = spacetimedb_lib)]
1895+
pub struct StDatabasesTxOffsetRow {
1896+
pub database_identity: IdentityViaU256,
1897+
pub tx_offset: u64,
1898+
}
1899+
18621900
thread_local! {
18631901
static READ_BUF: RefCell<Vec<u8>> = const { RefCell::new(Vec::new()) };
18641902
}

0 commit comments

Comments
 (0)