-
Notifications
You must be signed in to change notification settings - Fork 26
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add support for async functions #313
base: main
Are you sure you want to change the base?
Changes from all commits
8378d98
9cf18b4
a5af133
1ab2bb7
e3f3abd
5bf155a
a27a7cc
7e31089
40ca27b
c594736
d468d48
c21dc0f
21ffe16
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,82 @@ | ||
using CSnakes.Runtime.Python; | ||
using System.Collections.Concurrent; | ||
using System.Threading.Tasks; | ||
|
||
namespace CSnakes.Runtime.CPython; | ||
internal unsafe partial class CPythonAPI | ||
{ | ||
internal static bool IsPyCoroutine(PyObject p) | ||
{ | ||
return HasAttr(p, "__await__"); | ||
} | ||
|
||
internal class EventLoop : IDisposable | ||
{ | ||
private readonly PyObject? loop = null; | ||
|
||
public EventLoop() | ||
{ | ||
loop = NewEventLoopFactory!.Call(); | ||
} | ||
|
||
public bool IsDisposed { get; private set; } | ||
|
||
private void Close() | ||
{ | ||
if (loop is null) | ||
{ | ||
throw new InvalidOperationException("Event loop not initialized"); | ||
} | ||
using var close = loop.GetAttr("close"); | ||
close?.Call(); | ||
} | ||
|
||
public void Dispose() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We should ensure that |
||
{ | ||
Close(); | ||
loop?.Dispose(); | ||
IsDisposed = true; | ||
} | ||
|
||
public PyObject RunTaskUntilComplete(PyObject coroutine) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Public APIs on a type that implement |
||
{ | ||
if (loop is null) | ||
{ | ||
throw new InvalidOperationException("Event loop not initialized"); | ||
} | ||
using var taskFunc = loop.GetAttr("create_task"); | ||
using var task = taskFunc?.Call(coroutine) ?? PyObject.None; | ||
using var runUntilComplete = loop.GetAttr("run_until_complete"); | ||
var result = runUntilComplete.Call(task); | ||
return result; | ||
} | ||
} | ||
|
||
private static ConcurrentBag<EventLoop> eventLoops = []; | ||
[ThreadStatic] private static EventLoop? currentEventLoop = null; | ||
private static PyObject? AsyncioModule = null; | ||
private static PyObject? NewEventLoopFactory = null; | ||
|
||
internal static EventLoop GetEventLoop() | ||
{ | ||
if (AsyncioModule is null) | ||
{ | ||
throw new InvalidOperationException("Asyncio module not initialized"); | ||
} | ||
if (currentEventLoop is null || currentEventLoop.IsDisposed) | ||
{ | ||
currentEventLoop = new EventLoop(); | ||
eventLoops.Add(currentEventLoop); | ||
} | ||
return currentEventLoop!; | ||
} | ||
|
||
internal static void CloseEventLoops() | ||
{ | ||
foreach (var eventLoop in eventLoops) | ||
{ | ||
eventLoop.Dispose(); | ||
} | ||
eventLoops.Clear(); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,5 +1,4 @@ | ||
using CSnakes.Runtime.Python; | ||
using CSnakes.Runtime.Python.Interns; | ||
using System.Diagnostics; | ||
using System.Reflection; | ||
using System.Runtime.InteropServices; | ||
|
@@ -15,6 +14,8 @@ internal unsafe partial class CPythonAPI : IDisposable | |
private static readonly Lock initLock = new(); | ||
private static Version PythonVersion = new("0.0.0"); | ||
private bool disposedValue = false; | ||
private Task? initializationTask = null; | ||
private CancellationTokenSource? cts = null; | ||
|
||
public CPythonAPI(string pythonLibraryPath, Version version) | ||
{ | ||
|
@@ -51,7 +52,8 @@ private static IntPtr DllImportResolver(string libraryName, Assembly assembly, D | |
{ | ||
// Override default dlopen flags on linux to allow global symbol resolution (required in extension modules) | ||
// See https://github.com/tonybaloney/CSnakes/issues/112#issuecomment-2290643468 | ||
if (RuntimeInformation.IsOSPlatform(OSPlatform.Linux)){ | ||
if (RuntimeInformation.IsOSPlatform(OSPlatform.Linux)) | ||
{ | ||
return dlopen(pythonLibraryPath!, (int)(RTLD.LAZY | RTLD.GLOBAL)); | ||
} | ||
|
||
|
@@ -65,7 +67,47 @@ internal void Initialize() | |
if (IsInitialized) | ||
return; | ||
|
||
InitializeEmbeddedPython(); | ||
/* Notes: | ||
* | ||
* The CPython initialization and finalization | ||
* methods should be called from the same thread. | ||
* | ||
* Without doing so, CPython can hang on finalization. | ||
* Especially if the code imports the threading module, or | ||
* uses asyncio. | ||
* | ||
* Since we have no guarantee that the Dispose() of this | ||
* class will be called from the same managed CLR thread | ||
* as the one which called Init(), we create a Task with | ||
* a cancellation token and use that as a mechanism to wait | ||
* in the background for shutdown then call the finalization. | ||
*/ | ||
|
||
cts = new CancellationTokenSource(); | ||
bool initialized = false; | ||
initializationTask = Task.Run( | ||
() => { | ||
InitializeEmbeddedPython(); | ||
initialized = true; | ||
while (true) | ||
{ | ||
if (cts.IsCancellationRequested) | ||
{ | ||
FinalizeEmbeddedPython(); | ||
break; | ||
} | ||
else | ||
{ | ||
Thread.Sleep(500); | ||
} | ||
} | ||
}, | ||
cancellationToken: cts.Token); | ||
|
||
while (!initialized) // Wait for startup | ||
{ | ||
continue; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm sure there's a better pattern for doing this, also we should maybe add a 50ms sleep here There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Perhaps There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Could also use some event (for example, |
||
} | ||
} | ||
|
||
private void InitializeEmbeddedPython() | ||
|
@@ -101,6 +143,8 @@ private void InitializeEmbeddedPython() | |
PyBytesType = GetTypeRaw(PyBytes_FromByteSpan(new byte[] { })); | ||
ItemsStrIntern = AsPyUnicodeObject("items"); | ||
PyNone = GetBuiltin("None"); | ||
AsyncioModule = Import("asyncio"); | ||
NewEventLoopFactory = AsyncioModule.GetAttr("new_event_loop"); | ||
} | ||
PyEval_SaveThread(); | ||
} | ||
|
@@ -126,28 +170,48 @@ private void InitializeEmbeddedPython() | |
[LibraryImport(PythonLibraryName, EntryPoint = "Py_SetPath", StringMarshallingCustomType = typeof(Utf32StringMarshaller), StringMarshalling = StringMarshalling.Custom)] | ||
internal static partial void Py_SetPath_UCS4_UTF32(string path); | ||
|
||
protected void FinalizeEmbeddedPython() | ||
{ | ||
lock (initLock) | ||
{ | ||
if (!IsInitialized) | ||
return; | ||
|
||
// Shut down asyncio coroutines | ||
CloseEventLoops(); | ||
|
||
// Clean-up interns | ||
NewEventLoopFactory?.Dispose(); | ||
AsyncioModule?.Dispose(); | ||
// TODO: Add more cleanup code here | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This TODO is to clean up the currently static globals which we never dispose of, but are cleaned up automatically. We should dispose of them incase people recycle environments. |
||
|
||
Debug.WriteLine($"Calling Py_Finalize() on thread {GetNativeThreadId()}"); | ||
|
||
// Acquire the GIL only to dispose it immediately because `PyGILState_Release` | ||
// is not available after `Py_Finalize` is called. This is done primarily to | ||
// trigger the disposal of handles that have been queued before the Python | ||
// runtime is finalized. | ||
|
||
GIL.Acquire().Dispose(); | ||
|
||
PyGILState_Ensure(); | ||
Py_Finalize(); | ||
} | ||
} | ||
|
||
protected virtual void Dispose(bool disposing) | ||
{ | ||
if (!disposedValue) | ||
{ | ||
if (disposing) | ||
{ | ||
lock (initLock) | ||
if (initializationTask != null) | ||
{ | ||
if (!IsInitialized) | ||
return; | ||
|
||
Debug.WriteLine("Calling Py_Finalize()"); | ||
|
||
// Acquire the GIL only to dispose it immediately because `PyGILState_Release` | ||
// is not available after `Py_Finalize` is called. This is done primarily to | ||
// trigger the disposal of handles that have been queued before the Python | ||
// runtime is finalized. | ||
|
||
GIL.Acquire().Dispose(); | ||
if (cts == null) | ||
throw new InvalidOperationException("Invalid runtime state"); | ||
|
||
PyGILState_Ensure(); | ||
Py_Finalize(); | ||
cts.Cancel(); | ||
initializationTask.Wait(); | ||
} | ||
} | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,31 @@ | ||
using CSnakes.Runtime.CPython; | ||
using CSnakes.Runtime.Python; | ||
using System.Diagnostics; | ||
using System.Reflection; | ||
|
||
namespace CSnakes.Runtime; | ||
internal partial class PyObjectTypeConverter | ||
{ | ||
internal static object ConvertToCoroutine(PyObject pyObject, Type destinationType) | ||
{ | ||
Debug.Assert(destinationType.IsGenericType); | ||
Debug.Assert(!destinationType.IsGenericTypeDefinition); | ||
Debug.Assert(destinationType.GetGenericTypeDefinition() == typeof(ICoroutine<,,>)); | ||
|
||
if (!CPythonAPI.IsPyCoroutine(pyObject)) | ||
{ | ||
throw new InvalidCastException($"Cannot convert {pyObject.GetPythonType()} to a coroutine."); | ||
} | ||
|
||
if (!knownDynamicTypes.TryGetValue(destinationType, out DynamicTypeInfo? typeInfo)) | ||
{ | ||
var typeArgs = destinationType.GetGenericArguments(); | ||
Type coroutineType = typeof(Coroutine<,,>).MakeGenericType(typeArgs); | ||
ConstructorInfo ctor = coroutineType.GetConstructors().First(); | ||
typeInfo = new(ctor); | ||
knownDynamicTypes[destinationType] = typeInfo; | ||
} | ||
|
||
return typeInfo.ReturnTypeConstructor.Invoke([pyObject.Clone()]); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,44 @@ | ||
using CSnakes.Runtime.CPython; | ||
|
||
namespace CSnakes.Runtime.Python; | ||
|
||
public class Coroutine<TYield, TSend, TReturn>(PyObject coroutine) : ICoroutine<TYield, TSend, TReturn> | ||
{ | ||
private TYield current = default!; | ||
private TReturn @return = default!; | ||
|
||
public TYield Current => current; | ||
public TReturn Return => @return; | ||
|
||
public Task<TYield?> AsTask() | ||
{ | ||
return Task.Run( | ||
() => | ||
{ | ||
try | ||
{ | ||
using (GIL.Acquire()) | ||
{ | ||
using PyObject result = CPythonAPI.GetEventLoop().RunTaskUntilComplete(coroutine); | ||
current = result.As<TYield>(); | ||
} | ||
return current; | ||
} | ||
catch (PythonInvocationException ex) | ||
{ | ||
if (ex.InnerException is PythonStopIterationException stopIteration) | ||
{ | ||
using var @return = stopIteration.TakeValue(); | ||
this.@return = @return.As<TReturn>(); | ||
|
||
// Coroutine has finished | ||
// TODO: define behavior for this case | ||
return default; | ||
} | ||
|
||
throw; | ||
} | ||
} | ||
); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,8 @@ | ||
namespace CSnakes.Runtime.Python; | ||
|
||
public interface ICoroutine<TYield, TSend, TReturn> : ICoroutine | ||
{ | ||
public Task<TYield?> AsTask(); | ||
} | ||
|
||
public interface ICoroutine { } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This call can return a
null
? If so, why not simply throw here ifnull
?