Skip to content

Commit 6a66644

Browse files
authored
Merge pull request #37 from KKould/main
feat(sort): Implement Sort operator to support Order By clause
2 parents 6152eae + d0656aa commit 6a66644

12 files changed

Lines changed: 117 additions & 26 deletions

File tree

src/binder/select.rs

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ use sqlparser::ast::{
2323
Expr, Ident, Join, JoinConstraint, JoinOperator, Offset, OrderByExpr, Query, Select,
2424
SelectItem, SetExpr, TableFactor, TableWithJoins,
2525
};
26+
use crate::planner::operator::sort::{SortField, SortOperator};
2627

2728
impl Binder {
2829
pub(crate) fn bind_query(&mut self, query: &Query) -> Result<LogicalSelectPlan> {
@@ -90,9 +91,9 @@ impl Binder {
9091
// plan = self.bind_distinct(plan, select_list.clone())?;
9192
// }
9293

93-
// // if let Some(orderby) = having_orderby.1 {
94-
// // plan = self.bind_sort(plan, orderby)?;
95-
// // }
94+
if let Some(orderby) = having_orderby.1 {
95+
plan = self.bind_sort(plan, orderby);
96+
}
9697

9798
plan = self.bind_project(plan, select_list);
9899
Ok(plan)
@@ -281,6 +282,20 @@ impl Binder {
281282
}
282283
}
283284

285+
fn bind_sort(
286+
&mut self,
287+
children: LogicalSelectPlan,
288+
sort_fields: Vec<SortField>,
289+
) -> LogicalSelectPlan {
290+
LogicalSelectPlan {
291+
operator: Arc::new(Operator::Sort(SortOperator {
292+
sort_fields,
293+
limit: None,
294+
})),
295+
children: vec![Arc::new(children)],
296+
}
297+
}
298+
284299
fn bind_limit(
285300
&mut self,
286301
children: LogicalSelectPlan,

src/db.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -154,14 +154,14 @@ mod test {
154154

155155
tokio_test::block_on(async move {
156156
let _ = kipsql.run("create table t1 (a int, b int)").await?;
157-
let _ = kipsql.run("insert into t1 values (1, 2), (3, 4)").await?;
157+
let _ = kipsql.run("insert into t1 values (1, 1), (2, 3), (5, 4)").await?;
158158

159159
println!("full:");
160160
let vec_batch_full_fields = kipsql.run("select * from t1").await?;
161161
print_batches(&vec_batch_full_fields)?;
162162

163163
println!("projection_and_filter:");
164-
let vec_batch_projection_a = kipsql.run("select a from t1 where a != 3 and a < b ").await?;
164+
let vec_batch_projection_a = kipsql.run("select a from t1 where a <= b order by a desc ").await?;
165165
print_batches(&vec_batch_projection_a)?;
166166

167167
Ok(())

src/execution_v1/physical_plan/mod.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ use crate::execution_v1::physical_plan::physical_create_table::PhysicalCreateTab
22
use crate::execution_v1::physical_plan::physical_filter::PhysicalFilter;
33
use crate::execution_v1::physical_plan::physical_insert::PhysicalInsert;
44
use crate::execution_v1::physical_plan::physical_projection::PhysicalProjection;
5+
use crate::execution_v1::physical_plan::physical_sort::PhysicalSort;
56
use crate::execution_v1::physical_plan::physical_table_scan::PhysicalTableScan;
67
use crate::execution_v1::physical_plan::physical_values::PhysicalValues;
78

@@ -12,6 +13,7 @@ pub(crate) mod physical_table_scan;
1213
pub(crate) mod physical_insert;
1314
pub(crate) mod physical_values;
1415
pub(crate) mod physical_filter;
16+
pub(crate) mod physical_sort;
1517

1618
#[derive(Debug)]
1719
pub enum PhysicalOperator {
@@ -20,5 +22,6 @@ pub enum PhysicalOperator {
2022
TableScan(PhysicalTableScan),
2123
Projection(PhysicalProjection),
2224
Filter(PhysicalFilter),
25+
Sort(PhysicalSort),
2326
Values(PhysicalValues)
2427
}

src/execution_v1/physical_plan/physical_filter.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ use crate::expression::ScalarExpression;
33

44
#[derive(Debug)]
55
pub struct PhysicalFilter {
6-
pub(crate) plan_id: u32,
76
pub(crate) predicate: ScalarExpression,
87
pub(crate) input: Box<PhysicalOperator>
98
}

src/execution_v1/physical_plan/physical_plan_builder.rs

Lines changed: 15 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -11,26 +11,20 @@ use anyhow::anyhow;
1111
use anyhow::Result;
1212
use crate::execution_v1::physical_plan::physical_filter::PhysicalFilter;
1313
use crate::execution_v1::physical_plan::physical_insert::PhysicalInsert;
14+
use crate::execution_v1::physical_plan::physical_sort::PhysicalSort;
1415
use crate::execution_v1::physical_plan::physical_values::PhysicalValues;
1516
use crate::planner::logical_insert_plan::LogicalInsertPlan;
1617
use crate::planner::operator::filter::FilterOperator;
1718
use crate::planner::operator::insert::InsertOperator;
1819
use crate::planner::operator::project::ProjectOperator;
20+
use crate::planner::operator::sort::SortOperator;
1921
use crate::planner::operator::values::ValuesOperator;
2022

21-
pub struct PhysicalPlanBuilder {
22-
plan_id: u32,
23-
}
23+
pub struct PhysicalPlanBuilder { }
2424

2525
impl PhysicalPlanBuilder {
2626
pub fn new() -> Self {
27-
PhysicalPlanBuilder { plan_id: 0 }
28-
}
29-
30-
fn next_plan_id(&mut self) -> u32 {
31-
let id = self.plan_id;
32-
self.plan_id += 1;
33-
id
27+
PhysicalPlanBuilder { }
3428
}
3529

3630
pub fn build_plan(&mut self, plan: &LogicalPlan) -> Result<PhysicalOperator> {
@@ -87,6 +81,7 @@ impl PhysicalPlanBuilder {
8781
Operator::Project(op) => self.build_physical_select_projection(plan, op),
8882
Operator::Scan(scan) => Ok(self.build_physical_scan(scan.clone())),
8983
Operator::Filter(op) => self.build_physical_filter(plan, op),
84+
Operator::Sort(op) => self.build_physical_sort(plan, op),
9085
_ => Err(anyhow!(format!(
9186
"Unsupported physical plan: {:?}",
9287
plan.operator
@@ -98,23 +93,30 @@ impl PhysicalPlanBuilder {
9893
let input = self.build_select_logical_plan(plan.child(0)?)?;
9994

10095
Ok(PhysicalOperator::Projection(PhysicalProjection {
101-
plan_id: self.next_plan_id(),
10296
exprs: op.columns.clone(),
10397
input: Box::new(input),
10498
}))
10599
}
106100

107101
fn build_physical_scan(&mut self, base: ScanOperator) -> PhysicalOperator {
108-
PhysicalOperator::TableScan(PhysicalTableScan { plan_id: self.next_plan_id(), base })
102+
PhysicalOperator::TableScan(PhysicalTableScan { base })
109103
}
110104

111105
fn build_physical_filter(&mut self, plan: &LogicalSelectPlan, base: &FilterOperator) -> Result<PhysicalOperator> {
112106
let input = self.build_select_logical_plan(plan.child(0)?)?;
113107

114108
Ok(PhysicalOperator::Filter(PhysicalFilter {
115-
plan_id: 0,
116109
predicate: base.predicate.clone(),
117110
input: Box::new(input),
118111
}))
119112
}
113+
114+
fn build_physical_sort(&mut self, plan: &LogicalSelectPlan, base: &SortOperator) -> Result<PhysicalOperator> {
115+
let input = self.build_select_logical_plan(plan.child(0)?)?;
116+
117+
Ok(PhysicalOperator::Sort(PhysicalSort {
118+
op: base.clone(),
119+
input: Box::new(input),
120+
}))
121+
}
120122
}

src/execution_v1/physical_plan/physical_projection.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ use crate::expression::ScalarExpression;
33

44
#[derive(Debug)]
55
pub struct PhysicalProjection {
6-
pub(crate) plan_id: u32,
76
pub(crate) exprs: Vec<ScalarExpression>,
87
pub(crate) input: Box<PhysicalOperator>
98
}
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
use crate::execution_v1::physical_plan::PhysicalOperator;
2+
use crate::planner::operator::sort::SortOperator;
3+
4+
#[derive(Debug)]
5+
pub struct PhysicalSort {
6+
pub(crate) op: SortOperator,
7+
pub(crate) input: Box<PhysicalOperator>
8+
}

src/execution_v1/physical_plan/physical_table_scan.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,5 @@ use crate::planner::operator::scan::ScanOperator;
22

33
#[derive(Debug)]
44
pub struct PhysicalTableScan {
5-
pub(crate) plan_id: u32,
65
pub(crate) base: ScanOperator
76
}

src/execution_v1/volcano_executor/insert.rs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,6 @@ impl Insert {
2323
let mut arrays = batch.columns().to_vec();
2424
let col_len = arrays[0].len();
2525

26-
arrays.reverse();
27-
2826
let full_arrays = table.all_columns()
2927
.into_iter()
3028
.map(|(_, col_catalog)| {

src/execution_v1/volcano_executor/mod.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ mod table_scan;
44
mod insert;
55
mod values;
66
mod filter;
7+
mod sort;
78

89
use crate::execution_v1::physical_plan::physical_projection::PhysicalProjection;
910
use crate::execution_v1::physical_plan::PhysicalOperator;
@@ -17,8 +18,10 @@ use futures::stream::BoxStream;
1718
use futures::TryStreamExt;
1819
use crate::execution_v1::physical_plan::physical_filter::PhysicalFilter;
1920
use crate::execution_v1::physical_plan::physical_insert::PhysicalInsert;
21+
use crate::execution_v1::physical_plan::physical_sort::PhysicalSort;
2022
use crate::execution_v1::volcano_executor::filter::Filter;
2123
use crate::execution_v1::volcano_executor::insert::Insert;
24+
use crate::execution_v1::volcano_executor::sort::Sort;
2225
use crate::execution_v1::volcano_executor::values::Values;
2326

2427
pub type BoxedExecutor = BoxStream<'static, Result<RecordBatch, ExecutorError>>;
@@ -61,6 +64,11 @@ impl VolcanoExecutor {
6164

6265
Filter::execute(predicate, input)
6366
}
67+
PhysicalOperator::Sort(PhysicalSort {op, input, ..}) => {
68+
let input = self.build(*input);
69+
70+
Sort::execute(op.sort_fields, op.limit, input)
71+
}
6472
}
6573
}
6674

0 commit comments

Comments
 (0)