Skip to content

Commit 27e655e

Browse files
authored
Merge branch 'KipData:main' into main
2 parents 3d1c39b + 6152eae commit 27e655e

11 files changed

Lines changed: 110 additions & 49 deletions

File tree

src/binder/expr.rs

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -95,10 +95,22 @@ impl Binder {
9595
) -> Result<ScalarExpression> {
9696
let left_expr = Box::new(self.bind_expr(left)?);
9797
let right_expr = Box::new(self.bind_expr(right)?);
98-
let ty = LogicalType::max_logical_type(
99-
&left_expr.return_type(),
100-
&right_expr.return_type()
101-
)?;
98+
99+
let ty = match op {
100+
BinaryOperator::Plus | BinaryOperator::Minus | BinaryOperator::Multiply |
101+
BinaryOperator::Divide | BinaryOperator::Modulo => {
102+
LogicalType::max_logical_type(
103+
&left_expr.return_type(),
104+
&right_expr.return_type()
105+
)?
106+
}
107+
BinaryOperator::Gt | BinaryOperator::Lt | BinaryOperator::GtEq |
108+
BinaryOperator::LtEq | BinaryOperator::Eq | BinaryOperator::NotEq |
109+
BinaryOperator::And | BinaryOperator::Or | BinaryOperator::Xor => {
110+
LogicalType::Boolean
111+
},
112+
_ => todo!()
113+
};
102114

103115
Ok(ScalarExpression::Binary {
104116
op: (op.clone()).into(),

src/binder/select.rs

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -324,13 +324,8 @@ mod tests {
324324
use super::*;
325325
use crate::binder::BinderContext;
326326
use crate::catalog::{ColumnCatalog, ColumnDesc, RootCatalog};
327-
use crate::expression::agg::AggKind;
328-
use crate::expression::BinaryOperator::{Gt, Minus};
329-
use crate::expression::ScalarExpression::{AggCall, Binary, ColumnRef, Constant, InputRef};
330327
use crate::planner::LogicalPlan;
331-
use crate::planner::operator::aggregate::AggregateOperator;
332328
use crate::types::LogicalType::Integer;
333-
use crate::types::value::DataValue::Int32;
334329

335330
fn test_root_catalog() -> Result<RootCatalog> {
336331
let mut root = RootCatalog::new();

src/db.rs

Lines changed: 15 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -45,45 +45,18 @@ impl Database {
4545
/// Limit(1)
4646
/// Project(a,b)
4747
let logical_plan = binder.bind(&stmts[0])?;
48-
println!("logic plan: {:#?}", logical_plan);
48+
// println!("logic plan: {:#?}", logical_plan);
4949

5050
let mut builder = PhysicalPlanBuilder::new();
5151
let operator = builder.build_plan(&logical_plan)?;
52-
println!("operator: {:#?}", operator);
52+
// println!("operator: {:#?}", operator);
5353

5454
let storage = StorageImpl::InMemoryStorage(self.storage.clone());
5555
let executor = VolcanoExecutor::new(storage);
5656

5757
let mut stream = executor.build(operator);
5858

5959
Ok(VolcanoExecutor::try_collect(&mut stream).await?)
60-
61-
// // let physical_planner = PhysicalPlaner::default();
62-
// // let executor_builder = ExecutorBuilder::new(self.env.clone());
63-
//
64-
// // let physical_plan = physical_planner.plan(logical_plan)?;
65-
// // let executor = executor_builder.build(physical_plan)?;
66-
// // futures::executor::block_on(executor).unwrap();
67-
//
68-
// /// THE FOLLOWING CODE IS FOR TESTING ONLY
69-
// /// THE FINAL CODE WILL BE IN executor MODULE
70-
// if let LogicalPlan::CreateTable(plan) = logical_plan {
71-
// let mut columns = Vec::new();
72-
// plan.columns.iter().for_each(|c| {
73-
// columns.push(ColumnCatalog::new(c.0.clone(), c.1, c.2.clone()));
74-
// });
75-
// let table_name = plan.table_name.clone();
76-
// // columns->batch record
77-
// let mut data = Vec::new();
78-
//
79-
// columns.iter().for_each(|c| {
80-
// let batch = RecordBatch::new_empty(Arc::new(Schema::new(vec![c.to_field()])));
81-
// data.push(batch);
82-
// });
83-
//
84-
// self.storage
85-
// .create_table(IdGenerator::build(), table_name.as_str(), data)?;
86-
// }
8760
}
8861
}
8962

@@ -126,6 +99,7 @@ mod test {
12699
use arrow::compute::concat_batches;
127100
use arrow::datatypes::Schema;
128101
use arrow::record_batch::RecordBatch;
102+
use arrow::util::pretty::print_batches;
129103
use itertools::Itertools;
130104
use crate::catalog::{ColumnCatalog, ColumnDesc};
131105
use crate::db::Database;
@@ -173,19 +147,22 @@ mod test {
173147
Ok(())
174148
})
175149
}
150+
176151
#[test]
177152
fn test_crud_sql() -> anyhow::Result<()> {
178-
let database = Database::new_on_mem();
153+
let kipsql = Database::new_on_mem();
179154

180155
tokio_test::block_on(async move {
181-
let _ = database.run("create table t1 (a int, b boolean)").await?;
182-
let _ = database.run("insert into t1 values (1, true), (2, false)").await?;
183-
let vec_batch = database.run("select * from t1").await?;
184-
185-
let table = database.storage
186-
.get_catalog()
187-
.get_table(0).unwrap().clone();
188-
println!("{:#?}", concat_batches(&table.schema(), &vec_batch));
156+
let _ = kipsql.run("create table t1 (a int, b int)").await?;
157+
let _ = kipsql.run("insert into t1 values (1, 2), (3, 4)").await?;
158+
159+
println!("full:");
160+
let vec_batch_full_fields = kipsql.run("select * from t1").await?;
161+
print_batches(&vec_batch_full_fields)?;
162+
163+
println!("projection_and_filter:");
164+
let vec_batch_projection_a = kipsql.run("select a from t1 where a != 3 and a < b ").await?;
165+
print_batches(&vec_batch_projection_a)?;
189166

190167
Ok(())
191168
})

src/execution_v1/physical_plan/mod.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
use crate::execution_v1::physical_plan::physical_create_table::PhysicalCreateTable;
2+
use crate::execution_v1::physical_plan::physical_filter::PhysicalFilter;
23
use crate::execution_v1::physical_plan::physical_insert::PhysicalInsert;
34
use crate::execution_v1::physical_plan::physical_projection::PhysicalProjection;
45
use crate::execution_v1::physical_plan::physical_table_scan::PhysicalTableScan;
@@ -10,12 +11,14 @@ pub(crate) mod physical_projection;
1011
pub(crate) mod physical_table_scan;
1112
pub(crate) mod physical_insert;
1213
pub(crate) mod physical_values;
14+
pub(crate) mod physical_filter;
1315

1416
#[derive(Debug)]
1517
pub enum PhysicalOperator {
1618
Insert(PhysicalInsert),
1719
CreateTable(PhysicalCreateTable),
1820
TableScan(PhysicalTableScan),
1921
Projection(PhysicalProjection),
22+
Filter(PhysicalFilter),
2023
Values(PhysicalValues)
2124
}
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
use crate::execution_v1::physical_plan::PhysicalOperator;
2+
use crate::expression::ScalarExpression;
3+
4+
#[derive(Debug)]
5+
pub struct PhysicalFilter {
6+
pub(crate) plan_id: u32,
7+
pub(crate) predicate: ScalarExpression,
8+
pub(crate) input: Box<PhysicalOperator>
9+
}

src/execution_v1/physical_plan/physical_plan_builder.rs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,11 @@ use crate::planner::operator::Operator;
99
use crate::planner::LogicalPlan;
1010
use anyhow::anyhow;
1111
use anyhow::Result;
12+
use crate::execution_v1::physical_plan::physical_filter::PhysicalFilter;
1213
use crate::execution_v1::physical_plan::physical_insert::PhysicalInsert;
1314
use crate::execution_v1::physical_plan::physical_values::PhysicalValues;
1415
use crate::planner::logical_insert_plan::LogicalInsertPlan;
16+
use crate::planner::operator::filter::FilterOperator;
1517
use crate::planner::operator::insert::InsertOperator;
1618
use crate::planner::operator::project::ProjectOperator;
1719
use crate::planner::operator::values::ValuesOperator;
@@ -84,6 +86,7 @@ impl PhysicalPlanBuilder {
8486
match plan.operator.as_ref() {
8587
Operator::Project(op) => self.build_physical_select_projection(plan, op),
8688
Operator::Scan(scan) => Ok(self.build_physical_scan(scan.clone())),
89+
Operator::Filter(op) => self.build_physical_filter(plan, op),
8790
_ => Err(anyhow!(format!(
8891
"Unsupported physical plan: {:?}",
8992
plan.operator
@@ -104,4 +107,14 @@ impl PhysicalPlanBuilder {
104107
fn build_physical_scan(&mut self, base: ScanOperator) -> PhysicalOperator {
105108
PhysicalOperator::TableScan(PhysicalTableScan { plan_id: self.next_plan_id(), base })
106109
}
110+
111+
fn build_physical_filter(&mut self, plan: &LogicalSelectPlan, base: &FilterOperator) -> Result<PhysicalOperator> {
112+
let input = self.build_select_logical_plan(plan.child(0)?)?;
113+
114+
Ok(PhysicalOperator::Filter(PhysicalFilter {
115+
plan_id: 0,
116+
predicate: base.predicate.clone(),
117+
input: Box::new(input),
118+
}))
119+
}
107120
}
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
use arrow::array::as_boolean_array;
2+
use arrow::compute::filter_record_batch;
3+
use arrow::record_batch::RecordBatch;
4+
use futures_async_stream::try_stream;
5+
use crate::execution_v1::volcano_executor::BoxedExecutor;
6+
use crate::execution_v1::ExecutorError;
7+
use crate::expression::ScalarExpression;
8+
9+
pub struct Filter { }
10+
11+
impl Filter {
12+
#[try_stream(boxed, ok = RecordBatch, error = ExecutorError)]
13+
pub async fn execute(predicate: ScalarExpression, input: BoxedExecutor) {
14+
#[for_await]
15+
for batch in input {
16+
let batch = batch?;
17+
let predicate = predicate.eval_column(&batch)?;
18+
19+
yield filter_record_batch(&batch, as_boolean_array(&predicate))?;
20+
}
21+
}
22+
}

src/execution_v1/volcano_executor/mod.rs

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ mod projection;
33
mod table_scan;
44
mod insert;
55
mod values;
6+
mod filter;
67

78
use crate::execution_v1::physical_plan::physical_projection::PhysicalProjection;
89
use crate::execution_v1::physical_plan::PhysicalOperator;
@@ -14,7 +15,9 @@ use crate::storage::StorageImpl;
1415
use arrow::record_batch::RecordBatch;
1516
use futures::stream::BoxStream;
1617
use futures::TryStreamExt;
18+
use crate::execution_v1::physical_plan::physical_filter::PhysicalFilter;
1719
use crate::execution_v1::physical_plan::physical_insert::PhysicalInsert;
20+
use crate::execution_v1::volcano_executor::filter::Filter;
1821
use crate::execution_v1::volcano_executor::insert::Insert;
1922
use crate::execution_v1::volcano_executor::values::Values;
2023

@@ -38,6 +41,7 @@ impl VolcanoExecutor {
3841
}
3942
PhysicalOperator::Projection(PhysicalProjection { input, exprs, .. }) => {
4043
let input = self.build(*input);
44+
4145
Projection::execute(exprs, input)
4246
}
4347
PhysicalOperator::CreateTable(op) => match &self.storage {
@@ -51,7 +55,12 @@ impl VolcanoExecutor {
5155
Insert::execute(table_name, input, storage.clone()),
5256
}
5357
}
54-
PhysicalOperator::Values(op) => Values::execute(op)
58+
PhysicalOperator::Values(op) => Values::execute(op),
59+
PhysicalOperator::Filter(PhysicalFilter { predicate, input, .. }) => {
60+
let input = self.build(*input);
61+
62+
Filter::execute(predicate, input)
63+
}
5564
}
5665
}
5766

src/execution_v1/volcano_executor/table_scan.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ impl TableScan {
1313
pub async fn execute(plan: PhysicalTableScan, storage: impl Storage) {
1414
// TODO: sort_fields, pre_where, limit
1515
let ScanOperator { table_ref_id, .. } = plan.base;
16-
println!("ref id: {}", table_ref_id);
16+
1717
let table = storage.get_table(table_ref_id)?;
1818
let mut transaction = table.read(
1919
None,

src/expression/cast.rs

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
use arrow::array::{Array, BooleanArray};
2+
use anyhow::Result;
3+
4+
/// Downcast an Arrow Array to a concrete type
5+
macro_rules! downcast_value {
6+
($Value:expr, $Type:ident) => {{
7+
use std::any::type_name;
8+
9+
if let Some(value) = $Value.as_any().downcast_ref::<$Type>() {
10+
Ok(value)
11+
} else {
12+
Err(anyhow::anyhow!("could not cast value to {}", type_name::<$Type>()))
13+
}
14+
}};
15+
}
16+
17+
/// Downcast ArrayRef to BooleanArray
18+
pub fn as_boolean_array(array: &dyn Array) -> Result<&BooleanArray> {
19+
downcast_value!(array, BooleanArray)
20+
}

0 commit comments

Comments
 (0)