From 5cd8aac139a941f80d919791d2cd931a2a52d615 Mon Sep 17 00:00:00 2001 From: Chris Kuethe Date: Tue, 29 Apr 2025 11:52:16 -0700 Subject: [PATCH 1/3] Fix batch_read_vsfrs The read request should be encoded as int(n)+list[int(vsfr]); telling the device first how many VSFRs you wish to read, followed by their IDs. Incorrect encoding will crash your device. Then, depending on which VSFRs you read, you might want to decode them as something other than ints. By way of example, you could retrieve the calibration coefficients by hand with ``` resp = rc103.batch_read_vsfrs( [VSFR.CHN_TO_keV_A0, VSFR.CHN_TO_keV_A1, VSFR.CHN_TO_keV_A2], "<4x3f") ``` Inspecting the result from `execute()`, I get back the following 16 byte response: '07000000f5dbc4c084f51f40bc5ff339'. Without providing a format that would unhelpfully decode to [7, 3234126837, 1075836292, 972251068] By providing the format string `<4x3f`, that decodes to (-6.15185022354126, 2.4993600845336914, 0.00046419899445027113) which are in fact the calibration coefficients for my device. I get the same values from the `energy_calib()` function, as well as from querying the VSFRs individually. --- radiacode/radiacode.py | 21 ++++++++++++++++++--- 1 file changed, 18 insertions(+), 3 deletions(-) diff --git a/radiacode/radiacode.py b/radiacode/radiacode.py index d9bd5ab..3e0e224 100644 --- a/radiacode/radiacode.py +++ b/radiacode/radiacode.py @@ -138,10 +138,25 @@ def write_request(self, command_id: int | VSFR, data: Optional[bytes] = None) -> assert retcode == 1 assert r.size() == 0 - def batch_read_vsfrs(self, vsfr_ids: list[VSFR]) -> list[int]: + def batch_read_vsfrs(self, vsfr_ids: list[VSFR], unpack_format=None) -> list[int]: + """Read multiple VSFRs + + Args: + vsfr_ids: a list of VSFRs to fetch + unpack_format: a `struct` format string used to unpack the response + into certain data types, eg. '<4x3f` to skip the first 4 bytes, + and interpret the remaining bytes into 3 floats. + + If not given, the response is simply decoded into a list of ints. + """ assert len(vsfr_ids) - r = self.execute(COMMAND.RD_VIRT_SFR_BATCH, b''.join(struct.pack(' Date: Tue, 29 Apr 2025 17:40:21 -0700 Subject: [PATCH 2/3] Getting/Setting alarm limits --- radiacode/radiacode.py | 133 +++++++++++++++++++++++++++++++++++++++++ radiacode/types.py | 26 ++++++++ 2 files changed, 159 insertions(+) diff --git a/radiacode/radiacode.py b/radiacode/radiacode.py index 3e0e224..cf65406 100644 --- a/radiacode/radiacode.py +++ b/radiacode/radiacode.py @@ -20,6 +20,7 @@ CTRL, VS, VSFR, + AlarmLimits, DisplayDirection, DoseRateDB, Event, @@ -398,3 +399,135 @@ def set_vibro_ctrl(self, ctrls: list[CTRL]) -> None: assert c != CTRL.CLICKS, 'CTRL.CLICKS not supported for vibro' flags |= int(c) self.write_request(VSFR.VIBRO_CTRL, struct.pack(' AlarmLimits: + "Retrieve the alarm limits" + regs = [ + VSFR.CR_LEV1_cp10s, + VSFR.CR_LEV2_cp10s, + VSFR.DR_LEV1_uR_h, + VSFR.DR_LEV2_uR_h, + VSFR.DS_LEV1_uR, + VSFR.DS_LEV2_uR, + VSFR.DS_UNITS, + VSFR.CR_UNITS, + ] + expected_valid = (1 << len(regs)) - 1 + + resp = self.batch_read_vsfrs(regs) + + assert resp[0] == expected_valid + + dose_multiplier = 100 if resp[7] else 1 + count_multiplier = 60 if resp[8] else 1 + return AlarmLimits( + l1_count_rate=resp[1] / 10 * count_multiplier, + l2_count_rate=resp[2] / 10 * count_multiplier, + l1_dose_rate=resp[3] / dose_multiplier, + l2_dose_rate=resp[4] / dose_multiplier, + l1_dose=resp[5] / 1e6 / dose_multiplier, + l2_dose=resp[6] / 1e6 / dose_multiplier, + dose_unit='Sv' if resp[7] else 'R', + count_unit='cpm' if resp[8] else 'cps', + ) + + def set_alarm_limits( + self, + l1_count_rate: int | float | None = None, + l2_count_rate: int | float | None = None, + l1_dose_rate: int | float | None = None, + l2_dose_rate: int | float | None = None, + l1_dose: int | float | None = None, + l2_dose: int | float | None = None, + dose_unit_sv: bool | None = None, + count_unit_cpm: bool | None = None, + ) -> bool: + """Set alarm limits - returns True if the specified limits were set + + Args: + l1_count_rate: count rate at which to raise a level 1 alarm + l2_count_rate: count rate at which to raise a level 2 alarm + l1_dose_rate: dose rate (micro-unit/hr) at which to raise a level 1 alarm + l2_dose_rate: dose rate (micro-unit/hr) at which to raise a level 2 alarm + l1_dose: accumulated dose (micro-unit) at which to raise a level 1 alarm + l2_dose: accumulated dose (micro-unit) at which to raise a level 2 alarm + dose_unit_sv = specify the dose in Sievert rather than Roentgen + count_unit_cpm = set device count rate reporting to cpm rather than cps + + Internally, the device stores count rate in counts/10s and dose in uR. It + appears that the device uses a fixed 100Sv/R converstion. + + If count_unit_cpm is not specified, the count rate register(s) will be set to + the specified values without any conversion. If it is specified, count rate + will be scaled, and the display units register will also be set. + + If dose_unit_sv is not specified the dose argument is assumed to be in uR, + and the dose alarm register will be set. If dose_unit_sv is true, the dose + argument will be assumed to be in uSv, will be converted to uR and stored, + and the display unit will be set to Sv. If dose_unit_sv is False, the dose + argument will be assumed to be in uR, will be stored as such, and the display + unit will be set to Sv. + """ + + which_limits = [] + limit_values = [] + + dose_multiplier = 100 if dose_unit_sv is True else 1 + if isinstance(count_unit_cpm, bool): + count_multiplier = 1 / 6 if count_unit_cpm else 10 + else: + count_multiplier = 1 + + if isinstance(l1_count_rate, (int, float)): + if l1_count_rate < 0: + raise ValueError('bad l1_count_rate') + which_limits.append(VSFR.CR_LEV1_cp10s) + limit_values.append(round(l1_count_rate * count_multiplier)) + + if isinstance(l2_count_rate, (int, float)): + if l2_count_rate < 0: + raise ValueError('bad l2_count_rate') + which_limits.append(VSFR.CR_LEV2_cp10s) + limit_values.append(round(l2_count_rate * count_multiplier)) + + if isinstance(l1_dose_rate, (int, float)): + if l1_dose_rate < 0: + raise ValueError('bad l1_dose_rate') + which_limits.append(VSFR.DR_LEV1_uR_h) + limit_values.append(round(l1_dose_rate * dose_multiplier)) + + if isinstance(l2_dose_rate, (int, float)): + if l2_dose_rate < 0: + raise ValueError('bad l2_dose_rate') + which_limits.append(VSFR.DR_LEV2_uR_h) + limit_values.append(round(l2_dose_rate * dose_multiplier)) + + if isinstance(l1_dose, (int, float)): + if l1_dose < 0: + raise ValueError('bad l1_dose') + which_limits.append(VSFR.DS_LEV1_uR) + limit_values.append(round(l1_dose * dose_multiplier)) + + if isinstance(l2_dose, (int, float)): + if l2_dose < 0: + raise ValueError('bad l2_dose') + which_limits.append(VSFR.DS_LEV2_uR) + limit_values.append(round(l2_dose * dose_multiplier)) + + if isinstance(dose_unit_sv, bool): + which_limits.append(VSFR.DS_UNITS) + limit_values.append(int(dose_unit_sv)) + + if isinstance(count_unit_cpm, bool): + which_limits.append(VSFR.CR_UNITS) + limit_values.append(int(count_unit_cpm)) + + num_to_set = len(which_limits) + if not num_to_set: + raise ValueError('No limits specified') + + pack_items = [num_to_set] + [int(x) for x in which_limits] + limit_values + pack_format = f' Date: Thu, 1 May 2025 16:07:13 -0700 Subject: [PATCH 3/3] more consistency checks and type enforcement in batch_read_vsfrs Callers of batch_read_vsfrs should know the data type of each requested VSFR, and should therefore be able to specify a correct format string. Using an inappropriate format string will raise an error. Require that vsfr_ids is a list of VSFR dataclasses. This prevents the use of arbitrary integers as a VSFR to read. Using other data types will raise an error. Add a check for valid reads. The first item returned by the device is a bitfield indicating which VSFRs were read successfully. Use this to check that batch read operation was completely successful. As only the defined VSFRs are permitted, and those were retrieved from the device, there should be no reason for any read to fail as it might by retrieving an arbitrary location (eg. 0xABAD1DEA). If such a failure is detected, an execption is raised. --- radiacode/radiacode.py | 64 +++++++++++++++++++++++++----------------- 1 file changed, 39 insertions(+), 25 deletions(-) diff --git a/radiacode/radiacode.py b/radiacode/radiacode.py index cf65406..b534061 100644 --- a/radiacode/radiacode.py +++ b/radiacode/radiacode.py @@ -139,25 +139,42 @@ def write_request(self, command_id: int | VSFR, data: Optional[bytes] = None) -> assert retcode == 1 assert r.size() == 0 - def batch_read_vsfrs(self, vsfr_ids: list[VSFR], unpack_format=None) -> list[int]: + def batch_read_vsfrs(self, vsfr_ids: list[VSFR], unpack_format: str) -> list[int | float]: """Read multiple VSFRs Args: vsfr_ids: a list of VSFRs to fetch - unpack_format: a `struct` format string used to unpack the response - into certain data types, eg. '<4x3f` to skip the first 4 bytes, - and interpret the remaining bytes into 3 floats. + unpack_format: a `struct` format string used to unpack the response. - If not given, the response is simply decoded into a list of ints. + Byte order and word length indicators in unpack_format may be omitted + and will be removed if given, as the device uses standard size little + endian format. + + Repeat count is not supported (use "ffff" instead of "4f") as the length + of the unpack_format string must equal the number of VSFRs being fetched. """ - assert len(vsfr_ids) - msg = [struct.pack('!') + if not (isinstance(unpack_format, str) and len(unpack_format) == nvsfr): + raise ValueError(f'invalid unpack_format `{unpack_format}`') + + msg = [struct.pack(' AlarmLimits: VSFR.DS_UNITS, VSFR.CR_UNITS, ] - expected_valid = (1 << len(regs)) - 1 - - resp = self.batch_read_vsfrs(regs) - assert resp[0] == expected_valid + resp = self.batch_read_vsfrs(regs, 'I' * len(regs)) - dose_multiplier = 100 if resp[7] else 1 - count_multiplier = 60 if resp[8] else 1 + dose_multiplier = 100 if resp[6] else 1 + count_multiplier = 60 if resp[7] else 1 return AlarmLimits( - l1_count_rate=resp[1] / 10 * count_multiplier, - l2_count_rate=resp[2] / 10 * count_multiplier, - l1_dose_rate=resp[3] / dose_multiplier, - l2_dose_rate=resp[4] / dose_multiplier, - l1_dose=resp[5] / 1e6 / dose_multiplier, - l2_dose=resp[6] / 1e6 / dose_multiplier, - dose_unit='Sv' if resp[7] else 'R', - count_unit='cpm' if resp[8] else 'cps', + l1_count_rate=resp[0] / 10 * count_multiplier, + l2_count_rate=resp[1] / 10 * count_multiplier, + l1_dose_rate=resp[2] / dose_multiplier, + l2_dose_rate=resp[3] / dose_multiplier, + l1_dose=resp[4] / 1e6 / dose_multiplier, + l2_dose=resp[5] / 1e6 / dose_multiplier, + dose_unit='Sv' if resp[6] else 'R', + count_unit='cpm' if resp[7] else 'cps', ) def set_alarm_limits(