@@ -341,6 +341,7 @@ impl From<PhysicalPlan> for PipelinedExecutor {
341341 lhs,
342342 rhs,
343343 rhs_index,
344+ rhs_prefix,
344345 rhs_field,
345346 unique,
346347 lhs_field,
@@ -352,6 +353,7 @@ impl From<PhysicalPlan> for PipelinedExecutor {
352353 lhs : Box :: new ( Self :: from ( * lhs) ) ,
353354 rhs_table : rhs. table_id ,
354355 rhs_index,
356+ rhs_prefix,
355357 rhs_field,
356358 lhs_field,
357359 unique,
@@ -362,6 +364,7 @@ impl From<PhysicalPlan> for PipelinedExecutor {
362364 lhs,
363365 rhs,
364366 rhs_index,
367+ rhs_prefix,
365368 rhs_field,
366369 unique,
367370 lhs_field,
@@ -373,6 +376,7 @@ impl From<PhysicalPlan> for PipelinedExecutor {
373376 lhs : Box :: new ( Self :: from ( * lhs) ) ,
374377 rhs_table : rhs. table_id ,
375378 rhs_index,
379+ rhs_prefix,
376380 rhs_field,
377381 rhs_delta,
378382 lhs_field,
@@ -923,6 +927,16 @@ fn combine_prefix_and_last(prefix: Vec<(ColId, AlgebraicValue)>, last: Algebraic
923927 }
924928}
925929
930+ fn combine_probe_prefix_and_last ( prefix : & [ AlgebraicValue ] , last : AlgebraicValue ) -> AlgebraicValue {
931+ if prefix. is_empty ( ) {
932+ last
933+ } else {
934+ AlgebraicValue :: product ( ProductValue :: from_iter (
935+ prefix. iter ( ) . cloned ( ) . chain ( std:: iter:: once ( last) ) ,
936+ ) )
937+ }
938+ }
939+
926940impl PipelinedIxScanEq {
927941 /// We don't know statically if an index scan will return rows
928942 pub fn is_empty ( & self , _: & impl DeltaStore ) -> bool {
@@ -969,6 +983,8 @@ pub struct PipelinedIxJoin {
969983 pub rhs_table : TableId ,
970984 /// The rhs index
971985 pub rhs_index : IndexId ,
986+ /// Constant prefix values for multi-column index probes.
987+ pub rhs_prefix : Vec < AlgebraicValue > ,
972988 /// The rhs join field
973989 pub rhs_field : ColId ,
974990 /// The lhs join field
@@ -996,7 +1012,7 @@ impl PipelinedIxJoin {
9961012 let mut bytes_scanned = 0 ;
9971013
9981014 let iter_rhs = |u : & Tuple , lhs_field : & TupleField , bytes_scanned : & mut usize | -> Result < _ > {
999- let key = project ( u, lhs_field, bytes_scanned) ;
1015+ let key = combine_probe_prefix_and_last ( & self . rhs_prefix , project ( u, lhs_field, bytes_scanned) ) ;
10001016 Ok ( tx
10011017 . index_scan_point ( self . rhs_table , self . rhs_index , & key) ?
10021018 . map ( Row :: Ptr )
@@ -1135,6 +1151,8 @@ pub struct PipelinedIxDeltaJoin {
11351151 pub rhs_delta : Delta ,
11361152 /// The rhs index
11371153 pub rhs_index : IndexId ,
1154+ /// Constant prefix values for multi-column index probes.
1155+ pub rhs_prefix : Vec < AlgebraicValue > ,
11381156 /// The rhs join field
11391157 pub rhs_field : ColId ,
11401158 /// The lhs join field
@@ -1177,13 +1195,10 @@ impl PipelinedIxDeltaJoin {
11771195 lhs. execute ( tx, metrics, & mut |u| {
11781196 n += 1 ;
11791197 index_seeks += 1 ;
1198+ let key =
1199+ combine_probe_prefix_and_last ( & self . rhs_prefix , project ( & u, lhs_field, & mut bytes_scanned) ) ;
11801200 if tx
1181- . index_scan_point_for_delta (
1182- self . rhs_table ,
1183- self . rhs_index ,
1184- self . rhs_delta ,
1185- & project ( & u, lhs_field, & mut bytes_scanned) ,
1186- )
1201+ . index_scan_point_for_delta ( self . rhs_table , self . rhs_index , self . rhs_delta , & key)
11871202 . next ( )
11881203 . is_some ( )
11891204 {
@@ -1203,13 +1218,10 @@ impl PipelinedIxDeltaJoin {
12031218 lhs. execute ( tx, metrics, & mut |u| {
12041219 n += 1 ;
12051220 index_seeks += 1 ;
1221+ let key =
1222+ combine_probe_prefix_and_last ( & self . rhs_prefix , project ( & u, lhs_field, & mut bytes_scanned) ) ;
12061223 if let Some ( v) = tx
1207- . index_scan_point_for_delta (
1208- self . rhs_table ,
1209- self . rhs_index ,
1210- self . rhs_delta ,
1211- & project ( & u, lhs_field, & mut bytes_scanned) ,
1212- )
1224+ . index_scan_point_for_delta ( self . rhs_table , self . rhs_index , self . rhs_delta , & key)
12131225 . next ( )
12141226 . map ( Tuple :: Row )
12151227 {
@@ -1229,13 +1241,10 @@ impl PipelinedIxDeltaJoin {
12291241 lhs. execute ( tx, metrics, & mut |u| {
12301242 n += 1 ;
12311243 index_seeks += 1 ;
1244+ let key =
1245+ combine_probe_prefix_and_last ( & self . rhs_prefix , project ( & u, lhs_field, & mut bytes_scanned) ) ;
12321246 if let Some ( v) = tx
1233- . index_scan_point_for_delta (
1234- self . rhs_table ,
1235- self . rhs_index ,
1236- self . rhs_delta ,
1237- & project ( & u, lhs_field, & mut bytes_scanned) ,
1238- )
1247+ . index_scan_point_for_delta ( self . rhs_table , self . rhs_index , self . rhs_delta , & key)
12391248 . next ( )
12401249 . map ( Tuple :: Row )
12411250 {
@@ -1256,13 +1265,10 @@ impl PipelinedIxDeltaJoin {
12561265 lhs. execute ( tx, metrics, & mut |u| {
12571266 n += 1 ;
12581267 index_seeks += 1 ;
1268+ let key =
1269+ combine_probe_prefix_and_last ( & self . rhs_prefix , project ( & u, lhs_field, & mut bytes_scanned) ) ;
12591270 for _ in 0 ..tx
1260- . index_scan_point_for_delta (
1261- self . rhs_table ,
1262- self . rhs_index ,
1263- self . rhs_delta ,
1264- & project ( & u, lhs_field, & mut bytes_scanned) ,
1265- )
1271+ . index_scan_point_for_delta ( self . rhs_table , self . rhs_index , self . rhs_delta , & key)
12661272 . count ( )
12671273 {
12681274 f ( u. clone ( ) ) ?;
@@ -1281,13 +1287,10 @@ impl PipelinedIxDeltaJoin {
12811287 lhs. execute ( tx, metrics, & mut |u| {
12821288 n += 1 ;
12831289 index_seeks += 1 ;
1290+ let key =
1291+ combine_probe_prefix_and_last ( & self . rhs_prefix , project ( & u, lhs_field, & mut bytes_scanned) ) ;
12841292 for v in tx
1285- . index_scan_point_for_delta (
1286- self . rhs_table ,
1287- self . rhs_index ,
1288- self . rhs_delta ,
1289- & project ( & u, lhs_field, & mut bytes_scanned) ,
1290- )
1293+ . index_scan_point_for_delta ( self . rhs_table , self . rhs_index , self . rhs_delta , & key)
12911294 . map ( Tuple :: Row )
12921295 {
12931296 f ( v) ?;
@@ -1306,13 +1309,10 @@ impl PipelinedIxDeltaJoin {
13061309 lhs. execute ( tx, metrics, & mut |u| {
13071310 n += 1 ;
13081311 index_seeks += 1 ;
1312+ let key =
1313+ combine_probe_prefix_and_last ( & self . rhs_prefix , project ( & u, lhs_field, & mut bytes_scanned) ) ;
13091314 for v in tx
1310- . index_scan_point_for_delta (
1311- self . rhs_table ,
1312- self . rhs_index ,
1313- self . rhs_delta ,
1314- & project ( & u, lhs_field, & mut bytes_scanned) ,
1315- )
1315+ . index_scan_point_for_delta ( self . rhs_table , self . rhs_index , self . rhs_delta , & key)
13161316 . map ( Tuple :: Row )
13171317 {
13181318 f ( u. clone ( ) . join ( v. clone ( ) ) ) ?;
0 commit comments