Skip to content

Commit 9d4f21e

Browse files
committed
call_from_database endpoint
1 parent 7e84ae4 commit 9d4f21e

7 files changed

Lines changed: 292 additions & 13 deletions

File tree

crates/client-api/src/routes/database.rs

Lines changed: 116 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -216,6 +216,117 @@ pub async fn call<S: ControlStateDelegate + NodeDelegate>(
216216
}
217217
}
218218

219+
/// Path parameters for the `call_from_database` route.
220+
#[derive(Deserialize)]
221+
pub struct CallFromDatabaseParams {
222+
name_or_identity: NameOrIdentity,
223+
reducer: String,
224+
}
225+
226+
/// Query parameters for the `call_from_database` route.
227+
///
228+
/// Both fields are mandatory; a missing field results in a 400 Bad Request.
229+
#[derive(Deserialize)]
230+
pub struct CallFromDatabaseQuery {
231+
/// Hex-encoded [`Identity`] of the sending database.
232+
sender_identity: String,
233+
/// The commitlog offset of the message on the sender side.
234+
/// Used for at-most-once delivery via `st_databases_tx_offset`.
235+
tx_offset: u64,
236+
}
237+
238+
/// Call a reducer on behalf of another database, with deduplication.
239+
///
240+
/// Endpoint: `POST /database/:name_or_identity/call-from-database/:reducer`
241+
///
242+
/// Required query params:
243+
/// - `sender_identity` — hex-encoded identity of the sending database.
244+
/// - `tx_offset` — the sender's commitlog offset for this message.
245+
///
246+
/// Before invoking the reducer, the receiver checks `st_databases_tx_offset`.
247+
/// If the incoming `tx_offset` is ≤ the last delivered offset for `sender_identity`,
248+
/// the call is a duplicate and 200 OK is returned immediately without running the reducer.
249+
/// Otherwise the reducer is invoked, the dedup index is updated atomically in the same
250+
/// transaction, and an acknowledgment is returned on success.
251+
pub async fn call_from_database<S: ControlStateDelegate + NodeDelegate>(
252+
State(worker_ctx): State<S>,
253+
Extension(auth): Extension<SpacetimeAuth>,
254+
Path(CallFromDatabaseParams {
255+
name_or_identity,
256+
reducer,
257+
}): Path<CallFromDatabaseParams>,
258+
Query(CallFromDatabaseQuery {
259+
sender_identity,
260+
tx_offset,
261+
}): Query<CallFromDatabaseQuery>,
262+
TypedHeader(content_type): TypedHeader<headers::ContentType>,
263+
ByteStringBody(body): ByteStringBody,
264+
) -> axum::response::Result<impl IntoResponse> {
265+
assert_content_type_json(content_type)?;
266+
267+
let caller_identity = auth.claims.identity;
268+
269+
let sender_identity = Identity::from_hex(&sender_identity)
270+
.map_err(|_| (StatusCode::BAD_REQUEST, "Invalid sender_identity: expected hex-encoded identity"))?;
271+
272+
let args = FunctionArgs::Json(body);
273+
let connection_id = generate_random_connection_id();
274+
275+
let (module, Database { owner_identity, .. }) = find_module_and_database(&worker_ctx, name_or_identity).await?;
276+
277+
// Call client_connected, if defined.
278+
module
279+
.call_identity_connected(auth.into(), connection_id)
280+
.await
281+
.map_err(client_connected_error_to_response)?;
282+
283+
let result = module
284+
.call_reducer_from_database(
285+
caller_identity,
286+
Some(connection_id),
287+
None,
288+
None,
289+
None,
290+
&reducer,
291+
args,
292+
sender_identity,
293+
tx_offset,
294+
)
295+
.await;
296+
297+
module
298+
.call_identity_disconnected(caller_identity, connection_id)
299+
.await
300+
.map_err(client_disconnected_error_to_response)?;
301+
302+
match result {
303+
Ok(rcr) => {
304+
let (status, body) = match rcr.outcome {
305+
ReducerOutcome::Committed => (StatusCode::OK, "".into()),
306+
ReducerOutcome::Deduplicated => (StatusCode::OK, "deduplicated".into()),
307+
ReducerOutcome::Failed(errmsg) => (StatusCode::from_u16(530).unwrap(), *errmsg),
308+
ReducerOutcome::BudgetExceeded => {
309+
log::warn!(
310+
"Node's energy budget exceeded for identity: {owner_identity} while executing {reducer}"
311+
);
312+
(StatusCode::PAYMENT_REQUIRED, "Module energy budget exhausted.".into())
313+
}
314+
};
315+
Ok((
316+
status,
317+
TypedHeader(SpacetimeEnergyUsed(rcr.energy_used)),
318+
TypedHeader(SpacetimeExecutionDurationMicros(rcr.execution_duration)),
319+
body,
320+
)
321+
.into_response())
322+
}
323+
Err(e) => {
324+
let (status, msg) = map_reducer_error(e, &reducer);
325+
Err((status, msg).into())
326+
}
327+
}
328+
}
329+
219330
fn assert_content_type_json(content_type: headers::ContentType) -> axum::response::Result<()> {
220331
if content_type != headers::ContentType::json() {
221332
Err(axum::extract::rejection::MissingJsonContentType::default().into())
@@ -230,7 +341,7 @@ fn reducer_outcome_response(
230341
outcome: ReducerOutcome,
231342
) -> (StatusCode, Box<str>) {
232343
match outcome {
233-
ReducerOutcome::Committed => (StatusCode::OK, "".into()),
344+
ReducerOutcome::Committed | ReducerOutcome::Deduplicated => (StatusCode::OK, "".into()),
234345
ReducerOutcome::Failed(errmsg) => {
235346
// TODO: different status code? this is what cloudflare uses, sorta
236347
(StatusCode::from_u16(530).unwrap(), *errmsg)
@@ -1189,6 +1300,8 @@ pub struct DatabaseRoutes<S> {
11891300
pub subscribe_get: MethodRouter<S>,
11901301
/// POST: /database/:name_or_identity/call/:reducer
11911302
pub call_reducer_procedure_post: MethodRouter<S>,
1303+
/// POST: /database/:name_or_identity/call-from-database/:reducer?sender_identity=<hex>&tx_offset=<u64>
1304+
pub call_from_database_post: MethodRouter<S>,
11921305
/// GET: /database/:name_or_identity/schema
11931306
pub schema_get: MethodRouter<S>,
11941307
/// GET: /database/:name_or_identity/logs
@@ -1220,6 +1333,7 @@ where
12201333
identity_get: get(get_identity::<S>),
12211334
subscribe_get: get(handle_websocket::<S>),
12221335
call_reducer_procedure_post: post(call::<S>),
1336+
call_from_database_post: post(call_from_database::<S>),
12231337
schema_get: get(schema::<S>),
12241338
logs_get: get(logs::<S>),
12251339
sql_post: post(sql::<S>),
@@ -1245,6 +1359,7 @@ where
12451359
.route("/identity", self.identity_get)
12461360
.route("/subscribe", self.subscribe_get)
12471361
.route("/call/:reducer", self.call_reducer_procedure_post)
1362+
.route("/call-from-database/:reducer", self.call_from_database_post)
12481363
.route("/schema", self.schema_get)
12491364
.route("/logs", self.logs_get)
12501365
.route("/sql", self.sql_post)

crates/core/src/host/host_controller.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -160,19 +160,22 @@ pub enum ReducerOutcome {
160160
Committed,
161161
Failed(Box<Box<str>>),
162162
BudgetExceeded,
163+
/// The call was identified as a duplicate via the `st_databases_tx_offset` dedup index
164+
/// and was discarded without running the reducer.
165+
Deduplicated,
163166
}
164167

165168
impl ReducerOutcome {
166169
pub fn into_result(self) -> anyhow::Result<()> {
167170
match self {
168-
Self::Committed => Ok(()),
171+
Self::Committed | Self::Deduplicated => Ok(()),
169172
Self::Failed(e) => Err(anyhow::anyhow!(e)),
170173
Self::BudgetExceeded => Err(anyhow::anyhow!("reducer ran out of energy")),
171174
}
172175
}
173176

174177
pub fn is_err(&self) -> bool {
175-
!matches!(self, Self::Committed)
178+
!matches!(self, Self::Committed | Self::Deduplicated)
176179
}
177180
}
178181

crates/core/src/host/module_host.rs

Lines changed: 89 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -589,7 +589,7 @@ pub fn call_identity_connected(
589589
// then insert into `st_client`,
590590
// but if we crashed in between, we'd be left in an inconsistent state
591591
// where the reducer had run but `st_client` was not yet updated.
592-
ReducerOutcome::Committed => Ok(()),
592+
ReducerOutcome::Committed | ReducerOutcome::Deduplicated => Ok(()),
593593

594594
// If the reducer returned an error or couldn't run due to insufficient energy,
595595
// abort the connection: the module code has decided it doesn't want this client.
@@ -624,6 +624,15 @@ pub struct CallReducerParams {
624624
pub timer: Option<Instant>,
625625
pub reducer_id: ReducerId,
626626
pub args: ArgsTuple,
627+
/// If set, enables at-most-once delivery semantics for database-to-database calls.
628+
///
629+
/// The tuple is `(sender_database_identity, sender_tx_offset)`.
630+
/// Before running the reducer, `st_databases_tx_offset` is consulted:
631+
/// if `sender_tx_offset` ≤ the stored last-delivered offset for the sender,
632+
/// the call is a duplicate and returns [`ReducerCallError::Deduplicated`].
633+
/// Otherwise the reducer runs, and the stored offset is updated atomically
634+
/// within the same transaction.
635+
pub dedup_sender: Option<(Identity, u64)>,
627636
}
628637

629638
impl CallReducerParams {
@@ -644,6 +653,7 @@ impl CallReducerParams {
644653
timer: None,
645654
reducer_id,
646655
args,
656+
dedup_sender: None,
647657
}
648658
}
649659
}
@@ -1490,9 +1500,9 @@ impl ModuleHost {
14901500
..
14911501
}) => fallback(),
14921502

1493-
// If it succeeded, as mentioned above, `st_client` is already updated.
1503+
// If it succeeded (or was deduplicated), as mentioned above, `st_client` is already updated.
14941504
Ok(ReducerCallResult {
1495-
outcome: ReducerOutcome::Committed,
1505+
outcome: ReducerOutcome::Committed | ReducerOutcome::Deduplicated,
14961506
..
14971507
}) => Ok(()),
14981508
}
@@ -1567,6 +1577,7 @@ impl ModuleHost {
15671577
timer,
15681578
reducer_id,
15691579
args,
1580+
dedup_sender: None,
15701581
})
15711582
}
15721583

@@ -1594,6 +1605,7 @@ impl ModuleHost {
15941605
timer,
15951606
reducer_id,
15961607
args,
1608+
dedup_sender: None,
15971609
};
15981610

15991611
self.call(
@@ -1655,6 +1667,80 @@ impl ModuleHost {
16551667
res
16561668
}
16571669

1670+
/// Variant of [`Self::call_reducer`] for database-to-database calls.
1671+
///
1672+
/// Behaves identically to `call_reducer`, except that it enforces at-most-once
1673+
/// delivery using the `st_databases_tx_offset` dedup index.
1674+
/// Before invoking the reducer, the receiver checks whether
1675+
/// `sender_tx_offset` ≤ the last delivered offset for `sender_database_identity`.
1676+
/// If so, the call is a duplicate and [`ReducerOutcome::Deduplicated`] is returned
1677+
/// without running the reducer.
1678+
/// Otherwise the reducer runs, and the dedup index is updated atomically
1679+
/// within the same transaction.
1680+
pub async fn call_reducer_from_database(
1681+
&self,
1682+
caller_identity: Identity,
1683+
caller_connection_id: Option<ConnectionId>,
1684+
client: Option<Arc<ClientConnectionSender>>,
1685+
request_id: Option<RequestId>,
1686+
timer: Option<Instant>,
1687+
reducer_name: &str,
1688+
args: FunctionArgs,
1689+
sender_database_identity: Identity,
1690+
sender_tx_offset: u64,
1691+
) -> Result<ReducerCallResult, ReducerCallError> {
1692+
let res = async {
1693+
let (reducer_id, reducer_def) = self
1694+
.info
1695+
.module_def
1696+
.reducer_full(reducer_name)
1697+
.ok_or(ReducerCallError::NoSuchReducer)?;
1698+
if let Some(lifecycle) = reducer_def.lifecycle {
1699+
return Err(ReducerCallError::LifecycleReducer(lifecycle));
1700+
}
1701+
1702+
if reducer_def.visibility.is_private() && !self.is_database_owner(caller_identity) {
1703+
return Err(ReducerCallError::NoSuchReducer);
1704+
}
1705+
1706+
let args = args
1707+
.into_tuple_for_def(&self.info.module_def, reducer_def)
1708+
.map_err(InvalidReducerArguments)?;
1709+
let caller_connection_id = caller_connection_id.unwrap_or(ConnectionId::ZERO);
1710+
let call_reducer_params = CallReducerParams {
1711+
timestamp: Timestamp::now(),
1712+
caller_identity,
1713+
caller_connection_id,
1714+
client,
1715+
request_id,
1716+
timer,
1717+
reducer_id,
1718+
args,
1719+
dedup_sender: Some((sender_database_identity, sender_tx_offset)),
1720+
};
1721+
1722+
self.call(
1723+
&reducer_def.name,
1724+
call_reducer_params,
1725+
async |p, inst| Ok(inst.call_reducer(p)),
1726+
async |p, inst| inst.call_reducer(p).await,
1727+
)
1728+
.await?
1729+
}
1730+
.await;
1731+
1732+
let log_message = match &res {
1733+
Err(ReducerCallError::NoSuchReducer) => Some(no_such_function_log_message("reducer", reducer_name)),
1734+
Err(ReducerCallError::Args(_)) => Some(args_error_log_message("reducer", reducer_name)),
1735+
_ => None,
1736+
};
1737+
if let Some(log_message) = log_message {
1738+
self.inject_logs(LogLevel::Error, reducer_name, &log_message)
1739+
}
1740+
1741+
res
1742+
}
1743+
16581744
pub async fn call_view_add_single_subscription(
16591745
&self,
16601746
sender: Arc<ClientConnectionSender>,

crates/core/src/host/v8/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -613,7 +613,7 @@ enum JsWorkerRequest {
613613
},
614614
}
615615

616-
static_assert_size!(CallReducerParams, 192);
616+
static_assert_size!(CallReducerParams, 256);
617617

618618
fn send_worker_reply<T>(ctx: &str, reply_tx: JsReplyTx<T>, value: T, trapped: bool) {
619619
if reply_tx.send(JsWorkerReply { value, trapped }).is_err() {

crates/core/src/host/wasm_common/module_host_actor.rs

Lines changed: 31 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ use super::instrumentation::CallTimes;
22
use super::*;
33
use crate::client::ClientActorId;
44
use crate::database_logger;
5-
use crate::energy::{EnergyMonitor, FunctionBudget, FunctionFingerprint};
5+
use crate::energy::{EnergyMonitor, EnergyQuanta, FunctionBudget, FunctionFingerprint};
66
use crate::error::DBError;
77
use crate::host::host_controller::CallProcedureReturn;
88
use crate::host::instance_env::{InstanceEnv, TxSlot};
@@ -820,6 +820,7 @@ impl InstanceCommon {
820820
reducer_id,
821821
args,
822822
timer,
823+
dedup_sender,
823824
} = params;
824825
let caller_connection_id_opt = (caller_connection_id != ConnectionId::ZERO).then_some(caller_connection_id);
825826

@@ -844,6 +845,25 @@ impl InstanceCommon {
844845

845846
let workload = Workload::Reducer(ReducerContext::from(op.clone()));
846847
let tx = tx.unwrap_or_else(|| stdb.begin_mut_tx(IsolationLevel::Serializable, workload));
848+
849+
// Check the dedup index atomically within this transaction.
850+
// If the incoming offset is ≤ the last delivered offset for this sender,
851+
// the message is a duplicate; discard it and roll back.
852+
if let Some((sender_identity, sender_tx_offset)) = dedup_sender {
853+
let last_delivered = tx.get_databases_tx_offset(sender_identity);
854+
if last_delivered.is_some_and(|last| sender_tx_offset <= last) {
855+
let _ = stdb.rollback_mut_tx(tx);
856+
return (
857+
ReducerCallResult {
858+
outcome: ReducerOutcome::Deduplicated,
859+
energy_used: EnergyQuanta::ZERO,
860+
execution_duration: Duration::ZERO,
861+
},
862+
false,
863+
);
864+
}
865+
}
866+
847867
let mut tx_slot = inst.tx_slot();
848868

849869
let vm_metrics = self.vm_metrics.get_for_reducer_id(reducer_id);
@@ -885,12 +905,21 @@ impl InstanceCommon {
885905
Ok(return_value) => {
886906
// If this is an OnDisconnect lifecycle event, remove the client from st_clients.
887907
// We handle OnConnect events before running the reducer.
888-
let res = match reducer_def.lifecycle {
908+
let lifecycle_res = match reducer_def.lifecycle {
889909
Some(Lifecycle::OnDisconnect) => {
890910
tx.delete_st_client(caller_identity, caller_connection_id, info.database_identity)
891911
}
892912
_ => Ok(()),
893913
};
914+
// Atomically update the dedup index so this sender's tx_offset is recorded
915+
// as delivered. This prevents re-processing if the message is retried.
916+
let dedup_res = match dedup_sender {
917+
Some((sender_identity, sender_tx_offset)) => {
918+
tx.upsert_databases_tx_offset(sender_identity, sender_tx_offset)
919+
}
920+
None => Ok(()),
921+
};
922+
let res = lifecycle_res.and(dedup_res);
894923
match res {
895924
Ok(()) => (EventStatus::Committed(DatabaseUpdate::default()), return_value),
896925
Err(err) => {

0 commit comments

Comments
 (0)