from dataclasses import dataclass,field from typing import Any,List from enum import Enum import numpy as np import struct class RamsesType(Enum): SAM = 1 SAMIP = 2 class AirWater(Enum): Air = 1 Water = 2 @dataclass class SamCal(object): SAMSN: str = None DarkPixelStart:int = 0 DarkPixelStop: int = 0 Firmware: str = None IDDataBack: str = None IDDataCal: str = None IDDataCalAQ: str = None IntegrationTime: int = None Reverse: float = None SerialNo_MMS: str = None WavelengthRange: str = None c0s: float = None c1s: float = None c2s: float = None c3s: float = None c4s: float = None cs: int or str = None @dataclass class IPCal(object): IPSN: str = None Incl_Orientation: str = None Incl_Xgain: float = None Incl_Xoffset: float = None Incl_Ygain: float = None Incl_Yoffset: float = None Incl_KBG: float = None Incl_Kref: float = None Press_Current_mA: float = None Press_Surface_bar: float = None Press_Gain: float = None WithIncl: float = None WithPress: float = None Press_Sens_mV_bar_4mA: float = None Press_Sens_mV_bar_1mA: float = None Press_Type: str = None CalibrationDate: str = None def __post_init__(self): pass @dataclass class RamsesCal(object): SN: str = "" typ: RamsesType = RamsesType.SAM samsn:str = "" inifile: str = None calfile: str = None calaqfile:str = None backfile:str = None b0: List[str] = None b1: List[str] = None cal: List[str] = None calaq: List[str] = None samcal:SamCal = None ipcal:IPCal = None def __post_init__(self): assert self.SN != None, " Pls pass the SN when initiating..." def set_samcal_from_dict(self, samcal:dict): SAMSN_ = samcal['SAMSN'] DarkPixelStart_ = int(samcal['DarkPixelStart']) DarkPixelStop_ = int(samcal['DarkPixelStop']) Firmware_ = samcal['Firmware'] IDDataBack_ = samcal['IDDataBack'] IDDataCal_ = samcal['IDDataCal'] IDDataCalAQ_ = samcal['IDDataCalAQ'] IntegrationTime_ = int(samcal['IntegrationTime']) Reverse_ = samcal['Reverse'] SerialNo_MMS_ = samcal['SerialNo_MMS'] if samcal['WavelengthRange']: WavelengthRange_ = samcal['WavelengthRange'] else: WavelengthRange_ = "310-1100" c0s_ = float(samcal['c0s']) c1s_ = float(['c1s']) c2s_ = float(['c2s']) c3s_ = float(['c3s']) c4s_ = float(['c4s']) if samcal['cs_']: cs_ = samcal['cs'] else: cs_ = 0 self.samcal= SamCal(SAMSN_ ,DarkPixelStart_ ,DarkPixelStop_ ,Firmware_ ,IDDataBack_ ,IDDataCal_ ,IDDataCalAQ_ ,IntegrationTime_,Reverse_ ,SerialNo_MMS_, WavelengthRange_ ,c0s_ ,c1s_ ,c2s_,c3s_,c4s_ ,cs_,) pass def set_samcal_from_dict(self, ipcal:dict): IPSN_ = ipcal['IPSN'] # = float(ipcal['IPSN'] ) Incl_Orientation_ = ipcal['Incl_Orientation'] Incl_Xgain_ = float(ipcal['Incl_Xgain'] ) Incl_Xoffset_ = float(ipcal['Incl_Xoffset'] ) Incl_Ygain_ = float(ipcal['Incl_Ygain'] ) Incl_Yoffset_ = float(ipcal['Incl_Yoffset'] ) Incl_KBG_ = float(ipcal['Incl_KBG'] ) Incl_Kref_ = float(ipcal['Incl_Kref'] ) Press_Current_mA_ = float(ipcal['Press_Current_mA'] ) Press_Surface_bar_ = float(ipcal['Press_Surface_bar'] ) Press_Gain_ = float(ipcal['Press_Gain'] ) WithIncl_ = float(ipcal['WithIncl'] ) WithPress_ = float(ipcal['WithPress'] ) Press_Sens_mV_bar_4mA_ = float(ipcal['Press_Sens_mV_bar_4mA'] ) Press_Sens_mV_bar_1mA_ = float(ipcal['Press_Sens_mV_bar_1mA'] ) Press_Type_ = ipcal['Press_Type'] CalibrationDate_ = ipcal['CalibrationDate'] self.ipcal = IPCal( IPSN_ ,Incl_Xgain_ ,Incl_Xoffset_ ,Incl_Ygain_,Incl_Yoffset_ ,Incl_KBG_,Incl_Kref_ ,Press_Current_mA_ ,Press_Surface_bar_ ,Press_Gain_,WithIncl_ ,WithPress_, Press_Sens_mV_bar_4mA_,Press_Sens_mV_bar_1mA_ ,Press_Type_ ,CalibrationDate_ ) pass @dataclass class RamsesFactory(object): SN: str = None typ: RamsesType = RamsesType.SAM airwater: AirWater = AirWater.Air raw_data: bytes # 原始的字节 未去遮罩 data_after_remove_mask : bytes # 去遮罩后 data_valid_buf: bytes # 去掉帧头帧尾后的buf ip_buf:bytes data_Int_from_Hex: List[int] # 去遮罩后 data_after_cal: List[float] Wavelength: List[float] integratedTime: int CalData: RamsesCal def set_data_valid_buf(self, data_int:list): self.data_Int_from_Hex = data_int pass def remove_frame_head_tail(self,): assert self.data_after_remove_mask != None , "please remove mask in advance" res = {} buf = self.data_after_remove_mask len_ = len(buf) if self.typ == RamsesType.SAMIP: assert len_ == 602, "SAMIP sensor , wrong the num of bytes" self.buf_ip = self.data_after_remove_mask[:26] buf = buf[26:] # self.ip = self.decode_ip_buf(self.buf_ip, self.cal_cfg) # self.buf = self.buf[26:] len_ = len_ - 26 assert len_ == 576, "SAMIP sensor , wrong the num of bytes" for i in range( 576 ): res.update( {i+1: {}} ) self.data_valid_buf = buf[7:71] + buf[79:143] + \ buf[151:215] + buf[223:287] + \ buf[295:359] + buf[367:431] + \ buf[439:503] + buf[511:575] pass def process_from_hex_2_int(self,): """ @description :从data_after_remove_mask 到 data_Int_from_Hex """ assert self.data_after_remove_mask != None , "please remove mask in advance" assert self.data_valid_buf != None , "please remove frame head and tail" self.integratedTime = 2 << int(self.data_after_remove_mask[0]) # integrated time self.data_Int_from_Hex = struct.unpack( " None: # self.integratedTime = 2 << int(self.data_valid_buf[0]) # integrated time # self.data_Int_from_Hex = struct.unpack( # " bytes: '''去除遮罩 0x64 0x65 0x66 0x67''' ret = b'' flag = False buf = byt blen = len(buf) for i in range(blen): if flag == False and buf[i] == 64 : flag = True continue if flag == False and buf[i] != 64 : ret = ret + buf[i].to_bytes( 1, byteorder = 'big' ) continue if flag == True and buf[i] == 100: ret = ret + b'\x40' flag = False continue if flag == True and buf[i] == 101: ret = ret + b'\x23' flag = False continue if flag == True and buf[i] == 102: ret = ret + b'\x11' flag = False continue if flag == True and buf[i] == 103: ret = ret + b'\x13' flag = False continue return ret pass @dataclass class AWRAMSData(object): info_frame:InfoFrame data_frame:DataFrame # 数据只到分组成byte结束 dataFactory:List[RamsesFactory] # 多个工厂,每个工厂有自己的 标定文件及参数,工厂标记序列哈,只处理相同序列号的产品数据 data_after_avg:List[int] pass def get_datafactory_by_data_frame(self,): """ @description : 调用工厂得到整数值 @param : 576字节数据, 数据id(第几个传感器,还是序列号?) @Returns : List[int] """ pass ## 封装该类的 处理buf hex2int函数 @dataclass class AWRAMS(object): Lsky:List[List[float],List[float]] # 无需记录原始波长,记录插值后的波长 Esky:List[List[float],List[float]] Lwater:List[List[float],List[float]] beginWavelength: float endWavelength: float wavelengthInterv: float Lw:np.ndarray Rs:np.ndarray Lw:np.ndarray def test_01(): """ @description :调用RamsesTest RamsesFactory 获得结果和已知的比较 """ rt =RamsesTest( ) rt.get_ip_cal() rt.get_sam_cal() rt.get_ramses_cal() rc = RamsesFactory(SN="8C52") # 1. samcal, ipcal # 2. ramsescal # 3. if __name__ == '__main__': # 由于很多数据,要到整数这一步取平均, 需要将平均后的整数值传进去,在处理 rc= RamsesCal() print(rc) # rt =RamsesTest() # rt.get_sam_cal() # rt.get_ramses_cal() # print(rt.samcal) pass