You can not select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
			
				
					266 lines
				
				7.5 KiB
			
		
		
			
		
	
	
					266 lines
				
				7.5 KiB
			| 
											2 years ago
										 | from pathlib import PurePath, Path
 | ||
|  | 
 | ||
|  | # from myconfig import NEWLINE,TOKEN,SEPARATOR
 | ||
|  | """
 | ||
|  | """
 | ||
|  | class MyDir(object):
 | ||
|  |     """
 | ||
|  |     操作方法:设置base tuple_dir header
 | ||
|  |     设置的是路径, 文件名要 ifNotNewFile 传入
 | ||
|  |     """
 | ||
|  | 
 | ||
|  |     def __init__(self) -> None:
 | ||
|  |         self.base_dir = Path()
 | ||
|  |         self.dir_tuple = ()
 | ||
|  |         self.header = []
 | ||
|  |         self.header_str = ""
 | ||
|  |         self.content = []
 | ||
|  |         self.content_str = ""
 | ||
|  |         self.current_dir = None
 | ||
|  |         self.current_filepath = None
 | ||
|  |         pass
 | ||
|  | 
 | ||
|  |     def getDir(self,):
 | ||
|  |         return self.current_dir
 | ||
|  |         pass
 | ||
|  | 
 | ||
|  |     def setBaseDir(self, dir: Path):
 | ||
|  |         self.base_dir = dir
 | ||
|  |         self.current_dir =  self.base_dir
 | ||
|  |         pass
 | ||
|  | 
 | ||
|  |     def setDir(self, t:tuple=()):
 | ||
|  |         self.dir_tuple = t
 | ||
|  |         if len(self.dir_tuple) == 0 :
 | ||
|  |             self.current_dir =  self.base_dir 
 | ||
|  |         else:
 | ||
|  |              self.current_dir = self.base_dir.joinpath( *t )
 | ||
|  |         pass
 | ||
|  |       
 | ||
|  |     def getDirFromBaseAndTuple(self, base_dir:Path, dir_tuple: tuple):
 | ||
|  |         '''外部调用返回路径'''
 | ||
|  |         ret_path = base_dir
 | ||
|  |         t = dir_tuple
 | ||
|  |         if len(t) == 0 :
 | ||
|  |             ret_path =  ret_path 
 | ||
|  |         else:
 | ||
|  |             ret_path = ret_path.joinpath( *t )
 | ||
|  |         return ret_path
 | ||
|  |         pass
 | ||
|  | 
 | ||
|  |     def setHeader(self, headerlist:list, headerSeperator: str = ";", headerinfo: str = None):
 | ||
|  |         header_str = ""
 | ||
|  |         if len(headerlist) == 0:
 | ||
|  |             return
 | ||
|  |         if headerinfo != None:
 | ||
|  |             header_str = headerinfo + headerSeperator
 | ||
|  |         for hl in headerlist:
 | ||
|  |             header_str = header_str + str(hl) + headerSeperator
 | ||
|  |         self.header_str = header_str[:-1]  
 | ||
|  |         pass
 | ||
|  | 
 | ||
|  |     def setContent(self, contentlist: list, contentSeperator: str = ";", contentinfo: str = None):
 | ||
|  |         content_str = ""
 | ||
|  |         if len(contentlist) == 0:
 | ||
|  |             return
 | ||
|  |         if contentinfo != None:
 | ||
|  |             content_str = contentinfo + contentSeperator
 | ||
|  |         tmp_str = ""
 | ||
|  |         for cl in contentlist:
 | ||
|  |             tmp_str = tmp_str + str(cl) + contentSeperator
 | ||
|  |         self.content_str = content_str + tmp_str[:-1]
 | ||
|  |         pass
 | ||
|  |     
 | ||
|  |     def newDirIfNot(self,) -> None:
 | ||
|  |         # self.current_path =  self.base_path.joinpath(self.path_tuple)
 | ||
|  |         self.current_dir.mkdir(parents=True, exist_ok=True)
 | ||
|  |         pass
 | ||
|  | 
 | ||
|  |     def newFileIfNot(self, fname: str) -> None:
 | ||
|  |         self.newDirIfNot()        
 | ||
|  |         self.current_filepath = self.current_dir.joinpath(fname)
 | ||
|  |         if not self.current_filepath.exists():
 | ||
|  |             with open(self.current_filepath, 'a') as f:
 | ||
|  |                 pass
 | ||
|  |         return
 | ||
|  |         pass
 | ||
|  | 
 | ||
|  |     def getCurrentFileSize(self,):
 | ||
|  |         return self.current_filepath.stat().st_size
 | ||
|  | 
 | ||
|  |     def getFirstline(self,):
 | ||
|  |         first_line = ""
 | ||
|  |         with open(self.current_filepath, 'r') as f:  # 打开文件
 | ||
|  |             first_line = f.readline()  # 取第一行
 | ||
|  |         return first_line.strip('\n').strip('\r')
 | ||
|  | 
 | ||
|  |     def checkHeader(self,) -> int:
 | ||
|  |         '''
 | ||
|  |         返回:
 | ||
|  |         0 : 文件为空,可以直接写header
 | ||
|  |         1 : header对应上 无需处理
 | ||
|  |         -1: 需要提醒用户保存数据后,删除文件后再处理
 | ||
|  |         '''
 | ||
|  |         if self.getCurrentFileSize() == 0:
 | ||
|  |             return 0
 | ||
|  |         first_line = self.getFirstline()
 | ||
|  |         # print(f"firstline:   {first_line}"  )
 | ||
|  |         # print(f"header_str:   {self.header_str}" )
 | ||
|  |         if first_line == self.header_str:
 | ||
|  |             return 1
 | ||
|  |         return -1
 | ||
|  |         pass
 | ||
|  | 
 | ||
|  |     def writeHeader(self,) -> None:
 | ||
|  |         with open(self.current_filepath, "a") as f:
 | ||
|  |             f.write(self.header_str)
 | ||
|  |         return None
 | ||
|  |         pass
 | ||
|  |     
 | ||
|  |     def writeContent(self,new_line="\n") -> None:
 | ||
|  |         with open(self.current_filepath, "a") as f:
 | ||
|  |             f.write(new_line+self.content_str)
 | ||
|  |         return None
 | ||
|  |         pass
 | ||
|  |       
 | ||
|  |     def is_dir_empty(self,  ):
 | ||
|  |         '''文件夹是否为空'''
 | ||
|  |         has_next = next(self.current_dir.iterdir(), None)
 | ||
|  |         if has_next is None:
 | ||
|  |             return True
 | ||
|  |         return False
 | ||
|  | 
 | ||
|  |     def is_file_empty(self,):
 | ||
|  |         '''文件是否为空'''
 | ||
|  |         if self.current_dir.stat().st_size ==0:
 | ||
|  |             return True
 | ||
|  |         return False
 | ||
|  | 
 | ||
|  |     def deleteDir(self,):
 | ||
|  |         '''文件夹是否为空'''
 | ||
|  |         try:
 | ||
|  |             if self.current_dir.exists():
 | ||
|  |                 self.current_dir.rmdir()
 | ||
|  |         except OSError as e:
 | ||
|  |             raise Exception(e)
 | ||
|  |         return True
 | ||
|  | 
 | ||
|  |     ## 其他需求
 | ||
|  |     def get_child_dir(self,) -> list:
 | ||
|  |         ret = []
 | ||
|  |         tmp_dir = self.current_dir.glob("**/")
 | ||
|  |         for td in tmp_dir:
 | ||
|  |             if td.is_dir():
 | ||
|  |                 ret.append(td.relative_to(self.current_dir))
 | ||
|  |         return ret
 | ||
|  |         pass
 | ||
|  | 
 | ||
|  |     def get_child_dir_only(self,) -> list:
 | ||
|  |         ret = []
 | ||
|  |         for d in self.current_dir.iterdir():
 | ||
|  |             if d.is_dir():
 | ||
|  |                 ret.append(d.relative_to(self.current_dir))
 | ||
|  |         return ret
 | ||
|  |         pass
 | ||
|  | 
 | ||
|  |     def get_files_from_currentdir(self, fmt:str="*/*" ) -> list:
 | ||
|  |         '''fmt:  *  */*  */*/*'''
 | ||
|  |         ret = []
 | ||
|  |         tmp_dir = self.current_dir.glob(fmt)
 | ||
|  |         print(tmp_dir)
 | ||
|  |         for td in tmp_dir:
 | ||
|  |             if td.is_file():
 | ||
|  |                 ret.append(td)
 | ||
|  |         return ret
 | ||
|  |         pass
 | ||
|  | 
 | ||
|  |     def sort_dir_and_check( self, dirs:list ):
 | ||
|  |         '''相对目录排序,目录最后一级'''
 | ||
|  |         ret = []
 | ||
|  |         if len(dirs) == 0:
 | ||
|  |             return ret
 | ||
|  |         tmp = {}
 | ||
|  |         tmp_lst = []
 | ||
|  |         for d in dirs:
 | ||
|  |             last:str = d.parts[-1]
 | ||
|  |             if last.isdigit() :
 | ||
|  |                 tmp.update( {last:d} )
 | ||
|  |                 tmp_lst.append(int(last))
 | ||
|  |             pass
 | ||
|  | 
 | ||
|  |         tmp_lst.sort()
 | ||
|  |         for t in tmp:
 | ||
|  |             ret.append(tmp.get(str(t)))
 | ||
|  |             pass
 | ||
|  |         return ret
 | ||
|  | 
 | ||
|  | 
 | ||
|  |     def sort_filepath_and_check(self, path_files:list):
 | ||
|  |         '''相对目录排序,目录最后一级'''
 | ||
|  |         ret = []
 | ||
|  |         if len(path_files) == 0:
 | ||
|  |             return ret
 | ||
|  |         tmp = {}
 | ||
|  |         tmp_lst = []
 | ||
|  |         for d in path_files:
 | ||
|  |             last:str = d.stem
 | ||
|  |             if last.isdigit() :
 | ||
|  |                 tmp.update( {last:d} )
 | ||
|  |                 tmp_lst.append(int(last))
 | ||
|  |             pass
 | ||
|  | 
 | ||
|  |         tmp_lst.sort()
 | ||
|  |         for t in tmp:
 | ||
|  |             ret.append(tmp.get(str(t)))
 | ||
|  |             pass
 | ||
|  |         return ret
 | ||
|  | 
 | ||
|  |     def group_and_sort_filepath(self,path_files:list):
 | ||
|  |         ret = {}
 | ||
|  |         # dirs_lst = []
 | ||
|  |         # len_files = len(path_files)
 | ||
|  |         # if len_files == 0:
 | ||
|  |         #     return False
 | ||
|  |         # for pf in path_files:
 | ||
|  |         #     pf_dir:str = pf.parts[-2]
 | ||
|  |         #     if pf_dir.isdigit() and int(pf_dir) not in dirs_lst:
 | ||
|  |         #         dirs_lst.append( int(pf_dir) )
 | ||
|  |         # dirs_lst.sort()
 | ||
|  | 
 | ||
|  | 
 | ||
|  |             
 | ||
|  | 
 | ||
|  |     def check_dirs(self, dirs:list, begin:int =0, step:int=1):
 | ||
|  |         '''检查目录是否从begin开始递增'''
 | ||
|  |         len_dirs = len(dirs)
 | ||
|  |         if len_dirs == 0:
 | ||
|  |             return False
 | ||
|  |         for i in range(begin,len_dirs,step) :
 | ||
|  |             if dirs[i].parts[-1] != str(i) :
 | ||
|  |                 return False
 | ||
|  |         return True
 | ||
|  | 
 | ||
|  |     def check_path_files(self,path_files:list,begin:int =0, step:int=1):
 | ||
|  |         '''检查文件名从begin开始递增'''
 | ||
|  |         len_files = len(path_files)
 | ||
|  |         if len_files == 0:
 | ||
|  |             return False
 | ||
|  |         for i in range(begin,len_files,step) :
 | ||
|  |             if path_files[i].stem != str(i) :
 | ||
|  |                 return False
 | ||
|  |         return True
 | ||
|  | 
 | ||
|  | if __name__ == "__main__":
 | ||
|  |     mp = MyDir()
 | ||
|  |     mp.setBaseDir(Path())
 | ||
|  |     print(mp.current_dir)
 | ||
|  |     # t = ("test_dir","1","11")
 | ||
|  |     t = ("test_dir", )
 | ||
|  |     mp.setDir( t )
 | ||
|  |     print(mp.current_dir)
 | ||
|  | 
 | ||
|  |     cd = mp.get_child_dir_only()
 | ||
|  |     c = mp.sort_dir_and_check(cd)
 | ||
|  |     print(cd  )
 | ||
|  |     print(c )
 |