Enable B009 (#144192)

This commit is contained in:
Joost Lekkerkerker
2025-05-21 17:37:51 +02:00
committed by GitHub
parent 1dbe1955eb
commit 4cd3527761
21 changed files with 40 additions and 35 deletions

View File

@ -572,7 +572,7 @@ def device_class_and_uom(
) -> tuple[SensorDeviceClass | None, str | None]:
"""Get native unit of measurement from telegram,."""
dsmr_object = getattr(data, entity_description.obis_reference)
uom: str | None = getattr(dsmr_object, "unit") or None
uom: str | None = dsmr_object.unit or None
with suppress(ValueError):
if entity_description.device_class == SensorDeviceClass.GAS and (
enery_uom := UnitOfEnergy(str(uom))

View File

@ -839,9 +839,9 @@ class MQTT:
"""Return a string with the exception message."""
# if msg_callback is a partial we return the name of the first argument
if isinstance(msg_callback, partial):
call_back_name = getattr(msg_callback.args[0], "__name__")
call_back_name = msg_callback.args[0].__name__
else:
call_back_name = getattr(msg_callback, "__name__")
call_back_name = msg_callback.__name__
return (
f"Exception in {call_back_name} when handling msg on "
f"'{msg.topic}': '{msg.payload}'" # type: ignore[str-bytes-safe]
@ -1109,7 +1109,7 @@ class MQTT:
# decoding the same topic multiple times.
topic = msg.topic
except UnicodeDecodeError:
bare_topic: bytes = getattr(msg, "_topic")
bare_topic: bytes = msg._topic # noqa: SLF001
_LOGGER.warning(
"Skipping received%s message on invalid topic %s (qos=%s): %s",
" retained" if msg.retain else "",

View File

@ -49,7 +49,7 @@ async def async_get_config_entry_diagnostics(
),
"data": {
ACCOUNT: async_redact_data(
getattr(data_handler.account, "raw_data"),
data_handler.account.raw_data,
TO_REDACT,
)
},

View File

@ -178,7 +178,8 @@ class NetatmoWeatherModuleEntity(NetatmoModuleEntity):
def __init__(self, device: NetatmoDevice) -> None:
"""Set up a Netatmo weather module entity."""
super().__init__(device)
category = getattr(self.device.device_category, "name")
assert self.device.device_category
category = self.device.device_category.name
self._publishers.extend(
[
{
@ -189,7 +190,7 @@ class NetatmoWeatherModuleEntity(NetatmoModuleEntity):
)
if hasattr(self.device, "place"):
place = cast(Place, getattr(self.device, "place"))
place = cast(Place, self.device.place)
if hasattr(place, "location") and place.location is not None:
self._attr_extra_state_attributes.update(
{

View File

@ -72,7 +72,9 @@ class NetatmoScheduleSelect(NetatmoBaseEntity, SelectEntity):
self._attr_unique_id = f"{self.home.entity_id}-schedule-select"
self._attr_current_option = getattr(self.home.get_selected_schedule(), "name")
schedule = self.home.get_selected_schedule()
assert schedule
self._attr_current_option = schedule.name
self._attr_options = [
schedule.name for schedule in self.home.schedules.values() if schedule.name
]
@ -98,12 +100,11 @@ class NetatmoScheduleSelect(NetatmoBaseEntity, SelectEntity):
return
if data["event_type"] == EVENT_TYPE_SCHEDULE and "schedule_id" in data:
self._attr_current_option = getattr(
self._attr_current_option = (
self.hass.data[DOMAIN][DATA_SCHEDULES][self.home.entity_id].get(
data["schedule_id"]
),
"name",
)
)
).name
self.async_write_ha_state()
async def async_select_option(self, option: str) -> None:
@ -125,7 +126,9 @@ class NetatmoScheduleSelect(NetatmoBaseEntity, SelectEntity):
@callback
def async_update_callback(self) -> None:
"""Update the entity's state."""
self._attr_current_option = getattr(self.home.get_selected_schedule(), "name")
schedule = self.home.get_selected_schedule()
assert schedule
self._attr_current_option = schedule.name
self.hass.data[DOMAIN][DATA_SCHEDULES][self.home.entity_id] = (
self.home.schedules
)

View File

@ -256,7 +256,7 @@ async def async_setup_entry( # noqa: C901
"""Log all scheduled in the event loop."""
with _increase_repr_limit():
handle: asyncio.Handle
for handle in getattr(hass.loop, "_scheduled"):
for handle in getattr(hass.loop, "_scheduled"): # noqa: B009
if not handle.cancelled():
_LOGGER.critical("Scheduled: %s", handle)

View File

@ -260,11 +260,12 @@ class Scanner:
for source_ip in await async_build_source_set(self.hass):
source_ip_str = str(source_ip)
if source_ip.version == 6:
assert source_ip.scope_id is not None
source_tuple: AddressTupleVXType = (
source_ip_str,
0,
0,
int(getattr(source_ip, "scope_id")),
int(source_ip.scope_id),
)
else:
source_tuple = (source_ip_str, 0)

View File

@ -170,11 +170,12 @@ class Server:
for source_ip in await async_build_source_set(self.hass):
source_ip_str = str(source_ip)
if source_ip.version == 6:
assert source_ip.scope_id is not None
source_tuple: AddressTupleVXType = (
source_ip_str,
0,
0,
int(getattr(source_ip, "scope_id")),
int(source_ip.scope_id),
)
else:
source_tuple = (source_ip_str, 0)

View File

@ -378,7 +378,7 @@ def _get_annotation(item: Any) -> tuple[str, int | str] | None:
if not hasattr(item, "__config_file__"):
return None
return (getattr(item, "__config_file__"), getattr(item, "__line__", "?"))
return (item.__config_file__, getattr(item, "__line__", "?"))
def _get_by_path(data: dict | list, items: list[Hashable]) -> Any:

View File

@ -452,7 +452,7 @@ class HomeAssistant:
self.import_executor = InterruptibleThreadPoolExecutor(
max_workers=1, thread_name_prefix="ImportExecutor"
)
self.loop_thread_id = getattr(self.loop, "_thread_id")
self.loop_thread_id = self.loop._thread_id # type: ignore[attr-defined] # noqa: SLF001
def verify_event_loop_thread(self, what: str) -> None:
"""Report and raise if we are not running in the event loop thread."""

View File

@ -381,7 +381,7 @@ class CachedProperties(type):
for parent in cls.__mro__[:0:-1]:
if "_CachedProperties__cached_properties" not in parent.__dict__:
continue
cached_properties = getattr(parent, "_CachedProperties__cached_properties")
cached_properties = getattr(parent, "_CachedProperties__cached_properties") # noqa: B009
for property_name in cached_properties:
if property_name in seen_props:
continue

View File

@ -160,7 +160,7 @@ class Throttle:
If we cannot acquire the lock, it is running so return None.
"""
if hasattr(method, "__self__"):
host = getattr(method, "__self__")
host = method.__self__
elif is_func:
host = wrapper
else:

View File

@ -707,6 +707,7 @@ select = [
"B002", # Python does not support the unary prefix increment
"B005", # Using .strip() with multi-character strings is misleading
"B007", # Loop control variable {name} not used within loop body
"B009", # Do not call getattr with a constant attribute value. It is not any safer than normal property access.
"B014", # Exception handler with duplicate exception
"B015", # Pointless comparison. Did you mean to assign a value? Otherwise, prepend assert or remove it.
"B017", # pytest.raises(BaseException) should be considered evil

View File

@ -60,7 +60,7 @@ async def test_numbers_implementation(
blocking=True,
)
mocked_method = getattr(mock_flexit_bacnet, "set_fan_setpoint_supply_air_fire")
mocked_method = mock_flexit_bacnet.set_fan_setpoint_supply_air_fire
assert len(mocked_method.mock_calls) == 1
assert hass.states.get(ENTITY_ID).state == "60"
@ -76,7 +76,7 @@ async def test_numbers_implementation(
blocking=True,
)
mocked_method = getattr(mock_flexit_bacnet, "set_fan_setpoint_supply_air_fire")
mocked_method = mock_flexit_bacnet.set_fan_setpoint_supply_air_fire
assert len(mocked_method.mock_calls) == 2
assert hass.states.get(ENTITY_ID).state == "40"
@ -94,7 +94,7 @@ async def test_numbers_implementation(
blocking=True,
)
mocked_method = getattr(mock_flexit_bacnet, "set_fan_setpoint_supply_air_fire")
mocked_method = mock_flexit_bacnet.set_fan_setpoint_supply_air_fire
assert len(mocked_method.mock_calls) == 3
mock_flexit_bacnet.set_fan_setpoint_supply_air_fire.side_effect = None

View File

@ -59,7 +59,7 @@ async def test_switches_implementation(
blocking=True,
)
mocked_method = getattr(mock_flexit_bacnet, "disable_electric_heater")
mocked_method = mock_flexit_bacnet.disable_electric_heater
assert len(mocked_method.mock_calls) == 1
assert hass.states.get(ENTITY_ID).state == STATE_OFF
@ -73,7 +73,7 @@ async def test_switches_implementation(
blocking=True,
)
mocked_method = getattr(mock_flexit_bacnet, "enable_electric_heater")
mocked_method = mock_flexit_bacnet.enable_electric_heater
assert len(mocked_method.mock_calls) == 1
assert hass.states.get(ENTITY_ID).state == STATE_ON
@ -88,7 +88,7 @@ async def test_switches_implementation(
blocking=True,
)
mocked_method = getattr(mock_flexit_bacnet, "disable_electric_heater")
mocked_method = mock_flexit_bacnet.disable_electric_heater
assert len(mocked_method.mock_calls) == 2
mock_flexit_bacnet.disable_electric_heater.side_effect = None
@ -114,7 +114,7 @@ async def test_switches_implementation(
blocking=True,
)
mocked_method = getattr(mock_flexit_bacnet, "enable_electric_heater")
mocked_method = mock_flexit_bacnet.enable_electric_heater
assert len(mocked_method.mock_calls) == 2
mock_flexit_bacnet.enable_electric_heater.side_effect = None

View File

@ -68,9 +68,7 @@ async def test_button_states_and_commands(
await hass.async_block_till_done()
state = hass.states.get(entity_id)
assert state.state == "2023-06-05T00:16:00+00:00"
getattr(mock_automower_client.commands, "error_confirm").side_effect = ApiError(
"Test error"
)
mock_automower_client.commands.error_confirm.side_effect = ApiError("Test error")
with pytest.raises(
HomeAssistantError,
match="Failed to send command: Test error",

View File

@ -37,7 +37,7 @@ async def test_unload_entry(hass: HomeAssistant, mock_account: MagicMock) -> Non
{ATTR_ENTITY_ID: VACUUM_ENTITY_ID},
blocking=True,
)
getattr(mock_account.robots[0], "start_cleaning").assert_called_once()
mock_account.robots[0].start_cleaning.assert_called_once()
assert await hass.config_entries.async_unload(entry.entry_id)

View File

@ -241,7 +241,7 @@ async def test_websocket_resolve_media(
# Validate url is relative and signed.
assert msg["result"]["url"][0] == "/"
parsed = yarl.URL(msg["result"]["url"])
assert parsed.path == getattr(media, "url")
assert parsed.path == media.url
assert "authSig" in parsed.query
with patch(

View File

@ -52,4 +52,4 @@ async def test_entity_update(
{ATTR_ENTITY_ID: f"{platform.name.lower()}.{name}_{entity}"},
blocking=True,
)
getattr(mock_motion_device, "status_query").assert_called_once_with()
mock_motion_device.status_query.assert_called_once_with()

View File

@ -3605,7 +3605,7 @@ async def test_track_time_interval_name(hass: HomeAssistant) -> None:
timedelta(seconds=10),
name=unique_string,
)
scheduled = getattr(hass.loop, "_scheduled")
scheduled = hass.loop._scheduled
assert any(handle for handle in scheduled if unique_string in str(handle))
unsub()

View File

@ -832,7 +832,7 @@ async def test_configuration_legacy_template_is_removed(hass: HomeAssistant) ->
},
)
assert not getattr(hass.config, "legacy_templates")
assert not hass.config.legacy_templates
async def test_config_defaults() -> None: