# import numpy as np
from pathlib import Path


class DataPlot():
    '''
    Read data from a file and process it into data ready for display.
    '''
    # def __init__(self, mypanel, ):
    def __init__(self, ):
        # self.mypanel = mypanel
        # self.__eventManager = eventManger
        # self.dataSaveFile = DATA_FNAME_WITHOUT_WATER
        # self.axes_title = "Water Attenuation Coefficient"
        # self.axes_xlabel = "Wavelength (nm)"
        # self.axes_ylabel = "Attenuation Coefficient m(-1)"
        # self.axes_title = "Attenuation Coefficient Demo"
        self.demo_time = ''
        self.demo_wl = []    # pure water
        self.demo_data = []  # pure water
        self.wavelength = None
        self.data_one = None
        self.data_multi = None
        # self.mode = "one"  # multi
        # self.begin_line = None
        # self.line_interval = None
        self.fpath: Path = Path()
        self.total_lines = 0
        self.token = ";"
        # self.MPL = myPanel.MPL_Panel(self)  # use our own panel class
        # self.Figure = self.MPL.Figure
        # self.axes = self.MPL.axes
        # self.FigureCanvas = self.MPL.FigureCanvas

    def set_file_path(self, fpath: Path):
        self.fpath = fpath

    def set_wavelength(self):
        pass

    # def set_mode(self, mode="one"):
    #     self.mode = mode

    def set_token(self, token=";"):
        self.token = token

    def set_begin_line(self, lineno=0):
        self.begin_line = lineno

    def set_line_interval(self, line_interval=0):
        self.line_interval = line_interval

    def get_total_lines(self, ):
        '''
        For large files: count newlines chunk by chunk instead of loading the whole file.
        '''
        count = 0
        with open(self.fpath, 'rb') as thefile:
            while True:
                buffer = thefile.read(8192*1024)
                if not buffer:
                    break
                count += buffer.count(b'\n')
        # one more than the number of newline characters
        self.total_lines = count + 1

    def get_wavelength_from_file(self, ):
        tmp = self.get_first_line()
        self.wavelength = tmp.split(self.token)[1:]

    def get_reverse_x_lineno(self, x):
        line = self.__get_reverse_x_lineno(x)
        if line is None:  # empty or missing file
            return None, []
        if isinstance(line, bytes):  # the file is read in binary mode
            line = line.decode()
        line = line.split(self.token)
        return line[0], line[1:]

    def get_multi_by_x_m_n(self, x, m, n):
        # res_time = []
        # res_lines = []
        lines = self.__get_reverse_x_m_n_line(x, m, n)
        # print(f'+++++ {lines}')
        # for i in range(len(lines)):
        #     ln =
        #     res_time.append(lines[i][0])
        #     res_lines.append(lines[i][1:])
        # return res_time, res_lines
        return lines

    def __get_reverse_x_lineno(self, x):
        """
        Return the x-th line from the end of the file (as bytes).
        """
        try:
            # filesize = os.path.getsize(self.fpath)
            filesize = self.fpath.stat().st_size
            if filesize == 0:
                return None
            else:
                with open(self.fpath, 'rb') as fp:  # seeking relative to the end requires mode 'rb'
                    offset = -8  # initial offset from the end of the file
                    while -offset < filesize:  # offset cannot exceed file size
                        fp.seek(offset, 2)  # move to `offset` bytes before EOF (whence=2)
                        lines = fp.readlines()  # read from there to EOF
                        if len(lines) >= x+1:  # with at least x+1 lines,
                            return lines[-1*x]  # the x-th line from the end is complete
                        else:
                            offset *= 2  # enlarge offset
                    fp.seek(0)
                    lines = fp.readlines()
                    if len(lines) < x:
                        raise Exception("Invalid line number, please retry")
                    return lines[-1*x]
        except FileNotFoundError:
            print(self.fpath.name + ' not found!')
            return None

    def __get_reverse_x_m_n_line(self, x, m=1, n=1):
        """
        Starting at the x-th line from the end, step back m lines at a time
        and collect n lines in total.
        """
        print(f'__get_reverse_x_m_n_line  {x}   {self.fpath}')
        min_lines = x + m*n
        res_lines = []
        try:
            # filesize = os.path.getsize(self.fpath)
            filesize = self.fpath.stat().st_size
            if filesize == 0:
                return None
            else:
                with open(self.fpath, 'rb') as fp:  # seeking relative to the end requires mode 'rb'
                    offset = -8  # initial offset from the end of the file
                    while -offset < filesize:  # offset cannot exceed file size
                        fp.seek(offset, 2)  # move to `offset` bytes before EOF (whence=2)
                        lines = fp.readlines()  # read from there to EOF
                        if len(lines) >= min_lines+1:
                            for i in range(n):
                                res_lines.append(lines[-1*(x+m*i)].decode())
                            return res_lines
                        else:
                            offset *= 2  # enlarge offset
                    fp.seek(0)
                    lines = fp.readlines()
                    if len(lines) < min_lines:
                        raise Exception("Invalid line number, please retry")
                    for i in range(n):
                        res_lines.append(lines[-1*(x+m*i)].decode())
                    return res_lines
        except FileNotFoundError:
            print(self.fpath.name + ' not found!')
            return None

    def get_first_line(self, ):
        firstline = ''
        with open(self.fpath, 'rb') as fp:  # read as bytes, decoded below
            firstline = fp.readline()
        return firstline.decode()

    def get_last_line(self, filename: Path):
        """
        Return the last line of a file.
        :param filename: file name
        :return: last line or None for empty file
        """
        try:
            filesize = filename.stat().st_size
            if filesize == 0:
                return None
            else:
                with open(filename, 'rb') as fp:  # seeking relative to the end requires mode 'rb'
                    offset = -8  # initial offset from the end of the file
                    while -offset < filesize:  # offset cannot exceed file size
                        fp.seek(offset, 2)  # move to `offset` bytes before EOF (whence=2)
                        lines = fp.readlines()  # read from there to EOF
                        if len(lines) >= 2:  # with at least 2 lines,
                            return lines[-1]  # the last line is complete
                        else:
                            offset *= 2  # enlarge offset
                    fp.seek(0)
                    lines = fp.readlines()
                    return lines[-1]
        except FileNotFoundError:
            # filename is a Path, so format it instead of concatenating with a str
            print(f'{filename} not found!')
            return None

    @staticmethod  # the function takes no self parameter
    def tail(file, taillines=500, return_str=True, avg_line_length=None):
        """
        Read the last lines of a large file.
        taillines: number of lines to read from the end
        avg_line_length: average number of characters per line
        return_str: return type; a string by default, a list if False
        offset: distance from the end of the file, enlarged on every loop pass
        f.tell() returns the current pointer position
        f.seek(a, b): a is the offset; b is 0, 1 or 2 for file start, current position, file end
        """
        with open(file, errors='ignore') as f:
            if not avg_line_length:
                f.seek(0, 2)
                # clamp the sampling window so files shorter than 3000 characters do not raise
                sample = min(3000, f.tell())
                f.seek(f.tell() - sample)
                avg_line_length = int(sample / max(len(f.readlines()), 1)) + 10
            f.seek(0, 2)
            end_pointer = f.tell()
            offset = taillines * avg_line_length
            if offset > end_pointer:
                f.seek(0, 0)
                lines = f.readlines()[-taillines:]
                return "".join(lines) if return_str else lines
            offset_init = offset
            i = 1
            while len(f.readlines()) < taillines:
                location = f.tell() - offset
                f.seek(location)
                i += 1
                offset = i * offset_init
                if f.tell() - offset < 0:
                    f.seek(0, 0)
                    break
            else:
                # enough lines were found: rewind, because the loop test consumed them
                f.seek(end_pointer - offset)
            lines = f.readlines()
            if len(lines) >= taillines:
                lines = lines[-taillines:]
            return "".join(lines) if return_str else lines

    # def makeMsg(self, key, value):
    #     """
    #     Event object: build an event,
    #     write the payload into the event and send it
    #     """
    #     # event = eventManager.Event(type_=eventManager.EVENT_MSG)
    #     event.dict[key] = value
    #     # send the event
    #     self.__eventManager.SendEvent(event)


if __name__ == "__main__":
    d = DataPlot()

# def plot_demo(self, time_, data, wavelength):
#     '''
#     @ Plot the demo curve from the data
#     '''
#     x = np.array(wavelength).astype(float)
#     datetime_ = str(time_)
#     y = np.array(data).astype(float)
#     self.mypanel.axes.clear()  # clear the previous plot
#     self.mypanel.axes.set_title(self.axes_title)
#     self.mypanel.axes.set_xlabel(self.axes_xlabel)
#     self.mypanel.axes.set_ylabel(self.axes_ylabel)
#     self.mypanel.axes.plot(x, y, color="green", linewidth=0.5, label=datetime_)
#     self.mypanel.axes.legend()
#     self.mypanel.axes.grid(True)
#     self.mypanel.FigureCanvas.draw()

# def plot_one(self, time_, data, wavelength):
#     '''
#     @ Plot a single curve from the data
#     @ Parameter: data
#     '''
#     x = np.array(wavelength).astype(float)
#     datetime_ = str(time_)
#     y = np.array(data).astype(float)
#     self.mypanel.axes.clear()  # clear the previous plot
#     self.mypanel.axes.set_title(self.axes_title)
#     self.mypanel.axes.set_xlabel(self.axes_xlabel)
#     self.mypanel.axes.set_ylabel(self.axes_ylabel)
#     self.mypanel.axes.plot(x, y, color="green", linewidth=0.5, label=datetime_)
#     self.mypanel.axes.legend()
#     self.mypanel.axes.grid(True)
#     self.mypanel.FigureCanvas.draw()

# def plotOne(self, data):
#     '''
#     @ Plot a single curve from the data
#     @ Parameter: data
#     '''
#     x = np.array(self.wavelength).astype(float)
#     datetime_ = data[0]
#     y = np.array(data[1:]).astype(float)
#     self.mypanel.axes.clear()  # clear the previous plot
#     self.mypanel.axes.set_title(self.axes_title)
#     self.mypanel.axes.set_xlabel(self.axes_xlabel)
#     self.mypanel.axes.set_ylabel(self.axes_ylabel)
#     self.mypanel.axes.plot(x, y, color="green", linewidth=0.5, label=datetime_)
#     self.mypanel.axes.legend()
#     self.mypanel.axes.grid(True)
#     self.mypanel.FigureCanvas.draw()

# def OnPlot(self, wl, fpath):
#     """
#     * View the historical curve
#     * Reads from the last record by default
#     """
#     temp = self.__get_last_line(fpath).decode().split(TOKEN)
#     time_ = temp[0]
#     data = temp[1:]
#     # add a check against the number of wavelengths
#     print(fpath)
#     if len(data) > len(wl):
#         for i in range(len(data)-len(wl)):
#             data.pop()
#     self.plot_one(time_, data, wl)

# def plotSeven(self, wavelength, d):
#     '''
#     * Plot seven curves from the data
#     * Also works when fewer than seven records are available
#     '''
#     # myDebug.MyDebug.log_info(":::::::::::: Class : %s -> Function : get_file_list " % (__name__, ))
#     # myDebug.MyDebug.log_ctl(" d len : %s " % (len(d), ))
#     # myDebug.MyDebug.log_ctl(" d : %s " % (d, ))
#     x = np.array(wavelength).astype(float)
#     self.mypanel.axes.clear()  # clear the previous plot
#     self.mypanel.axes.set_title(self.axes_title)
#     self.mypanel.axes.set_xlabel(self.axes_xlabel)
#     self.mypanel.axes.set_ylabel(self.axes_ylabel)
#     # process the data
#     length = len(d)
#     data = []
#     for s in d:
#         ss = s.split(TOKEN)
#         ss0 = ss[0]
#         ss1 = ss[1:]
#         # drop surplus trailing values
#         if len(ss1) > len(wavelength):
#             for i in range(len(ss1)-len(wavelength)):
#                 ss1.pop()
#         data.append([ss0, ss1])
#     print(" data : %s " % (len(data), ))
#     if length >= 1:
#         print(" data[0][0] %s " % (data[0][0],))
#         d0 = data[0][0]
#         y0 = np.array(data[0][1]).astype(float)
#         self.mypanel.axes.plot(x, y0, 'r', linewidth=0.5, label=d0)
#     # len(data) = 2
#     if len(data) >= 2:
#         d1 = data[1][0]
#         y1 = np.array(data[1][1]).astype(float)
#         self.mypanel.axes.plot(x, y1, color='orange', linewidth=0.5, label=d1)
#     # len(data) = 3
#     if len(data) >= 3:
#         d2 = data[2][0]
#         y2 = np.array(data[2][1]).astype(float)
#         self.mypanel.axes.plot(x, y2, 'y', linewidth=0.5, label=d2)
#     # len(data) = 4
#     if len(data) >= 4:
#         d3 = data[3][0]
#         y3 = np.array(data[3][1]).astype(float)
#         self.mypanel.axes.plot(x, y3, 'g', linewidth=0.5, label=d3)
#     # len(data) = 5
#     if len(data) >= 5:
#         d4 = data[4][0]
#         y4 = np.array(data[4][1]).astype(float)
#         self.mypanel.axes.plot(x, y4, 'c', linewidth=0.5, label=d4)
#     # len(data) = 6
#     if len(data) >= 6:
#         d5 = data[5][0]
#         y5 = np.array(data[5][1]).astype(float)
#         self.mypanel.axes.plot(x, y5, 'b', linewidth=0.5, label=d5)
#     # len(data) = 7
#     if len(data) >= 7:
#         d6 = data[6][0]
#         y6 = np.array(data[6][1]).astype(float)
#         self.mypanel.axes.plot(x, y6, color='purple', linewidth=0.5, label=d6)
#     self.mypanel.axes.legend()
#     self.mypanel.axes.grid(True)
#     self.mypanel.FigureCanvas.draw()

# def OnPlot_7(self, wl, fpath):
#     """
#     The last seven records at a fixed interval
#     The default interval is 1
#     """
#     result_line = []
#     count = 0
#     thefile = open(fpath, 'rb')
#     while True:
#         buffer = thefile.read(8192*1024)
#         if not buffer:
#             break
#         count += buffer.count(b'\n')
#     count = count + 1
#     self.makeMsg(1, u" The file has " + str(count) + " lines")
#     # self.statusBar.SetStatusText(u" The file has "+str(count)+" lines", 0)
#     # check whether the file already has data
#     if count == 0:
#         # print('The file is empty!')
#         self.makeMsg(2, u"The data file has no records!")
#         # with wx.MessageDialog(self, u"The data file has no records!", u"Error", wx.OK) as dlg:
#         #     dlg.ShowModal()
#     elif count < 8:
#         for i in range(count-1):
#             result_line.append(linecache.getline(fpath, count-i).strip("\n"))
#     else:
#         for i in range(7):
#             result_line.append(linecache.getline(fpath, count-i).strip("\n"))
#     # plot the 7-day curves
#     thefile.close()
#     self.plotSeven(wl, result_line)

# def OnPlot_D(self, wl, fpath, interval, begin):
#     """
#     Seven records at a given interval, starting from a specified position
#     Accounts for whether there is enough data
#     """
#     # print("i am plot d")
#     # self.readConfig()
#     result_line = []
#     count = 0
#     thefile = open(fpath, 'rb')
#     while True:
#         buffer = thefile.read(8192*1024)
#         if not buffer:
#             break
#         count += buffer.count(b'\n')
#     count = count + 1
#     self.makeMsg(1, u" The file has "+str(count)+" lines")
#     # check whether the plot parameters are valid for this file
#     if count == 0:
#         # print('The file is empty!')
#         self.makeMsg(2, u"The data file has no records!")
#         # with wx.MessageDialog(self, u"The data file has no records!", u"Error", wx.OK) as dlg:
#         #     dlg.ShowModal()
#     elif (interval*7 + begin) < (count-1):
#         for i in range(7):
#             result_line.append(linecache.getline(fpath, count-begin-i*interval).strip("\n"))
#     else:
#         for i in range(((count-1)-begin)//interval):
#             result_line.append(linecache.getline(fpath, count-begin-i*interval).strip("\n"))
#     # plot the 7-day curves
#     thefile.close()
#     self.plotSeven(wl, result_line)