examples: Fix Python coding style

* Original commit: espressif/esp-idf@57c54f96f1
This commit is contained in:
Roland Dobai
2018-12-04 08:32:48 +01:00
committed by suren-gabrielyan-espressif
parent ade4aeffa5
commit dce0b26ef8

View File

@ -3,38 +3,41 @@ import os
import sys import sys
import socket import socket
import time import time
import imp
import struct import struct
import dpkt, dpkt.dns import dpkt
import dpkt.dns
from threading import Thread from threading import Thread
# this is a test case write with tiny-test-fw. # this is a test case write with tiny-test-fw.
# to run test cases outside tiny-test-fw, # to run test cases outside tiny-test-fw,
# we need to set environment variable `TEST_FW_PATH`, # we need to set environment variable `TEST_FW_PATH`,
# then get and insert `TEST_FW_PATH` to sys path before import FW module # then get and insert `TEST_FW_PATH` to sys path before import FW module
test_fw_path = os.getenv("TEST_FW_PATH") try:
if test_fw_path and test_fw_path not in sys.path: import IDF
sys.path.insert(0, test_fw_path) except ImportError:
test_fw_path = os.getenv("TEST_FW_PATH")
if test_fw_path and test_fw_path not in sys.path:
sys.path.insert(0, test_fw_path)
import IDF
import TinyFW import DUT
import IDF
g_run_server = True g_run_server = True
g_done = False g_done = False
def mdns_server(esp_host): def mdns_server(esp_host):
global g_run_server global g_run_server
global g_done global g_done
UDP_IP="0.0.0.0" UDP_IP = "0.0.0.0"
UDP_PORT=5353 UDP_PORT = 5353
MCAST_GRP = '224.0.0.251' MCAST_GRP = '224.0.0.251'
sock = socket.socket( socket.AF_INET, socket.SOCK_DGRAM ) sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1) sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
sock.bind( (UDP_IP,UDP_PORT) ) sock.bind((UDP_IP,UDP_PORT))
mreq = struct.pack("4sl", socket.inet_aton(MCAST_GRP), socket.INADDR_ANY) mreq = struct.pack("4sl", socket.inet_aton(MCAST_GRP), socket.INADDR_ANY)
sock.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq) sock.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq)
dns = dpkt.dns.DNS(b'\x00\x00\x01\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00\x01\x00\x01') dns = dpkt.dns.DNS(b'\x00\x00\x01\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00\x01\x00\x01')
@ -47,30 +50,31 @@ def mdns_server(esp_host):
arr.cls = dpkt.dns.DNS_IN arr.cls = dpkt.dns.DNS_IN
arr.type = dpkt.dns.DNS_A arr.type = dpkt.dns.DNS_A
arr.name = u'tinytester.local' arr.name = u'tinytester.local'
arr.ip =socket.inet_aton('127.0.0.1') arr.ip = socket.inet_aton('127.0.0.1')
resp_dns. an.append(arr) resp_dns. an.append(arr)
sock.sendto(resp_dns.pack(),(MCAST_GRP,UDP_PORT)) sock.sendto(resp_dns.pack(),(MCAST_GRP,UDP_PORT))
while g_run_server: while g_run_server:
try: try:
m=sock.recvfrom( 1024 ); m = sock.recvfrom(1024)
dns = dpkt.dns.DNS(m[0]) dns = dpkt.dns.DNS(m[0])
if len(dns.qd)>0 and dns.qd[0].type == dpkt.dns.DNS_A: if len(dns.qd) > 0 and dns.qd[0].type == dpkt.dns.DNS_A:
if dns.qd[0].name == u'tinytester.local': if dns.qd[0].name == u'tinytester.local':
print (dns.__repr__(),dns.qd[0].name) print(dns.__repr__(),dns.qd[0].name)
sock.sendto(resp_dns.pack(),(MCAST_GRP,UDP_PORT)) sock.sendto(resp_dns.pack(),(MCAST_GRP,UDP_PORT))
if len(dns.an)>0 and dns.an[0].type == dpkt.dns.DNS_A: if len(dns.an) > 0 and dns.an[0].type == dpkt.dns.DNS_A:
if dns.an[0].name == esp_host + u'.local': if dns.an[0].name == esp_host + u'.local':
print("Received answer esp32-mdns query") print("Received answer esp32-mdns query")
g_done = True g_done = True
print (dns.an[0].name) print(dns.an[0].name)
dns = dpkt.dns.DNS(b'\x00\x00\x01\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00\x01\x00\x01') dns = dpkt.dns.DNS(b'\x00\x00\x01\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00\x01\x00\x01')
dns.qd[0].name= esp_host + u'.local' dns.qd[0].name = esp_host + u'.local'
sock.sendto(dns.pack(),(MCAST_GRP,UDP_PORT)) sock.sendto(dns.pack(),(MCAST_GRP,UDP_PORT))
print("Sending esp32-mdns query") print("Sending esp32-mdns query")
time.sleep(0.5) time.sleep(0.5)
except socket.timeout: except socket.timeout:
break break
@IDF.idf_example_test(env_tag="Example_WIFI") @IDF.idf_example_test(env_tag="Example_WIFI")
def test_examples_protocol_mdns(env, extra_data): def test_examples_protocol_mdns(env, extra_data):
global g_run_server global g_run_server
@ -86,20 +90,19 @@ def test_examples_protocol_mdns(env, extra_data):
# check and log bin size # check and log bin size
binary_file = os.path.join(dut1.app.binary_path, "mdns-test.bin") binary_file = os.path.join(dut1.app.binary_path, "mdns-test.bin")
bin_size = os.path.getsize(binary_file) bin_size = os.path.getsize(binary_file)
IDF.log_performance("mdns-test_bin_size", "{}KB".format(bin_size//1024)) IDF.log_performance("mdns-test_bin_size", "{}KB".format(bin_size // 1024))
IDF.check_performance("mdns-test_bin_size", bin_size//1024) IDF.check_performance("mdns-test_bin_size", bin_size // 1024)
# 1. start mdns application # 1. start mdns application
dut1.start_app() dut1.start_app()
# 2. get the dut host name (and IP address) # 2. get the dut host name (and IP address)
specific_host = dut1.expect(re.compile(r"mdns hostname set to: \[([^\]]+)\]"), timeout=30) specific_host = dut1.expect(re.compile(r"mdns hostname set to: \[([^\]]+)\]"), timeout=30)
specific_host = str(specific_host[0]) specific_host = str(specific_host[0])
dut_ip = ""
try: try:
dut_ip = dut1.expect(re.compile(r" sta ip: ([^,]+),"), timeout=30) dut1.expect(re.compile(r" sta ip: ([^,]+),"), timeout=30)
except DUT.ExpectTimeout: except DUT.ExpectTimeout:
raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP') raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP')
# 3. check the mdns name is accessible # 3. check the mdns name is accessible
thread1 = Thread(target = mdns_server, args = (specific_host,)) thread1 = Thread(target=mdns_server, args=(specific_host,))
thread1.start() thread1.start()
start = time.time() start = time.time()
while (time.time() - start) <= 60: while (time.time() - start) <= 60:
@ -108,10 +111,11 @@ def test_examples_protocol_mdns(env, extra_data):
break break
g_run_server = False g_run_server = False
thread1.join() thread1.join()
if g_done == False: if g_done is False:
raise ValueError('Test has failed: did not receive mdns answer within timeout') raise ValueError('Test has failed: did not receive mdns answer within timeout')
# 4. check DUT output if mdns advertized host is resolved # 4. check DUT output if mdns advertized host is resolved
dut1.expect(re.compile(r"mdns-test: Query A: tinytester.local resolved to: 127.0.0.1"), timeout=30) dut1.expect(re.compile(r"mdns-test: Query A: tinytester.local resolved to: 127.0.0.1"), timeout=30)
if __name__ == '__main__': if __name__ == '__main__':
test_examples_protocol_mdns() test_examples_protocol_mdns()