包含服务器端 ,桌面端两个分支
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
awrams/awrams.py

1031 lines
37 KiB

#! python3
# -*- encoding: utf-8 -*-
'''
@File : handheld.py
@Time : 2023/02/24 17:20:59
@Author : Jim @ Yiwin
@Version : 1.0
@Contact : jim@yi-win.com
'''
CUR_TIME_STR_FMT = "%Y-%m-%d %H:%M:%S"
import time
import locale
import struct
import numpy as np
from pathlib import PurePath,Path
from myconfig import CURRENT_DIR,DATA_DIR,OUTPUT_DIR,NEWLINE,ROWFACTOR,SAVE_EXT_NAME,TOKEN
from myconfig import DeviceType,RamsesSURFACE,RamsesAWRAMS,RamsesPROFILE
from tools.mylogger import log
from tools.myexception import MyException
from tools.mytime import MyTime
from tools.mypath import MyDir
from Ramses import Ramses
class HandHeldBuf:
    """Byte buffer plus decoders for handheld sensor data.

    The buffer is filled from a file (``readFile2Buf``) or incrementally
    (``write_buf``) and is consumed from the front.  A binary measurement
    group, as consumed by this class, is 1754 bytes: 26 leading bytes
    followed by three 576-byte sensor blocks (Lsky, Esky, Lwater).
    """

    def __init__(self,) -> None:
        self.__buf = b''
        self.__head = {}
        self.__begin_sign = b'\x23'
        self.__end_sign = b'\x0D'
        self.data_ip = b''
        self.measure_group = {
            "Lsky": b'',
            "Esky": b'',
            "Lwater": b'',
        }
        self.one_group_data = b''
        self.state = 0
        # Initialised here so getResult() is safe before the first decode().
        self.res = ''

    def readFile2Buf(self, fpath) -> None:
        """Replace the buffer with the full binary content of ``fpath``."""
        with open(fpath, "rb") as f:
            self.__buf = f.read()

    def read_buf(self, size: int) -> bytes:
        """Consume and return ``size`` bytes from the front of the buffer.

        Returns ``b''`` and leaves the buffer untouched when fewer than
        ``size`` bytes are available.
        """
        if size > len(self.__buf):
            return b''
        head = self.__buf[0:size]
        self.__buf = self.__buf[size:]
        return head

    def write_buf(self, buf: bytes) -> None:
        """Append ``buf`` to the end of the buffer."""
        self.__buf = self.__buf + buf

    def get_buf_size(self) -> int:
        """Return the number of bytes currently buffered."""
        return len(self.__buf)

    def back_bytes(self, buf: bytes) -> None:
        """Push ``buf`` back onto the front of the buffer."""
        self.__buf = buf + self.__buf

    def reset_head(self) -> None:
        """Clear the stored head dictionary."""
        self.__head = {}

    def reset_buf(self) -> None:
        """Discard all buffered bytes."""
        self.__buf = b''

    def getResult(self) -> str:
        """Return the string produced by the most recent ``decode`` call."""
        return self.res

    def resetMeasureGroup(self) -> None:
        """Reset the three sensor payloads to empty bytes."""
        self.measure_group['Lsky'] = b''
        self.measure_group['Esky'] = b''
        self.measure_group['Lwater'] = b''

    def getMeasureGroup(self) -> dict:
        """Return the dict holding the Lsky / Esky / Lwater payloads."""
        return self.measure_group

    def decode_handheld(self) -> bool:
        """Consume one 1754-byte group and split it into three payloads.

        The group starts with 26 bytes (stored in ``data_ip``), followed by
        three 576-byte blocks that are reduced to 512 bytes each by
        ``deal_576_to_512``.

        Returns:
            True when all three payloads validated.  False when fewer than
            1754 bytes were buffered (the buffer is then cleared) or when
            any block failed validation.
        """
        if self.get_buf_size() < 1754:
            self.__buf = b''
            return False
        self.data_ip = self.read_buf(26)
        for sensor in ("Lsky", "Esky", "Lwater"):
            self.measure_group[sensor] = self.deal_576_to_512(self.read_buf(576))
        return all(self.measure_group[sensor] != b''
                   for sensor in ("Lsky", "Esky", "Lwater"))

    def decode_one_group_handheld(self) -> bool:
        """Consume one 1754-byte group without splitting it.

        Stores the first 26 bytes in ``data_ip`` and the following 1728
        bytes in ``one_group_data``.  When fewer than 1754 bytes are
        buffered, the buffer is cleared and False is returned.
        """
        if self.get_buf_size() < 1754:
            self.__buf = b''
            return False
        self.data_ip = self.read_buf(26)
        self.one_group_data = self.read_buf(1728)
        return True

    def deal_576_to_512(self, data: bytes) -> bytes:
        """Validate a 576-byte sensor block and return its 512 payload bytes.

        The block is read as eight 72-byte frames.  Each frame must start
        with 0x23 and carry a frame id in 0..7 at offset 4; bytes 7..70 of
        a frame are its payload.  Payloads are concatenated from frame id 7
        down to frame id 0, regardless of arrival order.

        Returns:
            The 512 payload bytes, or ``b''`` when the block is too short,
            a frame has a bad start byte or frame id, or the eight frame
            ids are not all distinct.
        """
        frame_size = 72
        frame_count = 8
        if len(data) < frame_size * frame_count:
            return b''
        frames = {}
        for i in range(frame_count):
            frame = data[i * frame_size:(i + 1) * frame_size]
            # The original check compared bytes with an int and raised
            # TypeError; validate the start sign and the frame id instead.
            if frame[0] != 0x23 or frame[4] >= frame_count:
                return b''
            frames[frame[4]] = frame
        if len(frames) != frame_count:
            return b''
        return b''.join(frames[n][7:71] for n in range(frame_count - 1, -1, -1))

    def decode(self) -> str:
        """Decode 0x0D-terminated text records from the buffer.

        In state 0 everything up to and including the first 0x0D is
        discarded, which switches the decoder to state 1.  In state 1 every
        complete record is consumed; for each one the first 20 bytes and
        the slice ``[-9:-1]`` are decoded as UTF-8, stripped of spaces and
        joined with ``TOKEN`` (``";"`` when ``TOKEN`` is falsy).  An
        unterminated tail stays in the buffer for the next call.

        Returns:
            The string built from the last complete record, or ``''`` when
            none was found.  The same value is stored in ``self.res``.
        """
        ret = ''
        token = TOKEN if TOKEN else ";"
        if self.state == 0:
            pos = self.__buf.find(self.__end_sign)
            if pos < 0:
                # No terminator yet: all bytes seen so far are dropped.
                self.__buf = b''
            else:
                self.__buf = self.__buf[pos + 1:]
                self.state = 1
        if self.state == 1:
            while True:
                pos = self.__buf.find(self.__end_sign)
                if pos < 0:
                    break
                record = self.__buf[:pos]
                self.__buf = self.__buf[pos + 1:]
                ret = record[0:20].decode(
                    'utf-8').strip(" ") + token + record[-9:-1].decode('utf-8').strip(" ")
                log.info(f"decode : {ret}")
        self.res = ret
        return ret
class HandHeldPath(object):
    """Locate paired info/sensor files and derive related paths.

    ``mode`` selects the directory layout that is searched:

    * 0 - SD-card layout: info files match ``<data>/*/*/info/*`` and the
      sensor file sits under ``<base>/data/<YYYY_MM>/<DD>/sensor/<name>``.
    * 1 - layout after transfer on the server: info files match
      ``<data>/*/*/*/*/info.bin`` with ``sensor.bin`` in the same folder.

    NOTE(review): the mode-0 helpers index ``path.parts[1]`` and
    ``path.parts[2]``, so they presumably expect relative paths such as
    ``data/2023_02/07/...`` - confirm against the callers.
    """

    def __init__(self, ):
        self.mode = 0
        self.cfg = {}
        self.mydir = MyDir()
        self.base_path: Path = Path()
        self.output_path: Path = Path()
        self.data_path: Path = Path()
        self.filelist = []
        self.error_result = []
        # Read by getOutputPathFromSensorPath in mode 1; nothing in this
        # class fills it, so the caller has to assign it beforehand.
        self.info_dict = {}

    def setMode(self, mode: int = 0):
        """Select the directory layout (0: SD card, 1: server)."""
        self.mode = mode

    def setBasePath(self, fpath: Path):
        """Set the base directory used to build sensor paths."""
        self.base_path = fpath

    def setDataPath(self, fpath: Path):
        """Set the directory that is searched for info files."""
        self.data_path = fpath

    def setOutputPath(self, fpath: Path):
        """Set the directory under which output folders are created."""
        self.output_path = fpath

    def getSensorPathFromInfoPath(self, info_path: Path, ) -> Path:
        """Return the sensor file path that belongs to ``info_path``.

        Returns None when ``mode`` is neither 0 nor 1.
        """
        if self.mode == 1:
            # Server layout: sensor.bin lives beside info.bin.
            return self.base_path.joinpath(info_path.parent, "sensor.bin")
        if self.mode == 0:
            month_dir = info_path.parts[1]
            return self.base_path.joinpath(
                "data",
                month_dir[:4] + "_" + month_dir[-2:],
                info_path.parts[2],
                "sensor",
                info_path.name
            )
        return None

    def getOutputPathFromSensorPath(self, sensor_path: Path) -> Path:
        """Return the output folder for one measurement.

        In mode 1 the folder name is built from the year/month/day/hour/
        minute/second entries of ``self.info_dict``; in mode 0 it is built
        from the components of ``sensor_path``.  Returns None for any
        other mode.

        Raises:
            KeyError: in mode 1 when ``self.info_dict`` lacks a time field.
        """
        if self.mode == 1:
            # str() because the info fields may be integers.
            stamp = "_".join(
                str(self.info_dict[key])
                for key in ("year", "month", "day", "hour", "minute", "second")
            )
            return self.output_path.joinpath(stamp)
        if self.mode == 0:
            month_dir = sensor_path.parts[1]
            return self.output_path.joinpath(
                month_dir[:4] + "_" + month_dir[-2:]
                + "_" + sensor_path.parts[2] + "_" + sensor_path.name
            )
        return None

    def getCurrentMeasureTimeFromPath(self, fpath: Path) -> str:
        """Build a ``YYYY-MM-DD HH:MM:SS`` string from path components.

        The date part always comes from ``fpath.parts[1]`` and
        ``fpath.parts[2]``.  In mode 0 the time comes from the file name
        (``HH_MM_SS``); in mode 1 it comes from the stem of the first
        ``*.txt`` file (sorted by path) next to ``fpath``.

        Returns:
            The formatted time, or ``''`` for an unknown mode or when mode
            1 finds no ``*.txt`` file.
        """
        if self.mode not in (0, 1):
            return ''
        if self.mode == 1:
            # Path.glob returns a generator, so it cannot be indexed.
            txt_files = sorted(fpath.parent.glob("*.txt"))
            if not txt_files:
                return ''
            txt_stem = txt_files[0].stem
            clock = txt_stem[-9:-7] + ":" + txt_stem[-6:-4] + ":" + txt_stem[-3:-1]
        else:
            clock = fpath.name[0:2] + ":" + fpath.name[3:5] + ":" + fpath.name[6:8]
        return fpath.parts[1][:4] + "-" + fpath.parts[1][-2:] + "-" + fpath.parts[2] + " " \
            + clock

    def getDataFileList(self, ):
        """Collect valid info/sensor file pairs into ``self.filelist``.

        ``self.filelist`` is rebuilt on every call.  A pair is skipped, and
        a ``{"path", "error"}`` entry appended to ``self.error_result``,
        when the info file is empty or the sensor file is missing or empty.
        """
        self.filelist = []
        pattern = "*/*/*/*/info.bin" if self.mode == 1 else "*/*/info/*"
        for info_file in self.data_path.glob(pattern):
            if info_file.stat().st_size == 0:
                self.error_result.append(
                    {"path": info_file, "error": "info file size is zero"})
                continue
            sensor_path = self.getSensorPathFromInfoPath(info_file)
            # sensor_path is None for an unsupported mode; treat as missing.
            if sensor_path is None or not sensor_path.exists():
                self.error_result.append(
                    {"path": info_file, "error": "cannot find the sensor file "})
                continue
            if sensor_path.stat().st_size == 0:
                self.error_result.append(
                    {"path": sensor_path, "error": "sensor file size of the sensor is zero' "})
                continue
            self.setFilelist(info_file, sensor_path)

    def setFilelist(self, info_path: Path, sensor_path: Path):
        """Append one info/sensor pair and its date fields to ``filelist``.

        In mode 1 the date is taken from the three directories above the
        folder containing the info file; otherwise it is taken from
        ``parts[1]`` (``YYYY_MM``) and ``parts[2]`` (day).
        """
        entry = {
            "info_path": info_path,
            "name": info_path.name,
            "parent": info_path.parent,
        }
        if self.mode == 1:
            entry["year"] = info_path.parts[-5]
            entry["month"] = info_path.parts[-4]
            entry["day"] = info_path.parts[-3]
        else:
            # Without this else the mode-1 values were always overwritten.
            entry["year"] = info_path.parts[1][:4]
            entry["month"] = info_path.parts[1][-2:]
            entry["day"] = info_path.parts[2]
        entry["sensor_path"] = sensor_path
        self.filelist.append(entry)

    def getFilelist(self, ):
        """Return the list built by ``getDataFileList``."""
        return self.filelist

    def printTest(self, d, sleep_time: int = 5):
        """Debug helper: print ``d`` between log markers, then sleep."""
        log.info(f"***** : I am testing from HandheldPath ********", __name__)
        print(d)
        log.info(f"***** : Ending testing from HandheldPath ********", __name__)
        time.sleep(sleep_time)
class AWRAMS(object):
    """Process AWRAMS measurements: decode, calibrate, average, save.

    The class reads an info record (``decode_info``) and raw sensor frames
    (via ``HandHeldBuf``), calibrates each spectrum with ``Ramses``,
    averages repeated groups, interpolates onto the wavelength grid set by
    ``setRetrieve`` and writes Lsky / Esky / Lwater / Lw / Rs results.
    """

    def __init__(self, ):
        """Initialise state, pick the device enum and wire up the paths."""
        self.device_type = DeviceType.AWRAMS.name
        self.device_enum = None
        self.device_id = {}
        self.syscfg = {}
        self.retrieve = {}
        self.error_result = []
        self.mode = 0  # 0: SD-card data (default), 1: server data
        self.base_path = CURRENT_DIR
        self.current_path = CURRENT_DIR
        self.data_path = DATA_DIR
        self.output_path = OUTPUT_DIR
        self.info_path_fname: Path = self.base_path
        self.sensor_path_fname: Path = self.base_path
        self.filelist = []  # full path information per measurement
        self.intensity_before_avg = None
        self.intensity_after_avg = {}  # {func name: averaged spectrum}
        self.intensity_after_interpo = {}
        self.one_group_result = {}
        self.wavelength = np.array([])
        self.real_wavelength = {}  # {func name: per-pixel wavelengths}
        self.current_filepath = ''
        self.current_measure_time = None
        self.current_group_num = 0
        self.measurement_interval = 0
        self.measurement_repeat = 1
        self.measure_id = None
        self.ymdhms = ""
        self.mydir = MyDir()
        self.my_time = MyTime()
        self.hhb = HandHeldBuf()
        self.hhp = HandHeldPath()
        self.ramses = Ramses()
        self.info_dict = {}
        self.res = {}  # final results keyed by func name
        self.setDeviceEnum()
        self.setMode()
        self.set_hhp_path()

    def setMode(self, mode=0):
        """Select the directory layout (0: SD card, 1: server data)."""
        self.mode = mode
        self.hhp.setMode(mode)

    def setMeasureID(self, mid: int):
        """Store the measurement id."""
        self.measure_id = mid

    def setOldFolder(self, tuple_path: tuple):
        """Set ``old_folder`` to ``base_path`` joined with ``tuple_path``."""
        self.old_folder = self.base_path.joinpath(*tuple_path)

    def transferFromOldFolder(self, ):
        """Move every ``*.bin`` file from ``old_folder`` to ``new_folder``.

        ``pic.bin`` is renamed to ``pic.jpg``; other files keep their name.
        """
        log.info(f"transferFromOldFolder: ", __name__)
        self.getNewFolderFromOldFolder()
        # Snapshot the listing first: files are moved out while looping.
        for bin_file in list(self.old_folder.glob('*.bin')):
            target_name = "pic.jpg" if bin_file.name == "pic.bin" else bin_file.name
            bin_file.replace(self.new_folder.joinpath(target_name))

    def deleteOldFolder(self, ):
        """Delete ``old_folder`` through ``MyDir`` and reset it to DATA_DIR."""
        log.info(f"deleteOldFolder: ", __name__)
        self.mydir.setBaseDir(self.old_folder)
        self.mydir.deleteDir()
        self.old_folder = DATA_DIR

    def getNewFolderFromOldFolder(self, ) -> Path:
        """Derive, create and return the dated destination folder.

        The folder is ``<old_folder/../..>/20YY/M/D/<old folder name>``
        with the date taken from ``self.info_dict``.  It is also stored in
        ``self.new_folder``.
        """
        grandparent = self.old_folder.parent.parent
        path_tuple = ("20" + str(self.info_dict['year']),
                      str(self.info_dict['month']),
                      str(self.info_dict['day']),
                      self.old_folder.parts[-1])
        self.new_folder = grandparent.joinpath(*path_tuple)
        self.mydir.setBaseDir(self.new_folder)
        self.mydir.newDirIfNot()
        # Returning the folder matters: getInfoDict assigns the result.
        return self.new_folder

    def getInfoDict(self, ):
        """Decode ``old_folder/info.bin`` and derive ``new_folder``."""
        info_path = self.old_folder.joinpath("info.bin")
        hexbin = self.read_bin(info_path)
        self.info_dict = self.decode_info(hexbin)
        self.new_folder = self.getNewFolderFromOldFolder()

    def set_hhp_path(self, ):
        """Copy base, data and output paths into the path helper."""
        self.hhp.setBasePath(self.base_path)
        self.hhp.setDataPath(self.data_path)
        self.hhp.setOutputPath(self.output_path)

    def setDeviceEnum(self, ):
        """Pick the sensor enum that matches ``self.device_type``."""
        if self.device_type == DeviceType.SURFACE.name:
            self.device_enum = RamsesSURFACE
        if self.device_type == DeviceType.AWRAMS.name:
            self.device_enum = RamsesAWRAMS
        if self.device_type == DeviceType.PROFILE.name:
            self.device_enum = RamsesPROFILE

    def getDataFileList(self, ):
        """Refresh ``self.filelist`` from the path helper."""
        self.hhp.getDataFileList()
        self.filelist = self.hhp.getFilelist()

    def dealAllMeasurements(self, ):
        """Process every info/sensor pair in ``self.filelist``.

        Returns:
            ``(True, self.error_result)`` after all files were processed.

        Raises:
            MyException: when decoding an info file or processing a sensor
                file fails; the original exception is chained.
        """
        log.info(f" 所有测量文件", __name__, "dealAllMeasurements", )
        for df in self.filelist:
            self.info_dict = {}
            self.info_path_fname: Path = df["info_path"]
            self.sensor_path_fname: Path = df["sensor_path"]
            hex_info = self.read_bin(self.info_path_fname)
            try:
                self.info_dict = self.decode_info(hex_info)
            except Exception as e:
                log.error(f"处理信息文件"
                          + "/" + self.info_path_fname.stem
                          + "出现错误", __name__, "dealAllMeasurements")
                # str(): a Path cannot be concatenated to a str directly.
                raise MyException("处理文件" + str(self.info_path_fname) + "出现错误") from e
            try:  # AWRAMS stores a single group per measurement
                self.dealOneMeasurement(self.sensor_path_fname)
            except Exception as e:
                log.error("处理传感器文件" + self.sensor_path_fname.name + " 出现错误", __name__, "dealAllMeasurements")
                raise MyException("处理传感器文件" + self.sensor_path_fname.name + " 出现错误") from e
        log.info(f"Finished !! ", __name__, "dealAllMeasurements")
        return True, self.error_result

    def dealOneHandheldMeasurement(self, fpath: Path):
        """Process one sensor file that may hold several groups.

        The decoded spectra are split into groups of
        ``self.measurement_repeat``.  When the count is not a multiple of
        the repeat value, ``self.res`` is cleared and nothing is saved.
        """
        log.info(f" 手持一个文件,多组测量数据", __name__, "dealOneHandheldMeasurement")
        self.output_path = OUTPUT_DIR
        self.current_filepath = fpath
        self.current_measure_time = self.hhp.getCurrentMeasureTimeFromPath(fpath)
        self.ymdhms = self.getTimeFnameFromInfoDict()
        self.output_path = self.output_path.joinpath(self.ymdhms)
        log.debug(f"current_measure_time: {self.current_measure_time}", __name__, "dealOneHandheldMeasurement")
        self.hhb.readFile2Buf(fpath)
        log.debug(f"buf: {self.hhb.get_buf_size()}", __name__, "dealOneHandheldMeasurement")
        self.decode_sensor_buf()
        len_total = len(self.intensity_before_avg)
        if len_total % self.measurement_repeat != 0:
            self.res = {}
            return
        group_num = int(len_total / self.measurement_repeat)
        log.info(f"group_num...: {group_num}", __name__, "dealOneHandheldMeasurement")
        self.real_wavelength = self.getWavelenthDict()
        if group_num == 1:
            self.dealOneGroup()
            return
        self.dealMultiGroup(group_num)

    def dealMultiGroup(self, group_num: int):
        """Average, interpolate and save each group in turn."""
        log.info(f"group_num: {group_num}", __name__, "dealMultiGroup")
        repeat = self.measurement_repeat
        for i in range(group_num):
            self.current_group_num = i
            # The per-group measurement time is reset to an empty string.
            self.current_measure_time = ""
            self.res = {}
            group = [self.intensity_before_avg[j + i * repeat] for j in range(repeat)]
            self.getAvg(group)
            self.__do_sensor_dict_interpo()
            self.appendSave()

    def dealOneGroup(self, ):
        """Average all decoded spectra as one group, interpolate and save.

        NOTE(review): ``dealOneMeasurement`` and
        ``dealOneHandheldMeasurement`` already append the timestamp folder
        to ``self.output_path`` before calling this method, so the join
        below nests a second folder of the same name.  Kept as-is because
        existing output locations may be relied upon - confirm.
        """
        self.getAvg(self.intensity_before_avg)
        self.real_wavelength = self.getWavelenthDict()
        self.__do_sensor_dict_interpo()
        self.ymdhms = self.getTimeFnameFromInfoDict()
        self.output_path = self.output_path.joinpath(self.ymdhms)
        self.appendSave()

    def dealOneMeasurement(self, fpath: Path):
        """Process one sensor file as a single measurement group."""
        log.info(f"dealOneMeasurement: 一组测量数据", __name__, "dealOneMeasurement")
        self.output_path = OUTPUT_DIR
        self.current_filepath = fpath
        self.current_measure_time = self.hhp.getCurrentMeasureTimeFromPath(fpath)
        self.ymdhms = self.getTimeFnameFromInfoDict()
        self.output_path = self.output_path.joinpath(self.ymdhms)
        log.debug(f"current_measure_time: {self.current_measure_time}", __name__, "dealOneHandheldMeasurement")
        self.hhb.readFile2Buf(fpath)
        self.decode_sensor_buf()
        self.real_wavelength = self.getWavelenthDict()
        self.dealOneGroup()

    def dealOneMeasurement_Online(self, ):
        """Process the ``*.bin`` files in ``new_folder`` and save one file.

        Requires ``self.new_folder`` and ``self.info_dict`` to be set
        (see ``getInfoDict``).
        """
        log.info(f"dealOneMeasurement_Online: 一组测量数据", __name__, "dealOneMeasurement_Online")
        self.output_path = self.new_folder
        self.ymdhms = self.getTimeFnameFromInfoDict()
        self.get_sensor_buf()
        self.decode_sensor_buf()
        self.getAvg(self.intensity_before_avg)
        self.real_wavelength = self.getWavelenthDict()
        self.__do_sensor_dict_interpo()
        self.saveOnefileForLskyEskyLwaterLwRS()

    def get_sensor_buf(self, ):
        """Append the content of the ``*.bin`` files to the sensor buffer.

        The file list comes from ``MyDir`` for ``new_folder``, after its
        ``sort_filepath_and_check`` step.
        """
        buf = b""
        self.mydir.setBaseDir(self.new_folder)
        my_bin_list = self.mydir.get_files_from_currentdir("*.bin")
        my_bin_list = self.mydir.sort_filepath_and_check(my_bin_list)
        for mbl in my_bin_list:
            buf = buf + self.read_bin(mbl)
        self.hhb.write_buf(buf)

    def getTimeFnameFromInfoDict(self, ):
        """Return ``20YY_M_D_h_m_s`` built from ``self.info_dict``."""
        fields = ("year", "month", "day", "hour", "minute", "second")
        return "20" + "_".join(str(self.info_dict[name]) for name in fields)

    def decode_sensor_buf(self, ):
        """Decode every buffered group into calibrated spectra.

        Fills ``self.intensity_before_avg`` with one ``{func: spectrum}``
        dict per group and clears ``self.res``.

        NOTE(review): ``one_group_data`` already excludes the 26 leading
        bytes (``HandHeldBuf`` reads them separately), yet another 26 bytes
        are skipped here; and the same slice of ``buf`` is used for all
        three sensors because no per-sensor offset is applied.  Both look
        unintended but depend on the wire format, so the behaviour is kept
        unchanged - confirm against real data.
        """
        self.intensity_before_avg = []
        self.clearRes()
        while self.hhb.decode_one_group_handheld():
            res = {}
            buf = self.hhb.one_group_data[26:]
            for i in range(1, 4, 1):
                # Payload of eight 72-byte frames: bytes 7..70 of each.
                temp_buf = buf[7:71] + buf[79:143] + \
                    buf[151:215] + buf[223:287] + \
                    buf[295:359] + buf[367:431] + \
                    buf[439:503] + buf[511:575]
                self.ramses.setBuf(temp_buf)
                func = self.getFuncBySeq(i)
                cfg = self.getCfgByFunc(func)
                self.ramses.setCalCfg(cfg)
                self.ramses.resetItSpectrum()
                self.ramses.ConvertAndCalibrate()
                res.update({func: self.ramses.spectrum})
            self.intensity_before_avg.append(res)

    @staticmethod
    def _average_spectra(data: list, keys) -> dict:
        """Return the per-key mean over ``data`` without modifying it."""
        count = len(data)
        averaged = {}
        for key in keys:
            total = data[0][key]
            for entry in data[1:]:
                total = total + entry[key]
            averaged[key] = total / count
        return averaged

    def getAvg(self, d: list):
        """Average a list of ``{func: spectrum}`` dicts.

        The result is stored in ``self.intensity_after_avg``: empty for an
        empty list, a copy of the only entry for a single-element list,
        otherwise the element-wise mean for the three sensor functions.
        The input list is left unmodified.
        """
        log.info(f"getAvg: 平均多组数据", __name__, "getAvg")
        if len(d) == 0:
            self.intensity_after_avg = {}
            return None
        if len(d) == 1:
            self.intensity_after_avg = dict(d[0])
            return None
        self.intensity_after_avg = self._average_spectra(d, self.getRetDict())
        log.debug(f"getAvg: {self.intensity_after_avg}", __name__, "getAvg")

    def getRetDict(self, ):
        """Return a dict of empty arrays keyed by sensor names 1 to 3."""
        return {self.device_enum(seq).name: np.array([]) for seq in (1, 2, 3)}

    def __do_sensor_dict_interpo(self, ):
        """Interpolate each averaged spectrum onto ``self.wavelength``."""
        log.info(f"同步处理多个个插值 ", __name__, "__do_sensor_dict_interpo")
        self.clearRes()
        for k in self.intensity_after_avg.keys():
            tmp = np.interp(self.wavelength, self.real_wavelength[k], self.intensity_after_avg[k])
            self.res.update({k: tmp})

    def getCfgByDid(self, func: str):
        """Return the calibration config of ``func`` for this device id."""
        # The original read a nonexistent ``self.cfg``; the per-device
        # configuration is held in ``self.syscfg``.
        cfg_id: dict = self.syscfg.get(int(self.device_id))
        return cfg_id.get(func)

    def getFuncBySeq(self, seq: int):
        """Return the sensor function name for sequence number ``seq``.

        Returns ``""`` when ``self.device_type`` is not a known type.

        NOTE(review): every device type resolves through ``RamsesAWRAMS``,
        exactly as before; SURFACE / PROFILE may have been meant to use
        their own enums - confirm.
        """
        known_types = (DeviceType.AWRAMS.name,
                       DeviceType.SURFACE.name,
                       DeviceType.PROFILE.name)
        if self.device_type in known_types:
            return RamsesAWRAMS(seq).name
        return ""

    def getCfgByFunc(self, func: str):
        """Return the calibration config of ``func`` for this device id."""
        cfg_id: dict = self.syscfg.get(int(self.device_id))
        return cfg_id.get(func)

    def getCfgBySeq(self, seq: int):
        """Return the calibration config for sequence number ``seq``."""
        return self.getCfgByFunc(self.getFuncBySeq(seq))

    def saveOnefileForLskyEskyLwaterLwRS(self, ) -> bool:
        """Compute Lw and Rs and write all results into one file.

        ``Lw = Lwater - ROWFACTOR * Lsky`` and ``Rs = Lw / Esky``.  The
        results file and an ``_info.txt`` file are written to
        ``self.new_folder``.
        """
        log.info(f" ", __name__, "saveOnefileForLskyEskyLwaterLwRS")
        self.mydir.setBaseDir(self.output_path)
        time_name = self.getTimeFnameFromInfoDict()
        Lw = self.res["Lwater"] - ROWFACTOR * self.res["Lsky"]
        self.res.update({self.device_enum(4).name: Lw})
        Rs = self.res[self.device_enum(4).name] / self.res["Esky"]
        self.res.update({self.device_enum(5).name: Rs})
        self.mydir.setHeader(self.wavelength.tolist(), TOKEN, "device_id_" + str(self.device_id))
        header = self.mydir.header_str
        save_path_csv = self.new_folder.joinpath(time_name + SAVE_EXT_NAME)
        data_str = ""
        for k in self.res.keys():
            self.mydir.setContent(self.res[k].tolist(), TOKEN, k)
            data_str = data_str + NEWLINE + self.mydir.content_str
        save_path_csv.write_text(header + data_str)
        path_info_txt = self.new_folder.joinpath(time_name + "_info.txt")
        self.save_dict_to_file(self.info_dict, path_info_txt)
        self.do_retrieve()
        return True

    def do_retrieve(self, ) -> None:
        """Run the parameter retrievals (currently placeholders)."""
        self.get_par_400_700()
        self.get_par_350_950()
        self.get_chl()
        self.get_CDOM()

    def get_par_400_700(self, ) -> float:
        """Placeholder retrieval; always returns 0.0."""
        return 0.0

    def get_par_350_950(self, ) -> float:
        """Placeholder retrieval; always returns 0.0."""
        return 0.0

    def get_chl(self, ) -> float:
        """Placeholder retrieval; always returns 0.0."""
        return 0.0

    def get_CDOM(self, ) -> float:
        """Placeholder retrieval; always returns 0.0."""
        return 0.0

    def appendSave(self, ) -> bool:
        """Save the sensor results, Lw/Rs and the info dict."""
        self.checkAndSaveData()
        self.getLwRsAndSave()
        path_info_txt = self.output_path.joinpath("info.txt")
        self.save_dict_to_file(self.info_dict, path_info_txt)
        return True

    def checkAndSaveData(self, ) -> bool:
        """Write the three sensor spectra of ``self.res`` through MyDir.

        NOTE(review): for group 0 each function goes through
        ``newFileByFunc`` (which already writes a row) and then through
        ``appendFileByFunc``; whether that duplicates a row depends on
        ``MyDir`` - confirm.
        """
        log.info(f"checkAndSaveData: {self.output_path.parts}", __name__)
        self.mydir.setBaseDir(self.output_path)
        self.mydir.setHeader(self.wavelength.tolist(), TOKEN, "device_id_" + str(self.device_id))
        func_names = [self.device_enum(seq).name for seq in (1, 2, 3)]
        if self.current_group_num == 0:
            for func in func_names:
                self.newFileByFunc(func)
        for func in func_names:
            self.appendFileByFunc(func)
        return True

    def newFileByFunc(self, func: str) -> None:
        """Create the file for ``func`` if needed and write one data row.

        Raises:
            MyException: when ``MyDir.checkHeader`` returns -1.
        """
        self.mydir.newFileIfNot(func + SAVE_EXT_NAME)
        header_state = self.mydir.checkHeader()
        if header_state == -1:
            log.error(f"请备份文件:{self.mydir.current_filepath.parent} {self.mydir.current_filepath.name}, 并删除文件后再试!", __name__)
            raise MyException(f"请备份文件:{self.mydir.current_filepath.parent} {self.mydir.current_filepath.name}, 并删除文件后再试!")
        if header_state == 0:
            self.mydir.writeHeader()
        self.mydir.setContent(self.res[func].tolist(), TOKEN, self.current_measure_time)
        self.mydir.writeContent()

    def appendFileByFunc(self, func: str) -> None:
        """Write one data row for ``func`` through MyDir."""
        self.mydir.setContent(self.res[func].tolist(), TOKEN, self.current_measure_time)
        self.mydir.writeContent()

    def clearRes(self, ) -> None:
        """Reset ``self.res`` to an empty dict."""
        self.res = {}

    def getLwRsAndSave(self, ) -> bool:
        """Compute Lw and Rs from ``self.res`` and save both."""
        Lw = self.res["Lwater"] - ROWFACTOR * self.res["Lsky"]
        self.res.update({"Lw": Lw})
        Rs = self.res["Lw"] / self.res["Esky"]
        self.res.update({"Rs": Rs})
        self.mydir.setBaseDir(self.output_path)
        if self.current_group_num == 0:
            self.newFileByFunc("Lw")
            self.newFileByFunc("Rs")
        self.appendFileByFunc("Lw")
        self.appendFileByFunc("Rs")
        return True

    def read_bin(self, fpath: Path):
        """Return the bytes of ``fpath``, or None when it does not exist."""
        log.debug(f" readbin: ", __name__, "", "")
        if not fpath.exists():
            log.info(f"not find file: {fpath} ")
            return None
        with open(fpath, 'rb') as file:
            return file.read()

    def decode_info(self, info: bytes) -> dict:
        """Unpack a little-endian info record into a dict.

        Returns:
            The decoded fields (also stored in ``self.info_dict``), or an
            empty dict when ``info`` cannot be unpacked; in that case
            ``self.info_dict`` is left untouched.
        """
        ret = {}
        try:
            temp = struct.unpack(
                "<BBBBBBHHHHHHIIHHHHHBBBHHIfff"
                "HHHBHBHHHHH",
                info)
        except (struct.error, TypeError):
            # Wrong record length, or no data at all (info is None).
            return ret
        log.debug(temp, __name__, "decode_info")
        time_ = "20" + str(temp[0]) + "-" + str(temp[1]) + "-" + str(temp[2]) + " " \
            + str(temp[3]) + ":" + str(temp[4]) + ":" + str(temp[5])
        ret["time"] = time_
        plain_fields = (
            "year", "month", "day", "hour", "minute", "second",
            "Roll", "Pitch", "Yaw", "Hx", "Hy", "Hz",
            "lon", "lat", "satelite_num", "PDOP", "HDOP", "VDOP",
            "Temperature", "Humidity", "Battery", "ErrorCode",
            "Azimuth", "RunAngle", "MeasuyeGroupNum",
            "Tiltx", "Tilty", "Depth",
        )
        for index, name in enumerate(plain_fields):
            ret[name] = temp[index]
        # Sensor serial numbers are stored as upper-case hex without "0x".
        ret["Sensor1"] = hex(temp[28])[2:].upper()
        ret["Sensor2"] = hex(temp[29])[2:].upper()
        ret["Sensor3"] = hex(temp[30])[2:].upper()
        ret["Measure_Num"] = temp[31]
        ret["Measure_Interval"] = temp[32]
        ret["Measure_Repeat"] = temp[33]
        self.info_dict = ret
        return ret

    def save_dict_to_file(self, info_dict: dict, fpath: Path) -> None:
        """Write ``info_dict`` to ``fpath`` as ``key : value`` lines."""
        lines = [key + " : " + str(value) + "\n" for key, value in info_dict.items()]
        with open(fpath, "w+") as f:
            f.write("".join(lines))

    def save_error_to_file(self, errlist: list, fpath: Path) -> None:
        """Write ``path : error`` lines to ``fpath``.

        Nothing is written when ``errlist`` is empty.  ``path`` values may
        be ``Path`` objects, so they are converted with ``str()``.
        """
        if len(errlist) < 1:
            return None
        lines = [str(err["path"]) + " : " + err["error"] + "\n" for err in errlist]
        with open(fpath, "w+") as f:
            f.write("".join(lines))
        return None

    def getWavelenthDict(self, ) -> dict:
        """Return ``{func name: wavelength array}`` for sensors 1 to 3."""
        ret_dict = self.getRetDict()
        cfg_id: dict = self.syscfg.get(int(self.device_id))
        for k in ret_dict.keys():
            ret_dict.update({k: self.getWavelength(cfg_id.get(k))})
        return ret_dict

    def getWavelength(self, ramsesdict: dict) -> np.ndarray:
        """Evaluate the cubic wavelength polynomial for i = 1..255.

        Uses the ``c0s``..``c3s`` entries of ``ramsesdict``:
        ``c0 + c1*i + c2*i**2 + c3*i**3``.
        """
        c0 = float(ramsesdict['c0s'])
        c1 = float(ramsesdict['c1s'])
        c2 = float(ramsesdict['c2s'])
        c3 = float(ramsesdict['c3s'])
        return np.array([c0 + c1 * i + c2 * i * i + c3 * i * i * i
                         for i in range(1, 256)])

    def setSyscfg(self, syscfg: dict):
        """Store the per-device calibration configuration."""
        self.syscfg = syscfg

    def setRetrieve(self, retrieve: dict):
        """Store the retrieval settings and rebuild the wavelength grid."""
        log.debug(f"setRetrieve : {retrieve}......", __name__)
        self.retrieve = retrieve
        self.setNewWavelength()

    def setNewWavelength(self, ):
        """Build ``self.wavelength`` from beginWL / endWL / interval."""
        self.wavelength: np.ndarray = np.arange(
            self.retrieve['beginWL'], self.retrieve['endWL'], self.retrieve['interval'])

    def setDeviceID(self, did: int):
        """Store the device id used to look up calibration config."""
        self.device_id = did

    def setDeviceType(self, device_type):
        """Store the device type name."""
        self.device_type = device_type

    def getErrorInfoDict(self, ):
        """Return the list of recorded errors."""
        return self.error_result

    def printResult(self, ):
        """Print the five result spectra stored in ``self.res``."""
        log.info(f"***** : Print Lsky Esky Lwater Lw Rs......", __name__)
        for seq in (1, 2, 3, 4, 5):
            # self.res is keyed by enum *name*, not by the enum member.
            print(self.res[self.device_enum(seq).name])

    def printTest(self, d, sleep_time: int = 5):
        """Debug helper: print ``d`` between log markers, then sleep."""
        log.info(f"***** : I am testing ********", __name__)
        print(d)
        log.info(f"***** : Ending testing ********", __name__)
        time.sleep(sleep_time)
if __name__ == "__main__":
    # Running this module directly does nothing.  Earlier manual experiments
    # built the file list, called dealAllMeasurements(), and decoded a
    # sample sensor file such as "data/2023_02/07/sensor/14_02_46".
    pass