Skip to content

Commit 06bc9a9

Browse files
committed
idc runtime
1 parent 9d4f21e commit 06bc9a9

15 files changed

Lines changed: 783 additions & 92 deletions

File tree

crates/client-api/src/routes/database.rs

Lines changed: 18 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -230,9 +230,9 @@ pub struct CallFromDatabaseParams {
230230
pub struct CallFromDatabaseQuery {
231231
/// Hex-encoded [`Identity`] of the sending database.
232232
sender_identity: String,
233-
/// The commitlog offset of the message on the sender side.
234-
/// Used for at-most-once delivery via `st_databases_tx_offset`.
235-
tx_offset: u64,
233+
/// The inter-database message ID from the sender's `st_msg_id`.
234+
/// Used for at-most-once delivery via `st_inbound_msg_id`.
235+
msg_id: u64,
236236
}
237237

238238
/// Call a reducer on behalf of another database, with deduplication.
@@ -241,10 +241,10 @@ pub struct CallFromDatabaseQuery {
241241
///
242242
/// Required query params:
243243
/// - `sender_identity` — hex-encoded identity of the sending database.
244-
/// - `tx_offset` — the sender's commitlog offset for this message.
244+
/// - `msg_id` — the inter-database message ID from the sender's `st_msg_id`.
245245
///
246-
/// Before invoking the reducer, the receiver checks `st_databases_tx_offset`.
247-
/// If the incoming `tx_offset` is ≤ the last delivered offset for `sender_identity`,
246+
/// Before invoking the reducer, the receiver checks `st_inbound_msg_id`.
247+
/// If the incoming `msg_id` is ≤ the last delivered `msg_id` for `sender_identity`,
248248
/// the call is a duplicate and 200 OK is returned immediately without running the reducer.
249249
/// Otherwise the reducer is invoked, the dedup index is updated atomically in the same
250250
/// transaction, and an acknowledgment is returned on success.
@@ -257,19 +257,22 @@ pub async fn call_from_database<S: ControlStateDelegate + NodeDelegate>(
257257
}): Path<CallFromDatabaseParams>,
258258
Query(CallFromDatabaseQuery {
259259
sender_identity,
260-
tx_offset,
260+
msg_id,
261261
}): Query<CallFromDatabaseQuery>,
262262
TypedHeader(content_type): TypedHeader<headers::ContentType>,
263-
ByteStringBody(body): ByteStringBody,
263+
body: axum::body::Bytes,
264264
) -> axum::response::Result<impl IntoResponse> {
265-
assert_content_type_json(content_type)?;
265+
// IDC callers send BSATN (application/octet-stream).
266+
if content_type != headers::ContentType::octet_stream() {
267+
return Err((StatusCode::UNSUPPORTED_MEDIA_TYPE, "Expected application/octet-stream").into());
268+
}
266269

267270
let caller_identity = auth.claims.identity;
268271

269272
let sender_identity = Identity::from_hex(&sender_identity)
270273
.map_err(|_| (StatusCode::BAD_REQUEST, "Invalid sender_identity: expected hex-encoded identity"))?;
271274

272-
let args = FunctionArgs::Json(body);
275+
let args = FunctionArgs::Bsatn(body);
273276
let connection_id = generate_random_connection_id();
274277

275278
let (module, Database { owner_identity, .. }) = find_module_and_database(&worker_ctx, name_or_identity).await?;
@@ -290,7 +293,7 @@ pub async fn call_from_database<S: ControlStateDelegate + NodeDelegate>(
290293
&reducer,
291294
args,
292295
sender_identity,
293-
tx_offset,
296+
msg_id,
294297
)
295298
.await;
296299

@@ -304,7 +307,9 @@ pub async fn call_from_database<S: ControlStateDelegate + NodeDelegate>(
304307
let (status, body) = match rcr.outcome {
305308
ReducerOutcome::Committed => (StatusCode::OK, "".into()),
306309
ReducerOutcome::Deduplicated => (StatusCode::OK, "deduplicated".into()),
307-
ReducerOutcome::Failed(errmsg) => (StatusCode::from_u16(530).unwrap(), *errmsg),
310+
// 422 = reducer ran but returned Err; IDC runtime uses this to distinguish
311+
// reducer failures from transport errors (which it retries).
312+
ReducerOutcome::Failed(errmsg) => (StatusCode::UNPROCESSABLE_ENTITY, *errmsg),
308313
ReducerOutcome::BudgetExceeded => {
309314
log::warn!(
310315
"Node's energy budget exceeded for identity: {owner_identity} while executing {reducer}"
@@ -1300,7 +1305,7 @@ pub struct DatabaseRoutes<S> {
13001305
pub subscribe_get: MethodRouter<S>,
13011306
/// POST: /database/:name_or_identity/call/:reducer
13021307
pub call_reducer_procedure_post: MethodRouter<S>,
1303-
/// POST: /database/:name_or_identity/call-from-database/:reducer?sender_identity=<hex>&tx_offset=<u64>
1308+
/// POST: /database/:name_or_identity/call-from-database/:reducer?sender_identity=<hex>&msg_id=<u64>
13041309
pub call_from_database_post: MethodRouter<S>,
13051310
/// GET: /database/:name_or_identity/schema
13061311
pub schema_get: MethodRouter<S>,

crates/core/src/host/host_controller.rs

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
use super::idc_runtime::{IdcRuntime, IdcRuntimeConfig, IdcRuntimeStarter, IdcSender};
12
use super::module_host::{EventStatus, ModuleHost, ModuleInfo, NoSuchModule};
23
use super::scheduler::SchedulerStarter;
34
use super::wasmtime::WasmtimeRuntime;
@@ -709,6 +710,7 @@ async fn make_module_host(
709710
runtimes: Arc<HostRuntimes>,
710711
replica_ctx: Arc<ReplicaContext>,
711712
scheduler: Scheduler,
713+
idc_sender: IdcSender,
712714
program: Program,
713715
energy_monitor: Arc<dyn EnergyMonitor>,
714716
unregister: impl Fn() + Send + Sync + 'static,
@@ -724,6 +726,7 @@ async fn make_module_host(
724726
let mcc = ModuleCreationContext {
725727
replica_ctx,
726728
scheduler,
729+
idc_sender,
727730
program_hash: program.hash,
728731
energy_monitor,
729732
};
@@ -761,6 +764,7 @@ struct LaunchedModule {
761764
module_host: ModuleHost,
762765
scheduler: Scheduler,
763766
scheduler_starter: SchedulerStarter,
767+
idc_starter: IdcRuntimeStarter,
764768
}
765769

766770
struct ModuleLauncher<F> {
@@ -797,10 +801,12 @@ impl<F: Fn() + Send + Sync + 'static> ModuleLauncher<F> {
797801
.await
798802
.map(Arc::new)?;
799803
let (scheduler, scheduler_starter) = Scheduler::open(replica_ctx.relational_db().clone());
804+
let (idc_starter, idc_sender) = IdcRuntime::open();
800805
let (program, module_host) = make_module_host(
801806
self.runtimes.clone(),
802807
replica_ctx.clone(),
803808
scheduler.clone(),
809+
idc_sender.clone(),
804810
self.program,
805811
self.energy_monitor,
806812
self.on_panic,
@@ -817,6 +823,7 @@ impl<F: Fn() + Send + Sync + 'static> ModuleLauncher<F> {
817823
module_host,
818824
scheduler,
819825
scheduler_starter,
826+
idc_starter,
820827
},
821828
))
822829
}
@@ -882,6 +889,9 @@ struct Host {
882889
/// Handle to the task responsible for cleaning up old views.
883890
/// The task is aborted when [`Host`] is dropped.
884891
view_cleanup_task: AbortHandle,
892+
/// IDC runtime: delivers outbound inter-database messages from `st_msg_id`.
893+
/// Stopped when [`Host`] is dropped.
894+
_idc_runtime: IdcRuntime,
885895
}
886896

887897
impl Host {
@@ -1075,6 +1085,7 @@ impl Host {
10751085
module_host,
10761086
scheduler,
10771087
scheduler_starter,
1088+
idc_starter,
10781089
} = launched;
10791090

10801091
// Disconnect dangling clients.
@@ -1101,6 +1112,14 @@ impl Host {
11011112
let disk_metrics_recorder_task = tokio::spawn(metric_reporter(replica_ctx.clone())).abort_handle();
11021113
let view_cleanup_task = spawn_view_cleanup_loop(replica_ctx.relational_db().clone());
11031114

1115+
let idc_runtime = idc_starter.start(
1116+
replica_ctx.relational_db().clone(),
1117+
IdcRuntimeConfig {
1118+
sender_identity: replica_ctx.database_identity,
1119+
},
1120+
module_host.downgrade(),
1121+
);
1122+
11041123
let module = watch::Sender::new(module_host);
11051124

11061125
Ok(Host {
@@ -1110,6 +1129,7 @@ impl Host {
11101129
disk_metrics_recorder_task,
11111130
tx_metrics_recorder_task,
11121131
view_cleanup_task,
1132+
_idc_runtime: idc_runtime,
11131133
})
11141134
}
11151135

@@ -1182,11 +1202,13 @@ impl Host {
11821202
) -> anyhow::Result<UpdateDatabaseResult> {
11831203
let replica_ctx = &self.replica_ctx;
11841204
let (scheduler, scheduler_starter) = Scheduler::open(self.replica_ctx.relational_db().clone());
1205+
let (_idc_starter, idc_sender) = IdcRuntime::open();
11851206

11861207
let (program, module) = make_module_host(
11871208
runtimes,
11881209
replica_ctx.clone(),
11891210
scheduler.clone(),
1211+
idc_sender,
11901212
program,
11911213
energy_monitor,
11921214
on_panic,

0 commit comments

Comments
 (0)