diff --git a/examples/protocols/modbus/serial/example_test.py b/examples/protocols/modbus/serial/example_test.py index bfc70f4..c150182 100644 --- a/examples/protocols/modbus/serial/example_test.py +++ b/examples/protocols/modbus/serial/example_test.py @@ -1,15 +1,15 @@ # Need Python 3 string formatting functions from __future__ import print_function +import logging import os import re -import logging from threading import Thread import ttfw_idf LOG_LEVEL = logging.DEBUG -LOGGER_NAME = "modbus_test" +LOGGER_NAME = 'modbus_test' # Allowed parameter reads TEST_READ_MIN_COUNT = 10 # Minimum number of correct readings @@ -27,38 +27,38 @@ TEST_SLAVE_ASCII = 'slave_ascii' # Define tuple of strings to expect for each DUT. # -master_expect = ("MASTER_TEST: Modbus master stack initialized...", "MASTER_TEST: Start modbus test...", "MASTER_TEST: Destroy master...") -slave_expect = ("SLAVE_TEST: Modbus slave stack initialized.", "SLAVE_TEST: Start modbus test...", "SLAVE_TEST: Modbus controller destroyed.") +master_expect = ('MASTER_TEST: Modbus master stack initialized...', 'MASTER_TEST: Start modbus test...', 'MASTER_TEST: Destroy master...') +slave_expect = ('SLAVE_TEST: Modbus slave stack initialized.', 'SLAVE_TEST: Start modbus test...', 'SLAVE_TEST: Modbus controller destroyed.') # The dictionary for expected values in listing -expect_dict_master_ok = {"START": (), - "READ_PAR_OK": (), - "ALARM_MSG": (u'7',)} +expect_dict_master_ok = {'START': (), + 'READ_PAR_OK': (), + 'ALARM_MSG': (u'7',)} -expect_dict_master_err = {"READ_PAR_ERR": (u'263', u'ESP_ERR_TIMEOUT'), - "READ_STK_ERR": (u'107', u'ESP_ERR_TIMEOUT')} +expect_dict_master_err = {'READ_PAR_ERR': (u'263', u'ESP_ERR_TIMEOUT'), + 'READ_STK_ERR': (u'107', u'ESP_ERR_TIMEOUT')} # The dictionary for regular expression patterns to check in listing -pattern_dict_master_ok = {"START": (r'.*I \([0-9]+\) MASTER_TEST: Start modbus test...'), - "READ_PAR_OK": (r'.*I\s\([0-9]+\) MASTER_TEST: Characteristic #[0-9]+ [a-zA-Z0-9_]+' +pattern_dict_master_ok = {'START': (r'.*I \([0-9]+\) MASTER_TEST: Start modbus test...'), + 'READ_PAR_OK': (r'.*I\s\([0-9]+\) MASTER_TEST: Characteristic #[0-9]+ [a-zA-Z0-9_]+' r'\s\([a-zA-Z\%\/]+\) value = [a-zA-Z0-9\.\s]*\(0x[a-zA-Z0-9]+\) read successful.'), - "ALARM_MSG": (r'.*I \([0-9]*\) MASTER_TEST: Alarm triggered by cid #([0-9]+).')} + 'ALARM_MSG': (r'.*I \([0-9]*\) MASTER_TEST: Alarm triggered by cid #([0-9]+).')} -pattern_dict_master_err = {"READ_PAR_ERR_TOUT": (r'.*E \([0-9]+\) MASTER_TEST: Characteristic #[0-9]+' +pattern_dict_master_err = {'READ_PAR_ERR_TOUT': (r'.*E \([0-9]+\) MASTER_TEST: Characteristic #[0-9]+' r'\s\([a-zA-Z0-9_]+\) read fail, err = [0-9]+ \([_A-Z]+\).'), - "READ_STK_ERR_TOUT": (r'.*E \([0-9]+\) MB_CONTROLLER_MASTER: [a-zA-Z0-9_]+\([0-9]+\):\s' + 'READ_STK_ERR_TOUT': (r'.*E \([0-9]+\) MB_CONTROLLER_MASTER: [a-zA-Z0-9_]+\([0-9]+\):\s' r'SERIAL master get parameter failure error=\(0x([a-zA-Z0-9]+)\) \(([_A-Z]+)\).')} # The dictionary for expected values in listing -expect_dict_slave_ok = {"START": (), - "READ_PAR_OK": (), - "DESTROY": ()} +expect_dict_slave_ok = {'START': (), + 'READ_PAR_OK': (), + 'DESTROY': ()} # The dictionary for regular expression patterns to check in listing -pattern_dict_slave_ok = {"START": (r'.*I \([0-9]+\) SLAVE_TEST: Start modbus test...'), - "READ_PAR_OK": (r'.*I\s\([0-9]+\) SLAVE_TEST: [A-Z]+ READ \([a-zA-Z0-9_]+ us\),\s' +pattern_dict_slave_ok = {'START': (r'.*I \([0-9]+\) SLAVE_TEST: Start modbus test...'), + 'READ_PAR_OK': (r'.*I\s\([0-9]+\) SLAVE_TEST: [A-Z]+ READ \([a-zA-Z0-9_]+ us\),\s' r'ADDR:[0-9]+, TYPE:[0-9]+, INST_ADDR:0x[a-zA-Z0-9]+, SIZE:[0-9]+'), - "DESTROY": (r'.*I\s\([0-9]+\) SLAVE_TEST: Modbus controller destroyed.')} + 'DESTROY': (r'.*I\s\([0-9]+\) SLAVE_TEST: Modbus controller destroyed.')} logger = logging.getLogger(LOGGER_NAME) @@ -89,8 +89,8 @@ class DutTestThread(Thread): # Check DUT exceptions dut_exceptions = self.dut.get_exceptions() - if "Guru Meditation Error:" in dut_exceptions: - raise Exception("%s generated an exception: %s\n" % (str(self.dut), dut_exceptions)) + if 'Guru Meditation Error:' in dut_exceptions: + raise Exception('%s generated an exception: %s\n' % (str(self.dut), dut_exceptions)) # Mark thread has run to completion without any exceptions self.data = self.dut.stop_capture_raw_data() @@ -102,7 +102,7 @@ def test_filter_output(data=None, start_pattern=None, end_pattern=None): """ start_index = str(data).find(start_pattern) end_index = str(data).find(end_pattern) - logger.debug("Listing start index= %d, end=%d" % (start_index, end_index)) + logger.debug('Listing start index= %d, end=%d' % (start_index, end_index)) if start_index == -1 or end_index == -1: return data return data[start_index:end_index + len(end_pattern)] @@ -145,9 +145,9 @@ def test_check_output(data=None, check_dict=None, expect_dict=None): for line in data_lines: group, index = test_expect_re(line, pattern) if index is not None: - logger.debug("Found key{%s}=%s, line: \n%s" % (key, group, line)) + logger.debug('Found key{%s}=%s, line: \n%s' % (key, group, line)) if expect_dict[key] == group: - logger.debug("The result is correct for the key:%s, expected:%s == returned:%s" % (key, str(expect_dict[key]), str(group))) + logger.debug('The result is correct for the key:%s, expected:%s == returned:%s' % (key, str(expect_dict[key]), str(group))) match_count += 1 return match_count @@ -158,7 +158,7 @@ def test_check_mode(dut=None, mode_str=None, value=None): global logger try: opt = dut.app.get_sdkconfig()[mode_str] - logger.info("%s {%s} = %s.\n" % (str(dut), mode_str, opt)) + logger.info('%s {%s} = %s.\n' % (str(dut), mode_str, opt)) return value == opt except Exception: logger.info('ENV_TEST_FAILURE: %s: Cannot find option %s in sdkconfig.' % (str(dut), mode_str)) @@ -170,30 +170,30 @@ def test_modbus_communication(env, comm_mode): global logger # Get device under test. "dut1 - master", "dut2 - slave" must be properly connected through RS485 interface driver - dut_master = env.get_dut("modbus_master", "examples/protocols/modbus/serial/mb_master", dut_class=ttfw_idf.ESP32DUT) - dut_slave = env.get_dut("modbus_slave", "examples/protocols/modbus/serial/mb_slave", dut_class=ttfw_idf.ESP32DUT) + dut_master = env.get_dut('modbus_master', 'examples/protocols/modbus/serial/mb_master', dut_class=ttfw_idf.ESP32DUT) + dut_slave = env.get_dut('modbus_slave', 'examples/protocols/modbus/serial/mb_slave', dut_class=ttfw_idf.ESP32DUT) try: - logger.debug("Environment vars: %s\r\n" % os.environ) - logger.debug("DUT slave sdkconfig: %s\r\n" % dut_slave.app.get_sdkconfig()) - logger.debug("DUT master sdkconfig: %s\r\n" % dut_master.app.get_sdkconfig()) + logger.debug('Environment vars: %s\r\n' % os.environ) + logger.debug('DUT slave sdkconfig: %s\r\n' % dut_slave.app.get_sdkconfig()) + logger.debug('DUT master sdkconfig: %s\r\n' % dut_master.app.get_sdkconfig()) # Check Kconfig configuration options for each built example - if test_check_mode(dut_master, "CONFIG_MB_COMM_MODE_ASCII", "y") and test_check_mode(dut_slave, "CONFIG_MB_COMM_MODE_ASCII", "y"): - logger.info("ENV_TEST_INFO: Modbus ASCII test mode selected in the configuration. \n") + if test_check_mode(dut_master, 'CONFIG_MB_COMM_MODE_ASCII', 'y') and test_check_mode(dut_slave, 'CONFIG_MB_COMM_MODE_ASCII', 'y'): + logger.info('ENV_TEST_INFO: Modbus ASCII test mode selected in the configuration. \n') slave_name = TEST_SLAVE_ASCII master_name = TEST_MASTER_ASCII - elif test_check_mode(dut_master, "CONFIG_MB_COMM_MODE_RTU", "y") and test_check_mode(dut_slave, "CONFIG_MB_COMM_MODE_RTU", "y"): - logger.info("ENV_TEST_INFO: Modbus RTU test mode selected in the configuration. \n") + elif test_check_mode(dut_master, 'CONFIG_MB_COMM_MODE_RTU', 'y') and test_check_mode(dut_slave, 'CONFIG_MB_COMM_MODE_RTU', 'y'): + logger.info('ENV_TEST_INFO: Modbus RTU test mode selected in the configuration. \n') slave_name = TEST_SLAVE_RTU master_name = TEST_MASTER_RTU else: logger.error("ENV_TEST_FAILURE: Communication mode in master and slave configuration don't match.\n") raise Exception("ENV_TEST_FAILURE: Communication mode in master and slave configuration don't match.\n") # Check if slave address for example application is default one to be able to communicate - if not test_check_mode(dut_slave, "CONFIG_MB_SLAVE_ADDR", "1"): - logger.error("ENV_TEST_FAILURE: Slave address option is incorrect.\n") - raise Exception("ENV_TEST_FAILURE: Slave address option is incorrect.\n") + if not test_check_mode(dut_slave, 'CONFIG_MB_SLAVE_ADDR', '1'): + logger.error('ENV_TEST_FAILURE: Slave address option is incorrect.\n') + raise Exception('ENV_TEST_FAILURE: Slave address option is incorrect.\n') # Flash app onto each DUT dut_master.start_app() @@ -212,15 +212,15 @@ def test_modbus_communication(env, comm_mode): dut_master_thread.join(timeout=TEST_THREAD_JOIN_TIMEOUT) if dut_slave_thread.isAlive(): - logger.error("ENV_TEST_FAILURE: The thread %s is not completed successfully after %d seconds.\n" % + logger.error('ENV_TEST_FAILURE: The thread %s is not completed successfully after %d seconds.\n' % (dut_slave_thread.tname, TEST_THREAD_JOIN_TIMEOUT)) - raise Exception("ENV_TEST_FAILURE: The thread %s is not completed successfully after %d seconds.\n" % + raise Exception('ENV_TEST_FAILURE: The thread %s is not completed successfully after %d seconds.\n' % (dut_slave_thread.tname, TEST_THREAD_JOIN_TIMEOUT)) if dut_master_thread.isAlive(): - logger.error("ENV_TEST_FAILURE: The thread %s is not completed successfully after %d seconds.\n" % + logger.error('ENV_TEST_FAILURE: The thread %s is not completed successfully after %d seconds.\n' % (dut_master_thread.tname, TEST_THREAD_JOIN_TIMEOUT)) - raise Exception("ENV_TEST_FAILURE: The thread %s is not completed successfully after %d seconds.\n" % + raise Exception('ENV_TEST_FAILURE: The thread %s is not completed successfully after %d seconds.\n' % (dut_master_thread.tname, TEST_THREAD_JOIN_TIMEOUT)) finally: dut_master.close() @@ -228,43 +228,43 @@ def test_modbus_communication(env, comm_mode): # Check if test threads completed successfully and captured data if not dut_slave_thread.result or dut_slave_thread.data is None: - logger.error("The thread %s was not run successfully." % dut_slave_thread.tname) - raise Exception("The thread %s was not run successfully." % dut_slave_thread.tname) + logger.error('The thread %s was not run successfully.' % dut_slave_thread.tname) + raise Exception('The thread %s was not run successfully.' % dut_slave_thread.tname) if not dut_master_thread.result or dut_master_thread.data is None: - logger.error("The thread %s was not run successfully." % dut_slave_thread.tname) - raise Exception("The thread %s was not run successfully." % dut_master_thread.tname) + logger.error('The thread %s was not run successfully.' % dut_slave_thread.tname) + raise Exception('The thread %s was not run successfully.' % dut_master_thread.tname) # Filter output to get test messages master_output = test_filter_output(dut_master_thread.data, master_expect[0], master_expect[len(master_expect) - 1]) if master_output is not None: - logger.info("The data for master thread is captured.") + logger.info('The data for master thread is captured.') logger.debug(master_output) slave_output = test_filter_output(dut_slave_thread.data, slave_expect[0], slave_expect[len(slave_expect) - 1]) if slave_output is not None: - logger.info("The data for slave thread is captured.") + logger.info('The data for slave thread is captured.') logger.debug(slave_output) # Check if parameters are read correctly by master match_count = test_check_output(master_output, pattern_dict_master_ok, expect_dict_master_ok) if match_count < TEST_READ_MIN_COUNT: - logger.error("There are errors reading parameters from %s, %d" % (dut_master_thread.tname, match_count)) - raise Exception("There are errors reading parameters from %s, %d" % (dut_master_thread.tname, match_count)) - logger.info("OK pattern test for %s, match_count=%d." % (dut_master_thread.tname, match_count)) + logger.error('There are errors reading parameters from %s, %d' % (dut_master_thread.tname, match_count)) + raise Exception('There are errors reading parameters from %s, %d' % (dut_master_thread.tname, match_count)) + logger.info('OK pattern test for %s, match_count=%d.' % (dut_master_thread.tname, match_count)) # If the test completed successfully (alarm triggered) but there are some errors during reading of parameters match_count = test_check_output(master_output, pattern_dict_master_err, expect_dict_master_err) if match_count > TEST_READ_MAX_ERR_COUNT: - logger.error("There are errors reading parameters from %s, %d" % (dut_master_thread.tname, match_count)) - raise Exception("There are errors reading parameters from %s, %d" % (dut_master_thread.tname, match_count)) - logger.info("ERROR pattern test for %s, match_count=%d." % (dut_master_thread.tname, match_count)) + logger.error('There are errors reading parameters from %s, %d' % (dut_master_thread.tname, match_count)) + raise Exception('There are errors reading parameters from %s, %d' % (dut_master_thread.tname, match_count)) + logger.info('ERROR pattern test for %s, match_count=%d.' % (dut_master_thread.tname, match_count)) match_count = test_check_output(slave_output, pattern_dict_slave_ok, expect_dict_slave_ok) if match_count < TEST_READ_MIN_COUNT: - logger.error("There are errors reading parameters from %s, %d" % (dut_slave_thread.tname, match_count)) - raise Exception("There are errors reading parameters from %s, %d" % (dut_slave_thread.tname, match_count)) - logger.info("OK pattern test for %s, match_count=%d." % (dut_slave_thread.tname, match_count)) + logger.error('There are errors reading parameters from %s, %d' % (dut_slave_thread.tname, match_count)) + raise Exception('There are errors reading parameters from %s, %d' % (dut_slave_thread.tname, match_count)) + logger.info('OK pattern test for %s, match_count=%d.' % (dut_slave_thread.tname, match_count)) if __name__ == '__main__': @@ -282,7 +282,7 @@ if __name__ == '__main__': fh.setFormatter(formatter) logger.addHandler(fh) logger.addHandler(ch) - logger.info("Start script %s." % os.path.basename(__file__)) - print("Logging file name: %s" % logger.handlers[0].baseFilename) + logger.info('Start script %s.' % os.path.basename(__file__)) + print('Logging file name: %s' % logger.handlers[0].baseFilename) test_modbus_communication() logging.shutdown() diff --git a/examples/protocols/modbus/tcp/example_test.py b/examples/protocols/modbus/tcp/example_test.py index 1db10be..17c8226 100644 --- a/examples/protocols/modbus/tcp/example_test.py +++ b/examples/protocols/modbus/tcp/example_test.py @@ -1,13 +1,13 @@ +import logging import os import re -import logging from threading import Thread import ttfw_idf from tiny_test_fw import DUT LOG_LEVEL = logging.DEBUG -LOGGER_NAME = "modbus_test" +LOGGER_NAME = 'modbus_test' # Allowed options for the test TEST_READ_MAX_ERR_COUNT = 3 # Maximum allowed read errors during initialization @@ -69,7 +69,7 @@ class DutTestThread(Thread): super(DutTestThread, self).__init__() def __enter__(self): - logger.debug("Restart %s." % self.tname) + logger.debug('Restart %s.' % self.tname) # Reset DUT first self.dut.reset() # Capture output from the DUT @@ -80,7 +80,7 @@ class DutTestThread(Thread): """ The exit method of context manager """ if exc_type is not None or exc_value is not None: - logger.info("Thread %s rised an exception type: %s, value: %s" % (self.tname, str(exc_type), str(exc_value))) + logger.info('Thread %s rised an exception type: %s, value: %s' % (self.tname, str(exc_type), str(exc_value))) def run(self): """ The function implements thread functionality @@ -94,8 +94,8 @@ class DutTestThread(Thread): # Check DUT exceptions dut_exceptions = self.dut.get_exceptions() - if "Guru Meditation Error:" in dut_exceptions: - raise Exception("%s generated an exception: %s\n" % (str(self.dut), dut_exceptions)) + if 'Guru Meditation Error:' in dut_exceptions: + raise Exception('%s generated an exception: %s\n' % (str(self.dut), dut_exceptions)) # Mark thread has run to completion without any exceptions self.data = self.dut.stop_capture_raw_data(capture_id=self.dut.name) @@ -108,13 +108,13 @@ class DutTestThread(Thread): self.dut.read() result = self.dut.expect(re.compile(message), TEST_EXPECT_STR_TIMEOUT) if int(result[0]) != index: - raise Exception("Incorrect index of IP=%d for %s\n" % (int(result[0]), str(self.dut))) - message = "IP%s=%s" % (result[0], self.ip_addr) - self.dut.write(message, "\r\n", False) - logger.debug("Sent message for %s: %s" % (self.tname, message)) + raise Exception('Incorrect index of IP=%d for %s\n' % (int(result[0]), str(self.dut))) + message = 'IP%s=%s' % (result[0], self.ip_addr) + self.dut.write(message, '\r\n', False) + logger.debug('Sent message for %s: %s' % (self.tname, message)) message = r'.*IP\([0-9]+\) = \[([0-9a-zA-Z\.\:]+)\] set from stdin.*' result = self.dut.expect(re.compile(message), TEST_EXPECT_STR_TIMEOUT) - logger.debug("Thread %s initialized with slave IP (%s)." % (self.tname, result[0])) + logger.debug('Thread %s initialized with slave IP (%s).' % (self.tname, result[0])) def test_start(self, timeout_value): """ The method to initialize and handle test stages @@ -122,37 +122,37 @@ class DutTestThread(Thread): def handle_get_ip4(data): """ Handle get_ip v4 """ - logger.debug("%s[STACK_IPV4]: %s" % (self.tname, str(data))) + logger.debug('%s[STACK_IPV4]: %s' % (self.tname, str(data))) self.test_stage = STACK_IPV4 def handle_get_ip6(data): """ Handle get_ip v6 """ - logger.debug("%s[STACK_IPV6]: %s" % (self.tname, str(data))) + logger.debug('%s[STACK_IPV6]: %s' % (self.tname, str(data))) self.test_stage = STACK_IPV6 def handle_init(data): """ Handle init """ - logger.debug("%s[STACK_INIT]: %s" % (self.tname, str(data))) + logger.debug('%s[STACK_INIT]: %s' % (self.tname, str(data))) self.test_stage = STACK_INIT def handle_connect(data): """ Handle connect """ - logger.debug("%s[STACK_CONNECT]: %s" % (self.tname, str(data))) + logger.debug('%s[STACK_CONNECT]: %s' % (self.tname, str(data))) self.test_stage = STACK_CONNECT def handle_test_start(data): """ Handle connect """ - logger.debug("%s[STACK_START]: %s" % (self.tname, str(data))) + logger.debug('%s[STACK_START]: %s' % (self.tname, str(data))) self.test_stage = STACK_START def handle_par_ok(data): """ Handle parameter ok """ - logger.debug("%s[READ_PAR_OK]: %s" % (self.tname, str(data))) + logger.debug('%s[READ_PAR_OK]: %s' % (self.tname, str(data))) if self.test_stage >= STACK_START: self.param_ok_count += 1 self.test_stage = STACK_PAR_OK @@ -160,14 +160,14 @@ class DutTestThread(Thread): def handle_par_fail(data): """ Handle parameter fail """ - logger.debug("%s[READ_PAR_FAIL]: %s" % (self.tname, str(data))) + logger.debug('%s[READ_PAR_FAIL]: %s' % (self.tname, str(data))) self.param_fail_count += 1 self.test_stage = STACK_PAR_FAIL def handle_destroy(data): """ Handle destroy """ - logger.debug("%s[DESTROY]: %s" % (self.tname, str(data))) + logger.debug('%s[DESTROY]: %s' % (self.tname, str(data))) self.test_stage = STACK_DESTROY self.test_finish = True @@ -183,7 +183,7 @@ class DutTestThread(Thread): (re.compile(self.expected[STACK_DESTROY]), handle_destroy), timeout=timeout_value) except DUT.ExpectTimeout: - logger.debug("%s, expect timeout on stage #%d (%s seconds)" % (self.tname, self.test_stage, timeout_value)) + logger.debug('%s, expect timeout on stage #%d (%s seconds)' % (self.tname, self.test_stage, timeout_value)) self.test_finish = True @@ -193,7 +193,7 @@ def test_check_mode(dut=None, mode_str=None, value=None): global logger try: opt = dut.app.get_sdkconfig()[mode_str] - logger.debug("%s {%s} = %s.\n" % (str(dut), mode_str, opt)) + logger.debug('%s {%s} = %s.\n' % (str(dut), mode_str, opt)) return value == opt except Exception: logger.error('ENV_TEST_FAILURE: %s: Cannot find option %s in sdkconfig.' % (str(dut), mode_str)) @@ -208,8 +208,8 @@ def test_modbus_communication(env, comm_mode): # Get device under test. Both duts must be able to be connected to WiFi router dut_master = env.get_dut('modbus_tcp_master', os.path.join(rel_project_path, TEST_MASTER_TCP)) dut_slave = env.get_dut('modbus_tcp_slave', os.path.join(rel_project_path, TEST_SLAVE_TCP)) - log_file = os.path.join(env.log_path, "modbus_tcp_test.log") - print("Logging file name: %s" % log_file) + log_file = os.path.join(env.log_path, 'modbus_tcp_test.log') + print('Logging file name: %s' % log_file) try: # create file handler which logs even debug messages @@ -229,29 +229,29 @@ def test_modbus_communication(env, comm_mode): logger.addHandler(ch) # Check Kconfig configuration options for each built example - if (test_check_mode(dut_master, "CONFIG_FMB_COMM_MODE_TCP_EN", "y") and - test_check_mode(dut_slave, "CONFIG_FMB_COMM_MODE_TCP_EN", "y")): + if (test_check_mode(dut_master, 'CONFIG_FMB_COMM_MODE_TCP_EN', 'y') and + test_check_mode(dut_slave, 'CONFIG_FMB_COMM_MODE_TCP_EN', 'y')): slave_name = TEST_SLAVE_TCP master_name = TEST_MASTER_TCP else: - logger.error("ENV_TEST_FAILURE: IP resolver mode do not match in the master and slave implementation.\n") - raise Exception("ENV_TEST_FAILURE: IP resolver mode do not match in the master and slave implementation.\n") + logger.error('ENV_TEST_FAILURE: IP resolver mode do not match in the master and slave implementation.\n') + raise Exception('ENV_TEST_FAILURE: IP resolver mode do not match in the master and slave implementation.\n') address = None - if test_check_mode(dut_master, "CONFIG_MB_SLAVE_IP_FROM_STDIN", "y"): - logger.info("ENV_TEST_INFO: Set slave IP address through STDIN.\n") + if test_check_mode(dut_master, 'CONFIG_MB_SLAVE_IP_FROM_STDIN', 'y'): + logger.info('ENV_TEST_INFO: Set slave IP address through STDIN.\n') # Flash app onto DUT (Todo: Debug case when the slave flashed before master then expect does not work correctly for no reason dut_slave.start_app() dut_master.start_app() - if test_check_mode(dut_master, "CONFIG_EXAMPLE_CONNECT_IPV6", "y"): + if test_check_mode(dut_master, 'CONFIG_EXAMPLE_CONNECT_IPV6', 'y'): address = dut_slave.expect(re.compile(pattern_dict_slave[STACK_IPV6]), TEST_EXPECT_STR_TIMEOUT) else: address = dut_slave.expect(re.compile(pattern_dict_slave[STACK_IPV4]), TEST_EXPECT_STR_TIMEOUT) if address is not None: - print("Found IP slave address: %s" % address[0]) + print('Found IP slave address: %s' % address[0]) else: - raise Exception("ENV_TEST_FAILURE: Slave IP address is not found in the output. Check network settings.\n") + raise Exception('ENV_TEST_FAILURE: Slave IP address is not found in the output. Check network settings.\n') else: - raise Exception("ENV_TEST_FAILURE: Slave IP resolver is not configured correctly.\n") + raise Exception('ENV_TEST_FAILURE: Slave IP resolver is not configured correctly.\n') # Create thread for each dut with DutTestThread(dut=dut_master, name=master_name, ip_addr=address[0], expect=pattern_dict_master) as dut_master_thread: @@ -266,21 +266,21 @@ def test_modbus_communication(env, comm_mode): dut_master_thread.join(timeout=TEST_THREAD_JOIN_TIMEOUT) if dut_slave_thread.isAlive(): - logger.error("ENV_TEST_FAILURE: The thread %s is not completed successfully after %d seconds.\n" % + logger.error('ENV_TEST_FAILURE: The thread %s is not completed successfully after %d seconds.\n' % (dut_slave_thread.tname, TEST_THREAD_JOIN_TIMEOUT)) - raise Exception("ENV_TEST_FAILURE: The thread %s is not completed successfully after %d seconds.\n" % + raise Exception('ENV_TEST_FAILURE: The thread %s is not completed successfully after %d seconds.\n' % (dut_slave_thread.tname, TEST_THREAD_JOIN_TIMEOUT)) if dut_master_thread.isAlive(): - logger.error("TEST_FAILURE: The thread %s is not completed successfully after %d seconds.\n" % + logger.error('TEST_FAILURE: The thread %s is not completed successfully after %d seconds.\n' % (dut_master_thread.tname, TEST_THREAD_JOIN_TIMEOUT)) - raise Exception("TEST_FAILURE: The thread %s is not completed successfully after %d seconds.\n" % + raise Exception('TEST_FAILURE: The thread %s is not completed successfully after %d seconds.\n' % (dut_master_thread.tname, TEST_THREAD_JOIN_TIMEOUT)) - logger.info("TEST_INFO: %s error count = %d, %s error count = %d.\n" % + logger.info('TEST_INFO: %s error count = %d, %s error count = %d.\n' % (dut_master_thread.tname, dut_master_thread.param_fail_count, dut_slave_thread.tname, dut_slave_thread.param_fail_count)) - logger.info("TEST_INFO: %s ok count = %d, %s ok count = %d.\n" % + logger.info('TEST_INFO: %s ok count = %d, %s ok count = %d.\n' % (dut_master_thread.tname, dut_master_thread.param_ok_count, dut_slave_thread.tname, dut_slave_thread.param_ok_count)) @@ -288,10 +288,10 @@ def test_modbus_communication(env, comm_mode): (dut_slave_thread.param_fail_count > TEST_READ_MAX_ERR_COUNT) or (dut_slave_thread.param_ok_count == 0) or (dut_master_thread.param_ok_count == 0)): - raise Exception("TEST_FAILURE: %s parameter read error(ok) count = %d(%d), %s parameter read error(ok) count = %d(%d).\n" % + raise Exception('TEST_FAILURE: %s parameter read error(ok) count = %d(%d), %s parameter read error(ok) count = %d(%d).\n' % (dut_master_thread.tname, dut_master_thread.param_fail_count, dut_master_thread.param_ok_count, dut_slave_thread.tname, dut_slave_thread.param_fail_count, dut_slave_thread.param_ok_count)) - logger.info("TEST_SUCCESS: The Modbus parameter test is completed successfully.\n") + logger.info('TEST_SUCCESS: The Modbus parameter test is completed successfully.\n') finally: dut_master.close()