mirror of
https://github.com/home-assistant/core.git
synced 2026-04-29 02:13:44 +02:00
Move agent functionality from http (#153917)
This commit is contained in:
@@ -38,9 +38,11 @@ from home_assistant_intents import (
|
||||
ErrorKey,
|
||||
FuzzyConfig,
|
||||
FuzzyLanguageResponses,
|
||||
LanguageScores,
|
||||
get_fuzzy_config,
|
||||
get_fuzzy_language,
|
||||
get_intents,
|
||||
get_language_scores,
|
||||
get_languages,
|
||||
)
|
||||
import yaml
|
||||
@@ -59,6 +61,7 @@ from homeassistant.core import (
|
||||
)
|
||||
from homeassistant.helpers import (
|
||||
area_registry as ar,
|
||||
config_validation as cv,
|
||||
device_registry as dr,
|
||||
entity_registry as er,
|
||||
floor_registry as fr,
|
||||
@@ -343,6 +346,81 @@ class DefaultAgent(ConversationEntity):
|
||||
|
||||
return result
|
||||
|
||||
async def async_debug_recognize(
|
||||
self, user_input: ConversationInput
|
||||
) -> dict[str, Any] | None:
|
||||
"""Debug recognize from user input."""
|
||||
result_dict: dict[str, Any] | None = None
|
||||
|
||||
if trigger_result := await self.async_recognize_sentence_trigger(user_input):
|
||||
result_dict = {
|
||||
# Matched a user-defined sentence trigger.
|
||||
# We can't provide the response here without executing the
|
||||
# trigger.
|
||||
"match": True,
|
||||
"source": "trigger",
|
||||
"sentence_template": trigger_result.sentence_template or "",
|
||||
}
|
||||
elif intent_result := await self.async_recognize_intent(user_input):
|
||||
successful_match = not intent_result.unmatched_entities
|
||||
result_dict = {
|
||||
# Name of the matching intent (or the closest)
|
||||
"intent": {
|
||||
"name": intent_result.intent.name,
|
||||
},
|
||||
# Slot values that would be received by the intent
|
||||
"slots": { # direct access to values
|
||||
entity_key: entity.text or entity.value
|
||||
for entity_key, entity in intent_result.entities.items()
|
||||
},
|
||||
# Extra slot details, such as the originally matched text
|
||||
"details": {
|
||||
entity_key: {
|
||||
"name": entity.name,
|
||||
"value": entity.value,
|
||||
"text": entity.text,
|
||||
}
|
||||
for entity_key, entity in intent_result.entities.items()
|
||||
},
|
||||
# Entities/areas/etc. that would be targeted
|
||||
"targets": {},
|
||||
# True if match was successful
|
||||
"match": successful_match,
|
||||
# Text of the sentence template that matched (or was closest)
|
||||
"sentence_template": "",
|
||||
# When match is incomplete, this will contain the best slot guesses
|
||||
"unmatched_slots": _get_unmatched_slots(intent_result),
|
||||
# True if match was not exact
|
||||
"fuzzy_match": False,
|
||||
}
|
||||
|
||||
if successful_match:
|
||||
result_dict["targets"] = {
|
||||
state.entity_id: {"matched": is_matched}
|
||||
for state, is_matched in _get_debug_targets(
|
||||
self.hass, intent_result
|
||||
)
|
||||
}
|
||||
|
||||
if intent_result.intent_sentence is not None:
|
||||
result_dict["sentence_template"] = intent_result.intent_sentence.text
|
||||
|
||||
if intent_result.intent_metadata:
|
||||
# Inspect metadata to determine if this matched a custom sentence
|
||||
if intent_result.intent_metadata.get(METADATA_CUSTOM_SENTENCE):
|
||||
result_dict["source"] = "custom"
|
||||
result_dict["file"] = intent_result.intent_metadata.get(
|
||||
METADATA_CUSTOM_FILE
|
||||
)
|
||||
else:
|
||||
result_dict["source"] = "builtin"
|
||||
|
||||
result_dict["fuzzy_match"] = intent_result.intent_metadata.get(
|
||||
METADATA_FUZZY_MATCH, False
|
||||
)
|
||||
|
||||
return result_dict
|
||||
|
||||
async def _async_handle_message(
|
||||
self,
|
||||
user_input: ConversationInput,
|
||||
@@ -1529,6 +1607,10 @@ class DefaultAgent(ConversationEntity):
|
||||
return None
|
||||
return response
|
||||
|
||||
async def async_get_language_scores(self) -> dict[str, LanguageScores]:
|
||||
"""Get support scores per language."""
|
||||
return await self.hass.async_add_executor_job(get_language_scores)
|
||||
|
||||
|
||||
def _make_error_result(
|
||||
language: str,
|
||||
@@ -1725,3 +1807,75 @@ def _collect_list_references(expression: Expression, list_names: set[str]) -> No
|
||||
elif isinstance(expression, ListReference):
|
||||
# {list}
|
||||
list_names.add(expression.slot_name)
|
||||
|
||||
|
||||
def _get_debug_targets(
|
||||
hass: HomeAssistant,
|
||||
result: RecognizeResult,
|
||||
) -> Iterable[tuple[State, bool]]:
|
||||
"""Yield state/is_matched pairs for a hassil recognition."""
|
||||
entities = result.entities
|
||||
|
||||
name: str | None = None
|
||||
area_name: str | None = None
|
||||
domains: set[str] | None = None
|
||||
device_classes: set[str] | None = None
|
||||
state_names: set[str] | None = None
|
||||
|
||||
if "name" in entities:
|
||||
name = str(entities["name"].value)
|
||||
|
||||
if "area" in entities:
|
||||
area_name = str(entities["area"].value)
|
||||
|
||||
if "domain" in entities:
|
||||
domains = set(cv.ensure_list(entities["domain"].value))
|
||||
|
||||
if "device_class" in entities:
|
||||
device_classes = set(cv.ensure_list(entities["device_class"].value))
|
||||
|
||||
if "state" in entities:
|
||||
# HassGetState only
|
||||
state_names = set(cv.ensure_list(entities["state"].value))
|
||||
|
||||
if (
|
||||
(name is None)
|
||||
and (area_name is None)
|
||||
and (not domains)
|
||||
and (not device_classes)
|
||||
and (not state_names)
|
||||
):
|
||||
# Avoid "matching" all entities when there is no filter
|
||||
return
|
||||
|
||||
states = intent.async_match_states(
|
||||
hass,
|
||||
name=name,
|
||||
area_name=area_name,
|
||||
domains=domains,
|
||||
device_classes=device_classes,
|
||||
)
|
||||
|
||||
for state in states:
|
||||
# For queries, a target is "matched" based on its state
|
||||
is_matched = (state_names is None) or (state.state in state_names)
|
||||
yield state, is_matched
|
||||
|
||||
|
||||
def _get_unmatched_slots(
|
||||
result: RecognizeResult,
|
||||
) -> dict[str, str | int | float]:
|
||||
"""Return a dict of unmatched text/range slot entities."""
|
||||
unmatched_slots: dict[str, str | int | float] = {}
|
||||
for entity in result.unmatched_entities_list:
|
||||
if isinstance(entity, UnmatchedTextEntity):
|
||||
if entity.text == MISSING_ENTITY:
|
||||
# Don't report <missing> since these are just missing context
|
||||
# slots.
|
||||
continue
|
||||
|
||||
unmatched_slots[entity.name] = entity.text
|
||||
elif isinstance(entity, UnmatchedRangeEntity):
|
||||
unmatched_slots[entity.name] = entity.value
|
||||
|
||||
return unmatched_slots
|
||||
|
||||
@@ -2,21 +2,16 @@
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from collections.abc import Iterable
|
||||
from dataclasses import asdict
|
||||
from typing import Any
|
||||
|
||||
from aiohttp import web
|
||||
from hassil.recognize import MISSING_ENTITY, RecognizeResult
|
||||
from hassil.string_matcher import UnmatchedRangeEntity, UnmatchedTextEntity
|
||||
from home_assistant_intents import get_language_scores
|
||||
import voluptuous as vol
|
||||
|
||||
from homeassistant.components import http, websocket_api
|
||||
from homeassistant.components.http.data_validator import RequestDataValidator
|
||||
from homeassistant.const import MATCH_ALL
|
||||
from homeassistant.core import HomeAssistant, State, callback
|
||||
from homeassistant.helpers import config_validation as cv, intent
|
||||
from homeassistant.core import HomeAssistant, callback
|
||||
from homeassistant.util import language as language_util
|
||||
|
||||
from .agent_manager import (
|
||||
@@ -26,11 +21,6 @@ from .agent_manager import (
|
||||
get_agent_manager,
|
||||
)
|
||||
from .const import DATA_COMPONENT
|
||||
from .default_agent import (
|
||||
METADATA_CUSTOM_FILE,
|
||||
METADATA_CUSTOM_SENTENCE,
|
||||
METADATA_FUZZY_MATCH,
|
||||
)
|
||||
from .entity import ConversationEntity
|
||||
from .models import ConversationInput
|
||||
|
||||
@@ -206,150 +196,12 @@ async def websocket_hass_agent_debug(
|
||||
language=msg.get("language", hass.config.language),
|
||||
agent_id=agent.entity_id,
|
||||
)
|
||||
result_dict: dict[str, Any] | None = None
|
||||
|
||||
if trigger_result := await agent.async_recognize_sentence_trigger(user_input):
|
||||
result_dict = {
|
||||
# Matched a user-defined sentence trigger.
|
||||
# We can't provide the response here without executing the
|
||||
# trigger.
|
||||
"match": True,
|
||||
"source": "trigger",
|
||||
"sentence_template": trigger_result.sentence_template or "",
|
||||
}
|
||||
elif intent_result := await agent.async_recognize_intent(user_input):
|
||||
successful_match = not intent_result.unmatched_entities
|
||||
result_dict = {
|
||||
# Name of the matching intent (or the closest)
|
||||
"intent": {
|
||||
"name": intent_result.intent.name,
|
||||
},
|
||||
# Slot values that would be received by the intent
|
||||
"slots": { # direct access to values
|
||||
entity_key: entity.text or entity.value
|
||||
for entity_key, entity in intent_result.entities.items()
|
||||
},
|
||||
# Extra slot details, such as the originally matched text
|
||||
"details": {
|
||||
entity_key: {
|
||||
"name": entity.name,
|
||||
"value": entity.value,
|
||||
"text": entity.text,
|
||||
}
|
||||
for entity_key, entity in intent_result.entities.items()
|
||||
},
|
||||
# Entities/areas/etc. that would be targeted
|
||||
"targets": {},
|
||||
# True if match was successful
|
||||
"match": successful_match,
|
||||
# Text of the sentence template that matched (or was closest)
|
||||
"sentence_template": "",
|
||||
# When match is incomplete, this will contain the best slot guesses
|
||||
"unmatched_slots": _get_unmatched_slots(intent_result),
|
||||
# True if match was not exact
|
||||
"fuzzy_match": False,
|
||||
}
|
||||
|
||||
if successful_match:
|
||||
result_dict["targets"] = {
|
||||
state.entity_id: {"matched": is_matched}
|
||||
for state, is_matched in _get_debug_targets(hass, intent_result)
|
||||
}
|
||||
|
||||
if intent_result.intent_sentence is not None:
|
||||
result_dict["sentence_template"] = intent_result.intent_sentence.text
|
||||
|
||||
if intent_result.intent_metadata:
|
||||
# Inspect metadata to determine if this matched a custom sentence
|
||||
if intent_result.intent_metadata.get(METADATA_CUSTOM_SENTENCE):
|
||||
result_dict["source"] = "custom"
|
||||
result_dict["file"] = intent_result.intent_metadata.get(
|
||||
METADATA_CUSTOM_FILE
|
||||
)
|
||||
else:
|
||||
result_dict["source"] = "builtin"
|
||||
|
||||
result_dict["fuzzy_match"] = intent_result.intent_metadata.get(
|
||||
METADATA_FUZZY_MATCH, False
|
||||
)
|
||||
|
||||
result_dict = await agent.async_debug_recognize(user_input)
|
||||
result_dicts.append(result_dict)
|
||||
|
||||
connection.send_result(msg["id"], {"results": result_dicts})
|
||||
|
||||
|
||||
def _get_debug_targets(
|
||||
hass: HomeAssistant,
|
||||
result: RecognizeResult,
|
||||
) -> Iterable[tuple[State, bool]]:
|
||||
"""Yield state/is_matched pairs for a hassil recognition."""
|
||||
entities = result.entities
|
||||
|
||||
name: str | None = None
|
||||
area_name: str | None = None
|
||||
domains: set[str] | None = None
|
||||
device_classes: set[str] | None = None
|
||||
state_names: set[str] | None = None
|
||||
|
||||
if "name" in entities:
|
||||
name = str(entities["name"].value)
|
||||
|
||||
if "area" in entities:
|
||||
area_name = str(entities["area"].value)
|
||||
|
||||
if "domain" in entities:
|
||||
domains = set(cv.ensure_list(entities["domain"].value))
|
||||
|
||||
if "device_class" in entities:
|
||||
device_classes = set(cv.ensure_list(entities["device_class"].value))
|
||||
|
||||
if "state" in entities:
|
||||
# HassGetState only
|
||||
state_names = set(cv.ensure_list(entities["state"].value))
|
||||
|
||||
if (
|
||||
(name is None)
|
||||
and (area_name is None)
|
||||
and (not domains)
|
||||
and (not device_classes)
|
||||
and (not state_names)
|
||||
):
|
||||
# Avoid "matching" all entities when there is no filter
|
||||
return
|
||||
|
||||
states = intent.async_match_states(
|
||||
hass,
|
||||
name=name,
|
||||
area_name=area_name,
|
||||
domains=domains,
|
||||
device_classes=device_classes,
|
||||
)
|
||||
|
||||
for state in states:
|
||||
# For queries, a target is "matched" based on its state
|
||||
is_matched = (state_names is None) or (state.state in state_names)
|
||||
yield state, is_matched
|
||||
|
||||
|
||||
def _get_unmatched_slots(
|
||||
result: RecognizeResult,
|
||||
) -> dict[str, str | int | float]:
|
||||
"""Return a dict of unmatched text/range slot entities."""
|
||||
unmatched_slots: dict[str, str | int | float] = {}
|
||||
for entity in result.unmatched_entities_list:
|
||||
if isinstance(entity, UnmatchedTextEntity):
|
||||
if entity.text == MISSING_ENTITY:
|
||||
# Don't report <missing> since these are just missing context
|
||||
# slots.
|
||||
continue
|
||||
|
||||
unmatched_slots[entity.name] = entity.text
|
||||
elif isinstance(entity, UnmatchedRangeEntity):
|
||||
unmatched_slots[entity.name] = entity.value
|
||||
|
||||
return unmatched_slots
|
||||
|
||||
|
||||
@websocket_api.websocket_command(
|
||||
{
|
||||
vol.Required("type"): "conversation/agent/homeassistant/language_scores",
|
||||
@@ -364,10 +216,13 @@ async def websocket_hass_agent_language_scores(
|
||||
msg: dict[str, Any],
|
||||
) -> None:
|
||||
"""Get support scores per language."""
|
||||
agent = get_agent_manager(hass).default_agent
|
||||
assert agent is not None
|
||||
|
||||
language = msg.get("language", hass.config.language)
|
||||
country = msg.get("country", hass.config.country)
|
||||
|
||||
scores = await hass.async_add_executor_job(get_language_scores)
|
||||
scores = await agent.async_get_language_scores()
|
||||
matching_langs = language_util.matches(language, scores.keys(), country=country)
|
||||
preferred_lang = matching_langs[0] if matching_langs else language
|
||||
result = {
|
||||
|
||||
Reference in New Issue
Block a user