diff --git a/examples/workflow/README.md b/examples/workflow/README.md index b27816c9..26601d6f 100644 --- a/examples/workflow/README.md +++ b/examples/workflow/README.md @@ -548,8 +548,8 @@ It shows: forwards the caller's events only. - `propagation=PropagationScope.LINEAGE` on an activity call — forwards the caller's events *plus* anything the caller itself received from its parent. -- `PropagatedHistory.get_workflow_by_name(...)` and `WorkflowResult.get_activity_by_name(...)` - on the receiving side. +- `PropagatedHistory.get_last_workflow_by_name(...)` and + `WorkflowResult.get_last_activity_by_name(...)` on the receiving side. > **Requires** a Dapr sidecar with workflow history propagation support > (durabletask-go PR #85 / runtime 1.18+ ). With an older sidecar the diff --git a/examples/workflow/history_propagation.py b/examples/workflow/history_propagation.py index 76e60056..7f6a56b2 100644 --- a/examples/workflow/history_propagation.py +++ b/examples/workflow/history_propagation.py @@ -53,7 +53,7 @@ def log_summary(ctx: wf.WorkflowActivityContext, _: None) -> str: parent = workflows[-1] try: - validate = parent.get_activity_by_name('validate_merchant') + validate = parent.get_last_activity_by_name('validate_merchant') except wf.PropagationNotFoundError: print('*** log_summary: parent did not run validate_merchant', flush=True) return 'parent-missing-validate' @@ -81,7 +81,7 @@ def process_payment(ctx: wf.DaprWorkflowContext, _: None): parent = workflows[-1] try: - validate = parent.get_activity_by_name('validate_merchant') + validate = parent.get_last_activity_by_name('validate_merchant') except wf.PropagationNotFoundError: print('*** process_payment: parent did not run validate_merchant', flush=True) return 'parent-missing-validate' diff --git a/ext/dapr-ext-workflow/dapr/ext/workflow/propagation.py b/ext/dapr-ext-workflow/dapr/ext/workflow/propagation.py index 9b756c36..8bca170f 100644 --- a/ext/dapr-ext-workflow/dapr/ext/workflow/propagation.py +++ b/ext/dapr-ext-workflow/dapr/ext/workflow/propagation.py @@ -34,6 +34,13 @@ class PropagationScope(Enum): Values map 1:1 to the protobuf ``HistoryPropagationScope`` enum; the plumbing layer reads ``.value`` when writing to proto fields. + + * ``OWN_HISTORY`` — propagate the caller's events only; drop any ancestor + chain. Use as a trust boundary, where downstream code should only see + the immediate caller. + * ``LINEAGE`` — propagate the caller's events plus any ancestor events it + received. Use for chain-of-custody verification, where downstream code + needs visibility into the full lineage of upstream workflows. """ NONE = int(pb.HISTORY_PROPAGATION_SCOPE_NONE) @@ -144,7 +151,7 @@ def _resolve_child_workflow( class WorkflowResult: """A scoped view of a single workflow's chunk in propagated history. - Use :meth:`get_activity_by_name` / :meth:`get_child_workflow_by_name` + Use :meth:`get_last_activity_by_name` / :meth:`get_last_child_workflow_by_name` to query specific items inside this chunk. Methods return the most-recent occurrence by execution order. """ @@ -158,7 +165,7 @@ def get_activities_by_name(self, name: str) -> list[ActivityResult]: """Return every activity in this chunk whose scheduled name matches, in execution order. Empty list if none. - See also: :meth:`get_activity_by_name` for the most recent match only. + See also: :meth:`get_last_activity_by_name` for the most recent match only. """ return [ _resolve_activity(self._events, e) @@ -166,7 +173,7 @@ def get_activities_by_name(self, name: str) -> list[ActivityResult]: if e.HasField('taskScheduled') and e.taskScheduled.name == name ] - def get_activity_by_name(self, name: str) -> ActivityResult: + def get_last_activity_by_name(self, name: str) -> ActivityResult: """Return the most recent activity in this chunk whose name matches. Raises :class:`PropagationNotFoundError` if no activity scheduled with @@ -186,7 +193,7 @@ def get_child_workflows_by_name(self, name: str) -> list[ChildWorkflowResult]: """Return every child workflow in this chunk whose name matches, in execution order. - See also: :meth:`get_child_workflow_by_name` for the most recent match. + See also: :meth:`get_last_child_workflow_by_name` for the most recent match. """ return [ _resolve_child_workflow(self._events, e.eventId, name) @@ -195,7 +202,7 @@ def get_child_workflows_by_name(self, name: str) -> list[ChildWorkflowResult]: and e.childWorkflowInstanceCreated.name == name ] - def get_child_workflow_by_name(self, name: str) -> ChildWorkflowResult: + def get_last_child_workflow_by_name(self, name: str) -> ChildWorkflowResult: """Return the most recent child workflow in this chunk whose name matches. Raises :class:`PropagationNotFoundError` if no match is found. @@ -301,12 +308,12 @@ def get_workflows_by_name(self, name: str) -> list[WorkflowResult]: """All workflows whose name matches, in execution order. Useful when the chain contains the same name more than once (recursion / ContinueAsNew). - See also: :meth:`get_workflow_by_name` for a single-result helper that + See also: :meth:`get_last_workflow_by_name` for a single-result helper that returns only the most recent match. """ return [self._make_workflow_result(c) for c in self._chunks if c.workflow_name == name] - def get_workflow_by_name(self, name: str) -> WorkflowResult: + def get_last_workflow_by_name(self, name: str) -> WorkflowResult: """Most recent workflow in the chain whose name matches. Raises :class:`PropagationNotFoundError` if no match is found. diff --git a/ext/dapr-ext-workflow/tests/durabletask/test_propagation.py b/ext/dapr-ext-workflow/tests/durabletask/test_propagation.py index 260ab1ab..d63d38c1 100644 --- a/ext/dapr-ext-workflow/tests/durabletask/test_propagation.py +++ b/ext/dapr-ext-workflow/tests/durabletask/test_propagation.py @@ -196,21 +196,21 @@ def test_get_workflows_returns_chunks_in_order(history: PropagatedHistory): assert workflows[1].instance_id == 'wf-002' -def test_get_workflow_by_name_returns_match(history: PropagatedHistory): - wf = history.get_workflow_by_name('ProcessPayment') +def test_get_last_workflow_by_name_returns_match(history: PropagatedHistory): + wf = history.get_last_workflow_by_name('ProcessPayment') assert wf.name == 'ProcessPayment' assert wf.instance_id == 'wf-002' -def test_get_workflow_by_name_raises_when_missing(history: PropagatedHistory): +def test_get_last_workflow_by_name_raises_when_missing(history: PropagatedHistory): with pytest.raises(PropagationNotFoundError): - history.get_workflow_by_name('NotARealWorkflow') + history.get_last_workflow_by_name('NotARealWorkflow') def test_get_workflows_by_name_returns_all_matches(): """If the same workflow name appears in multiple chunks (e.g. ContinueAsNew or recursion), get_workflows_by_name returns every occurrence and - get_workflow_by_name returns the last.""" + get_last_workflow_by_name returns the last.""" chunk_events = [_execution_started('Loop')] proto = pb.PropagatedHistory( @@ -225,15 +225,15 @@ def test_get_workflows_by_name_returns_all_matches(): all_loops = ph.get_workflows_by_name('Loop') assert len(all_loops) == 2 - assert ph.get_workflow_by_name('Loop').instance_id == 'wf-2' + assert ph.get_last_workflow_by_name('Loop').instance_id == 'wf-2' # --- Activity resolution ---------------------------------------------------- -def test_get_activity_by_name_returns_completed_result(history: PropagatedHistory): - merchant = history.get_workflow_by_name('MerchantCheckout') - activity = merchant.get_activity_by_name('ValidateMerchant') +def test_get_last_activity_by_name_returns_completed_result(history: PropagatedHistory): + merchant = history.get_last_workflow_by_name('MerchantCheckout') + activity = merchant.get_last_activity_by_name('ValidateMerchant') assert activity.name == 'ValidateMerchant' assert activity.started @@ -245,7 +245,7 @@ def test_get_activity_by_name_returns_completed_result(history: PropagatedHistor def test_get_activities_by_name_returns_all_invocations(history: PropagatedHistory): - payment = history.get_workflow_by_name('ProcessPayment') + payment = history.get_last_workflow_by_name('ProcessPayment') cards = payment.get_activities_by_name('ValidateCard') assert len(cards) == 2 @@ -257,20 +257,20 @@ def test_get_activities_by_name_returns_all_invocations(history: PropagatedHisto assert cards[1].error.errorMessage == 'card declined' -def test_get_activity_by_name_returns_last_invocation(history: PropagatedHistory): - """get_activity_by_name returns the most recent invocation in execution +def test_get_last_activity_by_name_returns_last_invocation(history: PropagatedHistory): + """get_last_activity_by_name returns the most recent invocation in execution order, matching Go semantics.""" - payment = history.get_workflow_by_name('ProcessPayment') - last = payment.get_activity_by_name('ValidateCard') + payment = history.get_last_workflow_by_name('ProcessPayment') + last = payment.get_last_activity_by_name('ValidateCard') assert last.failed assert last.error is not None assert last.error.errorMessage == 'card declined' -def test_get_activity_by_name_raises_when_missing(history: PropagatedHistory): - payment = history.get_workflow_by_name('ProcessPayment') +def test_get_last_activity_by_name_raises_when_missing(history: PropagatedHistory): + payment = history.get_last_workflow_by_name('ProcessPayment') with pytest.raises(PropagationNotFoundError): - payment.get_activity_by_name('NotAnActivity') + payment.get_last_activity_by_name('NotAnActivity') def test_activity_not_yet_completed_reports_started_only(): @@ -286,7 +286,7 @@ def test_activity_not_yet_completed_reports_started_only(): ) ph = PropagatedHistory.from_proto(proto) assert ph is not None - pending = ph.get_workflow_by_name('StillRunning').get_activity_by_name('Pending') + pending = ph.get_last_workflow_by_name('StillRunning').get_last_activity_by_name('Pending') assert pending.started assert not pending.completed @@ -298,18 +298,18 @@ def test_activity_not_yet_completed_reports_started_only(): # --- Child workflow resolution ---------------------------------------------- -def test_get_child_workflow_by_name(history: PropagatedHistory): - merchant = history.get_workflow_by_name('MerchantCheckout') - child = merchant.get_child_workflow_by_name('ProcessPayment') +def test_get_last_child_workflow_by_name(history: PropagatedHistory): + merchant = history.get_last_workflow_by_name('MerchantCheckout') + child = merchant.get_last_child_workflow_by_name('ProcessPayment') assert child.name == 'ProcessPayment' assert child.started -def test_get_child_workflow_by_name_raises_when_missing(history: PropagatedHistory): - merchant = history.get_workflow_by_name('MerchantCheckout') +def test_get_last_child_workflow_by_name_raises_when_missing(history: PropagatedHistory): + merchant = history.get_last_workflow_by_name('MerchantCheckout') with pytest.raises(PropagationNotFoundError): - merchant.get_child_workflow_by_name('NotAChild') + merchant.get_last_child_workflow_by_name('NotAChild') # --- from_proto / structural validation ------------------------------------- diff --git a/ext/dapr-ext-workflow/tests/durabletask/test_propagation_wiring.py b/ext/dapr-ext-workflow/tests/durabletask/test_propagation_wiring.py index a9418247..f74b8fb8 100644 --- a/ext/dapr-ext-workflow/tests/durabletask/test_propagation_wiring.py +++ b/ext/dapr-ext-workflow/tests/durabletask/test_propagation_wiring.py @@ -158,7 +158,7 @@ def orchestrator(ctx: task.OrchestrationContext, _): history = captured['history'] assert history is not None assert history.get_app_ids() == ['parent-app'] - assert history.get_workflow_by_name('Parent').instance_id == 'parent-instance' + assert history.get_last_workflow_by_name('Parent').instance_id == 'parent-instance' def test_orchestration_executor_propagated_history_is_none_by_default(): @@ -209,7 +209,7 @@ def reading_activity(ctx: task.ActivityContext, _): history = captured['history'] assert history is not None assert history.get_app_ids() == ['parent-app'] - assert history.get_workflow_by_name('Caller').instance_id == 'parent-instance' + assert history.get_last_workflow_by_name('Caller').instance_id == 'parent-instance' def test_activity_executor_propagated_history_is_none_by_default():