Skip to content

Commit 11401fe

Browse files
committed
feat(filter): implement the Filter physical operator, support similar select c1 from t1 where c1 > 1
1 parent 24cbc11 commit 11401fe

9 files changed

Lines changed: 95 additions & 6 deletions

File tree

src/binder/expr.rs

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -95,10 +95,22 @@ impl Binder {
9595
) -> Result<ScalarExpression> {
9696
let left_expr = Box::new(self.bind_expr(left)?);
9797
let right_expr = Box::new(self.bind_expr(right)?);
98-
let ty = LogicalType::max_logical_type(
99-
&left_expr.return_type(),
100-
&right_expr.return_type()
101-
)?;
98+
99+
let ty = match op {
100+
BinaryOperator::Plus | BinaryOperator::Minus | BinaryOperator::Multiply |
101+
BinaryOperator::Divide | BinaryOperator::Modulo => {
102+
LogicalType::max_logical_type(
103+
&left_expr.return_type(),
104+
&right_expr.return_type()
105+
)?
106+
}
107+
BinaryOperator::Gt | BinaryOperator::Lt | BinaryOperator::GtEq |
108+
BinaryOperator::LtEq | BinaryOperator::Eq | BinaryOperator::NotEq |
109+
BinaryOperator::And | BinaryOperator::Or | BinaryOperator::Xor => {
110+
LogicalType::Boolean
111+
},
112+
_ => todo!()
113+
};
102114

103115
Ok(ScalarExpression::Binary {
104116
op: (op.clone()).into(),

src/db.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -180,7 +180,7 @@ mod test {
180180
tokio_test::block_on(async move {
181181
let _ = database.run("create table t1 (a int, b boolean)").await?;
182182
let _ = database.run("insert into t1 values (1, true), (2, false)").await?;
183-
let vec_batch = database.run("select * from t1").await?;
183+
let vec_batch = database.run("select * from t1 where a = 1 or b = false").await?;
184184

185185
let table = database.storage
186186
.get_catalog()

src/execution_v1/physical_plan/mod.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
use crate::execution_v1::physical_plan::physical_create_table::PhysicalCreateTable;
2+
use crate::execution_v1::physical_plan::physical_filter::PhysicalFilter;
23
use crate::execution_v1::physical_plan::physical_insert::PhysicalInsert;
34
use crate::execution_v1::physical_plan::physical_projection::PhysicalProjection;
45
use crate::execution_v1::physical_plan::physical_table_scan::PhysicalTableScan;
@@ -10,12 +11,14 @@ pub(crate) mod physical_projection;
1011
pub(crate) mod physical_table_scan;
1112
pub(crate) mod physical_insert;
1213
pub(crate) mod physical_values;
14+
pub(crate) mod physical_filter;
1315

1416
#[derive(Debug)]
1517
pub enum PhysicalOperator {
1618
Insert(PhysicalInsert),
1719
CreateTable(PhysicalCreateTable),
1820
TableScan(PhysicalTableScan),
1921
Projection(PhysicalProjection),
22+
Filter(PhysicalFilter),
2023
Values(PhysicalValues)
2124
}
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
use crate::execution_v1::physical_plan::PhysicalOperator;
2+
use crate::expression::ScalarExpression;
3+
4+
#[derive(Debug)]
5+
pub struct PhysicalFilter {
6+
pub(crate) plan_id: u32,
7+
pub(crate) predicate: ScalarExpression,
8+
pub(crate) input: Box<PhysicalOperator>
9+
}

src/execution_v1/physical_plan/physical_plan_builder.rs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,11 @@ use crate::planner::operator::Operator;
99
use crate::planner::LogicalPlan;
1010
use anyhow::anyhow;
1111
use anyhow::Result;
12+
use crate::execution_v1::physical_plan::physical_filter::PhysicalFilter;
1213
use crate::execution_v1::physical_plan::physical_insert::PhysicalInsert;
1314
use crate::execution_v1::physical_plan::physical_values::PhysicalValues;
1415
use crate::planner::logical_insert_plan::LogicalInsertPlan;
16+
use crate::planner::operator::filter::FilterOperator;
1517
use crate::planner::operator::insert::InsertOperator;
1618
use crate::planner::operator::project::ProjectOperator;
1719
use crate::planner::operator::values::ValuesOperator;
@@ -84,6 +86,7 @@ impl PhysicalPlanBuilder {
8486
match plan.operator.as_ref() {
8587
Operator::Project(op) => self.build_physical_select_projection(plan, op),
8688
Operator::Scan(scan) => Ok(self.build_physical_scan(scan.clone())),
89+
Operator::Filter(op) => self.build_physical_filter(plan, op),
8790
_ => Err(anyhow!(format!(
8891
"Unsupported physical plan: {:?}",
8992
plan.operator
@@ -104,4 +107,14 @@ impl PhysicalPlanBuilder {
104107
fn build_physical_scan(&mut self, base: ScanOperator) -> PhysicalOperator {
105108
PhysicalOperator::TableScan(PhysicalTableScan { plan_id: self.next_plan_id(), base })
106109
}
110+
111+
fn build_physical_filter(&mut self, plan: &LogicalSelectPlan, base: &FilterOperator) -> Result<PhysicalOperator> {
112+
let input = self.build_select_logical_plan(plan.child(0)?)?;
113+
114+
Ok(PhysicalOperator::Filter(PhysicalFilter {
115+
plan_id: 0,
116+
predicate: base.predicate.clone(),
117+
input: Box::new(input),
118+
}))
119+
}
107120
}
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
use arrow::array::as_boolean_array;
2+
use arrow::compute::filter_record_batch;
3+
use arrow::record_batch::RecordBatch;
4+
use futures_async_stream::try_stream;
5+
use crate::execution_v1::volcano_executor::BoxedExecutor;
6+
use crate::execution_v1::ExecutorError;
7+
use crate::expression::ScalarExpression;
8+
9+
pub struct Filter { }
10+
11+
impl Filter {
12+
#[try_stream(boxed, ok = RecordBatch, error = ExecutorError)]
13+
pub async fn execute(predicate: ScalarExpression, input: BoxedExecutor) {
14+
#[for_await]
15+
for batch in input {
16+
let batch = batch?;
17+
let predicate = predicate.eval_column(&batch)?;
18+
19+
yield filter_record_batch(&batch, as_boolean_array(&predicate))?;
20+
}
21+
}
22+
}

src/execution_v1/volcano_executor/mod.rs

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ mod projection;
33
mod table_scan;
44
mod insert;
55
mod values;
6+
mod filter;
67

78
use crate::execution_v1::physical_plan::physical_projection::PhysicalProjection;
89
use crate::execution_v1::physical_plan::PhysicalOperator;
@@ -14,7 +15,9 @@ use crate::storage::StorageImpl;
1415
use arrow::record_batch::RecordBatch;
1516
use futures::stream::BoxStream;
1617
use futures::TryStreamExt;
18+
use crate::execution_v1::physical_plan::physical_filter::PhysicalFilter;
1719
use crate::execution_v1::physical_plan::physical_insert::PhysicalInsert;
20+
use crate::execution_v1::volcano_executor::filter::Filter;
1821
use crate::execution_v1::volcano_executor::insert::Insert;
1922
use crate::execution_v1::volcano_executor::values::Values;
2023

@@ -38,6 +41,7 @@ impl VolcanoExecutor {
3841
}
3942
PhysicalOperator::Projection(PhysicalProjection { input, exprs, .. }) => {
4043
let input = self.build(*input);
44+
4145
Projection::execute(exprs, input)
4246
}
4347
PhysicalOperator::CreateTable(op) => match &self.storage {
@@ -51,7 +55,12 @@ impl VolcanoExecutor {
5155
Insert::execute(table_name, input, storage.clone()),
5256
}
5357
}
54-
PhysicalOperator::Values(op) => Values::execute(op)
58+
PhysicalOperator::Values(op) => Values::execute(op),
59+
PhysicalOperator::Filter(PhysicalFilter { predicate, input, .. }) => {
60+
let input = self.build(*input);
61+
62+
Filter::execute(predicate, input)
63+
}
5564
}
5665
}
5766

src/expression/cast.rs

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
use arrow::array::{Array, BooleanArray};
2+
use anyhow::Result;
3+
4+
/// Downcast an Arrow Array to a concrete type
5+
macro_rules! downcast_value {
6+
($Value:expr, $Type:ident) => {{
7+
use std::any::type_name;
8+
9+
if let Some(value) = $Value.as_any().downcast_ref::<$Type>() {
10+
Ok(value)
11+
} else {
12+
Err(anyhow::anyhow!("could not cast value to {}", type_name::<$Type>()))
13+
}
14+
}};
15+
}
16+
17+
/// Downcast ArrayRef to BooleanArray
18+
pub fn as_boolean_array(array: &dyn Array) -> Result<&BooleanArray> {
19+
downcast_value!(array, BooleanArray)
20+
}

src/expression/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ use crate::types::LogicalType;
1010
pub mod agg;
1111
mod evaluator;
1212
mod array_compute;
13+
mod cast;
1314

1415
/// ScalarExpression represnet all scalar expression in SQL.
1516
/// SELECT a+1, b FROM t1.

0 commit comments

Comments
 (0)