# import numpy as np from pathlib import Path class DataPlot(): ''' 从文件读取数据,出来成待显示数据 ''' # def __init__(self, mypanel, ): def __init__(self, ): # self.mypanel = mypanel # self.__eventManager = eventManger # self.dataSaveFile = DATA_FNAME_WITHOUT_WATER # self.axes_title = "Water Attenuation Coefficient" # self.axes_xlabel = "Wavelength (nm)" # self.axes_ylabel = "Attenuation Coefficient m(-1)" # self.axes_title = "Attenuation Coefficient Demo" self.demo_time = '' self.demo_wl = [] # 纯水 self.demo_data = [] # 纯水 self.wavelength = None self.data_one = None self.data_multi = None # self.mode = "one" # multi # self.begin_line = None # self.line_interval = None self.fpath :Path = Path() self.total_lines = 0 self.token = ";" # self.MPL = myPanel.MPL_Panel(self) # 调用自己建立的 panel 类 # self.Figure = self.MPL.Figure # self.axes = self.MPL.axes # self.FigureCanvas = self.MPL.FigureCanvas def set_file_path( self, fpath:Path ): self.fpath = fpath pass def set_wavelength(self): pass # def set_mode(self, mode ="one"): # self.mode = mode # pass def set_token(self, token =";"): self.token = token pass def set_begin_line( self, lineno = 0 ): self.begin_line = lineno pass def set_line_interval( self, line_interval = 0 ): self.line_interval = line_interval pass def get_total_lines( self, ): ''' 大文件 ''' count = 0 thefile = open( self.fpath, 'rb') while True: buffer = thefile.read(8192*1024) if not buffer: break count += buffer.count(b'\n') self.total_lines = count + 1 def get_wavelength_from_file( self, ): tmp = self.get_first_line() self.wavelength = tmp.split(self.token)[1:] pass def get_reverse_x_lineno( self, x ): line = self.__get_reverse_x_lineno(x) line = line.split(self.token) return line[0], line[1:] pass def get_multi_by_x_m_n( self, x,m,n ): # res_time = [] # res_lines = [] lines = self.__get_reverse_x_m_n_line(x,m,n) # print(f'+++++ {lines}') # for i in range(len(lines)): # ln = # res_time.append( lines[i] [0] ) # res_lines.append( lines[i][1:] ) # return res_time, res_lines return lines pass def __get_reverse_x_lineno(self, x): """ 获得倒数第x行 """ try: # filesize = os.path.getsize(self.fpath) filesize = self.fpath.stat().st_size if filesize == 0: return None else: with open(self.fpath, 'rb') as fp: # to use seek from end, must use mode 'rb' offset = -8 # initialize offset while -offset < filesize: # offset cannot exceed file size fp.seek(offset, 2) # read # offset chars from eof(represent by number '2') lines = fp.readlines() # read from fp to eof if len(lines) >= x+1: # if contains at least 2 lines return lines[-1*x] # then last line is totally included else: offset *= 2 # enlarge offset fp.seek(0) lines = fp.readlines() if len(lines) < x: raise Exception("行数有误,请重试") return lines[-1*x] except FileNotFoundError: print(self.fpath.name + ' not found!') return None def __get_reverse_x_m_n_line(self, x, m=1, n=1): """ 获得倒数第x行, 间隔m行, 共取n个行 """ print(f'__get_reverse_x_m_n_line {x} {self.fpath}') min_lines = x + m*n res_lines = [] try: # filesize = os.path.getsize(self.fpath) filesize = self.fpath.stat().st_size if filesize == 0: return None else: with open(self.fpath, 'rb') as fp: # to use seek from end, must use mode 'rb'x offset = -8 # initialize offset while -offset < filesize: # offset cannot exceed file size fp.seek( offset, 2 ) # read # offset chars from eof(represent by number '2') lines = fp.readlines() # read from fp to eof if len(lines) >= min_lines+1 : for i in range(n): res_lines.append( lines[-1*(x+m*i)].decode()) return res_lines else: offset *= 2 # enlarge offset fp.seek( 0 ) lines = fp.readlines( ) if len(lines) < min_lines: raise Exception("行数有误,请重试") for i in range(n): res_lines.append( lines[-1*(x+m*i)].decode()) return res_lines print(res_lines) except FileNotFoundError: print(self.fpath.name + ' not found!') return None def get_first_line(self, ): firstline = '' with open(self.fpath, 'rb') as fp: # to use seek from end, must use mode 'rb'x firstline = fp.readline() return firstline.decode() def get_last_line(self,filename:Path): """ 获得最后一行 :param filename: file name :return: last line or None for empty file """ try: filesize = filename.stat().st_size if filesize == 0: return None else: with open(filename, 'rb') as fp: # to use seek from end, must use mode 'rb' offset = -8 # initialize offset while -offset < filesize: # offset cannot exceed file size fp.seek(offset, 2) # read # offset chars from eof(represent by number '2') lines = fp.readlines() # read from fp to eof if len(lines) >= 2: # if contains at least 2 lines return lines[-1] # then last line is totally included else: offset *= 2 # enlarge offset fp.seek(0) lines = fp.readlines() return lines[-1] except FileNotFoundError: print(filename + ' not found!') return None def tail(file, taillines=500, return_str=True, avg_line_length=None): """ 大文件的尾行,小文件会导致报错 taillines : 读取尾部多少行 avg_line_length:每行字符平均数, return_str:返回类型,默认为字符串,False为列表。 offset:每次循环相对文件末尾指针偏移数 f.tell() 当前指针位置 f.seek(a,b) a 偏移量, b:0 1 2 ,文件头 当前位置 文件尾部 """ with open(file, errors='ignore') as f: if not avg_line_length: f.seek(0, 2) f.seek(f.tell() - 3000) avg_line_length = int(3000 / len(f.readlines())) + 10 f.seek(0, 2) end_pointer = f.tell() offset = taillines * avg_line_length if offset > end_pointer: f.seek(0, 0) lines = f.readlines()[-taillines:] return "".join(lines) if return_str else lines offset_init = offset i = 1 while len(f.readlines()) < taillines: location = f.tell() - offset f.seek(location) i += 1 offset = i * offset_init if f.tell() - offset < 0: f.seek(0, 0) break else: f.seek(end_pointer - offset) lines = f.readlines() if len(lines) >= taillines: lines = lines[-taillines:] return "".join(lines) if return_str else lines if __name__ == "__main__": d = DataPlot()