Files
esp-modbus/tools/robot/ModbusTestLib.py
2024-12-18 11:26:10 +01:00

483 lines
22 KiB
Python

#!/usr/bin/python
import socket
import random
import binascii
import os
from typing import Optional, Any, Tuple
from scapy.utils import wrpcap
from scapy.packet import Packet, Raw
from scapy.supersocket import StreamSocket
from scapy.layers.inet import Ether, IP, TCP
from scapy.config import conf
from scapy.error import Scapy_Exception
from robot.api.deco import keyword, library
from robot.api.logger import info, debug, trace, console
from ModbusSupport import modbus_exceptions, ModbusADU_Request, ModbusADU_Response, ModbusPDU03_Read_Holding_Registers, ModbusPDU10_Write_Multiple_Registers, \
ModbusPDU04_Read_Input_Registers, ModbusPDU01_Read_Coils, ModbusPDU0F_Write_Multiple_Coils, ModbusPDU02_Read_Discrete_Inputs, ModbusPDU06_Write_Single_Register
# Scapy global configuration, applied once at import time:
#  - the dissector debug mode is switched off
#  - the default padding setting is forced to 1
# Both settings work around issues seen when scapy runs under robot framework.
conf.debug_dissector = False
conf.padding = 1
# Defaults used by the library and by its built-in self test.
MB_DEF_SERVER_IP = '127.0.0.1'
MB_DEF_PORT = 1502
MB_DEF_TRANS_ID = 0x0000

# Modbus function codes used by the self-test requests.
MB_DEF_FUNC_HOLDING_READ = 0x03
MB_DEF_FUNC_HOLDING_WRITE = 0x10
MB_DEF_FUNC_INPUT_READ = 0x04
MB_DEF_FUNC_COILS_READ = 0x01
MB_DEF_FUNC_COILS_WRITE = 0x0F

# Default addressing, request timeout and pcap output directory.
MB_DEF_QUANTITY = 1
MB_DEF_START_OFFS = 0x0001
MB_DEF_REQ_TOUT = 5.0
MB_LOGGING_PATH = '.'

# Request templates for the self test. Each one is the textual form of a scapy
# expression "<ADU>/<PDU>" that create_request() turns into a packet object.
TEST_PACKET_HOLDING_READ = (
    'ModbusADU_Request(transId=MB_DEF_TRANS_ID, unitId=0x01, protoId=0, len=6)/'
    'ModbusPDU03_Read_Holding_Registers(funcCode=MB_DEF_FUNC_HOLDING_READ, startAddr=MB_DEF_START_OFFS, quantity=MB_DEF_QUANTITY)'
)
TEST_PACKET_HOLDING_WRITE = (
    'ModbusADU_Request(transId=MB_DEF_TRANS_ID, unitId=0x01, protoId=0)/'
    'ModbusPDU10_Write_Multiple_Registers(funcCode=MB_DEF_FUNC_HOLDING_WRITE, startAddr=MB_DEF_START_OFFS, quantityRegisters=2, outputsValue=[0x1122, 0x3344])'
)
TEST_PACKET_INPUT_READ = (
    'ModbusADU_Request(transId=MB_DEF_TRANS_ID, unitId=0x01, protoId=0, len=6)/'
    'ModbusPDU04_Read_Input_Registers(funcCode=MB_DEF_FUNC_INPUT_READ, startAddr=MB_DEF_START_OFFS, quantity=MB_DEF_QUANTITY)'
)
TEST_PACKET_COILS_READ = (
    'ModbusADU_Request(unitId=0x01, protoId=0)/'
    'ModbusPDU01_Read_Coils(funcCode=MB_DEF_FUNC_COILS_READ, startAddr=MB_DEF_START_OFFS, quantity=MB_DEF_QUANTITY)'
)
TEST_PACKET_COILS_WRITE = (
    'ModbusADU_Request(unitId=0x01, protoId=0)/'
    'ModbusPDU0F_Write_Multiple_Coils(funcCode=MB_DEF_FUNC_COILS_WRITE, startAddr=MB_DEF_START_OFFS, quantityOutput=MB_DEF_QUANTITY, outputsValue=[0xFF])'
)
# Simplified custom Modbus library, registered as a robot framework library
# with GLOBAL scope.
@library(scope='GLOBAL', version='2.0.0')
class ModbusTestLib:
'''
Custom Modbus TCP library for robot framework, built on scapy.
It provides keywords to connect to a Modbus server, create and send requests,
and receive, dissect and check the Modbus frames that come back.
'''
# Mask applied to a response function code to detect an exception reply.
MB_EXCEPTION_MASK = 0x0080
# Mask applied to a response function code before it is compared with the
# expected function code.
MB_EXCEPTION_FUNC_MASK = 0x007F
def __init__(self, ip_address = MB_DEF_SERVER_IP, port = MB_DEF_PORT, timeout = MB_DEF_REQ_TOUT) -> None:
    """
    Initialise the library state; no connection is opened here.
    Args:
        ip_address: Address of the Modbus server to talk to.
        port: TCP port of the Modbus server.
        timeout: Value stored as the response timeout.
    """
    # Random tag (0..100) used to tell library instances apart while debugging
    self.class_id = random.randint(0,100)

    # Target endpoint and protocol bookkeeping
    self.node_address = ip_address
    self.node_port = port
    self.resp_timeout = timeout
    self.trans_id = 0x0001

    # Connection state, filled in by connect()
    self._connection: StreamSocket = None
    self.socket = None
    self.host_ip = None

    # Results of the most recent response dissection
    self.in_adu = None
    self.in_pdu = None
    self.exception = None
    self.exception_message = None

    # Every instance records its traffic into its own pcap file; a file left
    # over from an earlier run under the same name is removed first.
    self.pcap_file_name = f"{MB_LOGGING_PATH}/mb_frames_{self.class_id}.pcap"
    if os.path.isfile(self.pcap_file_name):
        os.remove(self.pcap_file_name)
@property
def connection(self) -> Optional[StreamSocket]:
    # type: () -> Optional[Any]
    """
    Return the active scapy stream socket.
    Returns:
        The StreamSocket created by connect().
    Raises:
        SystemError: If no connection has been established yet.
    """
    # The guard must fire when there is NO connection (the original test was
    # inverted and referenced an undefined bare name).
    if self._connection is None:
        raise SystemError('No Connection established! Connect to server first!')
    return self._connection
def get_slave_ip(self) -> str:
    # type: () -> str
    """
    Return the address of the Modbus slave (server) this instance targets.
    Returns:
        The server address stored in self.node_address.
    Raises:
        SystemError: If no server address is configured.
    """
    if self.node_address is None:
        raise SystemError('Slave IP address is not initialized!')
    return self.node_address
def get_host_ip(self):
    # type: () -> Optional[Any]
    """
    Determine the local IP address of this host.
    A UDP socket is connected to the broadcast address and the socket's own
    name is read back; the socket is closed before returning.
    Returns:
        The first element of the socket name (the local address).
    """
    # The context manager guarantees the probe socket is released even when
    # one of the calls below raises.
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
        s.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
        s.connect(('<broadcast>', 12345))  # arbitrary port number
        return s.getsockname()[0]
def get_trans_id(self) -> int:
    # type: () -> int
    """
    Return the current transaction identifier.
    Raises:
        SystemError: If the transaction identifier is None.
    """
    current = self.trans_id
    if current is not None:
        return current
    raise SystemError('Transaction is not initialized!')
def inc_trans_id(self) -> int:
    # type: () -> int
    """
    Advance the transaction identifier by one and return the new value.
    The identifier wraps around to 1 once it would exceed 65535.
    Raises:
        SystemError: If the transaction identifier is None.
    """
    current = self.trans_id
    if current is None:
        raise SystemError('Transaction field is incorrect!')
    following = current + 1
    self.trans_id = 1 if following > 65535 else following
    return self.trans_id
def get_last_adu(self) -> Optional[Packet]:
    # type: () -> Optional[Packet]
    """Return the ADU stored by the most recent response dissection, or None."""
    last_adu = self.in_adu
    return last_adu
def get_last_pdu(self) -> Any:
    # type: () -> Any
    """Return the PDU stored by the most recent response dissection, or None."""
    last_pdu = self.in_pdu
    return last_pdu
def mb_match_packet(self, pkt) -> bool:
    # type: (Packet) -> bool
    """Tell whether the given packet contains a ModbusADU_Response layer."""
    return bool(pkt.haslayer(ModbusADU_Response))
# Workaround for scapy's sr1(), which does not work reliably in some scapy
# versions: the request is sent and one answer is sniffed on the stream socket.
def _req_send_recv(self, pkt, timeout=0, verbose=True) -> Optional[Packet]:
    # type: (Packet, int, bool) -> Optional[Packet]
    """
    Send a request over the active connection and wait for one answer.
    Args:
        pkt: The packet to send; it is built into raw bytes first.
        timeout: Timeout passed to the sniff call.
        verbose: Accepted for interface compatibility; not used here.
    Returns:
        The first sniffed packet, or None when nothing was captured.
    Raises:
        ValueError: If there is no active connection.
        Scapy_Exception: If sending or sniffing fails.
    """
    if self._connection is None:
        raise ValueError("The connection is not active.")
    raw_request = pkt.build()

    def _describe(x):
        # Per-packet callback handed to sniff()
        return x.sprintf("{IP:%IP.src% -> %IP.dst%\n}{Raw:%Raw.load%\n}")

    try:
        self._connection.send(raw_request)
        captured = self._connection.sniff(
            filter=f"tcp and dst host {self.node_address} and dst port {self.node_port}",
            prn=_describe,
            count=1,
            timeout=timeout,
        )
        if not captured:
            return None
        return captured[0]
    except Exception as exception:
        raise Scapy_Exception(f"Send fail: {exception}")
@keyword("Get Class Id")
def get_class_id(self) -> int:
    # type: () -> int
    """
    Return the instance tag used for robot suite debugging.
    Args:
        None
    Returns:
        The class_id value assigned when the instance was created.
    """
    instance_tag = self.class_id
    return instance_tag
# Sanity check for a freshly created request packet
def _validate_packet(self, packet: Packet) -> None:
    # type: (Packet) -> None
    """
    Check that the packet is a Modbus TCP request and give it a transaction id.
    Args:
        packet: The packet to validate; its transId is filled in from the
            library's current transaction id when it is unset (falsy).
    Raises:
        ValueError: If the packet has no ModbusADU_Request layer or its
            protoId is not 0.
    """
    is_modbus_tcp = packet.haslayer(ModbusADU_Request) and packet[ModbusADU_Request].protoId == 0
    if not is_modbus_tcp:
        raise ValueError("Only Modbus TCP requests are allowed!")
    if packet[ModbusADU_Request].transId:
        return
    packet.transId = self.get_trans_id()
@keyword("Create Request")
def create_request(self, packet_str) -> Packet:
    # type: (str) -> Packet
    """
    Create a Modbus packet based on the given string representation.
    The string is evaluated as a Python expression inside this method and the
    resulting packet is checked with _validate_packet().
    Args:
        packet_str (str): A string representing the Modbus packet.
    Returns:
        ModbusADU_Request: The created Modbus packet.
    Raises:
        ValueError: If the packet creation or validation fails; the
            underlying error is attached as the exception cause.
    """
    try:
        # SECURITY: eval() executes arbitrary Python code. packet_str must come
        # from trusted test suites only; never pass untrusted input here.
        packet: ModbusADU_Request = eval(packet_str)
        self._validate_packet(packet)
        print("Packet created: %s" % str(packet.summary()))
        return packet
    except Exception as exception:
        # Chain the cause so the original traceback is kept in the report
        raise ValueError(f"Failed to create packet: {str(exception)}") from exception
# Connects to a target via TCP socket
@keyword("Connect")
def connect(self, ip_addr=MB_DEF_SERVER_IP, port=MB_DEF_PORT) -> StreamSocket:
    # type: (Any, int) -> Optional[Any]
    """
    Create a Modbus connection to target over socket stream.
    Args:
        ip_addr (str): A string representing the Modbus server address.
            When None, the address stored on the instance is used.
        port: A server port to connect. When None, the port stored on the
            instance is used.
    Returns:
        StreamSocket: The created Modbus socket.
    Raises:
        ValueError: If neither the argument nor the instance provides a
            server address.
        Scapy_Exception: If the connection cannot be established. The new
            socket is closed and the connection state is reset first.
    """
    if (ip_addr is None and self.node_address is None):
        print("Connection is not esteblished.")
        raise ValueError('No parameters defined!')
    # Fall back to the endpoint remembered on the instance
    if ip_addr is None:
        ip_addr = self.node_address
    if port is None:
        port = self.node_port
    s = None
    try:
        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        print(f"Connect to server: {ip_addr}:{port}")
        s.connect((ip_addr, port))
        self._connection = StreamSocket(s, basecls=ModbusADU_Response)
        self.host_ip = self.get_host_ip()
        print(f'Host IP address: {self.host_ip}')
        self.node_address = ip_addr
        self.node_port = port
        self.socket = s
    except Exception as exception:
        # Do not leak the half-open socket and leave no stale handles behind
        if s is not None:
            s.close()
        self._connection = None
        self.node_address = None
        self.node_port = None
        self.socket = None
        raise Scapy_Exception(f"Could not connect to socket: {exception}") from exception
    return self._connection
@keyword("Disconnect")
def disconnect(self) -> None:
    # type: () -> None
    """
    The disconnect from socket method to work as robot framework keyword.
    Does nothing when there is no active connection.
    Args:
        None
    Returns:
        None
    Raises:
        Scapy_Exception: If the connection close fail. The stored connection
            references are cleared in that case as well.
    """
    if self._connection is None:
        return
    info("Disconnect from server.")
    try:
        self._connection.close()
    except Exception as exception:
        raise Scapy_Exception(f"Connection close fail, exception occurred: ({exception})") from exception
    finally:
        # Drop the handles even when close() failed, so a later send attempts
        # a fresh connect instead of reusing a broken socket.
        self._connection = None
        self.socket = None
@keyword("Send Packet")
def send_packet_and_get_response(self, pkt, timeout=2, verbose=True) -> Optional[bytes]:
    # type: (Packet, int, bool) -> Optional[bytes]
    """
    Wrapped send and receive function used as the robot framework keyword.
    Connects first when no connection is active, sends the request, prints
    both frames, appends them to the instance pcap file and advances the
    transaction id.
    Args:
        pkt: A Modbus packet containing a ModbusADU_Request layer.
        timeout: Time to wait for the response.
        verbose: Logging flag, forwarded to the low level send/receive helper.
    Returns:
        bytes: The received response as raw bytes, or None when no response
            arrived.
    Raises:
        Scapy_Exception: If the packet is missing or the send/receive fails.
    """
    def _frame_for_pcap(payload, src, dst, sport, dport):
        # Prepend whichever of Ether/IP/TCP the payload lacks, innermost first,
        # so the capture holds complete frames.
        frame = payload
        if not payload.haslayer(TCP):
            frame = TCP(sport=sport, dport=dport) / frame
        if not payload.haslayer(IP):
            frame = IP(src=src, dst=dst) / frame
        if not payload.haslayer(Ether):
            frame = Ether() / frame
        return frame

    try:
        request: Packet = pkt
        # Validate before the request is indexed below
        if request is None:
            raise ValueError("No request packet provided.")
        if self._connection is None:
            print("Connection is not established")
            self.connect(ip_addr=self.node_address, port=self.node_port)
        response: Packet = self._req_send_recv(request[ModbusADU_Request], timeout=timeout, verbose=verbose)
        if response is None:
            print("No response from slave.")
            return None
        print(f"Packet sent: {request[ModbusADU_Request].show(dump=True)}")
        print(f"Packet get: {response[ModbusADU_Response].show(dump=True)}")
        print(f"Answer bin: {bytes(response)}")
        print(f"Answer hex: {binascii.hexlify(bytes(response)).decode('ascii')}")
        # Mimic the whole Modbus frames correctly in the pcap.
        # The real server port (self.node_port) is overridden with the default
        # Modbus port to show the packets correctly under wireshark.
        server_port = 502
        # One client port per exchange; the response travels the reverse way
        # (server port -> client port).
        client_port = random.randint(37000,39000)
        pcap_out = _frame_for_pcap(request, self.host_ip, self.node_address, client_port, server_port)
        pcap_in = _frame_for_pcap(response, self.node_address, self.host_ip, server_port, client_port)
        # record the packets sent/received
        wrpcap(self.pcap_file_name, bytes(pcap_out), append=True)
        wrpcap(self.pcap_file_name, bytes(pcap_in), append=True)
        self.inc_trans_id()
        return bytes(response) if response else None
    except Exception as exception:
        raise Scapy_Exception(f"Send data fail, exception occurred: ({exception})") from exception
@keyword("Translate Response")
def translate_response(self, pkt) -> Any:
# type: (bytes) -> Optional[Packet]
"""
Dissect a response received from the server and cache the result.
The input is parsed as ModbusADU_Response. The parsed ADU is stored in
self.in_adu and the extracted PDU in self.in_pdu. When the function code has
the exception bit set and the PDU carries an exceptCode listed in
modbus_exceptions, self.exception and self.exception_message are updated.
Args:
pkt: The response to dissect (raw bytes, per the type comment).
Returns:
The value stored in self.in_pdu.
Raises:
Scapy_Exception: If any step of the dissection fails; self.in_adu and
self.in_pdu are reset to None before raising.
"""
try:
packet: Packet = ModbusADU_Response(pkt)
# NOTE(review): a constructor call does not normally yield None, so this
# branch looks unreachable - confirm whether the haslayer check was meant.
if packet is None: #or not packet.haslayer(ModbusADU_Response)
raise ValueError("Only Modbus TCP responses are allowed!")
print(f"Packet received: {packet.show(dump=True)}")
# First element of the returned pair is kept as the PDU, the second is ignored
self.in_pdu, __ = packet.extract_padding(packet)
print(f"Test received: pdu: {type(self.in_pdu)} {self.in_pdu}, {bytes(self.in_pdu)}")
# workaround to use dissected packets under robot framework
# issue with scapy dissector on incomplete packets
if packet.haslayer(Raw):
print(f"Test workaround PDU: {bytes(packet[Raw].load)}")
# NOTE(review): stock scapy guess_payload_class() returns a class rather than
# an instance - confirm what ModbusSupport returns here.
self.in_pdu = packet.guess_payload_class(packet[Raw].load)
# Record the exception code only for exception replies with a known code
if (packet.funcCode & self.MB_EXCEPTION_MASK) and \
hasattr(self.in_pdu, 'exceptCode') and \
(self.in_pdu.exceptCode in modbus_exceptions):
self.exception = packet.exceptCode
self.exception_message = modbus_exceptions[self.exception]
self.in_adu = packet[ModbusADU_Response]
print(f'PDU: {self.in_pdu}')
return self.in_pdu
except Exception as exception:
# Drop any partially stored result before reporting the failure
self.in_adu = None
self.in_pdu = None
raise Scapy_Exception(f"Parsing of response : ({exception})")
def get_int(self, val) -> int:
    # type: (Any) -> int
    """
    Convert a keyword argument to an integer.
    Args:
        val: An int, or a string holding a number. Plain decimal strings
            are accepted as before; strings with a radix prefix ("0x10",
            "0o17", "0b101") are accepted as well.
    Returns:
        The integer value.
    Raises:
        ValueError: If val is neither int nor str, or the string is not a
            valid number.
    """
    if isinstance(val, int):
        return val
    if isinstance(val, str):
        try:
            return int(val)
        except ValueError:
            # Base 0 lets int() infer the radix from the prefix; plain
            # decimal input never reaches this fallback.
            return int(val, 0)
    raise ValueError("Invalid value type")
@keyword("Check Response")
def check_response(self, pdu, expected_func) -> Tuple[int, str]:
    # type: (Packet, Any) -> Tuple[int, str]
    """
    Check PDU frame from response. Check exception code
    Args:
        pdu: A Modbus response frame (ADU or PDU) exposing a funcCode field.
        expected_func: The function code the response is expected to carry,
            as an int or a numeric string.
    Returns:
        exception: The exception code from Modbus frame (0 when none)
        exception_message: exception message
    Raises:
        AssertionError: If pdu is not a packet or the function code does not
            match expected_func.
        ValueError: If the packet exposes no function code.
    """
    assert pdu is not None and isinstance(pdu, Packet), "Incorrect pdu provided."
    # Every accepted packet type is handled the same way: the function code
    # is read as a plain field value.
    if not hasattr(pdu, 'funcCode'):
        raise ValueError(f"Invalid PDU type or missing function code: {type(pdu)}")
    func_code: int = pdu.getfieldval('funcCode')
    print(f"func code: {type(func_code)} {func_code}")
    if ((func_code & self.MB_EXCEPTION_MASK) and hasattr(pdu, 'exceptCode') and pdu.exceptCode):
        self.exception = pdu.exceptCode
    else:
        self.exception = 0
    exp_func = self.get_int(expected_func) if isinstance(expected_func, str) else expected_func
    # The exception bit is masked off before comparing with the expectation
    assert ((func_code & self.MB_EXCEPTION_FUNC_MASK) == exp_func), f"Unexpected function code: {func_code & self.MB_EXCEPTION_FUNC_MASK}, {exp_func}"
    self.exception_message = modbus_exceptions[self.exception]
    print(f"MB exception: {self.exception}, {self.exception_message}")
    return self.exception, self.exception_message
@keyword("Check ADU")
def check_adu(self, adu_out, adu_in):
    # type: (Packet, Packet) -> Optional[int]
    """
    Check ADU frame fields.
    The request and the response match when both expose transId and protoId
    and the two values are equal in both frames.
    Args:
        adu_out: A Modbus ADU frame request.
        adu_in: A Modbus ADU frame response.
    Returns:
        transId: the transaction ID, if the frames are correct, otherwise returns None
    Raises:
        AssertionError: If either frame is None.
        Scapy_Exception: If reading the frame fields fails.
    """
    assert adu_out is not None and adu_in is not None, "Incorrect adu frame provided."
    try:
        # The ADU field is spelled 'transId'; the former 'TransId' lookup could
        # never match, so the check always returned None.
        if hasattr(adu_out, 'transId') and \
                hasattr(adu_in, 'transId') and \
                hasattr(adu_out, 'protoId') and \
                hasattr(adu_in, 'protoId') and \
                adu_out.protoId == adu_in.protoId and \
                adu_out.transId == adu_in.transId:
            return adu_out.transId
        return None
    except Exception as exception:
        raise Scapy_Exception(f"ADU check fail, exception occurred: ({exception})") from exception
@keyword("Get Bits From PDU")
def get_bits_from_pdu(self, pdu):
    # type: (Packet) -> List[bool]
    """
    Extract the bit values (coils or discrete inputs) carried by a PDU.
    Args:
        pdu: A Modbus PDU frame exposing coilStatus or inputStatus.
    Returns:
        bits: One boolean per bit of the status bytes, least significant bit
            of each byte first; empty when the PDU has no byteCount or its
            byteCount is below 1.
    Raises:
        AssertionError: If pdu is not a packet or has neither status field.
    """
    assert pdu is not None and isinstance(pdu, Packet) and \
        (hasattr(pdu, 'coilStatus') or hasattr(pdu, 'inputStatus')), "Incorrect pdu provided."
    if not hasattr(pdu, 'byteCount') or not (pdu.byteCount >= 1):
        return []
    # coilStatus takes precedence when the PDU exposes both fields
    if hasattr(pdu, 'coilStatus'):
        status_bytes = bytes(pdu.coilStatus)
    else:
        status_bytes = bytes(pdu.inputStatus)
    flags = []
    for octet in status_bytes:
        for position in range(8):
            flags.append(octet & (1 << position) != 0)
    return flags
# Self test function to debug the supported methods. Todo: remove later
def self_test(self) -> None:
# type: () -> None
"""
Run a fixed request sequence against the server at MB_DEF_SERVER_IP:MB_DEF_PORT.
The sequence is: read holding registers, write holding registers, read input
registers, read coils, write coils. Each response is asserted to be non-empty,
dissected with translate_response() and checked with check_response(); the
connection is closed at the end.
Raises:
AssertionError: If a request gets no (or a too short) response.
"""
self.connect(ip_addr=MB_DEF_SERVER_IP, port=MB_DEF_PORT)
# 1. Read holding registers
packet = self.create_request(TEST_PACKET_HOLDING_READ)
print(f"Test: Packet created: {packet}")
response = self.send_packet_and_get_response(packet, timeout=1, verbose=0)
assert response and len(response) > 1, "No response from slave"
print(f"Test: received: {bytes(response)}")
pdu = self.translate_response(response)
if pdu is not None:
print(f"Register values: {pdu}, len:{len(pdu.registerVal)}")
print(f"PDU Exception: {self.check_response(pdu, packet.funcCode)}")
# 2. Write holding registers
packet = self.create_request(TEST_PACKET_HOLDING_WRITE)
response = self.send_packet_and_get_response(packet, timeout=1, verbose=0)
assert response and len(response) > 1, "No response from slave"
print(f"Write response ack: {response} ")
if response is not None:
print(f"Test: received: {bytes(response)}")
pdu = self.translate_response(response)
# NOTE(review): check_response() returns a (code, message) tuple, which is
# always truthy, so `not exception` is False here and in the checks below -
# confirm whether the exception code alone was meant to be tested.
exception = self.check_response(pdu, packet.funcCode)
if pdu is not None and not exception:
print(f"PDU Exception: {self.check_response(pdu, packet.funcCode)}")
# 3. Read input registers
packet = self.create_request(TEST_PACKET_INPUT_READ)
print(f"Test: input read packet: {packet}")
response = self.send_packet_and_get_response(packet, timeout=1, verbose=0)
assert response and len(response) > 1, "incorrect response"
print(f"Test: received: {bytes(response)}")
pdu = self.translate_response(response)
exception = self.check_response(pdu, packet.funcCode)
if pdu is not None and not exception:
print(f"Register values: {pdu}, len:{len(pdu.registerVal)}")
# 4. Read coils
packet = self.create_request(TEST_PACKET_COILS_READ)
print(f"Test: read coils request: {packet}")
response = self.send_packet_and_get_response(packet, timeout=1, verbose=0)
assert response and len(response) > 1, "Incorrect coil read response"
print(f"Test: received: {bytes(response)}")
pdu = self.translate_response(response)
exception = self.check_response(pdu, packet.funcCode)
if pdu is not None and not exception:
print(f"Register values: {pdu}, len:{len(pdu.coilStatus)}")
# 5. Write coils
packet = self.create_request(TEST_PACKET_COILS_WRITE)
print(f"Test: write coils request: {packet}")
response = self.send_packet_and_get_response(packet, timeout=1, verbose=0)
assert response and len(response) > 1, "Incorrect coil read response"
print(f"Test: received: {bytes(response)}")
pdu = self.translate_response(response)
exception = self.check_response(pdu, packet.funcCode)
if pdu is not None and not exception:
print(f"Register values: {pdu}, len: {pdu.quantityOutput}")
self.disconnect()
####################################################################
# Script entry point: when the module is executed directly, run the built-in
# self test with the default settings. (An interactive scapy shell with a
# banner was used here previously and is left disabled.)
if __name__ == "__main__":
    ModbusTestLib().self_test()