Skip to content

Commit d2ed315

Browse files
eliasyaoycloloxwg
authored and committed
feat: executor graph.
1 parent c553a6b commit d2ed315

33 files changed

Lines changed: 1592 additions & 167 deletions

Cargo.toml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,12 @@ serde = { version = "1", features = ["derive", "rc"] }
3131
serde_json = "1"
3232
async-trait = "0.1.68"
3333
integer-encoding = "3.0.4"
34+
petgraph = "0.6.3"
35+
futures-async-stream = "0.2.6"
36+
async-channel = "1.8.0"
37+
async-backtrace = "0.2.6"
38+
futures = "0.3.25"
39+
futures-lite = "1.12.0"
3440

3541
[dev-dependencies]
3642
ctor = "0.2.0"

src/executor/execute_builder.rs

Lines changed: 0 additions & 1 deletion
This file was deleted.

src/executor/execute_runtime.rs

Lines changed: 0 additions & 108 deletions
This file was deleted.

src/executor/execute_task.rs

Lines changed: 0 additions & 1 deletion
This file was deleted.

src/executor/executor_graph.rs

Lines changed: 263 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,263 @@
1+
use std::{collections::HashMap, sync::Arc};
2+
3+
use petgraph::{
4+
stable_graph::{NodeIndex, StableGraph},
5+
visit::EdgeRef,
6+
Direction,
7+
};
8+
9+
use super::{
10+
parallel::{
11+
meta_pipeline::MetaPipeline,
12+
pipeline::Pipeline,
13+
pipeline_event::{PipelineEvent, PipelineEventStack},
14+
},
15+
physical::PhysicalOperator,
16+
};
17+
use anyhow::Result;
18+
19+
/// Execution state of a pipeline node.
/// Only `Idle` is constructed in this file (see `PipelineNode::create`);
/// the other variants are presumably set by the scheduler once task
/// execution is implemented — TODO confirm.
enum State {
    Idle,
    Processing,
    Finished,
}
24+
25+
/// A pipeline paired with its mutable execution state, intended to be
/// shared across threads via `Arc` (see `PipelineNode::create`).
pub struct PipelineNode {
    // Mutex-guarded so concurrent schedulers can observe/update the state.
    state: std::sync::Mutex<State>,
    // Owned clone of the pipeline this node executes.
    pipeline: Pipeline,
}
29+
30+
impl PipelineNode {
31+
pub fn create(pipeline: &Pipeline) -> Arc<PipelineNode> {
32+
Arc::new(PipelineNode {
33+
state: std::sync::Mutex::new(State::Idle),
34+
pipeline: pipeline.clone(),
35+
})
36+
}
37+
}
38+
39+
// NOTE(review): this alias is never used in this file and aliases the whole
// executor graph rather than any lock guard — confirm it is still needed.
type StateLockGuard = ExecutorGraph;

/// DAG of pipeline events built from a `MetaPipeline`.
pub struct ExecutorGraph {
    // Nodes are shared pipeline events; the unit edge weight `()` encodes a
    // plain "must run before" dependency.
    graph: StableGraph<Arc<PipelineEvent>, ()>,
}
44+
45+
impl ExecutorGraph {
46+
pub fn from_pipeline(mut meta_pipeline: MetaPipeline) -> Result<ExecutorGraph> {
47+
let mut graph: StableGraph<Arc<PipelineEvent>, ()> = StableGraph::new();
48+
Self::init_graph(&mut meta_pipeline, &mut graph)?;
49+
Ok(ExecutorGraph { graph })
50+
}
51+
52+
fn init_graph(
53+
meta_pipeline: &mut MetaPipeline,
54+
graph: &mut StableGraph<Arc<PipelineEvent>, ()>,
55+
) -> Result<()> {
56+
// Get the remaining pipelines.
57+
let pipelines = meta_pipeline.get_pipelines(false)?;
58+
59+
let to_schedue = meta_pipeline.get_meta_pipelines(true, true)?;
60+
61+
// Value -> initialize_event_id, running_event_id, finish_event_id,
62+
// complete_event_id.
63+
let mut pipeline_graph_mapping: HashMap<usize, PipelineEventStack> = HashMap::new();
64+
65+
for meta_pipe in to_schedue {
66+
// let mut pipes_edges: Vec<Vec<Edge>> = Vec::new();
67+
let base_pipeline = Arc::new(meta_pipe.get_base_pipeline()?);
68+
let base_initialize_event = Arc::new(PipelineEvent::create_initialize_event(
69+
base_pipeline.clone(),
70+
));
71+
let base_running_event =
72+
Arc::new(PipelineEvent::create_running_event(base_pipeline.clone()));
73+
let base_finish_event =
74+
Arc::new(PipelineEvent::create_finish_event(base_pipeline.clone()));
75+
let base_complete_event = Arc::new(PipelineEvent::create_complete_event(
76+
true,
77+
base_pipeline.clone(),
78+
));
79+
80+
let base_init_event_node_id = graph.add_node(base_initialize_event);
81+
let base_running_event_node_id = graph.add_node(base_running_event);
82+
let base_finish_event_node_id = graph.add_node(base_finish_event);
83+
let base_complete_event_node_id = graph.add_node(base_complete_event);
84+
85+
// Add base stack.
86+
pipeline_graph_mapping.insert(
87+
base_pipeline.get_pipeline_id(),
88+
PipelineEventStack {
89+
pipeline_initialize_event: base_init_event_node_id,
90+
pipeline_event: base_running_event_node_id,
91+
pipeline_finish_event: base_finish_event_node_id,
92+
pipeline_complete_event: base_complete_event_node_id,
93+
},
94+
);
95+
96+
// Dependencies: initialize -> running -> finish -> complete.
97+
graph.add_edge(base_init_event_node_id, base_running_event_node_id, ());
98+
graph.add_edge(base_running_event_node_id, base_finish_event_node_id, ());
99+
graph.add_edge(base_finish_event_node_id, base_complete_event_node_id, ());
100+
101+
let pipelines = meta_pipe.get_pipelines(false)?;
102+
for idx in 1..pipelines.len() {
103+
let pipeline = &pipelines[idx];
104+
105+
let pipeline_running_event = Arc::new(PipelineEvent::create_running_event(
106+
Arc::from(pipeline.clone()),
107+
));
108+
109+
let running_event_node_id = graph.add_node(pipeline_running_event);
110+
111+
match meta_pipe.get_finish_group(pipeline.get_pipeline_id()) {
112+
Some(finish_group) => {
113+
let mapping_stack = pipeline_graph_mapping.get(finish_group).unwrap();
114+
let stack = PipelineEventStack {
115+
pipeline_initialize_event: base_init_event_node_id,
116+
pipeline_event: base_running_event_node_id,
117+
pipeline_finish_event: mapping_stack.pipeline_finish_event,
118+
pipeline_complete_event: base_complete_event_node_id,
119+
};
120+
121+
// Dependencies: base_finish -> pipeline_event -> group_finish
122+
graph.add_edge(base_running_event_node_id, running_event_node_id, ());
123+
graph.add_edge(
124+
running_event_node_id,
125+
mapping_stack.pipeline_finish_event,
126+
(),
127+
);
128+
129+
pipeline_graph_mapping.insert(pipeline.get_pipeline_id(), stack);
130+
}
131+
None => match meta_pipe.has_finish_event(pipeline.get_pipeline_id()) {
132+
true => {
133+
// Dependencies: base_finish -> pipeline_event ->
134+
// pipeline_finish -> base_complete
135+
let pipeline_finish_event = Arc::new(
136+
PipelineEvent::create_finish_event(Arc::from(pipeline.clone())),
137+
);
138+
139+
let finish_event_node_id = graph.add_node(pipeline_finish_event);
140+
141+
graph.add_edge(base_finish_event_node_id, running_event_node_id, ());
142+
graph.add_edge(running_event_node_id, finish_event_node_id, ());
143+
144+
pipeline_graph_mapping.insert(
145+
pipeline.get_pipeline_id(),
146+
PipelineEventStack {
147+
pipeline_initialize_event: base_init_event_node_id,
148+
pipeline_event: running_event_node_id,
149+
pipeline_finish_event: finish_event_node_id,
150+
pipeline_complete_event: base_complete_event_node_id,
151+
},
152+
);
153+
}
154+
false => {
155+
// Dependencies: base_initialize -> pipeline_event -> base_finish.
156+
graph.add_edge(base_init_event_node_id, running_event_node_id, ());
157+
158+
pipeline_graph_mapping.insert(
159+
pipeline.get_pipeline_id(),
160+
PipelineEventStack {
161+
pipeline_initialize_event: base_init_event_node_id,
162+
pipeline_event: running_event_node_id,
163+
pipeline_finish_event: base_finish_event_node_id,
164+
pipeline_complete_event: base_complete_event_node_id,
165+
},
166+
);
167+
}
168+
},
169+
}
170+
}
171+
172+
// Set up the dependencies within this `MetaPipeline`.
173+
for pipeline in pipelines.iter() {
174+
if let Some(source) = pipeline.get_source() {
175+
// if (source->type ==
176+
// PhysicalOperatorType::TABLE_SCAN) { //
177+
// we have to reset the source here (in the main thread),
178+
// because some of our clients (looking at you, R)
179+
// // do not like it when threads other than the main thread
180+
// call into R, for e.g., arrow scans
181+
// pipeline->ResetSource(true);
182+
// }
183+
}
184+
185+
match meta_pipe.find_dependencies(pipeline.get_pipeline_id())? {
186+
Some(dep_pipes) => dep_pipes.iter().for_each(|dep_pipe| {
187+
if let Some(dependency_stack) = pipeline_graph_mapping.get(dep_pipe) {
188+
let stack = pipeline_graph_mapping
189+
.get(&pipeline.get_pipeline_id())
190+
.unwrap();
191+
graph.add_edge(
192+
dependency_stack.pipeline_event,
193+
stack.pipeline_event,
194+
(),
195+
);
196+
}
197+
}),
198+
None => continue,
199+
}
200+
}
201+
}
202+
203+
// // Set up the dependencies across `MetaPipeline`.
204+
// for pipe in all_pipelines.iter() {
205+
// for pipe_ix in pipe.get_dependencies().iter() {
206+
// let from = *pipeline_graph_mapping.get(pipe_ix).unwrap();
207+
// let to =
208+
// *pipeline_graph_mapping.get(&pipe.get_pipeline_id()).unwrap();
209+
// if !graph.contains_edge(from, to) {
210+
// graph.add_edge(from, to, ());
211+
// }
212+
// }
213+
// }
214+
Ok(())
215+
}
216+
217+
pub fn init_schedule_queue(&self, threads_num: usize) -> Result<()> {
218+
// Schedule source node.
219+
for node_idx in self.graph.externals(Direction::Outgoing) {
220+
let node = self.get_node_by_index(node_idx);
221+
}
222+
Ok(())
223+
}
224+
225+
fn get_node_by_index(&self, index: NodeIndex) -> Arc<PipelineEvent> {
226+
self.graph[index].clone()
227+
}
228+
229+
fn get_prev_nodes(&self, index: NodeIndex) -> Vec<Arc<PipelineEvent>> {
230+
let mut prev_nodes = vec![];
231+
for edge in self.graph.edges_directed(index, Direction::Incoming) {
232+
prev_nodes.push(self.graph[edge.source()].clone());
233+
}
234+
prev_nodes
235+
}
236+
237+
fn get_next_nodes(&self, index: NodeIndex) -> Vec<Arc<PipelineEvent>> {
238+
let mut next_nodes = vec![];
239+
for edge in self.graph.edges_directed(index, Direction::Outgoing) {
240+
next_nodes.push(self.graph[edge.target()].clone());
241+
}
242+
next_nodes
243+
}
244+
245+
fn get_all_nodes(&self) -> Vec<Arc<PipelineEvent>> {
246+
self.graph.node_weights().cloned().collect()
247+
}
248+
249+
fn get_last_node(&self) -> Arc<PipelineEvent> {
250+
let mut last_node = None;
251+
for node in self.graph.node_indices() {
252+
if self
253+
.graph
254+
.edges_directed(node, petgraph::Direction::Outgoing)
255+
.count()
256+
== 0
257+
{
258+
last_node = Some(self.graph[node].clone());
259+
}
260+
}
261+
last_node.unwrap()
262+
}
263+
}

src/executor/executor_task.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
2+
pub trait Task {
3+
4+
}

0 commit comments

Comments
 (0)