包含服务器端 ,桌面端两个分支
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
awrams/tools/mypath.py

265 lines
7.5 KiB

3 years ago
from pathlib import PurePath, Path
# from myconfig import NEWLINE,TOKEN,SEPARATOR
"""
"""
class MyDir(object):
    """Track a working directory and one delimited text file inside it.

    Usage: set the base directory (``setBaseDir``), optionally a tuple of
    sub-directory names (``setDir``) and a header line (``setHeader``).
    Only directories are configured that way; the file name itself is
    passed to ``newFileIfNot``.
    """

    def __init__(self) -> None:
        self.base_dir = Path()
        self.dir_tuple = ()
        self.header = []
        self.header_str = ""
        self.content = []
        self.content_str = ""
        # Both stay None until setBaseDir()/setDir() and newFileIfNot() run.
        self.current_dir = None
        self.current_filepath = None

    # ------------------------------------------------------------------
    # private helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _join_fields(values, separator, prefix):
        """Join ``prefix`` (if not None) and ``values`` with ``separator``."""
        fields = [str(value) for value in values]
        if prefix is not None:
            fields.insert(0, prefix)
        # str.join never leaves a trailing separator, so separators of any
        # length (including the empty string) are handled correctly.
        return separator.join(fields)

    @staticmethod
    def _sorted_numeric(paths, label):
        """Return the paths whose ``label(path)`` is a number, ascending."""
        # isdecimal() (unlike isdigit()) only accepts text int() can parse.
        numeric = [p for p in paths if label(p).isdecimal()]
        # sorted() is stable: entries with the same number keep input order.
        return sorted(numeric, key=lambda p: int(label(p)))

    @staticmethod
    def _counts_up(labels, begin, step):
        """Return True if labels are str(begin), str(begin + step), ..."""
        if not labels:
            return False
        return all(
            label == str(begin + position * step)
            for position, label in enumerate(labels)
        )

    # ------------------------------------------------------------------
    # directory selection
    # ------------------------------------------------------------------
    def getDir(self,):
        """Return the current directory (None before any directory is set)."""
        return self.current_dir

    def setBaseDir(self, dir: Path):
        """Set the base directory and make it the current directory."""
        self.base_dir = dir
        self.current_dir = self.base_dir

    def setDir(self, t: tuple = ()):
        """Set the current directory to ``base_dir`` joined with parts ``t``.

        An empty tuple selects ``base_dir`` itself.
        """
        self.dir_tuple = t
        self.current_dir = self.getDirFromBaseAndTuple(self.base_dir, t)

    def getDirFromBaseAndTuple(self, base_dir: Path, dir_tuple: tuple):
        """Return ``base_dir`` joined with ``dir_tuple`` (for external use).

        Does not read or modify the instance state.
        """
        if len(dir_tuple) == 0:
            return base_dir
        return base_dir.joinpath(*dir_tuple)

    # ------------------------------------------------------------------
    # header / content lines
    # ------------------------------------------------------------------
    def setHeader(self, headerlist: list, headerSeperator: str = ";", headerinfo: str = None):
        """Build ``header_str`` from ``headerlist``.

        Items are converted with ``str`` and joined with ``headerSeperator``;
        ``headerinfo``, when given, becomes the first field.  An empty
        ``headerlist`` leaves ``header_str`` untouched.
        """
        if len(headerlist) == 0:
            return
        self.header_str = self._join_fields(headerlist, headerSeperator, headerinfo)

    def setContent(self, contentlist: list, contentSeperator: str = ";", contentinfo: str = None):
        """Build ``content_str`` from ``contentlist``.

        Items are converted with ``str`` and joined with ``contentSeperator``;
        ``contentinfo``, when given, becomes the first field.  An empty
        ``contentlist`` leaves ``content_str`` untouched.
        """
        if len(contentlist) == 0:
            return
        self.content_str = self._join_fields(contentlist, contentSeperator, contentinfo)

    # ------------------------------------------------------------------
    # file handling
    # ------------------------------------------------------------------
    def newDirIfNot(self,) -> None:
        """Create the current directory (and parents) if it is missing."""
        self.current_dir.mkdir(parents=True, exist_ok=True)

    def newFileIfNot(self, fname: str) -> None:
        """Select ``fname`` in the current directory, creating it if missing.

        The directory is created first.  An existing file is left untouched.
        """
        self.newDirIfNot()
        self.current_filepath = self.current_dir.joinpath(fname)
        if not self.current_filepath.exists():
            self.current_filepath.touch()

    def getCurrentFileSize(self,):
        """Return the size in bytes of the current file."""
        return self.current_filepath.stat().st_size

    def getFirstline(self,):
        """Return the first line of the current file without its line ending."""
        with open(self.current_filepath, 'r') as f:
            first_line = f.readline()
        return first_line.strip('\n').strip('\r')

    def checkHeader(self,) -> int:
        """Compare the first line of the current file with ``header_str``.

        Returns:
            0  : the file is empty, the header can be written directly.
            1  : the first line equals ``header_str``, nothing to do.
            -1 : the first line differs; the user should save the data and
                 delete the file before continuing.
        """
        if self.getCurrentFileSize() == 0:
            return 0
        if self.getFirstline() == self.header_str:
            return 1
        return -1

    def writeHeader(self,) -> None:
        """Append ``header_str`` to the current file (no newline added)."""
        with open(self.current_filepath, "a") as f:
            f.write(self.header_str)

    def writeContent(self, new_line="\n") -> None:
        """Append ``new_line`` followed by ``content_str`` to the current file."""
        with open(self.current_filepath, "a") as f:
            f.write(new_line + self.content_str)

    def is_dir_empty(self, ):
        """Return True if the current directory has no entries."""
        return next(self.current_dir.iterdir(), None) is None

    def is_file_empty(self,):
        """Return True if the current file has a size of zero bytes.

        The file is the one selected by ``newFileIfNot``; the previous
        implementation inspected the directory instead of the file.
        """
        return self.getCurrentFileSize() == 0

    def deleteDir(self,):
        """Remove the current directory if it exists.

        Returns True when the directory was removed or did not exist.
        Raises ``Exception`` (chained to the underlying ``OSError``) when the
        removal fails, e.g. because the directory is not empty.
        """
        try:
            if self.current_dir.exists():
                self.current_dir.rmdir()
        except OSError as e:
            raise Exception(e) from e
        return True

    # ------------------------------------------------------------------
    # other requirements
    # ------------------------------------------------------------------
    def get_child_dir(self,) -> list:
        """Return all directories matched by ``**/`` below the current one.

        Paths are relative to the current directory.
        NOTE(review): the pattern may also match the current directory
        itself (returned as ``Path('.')``) -- confirm against callers.
        """
        return [
            found.relative_to(self.current_dir)
            for found in self.current_dir.glob("**/")
            if found.is_dir()
        ]

    def get_child_dir_only(self,) -> list:
        """Return the direct sub-directories, relative to the current one."""
        return [
            entry.relative_to(self.current_dir)
            for entry in self.current_dir.iterdir()
            if entry.is_dir()
        ]

    def get_files_from_currentdir(self, fmt: str = "*/*") -> list:
        """Return the files matching glob ``fmt`` under the current directory.

        ``fmt`` examples: ``*``, ``*/*``, ``*/*/*``.
        """
        return [
            found
            for found in self.current_dir.glob(fmt)
            if found.is_file()
        ]

    def sort_dir_and_check(self, dirs: list):
        """Sort directories numerically by their last path component.

        Entries whose last component is not a number are dropped.
        """
        return self._sorted_numeric(dirs, lambda d: d.name)

    def sort_filepath_and_check(self, path_files: list):
        """Sort file paths numerically by their stem (name without suffix).

        Entries whose stem is not a number are dropped.
        """
        return self._sorted_numeric(path_files, lambda p: p.stem)

    def group_and_sort_filepath(self, path_files: list):
        """Unfinished placeholder; currently does nothing and returns None.

        TODO: the removed draft collected the numeric parent-directory names
        of ``path_files`` and sorted them -- implement or delete.
        """
        return None

    def check_dirs(self, dirs: list, begin: int = 0, step: int = 1):
        """Check that directory names count up from ``begin`` by ``step``.

        The last path component of ``dirs[k]`` must equal
        ``str(begin + k * step)``.  Returns False for an empty list.
        """
        return self._counts_up([d.name for d in dirs], begin, step)

    def check_path_files(self, path_files: list, begin: int = 0, step: int = 1):
        """Check that file stems count up from ``begin`` by ``step``.

        The stem of ``path_files[k]`` must equal ``str(begin + k * step)``.
        Returns False for an empty list.
        """
        return self._counts_up([p.stem for p in path_files], begin, step)
if __name__ == "__main__":
    # Manual smoke run: point a MyDir at ./test_dir and list its numeric
    # sub-directories before and after sorting.
    demo = MyDir()
    demo.setBaseDir(Path())
    print(demo.current_dir)

    # Alternative deeper target: ("test_dir", "1", "11")
    target = ("test_dir", )
    demo.setDir(target)
    print(demo.current_dir)

    children = demo.get_child_dir_only()
    ordered = demo.sort_dir_and_check(children)
    print(children)
    print(ordered)