mirror of
https://github.com/home-assistant/core.git
synced 2025-06-25 01:21:51 +02:00
Improve util.async_
typing (#66510)
This commit is contained in:
@ -20,6 +20,7 @@ homeassistant.helpers.entity_values
|
||||
homeassistant.helpers.reload
|
||||
homeassistant.helpers.script_variables
|
||||
homeassistant.helpers.translation
|
||||
homeassistant.util.async_
|
||||
homeassistant.util.color
|
||||
homeassistant.util.process
|
||||
homeassistant.util.unit_system
|
||||
|
@ -11,14 +11,20 @@ import threading
|
||||
from traceback import extract_stack
|
||||
from typing import Any, TypeVar
|
||||
|
||||
from typing_extensions import ParamSpec
|
||||
|
||||
_LOGGER = logging.getLogger(__name__)
|
||||
|
||||
_SHUTDOWN_RUN_CALLBACK_THREADSAFE = "_shutdown_run_callback_threadsafe"
|
||||
|
||||
T = TypeVar("T")
|
||||
_T = TypeVar("_T")
|
||||
_R = TypeVar("_R")
|
||||
_P = ParamSpec("_P")
|
||||
|
||||
|
||||
def fire_coroutine_threadsafe(coro: Coroutine, loop: AbstractEventLoop) -> None:
|
||||
def fire_coroutine_threadsafe(
|
||||
coro: Coroutine[Any, Any, Any], loop: AbstractEventLoop
|
||||
) -> None:
|
||||
"""Submit a coroutine object to a given event loop.
|
||||
|
||||
This method does not provide a way to retrieve the result and
|
||||
@ -40,8 +46,8 @@ def fire_coroutine_threadsafe(coro: Coroutine, loop: AbstractEventLoop) -> None:
|
||||
|
||||
|
||||
def run_callback_threadsafe(
|
||||
loop: AbstractEventLoop, callback: Callable[..., T], *args: Any
|
||||
) -> concurrent.futures.Future[T]:
|
||||
loop: AbstractEventLoop, callback: Callable[..., _T], *args: Any
|
||||
) -> concurrent.futures.Future[_T]:
|
||||
"""Submit a callback object to a given event loop.
|
||||
|
||||
Return a concurrent.futures.Future to access the result.
|
||||
@ -50,7 +56,7 @@ def run_callback_threadsafe(
|
||||
if ident is not None and ident == threading.get_ident():
|
||||
raise RuntimeError("Cannot be called from within the event loop")
|
||||
|
||||
future: concurrent.futures.Future = concurrent.futures.Future()
|
||||
future: concurrent.futures.Future[_T] = concurrent.futures.Future()
|
||||
|
||||
def run_callback() -> None:
|
||||
"""Run callback and store result."""
|
||||
@ -88,7 +94,7 @@ def run_callback_threadsafe(
|
||||
return future
|
||||
|
||||
|
||||
def check_loop(func: Callable, strict: bool = True) -> None:
|
||||
def check_loop(func: Callable[..., Any], strict: bool = True) -> None:
|
||||
"""Warn if called inside the event loop. Raise if `strict` is True."""
|
||||
try:
|
||||
get_running_loop()
|
||||
@ -159,11 +165,11 @@ def check_loop(func: Callable, strict: bool = True) -> None:
|
||||
)
|
||||
|
||||
|
||||
def protect_loop(func: Callable, strict: bool = True) -> Callable:
|
||||
def protect_loop(func: Callable[_P, _R], strict: bool = True) -> Callable[_P, _R]:
|
||||
"""Protect function from running in event loop."""
|
||||
|
||||
@functools.wraps(func)
|
||||
def protected_loop_func(*args, **kwargs): # type: ignore
|
||||
def protected_loop_func(*args: _P.args, **kwargs: _P.kwargs) -> _R:
|
||||
check_loop(func, strict=strict)
|
||||
return func(*args, **kwargs)
|
||||
|
||||
|
Reference in New Issue
Block a user