|
| 1 | +"""Split pings into groups by their true heading (yaw). |
| 2 | +
|
| 3 | +Divide the compass into *n* evenly spaced directions and assign each |
| 4 | +ping to the nearest one. An optional *mirror* mode treats headings |
| 5 | +180° apart as equivalent—useful for survey lines that alternate |
| 6 | +direction. |
| 7 | +""" |
| 8 | + |
| 9 | +import math |
| 10 | +from typing import Dict, List |
| 11 | + |
| 12 | +from themachinethatgoesping.pingprocessing.core.progress import get_progress_iterator |
| 13 | + |
| 14 | +from themachinethatgoesping.echosounders import filetemplates |
| 15 | + |
| 16 | +I_Ping = filetemplates.I_Ping |
| 17 | + |
| 18 | + |
| 19 | +# ──────────────────────────── helpers ──────────────────────────── |
| 20 | + |
| 21 | +def _angular_distance(a: float, b: float) -> float: |
| 22 | + """Unsigned shortest angular distance in [0, 180].""" |
| 23 | + return abs((a - b + 180.0) % 360.0 - 180.0) |
| 24 | + |
| 25 | + |
| 26 | +def _make_bin_centers( |
| 27 | + num_directions: int, |
| 28 | + heading_offset: float, |
| 29 | + mirror: bool, |
| 30 | +) -> List[float]: |
| 31 | + """Return the bin-center headings. |
| 32 | +
|
| 33 | + Without *mirror* the full 360° circle is divided. |
| 34 | + With *mirror* only the 0°–180° half-circle is divided (the other |
| 35 | + half is folded back). |
| 36 | + """ |
| 37 | + span = 180.0 if mirror else 360.0 |
| 38 | + step = span / num_directions |
| 39 | + return [(heading_offset + i * step) % 360.0 for i in range(num_directions)] |
| 40 | + |
| 41 | + |
| 42 | +def _nearest_bin( |
| 43 | + heading: float, |
| 44 | + bin_centers: List[float], |
| 45 | + mirror: bool, |
| 46 | +) -> float: |
| 47 | + """Return the bin center closest to *heading*. |
| 48 | +
|
| 49 | + When *mirror* is ``True``, headings 180° apart from a bin center |
| 50 | + are considered equally close (i.e. 225° maps to the 45° bin). |
| 51 | + """ |
| 52 | + best_center = bin_centers[0] |
| 53 | + best_dist = float("inf") |
| 54 | + |
| 55 | + for center in bin_centers: |
| 56 | + if mirror: |
| 57 | + dist = min( |
| 58 | + _angular_distance(heading, center), |
| 59 | + _angular_distance(heading, (center + 180.0) % 360.0), |
| 60 | + ) |
| 61 | + else: |
| 62 | + dist = _angular_distance(heading, center) |
| 63 | + |
| 64 | + if dist < best_dist: |
| 65 | + best_dist = dist |
| 66 | + best_center = center |
| 67 | + |
| 68 | + return best_center |
| 69 | + |
| 70 | + |
| 71 | +def _format_label(degrees: float) -> str: |
| 72 | + """Human-readable label for a bin center, e.g. ``'heading_045'``.""" |
| 73 | + return f"heading_{degrees:05.1f}" |
| 74 | + |
| 75 | + |
| 76 | +# ──────────────────────────── public API ───────────────────────── |
| 77 | + |
| 78 | +def by_heading( |
| 79 | + pings: List[I_Ping], |
| 80 | + num_directions: int = 4, |
| 81 | + heading_offset: float = 0.0, |
| 82 | + mirror: bool = False, |
| 83 | + progress: bool = False, |
| 84 | +) -> Dict[str, List[I_Ping]]: |
| 85 | + """Split pings into groups by true heading (yaw). |
| 86 | +
|
| 87 | + The compass is divided into *num_directions* evenly spaced bins. |
| 88 | + Each ping is placed in the bin whose centre is closest to the |
| 89 | + ping's ``yaw`` (true heading). |
| 90 | +
|
| 91 | + Parameters |
| 92 | + ---------- |
| 93 | + pings : list of I_Ping |
| 94 | + Input ping sequence. |
| 95 | + num_directions : int, optional |
| 96 | + Number of heading bins. Default **4** (N / E / S / W when |
| 97 | + *heading_offset* is 0). |
| 98 | + heading_offset : float, optional |
| 99 | + Rotation of the bin grid in degrees. The first bin centre is |
| 100 | + placed at this angle. Default **0** (north). |
| 101 | + mirror : bool, optional |
| 102 | + If ``True``, opposite headings (180° apart) are treated as the |
| 103 | + same direction. The *num_directions* bins then span only 180° |
| 104 | + instead of 360°. For example ``num_directions=4, mirror=True`` |
| 105 | + yields bins at 0°, 45°, 90°, 135°; headings 180°–359° are |
| 106 | + folded onto the corresponding opposite bin. Default **False**. |
| 107 | + progress : bool, optional |
| 108 | + Show a progress bar. Default **False**. |
| 109 | +
|
| 110 | + Returns |
| 111 | + ------- |
| 112 | + dict[str, list[I_Ping]] |
| 113 | + Keys are ``"heading_XXX.X"`` labels (one per bin centre). |
| 114 | + Values are lists of pings assigned to that direction. |
| 115 | +
|
| 116 | + Examples |
| 117 | + -------- |
| 118 | + Four cardinal directions:: |
| 119 | +
|
| 120 | + groups = by_heading(pings, num_directions=4, heading_offset=0) |
| 121 | + # keys: 'heading_000.0', 'heading_090.0', |
| 122 | + # 'heading_180.0', 'heading_270.0' |
| 123 | +
|
| 124 | + Six directions with offset, mirrored:: |
| 125 | +
|
| 126 | + groups = by_heading(pings, num_directions=6, |
| 127 | + heading_offset=10, mirror=True) |
| 128 | + # bins at 10°, 40°, 70°, 100°, 130°, 160° |
| 129 | + # a ping at 190° → assigned to 'heading_010.0' |
| 130 | + """ |
| 131 | + if num_directions < 1: |
| 132 | + raise ValueError("num_directions must be >= 1") |
| 133 | + |
| 134 | + bin_centers = _make_bin_centers(num_directions, heading_offset, mirror) |
| 135 | + |
| 136 | + # Pre-build result dict with stable key order |
| 137 | + result: Dict[str, List[I_Ping]] = { |
| 138 | + _format_label(c): [] for c in bin_centers |
| 139 | + } |
| 140 | + |
| 141 | + if len(pings) == 0: |
| 142 | + return result |
| 143 | + |
| 144 | + it = get_progress_iterator(pings, progress, desc="Split pings by heading") |
| 145 | + |
| 146 | + for ping in it: |
| 147 | + geo = ping.get_geolocation() |
| 148 | + heading = geo.yaw % 360.0 |
| 149 | + |
| 150 | + center = _nearest_bin(heading, bin_centers, mirror) |
| 151 | + result[_format_label(center)].append(ping) |
| 152 | + |
| 153 | + return result |
0 commit comments