You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
526 lines
18 KiB
526 lines
18 KiB
#!/usr/bin/env python
|
|
# coding:utf-8
|
|
|
|
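'''
Picture frames are optional, so data must be processed frame by frame
(the camera may be broken, or sensor data may be missing).
After each frame has been received completely, check whether the data set is
complete; if it is, continue with further processing.
Directories are named by time.
'''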
|
|
import socket
|
|
import socketserver
|
|
from socketserver import TCPServer,ThreadingMixIn
|
|
import threading
|
|
# import datetime
|
|
import time
|
|
# import os
|
|
import struct
|
|
from tools.mypath import MyDir
|
|
from tools.mylogger import log
|
|
from pathlib import Path,PurePath
|
|
from myconfig import DATA_DIR,DeviceType
|
|
from awrams import AWRAMS
|
|
|
|
|
|
# Listening endpoint handed to the TCP server as its bind address.
IP = ""
PORT = 7887
ADDRESS = (IP, PORT)

# Byte markers. DATA_FRAME_HEAD is the same four-byte sequence the frame
# decoder looks for; the other two are not referenced elsewhere in this file.
DATA_FRAME_HEAD = bytes((0x11, 0x13, 0x55, 0xAA))
DATA_FRAME_TAIL = bytes((0xFF, 0xD9))
PIC_BEGIN_BYTES = bytes((0xFF, 0xD8))

# Connection timeout, in seconds.
TIMEOUT_SECOND = 8 * 60 * 60

# Addresses of the clients that currently have an open connection.
conn_pool = list()
|
|
|
|
# save_path = Path
|
|
|
|
class MyTCPServer(TCPServer):
    """TCP server that carries two extra attributes for its request handlers.

    Args:
        server_address: ``(host, port)`` tuple passed to ``TCPServer``.
        RequestHandlerClass: handler class instantiated for every request.
        bind_and_activate: forwarded to ``TCPServer``; when false the socket
            is created but neither bound nor put into listening state.
        cfg: stored as ``self.cfg`` for handlers to read.
        retrieve: stored as ``self.retrieve`` for handlers to read.
    """

    def __init__(self, server_address, RequestHandlerClass,
                 bind_and_activate=True, cfg=None, retrieve=None):
        # Assigned before the base initialiser so the attributes already
        # exist by the time the server can accept a request.
        self.cfg = cfg
        self.retrieve = retrieve
        # Forward the caller's choice; the previous code always passed True.
        TCPServer.__init__(self, server_address, RequestHandlerClass,
                           bind_and_activate=bind_and_activate)
|
|
|
|
class MyThreadingTCPServer(ThreadingMixIn, MyTCPServer):
    """``MyTCPServer`` combined with ``ThreadingMixIn``; adds no members."""
|
|
|
|
class MyException(Exception):
    """Application-specific error that carries a human-readable message.

    Args:
        message: text describing the failure; also available as
            ``self.message``.
    """

    def __init__(self, message="自定义异常"):
        # Hand the message to Exception so str(exc) and exc.args contain it;
        # without this, logging the exception object prints an empty string.
        super().__init__(message)
        self.message = message
|
|
|
|
|
|
class illumination_sensor:
    """Receive buffer and frame decoder for the byte stream of one device.

    Bytes are appended with :meth:`write_buf`; :meth:`decode` then extracts
    at most one complete frame per call.

    Frame layout as parsed by :meth:`decode` (little-endian)::

        offset  size  field
        0       4     marker 11 13 55 aa
        4       2     id
        6       1     high nibble -> type, low nibble -> num
        7       4     con
        11      2     size (payload length in bytes)
        13      1     packet_con
        14      1     packet_all
        15      size  payload
    """

    # Four-byte marker that starts every frame.
    _SYNC = b'\x11\x13\x55\xaa'
    # Fixed fields that follow the marker: id, type/num, con, size,
    # packet_con, packet_all.
    _HEAD_FMT = '<HBIHBB'
    # Marker plus fixed fields: 4 + 11 = 15 bytes.
    _HEAD_LEN = len(_SYNC) + struct.calcsize(_HEAD_FMT)

    def __init__(self, socket: socket.socket) -> None:
        """Create an empty decoder bound to ``socket`` (stored, not used here)."""
        self.__buf = b''    # received bytes that have not been consumed yet
        self.__head = {}    # header of the frame being assembled; {} while
                            # still searching for a marker
        self.state = 0
        self.socket = socket
        self.id = 0         # device id; overwritten by every decoded frame
        self.timeout_base = int(time.time())

    def set_id(self, id) -> None:
        """Set the device id this decoder is associated with."""
        self.id = id

    def read_buf(self, size: int) -> bytes:
        """Remove and return the first ``size`` buffered bytes.

        Returns ``b''`` and leaves the buffer untouched when fewer than
        ``size`` bytes are available.
        """
        if size > len(self.__buf):
            return b''
        chunk = self.__buf[:size]
        self.__buf = self.__buf[size:]
        return chunk

    def write_buf(self, buf: bytes) -> None:
        """Append newly received bytes to the end of the buffer."""
        self.__buf += buf

    def get_buf_size(self) -> int:
        """Return the number of bytes currently buffered."""
        return len(self.__buf)

    def back_bytes(self, buf: bytes) -> None:
        """Push ``buf`` back onto the front of the buffer."""
        self.__buf = buf + self.__buf

    def reset_head(self) -> None:
        """Forget any partially assembled frame header."""
        self.__head = {}

    def reset_data(self) -> None:
        """(Re)create ``self.data`` with three empty per-frame-kind dicts.

        The previous implementation assigned into ``self.data`` without ever
        creating it and therefore raised ``AttributeError``.
        """
        self.data = {
            'info_frame': {},
            'sensor_frame': {},
            'pic_frame': {},
        }

    def _parse_header(self) -> bool:
        """Locate the next marker and parse the header that follows it.

        On success the header fields are stored in ``self.__head``, the 15
        header bytes are consumed, and ``True`` is returned. Otherwise
        ``False`` is returned and bytes that cannot belong to a frame are
        dropped from the buffer.
        """
        buf = self.__buf
        start = buf.find(self._SYNC)
        if start < 0:
            # No marker present. Only the trailing len(marker) - 1 bytes can
            # still turn out to be the beginning of one, so keep just those.
            keep = len(self._SYNC) - 1
            if len(buf) > keep:
                self.__buf = buf[-keep:]
            return False
        if start:
            # Discard everything in front of the marker.
            buf = buf[start:]
            self.__buf = buf
        if len(buf) < self._HEAD_LEN:
            # Marker found but the fixed fields have not fully arrived yet.
            return False

        dev_id, type_num, con, size, packet_con, packet_all = struct.unpack_from(
            self._HEAD_FMT, buf, len(self._SYNC))
        self.__head = {
            'id': dev_id,
            'type': type_num >> 4,
            'num': type_num & 0x0f,
            'con': con,
            'size': size,
            'packet_con': packet_con,
            'packet_all': packet_all,
            'head': buf[:self._HEAD_LEN],
        }
        self.__buf = buf[self._HEAD_LEN:]
        return True

    def decode(self) -> dict:
        """Try to extract one complete frame from the buffer.

        Returns:
            A dict with the keys ``id``, ``type``, ``num``, ``con``, ``size``,
            ``packet_con``, ``packet_all``, ``head`` (the raw 15 header
            bytes) and ``payload``; or ``{}`` when no complete frame is
            buffered yet. A header that has been parsed is remembered, so
            later calls only wait for the rest of its payload.
        """
        if not self.__head and not self._parse_header():
            return {}

        size = self.__head['size']
        # Compare lengths explicitly: read_buf() returns b'' both for "not
        # enough data" and for size == 0, which used to stall the decoder
        # forever on a frame with an empty payload.
        if len(self.__buf) < size:
            return {}

        self.__head['payload'] = self.read_buf(size)
        frame, self.__head = self.__head, {}
        self.id = frame['id']
        return frame
|
|
|
|
class DealData:
    """Store received frames on disk and hand finished measurements to AWRAMS.

    ``save`` writes one decoded frame to ``DATA_DIR/<id>/<con>/``; ``deal``
    drives an ``AWRAMS`` instance over such a directory. The static methods
    are stand-alone helpers for reading and validating the stored files.
    """

    # struct layout used by decode_info(); yields 33 values.
    _INFO_FMT = "<BBBBBBHHHHHHIIHHHHHBBBHHIfffIIIII"

    # Result keys of decode_info(), in the order of the unpacked values.
    # NOTE(review): 34 names are listed but _INFO_FMT yields only 33 values,
    # so "Measure_Repeat" never receives one (the previous code raised
    # IndexError on it for every valid frame). Either the format string lacks
    # a trailing field or the name list has one too many - confirm against
    # the device protocol.
    _INFO_FIELDS = (
        "year", "month", "day", "hour", "minute", "second",
        "Roll", "Pitch", "Yaw", "Hx", "Hy", "Hz",
        "lon", "lat",
        "satelite_num", "PDOP", "HDOP", "VDOP", "Temperature",
        "Humidity", "Battery", "ErrorCode",
        "Azimuth", "RunAngle",
        "MeasuyeGroupNum",
        "Tiltx", "Tilty", "Depth",
        "Sensor1", "Sensor2", "Sensor3",
        "Measure_Num", "Measure_Interval", "Measure_Repeat",
    )

    # Fields reported as upper-case hexadecimal strings instead of ints.
    _HEX_FIELDS = frozenset(("Sensor1", "Sensor2", "Sensor3"))

    def __init__(self) -> None:
        self.device_id = None
        # Attribute name (including its spelling) kept for compatibility.
        self.devie_type = DeviceType.AWRAMS.name
        self.measure_id = None
        self.cfg = {}
        self.awrams = None   # created lazily on the first deal() call

    def deal(self, id: int, con: int, cfg: dict, retrieve) -> None:
        """Process one measurement stored under ``data/<id>/<con>``.

        Args:
            id: device id; recorded as ``self.device_id``.
            con: measurement counter; recorded as ``self.measure_id``.
            cfg: configuration; adopted only while ``self.cfg`` is still
                empty.
            retrieve: object forwarded to ``AWRAMS.setRetrieve``.
        """
        log.info(f" 接收到数据开始处理数据 device_id {id} ")
        self.device_id = id
        self.measure_id = con
        if self.cfg == {}:
            self.cfg = cfg
        if self.awrams is None:
            self.awrams = AWRAMS()

        awrams = self.awrams
        awrams.setSyscfg(self.cfg)
        awrams.setRetrieve(retrieve)
        awrams.setDeviceID(self.device_id)
        awrams.setMeasureID(self.measure_id)
        awrams.setOldFolder(("data", str(id), str(con)))
        awrams.getInfoDict()
        awrams.transferFromOldFolder()
        awrams.deleteOldFolder()
        awrams.dealOneMeasurement_Online()
        log.info(f" Complete Dealing one group.")

    # NOTE: the original file carried a commented-out ``deal2`` static method
    # at this point. It read ``info.bin``, decoded it with ``decode_info``,
    # moved the ``*.bin`` files into a ``20<year>/<month>/<day>/<con>``
    # directory (renaming ``pic.bin`` to ``pic.jpg``), removed the old
    # directory and then called ``check_spectrum_data``,
    # ``calibrate_spectrum_data`` and ``retrieve_data``. The static helpers
    # below are the ones it used.

    @staticmethod
    def read_bin(fpath: Path):
        """Return the content of ``fpath`` as bytes, or ``None`` if missing."""
        if not fpath.exists():
            log.warning(f"not find file: {fpath} ")
            return None
        return fpath.read_bytes()

    @staticmethod
    def decode_info(info: bytes) -> dict:
        """Unpack an info frame payload into a dict.

        Returns:
            ``{}`` when ``info`` does not match the expected layout.
            Otherwise a dict whose first key is ``"time"``
            (``"20YY-MM-DD hh:mm:ss"``) followed by one entry per unpacked
            value; ``Sensor1``..``Sensor3`` are upper-case hex strings.
            ``"Measure_Repeat"`` is absent, see the note on ``_INFO_FIELDS``.
        """
        try:
            values = struct.unpack(DealData._INFO_FMT, info)
        except (struct.error, TypeError):
            # TypeError covers info=None, which read_bin() returns for a
            # missing file.
            log.info("decode info 有误, 收到info frame 字节有误")
            return {}

        year, month, day, hour, minute, second = values[:6]
        ret = {
            "time": f"20{year:02d}-{month:02d}-{day:02d} "
                    f"{hour:02d}:{minute:02d}:{second:02d}",
        }
        # zip() stops at the shorter sequence, so names without a value are
        # left out instead of raising.
        for name, value in zip(DealData._INFO_FIELDS, values):
            ret[name] = f"{value:X}" if name in DealData._HEX_FIELDS else value
        return ret

    @staticmethod
    def save_dict_to_file(info_dict: dict, fpath: Path) -> None:
        """Write ``info_dict`` to ``fpath`` as ``key : value`` lines.

        The previous version additionally read the file back and returned
        its bytes, which contradicted the declared ``None`` return type.
        """
        text = "".join(f"{key} : {value}\n" for key, value in info_dict.items())
        with open(fpath, "w", encoding="utf-8") as f:
            f.write(text)

    @staticmethod
    def check_spectrum_data(dst_dir: Path):
        """Check that ``dst_dir`` holds spectrum files 0.bin, 1.bin, ... in
        an unbroken sequence starting at 0.

        Returns:
            ``True`` when at least one file exists and the numbering has no
            gap; ``False`` (after logging a warning) otherwise. The previous
            implementation returned ``False`` on every path.
        """
        indices = []
        for path in dst_dir.glob('*[0-9].bin'):
            stem = path.stem
            # The glob only guarantees that the last character is a digit.
            if not stem.isdecimal():
                log.warning(f" {dst_dir} 目录光谱文件的文件名 {stem} 不为数字,type:{type(stem)},请检查异常")
                return False
            indices.append(int(stem))

        if not indices:
            log.warning(f" {dst_dir} 目录没有发现光谱文件,请检查异常")
            return False

        # After sorting, position k must hold file number k.
        indices.sort()
        for expected, found in enumerate(indices):
            if found != expected:
                log.warning(f" {dst_dir} 目录,序号{expected}光谱文件的文件名没有发现,请检查异常")
                return False
        return True

    @staticmethod
    def calibrate_spectrum_data(dst_dir):
        """Placeholder for calibration using the ini/back files; only logs."""
        log.info("calibrate_spectrum_data.... ")

    @staticmethod
    def retrieve_data(dst_dir):
        """Placeholder for retrieving remote-sensing reflectance and related
        parameters; only logs."""
        log.info(" retrieve_data.... ")

    def save(self, data: dict) -> None:
        """Write the payload of one decoded frame below ``DATA_DIR``.

        The target directory is ``DATA_DIR/<id>/<con>``. The file name
        depends on ``data['type']``: 0 -> ``info.bin``, 1 -> ``<num>.bin``,
        2 -> ``pic.bin``. Any other type creates the directory but writes
        nothing.
        """
        log.info(f"save .....to first dir {str(data['con'])} - type:{data['type']} - num {data['num']}")
        save_dir = DATA_DIR.joinpath(str(data['id']), str(data['con']))
        # exist_ok avoids a check-then-create race between handler threads.
        save_dir.mkdir(parents=True, exist_ok=True)

        frame_type = data['type']
        if frame_type == 0:
            file_name = "info.bin"
        elif frame_type == 1:
            file_name = str(data['num']) + ".bin"
        elif frame_type == 2:
            file_name = "pic.bin"
        else:
            return

        log.debug(f" {data['type']} - {data['num']}")
        save_dir.joinpath(file_name).write_bytes(data['payload'])
|
|
|
|
|
|
# class MyServer(socketserver.BaseRequestHandler):
|
|
class MyServer(socketserver.BaseRequestHandler):
    """Request handler that receives, stores and acknowledges sensor frames.

    For every connection it buffers the incoming bytes in an
    ``illumination_sensor``, saves each decoded frame through ``DealData``,
    echoes the frame header back to the client, and triggers processing once
    a frame reports ``packet_con == packet_all``.
    """

    def setup(self) -> None:
        """Create the per-connection state and register the client."""
        # getattr keeps the handler usable with a server class that does not
        # define cfg/retrieve (e.g. a plain socketserver.ThreadingTCPServer).
        self.cfg = getattr(self.server, "cfg", None)
        self.retrieve = getattr(self.server, "retrieve", None)
        log.debug(f"retrieve {self.retrieve}", __name__, "", "")
        self.sk: socket.socket = self.request
        self.sensor = illumination_sensor(self.request)
        self.dealData = DealData()
        self.begin_time = time.time()
        conn_pool.append(self.client_address)

    def handle(self) -> None:
        """Receive data until the client disconnects, an error occurs, or
        ``TIMEOUT_SECOND`` seconds have passed since ``setup``."""
        # NOTE(review): the second argument is the literal string
        # '__name__'; presumably the module name was intended - confirm
        # against the tools.mylogger API.
        log.info('... connected from {}'.format(self.client_address), '__name__')
        while True:
            try:
                chunk = self.sk.recv(1000)
            except ConnectionResetError:
                log.warning(
                    f"recv ConnectionResetError, client {self.client_address} had close .....")
                break
            except OSError as e:
                # Previously a bare ``except`` fell through with the receive
                # variable unbound or stale; any socket error now ends the
                # session.
                log.warning(" sk.recv(1000) exception .....")
                log.warning(e)
                break

            log.debug(len(chunk))

            # recv() returns b'' once the peer has closed the connection.
            if not chunk:
                log.info(
                    f" b'' is received , maybe client {self.client_address} had close. ")
                self.sk.close()
                if hasattr(self, "sensor"):
                    del self.sensor  # release the receive buffer
                break

            self.sensor.write_buf(chunk)
            if not self._process_buffered_frames():
                break

            # Only evaluated after recv() returns, so this bounds the session
            # length while data keeps arriving.
            if time.time() - self.begin_time > TIMEOUT_SECOND:
                log.info(f'Received data timeout')
                break

    def _process_buffered_frames(self) -> bool:
        """Decode and handle every complete frame currently buffered.

        Returns ``False`` when decoding raised and the session should end.
        """
        while True:
            try:
                frame = self.sensor.decode()
            except MyException as e:
                log.warning(e)
                return False
            except Exception as e:
                log.warning("decode data 出现异常 ")
                log.warning(e)
                return False
            if frame == {}:
                return True
            self._handle_frame(frame)

    def _handle_frame(self, frame: dict) -> None:
        """Save one frame, acknowledge it, and process the group if it was
        the last frame."""
        device_id = frame['id']
        data_code = frame['type']
        log.info(f'Received From ID:{device_id} DATA CODE:{data_code}')

        self.dealData.save(frame)

        head = frame['head']
        log.info(f'Head :{head}')
        # Echo the header back to the client; sendall() retries until every
        # byte has been written, unlike send().
        self.sk.sendall(head)

        if frame["packet_con"] == frame["packet_all"]:
            log.info(f'最后一帧数据已经收到并保存')
            self.dealData.deal(frame['id'], frame["con"], self.cfg, self.retrieve)

    def finish(self) -> None:
        """Close the socket and unregister the client.

        ``BaseRequestHandler`` calls this after ``handle`` returns or raises.
        """
        if self.sk:
            self.sk.close()

        try:
            conn_pool.remove(self.client_address)
        except ValueError:
            pass  # address was never registered or is already gone

        log.info(
            f"finish(): stop one socket from client {self.client_address} ")
|
|
|
|
|
|
|
|
if __name__ == '__main__':
    # MyServer.setup() reads ``server.cfg`` and ``server.retrieve``; only the
    # MyTCPServer family defines them, so use the threading variant of that
    # class instead of the plain socketserver.ThreadingTCPServer.
    server_ = MyThreadingTCPServer(ADDRESS, MyServer)
    log.info('listening...........')

    try:
        server_.serve_forever()
    except KeyboardInterrupt:
        log.info(" Ctrl+C 退出主程序 ")
    except Exception as e:
        log.info(" 系统异常, 如下: \n ")
        log.info(e)
    finally:
        # Release the listening socket on every exit path, not only Ctrl+C.
        server_.server_close()

    # If the process exits unexpectedly, serve_forever could be run in a
    # thread instead:
    # threading.Thread(target=server_.serve_forever).start()
|
|
|