You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
237 lines
8.7 KiB
237 lines
8.7 KiB
3 years ago
|
# import numpy as np
|
||
|
|
||
|
from pathlib import Path
|
||
|
|
||
|
|
||
|
class DataPlot:
    """Read records from a delimited text file and prepare them for display.

    The first line of the file is treated as a header: its fields after the
    first one are stored as the wavelengths.  Data lines are addressed from
    the end of the file ("x-th line from the end").  The first field of a data
    line is returned separately from the remaining fields (presumably a
    timestamp -- TODO confirm against the file format).
    """

    def __init__(self, ):
        """Create an empty reader; no file is touched until a path is set."""
        self.demo_time = ''
        self.demo_wl = []    # pure water
        self.demo_data = []  # pure water

        self.wavelength = None
        self.data_one = None
        self.data_multi = None
        # Created here so they exist even before the matching setter is called.
        self.begin_line = None
        self.line_interval = None
        self.fpath: Path = Path()
        self.total_lines = 0
        self.token = ";"

    def set_file_path(self, fpath: Path):
        """Set the data file; plain strings are accepted and converted."""
        self.fpath = Path(fpath)

    def set_wavelength(self):
        """Placeholder kept for interface compatibility; does nothing."""
        pass

    def set_token(self, token=";"):
        """Set the field separator used when splitting lines."""
        self.token = token

    def set_begin_line(self, lineno=0):
        """Store the first line number of interest."""
        self.begin_line = lineno

    def set_line_interval(self, line_interval=0):
        """Store the step between lines of interest."""
        self.line_interval = line_interval

    def get_total_lines(self, ):
        """Count the lines of a (possibly very large) file in fixed-size chunks.

        The result is ``number of newline bytes + 1``, so a file whose last
        line is newline-terminated is reported one higher than its number of
        non-empty lines.  The value is stored in ``self.total_lines`` and also
        returned.
        """
        chunk_size = 8192 * 1024
        newline_count = 0
        with open(self.fpath, 'rb') as fh:
            while True:
                chunk = fh.read(chunk_size)
                if not chunk:
                    break
                newline_count += chunk.count(b'\n')
        self.total_lines = newline_count + 1
        return self.total_lines

    def get_wavelength_from_file(self, ):
        """Parse the header line and store every field but the first.

        The line terminator is removed first so the last wavelength does not
        carry a trailing newline.
        """
        header = self.get_first_line().rstrip("\r\n")
        self.wavelength = header.split(self.token)[1:]

    def get_reverse_x_lineno(self, x):
        """Return ``(first_field, other_fields)`` of the x-th line from the end.

        :param x: 1-based position counted from the end of the file
        :return: tuple of the first field and the list of remaining fields;
                 ``(None, [])`` when the file is empty or missing
        :raises ValueError: if ``x < 1`` or the file has fewer than ``x`` lines
        """
        raw = self.__get_reverse_x_lineno(x)
        if raw is None:
            return None, []
        # The line is read in binary mode, so decode before splitting on the
        # (text) token and drop the line terminator from the last field.
        fields = raw.decode().rstrip("\r\n").split(self.token)
        return fields[0], fields[1:]

    def get_multi_by_x_m_n(self, x, m, n):
        """Return ``n`` raw text lines: the x-th from the end, then every m-th before it.

        Lines are returned undivided and still carry their line terminator.
        ``None`` is returned when the file is empty or missing.
        """
        return self.__get_reverse_x_m_n_line(x, m, n)

    @staticmethod
    def _read_tail_lines(path, wanted):
        """Read raw lines from the end of ``path``.

        The window read from the end of the file is doubled until it holds at
        least ``wanted + 1`` lines; the extra line is required because the
        first line of a window may be cut off.  If the window would cover the
        whole file, the whole file is read instead.

        :return: list of ``bytes`` lines, or ``None`` for an empty file
        :raises FileNotFoundError: if ``path`` does not exist
        """
        filesize = path.stat().st_size
        if filesize == 0:
            return None
        # Seeking relative to the end of the file requires binary mode.
        with open(path, 'rb') as fp:
            offset = -8
            while -offset < filesize:
                fp.seek(offset, 2)
                lines = fp.readlines()
                if len(lines) >= wanted + 1:
                    return lines
                offset *= 2
            fp.seek(0)
            return fp.readlines()

    def __get_reverse_x_lineno(self, x):
        """Return the x-th line from the end as ``bytes``.

        :return: the raw line, or ``None`` when the file is empty or missing
        :raises ValueError: if ``x < 1`` or the file has fewer than ``x`` lines
        """
        if x < 1:
            raise ValueError("x must be >= 1 (1 is the last line)")
        try:
            lines = self._read_tail_lines(self.fpath, x)
        except FileNotFoundError:
            print(self.fpath.name + ' not found!')
            return None
        if lines is None:
            return None
        if len(lines) < x:
            raise ValueError("行数有误,请重试")
        return lines[-x]

    def __get_reverse_x_m_n_line(self, x, m=1, n=1):
        """Return ``n`` decoded lines: the x-th from the end, stepping back ``m`` lines each time.

        :return: list of text lines, or ``None`` when the file is empty or missing
        :raises ValueError: if a requested position is < 1 or the file is too short
        """
        print(f'__get_reverse_x_m_n_line {x} {self.fpath}')
        positions = [x + m * i for i in range(n)]
        if any(pos < 1 for pos in positions):
            raise ValueError("line positions must be >= 1 (1 is the last line)")
        # NOTE(review): x + m*n is m lines more than the deepest position that
        # is actually read; kept as-is because it may be a deliberate guard
        # for the header line -- confirm.
        min_lines = max([x + m * n] + positions)
        try:
            lines = self._read_tail_lines(self.fpath, min_lines)
        except FileNotFoundError:
            print(self.fpath.name + ' not found!')
            return None
        if lines is None:
            return None
        if len(lines) < min_lines:
            raise ValueError("行数有误,请重试")
        return [lines[-pos].decode() for pos in positions]

    def get_first_line(self, ):
        """Return the first line of the file as text (terminator included)."""
        with open(self.fpath, 'rb') as fp:
            return fp.readline().decode()

    def get_last_line(self, filename: Path):
        """Return the last line of ``filename``.

        :param filename: file to read (``Path`` or string)
        :return: the last line as ``bytes``, or ``None`` for an empty or
                 missing file
        """
        filename = Path(filename)
        try:
            lines = self._read_tail_lines(filename, 1)
        except FileNotFoundError:
            print(f'{filename} not found!')
            return None
        if lines is None:
            return None
        return lines[-1]
|
||
|
|
||
|
def tail(file, taillines=500, return_str=True, avg_line_length=None):
    """Return the last ``taillines`` lines of a text file.

    The file is read backwards in binary blocks until enough newlines have
    been seen, so only the end of a large file is loaded.  Files of any size
    are supported, including files shorter than one block.

    :param file: path of the file to read
    :param taillines: number of trailing lines to return
    :param return_str: ``True`` returns one joined string, ``False`` a list
                       of lines (each keeps its line terminator)
    :param avg_line_length: optional estimate of the average line length;
                            only used to size the blocks read per step
    :return: the trailing lines as a string or a list, see ``return_str``
    """
    import io  # local import: nothing else in this function's scope needs it

    if taillines <= 0:
        return "" if return_str else []

    if avg_line_length:
        step = max(int(taillines * avg_line_length), 1)
    else:
        step = 8192

    pieces = []
    newline_total = 0
    with open(file, 'rb') as fh:
        position = fh.seek(0, io.SEEK_END)
        # One newline more than requested is needed: the first line of the
        # collected data may be cut off and must be discardable.
        while position > 0 and newline_total <= taillines:
            size = min(step, position)
            position -= size
            fh.seek(position)
            piece = fh.read(size)
            pieces.append(piece)
            newline_total += piece.count(b"\n")
    pieces.reverse()

    # Decode the way open(file, errors='ignore') would: default encoding,
    # undecodable bytes dropped, universal newline translation.
    reader = io.TextIOWrapper(io.BytesIO(b"".join(pieces)), errors='ignore')
    lines = reader.readlines()[-taillines:]
    return "".join(lines) if return_str else lines
|
||
|
|
||
|
|
||
|
if __name__ == "__main__":
    # Manual smoke check: constructing the reader only assigns attributes.
    d = DataPlot()
|
||
|
|
||
|
|
||
|
|