diff --git a/crates/taurus-core/src/handler/argument.rs b/crates/taurus-core/src/handler/argument.rs index 2105138..ed9f05e 100644 --- a/crates/taurus-core/src/handler/argument.rs +++ b/crates/taurus-core/src/handler/argument.rs @@ -13,6 +13,7 @@ use tucana::shared::SubFlowSetting; #[derive(Clone)] pub struct FunctionThunk { pub identifier: String, + pub result_id: Option, pub parameter_index: i64, pub settings: Vec, } @@ -21,6 +22,7 @@ impl fmt::Debug for FunctionThunk { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("FunctionThunk") .field("identifier", &self.identifier) + .field("result_id", &self.result_id) .field("parameter_index", &self.parameter_index) .field("settings_len", &self.settings.len()) .finish() diff --git a/crates/taurus-core/src/runtime/engine.rs b/crates/taurus-core/src/runtime/engine.rs index 924c804..27a72ae 100644 --- a/crates/taurus-core/src/runtime/engine.rs +++ b/crates/taurus-core/src/runtime/engine.rs @@ -1176,6 +1176,94 @@ mod tests { assert_node_result_id(&report.node_execution_results[3], 1); } + #[test] + fn execution_report_keeps_every_for_each_function_identifier_callback_execution() { + let engine = ExecutionEngine::new(); + let mut respond_node = node( + 1, + "rest::control::respond", + vec![ + literal_param(1, "http_status_code", int_value(200)), + literal_param(2, "headers", empty_struct_value()), + literal_param(3, "payload", string_value("20")), + ], + None, + ); + respond_node.definition_source = Some("draco-draco-cron".to_string()); + let mut for_each_node = node( + 2, + "std::list::for_each", + vec![ + literal_param( + 4, + "list", + list_value(vec![int_value(1), int_value(2), int_value(3)]), + ), + function_thunk_param( + 5, + "consumer", + "std::boolean::from_number", + vec![subflow_setting("value", Some(null_value()), false, false)], + ), + ], + Some(1), + ); + for_each_node.definition_source = Some("draco-draco-cron".to_string()); + + let report = engine.execute_graph_report( + 2, + vec![respond_node, for_each_node], + None, + None, + None, + false, + ); + + assert_eq!(report.exit_reason, ExitReason::Success); + assert_eq!(expect_success(report.signal), { + let mut fields = std::collections::HashMap::new(); + fields.insert("http_status_code".to_string(), int_value(200)); + fields.insert("headers".to_string(), empty_struct_value()); + fields.insert("payload".to_string(), string_value("20")); + Value { + kind: Some(Kind::StructValue(Struct { fields })), + } + }); + assert_eq!(report.node_execution_results.len(), 5); + + let function_results: Vec<_> = report + .node_execution_results + .iter() + .filter(|result| result.id == Some(node_execution_result::Id::FunctionId(5))) + .collect(); + assert_eq!(function_results.len(), 3); + + for (index, result) in function_results.iter().enumerate() { + assert_eq!(result.parameter_results.len(), 1); + assert_eq!( + result.parameter_results[0].value, + Some(int_value(index as i64 + 1)) + ); + match result.result.as_ref() { + Some(node_execution_result::Result::Success(value)) => { + assert_eq!( + value, + &Value { + kind: Some(Kind::BoolValue(true)), + } + ); + } + other => panic!("expected function success result, got {:?}", other), + } + } + + assert_function_result_id(&report.node_execution_results[0], 5); + assert_function_result_id(&report.node_execution_results[1], 5); + assert_function_result_id(&report.node_execution_results[2], 5); + assert_node_result_id(&report.node_execution_results[3], 2); + assert_node_result_id(&report.node_execution_results[4], 1); + } + #[test] fn emitter_emits_start_and_finish_for_successful_execution() { let engine = ExecutionEngine::new(); diff --git a/crates/taurus-core/src/runtime/engine/compiler.rs b/crates/taurus-core/src/runtime/engine/compiler.rs index 47d7eb4..626e29c 100644 --- a/crates/taurus-core/src/runtime/engine/compiler.rs +++ b/crates/taurus-core/src/runtime/engine/compiler.rs @@ -162,6 +162,7 @@ pub fn compile_flow( Some(sub_flow::ExecutionReference::FunctionIdentifier(identifier)) => { CompiledArg::Deferred(CompiledThunk::Function { identifier: identifier.clone(), + result_id: identifier.parse().ok().or(Some(parameter.database_id)), parameter_index: parameter_index as i64, settings: sub_flow.settings.clone(), }) diff --git a/crates/taurus-core/src/runtime/engine/executor.rs b/crates/taurus-core/src/runtime/engine/executor.rs index 013d9d1..42879fd 100644 --- a/crates/taurus-core/src/runtime/engine/executor.rs +++ b/crates/taurus-core/src/runtime/engine/executor.rs @@ -734,7 +734,7 @@ impl<'a> EngineExecutor<'a> { } let mut fields = HashMap::new(); - for (parameter, value) in node.parameters.iter().zip(values.into_iter()) { + for (parameter, value) in node.parameters.iter().zip(values) { fields.insert(parameter.runtime_parameter_id.clone(), value); } @@ -959,10 +959,12 @@ fn compiled_thunk_to_argument(thunk: &CompiledThunk) -> Thunk { CompiledThunk::Node(node_id) => Thunk::Node(*node_id), CompiledThunk::Function { identifier, + result_id, parameter_index, settings, } => Thunk::Function(FunctionThunk { identifier: identifier.clone(), + result_id: *result_id, parameter_index: *parameter_index, settings: settings.clone(), }), @@ -970,7 +972,9 @@ fn compiled_thunk_to_argument(thunk: &CompiledThunk) -> Thunk { } fn parse_function_result_id(function: &FunctionThunk) -> Option { - function.identifier.parse::().ok() + function + .result_id + .or_else(|| function.identifier.parse::().ok()) } fn resolve_function_setting( diff --git a/crates/taurus-core/src/runtime/engine/model.rs b/crates/taurus-core/src/runtime/engine/model.rs index 97eeefa..c10e3a7 100644 --- a/crates/taurus-core/src/runtime/engine/model.rs +++ b/crates/taurus-core/src/runtime/engine/model.rs @@ -26,6 +26,7 @@ pub enum CompiledThunk { Node(i64), Function { identifier: String, + result_id: Option, parameter_index: i64, settings: Vec, }, diff --git a/crates/taurus-core/src/runtime/functions/http.rs b/crates/taurus-core/src/runtime/functions/http.rs index 3ee2b9b..265567c 100644 --- a/crates/taurus-core/src/runtime/functions/http.rs +++ b/crates/taurus-core/src/runtime/functions/http.rs @@ -297,11 +297,9 @@ fn decode_response_payload(response: http::Response) -> Result(&text) { + && let Ok(json) = serde_json::from_str::(&text) { return Ok(from_json_value(json)); } - } return Ok(text.to_value()); } diff --git a/crates/taurus-manual/src/main.rs b/crates/taurus-manual/src/main.rs index e2efc2e..083b82d 100644 --- a/crates/taurus-manual/src/main.rs +++ b/crates/taurus-manual/src/main.rs @@ -14,9 +14,12 @@ use taurus_provider::providers::remote::nats_remote_runtime::NATSRemoteRuntime; use tucana::shared::ExecutionFlow; use tucana::shared::NodeExecutionResult; use tucana::shared::ValidationFlow; +use tucana::shared::Value; use tucana::shared::helper::value::from_json_value; use tucana::shared::helper::value::to_json_value; use tucana::shared::node_execution_result::Id as NodeExecutionResultId; +use tucana::shared::node_execution_result::Result as NodeExecutionResultResult; +use tucana::shared::value::Kind; #[derive(Clone, Deserialize)] pub struct Input { @@ -161,10 +164,7 @@ async fn main() { } let flow_input = match case.inputs.get(index as usize) { - Some(inp) => match inp.input.clone() { - Some(json_input) => Some(from_json_value(json_input)), - None => None, - }, + Some(inp) => inp.input.clone().map(from_json_value), None => None, }; @@ -182,7 +182,7 @@ async fn main() { ); let duration_us = start.elapsed().as_micros(); let finished_at = now_unix_micros(); - print_timing_debug( + print_manual_execution_debug( started_at, finished_at, duration_us, @@ -223,7 +223,7 @@ async fn main() { ); let duration_us = start.elapsed().as_micros(); let finished_at = now_unix_micros(); - print_timing_debug( + print_manual_execution_debug( started_at, finished_at, duration_us, @@ -301,6 +301,18 @@ async fn queue_execution( println!("{}", execution_id); } +fn print_manual_execution_debug( + started_at: i64, + finished_at: i64, + duration_us: u128, + node_results: &[NodeExecutionResult], +) { + let mut normalized_results = node_results.to_vec(); + normalize_node_execution_results(&mut normalized_results); + print_timing_debug(started_at, finished_at, duration_us, &normalized_results); + print_execution_result_debug(&normalized_results); +} + fn print_timing_debug( started_at: i64, finished_at: i64, @@ -342,6 +354,69 @@ fn print_timing_debug( } } +fn print_execution_result_debug(node_results: &[NodeExecutionResult]) { + eprintln!("[manual execution result] {:#?}", node_results); +} + +fn normalize_node_execution_results(node_results: &mut [NodeExecutionResult]) { + for result in node_results { + normalize_node_execution_result(result); + } +} + +fn normalize_node_execution_result(result: &mut NodeExecutionResult) { + for parameter_result in &mut result.parameter_results { + match &mut parameter_result.value { + Some(value) => normalize_value(value), + None => { + parameter_result.value = Some(null_value()); + } + } + } + + match &mut result.result { + Some(NodeExecutionResultResult::Success(value)) => normalize_value(value), + Some(NodeExecutionResultResult::Error(error)) => { + if let Some(details) = &mut error.details { + for value in details.fields.values_mut() { + normalize_value(value); + } + } + } + None => { + result.result = Some(NodeExecutionResultResult::Success(null_value())); + } + } +} + +fn normalize_value(value: &mut Value) { + match &mut value.kind { + Some(Kind::StructValue(struct_value)) => { + for field in struct_value.fields.values_mut() { + normalize_value(field); + } + } + Some(Kind::ListValue(list_value)) => { + for item in &mut list_value.values { + normalize_value(item); + } + } + Some(Kind::NumberValue(number)) if number.number.is_none() => { + value.kind = Some(Kind::NullValue(0)); + } + Some(_) => {} + None => { + value.kind = Some(Kind::NullValue(0)); + } + } +} + +fn null_value() -> Value { + Value { + kind: Some(Kind::NullValue(0)), + } +} + fn execution_result_id_label(result: &NodeExecutionResult) -> String { match result.id { Some(NodeExecutionResultId::NodeId(id)) => format!("node_id={}", id), diff --git a/crates/taurus/src/app/mod.rs b/crates/taurus/src/app/mod.rs index a9988e0..768c189 100644 --- a/crates/taurus/src/app/mod.rs +++ b/crates/taurus/src/app/mod.rs @@ -50,11 +50,10 @@ pub async fn run() { wait_for_shutdown(&mut worker_task, &mut health_task).await; if let Some(handle) = runtime_status_heartbeat_task.take() { handle.abort(); - if let Err(err) = handle.await { - if !err.is_cancelled() { + if let Err(err) = handle.await + && !err.is_cancelled() { log::warn!("Runtime status heartbeat task ended unexpectedly: {}", err); } - } } update_stopped_status(runtime_status_service.as_ref()).await; diff --git a/crates/taurus/src/client/runtime_execution.rs b/crates/taurus/src/client/runtime_execution.rs index a21b7dd..0bd6291 100644 --- a/crates/taurus/src/client/runtime_execution.rs +++ b/crates/taurus/src/client/runtime_execution.rs @@ -4,7 +4,10 @@ use code0_flow::flow_service::{ use tonic::{Extensions, Request, transport::Channel}; use tucana::{ aquila::{ExecutionRequest, execution_service_client::ExecutionServiceClient}, - shared::ExecutionResult, + shared::{ + ExecutionResult, NodeExecutionResult, Value, execution_result, node_execution_result, + value::Kind, + }, }; pub struct TaurusRuntimeExecutionService { @@ -23,7 +26,7 @@ impl TaurusRuntimeExecutionService { } } - pub async fn update_runtime_execution(&mut self, runtime_execution: ExecutionResult) { + pub async fn update_runtime_execution(&mut self, mut runtime_execution: ExecutionResult) { log::info!( "Transmitting execution result to Aquila (execution_id={}, flow_id={}, node_results={})", runtime_execution.execution_identifier.as_str(), @@ -31,6 +34,10 @@ impl TaurusRuntimeExecutionService { runtime_execution.node_execution_results.len() ); + normalize_execution_result(&mut runtime_execution); + + log::debug!("{:#?}", runtime_execution); + let request = Request::from_parts( get_authorization_metadata(&self.aquila_token), Extensions::new(), @@ -52,3 +59,235 @@ impl TaurusRuntimeExecutionService { } } } + +fn normalize_execution_result(result: &mut ExecutionResult) { + match &mut result.input { + Some(input) => normalize_value(input), + None => { + result.input = Some(null_value()); + } + } + + for node_result in &mut result.node_execution_results { + normalize_node_execution_result(node_result); + } + + match &mut result.result { + Some(execution_result::Result::Success(value)) => normalize_value(value), + Some(execution_result::Result::Error(error)) => { + if let Some(details) = &mut error.details { + for value in details.fields.values_mut() { + normalize_value(value); + } + } + } + None => { + result.result = Some(execution_result::Result::Success(null_value())); + } + } +} + +fn normalize_node_execution_result(result: &mut NodeExecutionResult) { + for parameter_result in &mut result.parameter_results { + match &mut parameter_result.value { + Some(value) => normalize_value(value), + None => { + parameter_result.value = Some(null_value()); + } + } + } + + match &mut result.result { + Some(node_execution_result::Result::Success(value)) => normalize_value(value), + Some(node_execution_result::Result::Error(error)) => { + if let Some(details) = &mut error.details { + for value in details.fields.values_mut() { + normalize_value(value); + } + } + } + None => { + result.result = Some(node_execution_result::Result::Success(null_value())); + } + } +} + +fn normalize_value(value: &mut Value) { + match &mut value.kind { + Some(Kind::StructValue(struct_value)) => { + for field in struct_value.fields.values_mut() { + normalize_value(field); + } + } + Some(Kind::ListValue(list_value)) => { + for item in &mut list_value.values { + normalize_value(item); + } + } + Some(Kind::NumberValue(number)) if number.number.is_none() => { + value.kind = Some(Kind::NullValue(0)); + } + Some(_) => {} + None => { + value.kind = Some(Kind::NullValue(0)); + } + } +} + +fn null_value() -> Value { + Value { + kind: Some(Kind::NullValue(0)), + } +} + +#[cfg(test)] +mod tests { + use super::*; + use tucana::shared::{ + Error, ListValue, NodeParameterNodeExecutionResult, NumberValue, Struct, + node_execution_result, + }; + + #[test] + fn normalize_execution_result_fills_missing_results_with_null_success() { + let mut result = ExecutionResult { + execution_identifier: "execution-id".to_string(), + flow_id: 1, + started_at: 1, + finished_at: 2, + input: None, + node_execution_results: vec![NodeExecutionResult { + started_at: 1, + finished_at: 2, + parameter_results: vec![NodeParameterNodeExecutionResult { value: None }], + id: Some(node_execution_result::Id::NodeId(10)), + result: None, + }], + result: None, + }; + + normalize_execution_result(&mut result); + + assert!(matches!( + result.input.as_ref(), + Some(Value { + kind: Some(Kind::NullValue(_)) + }) + )); + assert!(matches!( + result.result.as_ref(), + Some(execution_result::Result::Success(Value { + kind: Some(Kind::NullValue(_)) + })) + )); + assert!(matches!( + result.node_execution_results[0].result.as_ref(), + Some(node_execution_result::Result::Success(Value { + kind: Some(Kind::NullValue(_)) + })) + )); + assert!(matches!( + result.node_execution_results[0].parameter_results[0] + .value + .as_ref(), + Some(Value { + kind: Some(Kind::NullValue(_)) + }) + )); + } + + #[test] + fn normalize_execution_result_recurses_into_child_values() { + let mut fields = std::collections::HashMap::new(); + fields.insert( + "list".to_string(), + Value { + kind: Some(Kind::ListValue(ListValue { + values: vec![ + Value { kind: None }, + Value { + kind: Some(Kind::NumberValue(NumberValue { number: None })), + }, + ], + })), + }, + ); + let mut result = ExecutionResult { + execution_identifier: "execution-id".to_string(), + flow_id: 1, + started_at: 1, + finished_at: 2, + input: Some(Value { + kind: Some(Kind::StructValue(Struct { fields })), + }), + node_execution_results: vec![NodeExecutionResult { + started_at: 1, + finished_at: 2, + parameter_results: vec![NodeParameterNodeExecutionResult { + value: Some(Value { kind: None }), + }], + id: Some(node_execution_result::Id::NodeId(10)), + result: Some(node_execution_result::Result::Error(Error { + details: Some(Struct { + fields: std::collections::HashMap::from([( + "empty".to_string(), + Value { kind: None }, + )]), + }), + ..Default::default() + })), + }], + result: Some(execution_result::Result::Success(Value { kind: None })), + }; + + normalize_execution_result(&mut result); + + assert!(matches!( + result.result.as_ref(), + Some(execution_result::Result::Success(Value { + kind: Some(Kind::NullValue(_)) + })) + )); + assert!(matches!( + result.node_execution_results[0].parameter_results[0] + .value + .as_ref(), + Some(Value { + kind: Some(Kind::NullValue(_)) + }) + )); + let Some(Value { + kind: Some(Kind::StructValue(input)), + }) = result.input.as_ref() + else { + panic!("expected normalized struct input"); + }; + let Some(Value { + kind: Some(Kind::ListValue(list)), + }) = input.fields.get("list") + else { + panic!("expected normalized list field"); + }; + assert!(matches!( + list.values[0].kind.as_ref(), + Some(Kind::NullValue(_)) + )); + assert!(matches!( + list.values[1].kind.as_ref(), + Some(Kind::NullValue(_)) + )); + let Some(node_execution_result::Result::Error(error)) = + result.node_execution_results[0].result.as_ref() + else { + panic!("expected node error result"); + }; + assert!(matches!( + error + .details + .as_ref() + .and_then(|details| details.fields.get("empty")) + .and_then(|value| value.kind.as_ref()), + Some(Kind::NullValue(_)) + )); + } +} diff --git a/flows/12_for_each_function_subflow.json b/flows/12_for_each_function_subflow.json new file mode 100644 index 0000000..f001bc6 --- /dev/null +++ b/flows/12_for_each_function_subflow.json @@ -0,0 +1,114 @@ +{ + "name": "12_for_each_function_subflow", + "description": "This flow validates for_each execution result reporting for function identifier sub_flow callbacks", + "inputs": [ + { + "input": null, + "expected_result": { + "http_status_code": 200, + "headers": {}, + "payload": "20" + } + } + ], + "flow": { + "flowId": "1", + "projectId": "1", + "startingNodeId": "2", + "nodeFunctions": [ + { + "databaseId": "1", + "runtimeFunctionId": "rest::control::respond", + "parameters": [ + { + "databaseId": "1", + "runtimeParameterId": "http_status_code", + "value": { + "literalValue": { + "numberValue": { + "integer": "200" + } + } + } + }, + { + "databaseId": "2", + "runtimeParameterId": "headers", + "value": { + "literalValue": { + "structValue": { + "fields": {} + } + } + } + }, + { + "databaseId": "3", + "runtimeParameterId": "payload", + "value": { + "literalValue": { + "stringValue": "20" + } + } + } + ], + "definitionSource": "draco-draco-cron" + }, + { + "databaseId": "2", + "runtimeFunctionId": "std::list::for_each", + "parameters": [ + { + "databaseId": "4", + "runtimeParameterId": "list", + "value": { + "literalValue": { + "listValue": { + "values": [ + { + "numberValue": { + "integer": "1" + } + }, + { + "numberValue": { + "integer": "2" + } + }, + { + "numberValue": { + "integer": "3" + } + } + ] + } + } + } + }, + { + "databaseId": "5", + "runtimeParameterId": "consumer", + "value": { + "subFlow": { + "signature": "(value: NUMBER): BOOLEAN", + "settings": [ + { + "identifier": "value", + "defaultValue": { + "nullValue": "NULL_VALUE" + }, + "optional": false, + "hidden": false + } + ], + "functionIdentifier": "std::boolean::from_number" + } + } + } + ], + "nextNodeId": "1", + "definitionSource": "draco-draco-cron" + } + ] + } +}