Skip to content

Commit 269cf01

Browse files
committed
Merge branch 'master' into shub/hack-idc
2 parents 4d68dd6 + b2fb04a commit 269cf01

59 files changed

Lines changed: 2351 additions & 979 deletions

File tree

Some content is hidden

Large commits have some content hidden by default. Use the search box below to find content that may be hidden.

crates/bindings-typescript/src/sdk/db_connection_impl.ts

Lines changed: 21 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -158,6 +158,10 @@ export class DbConnectionImpl<RemoteModule extends UntypedRemoteModule>
158158
#reducerCallInfo = new Map<number, { name: string; args: object }>();
159159
#procedureCallbacks = new Map<number, ProcedureCallback>();
160160
#rowDeserializers: Record<string, Deserializer<any>>;
161+
#rowIdMetadata: Record<
162+
string,
163+
{ primaryKeyColName?: string; primaryKeyColType?: AlgebraicType }
164+
>;
161165
#reducerArgsSerializers: Record<
162166
string,
163167
{ serialize: Serializer<any>; deserialize: Deserializer<any> }
@@ -205,6 +209,7 @@ export class DbConnectionImpl<RemoteModule extends UntypedRemoteModule>
205209
this.#emitter = emitter;
206210

207211
this.#rowDeserializers = Object.create(null);
212+
this.#rowIdMetadata = Object.create(null);
208213
this.#sourceNameToTableDef = Object.create(null);
209214
for (const table of Object.values(remoteModule.tables)) {
210215
this.#rowDeserializers[table.sourceName] = ProductType.makeDeserializer(
@@ -213,6 +218,15 @@ export class DbConnectionImpl<RemoteModule extends UntypedRemoteModule>
213218
this.#sourceNameToTableDef[table.sourceName] = table as Values<
214219
RemoteModule['tables']
215220
>;
221+
const primaryKeyColumn = Object.entries(table.columns).find(
222+
([, column]) => column.columnMetadata.isPrimaryKey
223+
);
224+
this.#rowIdMetadata[table.sourceName] = primaryKeyColumn
225+
? {
226+
primaryKeyColName: primaryKeyColumn[0],
227+
primaryKeyColType: primaryKeyColumn[1].typeBuilder.algebraicType,
228+
}
229+
: {};
216230
}
217231

218232
this.#reducerArgsSerializers = Object.create(null);
@@ -302,8 +316,6 @@ export class DbConnectionImpl<RemoteModule extends UntypedRemoteModule>
302316
#makeReducers(def: RemoteModule): ReducersView<RemoteModule> {
303317
const out: Record<string, unknown> = {};
304318

305-
const writer = new BinaryWriter(1024);
306-
307319
for (const reducer of def.reducers) {
308320
const reducerName = reducer.name;
309321
const key = reducer.accessorName;
@@ -312,6 +324,7 @@ export class DbConnectionImpl<RemoteModule extends UntypedRemoteModule>
312324
this.#reducerArgsSerializers[reducerName];
313325

314326
(out as any)[key] = (params: InferTypeOfRow<typeof reducer.params>) => {
327+
const writer = this.#reducerArgsEncoder;
315328
writer.clear();
316329
serializeArgs(writer, params);
317330
const argsBuffer = writer.getBuffer();
@@ -428,20 +441,13 @@ export class DbConnectionImpl<RemoteModule extends UntypedRemoteModule>
428441
const rows: Operation[] = [];
429442

430443
const deserializeRow = this.#rowDeserializers[tableName];
431-
const table = this.#sourceNameToTableDef[tableName];
432-
// TODO: performance
433-
const columnsArray = Object.entries(table.columns);
434-
const primaryKeyColumnEntry = columnsArray.find(
435-
col => col[1].columnMetadata.isPrimaryKey
436-
);
444+
const { primaryKeyColName, primaryKeyColType } =
445+
this.#rowIdMetadata[tableName];
437446
let previousOffset = 0;
438447
while (reader.remaining > 0) {
439448
const row = deserializeRow(reader);
440449
let rowId: ComparablePrimitive | undefined = undefined;
441-
if (primaryKeyColumnEntry !== undefined) {
442-
const primaryKeyColName = primaryKeyColumnEntry[0];
443-
const primaryKeyColType =
444-
primaryKeyColumnEntry[1].typeBuilder.algebraicType;
450+
if (primaryKeyColName !== undefined && primaryKeyColType !== undefined) {
445451
rowId = AlgebraicType.intoMapKey(
446452
primaryKeyColType,
447453
row[primaryKeyColName]
@@ -548,6 +554,7 @@ export class DbConnectionImpl<RemoteModule extends UntypedRemoteModule>
548554
}
549555
}
550556

557+
#reducerArgsEncoder = new BinaryWriter(1024);
551558
#clientMessageEncoder = new BinaryWriter(1024);
552559
#sendMessage(message: ClientMessage): void {
553560
const writer = this.#clientMessageEncoder;
@@ -906,7 +913,8 @@ export class DbConnectionImpl<RemoteModule extends UntypedRemoteModule>
906913
_paramsType: ProductType,
907914
params: object
908915
): Promise<void> {
909-
const writer = new BinaryWriter(1024);
916+
const writer = this.#reducerArgsEncoder;
917+
writer.clear();
910918
this.#reducerArgsSerializers[reducerName].serialize(writer, params);
911919
const argsBuffer = writer.getBuffer();
912920
return this.callReducer(reducerName, argsBuffer, params);

crates/bindings-typescript/src/sdk/table_cache.ts

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ export class TableCacheImpl<
6666
TableName extends TableNamesOf<RemoteModule>,
6767
> implements ClientTableCoreImplementable<RemoteModule, TableName>
6868
{
69+
private readonly hasPrimaryKey: boolean;
6970
private rows: Map<
7071
ComparablePrimitive,
7172
[RowType<TableDefForTableName<RemoteModule, TableName>>, number]
@@ -83,6 +84,9 @@ export class TableCacheImpl<
8384
this.tableDef = tableDef;
8485
this.rows = new Map();
8586
this.emitter = new EventEmitter();
87+
this.hasPrimaryKey = Object.values(this.tableDef.columns).some(
88+
col => col.columnMetadata.isPrimaryKey === true
89+
);
8690
// Build index views from the resolved runtime index metadata.
8791
//
8892
// We intentionally use `resolvedIndexes` rather than `indexes`:
@@ -281,11 +285,7 @@ export class TableCacheImpl<
281285
return pendingCallbacks;
282286
}
283287

284-
// TODO: performance
285-
const hasPrimaryKey = Object.values(this.tableDef.columns).some(
286-
col => col.columnMetadata.isPrimaryKey === true
287-
);
288-
if (hasPrimaryKey) {
288+
if (this.hasPrimaryKey) {
289289
const insertMap = new Map<
290290
ComparablePrimitive,
291291
[

crates/bindings-typescript/src/server/runtime.ts

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -667,9 +667,37 @@ function makeTableView(
667667
index = base as UniqueIndex<any, any>;
668668
} else if (serializeSinglePoint) {
669669
// numColumns == 1
670+
671+
const serializeSingleRange = !isHashIndex
672+
? (buffer: ResizableBuffer, range: Range<any>): IndexScanArgs => {
673+
BINARY_WRITER.reset(buffer);
674+
const writer = BINARY_WRITER;
675+
const writeBound = (bound: Bound<any>) => {
676+
const tags = { included: 0, excluded: 1, unbounded: 2 };
677+
writer.writeU8(tags[bound.tag]);
678+
if (bound.tag !== 'unbounded')
679+
serializeSingleElement!(writer, bound.value);
680+
};
681+
writeBound(range.from);
682+
const rstartLen = writer.offset;
683+
writeBound(range.to);
684+
const rendLen = writer.offset - rstartLen;
685+
return [0, 0, rstartLen, rendLen];
686+
}
687+
: null;
688+
670689
const rawIndex = {
671690
filter: (range: any): IteratorObject<RowType<any>> => {
672691
const buf = LEAF_BUF;
692+
if (serializeSingleRange && range instanceof Range) {
693+
const args = serializeSingleRange(buf, range);
694+
const iter_id = sys.datastore_index_scan_range_bsatn(
695+
index_id,
696+
buf.buffer,
697+
...args
698+
);
699+
return tableIterator(iter_id, deserializeRow);
700+
}
673701
const point_len = serializeSinglePoint(buf, range);
674702
const iter_id = sys.datastore_index_scan_point_bsatn(
675703
index_id,
@@ -680,6 +708,14 @@ function makeTableView(
680708
},
681709
delete: (range: any): u32 => {
682710
const buf = LEAF_BUF;
711+
if (serializeSingleRange && range instanceof Range) {
712+
const args = serializeSingleRange(buf, range);
713+
return sys.datastore_delete_by_index_scan_range_bsatn(
714+
index_id,
715+
buf.buffer,
716+
...args
717+
);
718+
}
683719
const point_len = serializeSinglePoint(buf, range);
684720
return sys.datastore_delete_by_index_scan_point_bsatn(
685721
index_id,

crates/core/src/config.rs

Lines changed: 18 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -56,11 +56,12 @@ impl MetadataFile {
5656
path.write(self.to_string())
5757
}
5858

59-
fn check_compatibility(previous: &Self, current: &Self) -> anyhow::Result<()> {
59+
fn check_compatibility(previous: &Self, current: &Self, metafile: &Path) -> anyhow::Result<()> {
6060
anyhow::ensure!(
6161
previous.edition == current.edition,
62-
"metadata.toml indicates that this database is from a different \
62+
"metadata.toml at {} indicates that this database is from a different \
6363
edition of SpacetimeDB (running {:?}, but this database is {:?})",
64+
metafile.display(),
6465
current.edition,
6566
previous.edition,
6667
);
@@ -110,8 +111,8 @@ impl MetadataFile {
110111
/// `self` is the metadata file read from a database, and current is
111112
/// the default metadata file that the active database version would
112113
write to a new database.
113-
pub fn check_compatibility_and_update(mut self, current: Self) -> anyhow::Result<Self> {
114-
Self::check_compatibility(&self, &current)?;
114+
pub fn check_compatibility_and_update(mut self, current: Self, metafile: &Path) -> anyhow::Result<Self> {
115+
Self::check_compatibility(&self, &current, metafile)?;
115116
// bump the version in the file only if it's being run in a newer database.
116117
self.version = std::cmp::max(self.version, current.version);
117118
Ok(self)
@@ -344,67 +345,67 @@ mod tests {
344345
fn check_metadata_compatibility_checking() {
345346
assert_eq!(
346347
mkmeta(1, 0, 0)
347-
.check_compatibility_and_update(mkmeta(1, 0, 1))
348+
.check_compatibility_and_update(mkmeta(1, 0, 1), Path::new("metadata.toml"))
348349
.unwrap()
349350
.version,
350351
mkver(1, 0, 1)
351352
);
352353
assert_eq!(
353354
mkmeta(1, 0, 1)
354-
.check_compatibility_and_update(mkmeta(1, 0, 0))
355+
.check_compatibility_and_update(mkmeta(1, 0, 0), Path::new("metadata.toml"))
355356
.unwrap()
356357
.version,
357358
mkver(1, 0, 1)
358359
);
359360

360361
mkmeta(1, 1, 0)
361-
.check_compatibility_and_update(mkmeta(1, 0, 5))
362+
.check_compatibility_and_update(mkmeta(1, 0, 5), Path::new("metadata.toml"))
362363
.unwrap_err();
363364
mkmeta(2, 0, 0)
364-
.check_compatibility_and_update(mkmeta(1, 3, 5))
365+
.check_compatibility_and_update(mkmeta(1, 3, 5), Path::new("metadata.toml"))
365366
.unwrap_err();
366367
assert_eq!(
367368
mkmeta(1, 12, 0)
368-
.check_compatibility_and_update(mkmeta(2, 0, 0))
369+
.check_compatibility_and_update(mkmeta(2, 0, 0), Path::new("metadata.toml"))
369370
.unwrap()
370371
.version,
371372
mkver(2, 0, 0)
372373
);
373374
mkmeta(2, 0, 0)
374-
.check_compatibility_and_update(mkmeta(3, 0, 0))
375+
.check_compatibility_and_update(mkmeta(3, 0, 0), Path::new("metadata.toml"))
375376
.unwrap_err();
376377
}
377378

378379
#[test]
379380
fn check_metadata_compatibility_prerelease() {
380381
mkmeta(1, 9, 0)
381-
.check_compatibility_and_update(mkmeta_pre(2, 0, 0, "rc1"))
382+
.check_compatibility_and_update(mkmeta_pre(2, 0, 0, "rc1"), Path::new("metadata.toml"))
382383
.unwrap();
383384

384385
mkmeta_pre(2, 0, 0, "rc1")
385-
.check_compatibility_and_update(mkmeta_pre(2, 0, 0, "rc1"))
386+
.check_compatibility_and_update(mkmeta_pre(2, 0, 0, "rc1"), Path::new("metadata.toml"))
386387
.unwrap();
387388

388389
mkmeta_pre(2, 0, 0, "rc1")
389-
.check_compatibility_and_update(mkmeta(2, 0, 1))
390+
.check_compatibility_and_update(mkmeta(2, 0, 1), Path::new("metadata.toml"))
390391
.unwrap();
391392

392393
mkmeta_pre(2, 0, 0, "rc1")
393-
.check_compatibility_and_update(mkmeta(2, 0, 0))
394+
.check_compatibility_and_update(mkmeta(2, 0, 0), Path::new("metadata.toml"))
394395
.unwrap();
395396

396397
// Now check some failures..
397398

398399
mkmeta_pre(2, 0, 0, "rc1")
399-
.check_compatibility_and_update(mkmeta_pre(2, 0, 0, "rc2"))
400+
.check_compatibility_and_update(mkmeta_pre(2, 0, 0, "rc2"), Path::new("metadata.toml"))
400401
.unwrap_err();
401402

402403
mkmeta_pre(2, 0, 0, "rc2")
403-
.check_compatibility_and_update(mkmeta_pre(2, 0, 0, "rc1"))
404+
.check_compatibility_and_update(mkmeta_pre(2, 0, 0, "rc1"), Path::new("metadata.toml"))
404405
.unwrap_err();
405406

406407
mkmeta(2, 0, 0)
407-
.check_compatibility_and_update(mkmeta_pre(2, 1, 0, "rc1"))
408+
.check_compatibility_and_update(mkmeta_pre(2, 1, 0, "rc1"), Path::new("metadata.toml"))
408409
.unwrap_err();
409410
}
410411

crates/core/src/db/relational_db.rs

Lines changed: 11 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ use spacetimedb_datastore::locking_tx_datastore::datastore::TxMetrics;
1717
use spacetimedb_datastore::locking_tx_datastore::state_view::{
1818
IterByColEqMutTx, IterByColRangeMutTx, IterMutTx, StateView,
1919
};
20-
use spacetimedb_datastore::locking_tx_datastore::{MutTxId, TxId};
20+
use spacetimedb_datastore::locking_tx_datastore::{IndexScanPointOrRange, MutTxId, TxId};
2121
use spacetimedb_datastore::system_tables::{
2222
system_tables, StModuleRow, ST_CLIENT_ID, ST_CONNECTION_CREDENTIALS_ID, ST_VIEW_SUB_ID,
2323
};
@@ -55,10 +55,11 @@ use spacetimedb_snapshot::{ReconstructedSnapshot, SnapshotError, SnapshotReposit
5555
use spacetimedb_table::indexes::RowPointer;
5656
use spacetimedb_table::page_pool::PagePool;
5757
use spacetimedb_table::table::{RowRef, TableScanIter};
58+
use spacetimedb_table::table_index::IndexKey;
5859
use std::borrow::Cow;
5960
use std::io;
6061
use std::num::NonZeroUsize;
61-
use std::ops::{Bound, RangeBounds};
62+
use std::ops::RangeBounds;
6263
use std::sync::Arc;
6364
use tokio::sync::watch;
6465

@@ -1394,32 +1395,24 @@ impl RelationalDB {
13941395
Ok(self.inner.iter_by_col_range_tx(tx, table_id.into(), cols, range)?)
13951396
}
13961397

1397-
pub fn index_scan_range<'a>(
1398+
pub fn index_scan_range<'de, 'a>(
13981399
&'a self,
13991400
tx: &'a MutTx,
14001401
index_id: IndexId,
1401-
prefix: &[u8],
1402+
prefix: &'de [u8],
14021403
prefix_elems: ColId,
1403-
rstart: &[u8],
1404-
rend: &[u8],
1405-
) -> Result<
1406-
(
1407-
TableId,
1408-
Bound<AlgebraicValue>,
1409-
Bound<AlgebraicValue>,
1410-
impl Iterator<Item = RowRef<'a>> + use<'a>,
1411-
),
1412-
DBError,
1413-
> {
1404+
rstart: &'de [u8],
1405+
rend: &'de [u8],
1406+
) -> Result<(TableId, IndexScanPointOrRange<'de, 'a>), DBError> {
14141407
Ok(tx.index_scan_range(index_id, prefix, prefix_elems, rstart, rend)?)
14151408
}
14161409

1417-
pub fn index_scan_point<'a>(
1410+
pub fn index_scan_point<'a, 'p>(
14181411
&'a self,
14191412
tx: &'a MutTx,
14201413
index_id: IndexId,
1421-
point: &[u8],
1422-
) -> Result<(TableId, AlgebraicValue, impl Iterator<Item = RowRef<'a>> + use<'a>), DBError> {
1414+
point: &'p [u8],
1415+
) -> Result<(TableId, IndexKey<'p>, impl Iterator<Item = RowRef<'a>> + use<'a>), DBError> {
14231416
Ok(tx.index_scan_point(index_id, point)?)
14241417
}
14251418

crates/core/src/host/instance_env.rs

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ use spacetimedb_client_api_messages::energy::EnergyQuanta;
1818
use spacetimedb_datastore::db_metrics::DB_METRICS;
1919
use spacetimedb_datastore::execution_context::Workload;
2020
use spacetimedb_datastore::locking_tx_datastore::state_view::StateView;
21-
use spacetimedb_datastore::locking_tx_datastore::{FuncCallType, MutTxId};
21+
use spacetimedb_datastore::locking_tx_datastore::{FuncCallType, IndexScanPointOrRange, MutTxId};
2222
use spacetimedb_datastore::traits::IsolationLevel;
2323
use spacetimedb_lib::{http as st_http, ConnectionId, Identity, Timestamp};
2424
use spacetimedb_primitives::{ColId, ColList, IndexId, TableId};
@@ -539,9 +539,12 @@ impl InstanceEnv {
539539
let tx = &mut *self.get_tx()?;
540540

541541
// Find all rows in the table to delete.
542-
let (table_id, _, _, iter) = stdb.index_scan_range(tx, index_id, prefix, prefix_elems, rstart, rend)?;
542+
let (table_id, iter) = stdb.index_scan_range(tx, index_id, prefix, prefix_elems, rstart, rend)?;
543543
// Re. `SmallVec`, `delete_by_field` only cares about 1 element, so optimize for that.
544-
let rows_to_delete = iter.map(|row_ref| row_ref.pointer()).collect::<SmallVec<[_; 1]>>();
544+
let rows_to_delete = match iter {
545+
IndexScanPointOrRange::Point(_, iter) => iter.map(|row_ref| row_ref.pointer()).collect(),
546+
IndexScanPointOrRange::Range(iter) => iter.map(|row_ref| row_ref.pointer()).collect(),
547+
};
545548

546549
Ok(Self::datastore_delete_by_index_scan(stdb, tx, table_id, rows_to_delete))
547550
}
@@ -703,19 +706,22 @@ impl InstanceEnv {
703706
let tx = &mut *self.get_tx()?;
704707

705708
// Open index iterator
706-
let (table_id, lower, upper, iter) =
709+
let (table_id, iter) =
707710
self.relational_db()
708711
.index_scan_range(tx, index_id, prefix, prefix_elems, rstart, rend)?;
709712

710713
// Scan the index and serialize rows to BSATN.
711-
let (chunks, rows_scanned, bytes_scanned) = ChunkedWriter::collect_iter(pool, iter);
714+
let (point, (chunks, rows_scanned, bytes_scanned)) = match iter {
715+
IndexScanPointOrRange::Point(point, iter) => (Some(point), ChunkedWriter::collect_iter(pool, iter)),
716+
IndexScanPointOrRange::Range(iter) => (None, ChunkedWriter::collect_iter(pool, iter)),
717+
};
712718

713719
// Record the number of rows and the number of bytes scanned by the iterator.
714720
tx.metrics.index_seeks += 1;
715721
tx.metrics.bytes_scanned += bytes_scanned;
716722
tx.metrics.rows_scanned += rows_scanned;
717723

718-
tx.record_index_scan_range(&self.func_type, table_id, index_id, lower, upper);
724+
tx.record_index_scan_range(&self.func_type, table_id, index_id, point);
719725

720726
Ok(chunks)
721727
}

0 commit comments

Comments
 (0)