Skip to content

Commit 3b8bc87

Browse files
authored
Merge branch 'KipData:main' into main
2 parents 27e655e + cab8ac8 commit 3b8bc87

16 files changed

Lines changed: 234 additions & 31 deletions

File tree

src/binder/select.rs

Lines changed: 30 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ use sqlparser::ast::{
2323
Expr, Ident, Join, JoinConstraint, JoinOperator, Offset, OrderByExpr, Query, Select,
2424
SelectItem, SetExpr, TableFactor, TableWithJoins,
2525
};
26+
use crate::planner::operator::sort::{SortField, SortOperator};
2627

2728
impl Binder {
2829
pub(crate) fn bind_query(&mut self, query: &Query) -> Result<LogicalSelectPlan> {
@@ -90,9 +91,9 @@ impl Binder {
9091
// plan = self.bind_distinct(plan, select_list.clone())?;
9192
// }
9293

93-
// // if let Some(orderby) = having_orderby.1 {
94-
// // plan = self.bind_sort(plan, orderby)?;
95-
// // }
94+
if let Some(orderby) = having_orderby.1 {
95+
plan = self.bind_sort(plan, orderby);
96+
}
9697

9798
plan = self.bind_project(plan, select_list);
9899
Ok(plan)
@@ -281,6 +282,20 @@ impl Binder {
281282
}
282283
}
283284

285+
fn bind_sort(
286+
&mut self,
287+
children: LogicalSelectPlan,
288+
sort_fields: Vec<SortField>,
289+
) -> LogicalSelectPlan {
290+
LogicalSelectPlan {
291+
operator: Arc::new(Operator::Sort(SortOperator {
292+
sort_fields,
293+
limit: None,
294+
})),
295+
children: vec![Arc::new(children)],
296+
}
297+
}
298+
284299
fn bind_limit(
285300
&mut self,
286301
children: LogicalSelectPlan,
@@ -379,6 +394,18 @@ mod tests {
379394
plan_6
380395
);
381396

397+
let plan_7 = select_sql_run("select * from t1 limit 1")?;
398+
println!(
399+
"limit:\n {:#?}",
400+
plan_7
401+
);
402+
403+
let plan_8 = select_sql_run("select * from t1 offset 2")?;
404+
println!(
405+
"offset:\n {:#?}",
406+
plan_8
407+
);
408+
382409
Ok(())
383410
}
384411
}

src/db.rs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -154,16 +154,20 @@ mod test {
154154

155155
tokio_test::block_on(async move {
156156
let _ = kipsql.run("create table t1 (a int, b int)").await?;
157-
let _ = kipsql.run("insert into t1 values (1, 2), (3, 4)").await?;
157+
let _ = kipsql.run("insert into t1 values (1, 1), (2, 3), (5, 4)").await?;
158158

159159
println!("full:");
160160
let vec_batch_full_fields = kipsql.run("select * from t1").await?;
161161
print_batches(&vec_batch_full_fields)?;
162162

163163
println!("projection_and_filter:");
164-
let vec_batch_projection_a = kipsql.run("select a from t1 where a != 3 and a < b ").await?;
164+
let vec_batch_projection_a = kipsql.run("select a from t1 where a <= b order by a desc ").await?;
165165
print_batches(&vec_batch_projection_a)?;
166166

167+
println!("limit:");
168+
let vec_batch_limit=kipsql.run("select * from t1 limit 2 offset 1").await?;
169+
print_batches(&vec_batch_limit)?;
170+
167171
Ok(())
168172
})
169173
}

src/execution/physical/physical_plan_builder.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ impl PhysicalPlanBuilder {
5959
Ok(PhysicalOperator::Limit(PhysicalLimit {
6060
plan_id: self.next_plan_id(),
6161
input: Arc::new(input),
62-
limit: limit.count,
62+
limit: limit.limit,
6363
offset: limit.offset,
6464
}))
6565
}

src/execution_v1/physical_plan/mod.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
use crate::execution_v1::physical_plan::physical_create_table::PhysicalCreateTable;
22
use crate::execution_v1::physical_plan::physical_filter::PhysicalFilter;
33
use crate::execution_v1::physical_plan::physical_insert::PhysicalInsert;
4+
use crate::execution_v1::physical_plan::physical_limit::PhysicalLimit;
45
use crate::execution_v1::physical_plan::physical_projection::PhysicalProjection;
6+
use crate::execution_v1::physical_plan::physical_sort::PhysicalSort;
57
use crate::execution_v1::physical_plan::physical_table_scan::PhysicalTableScan;
68
use crate::execution_v1::physical_plan::physical_values::PhysicalValues;
79

@@ -12,6 +14,8 @@ pub(crate) mod physical_table_scan;
1214
pub(crate) mod physical_insert;
1315
pub(crate) mod physical_values;
1416
pub(crate) mod physical_filter;
17+
pub(crate) mod physical_sort;
18+
pub(crate) mod physical_limit;
1519

1620
#[derive(Debug)]
1721
pub enum PhysicalOperator {
@@ -20,5 +24,7 @@ pub enum PhysicalOperator {
2024
TableScan(PhysicalTableScan),
2125
Projection(PhysicalProjection),
2226
Filter(PhysicalFilter),
23-
Values(PhysicalValues)
27+
Sort(PhysicalSort),
28+
Values(PhysicalValues),
29+
Limit(PhysicalLimit),
2430
}

src/execution_v1/physical_plan/physical_filter.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ use crate::expression::ScalarExpression;
33

44
#[derive(Debug)]
55
pub struct PhysicalFilter {
6-
pub(crate) plan_id: u32,
76
pub(crate) predicate: ScalarExpression,
87
pub(crate) input: Box<PhysicalOperator>
98
}
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
use crate::execution_v1::physical_plan::PhysicalOperator;
2+
use crate::planner::operator::limit::LimitOperator;
3+
4+
#[derive(Debug)]
5+
pub struct PhysicalLimit {
6+
pub(crate) op: LimitOperator,
7+
pub(crate) input: Box<PhysicalOperator>
8+
}

src/execution_v1/physical_plan/physical_plan_builder.rs

Lines changed: 27 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -11,26 +11,22 @@ use anyhow::anyhow;
1111
use anyhow::Result;
1212
use crate::execution_v1::physical_plan::physical_filter::PhysicalFilter;
1313
use crate::execution_v1::physical_plan::physical_insert::PhysicalInsert;
14+
use crate::execution_v1::physical_plan::physical_limit::PhysicalLimit;
15+
use crate::execution_v1::physical_plan::physical_sort::PhysicalSort;
1416
use crate::execution_v1::physical_plan::physical_values::PhysicalValues;
1517
use crate::planner::logical_insert_plan::LogicalInsertPlan;
1618
use crate::planner::operator::filter::FilterOperator;
1719
use crate::planner::operator::insert::InsertOperator;
20+
use crate::planner::operator::limit::LimitOperator;
1821
use crate::planner::operator::project::ProjectOperator;
22+
use crate::planner::operator::sort::SortOperator;
1923
use crate::planner::operator::values::ValuesOperator;
2024

21-
pub struct PhysicalPlanBuilder {
22-
plan_id: u32,
23-
}
25+
pub struct PhysicalPlanBuilder { }
2426

2527
impl PhysicalPlanBuilder {
2628
pub fn new() -> Self {
27-
PhysicalPlanBuilder { plan_id: 0 }
28-
}
29-
30-
fn next_plan_id(&mut self) -> u32 {
31-
let id = self.plan_id;
32-
self.plan_id += 1;
33-
id
29+
PhysicalPlanBuilder { }
3430
}
3531

3632
pub fn build_plan(&mut self, plan: &LogicalPlan) -> Result<PhysicalOperator> {
@@ -87,6 +83,8 @@ impl PhysicalPlanBuilder {
8783
Operator::Project(op) => self.build_physical_select_projection(plan, op),
8884
Operator::Scan(scan) => Ok(self.build_physical_scan(scan.clone())),
8985
Operator::Filter(op) => self.build_physical_filter(plan, op),
86+
Operator::Sort(op) => self.build_physical_sort(plan, op),
87+
Operator::Limit(op)=>self.build_physical_limit(plan,op),
9088
_ => Err(anyhow!(format!(
9189
"Unsupported physical plan: {:?}",
9290
plan.operator
@@ -98,23 +96,39 @@ impl PhysicalPlanBuilder {
9896
let input = self.build_select_logical_plan(plan.child(0)?)?;
9997

10098
Ok(PhysicalOperator::Projection(PhysicalProjection {
101-
plan_id: self.next_plan_id(),
10299
exprs: op.columns.clone(),
103100
input: Box::new(input),
104101
}))
105102
}
106103

107104
fn build_physical_scan(&mut self, base: ScanOperator) -> PhysicalOperator {
108-
PhysicalOperator::TableScan(PhysicalTableScan { plan_id: self.next_plan_id(), base })
105+
PhysicalOperator::TableScan(PhysicalTableScan { base })
109106
}
110107

111108
fn build_physical_filter(&mut self, plan: &LogicalSelectPlan, base: &FilterOperator) -> Result<PhysicalOperator> {
112109
let input = self.build_select_logical_plan(plan.child(0)?)?;
113110

114111
Ok(PhysicalOperator::Filter(PhysicalFilter {
115-
plan_id: 0,
116112
predicate: base.predicate.clone(),
117113
input: Box::new(input),
118114
}))
119115
}
116+
117+
fn build_physical_sort(&mut self, plan: &LogicalSelectPlan, base: &SortOperator) -> Result<PhysicalOperator> {
118+
let input = self.build_select_logical_plan(plan.child(0)?)?;
119+
120+
Ok(PhysicalOperator::Sort(PhysicalSort {
121+
op: base.clone(),
122+
input: Box::new(input),
123+
}))
124+
}
125+
126+
fn build_physical_limit(&mut self, plan: &LogicalSelectPlan,base : &LimitOperator)->Result<PhysicalOperator>{
127+
let input =self.build_select_logical_plan(plan.child(0)?)?;
128+
129+
Ok(PhysicalOperator::Limit(PhysicalLimit{
130+
op:base.clone(),
131+
input: Box::new(input),
132+
}))
133+
}
120134
}

src/execution_v1/physical_plan/physical_projection.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ use crate::expression::ScalarExpression;
33

44
#[derive(Debug)]
55
pub struct PhysicalProjection {
6-
pub(crate) plan_id: u32,
76
pub(crate) exprs: Vec<ScalarExpression>,
87
pub(crate) input: Box<PhysicalOperator>
98
}
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
use crate::execution_v1::physical_plan::PhysicalOperator;
2+
use crate::planner::operator::sort::SortOperator;
3+
4+
#[derive(Debug)]
5+
pub struct PhysicalSort {
6+
pub(crate) op: SortOperator,
7+
pub(crate) input: Box<PhysicalOperator>
8+
}

src/execution_v1/physical_plan/physical_table_scan.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,5 @@ use crate::planner::operator::scan::ScanOperator;
22

33
#[derive(Debug)]
44
pub struct PhysicalTableScan {
5-
pub(crate) plan_id: u32,
65
pub(crate) base: ScanOperator
76
}

0 commit comments

Comments
 (0)