#! python3 # -*- encoding: utf-8 -*- ''' @File : AWRAMS.py @Time : 2023/02/23 14:24:45 @Author : Jim @ Yiwin @Version : 1.0 @Contact : jim@yi-win.com @Desp : 将接收的文件转到新文件夹,并解析数据 调用方法: a = AWRAMS(cfg,retrieve) src_dir = DATA_DIR.joinpath( str(id), str(con) ) a.readOneFolder(src_dir) ''' import struct,time from pathlib import Path, PurePath from typing import List from enum import Enum import numpy as np from myexception import MyException from mylogger import log from myconfig import ROWFACTOR,SAVE_EXT_NAME,TOKEN,SEPARATOR,NEWLINE,DATA_DIR,OUTPUT_DIR from myconfig import RamsesAWRAMS,RamsesSURFACE,RamsesPROFILE,DeviceType from Ramses import Ramses from saveList2File import SaveList2File from mytime import MyTime # class RamsesFunc(Enum): # Lsky = 1 # Lwater = 2 # Esky = 3 class HandHeldBuf: def __init__(self,) -> None: self.__buf = b'' self.__head = {} self.__begin_sign = b'\x23' self.__end_sign = b'\x0D' self.data_ip = b'' self.measure_group = { "Lsky": b'', "Esky": b'', "Lwater": b'', } self.one_group_data= b'' self.state = 0 def readFile2Buf(self, fpath) -> None: with open(fpath,"rb") as f: self.__buf = f.read() pass pass def read_buf(self, size: int) -> bytes: if size > self.__buf.__len__(): return b'' ret = self.__buf[0:size] self.__buf = self.__buf[size:] return ret def write_buf(self, buf: bytes) -> None: len = buf.__len__() # logging.info(f'Received ID:{id} Size:{len}') self.__buf = self.__buf+buf def get_buf_size(self) -> int: return self.__buf.__len__() def back_bytes(self, buf: bytes) -> None: self.__buf = buf+self.__buf def reset_head(self) -> None: self.__head = {} def reset_buf(self) -> None: self.__buf = b'' def getResult(self) -> str: return self.res def resetMeasureGroup(self) -> None: self.measure_group['Lsky'] = b'' self.measure_group['Esky'] = b'' self.measure_group['Lwater'] = b'' def getMeasureGroup(self) -> dict: return self.measure_group def decode_handheld(self) -> bool: '''以26个字节开始,一般26个00 ,然后23... 07.... ''' if self.get_buf_size() < 1754: self.__buf = b'' return False self.data_ip = self.read_buf(26) Lsky = self.read_buf(576) Esky = self.read_buf(576) Lwater = self.read_buf(576) self.measure_group['Lsky'] = self.deal_576_to_512(Lsky) self.measure_group['Esky'] = self.deal_576_to_512(Esky) self.measure_group['Lwater'] = self.deal_576_to_512(Lwater) if self.measure_group['Lsky'] == b'' \ or self.measure_group['Esky'] == b'' \ or self.measure_group['Lwater'] == b'' : return False return True def decode_one_group_handheld(self) -> bool: '''以26个字节开始,一般26个00 ,然后23... 07.... ''' if self.get_buf_size() < 1754: self.__buf = b'' return False self.data_ip = self.read_buf(26) self.one_group_data = self.read_buf(1728) return True def deal_576_to_512(self,data:bytes) -> bytes: ''' 576字节校验,拆分成字典 23... 07.... ,然后254*2''' ret = {} index = 72 for i in range(8): temp = data[i*index : i*index+index] # print( temp.hex()) if temp[0] != 35 and temp[0]<8 and temp>0 : return b'' pass ret.update( { temp[4]: temp} ) if len(ret) != 8: return b'' ret_byte = ret[7][7:71]+ ret[6][7:71]+ ret[5][7:71]+ ret[4][7:71] \ + ret[3][7:71]+ ret[2][7:71]+ ret[1][7:71]+ ret[0][7:71] return ret_byte def decode(self) -> str: ret = '' temp_buf = b'' token = ";" if TOKEN: token = TOKEN if self.state == 0: while self.get_buf_size() >= 1: if self.read_buf(1) != self.__end_sign: continue self.state = 1 break if self.state == 1: while self.get_buf_size() >= 1: buf = self.read_buf(1) if buf != self.__end_sign: temp_buf += buf if buf == self.__end_sign: ret = temp_buf[0:20].decode( 'utf-8').strip(" ") + token + temp_buf[-9:-1].decode('utf-8').strip(" ") log.info(f"decode : {ret}") temp_buf = b'' self.back_bytes(temp_buf) # 写回临时buf到 self.res = ret return ret pass class AWRAMS(object): def __init__(self, ): """ @description : 手持数据初始化 """ self.device_type = DeviceType.AWRAMS.name self.device_enum = None self.device_id = {} self.syscfg = {} self.retrieve ={} self.error_result=[] self.base_purepath = PurePath() self.base_path = Path() self.current_path = Path() self.data_path = DATA_DIR self.output_path = OUTPUT_DIR self.old_folder = Path() self.new_folder = Path() self.info_path_fname:Path = Path() self.sensor_path_fname:Path = Path() self.filelist = [] # 包含了全部路径信息 self.intensity_before_avg = None self.intensity_after_avg = {} # 最终结果{lsky: esky: lwater: } self.intensity_after_interpo = {} # 最终结果{lsky: esky: lwater: } self.one_group_result = {} # 三个传感器数据,平均后的 self.wavelength = np.array([]) self.real_wavelength = {} # 最终结果{lsky: esky: lwater: } self.current_filepath = '' # 不含后缀 self.current_measure_time = None # 当前测量时间 self.measurement_interval = 0 self.measurement_repeat = 1 self.sl2f = SaveList2File() self.my_time = MyTime() self.hhb = HandHeldBuf() self.ramses = Ramses() self.info_dict = {} self.res = {} # 最终结果{lsky: esky: lwater: Lw: Rs:} # 方法 self.setDeviceEnum() pass def setDeviceEnum(self, ): if self.device_type == DeviceType.SURFACE.name: self.device_enum = RamsesSURFACE if self.device_type == DeviceType.AWRAMS.name: self.device_enum = RamsesAWRAMS if self.device_type == DeviceType.PROFILE.name: self.device_enum = RamsesPROFILE def setOldFolder(self, tuple_path:tuple): self.old_folder = self.base_path.joinpath( *tuple_path ) def getInfoDict(self, ): info_path = self.old_folder.joinpath("info.bin") hexbin = self.read_bin(info_path) self.info_dict = self.decode_info(hexbin) self.new_folder = self.getNewFolderFromOldFolder() def transferFromOldFolder(self, ): log.info( f"transferFromOldFolder: ", __name__ ) self.getNewFolderFromOldFolder() bin_files = self.old_folder.glob('*.bin') for bf in bin_files: if bf.name == "pic.bin": bf.replace( self.new_folder.joinpath( "pic.jpg" ) ) else: bf.replace( self.new_folder.joinpath(bf.name) ) pass def deleteOldFolder(self, ): log.info( f"deleteOldFolder: ", __name__ ) self.sl2f.setBasePath(self.old_folder) self.sl2f.deleteDir() self.old_folder = DATA_DIR def setHandHeldBuf( self ) : buf = b'' bin_files = self.new_folder.glob( '*.bin' ) tmp_dict = { } for bf in bin_files: if bf.stem.isdigit(): tmp_dict.update({ int(bf.stem): bf } ) len_bin = len(tmp_dict) if len_bin==0 : self.hhb.write_buf(b'') return False bin_key = tmp_dict.keys() for i in range(len_bin) : if i not in bin_key: log.warning(f" could find {i}.bin file") else: tmp_buf = self.read_bin(tmp_dict[i]) buf += tmp_buf self.hhb.write_buf(buf) pass def getNewFolderFromOldFolder( self, ) -> Path: # 服务器上转移后的模式 tmp_folder = self.old_folder.parent.parent path_tuple = ("20"+str(self.info_dict['year']) ,str(self.info_dict['month']) ,str(self.info_dict['day']) ,self.old_folder.parts[-1] ) self.new_folder =tmp_folder.joinpath(*path_tuple) self.sl2f.setBasePath(self.new_folder) self.sl2f.newPathIfNot() pass def getSensorPathFromInfoPath( self, info_path:Path, mode:int=0) -> Path: # 服务器上转移后的模式 if mode == 1: sensor_path = self.base_path.joinpath( info_path.parent, "sensor.bin" ) # SD卡上未拷贝的模式 sensor_path = self.base_path.joinpath( "data", info_path.parts[1][:4] + "_" + info_path.parts[1][-2:], info_path.parts[2], "sensor", info_path.name ) return sensor_path pass def getOutputPathFromSensorPath(self,sensor_path:Path, mode:int=0) -> Path: output_path = self.base_path.joinpath( "data", "output", sensor_path.parts[1][:4] + "_" + sensor_path.parts[1][-2:] \ + "_" +sensor_path.parts[2] + "_" +sensor_path.name ) return output_path pass def getCurrentMeasureTimeFromPath(self,fpath:Path, mode:int=0) -> str: ret = fpath.parts[1][:4]+"-"+fpath.parts[1][-2:]+"-"+fpath.parts[2]+" " hour = fpath.name[0:2] minute = fpath.name[3:5] second = fpath.name[6:8] ret = ret + hour + ":" + minute + ":" + second return ret pass def getDataFileList(self, mode:int = 0): ''' 获得成对的info sensor 文件 [目录名,文件名,年月日,时间, measure_id] ''' # ret = [] fs = None if mode == 1: fs = self.data_path.glob( "*/*/*/*/info.bin" ) fs = self.data_path.glob( "*/*/info/*" ) for f in fs: error_file = {} if f.stat().st_size==0: error_file.update( {"path": f } ) error_file.update( {"error":"info file size is zero"} ) self.error_result.append(error_file) continue # self.info_path_fname = f sensor_path = self.getSensorPathFromInfoPath(f) # sensor_path = Path(sensor_purepath) # sensor 文件不存在 if not sensor_path.exists(): error_file.update( {"path":f} ) error_file.update( {"error":"cannot find the sensor file "} ) self.error_result.append(error_file) continue # sensor文件大小为0 if sensor_path.stat().st_size==0: error_file.update( {"path":f} ) error_file.update( {"error":"the file size of the sensor is zero' "} ) self.error_result.append(error_file) continue self.setFilelist(f,sensor_path,mode=mode) pass def setFilelist(self, info_path:Path, sensor_path:Path, mode:int = 0): temp = {} temp.update( {"info_path" : info_path } ) temp.update( {"name" : info_path.name} ) temp.update( {"parent" : info_path.parent} ) if mode==1: # 服务器转移后目录 temp.update( {"year" : info_path.parts[-5] } ) temp.update( {"month" :info_path.parts[-4] } ) temp.update( {"day" :info_path.parts[-3] } ) temp.update( {"year" : info_path.parts[1][:4] } ) temp.update( {"month" :info_path.parts[1][-2:] } ) temp.update( {"day" :info_path.parts[2] } ) # temp.update( {"hour" :info_path.parts[-1][:2] } ) # temp.update( {"minute" :info_path.parts[-1][3:5] } ) # temp.update( {"second" :info_path.parts[-1][6:8] } ) temp.update( { "sensor_path" :sensor_path } ) self.filelist.append(temp) def getDataFileList_SD(self, ): ''' 获得成对的info sensor 文件 [目录名,文件名,年月日,时间] ''' # ret = [] fs = self.data_path.glob( "*/*/info/*" ) for f in fs: error_file = {} if f.stat().st_size==0: error_file.update( {"path": f.name } ) error_file.update( {"error":"info file size is zero"} ) self.error_result.append(error_file) continue self.info_path_fname = f sensor_path = self.getSensorPathFromInfoPath(f) # sensor_path = Path(sensor_purepath) # sensor 文件不存在 if not sensor_path.exists(): error_file.update( {"path":f} ) error_file.update( {"error":"cannot find the sensor file "} ) self.error_result.append(error_file) continue # sensor文件大小为0 if sensor_path.stat().st_size==0: error_file.update( {"path":f} ) error_file.update( {"error":"the file size of the sensor is zero' "} ) self.error_result.append(error_file) continue temp = {} temp.update( {"info_path" : f} ) temp.update( {"name" :f.name} ) temp.update( {"parent" :f.parent} ) temp.update( {"year" :f.parts[1][:4] } ) temp.update( {"month" :f.parts[1][-2:] } ) temp.update( {"day" :f.parts[2] } ) temp.update( {"hour" :f.parts[-1][:2] } ) temp.update( {"minute" :f.parts[-1][3:5] } ) temp.update( {"second" :f.parts[-1][6:8] } ) temp.update( { "sensor_path" :sensor_path } ) self.filelist.append(temp) # print(self.filelist) pass def dealAllMeasurements(self, ): log.info(f"dealAllMeasurements: 所有测量文件", __name__, "dealAllMeasurements", ) if len(self.filelist)<1: pass # 前面已经考虑各种文件错误 for df in self.filelist: # 处理信息帧 self.info_dict = {} self.info_path_fname:Path = df["info_path"] self.sensor_path_fname:Path = df["sensor_path"] hex_info = self.read_bin( self.info_path_fname ) try: self.info_dict = self.decode_info( hex_info ) except: log.error( f"处理信息文件" + "/" +self.info_path_fname.stem + "出现错误", __name__, "dealAllMeasurements" ) raise MyException( "处理文件"+ self.info_path_fname + "出现错误") pass # self.measurement_interval = self.data_info['Measure_Interval'] # self.measurement_repeat = self.data_info['Measure_Repeat'] try: self.dealOneMeasurement(self.sensor_path_fname ) except Exception as e: log.error( "处理传感器文件" + self.sensor_path_fname.stem + "出现错误",__name__,"dealAllMeasurements") raise MyException( "处理传感器文件" + self.sensor_path_fname.stem + "出现错误" ) pass log.info(f"all is done !! ",__name__,"dealAllMeasurements") return True,self.error_result pass def dealOneMeasurement(self, fpath:Path): '''handheld一次测量包含多组数据''' # 调用handheldbuf 处理,将一组数据提交出来 log.info(f"dealOneMeasurement: 一组测量数据", __name__, "dealOneMeasurement") # if len(self.filelist)<1: # pass # 当前文件名 self.current_filepath = fpath self.current_measure_time = self.getCurrentMeasureTimeFromPath(fpath) self.hhb.readFile2Buf(fpath) # 解析Buf, 对buf分组,获得[{lsky: esky : lwater} .... ] self.decode_senser_buf() self.output_path = self.getOutputPathFromSensorPath( fpath ) self.dealOneGroup() def dealOneMeasurement_online(self, ): '''handheld一次测量包含多组数据''' # 调用handheldbuf 处理,将一组数据提交出来 log.info(f"deal : 一组测量数据", __name__, "dealOneMeasurement_online") # 当前文件名 self.current_filepath = self.new_folder self.current_measure_time = self.info_dict['time'] self.setHandHeldBuf() # 解析Buf, 对buf分组,获得[{lsky: esky : lwater} .... ] self.decode_senser_buf() self.output_path = self.new_folder self.dealOneGroup() def dealOneGroup(self, ): # 分组,并获得平均值, 255个未插值结果 (依据 measurement_interval measurement_repeat) self.getAvg( self.intensity_before_avg ) # 插值 self.real_wavelength = self.getWavelenthDict() self.__do_sensor_dict_interpo() self.__get_Lw_Rs() # 保存数据 time_name = "20" + str(self.info_dict['year']) \ + '_' + str(self.info_dict['month']) \ + '_' + str(self.info_dict['day']) \ + '_' + str(self.info_dict['hour']) \ + '_' + str(self.info_dict['minute']) \ + '_' + str(self.info_dict['second']) save_path_csv = self.new_folder.joinpath(time_name + SAVE_EXT_NAME) self.doSaveCsv(save_path_csv) path_info_txt = self.new_folder.joinpath( time_name + "_info.txt" ) self.save_dict_to_file( self.info_dict, path_info_txt ) def decode_senser_buf(self,) : # 处理Buf,对多组数据取平均 self.intensity_before_avg = [] # res_before_avg = [] self.clearRes() # 清空数据 while True: if not self.hhb.decode_one_group_handheld() : break # 清空数据 res = {} buf = self.hhb.one_group_data[26:] for i in range(1,4,1): temp_buf = buf[7:71] + buf[79:143] + \ buf[151:215] + buf[223:287] + \ buf[295:359] + buf[367:431] + \ buf[439:503] + buf[511:575] # Ramses类 标定处理,设置buf 标定文件 self.ramses.setBuf(temp_buf) func = self.getFuncBySeq(i) cfg = self.getCfgByFunc( func) self.ramses.setCalCfg(cfg) self.ramses.resetItSpectrum() self.ramses.ConvertAndCalibrate() res.update({ func : self.ramses.spectrum }) self.intensity_before_avg.append( res ) pass def getAvg( self, d:list) : log.info(f"getAvg: 平均多组数据", __name__, "getAvg") data = d ret = {} len_result = len(data) if len_result == 0: self.intensity_after_avg ={} return None if len_result == 1: self.intensity_after_avg = data[0] return None ret = data[0] res_dict = self.getRetDict() for k in res_dict.keys(): for i in range(1,len_result,1): data[0][k] = data[0][k] + data[i][k] ret = data[0][k]/len_result self.intensity_after_avg.update( { k : ret } ) log.debug(f"getAvg: {self.intensity_after_avg}", __name__, "getAvg") pass def getRetDict(self,) : ret_dict = { } ret_dict.update( {self.device_enum(1).name:np.array([])} ) ret_dict.update( {self.device_enum(2).name:np.array([])} ) ret_dict.update( {self.device_enum(3).name:np.array([])} ) # self.one_group_result = ret_dict return ret_dict def __do_sensor_dict_interpo(self,) : log.info( f"同步处理多个个插值 ", __name__, "__do_sensor_dict_interpo" ) self.clearRes() for k in self.intensity_after_avg.keys(): tmp = np.interp( self.wavelength, self.real_wavelength[k], self.intensity_after_avg[k] ) self.res.update( { k : tmp } ) def getCfgByDid(self, func:str) : cfg_id:dict = self.syscfg.get(int(self.device_id)) cfg_sensor = cfg_id.get( func) return cfg_sensor pass def getFuncBySeq(self, seq:int) : func = "" if self.device_type == DeviceType.AWRAMS.name: func = RamsesAWRAMS(seq).name if self.device_type == DeviceType.SURFACE.name: func = RamsesAWRAMS(seq).name if self.device_type == DeviceType.PROFILE.name: func = RamsesAWRAMS(seq).name return func def getCfgByFunc(self, func:str) : cfg_id:dict = self.syscfg.get(int(self.device_id)) cfg_sensor = cfg_id.get( func) return cfg_sensor pass def getCfgBySeq(self, seq:int) : func = "" if self.device_type == DeviceType.AWRAMS.name: func = RamsesAWRAMS(seq).name if self.device_type == DeviceType.SURFACE.name: func = RamsesAWRAMS(seq).name if self.device_type == DeviceType.PROFILE.name: func = RamsesAWRAMS(seq).name cfg_id:dict = self.syscfg.get(int(self.device_id)) cfg_sensor = cfg_id.get( func) return cfg_sensor pass def __get_Lw_Rs(self, ) -> bool: Lw = self.res["Lwater"] - ROWFACTOR * self.res["Lsky"] self.res.update({ "Lw" : Lw }) Rs = self.res["Lw"] / self.res["Esky"] self.res.update({ "Rs" : Rs }) pass def doSaveCsv(self, save_path:Path) : header = self.list2header(self.wavelength.tolist(), TOKEN, str(self.device_id)+"_"+str(self.measure_id)) data_str = "" for k,v in self.res.items(): data_str = data_str + NEWLINE + \ self.list2header(self.res[k].tolist(),TOKEN,k) save_path.write_text(header+data_str) def list2header(self, lst:List[float], separator:str, first_column:str=None): ''' list 转为字符,作为文件头,是否有额外第一列''' # temp = separator.join(lst) tmpstr = "" if first_column: tmpstr = tmpstr + first_column for l in lst: tmpstr = tmpstr + separator + str(round(l,8)) return tmpstr def checkAndSaveData(self, ) -> bool: """ check self.Lsky Esky Lwater and Save 处理self.res 的数据 """ log.info(f"checkAndSaveData: {self.output_path.parts}", __name__) self.sl2f.setBasePath(self.output_path) #基路径 self.sl2f.setHeader( self.wavelength.tolist(), TOKEN, "device_id_"+str(self.device_id) ) # print(f"header_str : {self.sl2f.header_str[-1:]}") self.sl2f.newFileIfNot("Lsky"+SAVE_EXT_NAME) if self.sl2f.checkHeader() == -1: log.error(f"请备份文件:{self.sl2f.current_filepath.parent} {self.sl2f.current_filepath.name}, 并删除文件后再试!", __name__) raise MyException( f"请备份文件:{self.sl2f.current_filepath.parent} {self.sl2f.current_filepath.name}, 并删除文件后再试!" ) return False if self.sl2f.checkHeader() == 1: pass if self.sl2f.checkHeader() == 0: self.sl2f.writeHeader() pass # 写入数据content self.sl2f.setContent(self.res["Lsky"].tolist(), TOKEN, self.current_measure_time) self.sl2f.writeContent() self.sl2f.newFileIfNot("Esky"+SAVE_EXT_NAME) # self.sl2f.setHeader( self.wavelength.tolist(), TOKEN, str(self.device_id) ) if self.sl2f.checkHeader() == -1: log.error(f"请备份文件:{self.sl2f.current_filepath.parent} {self.sl2f.current_filepath.name}, 并删除文件后再试!", __name__) raise MyException( f"请备份文件:{self.sl2f.current_filepath.parent} {self.sl2f.current_filepath.name}, 并删除文件后再试!" ) return False if self.sl2f.checkHeader() == 1: pass if self.sl2f.checkHeader() == 0: self.sl2f.writeHeader() pass # 写入数据 self.sl2f.setContent(self.res["Esky"], TOKEN, self.current_measure_time) self.sl2f.writeContent() self.sl2f.newFileIfNot("Lwater"+SAVE_EXT_NAME) # self.sl2f.setHeader( self.wavelength.tolist(), TOKEN, str(self.device_id) ) if self.sl2f.checkHeader() == -1: log.error(f"请备份文件:{self.sl2f.current_filepath.parent} {self.sl2f.current_filepath.name}, 并删除文件后再试!", __name__) raise MyException( f"请备份文件:{self.sl2f.current_filepath.parent} {self.sl2f.current_filepath.name}, 并删除文件后再试!" ) return False if self.sl2f.checkHeader() == 1: pass if self.sl2f.checkHeader() == 0: self.sl2f.writeHeader() pass # 写入数据 self.sl2f.setContent( self.res["Lwater"], TOKEN, self.current_measure_time ) self.sl2f.writeContent() # def clearLskyEskyLwater(self, ) -> None: # self.res = { } def clearRes(self, ) -> None: self.res = { } def checkLskyEskyLwater(self, ) -> bool: if not self.res["Lsky"] : return False if not self.res["Esky"] : return False if not self.res["Lwater"] : return False return True def getLwRsAndSave(self, ) -> bool: """ 并计算Lw Rs并保存 """ self.Lw = self.res["Lwater"] - ROWFACTOR * self.res["Lsky"] self.Rs = self.Lw / self.res["Esky"] self.sl2f.setBasePath(self.output_path) #基路径 # 保存 self.sl2f.newFileIfNot("Lw"+SAVE_EXT_NAME) # self.sl2f.setHeader( self.wavelength.tolist(), TOKEN, str(self.device_id) ) if self.sl2f.checkHeader() == -1: raise MyException( f"请备份文件:{self.sl2f.current_filepath.cwd()}, 并删除文件后再试!" ) return False if self.sl2f.checkHeader() == 1: pass if self.sl2f.checkHeader() == 0: self.sl2f.writeHeader() pass # 写入数据 self.sl2f.setContent(self.Lw, self.current_measure_time) self.sl2f.writeContent() self.sl2f.newFileIfNot("Rs"+SAVE_EXT_NAME) # self.sl2f.setHeader( self.wavelength.tolist(), TOKEN, str(self.device_id) ) if self.sl2f.checkHeader() == -1: MyException( f"请备份文件:{self.sl2f.current_filepath.cwd()}, 并删除文件后再试!" ) return False if self.sl2f.checkHeader() == 1: pass if self.sl2f.checkHeader() == 0: self.sl2f.writeHeader() pass # 写入数据 self.sl2f.setContent( self.Rs, self.current_measure_time ) self.sl2f.writeContent() return True def read_bin(self,fpath: Path): log.debug(f" readbin: ", __name__, "", "" ) ret = None if not fpath.exists() : log.info(f"not find file: {fpath} ") return ret with open(fpath, 'rb') as file: ret = file.read() return ret log.debug(f" readbin: {ret} ", __name__, "", "" ) return ret pass def decode_info( self,info: bytes ) -> dict: ret = {} # 剖面型加了1311+24个字节共26个字节 # 保留字节有所变化,改为序列号,每个序列号两个字节 try: temp = struct.unpack(" None: temp_str = "" for key, value in info_dict.items(): temp_str = temp_str + key + " : " + str(value) + "\n" with open(fpath, "w+") as f: f.write(temp_str) def save_error_to_file(self, errlist: list, fpath: Path) -> None: temp_str = "" if len(errlist) <1: return None for errdict in errlist: temp_str = temp_str + errdict["path"] +" : "+ errdict["error"] + "\n" pass with open(fpath, "w+") as f: f.write(temp_str) return None pass def getWavelenthDict( self, ) -> dict: ret_dict = self.getRetDict() cfg_id:dict = self.syscfg.get( int(self.device_id)) for k in ret_dict.keys(): tmp = self.getWavelength( cfg_id.get(k) ) ret_dict.update({ k : tmp }) return ret_dict def getWavelength(self,ramsesdict:dict) -> np.ndarray: ret = [] for i in range(1,256): tmp = float(ramsesdict['c0s']) + float(ramsesdict['c1s'])*i \ + float(ramsesdict['c2s'])*i*i + float(ramsesdict['c3s'])*i*i*i ret.append(tmp) pass return np.array(ret) def setSyscfg(self, syscfg:dict): self.syscfg = syscfg def setRetrieve(self, retrieve:dict): log.debug( f"setRetrieve : {retrieve}......", __name__ ) self.retrieve = retrieve self.setNewWavelength() # self.wavelength:np.ndarray = np.arange ( self.retrieve['beginWL'], self.retrieve['endWL'], self.retrieve['interval'] ) def setNewWavelength(self, ): self.wavelength:np.ndarray = np.arange ( self.retrieve['beginWL'], self.retrieve['endWL'], self.retrieve['interval'] ) def setDeviceID(self, did:int): self.device_id = did def setDeviceType(self, device_type): self.device_type = device_type def setMeasureID(self, mid:int): self.measure_id = mid def getErrorInfoDict(self, ): return self.error_result def printResult(self,): log.info( f"***** : Print Lsky Esky Lwater Lw Rs......", __name__ ) print(self.res[self.device_enum(1)]) print(self.res[self.device_enum(2)]) print(self.res[self.device_enum(3)]) print(self.Lw) print(self.Rs) pass if __name__ == "__main__": cfg ={} retrieve = {} a = AWRAMS(cfg,retrieve) p = Path("data") print(p) # a.readOneFolder(p) # class AWRAMS2(): # """ # @description : # @param : cfg :字典类型, 获得传感器配置及标定文件 # retieve: 反演需要的参数,起始波长间隔 # @Returns : # """ # def __init__(self ) -> None: # self.cfg = {} # self.retrieve = {} # self.device_type = DeviceType.AWRAMS.name # self.device_id = 0 # self.measure_id = 0 # self.base_path = Path() # self.oldpath:Path = None # self.newpath:Path = None # self.beginWL = 350 # self.endWL = 950 # self.interval = 1.0 # self.rowFactor = 0.026 # self.ramses = Ramses() # self.time = "" # self.savefilepath:Path = None # self.sensor_num = 0 # self.Lw = None # self.Rs = None # self.sl2f = SaveList2File() # # self.__init_parameter() # pass # def setBasePath( self, p:Path ): # self.base_path = p # pass # def setRetrieve( self, rtv:dict ): # self.retrieve = rtv # self.beginWL = self.retrieve['beginWL'] # self.endWL = self.retrieve['endWL'] # self.interval = self.retrieve['interval'] # self.rowFactor = self.retrieve['rowFactor'] # self.wavelength:np.ndarray = np.arange ( self.beginWL, self.endWL, self.interval ) # pass # def setCfg(self,cfg:dict): # self.cfg = cfg # pass # def setOldFolder(self, tuple_path: tuple ): # self.oldpath = self.sl2f.getPathFromBaseAndTuple(self.base_path, tuple_path) # pass # def setOneFolder(self, fpath: Path ): # self.oldpath = fpath # pass # def getNewPathFromOldPath(self, tuple_path:tuple) -> Path: # tmp =( self.oldpath.parts[-3], self.oldpath.parts[-2]) # # print(f"tmp: {tmp}") # # path_ = self.base_path.joinpath(*tmp) # self.sl2f.setBasePath(self.base_path) # self.sl2f.setBasePath( tmp) # self.sl2f.setBasePath( tuple_path) # return self.sl2f.current_path # # return self.sl2f.getPathFromBaseAndTuple(path_, tuple_path) # pass # def readOneFolder(self ): # """ # 从老文件(数据文件所在目录),转移到新文件夹并处理 # do all get Rs # """ # # 从old文件夹获得id # self.device_id,self.measure_id = self.getDeviceIDMeasureID(self.oldpath) # # 读取并处理info.bin文件, 并获得时间 # self.deal_info_bin() # self.time = self.info_dict['time'] # self.ymdhms = "20"+str(self.info_dict["year"]) + "_" + str(self.info_dict["month"]) \ # +"_"+str(self.info_dict["day"]) + "_" + str(self.info_dict["hour"]) \ # +"_"+str(self.info_dict["minute"]) + "_" + str(self.info_dict["second"]) # # 处理文件路径 self.newpath self.savefilepath self.infopath # self.deal_path_filepath() # # 遍历文件夹 *.bin, 每组文件存到list # bin_data_lst = self.getGroupsFromBin(self.oldpath) # spectrum = self.dealBinDatalist(bin_data_lst,True) # real_wl = self.getWavelenthDict() #字典 {lsky: esky lwater} # spectrum_after_interpo = self.doMultiSenorInterpo(spectrum,real_wl) # self.Lw = spectrum_after_interpo[RamsesAWRAMS.Lwater.name] - self.rowFactor* spectrum_after_interpo[RamsesAWRAMS.Lsky.name] # self.Rs = self.Lw / spectrum_after_interpo[RamsesAWRAMS.Esky.name] # # # 转移文件夹,处理文件 # self.copyBinFromOldToNew() # # for p in self.oldpath.glob('*.bin'): # # if p.name == "pic.bin": # # p.replace(self.newpath.joinpath("pic.jpg")) # # else: # # p.replace(self.newpath.joinpath(p.name)) # # 保存 info.txt # # infopath = self.newpath.joinpath(self.ymdhms+"_info.txt") # self.save_dict_to_file( self.info_dict, self.infopath ) # # 保存结果 # # save_csv_name = self.newpath.joinpath(self.ymdhms+SAVE_EXT_NAME) # self.doSaveCsv(spectrum_after_interpo, self.savefilepath) # log.info(f"Done. device_id:{self.device_id} measure_id {self.measure_id} ") # log.info(f"One measure result was received and dealed.") # def copyBinFromOldToNew(self, ) : # """由cfg计算波长后插值""" # log.info(f"copyBinFromOldToNew: : ", __name__) # for p in self.oldpath.glob('*.bin'): # if p.name == "pic.bin": # p.replace(self.newpath.joinpath("pic.jpg")) # else: # p.replace(self.newpath.joinpath(p.name)) # def dealBinDatalist(self, bin_data_list, ip_included: bool = False) : # """由cfg计算波长后插值""" # log.info(f"dealBinDatalist: : 5 * 3 sensor_data ", __name__) # result = [] # for bdl in bin_data_list: # buf = bdl # buf_len = len(buf) # if buf_len < 576: # return # if ip_included: # buf = buf[26:] # buf_len = buf_len - 26 # buf_len = len(buf) # if buf_len != 1728: # return # # 处理三个传感器组成的hex, 遍历处理 # res = {} # for i in range(1,4,1): # temp_buf = buf[7:71] + buf[79:143] + \ # buf[151:215] + buf[223:287] + \ # buf[295:359] + buf[367:431] + \ # buf[439:503] + buf[511:575] # # Ramses类 标定处理,设置buf 标定文件 # self.ramses.setBuf(temp_buf) # func = self.getFuncBySeq(i) # # log.debug(f"func: : {func} ", __name__,"dealBinDatalist") # cfg = self.getCfgByFunc( func) # self.ramses.setCalCfg(cfg) # self.ramses.resetItSpectrum() # self.ramses.ConvertAndCalibrate() # res.update({ func:self.ramses.spectrum }) # result.append(res) # # print(result[0]) # #取平均 -- 补充更多校验 # ret:dict = self.getRetDict() # len_result = len(result) # for k in ret.keys(): # for i in range(1,len_result,1): # result[0][k] = result[0][k] + result[i][k] # tmp = result[0][k]/len_result # ret.update( { k : tmp}) # return ret # pass # def dealBin(self, buf:bytes, ip_included: bool = False) : # res = {} # len_ = len(self.buf) # if len_ < 576: # return # if ip_included: # self.buf = self.buf[26:] # len_ = len_ - 26 # if len_ % 576 != 0: # return # for i in range(int(len_/576)): # res.update({i+1: {}}) # temp_buf = self.buf[7:71] + self.buf[79:143] + \ # self.buf[151:215] + self.buf[223:287] + \ # self.buf[295:359] + self.buf[367:431] + \ # self.buf[439:503] + self.buf[511:575] # # self.ConvertAndCalibrate( temp_buf ) # # print(len(temp_buf)) # # temp = self.__ConvertBytesToInt(temp_buf) # # res.update( { i+1: temp } ) # # print(res) # pass # def getRetDict(self,) : # ret_dict = { } # if self.device_type == DeviceType.AWRAMS.name: # ret_dict.update( {RamsesAWRAMS(1).name:np.array([])} ) # ret_dict.update( {RamsesAWRAMS(2).name:np.array([])} ) # ret_dict.update( {RamsesAWRAMS(3).name:np.array([])} ) # if self.device_type == DeviceType.SURFACE.name: # ret_dict.update( {RamsesSURFACE(1).name:np.array([])} ) # ret_dict.update( {RamsesSURFACE(2).name:np.array([])} ) # ret_dict.update( {RamsesSURFACE(3).name:np.array([])} ) # if self.device_type == DeviceType.PROFILE.name: # ret_dict.update( {RamsesPROFILE(1).name:np.array([])} ) # ret_dict.update( {RamsesPROFILE(2).name:np.array([])} ) # ret_dict.update( {RamsesPROFILE(3).name:np.array([])} ) # return ret_dict # def getFuncBySeq(self, seq:int) : # func = "" # if self.device_type == DeviceType.AWRAMS.name: # func = RamsesAWRAMS(seq).name # if self.device_type == DeviceType.SURFACE.name: # func = RamsesAWRAMS(seq).name # if self.device_type == DeviceType.PROFILE.name: # func = RamsesAWRAMS(seq).name # return func # def getCfgByFunc(self, func:str) : # cfg_id:dict = self.cfg.get(int(self.device_id)) # cfg_sensor = cfg_id.get( func) # return cfg_sensor # pass # def getCfgBySeq(self, seq:int) : # func = "" # if self.device_type == DeviceType.AWRAMS.name: # func = RamsesAWRAMS(seq).name # if self.device_type == DeviceType.SURFACE.name: # func = RamsesAWRAMS(seq).name # if self.device_type == DeviceType.PROFILE.name: # func = RamsesAWRAMS(seq).name # cfg_id:dict = self.cfg.get(int(self.device_id)) # cfg_sensor = cfg_id.get( func) # return cfg_sensor # pass # def doMultiSenorInterpo(self, data:dict, wl:dict ) : # """同步多个传感器插值""" # ret_dict = self.getRetDict() # x = self.wavelength # for k in ret_dict.keys(): # tmp = np.interp(x,wl[k],data[k]) # ret_dict.update({ k : tmp }) # return ret_dict # def doInterpo(self, data_after_average:list ) : # """由cfg计算波长后插值""" # x = self.wavelength # ret = [] # # for i in range(len(self.cfg)): # # cfg_index = RamsesFunc(i+1).name # # wl = self.getWavelenth( self.cfg[cfg_index] ) # # tmp = np.interp( x, wl, data_after_average[i] ) # # ret.append(tmp) # # pass # return ret # def getWavelenthDict( self, ) -> dict: # ret_dict = self.getRetDict() # cfg_id:dict = self.cfg.get( int(self.device_id)) # for k in ret_dict.keys(): # tmp = self.getWavelenth( cfg_id.get(k) ) # ret_dict.update({ k : tmp }) # return ret_dict # def getWavelenth(self,cfg:dict) -> np.ndarray: # ret = [] # for i in range(1,256): # tmp = float( cfg['c0s'] ) + float( cfg['c1s'] )*i \ # + float( cfg['c2s'] )*i*i + float( cfg['c3s'] )*i*i*i # ret.append(tmp) # pass # return np.array(ret) # def doSaveCsv(self, spectrum_after_interpo:dict ,save_path:Path) : # header = self.list2header(self.wavelength.tolist(), TOKEN, self.device_id+"_"+self.measure_id) # data_str = "" # for k,v in spectrum_after_interpo.items(): # firstcolumn = k # data_str = data_str + NEWLINE + \ # AWRAMS.list2header(spectrum_after_interpo[k].tolist(),TOKEN,k) # data_str = data_str + NEWLINE + AWRAMS.list2header( self.Lw.tolist(), SEPARATOR, "Lw" ) # data_str = data_str + NEWLINE + AWRAMS.list2header( self.Rs.tolist(), SEPARATOR, "Rs" ) # save_path.write_text(header+data_str) # # def getSaveFilePath(self) : # # """由cfg计算波长后插值""" # # return self.newpath.joinpath(self.ymdhms+SAVE_EXT_NAME) # # def doSaveIntensity(self, data_after_interp:list ) : # # """由cfg计算波长后插值""" # # header = AWRAMS.list2header(self.wavelength.tolist(), SEPARATOR, self.device_id+"_"+self.measure_id) # # data_str = "" # # # for i in range(self.sensor_num): # # # cfg_index = RamsesFunc(i+1).name # # # firstcolumn = self.cfg[cfg_index]["FUNC"] # # # data_str = data_str + NEWLINE + \ # # # AWRAMS.list2header(data_after_interp[i].tolist(),SEPARATOR,firstcolumn) # # # pass # # # data_str = data_str + NEWLINE + AWRAMS.list2header( self.Lw.tolist(), SEPARATOR, "Lw" ) # # # data_str = data_str + NEWLINE + AWRAMS.list2header( self.Rs.tolist(), SEPARATOR, "Rs" ) # # # if self.savefilepath.exists(): # # # self.savefilepath.touch() # # # self.savefilepath.write_text(header+data_str) # @staticmethod # def list2header( lst:List[float], separator:str, first_column:str=None): # ''' list 转为字符,作为文件头,是否有额外第一列''' # # temp = separator.join(lst) # tmpstr = "" # # if first_column: # # tmpstr = tmpstr + first_column + separator # # else: # # tmpstr = tmpstr + separator # if first_column: # tmpstr = tmpstr + first_column # for l in lst: # tmpstr = tmpstr + separator + str(round(l,8)) # return tmpstr # def doTotalCalibrate(self, int_data_list:list ) : # ret = [] # # data = int_data_list # # log.debug(f" doTotalCalibrate : {len(data) }", __name__, "", "") # # for i in range(len(data)): # 遍历获得每组数据 # # # i : measure group # # group = data[i] # List # # group_ret = [] # # # log.debug(f" doTotalCalibrate {i} : {data[i]}", __name__, "", "") # # for j in range(len(group)): # 遍历获得三个传感器数据分别标定 # # sensor = group[j] # Tuple # # # sensor_ret = [] # # # log.info(f"sensor {j}:{group[j]}", __name__, "", "") # # cfg_index = RamsesFunc(j+1).name # # # log.error(f" doTotalCalibrate : {self.cfg }", __name__, "", "") # # log.debug(f" doTotalCalibrate : { cfg_index } ", __name__, "", "") # # log.debug(f" doTotalCalibrate : {self.cfg[cfg_index] }", __name__, "", "") # # tmp_intensity = self.CalibrateSpectrumData( self.cfg[cfg_index], group[j] ) # # group_ret.append( tmp_intensity ) # # ret.append(group_ret) # return ret #[ [s1_ndarray,s2_ndarray,s3_ndarray],....... ] # # def CalibrateSpectrumData(self, ramsescal: dict, rawData: tuple): # # t0 = 8092 # # t = rawData[0] # # log.debug(f" CalibrateSpectrumData RawData {rawData}", __name__, "", "") # # raw = np.asarray(rawData[1], dtype=float) # # B0 = np.asarray(ramsescal["b0"], dtype=float) # # B1 = np.asarray(ramsescal["b1"], dtype=float) # # Mn = raw/65535 # # Bn = B0 + B1 * (t/t0) # # Cn = Mn-Bn # # Offset = AWRAMS.getOffset( # # Cn, int(ramsescal['DarkPixelStart']), int(ramsescal['DarkPixelStop'])) # # Dn = Cn-Offset # # En = Dn * (t0/t) # # Fn = En/np.asarray(ramsescal["cal"], dtype=float) # 空气或水中的标定文件 # # return Fn # def getDeviceIDMeasureID(self,fpath: Path): # path_lst = fpath.parts # device_id = path_lst[-2] # measure_id = path_lst[-1] # return (device_id, measure_id) # pass # def getGroupsFromBin(self,fpath: Path): # log.debug(f" getGroupsFromBin data: {fpath}", __name__, "", "" ) # f_list = [x for x in fpath.glob('*.bin')] # group_num = len(f_list) # info.bin pic.bin 假如pic.bin没有存下来?? # data = [] # [ b'', b'' ....] every b'' include three sensor data # for f in f_list: # stem = f.stem # if stem.isdigit(): # data.append(self.read_bin(f)) # # for i in range(group_num): # # path_ = fpath.joinpath(str(i)+".bin") # # if path_ not in f_list: # # log.warning( # # f"not find file {PurePath(path_)} ", __name__, "", "") # # return [] # # data.append(AWRAMS.read_bin(path_)) # log.debug(f" getGroupsFromBin data: {len(data)}", __name__, "", "" ) # return data # pass # def read_bin(self,fpath: Path): # log.debug(f" readbin: ", __name__, "", "" ) # ret = None # if not fpath.exists() : # log.info(f"not find file: {fpath} ") # return ret # with open(fpath, 'rb') as file: # ret = file.read() # return ret # log.debug(f" readbin: {ret} ", __name__, "", "" ) # return ret # pass # def deal_info_bin(self, ): # log.debug(f" deal_old_bin : ", __name__, "", "" ) # infopath = self.oldpath.joinpath("info.bin") # if not infopath.exists(): # raise Exception( f"not find file: {PurePath(infopath)}" ) # pass # info_bin = self.read_bin(infopath) # # log.debug(f"infopath cc {info_bin}", __name__, "", "" ) # self.info_dict = self.decode_info(info_bin) # # log.debug(f" info_dict : {info_dict}", __name__, "", "" ) # self.info_dict.update({"device_id":self.device_id}) # self.info_dict.update({"measure_id":self.measure_id}) # # log.debug(f" {info_dict}", __name__, "", "" ) # def deal_path_filepath(self ): # log.debug(f" deal_path_filepath : ", __name__, "", "" ) # self.sl2f.setBasePath(self.base_path) # path_tuple = ("data", str(self.device_id) # ,"20"+str(self.info_dict["year"]), str(self.info_dict["month"]), str(self.info_dict["day"])) # self.sl2f.setPath(path_tuple) # self.sl2f.newPathIfNot() # self.newpath = self.sl2f.current_path # self.sl2f.newFileIfNot(self.ymdhms+SAVE_EXT_NAME) # self.savefilepath = self.sl2f.current_filepath # self.sl2f.newFileIfNot(self.ymdhms+"_info.txt") # self.infopath = self.sl2f.current_filepath # def new_dirs_if_not(self,fpath:Path, *args) : # ''' * 目录不存在就新建 多级目录 ''' # if not fpath.exists: # raise # if len(args) == 0: # pass # tmpdir = fpath # # newpath = tmpdir.joinpath(args) # for arg in args: # tmpdir = tmpdir.joinpath(arg) # if not tmpdir.exists(): # tmpdir.mkdir() # return tmpdir # def decode_info(self,info: bytes) -> dict: # ret = {} # try: # temp = struct.unpack(" None: # temp_str = "" # for key, value in info_dict.items(): # temp_str = temp_str + key + " : " + str(value) + "\n" # fpath.write_text(temp_str) # # with open(fpath, "w+") as f: # # f.write(temp_str) # pass