You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
264 lines
7.5 KiB
264 lines
7.5 KiB
from pathlib import PurePath, Path
|
|
|
|
# from myconfig import NEWLINE,TOKEN,SEPARATOR
|
|
"""
|
|
"""
|
|
class MyDir(object):
    """Helper for managing a working directory and a delimited text file in it.

    Usage: set the base directory (``setBaseDir``), optionally a tuple of
    sub-directory names (``setDir``) and the header (``setHeader``).
    Only directories are configured on the instance; the file name itself
    is passed to ``newFileIfNot``.
    """

    def __init__(self) -> None:
        self.base_dir = Path()
        self.dir_tuple = ()
        self.header = []
        self.header_str = ""
        self.content = []
        self.content_str = ""
        # Both stay None until setBaseDir()/setDir() and newFileIfNot() run.
        self.current_dir = None
        self.current_filepath = None

    def getDir(self):
        """Return the current working directory (None if never set)."""
        return self.current_dir

    def setBaseDir(self, dir: Path):
        """Set the base directory and reset the current directory to it."""
        self.base_dir = dir
        self.current_dir = self.base_dir

    def setDir(self, t: tuple = ()):
        """Set the current directory to ``base_dir`` joined with the parts in *t*.

        An empty tuple selects ``base_dir`` itself.
        """
        self.dir_tuple = t
        if len(self.dir_tuple) == 0:
            self.current_dir = self.base_dir
        else:
            self.current_dir = self.base_dir.joinpath(*t)

    def getDirFromBaseAndTuple(self, base_dir: Path, dir_tuple: tuple):
        """Return *base_dir* joined with *dir_tuple* without touching instance state."""
        if len(dir_tuple) == 0:
            return base_dir
        return base_dir.joinpath(*dir_tuple)

    def setHeader(self, headerlist: list, headerSeperator: str = ";", headerinfo: str = None):
        """Build ``header_str`` from *headerlist*, joined by *headerSeperator*.

        If *headerinfo* is given it becomes the first field. An empty
        *headerlist* leaves the previously stored header untouched.
        """
        if not headerlist:
            return
        fields = [str(item) for item in headerlist]
        if headerinfo is not None:
            fields.insert(0, headerinfo)
        # join() places the separator only between fields, so separators of
        # any length (including the empty string) are handled correctly.
        self.header_str = headerSeperator.join(fields)

    def setContent(self, contentlist: list, contentSeperator: str = ";", contentinfo: str = None):
        """Build ``content_str`` from *contentlist*, joined by *contentSeperator*.

        If *contentinfo* is given it becomes the first field. An empty
        *contentlist* leaves the previously stored content untouched.
        """
        if not contentlist:
            return
        fields = [str(item) for item in contentlist]
        if contentinfo is not None:
            fields.insert(0, contentinfo)
        self.content_str = contentSeperator.join(fields)

    def newDirIfNot(self) -> None:
        """Create the current directory (and any missing parents) if needed."""
        self.current_dir.mkdir(parents=True, exist_ok=True)

    def newFileIfNot(self, fname: str) -> None:
        """Select *fname* inside the current directory, creating it empty if missing.

        The current directory is created first when it does not exist.
        """
        self.newDirIfNot()
        self.current_filepath = self.current_dir.joinpath(fname)
        if not self.current_filepath.exists():
            self.current_filepath.touch()

    def getCurrentFileSize(self):
        """Return the size in bytes of the current file."""
        return self.current_filepath.stat().st_size

    def getFirstline(self):
        """Return the first line of the current file without its line ending."""
        with open(self.current_filepath, 'r') as f:
            first_line = f.readline()
        return first_line.strip('\n').strip('\r')

    def checkHeader(self) -> int:
        """Compare the first line of the current file with ``header_str``.

        Returns:
             0: the file is empty, the header can be written directly
             1: the header matches, nothing to do
            -1: mismatch; the user should back up the data and delete the
                file before continuing
        """
        if self.getCurrentFileSize() == 0:
            return 0
        if self.getFirstline() == self.header_str:
            return 1
        return -1

    def writeHeader(self) -> None:
        """Append ``header_str`` to the current file."""
        with open(self.current_filepath, "a") as f:
            f.write(self.header_str)

    def writeContent(self, new_line="\n") -> None:
        """Append *new_line* followed by ``content_str`` to the current file."""
        with open(self.current_filepath, "a") as f:
            f.write(new_line + self.content_str)

    def is_dir_empty(self):
        """Return True if the current directory has no entries."""
        return next(self.current_dir.iterdir(), None) is None

    def is_file_empty(self):
        """Return True if the current file has a size of zero bytes."""
        # Stat the file, not the directory that contains it.
        return self.current_filepath.stat().st_size == 0

    def deleteDir(self):
        """Remove the current directory if it exists; it must be empty.

        Returns True on success (or when the directory does not exist).
        Raises Exception, chained to the underlying OSError, on failure.
        """
        try:
            if self.current_dir.exists():
                self.current_dir.rmdir()
        except OSError as e:
            raise Exception(e) from e
        return True

    # Other helpers

    def get_child_dir(self) -> list:
        """Return directories matched by ``**/`` under the current directory.

        Paths are relative to the current directory.
        """
        return [
            found.relative_to(self.current_dir)
            for found in self.current_dir.glob("**/")
            if found.is_dir()
        ]

    def get_child_dir_only(self) -> list:
        """Return the immediate sub-directories, relative to the current directory."""
        return [
            entry.relative_to(self.current_dir)
            for entry in self.current_dir.iterdir()
            if entry.is_dir()
        ]

    def get_files_from_currentdir(self, fmt: str = "*/*") -> list:
        """Return files under the current directory matching glob pattern *fmt*.

        Typical patterns: ``*``, ``*/*``, ``*/*/*``.
        """
        return [hit for hit in self.current_dir.glob(fmt) if hit.is_file()]

    def sort_dir_and_check(self, dirs: list):
        """Return the paths in *dirs* whose last component is numeric, sorted by it.

        Paths whose last component is not all digits are dropped. Sorting is
        numeric (``2`` before ``10``) and stable for equal values.
        """
        # .name is '' for Path('.'), so such entries are skipped instead of
        # raising IndexError on an empty parts tuple.
        numeric = [d for d in dirs if d.name.isdigit()]
        return sorted(numeric, key=lambda d: int(d.name))

    def sort_filepath_and_check(self, path_files: list):
        """Return the paths in *path_files* whose stem is numeric, sorted by it.

        Files whose stem is not all digits are dropped. Sorting is numeric
        and stable for equal values.
        """
        numeric = [f for f in path_files if f.stem.isdigit()]
        return sorted(numeric, key=lambda f: int(f.stem))

    def group_and_sort_filepath(self, path_files: list):
        """Placeholder; not implemented. Ignores *path_files* and returns None."""
        # TODO: collect the numeric parent-directory names (parts[-2]) of the
        # files, sort them, and group the files accordingly.
        return None

    def check_dirs(self, dirs: list, begin: int = 0, step: int = 1):
        """Check that directories are named after their position in *dirs*.

        For every index ``i`` in ``range(begin, len(dirs), step)`` the last
        component of ``dirs[i]`` must equal ``str(i)``. Returns False for an
        empty list.
        """
        if len(dirs) == 0:
            return False
        return all(
            dirs[i].name == str(i) for i in range(begin, len(dirs), step)
        )

    def check_path_files(self, path_files: list, begin: int = 0, step: int = 1):
        """Check that file stems match their position in *path_files*.

        For every index ``i`` in ``range(begin, len(path_files), step)`` the
        stem of ``path_files[i]`` must equal ``str(i)``. Returns False for an
        empty list.
        """
        if len(path_files) == 0:
            return False
        return all(
            path_files[i].stem == str(i)
            for i in range(begin, len(path_files), step)
        )
|
|
|
|
if __name__ == "__main__":
    # Manual smoke test: point a MyDir at ./test_dir and list its
    # immediate sub-directories, raw and after numeric filtering/sorting.
    demo = MyDir()
    demo.setBaseDir(Path())
    print(demo.getDir())

    # Deeper example: ("test_dir", "1", "11")
    demo.setDir(("test_dir",))
    print(demo.getDir())

    children = demo.get_child_dir_only()
    ordered = demo.sort_dir_and_check(children)
    print(children)
    print(ordered)
|
|
|