from pathlib import Path

from awrams import InfoFrame


class DealFolder(object):
    """Move measurement ``*.bin`` files between folders and index them.

    Attributes set in ``__init__``:
        base_folder:  root under which dated destination folders are built.
        old_folder:   folder the ``*.bin`` files are moved out of.
        new_folder:   folder the ``*.bin`` files are moved into.
        info_frame:   ``InfoFrame`` instance fed by ``read_bin``.
        measure_con:  value used (via ``str()``) as the last component of
                      the destination folder.

    NOTE(review): several methods also read ``self.mode``,
    ``self.base_path``, ``self.output_path``, ``self.info_dict``,
    ``self.server_path``, ``self.data_path``, ``self.error_result`` and call
    ``self.setFilelist``.  None of these are defined in the code visible
    here -- confirm they are provided by a subclass or by the caller before
    those methods are used.
    """

    def __init__(self,):
        self.base_folder = Path()
        self.old_folder = Path()
        self.new_folder = Path()
        self.info_frame = InfoFrame()
        self.measure_con = None

    def setOlderFolder(self, pth: Path):
        """Remember ``pth`` as the source folder (``self.old_folder``)."""
        self.old_folder = pth

    def getBaseFolder(self, grade=1):
        """Set ``self.base_folder`` to the ``grade``-th ancestor of the old folder.

        Args:
            grade: number of ``.parent`` steps to take from ``self.old_folder``.

        Raises:
            AssertionError: if the resulting folder does not exist.
        """
        base = self.old_folder
        for _ in range(grade):
            # Path.parent never raises; at the filesystem root it simply
            # returns the root again, so no exception handling is needed.
            base = base.parent
        self.base_folder = base
        # Raised explicitly (instead of ``assert``) so the check survives
        # ``python -O``; AssertionError is kept for existing callers.
        if not base.exists():
            raise AssertionError(">>>> Fail to Get Base Folder")

    def _move_bin_files(self):
        """Move every ``*.bin`` from ``old_folder`` into ``new_folder``.

        ``pic.bin`` is renamed to ``pic.jpg`` on the way; every other file
        keeps its name.  The destination folder is created if missing.
        """
        self.new_folder.mkdir(parents=True, exist_ok=True)
        # Snapshot the listing first: the loop removes entries from the
        # directory that is being listed.
        for src in list(self.old_folder.glob('*.bin')):
            target_name = "pic.jpg" if src.name == "pic.bin" else src.name
            src.replace(self.new_folder.joinpath(target_name))

    def transferFromOldFolder(self, ):
        """Move the ``*.bin`` files from ``old_folder`` to the current ``new_folder``.

        ``pic.bin`` is stored as ``pic.jpg``.
        """
        self._move_bin_files()

    def transferToNewFolder(self, mode=0):
        """Derive ``new_folder`` from the info frame and move the ``*.bin`` files there.

        The destination is
        ``base_folder / ("20" + year) / month / day / measure_con`` where
        year, month and day come from ``self.info_frame``.  ``pic.bin`` is
        stored as ``pic.jpg``.

        Args:
            mode: accepted for interface compatibility; not used.
        """
        frame = self.info_frame
        self.new_folder = self.base_folder.joinpath(
            "20" + str(frame.year),
            str(frame.month),
            str(frame.day),
            str(self.measure_con),
        )
        self._move_bin_files()

    def deleteOldFolder(self,):
        """Remove ``old_folder`` if it exists.

        Raises:
            OSError: propagated unchanged from ``Path.rmdir`` (for example
                when the folder is not empty).
        """
        if self.old_folder.exists():
            self.old_folder.rmdir()

    def readOneFolder(self,):
        """Read every ``*.bin`` in ``new_folder`` via ``read_bin``.

        Returns:
            A list with the raw bytes of each file, in glob order.
        """
        return [self.read_bin(path) for path in self.new_folder.glob('*.bin')]

    def read_bin(self, fpath: Path):
        """Read ``fpath`` as bytes and hand the content to the info frame.

        Returns:
            The raw bytes read from ``fpath``.

        Raises:
            AssertionError: if ``fpath`` does not exist.
        """
        if not fpath.exists():
            raise AssertionError(f">>>> not find {fpath} ")
        with open(fpath, 'rb') as file:
            data = file.read()
        self.info_frame.set_info_frame(data)
        return data

    def getSensorPathFromInfoPath(self, info_path: Path, ) -> Path:
        """Return the sensor file path that belongs to ``info_path``.

        mode 1 (layout after transfer on the server):
            ``base_path / info_path.parent / "sensor.bin"``
        mode 0:
            ``base_path / "data" / "<first 4>_<last 2 of parts[1]>" /
            parts[2] / "sensor" / info_path.name``

        Returns:
            The sensor path, or ``None`` for any other mode.
        """
        if self.mode == 1:
            return self.base_path.joinpath(info_path.parent, "sensor.bin")
        if self.mode == 0:
            # presumably parts[1] is a year+month folder name -- TODO confirm
            month_dir = info_path.parts[1]
            return self.base_path.joinpath(
                "data",
                month_dir[:4] + "_" + month_dir[-2:],
                info_path.parts[2],
                "sensor",
                info_path.name,
            )
        return None

    def getOutputPathFromSensorPath(self, sensor_path: Path) -> Path:
        """Return the output path for ``sensor_path``.

        mode 1: ``output_path / "<year>_<month>_<day>_<hour>_<minute>_<second>"``
            built from ``self.info_dict``.
        mode 0: ``output_path / "<first 4>_<last 2 of parts[1]>_<parts[2]>_<name>"``
            built from ``sensor_path``.

        Returns:
            The output path, or ``None`` for any other mode.
        """
        if self.mode == 1:
            stamp_keys = ('year', 'month', 'day', 'hour', 'minute', 'second')
            return self.output_path.joinpath(
                "_".join(self.info_dict[key] for key in stamp_keys)
            )
        if self.mode == 0:
            month_dir = sensor_path.parts[1]
            return self.output_path.joinpath(
                "_".join((
                    month_dir[:4],
                    month_dir[-2:],
                    sensor_path.parts[2],
                    sensor_path.name,
                ))
            )
        return None

    def getCurrentMeasureTimeFromPath(self, fpath: Path) -> str:
        """Build a ``"YYYY-MM-DD HH:MM:SS"``-shaped string from ``fpath``.

        The date part always comes from ``fpath.parts[1]`` (first four and
        last two characters) and ``fpath.parts[2]``.  The time part comes
        from the stem of a ``*.txt`` file next to ``fpath`` (mode 1) or from
        fixed character positions of ``fpath.name`` (mode 0).

        Returns:
            The assembled string, or ``''`` for any other mode.

        Raises:
            FileNotFoundError: in mode 1 when no ``*.txt`` file sits next to
                ``fpath``.
        """
        if self.mode not in (0, 1):
            return ''

        month_dir = fpath.parts[1]
        date_part = month_dir[:4] + "-" + month_dir[-2:] + "-" + fpath.parts[2]

        if self.mode == 1:
            # Read the time from the info txt file name.  glob() yields a
            # generator, so it must be materialised before indexing; sorting
            # makes the choice deterministic when several files match.
            txt_files = sorted(fpath.parent.glob("*.txt"))
            if not txt_files:
                raise FileNotFoundError(f"no .txt info file next to {fpath}")
            stem = txt_files[0].stem
            # NOTE(review): the slices skip the final character of the
            # stem -- confirm against the real file naming scheme.
            time_part = stem[-9:-7] + ":" + stem[-6:-4] + ":" + stem[-3:-1]
        else:
            name = fpath.name
            time_part = name[0:2] + ":" + name[3:5] + ":" + name[6:8]

        return date_part + " " + time_part

    def _record_error(self, path, message):
        """Append a ``{"path", "error"}`` entry to ``self.error_result``."""
        self.error_result.append({"path": path, "error": message})

    def getDataFileList(self, ):
        """Collect matching info/sensor file pairs.

        Resets ``self.filelist``, then walks the info files
        (``server_path/*/*/*/*/info.bin`` in mode 1, ``data_path/*/*/info/*``
        otherwise).  Each valid pair is passed to ``self.setFilelist``;
        problems are recorded in ``self.error_result`` and the file skipped.
        """
        self.filelist = []
        if self.mode == 1:
            info_files = self.server_path.glob("*/*/*/*/info.bin")
        else:
            info_files = self.data_path.glob("*/*/info/*")

        for info_path in info_files:
            if info_path.stat().st_size == 0:
                self._record_error(info_path, "info file size is zero")
                continue

            sensor_path = self.getSensorPathFromInfoPath(info_path)
            # The sensor file does not exist (or no path could be derived
            # because the mode is neither 0 nor 1).
            if sensor_path is None or not sensor_path.exists():
                self._record_error(info_path, "cannot find the sensor file ")
                continue

            # The sensor file is empty.
            if sensor_path.stat().st_size == 0:
                self._record_error(
                    sensor_path, "sensor file size of the sensor is zero' ")
                continue

            self.setFilelist(info_path, sensor_path)

    def getDataFileList_SERVER(self,):
        """Collect the numbered ``.bin`` files that accompany each ``info.bin``.

        Resets ``self.filelist``.  For every non-empty
        ``server_path/*/*/*/*/*/info.bin`` it gathers the sibling ``*.bin``
        files whose stem is all digits (so ``info.bin`` and ``pic.bin`` are
        excluded) and, if there is at least one, appends
        ``[<numbered files...>, <info.bin>]`` to ``self.filelist``.  Empty
        info files are recorded in ``self.error_result``.
        """
        self.filelist = []
        for info_path in self.server_path.glob("*/*/*/*/*/info.bin"):
            if info_path.stat().st_size == 0:
                self._record_error(info_path, "info file size is zero")
                continue

            # NOTE(review): the original comment mentions checking for
            # missing numbered files; no such check is implemented.
            numbered = [
                candidate
                for candidate in info_path.parent.glob('*.bin')
                if candidate.stem.isdigit()
            ]
            if numbered:
                numbered.append(info_path)
                self.filelist.append(numbered)