Upgrade paho-mqtt API to v2 (#137613)

* Upgrade paho-mqtt API to v2

* Refactor on_connect callback

* Add tests

* Fix Tasmota tests
This commit is contained in:
Jan Bouwhuis
2025-02-13 22:13:19 +01:00
committed by GitHub
parent bbbad90ca2
commit d6b7762dd6
9 changed files with 171 additions and 93 deletions

View File

@@ -311,8 +311,8 @@ class MqttClientSetup:
client_id = None
transport: str = config.get(CONF_TRANSPORT, DEFAULT_TRANSPORT)
self._client = AsyncMQTTClient(
mqtt.CallbackAPIVersion.VERSION1,
client_id,
callback_api_version=mqtt.CallbackAPIVersion.VERSION2,
client_id=client_id,
protocol=proto,
transport=transport, # type: ignore[arg-type]
reconnect_on_failure=False,
@@ -476,9 +476,9 @@ class MQTT:
mqttc.on_connect = self._async_mqtt_on_connect
mqttc.on_disconnect = self._async_mqtt_on_disconnect
mqttc.on_message = self._async_mqtt_on_message
mqttc.on_publish = self._async_mqtt_on_callback
mqttc.on_subscribe = self._async_mqtt_on_callback
mqttc.on_unsubscribe = self._async_mqtt_on_callback
mqttc.on_publish = self._async_mqtt_on_publish
mqttc.on_subscribe = self._async_mqtt_on_subscribe_unsubscribe
mqttc.on_unsubscribe = self._async_mqtt_on_subscribe_unsubscribe
# suppress exceptions at callback
mqttc.suppress_exceptions = True
@@ -498,7 +498,7 @@ class MQTT:
def _async_reader_callback(self, client: mqtt.Client) -> None:
"""Handle reading data from the socket."""
if (status := client.loop_read(MAX_PACKETS_TO_READ)) != 0:
self._async_on_disconnect(status)
self._async_handle_callback_exception(status)
@callback
def _async_start_misc_periodic(self) -> None:
@@ -593,7 +593,7 @@ class MQTT:
def _async_writer_callback(self, client: mqtt.Client) -> None:
"""Handle writing data to the socket."""
if (status := client.loop_write()) != 0:
self._async_on_disconnect(status)
self._async_handle_callback_exception(status)
def _on_socket_register_write(
self, client: mqtt.Client, userdata: Any, sock: SocketType
@@ -983,9 +983,9 @@ class MQTT:
self,
_mqttc: mqtt.Client,
_userdata: None,
_flags: dict[str, int],
result_code: int,
properties: mqtt.Properties | None = None,
_connect_flags: mqtt.ConnectFlags,
reason_code: mqtt.ReasonCode,
_properties: mqtt.Properties | None = None,
) -> None:
"""On connect callback.
@@ -993,19 +993,20 @@ class MQTT:
message.
"""
# pylint: disable-next=import-outside-toplevel
import paho.mqtt.client as mqtt
if result_code != mqtt.CONNACK_ACCEPTED:
if result_code in (
mqtt.CONNACK_REFUSED_BAD_USERNAME_PASSWORD,
mqtt.CONNACK_REFUSED_NOT_AUTHORIZED,
):
if reason_code.is_failure:
# 24: Continue authentication
# 25: Re-authenticate
# 134: Bad user name or password
# 135: Not authorized
# 140: Bad authentication method
if reason_code.value in (24, 25, 134, 135, 140):
self._should_reconnect = False
self.hass.async_create_task(self.async_disconnect())
self.config_entry.async_start_reauth(self.hass)
_LOGGER.error(
"Unable to connect to the MQTT broker: %s",
mqtt.connack_string(result_code),
reason_code.getName(), # type: ignore[no-untyped-call]
)
self._async_connection_result(False)
return
@@ -1016,7 +1017,7 @@ class MQTT:
"Connected to MQTT server %s:%s (%s)",
self.conf[CONF_BROKER],
self.conf.get(CONF_PORT, DEFAULT_PORT),
result_code,
reason_code,
)
birth: dict[str, Any]
@@ -1153,18 +1154,32 @@ class MQTT:
self._mqtt_data.state_write_requests.process_write_state_requests(msg)
@callback
def _async_mqtt_on_callback(
def _async_mqtt_on_publish(
self,
_mqttc: mqtt.Client,
_userdata: None,
mid: int,
_granted_qos_reason: tuple[int, ...] | mqtt.ReasonCodes | None = None,
_properties_reason: mqtt.ReasonCodes | None = None,
_reason_code: mqtt.ReasonCode,
_properties: mqtt.Properties | None,
) -> None:
"""Publish callback."""
self._async_mqtt_on_callback(mid)
@callback
def _async_mqtt_on_subscribe_unsubscribe(
self,
_mqttc: mqtt.Client,
_userdata: None,
mid: int,
_reason_code: list[mqtt.ReasonCode],
_properties: mqtt.Properties | None,
) -> None:
"""Subscribe / Unsubscribe callback."""
self._async_mqtt_on_callback(mid)
@callback
def _async_mqtt_on_callback(self, mid: int) -> None:
"""Publish / Subscribe / Unsubscribe callback."""
# The callback signature for on_unsubscribe is different from on_subscribe
# see https://github.com/eclipse/paho.mqtt.python/issues/687
# properties and reason codes are not used in Home Assistant
future = self._async_get_mid_future(mid)
if future.done() and (future.cancelled() or future.exception()):
# Timed out or cancelled
@@ -1180,19 +1195,28 @@ class MQTT:
self._pending_operations[mid] = future
return future
@callback
def _async_handle_callback_exception(self, status: mqtt.MQTTErrorCode) -> None:
"""Handle a callback exception."""
# We don't import on the top because some integrations
# should be able to optionally rely on MQTT.
import paho.mqtt.client as mqtt # pylint: disable=import-outside-toplevel
_LOGGER.warning(
"Error returned from MQTT server: %s",
mqtt.error_string(status),
)
@callback
def _async_mqtt_on_disconnect(
self,
_mqttc: mqtt.Client,
_userdata: None,
result_code: int,
_disconnect_flags: mqtt.DisconnectFlags,
reason_code: mqtt.ReasonCode,
properties: mqtt.Properties | None = None,
) -> None:
"""Disconnected callback."""
self._async_on_disconnect(result_code)
@callback
def _async_on_disconnect(self, result_code: int) -> None:
if not self.connected:
# This function is re-entrant and may be called multiple times
# when there is a broken pipe error.
@@ -1203,11 +1227,11 @@ class MQTT:
self.connected = False
async_dispatcher_send(self.hass, MQTT_CONNECTION_STATE, False)
_LOGGER.log(
logging.INFO if result_code == 0 else logging.DEBUG,
logging.INFO if reason_code == 0 else logging.DEBUG,
"Disconnected from MQTT server %s:%s (%s)",
self.conf[CONF_BROKER],
self.conf.get(CONF_PORT, DEFAULT_PORT),
result_code,
reason_code,
)
@callback