Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
117 changes: 86 additions & 31 deletions mapillary_tools/geo.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,47 @@ class Point:
alt: float | None
angle: float | None

def get_gps_epoch_time(self) -> float | None:
"""
Return the GPS epoch time for this point.
Base Point class returns None, subclasses can override.
"""
return None

def _calculate_weight_for_interpolation(self, other: "Point", t: float) -> float:
"""
Calculate interpolation weight between self and other at time t.
Returns 0 if both points have the same time regardless of t.
"""
if self.time == other.time:
return 0.0
return (t - self.time) / (other.time - self.time)

def interpolate_with(self, other: "Point", t: float) -> "Point":
"""
Create a new interpolated point between self and other at time t.

Args:
other: The end point to interpolate towards
t: The target timestamp

Returns:
A new Point with interpolated values
"""
weight = self._calculate_weight_for_interpolation(other, t)

lat = self.lat + (other.lat - self.lat) * weight
lon = self.lon + (other.lon - self.lon) * weight
angle = compute_bearing((self.lat, self.lon), (other.lat, other.lon))

alt: float | None
if self.alt is not None and other.alt is not None:
alt = self.alt + (other.alt - self.alt) * weight
else:
alt = None

return Point(time=t, lat=lat, lon=lon, alt=alt, angle=angle)


PointLike = T.TypeVar("PointLike", bound=Point)

Expand All @@ -52,17 +93,36 @@ def gps_distance(latlon_1: tuple[float, float], latlon_2: tuple[float, float]) -


def avg_speed(sequence: T.Sequence[PointLike]) -> float:
"""
Calculate average speed over a sequence of points.
Uses GPS epoch time when available (via get_gps_epoch_time()),
otherwise falls back to the time field.
Returns 0.0 for empty or single-element sequences.
Returns NaN if time difference is zero (undefined speed).
"""
# Need at least 2 points to calculate speed
if len(sequence) < 2:
return 0.0

total_distance = 0.0
for cur, nxt in pairwise(sequence):
total_distance += gps_distance((cur.lat, cur.lon), (nxt.lat, nxt.lon))

if sequence:
time_diff = sequence[-1].time - sequence[0].time
first = sequence[0]
last = sequence[-1]

# Try to use GPS epoch time if available (via polymorphic method)
first_gps_time = first.get_gps_epoch_time()
last_gps_time = last.get_gps_epoch_time()

if first_gps_time is not None and last_gps_time is not None:
time_diff = last_gps_time - first_gps_time
else:
time_diff = 0.0
# Fall back to time field if GPS epoch time not available
time_diff = last.time - first.time

if time_diff == 0.0:
return float("inf")
return float("nan")

return total_distance / time_diff

Expand Down Expand Up @@ -141,7 +201,7 @@ def as_unix_time(dt: datetime.datetime | int | float) -> float:

if sys.version_info < (3, 10):

def interpolate(points: T.Sequence[Point], t: float, lo: int = 0) -> Point:
def interpolate(points: T.Sequence[PointLike], t: float, lo: int = 0) -> PointLike:
"""
Interpolate or extrapolate the point at time t along the sequence of points (sorted by time).
"""
Expand All @@ -152,12 +212,14 @@ def interpolate(points: T.Sequence[Point], t: float, lo: int = 0) -> Point:
# for cur, nex in pairwise(points):
# assert cur.time <= nex.time, "Points not sorted"

p = Point(time=t, lat=float("-inf"), lon=float("-inf"), alt=None, angle=None)
idx = bisect.bisect_left(points, p, lo=lo)
times = [p.time for p in points]
# Use bisect_left on the times list
idx = bisect.bisect_left(times, t, lo=lo)

return _interpolate_at_segment_idx(points, t, idx)
else:

def interpolate(points: T.Sequence[Point], t: float, lo: int = 0) -> Point:
def interpolate(points: T.Sequence[PointLike], t: float, lo: int = 0) -> PointLike:
"""
Interpolate or extrapolate the point at time t along the sequence of points (sorted by time).
"""
Expand All @@ -172,18 +234,19 @@ def interpolate(points: T.Sequence[Point], t: float, lo: int = 0) -> Point:
return _interpolate_at_segment_idx(points, t, idx)


class Interpolator:
class Interpolator(T.Generic[PointLike]):
"""
Interpolator for interpolating a sequence of timestamps incrementally.
Preserves the type of input points (Point, GPSPoint, or CAMMGPSPoint).
"""

tracks: T.Sequence[T.Sequence[Point]]
tracks: T.Sequence[T.Sequence[PointLike]]
track_idx: int
# interpolation starts from the lower bound point index in the current track
lo: int
prev_time: float | None

def __init__(self, tracks: T.Sequence[T.Sequence[Point]]):
def __init__(self, tracks: T.Sequence[T.Sequence[PointLike]]):
# Remove empty tracks
self.tracks = [track for track in tracks if track]

Expand All @@ -204,7 +267,7 @@ def __init__(self, tracks: T.Sequence[T.Sequence[Point]]):

@staticmethod
def _lsearch_left(
track: T.Sequence[Point], t: float, lo: int = 0, hi: int | None = None
track: T.Sequence[PointLike], t: float, lo: int = 0, hi: int | None = None
) -> int:
"""
similar to bisect.bisect_left, but faster in the incremental search case
Expand All @@ -221,14 +284,14 @@ def _lsearch_left(
# assert track[lo - 1].time < t <= track[lo].time
return lo

def interpolate(self, t: float) -> Point:
def interpolate(self, t: float) -> PointLike:
if self.prev_time is not None:
if not (self.prev_time <= t):
raise ValueError(
f"Require times to be monotonically increasing, but got {self.prev_time} then {t}"
)

interpolated: Point | None = None
interpolated: PointLike | None = None

while self.track_idx < len(self.tracks):
track = self.tracks[self.track_idx]
Expand Down Expand Up @@ -318,25 +381,17 @@ def _ecef_from_lla2(lat: float, lon: float) -> tuple[float, float, float]:
return x, y, z


def _interpolate_segment(start: Point, end: Point, t: float) -> Point:
try:
weight = (t - start.time) / (end.time - start.time)
except ZeroDivisionError:
weight = 0.0

lat = start.lat + (end.lat - start.lat) * weight
lon = start.lon + (end.lon - start.lon) * weight
angle = compute_bearing((start.lat, start.lon), (end.lat, end.lon))
alt: float | None
if start.alt is not None and end.alt is not None:
alt = start.alt + (end.alt - start.alt) * weight
else:
alt = None

return Point(time=t, lat=lat, lon=lon, alt=alt, angle=angle)
def _interpolate_segment(start: PointLike, end: PointLike, t: float) -> PointLike:
"""
Interpolate between two points at time t, preserving the type of the input points.
Uses the polymorphic interpolate_with() method to support subclasses.
"""
return T.cast(PointLike, start.interpolate_with(end, t))


def _interpolate_at_segment_idx(points: T.Sequence[Point], t: float, idx: int) -> Point:
def _interpolate_at_segment_idx(
points: T.Sequence[PointLike], t: float, idx: int
) -> PointLike:
"""
Interpolate time t along the segment between idx - 1 and idx.
If idx is out of range, extrapolate it to the nearest segment (first or last).
Expand Down
103 changes: 103 additions & 0 deletions mapillary_tools/telemetry.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,57 @@ class GPSPoint(TimestampedMeasurement, Point):
precision: float | None
ground_speed: float | None

def get_gps_epoch_time(self) -> float | None:
"""Return the GPS epoch time if valid, otherwise None."""
if self.epoch_time is not None and self.epoch_time > 0:
return self.epoch_time
return None

def interpolate_with(self, other: Point, t: float) -> Point:
"""Create a new interpolated GPSPoint using this and other point at time t."""
base = super().interpolate_with(other, t)
if not isinstance(other, GPSPoint):
return base

# Interpolate GPSPoint-specific fields
weight = self._calculate_weight_for_interpolation(other, t)
epoch_time: float | None
if (
self.epoch_time is not None
and other.epoch_time is not None
and self.epoch_time > 0
and other.epoch_time > 0
):
epoch_time = self.epoch_time + (other.epoch_time - self.epoch_time) * weight
else:
epoch_time = None

precision: float | None
if self.precision is not None and other.precision is not None:
precision = self.precision + (other.precision - self.precision) * weight
else:
precision = None

ground_speed: float | None
if self.ground_speed is not None and other.ground_speed is not None:
ground_speed = (
self.ground_speed + (other.ground_speed - self.ground_speed) * weight
)
else:
ground_speed = None

return GPSPoint(
time=base.time,
lat=base.lat,
lon=base.lon,
alt=base.alt,
angle=base.angle,
epoch_time=epoch_time,
fix=self.fix, # Use start point's fix value
precision=precision,
ground_speed=ground_speed,
)


@dataclasses.dataclass
class CAMMGPSPoint(TimestampedMeasurement, Point):
Expand All @@ -44,6 +95,58 @@ class CAMMGPSPoint(TimestampedMeasurement, Point):
velocity_up: float
speed_accuracy: float

def get_gps_epoch_time(self) -> float | None:
"""Return the GPS epoch time if valid, otherwise None."""
if self.time_gps_epoch > 0:
return self.time_gps_epoch
return None

def interpolate_with(self, other: Point, t: float) -> Point:
"""Create a new interpolated CAMMGPSPoint using this and other point at time t."""
base = super().interpolate_with(other, t)
if not isinstance(other, CAMMGPSPoint):
return base

# Interpolate all CAMM-specific fields
weight = self._calculate_weight_for_interpolation(other, t)
time_gps_epoch = (
self.time_gps_epoch + (other.time_gps_epoch - self.time_gps_epoch) * weight
)
horizontal_accuracy = (
self.horizontal_accuracy
+ (other.horizontal_accuracy - self.horizontal_accuracy) * weight
)
vertical_accuracy = (
self.vertical_accuracy
+ (other.vertical_accuracy - self.vertical_accuracy) * weight
)
velocity_east = (
self.velocity_east + (other.velocity_east - self.velocity_east) * weight
)
velocity_north = (
self.velocity_north + (other.velocity_north - self.velocity_north) * weight
)
velocity_up = self.velocity_up + (other.velocity_up - self.velocity_up) * weight
speed_accuracy = (
self.speed_accuracy + (other.speed_accuracy - self.speed_accuracy) * weight
)

return CAMMGPSPoint(
time=base.time,
lat=base.lat,
lon=base.lon,
alt=base.alt,
angle=base.angle,
time_gps_epoch=time_gps_epoch,
gps_fix_type=self.gps_fix_type, # Use start point's fix type
horizontal_accuracy=horizontal_accuracy,
vertical_accuracy=vertical_accuracy,
velocity_east=velocity_east,
velocity_north=velocity_north,
velocity_up=velocity_up,
speed_accuracy=speed_accuracy,
)


@dataclasses.dataclass(order=True)
class GyroscopeData(TimestampedMeasurement):
Expand Down
Loading