Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 60 additions & 14 deletions src/smpclient/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,12 @@
from typing import Final, TypeVar

from pydantic import ValidationError
from pydantic_core import ErrorDetails
from smp import header as smpheader
from smp import message as smpmsg
from typing_extensions import assert_never

from smpclient.exceptions import SMPBadSequence, SMPUploadError
from smpclient.exceptions import SMPBadSequence, SMPUploadError, SMPValidationException
from smpclient.generics import SMPRequest, TEr1, TEr2, TRep, error, success
from smpclient.requests.file_management import FileDownload, FileUpload
from smpclient.requests.image_management import ImageUploadWrite
Expand All @@ -66,6 +67,51 @@
"""A single-shot upload request whose `data` field is filled to maximize throughput."""


def _hexdump(frame: bytes) -> str:
"""Format `frame` as an offset/hex/printable-ASCII dump for readable debug logging."""

def row(offset: int) -> str:
chunk: Final = frame[offset : offset + 16]
columns: Final = " ".join(f"{byte:02x}" for byte in chunk)
printable: Final = "".join(chr(byte) if 0x20 <= byte <= 0x7E else "." for byte in chunk)
return f"\t{offset:04x} {columns:<47} {printable}"

return "\n".join(row(offset) for offset in range(0, len(frame), 16))


def _format_validation_error(error: ValidationError) -> str:
def row(detail: ErrorDetails) -> str:
location: Final = ".".join(str(part) for part in detail["loc"])
return f"\t\t[{detail['type']}] {detail['msg']}: {location}; input: {detail['input']}"

return "\n".join(row(detail) for detail in error.errors())


def _validation_failure(
header: smpheader.Header,
frame: bytes,
errors: tuple[tuple[type[smpmsg.Response], ValidationError], ...],
) -> tuple[str, str]:
"""Return the `(summary, details)` describing why `frame` matched none of `errors`' types."""
summary: Final = (
"\nFrame could not be parsed as any of:\n"
f"\t{[response.__name__ for response, _ in errors]}\n"
)
Comment on lines +96 to +99
details: Final = "\n".join(
(
f"Header:\n\t{header}",
f"Frame:\n{_hexdump(frame)}",
"Errors:",
*(
f"\tCould not be parsed as {response.__name__} because "
f"{len(error.errors())} error(s):\n{_format_validation_error(error)}"
for response, error in errors
),
)
)
return summary, details


class SMPClient:
"""Create a client to the SMP server `address`, using `transport`.

Expand Down Expand Up @@ -137,7 +183,7 @@ async def request(
Raises:
TimeoutError: if the request times out
SMPBadSequence: if the response sequence does not match the request sequence
ValidationError: if the response cannot be parsed as a Response or Error
SMPValidationException: if the response cannot be parsed as a Response or Error

Examples:
Usage:
Expand Down Expand Up @@ -194,23 +240,23 @@ async def request(
f"Bad sequence {header.sequence}, expected {request.header.sequence}"
)

errors: list[tuple[type[smpmsg.Response], ValidationError]] = []
try:
return request._Response.loads(frame) # type: ignore
except ValidationError:
pass
return request._Response.loads(frame) # type: ignore[return-value]
except ValidationError as error:
errors.append((request._Response, error))
try:
return request._ErrorV1.loads(frame)
except ValidationError:
pass
except ValidationError as error:
errors.append((request._ErrorV1, error))
try:
return request._ErrorV2.loads(frame)
except ValidationError:
error_message = (
f"Response could not by parsed as one of {request._Response}, "
f"{request._ErrorV1}, or {request._ErrorV2}. {header=} {frame=}"
)
logger.error(error_message)
raise ValidationError(error_message)
except ValidationError as error:
errors.append((request._ErrorV2, error))

summary, details = _validation_failure(header, frame, tuple(errors))
logger.error(summary + details)
raise SMPValidationException(summary, details) from None
Comment on lines +257 to +259

async def upload(
self,
Expand Down
7 changes: 7 additions & 0 deletions src/smpclient/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,10 @@ class SMPBadSequence(SMPClientException): ...


class SMPUploadError(SMPClientException): ...


class SMPValidationException(SMPClientException):
def __init__(self, msg: str, details: str) -> None:
self.msg: str = msg
self.details: str = details
super().__init__(msg)
22 changes: 21 additions & 1 deletion tests/test_smp_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
)

from smpclient import SMPClient
from smpclient.exceptions import SMPBadSequence, SMPUploadError
from smpclient.exceptions import SMPBadSequence, SMPUploadError, SMPValidationException
from smpclient.generics import error, error_v1, error_v2, success
from smpclient.requests.file_management import FileDownload, FileUpload
from smpclient.requests.image_management import ImageUploadWrite
Expand Down Expand Up @@ -165,6 +165,26 @@ async def test_request() -> None:
raise AssertionError(f"Unexpected response type: {type(rep)}")


@pytest.mark.asyncio
async def test_request_unparseable_frame() -> None:
"""A frame matching none of the Response/ErrorV1/ErrorV2 types raises with diagnostics."""
m = SMPMockTransport()
s = SMPClient(m, "address")

req = ResetWrite()
# A frame from a different group/command can be parsed as none of `ResetWrite`'s types.
m.receive.return_value = ImageUploadWriteResponse(sequence=req.header.sequence, off=0).BYTES

with pytest.raises(SMPValidationException) as exc_info:
await s.request(req)

assert req._Response.__name__ in exc_info.value.msg
assert "Header:" in exc_info.value.details
assert "Frame:" in exc_info.value.details
assert req._ErrorV1.__name__ in exc_info.value.details
assert req._ErrorV2.__name__ in exc_info.value.details


@pytest.mark.asyncio
async def test_upload() -> None:
m = SMPMockTransport()
Expand Down
Loading