#! python3
# -*- encoding: utf-8 -*-
"""Buffer decoding, path handling and processing for hand-held (AWRAMS) data.

@File    :   handheld.py
@Time    :   2023/02/24 17:20:59
@Author  :   Jim @ Yiwin
@Version :   1.0
@Contact :   jim@yi-win.com
"""

import time
import locale
import struct
from pathlib import PurePath, Path

import numpy as np

from myconfig import CURRENT_DIR, DATA_DIR, OUTPUT_DIR, NEWLINE, ROWFACTOR, SAVE_EXT_NAME, TOKEN
from myconfig import DeviceType, RamsesSURFACE, RamsesAWRAMS, RamsesPROFILE
from tools.mylogger import log
from tools.myexception import MyException
from tools.mytime import MyTime
from tools.mypath import MyDir
from Ramses import Ramses

CUR_TIME_STR_FMT = "%Y-%m-%d %H:%M:%S"

# Keys of the info dictionary that make up a measurement timestamp.
_TIME_KEYS = ("year", "month", "day", "hour", "minute", "second")


class HandHeldBuf:
    """Byte buffer with helpers that split raw hand-held data into groups.

    A record, as consumed by ``decode_handheld``, is 1754 bytes: a 26-byte
    leading section followed by three 576-byte sensor sections read in the
    order Lsky, Esky, Lwater.  Each sensor section is eight 72-byte frames.
    """

    _HEAD_SIZE = 26
    _FRAME_SIZE = 72
    _FRAMES_PER_SENSOR = 8
    _SENSOR_SIZE = _FRAME_SIZE * _FRAMES_PER_SENSOR    # 576
    _GROUP_SIZE = 3 * _SENSOR_SIZE                     # 1728
    _RECORD_SIZE = _HEAD_SIZE + _GROUP_SIZE            # 1754

    def __init__(self,) -> None:
        self.__buf = b''
        self.__head = {}
        self.__begin_sign = b'\x23'
        self.__end_sign = b'\x0D'
        self.data_ip = b''
        self.measure_group = {
            "Lsky": b'',
            "Esky": b'',
            "Lwater": b'',
        }
        self.one_group_data = b''
        self.state = 0
        # Result of the last decode(); initialised so getResult() is safe
        # to call before decode() has run.
        self.res = ''

    def readFile2Buf(self, fpath) -> None:
        """Replace the buffer with the full binary content of ``fpath``."""
        with open(fpath, "rb") as f:
            self.__buf = f.read()

    def read_buf(self, size: int) -> bytes:
        """Consume and return ``size`` bytes; return ``b''`` if not available."""
        if size > len(self.__buf):
            return b''
        ret = self.__buf[0:size]
        self.__buf = self.__buf[size:]
        return ret

    def write_buf(self, buf: bytes) -> None:
        """Append ``buf`` to the end of the buffer."""
        self.__buf = self.__buf + buf

    def get_buf_size(self) -> int:
        """Return the number of bytes currently buffered."""
        return len(self.__buf)

    def back_bytes(self, buf: bytes) -> None:
        """Push ``buf`` back onto the front of the buffer."""
        self.__buf = buf + self.__buf

    def reset_head(self) -> None:
        self.__head = {}

    def reset_buf(self) -> None:
        self.__buf = b''

    def getResult(self) -> str:
        """Return the string produced by the most recent ``decode`` call."""
        return self.res

    def resetMeasureGroup(self) -> None:
        self.measure_group['Lsky'] = b''
        self.measure_group['Esky'] = b''
        self.measure_group['Lwater'] = b''

    def getMeasureGroup(self) -> dict:
        return self.measure_group

    def decode_handheld(self) -> bool:
        """Consume one 1754-byte record and fill ``measure_group``.

        The record starts with 26 bytes (stored in ``data_ip``), followed by
        three 576-byte sections.  If fewer than 1754 bytes are buffered the
        buffer is cleared.

        Returns:
            True when all three sections pass ``deal_576_to_512``.
        """
        if self.get_buf_size() < self._RECORD_SIZE:
            self.__buf = b''
            return False
        self.data_ip = self.read_buf(self._HEAD_SIZE)
        for name in ("Lsky", "Esky", "Lwater"):
            section = self.read_buf(self._SENSOR_SIZE)
            self.measure_group[name] = self.deal_576_to_512(section)
        return all(self.measure_group[name] != b''
                   for name in ("Lsky", "Esky", "Lwater"))

    def decode_one_group_handheld(self) -> bool:
        """Consume one record, keeping its 1728 data bytes in ``one_group_data``.

        If fewer than 1754 bytes are buffered the buffer is cleared and
        False is returned.
        """
        if self.get_buf_size() < self._RECORD_SIZE:
            self.__buf = b''
            return False
        self.data_ip = self.read_buf(self._HEAD_SIZE)
        self.one_group_data = self.read_buf(self._GROUP_SIZE)
        return True

    def deal_576_to_512(self, data: bytes) -> bytes:
        """Validate a 576-byte section and return its 512 payload bytes.

        The section is cut into eight 72-byte frames.  Byte 4 of a frame is
        used as its index; bytes 7..70 are its payload.  Payloads are joined
        from index 7 down to index 0.

        Returns:
            The 512 payload bytes, or ``b''`` when validation fails.
        """
        frames = {}
        size = self._FRAME_SIZE
        for i in range(self._FRAMES_PER_SENSOR):
            frame = data[i * size:(i + 1) * size]
            if len(frame) < size:
                return b''
            # The original test ended in `temp>0`, comparing the whole frame
            # with an int (TypeError).  The first byte is what was being
            # range-checked, so compare that instead.
            # NOTE(review): a stricter `frame[0] != 0x23` check may have been
            # intended -- confirm against the device protocol.
            if frame[0] != 35 and 0 < frame[0] < 8:
                return b''
            frames[frame[4]] = frame
        # All eight indices 0..7 must be present exactly once.
        if set(frames) != set(range(self._FRAMES_PER_SENSOR)):
            return b''
        return b''.join(frames[idx][7:71] for idx in range(7, -1, -1))

    def decode(self) -> str:
        """Decode end-sign (0x0D) terminated text frames from the buffer.

        In state 0 everything up to and including the first end sign is
        discarded.  In state 1 every complete frame is decoded as
        ``frame[0:20] + token + frame[-9:-1]`` (both parts stripped of
        spaces); the last complete frame wins and the trailing incomplete
        frame stays in the buffer.

        Returns:
            The last decoded string, or ``''`` if no frame was complete.
        """
        ret = ''
        token = TOKEN if TOKEN else ";"

        if self.state == 0:
            # Synchronise: drop bytes up to the first end sign.  When none is
            # found the whole buffer is consumed, as before.
            _, found, rest = self.__buf.partition(self.__end_sign)
            self.__buf = rest
            if found:
                self.state = 1

        if self.state == 1:
            *complete, remainder = self.__buf.split(self.__end_sign)
            # Keep the incomplete tail for the next call.
            self.__buf = remainder
            for frame in complete:
                ret = frame[0:20].decode('utf-8').strip(" ") + token \
                    + frame[-9:-1].decode('utf-8').strip(" ")
                log.info(f"decode : {ret}")

        self.res = ret
        return ret


class HandHeldPath(object):
    """Locate info/sensor file pairs and derive paths for one measurement."""

    def __init__(self, ):
        self.mode = 0
        self.cfg = {}
        self.mydir = MyDir()
        self.base_path: Path = Path()
        self.output_path: Path = Path()
        self.data_path: Path = Path()
        self.filelist = []
        self.error_result = []
        # Read by getOutputPathFromSensorPath in mode 1; must be filled by
        # the caller before that method is used.
        self.info_dict = {}

    def setMode(self, mode: int = 0):
        self.mode = mode

    def setBasePath(self, fpath: Path):
        self.base_path = fpath

    def setDataPath(self, fpath: Path):
        self.data_path = fpath

    def setOutputPath(self, fpath: Path):
        self.output_path = fpath

    def getSensorPathFromInfoPath(self, info_path: Path, ) -> Path:
        """Return the sensor file path matching ``info_path``.

        Mode 1 (layout after transfer on the server): ``sensor.bin`` next to
        the info file.  Mode 0: ``data/<YYYY>_<MM>/<DD>/sensor/<name>`` built
        from the path components.  Any other mode returns None.
        """
        if self.mode == 1:
            return self.base_path.joinpath(info_path.parent, "sensor.bin")
        if self.mode == 0:
            return self.base_path.joinpath(
                "data",
                info_path.parts[1][:4] + "_" + info_path.parts[1][-2:],
                info_path.parts[2],
                "sensor",
                info_path.name,
            )
        return None

    def getOutputPathFromSensorPath(self, sensor_path: Path) -> Path:
        """Return the output directory for ``sensor_path`` (None if unknown mode)."""
        if self.mode == 1:
            stamp = "_".join(str(self.info_dict[k]) for k in _TIME_KEYS)
            return self.output_path.joinpath(stamp)
        if self.mode == 0:
            return self.output_path.joinpath(
                sensor_path.parts[1][:4] + "_" + sensor_path.parts[1][-2:]
                + "_" + sensor_path.parts[2] + "_" + sensor_path.name
            )
        return None

    def getCurrentMeasureTimeFromPath(self, fpath: Path) -> str:
        """Build a ``YYYY-MM-DD HH:MM:SS`` style string from path components.

        Mode 1 takes the time from the stem of a ``*.txt`` file beside
        ``fpath``; mode 0 takes it from ``fpath.name``.  Returns ``''`` for
        other modes or when no txt file exists in mode 1.
        """
        ret = ''
        if self.mode == 1:
            # Path.glob returns a generator, so it cannot be indexed
            # directly; sort for a deterministic choice.
            txt_files = sorted(fpath.parent.glob("*.txt"))
            if not txt_files:
                return ret
            txt_stem = txt_files[0].stem
            ret = fpath.parts[1][:4] + "-" + fpath.parts[1][-2:] + "-" + fpath.parts[2] + " " \
                + txt_stem[-9:-7] + ":" + txt_stem[-6:-4] + ":" + txt_stem[-3:-1]
        elif self.mode == 0:
            ret = fpath.parts[1][:4] + "-" + fpath.parts[1][-2:] + "-" + fpath.parts[2] + " " \
                + fpath.name[0:2] + ":" + fpath.name[3:5] + ":" + fpath.name[6:8]
        return ret

    def getDataFileList(self, ):
        """Collect info/sensor file pairs into ``filelist``.

        Empty info files, missing sensor files and empty sensor files are
        skipped and recorded in ``error_result``.
        """
        self.filelist = []
        if self.mode == 1:
            candidates = self.data_path.glob("*/*/*/*/info.bin")
        else:
            candidates = self.data_path.glob("*/*/info/*")

        for f in candidates:
            if f.stat().st_size == 0:
                self.error_result.append(
                    {"path": f, "error": "info file size is zero"})
                continue

            sensor_path = self.getSensorPathFromInfoPath(f)

            # Sensor file does not exist.
            if not sensor_path.exists():
                self.error_result.append(
                    {"path": f, "error": "cannot find the sensor file "})
                continue

            # Sensor file is empty.
            if sensor_path.stat().st_size == 0:
                self.error_result.append(
                    {"path": sensor_path,
                     "error": "sensor file size of the sensor is zero' "})
                continue

            self.setFilelist(f, sensor_path)

    def setFilelist(self, info_path: Path, sensor_path: Path):
        """Append one info/sensor pair, with its date parts, to ``filelist``."""
        entry = {
            "info_path": info_path,
            "name": info_path.name,
            "parent": info_path.parent,
        }
        if self.mode == 1:
            # Directory layout after transfer on the server.
            entry["year"] = info_path.parts[-5]
            entry["month"] = info_path.parts[-4]
            entry["day"] = info_path.parts[-3]
        else:
            # Previously these ran unconditionally and overwrote the mode-1
            # values above.
            entry["year"] = info_path.parts[1][:4]
            entry["month"] = info_path.parts[1][-2:]
            entry["day"] = info_path.parts[2]
        entry["sensor_path"] = sensor_path
        self.filelist.append(entry)

    def getFilelist(self, ):
        return self.filelist

    def printTest(self, d, sleep_time: int = 5):
        """Debug helper: print ``d`` between log markers, then sleep."""
        log.info("***** : I am testing from HandheldPath ********", __name__)
        print(d)
        log.info("***** : Ending testing from HandheldPath ********", __name__)
        time.sleep(sleep_time)


class AWRAMS(object):
    def __init__(self, ):
        """Initialise hand-held data processing state.

        Online flow: receive --> deal() calls dealOneMeasurement_Online().
        """
        self.device_type = DeviceType.AWRAMS.name
        self.device_enum = None
        self.device_id = {}
        self.syscfg = {}
        self.retrieve = {}
        self.error_result = []
        self.mode = 0    # 0: SD-card data (default), 1: server data
        self.base_path = CURRENT_DIR
        self.current_path = CURRENT_DIR
        self.data_path = DATA_DIR
        self.output_path = OUTPUT_DIR
        self.info_path_fname: Path = self.base_path
        self.sensor_path_fname: Path = self.base_path
        self.filelist = []    # full path information of every measurement
        self.intensity_before_avg = None
        self.intensity_after_avg = {}       # {lsky: esky: lwater: }
        self.intensity_after_interpo = {}   # {lsky: esky: lwater: }
        self.one_group_result = {}          # hand-held, repeated at intervals
        self.wavelength = np.array([])
        self.real_wavelength = {}           # {lsky: esky: lwater: }
        self.current_filepath = ''          # without extension
        self.current_measure_time = None    # time of the current measurement
        self.current_group_num = 0
        self.measurement_interval = 0
        self.measurement_repeat = 1
        self.mydir = MyDir()
        self.my_time = MyTime()
        self.hhb = HandHeldBuf()
        self.hhp = HandHeldPath()
        self.ramses = Ramses()
        self.info_dict = {}
        self.res = {}    # final result {lsky: esky: lwater: Lw: Rs:}

        self.setDeviceEnum()
        self.setMode()
        self.set_hhp_path()

    def setMode(self, mode=0):
        """Select the data layout: 0 reads SD-card data, 1 reads server data."""
        self.mode = mode
        self.hhp.setMode(mode)

    def setMeasureID(self, mid: int):
        self.measure_id = mid

    def setOldFolder(self, tuple_path: tuple):
        self.old_folder = self.base_path.joinpath(*tuple_path)

    def transferFromOldFolder(self, ):
        """Move every ``*.bin`` from the old folder into the new folder.

        ``pic.bin`` is renamed to ``pic.jpg`` on the way.
        """
        log.info("transferFromOldFolder: ", __name__)
        self.getNewFolderFromOldFolder()
        for bf in self.old_folder.glob('*.bin'):
            target_name = "pic.jpg" if bf.name == "pic.bin" else bf.name
            bf.replace(self.new_folder.joinpath(target_name))

    def deleteOldFolder(self, ):
        log.info("deleteOldFolder: ", __name__)
        self.mydir.setBaseDir(self.old_folder)
        self.mydir.deleteDir()
        self.old_folder = DATA_DIR

    def getNewFolderFromOldFolder(self, ) -> Path:
        """Compute, create and return the dated folder for ``old_folder``.

        Layout (after transfer on the server):
        ``<old_folder/../..>/20YY/MM/DD/<old folder name>`` using the date in
        ``info_dict``.  The result is also stored in ``new_folder``.
        """
        root = self.old_folder.parent.parent
        path_tuple = (
            "20" + f"{self.info_dict['year']:02d}",
            f"{self.info_dict['month']:02d}",
            f"{self.info_dict['day']:02d}",
            self.old_folder.parts[-1],
        )
        self.new_folder = root.joinpath(*path_tuple)
        self.mydir.setBaseDir(self.new_folder)
        self.mydir.newDirIfNot()
        # Previously nothing was returned, so getInfoDict() overwrote
        # new_folder with None.
        return self.new_folder

    def getInfoDict(self, ):
        """Decode ``info.bin`` of the old folder and derive the new folder."""
        info_path = self.old_folder.joinpath("info.bin")
        hexbin = self.read_bin(info_path)
        self.info_dict = self.decode_info(hexbin)
        self.new_folder = self.getNewFolderFromOldFolder()

    def set_hhp_path(self, ):
        self.hhp.setBasePath(self.base_path)
        self.hhp.setDataPath(self.data_path)
        self.hhp.setOutputPath(self.output_path)

    def setDeviceEnum(self, ):
        """Pick the Ramses enum class matching ``device_type``."""
        if self.device_type == DeviceType.SURFACE.name:
            self.device_enum = RamsesSURFACE
        if self.device_type == DeviceType.AWRAMS.name:
            self.device_enum = RamsesAWRAMS
        if self.device_type == DeviceType.PROFILE.name:
            self.device_enum = RamsesPROFILE

    def getDataFileList(self, ):
        self.hhp.getDataFileList()
        self.filelist = self.hhp.getFilelist()

    def dealAllMeasurements(self, ):
        """Process every info/sensor pair in ``filelist``.

        File-level problems were already filtered by ``getDataFileList``.

        Returns:
            ``(True, error_result)``.

        Raises:
            MyException: if decoding an info file or processing a sensor
                file fails.
        """
        log.info(" 所有测量文件", __name__, "dealAllMeasurements", )
        for df in self.filelist:
            # Info frame.
            self.info_dict = {}
            self.info_path_fname: Path = df["info_path"]
            self.sensor_path_fname: Path = df["sensor_path"]
            hex_info = self.read_bin(self.info_path_fname)
            try:
                self.info_dict = self.decode_info(hex_info)
            except Exception as e:
                log.error("处理信息文件" + "/" + self.info_path_fname.stem
                          + "出现错误", __name__, "dealAllMeasurements")
                # str() is required: concatenating a Path raised TypeError
                # and masked the real error.
                raise MyException(
                    "处理文件" + str(self.info_path_fname) + "出现错误") from e

            try:
                # AWRAMS holds a single group per measurement.
                self.dealOneMeasurement(self.sensor_path_fname)
            except Exception as e:
                log.error("处理传感器文件" + self.sensor_path_fname.name
                          + " 出现错误", __name__, "dealAllMeasurements")
                raise MyException(
                    "处理传感器文件" + self.sensor_path_fname.name
                    + " 出现错误") from e

        log.info("Finished !! ", __name__, "dealAllMeasurements")
        return True, self.error_result

    def dealOneHandheldMeasurement(self, fpath: Path):
        """Process one hand-held file that may hold several groups.

        The decoded records are split into groups of ``measurement_repeat``
        records; if the record count is not a multiple of it, ``res`` is
        cleared and nothing is saved.
        """
        log.info(" 手持一个文件,多组测量数据", __name__, "dealOneHandheldMeasurement")
        self.output_path = OUTPUT_DIR
        self.current_filepath = fpath
        self.current_measure_time = self.hhp.getCurrentMeasureTimeFromPath(fpath)
        self.ymdhms = self.getTimeFnameFromInfoDict()
        self.output_path = self.output_path.joinpath(self.ymdhms)
        log.debug(f"current_measure_time: {self.current_measure_time}",
                  __name__, "dealOneHandheldMeasurement")
        self.hhb.readFile2Buf(fpath)
        log.debug(f"buf: {self.hhb.get_buf_size()}", __name__, "dealOneHandheldMeasurement")
        # Parse the buffer into [{lsky: esky: lwater:}, ...].
        self.decode_sensor_buf()

        len_total = len(self.intensity_before_avg)
        if len_total % self.measurement_repeat != 0:
            self.res = {}
            return
        group_num = int(len_total / self.measurement_repeat)
        log.info(f"group_num...: {group_num}", __name__, "dealOneHandheldMeasurement")

        self.real_wavelength = self.getWavelenthDict()
        if group_num == 1:
            self.dealOneGroup()
            return
        self.dealMultiGroup(group_num)

    def dealMultiGroup(self, group_num: int):
        """Average, interpolate and save each of ``group_num`` groups."""
        log.info(f"group_num: {group_num}", __name__, "dealMultiGroup")
        repeat = self.measurement_repeat
        for i in range(group_num):
            self.current_group_num = i
            # Reset the current measurement time.
            self.current_measure_time = ""
            self.res = {}
            group = list(self.intensity_before_avg[i * repeat:(i + 1) * repeat])
            self.getAvg(group)
            self.__do_sensor_dict_interpo()
            self.appendSave()

    def dealOneGroup(self, ):
        """Average all decoded records, interpolate and save them."""
        self.getAvg(self.intensity_before_avg)
        self.real_wavelength = self.getWavelenthDict()
        self.__do_sensor_dict_interpo()
        self.ymdhms = self.getTimeFnameFromInfoDict()
        # NOTE(review): callers have already joined a timestamp onto
        # output_path, so this nests a second timestamp directory. Kept
        # as-is because existing output layouts depend on it -- confirm.
        self.output_path = self.output_path.joinpath(self.ymdhms)
        self.appendSave()

    def dealOneMeasurement(self, fpath: Path):
        """Process one sensor file holding a single group of data."""
        log.info("dealOneMeasurement: 一组测量数据", __name__, "dealOneMeasurement")
        self.output_path = OUTPUT_DIR
        self.current_filepath = fpath
        self.current_measure_time = self.hhp.getCurrentMeasureTimeFromPath(fpath)
        self.get_ymdhms()
        self.output_path = self.output_path.joinpath(self.ymdhms)
        log.debug(f"current_measure_time: {self.current_measure_time}",
                  __name__, "dealOneHandheldMeasurement")
        self.hhb.readFile2Buf(fpath)
        self.decode_sensor_buf()
        # dealOneGroup computes real_wavelength itself.
        self.dealOneGroup()

    def get_ymdhms(self, ):
        """Set ``ymdhms`` to ``20YY_MM_DD_HH_MM_SS`` (zero-padded)."""
        self.ymdhms = "20" + "_".join(
            f"{self.info_dict[key]:02d}" for key in _TIME_KEYS)

    def dealOneMeasurement_Online(self, ):
        """Process the ``*.bin`` files of ``new_folder`` as one measurement."""
        log.info("dealOneMeasurement_Online: 一组测量数据", __name__, "dealOneMeasurement_Online")
        self.output_path = self.new_folder
        self.get_ymdhms()
        self.get_sensor_buf()
        self.decode_sensor_buf()
        self.getAvg(self.intensity_before_avg)
        self.real_wavelength = self.getWavelenthDict()
        self.__do_sensor_dict_interpo()
        self.saveOnefileForLskyEskyLwaterLwRS()

    def get_sensor_buf(self, ):
        """Append the content of every ``*.bin`` in ``new_folder`` to the buffer."""
        self.mydir.setBaseDir(self.new_folder)
        bin_files = self.mydir.get_files_from_currentdir("*.bin")
        bin_files = self.mydir.sort_filepath_and_check(bin_files)
        self.hhb.write_buf(b"".join(self.read_bin(path) for path in bin_files))

    def getTimeFnameFromInfoDict(self, ):
        """Return ``20Y_M_D_H_M_S`` built with ``str()`` (no zero padding)."""
        return "20" + "_".join(str(self.info_dict[key]) for key in _TIME_KEYS)

    def decode_sensor_buf(self, ):
        """Decode every buffered record into ``intensity_before_avg``.

        Each record yields one dict mapping the sensor name (sequence 1..3)
        to the spectrum produced by the Ramses calibration.
        """
        self.intensity_before_avg = []
        self.clearRes()

        # decode_one_group_handheld already strips the 26 leading bytes.
        while self.hhb.decode_one_group_handheld():
            record = {}
            buf = self.hhb.one_group_data
            for seq in range(1, 4):
                site = (seq - 1) * 576
                # Payload is bytes 7..70 of each of the eight 72-byte frames.
                payload = b"".join(
                    buf[site + 72 * k + 7:site + 72 * k + 71] for k in range(8))

                # Calibrate with the Ramses helper for this sensor.
                self.ramses.setBuf(payload)
                func = self.getFuncBySeq(seq)
                cfg = self.getCfgByFunc(func)
                self.ramses.setCalCfg(cfg)
                self.ramses.resetItSpectrum()
                self.ramses.ConvertAndCalibrate()
                record[func] = self.ramses.spectrum

            self.intensity_before_avg.append(record)

    def getAvg(self, d: list):
        """Store the per-sensor mean of the groups in ``intensity_after_avg``.

        Args:
            d: list of dicts mapping sensor name to spectrum.  It is not
               modified.
        """
        log.info("getAvg: 平均多组数据", __name__, "getAvg")
        count = len(d)
        if count == 0:
            self.intensity_after_avg = {}
            return None
        if count == 1:
            # Copy so later updates never write into the caller's dict.
            self.intensity_after_avg = dict(d[0])
            return None

        averaged = {}
        for key in self.getRetDict():
            # Accumulate in a local; the original wrote the running sum back
            # into d[0], corrupting the caller's data.
            total = d[0][key]
            for group in d[1:]:
                total = total + group[key]
            averaged[key] = total / count
        self.intensity_after_avg = averaged
        log.debug(f"getAvg: {self.intensity_after_avg}", __name__, "getAvg")

    def getRetDict(self, ):
        """Return ``{name: empty array}`` for device enum members 1..3."""
        return {self.device_enum(seq).name: np.array([]) for seq in (1, 2, 3)}

    def __do_sensor_dict_interpo(self, ):
        """Interpolate each averaged spectrum onto ``wavelength`` into ``res``."""
        log.info("同步处理多个个插值 ", __name__, "__do_sensor_dict_interpo")
        self.clearRes()
        for key, intensity in self.intensity_after_avg.items():
            self.res[key] = np.interp(
                self.wavelength, self.real_wavelength[key], intensity)

    def getCfgByDid(self, func: str):
        """Return the configuration of sensor ``func`` for the current device.

        The original read ``self.cfg``, which this class never defines;
        the lookup now goes through ``syscfg`` like ``getCfgByFunc``.
        """
        return self.getCfgByFunc(func)

    def getFuncBySeq(self, seq: int):
        """Return the sensor name for sequence ``seq`` (``""`` if unknown type).

        NOTE(review): every known device type resolves through RamsesAWRAMS,
        exactly as before -- confirm whether SURFACE/PROFILE should use their
        own enums.
        """
        known_types = (
            DeviceType.AWRAMS.name,
            DeviceType.SURFACE.name,
            DeviceType.PROFILE.name,
        )
        if self.device_type in known_types:
            return RamsesAWRAMS(seq).name
        return ""

    def getCfgByFunc(self, func: str):
        """Return ``syscfg[int(device_id)][func]`` (None if ``func`` is absent)."""
        cfg_id: dict = self.syscfg.get(int(self.device_id))
        return cfg_id.get(func)

    def getCfgBySeq(self, seq: int):
        """Return the configuration of the sensor with sequence ``seq``."""
        return self.getCfgByFunc(self.getFuncBySeq(seq))

    def saveOnefileForLskyEskyLwaterLwRS(self, ) -> bool:
        """Compute Lw and Rs, then write all spectra into one file.

        Also writes ``<ymdhms>_info.txt`` and appends the retrieved
        parameters to the data file.
        """
        log.info(" ", __name__, "saveOnefileForLskyEskyLwaterLwRS")
        self.mydir.setBaseDir(self.output_path)    # base path

        lw_name = self.device_enum(4).name
        rs_name = self.device_enum(5).name
        self.res[lw_name] = self.res["Lwater"] - ROWFACTOR * self.res["Lsky"]
        self.res[rs_name] = self.res[lw_name] / self.res["Esky"]

        self.mydir.setHeader(self.wavelength.tolist(), TOKEN,
                             "device_id_" + str(self.device_id))
        header = self.mydir.header_str
        save_path_csv = self.new_folder.joinpath(self.ymdhms + SAVE_EXT_NAME)

        rows = []
        for key, values in self.res.items():
            self.mydir.setContent(values.tolist(), TOKEN, key)
            rows.append(NEWLINE + self.mydir.content_str)
        save_path_csv.write_text(header + "".join(rows))

        path_info_txt = self.new_folder.joinpath(self.ymdhms + "_info.txt")
        self.save_dict_to_file(self.info_dict, path_info_txt)
        self.do_retrieve()
        self.append_retrieve(save_path_csv)
        return True

    def do_retrieve(self, ) -> None:
        """Compute the retrieved parameters from ``res`` into ``res_retrieve``."""
        self.res_retrieve = [
            self.get_par_400_700(),
            self.get_par_350_950(),
            self.get_chl(),
            self.get_CDOM(),
        ]

    def append_retrieve(self, fpath) -> None:
        """Append a ``res`` row with the retrieved parameters to ``fpath``."""
        row = NEWLINE + TOKEN.join(["res"] + [str(r) for r in self.res_retrieve])
        with open(fpath, 'a') as f:
            f.write(row)

    def get_par_400_700(self, ) -> float:
        """Return the sum of ``res['Esky']`` over indices 50..349.

        NOTE(review): the name suggests 400-700 nm, which matches these
        indices only for a grid starting at 350 with step 1 -- confirm.
        """
        return float(np.sum(self.res['Esky'][50:350]))

    def get_par_350_950(self, ) -> float:
        """Return the sum of ``res['Esky']`` over indices 0..599."""
        return float(np.sum(self.res['Esky'][0:600]))

    def get_chl(self, ) -> float:
        """Placeholder: always returns 0.0."""
        return 0.0

    def get_CDOM(self, ) -> float:
        """Placeholder: always returns 0.0."""
        return 0.0

    def appendSave(self, ) -> bool:
        """Save the spectra, Lw/Rs and ``info.txt`` under ``output_path``."""
        self.checkAndSaveData()
        self.getLwRsAndSave()
        path_info_txt = self.output_path.joinpath("info.txt")
        self.save_dict_to_file(self.info_dict, path_info_txt)
        return True

    def checkAndSaveData(self, ) -> bool:
        """Write the three sensor spectra held in ``res`` to their files.

        NOTE(review): for group 0 both the "new file" and the "append" calls
        run, exactly as in the original -- confirm this is intended.
        """
        log.info(f"checkAndSaveData: {self.output_path.parts}", __name__)
        self.mydir.setBaseDir(self.output_path)    # base path
        self.mydir.setHeader(self.wavelength.tolist(), TOKEN,
                             "device_id_" + str(self.device_id))
        names = [self.device_enum(seq).name for seq in (1, 2, 3)]
        if self.current_group_num == 0:
            for name in names:
                self.newFileByFunc(name)
        for name in names:
            self.appendFileByFunc(name)
        return True

    def newFileByFunc(self, func: str) -> None:
        """Create ``<func><ext>`` if needed, write header and one content row.

        Raises:
            MyException: when ``checkHeader`` reports -1.
        """
        self.mydir.newFileIfNot(func + SAVE_EXT_NAME)
        header_state = self.mydir.checkHeader()
        if header_state == -1:
            message = (f"请备份文件:{self.mydir.current_filepath.parent} "
                       f"{self.mydir.current_filepath.name}, 并删除文件后再试!")
            log.error(message, __name__)
            raise MyException(message)
        if header_state == 0:
            self.mydir.writeHeader()
        # Write the data row.
        self.mydir.setContent(self.res[func].tolist(), TOKEN, self.current_measure_time)
        self.mydir.writeContent()

    def appendFileByFunc(self, func: str) -> None:
        """Append one content row for ``func`` via ``mydir``."""
        self.mydir.setContent(self.res[func].tolist(), TOKEN, self.current_measure_time)
        self.mydir.writeContent()

    def clearRes(self, ) -> None:
        self.res = {}

    def getLwRsAndSave(self, ) -> bool:
        """Compute Lw and Rs from ``res`` and save them."""
        self.res["Lw"] = self.res["Lwater"] - ROWFACTOR * self.res["Lsky"]
        self.res["Rs"] = self.res["Lw"] / self.res["Esky"]
        self.mydir.setBaseDir(self.output_path)    # base path

        if self.current_group_num == 0:
            self.newFileByFunc("Lw")
            self.newFileByFunc("Rs")
        self.appendFileByFunc("Lw")
        self.appendFileByFunc("Rs")
        return True

    def read_bin(self, fpath: Path):
        """Return the bytes of ``fpath``, or None if it does not exist."""
        log.debug(" readbin: ", __name__, "", "")
        if not fpath.exists():
            log.info(f"not find file: {fpath} ")
            return None
        with open(fpath, 'rb') as file:
            return file.read()

    def decode_info(self, info: bytes) -> dict:
        """Decode the binary info frame into a dictionary.

        Original comments: the profile type added 1311+24 bytes (26 bytes in
        total); the reserved bytes were changed to serial numbers of two
        bytes each.

        NOTE(review): the ``struct.unpack`` format string and the field
        mapping of this method were missing from the reviewed source (the
        text after ``struct.unpack("`` was lost).  They are not guessed
        here -- restore the body from version control.  Other methods read
        the integer keys year, month, day, hour, minute and second from the
        result.
        """
        raise NotImplementedError(
            "decode_info body was lost in the reviewed source; "
            "restore it from version control")

    def save_dict_to_file(self, info_dict: dict, fpath: Path) -> None:
        """Write ``info_dict`` to ``fpath`` as ``key : value`` lines."""
        text = "".join(f"{key} : {value}\n" for key, value in info_dict.items())
        with open(fpath, "w+") as f:
            f.write(text)

    def save_error_to_file(self, errlist: list, fpath: Path) -> None:
        """Write ``path : error`` lines to ``fpath``; no file if list is empty."""
        if len(errlist) < 1:
            return None
        # "path" entries are Path objects, so convert before concatenating.
        text = "".join(
            str(err["path"]) + " : " + err["error"] + "\n" for err in errlist)
        with open(fpath, "w+") as f:
            f.write(text)
        return None

    def getWavelenthDict(self, ) -> dict:
        """Return ``{sensor name: wavelength array}`` for the current device."""
        cfg_id: dict = self.syscfg.get(int(self.device_id))
        return {name: self.getWavelength(cfg_id.get(name))
                for name in self.getRetDict()}

    def getWavelength(self, ramsesdict: dict) -> np.ndarray:
        """Evaluate ``c0s + c1s*i + c2s*i^2 + c3s*i^3`` for i = 2..256."""
        c0 = float(ramsesdict['c0s'])
        c1 = float(ramsesdict['c1s'])
        c2 = float(ramsesdict['c2s'])
        c3 = float(ramsesdict['c3s'])
        i = np.arange(2, 257, dtype=float)
        # Same operation order as the scalar loop it replaces.
        return c0 + c1 * i + c2 * i * i + c3 * i * i * i

    def setSyscfg(self, syscfg: dict):
        self.syscfg = syscfg

    def setRetrieve(self, retrieve: dict):
        log.debug(f"setRetrieve : {retrieve}......", __name__)
        self.retrieve = retrieve
        self.setNewWavelength()

    def setNewWavelength(self, ):
        """Rebuild ``wavelength`` from beginWL/endWL/interval in ``retrieve``."""
        self.wavelength: np.ndarray = np.arange(
            self.retrieve['beginWL'], self.retrieve['endWL'], self.retrieve['interval'])

    def setDeviceID(self, did: int):
        self.device_id = did

    def setDeviceType(self, device_type):
        # NOTE(review): device_enum is not refreshed here; call
        # setDeviceEnum() afterwards if that is required -- confirm.
        self.device_type = device_type

    def getErrorInfoDict(self, ):
        return self.error_result

    def printResult(self,):
        """Print the five result spectra stored in ``res``."""
        log.info("***** : Print Lsky Esky Lwater Lw Rs......", __name__)
        # res is keyed by member name, not by the enum member itself.
        for seq in range(1, 6):
            print(self.res[self.device_enum(seq).name])

    def printTest(self, d, sleep_time: int = 5):
        """Debug helper: print ``d`` between log markers, then sleep."""
        log.info("***** : I am testing ********", __name__)
        print(d)
        log.info("***** : Ending testing ********", __name__)
        time.sleep(sleep_time)


if __name__ == "__main__":
    pass