from dataclasses import dataclass, field from typing import Any, List from enum import Enum import numpy as np import struct class RamsesType(Enum): SAM = 1 SAMIP = 2 class AirWater(Enum): Air = 1 Water = 2 @dataclass class SamCal(object): SAMSN: str = None DarkPixelStart: int = 0 DarkPixelStop: int = 0 Firmware: str = None IDDataBack: str = None IDDataCal: str = None IDDataCalAQ: str = None IntegrationTime: int = None Reverse: float = None SerialNo_MMS: str = None WavelengthRange: str = None c0s: float = None c1s: float = None c2s: float = None c3s: float = None c4s: float = None cs: int or str = None def __post_init__(self): assert self.SAMSN != None assert len(self.SAMSN) == 4 , "SAMSN len is wrong" pass def set_samcal(self, samcal: dict): assert self.SAMSN == samcal['SAMSN'], "SamCal SAMSN doesnot match." # SAMSN_ = samcal['SAMSN'] self.DarkPixelStart = int(samcal['DarkPixelStart']) self.DarkPixelStop = int(samcal['DarkPixelStop']) self.Firmware = samcal['Firmware'] self.IDDataBack = samcal['IDDataBack'] self.IDDataCal = samcal['IDDataCal'] self.IDDataCalAQ = samcal['IDDataCalAQ'] self.IntegrationTime = int(samcal['IntegrationTime']) self.Reverse = samcal['Reverse'] self.SerialNo_MMS_ = samcal['SerialNo_MMS'] if hasattr( samcal, 'WavelengthRange') : self.WavelengthRange = samcal['WavelengthRange'] else: self.WavelengthRange = "310-1100" self.c0s = float(samcal['c0s']) self.c1s = float(samcal['c1s']) self.c2s = float(samcal['c2s']) self.c3s = float(samcal['c3s']) self.c4s_ = float(samcal['c4s']) if hasattr( samcal, 'cs'): cs = samcal['cs'] else: cs_ = 0 pass @dataclass class IPCal(object): IPSN: str = None Incl_Orientation: str = None Incl_Xgain: float = None Incl_Xoffset: float = None Incl_Ygain: float = None Incl_Yoffset: float = None Incl_KBG: float = None Incl_Kref: float = None Press_Current_mA: float = None Press_Surface_bar: float = None Press_Gain: float = None WithIncl: float = None WithPress: float = None Press_Sens_mV_bar_4mA: float = None Press_Sens_mV_bar_1mA: float = None Press_Type: str = None CalibrationDate: str = None def __post_init__(self): pass def set_ipcal_from_dict(self, ipcal: dict): self.IPSN = ipcal['IPSN'] # = float(ipcal['IPSN'] ) self.Incl_Orientation = ipcal['Incl_Orientation'] self.Incl_Xgain = float(ipcal['Incl_Xgain']) self.Incl_Xoffset = float(ipcal['Incl_Xoffset']) self.Incl_Ygain = float(ipcal['Incl_Ygain']) self.Incl_Yoffset = float(ipcal['Incl_Yoffset']) self.Incl_KBG = float(ipcal['Incl_KBG']) self.Incl_Kref = float(ipcal['Incl_Kref']) self.Press_Current_mA = float(ipcal['Press_Current_mA']) self.Press_Surface_bar = float(ipcal['Press_Surface_bar']) self.Press_Gain = float(ipcal['Press_Gain']) self.WithIncl = float(ipcal['WithIncl']) self.WithPress = float(ipcal['WithPress']) self.Press_Sens_mV_bar_4mA = float(ipcal['Press_Sens_mV_bar_4mA']) self.Press_Sens_mV_bar_1mA = float(ipcal['Press_Sens_mV_bar_1mA']) self.Press_Type_ = ipcal['Press_Type'] self.CalibrationDate = ipcal['CalibrationDate'] @dataclass class RamsesCal(object): SN: str = None typ: RamsesType = RamsesType.SAM samsn: str = "" inifile: str = None calfile: str = None calaqfile: str = None backfile: str = None b0: List[float] = None b1: List[float] = None cal: List[float] = None calaq: List[float] = None samcal: SamCal = None ipcal: IPCal = None def __post_init__(self): assert self.SN != None, " Pls pass the SN when initiating..." assert len(self.SN) == 4 , "SN len is wrong" def set_ramsescal_from_dict(self, ramsescal: dict): ''' 与获得cfg兼容 # {'Lsky': # {'SN': '85B5', 'FUNC': 'Lsky', 'TYPE': 'SAM', 'samsn': '85B5', # 'inifile': 'SAM_85B5.ini', 'calfile': 'Cal_SAM_85B5.dat', 'calaqfile': 'CalAQ_SAM_85B5.dat', 'backfile': 'Back_SAM_85B5.dat', # 'cal': ['+NAN',.......... ''' assert self.SN == ramsescal['SN'], "RamsesCal SN doesnot match." if ramsescal['TYPE'] == "SAMIP": self.typ = RamsesType.SAMIP else: self.typ: RamsesType = RamsesType.SAM self.samsn: str = ramsescal['samsn'] self.inifile: str = ramsescal['inifile'] self.calfile: str = ramsescal['calfile'] self.calaqfile: str = ramsescal['calaqfile'] self.backfile: str = ramsescal['backfile'] self.b0: List[float] = [float(i) for i in ramsescal['b0'] ] self.b1: List[float] = [float(i) for i in ramsescal['b1'] ] self.cal: List[float] = [float(i) for i in ramsescal['cal'] ] self.calaq: List[float] = [float(i) for i in ramsescal['calaq'] ] self.samcal: SamCal = SamCal( SAMSN=self.samsn ) self.samcal.set_samcal(ramsescal ) if self.typ == RamsesType.SAMIP: self.ipcal: IPCal = IPCal( ) self.ipcal.set_ipcal_from_dict( ramsescal ) # def set_ramsescal_samcal(self, samcal: SamCal): # samcal: SamCal = samcal # def set_ramsescal_ipcal(self, ipcal: IPCal): # ipcal: IPCal = ipcal # def set_samcal_from_dict(self, samcal: dict): # assert self.SN != samcal['SAMSN'], "RamsesCal SN doesnot match." # # SAMSN_ = samcal['SAMSN'] # self.samcal.set_samcal # c0s_ = float(samcal['c0s']) # c1s_ = float(['c1s']) # c2s_ = float(['c2s']) # c3s_ = float(['c3s']) # c4s_ = float(['c4s']) # if samcal['cs_']: # cs_ = samcal['cs'] # else: # cs_ = 0 # self.samcal = SamCal(self.SN, DarkPixelStart_, DarkPixelStop_, Firmware_, IDDataBack_, IDDataCal_, IDDataCalAQ_, # IntegrationTime_, Reverse_, SerialNo_MMS_, WavelengthRange_, c0s_, c1s_, c2s_, c3s_, c4s_, cs_,) # pass # def set_samcal_from_dict(self, ipcal: dict): # IPSN_ = ipcal['IPSN'] # = float(ipcal['IPSN'] ) # Incl_Orientation_ = ipcal['Incl_Orientation'] # Incl_Xgain_ = float(ipcal['Incl_Xgain']) # Incl_Xoffset_ = float(ipcal['Incl_Xoffset']) # Incl_Ygain_ = float(ipcal['Incl_Ygain']) # Incl_Yoffset_ = float(ipcal['Incl_Yoffset']) # Incl_KBG_ = float(ipcal['Incl_KBG']) # Incl_Kref_ = float(ipcal['Incl_Kref']) # Press_Current_mA_ = float(ipcal['Press_Current_mA']) # Press_Surface_bar_ = float(ipcal['Press_Surface_bar']) # Press_Gain_ = float(ipcal['Press_Gain']) # WithIncl_ = float(ipcal['WithIncl']) # WithPress_ = float(ipcal['WithPress']) # Press_Sens_mV_bar_4mA_ = float(ipcal['Press_Sens_mV_bar_4mA']) # Press_Sens_mV_bar_1mA_ = float(ipcal['Press_Sens_mV_bar_1mA']) # Press_Type_ = ipcal['Press_Type'] # CalibrationDate_ = ipcal['CalibrationDate'] # self.ipcal = IPCal(IPSN_, Incl_Xgain_, Incl_Xoffset_, Incl_Ygain_, Incl_Yoffset_, Incl_KBG_, Incl_Kref_, Press_Current_mA_, Press_Surface_bar_, Press_Gain_, WithIncl_, WithPress_, # Press_Sens_mV_bar_4mA_, Press_Sens_mV_bar_1mA_, Press_Type_, CalibrationDate_) # pass @dataclass class RamsesFactory(object): SN: str = None typ: RamsesType = RamsesType.SAM airwater: AirWater = AirWater.Air data_raw: bytes = None # 原始的字节 未去遮罩 data_after_remove_mask: bytes = None # 去遮罩后 data_valid_buf: bytes = None # 去掉帧头帧尾后的buf ip_buf: bytes = None data_Int_from_Hex: List[int] = None # 去遮罩后 data_after_cal: List[float] = None Wavelength: List[float] = None integratedTime: int = None RamsesCalData: RamsesCal = None '''Usage 设置标定参数: sn = "85C2" rf = RamsesFactory( SN=sn, airwater = AirWater.Air ) rf.set_cfg(cfg ) 设置数据, 从原始数据开始 set_data_raw(self, byt: bytes) removeMask() get_data_valid_buf() -- valid_buf 从有效数据开始 set_data_valid_buf(self, byt: bytes) 处理数据 process_from_hex_2_int(self,) ''' def __post_init__(self): assert self.SN != None assert len(self.SN) == 4 , "SAMSN len is wrong" self.RamsesCalData = RamsesCal(self.SN) pass def set_cfg(self, cfg:dict): assert self.SN == cfg["SN"] , "ramsesFactory SN does not match!!" if cfg['TYPE'] == "SAMIP": self.typ: RamsesType = RamsesType.SAMIP else: self.typ: RamsesType = RamsesType.SAM self.RamsesCalData.set_ramsescal_from_dict(cfg) def set_airwater(self,air_water:AirWater): self.airwater = air_water pass def set_data_raw(self, byt: bytes): assert isinstance(byt, bytes) self.data_raw = byt pass def set_data_valid_buf(self, byt: bytes): self.data_valid_buf = byt pass def get_data_valid_buf(self, ): self.remove_frame_head_tail( ) pass def remove_frame_head_tail(self,): ''' 兼容 SAM 576, SAMIP 602 去掉 23 00 00 00 00 00 00 ...... 01 ''' assert self.data_after_remove_mask != None, "please remove mask in advance" res = {} buf = self.data_after_remove_mask len_ = len(buf) print( f" head tail, {len_}") print( self.data_after_remove_mask) if self.typ == RamsesType.SAMIP: assert len_ == 602, "SAMIP sensor , wrong the num of bytes" self.buf_ip = self.data_after_remove_mask[:26] buf = buf[26:] # self.ip = self.decode_ip_buf(self.buf_ip, self.cal_cfg) # self.buf = self.buf[26:] len_ = len_ - 26 assert len_ == 576, "SAMIP sensor , wrong the num of bytes" for i in range(576): res.update({i+1: {}}) self.data_valid_buf = buf[7:71] + buf[79:143] + \ buf[151:215] + buf[223:287] + \ buf[295:359] + buf[367:431] + \ buf[439:503] + buf[511:575] pass def set_integrate_time(self, it:int): self.integratedTime = it def process_from_hex_2_int(self, mode= 0): """ @description :从data_after_remove_mask 到 data_Int_from_Hex """ assert self.data_after_remove_mask != None, "please remove mask in advance" assert self.data_valid_buf != None, "please remove frame head and tail" self.integratedTime = 2 << int(self.data_valid_buf[0]) # integrated time print(f"===============it {self.integratedTime}") tmp = struct.unpack( " None: # self.integratedTime = 2 << int(self.data_valid_buf[0]) # integrated time # self.data_Int_from_Hex = struct.unpack( # "