|
"""
This module defines the [`PickBestCodec`][numcodecs_combinators.best.PickBestCodec] class, which picks the codec that encoded the data best.

"Best" here means the codec whose encoded output has the fewest bytes.
"""

# Only the combinator class itself is part of this module's public API.
__all__ = ["PickBestCodec"]
| 6 | + |
| 7 | +from io import BytesIO |
| 8 | +from typing import Callable, Optional |
| 9 | + |
| 10 | +import numcodecs |
| 11 | +import numcodecs.compat |
| 12 | +import numcodecs.registry |
| 13 | +import numpy as np |
| 14 | +import varint |
| 15 | +from numcodecs.abc import Codec |
| 16 | +from typing_extensions import Buffer, Self # MSPV 3.12 |
| 17 | + |
| 18 | +from .abc import CodecCombinatorMixin |
| 19 | + |
| 20 | + |
class PickBestCodec(Codec, CodecCombinatorMixin, tuple[Codec, ...]):
    """
    A codec that tries encoding with all combined codecs and then picks the
    one with the fewest bytes.

    The inner codecs must all encode to 1D byte arrays. To use a codec not
    encoding to bytes with this combinator, you can wrap it using the
    [`FramedCodecStack(codec)`][numcodecs_combinators.framed.FramedCodecStack]
    combinator.

    This combinator uses the ULEB128 variable length integer encoding to encode
    the index of the codec that was chosen to encode and uses this index as a
    header before the encoded bytes. The header index is only included if this
    combinator wraps at least two codecs. If this combinator wraps zero codecs,
    it passes the original data through unchanged.

    If several codecs produce the same smallest size, the one that comes first
    in the combinator is chosen.

    Parameters
    ----------
    *args : dict | Codec
        The codecs to pick from. Each may be given either as a
        [`Codec`][numcodecs.abc.Codec] instance or as a configuration
        [`dict`][dict], which is resolved with
        [`numcodecs.registry.get_codec`][numcodecs.registry.get_codec].
    """

    __slots__ = ()

    codec_id: str = "combinators.best"  # type: ignore

    def __init__(self, *args: dict | Codec):
        # The codecs are stored by `__new__`, since the tuple contents cannot
        # be changed after construction; nothing is left to initialise here.
        pass

    def __new__(cls, *args: dict | Codec) -> Self:
        # Codec instances are stored as-is, configuration dicts are
        # instantiated through the numcodecs registry.
        return super().__new__(
            cls,
            tuple(
                codec
                if isinstance(codec, Codec)
                else numcodecs.registry.get_codec(codec)
                for codec in args
            ),
        )

    def encode(self, buf: Buffer) -> bytes:
        """Encode the data in `buf`.

        Every wrapped codec encodes its own copy of the data, and the
        smallest encoding is returned. If the combinator wraps at least two
        codecs, the encoding is prefixed with the ULEB128-encoded index of
        the chosen codec.

        Parameters
        ----------
        buf : Buffer
            Data to be encoded. May be any object supporting the new-style
            buffer protocol.

        Returns
        -------
        enc : bytes
            Encoded data as a bytestring. If this combinator wraps zero
            codecs, `buf` itself is returned unchanged.

        Raises
        ------
        AssertionError
            If one of the wrapped codecs does not encode to (at most) 1D
            bytes.
        """

        if len(self) == 0:
            # nothing to choose from: pass the data through unchanged
            return buf

        data = numcodecs.compat.ensure_ndarray(buf)

        best_index = 0
        best_encoded = None

        for i, codec in enumerate(self):
            # each codec gets its own copy so that a codec that modifies its
            # input cannot affect what the following codecs see
            encoded = numcodecs.compat.ensure_ndarray(codec.encode(np.copy(data)))

            # raised explicitly (instead of using `assert`) so that the
            # checks are not stripped when running with `python -O`
            if encoded.dtype != np.dtype("uint8"):
                raise AssertionError(f"codec best[{i}] must encode to bytes")
            if encoded.ndim > 1:
                raise AssertionError(f"codec best[{i}] must encode to 1D bytes")

            # strict comparison: the first codec wins on ties
            if best_encoded is None or encoded.nbytes < best_encoded.nbytes:
                best_index = i
                best_encoded = encoded

        encoded_bytes = numcodecs.compat.ensure_bytes(best_encoded)

        if len(self) == 1:
            # with a single codec there is no choice to record in a header
            return encoded_bytes

        return varint.encode(best_index) + encoded_bytes

    def decode(self, buf: Buffer, out: Optional[Buffer] = None) -> Buffer:
        """Decode the data in `buf`.

        Parameters
        ----------
        buf : Buffer
            Encoded data. Must be an object representing a bytestring, e.g.
            [`bytes`][bytes] or a 1D array of [`np.uint8`][numpy.uint8]s etc.
        out : Buffer, optional
            Writeable buffer to store decoded data. N.B. if provided, this buffer must
            be exactly the right size to store the decoded data.

        Returns
        -------
        dec : Buffer
            Decoded data. May be any object supporting the new-style
            buffer protocol.

        Raises
        ------
        IndexError
            If the codec index stored in the header does not refer to one of
            the wrapped codecs.
        """

        if len(self) == 0:
            # encode passed the data through unchanged, so decode does too
            return numcodecs.compat.ndarray_copy(buf, out)

        b = numcodecs.compat.ensure_bytes(buf)

        if len(self) == 1:
            # no header was written for a single codec
            return self[0].decode(b, out=out)

        # read the ULEB128 index header; the remainder of the stream is the
        # payload that was produced by the chosen codec
        b_io = BytesIO(b)
        best_index = varint.decode_stream(b_io)

        if best_index >= len(self):
            raise IndexError(
                f"encoded codec index {best_index} is out of range for "
                f"{len(self)} codecs"
            )

        return self[best_index].decode(b_io.read(), out=out)

    def get_config(self) -> dict:
        """
        Returns the configuration of the best codec combinator.

        [`numcodecs.registry.get_codec(config)`][numcodecs.registry.get_codec]
        can be used to reconstruct this combinator from the returned config.

        Returns
        -------
        config : dict
            Configuration of the best codec combinator.
        """

        return dict(
            id=type(self).codec_id,
            codecs=tuple(codec.get_config() for codec in self),
        )

    @classmethod
    def from_config(cls, config: dict) -> Self:
        """
        Instantiate the best codec combinator from a configuration [`dict`][dict].

        Parameters
        ----------
        config : dict
            Configuration of the best codec combinator. Its `"codecs"` entry
            holds the wrapped codecs (or their configurations).

        Returns
        -------
        best : PickBestCodec
            Instantiated best codec combinator.
        """

        return cls(*config["codecs"])

    def __repr__(self) -> str:
        codecs_repr = ", ".join(f"{codec!r}" for codec in self)

        return f"{type(self).__name__}({codecs_repr})"

    def map(self, mapper: Callable[[Codec], Codec]) -> "PickBestCodec":
        """
        Apply the `mapper` to all codecs that are in this combinator.
        In the returned combinator, each codec is replaced by its mapped codec.

        The `mapper` should recursively apply itself to any inner codecs that
        also implement the [`CodecCombinatorMixin`][numcodecs_combinators.abc.CodecCombinatorMixin]
        mixin.

        To automatically handle the recursive application as a caller, you can
        use
        ```python
        numcodecs_combinators.map_codec(best, mapper)
        ```
        instead.

        Parameters
        ----------
        mapper : Callable[[Codec], Codec]
            The callable that should be applied to each codec to map over this
            best codec combinator.

        Returns
        -------
        mapped : PickBestCodec
            The mapped best codec combinator.
        """

        return PickBestCodec(*(mapper(codec) for codec in self))

    # The tuple operators below are overridden so that concatenation and
    # repetition produce a `PickBestCodec` again instead of a plain tuple.

    def __add__(self, other) -> "PickBestCodec":
        return PickBestCodec(*tuple.__add__(self, other))

    def __mul__(self, other) -> "PickBestCodec":
        return PickBestCodec(*tuple.__mul__(self, other))

    def __rmul__(self, other) -> "PickBestCodec":
        return PickBestCodec(*tuple.__rmul__(self, other))
| 211 | + |
# Register the combinator with the numcodecs registry at import time, so that
# `numcodecs.registry.get_codec(config)` can reconstruct it from a config.
numcodecs.registry.register_codec(PickBestCodec)
0 commit comments