"""Tests for the USB Discovery integration.""" import asyncio from datetime import timedelta import logging import os from unittest.mock import MagicMock, Mock, call, patch, sentinel import pytest from homeassistant import config_entries from homeassistant.components import usb from homeassistant.components.usb.models import USBDevice from homeassistant.components.usb.utils import scan_serial_ports, usb_device_from_path from homeassistant.const import EVENT_HOMEASSISTANT_STARTED, EVENT_HOMEASSISTANT_STOP from homeassistant.core import HomeAssistant from homeassistant.setup import async_setup_component from homeassistant.util import dt as dt_util from . import ( force_usb_polling_watcher, # noqa: F401 patch_scanned_serial_ports, ) from tests.common import ( MockModule, async_fire_time_changed, mock_config_flow, mock_integration, mock_platform, ) from tests.typing import WebSocketGenerator conbee_device = USBDevice( device="/dev/cu.usbmodemDE24338801", vid="1CF1", pid="0030", serial_number="DE2433880", manufacturer="dresden elektronik ingenieurtechnik GmbH", description="ConBee II", ) slae_sh_device = USBDevice( device="/dev/cu.usbserial-110", vid="10C4", pid="EA60", serial_number="00_12_4B_00_22_98_88_7F", manufacturer="Silicon Labs", description="slae.sh cc2652rb stick - slaesh's iot stuff", ) async def test_aiousbwatcher_discovery( hass: HomeAssistant, hass_ws_client: WebSocketGenerator ) -> None: """Test that aiousbwatcher can discover a device without raising an exception.""" new_usb = [{"domain": "test1", "vid": "3039"}, {"domain": "test2", "vid": "0FA0"}] mock_ports = [ USBDevice( device=slae_sh_device.device, vid="3039", pid="3039", serial_number=slae_sh_device.serial_number, manufacturer=slae_sh_device.manufacturer, description=slae_sh_device.description, ) ] aiousbwatcher_callback = None def async_register_callback(callback): nonlocal aiousbwatcher_callback aiousbwatcher_callback = callback MockAIOUSBWatcher = MagicMock() MockAIOUSBWatcher.async_register_callback = async_register_callback with ( patch("sys.platform", "linux"), patch("homeassistant.components.usb.async_get_usb", return_value=new_usb), patch_scanned_serial_ports(return_value=mock_ports), patch( "homeassistant.components.usb.AIOUSBWatcher", return_value=MockAIOUSBWatcher ), patch.object(hass.config_entries.flow, "async_init") as mock_config_flow, ): assert await async_setup_component(hass, "usb", {"usb": {}}) await hass.async_block_till_done() hass.bus.async_fire(EVENT_HOMEASSISTANT_STARTED) await hass.async_block_till_done() assert aiousbwatcher_callback is not None assert len(mock_config_flow.mock_calls) == 1 assert mock_config_flow.mock_calls[0][1][0] == "test1" await hass.async_block_till_done() assert len(mock_config_flow.mock_calls) == 1 mock_ports.append( USBDevice( device=slae_sh_device.device, vid="0FA0", pid="0FA0", serial_number=slae_sh_device.serial_number, manufacturer=slae_sh_device.manufacturer, description=slae_sh_device.description, ) ) aiousbwatcher_callback() await hass.async_block_till_done() async_fire_time_changed( hass, dt_util.utcnow() + timedelta(seconds=usb.ADD_REMOVE_SCAN_COOLDOWN) ) await hass.async_block_till_done(wait_background_tasks=True) assert len(mock_config_flow.mock_calls) == 2 assert mock_config_flow.mock_calls[1][1][0] == "test2" hass.bus.async_fire(EVENT_HOMEASSISTANT_STOP) await hass.async_block_till_done() @pytest.mark.usefixtures("force_usb_polling_watcher") async def test_polling_discovery( hass: HomeAssistant, hass_ws_client: WebSocketGenerator ) -> None: """Test that polling can discover a device without raising an exception.""" new_usb = [{"domain": "test1", "vid": "3039"}] mock_comports_found_device = asyncio.Event() def scan_serial_ports() -> list: nonlocal mock_ports # Only "find" a device after a few invocations if len(mock_ports.mock_calls) < 5: return [] mock_comports_found_device.set() return [ USBDevice( device=slae_sh_device.device, vid="3039", pid="3039", serial_number=slae_sh_device.serial_number, manufacturer=slae_sh_device.manufacturer, description=slae_sh_device.description, ) ] with ( patch("sys.platform", "linux"), patch( "homeassistant.components.usb.POLLING_MONITOR_SCAN_PERIOD", timedelta(seconds=0.01), ), patch("homeassistant.components.usb.async_get_usb", return_value=new_usb), patch_scanned_serial_ports(side_effect=scan_serial_ports) as mock_ports, patch.object(hass.config_entries.flow, "async_init") as mock_config_flow, ): assert await async_setup_component(hass, "usb", {"usb": {}}) await hass.async_block_till_done() hass.bus.async_fire(EVENT_HOMEASSISTANT_STARTED) await hass.async_block_till_done() # Wait until a new device is discovered after a few polling attempts assert len(mock_config_flow.mock_calls) == 0 await mock_comports_found_device.wait() await hass.async_block_till_done(wait_background_tasks=True) assert len(mock_config_flow.mock_calls) == 1 assert mock_config_flow.mock_calls[0][1][0] == "test1" hass.bus.async_fire(EVENT_HOMEASSISTANT_STOP) await hass.async_block_till_done() @pytest.mark.usefixtures("force_usb_polling_watcher") async def test_removal_by_aiousbwatcher_before_started(hass: HomeAssistant) -> None: """Test a device is removed by the aiousbwatcher before started.""" new_usb = [{"domain": "test1", "vid": "3039", "pid": "3039"}] mock_ports = [ USBDevice( device=slae_sh_device.device, vid="3039", pid="3039", serial_number=slae_sh_device.serial_number, manufacturer=slae_sh_device.manufacturer, description=slae_sh_device.description, ) ] with ( patch("homeassistant.components.usb.async_get_usb", return_value=new_usb), patch_scanned_serial_ports(return_value=mock_ports), patch.object(hass.config_entries.flow, "async_init") as mock_config_flow, ): assert await async_setup_component(hass, "usb", {"usb": {}}) await hass.async_block_till_done() with patch_scanned_serial_ports(return_value=[]): hass.bus.async_fire(EVENT_HOMEASSISTANT_STARTED) await hass.async_block_till_done() assert len(mock_config_flow.mock_calls) == 0 hass.bus.async_fire(EVENT_HOMEASSISTANT_STOP) await hass.async_block_till_done() @pytest.mark.usefixtures("force_usb_polling_watcher") async def test_discovered_by_websocket_scan( hass: HomeAssistant, hass_ws_client: WebSocketGenerator ) -> None: """Test a device is discovered from websocket scan.""" new_usb = [{"domain": "test1", "vid": "3039", "pid": "3039"}] mock_ports = [ USBDevice( device=slae_sh_device.device, vid="3039", pid="3039", serial_number=slae_sh_device.serial_number, manufacturer=slae_sh_device.manufacturer, description=slae_sh_device.description, ) ] with ( patch("homeassistant.components.usb.async_get_usb", return_value=new_usb), patch_scanned_serial_ports(return_value=mock_ports), patch.object(hass.config_entries.flow, "async_init") as mock_config_flow, ): assert await async_setup_component(hass, "usb", {"usb": {}}) await hass.async_block_till_done() hass.bus.async_fire(EVENT_HOMEASSISTANT_STARTED) await hass.async_block_till_done() ws_client = await hass_ws_client(hass) await ws_client.send_json({"id": 1, "type": "usb/scan"}) response = await ws_client.receive_json() assert response["success"] await hass.async_block_till_done() assert len(mock_config_flow.mock_calls) == 1 assert mock_config_flow.mock_calls[0][1][0] == "test1" @pytest.mark.usefixtures("force_usb_polling_watcher") async def test_discovered_by_websocket_scan_limited_by_description_matcher( hass: HomeAssistant, hass_ws_client: WebSocketGenerator ) -> None: """Test a device is discovered from websocket scan is limited by the description matcher.""" new_usb = [ {"domain": "test1", "vid": "3039", "pid": "3039", "description": "*2652*"} ] mock_ports = [ USBDevice( device=slae_sh_device.device, vid="3039", pid="3039", serial_number=slae_sh_device.serial_number, manufacturer=slae_sh_device.manufacturer, description=slae_sh_device.description, ) ] with ( patch("homeassistant.components.usb.async_get_usb", return_value=new_usb), patch_scanned_serial_ports(return_value=mock_ports), patch.object(hass.config_entries.flow, "async_init") as mock_config_flow, ): assert await async_setup_component(hass, "usb", {"usb": {}}) await hass.async_block_till_done() hass.bus.async_fire(EVENT_HOMEASSISTANT_STARTED) await hass.async_block_till_done() ws_client = await hass_ws_client(hass) await ws_client.send_json({"id": 1, "type": "usb/scan"}) response = await ws_client.receive_json() assert response["success"] await hass.async_block_till_done() assert len(mock_config_flow.mock_calls) == 1 assert mock_config_flow.mock_calls[0][1][0] == "test1" @pytest.mark.usefixtures("force_usb_polling_watcher") async def test_most_targeted_matcher_wins( hass: HomeAssistant, hass_ws_client: WebSocketGenerator ) -> None: """Test that the most targeted matcher is used.""" new_usb = [ {"domain": "less", "vid": "3039", "pid": "3039"}, {"domain": "more", "vid": "3039", "pid": "3039", "description": "*2652*"}, ] mock_ports = [ USBDevice( device=slae_sh_device.device, vid="3039", pid="3039", serial_number=slae_sh_device.serial_number, manufacturer=slae_sh_device.manufacturer, description=slae_sh_device.description, ) ] with ( patch("homeassistant.components.usb.async_get_usb", return_value=new_usb), patch_scanned_serial_ports(return_value=mock_ports), patch.object(hass.config_entries.flow, "async_init") as mock_config_flow, ): assert await async_setup_component(hass, "usb", {"usb": {}}) await hass.async_block_till_done() hass.bus.async_fire(EVENT_HOMEASSISTANT_STARTED) await hass.async_block_till_done() ws_client = await hass_ws_client(hass) await ws_client.send_json({"id": 1, "type": "usb/scan"}) response = await ws_client.receive_json() assert response["success"] await hass.async_block_till_done() assert len(mock_config_flow.mock_calls) == 1 assert mock_config_flow.mock_calls[0][1][0] == "more" @pytest.mark.usefixtures("force_usb_polling_watcher") async def test_discovered_by_websocket_scan_rejected_by_description_matcher( hass: HomeAssistant, hass_ws_client: WebSocketGenerator ) -> None: """Test a device is discovered from websocket scan rejected by the description matcher.""" new_usb = [ {"domain": "test1", "vid": "3039", "pid": "3039", "description": "*not_it*"} ] mock_ports = [ USBDevice( device=slae_sh_device.device, vid="3039", pid="3039", serial_number=slae_sh_device.serial_number, manufacturer=slae_sh_device.manufacturer, description=slae_sh_device.description, ) ] with ( patch("homeassistant.components.usb.async_get_usb", return_value=new_usb), patch_scanned_serial_ports(return_value=mock_ports), patch.object(hass.config_entries.flow, "async_init") as mock_config_flow, ): assert await async_setup_component(hass, "usb", {"usb": {}}) await hass.async_block_till_done() hass.bus.async_fire(EVENT_HOMEASSISTANT_STARTED) await hass.async_block_till_done() ws_client = await hass_ws_client(hass) await ws_client.send_json({"id": 1, "type": "usb/scan"}) response = await ws_client.receive_json() assert response["success"] await hass.async_block_till_done() assert len(mock_config_flow.mock_calls) == 0 @pytest.mark.usefixtures("force_usb_polling_watcher") async def test_discovered_by_websocket_scan_limited_by_serial_number_matcher( hass: HomeAssistant, hass_ws_client: WebSocketGenerator ) -> None: """Test a device is discovered from websocket scan is limited by the serial_number matcher.""" new_usb = [ { "domain": "test1", "vid": "3039", "pid": "3039", "serial_number": "00_12_4b_00*", } ] mock_ports = [ USBDevice( device=slae_sh_device.device, vid="3039", pid="3039", serial_number=slae_sh_device.serial_number, manufacturer=slae_sh_device.manufacturer, description=slae_sh_device.description, ) ] with ( patch("homeassistant.components.usb.async_get_usb", return_value=new_usb), patch_scanned_serial_ports(return_value=mock_ports), patch.object(hass.config_entries.flow, "async_init") as mock_config_flow, ): assert await async_setup_component(hass, "usb", {"usb": {}}) await hass.async_block_till_done() hass.bus.async_fire(EVENT_HOMEASSISTANT_STARTED) await hass.async_block_till_done() ws_client = await hass_ws_client(hass) await ws_client.send_json({"id": 1, "type": "usb/scan"}) response = await ws_client.receive_json() assert response["success"] await hass.async_block_till_done() assert len(mock_config_flow.mock_calls) == 1 assert mock_config_flow.mock_calls[0][1][0] == "test1" @pytest.mark.usefixtures("force_usb_polling_watcher") async def test_discovered_by_websocket_scan_rejected_by_serial_number_matcher( hass: HomeAssistant, hass_ws_client: WebSocketGenerator ) -> None: """Test a device is discovered from websocket scan is rejected by the serial_number matcher.""" new_usb = [ {"domain": "test1", "vid": "3039", "pid": "3039", "serial_number": "123*"} ] mock_ports = [ USBDevice( device=slae_sh_device.device, vid="3039", pid="3039", serial_number=slae_sh_device.serial_number, manufacturer=slae_sh_device.manufacturer, description=slae_sh_device.description, ) ] with ( patch("homeassistant.components.usb.async_get_usb", return_value=new_usb), patch_scanned_serial_ports(return_value=mock_ports), patch.object(hass.config_entries.flow, "async_init") as mock_config_flow, ): assert await async_setup_component(hass, "usb", {"usb": {}}) await hass.async_block_till_done() hass.bus.async_fire(EVENT_HOMEASSISTANT_STARTED) await hass.async_block_till_done() ws_client = await hass_ws_client(hass) await ws_client.send_json({"id": 1, "type": "usb/scan"}) response = await ws_client.receive_json() assert response["success"] await hass.async_block_till_done() assert len(mock_config_flow.mock_calls) == 0 @pytest.mark.usefixtures("force_usb_polling_watcher") async def test_discovered_by_websocket_scan_limited_by_manufacturer_matcher( hass: HomeAssistant, hass_ws_client: WebSocketGenerator ) -> None: """Test a device is discovered from websocket scan is limited by the manufacturer matcher.""" new_usb = [ { "domain": "test1", "vid": "3039", "pid": "3039", "manufacturer": "dresden elektronik ingenieurtechnik*", } ] mock_ports = [ USBDevice( device=conbee_device.device, vid="3039", pid="3039", serial_number=conbee_device.serial_number, manufacturer=conbee_device.manufacturer, description=conbee_device.description, ) ] with ( patch("homeassistant.components.usb.async_get_usb", return_value=new_usb), patch_scanned_serial_ports(return_value=mock_ports), patch.object(hass.config_entries.flow, "async_init") as mock_config_flow, ): assert await async_setup_component(hass, "usb", {"usb": {}}) await hass.async_block_till_done() hass.bus.async_fire(EVENT_HOMEASSISTANT_STARTED) await hass.async_block_till_done() ws_client = await hass_ws_client(hass) await ws_client.send_json({"id": 1, "type": "usb/scan"}) response = await ws_client.receive_json() assert response["success"] await hass.async_block_till_done() assert len(mock_config_flow.mock_calls) == 1 assert mock_config_flow.mock_calls[0][1][0] == "test1" @pytest.mark.usefixtures("force_usb_polling_watcher") async def test_discovered_by_websocket_scan_rejected_by_manufacturer_matcher( hass: HomeAssistant, hass_ws_client: WebSocketGenerator ) -> None: """Test a device is discovered from websocket scan is rejected by the manufacturer matcher.""" new_usb = [ { "domain": "test1", "vid": "3039", "pid": "3039", "manufacturer": "other vendor*", } ] mock_ports = [ USBDevice( device=conbee_device.device, vid="3039", pid="3039", serial_number=conbee_device.serial_number, manufacturer=conbee_device.manufacturer, description=conbee_device.description, ) ] with ( patch("homeassistant.components.usb.async_get_usb", return_value=new_usb), patch_scanned_serial_ports(return_value=mock_ports), patch.object(hass.config_entries.flow, "async_init") as mock_config_flow, ): assert await async_setup_component(hass, "usb", {"usb": {}}) await hass.async_block_till_done() hass.bus.async_fire(EVENT_HOMEASSISTANT_STARTED) await hass.async_block_till_done() ws_client = await hass_ws_client(hass) await ws_client.send_json({"id": 1, "type": "usb/scan"}) response = await ws_client.receive_json() assert response["success"] await hass.async_block_till_done() assert len(mock_config_flow.mock_calls) == 0 @pytest.mark.usefixtures("force_usb_polling_watcher") async def test_discovered_by_websocket_rejected_with_empty_serial_number_only( hass: HomeAssistant, hass_ws_client: WebSocketGenerator ) -> None: """Test a device is discovered from websocket is rejected with empty serial number.""" new_usb = [ {"domain": "test1", "vid": "3039", "pid": "3039", "serial_number": "123*"} ] mock_ports = [ USBDevice( device=conbee_device.device, vid="3039", pid="3039", serial_number=None, manufacturer=None, description=None, ) ] with ( patch("homeassistant.components.usb.async_get_usb", return_value=new_usb), patch_scanned_serial_ports(return_value=mock_ports), patch.object(hass.config_entries.flow, "async_init") as mock_config_flow, ): assert await async_setup_component(hass, "usb", {"usb": {}}) await hass.async_block_till_done() hass.bus.async_fire(EVENT_HOMEASSISTANT_STARTED) await hass.async_block_till_done() ws_client = await hass_ws_client(hass) await ws_client.send_json({"id": 1, "type": "usb/scan"}) response = await ws_client.receive_json() assert response["success"] await hass.async_block_till_done() assert len(mock_config_flow.mock_calls) == 0 @pytest.mark.usefixtures("force_usb_polling_watcher") async def test_discovered_by_websocket_scan_match_vid_only( hass: HomeAssistant, hass_ws_client: WebSocketGenerator ) -> None: """Test a device is discovered from websocket scan only matching vid.""" new_usb = [{"domain": "test1", "vid": "3039"}] mock_ports = [ USBDevice( device=slae_sh_device.device, vid="3039", pid="3039", serial_number=slae_sh_device.serial_number, manufacturer=slae_sh_device.manufacturer, description=slae_sh_device.description, ) ] with ( patch("homeassistant.components.usb.async_get_usb", return_value=new_usb), patch_scanned_serial_ports(return_value=mock_ports), patch.object(hass.config_entries.flow, "async_init") as mock_config_flow, ): assert await async_setup_component(hass, "usb", {"usb": {}}) await hass.async_block_till_done() hass.bus.async_fire(EVENT_HOMEASSISTANT_STARTED) await hass.async_block_till_done() ws_client = await hass_ws_client(hass) await ws_client.send_json({"id": 1, "type": "usb/scan"}) response = await ws_client.receive_json() assert response["success"] await hass.async_block_till_done() assert len(mock_config_flow.mock_calls) == 1 assert mock_config_flow.mock_calls[0][1][0] == "test1" @pytest.mark.usefixtures("force_usb_polling_watcher") async def test_discovered_by_websocket_scan_match_vid_wrong_pid( hass: HomeAssistant, hass_ws_client: WebSocketGenerator ) -> None: """Test a device is discovered from websocket scan only matching vid but wrong pid.""" new_usb = [{"domain": "test1", "vid": "3039", "pid": "9999"}] mock_ports = [ USBDevice( device=slae_sh_device.device, vid="3039", pid="3039", serial_number=slae_sh_device.serial_number, manufacturer=slae_sh_device.manufacturer, description=slae_sh_device.description, ) ] with ( patch("homeassistant.components.usb.async_get_usb", return_value=new_usb), patch_scanned_serial_ports(return_value=mock_ports), patch.object(hass.config_entries.flow, "async_init") as mock_config_flow, ): assert await async_setup_component(hass, "usb", {"usb": {}}) await hass.async_block_till_done() hass.bus.async_fire(EVENT_HOMEASSISTANT_STARTED) await hass.async_block_till_done() ws_client = await hass_ws_client(hass) await ws_client.send_json({"id": 1, "type": "usb/scan"}) response = await ws_client.receive_json() assert response["success"] await hass.async_block_till_done() assert len(mock_config_flow.mock_calls) == 0 @pytest.mark.usefixtures("force_usb_polling_watcher") async def test_discovered_by_websocket_no_vid_pid( hass: HomeAssistant, hass_ws_client: WebSocketGenerator ) -> None: """Test a device is discovered from websocket scan with no vid or pid.""" new_usb = [{"domain": "test1", "vid": "3039", "pid": "9999"}] mock_ports = [ USBDevice( device=slae_sh_device.device, vid=None, pid=None, serial_number=slae_sh_device.serial_number, manufacturer=slae_sh_device.manufacturer, description=slae_sh_device.description, ) ] with ( patch("homeassistant.components.usb.async_get_usb", return_value=new_usb), patch_scanned_serial_ports(return_value=mock_ports), patch.object(hass.config_entries.flow, "async_init") as mock_config_flow, ): assert await async_setup_component(hass, "usb", {"usb": {}}) await hass.async_block_till_done() hass.bus.async_fire(EVENT_HOMEASSISTANT_STARTED) await hass.async_block_till_done() ws_client = await hass_ws_client(hass) await ws_client.send_json({"id": 1, "type": "usb/scan"}) response = await ws_client.receive_json() assert response["success"] await hass.async_block_till_done() assert len(mock_config_flow.mock_calls) == 0 @pytest.mark.usefixtures("force_usb_polling_watcher") async def test_non_matching_discovered_by_scanner_after_started( hass: HomeAssistant, hass_ws_client: WebSocketGenerator ) -> None: """Test a websocket scan that does not match.""" new_usb = [{"domain": "test1", "vid": "4444", "pid": "4444"}] mock_ports = [ USBDevice( device=slae_sh_device.device, vid="3039", pid="3039", serial_number=slae_sh_device.serial_number, manufacturer=slae_sh_device.manufacturer, description=slae_sh_device.description, ) ] with ( patch("homeassistant.components.usb.async_get_usb", return_value=new_usb), patch_scanned_serial_ports(return_value=mock_ports), patch.object(hass.config_entries.flow, "async_init") as mock_config_flow, ): assert await async_setup_component(hass, "usb", {"usb": {}}) await hass.async_block_till_done() hass.bus.async_fire(EVENT_HOMEASSISTANT_STARTED) await hass.async_block_till_done() ws_client = await hass_ws_client(hass) await ws_client.send_json({"id": 1, "type": "usb/scan"}) response = await ws_client.receive_json() assert response["success"] await hass.async_block_till_done() assert len(mock_config_flow.mock_calls) == 0 async def test_aiousbwatcher_on_wsl_fallback_without_throwing_exception( hass: HomeAssistant, hass_ws_client: WebSocketGenerator ) -> None: """Test that aiousbwatcher on WSL failure results in fallback to scanning without raising an exception.""" new_usb = [{"domain": "test1", "vid": "3039"}] mock_ports = [ USBDevice( device=slae_sh_device.device, vid="3039", pid="3039", serial_number=slae_sh_device.serial_number, manufacturer=slae_sh_device.manufacturer, description=slae_sh_device.description, ) ] with ( patch("homeassistant.components.usb.async_get_usb", return_value=new_usb), patch_scanned_serial_ports(return_value=mock_ports), patch.object(hass.config_entries.flow, "async_init") as mock_config_flow, ): assert await async_setup_component(hass, "usb", {"usb": {}}) await hass.async_block_till_done() hass.bus.async_fire(EVENT_HOMEASSISTANT_STARTED) await hass.async_block_till_done() ws_client = await hass_ws_client(hass) await ws_client.send_json({"id": 1, "type": "usb/scan"}) response = await ws_client.receive_json() assert response["success"] await hass.async_block_till_done() assert len(mock_config_flow.mock_calls) == 1 assert mock_config_flow.mock_calls[0][1][0] == "test1" async def test_discovered_by_aiousbwatcher_before_started(hass: HomeAssistant) -> None: """Test a device is discovered since aiousbwatcher is now running.""" new_usb = [{"domain": "test1", "vid": "3039", "pid": "3039"}] mock_ports = [ USBDevice( device=slae_sh_device.device, vid="3039", pid="3039", serial_number=slae_sh_device.serial_number, manufacturer=slae_sh_device.manufacturer, description=slae_sh_device.description, ) ] initial_ports = [] aiousbwatcher_callback = None def async_register_callback(callback): nonlocal aiousbwatcher_callback aiousbwatcher_callback = callback MockAIOUSBWatcher = MagicMock() MockAIOUSBWatcher.async_register_callback = async_register_callback with ( patch("sys.platform", "linux"), patch("homeassistant.components.usb.async_get_usb", return_value=new_usb), patch_scanned_serial_ports(return_value=initial_ports), patch( "homeassistant.components.usb.AIOUSBWatcher", return_value=MockAIOUSBWatcher ), patch.object(hass.config_entries.flow, "async_init") as mock_config_flow, ): assert await async_setup_component(hass, "usb", {"usb": {}}) await hass.async_block_till_done() hass.bus.async_fire(EVENT_HOMEASSISTANT_STARTED) await hass.async_block_till_done() assert len(mock_config_flow.mock_calls) == 0 initial_ports.extend(mock_ports) aiousbwatcher_callback() await hass.async_block_till_done() async_fire_time_changed( hass, dt_util.utcnow() + timedelta(seconds=usb.ADD_REMOVE_SCAN_COOLDOWN) ) await hass.async_block_till_done(wait_background_tasks=True) assert len(mock_config_flow.mock_calls) == 1 def test_get_serial_by_id_no_dir() -> None: """Test serial by id conversion if there's no /dev/serial/by-id.""" p1 = patch("os.path.isdir", MagicMock(return_value=False)) p2 = patch("os.scandir") with p1 as is_dir_mock, p2 as scan_mock: res = usb.get_serial_by_id(sentinel.path) assert res is sentinel.path assert is_dir_mock.call_count == 1 assert scan_mock.call_count == 0 def test_get_serial_by_id() -> None: """Test serial by id conversion.""" def _realpath(path): if path is sentinel.matched_link: return sentinel.path return sentinel.serial_link_path with ( patch("os.path.isdir", MagicMock(return_value=True)) as is_dir_mock, patch("os.scandir") as scan_mock, patch("os.path.realpath", side_effect=_realpath), ): res = usb.get_serial_by_id(sentinel.path) assert res is sentinel.path assert is_dir_mock.call_count == 1 assert scan_mock.call_count == 1 entry1 = MagicMock(spec_set=os.DirEntry) entry1.is_symlink.return_value = True entry1.path = sentinel.some_path entry2 = MagicMock(spec_set=os.DirEntry) entry2.is_symlink.return_value = False entry2.path = sentinel.other_path entry3 = MagicMock(spec_set=os.DirEntry) entry3.is_symlink.return_value = True entry3.path = sentinel.matched_link scan_mock.return_value = [entry1, entry2, entry3] res = usb.get_serial_by_id(sentinel.path) assert res is sentinel.matched_link assert is_dir_mock.call_count == 2 assert scan_mock.call_count == 2 def test_human_readable_device_name() -> None: """Test human readable device name includes the passed data.""" name = usb.human_readable_device_name( "/dev/null", "612020FD", "Silicon Labs", "HubZ Smart Home Controller - HubZ Z-Wave Com Port", "10C4", "8A2A", ) assert "/dev/null" in name assert "612020FD" in name assert "Silicon Labs" in name assert "HubZ Smart Home Controller - HubZ Z-Wave Com Port"[:26] in name assert "10C4" in name assert "8A2A" in name name = usb.human_readable_device_name( "/dev/null", "612020FD", "Silicon Labs", None, "10C4", "8A2A", ) assert "/dev/null" in name assert "612020FD" in name assert "Silicon Labs" in name assert "10C4" in name assert "8A2A" in name @pytest.mark.usefixtures("force_usb_polling_watcher") async def test_web_socket_triggers_discovery_request_callbacks( hass: HomeAssistant, hass_ws_client: WebSocketGenerator ) -> None: """Test the websocket call triggers a discovery request callback.""" mock_callback = Mock() with ( patch("homeassistant.components.usb.async_get_usb", return_value=[]), patch_scanned_serial_ports(return_value=[]), patch.object(hass.config_entries.flow, "async_init"), ): assert await async_setup_component(hass, "usb", {"usb": {}}) await hass.async_block_till_done() hass.bus.async_fire(EVENT_HOMEASSISTANT_STARTED) await hass.async_block_till_done() cancel = usb.async_register_scan_request_callback(hass, mock_callback) ws_client = await hass_ws_client(hass) await ws_client.send_json({"id": 1, "type": "usb/scan"}) response = await ws_client.receive_json() assert response["success"] await hass.async_block_till_done() assert len(mock_callback.mock_calls) == 1 cancel() await ws_client.send_json({"id": 2, "type": "usb/scan"}) response = await ws_client.receive_json() assert response["success"] await hass.async_block_till_done() assert len(mock_callback.mock_calls) == 1 @pytest.mark.usefixtures("force_usb_polling_watcher") async def test_initial_scan_callback( hass: HomeAssistant, hass_ws_client: WebSocketGenerator ) -> None: """Test it's possible to register a callback when the initial scan is done.""" mock_callback_1 = Mock() mock_callback_2 = Mock() with ( patch("homeassistant.components.usb.async_get_usb", return_value=[]), patch_scanned_serial_ports(return_value=[]), patch.object(hass.config_entries.flow, "async_init"), ): assert await async_setup_component(hass, "usb", {"usb": {}}) cancel_1 = usb.async_register_initial_scan_callback(hass, mock_callback_1) assert len(mock_callback_1.mock_calls) == 0 await hass.async_block_till_done() assert len(mock_callback_1.mock_calls) == 0 # This triggers the initial scan hass.bus.async_fire(EVENT_HOMEASSISTANT_STARTED) await hass.async_block_till_done() assert len(mock_callback_1.mock_calls) == 1 # A callback registered now should be called immediately. The old callback # should not be called again cancel_2 = usb.async_register_initial_scan_callback(hass, mock_callback_2) assert len(mock_callback_1.mock_calls) == 1 assert len(mock_callback_2.mock_calls) == 1 # Calling the cancels should be allowed even if the callback has been called cancel_1() cancel_2() @pytest.mark.usefixtures("force_usb_polling_watcher") async def test_cancel_initial_scan_callback( hass: HomeAssistant, hass_ws_client: WebSocketGenerator ) -> None: """Test it's possible to cancel an initial scan callback.""" mock_callback = Mock() with ( patch("homeassistant.components.usb.async_get_usb", return_value=[]), patch_scanned_serial_ports(return_value=[]), patch.object(hass.config_entries.flow, "async_init"), ): assert await async_setup_component(hass, "usb", {"usb": {}}) cancel = usb.async_register_initial_scan_callback(hass, mock_callback) assert len(mock_callback.mock_calls) == 0 await hass.async_block_till_done() assert len(mock_callback.mock_calls) == 0 cancel() # This triggers the initial scan hass.bus.async_fire(EVENT_HOMEASSISTANT_STARTED) await hass.async_block_till_done() assert len(mock_callback.mock_calls) == 0 @pytest.mark.usefixtures("force_usb_polling_watcher") @pytest.mark.parametrize( "ports", [ [ USBDevice( device="/dev/cu.usbserial-2120", vid="3039", pid="3039", serial_number=conbee_device.serial_number, manufacturer=conbee_device.manufacturer, description=conbee_device.description, ), USBDevice( device="/dev/cu.usbserial-1120", vid="3039", pid="3039", serial_number=slae_sh_device.serial_number, manufacturer=slae_sh_device.manufacturer, description=slae_sh_device.description, ), USBDevice( device="/dev/cu.SLAB_USBtoUART", vid="3039", pid="3039", serial_number=conbee_device.serial_number, manufacturer=conbee_device.manufacturer, description=conbee_device.description, ), USBDevice( device="/dev/cu.SLAB_USBtoUART2", vid="3039", pid="3039", serial_number=slae_sh_device.serial_number, manufacturer=slae_sh_device.manufacturer, description=slae_sh_device.description, ), ], [ USBDevice( device="/dev/cu.SLAB_USBtoUART2", vid="3039", pid="3039", serial_number=slae_sh_device.serial_number, manufacturer=slae_sh_device.manufacturer, description=slae_sh_device.description, ), USBDevice( device="/dev/cu.SLAB_USBtoUART", vid="3039", pid="3039", serial_number=conbee_device.serial_number, manufacturer=conbee_device.manufacturer, description=conbee_device.description, ), USBDevice( device="/dev/cu.usbserial-1120", vid="3039", pid="3039", serial_number=slae_sh_device.serial_number, manufacturer=slae_sh_device.manufacturer, description=slae_sh_device.description, ), USBDevice( device="/dev/cu.usbserial-2120", vid="3039", pid="3039", serial_number=conbee_device.serial_number, manufacturer=conbee_device.manufacturer, description=conbee_device.description, ), ], ], ) async def test_cp2102n_ordering_on_macos( ports: list[MagicMock], hass: HomeAssistant, hass_ws_client: WebSocketGenerator ) -> None: """Test CP2102N ordering on macOS.""" new_usb = [ {"domain": "test1", "vid": "3039", "pid": "3039", "description": "*2652*"} ] with ( patch("sys.platform", "darwin"), patch("homeassistant.components.usb.async_get_usb", return_value=new_usb), patch_scanned_serial_ports(return_value=ports), patch.object(hass.config_entries.flow, "async_init") as mock_config_flow, ): assert await async_setup_component(hass, "usb", {"usb": {}}) await hass.async_block_till_done() hass.bus.async_fire(EVENT_HOMEASSISTANT_STARTED) await hass.async_block_till_done() ws_client = await hass_ws_client(hass) await ws_client.send_json({"id": 1, "type": "usb/scan"}) response = await ws_client.receive_json() assert response["success"] await hass.async_block_till_done() assert len(mock_config_flow.mock_calls) == 1 assert mock_config_flow.mock_calls[0][1][0] == "test1" # We always use `cu.SLAB_USBtoUART` assert mock_config_flow.mock_calls[0][2]["data"].device == "/dev/cu.SLAB_USBtoUART2" @pytest.mark.usefixtures("force_usb_polling_watcher") @patch("homeassistant.components.usb.REQUEST_SCAN_COOLDOWN", 0) async def test_register_port_event_callback( hass: HomeAssistant, hass_ws_client: WebSocketGenerator ) -> None: """Test the registration of a port event callback.""" port1 = USBDevice( device=slae_sh_device.device, vid="3039", pid="3039", serial_number=slae_sh_device.serial_number, manufacturer=slae_sh_device.manufacturer, description=slae_sh_device.description, ) port2 = USBDevice( device=conbee_device.device, vid="303A", pid="303A", serial_number=conbee_device.serial_number, manufacturer=conbee_device.manufacturer, description=conbee_device.description, ) ws_client = await hass_ws_client(hass) mock_callback1 = Mock() mock_callback2 = Mock() # Start off with no ports with ( patch_scanned_serial_ports(return_value=[]), ): assert await async_setup_component(hass, "usb", {"usb": {}}) _cancel1 = usb.async_register_port_event_callback(hass, mock_callback1) cancel2 = usb.async_register_port_event_callback(hass, mock_callback2) assert mock_callback1.mock_calls == [] assert mock_callback2.mock_calls == [] # Add two new ports with patch_scanned_serial_ports(return_value=[port1, port2]): await ws_client.send_json({"id": 1, "type": "usb/scan"}) response = await ws_client.receive_json() assert response["success"] assert mock_callback1.mock_calls == [call({port1, port2}, set())] assert mock_callback2.mock_calls == [call({port1, port2}, set())] # Cancel the second callback cancel2() cancel2() mock_callback1.reset_mock() mock_callback2.reset_mock() # Remove port 2 with patch_scanned_serial_ports(return_value=[port1]): await ws_client.send_json({"id": 2, "type": "usb/scan"}) response = await ws_client.receive_json() assert response["success"] await hass.async_block_till_done() assert mock_callback1.mock_calls == [call(set(), {port2})] assert mock_callback2.mock_calls == [] # The second callback was unregistered mock_callback1.reset_mock() mock_callback2.reset_mock() # Keep port 2 removed with patch_scanned_serial_ports(return_value=[port1]): await ws_client.send_json({"id": 3, "type": "usb/scan"}) response = await ws_client.receive_json() assert response["success"] await hass.async_block_till_done() # Nothing changed so no callback is called assert mock_callback1.mock_calls == [] assert mock_callback2.mock_calls == [] # Unplug one and plug in the other with patch_scanned_serial_ports(return_value=[port2]): await ws_client.send_json({"id": 4, "type": "usb/scan"}) response = await ws_client.receive_json() assert response["success"] await hass.async_block_till_done() assert mock_callback1.mock_calls == [call({port2}, {port1})] assert mock_callback2.mock_calls == [] @pytest.mark.usefixtures("force_usb_polling_watcher") @patch("homeassistant.components.usb.REQUEST_SCAN_COOLDOWN", 0) async def test_register_port_event_callback_failure( hass: HomeAssistant, hass_ws_client: WebSocketGenerator, caplog: pytest.LogCaptureFixture, ) -> None: """Test port event callback failure handling.""" port1 = USBDevice( device=slae_sh_device.device, vid="3039", pid="3039", serial_number=slae_sh_device.serial_number, manufacturer=slae_sh_device.manufacturer, description=slae_sh_device.description, ) port2 = USBDevice( device=conbee_device.device, vid="303A", pid="303A", serial_number=conbee_device.serial_number, manufacturer=conbee_device.manufacturer, description=conbee_device.description, ) ws_client = await hass_ws_client(hass) mock_callback1 = Mock(side_effect=RuntimeError("Failure 1")) mock_callback2 = Mock(side_effect=RuntimeError("Failure 2")) # Start off with no ports with ( patch_scanned_serial_ports(return_value=[]), ): assert await async_setup_component(hass, "usb", {"usb": {}}) usb.async_register_port_event_callback(hass, mock_callback1) usb.async_register_port_event_callback(hass, mock_callback2) assert mock_callback1.mock_calls == [] assert mock_callback2.mock_calls == [] # Add two new ports with ( patch_scanned_serial_ports(return_value=[port1, port2]), caplog.at_level(logging.ERROR, logger="homeassistant.components.usb"), ): await ws_client.send_json({"id": 1, "type": "usb/scan"}) response = await ws_client.receive_json() assert response["success"] await hass.async_block_till_done() # Both were called even though they raised exceptions assert mock_callback1.mock_calls == [call({port1, port2}, set())] assert mock_callback2.mock_calls == [call({port1, port2}, set())] assert caplog.text.count("Error in USB port event callback") == 2 assert "Failure 1" in caplog.text assert "Failure 2" in caplog.text def test_scan_serial_ports_with_unique_symlinks() -> None: """Test scan_serial_ports returns devices with unique /dev/serial/by-id paths.""" entry1 = MagicMock(spec_set=os.DirEntry) entry1.is_symlink.return_value = True entry1.path = "/dev/serial/by-id/usb-device1" entry2 = MagicMock(spec_set=os.DirEntry) entry2.is_symlink.return_value = True entry2.path = "/dev/serial/by-id/usb-device2" mock_port1 = MagicMock() mock_port1.device = "/dev/ttyUSB0" mock_port1.vid = 0x1234 mock_port1.pid = 0x5678 mock_port1.serial_number = "ABC123" mock_port1.manufacturer = "Test Manufacturer" mock_port1.description = "Test Device" mock_port2 = MagicMock() mock_port2.device = "/dev/ttyUSB1" mock_port2.vid = 0xABCD mock_port2.pid = 0xEF01 mock_port2.serial_number = "XYZ789" mock_port2.manufacturer = "Another Manufacturer" mock_port2.description = "Another Device" def mock_realpath(path: str) -> str: realpath_map = { "/dev/serial/by-id/usb-device1": "/dev/ttyUSB0", "/dev/serial/by-id/usb-device2": "/dev/ttyUSB1", } return realpath_map.get(path, path) with ( patch("os.path.isdir", return_value=True), patch("os.scandir", return_value=[entry1, entry2]), patch("os.path.realpath", side_effect=mock_realpath), patch( "homeassistant.components.usb.utils.comports", return_value=[mock_port1, mock_port2], ), ): devices = scan_serial_ports() assert len(devices) == 2 assert devices[0].device == "/dev/serial/by-id/usb-device1" assert devices[0].vid == "1234" assert devices[1].device == "/dev/serial/by-id/usb-device2" assert devices[1].vid == "ABCD" def test_scan_serial_ports_without_unique_symlinks() -> None: """Test scan_serial_ports returns devices with original paths when no symlinks exist.""" mock_port = MagicMock() mock_port.device = "/dev/ttyUSB0" mock_port.vid = 0x1234 mock_port.pid = 0x5678 mock_port.serial_number = "ABC123" mock_port.manufacturer = "Test Manufacturer" mock_port.description = "Test Device" with ( patch("os.path.isdir", return_value=False), patch("os.path.realpath", side_effect=lambda x: x), patch( "homeassistant.components.usb.utils.comports", return_value=[mock_port], ), ): devices = scan_serial_ports() assert len(devices) == 1 assert devices[0].device == "/dev/ttyUSB0" assert devices[0].vid == "1234" def test_usb_device_from_path_finds_by_symlink() -> None: """Test usb_device_from_path finds device by symlink path.""" scanned_device = USBDevice( device="/dev/serial/by-id/usb-device1", vid="1234", pid="5678", serial_number="ABC123", manufacturer="Test Manufacturer", description="Test Device", ) def mock_realpath(path: str) -> str: realpath_map = { "/dev/serial/by-id/usb-device1": "/dev/ttyUSB0", } return realpath_map.get(path, path) with ( patch( "homeassistant.components.usb.utils.scan_serial_ports", return_value=[scanned_device], ), patch("os.path.realpath", side_effect=mock_realpath), ): result = usb_device_from_path("/dev/serial/by-id/usb-device1") assert result == scanned_device def test_usb_device_from_path_finds_by_realpath() -> None: """Test usb_device_from_path finds device by original device path.""" scanned_device = USBDevice( device="/dev/ttyUSB0", vid="1234", pid="5678", serial_number="ABC123", manufacturer="Test Manufacturer", description="Test Device", ) with ( patch( "homeassistant.components.usb.utils.scan_serial_ports", return_value=[scanned_device], ), patch("os.path.realpath", side_effect=lambda x: x), ): result = usb_device_from_path("/dev/ttyUSB0") assert result == scanned_device def test_usb_device_from_path_returns_none_when_not_found() -> None: """Test usb_device_from_path returns None when device not found.""" scanned_device = USBDevice( device="/dev/ttyUSB0", vid="1234", pid="5678", serial_number="ABC123", manufacturer="Test Manufacturer", description="Test Device", ) with ( patch( "homeassistant.components.usb.utils.scan_serial_ports", return_value=[scanned_device], ), patch("os.path.realpath", side_effect=lambda x: x), ): result = usb_device_from_path("/dev/ttyUSB99") assert result is None @pytest.mark.usefixtures("force_usb_polling_watcher") @patch("homeassistant.components.usb.REQUEST_SCAN_COOLDOWN", 0) async def test_removal_aborts_discovery_flows( hass: HomeAssistant, hass_ws_client: WebSocketGenerator ) -> None: """Test USB device removal aborts the correct discovery flows.""" # Used by test1 device1 = USBDevice( device="/dev/serial/by-id/unique-device-1", vid="1234", pid="5678", serial_number="ABC123", manufacturer="Test Manufacturer 1", description="Test Device 1 for domain test1", ) # Used by test1 device2 = USBDevice( device="/dev/serial/by-id/unique-device-2", vid="ABCD", pid="EF01", serial_number="XYZ789", manufacturer="Test Manufacturer 2", description="Test Device 2 for domain test1", ) # Used by test2 device3 = USBDevice( device="/dev/serial/by-id/unique-device-3", vid="AAAA", pid="BBBB", serial_number="ABCDEF", manufacturer="Test Manufacturer 3", description="Test Device 3 for domain test2", ) # Not used by any domain device4 = USBDevice( device="/dev/serial/by-id/unique-device-4", vid="CCCC", pid="DDDD", serial_number="ABCDEF", manufacturer="Test Manufacturer 4", description="Test Device 4", ) # Used by both test1 and test2 device5 = USBDevice( device="/dev/serial/by-id/multi-domain-device", vid="FFFF", pid="EEEE", serial_number="MULTI123", manufacturer="Test Manufacturer 5", description="Device matching multiple domains", ) class TestFlow(config_entries.ConfigFlow): VERSION = 1 async def async_step_usb(self, discovery_info): return self.async_show_form(step_id="confirm") async def async_step_confirm(self, user_input=None): # There's no way to exit return self.async_show_form(step_id="confirm") mock_integration(hass, MockModule("test1")) mock_platform(hass, "test1.config_flow", None) mock_integration(hass, MockModule("test2")) mock_platform(hass, "test2.config_flow", None) ws_client = await hass_ws_client(hass) with ( patch( "homeassistant.components.usb.async_get_usb", return_value=[ # Domain `test1` matches devices 1 and 2 {"domain": "test1", "vid": "1234", "pid": "5678"}, {"domain": "test1", "vid": "ABCD", "pid": "EF01"}, # Domain `test2` matches device 3 {"domain": "test2", "vid": "AAAA", "pid": "BBBB"}, # Both domains match device 5 {"domain": "test1", "vid": "FFFF", "pid": "EEEE"}, {"domain": "test2", "vid": "FFFF", "pid": "EEEE"}, ], ), # All devices are plugged in initially patch_scanned_serial_ports( return_value=[device1, device2, device3, device4, device5] ), mock_config_flow("test1", TestFlow), mock_config_flow("test2", TestFlow), ): assert await async_setup_component(hass, "usb", {"usb": {}}) await hass.async_block_till_done() hass.bus.async_fire(EVENT_HOMEASSISTANT_STARTED) await hass.async_block_till_done() # Discovery will create five flows (device5 is matched by both domains) flows = hass.config_entries.flow.async_progress() assert len(flows) == 5 # Three flows for test1 (1, 2, 5), two for test2 (3, 5) assert sorted([flow["handler"] for flow in flows]) == [ "test1", "test1", "test1", "test2", "test2", ] # Device 5 is removed with patch_scanned_serial_ports( return_value=[device1, device2, device3, device4] ): await ws_client.send_json({"id": 1, "type": "usb/scan"}) response = await ws_client.receive_json() assert response["success"] await hass.async_block_till_done() # Both flows for device5 should be aborted (one test1, one test2) remaining_flows = hass.config_entries.flow.async_progress() assert len(remaining_flows) == 3 assert sorted([flow["handler"] for flow in remaining_flows]) == [ "test1", "test1", "test2", ] # Device 3 disappears with patch_scanned_serial_ports(return_value=[device1, device2, device4]): await ws_client.send_json({"id": 2, "type": "usb/scan"}) response = await ws_client.receive_json() assert response["success"] await hass.async_block_till_done() # The corresponding flow is removed remaining_flows = hass.config_entries.flow.async_progress() assert len(remaining_flows) == 2 assert sorted([flow["handler"] for flow in remaining_flows]) == [ "test1", "test1", ] # Remove the others with patch_scanned_serial_ports(return_value=[]): await ws_client.send_json({"id": 3, "type": "usb/scan"}) response = await ws_client.receive_json() assert response["success"] await hass.async_block_till_done() # All the remaining flows should be aborted assert len(hass.config_entries.flow.async_progress()) == 0 # Plug one back in and the unused device4 with patch_scanned_serial_ports(return_value=[device3, device4]): await ws_client.send_json({"id": 4, "type": "usb/scan"}) response = await ws_client.receive_json() assert response["success"] await hass.async_block_till_done() # A new flow is re-created for the old device final_flows = hass.config_entries.flow.async_progress() assert len(final_flows) == 1 assert final_flows[0]["handler"] == "test2"