diff --git a/Cargo.toml b/Cargo.toml index 06a852a..715268d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "pyo3-asyncio" description = "PyO3 utilities for Python's Asyncio library" -version = "0.13.5" +version = "0.14.0" authors = ["Andrew J Westlake "] readme = "README.md" keywords = ["pyo3", "python", "ffi", "async", "asyncio"] @@ -89,8 +89,11 @@ futures = "0.3" inventory = "0.1" lazy_static = "1.4" once_cell = "1.5" -pyo3 = "0.13" -pyo3-asyncio-macros = { path = "pyo3-asyncio-macros", version = "=0.13.5", optional = true } +pyo3 = "0.14" +pyo3-asyncio-macros = { path = "pyo3-asyncio-macros", version = "=0.14.0", optional = true } + +[dev-dependencies] +pyo3 = { version = "0.14", features = ["macros"] } [dependencies.async-std] version = "1.9" diff --git a/README.md b/README.md index dfc67ec..afca8a3 100644 --- a/README.md +++ b/README.md @@ -17,21 +17,36 @@ > PyO3 Asyncio is a _brand new_ part of the broader PyO3 ecosystem. Feel free to open any issues for feature requests or bugfixes for this crate. -## Known Problems +__If you're a new-comer, the best way to get started is to read through the primer below! For `v0.13` users I highly recommend reading through the [migration section](#migrating-from-013-to-014) to get a general idea of what's changed in `v0.14`.__ -This library can give spurious failures during finalization prior to PyO3 release `v0.13.2`. Make sure your PyO3 dependency is up-to-date! +## PyO3 Asyncio Primer + +If you are working with a Python library that makes use of async functions or wish to provide +Python bindings for an async Rust library, [`pyo3-asyncio`](https://github.com/awestlake87/pyo3-asyncio) +likely has the tools you need. It provides conversions between async functions in both Python and +Rust and was designed with first-class support for popular Rust runtimes such as +[`tokio`](https://tokio.rs/) and [`async-std`](https://async.rs/). In addition, all async Python +code runs on the default `asyncio` event loop, so `pyo3-asyncio` should work just fine with existing +Python libraries. + +In the following sections, we'll give a general overview of `pyo3-asyncio` explaining how to call +async Python functions with PyO3, how to call async Rust functions from Python, and how to configure +your codebase to manage the runtimes of both. + +### Quickstart -## Quickstart +Here are some examples to get you started right away! A more detailed breakdown +of the concepts in these examples can be found in the following sections. -### Rust Applications +#### Rust Applications Here we initialize the runtime, import Python's `asyncio` library and run the given future to completion using Python's default `EventLoop` and `async-std`. Inside the future, we convert `asyncio` sleep into a Rust future and await it. ```toml # Cargo.toml dependencies [dependencies] -pyo3 = { version = "0.13" } -pyo3-asyncio = { version = "0.13", features = ["attributes", "async-std-runtime"] } +pyo3 = { version = "0.14" } +pyo3-asyncio = { version = "0.14", features = ["attributes", "async-std-runtime"] } async-std = "1.9" ``` @@ -44,9 +59,8 @@ use pyo3::prelude::*; async fn main() -> PyResult<()> { let fut = Python::with_gil(|py| { let asyncio = py.import("asyncio")?; - // convert asyncio.sleep into a Rust Future - pyo3_asyncio::into_future(asyncio.call_method1("sleep", (1.into_py(py),))?) + pyo3_asyncio::async_std::into_future(asyncio.call_method1("sleep", (1.into_py(py),))?) })?; fut.await?; @@ -61,8 +75,8 @@ attribute. ```toml # Cargo.toml dependencies [dependencies] -pyo3 = { version = "0.13" } -pyo3-asyncio = { version = "0.13", features = ["attributes", "tokio-runtime"] } +pyo3 = { version = "0.14" } +pyo3-asyncio = { version = "0.14", features = ["attributes", "tokio-runtime"] } tokio = "1.4" ``` @@ -75,9 +89,8 @@ use pyo3::prelude::*; async fn main() -> PyResult<()> { let fut = Python::with_gil(|py| { let asyncio = py.import("asyncio")?; - // convert asyncio.sleep into a Rust Future - pyo3_asyncio::into_future(asyncio.call_method1("sleep", (1.into_py(py),))?) + pyo3_asyncio::tokio::into_future(asyncio.call_method1("sleep", (1.into_py(py),))?) })?; fut.await?; @@ -86,9 +99,9 @@ async fn main() -> PyResult<()> { } ``` -More details on the usage of this library can be found in the [API docs](https://awestlake87.github.io/pyo3-asyncio/master/doc). +More details on the usage of this library can be found in the [API docs](https://awestlake87.github.io/pyo3-asyncio/master/doc) and the primer below. -### PyO3 Native Rust Modules +#### PyO3 Native Rust Modules PyO3 Asyncio can also be used to write native modules with async functions. @@ -105,16 +118,16 @@ Make your project depend on `pyo3` with the `extension-module` feature enabled a For `async-std`: ```toml [dependencies] -pyo3 = { version = "0.13", features = ["extension-module"] } -pyo3-asyncio = { version = "0.13", features = ["async-std-runtime"] } +pyo3 = { version = "0.14", features = ["extension-module"] } +pyo3-asyncio = { version = "0.14", features = ["async-std-runtime"] } async-std = "1.9" ``` For `tokio`: ```toml [dependencies] -pyo3 = { version = "0.13", features = ["extension-module"] } -pyo3-asyncio = { version = "0.13", features = ["tokio-runtime"] } +pyo3 = { version = "0.14", features = ["extension-module"] } +pyo3-asyncio = { version = "0.14", features = ["tokio-runtime"] } tokio = "1.4" ``` @@ -126,8 +139,8 @@ Export an async function that makes use of `async-std`: use pyo3::{prelude::*, wrap_pyfunction}; #[pyfunction] -fn rust_sleep(py: Python) -> PyResult { - pyo3_asyncio::async_std::into_coroutine(py, async { +fn rust_sleep(py: Python) -> PyResult<&PyAny> { + pyo3_asyncio::async_std::future_into_py(py, async { async_std::task::sleep(std::time::Duration::from_secs(1)).await; Ok(Python::with_gil(|py| py.None())) }) @@ -135,8 +148,6 @@ fn rust_sleep(py: Python) -> PyResult { #[pymodule] fn my_async_module(py: Python, m: &PyModule) -> PyResult<()> { - pyo3_asyncio::try_init(py)?; - m.add_function(wrap_pyfunction!(rust_sleep, m)?)?; Ok(()) @@ -152,8 +163,8 @@ If you want to use `tokio` instead, here's what your module should look like: use pyo3::{prelude::*, wrap_pyfunction}; #[pyfunction] -fn rust_sleep(py: Python) -> PyResult { - pyo3_asyncio::tokio::into_coroutine(py, async { +fn rust_sleep(py: Python) -> PyResult<&PyAny> { + pyo3_asyncio::tokio::future_into_py(py, async { tokio::time::sleep(std::time::Duration::from_secs(1)).await; Ok(Python::with_gil(|py| py.None())) }) @@ -161,44 +172,456 @@ fn rust_sleep(py: Python) -> PyResult { #[pymodule] fn my_async_module(py: Python, m: &PyModule) -> PyResult<()> { - pyo3_asyncio::try_init(py)?; - // Tokio needs explicit initialization before any pyo3-asyncio conversions. - // The module import is a prime place to do this. - pyo3_asyncio::tokio::init_multi_thread_once(); - m.add_function(wrap_pyfunction!(rust_sleep, m)?)?; + Ok(()) +} +``` + +You can build your module with maturin (see the [Using Rust in Python](https://pyo3.rs/main/#using-rust-from-python) section in the PyO3 guide for setup instructions). After that you should be able to run the Python REPL to try it out. + +```bash +maturin develop && python3 +🔗 Found pyo3 bindings +🐍 Found CPython 3.8 at python3 + Finished dev [unoptimized + debuginfo] target(s) in 0.04s +Python 3.8.5 (default, Jan 27 2021, 15:41:15) +[GCC 9.3.0] on linux +Type "help", "copyright", "credits" or "license" for more information. +>>> import asyncio +>>> +>>> from my_async_module import rust_sleep +>>> +>>> async def main(): +>>> await rust_sleep() +>>> +>>> # should sleep for 1s +>>> asyncio.run(main()) +>>> +``` + +### Awaiting an Async Python Function in Rust + +Let's take a look at a dead simple async Python function: + +```python +# Sleep for 1 second +async def py_sleep(): + await asyncio.sleep(1) +``` + +**Async functions in Python are simply functions that return a `coroutine` object**. For our purposes, +we really don't need to know much about these `coroutine` objects. The key factor here is that calling +an `async` function is _just like calling a regular function_, the only difference is that we have +to do something special with the object that it returns. + +Normally in Python, that something special is the `await` keyword, but in order to await this +coroutine in Rust, we first need to convert it into Rust's version of a `coroutine`: a `Future`. +That's where `pyo3-asyncio` comes in. +[`pyo3_asyncio::into_future`](https://docs.rs/pyo3-asyncio/latest/pyo3_asyncio/fn.into_future.html) +performs this conversion for us: + + +```rust no_run +use pyo3::prelude::*; + +#[pyo3_asyncio::tokio::main] +async fn main() -> PyResult<()> { + let future = Python::with_gil(|py| -> PyResult<_> { + // import the module containing the py_sleep function + let example = py.import("example")?; + + // calling the py_sleep method like a normal function + // returns a coroutine + let coroutine = example.call_method0("py_sleep")?; + + // convert the coroutine into a Rust future using the + // tokio runtime + pyo3_asyncio::tokio::into_future(coroutine) + })?; + + // await the future + future.await?; Ok(()) } +``` + +> If you're interested in learning more about `coroutines` and `awaitables` in general, check out the +> [Python 3 `asyncio` docs](https://docs.python.org/3/library/asyncio-task.html) for more information. + +### Awaiting a Rust Future in Python + +Here we have the same async function as before written in Rust using the +[`async-std`](https://async.rs/) runtime: + +```rust +/// Sleep for 1 second +async fn rust_sleep() { + async_std::task::sleep(std::time::Duration::from_secs(1)).await; +} +``` + +Similar to Python, Rust's async functions also return a special object called a +`Future`: + +```rust compile_fail +let future = rust_sleep(); +``` + +We can convert this `Future` object into Python to make it `awaitable`. This tells Python that you +can use the `await` keyword with it. In order to do this, we'll call +[`pyo3_asyncio::async_std::future_into_py`](https://docs.rs/pyo3-asyncio/latest/pyo3_asyncio/async_std/fn.future_into_py.html): + +```rust +use pyo3::prelude::*; + +async fn rust_sleep() { + async_std::task::sleep(std::time::Duration::from_secs(1)).await; +} + +#[pyfunction] +fn call_rust_sleep(py: Python) -> PyResult<&PyAny> { + pyo3_asyncio::async_std::future_into_py(py, async move { + rust_sleep().await; + Ok(Python::with_gil(|py| py.None())) + }) +} +``` +In Python, we can call this pyo3 function just like any other async function: + +```python +from example import call_rust_sleep + +async def rust_sleep(): + await call_rust_sleep() ``` -Build your module and rename `libmy_async_module.so` to `my_async_module.so` +## Managing Event Loops + +Python's event loop requires some special treatment, especially regarding the main thread. Some of +Python's `asyncio` features, like proper signal handling, require control over the main thread, which +doesn't always play well with Rust. + +Luckily, Rust's event loops are pretty flexible and don't _need_ control over the main thread, so in +`pyo3-asyncio`, we decided the best way to handle Rust/Python interop was to just surrender the main +thread to Python and run Rust's event loops in the background. Unfortunately, since most event loop +implementations _prefer_ control over the main thread, this can still make some things awkward. + +### PyO3 Asyncio Initialization + +Because Python needs to control the main thread, we can't use the convenient proc macros from Rust +runtimes to handle the `main` function or `#[test]` functions. Instead, the initialization for PyO3 has to be done from the `main` function and the main +thread must block on [`pyo3_asyncio::run_forever`](https://docs.rs/pyo3-asyncio/latest/pyo3_asyncio/fn.run_forever.html) or [`pyo3_asyncio::async_std::run_until_complete`](https://docs.rs/pyo3-asyncio/latest/pyo3_asyncio/async_std/fn.run_until_complete.html). + +Because we have to block on one of those functions, we can't use [`#[async_std::main]`](https://docs.rs/async-std/latest/async_std/attr.main.html) or [`#[tokio::main]`](https://docs.rs/tokio/1.1.0/tokio/attr.main.html) +since it's not a good idea to make long blocking calls during an async function. + +> Internally, these `#[main]` proc macros are expanded to something like this: +> ```rust compile_fail +> fn main() { +> // your async main fn +> async fn _main_impl() { /* ... */ } +> Runtime::new().block_on(_main_impl()); +> } +> ``` +> Making a long blocking call inside the `Future` that's being driven by `block_on` prevents that +> thread from doing anything else and can spell trouble for some runtimes (also this will actually +> deadlock a single-threaded runtime!). Many runtimes have some sort of `spawn_blocking` mechanism +> that can avoid this problem, but again that's not something we can use here since we need it to +> block on the _main_ thread. + +For this reason, `pyo3-asyncio` provides its own set of proc macros to provide you with this +initialization. These macros are intended to mirror the initialization of `async-std` and `tokio` +while also satisfying the Python runtime's needs. + +Here's a full example of PyO3 initialization with the `async-std` runtime: +```rust no_run +use pyo3::prelude::*; + +#[pyo3_asyncio::async_std::main] +async fn main() -> PyResult<()> { + // PyO3 is initialized - Ready to go + + let fut = Python::with_gil(|py| -> PyResult<_> { + let asyncio = py.import("asyncio")?; + + // convert asyncio.sleep into a Rust Future + pyo3_asyncio::async_std::into_future( + asyncio.call_method1("sleep", (1.into_py(py),))? + ) + })?; + + fut.await?; + + Ok(()) +} +``` + +#### A Note About `asyncio.run` + +In Python 3.7+, the recommended way to run a top-level coroutine with `asyncio` +is with `asyncio.run`. In `v0.13` we recommended against using this function due to initialization issues, but in `v0.14` it's perfectly valid to use this function... with a caveat. + +Since our Rust <--> Python conversions require a reference to the Python event loop, this poses a problem. Imagine we have a PyO3 Asyncio module that defines +a `rust_sleep` function like in previous examples. You might rightfully assume that you can call pass this directly into `asyncio.run` like this: + +```python +import asyncio + +from my_async_module import rust_sleep + +asyncio.run(rust_sleep()) +``` + +You might be surprised to find out that this throws an error: ```bash -cargo build --release && mv target/release/libmy_async_module.so target/release/my_async_module.so +Traceback (most recent call last): + File "example.py", line 5, in + asyncio.run(rust_sleep()) +RuntimeError: no running event loop +``` + +What's happening here is that we are calling `rust_sleep` _before_ the future is +actually running on the event loop created by `asyncio.run`. This is counter-intuitive, but expected behaviour, and unfortunately there doesn't seem to be a good way of solving this problem within PyO3 Asyncio itself. + +However, we can make this example work with a simple workaround: + +```python +import asyncio + +from my_async_module import rust_sleep + +# Calling main will just construct the coroutine that later calls rust_sleep. +# - This ensures that rust_sleep will be called when the event loop is running, +# not before. +async def main(): + await rust_sleep() + +# Run the main() coroutine at the top-level instead +asyncio.run(main()) +``` + +#### Non-standard Python Event Loops + +Python allows you to use alternatives to the default `asyncio` event loop. One +popular alternative is `uvloop`. In `v0.13` using non-standard event loops was +a bit of an ordeal, but in `v0.14` it's trivial. + +#### Using `uvloop` in a PyO3 Asyncio Native Extensions + +```toml +# Cargo.toml + +[lib] +name = "my_async_module" +crate-type = ["cdylib"] + +[dependencies] +pyo3 = { version = "0.14", features = ["extension-module"] } +pyo3-asyncio = { version = "0.14", features = ["tokio-runtime"] } +async-std = "1.9" +tokio = "1.4" ``` -Now, point your `PYTHONPATH` to the directory containing `my_async_module.so`, then you'll be able -to import and use it: +```rust +//! lib.rs + +use pyo3::{prelude::*, wrap_pyfunction}; + +#[pyfunction] +fn rust_sleep(py: Python) -> PyResult<&PyAny> { + pyo3_asyncio::tokio::future_into_py(py, async { + tokio::time::sleep(std::time::Duration::from_secs(1)).await; + Ok(Python::with_gil(|py| py.None())) + }) +} + +#[pymodule] +fn my_async_module(_py: Python, m: &PyModule) -> PyResult<()> { + m.add_function(wrap_pyfunction!(rust_sleep, m)?)?; + + Ok(()) +} +``` ```bash -$ PYTHONPATH=target/release python3 -Python 3.8.5 (default, Jan 27 2021, 15:41:15) -[GCC 9.3.0] on linux +$ maturin develop && python3 +🔗 Found pyo3 bindings +🐍 Found CPython 3.8 at python3 + Finished dev [unoptimized + debuginfo] target(s) in 0.04s +Python 3.8.8 (default, Apr 13 2021, 19:58:26) +[GCC 7.3.0] :: Anaconda, Inc. on linux Type "help", "copyright", "credits" or "license" for more information. >>> import asyncio ->>> from my_async_module import rust_sleep +>>> import uvloop >>> ->>> # should sleep for 1s ->>> asyncio.get_event_loop().run_until_complete(rust_sleep()) +>>> import my_async_module +>>> +>>> uvloop.install() +>>> +>>> async def main(): +... await my_async_module.rust_sleep() +... +>>> asyncio.run(main()) >>> ``` -> Note that we are using `EventLoop.run_until_complete` here instead of the newer `asyncio.run`. That is because `asyncio.run` will set up its own internal event loop that `pyo3_asyncio` will not be aware of. For this reason, running `pyo3_asyncio` conversions through `asyncio.run` is not currently supported. -> -> This restriction may be lifted in a future release. +#### Using `uvloop` in Rust Applications + +Using `uvloop` in Rust applications is a bit trickier, but it's still possible +with relatively few modifications. + +Unfortunately, we can't make use of the `#[pyo3_asyncio::::main]` attribute with non-standard event loops. This is because the `#[pyo3_asyncio::::main]` proc macro has to interact with the Python +event loop before we can install the `uvloop` policy. + +```toml +[dependencies] +async-std = "1.9" +pyo3 = "0.14" +pyo3-asyncio = { version = "0.14", features = ["async-std-runtime"] } +``` + +```rust +//! main.rs + +use pyo3::{prelude::*, types::PyType}; + +fn main() -> PyResult<()> { + pyo3::prepare_freethreaded_python(); + + Python::with_gil(|py| { + let uvloop = py.import("uvloop")?; + uvloop.call_method0("install")?; + + // store a reference for the assertion + let uvloop = PyObject::from(uvloop); + + pyo3_asyncio::async_std::run(py, async move { + // verify that we are on a uvloop.Loop + Python::with_gil(|py| -> PyResult<()> { + assert!(uvloop + .as_ref(py) + .getattr("Loop")? + .downcast::() + .unwrap() + .is_instance(pyo3_asyncio::async_std::get_current_loop(py)?)?); + Ok(()) + })?; + + async_std::task::sleep(std::time::Duration::from_secs(1)).await; + + Ok(()) + }) + }) +} +``` + +### Additional Information +- Managing event loop references can be tricky with pyo3-asyncio. See [Event Loop References](https://awestlake87.github.io/pyo3-asyncio/master/doc/pyo3_asyncio/#event-loop-references) in the API docs to get a better intuition for how event loop references are managed in this library. +- Testing pyo3-asyncio libraries and applications requires a custom test harness since Python requires control over the main thread. You can find a testing guide in the [API docs for the `testing` module](https://awestlake87.github.io/pyo3-asyncio/master/doc/pyo3_asyncio/testing) + +## Migrating from 0.13 to 0.14 + +So what's changed from `v0.13` to `v0.14`? + +Well, a lot actually. There were some pretty major flaws in the initialization behaviour of `v0.13`. While it would have been nicer to address these issues without changing the public API, I decided it'd be better to break some of the old API rather than completely change the underlying behaviour of the existing functions. I realize this is going to be a bit of a headache, so hopefully this section will help you through it. + +To make things a bit easier, I decided to keep most of the old API alongside the new one (with some deprecation warnings to encourage users to move away from it). It should be possible to use the `v0.13` API alongside the newer `v0.14` API, which should allow you to upgrade your application piecemeal rather than all at once. + +__Before you get started, I personally recommend taking a look at [Event Loop References and Thread-awareness](#event-loop-references-and-thread-awareness) in order to get a better grasp on the motivation behind these changes and the nuance involved in using the new conversions.__ + +### 0.14 Highlights +- Tokio initialization is now lazy. + - No configuration necessary if you're using the multithreaded scheduler + - Calls to `pyo3_asyncio::tokio::init_multithread` or `pyo3_asyncio::tokio::init_multithread_once` can just be removed. + - Calls to `pyo3_asyncio::tokio::init_current_thread` or `pyo3_asyncio::tokio::init_current_thread_once` require some special attention. + - Custom runtime configuration is done by passing a `tokio::runtime::Builder` into `pyo3_asyncio::tokio::init` instead of a `tokio::runtime::Runtime` +- A new, more correct set of functions has been added to replace the `v0.13` conversions. + - `pyo3_asyncio::into_future_with_loop` + - `pyo3_asyncio::::future_into_py_with_loop` + - `pyo3_asyncio::::local_future_into_py_with_loop` + - `pyo3_asyncio::::into_future` + - `pyo3_asyncio::::future_into_py` + - `pyo3_asyncio::::local_future_into_py` + - `pyo3_asyncio::::get_current_loop` +- `pyo3_asyncio::try_init` is no longer required if you're only using `0.14` conversions +- The `ThreadPoolExecutor` is no longer configured automatically at the start. + - Fortunately, this doesn't seem to have much effect on `v0.13` code, it just means that it's now possible to configure the executor manually as you see fit. + +### Upgrading Your Code to 0.14 + +1. Fix PyO3 0.14 initialization. + - PyO3 0.14 feature gated its automatic initialization behaviour behind "auto-initialize". You can either enable the "auto-initialize" behaviour in your project or add a call to `pyo3::prepare_freethreaded_python()` to the start of your program. + - If you're using the `#[pyo3_asyncio::::main]` proc macro attributes, then you can skip this step. `#[pyo3_asyncio::::main]` will call `pyo3::prepare_freethreaded_python()` at the start regardless of your project's "auto-initialize" feature. +2. Fix the tokio initialization. + - Calls to `pyo3_asyncio::tokio::init_multithread` or `pyo3_asyncio::tokio::init_multithread_once` can just be removed. + - If you're using the current thread scheduler, you'll need to manually spawn the thread that it runs on during initialization: + ```rust no_run + let mut builder = tokio::runtime::Builder::new_current_thread(); + builder.enable_all(); + + pyo3_asyncio::tokio::init(builder); + std::thread::spawn(move || { + pyo3_asyncio::tokio::get_runtime().block_on( + futures::future::pending::<()>() + ); + }); + ``` + - Custom `tokio::runtime::Builder` configs can be passed into `pyo3_asyncio::tokio::init`. The `tokio::runtime::Runtime` will be lazily instantiated on the first call to `pyo3_asyncio::tokio::get_runtime()` +3. If you're using `pyo3_asyncio::run_forever` in your application, you should switch to a more manual approach. + > `run_forever` is not the recommended way of running an event loop in Python, so it might be a good idea to move away from it. This function would have needed to change for `0.14`, but since it's considered an edge case, it was decided that users could just manually call it if they need to. + ```rust + use pyo3::prelude::*; + + fn main() -> PyResult<()> { + pyo3::prepare_freethreaded_python(); + + Python::with_gil(|py| { + let asyncio = py.import("asyncio")?; + + let event_loop = asyncio.call_method0("new_event_loop")?; + asyncio.call_method1("set_event_loop", (event_loop,))?; + + let event_loop_hdl = PyObject::from(event_loop); + + pyo3_asyncio::tokio::get_runtime().spawn(async move { + tokio::time::sleep(std::time::Duration::from_secs(1)).await; + + // Stop the event loop manually + Python::with_gil(|py| { + event_loop_hdl + .as_ref(py) + .call_method1( + "call_soon_threadsafe", + (event_loop_hdl + .as_ref(py) + .getattr("stop") + .unwrap(),), + ) + .unwrap(); + }) + }); + + event_loop.call_method0("run_forever")?; + Ok(()) + }) + } + ``` +4. Replace conversions with their newer counterparts. + > You may encounter some issues regarding the usage of `get_running_loop` vs `get_event_loop`. For more details on these newer conversions and how they should be used see [Event Loop References and Thread-awareness](#event-loop-references-and-thread-awareness). + - Replace `pyo3_asyncio::into_future` with `pyo3_asyncio::::into_future` + - Replace `pyo3_asyncio::::into_coroutine` with `pyo3_asyncio::::future_into_py` + - Replace `pyo3_asyncio::get_event_loop` with `pyo3_asyncio::::get_current_loop` +5. After all conversions have been replaced with their `v0.14` counterparts, `pyo3_asyncio::try_init` can safely be removed. + +> The `v0.13` API will likely still be supported in version `v0.15`, but no solid guarantees after that point. + +## Known Problems + +This library can give spurious failures during finalization prior to PyO3 release `v0.13.2`. Make sure your PyO3 dependency is up-to-date! ## MSRV Currently the MSRV for this library is 1.46.0, _but_ if you don't need to use the `async-std-runtime` feature, you can use rust 1.45.0. -> `async-std` depends on `socket2` which fails to compile under 1.45.0. \ No newline at end of file +> `async-std` depends on `socket2` which fails to compile under 1.45.0. diff --git a/examples/async_std.rs b/examples/async_std.rs index 35d72f3..ba7c8e7 100644 --- a/examples/async_std.rs +++ b/examples/async_std.rs @@ -6,7 +6,7 @@ async fn main() -> PyResult<()> { let asyncio = py.import("asyncio")?; // convert asyncio.sleep into a Rust Future - pyo3_asyncio::into_future(asyncio.call_method1("sleep", (1.into_py(py),))?) + pyo3_asyncio::async_std::into_future(asyncio.call_method1("sleep", (1.into_py(py),))?) })?; println!("sleeping for 1s"); diff --git a/examples/tokio.rs b/examples/tokio.rs index 8f5f49a..794c43a 100644 --- a/examples/tokio.rs +++ b/examples/tokio.rs @@ -6,7 +6,7 @@ async fn main() -> PyResult<()> { let asyncio = py.import("asyncio")?; // convert asyncio.sleep into a Rust Future - pyo3_asyncio::into_future(asyncio.call_method1("sleep", (1.into_py(py),))?) + pyo3_asyncio::tokio::into_future(asyncio.call_method1("sleep", (1.into_py(py),))?) })?; println!("sleeping for 1s"); diff --git a/examples/tokio_current_thread.rs b/examples/tokio_current_thread.rs index 84b56a8..5b8819a 100644 --- a/examples/tokio_current_thread.rs +++ b/examples/tokio_current_thread.rs @@ -6,7 +6,7 @@ async fn main() -> PyResult<()> { let asyncio = py.import("asyncio")?; // convert asyncio.sleep into a Rust Future - pyo3_asyncio::into_future(asyncio.call_method1("sleep", (1.into_py(py),))?) + pyo3_asyncio::tokio::into_future(asyncio.call_method1("sleep", (1.into_py(py),))?) })?; println!("sleeping for 1s"); diff --git a/examples/tokio_multi_thread.rs b/examples/tokio_multi_thread.rs index 82556f0..0deb688 100644 --- a/examples/tokio_multi_thread.rs +++ b/examples/tokio_multi_thread.rs @@ -6,7 +6,7 @@ async fn main() -> PyResult<()> { let asyncio = py.import("asyncio")?; // convert asyncio.sleep into a Rust Future - pyo3_asyncio::into_future(asyncio.call_method1("sleep", (1.into_py(py),))?) + pyo3_asyncio::tokio::into_future(asyncio.call_method1("sleep", (1.into_py(py),))?) })?; println!("sleeping for 1s"); diff --git a/pyo3-asyncio-macros/Cargo.toml b/pyo3-asyncio-macros/Cargo.toml index 7a7b4a4..08377c6 100644 --- a/pyo3-asyncio-macros/Cargo.toml +++ b/pyo3-asyncio-macros/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "pyo3-asyncio-macros" description = "Proc Macro Attributes for PyO3 Asyncio" -version = "0.13.5" +version = "0.14.0" authors = ["Andrew J Westlake "] readme = "../README.md" keywords = ["pyo3", "python", "ffi", "async", "asyncio"] diff --git a/pyo3-asyncio-macros/src/lib.rs b/pyo3-asyncio-macros/src/lib.rs index 5f02273..92da632 100644 --- a/pyo3-asyncio-macros/src/lib.rs +++ b/pyo3-asyncio-macros/src/lib.rs @@ -49,16 +49,14 @@ pub fn async_std_main(_attr: TokenStream, item: TokenStream) -> TokenStream { #body } - pyo3::Python::with_gil(|py| { - pyo3_asyncio::with_runtime(py, || { - pyo3_asyncio::async_std::run_until_complete(py, main())?; + pyo3::prepare_freethreaded_python(); - Ok(()) - }) - .map_err(|e| { - e.print_and_set_sys_last_vars(py); - }) - .unwrap(); + pyo3::Python::with_gil(|py| { + pyo3_asyncio::async_std::run(py, main()) + .map_err(|e| { + e.print_and_set_sys_last_vars(py); + }) + .unwrap(); }); } }; @@ -128,6 +126,13 @@ pub fn tokio_main(args: TokenStream, item: TokenStream) -> TokenStream { /// thread::sleep(Duration::from_secs(1)); /// Ok(()) /// } +/// +/// // blocking test functions can optionally accept an event_loop parameter +/// #[pyo3_asyncio::async_std::test] +/// fn test_blocking_sleep_with_event_loop(event_loop: PyObject) -> PyResult<()> { +/// thread::sleep(Duration::from_secs(1)); +/// Ok(()) +/// } /// ``` #[cfg(not(test))] // NOTE: exporting main breaks tests, we should file an issue. #[proc_macro_attribute] @@ -140,15 +145,31 @@ pub fn async_std_test(_attr: TokenStream, item: TokenStream) -> TokenStream { let vis = &input.vis; let fn_impl = if input.sig.asyncness.is_none() { + // Optionally pass an event_loop parameter to blocking tasks + let task = if sig.inputs.is_empty() { + quote! { + Box::pin(pyo3_asyncio::async_std::re_exports::spawn_blocking(move || { + #name() + })) + } + } else { + quote! { + let event_loop = Python::with_gil(|py| { + pyo3_asyncio::async_std::get_current_loop(py).unwrap().into() + }); + Box::pin(pyo3_asyncio::async_std::re_exports::spawn_blocking(move || { + #name(event_loop) + })) + } + }; + quote! { #vis fn #name() -> std::pin::Pin> + Send>> { #sig { #body } - Box::pin(pyo3_asyncio::async_std::re_exports::spawn_blocking(move || { - #name() - })) + #task } } } else { @@ -204,6 +225,13 @@ pub fn async_std_test(_attr: TokenStream, item: TokenStream) -> TokenStream { /// thread::sleep(Duration::from_secs(1)); /// Ok(()) /// } +/// +/// // blocking test functions can optionally accept an event_loop parameter +/// #[pyo3_asyncio::tokio::test] +/// fn test_blocking_sleep_with_event_loop(event_loop: PyObject) -> PyResult<()> { +/// thread::sleep(Duration::from_secs(1)); +/// Ok(()) +/// } /// ``` #[cfg(not(test))] // NOTE: exporting main breaks tests, we should file an issue. #[proc_macro_attribute] @@ -216,14 +244,11 @@ pub fn tokio_test(_attr: TokenStream, item: TokenStream) -> TokenStream { let vis = &input.vis; let fn_impl = if input.sig.asyncness.is_none() { - quote! { - #vis fn #name() -> std::pin::Pin> + Send>> { - #sig { - #body - } - - Box::pin(async { - match pyo3_asyncio::tokio::get_runtime().spawn_blocking(&#name).await { + // Optionally pass an event_loop parameter to blocking tasks + let task = if sig.inputs.is_empty() { + quote! { + Box::pin(async move { + match pyo3_asyncio::tokio::get_runtime().spawn_blocking(move || #name()).await { Ok(result) => result, Err(e) => { assert!(e.is_panic()); @@ -232,6 +257,31 @@ pub fn tokio_test(_attr: TokenStream, item: TokenStream) -> TokenStream { } }) } + } else { + quote! { + let event_loop = Python::with_gil(|py| { + pyo3_asyncio::tokio::get_current_loop(py).unwrap().into() + }); + Box::pin(async move { + match pyo3_asyncio::tokio::get_runtime().spawn_blocking(move || #name(event_loop)).await { + Ok(result) => result, + Err(e) => { + assert!(e.is_panic()); + Err(pyo3::exceptions::PyException::new_err("rust future panicked")) + } + } + }) + } + }; + + quote! { + #vis fn #name() -> std::pin::Pin> + Send>> { + #sig { + #body + } + + #task + } } } else { quote! { diff --git a/pyo3-asyncio-macros/src/tokio.rs b/pyo3-asyncio-macros/src/tokio.rs index 98d711e..081d35f 100644 --- a/pyo3-asyncio-macros/src/tokio.rs +++ b/pyo3-asyncio-macros/src/tokio.rs @@ -219,7 +219,7 @@ fn parse_knobs( let config = config.build()?; - let mut rt = match config.flavor { + let builder = match config.flavor { RuntimeFlavor::CurrentThread => quote! { pyo3_asyncio::tokio::re_exports::runtime::Builder::new_current_thread() }, @@ -227,8 +227,15 @@ fn parse_knobs( pyo3_asyncio::tokio::re_exports::runtime::Builder::new_multi_thread() }, }; + + let mut builder_init = quote! { + builder.enable_all(); + }; if let Some(v) = config.worker_threads { - rt = quote! { #rt.worker_threads(#v) }; + builder_init = quote! { + builder.worker_threads(#v); + #builder_init; + }; } let rt_init = match config.flavor { @@ -247,25 +254,21 @@ fn parse_knobs( #body } - pyo3_asyncio::tokio::init( - #rt - .enable_all() - .build() - .unwrap() - ); + pyo3::prepare_freethreaded_python(); + + let mut builder = #builder; + #builder_init; + + pyo3_asyncio::tokio::init(builder); #rt_init pyo3::Python::with_gil(|py| { - pyo3_asyncio::with_runtime(py, || { - pyo3_asyncio::tokio::run_until_complete(py, main())?; - - Ok(()) - }) - .map_err(|e| { - e.print_and_set_sys_last_vars(py); - }) - .unwrap(); + pyo3_asyncio::tokio::run(py, main()) + .map_err(|e| { + e.print_and_set_sys_last_vars(py); + }) + .unwrap(); }); } }; diff --git a/pytests/common/mod.rs b/pytests/common/mod.rs index c4ffb48..a6f4e16 100644 --- a/pytests/common/mod.rs +++ b/pytests/common/mod.rs @@ -12,7 +12,24 @@ async def sleep_for_1s(sleep_for): await sleep_for(1) "#; -pub(super) async fn test_into_future() -> PyResult<()> { +pub(super) async fn test_into_future(event_loop: PyObject) -> PyResult<()> { + let fut = Python::with_gil(|py| { + let test_mod = + PyModule::from_code(py, TEST_MOD, "test_rust_coroutine/test_mod.py", "test_mod")?; + + pyo3_asyncio::into_future_with_loop( + event_loop.as_ref(py), + test_mod.call_method1("py_sleep", (1.into_py(py),))?, + ) + })?; + + fut.await?; + + Ok(()) +} + +#[allow(deprecated)] +pub(super) async fn test_into_future_0_13() -> PyResult<()> { let fut = Python::with_gil(|py| { let test_mod = PyModule::from_code(py, TEST_MOD, "test_rust_coroutine/test_mod.py", "test_mod")?; @@ -30,13 +47,13 @@ pub(super) fn test_blocking_sleep() -> PyResult<()> { Ok(()) } -pub(super) async fn test_other_awaitables() -> PyResult<()> { +pub(super) async fn test_other_awaitables(event_loop: PyObject) -> PyResult<()> { let fut = Python::with_gil(|py| { let functools = py.import("functools")?; let time = py.import("time")?; // spawn a blocking sleep in the threadpool executor - returns a task, not a coroutine - let task = pyo3_asyncio::get_event_loop(py).call_method1( + let task = event_loop.as_ref(py).call_method1( "run_in_executor", ( py.None(), @@ -44,18 +61,10 @@ pub(super) async fn test_other_awaitables() -> PyResult<()> { ), )?; - pyo3_asyncio::into_future(task) + pyo3_asyncio::into_future_with_loop(event_loop.as_ref(py), task) })?; fut.await?; Ok(()) } - -pub(super) fn test_init_twice() -> PyResult<()> { - // try_init has already been called in test main - ensure a second call doesn't mess the other - // tests up - Python::with_gil(|py| pyo3_asyncio::try_init(py))?; - - Ok(()) -} diff --git a/pytests/test_async_std_asyncio.rs b/pytests/test_async_std_asyncio.rs index e339940..275cea8 100644 --- a/pytests/test_async_std_asyncio.rs +++ b/pytests/test_async_std_asyncio.rs @@ -3,10 +3,16 @@ mod common; use std::{rc::Rc, time::Duration}; use async_std::task; -use pyo3::{prelude::*, types::PyType, wrap_pyfunction}; +use pyo3::{ + prelude::*, + proc_macro::pymodule, + types::{IntoPyDict, PyType}, + wrap_pyfunction, wrap_pymodule, +}; #[pyfunction] -fn sleep_for(py: Python, secs: &PyAny) -> PyResult { +#[allow(deprecated)] +fn sleep_into_coroutine(py: Python, secs: &PyAny) -> PyResult { let secs = secs.extract()?; pyo3_asyncio::async_std::into_coroutine(py, async move { @@ -15,22 +21,64 @@ fn sleep_for(py: Python, secs: &PyAny) -> PyResult { }) } +#[pyfunction] +fn sleep<'p>(py: Python<'p>, secs: &'p PyAny) -> PyResult<&'p PyAny> { + let secs = secs.extract()?; + + pyo3_asyncio::async_std::future_into_py(py, async move { + task::sleep(Duration::from_secs(secs)).await; + Python::with_gil(|py| Ok(py.None())) + }) +} + #[pyo3_asyncio::async_std::test] -async fn test_into_coroutine() -> PyResult<()> { +fn test_into_coroutine() -> PyResult<()> { + #[allow(deprecated)] + Python::with_gil(|py| { + let sleeper_mod = PyModule::new(py, "rust_sleeper")?; + + sleeper_mod.add_wrapped(wrap_pyfunction!(sleep_into_coroutine))?; + + let test_mod = PyModule::from_code( + py, + common::TEST_MOD, + "test_into_coroutine_mod.py", + "test_into_coroutine_mod", + )?; + + let fut = pyo3_asyncio::into_future(test_mod.call_method1( + "sleep_for_1s", + (sleeper_mod.getattr("sleep_into_coroutine")?,), + )?)?; + + pyo3_asyncio::async_std::run_until_complete( + pyo3_asyncio::get_event_loop(py), + async move { + fut.await?; + Ok(()) + }, + )?; + + Ok(()) + }) +} + +#[pyo3_asyncio::async_std::test] +async fn test_future_into_py() -> PyResult<()> { let fut = Python::with_gil(|py| { let sleeper_mod = PyModule::new(py, "rust_sleeper")?; - sleeper_mod.add_wrapped(wrap_pyfunction!(sleep_for))?; + sleeper_mod.add_wrapped(wrap_pyfunction!(sleep))?; let test_mod = PyModule::from_code( py, common::TEST_MOD, - "test_rust_coroutine/test_mod.py", - "test_mod", + "test_future_into_py_mod.py", + "test_future_into_py_mod", )?; - pyo3_asyncio::into_future( - test_mod.call_method1("sleep_for_1s", (sleeper_mod.getattr("sleep_for")?,))?, + pyo3_asyncio::async_std::into_future( + test_mod.call_method1("sleep_for_1s", (sleeper_mod.getattr("sleep")?,))?, ) })?; @@ -47,7 +95,7 @@ async fn test_async_sleep() -> PyResult<()> { task::sleep(Duration::from_secs(1)).await; Python::with_gil(|py| { - pyo3_asyncio::into_future(asyncio.as_ref(py).call_method1("sleep", (1.0,))?) + pyo3_asyncio::async_std::into_future(asyncio.as_ref(py).call_method1("sleep", (1.0,))?) })? .await?; @@ -61,26 +109,51 @@ fn test_blocking_sleep() -> PyResult<()> { #[pyo3_asyncio::async_std::test] async fn test_into_future() -> PyResult<()> { - common::test_into_future().await + common::test_into_future(Python::with_gil(|py| { + pyo3_asyncio::async_std::get_current_loop(py) + .unwrap() + .into() + })) + .await } #[pyo3_asyncio::async_std::test] -async fn test_other_awaitables() -> PyResult<()> { - common::test_other_awaitables().await +async fn test_into_future_0_13() -> PyResult<()> { + common::test_into_future_0_13().await } #[pyo3_asyncio::async_std::test] -fn test_init_twice() -> PyResult<()> { - common::test_init_twice() +async fn test_other_awaitables() -> PyResult<()> { + common::test_other_awaitables(Python::with_gil(|py| { + pyo3_asyncio::async_std::get_current_loop(py) + .unwrap() + .into() + })) + .await } -#[pyo3_asyncio::async_std::main] -async fn main() -> pyo3::PyResult<()> { - pyo3_asyncio::testing::main().await +#[pyo3_asyncio::async_std::test] +async fn test_panic() -> PyResult<()> { + let fut = Python::with_gil(|py| -> PyResult<_> { + pyo3_asyncio::async_std::into_future(pyo3_asyncio::async_std::future_into_py(py, async { + panic!("this panic was intentional!") + })?) + })?; + + match fut.await { + Ok(_) => panic!("coroutine should panic"), + Err(e) => Python::with_gil(|py| { + if e.is_instance::(py) { + Ok(()) + } else { + panic!("expected RustPanic err") + } + }), + } } #[pyo3_asyncio::async_std::test] -async fn test_local_coroutine() -> PyResult<()> { +async fn test_local_future_into_py() -> PyResult<()> { Python::with_gil(|py| { let non_send_secs = Rc::new(1); @@ -89,7 +162,7 @@ async fn test_local_coroutine() -> PyResult<()> { Ok(Python::with_gil(|py| py.None())) })?; - pyo3_asyncio::into_future(py_future) + pyo3_asyncio::async_std::into_future(py_future) })? .await?; @@ -98,16 +171,17 @@ async fn test_local_coroutine() -> PyResult<()> { #[pyo3_asyncio::async_std::test] async fn test_cancel() -> PyResult<()> { - let py_future = Python::with_gil(|py| { - pyo3_asyncio::async_std::into_coroutine(py, async { + let py_future = Python::with_gil(|py| -> PyResult { + Ok(pyo3_asyncio::async_std::future_into_py(py, async { async_std::task::sleep(Duration::from_secs(1)).await; Ok(Python::with_gil(|py| py.None())) - }) + })? + .into()) })?; if let Err(e) = Python::with_gil(|py| -> PyResult<_> { py_future.as_ref(py).call_method0("cancel")?; - pyo3_asyncio::into_future(py_future.as_ref(py)) + pyo3_asyncio::async_std::into_future(py_future.as_ref(py)) })? .await { @@ -126,3 +200,60 @@ async fn test_cancel() -> PyResult<()> { Ok(()) } + +/// This module is implemented in Rust. +#[pymodule] +fn test_mod(_py: Python, m: &PyModule) -> PyResult<()> { + #![allow(deprecated)] + #[pyfn(m, "sleep")] + fn sleep(py: Python) -> PyResult<&PyAny> { + pyo3_asyncio::async_std::future_into_py(py, async move { + async_std::task::sleep(Duration::from_millis(500)).await; + Ok(Python::with_gil(|py| py.None())) + }) + } + + Ok(()) +} + +const TEST_CODE: &str = r#" +async def main(): + return await test_mod.sleep() + +asyncio.run(main()) +"#; + +#[pyo3_asyncio::async_std::test] +fn test_multiple_asyncio_run() -> PyResult<()> { + Python::with_gil(|py| { + pyo3_asyncio::async_std::run(py, async move { + async_std::task::sleep(Duration::from_millis(500)).await; + Ok(()) + })?; + pyo3_asyncio::async_std::run(py, async move { + async_std::task::sleep(Duration::from_millis(500)).await; + Ok(()) + })?; + + let d = [ + ("asyncio", py.import("asyncio")?.into()), + ("test_mod", wrap_pymodule!(test_mod)(py)), + ] + .into_py_dict(py); + + py.run(TEST_CODE, Some(d), None)?; + py.run(TEST_CODE, Some(d), None)?; + Ok(()) + }) +} + +#[allow(deprecated)] +fn main() -> pyo3::PyResult<()> { + pyo3::prepare_freethreaded_python(); + + Python::with_gil(|py| { + // into_coroutine requires the 0.13 API + pyo3_asyncio::try_init(py)?; + pyo3_asyncio::async_std::run(py, pyo3_asyncio::testing::main()) + }) +} diff --git a/pytests/test_async_std_run_forever.rs b/pytests/test_async_std_run_forever.rs index dd2d620..2346466 100644 --- a/pytests/test_async_std_run_forever.rs +++ b/pytests/test_async_std_run_forever.rs @@ -2,39 +2,47 @@ use std::time::Duration; use pyo3::prelude::*; -fn dump_err(py: Python<'_>) -> impl FnOnce(PyErr) + '_ { - move |e| { - // We can't display Python exceptions via std::fmt::Display, - // so print the error here manually. - e.print_and_set_sys_last_vars(py); - } +fn dump_err(py: Python, e: PyErr) { + // We can't display Python exceptions via std::fmt::Display, + // so print the error here manually. + e.print_and_set_sys_last_vars(py); } fn main() { + pyo3::prepare_freethreaded_python(); + Python::with_gil(|py| { - pyo3_asyncio::with_runtime(py, || { - async_std::task::spawn(async move { - async_std::task::sleep(Duration::from_secs(1)).await; - - Python::with_gil(|py| { - let event_loop = pyo3_asyncio::get_event_loop(py); - - event_loop - .call_method1( - "call_soon_threadsafe", - (event_loop.getattr("stop").map_err(dump_err(py)).unwrap(),), - ) - .map_err(dump_err(py)) - .unwrap(); - }) - }); - - pyo3_asyncio::run_forever(py)?; - - println!("test test_run_forever ... ok"); - Ok(()) - }) - .map_err(dump_err(py)) - .unwrap(); + let asyncio = py.import("asyncio")?; + + let event_loop = asyncio.call_method0("new_event_loop")?; + asyncio.call_method1("set_event_loop", (event_loop,))?; + + let event_loop_hdl = PyObject::from(event_loop); + + async_std::task::spawn(async move { + async_std::task::sleep(Duration::from_secs(1)).await; + + Python::with_gil(|py| { + event_loop_hdl + .as_ref(py) + .call_method1( + "call_soon_threadsafe", + (event_loop_hdl + .as_ref(py) + .getattr("stop") + .map_err(|e| dump_err(py, e)) + .unwrap(),), + ) + .map_err(|e| dump_err(py, e)) + .unwrap(); + }) + }); + + event_loop.call_method0("run_forever")?; + + println!("test test_run_forever ... ok"); + Ok(()) }) + .map_err(|e| Python::with_gil(|py| dump_err(py, e))) + .unwrap() } diff --git a/pytests/test_tokio_current_thread_asyncio.rs b/pytests/test_tokio_current_thread_asyncio.rs index 75f1a94..8a6fa98 100644 --- a/pytests/test_tokio_current_thread_asyncio.rs +++ b/pytests/test_tokio_current_thread_asyncio.rs @@ -1,7 +1,24 @@ mod common; mod tokio_asyncio; -#[pyo3_asyncio::tokio::main(flavor = "current_thread")] -async fn main() -> pyo3::PyResult<()> { - pyo3_asyncio::testing::main().await +use pyo3::prelude::*; + +#[allow(deprecated)] +fn main() -> pyo3::PyResult<()> { + pyo3::prepare_freethreaded_python(); + + Python::with_gil(|py| { + // into_coroutine requires the 0.13 API + pyo3_asyncio::try_init(py)?; + + let mut builder = tokio::runtime::Builder::new_current_thread(); + builder.enable_all(); + + pyo3_asyncio::tokio::init(builder); + std::thread::spawn(move || { + pyo3_asyncio::tokio::get_runtime().block_on(futures::future::pending::<()>()); + }); + + pyo3_asyncio::tokio::run(py, pyo3_asyncio::testing::main()) + }) } diff --git a/pytests/test_tokio_current_thread_run_forever.rs b/pytests/test_tokio_current_thread_run_forever.rs index f2a72ad..10944d9 100644 --- a/pytests/test_tokio_current_thread_run_forever.rs +++ b/pytests/test_tokio_current_thread_run_forever.rs @@ -1,7 +1,15 @@ mod tokio_run_forever; fn main() { - pyo3_asyncio::tokio::init_current_thread(); + pyo3::prepare_freethreaded_python(); + + let mut builder = tokio::runtime::Builder::new_current_thread(); + builder.enable_all(); + + pyo3_asyncio::tokio::init(builder); + std::thread::spawn(move || { + pyo3_asyncio::tokio::get_runtime().block_on(futures::future::pending::<()>()); + }); tokio_run_forever::test_main(); } diff --git a/pytests/test_tokio_multi_thread_asyncio.rs b/pytests/test_tokio_multi_thread_asyncio.rs index 11e582e..b83d10d 100644 --- a/pytests/test_tokio_multi_thread_asyncio.rs +++ b/pytests/test_tokio_multi_thread_asyncio.rs @@ -1,7 +1,15 @@ mod common; mod tokio_asyncio; -#[pyo3_asyncio::tokio::main] -async fn main() -> pyo3::PyResult<()> { - pyo3_asyncio::testing::main().await +use pyo3::prelude::*; + +#[allow(deprecated)] +fn main() -> pyo3::PyResult<()> { + pyo3::prepare_freethreaded_python(); + + Python::with_gil(|py| { + // into_coroutine requires the 0.13 API + pyo3_asyncio::try_init(py)?; + pyo3_asyncio::tokio::run(py, pyo3_asyncio::testing::main()) + }) } diff --git a/pytests/test_tokio_multi_thread_run_forever.rs b/pytests/test_tokio_multi_thread_run_forever.rs index 59d1ce3..8f75b16 100644 --- a/pytests/test_tokio_multi_thread_run_forever.rs +++ b/pytests/test_tokio_multi_thread_run_forever.rs @@ -1,7 +1,6 @@ mod tokio_run_forever; fn main() { - pyo3_asyncio::tokio::init_multi_thread(); - + pyo3::prepare_freethreaded_python(); tokio_run_forever::test_main(); } diff --git a/pytests/tokio_asyncio/mod.rs b/pytests/tokio_asyncio/mod.rs index d0003c4..0a2997a 100644 --- a/pytests/tokio_asyncio/mod.rs +++ b/pytests/tokio_asyncio/mod.rs @@ -1,11 +1,17 @@ use std::{rc::Rc, time::Duration}; -use pyo3::{prelude::*, types::PyType, wrap_pyfunction}; +use pyo3::{ + prelude::*, + proc_macro::pymodule, + types::{IntoPyDict, PyType}, + wrap_pyfunction, wrap_pymodule, +}; use crate::common; #[pyfunction] -fn sleep_for(py: Python, secs: &PyAny) -> PyResult { +#[allow(deprecated)] +fn sleep_into_coroutine(py: Python, secs: &PyAny) -> PyResult { let secs = secs.extract()?; pyo3_asyncio::tokio::into_coroutine(py, async move { @@ -14,22 +20,61 @@ fn sleep_for(py: Python, secs: &PyAny) -> PyResult { }) } +#[pyfunction] +fn sleep<'p>(py: Python<'p>, secs: &'p PyAny) -> PyResult<&'p PyAny> { + let secs = secs.extract()?; + + pyo3_asyncio::tokio::future_into_py(py, async move { + tokio::time::sleep(Duration::from_secs(secs)).await; + Python::with_gil(|py| Ok(py.None())) + }) +} + +#[pyo3_asyncio::tokio::test] +fn test_into_coroutine() -> PyResult<()> { + #[allow(deprecated)] + Python::with_gil(|py| { + let sleeper_mod = PyModule::new(py, "rust_sleeper")?; + + sleeper_mod.add_wrapped(wrap_pyfunction!(sleep_into_coroutine))?; + + let test_mod = PyModule::from_code( + py, + common::TEST_MOD, + "test_into_coroutine_mod.py", + "test_into_coroutine_mod", + )?; + + let fut = pyo3_asyncio::into_future(test_mod.call_method1( + "sleep_for_1s", + (sleeper_mod.getattr("sleep_into_coroutine")?,), + )?)?; + + pyo3_asyncio::tokio::run_until_complete(pyo3_asyncio::get_event_loop(py), async move { + fut.await?; + Ok(()) + })?; + + Ok(()) + }) +} + #[pyo3_asyncio::tokio::test] -async fn test_into_coroutine() -> PyResult<()> { +async fn test_future_into_py() -> PyResult<()> { let fut = Python::with_gil(|py| { let sleeper_mod = PyModule::new(py, "rust_sleeper")?; - sleeper_mod.add_wrapped(wrap_pyfunction!(sleep_for))?; + sleeper_mod.add_wrapped(wrap_pyfunction!(sleep))?; let test_mod = PyModule::from_code( py, common::TEST_MOD, - "test_rust_coroutine/test_mod.py", - "test_mod", + "test_future_into_py_mod.py", + "test_future_into_py_mod", )?; - pyo3_asyncio::into_future( - test_mod.call_method1("sleep_for_1s", (sleeper_mod.getattr("sleep_for")?,))?, + pyo3_asyncio::tokio::into_future( + test_mod.call_method1("sleep_for_1s", (sleeper_mod.getattr("sleep")?,))?, ) })?; @@ -46,7 +91,7 @@ async fn test_async_sleep() -> PyResult<()> { tokio::time::sleep(Duration::from_secs(1)).await; Python::with_gil(|py| { - pyo3_asyncio::into_future(asyncio.as_ref(py).call_method1("sleep", (1.0,))?) + pyo3_asyncio::tokio::into_future(asyncio.as_ref(py).call_method1("sleep", (1.0,))?) })? .await?; @@ -60,41 +105,40 @@ fn test_blocking_sleep() -> PyResult<()> { #[pyo3_asyncio::tokio::test] async fn test_into_future() -> PyResult<()> { - common::test_into_future().await -} - -#[pyo3_asyncio::tokio::test] -async fn test_other_awaitables() -> PyResult<()> { - common::test_other_awaitables().await + common::test_into_future(Python::with_gil(|py| { + pyo3_asyncio::tokio::get_current_loop(py).unwrap().into() + })) + .await } #[pyo3_asyncio::tokio::test] -fn test_init_twice() -> PyResult<()> { - common::test_init_twice() +async fn test_into_future_0_13() -> PyResult<()> { + common::test_into_future_0_13().await } #[pyo3_asyncio::tokio::test] -fn test_init_tokio_twice() -> PyResult<()> { - // tokio has already been initialized in test main. call these functions to - // make sure they don't cause problems with the other tests. - pyo3_asyncio::tokio::init_multi_thread_once(); - pyo3_asyncio::tokio::init_current_thread_once(); - - Ok(()) +async fn test_other_awaitables() -> PyResult<()> { + common::test_other_awaitables(Python::with_gil(|py| { + pyo3_asyncio::tokio::get_current_loop(py).unwrap().into() + })) + .await } #[pyo3_asyncio::tokio::test] -fn test_local_set_coroutine() -> PyResult<()> { +fn test_local_future_into_py(event_loop: PyObject) -> PyResult<()> { tokio::task::LocalSet::new().block_on(pyo3_asyncio::tokio::get_runtime(), async { Python::with_gil(|py| { let non_send_secs = Rc::new(1); - let py_future = pyo3_asyncio::tokio::local_future_into_py(py, async move { - tokio::time::sleep(Duration::from_secs(*non_send_secs)).await; - Ok(Python::with_gil(|py| py.None())) - })?; + let py_future = pyo3_asyncio::tokio::local_future_into_py_with_loop( + event_loop.as_ref(py), + async move { + tokio::time::sleep(Duration::from_secs(*non_send_secs)).await; + Ok(Python::with_gil(|py| py.None())) + }, + )?; - pyo3_asyncio::into_future(py_future) + pyo3_asyncio::into_future_with_loop(event_loop.as_ref(py), py_future) })? .await?; @@ -102,18 +146,39 @@ fn test_local_set_coroutine() -> PyResult<()> { }) } +#[pyo3_asyncio::tokio::test] +async fn test_panic() -> PyResult<()> { + let fut = Python::with_gil(|py| -> PyResult<_> { + pyo3_asyncio::tokio::into_future(pyo3_asyncio::tokio::future_into_py(py, async { + panic!("this panic was intentional!") + })?) + })?; + + match fut.await { + Ok(_) => panic!("coroutine should panic"), + Err(e) => Python::with_gil(|py| { + if e.is_instance::(py) { + Ok(()) + } else { + panic!("expected RustPanic err") + } + }), + } +} + #[pyo3_asyncio::tokio::test] async fn test_cancel() -> PyResult<()> { - let py_future = Python::with_gil(|py| { - pyo3_asyncio::tokio::into_coroutine(py, async { + let py_future = Python::with_gil(|py| -> PyResult { + Ok(pyo3_asyncio::tokio::future_into_py(py, async { tokio::time::sleep(Duration::from_secs(1)).await; Ok(Python::with_gil(|py| py.None())) - }) + })? + .into()) })?; if let Err(e) = Python::with_gil(|py| -> PyResult<_> { py_future.as_ref(py).call_method0("cancel")?; - pyo3_asyncio::into_future(py_future.as_ref(py)) + pyo3_asyncio::tokio::into_future(py_future.as_ref(py)) })? .await { @@ -132,3 +197,48 @@ async fn test_cancel() -> PyResult<()> { Ok(()) } +/// This module is implemented in Rust. +#[pymodule] +fn test_mod(_py: Python, m: &PyModule) -> PyResult<()> { + #![allow(deprecated)] + #[pyfn(m, "sleep")] + fn sleep(py: Python) -> PyResult<&PyAny> { + pyo3_asyncio::async_std::future_into_py(py, async move { + async_std::task::sleep(Duration::from_millis(500)).await; + Ok(Python::with_gil(|py| py.None())) + }) + } + + Ok(()) +} + +const TEST_CODE: &str = r#" +async def main(): + return await test_mod.sleep() + +asyncio.run(main()) +"#; + +#[pyo3_asyncio::tokio::test] +fn test_multiple_asyncio_run() -> PyResult<()> { + Python::with_gil(|py| { + pyo3_asyncio::tokio::run(py, async move { + tokio::time::sleep(Duration::from_millis(500)).await; + Ok(()) + })?; + pyo3_asyncio::tokio::run(py, async move { + tokio::time::sleep(Duration::from_millis(500)).await; + Ok(()) + })?; + + let d = [ + ("asyncio", py.import("asyncio")?.into()), + ("test_mod", wrap_pymodule!(test_mod)(py)), + ] + .into_py_dict(py); + + py.run(TEST_CODE, Some(d), None)?; + py.run(TEST_CODE, Some(d), None)?; + Ok(()) + }) +} diff --git a/pytests/tokio_run_forever/mod.rs b/pytests/tokio_run_forever/mod.rs index 28fda8c..dab0a08 100644 --- a/pytests/tokio_run_forever/mod.rs +++ b/pytests/tokio_run_forever/mod.rs @@ -2,39 +2,45 @@ use std::time::Duration; use pyo3::prelude::*; -fn dump_err(py: Python<'_>) -> impl FnOnce(PyErr) + '_ { - move |e| { - // We can't display Python exceptions via std::fmt::Display, - // so print the error here manually. - e.print_and_set_sys_last_vars(py); - } +fn dump_err(py: Python<'_>, e: PyErr) { + // We can't display Python exceptions via std::fmt::Display, + // so print the error here manually. + e.print_and_set_sys_last_vars(py); } pub(super) fn test_main() { Python::with_gil(|py| { - pyo3_asyncio::with_runtime(py, || { - pyo3_asyncio::tokio::get_runtime().spawn(async move { - tokio::time::sleep(Duration::from_secs(1)).await; - - Python::with_gil(|py| { - let event_loop = pyo3_asyncio::get_event_loop(py); - - event_loop - .call_method1( - "call_soon_threadsafe", - (event_loop.getattr("stop").map_err(dump_err(py)).unwrap(),), - ) - .map_err(dump_err(py)) - .unwrap(); - }) - }); - - pyo3_asyncio::run_forever(py)?; - - println!("test test_run_forever ... ok"); - Ok(()) - }) - .map_err(dump_err(py)) - .unwrap(); + let asyncio = py.import("asyncio")?; + + let event_loop = asyncio.call_method0("new_event_loop")?; + asyncio.call_method1("set_event_loop", (event_loop,))?; + + let event_loop_hdl = PyObject::from(event_loop); + + pyo3_asyncio::tokio::get_runtime().spawn(async move { + tokio::time::sleep(Duration::from_secs(1)).await; + + Python::with_gil(|py| { + event_loop_hdl + .as_ref(py) + .call_method1( + "call_soon_threadsafe", + (event_loop_hdl + .as_ref(py) + .getattr("stop") + .map_err(|e| dump_err(py, e)) + .unwrap(),), + ) + .map_err(|e| dump_err(py, e)) + .unwrap(); + }) + }); + + event_loop.call_method0("run_forever")?; + + println!("test test_run_forever ... ok"); + Ok(()) }) + .map_err(|e| Python::with_gil(|py| dump_err(py, e))) + .unwrap(); } diff --git a/src/async_std.rs b/src/async_std.rs index 6459031..0612047 100644 --- a/src/async_std.rs +++ b/src/async_std.rs @@ -1,6 +1,7 @@ -use std::future::Future; +use std::{any::Any, cell::RefCell, future::Future, panic::AssertUnwindSafe, pin::Pin}; use async_std::task; +use futures::prelude::*; use pyo3::prelude::*; use crate::generic::{self, JoinError, Runtime, SpawnLocalExt}; @@ -23,32 +24,72 @@ pub use pyo3_asyncio_macros::async_std_main as main; #[cfg(all(feature = "attributes", feature = "testing"))] pub use pyo3_asyncio_macros::async_std_test as test; -struct AsyncStdJoinError; +struct AsyncStdJoinErr(Box); -impl JoinError for AsyncStdJoinError { +impl JoinError for AsyncStdJoinErr { fn is_panic(&self) -> bool { - todo!() + true } } +async_std::task_local! { + static EVENT_LOOP: RefCell> = RefCell::new(None); +} + struct AsyncStdRuntime; impl Runtime for AsyncStdRuntime { - type JoinError = AsyncStdJoinError; - type JoinHandle = task::JoinHandle>; + type JoinError = AsyncStdJoinErr; + type JoinHandle = task::JoinHandle>; + + fn scope(event_loop: PyObject, fut: F) -> Pin + Send>> + where + F: Future + Send + 'static, + { + let old = EVENT_LOOP.with(|c| c.replace(Some(event_loop))); + Box::pin(async move { + let result = fut.await; + EVENT_LOOP.with(|c| c.replace(old)); + result + }) + } + fn get_task_event_loop(py: Python) -> Option<&PyAny> { + match EVENT_LOOP.try_with(|c| { + c.borrow() + .as_ref() + .map(|event_loop| event_loop.clone().into_ref(py)) + }) { + Ok(event_loop) => event_loop, + Err(_) => None, + } + } fn spawn(fut: F) -> Self::JoinHandle where F: Future + Send + 'static, { task::spawn(async move { - fut.await; - Ok(()) + AssertUnwindSafe(fut) + .catch_unwind() + .await + .map_err(|e| AsyncStdJoinErr(e)) }) } } impl SpawnLocalExt for AsyncStdRuntime { + fn scope_local(event_loop: PyObject, fut: F) -> Pin>> + where + F: Future + 'static, + { + let old = EVENT_LOOP.with(|c| c.replace(Some(event_loop))); + Box::pin(async move { + let result = fut.await; + EVENT_LOOP.with(|c| c.replace(old)); + result + }) + } + fn spawn_local(fut: F) -> Self::JoinHandle where F: Future + 'static, @@ -60,15 +101,40 @@ impl SpawnLocalExt for AsyncStdRuntime { } } +/// Set the task local event loop for the given future +pub async fn scope(event_loop: PyObject, fut: F) -> R +where + F: Future + Send + 'static, +{ + AsyncStdRuntime::scope(event_loop, fut).await +} + +/// Set the task local event loop for the given !Send future +pub async fn scope_local(event_loop: PyObject, fut: F) -> R +where + F: Future + 'static, +{ + AsyncStdRuntime::scope_local(event_loop, fut).await +} + +/// Get the current event loop from either Python or Rust async task local context +/// +/// This function first checks if the runtime has a task-local reference to the Python event loop. +/// If not, it calls [`get_running_loop`](`crate::get_running_loop`) to get the event loop +/// associated with the current OS thread. +pub fn get_current_loop(py: Python) -> PyResult<&PyAny> { + generic::get_current_loop::(py) +} + /// Run the event loop until the given Future completes /// /// The event loop runs until the given future is complete. /// /// After this function returns, the event loop can be resumed with either [`run_until_complete`] or -/// [`crate::run_forever`] +/// [`run_forever`](`crate::run_forever`) /// /// # Arguments -/// * `py` - The current PyO3 GIL guard +/// * `event_loop` - The Python event loop that should run the future /// * `fut` - The future to drive to completion /// /// # Examples @@ -78,9 +144,12 @@ impl SpawnLocalExt for AsyncStdRuntime { /// # /// # use pyo3::prelude::*; /// # +/// # pyo3::prepare_freethreaded_python(); +/// # /// # Python::with_gil(|py| { /// # pyo3_asyncio::with_runtime(py, || { -/// pyo3_asyncio::async_std::run_until_complete(py, async move { +/// # let event_loop = py.import("asyncio")?.call_method0("new_event_loop")?; +/// pyo3_asyncio::async_std::run_until_complete(event_loop, async move { /// async_std::task::sleep(Duration::from_secs(1)).await; /// Ok(()) /// })?; @@ -92,11 +161,47 @@ impl SpawnLocalExt for AsyncStdRuntime { /// # .unwrap(); /// # }); /// ``` -pub fn run_until_complete(py: Python, fut: F) -> PyResult<()> +pub fn run_until_complete(event_loop: &PyAny, fut: F) -> PyResult<()> +where + F: Future> + Send + 'static, +{ + generic::run_until_complete::(event_loop, fut) +} + +/// Run the event loop until the given Future completes +/// +/// # Arguments +/// * `py` - The current PyO3 GIL guard +/// * `fut` - The future to drive to completion +/// +/// # Examples +/// +/// ```no_run +/// # use std::time::Duration; +/// # +/// # use pyo3::prelude::*; +/// # +/// fn main() { +/// // call this or use pyo3 0.14 "auto-initialize" feature +/// pyo3::prepare_freethreaded_python(); +/// +/// Python::with_gil(|py| { +/// pyo3_asyncio::async_std::run(py, async move { +/// async_std::task::sleep(Duration::from_secs(1)).await; +/// Ok(()) +/// }) +/// .map_err(|e| { +/// e.print_and_set_sys_last_vars(py); +/// }) +/// .unwrap(); +/// }) +/// } +/// ``` +pub fn run(py: Python, fut: F) -> PyResult<()> where F: Future> + Send + 'static, { - generic::run_until_complete::(py, fut) + generic::run::(py, fut) } /// Convert a Rust Future into a Python awaitable @@ -123,6 +228,11 @@ where /// }) /// } /// ``` +#[deprecated( + since = "0.14.0", + note = "Use the pyo3_asyncio::async_std::future_into_py instead\n\t\t(see the [migration guide](https://github.com/awestlake87/pyo3-asyncio/#migrating-from-013-to-014) for more details)" +)] +#[allow(deprecated)] pub fn into_coroutine(py: Python, fut: F) -> PyResult where F: Future> + Send + 'static, @@ -130,7 +240,40 @@ where generic::into_coroutine::(py, fut) } -/// Convert a `!Send` Rust Future into a Python awaitable +/// Convert a Rust Future into a Python awaitable +/// +/// # Arguments +/// * `event_loop` - The Python event loop that the awaitable should be attached to +/// * `fut` - The Rust future to be converted +/// +/// # Examples +/// +/// ``` +/// use std::time::Duration; +/// +/// use pyo3::prelude::*; +/// +/// /// Awaitable sleep function +/// #[pyfunction] +/// fn sleep_for<'p>(py: Python<'p>, secs: &'p PyAny) -> PyResult<&'p PyAny> { +/// let secs = secs.extract()?; +/// pyo3_asyncio::async_std::future_into_py_with_loop( +/// pyo3_asyncio::async_std::get_current_loop(py)?, +/// async move { +/// async_std::task::sleep(Duration::from_secs(secs)).await; +/// Python::with_gil(|py| Ok(py.None())) +/// } +/// ) +/// } +/// ``` +pub fn future_into_py_with_loop(event_loop: &PyAny, fut: F) -> PyResult<&PyAny> +where + F: Future> + Send + 'static, +{ + generic::future_into_py_with_loop::(event_loop, fut) +} + +/// Convert a Rust Future into a Python awaitable /// /// # Arguments /// * `py` - The current PyO3 GIL guard @@ -139,6 +282,36 @@ where /// # Examples /// /// ``` +/// use std::time::Duration; +/// +/// use pyo3::prelude::*; +/// +/// /// Awaitable sleep function +/// #[pyfunction] +/// fn sleep_for<'p>(py: Python<'p>, secs: &'p PyAny) -> PyResult<&'p PyAny> { +/// let secs = secs.extract()?; +/// pyo3_asyncio::async_std::future_into_py(py, async move { +/// async_std::task::sleep(Duration::from_secs(secs)).await; +/// Python::with_gil(|py| Ok(py.None())) +/// }) +/// } +/// ``` +pub fn future_into_py(py: Python, fut: F) -> PyResult<&PyAny> +where + F: Future> + Send + 'static, +{ + generic::future_into_py::(py, fut) +} + +/// Convert a `!Send` Rust Future into a Python awaitable +/// +/// # Arguments +/// * `event_loop` - The Python event loop that the awaitable should be attached to +/// * `fut` - The Rust future to be converted +/// +/// # Examples +/// +/// ``` /// use std::{rc::Rc, time::Duration}; /// /// use pyo3::prelude::*; @@ -146,9 +319,56 @@ where /// /// Awaitable non-send sleep function /// #[pyfunction] /// fn sleep_for(py: Python, secs: u64) -> PyResult<&PyAny> { -/// // Rc is non-send so it cannot be passed into pyo3_asyncio::tokio::into_coroutine +/// // Rc is non-send so it cannot be passed into pyo3_asyncio::async_std::future_into_py /// let secs = Rc::new(secs); +/// Ok(pyo3_asyncio::async_std::local_future_into_py_with_loop( +/// pyo3_asyncio::async_std::get_current_loop(py)?, +/// async move { +/// async_std::task::sleep(Duration::from_secs(*secs)).await; +/// Python::with_gil(|py| Ok(py.None())) +/// } +/// )?.into()) +/// } +/// +/// # #[cfg(all(feature = "async-std-runtime", feature = "attributes"))] +/// #[pyo3_asyncio::async_std::main] +/// async fn main() -> PyResult<()> { +/// Python::with_gil(|py| { +/// let py_future = sleep_for(py, 1)?; +/// pyo3_asyncio::async_std::into_future(py_future) +/// })? +/// .await?; /// +/// Ok(()) +/// } +/// # #[cfg(not(all(feature = "async-std-runtime", feature = "attributes")))] +/// # fn main() {} +/// ``` +pub fn local_future_into_py_with_loop(event_loop: &PyAny, fut: F) -> PyResult<&PyAny> +where + F: Future> + 'static, +{ + generic::local_future_into_py_with_loop::(event_loop, fut) +} + +/// Convert a `!Send` Rust Future into a Python awaitable +/// +/// # Arguments +/// * `py` - The current PyO3 GIL guard +/// * `fut` - The Rust future to be converted +/// +/// # Examples +/// +/// ``` +/// use std::{rc::Rc, time::Duration}; +/// +/// use pyo3::prelude::*; +/// +/// /// Awaitable non-send sleep function +/// #[pyfunction] +/// fn sleep_for(py: Python, secs: u64) -> PyResult<&PyAny> { +/// // Rc is non-send so it cannot be passed into pyo3_asyncio::async_std::future_into_py +/// let secs = Rc::new(secs); /// pyo3_asyncio::async_std::local_future_into_py(py, async move { /// async_std::task::sleep(Duration::from_secs(*secs)).await; /// Python::with_gil(|py| Ok(py.None())) @@ -159,8 +379,8 @@ where /// #[pyo3_asyncio::async_std::main] /// async fn main() -> PyResult<()> { /// Python::with_gil(|py| { -/// let py_future = sleep_for(py, 1)?; -/// pyo3_asyncio::into_future(py_future) +/// let py_future = sleep_for(py, 1)?; +/// pyo3_asyncio::async_std::into_future(py_future) /// })? /// .await?; /// @@ -175,3 +395,55 @@ where { generic::local_future_into_py::(py, fut) } + +/// Convert a Python `awaitable` into a Rust Future +/// +/// This function converts the `awaitable` into a Python Task using `run_coroutine_threadsafe`. A +/// completion handler sends the result of this Task through a +/// `futures::channel::oneshot::Sender>` and the future returned by this function +/// simply awaits the result through the `futures::channel::oneshot::Receiver>`. +/// +/// # Arguments +/// * `awaitable` - The Python `awaitable` to be converted +/// +/// # Examples +/// +/// ``` +/// use std::time::Duration; +/// +/// use pyo3::prelude::*; +/// +/// const PYTHON_CODE: &'static str = r#" +/// import asyncio +/// +/// async def py_sleep(duration): +/// await asyncio.sleep(duration) +/// "#; +/// +/// async fn py_sleep(seconds: f32) -> PyResult<()> { +/// let test_mod = Python::with_gil(|py| -> PyResult { +/// Ok( +/// PyModule::from_code( +/// py, +/// PYTHON_CODE, +/// "test_into_future/test_mod.py", +/// "test_mod" +/// )? +/// .into() +/// ) +/// })?; +/// +/// Python::with_gil(|py| { +/// pyo3_asyncio::async_std::into_future( +/// test_mod +/// .call_method1(py, "py_sleep", (seconds.into_py(py),))? +/// .as_ref(py), +/// ) +/// })? +/// .await?; +/// Ok(()) +/// } +/// ``` +pub fn into_future(awaitable: &PyAny) -> PyResult> + Send> { + generic::into_future::(awaitable) +} diff --git a/src/err.rs b/src/err.rs new file mode 100644 index 0000000..95e5d15 --- /dev/null +++ b/src/err.rs @@ -0,0 +1,9 @@ +// FIXME - is there a way to document custom PyO3 exceptions? +#[allow(missing_docs)] +mod exceptions { + use pyo3::{create_exception, exceptions::PyException}; + + create_exception!(pyo3_asyncio, RustPanic, PyException); +} + +pub use exceptions::RustPanic; diff --git a/src/generic.rs b/src/generic.rs index d6f98a2..6f15537 100644 --- a/src/generic.rs +++ b/src/generic.rs @@ -1,8 +1,12 @@ -use std::future::Future; +use std::{future::Future, pin::Pin}; -use pyo3::{exceptions::PyException, prelude::*}; +use pyo3::{prelude::*, PyNativeType}; -use crate::{dump_err, get_event_loop, CALL_SOON, CREATE_FUTURE, EXPECT_INIT}; +#[allow(deprecated)] +use crate::{ + asyncio, call_soon_threadsafe, close, create_future, dump_err, err::RustPanic, get_event_loop, + get_running_loop, into_future_with_loop, +}; /// Generic utilities for a JoinError pub trait JoinError { @@ -17,6 +21,13 @@ pub trait Runtime { /// A future that completes with the result of the spawned task type JoinHandle: Future> + Send; + /// Set the task local event loop for the given future + fn scope(event_loop: PyObject, fut: F) -> Pin + Send>> + where + F: Future + Send + 'static; + /// Get the task local event loop for the current task + fn get_task_event_loop(py: Python) -> Option<&PyAny>; + /// Spawn a future onto this runtime's event loop fn spawn(fut: F) -> Self::JoinHandle where @@ -25,19 +36,40 @@ pub trait Runtime { /// Extension trait for async/await runtimes that support spawning local tasks pub trait SpawnLocalExt: Runtime { + /// Set the task local event loop for the given !Send future + fn scope_local(event_loop: PyObject, fut: F) -> Pin>> + where + F: Future + 'static; + /// Spawn a !Send future onto this runtime's event loop fn spawn_local(fut: F) -> Self::JoinHandle where F: Future + 'static; } +/// Get the current event loop from either Python or Rust async task local context +/// +/// This function first checks if the runtime has a task-local reference to the Python event loop. +/// If not, it calls [`get_running_loop`](crate::get_running_loop`) to get the event loop associated +/// with the current OS thread. +pub fn get_current_loop(py: Python) -> PyResult<&PyAny> +where + R: Runtime, +{ + if let Some(event_loop) = R::get_task_event_loop(py) { + Ok(event_loop) + } else { + get_running_loop(py) + } +} + /// Run the event loop until the given Future completes /// /// After this function returns, the event loop can be resumed with either [`run_until_complete`] or -/// [`crate::run_forever`] +/// [`run_forever`](`crate::run_forever`) /// /// # Arguments -/// * `py` - The current PyO3 GIL guard +/// * `event_loop` - The Python event loop that should run the future /// * `fut` - The future to drive to completion /// /// # Examples @@ -70,6 +102,16 @@ pub trait SpawnLocalExt: Runtime { /// # impl Runtime for MyCustomRuntime { /// # type JoinError = MyCustomJoinError; /// # type JoinHandle = MyCustomJoinHandle; +/// # +/// # fn scope(_event_loop: PyObject, fut: F) -> Pin + Send>> +/// # where +/// # F: Future + Send + 'static +/// # { +/// # unreachable!() +/// # } +/// # fn get_task_event_loop(py: Python) -> Option<&PyAny> { +/// # unreachable!() +/// # } /// # /// # fn spawn(fut: F) -> Self::JoinHandle /// # where @@ -85,8 +127,9 @@ pub trait SpawnLocalExt: Runtime { /// # /// # Python::with_gil(|py| { /// # pyo3_asyncio::with_runtime(py, || { +/// # let event_loop = py.import("asyncio")?.call_method0("new_event_loop")?; /// # #[cfg(feature = "tokio-runtime")] -/// pyo3_asyncio::generic::run_until_complete::(py, async move { +/// pyo3_asyncio::generic::run_until_complete::(event_loop, async move { /// tokio::time::sleep(Duration::from_secs(1)).await; /// Ok(()) /// })?; @@ -98,50 +141,237 @@ pub trait SpawnLocalExt: Runtime { /// # .unwrap(); /// # }); /// ``` -pub fn run_until_complete(py: Python, fut: F) -> PyResult<()> +pub fn run_until_complete(event_loop: &PyAny, fut: F) -> PyResult<()> where R: Runtime, F: Future> + Send + 'static, { - let coro = into_coroutine::(py, async move { + let coro = future_into_py_with_loop::(event_loop, async move { fut.await?; Ok(Python::with_gil(|py| py.None())) })?; - get_event_loop(py).call_method1("run_until_complete", (coro,))?; + event_loop.call_method1("run_until_complete", (coro,))?; Ok(()) } +/// Run the event loop until the given Future completes +/// +/// # Arguments +/// * `py` - The current PyO3 GIL guard +/// * `fut` - The future to drive to completion +/// +/// # Examples +/// +/// ```no_run +/// # use std::{task::{Context, Poll}, pin::Pin, future::Future}; +/// # +/// # use pyo3_asyncio::generic::{JoinError, Runtime}; +/// # +/// # struct MyCustomJoinError; +/// # +/// # impl JoinError for MyCustomJoinError { +/// # fn is_panic(&self) -> bool { +/// # unreachable!() +/// # } +/// # } +/// # +/// # struct MyCustomJoinHandle; +/// # +/// # impl Future for MyCustomJoinHandle { +/// # type Output = Result<(), MyCustomJoinError>; +/// # +/// # fn poll(self: Pin<&mut Self>, _cx: &mut Context) -> Poll { +/// # unreachable!() +/// # } +/// # } +/// # +/// # struct MyCustomRuntime; +/// # +/// # impl Runtime for MyCustomRuntime { +/// # type JoinError = MyCustomJoinError; +/// # type JoinHandle = MyCustomJoinHandle; +/// # +/// # fn scope(_event_loop: PyObject, fut: F) -> Pin + Send>> +/// # where +/// # F: Future + Send + 'static +/// # { +/// # unreachable!() +/// # } +/// # fn get_task_event_loop(py: Python) -> Option<&PyAny> { +/// # unreachable!() +/// # } +/// # +/// # fn spawn(fut: F) -> Self::JoinHandle +/// # where +/// # F: Future + Send + 'static +/// # { +/// # unreachable!() +/// # } +/// # } +/// # +/// # use std::time::Duration; +/// # async fn custom_sleep(_duration: Duration) { } +/// # +/// # use pyo3::prelude::*; +/// # +/// fn main() { +/// Python::with_gil(|py| { +/// pyo3_asyncio::generic::run::(py, async move { +/// custom_sleep(Duration::from_secs(1)).await; +/// Ok(()) +/// }) +/// .map_err(|e| { +/// e.print_and_set_sys_last_vars(py); +/// }) +/// .unwrap(); +/// }) +/// } +/// ``` +pub fn run(py: Python, fut: F) -> PyResult<()> +where + R: Runtime, + F: Future> + Send + 'static, +{ + let event_loop = asyncio(py)?.call_method0("new_event_loop")?; + + let result = run_until_complete::(event_loop, fut); + + close(event_loop)?; + + result +} + fn cancelled(future: &PyAny) -> PyResult { future.getattr("cancelled")?.call0()?.is_true() } -fn set_result(py: Python, future: &PyAny, result: PyResult) -> PyResult<()> { +fn set_result(event_loop: &PyAny, future: &PyAny, result: PyResult) -> PyResult<()> { match result { Ok(val) => { let set_result = future.getattr("set_result")?; - CALL_SOON - .get() - .expect(EXPECT_INIT) - .call1(py, (set_result, val))?; + call_soon_threadsafe(event_loop, (set_result, val))?; } Err(err) => { let set_exception = future.getattr("set_exception")?; - CALL_SOON - .get() - .expect(EXPECT_INIT) - .call1(py, (set_exception, err))?; + call_soon_threadsafe(event_loop, (set_exception, err))?; } } Ok(()) } +/// Convert a Python `awaitable` into a Rust Future +/// +/// This function simply forwards the future and the `event_loop` returned by [`get_current_loop`] +/// to [`into_future_with_loop`](`crate::into_future_with_loop`). See +/// [`into_future_with_loop`](`crate::into_future_with_loop`) for more details. +/// +/// # Arguments +/// * `awaitable` - The Python `awaitable` to be converted +/// +/// # Examples +/// +/// ```no_run +/// # use std::{pin::Pin, future::Future, task::{Context, Poll}, time::Duration}; +/// # +/// # use pyo3::prelude::*; +/// # +/// # use pyo3_asyncio::generic::{JoinError, Runtime}; +/// # +/// # struct MyCustomJoinError; +/// # +/// # impl JoinError for MyCustomJoinError { +/// # fn is_panic(&self) -> bool { +/// # unreachable!() +/// # } +/// # } +/// # +/// # struct MyCustomJoinHandle; +/// # +/// # impl Future for MyCustomJoinHandle { +/// # type Output = Result<(), MyCustomJoinError>; +/// # +/// # fn poll(self: Pin<&mut Self>, _cx: &mut Context) -> Poll { +/// # unreachable!() +/// # } +/// # } +/// # +/// # struct MyCustomRuntime; +/// # +/// # impl MyCustomRuntime { +/// # async fn sleep(_: Duration) { +/// # unreachable!() +/// # } +/// # } +/// # +/// # impl Runtime for MyCustomRuntime { +/// # type JoinError = MyCustomJoinError; +/// # type JoinHandle = MyCustomJoinHandle; +/// # +/// # fn scope(_event_loop: PyObject, fut: F) -> Pin + Send>> +/// # where +/// # F: Future + Send + 'static +/// # { +/// # unreachable!() +/// # } +/// # fn get_task_event_loop(py: Python) -> Option<&PyAny> { +/// # unreachable!() +/// # } +/// # +/// # fn spawn(fut: F) -> Self::JoinHandle +/// # where +/// # F: Future + Send + 'static +/// # { +/// # unreachable!() +/// # } +/// # } +/// # +/// const PYTHON_CODE: &'static str = r#" +/// import asyncio +/// +/// async def py_sleep(duration): +/// await asyncio.sleep(duration) +/// "#; +/// +/// async fn py_sleep(seconds: f32) -> PyResult<()> { +/// let test_mod = Python::with_gil(|py| -> PyResult { +/// Ok( +/// PyModule::from_code( +/// py, +/// PYTHON_CODE, +/// "test_into_future/test_mod.py", +/// "test_mod" +/// )? +/// .into() +/// ) +/// })?; +/// +/// Python::with_gil(|py| { +/// pyo3_asyncio::generic::into_future::( +/// test_mod +/// .call_method1(py, "py_sleep", (seconds.into_py(py),))? +/// .as_ref(py), +/// ) +/// })? +/// .await?; +/// Ok(()) +/// } +/// ``` +pub fn into_future( + awaitable: &PyAny, +) -> PyResult> + Send> +where + R: Runtime, +{ + into_future_with_loop(get_current_loop::(awaitable.py())?, awaitable) +} + /// Convert a Rust Future into a Python awaitable with a generic runtime /// /// # Arguments -/// * `py` - The current PyO3 GIL guard +/// * `event_loop` - The Python event loop that the awaitable should be attached to /// * `fut` - The Rust future to be converted /// /// # Examples @@ -180,6 +410,16 @@ fn set_result(py: Python, future: &PyAny, result: PyResult) -> PyResul /// # impl Runtime for MyCustomRuntime { /// # type JoinError = MyCustomJoinError; /// # type JoinHandle = MyCustomJoinHandle; +/// # +/// # fn scope(_event_loop: PyObject, fut: F) -> Pin + Send>> +/// # where +/// # F: Future + Send + 'static +/// # { +/// # unreachable!() +/// # } +/// # fn get_task_event_loop(py: Python) -> Option<&PyAny> { +/// # unreachable!() +/// # } /// # /// # fn spawn(fut: F) -> Self::JoinHandle /// # where @@ -195,27 +435,33 @@ fn set_result(py: Python, future: &PyAny, result: PyResult) -> PyResul /// /// /// Awaitable sleep function /// #[pyfunction] -/// fn sleep_for(py: Python, secs: &PyAny) -> PyResult { +/// fn sleep_for<'p>(py: Python<'p>, secs: &'p PyAny) -> PyResult<&'p PyAny> { /// let secs = secs.extract()?; -/// -/// pyo3_asyncio::generic::into_coroutine::(py, async move { -/// MyCustomRuntime::sleep(Duration::from_secs(secs)).await; -/// Python::with_gil(|py| Ok(py.None())) -/// }) +/// pyo3_asyncio::generic::future_into_py_with_loop::( +/// pyo3_asyncio::generic::get_current_loop::(py)?, +/// async move { +/// MyCustomRuntime::sleep(Duration::from_secs(secs)).await; +/// Python::with_gil(|py| Ok(py.None())) +/// } +/// ) /// } /// ``` -pub fn into_coroutine(py: Python, fut: F) -> PyResult +pub fn future_into_py_with_loop(event_loop: &PyAny, fut: F) -> PyResult<&PyAny> where R: Runtime, F: Future> + Send + 'static, { - let future_rx = CREATE_FUTURE.get().expect(EXPECT_INIT).call0(py)?; - let future_tx1 = future_rx.clone(); - let future_tx2 = future_rx.clone(); + let future_rx = create_future(event_loop)?; + let future_tx1 = PyObject::from(future_rx); + let future_tx2 = future_tx1.clone(); + + let event_loop = PyObject::from(event_loop); R::spawn(async move { + let event_loop2 = event_loop.clone(); + if let Err(e) = R::spawn(async move { - let result = fut.await; + let result = R::scope(event_loop2.clone(), fut).await; Python::with_gil(move |py| { if cancelled(future_tx1.as_ref(py)) @@ -225,7 +471,8 @@ where return; } - let _ = set_result(py, future_tx1.as_ref(py), result).map_err(dump_err(py)); + let _ = set_result(event_loop2.as_ref(py), future_tx1.as_ref(py), result) + .map_err(dump_err(py)); }); }) .await @@ -240,9 +487,9 @@ where } let _ = set_result( - py, + event_loop.as_ref(py), future_tx2.as_ref(py), - Err(PyException::new_err("rust future panicked")), + Err(RustPanic::new_err("rust future panicked")), ) .map_err(dump_err(py)); }); @@ -253,7 +500,7 @@ where Ok(future_rx) } -/// Convert a `!Send` Rust Future into a Python awaitable with a generic runtime +/// Convert a Rust Future into a Python awaitable with a generic runtime /// /// # Arguments /// * `py` - The current PyO3 GIL guard @@ -264,6 +511,177 @@ where /// ```no_run /// # use std::{task::{Context, Poll}, pin::Pin, future::Future}; /// # +/// # use pyo3_asyncio::generic::{JoinError, Runtime}; +/// # +/// # struct MyCustomJoinError; +/// # +/// # impl JoinError for MyCustomJoinError { +/// # fn is_panic(&self) -> bool { +/// # unreachable!() +/// # } +/// # } +/// # +/// # struct MyCustomJoinHandle; +/// # +/// # impl Future for MyCustomJoinHandle { +/// # type Output = Result<(), MyCustomJoinError>; +/// # +/// # fn poll(self: Pin<&mut Self>, _cx: &mut Context) -> Poll { +/// # unreachable!() +/// # } +/// # } +/// # +/// # struct MyCustomRuntime; +/// # +/// # impl MyCustomRuntime { +/// # async fn sleep(_: Duration) { +/// # unreachable!() +/// # } +/// # } +/// # +/// # impl Runtime for MyCustomRuntime { +/// # type JoinError = MyCustomJoinError; +/// # type JoinHandle = MyCustomJoinHandle; +/// # +/// # fn scope(_event_loop: PyObject, fut: F) -> Pin + Send>> +/// # where +/// # F: Future + Send + 'static +/// # { +/// # unreachable!() +/// # } +/// # fn get_task_event_loop(py: Python) -> Option<&PyAny> { +/// # unreachable!() +/// # } +/// # +/// # fn spawn(fut: F) -> Self::JoinHandle +/// # where +/// # F: Future + Send + 'static +/// # { +/// # unreachable!() +/// # } +/// # } +/// # +/// use std::time::Duration; +/// +/// use pyo3::prelude::*; +/// +/// /// Awaitable sleep function +/// #[pyfunction] +/// fn sleep_for<'p>(py: Python<'p>, secs: &'p PyAny) -> PyResult<&'p PyAny> { +/// let secs = secs.extract()?; +/// pyo3_asyncio::generic::future_into_py::(py, async move { +/// MyCustomRuntime::sleep(Duration::from_secs(secs)).await; +/// Python::with_gil(|py| Ok(py.None())) +/// }) +/// } +/// ``` +pub fn future_into_py(py: Python, fut: F) -> PyResult<&PyAny> +where + R: Runtime, + F: Future> + Send + 'static, +{ + future_into_py_with_loop::(get_current_loop::(py)?, fut) +} + +/// Convert a Rust Future into a Python awaitable with a generic runtime +/// +/// # Arguments +/// * `py` - The current PyO3 GIL guard +/// * `fut` - The Rust future to be converted +/// +/// # Examples +/// +/// ```no_run +/// # use std::{task::{Context, Poll}, pin::Pin, future::Future}; +/// # +/// # use pyo3_asyncio::generic::{JoinError, Runtime}; +/// # +/// # struct MyCustomJoinError; +/// # +/// # impl JoinError for MyCustomJoinError { +/// # fn is_panic(&self) -> bool { +/// # unreachable!() +/// # } +/// # } +/// # +/// # struct MyCustomJoinHandle; +/// # +/// # impl Future for MyCustomJoinHandle { +/// # type Output = Result<(), MyCustomJoinError>; +/// # +/// # fn poll(self: Pin<&mut Self>, _cx: &mut Context) -> Poll { +/// # unreachable!() +/// # } +/// # } +/// # +/// # struct MyCustomRuntime; +/// # +/// # impl MyCustomRuntime { +/// # async fn sleep(_: Duration) { +/// # unreachable!() +/// # } +/// # } +/// # +/// # impl Runtime for MyCustomRuntime { +/// # type JoinError = MyCustomJoinError; +/// # type JoinHandle = MyCustomJoinHandle; +/// # +/// # fn scope(_event_loop: PyObject, fut: F) -> Pin + Send>> +/// # where +/// # F: Future + Send + 'static +/// # { +/// # unreachable!() +/// # } +/// # fn get_task_event_loop(py: Python) -> Option<&PyAny> { +/// # unreachable!() +/// # } +/// # +/// # fn spawn(fut: F) -> Self::JoinHandle +/// # where +/// # F: Future + Send + 'static +/// # { +/// # unreachable!() +/// # } +/// # } +/// # +/// use std::time::Duration; +/// +/// use pyo3::prelude::*; +/// +/// /// Awaitable sleep function +/// #[pyfunction] +/// fn sleep_for(py: Python, secs: &PyAny) -> PyResult { +/// let secs = secs.extract()?; +/// pyo3_asyncio::generic::into_coroutine::(py, async move { +/// MyCustomRuntime::sleep(Duration::from_secs(secs)).await; +/// Python::with_gil(|py| Ok(py.None())) +/// }) +/// } +/// ``` +#[deprecated( + since = "0.14.0", + note = "Use the pyo3_asyncio::generic::future_into_py instead\n\t\t(see the [migration guide](https://github.com/awestlake87/pyo3-asyncio/#migrating-from-013-to-014) for more details)" +)] +#[allow(deprecated)] +pub fn into_coroutine(py: Python, fut: F) -> PyResult +where + R: Runtime, + F: Future> + Send + 'static, +{ + Ok(future_into_py_with_loop::(get_event_loop(py), fut)?.into()) +} + +/// Convert a `!Send` Rust Future into a Python awaitable with a generic runtime +/// +/// # Arguments +/// * `event_loop` - The Python event loop that the awaitable should be attached to +/// * `fut` - The Rust future to be converted +/// +/// # Examples +/// +/// ```no_run +/// # use std::{task::{Context, Poll}, pin::Pin, future::Future}; +/// # /// # use pyo3_asyncio::generic::{JoinError, SpawnLocalExt, Runtime}; /// # /// # struct MyCustomJoinError; @@ -295,6 +713,16 @@ where /// # impl Runtime for MyCustomRuntime { /// # type JoinError = MyCustomJoinError; /// # type JoinHandle = MyCustomJoinHandle; +/// # +/// # fn scope(_event_loop: PyObject, fut: F) -> Pin + Send>> +/// # where +/// # F: Future + Send + 'static +/// # { +/// # unreachable!() +/// # } +/// # fn get_task_event_loop(py: Python) -> Option<&PyAny> { +/// # unreachable!() +/// # } /// # /// # fn spawn(fut: F) -> Self::JoinHandle /// # where @@ -305,6 +733,13 @@ where /// # } /// # /// # impl SpawnLocalExt for MyCustomRuntime { +/// # fn scope_local(_event_loop: PyObject, fut: F) -> Pin>> +/// # where +/// # F: Future + 'static +/// # { +/// # unreachable!() +/// # } +/// # /// # fn spawn_local(fut: F) -> Self::JoinHandle /// # where /// # F: Future + 'static @@ -313,31 +748,41 @@ where /// # } /// # } /// # -/// use std::time::Duration; +/// use std::{rc::Rc, time::Duration}; /// /// use pyo3::prelude::*; /// /// /// Awaitable sleep function /// #[pyfunction] /// fn sleep_for(py: Python, secs: u64) -> PyResult<&PyAny> { -/// pyo3_asyncio::generic::local_future_into_py::(py, async move { -/// MyCustomRuntime::sleep(Duration::from_secs(secs)).await; -/// Python::with_gil(|py| Ok(py.None())) -/// }) +/// // Rc is !Send so it cannot be passed into pyo3_asyncio::generic::future_into_py +/// let secs = Rc::new(secs); +/// +/// pyo3_asyncio::generic::local_future_into_py_with_loop::( +/// pyo3_asyncio::get_running_loop(py)?, +/// async move { +/// MyCustomRuntime::sleep(Duration::from_secs(*secs)).await; +/// Python::with_gil(|py| Ok(py.None())) +/// } +/// ) /// } /// ``` -pub fn local_future_into_py(py: Python, fut: F) -> PyResult<&PyAny> +pub fn local_future_into_py_with_loop(event_loop: &PyAny, fut: F) -> PyResult<&PyAny> where R: SpawnLocalExt, F: Future> + 'static, { - let future_rx = CREATE_FUTURE.get().expect(EXPECT_INIT).as_ref(py).call0()?; + let future_rx = create_future(event_loop)?; let future_tx1 = PyObject::from(future_rx); let future_tx2 = future_tx1.clone(); + let event_loop = PyObject::from(event_loop); + R::spawn_local(async move { + let event_loop2 = event_loop.clone(); + if let Err(e) = R::spawn_local(async move { - let result = fut.await; + let result = R::scope_local(event_loop2.clone(), fut).await; Python::with_gil(move |py| { if cancelled(future_tx1.as_ref(py)) @@ -347,7 +792,8 @@ where return; } - let _ = set_result(py, future_tx1.as_ref(py), result).map_err(dump_err(py)); + let _ = set_result(event_loop2.as_ref(py), future_tx1.as_ref(py), result) + .map_err(dump_err(py)); }); }) .await @@ -362,9 +808,9 @@ where } let _ = set_result( - py, + event_loop.as_ref(py), future_tx2.as_ref(py), - Err(PyException::new_err("rust future panicked")), + Err(RustPanic::new_err("Rust future panicked")), ) .map_err(dump_err(py)); }); @@ -374,3 +820,104 @@ where Ok(future_rx) } + +/// Convert a `!Send` Rust Future into a Python awaitable with a generic runtime +/// +/// # Arguments +/// * `py` - The current PyO3 GIL guard +/// * `fut` - The Rust future to be converted +/// +/// # Examples +/// +/// ```no_run +/// # use std::{task::{Context, Poll}, pin::Pin, future::Future}; +/// # +/// # use pyo3_asyncio::generic::{JoinError, SpawnLocalExt, Runtime}; +/// # +/// # struct MyCustomJoinError; +/// # +/// # impl JoinError for MyCustomJoinError { +/// # fn is_panic(&self) -> bool { +/// # unreachable!() +/// # } +/// # } +/// # +/// # struct MyCustomJoinHandle; +/// # +/// # impl Future for MyCustomJoinHandle { +/// # type Output = Result<(), MyCustomJoinError>; +/// # +/// # fn poll(self: Pin<&mut Self>, _cx: &mut Context) -> Poll { +/// # unreachable!() +/// # } +/// # } +/// # +/// # struct MyCustomRuntime; +/// # +/// # impl MyCustomRuntime { +/// # async fn sleep(_: Duration) { +/// # unreachable!() +/// # } +/// # } +/// # +/// # impl Runtime for MyCustomRuntime { +/// # type JoinError = MyCustomJoinError; +/// # type JoinHandle = MyCustomJoinHandle; +/// # +/// # fn scope(_event_loop: PyObject, fut: F) -> Pin + Send>> +/// # where +/// # F: Future + Send + 'static +/// # { +/// # unreachable!() +/// # } +/// # fn get_task_event_loop(py: Python) -> Option<&PyAny> { +/// # unreachable!() +/// # } +/// # +/// # fn spawn(fut: F) -> Self::JoinHandle +/// # where +/// # F: Future + Send + 'static +/// # { +/// # unreachable!() +/// # } +/// # } +/// # +/// # impl SpawnLocalExt for MyCustomRuntime { +/// # fn scope_local(_event_loop: PyObject, fut: F) -> Pin>> +/// # where +/// # F: Future + 'static +/// # { +/// # unreachable!() +/// # } +/// # +/// # fn spawn_local(fut: F) -> Self::JoinHandle +/// # where +/// # F: Future + 'static +/// # { +/// # unreachable!() +/// # } +/// # } +/// # +/// use std::{rc::Rc, time::Duration}; +/// +/// use pyo3::prelude::*; +/// +/// /// Awaitable sleep function +/// #[pyfunction] +/// fn sleep_for(py: Python, secs: u64) -> PyResult<&PyAny> { +/// // Rc is !Send so it cannot be passed into pyo3_asyncio::generic::future_into_py +/// let secs = Rc::new(secs); +/// +/// pyo3_asyncio::generic::local_future_into_py::(py, async move { +/// MyCustomRuntime::sleep(Duration::from_secs(*secs)).await; +/// Python::with_gil(|py| Ok(py.None())) +/// }) +/// } +/// ``` +pub fn local_future_into_py(py: Python, fut: F) -> PyResult<&PyAny> +where + R: SpawnLocalExt, + F: Future> + 'static, +{ + local_future_into_py_with_loop::(get_current_loop::(py)?, fut) +} diff --git a/src/lib.rs b/src/lib.rs index c75f5e7..c283191 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -28,6 +28,202 @@ //! means that Cargo's default test harness will no longer work since it doesn't provide a method of //! overriding the main function to add our event loop initialization and finalization. //! +//! ## Event Loop References +//! +//! One problem that arises when interacting with Python's asyncio library is that the functions we use to get a reference to the Python event loop can only be called in certain contexts. Since PyO3 Asyncio needs to interact with Python's event loop during conversions, the context of these conversions can matter a lot. +//! +//! > The core conversions we've mentioned so far in this guide should insulate you from these concerns in most cases, but in the event that they don't, this section should provide you with the information you need to solve these problems. +//! +//! ### The Main Dilemma +//! +//! Python programs can have many independent event loop instances throughout the lifetime of the application (`asyncio.run` for example creates its own event loop each time it's called for instance), and they can even run concurrent with other event loops. For this reason, the most correct method of obtaining a reference to the Python event loop is via `asyncio.get_running_loop`. +//! +//! `asyncio.get_running_loop` returns the event loop associated with the current OS thread. It can be used inside Python coroutines to spawn concurrent tasks, interact with timers, or in our case signal between Rust and Python. This is all well and good when we are operating on a Python thread, but since Rust threads are not associated with a Python event loop, `asyncio.get_running_loop` will fail when called on a Rust runtime. +//! +//! ### The Solution +//! +//! A really straightforward way of dealing with this problem is to pass a reference to the associated Python event loop for every conversion. That's why in `v0.14`, we introduced a new set of conversion functions that do just that: +//! +//! - `pyo3_asyncio::into_future_with_loop` - Convert a Python awaitable into a Rust future with the given asyncio event loop. +//! - `pyo3_asyncio::::future_into_py_with_loop` - Convert a Rust future into a Python awaitable with the given asyncio event loop. +//! - `pyo3_asyncio::::local_future_into_py_with_loop` - Convert a `!Send` Rust future into a Python awaitable with the given asyncio event loop. +//! +//! One clear disadvantage to this approach (aside from the verbose naming) is that the Rust application has to explicitly track its references to the Python event loop. In native libraries, we can't make any assumptions about the underlying event loop, so the only reliable way to make sure our conversions work properly is to store a reference to the current event loop at the callsite to use later on. +//! +//! ```rust +//! use pyo3::prelude::*; +//! +//! #[pyfunction] +//! fn sleep(py: Python) -> PyResult<&PyAny> { +//! let current_loop = pyo3_asyncio::get_running_loop(py)?; +//! let loop_ref = PyObject::from(current_loop); +//! +//! // Convert the async move { } block to a Python awaitable +//! pyo3_asyncio::tokio::future_into_py_with_loop(current_loop, async move { +//! let py_sleep = Python::with_gil(|py| { +//! // Sometimes we need to call other async Python functions within +//! // this future. In order for this to work, we need to track the +//! // event loop from earlier. +//! pyo3_asyncio::into_future_with_loop( +//! loop_ref.as_ref(py), +//! py.import("asyncio")?.call_method1("sleep", (1,))? +//! ) +//! })?; +//! +//! py_sleep.await?; +//! +//! Ok(Python::with_gil(|py| py.None())) +//! }) +//! } +//! +//! #[pymodule] +//! fn my_mod(py: Python, m: &PyModule) -> PyResult<()> { +//! m.add_function(wrap_pyfunction!(sleep, m)?)?; +//! Ok(()) +//! } +//! ``` +//! +//! > A naive solution to this tracking problem would be to cache a global reference to the asyncio event loop that all PyO3 Asyncio conversions can use. In fact this is what we did in PyO3 Asyncio `v0.13`. This works well for applications, but it soon became clear that this is not so ideal for libraries. Libraries usually have no direct control over how the event loop is managed, they're just expected to work with any event loop at any point in the application. This problem is compounded further when multiple event loops are used in the application since the global reference will only point to one. +//! +//! Another disadvantage to this explicit approach that is less obvious is that we can no longer call our `#[pyfunction] fn sleep` on a Rust runtime since `asyncio.get_running_loop` only works on Python threads! It's clear that we need a slightly more flexible approach. +//! +//! In order to detect the Python event loop at the callsite, we need something like `asyncio.get_running_loop` that works for _both Python and Rust_. In Python, `asyncio.get_running_loop` uses thread-local data to retrieve the event loop associated with the current thread. What we need in Rust is something that can retrieve the Python event loop associated with the current _task_. +//! +//! Enter `pyo3_asyncio::::get_current_loop`. This function first checks task-local data for a Python event loop, then falls back on `asyncio.get_running_loop` if no task-local event loop is found. This way both bases are covered. +//! +//! Now, all we need is a way to store the event loop in task-local data. Since this is a runtime-specific feature, you can find the following functions in each runtime module: +//! +//! - `pyo3_asyncio::::scope` - Store the event loop in task-local data when executing the given Future. +//! - `pyo3_asyncio::::scope_local` - Store the event loop in task-local data when executing the given `!Send` Future. +//! +//! With these new functions, we can make our previous example more correct: +//! +//! ```rust no_run +//! use pyo3::prelude::*; +//! +//! #[pyfunction] +//! fn sleep(py: Python) -> PyResult<&PyAny> { +//! // get the current event loop through task-local data +//! // OR `asyncio.get_running_loop` +//! let current_loop = pyo3_asyncio::tokio::get_current_loop(py)?; +//! +//! pyo3_asyncio::tokio::future_into_py_with_loop( +//! current_loop, +//! // Store the current loop in task-local data +//! pyo3_asyncio::tokio::scope(current_loop.into(), async move { +//! let py_sleep = Python::with_gil(|py| { +//! pyo3_asyncio::into_future_with_loop( +//! // Now we can get the current loop through task-local data +//! pyo3_asyncio::tokio::get_current_loop(py)?, +//! py.import("asyncio")?.call_method1("sleep", (1,))? +//! ) +//! })?; +//! +//! py_sleep.await?; +//! +//! Ok(Python::with_gil(|py| py.None())) +//! }) +//! ) +//! } +//! +//! #[pyfunction] +//! fn wrap_sleep(py: Python) -> PyResult<&PyAny> { +//! // get the current event loop through task-local data +//! // OR `asyncio.get_running_loop` +//! let current_loop = pyo3_asyncio::tokio::get_current_loop(py)?; +//! +//! pyo3_asyncio::tokio::future_into_py_with_loop( +//! current_loop, +//! // Store the current loop in task-local data +//! pyo3_asyncio::tokio::scope(current_loop.into(), async move { +//! let py_sleep = Python::with_gil(|py| { +//! pyo3_asyncio::into_future_with_loop( +//! pyo3_asyncio::tokio::get_current_loop(py)?, +//! // We can also call sleep within a Rust task since the +//! // event loop is stored in task local data +//! sleep(py)? +//! ) +//! })?; +//! +//! py_sleep.await?; +//! +//! Ok(Python::with_gil(|py| py.None())) +//! }) +//! ) +//! } +//! +//! #[pymodule] +//! fn my_mod(py: Python, m: &PyModule) -> PyResult<()> { +//! m.add_function(wrap_pyfunction!(sleep, m)?)?; +//! m.add_function(wrap_pyfunction!(wrap_sleep, m)?)?; +//! Ok(()) +//! } +//! ``` +//! +//! Even though this is more correct, it's clearly not more ergonomic. That's why we introduced a new set of functions with this functionality baked in: +//! +//! - `pyo3_asyncio::::into_future` +//! > Convert a Python awaitable into a Rust future (using `pyo3_asyncio::::get_current_loop`) +//! - `pyo3_asyncio::::future_into_py` +//! > Convert a Rust future into a Python awaitable (using `pyo3_asyncio::::get_current_loop` and `pyo3_asyncio::::scope` to set the task-local event loop for the given Rust future) +//! - `pyo3_asyncio::::local_future_into_py` +//! > Convert a `!Send` Rust future into a Python awaitable (using `pyo3_asyncio::::get_current_loop` and `pyo3_asyncio::::scope_local` to set the task-local event loop for the given Rust future). +//! +//! __These are the functions that we recommend using__. With these functions, the previous example can be rewritten to be more compact: +//! +//! ```rust +//! use pyo3::prelude::*; +//! +//! #[pyfunction] +//! fn sleep(py: Python) -> PyResult<&PyAny> { +//! pyo3_asyncio::tokio::future_into_py(py, async move { +//! let py_sleep = Python::with_gil(|py| { +//! pyo3_asyncio::tokio::into_future( +//! py.import("asyncio")?.call_method1("sleep", (1,))? +//! ) +//! })?; +//! +//! py_sleep.await?; +//! +//! Ok(Python::with_gil(|py| py.None())) +//! }) +//! } +//! +//! #[pyfunction] +//! fn wrap_sleep(py: Python) -> PyResult<&PyAny> { +//! pyo3_asyncio::tokio::future_into_py(py, async move { +//! let py_sleep = Python::with_gil(|py| { +//! pyo3_asyncio::tokio::into_future(sleep(py)?) +//! })?; +//! +//! py_sleep.await?; +//! +//! Ok(Python::with_gil(|py| py.None())) +//! }) +//! } +//! +//! #[pymodule] +//! fn my_mod(py: Python, m: &PyModule) -> PyResult<()> { +//! m.add_function(wrap_pyfunction!(sleep, m)?)?; +//! m.add_function(wrap_pyfunction!(wrap_sleep, m)?)?; +//! Ok(()) +//! } +//! ``` +//! +//! ## A Note for `v0.13` Users +//! +//! Hey guys, I realize that these are pretty major changes for `v0.14`, and I apologize in advance for having to modify the public API so much. I hope +//! the explanation above gives some much needed context and justification for all the breaking changes. +//! +//! Part of the reason why it's taken so long to push out a `v0.14` release is because I wanted to make sure we got this release right. There were a lot of issues with the `v0.13` release that I hadn't anticipated, and it's thanks to your feedback and patience that we've worked through these issues to get a more correct, more flexible version out there! +//! +//! This new release should address most the core issues that users have reported in the `v0.13` release, so I think we can expect more stability going forward. +//! +//! Also, a special thanks to [@ShadowJonathan](https://github.com/ShadowJonathan) for helping with the design and review +//! of these changes! +//! +//! - [@awestlake87](https://github.com/awestlake87) +//! //! ## Rust's Event Loop //! //! Currently only the async-std and Tokio runtimes are supported by this crate. @@ -100,6 +296,9 @@ pub mod async_std; #[cfg(feature = "tokio-runtime")] pub mod tokio; +/// Errors and exceptions related to PyO3 Asyncio +pub mod err; + /// Generic implementations of PyO3 Asyncio utilities that can be used for any Rust runtime pub mod generic; @@ -107,7 +306,7 @@ use std::future::Future; use futures::channel::oneshot; use once_cell::sync::OnceCell; -use pyo3::{exceptions::PyKeyboardInterrupt, prelude::*, PyNativeType}; +use pyo3::{exceptions::PyKeyboardInterrupt, prelude::*, types::PyTuple, PyNativeType}; /// Re-exported for #[test] attributes #[cfg(all(feature = "attributes", feature = "testing"))] @@ -139,17 +338,25 @@ pub mod doc_test { doctest!("../README.md", readme_md); } -const EXPECT_INIT: &str = "PyO3 Asyncio has not been initialized"; - static ASYNCIO: OnceCell = OnceCell::new(); static ENSURE_FUTURE: OnceCell = OnceCell::new(); -static EVENT_LOOP: OnceCell = OnceCell::new(); +static GET_RUNNING_LOOP: OnceCell = OnceCell::new(); + +const EXPECT_INIT: &str = "PyO3 Asyncio has not been initialized"; +static CACHED_EVENT_LOOP: OnceCell = OnceCell::new(); static EXECUTOR: OnceCell = OnceCell::new(); -static CALL_SOON: OnceCell = OnceCell::new(); -static CREATE_FUTURE: OnceCell = OnceCell::new(); -fn ensure_future(py: Python) -> &PyAny { - ENSURE_FUTURE.get().expect(EXPECT_INIT).as_ref(py) +fn ensure_future<'p>(py: Python<'p>, awaitable: &'p PyAny) -> PyResult<&'p PyAny> { + ENSURE_FUTURE + .get_or_try_init(|| -> PyResult { + Ok(asyncio(py)?.getattr("ensure_future")?.into()) + })? + .as_ref(py) + .call1((awaitable,)) +} + +fn create_future(event_loop: &PyAny) -> PyResult<&PyAny> { + event_loop.call_method0("create_future") } #[allow(clippy::needless_doctest_main)] @@ -167,6 +374,9 @@ fn ensure_future(py: Python) -> &PyAny { /// use pyo3::prelude::*; /// /// fn main() { +/// // Call this function or use pyo3's "auto-initialize" feature +/// pyo3::prepare_freethreaded_python(); +/// /// Python::with_gil(|py| { /// pyo3_asyncio::with_runtime(py, || { /// println!("PyO3 Asyncio Initialized!"); @@ -179,6 +389,11 @@ fn ensure_future(py: Python) -> &PyAny { /// }) /// } /// ``` +#[deprecated( + since = "0.14.0", + note = "Use the pyo3_asyncio::async_std::run or pyo3_asyncio::tokio::run instead\n\t\t(see the [migration guide](https://github.com/awestlake87/pyo3-asyncio/#migrating-from-013-to-014) for more details)" +)] +#[allow(deprecated)] pub fn with_runtime(py: Python, f: F) -> PyResult where F: FnOnce() -> PyResult, @@ -192,38 +407,91 @@ where Ok(result) } +fn close(event_loop: &PyAny) -> PyResult<()> { + event_loop.call_method1( + "run_until_complete", + (event_loop.call_method0("shutdown_asyncgens")?,), + )?; + + // how to do this prior to 3.9? + if event_loop.hasattr("shutdown_default_executor")? { + event_loop.call_method1( + "run_until_complete", + (event_loop.call_method0("shutdown_default_executor")?,), + )?; + } + + event_loop.call_method0("close")?; + + Ok(()) +} + /// Attempt to initialize the Python and Rust event loops /// /// - Must be called before any other pyo3-asyncio functions. /// - Calling `try_init` a second time returns `Ok(())` and does nothing. /// > In future versions this may return an `Err`. +#[deprecated( + since = "0.14.0", + note = "see the [migration guide](https://github.com/awestlake87/pyo3-asyncio/#migrating-from-013-to-014) for more details" +)] pub fn try_init(py: Python) -> PyResult<()> { - EVENT_LOOP.get_or_try_init(|| -> PyResult { - let asyncio = py.import("asyncio")?; - let ensure_future = asyncio.getattr("ensure_future")?; - let event_loop = asyncio.call_method0("get_event_loop")?; + CACHED_EVENT_LOOP.get_or_try_init(|| -> PyResult { + let event_loop = asyncio_get_event_loop(py)?; let executor = py .import("concurrent.futures.thread")? - .getattr("ThreadPoolExecutor")? - .call0()?; + .call_method0("ThreadPoolExecutor")?; event_loop.call_method1("set_default_executor", (executor,))?; - let call_soon = event_loop.getattr("call_soon_threadsafe")?; - let create_future = event_loop.getattr("create_future")?; - - ASYNCIO.get_or_init(|| asyncio.into()); - ENSURE_FUTURE.get_or_init(|| ensure_future.into()); - EXECUTOR.get_or_init(|| executor.into()); - CALL_SOON.get_or_init(|| call_soon.into()); - CREATE_FUTURE.get_or_init(|| create_future.into()); + + EXECUTOR.set(executor.into()).unwrap(); Ok(event_loop.into()) })?; Ok(()) } +fn asyncio(py: Python) -> PyResult<&PyAny> { + ASYNCIO + .get_or_try_init(|| Ok(py.import("asyncio")?.into())) + .map(|asyncio| asyncio.as_ref(py)) +} + +fn asyncio_get_event_loop(py: Python) -> PyResult<&PyAny> { + asyncio(py)?.call_method0("get_event_loop") +} + /// Get a reference to the Python Event Loop from Rust +/// +/// Equivalent to `asyncio.get_running_loop()` in Python 3.7+. +/// > For Python 3.6, this function falls back to `asyncio.get_event_loop()` which has slightly +/// different behaviour. See the [`asyncio.get_event_loop`](https://docs.python.org/3/library/asyncio-eventloop.html#asyncio.get_event_loop) +/// docs to better understand the differences. +pub fn get_running_loop(py: Python) -> PyResult<&PyAny> { + // Ideally should call get_running_loop, but calls get_event_loop for compatibility when + // get_running_loop is not available. + GET_RUNNING_LOOP + .get_or_try_init(|| -> PyResult { + let asyncio = asyncio(py)?; + + if asyncio.hasattr("get_running_loop")? { + // correct behaviour with Python 3.7+ + Ok(asyncio.getattr("get_running_loop")?.into()) + } else { + // Python 3.6 compatibility mode + Ok(asyncio.getattr("get_event_loop")?.into()) + } + })? + .as_ref(py) + .call0() +} + +/// Get a reference to the Python event loop cached by `try_init` (0.13 behaviour) +#[deprecated( + since = "0.14.0", + note = "see the [migration guide](https://github.com/awestlake87/pyo3-asyncio/#migrating-from-013-to-014) for more details" +)] pub fn get_event_loop(py: Python) -> &PyAny { - EVENT_LOOP.get().expect(EXPECT_INIT).as_ref(py) + CACHED_EVENT_LOOP.get().expect(EXPECT_INIT).as_ref(py) } /// Run the event loop forever @@ -240,38 +508,48 @@ pub fn get_event_loop(py: Python) -> &PyAny { /// # Examples /// /// ``` -/// # use std::time::Duration; -/// # use pyo3::prelude::*; -/// # Python::with_gil(|py| { -/// # pyo3_asyncio::with_runtime(py, || { -/// // Wait 1 second, then stop the event loop /// # #[cfg(feature = "async-std-runtime")] -/// async_std::task::spawn(async move { -/// async_std::task::sleep(Duration::from_secs(1)).await; +/// fn main() -> pyo3::PyResult<()> { +/// use std::time::Duration; +/// use pyo3::prelude::*; +/// +/// // call this or use pyo3 0.14 "auto-initialize" feature +/// pyo3::prepare_freethreaded_python(); +/// /// Python::with_gil(|py| { -/// let event_loop = pyo3_asyncio::get_event_loop(py); -/// -/// event_loop -/// .call_method1( -/// "call_soon_threadsafe", -/// (event_loop -/// .getattr("stop") -/// .map_err(|e| e.print_and_set_sys_last_vars(py)) -/// .unwrap(),), -/// ) -/// .map_err(|e| e.print_and_set_sys_last_vars(py)) -/// .unwrap(); -/// }) -/// }); +/// pyo3_asyncio::with_runtime(py, || { +/// let event_loop_hdl = PyObject::from(pyo3_asyncio::get_event_loop(py)); +/// // Wait 1 second, then stop the event loop +/// async_std::task::spawn(async move { +/// async_std::task::sleep(Duration::from_secs(1)).await; +/// Python::with_gil(|py| { +/// event_loop_hdl +/// .as_ref(py) +/// .call_method1( +/// "call_soon_threadsafe", +/// (event_loop_hdl +/// .as_ref(py) +/// .getattr("stop") +/// .map_err(|e| e.print_and_set_sys_last_vars(py)) +/// .unwrap(),), +/// ) +/// .unwrap(); +/// }) +/// }); +/// +/// pyo3_asyncio::run_forever(py)?; /// -/// // block until stop is called -/// # #[cfg(feature = "async-std-runtime")] -/// pyo3_asyncio::run_forever(py)?; -/// # Ok(()) -/// # }) -/// # .map_err(|e| e.print_and_set_sys_last_vars(py)) -/// # .unwrap(); -/// # }) +/// Ok(()) +/// }) +/// }) +/// } +/// # #[cfg(not(feature = "async-std-runtime"))] +/// # fn main() {} +#[deprecated( + since = "0.14.0", + note = "see the [migration guide](https://github.com/awestlake87/pyo3-asyncio/#migrating-from-013-to-014) for more details" +)] +#[allow(deprecated)] pub fn run_forever(py: Python) -> PyResult<()> { if let Err(e) = get_event_loop(py).call_method0("run_forever") { if e.is_instance::(py) { @@ -285,15 +563,20 @@ pub fn run_forever(py: Python) -> PyResult<()> { } /// Shutdown the event loops and perform any necessary cleanup +#[deprecated( + since = "0.14.0", + note = "see the [migration guide](https://github.com/awestlake87/pyo3-asyncio/#migrating-from-013-to-014) for more details" +)] pub fn try_close(py: Python) -> PyResult<()> { - // Shutdown the executor and wait until all threads are cleaned up - EXECUTOR - .get() - .expect(EXPECT_INIT) - .call_method0(py, "shutdown")?; - - get_event_loop(py).call_method0("stop")?; - get_event_loop(py).call_method0("close")?; + if let Some(exec) = EXECUTOR.get() { + // Shutdown the executor and wait until all threads are cleaned up + exec.call_method0(py, "shutdown")?; + } + + if let Some(event_loop) = CACHED_EVENT_LOOP.get() { + close(event_loop.as_ref(py))?; + } + Ok(()) } @@ -339,7 +622,7 @@ impl PyEnsureFuture { #[call] pub fn __call__(&mut self) -> PyResult<()> { Python::with_gil(|py| { - let task = ensure_future(py).call1((self.awaitable.as_ref(py),))?; + let task = ensure_future(py, self.awaitable.as_ref(py))?; let on_complete = PyTaskCompleter { tx: self.tx.take() }; task.call_method1("add_done_callback", (on_complete,))?; @@ -348,6 +631,11 @@ impl PyEnsureFuture { } } +fn call_soon_threadsafe(event_loop: &PyAny, args: impl IntoPy>) -> PyResult<()> { + event_loop.call_method1("call_soon_threadsafe", args)?; + Ok(()) +} + /// Convert a Python `awaitable` into a Rust Future /// /// This function converts the `awaitable` into a Python Task using `run_coroutine_threadsafe`. A @@ -356,6 +644,7 @@ impl PyEnsureFuture { /// simply awaits the result through the `futures::channel::oneshot::Receiver>`. /// /// # Arguments +/// * `event_loop` - The Python event loop that the awaitable should be attached to /// * `awaitable` - The Python `awaitable` to be converted /// /// # Examples @@ -386,7 +675,8 @@ impl PyEnsureFuture { /// })?; /// /// Python::with_gil(|py| { -/// pyo3_asyncio::into_future( +/// pyo3_asyncio::into_future_with_loop( +/// pyo3_asyncio::get_running_loop(py)?, /// test_mod /// .call_method1(py, "py_sleep", (seconds.into_py(py),))? /// .as_ref(py), @@ -396,12 +686,14 @@ impl PyEnsureFuture { /// Ok(()) /// } /// ``` -pub fn into_future(awaitable: &PyAny) -> PyResult> + Send> { - let py = awaitable.py(); +pub fn into_future_with_loop( + event_loop: &PyAny, + awaitable: &PyAny, +) -> PyResult> + Send> { let (tx, rx) = oneshot::channel(); - CALL_SOON.get().expect(EXPECT_INIT).call1( - py, + call_soon_threadsafe( + event_loop, (PyEnsureFuture { awaitable: awaitable.into(), tx: Some(tx), @@ -413,17 +705,71 @@ pub fn into_future(awaitable: &PyAny) -> PyResult item, Err(_) => Python::with_gil(|py| { Err(PyErr::from_instance( - ASYNCIO - .get() - .expect(EXPECT_INIT) - .call_method0(py, "CancelledError")? - .as_ref(py), + asyncio(py)?.call_method0("CancelledError")?, )) }), } }) } +/// Convert a Python `awaitable` into a Rust Future +/// +/// This function converts the `awaitable` into a Python Task using `run_coroutine_threadsafe`. A +/// completion handler sends the result of this Task through a +/// `futures::channel::oneshot::Sender>` and the future returned by this function +/// simply awaits the result through the `futures::channel::oneshot::Receiver>`. +/// +/// # Arguments +/// * `awaitable` - The Python `awaitable` to be converted +/// +/// # Examples +/// +/// ``` +/// use std::time::Duration; +/// +/// use pyo3::prelude::*; +/// +/// const PYTHON_CODE: &'static str = r#" +/// import asyncio +/// +/// async def py_sleep(duration): +/// await asyncio.sleep(duration) +/// "#; +/// +/// async fn py_sleep(seconds: f32) -> PyResult<()> { +/// let test_mod = Python::with_gil(|py| -> PyResult { +/// Ok( +/// PyModule::from_code( +/// py, +/// PYTHON_CODE, +/// "test_into_future/test_mod.py", +/// "test_mod" +/// )? +/// .into() +/// ) +/// })?; +/// +/// Python::with_gil(|py| { +/// // Only works with cached event loop +/// pyo3_asyncio::into_future( +/// test_mod +/// .call_method1(py, "py_sleep", (seconds.into_py(py),))? +/// .as_ref(py), +/// ) +/// })? +/// .await?; +/// Ok(()) +/// } +/// ``` +#[deprecated( + since = "0.14.0", + note = "Use pyo3_asyncio::async_std::into_future or pyo3_asyncio::tokio::into_future instead\n (see the [migration guide](https://github.com/awestlake87/pyo3-asyncio/#migrating-from-013-to-014) for more details)" +)] +#[allow(deprecated)] +pub fn into_future(awaitable: &PyAny) -> PyResult> + Send> { + into_future_with_loop(get_event_loop(awaitable.py()), awaitable) +} + fn dump_err(py: Python<'_>) -> impl FnOnce(PyErr) + '_ { move |e| { // We can't display Python exceptions via std::fmt::Display, diff --git a/src/testing.rs b/src/testing.rs index 9e8bd0e..84cc3fc 100644 --- a/src/testing.rs +++ b/src/testing.rs @@ -18,45 +18,34 @@ //! overriding the default test harness can be quite different from what you're used to doing for //! integration tests, so these next sections will walk you through this process. //! -//! ### Main Test File +//! ## Main Test File //! First, we need to create the test's main file. Although these tests are considered integration //! tests, we cannot put them in the `tests` directory since that is a special directory owned by -//! Cargo. Instead, we put our tests in a `pytests` directory, although the name `pytests` is just -//! a convention. +//! Cargo. Instead, we put our tests in a `pytests` directory. //! -//! We'll also want to provide the test's main function. Most of the functionality that the test -//! harness needs is packed in this module's [`main`](crate::testing::main) function. It will parse -//! the test's CLI arguments, collect and pass the functions marked with -//! [`#[pyo3_asyncio::async_std::test]`](crate::async_std::test) or -//! [`#[pyo3_asyncio::tokio::test]`](crate::tokio::test) and pass them into the test harness for -//! running and filtering. +//! > The name `pytests` is just a convention. You can name this folder anything you want in your own +//! > projects. +//! +//! We'll also want to provide the test's main function. Most of the functionality that the test harness needs is packed in the [`pyo3_asyncio::testing::main`](https://docs.rs/pyo3-asyncio/latest/pyo3_asyncio/testing/fn.main.html) function. This function will parse the test's CLI arguments, collect and pass the functions marked with [`#[pyo3_asyncio::async_std::test]`](https://docs.rs/pyo3-asyncio/latest/pyo3_asyncio/async_std/attr.test.html) or [`#[pyo3_asyncio::tokio::test]`](https://docs.rs/pyo3-asyncio/latest/pyo3_asyncio/tokio/attr.test.html) and pass them into the test harness for running and filtering. //! //! `pytests/test_example.rs` for the `tokio` runtime: -//! ``` -//! # #[cfg(all(feature = "tokio-runtime", feature = "attributes"))] +//! ```rust //! #[pyo3_asyncio::tokio::main] //! async fn main() -> pyo3::PyResult<()> { //! pyo3_asyncio::testing::main().await //! } -//! # -//! # #[cfg(not(all(feature = "tokio-runtime", feature = "attributes")))] -//! # fn main() {} //! ``` //! //! `pytests/test_example.rs` for the `async-std` runtime: -//! ``` -//! # #[cfg(all(feature = "async-std-runtime", feature = "attributes"))] +//! ```rust //! #[pyo3_asyncio::async_std::main] //! async fn main() -> pyo3::PyResult<()> { //! pyo3_asyncio::testing::main().await //! } -//! # -//! # #[cfg(not(all(feature = "async-std-runtime", feature = "attributes")))] -//! # fn main() {} //! ``` //! -//! ### Cargo Configuration -//! Next, we need to add our test file to the Cargo manifest. Add the following section to your +//! ## Cargo Configuration +//! Next, we need to add our test file to the Cargo manifest by adding the following section to the //! `Cargo.toml` //! //! ```toml @@ -66,34 +55,33 @@ //! harness = false //! ``` //! -//! Also, add the `testing` and `attributes` features to `pyo3-asyncio` and select your preferred -//! runtime: +//! Also add the `testing` and `attributes` features to the `pyo3-asyncio` dependency and select your preferred runtime: //! //! ```toml -//! [dependencies] //! pyo3-asyncio = { version = "0.13", features = ["testing", "attributes", "async-std-runtime"] } //! ``` //! -//! At this point you should be able to run the test via `cargo test` +//! At this point, you should be able to run the test via `cargo test` //! //! ### Adding Tests to the PyO3 Asyncio Test Harness //! //! We can add tests anywhere in the test crate with the runtime's corresponding `#[test]` attribute: //! -//! For `async-std` use the [`pyo3_asyncio::async_std::test`](crate::async_std::test) attribute: -//! ``` -//! # #[cfg(all(feature = "async-std-runtime", feature = "attributes"))] +//! For `async-std` use the [`pyo3_asyncio::async_std::test`](https://docs.rs/pyo3-asyncio/latest/pyo3_asyncio/async_std/attr.test.html) attribute: +//! ```rust //! mod tests { //! use std::{time::Duration, thread}; //! //! use pyo3::prelude::*; //! +//! // tests can be async //! #[pyo3_asyncio::async_std::test] //! async fn test_async_sleep() -> PyResult<()> { //! async_std::task::sleep(Duration::from_secs(1)).await; //! Ok(()) //! } //! +//! // they can also be synchronous //! #[pyo3_asyncio::async_std::test] //! fn test_blocking_sleep() -> PyResult<()> { //! thread::sleep(Duration::from_secs(1)); @@ -101,29 +89,27 @@ //! } //! } //! -//! # #[cfg(all(feature = "async-std-runtime", feature = "attributes"))] //! #[pyo3_asyncio::async_std::main] //! async fn main() -> pyo3::PyResult<()> { //! pyo3_asyncio::testing::main().await //! } -//! # #[cfg(not(all(feature = "async-std-runtime", feature = "attributes")))] -//! # fn main() {} //! ``` //! -//! For `tokio` use the [`pyo3_asyncio::tokio::test`](crate::tokio::test) attribute: -//! ``` -//! # #[cfg(all(feature = "tokio-runtime", feature = "attributes"))] +//! For `tokio` use the [`pyo3_asyncio::tokio::test`](https://docs.rs/pyo3-asyncio/latest/pyo3_asyncio/tokio/attr.test.html) attribute: +//! ```rust //! mod tests { //! use std::{time::Duration, thread}; //! //! use pyo3::prelude::*; //! +//! // tests can be async //! #[pyo3_asyncio::tokio::test] //! async fn test_async_sleep() -> PyResult<()> { //! tokio::time::sleep(Duration::from_secs(1)).await; //! Ok(()) //! } //! +//! // they can also be synchronous //! #[pyo3_asyncio::tokio::test] //! fn test_blocking_sleep() -> PyResult<()> { //! thread::sleep(Duration::from_secs(1)); @@ -131,21 +117,18 @@ //! } //! } //! -//! # #[cfg(all(feature = "tokio-runtime", feature = "attributes"))] //! #[pyo3_asyncio::tokio::main] //! async fn main() -> pyo3::PyResult<()> { //! pyo3_asyncio::testing::main().await //! } -//! # #[cfg(not(all(feature = "tokio-runtime", feature = "attributes")))] -//! # fn main() {} //! ``` //! -//! ### Lib Tests +//! ## Lib Tests //! //! Unfortunately, as we mentioned at the beginning, these utilities will only run in integration //! tests and doc tests. Running lib tests are out of the question since we need control over the -//! main function. You can however perform compilation checks for lib tests. This is unfortunately -//! much more useful in doc tests than it is for lib tests, but the option is there if you want it. +//! main function. You can however perform compilation checks for lib tests. This is much more +//! useful in doc tests than it is for lib tests, but the option is there if you want it. //! //! `my-crate/src/lib.rs` //! ``` diff --git a/src/tokio.rs b/src/tokio.rs index c629918..af92e2f 100644 --- a/src/tokio.rs +++ b/src/tokio.rs @@ -1,14 +1,16 @@ -use std::{future::Future, thread}; +use std::{future::Future, pin::Pin, sync::Mutex}; use ::tokio::{ runtime::{Builder, Runtime}, task, }; -use futures::future::pending; -use once_cell::sync::OnceCell; +use once_cell::{ + sync::{Lazy, OnceCell}, + unsync::OnceCell as UnsyncOnceCell, +}; use pyo3::prelude::*; -use crate::generic; +use crate::generic::{self, Runtime as GenericRuntime, SpawnLocalExt}; /// attributes /// re-exports for macros @@ -30,10 +32,9 @@ pub use pyo3_asyncio_macros::tokio_main as main; #[cfg(all(feature = "attributes", feature = "testing"))] pub use pyo3_asyncio_macros::tokio_test as test; +static TOKIO_BUILDER: Lazy> = Lazy::new(|| Mutex::new(multi_thread())); static TOKIO_RUNTIME: OnceCell = OnceCell::new(); -const EXPECT_TOKIO_INIT: &str = "Tokio runtime must be initialized"; - impl generic::JoinError for task::JoinError { fn is_panic(&self) -> bool { task::JoinError::is_panic(self) @@ -42,10 +43,31 @@ impl generic::JoinError for task::JoinError { struct TokioRuntime; -impl generic::Runtime for TokioRuntime { +tokio::task_local! { + static EVENT_LOOP: UnsyncOnceCell; +} + +impl GenericRuntime for TokioRuntime { type JoinError = task::JoinError; type JoinHandle = task::JoinHandle<()>; + fn scope(event_loop: PyObject, fut: F) -> Pin + Send>> + where + F: Future + Send + 'static, + { + let cell = UnsyncOnceCell::new(); + cell.set(event_loop).unwrap(); + + Box::pin(EVENT_LOOP.scope(cell, fut)) + } + + fn get_task_event_loop(py: Python) -> Option<&PyAny> { + match EVENT_LOOP.try_with(|c| c.get().map(|event_loop| event_loop.clone().into_ref(py))) { + Ok(event_loop) => event_loop, + Err(_) => None, + } + } + fn spawn(fut: F) -> Self::JoinHandle where F: Future + Send + 'static, @@ -56,7 +78,17 @@ impl generic::Runtime for TokioRuntime { } } -impl generic::SpawnLocalExt for TokioRuntime { +impl SpawnLocalExt for TokioRuntime { + fn scope_local(event_loop: PyObject, fut: F) -> Pin>> + where + F: Future + 'static, + { + let cell = UnsyncOnceCell::new(); + cell.set(event_loop).unwrap(); + + Box::pin(EVENT_LOOP.scope(cell, fut)) + } + fn spawn_local(fut: F) -> Self::JoinHandle where F: Future + 'static, @@ -65,79 +97,51 @@ impl generic::SpawnLocalExt for TokioRuntime { } } -/// Initialize the Tokio Runtime with a custom build -pub fn init(runtime: Runtime) { - TOKIO_RUNTIME - .set(runtime) - .expect("Tokio Runtime has already been initialized"); +/// Set the task local event loop for the given future +pub async fn scope(event_loop: PyObject, fut: F) -> R +where + F: Future + Send + 'static, +{ + TokioRuntime::scope(event_loop, fut).await } -fn current_thread() -> Runtime { - Builder::new_current_thread() - .enable_all() - .build() - .expect("Couldn't build the current-thread Tokio runtime") +/// Set the task local event loop for the given !Send future +pub async fn scope_local(event_loop: PyObject, fut: F) -> R +where + F: Future + 'static, +{ + TokioRuntime::scope_local(event_loop, fut).await } -fn start_current_thread() { - thread::spawn(move || { - TOKIO_RUNTIME.get().unwrap().block_on(pending::<()>()); - }); +/// Get the current event loop from either Python or Rust async task local context +/// +/// This function first checks if the runtime has a task-local reference to the Python event loop. +/// If not, it calls [`get_running_loop`](`crate::get_running_loop`) to get the event loop +/// associated with the current OS thread. +pub fn get_current_loop(py: Python) -> PyResult<&PyAny> { + generic::get_current_loop::(py) } -/// Initialize the Tokio Runtime with current-thread scheduler -/// -/// # Panics -/// This function will panic if called a second time. See [`init_current_thread_once`] if you want -/// to avoid this panic. -pub fn init_current_thread() { - init(current_thread()); - start_current_thread(); +/// Initialize the Tokio runtime with a custom build +pub fn init(builder: Builder) { + *TOKIO_BUILDER.lock().unwrap() = builder } /// Get a reference to the current tokio runtime pub fn get_runtime<'a>() -> &'a Runtime { - TOKIO_RUNTIME.get().expect(EXPECT_TOKIO_INIT) -} - -fn multi_thread() -> Runtime { - Builder::new_multi_thread() - .enable_all() - .build() - .expect("Couldn't build the multi-thread Tokio runtime") -} - -/// Initialize the Tokio Runtime with the multi-thread scheduler -/// -/// # Panics -/// This function will panic if called a second time. See [`init_multi_thread_once`] if you want to -/// avoid this panic. -pub fn init_multi_thread() { - init(multi_thread()); -} - -/// Ensure that the Tokio Runtime is initialized -/// -/// If the runtime has not been initialized already, the multi-thread scheduler -/// is used. Calling this function a second time is a no-op. -pub fn init_multi_thread_once() { - TOKIO_RUNTIME.get_or_init(|| multi_thread()); -} - -/// Ensure that the Tokio Runtime is initialized -/// -/// If the runtime has not been initialized already, the current-thread -/// scheduler is used. Calling this function a second time is a no-op. -pub fn init_current_thread_once() { - let mut initialized = false; TOKIO_RUNTIME.get_or_init(|| { - initialized = true; - current_thread() - }); + TOKIO_BUILDER + .lock() + .unwrap() + .build() + .expect("Unable to build Tokio runtime") + }) +} - if initialized { - start_current_thread(); - } +fn multi_thread() -> Builder { + let mut builder = Builder::new_multi_thread(); + builder.enable_all(); + builder } /// Run the event loop until the given Future completes @@ -148,7 +152,7 @@ pub fn init_current_thread_once() { /// [`crate::run_forever`] /// /// # Arguments -/// * `py` - The current PyO3 GIL guard +/// * `event_loop` - The Python event loop that should run the future /// * `fut` - The future to drive to completion /// /// # Examples @@ -157,17 +161,12 @@ pub fn init_current_thread_once() { /// # use std::time::Duration; /// # /// # use pyo3::prelude::*; -/// # use tokio::runtime::{Builder, Runtime}; -/// # -/// # let runtime = Builder::new_current_thread() -/// # .enable_all() -/// # .build() -/// # .expect("Couldn't build the runtime"); /// # +/// # pyo3::prepare_freethreaded_python(); /// # Python::with_gil(|py| { /// # pyo3_asyncio::with_runtime(py, || { -/// # pyo3_asyncio::tokio::init_current_thread(); -/// pyo3_asyncio::tokio::run_until_complete(py, async move { +/// # let event_loop = py.import("asyncio")?.call_method0("new_event_loop")?; +/// pyo3_asyncio::tokio::run_until_complete(event_loop, async move { /// tokio::time::sleep(Duration::from_secs(1)).await; /// Ok(()) /// })?; @@ -179,11 +178,44 @@ pub fn init_current_thread_once() { /// # .unwrap(); /// # }); /// ``` -pub fn run_until_complete(py: Python, fut: F) -> PyResult<()> +pub fn run_until_complete(event_loop: &PyAny, fut: F) -> PyResult<()> +where + F: Future> + Send + 'static, +{ + generic::run_until_complete::(event_loop, fut) +} + +/// Run the event loop until the given Future completes +/// +/// # Arguments +/// * `py` - The current PyO3 GIL guard +/// * `fut` - The future to drive to completion +/// +/// # Examples +/// +/// ```no_run +/// # use std::time::Duration; +/// # +/// # use pyo3::prelude::*; +/// # +/// fn main() { +/// Python::with_gil(|py| { +/// pyo3_asyncio::tokio::run(py, async move { +/// tokio::time::sleep(Duration::from_secs(1)).await; +/// Ok(()) +/// }) +/// .map_err(|e| { +/// e.print_and_set_sys_last_vars(py); +/// }) +/// .unwrap(); +/// }) +/// } +/// ``` +pub fn run(py: Python, fut: F) -> PyResult<()> where F: Future> + Send + 'static, { - generic::run_until_complete::(py, fut) + generic::run::(py, fut) } /// Convert a Rust Future into a Python awaitable @@ -203,13 +235,17 @@ where /// #[pyfunction] /// fn sleep_for(py: Python, secs: &PyAny) -> PyResult { /// let secs = secs.extract()?; -/// /// pyo3_asyncio::tokio::into_coroutine(py, async move { /// tokio::time::sleep(Duration::from_secs(secs)).await; /// Python::with_gil(|py| Ok(py.None())) /// }) /// } /// ``` +#[deprecated( + since = "0.14.0", + note = "Use pyo3_asyncio::tokio::future_into_py instead\n (see the [migration guide](https://github.com/awestlake87/pyo3-asyncio/#migrating-from-013-to-014) for more details)" +)] +#[allow(deprecated)] pub fn into_coroutine(py: Python, fut: F) -> PyResult where F: Future> + Send + 'static, @@ -217,7 +253,40 @@ where generic::into_coroutine::(py, fut) } -/// Convert a `!Send` Rust Future into a Python awaitable +/// Convert a Rust Future into a Python awaitable +/// +/// # Arguments +/// * `event_loop` - The Python event loop that the awaitable should be attached to +/// * `fut` - The Rust future to be converted +/// +/// # Examples +/// +/// ``` +/// use std::time::Duration; +/// +/// use pyo3::prelude::*; +/// +/// /// Awaitable sleep function +/// #[pyfunction] +/// fn sleep_for<'p>(py: Python<'p>, secs: &'p PyAny) -> PyResult<&'p PyAny> { +/// let secs = secs.extract()?; +/// pyo3_asyncio::tokio::future_into_py_with_loop( +/// pyo3_asyncio::tokio::get_current_loop(py)?, +/// async move { +/// tokio::time::sleep(Duration::from_secs(secs)).await; +/// Python::with_gil(|py| Ok(py.None())) +/// } +/// ) +/// } +/// ``` +pub fn future_into_py_with_loop(event_loop: &PyAny, fut: F) -> PyResult<&PyAny> +where + F: Future> + Send + 'static, +{ + generic::future_into_py_with_loop::(event_loop, fut) +} + +/// Convert a Rust Future into a Python awaitable /// /// # Arguments /// * `py` - The current PyO3 GIL guard @@ -226,6 +295,36 @@ where /// # Examples /// /// ``` +/// use std::time::Duration; +/// +/// use pyo3::prelude::*; +/// +/// /// Awaitable sleep function +/// #[pyfunction] +/// fn sleep_for<'p>(py: Python<'p>, secs: &'p PyAny) -> PyResult<&'p PyAny> { +/// let secs = secs.extract()?; +/// pyo3_asyncio::tokio::future_into_py(py, async move { +/// tokio::time::sleep(Duration::from_secs(secs)).await; +/// Python::with_gil(|py| Ok(py.None())) +/// }) +/// } +/// ``` +pub fn future_into_py(py: Python, fut: F) -> PyResult<&PyAny> +where + F: Future> + Send + 'static, +{ + generic::future_into_py::(py, fut) +} + +/// Convert a `!Send` Rust Future into a Python awaitable +/// +/// # Arguments +/// * `event_loop` - The Python event loop that the awaitable should be attached to +/// * `fut` - The Rust future to be converted +/// +/// # Examples +/// +/// ``` /// use std::{rc::Rc, time::Duration}; /// /// use pyo3::prelude::*; @@ -233,9 +332,72 @@ where /// /// Awaitable non-send sleep function /// #[pyfunction] /// fn sleep_for(py: Python, secs: u64) -> PyResult<&PyAny> { -/// // Rc is non-send so it cannot be passed into pyo3_asyncio::tokio::into_coroutine +/// // Rc is non-send so it cannot be passed into pyo3_asyncio::tokio::future_into_py /// let secs = Rc::new(secs); /// +/// pyo3_asyncio::tokio::local_future_into_py_with_loop( +/// pyo3_asyncio::tokio::get_current_loop(py)?, +/// async move { +/// tokio::time::sleep(Duration::from_secs(*secs)).await; +/// Python::with_gil(|py| Ok(py.None())) +/// } +/// ) +/// } +/// +/// # #[cfg(all(feature = "tokio-runtime", feature = "attributes"))] +/// #[pyo3_asyncio::tokio::main] +/// async fn main() -> PyResult<()> { +/// let event_loop = Python::with_gil(|py| -> PyResult { +/// Ok(pyo3_asyncio::tokio::get_current_loop(py)?.into()) +/// })?; +/// +/// // the main coroutine is running in a Send context, so we cannot use LocalSet here. Instead +/// // we use spawn_blocking in order to use LocalSet::block_on +/// tokio::task::spawn_blocking(move || { +/// // LocalSet allows us to work with !Send futures within tokio. Without it, any calls to +/// // pyo3_asyncio::tokio::local_future_into_py will panic. +/// tokio::task::LocalSet::new().block_on( +/// pyo3_asyncio::tokio::get_runtime(), +/// pyo3_asyncio::tokio::scope_local(event_loop, async { +/// Python::with_gil(|py| { +/// let py_future = sleep_for(py, 1)?; +/// pyo3_asyncio::tokio::into_future(py_future) +/// })? +/// .await?; +/// +/// Ok(()) +/// }) +/// ) +/// }).await.unwrap() +/// } +/// # #[cfg(not(all(feature = "tokio-runtime", feature = "attributes")))] +/// # fn main() {} +/// ``` +pub fn local_future_into_py_with_loop<'p, F>(event_loop: &'p PyAny, fut: F) -> PyResult<&PyAny> +where + F: Future> + 'static, +{ + generic::local_future_into_py_with_loop::(event_loop, fut) +} + +/// Convert a `!Send` Rust Future into a Python awaitable +/// +/// # Arguments +/// * `py` - The current PyO3 GIL guard +/// * `fut` - The Rust future to be converted +/// +/// # Examples +/// +/// ``` +/// use std::{rc::Rc, time::Duration}; +/// +/// use pyo3::prelude::*; +/// +/// /// Awaitable non-send sleep function +/// #[pyfunction] +/// fn sleep_for(py: Python, secs: u64) -> PyResult<&PyAny> { +/// // Rc is non-send so it cannot be passed into pyo3_asyncio::tokio::future_into_py +/// let secs = Rc::new(secs); /// pyo3_asyncio::tokio::local_future_into_py(py, async move { /// tokio::time::sleep(Duration::from_secs(*secs)).await; /// Python::with_gil(|py| Ok(py.None())) @@ -245,28 +407,87 @@ where /// # #[cfg(all(feature = "tokio-runtime", feature = "attributes"))] /// #[pyo3_asyncio::tokio::main] /// async fn main() -> PyResult<()> { +/// let event_loop = Python::with_gil(|py| { +/// PyObject::from(pyo3_asyncio::tokio::get_current_loop(py).unwrap()) +/// }); +/// /// // the main coroutine is running in a Send context, so we cannot use LocalSet here. Instead /// // we use spawn_blocking in order to use LocalSet::block_on -/// tokio::task::spawn_blocking(|| { +/// tokio::task::spawn_blocking(move || { /// // LocalSet allows us to work with !Send futures within tokio. Without it, any calls to /// // pyo3_asyncio::tokio::local_future_into_py will panic. -/// tokio::task::LocalSet::new().block_on(pyo3_asyncio::tokio::get_runtime(), async { -/// Python::with_gil(|py| { -/// let py_future = sleep_for(py, 1)?; -/// pyo3_asyncio::into_future(py_future) -/// })? -/// .await?; +/// tokio::task::LocalSet::new().block_on( +/// pyo3_asyncio::tokio::get_runtime(), +/// pyo3_asyncio::tokio::scope_local(event_loop, async { +/// Python::with_gil(|py| { +/// let py_future = sleep_for(py, 1)?; +/// pyo3_asyncio::tokio::into_future(py_future) +/// })? +/// .await?; /// -/// Ok(()) -/// }) +/// Ok(()) +/// }) +/// ) /// }).await.unwrap() /// } /// # #[cfg(not(all(feature = "tokio-runtime", feature = "attributes")))] /// # fn main() {} /// ``` -pub fn local_future_into_py<'p, F>(py: Python<'p>, fut: F) -> PyResult<&'p PyAny> +pub fn local_future_into_py(py: Python, fut: F) -> PyResult<&PyAny> where F: Future> + 'static, { generic::local_future_into_py::(py, fut) } + +/// Convert a Python `awaitable` into a Rust Future +/// +/// This function converts the `awaitable` into a Python Task using `run_coroutine_threadsafe`. A +/// completion handler sends the result of this Task through a +/// `futures::channel::oneshot::Sender>` and the future returned by this function +/// simply awaits the result through the `futures::channel::oneshot::Receiver>`. +/// +/// # Arguments +/// * `awaitable` - The Python `awaitable` to be converted +/// +/// # Examples +/// +/// ``` +/// use std::time::Duration; +/// +/// use pyo3::prelude::*; +/// +/// const PYTHON_CODE: &'static str = r#" +/// import asyncio +/// +/// async def py_sleep(duration): +/// await asyncio.sleep(duration) +/// "#; +/// +/// async fn py_sleep(seconds: f32) -> PyResult<()> { +/// let test_mod = Python::with_gil(|py| -> PyResult { +/// Ok( +/// PyModule::from_code( +/// py, +/// PYTHON_CODE, +/// "test_into_future/test_mod.py", +/// "test_mod" +/// )? +/// .into() +/// ) +/// })?; +/// +/// Python::with_gil(|py| { +/// pyo3_asyncio::tokio::into_future( +/// test_mod +/// .call_method1(py, "py_sleep", (seconds.into_py(py),))? +/// .as_ref(py), +/// ) +/// })? +/// .await?; +/// Ok(()) +/// } +/// ``` +pub fn into_future(awaitable: &PyAny) -> PyResult> + Send> { + generic::into_future::(awaitable) +}