@@ -18,9 +18,7 @@ use anyhow::anyhow;
1818use bytes:: Bytes ;
1919use spacetimedb_datastore:: execution_context:: { ReducerContext , Workload } ;
2020use spacetimedb_datastore:: locking_tx_datastore:: MutTxId ;
21- use spacetimedb_datastore:: system_tables:: {
22- StInboundMsgResultStatus , StOutboundMsgRow , ST_OUTBOUND_MSG_ID ,
23- } ;
21+ use spacetimedb_datastore:: system_tables:: { StInboundMsgResultStatus , StOutboundMsgRow , ST_OUTBOUND_MSG_ID } ;
2422use spacetimedb_datastore:: traits:: IsolationLevel ;
2523use spacetimedb_lib:: { AlgebraicValue , Identity , ProductValue } ;
2624use spacetimedb_primitives:: { ColId , TableId } ;
@@ -249,11 +247,15 @@ fn reducer_workload(module: &ModuleInfo, params: &CallReducerParams) -> Workload
249247 } )
250248}
251249
252- fn duplicate_result_from_st_inbound_row ( row : spacetimedb_datastore:: system_tables:: StInboundMsgRow ) -> ReducerCallResult {
250+ fn duplicate_result_from_st_inbound_row (
251+ row : spacetimedb_datastore:: system_tables:: StInboundMsgRow ,
252+ ) -> ReducerCallResult {
253253 let outcome = match row. result_status {
254254 StInboundMsgResultStatus :: Success => ReducerOutcome :: Committed ,
255255 StInboundMsgResultStatus :: ReducerError => ReducerOutcome :: Failed ( Box :: new (
256- String :: from_utf8_lossy ( & row. result_payload ) . into_owned ( ) . into_boxed_str ( ) ,
256+ String :: from_utf8_lossy ( & row. result_payload )
257+ . into_owned ( )
258+ . into_boxed_str ( ) ,
257259 ) ) ,
258260 } ;
259261
@@ -344,11 +346,9 @@ where
344346 call_reducer (
345347 Some ( tx) ,
346348 params,
347- Box :: new ( move |tx, _reducer_return_value| {
348- match action {
349- ReducerSuccessActionKind :: DeleteOutboundMsg ( msg_id) => {
350- tx. delete_outbound_msg ( msg_id) . map_err ( |e| anyhow ! ( e) )
351- }
349+ Box :: new ( move |tx, _reducer_return_value| match action {
350+ ReducerSuccessActionKind :: DeleteOutboundMsg ( msg_id) => {
351+ tx. delete_outbound_msg ( msg_id) . map_err ( |e| anyhow ! ( e) )
352352 }
353353 } ) ,
354354 )
@@ -428,7 +428,6 @@ async fn finalize_message(
428428 return ;
429429 }
430430 Err ( e) => {
431-
432431 delete_message ( db, msg. msg_id ) ;
433432 log:: error!(
434433 "idc_actor: on_result reducer '{}' failed for msg_id={}: {e:?}" ,
@@ -438,7 +437,6 @@ async fn finalize_message(
438437 }
439438 }
440439 }
441-
442440}
443441
444442/// Load all messages from ST_OUTBOUND_MSG into the per-target queues, resolving delivery data
@@ -559,7 +557,9 @@ fn load_pending_into_targets(db: &RelationalDB, db_queues: &mut HashMap<Identity
559557 pending. sort_by_key ( |m| m. msg_id ) ;
560558
561559 for msg in pending {
562- let state = db_queues. entry ( msg. target_db_identity ) . or_insert_with ( DatabaseQueue :: new) ;
560+ let state = db_queues
561+ . entry ( msg. target_db_identity )
562+ . or_insert_with ( DatabaseQueue :: new) ;
563563 // Only add if not already in the queue (avoid duplicates after reload).
564564 let already_queued = state. queue . iter ( ) . any ( |m| m. msg_id == msg. msg_id ) ;
565565 if !already_queued {
0 commit comments