11use std:: {
22 io,
33 num:: NonZeroUsize ,
4- path:: PathBuf ,
54 sync:: {
65 atomic:: { AtomicU64 , Ordering :: Relaxed } ,
7- Arc ,
6+ Arc , Mutex ,
87 } ,
98} ;
109
11- use futures:: { FutureExt as _, TryFutureExt as _ } ;
10+ use futures:: FutureExt as _;
1211use itertools:: Itertools as _;
1312use log:: { info, trace, warn} ;
1413use scopeguard:: ScopeGuard ;
@@ -17,8 +16,8 @@ use spacetimedb_fs_utils::lockfile::advisory::{LockError, LockedFile};
1716use spacetimedb_paths:: server:: ReplicaDir ;
1817use thiserror:: Error ;
1918use tokio:: {
20- sync:: { futures :: OwnedNotified , mpsc , oneshot , watch, Notify } ,
21- task:: { spawn_blocking, AbortHandle } ,
19+ sync:: watch,
20+ task:: { spawn_blocking, JoinHandle } ,
2221} ;
2322use tracing:: { instrument, Span } ;
2423
@@ -73,8 +72,6 @@ pub enum OpenError {
7372 Commitlog ( #[ from] io:: Error ) ,
7473}
7574
76- type ShutdownReply = oneshot:: Sender < OwnedNotified > ;
77-
7875/// [`Durability`] implementation backed by a [`Commitlog`] on local storage.
7976///
8077/// The commitlog is constrained to store the canonical [`Txdata`] payload,
@@ -104,10 +101,9 @@ pub struct Local<T> {
104101 /// This is mainly for observability purposes, and can thus be updated with
105102 /// relaxed memory ordering.
106103 queue_depth : Arc < AtomicU64 > ,
107- /// Channel to request the actor to exit.
108- shutdown : mpsc:: Sender < ShutdownReply > ,
109- /// [AbortHandle] to force cancellation of the [Actor].
110- abort : AbortHandle ,
104+ /// [JoinHandle] for the actor task. Contains `None` if already cancelled
105+ /// (via [Durability::close]).
106+ actor : Mutex < Option < JoinHandle < ( ) > > > ,
111107}
112108
113109impl < T : Encode + Send + Sync + ' static > Local < T > {
@@ -129,7 +125,7 @@ impl<T: Encode + Send + Sync + 'static> Local<T> {
129125
130126 // We could just place a lock on the commitlog directory,
131127 // yet for backwards-compatibility, we keep using the `db.lock` file.
132- let lock = Lock :: create ( replica_dir. 0 . join ( "db.lock" ) ) ?;
128+ let lock = LockedFile :: lock ( replica_dir. 0 . join ( "db.lock" ) ) ?;
133129
134130 let clog = Arc :: new ( Commitlog :: open (
135131 replica_dir. commit_log ( ) ,
@@ -140,31 +136,27 @@ impl<T: Encode + Send + Sync + 'static> Local<T> {
140136 let ( queue, txdata_rx) = async_channel:: bounded ( queue_capacity) ;
141137 let queue_depth = Arc :: new ( AtomicU64 :: new ( 0 ) ) ;
142138 let ( durable_tx, durable_rx) = watch:: channel ( clog. max_committed_offset ( ) ) ;
143- let ( shutdown_tx, shutdown_rx) = mpsc:: channel ( 1 ) ;
144139
145- let abort = rt
146- . spawn (
147- Actor {
148- clog : clog. clone ( ) ,
140+ let actor = rt. spawn (
141+ Actor {
142+ clog : clog. clone ( ) ,
149143
150- durable_offset : durable_tx,
151- queue_depth : queue_depth. clone ( ) ,
144+ durable_offset : durable_tx,
145+ queue_depth : queue_depth. clone ( ) ,
152146
153- batch_capacity : opts. batch_capacity ,
147+ batch_capacity : opts. batch_capacity ,
154148
155- lock,
156- }
157- . run ( txdata_rx, shutdown_rx) ,
158- )
159- . abort_handle ( ) ;
149+ lock,
150+ }
151+ . run ( txdata_rx) ,
152+ ) ;
160153
161154 Ok ( Self {
162155 clog,
163156 durable_offset : durable_rx,
164157 queue,
165- shutdown : shutdown_tx,
166158 queue_depth,
167- abort ,
159+ actor : Mutex :: new ( Some ( actor ) ) ,
168160 } )
169161 }
170162
@@ -211,16 +203,12 @@ struct Actor<T> {
211203 batch_capacity : NonZeroUsize ,
212204
213205 #[ allow( unused) ]
214- lock : Lock ,
206+ lock : LockedFile ,
215207}
216208
217209impl < T : Encode + Send + Sync + ' static > Actor < T > {
218210 #[ instrument( name = "durability::local::actor" , skip_all) ]
219- async fn run (
220- self ,
221- transactions_rx : async_channel:: Receiver < PreparedTx < Txdata < T > > > ,
222- mut shutdown_rx : mpsc:: Receiver < oneshot:: Sender < OwnedNotified > > ,
223- ) {
211+ async fn run ( self , transactions_rx : async_channel:: Receiver < PreparedTx < Txdata < T > > > ) {
224212 info ! ( "starting durability actor" ) ;
225213
226214 let mut tx_buf = Vec :: with_capacity ( self . batch_capacity . get ( ) ) ;
@@ -229,50 +217,37 @@ impl<T: Encode + Send + Sync + 'static> Actor<T> {
229217 let mut sync_on_exit = true ;
230218
231219 loop {
232- tokio:: select! {
233- // Biased towards the shutdown channel,
234- // so that we stop accepting new data promptly after
235- // `Durability::close` was called.
236- biased;
237-
238- Some ( reply) = shutdown_rx. recv( ) => {
239- transactions_rx. close( ) ;
240- let _ = reply. send( self . lock. notified( ) ) ;
241- } ,
242-
243- // Pop as many elements from the channel as possible,
244- // potentially requiring the `tx_buf` to allocate additional
245- // capacity.
246- // We'll reclaim capacity in excess of `self.batch_size` below.
247- n = recv_many( & transactions_rx, & mut tx_buf, usize :: MAX ) => {
248- if n == 0 {
249- break ;
250- }
251- if tx_buf. is_empty( ) {
252- continue ;
253- }
254-
255- let clog = self . clog. clone( ) ;
256- let ready_len = tx_buf. len( ) ;
257- self . queue_depth. fetch_sub( ready_len as u64 , Relaxed ) ;
258- tx_buf = spawn_blocking( move || -> io:: Result <Vec <PreparedTx <Txdata <T >>>> {
259- for tx in tx_buf. drain( ..) {
260- clog. commit( [ tx. into_transaction( ) ] ) ?;
261- }
262- Ok ( tx_buf)
263- } )
264- . await
265- . expect( "commitlog write panicked" )
266- . expect( "commitlog write failed" ) ;
267- if self . flush_and_sync( ) . await . is_err( ) {
268- sync_on_exit = false ;
269- break ;
270- }
271- // Reclaim burst capacity.
272- if n < self . batch_capacity. get( ) {
273- tx_buf. shrink_to( self . batch_capacity. get( ) ) ;
274- }
275- } ,
220+ // Pop as many elements from the channel as possible,
221+ // potentially requiring the `tx_buf` to allocate additional
222+ // capacity.
223+ // We'll reclaim capacity in excess of `self.batch_size` below.
224+ let n = recv_many ( & transactions_rx, & mut tx_buf, usize:: MAX ) . await ;
225+ if n == 0 {
226+ break ;
227+ }
228+ if tx_buf. is_empty ( ) {
229+ continue ;
230+ }
231+
232+ let clog = self . clog . clone ( ) ;
233+ let ready_len = tx_buf. len ( ) ;
234+ self . queue_depth . fetch_sub ( ready_len as u64 , Relaxed ) ;
235+ tx_buf = spawn_blocking ( move || -> io:: Result < Vec < PreparedTx < Txdata < T > > > > {
236+ for tx in tx_buf. drain ( ..) {
237+ clog. commit ( [ tx. into_transaction ( ) ] ) ?;
238+ }
239+ Ok ( tx_buf)
240+ } )
241+ . await
242+ . expect ( "commitlog write panicked" )
243+ . expect ( "commitlog write failed" ) ;
244+ if self . flush_and_sync ( ) . await . is_err ( ) {
245+ sync_on_exit = false ;
246+ break ;
247+ }
248+ // Reclaim burst capacity.
249+ if n < self . batch_capacity . get ( ) {
250+ tx_buf. shrink_to ( self . batch_capacity . get ( ) ) ;
276251 }
277252 }
278253
@@ -312,34 +287,6 @@ impl<T: Encode + Send + Sync + 'static> Actor<T> {
312287 }
313288}
314289
315- struct Lock {
316- file : Option < LockedFile > ,
317- notify_on_drop : Arc < Notify > ,
318- }
319-
320- impl Lock {
321- pub fn create ( path : PathBuf ) -> Result < Self , LockError > {
322- let file = LockedFile :: lock ( path) . map ( Some ) ?;
323- let notify_on_drop = Arc :: new ( Notify :: new ( ) ) ;
324-
325- Ok ( Self { file, notify_on_drop } )
326- }
327-
328- pub fn notified ( & self ) -> OwnedNotified {
329- self . notify_on_drop . clone ( ) . notified_owned ( )
330- }
331- }
332-
333- impl Drop for Lock {
334- fn drop ( & mut self ) {
335- // Ensure the file lock is dropped before notifying.
336- if let Some ( file) = self . file . take ( ) {
337- drop ( file) ;
338- }
339- self . notify_on_drop . notify_waiters ( ) ;
340- }
341- }
342-
343290impl < T : Send + Sync + ' static > Durability for Local < T > {
344291 type TxData = Txdata < T > ;
345292
@@ -356,26 +303,27 @@ impl<T: Send + Sync + 'static> Durability for Local<T> {
356303 info ! ( "close local durability" ) ;
357304
358305 let durable_offset = self . durable_tx_offset ( ) ;
359- let shutdown = self . shutdown . clone ( ) ;
306+ let maybe_actor = self . actor . lock ( ) . unwrap ( ) . take ( ) ;
360307 // Abort actor if shutdown future is dropped.
361- let abort = scopeguard:: guard ( self . abort . clone ( ) , |actor| {
362- warn ! ( "close future dropped, aborting durability actor" ) ;
363- actor. abort ( ) ;
364- } ) ;
365-
308+ let abort = scopeguard:: guard (
309+ maybe_actor. as_ref ( ) . map ( |join_handle| join_handle. abort_handle ( ) ) ,
310+ |maybe_abort_handle| {
311+ if let Some ( abort_handle) = maybe_abort_handle {
312+ warn ! ( "close future dropped, aborting durability actor" ) ;
313+ abort_handle. abort ( ) ;
314+ }
315+ } ,
316+ ) ;
317+ self . queue . close ( ) ;
366318 async move {
367- let ( done_tx, done_rx) = oneshot:: channel ( ) ;
368- // Ignore channel errors - those just mean the actor is already gone.
369- let _ = shutdown
370- . send ( done_tx)
371- . map_err ( drop)
372- . and_then ( |( ) | done_rx. map_err ( drop) )
373- . and_then ( |done| async move {
374- done. await ;
375- Ok ( ( ) )
376- } )
377- . await ;
378- // Don't abort if we completed normally.
319+ if let Some ( actor) = maybe_actor
320+ && let Err ( e) = actor. await
321+ {
322+ // Will print "durability actor: task was cancelled"
323+ // or "durability actor: task panicked [...]"
324+ warn ! ( "durability actor: {e}" ) ;
325+ }
326+ // Don't abort if the actor completed.
379327 let _ = ScopeGuard :: into_inner ( abort) ;
380328
381329 durable_offset. last_seen ( )
0 commit comments