剖模, server desktop分支
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
profiler/dataplot.py

237 lines
8.7 KiB

# import numpy as np
from pathlib import Path
class DataPlot():
'''
从文件读取数据出来成待显示数据
'''
# def __init__(self, mypanel, ):
def __init__(self, ):
# self.mypanel = mypanel
# self.__eventManager = eventManger
# self.dataSaveFile = DATA_FNAME_WITHOUT_WATER
# self.axes_title = "Water Attenuation Coefficient"
# self.axes_xlabel = "Wavelength (nm)"
# self.axes_ylabel = "Attenuation Coefficient m(-1)"
# self.axes_title = "Attenuation Coefficient Demo"
self.demo_time = ''
self.demo_wl = [] # 纯水
self.demo_data = [] # 纯水
self.wavelength = None
self.data_one = None
self.data_multi = None
# self.mode = "one" # multi
# self.begin_line = None
# self.line_interval = None
self.fpath :Path = Path()
self.total_lines = 0
self.token = ";"
# self.MPL = myPanel.MPL_Panel(self) # 调用自己建立的 panel 类
# self.Figure = self.MPL.Figure
# self.axes = self.MPL.axes
# self.FigureCanvas = self.MPL.FigureCanvas
def set_file_path( self, fpath:Path ):
self.fpath = fpath
pass
def set_wavelength(self):
pass
# def set_mode(self, mode ="one"):
# self.mode = mode
# pass
def set_token(self, token =";"):
self.token = token
pass
def set_begin_line( self, lineno = 0 ):
self.begin_line = lineno
pass
def set_line_interval( self, line_interval = 0 ):
self.line_interval = line_interval
pass
def get_total_lines( self, ):
''' 大文件 '''
count = 0
thefile = open( self.fpath, 'rb')
while True:
buffer = thefile.read(8192*1024)
if not buffer:
break
count += buffer.count(b'\n')
self.total_lines = count + 1
def get_wavelength_from_file( self, ):
tmp = self.get_first_line()
self.wavelength = tmp.split(self.token)[1:]
pass
def get_reverse_x_lineno( self, x ):
line = self.__get_reverse_x_lineno(x)
line = line.split(self.token)
return line[0], line[1:]
pass
def get_multi_by_x_m_n( self, x,m,n ):
# res_time = []
# res_lines = []
lines = self.__get_reverse_x_m_n_line(x,m,n)
# print(f'+++++ {lines}')
# for i in range(len(lines)):
# ln =
# res_time.append( lines[i] [0] )
# res_lines.append( lines[i][1:] )
# return res_time, res_lines
return lines
pass
def __get_reverse_x_lineno(self, x):
"""
获得倒数第x行
"""
try:
# filesize = os.path.getsize(self.fpath)
filesize = self.fpath.stat().st_size
if filesize == 0:
return None
else:
with open(self.fpath, 'rb') as fp: # to use seek from end, must use mode 'rb'
offset = -8 # initialize offset
while -offset < filesize: # offset cannot exceed file size
fp.seek(offset, 2) # read # offset chars from eof(represent by number '2')
lines = fp.readlines() # read from fp to eof
if len(lines) >= x+1: # if contains at least 2 lines
return lines[-1*x] # then last line is totally included
else:
offset *= 2 # enlarge offset
fp.seek(0)
lines = fp.readlines()
if len(lines) < x:
raise Exception("行数有误,请重试")
return lines[-1*x]
except FileNotFoundError:
print(self.fpath.name + ' not found!')
return None
def __get_reverse_x_m_n_line(self, x, m=1, n=1):
"""
获得倒数第x行, 间隔m行, 共取n个行
"""
print(f'__get_reverse_x_m_n_line {x} {self.fpath}')
min_lines = x + m*n
res_lines = []
try:
# filesize = os.path.getsize(self.fpath)
filesize = self.fpath.stat().st_size
if filesize == 0:
return None
else:
with open(self.fpath, 'rb') as fp: # to use seek from end, must use mode 'rb'x
offset = -8 # initialize offset
while -offset < filesize: # offset cannot exceed file size
fp.seek( offset, 2 ) # read # offset chars from eof(represent by number '2')
lines = fp.readlines() # read from fp to eof
if len(lines) >= min_lines+1 :
for i in range(n):
res_lines.append( lines[-1*(x+m*i)].decode())
return res_lines
else:
offset *= 2 # enlarge offset
fp.seek( 0 )
lines = fp.readlines( )
if len(lines) < min_lines:
raise Exception("行数有误,请重试")
for i in range(n):
res_lines.append( lines[-1*(x+m*i)].decode())
return res_lines
print(res_lines)
except FileNotFoundError:
print(self.fpath.name + ' not found!')
return None
def get_first_line(self, ):
firstline = ''
with open(self.fpath, 'rb') as fp: # to use seek from end, must use mode 'rb'x
firstline = fp.readline()
return firstline.decode()
def get_last_line(self,filename:Path):
"""
获得最后一行
:param filename: file name
:return: last line or None for empty file
"""
try:
filesize = filename.stat().st_size
if filesize == 0:
return None
else:
with open(filename, 'rb') as fp: # to use seek from end, must use mode 'rb'
offset = -8 # initialize offset
while -offset < filesize: # offset cannot exceed file size
fp.seek(offset, 2) # read # offset chars from eof(represent by number '2')
lines = fp.readlines() # read from fp to eof
if len(lines) >= 2: # if contains at least 2 lines
return lines[-1] # then last line is totally included
else:
offset *= 2 # enlarge offset
fp.seek(0)
lines = fp.readlines()
return lines[-1]
except FileNotFoundError:
print(filename + ' not found!')
return None
def tail(file, taillines=500, return_str=True, avg_line_length=None):
"""
大文件的尾行小文件会导致报错
taillines : 读取尾部多少行
avg_line_length:每行字符平均数,
return_str:返回类型默认为字符串False为列表
offset:每次循环相对文件末尾指针偏移数
f.tell() 当前指针位置
f.seek(a,b) a 偏移量 b:0 1 2 文件头 当前位置 文件尾部
"""
with open(file, errors='ignore') as f:
if not avg_line_length:
f.seek(0, 2)
f.seek(f.tell() - 3000)
avg_line_length = int(3000 / len(f.readlines())) + 10
f.seek(0, 2)
end_pointer = f.tell()
offset = taillines * avg_line_length
if offset > end_pointer:
f.seek(0, 0)
lines = f.readlines()[-taillines:]
return "".join(lines) if return_str else lines
offset_init = offset
i = 1
while len(f.readlines()) < taillines:
location = f.tell() - offset
f.seek(location)
i += 1
offset = i * offset_init
if f.tell() - offset < 0:
f.seek(0, 0)
break
else:
f.seek(end_pointer - offset)
lines = f.readlines()
if len(lines) >= taillines:
lines = lines[-taillines:]
return "".join(lines) if return_str else lines
if __name__ == "__main__":
d = DataPlot()