|
|
|
@ -28,9 +28,12 @@ class Ramses(object): |
|
|
|
|
@Returns : realWavelength Intensity |
|
|
|
|
""" |
|
|
|
|
self.buf = b'' |
|
|
|
|
self.mode = 0 # 默认0 空气中,1:水中 |
|
|
|
|
self.buf_ip = b'' |
|
|
|
|
self.it = None |
|
|
|
|
self.light_int = None # 未标定的整数值 |
|
|
|
|
self.spectrum = None # 光谱强度 |
|
|
|
|
self.ip = [] |
|
|
|
|
# self.current_buf = "" |
|
|
|
|
# self.current_buf_seq = 0 |
|
|
|
|
# self.current_it_int = {"it": 0, "light_int": []} # 积分时间及换算的整数值 |
|
|
|
@ -39,10 +42,14 @@ class Ramses(object): |
|
|
|
|
# self.current_cal = {} # 当前传感器的序列号 |
|
|
|
|
pass |
|
|
|
|
|
|
|
|
|
def setBuf(self, buf: bytes): |
|
|
|
|
def setBuf( self, buf: bytes ): |
|
|
|
|
self.buf = buf |
|
|
|
|
pass |
|
|
|
|
|
|
|
|
|
def setMode( self, mode = 1 ): |
|
|
|
|
self.mode = mode |
|
|
|
|
pass |
|
|
|
|
|
|
|
|
|
def setCalCfg(self, d: dict): |
|
|
|
|
self.cal_cfg = d |
|
|
|
|
pass |
|
|
|
@ -86,27 +93,31 @@ class Ramses(object): |
|
|
|
|
if len_ < 576: |
|
|
|
|
return |
|
|
|
|
if ip_included: |
|
|
|
|
self.buf_ip = self.buf[:26] |
|
|
|
|
self.ip = self.decode_ip_buf(self.buf_ip, self.cal_cfg) |
|
|
|
|
self.buf = self.buf[26:] |
|
|
|
|
len_ = len_ - 26 |
|
|
|
|
if len_ % 576 != 0: |
|
|
|
|
return |
|
|
|
|
|
|
|
|
|
for i in range(int(len_/576)): |
|
|
|
|
res.update({i+1: {}}) |
|
|
|
|
for i in range( int(len_/576) ): |
|
|
|
|
res.update( {i+1: {}} ) |
|
|
|
|
temp_buf = self.buf[7:71] + self.buf[79:143] + \ |
|
|
|
|
self.buf[151:215] + self.buf[223:287] + \ |
|
|
|
|
self.buf[295:359] + self.buf[367:431] + \ |
|
|
|
|
self.buf[439:503] + self.buf[511:575] |
|
|
|
|
self.ConvertAndCalibrate( temp_buf ) |
|
|
|
|
# print(len(temp_buf)) |
|
|
|
|
temp = self.__ConvertBytesToInt(temp_buf) |
|
|
|
|
res.update( { i+1: temp } ) |
|
|
|
|
# temp = self.__ConvertBytesToInt(temp_buf) |
|
|
|
|
# res.update( { i+1: temp } ) |
|
|
|
|
# print(res) |
|
|
|
|
pass |
|
|
|
|
|
|
|
|
|
def ConvertAndCalibrate(self,) -> None: |
|
|
|
|
'''单个成功数据转化 标定''' |
|
|
|
|
log.debug(f" ConvertAndCalibrate ", __name__) |
|
|
|
|
temp = self.__ConvertBytesToInt( ) |
|
|
|
|
# print( f"int : {self.it} {temp}" ) |
|
|
|
|
self.__CalibrateSpectrumData( ) |
|
|
|
|
pass |
|
|
|
|
|
|
|
|
@ -114,30 +125,40 @@ class Ramses(object): |
|
|
|
|
def __ConvertBytesToInt(self ) -> None: |
|
|
|
|
res = {} |
|
|
|
|
d = [] # List [ Tuple[ it:int, sing_set:tuple[int] ] ] |
|
|
|
|
self.it = 2 << int(self.buf[1]) # integrated time |
|
|
|
|
print(f" ================= ") |
|
|
|
|
print(f" {self.buf.hex()} ") |
|
|
|
|
# self.it = 2 << int(self.buf[1]) # integrated time |
|
|
|
|
self.it = 2 << int(self.buf[0]) # integrated time |
|
|
|
|
self.light_int = struct.unpack( |
|
|
|
|
"<HHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHH \ |
|
|
|
|
HHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHH \ |
|
|
|
|
HHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHH \ |
|
|
|
|
HHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHH", self.buf[2:]) |
|
|
|
|
print( self.light_int ) |
|
|
|
|
|
|
|
|
|
def __CalibrateSpectrumData(self ,) : |
|
|
|
|
t0 = 8092 |
|
|
|
|
log.debug(f" __CalibrateSpectrumData ..... ", __name__) |
|
|
|
|
|
|
|
|
|
raw = np.asarray(self.light_int, dtype=float) |
|
|
|
|
B0 = np.asarray(self.cal_cfg["b0"], dtype=float) |
|
|
|
|
B1 = np.asarray(self.cal_cfg["b1"], dtype=float) |
|
|
|
|
|
|
|
|
|
Mn = raw/65535 |
|
|
|
|
Bn = B0 + B1 * (self.it/t0) |
|
|
|
|
Bn = B0 + B1 * ( self.it/t0 ) |
|
|
|
|
Cn = Mn-Bn |
|
|
|
|
|
|
|
|
|
Offset = self.getOffset( |
|
|
|
|
Cn, int(self.cal_cfg['DarkPixelStart']), int(self.cal_cfg['DarkPixelStop'])) |
|
|
|
|
Dn = Cn-Offset |
|
|
|
|
En = Dn * (t0/self.it) |
|
|
|
|
Fn = En/np.asarray(self.cal_cfg["cal"], dtype=float) # 空气或水中的标定文件 |
|
|
|
|
Dn = Cn - Offset |
|
|
|
|
En = Dn * ( t0/self.it ) |
|
|
|
|
if self.mode == 0: |
|
|
|
|
Fn = En/np.asarray(self.cal_cfg["cal"], dtype=float) # 空气的标定文件 |
|
|
|
|
else: |
|
|
|
|
Fn = En/np.asarray(self.cal_cfg["calaq"], dtype=float) # 水中的标定文件 |
|
|
|
|
# Fn = En/np.asarray(self.cal_cfg["cal"], dtype=float) # 空气或水中的标定文件 |
|
|
|
|
self.spectrum = Fn |
|
|
|
|
# print(self.spectrum) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def getOffset(self, data: np.ndarray, start: int, stop: int): |
|
|
|
@ -145,20 +166,86 @@ class Ramses(object): |
|
|
|
|
for i in range(start-1, stop, 1): |
|
|
|
|
ret = ret + data[i] |
|
|
|
|
return ret / (stop - start + 1) |
|
|
|
|
|
|
|
|
|
def removeMask(self, buf:bytes ) -> bytes: |
|
|
|
|
'''去除遮罩 0x64 0x65 0x66 0x67''' |
|
|
|
|
ret = b'' |
|
|
|
|
flag = False |
|
|
|
|
blen = len(buf) |
|
|
|
|
for i in range(blen): |
|
|
|
|
if flag == False and buf[i] == 64 : |
|
|
|
|
flag = True |
|
|
|
|
continue |
|
|
|
|
if flag == False and buf[i] != 64 : |
|
|
|
|
ret = ret + buf[i].to_bytes( 1, byteorder = 'big' ) |
|
|
|
|
continue |
|
|
|
|
if flag == True and buf[i] == 100: |
|
|
|
|
ret = ret + b'\x40' |
|
|
|
|
flag = False |
|
|
|
|
continue |
|
|
|
|
if flag == True and buf[i] == 101: |
|
|
|
|
ret = ret + b'\x23' |
|
|
|
|
flag = False |
|
|
|
|
continue |
|
|
|
|
if flag == True and buf[i] == 102: |
|
|
|
|
ret = ret + b'\x11' |
|
|
|
|
flag = False |
|
|
|
|
continue |
|
|
|
|
if flag == True and buf[i] == 103: |
|
|
|
|
ret = ret + b'\x13' |
|
|
|
|
flag = False |
|
|
|
|
continue |
|
|
|
|
return ret |
|
|
|
|
pass |
|
|
|
|
|
|
|
|
|
# def read_bin(self,fpath: Path): |
|
|
|
|
# log.debug(f" readbin: ", __name__, "", "" ) |
|
|
|
|
# ret = None |
|
|
|
|
# if not fpath.exists() : |
|
|
|
|
# log.info(f"not find file: {fpath} ") |
|
|
|
|
# return ret |
|
|
|
|
# with open(fpath, 'rb') as file: |
|
|
|
|
# ret = file.read() |
|
|
|
|
# return ret |
|
|
|
|
# log.debug(f" readbin: {ret} ", __name__, "", "" ) |
|
|
|
|
# return ret |
|
|
|
|
# pass |
|
|
|
|
def decode_ip_buf(self, buf, ip_cal:dict): |
|
|
|
|
tmpbuf = buf |
|
|
|
|
if len(tmpbuf) ==26 and tmpbuf[0] == 0x13: |
|
|
|
|
tmpbuf = tmpbuf[2:] |
|
|
|
|
|
|
|
|
|
Incl_XGain = float(ip_cal['Incl_XOffset'] ) |
|
|
|
|
Incl_XOffset = float(ip_cal['Incl_XOffset'] ) |
|
|
|
|
Incl_YGain = float(ip_cal['Incl_YGain'] ) |
|
|
|
|
Incl_YOffset = float(ip_cal['Incl_YOffset'] ) |
|
|
|
|
|
|
|
|
|
Incl_KRef = float(ip_cal['Incl_KRef'] ) |
|
|
|
|
Press_Sens_mV_bar_1mA = float(ip_cal['Press_Sens_mV_bar_1mA'] ) |
|
|
|
|
Incl_KBG = float(ip_cal['Incl_KBG'] ) |
|
|
|
|
Press_Sens_mV_bar_4mA = float(ip_cal['Press_Sens_mV_bar_4mA'] ) |
|
|
|
|
Press_Gain = float(ip_cal['Press_Gain'] ) |
|
|
|
|
Press_Surface_bar = float(ip_cal['Press_Surface_bar'] ) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
ip_info = struct.unpack("<BBBBBBBBBBBBBBBBBBBBBBBB", buf) |
|
|
|
|
byte11 = ip_info[11] |
|
|
|
|
byte12 = ip_info[12] |
|
|
|
|
byte13 = ip_info[13] |
|
|
|
|
byte14 = ip_info[14] |
|
|
|
|
byte15 = ip_info[15] |
|
|
|
|
byte16 = ip_info[16] |
|
|
|
|
byte17 = ip_info[17] |
|
|
|
|
byte18 = ip_info[18] |
|
|
|
|
byte19 = ip_info[19] |
|
|
|
|
byte20 = ip_info[20] |
|
|
|
|
byte21 = ip_info[21] |
|
|
|
|
byte22 = ip_info[22] |
|
|
|
|
|
|
|
|
|
X = (byte11 -Incl_XOffset) / Incl_XGain # 单位 度 |
|
|
|
|
Y = (byte12 - Incl_YOffset ) / Incl_YGain # 单位 度 |
|
|
|
|
npress = byte14 *256 + byte13 |
|
|
|
|
nbg = byte18 * 256 + byte17 |
|
|
|
|
nrefh = byte20 * 256 + byte19 |
|
|
|
|
nrefl = byte22 * 256 + byte21 |
|
|
|
|
noffset = nrefl - ( Incl_KRef * (nrefh-nrefl)) |
|
|
|
|
VPress = Incl_KBG * (npress-noffset) / (nbg- noffset) #电压值 |
|
|
|
|
press_sens = Press_Sens_mV_bar_4mA |
|
|
|
|
if press_sens <= 0: |
|
|
|
|
press_sens = 4* Press_Sens_mV_bar_1mA |
|
|
|
|
p_bar = 1000 * VPress / (press_sens * Press_Gain ) |
|
|
|
|
press_delta = p_bar - 1.021 |
|
|
|
|
depth_m = press_delta * 10 |
|
|
|
|
return [depth_m,X,Y] |
|
|
|
|
pass |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if __name__ == "__main__": |
|
|
|
|