from dataclasses import dataclass, field
from enum import Enum
import struct
from typing import Any, List, Optional, Union

import numpy as np

# import time
from tools.mylogger import log


class RamsesType(Enum):
    """Sensor variant, selected by the ``TYPE`` entry of a configuration dict.

    ``"SAMIP"`` maps to :attr:`SAMIP`; every other value maps to :attr:`SAM`.
    """

    SAM = 1
    SAMIP = 2


class AirWater(Enum):
    """Air / water selector.

    NOTE(review): presumably picks which calibration set is applied; the code
    that consumes it is not in this part of the file -- confirm.
    """

    Air = 1
    Water = 2


@dataclass
class SamCal(object):
    """Settings of one SAM module, identified by a 4-character serial number.

    Only ``SAMSN`` has to be supplied at construction time; the remaining
    fields are filled in by :meth:`set_samcal`.

    NOTE(review): ``c0s``..``c4s`` look like wavelength-polynomial coefficients
    and ``DarkPixelStart``/``DarkPixelStop`` like a pixel index range; their
    consumers are not visible here -- confirm before relying on that.
    """

    SAMSN: Optional[str] = None
    DarkPixelStart: int = 0
    DarkPixelStop: int = 0
    Firmware: Optional[str] = None
    IDDataBack: Optional[str] = None
    IDDataCal: Optional[str] = None
    IDDataCalAQ: Optional[str] = None
    IntegrationTime: Optional[int] = None
    Reverse: Optional[float] = None
    SerialNo_MMS: Optional[str] = None
    WavelengthRange: Optional[str] = None
    c0s: Optional[float] = None
    c1s: Optional[float] = None
    c2s: Optional[float] = None
    c3s: Optional[float] = None
    c4s: Optional[float] = None
    # The original annotation ``int or str`` evaluates to plain ``int``.
    cs: Union[int, str, None] = None

    def __post_init__(self):
        """Check that a 4-character ``SAMSN`` was supplied.

        Raises:
            AssertionError: if ``SAMSN`` is missing or not 4 characters long.
        """
        assert self.SAMSN is not None
        assert len(self.SAMSN) == 4, "SAMSN len is wrong"

    def set_samcal(self, samcal: dict):
        """Populate this object from a configuration mapping.

        Args:
            samcal: mapping that must contain ``SAMSN``, ``DarkPixelStart``,
                ``DarkPixelStop``, ``Firmware``, ``IDDataBack``, ``IDDataCal``,
                ``IDDataCalAQ`` and ``c0s``..``c4s``. ``IntegrationTime``,
                ``WavelengthRange`` and ``cs`` are optional and fall back to
                ``" "``, ``"310-1100"`` and ``0`` respectively.

        Raises:
            AssertionError: if ``samcal['SAMSN']`` differs from ``self.SAMSN``.
            KeyError: if a required key is missing.
            ValueError: if a numeric entry cannot be converted.
        """
        assert self.SAMSN == samcal['SAMSN'], "SamCal SAMSN doesnot match."
        self.DarkPixelStart = int(samcal['DarkPixelStart'])
        self.DarkPixelStop = int(samcal['DarkPixelStop'])
        self.Firmware = samcal['Firmware']
        self.IDDataBack = samcal['IDDataBack']
        self.IDDataCal = samcal['IDDataCal']
        self.IDDataCalAQ = samcal['IDDataCalAQ']
        # The optional entries used to be probed with hasattr(), which tests
        # attributes rather than dict keys and therefore never found them.
        self.IntegrationTime = samcal.get('IntegrationTime', " ")
        # self.Reverse = samcal['Reverse']
        # self.SerialNo_MMS_ = samcal['SerialNo_MMS']
        self.WavelengthRange = samcal.get('WavelengthRange', "310-1100")
        self.c0s = float(samcal['c0s'])
        self.c1s = float(samcal['c1s'])
        self.c2s = float(samcal['c2s'])
        self.c3s = float(samcal['c3s'])
        self.c4s = float(samcal['c4s'])
        # Legacy alias: the value used to be stored only under this misspelled
        # name; kept in case code elsewhere still reads it.
        self.c4s_ = self.c4s
        # Previously assigned to a throw-away local, so ``cs`` was never set.
        self.cs = samcal.get('cs', 0)


@dataclass
class IPCal(object):
    """Inclination / pressure calibration values of a SAMIP sensor.

    All fields default to ``None`` and are filled in by
    :meth:`set_ipcal_from_dict`.
    """

    IPSN: Optional[str] = None
    Incl_Orientation: Optional[str] = None
    Incl_Xgain: Optional[float] = None
    Incl_Xoffset: Optional[float] = None
    Incl_Ygain: Optional[float] = None
    Incl_Yoffset: Optional[float] = None
    Incl_KBG: Optional[float] = None
    Incl_Kref: Optional[float] = None
    Press_Current_mA: Optional[float] = None
    Press_Surface_bar: Optional[float] = None
    Press_Gain: Optional[float] = None
    WithIncl: Optional[float] = None
    WithPress: Optional[float] = None
    Press_Sens_mV_bar_4mA: Optional[float] = None
    Press_Sens_mV_bar_1mA: Optional[float] = None
    Press_Type: Optional[str] = None
    CalibrationDate: Optional[str] = None

    def __post_init__(self):
        """No construction-time validation is performed."""
        pass

    def set_ipcal_from_dict(self, ipcal: dict):
        """Populate this object from a configuration mapping.

        Args:
            ipcal: mapping containing one entry per field of this class.
                ``IPSN``, ``Incl_Orientation``, ``Press_Type`` and
                ``CalibrationDate`` are stored as given; all other entries
                are converted with ``float()``.

        Raises:
            KeyError: if an entry is missing.
            ValueError: if a numeric entry cannot be converted.
        """
        float_keys = (
            'Incl_Xgain',
            'Incl_Xoffset',
            'Incl_Ygain',
            'Incl_Yoffset',
            'Incl_KBG',
            'Incl_Kref',
            'Press_Current_mA',
            'Press_Surface_bar',
            'Press_Gain',
            'WithIncl',
            'WithPress',
            'Press_Sens_mV_bar_4mA',
            'Press_Sens_mV_bar_1mA',
        )
        self.IPSN = ipcal['IPSN']
        self.Incl_Orientation = ipcal['Incl_Orientation']
        for key in float_keys:
            setattr(self, key, float(ipcal[key]))
        self.Press_Type = ipcal['Press_Type']
        # Legacy alias: the value used to be stored only under this misspelled
        # name, leaving the declared ``Press_Type`` field at None.
        self.Press_Type_ = self.Press_Type
        self.CalibrationDate = ipcal['CalibrationDate']


@dataclass
class RamsesCal(object):
    """Complete calibration record of one sensor, keyed by a 4-character SN.

    Holds the calibration file names, the ``b0``/``b1``/``cal``/``calaq``
    coefficient lists, a :class:`SamCal`, and (for SAMIP sensors only) an
    :class:`IPCal`.
    """

    SN: Optional[str] = None
    typ: RamsesType = RamsesType.SAM
    samsn: str = ""
    inifile: Optional[str] = None
    calfile: Optional[str] = None
    calaqfile: Optional[str] = None
    backfile: Optional[str] = None
    b0: Optional[List[float]] = None
    b1: Optional[List[float]] = None
    cal: Optional[List[float]] = None
    calaq: Optional[List[float]] = None
    samcal: Optional[SamCal] = None
    ipcal: Optional[IPCal] = None

    def __post_init__(self):
        """Check that a 4-character ``SN`` was supplied.

        Raises:
            AssertionError: if ``SN`` is missing or not 4 characters long.
        """
        assert self.SN is not None, " Pls pass the SN when initiating..."
        assert len(self.SN) == 4, "SN len is wrong"

    def set_ramsescal_from_dict(self, ramsescal: dict):
        """Populate this record from one sensor's configuration dict.

        The expected layout is the per-sensor value of the cfg mapping, e.g.::

            {'SN': '85B5', 'FUNC': 'Lsky', 'TYPE': 'SAM', 'samsn': '85B5',
             'inifile': 'SAM_85B5.ini', 'calfile': 'Cal_SAM_85B5.dat',
             'calaqfile': 'CalAQ_SAM_85B5.dat',
             'backfile': 'Back_SAM_85B5.dat',
             'cal': ['+NAN', ...], ...}

        The same dict is handed on to :meth:`SamCal.set_samcal` and, when
        ``TYPE`` is ``"SAMIP"``, to :meth:`IPCal.set_ipcal_from_dict`, so it
        must also carry the keys those methods read.

        Args:
            ramsescal: configuration mapping for a single sensor.

        Raises:
            AssertionError: if ``ramsescal['SN']`` differs from ``self.SN``.
            KeyError: if a required key is missing.
            ValueError: if a coefficient cannot be converted to ``float``.
        """
        assert self.SN == ramsescal['SN'], "RamsesCal SN doesnot match."

        if ramsescal['TYPE'] == "SAMIP":
            self.typ = RamsesType.SAMIP
        else:
            self.typ = RamsesType.SAM

        self.samsn = ramsescal['samsn']
        self.inifile = ramsescal['inifile']
        self.calfile = ramsescal['calfile']
        self.calaqfile = ramsescal['calaqfile']
        self.backfile = ramsescal['backfile']

        # Coefficients arrive as strings; float() also accepts '+NAN'.
        self.b0 = [float(i) for i in ramsescal['b0']]
        self.b1 = [float(i) for i in ramsescal['b1']]
        self.cal = [float(i) for i in ramsescal['cal']]
        self.calaq = [float(i) for i in ramsescal['calaq']]

        # Set up sam_cal.
        self.samcal = SamCal(SAMSN=self.samsn)
        self.samcal.set_samcal(ramsescal)

        # Set up ip_cal (SAMIP sensors only).
        if self.typ == RamsesType.SAMIP:
            self.ipcal = IPCal()
            self.ipcal.set_ipcal_from_dict(ramsescal)

    # def set_ramsescal_samcal(self, samcal: SamCal):
    #     samcal: SamCal = samcal

    # def set_ramsescal_ipcal(self, ipcal: IPCal):
    #     ipcal: IPCal = ipcal

    # def set_samcal_from_dict(self, samcal: dict):
    #     assert self.SN != samcal['SAMSN'], "RamsesCal SN doesnot match."
# # SAMSN_ = samcal['SAMSN'] # self.samcal.set_samcal # c0s_ = float(samcal['c0s']) # c1s_ = float(['c1s']) # c2s_ = float(['c2s']) # c3s_ = float(['c3s']) # c4s_ = float(['c4s']) # if samcal['cs_']: # cs_ = samcal['cs'] # else: # cs_ = 0 # self.samcal = SamCal(self.SN, DarkPixelStart_, DarkPixelStop_, Firmware_, IDDataBack_, IDDataCal_, IDDataCalAQ_, # IntegrationTime_, Reverse_, SerialNo_MMS_, WavelengthRange_, c0s_, c1s_, c2s_, c3s_, c4s_, cs_,) # pass # def set_samcal_from_dict(self, ipcal: dict): # IPSN_ = ipcal['IPSN'] # = float(ipcal['IPSN'] ) # Incl_Orientation_ = ipcal['Incl_Orientation'] # Incl_Xgain_ = float(ipcal['Incl_Xgain']) # Incl_Xoffset_ = float(ipcal['Incl_Xoffset']) # Incl_Ygain_ = float(ipcal['Incl_Ygain']) # Incl_Yoffset_ = float(ipcal['Incl_Yoffset']) # Incl_KBG_ = float(ipcal['Incl_KBG']) # Incl_Kref_ = float(ipcal['Incl_Kref']) # Press_Current_mA_ = float(ipcal['Press_Current_mA']) # Press_Surface_bar_ = float(ipcal['Press_Surface_bar']) # Press_Gain_ = float(ipcal['Press_Gain']) # WithIncl_ = float(ipcal['WithIncl']) # WithPress_ = float(ipcal['WithPress']) # Press_Sens_mV_bar_4mA_ = float(ipcal['Press_Sens_mV_bar_4mA']) # Press_Sens_mV_bar_1mA_ = float(ipcal['Press_Sens_mV_bar_1mA']) # Press_Type_ = ipcal['Press_Type'] # CalibrationDate_ = ipcal['CalibrationDate'] # self.ipcal = IPCal(IPSN_, Incl_Xgain_, Incl_Xoffset_, Incl_Ygain_, Incl_Yoffset_, Incl_KBG_, Incl_Kref_, Press_Current_mA_, Press_Surface_bar_, Press_Gain_, WithIncl_, WithPress_, # Press_Sens_mV_bar_4mA_, Press_Sens_mV_bar_1mA_, Press_Type_, CalibrationDate_) # pass @dataclass class RamsesFactory(object): SN: str = None typ: RamsesType = RamsesType.SAM airwater: AirWater = AirWater.Air data_raw: bytes = None # 原始的字节 未去遮罩 ip_raw: bytes =None # ip 数据分开传进来 data_after_remove_mask: bytes = None # 去遮罩后 data_valid_buf: bytes = None # 去掉帧头帧尾后的buf ip_buf: bytes = None data_Int_from_Hex: List[int] = None # 去遮罩后 data_after_cal: List[float] = None Wavelength: List[float] = None 
integratedTime: int = None RamsesCalData: RamsesCal = None '''Usage 设置标定参数: sn = "85C2" rf = RamsesFactory( SN=sn, airwater = AirWater.Air ) rf.set_cfg(cfg ) 设置数据, 从原始数据开始 set_data_raw(self, byt: bytes) removeMask() get_data_valid_buf() -- valid_buf 从有效数据开始 set_data_valid_buf(self, byt: bytes) 处理数据 process_from_hex_2_int(self,) ''' def __post_init__(self): assert self.SN != None assert len(self.SN) == 4 , "SAMSN len is wrong" self.RamsesCalData = RamsesCal(self.SN) pass def set_cfg(self, cfg:dict): assert self.SN == cfg["SN"] , "ramsesFactory SN does not match!!" if cfg['TYPE'] == "SAMIP": self.typ: RamsesType = RamsesType.SAMIP else: self.typ: RamsesType = RamsesType.SAM self.RamsesCalData.set_ramsescal_from_dict(cfg) def set_airwater(self,air_water:AirWater): self.airwater = air_water pass def set_data_raw_ip_raw(self, byt: bytes ): ''' byt 含 data_raw ip_raw ''' assert isinstance(byt, bytes), f"arg byt is not byte type" num = 0 buf = byt for i in range(50): if byt[i] == 35: num+=1 if num == 2: self.ip_raw = buf[:i] self.data_raw = buf[i:] break def set_data_raw(self, byt: bytes ): ''' 需要 光谱数据 兼容: 传入前分不出byte ip ip_byt: 判断是否含IP数据 ''' assert isinstance(byt, bytes), f"arg byt is not byte type" self.data_raw = byt def set_ip_raw(self, byt: bytes ): ''' 需要 光谱数据 兼容: 传入前分不出byte ip ip_byt: 判断是否含IP数据 ''' assert isinstance(byt, bytes), f"arg byt is not byte type" self.ip_raw = byt def set_ip_buf(self, byt: bytes ): ''' 处理不含遮罩的 IP buf ''' assert isinstance(byt, bytes) self.ip_buf = byt def set_data_valid_buf(self, byt: bytes): self.data_valid_buf = byt pass def set_data_Int_254(self, int_list: list): self.data_Int_from_Hex = int_list pass def get_data_valid_buf(self, ): self.remove_frame_head_tail( ) pass def remove_frame_head_tail(self,): ''' 兼容 SAM 576, SAMIP 的IP数据单独传进来 去掉 23 00 00 00 00 00 00 ...... 
01 ''' assert (self.data_after_remove_mask != None), "please remove mask in advance" res = {} buf = self.data_after_remove_mask len_ = len(buf) # log.warning( f"valid buf: {buf.hex()}") # log.warning( f"len: {len_}") # print( f" head tail, {len_}") # print( self.data_after_remove_mask) # if self.typ == RamsesType.SAMIP: # assert len_ == 602, "SAMIP sensor , wrong the num of bytes" # self.buf_ip = self.data_after_remove_mask[:26] # buf = buf[26:] # # self.ip = self.decode_ip_buf(self.buf_ip, self.cal_cfg) # # self.buf = self.buf[26:] # len_ = len_ - 26 assert (len_ == 576), "SAM sensor , wrong the num of bytes" for i in range(576): res.update({i+1: {}}) self.data_valid_buf = buf[7:71] + buf[79:143] + \ buf[151:215] + buf[223:287] + \ buf[295:359] + buf[367:431] + \ buf[439:503] + buf[511:575] pass def set_integrate_time(self, it:int): self.integratedTime = it def process_from_hex_2_int(self, mode= 0): """ @description :从data_after_remove_mask 到 data_Int_from_Hex """ assert self.data_valid_buf != None, "please remove frame head and tail" # print( f" == {len(self.data_valid_buf)}= {self.data_valid_buf} ") # log.warning( f"valid buf: {self.data_valid_buf.hex()}") self.integratedTime = 2 << int(self.data_valid_buf[0]) # integrated time # print(f"===============it {self.integratedTime}") tmp = struct.unpack( " None: # self.integratedTime = 2 << int(self.data_valid_buf[0]) # integrated time # self.data_Int_from_Hex = struct.unpack( # " np.ndarray: '''处理raw 数据 8*72 = 576 mode =0, 无需remmoveMask ''' self.rf.set_data_raw(data) # 同时传入 IPbuf if mode == 1: self.rf.removeMask() else: self.rf.data_after_remove_mask = self.rf.data_raw self.rf.get_data_valid_buf( ) self.rf.process_from_hex_2_int() self.rf.calibrate_data() def deal_raw_data_list(self, data:list) -> np.ndarray: ''' (15 + 26 +3*sensor) removeMask get_valid_buf''' log.info( " ", __name__,"deal_raw_data_list ") size = len(data) assert size>0, ">>>> raw datalist len is 0 " tmp_data = np.array([]) for i in range(size): # 
log.info( f" Num {i} Group ", __name__,"deal_raw_data_list ") self.rf.set_data_raw( data[i] ) self.rf.removeMask() self.rf.get_data_valid_buf( ) self.rf.process_from_hex_2_int() self.rf.calibrate_data() if i == 0: tmp_data = self.rf.data_after_cal else: tmp_data = tmp_data + self.rf.data_after_cal self.rf.data_after_cal = tmp_data/size # return tmp_data/size def deal_raw_ip_list(self, ip_data:list) -> np.ndarray: ''' ip_raw''' log.info(" 处理 ip_data:list ",__name__, "deal_raw_ip_list") pass def deal_raw_ip_buf(self, ip_buf:bytes) -> tuple: ''' ip_raw''' log.debug(" args: ip_buf ",__class__, "deal_raw_ip_buf") return self.rf.decode_ip_buf(ip_buf) pass def deal_data_valid_buf(self, data:bytes): ''' 去 23 00 00 00 .....01后的数据, 64*8=512 ''' self.rf.set_data_valid_buf(data) self.rf.process_from_hex_2_int() self.rf.calibrate_data() def deal_data_int254(self, data:list, integrationTime:int): '''254 * int, integrationTime ''' self.rf.set_data_Int_254(data) self.rf.set_integrate_time(integrationTime) self.rf.calibrate_data() def get_data_after_cal(self,): return self.rf.data_after_cal def test_01(): """ @description :调用RamsesTest RamsesFactory 获得结果和已知的比较 """ # sn = "85C2" sn = "84E3" cfg = {} # r85c2 = Ramses85C2() # cfg = r85c2.calcfg r84e3 = Ramses84E3() cfg = r84e3.calcfg # rt.get_ip_cal() # rt.get_sam_cal() # rt.get_ramses_cal() rf = RamsesFactory( SN=sn, airwater=AirWater.Air ) rf.set_cfg(cfg ) # rf.set_data_raw( bytes.fromhex(r85c2.data_85C2) ) # rf.removeMask() # rf.get_data_valid_buf() # rf.process_from_hex_2_int() # rf.get_wavelenth(1) # rf.calibrate_data(1) # rf.process_from_hex_2_int() rf.data_Int_from_Hex = r84e3.data_Int_from_Hex rf.set_integrate_time (256) print( len(rf.RamsesCalData.b0) ) print( len(rf.RamsesCalData.b1) ) print( len(rf.RamsesCalData.cal) ) print( len(rf.RamsesCalData.calaq) ) # print( rf.RamsesCalData.samcal.DarkPixelStart ) # print( rf.RamsesCalData.samcal.DarkPixelStop ) # rf.get_wavelenth(1) rf.calibrate_data(1) # 4 1077 0 0 0.016433967 
#     0.017747166 -0.001313199 0.000333492 0.010671759 0.085698462 0.124526848 0.124526478
# 5 1088 0 0 0.016601816 0.0177432 -0.001141384 0.000505308 0.016169857 0.102722719 0.157412665 0.157412431


def test_02():
    """Run the raw-bytes pipeline on the 85C2 fixture.

    Builds a ``RamsesFactory`` for serial "85C2", loads the fixture's
    calibration config, feeds it the fixture's raw hex frame and runs
    mask removal, frame stripping, hex-to-int conversion and calibration.
    Nothing is asserted; results have to be inspected manually.
    """
    fixture = Ramses85C2()
    calibration = fixture.calcfg

    factory = RamsesFactory(SN="85C2", airwater=AirWater.Air)
    factory.set_cfg(calibration)

    raw_frame = bytes.fromhex(fixture.data_85C2)
    factory.set_data_raw(raw_frame)
    # print(f"raw : {rf.data_raw.hex()}" )
    factory.removeMask()
    # print(f"mask : {rf.data_after_remove_mask.hex() }" )
    factory.get_data_valid_buf()
    factory.process_from_hex_2_int(1)

    # rf.data_Int_from_Hex = r85c2.data_Int_from_Hex
    # rf.set_integrate_time (4096)
    # print( rf.data_Int_from_Hex)
    # print( len(rf.RamsesCalData.b0) )
    # print( len(rf.RamsesCalData.b1) )
    # print( len(rf.RamsesCalData.cal) )
    # print( len(rf.RamsesCalData.calaq) )
    # rf.get_wavelenth(1)
    factory.calibrate_data(1)


if __name__ == '__main__':
    # Many data sets have to be averaged at the integer stage, so the
    # averaged integer values need to be passed in and then processed.
    # test_01()
    test_02()
    # print(hex(100))
    # rc= RamsesCal()
    # print(rc)
    # rt =RamsesTest()
    # rt.get_sam_cal()
    # rt.get_ramses_cal()
    # print(rt.samcal)

# {2: {1: {'FUNC': 'Lsky', 'SN': '85B5'}, 2: {'FUNC': 'Esky', 'SN': '50ED'}, 3: {'FUNC': 'Lwater', 'SN': '852F'}},
# 3: {1: {'FUNC': 'Lsky', 'SN': '85B5'}, 2: {'FUNC': 'Esky', 'SN': '50ED'}, 3: {'FUNC': 'Lwater', 'SN': '852F'}}}
# {'Lsky':
# {'SN': '85B5', 'FUNC': 'Lsky', 'TYPE': 'SAM', 'samsn': '85B5',
# 'inifile': 'SAM_85B5.ini', 'calfile': 'Cal_SAM_85B5.dat', 'calaqfile': 'CalAQ_SAM_85B5.dat', 'backfile': 'Back_SAM_85B5.dat',
# 'cal': ['+NAN',..........