Skip to content

Commit 6fced42

Browse files
committed
fixes
1 parent 269cf01 commit 6fced42

15 files changed

Lines changed: 363 additions & 64 deletions

File tree

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/client-api/src/routes/database.rs

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -287,7 +287,7 @@ pub async fn call_from_database<S: ControlStateDelegate + NodeDelegate>(
287287
.await
288288
.map_err(client_connected_error_to_response)?;
289289

290-
let result = module
290+
let mut result = module
291291
.call_reducer_from_database(
292292
caller_identity,
293293
Some(connection_id),
@@ -301,6 +301,14 @@ pub async fn call_from_database<S: ControlStateDelegate + NodeDelegate>(
301301
)
302302
.await;
303303

304+
if let Ok(rcr) = result.as_mut()
305+
&& let Some(tx_offset) = rcr.tx_offset.as_mut()
306+
&& let Some(mut durable_offset) = module.durable_tx_offset()
307+
{
308+
let tx_offset = tx_offset.await.map_err(|_| log_and_500("transaction aborted"))?;
309+
durable_offset.wait_for(tx_offset).await.map_err(log_and_500)?;
310+
}
311+
304312
module
305313
.call_identity_disconnected(caller_identity, connection_id)
306314
.await
@@ -311,7 +319,7 @@ pub async fn call_from_database<S: ControlStateDelegate + NodeDelegate>(
311319
let (status, body) = match rcr.outcome {
312320
ReducerOutcome::Committed => (StatusCode::OK, "".into()),
313321
ReducerOutcome::Deduplicated => (StatusCode::OK, "deduplicated".into()),
314-
// 422 = reducer ran but returned Err; IDC runtime uses this to distinguish
322+
// 422 = reducer ran but returned Err; the IDC actor uses this to distinguish
315323
// reducer failures from transport errors (which it retries).
316324
ReducerOutcome::Failed(errmsg) => (StatusCode::UNPROCESSABLE_ENTITY, *errmsg),
317325
ReducerOutcome::BudgetExceeded => {

crates/core/src/host/host_controller.rs

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use super::idc_runtime::{IdcRuntime, IdcRuntimeConfig, IdcRuntimeStarter, IdcSender};
1+
use super::idc_actor::{IdcActor, IdcActorConfig, IdcActorSender, IdcActorStarter};
22
use super::module_host::{EventStatus, ModuleHost, ModuleInfo, NoSuchModule};
33
use super::scheduler::SchedulerStarter;
44
use super::wasmtime::WasmtimeRuntime;
@@ -133,11 +133,12 @@ impl HostRuntimes {
133133
}
134134
}
135135

136-
#[derive(Clone, Debug)]
136+
#[derive(Debug)]
137137
pub struct ReducerCallResult {
138138
pub outcome: ReducerOutcome,
139139
pub energy_used: EnergyQuanta,
140140
pub execution_duration: Duration,
141+
pub tx_offset: Option<TransactionOffset>,
141142
}
142143

143144
impl ReducerCallResult {
@@ -710,7 +711,7 @@ async fn make_module_host(
710711
runtimes: Arc<HostRuntimes>,
711712
replica_ctx: Arc<ReplicaContext>,
712713
scheduler: Scheduler,
713-
idc_sender: IdcSender,
714+
idc_sender: IdcActorSender,
714715
program: Program,
715716
energy_monitor: Arc<dyn EnergyMonitor>,
716717
unregister: impl Fn() + Send + Sync + 'static,
@@ -764,7 +765,7 @@ struct LaunchedModule {
764765
module_host: ModuleHost,
765766
scheduler: Scheduler,
766767
scheduler_starter: SchedulerStarter,
767-
idc_starter: IdcRuntimeStarter,
768+
idc_starter: IdcActorStarter,
768769
}
769770

770771
struct ModuleLauncher<F> {
@@ -801,7 +802,7 @@ impl<F: Fn() + Send + Sync + 'static> ModuleLauncher<F> {
801802
.await
802803
.map(Arc::new)?;
803804
let (scheduler, scheduler_starter) = Scheduler::open(replica_ctx.relational_db().clone());
804-
let (idc_starter, idc_sender) = IdcRuntime::open();
805+
let (idc_starter, idc_sender) = IdcActor::open();
805806
let (program, module_host) = make_module_host(
806807
self.runtimes.clone(),
807808
replica_ctx.clone(),
@@ -889,9 +890,9 @@ struct Host {
889890
/// Handle to the task responsible for cleaning up old views.
890891
/// The task is aborted when [`Host`] is dropped.
891892
view_cleanup_task: AbortHandle,
892-
/// IDC runtime: delivers outbound inter-database messages from `st_outbound_msg`.
893+
/// IDC actor: delivers outbound inter-database messages from `st_outbound_msg`.
893894
/// Stopped when [`Host`] is dropped.
894-
_idc_runtime: IdcRuntime,
895+
_idc_actor: IdcActor,
895896
}
896897

897898
impl Host {
@@ -1112,9 +1113,9 @@ impl Host {
11121113
let disk_metrics_recorder_task = tokio::spawn(metric_reporter(replica_ctx.clone())).abort_handle();
11131114
let view_cleanup_task = spawn_view_cleanup_loop(replica_ctx.relational_db().clone());
11141115

1115-
let idc_runtime = idc_starter.start(
1116+
let idc_actor = idc_starter.start(
11161117
replica_ctx.relational_db().clone(),
1117-
IdcRuntimeConfig {
1118+
IdcActorConfig {
11181119
sender_identity: replica_ctx.database_identity,
11191120
},
11201121
module_host.downgrade(),
@@ -1129,7 +1130,7 @@ impl Host {
11291130
disk_metrics_recorder_task,
11301131
tx_metrics_recorder_task,
11311132
view_cleanup_task,
1132-
_idc_runtime: idc_runtime,
1133+
_idc_actor: idc_actor,
11331134
})
11341135
}
11351136

@@ -1202,7 +1203,7 @@ impl Host {
12021203
) -> anyhow::Result<UpdateDatabaseResult> {
12031204
let replica_ctx = &self.replica_ctx;
12041205
let (scheduler, scheduler_starter) = Scheduler::open(self.replica_ctx.relational_db().clone());
1205-
let (_idc_starter, idc_sender) = IdcRuntime::open();
1206+
let (_idc_starter, idc_sender) = IdcActor::open();
12061207

12071208
let (program, module) = make_module_host(
12081209
runtimes,
Lines changed: 40 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
/// Inter-Database Communication (IDC) Runtime
1+
/// Inter-Database Communication (IDC) Actor
22
///
33
/// Background tokio task that:
44
/// 1. Loads undelivered entries from `st_outbound_msg` on startup, resolving delivery data from outbox tables.
@@ -29,47 +29,47 @@ const MAX_BACKOFF: Duration = Duration::from_secs(30);
2929
/// How long to wait before polling again when there is no work.
3030
const POLL_INTERVAL: Duration = Duration::from_millis(500);
3131

32-
/// A sender that notifies the IDC runtime of a new outbox row.
32+
/// A sender that notifies the IDC actor of a new outbox row.
3333
///
34-
/// Sending `()` wakes the runtime to deliver pending messages immediately
34+
/// Sending `()` wakes the actor to deliver pending messages immediately
3535
/// rather than waiting for the next poll cycle.
36-
pub type IdcSender = mpsc::UnboundedSender<()>;
36+
pub type IdcActorSender = mpsc::UnboundedSender<()>;
3737

38-
/// The identity of this (sender) database, set when the IDC runtime is started.
39-
pub struct IdcRuntimeConfig {
38+
/// The identity of this (sender) database, set when the IDC actor is started.
39+
pub struct IdcActorConfig {
4040
pub sender_identity: Identity,
4141
}
4242

43-
/// A handle that, when dropped, stops the IDC runtime background task.
44-
pub struct IdcRuntime {
43+
/// A handle that, when dropped, stops the IDC actor background task.
44+
pub struct IdcActor {
4545
_abort: tokio::task::AbortHandle,
4646
}
4747

48-
/// Holds the receiver side of the notification channel until the runtime is started.
48+
/// Holds the receiver side of the notification channel until the actor is started.
4949
///
5050
/// Mirrors the `SchedulerStarter` pattern: create the channel before the module is
51-
/// loaded (so the sender can be stored in `InstanceEnv`), then call [`IdcRuntimeStarter::start`]
51+
/// loaded (so the sender can be stored in `InstanceEnv`), then call [`IdcActorStarter::start`]
5252
/// once the DB is ready.
53-
pub struct IdcRuntimeStarter {
53+
pub struct IdcActorStarter {
5454
rx: mpsc::UnboundedReceiver<()>,
5555
}
5656

57-
impl IdcRuntimeStarter {
58-
/// Spawn the IDC runtime background task.
59-
pub fn start(self, db: Arc<RelationalDB>, config: IdcRuntimeConfig, module_host: WeakModuleHost) -> IdcRuntime {
57+
impl IdcActorStarter {
58+
/// Spawn the IDC actor background task.
59+
pub fn start(self, db: Arc<RelationalDB>, config: IdcActorConfig, module_host: WeakModuleHost) -> IdcActor {
6060
let abort = tokio::spawn(run_idc_loop(db, config, module_host, self.rx)).abort_handle();
61-
IdcRuntime { _abort: abort }
61+
IdcActor { _abort: abort }
6262
}
6363
}
6464

65-
impl IdcRuntime {
65+
impl IdcActor {
6666
/// Open the IDC channel, returning a starter and a sender.
6767
///
6868
/// Store the sender in `ModuleCreationContext` so it reaches `InstanceEnv`.
69-
/// After the module is ready, call [`IdcRuntimeStarter::start`] to spawn the loop.
70-
pub fn open() -> (IdcRuntimeStarter, IdcSender) {
69+
/// After the module is ready, call [`IdcActorStarter::start`] to spawn the loop.
70+
pub fn open() -> (IdcActorStarter, IdcActorSender) {
7171
let (tx, rx) = mpsc::unbounded_channel();
72-
(IdcRuntimeStarter { rx }, tx)
72+
(IdcActorStarter { rx }, tx)
7373
}
7474
}
7575

@@ -138,10 +138,10 @@ enum DeliveryOutcome {
138138
TransportError(String),
139139
}
140140

141-
/// Main IDC loop: maintain per-target queues and deliver messages.
141+
/// Main IDC actor loop: maintain per-target queues and deliver messages.
142142
async fn run_idc_loop(
143143
db: Arc<RelationalDB>,
144-
config: IdcRuntimeConfig,
144+
config: IdcActorConfig,
145145
module_host: WeakModuleHost,
146146
mut notify_rx: mpsc::UnboundedReceiver<()>,
147147
) {
@@ -169,7 +169,7 @@ async fn run_idc_loop(
169169
match outcome {
170170
DeliveryOutcome::TransportError(reason) => {
171171
log::warn!(
172-
"idc_runtime: transport error delivering msg_id={} to {}: {reason}",
172+
"idc_actor: transport error delivering msg_id={} to {}: {reason}",
173173
msg.msg_id,
174174
hex::encode(msg.target_db_identity.to_byte_array()),
175175
);
@@ -224,6 +224,9 @@ fn outcome_to_result(outcome: &DeliveryOutcome) -> (u8, String) {
224224
}
225225

226226
/// Finalize a delivered message: call the on_result reducer (if any), then delete from ST_OUTBOUND_MSG.
227+
///
228+
/// On the happy path, `on_result_reducer` success and deletion of `st_outbound_msg`
229+
/// are committed atomically in the same reducer transaction.
227230
async fn finalize_message(
228231
db: &RelationalDB,
229232
module_host: &WeakModuleHost,
@@ -235,7 +238,7 @@ async fn finalize_message(
235238
if let Some(on_result_reducer) = &msg.on_result_reducer {
236239
let Some(host) = module_host.upgrade() else {
237240
log::warn!(
238-
"idc_runtime: module host gone, cannot call on_result reducer '{}' for msg_id={}",
241+
"idc_actor: module host gone, cannot call on_result reducer '{}' for msg_id={}",
239242
on_result_reducer,
240243
msg.msg_id,
241244
);
@@ -249,7 +252,7 @@ async fn finalize_message(
249252
Ok(b) => b,
250253
Err(e) => {
251254
log::error!(
252-
"idc_runtime: failed to encode on_result args for msg_id={}: {e}",
255+
"idc_actor: failed to encode on_result args for msg_id={}: {e}",
253256
msg.msg_id
254257
);
255258
delete_message(db, msg.msg_id);
@@ -259,28 +262,30 @@ async fn finalize_message(
259262

260263
let caller_identity = Identity::ZERO; // system call
261264
let result = host
262-
.call_reducer(
265+
.call_reducer_delete_outbound_on_success(
263266
caller_identity,
264267
None, // no connection_id
265268
None, // no client sender
266269
None, // no request_id
267270
None, // no timer
268271
on_result_reducer,
269272
FunctionArgs::Bsatn(bytes::Bytes::from(args_bytes)),
273+
msg.msg_id,
270274
)
271275
.await;
272276

273277
match result {
274278
Ok(_) => {
275279
log::debug!(
276-
"idc_runtime: on_result reducer '{}' called for msg_id={}",
280+
"idc_actor: on_result reducer '{}' called for msg_id={}",
277281
on_result_reducer,
278282
msg.msg_id,
279283
);
284+
return;
280285
}
281286
Err(e) => {
282287
log::error!(
283-
"idc_runtime: on_result reducer '{}' failed for msg_id={}: {e:?}",
288+
"idc_actor: on_result reducer '{}' failed for msg_id={}: {e:?}",
284289
on_result_reducer,
285290
msg.msg_id,
286291
);
@@ -307,7 +312,7 @@ fn load_pending_into_targets(db: &RelationalDB, targets: &mut HashMap<Identity,
307312
.collect()
308313
})
309314
.unwrap_or_else(|e| {
310-
log::error!("idc_runtime: failed to read st_outbound_msg: {e}");
315+
log::error!("idc_actor: failed to read st_outbound_msg: {e}");
311316
Vec::new()
312317
});
313318

@@ -321,7 +326,7 @@ fn load_pending_into_targets(db: &RelationalDB, targets: &mut HashMap<Identity,
321326
Ok(s) => s,
322327
Err(e) => {
323328
log::error!(
324-
"idc_runtime: cannot find schema for outbox table {:?} (msg_id={}): {e}",
329+
"idc_actor: cannot find schema for outbox table {:?} (msg_id={}): {e}",
325330
outbox_table_id,
326331
st_row.msg_id,
327332
);
@@ -333,7 +338,7 @@ fn load_pending_into_targets(db: &RelationalDB, targets: &mut HashMap<Identity,
333338
Some(o) => o,
334339
None => {
335340
log::error!(
336-
"idc_runtime: table {:?} (msg_id={}) is not an outbox table",
341+
"idc_actor: table {:?} (msg_id={}) is not an outbox table",
337342
schema.table_name,
338343
st_row.msg_id,
339344
);
@@ -351,7 +356,7 @@ fn load_pending_into_targets(db: &RelationalDB, targets: &mut HashMap<Identity,
351356

352357
let Some(outbox_row_ref) = outbox_row else {
353358
log::error!(
354-
"idc_runtime: outbox row not found in table {:?} for row_id={} (msg_id={})",
359+
"idc_actor: outbox row not found in table {:?} for row_id={} (msg_id={})",
355360
outbox_table_id,
356361
st_row.row_id,
357362
st_row.msg_id,
@@ -366,7 +371,7 @@ fn load_pending_into_targets(db: &RelationalDB, targets: &mut HashMap<Identity,
366371
Some(AlgebraicValue::U256(u)) => Identity::from_u256(**u),
367372
other => {
368373
log::error!(
369-
"idc_runtime: outbox row col 1 expected U256 (Identity), got {other:?} (msg_id={})",
374+
"idc_actor: outbox row col 1 expected U256 (Identity), got {other:?} (msg_id={})",
370375
st_row.msg_id,
371376
);
372377
continue;
@@ -409,7 +414,7 @@ fn load_pending_into_targets(db: &RelationalDB, targets: &mut HashMap<Identity,
409414
/// Attempt a single HTTP delivery of a message.
410415
async fn attempt_delivery(
411416
client: &reqwest::Client,
412-
config: &IdcRuntimeConfig,
417+
config: &IdcActorConfig,
413418
msg: &PendingMessage,
414419
) -> DeliveryOutcome {
415420
let target_db_hex = hex::encode(msg.target_db_identity.to_byte_array());
@@ -453,11 +458,11 @@ async fn attempt_delivery(
453458
fn delete_message(db: &RelationalDB, msg_id: u64) {
454459
let mut tx = db.begin_mut_tx(IsolationLevel::Serializable, Workload::Internal);
455460
if let Err(e) = tx.delete_outbound_msg(msg_id) {
456-
log::error!("idc_runtime: failed to delete msg_id={msg_id}: {e}");
461+
log::error!("idc_actor: failed to delete msg_id={msg_id}: {e}");
457462
let _ = db.rollback_mut_tx(tx);
458463
return;
459464
}
460465
if let Err(e) = db.commit_tx(tx) {
461-
log::error!("idc_runtime: failed to commit delete for msg_id={msg_id}: {e}");
466+
log::error!("idc_actor: failed to commit delete for msg_id={msg_id}: {e}");
462467
}
463468
}

0 commit comments

Comments
 (0)