From e01361c3c5dcdbf9975a7c987ca0a78edae469ec Mon Sep 17 00:00:00 2001 From: Nelson Parente Date: Wed, 20 May 2026 22:07:52 +0100 Subject: [PATCH 1/2] feat(workflow): align history propagation API with go-sdk Cassie (durabletask-go author) flagged divergence between python-sdk #1025 and the freshly-renamed go-sdk helpers in durabletask-go #105 (merged 2026-05-19, after #1025 landed). This brings python-sdk's surface back in line for cross-SDK parity before 1.18 ships. Read API renames (mirror durabletask-go GetLast*ByName): - PropagatedHistory.get_workflow_by_name -> get_last_workflow_by_name - WorkflowResult.get_activity_by_name -> get_last_activity_by_name - WorkflowResult.get_child_workflow_by_name -> get_last_child_workflow_by_name Plural variants (get_workflows_by_name, get_activities_by_name, get_child_workflows_by_name) and the chain-level helpers are unchanged. Scheduling helpers (mirror go-sdk workflow.PropagateLineage / workflow.PropagateOwnHistory): - propagate_lineage() -> PropagationScope.LINEAGE - propagate_own_history() -> PropagationScope.OWN_HISTORY PropagationScope enum is kept as the underlying value, so both `propagation=propagate_lineage()` and `propagation=PropagationScope.LINEAGE` work. Example, README snippet, and tests updated to use the renamed/new surface. No runtime/proto changes. Refs: dapr/python-sdk#1001, dapr/durabletask-go#105, dapr/go-sdk#823 Signed-off-by: Nelson Parente --- examples/workflow/README.md | 8 +-- examples/workflow/history_propagation.py | 12 ++-- .../dapr/ext/workflow/__init__.py | 4 ++ .../dapr/ext/workflow/propagation.py | 34 ++++++++--- .../tests/durabletask/test_propagation.py | 61 +++++++++++-------- .../durabletask/test_propagation_wiring.py | 4 +- 6 files changed, 80 insertions(+), 43 deletions(-) diff --git a/examples/workflow/README.md b/examples/workflow/README.md index b27816c9..1c40e35c 100644 --- a/examples/workflow/README.md +++ b/examples/workflow/README.md @@ -544,12 +544,12 @@ history to a child workflow and to an activity, and how the receivers query that history through `ctx.get_propagated_history()`. It shows: -- `propagation=PropagationScope.OWN_HISTORY` on a child workflow call — +- `propagation=propagate_own_history()` on a child workflow call — forwards the caller's events only. -- `propagation=PropagationScope.LINEAGE` on an activity call — forwards the +- `propagation=propagate_lineage()` on an activity call — forwards the caller's events *plus* anything the caller itself received from its parent. -- `PropagatedHistory.get_workflow_by_name(...)` and `WorkflowResult.get_activity_by_name(...)` - on the receiving side. +- `PropagatedHistory.get_last_workflow_by_name(...)` and + `WorkflowResult.get_last_activity_by_name(...)` on the receiving side. > **Requires** a Dapr sidecar with workflow history propagation support > (durabletask-go PR #85 / runtime 1.18+ ). With an older sidecar the diff --git a/examples/workflow/history_propagation.py b/examples/workflow/history_propagation.py index 76e60056..78580321 100644 --- a/examples/workflow/history_propagation.py +++ b/examples/workflow/history_propagation.py @@ -13,8 +13,8 @@ """History propagation example. The parent workflow runs a couple of activities, then calls a child workflow -with ``propagation=PropagationScope.OWN_HISTORY`` and an activity with -``propagation=PropagationScope.LINEAGE``. The child workflow and the +with ``propagation=propagate_own_history()`` and an activity with +``propagation=propagate_lineage()``. The child workflow and the downstream activity read the parent's recorded history via ``ctx.get_propagated_history()`` and inspect specific events by name. @@ -53,7 +53,7 @@ def log_summary(ctx: wf.WorkflowActivityContext, _: None) -> str: parent = workflows[-1] try: - validate = parent.get_activity_by_name('validate_merchant') + validate = parent.get_last_activity_by_name('validate_merchant') except wf.PropagationNotFoundError: print('*** log_summary: parent did not run validate_merchant', flush=True) return 'parent-missing-validate' @@ -81,7 +81,7 @@ def process_payment(ctx: wf.DaprWorkflowContext, _: None): parent = workflows[-1] try: - validate = parent.get_activity_by_name('validate_merchant') + validate = parent.get_last_activity_by_name('validate_merchant') except wf.PropagationNotFoundError: print('*** process_payment: parent did not run validate_merchant', flush=True) return 'parent-missing-validate' @@ -106,14 +106,14 @@ def merchant_checkout(ctx: wf.DaprWorkflowContext, merchant_id: str): child_result = yield ctx.call_child_workflow( process_payment, input=None, - propagation=wf.PropagationScope.OWN_HISTORY, + propagation=wf.propagate_own_history(), ) print(f'*** child workflow result: {child_result}', flush=True) audit = yield ctx.call_activity( log_summary, input=None, - propagation=wf.PropagationScope.LINEAGE, + propagation=wf.propagate_lineage(), ) print(f'*** audit activity result: {audit}', flush=True) return {'child': child_result, 'audit': audit} diff --git a/ext/dapr-ext-workflow/dapr/ext/workflow/__init__.py b/ext/dapr-ext-workflow/dapr/ext/workflow/__init__.py index 228ceac2..de7c55e2 100644 --- a/ext/dapr-ext-workflow/dapr/ext/workflow/__init__.py +++ b/ext/dapr-ext-workflow/dapr/ext/workflow/__init__.py @@ -25,6 +25,8 @@ PropagationNotFoundError, PropagationScope, WorkflowResult, + propagate_lineage, + propagate_own_history, ) from dapr.ext.workflow.retry_policy import RetryPolicy from dapr.ext.workflow.workflow_activity_context import WorkflowActivityContext @@ -49,6 +51,8 @@ 'WorkflowResult', 'ActivityResult', 'ChildWorkflowResult', + 'propagate_lineage', + 'propagate_own_history', 'DaprMCPClient', 'MCPToolDef', ] diff --git a/ext/dapr-ext-workflow/dapr/ext/workflow/propagation.py b/ext/dapr-ext-workflow/dapr/ext/workflow/propagation.py index 9b756c36..cf95d002 100644 --- a/ext/dapr-ext-workflow/dapr/ext/workflow/propagation.py +++ b/ext/dapr-ext-workflow/dapr/ext/workflow/propagation.py @@ -41,6 +41,26 @@ class PropagationScope(Enum): LINEAGE = int(pb.HISTORY_PROPAGATION_SCOPE_LINEAGE) +def propagate_lineage() -> PropagationScope: + """Propagate the caller's own events plus any ancestor events it received. + + Use this for chain-of-custody verification, where downstream code needs + visibility into the full lineage of upstream workflows. Mirrors the + go-sdk ``workflow.PropagateLineage()`` helper. + """ + return PropagationScope.LINEAGE + + +def propagate_own_history() -> PropagationScope: + """Propagate the caller's events only; drop any ancestor chain. + + Use this as a trust boundary, where downstream code should only see the + immediate caller. Mirrors the go-sdk ``workflow.PropagateOwnHistory()`` + helper. + """ + return PropagationScope.OWN_HISTORY + + class PropagationNotFoundError(Exception): """Raised when a query against propagated history finds no match.""" @@ -144,7 +164,7 @@ def _resolve_child_workflow( class WorkflowResult: """A scoped view of a single workflow's chunk in propagated history. - Use :meth:`get_activity_by_name` / :meth:`get_child_workflow_by_name` + Use :meth:`get_last_activity_by_name` / :meth:`get_last_child_workflow_by_name` to query specific items inside this chunk. Methods return the most-recent occurrence by execution order. """ @@ -158,7 +178,7 @@ def get_activities_by_name(self, name: str) -> list[ActivityResult]: """Return every activity in this chunk whose scheduled name matches, in execution order. Empty list if none. - See also: :meth:`get_activity_by_name` for the most recent match only. + See also: :meth:`get_last_activity_by_name` for the most recent match only. """ return [ _resolve_activity(self._events, e) @@ -166,7 +186,7 @@ def get_activities_by_name(self, name: str) -> list[ActivityResult]: if e.HasField('taskScheduled') and e.taskScheduled.name == name ] - def get_activity_by_name(self, name: str) -> ActivityResult: + def get_last_activity_by_name(self, name: str) -> ActivityResult: """Return the most recent activity in this chunk whose name matches. Raises :class:`PropagationNotFoundError` if no activity scheduled with @@ -186,7 +206,7 @@ def get_child_workflows_by_name(self, name: str) -> list[ChildWorkflowResult]: """Return every child workflow in this chunk whose name matches, in execution order. - See also: :meth:`get_child_workflow_by_name` for the most recent match. + See also: :meth:`get_last_child_workflow_by_name` for the most recent match. """ return [ _resolve_child_workflow(self._events, e.eventId, name) @@ -195,7 +215,7 @@ def get_child_workflows_by_name(self, name: str) -> list[ChildWorkflowResult]: and e.childWorkflowInstanceCreated.name == name ] - def get_child_workflow_by_name(self, name: str) -> ChildWorkflowResult: + def get_last_child_workflow_by_name(self, name: str) -> ChildWorkflowResult: """Return the most recent child workflow in this chunk whose name matches. Raises :class:`PropagationNotFoundError` if no match is found. @@ -301,12 +321,12 @@ def get_workflows_by_name(self, name: str) -> list[WorkflowResult]: """All workflows whose name matches, in execution order. Useful when the chain contains the same name more than once (recursion / ContinueAsNew). - See also: :meth:`get_workflow_by_name` for a single-result helper that + See also: :meth:`get_last_workflow_by_name` for a single-result helper that returns only the most recent match. """ return [self._make_workflow_result(c) for c in self._chunks if c.workflow_name == name] - def get_workflow_by_name(self, name: str) -> WorkflowResult: + def get_last_workflow_by_name(self, name: str) -> WorkflowResult: """Most recent workflow in the chain whose name matches. Raises :class:`PropagationNotFoundError` if no match is found. diff --git a/ext/dapr-ext-workflow/tests/durabletask/test_propagation.py b/ext/dapr-ext-workflow/tests/durabletask/test_propagation.py index 260ab1ab..718e5a69 100644 --- a/ext/dapr-ext-workflow/tests/durabletask/test_propagation.py +++ b/ext/dapr-ext-workflow/tests/durabletask/test_propagation.py @@ -23,6 +23,8 @@ PropagatedHistory, PropagationNotFoundError, PropagationScope, + propagate_lineage, + propagate_own_history, ) from google.protobuf import wrappers_pb2 @@ -196,21 +198,21 @@ def test_get_workflows_returns_chunks_in_order(history: PropagatedHistory): assert workflows[1].instance_id == 'wf-002' -def test_get_workflow_by_name_returns_match(history: PropagatedHistory): - wf = history.get_workflow_by_name('ProcessPayment') +def test_get_last_workflow_by_name_returns_match(history: PropagatedHistory): + wf = history.get_last_workflow_by_name('ProcessPayment') assert wf.name == 'ProcessPayment' assert wf.instance_id == 'wf-002' -def test_get_workflow_by_name_raises_when_missing(history: PropagatedHistory): +def test_get_last_workflow_by_name_raises_when_missing(history: PropagatedHistory): with pytest.raises(PropagationNotFoundError): - history.get_workflow_by_name('NotARealWorkflow') + history.get_last_workflow_by_name('NotARealWorkflow') def test_get_workflows_by_name_returns_all_matches(): """If the same workflow name appears in multiple chunks (e.g. ContinueAsNew or recursion), get_workflows_by_name returns every occurrence and - get_workflow_by_name returns the last.""" + get_last_workflow_by_name returns the last.""" chunk_events = [_execution_started('Loop')] proto = pb.PropagatedHistory( @@ -225,15 +227,15 @@ def test_get_workflows_by_name_returns_all_matches(): all_loops = ph.get_workflows_by_name('Loop') assert len(all_loops) == 2 - assert ph.get_workflow_by_name('Loop').instance_id == 'wf-2' + assert ph.get_last_workflow_by_name('Loop').instance_id == 'wf-2' # --- Activity resolution ---------------------------------------------------- -def test_get_activity_by_name_returns_completed_result(history: PropagatedHistory): - merchant = history.get_workflow_by_name('MerchantCheckout') - activity = merchant.get_activity_by_name('ValidateMerchant') +def test_get_last_activity_by_name_returns_completed_result(history: PropagatedHistory): + merchant = history.get_last_workflow_by_name('MerchantCheckout') + activity = merchant.get_last_activity_by_name('ValidateMerchant') assert activity.name == 'ValidateMerchant' assert activity.started @@ -245,7 +247,7 @@ def test_get_activity_by_name_returns_completed_result(history: PropagatedHistor def test_get_activities_by_name_returns_all_invocations(history: PropagatedHistory): - payment = history.get_workflow_by_name('ProcessPayment') + payment = history.get_last_workflow_by_name('ProcessPayment') cards = payment.get_activities_by_name('ValidateCard') assert len(cards) == 2 @@ -257,20 +259,20 @@ def test_get_activities_by_name_returns_all_invocations(history: PropagatedHisto assert cards[1].error.errorMessage == 'card declined' -def test_get_activity_by_name_returns_last_invocation(history: PropagatedHistory): - """get_activity_by_name returns the most recent invocation in execution +def test_get_last_activity_by_name_returns_last_invocation(history: PropagatedHistory): + """get_last_activity_by_name returns the most recent invocation in execution order, matching Go semantics.""" - payment = history.get_workflow_by_name('ProcessPayment') - last = payment.get_activity_by_name('ValidateCard') + payment = history.get_last_workflow_by_name('ProcessPayment') + last = payment.get_last_activity_by_name('ValidateCard') assert last.failed assert last.error is not None assert last.error.errorMessage == 'card declined' -def test_get_activity_by_name_raises_when_missing(history: PropagatedHistory): - payment = history.get_workflow_by_name('ProcessPayment') +def test_get_last_activity_by_name_raises_when_missing(history: PropagatedHistory): + payment = history.get_last_workflow_by_name('ProcessPayment') with pytest.raises(PropagationNotFoundError): - payment.get_activity_by_name('NotAnActivity') + payment.get_last_activity_by_name('NotAnActivity') def test_activity_not_yet_completed_reports_started_only(): @@ -286,7 +288,7 @@ def test_activity_not_yet_completed_reports_started_only(): ) ph = PropagatedHistory.from_proto(proto) assert ph is not None - pending = ph.get_workflow_by_name('StillRunning').get_activity_by_name('Pending') + pending = ph.get_last_workflow_by_name('StillRunning').get_last_activity_by_name('Pending') assert pending.started assert not pending.completed @@ -298,18 +300,18 @@ def test_activity_not_yet_completed_reports_started_only(): # --- Child workflow resolution ---------------------------------------------- -def test_get_child_workflow_by_name(history: PropagatedHistory): - merchant = history.get_workflow_by_name('MerchantCheckout') - child = merchant.get_child_workflow_by_name('ProcessPayment') +def test_get_last_child_workflow_by_name(history: PropagatedHistory): + merchant = history.get_last_workflow_by_name('MerchantCheckout') + child = merchant.get_last_child_workflow_by_name('ProcessPayment') assert child.name == 'ProcessPayment' assert child.started -def test_get_child_workflow_by_name_raises_when_missing(history: PropagatedHistory): - merchant = history.get_workflow_by_name('MerchantCheckout') +def test_get_last_child_workflow_by_name_raises_when_missing(history: PropagatedHistory): + merchant = history.get_last_workflow_by_name('MerchantCheckout') with pytest.raises(PropagationNotFoundError): - merchant.get_child_workflow_by_name('NotAChild') + merchant.get_last_child_workflow_by_name('NotAChild') # --- from_proto / structural validation ------------------------------------- @@ -370,3 +372,14 @@ def test_from_proto_handles_empty_chunks(): assert ph.events == [] assert ph.get_app_ids() == [] assert ph.get_workflows() == [] + + +# --- Factory helpers (go-sdk parity) ---------------------------------------- + + +def test_propagate_lineage_helper_returns_lineage_scope(): + assert propagate_lineage() is PropagationScope.LINEAGE + + +def test_propagate_own_history_helper_returns_own_history_scope(): + assert propagate_own_history() is PropagationScope.OWN_HISTORY diff --git a/ext/dapr-ext-workflow/tests/durabletask/test_propagation_wiring.py b/ext/dapr-ext-workflow/tests/durabletask/test_propagation_wiring.py index a9418247..f74b8fb8 100644 --- a/ext/dapr-ext-workflow/tests/durabletask/test_propagation_wiring.py +++ b/ext/dapr-ext-workflow/tests/durabletask/test_propagation_wiring.py @@ -158,7 +158,7 @@ def orchestrator(ctx: task.OrchestrationContext, _): history = captured['history'] assert history is not None assert history.get_app_ids() == ['parent-app'] - assert history.get_workflow_by_name('Parent').instance_id == 'parent-instance' + assert history.get_last_workflow_by_name('Parent').instance_id == 'parent-instance' def test_orchestration_executor_propagated_history_is_none_by_default(): @@ -209,7 +209,7 @@ def reading_activity(ctx: task.ActivityContext, _): history = captured['history'] assert history is not None assert history.get_app_ids() == ['parent-app'] - assert history.get_workflow_by_name('Caller').instance_id == 'parent-instance' + assert history.get_last_workflow_by_name('Caller').instance_id == 'parent-instance' def test_activity_executor_propagated_history_is_none_by_default(): From 69a78d6de7657b7444009db229bd0970e01dac62 Mon Sep 17 00:00:00 2001 From: Nelson Parente Date: Thu, 21 May 2026 11:34:54 +0100 Subject: [PATCH 2/2] refactor(workflow): drop propagate_lineage/propagate_own_history factories Per review feedback, Go-style factory helpers are not idiomatic Python: they obscure the actual enum value at the call site and confuse static type checkers (return annotation only shows PropagationScope, not the specific member). Use PropagationScope.LINEAGE / PropagationScope.OWN_HISTORY directly instead. Signed-off-by: Nelson Parente --- examples/workflow/README.md | 4 +-- examples/workflow/history_propagation.py | 8 +++--- .../dapr/ext/workflow/__init__.py | 4 --- .../dapr/ext/workflow/propagation.py | 27 +++++-------------- .../tests/durabletask/test_propagation.py | 13 --------- 5 files changed, 13 insertions(+), 43 deletions(-) diff --git a/examples/workflow/README.md b/examples/workflow/README.md index 1c40e35c..26601d6f 100644 --- a/examples/workflow/README.md +++ b/examples/workflow/README.md @@ -544,9 +544,9 @@ history to a child workflow and to an activity, and how the receivers query that history through `ctx.get_propagated_history()`. It shows: -- `propagation=propagate_own_history()` on a child workflow call — +- `propagation=PropagationScope.OWN_HISTORY` on a child workflow call — forwards the caller's events only. -- `propagation=propagate_lineage()` on an activity call — forwards the +- `propagation=PropagationScope.LINEAGE` on an activity call — forwards the caller's events *plus* anything the caller itself received from its parent. - `PropagatedHistory.get_last_workflow_by_name(...)` and `WorkflowResult.get_last_activity_by_name(...)` on the receiving side. diff --git a/examples/workflow/history_propagation.py b/examples/workflow/history_propagation.py index 78580321..7f6a56b2 100644 --- a/examples/workflow/history_propagation.py +++ b/examples/workflow/history_propagation.py @@ -13,8 +13,8 @@ """History propagation example. The parent workflow runs a couple of activities, then calls a child workflow -with ``propagation=propagate_own_history()`` and an activity with -``propagation=propagate_lineage()``. The child workflow and the +with ``propagation=PropagationScope.OWN_HISTORY`` and an activity with +``propagation=PropagationScope.LINEAGE``. The child workflow and the downstream activity read the parent's recorded history via ``ctx.get_propagated_history()`` and inspect specific events by name. @@ -106,14 +106,14 @@ def merchant_checkout(ctx: wf.DaprWorkflowContext, merchant_id: str): child_result = yield ctx.call_child_workflow( process_payment, input=None, - propagation=wf.propagate_own_history(), + propagation=wf.PropagationScope.OWN_HISTORY, ) print(f'*** child workflow result: {child_result}', flush=True) audit = yield ctx.call_activity( log_summary, input=None, - propagation=wf.propagate_lineage(), + propagation=wf.PropagationScope.LINEAGE, ) print(f'*** audit activity result: {audit}', flush=True) return {'child': child_result, 'audit': audit} diff --git a/ext/dapr-ext-workflow/dapr/ext/workflow/__init__.py b/ext/dapr-ext-workflow/dapr/ext/workflow/__init__.py index de7c55e2..228ceac2 100644 --- a/ext/dapr-ext-workflow/dapr/ext/workflow/__init__.py +++ b/ext/dapr-ext-workflow/dapr/ext/workflow/__init__.py @@ -25,8 +25,6 @@ PropagationNotFoundError, PropagationScope, WorkflowResult, - propagate_lineage, - propagate_own_history, ) from dapr.ext.workflow.retry_policy import RetryPolicy from dapr.ext.workflow.workflow_activity_context import WorkflowActivityContext @@ -51,8 +49,6 @@ 'WorkflowResult', 'ActivityResult', 'ChildWorkflowResult', - 'propagate_lineage', - 'propagate_own_history', 'DaprMCPClient', 'MCPToolDef', ] diff --git a/ext/dapr-ext-workflow/dapr/ext/workflow/propagation.py b/ext/dapr-ext-workflow/dapr/ext/workflow/propagation.py index cf95d002..8bca170f 100644 --- a/ext/dapr-ext-workflow/dapr/ext/workflow/propagation.py +++ b/ext/dapr-ext-workflow/dapr/ext/workflow/propagation.py @@ -34,6 +34,13 @@ class PropagationScope(Enum): Values map 1:1 to the protobuf ``HistoryPropagationScope`` enum; the plumbing layer reads ``.value`` when writing to proto fields. + + * ``OWN_HISTORY`` — propagate the caller's events only; drop any ancestor + chain. Use as a trust boundary, where downstream code should only see + the immediate caller. + * ``LINEAGE`` — propagate the caller's events plus any ancestor events it + received. Use for chain-of-custody verification, where downstream code + needs visibility into the full lineage of upstream workflows. """ NONE = int(pb.HISTORY_PROPAGATION_SCOPE_NONE) @@ -41,26 +48,6 @@ class PropagationScope(Enum): LINEAGE = int(pb.HISTORY_PROPAGATION_SCOPE_LINEAGE) -def propagate_lineage() -> PropagationScope: - """Propagate the caller's own events plus any ancestor events it received. - - Use this for chain-of-custody verification, where downstream code needs - visibility into the full lineage of upstream workflows. Mirrors the - go-sdk ``workflow.PropagateLineage()`` helper. - """ - return PropagationScope.LINEAGE - - -def propagate_own_history() -> PropagationScope: - """Propagate the caller's events only; drop any ancestor chain. - - Use this as a trust boundary, where downstream code should only see the - immediate caller. Mirrors the go-sdk ``workflow.PropagateOwnHistory()`` - helper. - """ - return PropagationScope.OWN_HISTORY - - class PropagationNotFoundError(Exception): """Raised when a query against propagated history finds no match.""" diff --git a/ext/dapr-ext-workflow/tests/durabletask/test_propagation.py b/ext/dapr-ext-workflow/tests/durabletask/test_propagation.py index 718e5a69..d63d38c1 100644 --- a/ext/dapr-ext-workflow/tests/durabletask/test_propagation.py +++ b/ext/dapr-ext-workflow/tests/durabletask/test_propagation.py @@ -23,8 +23,6 @@ PropagatedHistory, PropagationNotFoundError, PropagationScope, - propagate_lineage, - propagate_own_history, ) from google.protobuf import wrappers_pb2 @@ -372,14 +370,3 @@ def test_from_proto_handles_empty_chunks(): assert ph.events == [] assert ph.get_app_ids() == [] assert ph.get_workflows() == [] - - -# --- Factory helpers (go-sdk parity) ---------------------------------------- - - -def test_propagate_lineage_helper_returns_lineage_scope(): - assert propagate_lineage() is PropagationScope.LINEAGE - - -def test_propagate_own_history_helper_returns_own_history_scope(): - assert propagate_own_history() is PropagationScope.OWN_HISTORY