Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
170 changes: 166 additions & 4 deletions radiacode/radiacode.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
CTRL,
VS,
VSFR,
AlarmLimits,
DisplayDirection,
DoseRateDB,
Event,
Expand Down Expand Up @@ -138,10 +139,42 @@ def write_request(self, command_id: int | VSFR, data: Optional[bytes] = None) ->
assert retcode == 1
assert r.size() == 0

def batch_read_vsfrs(self, vsfr_ids: list[VSFR]) -> list[int]:
assert len(vsfr_ids)
r = self.execute(COMMAND.RD_VIRT_SFR_BATCH, b''.join(struct.pack('<I', int(c)) for c in vsfr_ids))
ret = [r.unpack('<I')[0] for _ in range(len(vsfr_ids))]
def batch_read_vsfrs(self, vsfr_ids: list[VSFR], unpack_format: str) -> list[int | float]:
"""Read multiple VSFRs

Args:
vsfr_ids: a list of VSFRs to fetch
unpack_format: a `struct` format string used to unpack the response.

Byte order and word length indicators in unpack_format may be omitted
and will be removed if given, as the device uses standard size little
endian format.

Repeat count is not supported (use "ffff" instead of "4f") as the length
of the unpack_format string must equal the number of VSFRs being fetched.
"""
nvsfr = len(vsfr_ids)
if nvsfr == 0:
raise ValueError('No VSFRs specified')

if not all([isinstance(i, VSFR) for i in vsfr_ids]):
raise ValueError('vsfr_ids must be a list of VSFRs')

unpack_format = unpack_format.strip('@<=>!')
if not (isinstance(unpack_format, str) and len(unpack_format) == nvsfr):
raise ValueError(f'invalid unpack_format `{unpack_format}`')

msg = [struct.pack('<I', nvsfr)]
msg.extend([struct.pack('<I', int(c)) for c in vsfr_ids])
r = self.execute(COMMAND.RD_VIRT_SFR_BATCH, b''.join(msg))

valid_flags = r.unpack('<I')[0]
expected_flags = (1 << nvsfr) - 1
if valid_flags != expected_flags:
raise ValueError(f'Unexpected validity flags, bad vsfr_id? {valid_flags:08b} != {expected_flags:08b}')

ret = [r.unpack(f'<{unpack_format[i]}')[0] for i in range(nvsfr)]

assert r.size() == 0
return ret

Expand Down Expand Up @@ -383,3 +416,132 @@ def set_vibro_ctrl(self, ctrls: list[CTRL]) -> None:
assert c != CTRL.CLICKS, 'CTRL.CLICKS not supported for vibro'
flags |= int(c)
self.write_request(VSFR.VIBRO_CTRL, struct.pack('<I', flags))

def get_alarm_limits(self) -> AlarmLimits:
"Retrieve the alarm limits"
regs = [
VSFR.CR_LEV1_cp10s,
VSFR.CR_LEV2_cp10s,
VSFR.DR_LEV1_uR_h,
VSFR.DR_LEV2_uR_h,
VSFR.DS_LEV1_uR,
VSFR.DS_LEV2_uR,
VSFR.DS_UNITS,
VSFR.CR_UNITS,
]

resp = self.batch_read_vsfrs(regs, 'I' * len(regs))

dose_multiplier = 100 if resp[6] else 1
count_multiplier = 60 if resp[7] else 1
return AlarmLimits(
l1_count_rate=resp[0] / 10 * count_multiplier,
l2_count_rate=resp[1] / 10 * count_multiplier,
l1_dose_rate=resp[2] / dose_multiplier,
l2_dose_rate=resp[3] / dose_multiplier,
l1_dose=resp[4] / 1e6 / dose_multiplier,
l2_dose=resp[5] / 1e6 / dose_multiplier,
dose_unit='Sv' if resp[6] else 'R',
count_unit='cpm' if resp[7] else 'cps',
)

def set_alarm_limits(
self,
l1_count_rate: int | float | None = None,
l2_count_rate: int | float | None = None,
l1_dose_rate: int | float | None = None,
l2_dose_rate: int | float | None = None,
l1_dose: int | float | None = None,
l2_dose: int | float | None = None,
dose_unit_sv: bool | None = None,
count_unit_cpm: bool | None = None,
) -> bool:
"""Set alarm limits - returns True if the specified limits were set

Args:
l1_count_rate: count rate at which to raise a level 1 alarm
l2_count_rate: count rate at which to raise a level 2 alarm
l1_dose_rate: dose rate (micro-unit/hr) at which to raise a level 1 alarm
l2_dose_rate: dose rate (micro-unit/hr) at which to raise a level 2 alarm
l1_dose: accumulated dose (micro-unit) at which to raise a level 1 alarm
l2_dose: accumulated dose (micro-unit) at which to raise a level 2 alarm
dose_unit_sv = specify the dose in Sievert rather than Roentgen
count_unit_cpm = set device count rate reporting to cpm rather than cps

Internally, the device stores count rate in counts/10s and dose in uR. It
appears that the device uses a fixed 100Sv/R converstion.

If count_unit_cpm is not specified, the count rate register(s) will be set to
the specified values without any conversion. If it is specified, count rate
will be scaled, and the display units register will also be set.

If dose_unit_sv is not specified the dose argument is assumed to be in uR,
and the dose alarm register will be set. If dose_unit_sv is true, the dose
argument will be assumed to be in uSv, will be converted to uR and stored,
and the display unit will be set to Sv. If dose_unit_sv is False, the dose
argument will be assumed to be in uR, will be stored as such, and the display
unit will be set to Sv.
"""

which_limits = []
limit_values = []

dose_multiplier = 100 if dose_unit_sv is True else 1
if isinstance(count_unit_cpm, bool):
count_multiplier = 1 / 6 if count_unit_cpm else 10
else:
count_multiplier = 1

if isinstance(l1_count_rate, (int, float)):
if l1_count_rate < 0:
raise ValueError('bad l1_count_rate')
which_limits.append(VSFR.CR_LEV1_cp10s)
limit_values.append(round(l1_count_rate * count_multiplier))

if isinstance(l2_count_rate, (int, float)):
if l2_count_rate < 0:
raise ValueError('bad l2_count_rate')
which_limits.append(VSFR.CR_LEV2_cp10s)
limit_values.append(round(l2_count_rate * count_multiplier))

if isinstance(l1_dose_rate, (int, float)):
if l1_dose_rate < 0:
raise ValueError('bad l1_dose_rate')
which_limits.append(VSFR.DR_LEV1_uR_h)
limit_values.append(round(l1_dose_rate * dose_multiplier))

if isinstance(l2_dose_rate, (int, float)):
if l2_dose_rate < 0:
raise ValueError('bad l2_dose_rate')
which_limits.append(VSFR.DR_LEV2_uR_h)
limit_values.append(round(l2_dose_rate * dose_multiplier))

if isinstance(l1_dose, (int, float)):
if l1_dose < 0:
raise ValueError('bad l1_dose')
which_limits.append(VSFR.DS_LEV1_uR)
limit_values.append(round(l1_dose * dose_multiplier))

if isinstance(l2_dose, (int, float)):
if l2_dose < 0:
raise ValueError('bad l2_dose')
which_limits.append(VSFR.DS_LEV2_uR)
limit_values.append(round(l2_dose * dose_multiplier))

if isinstance(dose_unit_sv, bool):
which_limits.append(VSFR.DS_UNITS)
limit_values.append(int(dose_unit_sv))

if isinstance(count_unit_cpm, bool):
which_limits.append(VSFR.CR_UNITS)
limit_values.append(int(count_unit_cpm))

num_to_set = len(which_limits)
if not num_to_set:
raise ValueError('No limits specified')

pack_items = [num_to_set] + [int(x) for x in which_limits] + limit_values
pack_format = f'<I{num_to_set}I{num_to_set}I'
resp = self.execute(COMMAND.WR_VIRT_SFR_BATCH, struct.pack(pack_format, *pack_items))
expected_valid = (1 << len(which_limits)) - 1
return expected_valid == resp.unpack('<I')[0]
26 changes: 26 additions & 0 deletions radiacode/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,32 @@ class Spectrum:
counts: list[int]


@dataclass
class AlarmLimits:
"""Alarm limits

The count rate may be per second or per minute depending on device
configuration. The `count_unit` attribute indicates which.

Dose rate is in micro-units (Sv or R) per hour.

Accumulated dos is in micro-units (Sv or R).

The dose unit may be Roentgen or Sievert depending on device
configuration. The `dose_unit` attribute indicates which. There seems
to be a fixed 100Sv/R conversion within the device
"""

l1_count_rate: float
l2_count_rate: float
count_unit: str
l1_dose_rate: float
l2_dose_rate: float
l1_dose: float
l2_dose: float
dose_unit: str


class DisplayDirection(Enum):
AUTO = 0
RIGHT = 1
Expand Down