Skip to content

Commit 50332a0

Browse files
authored
feat(volcano_executor): implement a simple Volcano Executor (#26)
1 parent 52b897c commit 50332a0

29 files changed

Lines changed: 633 additions & 197 deletions

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,8 +41,8 @@ async-backtrace = "0.2.6"
4141
futures = "0.3.25"
4242
futures-lite = "1.12.0"
4343

44-
4544
[dev-dependencies]
45+
tokio-test = "0.4.2"
4646
ctor = "0.2.0"
4747
env_logger = "0.10"
4848
paste = "^1.0"

rust-toolchain

Lines changed: 0 additions & 1 deletion
This file was deleted.

src/binder/create.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ impl Binder {
3939
table_name: table_name.to_string(),
4040
columns: columns
4141
.into_iter()
42-
.map(|col| (col.name.to_string(), col.desc.clone()))
42+
.map(|col| (col.name.to_string(), col.nullable, col.desc.clone()))
4343
.collect(),
4444
};
4545
Ok(plan)
@@ -68,10 +68,12 @@ mod tests {
6868
columns: vec![
6969
(
7070
"id".to_string(),
71+
false,
7172
ColumnDesc::new(LogicalType::Integer, false),
7273
),
7374
(
7475
"name".to_string(),
76+
false,
7577
ColumnDesc::new(LogicalType::Varchar, false),
7678
),
7779
],

src/binder/mod.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ use std::collections::HashMap;
88
use anyhow::Result;
99
use sqlparser::ast::{Ident, ObjectName, Statement};
1010

11-
use crate::catalog::{RootCatalog, DEFAULT_SCHEMA_NAME};
11+
use crate::catalog::{RootCatalog, DEFAULT_SCHEMA_NAME, CatalogError};
1212
use crate::expression::ScalarExpression;
1313
use crate::planner::LogicalPlan;
1414
use crate::types::TableId;
@@ -109,4 +109,6 @@ pub enum BindError {
109109
BinaryOpTypeMismatch(String, String),
110110
#[error("subquery in FROM must have an alias")]
111111
SubqueryMustHaveAlias,
112+
#[error("catalog error")]
113+
CatalogError(#[from] CatalogError),
112114
}

src/binder/select.rs

Lines changed: 37 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -208,11 +208,7 @@ impl Binder {
208208
// *col_id,
209209
// col.desc().clone(),
210210
// );
211-
let expr = ScalarExpression::ColumnRef {
212-
column_ref_id,
213-
primary_key: col.desc.is_primary(),
214-
desc: col.desc.clone(),
215-
};
211+
let expr = ScalarExpression::ColumnRef((*col).clone());
216212
exprs.push(expr);
217213
}
218214
}
@@ -317,3 +313,39 @@ impl Binder {
317313
Ok(LimitOperator::new(offset, limit, children))
318314
}
319315
}
316+
317+
#[cfg(test)]
318+
mod tests {
319+
use sqlparser::ast::CharacterLength;
320+
321+
use super::*;
322+
use crate::binder::{BinderContext, BindError};
323+
use crate::catalog::{ColumnCatalog, ColumnDesc, RootCatalog};
324+
use crate::planner::LogicalPlan;
325+
use crate::types::LogicalType;
326+
use crate::types::LogicalType::{Boolean, Integer};
327+
328+
fn test_root_catalog() -> Result<RootCatalog, BindError> {
329+
let mut root = RootCatalog::new();
330+
let cols = vec![
331+
ColumnCatalog::new("c1".to_string(), false, ColumnDesc::new(Integer, true)),
332+
ColumnCatalog::new("c2".to_string(), false, ColumnDesc::new(Boolean, false)),
333+
];
334+
let _ = root.add_table("t1".to_string(), cols)?;
335+
Ok(root)
336+
}
337+
338+
#[test]
339+
fn test_select_bind() -> Result<(), BindError> {
340+
let sql = "select * from t1";
341+
let root = test_root_catalog()?;
342+
343+
let binder = Binder::new(BinderContext::new(root));
344+
let stmt = crate::parser::parse_sql(sql).unwrap();
345+
let plan = binder.bind(&stmt[0]).unwrap();
346+
347+
println!("{:#?}", plan);
348+
349+
Ok(())
350+
}
351+
}

src/catalog/column.rs

Lines changed: 24 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,22 @@
11
use arrow::datatypes::{DataType, Field};
2-
use sqlparser::ast::ColumnDef;
2+
use sqlparser::ast::{ColumnDef, ColumnOption};
33

44
use crate::types::{ColumnId, IdGenerator, LogicalType};
55

6-
#[derive(Debug, Clone)]
6+
#[derive(Debug, Clone, PartialEq)]
77
pub struct ColumnCatalog {
88
pub id: ColumnId,
99
pub name: String,
10+
pub nullable: bool,
1011
pub desc: ColumnDesc,
1112
}
1213

1314
impl ColumnCatalog {
14-
pub(crate) fn new(column_name: String, column_desc: ColumnDesc) -> ColumnCatalog {
15+
pub(crate) fn new(column_name: String, nullable: bool, column_desc: ColumnDesc) -> ColumnCatalog {
1516
ColumnCatalog {
1617
id: IdGenerator::build(),
1718
name: column_name,
19+
nullable,
1820
desc: column_desc,
1921
}
2022
}
@@ -33,20 +35,32 @@ impl ColumnCatalog {
3335

3436
pub fn to_field(&self) -> Field {
3537
Field::new(
36-
&*self.name.clone(),
37-
DataType::from(self.desc.column_datatype.clone()),
38-
self.desc.is_primary(),
38+
self.name.as_str(),
39+
DataType::from(self.datatype().clone()),
40+
self.nullable,
3941
)
4042
}
4143
}
4244

4345
impl From<ColumnDef> for ColumnCatalog {
4446
fn from(column_def: ColumnDef) -> Self {
4547
let column_name = column_def.name.to_string();
46-
let column_datatype = LogicalType::try_from(column_def.data_type).unwrap();
47-
let is_primary = false;
48-
let column_desc = ColumnDesc::new(column_datatype, is_primary);
49-
ColumnCatalog::new(column_name, column_desc)
48+
let column_desc = ColumnDesc::new(
49+
LogicalType::try_from(column_def.data_type).unwrap(),
50+
false
51+
);
52+
let mut nullable = false;
53+
54+
// TODO: 这里可以对更多字段可设置内容进行补充
55+
for option_def in column_def.options {
56+
match option_def.option {
57+
ColumnOption::Null => nullable = true,
58+
ColumnOption::NotNull => (),
59+
_ => todo!()
60+
}
61+
}
62+
63+
ColumnCatalog::new(column_name, nullable, column_desc)
5064
}
5165
}
5266

src/catalog/root.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,10 +67,12 @@ mod tests {
6767

6868
let col0 = ColumnCatalog::new(
6969
"a".to_string(),
70+
false,
7071
ColumnDesc::new(LogicalType::Integer, false),
7172
);
7273
let col1 = ColumnCatalog::new(
7374
"b".to_string(),
75+
false,
7476
ColumnDesc::new(LogicalType::Boolean, false),
7577
);
7678
let col_catalogs = vec![col0, col1];

src/catalog/table.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -81,8 +81,8 @@ mod tests {
8181
// | 1 | true |
8282
// | 2 | false |
8383
fn test_table_catalog() {
84-
let col0 = ColumnCatalog::new("a".into(), ColumnDesc::new(LogicalType::Integer, false));
85-
let col1 = ColumnCatalog::new("b".into(), ColumnDesc::new(LogicalType::Boolean, false));
84+
let col0 = ColumnCatalog::new("a".into(), false, ColumnDesc::new(LogicalType::Integer, false));
85+
let col1 = ColumnCatalog::new("b".into(), false, ColumnDesc::new(LogicalType::Boolean, false));
8686
let col_catalogs = vec![col0, col1];
8787
let table_catalog = TableCatalog::new("test".to_string(), col_catalogs).unwrap();
8888

src/db.rs

Lines changed: 87 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,12 @@ use sqlparser::parser::ParserError;
88

99
use crate::binder::{BindError, Binder, BinderContext};
1010
use crate::catalog::ColumnCatalog;
11+
use crate::execution_v1::physical_plan::physical_plan_builder::PhysicalPlanBuilder;
12+
use crate::execution_v1::volcano_executor::VolcanoExecutor;
1113
use crate::parser::parse_sql;
1214
use crate::planner::LogicalPlan;
1315
use crate::storage::memory::InMemoryStorage;
14-
use crate::storage::{Storage, StorageError};
16+
use crate::storage::{Storage, StorageError, StorageImpl};
1517
use crate::types::IdGenerator;
1618

1719
#[derive(Debug)]
@@ -33,7 +35,7 @@ impl Database {
3335
}
3436

3537
/// Run SQL queries.
36-
pub fn run(&mut self, sql: &str) -> Result<()> {
38+
pub async fn run(&self, sql: &str) -> Result<Vec<RecordBatch>> {
3739
// parse
3840
let stmts = parse_sql(sql)?;
3941
// bind
@@ -49,36 +51,44 @@ impl Database {
4951
/// Limit(1)
5052
/// Project(a,b)
5153
let logical_plan = binder.bind(&stmts[0])?;
52-
println!("logic plan {:?}", logical_plan);
53-
54-
// let physical_planner = PhysicalPlaner::default();
55-
// let executor_builder = ExecutorBuilder::new(self.env.clone());
56-
57-
// let physical_plan = physical_planner.plan(logical_plan)?;
58-
// let executor = executor_builder.build(physical_plan)?;
59-
// futures::executor::block_on(executor).unwrap();
60-
61-
/// THE FOLLOWING CODE IS FOR TESTING ONLY
62-
/// THE FINAL CODE WILL BE IN executor MODULE
63-
if let LogicalPlan::CreateTable(plan) = logical_plan {
64-
let mut columns = Vec::new();
65-
plan.columns.iter().for_each(|c| {
66-
columns.push(ColumnCatalog::new(c.0.clone(), c.1.clone()));
67-
});
68-
let table_name = plan.table_name.clone();
69-
// columns->batch record
70-
let mut data = Vec::new();
71-
72-
columns.iter().for_each(|c| {
73-
let batch = RecordBatch::new_empty(Arc::new(Schema::new(vec![c.to_field()])));
74-
data.push(batch);
75-
});
76-
77-
self.storage
78-
.create_table(IdGenerator::build(), table_name.as_str(), data)?;
79-
}
80-
81-
Ok(())
54+
println!("logic plan {:#?}", logical_plan);
55+
56+
let mut builder = PhysicalPlanBuilder::new();
57+
let operator = builder.build_plan(&logical_plan)?;
58+
59+
let storage = StorageImpl::InMemoryStorage(self.storage.clone());
60+
let executor = VolcanoExecutor::new(storage);
61+
62+
let mut stream = executor.build(operator);
63+
64+
Ok(VolcanoExecutor::try_collect(&mut stream).await?)
65+
66+
// // let physical_planner = PhysicalPlaner::default();
67+
// // let executor_builder = ExecutorBuilder::new(self.env.clone());
68+
//
69+
// // let physical_plan = physical_planner.plan(logical_plan)?;
70+
// // let executor = executor_builder.build(physical_plan)?;
71+
// // futures::executor::block_on(executor).unwrap();
72+
//
73+
// /// THE FOLLOWING CODE IS FOR TESTING ONLY
74+
// /// THE FINAL CODE WILL BE IN executor MODULE
75+
// if let LogicalPlan::CreateTable(plan) = logical_plan {
76+
// let mut columns = Vec::new();
77+
// plan.columns.iter().for_each(|c| {
78+
// columns.push(ColumnCatalog::new(c.0.clone(), c.1, c.2.clone()));
79+
// });
80+
// let table_name = plan.table_name.clone();
81+
// // columns->batch record
82+
// let mut data = Vec::new();
83+
//
84+
// columns.iter().for_each(|c| {
85+
// let batch = RecordBatch::new_empty(Arc::new(Schema::new(vec![c.to_field()])));
86+
// data.push(batch);
87+
// });
88+
//
89+
// self.storage
90+
// .create_table(IdGenerator::build(), table_name.as_str(), data)?;
91+
// }
8292
}
8393
}
8494

@@ -113,3 +123,48 @@ pub enum DatabaseError {
113123
#[error("Internal error: {0}")]
114124
InternalError(String),
115125
}
126+
127+
#[cfg(test)]
128+
mod test {
129+
use std::sync::Arc;
130+
use arrow::array::Int32Array;
131+
use arrow::datatypes::Schema;
132+
use arrow::record_batch::RecordBatch;
133+
use itertools::Itertools;
134+
use crate::catalog::{ColumnCatalog, ColumnDesc};
135+
use crate::db::Database;
136+
use crate::execution_v1::ExecutorError;
137+
use crate::storage::{Storage, StorageError};
138+
use crate::storage::memory::InMemoryStorage;
139+
use crate::types::{IdGenerator, LogicalType, TableId};
140+
141+
fn build_table(storage: &impl Storage) -> Result<TableId, StorageError> {
142+
let fields = vec![
143+
ColumnCatalog::new(
144+
"c1".to_string(),
145+
false,
146+
ColumnDesc::new(LogicalType::Integer, true)
147+
).to_field(),
148+
];
149+
let batch = RecordBatch::try_new(
150+
Arc::new(Schema::new(fields)),
151+
vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5]))]
152+
).unwrap();
153+
154+
Ok(storage.create_table("t1", vec![batch])?)
155+
}
156+
157+
#[test]
158+
fn test_run_sql() -> anyhow::Result<()> {
159+
let mut database = Database::new_on_mem();
160+
161+
let i = build_table(&database.storage)?;
162+
163+
tokio_test::block_on(async move {
164+
let batch = database.run("select * from t1").await?;
165+
println!("{:#?}", batch);
166+
167+
Ok(())
168+
})
169+
}
170+
}

src/execution/executor_graph.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,10 +10,8 @@ use super::{
1010
executor::ExecutionQueue,
1111
parallel::{
1212
meta_pipeline::MetaPipeline,
13-
pipeline::Pipeline,
1413
pipeline_event::{PipelineEvent, PipelineEventStack},
1514
},
16-
physical::PhysicalOperator,
1715
};
1816
use anyhow::Result;
1917

@@ -150,7 +148,7 @@ impl ExecutorGraph {
150148

151149
// Set up the dependencies within this `MetaPipeline`.
152150
for pipeline in pipelines.iter() {
153-
if let Some(source) = pipeline.get_source() {
151+
if let Some(_source) = pipeline.get_source() {
154152
// if (source->type ==
155153
// PhysicalOperatorType::TABLE_SCAN) { //
156154
// we have to reset the source here (in the main thread),
@@ -213,6 +211,7 @@ impl ExecutorGraph {
213211
self.graph[index].clone()
214212
}
215213

214+
#[allow(dead_code)]
216215
fn get_prev_nodes(&self, index: NodeIndex) -> Vec<Arc<PipelineEvent>> {
217216
let mut prev_nodes = vec![];
218217
for edge in self.graph.edges_directed(index, Direction::Incoming) {

0 commit comments

Comments (0)