Skip to content

Commit cec22f2

Browse files
authored
feat(hash_join): implement basic Inner/Left/Right/Full Join operations (#43)
* feat(hash_join): implement basic Inner/Left/Right/Full Join operations found a problem that needs to be fixed: - when the `Projection` operator executes eval_column, the reading data is abnormal due to the duplication of the ColumnIdx subscript of the Catalog So `Binder::bind_project` need to support join * fix(hash_join): fix filter on right join * feat(projection): `Projection` supports multi-table mapping By marshaling Column and rewriting `ScalarExpression` to convert `ColumnRef` to `InputRef` * fix(insert): fix the misalignment between the specified field and the data when inserting * refactor(physical_plan_builder): for subsequent Agg compatibility, cancel converting InputRef to ColumnRef * fix(create_table): fix `test_create_bind` * refactor(binder): when selecting Join in `Binder::bind`, perform corresponding Select nullable processing to avoid rewrite
1 parent 6914199 commit cec22f2

27 files changed

Lines changed: 960 additions & 223 deletions

File tree

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ async-channel = "1.8.0"
4040
async-backtrace = "0.2.6"
4141
futures = "0.3.25"
4242
futures-lite = "1.12.0"
43+
ahash = "0.8.3"
4344

4445
[dev-dependencies]
4546
tokio-test = "0.4.2"

src/binder/aggregate.rs

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -90,11 +90,7 @@ impl Binder {
9090
ScalarExpression::AggCall {
9191
ty: return_type, ..
9292
} => {
93-
let index = if self.context.agg_calls.len() == 0 {
94-
0
95-
} else {
96-
self.context.agg_calls.len() + 1
97-
};
93+
let index = self.context.agg_calls.len();
9894
let input_ref = ScalarExpression::InputRef {
9995
index,
10096
ty: return_type.clone(),

src/binder/create_table.rs

Lines changed: 14 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -52,38 +52,28 @@ impl Binder {
5252
mod tests {
5353
use super::*;
5454
use crate::binder::BinderContext;
55-
use crate::catalog::{ColumnCatalog, ColumnDesc, RootCatalog};
56-
use crate::planner::LogicalPlan;
55+
use crate::catalog::{ColumnDesc, RootCatalog};
5756
use crate::types::LogicalType;
5857

5958
#[test]
6059
fn test_create_bind() {
61-
let sql = "create table t1 (id int , name varchar(10))";
60+
let sql = "create table t1 (id int , name varchar(10) null)";
6261
let binder = Binder::new(BinderContext::new(RootCatalog::new()));
6362
let stmt = crate::parser::parse_sql(sql).unwrap();
6463
let plan1 = binder.bind(&stmt[0]).unwrap();
6564

66-
let plan2 = LogicalPlan {
67-
operator: Operator::CreateTable(
68-
CreateTableOperator {
69-
table_name: "t1".to_string(),
70-
columns: vec![
71-
ColumnCatalog::new(
72-
"id".to_string(),
73-
false,
74-
ColumnDesc::new(LogicalType::Integer, false)
75-
),
76-
ColumnCatalog::new(
77-
"name".to_string(),
78-
false,
79-
ColumnDesc::new(LogicalType::Varchar, false)
80-
)
81-
],
82-
}
83-
),
84-
childrens: vec![],
85-
};
65+
match plan1.operator {
66+
Operator::CreateTable(op) => {
67+
assert_eq!(op.table_name, "t1".to_string());
68+
assert_eq!(op.columns[0].name, "id".to_string());
69+
assert_eq!(op.columns[0].nullable, false);
70+
assert_eq!(op.columns[0].desc, ColumnDesc::new(LogicalType::Integer, false));
71+
assert_eq!(op.columns[1].name, "name".to_string());
72+
assert_eq!(op.columns[1].nullable, true);
73+
assert_eq!(op.columns[1].desc, ColumnDesc::new(LogicalType::Varchar, false));
74+
}
75+
_ => unreachable!()
76+
}
8677

87-
assert_eq!(plan1, plan2);
8878
}
8979
}

src/binder/expr.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ impl Binder {
6868
} else {
6969
// handle col syntax
7070
let mut got_column = None;
71-
for table_catalog in &self.context.catalog.tables {
71+
for (_, table_catalog) in self.context.catalog.tables() {
7272
if let Some(column_catalog) = table_catalog.get_column_by_name(column_name) {
7373
if got_column.is_some() {
7474
return Err(BindError::InvalidColumn(column_name.to_string()).into());

src/binder/mod.rs

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,20 +4,21 @@ pub mod expr;
44
mod select;
55
mod insert;
66

7-
use std::collections::HashMap;
7+
use std::collections::BTreeMap;
88

99
use anyhow::Result;
1010
use sqlparser::ast::{Ident, ObjectName, SetExpr, Statement};
1111

1212
use crate::catalog::{RootCatalog, DEFAULT_SCHEMA_NAME, CatalogError};
1313
use crate::expression::ScalarExpression;
1414
use crate::planner::LogicalPlan;
15-
use crate::types::TableIdx;
16-
#[derive(Clone)]
15+
use crate::planner::operator::join::JoinType;
16+
use crate::types::TableId;
17+
#[derive(Debug, Clone)]
1718
pub struct BinderContext {
18-
catalog: RootCatalog,
19-
bind_table: HashMap<String, TableIdx>,
20-
aliases: HashMap<String, ScalarExpression>,
19+
pub(crate) catalog: RootCatalog,
20+
pub(crate) bind_table: BTreeMap<String, (TableId, Option<JoinType>)>,
21+
aliases: BTreeMap<String, ScalarExpression>,
2122
group_by_exprs: Vec<ScalarExpression>,
2223
agg_calls: Vec<ScalarExpression>,
2324
index: u16,

0 commit comments

Comments
 (0)