From b101c1a54d972a6c9d5bf96691a96152ce6bc8b0 Mon Sep 17 00:00:00 2001 From: esea_info Date: Wed, 19 Apr 2023 12:54:54 +0800 Subject: [PATCH] ramses.py decode_sensor_buf --- Ramses.py | 135 ++++++++++++++++++++++++++++++++++++++++++---------- handheld.py | 22 +++++---- 2 files changed, 124 insertions(+), 33 deletions(-) diff --git a/Ramses.py b/Ramses.py index a5429af..49c9671 100644 --- a/Ramses.py +++ b/Ramses.py @@ -28,9 +28,12 @@ class Ramses(object): @Returns : realWavelength Intensity """ self.buf = b'' + self.mode = 0 # 默认0 空气中,1:水中 + self.buf_ip = b'' self.it = None self.light_int = None # 未标定的整数值 self.spectrum = None # 光谱强度 + self.ip = [] # self.current_buf = "" # self.current_buf_seq = 0 # self.current_it_int = {"it": 0, "light_int": []} # 积分时间及换算的整数值 @@ -39,10 +42,14 @@ class Ramses(object): # self.current_cal = {} # 当前传感器的序列号 pass - def setBuf(self, buf: bytes): + def setBuf( self, buf: bytes ): self.buf = buf pass + def setMode( self, mode = 1 ): + self.mode = mode + pass + def setCalCfg(self, d: dict): self.cal_cfg = d pass @@ -86,27 +93,31 @@ class Ramses(object): if len_ < 576: return if ip_included: + self.buf_ip = self.buf[:26] + self.ip = self.decode_ip_buf(self.buf_ip, self.cal_cfg) self.buf = self.buf[26:] len_ = len_ - 26 if len_ % 576 != 0: return - for i in range(int(len_/576)): - res.update({i+1: {}}) + for i in range( int(len_/576) ): + res.update( {i+1: {}} ) temp_buf = self.buf[7:71] + self.buf[79:143] + \ self.buf[151:215] + self.buf[223:287] + \ self.buf[295:359] + self.buf[367:431] + \ self.buf[439:503] + self.buf[511:575] self.ConvertAndCalibrate( temp_buf ) # print(len(temp_buf)) - temp = self.__ConvertBytesToInt(temp_buf) - res.update( { i+1: temp } ) + # temp = self.__ConvertBytesToInt(temp_buf) + # res.update( { i+1: temp } ) # print(res) pass def ConvertAndCalibrate(self,) -> None: + '''单个成功数据转化 标定''' log.debug(f" ConvertAndCalibrate ", __name__) temp = self.__ConvertBytesToInt( ) + # print( f"int : {self.it} {temp}" ) self.__CalibrateSpectrumData( ) pass @@ -114,30 +125,40 @@ class Ramses(object): def __ConvertBytesToInt(self ) -> None: res = {} d = [] # List [ Tuple[ it:int, sing_set:tuple[int] ] ] - self.it = 2 << int(self.buf[1]) # integrated time + print(f" ================= ") + print(f" {self.buf.hex()} ") + # self.it = 2 << int(self.buf[1]) # integrated time + self.it = 2 << int(self.buf[0]) # integrated time self.light_int = struct.unpack( " bytes: + '''去除遮罩 0x64 0x65 0x66 0x67''' + ret = b'' + flag = False + blen = len(buf) + for i in range(blen): + if flag == False and buf[i] == 64 : + flag = True + continue + if flag == False and buf[i] != 64 : + ret = ret + buf[i].to_bytes( 1, byteorder = 'big' ) + continue + if flag == True and buf[i] == 100: + ret = ret + b'\x40' + flag = False + continue + if flag == True and buf[i] == 101: + ret = ret + b'\x23' + flag = False + continue + if flag == True and buf[i] == 102: + ret = ret + b'\x11' + flag = False + continue + if flag == True and buf[i] == 103: + ret = ret + b'\x13' + flag = False + continue + return ret + pass + + def decode_ip_buf(self, buf, ip_cal:dict): + tmpbuf = buf + if len(tmpbuf) ==26 and tmpbuf[0] == 0x13: + tmpbuf = tmpbuf[2:] + + Incl_XGain = float(ip_cal['Incl_XOffset'] ) + Incl_XOffset = float(ip_cal['Incl_XOffset'] ) + Incl_YGain = float(ip_cal['Incl_YGain'] ) + Incl_YOffset = float(ip_cal['Incl_YOffset'] ) + + Incl_KRef = float(ip_cal['Incl_KRef'] ) + Press_Sens_mV_bar_1mA = float(ip_cal['Press_Sens_mV_bar_1mA'] ) + Incl_KBG = float(ip_cal['Incl_KBG'] ) + Press_Sens_mV_bar_4mA = float(ip_cal['Press_Sens_mV_bar_4mA'] ) + Press_Gain = float(ip_cal['Press_Gain'] ) + Press_Surface_bar = float(ip_cal['Press_Surface_bar'] ) - # def read_bin(self,fpath: Path): - # log.debug(f" readbin: ", __name__, "", "" ) - # ret = None - # if not fpath.exists() : - # log.info(f"not find file: {fpath} ") - # return ret - # with open(fpath, 'rb') as file: - # ret = file.read() - # return ret - # log.debug(f" readbin: {ret} ", __name__, "", "" ) - # return ret - # pass + + ip_info = struct.unpack("