Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions crates/taurus-core/src/handler/argument.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use tucana::shared::SubFlowSetting;
#[derive(Clone)]
pub struct FunctionThunk {
pub identifier: String,
pub result_id: Option<i64>,
pub parameter_index: i64,
pub settings: Vec<SubFlowSetting>,
}
Expand All @@ -21,6 +22,7 @@ impl fmt::Debug for FunctionThunk {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("FunctionThunk")
.field("identifier", &self.identifier)
.field("result_id", &self.result_id)
.field("parameter_index", &self.parameter_index)
.field("settings_len", &self.settings.len())
.finish()
Expand Down
88 changes: 88 additions & 0 deletions crates/taurus-core/src/runtime/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1176,6 +1176,94 @@ mod tests {
assert_node_result_id(&report.node_execution_results[3], 1);
}

#[test]
fn execution_report_keeps_every_for_each_function_identifier_callback_execution() {
let engine = ExecutionEngine::new();
let mut respond_node = node(
1,
"rest::control::respond",
vec![
literal_param(1, "http_status_code", int_value(200)),
literal_param(2, "headers", empty_struct_value()),
literal_param(3, "payload", string_value("20")),
],
None,
);
respond_node.definition_source = Some("draco-draco-cron".to_string());
let mut for_each_node = node(
2,
"std::list::for_each",
vec![
literal_param(
4,
"list",
list_value(vec![int_value(1), int_value(2), int_value(3)]),
),
function_thunk_param(
5,
"consumer",
"std::boolean::from_number",
vec![subflow_setting("value", Some(null_value()), false, false)],
),
],
Some(1),
);
for_each_node.definition_source = Some("draco-draco-cron".to_string());

let report = engine.execute_graph_report(
2,
vec![respond_node, for_each_node],
None,
None,
None,
false,
);

assert_eq!(report.exit_reason, ExitReason::Success);
assert_eq!(expect_success(report.signal), {
let mut fields = std::collections::HashMap::new();
fields.insert("http_status_code".to_string(), int_value(200));
fields.insert("headers".to_string(), empty_struct_value());
fields.insert("payload".to_string(), string_value("20"));
Value {
kind: Some(Kind::StructValue(Struct { fields })),
}
});
assert_eq!(report.node_execution_results.len(), 5);

let function_results: Vec<_> = report
.node_execution_results
.iter()
.filter(|result| result.id == Some(node_execution_result::Id::FunctionId(5)))
.collect();
assert_eq!(function_results.len(), 3);

for (index, result) in function_results.iter().enumerate() {
assert_eq!(result.parameter_results.len(), 1);
assert_eq!(
result.parameter_results[0].value,
Some(int_value(index as i64 + 1))
);
match result.result.as_ref() {
Some(node_execution_result::Result::Success(value)) => {
assert_eq!(
value,
&Value {
kind: Some(Kind::BoolValue(true)),
}
);
}
other => panic!("expected function success result, got {:?}", other),
}
}

assert_function_result_id(&report.node_execution_results[0], 5);
assert_function_result_id(&report.node_execution_results[1], 5);
assert_function_result_id(&report.node_execution_results[2], 5);
assert_node_result_id(&report.node_execution_results[3], 2);
assert_node_result_id(&report.node_execution_results[4], 1);
}

#[test]
fn emitter_emits_start_and_finish_for_successful_execution() {
let engine = ExecutionEngine::new();
Expand Down
1 change: 1 addition & 0 deletions crates/taurus-core/src/runtime/engine/compiler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ pub fn compile_flow(
Some(sub_flow::ExecutionReference::FunctionIdentifier(identifier)) => {
CompiledArg::Deferred(CompiledThunk::Function {
identifier: identifier.clone(),
result_id: identifier.parse().ok().or(Some(parameter.database_id)),
parameter_index: parameter_index as i64,
settings: sub_flow.settings.clone(),
})
Expand Down
8 changes: 6 additions & 2 deletions crates/taurus-core/src/runtime/engine/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -734,7 +734,7 @@ impl<'a> EngineExecutor<'a> {
}

let mut fields = HashMap::new();
for (parameter, value) in node.parameters.iter().zip(values.into_iter()) {
for (parameter, value) in node.parameters.iter().zip(values) {
fields.insert(parameter.runtime_parameter_id.clone(), value);
}

Expand Down Expand Up @@ -959,18 +959,22 @@ fn compiled_thunk_to_argument(thunk: &CompiledThunk) -> Thunk {
CompiledThunk::Node(node_id) => Thunk::Node(*node_id),
CompiledThunk::Function {
identifier,
result_id,
parameter_index,
settings,
} => Thunk::Function(FunctionThunk {
identifier: identifier.clone(),
result_id: *result_id,
parameter_index: *parameter_index,
settings: settings.clone(),
}),
}
}

fn parse_function_result_id(function: &FunctionThunk) -> Option<i64> {
function.identifier.parse::<i64>().ok()
function
.result_id
.or_else(|| function.identifier.parse::<i64>().ok())
}

fn resolve_function_setting(
Expand Down
1 change: 1 addition & 0 deletions crates/taurus-core/src/runtime/engine/model.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ pub enum CompiledThunk {
Node(i64),
Function {
identifier: String,
result_id: Option<i64>,
parameter_index: i64,
settings: Vec<SubFlowSetting>,
},
Expand Down
4 changes: 1 addition & 3 deletions crates/taurus-core/src/runtime/functions/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -297,11 +297,9 @@ fn decode_response_payload(response: http::Response<Body>) -> Result<Value, Stri
.as_deref()
.map(content_type_is_json)
.unwrap_or(false)
{
if let Ok(json) = serde_json::from_str::<JsonValue>(&text) {
&& let Ok(json) = serde_json::from_str::<JsonValue>(&text) {
return Ok(from_json_value(json));
}
}

return Ok(text.to_value());
}
Expand Down
87 changes: 81 additions & 6 deletions crates/taurus-manual/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,12 @@ use taurus_provider::providers::remote::nats_remote_runtime::NATSRemoteRuntime;
use tucana::shared::ExecutionFlow;
use tucana::shared::NodeExecutionResult;
use tucana::shared::ValidationFlow;
use tucana::shared::Value;
use tucana::shared::helper::value::from_json_value;
use tucana::shared::helper::value::to_json_value;
use tucana::shared::node_execution_result::Id as NodeExecutionResultId;
use tucana::shared::node_execution_result::Result as NodeExecutionResultResult;
use tucana::shared::value::Kind;

#[derive(Clone, Deserialize)]
pub struct Input {
Expand Down Expand Up @@ -161,10 +164,7 @@ async fn main() {
}

let flow_input = match case.inputs.get(index as usize) {
Some(inp) => match inp.input.clone() {
Some(json_input) => Some(from_json_value(json_input)),
None => None,
},
Some(inp) => inp.input.clone().map(from_json_value),
None => None,
};

Expand All @@ -182,7 +182,7 @@ async fn main() {
);
let duration_us = start.elapsed().as_micros();
let finished_at = now_unix_micros();
print_timing_debug(
print_manual_execution_debug(
started_at,
finished_at,
duration_us,
Expand Down Expand Up @@ -223,7 +223,7 @@ async fn main() {
);
let duration_us = start.elapsed().as_micros();
let finished_at = now_unix_micros();
print_timing_debug(
print_manual_execution_debug(
started_at,
finished_at,
duration_us,
Expand Down Expand Up @@ -301,6 +301,18 @@ async fn queue_execution(
println!("{}", execution_id);
}

fn print_manual_execution_debug(
started_at: i64,
finished_at: i64,
duration_us: u128,
node_results: &[NodeExecutionResult],
) {
let mut normalized_results = node_results.to_vec();
normalize_node_execution_results(&mut normalized_results);
print_timing_debug(started_at, finished_at, duration_us, &normalized_results);
print_execution_result_debug(&normalized_results);
}

fn print_timing_debug(
started_at: i64,
finished_at: i64,
Expand Down Expand Up @@ -342,6 +354,69 @@ fn print_timing_debug(
}
}

fn print_execution_result_debug(node_results: &[NodeExecutionResult]) {
eprintln!("[manual execution result] {:#?}", node_results);
}

fn normalize_node_execution_results(node_results: &mut [NodeExecutionResult]) {
for result in node_results {
normalize_node_execution_result(result);
}
}

fn normalize_node_execution_result(result: &mut NodeExecutionResult) {
for parameter_result in &mut result.parameter_results {
match &mut parameter_result.value {
Some(value) => normalize_value(value),
None => {
parameter_result.value = Some(null_value());
}
}
}

match &mut result.result {
Some(NodeExecutionResultResult::Success(value)) => normalize_value(value),
Some(NodeExecutionResultResult::Error(error)) => {
if let Some(details) = &mut error.details {
for value in details.fields.values_mut() {
normalize_value(value);
}
}
}
None => {
result.result = Some(NodeExecutionResultResult::Success(null_value()));
}
}
}

fn normalize_value(value: &mut Value) {
match &mut value.kind {
Some(Kind::StructValue(struct_value)) => {
for field in struct_value.fields.values_mut() {
normalize_value(field);
}
}
Some(Kind::ListValue(list_value)) => {
for item in &mut list_value.values {
normalize_value(item);
}
}
Some(Kind::NumberValue(number)) if number.number.is_none() => {
value.kind = Some(Kind::NullValue(0));
}
Some(_) => {}
None => {
value.kind = Some(Kind::NullValue(0));
}
}
}

fn null_value() -> Value {
Value {
kind: Some(Kind::NullValue(0)),
}
}

fn execution_result_id_label(result: &NodeExecutionResult) -> String {
match result.id {
Some(NodeExecutionResultId::NodeId(id)) => format!("node_id={}", id),
Expand Down
5 changes: 2 additions & 3 deletions crates/taurus/src/app/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,10 @@ pub async fn run() {
wait_for_shutdown(&mut worker_task, &mut health_task).await;
if let Some(handle) = runtime_status_heartbeat_task.take() {
handle.abort();
if let Err(err) = handle.await {
if !err.is_cancelled() {
if let Err(err) = handle.await
&& !err.is_cancelled() {
log::warn!("Runtime status heartbeat task ended unexpectedly: {}", err);
}
}
}
update_stopped_status(runtime_status_service.as_ref()).await;

Expand Down
Loading