diff --git a/Ramses.py b/Ramses.py index a80e707..e5cb6ab 100644 --- a/Ramses.py +++ b/Ramses.py @@ -136,7 +136,7 @@ class Ramses(object): # print( self.light_int ) def __CalibrateSpectrumData(self ,) : - t0 = 8092 + t0 = 8192 log.debug(f" __CalibrateSpectrumData ..... ", __name__) raw = np.asarray(self.light_int, dtype=float) diff --git a/myRamses.py b/myRamses.py new file mode 100644 index 0000000..5d848c8 --- /dev/null +++ b/myRamses.py @@ -0,0 +1,684 @@ +from dataclasses import dataclass,field +from typing import Any,List +from enum import Enum +import numpy as np +import struct + +class RamsesType(Enum): + SAM = 1 + SAMIP = 2 + +class AirWater(Enum): + Air = 1 + Water = 2 + + +@dataclass +class SamCal(object): + SAMSN: str = None + DarkPixelStart:int = 0 + DarkPixelStop: int = 0 + Firmware: str = None + IDDataBack: str = None + IDDataCal: str = None + IDDataCalAQ: str = None + IntegrationTime: int = None + Reverse: float = None + SerialNo_MMS: str = None + WavelengthRange: str = None + c0s: float = None + c1s: float = None + c2s: float = None + c3s: float = None + c4s: float = None + cs: int or str = None + + + +@dataclass +class IPCal(object): + IPSN: str = None + Incl_Orientation: str = None + Incl_Xgain: float = None + Incl_Xoffset: float = None + Incl_Ygain: float = None + Incl_Yoffset: float = None + Incl_KBG: float = None + Incl_Kref: float = None + Press_Current_mA: float = None + Press_Surface_bar: float = None + Press_Gain: float = None + WithIncl: float = None + WithPress: float = None + Press_Sens_mV_bar_4mA: float = None + Press_Sens_mV_bar_1mA: float = None + Press_Type: str = None + CalibrationDate: str = None + def __post_init__(self): + pass + + +@dataclass +class RamsesCal(object): + SN: str = "" + typ: RamsesType = RamsesType.SAM + samsn:str = "" + inifile: str = None + calfile: str = None + calaqfile:str = None + backfile:str = None + b0: List[str] = None + b1: List[str] = None + cal: List[str] = None + calaq: List[str] = None + samcal:SamCal = None + ipcal:IPCal = None + def __post_init__(self): + assert self.SN != None, " Pls pass the SN when initiating..." + + + def set_samcal_from_dict(self, samcal:dict): + SAMSN_ = samcal['SAMSN'] + DarkPixelStart_ = int(samcal['DarkPixelStart']) + DarkPixelStop_ = int(samcal['DarkPixelStop']) + Firmware_ = samcal['Firmware'] + IDDataBack_ = samcal['IDDataBack'] + IDDataCal_ = samcal['IDDataCal'] + IDDataCalAQ_ = samcal['IDDataCalAQ'] + IntegrationTime_ = int(samcal['IntegrationTime']) + Reverse_ = samcal['Reverse'] + SerialNo_MMS_ = samcal['SerialNo_MMS'] + if samcal['WavelengthRange']: + WavelengthRange_ = samcal['WavelengthRange'] + else: + WavelengthRange_ = "310-1100" + + c0s_ = float(samcal['c0s']) + c1s_ = float(['c1s']) + c2s_ = float(['c2s']) + c3s_ = float(['c3s']) + c4s_ = float(['c4s']) + if samcal['cs_']: + cs_ = samcal['cs'] + else: + cs_ = 0 + + self.samcal= SamCal(SAMSN_ ,DarkPixelStart_ ,DarkPixelStop_ + ,Firmware_ ,IDDataBack_ ,IDDataCal_ ,IDDataCalAQ_ + ,IntegrationTime_,Reverse_ ,SerialNo_MMS_, WavelengthRange_ + ,c0s_ ,c1s_ ,c2s_,c3s_,c4s_ ,cs_,) + pass + + def set_samcal_from_dict(self, ipcal:dict): + IPSN_ = ipcal['IPSN'] # = float(ipcal['IPSN'] ) + Incl_Orientation_ = ipcal['Incl_Orientation'] + Incl_Xgain_ = float(ipcal['Incl_Xgain'] ) + Incl_Xoffset_ = float(ipcal['Incl_Xoffset'] ) + Incl_Ygain_ = float(ipcal['Incl_Ygain'] ) + Incl_Yoffset_ = float(ipcal['Incl_Yoffset'] ) + Incl_KBG_ = float(ipcal['Incl_KBG'] ) + Incl_Kref_ = float(ipcal['Incl_Kref'] ) + Press_Current_mA_ = float(ipcal['Press_Current_mA'] ) + Press_Surface_bar_ = float(ipcal['Press_Surface_bar'] ) + Press_Gain_ = float(ipcal['Press_Gain'] ) + WithIncl_ = float(ipcal['WithIncl'] ) + WithPress_ = float(ipcal['WithPress'] ) + Press_Sens_mV_bar_4mA_ = float(ipcal['Press_Sens_mV_bar_4mA'] ) + Press_Sens_mV_bar_1mA_ = float(ipcal['Press_Sens_mV_bar_1mA'] ) + Press_Type_ = ipcal['Press_Type'] + CalibrationDate_ = ipcal['CalibrationDate'] + + self.ipcal = IPCal( IPSN_ ,Incl_Xgain_ + ,Incl_Xoffset_ ,Incl_Ygain_,Incl_Yoffset_ ,Incl_KBG_,Incl_Kref_ + ,Press_Current_mA_ ,Press_Surface_bar_ ,Press_Gain_,WithIncl_ ,WithPress_, + Press_Sens_mV_bar_4mA_,Press_Sens_mV_bar_1mA_ ,Press_Type_ + ,CalibrationDate_ ) + pass + + +@dataclass +class RamsesFactory(object): + SN: str = None + typ: RamsesType = RamsesType.SAM + airwater: AirWater = AirWater.Air + raw_data: bytes # 原始的字节 未去遮罩 + data_after_remove_mask : bytes # 去遮罩后 + data_valid_buf: bytes # 去掉帧头帧尾后的buf + ip_buf:bytes + data_Int_from_Hex: List[int] # 去遮罩后 + data_after_cal: List[float] + Wavelength: List[float] + integratedTime: int + CalData: RamsesCal + + def set_data_valid_buf(self, data_int:list): + self.data_Int_from_Hex = data_int + pass + + + def remove_frame_head_tail(self,): + assert self.data_after_remove_mask != None , "please remove mask in advance" + res = {} + buf = self.data_after_remove_mask + len_ = len(buf) + if self.typ == RamsesType.SAMIP: + assert len_ == 602, "SAMIP sensor , wrong the num of bytes" + self.buf_ip = self.data_after_remove_mask[:26] + buf = buf[26:] + # self.ip = self.decode_ip_buf(self.buf_ip, self.cal_cfg) + # self.buf = self.buf[26:] + len_ = len_ - 26 + assert len_ == 576, "SAMIP sensor , wrong the num of bytes" + + for i in range( 576 ): + res.update( {i+1: {}} ) + self.data_valid_buf = buf[7:71] + buf[79:143] + \ + buf[151:215] + buf[223:287] + \ + buf[295:359] + buf[367:431] + \ + buf[439:503] + buf[511:575] + + pass + + + def process_from_hex_2_int(self,): + """ + @description :从data_after_remove_mask 到 data_Int_from_Hex + """ + assert self.data_after_remove_mask != None , "please remove mask in advance" + assert self.data_valid_buf != None , "please remove frame head and tail" + + self.integratedTime = 2 << int(self.data_after_remove_mask[0]) # integrated time + + self.data_Int_from_Hex = struct.unpack( + " None: + # self.integratedTime = 2 << int(self.data_valid_buf[0]) # integrated time + # self.data_Int_from_Hex = struct.unpack( + # " bytes: + '''去除遮罩 0x64 0x65 0x66 0x67''' + ret = b'' + flag = False + buf = byt + blen = len(buf) + for i in range(blen): + if flag == False and buf[i] == 64 : + flag = True + continue + if flag == False and buf[i] != 64 : + ret = ret + buf[i].to_bytes( 1, byteorder = 'big' ) + continue + if flag == True and buf[i] == 100: + ret = ret + b'\x40' + flag = False + continue + if flag == True and buf[i] == 101: + ret = ret + b'\x23' + flag = False + continue + if flag == True and buf[i] == 102: + ret = ret + b'\x11' + flag = False + continue + if flag == True and buf[i] == 103: + ret = ret + b'\x13' + flag = False + continue + return ret + pass + + +@dataclass +class AWRAMSData(object): + info_frame:InfoFrame + data_frame:DataFrame # 数据只到分组成byte结束 + dataFactory:List[RamsesFactory] # 多个工厂,每个工厂有自己的 标定文件及参数,工厂标记序列哈,只处理相同序列号的产品数据 + + data_after_avg:List[int] + pass + + def get_datafactory_by_data_frame(self,): + """ + @description : 调用工厂得到整数值 + @param : 576字节数据, 数据id(第几个传感器,还是序列号?) + @Returns : List[int] + """ + pass + + + ## 封装该类的 处理buf hex2int函数 + + +@dataclass +class AWRAMS(object): + Lsky:List[List[float],List[float]] # 无需记录原始波长,记录插值后的波长 + Esky:List[List[float],List[float]] + Lwater:List[List[float],List[float]] + beginWavelength: float + endWavelength: float + wavelengthInterv: float + Lw:np.ndarray + Rs:np.ndarray + Lw:np.ndarray + + +def test_01(): + """ + @description :调用RamsesTest RamsesFactory 获得结果和已知的比较 + """ + rt =RamsesTest( ) + rt.get_ip_cal() + rt.get_sam_cal() + rt.get_ramses_cal() + + rc = RamsesFactory(SN="8C52") + # 1. samcal, ipcal + + # 2. ramsescal + + # 3. + + + +if __name__ == '__main__': + # 由于很多数据,要到整数这一步取平均, 需要将平均后的整数值传进去,在处理 + + rc= RamsesCal() + print(rc) + + # rt =RamsesTest() + # rt.get_sam_cal() + # rt.get_ramses_cal() + # print(rt.samcal) + pass +