Source code for aiothreads.coroutine_waiter
import asyncio
import threading
from typing import Any, Awaitable, Callable, Coroutine, Optional
from aiothreads.types import EVENT_LOOP, T
class CoroutineWaiter:
def __init__(
self, coroutine: Coroutine[Any, Any, T],
loop: Optional[asyncio.AbstractEventLoop] = None,
):
self.__coro: Coroutine[Any, Any, T] = coroutine
self.__loop = loop or EVENT_LOOP.get()
self.__event = threading.Event()
self.__result: Optional[T] = None
self.__exception: Optional[BaseException] = None
def _on_result(self, task: asyncio.Future) -> None:
self.__exception = task.exception()
if self.__exception is None:
self.__result = task.result()
self.__event.set()
def _awaiter(self) -> None:
task: asyncio.Future = self.__loop.create_task(self.__coro)
task.add_done_callback(self._on_result)
def start(self) -> None:
self.__loop.call_soon_threadsafe(self._awaiter)
def wait(self) -> Any:
self.__event.wait()
if self.__exception is not None:
raise self.__exception
return self.__result
[docs]
def wait_coroutine(
coro: Coroutine[Any, Any, T],
loop: Optional[asyncio.AbstractEventLoop] = None,
) -> T:
waiter = CoroutineWaiter(coro, loop)
waiter.start()
return waiter.wait()
[docs]
def sync_wait_coroutine(
loop: Optional[asyncio.AbstractEventLoop],
coro_func: Callable[..., Coroutine[Any, Any, T]],
*args: Any,
**kwargs: Any,
) -> T:
return wait_coroutine(coro_func(*args, **kwargs), loop=loop)
[docs]
def sync_await(
func: Callable[..., Awaitable[T]],
*args: Any,
**kwargs: Any,
) -> T:
async def awaiter() -> T:
return await func(*args, **kwargs)
return wait_coroutine(awaiter())