You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
208 lines
6.9 KiB
208 lines
6.9 KiB
2 years ago
|
from pathlib import Path
|
||
|
from awrams import InfoFrame
|
||
|
|
||
|
class DealFolder(object):
|
||
|
def __init__(self,):
|
||
|
self.base_folder = Path()
|
||
|
self.old_folder = Path()
|
||
|
self.new_folder = Path()
|
||
|
self.info_frame = InfoFrame()
|
||
|
self.measure_con = None
|
||
|
pass
|
||
|
|
||
|
def setOlderFolder(self,pth:Path):
|
||
|
''' 解析信息帧, 获得新文件夹位置转移文件并 '''
|
||
|
self.old_folder = pth
|
||
|
pass
|
||
|
|
||
|
def getBaseFolder(self,grade=1):
|
||
|
''' 解析信息帧, 获得新文件夹位置转移文件并 '''
|
||
|
self.base_folder = self.old_folder
|
||
|
for i in range(grade):
|
||
|
try:
|
||
|
self.base_folder = self.base_folder.parent
|
||
|
except :
|
||
|
assert 1>2, f">>>> Fail to Get Base Folder"
|
||
|
assert self.base_folder.exists(), f">>>> Fail to Get Base Folder"
|
||
|
pass
|
||
|
|
||
|
|
||
|
def transferFromOldFolder(self, ):
|
||
|
bin_files = self.old_folder.glob('*.bin')
|
||
|
for bf in bin_files:
|
||
|
if bf.name == "pic.bin":
|
||
|
bf.replace( self.new_folder.joinpath( "pic.jpg" ) )
|
||
|
else:
|
||
|
bf.replace( self.new_folder.joinpath(bf.name) )
|
||
|
pass
|
||
|
|
||
|
def transferToNewFolder(self, mode=0):
|
||
|
''' pic.bin pic.jpg'''
|
||
|
self.new_folder = self.base_folder.joinpath(
|
||
|
"20" + str(self.info_frame.year)
|
||
|
, str(self.info_frame.month)
|
||
|
, str(self.info_frame.day)
|
||
|
, str(self.measure_con)
|
||
|
)
|
||
|
bin_files = self.old_folder.glob('*.bin')
|
||
|
for bf in bin_files:
|
||
|
if bf.name == "pic.bin":
|
||
|
bf.replace( self.new_folder.joinpath( "pic.jpg" ) )
|
||
|
else:
|
||
|
bf.replace( self.new_folder.joinpath(bf.name) )
|
||
|
pass
|
||
|
|
||
|
def deleteOldFolder(self,):
|
||
|
try:
|
||
|
if self.old_folder.exists():
|
||
|
self.old_folder.rmdir()
|
||
|
except OSError as e:
|
||
|
raise Exception(e)
|
||
|
|
||
|
|
||
|
def readOneFolder(self,):
|
||
|
ret =[]
|
||
|
f_lst = self.new_folder.glob('*.bin')
|
||
|
for fl in f_lst:
|
||
|
ret.append(self.read_bin(fl))
|
||
|
return ret
|
||
|
|
||
|
def read_bin(self,fpath: Path):
|
||
|
assert fpath.exists(), f">>>> not find {fpath} "
|
||
|
|
||
|
with open(fpath, 'rb') as file:
|
||
|
ret = file.read()
|
||
|
self.info_frame.set_info_frame(ret)
|
||
|
|
||
|
pass
|
||
|
|
||
|
def getSensorPathFromInfoPath(self, info_path: Path, ) -> Path:
|
||
|
# 服务器上转移后的模式
|
||
|
if self.mode == 1:
|
||
|
sensor_path = self.base_path.joinpath(
|
||
|
info_path.parent,
|
||
|
"sensor.bin"
|
||
|
)
|
||
|
elif self.mode == 0:
|
||
|
sensor_path = self.base_path.joinpath(
|
||
|
"data",
|
||
|
info_path.parts[1][:4] + "_" + info_path.parts[1][-2:],
|
||
|
info_path.parts[2],
|
||
|
"sensor",
|
||
|
info_path.name
|
||
|
)
|
||
|
else:
|
||
|
sensor_path = None
|
||
|
return sensor_path
|
||
|
pass
|
||
|
|
||
|
def getOutputPathFromSensorPath(self, sensor_path: Path) -> Path:
|
||
|
if self.mode == 1:
|
||
|
output_path = self.output_path.joinpath(
|
||
|
self.info_dict['year'] + "_" + self.info_dict['month']
|
||
|
+ "_" + self.info_dict['day'] + "_" + self.info_dict['hour']
|
||
|
+ "_" + self.info_dict['minute'] +
|
||
|
"_" + self.info_dict['second']
|
||
|
)
|
||
|
elif self.mode == 0:
|
||
|
output_path = self.output_path.joinpath(
|
||
|
sensor_path.parts[1][:4] + "_" + sensor_path.parts[1][-2:]
|
||
|
+ "_" + sensor_path.parts[2] + "_" + sensor_path.name
|
||
|
)
|
||
|
else:
|
||
|
output_path = None
|
||
|
return output_path
|
||
|
pass
|
||
|
|
||
|
def getCurrentMeasureTimeFromPath(self, fpath: Path) -> str:
|
||
|
ret = ''
|
||
|
if self.mode == 1: # 读信息txt获得时间
|
||
|
txt_path = fpath.parent.glob("*.txt")
|
||
|
txt_stem = txt_path[0].stem
|
||
|
ret = fpath.parts[1][:4]+"-"+fpath.parts[1][-2:]+"-"+fpath.parts[2]+" " \
|
||
|
+ txt_stem[-9:-7] + ":" + \
|
||
|
txt_stem[-6:-4] + ":" + txt_stem[-3:-1]
|
||
|
pass
|
||
|
elif self.mode == 0:
|
||
|
ret = fpath.parts[1][:4]+"-"+fpath.parts[1][-2:]+"-"+fpath.parts[2]+" " \
|
||
|
+ fpath.name[0:2] + ":" + \
|
||
|
fpath.name[3:5] + ":" + fpath.name[6:8]
|
||
|
else:
|
||
|
pass
|
||
|
return ret
|
||
|
pass
|
||
|
|
||
|
def getDataFileList(self, ):
|
||
|
'''
|
||
|
获得成对的info sensor 文件
|
||
|
[目录名,文件名,年月日,时间, measure_id]
|
||
|
'''
|
||
|
# ret = []
|
||
|
fs = None
|
||
|
self.filelist = []
|
||
|
if self.mode == 1:
|
||
|
fs = self.server_path.glob("*/*/*/*/info.bin")
|
||
|
else:
|
||
|
fs = self.data_path.glob("*/*/info/*")
|
||
|
|
||
|
for f in fs:
|
||
|
error_file = {}
|
||
|
if f.stat().st_size == 0:
|
||
|
error_file.update({"path": f})
|
||
|
error_file.update({"error": "info file size is zero"})
|
||
|
self.error_result.append(error_file)
|
||
|
continue
|
||
|
|
||
|
# self.info_path_fname = f
|
||
|
sensor_path = self.getSensorPathFromInfoPath(f)
|
||
|
# sensor_path = Path(sensor_purepath)
|
||
|
# sensor 文件不存在
|
||
|
if not sensor_path.exists():
|
||
|
error_file.update({"path": f})
|
||
|
error_file.update({"error": "cannot find the sensor file "})
|
||
|
self.error_result.append(error_file)
|
||
|
continue
|
||
|
|
||
|
# sensor文件大小为0
|
||
|
if sensor_path.stat().st_size == 0:
|
||
|
error_file.update({"path": sensor_path})
|
||
|
error_file.update(
|
||
|
{"error": "sensor file size of the sensor is zero' "})
|
||
|
self.error_result.append(error_file)
|
||
|
continue
|
||
|
self.setFilelist(f, sensor_path)
|
||
|
pass
|
||
|
|
||
|
def getDataFileList_SERVER(self,):
|
||
|
'''
|
||
|
获得成对的info sensor 文件
|
||
|
[目录名,文件名,年月日,时间]
|
||
|
'''
|
||
|
# log.info(f" === ", self.server_path, "getDataFileList_SERVER")
|
||
|
self.filelist = []
|
||
|
fs = self.server_path.glob("*/*/*/*/*/info.bin")
|
||
|
for f in fs:
|
||
|
# print(f)
|
||
|
error_file = {}
|
||
|
if f.stat().st_size == 0:
|
||
|
error_file.update({"path": f})
|
||
|
error_file.update({"error": "info file size is zero"})
|
||
|
self.error_result.append(error_file)
|
||
|
continue
|
||
|
|
||
|
# 获得全部bin文件 (info.bin (num).bin),不含 pic.bin
|
||
|
current_path = f.parent
|
||
|
bin_files = current_path.glob('*.bin')
|
||
|
res_files = []
|
||
|
|
||
|
# 检查(num).bin 是否有遗漏
|
||
|
for bl in bin_files:
|
||
|
if bl.stem.isdigit():
|
||
|
res_files.append(bl)
|
||
|
pass
|
||
|
|
||
|
if res_files != []:
|
||
|
res_files.append(f)
|
||
|
self.filelist.append(res_files)
|
||
|
pass
|