@@ -8,12 +8,12 @@ use crate::locking_tx_datastore::state_view::StateView;
88use crate :: system_tables:: { is_built_in_meta_row, StFields as _} ;
99use crate :: system_tables:: { StColumnRow , StTableFields , StTableRow , ST_COLUMN_ID , ST_TABLE_ID } ;
1010use anyhow:: { anyhow, Context } ;
11- use core:: ops:: { Deref , DerefMut } ;
12- use parking_lot:: RwLock ;
11+ use core:: ops:: { Deref , DerefMut , RangeBounds } ;
12+ use parking_lot:: { RwLock , RwLockReadGuard } ;
1313use spacetimedb_commitlog:: payload:: txdata;
1414use spacetimedb_data_structures:: map:: { HashSet , IntMap , IntSet } ;
1515use spacetimedb_lib:: Identity ;
16- use spacetimedb_primitives:: { ColId , TableId } ;
16+ use spacetimedb_primitives:: { ColId , ColList , TableId } ;
1717use spacetimedb_sats:: algebraic_value:: de:: ValueDeserializer ;
1818use spacetimedb_sats:: buffer:: BufReader ;
1919use spacetimedb_sats:: { AlgebraicValue , Deserialize , ProductValue } ;
@@ -51,6 +51,11 @@ impl<F> Replay<F> {
5151 pub fn next_tx_offset ( & self ) -> u64 {
5252 self . committed_state . read_arc ( ) . next_tx_offset
5353 }
54+
55+ // NOTE: This is not unused.
56+ pub fn committed_state ( & self ) -> RwLockReadGuard < ' _ , CommittedState > {
57+ self . committed_state . read ( )
58+ }
5459}
5560
5661impl < F : FnMut ( u64 ) > spacetimedb_commitlog:: Decoder for & mut Replay < F > {
@@ -339,6 +344,24 @@ struct ReplayCommittedState<'cs> {
339344 /// and delete from it during [`Self::replay_delete`] of `st_column` rows.
340345 /// We assert this is empty at the end of each transaction.
341346 replay_columns_to_ignore : HashSet < RowPointer > ,
347+
348+ /// Set of tables whose `st_table` entries have been updated during the currently-replaying transaction,
349+ /// mapped to the current most-recent `st_table` row.
350+ ///
351+ /// When processing an insert to `st_table`, if the table already exists, we'll record it here.
352+ /// Then, when we see a corresponding delete, we know that the table has not been dropped,
353+ /// and so we won't delete the in-memory structure or insert its ID into [`Self::replay_table_dropped`].
354+ ///
355+ /// When looking up the `st_table` row for a table, if it has an entry here,
356+ /// that means there are two rows resident in `st_table` at this point in replay.
357+ /// We return the row recorded here rather than inspecting `st_table`.
358+ ///
359+ /// We remove from this set when we reach the matching delete,
360+ /// and assert this set is empty at the end of each transaction.
361+ ///
362+ /// [`RowPointer`]s from this set are passed to the `unsafe` [`Table::get_row_ref_unchecked`],
363+ /// so it's important to properly maintain only [`RowPointer`]s to valid, extant, non-deleted rows.
364+ pub ( super ) replay_table_updated : IntMap < TableId , RowPointer > ,
342365}
343366
344367impl Deref for ReplayCommittedState < ' _ > {
@@ -361,6 +384,7 @@ impl<'cs> ReplayCommittedState<'cs> {
361384 state,
362385 replay_table_dropped : <_ >:: default ( ) ,
363386 replay_columns_to_ignore : <_ >:: default ( ) ,
387+ replay_table_updated : <_ >:: default ( ) ,
364388 }
365389 }
366390
@@ -655,6 +679,69 @@ impl<'cs> ReplayCommittedState<'cs> {
655679 }
656680}
657681
682+ impl StateView for ReplayCommittedState < ' _ > {
683+ /// Find the `st_table` row for `table_id`,
684+ /// first inspecting [`Self::replay_table_updated`],
685+ /// then falling back to [`CommittedState::iter_by_col_eq`].
686+ fn find_st_table_row ( & self , table_id : TableId ) -> Result < StTableRow > {
687+ if let Some ( row_ptr) = self . replay_table_updated . get ( & table_id) {
688+ let ( table, blob_store, _) = self . state . get_table_and_blob_store ( table_id) ?;
689+ // SAFETY: `row_ptr` is stored in `self.replay_table_updated`,
690+ // meaning it was inserted into `st_table` by `replay_insert`
691+ // and has not yet been deleted by `replay_delete_by_rel`.
692+ let row_ref = unsafe { table. get_row_ref_unchecked ( blob_store, * row_ptr) } ;
693+ StTableRow :: try_from ( row_ref)
694+ } else {
695+ self . state . find_st_table_row ( table_id)
696+ }
697+ }
698+
699+ type Iter < ' a >
700+ = <CommittedState as StateView >:: Iter < ' a >
701+ where
702+ Self : ' a ;
703+
704+ type IterByColRange < ' a , R : RangeBounds < AlgebraicValue > >
705+ = <CommittedState as StateView >:: IterByColRange < ' a , R >
706+ where
707+ Self : ' a ;
708+
709+ type IterByColEq < ' a , ' r >
710+ = <CommittedState as StateView >:: IterByColEq < ' a , ' r >
711+ where
712+ Self : ' a ;
713+
714+ fn get_schema ( & self , table_id : TableId ) -> Option < & Arc < TableSchema > > {
715+ self . state . get_schema ( table_id)
716+ }
717+
718+ fn table_row_count ( & self , table_id : TableId ) -> Option < u64 > {
719+ self . state . table_row_count ( table_id)
720+ }
721+
722+ fn iter ( & self , table_id : TableId ) -> Result < Self :: Iter < ' _ > > {
723+ self . state . iter ( table_id)
724+ }
725+
726+ fn iter_by_col_range < R : RangeBounds < AlgebraicValue > > (
727+ & self ,
728+ table_id : TableId ,
729+ cols : ColList ,
730+ range : R ,
731+ ) -> Result < Self :: IterByColRange < ' _ , R > > {
732+ self . state . iter_by_col_range ( table_id, cols, range)
733+ }
734+
735+ fn iter_by_col_eq < ' a , ' r > (
736+ & ' a self ,
737+ table_id : TableId ,
738+ cols : impl Into < ColList > ,
739+ value : & ' r AlgebraicValue ,
740+ ) -> Result < Self :: IterByColEq < ' a , ' r > > {
741+ self . state . iter_by_col_eq ( table_id, cols, value)
742+ }
743+ }
744+
658745#[ cfg( test) ]
659746mod tests {
660747 use crate :: {
0 commit comments