diff --git a/hathorlib/headers/nano_header.py b/hathorlib/headers/nano_header.py index d14404b..082ca54 100644 --- a/hathorlib/headers/nano_header.py +++ b/hathorlib/headers/nano_header.py @@ -22,14 +22,15 @@ from hathorlib.headers.types import VertexHeaderId from hathorlib.nanocontracts import DeprecatedNanoContract from hathorlib.nanocontracts.types import NCActionType -from hathorlib.utils import int_to_bytes, unpack, unpack_len +from hathorlib.utils import decode_unsigned, encode_unsigned, int_to_bytes, unpack, unpack_len if TYPE_CHECKING: from hathorlib.base_transaction import BaseTransaction -NC_VERSION = 1 NC_INITIALIZE_METHOD = 'initialize' ADDRESS_LEN_BYTES = 25 +ADDRESS_SEQNUM_SIZE: int = 8 # bytes +_NC_SCRIPT_LEN_MAX_BYTES: int = 2 @dataclass(frozen=True) @@ -43,6 +44,9 @@ class NanoHeaderAction: class NanoHeader(VertexBaseHeader): tx: BaseTransaction + # Sequence number for the caller. + nc_seqnum: int + # nc_id equals to the blueprint_id when a Nano Contract is being created. # nc_id equals to the nanocontract_id when a method is being called. 
nc_id: bytes @@ -59,8 +63,6 @@ class NanoHeader(VertexBaseHeader): nc_address: bytes nc_script: bytes - nc_version: int = NC_VERSION - @classmethod def _deserialize_action(cls, buf: bytes) -> tuple[NanoHeaderAction, bytes]: from hathorlib.base_transaction import bytes_to_output_value @@ -78,11 +80,9 @@ def _deserialize_action(cls, buf: bytes) -> tuple[NanoHeaderAction, bytes]: def deserialize(cls, tx: BaseTransaction, buf: bytes) -> tuple[NanoHeader, bytes]: header_id, buf = buf[:1], buf[1:] assert header_id == VertexHeaderId.NANO_HEADER.value - (nc_version,), buf = unpack('!B', buf) - if nc_version != NC_VERSION: - raise ValueError('unknown nanocontract version: {}'.format(nc_version)) nc_id, buf = unpack_len(32, buf) + nc_seqnum, buf = decode_unsigned(buf, max_bytes=ADDRESS_SEQNUM_SIZE) (nc_method_len,), buf = unpack('!B', buf) nc_method, buf = unpack_len(nc_method_len, buf) (nc_args_bytes_len,), buf = unpack('!H', buf) @@ -96,14 +96,14 @@ def deserialize(cls, tx: BaseTransaction, buf: bytes) -> tuple[NanoHeader, bytes nc_actions.append(action) nc_address, buf = unpack_len(ADDRESS_LEN_BYTES, buf) - (nc_script_len,), buf = unpack('!H', buf) + nc_script_len, buf = decode_unsigned(buf, max_bytes=_NC_SCRIPT_LEN_MAX_BYTES) nc_script, buf = unpack_len(nc_script_len, buf) decoded_nc_method = nc_method.decode('ascii') return cls( tx=tx, - nc_version=nc_version, + nc_seqnum=nc_seqnum, nc_id=nc_id, nc_method=decoded_nc_method, nc_args_bytes=nc_args_bytes, @@ -127,8 +127,8 @@ def _serialize_without_header_id(self, *, skip_signature: bool) -> deque[bytes]: encoded_method = self.nc_method.encode('ascii') ret: deque[bytes] = deque() - ret.append(int_to_bytes(NC_VERSION, 1)) ret.append(self.nc_id) + ret.append(encode_unsigned(self.nc_seqnum, max_bytes=ADDRESS_SEQNUM_SIZE)) ret.append(int_to_bytes(len(encoded_method), 1)) ret.append(encoded_method) ret.append(int_to_bytes(len(self.nc_args_bytes), 2)) @@ -141,10 +141,10 @@ def _serialize_without_header_id(self, *, skip_signature: 
bool) -> deque[bytes]: ret.append(self.nc_address) if not skip_signature: - ret.append(int_to_bytes(len(self.nc_script), 2)) + ret.append(encode_unsigned(len(self.nc_script), max_bytes=_NC_SCRIPT_LEN_MAX_BYTES)) ret.append(self.nc_script) else: - ret.append(int_to_bytes(0, 2)) + ret.append(encode_unsigned(0, max_bytes=_NC_SCRIPT_LEN_MAX_BYTES)) return ret def serialize(self) -> bytes: diff --git a/hathorlib/nanocontracts/types.py b/hathorlib/nanocontracts/types.py index 63dacb1..15341ba 100644 --- a/hathorlib/nanocontracts/types.py +++ b/hathorlib/nanocontracts/types.py @@ -25,7 +25,7 @@ class NCActionType(Enum): DEPOSIT = 1 WITHDRAWAL = 2 GRANT_AUTHORITY = 3 - INVOKE_AUTHORITY = 4 + ACQUIRE_AUTHORITY = 4 def __str__(self) -> str: return self.name.lower() diff --git a/hathorlib/serialization/__init__.py b/hathorlib/serialization/__init__.py new file mode 100644 index 0000000..65e1626 --- /dev/null +++ b/hathorlib/serialization/__init__.py @@ -0,0 +1,27 @@ +# Copyright 2025 Hathor Labs +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +from .deserializer import Deserializer +from .exceptions import BadDataError, OutOfDataError, SerializationError, TooLongError, UnsupportedTypeError +from .serializer import Serializer + +__all__ = [ + 'Serializer', + 'Deserializer', + 'SerializationError', + 'UnsupportedTypeError', + 'TooLongError', + 'OutOfDataError', + 'BadDataError', +] diff --git a/hathorlib/serialization/adapters/__init__.py b/hathorlib/serialization/adapters/__init__.py new file mode 100644 index 0000000..88a7d94 --- /dev/null +++ b/hathorlib/serialization/adapters/__init__.py @@ -0,0 +1,10 @@ +from .generic_adapter import GenericDeserializerAdapter, GenericSerializerAdapter +from .max_bytes import MaxBytesDeserializer, MaxBytesExceededError, MaxBytesSerializer + +__all__ = [ + 'GenericDeserializerAdapter', + 'GenericSerializerAdapter', + 'MaxBytesDeserializer', + 'MaxBytesExceededError', + 'MaxBytesSerializer', +] diff --git a/hathorlib/serialization/adapters/generic_adapter.py b/hathorlib/serialization/adapters/generic_adapter.py new file mode 100644 index 0000000..c7f7d58 --- /dev/null +++ b/hathorlib/serialization/adapters/generic_adapter.py @@ -0,0 +1,110 @@ +# Copyright 2025 Hathor Labs +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +from types import TracebackType +from typing import Generic, TypeVar, Union + +from typing_extensions import Self, override + +from hathorlib.serialization.deserializer import Deserializer +from hathorlib.serialization.serializer import Serializer + +from ..types import Buffer + +S = TypeVar('S', bound=Serializer) +D = TypeVar('D', bound=Deserializer) + + +class GenericSerializerAdapter(Serializer, Generic[S]): + inner: S + + def __init__(self, serializer: S) -> None: + self.inner = serializer + + @override + def finalize(self) -> Buffer: + return self.inner.finalize() + + @override + def cur_pos(self) -> int: + return self.inner.cur_pos() + + @override + def write_byte(self, data: int) -> None: + self.inner.write_byte(data) + + @override + def write_bytes(self, data: Buffer) -> None: + self.inner.write_bytes(data) + + # allow using this adapter as a context manager: + + def __enter__(self) -> Self: + return self + + def __exit__( + self, + exc_type: Union[type[BaseException], None], + exc_value: Union[BaseException, None], + traceback: Union[TracebackType, None], + ) -> None: + pass + + +class GenericDeserializerAdapter(Deserializer, Generic[D]): + inner: D + + def __init__(self, deserializer: D) -> None: + self.inner = deserializer + + @override + def finalize(self) -> None: + return self.inner.finalize() + + @override + def is_empty(self) -> bool: + return self.inner.is_empty() + + @override + def peek_byte(self) -> int: + return self.inner.peek_byte() + + @override + def peek_bytes(self, n: int, *, exact: bool = True) -> Buffer: + return self.inner.peek_bytes(n, exact=exact) + + @override + def read_byte(self) -> int: + return self.inner.read_byte() + + @override + def read_bytes(self, n: int, *, exact: bool = True) -> Buffer: + return self.inner.read_bytes(n, exact=exact) + + @override + def read_all(self) -> Buffer: + return self.inner.read_all() + + # allow using this adapter as a context manager: + + def __enter__(self) -> Self: + return self + + def 
__exit__( + self, + exc_type: Union[type[BaseException], None], + exc_value: Union[BaseException, None], + traceback: Union[TracebackType, None], + ) -> None: + pass diff --git a/hathorlib/serialization/adapters/max_bytes.py b/hathorlib/serialization/adapters/max_bytes.py new file mode 100644 index 0000000..f435d61 --- /dev/null +++ b/hathorlib/serialization/adapters/max_bytes.py @@ -0,0 +1,91 @@ +# Copyright 2025 Hathor Labs +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from typing import TypeVar + +from typing_extensions import override + +from hathorlib.serialization.deserializer import Deserializer +from hathorlib.serialization.exceptions import SerializationError +from hathorlib.serialization.serializer import Serializer + +from ..types import Buffer +from .generic_adapter import GenericDeserializerAdapter, GenericSerializerAdapter + +S = TypeVar('S', bound=Serializer) +D = TypeVar('D', bound=Deserializer) + + +class MaxBytesExceededError(SerializationError): + """ This error is raised when the adapted serializer reached its maximum bytes write/read. + + After this exception is raised the adapted serializer cannot be used anymore. Handlers of this exception are + expected to either: bubble up the exception (or an equivalente exception), or return an error. Handlers should not + try to write again on the same serializer. 
+ + It is possible that the inner serializer is still usable, but the point where the serialized stopped writing or + reading might leave the rest of the data unusable, so for that reason it should be considered a failed + (de)serialization overall, and not simply a failed "read/write" operation. + """ + pass + + +class MaxBytesSerializer(GenericSerializerAdapter[S]): + def __init__(self, serializer: S, max_bytes: int) -> None: + super().__init__(serializer) + self._bytes_left = max_bytes + + def _check_update_exceeds(self, write_size: int) -> None: + self._bytes_left -= write_size + if self._bytes_left < 0: + raise MaxBytesExceededError + + @override + def write_byte(self, data: int) -> None: + self._check_update_exceeds(1) + super().write_byte(data) + + @override + def write_bytes(self, data: Buffer) -> None: + data_view = memoryview(data) + self._check_update_exceeds(len(data_view)) + super().write_bytes(data_view) + + +class MaxBytesDeserializer(GenericDeserializerAdapter[D]): + def __init__(self, deserializer: D, max_bytes: int) -> None: + super().__init__(deserializer) + self._bytes_left = max_bytes + + def _check_update_exceeds(self, read_size: int) -> None: + self._bytes_left -= read_size + if self._bytes_left < 0: + raise MaxBytesExceededError + + @override + def read_byte(self) -> int: + self._check_update_exceeds(1) + return super().read_byte() + + @override + def read_bytes(self, n: int, *, exact: bool = True) -> Buffer: + self._check_update_exceeds(n) + return super().read_bytes(n, exact=exact) + + @override + def read_all(self) -> Buffer: + result = super().read_bytes(self._bytes_left, exact=False) + if not self.is_empty(): + raise MaxBytesExceededError + return result diff --git a/hathorlib/serialization/bytes_deserializer.py b/hathorlib/serialization/bytes_deserializer.py new file mode 100644 index 0000000..1a26ec7 --- /dev/null +++ b/hathorlib/serialization/bytes_deserializer.py @@ -0,0 +1,76 @@ +# Copyright 2025 Hathor Labs +# +# Licensed under 
the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from typing_extensions import override + +from .deserializer import Deserializer +from .exceptions import OutOfDataError +from .types import Buffer + +_EMPTY_VIEW = memoryview(b'') + + +class BytesDeserializer(Deserializer): + """Simple implementation of a Deserializer to parse values from a byte sequence. + + This implementation maintains a memoryview that is shortened as the bytes are read. + """ + + def __init__(self, data: Buffer) -> None: + self._view = memoryview(data) + + @override + def finalize(self) -> None: + if not self.is_empty(): + raise ValueError('trailing data') + del self._view + + @override + def is_empty(self) -> bool: + # XXX: least amount of OPs, "not" converts to bool with the correct semantics of "is empty" + return not self._view + + @override + def peek_byte(self) -> int: + if not len(self._view): + raise OutOfDataError('not enough bytes to read') + return self._view[0] + + @override + def peek_bytes(self, n: int, *, exact: bool = True) -> memoryview: + if n < 0: + raise ValueError('value cannot be negative') + if exact and len(self._view) < n: + raise OutOfDataError('not enough bytes to read') + return self._view[:n] + + @override + def read_byte(self) -> int: + b = self.peek_byte() + self._view = self._view[1:] + return b + + @override + def read_bytes(self, n: int, *, exact: bool = True) -> memoryview: + b = self.peek_bytes(n, exact=exact) + if exact and len(self._view) < n: + raise OutOfDataError('not enough 
bytes to read') + self._view = self._view[n:] + return b + + @override + def read_all(self) -> memoryview: + b = self._view + self._view = _EMPTY_VIEW + return b diff --git a/hathorlib/serialization/bytes_serializer.py b/hathorlib/serialization/bytes_serializer.py new file mode 100644 index 0000000..604b398 --- /dev/null +++ b/hathorlib/serialization/bytes_serializer.py @@ -0,0 +1,53 @@ +# Copyright 2025 Hathor Labs +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from typing_extensions import override + +from .serializer import Serializer +from .types import Buffer + + +class BytesSerializer(Serializer): + """Simple implementation of Serializer to write to memory. + + This implementation defers joining everything until finalize is called, before that every write is stored as a + memoryview in a list. 
+ """ + + def __init__(self) -> None: + self._parts: list[memoryview] = [] + self._pos: int = 0 + + @override + def finalize(self) -> memoryview: + result = memoryview(b''.join(self._parts)) + del self._parts + del self._pos + return result + + @override + def cur_pos(self) -> int: + return self._pos + + @override + def write_byte(self, data: int) -> None: + # int.to_bytes checks for correct range + self._parts.append(memoryview(int.to_bytes(data, length=1, byteorder='big'))) + self._pos += 1 + + @override + def write_bytes(self, data: Buffer) -> None: + part = memoryview(data) + self._parts.append(part) + self._pos += len(part) diff --git a/hathorlib/serialization/deserializer.py b/hathorlib/serialization/deserializer.py new file mode 100644 index 0000000..5aa4804 --- /dev/null +++ b/hathorlib/serialization/deserializer.py @@ -0,0 +1,109 @@ +# Copyright 2025 Hathor Labs +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +from __future__ import annotations + +import struct +from abc import ABC, abstractmethod +from typing import TYPE_CHECKING, Any, Iterator, TypeVar, overload + +from typing_extensions import Self + +from .types import Buffer + +if TYPE_CHECKING: + from .adapters import MaxBytesDeserializer + from .bytes_deserializer import BytesDeserializer + +T = TypeVar('T') + + +class Deserializer(ABC): + def finalize(self) -> None: + """Check that all bytes were consumed, the deserializer cannot be used after this.""" + raise TypeError('this deserializer does not support finalization') + + @staticmethod + def build_bytes_deserializer(data: Buffer) -> BytesDeserializer: + from .bytes_deserializer import BytesDeserializer + return BytesDeserializer(data) + + @abstractmethod + def is_empty(self) -> bool: + raise NotImplementedError + + @abstractmethod + def peek_byte(self) -> int: + """Read a single byte but don't consume from buffer.""" + raise NotImplementedError + + @abstractmethod + def peek_bytes(self, n: int, *, exact: bool = True) -> Buffer: + """Read n single byte but don't consume from buffer.""" + raise NotImplementedError + + def peek_struct(self, format: str) -> tuple[Any, ...]: + size = struct.calcsize(format) + data = self.peek_bytes(size) + return struct.unpack(format, data) + + @abstractmethod + def read_byte(self) -> int: + """Read a single byte as unsigned int.""" + raise NotImplementedError + + @abstractmethod + def read_bytes(self, n: int, *, exact: bool = True) -> Buffer: + """Read n bytes, when exact=True it errors if there isn't enough data""" + # XXX: this is a blanket implementation that is an example of the behavior, this implementation has to be + # explicitly used if needed + def iter_bytes() -> Iterator[int]: + for _ in range(n): + if not exact and self.is_empty(): + break + yield self.read_byte() + return bytes(iter_bytes()) + + @abstractmethod + def read_all(self) -> Buffer: + """Read all bytes until the reader is empty.""" + # XXX: it is 
recommended that implementors of Deserializer specialize this implementation + def iter_bytes() -> Iterator[int]: + while not self.is_empty(): + yield self.read_byte() + return bytes(iter_bytes()) + + def read_struct(self, format: str) -> tuple[Any, ...]: + size = struct.calcsize(format) + data = self.read_bytes(size) + return struct.unpack_from(format, data) + + def with_max_bytes(self, max_bytes: int) -> MaxBytesDeserializer[Self]: + """Helper method to wrap the current deserializer with MaxBytesDeserializer.""" + from .adapters import MaxBytesDeserializer + return MaxBytesDeserializer(self, max_bytes) + + @overload + def with_optional_max_bytes(self, max_bytes: None) -> Self: + ... + + @overload + def with_optional_max_bytes(self, max_bytes: int) -> MaxBytesDeserializer[Self]: + ... + + def with_optional_max_bytes(self, max_bytes: int | None) -> Self | MaxBytesDeserializer[Self]: + """Helper method to optionally wrap the current deserializer.""" + if max_bytes is None: + return self + return self.with_max_bytes(max_bytes) diff --git a/hathorlib/serialization/encoding/__init__.py b/hathorlib/serialization/encoding/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/hathorlib/serialization/encoding/leb128.py b/hathorlib/serialization/encoding/leb128.py new file mode 100644 index 0000000..ef154f0 --- /dev/null +++ b/hathorlib/serialization/encoding/leb128.py @@ -0,0 +1,93 @@ +# Copyright 2025 Hathor Labs +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +""" +This module implements LEB128 for signed integers. + +LEB128 or Little Endian Base 128 is a variable-length code compression used to store arbitrarily large +integers in a small number of bytes. LEB128 is used in the DWARF debug file format and the WebAssembly +binary encoding for all integer literals. + +References: +- https://en.wikipedia.org/wiki/LEB128 +- https://dwarfstd.org/doc/DWARF5.pdf +- https://webassembly.github.io/spec/core/binary/values.html#integers + +This module implements LEB128 encoding/decoding using the standard 1-byte block split into 1-bit for continuation and +7-bits for data. The data can be either a signed or unsigned integer. + +>>> se = Serializer.build_bytes_serializer() +>>> se.write_bytes(b'test') # writes 74657374 +>>> encode_leb128(se, 0, signed=True) # writes 00 +>>> encode_leb128(se, 624485, signed=True) # writes e58e26 +>>> encode_leb128(se, -123456, signed=True) # writes c0bb78 +>>> bytes(se.finalize()).hex() +'7465737400e58e26c0bb78' + +>>> data = bytes.fromhex('00 e58e26 c0bb78 74657374') +>>> de = Deserializer.build_bytes_deserializer(data) +>>> decode_leb128(de, signed=True) # reads 00 +0 +>>> decode_leb128(de, signed=True) # reads e58e26 +624485 +>>> decode_leb128(de, signed=True) # reads c0bb78 +-123456 +>>> bytes(de.read_all()) # reads 74657374 +b'test' +>>> de.finalize() +""" + +from hathorlib.serialization import Deserializer, Serializer + + +def encode_leb128(serializer: Serializer, value: int, *, signed: bool) -> None: + """ Encodes an integer using LEB128. + + Caller must explicitly choose `signed=True` or `signed=False`. + + This module's docstring has more details on LEB128 and examples. 
+ """ + if not signed and value < 0: + raise ValueError('cannot encode value <0 as unsigend') + while True: + byte = value & 0b0111_1111 + value >>= 7 + if signed: + cont = (value == 0 and (byte & 0b0100_0000) == 0) or (value == -1 and (byte & 0b0100_0000) != 0) + else: + cont = (value == 0 and (byte & 0b1000_0000) == 0) + if cont: + serializer.write_byte(byte) + break + serializer.write_byte(byte | 0b1000_0000) + + +def decode_leb128(deserializer: Deserializer, *, signed: bool) -> int: + """ Decodes a LEB128-encoded integer. + + Caller must explicitly choose `signed=True` or `signed=False`. + + This module's docstring has more details on LEB128 and examples. + """ + result = 0 + shift = 0 + while True: + byte = deserializer.read_byte() + result |= (byte & 0b0111_1111) << shift + shift += 7 + assert shift % 7 == 0 + if (byte & 0b1000_0000) == 0: + if signed and (byte & 0b0100_0000) != 0: + return result | -(1 << shift) + return result diff --git a/hathorlib/serialization/exceptions.py b/hathorlib/serialization/exceptions.py new file mode 100644 index 0000000..b35f3f8 --- /dev/null +++ b/hathorlib/serialization/exceptions.py @@ -0,0 +1,37 @@ +# Copyright 2025 Hathor Labs +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import struct + +from hathorlib.exceptions import HathorError + + +class SerializationError(HathorError): + pass + + +class UnsupportedTypeError(SerializationError): + pass + + +class TooLongError(SerializationError): + pass + + +class OutOfDataError(SerializationError, struct.error): + pass + + +class BadDataError(SerializationError): + pass diff --git a/hathorlib/serialization/serializer.py b/hathorlib/serialization/serializer.py new file mode 100644 index 0000000..46d4135 --- /dev/null +++ b/hathorlib/serialization/serializer.py @@ -0,0 +1,78 @@ +# Copyright 2025 Hathor Labs +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +from __future__ import annotations + +import struct +from abc import ABC, abstractmethod +from typing import TYPE_CHECKING, Any, TypeVar, overload + +from typing_extensions import Self + +from .types import Buffer + +if TYPE_CHECKING: + from .adapters import MaxBytesSerializer + from .bytes_serializer import BytesSerializer + +T = TypeVar('T') + + +class Serializer(ABC): + def finalize(self) -> Buffer: + """Get the resulting byte sequence, the serializer cannot be reused after this.""" + raise TypeError('this serializer does not support finalization') + + @abstractmethod + def cur_pos(self) -> int: + raise NotImplementedError + + @abstractmethod + def write_byte(self, data: int) -> None: + """Write a single byte.""" + raise NotImplementedError + + @abstractmethod + def write_bytes(self, data: Buffer) -> None: + # XXX: it is recommended that implementors of Serializer specialize this implementation + for byte in bytes(memoryview(data)): + self.write_byte(byte) + + def write_struct(self, data: tuple[Any, ...], format: str) -> None: + data_bytes = struct.pack(format, *data) + self.write_bytes(data_bytes) + + def with_max_bytes(self, max_bytes: int) -> MaxBytesSerializer[Self]: + """Helper method to wrap the current serializer with MaxBytesSerializer.""" + from .adapters import MaxBytesSerializer + return MaxBytesSerializer(self, max_bytes) + + @overload + def with_optional_max_bytes(self, max_bytes: None) -> Self: + ... + + @overload + def with_optional_max_bytes(self, max_bytes: int) -> MaxBytesSerializer[Self]: + ... 
+ + def with_optional_max_bytes(self, max_bytes: int | None) -> Self | MaxBytesSerializer[Self]: + """Helper method to optionally wrap the current serializer.""" + if max_bytes is None: + return self + return self.with_max_bytes(max_bytes) + + @staticmethod + def build_bytes_serializer() -> BytesSerializer: + from .bytes_serializer import BytesSerializer + return BytesSerializer() diff --git a/hathorlib/serialization/types.py b/hathorlib/serialization/types.py new file mode 100644 index 0000000..4455b4f --- /dev/null +++ b/hathorlib/serialization/types.py @@ -0,0 +1,19 @@ +# Copyright 2025 Hathor Labs +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +from typing import Union + +from typing_extensions import TypeAlias + +Buffer: TypeAlias = Union[bytes, memoryview] diff --git a/hathorlib/utils.py b/hathorlib/utils.py index 6627568..c41e75e 100644 --- a/hathorlib/utils.py +++ b/hathorlib/utils.py @@ -7,7 +7,7 @@ import hashlib import re import struct -from typing import Any, Tuple, cast +from typing import Any, Tuple, Union, cast import base58 from cryptography.hazmat.primitives.asymmetric import ec @@ -15,6 +15,9 @@ from hathorlib.conf import HathorSettings from hathorlib.exceptions import InvalidAddress +from hathorlib.serialization import Deserializer, SerializationError, Serializer +from hathorlib.serialization.adapters import MaxBytesExceededError +from hathorlib.serialization.encoding.leb128 import decode_leb128, encode_leb128 settings = HathorSettings() @@ -205,3 +208,103 @@ def get_hash160(public_key_bytes: bytes) -> bytes: h = hashlib.new('ripemd160') h.update(key_hash.digest()) return h.digest() + + +def encode_signed(value: int, *, max_bytes: Union[int, None] = None) -> bytes: + """ + Receive a signed integer and return its LEB128-encoded bytes. + + >>> encode_signed(0) == bytes([0x00]) + True + >>> encode_signed(624485) == bytes([0xE5, 0x8E, 0x26]) + True + >>> encode_signed(-123456) == bytes([0xC0, 0xBB, 0x78]) + True + """ + serializer: Serializer = Serializer.build_bytes_serializer() + try: + encode_leb128(serializer.with_optional_max_bytes(max_bytes), value, signed=True) + except MaxBytesExceededError as e: + raise ValueError(f'cannot encode more than {max_bytes} bytes') from e + except SerializationError as e: + raise ValueError('serialization error') from e + return bytes(serializer.finalize()) + + +def encode_unsigned(value: int, *, max_bytes: Union[int, None] = None) -> bytes: + """ + Receive an unsigned integer and return its LEB128-encoded bytes. 
+ + >>> encode_unsigned(0) == bytes([0x00]) + True + >>> encode_unsigned(624485) == bytes([0xE5, 0x8E, 0x26]) + True + """ + serializer: Serializer = Serializer.build_bytes_serializer() + try: + encode_leb128(serializer.with_optional_max_bytes(max_bytes), value, signed=False) + except MaxBytesExceededError as e: + raise ValueError(f'cannot encode more than {max_bytes} bytes') from e + except SerializationError as e: + raise ValueError('serialization error') from e + return bytes(serializer.finalize()) + + +def decode_signed(data: bytes, *, max_bytes: Union[int, None] = None) -> tuple[int, bytes]: + """ + Receive and consume a buffer returning a tuple of the unpacked + LEB128-encoded signed integer and the remaining buffer. + + >>> decode_signed(bytes([0x00]) + b'test') + (0, b'test') + >>> decode_signed(bytes([0xE5, 0x8E, 0x26]) + b'test') + (624485, b'test') + >>> decode_signed(bytes([0xC0, 0xBB, 0x78]) + b'test') + (-123456, b'test') + >>> decode_signed(bytes([0xC0, 0xBB, 0x78]) + b'test', max_bytes=3) + (-123456, b'test') + >>> try: + ... decode_signed(bytes([0xC0, 0xBB, 0x78]) + b'test', max_bytes=2) + ... except ValueError as e: + ... print(e) + cannot decode more than 2 bytes + """ + deserializer = Deserializer.build_bytes_deserializer(data) + try: + value = decode_leb128(deserializer.with_optional_max_bytes(max_bytes), signed=True) + except MaxBytesExceededError as e: + raise ValueError(f'cannot decode more than {max_bytes} bytes') from e + except SerializationError as e: + raise ValueError('deserialization error') from e + remaining_data = bytes(deserializer.read_all()) + deserializer.finalize() + return (value, remaining_data) + + +def decode_unsigned(data: bytes, *, max_bytes: Union[int, None] = None) -> tuple[int, bytes]: + """ + Receive and consume a buffer returning a tuple of the unpacked + LEB128-encoded unsigned integer and the remaining buffer. 
+ + >>> decode_unsigned(bytes([0x00]) + b'test') + (0, b'test') + >>> decode_unsigned(bytes([0xE5, 0x8E, 0x26]) + b'test') + (624485, b'test') + >>> decode_unsigned(bytes([0xE5, 0x8E, 0x26]) + b'test', max_bytes=3) + (624485, b'test') + >>> try: + ... decode_unsigned(bytes([0xE5, 0x8E, 0x26]) + b'test', max_bytes=2) + ... except ValueError as e: + ... print(e) + cannot decode more than 2 bytes + """ + deserializer = Deserializer.build_bytes_deserializer(data) + try: + value = decode_leb128(deserializer.with_optional_max_bytes(max_bytes), signed=False) + except MaxBytesExceededError as e: + raise ValueError(f'cannot decode more than {max_bytes} bytes') from e + except SerializationError as e: + raise ValueError('deserialization error') from e + remaining_data = bytes(deserializer.read_all()) + deserializer.finalize() + return (value, remaining_data) diff --git a/pyproject.toml b/pyproject.toml index 25e2eb8..471710a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -14,7 +14,7 @@ [tool.poetry] name = "hathorlib" -version = "0.9.1" +version = "0.10.0" description = "Hathor Network base objects library" authors = ["Hathor Team "] license = "Apache-2.0" diff --git a/tests/test_nanocontract.py b/tests/test_nanocontract.py index 6a324ba..cfda8dc 100644 --- a/tests/test_nanocontract.py +++ b/tests/test_nanocontract.py @@ -18,6 +18,7 @@ def _get_nc(self) -> Transaction: nc.timestamp = 123456 nano_header = NanoHeader( tx=nc, + nc_seqnum=123, nc_actions=[ NanoHeaderAction( type=NCActionType.DEPOSIT, @@ -48,7 +49,7 @@ def test_serialization(self) -> None: assert isinstance(nano_header1, NanoHeader) assert isinstance(nano_header2, NanoHeader) - self.assertEqual(nano_header1.nc_version, nano_header2.nc_version) + self.assertEqual(nano_header1.nc_seqnum, nano_header2.nc_seqnum) self.assertEqual(nano_header1.nc_id, nano_header2.nc_id) self.assertEqual(nano_header1.nc_method, nano_header2.nc_method) self.assertEqual(nano_header1.nc_args_bytes, nano_header2.nc_args_bytes) @@ 
-65,7 +66,7 @@ def test_serialization_skip_signature(self) -> None: assert isinstance(deserialized, NanoHeader) assert len(buf) == 0 - assert deserialized.nc_version == nano_header.nc_version + assert deserialized.nc_seqnum == nano_header.nc_seqnum assert deserialized.nc_id == nano_header.nc_id assert deserialized.nc_method == nano_header.nc_method assert deserialized.nc_args_bytes == nano_header.nc_args_bytes