Compare commits

...

9 Commits

Author SHA1 Message Date
J. Nick Koston
b6bb157141 http2 checks 2026-01-20 21:46:24 -10:00
J. Nick Koston
d589b9eb8d more flexible for the future 2026-01-20 11:15:25 -10:00
J. Nick Koston
256d47775b better design for future where we live in http3 land 2026-01-20 11:08:25 -10:00
J. Nick Koston
c9eae821e8 Merge remote-tracking branch 'upstream/dev' into ssl_context_mutates 2026-01-20 11:04:06 -10:00
J. Nick Koston
8c02268638 delete brittle tests that were only needed to make sure it works once 2026-01-20 10:44:57 -10:00
J. Nick Koston
8436676e67 Merge branch 'dev' into ssl_context_mutates 2026-01-20 10:42:57 -10:00
J. Nick Koston
df10ffd508 avoid blocking I/O 2026-01-20 10:37:19 -10:00
J. Nick Koston
02218fab7b warm insecure as well 2026-01-20 10:34:10 -10:00
J. Nick Koston
a7cfac2618 Fix SSL context mutation by httpx/httpcore with ALPN protocol bucketing 2026-01-20 08:54:52 -10:00
8 changed files with 388 additions and 99 deletions

View File

@@ -28,6 +28,7 @@ from homeassistant.helpers import device_registry as dr
from homeassistant.helpers.dispatcher import async_dispatcher_send
from homeassistant.helpers.event import async_track_time_interval
from homeassistant.helpers.httpx_client import get_async_client
from homeassistant.util.ssl import SSL_ALPN_HTTP11_HTTP2
from .const import DOMAIN, UPDATE_INTERVAL
from .entity import AqualinkEntity
@@ -66,7 +67,11 @@ async def async_setup_entry(hass: HomeAssistant, entry: AqualinkConfigEntry) ->
username = entry.data[CONF_USERNAME]
password = entry.data[CONF_PASSWORD]
aqualink = AqualinkClient(username, password, httpx_client=get_async_client(hass))
aqualink = AqualinkClient(
username,
password,
httpx_client=get_async_client(hass, alpn_protocols=SSL_ALPN_HTTP11_HTTP2),
)
try:
await aqualink.login()
except AqualinkServiceException as login_exception:

View File

@@ -15,6 +15,7 @@ import voluptuous as vol
from homeassistant.config_entries import ConfigFlow, ConfigFlowResult
from homeassistant.const import CONF_PASSWORD, CONF_USERNAME
from homeassistant.helpers.httpx_client import get_async_client
from homeassistant.util.ssl import SSL_ALPN_HTTP11_HTTP2
from .const import DOMAIN
@@ -36,7 +37,11 @@ class AqualinkFlowHandler(ConfigFlow, domain=DOMAIN):
try:
async with AqualinkClient(
username, password, httpx_client=get_async_client(self.hass)
username,
password,
httpx_client=get_async_client(
self.hass, alpn_protocols=SSL_ALPN_HTTP11_HTTP2
),
):
pass
except AqualinkServiceUnauthorizedException:

View File

@@ -370,9 +370,13 @@ def _async_get_connector(
return connectors[connector_key]
if verify_ssl:
ssl_context: SSLContext = ssl_util.client_context(ssl_cipher)
ssl_context: SSLContext = ssl_util.client_context(
ssl_cipher, ssl_util.SSL_ALPN_HTTP11
)
else:
ssl_context = ssl_util.client_context_no_verify(ssl_cipher)
ssl_context = ssl_util.client_context_no_verify(
ssl_cipher, ssl_util.SSL_ALPN_HTTP11
)
connector = HomeAssistantTCPConnector(
family=family,

View File

@@ -17,6 +17,9 @@ from homeassistant.core import Event, HomeAssistant, callback
from homeassistant.loader import bind_hass
from homeassistant.util.hass_dict import HassKey
from homeassistant.util.ssl import (
SSL_ALPN_HTTP11,
SSL_ALPN_HTTP11_HTTP2,
SSLALPNProtocols,
SSLCipherList,
client_context,
create_no_verify_ssl_context,
@@ -28,9 +31,9 @@ from .frame import warn_use
# and we want to keep the connection open for a while so we
# don't have to reconnect every time so we use 15s to match aiohttp.
KEEP_ALIVE_TIMEOUT = 15
DATA_ASYNC_CLIENT: HassKey[httpx.AsyncClient] = HassKey("httpx_async_client")
DATA_ASYNC_CLIENT_NOVERIFY: HassKey[httpx.AsyncClient] = HassKey(
"httpx_async_client_noverify"
# Shared httpx clients keyed by (verify_ssl, alpn_protocols)
DATA_ASYNC_CLIENT: HassKey[dict[tuple[bool, SSLALPNProtocols], httpx.AsyncClient]] = (
HassKey("httpx_async_client")
)
DEFAULT_LIMITS = limits = httpx.Limits(keepalive_expiry=KEEP_ALIVE_TIMEOUT)
SERVER_SOFTWARE = (
@@ -42,15 +45,26 @@ USER_AGENT = "User-Agent"
@callback
@bind_hass
def get_async_client(hass: HomeAssistant, verify_ssl: bool = True) -> httpx.AsyncClient:
def get_async_client(
hass: HomeAssistant,
verify_ssl: bool = True,
alpn_protocols: SSLALPNProtocols = SSL_ALPN_HTTP11,
) -> httpx.AsyncClient:
"""Return default httpx AsyncClient.
This method must be run in the event loop.
"""
key = DATA_ASYNC_CLIENT if verify_ssl else DATA_ASYNC_CLIENT_NOVERIFY
if (client := hass.data.get(key)) is None:
client = hass.data[key] = create_async_httpx_client(hass, verify_ssl)
Pass alpn_protocols=SSL_ALPN_HTTP11_HTTP2 to get a client configured for HTTP/2.
Clients are cached separately by ALPN protocol to ensure proper SSL context
configuration (ALPN protocols differ between HTTP versions).
"""
client_key = (verify_ssl, alpn_protocols)
clients = hass.data.setdefault(DATA_ASYNC_CLIENT, {})
if (client := clients.get(client_key)) is None:
client = clients[client_key] = create_async_httpx_client(
hass, verify_ssl, alpn_protocols=alpn_protocols
)
return client
@@ -77,6 +91,7 @@ def create_async_httpx_client(
verify_ssl: bool = True,
auto_cleanup: bool = True,
ssl_cipher_list: SSLCipherList = SSLCipherList.PYTHON_DEFAULT,
alpn_protocols: SSLALPNProtocols = SSL_ALPN_HTTP11,
**kwargs: Any,
) -> httpx.AsyncClient:
"""Create a new httpx.AsyncClient with kwargs, i.e. for cookies.
@@ -84,13 +99,22 @@ def create_async_httpx_client(
If auto_cleanup is False, the client will be
automatically closed on homeassistant_stop.
Pass alpn_protocols=SSL_ALPN_HTTP11_HTTP2 for HTTP/2 support (automatically
enables httpx http2 mode).
This method must be run in the event loop.
"""
# Use the requested ALPN protocols directly to ensure proper SSL context
# bucketing. httpx/httpcore mutates SSL contexts by calling set_alpn_protocols(),
# so we pre-set the correct protocols to prevent shared context corruption.
ssl_context = (
client_context(ssl_cipher_list)
client_context(ssl_cipher_list, alpn_protocols)
if verify_ssl
else create_no_verify_ssl_context(ssl_cipher_list)
else create_no_verify_ssl_context(ssl_cipher_list, alpn_protocols)
)
# Enable httpx HTTP/2 mode when HTTP/2 protocol is requested
if alpn_protocols == SSL_ALPN_HTTP11_HTTP2:
kwargs.setdefault("http2", True)
client = HassHttpXAsyncClient(
verify=ssl_context,
headers={USER_AGENT: SERVER_SOFTWARE},

View File

@@ -8,6 +8,17 @@ import ssl
import certifi
# Type alias for ALPN protocols tuple (None means no ALPN protocols set)
type SSLALPNProtocols = tuple[str, ...] | None
# ALPN protocol configurations
# No ALPN protocols - used for libraries that don't support/need ALPN (e.g., aioimap)
SSL_ALPN_NONE: SSLALPNProtocols = None
# HTTP/1.1 only - used by default and for aiohttp (which doesn't support HTTP/2)
SSL_ALPN_HTTP11: SSLALPNProtocols = ("http/1.1",)
# HTTP/1.1 with HTTP/2 support - used when httpx http2=True
SSL_ALPN_HTTP11_HTTP2: SSLALPNProtocols = ("http/1.1", "h2")
class SSLCipherList(StrEnum):
"""SSL cipher lists."""
@@ -64,7 +75,10 @@ SSL_CIPHER_LISTS = {
@cache
def _client_context_no_verify(ssl_cipher_list: SSLCipherList) -> ssl.SSLContext:
def _client_context_no_verify(
ssl_cipher_list: SSLCipherList,
alpn_protocols: SSLALPNProtocols,
) -> ssl.SSLContext:
# This is a copy of aiohttp's create_default_context() function, with the
# ssl verify turned off.
# https://github.com/aio-libs/aiohttp/blob/33953f110e97eecc707e1402daa8d543f38a189b/aiohttp/connector.py#L911
@@ -78,12 +92,18 @@ def _client_context_no_verify(ssl_cipher_list: SSLCipherList) -> ssl.SSLContext:
sslcontext.set_default_verify_paths()
if ssl_cipher_list != SSLCipherList.PYTHON_DEFAULT:
sslcontext.set_ciphers(SSL_CIPHER_LISTS[ssl_cipher_list])
# Set ALPN protocols to prevent downstream libraries (e.g., httpx/httpcore)
# from mutating the shared SSL context with different protocol settings.
# If alpn_protocols is None, don't set ALPN (for libraries like aioimap).
if alpn_protocols is not None:
sslcontext.set_alpn_protocols(list(alpn_protocols))
return sslcontext
def _create_client_context(
ssl_cipher_list: SSLCipherList = SSLCipherList.PYTHON_DEFAULT,
alpn_protocols: SSLALPNProtocols = SSL_ALPN_NONE,
) -> ssl.SSLContext:
"""Return an independent SSL context for making requests."""
# Reuse environment variable definition from requests, since it's already a
@@ -96,6 +116,11 @@ def _create_client_context(
)
if ssl_cipher_list != SSLCipherList.PYTHON_DEFAULT:
sslcontext.set_ciphers(SSL_CIPHER_LISTS[ssl_cipher_list])
# Set ALPN protocols to prevent downstream libraries (e.g., httpx/httpcore)
# from mutating the shared SSL context with different protocol settings.
# If alpn_protocols is None, don't set ALPN (for libraries like aioimap).
if alpn_protocols is not None:
sslcontext.set_alpn_protocols(list(alpn_protocols))
return sslcontext
@@ -103,63 +128,63 @@ def _create_client_context(
@cache
def _client_context(
ssl_cipher_list: SSLCipherList = SSLCipherList.PYTHON_DEFAULT,
alpn_protocols: SSLALPNProtocols = SSL_ALPN_NONE,
) -> ssl.SSLContext:
# Cached version of _create_client_context
return _create_client_context(ssl_cipher_list)
return _create_client_context(ssl_cipher_list, alpn_protocols)
# Create this only once and reuse it
_DEFAULT_SSL_CONTEXT = _client_context(SSLCipherList.PYTHON_DEFAULT)
_DEFAULT_NO_VERIFY_SSL_CONTEXT = _client_context_no_verify(SSLCipherList.PYTHON_DEFAULT)
_NO_VERIFY_SSL_CONTEXTS = {
SSLCipherList.INTERMEDIATE: _client_context_no_verify(SSLCipherList.INTERMEDIATE),
SSLCipherList.MODERN: _client_context_no_verify(SSLCipherList.MODERN),
SSLCipherList.INSECURE: _client_context_no_verify(SSLCipherList.INSECURE),
}
_SSL_CONTEXTS = {
SSLCipherList.INTERMEDIATE: _client_context(SSLCipherList.INTERMEDIATE),
SSLCipherList.MODERN: _client_context(SSLCipherList.MODERN),
SSLCipherList.INSECURE: _client_context(SSLCipherList.INSECURE),
}
# Pre-warm the cache for ALL SSL context configurations at module load time.
# This is critical because creating SSL contexts loads certificates from disk,
# which is blocking I/O that must not happen in the event loop.
_SSL_ALPN_PROTOCOLS = (SSL_ALPN_NONE, SSL_ALPN_HTTP11, SSL_ALPN_HTTP11_HTTP2)
for _cipher in SSLCipherList:
for _alpn in _SSL_ALPN_PROTOCOLS:
_client_context(_cipher, _alpn)
_client_context_no_verify(_cipher, _alpn)
def get_default_context() -> ssl.SSLContext:
"""Return the default SSL context."""
return _DEFAULT_SSL_CONTEXT
return _client_context(SSLCipherList.PYTHON_DEFAULT, SSL_ALPN_HTTP11)
def get_default_no_verify_context() -> ssl.SSLContext:
"""Return the default SSL context that does not verify the server certificate."""
return _DEFAULT_NO_VERIFY_SSL_CONTEXT
return _client_context_no_verify(SSLCipherList.PYTHON_DEFAULT, SSL_ALPN_HTTP11)
def client_context_no_verify(
ssl_cipher_list: SSLCipherList = SSLCipherList.PYTHON_DEFAULT,
alpn_protocols: SSLALPNProtocols = SSL_ALPN_NONE,
) -> ssl.SSLContext:
"""Return a SSL context with no verification with a specific ssl cipher."""
return _NO_VERIFY_SSL_CONTEXTS.get(ssl_cipher_list, _DEFAULT_NO_VERIFY_SSL_CONTEXT)
return _client_context_no_verify(ssl_cipher_list, alpn_protocols)
def client_context(
ssl_cipher_list: SSLCipherList = SSLCipherList.PYTHON_DEFAULT,
alpn_protocols: SSLALPNProtocols = SSL_ALPN_NONE,
) -> ssl.SSLContext:
"""Return an SSL context for making requests."""
return _SSL_CONTEXTS.get(ssl_cipher_list, _DEFAULT_SSL_CONTEXT)
return _client_context(ssl_cipher_list, alpn_protocols)
def create_client_context(
ssl_cipher_list: SSLCipherList = SSLCipherList.PYTHON_DEFAULT,
alpn_protocols: SSLALPNProtocols = SSL_ALPN_NONE,
) -> ssl.SSLContext:
"""Return an independent SSL context for making requests."""
# This explicitly uses the non-cached version to create a client context
return _create_client_context(ssl_cipher_list)
return _create_client_context(ssl_cipher_list, alpn_protocols)
def create_no_verify_ssl_context(
ssl_cipher_list: SSLCipherList = SSLCipherList.PYTHON_DEFAULT,
alpn_protocols: SSLALPNProtocols = SSL_ALPN_NONE,
) -> ssl.SSLContext:
"""Return an SSL context that does not verify the server certificate."""
return _client_context_no_verify(ssl_cipher_list)
return _client_context_no_verify(ssl_cipher_list, alpn_protocols)
def server_context_modern() -> ssl.SSLContext:

View File

@@ -22,6 +22,7 @@ from homeassistant.const import (
)
from homeassistant.core import HomeAssistant
from homeassistant.helpers import aiohttp_client as client
from homeassistant.util import ssl as ssl_util
from homeassistant.util.color import RGBColor
from homeassistant.util.ssl import SSLCipherList
@@ -413,3 +414,29 @@ async def test_resolver_is_singleton(hass: HomeAssistant) -> None:
assert isinstance(session3._connector, aiohttp.TCPConnector)
assert session._connector._resolver is session2._connector._resolver
assert session._connector._resolver is session3._connector._resolver
async def test_connector_uses_http11_alpn(hass: HomeAssistant) -> None:
"""Test that connector uses HTTP/1.1 ALPN protocols."""
with patch.object(
ssl_util, "client_context", wraps=ssl_util.client_context
) as mock_client_context:
client.async_get_clientsession(hass)
# Verify client_context was called with HTTP/1.1 ALPN
mock_client_context.assert_called_once_with(
SSLCipherList.PYTHON_DEFAULT, ssl_util.SSL_ALPN_HTTP11
)
async def test_connector_no_verify_uses_http11_alpn(hass: HomeAssistant) -> None:
"""Test that connector without SSL verification uses HTTP/1.1 ALPN protocols."""
with patch.object(
ssl_util, "client_context_no_verify", wraps=ssl_util.client_context_no_verify
) as mock_client_context_no_verify:
client.async_get_clientsession(hass, verify_ssl=False)
# Verify client_context_no_verify was called with HTTP/1.1 ALPN
mock_client_context_no_verify.assert_called_once_with(
SSLCipherList.PYTHON_DEFAULT, ssl_util.SSL_ALPN_HTTP11
)

View File

@@ -8,6 +8,7 @@ import pytest
from homeassistant.const import EVENT_HOMEASSISTANT_CLOSE
from homeassistant.core import HomeAssistant
from homeassistant.helpers import httpx_client as client
from homeassistant.util.ssl import SSL_ALPN_HTTP11, SSL_ALPN_HTTP11_HTTP2
from tests.common import MockModule, extract_stack_to_frame, mock_integration
@@ -16,14 +17,20 @@ async def test_get_async_client_with_ssl(hass: HomeAssistant) -> None:
"""Test init async client with ssl."""
client.get_async_client(hass)
assert isinstance(hass.data[client.DATA_ASYNC_CLIENT], httpx.AsyncClient)
assert isinstance(
hass.data[client.DATA_ASYNC_CLIENT][(True, SSL_ALPN_HTTP11)],
httpx.AsyncClient,
)
async def test_get_async_client_without_ssl(hass: HomeAssistant) -> None:
"""Test init async client without ssl."""
client.get_async_client(hass, verify_ssl=False)
assert isinstance(hass.data[client.DATA_ASYNC_CLIENT_NOVERIFY], httpx.AsyncClient)
assert isinstance(
hass.data[client.DATA_ASYNC_CLIENT][(False, SSL_ALPN_HTTP11)],
httpx.AsyncClient,
)
async def test_create_async_httpx_client_with_ssl_and_cookies(
@@ -34,7 +41,7 @@ async def test_create_async_httpx_client_with_ssl_and_cookies(
httpx_client = client.create_async_httpx_client(hass, cookies={"bla": True})
assert isinstance(httpx_client, httpx.AsyncClient)
assert hass.data[client.DATA_ASYNC_CLIENT] != httpx_client
assert hass.data[client.DATA_ASYNC_CLIENT][(True, SSL_ALPN_HTTP11)] != httpx_client
async def test_create_async_httpx_client_without_ssl_and_cookies(
@@ -47,31 +54,37 @@ async def test_create_async_httpx_client_without_ssl_and_cookies(
hass, verify_ssl=False, cookies={"bla": True}
)
assert isinstance(httpx_client, httpx.AsyncClient)
assert hass.data[client.DATA_ASYNC_CLIENT_NOVERIFY] != httpx_client
assert hass.data[client.DATA_ASYNC_CLIENT][(False, SSL_ALPN_HTTP11)] != httpx_client
async def test_get_async_client_cleanup(hass: HomeAssistant) -> None:
"""Test init async client with ssl."""
client.get_async_client(hass)
assert isinstance(hass.data[client.DATA_ASYNC_CLIENT], httpx.AsyncClient)
assert isinstance(
hass.data[client.DATA_ASYNC_CLIENT][(True, SSL_ALPN_HTTP11)],
httpx.AsyncClient,
)
hass.bus.async_fire(EVENT_HOMEASSISTANT_CLOSE)
await hass.async_block_till_done()
assert hass.data[client.DATA_ASYNC_CLIENT].is_closed
assert hass.data[client.DATA_ASYNC_CLIENT][(True, SSL_ALPN_HTTP11)].is_closed
async def test_get_async_client_cleanup_without_ssl(hass: HomeAssistant) -> None:
"""Test init async client without ssl."""
client.get_async_client(hass, verify_ssl=False)
assert isinstance(hass.data[client.DATA_ASYNC_CLIENT_NOVERIFY], httpx.AsyncClient)
assert isinstance(
hass.data[client.DATA_ASYNC_CLIENT][(False, SSL_ALPN_HTTP11)],
httpx.AsyncClient,
)
hass.bus.async_fire(EVENT_HOMEASSISTANT_CLOSE)
await hass.async_block_till_done()
assert hass.data[client.DATA_ASYNC_CLIENT_NOVERIFY].is_closed
assert hass.data[client.DATA_ASYNC_CLIENT][(False, SSL_ALPN_HTTP11)].is_closed
async def test_get_async_client_patched_close(hass: HomeAssistant) -> None:
@@ -79,7 +92,10 @@ async def test_get_async_client_patched_close(hass: HomeAssistant) -> None:
with patch("httpx.AsyncClient.aclose") as mock_aclose:
httpx_session = client.get_async_client(hass)
assert isinstance(hass.data[client.DATA_ASYNC_CLIENT], httpx.AsyncClient)
assert isinstance(
hass.data[client.DATA_ASYNC_CLIENT][(True, SSL_ALPN_HTTP11)],
httpx.AsyncClient,
)
with pytest.raises(RuntimeError):
await httpx_session.aclose()
@@ -92,7 +108,10 @@ async def test_get_async_client_context_manager(hass: HomeAssistant) -> None:
with patch("httpx.AsyncClient.aclose") as mock_aclose:
httpx_session = client.get_async_client(hass)
assert isinstance(hass.data[client.DATA_ASYNC_CLIENT], httpx.AsyncClient)
assert isinstance(
hass.data[client.DATA_ASYNC_CLIENT][(True, SSL_ALPN_HTTP11)],
httpx.AsyncClient,
)
async with httpx_session:
pass
@@ -100,6 +119,80 @@ async def test_get_async_client_context_manager(hass: HomeAssistant) -> None:
assert mock_aclose.call_count == 0
async def test_get_async_client_http2(hass: HomeAssistant) -> None:
"""Test init async client with HTTP/2 support."""
http1_client = client.get_async_client(hass)
http2_client = client.get_async_client(hass, alpn_protocols=SSL_ALPN_HTTP11_HTTP2)
# HTTP/1.1 and HTTP/2 clients should be different (different SSL contexts)
assert http1_client is not http2_client
assert isinstance(
hass.data[client.DATA_ASYNC_CLIENT][(True, SSL_ALPN_HTTP11)],
httpx.AsyncClient,
)
assert isinstance(
hass.data[client.DATA_ASYNC_CLIENT][(True, SSL_ALPN_HTTP11_HTTP2)],
httpx.AsyncClient,
)
# Same parameters should return cached client
assert client.get_async_client(hass) is http1_client
assert (
client.get_async_client(hass, alpn_protocols=SSL_ALPN_HTTP11_HTTP2)
is http2_client
)
async def test_get_async_client_http2_cleanup(hass: HomeAssistant) -> None:
"""Test cleanup of HTTP/2 async client."""
client.get_async_client(hass, alpn_protocols=SSL_ALPN_HTTP11_HTTP2)
assert isinstance(
hass.data[client.DATA_ASYNC_CLIENT][(True, SSL_ALPN_HTTP11_HTTP2)],
httpx.AsyncClient,
)
hass.bus.async_fire(EVENT_HOMEASSISTANT_CLOSE)
await hass.async_block_till_done()
assert hass.data[client.DATA_ASYNC_CLIENT][(True, SSL_ALPN_HTTP11_HTTP2)].is_closed
async def test_get_async_client_http2_without_ssl(hass: HomeAssistant) -> None:
"""Test init async client with HTTP/2 and without SSL."""
http2_client = client.get_async_client(
hass, verify_ssl=False, alpn_protocols=SSL_ALPN_HTTP11_HTTP2
)
assert isinstance(
hass.data[client.DATA_ASYNC_CLIENT][(False, SSL_ALPN_HTTP11_HTTP2)],
httpx.AsyncClient,
)
# Same parameters should return cached client
assert (
client.get_async_client(
hass, verify_ssl=False, alpn_protocols=SSL_ALPN_HTTP11_HTTP2
)
is http2_client
)
async def test_create_async_httpx_client_http2(hass: HomeAssistant) -> None:
"""Test create async client with HTTP/2 uses correct ALPN protocols."""
http1_client = client.create_async_httpx_client(hass)
http2_client = client.create_async_httpx_client(
hass, alpn_protocols=SSL_ALPN_HTTP11_HTTP2
)
# Different clients (not cached)
assert http1_client is not http2_client
# Both should be valid clients
assert isinstance(http1_client, httpx.AsyncClient)
assert isinstance(http2_client, httpx.AsyncClient)
async def test_warning_close_session_integration(
hass: HomeAssistant, caplog: pytest.LogCaptureFixture
) -> None:

View File

@@ -1,78 +1,58 @@
"""Test Home Assistant ssl utility functions."""
from unittest.mock import MagicMock, Mock, patch
import pytest
from homeassistant.util.ssl import (
SSL_ALPN_HTTP11,
SSL_ALPN_HTTP11_HTTP2,
SSL_ALPN_NONE,
SSLCipherList,
client_context,
client_context_no_verify,
create_client_context,
create_no_verify_ssl_context,
get_default_context,
get_default_no_verify_context,
)
@pytest.fixture
def mock_sslcontext():
"""Mock the ssl lib."""
return MagicMock(set_ciphers=Mock(return_value=True))
def test_client_context(mock_sslcontext) -> None:
"""Test client context."""
with patch("homeassistant.util.ssl.ssl.SSLContext", return_value=mock_sslcontext):
client_context()
mock_sslcontext.set_ciphers.assert_not_called()
client_context(SSLCipherList.MODERN)
mock_sslcontext.set_ciphers.assert_not_called()
client_context(SSLCipherList.INTERMEDIATE)
mock_sslcontext.set_ciphers.assert_not_called()
client_context(SSLCipherList.INSECURE)
mock_sslcontext.set_ciphers.assert_not_called()
def test_no_verify_ssl_context(mock_sslcontext) -> None:
"""Test no verify ssl context."""
with patch("homeassistant.util.ssl.ssl.SSLContext", return_value=mock_sslcontext):
create_no_verify_ssl_context()
mock_sslcontext.set_ciphers.assert_not_called()
create_no_verify_ssl_context(SSLCipherList.MODERN)
mock_sslcontext.set_ciphers.assert_not_called()
create_no_verify_ssl_context(SSLCipherList.INTERMEDIATE)
mock_sslcontext.set_ciphers.assert_not_called()
create_no_verify_ssl_context(SSLCipherList.INSECURE)
mock_sslcontext.set_ciphers.assert_not_called()
def test_ssl_context_caching() -> None:
"""Test that SSLContext instances are cached correctly."""
assert client_context() is client_context(SSLCipherList.PYTHON_DEFAULT)
assert create_no_verify_ssl_context() is create_no_verify_ssl_context(
SSLCipherList.PYTHON_DEFAULT
)
def test_create_client_context(mock_sslcontext) -> None:
"""Test create client context."""
with patch("homeassistant.util.ssl.ssl.SSLContext", return_value=mock_sslcontext):
client_context()
mock_sslcontext.set_ciphers.assert_not_called()
def test_ssl_context_cipher_bucketing() -> None:
"""Test that SSL contexts are bucketed by cipher list."""
default_ctx = client_context(SSLCipherList.PYTHON_DEFAULT)
modern_ctx = client_context(SSLCipherList.MODERN)
intermediate_ctx = client_context(SSLCipherList.INTERMEDIATE)
insecure_ctx = client_context(SSLCipherList.INSECURE)
client_context(SSLCipherList.MODERN)
mock_sslcontext.set_ciphers.assert_not_called()
# Different cipher lists should return different contexts
assert default_ctx is not modern_ctx
assert default_ctx is not intermediate_ctx
assert default_ctx is not insecure_ctx
assert modern_ctx is not intermediate_ctx
assert modern_ctx is not insecure_ctx
assert intermediate_ctx is not insecure_ctx
client_context(SSLCipherList.INTERMEDIATE)
mock_sslcontext.set_ciphers.assert_not_called()
# Same parameters should return cached context
assert client_context(SSLCipherList.PYTHON_DEFAULT) is default_ctx
assert client_context(SSLCipherList.MODERN) is modern_ctx
client_context(SSLCipherList.INSECURE)
mock_sslcontext.set_ciphers.assert_not_called()
def test_no_verify_ssl_context_cipher_bucketing() -> None:
"""Test that no-verify SSL contexts are bucketed by cipher list."""
default_ctx = create_no_verify_ssl_context(SSLCipherList.PYTHON_DEFAULT)
modern_ctx = create_no_verify_ssl_context(SSLCipherList.MODERN)
# Different cipher lists should return different contexts
assert default_ctx is not modern_ctx
# Same parameters should return cached context
assert create_no_verify_ssl_context(SSLCipherList.PYTHON_DEFAULT) is default_ctx
assert create_no_verify_ssl_context(SSLCipherList.MODERN) is modern_ctx
def test_create_client_context_independent() -> None:
@@ -82,3 +62,129 @@ def test_create_client_context_independent() -> None:
independent_context_2 = create_client_context()
assert shared_context is not independent_context_1
assert independent_context_1 is not independent_context_2
def test_ssl_context_alpn_bucketing() -> None:
"""Test that SSL contexts are bucketed by ALPN protocols.
Different ALPN protocol configurations should return different cached contexts
to prevent downstream libraries (e.g., httpx/httpcore) from mutating shared
contexts with incompatible settings.
"""
# HTTP/1.1, HTTP/2, and no-ALPN contexts should all be different
http1_context = client_context(SSLCipherList.PYTHON_DEFAULT, SSL_ALPN_HTTP11)
http2_context = client_context(SSLCipherList.PYTHON_DEFAULT, SSL_ALPN_HTTP11_HTTP2)
no_alpn_context = client_context(SSLCipherList.PYTHON_DEFAULT, SSL_ALPN_NONE)
assert http1_context is not http2_context
assert http1_context is not no_alpn_context
assert http2_context is not no_alpn_context
# Same parameters should return cached context
assert (
client_context(SSLCipherList.PYTHON_DEFAULT, SSL_ALPN_HTTP11) is http1_context
)
assert (
client_context(SSLCipherList.PYTHON_DEFAULT, SSL_ALPN_HTTP11_HTTP2)
is http2_context
)
assert (
client_context(SSLCipherList.PYTHON_DEFAULT, SSL_ALPN_NONE) is no_alpn_context
)
# No-verify contexts should also be bucketed by ALPN
http1_no_verify = client_context_no_verify(
SSLCipherList.PYTHON_DEFAULT, SSL_ALPN_HTTP11
)
http2_no_verify = client_context_no_verify(
SSLCipherList.PYTHON_DEFAULT, SSL_ALPN_HTTP11_HTTP2
)
no_alpn_no_verify = client_context_no_verify(
SSLCipherList.PYTHON_DEFAULT, SSL_ALPN_NONE
)
assert http1_no_verify is not http2_no_verify
assert http1_no_verify is not no_alpn_no_verify
assert http2_no_verify is not no_alpn_no_verify
# create_no_verify_ssl_context should also work with ALPN
assert (
create_no_verify_ssl_context(SSLCipherList.PYTHON_DEFAULT, SSL_ALPN_HTTP11)
is http1_no_verify
)
assert (
create_no_verify_ssl_context(
SSLCipherList.PYTHON_DEFAULT, SSL_ALPN_HTTP11_HTTP2
)
is http2_no_verify
)
assert (
create_no_verify_ssl_context(SSLCipherList.PYTHON_DEFAULT, SSL_ALPN_NONE)
is no_alpn_no_verify
)
def test_ssl_context_insecure_alpn_bucketing() -> None:
"""Test that INSECURE cipher list SSL contexts are bucketed by ALPN protocols.
INSECURE cipher list is used by some integrations that need to connect to
devices with outdated TLS implementations.
"""
# HTTP/1.1, HTTP/2, and no-ALPN contexts should all be different
http1_context = client_context(SSLCipherList.INSECURE, SSL_ALPN_HTTP11)
http2_context = client_context(SSLCipherList.INSECURE, SSL_ALPN_HTTP11_HTTP2)
no_alpn_context = client_context(SSLCipherList.INSECURE, SSL_ALPN_NONE)
assert http1_context is not http2_context
assert http1_context is not no_alpn_context
assert http2_context is not no_alpn_context
# Same parameters should return cached context
assert client_context(SSLCipherList.INSECURE, SSL_ALPN_HTTP11) is http1_context
assert (
client_context(SSLCipherList.INSECURE, SSL_ALPN_HTTP11_HTTP2) is http2_context
)
assert client_context(SSLCipherList.INSECURE, SSL_ALPN_NONE) is no_alpn_context
# No-verify contexts should also be bucketed by ALPN
http1_no_verify = client_context_no_verify(SSLCipherList.INSECURE, SSL_ALPN_HTTP11)
http2_no_verify = client_context_no_verify(
SSLCipherList.INSECURE, SSL_ALPN_HTTP11_HTTP2
)
no_alpn_no_verify = client_context_no_verify(SSLCipherList.INSECURE, SSL_ALPN_NONE)
assert http1_no_verify is not http2_no_verify
assert http1_no_verify is not no_alpn_no_verify
assert http2_no_verify is not no_alpn_no_verify
# create_no_verify_ssl_context should also work with ALPN
assert (
create_no_verify_ssl_context(SSLCipherList.INSECURE, SSL_ALPN_HTTP11)
is http1_no_verify
)
assert (
create_no_verify_ssl_context(SSLCipherList.INSECURE, SSL_ALPN_HTTP11_HTTP2)
is http2_no_verify
)
assert (
create_no_verify_ssl_context(SSLCipherList.INSECURE, SSL_ALPN_NONE)
is no_alpn_no_verify
)
def test_get_default_context_uses_http1_alpn() -> None:
"""Test that get_default_context returns context with HTTP1 ALPN."""
default_ctx = get_default_context()
default_no_verify_ctx = get_default_no_verify_context()
# Default contexts should be the same as explicitly requesting HTTP1 ALPN
assert default_ctx is client_context(SSLCipherList.PYTHON_DEFAULT, SSL_ALPN_HTTP11)
assert default_no_verify_ctx is client_context_no_verify(
SSLCipherList.PYTHON_DEFAULT, SSL_ALPN_HTTP11
)
def test_client_context_default_no_alpn() -> None:
"""Test that client_context defaults to no ALPN for backward compatibility."""
# Default (no ALPN) should be different from HTTP1 ALPN
default_ctx = client_context()
http1_ctx = client_context(SSLCipherList.PYTHON_DEFAULT, SSL_ALPN_HTTP11)
assert default_ctx is not http1_ctx
assert default_ctx is client_context(SSLCipherList.PYTHON_DEFAULT, SSL_ALPN_NONE)