From e3056066bb4cb74ea8470dff66684e8ecaebab31 Mon Sep 17 00:00:00 2001 From: "J. Nick Koston" Date: Tue, 5 Dec 2023 11:44:52 -1000 Subject: [PATCH] Relocate Bluetooth manager to habluetooth library --- homeassistant/components/bluetooth/api.py | 9 +- homeassistant/components/bluetooth/manager.py | 648 +----------------- homeassistant/components/bluetooth/usage.py | 51 -- .../components/bluetooth/wrappers.py | 391 ----------- 4 files changed, 13 insertions(+), 1086 deletions(-) delete mode 100644 homeassistant/components/bluetooth/usage.py delete mode 100644 homeassistant/components/bluetooth/wrappers.py diff --git a/homeassistant/components/bluetooth/api.py b/homeassistant/components/bluetooth/api.py index afdd26a2001..4acb8d91c84 100644 --- a/homeassistant/components/bluetooth/api.py +++ b/homeassistant/components/bluetooth/api.py @@ -9,17 +9,20 @@ from asyncio import Future from collections.abc import Callable, Iterable from typing import TYPE_CHECKING, cast -from habluetooth import BluetoothScanningMode +from habluetooth import ( + BaseHaScanner, + BluetoothScannerDevice, + BluetoothScanningMode, + HaBleakScannerWrapper, +) from home_assistant_bluetooth import BluetoothServiceInfoBleak from homeassistant.core import CALLBACK_TYPE, HomeAssistant, callback as hass_callback -from .base_scanner import BaseHaScanner, BluetoothScannerDevice from .const import DATA_MANAGER from .manager import HomeAssistantBluetoothManager from .match import BluetoothCallbackMatcher from .models import BluetoothCallback, BluetoothChange, ProcessAdvertisementCallback -from .wrappers import HaBleakScannerWrapper if TYPE_CHECKING: from bleak.backends.device import BLEDevice diff --git a/homeassistant/components/bluetooth/manager.py b/homeassistant/components/bluetooth/manager.py index 777d0ebe317..49bf9e39a54 100644 --- a/homeassistant/components/bluetooth/manager.py +++ b/homeassistant/components/bluetooth/manager.py @@ -1,22 +1,15 @@ """The bluetooth integration.""" from __future__ import annotations -import asyncio from collections.abc import Callable, Iterable import itertools import logging -from typing import TYPE_CHECKING, Any, Final +from typing import Final -from bleak.backends.scanner import AdvertisementDataCallback -from bleak_retry_connector import NO_RSSI_VALUE, RSSI_SWITCH_THRESHOLD, BleakSlotManager -from bluetooth_adapters import ( - ADAPTER_ADDRESS, - ADAPTER_PASSIVE_SCAN, - AdapterDetails, - BluetoothAdapters, -) +from bleak_retry_connector import BleakSlotManager +from bluetooth_adapters import BluetoothAdapters from bluetooth_data_tools import monotonic_time_coarse -from habluetooth import TRACKER_BUFFERING_WOBBLE_SECONDS, AdvertisementTracker +from habluetooth import BluetoothManager from homeassistant import config_entries from homeassistant.const import EVENT_LOGGING_CHANGED @@ -28,11 +21,6 @@ from homeassistant.core import ( ) from homeassistant.helpers import discovery_flow -from .base_scanner import BaseHaScanner, BluetoothScannerDevice -from .const import ( - FALLBACK_MAXIMUM_STALE_ADVERTISEMENT_SECONDS, - UNAVAILABLE_TRACK_SECONDS, -) from .match import ( ADDRESS, CALLBACK, @@ -45,642 +33,19 @@ from .match import ( ) from .models import BluetoothCallback, BluetoothChange, BluetoothServiceInfoBleak from .storage import BluetoothStorage -from .usage import install_multiple_bleak_catcher, uninstall_multiple_bleak_catcher from .util import async_load_history_from_system -if TYPE_CHECKING: - from bleak.backends.device import BLEDevice - from bleak.backends.scanner import AdvertisementData - - -FILTER_UUIDS: Final = "UUIDs" - -APPLE_MFR_ID: Final = 76 -APPLE_IBEACON_START_BYTE: Final = 0x02 # iBeacon (tilt_ble) -APPLE_HOMEKIT_START_BYTE: Final = 0x06 # homekit_controller -APPLE_DEVICE_ID_START_BYTE: Final = 0x10 # bluetooth_le_tracker -APPLE_HOMEKIT_NOTIFY_START_BYTE: Final = 0x11 # homekit_controller -APPLE_START_BYTES_WANTED: Final = { - APPLE_IBEACON_START_BYTE, - APPLE_HOMEKIT_START_BYTE, - APPLE_HOMEKIT_NOTIFY_START_BYTE, - APPLE_DEVICE_ID_START_BYTE, -} - MONOTONIC_TIME: Final = monotonic_time_coarse _LOGGER = logging.getLogger(__name__) -def _dispatch_bleak_callback( - callback: AdvertisementDataCallback | None, - filters: dict[str, set[str]], - device: BLEDevice, - advertisement_data: AdvertisementData, -) -> None: - """Dispatch the callback.""" - if not callback: - # Callback destroyed right before being called, ignore - return - - if (uuids := filters.get(FILTER_UUIDS)) and not uuids.intersection( - advertisement_data.service_uuids - ): - return - - try: - callback(device, advertisement_data) - except Exception: # pylint: disable=broad-except - _LOGGER.exception("Error in callback: %s", callback) - - -class BluetoothManager: - """Manage Bluetooth.""" - - __slots__ = ( - "_cancel_unavailable_tracking", - "_advertisement_tracker", - "_fallback_intervals", - "_intervals", - "_unavailable_callbacks", - "_connectable_unavailable_callbacks", - "_bleak_callbacks", - "_all_history", - "_connectable_history", - "_non_connectable_scanners", - "_connectable_scanners", - "_adapters", - "_sources", - "_bluetooth_adapters", - "storage", - "slot_manager", - "_debug", - "shutdown", - "_loop", - ) - - def __init__( - self, - bluetooth_adapters: BluetoothAdapters, - storage: BluetoothStorage, - slot_manager: BleakSlotManager, - ) -> None: - """Init bluetooth manager.""" - self._cancel_unavailable_tracking: asyncio.TimerHandle | None = None - - self._advertisement_tracker = AdvertisementTracker() - self._fallback_intervals = self._advertisement_tracker.fallback_intervals - self._intervals = self._advertisement_tracker.intervals - - self._unavailable_callbacks: dict[ - str, list[Callable[[BluetoothServiceInfoBleak], None]] - ] = {} - self._connectable_unavailable_callbacks: dict[ - str, list[Callable[[BluetoothServiceInfoBleak], None]] - ] = {} - - self._bleak_callbacks: list[ - tuple[AdvertisementDataCallback, dict[str, set[str]]] - ] = [] - self._all_history: dict[str, BluetoothServiceInfoBleak] = {} - self._connectable_history: dict[str, BluetoothServiceInfoBleak] = {} - self._non_connectable_scanners: list[BaseHaScanner] = [] - self._connectable_scanners: list[BaseHaScanner] = [] - self._adapters: dict[str, AdapterDetails] = {} - self._sources: dict[str, BaseHaScanner] = {} - self._bluetooth_adapters = bluetooth_adapters - self.storage = storage - self.slot_manager = slot_manager - self._debug = _LOGGER.isEnabledFor(logging.DEBUG) - self.shutdown = False - self._loop: asyncio.AbstractEventLoop | None = None - - @property - def supports_passive_scan(self) -> bool: - """Return if passive scan is supported.""" - return any(adapter[ADAPTER_PASSIVE_SCAN] for adapter in self._adapters.values()) - - def async_scanner_count(self, connectable: bool = True) -> int: - """Return the number of scanners.""" - if connectable: - return len(self._connectable_scanners) - return len(self._connectable_scanners) + len(self._non_connectable_scanners) - - async def async_diagnostics(self) -> dict[str, Any]: - """Diagnostics for the manager.""" - scanner_diagnostics = await asyncio.gather( - *[ - scanner.async_diagnostics() - for scanner in itertools.chain( - self._non_connectable_scanners, self._connectable_scanners - ) - ] - ) - return { - "adapters": self._adapters, - "slot_manager": self.slot_manager.diagnostics(), - "scanners": scanner_diagnostics, - "connectable_history": [ - service_info.as_dict() - for service_info in self._connectable_history.values() - ], - "all_history": [ - service_info.as_dict() for service_info in self._all_history.values() - ], - "advertisement_tracker": self._advertisement_tracker.async_diagnostics(), - } - - def _find_adapter_by_address(self, address: str) -> str | None: - for adapter, details in self._adapters.items(): - if details[ADAPTER_ADDRESS] == address: - return adapter - return None - - def async_scanner_by_source(self, source: str) -> BaseHaScanner | None: - """Return the scanner for a source.""" - return self._sources.get(source) - - async def async_get_bluetooth_adapters( - self, cached: bool = True - ) -> dict[str, AdapterDetails]: - """Get bluetooth adapters.""" - if not self._adapters or not cached: - if not cached: - await self._bluetooth_adapters.refresh() - self._adapters = self._bluetooth_adapters.adapters - return self._adapters - - async def async_get_adapter_from_address(self, address: str) -> str | None: - """Get adapter from address.""" - if adapter := self._find_adapter_by_address(address): - return adapter - await self._bluetooth_adapters.refresh() - self._adapters = self._bluetooth_adapters.adapters - return self._find_adapter_by_address(address) - - async def async_setup(self) -> None: - """Set up the bluetooth manager.""" - self._loop = asyncio.get_running_loop() - await self._bluetooth_adapters.refresh() - install_multiple_bleak_catcher() - self.async_setup_unavailable_tracking() - - def async_stop(self) -> None: - """Stop the Bluetooth integration at shutdown.""" - _LOGGER.debug("Stopping bluetooth manager") - self.shutdown = True - if self._cancel_unavailable_tracking: - self._cancel_unavailable_tracking.cancel() - self._cancel_unavailable_tracking = None - uninstall_multiple_bleak_catcher() - - def async_scanner_devices_by_address( - self, address: str, connectable: bool - ) -> list[BluetoothScannerDevice]: - """Get BluetoothScannerDevice by address.""" - if not connectable: - scanners: Iterable[BaseHaScanner] = itertools.chain( - self._connectable_scanners, self._non_connectable_scanners - ) - else: - scanners = self._connectable_scanners - return [ - BluetoothScannerDevice(scanner, *device_adv) - for scanner in scanners - if ( - device_adv := scanner.discovered_devices_and_advertisement_data.get( - address - ) - ) - ] - - def _async_all_discovered_addresses(self, connectable: bool) -> Iterable[str]: - """Return all of discovered addresses. - - Include addresses from all the scanners including duplicates. - """ - yield from itertools.chain.from_iterable( - scanner.discovered_devices_and_advertisement_data - for scanner in self._connectable_scanners - ) - if not connectable: - yield from itertools.chain.from_iterable( - scanner.discovered_devices_and_advertisement_data - for scanner in self._non_connectable_scanners - ) - - def async_discovered_devices(self, connectable: bool) -> list[BLEDevice]: - """Return all of combined best path to discovered from all the scanners.""" - histories = self._connectable_history if connectable else self._all_history - return [history.device for history in histories.values()] - - def async_setup_unavailable_tracking(self) -> None: - """Set up the unavailable tracking.""" - self._schedule_unavailable_tracking() - - def _schedule_unavailable_tracking(self) -> None: - """Schedule the unavailable tracking.""" - if TYPE_CHECKING: - assert self._loop is not None - loop = self._loop - self._cancel_unavailable_tracking = loop.call_at( - loop.time() + UNAVAILABLE_TRACK_SECONDS, self._async_check_unavailable - ) - - def _async_check_unavailable(self) -> None: - """Watch for unavailable devices and cleanup state history.""" - monotonic_now = MONOTONIC_TIME() - connectable_history = self._connectable_history - all_history = self._all_history - tracker = self._advertisement_tracker - intervals = tracker.intervals - - for connectable in (True, False): - if connectable: - unavailable_callbacks = self._connectable_unavailable_callbacks - else: - unavailable_callbacks = self._unavailable_callbacks - history = connectable_history if connectable else all_history - disappeared = set(history).difference( - self._async_all_discovered_addresses(connectable) - ) - for address in disappeared: - if not connectable: - # - # For non-connectable devices we also check the device has exceeded - # the advertising interval before we mark it as unavailable - # since it may have gone to sleep and since we do not need an active - # connection to it we can only determine its availability - # by the lack of advertisements - if advertising_interval := ( - intervals.get(address) or self._fallback_intervals.get(address) - ): - advertising_interval += TRACKER_BUFFERING_WOBBLE_SECONDS - else: - advertising_interval = ( - FALLBACK_MAXIMUM_STALE_ADVERTISEMENT_SECONDS - ) - time_since_seen = monotonic_now - all_history[address].time - if time_since_seen <= advertising_interval: - continue - - # The second loop (connectable=False) is responsible for removing - # the device from all the interval tracking since it is no longer - # available for both connectable and non-connectable - tracker.async_remove_fallback_interval(address) - tracker.async_remove_address(address) - self._address_disappeared(address) - - service_info = history.pop(address) - - if not (callbacks := unavailable_callbacks.get(address)): - continue - - for callback in callbacks: - try: - callback(service_info) - except Exception: # pylint: disable=broad-except - _LOGGER.exception("Error in unavailable callback") - - self._schedule_unavailable_tracking() - - def _address_disappeared(self, address: str) -> None: - """Call when an address disappears from the stack. - - This method is intended to be overridden by subclasses. - """ - - def _prefer_previous_adv_from_different_source( - self, - old: BluetoothServiceInfoBleak, - new: BluetoothServiceInfoBleak, - ) -> bool: - """Prefer previous advertisement from a different source if it is better.""" - if new.time - old.time > ( - stale_seconds := self._intervals.get( - new.address, - self._fallback_intervals.get( - new.address, FALLBACK_MAXIMUM_STALE_ADVERTISEMENT_SECONDS - ), - ) - ): - # If the old advertisement is stale, any new advertisement is preferred - if self._debug: - _LOGGER.debug( - ( - "%s (%s): Switching from %s to %s (time elapsed:%s > stale" - " seconds:%s)" - ), - new.name, - new.address, - self._async_describe_source(old), - self._async_describe_source(new), - new.time - old.time, - stale_seconds, - ) - return False - if (new.rssi or NO_RSSI_VALUE) - RSSI_SWITCH_THRESHOLD > ( - old.rssi or NO_RSSI_VALUE - ): - # If new advertisement is RSSI_SWITCH_THRESHOLD more, - # the new one is preferred. - if self._debug: - _LOGGER.debug( - ( - "%s (%s): Switching from %s to %s (new rssi:%s - threshold:%s >" - " old rssi:%s)" - ), - new.name, - new.address, - self._async_describe_source(old), - self._async_describe_source(new), - new.rssi, - RSSI_SWITCH_THRESHOLD, - old.rssi, - ) - return False - return True - - def scanner_adv_received(self, service_info: BluetoothServiceInfoBleak) -> None: - """Handle a new advertisement from any scanner. - - Callbacks from all the scanners arrive here. - """ - - # Pre-filter noisy apple devices as they can account for 20-35% of the - # traffic on a typical network. - if ( - (manufacturer_data := service_info.manufacturer_data) - and APPLE_MFR_ID in manufacturer_data - and manufacturer_data[APPLE_MFR_ID][0] not in APPLE_START_BYTES_WANTED - and len(manufacturer_data) == 1 - and not service_info.service_data - ): - return - - address = service_info.device.address - all_history = self._all_history - connectable = service_info.connectable - connectable_history = self._connectable_history - old_connectable_service_info = connectable and connectable_history.get(address) - source = service_info.source - # This logic is complex due to the many combinations of scanners - # that are supported. - # - # We need to handle multiple connectable and non-connectable scanners - # and we need to handle the case where a device is connectable on one scanner - # but not on another. - # - # The device may also be connectable only by a scanner that has worse - # signal strength than a non-connectable scanner. - # - # all_history - the history of all advertisements from all scanners with the - # best advertisement from each scanner - # connectable_history - the history of all connectable advertisements from all - # scanners with the best advertisement from each - # connectable scanner - # - if ( - (old_service_info := all_history.get(address)) - and source != old_service_info.source - and (scanner := self._sources.get(old_service_info.source)) - and scanner.scanning - and self._prefer_previous_adv_from_different_source( - old_service_info, service_info - ) - ): - # If we are rejecting the new advertisement and the device is connectable - # but not in the connectable history or the connectable source is the same - # as the new source, we need to add it to the connectable history - if connectable: - if old_connectable_service_info and ( - # If its the same as the preferred source, we are done - # as we know we prefer the old advertisement - # from the check above - (old_connectable_service_info is old_service_info) - # If the old connectable source is different from the preferred - # source, we need to check it as well to see if we prefer - # the old connectable advertisement - or ( - source != old_connectable_service_info.source - and ( - connectable_scanner := self._sources.get( - old_connectable_service_info.source - ) - ) - and connectable_scanner.scanning - and self._prefer_previous_adv_from_different_source( - old_connectable_service_info, service_info - ) - ) - ): - return - - connectable_history[address] = service_info - - return - - if connectable: - connectable_history[address] = service_info - - all_history[address] = service_info - - # Track advertisement intervals to determine when we need to - # switch adapters or mark a device as unavailable - tracker = self._advertisement_tracker - if (last_source := tracker.sources.get(address)) and last_source != source: - # Source changed, remove the old address from the tracker - tracker.async_remove_address(address) - if address not in tracker.intervals: - tracker.async_collect(service_info) - - # If the advertisement data is the same as the last time we saw it, we - # don't need to do anything else unless its connectable and we are missing - # connectable history for the device so we can make it available again - # after unavailable callbacks. - if ( - # Ensure its not a connectable device missing from connectable history - not (connectable and not old_connectable_service_info) - # Than check if advertisement data is the same - and old_service_info - and not ( - service_info.manufacturer_data != old_service_info.manufacturer_data - or service_info.service_data != old_service_info.service_data - or service_info.service_uuids != old_service_info.service_uuids - or service_info.name != old_service_info.name - ) - ): - return - - if not connectable and old_connectable_service_info: - # Since we have a connectable path and our BleakClient will - # route any connection attempts to the connectable path, we - # mark the service_info as connectable so that the callbacks - # will be called and the device can be discovered. - service_info = BluetoothServiceInfoBleak( - name=service_info.name, - address=service_info.address, - rssi=service_info.rssi, - manufacturer_data=service_info.manufacturer_data, - service_data=service_info.service_data, - service_uuids=service_info.service_uuids, - source=service_info.source, - device=service_info.device, - advertisement=service_info.advertisement, - connectable=True, - time=service_info.time, - ) - - if (connectable or old_connectable_service_info) and ( - bleak_callbacks := self._bleak_callbacks - ): - # Bleak callbacks must get a connectable device - device = service_info.device - advertisement_data = service_info.advertisement - for callback_filters in bleak_callbacks: - _dispatch_bleak_callback(*callback_filters, device, advertisement_data) - - self._discover_service_info(service_info) - - def _discover_service_info(self, service_info: BluetoothServiceInfoBleak) -> None: - """Discover a new service info. - - This method is intended to be overridden by subclasses. - """ - - def _async_describe_source(self, service_info: BluetoothServiceInfoBleak) -> str: - """Describe a source.""" - if scanner := self._sources.get(service_info.source): - description = scanner.name - else: - description = service_info.source - if service_info.connectable: - description += " [connectable]" - return description - - def async_track_unavailable( - self, - callback: Callable[[BluetoothServiceInfoBleak], None], - address: str, - connectable: bool, - ) -> Callable[[], None]: - """Register a callback.""" - if connectable: - unavailable_callbacks = self._connectable_unavailable_callbacks - else: - unavailable_callbacks = self._unavailable_callbacks - unavailable_callbacks.setdefault(address, []).append(callback) - - def _async_remove_callback() -> None: - unavailable_callbacks[address].remove(callback) - if not unavailable_callbacks[address]: - del unavailable_callbacks[address] - - return _async_remove_callback - - def async_ble_device_from_address( - self, address: str, connectable: bool - ) -> BLEDevice | None: - """Return the BLEDevice if present.""" - histories = self._connectable_history if connectable else self._all_history - if history := histories.get(address): - return history.device - return None - - def async_address_present(self, address: str, connectable: bool) -> bool: - """Return if the address is present.""" - histories = self._connectable_history if connectable else self._all_history - return address in histories - - def async_discovered_service_info( - self, connectable: bool - ) -> Iterable[BluetoothServiceInfoBleak]: - """Return all the discovered services info.""" - histories = self._connectable_history if connectable else self._all_history - return histories.values() - - def async_last_service_info( - self, address: str, connectable: bool - ) -> BluetoothServiceInfoBleak | None: - """Return the last service info for an address.""" - histories = self._connectable_history if connectable else self._all_history - return histories.get(address) - - def async_register_scanner( - self, - scanner: BaseHaScanner, - connectable: bool, - connection_slots: int | None = None, - ) -> CALLBACK_TYPE: - """Register a new scanner.""" - _LOGGER.debug("Registering scanner %s", scanner.name) - if connectable: - scanners = self._connectable_scanners - else: - scanners = self._non_connectable_scanners - - def _unregister_scanner() -> None: - _LOGGER.debug("Unregistering scanner %s", scanner.name) - self._advertisement_tracker.async_remove_source(scanner.source) - scanners.remove(scanner) - del self._sources[scanner.source] - if connection_slots: - self.slot_manager.remove_adapter(scanner.adapter) - - scanners.append(scanner) - self._sources[scanner.source] = scanner - if connection_slots: - self.slot_manager.register_adapter(scanner.adapter, connection_slots) - return _unregister_scanner - - def async_register_bleak_callback( - self, callback: AdvertisementDataCallback, filters: dict[str, set[str]] - ) -> CALLBACK_TYPE: - """Register a callback.""" - callback_entry = (callback, filters) - self._bleak_callbacks.append(callback_entry) - - def _remove_callback() -> None: - self._bleak_callbacks.remove(callback_entry) - - # Replay the history since otherwise we miss devices - # that were already discovered before the callback was registered - # or we are in passive mode - for history in self._connectable_history.values(): - _dispatch_bleak_callback( - callback, filters, history.device, history.advertisement - ) - - return _remove_callback - - def async_release_connection_slot(self, device: BLEDevice) -> None: - """Release a connection slot.""" - self.slot_manager.release_slot(device) - - def async_allocate_connection_slot(self, device: BLEDevice) -> bool: - """Allocate a connection slot.""" - return self.slot_manager.allocate_slot(device) - - def async_get_learned_advertising_interval(self, address: str) -> float | None: - """Get the learned advertising interval for a MAC address.""" - return self._intervals.get(address) - - def async_get_fallback_availability_interval(self, address: str) -> float | None: - """Get the fallback availability timeout for a MAC address.""" - return self._fallback_intervals.get(address) - - def async_set_fallback_availability_interval( - self, address: str, interval: float - ) -> None: - """Override the fallback availability timeout for a MAC address.""" - self._fallback_intervals[address] = interval - - class HomeAssistantBluetoothManager(BluetoothManager): """Manage Bluetooth for Home Assistant.""" __slots__ = ( "hass", + "storage", "_integration_matcher", "_callback_index", "_cancel_logging_listener", @@ -696,10 +61,11 @@ class HomeAssistantBluetoothManager(BluetoothManager): ) -> None: """Init bluetooth manager.""" self.hass = hass + self.storage = storage self._integration_matcher = integration_matcher self._callback_index = BluetoothCallbackMatcherIndex() self._cancel_logging_listener: CALLBACK_TYPE | None = None - super().__init__(bluetooth_adapters, storage, slot_manager) + super().__init__(bluetooth_adapters, slot_manager) @hass_callback def _async_logging_changed(self, event: Event) -> None: diff --git a/homeassistant/components/bluetooth/usage.py b/homeassistant/components/bluetooth/usage.py deleted file mode 100644 index d89f0b5b684..00000000000 --- a/homeassistant/components/bluetooth/usage.py +++ /dev/null @@ -1,51 +0,0 @@ -"""bluetooth usage utility to handle multiple instances.""" - -from __future__ import annotations - -import bleak -from bleak.backends.service import BleakGATTServiceCollection -import bleak_retry_connector - -from .wrappers import HaBleakClientWrapper, HaBleakScannerWrapper - -ORIGINAL_BLEAK_SCANNER = bleak.BleakScanner -ORIGINAL_BLEAK_CLIENT = bleak.BleakClient -ORIGINAL_BLEAK_RETRY_CONNECTOR_CLIENT_WITH_SERVICE_CACHE = ( - bleak_retry_connector.BleakClientWithServiceCache -) -ORIGINAL_BLEAK_RETRY_CONNECTOR_CLIENT = bleak_retry_connector.BleakClient - - -def install_multiple_bleak_catcher() -> None: - """Wrap the bleak classes to return the shared instance. - - In case multiple instances are detected. - """ - bleak.BleakScanner = HaBleakScannerWrapper # type: ignore[misc, assignment] - bleak.BleakClient = HaBleakClientWrapper # type: ignore[misc] - bleak_retry_connector.BleakClientWithServiceCache = HaBleakClientWithServiceCache # type: ignore[misc,assignment] # noqa: E501 - bleak_retry_connector.BleakClient = HaBleakClientWrapper # type: ignore[misc] # noqa: E501 - - -def uninstall_multiple_bleak_catcher() -> None: - """Unwrap the bleak classes.""" - bleak.BleakScanner = ORIGINAL_BLEAK_SCANNER # type: ignore[misc] - bleak.BleakClient = ORIGINAL_BLEAK_CLIENT # type: ignore[misc] - bleak_retry_connector.BleakClientWithServiceCache = ( # type: ignore[misc] - ORIGINAL_BLEAK_RETRY_CONNECTOR_CLIENT_WITH_SERVICE_CACHE - ) - bleak_retry_connector.BleakClient = ( # type: ignore[misc] - ORIGINAL_BLEAK_RETRY_CONNECTOR_CLIENT - ) - - -class HaBleakClientWithServiceCache(HaBleakClientWrapper): - """A BleakClient that implements service caching.""" - - def set_cached_services(self, services: BleakGATTServiceCollection | None) -> None: - """Set the cached services. - - No longer used since bleak 0.17+ has service caching built-in. - - This was only kept for backwards compatibility. - """ diff --git a/homeassistant/components/bluetooth/wrappers.py b/homeassistant/components/bluetooth/wrappers.py deleted file mode 100644 index e3c08a035a8..00000000000 --- a/homeassistant/components/bluetooth/wrappers.py +++ /dev/null @@ -1,391 +0,0 @@ -"""Bleak wrappers for bluetooth.""" -from __future__ import annotations - -import asyncio -from collections.abc import Callable -import contextlib -from dataclasses import dataclass -from functools import partial -import inspect -import logging -from typing import TYPE_CHECKING, Any, Final - -from bleak import BleakClient, BleakError -from bleak.backends.client import BaseBleakClient, get_platform_client_backend_type -from bleak.backends.device import BLEDevice -from bleak.backends.scanner import ( - AdvertisementData, - AdvertisementDataCallback, - BaseBleakScanner, -) -from bleak_retry_connector import ( - NO_RSSI_VALUE, - ble_device_description, - clear_cache, - device_source, -) - -from homeassistant.core import CALLBACK_TYPE, callback as hass_callback -from homeassistant.helpers.frame import report - -from . import models -from .base_scanner import BaseHaScanner, BluetoothScannerDevice - -FILTER_UUIDS: Final = "UUIDs" -_LOGGER = logging.getLogger(__name__) - - -if TYPE_CHECKING: - from .manager import BluetoothManager - - -@dataclass(slots=True) -class _HaWrappedBleakBackend: - """Wrap bleak backend to make it usable by Home Assistant.""" - - device: BLEDevice - scanner: BaseHaScanner - client: type[BaseBleakClient] - source: str | None - - -class HaBleakScannerWrapper(BaseBleakScanner): - """A wrapper that uses the single instance.""" - - def __init__( - self, - *args: Any, - detection_callback: AdvertisementDataCallback | None = None, - service_uuids: list[str] | None = None, - **kwargs: Any, - ) -> None: - """Initialize the BleakScanner.""" - self._detection_cancel: CALLBACK_TYPE | None = None - self._mapped_filters: dict[str, set[str]] = {} - self._advertisement_data_callback: AdvertisementDataCallback | None = None - self._background_tasks: set[asyncio.Task] = set() - remapped_kwargs = { - "detection_callback": detection_callback, - "service_uuids": service_uuids or [], - **kwargs, - } - self._map_filters(*args, **remapped_kwargs) - super().__init__( - detection_callback=detection_callback, service_uuids=service_uuids or [] - ) - - @classmethod - async def discover(cls, timeout: float = 5.0, **kwargs: Any) -> list[BLEDevice]: - """Discover devices.""" - assert models.MANAGER is not None - return list(models.MANAGER.async_discovered_devices(True)) - - async def stop(self, *args: Any, **kwargs: Any) -> None: - """Stop scanning for devices.""" - - async def start(self, *args: Any, **kwargs: Any) -> None: - """Start scanning for devices.""" - - def _map_filters(self, *args: Any, **kwargs: Any) -> bool: - """Map the filters.""" - mapped_filters = {} - if filters := kwargs.get("filters"): - if filter_uuids := filters.get(FILTER_UUIDS): - mapped_filters[FILTER_UUIDS] = set(filter_uuids) - else: - _LOGGER.warning("Only %s filters are supported", FILTER_UUIDS) - if service_uuids := kwargs.get("service_uuids"): - mapped_filters[FILTER_UUIDS] = set(service_uuids) - if mapped_filters == self._mapped_filters: - return False - self._mapped_filters = mapped_filters - return True - - def set_scanning_filter(self, *args: Any, **kwargs: Any) -> None: - """Set the filters to use.""" - if self._map_filters(*args, **kwargs): - self._setup_detection_callback() - - def _cancel_callback(self) -> None: - """Cancel callback.""" - if self._detection_cancel: - self._detection_cancel() - self._detection_cancel = None - - @property - def discovered_devices(self) -> list[BLEDevice]: - """Return a list of discovered devices.""" - assert models.MANAGER is not None - return list(models.MANAGER.async_discovered_devices(True)) - - def register_detection_callback( - self, callback: AdvertisementDataCallback | None - ) -> Callable[[], None]: - """Register a detection callback. - - The callback is called when a device is discovered or has a property changed. - - This method takes the callback and registers it with the long running scanner. - """ - self._advertisement_data_callback = callback - self._setup_detection_callback() - assert self._detection_cancel is not None - return self._detection_cancel - - def _setup_detection_callback(self) -> None: - """Set up the detection callback.""" - if self._advertisement_data_callback is None: - return - callback = self._advertisement_data_callback - self._cancel_callback() - super().register_detection_callback(self._advertisement_data_callback) - assert models.MANAGER is not None - - if not inspect.iscoroutinefunction(callback): - detection_callback = callback - else: - - def detection_callback( - ble_device: BLEDevice, advertisement_data: AdvertisementData - ) -> None: - task = asyncio.create_task(callback(ble_device, advertisement_data)) - self._background_tasks.add(task) - task.add_done_callback(self._background_tasks.discard) - - self._detection_cancel = models.MANAGER.async_register_bleak_callback( - detection_callback, self._mapped_filters - ) - - def __del__(self) -> None: - """Delete the BleakScanner.""" - if self._detection_cancel: - # Nothing to do if event loop is already closed - with contextlib.suppress(RuntimeError): - asyncio.get_running_loop().call_soon_threadsafe(self._detection_cancel) - - -def _rssi_sorter_with_connection_failure_penalty( - device: BluetoothScannerDevice, - connection_failure_count: dict[BaseHaScanner, int], - rssi_diff: int, -) -> float: - """Get a sorted list of scanner, device, advertisement data. - - Adjusting for previous connection failures. - - When a connection fails, we want to try the next best adapter so we - apply a penalty to the RSSI value to make it less likely to be chosen - for every previous connection failure. - - We use the 51% of the RSSI difference between the first and second - best adapter as the penalty. This ensures we will always try the - best adapter twice before moving on to the next best adapter since - the first failure may be a transient service resolution issue. - """ - base_rssi = device.advertisement.rssi or NO_RSSI_VALUE - if connect_failures := connection_failure_count.get(device.scanner): - if connect_failures > 1 and not rssi_diff: - rssi_diff = 1 - return base_rssi - (rssi_diff * connect_failures * 0.51) - return base_rssi - - -class HaBleakClientWrapper(BleakClient): - """Wrap the BleakClient to ensure it does not shutdown our scanner. - - If an address is passed into BleakClient instead of a BLEDevice, - bleak will quietly start a new scanner under the hood to resolve - the address. This can cause a conflict with our scanner. We need - to handle translating the address to the BLEDevice in this case - to avoid the whole stack from getting stuck in an in progress state - when an integration does this. - """ - - def __init__( # pylint: disable=super-init-not-called - self, - address_or_ble_device: str | BLEDevice, - disconnected_callback: Callable[[BleakClient], None] | None = None, - *args: Any, - timeout: float = 10.0, - **kwargs: Any, - ) -> None: - """Initialize the BleakClient.""" - if isinstance(address_or_ble_device, BLEDevice): - self.__address = address_or_ble_device.address - else: - report( - "attempted to call BleakClient with an address instead of a BLEDevice", - exclude_integrations={"bluetooth"}, - error_if_core=False, - ) - self.__address = address_or_ble_device - self.__disconnected_callback = disconnected_callback - self.__timeout = timeout - self.__connect_failures: dict[BaseHaScanner, int] = {} - self._backend: BaseBleakClient | None = None # type: ignore[assignment] - - @property - def is_connected(self) -> bool: - """Return True if the client is connected to a device.""" - return self._backend is not None and self._backend.is_connected - - async def clear_cache(self) -> bool: - """Clear the GATT cache.""" - if self._backend is not None and hasattr(self._backend, "clear_cache"): - return await self._backend.clear_cache() # type: ignore[no-any-return] - return await clear_cache(self.__address) - - def set_disconnected_callback( - self, - callback: Callable[[BleakClient], None] | None, - **kwargs: Any, - ) -> None: - """Set the disconnect callback.""" - self.__disconnected_callback = callback - if self._backend: - self._backend.set_disconnected_callback( - self._make_disconnected_callback(callback), - **kwargs, - ) - - def _make_disconnected_callback( - self, callback: Callable[[BleakClient], None] | None - ) -> Callable[[], None] | None: - """Make the disconnected callback. - - https://github.com/hbldh/bleak/pull/1256 - The disconnected callback needs to get the top level - BleakClientWrapper instance, not the backend instance. - - The signature of the callback for the backend is: - Callable[[], None] - - To make this work we need to wrap the callback in a partial - that passes the BleakClientWrapper instance as the first - argument. - """ - return None if callback is None else partial(callback, self) - - async def connect(self, **kwargs: Any) -> bool: - """Connect to the specified GATT server.""" - assert models.MANAGER is not None - manager = models.MANAGER - if manager.shutdown: - raise BleakError("Bluetooth is already shutdown") - if debug_logging := _LOGGER.isEnabledFor(logging.DEBUG): - _LOGGER.debug("%s: Looking for backend to connect", self.__address) - wrapped_backend = self._async_get_best_available_backend_and_device(manager) - device = wrapped_backend.device - scanner = wrapped_backend.scanner - self._backend = wrapped_backend.client( - device, - disconnected_callback=self._make_disconnected_callback( - self.__disconnected_callback - ), - timeout=self.__timeout, - ) - if debug_logging: - # Only lookup the description if we are going to log it - description = ble_device_description(device) - _, adv = scanner.discovered_devices_and_advertisement_data[device.address] - rssi = adv.rssi - _LOGGER.debug( - "%s: Connecting via %s (last rssi: %s)", description, scanner.name, rssi - ) - connected = None - try: - connected = await super().connect(**kwargs) - finally: - # If we failed to connect and its a local adapter (no source) - # we release the connection slot - if not connected: - self.__connect_failures[scanner] = ( - self.__connect_failures.get(scanner, 0) + 1 - ) - if not wrapped_backend.source: - manager.async_release_connection_slot(device) - - if debug_logging: - _LOGGER.debug( - "%s: Connected via %s (last rssi: %s)", description, scanner.name, rssi - ) - return connected - - @hass_callback - def _async_get_backend_for_ble_device( - self, manager: BluetoothManager, scanner: BaseHaScanner, ble_device: BLEDevice - ) -> _HaWrappedBleakBackend | None: - """Get the backend for a BLEDevice.""" - if not (source := device_source(ble_device)): - # If client is not defined in details - # its the client for this platform - if not manager.async_allocate_connection_slot(ble_device): - return None - cls = get_platform_client_backend_type() - return _HaWrappedBleakBackend(ble_device, scanner, cls, source) - - # Make sure the backend can connect to the device - # as some backends have connection limits - if not scanner.connector or not scanner.connector.can_connect(): - return None - - return _HaWrappedBleakBackend( - ble_device, scanner, scanner.connector.client, source - ) - - @hass_callback - def _async_get_best_available_backend_and_device( - self, manager: BluetoothManager - ) -> _HaWrappedBleakBackend: - """Get a best available backend and device for the given address. - - This method will return the backend with the best rssi - that has a free connection slot. - """ - address = self.__address - devices = manager.async_scanner_devices_by_address(self.__address, True) - sorted_devices = sorted( - devices, - key=lambda device: device.advertisement.rssi or NO_RSSI_VALUE, - reverse=True, - ) - - # If we have connection failures we adjust the rssi sorting - # to prefer the adapter/scanner with the less failures so - # we don't keep trying to connect with an adapter - # that is failing - if self.__connect_failures and len(sorted_devices) > 1: - # We use the rssi diff between to the top two - # to adjust the rssi sorter so that each failure - # will reduce the rssi sorter by the diff amount - rssi_diff = ( - sorted_devices[0].advertisement.rssi - - sorted_devices[1].advertisement.rssi - ) - adjusted_rssi_sorter = partial( - _rssi_sorter_with_connection_failure_penalty, - connection_failure_count=self.__connect_failures, - rssi_diff=rssi_diff, - ) - sorted_devices = sorted( - devices, - key=adjusted_rssi_sorter, - reverse=True, - ) - - for device in sorted_devices: - if backend := self._async_get_backend_for_ble_device( - manager, device.scanner, device.ble_device - ): - return backend - - raise BleakError( - "No backend with an available connection slot that can reach address" - f" {address} was found" - ) - - async def disconnect(self) -> bool: - """Disconnect from the device.""" - if self._backend is None: - return True - return await self._backend.disconnect()