Skip to content

Commit f61daba

Browse files
committed
feat(limit): support physical limit operator
1 parent 6a66644 commit f61daba

9 files changed

Lines changed: 117 additions & 5 deletions

File tree

src/binder/select.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -394,6 +394,18 @@ mod tests {
394394
plan_6
395395
);
396396

397+
let plan_7 = select_sql_run("select * from t1 limit 1")?;
398+
println!(
399+
"limit:\n {:#?}",
400+
plan_7
401+
);
402+
403+
let plan_8 = select_sql_run("select * from t1 offset 2")?;
404+
println!(
405+
"offset:\n {:#?}",
406+
plan_8
407+
);
408+
397409
Ok(())
398410
}
399411
}

src/db.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -164,6 +164,10 @@ mod test {
164164
let vec_batch_projection_a = kipsql.run("select a from t1 where a <= b order by a desc ").await?;
165165
print_batches(&vec_batch_projection_a)?;
166166

167+
println!("limit:");
168+
let vec_batch_limit=kipsql.run("select * from t1 limit 2 offset 1").await?;
169+
print_batches(&vec_batch_limit)?;
170+
167171
Ok(())
168172
})
169173
}

src/execution/physical/physical_plan_builder.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ impl PhysicalPlanBuilder {
5959
Ok(PhysicalOperator::Limit(PhysicalLimit {
6060
plan_id: self.next_plan_id(),
6161
input: Arc::new(input),
62-
limit: limit.count,
62+
limit: limit.limit,
6363
offset: limit.offset,
6464
}))
6565
}

src/execution_v1/physical_plan/mod.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
use crate::execution_v1::physical_plan::physical_create_table::PhysicalCreateTable;
22
use crate::execution_v1::physical_plan::physical_filter::PhysicalFilter;
33
use crate::execution_v1::physical_plan::physical_insert::PhysicalInsert;
4+
use crate::execution_v1::physical_plan::physical_limit::PhysicalLimit;
45
use crate::execution_v1::physical_plan::physical_projection::PhysicalProjection;
56
use crate::execution_v1::physical_plan::physical_sort::PhysicalSort;
67
use crate::execution_v1::physical_plan::physical_table_scan::PhysicalTableScan;
@@ -14,6 +15,7 @@ pub(crate) mod physical_insert;
1415
pub(crate) mod physical_values;
1516
pub(crate) mod physical_filter;
1617
pub(crate) mod physical_sort;
18+
pub(crate) mod physical_limit;
1719

1820
#[derive(Debug)]
1921
pub enum PhysicalOperator {
@@ -23,5 +25,6 @@ pub enum PhysicalOperator {
2325
Projection(PhysicalProjection),
2426
Filter(PhysicalFilter),
2527
Sort(PhysicalSort),
26-
Values(PhysicalValues)
28+
Values(PhysicalValues),
29+
Limit(PhysicalLimit),
2730
}
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
use crate::execution_v1::physical_plan::PhysicalOperator;
2+
use crate::planner::operator::limit::LimitOperator;
3+
4+
#[derive(Debug)]
5+
pub struct PhysicalLimit {
6+
pub(crate) op: LimitOperator,
7+
pub(crate) input: Box<PhysicalOperator>
8+
}

src/execution_v1/physical_plan/physical_plan_builder.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,11 +11,13 @@ use anyhow::anyhow;
1111
use anyhow::Result;
1212
use crate::execution_v1::physical_plan::physical_filter::PhysicalFilter;
1313
use crate::execution_v1::physical_plan::physical_insert::PhysicalInsert;
14+
use crate::execution_v1::physical_plan::physical_limit::PhysicalLimit;
1415
use crate::execution_v1::physical_plan::physical_sort::PhysicalSort;
1516
use crate::execution_v1::physical_plan::physical_values::PhysicalValues;
1617
use crate::planner::logical_insert_plan::LogicalInsertPlan;
1718
use crate::planner::operator::filter::FilterOperator;
1819
use crate::planner::operator::insert::InsertOperator;
20+
use crate::planner::operator::limit::LimitOperator;
1921
use crate::planner::operator::project::ProjectOperator;
2022
use crate::planner::operator::sort::SortOperator;
2123
use crate::planner::operator::values::ValuesOperator;
@@ -82,6 +84,7 @@ impl PhysicalPlanBuilder {
8284
Operator::Scan(scan) => Ok(self.build_physical_scan(scan.clone())),
8385
Operator::Filter(op) => self.build_physical_filter(plan, op),
8486
Operator::Sort(op) => self.build_physical_sort(plan, op),
87+
Operator::Limit(op)=>self.build_physical_limit(plan,op),
8588
_ => Err(anyhow!(format!(
8689
"Unsupported physical plan: {:?}",
8790
plan.operator
@@ -119,4 +122,13 @@ impl PhysicalPlanBuilder {
119122
input: Box::new(input),
120123
}))
121124
}
125+
126+
fn build_physical_limit(&mut self, plan: &LogicalSelectPlan,base : &LimitOperator)->Result<PhysicalOperator>{
127+
let input =self.build_select_logical_plan(plan.child(0)?)?;
128+
129+
Ok(PhysicalOperator::Limit(PhysicalLimit{
130+
op:base.clone(),
131+
input: Box::new(input),
132+
}))
133+
}
122134
}
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
use arrow::record_batch::RecordBatch;
2+
use futures_async_stream::try_stream;
3+
use crate::execution_v1::ExecutorError;
4+
use crate::execution_v1::volcano_executor::BoxedExecutor;
5+
6+
pub struct Limit {}
7+
8+
impl Limit {
9+
#[try_stream(boxed, ok = RecordBatch, error = ExecutorError)]
10+
pub async fn execute(offset: Option<usize>, limit: Option<usize>, input: BoxedExecutor) {
11+
let offset_val = offset.unwrap_or(0);
12+
if limit.is_some() && limit.unwrap() == 0 {
13+
return Ok(());
14+
}
15+
16+
let mut returned_count = 0;
17+
18+
#[for_await]
19+
for batch in input {
20+
let batch = batch?;
21+
22+
let cardinality = batch.num_rows() as usize;
23+
let limit_val = limit.unwrap_or(cardinality);
24+
25+
let start = returned_count.max(offset_val) - returned_count;
26+
let end = {
27+
// from total returned rows level, the total_end is end index of whole returned
28+
// rows level.
29+
let total_end = offset_val + limit_val;
30+
let current_batch_end = returned_count + cardinality;
31+
// we choose the min of total_end and current_batch_end as the end index of to
32+
// match limit semantics.
33+
let real_end = total_end.min(current_batch_end);
34+
// to calculate the end index of current batch
35+
real_end - returned_count
36+
};
37+
returned_count += cardinality;
38+
39+
// example: offset=1000, limit=2, cardinality=100
40+
// when first loop:
41+
// start = 0.max(1000)-0 = 1000
42+
// end = (1000+2).min(0+100)-0 = 100
43+
// so, start(1000) > end(100), we skip this loop batch.
44+
if start >= end {
45+
continue;
46+
}
47+
48+
if (start..end) == (0..cardinality) {
49+
yield batch;
50+
} else {
51+
let length = end - start;
52+
yield batch.slice(start as usize, length as usize);
53+
}
54+
55+
// dut to returned_count is always += cardinality, and returned_batch maybe slsliced,
56+
// so it will larger than real total_end.
57+
// example: offset=1, limit=4, cardinality=6, data=[(0..6)]
58+
// returned_count=6 > 1+4, meanwhile returned_batch size is 4 ([0..5])
59+
if returned_count >= offset_val + limit_val {
60+
break;
61+
}
62+
}
63+
}
64+
}

src/execution_v1/volcano_executor/mod.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ mod insert;
55
mod values;
66
mod filter;
77
mod sort;
8+
mod limit;
89

910
use crate::execution_v1::physical_plan::physical_projection::PhysicalProjection;
1011
use crate::execution_v1::physical_plan::PhysicalOperator;
@@ -16,11 +17,14 @@ use crate::storage::StorageImpl;
1617
use arrow::record_batch::RecordBatch;
1718
use futures::stream::BoxStream;
1819
use futures::TryStreamExt;
20+
use serde::de::Unexpected::Option;
1921
use crate::execution_v1::physical_plan::physical_filter::PhysicalFilter;
2022
use crate::execution_v1::physical_plan::physical_insert::PhysicalInsert;
23+
use crate::execution_v1::physical_plan::physical_limit::PhysicalLimit;
2124
use crate::execution_v1::physical_plan::physical_sort::PhysicalSort;
2225
use crate::execution_v1::volcano_executor::filter::Filter;
2326
use crate::execution_v1::volcano_executor::insert::Insert;
27+
use crate::execution_v1::volcano_executor::limit::Limit;
2428
use crate::execution_v1::volcano_executor::sort::Sort;
2529
use crate::execution_v1::volcano_executor::values::Values;
2630

@@ -69,6 +73,11 @@ impl VolcanoExecutor {
6973

7074
Sort::execute(op.sort_fields, op.limit, input)
7175
}
76+
PhysicalOperator::Limit(PhysicalLimit {op,input, ..}) =>{
77+
let input = self.build(*input);
78+
79+
Limit::execute(Some(op.offset), Some(op.limit), input)
80+
}
7281
}
7382
}
7483

src/planner/operator/limit.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,13 +7,13 @@ use super::Operator;
77
#[derive(Debug, PartialEq, Clone)]
88
pub struct LimitOperator {
99
pub offset: usize,
10-
pub count: usize,
10+
pub limit: usize,
1111
}
1212

1313
impl LimitOperator {
14-
pub fn new(offset: usize, count: usize, children: LogicalSelectPlan) -> LogicalSelectPlan {
14+
pub fn new(offset: usize, limit: usize, children: LogicalSelectPlan) -> LogicalSelectPlan {
1515
LogicalSelectPlan {
16-
operator: Arc::new(Operator::Limit(LimitOperator { offset, count })),
16+
operator: Arc::new(Operator::Limit(LimitOperator { offset, limit })),
1717
children: vec![Arc::new(children)],
1818
}
1919
}

0 commit comments

Comments
 (0)