mirror of
https://github.com/home-assistant/core.git
synced 2026-01-21 15:06:59 +01:00
Compare commits
9 Commits
add_trigge
...
ssl_contex
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
b6bb157141 | ||
|
|
d589b9eb8d | ||
|
|
256d47775b | ||
|
|
c9eae821e8 | ||
|
|
8c02268638 | ||
|
|
8436676e67 | ||
|
|
df10ffd508 | ||
|
|
02218fab7b | ||
|
|
a7cfac2618 |
@@ -28,6 +28,7 @@ from homeassistant.helpers import device_registry as dr
|
||||
from homeassistant.helpers.dispatcher import async_dispatcher_send
|
||||
from homeassistant.helpers.event import async_track_time_interval
|
||||
from homeassistant.helpers.httpx_client import get_async_client
|
||||
from homeassistant.util.ssl import SSL_ALPN_HTTP11_HTTP2
|
||||
|
||||
from .const import DOMAIN, UPDATE_INTERVAL
|
||||
from .entity import AqualinkEntity
|
||||
@@ -66,7 +67,11 @@ async def async_setup_entry(hass: HomeAssistant, entry: AqualinkConfigEntry) ->
|
||||
username = entry.data[CONF_USERNAME]
|
||||
password = entry.data[CONF_PASSWORD]
|
||||
|
||||
aqualink = AqualinkClient(username, password, httpx_client=get_async_client(hass))
|
||||
aqualink = AqualinkClient(
|
||||
username,
|
||||
password,
|
||||
httpx_client=get_async_client(hass, alpn_protocols=SSL_ALPN_HTTP11_HTTP2),
|
||||
)
|
||||
try:
|
||||
await aqualink.login()
|
||||
except AqualinkServiceException as login_exception:
|
||||
|
||||
@@ -15,6 +15,7 @@ import voluptuous as vol
|
||||
from homeassistant.config_entries import ConfigFlow, ConfigFlowResult
|
||||
from homeassistant.const import CONF_PASSWORD, CONF_USERNAME
|
||||
from homeassistant.helpers.httpx_client import get_async_client
|
||||
from homeassistant.util.ssl import SSL_ALPN_HTTP11_HTTP2
|
||||
|
||||
from .const import DOMAIN
|
||||
|
||||
@@ -36,7 +37,11 @@ class AqualinkFlowHandler(ConfigFlow, domain=DOMAIN):
|
||||
|
||||
try:
|
||||
async with AqualinkClient(
|
||||
username, password, httpx_client=get_async_client(self.hass)
|
||||
username,
|
||||
password,
|
||||
httpx_client=get_async_client(
|
||||
self.hass, alpn_protocols=SSL_ALPN_HTTP11_HTTP2
|
||||
),
|
||||
):
|
||||
pass
|
||||
except AqualinkServiceUnauthorizedException:
|
||||
|
||||
@@ -370,9 +370,13 @@ def _async_get_connector(
|
||||
return connectors[connector_key]
|
||||
|
||||
if verify_ssl:
|
||||
ssl_context: SSLContext = ssl_util.client_context(ssl_cipher)
|
||||
ssl_context: SSLContext = ssl_util.client_context(
|
||||
ssl_cipher, ssl_util.SSL_ALPN_HTTP11
|
||||
)
|
||||
else:
|
||||
ssl_context = ssl_util.client_context_no_verify(ssl_cipher)
|
||||
ssl_context = ssl_util.client_context_no_verify(
|
||||
ssl_cipher, ssl_util.SSL_ALPN_HTTP11
|
||||
)
|
||||
|
||||
connector = HomeAssistantTCPConnector(
|
||||
family=family,
|
||||
|
||||
@@ -17,6 +17,9 @@ from homeassistant.core import Event, HomeAssistant, callback
|
||||
from homeassistant.loader import bind_hass
|
||||
from homeassistant.util.hass_dict import HassKey
|
||||
from homeassistant.util.ssl import (
|
||||
SSL_ALPN_HTTP11,
|
||||
SSL_ALPN_HTTP11_HTTP2,
|
||||
SSLALPNProtocols,
|
||||
SSLCipherList,
|
||||
client_context,
|
||||
create_no_verify_ssl_context,
|
||||
@@ -28,9 +31,9 @@ from .frame import warn_use
|
||||
# and we want to keep the connection open for a while so we
|
||||
# don't have to reconnect every time so we use 15s to match aiohttp.
|
||||
KEEP_ALIVE_TIMEOUT = 15
|
||||
DATA_ASYNC_CLIENT: HassKey[httpx.AsyncClient] = HassKey("httpx_async_client")
|
||||
DATA_ASYNC_CLIENT_NOVERIFY: HassKey[httpx.AsyncClient] = HassKey(
|
||||
"httpx_async_client_noverify"
|
||||
# Shared httpx clients keyed by (verify_ssl, alpn_protocols)
|
||||
DATA_ASYNC_CLIENT: HassKey[dict[tuple[bool, SSLALPNProtocols], httpx.AsyncClient]] = (
|
||||
HassKey("httpx_async_client")
|
||||
)
|
||||
DEFAULT_LIMITS = limits = httpx.Limits(keepalive_expiry=KEEP_ALIVE_TIMEOUT)
|
||||
SERVER_SOFTWARE = (
|
||||
@@ -42,15 +45,26 @@ USER_AGENT = "User-Agent"
|
||||
|
||||
@callback
|
||||
@bind_hass
|
||||
def get_async_client(hass: HomeAssistant, verify_ssl: bool = True) -> httpx.AsyncClient:
|
||||
def get_async_client(
|
||||
hass: HomeAssistant,
|
||||
verify_ssl: bool = True,
|
||||
alpn_protocols: SSLALPNProtocols = SSL_ALPN_HTTP11,
|
||||
) -> httpx.AsyncClient:
|
||||
"""Return default httpx AsyncClient.
|
||||
|
||||
This method must be run in the event loop.
|
||||
"""
|
||||
key = DATA_ASYNC_CLIENT if verify_ssl else DATA_ASYNC_CLIENT_NOVERIFY
|
||||
|
||||
if (client := hass.data.get(key)) is None:
|
||||
client = hass.data[key] = create_async_httpx_client(hass, verify_ssl)
|
||||
Pass alpn_protocols=SSL_ALPN_HTTP11_HTTP2 to get a client configured for HTTP/2.
|
||||
Clients are cached separately by ALPN protocol to ensure proper SSL context
|
||||
configuration (ALPN protocols differ between HTTP versions).
|
||||
"""
|
||||
client_key = (verify_ssl, alpn_protocols)
|
||||
clients = hass.data.setdefault(DATA_ASYNC_CLIENT, {})
|
||||
|
||||
if (client := clients.get(client_key)) is None:
|
||||
client = clients[client_key] = create_async_httpx_client(
|
||||
hass, verify_ssl, alpn_protocols=alpn_protocols
|
||||
)
|
||||
|
||||
return client
|
||||
|
||||
@@ -77,6 +91,7 @@ def create_async_httpx_client(
|
||||
verify_ssl: bool = True,
|
||||
auto_cleanup: bool = True,
|
||||
ssl_cipher_list: SSLCipherList = SSLCipherList.PYTHON_DEFAULT,
|
||||
alpn_protocols: SSLALPNProtocols = SSL_ALPN_HTTP11,
|
||||
**kwargs: Any,
|
||||
) -> httpx.AsyncClient:
|
||||
"""Create a new httpx.AsyncClient with kwargs, i.e. for cookies.
|
||||
@@ -84,13 +99,22 @@ def create_async_httpx_client(
|
||||
If auto_cleanup is False, the client will be
|
||||
automatically closed on homeassistant_stop.
|
||||
|
||||
Pass alpn_protocols=SSL_ALPN_HTTP11_HTTP2 for HTTP/2 support (automatically
|
||||
enables httpx http2 mode).
|
||||
|
||||
This method must be run in the event loop.
|
||||
"""
|
||||
# Use the requested ALPN protocols directly to ensure proper SSL context
|
||||
# bucketing. httpx/httpcore mutates SSL contexts by calling set_alpn_protocols(),
|
||||
# so we pre-set the correct protocols to prevent shared context corruption.
|
||||
ssl_context = (
|
||||
client_context(ssl_cipher_list)
|
||||
client_context(ssl_cipher_list, alpn_protocols)
|
||||
if verify_ssl
|
||||
else create_no_verify_ssl_context(ssl_cipher_list)
|
||||
else create_no_verify_ssl_context(ssl_cipher_list, alpn_protocols)
|
||||
)
|
||||
# Enable httpx HTTP/2 mode when HTTP/2 protocol is requested
|
||||
if alpn_protocols == SSL_ALPN_HTTP11_HTTP2:
|
||||
kwargs.setdefault("http2", True)
|
||||
client = HassHttpXAsyncClient(
|
||||
verify=ssl_context,
|
||||
headers={USER_AGENT: SERVER_SOFTWARE},
|
||||
|
||||
@@ -8,6 +8,17 @@ import ssl
|
||||
|
||||
import certifi
|
||||
|
||||
# Type alias for ALPN protocols tuple (None means no ALPN protocols set)
|
||||
type SSLALPNProtocols = tuple[str, ...] | None
|
||||
|
||||
# ALPN protocol configurations
|
||||
# No ALPN protocols - used for libraries that don't support/need ALPN (e.g., aioimap)
|
||||
SSL_ALPN_NONE: SSLALPNProtocols = None
|
||||
# HTTP/1.1 only - used by default and for aiohttp (which doesn't support HTTP/2)
|
||||
SSL_ALPN_HTTP11: SSLALPNProtocols = ("http/1.1",)
|
||||
# HTTP/1.1 with HTTP/2 support - used when httpx http2=True
|
||||
SSL_ALPN_HTTP11_HTTP2: SSLALPNProtocols = ("http/1.1", "h2")
|
||||
|
||||
|
||||
class SSLCipherList(StrEnum):
|
||||
"""SSL cipher lists."""
|
||||
@@ -64,7 +75,10 @@ SSL_CIPHER_LISTS = {
|
||||
|
||||
|
||||
@cache
|
||||
def _client_context_no_verify(ssl_cipher_list: SSLCipherList) -> ssl.SSLContext:
|
||||
def _client_context_no_verify(
|
||||
ssl_cipher_list: SSLCipherList,
|
||||
alpn_protocols: SSLALPNProtocols,
|
||||
) -> ssl.SSLContext:
|
||||
# This is a copy of aiohttp's create_default_context() function, with the
|
||||
# ssl verify turned off.
|
||||
# https://github.com/aio-libs/aiohttp/blob/33953f110e97eecc707e1402daa8d543f38a189b/aiohttp/connector.py#L911
|
||||
@@ -78,12 +92,18 @@ def _client_context_no_verify(ssl_cipher_list: SSLCipherList) -> ssl.SSLContext:
|
||||
sslcontext.set_default_verify_paths()
|
||||
if ssl_cipher_list != SSLCipherList.PYTHON_DEFAULT:
|
||||
sslcontext.set_ciphers(SSL_CIPHER_LISTS[ssl_cipher_list])
|
||||
# Set ALPN protocols to prevent downstream libraries (e.g., httpx/httpcore)
|
||||
# from mutating the shared SSL context with different protocol settings.
|
||||
# If alpn_protocols is None, don't set ALPN (for libraries like aioimap).
|
||||
if alpn_protocols is not None:
|
||||
sslcontext.set_alpn_protocols(list(alpn_protocols))
|
||||
|
||||
return sslcontext
|
||||
|
||||
|
||||
def _create_client_context(
|
||||
ssl_cipher_list: SSLCipherList = SSLCipherList.PYTHON_DEFAULT,
|
||||
alpn_protocols: SSLALPNProtocols = SSL_ALPN_NONE,
|
||||
) -> ssl.SSLContext:
|
||||
"""Return an independent SSL context for making requests."""
|
||||
# Reuse environment variable definition from requests, since it's already a
|
||||
@@ -96,6 +116,11 @@ def _create_client_context(
|
||||
)
|
||||
if ssl_cipher_list != SSLCipherList.PYTHON_DEFAULT:
|
||||
sslcontext.set_ciphers(SSL_CIPHER_LISTS[ssl_cipher_list])
|
||||
# Set ALPN protocols to prevent downstream libraries (e.g., httpx/httpcore)
|
||||
# from mutating the shared SSL context with different protocol settings.
|
||||
# If alpn_protocols is None, don't set ALPN (for libraries like aioimap).
|
||||
if alpn_protocols is not None:
|
||||
sslcontext.set_alpn_protocols(list(alpn_protocols))
|
||||
|
||||
return sslcontext
|
||||
|
||||
@@ -103,63 +128,63 @@ def _create_client_context(
|
||||
@cache
|
||||
def _client_context(
|
||||
ssl_cipher_list: SSLCipherList = SSLCipherList.PYTHON_DEFAULT,
|
||||
alpn_protocols: SSLALPNProtocols = SSL_ALPN_NONE,
|
||||
) -> ssl.SSLContext:
|
||||
# Cached version of _create_client_context
|
||||
return _create_client_context(ssl_cipher_list)
|
||||
return _create_client_context(ssl_cipher_list, alpn_protocols)
|
||||
|
||||
|
||||
# Create this only once and reuse it
|
||||
_DEFAULT_SSL_CONTEXT = _client_context(SSLCipherList.PYTHON_DEFAULT)
|
||||
_DEFAULT_NO_VERIFY_SSL_CONTEXT = _client_context_no_verify(SSLCipherList.PYTHON_DEFAULT)
|
||||
_NO_VERIFY_SSL_CONTEXTS = {
|
||||
SSLCipherList.INTERMEDIATE: _client_context_no_verify(SSLCipherList.INTERMEDIATE),
|
||||
SSLCipherList.MODERN: _client_context_no_verify(SSLCipherList.MODERN),
|
||||
SSLCipherList.INSECURE: _client_context_no_verify(SSLCipherList.INSECURE),
|
||||
}
|
||||
_SSL_CONTEXTS = {
|
||||
SSLCipherList.INTERMEDIATE: _client_context(SSLCipherList.INTERMEDIATE),
|
||||
SSLCipherList.MODERN: _client_context(SSLCipherList.MODERN),
|
||||
SSLCipherList.INSECURE: _client_context(SSLCipherList.INSECURE),
|
||||
}
|
||||
# Pre-warm the cache for ALL SSL context configurations at module load time.
|
||||
# This is critical because creating SSL contexts loads certificates from disk,
|
||||
# which is blocking I/O that must not happen in the event loop.
|
||||
_SSL_ALPN_PROTOCOLS = (SSL_ALPN_NONE, SSL_ALPN_HTTP11, SSL_ALPN_HTTP11_HTTP2)
|
||||
for _cipher in SSLCipherList:
|
||||
for _alpn in _SSL_ALPN_PROTOCOLS:
|
||||
_client_context(_cipher, _alpn)
|
||||
_client_context_no_verify(_cipher, _alpn)
|
||||
|
||||
|
||||
def get_default_context() -> ssl.SSLContext:
|
||||
"""Return the default SSL context."""
|
||||
return _DEFAULT_SSL_CONTEXT
|
||||
return _client_context(SSLCipherList.PYTHON_DEFAULT, SSL_ALPN_HTTP11)
|
||||
|
||||
|
||||
def get_default_no_verify_context() -> ssl.SSLContext:
|
||||
"""Return the default SSL context that does not verify the server certificate."""
|
||||
return _DEFAULT_NO_VERIFY_SSL_CONTEXT
|
||||
return _client_context_no_verify(SSLCipherList.PYTHON_DEFAULT, SSL_ALPN_HTTP11)
|
||||
|
||||
|
||||
def client_context_no_verify(
|
||||
ssl_cipher_list: SSLCipherList = SSLCipherList.PYTHON_DEFAULT,
|
||||
alpn_protocols: SSLALPNProtocols = SSL_ALPN_NONE,
|
||||
) -> ssl.SSLContext:
|
||||
"""Return a SSL context with no verification with a specific ssl cipher."""
|
||||
return _NO_VERIFY_SSL_CONTEXTS.get(ssl_cipher_list, _DEFAULT_NO_VERIFY_SSL_CONTEXT)
|
||||
return _client_context_no_verify(ssl_cipher_list, alpn_protocols)
|
||||
|
||||
|
||||
def client_context(
|
||||
ssl_cipher_list: SSLCipherList = SSLCipherList.PYTHON_DEFAULT,
|
||||
alpn_protocols: SSLALPNProtocols = SSL_ALPN_NONE,
|
||||
) -> ssl.SSLContext:
|
||||
"""Return an SSL context for making requests."""
|
||||
return _SSL_CONTEXTS.get(ssl_cipher_list, _DEFAULT_SSL_CONTEXT)
|
||||
return _client_context(ssl_cipher_list, alpn_protocols)
|
||||
|
||||
|
||||
def create_client_context(
|
||||
ssl_cipher_list: SSLCipherList = SSLCipherList.PYTHON_DEFAULT,
|
||||
alpn_protocols: SSLALPNProtocols = SSL_ALPN_NONE,
|
||||
) -> ssl.SSLContext:
|
||||
"""Return an independent SSL context for making requests."""
|
||||
# This explicitly uses the non-cached version to create a client context
|
||||
return _create_client_context(ssl_cipher_list)
|
||||
return _create_client_context(ssl_cipher_list, alpn_protocols)
|
||||
|
||||
|
||||
def create_no_verify_ssl_context(
|
||||
ssl_cipher_list: SSLCipherList = SSLCipherList.PYTHON_DEFAULT,
|
||||
alpn_protocols: SSLALPNProtocols = SSL_ALPN_NONE,
|
||||
) -> ssl.SSLContext:
|
||||
"""Return an SSL context that does not verify the server certificate."""
|
||||
return _client_context_no_verify(ssl_cipher_list)
|
||||
return _client_context_no_verify(ssl_cipher_list, alpn_protocols)
|
||||
|
||||
|
||||
def server_context_modern() -> ssl.SSLContext:
|
||||
|
||||
@@ -22,6 +22,7 @@ from homeassistant.const import (
|
||||
)
|
||||
from homeassistant.core import HomeAssistant
|
||||
from homeassistant.helpers import aiohttp_client as client
|
||||
from homeassistant.util import ssl as ssl_util
|
||||
from homeassistant.util.color import RGBColor
|
||||
from homeassistant.util.ssl import SSLCipherList
|
||||
|
||||
@@ -413,3 +414,29 @@ async def test_resolver_is_singleton(hass: HomeAssistant) -> None:
|
||||
assert isinstance(session3._connector, aiohttp.TCPConnector)
|
||||
assert session._connector._resolver is session2._connector._resolver
|
||||
assert session._connector._resolver is session3._connector._resolver
|
||||
|
||||
|
||||
async def test_connector_uses_http11_alpn(hass: HomeAssistant) -> None:
|
||||
"""Test that connector uses HTTP/1.1 ALPN protocols."""
|
||||
with patch.object(
|
||||
ssl_util, "client_context", wraps=ssl_util.client_context
|
||||
) as mock_client_context:
|
||||
client.async_get_clientsession(hass)
|
||||
|
||||
# Verify client_context was called with HTTP/1.1 ALPN
|
||||
mock_client_context.assert_called_once_with(
|
||||
SSLCipherList.PYTHON_DEFAULT, ssl_util.SSL_ALPN_HTTP11
|
||||
)
|
||||
|
||||
|
||||
async def test_connector_no_verify_uses_http11_alpn(hass: HomeAssistant) -> None:
|
||||
"""Test that connector without SSL verification uses HTTP/1.1 ALPN protocols."""
|
||||
with patch.object(
|
||||
ssl_util, "client_context_no_verify", wraps=ssl_util.client_context_no_verify
|
||||
) as mock_client_context_no_verify:
|
||||
client.async_get_clientsession(hass, verify_ssl=False)
|
||||
|
||||
# Verify client_context_no_verify was called with HTTP/1.1 ALPN
|
||||
mock_client_context_no_verify.assert_called_once_with(
|
||||
SSLCipherList.PYTHON_DEFAULT, ssl_util.SSL_ALPN_HTTP11
|
||||
)
|
||||
|
||||
@@ -8,6 +8,7 @@ import pytest
|
||||
from homeassistant.const import EVENT_HOMEASSISTANT_CLOSE
|
||||
from homeassistant.core import HomeAssistant
|
||||
from homeassistant.helpers import httpx_client as client
|
||||
from homeassistant.util.ssl import SSL_ALPN_HTTP11, SSL_ALPN_HTTP11_HTTP2
|
||||
|
||||
from tests.common import MockModule, extract_stack_to_frame, mock_integration
|
||||
|
||||
@@ -16,14 +17,20 @@ async def test_get_async_client_with_ssl(hass: HomeAssistant) -> None:
|
||||
"""Test init async client with ssl."""
|
||||
client.get_async_client(hass)
|
||||
|
||||
assert isinstance(hass.data[client.DATA_ASYNC_CLIENT], httpx.AsyncClient)
|
||||
assert isinstance(
|
||||
hass.data[client.DATA_ASYNC_CLIENT][(True, SSL_ALPN_HTTP11)],
|
||||
httpx.AsyncClient,
|
||||
)
|
||||
|
||||
|
||||
async def test_get_async_client_without_ssl(hass: HomeAssistant) -> None:
|
||||
"""Test init async client without ssl."""
|
||||
client.get_async_client(hass, verify_ssl=False)
|
||||
|
||||
assert isinstance(hass.data[client.DATA_ASYNC_CLIENT_NOVERIFY], httpx.AsyncClient)
|
||||
assert isinstance(
|
||||
hass.data[client.DATA_ASYNC_CLIENT][(False, SSL_ALPN_HTTP11)],
|
||||
httpx.AsyncClient,
|
||||
)
|
||||
|
||||
|
||||
async def test_create_async_httpx_client_with_ssl_and_cookies(
|
||||
@@ -34,7 +41,7 @@ async def test_create_async_httpx_client_with_ssl_and_cookies(
|
||||
|
||||
httpx_client = client.create_async_httpx_client(hass, cookies={"bla": True})
|
||||
assert isinstance(httpx_client, httpx.AsyncClient)
|
||||
assert hass.data[client.DATA_ASYNC_CLIENT] != httpx_client
|
||||
assert hass.data[client.DATA_ASYNC_CLIENT][(True, SSL_ALPN_HTTP11)] != httpx_client
|
||||
|
||||
|
||||
async def test_create_async_httpx_client_without_ssl_and_cookies(
|
||||
@@ -47,31 +54,37 @@ async def test_create_async_httpx_client_without_ssl_and_cookies(
|
||||
hass, verify_ssl=False, cookies={"bla": True}
|
||||
)
|
||||
assert isinstance(httpx_client, httpx.AsyncClient)
|
||||
assert hass.data[client.DATA_ASYNC_CLIENT_NOVERIFY] != httpx_client
|
||||
assert hass.data[client.DATA_ASYNC_CLIENT][(False, SSL_ALPN_HTTP11)] != httpx_client
|
||||
|
||||
|
||||
async def test_get_async_client_cleanup(hass: HomeAssistant) -> None:
|
||||
"""Test init async client with ssl."""
|
||||
client.get_async_client(hass)
|
||||
|
||||
assert isinstance(hass.data[client.DATA_ASYNC_CLIENT], httpx.AsyncClient)
|
||||
assert isinstance(
|
||||
hass.data[client.DATA_ASYNC_CLIENT][(True, SSL_ALPN_HTTP11)],
|
||||
httpx.AsyncClient,
|
||||
)
|
||||
|
||||
hass.bus.async_fire(EVENT_HOMEASSISTANT_CLOSE)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
assert hass.data[client.DATA_ASYNC_CLIENT].is_closed
|
||||
assert hass.data[client.DATA_ASYNC_CLIENT][(True, SSL_ALPN_HTTP11)].is_closed
|
||||
|
||||
|
||||
async def test_get_async_client_cleanup_without_ssl(hass: HomeAssistant) -> None:
|
||||
"""Test init async client without ssl."""
|
||||
client.get_async_client(hass, verify_ssl=False)
|
||||
|
||||
assert isinstance(hass.data[client.DATA_ASYNC_CLIENT_NOVERIFY], httpx.AsyncClient)
|
||||
assert isinstance(
|
||||
hass.data[client.DATA_ASYNC_CLIENT][(False, SSL_ALPN_HTTP11)],
|
||||
httpx.AsyncClient,
|
||||
)
|
||||
|
||||
hass.bus.async_fire(EVENT_HOMEASSISTANT_CLOSE)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
assert hass.data[client.DATA_ASYNC_CLIENT_NOVERIFY].is_closed
|
||||
assert hass.data[client.DATA_ASYNC_CLIENT][(False, SSL_ALPN_HTTP11)].is_closed
|
||||
|
||||
|
||||
async def test_get_async_client_patched_close(hass: HomeAssistant) -> None:
|
||||
@@ -79,7 +92,10 @@ async def test_get_async_client_patched_close(hass: HomeAssistant) -> None:
|
||||
|
||||
with patch("httpx.AsyncClient.aclose") as mock_aclose:
|
||||
httpx_session = client.get_async_client(hass)
|
||||
assert isinstance(hass.data[client.DATA_ASYNC_CLIENT], httpx.AsyncClient)
|
||||
assert isinstance(
|
||||
hass.data[client.DATA_ASYNC_CLIENT][(True, SSL_ALPN_HTTP11)],
|
||||
httpx.AsyncClient,
|
||||
)
|
||||
|
||||
with pytest.raises(RuntimeError):
|
||||
await httpx_session.aclose()
|
||||
@@ -92,7 +108,10 @@ async def test_get_async_client_context_manager(hass: HomeAssistant) -> None:
|
||||
|
||||
with patch("httpx.AsyncClient.aclose") as mock_aclose:
|
||||
httpx_session = client.get_async_client(hass)
|
||||
assert isinstance(hass.data[client.DATA_ASYNC_CLIENT], httpx.AsyncClient)
|
||||
assert isinstance(
|
||||
hass.data[client.DATA_ASYNC_CLIENT][(True, SSL_ALPN_HTTP11)],
|
||||
httpx.AsyncClient,
|
||||
)
|
||||
|
||||
async with httpx_session:
|
||||
pass
|
||||
@@ -100,6 +119,80 @@ async def test_get_async_client_context_manager(hass: HomeAssistant) -> None:
|
||||
assert mock_aclose.call_count == 0
|
||||
|
||||
|
||||
async def test_get_async_client_http2(hass: HomeAssistant) -> None:
|
||||
"""Test init async client with HTTP/2 support."""
|
||||
http1_client = client.get_async_client(hass)
|
||||
http2_client = client.get_async_client(hass, alpn_protocols=SSL_ALPN_HTTP11_HTTP2)
|
||||
|
||||
# HTTP/1.1 and HTTP/2 clients should be different (different SSL contexts)
|
||||
assert http1_client is not http2_client
|
||||
assert isinstance(
|
||||
hass.data[client.DATA_ASYNC_CLIENT][(True, SSL_ALPN_HTTP11)],
|
||||
httpx.AsyncClient,
|
||||
)
|
||||
assert isinstance(
|
||||
hass.data[client.DATA_ASYNC_CLIENT][(True, SSL_ALPN_HTTP11_HTTP2)],
|
||||
httpx.AsyncClient,
|
||||
)
|
||||
|
||||
# Same parameters should return cached client
|
||||
assert client.get_async_client(hass) is http1_client
|
||||
assert (
|
||||
client.get_async_client(hass, alpn_protocols=SSL_ALPN_HTTP11_HTTP2)
|
||||
is http2_client
|
||||
)
|
||||
|
||||
|
||||
async def test_get_async_client_http2_cleanup(hass: HomeAssistant) -> None:
|
||||
"""Test cleanup of HTTP/2 async client."""
|
||||
client.get_async_client(hass, alpn_protocols=SSL_ALPN_HTTP11_HTTP2)
|
||||
|
||||
assert isinstance(
|
||||
hass.data[client.DATA_ASYNC_CLIENT][(True, SSL_ALPN_HTTP11_HTTP2)],
|
||||
httpx.AsyncClient,
|
||||
)
|
||||
|
||||
hass.bus.async_fire(EVENT_HOMEASSISTANT_CLOSE)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
assert hass.data[client.DATA_ASYNC_CLIENT][(True, SSL_ALPN_HTTP11_HTTP2)].is_closed
|
||||
|
||||
|
||||
async def test_get_async_client_http2_without_ssl(hass: HomeAssistant) -> None:
|
||||
"""Test init async client with HTTP/2 and without SSL."""
|
||||
http2_client = client.get_async_client(
|
||||
hass, verify_ssl=False, alpn_protocols=SSL_ALPN_HTTP11_HTTP2
|
||||
)
|
||||
|
||||
assert isinstance(
|
||||
hass.data[client.DATA_ASYNC_CLIENT][(False, SSL_ALPN_HTTP11_HTTP2)],
|
||||
httpx.AsyncClient,
|
||||
)
|
||||
|
||||
# Same parameters should return cached client
|
||||
assert (
|
||||
client.get_async_client(
|
||||
hass, verify_ssl=False, alpn_protocols=SSL_ALPN_HTTP11_HTTP2
|
||||
)
|
||||
is http2_client
|
||||
)
|
||||
|
||||
|
||||
async def test_create_async_httpx_client_http2(hass: HomeAssistant) -> None:
|
||||
"""Test create async client with HTTP/2 uses correct ALPN protocols."""
|
||||
http1_client = client.create_async_httpx_client(hass)
|
||||
http2_client = client.create_async_httpx_client(
|
||||
hass, alpn_protocols=SSL_ALPN_HTTP11_HTTP2
|
||||
)
|
||||
|
||||
# Different clients (not cached)
|
||||
assert http1_client is not http2_client
|
||||
|
||||
# Both should be valid clients
|
||||
assert isinstance(http1_client, httpx.AsyncClient)
|
||||
assert isinstance(http2_client, httpx.AsyncClient)
|
||||
|
||||
|
||||
async def test_warning_close_session_integration(
|
||||
hass: HomeAssistant, caplog: pytest.LogCaptureFixture
|
||||
) -> None:
|
||||
|
||||
@@ -1,78 +1,58 @@
|
||||
"""Test Home Assistant ssl utility functions."""
|
||||
|
||||
from unittest.mock import MagicMock, Mock, patch
|
||||
|
||||
import pytest
|
||||
|
||||
from homeassistant.util.ssl import (
|
||||
SSL_ALPN_HTTP11,
|
||||
SSL_ALPN_HTTP11_HTTP2,
|
||||
SSL_ALPN_NONE,
|
||||
SSLCipherList,
|
||||
client_context,
|
||||
client_context_no_verify,
|
||||
create_client_context,
|
||||
create_no_verify_ssl_context,
|
||||
get_default_context,
|
||||
get_default_no_verify_context,
|
||||
)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def mock_sslcontext():
|
||||
"""Mock the ssl lib."""
|
||||
return MagicMock(set_ciphers=Mock(return_value=True))
|
||||
|
||||
|
||||
def test_client_context(mock_sslcontext) -> None:
|
||||
"""Test client context."""
|
||||
with patch("homeassistant.util.ssl.ssl.SSLContext", return_value=mock_sslcontext):
|
||||
client_context()
|
||||
mock_sslcontext.set_ciphers.assert_not_called()
|
||||
|
||||
client_context(SSLCipherList.MODERN)
|
||||
mock_sslcontext.set_ciphers.assert_not_called()
|
||||
|
||||
client_context(SSLCipherList.INTERMEDIATE)
|
||||
mock_sslcontext.set_ciphers.assert_not_called()
|
||||
|
||||
client_context(SSLCipherList.INSECURE)
|
||||
mock_sslcontext.set_ciphers.assert_not_called()
|
||||
|
||||
|
||||
def test_no_verify_ssl_context(mock_sslcontext) -> None:
|
||||
"""Test no verify ssl context."""
|
||||
with patch("homeassistant.util.ssl.ssl.SSLContext", return_value=mock_sslcontext):
|
||||
create_no_verify_ssl_context()
|
||||
mock_sslcontext.set_ciphers.assert_not_called()
|
||||
|
||||
create_no_verify_ssl_context(SSLCipherList.MODERN)
|
||||
mock_sslcontext.set_ciphers.assert_not_called()
|
||||
|
||||
create_no_verify_ssl_context(SSLCipherList.INTERMEDIATE)
|
||||
mock_sslcontext.set_ciphers.assert_not_called()
|
||||
|
||||
create_no_verify_ssl_context(SSLCipherList.INSECURE)
|
||||
mock_sslcontext.set_ciphers.assert_not_called()
|
||||
|
||||
|
||||
def test_ssl_context_caching() -> None:
|
||||
"""Test that SSLContext instances are cached correctly."""
|
||||
|
||||
assert client_context() is client_context(SSLCipherList.PYTHON_DEFAULT)
|
||||
assert create_no_verify_ssl_context() is create_no_verify_ssl_context(
|
||||
SSLCipherList.PYTHON_DEFAULT
|
||||
)
|
||||
|
||||
|
||||
def test_create_client_context(mock_sslcontext) -> None:
|
||||
"""Test create client context."""
|
||||
with patch("homeassistant.util.ssl.ssl.SSLContext", return_value=mock_sslcontext):
|
||||
client_context()
|
||||
mock_sslcontext.set_ciphers.assert_not_called()
|
||||
def test_ssl_context_cipher_bucketing() -> None:
|
||||
"""Test that SSL contexts are bucketed by cipher list."""
|
||||
default_ctx = client_context(SSLCipherList.PYTHON_DEFAULT)
|
||||
modern_ctx = client_context(SSLCipherList.MODERN)
|
||||
intermediate_ctx = client_context(SSLCipherList.INTERMEDIATE)
|
||||
insecure_ctx = client_context(SSLCipherList.INSECURE)
|
||||
|
||||
client_context(SSLCipherList.MODERN)
|
||||
mock_sslcontext.set_ciphers.assert_not_called()
|
||||
# Different cipher lists should return different contexts
|
||||
assert default_ctx is not modern_ctx
|
||||
assert default_ctx is not intermediate_ctx
|
||||
assert default_ctx is not insecure_ctx
|
||||
assert modern_ctx is not intermediate_ctx
|
||||
assert modern_ctx is not insecure_ctx
|
||||
assert intermediate_ctx is not insecure_ctx
|
||||
|
||||
client_context(SSLCipherList.INTERMEDIATE)
|
||||
mock_sslcontext.set_ciphers.assert_not_called()
|
||||
# Same parameters should return cached context
|
||||
assert client_context(SSLCipherList.PYTHON_DEFAULT) is default_ctx
|
||||
assert client_context(SSLCipherList.MODERN) is modern_ctx
|
||||
|
||||
client_context(SSLCipherList.INSECURE)
|
||||
mock_sslcontext.set_ciphers.assert_not_called()
|
||||
|
||||
def test_no_verify_ssl_context_cipher_bucketing() -> None:
|
||||
"""Test that no-verify SSL contexts are bucketed by cipher list."""
|
||||
default_ctx = create_no_verify_ssl_context(SSLCipherList.PYTHON_DEFAULT)
|
||||
modern_ctx = create_no_verify_ssl_context(SSLCipherList.MODERN)
|
||||
|
||||
# Different cipher lists should return different contexts
|
||||
assert default_ctx is not modern_ctx
|
||||
|
||||
# Same parameters should return cached context
|
||||
assert create_no_verify_ssl_context(SSLCipherList.PYTHON_DEFAULT) is default_ctx
|
||||
assert create_no_verify_ssl_context(SSLCipherList.MODERN) is modern_ctx
|
||||
|
||||
|
||||
def test_create_client_context_independent() -> None:
|
||||
@@ -82,3 +62,129 @@ def test_create_client_context_independent() -> None:
|
||||
independent_context_2 = create_client_context()
|
||||
assert shared_context is not independent_context_1
|
||||
assert independent_context_1 is not independent_context_2
|
||||
|
||||
|
||||
def test_ssl_context_alpn_bucketing() -> None:
|
||||
"""Test that SSL contexts are bucketed by ALPN protocols.
|
||||
|
||||
Different ALPN protocol configurations should return different cached contexts
|
||||
to prevent downstream libraries (e.g., httpx/httpcore) from mutating shared
|
||||
contexts with incompatible settings.
|
||||
"""
|
||||
# HTTP/1.1, HTTP/2, and no-ALPN contexts should all be different
|
||||
http1_context = client_context(SSLCipherList.PYTHON_DEFAULT, SSL_ALPN_HTTP11)
|
||||
http2_context = client_context(SSLCipherList.PYTHON_DEFAULT, SSL_ALPN_HTTP11_HTTP2)
|
||||
no_alpn_context = client_context(SSLCipherList.PYTHON_DEFAULT, SSL_ALPN_NONE)
|
||||
assert http1_context is not http2_context
|
||||
assert http1_context is not no_alpn_context
|
||||
assert http2_context is not no_alpn_context
|
||||
|
||||
# Same parameters should return cached context
|
||||
assert (
|
||||
client_context(SSLCipherList.PYTHON_DEFAULT, SSL_ALPN_HTTP11) is http1_context
|
||||
)
|
||||
assert (
|
||||
client_context(SSLCipherList.PYTHON_DEFAULT, SSL_ALPN_HTTP11_HTTP2)
|
||||
is http2_context
|
||||
)
|
||||
assert (
|
||||
client_context(SSLCipherList.PYTHON_DEFAULT, SSL_ALPN_NONE) is no_alpn_context
|
||||
)
|
||||
|
||||
# No-verify contexts should also be bucketed by ALPN
|
||||
http1_no_verify = client_context_no_verify(
|
||||
SSLCipherList.PYTHON_DEFAULT, SSL_ALPN_HTTP11
|
||||
)
|
||||
http2_no_verify = client_context_no_verify(
|
||||
SSLCipherList.PYTHON_DEFAULT, SSL_ALPN_HTTP11_HTTP2
|
||||
)
|
||||
no_alpn_no_verify = client_context_no_verify(
|
||||
SSLCipherList.PYTHON_DEFAULT, SSL_ALPN_NONE
|
||||
)
|
||||
assert http1_no_verify is not http2_no_verify
|
||||
assert http1_no_verify is not no_alpn_no_verify
|
||||
assert http2_no_verify is not no_alpn_no_verify
|
||||
|
||||
# create_no_verify_ssl_context should also work with ALPN
|
||||
assert (
|
||||
create_no_verify_ssl_context(SSLCipherList.PYTHON_DEFAULT, SSL_ALPN_HTTP11)
|
||||
is http1_no_verify
|
||||
)
|
||||
assert (
|
||||
create_no_verify_ssl_context(
|
||||
SSLCipherList.PYTHON_DEFAULT, SSL_ALPN_HTTP11_HTTP2
|
||||
)
|
||||
is http2_no_verify
|
||||
)
|
||||
assert (
|
||||
create_no_verify_ssl_context(SSLCipherList.PYTHON_DEFAULT, SSL_ALPN_NONE)
|
||||
is no_alpn_no_verify
|
||||
)
|
||||
|
||||
|
||||
def test_ssl_context_insecure_alpn_bucketing() -> None:
|
||||
"""Test that INSECURE cipher list SSL contexts are bucketed by ALPN protocols.
|
||||
|
||||
INSECURE cipher list is used by some integrations that need to connect to
|
||||
devices with outdated TLS implementations.
|
||||
"""
|
||||
# HTTP/1.1, HTTP/2, and no-ALPN contexts should all be different
|
||||
http1_context = client_context(SSLCipherList.INSECURE, SSL_ALPN_HTTP11)
|
||||
http2_context = client_context(SSLCipherList.INSECURE, SSL_ALPN_HTTP11_HTTP2)
|
||||
no_alpn_context = client_context(SSLCipherList.INSECURE, SSL_ALPN_NONE)
|
||||
assert http1_context is not http2_context
|
||||
assert http1_context is not no_alpn_context
|
||||
assert http2_context is not no_alpn_context
|
||||
|
||||
# Same parameters should return cached context
|
||||
assert client_context(SSLCipherList.INSECURE, SSL_ALPN_HTTP11) is http1_context
|
||||
assert (
|
||||
client_context(SSLCipherList.INSECURE, SSL_ALPN_HTTP11_HTTP2) is http2_context
|
||||
)
|
||||
assert client_context(SSLCipherList.INSECURE, SSL_ALPN_NONE) is no_alpn_context
|
||||
|
||||
# No-verify contexts should also be bucketed by ALPN
|
||||
http1_no_verify = client_context_no_verify(SSLCipherList.INSECURE, SSL_ALPN_HTTP11)
|
||||
http2_no_verify = client_context_no_verify(
|
||||
SSLCipherList.INSECURE, SSL_ALPN_HTTP11_HTTP2
|
||||
)
|
||||
no_alpn_no_verify = client_context_no_verify(SSLCipherList.INSECURE, SSL_ALPN_NONE)
|
||||
assert http1_no_verify is not http2_no_verify
|
||||
assert http1_no_verify is not no_alpn_no_verify
|
||||
assert http2_no_verify is not no_alpn_no_verify
|
||||
|
||||
# create_no_verify_ssl_context should also work with ALPN
|
||||
assert (
|
||||
create_no_verify_ssl_context(SSLCipherList.INSECURE, SSL_ALPN_HTTP11)
|
||||
is http1_no_verify
|
||||
)
|
||||
assert (
|
||||
create_no_verify_ssl_context(SSLCipherList.INSECURE, SSL_ALPN_HTTP11_HTTP2)
|
||||
is http2_no_verify
|
||||
)
|
||||
assert (
|
||||
create_no_verify_ssl_context(SSLCipherList.INSECURE, SSL_ALPN_NONE)
|
||||
is no_alpn_no_verify
|
||||
)
|
||||
|
||||
|
||||
def test_get_default_context_uses_http1_alpn() -> None:
|
||||
"""Test that get_default_context returns context with HTTP1 ALPN."""
|
||||
default_ctx = get_default_context()
|
||||
default_no_verify_ctx = get_default_no_verify_context()
|
||||
|
||||
# Default contexts should be the same as explicitly requesting HTTP1 ALPN
|
||||
assert default_ctx is client_context(SSLCipherList.PYTHON_DEFAULT, SSL_ALPN_HTTP11)
|
||||
assert default_no_verify_ctx is client_context_no_verify(
|
||||
SSLCipherList.PYTHON_DEFAULT, SSL_ALPN_HTTP11
|
||||
)
|
||||
|
||||
|
||||
def test_client_context_default_no_alpn() -> None:
|
||||
"""Test that client_context defaults to no ALPN for backward compatibility."""
|
||||
# Default (no ALPN) should be different from HTTP1 ALPN
|
||||
default_ctx = client_context()
|
||||
http1_ctx = client_context(SSLCipherList.PYTHON_DEFAULT, SSL_ALPN_HTTP11)
|
||||
|
||||
assert default_ctx is not http1_ctx
|
||||
assert default_ctx is client_context(SSLCipherList.PYTHON_DEFAULT, SSL_ALPN_NONE)
|
||||
|
||||
Reference in New Issue
Block a user