手持式服务器端 桌面端两个分支
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
handheld/tcp_handheld.py

234 lines
10 KiB

# tcp Handheld 客户端 模拟 ,模拟数据打包
# 直接平一个小包测试数据 直接写tou
import socket
import time
# IP = "39.96.0.224"
IP = "127.0.0.1"
PORT = 7887
ADDRESS = ( IP, PORT ) # 绑定地址
INFO_HEADER = "111355AA020000250000004F000107"
DATA_HEADER = "11 13 55 AA 02 00 10 25 00 00 00 DA 06 02 07"
END_FRAME = "11 13 55 AA 02 00 FF 25 00 00 00 00 00 00 00"
# 改数据
class DataContent:
# 11 13 55 AA 0200 00 11 00 00 00 4F00 01 07
# id type-num 序号 17 字节数 包号 总
info_frame = "1703140E313518565D1E51D55605CCE812068CDDC347D0239515000000000000000000000064000000000000000000000000000000000000000000F450F450F4500600000300000000000000000000"
sensor_frame =[
# 11 13 55 AA 02 00 10 11 00 00 00 DA 06 02 07
'131123600020000000524844557691E700CB054C054A05A10001\
23A000300700000A07FD062A073707440750074D074C07530759075A07610765076A07720790079107A407C3070709BE0AAC0ABF080F080808270849089E0880094F0BB70F2A1901\
23A000300600005223CD28242B3830E235813BE9401B46675A3181C68D5C7916715D74C676617658742E71B26C9C674363DA5F1C5E0160F164DF66A262F85AD153764E4F4C014D01\
23A00030050000EA4D8C4D314DF44D3C4FC250E453BA5E4486B8C5BCECC4D29A96436BFA56734E4B4A55473446364A845140582A5A9C55A54D91465440303C1B3FDE5783718E7001\
23A00030040000DC565046C140F63D21392E32D12B5B28A226F3253525662366219E20DD1FB11E971DC51C2F1C901B921A6419C21882185818E418181B401DEC1C1D1A3917F71501\
23A00030030000EE153316F516CA176E18B218CA18B118701845186A1730155913E51314153015E1146C1420147D13C11208128C1128113011BF121614DB12F70F3B0EE50D320E01\
23A00030020000FD0D3C0D930C470C0D0CB80B710B1E0BCD0A920A560A1F0A000A140A460A3E0AD0097A096C095F092109B60870085F0834082B081C0808080B08DD07B907950701\
23A000300100007007620755074A0740074C074607460748074A07500758075D0757074F0743075F0756075F0758074F073F07240745076207770756073F072A070907E606D60601\
23A00030000000F3060F07250722072E07200703070B07EC06FF0619073E07590734071807190719070A07E106DD06F006EC061B072F073E073E07220725070B07F806C806DE0601\
23A000300700000A07FD062A073707440750074D074C07530759075A07610765076A07720790079107A407C3070709BE0AAC0ABF080F080808270849089E0880094F0BB70F2A1901\
23A000300600005223CD28242B3830E235813BE9401B46675A3181C68D5C7916715D74C676617658742E71B26C9C674363DA5F1C5E0160F164DF66A262F85AD153764E4F4C014D01\
23A00030050000EA4D8C4D314DF44D3C4FC250E453BA5E4486B8C5BCECC4D29A96436BFA56734E4B4A55473446364A845140582A5A9C55A54D91465440303C1B3FDE5783718E7001\
23A00030040000DC565046C140F63D21392E32D12B5B28A226F3253525662366219E20DD1FB11E971DC51C2F1C901B921A6419C21882185818E418181B401DEC1C1D1A3917F71501\
23A00030030000EE153316F516CA176E18B218CA18B118701845186A1730155913E51314153015E1146C1420147D13C11208128C1128113011BF121614DB12F70F3B0EE50D320E01\
23A00030020000FD0D3C0D930C470C0D0CB80B710B1E0BCD0A920A560A1F0A000A140A460A3E0AD0097A096C095F092109B60870085F0834082B081C0808080B08DD07B907950701\
23A000300100007007620755074A0740074C074607460748074A07500758075D0757074F0743075F0756075F0758074F073F07240745076207770756073F072A070907E606D60601\
23A00030000000F3060F07250722072E07200703070B07EC06FF0619073E07590734071807190719070A07E106DD06F006EC061B072F073E073E07220725070B07F806C806DE0601\
23A000300700000A07FD062A073707440750074D074C07530759075A07610765076A07720790079107A407C3070709BE0AAC0ABF080F080808270849089E0880094F0BB70F2A1901\
23A000300600005223CD28242B3830E235813BE9401B46675A3181C68D5C7916715D74C676617658742E71B26C9C674363DA5F1C5E0160F164DF66A262F85AD153764E4F4C014D01\
23A00030050000EA4D8C4D314DF44D3C4FC250E453BA5E4486B8C5BCECC4D29A96436BFA56734E4B4A55473446364A845140582A5A9C55A54D91465440303C1B3FDE5783718E7001\
23A00030040000DC565046C140F63D21392E32D12B5B28A226F3253525662366219E20DD1FB11E971DC51C2F1C901B921A6419C21882185818E418181B401DEC1C1D1A3917F71501\
23A00030030000EE153316F516CA176E18B218CA18B118701845186A1730155913E51314153015E1146C1420147D13C11208128C1128113011BF121614DB12F70F3B0EE50D320E01\
23A00030020000FD0D3C0D930C470C0D0CB80B710B1E0BCD0A920A560A1F0A000A140A460A3E0AD0097A096C095F092109B60870085F0834082B081C0808080B08DD07B907950701\
23A000300100007007620755074A0740074C074607460748074A07500758075D0757074F0743075F0756075F0758074F073F07240745076207770756073F072A070907E606D60601\
23A00030000000F3060F07250722072E07200703070B07EC06FF0619073E07590734071807190719070A07E106DD06F006EC061B072F073E073E07220725070B07F806C806DE0601',
]
end_frame = "11 13 55 AA 02 00 FF 25 00 00 00 00 00 00 00" #0xFF
class MyBuf:
'''一次测量三个传感器数据打一个包'''
def __init__(self,) -> None:
self.__buf = b''
self.__head = {}
self.__begin_sign = b'\x23'
self.__end_sign = b'\x0D'
self.send_head = b'\x11\x13\x55\xAA'
self.send_id = 2 # 2byte 地位在前
self.send_type = 0 # info 0 data 1 pic 2
self.send_num = 0 # 一次测量最多十六次平均
self.send_con = 0 # 总的测量序号 4byte ,16次平均算几次?每次算一个
self.send_size = 0 #字节数 2byte
self.send_packet_con = 0
self.send_packet_all = 0
self.repeat = 0 # 依据这个拆包
self.groups = 0
self.state = 0
def readFile2Buf(self, fpath) -> None:
with open(fpath,"rb") as f:
self.__buf = f.read()
pass
pass
def read_buf(self, size: int) -> bytes:
if size > self.__buf.__len__():
return b''
ret = self.__buf[0:size]
self.__buf = self.__buf[size:]
return ret
def write_buf(self, buf: bytes) -> None:
len = buf.__len__()
# logging.info(f'Received ID:{id} Size:{len}')
self.__buf = self.__buf+buf
def get_buf_size(self) -> int:
return self.__buf.__len__()
def back_bytes(self, buf: bytes) -> None:
self.__buf = buf+self.__buf
def reset_head(self) -> None:
self.__head = {}
def reset_buf(self) -> None:
self.__buf = b''
def pack_info_send(self,):
length = 26
ret = self.read_buf(length)
id = self.send_id.to_bytes(2, byteorder='little')
self.send_type = 0
type_num = (self.send_type *16 + self.send_num).to_bytes(1,byteorder='little' ) # ??
con = self.send_con.to_bytes(4, byteorder='little')
size = length.to_bytes(2, byteorder='little')
packet_con = self.send_packet_con.to_bytes(1,byteorder='little' )
packet_all = self.send_packet_all.to_bytes(1,byteorder='little' )
ret = self.send_head + id + type_num + con + size + packet_con + packet_all +ret
self.send_packet_con += 1
# 取重复次数和测量间隔
return ret
def pack_handheld_send(self,):
length = 26+576+576+576
ret = self.read_buf(length)
id = self.send_id.to_bytes(2, byteorder='little')
self.send_type = 1
type_num = self.send_type << 4 + self.send_num # ??
con = self.send_con.to_bytes(4, byteorder='little')
size = length.to_bytes(2, byteorder='little')
packet_con = self.send_packet_con.to_bytes(1,byteorder='little' )
packet_all = self.send_packet_all.to_bytes(1,byteorder='little' )
ret = self.send_head + id + type_num + con + size + packet_con + packet_all +ret
self.send_con+=1 # 总测量序号+1
self.send_num += 1 # 是平均中的第几次
self.send_packet_con += 1
return ret
pass
def pack_end_send(self,):
ret = b'\x11\x13\x55\xAA\x02\x00\xFF\x11\x00\x00\x00\x00\x00\x00\x00'
return ret
pass
class TcpHandheld(object):
def __init__(self ):
super(TcpHandheld, self).__init__()
self.flag = True
self.delay_time = 15
self.recv_times = 10
self.head = b''
self.socket = socket.socket( socket.AF_INET, socket.SOCK_STREAM )
self.socket.setblocking(True)
def connect(self,):
print(f"== connect to server : {ADDRESS}")
self.socket.connect( ADDRESS )
def disconnect(self,):
self.socket.close( )
def rcv(self,):
recv_data = self.socket.recv( 1024 )
if recv_data == self.head:
self.flag =True
time.sleep( 0.01 )
def send_rcv(self,buf):
self.socket.send( buf )
self.flag = False
self.head = buf[:15]
# 当前接收到数据就不阻塞了,后加数据不对继续阻塞
try:
data = self.socket.recv(1024)
if data != b'':
print("not empty")
print(data)
if data==self.head:
print(f"recv.... {data}" )
self.flag = True
self.head = b''
time.sleep( 0.1 )
except BlockingIOError as e:
print(f" 阻塞 中断 ....")
except KeyboardInterrupt as e:
print(f" == ctrl +c , exit")
def send_info(self,):
buf = bytes.fromhex (INFO_HEADER+ DataContent.info_frame)
self.flag = False
self.head = buf[:15]
print(f"head ....{self.head.hex()}")
self.send_rcv(buf)
def send_data(self,):
buf = b''
buf = bytes.fromhex (DATA_HEADER+ DataContent.sensor_frame[0])
self.flag = False
self.head = buf[:15]
print(f"head ....{self.head.hex()}")
self.send_rcv(buf)
# for i in range( len( DataContent.sensor_frame) ):
# print( "sensorframe..." )
# buf = bytes.fromhex ( DataContent.sensor_frame[i] )
# self.flag = False
# self.head = buf[:15]
# print(f"head ....{self.head.hex()} ")
# self.send_rcv(buf)
def send_end_frame(self,):
buf = bytes.fromhex (DataContent.end_frame)
self.flag = False
self.head = buf[:15]
print(f"head ....{self.head.hex()}")
self.send_rcv(buf)
if __name__ == '__main__':
# print( bytes.fromhex (DataContent.info_frame) )
th = TcpHandheld()
th.connect()
try :
while True:
th.send_info()
th.send_data()
th.send_end_frame()
time.sleep( 10)
except KeyboardInterrupt as e:
print(e)
th.disconnect( )