style: format python files with isort and double-quote-string-fixer

This commit is contained in:
Fu Hanxi
2021-01-26 10:49:01 +08:00
committed by Rocha Euripedes
parent 13f6635a51
commit 438b06bd6b
6 changed files with 285 additions and 287 deletions

View File

@@ -1,36 +1,35 @@
from __future__ import print_function
from __future__ import unicode_literals
from builtins import str
import re
import sys
import ssl
import paho.mqtt.client as mqtt
from threading import Thread, Event
import time
import string
from __future__ import print_function, unicode_literals
import random
import re
import ssl
import string
import sys
import time
from builtins import str
from threading import Event, Thread
from tiny_test_fw import DUT
import paho.mqtt.client as mqtt
import ttfw_idf
from tiny_test_fw import DUT
event_client_connected = Event()
event_stop_client = Event()
event_client_received_correct = Event()
message_log = ""
message_log = ''
broker_host = {}
broker_port = {}
expected_data = ""
subscribe_topic = ""
publish_topic = ""
expected_data = ''
subscribe_topic = ''
publish_topic = ''
expected_count = 0
# The callback for when the client receives a CONNACK response from the server.
def on_connect(client, userdata, flags, rc):
print("Connected with result code " + str(rc))
print('Connected with result code ' + str(rc))
event_client_connected.set()
client.subscribe("/topic/qos0")
client.subscribe('/topic/qos0')
def mqtt_client_task(client):
@@ -52,8 +51,8 @@ def on_message(client, userdata, msg):
payload = msg.payload.decode()
if payload == expected_data:
expected_count += 1
print("[{}] Received...".format(msg.mid))
message_log += "Received data:" + msg.topic + " " + payload + "\n"
print('[{}] Received...'.format(msg.mid))
message_log += 'Received data:' + msg.topic + ' ' + payload + '\n'
def test_single_config(dut, transport, qos, repeat, published, queue=0):
@@ -63,49 +62,49 @@ def test_single_config(dut, transport, qos, repeat, published, queue=0):
sample_string = ''.join(random.choice(string.ascii_uppercase + string.ascii_lowercase + string.digits) for _ in range(16))
event_client_connected.clear()
expected_count = 0
message_log = ""
message_log = ''
expected_data = sample_string * repeat
print("PUBLISH TEST: transport:{}, qos:{}, sequence:{}, enqueue:{}, sample msg:'{}'".format(transport, qos, published, queue, expected_data))
client = None
try:
if transport in ["ws", "wss"]:
client = mqtt.Client(transport="websockets")
if transport in ['ws', 'wss']:
client = mqtt.Client(transport='websockets')
else:
client = mqtt.Client()
client.on_connect = on_connect
client.on_message = on_message
if transport in ["ssl", "wss"]:
if transport in ['ssl', 'wss']:
client.tls_set(None, None, None, cert_reqs=ssl.CERT_NONE, tls_version=ssl.PROTOCOL_TLSv1_2, ciphers=None)
client.tls_insecure_set(True)
print("Connecting...")
print('Connecting...')
client.connect(broker_host[transport], broker_port[transport], 60)
except Exception:
print("ENV_TEST_FAILURE: Unexpected error while connecting to broker {}: {}:".format(broker_host[transport], sys.exc_info()[0]))
print('ENV_TEST_FAILURE: Unexpected error while connecting to broker {}: {}:'.format(broker_host[transport], sys.exc_info()[0]))
raise
# Starting a py-client in a separate thread
thread1 = Thread(target=mqtt_client_task, args=(client,))
thread1.start()
print("Connecting py-client to broker {}:{}...".format(broker_host[transport], broker_port[transport]))
print('Connecting py-client to broker {}:{}...'.format(broker_host[transport], broker_port[transport]))
if not event_client_connected.wait(timeout=30):
raise ValueError("ENV_TEST_FAILURE: Test script cannot connect to broker: {}".format(broker_host[transport]))
raise ValueError('ENV_TEST_FAILURE: Test script cannot connect to broker: {}'.format(broker_host[transport]))
client.subscribe(subscribe_topic, qos)
dut.write(' '.join(str(x) for x in (transport, sample_string, repeat, published, qos, queue)), eol="\n")
dut.write(' '.join(str(x) for x in (transport, sample_string, repeat, published, qos, queue)), eol='\n')
try:
# waiting till subscribed to defined topic
dut.expect(re.compile(r"MQTT_EVENT_SUBSCRIBED"), timeout=30)
dut.expect(re.compile(r'MQTT_EVENT_SUBSCRIBED'), timeout=30)
for i in range(published):
client.publish(publish_topic, sample_string * repeat, qos)
print("Publishing...")
print("Checking esp-client received msg published from py-client...")
dut.expect(re.compile(r"Correct pattern received exactly x times"), timeout=60)
print('Publishing...')
print('Checking esp-client received msg published from py-client...')
dut.expect(re.compile(r'Correct pattern received exactly x times'), timeout=60)
start = time.time()
while expected_count < published and time.time() - start <= 60:
time.sleep(1)
# Note: tolerate that messages qos=1 to be received more than once
if expected_count == published or (expected_count > published and qos == 1):
print("All data received from ESP32...")
print('All data received from ESP32...')
else:
raise ValueError("Not all data received from ESP32: Expected:{}x{}, Received:{}x{}".format(expected_count, published, expected_data, message_log))
raise ValueError('Not all data received from ESP32: Expected:{}x{}, Received:{}x{}'.format(expected_count, published, expected_data, message_log))
finally:
event_stop_client.set()
thread1.join()
@@ -113,7 +112,7 @@ def test_single_config(dut, transport, qos, repeat, published, queue=0):
event_stop_client.clear()
@ttfw_idf.idf_custom_test(env_tag="Example_WIFI")
@ttfw_idf.idf_custom_test(env_tag='Example_WIFI')
def test_weekend_mqtt_publish(env, extra_data):
# Using broker url dictionary for different transport
global broker_host
@@ -127,28 +126,28 @@ def test_weekend_mqtt_publish(env, extra_data):
3. Test evaluates python client received correct qos0 message
4. Test ESP32 client received correct qos0 message
"""
dut1 = env.get_dut("mqtt_publish_connect_test", "tools/test_apps/protocols/mqtt/publish_connect_test")
dut1 = env.get_dut('mqtt_publish_connect_test', 'tools/test_apps/protocols/mqtt/publish_connect_test')
# Look for host:port in sdkconfig
try:
# python client subscribes to the topic to which esp client publishes and vice versa
publish_topic = dut1.app.get_sdkconfig()["CONFIG_EXAMPLE_SUBSCIBE_TOPIC"].replace('"','')
subscribe_topic = dut1.app.get_sdkconfig()["CONFIG_EXAMPLE_PUBLISH_TOPIC"].replace('"','')
broker_host["ssl"], broker_port["ssl"] = get_host_port_from_dut(dut1, "CONFIG_EXAMPLE_BROKER_SSL_URI")
broker_host["tcp"], broker_port["tcp"] = get_host_port_from_dut(dut1, "CONFIG_EXAMPLE_BROKER_TCP_URI")
broker_host["ws"], broker_port["ws"] = get_host_port_from_dut(dut1, "CONFIG_EXAMPLE_BROKER_WS_URI")
broker_host["wss"], broker_port["wss"] = get_host_port_from_dut(dut1, "CONFIG_EXAMPLE_BROKER_WSS_URI")
publish_topic = dut1.app.get_sdkconfig()['CONFIG_EXAMPLE_SUBSCIBE_TOPIC'].replace('"','')
subscribe_topic = dut1.app.get_sdkconfig()['CONFIG_EXAMPLE_PUBLISH_TOPIC'].replace('"','')
broker_host['ssl'], broker_port['ssl'] = get_host_port_from_dut(dut1, 'CONFIG_EXAMPLE_BROKER_SSL_URI')
broker_host['tcp'], broker_port['tcp'] = get_host_port_from_dut(dut1, 'CONFIG_EXAMPLE_BROKER_TCP_URI')
broker_host['ws'], broker_port['ws'] = get_host_port_from_dut(dut1, 'CONFIG_EXAMPLE_BROKER_WS_URI')
broker_host['wss'], broker_port['wss'] = get_host_port_from_dut(dut1, 'CONFIG_EXAMPLE_BROKER_WSS_URI')
except Exception:
print('ENV_TEST_FAILURE: Cannot find broker url in sdkconfig')
raise
dut1.start_app()
try:
ip_address = dut1.expect(re.compile(r" IPv4 address: ([0-9]+\.[0-9]+\.[0-9]+\.[0-9]+)"), timeout=30)
print("Connected to AP with IP: {}".format(ip_address))
ip_address = dut1.expect(re.compile(r' IPv4 address: ([0-9]+\.[0-9]+\.[0-9]+\.[0-9]+)'), timeout=30)
print('Connected to AP with IP: {}'.format(ip_address))
except DUT.ExpectTimeout:
print('ENV_TEST_FAILURE: Cannot connect to AP')
raise
for qos in [0, 1, 2]:
for transport in ["tcp", "ssl", "ws", "wss"]:
for transport in ['tcp', 'ssl', 'ws', 'wss']:
for q in [0, 1]:
if broker_host[transport] is None:
print('Skipping transport: {}...'.format(transport))
@@ -156,14 +155,14 @@ def test_weekend_mqtt_publish(env, extra_data):
# simple test with empty message
test_single_config(dut1, transport, qos, 0, 5, q)
# decide on broker what level of test will pass (local broker works the best)
if broker_host[transport].startswith("192.168") and qos > 0 and q == 0:
if broker_host[transport].startswith('192.168') and qos > 0 and q == 0:
# medium size, medium repeated
test_single_config(dut1, transport, qos, 5, 50, q)
# long data
test_single_config(dut1, transport, qos, 1000, 10, q)
# short data, many repeats
test_single_config(dut1, transport, qos, 2, 200, q)
elif transport in ["ws", "wss"]:
elif transport in ['ws', 'wss']:
# more relaxed criteria for websockets!
test_single_config(dut1, transport, qos, 2, 5, q)
test_single_config(dut1, transport, qos, 50, 1, q)

View File

@@ -1,29 +1,28 @@
from __future__ import print_function
from __future__ import unicode_literals
from builtins import str
import re
from __future__ import print_function, unicode_literals
import os
import sys
import re
import ssl
import sys
from builtins import str
from threading import Event, Thread
import paho.mqtt.client as mqtt
from threading import Thread, Event
from tiny_test_fw import DUT
import ttfw_idf
from tiny_test_fw import DUT
event_client_connected = Event()
event_stop_client = Event()
event_client_received_correct = Event()
event_client_received_binary = Event()
message_log = ""
message_log = ''
# The callback for when the client receives a CONNACK response from the server.
def on_connect(client, userdata, flags, rc):
print("Connected with result code " + str(rc))
print('Connected with result code ' + str(rc))
event_client_connected.set()
client.subscribe("/topic/qos0")
client.subscribe('/topic/qos0')
def mqtt_client_task(client):
@@ -36,33 +35,33 @@ def on_message(client, userdata, msg):
global message_log
global event_client_received_correct
global event_client_received_binary
if msg.topic == "/topic/binary":
if msg.topic == '/topic/binary':
binary = userdata
size = os.path.getsize(binary)
print("Receiving binary from esp and comparing with {}, size {}...".format(binary, size))
with open(binary, "rb") as f:
print('Receiving binary from esp and comparing with {}, size {}...'.format(binary, size))
with open(binary, 'rb') as f:
bin = f.read()
if bin == msg.payload[:size]:
print("...matches!")
print('...matches!')
event_client_received_binary.set()
return
else:
recv_binary = binary + ".received"
with open(recv_binary, "w") as fw:
recv_binary = binary + '.received'
with open(recv_binary, 'w') as fw:
fw.write(msg.payload)
raise ValueError('Received binary (saved as: {}) does not match the original file: {}'.format(recv_binary, binary))
payload = msg.payload.decode()
if not event_client_received_correct.is_set() and payload == "data":
client.subscribe("/topic/binary")
client.publish("/topic/qos0", "send binary please")
if msg.topic == "/topic/qos0" and payload == "data":
if not event_client_received_correct.is_set() and payload == 'data':
client.subscribe('/topic/binary')
client.publish('/topic/qos0', 'send binary please')
if msg.topic == '/topic/qos0' and payload == 'data':
event_client_received_correct.set()
message_log += "Received data:" + msg.topic + " " + payload + "\n"
message_log += 'Received data:' + msg.topic + ' ' + payload + '\n'
@ttfw_idf.idf_example_test(env_tag="Example_WIFI")
@ttfw_idf.idf_example_test(env_tag='Example_WIFI')
def test_examples_protocol_mqtt_ssl(env, extra_data):
broker_url = ""
broker_url = ''
broker_port = 0
"""
steps: |
@@ -72,15 +71,15 @@ def test_examples_protocol_mqtt_ssl(env, extra_data):
4. Test ESP32 client received correct qos0 message
5. Test python client receives binary data from running partition and compares it with the binary
"""
dut1 = env.get_dut("mqtt_ssl", "examples/protocols/mqtt/ssl", dut_class=ttfw_idf.ESP32DUT)
dut1 = env.get_dut('mqtt_ssl', 'examples/protocols/mqtt/ssl', dut_class=ttfw_idf.ESP32DUT)
# check and log bin size
binary_file = os.path.join(dut1.app.binary_path, "mqtt_ssl.bin")
binary_file = os.path.join(dut1.app.binary_path, 'mqtt_ssl.bin')
bin_size = os.path.getsize(binary_file)
ttfw_idf.log_performance("mqtt_ssl_bin_size", "{}KB"
ttfw_idf.log_performance('mqtt_ssl_bin_size', '{}KB'
.format(bin_size // 1024))
# Look for host:port in sdkconfig
try:
value = re.search(r'\:\/\/([^:]+)\:([0-9]+)', dut1.app.get_sdkconfig()["CONFIG_BROKER_URI"])
value = re.search(r'\:\/\/([^:]+)\:([0-9]+)', dut1.app.get_sdkconfig()['CONFIG_BROKER_URI'])
broker_url = value.group(1)
broker_port = int(value.group(2))
except Exception:
@@ -97,31 +96,31 @@ def test_examples_protocol_mqtt_ssl(env, extra_data):
None,
None, cert_reqs=ssl.CERT_NONE, tls_version=ssl.PROTOCOL_TLSv1_2, ciphers=None)
client.tls_insecure_set(True)
print("Connecting...")
print('Connecting...')
client.connect(broker_url, broker_port, 60)
except Exception:
print("ENV_TEST_FAILURE: Unexpected error while connecting to broker {}: {}:".format(broker_url, sys.exc_info()[0]))
print('ENV_TEST_FAILURE: Unexpected error while connecting to broker {}: {}:'.format(broker_url, sys.exc_info()[0]))
raise
# Starting a py-client in a separate thread
thread1 = Thread(target=mqtt_client_task, args=(client,))
thread1.start()
try:
print("Connecting py-client to broker {}:{}...".format(broker_url, broker_port))
print('Connecting py-client to broker {}:{}...'.format(broker_url, broker_port))
if not event_client_connected.wait(timeout=30):
raise ValueError("ENV_TEST_FAILURE: Test script cannot connect to broker: {}".format(broker_url))
raise ValueError('ENV_TEST_FAILURE: Test script cannot connect to broker: {}'.format(broker_url))
dut1.start_app()
try:
ip_address = dut1.expect(re.compile(r" sta ip: ([^,]+),"), timeout=30)
print("Connected to AP with IP: {}".format(ip_address))
ip_address = dut1.expect(re.compile(r' sta ip: ([^,]+),'), timeout=30)
print('Connected to AP with IP: {}'.format(ip_address))
except DUT.ExpectTimeout:
print('ENV_TEST_FAILURE: Cannot connect to AP')
raise
print("Checking py-client received msg published from esp...")
print('Checking py-client received msg published from esp...')
if not event_client_received_correct.wait(timeout=30):
raise ValueError('Wrong data received, msg log: {}'.format(message_log))
print("Checking esp-client received msg published from py-client...")
dut1.expect(re.compile(r"DATA=send binary please"), timeout=30)
print("Receiving binary data from running partition...")
print('Checking esp-client received msg published from py-client...')
dut1.expect(re.compile(r'DATA=send binary please'), timeout=30)
print('Receiving binary data from running partition...')
if not event_client_received_binary.wait(timeout=30):
raise ValueError('Binary not received within timeout')
finally:

View File

@@ -1,20 +1,20 @@
import re
import os
import sys
import re
import socket
from threading import Thread
import struct
import sys
import time
from threading import Thread
from tiny_test_fw import DUT
import ttfw_idf
from tiny_test_fw import DUT
msgid = -1
def get_my_ip():
s1 = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s1.connect(("8.8.8.8", 80))
s1.connect(('8.8.8.8', 80))
my_ip = s1.getsockname()[0]
s1.close()
return my_ip
@@ -22,7 +22,7 @@ def get_my_ip():
def mqqt_server_sketch(my_ip, port):
global msgid
print("Starting the server on {}".format(my_ip))
print('Starting the server on {}'.format(my_ip))
s = None
try:
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
@@ -31,29 +31,29 @@ def mqqt_server_sketch(my_ip, port):
s.listen(1)
q,addr = s.accept()
q.settimeout(30)
print("connection accepted")
print('connection accepted')
except Exception:
print("Local server on {}:{} listening/accepting failure: {}"
"Possibly check permissions or firewall settings"
"to accept connections on this address".format(my_ip, port, sys.exc_info()[0]))
print('Local server on {}:{} listening/accepting failure: {}'
'Possibly check permissions or firewall settings'
'to accept connections on this address'.format(my_ip, port, sys.exc_info()[0]))
raise
data = q.recv(1024)
# check if received initial empty message
print("received from client {}".format(data))
print('received from client {}'.format(data))
data = bytearray([0x20, 0x02, 0x00, 0x00])
q.send(data)
# try to receive qos1
data = q.recv(1024)
msgid = struct.unpack(">H", data[15:17])[0]
print("received from client {}, msgid: {}".format(data, msgid))
msgid = struct.unpack('>H', data[15:17])[0]
print('received from client {}, msgid: {}'.format(data, msgid))
data = bytearray([0x40, 0x02, data[15], data[16]])
q.send(data)
time.sleep(5)
s.close()
print("server closed")
print('server closed')
@ttfw_idf.idf_example_test(env_tag="Example_WIFI")
@ttfw_idf.idf_example_test(env_tag='Example_WIFI')
def test_examples_protocol_mqtt_qos1(env, extra_data):
global msgid
"""
@@ -63,11 +63,11 @@ def test_examples_protocol_mqtt_qos1(env, extra_data):
3. Test evaluates that qos1 message is queued and removed from queued after ACK received
4. Test the broker received the same message id evaluated in step 3
"""
dut1 = env.get_dut("mqtt_tcp", "examples/protocols/mqtt/tcp", dut_class=ttfw_idf.ESP32DUT)
dut1 = env.get_dut('mqtt_tcp', 'examples/protocols/mqtt/tcp', dut_class=ttfw_idf.ESP32DUT)
# check and log bin size
binary_file = os.path.join(dut1.app.binary_path, "mqtt_tcp.bin")
binary_file = os.path.join(dut1.app.binary_path, 'mqtt_tcp.bin')
bin_size = os.path.getsize(binary_file)
ttfw_idf.log_performance("mqtt_tcp_bin_size", "{}KB".format(bin_size // 1024))
ttfw_idf.log_performance('mqtt_tcp_bin_size', '{}KB'.format(bin_size // 1024))
# 1. start mqtt broker sketch
host_ip = get_my_ip()
thread1 = Thread(target=mqqt_server_sketch, args=(host_ip,1883))
@@ -76,23 +76,23 @@ def test_examples_protocol_mqtt_qos1(env, extra_data):
dut1.start_app()
# waiting for getting the IP address
try:
ip_address = dut1.expect(re.compile(r" sta ip: ([^,]+),"), timeout=30)
print("Connected to AP with IP: {}".format(ip_address))
ip_address = dut1.expect(re.compile(r' sta ip: ([^,]+),'), timeout=30)
print('Connected to AP with IP: {}'.format(ip_address))
except DUT.ExpectTimeout:
raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP')
print("writing to device: {}".format("mqtt://" + host_ip + "\n"))
dut1.write("mqtt://" + host_ip + "\n")
print('writing to device: {}'.format('mqtt://' + host_ip + '\n'))
dut1.write('mqtt://' + host_ip + '\n')
thread1.join()
print("Message id received from server: {}".format(msgid))
print('Message id received from server: {}'.format(msgid))
# 3. check the message id was enqueued and then deleted
msgid_enqueued = dut1.expect(re.compile(r"OUTBOX: ENQUEUE msgid=([0-9]+)"), timeout=30)
msgid_deleted = dut1.expect(re.compile(r"OUTBOX: DELETED msgid=([0-9]+)"), timeout=30)
msgid_enqueued = dut1.expect(re.compile(r'OUTBOX: ENQUEUE msgid=([0-9]+)'), timeout=30)
msgid_deleted = dut1.expect(re.compile(r'OUTBOX: DELETED msgid=([0-9]+)'), timeout=30)
# 4. check the msgid of received data are the same as that of enqueued and deleted from outbox
if (msgid_enqueued[0] == str(msgid) and msgid_deleted[0] == str(msgid)):
print("PASS: Received correct msg id")
print('PASS: Received correct msg id')
else:
print("Failure!")
print('Failure!')
raise ValueError('Mismatch of msgid: received: {}, enqueued {}, deleted {}'.format(msgid, msgid_enqueued, msgid_deleted))

View File

@@ -1,26 +1,26 @@
from __future__ import print_function
from __future__ import unicode_literals
from builtins import str
import re
import os
import sys
import paho.mqtt.client as mqtt
from threading import Thread, Event
from __future__ import print_function, unicode_literals
from tiny_test_fw import DUT
import os
import re
import sys
from builtins import str
from threading import Event, Thread
import paho.mqtt.client as mqtt
import ttfw_idf
from tiny_test_fw import DUT
event_client_connected = Event()
event_stop_client = Event()
event_client_received_correct = Event()
message_log = ""
message_log = ''
# The callback for when the client receives a CONNACK response from the server.
def on_connect(client, userdata, flags, rc):
print("Connected with result code " + str(rc))
print('Connected with result code ' + str(rc))
event_client_connected.set()
client.subscribe("/topic/qos0")
client.subscribe('/topic/qos0')
def mqtt_client_task(client):
@@ -32,16 +32,16 @@ def mqtt_client_task(client):
def on_message(client, userdata, msg):
global message_log
payload = msg.payload.decode()
if not event_client_received_correct.is_set() and payload == "data":
client.publish("/topic/qos0", "data_to_esp32")
if msg.topic == "/topic/qos0" and payload == "data":
if not event_client_received_correct.is_set() and payload == 'data':
client.publish('/topic/qos0', 'data_to_esp32')
if msg.topic == '/topic/qos0' and payload == 'data':
event_client_received_correct.set()
message_log += "Received data:" + msg.topic + " " + payload + "\n"
message_log += 'Received data:' + msg.topic + ' ' + payload + '\n'
@ttfw_idf.idf_example_test(env_tag="Example_WIFI")
@ttfw_idf.idf_example_test(env_tag='Example_WIFI')
def test_examples_protocol_mqtt_ws(env, extra_data):
broker_url = ""
broker_url = ''
broker_port = 0
"""
steps: |
@@ -50,14 +50,14 @@ def test_examples_protocol_mqtt_ws(env, extra_data):
3. Test evaluates it received correct qos0 message
4. Test ESP32 client received correct qos0 message
"""
dut1 = env.get_dut("mqtt_websocket", "examples/protocols/mqtt/ws", dut_class=ttfw_idf.ESP32DUT)
dut1 = env.get_dut('mqtt_websocket', 'examples/protocols/mqtt/ws', dut_class=ttfw_idf.ESP32DUT)
# check and log bin size
binary_file = os.path.join(dut1.app.binary_path, "mqtt_websocket.bin")
binary_file = os.path.join(dut1.app.binary_path, 'mqtt_websocket.bin')
bin_size = os.path.getsize(binary_file)
ttfw_idf.log_performance("mqtt_websocket_bin_size", "{}KB".format(bin_size // 1024))
ttfw_idf.log_performance('mqtt_websocket_bin_size', '{}KB'.format(bin_size // 1024))
# Look for host:port in sdkconfig
try:
value = re.search(r'\:\/\/([^:]+)\:([0-9]+)', dut1.app.get_sdkconfig()["CONFIG_BROKER_URI"])
value = re.search(r'\:\/\/([^:]+)\:([0-9]+)', dut1.app.get_sdkconfig()['CONFIG_BROKER_URI'])
broker_url = value.group(1)
broker_port = int(value.group(2))
except Exception:
@@ -66,33 +66,33 @@ def test_examples_protocol_mqtt_ws(env, extra_data):
client = None
# 1. Test connects to a broker
try:
client = mqtt.Client(transport="websockets")
client = mqtt.Client(transport='websockets')
client.on_connect = on_connect
client.on_message = on_message
print("Connecting...")
print('Connecting...')
client.connect(broker_url, broker_port, 60)
except Exception:
print("ENV_TEST_FAILURE: Unexpected error while connecting to broker {}: {}:".format(broker_url, sys.exc_info()[0]))
print('ENV_TEST_FAILURE: Unexpected error while connecting to broker {}: {}:'.format(broker_url, sys.exc_info()[0]))
raise
# Starting a py-client in a separate thread
thread1 = Thread(target=mqtt_client_task, args=(client,))
thread1.start()
try:
print("Connecting py-client to broker {}:{}...".format(broker_url, broker_port))
print('Connecting py-client to broker {}:{}...'.format(broker_url, broker_port))
if not event_client_connected.wait(timeout=30):
raise ValueError("ENV_TEST_FAILURE: Test script cannot connect to broker: {}".format(broker_url))
raise ValueError('ENV_TEST_FAILURE: Test script cannot connect to broker: {}'.format(broker_url))
dut1.start_app()
try:
ip_address = dut1.expect(re.compile(r" sta ip: ([^,]+),"), timeout=30)
print("Connected to AP with IP: {}".format(ip_address))
ip_address = dut1.expect(re.compile(r' sta ip: ([^,]+),'), timeout=30)
print('Connected to AP with IP: {}'.format(ip_address))
except DUT.ExpectTimeout:
print('ENV_TEST_FAILURE: Cannot connect to AP')
raise
print("Checking py-client received msg published from esp...")
print('Checking py-client received msg published from esp...')
if not event_client_received_correct.wait(timeout=30):
raise ValueError('Wrong data received, msg log: {}'.format(message_log))
print("Checking esp-client received msg published from py-client...")
dut1.expect(re.compile(r"DATA=data_to_esp32"), timeout=30)
print('Checking esp-client received msg published from py-client...')
dut1.expect(re.compile(r'DATA=data_to_esp32'), timeout=30)
finally:
event_stop_client.set()
thread1.join()

View File

@@ -1,28 +1,27 @@
from __future__ import unicode_literals
from __future__ import unicode_literals
from builtins import str
import re
import os
import sys
import re
import ssl
import sys
from builtins import str
from threading import Event, Thread
import paho.mqtt.client as mqtt
from threading import Thread, Event
from tiny_test_fw import DUT
import ttfw_idf
from tiny_test_fw import DUT
event_client_connected = Event()
event_stop_client = Event()
event_client_received_correct = Event()
message_log = ""
message_log = ''
# The callback for when the client receives a CONNACK response from the server.
def on_connect(client, userdata, flags, rc):
print("Connected with result code " + str(rc))
print('Connected with result code ' + str(rc))
event_client_connected.set()
client.subscribe("/topic/qos0")
client.subscribe('/topic/qos0')
def mqtt_client_task(client):
@@ -34,16 +33,16 @@ def mqtt_client_task(client):
def on_message(client, userdata, msg):
global message_log
payload = msg.payload.decode()
if not event_client_received_correct.is_set() and payload == "data":
client.publish("/topic/qos0", "data_to_esp32")
if msg.topic == "/topic/qos0" and payload == "data":
if not event_client_received_correct.is_set() and payload == 'data':
client.publish('/topic/qos0', 'data_to_esp32')
if msg.topic == '/topic/qos0' and payload == 'data':
event_client_received_correct.set()
message_log += "Received data:" + msg.topic + " " + payload + "\n"
message_log += 'Received data:' + msg.topic + ' ' + payload + '\n'
@ttfw_idf.idf_example_test(env_tag="Example_WIFI")
@ttfw_idf.idf_example_test(env_tag='Example_WIFI')
def test_examples_protocol_mqtt_wss(env, extra_data):
broker_url = ""
broker_url = ''
broker_port = 0
"""
steps: |
@@ -52,14 +51,14 @@ def test_examples_protocol_mqtt_wss(env, extra_data):
3. Test evaluates it received correct qos0 message
4. Test ESP32 client received correct qos0 message
"""
dut1 = env.get_dut("mqtt_websocket_secure", "examples/protocols/mqtt/wss", dut_class=ttfw_idf.ESP32DUT)
dut1 = env.get_dut('mqtt_websocket_secure', 'examples/protocols/mqtt/wss', dut_class=ttfw_idf.ESP32DUT)
# check and log bin size
binary_file = os.path.join(dut1.app.binary_path, "mqtt_websocket_secure.bin")
binary_file = os.path.join(dut1.app.binary_path, 'mqtt_websocket_secure.bin')
bin_size = os.path.getsize(binary_file)
ttfw_idf.log_performance("mqtt_websocket_secure_bin_size", "{}KB".format(bin_size // 1024))
ttfw_idf.log_performance('mqtt_websocket_secure_bin_size', '{}KB'.format(bin_size // 1024))
# Look for host:port in sdkconfig
try:
value = re.search(r'\:\/\/([^:]+)\:([0-9]+)', dut1.app.get_sdkconfig()["CONFIG_BROKER_URI"])
value = re.search(r'\:\/\/([^:]+)\:([0-9]+)', dut1.app.get_sdkconfig()['CONFIG_BROKER_URI'])
broker_url = value.group(1)
broker_port = int(value.group(2))
except Exception:
@@ -68,36 +67,36 @@ def test_examples_protocol_mqtt_wss(env, extra_data):
client = None
# 1. Test connects to a broker
try:
client = mqtt.Client(transport="websockets")
client = mqtt.Client(transport='websockets')
client.on_connect = on_connect
client.on_message = on_message
client.tls_set(None,
None,
None, cert_reqs=ssl.CERT_NONE, tls_version=ssl.PROTOCOL_TLSv1_2, ciphers=None)
print("Connecting...")
print('Connecting...')
client.connect(broker_url, broker_port, 60)
except Exception:
print("ENV_TEST_FAILURE: Unexpected error while connecting to broker {}: {}:".format(broker_url, sys.exc_info()[0]))
print('ENV_TEST_FAILURE: Unexpected error while connecting to broker {}: {}:'.format(broker_url, sys.exc_info()[0]))
raise
# Starting a py-client in a separate thread
thread1 = Thread(target=mqtt_client_task, args=(client,))
thread1.start()
try:
print("Connecting py-client to broker {}:{}...".format(broker_url, broker_port))
print('Connecting py-client to broker {}:{}...'.format(broker_url, broker_port))
if not event_client_connected.wait(timeout=30):
raise ValueError("ENV_TEST_FAILURE: Test script cannot connect to broker: {}".format(broker_url))
raise ValueError('ENV_TEST_FAILURE: Test script cannot connect to broker: {}'.format(broker_url))
dut1.start_app()
try:
ip_address = dut1.expect(re.compile(r" sta ip: ([^,]+),"), timeout=30)
print("Connected to AP with IP: {}".format(ip_address))
ip_address = dut1.expect(re.compile(r' sta ip: ([^,]+),'), timeout=30)
print('Connected to AP with IP: {}'.format(ip_address))
except DUT.ExpectTimeout:
print('ENV_TEST_FAILURE: Cannot connect to AP')
raise
print("Checking py-client received msg published from esp...")
print('Checking py-client received msg published from esp...')
if not event_client_received_correct.wait(timeout=30):
raise ValueError('Wrong data received, msg log: {}'.format(message_log))
print("Checking esp-client received msg published from py-client...")
dut1.expect(re.compile(r"DATA=data_to_esp32"), timeout=30)
print('Checking esp-client received msg published from py-client...')
dut1.expect(re.compile(r'DATA=data_to_esp32'), timeout=30)
finally:
event_stop_client.set()
thread1.join()

View File

@@ -1,16 +1,17 @@
from __future__ import print_function
from __future__ import unicode_literals
import re
from __future__ import print_function, unicode_literals
import os
import socket
import select
import subprocess
from threading import Thread, Event
import ttfw_idf
import ssl
import paho.mqtt.client as mqtt
import string
import random
import re
import select
import socket
import ssl
import string
import subprocess
from threading import Event, Thread
import paho.mqtt.client as mqtt
import ttfw_idf
DEFAULT_MSG_SIZE = 16
@@ -21,12 +22,12 @@ def _path(f):
def set_server_cert_cn(ip):
arg_list = [
['openssl', 'req', '-out', _path('srv.csr'), '-key', _path('server.key'),'-subj', "/CN={}".format(ip), '-new'],
['openssl', 'req', '-out', _path('srv.csr'), '-key', _path('server.key'),'-subj', '/CN={}'.format(ip), '-new'],
['openssl', 'x509', '-req', '-in', _path('srv.csr'), '-CA', _path('ca.crt'),
'-CAkey', _path('ca.key'), '-CAcreateserial', '-out', _path('srv.crt'), '-days', '360']]
for args in arg_list:
if subprocess.check_call(args) != 0:
raise("openssl command {} failed".format(args))
raise('openssl command {} failed'.format(args))
def get_my_ip():
@@ -54,9 +55,9 @@ class MqttPublisher:
self.log_details = log_details
self.repeat = repeat
self.publish_cfg = publish_cfg
self.publish_cfg["qos"] = qos
self.publish_cfg["queue"] = queue
self.publish_cfg["transport"] = transport
self.publish_cfg['qos'] = qos
self.publish_cfg['queue'] = queue
self.publish_cfg['transport'] = transport
# static variables used to pass options to and from static callbacks of paho-mqtt client
MqttPublisher.event_client_connected = Event()
MqttPublisher.event_client_got_all = Event()
@@ -90,52 +91,52 @@ class MqttPublisher:
def __enter__(self):
qos = self.publish_cfg["qos"]
queue = self.publish_cfg["queue"]
transport = self.publish_cfg["transport"]
broker_host = self.publish_cfg["broker_host_" + transport]
broker_port = self.publish_cfg["broker_port_" + transport]
qos = self.publish_cfg['qos']
queue = self.publish_cfg['queue']
transport = self.publish_cfg['transport']
broker_host = self.publish_cfg['broker_host_' + transport]
broker_port = self.publish_cfg['broker_port_' + transport]
# Start the test
self.print_details("PUBLISH TEST: transport:{}, qos:{}, sequence:{}, enqueue:{}, sample msg:'{}'"
.format(transport, qos, MqttPublisher.published, queue, MqttPublisher.expected_data))
try:
if transport in ["ws", "wss"]:
self.client = mqtt.Client(transport="websockets")
if transport in ['ws', 'wss']:
self.client = mqtt.Client(transport='websockets')
else:
self.client = mqtt.Client()
self.client.on_connect = MqttPublisher.on_connect
self.client.on_message = MqttPublisher.on_message
self.client.user_data_set(0)
if transport in ["ssl", "wss"]:
if transport in ['ssl', 'wss']:
self.client.tls_set(None, None, None, cert_reqs=ssl.CERT_NONE, tls_version=ssl.PROTOCOL_TLSv1_2, ciphers=None)
self.client.tls_insecure_set(True)
self.print_details("Connecting...")
self.print_details('Connecting...')
self.client.connect(broker_host, broker_port, 60)
except Exception:
self.print_details("ENV_TEST_FAILURE: Unexpected error while connecting to broker {}".format(broker_host))
self.print_details('ENV_TEST_FAILURE: Unexpected error while connecting to broker {}'.format(broker_host))
raise
# Starting a py-client in a separate thread
thread1 = Thread(target=self.mqtt_client_task, args=(self.client,))
thread1.start()
self.print_details("Connecting py-client to broker {}:{}...".format(broker_host, broker_port))
self.print_details('Connecting py-client to broker {}:{}...'.format(broker_host, broker_port))
if not MqttPublisher.event_client_connected.wait(timeout=30):
raise ValueError("ENV_TEST_FAILURE: Test script cannot connect to broker: {}".format(broker_host))
self.client.subscribe(self.publish_cfg["subscribe_topic"], qos)
self.dut.write(' '.join(str(x) for x in (transport, self.sample_string, self.repeat, MqttPublisher.published, qos, queue)), eol="\n")
raise ValueError('ENV_TEST_FAILURE: Test script cannot connect to broker: {}'.format(broker_host))
self.client.subscribe(self.publish_cfg['subscribe_topic'], qos)
self.dut.write(' '.join(str(x) for x in (transport, self.sample_string, self.repeat, MqttPublisher.published, qos, queue)), eol='\n')
try:
# waiting till subscribed to defined topic
self.dut.expect(re.compile(r"MQTT_EVENT_SUBSCRIBED"), timeout=30)
self.dut.expect(re.compile(r'MQTT_EVENT_SUBSCRIBED'), timeout=30)
for _ in range(MqttPublisher.published):
self.client.publish(self.publish_cfg["publish_topic"], self.sample_string * self.repeat, qos)
self.print_details("Publishing...")
self.print_details("Checking esp-client received msg published from py-client...")
self.dut.expect(re.compile(r"Correct pattern received exactly x times"), timeout=60)
self.client.publish(self.publish_cfg['publish_topic'], self.sample_string * self.repeat, qos)
self.print_details('Publishing...')
self.print_details('Checking esp-client received msg published from py-client...')
self.dut.expect(re.compile(r'Correct pattern received exactly x times'), timeout=60)
if not MqttPublisher.event_client_got_all.wait(timeout=60):
raise ValueError("Not all data received from ESP32")
print(" - all data received from ESP32")
raise ValueError('Not all data received from ESP32')
print(' - all data received from ESP32')
finally:
self.event_stop_client.set()
thread1.join()
@@ -164,7 +165,7 @@ class TlsServer:
try:
self.socket.bind(('', self.port))
except socket.error as e:
print("Bind failed:{}".format(e))
print('Bind failed:{}'.format(e))
raise
self.socket.listen(1)
@@ -190,23 +191,23 @@ class TlsServer:
context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
if self.client_cert:
context.verify_mode = ssl.CERT_REQUIRED
context.load_verify_locations(cafile=_path("ca.crt"))
context.load_cert_chain(certfile=_path("srv.crt"), keyfile=_path("server.key"))
context.load_verify_locations(cafile=_path('ca.crt'))
context.load_cert_chain(certfile=_path('srv.crt'), keyfile=_path('server.key'))
if self.use_alpn:
context.set_alpn_protocols(["mymqtt", "http/1.1"])
context.set_alpn_protocols(['mymqtt', 'http/1.1'])
self.socket = context.wrap_socket(self.socket, server_side=True)
try:
self.conn, address = self.socket.accept() # accept new connection
self.socket.settimeout(10.0)
print(" - connection from: {}".format(address))
print(' - connection from: {}'.format(address))
if self.use_alpn:
self.negotiated_protocol = self.conn.selected_alpn_protocol()
print(" - negotiated_protocol: {}".format(self.negotiated_protocol))
print(' - negotiated_protocol: {}'.format(self.negotiated_protocol))
self.handle_conn()
except ssl.SSLError as e:
self.conn = None
self.ssl_error = str(e)
print(" - SSLError: {}".format(str(e)))
print(' - SSLError: {}'.format(str(e)))
def handle_conn(self):
while not self.shutdown.is_set():
@@ -216,7 +217,7 @@ class TlsServer:
self.process_mqtt_connect()
except socket.error as err:
print(" - error: {}".format(err))
print(' - error: {}'.format(err))
raise
def process_mqtt_connect(self):
@@ -225,20 +226,20 @@ class TlsServer:
message = ''.join(format(x, '02x') for x in data)
if message[0:16] == '101800044d515454':
if self.refuse_connection is False:
print(" - received mqtt connect, sending ACK")
self.conn.send(bytearray.fromhex("20020000"))
print(' - received mqtt connect, sending ACK')
self.conn.send(bytearray.fromhex('20020000'))
else:
# injecting connection not authorized error
print(" - received mqtt connect, sending NAK")
self.conn.send(bytearray.fromhex("20020005"))
print(' - received mqtt connect, sending NAK')
self.conn.send(bytearray.fromhex('20020005'))
else:
raise Exception(" - error process_mqtt_connect unexpected connect received: {}".format(message))
raise Exception(' - error process_mqtt_connect unexpected connect received: {}'.format(message))
finally:
# stop the server after the connect message in happy flow, or if any exception occur
self.shutdown.set()
@ttfw_idf.idf_custom_test(env_tag="Example_WIFI", group="test-apps")
@ttfw_idf.idf_custom_test(env_tag='Example_WIFI', group='test-apps')
def test_app_protocol_mqtt_publish_connect(env, extra_data):
"""
steps:
@@ -246,11 +247,11 @@ def test_app_protocol_mqtt_publish_connect(env, extra_data):
2. connect to uri specified in the config
3. send and receive data
"""
dut1 = env.get_dut("mqtt_publish_connect_test", "tools/test_apps/protocols/mqtt/publish_connect_test", dut_class=ttfw_idf.ESP32DUT)
dut1 = env.get_dut('mqtt_publish_connect_test', 'tools/test_apps/protocols/mqtt/publish_connect_test', dut_class=ttfw_idf.ESP32DUT)
# check and log bin size
binary_file = os.path.join(dut1.app.binary_path, "mqtt_publish_connect_test.bin")
binary_file = os.path.join(dut1.app.binary_path, 'mqtt_publish_connect_test.bin')
bin_size = os.path.getsize(binary_file)
ttfw_idf.log_performance("mqtt_publish_connect_test_bin_size", "{}KB".format(bin_size // 1024))
ttfw_idf.log_performance('mqtt_publish_connect_test_bin_size', '{}KB'.format(bin_size // 1024))
# Look for test case symbolic names and publish configs
cases = {}
@@ -263,30 +264,30 @@ def test_app_protocol_mqtt_publish_connect(env, extra_data):
return value.group(1), int(value.group(2))
# Get connection test cases configuration: symbolic names for test cases
for i in ["CONFIG_EXAMPLE_CONNECT_CASE_NO_CERT",
"CONFIG_EXAMPLE_CONNECT_CASE_SERVER_CERT",
"CONFIG_EXAMPLE_CONNECT_CASE_MUTUAL_AUTH",
"CONFIG_EXAMPLE_CONNECT_CASE_INVALID_SERVER_CERT",
"CONFIG_EXAMPLE_CONNECT_CASE_SERVER_DER_CERT",
"CONFIG_EXAMPLE_CONNECT_CASE_MUTUAL_AUTH_KEY_PWD",
"CONFIG_EXAMPLE_CONNECT_CASE_MUTUAL_AUTH_BAD_CRT",
"CONFIG_EXAMPLE_CONNECT_CASE_NO_CERT_ALPN"]:
for i in ['CONFIG_EXAMPLE_CONNECT_CASE_NO_CERT',
'CONFIG_EXAMPLE_CONNECT_CASE_SERVER_CERT',
'CONFIG_EXAMPLE_CONNECT_CASE_MUTUAL_AUTH',
'CONFIG_EXAMPLE_CONNECT_CASE_INVALID_SERVER_CERT',
'CONFIG_EXAMPLE_CONNECT_CASE_SERVER_DER_CERT',
'CONFIG_EXAMPLE_CONNECT_CASE_MUTUAL_AUTH_KEY_PWD',
'CONFIG_EXAMPLE_CONNECT_CASE_MUTUAL_AUTH_BAD_CRT',
'CONFIG_EXAMPLE_CONNECT_CASE_NO_CERT_ALPN']:
cases[i] = dut1.app.get_sdkconfig()[i]
# Get publish test configuration
publish_cfg["publish_topic"] = dut1.app.get_sdkconfig()["CONFIG_EXAMPLE_SUBSCIBE_TOPIC"].replace('"','')
publish_cfg["subscribe_topic"] = dut1.app.get_sdkconfig()["CONFIG_EXAMPLE_PUBLISH_TOPIC"].replace('"','')
publish_cfg["broker_host_ssl"], publish_cfg["broker_port_ssl"] = get_host_port_from_dut(dut1, "CONFIG_EXAMPLE_BROKER_SSL_URI")
publish_cfg["broker_host_tcp"], publish_cfg["broker_port_tcp"] = get_host_port_from_dut(dut1, "CONFIG_EXAMPLE_BROKER_TCP_URI")
publish_cfg["broker_host_ws"], publish_cfg["broker_port_ws"] = get_host_port_from_dut(dut1, "CONFIG_EXAMPLE_BROKER_WS_URI")
publish_cfg["broker_host_wss"], publish_cfg["broker_port_wss"] = get_host_port_from_dut(dut1, "CONFIG_EXAMPLE_BROKER_WSS_URI")
publish_cfg['publish_topic'] = dut1.app.get_sdkconfig()['CONFIG_EXAMPLE_SUBSCIBE_TOPIC'].replace('"','')
publish_cfg['subscribe_topic'] = dut1.app.get_sdkconfig()['CONFIG_EXAMPLE_PUBLISH_TOPIC'].replace('"','')
publish_cfg['broker_host_ssl'], publish_cfg['broker_port_ssl'] = get_host_port_from_dut(dut1, 'CONFIG_EXAMPLE_BROKER_SSL_URI')
publish_cfg['broker_host_tcp'], publish_cfg['broker_port_tcp'] = get_host_port_from_dut(dut1, 'CONFIG_EXAMPLE_BROKER_TCP_URI')
publish_cfg['broker_host_ws'], publish_cfg['broker_port_ws'] = get_host_port_from_dut(dut1, 'CONFIG_EXAMPLE_BROKER_WS_URI')
publish_cfg['broker_host_wss'], publish_cfg['broker_port_wss'] = get_host_port_from_dut(dut1, 'CONFIG_EXAMPLE_BROKER_WSS_URI')
except Exception:
print('ENV_TEST_FAILURE: Some mandatory test case not found in sdkconfig')
raise
dut1.start_app()
esp_ip = dut1.expect(re.compile(r" IPv4 address: ([0-9]+\.[0-9]+\.[0-9]+\.[0-9]+)"), timeout=30)
print("Got IP={}".format(esp_ip[0]))
esp_ip = dut1.expect(re.compile(r' IPv4 address: ([0-9]+\.[0-9]+\.[0-9]+\.[0-9]+)'), timeout=30)
print('Got IP={}'.format(esp_ip[0]))
#
# start connection test
@@ -295,73 +296,73 @@ def test_app_protocol_mqtt_publish_connect(env, extra_data):
server_port = 2222
def start_connection_case(case, desc):
print("Starting {}: {}".format(case, desc))
print('Starting {}: {}'.format(case, desc))
case_id = cases[case]
dut1.write("conn {} {} {}".format(ip, server_port, case_id))
dut1.expect("Test case:{} started".format(case_id))
dut1.write('conn {} {} {}'.format(ip, server_port, case_id))
dut1.expect('Test case:{} started'.format(case_id))
return case_id
for case in ["CONFIG_EXAMPLE_CONNECT_CASE_NO_CERT", "CONFIG_EXAMPLE_CONNECT_CASE_SERVER_CERT", "CONFIG_EXAMPLE_CONNECT_CASE_SERVER_DER_CERT"]:
for case in ['CONFIG_EXAMPLE_CONNECT_CASE_NO_CERT', 'CONFIG_EXAMPLE_CONNECT_CASE_SERVER_CERT', 'CONFIG_EXAMPLE_CONNECT_CASE_SERVER_DER_CERT']:
# All these cases connect to the server with no server verification or with server only verification
with TlsServer(server_port):
test_nr = start_connection_case(case, "default server - expect to connect normally")
dut1.expect("MQTT_EVENT_CONNECTED: Test={}".format(test_nr), timeout=30)
test_nr = start_connection_case(case, 'default server - expect to connect normally')
dut1.expect('MQTT_EVENT_CONNECTED: Test={}'.format(test_nr), timeout=30)
with TlsServer(server_port, refuse_connection=True):
test_nr = start_connection_case(case, "ssl shall connect, but mqtt sends connect refusal")
dut1.expect("MQTT_EVENT_ERROR: Test={}".format(test_nr), timeout=30)
dut1.expect("MQTT ERROR: 0x5") # expecting 0x5 ... connection not authorized error
test_nr = start_connection_case(case, 'ssl shall connect, but mqtt sends connect refusal')
dut1.expect('MQTT_EVENT_ERROR: Test={}'.format(test_nr), timeout=30)
dut1.expect('MQTT ERROR: 0x5') # expecting 0x5 ... connection not authorized error
with TlsServer(server_port, client_cert=True) as s:
test_nr = start_connection_case(case, "server with client verification - handshake error since client presents no client certificate")
dut1.expect("MQTT_EVENT_ERROR: Test={}".format(test_nr), timeout=30)
dut1.expect("ESP-TLS ERROR: 0x8010") # expect ... handshake error (PEER_DID_NOT_RETURN_A_CERTIFICATE)
if "PEER_DID_NOT_RETURN_A_CERTIFICATE" not in s.get_last_ssl_error():
raise("Unexpected ssl error from the server {}".format(s.get_last_ssl_error()))
test_nr = start_connection_case(case, 'server with client verification - handshake error since client presents no client certificate')
dut1.expect('MQTT_EVENT_ERROR: Test={}'.format(test_nr), timeout=30)
dut1.expect('ESP-TLS ERROR: 0x8010') # expect ... handshake error (PEER_DID_NOT_RETURN_A_CERTIFICATE)
if 'PEER_DID_NOT_RETURN_A_CERTIFICATE' not in s.get_last_ssl_error():
raise('Unexpected ssl error from the server {}'.format(s.get_last_ssl_error()))
for case in ["CONFIG_EXAMPLE_CONNECT_CASE_MUTUAL_AUTH", "CONFIG_EXAMPLE_CONNECT_CASE_MUTUAL_AUTH_KEY_PWD"]:
for case in ['CONFIG_EXAMPLE_CONNECT_CASE_MUTUAL_AUTH', 'CONFIG_EXAMPLE_CONNECT_CASE_MUTUAL_AUTH_KEY_PWD']:
# These cases connect to server with both server and client verification (client key might be password protected)
with TlsServer(server_port, client_cert=True):
test_nr = start_connection_case(case, "server with client verification - expect to connect normally")
dut1.expect("MQTT_EVENT_CONNECTED: Test={}".format(test_nr), timeout=30)
test_nr = start_connection_case(case, 'server with client verification - expect to connect normally')
dut1.expect('MQTT_EVENT_CONNECTED: Test={}'.format(test_nr), timeout=30)
case = "CONFIG_EXAMPLE_CONNECT_CASE_INVALID_SERVER_CERT"
case = 'CONFIG_EXAMPLE_CONNECT_CASE_INVALID_SERVER_CERT'
with TlsServer(server_port) as s:
test_nr = start_connection_case(case, "invalid server certificate on default server - expect ssl handshake error")
dut1.expect("MQTT_EVENT_ERROR: Test={}".format(test_nr), timeout=30)
dut1.expect("ESP-TLS ERROR: 0x8010") # expect ... handshake error (TLSV1_ALERT_UNKNOWN_CA)
if "alert unknown ca" not in s.get_last_ssl_error():
raise Exception("Unexpected ssl error from the server {}".format(s.get_last_ssl_error()))
test_nr = start_connection_case(case, 'invalid server certificate on default server - expect ssl handshake error')
dut1.expect('MQTT_EVENT_ERROR: Test={}'.format(test_nr), timeout=30)
dut1.expect('ESP-TLS ERROR: 0x8010') # expect ... handshake error (TLSV1_ALERT_UNKNOWN_CA)
if 'alert unknown ca' not in s.get_last_ssl_error():
raise Exception('Unexpected ssl error from the server {}'.format(s.get_last_ssl_error()))
case = "CONFIG_EXAMPLE_CONNECT_CASE_MUTUAL_AUTH_BAD_CRT"
case = 'CONFIG_EXAMPLE_CONNECT_CASE_MUTUAL_AUTH_BAD_CRT'
with TlsServer(server_port, client_cert=True) as s:
test_nr = start_connection_case(case, "Invalid client certificate on server with client verification - expect ssl handshake error")
dut1.expect("MQTT_EVENT_ERROR: Test={}".format(test_nr), timeout=30)
dut1.expect("ESP-TLS ERROR: 0x8010") # expect ... handshake error (CERTIFICATE_VERIFY_FAILED)
if "CERTIFICATE_VERIFY_FAILED" not in s.get_last_ssl_error():
raise Exception("Unexpected ssl error from the server {}".format(s.get_last_ssl_error()))
test_nr = start_connection_case(case, 'Invalid client certificate on server with client verification - expect ssl handshake error')
dut1.expect('MQTT_EVENT_ERROR: Test={}'.format(test_nr), timeout=30)
dut1.expect('ESP-TLS ERROR: 0x8010') # expect ... handshake error (CERTIFICATE_VERIFY_FAILED)
if 'CERTIFICATE_VERIFY_FAILED' not in s.get_last_ssl_error():
raise Exception('Unexpected ssl error from the server {}'.format(s.get_last_ssl_error()))
for case in ["CONFIG_EXAMPLE_CONNECT_CASE_NO_CERT", "CONFIG_EXAMPLE_CONNECT_CASE_NO_CERT_ALPN"]:
for case in ['CONFIG_EXAMPLE_CONNECT_CASE_NO_CERT', 'CONFIG_EXAMPLE_CONNECT_CASE_NO_CERT_ALPN']:
with TlsServer(server_port, use_alpn=True) as s:
test_nr = start_connection_case(case, "server with alpn - expect connect, check resolved protocol")
dut1.expect("MQTT_EVENT_CONNECTED: Test={}".format(test_nr), timeout=30)
if case == "CONFIG_EXAMPLE_CONNECT_CASE_NO_CERT" and s.get_negotiated_protocol() is None:
print(" - client with alpn off, no negotiated protocol: OK")
elif case == "CONFIG_EXAMPLE_CONNECT_CASE_NO_CERT_ALPN" and s.get_negotiated_protocol() == "mymqtt":
print(" - client with alpn on, negotiated protocol resolved: OK")
test_nr = start_connection_case(case, 'server with alpn - expect connect, check resolved protocol')
dut1.expect('MQTT_EVENT_CONNECTED: Test={}'.format(test_nr), timeout=30)
if case == 'CONFIG_EXAMPLE_CONNECT_CASE_NO_CERT' and s.get_negotiated_protocol() is None:
print(' - client with alpn off, no negotiated protocol: OK')
elif case == 'CONFIG_EXAMPLE_CONNECT_CASE_NO_CERT_ALPN' and s.get_negotiated_protocol() == 'mymqtt':
print(' - client with alpn on, negotiated protocol resolved: OK')
else:
raise Exception("Unexpected negotiated protocol {}".format(s.get_negotiated_protocol()))
raise Exception('Unexpected negotiated protocol {}'.format(s.get_negotiated_protocol()))
#
# start publish tests
def start_publish_case(transport, qos, repeat, published, queue):
print("Starting Publish test: transport:{}, qos:{}, nr_of_msgs:{}, msg_size:{}, enqueue:{}"
print('Starting Publish test: transport:{}, qos:{}, nr_of_msgs:{}, msg_size:{}, enqueue:{}'
.format(transport, qos, published, repeat * DEFAULT_MSG_SIZE, queue))
with MqttPublisher(dut1, transport, qos, repeat, published, queue, publish_cfg):
pass
for qos in [0, 1, 2]:
for transport in ["tcp", "ssl", "ws", "wss"]:
for transport in ['tcp', 'ssl', 'ws', 'wss']:
for q in [0, 1]:
if publish_cfg["broker_host_" + transport] is None:
if publish_cfg['broker_host_' + transport] is None:
print('Skipping transport: {}...'.format(transport))
continue
start_publish_case(transport, qos, 0, 5, q)