Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions sensors/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,4 +33,5 @@ def _try_add_sensor(import_name: str, class_name: str) -> None:
_try_add_sensor("tcs3472", "TCS3472")
_try_add_sensor("tcs3430", "TCS3430")
_try_add_sensor("opt4048", "OPT4048")
_try_add_sensor("opt4060", "OPT4060")
Comment thread
lincoltd7 marked this conversation as resolved.
_try_add_sensor("ina226", "INA226")
62 changes: 26 additions & 36 deletions sensors/opt4048.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,47 +106,37 @@ def __init__(self):
super().__init__()
self._overload = False

# ── Low-level I2C (16-bit big-endian registers) ──────────────────────────

def _read_reg16(self, reg: int) -> int:
"""Read a 16-bit big-endian register."""
return self._read_u16_be(reg)

def _write_reg16(self, reg: int, value: int):
"""Write a 16-bit big-endian register."""
self._write_u16_be(reg, value)

# ── Configuration helpers (public API) ───────────────────────────────────

def set_range(self, rng: int):
"""Set the measurement range (use RANGE_* constants or RANGE_AUTO)."""
cfg = self._read_reg16(_REG_CONFIG)
cfg = self._read_u16_be(_REG_CONFIG)
cfg = (cfg & ~(0x0F << 10)) | ((rng & 0x0F) << 10)
self._write_reg16(_REG_CONFIG, cfg)
self._write_u16_be(_REG_CONFIG, cfg)

def get_range(self) -> int:
"""Return the current range setting."""
return (self._read_reg16(_REG_CONFIG) >> 10) & 0x0F
return (self._read_u16_be(_REG_CONFIG) >> 10) & 0x0F

def set_conversion_time(self, ct: int):
"""Set the per-channel conversion time (use CONV_* constants)."""
cfg = self._read_reg16(_REG_CONFIG)
cfg = self._read_u16_be(_REG_CONFIG)
cfg = (cfg & ~(0x0F << 6)) | ((ct & 0x0F) << 6)
self._write_reg16(_REG_CONFIG, cfg)
self._write_u16_be(_REG_CONFIG, cfg)

def get_conversion_time(self) -> int:
"""Return the current conversion-time setting."""
return (self._read_reg16(_REG_CONFIG) >> 6) & 0x0F
return (self._read_u16_be(_REG_CONFIG) >> 6) & 0x0F

def set_mode(self, mode: int):
"""Set the operating mode (use MODE_* constants)."""
cfg = self._read_reg16(_REG_CONFIG)
cfg = self._read_u16_be(_REG_CONFIG)
cfg = (cfg & ~(0x03 << 4)) | ((mode & 0x03) << 4)
self._write_reg16(_REG_CONFIG, cfg)
self._write_u16_be(_REG_CONFIG, cfg)

def get_mode(self) -> int:
"""Return the current operating mode."""
return (self._read_reg16(_REG_CONFIG) >> 4) & 0x03
return (self._read_u16_be(_REG_CONFIG) >> 4) & 0x03

def set_interrupt_enabled(self, enabled: bool):
"""Enable or disable conversion-ready interrupt on the INT pin.
Expand All @@ -155,33 +145,33 @@ def set_interrupt_enabled(self, enabled: bool):
converted, allowing the host to poll the status register rather than
busy-waiting.
"""
tcfg = self._read_reg16(_REG_THRESH_CFG)
tcfg = self._read_u16_be(_REG_THRESH_CFG)
if enabled:
tcfg = (tcfg & ~(0x03 << 2)) | (_INT_CFG_ALL_READY << 2)
else:
tcfg = (tcfg & ~(0x03 << 2)) | (_INT_CFG_DISABLED << 2)
self._write_reg16(_REG_THRESH_CFG, tcfg)
self._write_u16_be(_REG_THRESH_CFG, tcfg)

def get_interrupt_enabled(self) -> bool:
"""Return True if the conversion-ready interrupt is enabled."""
return ((self._read_reg16(_REG_THRESH_CFG) >> 2) & 0x03) == _INT_CFG_ALL_READY
return ((self._read_u16_be(_REG_THRESH_CFG) >> 2) & 0x03) == _INT_CFG_ALL_READY


def set_latched_interrupt(self, enabled: bool, threshold_ch: int = 3, threshold_low: int = 0x0000, threshold_high: int = 0xFFFF):
"""Enable or disable threshold Latched interrupt."""
if enabled:
# Setup Threshold
self._write_reg16(_REG_THRESH_LO, threshold_low) # low threshold
self._write_reg16(_REG_THRESH_HI, threshold_high) # high threshold
self._write_u16_be(_REG_THRESH_LO, threshold_low) # low threshold
self._write_u16_be(_REG_THRESH_HI, threshold_high) # high threshold

cfg = self._read_reg16(_REG_CONFIG)
cfg = self._read_u16_be(_REG_CONFIG)
if enabled:
cfg |= 1 << 3 # INT_LATCH = 1 (latch interrupt until cleared by reading status)
else:
cfg &= ~(1 << 3) # INT_LATCH = 0 (non-latched interrupt)
self._write_reg16(_REG_CONFIG, cfg)
self._write_u16_be(_REG_CONFIG, cfg)

tcfg = self._read_reg16(_REG_THRESH_CFG)
tcfg = self._read_u16_be(_REG_THRESH_CFG)
if enabled:
tcfg = (tcfg & 0x8001) | (threshold_ch << 5) | (_INT_DIR_OUTPUT << 4) | (_INT_CFG_DISABLED << 2)
# 15-7: 0x80
Expand All @@ -191,13 +181,13 @@ def set_latched_interrupt(self, enabled: bool, threshold_ch: int = 3, threshold_
else:
# Make Int Pin an input
tcfg = tcfg & ~(1 << 4)
self._write_reg16(_REG_THRESH_CFG, tcfg)
self._write_u16_be(_REG_THRESH_CFG, tcfg)


# ── SensorBase interface ─────────────────────────────────────────────────

def _init(self) -> bool:
dev_id = self._read_reg16(_REG_DEVICE_ID)
dev_id = self._read_u16_be(_REG_DEVICE_ID)
if dev_id != _DEVICE_ID_EXPECT:
print(f"S:OPT4048 ID 0x{dev_id:04X} (expected 0x{_DEVICE_ID_EXPECT:04X}) - rejecting")
return False
Expand All @@ -210,23 +200,23 @@ def _init(self) -> bool:
# INT polarity: active-low (bit 2 = 0)
# Fault count : 1 (bits 1:0 = 0)
cfg = (RANGE_AUTO << 10) | (CONV_1_8MS << 6) | (MODE_CONTINUOUS << 4) | 0x08
self._write_reg16(_REG_CONFIG, cfg)
self._write_u16_be(_REG_CONFIG, cfg)

# Enable conversion-ready interrupt so status polling works
#self.set_interrupt_enabled(True)
# The conversion ready interrupt is only 1us in duration which is too short for the LS pin to
# reliably capture, so we enable latching mode and poll the status register for the ready flag instead.
self.set_latched_interrupt(True, threshold_low = 0x8400, threshold_high = 0x8400)

#r = self._read_reg16(_REG_THRESH_LO)
#r = self._read_u16_be(_REG_THRESH_LO)
#print(f"thresh[8]: 0x{r:04X}")
#r = self._read_reg16(_REG_THRESH_HI)
#r = self._read_u16_be(_REG_THRESH_HI)
#print(f"thresh[9]: 0x{r:04X}")
#r = self._read_reg16(_REG_CONFIG)
#r = self._read_u16_be(_REG_CONFIG)
#print(f"config[A]: 0x{r:04X}")
#r = self._read_reg16(_REG_THRESH_CFG)
#r = self._read_u16_be(_REG_THRESH_CFG)
#print(f"thresh[B]: 0x{r:04X}")
#r = self._read_reg16(_REG_STATUS)
#r = self._read_u16_be(_REG_STATUS)
#print(f"status[C]: 0x{r:04X}")

return True
Expand All @@ -235,7 +225,7 @@ def _measure(self) -> dict:
# Poll status for conversion-ready; timeout after 30 ms
deadline = time.ticks_add(time.ticks_ms(), self.READ_INTERVAL_MS)
while True:
st = self._read_reg16(_REG_STATUS)
st = self._read_u16_be(_REG_STATUS)
print(f"OPT4048 status: 0x{st:04X}")
Comment thread
lincoltd7 marked this conversation as resolved.
if st & _FLAG_READY:
self._overload = bool(st & _FLAG_OVERLOAD)
Expand Down
Loading
Loading