mirror of
https://github.com/espressif/esp-idf.git
synced 2025-08-03 20:54:32 +02:00
Merge branch 'bugfix/python3_target_test_3.3' into 'release/v3.3'
CI: run target test with python3 (v3.3) See merge request espressif/esp-idf!13460
This commit is contained in:
@@ -890,6 +890,7 @@ assign_test:
|
|||||||
CONFIG_FILE_PATH: "${CI_PROJECT_DIR}/examples/test_configs"
|
CONFIG_FILE_PATH: "${CI_PROJECT_DIR}/examples/test_configs"
|
||||||
LOG_PATH: "$CI_PROJECT_DIR/TEST_LOGS"
|
LOG_PATH: "$CI_PROJECT_DIR/TEST_LOGS"
|
||||||
ENV_FILE: "$CI_PROJECT_DIR/ci-test-runner-configs/$CI_RUNNER_DESCRIPTION/EnvConfig.yml"
|
ENV_FILE: "$CI_PROJECT_DIR/ci-test-runner-configs/$CI_RUNNER_DESCRIPTION/EnvConfig.yml"
|
||||||
|
PYTHON_VER: 3
|
||||||
script:
|
script:
|
||||||
- *define_config_file_name
|
- *define_config_file_name
|
||||||
# first test if config file exists, if not exist, exit 0
|
# first test if config file exists, if not exist, exit 0
|
||||||
|
@@ -109,7 +109,7 @@ def bleprph_client_task(prph_obj, dut, dut_addr):
|
|||||||
- write 'A' to characteristic with write permission
|
- write 'A' to characteristic with write permission
|
||||||
'''
|
'''
|
||||||
chars_ret_on_write = {}
|
chars_ret_on_write = {}
|
||||||
chars_ret_on_write = ble_client_obj.write_chars('A')
|
chars_ret_on_write = ble_client_obj.write_chars(b'A')
|
||||||
if chars_ret_on_write:
|
if chars_ret_on_write:
|
||||||
Utility.console_log("\nCharacteristics after write operation")
|
Utility.console_log("\nCharacteristics after write operation")
|
||||||
for path, props in chars_ret_on_write.items():
|
for path, props in chars_ret_on_write.items():
|
||||||
|
@@ -38,6 +38,7 @@ def test_examples_protocol_asio_chat_server(env, extra_data):
|
|||||||
# 2. get the server IP address
|
# 2. get the server IP address
|
||||||
data = dut1.expect(re.compile(r" sta ip: ([^,]+),"), timeout=30)
|
data = dut1.expect(re.compile(r" sta ip: ([^,]+),"), timeout=30)
|
||||||
# 3. create tcp client and connect to server
|
# 3. create tcp client and connect to server
|
||||||
|
dut1.expect('ASIO engine is up and running', timeout=1)
|
||||||
cli = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
cli = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||||
cli.settimeout(30)
|
cli.settimeout(30)
|
||||||
cli.connect((data[0],80))
|
cli.connect((data[0],80))
|
||||||
|
@@ -208,6 +208,8 @@ int asio_main()
|
|||||||
servers.emplace_back(io_context, endpoint);
|
servers.emplace_back(io_context, endpoint);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
std::cout << "ASIO engine is up and running" << std::endl;
|
||||||
|
|
||||||
io_context.run();
|
io_context.run();
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
|
@@ -40,6 +40,7 @@ def test_examples_protocol_asio_tcp_server(env, extra_data):
|
|||||||
# 2. get the server IP address
|
# 2. get the server IP address
|
||||||
data = dut1.expect(re.compile(r" sta ip: ([^,]+),"), timeout=30)
|
data = dut1.expect(re.compile(r" sta ip: ([^,]+),"), timeout=30)
|
||||||
# 3. create tcp client and connect to server
|
# 3. create tcp client and connect to server
|
||||||
|
dut1.expect('ASIO engine is up and running', timeout=1)
|
||||||
cli = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
cli = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||||
cli.settimeout(30)
|
cli.settimeout(30)
|
||||||
cli.connect((data[0],80))
|
cli.connect((data[0],80))
|
||||||
|
@@ -86,6 +86,8 @@ void asio_main()
|
|||||||
|
|
||||||
server s(io_context, std::atoi(CONFIG_EXAMPLE_PORT));
|
server s(io_context, std::atoi(CONFIG_EXAMPLE_PORT));
|
||||||
|
|
||||||
|
std::cout << "ASIO engine is up and running" << std::endl;
|
||||||
|
|
||||||
io_context.run();
|
io_context.run();
|
||||||
|
|
||||||
}
|
}
|
@@ -40,6 +40,7 @@ def test_examples_protocol_asio_udp_server(env, extra_data):
|
|||||||
# 2. get the server IP address
|
# 2. get the server IP address
|
||||||
data = dut1.expect(re.compile(r" sta ip: ([^,]+),"), timeout=30)
|
data = dut1.expect(re.compile(r" sta ip: ([^,]+),"), timeout=30)
|
||||||
# 3. create tcp client and connect to server
|
# 3. create tcp client and connect to server
|
||||||
|
dut1.expect('ASIO engine is up and running', timeout=1)
|
||||||
cli = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
cli = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
||||||
cli.settimeout(30)
|
cli.settimeout(30)
|
||||||
cli.connect((data[0], 80))
|
cli.connect((data[0], 80))
|
||||||
|
@@ -65,5 +65,7 @@ void asio_main()
|
|||||||
|
|
||||||
server s(io_context, std::atoi(CONFIG_EXAMPLE_PORT));
|
server s(io_context, std::atoi(CONFIG_EXAMPLE_PORT));
|
||||||
|
|
||||||
|
std::cout << "ASIO engine is up and running" << std::endl;
|
||||||
|
|
||||||
io_context.run();
|
io_context.run();
|
||||||
}
|
}
|
||||||
|
@@ -113,12 +113,6 @@ def test_examples_protocol_http_server_simple(env, extra_data):
|
|||||||
raise RuntimeError
|
raise RuntimeError
|
||||||
dut1.expect("Found URL query => " + query, timeout=30)
|
dut1.expect("Found URL query => " + query, timeout=30)
|
||||||
|
|
||||||
query = "abcd\nyz"
|
|
||||||
Utility.console_log("Test /hello with invalid query")
|
|
||||||
if client.test_custom_uri_query(got_ip, got_port, query):
|
|
||||||
raise RuntimeError
|
|
||||||
dut1.expect("400 Bad Request - Server unable to understand request due to invalid syntax", timeout=30)
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
test_examples_protocol_http_server_simple()
|
test_examples_protocol_http_server_simple()
|
||||||
|
@@ -1,6 +1,7 @@
|
|||||||
import re
|
import re
|
||||||
import os
|
import os
|
||||||
import sys
|
import sys
|
||||||
|
import struct
|
||||||
import socket
|
import socket
|
||||||
from threading import Thread
|
from threading import Thread
|
||||||
import ssl
|
import ssl
|
||||||
@@ -224,8 +225,8 @@ def test_examples_protocol_advanced_https_ota_example_truncated_bin(env, extra_d
|
|||||||
truncated_bin_size = 64000
|
truncated_bin_size = 64000
|
||||||
# check and log bin size
|
# check and log bin size
|
||||||
binary_file = os.path.join(dut1.app.binary_path, bin_name)
|
binary_file = os.path.join(dut1.app.binary_path, bin_name)
|
||||||
f = open(binary_file, "r+")
|
f = open(binary_file, "rb+")
|
||||||
fo = open(os.path.join(dut1.app.binary_path, truncated_bin_name), "w+")
|
fo = open(os.path.join(dut1.app.binary_path, truncated_bin_name), "wb+")
|
||||||
fo.write(f.read(truncated_bin_size))
|
fo.write(f.read(truncated_bin_size))
|
||||||
fo.close()
|
fo.close()
|
||||||
f.close()
|
f.close()
|
||||||
@@ -275,8 +276,8 @@ def test_examples_protocol_advanced_https_ota_example_truncated_header(env, extr
|
|||||||
truncated_bin_size = 180
|
truncated_bin_size = 180
|
||||||
# check and log bin size
|
# check and log bin size
|
||||||
binary_file = os.path.join(dut1.app.binary_path, bin_name)
|
binary_file = os.path.join(dut1.app.binary_path, bin_name)
|
||||||
f = open(binary_file, "r+")
|
f = open(binary_file, "rb+")
|
||||||
fo = open(os.path.join(dut1.app.binary_path, truncated_bin_name), "w+")
|
fo = open(os.path.join(dut1.app.binary_path, truncated_bin_name), "wb+")
|
||||||
fo.write(f.read(truncated_bin_size))
|
fo.write(f.read(truncated_bin_size))
|
||||||
fo.close()
|
fo.close()
|
||||||
f.close()
|
f.close()
|
||||||
@@ -324,12 +325,12 @@ def test_examples_protocol_advanced_https_ota_example_random(env, extra_data):
|
|||||||
random_bin_size = 32000
|
random_bin_size = 32000
|
||||||
# check and log bin size
|
# check and log bin size
|
||||||
binary_file = os.path.join(dut1.app.binary_path, random_bin_name)
|
binary_file = os.path.join(dut1.app.binary_path, random_bin_name)
|
||||||
fo = open(binary_file, "w+")
|
fo = open(binary_file, "wb+")
|
||||||
# First byte of binary file is always set to zero. If first byte is generated randomly,
|
# First byte of binary file is always set to zero. If first byte is generated randomly,
|
||||||
# in some cases it may generate 0xE9 which will result in failure of testcase.
|
# in some cases it may generate 0xE9 which will result in failure of testcase.
|
||||||
fo.write(str(0))
|
fo.write(struct.pack("B", 0))
|
||||||
for i in range(random_bin_size - 1):
|
for i in range(random_bin_size - 1):
|
||||||
fo.write(str(random.randrange(0,255,1)))
|
fo.write(struct.pack("B", random.randrange(0,255,1)))
|
||||||
fo.close()
|
fo.close()
|
||||||
bin_size = os.path.getsize(binary_file)
|
bin_size = os.path.getsize(binary_file)
|
||||||
IDF.log_performance("advanced_https_ota_bin_size", "{}KB".format(bin_size // 1024))
|
IDF.log_performance("advanced_https_ota_bin_size", "{}KB".format(bin_size // 1024))
|
||||||
|
@@ -1,6 +1,7 @@
|
|||||||
import re
|
import re
|
||||||
import os
|
import os
|
||||||
import sys
|
import sys
|
||||||
|
import struct
|
||||||
import socket
|
import socket
|
||||||
from threading import Thread
|
from threading import Thread
|
||||||
import ssl
|
import ssl
|
||||||
@@ -196,8 +197,8 @@ def test_examples_protocol_native_ota_example_truncated_bin(env, extra_data):
|
|||||||
truncated_bin_size = 64000
|
truncated_bin_size = 64000
|
||||||
# check and log bin size
|
# check and log bin size
|
||||||
binary_file = os.path.join(dut1.app.binary_path, bin_name)
|
binary_file = os.path.join(dut1.app.binary_path, bin_name)
|
||||||
f = open(binary_file, "r+")
|
f = open(binary_file, "rb+")
|
||||||
fo = open(os.path.join(dut1.app.binary_path, truncated_bin_name), "w+")
|
fo = open(os.path.join(dut1.app.binary_path, truncated_bin_name), "wb+")
|
||||||
fo.write(f.read(truncated_bin_size))
|
fo.write(f.read(truncated_bin_size))
|
||||||
fo.close()
|
fo.close()
|
||||||
f.close()
|
f.close()
|
||||||
@@ -247,8 +248,8 @@ def test_examples_protocol_native_ota_example_truncated_header(env, extra_data):
|
|||||||
truncated_bin_size = 180
|
truncated_bin_size = 180
|
||||||
# check and log bin size
|
# check and log bin size
|
||||||
binary_file = os.path.join(dut1.app.binary_path, bin_name)
|
binary_file = os.path.join(dut1.app.binary_path, bin_name)
|
||||||
f = open(binary_file, "r+")
|
f = open(binary_file, "rb+")
|
||||||
fo = open(os.path.join(dut1.app.binary_path, truncated_bin_name), "w+")
|
fo = open(os.path.join(dut1.app.binary_path, truncated_bin_name), "wb+")
|
||||||
fo.write(f.read(truncated_bin_size))
|
fo.write(f.read(truncated_bin_size))
|
||||||
fo.close()
|
fo.close()
|
||||||
f.close()
|
f.close()
|
||||||
@@ -296,12 +297,12 @@ def test_examples_protocol_native_ota_example_random(env, extra_data):
|
|||||||
random_bin_size = 32000
|
random_bin_size = 32000
|
||||||
# check and log bin size
|
# check and log bin size
|
||||||
binary_file = os.path.join(dut1.app.binary_path, random_bin_name)
|
binary_file = os.path.join(dut1.app.binary_path, random_bin_name)
|
||||||
fo = open(binary_file, "w+")
|
fo = open(binary_file, "wb+")
|
||||||
# First byte of binary file is always set to zero. If first byte is generated randomly,
|
# First byte of binary file is always set to zero. If first byte is generated randomly,
|
||||||
# in some cases it may generate 0xE9 which will result in failure of testcase.
|
# in some cases it may generate 0xE9 which will result in failure of testcase.
|
||||||
fo.write(str(0))
|
fo.write(struct.pack("B", 0))
|
||||||
for i in range(random_bin_size - 1):
|
for i in range(random_bin_size - 1):
|
||||||
fo.write(str(random.randrange(0,255,1)))
|
fo.write(struct.pack("B", random.randrange(0,255,1)))
|
||||||
fo.close()
|
fo.close()
|
||||||
bin_size = os.path.getsize(binary_file)
|
bin_size = os.path.getsize(binary_file)
|
||||||
IDF.log_performance("native_ota_bin_size", "{}KB".format(bin_size // 1024))
|
IDF.log_performance("native_ota_bin_size", "{}KB".format(bin_size // 1024))
|
||||||
|
@@ -160,6 +160,7 @@ static esp_err_t IRAM_ATTR iperf_run_tcp_server(void)
|
|||||||
return ESP_FAIL;
|
return ESP_FAIL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
printf("iperf tcp server create successfully\n");
|
||||||
buffer = s_iperf_ctrl.buffer;
|
buffer = s_iperf_ctrl.buffer;
|
||||||
want_recv = s_iperf_ctrl.buffer_len;
|
want_recv = s_iperf_ctrl.buffer_len;
|
||||||
while (!s_iperf_ctrl.finish) {
|
while (!s_iperf_ctrl.finish) {
|
||||||
|
@@ -201,7 +201,7 @@ class TestResult(object):
|
|||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def _convert_to_draw_format(data, label):
|
def _convert_to_draw_format(data, label):
|
||||||
keys = data.keys()
|
keys = list(data.keys())
|
||||||
keys.sort()
|
keys.sort()
|
||||||
return {
|
return {
|
||||||
"x-axis": keys,
|
"x-axis": keys,
|
||||||
@@ -235,8 +235,8 @@ class TestResult(object):
|
|||||||
|
|
||||||
LineChart.draw_line_chart(os.path.join(path, file_name),
|
LineChart.draw_line_chart(os.path.join(path, file_name),
|
||||||
"Throughput Vs {} ({} {})".format(type_name, self.proto, self.direction),
|
"Throughput Vs {} ({} {})".format(type_name, self.proto, self.direction),
|
||||||
"Throughput (Mbps)",
|
|
||||||
"{} (dbm)".format(type_name),
|
"{} (dbm)".format(type_name),
|
||||||
|
"Throughput (Mbps)",
|
||||||
data_list)
|
data_list)
|
||||||
return file_name
|
return file_name
|
||||||
|
|
||||||
@@ -324,7 +324,7 @@ class IperfTestUtility(object):
|
|||||||
except subprocess.CalledProcessError:
|
except subprocess.CalledProcessError:
|
||||||
pass
|
pass
|
||||||
self.dut.write("restart")
|
self.dut.write("restart")
|
||||||
self.dut.expect("esp32>")
|
self.dut.expect_any("iperf>", "esp32>")
|
||||||
self.dut.write("scan {}".format(self.ap_ssid))
|
self.dut.write("scan {}".format(self.ap_ssid))
|
||||||
for _ in range(SCAN_RETRY_COUNT):
|
for _ in range(SCAN_RETRY_COUNT):
|
||||||
try:
|
try:
|
||||||
@@ -377,6 +377,12 @@ class IperfTestUtility(object):
|
|||||||
with open(PC_IPERF_TEMP_LOG_FILE, "w") as f:
|
with open(PC_IPERF_TEMP_LOG_FILE, "w") as f:
|
||||||
if proto == "tcp":
|
if proto == "tcp":
|
||||||
self.dut.write("iperf -s -i 1 -t {}".format(TEST_TIME))
|
self.dut.write("iperf -s -i 1 -t {}".format(TEST_TIME))
|
||||||
|
# wait until DUT TCP server created
|
||||||
|
try:
|
||||||
|
self.dut.expect("iperf tcp server create successfully", timeout=1)
|
||||||
|
except DUT.ExpectTimeout:
|
||||||
|
# compatible with old iperf example binary
|
||||||
|
pass
|
||||||
process = subprocess.Popen(["iperf", "-c", dut_ip,
|
process = subprocess.Popen(["iperf", "-c", dut_ip,
|
||||||
"-t", str(TEST_TIME), "-f", "m"],
|
"-t", str(TEST_TIME), "-f", "m"],
|
||||||
stdout=f, stderr=f)
|
stdout=f, stderr=f)
|
||||||
@@ -450,7 +456,7 @@ class IperfTestUtility(object):
|
|||||||
:return: True or False
|
:return: True or False
|
||||||
"""
|
"""
|
||||||
self.dut.write("restart")
|
self.dut.write("restart")
|
||||||
self.dut.expect("esp32>")
|
self.dut.expect_any("iperf>", "esp32>")
|
||||||
for _ in range(WAIT_AP_POWER_ON_TIMEOUT // SCAN_TIMEOUT):
|
for _ in range(WAIT_AP_POWER_ON_TIMEOUT // SCAN_TIMEOUT):
|
||||||
try:
|
try:
|
||||||
self.dut.write("scan {}".format(self.ap_ssid))
|
self.dut.write("scan {}".format(self.ap_ssid))
|
||||||
@@ -518,7 +524,7 @@ def test_wifi_throughput_with_different_configs(env, extra_data):
|
|||||||
# 2. get DUT and download
|
# 2. get DUT and download
|
||||||
dut = env.get_dut("iperf", "examples/wifi/iperf")
|
dut = env.get_dut("iperf", "examples/wifi/iperf")
|
||||||
dut.start_app()
|
dut.start_app()
|
||||||
dut.expect("esp32>")
|
dut.expect_any("iperf>", "esp32>")
|
||||||
|
|
||||||
# 3. run test for each required att value
|
# 3. run test for each required att value
|
||||||
test_result[config_name] = {
|
test_result[config_name] = {
|
||||||
@@ -576,7 +582,7 @@ def test_wifi_throughput_vs_rssi(env, extra_data):
|
|||||||
# 2. get DUT and download
|
# 2. get DUT and download
|
||||||
dut = env.get_dut("iperf", "examples/wifi/iperf")
|
dut = env.get_dut("iperf", "examples/wifi/iperf")
|
||||||
dut.start_app()
|
dut.start_app()
|
||||||
dut.expect("esp32>")
|
dut.expect_any("iperf>", "esp32>")
|
||||||
|
|
||||||
# 3. run test for each required att value
|
# 3. run test for each required att value
|
||||||
for ap_info in ap_list:
|
for ap_info in ap_list:
|
||||||
@@ -625,7 +631,7 @@ def test_wifi_throughput_basic(env, extra_data):
|
|||||||
# 2. get DUT
|
# 2. get DUT
|
||||||
dut = env.get_dut("iperf", "examples/wifi/iperf")
|
dut = env.get_dut("iperf", "examples/wifi/iperf")
|
||||||
dut.start_app()
|
dut.start_app()
|
||||||
dut.expect("esp32>")
|
dut.expect_any("iperf>", "esp32>")
|
||||||
|
|
||||||
# 3. preparing
|
# 3. preparing
|
||||||
test_result = {
|
test_result = {
|
||||||
|
@@ -36,7 +36,7 @@ class ThroughputForConfigsReport(object):
|
|||||||
self.sdkconfigs[config_name] = self._parse_config_file(sdkconfig_files[config_name])
|
self.sdkconfigs[config_name] = self._parse_config_file(sdkconfig_files[config_name])
|
||||||
if not os.path.exists(output_path):
|
if not os.path.exists(output_path):
|
||||||
os.makedirs(output_path)
|
os.makedirs(output_path)
|
||||||
self.sort_order = self.sdkconfigs.keys()
|
self.sort_order = list(self.sdkconfigs.keys())
|
||||||
self.sort_order.sort()
|
self.sort_order.sort()
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
@@ -162,7 +162,7 @@ class ThroughputVsRssiReport(object):
|
|||||||
self.output_path = output_path
|
self.output_path = output_path
|
||||||
self.raw_data_path = os.path.join(output_path, "raw_data")
|
self.raw_data_path = os.path.join(output_path, "raw_data")
|
||||||
self.results = throughput_results
|
self.results = throughput_results
|
||||||
self.throughput_types = self.results.keys()
|
self.throughput_types = list(self.results.keys())
|
||||||
self.throughput_types.sort()
|
self.throughput_types.sort()
|
||||||
if not os.path.exists(self.raw_data_path):
|
if not os.path.exists(self.raw_data_path):
|
||||||
os.makedirs(self.raw_data_path)
|
os.makedirs(self.raw_data_path)
|
||||||
|
@@ -1,14 +1,7 @@
|
|||||||
#! /bin/bash
|
#! /bin/bash
|
||||||
|
|
||||||
# Regexp for matching job names which are incompatible with Python 3
|
if [ -z ${PYTHON_VER+x} ]; then
|
||||||
# - UT_009_ - multi-device tests are not compatible
|
# Use this version of the Python interpreter if it was not defined before
|
||||||
# - UT_014_ - multi-device tests are not compatible
|
|
||||||
# - UT_017_ - multi-device tests are not compatible
|
|
||||||
py3_incomp='UT_009_|UT_013_|UT_014_|UT_017_'
|
|
||||||
|
|
||||||
if [ -z ${PYTHON_VER+x} ] || [[ $CI_JOB_NAME =~ $py3_incomp ]]; then
|
|
||||||
# Use this version of the Python interpreter if it was not defined before or
|
|
||||||
# the given job is not compatible with Python 3
|
|
||||||
PYTHON_VER=2.7.15
|
PYTHON_VER=2.7.15
|
||||||
fi
|
fi
|
||||||
|
|
||||||
|
@@ -16,7 +16,7 @@
|
|||||||
#
|
#
|
||||||
|
|
||||||
from __future__ import print_function
|
from __future__ import print_function
|
||||||
from builtins import input
|
from builtins import input as binput
|
||||||
import argparse
|
import argparse
|
||||||
import textwrap
|
import textwrap
|
||||||
import time
|
import time
|
||||||
@@ -423,7 +423,7 @@ if __name__ == '__main__':
|
|||||||
|
|
||||||
while True:
|
while True:
|
||||||
try:
|
try:
|
||||||
select = int(input("Select AP by number (0 to rescan) : "))
|
select = int(binput("Select AP by number (0 to rescan) : "))
|
||||||
if select < 0 or select > len(APs):
|
if select < 0 or select > len(APs):
|
||||||
raise ValueError
|
raise ValueError
|
||||||
break
|
break
|
||||||
|
@@ -167,7 +167,7 @@ class Security1(Security):
|
|||||||
return -1
|
return -1
|
||||||
|
|
||||||
def encrypt_data(self, data):
|
def encrypt_data(self, data):
|
||||||
return self.cipher.update(data)
|
return self.cipher.update(tobytes(data))
|
||||||
|
|
||||||
def decrypt_data(self, data):
|
def decrypt_data(self, data):
|
||||||
return self.cipher.update(data)
|
return self.cipher.update(tobytes(data))
|
||||||
|
@@ -234,6 +234,8 @@ class BLE_Bluez_Client:
|
|||||||
|
|
||||||
try:
|
try:
|
||||||
path.WriteValue([ord(c) for c in data], {}, dbus_interface='org.bluez.GattCharacteristic1')
|
path.WriteValue([ord(c) for c in data], {}, dbus_interface='org.bluez.GattCharacteristic1')
|
||||||
|
except TypeError: # python3 compatible
|
||||||
|
path.WriteValue([c for c in data], {}, dbus_interface='org.bluez.GattCharacteristic1')
|
||||||
except dbus.exceptions.DBusException as e:
|
except dbus.exceptions.DBusException as e:
|
||||||
raise RuntimeError("Failed to write value to characteristic " + characteristic_uuid + ": " + str(e))
|
raise RuntimeError("Failed to write value to characteristic " + characteristic_uuid + ": " + str(e))
|
||||||
|
|
||||||
|
@@ -14,12 +14,14 @@
|
|||||||
#
|
#
|
||||||
|
|
||||||
# Convenience functions for commonly used data type conversions
|
# Convenience functions for commonly used data type conversions
|
||||||
|
import binascii
|
||||||
|
from future.utils import tobytes
|
||||||
|
|
||||||
|
|
||||||
def str_to_hexstr(string):
|
def str_to_hexstr(string):
|
||||||
# Form hexstr by appending ASCII codes (in hex) corresponding to
|
# Form hexstr by appending ASCII codes (in hex) corresponding to
|
||||||
# each character in the input string
|
# each character in the input string
|
||||||
return ''.join('{:02x}'.format(ord(c)) for c in string)
|
return binascii.hexlify(tobytes(string)).decode()
|
||||||
|
|
||||||
|
|
||||||
def hexstr_to_str(hexstr):
|
def hexstr_to_str(hexstr):
|
||||||
@@ -28,4 +30,4 @@ def hexstr_to_str(hexstr):
|
|||||||
hexstr = '0' + hexstr
|
hexstr = '0' + hexstr
|
||||||
# Interpret consecutive pairs of hex characters as 8 bit ASCII codes
|
# Interpret consecutive pairs of hex characters as 8 bit ASCII codes
|
||||||
# and append characters corresponding to each code to form the string
|
# and append characters corresponding to each code to form the string
|
||||||
return ''.join(chr(int(hexstr[2 * i: 2 * i + 2], 16)) for i in range(len(hexstr) // 2))
|
return binascii.unhexlify(tobytes(hexstr)).decode()
|
||||||
|
Reference in New Issue
Block a user