#!/usr/bin/env python
# coding:utf-8
"""TCP receiver for AWRAMS measurement frames.

Picture frames are optional (the camera may be broken, or a sensor may deliver
no data), so the data is handled frame by frame: every received frame is saved
as soon as it is complete, and once the last frame of a measurement arrives the
measurement folder is moved to a date-based directory and processed.
"""
import socket
import socketserver
from socketserver import TCPServer, ThreadingMixIn
import threading
# import datetime
import time
# import os
import struct

from tools.mypath import MyDir
from tools.mylogger import log
from pathlib import Path, PurePath
from myconfig import DATA_DIR, DeviceType
from awrams import AWRAMS, InfoFrame
from awrams import AwramsHandle

IP = ""
PORT = 7887
ADDRESS = (IP, PORT)  # address the server binds to

DATA_FRAME_HEAD = b'\x11\x13\x55\xaa'
DATA_FRAME_TAIL = b'\xff\xd9'
PIC_BEGIN_BYTES = b'\xff\xd8'

# Number of header bytes that follow the 4-byte DATA_FRAME_HEAD marker.
HEAD_FIELDS_SIZE = 11
# NOTE(review): the original struct format string is NOT recoverable from the
# text this module was reconstructed from (the span starting at '<' was
# stripped).  The value below is a PLACEHOLDER that merely satisfies what the
# remaining code shows: little-endian, six fields, 11 bytes in total, with the
# second field being a single byte that packs type (high nibble) and num (low
# nibble).  TODO: confirm against the device protocol before deploying.
HEAD_FIELDS_FORMAT = '<HBHIBB'

# Connection timeout in seconds.
TIMEOUT_SECOND = 8 * 3600

# Pool of client addresses with a live connection.
conn_pool = []


class MyTCPServer(TCPServer):
    """TCPServer that carries shared processing objects for its handlers.

    Request handlers read ``cfg``, ``retrieve`` and ``awrams_handle`` from
    ``self.server``.
    """

    def __init__(self, server_address, RequestHandlerClass, bind_and_activate=True,
                 cfg=None, retrieve=None, awrams_handle: dict = None):
        self.cfg = cfg
        self.retrieve = retrieve
        self.awrams_handle = awrams_handle
        # Forward the caller's choice instead of always activating the socket.
        TCPServer.__init__(self, server_address, RequestHandlerClass,
                           bind_and_activate=bind_and_activate)


class MyThreadingTCPServer(ThreadingMixIn, MyTCPServer):
    """MyTCPServer that serves each connection in its own thread."""
    pass


class MyException(Exception):
    """Module-specific exception carrying a human-readable ``message``."""

    def __init__(self, message="自定义异常"):
        # Initialise the base class so str(exc) and logging show the message.
        super().__init__(message)
        self.message = message


class illumination_sensor:
    """Per-connection receive buffer and frame decoder.

    A frame is the 4-byte marker ``DATA_FRAME_HEAD``, ``HEAD_FIELDS_SIZE``
    bytes of header fields, and a payload of ``size`` bytes.  A decoded frame
    is returned as a dict with the keys:

    * ``id``          device id
    * ``type``        frame type (information / sensor / picture)
    * ``num``         index of the optical-sensor measurement
    * ``con``         overall measurement sequence number
    * ``size``        payload size in bytes
    * ``packet_con``  index of this frame
    * ``packet_all``  total number of frames
    * ``head``        raw header bytes (marker + header fields)
    * ``payload``     raw payload bytes
    """

    def __init__(self, socket: socket.socket) -> None:
        self.__buf = b''
        self.__head = {}  # header of the frame whose payload is still pending
        self.state = 0
        self.socket = socket
        self.id = 0
        self.timeout_base = int(time.time())

    def set_id(self, id) -> None:
        """Store the id of the device this buffer belongs to."""
        self.id = id

    def read_buf(self, size: int) -> bytes:
        """Remove and return the first ``size`` buffered bytes.

        Returns ``b''`` without consuming anything when fewer than ``size``
        bytes are buffered.
        """
        if size > len(self.__buf):
            return b''
        ret = self.__buf[0:size]
        self.__buf = self.__buf[size:]
        return ret

    def write_buf(self, buf: bytes) -> None:
        """Append newly received bytes to the buffer."""
        self.__buf = self.__buf + buf

    def get_buf_size(self) -> int:
        """Return the number of buffered bytes."""
        return len(self.__buf)

    def back_bytes(self, buf: bytes) -> None:
        """Push ``buf`` back onto the front of the buffer."""
        self.__buf = buf + self.__buf

    def reset_head(self) -> None:
        """Forget any partially decoded frame header."""
        self.__head = {}

    def reset_data(self) -> None:
        """Reset the per-measurement frame containers.

        ``self.data`` is (re)created here because ``__init__`` does not
        define it; the previous in-place update raised AttributeError.
        """
        self.data = {
            'info_frame': {},
            'sensor_frame': {},
            'pic_frame': {},
        }

    def _parse_head(self) -> None:
        """Locate the next frame marker and parse the header after it.

        Bytes in front of the marker are discarded.  ``self.__head`` stays
        empty when no complete header is buffered yet.
        """
        marker_len = len(DATA_FRAME_HEAD)
        min_len = marker_len + HEAD_FIELDS_SIZE
        if self.get_buf_size() < min_len:
            return
        pos = self.__buf.find(DATA_FRAME_HEAD)
        if pos < 0:
            # No marker: keep only the tail that could be the beginning of a
            # marker split across two recv() calls.
            self.__buf = self.__buf[-(marker_len - 1):]
            return
        self.__buf = self.__buf[pos:]
        if self.get_buf_size() < min_len:
            return
        self.read_buf(marker_len)
        head = self.read_buf(HEAD_FIELDS_SIZE)
        # NOTE(review): format and the id/type assignments are reconstructed;
        # see the comment on HEAD_FIELDS_FORMAT.
        head_list = struct.unpack(HEAD_FIELDS_FORMAT, head)
        self.__head['id'] = head_list[0]
        self.__head['type'] = head_list[1] >> 4
        self.__head['num'] = head_list[1] & 0x0f
        self.__head['con'] = head_list[2]
        self.__head['size'] = head_list[3]
        self.__head['packet_con'] = head_list[4]
        self.__head['packet_all'] = head_list[5]
        self.__head['head'] = DATA_FRAME_HEAD + head

    def decode(self) -> dict:
        """Try to decode one complete frame from the buffer.

        Returns:
            The frame dict described in the class docstring, or ``{}`` when
            no complete frame is buffered yet.  A parsed header is kept
            between calls until its payload has fully arrived.

        Raises:
            struct.error: if the header bytes do not match
                ``HEAD_FIELDS_FORMAT``.
        """
        if not self.__head:
            self._parse_head()
        if not self.__head:
            return {}
        size = self.__head['size']
        # Compare lengths rather than testing the payload against b'', so a
        # frame with an empty payload cannot stall the decoder forever.
        if self.get_buf_size() < size:
            return {}
        self.__head['payload'] = self.read_buf(size)
        data = self.__head.copy()
        self.__head = {}
        self.id = data['id']
        return data


class DealData:
    """Save received frames and hand complete measurements to AwramsHandle."""

    def __init__(self) -> None:
        self.device_id = None
        # NOTE: attribute name is misspelled ("devie"); kept unchanged because
        # code outside this module may read it.
        self.devie_type = DeviceType.AWRAMS.name
        self.base_folder = Path()
        self.old_folder = Path()
        self.new_folder = Path()
        self.info_frame = InfoFrame()
        self.measure_con = None

    def deal(self, pth: Path, cfg: dict, retrieve, awrams_handle) -> None:
        """Move a finished measurement to its dated folder and process it.

        Args:
            pth: folder the frames were saved to; its parent directory name
                is parsed as the integer device id.
            cfg: unused here.
            retrieve: unused here.
            awrams_handle: container indexed by device id whose items provide
                ``aw.data.set_info_frame`` and
                ``read_one_folder_awrams_online``.

        Raises:
            AssertionError: if ``info.bin`` is missing from ``pth``.
            Exception: if the emptied source folder cannot be removed.
        """
        log.info(f" 接收到数据开始处理数据,  path : {pth} ")
        self.old_folder = pth
        self.base_folder = self.old_folder.parent
        deviceid = int(self.base_folder.parts[-1])  # handles are indexed by number
        handle: AwramsHandle = awrams_handle[deviceid]

        info_byte = self.read_bin(self.old_folder.joinpath("info.bin"))
        handle.aw.data.set_info_frame(info_byte)
        info = handle.aw.data.info_frame

        # Destination: <base>/20<year>/<month>/<day>/<measurement folder>
        self.new_folder = self.base_folder.joinpath(
            "20" + str(info.year),
            str(info.month),
            str(info.day),
            self.old_folder.parts[-1],
        )
        self.new_folder.mkdir(parents=True, exist_ok=True)
        self.transferToNewFolder()
        handle.read_one_folder_awrams_online(self.new_folder)
        self.deleteOlderFolder()

    def setOlderFolder(self, pth: Path):
        """Set the folder that currently holds the received frames."""
        self.old_folder = pth

    def getBaseFolder(self, grade=1):
        """Set ``base_folder`` to the ``grade``-th ancestor of ``old_folder``.

        Raises:
            AssertionError: if the ancestor cannot be determined or does not
                exist on disk.
        """
        self.base_folder = self.old_folder
        for _ in range(grade):
            try:
                self.base_folder = self.base_folder.parent
            except Exception as exc:
                # Raised explicitly so the check survives ``python -O``.
                raise AssertionError(f">>>> Fail to Get Base Folder") from exc
        if not self.base_folder.exists():
            raise AssertionError(f">>>> Fail to Get Base Folder")

    def deleteOlderFolder(self, ):
        """Remove ``old_folder`` if it exists; it must already be empty.

        Raises:
            Exception: wrapping the OSError raised by ``rmdir``.
        """
        try:
            if self.old_folder.exists():
                self.old_folder.rmdir()
        except OSError as e:
            raise Exception(e) from e

    def transferToNewFolder(self, mode=0):
        """Move every ``*.bin`` file from ``old_folder`` to ``new_folder``.

        ``pic.bin`` is renamed to ``pic.jpg``; other files keep their name.
        ``mode`` is unused.
        """
        # Materialise the listing so moving files cannot disturb iteration.
        for bf in list(self.old_folder.glob('*.bin')):
            target_name = "pic.jpg" if bf.name == "pic.bin" else bf.name
            bf.replace(self.new_folder.joinpath(target_name))

    def read_bin(self, fpath: Path):
        """Return the bytes of ``fpath``.

        Raises:
            AssertionError: if ``fpath`` does not exist.
        """
        if not fpath.exists():
            # Raised explicitly so the check survives ``python -O``.
            raise AssertionError(f">>>> not find {fpath} ")
        with open(fpath, 'rb') as file:
            return file.read()

    def save(self, data: dict) -> None:
        """Write one frame's payload to ``DATA_DIR/<id>/<con>/``.

        The folder is keyed by device id and (unique) measurement number; it
        is renamed to a date-based path later, when the measurement is
        processed.  Type 0 is written to ``info.bin``, type 1 to
        ``<num>.bin`` and type 2 to ``pic.bin``; other types are ignored.
        """
        log.info(f"save .....to first dir {str(data['con'])} - type:{data['type']} - num {data['num']}")
        saveDir = DATA_DIR.joinpath(str(data['id']), str(data['con']))
        # exist_ok avoids a race between handler threads creating the folder.
        saveDir.mkdir(parents=True, exist_ok=True)

        frame_type = data['type']
        if frame_type == 0:
            fname = "info.bin"
        elif frame_type == 1:
            fname = str(data['num']) + ".bin"
        elif frame_type == 2:
            fname = "pic.bin"
        else:
            return
        log.debug(f" {data['type']} - {data['num']}")
        saveDir.joinpath(fname).write_bytes(data['payload'])


class MyServer(socketserver.BaseRequestHandler):
    """Request handler: receive, acknowledge, save and process frames."""

    def setup(self) -> None:
        """Copy the shared objects from the server and create the decoder."""
        log.debug(f"retrieve {self.server.retrieve}", __name__, "", "")
        self.cfg = self.server.cfg
        self.retrieve = self.server.retrieve
        self.awrams_handle = self.server.awrams_handle
        self.sk: socket.socket = self.request
        self.sensor = illumination_sensor(self.request)
        self.dealData = DealData()
        self.begin_time = time.time()

    def handle(self) -> None:
        """Receive data until the peer closes, an error occurs or time is up."""
        # NOTE(review): the second argument is the literal string '__name__',
        # not the module name -- left unchanged; confirm what `log` expects.
        log.info(f'... connected from {self.client_address}', '__name__')
        while True:
            try:
                data_byte = self.sk.recv(1000)
            except ConnectionResetError:
                log.warning(f"recv ConnectionResetError,  client {self.client_address} had close .....")
                break
            except OSError as e:
                # Stop instead of continuing with an unbound/stale buffer.
                log.warning(" sk.recv(1000) exception .....")
                log.warning(e)
                break
            log.debug(len(data_byte))

            # After the client closes the connection recv() keeps returning
            # b'', so leave the loop; finish() closes the socket.
            if data_byte == b'':
                log.info(f" b'' is received , maybe client {self.client_address} had close. ")
                break

            self.sensor.write_buf(data_byte)
            if not self._process_buffered_frames():
                break

            if time.time() - self.begin_time > TIMEOUT_SECOND:
                log.info(f'Received data timeout')
                break

    def _process_buffered_frames(self) -> bool:
        """Decode and process every complete frame currently buffered.

        Returns False when decoding failed and the connection should be
        dropped, True otherwise.
        """
        while True:
            try:
                data_ = self.sensor.decode()
            except MyException as e:
                log.warning(e)
                return False
            except Exception as e:
                log.warning("decode data 出现异常 ")
                log.warning(e)
                return False
            if data_ == {}:
                return True

            device_id = data_['id']
            data_code = data_['type']
            log.info(f'Received From ID:{device_id} DATA CODE:{data_code}')
            # Save the frame that was just received.
            self.dealData.save(data_)
            head = data_['head']
            log.info(f'Head :{head}')
            # Echo the header back as acknowledgement; sendall() keeps
            # writing until the whole header has been sent.
            self.sk.sendall(data_['head'])
            # On the last frame, move the folder to its dated path and process.
            if data_["packet_con"] == data_["packet_all"]:
                log.info(f'最后一帧数据已经收到并保存')
                # id is the device id, con the measurement sequence number.
                pth = DATA_DIR.joinpath(str(data_['id']), str(data_['con']))
                self.dealData.deal(pth, self.cfg, self.retrieve, self.awrams_handle)

    def finish(self) -> None:
        """Close the socket and drop the client from ``conn_pool``.

        Runs after handle() returns, including when handle() raised.
        """
        if self.sk:
            self.sk.close()
        if self.client_address in conn_pool:
            conn_pool.remove(self.client_address)
        log.info(f"finish(): stop one socket from client {self.client_address} ", __name__, "MyServer", "finish")


if __name__ == '__main__':
    # MyServer.setup() reads cfg/retrieve/awrams_handle from the server, which
    # only MyTCPServer subclasses provide.
    # NOTE(review): all three default to None here; processing the last frame
    # of a measurement needs a real awrams_handle to be passed in.
    server_ = MyThreadingTCPServer(ADDRESS, MyServer)
    log.info('listening...........')
    try:
        server_.serve_forever()
    except KeyboardInterrupt:
        log.info(" Ctrl+C 退出主程序 ")
    except Exception as e:
        log.info(" 系统异常, 如下: \n ")
        log.info(e)
    finally:
        server_.server_close()
    # If the process exits unexpectedly, serve_forever can be run in a thread:
    # threading.Thread(target=server_.serve_forever).start()