Files
wolfssl/scripts/multi-msg-record.py
T

669 lines
25 KiB
Python
Raw Normal View History

2026-04-10 16:32:35 +00:00
#!/usr/bin/env python3
#
# multi-msg-record.py
#
# Python half of scripts/multi-msg-record.test (the bash wrapper handles
# NETWORK_UNSHARE_HELPER / AM_BWRAPPED and the python3 availability
# check, then execs this script).
#
# Tests that wolfSSL correctly processes TLS records containing multiple
# handshake messages packed into a single record.
#
# Uses tlslite-ng as the TLS peer to craft multi-message records:
#
# TLS 1.2 Each connection tests TWO code paths back-to-back:
# 1. Initial handshake: RecordMergingSocket rewrites separate
# plaintext ServerHello + Certificate + ServerKeyExchange +
# ServerHelloDone records into one multi-message TLS
# record before forwarding to the wolfSSL client.
# 2. Renegotiation on the same connection: tlslite-ng is
# monkey-patched to coalesce SH+Cert+SKE+SHD into ONE
# encrypted handshake record (exercises the
# curSize -= padSz CBC-padding path and the AEAD path).
#
# TLS 1.3 tlslite-ng's _queue_message / _queue_flush mechanism already
# coalesces EncryptedExtensions + Certificate + CertificateVerify
# + Finished into a single encrypted record. The test verifies
# that wolfSSL parses this correctly.
#
# Multiple cipher suites are tested for both protocol versions.
#
# Requirements: python3, tlslite-ng (pip install tlslite-ng)
import socket
import struct
import subprocess
import os
import sys
import threading
import time
import types
SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
def _find_wolfssl_dir():
# Under `make check` the working directory is the build tree, which is
# where the client binary lives and which differs from the tree
# containing this script in an out-of-tree (VPATH) build. Fall back to
# the script's parent for direct invocation from the source tree.
for root in (os.getcwd(), os.path.dirname(SCRIPT_DIR)):
if os.path.isfile(os.path.join(root, "examples", "client", "client")):
return root
return os.path.dirname(SCRIPT_DIR)
WOLFSSL_DIR = _find_wolfssl_dir()
2026-04-10 16:32:35 +00:00
WOLF_CLIENT = os.path.join(WOLFSSL_DIR, "examples", "client", "client")
CERT_DIR = os.path.join(WOLFSSL_DIR, "certs")
# CA cert path passed to the wolfSSL client via -A. Set in main() after
# detect_wolf_features() determines whether the build accepts PEM or DER.
WOLF_CA_CERT = os.path.join(CERT_DIR, "ca-cert.pem")
2026-04-10 16:32:35 +00:00
# ---------------------------------------------------------------------------
# Bypass a strict tlslite-ng validation that rejects wolfSSL's ClientHello
# when the client advertises FFDHE groups in a TLS-1.3-only hello.
# This must happen before importing TLSConnection.
#
# If tlslite-ng isn't installed we exit 77 so automake marks the test
# SKIPped instead of FAILed.
# ---------------------------------------------------------------------------
try:
import tlslite.tlsconnection # noqa: E402
import tlslite.recordlayer # noqa: E402
tlslite.tlsconnection.TLS_1_3_FORBIDDEN_GROUPS = frozenset()
from tlslite import ( # noqa: E402
TLSConnection, HandshakeSettings, X509CertChain, parsePEMKey,
)
from tlslite.constants import ContentType # noqa: E402
from tlslite.extensions import RenegotiationInfoExtension # noqa: E402
from tlslite.constants import ExtensionType # noqa: E402
from tlslite.messages import HelloMessage, Message as TLSMessage # noqa: E402
except ImportError as e:
sys.stdout.write(
"tlslite-ng not installed ({}); skipping multi-msg-record test\n"
" (install with: pip install tlslite-ng)\n".format(e))
sys.exit(77)
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
HS_NAMES = {
2: "SH", 4: "NST", 8: "EE", 11: "Cert", 12: "SKE",
13: "CR", 14: "SHD", 15: "CV", 16: "CKE", 20: "Fin",
}
PASS_COUNT = 0
FAIL_COUNT = 0
SKIP_COUNT = 0
def passed(label):
global PASS_COUNT
PASS_COUNT += 1
print(f" PASS: {label}")
def failed(label):
global FAIL_COUNT
FAIL_COUNT += 1
print(f" FAIL: {label}")
def skipped(label):
global SKIP_COUNT
SKIP_COUNT += 1
print(f" SKIP: {label}")
def detect_wolf_features():
"""Probe the wolfSSL client binary to find which features are
compiled in. Used to decide which test phases to run.
Returns dict with keys: tls12 (bool), tls13 (bool),
secure_reneg (bool), rsa (bool), ciphers (set[str]), ca_cert (str).
2026-04-10 16:32:35 +00:00
"""
feats = {"tls12": False, "tls13": False, "secure_reneg": False,
"rsa": True,
"ciphers": set(),
"ca_cert": os.path.join(CERT_DIR, "ca-cert.pem")}
2026-04-10 16:32:35 +00:00
# ./client -V -> e.g. "3:4:d(downgrade):e(either):"
try:
r = subprocess.run([WOLF_CLIENT, "-V"],
capture_output=True, timeout=5)
parts = r.stdout.decode("utf-8", errors="replace").strip().split(":")
feats["tls12"] = "3" in parts
feats["tls13"] = "4" in parts
except Exception:
pass
# ./client -? -> help text includes "-R" only when
# HAVE_SECURE_RENEGOTIATION is defined. The default -A path
# ("ca-cert.pem" vs "ca-cert.der") also tells us which CA file
# format the build can load. The RSA key-size line reports
# "RSA not supported" when NO_RSA is defined.
2026-04-10 16:32:35 +00:00
try:
r = subprocess.run([WOLF_CLIENT, "-?"],
capture_output=True, timeout=5)
htxt = r.stdout.decode("utf-8", errors="replace")
feats["secure_reneg"] = ("Allow Secure Renegotiation" in htxt)
if "ca-cert.der" in htxt and "ca-cert.pem" not in htxt:
feats["ca_cert"] = os.path.join(CERT_DIR, "ca-cert.der")
if "RSA not supported" in htxt:
feats["rsa"] = False
2026-04-10 16:32:35 +00:00
except Exception:
pass
# ./client -e -> colon-separated list of supported cipher suites.
try:
r = subprocess.run([WOLF_CLIENT, "-e"],
capture_output=True, timeout=5)
ctxt = r.stdout.decode("utf-8", errors="replace").strip()
feats["ciphers"] = {c for c in ctxt.split(":") if c}
except Exception:
pass
2026-04-10 16:32:35 +00:00
return feats
def _load_chain(cert_file):
with open(cert_file) as f:
chain = X509CertChain()
chain.parsePemList(f.read())
return chain
def _load_key(key_file):
with open(key_file) as f:
return parsePEMKey(f.read(), private=True)
def _parse_hs_types(data):
"""Parse handshake message types from raw handshake content."""
msgs = []
off = 0
while off + 4 <= len(data):
ht = data[off]
hl = struct.unpack("!I", b"\x00" + bytes(data[off + 1 : off + 4]))[0]
msgs.append(HS_NAMES.get(ht, f"T{ht}"))
off += 4 + hl
return msgs
def _get_free_port():
"""Get an available TCP port."""
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.bind(("127.0.0.1", 0))
return s.getsockname()[1]
def _listen_socket():
"""Bind a listening TCP socket on localhost with the standard test timeout."""
port = _get_free_port()
srv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
srv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
srv.bind(("127.0.0.1", port))
srv.listen(1)
srv.settimeout(15)
return srv, port
def _run_wolf_client(port, version, cipher, extra=()):
"""Invoke the wolfSSL example client against 127.0.0.1:port.
WOLF_CA_CERT is PEM or DER depending on the build (NO_CODING /
OPENSSL_EXTRA builds don't both support PEM).
"""
2026-04-10 16:32:35 +00:00
cmd = [WOLF_CLIENT, "-h", "127.0.0.1", "-p", str(port),
"-v", version, "-A", WOLF_CA_CERT,
2026-04-10 16:32:35 +00:00
"-g", *extra]
if cipher:
cmd.extend(["-l", cipher])
return subprocess.run(cmd, capture_output=True, timeout=15)
class _SendRecordTrace:
"""Context manager that wraps RecordLayer.sendRecord to log every record."""
def __init__(self):
self.log = []
self._orig = None
def __enter__(self):
self._orig = tlslite.recordlayer.RecordLayer.sendRecord
log = self.log
orig = self._orig
def wrapper(self_rl, msg):
data = msg.write()
ct = msg.contentType
encrypted = bool(self_rl._writeState
and self_rl._writeState.encContext)
hs_msgs = []
if ct == ContentType.handshake:
hs_msgs = _parse_hs_types(data)
log.append((ct, encrypted, len(data), hs_msgs))
yield from orig(self_rl, msg)
tlslite.recordlayer.RecordLayer.sendRecord = wrapper
return self.log
def __exit__(self, *exc):
tlslite.recordlayer.RecordLayer.sendRecord = self._orig
# ---------------------------------------------------------------------------
# RecordMergingSocket (TLS 1.2 plaintext record merging)
# ---------------------------------------------------------------------------
class RecordMergingSocket:
"""Socket wrapper that rewrites consecutive TLS handshake records into
a single multi-message record. Only merges plaintext records that
precede ChangeCipherSpec."""
def __init__(self, sock):
self._sock = sock
self._pending = bytearray()
self._ver = 0x0303
self._after_ccs = False
self.merged_msgs = [] # [(n_msgs, [names], size)]
def _flush(self):
if not self._pending:
return
msgs = _parse_hs_types(self._pending)
n = len(msgs)
hdr = struct.pack("!BHH", 22, self._ver, len(self._pending))
self._sock.sendall(hdr + bytes(self._pending))
self.merged_msgs.append((n, msgs, len(self._pending)))
self._pending = bytearray()
# Called by BufferedSocket (one record per call, or multiple from flush)
def _process(self, data):
data = bytearray(data)
off = 0
while off + 5 <= len(data):
ct = data[off]
ver = struct.unpack("!H", data[off + 1 : off + 3])[0]
rlen = struct.unpack("!H", data[off + 3 : off + 5])[0]
if off + 5 + rlen > len(data):
break
payload = data[off + 5 : off + 5 + rlen]
if not self._after_ccs and ct == 22:
self._pending.extend(payload)
self._ver = ver
else:
if ct == 20:
self._after_ccs = True
self._flush()
self._sock.sendall(bytes(data[off : off + 5 + rlen]))
off += 5 + rlen
def send(self, data):
self._process(data)
return len(data)
def sendall(self, data):
self._process(data)
def recv(self, bufsize):
self._flush()
return self._sock.recv(bufsize)
def __getattr__(self, name):
return getattr(self._sock, name)
# ---------------------------------------------------------------------------
# Test runners
# ---------------------------------------------------------------------------
def run_tls12_test(cipher_wolf, cert_chain, priv_key, label,
do_reneg=True):
"""TLS 1.2 test one connection optionally exercises two code paths:
Phase 1 (plaintext grouping, initial handshake):
RecordMergingSocket rewrites separate plaintext ServerHello,
Certificate, ServerKeyExchange and ServerHelloDone records into
one multi-message TLS record before delivery to wolfSSL.
Phase 2 (encrypted grouping, renegotiation on same connection):
tlslite-ng server is monkey-patched to coalesce SH+Cert+SKE+SHD
into a single encrypted handshake record inside the renegotiation
(exercises wolfSSL's encrypted multi-message parsing including
curSize -= padSz for CBC padding).
Phase 2 is skipped when do_reneg=False (e.g. the wolfSSL client was
built without HAVE_SECURE_RENEGOTIATION).
"""
srv, port = _listen_socket()
result = {"ok": False, "error": ""}
msock_ref = [None]
trace_log = []
reneg_active = [False]
verify_data = {'client': None, 'server': None}
# --- monkey-patches (used only during this connection) ----------------
orig_calc_key = tlslite.tlsconnection.calc_key
def capturing_calc_key(*args, **kwargs):
res = orig_calc_key(*args, **kwargs)
lbl = args[3] if len(args) > 3 else kwargs.get('label', b'')
if lbl == b"client finished" and verify_data['client'] is None:
verify_data['client'] = bytearray(res)
elif lbl == b"server finished" and verify_data['server'] is None:
verify_data['server'] = bytearray(res)
return res
orig_getExt = HelloMessage.getExtension
def patched_getExt(self, ext_type):
ext = orig_getExt(self, ext_type)
if (ext_type == ExtensionType.renegotiation_info
and ext is not None and reneg_active[0]):
ext._internal_value = bytearray(0)
return ext
orig_rie_create = RenegotiationInfoExtension.create
def patched_rie_create(self, data):
if reneg_active[0] and data == bytearray(0):
combined = (bytearray(verify_data['client'])
+ bytearray(verify_data['server']))
return orig_rie_create(self, combined)
return orig_rie_create(self, data)
# ----------------------------------------------------------------------
def server():
try:
tlslite.tlsconnection.calc_key = capturing_calc_key
HelloMessage.getExtension = patched_getExt
RenegotiationInfoExtension.create = patched_rie_create
conn, _ = srv.accept()
conn.settimeout(15)
msock = RecordMergingSocket(conn)
msock_ref[0] = msock
tls = TLSConnection(msock)
settings = HandshakeSettings()
settings.minVersion = (3, 3)
settings.maxVersion = (3, 3)
# ---------- Phase 1: initial handshake (plaintext grouping) ----
tls.handshakeServer(certChain=cert_chain, privateKey=priv_key,
settings=settings)
tlslite.tlsconnection.calc_key = orig_calc_key
data = tls.recv(4096)
if do_reneg:
# ---------- Phase 2: trigger + run renegotiation ----------
hr = TLSMessage(ContentType.handshake,
bytearray([0, 0, 0, 0]))
for _ in tls._sendMsg(hr, randomizeFirstBlock=False,
update_hashes=False):
pass
# Bypass tlslite-ng renegotiation guards
tls.closed = True
tls.session = None
reneg_active[0] = True
# Coalesce handshake messages into ONE encrypted TLS record
def coalescing_sendMsgs(self, msgs):
for msg in msgs:
self._queue_message(msg)
yield from self._queue_flush()
tls._sendMsgs = types.MethodType(coalescing_sendMsgs, tls)
with _SendRecordTrace() as log:
tls.handshakeServer(certChain=cert_chain,
privateKey=priv_key,
settings=settings)
reneg_active[0] = False
trace_log.extend(log)
if data:
tls.send(data)
tls.close()
result["ok"] = True
except Exception as e:
import traceback
result["error"] = traceback.format_exc()
finally:
tlslite.tlsconnection.calc_key = orig_calc_key
HelloMessage.getExtension = orig_getExt
RenegotiationInfoExtension.create = orig_rie_create
reneg_active[0] = False
srv.close()
st = threading.Thread(target=server, daemon=True)
st.start()
time.sleep(0.1)
proc = _run_wolf_client(port, "3", cipher_wolf,
extra=("-R",) if do_reneg else ())
st.join(timeout=5)
if proc.returncode != 0 or not result["ok"]:
err = (result["error"]
or proc.stderr.decode("utf-8", errors="replace")[:400])
failed(f"{label}: connection failed ({err})")
return False
ok = True
# Phase 1 verification: plaintext multi-message record
msock = msock_ref[0]
has_pt_grouped = False
for n, msgs, sz in (msock.merged_msgs if msock else []):
if n > 1:
has_pt_grouped = True
passed(f"{label} [plaintext]: {n} msgs "
f"[{'+'.join(msgs)}] in one record ({sz} bytes)")
if not has_pt_grouped:
failed(f"{label} [plaintext]: no multi-message record detected")
ok = False
# Phase 2 verification: encrypted multi-message record (renego)
if do_reneg:
has_enc_grouped = False
for ct, enc, sz, msgs in trace_log:
if ct == ContentType.handshake and enc and len(msgs) > 1:
has_enc_grouped = True
passed(f"{label} [encrypted]: {len(msgs)} msgs "
f"[{'+'.join(msgs)}] in one record ({sz} bytes)")
if not has_enc_grouped:
failed(f"{label} [encrypted]: no multi-message "
f"encrypted record")
ok = False
return ok
def run_tls13_test(cipher_wolf, cert_chain, priv_key, label):
"""TLS 1.3: verify tlslite-ng sends multi-msg encrypted record and
wolfSSL client processes it."""
srv, port = _listen_socket()
result = {"ok": False, "error": ""}
def server():
try:
conn, _ = srv.accept()
conn.settimeout(15)
tls = TLSConnection(conn)
settings = HandshakeSettings()
settings.minVersion = (3, 4)
settings.maxVersion = (3, 4)
tls.handshakeServer(certChain=cert_chain, privateKey=priv_key,
settings=settings)
data = tls.recv(4096)
if data:
tls.send(data)
tls.close()
result["ok"] = True
except Exception as e:
result["error"] = str(e)
finally:
srv.close()
with _SendRecordTrace() as log:
st = threading.Thread(target=server, daemon=True)
st.start()
time.sleep(0.1)
proc = _run_wolf_client(port, "4", cipher_wolf)
st.join(timeout=5)
if proc.returncode != 0 or not result["ok"]:
err = result["error"] or proc.stderr.decode("utf-8", errors="replace")[:200]
failed(f"{label}: handshake failed ({err})")
return False
# Check that at least one encrypted handshake record has multiple messages
has_multi = False
for ct, enc, sz, msgs in log:
if ct == ContentType.handshake and enc and len(msgs) > 1:
has_multi = True
passed(f"{label}: {len(msgs)} encrypted msgs "
f"[{'+'.join(msgs)}] in one record ({sz} bytes)")
if not has_multi:
failed(f"{label}: no multi-message encrypted records")
return False
return True
# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------
def main():
if not os.path.isfile(WOLF_CLIENT):
print(f"ERROR: wolfSSL client not found: {WOLF_CLIENT}")
print(" Build wolfSSL first (./configure && make)")
sys.exit(1)
# Probe the client to see which features are compiled in so each
# phase of the test is only run when it can succeed.
feats = detect_wolf_features()
global WOLF_CA_CERT
WOLF_CA_CERT = feats["ca_cert"]
2026-04-10 16:32:35 +00:00
print("=" * 60)
print(" Multi-Message TLS Record Test")
print("=" * 60)
print(f" wolfSSL features: TLS1.2={feats['tls12']} "
f"TLS1.3={feats['tls13']} "
f"secure_reneg={feats['secure_reneg']} "
f"rsa={feats['rsa']}")
# The test certs are RSA; skip the whole test when the wolfSSL build
# has no RSA support (the client can't load or verify them).
if not feats["rsa"]:
print("\n wolfSSL built without RSA; skipping multi-msg-record "
"test (RSA test certs cannot be verified).")
sys.exit(77)
# Load certificate / key pairs
rsa_chain = _load_chain(os.path.join(CERT_DIR, "server-cert.pem"))
rsa_key = _load_key(os.path.join(CERT_DIR, "server-key.pem"))
2026-04-10 16:32:35 +00:00
# ------------------------------------------------------------------
# TLS 1.2 plaintext (initial HS) + optional encrypted (renegotiation)
# multi-message records, same connection per cipher suite.
# ------------------------------------------------------------------
tls12_suites = [
# (wolfSSL cipher name, description)
(None, "default negotiated"),
# AEAD (GCM)
("ECDHE-RSA-AES128-GCM-SHA256", "ECDHE-RSA AES128-GCM"),
("ECDHE-RSA-AES256-GCM-SHA384", "ECDHE-RSA AES256-GCM"),
("DHE-RSA-AES128-GCM-SHA256", "DHE-RSA AES128-GCM"),
("DHE-RSA-AES256-GCM-SHA384", "DHE-RSA AES256-GCM"),
# CBC + HMAC (exercises padding path)
("ECDHE-RSA-AES128-SHA256", "ECDHE-RSA AES128-CBC-SHA256"),
("ECDHE-RSA-AES256-SHA384", "ECDHE-RSA AES256-CBC-SHA384"),
("DHE-RSA-AES128-SHA256", "DHE-RSA AES128-CBC-SHA256"),
("DHE-RSA-AES256-SHA256", "DHE-RSA AES256-CBC-SHA256"),
# AEAD (ChaCha20-Poly1305)
("ECDHE-RSA-CHACHA20-POLY1305", "ECDHE-RSA CHACHA20-POLY1305"),
("DHE-RSA-CHACHA20-POLY1305", "DHE-RSA CHACHA20-POLY1305"),
]
if feats["tls12"]:
if feats["secure_reneg"]:
print("\n--- TLS 1.2: plaintext + encrypted multi-message "
"records ---")
print(" Each connection verifies BOTH code paths:")
print(" * initial handshake -> plaintext SH+Cert+SKE+SHD")
print(" * renegotiation -> encrypted SH+Cert+SKE+SHD")
else:
print("\n--- TLS 1.2: plaintext multi-message records ---")
print(" wolfSSL built without HAVE_SECURE_RENEGOTIATION;")
print(" skipping the encrypted (renegotiation) half.")
print(" Covers multiple key-exchanges, ciphers and MAC "
"families.\n")
for cipher, desc in tls12_suites:
if cipher and cipher not in feats["ciphers"]:
skipped(f"TLS1.2 {desc} - cipher not in wolfSSL build")
continue
2026-04-10 16:32:35 +00:00
run_tls12_test(cipher, rsa_chain, rsa_key,
f"TLS1.2 {desc}",
do_reneg=feats["secure_reneg"])
if not feats["secure_reneg"]:
skipped("TLS1.2 encrypted multi-msg record "
"(requires HAVE_SECURE_RENEGOTIATION)")
else:
skipped(f"TLS 1.2 tests ({len(tls12_suites)} suites) - "
"wolfSSL built without TLS 1.2")
# ------------------------------------------------------------------
# TLS 1.3 encrypted multi-message records
# ------------------------------------------------------------------
tls13_suites = [
# (wolfSSL cipher name, description)
(None, "default negotiated"),
("TLS13-AES128-GCM-SHA256", "AES-128-GCM"),
("TLS13-AES256-GCM-SHA384", "AES-256-GCM"),
("TLS13-CHACHA20-POLY1305-SHA256", "CHACHA20-POLY1305"),
]
if feats["tls13"]:
print("\n--- TLS 1.3: encrypted multi-message records ---")
print(" Server sends EE+Cert+CV+Fin in a single encrypted "
"record;")
print(" wolfSSL client must decrypt and parse.\n")
for cipher, desc in tls13_suites:
if cipher and cipher not in feats["ciphers"]:
skipped(f"TLS1.3 {desc} - cipher not in wolfSSL build")
continue
2026-04-10 16:32:35 +00:00
run_tls13_test(cipher, rsa_chain, rsa_key,
f"TLS1.3 {desc}")
else:
skipped(f"TLS 1.3 tests ({len(tls13_suites)} suites) - "
"wolfSSL built without TLS 1.3")
# ------------------------------------------------------------------
# Summary
# ------------------------------------------------------------------
print()
print("=" * 60)
print(f" Results: {PASS_COUNT} passed, {FAIL_COUNT} failed, "
f"{SKIP_COUNT} skipped")
print("=" * 60)
# If nothing at all could run, signal SKIP (exit 77) so automake
# records the test as skipped rather than passed-with-nothing.
if PASS_COUNT == 0 and FAIL_COUNT == 0:
sys.exit(77)
return FAIL_COUNT == 0
if __name__ == "__main__":
sys.exit(0 if main() else 1)