"""Configuration containers and the ``MyOscar`` processing class.

``MyOscar`` takes raw wavelength / intensity readings (from files or from
device register buffers), converts them into absorption coefficients with a
least-squares solve per wavelength, publishes the result on the ``"update"``
pubsub topic and stores it through ``MyDir``.
"""
from dataclasses import dataclass
from pathlib import Path
from typing import List, Any, Callable, Optional
from pubsub import pub
import numpy as np
from enum import Enum
import struct
import math
import time
from myconfig import TOKEN, DATA_DIR, FILE_MARK, OUTPUT_DIR, CAL_DIR
from myconfig import PURE_WATER_FNAME, SAVE_EXT_NAME, FLOAT_RESERVE_BIT
from myexception import MyException
from mypath import MyDir
from scipy.optimize import leastsq


class WorkMode(Enum):
    """Where the measurement data comes from."""

    FILEMODE = 1
    DEVICEMODE = 2


class AlgorithMode(Enum):
    """Identifiers of the available correction algorithms."""

    A720 = 1
    PureWater = 2


class PureWaterData:
    """Built-in pure-water table: ``coeff[i]`` belongs to ``wavelength[i]``."""

    wavelength = [
        360.0, 365.0, 370.0, 375.0, 380.0, 385.0, 390.0, 395.0,
        400.0, 405.0, 410.0, 415.0, 420.0, 422.5, 425.0, 427.5,
        430.0, 432.5, 435.0, 437.5, 440.0, 442.5, 445.0, 447.5,
        450.0, 452.5, 455.0, 457.5, 460.0, 462.5, 465.0, 467.5,
        470.0, 472.5, 475.0, 477.5, 480.0, 482.5, 485.0, 487.5,
        490.0, 492.5, 495.0, 497.5, 500.0, 502.5, 505.0, 507.5,
        510.0, 512.5, 515.0, 517.5, 520.0, 522.5, 525.0, 527.5,
        530.0, 532.5, 535.0, 537.5, 540.0, 542.5, 545.0, 547.5,
        550.0, 552.5, 555.0, 557.5, 560.0, 562.5, 565.0, 567.5,
        570.0, 572.5, 575.0, 577.5, 580.0, 582.5, 585.0, 587.5,
        590.0, 592.5, 595.0, 597.5, 600.0, 602.5, 605.0, 607.5,
        610.0, 612.5, 615.0, 617.5, 620.0, 622.5, 625.0, 627.5,
        630.0, 632.5, 635.0, 637.5, 640.0, 642.5, 645.0, 647.5,
        650.0, 652.5, 655.0, 657.5, 660.0, 662.5, 665.0, 667.5,
        670.0, 672.5, 675.0, 677.5, 680.0, 682.5, 685.0, 687.5,
        690.0, 692.5, 695.0, 697.5, 700.0, 702.5, 705.0, 707.5,
        710.0, 712.5, 715.0, 717.5, 720.0, 722.5, 725.0, 727.5,
        730.0, 735.0, 740.0, 745.0, 750.0,
    ]
    coeff = [
        0.0066, 0.0063, 0.006, 0.0056, 0.0052, 0.005, 0.0048, 0.0047,
        0.0046, 0.0046, 0.0046, 0.0046, 0.00454, 0.00474, 0.00478, 0.00482,
        0.00495, 0.00504, 0.0053, 0.0058, 0.00635, 0.00696, 0.00751, 0.0083,
        0.00922, 0.00969, 0.00962, 0.00957, 0.00979, 0.01005, 0.01011, 0.0102,
        0.0106, 0.0109, 0.0114, 0.0121, 0.0127, 0.0131, 0.0136, 0.0144,
        0.015, 0.0162, 0.0173, 0.0191, 0.0204, 0.0228, 0.0256, 0.028,
        0.0325, 0.0372, 0.0396, 0.0399, 0.0409, 0.0416, 0.0417, 0.0428,
        0.0434, 0.0447, 0.0452, 0.0466, 0.0474, 0.0489, 0.0511, 0.0537,
        0.0565, 0.0593, 0.0596, 0.0606, 0.0619, 0.064, 0.0642, 0.0672,
        0.0695, 0.0733, 0.0772, 0.0836, 0.0896, 0.0989, 0.11, 0.122,
        0.1351, 0.1516, 0.1672, 0.1925, 0.2224, 0.247, 0.2577, 0.2629,
        0.2644, 0.2665, 0.2678, 0.2707, 0.2755, 0.281, 0.2834, 0.2904,
        0.2916, 0.2995, 0.3012, 0.3077, 0.3108, 0.322, 0.325, 0.335,
        0.34, 0.358, 0.371, 0.393, 0.41, 0.424, 0.429, 0.436,
        0.439, 0.448, 0.448, 0.461, 0.465, 0.478, 0.486, 0.502,
        0.516, 0.538, 0.559, 0.592, 0.624, 0.663, 0.704, 0.756,
        0.827, 0.914, 1.007, 1.119, 1.231, 1.356, 1.489, 1.678,
        1.97, 2.51, 2.78, 2.83, 2.85,
    ]


@dataclass
class SerialPort:
    """Serial-port parameters, filled from the ``comsetting`` config section."""

    port: Optional[str] = None
    baudrate: Optional[int] = None
    bytesize: Optional[int] = None
    parity: Optional[str] = None
    stopbit: Optional[int] = None

    def set_serial_port_(self, sp: dict):
        """Copy all five settings from *sp*; raises ``KeyError`` if one is missing."""
        self.port = sp['port']
        self.baudrate = sp['baudrate']
        self.bytesize = sp['bytesize']
        self.parity = sp['parity']
        self.stopbit = sp['stopbit']


@dataclass
class Registers:
    """Device register layout plus the raw byte buffers read from the device."""

    slaveaddress: int = 1
    functioncode: int = 3
    DataBeginAddress: int = 2614
    SNAddress: int = 2840  # 10
    SNLen: int = 5
    WLBeginAddress: int = 2102
    count: int = 2
    snBuf: bytes = b''
    wavelengthBuf: bytes = b''
    intensityBuf: bytes = b''

    def set_register(self, rg: dict):
        """Copy the register layout from *rg*; raises ``KeyError`` if a key is missing."""
        self.DataBeginAddress = rg['DataBeginAddress']
        self.SNAddress = rg['SNAddress']
        self.SNLen = rg['SNLen']
        self.WLBeginAddress = rg['WLBeginAddress']
        self.count = rg['count']
        self.functioncode = rg['functioncode']
        self.slaveaddress = rg['slaveaddress']


@dataclass
class LogSetting:
    """Logging / refresh intervals from the ``logsetting`` config section."""

    LogInterval: Optional[int] = None
    RefreshInterval: Optional[int] = None

    def set_log_setting(self, dct: dict):
        """Copy both intervals from *dct*."""
        self.LogInterval = dct['LogInterval']
        self.RefreshInterval = dct['RefreshInterval']


@dataclass
class PlotSetting:
    """Plot parameters from the ``plotsetting`` config section."""

    LineBegin: Optional[int] = None
    LineInterval: Optional[int] = None

    def set_plot_setting(self, dct: dict):
        """Copy both plot parameters from *dct*."""
        self.LineBegin = dct['LineBegin']
        self.LineInterval = dct['LineInterval']


@dataclass
class Retrieve:
    """Wavelength window used to cut the raw spectrum (``retrieve`` section)."""

    beginWL: Optional[int] = None
    endWL: Optional[int] = None
    interval: Optional[int] = None

    def set_retrieve(self, dct: dict):
        """Copy the window limits and interval from *dct*."""
        self.beginWL = dct['beginWL']
        self.endWL = dct['endWL']
        self.interval = dct['interval']


@dataclass
class Algorithm:
    """Algorithm switches from the ``algorithm`` config section."""

    # A720: int =None
    PureWater: Optional[int] = None

    def set_algorithm(self, dct: dict):
        """Copy the ``PureWater`` switch from *dct*."""
        # self.A720 = dct['A720']
        self.PureWater = dct['PureWater']


@dataclass
class ConfigOscar:
    """All state of one instrument: identity, settings, raw and derived data."""

    SN: Optional[str] = None
    # lightPath: float = None
    oscarR: float = 0.04
    oscarR0: float = 0.035
    mode: Optional[WorkMode] = None
    filePath: Optional[Path] = None
    deviceSN: Optional[str] = None
    rawWavelength: Optional[list] = None
    rawIntensity: Optional[list] = None
    Wavelength: Optional[list] = None
    Intensity: Optional[list] = None
    # raw_wavelength_np = np.array([])
    purewaterWavelength: Optional[list] = None
    purewaterAttenuation: Optional[list] = None
    purewaterAttAfterInterp: Optional[np.ndarray] = None
    basisAQ: Optional[list] = None
    relectivity: Optional[list] = None
    outputWavelength: Optional[list] = None
    beginSite: Optional[int] = None
    endsite: Optional[int] = None
    measureTime: Optional[str] = None
    retrieve: Optional[Retrieve] = None
    algorithm: Optional[Algorithm] = None
    serailPort: Optional[SerialPort] = None
    register: Optional[Registers] = None
    logSetting: Optional[LogSetting] = None
    plotSetting: Optional[PlotSetting] = None
    absorptionCoef: Optional[list] = None

    def __post_init__(self):
        # Only create defaults when the caller did not supply the objects;
        # previously values passed to the constructor were silently discarded.
        if self.retrieve is None:
            self.retrieve = Retrieve()
        if self.algorithm is None:
            self.algorithm = Algorithm()


class MyOscar(object):
    """Drive the processing of one instrument identified by its serial number."""

    def __init__(self, sn: str = None):
        self.oscar = ConfigOscar(SN=sn)
        self.cfg = None
        self.file_lst = []
        self.mydir = MyDir()
        self.ui_sn = ""
        self.devicesn_ok = False
        # Last message built by __set_msg; initialised so it always exists.
        self.msg = {}

    def set_SN(self, sn: str):
        """Set the serial number and (re)prepare the output file for it."""
        self.oscar.SN = sn
        self.__prepare_for_save()

    def set_mode(self, mode: WorkMode = WorkMode.FILEMODE):
        """Select the work mode; DEVICEMODE also loads serial and register settings.

        Raises:
            MyException: DEVICEMODE was requested before ``set_cfg_viper``.
        """
        self.oscar.mode = mode
        if self.oscar.mode == WorkMode.DEVICEMODE:
            if self.cfg is None:
                # Fail with a clear message instead of "NoneType is not subscriptable".
                raise MyException(" set_cfg_viper() must be called before DEVICEMODE ")
            self.oscar.serailPort = SerialPort()
            self.oscar.serailPort.set_serial_port_(self.cfg['comsetting'])
            self.oscar.register = Registers()
            self.oscar.register.set_register(self.cfg['register'])

    def set_cfg_viper(self, cfg: dict):
        """Store *cfg* (the data read from config.yaml) and apply its sections.

        Raises:
            Exception: ``cfg['device']['UISN']`` differs from the current SN.
        """
        self.cfg = cfg
        if self.oscar.SN != cfg['device']['UISN']:
            raise Exception(" 波长 不匹配")
        # self.oscar.lightPath= cfg['device']['UIPath']
        self.oscar.algorithm.set_algorithm(cfg['algorithm'])
        self.oscar.retrieve.set_retrieve(cfg['retrieve'])
        self.oscar.logSetting = LogSetting()
        self.oscar.logSetting.set_log_setting(cfg['logsetting'])
        self.oscar.plotSetting = PlotSetting()
        self.oscar.plotSetting.set_plot_setting(cfg['plotsetting'])
        # if self.oscar.mode == WorkMode.FILEMODE:
        #     pass
        # if self.oscar.mode == WorkMode.DEVICEMODE:
        #     self.oscar.serailPort =SerialPort()
        #     self.oscar.serailPort.set_serial_port_( cfg['comsetting'])
        #     self.oscar.register =Registers()
        #     self.oscar.register.set_register( cfg['register'])
        #     pass

    def set_retrieve(self, rtv: dict):
        """Replace the wavelength window with the values in *rtv*."""
        self.oscar.retrieve.set_retrieve(rtv)

    def set_raw_wavelength(self, raw_wavelength):
        """Store the raw wavelengths as floats and derive the output window."""
        if self.oscar.retrieve is None:
            return
        self.oscar.rawWavelength = [float(i) for i in raw_wavelength]
        # get_begin_end() also refreshes outputWavelength.
        self.get_begin_end()

    def set_raw_intensity(self, sn, time_str, intensity):
        """Convert one raw intensity spectrum into absorption coefficients.

        The result is stored on ``self.oscar`` and handed to ``distribute_data``.

        Raises:
            MyException: *sn* does not match the configured serial number.
        """
        if self.oscar.retrieve is None:
            return
        if sn != self.oscar.SN:
            raise MyException(f" 数据的波长 [{sn}] 与系统波长 [{self.oscar.SN}] 不匹配 ")
        # Store as floats.
        self.oscar.rawIntensity = [float(i) for i in intensity]
        # Keep only the configured wavelength window: turbidity correction
        # has to happen around 720 nm, so wavelengths beyond 720 are dropped.
        window = slice(self.oscar.beginSite, self.oscar.endsite)
        data = self.oscar.rawIntensity[window]
        basis = self.oscar.basisAQ[window]
        reflec = self.oscar.relectivity[window]
        self.oscar.absorptionCoef = self.get_absorption_coeff(
            data, basis, reflec, self.oscar.purewaterAttAfterInterp)
        self.oscar.measureTime = time_str
        # Distribute the data.
        self.distribute_data(self.oscar.measureTime, self.oscar.absorptionCoef)

    def get_absorption_coeff(self,
                             data: List[float],
                             basis_aq: List[float],
                             refl: List[float],
                             abs_coff_pw: List[float]):
        """Solve the absorption coefficient for every wavelength of one spectrum.

        Returns:
            One solved value per wavelength, or an empty list when the four
            inputs do not all have the same length.
        """
        # All four sequences must line up; the former ``or`` let mismatched
        # inputs through and zip() then silently truncated them.
        same_length = (len(data) == len(refl)
                       and len(data) == len(basis_aq)
                       and len(data) == len(abs_coff_pw))
        if not same_length:
            return []
        return [self.my_do_leastsq(fa, fb, rou, ab)
                for fa, fb, rou, ab in zip(data, basis_aq, refl, abs_coff_pw)]

    def my_do_leastsq(self, fa: float, fb: float, rou: float, ab: float):
        """Solve ``f(x) = 0`` for one wavelength with ``leastsq`` (start value 1).

        Args:
            fa: measured intensity of the sample.
            fb: basis intensity at the same wavelength.
            rou: reflectivity at the same wavelength.
            ab: interpolated pure-water coefficient at the same wavelength.

        Validation data:
            aa = [0.374777, 0.97631290798748 , 0.481172 , 0.00642894, 7.903851278710607]
            bb= [1.23091 , 0.977035303934902 , 1.61263, 0.00622602 , 3.0728245039943833]
        """
        r = self.oscar.oscarR
        r0 = self.oscar.oscarR0
        Tab = float(fa) / float(fb)
        EXP_ab_r0 = np.exp(-1 * float(ab) * r0)
        Ps_ab_r = ((1 - (2 * float(ab) * r + 1) * np.exp(-2 * float(ab) * r))
                   / (2 * float(ab) * float(ab) * r * r))

        def f(x):
            # Term order and grouping are kept exactly as in the original
            # formula so the floating-point result does not change.
            return (Tab * EXP_ab_r0 * Ps_ab_r
                    - np.exp(-1 * x * r0) * (1 - (2 * x * r + 1) * np.exp(-2 * x * r)) / (2 * x * x * r * r)
                    - float(rou) * Tab * EXP_ab_r0 * Ps_ab_r * (1 - (2 * x * r + 1) * np.exp(-2 * x * r)) / (2 * x * x * r * r)
                    + float(rou) * np.exp(-1 * x * r0) * Ps_ab_r * (1 - (2 * x * r + 1) * np.exp(-2 * x * r)) / (2 * x * x * r * r))

        result_with_pw = leastsq(f, 1)[0][0]
        return result_with_pw

    def set_pure_water(self, wavelength, atten):
        """Store the pure-water table and interpolate it onto the output wavelengths.

        Raises:
            MyException: no raw wavelengths have been set yet.
        """
        self.oscar.purewaterWavelength = [float(i) for i in wavelength]
        self.oscar.purewaterAttenuation = [float(i) for i in atten]
        if self.oscar.rawWavelength is None:
            raise MyException(" 没有波长数据。")
        self.interpo_pure_water()
        self.__prepare_for_save()

    def interpo_pure_water(self):
        """Linearly interpolate the pure-water attenuation at ``outputWavelength``."""
        self.oscar.purewaterAttAfterInterp = np.interp(
            np.array(self.oscar.outputWavelength),
            np.array(self.oscar.purewaterWavelength),
            np.array(self.oscar.purewaterAttenuation))

    def get_begin_end(self) -> None:
        """Find the slice indices of the configured wavelength window.

        ``beginSite`` is the index of the first sample at or above ``beginWL``
        and ``endsite`` the exclusive end index that still includes the first
        sample at or above ``endWL``. An index that cannot be located is left
        unchanged. Afterwards the output wavelengths are refreshed and a
        "notice" message is published on the ``"update"`` topic.
        """
        raw = self.oscar.rawWavelength
        begin_wl = self.oscar.retrieve.beginWL
        end_wl = self.oscar.retrieve.endWL
        # Stop one short of the end: each step looks at the pair (i, i + 1).
        for i in range(len(raw) - 1):
            if raw[i] < begin_wl <= raw[i + 1]:
                self.oscar.beginSite = i + 1
            if raw[i] < end_wl <= raw[i + 1]:
                self.oscar.endsite = i + 2
                break
        self.get_output_wavelength()
        msg = "起始波长 : " + \
            str(self.oscar.outputWavelength[0]) + \
            " , 结束波长 : " + str(self.oscar.outputWavelength[-1])
        self.__set_msg("notice", msg)
        # The topic name was missing here; use the same topic as the other senders.
        pub.sendMessage("update", msg=self.msg)

    def get_output_wavelength(self):
        """Cut ``rawWavelength`` down to the ``beginSite:endsite`` window."""
        self.oscar.outputWavelength = \
            self.oscar.rawWavelength[self.oscar.beginSite:self.oscar.endsite]

    def set_basis_aq(self, basis):
        """Store the basis spectrum as floats."""
        self.oscar.basisAQ = [float(i) for i in basis]

    def set_reflectivity(self, reflec):
        """Store the reflectivity spectrum as floats."""
        self.oscar.relectivity = [float(i) for i in reflec]

    def distribute_data(self, time_str, data, mode=0):
        """Publish one result on ``"update"`` and append it to the output file.

        With ``mode == 1`` the first point of the result is also printed.
        """
        self.__set_msg("data", {"time": time_str, "data": data})
        pub.sendMessage("update", msg=self.msg)
        # Save the data ???
        self.mydir.setContent(self.oscar.absorptionCoef, TOKEN, self.oscar.measureTime)
        self.mydir.writeContent()
        if mode == 1:
            print(
                f" wavelenght : {self.oscar.outputWavelength[0]} "
                f" coef : {self.oscar.absorptionCoef[0]} "
                f" purewater : {self.oscar.purewaterAttAfterInterp[0]} "
                f" rawInt : {self.oscar.rawIntensity[self.oscar.beginSite]} "
            )

    def deal_measure_time_data(self, sn, res_time, res_data):
        """Process every measurement of a file, then publish a completion notice.

        Args:
            sn: serial number found in the file.
            res_time: e.g. ['2011-01-28 00:00:32', '2011-01-28 00:01:04', ...]
            res_data: e.g. [ [[,,,]], [[,,,]], ... ]; ``res_data[i][0]`` is used.

        Raises:
            MyException: *sn* does not match the configured serial number.
        """
        if sn != self.oscar.SN:
            raise MyException(f" wrong SN file {sn} !! [SN={self.oscar.SN}]")
        for i, measure_time in enumerate(res_time):
            self.set_raw_intensity(sn, measure_time, res_data[i][0])
        self.__set_msg("notice", "文件处理完毕")
        pub.sendMessage("update", msg=self.msg)

    # def correction_turbidity(self, data:np.ndarray ):
    #     '''Turbidity correction on the absorbance
    #         0 : average of 11 points (default)
    #         1 : 720
    #         2 : no turbidity correction
    #     '''
    #     if self.oscar.algorithm.A720 == 0:
    #         count = data.shape[0]
    #         tmp = 0.0
    #         for i in range(count-11,count,1):
    #             tmp = tmp + data[i]
    #         tmp = tmp/11
    #         return data - tmp
    #     if self.oscar.algorithm.A720 == 1:
    #         count = data.shape[0]
    #         tmp = data[count] - (self.oscar.outputWavelength[count]-720) * (data[count] -data[count-1]) \
    #             / (self.oscar.outputWavelength[count]-self.oscar.outputWavelength[count-1])
    #         return data-tmp
    #     if self.oscar.algorithm.A720 == 2:
    #         return data
    #     return data

    def correction_pure_water(self, data: np.ndarray):
        """Pure-water correction selected by ``cfg['algorithm']['PureWater']``.

        0: return *data* unchanged (pure water included).
        1: subtract the interpolated pure-water attenuation.
        Any other value returns *data* unchanged.
        """
        if self.cfg['algorithm']['PureWater'] == 1:
            return data - self.oscar.purewaterAttAfterInterp
        return data

    def __prepare_for_save(self) -> None:
        """Create the output directory/file and make sure its header matches.

        Raises:
            MyException: the existing file header differs from the current one.
        """
        out_dir = Path().joinpath(DATA_DIR, OUTPUT_DIR)
        self.mydir.setBaseDir(out_dir)
        self.mydir.newDirIfNot()
        self.mydir.newFileIfNot(self.oscar.SN + SAVE_EXT_NAME)
        self.mydir.setHeader(self.oscar.outputWavelength, TOKEN, self.oscar.SN)
        if self.mydir.checkHeader() == 0:
            self.mydir.writeHeader()
        # Checked again on purpose: the header may just have been written.
        if self.mydir.checkHeader() == -1:
            raise MyException(" 文件头不一致, 请备份到其他目录,并在该目录下删除")

    def __set_msg(self, typ, d):
        """Build the message dict ``{"type": typ, "data": d}`` in ``self.msg``."""
        self.msg = {"type": typ, "data": d}

    def get_device_sn_from_buf(self):
        """Decode the device SN from ``snBuf[-6:-2]`` and flag a match with the SN."""
        self.oscar.deviceSN = self.oscar.register.snBuf[-6:-2].decode()
        if self.oscar.SN == self.oscar.deviceSN:
            self.devicesn_ok = True

    def get_raw_wavelength_from_buf(self):
        """Decode the wavelength buffer and feed it to ``set_raw_wavelength``."""
        rawWavelength = self.convert_buf_2_float(
            self.oscar.register.wavelengthBuf, FLOAT_RESERVE_BIT)
        self.set_raw_wavelength(rawWavelength)
        print(f" === {len(self.oscar.rawWavelength)} {self.oscar.rawWavelength}")

    def get_raw_intensity_from_buf(self):
        """Decode the intensity buffer and process it with the current local time."""
        intens = self.convert_buf_2_float(
            self.oscar.register.intensityBuf, FLOAT_RESERVE_BIT)
        time_str = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
        self.set_raw_intensity(self.oscar.deviceSN, time_str, intens)

    def convert_buf_2_float(self, buff, bit=3, byteOrder="big") -> List[float]:
        """Decode *buff* as consecutive 4-byte IEEE-754 floats.

        Args:
            buff: bytes-like object; its length must be a multiple of 4.
            bit: number of decimal places each value is rounded to.
            byteOrder: ``"big"`` for big-endian, anything else for little-endian.

        Returns:
            The rounded floats, or an empty list when the length is not a
            multiple of 4.
        """
        if len(buff) % 4 != 0:
            return []
        # The little-endian branch used to decode as big-endian as well.
        fmt = ">f" if byteOrder == "big" else "<f"
        return [round(value, bit) for (value,) in struct.iter_unpack(fmt, buff)]

    def convert_str_2_float_list(self, lst) -> List[float]:
        """Return every item of *lst* converted with ``float``."""
        return [float(item) for item in lst]