|
import asyncio |
|
import functools |
|
from typing import Awaitable, Callable, Optional |
|
|
|
import anyio |
|
import anyio.to_thread |
|
from typing_extensions import ParamSpec, TypeVar |
|
|
|
# Parameter specification of the callable handed to `asyncify`; it lets the
# returned async wrapper be typed with the same positional/keyword parameters.
T_ParamSpec = ParamSpec("T_ParamSpec") |
|
# Return type of the callable handed to `asyncify`; the async wrapper is typed
# as returning an awaitable of this type.
T_Retval = TypeVar("T_Retval") |
|
|
|
|
|
def function_has_argument(function: Callable, arg_name: str) -> bool:
    """Tell whether `function` declares a parameter called `arg_name`.

    The check is done against the parameters reported by
    `inspect.signature()`, so every parameter kind counts (positional,
    keyword-only, `*args`-style and `**kwargs`-style names).

    Args:
        function: The callable whose signature is inspected.
        arg_name: The parameter name to look for.

    Returns:
        `True` when a parameter with that exact name exists, else `False`.
    """
    from inspect import signature

    return arg_name in signature(function).parameters
|
|
|
|
|
def asyncify(
    function: Callable[T_ParamSpec, T_Retval],
    *,
    cancellable: bool = False,
    limiter: Optional[anyio.CapacityLimiter] = None,
) -> Callable[T_ParamSpec, Awaitable[T_Retval]]:
    """
    Take a blocking function and create an async one that receives the same
    positional and keyword arguments, and that when called, calls the original function
    in a worker thread using `anyio.to_thread.run_sync()`.

    If the `cancellable` option is enabled and the task waiting for its completion is
    cancelled, the thread will still run its course but its return value (or any raised
    exception) will be ignored.

    ## Arguments
    - `function`: a blocking regular callable (e.g. a function)
    - `cancellable`: `True` to allow cancellation of the operation
    - `limiter`: capacity limiter to use to limit the total amount of threads running
        (if omitted, the default limiter is used)

    ## Return
    An async function that takes the same positional and keyword arguments as the
    original one, that when called runs the same original function in a thread worker
    and returns the result.
    """
    # The installed anyio may expose the cancellation flag of `run_sync` either
    # as `abandon_on_cancel` or as `cancellable`. The signature cannot change
    # between calls, so resolve the keyword name once here instead of
    # re-inspecting the signature on every invocation of the wrapper.
    cancel_kwarg = (
        "abandon_on_cancel"
        if function_has_argument(anyio.to_thread.run_sync, "abandon_on_cancel")
        else "cancellable"
    )

    # `wraps` keeps the original name/docstring visible on the async wrapper.
    @functools.wraps(function)
    async def wrapper(
        *args: T_ParamSpec.args, **kwargs: T_ParamSpec.kwargs
    ) -> T_Retval:
        # `run_sync` is given a single callable here, so bind all call
        # arguments (including keyword arguments) up front.
        partial_f = functools.partial(function, *args, **kwargs)
        return await anyio.to_thread.run_sync(
            partial_f,
            limiter=limiter,
            **{cancel_kwarg: cancellable},
        )

    return wrapper
|
|
|
|
|
def run_async_function(async_function, *args, **kwargs):
    """
    Helper utility to run an async function in a sync context.
    Handles the case where there is an existing event loop running.

    When no event loop is running in the calling thread, the coroutine is run
    in a fresh event loop on that thread. When a loop is already running, the
    coroutine is run in a fresh event loop on a single worker thread and the
    calling thread blocks until it finishes.

    Args:
        async_function (Callable): The async function to run
        *args: Positional arguments to pass to the async function
        **kwargs: Keyword arguments to pass to the async function

    Returns:
        The result of the async function execution

    Raises:
        Any exception raised by `async_function` is propagated unchanged,
        including `RuntimeError`.

    Example:
        ```python
        async def my_async_func(x, y):
            return x + y

        result = run_async_function(my_async_func, 1, 2)
        ```
    """
    from concurrent.futures import ThreadPoolExecutor

    def run_in_new_loop():
        """Run the coroutine in a new event loop within this thread."""
        new_loop = asyncio.new_event_loop()
        try:
            asyncio.set_event_loop(new_loop)
            return new_loop.run_until_complete(async_function(*args, **kwargs))
        finally:
            new_loop.close()
            asyncio.set_event_loop(None)

    # Only the probe belongs in the `try`: `get_running_loop()` signals "no
    # running loop" with RuntimeError, and a RuntimeError raised by the
    # coroutine itself must not be mistaken for that signal (which would
    # re-run `async_function` and hide the real error).
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        loop_is_running = False
    else:
        loop_is_running = True

    if not loop_is_running:
        return run_in_new_loop()

    # A second loop cannot be run on a thread whose loop is already running,
    # so hand the work to a dedicated worker thread and wait for its result.
    with ThreadPoolExecutor(max_workers=1) as executor:
        return executor.submit(run_in_new_loop).result()
|
|