Skip to content

Commit 579d1de

Browse files
committed
feat(values): implement the Values operator to temporarily carry data as a child node of Insert
1 parent 3ed6502 commit 579d1de

18 files changed

Lines changed: 219 additions & 497 deletions

File tree

src/binder/insert.rs

Lines changed: 56 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,21 @@
1+
use std::slice;
2+
use std::sync::Arc;
13
use sqlparser::ast::{Expr, Ident, ObjectName};
2-
use anyhow::Result;
4+
use anyhow::{Error, Result};
35
use itertools::Itertools;
46
use crate::binder::{Binder, lower_case_name, split_name};
7+
use crate::catalog::ColumnCatalog;
58
use crate::expression::ScalarExpression;
69
use crate::planner::logical_insert_plan::LogicalInsertPlan;
710
use crate::planner::operator::insert::InsertOperator;
11+
use crate::planner::operator::Operator;
12+
use crate::planner::operator::values::ValuesOperator;
13+
use crate::types::value::DataValue;
814

915
impl Binder {
16+
17+
// TODO: 支持Project
18+
// TODO: 检测多行Values对齐
1019
pub(crate) fn bind_insert(
1120
&mut self,
1221
name: ObjectName,
@@ -17,39 +26,45 @@ impl Binder {
1726
let (_, table_name) = split_name(&name)?;
1827

1928
if let Some(table) = self.context.catalog.get_table_by_name(table_name) {
20-
let mut col_idxs = Vec::new();
21-
22-
for ident in idents {
23-
let col_name = &ident.value;
24-
if let Some(col_idx) = table.get_column_id_by_name(col_name) {
25-
col_idxs.push(col_idx.clone());
26-
} else {
27-
return Err(anyhow::Error::msg(format!(
28-
"not found column {} on table {}",
29-
col_name,
30-
table_name
31-
)))
29+
let mut col_catalogs = Vec::new();
30+
31+
if idents.is_empty() {
32+
col_catalogs = table.all_columns()
33+
.into_iter()
34+
.map(|(_, catalog)| catalog.clone())
35+
.collect_vec();
36+
} else {
37+
for ident in idents {
38+
match self.bind_column_ref_from_identifiers(slice::from_ref(ident))? {
39+
ScalarExpression::ColumnRef(catalog) => col_catalogs.push(catalog),
40+
_ => unreachable!()
41+
}
3242
}
3343
}
34-
if col_idxs.is_empty() {
35-
col_idxs = (0..table.columns_len()).collect_vec()
36-
}
3744

38-
// 行转列
39-
let mut cols: Vec<Vec<ScalarExpression>> = vec![Vec::new(); rows[0].len()];
45+
let rows = rows
46+
.into_iter()
47+
.map(|row| {
48+
row.into_iter()
49+
.map(|expr| match self.bind_expr(expr)? {
50+
ScalarExpression::Constant(value) => Ok::<DataValue, Error>(value),
51+
_ => unreachable!(),
52+
})
53+
.try_collect()
54+
})
55+
.try_collect()?;
4056

41-
for row in rows {
42-
for (i, expr) in row.into_iter().enumerate() {
43-
cols[i].push(self.bind_expr(expr)?);
44-
}
45-
}
57+
let values_plan = self.bind_values(rows, col_catalogs.clone());
4658

4759
Ok(LogicalInsertPlan {
48-
operator: InsertOperator {
49-
table: table_name.to_string(),
50-
col_idxs,
51-
cols,
52-
},
60+
operator: Arc::new(
61+
Operator::Insert(
62+
InsertOperator {
63+
table: table_name.to_string(),
64+
}
65+
)
66+
),
67+
children: vec![Arc::new(values_plan)],
5368
})
5469
} else {
5570
Err(anyhow::Error::msg(format!(
@@ -59,5 +74,17 @@ impl Binder {
5974
}
6075
}
6176

62-
77+
fn bind_values(
78+
&mut self,
79+
rows: Vec<Vec<DataValue>>,
80+
col_catalogs: Vec<ColumnCatalog>
81+
) -> LogicalInsertPlan {
82+
LogicalInsertPlan {
83+
operator: Arc::new(Operator::Values(ValuesOperator {
84+
rows,
85+
col_catalogs,
86+
})),
87+
children: vec![],
88+
}
89+
}
6390
}

src/binder/mod.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,6 @@ impl Binder {
6060
}
6161

6262
pub fn bind(mut self, stmt: &Statement) -> Result<LogicalPlan> {
63-
println!("{:#?}", stmt);
6463
let plan = match stmt {
6564
Statement::Query(query) => {
6665
let plan = self.bind_query(query)?;

0 commit comments

Comments
 (0)