You cannot select more than 25 topics.
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
200 lines
5.9 KiB
200 lines
5.9 KiB
import serial
|
|
import struct
|
|
import time
|
|
|
|
class Uart(object):
    """Serial-port helper that sends one Modbus read request and collects the reply.

    The request frame is built by ``set_modbus`` and sent by ``write``, which
    also stamps ``begin_time``. ``OneMeasure``/``receive`` then poll the port
    until ``rcv_len`` bytes have arrived or the serial timeout (measured from
    ``begin_time``) has elapsed.
    """

    def __init__(self):
        """Create an unopened ``serial.Serial`` and reset all bookkeeping state."""
        self.ser = serial.Serial()
        self.__buf = b''            # bytes collected by the last receive()
        self.command = b''          # request frame produced by set_modbus()
        self.rcv_len = 0            # expected reply length; 0 until set_modbus() runs
        self._is_connected = False
        # Initialised here so IsReceived()/receive() never hit AttributeError.
        self._is_received = False
        self._is_receiving = False
        self.begin_time = None      # time.time() of the last write()

    def set_serial_para(self, port="COM1", baudrate=9600, bytesize=8, parity="N", stopbit=1, timeout=4):
        """Copy the given port settings onto the underlying ``serial.Serial`` object.

        ``timeout`` (seconds) is also the window ``receive`` waits for a reply.
        """
        self.port = port
        self.ser.port = port
        self.ser.baudrate = baudrate
        self.ser.bytesize = bytesize
        self.ser.parity = parity
        self.ser.stopbits = stopbit
        self.ser.timeout = timeout

    def set_modbus(self, slaveaddress=1, functioncode=3, beginaddress=0, step=5):
        """Store the request parameters and pre-build the request frame.

        Sets ``command`` (the frame to send) and ``rcv_len`` (the number of
        reply bytes ``receive`` waits for).
        """
        self.slaveaddress = slaveaddress
        self.functioncode = functioncode
        self.beginaddress = beginaddress
        self.step = step
        self.command = self.__get_modbus_command()
        # Two bytes per 16-bit register plus 5 bytes of framing
        # (presumably address, function code, byte count and the 2 CRC bytes).
        self.rcv_len = 2 * self.step + 5

    def connect(self) -> bool:
        """Probe the port: open it, remember whether that worked, close it again.

        Returns:
            True if the port could be opened, False otherwise. The port is
            always left closed; ``write``/``OneMeasure`` reopen it on demand.
        """
        try:
            self.ser.open()
            self._is_connected = self.ser.isOpen()
        except Exception:
            # Best-effort probe: any failure to open just means "not connected".
            self._is_connected = False
        finally:
            if self.ser is not None and self.ser.isOpen():
                self.ser.close()
        return self._is_connected

    def disconnect(self) -> None:
        """Close the port if needed and reset the connected/received flags."""
        if self._is_connected or self.ser.isOpen():
            self.ser.close()
        self._is_received = False
        self._is_connected = False

    def write(self) -> None:
        """Send ``command`` over the serial port, opening the port if necessary.

        On success ``begin_time`` is set to the current time so that
        ``receive`` can measure its timeout from the moment of sending.
        If the port cannot be opened nothing is sent and ``IsOpen()``
        reports False.
        """
        try:
            # Opening an already-open port raises in pySerial, which used to
            # make every second write() silently send nothing.
            if not self.ser.isOpen():
                self.ser.open()
            self._is_connected = self.ser.isOpen()
        except Exception:
            # Best-effort: a port that cannot be opened means "not connected".
            self._is_connected = False

        if not self._is_connected:
            return

        self.begin_time = time.time()
        # Drop stale bytes so the reply starts at the beginning of the buffer.
        self.ser.reset_input_buffer()
        self.ser.reset_output_buffer()
        self.ser.write(self.command)

    def receive(self) -> None:
        """Poll the port until ``rcv_len`` bytes are buffered or the timeout expires.

        Only runs while ``_is_receiving`` is True (``OneMeasure`` sets it).
        On timeout the partial buffer is discarded. ``_is_receiving`` is
        always False on return; ``_is_received`` is True only when a full
        frame was collected.
        """
        self.flush()
        self._is_received = False

        expected = self.rcv_len
        if expected <= 0:
            # No request configured via set_modbus(); nothing to wait for.
            self._is_receiving = False
            return

        # Timeout window starts at the last write(); fall back to "now" when
        # write() has not been called so the subtraction below cannot fail.
        started = self.begin_time if self.begin_time is not None else time.time()
        timeout = self.ser.timeout  # None means "no timeout" in pySerial

        while self._is_receiving:
            pending = self.ser.in_waiting  # bytes arrive in arbitrary chunks
            if pending:
                missing = expected - len(self.__buf)
                # Read what is available in one call, never past the frame end.
                self.__buf += self.ser.read(min(pending, missing))

            if len(self.__buf) >= expected:
                self._is_received = True
                break

            if timeout is not None and time.time() - started > timeout:
                self.__buf = b''
                break

            if not pending:
                # Yield briefly instead of spinning at 100% CPU.
                time.sleep(0.001)

        self._is_receiving = False

    def OneMeasure(self) -> bytes:
        """Collect one reply frame and return it.

        Does not send the request itself; ``write`` is expected to have been
        called first (it sets ``begin_time``, the start of the timeout window).

        Returns:
            The received bytes when at least ``rcv_len`` bytes arrived,
            otherwise ``b''``.
        """
        if not self.ser.isOpen():
            self.ser.open()
        self._is_receiving = True
        self.__buf = b''
        self.receive()

        if len(self.__buf) >= self.rcv_len and not self._is_receiving:
            return self.__buf
        return b''

    def reset_input_buffer(self) -> None:
        """Forward to ``serial.Serial.reset_input_buffer``."""
        self.ser.reset_input_buffer()

    def reset_output_buffer(self) -> None:
        """Forward to ``serial.Serial.reset_output_buffer``."""
        self.ser.reset_output_buffer()

    def flush(self) -> None:
        """Forward to ``serial.Serial.flush``."""
        self.ser.flush()

    def IsOpen(self) -> bool:
        """Return the connected flag last set by ``connect``/``write``/``disconnect``."""
        return self._is_connected

    def IsReceived(self) -> bool:
        """Return True if the last ``receive`` collected a full frame."""
        return self._is_received

    def getResult(self) -> bytes:
        """Return the bytes currently held in the receive buffer."""
        return self.__buf

    def add_crc16(self, buf: bytes):
        """Compute the CRC-16 of ``buf`` (init 0xFFFF, reflected polynomial 0xA001).

        The two CRC bytes are returned swapped, i.e. in the order they are
        appended to a frame: for ``bytes.fromhex('01030000000A')`` the result
        is ``0xC5CD``.

        Returns:
            The byte-swapped 16-bit CRC as an int.
        """
        crc = 0xFFFF
        for byte in buf:
            crc ^= byte
            for _ in range(8):
                if crc & 1:
                    crc = (crc >> 1) ^ 0xA001
                else:
                    crc >>= 1
        return ((crc & 0xff) << 8) + (crc >> 8)

    def crc16Check(self, buf: bytes):
        """Return True if the last two bytes of ``buf`` are the CRC of the rest.

        Frames of 4 bytes or fewer are rejected outright.
        """
        if len(buf) <= 4:
            return False
        crc = self.add_crc16(buf[:-2])
        return (crc >> 8) == buf[-2] and (crc & 0xff) == buf[-1]

    def __get_modbus_command(self):
        """Build the request frame from the parameters stored by ``set_modbus``.

        Layout: slave address (1 byte), function code (1 byte), start address
        (2 bytes, big-endian), register count (2 bytes, big-endian), followed
        by the two CRC bytes in the order produced by ``add_crc16``.
        """
        frame = (
            self.slaveaddress.to_bytes(1, byteorder="big")
            + self.functioncode.to_bytes(1, byteorder="big")
            + self.beginaddress.to_bytes(2, byteorder="big")
            + self.step.to_bytes(2, byteorder="big")
        )
        crc = self.add_crc16(frame)
        # add_crc16 already returns the bytes in wire order (high byte first here).
        return frame + crc.to_bytes(2, byteorder="big")
|
|
|
|
if __name__ == '__main__':
    # Scratch demo of the CRC helpers and of a few bytes-conversion idioms.

    # Hex string -> bytes
    u = Uart()
    s = "01030000000A"
    sss = b'\x01\x03\x00\x00\x00\x0A\xC5\xCD'  # same request with its CRC appended
    print(bytes.fromhex(s))
    print(bytearray.fromhex(s))

    print(u.add_crc16(bytes.fromhex(s)))
    print("00000000000000000000000000000000000")
    print(sss[:-2])
    if u.crc16Check(sss):
        print("TRUE,...")
    else:
        print("fffffffffffff")

    # Integer -> bytes
    a = 1024
    b = a.to_bytes(2, byteorder='big')  # two bytes

    # Bytes -> integer (result intentionally discarded; illustration only)
    a.from_bytes(b'\x04\x00', byteorder='big')

    # ASCII text -> bytes (result intentionally discarded; illustration only)
    data = "a"
    bytes(data, encoding='utf8')

    # struct: pack an integer into big-endian bytes (struct.unpack reverses it)
    ss = struct.pack('>H', 8)
    print(ss)