import wx import os import time import threading import numpy as np from pathlib import Path,PurePath from wx.lib.pubsub import pub # from configobj import ConfigObj from listctrl import Listctrl # from mypanel import MyPanel from mypanel import Plot from uiconfig.ui_com_setting import UI_Com_Setting from uiconfig.ui_filepath_setting import UI_Filepath_Setting from uiconfig.ui_pathsn_setting import UI_PathSN_Setting from uiconfig.ui_plot_setting import UI_Plot_Setting from uiconfig.ui_log_setting import UI_Log_Setting from uiconfig.uiabout import About from uiconfig.uihelp import Help from myconfig import DeviceType,YAML_FILE_NAME,RETRIEVE_CFG_FILE from myconfig import TOKEN,DATA_DIR,FILE_MARK,OUTPUT_DIR,SAVE_EXT_NAME from myconfig import MyConfig from configuration import Configuration # from awrams import AWRAMS,HandHeldBuf from myexception import MyException from mylogger import log from mypath import MyDir from mythread import Mythead from readcal import ReadCal from uart import Uart from oscar import Oscar from dataplot import DataPlot # -定义菜单ID,关联Event------------------------- """ # 菜单 文件 -- 处理文件 设备 -- 序列 波长 测量 设置 -- 串口 光程 文件路径 绘图参数 采集设置 作图 -- 最后一条曲线 最后七条 指定位置七条 帮助 """ ID_MEASURE = 1 ID_DEAL_FILE = 2 ID_DEVICE_SN = 3 ID_DEVICE_WL = 4 ID_DEVICE_LOG = 5 ID_COM_SETTING = 6 ID_PATH_SN = 7 ID_FILE_PATH = 8 ID_PLOT_SETTING = 9 ID_LOG_SETTING = 10 ID_DEVICE_STOP = 11 ID_PLOT_ONE = 15 ID_PLOT_SEVEN = 16 ID_PLOT_RULE = 17 ID_HELP = 21 ID_ABOUT = 22 class YiwinFrame( wx.Frame ): '''将buf类传进来''' def __init__(self, title, parent, size=(900, 750)): self.device_id = 2 self.device_type = DeviceType.OSCAR.name self.raw_wavelength = [] self.new_wavelength = [] self.device_wavelength = [] self.device_data = [] self.output_wavelength = [] self.output_wl_ndarray = np.array([]) self.syscfg = {} self.sensor_cfg = {} self.retrieve = {} self.calinfo_is_ok = False self.title = title self.dir :Path = Path() self.datadir :Path = self.dir.joinpath(DATA_DIR) self.output_dir:Path = self.datadir.joinpath(OUTPUT_DIR) self.save_fname :Path = Path() self.result = '' self.displayData:list = [] self.interval = 0 self.measure_time = time.strftime("%Y-%m-%d %H:%M:%S",time.localtime()) super(YiwinFrame, self).__init__( parent, title=self.title, size = size ) self.BoxSizer = wx.BoxSizer(wx.HORIZONTAL) # 建立一个Boxsizer self.SetSizer(self.BoxSizer) self.Centre(wx.BOTH) # 建立 listctrl panel 放到BoxSizer self.listctrl_panel = Listctrl( self ) # 调用自己建立的 Listctrl panel 类 self.BoxSizer.Add( self.listctrl_panel, proportion =-10, border = 2, flag = wx.ALL | wx.EXPAND) self.list_ctrl = self.listctrl_panel.list_ctrl # 隐藏 等待show self.listctrl_panel.Hide() self.plotpanel = Plot( self ) # 调用自己建立的 Listctrl panel 类 self.BoxSizer.Add( self.plotpanel, proportion =-10, border = 0, flag = wx.ALL | wx.EXPAND) # self.static_text = self.plotpanel.staticText1 # 隐藏 等待show self.plotpanel.Show() self.statusBar = self.CreateStatusBar() # 创建状态栏 self.statusBar.SetFieldsCount(2) # 状态栏分成3个区域 self.statusBar.SetStatusWidths([-1, -1]) # 区域宽度比列,用负数 self.statusBar.SetStatusText(u" 等待接收消息......", 0) self.__set_menu() # 添加菜单 self.__attach_events() # 菜单事件 self.__set_properties() # self.__read_config() self.__setTimer() log.info(f"system init....",__name__, "__init__") self.mycfg = MyConfig() self.mydir = MyDir() self.mycfg.set_retrieve() self.__read_config() self.ui_sn = self.syscfg['device']['UISN'] self.device_sn = '' self.file_sn = '' self.interval = self.syscfg['logsetting']['LogInterval'] self.port = self.syscfg['comsetting']['port'] self.uart = Uart() self.oscar = Oscar() self.dataplot = DataPlot() self.plot_pure_water() pub.subscribe( self.updateDisplay, "update") pass def __set_menu(self): ''' # 设置菜单 ''' self.menubar = wx.MenuBar() fileMenu = wx.Menu() fileMenu.Append(ID_DEAL_FILE, u'&处理文件', '...') self.menubar.Append(fileMenu, u'&文件 ') deviceMenu = wx.Menu() deviceMenu.Append(ID_DEVICE_SN, u'&序列号', '...') deviceMenu.AppendSeparator() deviceMenu.Append(ID_DEVICE_WL, u'&波长', '...') deviceMenu.AppendSeparator() deviceMenu.Append(ID_DEVICE_LOG, u'&采集数据', '...') deviceMenu.AppendSeparator() deviceMenu.Append(ID_DEVICE_STOP, u'&停止采集', '...') self.menubar.Append(deviceMenu, u'&设备 ') settingMenu = wx.Menu() settingMenu.Append(ID_COM_SETTING, u'&串口设置', '...') settingMenu.AppendSeparator() settingMenu.Append(ID_PATH_SN, u'&序列号', ' ') settingMenu.AppendSeparator() settingMenu.Append(ID_FILE_PATH, u'&文件路径', ' ') settingMenu.AppendSeparator() settingMenu.Append(ID_PLOT_SETTING, u'&绘图设置', ' ') settingMenu.AppendSeparator() settingMenu.Append(ID_LOG_SETTING, u'&采集设置', ' ') self.menubar.Append(settingMenu, u'&系统设置') plotMenu = wx.Menu() plotMenu.Append( ID_PLOT_ONE, u'&最后一条', '...' ) plotMenu.AppendSeparator() plotMenu.Append( ID_PLOT_SEVEN, u'&最后七条', '...' ) plotMenu.AppendSeparator() plotMenu.Append( ID_PLOT_RULE, u'&指定七条', '...' ) self.menubar.Append(plotMenu, u'&绘图 ') aboutMenu = wx.Menu() aboutMenu.Append( ID_HELP, u'&帮助', 'help...' ) aboutMenu.AppendSeparator() aboutMenu.Append( ID_ABOUT, u'&关于我们', '关于我们...' ) self.menubar.Append( aboutMenu, u'&帮助') self.SetMenuBar(self.menubar) pass def __set_properties(self): self.SetSize((800, 600)) # self.SetTitle(u'传感器数据采集--上海奕枫仪器设备有限公司') self.Centre() def __attach_events(self): ''' # 绑定菜单事件 ''' self.Bind(wx.EVT_MENU, self.OnDealFile, id = ID_DEAL_FILE) self.Bind(wx.EVT_MENU, self.OnDeviceSN, id = ID_DEVICE_SN) self.Bind(wx.EVT_MENU, self.OnDeviceWL, id = ID_DEVICE_WL) self.Bind(wx.EVT_MENU, self.OnDeviceLog, id = ID_DEVICE_LOG) self.Bind(wx.EVT_MENU, self.OnDeviceStop, id = ID_DEVICE_STOP) self.Bind(wx.EVT_MENU, self.OnComSetting, id = ID_COM_SETTING) self.Bind(wx.EVT_MENU, self.OnPathSNSetting, id = ID_PATH_SN) self.Bind(wx.EVT_MENU, self.OnFilePathSetting, id = ID_FILE_PATH) self.Bind(wx.EVT_MENU, self.OnPlotSetting, id = ID_PLOT_SETTING) self.Bind(wx.EVT_MENU, self.OnLogSetting, id = ID_LOG_SETTING) self.Bind(wx.EVT_MENU, self.OnPlotOne, id = ID_PLOT_ONE) self.Bind(wx.EVT_MENU, self.OnPlotSeven, id = ID_PLOT_SEVEN) self.Bind(wx.EVT_MENU, self.OnPlotRule, id = ID_PLOT_RULE) self.Bind(wx.EVT_MENU, self.OnHelpConfig, id = ID_HELP) self.Bind(wx.EVT_MENU, self.OnAboutConfig, id = ID_ABOUT) pass def __setTimer(self): self.timer = wx.Timer(self) #创建定时器,菜单以后 self.Bind(wx.EVT_TIMER, self.OnTimer, self.timer) #绑定一个定时器事件 pass def updateDisplay(self,msg): log.debug(f" updateDisplay 。。 {msg['data']}") if msg['type'] == "notice": self.__update_notice(msg['data']) pass if msg['type'] == "sn": self.__update_sn(msg['data']) pass if msg['type'] == "wl": self.__update_wl(msg['data']) pass if msg['type'] == "log": self.__update_log(msg['data'] ) print(f" ...log update ...{msg['data']} " ) pass if msg['type'] == "data": # log.info(f" ... update .{msg['data']}........... " ) self.__update_data(msg['data']) pass pass def __update_sn(self,sn): if sn != self.ui_sn: self.alterStatus_0(f" sn: {sn} 与系统sn {self.ui_sn} 一致") self.popDialog(f" sn: {sn} 与系统sn {self.ui_sn} 一致") raise MyException(f" sn: {sn} 与系统sn {self.ui_sn} 一致") else: self.alterStatus_0(f" sn: {sn} 与系统sn {self.ui_sn} 不一致") pass def __update_notice(self,msg): log.debug(f" msg : {msg}") self.alterStatus_0(msg) pass def __update_wl(self,wl): log.debug(f" 获得的波长成功 : {wl}") self.output_wavelength = wl self.alterStatus_0( f" 获得的波长成功 " ) pass def __update_log(self,d): log.info(f" 获得的数据 {d}") self.alterStatus_0( d) def __update_data( self, d ): '''对于文件过来的数据''' log.debug(f" ", __name__," __update_data") self.plotpanel.clear_past() self.plotpanel.set_title_x_y( *self.plotpanel.measure_legend ) self.plotpanel.plot_one( d['time'] ,np.array(self.oscar.output_wavelength) ,np.array(d['data']) ) pass def OnStart( self, event ): log.info( f" OnStart....interval: { self.interval } minutes, port: { self.port} " ) interval_ms = int(self.interval) * 60 * 1000 self.timer.Start( interval_ms ) # 开始监测 def OnStop( self ,event ): self.timer.Stop() pass def OnTimer(self, event): ''' 定时器实现 ''' # log.info( f"OnTimer .... " ) self.__OnDeviceLog( ) pass def __read_config(self,): # 读取配置文件 self.syscfg = self.mycfg.read_yaml() retrieve = self.mycfg.read_rtv_yaml() self.syscfg.update( { "retrieve" : retrieve } ) def OnDealFile(self, e)-> None: self.__OnDeviceStop() log.info( f"OnDealFile: 处理测量文件", __name__, "", "" ) # 线程守护 self.mt = Mythead() self.oscar.set_cfg(self.syscfg) try: self.oscar.get_data_files() self.mt.set_task( self.oscar.deal_file_lst ) self.mt.start() self.alterStatus_0(" 处理完所有文件" ) except Exception as e: self.alterStatus_0(e ) self.popDialog( e ) pass def OnDeviceSN(self, e)-> None: self.__OnDeviceStop() log.info(f"OnDeviceSN: 获取device sn ", __name__, "", "") # self.mt = Mythead() self.oscar.set_cfg(self.syscfg) self.oscar.set_serial() # self.oscar.set_modbus(self.syscfg['register']['SNAddress'], self.syscfg['register']['SNLen']) try: self.oscar.sn_uart_thread( self.get_device_sn ) except Exception as e: self.alterStatus_0(e ) self.popDialog( e ) self.alterStatus_0(" SN 匹配一致" ) pass def get_device_sn(self,sn): self.device_sn = sn if self.device_sn != self.ui_sn: log.error( f" 设备SN {self.device_sn} 与系统设置 {self.ui_sn}不一致 错误 " ) pass else: log.info(" sn 匹配成功 ! ") pass def OnDeviceWL(self, e)-> None: self.__OnDeviceStop() log.info(f"OnDeviceWL: 获得device wavelength ... ", __name__, "", "") if self.device_sn != '': self.oscar.set_cfg( self.syscfg ) self.oscar.set_serial() try: self.oscar.wl_uart_thread( self.get_device_wl ) except Exception as e: self.alterStatus_0(e ) self.popDialog( e ) self.alterStatus_0(" 获取波长成功!" ) else: self.popDialog('请获取设备序列号SN信息再试') pass def get_device_wl(self, wl_buf): if len(wl_buf) != 0: self.output_wavelength = wl_buf self.output_wl_ndarray = np.array(self.output_wavelength) else: log.error( " 获取设备波长出错 " ) raise MyException( " 获取设备波长出错 " ) pass def OnDeviceLog(self, e)-> None: self.__OnDeviceStop() log.info( f"OnDeviceLog: 采集数据 定时器采集", __name__, "", "" ) # !!!! 判断是否有波长和sn if self.output_wavelength == []: self.alterStatus_0( " 没有设备波长信息,请获取设备波长") self.popDialog( " 没有设备波长信息,请获取设备波长") raise MyException(" 没有设备波长信息,请获取设备波长") self.__OnDeviceLog() # 如何整点获取,如何快速获取 self.timer.Start( int(self.interval) * 60 * 1000 ) pass def __OnDeviceLog(self,)-> None: self.alterStatus_0(" 开始采集数据....") self.device_data = b'' self.mt = Mythead() self.oscar.set_cfg( self.syscfg ) self.oscar.set_serial() try: self.oscar.log_uart_thread( self.get_device_data ) except Exception as e: print(e) self.alterStatus_0(e ) self.popDialog( e ) pass def get_device_data( self,tm, data_buf ): if len(data_buf) != 0: self.device_data = self.oscar.convert_buf_2_float( data_buf ,8 ) self.plotpanel.clear_past() self.plotpanel.set_title_x_y( *self.plotpanel.measure_legend ) self.plotpanel.plot_one( tm , self.output_wl_ndarray ,np.array(data_buf) ) else: log.error( " 采集设备数据出错 " ) raise MyException(" 采集设备数据出错 ") pass def OnDeviceStop(self, e)-> None: self.__OnDeviceStop() pass def __OnDeviceStop(self, )-> None: if self.timer.IsRunning(): self.timer.Stop() pass def plot_pure_water(self,): # self.dataplot.set_file_path() self.__OnDeviceStop() self.plotpanel.set_title_x_y( *self.plotpanel.purewater_legend ) time_ = time.strftime("%Y-%m-%d %H:%M:%S",time.localtime()) wl = np.array(self.oscar.purewater_wavelength) att = np.array(self.oscar.purewater_attenuation) self.plotpanel.plot_one(time_, wl, att) pass def OnPlotOne(self,e): self.__OnDeviceStop() self.plotpanel.set_title_x_y( *self.plotpanel.measure_legend ) self.dataplot.set_file_path( self.output_dir.joinpath(self.ui_sn+SAVE_EXT_NAME) ) self.dataplot.set_token(TOKEN) # !! 判断波长,获得波长 wavelength:str = self.dataplot.get_first_line().strip('\n').strip('\r') self.output_wavelength = wavelength.split(TOKEN)[1:] self.output_wl_ndarray = np.array(self.output_wavelength).astype(np.float64) line = self.dataplot.get_multi_by_x_m_n(1,1,1) line = line[0].strip('\n').strip('\r').split(TOKEN) tm = line[0] line = np.array(line[1:]).astype(np.float64) self.plotpanel.axes.clear() self.plotpanel.plot_one( tm, self.output_wl_ndarray, line ) self.alterStatus_0( f" plot {tm}") pass def OnPlotSeven(self,e): self.__OnDeviceStop() self.plotpanel.set_title_x_y( *self.plotpanel.measure_legend ) times = [] lines = [] log.debug( f'+++ ',__name__, 'OnPlotSeven' ) self.dataplot.set_file_path( self.output_dir.joinpath( self.ui_sn+SAVE_EXT_NAME ) ) self.dataplot.set_token(TOKEN) # !! 判断波长,获得波长 wavelength:str = self.dataplot.get_first_line().strip('\n').strip('\r') self.output_wavelength = wavelength.split(TOKEN)[1:] self.output_wl_ndarray = np.array(self.output_wavelength).astype(np.float64) line_7 = self.dataplot.get_multi_by_x_m_n(1,1,7) # log.warning( f'+++ {line_7} ',__name__, 'OnPlotSeven' ) for line in line_7: line = line.strip('\n').strip('\r').split(TOKEN) tm = line[0] line = np.array(line[1:]).astype(np.float64) times.append(tm) lines.append( line ) self.plotpanel.axes.clear() self.plotpanel.plot_multi( times, self.output_wl_ndarray, lines ) self.alterStatus_0( f" plot 7 ") pass def OnPlotRule(self,e): self.__OnDeviceStop() self.plotpanel.set_title_x_y( *self.plotpanel.measure_legend ) times = [] lines = [] log.debug( f'+++ ',__name__, 'OnPlotSeven' ) self.dataplot.set_file_path( self.output_dir.joinpath( self.ui_sn+SAVE_EXT_NAME ) ) self.dataplot.set_token(TOKEN) # !! 判断波长,获得波长 wavelength:str = self.dataplot.get_first_line().strip('\n').strip('\r') self.output_wavelength = wavelength.split(TOKEN)[1:] self.output_wl_ndarray = np.array(self.output_wavelength).astype(np.float64) line_7 = self.dataplot.get_multi_by_x_m_n(1,1,7) # log.warning( f'+++ {line_7} ',__name__, 'OnPlotSeven' ) for line in line_7: line = line.strip('\n').strip('\r').split(TOKEN) tm = line[0] line = np.array(line[1:]).astype(np.float64) times.append( tm ) lines.append( line ) self.plotpanel.axes.clear() self.plotpanel.plot_multi( times, self.output_wl_ndarray, lines ) self.alterStatus_0( f" plot designated ") pass def OnComSetting(self,e): with UI_Com_Setting( self, -1 ) as Dialog_Sensor_Setting: Dialog_Sensor_Setting.CenterOnParent() resultLog = Dialog_Sensor_Setting.ShowModal() if resultLog == wx.ID_OK: log.info( " COM config dialog confirm, call back " ) self.__read_config() pass def OnPathSNSetting(self,e): with UI_PathSN_Setting( self, -1 ) as Dialog_Sensor_Setting: Dialog_Sensor_Setting.CenterOnParent() resultLog = Dialog_Sensor_Setting.ShowModal() if resultLog == wx.ID_OK: log.info( " PathSN config dialog confirm, call back " ) self.__read_config() pass def OnFilePathSetting(self,e): with UI_Filepath_Setting( self, -1 ) as Dialog_Sensor_Setting: Dialog_Sensor_Setting.CenterOnParent() resultLog = Dialog_Sensor_Setting.ShowModal() if resultLog == wx.ID_OK: log.info( " FilePath config dialog confirm, call back " ) self.__read_config() pass def OnPlotSetting(self,e): with UI_Plot_Setting( self, -1 ) as Dialog_Sensor_Setting: Dialog_Sensor_Setting.CenterOnParent() resultLog = Dialog_Sensor_Setting.ShowModal() if resultLog == wx.ID_OK: log.info( " Plot config dialog confirm, call back " ) self.__read_config() pass def OnLogSetting(self,e): with UI_Log_Setting( self, -1 ) as Dialog_Sensor_Setting: Dialog_Sensor_Setting.CenterOnParent() resultLog = Dialog_Sensor_Setting.ShowModal() if resultLog == wx.ID_OK: log.info( " Log config dialog confirm, call back " ) self.__read_config() pass def OnHelpConfig(self, e): with Help( self, -1, "") as Dialog_Help: resultLog = Dialog_Help.ShowModal() if resultLog == wx.ID_OK: log.info("Help info") pass def OnAboutConfig(self, e): with About( self, -1, "") as Dialog_About: resultLog = Dialog_About.ShowModal() if resultLog == wx.ID_OK: log.info("Aboutus") pass def OnOther(self, e): pass def OnQuit(self, e): self.Close() def alterStatus_0(self,msg): self.statusBar.SetStatusText( msg, 0 ) def popDialog(self, msg, msg_type=u"错误提示"): with wx.MessageDialog( self, msg, msg_type, wx.OK )as dlg: dlg.ShowModal() # class SerialThread(threading.Thread): # """进度条类 """ # def __init__(self, parent): # """ # :param parent: 主线程UI # """ # super(SerialThread, self).__init__() # 继承 # self.parent = parent # # log.info(f"SerialThread ... {self.parent.kh}") # self.start() # self.join() # # self.setDaemon(True) # 设置为守护线程, 即子线程是守护进程,主线程结束子线程也随之结束。 # def stop(self): # self.parent.kh.disconnect() # log.info(" Serial stop.... ") # pass # def run(self): # log.info(" Serial run.... ") # wx.CallAfter(self.parent.OnRcv) # wx.CallAfter(self.parent.update_process_bar, count) # 调用parent的函数 # wx.CallAfter(self.parent.close_process_bar) # destroy进度条 # def OnSerialThreadStart(self): # self.m = SerialThread(self) # pass # def OnSerialThreadStop(self): # self.m.stop() # pass # def OnDisplaySave(self): # ''' # 保存数据 self.result # ''' # log.info(f"OnDisplaySave ....") # # self.m = SerialThread(self) # # self.kh.flush() # self.OnSerialThreadStart() # def OnRcv( self ): # log.info(f"OnRcv....") # self.kh.setPort(self.port) # if not self.OnDetectPort: # MyException(f"Can not find port : {self.port}") # log.info(f"{self.port} ok!") # # if not self.kh: # # self.kh = KH3000(self.port) # # self.result = self.kh.OneMeasure() # log.info( f"OnRcv success {self.result}", __class__.__name__ ) # self.OnSave() # self.OnDisplay() # pass # def __get_data_files(self,) -> list: # # 读取配置文件 # file_lst = [] # self.mydir.setBaseDir( Path(self.datadir) ) # file_lst = self.mydir.get_files_from_currentdir(fmt= "*/*.DAT") # return file_lst # def __check_sn_from_datafile(self,fpath) -> bool: # # 读取配置文件 # sn = ReadCal.readFileSNbyIDDevice(fpath) # if sn == self.ui_sn: # return True # return False # def __get_raw_wl_from_datafile(self,fpath) -> list: # # 读取配置文件 # _,raw_wl = ReadCal.read_columns_set_by_mark(fpath, FILE_MARK,0) # return raw_wl # def __deal_file_lst(self,file_lst): # for fl in file_lst: # # 判断序列号是否一致file_lst # if not self.__check_sn_from_datafile(fl): # self.onNotify("文件的序列号和系统设置不一致" ) # self.popDialog(" 文件的序列号和系统设置不一致") # raise MyException("文件的序列号和系统设置不一致") # # 判断原始波长是否为空 # if self.raw_wavelength == []: # res = self.__get_raw_wl_from_datafile(file_lst[0]) # self.raw_wavelength = res[0] # # 处理输出文件 # self.mydir.setBaseDir(self.output_dir) # self.mydir.newDirIfNot() # self.mydir.newFileIfNot(self.ui_sn+SAVE_EXT_NAME) # if self.syscfg['retrieve']['enable'] == 0: # self.mydir.setHeader(self.raw_wavelength,TOKEN,self.ui_sn) # if self.mydir.checkHeader()==0: # self.mydir.writeHeader() # if self.mydir.checkHeader()==-1: # self.popDialog(" 文件头不一致, 请备份到其他目录,并在该目录下删除") # raise MyException(" 文件头不一致, 请备份到其他目录,并在该目录下删除") # res_time,res_data = ReadCal.read_columns_sets_by_mark( fl, FILE_MARK, 1 ) # for i in range( len(res_time) ): # self.__deal_one_measure_time_data(res_time[i], res_data[i]) # pass # pass # # 需要插值处理波长 # if self.syscfg['retrieve']['enable'] == 1: # self.mydir.setHeader(self.new_wavelength.tolist(),TOKEN,self.ui_sn) # if self.mydir.checkHeader()==0: # self.mydir.writeHeader() # if self.mydir.checkHeader()==-1: # self.popDialog(" 文件头不一致, 请备份到其他目录,并在该目录下删除") # raise MyException(" 文件头不一致, 请备份到其他目录,并在该目录下删除") # # res_data 不用带回调的函数callback 会导致多组数据拼接?? # res_time,res_data = ReadCal.read_columns_sets_by_mark( fl, FILE_MARK, 1 ) # log.warning(f" ==== {len(res_time)}" ) # log.warning(res_time) # log.warning(f" ==== {len(res_data[0])}" ) # log.warning(f" ==== { res_data[0] }" ) # for i in range( len(res_time) ): # self.__deal_one_measure_time_data(res_time[i], res_data[0][i]) # pass # pass # def __deal_one_measure_time_data(self,res_time,res_data): # if self.syscfg['retrieve']['enable'] == 0: # self.mydir.setContent(res_data ,TOKEN,res_time ) # self.mydir.writeContent() # # 插值处理 # if self.syscfg['retrieve']['enable'] == 1: # # tmp_data = np.interp( self.new_wavelength, np.array(self.raw_wavelength) , np.array(res_data) ) # x = self.new_wavelength # xp = np.array(self.raw_wavelength).astype(np.float32) # fp = np.array(res_data).astype(np.float32) # log.warning(f" {x}") # log.warning(f" {xp.shape }") # log.warning(f" { fp.shape }") # tmp_data = np.interp( x, xp , fp ) # # tmp_data = np.interp( self.new_wavelength.tolist(), self.raw_wavelength , res_data[0] ) # self.mydir.setContent( tmp_data ,TOKEN, res_time ) # self.mydir.writeContent() # pass # def __deal_one_file(self,fpath): # res_time,res_data = ReadCal.read_columns_sets_by_mark( fpath, FILE_MARK, 1 ) # for i in range( len(res_time) ): # if self.syscfg['retrieve']['enable'] == 0: # self.mydir.setContent(res_data[i],TOKEN,res_time[i]) # self.mydir.writeContent() # # 插值处理 # if self.syscfg['retrieve']['enable'] == 1: # self.mydir.setContent(res_data[i],TOKEN,res_time[i]) # self.mydir.writeContent() # pass # pass # def __set_serial(self, )-> None: # self.uart.set_serial_para( # self.syscfg['comsetting']['port'] # ,self.syscfg['comsetting']['baudrate'] # ,self.syscfg['comsetting']['bytesize'] # ,self.syscfg['comsetting']['parity'] # ,self.syscfg['comsetting']['stopbit'] # ) # def __set_modbus(self, beginaddress, step)-> None: # self.uart.set_modbus( # self.syscfg['register']['slaveadress'] # ,self.syscfg['register']['functioncode'] # ,beginaddress # ,step # )