Skip to content

Commit 184d4e9

Browse files
cloutiertylergefjonjsdt
authored
Implement server-side support for the v2 websocket protocol (#4213)
# Description of Changes This adds the v2 websocket protocol and adds support on the server side. For context on many of the changes/decisions, you can look at the discussion on #4023. To restate some of the key changes: - The reducer event information is no longer sent with transaction updates (because we don't want to broadcast reducer call information anymore). - If a client calls a reducer, they are sent a `ReducerResult` which includes the outcome of the reducer call and and related row updates for queries that the client is subscribed to. - We no longer dedupe queries that appear in multiple query sets for the same client. This is because we are moving toward per-query storage. - Related to that, Unsubscribe requests have an option to send the related rows. We need this for now, since clients don't have per-query storage implemented yet. - We don't have the json format in v2. Notes for reviewers: - This moves around the messages in `crates/client-api-messages/src/websocket` (into `common`, `v1`, and `v2`), and this renaming of existing messages adds a lot of noise to the PR. - In many places, I chose to duplicate a lot of code to have a v1 version and a v2 version. I went with this to make it easier to remove the v1 version in the future (hopefully we can just fully delete most of the v1 functions). - `module_subscription_manager.rs` has probably has the biggest changes, since we now track queries by query_set_id, and we get to remove some complexity of v1's FormatSwitch. <!-- Please describe your change, mention any related tickets, and so on here. --> # API and ABI breaking changes The v1 protocol still works, though we won't send the reducer event info for v10 modules. # Expected complexity level and risk 4. This touches a lot of places. # Testing Unit testing is pretty minimal for the new code paths. I've done some manual e2e testing with the typescript quickstart, and this has been tested with a different branch implementing the v2 rust client. --------- Co-authored-by: Phoebe Goldman <phoebe@goldman-tribe.org> Co-authored-by: Jeffrey Dallatezza <jeffreydallatezza@gmail.com>
1 parent 12f931e commit 184d4e9

51 files changed

Lines changed: 4456 additions & 1813 deletions

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

.github/workflows/ci.yml

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -958,14 +958,15 @@ jobs:
958958
exit 1
959959
}
960960
961-
- name: Check client-api bindings are up to date
962-
working-directory: sdks/csharp
963-
run: |
964-
bash tools~/gen-client-api.sh
965-
"${GITHUB_WORKSPACE}"/tools/check-diff.sh src/SpacetimeDB/ClientApi || {
966-
echo 'Error: Client API bindings are dirty. Please run `sdks/csharp/tools~/gen-client-api.sh`.'
967-
exit 1
968-
}
961+
# TODO: Re-enable this once csharp is using the v2 ws api.
962+
# - name: Check client-api bindings are up to date
963+
# working-directory: sdks/csharp
964+
# run: |
965+
# bash tools~/gen-client-api.sh
966+
# "${GITHUB_WORKSPACE}"/tools/check-diff.sh src/SpacetimeDB/ClientApi || {
967+
# echo 'Error: Client API bindings are dirty. Please run `sdks/csharp/tools~/gen-client-api.sh`.'
968+
# exit 1
969+
# }
969970

970971
- name: Start SpacetimeDB
971972
run: |

Cargo.lock

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/bench/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ bench = false
3838

3939
[dependencies]
4040
spacetimedb-client-api = { path = "../client-api" }
41+
spacetimedb-client-api-messages = { path = "../client-api-messages" }
4142
spacetimedb-core = { path = "../core", features = ["test"] }
4243
spacetimedb-data-structures.workspace = true
4344
spacetimedb-datastore.workspace = true

crates/bench/benches/subscription.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,18 @@
11
use criterion::{black_box, criterion_group, criterion_main, Criterion};
22
use spacetimedb::client::consume_each_list::ConsumeEachBuffer;
3+
use spacetimedb::db::relational_db::RelationalDB;
34
use spacetimedb::error::DBError;
45
use spacetimedb::host::module_host::DatabaseTableUpdate;
56
use spacetimedb::identity::AuthCtx;
6-
use spacetimedb::messages::websocket::BsatnFormat;
77
use spacetimedb::sql::ast::SchemaViewer;
88
use spacetimedb::subscription::query::compile_read_only_queryset;
99
use spacetimedb::subscription::row_list_builder_pool::BsatnRowListBuilderPool;
1010
use spacetimedb::subscription::subscription::ExecutionSet;
1111
use spacetimedb::subscription::tx::DeltaTx;
1212
use spacetimedb::subscription::{collect_table_update, TableUpdateType};
13-
use spacetimedb::{db::relational_db::RelationalDB, messages::websocket::Compression};
1413
use spacetimedb_bench::database::BenchDatabase as _;
1514
use spacetimedb_bench::spacetime_raw::SpacetimeRaw;
15+
use spacetimedb_client_api_messages::websocket::v1::{BsatnFormat, Compression};
1616
use spacetimedb_datastore::execution_context::Workload;
1717
use spacetimedb_execution::pipelined::PipelinedProject;
1818
use spacetimedb_primitives::{col_list, TableId};

crates/cli/src/subcommands/subscribe.rs

Lines changed: 19 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ use futures::{Sink, SinkExt, TryStream, TryStreamExt};
44
use http::header;
55
use reqwest::Url;
66
use serde_json::Value;
7-
use spacetimedb_client_api_messages::websocket::{self as ws, JsonFormat};
7+
use spacetimedb_client_api_messages::websocket::v1 as ws_v1;
88
use spacetimedb_data_structures::map::HashMap;
99
use spacetimedb_lib::db::raw_def::v9::RawModuleDefV9;
1010
use spacetimedb_lib::de::serde::{DeserializeWrapper, SeedWrapper};
@@ -71,16 +71,16 @@ pub fn cli() -> clap::Command {
7171
.arg(common_args::server().help("The nickname, host name or URL of the server hosting the database"))
7272
}
7373

74-
fn parse_msg_json(msg: &WsMessage) -> Option<ws::ServerMessage<JsonFormat>> {
74+
fn parse_msg_json(msg: &WsMessage) -> Option<ws_v1::ServerMessage<ws_v1::JsonFormat>> {
7575
let WsMessage::Text(msg) = msg else { return None };
76-
serde_json::from_str::<DeserializeWrapper<ws::ServerMessage<JsonFormat>>>(msg)
76+
serde_json::from_str::<DeserializeWrapper<ws_v1::ServerMessage<ws_v1::JsonFormat>>>(msg)
7777
.inspect_err(|e| eprintln!("couldn't parse message from server: {e}"))
7878
.map(|wrapper| wrapper.0)
7979
.ok()
8080
}
8181

8282
fn reformat_update<'a>(
83-
msg: &'a ws::DatabaseUpdate<JsonFormat>,
83+
msg: &'a ws_v1::DatabaseUpdate<ws_v1::JsonFormat>,
8484
schema: &RawModuleDefV9,
8585
) -> anyhow::Result<HashMap<&'a str, SubscriptionTable>> {
8686
msg.tables
@@ -152,7 +152,7 @@ pub async fn exec(config: Config, args: &ArgMatches) -> Result<(), anyhow::Error
152152
let mut req = url.into_client_request()?;
153153
req.headers_mut().insert(
154154
header::SEC_WEBSOCKET_PROTOCOL,
155-
http::HeaderValue::from_static(ws::TEXT_PROTOCOL),
155+
http::HeaderValue::from_static(ws_v1::TEXT_PROTOCOL),
156156
);
157157
// Add the authorization header, if any.
158158
if let Some(auth_header) = api.con.auth_header.to_header() {
@@ -241,8 +241,8 @@ async fn subscribe<S>(ws: &mut S, query_strings: Box<[Box<str>]>) -> Result<(),
241241
where
242242
S: Sink<WsMessage, Error = WsError> + Unpin,
243243
{
244-
let msg = serde_json::to_string(&SerializeWrapper::new(ws::ClientMessage::<()>::Subscribe(
245-
ws::Subscribe {
244+
let msg = serde_json::to_string(&SerializeWrapper::new(ws_v1::ClientMessage::<()>::Subscribe(
245+
ws_v1::Subscribe {
246246
query_strings,
247247
request_id: 0,
248248
},
@@ -262,22 +262,22 @@ where
262262
while let Some(msg) = ws.try_next().await.map_err(|source| Error::Websocket { source })? {
263263
let Some(msg) = parse_msg_json(&msg) else { continue };
264264
match msg {
265-
ws::ServerMessage::InitialSubscription(sub) => {
265+
ws_v1::ServerMessage::InitialSubscription(sub) => {
266266
if let Some(module_def) = module_def {
267267
let output = format_output_json(&sub.database_update, module_def)?;
268268
tokio::io::stdout().write_all(output.as_bytes()).await?
269269
}
270270
break;
271271
}
272-
ws::ServerMessage::TransactionUpdate(ws::TransactionUpdate { status, .. }) => {
272+
ws_v1::ServerMessage::TransactionUpdate(ws_v1::TransactionUpdate { status, .. }) => {
273273
return Err(match status {
274-
ws::UpdateStatus::Failed(msg) => Error::TransactionFailure { reason: msg },
274+
ws_v1::UpdateStatus::Failed(msg) => Error::TransactionFailure { reason: msg },
275275
_ => Error::Protocol {
276276
details: RECV_TX_UPDATE,
277277
},
278278
})
279279
}
280-
ws::ServerMessage::TransactionUpdateLight(ws::TransactionUpdateLight { .. }) => {
280+
ws_v1::ServerMessage::TransactionUpdateLight(ws_v1::TransactionUpdateLight { .. }) => {
281281
return Err(Error::Protocol {
282282
details: RECV_TX_UPDATE,
283283
})
@@ -310,14 +310,14 @@ where
310310

311311
let Some(msg) = parse_msg_json(&msg) else { continue };
312312
match msg {
313-
ws::ServerMessage::InitialSubscription(_) => {
313+
ws_v1::ServerMessage::InitialSubscription(_) => {
314314
return Err(Error::Protocol {
315315
details: "received a second initial subscription update",
316316
})
317317
}
318-
ws::ServerMessage::TransactionUpdateLight(ws::TransactionUpdateLight { update, .. })
319-
| ws::ServerMessage::TransactionUpdate(ws::TransactionUpdate {
320-
status: ws::UpdateStatus::Committed(update),
318+
ws_v1::ServerMessage::TransactionUpdateLight(ws_v1::TransactionUpdateLight { update, .. })
319+
| ws_v1::ServerMessage::TransactionUpdate(ws_v1::TransactionUpdate {
320+
status: ws_v1::UpdateStatus::Committed(update),
321321
..
322322
}) => {
323323
let output = format_output_json(&update, module_def)?;
@@ -329,7 +329,10 @@ where
329329
}
330330
}
331331

332-
fn format_output_json(msg: &ws::DatabaseUpdate<JsonFormat>, schema: &RawModuleDefV9) -> Result<String, Error> {
332+
fn format_output_json(
333+
msg: &ws_v1::DatabaseUpdate<ws_v1::JsonFormat>,
334+
schema: &RawModuleDefV9,
335+
) -> Result<String, Error> {
333336
let formatted = reformat_update(msg, schema).map_err(|source| Error::Reformat { source })?;
334337
let output = serde_json::to_string(&formatted)? + "\n";
335338

crates/client-api-messages/examples/get_ws_schema.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use bytes::Bytes;
2-
use spacetimedb_client_api_messages::websocket::{BsatnFormat, ClientMessage, ServerMessage};
2+
use spacetimedb_client_api_messages::websocket::v1::{BsatnFormat, ClientMessage, ServerMessage};
33
use spacetimedb_lib::ser::serde::SerializeWrapper;
44
use spacetimedb_lib::{RawModuleDef, RawModuleDefV8};
55

0 commit comments

Comments
 (0)