import numpy as np
import struct
import math
import time
from scipy.optimize import leastsq
from myconfig import TOKEN, DATA_DIR, FILE_MARK, OUTPUT_DIR, CAL_DIR
from myconfig import PURE_WATER_FNAME, SAVE_EXT_NAME, OSCAR_R, OSCAR_R0
from myconfig import MyConfig
from configuration import Configuration
from myexception import MyException
from mylogger import log
from mypath import MyDir
from mythread import Multithread
from readcal import ReadCal
from uart import Uart
from pathlib import Path, PurePath
from pubsub import pub

# Number of decimal places kept when decoding measurement floats
# (passed as ``bit`` to ``convert_buf_2_float`` in ``log_uart_thread``).
FLOAT_RESERVE_BIT = 8

# Step passed to ``set_modbus`` for every UART request; successive requests
# advance the begin address by the same amount.
# NOTE(review): presumably the number of registers per Modbus read - confirm.
MODBUS_READ_STEP = 120


class Oscar(object):
    """Read wavelength/measurement data and derive absorption coefficients.

    Data comes either from ``*.DAT`` files below the data directory or from
    the device over UART. For every measurement the absorption coefficient
    is solved per wavelength (``get_absorption_coeff``), optionally
    corrected for pure water, written through ``MyDir`` and handed to the
    caller (pubsub topic ``"update"`` or a callback).
    """

    def __init__(self,):
        self.cfg = {}
        self.mydir = MyDir()
        self.file_lst = []
        self.raw_wavelength = []
        self.raw_wavelength_np = np.array([])
        self.purewater_wavelength = []
        self.purewater_attenuation = []
        self.output_wavelength = []
        self.basis = []
        self.reflectivity = []
        # Slice bounds into raw_wavelength selected by get_begin_end().
        self.begin = 0
        self.end = 0
        self.dir: Path = Path()
        self.datadir: Path = self.dir.joinpath(DATA_DIR)
        self.output_dir: Path = self.datadir.joinpath(OUTPUT_DIR)
        self.measure_time = ''
        self.data = []
        # Accumulates the payload bytes of consecutive UART reads.
        self.res = b''
        self.msg = {"type": "", "data": {}}
        self.__get_purewater_wl_and_data()
        self.purewater_after_interp = []
        self.uart = Uart()
        self.uart_thread = Multithread()
        self.ui_sn = ''
        self.devicesn = ''
        self.filesn = ''

    def set_cfg(self, cfg):
        """Store the configuration and load the calibration files for its SN."""
        self.cfg = cfg
        self.ui_sn = self.cfg['device']['UISN']
        self.__get_cal_info()

    def __set_msg(self, typ, d):
        """Replace ``self.msg`` with a fresh ``{"type": ..., "data": ...}``."""
        self.msg = {"type": typ, "data": d}

    def get_data_files(self,) -> list:
        """Collect the ``*/*.DAT`` files below the data directory.

        Points ``self.mydir`` at the data directory as a side effect.

        Returns:
            The list that was also stored in ``self.file_lst``.
        """
        self.mydir.setBaseDir(Path(self.datadir))
        self.file_lst = self.mydir.get_files_from_currentdir(fmt="*/*.DAT")
        return self.file_lst

    def get_begin_end(self,) -> None:
        """Derive the slice bounds for the configured wavelength window.

        Scans ``self.raw_wavelength`` against ``cfg['retrieve']['beginWL']``
        and ``cfg['retrieve']['endWL']``, updates ``self.begin``/``self.end``
        and ``self.output_wavelength``, then logs and publishes a notice.

        NOTE(review): the two comparison operators were missing from the
        reviewed source; ``<`` and ``>`` are reconstructed from the
        surrounding logic and the ``i + 1`` / ``i + 2`` offsets are kept
        verbatim - confirm against the original.
        """
        begin_wl = self.cfg['retrieve']['beginWL']
        end_wl = self.cfg['retrieve']['endWL']

        # Default to the full range so stale bounds from a previous call can
        # never leak through when the window exceeds the available data.
        begin = 0
        end = len(self.raw_wavelength)
        for i, wavelength in enumerate(self.raw_wavelength):
            if wavelength < begin_wl:
                begin = i + 1
            if wavelength > end_wl:
                end = i + 2
                break
        self.begin = begin
        self.end = end

        self.get_output_wavelength()
        msg = "起始波长 : " + str(self.output_wavelength[0]) \
            + " , 结束波长 : " + str(self.output_wavelength[-1])
        log.info(msg, __name__, '', '')
        self.__set_msg("notice", msg)
        # sendMessage needs a topic name; use the same topic as the data
        # messages published by deal_measure_time_data().
        pub.sendMessage("update", msg=self.msg)

    def get_output_wavelength(self,):
        """Slice ``raw_wavelength`` by ``begin``/``end`` into ``output_wavelength``."""
        self.output_wavelength = self.raw_wavelength[self.begin:self.end]

    def deal_file_lst(self, ):
        """Process every file in ``self.file_lst``.

        Each file's serial number must match the configured one. Before the
        first file is parsed the wavelength window, pure-water interpolation
        and output file are prepared once.

        Raises:
            MyException: on a serial-number mismatch or an inconsistent
                output-file header.
        """
        prepared = False
        for fl in self.file_lst:
            log.info(f"-> {fl} ", __name__, "deal_file_lst")

            # The file's serial number has to match the configured one.
            if not self.__check_sn_from_datafile(fl):
                raise MyException("文件的序列号和系统设置不一致")

            if not prepared:
                # Take the wavelengths from the first file unless they have
                # already been loaded (e.g. from the device).
                if len(self.raw_wavelength) == 0:
                    self.__get_raw_wl_from_datafile(self.file_lst[0])
                # NOTE(review): indentation was lost in the reviewed source;
                # the setup below runs once per call so that the output
                # target is always (re)selected after get_data_files()
                # pointed self.mydir at the data directory.
                self.get_begin_end()           # slice bounds
                self.interpo_pure_water()      # pure-water interpolation
                self.__prepare_for_save()      # output file + header
                prepared = True

            # NOTE(review): an earlier remark in this file suspected that
            # the non-callback reader concatenates several measurement
            # groups, hence the callback variant - unverified.
            ReadCal.read_columns_sets_by_mark_callback(
                fl, FILE_MARK, self.deal_measure_time_data, 1)

    def deal_measure_time_data(self, sn, res_time, res_data):
        """Callback: convert, save and publish each measurement of a file.

        Args:
            sn: Supplied by the reader; not used here.
            res_time: Measurement timestamps.
            res_data: Per timestamp, column sets of string values; column 0
                is used.
        """
        log.info(f" -> time : {res_time}", __name__, "deal_one_measure_time_data")
        log.info(f" -> datalen : {len(res_data)} ", __name__, "deal_one_measure_time_data")
        basis = self.basis[self.begin:self.end]
        refl = self.reflectivity[self.begin:self.end]
        for i, stamp in enumerate(res_time):
            data = self.convert_str_2_float_list(
                res_data[i][0][self.begin:self.end])
            # Keyword arguments: the positional order of the signature is
            # (data, refl, basis_aq, ...), which is easy to get backwards.
            abs_coeff_with_water = self.get_absorption_coeff(
                data,
                refl=refl,
                basis_aq=basis,
                abs_coff_pw=self.purewater_after_interp)
            data = self.correction_pure_water(abs_coeff_with_water)
            self.mydir.setContent(data, TOKEN, stamp)
            self.mydir.writeContent()
            self.measure_time = stamp
            self.data = data
            self.__set_msg("data", {"time": stamp, "data": data})
            pub.sendMessage("update", msg=self.msg)

    def __check_sn_from_datafile(self, fpath) -> bool:
        """Return True when the SN read from ``fpath`` equals ``self.ui_sn``."""
        return ReadCal.readFileSNbyIDDevice(fpath) == self.ui_sn

    def __get_raw_wl_from_datafile(self, fpath) -> None:
        """Load column 0 of ``fpath`` as the raw wavelength axis."""
        _, raw_wl = ReadCal.read_columns_set_by_mark(fpath, FILE_MARK, 0)
        self.raw_wavelength = self.convert_str_2_float_list(raw_wl[0])
        # Kept as read from the file (unconverted values).
        self.raw_wavelength_np = np.array(raw_wl[0])
        return None

    def __prepare_for_save(self,) -> None:
        """Select the output file and make sure its header is usable.

        Raises:
            MyException: if ``checkHeader()`` reports ``-1`` (existing
                header differs from the one that would be written).
        """
        self.mydir.setBaseDir(self.output_dir)
        self.mydir.newDirIfNot()
        self.mydir.newFileIfNot(self.ui_sn + SAVE_EXT_NAME)
        self.mydir.setHeader(self.output_wavelength, TOKEN, self.ui_sn)
        if self.mydir.checkHeader() == 0:
            self.mydir.writeHeader()
        if self.mydir.checkHeader() == -1:
            raise MyException(" 文件头不一致, 请备份到其他目录,并在该目录下删除")

    def interpo_pure_water(self):
        """Interpolate the pure-water values onto ``output_wavelength``."""
        self.purewater_after_interp = np.interp(
            np.array(self.output_wavelength),
            np.array(self.purewater_wavelength),
            np.array(self.purewater_attenuation))

    def correction_turbidity(self, data: np.ndarray):
        """Apply the turbidity correction chosen by ``cfg['algorithm']['A720']``.

        Modes:
            0: subtract the mean of the last 11 samples (fewer if ``data``
               is shorter).
            1: subtract the value linearly extrapolated to 720 from the last
               two samples.
            2 (or anything else): return ``data`` unchanged.

        Args:
            data: 1-D array, assumed to be index-aligned with
                ``self.output_wavelength`` - TODO confirm.

        Returns:
            The corrected array (``data`` itself when nothing is applied).
        """
        log.debug("correction_turbidity .....", __name__, 'correction_turbidity')
        mode = self.cfg['algorithm']['A720']
        count = data.shape[0]

        if mode == 0:
            if count == 0:
                return data
            return data - np.mean(data[-11:])

        if mode == 1:
            if count < 2:
                # Two points are needed to extrapolate.
                return data
            # Index of the last sample; indexing with ``count`` itself is
            # out of range.
            last = count - 1
            slope = (data[last] - data[last - 1]) \
                / (self.output_wavelength[last] - self.output_wavelength[last - 1])
            tmp = data[last] - (self.output_wavelength[last] - 720) * slope
            log.debug(f"tmp .... {tmp}", __name__, 'correction_turbidity')
            return data - tmp

        return data

    def correction_pure_water(self, data: np.ndarray):
        """Apply the correction chosen by ``cfg['algorithm']['PureWater']``.

        Modes:
            0: return ``data`` unchanged.
            1: return ``data - self.purewater_after_interp``.
            anything else: return ``data`` unchanged.
        """
        if self.cfg['algorithm']['PureWater'] == 1:
            return data - self.purewater_after_interp
        return data

    def __get_cal_info(self):
        """Load basis and reflectivity calibration for the configured SN.

        A missing file is skipped silently and leaves the attribute as is.
        """
        log.info(f" == ", __name__, '__get_cal_info')
        caldir = self.dir.joinpath(CAL_DIR)
        sn = self.cfg['device']['UISN']
        basis_fpath = caldir.joinpath("Basis aq_" + sn + ".dat")
        reflectivity_fpath = caldir.joinpath("Reflectivity_" + sn + ".dat")
        if basis_fpath.exists():
            _, res_data = ReadCal.read_columns_sets_by_mark(
                basis_fpath, FILE_MARK, 1)
            self.basis = self.convert_str_2_float_list(res_data[0][0])
        if reflectivity_fpath.exists():
            _, res_data = ReadCal.read_columns_sets_by_mark(
                reflectivity_fpath, FILE_MARK, 1)
            self.reflectivity = self.convert_str_2_float_list(res_data[0][0])

    def __get_purewater_wl_and_data(self):
        """Load pure-water wavelength and value columns, if the file exists."""
        log.debug(f" ok ", __name__, '__get_purewater_wl_and_data')
        cal_fpath = self.dir.joinpath(CAL_DIR).joinpath(PURE_WATER_FNAME)
        if cal_fpath.exists():
            _, res_data = ReadCal.read_columns_sets_by_mark(
                cal_fpath, FILE_MARK, 0, 1)
            self.purewater_wavelength = self.convert_str_2_float_list(res_data[0][0])
            self.purewater_attenuation = self.convert_str_2_float_list(res_data[0][1])

    def set_serial(self, ) -> None:
        """Push the ``cfg['comsetting']`` serial parameters to the UART."""
        com = self.cfg['comsetting']
        self.uart.set_serial_para(
            com['port'],
            com['baudrate'],
            com['bytesize'],
            com['parity'],
            com['stopbit'])

    def set_modbus(self, beginaddress, step) -> None:
        """Configure the next Modbus request from ``cfg['register']``."""
        self.uart.set_modbus(
            self.cfg['register']['slaveaddress'],
            self.cfg['register']['functioncode'],
            beginaddress,
            step)

    def sn_uart_thread(self, func) -> None:
        """Queue and run a serial-number read; ``func`` receives the SN."""
        self.set_serial()
        self.uart_thread.remove_tasks()
        self.set_modbus(
            self.cfg['register']['SNAddress'], self.cfg['register']['SNLen'])
        self.uart_thread.add_task(self.device_sn, args=(func,))
        self.uart_thread.sequently_execute_tasks()

    def wl_uart_thread(self, func) -> None:
        """Read the wavelength axis from the device.

        Issues ``cfg['register']['count']`` consecutive reads, decodes the
        collected bytes into ``self.raw_wavelength``, derives the output
        window (which also logs and publishes the notice) and passes
        ``self.output_wavelength`` to ``func``.

        Raises:
            Exception: if the device serial number has not been read yet.
        """
        # devicesn starts as '' and is filled by device_sn().
        if not self.devicesn:
            raise Exception(" 请读取设备序列号!!")

        self.res = b''
        self.set_serial()
        self.uart_thread.remove_tasks()
        for i in range(self.cfg['register']['count']):
            log.debug(f" -> i {i}", __name__, "wl_uart_thread")
            self.set_modbus(
                self.cfg['register']['WLBeginAddress'] + i * MODBUS_READ_STEP,
                MODBUS_READ_STEP)
            log.debug(f" command -> {self.uart.command.hex()} ")
            self.uart_thread.add_task(self.device_wl, args=())
            self.uart_thread.sequently_execute_tasks()
            self.uart_thread.remove_tasks()

        self.raw_wavelength = self.convert_buf_2_float(self.res)
        # Computes begin/end and output_wavelength, logs and publishes.
        self.get_begin_end()
        func(self.output_wavelength)

    def log_uart_thread(self, func) -> None:
        """Take one measurement from the device, save it and report it.

        ``func`` is called with ``(timestamp, data)`` after the result has
        been written.

        Raises:
            Exception: if no output wavelengths are available yet.
            MyException: if the output-file header is inconsistent.
        """
        if len(self.output_wavelength) == 0:
            raise Exception("没有波长文件,请读取设备波长!!")
        # May be a list or an ndarray, so test the length rather than
        # comparing with [].
        if len(self.purewater_after_interp) == 0:
            self.interpo_pure_water()

        self.__prepare_for_save()

        self.res = b''
        self.set_serial()
        self.uart_thread.remove_tasks()
        for i in range(self.cfg['register']['count']):
            self.set_modbus(
                self.cfg['register']['DataBeginAddress'] + i * MODBUS_READ_STEP,
                MODBUS_READ_STEP)
            self.uart_thread.add_task(self.device_log, args=())
            self.uart_thread.sequently_execute_tasks()
            self.uart_thread.remove_tasks()

        data = self.convert_buf_2_float(self.res, FLOAT_RESERVE_BIT)
        data = data[self.begin:self.end]
        log.info(f"log data : {data}", __name__, 'log_uart_thread')
        # Keyword arguments: see deal_measure_time_data().
        abs_coeff_with_water = self.get_absorption_coeff(
            data,
            refl=self.reflectivity[self.begin:self.end],
            basis_aq=self.basis[self.begin:self.end],
            abs_coff_pw=self.purewater_after_interp)
        data = self.correction_pure_water(abs_coeff_with_water)

        tm = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
        self.mydir.setContent(data, TOKEN, tm)
        self.mydir.writeContent()
        func(tm, data)

    def __read_payload(self) -> bytes:
        """Reconnect, send the prepared request and return the reply body.

        The first 3 and the last 2 bytes of the reply are dropped.
        NOTE(review): presumably the Modbus header and CRC - confirm.
        """
        self.uart.disconnect()
        self.uart.connect()
        self.uart.write()
        frame = self.uart.OneMeasure()
        return frame[3:len(frame) - 2]

    def device_sn(self, func) -> None:
        """Read the device serial number and pass it to ``func``."""
        self.res = b''
        self.uart.disconnect()
        self.uart.connect()
        self.uart.write()
        self.res = self.uart.OneMeasure()
        # The SN is the 4 bytes in front of the last 2 bytes of the reply.
        self.devicesn = self.res[-6:-2].decode()
        self.res = b''
        func(self.devicesn)

    def device_wl(self, ) -> None:
        """Append one wavelength reply payload to ``self.res``."""
        self.res = self.res + self.__read_payload()

    def device_log(self, ) -> None:
        """Append one measurement reply payload to ``self.res``."""
        self.res = self.res + self.__read_payload()

    def convert_buf_2_float(self, buff, bit=3, byteOrder="big") -> list:
        """Decode consecutive 4-byte IEEE-754 floats from ``buff``.

        Args:
            buff: Raw bytes.
            bit: Number of decimal places each value is rounded to.
            byteOrder: ``"big"`` for big-endian; any other value decodes
                little-endian.

        Returns:
            The rounded floats, or an empty list when ``len(buff)`` is not
            a multiple of 4.
        """
        log.debug(f" === {len(buff)}== {buff}", __name__, 'convert_buf_2_float')
        if len(buff) % 4 != 0:
            return []
        fmt = ">f" if byteOrder == "big" else "<f"
        return [round(value, bit) for (value,) in struct.iter_unpack(fmt, buff)]

    def convert_str_2_float_list(self, lst) -> list:
        """Return ``float(x)`` for every element of ``lst``."""
        return [float(item) for item in lst]

    def set_raw_wavelength_from_device(self, lst):
        """Store ``lst`` as the raw wavelength axis."""
        self.raw_wavelength = lst

    def get_absorption_coeff(self, data, refl, basis_aq, abs_coff_pw):
        """Solve the absorption coefficient for every sample.

        For each position the ratio ``data / basis_aq`` is combined with the
        reference coefficient from ``abs_coff_pw`` and the reflectivity, and
        the coefficient ``x`` that zeroes the residual is found with
        ``scipy.optimize.leastsq`` (start value 1). ``OSCAR_R`` and
        ``OSCAR_R0`` are the two geometry constants of the model.

        Args:
            data: Measured values.
            refl: Reflectivity per sample.
            basis_aq: Basis (reference) values per sample.
            abs_coff_pw: Reference absorption coefficients per sample.

        Returns:
            A list with one solved coefficient per sample; empty when the
            four inputs do not all have the same length.
        """
        r = OSCAR_R
        r0 = OSCAR_R0
        abs_coeff_with_pw = []

        n = len(data)
        # All inputs must line up; zip() would otherwise silently truncate
        # and shift results against the wavelength axis.
        if not (n == len(refl) and n == len(basis_aq) and n == len(abs_coff_pw)):
            log.debug("input lengths differ, nothing computed",
                      __name__, 'get_absorption_coeff')
            return abs_coeff_with_pw

        for fa, fb, rou, ab in zip(data, basis_aq, refl, abs_coff_pw):
            t_ab = float(fa) / float(fb)
            rho = float(rou)
            a_b = float(ab)
            exp_ab_r0 = np.exp(-1 * a_b * r0)
            ps_ab_r = (1 - (2 * a_b * r + 1) * np.exp(-2 * a_b * r)) \
                / (2 * a_b * a_b * r * r)

            def residual(x):
                # Numerator/denominator of the same expression as ps_ab_r,
                # evaluated at the unknown coefficient x.
                num = 1 - (2 * x * r + 1) * np.exp(-2 * x * r)
                den = 2 * x * x * r * r
                decay = np.exp(-1 * x * r0)
                return t_ab * exp_ab_r0 * ps_ab_r \
                    - decay * num / den \
                    - rho * t_ab * exp_ab_r0 * ps_ab_r * num / den \
                    + rho * decay * ps_ab_r * num / den

            abs_coeff_with_pw.append(leastsq(residual, 1)[0][0])

        return abs_coeff_with_pw


if __name__ == '__main__':
    # Small numpy sanity check.
    data = np.array([3, 4, 5, 6])
    print(data.shape[0])
    print(data[3])
    print(data - 1)