Skip to content

Commit e34b8b2

Browse files
Drop datastore lock after requesting durability
1 parent 643cea5 commit e34b8b2

File tree

8 files changed

+99
-353
lines changed

8 files changed

+99
-353
lines changed

crates/core/src/db/durability.rs

Lines changed: 24 additions & 96 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,14 @@
1-
use std::{num::NonZeroUsize, sync::Arc, time::Duration};
1+
use std::{sync::Arc, time::Duration};
22

33
use futures::TryFutureExt as _;
44
use log::{error, info};
5-
use prometheus::IntGauge;
65
use spacetimedb_commitlog::payload::{
76
txdata::{Mutations, Ops},
87
Txdata,
98
};
109
use spacetimedb_datastore::{execution_context::ReducerContext, traits::TxData};
1110
use spacetimedb_durability::Durability as _;
12-
use spacetimedb_durability::{DurableOffset, ReorderWindow, Transaction, TxOffset};
11+
use spacetimedb_durability::{DurableOffset, Transaction, TxOffset};
1312
use spacetimedb_lib::Identity;
1413
use spacetimedb_sats::ProductValue;
1514
use tokio::{
@@ -39,36 +38,8 @@ type ShutdownReply = oneshot::Sender<OwnedNotified>;
3938
/// Represents a handle to a background task that persists transactions
4039
/// according to the [`Durability`] policy provided.
4140
///
42-
/// This exists to avoid holding a transaction lock while
43-
/// preparing the [TxData] for processing by the [Durability] layer.
44-
///
45-
/// The durability worker is internal to [RelationalDB], which calls
46-
/// [DurabilityWorker::request_durability] after committing a transaction.
47-
///
48-
/// # Transaction ordering
49-
///
50-
/// The backing datastore of [RelationalDB] is responsible for creating a total
51-
/// ordering of transactions and must uphold that [TxOffset]s are monotonically
52-
/// increasing without gaps.
53-
///
54-
/// However, [RelationalDB::commit_tx] respectively [RelationalDB::commit_tx_downgrade]
55-
/// may be called from multiple threads. Because those methods are not
56-
/// synchronized, and release the transaction lock before requesting durability,
57-
/// it is possible for [DurabilityRequest]s to appear slightly out-of-order on
58-
/// the worker channel.
59-
///
60-
/// To mitigate this, the worker keeps a window of up to `reorder_window_size`
61-
/// requests if out-of-order requests are detected, and flushes it to the
62-
/// underlying durability layer once it is able to linearize the offset sequence.
63-
///
64-
/// Since we expect out-of-order requests to happen very rarely, this measure
65-
/// should not negatively impact throughput in the common case, unlike holding
66-
/// the transaction lock until request submission is complete.
67-
///
68-
/// Note that the commitlog rejects out-of-order commits, so if a missing offset
69-
/// arrives outside `reorder_window_size` (or never), already committed
70-
/// transactions may be lost (by way of the durability worker crashing).
71-
/// Those transactions will not be confirmed, however, so this is safe.
41+
/// The durability worker is internal to [RelationalDB], which uses
42+
/// [DurabilityWorker::request_durability] while finalizing a transaction.
7243
///
7344
/// [RelationalDB]: crate::db::relational_db::RelationalDB
7445
pub struct DurabilityWorker {
@@ -92,24 +63,14 @@ impl DurabilityWorker {
9263
/// Create a new [`DurabilityWorker`] using the given `durability` policy.
9364
///
9465
/// Background tasks will be spawned onto to provided tokio `runtime`.
95-
pub fn new(
96-
database: Identity,
97-
durability: Arc<Durability>,
98-
runtime: runtime::Handle,
99-
next_tx_offset: TxOffset,
100-
reorder_window_size: NonZeroUsize,
101-
) -> Self {
66+
pub fn new(database: Identity, durability: Arc<Durability>, runtime: runtime::Handle) -> Self {
10267
let (request_tx, request_rx) = channel(4 * 4096);
10368
let (shutdown_tx, shutdown_rx) = channel(1);
10469

10570
let actor = DurabilityWorkerActor {
10671
request_rx,
10772
shutdown: shutdown_rx,
10873
durability: durability.clone(),
109-
reorder_window: ReorderWindow::new(next_tx_offset, reorder_window_size),
110-
reorder_window_len: WORKER_METRICS
111-
.durability_worker_reorder_window_length
112-
.with_label_values(&database),
11374
};
11475
let _enter = runtime.enter();
11576
tokio::spawn(
@@ -129,9 +90,7 @@ impl DurabilityWorker {
12990
}
13091
}
13192

132-
/// Create a [`DurabilityWorker`] that uses the local commitlog durability
133-
/// actor directly. This removes the extra core durability actor so the
134-
/// local path has only one queued background worker.
93+
/// Create a [`DurabilityWorker`] that uses the local commitlog durability actor directly.
13594
pub fn new_local(database: Identity, durability: LocalDurability, runtime: runtime::Handle) -> Self {
13695
Self {
13796
database,
@@ -161,6 +120,15 @@ impl DurabilityWorker {
161120
/// - [Self::shutdown] was called
162121
///
163122
pub fn request_durability(&self, reducer_context: Option<ReducerContext>, tx_data: &Arc<TxData>) {
123+
let Some(tx_offset) = tx_data.tx_offset() else {
124+
let name = reducer_context.as_ref().map(|rcx| &rcx.name);
125+
debug_assert!(
126+
!tx_data.has_rows_or_connect_disconnect(name),
127+
"tx_data has no rows but has connect/disconnect: `{name:?}`"
128+
);
129+
return;
130+
};
131+
164132
match &self.inner {
165133
DurabilityWorkerInner::Generic { request_tx, .. } => {
166134
// We first try to send it without blocking.
@@ -199,20 +167,10 @@ impl DurabilityWorker {
199167
}
200168
}
201169
DurabilityWorkerInner::Local { durability } => {
202-
let Some(tx_offset) = tx_data.tx_offset() else {
203-
let name = reducer_context.as_ref().map(|rcx| &rcx.name);
204-
debug_assert!(
205-
!tx_data.has_rows_or_connect_disconnect(name),
206-
"tx_data has no rows but has connect/disconnect: `{name:?}`"
207-
);
208-
return;
209-
};
210-
211170
let start = std::time::Instant::now();
212171
let tx_data = tx_data.clone();
213-
let blocked = durability.append_tx_deferred(tx_offset, move || {
214-
prepare_tx_data_for_durability(tx_offset, reducer_context, &tx_data)
215-
});
172+
let blocked = durability
173+
.append_tx_deferred(move || prepare_tx_data_for_durability(tx_offset, reducer_context, &tx_data));
216174
if blocked {
217175
WORKER_METRICS
218176
.durability_blocking_send_duration
@@ -302,20 +260,15 @@ pub struct DurabilityWorkerActor {
302260
request_rx: mpsc::Receiver<DurabilityRequest>,
303261
shutdown: Receiver<ShutdownReply>,
304262
durability: Arc<Durability>,
305-
reorder_window: ReorderWindow<DurabilityRequest>,
306-
reorder_window_len: IntGauge,
307263
}
308264

309265
impl DurabilityWorkerActor {
310266
/// Processes requests to do durability.
311267
async fn run(mut self) {
312268
// When this future completes or is cancelled, ensure that:
313269
// - shutdown waiters are notified
314-
// - metrics are reset
315-
let done = scopeguard::guard(Arc::new(Notify::new()), |done| {
316-
done.notify_waiters();
317-
self.reorder_window_len.set(0);
318-
});
270+
let done = scopeguard::guard(Arc::new(Notify::new()), |done| done.notify_waiters());
271+
let mut request_buf = Vec::with_capacity(4096);
319272

320273
loop {
321274
tokio::select! {
@@ -328,35 +281,16 @@ impl DurabilityWorkerActor {
328281
let _ = reply.send(done.clone().notified_owned());
329282
},
330283

331-
req = self.request_rx.recv() => {
332-
let Some(request) = req else {
284+
n = self.request_rx.recv_many(&mut request_buf, usize::MAX) => {
285+
if n == 0 {
333286
break;
334-
};
335-
match request.tx_data.tx_offset() {
336-
// Drop the request if it doesn't have a tx offset.
337-
None => {
338-
let name = request.reducer_context.as_ref().map(|rcx| &rcx.name);
339-
debug_assert!(
340-
!request.tx_data.has_rows_or_connect_disconnect(name),
341-
"tx_data has no rows but has connect/disconnect: `{name:?}`"
342-
);
343-
},
344-
// Otherwise, push to the reordering window.
345-
Some(tx_offset) => {
346-
if let Err(e) = self.reorder_window.push(tx_offset, request) {
347-
error!("{e}");
348-
break;
349-
}
350-
},
351287
}
352288
}
353289
}
354290

355-
// Drain all requests that are properly ordered.
356-
self.reorder_window
357-
.drain()
291+
request_buf
292+
.drain(..)
358293
.for_each(|request| Self::do_durability(&*self.durability, request.reducer_context, &request.tx_data));
359-
self.reorder_window_len.set(self.reorder_window.len() as _);
360294
}
361295

362296
info!("durability worker actor done");
@@ -483,13 +417,7 @@ mod tests {
483417
#[tokio::test(flavor = "multi_thread")]
484418
async fn shutdown_waits_until_durable() {
485419
let durability = Arc::new(CountingDurability::default());
486-
let worker = DurabilityWorker::new(
487-
Identity::ONE,
488-
durability.clone(),
489-
runtime::Handle::current(),
490-
0,
491-
NonZeroUsize::new(1).unwrap(),
492-
);
420+
let worker = DurabilityWorker::new(Identity::ONE, durability.clone(), runtime::Handle::current());
493421
for i in 0..=10 {
494422
let mut txdata = TxData::default();
495423
txdata.set_tx_offset(i);

crates/core/src/db/persistence.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,9 +30,10 @@ pub type DiskSizeFn = Arc<dyn Fn() -> io::Result<SizeOnDisk> + Send + Sync>;
3030
pub struct Persistence {
3131
/// The [Durability] to use, for persisting transactions.
3232
pub durability: Arc<Durability>,
33-
/// The concrete local durability handle, when the database uses the built-in
34-
/// local commitlog path. This allows the core DB layer to bypass helper
35-
/// actors on the local hot path without changing the generic trait object.
33+
/// TODO: Merge this local durability handle with the generic trait object above.
34+
/// This allows us to bypass an actor whose only responsibility is to generate a
35+
/// commitlog payload from `TxData`. Ultimately though this should just be a part
36+
/// of the [Durability] implementation itself.
3637
pub local_durability: Option<LocalDurability>,
3738
/// The [DiskSizeFn].
3839
///

crates/core/src/db/relational_db.rs

Lines changed: 13 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,6 @@ use spacetimedb_table::table::{RowRef, TableScanIter};
5858
use spacetimedb_table::table_index::IndexKey;
5959
use std::borrow::Cow;
6060
use std::io;
61-
use std::num::NonZeroUsize;
6261
use std::ops::RangeBounds;
6362
use std::sync::Arc;
6463
use tokio::sync::watch;
@@ -122,7 +121,6 @@ pub struct RelationalDB {
122121
// this value, later introduction of dynamic configuration will allow the
123122
// compiler to find external dependencies.
124123
pub const SNAPSHOT_FREQUENCY: u64 = 1_000_000;
125-
const GENERIC_DURABILITY_REORDER_WINDOW_SIZE: NonZeroUsize = NonZeroUsize::new(8).unwrap();
126124

127125
impl std::fmt::Debug for RelationalDB {
128126
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
@@ -157,21 +155,7 @@ impl RelationalDB {
157155
(Some(local_durability), _, Some(rt)) => {
158156
Some(DurabilityWorker::new_local(database_identity, local_durability, rt))
159157
}
160-
(None, Some(durability), Some(rt)) => {
161-
let next_tx_offset = {
162-
let tx = inner.begin_tx(Workload::Internal);
163-
let next_tx_offset = tx.tx_offset();
164-
let _ = inner.release_tx(tx);
165-
next_tx_offset.into_inner()
166-
};
167-
Some(DurabilityWorker::new(
168-
database_identity,
169-
durability,
170-
rt,
171-
next_tx_offset,
172-
GENERIC_DURABILITY_REORDER_WINDOW_SIZE,
173-
))
174-
}
158+
(None, Some(durability), Some(rt)) => Some(DurabilityWorker::new(database_identity, durability, rt)),
175159
_ => None,
176160
};
177161

@@ -826,33 +810,33 @@ impl RelationalDB {
826810

827811
let reducer_context = tx.ctx.reducer_context().cloned();
828812
// TODO: Never returns `None` -- should it?
829-
let Some((tx_offset, tx_data, tx_metrics, reducer)) = self.inner.commit_mut_tx(tx)? else {
813+
let Some((tx_offset, tx_data, tx_metrics, reducer)) = self.inner.commit_mut_tx_and_then(tx, |tx_data| {
814+
if let Some(durability) = &self.durability {
815+
durability.request_durability(reducer_context, tx_data);
816+
}
817+
})?
818+
else {
830819
return Ok(None);
831820
};
832821

833822
self.maybe_do_snapshot(&tx_data);
834823

835-
let tx_data = Arc::new(tx_data);
836-
if let Some(durability) = &self.durability {
837-
durability.request_durability(reducer_context, &tx_data);
838-
}
839-
840824
Ok(Some((tx_offset, tx_data, tx_metrics, reducer)))
841825
}
842826

843827
#[tracing::instrument(level = "trace", skip_all)]
844828
pub fn commit_tx_downgrade(&self, tx: MutTx, workload: Workload) -> (Arc<TxData>, TxMetrics, Tx) {
845829
log::trace!("COMMIT MUT TX");
846830

847-
let (tx_data, tx_metrics, tx) = self.inner.commit_mut_tx_downgrade(tx, workload);
831+
let reducer_context = tx.ctx.reducer_context().cloned();
832+
let (tx_data, tx_metrics, tx) = self.inner.commit_mut_tx_downgrade_and_then(tx, workload, |tx_data| {
833+
if let Some(durability) = &self.durability {
834+
durability.request_durability(reducer_context, tx_data);
835+
}
836+
});
848837

849838
self.maybe_do_snapshot(&tx_data);
850839

851-
let tx_data = Arc::new(tx_data);
852-
if let Some(durability) = &self.durability {
853-
durability.request_durability(tx.ctx.reducer_context().cloned(), &tx_data);
854-
}
855-
856840
(tx_data, tx_metrics, tx)
857841
}
858842

crates/core/src/worker_metrics/mod.rs

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -491,11 +491,6 @@ metrics_group!(
491491
#[labels(database_identity: Identity)]
492492
#[buckets(0.001, 0.01, 0.1, 1.0, 10.0)]
493493
pub durability_blocking_send_duration: HistogramVec,
494-
495-
#[name = spacetime_durability_worker_reorder_window_length]
496-
#[help = "The number of transactions currently being held in the reorder window"]
497-
#[labels(db: Identity)]
498-
pub durability_worker_reorder_window_length: IntGaugeVec,
499494
}
500495
);
501496

crates/datastore/src/locking_tx_datastore/datastore.rs

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -968,6 +968,27 @@ impl Locking {
968968
pub fn commit_mut_tx_downgrade(&self, tx: MutTxId, workload: Workload) -> (TxData, TxMetrics, TxId) {
969969
tx.commit_downgrade(workload)
970970
}
971+
972+
/// Commit `tx` and invoke `before_release` while the write lock is still held.
973+
#[allow(clippy::type_complexity)]
974+
pub fn commit_mut_tx_and_then(
975+
&self,
976+
tx: MutTxId,
977+
before_release: impl FnOnce(&Arc<TxData>),
978+
) -> Result<Option<(TxOffset, Arc<TxData>, TxMetrics, Option<ReducerName>)>> {
979+
Ok(Some(tx.commit_and_then(before_release)))
980+
}
981+
982+
/// Commit `tx`, invoke `before_downgrade` while the write lock is still held,
983+
/// then downgrade the lock to a read-only transaction.
984+
pub fn commit_mut_tx_downgrade_and_then(
985+
&self,
986+
tx: MutTxId,
987+
workload: Workload,
988+
before_downgrade: impl FnOnce(&Arc<TxData>),
989+
) -> (Arc<TxData>, TxMetrics, TxId) {
990+
tx.commit_downgrade_and_then(workload, before_downgrade)
991+
}
971992
}
972993

973994
#[derive(Debug, Error)]

0 commit comments

Comments
 (0)