|
1 | 1 | from __future__ import annotations |
2 | 2 | import numpy as np |
3 | | -from struct import pack, unpack_from |
| 3 | +import struct |
4 | 4 |
|
5 | 5 |
|
6 | 6 | class HalfVector: |
7 | 7 | def __init__(self, value: list[float] | np.ndarray[tuple[int], np.dtype[np.floating]]) -> None: |
8 | | - # asarray still copies if same dtype |
9 | | - if not isinstance(value, np.ndarray) or value.dtype != '>f2': |
10 | | - value = np.asarray(value, dtype='>f2') |
11 | | - |
12 | | - # for mypy |
13 | | - assert isinstance(value, np.ndarray) |
14 | | - |
15 | | - if value.ndim != 1: |
16 | | - raise ValueError('expected ndim to be 1') |
17 | | - |
18 | | - # atleast_1d for ty |
19 | | - self._value = np.atleast_1d(value) |
| 8 | + if isinstance(value, list): |
| 9 | + dim = len(value) |
| 10 | + try: |
| 11 | + self._value = struct.pack(f'>HH{dim}e', dim, 0, *value) |
| 12 | + except struct.error as e: |
| 13 | + raise ValueError('expected list[float]') |
| 14 | + elif isinstance(value, np.ndarray): |
| 15 | + if value.ndim != 1: |
| 16 | + raise ValueError('expected ndim to be 1') |
| 17 | + |
| 18 | + # asarray still copies if same dtype |
| 19 | + if value.dtype != '>f2': |
| 20 | + value = np.asarray(value, dtype='>f2') |
| 21 | + |
| 22 | + self._value = struct.pack('>HH', value.shape[0], 0) + value.tobytes() |
| 23 | + else: |
| 24 | + raise ValueError('expected list or ndarray') |
20 | 25 |
|
21 | 26 | def __repr__(self) -> str: |
22 | 27 | return f'HalfVector({self.to_list()})' |
23 | 28 |
|
24 | 29 | def __eq__(self, other: object) -> bool: |
25 | 30 | if isinstance(other, self.__class__): |
26 | | - return np.array_equal(self.to_numpy(), other.to_numpy()) |
| 31 | + return self.to_binary() == other.to_binary() |
27 | 32 | return False |
28 | 33 |
|
def dimensions(self) -> int:
    """Return the element count stored in the first two bytes of the buffer."""
    header = struct.unpack_from('>H', self._value)
    return header[0]
31 | 37 |
|
def to_list(self) -> list[float]:
    """Return the elements as a list of Python floats.

    Decodes ``self.dimensions()`` big-endian float16 values starting right
    after the 4-byte header.
    """
    fmt = f'>{self.dimensions()}e'
    # Read at offset 4 instead of slicing, which would copy the payload.
    return list(struct.unpack_from(fmt, self._value, 4))
34 | 40 |
|
def to_numpy(self) -> np.ndarray[tuple[int], np.dtype[np.float16]]:
    """Return the elements as a big-endian float16 array read from the buffer."""
    # TODO return native endian
    count = self.dimensions()
    return np.frombuffer(self._value, dtype='>f2', offset=4, count=count)
38 | 44 |
|
def to_text(self) -> str:
    """Return the text form: comma-separated elements inside square brackets."""
    body = ','.join(map(str, self.to_list()))
    return f'[{body}]'
41 | 47 |
|
def to_binary(self) -> bytes:
    """Return the internal byte buffer as-is, without re-encoding."""
    return self._value
44 | 50 |
|
45 | 51 | @classmethod |
46 | 52 | def from_text(cls, value: str) -> HalfVector: |
47 | 53 | return cls([float(v) for v in value[1:-1].split(',')]) |
48 | 54 |
|
49 | 55 | @classmethod |
50 | 56 | def from_binary(cls, value: bytes) -> HalfVector: |
51 | | - dim, unused = unpack_from('>HH', value) |
52 | | - return cls(np.frombuffer(value, dtype='>f2', count=dim, offset=4)) |
| 57 | + # TODO check dimensions/length and unused |
| 58 | + vec = cls.__new__(cls) |
| 59 | + vec._value = value |
| 60 | + return vec |
53 | 61 |
|
54 | 62 | @classmethod |
55 | 63 | def _to_db(cls, value: object, dim: int | None = None) -> str | None: |
|
0 commit comments