Skip to content

Commit d540169

Browse files
committed
feat(insert): support sql insert simple syntax
1 parent 929deda commit d540169

22 files changed

Lines changed: 304 additions & 72 deletions

File tree

Lines changed: 25 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,30 +1,30 @@
11
use std::collections::HashSet;
2-
32
use anyhow::Result;
43
use sqlparser::ast::{ColumnDef, ObjectName};
54

65
use super::Binder;
76
use crate::binder::{lower_case_name, split_name};
87
use crate::catalog::ColumnCatalog;
98
use crate::planner::logical_create_table_plan::LogicalCreateTablePlan;
9+
use crate::planner::operator::create_table::CreateOperator;
1010

1111
impl Binder {
1212
pub(crate) fn bind_create_table(
1313
&mut self,
14-
name: ObjectName,
14+
name: &ObjectName,
1515
columns: &[ColumnDef],
1616
) -> Result<LogicalCreateTablePlan> {
1717
let name = lower_case_name(&name);
18-
1918
let (_, table_name) = split_name(&name)?;
2019

2120
// check duplicated column names
2221
let mut set = HashSet::new();
2322
for col in columns.iter() {
24-
if !set.insert(col.name.value.clone()) {
23+
let col_name = &col.name.value;
24+
if !set.insert(col_name.clone()) {
2525
return Err(anyhow::Error::msg(format!(
2626
"bind duplicated column {}",
27-
col.name.value.clone()
27+
col_name
2828
)));
2929
}
3030
}
@@ -35,8 +35,10 @@ impl Binder {
3535
.collect();
3636

3737
let plan = LogicalCreateTablePlan {
38-
table_name: table_name.to_string(),
39-
columns,
38+
operator: CreateOperator {
39+
table_name: table_name.to_string(),
40+
columns
41+
},
4042
};
4143
Ok(plan)
4244
}
@@ -46,7 +48,7 @@ impl Binder {
4648
mod tests {
4749
use super::*;
4850
use crate::binder::BinderContext;
49-
use crate::catalog::{ColumnDesc, RootCatalog};
51+
use crate::catalog::{ColumnCatalog, ColumnDesc, RootCatalog};
5052
use crate::planner::LogicalPlan;
5153
use crate::types::LogicalType;
5254

@@ -58,19 +60,21 @@ mod tests {
5860
let plan1 = binder.bind(&stmt[0]).unwrap();
5961

6062
let plan2 = LogicalPlan::CreateTable(LogicalCreateTablePlan {
61-
table_name: "t1".to_string(),
62-
columns: vec![
63-
ColumnCatalog::new(
64-
"id".to_string(),
65-
false,
66-
ColumnDesc::new(LogicalType::Integer, false)
67-
),
68-
ColumnCatalog::new(
69-
"name".to_string(),
70-
false,
71-
ColumnDesc::new(LogicalType::Varchar, false)
72-
)
73-
],
63+
operator: CreateOperator {
64+
table_name: "t1".to_string(),
65+
columns: vec![
66+
ColumnCatalog::new(
67+
"id".to_string(),
68+
false,
69+
ColumnDesc::new(LogicalType::Integer, false)
70+
),
71+
ColumnCatalog::new(
72+
"name".to_string(),
73+
false,
74+
ColumnDesc::new(LogicalType::Varchar, false)
75+
)
76+
],
77+
},
7478
});
7579

7680
assert_eq!(plan1, plan2);

src/binder/insert.rs

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
use sqlparser::ast::{Expr, Ident, ObjectName};
2+
use anyhow::Result;
3+
use itertools::Itertools;
4+
use crate::binder::{Binder, lower_case_name, split_name};
5+
use crate::expression::ScalarExpression;
6+
use crate::planner::logical_insert_plan::LogicalInsertPlan;
7+
use crate::planner::operator::insert::InsertOperator;
8+
9+
impl Binder {
10+
pub(crate) fn bind_insert(
11+
&mut self,
12+
name: ObjectName,
13+
idents: &[Ident],
14+
rows: &Vec<Vec<Expr>>
15+
) -> Result<LogicalInsertPlan> {
16+
let name = lower_case_name(&name);
17+
let (_, table_name) = split_name(&name)?;
18+
19+
if let Some(table) = self.context.catalog.get_table_by_name(table_name) {
20+
let mut col_idxs = Vec::new();
21+
22+
for ident in idents {
23+
let col_name = &ident.value;
24+
if let Some(col_idx) = table.get_column_id_by_name(col_name) {
25+
col_idxs.push(col_idx.clone());
26+
} else {
27+
return Err(anyhow::Error::msg(format!(
28+
"not found column {} on table {}",
29+
col_name,
30+
table_name
31+
)))
32+
}
33+
}
34+
if col_idxs.is_empty() {
35+
col_idxs = (0..table.columns_len()).collect_vec()
36+
}
37+
38+
// 行转列
39+
let mut cols: Vec<Vec<ScalarExpression>> = vec![Vec::new(); rows[0].len()];
40+
41+
for row in rows {
42+
for (i, expr) in row.into_iter().enumerate() {
43+
cols[i].push(self.bind_expr(expr)?);
44+
}
45+
}
46+
47+
Ok(LogicalInsertPlan {
48+
operator: InsertOperator {
49+
table: table_name.to_string(),
50+
col_idxs,
51+
cols,
52+
},
53+
})
54+
} else {
55+
Err(anyhow::Error::msg(format!(
56+
"not found table {}",
57+
table_name
58+
)))
59+
}
60+
}
61+
62+
63+
}

src/binder/mod.rs

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,13 @@
11
pub mod aggregate;
2-
mod create;
2+
mod create_table;
33
pub mod expr;
44
mod select;
5+
mod insert;
56

67
use std::collections::HashMap;
78

89
use anyhow::Result;
9-
use sqlparser::ast::{Ident, ObjectName, Statement};
10+
use sqlparser::ast::{Ident, ObjectName, SetExpr, Statement};
1011

1112
use crate::catalog::{RootCatalog, DEFAULT_SCHEMA_NAME, CatalogError};
1213
use crate::expression::ScalarExpression;
@@ -59,15 +60,24 @@ impl Binder {
5960
}
6061

6162
pub fn bind(mut self, stmt: &Statement) -> Result<LogicalPlan> {
63+
println!("{:#?}", stmt);
6264
let plan = match stmt {
6365
Statement::Query(query) => {
6466
let plan = self.bind_query(query)?;
6567
LogicalPlan::Select(plan)
6668
}
6769
Statement::CreateTable { name, columns, .. } => {
68-
let plan = self.bind_create_table(name.to_owned(), &columns)?;
70+
let plan = self.bind_create_table(name, &columns)?;
6971
LogicalPlan::CreateTable(plan)
7072
}
73+
Statement::Insert { table_name, columns, source, .. } => {
74+
if let SetExpr::Values(values) = source.body.as_ref() {
75+
let plan = self.bind_insert(table_name.to_owned(), columns, &values.rows)?;
76+
LogicalPlan::Insert(plan)
77+
} else {
78+
todo!()
79+
}
80+
}
7181
_ => unimplemented!(),
7282
};
7383
Ok(plan)

src/binder/select.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -200,7 +200,7 @@ impl Binder {
200200
let mut exprs = vec![];
201201
for ref_id in self.context.bind_table.values().cloned().collect_vec() {
202202
let table = self.context.catalog.get_table(ref_id).unwrap();
203-
for (col_id, col) in &table.get_all_columns() {
203+
for (col_id, col) in &table.all_columns() {
204204
let column_ref_id = ColumnRefId::from_table(ref_id, *col_id);
205205
// self.record_regular_table_column(
206206
// &table.name(),

src/catalog/column.rs

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -75,10 +75,6 @@ impl ColumnDesc {
7575
}
7676
}
7777

78-
pub(crate) fn is_primary(&self) -> bool {
79-
self.is_primary
80-
}
81-
8278
pub(crate) fn get_datatype(&self) -> LogicalType {
8379
self.column_datatype.clone()
8480
}

src/catalog/table.rs

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,12 @@
11
use std::collections::HashMap;
2+
use std::sync::Arc;
3+
use arrow::datatypes::{Schema, SchemaRef};
24

35
use itertools::Itertools;
46

57
use crate::catalog::{CatalogError, ColumnCatalog};
68
use crate::types::{ColumnIdx, IdGenerator, TableIdx};
7-
#[derive(Debug, Clone)]
9+
#[derive(Debug, Clone, PartialEq)]
810
pub struct TableCatalog {
911
pub id: Option<TableIdx>,
1012
pub name: String,
@@ -15,6 +17,10 @@ pub struct TableCatalog {
1517
}
1618

1719
impl TableCatalog {
20+
pub(crate) fn columns_len(&self) -> usize {
21+
self.columns.len()
22+
}
23+
1824
pub(crate) fn get_column_by_id(&self, id: ColumnIdx) -> Option<&ColumnCatalog> {
1925
self.columns.get(id)
2026
}
@@ -32,13 +38,21 @@ impl TableCatalog {
3238
self.column_idxs.contains_key(name)
3339
}
3440

35-
pub(crate) fn get_all_columns(&self) -> Vec<(ColumnIdx, &ColumnCatalog)> {
41+
pub(crate) fn all_columns(&self) -> Vec<(ColumnIdx, &ColumnCatalog)> {
3642
self.columns
3743
.iter()
3844
.enumerate()
3945
.collect_vec()
4046
}
4147

48+
// TODO: 缓存schema
49+
pub(crate) fn schema(&self) -> SchemaRef {
50+
let fields = self.columns.iter()
51+
.map(ColumnCatalog::to_field)
52+
.collect_vec();
53+
Arc::new(Schema::new(fields))
54+
}
55+
4256
/// Add a column to the table catalog.
4357
pub(crate) fn add_column(
4458
&mut self,

src/db.rs

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,7 @@ pub enum DatabaseError {
123123
mod test {
124124
use std::sync::Arc;
125125
use arrow::array::{BooleanArray, Int32Array};
126+
use arrow::compute::concat_batches;
126127
use arrow::datatypes::Schema;
127128
use arrow::record_batch::RecordBatch;
128129
use itertools::Itertools;
@@ -177,9 +178,14 @@ mod test {
177178
let database = Database::new_on_mem();
178179

179180
tokio_test::block_on(async move {
180-
let _batch = database.run("create table t1 (a int, b int)").await?;
181-
let batch = database.run("select * from t1").await?;
182-
println!("{:#?}", batch);
181+
let _ = database.run("create table t1 (a int, b boolean)").await?;
182+
let _ = database.run("insert into t1 values (1, true), (2, false)").await?;
183+
let vec_batch = database.run("select * from t1").await?;
184+
185+
let table = database.storage
186+
.get_catalog()
187+
.get_table(0).unwrap().clone();
188+
println!("{:#?}", concat_batches(&table.schema(), &vec_batch));
183189

184190
Ok(())
185191
})

src/execution/physical/physical_plan_builder.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ impl PhysicalPlanBuilder {
3030
match plan {
3131
LogicalPlan::Select(select) => self.build_select_logical_plan(select),
3232
LogicalPlan::CreateTable(_) => todo!(),
33+
LogicalPlan::Insert(_) => todo!(),
3334
}
3435
}
3536

src/execution_v1/physical_plan/mod.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,17 @@
11
use crate::execution_v1::physical_plan::physical_create_table::PhysicalCreateTable;
2+
use crate::execution_v1::physical_plan::physical_insert::PhysicalInsert;
23
use crate::execution_v1::physical_plan::physical_projection::PhysicalProjection;
34
use crate::execution_v1::physical_plan::physical_table_scan::PhysicalTableScan;
45

56
pub(crate) mod physical_create_table;
67
pub(crate) mod physical_plan_builder;
78
pub(crate) mod physical_projection;
89
pub(crate) mod physical_table_scan;
10+
pub(crate) mod physical_insert;
911

1012
#[derive(Debug)]
1113
pub enum PhysicalOperator {
14+
Insert(PhysicalInsert),
1215
CreateTable(PhysicalCreateTable),
1316
TableScan(PhysicalTableScan),
1417
Projection(PhysicalProjection),
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
use crate::expression::ScalarExpression;
2+
use crate::types::ColumnIdx;
3+
4+
#[derive(Debug)]
5+
pub struct PhysicalInsert {
6+
/// Table name to insert to
7+
pub table_name: String,
8+
9+
pub col_idxs: Vec<ColumnIdx>,
10+
/// List of columns of the table
11+
pub cols: Vec<Vec<ScalarExpression>>
12+
}

0 commit comments

Comments
 (0)