包含服务器端 ,桌面端两个分支
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
awrams/receive.py

526 lines
18 KiB

#!/usr/bin/env python
# coding:utf-8
'''
# 因为图片帧不是必须,必须按帧处理数据。 可能摄像头坏掉,没有传感器数据??
# 按帧处理数据,必须在每帧接收完毕判断,数据是否完整, 完整则进一步处理 !!!!!!!!!!!!
# 时间作为目录
'''
import socket
import socketserver
from socketserver import TCPServer,ThreadingMixIn
import threading
# import datetime
import time
# import os
import struct
from tools.mypath import MyDir
from tools.mylogger import log
from pathlib import Path,PurePath
from myconfig import DATA_DIR,DeviceType
from awrams import AWRAMS
# Host part of the listening address; an empty string lets the OS accept
# connections on every local interface.
IP = ""
# TCP port the server listens on.
PORT = 7887
ADDRESS = (IP, PORT) # address the server binds to
# LOGGING_LEVEL = logging.DEBUG
# LOGGING_LEVEL = logging.INFO
# LOGGING_LEVEL = logging.WARNING
# Magic bytes that open every frame (the same four bytes the decoder scans for).
DATA_FRAME_HEAD = b'\x11\x13\x55\xaa'
# NOTE(review): FF D9 / FF D8 are the JPEG end/start-of-image markers; these two
# constants are not referenced in the visible part of this file — confirm usage.
DATA_FRAME_TAIL = b'\xff\xd9'
PIC_BEGIN_BYTES = b'\xff\xd8'
# Connection timeout in seconds (8 hours).
TIMEOUT_SECOND = 8 * 3600
# Connection pool: the request handler stores the address of every connected
# client here and removes it again when the connection finishes.
conn_pool = []
# save_path = Path
class MyTCPServer(TCPServer):
    """TCP server that carries shared settings for its request handlers.

    Request handlers reach the extra attributes through ``self.server``.

    Args:
        server_address: ``(host, port)`` tuple to listen on.
        RequestHandlerClass: handler class instantiated per connection.
        bind_and_activate: when true (default) the socket is bound and put
            into listening state by the constructor; when false the caller
            has to call ``server_bind()`` / ``server_activate()`` itself.
        cfg: system configuration made available to handlers as
            ``server.cfg``.
        retrieve: retrieval object/settings made available to handlers as
            ``server.retrieve``.
    """

    def __init__(self, server_address, RequestHandlerClass, bind_and_activate=True, cfg=None, retrieve=None):
        self.cfg = cfg
        self.retrieve = retrieve
        # Forward the caller's choice instead of always forcing True.
        TCPServer.__init__(
            self,
            server_address,
            RequestHandlerClass,
            bind_and_activate=bind_and_activate,
        )
class MyThreadingTCPServer(ThreadingMixIn, MyTCPServer):
    """MyTCPServer combined with ThreadingMixIn: each request is handled in its own thread."""
class MyException(Exception):
    """Application-specific error raised by the data-handling code.

    Args:
        message: human-readable description; also stored on ``self.message``.
    """

    def __init__(self, message="自定义异常"):
        # Initialise the base class so that str(exc) and exc.args carry the
        # message (needed for meaningful log output and tracebacks).
        super().__init__(message)
        self.message = message
class illumination_sensor:
    """Receive buffer plus incremental frame decoder for one client connection.

    Raw bytes are appended with ``write_buf``; ``decode`` extracts at most one
    complete frame per call.  Frame layout (little-endian, 15-byte header):

        4 bytes  magic 11 13 55 AA
        H        device id
        B        high nibble: frame type (info / sensor / picture),
                 low nibble: measurement number of the optical sensor
        I        overall measurement sequence number
        H        payload size in bytes
        B        index of this packet
        B        total number of packets
        followed by ``size`` bytes of payload
    """

    _FRAME_MAGIC = b'\x11\x13\x55\xaa'
    _HEADER_FORMAT = '<HBIHBB'   # the 11 header bytes that follow the magic
    _HEADER_SIZE = 15            # magic (4) + packed header fields (11)

    def __init__(self, socket: socket.socket) -> None:
        self.__buf = b''
        # Header of the frame currently being assembled; empty while the
        # decoder is still searching for the next magic sequence.
        self.__head = {}
        self.state = 0
        self.socket = socket
        self.id = 0
        # Created here so that reset_data() has something to reset.
        self.data = {'info_frame': {}, 'sensor_frame': {}, 'pic_frame': {}}
        self.timeout_base = int(time.time())

    def set_id(self, id) -> None:
        """Remember the id of the device this buffer belongs to."""
        self.id = id

    def read_buf(self, size: int) -> bytes:
        """Remove and return the first ``size`` buffered bytes.

        Returns ``b''`` without consuming anything when fewer than ``size``
        bytes are buffered.
        """
        if size > len(self.__buf):
            return b''
        chunk = self.__buf[:size]
        self.__buf = self.__buf[size:]
        return chunk

    def write_buf(self, buf: bytes) -> None:
        """Append freshly received bytes to the buffer."""
        self.__buf += buf

    def get_buf_size(self) -> int:
        """Return the number of buffered bytes."""
        return len(self.__buf)

    def back_bytes(self, buf: bytes) -> None:
        """Push ``buf`` back to the front of the buffer."""
        self.__buf = buf + self.__buf

    def reset_head(self) -> None:
        """Forget a partially decoded frame header."""
        self.__head = {}

    def reset_data(self) -> None:
        """Reset the per-frame-type data containers to empty dicts."""
        self.data = {'info_frame': {}, 'sensor_frame': {}, 'pic_frame': {}}

    def decode(self) -> dict:
        """Try to extract one complete frame from the buffer.

        Returns:
            A dict with keys ``id``, ``type``, ``num``, ``con``, ``size``,
            ``packet_con``, ``packet_all``, ``head`` (the raw 15 header bytes)
            and ``payload``; or ``{}`` when no complete frame is buffered yet.
            A frame whose declared payload size is 0 is returned with an empty
            payload.
        """
        if not self.__head:
            start = self.__buf.find(self._FRAME_MAGIC)
            if start < 0:
                # No magic in sight: only a possible partial magic at the very
                # end can still matter, everything before it is junk.
                keep = len(self._FRAME_MAGIC) - 1
                if len(self.__buf) > keep:
                    self.__buf = self.__buf[-keep:]
                return {}
            if start > 0:
                self.__buf = self.__buf[start:]   # drop junk before the magic
            if len(self.__buf) < self._HEADER_SIZE:
                return {}                         # header not complete yet
            raw_head = self.read_buf(self._HEADER_SIZE)
            device_id, type_num, con, size, packet_con, packet_all = struct.unpack_from(
                self._HEADER_FORMAT, raw_head, len(self._FRAME_MAGIC)
            )
            self.__head = {
                'id': device_id,
                'type': type_num >> 4,
                'num': type_num & 0x0f,
                'con': con,
                'size': size,
                'packet_con': packet_con,
                'packet_all': packet_all,
                'head': raw_head,
            }
        size = self.__head['size']
        # Compare lengths explicitly: an empty read must not be mistaken for
        # "payload still missing" when the declared size is 0.
        if self.get_buf_size() < size:
            return {}
        frame = self.__head
        frame['payload'] = self.read_buf(size)
        self.__head = {}
        self.id = frame['id']
        return frame
class DealData:
    """Store received frames on disk and process complete measurements.

    ``save`` writes each frame into ``DATA_DIR/<device id>/<measurement no>``;
    ``deal`` hands a complete measurement to the AWRAMS class;
    ``deal2`` and the static helpers implement a file-based alternative that
    re-files a measurement under a date-based directory.
    """

    # Binary layout of the info frame and the names of the unpacked values,
    # in order.
    # NOTE(review): the format string appears to describe one value fewer than
    # there are names listed here — confirm against the device protocol.
    _INFO_FORMAT = "<BBBBBBHHHHHHIIHHHHHBBBHHIfffIIIII"
    _INFO_FIELDS = (
        "year", "month", "day", "hour", "minute", "second",
        "Roll", "Pitch", "Yaw", "Hx", "Hy", "Hz",
        "lon", "lat",
        "satelite_num", "PDOP", "HDOP", "VDOP", "Temperature",
        "Humidity", "Battery", "ErrorCode",
        "Azimuth", "RunAngle",
        "MeasuyeGroupNum",
        "Tiltx", "Tilty", "Depth",
        "Sensor1", "Sensor2", "Sensor3",
        "Measure_Num", "Measure_Interval", "Measure_Repeat",
    )
    # Values stored as upper-case hexadecimal text instead of integers.
    _HEX_FIELDS = frozenset(("Sensor1", "Sensor2", "Sensor3"))

    def __init__(self) -> None:
        self.device_id = None
        self.devie_type = DeviceType.AWRAMS.name
        self.measure_id = None
        self.cfg = {}
        self.awrams = None

    def deal(self, id: int, con: int, cfg:dict , retrieve) -> None:
        """Process one complete measurement through the AWRAMS class.

        Args:
            id: device id taken from the frame header.
            con: measurement sequence number taken from the frame header.
            cfg: system configuration; only adopted while ``self.cfg`` is
                still an empty dict.
            retrieve: passed through to ``AWRAMS.setRetrieve``.
        """
        log.info(f" 接收到数据开始处理数据 device_id {id} ")
        self.device_id = id
        self.measure_id = con
        if self.cfg == {}:
            self.cfg = cfg
        if self.awrams is None:
            self.awrams = AWRAMS()  # created once, reused for later measurements
        self.awrams.setSyscfg(self.cfg)
        self.awrams.setRetrieve(retrieve)
        self.awrams.setDeviceID(self.device_id)
        self.awrams.setMeasureID(self.measure_id)
        path_tuple = ("data", str(id), str(con))
        self.awrams.setOldFolder(path_tuple)
        self.awrams.getInfoDict()
        self.awrams.transferFromOldFolder()
        self.awrams.deleteOldFolder()
        self.awrams.dealOneMeasurement_Online()
        log.info(f" Complete Dealing one group.")

    @staticmethod
    def deal2(id: int, con: int) -> None:
        """Move one stored measurement into a date-based directory and check it.

        Reads ``info.bin`` from ``DATA_DIR/<id>/<con>``, writes the decoded
        info as a text file into ``DATA_DIR/20<year>/<month>/<day>/<con>``,
        moves every ``*.bin`` file there (``pic.bin`` becomes ``pic.jpg``),
        removes the emptied source directory and validates the spectrum files.

        NOTE(review): the destination path does not contain the device id,
        although the original comment mentions "id/year/month/day/con" —
        confirm which is intended.

        Raises:
            MyException: info frame missing or undecodable, source directory
                not empty after the move, or spectrum files incomplete.
        """
        log.info(f" 修改目录为时间格式,并处理 device_id: {id}, measure_con: {con} ")
        src_dir = DATA_DIR.joinpath(str(id), str(con))
        info_frame = DealData.read_bin(src_dir.joinpath('info.bin'))
        if info_frame is None:
            raise MyException("处理数据时,信息帧读取数据为空")
        info_dict: dict = DealData.decode_info(info_frame)
        if info_dict == {}:
            raise MyException("从文件读取信息帧后,解析信息帧遇到异常")

        dst_dir = DATA_DIR.joinpath(
            "20" + str(info_dict["year"]),
            str(info_dict["month"]),
            str(info_dict["day"]),
            str(con),
        )
        log.info(f"dst_dir: {dst_dir}")
        dst_dir.mkdir(parents=True, exist_ok=True)

        # Save the decoded info next to the data (file name layout kept as is).
        info_name = (
            f"info_20{info_dict['year']}{info_dict['month']}_{info_dict['day']}_"
            f"{info_dict['hour']}_{info_dict['minute']}_{info_dict['second']}_.txt"
        )
        DealData.save_dict_to_file(info_dict, dst_dir.joinpath(info_name))

        # Materialise the listing so the log shows the files, not a generator.
        bin_file_list = list(src_dir.glob('*.bin'))
        log.info(f" Src File path: {bin_file_list}")
        for bin_file in bin_file_list:
            target_name = "pic.jpg" if bin_file.name == "pic.bin" else bin_file.name
            bin_file.replace(dst_dir.joinpath(target_name))

        # The old directory may only be removed when nothing is left in it.
        if next(src_dir.glob('*.*'), None) is not None:
            log.warning(" 旧的文件夹还存在文件,不能删除,请仔细检测! ")
            raise MyException("旧的文件夹还存在文件,不能删除,请仔细检测!")
        src_dir.rmdir()

        if not DealData.check_spectrum_data(dst_dir):
            log.warning(" 目录光谱数据有异常 ")
            raise MyException(f"{dst_dir} 目录光谱数据有异常")
        DealData.calibrate_spectrum_data(dst_dir)
        DealData.retrieve_data(dst_dir)

    @staticmethod
    def read_bin(fpath:Path):
        """Return the bytes of ``fpath``, or ``None`` if the file does not exist."""
        if not fpath.exists():
            log.warning(f"not find file: {fpath} ")
            return None
        return fpath.read_bytes()

    @staticmethod
    def decode_info(info: bytes) -> dict:
        """Decode a binary info frame into a dict.

        Returns:
            ``{}`` when ``info`` does not match the expected binary layout;
            otherwise a dict whose first key is ``"time"`` (formatted
            ``20<year>-<month>-<day> <hour>:<minute>:<second>``) followed by
            the named fields in frame order.  Sensor1..3 are upper-case hex
            strings.  Names for which the layout supplies no value are left
            out instead of raising.
        """
        try:
            values = struct.unpack(DealData._INFO_FORMAT, info)
        except (struct.error, TypeError):
            log.info( "decode info 有误, 收到info frame 字节有误" )
            return {}
        year, month, day, hour, minute, second = values[:6]
        decoded = {"time": f"20{year}-{month}-{day} {hour}:{minute}:{second}"}
        for field, value in zip(DealData._INFO_FIELDS, values):
            decoded[field] = format(value, "X") if field in DealData._HEX_FIELDS else value
        return decoded

    @staticmethod
    def save_dict_to_file(info_dict:dict, fpath:Path) ->None:
        """Write ``info_dict`` to ``fpath`` as ``key : value`` lines.

        The file is created or truncated.  Nothing is returned, as the
        signature declares.
        """
        text = "".join(f"{key} : {value}\n" for key, value in info_dict.items())
        with open(fpath, "w", encoding="utf-8") as f:
            f.write(text)

    @staticmethod
    def check_spectrum_data(dst_dir:Path):
        """Check that ``dst_dir`` holds spectrum files 0.bin, 1.bin, ... without gaps.

        Returns:
            True when at least one spectrum file exists and the numeric file
            names form the sequence 0..n-1; False (after logging a warning)
            otherwise.
        """
        indices = []
        for spectrum_file in dst_dir.glob('*[0-9].bin'):
            stem = spectrum_file.stem
            # isdecimal() guarantees that int() below cannot fail.
            if not stem.isdecimal():
                log.warning( f" {dst_dir} 目录光谱文件的文件名 {stem} 不为数字,type:{type(stem)},请检查异常" )
                return False
            indices.append(int(stem))
        if not indices:
            log.warning( f" {dst_dir} 目录没有发现光谱文件,请检查异常" )
            return False
        # After sorting, position and file number must coincide.
        indices.sort()
        for expected, found in enumerate(indices):
            if found != expected:
                log.warning( f" {dst_dir} 目录,序号{expected}光谱文件的文件名没有发现,请检查异常" )
                return False
        return True

    @staticmethod
    def calibrate_spectrum_data(dst_dir):
        """Placeholder: obtain calibrated data using the ini/back files (only logs)."""
        log.info("calibrate_spectrum_data.... ")

    @staticmethod
    def retrieve_data(dst_dir):
        """Placeholder: retrieve remote-sensing reflectance etc. (only logs)."""
        log.info(" retrieve_data.... ")

    def save(self,data: dict) -> None:
        """Write the payload of one decoded frame to disk.

        Target directory is ``DATA_DIR/<id>/<con>``.  Frame type 0 is stored
        as ``info.bin``, type 1 as ``<num>.bin`` and type 2 as ``pic.bin``;
        other types are ignored.

        Args:
            data: frame dict with keys ``id``, ``con``, ``type``, ``num`` and
                ``payload``.
        """
        log.info(f"save .....to first dir {str(data['con'])} - type:{data['type']} - num {data['num']}")
        # Path is <sensor id>/<measurement no>; it is renamed by time later.
        saveDir = DATA_DIR.joinpath(str(data['id']), str(data['con']))
        saveDir.mkdir(parents=True, exist_ok=True)
        frame_type = data['type']
        if frame_type == 0:
            file_name = "info.bin"
        elif frame_type == 1:
            file_name = str(data['num']) + ".bin"
        elif frame_type == 2:
            file_name = "pic.bin"
        else:
            return
        log.debug( f" {data['type']} - {data['num']}")
        saveDir.joinpath(file_name).write_bytes(data['payload'])
# class MyServer(socketserver.BaseRequestHandler):
class MyServer(socketserver.BaseRequestHandler):
    """Request handler: receives frames from one client, stores and acknowledges them."""

    def setup(self) -> None:
        """Prepare per-connection state and register the client in ``conn_pool``."""
        # Tolerate servers that do not carry cfg / retrieve attributes.
        self.cfg = getattr(self.server, "cfg", None)
        self.retrieve = getattr(self.server, "retrieve", None)
        log.debug(f"retrieve {self.retrieve}",__name__, "", "" )
        self.sk: socket.socket = self.request
        self.sensor = illumination_sensor(self.request)
        self.dealData = DealData()
        self.begin_time = time.time()
        conn_pool.append(self.client_address)

    def handle(self) -> None:
        """Receive loop; returns when the client disconnects, on error or on timeout."""
        log.info('... connected from {}'.format(self.client_address),'__name__')
        while True:
            try:
                data_byte = self.sk.recv(1000)
            except ConnectionResetError:
                log.warning(
                    f"recv ConnectionResetError, client {self.client_address} had close .....")
                break
            except OSError as e:
                # Any other socket failure ends the connection as well; going
                # on would work with data that was never received.
                log.warning(" sk.recv(1000) exception .....")
                log.warning(e)
                break
            log.debug(len(data_byte))
            # After the client closes the connection recv() keeps returning
            # b'', so leave the loop and let finish() clean up.
            if data_byte == b'':
                log.info(
                    f" b'' is received , maybe client {self.client_address} had close. ")
                break
            self.sensor.write_buf(data_byte)
            if not self._process_buffered_frames():
                break
            if time.time() - self.begin_time > TIMEOUT_SECOND:
                log.info(f'Received data timeout')
                break

    def _process_buffered_frames(self) -> bool:
        """Decode, store and acknowledge every complete frame in the buffer.

        Returns False when decoding failed and the connection should be
        dropped, True otherwise.
        """
        while True:
            try:
                data_ = self.sensor.decode()
            except MyException as e:
                log.warning(e)
                return False
            except Exception as e:
                log.warning("decode data 出现异常 ")
                log.warning(e)
                return False
            if data_ == {}:
                return True  # nothing complete left in the buffer
            device_id = data_['id']
            data_code = data_['type']
            log.info(f'Received From ID:{device_id} DATA CODE:{data_code}')
            # Store the frame that was just received.
            self.dealData.save(data_)
            head = data_['head']
            log.info(f'Head :{head}')
            # Echo the header back to the client as acknowledgement;
            # sendall() makes sure the whole header is written.
            self.sk.sendall(head)
            # Last packet of the measurement: process the whole group.
            if data_["packet_con"] == data_["packet_all"]:
                log.info(f'最后一帧数据已经收到并保存')
                # id is the device id, con the measurement sequence number.
                self.dealData.deal(data_['id'], data_["con"], self.cfg, self.retrieve)

    def finish(self) -> None:
        """Close the socket and remove the client from ``conn_pool``.

        Runs after handle() returns or raises.
        """
        if self.sk:
            self.sk.close()
        if self.client_address in conn_pool:
            conn_pool.remove(self.client_address)
        log.info(
            f"finish(): stop one socket from client {self.client_address} ")
if __name__ == '__main__':
    # The request handler reads ``cfg`` and ``retrieve`` from the server
    # object, so use the project server class that defines them (both default
    # to None) rather than the plain socketserver.ThreadingTCPServer.
    server_ = MyThreadingTCPServer(ADDRESS, MyServer)
    log.info('listening...........')
    try:
        server_.serve_forever()
    except KeyboardInterrupt:
        log.info(" Ctrl+C 退出主程序 ")
    except Exception as e:
        log.info(" 系统异常, 如下: \n ")
        log.info(e)
    finally:
        # Release the listening socket on every exit path.
        server_.server_close()
    # If the process exits unexpectedly, serve_forever can be run in a thread:
    # threading.Thread(target=server_.serve_forever).start()