Skip to content

Commit 0ea995f

Browse files
committed
feat(create): create table in volcano
- bind column ref
1 parent 50332a0 commit 0ea995f

13 files changed

Lines changed: 182 additions & 19 deletions

File tree

rust-toolchain

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
nightly

src/binder/expr.rs

Lines changed: 69 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,78 @@
1+
use crate::binder::BindError;
12
use anyhow::Result;
2-
use sqlparser::ast::Expr;
3+
use itertools::Itertools;
4+
use sqlparser::ast::{Expr, Ident};
5+
use std::slice;
36

47
use super::Binder;
58
use crate::expression::ScalarExpression;
69

710
impl Binder {
811
pub(crate) fn bind_expr(&mut self, expr: &Expr) -> Result<ScalarExpression> {
9-
todo!()
12+
match expr {
13+
Expr::Identifier(ident) => {
14+
self.bind_column_ref_from_identifiers(slice::from_ref(ident))
15+
}
16+
_ => {
17+
todo!()
18+
}
19+
}
20+
}
21+
22+
pub fn bind_column_ref_from_identifiers(
23+
&mut self,
24+
idents: &[Ident],
25+
) -> Result<ScalarExpression> {
26+
let idents = idents
27+
.iter()
28+
.map(|ident| Ident::new(ident.value.to_lowercase()))
29+
.collect_vec();
30+
let (_schema_name, table_name, column_name) = match idents.as_slice() {
31+
[column] => (None, None, &column.value),
32+
[table, column] => (None, Some(&table.value), &column.value),
33+
[schema, table, column] => (Some(&schema.value), Some(&table.value), &column.value),
34+
_ => {
35+
return Err(BindError::InvalidColumn(
36+
idents
37+
.iter()
38+
.map(|ident| ident.value.clone())
39+
.join(".")
40+
.to_string(),
41+
)
42+
.into())
43+
}
44+
};
45+
46+
if let Some(table) = table_name {
47+
let table_catalog = self
48+
.context
49+
.catalog
50+
.get_table_by_name(table)
51+
.ok_or_else(|| BindError::InvalidTable(table.to_string()))?;
52+
53+
let column_catalog = table_catalog
54+
.get_column_by_name(column_name)
55+
.ok_or_else(|| BindError::InvalidColumn(column_name.to_string()))?;
56+
Ok(ScalarExpression::ColumnRef(column_catalog.clone()))
57+
} else {
58+
// handle col syntax
59+
let mut got_column = None;
60+
for table_catalog in self.context.catalog.tables.values() {
61+
if let Some(column_catalog) = table_catalog.get_column_by_name(column_name) {
62+
if got_column.is_some() {
63+
return Err(BindError::InvalidColumn(column_name.to_string()).into());
64+
}
65+
got_column = Some(column_catalog);
66+
}
67+
}
68+
if got_column.is_none() {
69+
if let Some(expr) = self.context.aliases.get(column_name) {
70+
return Ok(expr.clone());
71+
}
72+
}
73+
let column_catalog =
74+
got_column.ok_or_else(|| BindError::InvalidColumn(column_name.to_string()))?;
75+
Ok(ScalarExpression::ColumnRef(column_catalog.clone()))
76+
}
1077
}
1178
}

src/catalog/root.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,8 @@ use crate::types::TableId;
55

66
#[derive(Debug, Clone)]
77
pub struct RootCatalog {
8-
table_idxs: BTreeMap<String, TableId>,
9-
tables: BTreeMap<TableId, TableCatalog>,
8+
pub table_idxs: BTreeMap<String, TableId>,
9+
pub tables: BTreeMap<TableId, TableCatalog>,
1010
}
1111

1212
impl Default for RootCatalog {

src/catalog/table.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,11 @@ impl TableCatalog {
2222
self.column_idxs.get(name).cloned()
2323
}
2424

25+
pub(crate) fn get_column_by_name(&self, name: &str) -> Option<&ColumnCatalog> {
26+
let id = self.column_idxs.get(name)?;
27+
self.columns.get(id)
28+
}
29+
2530
pub(crate) fn contains_column(&self, name: &str) -> bool {
2631
self.column_idxs.contains_key(name)
2732
}

src/db.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -164,6 +164,18 @@ mod test {
164164
let batch = database.run("select * from t1").await?;
165165
println!("{:#?}", batch);
166166

167+
Ok(())
168+
})
169+
}
170+
#[test]
171+
fn test_sql() -> anyhow::Result<()> {
172+
let database = Database::new_on_mem();
173+
174+
tokio_test::block_on(async move {
175+
let _batch = database.run("create table t1 (a int)").await?;
176+
let batch = database.run("select a from t1").await?;
177+
println!("{:#?}", batch);
178+
167179
Ok(())
168180
})
169181
}
Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,14 @@
1+
use crate::execution_v1::physical_plan::physical_create_table::PhysicalCreateTable;
12
use crate::execution_v1::physical_plan::physical_projection::PhysicalProjection;
23
use crate::execution_v1::physical_plan::physical_table_scan::PhysicalTableScan;
34

4-
pub(crate) mod physical_table_scan;
5-
pub(crate) mod physical_projection;
5+
pub(crate) mod physical_create_table;
66
pub(crate) mod physical_plan_builder;
7+
pub(crate) mod physical_projection;
8+
pub(crate) mod physical_table_scan;
79

810
pub enum PhysicalOperator {
11+
CreateTable(PhysicalCreateTable),
912
TableScan(PhysicalTableScan),
10-
Projection(PhysicalProjection)
13+
Projection(PhysicalProjection),
1114
}
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
use crate::catalog::ColumnDesc;
2+
use crate::execution_v1::physical_plan::PhysicalOperator;
3+
use crate::expression::ScalarExpression;
4+
5+
pub struct PhysicalCreateTable {
6+
/// Table name to insert to
7+
pub table_name: String,
8+
/// List of columns of the table
9+
pub columns: Vec<(String, bool, ColumnDesc)>,
10+
}

src/execution_v1/physical_plan/physical_plan_builder.rs

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,14 @@
1-
use std::sync::Arc;
2-
use anyhow::anyhow;
3-
use anyhow::Result;
1+
use crate::execution_v1::physical_plan::physical_create_table::PhysicalCreateTable;
42
use crate::execution_v1::physical_plan::physical_projection::PhysicalProjection;
53
use crate::execution_v1::physical_plan::physical_table_scan::PhysicalTableScan;
64
use crate::execution_v1::physical_plan::PhysicalOperator;
5+
use crate::planner::logical_create_table_plan::LogicalCreateTablePlan;
76
use crate::planner::logical_select_plan::LogicalSelectPlan;
8-
use crate::planner::LogicalPlan;
9-
use crate::planner::operator::Operator;
107
use crate::planner::operator::scan::ScanOperator;
8+
use crate::planner::operator::Operator;
9+
use crate::planner::LogicalPlan;
10+
use anyhow::anyhow;
11+
use anyhow::Result;
1112

1213
pub struct PhysicalPlanBuilder {
1314
plan_id: u32,
@@ -27,10 +28,22 @@ impl PhysicalPlanBuilder {
2728
pub fn build_plan(&mut self, plan: &LogicalPlan) -> Result<PhysicalOperator> {
2829
match plan {
2930
LogicalPlan::Select(select) => self.build_select_logical_plan(select),
30-
LogicalPlan::CreateTable(_) => todo!(),
31+
LogicalPlan::CreateTable(create_table) => {
32+
self.build_create_table_logic_plan(create_table)
33+
}
3134
}
3235
}
3336

37+
fn build_create_table_logic_plan(
38+
&mut self,
39+
plan: &LogicalCreateTablePlan,
40+
) -> Result<PhysicalOperator> {
41+
Ok(PhysicalOperator::CreateTable(PhysicalCreateTable {
42+
table_name: plan.table_name.to_string(),
43+
columns: plan.columns.clone(),
44+
}))
45+
}
46+
3447
fn build_select_logical_plan(&mut self, plan: &LogicalSelectPlan) -> Result<PhysicalOperator> {
3548
match plan.operator.as_ref() {
3649
Operator::Project(op) => {
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
use crate::catalog::ColumnCatalog;
2+
use crate::execution_v1::physical_plan::physical_create_table::PhysicalCreateTable;
3+
use crate::execution_v1::ExecutorError;
4+
use crate::storage::Storage;
5+
use arrow::datatypes::{Schema, SchemaRef};
6+
use arrow::record_batch::RecordBatch;
7+
use futures_async_stream::try_stream;
8+
use std::sync::Arc;
9+
10+
pub struct CreateTable {}
11+
12+
impl CreateTable {
13+
#[try_stream(boxed, ok = RecordBatch , error = ExecutorError)]
14+
pub async fn execute(plan: PhysicalCreateTable, storage: impl Storage) {
15+
let mut columns = Vec::new();
16+
plan.columns.iter().for_each(|c| {
17+
columns.push(ColumnCatalog::new(c.0.clone(), c.1, c.2.clone()));
18+
});
19+
let table_name = plan.table_name.clone();
20+
// columns->batch record
21+
let mut data = Vec::new();
22+
23+
columns.iter().for_each(|c| {
24+
let batch = RecordBatch::new_empty(Arc::new(Schema::new(vec![c.to_field()])));
25+
data.push(batch);
26+
});
27+
28+
storage.create_table(plan.table_name.as_str(), data);
29+
}
30+
}

src/execution_v1/volcano_executor/mod.rs

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,17 @@
1-
mod table_scan;
1+
mod create_table;
22
mod projection;
3+
mod table_scan;
34

4-
use arrow::record_batch::RecordBatch;
5-
use futures::stream::BoxStream;
6-
use futures::TryStreamExt;
7-
use crate::execution_v1::ExecutorError;
85
use crate::execution_v1::physical_plan::physical_projection::PhysicalProjection;
96
use crate::execution_v1::physical_plan::PhysicalOperator;
7+
use crate::execution_v1::volcano_executor::create_table::CreateTable;
108
use crate::execution_v1::volcano_executor::projection::Projection;
119
use crate::execution_v1::volcano_executor::table_scan::TableScan;
10+
use crate::execution_v1::ExecutorError;
1211
use crate::storage::StorageImpl;
12+
use arrow::record_batch::RecordBatch;
13+
use futures::stream::BoxStream;
14+
use futures::TryStreamExt;
1315

1416
pub type BoxedExecutor = BoxStream<'static, Result<RecordBatch, ExecutorError>>;
1517

@@ -33,6 +35,12 @@ impl VolcanoExecutor {
3335
let input = self.build(*input);
3436
Projection::execute(exprs, input)
3537
}
38+
PhysicalOperator::CreateTable(op) => match &self.storage {
39+
StorageImpl::InMemoryStorage(storage) => CreateTable::execute(op, storage.clone()),
40+
},
41+
_ => {
42+
unimplemented!()
43+
}
3644
}
3745
}
3846

0 commit comments

Comments
 (0)