Files
esp-modbus/tools/robot/ModbusSupport.py
2024-12-07 00:19:49 +08:00

442 lines
18 KiB
Python

import struct
from scapy.packet import Packet, Raw
from scapy.fields import ShortField, XShortField, ByteField, XByteField, StrLenField, \
FieldListField, ByteEnumField, BitFieldLenField, ConditionalField
# The below classes override the functionality of original scapy modbus module
# to workaround some dissection issues with modbus packets and do explicit dissection of PDA
# based on function code from payload for request, exception and response frames.
modbus_exceptions = { 0: "Undefined",
1: "Illegal function",
2: "Illegal data address",
3: "Illegal data value",
4: "Slave device failure",
5: "Acknowledge",
6: "Slave device busy",
8: "Memory parity error",
10: "Gateway path unavailable",
11: "Gateway target device failed to respond"}
# The common CRC16 checksum calculation method for Modbus Serial RTU frames
def mb_crc(frame:Raw, length) -> int:
crc = 0xFFFF
for n in range(length):
crc ^= (frame[n])
for i in range(8):
if crc & 1:
crc >>= 1
crc ^= 0xA001
else:
crc >>= 1
return crc
# Modbus MBAP basic class
class ModbusMBAP(Packet):
name = "Modbus TCP"
fields_desc = [ ShortField("transId", 0),
ShortField("protoId", 0),
ShortField("len", 6),
XByteField("UnitId", 247),
]
# Can be used to replace all Modbus read
class ModbusPDU_Read_Generic(Packet):
name = "Read Generic"
fields_desc = [ XByteField("funcCode", 0x01),
XShortField("startAddr", 0x0000),
XShortField("quantity", 0x0001)]
# 0x01 - Read Coils
class ModbusPDU01_Read_Coils(Packet):
name = "Read Coils Request"
fields_desc = [ XByteField("funcCode", 0x01),
# 0x0000 to 0xFFFF
XShortField("startAddr", 0x0000),
XShortField("quantity", 0x0001)]
class ModbusPDU01_Read_Coils_Answer(Packet):
name = "Read Coils Answer"
fields_desc = [ XByteField("funcCode", 0x01),
BitFieldLenField("byteCount", None, 8, count_of="coilStatus"),
FieldListField("coilStatus", [0x00], ByteField("",0x00), count_from = lambda pkt: pkt.byteCount) ]
class ModbusPDU01_Read_Coils_Exception(Packet):
name = "Read Coils Exception"
fields_desc = [ XByteField("funcCode", 0x81),
ByteEnumField("exceptCode", 1, modbus_exceptions)]
# 0x02 - Read Discrete Inputs
class ModbusPDU02_Read_Discrete_Inputs(Packet):
name = "Read Discrete Inputs"
fields_desc = [ XByteField("funcCode", 0x02),
XShortField("startAddr", 0x0000),
XShortField("quantity", 0x0001)]
class ModbusPDU02_Read_Discrete_Inputs_Answer(Packet):
name = "Read Discrete Inputs Answer"
fields_desc = [ XByteField("funcCode", 0x02),
BitFieldLenField("byteCount", None, 8, count_of="inputStatus"),
FieldListField("inputStatus", [0x00], ByteField("",0x00), count_from = lambda pkt: pkt.byteCount) ]
class ModbusPDU02_Read_Discrete_Inputs_Exception(Packet):
name = "Read Discrete Inputs Exception"
fields_desc = [ XByteField("funcCode", 0x82),
ByteEnumField("exceptCode", 1, modbus_exceptions)]
# 0x03 - Read Holding Registers
class ModbusPDU03_Read_Holding_Registers(Packet):
name = "Read Holding Registers"
fields_desc = [ XByteField("funcCode", 0x03),
XShortField("startAddr", 0x0001),
XShortField("quantity", 0x0002)]
class ModbusPDU03_Read_Holding_Registers_Answer(Packet):
name = "Read Holding Registers Answer"
fields_desc = [ XByteField("funcCode", 0x03),
BitFieldLenField("byteCount", None, 8, count_of="registerVal"),
FieldListField("registerVal", [0x0000], ShortField("",0x0000), count_from = lambda pkt: pkt.byteCount)]
class ModbusPDU03_Read_Holding_Registers_Exception(Packet):
name = "Read Holding Registers Exception"
fields_desc = [ XByteField("funcCode", 0x83),
ByteEnumField("exceptCode", 1, modbus_exceptions)]
# 0x04 - Read Input Registers
class ModbusPDU04_Read_Input_Registers(Packet):
name = "Read Input Registers"
fields_desc = [ XByteField("funcCode", 0x04),
XShortField("startAddr", 0x0000),
XShortField("quantity", 0x0001)]
class ModbusPDU04_Read_Input_Registers_Answer(Packet):
name = "Read Input Registers Response"
fields_desc = [XByteField("funcCode", 0x04),
BitFieldLenField("byteCount", None, 8,
count_of="registerVal",
adjust=lambda pkt, x: x * 2),
FieldListField("registerVal", [0x0000],
ShortField("", 0x0000),
count_from=lambda pkt: pkt.byteCount)]
class ModbusPDU04_Read_Input_Registers_Exception(Packet):
name = "Read Input Registers Exception"
fields_desc = [ XByteField("funcCode", 0x84),
ByteEnumField("exceptCode", 1, modbus_exceptions)]
# 0x05 - Write Single Coil
class ModbusPDU05_Write_Single_Coil(Packet):
name = "Write Single Coil"
fields_desc = [ XByteField("funcCode", 0x05),
XShortField("outputAddr", 0x0000),
XShortField("outputValue", 0x0000)]
class ModbusPDU05_Write_Single_Coil_Answer(Packet):
name = "Write Single Coil"
fields_desc = [ XByteField("funcCode", 0x05),
XShortField("outputAddr", 0x0000),
XShortField("outputValue", 0x0000)]
class ModbusPDU05_Write_Single_Coil_Exception(Packet):
name = "Write Single Coil Exception"
fields_desc = [ XByteField("funcCode", 0x85),
ByteEnumField("exceptCode", 1, modbus_exceptions)]
# 0x06 - Write Single Register
class ModbusPDU06_Write_Single_Register(Packet):
name = "Write Single Register"
fields_desc = [ XByteField("funcCode", 0x06),
XShortField("registerAddr", 0x0000),
XShortField("registerValue", 0x0000)]
class ModbusPDU06_Write_Single_Register_Answer(Packet):
name = "Write Single Register Answer"
fields_desc = [ XByteField("funcCode", 0x06),
XShortField("registerAddr", 0x0000),
XShortField("registerValue", 0x0000)]
class ModbusPDU06_Write_Single_Register_Exception(Packet):
name = "Write Single Register Exception"
fields_desc = [ XByteField("funcCode", 0x86),
ByteEnumField("exceptCode", 1, modbus_exceptions)]
# 0x07 - Read Exception Status (Serial Line Only)
class ModbusPDU07_Read_Exception_Status(Packet):
name = "Read Exception Status"
fields_desc = [ XByteField("funcCode", 0x07)]
class ModbusPDU07_Read_Exception_Status_Answer(Packet):
name = "Read Exception Status Answer"
fields_desc = [ XByteField("funcCode", 0x07),
XByteField("startAddr", 0x00)]
class ModbusPDU07_Read_Exception_Status_Exception(Packet):
name = "Read Exception Status Exception"
fields_desc = [ XByteField("funcCode", 0x87),
ByteEnumField("exceptCode", 1, modbus_exceptions)]
# 0x0F - Write Multiple Coils
class ModbusPDU0F_Write_Multiple_Coils(Packet):
name = "Write Multiple Coils"
fields_desc = [ XByteField("funcCode", 0x0F),
XShortField("startAddr", 0x0000),
XShortField("quantityOutput", 0x0001),
BitFieldLenField("byteCount", None, 8, count_of="outputsValue", adjust=lambda pkt,x:x),
FieldListField("outputsValue", [0x00], XByteField("", 0x00), count_from = lambda pkt: pkt.byteCount)]
class ModbusPDU0F_Write_Multiple_Coils_Answer(Packet):
name = "Write Multiple Coils Answer"
fields_desc = [ XByteField("funcCode", 0x0F),
XShortField("startAddr", 0x0000),
XShortField("quantityOutput", 0x0001)]
class ModbusPDU0F_Write_Multiple_Coils_Exception(Packet):
name = "Write Multiple Coils Exception"
fields_desc = [ XByteField("funcCode", 0x8F),
ByteEnumField("exceptCode", 1, modbus_exceptions)]
class ModbusPDU10_Write_Multiple_Registers(Packet):
name = "Write Multiple Registers"
fields_desc = [XByteField("funcCode", 0x10),
XShortField("startAddr", 0x0000),
BitFieldLenField("quantityRegisters", None, 16,
count_of="outputsValue"),
BitFieldLenField("byteCount", None, 8,
count_of="outputsValue",
adjust=lambda pkt, x: x * 2),
FieldListField("outputsValue", [0x0000],
XShortField("", 0x0000),
count_from=lambda pkt: pkt.byteCount)]
class ModbusPDU10_Write_Multiple_Registers_Serial(ModbusPDU10_Write_Multiple_Registers):
name = "Write Multiple Registers Serial"
_crc: int = 0
def get_crc(self) -> int:
return self._crc
def post_build(self, p, pay):
self._crc = 0
if self.outputsValue is not None and len(self.outputsValue) > 0:
self._crc = mb_crc(p, len(p))
p = p + struct.pack("<H", self._crc) #apply CRC16 network format
self.add_payload(bytes(self._crc))
#self.checksum = self._crc
print(f"post build p={p}, checksum = {self._crc}")
return p
def guess_payload_class(self, payload):
if len(payload) >= 2:
if mb_crc(payload, len(payload)) == 0:
#if self._crc == mb_crc(payload[:-2], len(payload)-2):
self._crc = struct.unpack("<H", payload[-2:])[0]
#print(f"Serial Payload: {payload}, crc: {self._crc}")
return ModbusPDU10_Write_Multiple_Registers_Serial
return Packet.guess_payload_class(self, payload)
class ModbusPDU10_Write_Multiple_Registers_Answer(Packet):
name = "Write Multiple Registers Answer"
fields_desc = [ XByteField("funcCode", 0x10),
XShortField("startAddr", 0x0000),
XShortField("quantityRegisters", 0x0001)]
class ModbusPDU10_Write_Multiple_Registers_Exception(Packet):
name = "Write Multiple Registers Exception"
fields_desc = [ XByteField("funcCode", 0x90),
ByteEnumField("exceptCode", 1, modbus_exceptions)]
# 0x11 - Report Slave Id
class ModbusPDU11_Report_Slave_Id(Packet):
name = "Report Slave Id"
fields_desc = [ XByteField("funcCode", 0x11) ]
class ModbusPDU11_Report_Slave_Id_Answer(Packet):
name = "Report Slave Id Answer"
fields_desc = [ XByteField("funcCode", 0x11),
BitFieldLenField("byteCount", None, 8, length_of="slaveId"),
ConditionalField(StrLenField("slaveId", "", length_from = lambda pkt: pkt.byteCount), lambda pkt: pkt.byteCount>0),
ConditionalField(XByteField("runIdicatorStatus", 0x00), lambda pkt: pkt.byteCount>0)]
class ModbusPDU11_Report_Slave_Id_Exception(Packet):
name = "Report Slave Id Exception"
fields_desc = [ XByteField("funcCode", 0x91),
ByteEnumField("exceptCode", 1, modbus_exceptions)]
class ModbusADU_Request(ModbusMBAP):
name = "ModbusADU Request"
_mb_exception: modbus_exceptions = 0
fields_desc = [
XShortField("transId", 0x0000), # needs to be unique
XShortField("protoId", 0x0000), # needs to be zero (Modbus)
XShortField("len", None), # is calculated with payload
XByteField("unitId", 0x00)] # 0xFF or 0x00 should be used for Modbus over TCP/IP
def mb_get_last_exception(self):
return _mb_exception
# Dissects packets
def guess_payload_class(self, payload):
funcCode = int(payload[0])
#print(f'Request guess payload class func: {funcCode}')
self._mb_exception = 0
if funcCode == 0x01:
return ModbusPDU01_Read_Coils
elif funcCode == 0x81:
self._mb_exception = int(payload[1])
return ModbusPDU01_Read_Coils_Exception
elif funcCode == 0x02:
return ModbusPDU02_Read_Discrete_Inputs
elif funcCode == 0x82:
self._mb_exception = int(payload[1])
return ModbusPDU02_Read_Discrete_Inputs_Exception
elif funcCode == 0x03:
return ModbusPDU03_Read_Holding_Registers
elif funcCode == 0x83:
self._mb_exception = int(payload[1])
return ModbusPDU03_Read_Holding_Registers_Exception
elif funcCode == 0x04:
return ModbusPDU04_Read_Input_Registers
elif funcCode == 0x84:
self._mb_exception = int(payload[1])
return ModbusPDU04_Read_Input_Registers_Exception
elif funcCode == 0x05:
return ModbusPDU05_Write_Single_Coil
elif funcCode == 0x85:
self._mb_exception = int(payload[1])
return ModbusPDU05_Write_Single_Coil_Exception
elif funcCode == 0x06:
return ModbusPDU06_Write_Single_Register
elif funcCode == 0x86:
self._mb_exception = int(payload[1])
return ModbusPDU06_Write_Single_Register_Exception
elif funcCode == 0x07:
return ModbusPDU07_Read_Exception_Status
elif funcCode == 0x87:
self._mb_exception = int(payload[1])
return ModbusPDU07_Read_Exception_Status_Exception
elif funcCode == 0x0F:
return ModbusPDU0F_Write_Multiple_Coils
elif funcCode == 0x8F:
self._mb_exception = int(payload[1])
return ModbusPDU0F_Write_Multiple_Coils_Exception
elif funcCode == 0x11:
return ModbusPDU11_Report_Slave_Id
elif funcCode == 0x91:
self._mb_exception = int(payload[1])
return ModbusPDU11_Report_Slave_Id_Exception
elif funcCode == 0x10:
#return ModbusPDU10_Write_Multiple_Registers
return ModbusPDU10_Write_Multiple_Registers_Serial.guess_payload_class(self, payload)
elif funcCode == 0x90:
self._mb_exception = int(payload[1])
return ModbusPDU10_Write_Multiple_Registers_Exception
else:
return Packet.guess_payload_class(self, payload)
def post_build(self, p, pay):
if self.len is None:
l = len(pay) + 1 #+len(p)
p = p[:4]+struct.pack("!H", l) + p[6:]
return p+pay
def my_show(self, p):
for f in p.fields_desc:
fvalue = p.getfieldval(f.name)
reprval = f.i2repr(p,fvalue)
return "%s" % (reprval)
# If we know the packet is an Modbus answer, we can dissect it with
# ModbusADU_Response(str(pkt))
# Scapy will dissect it on it's own if the TCP stream is available
class ModbusADU_Response(ModbusMBAP):
name = "ModbusADU Response"
_mb_exception: modbus_exceptions = 0
fields_desc = [
XShortField("transId", 0x0000), # needs to be unique
XShortField("protoId", 0x0000), # needs to be zero (Modbus)
XShortField("len", None), # is calculated with payload
XByteField("unitId", 0x01)] # 0xFF or 0x00 should be used for Modbus over TCP/IP
def mb_get_last_exception(self):
return _mb_exception
# Dissects packets
def guess_payload_class(self, payload):
funcCode = int(payload[0])
self._mb_exception = 0
if funcCode == 0x01:
return ModbusPDU01_Read_Coils_Answer
elif funcCode == 0x81:
self._mb_exception = int(payload[1])
return ModbusPDU01_Read_Coils_Exception
elif funcCode == 0x02:
return ModbusPDU02_Read_Discrete_Inputs_Answer
elif funcCode == 0x82:
self._mb_exception = int(payload[1])
return ModbusPDU02_Read_Discrete_Inputs_Exception
elif funcCode == 0x03:
return ModbusPDU03_Read_Holding_Registers_Answer
elif funcCode == 0x83:
self._mb_exception = int(payload[1])
return ModbusPDU03_Read_Holding_Registers_Exception
elif funcCode == 0x04:
return ModbusPDU04_Read_Input_Registers_Answer
elif funcCode == 0x84:
self._mb_exception = int(payload[1])
return ModbusPDU04_Read_Input_Registers_Exception
elif funcCode == 0x05:
return ModbusPDU05_Write_Single_Coil_Answer
elif funcCode == 0x85:
self._mb_exception = int(payload[1])
return ModbusPDU05_Write_Single_Coil_Exception
elif funcCode == 0x06:
return ModbusPDU06_Write_Single_Register_Answer
elif funcCode == 0x86:
self._mb_exception = int(payload[1])
return ModbusPDU06_Write_Single_Register_Exception
elif funcCode == 0x07:
return ModbusPDU07_Read_Exception_Status_Answer
elif funcCode == 0x87:
self._mb_exception = int(payload[1])
return ModbusPDU07_Read_Exception_Status_Exception
elif funcCode == 0x0F:
return ModbusPDU0F_Write_Multiple_Coils_Answer
elif funcCode == 0x8F:
self._mb_exception = int(payload[1])
return ModbusPDU0F_Write_Multiple_Coils_Exception
elif funcCode == 0x10:
return ModbusPDU10_Write_Multiple_Registers_Answer
elif funcCode == 0x90:
self._mb_exception = int(payload[1])
return ModbusPDU10_Write_Multiple_Registers_Exception
elif funcCode == 0x11:
return ModbusPDU11_Report_Slave_Id_Answer
elif funcCode == 0x91:
self._mb_exception = int(payload[1])
return ModbusPDU11_Report_Slave_Id_Exception
else:
return Packet.guess_payload_class(self, payload)