Add tilt control for RFXtrx Rfy venetian blinds

This commit is contained in:
dzukero
2020-12-15 18:03:06 +02:00
parent 6fb6d771fd
commit ba61dbf860
8 changed files with 340 additions and 4 deletions

View File

@@ -48,6 +48,8 @@ from .const import (
CONF_OFF_DELAY,
CONF_REMOVE_DEVICE,
CONF_SIGNAL_REPETITIONS,
CONF_VENETIAN_BLIND_MODE,
CONST_VENETIAN_BLIND_MODE_DEFAULT,
DATA_CLEANUP_CALLBACKS,
DATA_LISTENER,
DATA_RFXOBJECT,
@@ -126,6 +128,9 @@ DEVICE_DATA_SCHEMA = vol.Schema(
vol.Optional(CONF_COMMAND_ON): cv.byte,
vol.Optional(CONF_COMMAND_OFF): cv.byte,
vol.Optional(CONF_SIGNAL_REPETITIONS, default=1): cv.positive_int,
vol.Optional(
CONF_VENETIAN_BLIND_MODE, default=CONST_VENETIAN_BLIND_MODE_DEFAULT
): cv.string,
}
)

View File

@@ -40,6 +40,10 @@ from .const import (
CONF_REMOVE_DEVICE,
CONF_REPLACE_DEVICE,
CONF_SIGNAL_REPETITIONS,
CONF_VENETIAN_BLIND_MODE,
CONST_VENETIAN_BLIND_MODE_DEFAULT,
CONST_VENETIAN_BLIND_MODE_EU,
CONST_VENETIAN_BLIND_MODE_US,
DEVICE_PACKET_TYPE_LIGHTING4,
)
from .cover import supported as cover_supported
@@ -218,6 +222,10 @@ class OptionsFlow(config_entries.OptionsFlow):
device[CONF_COMMAND_ON] = command_on
if command_off:
device[CONF_COMMAND_OFF] = command_off
if user_input.get(CONF_VENETIAN_BLIND_MODE):
device[CONF_VENETIAN_BLIND_MODE] = user_input[
CONF_VENETIAN_BLIND_MODE
]
self.update_config_data(
global_options=self._global_options, devices=devices
@@ -282,6 +290,23 @@ class OptionsFlow(config_entries.OptionsFlow):
}
)
if isinstance(self._selected_device_object.device, rfxtrxmod.RfyDevice):
data_schema.update(
{
vol.Optional(
CONF_VENETIAN_BLIND_MODE,
default=device_data.get(
CONF_VENETIAN_BLIND_MODE, CONST_VENETIAN_BLIND_MODE_DEFAULT
),
): vol.In(
[
CONST_VENETIAN_BLIND_MODE_DEFAULT,
CONST_VENETIAN_BLIND_MODE_US,
CONST_VENETIAN_BLIND_MODE_EU,
]
),
}
)
devices = {
entry.id: entry.name_by_user if entry.name_by_user else entry.name
for entry in self._device_entries

View File

@@ -6,10 +6,15 @@ CONF_AUTOMATIC_ADD = "automatic_add"
CONF_SIGNAL_REPETITIONS = "signal_repetitions"
CONF_DEBUG = "debug"
CONF_OFF_DELAY = "off_delay"
CONF_VENETIAN_BLIND_MODE = "venetian_blind_mode"
CONF_REMOVE_DEVICE = "remove_device"
CONF_REPLACE_DEVICE = "replace_device"
CONST_VENETIAN_BLIND_MODE_DEFAULT = "Unknown"
CONST_VENETIAN_BLIND_MODE_EU = "EU"
CONST_VENETIAN_BLIND_MODE_US = "US"
COMMAND_ON_LIST = [
"On",
"Up",

View File

@@ -14,7 +14,13 @@ from . import (
get_device_id,
get_rfx_object,
)
from .const import COMMAND_OFF_LIST, COMMAND_ON_LIST
from .const import (
COMMAND_OFF_LIST,
COMMAND_ON_LIST,
CONF_VENETIAN_BLIND_MODE,
CONST_VENETIAN_BLIND_MODE_EU,
CONST_VENETIAN_BLIND_MODE_US,
)
_LOGGER = logging.getLogger(__name__)
@@ -50,7 +56,10 @@ async def async_setup_entry(
device_ids.add(device_id)
entity = RfxtrxCover(
event.device, device_id, entity_info[CONF_SIGNAL_REPETITIONS]
event.device,
device_id,
signal_repetitions=entity_info[CONF_SIGNAL_REPETITIONS],
venetian_blind_mode=entity_info.get(CONF_VENETIAN_BLIND_MODE),
)
entities.append(entity)
@@ -86,6 +95,18 @@ async def async_setup_entry(
class RfxtrxCover(RfxtrxCommandEntity, CoverEntity):
"""Representation of a RFXtrx cover."""
def __init__(
self,
device,
device_id,
signal_repetitions,
event=None,
venetian_blind_mode=None,
):
"""Initialize the RFXtrx cover device."""
super().__init__(device, device_id, signal_repetitions, event)
self._venetian_blind_mode = venetian_blind_mode
async def async_added_to_hass(self):
"""Restore device state."""
await super().async_added_to_hass()
@@ -95,6 +116,18 @@ class RfxtrxCover(RfxtrxCommandEntity, CoverEntity):
if old_state is not None:
self._state = old_state.state == STATE_OPEN
@property
def current_cover_tilt_position(self):
"""Return current position of cover tilt.
None is unknown, 0 is closed, 100 is fully open.
"""
if self._venetian_blind_mode in (
CONST_VENETIAN_BLIND_MODE_US,
CONST_VENETIAN_BLIND_MODE_EU,
):
return 50 # Rfy does not support tilt position. The non-empty value is used to enable tilt support.
@property
def is_closed(self):
"""Return if the cover is closed."""
@@ -102,13 +135,23 @@ class RfxtrxCover(RfxtrxCommandEntity, CoverEntity):
async def async_open_cover(self, **kwargs):
"""Move the cover up."""
await self._async_send(self._device.send_open)
if self._venetian_blind_mode == CONST_VENETIAN_BLIND_MODE_US:
await self._async_send(self._device.send_up05sec)
elif self._venetian_blind_mode == CONST_VENETIAN_BLIND_MODE_EU:
await self._async_send(self._device.send_up2sec)
else:
await self._async_send(self._device.send_open)
self._state = True
self.async_write_ha_state()
async def async_close_cover(self, **kwargs):
"""Move the cover down."""
await self._async_send(self._device.send_close)
if self._venetian_blind_mode == CONST_VENETIAN_BLIND_MODE_US:
await self._async_send(self._device.send_down05sec)
elif self._venetian_blind_mode == CONST_VENETIAN_BLIND_MODE_EU:
await self._async_send(self._device.send_down2sec)
else:
await self._async_send(self._device.send_close)
self._state = False
self.async_write_ha_state()
@@ -118,6 +161,20 @@ class RfxtrxCover(RfxtrxCommandEntity, CoverEntity):
self._state = True
self.async_write_ha_state()
async def async_open_cover_tilt(self, **kwargs):
"""Tilt the cover up."""
if self._venetian_blind_mode == CONST_VENETIAN_BLIND_MODE_US:
await self._async_send(self._device.send_up2sec)
elif self._venetian_blind_mode == CONST_VENETIAN_BLIND_MODE_EU:
await self._async_send(self._device.send_up05sec)
async def async_close_cover_tilt(self, **kwargs):
"""Tilt the cover down."""
if self._venetian_blind_mode == CONST_VENETIAN_BLIND_MODE_US:
await self._async_send(self._device.send_down2sec)
elif self._venetian_blind_mode == CONST_VENETIAN_BLIND_MODE_EU:
await self._async_send(self._device.send_down05sec)
def _apply_event(self, event):
"""Apply command from rfxtrx."""
super()._apply_event(event)

View File

@@ -56,6 +56,7 @@
"command_on": "Data bits value for command on",
"command_off": "Data bits value for command off",
"signal_repetitions": "Number of signal repetitions",
"venetian_blind_mode": "Venetian blind mode",
"replace_device": "Select device to replace"
},
"title": "Configure device options"

View File

@@ -64,6 +64,7 @@
"off_delay": "Off delay",
"off_delay_enabled": "Enable off delay",
"replace_device": "Select device to replace",
"venetian_blind_mode": "Venetian blind mode (tilt by: US - long press, EU - short press)",
"signal_repetitions": "Number of signal repetitions"
},
"title": "Configure device options"

View File

@@ -1103,6 +1103,87 @@ async def test_options_add_and_configure_device(hass):
assert "delay_off" not in entry.data["devices"]["0913000022670e013970"]
async def test_options_configure_rfy_cover_device(hass):
"""Test we can configure the venetion blind mode of an Rfy cover."""
await setup.async_setup_component(hass, "persistent_notification", {})
entry = MockConfigEntry(
domain=DOMAIN,
data={
"host": None,
"port": None,
"device": "/dev/tty123",
"automatic_add": False,
"devices": {},
},
unique_id=DOMAIN,
)
entry.add_to_hass(hass)
result = await hass.config_entries.options.async_init(entry.entry_id)
assert result["type"] == "form"
assert result["step_id"] == "prompt_options"
result = await hass.config_entries.options.async_configure(
result["flow_id"],
user_input={
"automatic_add": True,
"event_code": "071a000001020301",
},
)
assert result["type"] == "form"
assert result["step_id"] == "set_device_options"
result = await hass.config_entries.options.async_configure(
result["flow_id"],
user_input={
"fire_event": False,
"venetian_blind_mode": "EU",
},
)
await hass.async_block_till_done()
assert entry.data["devices"]["071a000001020301"]["venetian_blind_mode"] == "EU"
device_registry = await async_get_device_registry(hass)
device_entries = async_entries_for_config_entry(device_registry, entry.entry_id)
assert device_entries[0].id
result = await hass.config_entries.options.async_init(entry.entry_id)
assert result["type"] == "form"
assert result["step_id"] == "prompt_options"
result = await hass.config_entries.options.async_configure(
result["flow_id"],
user_input={
"automatic_add": False,
"device": device_entries[0].id,
},
)
assert result["type"] == "form"
assert result["step_id"] == "set_device_options"
result = await hass.config_entries.options.async_configure(
result["flow_id"],
user_input={
"fire_event": False,
"venetian_blind_mode": "EU",
},
)
assert result["type"] == data_entry_flow.RESULT_TYPE_CREATE_ENTRY
await hass.async_block_till_done()
assert entry.data["devices"]["071a000001020301"]["venetian_blind_mode"] == "EU"
def test_get_serial_by_id_no_dir():
"""Test serial by id conversion if there's no /dev/serial/by-id."""
p1 = patch("os.path.isdir", MagicMock(return_value=False))

View File

@@ -140,3 +140,164 @@ async def test_duplicate_cover(hass, rfxtrx):
assert state
assert state.state == "closed"
assert state.attributes.get("friendly_name") == "LightwaveRF, Siemens 0213c7:242"
async def test_rfy_cover(hass, rfxtrx):
"""Test Rfy venetian blind covers."""
entry_data = create_rfx_test_cfg(
devices={
"071a000001020301": {
"signal_repetitions": 1,
"venetian_blind_mode": "Unknown",
},
"071a000001020302": {"signal_repetitions": 1, "venetian_blind_mode": "US"},
"071a000001020303": {"signal_repetitions": 1, "venetian_blind_mode": "EU"},
}
)
mock_entry = MockConfigEntry(domain="rfxtrx", unique_id=DOMAIN, data=entry_data)
mock_entry.add_to_hass(hass)
await hass.config_entries.async_setup(mock_entry.entry_id)
await hass.async_block_till_done()
# Test a blind with no venetian mode setting
state = hass.states.get("cover.rfy_010203_1")
assert state
await hass.services.async_call(
"cover",
"stop_cover",
{"entity_id": "cover.rfy_010203_1"},
blocking=True,
)
await hass.services.async_call(
"cover",
"open_cover",
{"entity_id": "cover.rfy_010203_1"},
blocking=True,
)
await hass.services.async_call(
"cover",
"close_cover",
{"entity_id": "cover.rfy_010203_1"},
blocking=True,
)
await hass.services.async_call(
"cover",
"open_cover_tilt",
{"entity_id": "cover.rfy_010203_1"},
blocking=True,
)
await hass.services.async_call(
"cover",
"close_cover_tilt",
{"entity_id": "cover.rfy_010203_1"},
blocking=True,
)
assert rfxtrx.transport.send.mock_calls == [
call(bytearray(b"\x08\x1a\x00\x00\x01\x02\x03\x01\x00")),
call(bytearray(b"\x08\x1a\x00\x01\x01\x02\x03\x01\x01")),
call(bytearray(b"\x08\x1a\x00\x02\x01\x02\x03\x01\x03")),
]
# Test a blind with venetian mode set to US
state = hass.states.get("cover.rfy_010203_2")
assert state
rfxtrx.transport.send.mock_calls = []
await hass.services.async_call(
"cover",
"stop_cover",
{"entity_id": "cover.rfy_010203_2"},
blocking=True,
)
await hass.services.async_call(
"cover",
"open_cover",
{"entity_id": "cover.rfy_010203_2"},
blocking=True,
)
await hass.services.async_call(
"cover",
"close_cover",
{"entity_id": "cover.rfy_010203_2"},
blocking=True,
)
await hass.services.async_call(
"cover",
"open_cover_tilt",
{"entity_id": "cover.rfy_010203_2"},
blocking=True,
)
await hass.services.async_call(
"cover",
"close_cover_tilt",
{"entity_id": "cover.rfy_010203_2"},
blocking=True,
)
assert rfxtrx.transport.send.mock_calls == [
call(bytearray(b"\x08\x1a\x00\x00\x01\x02\x03\x02\x00")),
call(bytearray(b"\x08\x1a\x00\x01\x01\x02\x03\x02\x0F")),
call(bytearray(b"\x08\x1a\x00\x02\1x01\x02\x03\x02\x10")),
call(bytearray(b"\x08\x1a\x00\x03\x01\x02\x03\x02\x11")),
call(bytearray(b"\x08\x1a\x00\x04\1x01\x02\x03\x02\x12")),
]
# Test a blind with venetian mode set to EU
state = hass.states.get("cover.rfy_010203_2")
assert state
rfxtrx.transport.send.mock_calls = []
await hass.services.async_call(
"cover",
"stop_cover",
{"entity_id": "cover.rfy_010203_3"},
blocking=True,
)
await hass.services.async_call(
"cover",
"open_cover",
{"entity_id": "cover.rfy_010203_3"},
blocking=True,
)
await hass.services.async_call(
"cover",
"close_cover",
{"entity_id": "cover.rfy_010203_3"},
blocking=True,
)
await hass.services.async_call(
"cover",
"open_cover_tilt",
{"entity_id": "cover.rfy_010203_3"},
blocking=True,
)
await hass.services.async_call(
"cover",
"close_cover_tilt",
{"entity_id": "cover.rfy_010203_3"},
blocking=True,
)
assert rfxtrx.transport.send.mock_calls == [
call(bytearray(b"\x08\x1a\x00\x00\x01\x02\x03\x03\x00")),
call(bytearray(b"\x08\x1a\x00\x01\x01\x02\x03\x03\x11")),
call(bytearray(b"\x08\x1a\x00\x02\1x01\x02\x03\x03\x12")),
call(bytearray(b"\x08\x1a\x00\x03\x01\x02\x03\x03\x0F")),
call(bytearray(b"\x08\x1a\x00\x04\1x01\x02\x03\x03\x10")),
]