#! python3 # -*- encoding: utf-8 -*- ''' @File : Ramses.py @Time : 2023/03/03 11:09:50 @Author : Jim @ Yiwin @Version : 1.0 @Contact : jim@yi-win.com @Desc : @para : 23 ..07 .... 06 05 04 03 02 01 00 ip信息不包含 ''' import struct import numpy as np from pathlib import Path from tools.mylogger import log from myconfig import RamsesAWRAMS, RamsesSURFACE, RamsesPROFILE, DeviceType class Ramses(object): def __init__(self,): """ @description :处理Ramses的数据标定 Hex -- realWavelength Intensity @param : 23 ..07 .... 06 05 04 03 02 01 00 ip信息不包含 @Returns : realWavelength Intensity """ self.buf = b'' self.mode = 0 # 默认0 空气中,1:水中 self.buf_ip = b'' self.it = None self.light_int = None # 未标定的整数值 self.spectrum = None # 光谱强度 self.ip = [] # self.current_buf = "" # self.current_buf_seq = 0 # self.current_it_int = {"it": 0, "light_int": []} # 积分时间及换算的整数值 # self.res = {"wavelength": [], "light": []} self.cal_cfg = {} # self.current_cal = {} # 当前传感器的序列号 pass def setBuf( self, buf: bytes ): self.buf = buf pass def setMode( self, mode = 1 ): self.mode = mode pass def setCalCfg(self, d: dict): self.cal_cfg = d pass def getRealWavelength(self, d: dict): self.cal_cfg = d pass def getSpectrum(self): return self.spectrum def resetPara(self, ): self.buf = b'' self.it = None self.light_int = None self.spectrum = None # 光谱强度 self.cal_cfg = {} pass def resetItSpectrum(self, ): self.it = None self.spectrum = None # 光谱强度 pass def printPara(self, ): print(f"**************Ramses printPara*******************") print(f"{self.buf}") print(f"{self.cal_cfg}") print(f"{self.it}") print(f"{self.light_int}") print(f"{self.spectrum}") print(f"**************Ramses printPara*******************") pass def dealBuf(self, ip_included:bool=False): """多个传感器的数据处理, 头部是否包含Ip帧的信息""" log.info(f" dealBuf ", __name__) res = {} len_ = len(self.buf) if len_ < 576: return if ip_included: self.buf_ip = self.buf[:26] self.ip = self.decode_ip_buf(self.buf_ip, self.cal_cfg) self.buf = self.buf[26:] len_ = len_ - 26 if len_ % 576 != 0: return for i in range( int(len_/576) ): res.update( {i+1: {}} ) temp_buf = self.buf[7:71] + self.buf[79:143] + \ self.buf[151:215] + self.buf[223:287] + \ self.buf[295:359] + self.buf[367:431] + \ self.buf[439:503] + self.buf[511:575] self.ConvertAndCalibrate( temp_buf ) # print(len(temp_buf)) # temp = self.__ConvertBytesToInt(temp_buf) # res.update( { i+1: temp } ) # print(res) pass def ConvertAndCalibrate(self,) -> None: '''单个成功数据转化 标定''' log.debug(f" ConvertAndCalibrate ", __name__) temp = self.__ConvertBytesToInt( ) # print( f"int : {self.it} {temp}" ) self.__CalibrateSpectrumData( ) pass # 转换一个传感器的部分 def __ConvertBytesToInt(self ) -> None: res = {} d = [] # List [ Tuple[ it:int, sing_set:tuple[int] ] ] # print(f" ================= ") # print(f" {self.buf.hex()} ") self.it = 2 << int(self.buf[1]) # integrated time # self.it = 2 << int(self.buf[0]) # integrated time self.light_int = struct.unpack( " bytes: '''去除遮罩 0x64 0x65 0x66 0x67''' ret = b'' flag = False blen = len(buf) for i in range(blen): if flag == False and buf[i] == 64 : flag = True continue if flag == False and buf[i] != 64 : ret = ret + buf[i].to_bytes( 1, byteorder = 'big' ) continue if flag == True and buf[i] == 100: ret = ret + b'\x40' flag = False continue if flag == True and buf[i] == 101: ret = ret + b'\x23' flag = False continue if flag == True and buf[i] == 102: ret = ret + b'\x11' flag = False continue if flag == True and buf[i] == 103: ret = ret + b'\x13' flag = False continue return ret pass def decode_ip_buf(self, buf, ip_cal:dict): tmpbuf = buf if len(tmpbuf) ==26 and tmpbuf[0] == 0x13: tmpbuf = tmpbuf[2:] Incl_XGain = float(ip_cal['Incl_XOffset'] ) Incl_XOffset = float(ip_cal['Incl_XOffset'] ) Incl_YGain = float(ip_cal['Incl_YGain'] ) Incl_YOffset = float(ip_cal['Incl_YOffset'] ) Incl_KRef = float(ip_cal['Incl_KRef'] ) Press_Sens_mV_bar_1mA = float(ip_cal['Press_Sens_mV_bar_1mA'] ) Incl_KBG = float(ip_cal['Incl_KBG'] ) Press_Sens_mV_bar_4mA = float(ip_cal['Press_Sens_mV_bar_4mA'] ) Press_Gain = float(ip_cal['Press_Gain'] ) Press_Surface_bar = float(ip_cal['Press_Surface_bar'] ) ip_info = struct.unpack("