forked from espressif/esp-modbus
style: format python files with isort and double-quote-string-fixer
* Original commit: espressif/esp-idf@0146f258d7
This commit is contained in:
@@ -1,15 +1,15 @@
|
|||||||
# Need Python 3 string formatting functions
|
# Need Python 3 string formatting functions
|
||||||
from __future__ import print_function
|
from __future__ import print_function
|
||||||
|
|
||||||
|
import logging
|
||||||
import os
|
import os
|
||||||
import re
|
import re
|
||||||
import logging
|
|
||||||
from threading import Thread
|
from threading import Thread
|
||||||
|
|
||||||
import ttfw_idf
|
import ttfw_idf
|
||||||
|
|
||||||
LOG_LEVEL = logging.DEBUG
|
LOG_LEVEL = logging.DEBUG
|
||||||
LOGGER_NAME = "modbus_test"
|
LOGGER_NAME = 'modbus_test'
|
||||||
|
|
||||||
# Allowed parameter reads
|
# Allowed parameter reads
|
||||||
TEST_READ_MIN_COUNT = 10 # Minimum number of correct readings
|
TEST_READ_MIN_COUNT = 10 # Minimum number of correct readings
|
||||||
@@ -27,38 +27,38 @@ TEST_SLAVE_ASCII = 'slave_ascii'
|
|||||||
|
|
||||||
# Define tuple of strings to expect for each DUT.
|
# Define tuple of strings to expect for each DUT.
|
||||||
#
|
#
|
||||||
master_expect = ("MASTER_TEST: Modbus master stack initialized...", "MASTER_TEST: Start modbus test...", "MASTER_TEST: Destroy master...")
|
master_expect = ('MASTER_TEST: Modbus master stack initialized...', 'MASTER_TEST: Start modbus test...', 'MASTER_TEST: Destroy master...')
|
||||||
slave_expect = ("SLAVE_TEST: Modbus slave stack initialized.", "SLAVE_TEST: Start modbus test...", "SLAVE_TEST: Modbus controller destroyed.")
|
slave_expect = ('SLAVE_TEST: Modbus slave stack initialized.', 'SLAVE_TEST: Start modbus test...', 'SLAVE_TEST: Modbus controller destroyed.')
|
||||||
|
|
||||||
# The dictionary for expected values in listing
|
# The dictionary for expected values in listing
|
||||||
expect_dict_master_ok = {"START": (),
|
expect_dict_master_ok = {'START': (),
|
||||||
"READ_PAR_OK": (),
|
'READ_PAR_OK': (),
|
||||||
"ALARM_MSG": (u'7',)}
|
'ALARM_MSG': (u'7',)}
|
||||||
|
|
||||||
expect_dict_master_err = {"READ_PAR_ERR": (u'263', u'ESP_ERR_TIMEOUT'),
|
expect_dict_master_err = {'READ_PAR_ERR': (u'263', u'ESP_ERR_TIMEOUT'),
|
||||||
"READ_STK_ERR": (u'107', u'ESP_ERR_TIMEOUT')}
|
'READ_STK_ERR': (u'107', u'ESP_ERR_TIMEOUT')}
|
||||||
|
|
||||||
# The dictionary for regular expression patterns to check in listing
|
# The dictionary for regular expression patterns to check in listing
|
||||||
pattern_dict_master_ok = {"START": (r'.*I \([0-9]+\) MASTER_TEST: Start modbus test...'),
|
pattern_dict_master_ok = {'START': (r'.*I \([0-9]+\) MASTER_TEST: Start modbus test...'),
|
||||||
"READ_PAR_OK": (r'.*I\s\([0-9]+\) MASTER_TEST: Characteristic #[0-9]+ [a-zA-Z0-9_]+'
|
'READ_PAR_OK': (r'.*I\s\([0-9]+\) MASTER_TEST: Characteristic #[0-9]+ [a-zA-Z0-9_]+'
|
||||||
r'\s\([a-zA-Z\%\/]+\) value = [a-zA-Z0-9\.\s]*\(0x[a-zA-Z0-9]+\) read successful.'),
|
r'\s\([a-zA-Z\%\/]+\) value = [a-zA-Z0-9\.\s]*\(0x[a-zA-Z0-9]+\) read successful.'),
|
||||||
"ALARM_MSG": (r'.*I \([0-9]*\) MASTER_TEST: Alarm triggered by cid #([0-9]+).')}
|
'ALARM_MSG': (r'.*I \([0-9]*\) MASTER_TEST: Alarm triggered by cid #([0-9]+).')}
|
||||||
|
|
||||||
pattern_dict_master_err = {"READ_PAR_ERR_TOUT": (r'.*E \([0-9]+\) MASTER_TEST: Characteristic #[0-9]+'
|
pattern_dict_master_err = {'READ_PAR_ERR_TOUT': (r'.*E \([0-9]+\) MASTER_TEST: Characteristic #[0-9]+'
|
||||||
r'\s\([a-zA-Z0-9_]+\) read fail, err = [0-9]+ \([_A-Z]+\).'),
|
r'\s\([a-zA-Z0-9_]+\) read fail, err = [0-9]+ \([_A-Z]+\).'),
|
||||||
"READ_STK_ERR_TOUT": (r'.*E \([0-9]+\) MB_CONTROLLER_MASTER: [a-zA-Z0-9_]+\([0-9]+\):\s'
|
'READ_STK_ERR_TOUT': (r'.*E \([0-9]+\) MB_CONTROLLER_MASTER: [a-zA-Z0-9_]+\([0-9]+\):\s'
|
||||||
r'SERIAL master get parameter failure error=\(0x([a-zA-Z0-9]+)\) \(([_A-Z]+)\).')}
|
r'SERIAL master get parameter failure error=\(0x([a-zA-Z0-9]+)\) \(([_A-Z]+)\).')}
|
||||||
|
|
||||||
# The dictionary for expected values in listing
|
# The dictionary for expected values in listing
|
||||||
expect_dict_slave_ok = {"START": (),
|
expect_dict_slave_ok = {'START': (),
|
||||||
"READ_PAR_OK": (),
|
'READ_PAR_OK': (),
|
||||||
"DESTROY": ()}
|
'DESTROY': ()}
|
||||||
|
|
||||||
# The dictionary for regular expression patterns to check in listing
|
# The dictionary for regular expression patterns to check in listing
|
||||||
pattern_dict_slave_ok = {"START": (r'.*I \([0-9]+\) SLAVE_TEST: Start modbus test...'),
|
pattern_dict_slave_ok = {'START': (r'.*I \([0-9]+\) SLAVE_TEST: Start modbus test...'),
|
||||||
"READ_PAR_OK": (r'.*I\s\([0-9]+\) SLAVE_TEST: [A-Z]+ READ \([a-zA-Z0-9_]+ us\),\s'
|
'READ_PAR_OK': (r'.*I\s\([0-9]+\) SLAVE_TEST: [A-Z]+ READ \([a-zA-Z0-9_]+ us\),\s'
|
||||||
r'ADDR:[0-9]+, TYPE:[0-9]+, INST_ADDR:0x[a-zA-Z0-9]+, SIZE:[0-9]+'),
|
r'ADDR:[0-9]+, TYPE:[0-9]+, INST_ADDR:0x[a-zA-Z0-9]+, SIZE:[0-9]+'),
|
||||||
"DESTROY": (r'.*I\s\([0-9]+\) SLAVE_TEST: Modbus controller destroyed.')}
|
'DESTROY': (r'.*I\s\([0-9]+\) SLAVE_TEST: Modbus controller destroyed.')}
|
||||||
|
|
||||||
logger = logging.getLogger(LOGGER_NAME)
|
logger = logging.getLogger(LOGGER_NAME)
|
||||||
|
|
||||||
@@ -89,8 +89,8 @@ class DutTestThread(Thread):
|
|||||||
|
|
||||||
# Check DUT exceptions
|
# Check DUT exceptions
|
||||||
dut_exceptions = self.dut.get_exceptions()
|
dut_exceptions = self.dut.get_exceptions()
|
||||||
if "Guru Meditation Error:" in dut_exceptions:
|
if 'Guru Meditation Error:' in dut_exceptions:
|
||||||
raise Exception("%s generated an exception: %s\n" % (str(self.dut), dut_exceptions))
|
raise Exception('%s generated an exception: %s\n' % (str(self.dut), dut_exceptions))
|
||||||
|
|
||||||
# Mark thread has run to completion without any exceptions
|
# Mark thread has run to completion without any exceptions
|
||||||
self.data = self.dut.stop_capture_raw_data()
|
self.data = self.dut.stop_capture_raw_data()
|
||||||
@@ -102,7 +102,7 @@ def test_filter_output(data=None, start_pattern=None, end_pattern=None):
|
|||||||
"""
|
"""
|
||||||
start_index = str(data).find(start_pattern)
|
start_index = str(data).find(start_pattern)
|
||||||
end_index = str(data).find(end_pattern)
|
end_index = str(data).find(end_pattern)
|
||||||
logger.debug("Listing start index= %d, end=%d" % (start_index, end_index))
|
logger.debug('Listing start index= %d, end=%d' % (start_index, end_index))
|
||||||
if start_index == -1 or end_index == -1:
|
if start_index == -1 or end_index == -1:
|
||||||
return data
|
return data
|
||||||
return data[start_index:end_index + len(end_pattern)]
|
return data[start_index:end_index + len(end_pattern)]
|
||||||
@@ -145,9 +145,9 @@ def test_check_output(data=None, check_dict=None, expect_dict=None):
|
|||||||
for line in data_lines:
|
for line in data_lines:
|
||||||
group, index = test_expect_re(line, pattern)
|
group, index = test_expect_re(line, pattern)
|
||||||
if index is not None:
|
if index is not None:
|
||||||
logger.debug("Found key{%s}=%s, line: \n%s" % (key, group, line))
|
logger.debug('Found key{%s}=%s, line: \n%s' % (key, group, line))
|
||||||
if expect_dict[key] == group:
|
if expect_dict[key] == group:
|
||||||
logger.debug("The result is correct for the key:%s, expected:%s == returned:%s" % (key, str(expect_dict[key]), str(group)))
|
logger.debug('The result is correct for the key:%s, expected:%s == returned:%s' % (key, str(expect_dict[key]), str(group)))
|
||||||
match_count += 1
|
match_count += 1
|
||||||
return match_count
|
return match_count
|
||||||
|
|
||||||
@@ -158,7 +158,7 @@ def test_check_mode(dut=None, mode_str=None, value=None):
|
|||||||
global logger
|
global logger
|
||||||
try:
|
try:
|
||||||
opt = dut.app.get_sdkconfig()[mode_str]
|
opt = dut.app.get_sdkconfig()[mode_str]
|
||||||
logger.info("%s {%s} = %s.\n" % (str(dut), mode_str, opt))
|
logger.info('%s {%s} = %s.\n' % (str(dut), mode_str, opt))
|
||||||
return value == opt
|
return value == opt
|
||||||
except Exception:
|
except Exception:
|
||||||
logger.info('ENV_TEST_FAILURE: %s: Cannot find option %s in sdkconfig.' % (str(dut), mode_str))
|
logger.info('ENV_TEST_FAILURE: %s: Cannot find option %s in sdkconfig.' % (str(dut), mode_str))
|
||||||
@@ -170,30 +170,30 @@ def test_modbus_communication(env, comm_mode):
|
|||||||
global logger
|
global logger
|
||||||
|
|
||||||
# Get device under test. "dut1 - master", "dut2 - slave" must be properly connected through RS485 interface driver
|
# Get device under test. "dut1 - master", "dut2 - slave" must be properly connected through RS485 interface driver
|
||||||
dut_master = env.get_dut("modbus_master", "examples/protocols/modbus/serial/mb_master", dut_class=ttfw_idf.ESP32DUT)
|
dut_master = env.get_dut('modbus_master', 'examples/protocols/modbus/serial/mb_master', dut_class=ttfw_idf.ESP32DUT)
|
||||||
dut_slave = env.get_dut("modbus_slave", "examples/protocols/modbus/serial/mb_slave", dut_class=ttfw_idf.ESP32DUT)
|
dut_slave = env.get_dut('modbus_slave', 'examples/protocols/modbus/serial/mb_slave', dut_class=ttfw_idf.ESP32DUT)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
logger.debug("Environment vars: %s\r\n" % os.environ)
|
logger.debug('Environment vars: %s\r\n' % os.environ)
|
||||||
logger.debug("DUT slave sdkconfig: %s\r\n" % dut_slave.app.get_sdkconfig())
|
logger.debug('DUT slave sdkconfig: %s\r\n' % dut_slave.app.get_sdkconfig())
|
||||||
logger.debug("DUT master sdkconfig: %s\r\n" % dut_master.app.get_sdkconfig())
|
logger.debug('DUT master sdkconfig: %s\r\n' % dut_master.app.get_sdkconfig())
|
||||||
|
|
||||||
# Check Kconfig configuration options for each built example
|
# Check Kconfig configuration options for each built example
|
||||||
if test_check_mode(dut_master, "CONFIG_MB_COMM_MODE_ASCII", "y") and test_check_mode(dut_slave, "CONFIG_MB_COMM_MODE_ASCII", "y"):
|
if test_check_mode(dut_master, 'CONFIG_MB_COMM_MODE_ASCII', 'y') and test_check_mode(dut_slave, 'CONFIG_MB_COMM_MODE_ASCII', 'y'):
|
||||||
logger.info("ENV_TEST_INFO: Modbus ASCII test mode selected in the configuration. \n")
|
logger.info('ENV_TEST_INFO: Modbus ASCII test mode selected in the configuration. \n')
|
||||||
slave_name = TEST_SLAVE_ASCII
|
slave_name = TEST_SLAVE_ASCII
|
||||||
master_name = TEST_MASTER_ASCII
|
master_name = TEST_MASTER_ASCII
|
||||||
elif test_check_mode(dut_master, "CONFIG_MB_COMM_MODE_RTU", "y") and test_check_mode(dut_slave, "CONFIG_MB_COMM_MODE_RTU", "y"):
|
elif test_check_mode(dut_master, 'CONFIG_MB_COMM_MODE_RTU', 'y') and test_check_mode(dut_slave, 'CONFIG_MB_COMM_MODE_RTU', 'y'):
|
||||||
logger.info("ENV_TEST_INFO: Modbus RTU test mode selected in the configuration. \n")
|
logger.info('ENV_TEST_INFO: Modbus RTU test mode selected in the configuration. \n')
|
||||||
slave_name = TEST_SLAVE_RTU
|
slave_name = TEST_SLAVE_RTU
|
||||||
master_name = TEST_MASTER_RTU
|
master_name = TEST_MASTER_RTU
|
||||||
else:
|
else:
|
||||||
logger.error("ENV_TEST_FAILURE: Communication mode in master and slave configuration don't match.\n")
|
logger.error("ENV_TEST_FAILURE: Communication mode in master and slave configuration don't match.\n")
|
||||||
raise Exception("ENV_TEST_FAILURE: Communication mode in master and slave configuration don't match.\n")
|
raise Exception("ENV_TEST_FAILURE: Communication mode in master and slave configuration don't match.\n")
|
||||||
# Check if slave address for example application is default one to be able to communicate
|
# Check if slave address for example application is default one to be able to communicate
|
||||||
if not test_check_mode(dut_slave, "CONFIG_MB_SLAVE_ADDR", "1"):
|
if not test_check_mode(dut_slave, 'CONFIG_MB_SLAVE_ADDR', '1'):
|
||||||
logger.error("ENV_TEST_FAILURE: Slave address option is incorrect.\n")
|
logger.error('ENV_TEST_FAILURE: Slave address option is incorrect.\n')
|
||||||
raise Exception("ENV_TEST_FAILURE: Slave address option is incorrect.\n")
|
raise Exception('ENV_TEST_FAILURE: Slave address option is incorrect.\n')
|
||||||
|
|
||||||
# Flash app onto each DUT
|
# Flash app onto each DUT
|
||||||
dut_master.start_app()
|
dut_master.start_app()
|
||||||
@@ -212,15 +212,15 @@ def test_modbus_communication(env, comm_mode):
|
|||||||
dut_master_thread.join(timeout=TEST_THREAD_JOIN_TIMEOUT)
|
dut_master_thread.join(timeout=TEST_THREAD_JOIN_TIMEOUT)
|
||||||
|
|
||||||
if dut_slave_thread.isAlive():
|
if dut_slave_thread.isAlive():
|
||||||
logger.error("ENV_TEST_FAILURE: The thread %s is not completed successfully after %d seconds.\n" %
|
logger.error('ENV_TEST_FAILURE: The thread %s is not completed successfully after %d seconds.\n' %
|
||||||
(dut_slave_thread.tname, TEST_THREAD_JOIN_TIMEOUT))
|
(dut_slave_thread.tname, TEST_THREAD_JOIN_TIMEOUT))
|
||||||
raise Exception("ENV_TEST_FAILURE: The thread %s is not completed successfully after %d seconds.\n" %
|
raise Exception('ENV_TEST_FAILURE: The thread %s is not completed successfully after %d seconds.\n' %
|
||||||
(dut_slave_thread.tname, TEST_THREAD_JOIN_TIMEOUT))
|
(dut_slave_thread.tname, TEST_THREAD_JOIN_TIMEOUT))
|
||||||
|
|
||||||
if dut_master_thread.isAlive():
|
if dut_master_thread.isAlive():
|
||||||
logger.error("ENV_TEST_FAILURE: The thread %s is not completed successfully after %d seconds.\n" %
|
logger.error('ENV_TEST_FAILURE: The thread %s is not completed successfully after %d seconds.\n' %
|
||||||
(dut_master_thread.tname, TEST_THREAD_JOIN_TIMEOUT))
|
(dut_master_thread.tname, TEST_THREAD_JOIN_TIMEOUT))
|
||||||
raise Exception("ENV_TEST_FAILURE: The thread %s is not completed successfully after %d seconds.\n" %
|
raise Exception('ENV_TEST_FAILURE: The thread %s is not completed successfully after %d seconds.\n' %
|
||||||
(dut_master_thread.tname, TEST_THREAD_JOIN_TIMEOUT))
|
(dut_master_thread.tname, TEST_THREAD_JOIN_TIMEOUT))
|
||||||
finally:
|
finally:
|
||||||
dut_master.close()
|
dut_master.close()
|
||||||
@@ -228,43 +228,43 @@ def test_modbus_communication(env, comm_mode):
|
|||||||
|
|
||||||
# Check if test threads completed successfully and captured data
|
# Check if test threads completed successfully and captured data
|
||||||
if not dut_slave_thread.result or dut_slave_thread.data is None:
|
if not dut_slave_thread.result or dut_slave_thread.data is None:
|
||||||
logger.error("The thread %s was not run successfully." % dut_slave_thread.tname)
|
logger.error('The thread %s was not run successfully.' % dut_slave_thread.tname)
|
||||||
raise Exception("The thread %s was not run successfully." % dut_slave_thread.tname)
|
raise Exception('The thread %s was not run successfully.' % dut_slave_thread.tname)
|
||||||
|
|
||||||
if not dut_master_thread.result or dut_master_thread.data is None:
|
if not dut_master_thread.result or dut_master_thread.data is None:
|
||||||
logger.error("The thread %s was not run successfully." % dut_slave_thread.tname)
|
logger.error('The thread %s was not run successfully.' % dut_slave_thread.tname)
|
||||||
raise Exception("The thread %s was not run successfully." % dut_master_thread.tname)
|
raise Exception('The thread %s was not run successfully.' % dut_master_thread.tname)
|
||||||
|
|
||||||
# Filter output to get test messages
|
# Filter output to get test messages
|
||||||
master_output = test_filter_output(dut_master_thread.data, master_expect[0], master_expect[len(master_expect) - 1])
|
master_output = test_filter_output(dut_master_thread.data, master_expect[0], master_expect[len(master_expect) - 1])
|
||||||
if master_output is not None:
|
if master_output is not None:
|
||||||
logger.info("The data for master thread is captured.")
|
logger.info('The data for master thread is captured.')
|
||||||
logger.debug(master_output)
|
logger.debug(master_output)
|
||||||
|
|
||||||
slave_output = test_filter_output(dut_slave_thread.data, slave_expect[0], slave_expect[len(slave_expect) - 1])
|
slave_output = test_filter_output(dut_slave_thread.data, slave_expect[0], slave_expect[len(slave_expect) - 1])
|
||||||
if slave_output is not None:
|
if slave_output is not None:
|
||||||
logger.info("The data for slave thread is captured.")
|
logger.info('The data for slave thread is captured.')
|
||||||
logger.debug(slave_output)
|
logger.debug(slave_output)
|
||||||
|
|
||||||
# Check if parameters are read correctly by master
|
# Check if parameters are read correctly by master
|
||||||
match_count = test_check_output(master_output, pattern_dict_master_ok, expect_dict_master_ok)
|
match_count = test_check_output(master_output, pattern_dict_master_ok, expect_dict_master_ok)
|
||||||
if match_count < TEST_READ_MIN_COUNT:
|
if match_count < TEST_READ_MIN_COUNT:
|
||||||
logger.error("There are errors reading parameters from %s, %d" % (dut_master_thread.tname, match_count))
|
logger.error('There are errors reading parameters from %s, %d' % (dut_master_thread.tname, match_count))
|
||||||
raise Exception("There are errors reading parameters from %s, %d" % (dut_master_thread.tname, match_count))
|
raise Exception('There are errors reading parameters from %s, %d' % (dut_master_thread.tname, match_count))
|
||||||
logger.info("OK pattern test for %s, match_count=%d." % (dut_master_thread.tname, match_count))
|
logger.info('OK pattern test for %s, match_count=%d.' % (dut_master_thread.tname, match_count))
|
||||||
|
|
||||||
# If the test completed successfully (alarm triggered) but there are some errors during reading of parameters
|
# If the test completed successfully (alarm triggered) but there are some errors during reading of parameters
|
||||||
match_count = test_check_output(master_output, pattern_dict_master_err, expect_dict_master_err)
|
match_count = test_check_output(master_output, pattern_dict_master_err, expect_dict_master_err)
|
||||||
if match_count > TEST_READ_MAX_ERR_COUNT:
|
if match_count > TEST_READ_MAX_ERR_COUNT:
|
||||||
logger.error("There are errors reading parameters from %s, %d" % (dut_master_thread.tname, match_count))
|
logger.error('There are errors reading parameters from %s, %d' % (dut_master_thread.tname, match_count))
|
||||||
raise Exception("There are errors reading parameters from %s, %d" % (dut_master_thread.tname, match_count))
|
raise Exception('There are errors reading parameters from %s, %d' % (dut_master_thread.tname, match_count))
|
||||||
logger.info("ERROR pattern test for %s, match_count=%d." % (dut_master_thread.tname, match_count))
|
logger.info('ERROR pattern test for %s, match_count=%d.' % (dut_master_thread.tname, match_count))
|
||||||
|
|
||||||
match_count = test_check_output(slave_output, pattern_dict_slave_ok, expect_dict_slave_ok)
|
match_count = test_check_output(slave_output, pattern_dict_slave_ok, expect_dict_slave_ok)
|
||||||
if match_count < TEST_READ_MIN_COUNT:
|
if match_count < TEST_READ_MIN_COUNT:
|
||||||
logger.error("There are errors reading parameters from %s, %d" % (dut_slave_thread.tname, match_count))
|
logger.error('There are errors reading parameters from %s, %d' % (dut_slave_thread.tname, match_count))
|
||||||
raise Exception("There are errors reading parameters from %s, %d" % (dut_slave_thread.tname, match_count))
|
raise Exception('There are errors reading parameters from %s, %d' % (dut_slave_thread.tname, match_count))
|
||||||
logger.info("OK pattern test for %s, match_count=%d." % (dut_slave_thread.tname, match_count))
|
logger.info('OK pattern test for %s, match_count=%d.' % (dut_slave_thread.tname, match_count))
|
||||||
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
@@ -282,7 +282,7 @@ if __name__ == '__main__':
|
|||||||
fh.setFormatter(formatter)
|
fh.setFormatter(formatter)
|
||||||
logger.addHandler(fh)
|
logger.addHandler(fh)
|
||||||
logger.addHandler(ch)
|
logger.addHandler(ch)
|
||||||
logger.info("Start script %s." % os.path.basename(__file__))
|
logger.info('Start script %s.' % os.path.basename(__file__))
|
||||||
print("Logging file name: %s" % logger.handlers[0].baseFilename)
|
print('Logging file name: %s' % logger.handlers[0].baseFilename)
|
||||||
test_modbus_communication()
|
test_modbus_communication()
|
||||||
logging.shutdown()
|
logging.shutdown()
|
||||||
|
@@ -1,13 +1,13 @@
|
|||||||
|
import logging
|
||||||
import os
|
import os
|
||||||
import re
|
import re
|
||||||
import logging
|
|
||||||
from threading import Thread
|
from threading import Thread
|
||||||
|
|
||||||
import ttfw_idf
|
import ttfw_idf
|
||||||
from tiny_test_fw import DUT
|
from tiny_test_fw import DUT
|
||||||
|
|
||||||
LOG_LEVEL = logging.DEBUG
|
LOG_LEVEL = logging.DEBUG
|
||||||
LOGGER_NAME = "modbus_test"
|
LOGGER_NAME = 'modbus_test'
|
||||||
|
|
||||||
# Allowed options for the test
|
# Allowed options for the test
|
||||||
TEST_READ_MAX_ERR_COUNT = 3 # Maximum allowed read errors during initialization
|
TEST_READ_MAX_ERR_COUNT = 3 # Maximum allowed read errors during initialization
|
||||||
@@ -69,7 +69,7 @@ class DutTestThread(Thread):
|
|||||||
super(DutTestThread, self).__init__()
|
super(DutTestThread, self).__init__()
|
||||||
|
|
||||||
def __enter__(self):
|
def __enter__(self):
|
||||||
logger.debug("Restart %s." % self.tname)
|
logger.debug('Restart %s.' % self.tname)
|
||||||
# Reset DUT first
|
# Reset DUT first
|
||||||
self.dut.reset()
|
self.dut.reset()
|
||||||
# Capture output from the DUT
|
# Capture output from the DUT
|
||||||
@@ -80,7 +80,7 @@ class DutTestThread(Thread):
|
|||||||
""" The exit method of context manager
|
""" The exit method of context manager
|
||||||
"""
|
"""
|
||||||
if exc_type is not None or exc_value is not None:
|
if exc_type is not None or exc_value is not None:
|
||||||
logger.info("Thread %s rised an exception type: %s, value: %s" % (self.tname, str(exc_type), str(exc_value)))
|
logger.info('Thread %s rised an exception type: %s, value: %s' % (self.tname, str(exc_type), str(exc_value)))
|
||||||
|
|
||||||
def run(self):
|
def run(self):
|
||||||
""" The function implements thread functionality
|
""" The function implements thread functionality
|
||||||
@@ -94,8 +94,8 @@ class DutTestThread(Thread):
|
|||||||
|
|
||||||
# Check DUT exceptions
|
# Check DUT exceptions
|
||||||
dut_exceptions = self.dut.get_exceptions()
|
dut_exceptions = self.dut.get_exceptions()
|
||||||
if "Guru Meditation Error:" in dut_exceptions:
|
if 'Guru Meditation Error:' in dut_exceptions:
|
||||||
raise Exception("%s generated an exception: %s\n" % (str(self.dut), dut_exceptions))
|
raise Exception('%s generated an exception: %s\n' % (str(self.dut), dut_exceptions))
|
||||||
|
|
||||||
# Mark thread has run to completion without any exceptions
|
# Mark thread has run to completion without any exceptions
|
||||||
self.data = self.dut.stop_capture_raw_data(capture_id=self.dut.name)
|
self.data = self.dut.stop_capture_raw_data(capture_id=self.dut.name)
|
||||||
@@ -108,13 +108,13 @@ class DutTestThread(Thread):
|
|||||||
self.dut.read()
|
self.dut.read()
|
||||||
result = self.dut.expect(re.compile(message), TEST_EXPECT_STR_TIMEOUT)
|
result = self.dut.expect(re.compile(message), TEST_EXPECT_STR_TIMEOUT)
|
||||||
if int(result[0]) != index:
|
if int(result[0]) != index:
|
||||||
raise Exception("Incorrect index of IP=%d for %s\n" % (int(result[0]), str(self.dut)))
|
raise Exception('Incorrect index of IP=%d for %s\n' % (int(result[0]), str(self.dut)))
|
||||||
message = "IP%s=%s" % (result[0], self.ip_addr)
|
message = 'IP%s=%s' % (result[0], self.ip_addr)
|
||||||
self.dut.write(message, "\r\n", False)
|
self.dut.write(message, '\r\n', False)
|
||||||
logger.debug("Sent message for %s: %s" % (self.tname, message))
|
logger.debug('Sent message for %s: %s' % (self.tname, message))
|
||||||
message = r'.*IP\([0-9]+\) = \[([0-9a-zA-Z\.\:]+)\] set from stdin.*'
|
message = r'.*IP\([0-9]+\) = \[([0-9a-zA-Z\.\:]+)\] set from stdin.*'
|
||||||
result = self.dut.expect(re.compile(message), TEST_EXPECT_STR_TIMEOUT)
|
result = self.dut.expect(re.compile(message), TEST_EXPECT_STR_TIMEOUT)
|
||||||
logger.debug("Thread %s initialized with slave IP (%s)." % (self.tname, result[0]))
|
logger.debug('Thread %s initialized with slave IP (%s).' % (self.tname, result[0]))
|
||||||
|
|
||||||
def test_start(self, timeout_value):
|
def test_start(self, timeout_value):
|
||||||
""" The method to initialize and handle test stages
|
""" The method to initialize and handle test stages
|
||||||
@@ -122,37 +122,37 @@ class DutTestThread(Thread):
|
|||||||
def handle_get_ip4(data):
|
def handle_get_ip4(data):
|
||||||
""" Handle get_ip v4
|
""" Handle get_ip v4
|
||||||
"""
|
"""
|
||||||
logger.debug("%s[STACK_IPV4]: %s" % (self.tname, str(data)))
|
logger.debug('%s[STACK_IPV4]: %s' % (self.tname, str(data)))
|
||||||
self.test_stage = STACK_IPV4
|
self.test_stage = STACK_IPV4
|
||||||
|
|
||||||
def handle_get_ip6(data):
|
def handle_get_ip6(data):
|
||||||
""" Handle get_ip v6
|
""" Handle get_ip v6
|
||||||
"""
|
"""
|
||||||
logger.debug("%s[STACK_IPV6]: %s" % (self.tname, str(data)))
|
logger.debug('%s[STACK_IPV6]: %s' % (self.tname, str(data)))
|
||||||
self.test_stage = STACK_IPV6
|
self.test_stage = STACK_IPV6
|
||||||
|
|
||||||
def handle_init(data):
|
def handle_init(data):
|
||||||
""" Handle init
|
""" Handle init
|
||||||
"""
|
"""
|
||||||
logger.debug("%s[STACK_INIT]: %s" % (self.tname, str(data)))
|
logger.debug('%s[STACK_INIT]: %s' % (self.tname, str(data)))
|
||||||
self.test_stage = STACK_INIT
|
self.test_stage = STACK_INIT
|
||||||
|
|
||||||
def handle_connect(data):
|
def handle_connect(data):
|
||||||
""" Handle connect
|
""" Handle connect
|
||||||
"""
|
"""
|
||||||
logger.debug("%s[STACK_CONNECT]: %s" % (self.tname, str(data)))
|
logger.debug('%s[STACK_CONNECT]: %s' % (self.tname, str(data)))
|
||||||
self.test_stage = STACK_CONNECT
|
self.test_stage = STACK_CONNECT
|
||||||
|
|
||||||
def handle_test_start(data):
|
def handle_test_start(data):
|
||||||
""" Handle connect
|
""" Handle connect
|
||||||
"""
|
"""
|
||||||
logger.debug("%s[STACK_START]: %s" % (self.tname, str(data)))
|
logger.debug('%s[STACK_START]: %s' % (self.tname, str(data)))
|
||||||
self.test_stage = STACK_START
|
self.test_stage = STACK_START
|
||||||
|
|
||||||
def handle_par_ok(data):
|
def handle_par_ok(data):
|
||||||
""" Handle parameter ok
|
""" Handle parameter ok
|
||||||
"""
|
"""
|
||||||
logger.debug("%s[READ_PAR_OK]: %s" % (self.tname, str(data)))
|
logger.debug('%s[READ_PAR_OK]: %s' % (self.tname, str(data)))
|
||||||
if self.test_stage >= STACK_START:
|
if self.test_stage >= STACK_START:
|
||||||
self.param_ok_count += 1
|
self.param_ok_count += 1
|
||||||
self.test_stage = STACK_PAR_OK
|
self.test_stage = STACK_PAR_OK
|
||||||
@@ -160,14 +160,14 @@ class DutTestThread(Thread):
|
|||||||
def handle_par_fail(data):
|
def handle_par_fail(data):
|
||||||
""" Handle parameter fail
|
""" Handle parameter fail
|
||||||
"""
|
"""
|
||||||
logger.debug("%s[READ_PAR_FAIL]: %s" % (self.tname, str(data)))
|
logger.debug('%s[READ_PAR_FAIL]: %s' % (self.tname, str(data)))
|
||||||
self.param_fail_count += 1
|
self.param_fail_count += 1
|
||||||
self.test_stage = STACK_PAR_FAIL
|
self.test_stage = STACK_PAR_FAIL
|
||||||
|
|
||||||
def handle_destroy(data):
|
def handle_destroy(data):
|
||||||
""" Handle destroy
|
""" Handle destroy
|
||||||
"""
|
"""
|
||||||
logger.debug("%s[DESTROY]: %s" % (self.tname, str(data)))
|
logger.debug('%s[DESTROY]: %s' % (self.tname, str(data)))
|
||||||
self.test_stage = STACK_DESTROY
|
self.test_stage = STACK_DESTROY
|
||||||
self.test_finish = True
|
self.test_finish = True
|
||||||
|
|
||||||
@@ -183,7 +183,7 @@ class DutTestThread(Thread):
|
|||||||
(re.compile(self.expected[STACK_DESTROY]), handle_destroy),
|
(re.compile(self.expected[STACK_DESTROY]), handle_destroy),
|
||||||
timeout=timeout_value)
|
timeout=timeout_value)
|
||||||
except DUT.ExpectTimeout:
|
except DUT.ExpectTimeout:
|
||||||
logger.debug("%s, expect timeout on stage #%d (%s seconds)" % (self.tname, self.test_stage, timeout_value))
|
logger.debug('%s, expect timeout on stage #%d (%s seconds)' % (self.tname, self.test_stage, timeout_value))
|
||||||
self.test_finish = True
|
self.test_finish = True
|
||||||
|
|
||||||
|
|
||||||
@@ -193,7 +193,7 @@ def test_check_mode(dut=None, mode_str=None, value=None):
|
|||||||
global logger
|
global logger
|
||||||
try:
|
try:
|
||||||
opt = dut.app.get_sdkconfig()[mode_str]
|
opt = dut.app.get_sdkconfig()[mode_str]
|
||||||
logger.debug("%s {%s} = %s.\n" % (str(dut), mode_str, opt))
|
logger.debug('%s {%s} = %s.\n' % (str(dut), mode_str, opt))
|
||||||
return value == opt
|
return value == opt
|
||||||
except Exception:
|
except Exception:
|
||||||
logger.error('ENV_TEST_FAILURE: %s: Cannot find option %s in sdkconfig.' % (str(dut), mode_str))
|
logger.error('ENV_TEST_FAILURE: %s: Cannot find option %s in sdkconfig.' % (str(dut), mode_str))
|
||||||
@@ -208,8 +208,8 @@ def test_modbus_communication(env, comm_mode):
|
|||||||
# Get device under test. Both duts must be able to be connected to WiFi router
|
# Get device under test. Both duts must be able to be connected to WiFi router
|
||||||
dut_master = env.get_dut('modbus_tcp_master', os.path.join(rel_project_path, TEST_MASTER_TCP))
|
dut_master = env.get_dut('modbus_tcp_master', os.path.join(rel_project_path, TEST_MASTER_TCP))
|
||||||
dut_slave = env.get_dut('modbus_tcp_slave', os.path.join(rel_project_path, TEST_SLAVE_TCP))
|
dut_slave = env.get_dut('modbus_tcp_slave', os.path.join(rel_project_path, TEST_SLAVE_TCP))
|
||||||
log_file = os.path.join(env.log_path, "modbus_tcp_test.log")
|
log_file = os.path.join(env.log_path, 'modbus_tcp_test.log')
|
||||||
print("Logging file name: %s" % log_file)
|
print('Logging file name: %s' % log_file)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
# create file handler which logs even debug messages
|
# create file handler which logs even debug messages
|
||||||
@@ -229,29 +229,29 @@ def test_modbus_communication(env, comm_mode):
|
|||||||
logger.addHandler(ch)
|
logger.addHandler(ch)
|
||||||
|
|
||||||
# Check Kconfig configuration options for each built example
|
# Check Kconfig configuration options for each built example
|
||||||
if (test_check_mode(dut_master, "CONFIG_FMB_COMM_MODE_TCP_EN", "y") and
|
if (test_check_mode(dut_master, 'CONFIG_FMB_COMM_MODE_TCP_EN', 'y') and
|
||||||
test_check_mode(dut_slave, "CONFIG_FMB_COMM_MODE_TCP_EN", "y")):
|
test_check_mode(dut_slave, 'CONFIG_FMB_COMM_MODE_TCP_EN', 'y')):
|
||||||
slave_name = TEST_SLAVE_TCP
|
slave_name = TEST_SLAVE_TCP
|
||||||
master_name = TEST_MASTER_TCP
|
master_name = TEST_MASTER_TCP
|
||||||
else:
|
else:
|
||||||
logger.error("ENV_TEST_FAILURE: IP resolver mode do not match in the master and slave implementation.\n")
|
logger.error('ENV_TEST_FAILURE: IP resolver mode do not match in the master and slave implementation.\n')
|
||||||
raise Exception("ENV_TEST_FAILURE: IP resolver mode do not match in the master and slave implementation.\n")
|
raise Exception('ENV_TEST_FAILURE: IP resolver mode do not match in the master and slave implementation.\n')
|
||||||
address = None
|
address = None
|
||||||
if test_check_mode(dut_master, "CONFIG_MB_SLAVE_IP_FROM_STDIN", "y"):
|
if test_check_mode(dut_master, 'CONFIG_MB_SLAVE_IP_FROM_STDIN', 'y'):
|
||||||
logger.info("ENV_TEST_INFO: Set slave IP address through STDIN.\n")
|
logger.info('ENV_TEST_INFO: Set slave IP address through STDIN.\n')
|
||||||
# Flash app onto DUT (Todo: Debug case when the slave flashed before master then expect does not work correctly for no reason
|
# Flash app onto DUT (Todo: Debug case when the slave flashed before master then expect does not work correctly for no reason
|
||||||
dut_slave.start_app()
|
dut_slave.start_app()
|
||||||
dut_master.start_app()
|
dut_master.start_app()
|
||||||
if test_check_mode(dut_master, "CONFIG_EXAMPLE_CONNECT_IPV6", "y"):
|
if test_check_mode(dut_master, 'CONFIG_EXAMPLE_CONNECT_IPV6', 'y'):
|
||||||
address = dut_slave.expect(re.compile(pattern_dict_slave[STACK_IPV6]), TEST_EXPECT_STR_TIMEOUT)
|
address = dut_slave.expect(re.compile(pattern_dict_slave[STACK_IPV6]), TEST_EXPECT_STR_TIMEOUT)
|
||||||
else:
|
else:
|
||||||
address = dut_slave.expect(re.compile(pattern_dict_slave[STACK_IPV4]), TEST_EXPECT_STR_TIMEOUT)
|
address = dut_slave.expect(re.compile(pattern_dict_slave[STACK_IPV4]), TEST_EXPECT_STR_TIMEOUT)
|
||||||
if address is not None:
|
if address is not None:
|
||||||
print("Found IP slave address: %s" % address[0])
|
print('Found IP slave address: %s' % address[0])
|
||||||
else:
|
else:
|
||||||
raise Exception("ENV_TEST_FAILURE: Slave IP address is not found in the output. Check network settings.\n")
|
raise Exception('ENV_TEST_FAILURE: Slave IP address is not found in the output. Check network settings.\n')
|
||||||
else:
|
else:
|
||||||
raise Exception("ENV_TEST_FAILURE: Slave IP resolver is not configured correctly.\n")
|
raise Exception('ENV_TEST_FAILURE: Slave IP resolver is not configured correctly.\n')
|
||||||
|
|
||||||
# Create thread for each dut
|
# Create thread for each dut
|
||||||
with DutTestThread(dut=dut_master, name=master_name, ip_addr=address[0], expect=pattern_dict_master) as dut_master_thread:
|
with DutTestThread(dut=dut_master, name=master_name, ip_addr=address[0], expect=pattern_dict_master) as dut_master_thread:
|
||||||
@@ -266,21 +266,21 @@ def test_modbus_communication(env, comm_mode):
|
|||||||
dut_master_thread.join(timeout=TEST_THREAD_JOIN_TIMEOUT)
|
dut_master_thread.join(timeout=TEST_THREAD_JOIN_TIMEOUT)
|
||||||
|
|
||||||
if dut_slave_thread.isAlive():
|
if dut_slave_thread.isAlive():
|
||||||
logger.error("ENV_TEST_FAILURE: The thread %s is not completed successfully after %d seconds.\n" %
|
logger.error('ENV_TEST_FAILURE: The thread %s is not completed successfully after %d seconds.\n' %
|
||||||
(dut_slave_thread.tname, TEST_THREAD_JOIN_TIMEOUT))
|
(dut_slave_thread.tname, TEST_THREAD_JOIN_TIMEOUT))
|
||||||
raise Exception("ENV_TEST_FAILURE: The thread %s is not completed successfully after %d seconds.\n" %
|
raise Exception('ENV_TEST_FAILURE: The thread %s is not completed successfully after %d seconds.\n' %
|
||||||
(dut_slave_thread.tname, TEST_THREAD_JOIN_TIMEOUT))
|
(dut_slave_thread.tname, TEST_THREAD_JOIN_TIMEOUT))
|
||||||
|
|
||||||
if dut_master_thread.isAlive():
|
if dut_master_thread.isAlive():
|
||||||
logger.error("TEST_FAILURE: The thread %s is not completed successfully after %d seconds.\n" %
|
logger.error('TEST_FAILURE: The thread %s is not completed successfully after %d seconds.\n' %
|
||||||
(dut_master_thread.tname, TEST_THREAD_JOIN_TIMEOUT))
|
(dut_master_thread.tname, TEST_THREAD_JOIN_TIMEOUT))
|
||||||
raise Exception("TEST_FAILURE: The thread %s is not completed successfully after %d seconds.\n" %
|
raise Exception('TEST_FAILURE: The thread %s is not completed successfully after %d seconds.\n' %
|
||||||
(dut_master_thread.tname, TEST_THREAD_JOIN_TIMEOUT))
|
(dut_master_thread.tname, TEST_THREAD_JOIN_TIMEOUT))
|
||||||
|
|
||||||
logger.info("TEST_INFO: %s error count = %d, %s error count = %d.\n" %
|
logger.info('TEST_INFO: %s error count = %d, %s error count = %d.\n' %
|
||||||
(dut_master_thread.tname, dut_master_thread.param_fail_count,
|
(dut_master_thread.tname, dut_master_thread.param_fail_count,
|
||||||
dut_slave_thread.tname, dut_slave_thread.param_fail_count))
|
dut_slave_thread.tname, dut_slave_thread.param_fail_count))
|
||||||
logger.info("TEST_INFO: %s ok count = %d, %s ok count = %d.\n" %
|
logger.info('TEST_INFO: %s ok count = %d, %s ok count = %d.\n' %
|
||||||
(dut_master_thread.tname, dut_master_thread.param_ok_count,
|
(dut_master_thread.tname, dut_master_thread.param_ok_count,
|
||||||
dut_slave_thread.tname, dut_slave_thread.param_ok_count))
|
dut_slave_thread.tname, dut_slave_thread.param_ok_count))
|
||||||
|
|
||||||
@@ -288,10 +288,10 @@ def test_modbus_communication(env, comm_mode):
|
|||||||
(dut_slave_thread.param_fail_count > TEST_READ_MAX_ERR_COUNT) or
|
(dut_slave_thread.param_fail_count > TEST_READ_MAX_ERR_COUNT) or
|
||||||
(dut_slave_thread.param_ok_count == 0) or
|
(dut_slave_thread.param_ok_count == 0) or
|
||||||
(dut_master_thread.param_ok_count == 0)):
|
(dut_master_thread.param_ok_count == 0)):
|
||||||
raise Exception("TEST_FAILURE: %s parameter read error(ok) count = %d(%d), %s parameter read error(ok) count = %d(%d).\n" %
|
raise Exception('TEST_FAILURE: %s parameter read error(ok) count = %d(%d), %s parameter read error(ok) count = %d(%d).\n' %
|
||||||
(dut_master_thread.tname, dut_master_thread.param_fail_count, dut_master_thread.param_ok_count,
|
(dut_master_thread.tname, dut_master_thread.param_fail_count, dut_master_thread.param_ok_count,
|
||||||
dut_slave_thread.tname, dut_slave_thread.param_fail_count, dut_slave_thread.param_ok_count))
|
dut_slave_thread.tname, dut_slave_thread.param_fail_count, dut_slave_thread.param_ok_count))
|
||||||
logger.info("TEST_SUCCESS: The Modbus parameter test is completed successfully.\n")
|
logger.info('TEST_SUCCESS: The Modbus parameter test is completed successfully.\n')
|
||||||
|
|
||||||
finally:
|
finally:
|
||||||
dut_master.close()
|
dut_master.close()
|
||||||
|
Reference in New Issue
Block a user