"""Small Modbus-RTU master helper built on top of pyserial.

The module exposes :class:`Uart`, which builds a read request frame
(slave address, function code, start address, register count, CRC-16),
sends it over a serial port and collects the fixed-length reply.
"""
import struct
import time

try:
    import serial
except ImportError:
    # The CRC / frame helpers are pure Python and stay usable without
    # pyserial; constructing a Uart still fails with ImportError.
    serial = None


class Uart(object):
    """Serial-port wrapper that sends one Modbus request and reads the reply."""

    # Pause between polls of the input buffer so receive() does not spin
    # a CPU core while waiting for the device to answer.
    _POLL_INTERVAL = 0.001

    def __init__(self):
        """Create an unopened serial port object and reset all state flags.

        Raises:
            ImportError: if pyserial is not installed.
        """
        if serial is None:
            raise ImportError("pyserial is required to create a Uart instance")
        self.ser = serial.Serial()
        self.__buf = b''
        self.command = b''
        self._is_connected = False
        self._is_received = False
        self._is_receiving = False
        # Expected reply length in bytes; set_modbus() computes the real value.
        self.rcv_len = 0
        # time.time() stamp of the last write(); reference point for the
        # receive timeout.
        self.begin_time = None

    def set_serial_para(self, port="COM1", baudrate=9600, bytesize=8,
                        parity="N", stopbit=1, timeout=4):
        """Copy the given line settings onto the (not yet opened) port.

        ``timeout`` is in seconds and is also used by :meth:`receive` as the
        overall deadline for a reply.
        """
        self.port = port
        self.ser.port = port
        self.ser.baudrate = baudrate
        self.ser.bytesize = bytesize
        self.ser.parity = parity
        self.ser.stopbits = stopbit
        self.ser.timeout = timeout

    def set_modbus(self, slaveaddress=1, functioncode=3, beginaddress=0,
                   step=5):
        """Store the request parameters and pre-build the request frame.

        ``step`` is the number of 16-bit registers requested. The expected
        reply length is two bytes per register plus five framing bytes.

        Raises:
            OverflowError: if a value does not fit its field (1 byte for
                address and function code, 2 bytes for the other two).
        """
        self.slaveaddress = slaveaddress
        self.functioncode = functioncode
        self.beginaddress = beginaddress
        self.step = step
        self.command = self.__get_modbus_command()
        self.rcv_len = 2 * self.step + 5  # 16-bit registers

    def connect(self) -> bool:
        """Probe whether the configured port can be opened.

        The port is opened and immediately closed again; write() and
        OneMeasure() open it on demand. A port that is already open is
        reported as connected and left untouched.

        Returns:
            True when the port is usable, False otherwise.
        """
        if self.ser.is_open:
            # open() would raise on an already-open port and the probe
            # would wrongly close it and report a failure.
            self._is_connected = True
            return True
        try:
            self.ser.open()
            if self.ser.is_open:
                self._is_connected = True
                self.ser.close()
        except Exception:
            # Best-effort probe: any failure just means "not connected".
            self._is_connected = False
            if self.ser is not None and self.ser.is_open:
                self.ser.close()
        return self._is_connected

    def disconnect(self) -> None:
        """Close the port if needed and clear the connection flags."""
        if self._is_connected or self.ser.is_open:
            self.ser.close()
        self._is_received = False
        self._is_connected = False

    def write(self) -> None:
        """Send the prepared request frame.

        The port is opened on demand; if it cannot be opened nothing is
        sent and IsOpen() reports False afterwards. Both serial buffers are
        cleared before sending so the next reply starts on a clean input
        buffer.
        """
        try:
            # Only open when necessary: pyserial raises if the port is
            # already open, which used to turn every second write() into
            # a silent no-op.
            if not self.ser.is_open:
                self.ser.open()
            self._is_connected = self.ser.is_open
        except (OSError, ValueError):
            # serial.SerialException derives from OSError; ValueError is
            # raised for out-of-range port parameters.
            self._is_connected = False

        if not self._is_connected:
            return

        self.begin_time = time.time()
        self.ser.reset_input_buffer()
        self.ser.reset_output_buffer()
        self.ser.write(self.command)

    def receive(self) -> None:
        """Collect reply bytes until the frame is complete or time runs out.

        Runs while ``_is_receiving`` is set (OneMeasure() sets it). The
        deadline is ``ser.timeout`` seconds after the last write(); when no
        write() happened yet the wait starts now, and a ``None`` timeout
        means no deadline. On timeout the partial data is discarded.
        """
        self.flush()
        started = self.begin_time if self.begin_time is not None else time.time()
        timeout = self.ser.timeout
        while self._is_receiving:
            pending = self.ser.in_waiting
            if pending:
                # Bytes arrive in arbitrary chunks; take everything that
                # is available in one call.
                self.__buf += self.ser.read(pending)
            if len(self.__buf) >= self.rcv_len:
                break
            if timeout is not None and time.time() - started > timeout:
                self.__buf = b''
                break
            time.sleep(self._POLL_INTERVAL)
        self._is_receiving = False

    def OneMeasure(self) -> bytes:
        """Read one reply frame for a request previously sent with write().

        Returns:
            The received bytes when at least ``rcv_len`` bytes arrived in
            time, otherwise ``b''``.
        """
        if not self.ser.is_open:
            self.ser.open()
        self._is_receiving = True
        self._is_received = False
        self.__buf = b''
        self.receive()
        if len(self.__buf) >= self.rcv_len and not self._is_receiving:
            self._is_received = bool(self.__buf)
            return self.__buf
        return b''

    def reset_input_buffer(self) -> None:
        """Discard everything waiting in the port's input buffer."""
        self.ser.reset_input_buffer()

    def reset_output_buffer(self) -> None:
        """Discard everything waiting in the port's output buffer."""
        self.ser.reset_output_buffer()

    def flush(self) -> None:
        """Wait until all pending output has been written to the port."""
        self.ser.flush()

    def IsOpen(self) -> bool:
        """Return the connection flag set by connect(), write() and disconnect()."""
        return self._is_connected

    def IsReceived(self) -> bool:
        """Return True when the last OneMeasure() produced a complete frame."""
        return self._is_received

    def getResult(self) -> bytes:
        """Return the bytes collected by the last receive()."""
        return self.__buf

    def add_crc16(self, buf: bytes) -> int:
        """Compute the Modbus CRC-16 of ``buf``.

        The result is byte-swapped so that its high byte is the first CRC
        byte on the wire: for ``bytes.fromhex('01030000000A')`` the return
        value is ``0xC5CD`` and the frame ends with ``C5 CD``.
        """
        crc = 0xFFFF
        for byte in buf:
            crc ^= byte
            for _ in range(8):
                if crc & 1:
                    crc = (crc >> 1) ^ 0xA001
                else:
                    crc >>= 1
        return ((crc & 0xff) << 8) + (crc >> 8)

    def crc16Check(self, buf: bytes) -> bool:
        """Return True when the last two bytes of ``buf`` are its valid CRC.

        Frames of four bytes or fewer are rejected.
        """
        if len(buf) <= 4:
            return False
        crc = self.add_crc16(buf[:-2])
        return (crc >> 8) == buf[-2] and (crc & 0xff) == buf[-1]

    def __get_modbus_command(self) -> bytes:
        """Build the request frame from the values stored by set_modbus().

        Layout: slave address (1 byte), function code (1 byte), start
        address (2 bytes, big-endian), register count (2 bytes,
        big-endian), CRC (2 bytes).
        """
        payload = b''.join((
            self.slaveaddress.to_bytes(1, byteorder="big"),
            self.functioncode.to_bytes(1, byteorder="big"),
            self.beginaddress.to_bytes(2, byteorder="big"),
            self.step.to_bytes(2, byteorder="big"),
        ))
        crc = self.add_crc16(payload)
        return payload + crc.to_bytes(2, byteorder="big")


if __name__ == '__main__':
    # Hex string -> bytes
    u = Uart()
    s = "01030000000A"
    sss = b'\x01\x03\x00\x00\x00\x0A\xC5\xCD'
    print(bytes.fromhex(s))
    print(bytearray.fromhex(s))

    print(u.add_crc16(bytes.fromhex(s)))
    print("00000000000000000000000000000000000")
    print(sss[:-2])
    if u.crc16Check(sss):
        print("TRUE,...")
    else:
        print("fffffffffffff")

    # Integer -> bytes
    a = 1024
    b = a.to_bytes(2, byteorder='big')  # 2 bytes
    a.from_bytes(b'\x04\x00', byteorder='big')

    # ASCII text -> bytes
    data = "a"
    bytes(data, encoding='utf8')

    # Pack an integer into bytes with struct
    ss = struct.pack('>H', 8)
    print(ss)