Use Shelly RPC cover methods from upstream and fix cover status update (#154345)

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
This commit is contained in:
Shay Levy
2025-10-14 14:07:37 +03:00
committed by GitHub
parent 487940872e
commit d108d5f106
3 changed files with 93 additions and 143 deletions
+21 -12
View File
@@ -29,6 +29,7 @@ from .entity import (
ShellyRpcAttributeEntity,
async_setup_entry_attribute_entities,
async_setup_entry_rpc,
rpc_call,
)
from .utils import get_device_entry_gen
@@ -192,6 +193,7 @@ class RpcShellyCover(ShellyRpcAttributeEntity, CoverEntity):
_attr_supported_features: CoverEntityFeature = (
CoverEntityFeature.OPEN | CoverEntityFeature.CLOSE | CoverEntityFeature.STOP
)
_id: int
def __init__(
self,
@@ -260,7 +262,7 @@ class RpcShellyCover(ShellyRpcAttributeEntity, CoverEntity):
"""Update the cover position every second."""
try:
while self.is_closing or self.is_opening:
await self.coordinator.device.update_status()
await self.coordinator.device.update_cover_status(self._id)
self.async_write_ha_state()
await asyncio.sleep(RPC_COVER_UPDATE_TIME_SEC)
finally:
@@ -274,39 +276,46 @@ class RpcShellyCover(ShellyRpcAttributeEntity, CoverEntity):
if self.is_closing or self.is_opening:
self.launch_update_task()
@rpc_call
async def async_close_cover(self, **kwargs: Any) -> None:
"""Close cover."""
await self.call_rpc("Cover.Close", {"id": self._id})
await self.coordinator.device.cover_close(self._id)
@rpc_call
async def async_open_cover(self, **kwargs: Any) -> None:
"""Open cover."""
await self.call_rpc("Cover.Open", {"id": self._id})
await self.coordinator.device.cover_open(self._id)
@rpc_call
async def async_set_cover_position(self, **kwargs: Any) -> None:
"""Move the cover to a specific position."""
await self.call_rpc(
"Cover.GoToPosition", {"id": self._id, "pos": kwargs[ATTR_POSITION]}
await self.coordinator.device.cover_set_position(
self._id, pos=kwargs[ATTR_POSITION]
)
@rpc_call
async def async_stop_cover(self, **_kwargs: Any) -> None:
"""Stop the cover."""
await self.call_rpc("Cover.Stop", {"id": self._id})
await self.coordinator.device.cover_stop(self._id)
@rpc_call
async def async_open_cover_tilt(self, **kwargs: Any) -> None:
"""Open the cover tilt."""
await self.call_rpc("Cover.GoToPosition", {"id": self._id, "slat_pos": 100})
await self.coordinator.device.cover_set_position(self._id, slat_pos=100)
@rpc_call
async def async_close_cover_tilt(self, **kwargs: Any) -> None:
"""Close the cover tilt."""
await self.call_rpc("Cover.GoToPosition", {"id": self._id, "slat_pos": 0})
await self.coordinator.device.cover_set_position(self._id, slat_pos=0)
@rpc_call
async def async_set_cover_tilt_position(self, **kwargs: Any) -> None:
"""Move the cover tilt to a specific position."""
await self.call_rpc(
"Cover.GoToPosition",
{"id": self._id, "slat_pos": kwargs[ATTR_TILT_POSITION]},
await self.coordinator.device.cover_set_position(
self._id, slat_pos=kwargs[ATTR_TILT_POSITION]
)
@rpc_call
async def async_stop_cover_tilt(self, **kwargs: Any) -> None:
"""Stop the cover."""
await self.call_rpc("Cover.Stop", {"id": self._id})
await self.coordinator.device.cover_stop(self._id)
+10
View File
@@ -617,6 +617,13 @@ async def mock_rpc_device():
{}, RpcUpdateType.INITIALIZED
)
current_pos = iter(range(50, -1, -10)) # from 50 to 0 in steps of 10
async def update_cover_status(cover_id: int):
device.status[f"cover:{cover_id}"]["current_pos"] = next(
current_pos, device.status[f"cover:{cover_id}"]["current_pos"]
)
device = _mock_rpc_device()
rpc_device_mock.return_value = device
rpc_device_mock.return_value.mock_disconnected = Mock(side_effect=disconnected)
@@ -624,6 +631,9 @@ async def mock_rpc_device():
rpc_device_mock.return_value.mock_event = Mock(side_effect=event)
rpc_device_mock.return_value.mock_online = Mock(side_effect=online)
rpc_device_mock.return_value.mock_initialized = Mock(side_effect=initialized)
rpc_device_mock.return_value.update_cover_status = AsyncMock(
side_effect=update_cover_status
)
yield rpc_device_mock.return_value
+62 -131
View File
@@ -139,6 +139,8 @@ async def test_rpc_device_services(
{ATTR_ENTITY_ID: entity_id, ATTR_POSITION: 50},
blocking=True,
)
mock_rpc_device.cover_set_position.assert_called_once_with(0, pos=50)
assert (state := hass.states.get(entity_id))
assert state.attributes[ATTR_CURRENT_POSITION] == 50
@@ -153,6 +155,7 @@ async def test_rpc_device_services(
)
mock_rpc_device.mock_update()
mock_rpc_device.cover_open.assert_called_once_with(0)
assert (state := hass.states.get(entity_id))
assert state.state == CoverState.OPENING
@@ -167,6 +170,7 @@ async def test_rpc_device_services(
)
mock_rpc_device.mock_update()
mock_rpc_device.cover_close.assert_called_once_with(0)
assert (state := hass.states.get(entity_id))
assert state.state == CoverState.CLOSING
@@ -178,6 +182,8 @@ async def test_rpc_device_services(
blocking=True,
)
mock_rpc_device.mock_update()
mock_rpc_device.cover_stop.assert_called_once_with(0)
assert (state := hass.states.get(entity_id))
assert state.state == CoverState.CLOSED
@@ -262,9 +268,11 @@ async def test_rpc_cover_tilt(
mutate_rpc_device_status(monkeypatch, mock_rpc_device, "cover:0", "slat_pos", 50)
mock_rpc_device.mock_update()
mock_rpc_device.cover_set_position.assert_called_once_with(0, slat_pos=50)
assert (state := hass.states.get(entity_id))
assert state.attributes[ATTR_CURRENT_TILT_POSITION] == 50
mock_rpc_device.cover_set_position.reset_mock()
await hass.services.async_call(
COVER_DOMAIN,
SERVICE_OPEN_COVER_TILT,
@@ -274,9 +282,11 @@ async def test_rpc_cover_tilt(
mutate_rpc_device_status(monkeypatch, mock_rpc_device, "cover:0", "slat_pos", 100)
mock_rpc_device.mock_update()
mock_rpc_device.cover_set_position.assert_called_once_with(0, slat_pos=100)
assert (state := hass.states.get(entity_id))
assert state.attributes[ATTR_CURRENT_TILT_POSITION] == 100
mock_rpc_device.cover_set_position.reset_mock()
await hass.services.async_call(
COVER_DOMAIN,
SERVICE_CLOSE_COVER_TILT,
@@ -292,17 +302,62 @@ async def test_rpc_cover_tilt(
mutate_rpc_device_status(monkeypatch, mock_rpc_device, "cover:0", "slat_pos", 10)
mock_rpc_device.mock_update()
mock_rpc_device.cover_stop.assert_called_once_with(0)
assert (state := hass.states.get(entity_id))
assert state.attributes[ATTR_CURRENT_TILT_POSITION] == 10
async def test_update_position_closing(
async def test_rpc_cover_position_update(
hass: HomeAssistant,
freezer: FrozenDateTimeFactory,
mock_rpc_device: Mock,
monkeypatch: pytest.MonkeyPatch,
) -> None:
"""Test update_position while the cover is closing."""
"""Test RPC update_position while the cover is moving."""
entity_id = "cover.test_name_test_cover_0"
await init_integration(hass, 2)
# Set initial state to closing, position 50 set by update_cover_status mock
mutate_rpc_device_status(
monkeypatch, mock_rpc_device, "cover:0", "state", "closing"
)
mock_rpc_device.mock_update()
assert (state := hass.states.get(entity_id))
assert state.state == CoverState.CLOSING
assert state.attributes[ATTR_CURRENT_POSITION] == 50
# Simulate position updates during closing
for position in range(40, -1, -10):
mock_rpc_device.update_cover_status.reset_mock()
await mock_polling_rpc_update(hass, freezer, RPC_COVER_UPDATE_TIME_SEC)
mock_rpc_device.update_cover_status.assert_called_once_with(0)
assert (state := hass.states.get(entity_id))
assert state.attributes[ATTR_CURRENT_POSITION] == position
assert state.state == CoverState.CLOSING
# Simulate cover reaching final position
mock_rpc_device.update_cover_status.reset_mock()
mutate_rpc_device_status(monkeypatch, mock_rpc_device, "cover:0", "state", "closed")
mock_rpc_device.mock_update()
assert (state := hass.states.get(entity_id))
assert state.attributes[ATTR_CURRENT_POSITION] == 0
assert state.state == CoverState.CLOSED
# Ensure update_position does not call update_cover_status when the cover is not moving
await mock_polling_rpc_update(hass, freezer, RPC_COVER_UPDATE_TIME_SEC)
mock_rpc_device.update_cover_status.assert_not_called()
async def test_rpc_not_initialized_update(
hass: HomeAssistant,
mock_rpc_device: Mock,
monkeypatch: pytest.MonkeyPatch,
freezer: FrozenDateTimeFactory,
) -> None:
"""Test update not called when device is not initialized."""
entity_id = "cover.test_name_test_cover_0"
await init_integration(hass, 2)
@@ -311,138 +366,14 @@ async def test_update_position_closing(
monkeypatch, mock_rpc_device, "cover:0", "state", "closing"
)
mutate_rpc_device_status(monkeypatch, mock_rpc_device, "cover:0", "current_pos", 40)
mock_rpc_device.mock_update()
assert (state := hass.states.get(entity_id))
assert state.state == CoverState.CLOSING
assert state.attributes[ATTR_CURRENT_POSITION] == 40
# Simulate position decrement
async def simulated_update(*args, **kwargs):
pos = mock_rpc_device.status["cover:0"]["current_pos"]
if pos > 0:
mutate_rpc_device_status(
monkeypatch, mock_rpc_device, "cover:0", "current_pos", pos - 10
)
else:
mutate_rpc_device_status(
monkeypatch, mock_rpc_device, "cover:0", "current_pos", 0
)
mutate_rpc_device_status(
monkeypatch, mock_rpc_device, "cover:0", "state", "closed"
)
# Patching the mock update_status method
monkeypatch.setattr(mock_rpc_device, "update_status", simulated_update)
# Simulate position updates during closing
for position in range(40, -1, -10):
assert (state := hass.states.get(entity_id))
assert state.attributes[ATTR_CURRENT_POSITION] == position
assert state.state == CoverState.CLOSING
await mock_polling_rpc_update(hass, freezer, RPC_COVER_UPDATE_TIME_SEC)
# Final state should be closed
assert (state := hass.states.get(entity_id))
assert state.state == CoverState.CLOSED
assert state.attributes[ATTR_CURRENT_POSITION] == 0
async def test_update_position_opening(
hass: HomeAssistant,
freezer: FrozenDateTimeFactory,
mock_rpc_device: Mock,
monkeypatch: pytest.MonkeyPatch,
) -> None:
"""Test update_position while the cover is opening."""
entity_id = "cover.test_name_test_cover_0"
await init_integration(hass, 2)
# Set initial state to opening at 60
mutate_rpc_device_status(
monkeypatch, mock_rpc_device, "cover:0", "state", "opening"
)
mutate_rpc_device_status(monkeypatch, mock_rpc_device, "cover:0", "current_pos", 60)
mock_rpc_device.mock_update()
assert (state := hass.states.get(entity_id))
assert state.state == CoverState.OPENING
assert state.attributes[ATTR_CURRENT_POSITION] == 60
# Simulate position increment
async def simulated_update(*args, **kwargs):
pos = mock_rpc_device.status["cover:0"]["current_pos"]
if pos < 100:
mutate_rpc_device_status(
monkeypatch, mock_rpc_device, "cover:0", "current_pos", pos + 10
)
else:
mutate_rpc_device_status(
monkeypatch, mock_rpc_device, "cover:0", "current_pos", 100
)
mutate_rpc_device_status(
monkeypatch, mock_rpc_device, "cover:0", "state", "open"
)
# Patching the mock update_status method
monkeypatch.setattr(mock_rpc_device, "update_status", simulated_update)
# Check position updates during opening
for position in range(60, 101, 10):
assert (state := hass.states.get(entity_id))
assert state.attributes[ATTR_CURRENT_POSITION] == position
assert state.state == CoverState.OPENING
await mock_polling_rpc_update(hass, freezer, RPC_COVER_UPDATE_TIME_SEC)
# Final state should be open
assert (state := hass.states.get(entity_id))
assert state.state == CoverState.OPEN
assert state.attributes[ATTR_CURRENT_POSITION] == 100
async def test_update_position_no_movement(
hass: HomeAssistant, mock_rpc_device: Mock, monkeypatch: pytest.MonkeyPatch
) -> None:
"""Test update_position when the cover is not moving."""
entity_id = "cover.test_name_test_cover_0"
await init_integration(hass, 2)
# Set initial state to open
mutate_rpc_device_status(monkeypatch, mock_rpc_device, "cover:0", "state", "open")
mutate_rpc_device_status(
monkeypatch, mock_rpc_device, "cover:0", "current_pos", 100
)
mock_rpc_device.mock_update()
assert (state := hass.states.get(entity_id))
assert state.state == CoverState.OPEN
assert state.attributes[ATTR_CURRENT_POSITION] == 100
# Call update_position and ensure no changes occur
await hass.services.async_call(
COVER_DOMAIN,
SERVICE_OPEN_COVER,
{ATTR_ENTITY_ID: entity_id},
blocking=True,
)
assert (state := hass.states.get(entity_id))
assert state.state == CoverState.OPEN
assert state.attributes[ATTR_CURRENT_POSITION] == 100
async def test_rpc_not_initialized_update(
hass: HomeAssistant, mock_rpc_device: Mock, monkeypatch: pytest.MonkeyPatch
) -> None:
"""Test update not called when device is not initialized."""
entity_id = "cover.test_name_test_cover_0"
await init_integration(hass, 2)
assert (state := hass.states.get(entity_id))
assert state.state == CoverState.OPEN
# mock device not initialized (e.g. disconnected)
monkeypatch.setattr(mock_rpc_device, "initialized", False)
mock_rpc_device.mock_update()
# wait for update interval to allow update_position to call update_cover_status
await mock_polling_rpc_update(hass, freezer, RPC_COVER_UPDATE_TIME_SEC)
mock_rpc_device.update_cover_status.assert_not_called()
assert (state := hass.states.get(entity_id))
assert state.state == STATE_UNAVAILABLE