diff --git a/temporalio/worker/_workflow_instance.py b/temporalio/worker/_workflow_instance.py index e4f69b7f..a5e5b63e 100644 --- a/temporalio/worker/_workflow_instance.py +++ b/temporalio/worker/_workflow_instance.py @@ -2778,7 +2778,7 @@ def _make_unfinished_update_handler_message( handler_executions: List[HandlerExecution], ) -> str: message = """ -Workflow finished while update handlers are still running. This may have interrupted work that the +[TMPRL1102] Workflow finished while update handlers are still running. This may have interrupted work that the update handler was doing, and the client that sent the update will receive a 'workflow execution already completed' RPCError instead of the update result. You can wait for all update and signal handlers to complete by using `await workflow.wait_condition(lambda: @@ -2797,12 +2797,12 @@ def _make_unfinished_signal_handler_message( handler_executions: List[HandlerExecution], ) -> str: message = """ -Workflow finished while signal handlers are still running. This may have interrupted work that the +[TMPRL1102] Workflow finished while signal handlers are still running. This may have interrupted work that the signal handler was doing. You can wait for all update and signal handlers to complete by using `await workflow.wait_condition(lambda: workflow.all_handlers_finished())`. Alternatively, if both you and the clients sending the signal are okay with interrupting running handlers when the workflow -finishes, and causing clients to receive errors, then you can disable this warning via the signal -handler decorator: `@workflow.signal(unfinished_policy=workflow.HandlerUnfinishedPolicy.ABANDON)`. +finishes, then you can disable this warning via the signal handler decorator: +`@workflow.signal(unfinished_policy=workflow.HandlerUnfinishedPolicy.ABANDON)`. """.replace("\n", " ").strip() names = collections.Counter(ex.name for ex in handler_executions) return ( diff --git a/tests/worker/test_workflow.py b/tests/worker/test_workflow.py index 907d30ce..d922bafa 100644 --- a/tests/worker/test_workflow.py +++ b/tests/worker/test_workflow.py @@ -5222,7 +5222,7 @@ async def test_workflow_current_update(client: Client, env: WorkflowEnvironment) @workflow.defn -class UnfinishedHandlersWorkflow: +class UnfinishedHandlersWarningsWorkflow: def __init__(self): self.started_handler = False self.handler_may_return = False @@ -5275,21 +5275,21 @@ async def test_unfinished_update_handler(client: Client, env: WorkflowEnvironmen pytest.skip( "Java test server: https://github.com/temporalio/sdk-java/issues/1903" ) - async with new_worker(client, UnfinishedHandlersWorkflow) as worker: - test = _UnfinishedHandlersTest(client, worker, "update") + async with new_worker(client, UnfinishedHandlersWarningsWorkflow) as worker: + test = _UnfinishedHandlersWarningsTest(client, worker, "update") await test.test_wait_all_handlers_finished_and_unfinished_handlers_warning() await test.test_unfinished_handlers_cause_exceptions_in_test_suite() async def test_unfinished_signal_handler(client: Client): - async with new_worker(client, UnfinishedHandlersWorkflow) as worker: - test = _UnfinishedHandlersTest(client, worker, "signal") + async with new_worker(client, UnfinishedHandlersWarningsWorkflow) as worker: + test = _UnfinishedHandlersWarningsTest(client, worker, "signal") await test.test_wait_all_handlers_finished_and_unfinished_handlers_warning() await test.test_unfinished_handlers_cause_exceptions_in_test_suite() @dataclass -class _UnfinishedHandlersTest: +class _UnfinishedHandlersWarningsTest: client: Client worker: Worker handler_type: Literal["update", "signal"] @@ -5343,7 +5343,7 @@ async def _workflow_task_failed(self, workflow_id: str) -> bool: for event in reversed(resp.history.events): if event.event_type == EventType.EVENT_TYPE_WORKFLOW_TASK_FAILED: assert event.workflow_task_failed_event_attributes.failure.message.startswith( - f"Workflow finished while {self.handler_type} handlers are still running" + f"[TMPRL1102] Workflow finished while {self.handler_type} handlers are still running" ) return True return False @@ -5370,7 +5370,7 @@ async def _get_workflow_result( handle_future: Optional[asyncio.Future[WorkflowHandle]] = None, ) -> bool: handle = await self.client.start_workflow( - UnfinishedHandlersWorkflow.run, + UnfinishedHandlersWarningsWorkflow.run, arg=wait_all_handlers_finished, id=f"wf-{uuid.uuid4()}", task_queue=self.worker.task_queue, @@ -5404,89 +5404,157 @@ def _unfinished_handler_warning_cls(self) -> Type: @workflow.defn -class UnfinishedHandlersWithCancellationOrFailureWorkflow: +class UnfinishedHandlersOnWorkflowTerminationWorkflow: + def __init__(self) -> None: + self.handlers_may_finish = False + @workflow.run async def run( - self, workflow_termination_type: Literal["cancellation", "failure"] + self, + workflow_termination_type: Literal[ + "-cancellation-", + "-failure-", + "-continue-as-new-", + "-fail-post-continue-as-new-run-", + ], + handler_registration: Literal["-late-registered-", "-not-late-registered-"], + handler_dynamism: Literal["-dynamic-", "-not-dynamic-"], + handler_waiting: Literal[ + "-wait-all-handlers-finish-", "-no-wait-all-handlers-finish-" + ], ) -> NoReturn: - if workflow_termination_type == "failure": + if handler_registration == "-late-registered-": + if handler_dynamism == "-dynamic-": + + async def my_late_registered_dynamic_update( + self, name: str, args: Sequence[RawValue] + ) -> str: + await workflow.wait_condition(lambda: self.handlers_may_finish) + return "my-late-registered-dynamic-update-result" + + async def my_late_registered_dynamic_signal( + self, name: str, args: Sequence[RawValue] + ) -> None: + await workflow.wait_condition(lambda: self.handlers_may_finish) + + workflow.set_dynamic_update_handler(my_late_registered_dynamic_update) + workflow.set_dynamic_signal_handler(my_late_registered_dynamic_signal) + else: + + async def my_late_registered_update(self) -> str: + await workflow.wait_condition(lambda: self.handlers_may_finish) + return "my-late-registered-update-result" + + async def my_late_registered_signal(self) -> None: + await workflow.wait_condition(lambda: self.handlers_may_finish) + + workflow.set_update_handler( + "my_late_registered_update", my_late_registered_update + ) + workflow.set_signal_handler( + "my_late_registered_signal", my_late_registered_signal + ) + + if handler_waiting == "-wait-all-handlers-finish-": + self.handlers_may_finish = True + await workflow.wait_condition(workflow.all_handlers_finished) + if workflow_termination_type == "-failure-": raise ApplicationError( "Deliberately failing workflow with an unfinished handler" ) - await workflow.wait_condition(lambda: False) - raise AssertionError("unreachable") + elif workflow_termination_type == "-fail-post-continue-as-new-run-": + raise ApplicationError("Deliberately failing post-ContinueAsNew run") + elif workflow_termination_type == "-continue-as-new-": + # Fail next run so that test terminates + workflow.continue_as_new( + args=[ + "-fail-post-continue-as-new-run-", + handler_registration, + handler_dynamism, + handler_waiting, + ] + ) + else: + await workflow.wait_condition(lambda: False) + raise AssertionError("unreachable") @workflow.update - async def my_update(self) -> NoReturn: - await workflow.wait_condition(lambda: False) - raise AssertionError("unreachable") + async def my_update(self) -> str: + await workflow.wait_condition(lambda: self.handlers_may_finish) + return "update-result" @workflow.signal - async def my_signal(self) -> NoReturn: - await workflow.wait_condition(lambda: False) - raise AssertionError("unreachable") - - -async def test_unfinished_update_handler_with_workflow_cancellation( - client: Client, env: WorkflowEnvironment -): - if env.supports_time_skipping: - pytest.skip( - "Java test server: https://github.com/temporalio/sdk-java/issues/1903" - ) - await _UnfinishedHandlersWithCancellationOrFailureTest( - client, - "update", - "cancellation", - ).test_warning_is_issued_when_cancellation_or_failure_causes_exit_with_unfinished_handler() + async def my_signal(self) -> None: + await workflow.wait_condition(lambda: self.handlers_may_finish) + @workflow.update(dynamic=True) + async def my_dynamic_update(self, name: str, args: Sequence[RawValue]) -> str: + await workflow.wait_condition(lambda: self.handlers_may_finish) + return "my-dynamic-update-result" -async def test_unfinished_signal_handler_with_workflow_cancellation(client: Client): - await _UnfinishedHandlersWithCancellationOrFailureTest( - client, - "signal", - "cancellation", - ).test_warning_is_issued_when_cancellation_or_failure_causes_exit_with_unfinished_handler() + @workflow.signal(dynamic=True) + async def my_dynamic_signal(self, name: str, args: Sequence[RawValue]) -> None: + await workflow.wait_condition(lambda: self.handlers_may_finish) -async def test_unfinished_update_handler_with_workflow_failure( - client: Client, env: WorkflowEnvironment +@pytest.mark.parametrize("handler_type", ["-signal-", "-update-"]) +@pytest.mark.parametrize( + "handler_registration", ["-late-registered-", "-not-late-registered-"] +) +@pytest.mark.parametrize("handler_dynamism", ["-dynamic-", "-not-dynamic-"]) +@pytest.mark.parametrize( + "handler_waiting", + ["-wait-all-handlers-finish-", "-no-wait-all-handlers-finish-"], +) +@pytest.mark.parametrize( + "workflow_termination_type", ["-cancellation-", "-failure-", "-continue-as-new-"] +) +async def test_unfinished_handler_on_workflow_termination( + client: Client, + env: WorkflowEnvironment, + handler_type: Literal["-signal-", "-update-"], + handler_registration: Literal["-late-registered-", "-not-late-registered-"], + handler_dynamism: Literal["-dynamic-", "-not-dynamic-"], + handler_waiting: Literal[ + "-wait-all-handlers-finish-", "-no-wait-all-handlers-finish-" + ], + workflow_termination_type: Literal[ + "-cancellation-", "-failure-", "-continue-as-new-" + ], ): - if env.supports_time_skipping: + if handler_type == "-update-" and env.supports_time_skipping: pytest.skip( "Java test server: https://github.com/temporalio/sdk-java/issues/1903" ) - await _UnfinishedHandlersWithCancellationOrFailureTest( + await _UnfinishedHandlersOnWorkflowTerminationTest( client, - "update", - "failure", - ).test_warning_is_issued_when_cancellation_or_failure_causes_exit_with_unfinished_handler() - - -async def test_unfinished_signal_handler_with_workflow_failure( - client: Client, env: WorkflowEnvironment -): - if env.supports_time_skipping: - pytest.skip( - "Java test server: https://github.com/temporalio/sdk-java/issues/2127" - ) - await _UnfinishedHandlersWithCancellationOrFailureTest( - client, - "signal", - "failure", - ).test_warning_is_issued_when_cancellation_or_failure_causes_exit_with_unfinished_handler() + handler_type, + workflow_termination_type, + handler_registration, + handler_dynamism, + handler_waiting, + ).test_warning_is_issued_on_exit_with_unfinished_handler() @dataclass -class _UnfinishedHandlersWithCancellationOrFailureTest: +class _UnfinishedHandlersOnWorkflowTerminationTest: client: Client - handler_type: Literal["update", "signal"] - workflow_termination_type: Literal["cancellation", "failure"] - - async def test_warning_is_issued_when_cancellation_or_failure_causes_exit_with_unfinished_handler( + handler_type: Literal["-signal-", "-update-"] + workflow_termination_type: Literal[ + "-cancellation-", "-failure-", "-continue-as-new-" + ] + handler_registration: Literal["-late-registered-", "-not-late-registered-"] + handler_dynamism: Literal["-dynamic-", "-not-dynamic-"] + handler_waiting: Literal[ + "-wait-all-handlers-finish-", "-no-wait-all-handlers-finish-" + ] + + async def test_warning_is_issued_on_exit_with_unfinished_handler( self, ): - assert await self._run_workflow_and_get_warning() + assert await self._run_workflow_and_get_warning() == ( + self.handler_waiting == "-no-wait-all-handlers-finish-" + ) async def _run_workflow_and_get_warning(self) -> bool: workflow_id = f"wf-{uuid.uuid4()}" @@ -5497,18 +5565,30 @@ async def _run_workflow_and_get_warning(self) -> bool: # in the same WFT. To do this we start the worker after they've all been accepted by the # server. handle = await self.client.start_workflow( - UnfinishedHandlersWithCancellationOrFailureWorkflow.run, - self.workflow_termination_type, + UnfinishedHandlersOnWorkflowTerminationWorkflow.run, + args=[ + self.workflow_termination_type, + self.handler_registration, + self.handler_dynamism, + self.handler_waiting, + ], id=workflow_id, task_queue=task_queue, ) - if self.workflow_termination_type == "cancellation": + if self.workflow_termination_type == "-cancellation-": await handle.cancel() - if self.handler_type == "update": + if self.handler_type == "-update-": + update_method = ( + "__does_not_exist__" + if self.handler_dynamism == "-dynamic-" + else "my_late_registered_update" + if self.handler_registration == "-late-registered-" + else UnfinishedHandlersOnWorkflowTerminationWorkflow.my_update + ) update_task = asyncio.create_task( handle.execute_update( - UnfinishedHandlersWithCancellationOrFailureWorkflow.my_update, + update_method, # type: ignore id=update_id, ) ) @@ -5517,33 +5597,48 @@ async def _run_workflow_and_get_warning(self) -> bool: lambda: workflow_update_exists(self.client, workflow_id, update_id), ) else: - await handle.signal( - UnfinishedHandlersWithCancellationOrFailureWorkflow.my_signal + signal_method = ( + "__does_not_exist__" + if self.handler_dynamism == "-dynamic-" + else "my_late_registered_signal" + if self.handler_registration == "-late-registered-" + else UnfinishedHandlersOnWorkflowTerminationWorkflow.my_signal ) + await handle.signal(signal_method) # type: ignore async with new_worker( self.client, - UnfinishedHandlersWithCancellationOrFailureWorkflow, + UnfinishedHandlersOnWorkflowTerminationWorkflow, task_queue=task_queue, ): with pytest.WarningsRecorder() as warnings: - if self.handler_type == "update": + if self.handler_type == "-update-": assert update_task - with pytest.raises(RPCError) as update_err: + if self.handler_waiting == "-wait-all-handlers-finish-": await update_task - assert update_err.value.status == RPCStatusCode.NOT_FOUND and ( - str(update_err.value).lower() - == "workflow execution already completed" - ) + else: + with pytest.raises(RPCError) as update_err: + await update_task + assert update_err.value.status == RPCStatusCode.NOT_FOUND and ( + str(update_err.value).lower() + == "workflow execution already completed" + ) with pytest.raises(WorkflowFailureError) as err: await handle.result() assert isinstance( err.value.cause, - {"cancellation": CancelledError, "failure": ApplicationError}[ - self.workflow_termination_type - ], + { + "-cancellation-": CancelledError, + "-continue-as-new-": ApplicationError, + "-failure-": ApplicationError, + }[self.workflow_termination_type], ) + if self.workflow_termination_type == "-continue-as-new-": + assert ( + str(err.value.cause) + == "Deliberately failing post-ContinueAsNew run" + ) unfinished_handler_warning_emitted = any( issubclass(w.category, self._unfinished_handler_warning_cls) @@ -5554,8 +5649,8 @@ async def _run_workflow_and_get_warning(self) -> bool: @property def _unfinished_handler_warning_cls(self) -> Type: return { - "update": workflow.UnfinishedUpdateHandlersWarning, - "signal": workflow.UnfinishedSignalHandlersWarning, + "-update-": workflow.UnfinishedUpdateHandlersWarning, + "-signal-": workflow.UnfinishedSignalHandlersWarning, }[self.handler_type]