Skip to content

Commit eee925d

Browse files
authored
RBO Optimizer implementation (#45)
* feat(optimizer): Complete the addition, deletion and modification of `HepGraph` TODO: HepGraph::to_plan * feat(optimizer): implement `HepGraph::to_plan` * feat(optimizer): implement `HepMatcher` to retrieve the corresponding matching `OptExpr` in the `HepGraph` * feat(optimizer): complete the general framework of the RBO optimizer: `HepOptimizer` rules are divided into `RuleBatch` for matching and apply * feat(optimizer): physical operator: `TableScan`, support columns conditional scanning * refactor(physical_plan): `PhysicalPlanBuilder` -> `PhysicalPlanMapping`, `PhysicalPlanMapping` avoids `clone` and reduces performance overhead * feat(storage::memory): `InMemoryTransaction` supports `Columns` projection and Limit offset * fix(insert): fix the table binding of idents when inserting * feat(rule): `columns_pruning` RuleBatch Added `PUSH_PROJECT_THROUGH_CHILD_RULE` Rule to push down the Project to reduce the IO generated by redundant columns * feat(rule): `combine_operators` RuleBatch Added `COLLAPSE_PROJECT_RULE` and `COMBINE_FILTERS_RULE` to minimize the calculation amount of Projection and merge Filter calculation * feat(rule): the new `pushdown_limit` series rules are used to optimize the related structure of the Limit operator * feat(rule): added Rule `PushPredicateThroughJoin`, which is used to push the Filter condition before the Join, and only works for the JoinType of Inner/Left/Right * fix(hash_join): add return early * style(join): code fmt * feat(graph): use `version` to judge `HepGraph` changes * docs: rewrite README.md * style(rule): use `matches!` replace `match` * style(rule): fix note miss
1 parent cec22f2 commit eee925d

45 files changed

Lines changed: 2453 additions & 427 deletions

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ async-backtrace = "0.2.6"
4141
futures = "0.3.25"
4242
futures-lite = "1.12.0"
4343
ahash = "0.8.3"
44+
lazy_static = "1.4.0"
4445

4546
[dev-dependencies]
4647
tokio-test = "0.4.2"

README.md

Lines changed: 53 additions & 191 deletions
Large diffs are not rendered by default.

src/binder/expr.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,10 +13,10 @@ impl Binder {
1313
pub(crate) fn bind_expr(&mut self, expr: &Expr) -> Result<ScalarExpression> {
1414
match expr {
1515
Expr::Identifier(ident) => {
16-
self.bind_column_ref_from_identifiers(slice::from_ref(ident))
16+
self.bind_column_ref_from_identifiers(slice::from_ref(ident), None)
1717
}
1818
Expr::CompoundIdentifier(idents) => {
19-
self.bind_column_ref_from_identifiers(idents)
19+
self.bind_column_ref_from_identifiers(idents, None)
2020
}
2121
Expr::BinaryOp { left, right, op} => {
2222
self.bind_binary_op_internal(left, right, op)
@@ -33,6 +33,7 @@ impl Binder {
3333
pub fn bind_column_ref_from_identifiers(
3434
&mut self,
3535
idents: &[Ident],
36+
bind_table_name: Option<&String>,
3637
) -> Result<ScalarExpression> {
3738
let idents = idents
3839
.iter()
@@ -54,7 +55,7 @@ impl Binder {
5455
}
5556
};
5657

57-
if let Some(table) = table_name {
58+
if let Some(table) = table_name.or(bind_table_name) {
5859
let table_catalog = self
5960
.context
6061
.catalog

src/binder/insert.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,12 @@ impl Binder {
3333
.map(|(_, catalog)| catalog.clone())
3434
.collect_vec();
3535
} else {
36+
let bind_table_name = Some(table.name.to_string());
3637
for ident in idents {
37-
match self.bind_column_ref_from_identifiers(slice::from_ref(ident))? {
38+
match self.bind_column_ref_from_identifiers(
39+
slice::from_ref(ident),
40+
bind_table_name.as_ref()
41+
)? {
3842
ScalarExpression::ColumnRef(catalog) => col_catalogs.push(catalog),
3943
_ => unreachable!()
4044
}

src/binder/mod.rs

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,3 +115,38 @@ pub enum BindError {
115115
#[error("catalog error")]
116116
CatalogError(#[from] CatalogError),
117117
}
118+
119+
#[cfg(test)]
120+
pub mod test {
121+
use crate::catalog::{ColumnCatalog, ColumnDesc, RootCatalog};
122+
use crate::planner::LogicalPlan;
123+
use crate::types::LogicalType::Integer;
124+
use anyhow::Result;
125+
use crate::binder::{Binder, BinderContext};
126+
127+
fn test_root_catalog() -> Result<RootCatalog> {
128+
let mut root = RootCatalog::new();
129+
130+
let cols_t1 = vec![
131+
ColumnCatalog::new("c1".to_string(), false, ColumnDesc::new(Integer, true)),
132+
ColumnCatalog::new("c2".to_string(), false, ColumnDesc::new(Integer, false)),
133+
];
134+
let _ = root.add_table("t1".to_string(), cols_t1)?;
135+
136+
let cols_t2 = vec![
137+
ColumnCatalog::new("c3".to_string(), false, ColumnDesc::new(Integer, true)),
138+
ColumnCatalog::new("c4".to_string(), false, ColumnDesc::new(Integer, false)),
139+
];
140+
let _ = root.add_table("t2".to_string(), cols_t2)?;
141+
Ok(root)
142+
}
143+
144+
pub fn select_sql_run(sql: &str) -> Result<LogicalPlan> {
145+
let root = test_root_catalog()?;
146+
147+
let binder = Binder::new(BinderContext::new(root));
148+
let stmt = crate::parser::parse_sql(sql).unwrap();
149+
150+
Ok(binder.bind(&stmt[0])?)
151+
}
152+
}

src/binder/select.rs

Lines changed: 7 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ use sqlparser::ast::{
2323
Expr, Ident, Join, JoinConstraint, JoinOperator, Offset, OrderByExpr, Query, Select,
2424
SelectItem, SetExpr, TableFactor, TableWithJoins,
2525
};
26+
use crate::execution_v1::volcano_executor::join::joins_nullable;
2627
use crate::expression::BinaryOperator;
2728
use crate::planner::LogicalPlan;
2829
use crate::planner::operator::join::JoinCondition;
@@ -154,15 +155,15 @@ impl Binder {
154155
)));
155156
}
156157

157-
let table_ref_id = self
158+
let table_catalog = self
158159
.context
159-
.catalog
160-
.get_table_id_by_name(table)
160+
.catalog.get_table_by_name(table)
161161
.ok_or_else(|| anyhow::Error::msg(format!("bind table {}", table)))?;
162+
let table_ref_id = table_catalog.id;
162163

163164
self.context.bind_table.insert(table.into(), (table_ref_id, joint_type));
164165

165-
(table_ref_id, ScanOperator::new(table_ref_id))
166+
(table_ref_id, ScanOperator::new(table_ref_id, &table_catalog))
166167
}
167168
_ => unimplemented!(),
168169
};
@@ -354,13 +355,7 @@ impl Binder {
354355

355356
for (table_id, join_option) in bind_tables.values() {
356357
if let Some(join_type) = join_option {
357-
let (left_force_nullable, right_force_nullable) = match join_type {
358-
JoinType::Inner => (false, false),
359-
JoinType::Left => (false, true),
360-
JoinType::Right => (true, false),
361-
JoinType::Full => (true, true),
362-
JoinType::Cross => (true, true),
363-
};
358+
let (left_force_nullable, right_force_nullable) = joins_nullable(join_type);
364359
table_force_nullable.insert(*table_id, right_force_nullable);
365360
left_table_force_nullable = left_force_nullable;
366361
} else {
@@ -498,36 +493,7 @@ impl Binder {
498493
#[cfg(test)]
499494
mod tests {
500495
use super::*;
501-
use crate::binder::BinderContext;
502-
use crate::catalog::{ColumnCatalog, ColumnDesc, RootCatalog};
503-
use crate::planner::LogicalPlan;
504-
use crate::types::LogicalType::Integer;
505-
506-
fn test_root_catalog() -> Result<RootCatalog> {
507-
let mut root = RootCatalog::new();
508-
509-
let cols_t1 = vec![
510-
ColumnCatalog::new("c1".to_string(), false, ColumnDesc::new(Integer, true)),
511-
ColumnCatalog::new("c2".to_string(), false, ColumnDesc::new(Integer, false)),
512-
];
513-
let _ = root.add_table("t1".to_string(), cols_t1)?;
514-
515-
let cols_t2 = vec![
516-
ColumnCatalog::new("c3".to_string(), false, ColumnDesc::new(Integer, true)),
517-
ColumnCatalog::new("c4".to_string(), false, ColumnDesc::new(Integer, false)),
518-
];
519-
let _ = root.add_table("t2".to_string(), cols_t2)?;
520-
Ok(root)
521-
}
522-
523-
fn select_sql_run(sql: &str) -> Result<LogicalPlan> {
524-
let root = test_root_catalog()?;
525-
526-
let binder = Binder::new(BinderContext::new(root));
527-
let stmt = crate::parser::parse_sql(sql).unwrap();
528-
529-
Ok(binder.bind(&stmt[0])?)
530-
}
496+
use crate::binder::test::select_sql_run;
531497

532498
#[test]
533499
fn test_select_bind() -> Result<()> {

src/catalog/column.rs

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ use sqlparser::ast::{ColumnDef, ColumnOption};
33

44
use crate::types::{ColumnId, IdGenerator, LogicalType, TableId};
55

6-
#[derive(Debug, Clone, PartialEq)]
6+
#[derive(Debug, Clone)]
77
pub struct ColumnCatalog {
88
pub id: ColumnId,
99
pub name: String,
@@ -12,6 +12,19 @@ pub struct ColumnCatalog {
1212
pub desc: ColumnDesc,
1313
}
1414

15+
// Tips: When there is a Join, the on condition in the Join has a nullable condition,
16+
// and the nullable in the Projection will change after being affected by the JoinType,
17+
// so that Eq does not necessarily match, so only use the ID as the matching criterion
18+
impl PartialEq<Self> for ColumnCatalog {
19+
fn eq(&self, other: &Self) -> bool {
20+
self.id == other.id
21+
}
22+
}
23+
24+
impl Eq for ColumnCatalog {
25+
26+
}
27+
1528
impl ColumnCatalog {
1629
pub(crate) fn new(column_name: String, nullable: bool, column_desc: ColumnDesc) -> ColumnCatalog {
1730
ColumnCatalog {

src/catalog/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ pub(crate) use self::root::*;
66
pub(crate) use self::table::*;
77

88
/// The type of catalog reference.
9-
pub type CatalogRef = Arc<RootCatalog>;
9+
pub type RootRef = Arc<RootCatalog>;
1010

1111
pub(crate) static DEFAULT_DATABASE_NAME: &str = "kipsql";
1212
pub(crate) static DEFAULT_SCHEMA_NAME: &str = "kipsql";

src/db.rs

Lines changed: 51 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,13 @@ use arrow::record_batch::RecordBatch;
44
use sqlparser::parser::ParserError;
55

66
use crate::binder::{BindError, Binder, BinderContext};
7-
use crate::execution_v1::physical_plan::physical_plan_builder::PhysicalPlanBuilder;
7+
use crate::execution_v1::physical_plan::physical_plan_mapping::PhysicalPlanMapping;
88
use crate::execution_v1::volcano_executor::VolcanoExecutor;
9+
use crate::optimizer::heuristic::batch::HepBatchStrategy;
10+
use crate::optimizer::heuristic::optimizer::HepOptimizer;
11+
use crate::optimizer::rule::RuleImpl;
912
use crate::parser::parse_sql;
13+
use crate::planner::LogicalPlan;
1014
use crate::storage::memory::InMemoryStorage;
1115
use crate::storage::{Storage, StorageError, StorageImpl};
1216

@@ -44,20 +48,60 @@ impl Database {
4448
/// Sort(a)
4549
/// Limit(1)
4650
/// Project(a,b)
47-
let logical_plan = binder.bind(&stmts[0])?;
48-
// println!("logic plan: {:#?}", logical_plan);
51+
let source_plan = binder.bind(&stmts[0])?;
52+
// println!("source_plan plan: {:#?}", source_plan);
4953

50-
let mut builder = PhysicalPlanBuilder::new();
51-
let operator = builder.build_plan(&logical_plan)?;
52-
// println!("operator: {:#?}", operator);
54+
let best_plan = Self::default_optimizer(source_plan)
55+
.find_best();
56+
// println!("best_plan plan: {:#?}", best_plan);
57+
58+
let physical_plan = PhysicalPlanMapping::build_plan(best_plan)?;
59+
// println!("physical_plan: {:#?}", physical_plan);
5360

5461
let storage = StorageImpl::InMemoryStorage(self.storage.clone());
5562
let executor = VolcanoExecutor::new(storage);
5663

57-
let mut stream = executor.build(operator);
64+
let mut stream = executor.build(physical_plan);
5865

5966
Ok(VolcanoExecutor::try_collect(&mut stream).await?)
6067
}
68+
69+
fn default_optimizer(source_plan: LogicalPlan) -> HepOptimizer {
70+
HepOptimizer::new(source_plan)
71+
.batch(
72+
"Predicate pushdown".to_string(),
73+
HepBatchStrategy::fix_point_topdown(10),
74+
vec![
75+
RuleImpl::PushPredicateThroughJoin
76+
]
77+
)
78+
.batch(
79+
"Limit pushdown".to_string(),
80+
HepBatchStrategy::fix_point_topdown(10),
81+
vec![
82+
RuleImpl::LimitProjectTranspose,
83+
RuleImpl::PushLimitThroughJoin,
84+
RuleImpl::PushLimitIntoTableScan,
85+
RuleImpl::EliminateLimits,
86+
],
87+
)
88+
.batch(
89+
"Column pruning".to_string(),
90+
HepBatchStrategy::fix_point_topdown(10),
91+
vec![
92+
RuleImpl::PushProjectThroughChild,
93+
RuleImpl::PushProjectIntoScan
94+
]
95+
)
96+
.batch(
97+
"Combine operators".to_string(),
98+
HepBatchStrategy::fix_point_topdown(10),
99+
vec![
100+
RuleImpl::CollapseProject,
101+
RuleImpl::CombineFilter
102+
]
103+
)
104+
}
61105
}
62106

63107
#[derive(thiserror::Error, Debug)]

src/execution_v1/physical_plan/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ use crate::execution_v1::physical_plan::physical_table_scan::PhysicalTableScan;
99
use crate::execution_v1::physical_plan::physical_values::PhysicalValues;
1010

1111
pub(crate) mod physical_create_table;
12-
pub(crate) mod physical_plan_builder;
12+
pub(crate) mod physical_plan_mapping;
1313
pub(crate) mod physical_projection;
1414
pub(crate) mod physical_table_scan;
1515
pub(crate) mod physical_insert;

0 commit comments

Comments
 (0)