From 17b94097d6aab329ba981ec042751153507f09d8 Mon Sep 17 00:00:00 2001 From: Edward Betts Date: Fri, 12 Jun 2026 16:08:41 +0100 Subject: [PATCH 1/4] Fix POSIX ioctls on big-endian systems --- serialx/platforms/serial_posix.py | 22 +++--- tests/test_serial_posix.py | 117 ++++++++++++++++++++++++++++++ 2 files changed, 129 insertions(+), 10 deletions(-) create mode 100644 tests/test_serial_posix.py diff --git a/serialx/platforms/serial_posix.py b/serialx/platforms/serial_posix.py index 6931942..06e7945 100644 --- a/serialx/platforms/serial_posix.py +++ b/serialx/platforms/serial_posix.py @@ -3,6 +3,7 @@ from __future__ import annotations import asyncio +import array import errno import fcntl import logging @@ -320,8 +321,7 @@ def _get_modem_pins(self) -> ModemPins: """Get current modem control bits.""" assert self._fileno is not None - # A `bytearray` is critical here: `bytes` will not be mutated - buffer = bytearray((0x00000000).to_bytes(4, "little")) + buffer = array.array("i", [0x00000000]) try: fcntl.ioctl(self._fileno, termios.TIOCMGET, buffer) @@ -330,7 +330,7 @@ def _get_modem_pins(self) -> ModemPins: LOGGER.debug("Device is not a serial port, cannot get modem pins") return ModemPins() - n = int.from_bytes(buffer, "little") + n = buffer[0] return ModemPins( **{ name: PinState.HIGH if n & bit else PinState.LOW @@ -353,7 +353,9 @@ def _set_modem_pins(self, modem_pins: ModemPins) -> None: if all_pins_set: value = modem_pins_as_int(modem_pins) LOGGER.debug("Setting all with TIOCMSET: 0x%08X", value) - fcntl.ioctl(self._fileno, termios.TIOCMSET, value.to_bytes(4, "little")) + fcntl.ioctl( + self._fileno, termios.TIOCMSET, array.array("i", [value]) + ) else: to_set = modem_pins_mask_of_value(modem_pins, PinState.HIGH) to_clear = modem_pins_mask_of_value(modem_pins, PinState.LOW) @@ -361,13 +363,13 @@ def _set_modem_pins(self, modem_pins: ModemPins) -> None: if to_set: LOGGER.debug("Setting TIOCMBIS: 0x%08X", to_set) fcntl.ioctl( - self._fileno, termios.TIOCMBIS, to_set.to_bytes(4, "little") + self._fileno, termios.TIOCMBIS, array.array("i", [to_set]) ) if to_clear: LOGGER.debug("TIOCMBIC: 0x%08X", to_clear) fcntl.ioctl( - self._fileno, termios.TIOCMBIC, to_clear.to_bytes(4, "little") + self._fileno, termios.TIOCMBIC, array.array("i", [to_clear]) ) except OSError as exc: if exc.errno == errno.ENOTTY: @@ -461,20 +463,20 @@ def _write(self, data: Buffer, *, timeout: float | None) -> int: def num_unread_bytes(self) -> int: """Return the number of bytes waiting to be read.""" assert self._fileno is not None - buffer = bytearray((0x00000000).to_bytes(4, "little")) + buffer = array.array("i", [0x00000000]) fcntl.ioctl(self._fileno, termios.FIONREAD, buffer) - return int.from_bytes(buffer, "little") + return buffer[0] def num_unwritten_bytes(self) -> int: """Return the number of bytes waiting to be written.""" assert self._fileno is not None - buffer = bytearray((0x00000000).to_bytes(4, "little")) + buffer = array.array("i", [0x00000000]) fcntl.ioctl(self._fileno, termios.TIOCOUTQ, buffer) - return int.from_bytes(buffer, "little") + return buffer[0] def _reset_read_buffer(self) -> None: """Reset the read buffer.""" diff --git a/tests/test_serial_posix.py b/tests/test_serial_posix.py new file mode 100644 index 0000000..22b5c9d --- /dev/null +++ b/tests/test_serial_posix.py @@ -0,0 +1,117 @@ +"""POSIX serial port tests.""" + +import sys + +import pytest + +if sys.platform == "win32": + pytest.skip("POSIX-only tests", allow_module_level=True) + +import array +import errno +import termios +from typing import Any +from unittest.mock import patch + +from serialx import ModemPins, PinState +from serialx.platforms.serial_posix import PosixSerial + + +def test_num_unread_bytes_uses_native_int_ioctl_buffer() -> None: + """FIONREAD writes a native C int, not a little-endian byte string.""" + + def ioctl(fd: int, request: int, arg: Any = 0, mutate_flag: bool = True) -> int: + assert request == termios.FIONREAD + assert isinstance(arg, array.array) + assert arg.typecode == "i" + arg[0] = 123 + return 0 + + serial = PosixSerial(fileno=1) + with patch("serialx.platforms.serial_posix.fcntl.ioctl", side_effect=ioctl): + assert serial.num_unread_bytes() == 123 + + +def test_num_unwritten_bytes_uses_native_int_ioctl_buffer() -> None: + """TIOCOUTQ writes a native C int, not a little-endian byte string.""" + + def ioctl(fd: int, request: int, arg: Any = 0, mutate_flag: bool = True) -> int: + assert request == termios.TIOCOUTQ + assert isinstance(arg, array.array) + assert arg.typecode == "i" + arg[0] = 456 + return 0 + + serial = PosixSerial(fileno=1) + with patch("serialx.platforms.serial_posix.fcntl.ioctl", side_effect=ioctl): + assert serial.num_unwritten_bytes() == 456 + + +def test_get_modem_pins_uses_native_int_ioctl_buffer() -> None: + """TIOCMGET writes a native C int, not a little-endian byte string.""" + + def ioctl(fd: int, request: int, arg: Any = 0, mutate_flag: bool = True) -> int: + assert request == termios.TIOCMGET + assert isinstance(arg, array.array) + assert arg.typecode == "i" + arg[0] = termios.TIOCM_DTR + return 0 + + serial = PosixSerial(fileno=1) + with patch("serialx.platforms.serial_posix.fcntl.ioctl", side_effect=ioctl): + pins = serial.get_modem_pins() + + assert pins.dtr is PinState.HIGH + assert pins.rts is PinState.LOW + + +def test_set_modem_pins_uses_native_int_ioctl_buffer() -> None: + """TIOCMSET reads a native C int, not a little-endian byte string.""" + + def ioctl(fd: int, request: int, arg: Any = 0, mutate_flag: bool = True) -> int: + assert request == termios.TIOCMSET + assert isinstance(arg, array.array) + assert arg.typecode == "i" + assert arg[0] & termios.TIOCM_DTR + assert arg[0] & termios.TIOCM_RTS + return 0 + + serial = PosixSerial(fileno=1) + with patch("serialx.platforms.serial_posix.fcntl.ioctl", side_effect=ioctl): + serial._set_modem_pins( + ModemPins( + le=PinState.LOW, + dtr=PinState.HIGH, + rts=PinState.HIGH, + st=PinState.LOW, + sr=PinState.LOW, + cts=PinState.LOW, + car=PinState.LOW, + rng=PinState.LOW, + dsr=PinState.LOW, + ) + ) + + +def test_partial_set_modem_pins_uses_native_int_ioctl_buffer() -> None: + """TIOCMBIS/TIOCMBIC read native C ints, not little-endian byte strings.""" + + seen_requests = [] + + def ioctl(fd: int, request: int, arg: Any = 0, mutate_flag: bool = True) -> int: + assert isinstance(arg, array.array) + assert arg.typecode == "i" + seen_requests.append(request) + if request == termios.TIOCMBIS: + assert arg[0] == termios.TIOCM_DTR + elif request == termios.TIOCMBIC: + assert arg[0] == termios.TIOCM_RTS + else: + raise OSError(errno.ENOTTY, "unexpected ioctl") + return 0 + + serial = PosixSerial(fileno=1) + with patch("serialx.platforms.serial_posix.fcntl.ioctl", side_effect=ioctl): + serial.set_modem_pins(dtr=True, rts=False) + + assert seen_requests == [termios.TIOCMBIS, termios.TIOCMBIC] From 40fc4b120f3cfd60d64289430b50c89e66b43eb4 Mon Sep 17 00:00:00 2001 From: Edward Betts Date: Fri, 12 Jun 2026 16:30:35 +0100 Subject: [PATCH 2/4] Update tests/test_serial_posix.py Co-authored-by: puddly <32534428+puddly@users.noreply.github.com> --- tests/test_serial_posix.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_serial_posix.py b/tests/test_serial_posix.py index 22b5c9d..193a832 100644 --- a/tests/test_serial_posix.py +++ b/tests/test_serial_posix.py @@ -78,7 +78,7 @@ def ioctl(fd: int, request: int, arg: Any = 0, mutate_flag: bool = True) -> int: serial = PosixSerial(fileno=1) with patch("serialx.platforms.serial_posix.fcntl.ioctl", side_effect=ioctl): - serial._set_modem_pins( + serial.set_modem_pins( ModemPins( le=PinState.LOW, dtr=PinState.HIGH, From 1c0753527905dc30e81a64f362ed83b0e54c2b35 Mon Sep 17 00:00:00 2001 From: Edward Betts Date: Fri, 12 Jun 2026 16:30:42 +0100 Subject: [PATCH 3/4] Update tests/test_serial_posix.py Co-authored-by: puddly <32534428+puddly@users.noreply.github.com> --- tests/test_serial_posix.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_serial_posix.py b/tests/test_serial_posix.py index 193a832..4f6df57 100644 --- a/tests/test_serial_posix.py +++ b/tests/test_serial_posix.py @@ -4,7 +4,7 @@ import pytest -if sys.platform == "win32": +if sys.platform in ("win32", "emscripten"): pytest.skip("POSIX-only tests", allow_module_level=True) import array From 92743358cbc36bee65aa47b8bd487fc8b4c83064 Mon Sep 17 00:00:00 2001 From: Edward Betts Date: Fri, 12 Jun 2026 16:44:39 +0100 Subject: [PATCH 4/4] Fix code formatting. --- serialx/platforms/serial_posix.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/serialx/platforms/serial_posix.py b/serialx/platforms/serial_posix.py index 06e7945..95ba2a8 100644 --- a/serialx/platforms/serial_posix.py +++ b/serialx/platforms/serial_posix.py @@ -2,8 +2,8 @@ from __future__ import annotations -import asyncio import array +import asyncio import errno import fcntl import logging @@ -353,9 +353,7 @@ def _set_modem_pins(self, modem_pins: ModemPins) -> None: if all_pins_set: value = modem_pins_as_int(modem_pins) LOGGER.debug("Setting all with TIOCMSET: 0x%08X", value) - fcntl.ioctl( - self._fileno, termios.TIOCMSET, array.array("i", [value]) - ) + fcntl.ioctl(self._fileno, termios.TIOCMSET, array.array("i", [value])) else: to_set = modem_pins_mask_of_value(modem_pins, PinState.HIGH) to_clear = modem_pins_mask_of_value(modem_pins, PinState.LOW)