@@ -1198,7 +1198,7 @@ impl CommittedState {
11981198 & mut self ,
11991199 tx_data : & mut TxData ,
12001200 insert_tables : BTreeMap < TableId , Table > ,
1201- tx_blob_store : impl BlobStore ,
1201+ tx_bs : impl BlobStore ,
12021202 truncates : & mut IntSet < TableId > ,
12031203 ) {
12041204 // TODO(perf): Consider moving whole pages from the `insert_tables` into the committed state,
@@ -1210,40 +1210,65 @@ impl CommittedState {
12101210 // and the fullness of the page.
12111211
12121212 for ( table_id, tx_table) in insert_tables {
1213- // For each newly-inserted row, serialize to a product value.
1214- let mut inserts = Vec :: with_capacity ( tx_table. row_count as usize ) ;
1215- inserts. extend ( tx_table. scan_rows ( & tx_blob_store) . map ( |row| row. to_product_value ( ) ) ) ;
1216-
1217- // For each newly-inserted row, serialize to a product value.
1218- // This doesn't apply to event tables,
1219- // which are only recorded in `TxData`,
1220- // but never the committed state.
12211213 let schema = tx_table. get_schema ( ) ;
1222- if !schema. is_event {
1214+ let page_pool = & self . page_pool ;
1215+ if schema. is_event {
1216+ // For event tables, we don't want to insert into the committed state,
1217+ // we just want to include them in subscriptions and the commitlog.
1218+ Self :: collect_inserts ( page_pool, truncates, tx_data, & tx_bs, table_id, tx_table, |_| { } ) ;
1219+ } else {
12231220 let ( commit_table, commit_blob_store, page_pool) =
1224- self . get_table_and_blob_store_or_create ( table_id, tx_table . get_schema ( ) ) ;
1225- for row in & inserts {
1221+ self . get_table_and_blob_store_or_create ( table_id, schema ) ;
1222+ Self :: collect_inserts ( page_pool , truncates , tx_data , & tx_bs , table_id , tx_table , |row| {
12261223 commit_table
12271224 . insert ( page_pool, commit_blob_store, row)
12281225 . expect ( "Failed to insert when merging commit" ) ;
1229- }
1226+ } ) ;
12301227 }
1228+ }
1229+ }
1230+
1231+ /// Collects the inserted rows in `tx_table` into `tx_data`,
1232+ /// and applies `on_row` to each inserted row.
1233+ ///
1234+ /// The `on_row` closure will be called with each inserted row.
1235+ /// `Self::merge_apply_inserts` uses this to add non-event rows to the committed state.
1236+ fn collect_inserts (
1237+ page_pool : & PagePool ,
1238+ truncates : & mut IntSet < TableId > ,
1239+ tx_data : & mut TxData ,
1240+ tx_blob_store : & impl BlobStore ,
1241+ table_id : TableId ,
1242+ tx_table : Table ,
1243+ mut on_row : impl FnMut ( & ProductValue ) ,
1244+ ) {
1245+ // For each newly-inserted row, serialize to a product value.
1246+ // This bypasses the `Vec<_>` intermediary and constructs the `Arc<[_]>` directly,
1247+ // which matters somewhat for smaller transactions and more for larger transactions.
1248+ let mut inserts = Arc :: new_uninit_slice ( tx_table. row_count as usize ) ;
1249+ let inserts_mut = Arc :: get_mut ( & mut inserts) . expect ( "`Arc` should be unique as it was just created" ) ;
1250+ for ( row, slot) in tx_table. scan_rows ( tx_blob_store) . zip ( inserts_mut) {
1251+ let row = row. to_product_value ( ) ;
1252+ on_row ( & row) ;
1253+ slot. write ( row) ;
1254+ }
1255+ // SAFETY: We've written to every slot in `inserts`, so it's now fully initialized.
1256+ let inserts = unsafe { inserts. assume_init ( ) } ;
12311257
1232- // Add the table to `TxData` if there were insertions.
1233- if !inserts. is_empty ( ) {
1234- tx_data. set_inserts_for_table ( table_id, & schema . table_name , inserts. into ( ) ) ;
1258+ // Add the table to `TxData` if there were insertions.
1259+ if !inserts. is_empty ( ) {
1260+ tx_data. set_inserts_for_table ( table_id, & tx_table . get_schema ( ) . table_name , inserts) ;
12351261
1236- // If table has inserted rows, it cannot be truncated.
1237- if truncates. contains ( & table_id) {
1238- truncates. remove ( & table_id) ;
1239- }
1262+ // If table has inserted rows, it cannot be truncated.
1263+ if truncates. contains ( & table_id) {
1264+ truncates. remove ( & table_id) ;
12401265 }
1266+ }
12411267
1242- let ( .., pages) = tx_table. consume_for_merge ( ) ;
1268+ let ( .., pages) = tx_table. consume_for_merge ( ) ;
12431269
1244- // Put all the pages in the table back into the pool.
1245- self . page_pool . put_many ( pages) ;
1246- }
1270+ // Put all the pages in the table back into the pool.
1271+ page_pool. put_many ( pages) ;
12471272 }
12481273
12491274 /// Rolls back the changes immediately made to the committed state during a transaction.
0 commit comments