Skip to content

Commit a51854f

Browse files
Move payload conversion into Durability trait
And remove intermediate worker from generic durability path entirely.
1 parent e34b8b2 commit a51854f

File tree

9 files changed

+123
-530
lines changed

9 files changed

+123
-530
lines changed

crates/core/src/db/durability.rs

Lines changed: 39 additions & 407 deletions
Large diffs are not rendered by default.

crates/core/src/db/persistence.rs

Lines changed: 5 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ use spacetimedb_snapshot::SnapshotRepository;
99
use crate::{messages::control_db::Database, util::asyncify};
1010

1111
use super::{
12-
relational_db::{self, LocalDurability, Txdata},
12+
relational_db::{self, Txdata},
1313
snapshot::{self, SnapshotDatabaseState, SnapshotWorker},
1414
};
1515

@@ -30,11 +30,6 @@ pub type DiskSizeFn = Arc<dyn Fn() -> io::Result<SizeOnDisk> + Send + Sync>;
3030
pub struct Persistence {
3131
/// The [Durability] to use, for persisting transactions.
3232
pub durability: Arc<Durability>,
33-
/// TODO: Merge this local durability handle with the generic trait object above.
34-
/// This allows us to bypass an actor whose only responsibility is to generate a
35-
/// commitlog payload from `TxData`. Ultimately though this should just be a part
36-
/// of the [Durability] implementation itself.
37-
pub local_durability: Option<LocalDurability>,
3833
/// The [DiskSizeFn].
3934
///
4035
/// Currently the expectation is that the reported size is the commitlog
@@ -60,7 +55,6 @@ impl Persistence {
6055
) -> Self {
6156
Self {
6257
durability: Arc::new(durability),
63-
local_durability: None,
6458
disk_size: Arc::new(disk_size),
6559
snapshots,
6660
runtime,
@@ -89,34 +83,23 @@ impl Persistence {
8983

9084
/// Convenience to deconstruct an [Option<Self>] into parts.
9185
///
92-
/// Returns `(Some(durability), local_durability, Some(disk_size), Option<SnapshotWorker>, Some(runtime))`
93-
/// if `this` is `Some`, and `(None, None, None, None, None)` if `this` is `None`.
94-
#[allow(clippy::type_complexity)]
86+
/// Returns `(Some(durability), Some(disk_size), Option<SnapshotWorker>, Some(runtime))`
87+
/// if `this` is `Some`, and `(None, None, None, None)` if `this` is `None`.
9588
pub(super) fn unzip(
9689
this: Option<Self>,
9790
) -> (
9891
Option<Arc<Durability>>,
99-
Option<LocalDurability>,
10092
Option<DiskSizeFn>,
10193
Option<SnapshotWorker>,
10294
Option<tokio::runtime::Handle>,
10395
) {
10496
this.map(
10597
|Self {
10698
durability,
107-
local_durability,
10899
disk_size,
109100
snapshots,
110101
runtime,
111-
}| {
112-
(
113-
Some(durability),
114-
local_durability,
115-
Some(disk_size),
116-
snapshots,
117-
Some(runtime),
118-
)
119-
},
102+
}| (Some(durability), Some(disk_size), snapshots, Some(runtime)),
120103
)
121104
.unwrap_or_default()
122105
}
@@ -176,8 +159,7 @@ impl PersistenceProvider for LocalPersistenceProvider {
176159
));
177160

178161
Ok(Persistence {
179-
durability: durability.clone(),
180-
local_durability: Some(durability),
162+
durability,
181163
disk_size,
182164
snapshots: Some(snapshot_worker),
183165
runtime: tokio::runtime::Handle::current(),

crates/core/src/db/relational_db.rs

Lines changed: 16 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use crate::db::durability::DurabilityWorker;
1+
use crate::db::durability::{request_durability, spawn_close as spawn_durability_close};
22
use crate::db::MetricsRecorderQueue;
33
use crate::error::{DBError, RestoreSnapshotError};
44
use crate::subscription::ExecutionCounters;
@@ -12,7 +12,7 @@ use spacetimedb_commitlog::{self as commitlog, Commitlog, SizeOnDisk};
1212
use spacetimedb_data_structures::map::HashSet;
1313
use spacetimedb_datastore::db_metrics::DB_METRICS;
1414
use spacetimedb_datastore::error::{DatastoreError, TableError, ViewError};
15-
use spacetimedb_datastore::execution_context::{Workload, WorkloadType};
15+
use spacetimedb_datastore::execution_context::{ReducerContext, Workload, WorkloadType};
1616
use spacetimedb_datastore::locking_tx_datastore::datastore::TxMetrics;
1717
use spacetimedb_datastore::locking_tx_datastore::state_view::{
1818
IterByColEqMutTx, IterByColRangeMutTx, IterMutTx, StateView,
@@ -98,7 +98,8 @@ pub struct RelationalDB {
9898
owner_identity: Identity,
9999

100100
inner: Locking,
101-
durability: Option<DurabilityWorker>,
101+
durability: Option<Arc<Durability>>,
102+
durability_runtime: Option<tokio::runtime::Handle>,
102103
snapshot_worker: Option<SnapshotWorker>,
103104

104105
row_count_fn: RowCountFn,
@@ -133,8 +134,8 @@ impl std::fmt::Debug for RelationalDB {
133134
impl Drop for RelationalDB {
134135
fn drop(&mut self) {
135136
// Attempt to flush the outstanding transactions.
136-
if let Some(worker) = self.durability.take() {
137-
worker.spawn_close(self.database_identity);
137+
if let (Some(durability), Some(runtime)) = (self.durability.take(), self.durability_runtime.take()) {
138+
spawn_durability_close(durability, &runtime, self.database_identity);
138139
}
139140
}
140141
}
@@ -150,18 +151,12 @@ impl RelationalDB {
150151
let workload_type_to_exec_counters =
151152
Arc::new(EnumMap::from_fn(|ty| ExecutionCounters::new(&ty, &database_identity)));
152153

153-
let (durability, local_durability, disk_size_fn, snapshot_worker, rt) = Persistence::unzip(persistence);
154-
let durability = match (local_durability, durability, rt) {
155-
(Some(local_durability), _, Some(rt)) => {
156-
Some(DurabilityWorker::new_local(database_identity, local_durability, rt))
157-
}
158-
(None, Some(durability), Some(rt)) => Some(DurabilityWorker::new(database_identity, durability, rt)),
159-
_ => None,
160-
};
154+
let (durability, disk_size_fn, snapshot_worker, durability_runtime) = Persistence::unzip(persistence);
161155

162156
Self {
163157
inner,
164158
durability,
159+
durability_runtime,
165160
snapshot_worker,
166161

167162
database_identity,
@@ -811,9 +806,7 @@ impl RelationalDB {
811806
let reducer_context = tx.ctx.reducer_context().cloned();
812807
// TODO: Never returns `None` -- should it?
813808
let Some((tx_offset, tx_data, tx_metrics, reducer)) = self.inner.commit_mut_tx_and_then(tx, |tx_data| {
814-
if let Some(durability) = &self.durability {
815-
durability.request_durability(reducer_context, tx_data);
816-
}
809+
self.request_durability(reducer_context, tx_data);
817810
})?
818811
else {
819812
return Ok(None);
@@ -830,9 +823,7 @@ impl RelationalDB {
830823

831824
let reducer_context = tx.ctx.reducer_context().cloned();
832825
let (tx_data, tx_metrics, tx) = self.inner.commit_mut_tx_downgrade_and_then(tx, workload, |tx_data| {
833-
if let Some(durability) = &self.durability {
834-
durability.request_durability(reducer_context, tx_data);
835-
}
826+
self.request_durability(reducer_context, tx_data);
836827
});
837828

838829
self.maybe_do_snapshot(&tx_data);
@@ -848,6 +839,12 @@ impl RelationalDB {
848839
.map(|durability| durability.durable_tx_offset())
849840
}
850841

842+
fn request_durability(&self, reducer_context: Option<ReducerContext>, tx_data: &Arc<TxData>) {
843+
if let Some(durability) = &self.durability {
844+
request_durability(durability.as_ref(), reducer_context, tx_data);
845+
}
846+
}
847+
851848
/// Decide based on the `committed_state.next_tx_offset`
852849
/// whether to request that the [`SnapshotWorker`] in `self` capture a snapshot of the database.
853850
///
@@ -1970,7 +1967,6 @@ pub mod tests_utils {
19701967

19711968
let persistence = Persistence {
19721969
durability: local.clone(),
1973-
local_durability: Some(local.clone()),
19741970
disk_size: disk_size_fn,
19751971
snapshots,
19761972
runtime: rt,
@@ -2092,7 +2088,6 @@ pub mod tests_utils {
20922088
let history = local.as_history();
20932089
let persistence = Persistence {
20942090
durability: local.clone(),
2095-
local_durability: Some(local.clone()),
20962091
disk_size: disk_size_fn,
20972092
snapshots,
20982093
runtime: rt,

crates/core/src/subscription/module_subscription_actor.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1900,7 +1900,7 @@ mod tests {
19001900
use spacetimedb_commitlog::{commitlog, repo};
19011901
use spacetimedb_data_structures::map::{HashCollectionExt as _, HashMap};
19021902
use spacetimedb_datastore::system_tables::{StRowLevelSecurityRow, ST_ROW_LEVEL_SECURITY_ID};
1903-
use spacetimedb_durability::{Durability, EmptyHistory, Transaction, TxOffset};
1903+
use spacetimedb_durability::{Durability, EmptyHistory, TxOffset};
19041904
use spacetimedb_execution::dml::MutDatastore;
19051905
use spacetimedb_lib::bsatn::ToBsatn;
19061906
use spacetimedb_lib::db::auth::StAccess;
@@ -1987,7 +1987,8 @@ mod tests {
19871987
impl Durability for ManualDurability {
19881988
type TxData = Txdata;
19891989

1990-
fn append_tx(&self, tx: Transaction<Self::TxData>) {
1990+
fn append_tx(&self, tx: spacetimedb_durability::PreparedTx<Self::TxData>) {
1991+
let tx = tx.into_transaction();
19911992
let mut commitlog = self.commitlog.write().unwrap();
19921993
commitlog.commit([tx]).expect("commit failed");
19931994
commitlog.flush().expect("error flushing commitlog");
@@ -2043,7 +2044,6 @@ mod tests {
20432044
EmptyHistory::new(),
20442045
Some(Persistence {
20452046
durability: durability.clone(),
2046-
local_durability: None,
20472047
disk_size: Arc::new(|| Ok(<_>::default())),
20482048
snapshots: None,
20492049
runtime: rt,

crates/durability/src/imp/local.rs

Lines changed: 30 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ use tokio::{
2222
};
2323
use tracing::{instrument, Span};
2424

25-
use crate::{Close, Durability, DurableOffset, History, TxOffset};
25+
use crate::{Close, Durability, DurableOffset, History, PreparedTx, TxOffset};
2626

2727
pub use spacetimedb_commitlog::repo::{OnNewSegmentFn, SizeOnDisk};
2828

@@ -75,33 +75,6 @@ pub enum OpenError {
7575

7676
type ShutdownReply = oneshot::Sender<OwnedNotified>;
7777

78-
enum QueueItem<T> {
79-
Ready(Transaction<Txdata<T>>),
80-
Deferred { prepare: Box<dyn DeferredTx<T>> },
81-
}
82-
83-
impl<T> QueueItem<T> {
84-
fn prepare(self) -> Transaction<Txdata<T>> {
85-
match self {
86-
Self::Ready(tx) => tx,
87-
Self::Deferred { prepare } => prepare.prepare(),
88-
}
89-
}
90-
}
91-
92-
trait DeferredTx<T>: Send + Sync {
93-
fn prepare(self: Box<Self>) -> Transaction<Txdata<T>>;
94-
}
95-
96-
impl<T, F> DeferredTx<T> for F
97-
where
98-
F: FnOnce() -> Transaction<Txdata<T>> + Send + Sync + 'static,
99-
{
100-
fn prepare(self: Box<Self>) -> Transaction<Txdata<T>> {
101-
self()
102-
}
103-
}
104-
10578
/// [`Durability`] implementation backed by a [`Commitlog`] on local storage.
10679
///
10780
/// The commitlog is constrained to store the canonical [`Txdata`] payload,
@@ -124,7 +97,7 @@ pub struct Local<T> {
12497
///
12598
/// The queue is bounded to
12699
/// `Options::QUEUE_CAPACITY_MULTIPLIER * Options::batch_capacity`.
127-
queue: mpsc::Sender<QueueItem<T>>,
100+
queue: mpsc::Sender<PreparedTx<Txdata<T>>>,
128101
/// How many transactions are pending durability, including items buffered
129102
/// in the queue and items currently being written by the actor.
130103
///
@@ -245,7 +218,7 @@ impl<T: Encode + Send + Sync + 'static> Actor<T> {
245218
#[instrument(name = "durability::local::actor", skip_all)]
246219
async fn run(
247220
self,
248-
mut transactions_rx: mpsc::Receiver<QueueItem<T>>,
221+
mut transactions_rx: mpsc::Receiver<PreparedTx<Txdata<T>>>,
249222
mut shutdown_rx: mpsc::Receiver<oneshot::Sender<OwnedNotified>>,
250223
) {
251224
info!("starting durability actor");
@@ -282,9 +255,9 @@ impl<T: Encode + Send + Sync + 'static> Actor<T> {
282255
let clog = self.clog.clone();
283256
let ready_len = tx_buf.len();
284257
self.queue_depth.fetch_sub(ready_len as u64, Relaxed);
285-
tx_buf = spawn_blocking(move || -> io::Result<Vec<QueueItem<T>>> {
286-
for item in tx_buf.drain(..) {
287-
clog.commit([item.prepare()])?;
258+
tx_buf = spawn_blocking(move || -> io::Result<Vec<PreparedTx<Txdata<T>>>> {
259+
for tx in tx_buf.drain(..) {
260+
clog.commit([tx.into_transaction()])?;
288261
}
289262
Ok(tx_buf)
290263
})
@@ -370,8 +343,30 @@ impl Drop for Lock {
370343
impl<T: Send + Sync + 'static> Durability for Local<T> {
371344
type TxData = Txdata<T>;
372345

373-
fn append_tx(&self, tx: Transaction<Self::TxData>) {
374-
let _ = self.send_item(QueueItem::Ready(tx));
346+
fn append_tx(&self, tx: PreparedTx<Self::TxData>) {
347+
let mut tx = Some(tx);
348+
let blocked = match self.queue.try_reserve() {
349+
Ok(permit) => {
350+
permit.send(tx.take().expect("tx already sent"));
351+
false
352+
}
353+
Err(mpsc::error::TrySendError::Closed(_)) => {
354+
panic!("durability actor crashed");
355+
}
356+
Err(mpsc::error::TrySendError::Full(_)) => {
357+
let mut send = || self.queue.blocking_send(tx.take().expect("tx already sent"));
358+
if tokio::runtime::Handle::try_current().is_ok() {
359+
tokio::task::block_in_place(send)
360+
} else {
361+
send()
362+
}
363+
.expect("durability actor crashed");
364+
true
365+
}
366+
};
367+
368+
self.queue_depth.fetch_add(1, Relaxed);
369+
let _ = blocked;
375370
}
376371

377372
fn durable_tx_offset(&self) -> DurableOffset {
@@ -410,39 +405,6 @@ impl<T: Send + Sync + 'static> Durability for Local<T> {
410405
}
411406
}
412407

413-
impl<T: Send + Sync + 'static> Local<T> {
414-
fn send_item(&self, item: QueueItem<T>) -> bool {
415-
let blocked = match self.queue.try_reserve() {
416-
Ok(permit) => {
417-
permit.send(item);
418-
false
419-
}
420-
Err(mpsc::error::TrySendError::Closed(_)) => {
421-
panic!("durability actor crashed");
422-
}
423-
Err(mpsc::error::TrySendError::Full(_)) => {
424-
let send = || self.queue.blocking_send(item);
425-
if tokio::runtime::Handle::try_current().is_ok() {
426-
tokio::task::block_in_place(send)
427-
} else {
428-
send()
429-
}
430-
.expect("durability actor crashed");
431-
true
432-
}
433-
};
434-
435-
self.queue_depth.fetch_add(1, Relaxed);
436-
blocked
437-
}
438-
439-
pub fn append_tx_deferred(&self, prepare: impl FnOnce() -> Transaction<Txdata<T>> + Send + Sync + 'static) -> bool {
440-
self.send_item(QueueItem::Deferred {
441-
prepare: Box::new(prepare),
442-
})
443-
}
444-
}
445-
446408
impl<T: Encode + 'static> History for Commitlog<Txdata<T>> {
447409
type TxData = Txdata<T>;
448410

crates/durability/src/imp/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ mod testing {
1515
use futures::FutureExt as _;
1616
use tokio::sync::watch;
1717

18-
use crate::{Close, Durability, DurableOffset, Transaction, TxOffset};
18+
use crate::{Close, Durability, DurableOffset, PreparedTx, TxOffset};
1919

2020
/// A [`Durability`] impl that sends all transactions into the void.
2121
///
@@ -41,7 +41,7 @@ mod testing {
4141
impl<T: Send + Sync> Durability for NoDurability<T> {
4242
type TxData = T;
4343

44-
fn append_tx(&self, _: Transaction<Self::TxData>) {
44+
fn append_tx(&self, _: PreparedTx<Self::TxData>) {
4545
if self.closed.load(Ordering::Relaxed) {
4646
panic!("`close` was called on this `NoDurability` instance");
4747
}

0 commit comments

Comments
 (0)