mirror of
https://github.com/home-assistant/core.git
synced 2025-08-25 15:31:36 +02:00
test
This commit is contained in:
@@ -22,6 +22,7 @@ from .const import DOMAIN
|
|||||||
_LOGGER = logging.getLogger(__name__)
|
_LOGGER = logging.getLogger(__name__)
|
||||||
MAX_WS_RECONNECT_TIME = 600
|
MAX_WS_RECONNECT_TIME = 600
|
||||||
SCAN_INTERVAL = timedelta(minutes=8)
|
SCAN_INTERVAL = timedelta(minutes=8)
|
||||||
|
DEFAULT_RECONNECT_TIME = 2 # Define a default reconnect time
|
||||||
|
|
||||||
|
|
||||||
class AutomowerDataUpdateCoordinator(DataUpdateCoordinator[dict[str, MowerAttributes]]):
|
class AutomowerDataUpdateCoordinator(DataUpdateCoordinator[dict[str, MowerAttributes]]):
|
||||||
@@ -38,8 +39,10 @@ class AutomowerDataUpdateCoordinator(DataUpdateCoordinator[dict[str, MowerAttrib
|
|||||||
update_interval=SCAN_INTERVAL,
|
update_interval=SCAN_INTERVAL,
|
||||||
)
|
)
|
||||||
self.api = api
|
self.api = api
|
||||||
|
|
||||||
self.ws_connected: bool = False
|
self.ws_connected: bool = False
|
||||||
|
self.reconnect_time: int = (
|
||||||
|
DEFAULT_RECONNECT_TIME # Initialize the reconnect time
|
||||||
|
)
|
||||||
|
|
||||||
async def _async_update_data(self) -> dict[str, MowerAttributes]:
|
async def _async_update_data(self) -> dict[str, MowerAttributes]:
|
||||||
"""Subscribe for websocket and poll data from the API."""
|
"""Subscribe for websocket and poll data from the API."""
|
||||||
@@ -64,22 +67,22 @@ class AutomowerDataUpdateCoordinator(DataUpdateCoordinator[dict[str, MowerAttrib
|
|||||||
hass: HomeAssistant,
|
hass: HomeAssistant,
|
||||||
entry: ConfigEntry,
|
entry: ConfigEntry,
|
||||||
automower_client: AutomowerSession,
|
automower_client: AutomowerSession,
|
||||||
reconnect_time: int = 2,
|
|
||||||
) -> None:
|
) -> None:
|
||||||
"""Listen with the client."""
|
"""Listen with the client."""
|
||||||
try:
|
try:
|
||||||
await automower_client.auth.websocket_connect()
|
await automower_client.auth.websocket_connect()
|
||||||
except HusqvarnaWSServerHandshakeError as err:
|
await automower_client.start_listening() # Ensure we catch errors here too
|
||||||
|
self.reconnect_time = DEFAULT_RECONNECT_TIME # Reset reconnect time after successful connection
|
||||||
|
except (HusqvarnaWSServerHandshakeError, Exception) as err:
|
||||||
_LOGGER.debug(
|
_LOGGER.debug(
|
||||||
"Failed to connect to websocket. Trying to reconnect: %s", err
|
"Failed to connect to websocket. Trying to reconnect: %s",
|
||||||
|
err,
|
||||||
|
)
|
||||||
|
if not hass.is_stopping:
|
||||||
|
await asyncio.sleep(self.reconnect_time)
|
||||||
|
self.reconnect_time = min(self.reconnect_time * 2, MAX_WS_RECONNECT_TIME)
|
||||||
|
entry.async_create_background_task(
|
||||||
|
hass,
|
||||||
|
self.client_listen(hass, entry, automower_client),
|
||||||
|
"reconnect_task",
|
||||||
)
|
)
|
||||||
if not hass.is_stopping:
|
|
||||||
await asyncio.sleep(reconnect_time)
|
|
||||||
reconnect_time = min(reconnect_time * 2, MAX_WS_RECONNECT_TIME)
|
|
||||||
entry.async_create_background_task(
|
|
||||||
hass,
|
|
||||||
self.client_listen(hass, entry, automower_client, reconnect_time),
|
|
||||||
"reconnect_task",
|
|
||||||
)
|
|
||||||
else:
|
|
||||||
await automower_client.start_listening()
|
|
||||||
|
@@ -3,7 +3,7 @@
|
|||||||
from datetime import timedelta
|
from datetime import timedelta
|
||||||
import http
|
import http
|
||||||
import time
|
import time
|
||||||
from unittest.mock import AsyncMock
|
from unittest.mock import AsyncMock, patch
|
||||||
|
|
||||||
from aioautomower.exceptions import (
|
from aioautomower.exceptions import (
|
||||||
ApiException,
|
ApiException,
|
||||||
@@ -130,30 +130,54 @@ async def test_websocket_not_available(
|
|||||||
caplog: pytest.LogCaptureFixture,
|
caplog: pytest.LogCaptureFixture,
|
||||||
freezer: FrozenDateTimeFactory,
|
freezer: FrozenDateTimeFactory,
|
||||||
) -> None:
|
) -> None:
|
||||||
"""Test trying reload the websocket."""
|
"""Test trying to reload the websocket."""
|
||||||
mock_automower_client.auth.websocket_connect.side_effect = (
|
with patch(
|
||||||
HusqvarnaWSServerHandshakeError("Boom")
|
"homeassistant.components.husqvarna_automower.coordinator.DEFAULT_RECONNECT_TIME",
|
||||||
)
|
new=0,
|
||||||
await setup_integration(hass, mock_config_entry)
|
):
|
||||||
assert "Failed to connect to websocket. Trying to reconnect: Boom" in caplog.text
|
mock_automower_client.auth.websocket_connect.side_effect = (
|
||||||
assert mock_automower_client.auth.websocket_connect.call_count == 1
|
HusqvarnaWSServerHandshakeError("Boom")
|
||||||
assert mock_config_entry.state is ConfigEntryState.LOADED
|
)
|
||||||
reconnect_time = 2
|
await setup_integration(hass, mock_config_entry)
|
||||||
for count in range(1, 945):
|
assert (
|
||||||
reconnect_time = min(reconnect_time * 2, MAX_WS_RECONNECT_TIME)
|
"Failed to connect to websocket. Trying to reconnect: Boom" in caplog.text
|
||||||
freezer.tick(timedelta(seconds=reconnect_time))
|
)
|
||||||
|
# await hass.async_block_till_done()
|
||||||
|
# assert mock_automower_client.auth.websocket_connect.call_count == 1
|
||||||
|
# assert mock_config_entry.state is ConfigEntryState.LOADED
|
||||||
|
|
||||||
|
# reconnect_time = 0 # Default to zero for testing
|
||||||
|
test_range = 1940
|
||||||
|
start_call_count = mock_automower_client.auth.websocket_connect.call_count
|
||||||
|
for count in range(1, test_range):
|
||||||
|
# reconnect_time = min(reconnect_time * 2, MAX_WS_RECONNECT_TIME)
|
||||||
|
# freezer.tick(timedelta(seconds=reconnect_time))
|
||||||
|
# async_fire_time_changed(hass)
|
||||||
|
await hass.async_block_till_done()
|
||||||
|
assert (
|
||||||
|
mock_automower_client.auth.websocket_connect.call_count
|
||||||
|
== start_call_count + count
|
||||||
|
)
|
||||||
|
assert mock_config_entry.state is ConfigEntryState.LOADED
|
||||||
|
|
||||||
|
# Simulate a successful connection and starting of listening
|
||||||
|
mock_automower_client.auth.websocket_connect.side_effect = None
|
||||||
|
freezer.tick(timedelta(seconds=MAX_WS_RECONNECT_TIME))
|
||||||
async_fire_time_changed(hass)
|
async_fire_time_changed(hass)
|
||||||
await hass.async_block_till_done()
|
await hass.async_block_till_done()
|
||||||
assert mock_automower_client.auth.websocket_connect.call_count == count + 1
|
assert mock_automower_client.start_listening.call_count == 1
|
||||||
assert mock_config_entry.state is ConfigEntryState.LOADED
|
|
||||||
mock_automower_client.auth.websocket_connect.side_effect = None
|
# Test that reconnection occurs if the websocket disconnects again
|
||||||
freezer.tick(timedelta(seconds=MAX_WS_RECONNECT_TIME))
|
mock_automower_client.auth.websocket_connect.side_effect = (
|
||||||
async_fire_time_changed(hass)
|
HusqvarnaWSServerHandshakeError("Boom")
|
||||||
await hass.async_block_till_done()
|
)
|
||||||
assert mock_automower_client.start_listening.call_count == 1
|
freezer.tick(timedelta(seconds=0))
|
||||||
mock_automower_client.auth.websocket_connect.side_effect = (
|
async_fire_time_changed(hass)
|
||||||
HusqvarnaWSServerHandshakeError("Boom")
|
await hass.async_block_till_done()
|
||||||
)
|
assert (
|
||||||
|
mock_automower_client.auth.websocket_connect.call_count
|
||||||
|
== start_call_count + test_range + 1
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
async def test_device_info(
|
async def test_device_info(
|
||||||
|
Reference in New Issue
Block a user