Skip to content

Commit f860cad

Browse files
eliasyaoycloloxwg
authored andcommitted
refactor: pipeline event.
1 parent e8271a3 commit f860cad

36 files changed

Lines changed: 601 additions & 393 deletions

src/execution/executor.rs

Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
use std::{collections::VecDeque, sync::Arc};
2+
3+
use anyhow::Result;
4+
5+
use super::executor_graph::ExecutorGraph;
6+
use super::executor_task::ExecutionTask;
7+
use super::parallel::pipeline_builder::PipelineBuilder;
8+
use super::physical::PhysicalOperatorRef;
9+
use super::runtime::thread::Thread;
10+
use super::runtime::thread::ThreadJoinHandle;
11+
use super::runtime::ExecutionRuntime;
12+
use super::runtime::TrySpawn;
13+
14+
pub struct ExecutionQueue(pub VecDeque<ExecutionTask>);
15+
16+
impl ExecutionQueue {
17+
pub fn push_task(&mut self, task: ExecutionTask) {
18+
self.0.push_back(task);
19+
}
20+
}
21+
22+
pub struct ExecutionContext {
23+
pub global_execution_queue: ExecutionQueue,
24+
}
25+
26+
impl ExecutionContext {
27+
pub fn get_global_execution_queue(&mut self) -> &mut ExecutionQueue {
28+
&mut self.global_execution_queue
29+
}
30+
}
31+
32+
pub struct Executor {
33+
pub context: ExecutionContext,
34+
pub graph: ExecutorGraph,
35+
pub threads_num: usize,
36+
pub runtime: Arc<ExecutionRuntime>,
37+
}
38+
39+
impl Executor {
40+
pub async fn initialize(plan: PhysicalOperatorRef) -> Result<Executor> {
41+
let mut builder = PipelineBuilder::new();
42+
let meta_pipeline = builder.finalize(plan)?;
43+
44+
let graph = ExecutorGraph::from_pipeline(meta_pipeline)?;
45+
46+
Ok(Executor {
47+
context: ExecutionContext {
48+
global_execution_queue: ExecutionQueue(VecDeque::new()),
49+
},
50+
graph,
51+
threads_num: 0,
52+
runtime: Arc::new(ExecutionRuntime::with_default_worker_threads()?),
53+
})
54+
}
55+
56+
pub async fn execute(&mut self) -> Result<()> {
57+
// Schedule the pipelines that do not have dependencies.
58+
self.graph
59+
.init_schedule_event_queue(&mut self.context.global_execution_queue)
60+
.await;
61+
62+
// Pull execution task from `GlobalExecutionQueue`.
63+
self.runtime.spawn(ExecutionTask::empty());
64+
Ok(())
65+
}
66+
67+
fn execute_threads(self: &Arc<Self>) -> Vec<ThreadJoinHandle<Result<()>>> {
68+
let mut thread_join_handles = Vec::with_capacity(self.threads_num);
69+
for thread_num in 0..self.threads_num {
70+
let this = self.clone();
71+
#[allow(unused_mut)]
72+
let mut name = Some(format!("PipelineTask-{}", thread_num));
73+
74+
thread_join_handles.push(Thread::named_spawn(name, move || unsafe {
75+
// let this_clone = this.clone();
76+
77+
// let try_result = catch_unwind(move || -> Result<()> {
78+
// match this_clone.execute_single_thread(thread_num) {
79+
// Ok(_) => Ok(()),
80+
// Err(cause) => Err(cause),
81+
// }
82+
// });
83+
84+
// // finish the pipeline executor when has error or panic
85+
// if let Err(cause) = try_result.flatten() {
86+
// // this.finish(Some(cause));
87+
// }
88+
89+
Ok(())
90+
}));
91+
}
92+
thread_join_handles
93+
}
94+
95+
unsafe fn execute_single_thread(&self, thread_num: usize) -> Result<()> {
96+
Ok(())
97+
}
98+
99+
fn complete_pipeline(&self) {}
100+
101+
fn finish_task(&self) -> Result<()> {
102+
Ok(())
103+
}
104+
}
Lines changed: 6 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ use petgraph::{
77
};
88

99
use super::{
10+
executor::ExecutionQueue,
1011
parallel::{
1112
meta_pipeline::MetaPipeline,
1213
pipeline::Pipeline,
@@ -16,28 +17,6 @@ use super::{
1617
};
1718
use anyhow::Result;
1819

19-
enum State {
20-
Idle,
21-
Processing,
22-
Finished,
23-
}
24-
25-
pub struct PipelineNode {
26-
state: std::sync::Mutex<State>,
27-
pipeline: Pipeline,
28-
}
29-
30-
impl PipelineNode {
31-
pub fn create(pipeline: &Pipeline) -> Arc<PipelineNode> {
32-
Arc::new(PipelineNode {
33-
state: std::sync::Mutex::new(State::Idle),
34-
pipeline: pipeline.clone(),
35-
})
36-
}
37-
}
38-
39-
type StateLockGuard = ExecutorGraph;
40-
4120
pub struct ExecutorGraph {
4221
graph: StableGraph<Arc<PipelineEvent>, ()>,
4322
}
@@ -219,12 +198,15 @@ impl ExecutorGraph {
219198
Ok(())
220199
}
221200

222-
pub fn init_schedule_queue(&self, threads_num: usize) -> Result<()> {
201+
pub async fn init_schedule_event_queue(&self, global_execution_queue: &mut ExecutionQueue) {
223202
// Schedule source node.
224203
for node_idx in self.graph.externals(Direction::Outgoing) {
225204
let node = self.get_node_by_index(node_idx);
205+
// todo: fix deref pointer.
206+
let pipeline_event =
207+
unsafe { &mut *(node.as_ref() as *const PipelineEvent as *mut PipelineEvent) };
208+
pipeline_event.schedule(global_execution_queue).await;
226209
}
227-
Ok(())
228210
}
229211

230212
fn get_node_by_index(&self, index: NodeIndex) -> Arc<PipelineEvent> {

src/execution/executor_task.rs

Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
use std::{
2+
pin::Pin,
3+
sync::Arc,
4+
task::{Context, Poll},
5+
};
6+
7+
use anyhow::anyhow;
8+
use anyhow::Result;
9+
use futures::{future::BoxFuture, Future};
10+
11+
pub enum TaskExecutionMode {
12+
Complete,
13+
Partial,
14+
}
15+
16+
pub enum TaskExecutionResult {
17+
Finished,
18+
NotFinished,
19+
Error,
20+
Blocked,
21+
}
22+
23+
#[async_trait::async_trait]
24+
pub trait Task {
25+
/// The name of task.
26+
fn name(&self) -> String;
27+
28+
/// Execute the task in the specified execution mode
29+
/// * If mode is Complete, Execute should always finish processing and
30+
/// return Finished
31+
/// * If mode is Partial, Execute can return not_finished, in which case
32+
/// Execute will be called again
33+
/// * In case of an error, error is returned
34+
/// * In case the task has interrupted, blocked is returned.
35+
async fn execute(&mut self, mode: TaskExecutionMode) -> Result<TaskExecutionResult>;
36+
}
37+
38+
pub struct ExecutionTask {
39+
inner: BoxFuture<'static, Result<()>>,
40+
}
41+
42+
impl ExecutionTask {
43+
// pub fn create<Inner>(inner: Inner) -> ExecutionTask
44+
// where
45+
// Inner: Future<Output = Result<()>> + Send + 'static,
46+
// {
47+
// ExecutionTask {
48+
// inner: inner.boxed(),
49+
// }
50+
// }
51+
52+
pub fn create(inner: Arc<dyn Task>) -> ExecutionTask {
53+
todo!()
54+
}
55+
56+
pub fn empty() -> ExecutionTask {
57+
todo!()
58+
}
59+
}
60+
61+
impl Future for ExecutionTask {
62+
type Output = ();
63+
64+
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
65+
let inner = self.inner.as_mut();
66+
let try_result =
67+
std::panic::catch_unwind(std::panic::AssertUnwindSafe(move || -> Poll<Result<()>> {
68+
inner.poll(cx)
69+
}));
70+
71+
match try_result {
72+
Ok(Poll::Pending) => Poll::Pending,
73+
Ok(Poll::Ready(_res)) => {
74+
// self.queue.completed_async_task(
75+
// self.workers_condvar.clone(),
76+
// CompletedAsyncTask::create(self.processor_id, self.worker_id, res),
77+
// );
78+
Poll::Ready(())
79+
}
80+
Err(cause) => {
81+
let _res: Result<()> = match cause.downcast_ref::<&'static str>() {
82+
Some(msg) => Err(anyhow!(msg.to_string())),
83+
None => match cause.downcast_ref::<String>() {
84+
Some(msg) => Err(anyhow!(msg.to_string())),
85+
None => Err(anyhow!("unknown panic message".to_string())),
86+
},
87+
};
88+
89+
// self.queue.completed_async_task(
90+
// self.workers_condvar.clone(),
91+
// CompletedAsyncTask::create(self.processor_id, self.worker_id, res),
92+
// );
93+
Poll::Ready(())
94+
}
95+
}
96+
}
97+
}

src/execution/mod.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
pub mod executor;
2+
pub mod executor_graph;
3+
pub mod executor_task;
4+
pub mod parallel;
5+
pub mod physical;
6+
pub mod runtime;
Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,9 @@
11
use std::collections::HashMap;
22

3-
use crate::executor::{
4-
physical::{PhysicalOperator, PhysicalOperatorRef},
5-
Executor,
6-
};
3+
use crate::execution::executor::Executor;
4+
use crate::execution::physical::{PhysicalOperator, PhysicalOperatorRef};
75

86
use super::pipeline::Pipeline;
9-
use anyhow::anyhow;
107
use anyhow::Result;
118

129
pub type PipelineIx = usize;

src/execution/parallel/mod.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
pub mod meta_pipeline;
2+
pub mod pipeline;
3+
pub mod pipeline_builder;
4+
pub mod pipeline_complete_event;
5+
pub mod pipeline_event;
6+
pub mod pipeline_finish_event;
7+
pub mod pipeline_initialize_event;
8+
pub mod pipeline_running_event;

0 commit comments

Comments
 (0)