Skip to content

Commit afd3f35

Browse files
kimjsdt and clockwork-labs-bot
authored
core: Bounded channel for durability worker (#4652)
Use a bounded channel for submitting transactions to the durability layer, so as to apply backpressure to the db when transaction volume is too high. --------- Co-authored-by: Jeffrey Dallatezza <jeffreydallatezza@gmail.com> Co-authored-by: clockwork-labs-bot <clockwork-labs-bot@users.noreply.github.com>
1 parent b98c68c commit afd3f35

5 files changed

Lines changed: 74 additions & 23 deletions

File tree

crates/core/src/db/durability.rs

Lines changed: 42 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ use tokio::{
1515
runtime,
1616
sync::{
1717
futures::OwnedNotified,
18-
mpsc::{channel, unbounded_channel, Receiver, Sender, UnboundedReceiver, UnboundedSender},
18+
mpsc::{self, channel, Receiver, Sender},
1919
oneshot, Notify,
2020
},
2121
time::timeout,
@@ -69,7 +69,7 @@ type ShutdownReply = oneshot::Sender<OwnedNotified>;
6969
/// [RelationalDB]: crate::db::relational_db::RelationalDB
7070
pub struct DurabilityWorker {
7171
database: Identity,
72-
request_tx: UnboundedSender<DurabilityRequest>,
72+
request_tx: Sender<DurabilityRequest>,
7373
shutdown: Sender<ShutdownReply>,
7474
durability: Arc<Durability>,
7575
runtime: runtime::Handle,
@@ -86,7 +86,7 @@ impl DurabilityWorker {
8686
next_tx_offset: TxOffset,
8787
reorder_window_size: NonZeroUsize,
8888
) -> Self {
89-
let (request_tx, request_rx) = unbounded_channel();
89+
let (request_tx, request_rx) = channel(4 * 4096);
9090
let (shutdown_tx, shutdown_rx) = channel(1);
9191

9292
let actor = DurabilityWorkerActor {
@@ -123,8 +123,8 @@ impl DurabilityWorker {
123123
/// this method is responsible only for reading its decision out of the `tx_data`
124124
/// and calling `durability.append_tx`.
125125
///
126-
/// This method does not block,
127-
/// and sends the work to an actor that collects data and calls `durability.append_tx`.
126+
/// This method sends the work to an actor that collects data and calls `durability.append_tx`.
127+
/// It blocks if the queue is at capacity.
128128
///
129129
/// # Panics
130130
///
@@ -135,12 +135,40 @@ impl DurabilityWorker {
135135
/// - [Self::shutdown] was called
136136
///
137137
pub fn request_durability(&self, reducer_context: Option<ReducerContext>, tx_data: &Arc<TxData>) {
138-
self.request_tx
139-
.send(DurabilityRequest {
140-
reducer_context,
141-
tx_data: tx_data.clone(),
142-
})
143-
.unwrap_or_else(|_| panic!("durability actor vanished database={}", self.database));
138+
// We first try to send it without blocking.
139+
match self.request_tx.try_reserve() {
140+
Ok(permit) => {
141+
permit.send(DurabilityRequest {
142+
reducer_context,
143+
tx_data: tx_data.clone(),
144+
});
145+
}
146+
Err(mpsc::error::TrySendError::Closed(_)) => {
147+
panic!("durability actor vanished database={}", self.database);
148+
}
149+
Err(mpsc::error::TrySendError::Full(_)) => {
150+
// If the channel was full, we use the blocking version.
151+
let start = std::time::Instant::now();
152+
let send = || {
153+
self.request_tx.blocking_send(DurabilityRequest {
154+
reducer_context,
155+
tx_data: tx_data.clone(),
156+
})
157+
};
158+
if tokio::runtime::Handle::try_current().is_ok() {
159+
tokio::task::block_in_place(send)
160+
} else {
161+
send()
162+
}
163+
.unwrap_or_else(|_| panic!("durability actor vanished database={}", self.database));
164+
// We could cache this metric, but if we are already in the blocking code path,
165+
// the extra time of looking up the metric is probably negligible.
166+
WORKER_METRICS
167+
.durability_blocking_send_duration
168+
.with_label_values(&self.database)
169+
.observe(start.elapsed().as_secs_f64());
170+
}
171+
}
144172
}
145173

146174
/// Get the [`DurableOffset`] of this database.
@@ -281,8 +309,8 @@ impl<T> ReorderWindow<T> {
281309
}
282310
}
283311

284-
struct DurabilityWorkerActor {
285-
request_rx: UnboundedReceiver<DurabilityRequest>,
312+
pub struct DurabilityWorkerActor {
313+
request_rx: mpsc::Receiver<DurabilityRequest>,
286314
shutdown: Receiver<ShutdownReply>,
287315
durability: Arc<Durability>,
288316
reorder_window: ReorderWindow<DurabilityRequest>,
@@ -483,7 +511,7 @@ mod tests {
483511
}
484512
}
485513

486-
#[tokio::test]
514+
#[tokio::test(flavor = "multi_thread")]
487515
async fn shutdown_waits_until_durable() {
488516
let durability = Arc::new(CountingDurability::default());
489517
let worker = DurabilityWorker::new(

crates/core/src/subscription/module_subscription_actor.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4272,7 +4272,7 @@ mod tests {
42724272
Ok(())
42734273
}
42744274

4275-
#[tokio::test]
4275+
#[tokio::test(flavor = "multi_thread")]
42764276
async fn test_confirmed_reads() -> anyhow::Result<()> {
42774277
let (db, durability) = relational_db_with_manual_durability(tokio::runtime::Handle::current())?;
42784278

crates/core/src/worker_metrics/mod.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -481,6 +481,12 @@ metrics_group!(
481481
#[labels(db: Identity, scan_type: str, table: str, unindexed_columns: str)]
482482
pub subscription_queries_total: IntCounterVec,
483483

484+
#[name = spacetime_durability_blocking_send_duration_sec]
485+
#[help = "Latency of blocking sends in request_durability (seconds); _count gives the number of times the channel was full"]
486+
#[labels(database_identity: Identity)]
487+
#[buckets(0.001, 0.01, 0.1, 1.0, 10.0)]
488+
pub durability_blocking_send_duration: HistogramVec,
489+
484490
#[name = spacetime_durability_worker_reorder_window_length]
485491
#[help = "The number of transactions currently being held in the reorder window"]
486492
#[labels(db: Identity)]

crates/durability/src/imp/local.rs

Lines changed: 22 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,8 @@ pub struct Options {
3838
/// transactions that are currently in the queue, but shrink the buffer to
3939
/// `batch_capacity` if it had to make additional space during a burst.
4040
///
41+
/// The internal queue of [Local] is bounded to `4 * batch_capacity`.
42+
///
4143
/// Default: 4096
4244
pub batch_capacity: NonZeroUsize,
4345
/// [`Commitlog`] configuration.
@@ -87,8 +89,8 @@ pub struct Local<T> {
8789
/// Backlog of transactions to be written to disk by the background
8890
/// [`PersisterTask`].
8991
///
90-
/// Note that this is unbounded!
91-
queue: mpsc::UnboundedSender<Transaction<Txdata<T>>>,
92+
/// The queue is bounded to `4 * Options::batch_capacity`.
93+
queue: mpsc::Sender<Transaction<Txdata<T>>>,
9294
/// How many transactions are sitting in the `queue`.
9395
///
9496
/// This is mainly for observability purposes, and can thus be updated with
@@ -126,7 +128,7 @@ impl<T: Encode + Send + Sync + 'static> Local<T> {
126128
opts.commitlog,
127129
on_new_segment,
128130
)?);
129-
let (queue, txdata_rx) = mpsc::unbounded_channel();
131+
let (queue, txdata_rx) = mpsc::channel(4 * opts.batch_capacity.get());
130132
let queue_depth = Arc::new(AtomicU64::new(0));
131133
let (durable_tx, durable_rx) = watch::channel(clog.max_committed_offset());
132134
let (shutdown_tx, shutdown_rx) = mpsc::channel(1);
@@ -207,7 +209,7 @@ impl<T: Encode + Send + Sync + 'static> Actor<T> {
207209
#[instrument(name = "durability::local::actor", skip_all)]
208210
async fn run(
209211
self,
210-
mut transactions_rx: mpsc::UnboundedReceiver<Transaction<Txdata<T>>>,
212+
mut transactions_rx: mpsc::Receiver<Transaction<Txdata<T>>>,
211213
mut shutdown_rx: mpsc::Receiver<oneshot::Sender<OwnedNotified>>,
212214
) {
213215
info!("starting durability actor");
@@ -328,7 +330,22 @@ impl<T: Send + Sync + 'static> Durability for Local<T> {
328330
type TxData = Txdata<T>;
329331

330332
fn append_tx(&self, tx: Transaction<Self::TxData>) {
331-
self.queue.send(tx).expect("durability actor crashed");
333+
match self.queue.try_reserve() {
334+
Ok(permit) => permit.send(tx),
335+
Err(mpsc::error::TrySendError::Closed(_)) => {
336+
panic!("durability actor crashed");
337+
}
338+
Err(mpsc::error::TrySendError::Full(_)) => {
339+
let send = || self.queue.blocking_send(tx);
340+
if tokio::runtime::Handle::try_current().is_ok() {
341+
tokio::task::block_in_place(send)
342+
} else {
343+
send()
344+
}
345+
.expect("durability actor crashed");
346+
}
347+
}
348+
332349
self.queue_depth.fetch_add(1, Relaxed);
333350
}
334351

crates/durability/tests/io/fallocate.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ use tokio::{sync::watch, time::sleep};
4343

4444
const MB: u64 = 1024 * 1024;
4545

46-
#[tokio::test]
46+
#[tokio::test(flavor = "multi_thread")]
4747
async fn local_durability_cannot_be_created_if_not_enough_space() -> anyhow::Result<()> {
4848
enable_logging();
4949

@@ -72,7 +72,7 @@ async fn local_durability_cannot_be_created_if_not_enough_space() -> anyhow::Res
7272
// NOTE: This test is set up to proceed more or less sequentially.
7373
// In reality, `append_tx` will fail at some point in the future.
7474
// I.e. transactions can be lost when the host runs out of disk space.
75-
#[tokio::test]
75+
#[tokio::test(flavor = "multi_thread")]
7676
#[should_panic = "durability actor crashed"]
7777
async fn local_durability_crashes_on_new_segment_if_not_enough_space() {
7878
enable_logging();
@@ -120,7 +120,7 @@ async fn local_durability_crashes_on_new_segment_if_not_enough_space() {
120120
/// without `fallocate`.
121121
///
122122
/// Resuming a segment when there is insufficient space should fail.
123-
#[tokio::test]
123+
#[tokio::test(flavor = "multi_thread")]
124124
async fn local_durability_crashes_on_resume_with_insuffient_space() -> anyhow::Result<()> {
125125
enable_logging();
126126

0 commit comments

Comments
 (0)