From 2c27ff117f04b7787cefc5b833287afd979813eb Mon Sep 17 00:00:00 2001 From: Corran Webster Date: Mon, 4 Mar 2024 07:30:07 +0000 Subject: [PATCH 01/11] Add basic support for async dispatch --- .../traits_user_manual/notification.rst | 33 +++++++++ traits/observation/observe.py | 19 ++++- traits/observation/tests/test_observe.py | 73 +++++++++++++++++++ traits/tests/test_observe.py | 47 ++++++++++++ 4 files changed, 170 insertions(+), 2 deletions(-) diff --git a/docs/source/traits_user_manual/notification.rst b/docs/source/traits_user_manual/notification.rst index 13ca695df..e18a3d5b1 100644 --- a/docs/source/traits_user_manual/notification.rst +++ b/docs/source/traits_user_manual/notification.rst @@ -313,6 +313,39 @@ it is invoked. The following example shows the first option: :start-at: from traits.api +Async Notification Handlers +``````````````````````````` + +Since Traits 8.0 you can use an async coroutine as an observe handler, either +with an |@observe| decorator:: + + class AsyncExample(HasTraits): + value = Str() + + @observe('value') + async def _value_updated(self, event): + await asyncio.sleep(0) + print("value changed") + +or via the |HasTraits.observe| method:: + + async def async_observer(self, event): + await asyncio.sleep(0) + print("value changed") + + async_example = AsyncExample() + async_example.observe(async_observer, "value") + + +When a trait change event occurs which is observed by an async handler while +in an asyncio event loop, a task will be created to call the handler at a later +time. If the event loop is not running an exception will be raised. + +.. warning:: + + This is an experimental feature, and behavior may change in the future. + + Features and fixes provided by |@observe| ----------------------------------------- diff --git a/traits/observation/observe.py b/traits/observation/observe.py index dfab99761..020051aa2 100644 --- a/traits/observation/observe.py +++ b/traits/observation/observe.py @@ -8,23 +8,38 @@ # # Thanks for using Enthought open source! +import asyncio + from traits.observation._observe import add_or_remove_notifiers from traits.observation.expression import compile_expr +#: Set to hold references to active async traits handlers. +_active_handler_tasks = set() + def dispatch_same(handler, event): """ Dispatch an event handler on the same thread. + This dispatcher accepts both callables and async callables, the latter + being dispatched asynchronously via an async Task. Asynchronous dispatch + is only available when an async event loop is running; it will raise if + it cannot create an async Task. + Parameters ---------- - handler : callable(event) + handler : callable(event) or async callable(event) User-defined callable to handle change events. ``event`` is an object representing the change. Its type and content depends on the change. event : object The event object to be given to handler. """ - handler(event) + if asyncio.iscoroutinefunction(handler): + task = asyncio.create_task(handler(event)) + _active_handler_tasks.add(task) + task.add_done_callback(_active_handler_tasks.discard) + else: + handler(event) def observe( diff --git a/traits/observation/tests/test_observe.py b/traits/observation/tests/test_observe.py index 29f00b2c5..f84c7bb93 100644 --- a/traits/observation/tests/test_observe.py +++ b/traits/observation/tests/test_observe.py @@ -8,6 +8,8 @@ # # Thanks for using Enthought open source! +import asyncio +from contextlib import contextmanager import unittest from unittest import mock @@ -653,3 +655,74 @@ def test_apply_observers_different_target(self): # then # the handler should be called twice as the targets are different. self.assertEqual(handler.call_count, 2) + + +# ---- Low-level tests for async dispatch_same ------------------------------ + + +class TestAsyncDispatchSame(unittest.IsolatedAsyncioTestCase): + """Test low-level async dispatch.""" + + def setUp(self): + from traits.observation.observe import _active_handler_tasks + + # ensure no lingering references to handler tasks after test run + self.addCleanup(_active_handler_tasks.clear) + + push_exception_handler(reraise_exceptions=True) + self.addCleanup(pop_exception_handler) + + async def test_async_dispatch(self): + event = [] + + async def handler(event): + await asyncio.sleep(0) + event.append('called') + + dispatch_same(handler, event) + + await asyncio.sleep(0.1) + + self.assertEqual(event, ['called']) + + async def test_async_dispatch_error(self): + event = [] + exceptions = [] + + async def handler(event): + await asyncio.sleep(0) + raise Exception("Bad handler") + + def excption_handler(loop, context): + exceptions.append(context['exception'].args[0]) + + with self.asyncio_exception_handler(excption_handler): + dispatch_same(handler, event) + await asyncio.sleep(0.1) + + self.assertEqual(event, []) + self.assertEqual(exceptions, ["Bad handler"]) + + + def test_async_dispatch_no_loop(self): + event = [] + + async def handler(event): + await asyncio.sleep(0) + event.append('called') + + with self.assertWarns(RuntimeWarning): + with self.assertRaises(RuntimeError): + dispatch_same(handler, event) + + self.assertEqual(event, []) + + @contextmanager + def asyncio_exception_handler(self, exc_handler): + loop = asyncio.get_event_loop() + old_handler = loop.get_exception_handler() + loop.set_exception_handler(exc_handler) + try: + yield exc_handler + finally: + loop.set_exception_handler(old_handler) diff --git a/traits/tests/test_observe.py b/traits/tests/test_observe.py index bd0460f8f..435506670 100644 --- a/traits/tests/test_observe.py +++ b/traits/tests/test_observe.py @@ -12,6 +12,7 @@ See tests in ``traits.observations`` for more targeted tests. """ +import asyncio import unittest from traits.api import ( @@ -930,3 +931,49 @@ class A(HasTraits): self.assertEqual(event.index, 2) self.assertEqual(event.removed, [3]) self.assertEqual(event.added, [4]) + + +# Integration tests for async observe decorator ------------------------------- + + +class SimpleAsyncExample(HasTraits): + + value = Str() + + events = List() + + @observe('value') + async def value_changed_async(self, event): + await asyncio.sleep(0) + self.events.append(event) + + +class TestAsyncObserverDecorator(unittest.IsolatedAsyncioTestCase): + + def setUp(self): + from traits.observation.observe import _active_handler_tasks + + # ensure no lingering references to handler tasks after test run + self.addCleanup(_active_handler_tasks.clear) + + async def test_async_dispatch(self): + + obj = SimpleAsyncExample(value='initial') + + self.assertEqual(len(obj.events), 0) + + await asyncio.sleep(0.1) + + self.assertEqual(len(obj.events), 1) + self.assertEqual(obj.events[0].name, 'value') + self.assertEqual(obj.events[0].new, 'initial') + + obj.value = 'changed' + + self.assertEqual(len(obj.events), 1) + + await asyncio.sleep(0.1) + + self.assertEqual(len(obj.events), 2) + self.assertEqual(obj.events[1].name, 'value') + self.assertEqual(obj.events[1].new, 'changed') From d1879e96903b44facdfe3727158d541d39f2ec02 Mon Sep 17 00:00:00 2001 From: Corran Webster Date: Mon, 18 Mar 2024 08:41:43 +0000 Subject: [PATCH 02/11] Apply suggestions from code review Co-authored-by: Mark Dickinson --- docs/source/traits_user_manual/notification.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/source/traits_user_manual/notification.rst b/docs/source/traits_user_manual/notification.rst index e18a3d5b1..2d4f1de62 100644 --- a/docs/source/traits_user_manual/notification.rst +++ b/docs/source/traits_user_manual/notification.rst @@ -316,7 +316,7 @@ it is invoked. The following example shows the first option: Async Notification Handlers ``````````````````````````` -Since Traits 8.0 you can use an async coroutine as an observe handler, either +Since Traits 7.0 you can use an async coroutine as an observe handler, either with an |@observe| decorator:: class AsyncExample(HasTraits): From e886ff77d641443c9e433066661084458aa66393 Mon Sep 17 00:00:00 2001 From: Corran Webster Date: Mon, 18 Mar 2024 09:11:11 +0000 Subject: [PATCH 03/11] Use inspect.iscoroutinefunction --- traits/observation/observe.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/traits/observation/observe.py b/traits/observation/observe.py index 020051aa2..eeb3a8e75 100644 --- a/traits/observation/observe.py +++ b/traits/observation/observe.py @@ -9,6 +9,7 @@ # Thanks for using Enthought open source! import asyncio +import inspect from traits.observation._observe import add_or_remove_notifiers from traits.observation.expression import compile_expr @@ -34,7 +35,7 @@ def dispatch_same(handler, event): event : object The event object to be given to handler. """ - if asyncio.iscoroutinefunction(handler): + if inspect.iscoroutinefunction(handler): task = asyncio.create_task(handler(event)) _active_handler_tasks.add(task) task.add_done_callback(_active_handler_tasks.discard) From 1358645998cbd362728abe0b4948d5709e29d2b5 Mon Sep 17 00:00:00 2001 From: Corran Webster Date: Mon, 18 Mar 2024 09:43:05 +0000 Subject: [PATCH 04/11] Refactor tests to use Events, remove asyncio.sleep() --- traits/observation/tests/test_observe.py | 47 ++++++++++++------------ traits/tests/test_observe.py | 22 ++++++----- 2 files changed, 37 insertions(+), 32 deletions(-) diff --git a/traits/observation/tests/test_observe.py b/traits/observation/tests/test_observe.py index f84c7bb93..fd3b9e62c 100644 --- a/traits/observation/tests/test_observe.py +++ b/traits/observation/tests/test_observe.py @@ -673,49 +673,50 @@ def setUp(self): self.addCleanup(pop_exception_handler) async def test_async_dispatch(self): - event = [] + + async def test_async_dispatch(self): + event = asyncio.Event() async def handler(event): - await asyncio.sleep(0) - event.append('called') - - dispatch_same(handler, event) + event.set() - await asyncio.sleep(0.1) + async with asyncio.timeout(10): + dispatch_same(handler, event) + await event.wait() - self.assertEqual(event, ['called']) + self.assertTrue(event.is_set()) async def test_async_dispatch_error(self): - event = [] + event = asyncio.Event() exceptions = [] async def handler(event): - await asyncio.sleep(0) raise Exception("Bad handler") - - def excption_handler(loop, context): - exceptions.append(context['exception'].args[0]) + event.set() - with self.asyncio_exception_handler(excption_handler): - dispatch_same(handler, event) - await asyncio.sleep(0.1) + def exception_handler(loop, context): + exceptions.append(context["exception"].args[0]) - self.assertEqual(event, []) - self.assertEqual(exceptions, ["Bad handler"]) + with self.asyncio_exception_handler(exception_handler): + async with asyncio.timeout(0.1): + dispatch_same(handler, event) + await event.wait() + self.assertFalse(event.is_set()) + self.assertEqual(exceptions, ["Bad handler"]) def test_async_dispatch_no_loop(self): - event = [] + event = asyncio.Event() async def handler(event): - await asyncio.sleep(0) - event.append('called') - + event.set() + with self.assertWarns(RuntimeWarning): with self.assertRaises(RuntimeError): - dispatch_same(handler, event) + async with asyncio.timeout(0.1): + dispatch_same(handler, event) - self.assertEqual(event, []) + self.assertFalse(event.is_set()) @contextmanager def asyncio_exception_handler(self, exc_handler): diff --git a/traits/tests/test_observe.py b/traits/tests/test_observe.py index 435506670..eaba0f593 100644 --- a/traits/tests/test_observe.py +++ b/traits/tests/test_observe.py @@ -941,11 +941,13 @@ class SimpleAsyncExample(HasTraits): value = Str() events = List() + + event = Instance(asyncio.Event)) @observe('value') async def value_changed_async(self, event): - await asyncio.sleep(0) self.events.append(event) + self.event.set() class TestAsyncObserverDecorator(unittest.IsolatedAsyncioTestCase): @@ -957,22 +959,24 @@ def setUp(self): self.addCleanup(_active_handler_tasks.clear) async def test_async_dispatch(self): + event = Event() - obj = SimpleAsyncExample(value='initial') + obj = SimpleAsyncExample(value='initial', event=event) - self.assertEqual(len(obj.events), 0) - - await asyncio.sleep(0.1) + async with asyncio.timeout(10): + self.assertEqual(len(obj.events), 0) + await event.wait() self.assertEqual(len(obj.events), 1) self.assertEqual(obj.events[0].name, 'value') self.assertEqual(obj.events[0].new, 'initial') - obj.value = 'changed' - - self.assertEqual(len(obj.events), 1) + event.reset() - await asyncio.sleep(0.1) + async with asyncio.timeout(10): + obj.value = 'changed' + self.assertEqual(len(obj.events), 1) + await event.wait() self.assertEqual(len(obj.events), 2) self.assertEqual(obj.events[1].name, 'value') From 8861b6a3ee71c58fd667146375e57ea82b24e22e Mon Sep 17 00:00:00 2001 From: Corran Webster Date: Mon, 18 Mar 2024 09:46:07 +0000 Subject: [PATCH 05/11] Fix typos. --- traits/observation/tests/test_observe.py | 2 -- traits/tests/test_observe.py | 2 +- 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/traits/observation/tests/test_observe.py b/traits/observation/tests/test_observe.py index fd3b9e62c..5065c4b5e 100644 --- a/traits/observation/tests/test_observe.py +++ b/traits/observation/tests/test_observe.py @@ -672,8 +672,6 @@ def setUp(self): push_exception_handler(reraise_exceptions=True) self.addCleanup(pop_exception_handler) - async def test_async_dispatch(self): - async def test_async_dispatch(self): event = asyncio.Event() diff --git a/traits/tests/test_observe.py b/traits/tests/test_observe.py index eaba0f593..a113b1aee 100644 --- a/traits/tests/test_observe.py +++ b/traits/tests/test_observe.py @@ -942,7 +942,7 @@ class SimpleAsyncExample(HasTraits): events = List() - event = Instance(asyncio.Event)) + event = Instance(asyncio.Event) @observe('value') async def value_changed_async(self, event): From 1df0bfca99be5ea2bf0916dfbb1223ffa51916c1 Mon Sep 17 00:00:00 2001 From: Corran Webster Date: Mon, 18 Mar 2024 09:47:29 +0000 Subject: [PATCH 06/11] Remove excess whitespace --- traits/tests/test_observe.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/traits/tests/test_observe.py b/traits/tests/test_observe.py index a113b1aee..2333a2481 100644 --- a/traits/tests/test_observe.py +++ b/traits/tests/test_observe.py @@ -941,7 +941,7 @@ class SimpleAsyncExample(HasTraits): value = Str() events = List() - + event = Instance(asyncio.Event) @observe('value') From cf8c39daea76cb11fb7b319b4179a1bc57bb6ee9 Mon Sep 17 00:00:00 2001 From: Corran Webster Date: Mon, 18 Mar 2024 09:50:18 +0000 Subject: [PATCH 07/11] Fix tests. --- traits/observation/tests/test_observe.py | 3 +-- traits/tests/test_observe.py | 2 +- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/traits/observation/tests/test_observe.py b/traits/observation/tests/test_observe.py index 5065c4b5e..862ae688d 100644 --- a/traits/observation/tests/test_observe.py +++ b/traits/observation/tests/test_observe.py @@ -711,8 +711,7 @@ async def handler(event): with self.assertWarns(RuntimeWarning): with self.assertRaises(RuntimeError): - async with asyncio.timeout(0.1): - dispatch_same(handler, event) + dispatch_same(handler, event) self.assertFalse(event.is_set()) diff --git a/traits/tests/test_observe.py b/traits/tests/test_observe.py index 2333a2481..290d989d5 100644 --- a/traits/tests/test_observe.py +++ b/traits/tests/test_observe.py @@ -959,7 +959,7 @@ def setUp(self): self.addCleanup(_active_handler_tasks.clear) async def test_async_dispatch(self): - event = Event() + event = asyncio.Event() obj = SimpleAsyncExample(value='initial', event=event) From 4053887b9801400d323d1573d198385aa704c6ef Mon Sep 17 00:00:00 2001 From: Corran Webster Date: Mon, 18 Mar 2024 10:01:44 +0000 Subject: [PATCH 08/11] Don't use asyncio.timeout() --- traits/observation/tests/test_observe.py | 10 +++++----- traits/tests/test_observe.py | 17 +++++++++-------- 2 files changed, 14 insertions(+), 13 deletions(-) diff --git a/traits/observation/tests/test_observe.py b/traits/observation/tests/test_observe.py index 862ae688d..65e9a97b2 100644 --- a/traits/observation/tests/test_observe.py +++ b/traits/observation/tests/test_observe.py @@ -678,9 +678,9 @@ async def test_async_dispatch(self): async def handler(event): event.set() - async with asyncio.timeout(10): - dispatch_same(handler, event) - await event.wait() + dispatch_same(handler, event) + + await asyncio.wait_for(event.wait(), timeout=10) self.assertTrue(event.is_set()) @@ -696,9 +696,9 @@ def exception_handler(loop, context): exceptions.append(context["exception"].args[0]) with self.asyncio_exception_handler(exception_handler): - async with asyncio.timeout(0.1): + with self.assertRaises(TimeoutError): dispatch_same(handler, event) - await event.wait() + await asyncio.wait_for(event.wait(), timeout=0.1) self.assertFalse(event.is_set()) self.assertEqual(exceptions, ["Bad handler"]) diff --git a/traits/tests/test_observe.py b/traits/tests/test_observe.py index 290d989d5..43154f3f5 100644 --- a/traits/tests/test_observe.py +++ b/traits/tests/test_observe.py @@ -963,20 +963,21 @@ async def test_async_dispatch(self): obj = SimpleAsyncExample(value='initial', event=event) - async with asyncio.timeout(10): - self.assertEqual(len(obj.events), 0) - await event.wait() + self.assertEqual(len(obj.events), 0) + + await asyncio.wait_for(event.wait(), timeout=10) self.assertEqual(len(obj.events), 1) self.assertEqual(obj.events[0].name, 'value') self.assertEqual(obj.events[0].new, 'initial') - event.reset() + event.clear() + + obj.value = 'changed' + + self.assertEqual(len(obj.events), 1) - async with asyncio.timeout(10): - obj.value = 'changed' - self.assertEqual(len(obj.events), 1) - await event.wait() + await asyncio.wait_for(event.wait(), timeout=10) self.assertEqual(len(obj.events), 2) self.assertEqual(obj.events[1].name, 'value') From 940b3717278b7401817dc26c3e00e61a3d6158ee Mon Sep 17 00:00:00 2001 From: Corran Webster Date: Mon, 18 Mar 2024 10:32:47 +0000 Subject: [PATCH 09/11] Import exception object correctly. --- traits/observation/tests/test_observe.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/traits/observation/tests/test_observe.py b/traits/observation/tests/test_observe.py index 65e9a97b2..0c0f80700 100644 --- a/traits/observation/tests/test_observe.py +++ b/traits/observation/tests/test_observe.py @@ -696,7 +696,7 @@ def exception_handler(loop, context): exceptions.append(context["exception"].args[0]) with self.asyncio_exception_handler(exception_handler): - with self.assertRaises(TimeoutError): + with self.assertRaises(asyncio.exceptions.TimeoutError): dispatch_same(handler, event) await asyncio.wait_for(event.wait(), timeout=0.1) From 9148d03443963b9358dc692fab5f877e403c721f Mon Sep 17 00:00:00 2001 From: Corran Webster Date: Mon, 18 Mar 2024 11:07:40 +0000 Subject: [PATCH 10/11] Have an async handler do async things during at least one test. --- traits/tests/test_observe.py | 26 ++++++++++++++++++++------ 1 file changed, 20 insertions(+), 6 deletions(-) diff --git a/traits/tests/test_observe.py b/traits/tests/test_observe.py index 43154f3f5..266f5daa1 100644 --- a/traits/tests/test_observe.py +++ b/traits/tests/test_observe.py @@ -944,9 +944,12 @@ class SimpleAsyncExample(HasTraits): event = Instance(asyncio.Event) + queue = Instance(asyncio.Queue) + @observe('value') async def value_changed_async(self, event): - self.events.append(event) + queue_value = await self.queue.get() + self.events.append((event, queue_value)) self.event.set() @@ -960,16 +963,22 @@ def setUp(self): async def test_async_dispatch(self): event = asyncio.Event() + queue = asyncio.Queue() - obj = SimpleAsyncExample(value='initial', event=event) + obj = SimpleAsyncExample(value='initial', event=event, queue=queue) self.assertEqual(len(obj.events), 0) + task = asyncio.create_task(queue.put("first")) + await asyncio.wait_for(event.wait(), timeout=10) + self.assertTrue(task.done()) self.assertEqual(len(obj.events), 1) - self.assertEqual(obj.events[0].name, 'value') - self.assertEqual(obj.events[0].new, 'initial') + trait_event, queue_value = obj.events[0] + self.assertEqual(trait_event.name, 'value') + self.assertEqual(trait_event.new, 'initial') + self.assertEqual(queue_value, 'first') event.clear() @@ -977,8 +986,13 @@ async def test_async_dispatch(self): self.assertEqual(len(obj.events), 1) + task = asyncio.create_task(queue.put("second")) + await asyncio.wait_for(event.wait(), timeout=10) + self.assertTrue(task.done()) self.assertEqual(len(obj.events), 2) - self.assertEqual(obj.events[1].name, 'value') - self.assertEqual(obj.events[1].new, 'changed') + trait_event, queue_value = obj.events[1] + self.assertEqual(trait_event.name, 'value') + self.assertEqual(trait_event.new, 'changed') + self.assertEqual(queue_value, 'second') From cc9127caa6588095f48162afd61f9c4523b72dca Mon Sep 17 00:00:00 2001 From: Mark Dickinson Date: Tue, 26 Mar 2024 09:56:44 +0000 Subject: [PATCH 11/11] Adjust wait condition in test --- traits/observation/tests/test_observe.py | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/traits/observation/tests/test_observe.py b/traits/observation/tests/test_observe.py index 0c0f80700..436987965 100644 --- a/traits/observation/tests/test_observe.py +++ b/traits/observation/tests/test_observe.py @@ -690,17 +690,15 @@ async def test_async_dispatch_error(self): async def handler(event): raise Exception("Bad handler") - event.set() def exception_handler(loop, context): exceptions.append(context["exception"].args[0]) + event.set() with self.asyncio_exception_handler(exception_handler): - with self.assertRaises(asyncio.exceptions.TimeoutError): - dispatch_same(handler, event) - await asyncio.wait_for(event.wait(), timeout=0.1) + dispatch_same(handler, event) + await asyncio.wait_for(event.wait(), timeout=10.0) - self.assertFalse(event.is_set()) self.assertEqual(exceptions, ["Bad handler"]) def test_async_dispatch_no_loop(self):