Skip to content

Commit d639be0

Browse files
authored
Replay: some code motion & reuse ReplayCommittedState (#4849)
# Description of Changes First two commits are code motion. The second commit fixes a mistake I made in a previous PR that made us use potentially several `ReplayCommittedState`s per `datastore.replay(..)`. More to come in terms of PRs; stay tuned. # API and ABI breaking changes None # Expected complexity level and risk 3
1 parent 91494c9 commit d639be0

4 files changed

Lines changed: 150 additions & 119 deletions

File tree

crates/core/src/db/relational_db.rs

Lines changed: 14 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ use spacetimedb_datastore::locking_tx_datastore::datastore::TxMetrics;
1717
use spacetimedb_datastore::locking_tx_datastore::state_view::{
1818
IterByColEqMutTx, IterByColRangeMutTx, IterMutTx, StateView,
1919
};
20-
use spacetimedb_datastore::locking_tx_datastore::{IndexScanPointOrRange, MutTxId, TxId};
20+
use spacetimedb_datastore::locking_tx_datastore::{ApplyHistoryCounters, IndexScanPointOrRange, MutTxId, TxId};
2121
use spacetimedb_datastore::system_tables::{
2222
system_tables, StModuleRow, ST_CLIENT_ID, ST_CONNECTION_CREDENTIALS_ID, ST_VIEW_SUB_ID,
2323
};
@@ -1617,62 +1617,20 @@ impl RelationalDB {
16171617
}
16181618
}
16191619

1620-
fn apply_history<H>(datastore: &Locking, database_identity: Identity, history: H) -> Result<(), DBError>
1621-
where
1622-
H: durability::History<TxData = Txdata>,
1623-
{
1624-
log::info!("[{database_identity}] DATABASE: applying transaction history...");
1625-
1626-
// TODO: Revisit once we actually replay history suffixes, ie. starting
1627-
// from an offset larger than the history's min offset.
1628-
// TODO: We may want to require that a `tokio::runtime::Handle` is
1629-
// always supplied when constructing a `RelationalDB`. This would allow
1630-
// to spawn a timer task here which just prints the progress periodically
1631-
// in case the history is finite but very long.
1632-
let (_, max_tx_offset) = history.tx_range_hint();
1633-
let mut last_logged_percentage = 0;
1634-
let progress = |tx_offset: u64| {
1635-
if let Some(max_tx_offset) = max_tx_offset {
1636-
let percentage = f64::floor((tx_offset as f64 / max_tx_offset as f64) * 100.0) as i32;
1637-
if percentage > last_logged_percentage && percentage % 10 == 0 {
1638-
log::info!("[{database_identity}] Loaded {percentage}% ({tx_offset}/{max_tx_offset})");
1639-
last_logged_percentage = percentage;
1640-
}
1641-
// Print _something_ even if we don't know what's still ahead.
1642-
} else if tx_offset.is_multiple_of(10_000) {
1643-
log::info!("[{database_identity}] Loading transaction {tx_offset}");
1644-
}
1620+
fn apply_history(
1621+
datastore: &Locking,
1622+
database_identity: Identity,
1623+
history: impl durability::History<TxData = Txdata>,
1624+
) -> Result<(), DBError> {
1625+
let counters = ApplyHistoryCounters {
1626+
replay_commitlog_time_seconds: WORKER_METRICS
1627+
.replay_commitlog_time_seconds
1628+
.with_label_values(&database_identity),
1629+
replay_commitlog_num_commits: WORKER_METRICS
1630+
.replay_commitlog_num_commits
1631+
.with_label_values(&database_identity),
16451632
};
1646-
1647-
let time_before = std::time::Instant::now();
1648-
1649-
let mut replay = datastore.replay(
1650-
progress,
1651-
// We don't want to instantiate an incorrect state;
1652-
// if the commitlog contains an inconsistency we'd rather get a hard error than showing customers incorrect data.
1653-
spacetimedb_datastore::locking_tx_datastore::ErrorBehavior::FailFast,
1654-
);
1655-
let start_tx_offset = replay.next_tx_offset();
1656-
history
1657-
.fold_transactions_from(start_tx_offset, &mut replay)
1658-
.map_err(anyhow::Error::from)?;
1659-
1660-
let time_elapsed = time_before.elapsed();
1661-
WORKER_METRICS
1662-
.replay_commitlog_time_seconds
1663-
.with_label_values(&database_identity)
1664-
.set(time_elapsed.as_secs_f64());
1665-
1666-
let end_tx_offset = replay.next_tx_offset();
1667-
WORKER_METRICS
1668-
.replay_commitlog_num_commits
1669-
.with_label_values(&database_identity)
1670-
.set((end_tx_offset - start_tx_offset) as _);
1671-
1672-
log::info!("[{database_identity}] DATABASE: applied transaction history");
1673-
datastore.rebuild_state_after_replay()?;
1674-
log::info!("[{database_identity}] DATABASE: rebuilt state after replay");
1675-
1633+
spacetimedb_datastore::locking_tx_datastore::apply_history(datastore, database_identity, history, counters)?;
16761634
Ok(())
16771635
}
16781636

crates/datastore/src/locking_tx_datastore/datastore.rs

Lines changed: 6 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ use crate::{
2424
},
2525
};
2626
use anyhow::anyhow;
27-
use core::{cell::RefCell, ops::RangeBounds};
27+
use core::ops::RangeBounds;
2828
use parking_lot::{Mutex, RwLock};
2929
use spacetimedb_data_structures::map::{HashCollectionExt, HashMap};
3030
use spacetimedb_durability::TxOffset;
@@ -33,7 +33,7 @@ use spacetimedb_lib::{ConnectionId, Identity};
3333
use spacetimedb_paths::server::SnapshotDirPath;
3434
use spacetimedb_primitives::{ColId, ColList, ConstraintId, IndexId, SequenceId, TableId, ViewId};
3535
use spacetimedb_sats::memory_usage::MemoryUsage;
36-
use spacetimedb_sats::{bsatn, AlgebraicValue, ProductValue};
36+
use spacetimedb_sats::{AlgebraicValue, ProductValue};
3737
use spacetimedb_schema::table_name::TableName;
3838
use spacetimedb_schema::{
3939
reducer_name::ReducerName,
@@ -48,7 +48,6 @@ use spacetimedb_table::{
4848
use std::borrow::Cow;
4949
use std::sync::Arc;
5050
use std::time::{Duration, Instant};
51-
use thiserror::Error;
5251

5352
pub type Result<T> = std::result::Result<T, DatastoreError>;
5453

@@ -171,13 +170,9 @@ impl Locking {
171170
/// The provided closure will be called for each transaction found in the
172171
/// history, the parameter is the transaction's offset. The closure is called
173172
/// _before_ the transaction is applied to the database state.
174-
pub fn replay<F: FnMut(u64)>(&self, progress: F, error_behavior: ErrorBehavior) -> Replay<F> {
175-
Replay {
176-
database_identity: self.database_identity,
177-
committed_state: self.committed_state.clone(),
178-
progress: RefCell::new(progress),
179-
error_behavior,
180-
}
173+
pub fn replay<F: FnMut(u64)>(&self, progress: F, error_behavior: ErrorBehavior) -> Replay<'_, F> {
174+
let committed_state = self.committed_state.write();
175+
Replay::new(self.database_identity, committed_state, progress, error_behavior)
181176
}
182177

183178
/// Construct a new [`Locking`] datastore containing the state stored in `snapshot`.
@@ -999,18 +994,6 @@ impl Locking {
999994
}
1000995
}
1001996

1002-
#[derive(Debug, Error)]
1003-
pub enum ReplayError {
1004-
#[error("Expected tx offset {expected}, encountered {encountered}")]
1005-
InvalidOffset { expected: u64, encountered: u64 },
1006-
#[error(transparent)]
1007-
Decode(#[from] bsatn::DecodeError),
1008-
#[error(transparent)]
1009-
Db(#[from] DatastoreError),
1010-
#[error(transparent)]
1011-
Any(#[from] anyhow::Error),
1012-
}
1013-
1014997
/// Construct a [`Metadata`] from the given [`RowRef`],
1015998
/// reading only the columns necessary to construct the value.
1016999
fn metadata_from_row(row: RowRef<'_>) -> Result<Metadata> {
@@ -1041,7 +1024,6 @@ pub(crate) mod tests {
10411024
};
10421025
use crate::traits::{IsolationLevel, MutTx};
10431026
use crate::Result;
1044-
use bsatn::to_vec;
10451027
use core::{fmt, mem};
10461028
use itertools::Itertools;
10471029
use pretty_assertions::{assert_eq, assert_matches};
@@ -1053,7 +1035,7 @@ pub(crate) mod tests {
10531035
use spacetimedb_lib::{resolved_type_via_v9, ScheduleAt, TimeDuration};
10541036
use spacetimedb_primitives::{col_list, ArgId, ColId, ScheduleId, ViewId};
10551037
use spacetimedb_sats::algebraic_value::ser::value_serialize;
1056-
use spacetimedb_sats::bsatn::ToBsatn;
1038+
use spacetimedb_sats::bsatn::{to_vec, ToBsatn};
10571039
use spacetimedb_sats::layout::RowTypeLayout;
10581040
use spacetimedb_sats::raw_identifier::RawIdentifier;
10591041
use spacetimedb_sats::{product, AlgebraicType, GroundSpacetimeType, SumTypeVariant, SumValue};

crates/datastore/src/locking_tx_datastore/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ pub mod state_view;
99
pub use state_view::{IterByColEqTx, IterByColRangeTx};
1010
pub mod delete_table;
1111
mod replay;
12-
pub use replay::{ErrorBehavior, Replay};
12+
pub use replay::{apply_history, ApplyHistoryCounters, ErrorBehavior, Replay};
1313
mod tx;
1414
pub use tx::{NumDistinctValues, TxId};
1515
mod tx_state;

0 commit comments

Comments
 (0)