viper 软件
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
viper/uart.py

200 lines
5.9 KiB

import serial
import struct
import time
class Uart(object):
def __init__(self,):
self.ser = serial.Serial()
self.__buf = b''
self.command = b''
self._is_connected = False
self.begin_time = None
pass
def set_serial_para(self, port="COM1",baudrate=9600,bytesize=8,parity="N", stopbit=1, timeout = 4):
self.port = port
self.ser.port = port
self.ser.baudrate = baudrate
self.ser.bytesize = bytesize
self.ser.parity = parity
self.ser.stopbits = stopbit
self.ser.timeout = timeout
def set_modbus(self, slaveaddress=1, functioncode=3, beginaddress=0, step=5 ):
self.slaveaddress = slaveaddress
self.functioncode = functioncode
self.beginaddress = beginaddress
self.step = step
self.command = self.__get_modbus_command()
self.rcv_len = 2*self.step + 5 # 16bit
def connect(self,):
try:
self.ser.open()
if self.ser.isOpen():
self._is_connected = True
self.ser.close()
except Exception as e:
self._is_connected = False
# SerialException(f"KH3000 can not open port : {self.ser.port}")
if self.ser is not None and self.ser.isOpen():
self.ser.close()
return self._is_connected
def disconnect(self,):
if self._is_connected or self.ser.isOpen():
self.ser.close()
self._is_received = False
self._is_connected = False
pass
def write(self,) -> None:
try:
self.ser.open()
if self.ser.isOpen():
self.ser._is_connected = True
except Exception as e:
self.ser. _is_connected = False
if self.ser._is_connected:
self.begin_time = time.time()
self.ser.reset_input_buffer()
self.ser.reset_output_buffer()
# print(f"========= command {self.command.hex() }")
self.ser.write(self.command)
# if isHex:
# data = bytes.fromhex(data)
# self.ser.write(bytes(data))
# else:
# self.ser.write(bytes(data, encoding='utf8')+b'\x0A\x0D')
def receive(self,):
self.flush()
while 1:
if not self._is_receiving:
break
if self.ser.in_waiting: # 这个接收是随机的
for _ in range(self.ser.in_waiting):
# print( type(_serial.read(1)))
temp = self.ser.read(1) # 可以直接读取指定长度
self.__buf = self.__buf + temp
if len(self.__buf) >= self.rcv_len:
break
if time.time() - self.begin_time > self.ser.timeout:
self.__buf = b''
break
self._is_receiving = False
def OneMeasure(self) -> bytes:
if not self.ser.isOpen():
self.ser.open()
self._is_receiving = True
self.__buf = b''
self.receive()
if len(self.__buf)>=self.rcv_len and not self._is_receiving :
return self.__buf
return b''
pass
def reset_input_buffer(self)->None:
self.ser.reset_input_buffer()
pass
def reset_output_buffer(self)->None:
self.ser.reset_output_buffer()
pass
def flush(self)->None:
self.ser.flush()
pass
def IsOpen(self)->bool:
return self._is_connected
pass
def IsReceived(self)->bool:
return self._is_received
pass
def getResult(self)->bytes:
return self.__buf
pass
def add_crc16( self, buf:bytes ):
'''
一般发送命令都是'01030000000A'这样的字符串
输入:test2 = '01030000000A'
输出:0xc5cd
hexstr[2:].upper() 去掉
'''
crc = 0xFFFF
for pos in buf:
crc ^= pos
for i in range(8):
if ((crc & 1) != 0):
crc >>= 1
crc ^= 0xA001
else:
crc >>= 1
return ((crc & 0xff) << 8) + (crc >> 8)
# return hex(((crc & 0xff) << 8) + (crc >> 8))[2:].upper()
def crc16Check(self,buf:bytes):
if len(buf) > 4 :
crc = self.add_crc16(buf[:-2])
if (crc & 0xff) == buf[-1] and (crc >> 8)== buf[-2]:
return True
else:
return False
else:
return False
def __get_modbus_command(self, ):
res = b''
res = res + self.slaveaddress.to_bytes(1, byteorder="big")
res = res + self.functioncode.to_bytes(1, byteorder="big")
res = res + self.beginaddress.to_bytes(2, byteorder="big")
res = res + self.step.to_bytes(2, byteorder="big")
crc = self.add_crc16(res)
hi = (crc >> 8).to_bytes(1, byteorder="big")
lo = (crc & 0xff).to_bytes(1, byteorder="big")
return res + hi + lo
pass
if __name__ == '__main__':
# 字符串转字节
u = Uart()
s = "01030000000A"
sss = b'\x01\x03\x00\x00\x00\x0A\xC5\xCD'
print(bytes.fromhex(s))
print(bytearray.fromhex(s))
print( u.add_crc16(bytes.fromhex(s)) )
print("00000000000000000000000000000000000")
print(sss[:-2])
if u.crc16Check(sss):
print("TRUE,...")
else:
print("fffffffffffff")
# 数字转字节
a = 1024
b=a.to_bytes(2, byteorder='big') # 2个字节
a.from_bytes(b'\x04\x00', byteorder='big')
# ASCIIunpack
data = "a"
bytes(data, encoding='utf8')
# struct 打包转字节
ss = struct.pack('>H', 8) # unpack
print(ss)
pass