1- use std:: { sync:: Arc , time:: Duration } ;
1+ use std:: { cmp :: Reverse , collections :: BinaryHeap , iter , num :: NonZeroUsize , sync:: Arc , time:: Duration } ;
22
33use futures:: TryFutureExt as _;
44use log:: { error, info} ;
5+ use prometheus:: IntGauge ;
56use spacetimedb_commitlog:: payload:: {
67 txdata:: { Mutations , Ops } ,
78 Txdata ,
89} ;
910use spacetimedb_datastore:: { execution_context:: ReducerContext , traits:: TxData } ;
1011use spacetimedb_durability:: { DurableOffset , Transaction , TxOffset } ;
1112use spacetimedb_lib:: Identity ;
13+ use thiserror:: Error ;
1214use tokio:: {
1315 runtime,
1416 sync:: {
@@ -18,8 +20,9 @@ use tokio::{
1820 } ,
1921 time:: timeout,
2022} ;
23+ use tracing:: { info_span, Instrument as _} ;
2124
22- use crate :: db:: persistence:: Durability ;
25+ use crate :: { db:: persistence:: Durability , worker_metrics :: WORKER_METRICS } ;
2326
2427/// A request to persist a transaction or to terminate the actor.
2528pub struct DurabilityRequest {
@@ -34,6 +37,36 @@ type ShutdownReply = oneshot::Sender<OwnedNotified>;
3437///
3538/// This exists to avoid holding a transaction lock while
3639/// preparing the [TxData] for processing by the [Durability] layer.
40+ ///
41+ /// The durability worker is internal to [RelationalDB], which calls
42+ /// [DurabilityWorker::request_durability] after committing a transaction.
43+ ///
44+ /// # Transaction ordering
45+ ///
46+ /// The backing datastore of [RelationalDB] is responsible for creating a total
47+ /// ordering of transactions and must uphold that [TxOffset]s are monotonically
48+ /// increasing without gaps.
49+ ///
/// However, [RelationalDB::commit_tx] and [RelationalDB::commit_tx_downgrade]
51+ /// may be called from multiple threads. Because those methods are not
52+ /// synchronized, and release the transaction lock before requesting durability,
53+ /// it is possible for [DurabilityRequest]s to appear slightly out-of-order on
54+ /// the worker channel.
55+ ///
56+ /// To mitigate this, the worker keeps a window of up to `reorder_window_size`
57+ /// requests if out-of-order requests are detected, and flushes it to the
58+ /// underlying durability layer once it is able to linearize the offset sequence.
59+ ///
60+ /// Since we expect out-of-order requests to happen very rarely, this measure
61+ /// should not negatively impact throughput in the common case, unlike holding
62+ /// the transaction lock until request submission is complete.
63+ ///
64+ /// Note that the commitlog rejects out-of-order commits, so if a missing offset
65+ /// arrives outside `reorder_window_size` (or never), already committed
66+ /// transactions may be lost (by way of the durability worker crashing).
67+ /// Those transactions will not be confirmed, however, so this is safe.
68+ ///
69+ /// [RelationalDB]: crate::db::relational_db::RelationalDB
3770pub struct DurabilityWorker {
3871 database : Identity ,
3972 request_tx : UnboundedSender < DurabilityRequest > ,
@@ -46,17 +79,31 @@ impl DurabilityWorker {
4679 /// Create a new [`DurabilityWorker`] using the given `durability` policy.
4780 ///
/// Background tasks will be spawned onto the provided tokio `runtime`.
49- pub fn new ( database : Identity , durability : Arc < Durability > , runtime : runtime:: Handle ) -> Self {
82+ pub fn new (
83+ database : Identity ,
84+ durability : Arc < Durability > ,
85+ runtime : runtime:: Handle ,
86+ next_tx_offset : TxOffset ,
87+ reorder_window_size : NonZeroUsize ,
88+ ) -> Self {
5089 let ( request_tx, request_rx) = unbounded_channel ( ) ;
5190 let ( shutdown_tx, shutdown_rx) = channel ( 1 ) ;
5291
5392 let actor = DurabilityWorkerActor {
5493 request_rx,
5594 shutdown : shutdown_rx,
5695 durability : durability. clone ( ) ,
96+ reorder_window : ReorderWindow :: new ( next_tx_offset, reorder_window_size) ,
97+ reorder_window_len : WORKER_METRICS
98+ . durability_worker_reorder_window_length
99+ . with_label_values ( & database) ,
57100 } ;
58101 let _enter = runtime. enter ( ) ;
59- tokio:: spawn ( actor. run ( ) ) ;
102+ tokio:: spawn (
103+ actor
104+ . run ( )
105+ . instrument ( info_span ! ( "durability_worker" , database = %database) ) ,
106+ ) ;
60107
61108 Self {
62109 database,
@@ -162,16 +209,97 @@ impl DurabilityWorker {
162209 }
163210}
164211
165- pub struct DurabilityWorkerActor {
212+ #[ derive( Debug , Error ) ]
213+ enum ReorderError {
214+ #[ error( "reordering window exceeded" ) ]
215+ SizeExceeded ,
216+ #[ error( "transaction offset behind expected offset" ) ]
217+ TxBehind ,
218+ }
219+
220+ /// A bounded collection of elements ordered by [TxOffset], backed by a [BinaryHeap].
221+ ///
222+ /// This exists to tolerate slightly out-of-order requests.
223+ /// See the struct docs for [DurabilityWorker] for more context.
224+ struct ReorderWindow < T > {
225+ heap : BinaryHeap < Reverse < TxOrdered < T > > > ,
226+ next_tx : TxOffset ,
227+ max_len : NonZeroUsize ,
228+ }
229+
230+ impl < T > ReorderWindow < T > {
231+ pub fn new ( next_tx : TxOffset , max_len : NonZeroUsize ) -> Self {
232+ // We expect that requests usually arrive in order,
233+ // so allocate only a single element for the common case.
234+ let heap = BinaryHeap :: with_capacity ( 1 ) ;
235+ Self { heap, next_tx, max_len }
236+ }
237+
238+ /// Push a durability request onto the heap.
239+ ///
240+ /// # Errors
241+ ///
242+ /// The method returns an error if:
243+ ///
244+ /// - the window is full, i.e. `self.len() >= self.max_len`
245+ /// - the `tx_offset` of the request is smaller than the next expected offset
246+ ///
247+ pub fn push ( & mut self , req : TxOrdered < T > ) -> Result < ( ) , ReorderError > {
248+ if self . len ( ) >= self . max_len . get ( ) {
249+ return Err ( ReorderError :: SizeExceeded ) ;
250+ }
251+ if req. tx_offset < self . next_tx {
252+ return Err ( ReorderError :: TxBehind ) ;
253+ }
254+ // We've got an out-of-order request,
255+ // eagerly allocate the max capacity.
256+ if self . len ( ) > 0 {
257+ self . heap . reserve_exact ( self . max_len . get ( ) ) ;
258+ }
259+ self . heap . push ( Reverse ( req) ) ;
260+
261+ Ok ( ( ) )
262+ }
263+
264+ /// Remove all [DurabilityRequest]s in order, until a gap in the offset
265+ /// sequence is detected or the heap is empty.
266+ pub fn drain ( & mut self ) -> impl Iterator < Item = T > {
267+ iter:: from_fn ( || {
268+ let min_tx_offset = self . heap . peek ( ) . map ( |Reverse ( x) | x. tx_offset ) ;
269+ if min_tx_offset. is_some_and ( |tx_offset| tx_offset == self . next_tx ) {
270+ let Reverse ( TxOrdered { inner : request, .. } ) = self . heap . pop ( ) . unwrap ( ) ;
271+ self . next_tx += 1 ;
272+ Some ( request)
273+ } else {
274+ None
275+ }
276+ } )
277+ }
278+
279+ pub fn len ( & self ) -> usize {
280+ self . heap . len ( )
281+ }
282+ }
283+
284+ struct DurabilityWorkerActor {
166285 request_rx : UnboundedReceiver < DurabilityRequest > ,
167286 shutdown : Receiver < ShutdownReply > ,
168287 durability : Arc < Durability > ,
288+ reorder_window : ReorderWindow < DurabilityRequest > ,
289+ reorder_window_len : IntGauge ,
169290}
170291
171292impl DurabilityWorkerActor {
172293 /// Processes requests to do durability.
173294 async fn run ( mut self ) {
174- let done = scopeguard:: guard ( Arc :: new ( Notify :: new ( ) ) , |done| done. notify_waiters ( ) ) ;
295+ // When this future completes or is cancelled, ensure that:
296+ // - shutdown waiters are notified
297+ // - metrics are reset
298+ let done = scopeguard:: guard ( Arc :: new ( Notify :: new ( ) ) , |done| {
299+ done. notify_waiters ( ) ;
300+ self . reorder_window_len . set ( 0 ) ;
301+ } ) ;
302+
175303 loop {
176304 tokio:: select! {
177305 // Biased towards the shutdown channel,
@@ -184,26 +312,44 @@ impl DurabilityWorkerActor {
184312 } ,
185313
186314 req = self . request_rx. recv( ) => {
187- let Some ( DurabilityRequest { reducer_context , tx_data } ) = req else {
315+ let Some ( request ) = req else {
188316 break ;
189317 } ;
190- Self :: do_durability( & * self . durability, reducer_context, & tx_data) ;
318+ match request. tx_data. tx_offset( ) {
319+ // Drop the request if it doesn't have a tx offset.
320+ None => {
321+ let name = request. reducer_context. as_ref( ) . map( |rcx| & rcx. name) ;
322+ debug_assert!(
323+ !request. tx_data. has_rows_or_connect_disconnect( name) ,
324+ "tx_data has no rows but has connect/disconnect: `{name:?}`"
325+ ) ;
326+ } ,
327+ // Otherwise, push to the reordering window.
328+ Some ( tx_offset) => {
329+ let request = TxOrdered { tx_offset, inner: request } ;
330+ if let Err ( e) = self . reorder_window. push( request) {
331+ error!( "{e}" ) ;
332+ break ;
333+ }
334+ } ,
335+ }
191336 }
192337 }
338+
339+ // Drain all requests that are properly ordered.
340+ self . reorder_window
341+ . drain ( )
342+ . for_each ( |request| Self :: do_durability ( & * self . durability , request. reducer_context , & request. tx_data ) ) ;
343+ self . reorder_window_len . set ( self . reorder_window . len ( ) as _ ) ;
193344 }
194345
195346 info ! ( "durability worker actor done" ) ;
196347 }
197348
198349 pub fn do_durability ( durability : & Durability , reducer_context : Option < ReducerContext > , tx_data : & TxData ) {
199- let Some ( tx_offset) = tx_data. tx_offset ( ) else {
200- let name = reducer_context. as_ref ( ) . map ( |rcx| & rcx. name ) ;
201- debug_assert ! (
202- !tx_data. has_rows_or_connect_disconnect( name) ,
203- "tx_data has no rows but has connect/disconnect: `{name:?}`"
204- ) ;
205- return ;
206- } ;
350+ let tx_offset = tx_data
351+ . tx_offset ( )
352+ . expect ( "txs without offset should have been dropped" ) ;
207353
208354 let mut inserts: Box < _ > = tx_data
209355 . persistent_inserts ( )
@@ -248,6 +394,33 @@ impl DurabilityWorkerActor {
248394 }
249395}
250396
397+ /// Wrapper to sort [DurabilityRequest]s by [TxOffset].
398+ struct TxOrdered < T > {
399+ tx_offset : TxOffset ,
400+ inner : T ,
401+ }
402+
403+ impl < T > PartialEq for TxOrdered < T > {
404+ fn eq ( & self , other : & Self ) -> bool {
405+ self . tx_offset == other. tx_offset
406+ }
407+ }
408+
409+ impl < T > Eq for TxOrdered < T > { }
410+
411+ #[ allow( clippy:: non_canonical_partial_ord_impl) ]
412+ impl < T > PartialOrd for TxOrdered < T > {
413+ fn partial_cmp ( & self , other : & Self ) -> Option < std:: cmp:: Ordering > {
414+ Some ( self . tx_offset . cmp ( & other. tx_offset ) )
415+ }
416+ }
417+
418+ impl < T > Ord for TxOrdered < T > {
419+ fn cmp ( & self , other : & Self ) -> std:: cmp:: Ordering {
420+ self . partial_cmp ( other) . unwrap ( )
421+ }
422+ }
423+
251424#[ cfg( test) ]
252425mod tests {
253426 use std:: { pin:: pin, task:: Poll } ;
@@ -313,8 +486,13 @@ mod tests {
313486 #[ tokio:: test]
314487 async fn shutdown_waits_until_durable ( ) {
315488 let durability = Arc :: new ( CountingDurability :: default ( ) ) ;
316- let worker = DurabilityWorker :: new ( Identity :: ONE , durability. clone ( ) , runtime:: Handle :: current ( ) ) ;
317-
489+ let worker = DurabilityWorker :: new (
490+ Identity :: ONE ,
491+ durability. clone ( ) ,
492+ runtime:: Handle :: current ( ) ,
493+ 0 ,
494+ NonZeroUsize :: new ( 1 ) . unwrap ( ) ,
495+ ) ;
318496 for i in 0 ..=10 {
319497 let mut txdata = TxData :: default ( ) ;
320498 txdata. set_tx_offset ( i) ;
@@ -351,4 +529,68 @@ mod tests {
351529 "durability should have appended up to tx offset 10"
352530 ) ;
353531 }
532+
533+ #[ test]
534+ fn reorder_window_sorts_by_tx_offset ( ) {
535+ let mut win = ReorderWindow :: new ( 0 , NonZeroUsize :: new ( 5 ) . unwrap ( ) ) ;
536+
537+ for tx_offset in ( 0 ..5 ) . rev ( ) {
538+ win. push ( TxOrdered {
539+ tx_offset,
540+ inner : tx_offset,
541+ } )
542+ . unwrap ( ) ;
543+ }
544+
545+ let txs = win. drain ( ) . collect :: < Vec < _ > > ( ) ;
546+ assert_eq ! ( txs, & [ 0 , 1 , 2 , 3 , 4 ] ) ;
547+ }
548+
549+ #[ test]
550+ fn reorder_window_stops_drain_at_gap ( ) {
551+ let mut win = ReorderWindow :: new ( 0 , NonZeroUsize :: new ( 5 ) . unwrap ( ) ) ;
552+
553+ win. push ( TxOrdered { tx_offset : 4 , inner : 4 } ) . unwrap ( ) ;
554+ assert ! ( win. drain( ) . collect:: <Vec <_>>( ) . is_empty( ) ) ;
555+
556+ for tx_offset in 0 ..4 {
557+ win. push ( TxOrdered {
558+ tx_offset,
559+ inner : tx_offset,
560+ } )
561+ . unwrap ( ) ;
562+ }
563+
564+ let txs = win. drain ( ) . collect :: < Vec < _ > > ( ) ;
565+ assert_eq ! ( & txs, & [ 0 , 1 , 2 , 3 , 4 ] ) ;
566+ }
567+
568+ #[ test]
569+ fn reorder_window_error_when_full ( ) {
570+ let mut win = ReorderWindow :: new ( 0 , NonZeroUsize :: new ( 1 ) . unwrap ( ) ) ;
571+ win. push ( TxOrdered {
572+ tx_offset : 0 ,
573+ inner : ( ) ,
574+ } )
575+ . unwrap ( ) ;
576+ assert_matches ! (
577+ win. push( TxOrdered {
578+ tx_offset: 1 ,
579+ inner: ( )
580+ } ) ,
581+ Err ( ReorderError :: SizeExceeded )
582+ ) ;
583+ }
584+
585+ #[ test]
586+ fn reorder_window_error_on_late_request ( ) {
587+ let mut win = ReorderWindow :: new ( 1 , NonZeroUsize :: new ( 5 ) . unwrap ( ) ) ;
588+ assert_matches ! (
589+ win. push( TxOrdered {
590+ tx_offset: 0 ,
591+ inner: ( )
592+ } ) ,
593+ Err ( ReorderError :: TxBehind )
594+ ) ;
595+ }
354596}
0 commit comments