@@ -98,15 +98,15 @@ struct PendingMessage {
9898}
9999
100100/// Per-target-database delivery state.
101- struct TargetState {
101+ struct DatabaseQueue {
102102 queue : VecDeque < PendingMessage > ,
103103 /// When `Some`, this target is in backoff and should not be retried until this instant.
104104 blocked_until : Option < Instant > ,
105105 /// Current backoff duration for this target (doubles on each transport error).
106106 backoff : Duration ,
107107}
108108
109- impl TargetState {
109+ impl DatabaseQueue {
110110 fn new ( ) -> Self {
111111 Self {
112112 queue : VecDeque :: new ( ) ,
@@ -141,7 +141,7 @@ enum DeliveryOutcome {
141141 ReducerError ( String ) ,
142142 /// Budget exceeded (HTTP 402).
143143 BudgetExceeded ,
144- /// Transport error: network failure, unexpected HTTP status, etc. Caller should retry.
144+ /// Transport error: database/reducer not found, network failure, unexpected HTTP status, etc. Caller should retry.
145145 TransportError ( String ) ,
146146}
147147
@@ -155,21 +155,21 @@ async fn run_idc_loop(
155155 let client = reqwest:: Client :: new ( ) ;
156156
157157 // Per-target-database delivery state.
158- let mut targets : HashMap < Identity , TargetState > = HashMap :: new ( ) ;
158+ let mut db_queues : HashMap < Identity , DatabaseQueue > = HashMap :: new ( ) ;
159159
160160 // On startup, load any pending messages that survived a restart.
161- load_pending_into_targets ( & db, & mut targets ) ;
161+ load_pending_into_targets ( & db, & mut db_queues ) ;
162162
163163 loop {
164164 // Deliver one message per ready target, then re-check.
165165 let mut any_delivered = true ;
166166 while any_delivered {
167167 any_delivered = false ;
168- for state in targets . values_mut ( ) {
169- if !state . is_ready ( ) {
168+ for queue in db_queues . values_mut ( ) {
169+ if !queue . is_ready ( ) {
170170 continue ;
171171 }
172- let Some ( msg) = state . queue . front ( ) . cloned ( ) else {
172+ let Some ( msg) = queue . queue . front ( ) . cloned ( ) else {
173173 continue ;
174174 } ;
175175 let outcome = attempt_delivery ( & client, & config, & msg) . await ;
@@ -180,12 +180,12 @@ async fn run_idc_loop(
180180 msg. msg_id,
181181 msg. target_db_identity. to_hex( ) ,
182182 ) ;
183- state . record_transport_error ( ) ;
183+ queue . record_transport_error ( ) ;
184184 // Do NOT pop the front — keep retrying this message for this target.
185185 }
186186 outcome => {
187- state . queue . pop_front ( ) ;
188- state . record_success ( ) ;
187+ queue . queue . pop_front ( ) ;
188+ queue . record_success ( ) ;
189189 any_delivered = true ;
190190 let ( result_status, result_payload) = outcome_to_result ( & outcome) ;
191191 finalize_message ( & db, & module_host, & msg, result_status, result_payload) . await ;
@@ -195,7 +195,7 @@ async fn run_idc_loop(
195195 }
196196
197197 // Compute how long to sleep: min over all blocked targets' unblock times.
198- let next_unblock = targets
198+ let next_unblock = db_queues
199199 . values ( )
200200 . filter_map ( |s| s. blocked_until )
201201 . min ( )
@@ -204,15 +204,17 @@ async fn run_idc_loop(
204204
205205 // Wait for a notification or the next retry time.
206206 tokio:: select! {
207+ //TODO:(shub) optimise this to send new entry directly instead of calling
208+ //`load_pending_into_targets`
207209 _ = notify_rx. recv( ) => {
208210 // Drain all pending notifications (coalesce bursts).
209211 while notify_rx. try_recv( ) . is_ok( ) { }
210212 }
211213 _ = tokio:: time:: sleep( sleep_duration) => { }
212214 }
213215
214- // Reload pending messages from DB (catches anything missed and handles restart recovery ).
215- load_pending_into_targets ( & db, & mut targets ) ;
216+ // Reload pending messages from DB (catches new entries ).
217+ load_pending_into_targets ( & db, & mut db_queues ) ;
216218 }
217219}
218220
@@ -247,7 +249,7 @@ fn reducer_workload(module: &ModuleInfo, params: &CallReducerParams) -> Workload
247249 } )
248250}
249251
250- fn duplicate_result_from_row ( row : spacetimedb_datastore:: system_tables:: StInboundMsgRow ) -> ReducerCallResult {
252+ fn duplicate_result_from_st_inbound_row ( row : spacetimedb_datastore:: system_tables:: StInboundMsgRow ) -> ReducerCallResult {
251253 let outcome = match row. result_status {
252254 StInboundMsgResultStatus :: Success => ReducerOutcome :: Committed ,
253255 StInboundMsgResultStatus :: ReducerError => ReducerOutcome :: Failed ( Box :: new (
@@ -294,7 +296,7 @@ where
294296 if let Some ( row) = tx. get_inbound_msg_row ( sender_identity) {
295297 if sender_msg_id == row. last_outbound_msg {
296298 let _ = db. rollback_mut_tx ( tx) ;
297- return Ok ( ( duplicate_result_from_row ( row) , false ) ) ;
299+ return Ok ( ( duplicate_result_from_st_inbound_row ( row) , false ) ) ;
298300 }
299301 if sender_msg_id < row. last_outbound_msg {
300302 let expected = row. last_outbound_msg + 1 ;
@@ -426,6 +428,8 @@ async fn finalize_message(
426428 return ;
427429 }
428430 Err ( e) => {
431+
432+ delete_message ( db, msg. msg_id ) ;
429433 log:: error!(
430434 "idc_actor: on_result reducer '{}' failed for msg_id={}: {e:?}" ,
431435 on_result_reducer,
@@ -435,16 +439,14 @@ async fn finalize_message(
435439 }
436440 }
437441
438- // Delete the row regardless of whether on_result succeeded or failed.
439- delete_message ( db, msg. msg_id ) ;
440442}
441443
442444/// Load all messages from ST_OUTBOUND_MSG into the per-target queues, resolving delivery data
443445/// from the corresponding outbox table rows.
444446///
445447/// A row's presence in ST_OUTBOUND_MSG means it has not yet been processed.
446448/// Messages already in a target's queue (by msg_id) are not re-added.
447- fn load_pending_into_targets ( db : & RelationalDB , targets : & mut HashMap < Identity , TargetState > ) {
449+ fn load_pending_into_targets ( db : & RelationalDB , db_queues : & mut HashMap < Identity , DatabaseQueue > ) {
448450 let tx = db. begin_tx ( Workload :: Internal ) ;
449451
450452 let st_outbound_msg_rows: Vec < StOutboundMsgRow > = db
@@ -557,7 +559,7 @@ fn load_pending_into_targets(db: &RelationalDB, targets: &mut HashMap<Identity,
557559 pending. sort_by_key ( |m| m. msg_id ) ;
558560
559561 for msg in pending {
560- let state = targets . entry ( msg. target_db_identity ) . or_insert_with ( TargetState :: new) ;
562+ let state = db_queues . entry ( msg. target_db_identity ) . or_insert_with ( DatabaseQueue :: new) ;
561563 // Only add if not already in the queue (avoid duplicates after reload).
562564 let already_queued = state. queue . iter ( ) . any ( |m| m. msg_id == msg. msg_id ) ;
563565 if !already_queued {
0 commit comments