mirror of
https://github.com/home-assistant/core.git
synced 2026-05-03 19:41:15 +02:00
Add support for Telegram message attachments (#153216)
This commit is contained in:
@@ -7,7 +7,7 @@ import io
|
||||
import logging
|
||||
from ssl import SSLContext
|
||||
from types import MappingProxyType
|
||||
from typing import Any
|
||||
from typing import Any, cast
|
||||
|
||||
import httpx
|
||||
from telegram import (
|
||||
@@ -23,6 +23,7 @@ from telegram import (
|
||||
InputMediaVideo,
|
||||
InputPollOption,
|
||||
Message,
|
||||
PhotoSize,
|
||||
ReplyKeyboardMarkup,
|
||||
ReplyKeyboardRemove,
|
||||
Update,
|
||||
@@ -56,6 +57,10 @@ from .const import (
|
||||
ATTR_DISABLE_NOTIF,
|
||||
ATTR_DISABLE_WEB_PREV,
|
||||
ATTR_FILE,
|
||||
ATTR_FILE_ID,
|
||||
ATTR_FILE_MIME_TYPE,
|
||||
ATTR_FILE_NAME,
|
||||
ATTR_FILE_SIZE,
|
||||
ATTR_FROM_FIRST,
|
||||
ATTR_FROM_LAST,
|
||||
ATTR_INLINE_MESSAGE_ID,
|
||||
@@ -86,6 +91,7 @@ from .const import (
|
||||
CONF_CHAT_ID,
|
||||
CONF_PROXY_URL,
|
||||
DOMAIN,
|
||||
EVENT_TELEGRAM_ATTACHMENT,
|
||||
EVENT_TELEGRAM_CALLBACK,
|
||||
EVENT_TELEGRAM_COMMAND,
|
||||
EVENT_TELEGRAM_SENT,
|
||||
@@ -183,6 +189,10 @@ class BaseTelegramBot:
|
||||
# This is a command message - set event type to command and split data into command and args
|
||||
event_type = EVENT_TELEGRAM_COMMAND
|
||||
event_data.update(self._get_command_event_data(message.text))
|
||||
elif filters.ATTACHMENT.filter(message):
|
||||
event_type = EVENT_TELEGRAM_ATTACHMENT
|
||||
event_data[ATTR_TEXT] = message.caption
|
||||
event_data.update(self._get_file_id_event_data(message))
|
||||
else:
|
||||
event_type = EVENT_TELEGRAM_TEXT
|
||||
event_data[ATTR_TEXT] = message.text
|
||||
@@ -192,6 +202,26 @@ class BaseTelegramBot:
|
||||
|
||||
return event_type, event_data
|
||||
|
||||
def _get_file_id_event_data(self, message: Message) -> dict[str, Any]:
|
||||
"""Extract file_id from a message attachment, if any."""
|
||||
if filters.PHOTO.filter(message):
|
||||
photos = cast(Sequence[PhotoSize], message.effective_attachment)
|
||||
return {
|
||||
ATTR_FILE_ID: photos[-1].file_id,
|
||||
ATTR_FILE_MIME_TYPE: "image/jpeg", # telegram always uses jpeg for photos
|
||||
ATTR_FILE_SIZE: photos[-1].file_size,
|
||||
}
|
||||
return {
|
||||
k: getattr(message.effective_attachment, v)
|
||||
for k, v in (
|
||||
(ATTR_FILE_ID, "file_id"),
|
||||
(ATTR_FILE_NAME, "file_name"),
|
||||
(ATTR_FILE_MIME_TYPE, "mime_type"),
|
||||
(ATTR_FILE_SIZE, "file_size"),
|
||||
)
|
||||
if hasattr(message.effective_attachment, v)
|
||||
}
|
||||
|
||||
def _get_user_event_data(self, user: User) -> dict[str, Any]:
|
||||
return {
|
||||
ATTR_USER_ID: user.id,
|
||||
|
||||
@@ -54,6 +54,7 @@ SERVICE_LEAVE_CHAT = "leave_chat"
|
||||
EVENT_TELEGRAM_CALLBACK = "telegram_callback"
|
||||
EVENT_TELEGRAM_COMMAND = "telegram_command"
|
||||
EVENT_TELEGRAM_TEXT = "telegram_text"
|
||||
EVENT_TELEGRAM_ATTACHMENT = "telegram_attachment"
|
||||
EVENT_TELEGRAM_SENT = "telegram_sent"
|
||||
|
||||
PARSER_HTML = "html"
|
||||
@@ -90,6 +91,10 @@ ATTR_DISABLE_NOTIF = "disable_notification"
|
||||
ATTR_DISABLE_WEB_PREV = "disable_web_page_preview"
|
||||
ATTR_EDITED_MSG = "edited_message"
|
||||
ATTR_FILE = "file"
|
||||
ATTR_FILE_ID = "file_id"
|
||||
ATTR_FILE_MIME_TYPE = "file_mime_type"
|
||||
ATTR_FILE_NAME = "file_name"
|
||||
ATTR_FILE_SIZE = "file_size"
|
||||
ATTR_FROM_FIRST = "from_first"
|
||||
ATTR_FROM_LAST = "from_last"
|
||||
ATTR_KEYBOARD = "keyboard"
|
||||
|
||||
@@ -0,0 +1,27 @@
|
||||
{
|
||||
"update_id": 2,
|
||||
"message": {
|
||||
"message_id": 2,
|
||||
"date": 1441645532,
|
||||
"from": {
|
||||
"id": 12345678,
|
||||
"is_bot": false,
|
||||
"last_name": "Test Lastname",
|
||||
"first_name": "Test Firstname",
|
||||
"username": "Testusername"
|
||||
},
|
||||
"chat": {
|
||||
"last_name": "Test Lastname",
|
||||
"id": 1111111,
|
||||
"type": "private",
|
||||
"first_name": "Test Firstname",
|
||||
"username": "Testusername"
|
||||
},
|
||||
"document": {
|
||||
"file_id": "AgACAgUAAxkBAAIBUWJ5aXl6Y3h5bXl6Y3h5bXl6Y3gAAJxvMRtP7nG4Fq7m0m0vBAAMCAAN5AAMjBA",
|
||||
"file_unique_id": "AQADcbzEbT-5xuBa",
|
||||
"mime_type": "application/pdf",
|
||||
"file_size": 123456
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,44 @@
|
||||
{
|
||||
"update_id": 1,
|
||||
"message": {
|
||||
"message_id": 1,
|
||||
"date": 1441645532,
|
||||
"from": {
|
||||
"id": 12345678,
|
||||
"is_bot": false,
|
||||
"last_name": "Test Lastname",
|
||||
"first_name": "Test Firstname",
|
||||
"username": "Testusername"
|
||||
},
|
||||
"chat": {
|
||||
"last_name": "Test Lastname",
|
||||
"id": 1111111,
|
||||
"type": "private",
|
||||
"first_name": "Test Firstname",
|
||||
"username": "Testusername"
|
||||
},
|
||||
"photo": [
|
||||
{
|
||||
"file_id": "AgACAgUAAxkBAAIBUWJ5aXl6Y3h5bXl6Y3h5bXl6Y3gAAJxvMRtP7nG4Fq7m0m0vBAAMCAAN5AAMjBA",
|
||||
"file_unique_id": "AQADcbzEbT-5xuBa",
|
||||
"file_size": 1234,
|
||||
"width": 90,
|
||||
"height": 90
|
||||
},
|
||||
{
|
||||
"file_id": "AgACAgUAAxkBAAIBUWJ5aXl6Y3h5bXl6Y3h5bXl6Y3gAAJxvMRtP7nG4Fq7m0m0vBAAMCAAN5AAMjBA",
|
||||
"file_unique_id": "AQADcbzEbT-5xuBa",
|
||||
"file_size": 12345,
|
||||
"width": 320,
|
||||
"height": 320
|
||||
},
|
||||
{
|
||||
"file_id": "AgACAgUAAxkBAAIBUWJ5aXl6Y3h5bXl6Y3h5bXl6Y3gAAJxvMRtP7nG4Fq7m0m0vBAAMCAAN5AAMjBA",
|
||||
"file_unique_id": "AQADcbzEbT-5xuBa",
|
||||
"file_size": 123456,
|
||||
"width": 800,
|
||||
"height": 800
|
||||
}
|
||||
]
|
||||
}
|
||||
}
|
||||
@@ -91,9 +91,10 @@ from homeassistant.exceptions import (
|
||||
ServiceValidationError,
|
||||
)
|
||||
from homeassistant.setup import async_setup_component
|
||||
from homeassistant.util import json as json_util
|
||||
from homeassistant.util.file import write_utf8_file
|
||||
|
||||
from tests.common import MockConfigEntry, async_capture_events
|
||||
from tests.common import MockConfigEntry, async_capture_events, async_load_fixture
|
||||
from tests.typing import ClientSessionGenerator
|
||||
|
||||
|
||||
@@ -565,6 +566,52 @@ async def test_webhook_endpoint_generates_telegram_callback_event(
|
||||
assert isinstance(events[0].context, Context)
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
("attachment_type"),
|
||||
[
|
||||
("photo"),
|
||||
("document"),
|
||||
],
|
||||
)
|
||||
async def test_webhook_endpoint_generates_telegram_attachment_event(
|
||||
hass: HomeAssistant,
|
||||
webhook_platform: None,
|
||||
hass_client: ClientSessionGenerator,
|
||||
mock_generate_secret_token: str,
|
||||
attachment_type: str,
|
||||
) -> None:
|
||||
"""POST to the configured webhook endpoint and assert fired `telegram_attachment` event for photo and document."""
|
||||
client = await hass_client()
|
||||
events = async_capture_events(hass, "telegram_attachment")
|
||||
update_message_attachment = await async_load_fixture(
|
||||
hass, f"update_message_attachment_{attachment_type}.json", DOMAIN
|
||||
)
|
||||
|
||||
response = await client.post(
|
||||
f"{TELEGRAM_WEBHOOK_URL}_123456",
|
||||
data=update_message_attachment,
|
||||
headers={
|
||||
"X-Telegram-Bot-Api-Secret-Token": mock_generate_secret_token,
|
||||
"Content-Type": "application/json",
|
||||
},
|
||||
)
|
||||
assert response.status == 200
|
||||
assert (await response.read()).decode("utf-8") == ""
|
||||
|
||||
# Make sure event has fired
|
||||
await hass.async_block_till_done()
|
||||
|
||||
assert len(events) == 1
|
||||
loaded = json_util.json_loads(update_message_attachment)
|
||||
if attachment_type == "photo":
|
||||
expected_file_id = loaded["message"]["photo"][-1]["file_id"]
|
||||
else:
|
||||
expected_file_id = loaded["message"][attachment_type]["file_id"]
|
||||
|
||||
assert events[0].data["file_id"] == expected_file_id
|
||||
assert isinstance(events[0].context, Context)
|
||||
|
||||
|
||||
async def test_polling_platform_message_text_update(
|
||||
hass: HomeAssistant,
|
||||
config_polling,
|
||||
|
||||
Reference in New Issue
Block a user