Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,19 @@ Versionado: una entrada por **entrega académica** del semestre (no SemVer estri

## [Unreleased]

### Added — H4 fase 5: calibración parcial CP-01c + ADR-0020 (2026-05-21)
- Ejecutadas las tareas H4-cal-1 y H4-cal-2 del [ADR-0013](docs/architecture/decisions/0013-cp01c-criterio-calibrado.md):
- **H4-cal-1** ✅: parámetro `factor_calibracion: float = 1.0` agregado a `cargar_grafo_iv_region`. Aplica multiplicador al `speed_kph` de cada arista in-memory tras la carga (no persiste al GraphML cacheado). Default `1.0` preserva paridad RT-02 12/12 OK.
- **H4-cal-2** ✅: nuevo módulo experimental [`domain/routing/a_estrella_calibrado.py`](core-python/src/sentinel_dispatch/domain/routing/a_estrella_calibrado.py) con state extendido `(nodo, nodo_previo)` y `turn_penalty_s=2.0` por giro `>30°`. **No reemplaza** al A* operativo — vive separado para no romper la paridad bit-exacta con Java.
- **H4-cal-eval** parcial: nuevo test integration `test_cp01c_calibracion_y_turn_penalty`. **Resultado medido (2026-05-21)**: 27/100 dentro de ±15 % (mediana 0.250, p75=0.367, p95=0.836). El criterio CP-01c (≥85/100) **NO se alcanza** con calibración+turn penalty solas — el 68 % de la dispersión sigue atribuida a snap-to-node (predicho por ADR-0011 §Diagnóstico).
- [ADR-0013](docs/architecture/decisions/0013-cp01c-criterio-calibrado.md) actualizado con sección "Resultado de ejecución H4-cal-eval"; sigue `status: proposed` (criterio numérico no alcanzado).
- [ADR-0020](docs/architecture/decisions/0020-cp01c-parcial-snap-to-edge-necesario.md) nuevo, `accepted`: congela el resultado parcial, explica por qué snap-to-edge (H5 Ruta A) es **bloqueante** para promover ADR-0013 a accepted, planifica las 3 sub-tareas H5-cal-3a/b/c con esfuerzo estimado 6-8 h.
- Test marcado `@pytest.mark.xfail(strict=True)` con razón que apunta a ADR-0020. Cuando H5-cal-3 entregue, quitar `xfail` y promover ADR-0013 a `accepted` en el mismo PR.
- 10 UT del A* calibrado experimental en `tests/unit/domain/routing/test_a_estrella_calibrado.py` (bearing/delta-bearing + 6 tests del algoritmo: ruta recta sin penalty, giro 90° aplica penalty, origen=destino, sin ruta, factor_hora inválido, turn_penalty=0 equivale al A* simple).

### Changed — H4 fase 5
- `docs/quality/trazabilidad.md`: nueva fila para **CP-01c 🟡 H5** con referencias a ADR-0013 y ADR-0020.

### Added — H4 fase 4: spike performance CP-12 + ADR-0019 (RN-05 / CP-12, 2026-05-21)
- Nuevo script reproducible [`tools/spike_cp12_performance.py`](tools/spike_cp12_performance.py): genera 50 unidades sintéticas (30 Avanzada / 20 Básica) distribuidas en grilla regular sobre la bbox conurbación La Serena-Coquimbo, 1 incidente Echo en el centro, carga del grafo `coquimbo.graphml` excluida del wall-clock, 10 corridas warm-cache, reporta p50/p95/max/media + JSON crudo en `tools/_out/spike_cp12_resultado.json`.
- **Resultado del spike (corrida 2026-05-21)**: p50 = 1884.6 ms, p95 = 1941.6 ms, max = 1975.1 ms, media = 1895.8 ms. El criterio SRS (≤ 1000 ms) **no se cumple** con A* secuencial; cada A* sobre ~16 K nodos toma ~37 ms × 50 unidades.
Expand Down
29 changes: 28 additions & 1 deletion core-python/src/sentinel_dispatch/adapters/grafo_osmnx.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ def cargar_grafo_iv_region(
bbox: tuple[float, float, float, float] = BBOX_IV_REGION,
ruta_cache: Path = GRAPHML_PATH,
forzar_descarga: bool = False,
factor_calibracion: float = 1.0,
) -> nx.MultiDiGraph:
"""Carga el grafo vial de la conurbación La Serena-Coquimbo, con caché local.

Expand All @@ -110,15 +111,27 @@ def cargar_grafo_iv_region(
Path al archivo ``.graphml`` de caché. Por defecto :data:`GRAPHML_PATH`.
forzar_descarga:
Si ``True``, ignora la caché existente y re-descarga.
factor_calibracion:
Multiplicador aplicado al ``speed_kph`` de cada arista tras la carga
(ADR-0013 §H4-cal-1). Default ``1.0`` (sin cambio). Usar ``0.85``
para acercar las velocidades efectivas al perfil ``car.lua`` de OSRM
(CP-01c). NO se persiste a disco — la calibración vive solo en memoria
para no contaminar el grafo cacheado ni la paridad RT-02.

Retorna
-------
nx.MultiDiGraph
Grafo vial con atributo ``speed_kph`` en todas las aristas.
Grafo vial con atributo ``speed_kph`` en todas las aristas, opcionalmente
escalado por ``factor_calibracion``.
"""
if factor_calibracion <= 0:
raise ValueError(f"factor_calibracion debe ser > 0, recibido: {factor_calibracion}")

if ruta_cache.exists() and not forzar_descarga:
_log.info("Cargando grafo desde caché: %s", ruta_cache)
grafo: nx.MultiDiGraph = ox.load_graphml(ruta_cache)
if factor_calibracion != 1.0:
_aplicar_factor_calibracion(grafo, factor_calibracion)
return grafo

_log.info("Descargando grafo vial desde OSM (bbox=%s)…", bbox)
Expand All @@ -136,9 +149,23 @@ def cargar_grafo_iv_region(
ruta_cache.parent.mkdir(parents=True, exist_ok=True)
ox.save_graphml(grafo, filepath=ruta_cache)
_log.info("Grafo persistido en: %s", ruta_cache)

if factor_calibracion != 1.0:
_aplicar_factor_calibracion(grafo, factor_calibracion)
return grafo


def _aplicar_factor_calibracion(grafo: nx.MultiDiGraph, factor: float) -> None:
"""Escala el ``speed_kph`` de cada arista por ``factor`` (in-place).

Solo afecta el grafo en memoria; el archivo GraphML cacheado en disco no
se modifica. ADR-0013 §H4-cal-1.
"""
for _u, _v, data in grafo.edges(data=True):
speed = data.get("speed_kph", MAXSPEED_FALLBACK_KMH)
data["speed_kph"] = float(speed) * factor


# ---------------------------------------------------------------------------
# Adapter — implementación del puerto GrafoVial
# ---------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
"""A* experimental con turn penalty simple (ADR-0013 §H4-cal-2).

Variante calibrada del :func:`a_estrella` que añade un costo aditivo
``turn_penalty_s`` cuando el cambio de bearing entre la arista entrante y
la arista saliente supera ``bearing_umbral_grados``. Modela
groseramente el comportamiento de OSRM `car.lua`, que penaliza giros
fuertes en intersecciones.

**Aislamiento de producción**: esta función NO reemplaza a
:func:`a_estrella` en el orquestador. Vive como módulo separado para
preservar la paridad bit-exacta Java↔Python de RT-02 (ADR-0017). Se
invoca exclusivamente desde el test de calibración CP-01c
(``test_routing_vs_osrm.py::test_cp01c_calibracion_y_turn_penalty``).

Si futura V2 incorpora turn penalty al A* operativo, este módulo se
vuelve obsoleto y se reescribe el A* principal (decisión costosa de
revertir; ameritaría ADR nuevo).
"""

from __future__ import annotations

import heapq
import math
from typing import TYPE_CHECKING

from sentinel_dispatch.domain.routing.heuristica import haversine_segundos
from sentinel_dispatch.domain.routing.tipos import NodoId, NoRutaDisponibleError

if TYPE_CHECKING:
from sentinel_dispatch.domain.routing.grafo_vial import GrafoVial


def _bearing_grados(lat1: float, lon1: float, lat2: float, lon2: float) -> float:
"""Bearing inicial entre dos coordenadas en grados decimales, [0, 360)."""
phi1 = math.radians(lat1)
phi2 = math.radians(lat2)
delta_lambda = math.radians(lon2 - lon1)
y = math.sin(delta_lambda) * math.cos(phi2)
x = math.cos(phi1) * math.sin(phi2) - math.sin(phi1) * math.cos(phi2) * math.cos(delta_lambda)
return (math.degrees(math.atan2(y, x)) + 360.0) % 360.0


def _delta_bearing(b1: float, b2: float) -> float:
"""Diferencia angular menor entre dos bearings (en grados, [0, 180])."""
diff = abs(b2 - b1) % 360.0
return min(diff, 360.0 - diff)


def a_estrella_calibrado(
grafo: GrafoVial,
origen: NodoId,
destino: NodoId,
*,
factor_hora: float = 1.0,
factor_sirena: float = 1.0,
turn_penalty_s: float = 2.0,
bearing_umbral_grados: float = 30.0,
) -> tuple[float, list[NodoId]]:
"""A* con penalización aditiva por giros pronunciados.

Estado extendido: ``(nodo, nodo_previo)``. La arista entrante se
representa implícitamente por el nodo previo (suficiente para grafos
sin múltiples aristas paralelas con bearings distintos — en práctica
las pocas aristas paralelas de OSMnx comparten dirección general).

Penalty: si ``|bearing(prev→actual) − bearing(actual→vecino)| > umbral``
se suma ``turn_penalty_s`` al g_score del vecino. Para el primer paso
(sin nodo previo) no aplica penalty.

Args:
grafo: instancia del port :class:`GrafoVial` (solo lectura).
origen, destino: NodoIds.
factor_hora, factor_sirena: idénticos al A* original.
turn_penalty_s: segundos a sumar por giro pronunciado (default 2.0,
valor citado en ADR-0013).
bearing_umbral_grados: umbral de Δbearing para considerar "giro
pronunciado" (default 30°, valor citado en ADR-0013).

Returns:
Tupla ``(eta_segundos, ruta_de_nodos)``.

Raises:
NoRutaDisponibleError: si no existe camino.
ValueError: si los factores son ≤ 0.
"""
if factor_hora <= 0:
raise ValueError(f"factor_hora debe ser > 0, recibido: {factor_hora}")
if factor_sirena <= 0:
raise ValueError(f"factor_sirena debe ser > 0, recibido: {factor_sirena}")
if turn_penalty_s < 0:
raise ValueError(f"turn_penalty_s debe ser >= 0, recibido: {turn_penalty_s}")

if origen == destino:
return (0.0, [origen])

lat_destino, lon_destino = grafo.coordenadas(destino)

# Estado del open-set: (nodo_actual, nodo_previo). Para el origen, previo=None.
g_score: dict[tuple[NodoId, NodoId | None], float] = {(origen, None): 0.0}
padre: dict[tuple[NodoId, NodoId | None], tuple[NodoId, NodoId | None]] = {}

contador: int = 0
lat_origen, lon_origen = grafo.coordenadas(origen)
h_origen = haversine_segundos(lat_origen, lon_origen, lat_destino, lon_destino)
heap: list[tuple[float, int, NodoId, NodoId | None]] = [(h_origen, contador, origen, None)]

while heap:
_, _, nodo_actual, nodo_prev = heapq.heappop(heap)
estado_actual: tuple[NodoId, NodoId | None] = (nodo_actual, nodo_prev)
g_actual = g_score.get(estado_actual, math.inf)

if nodo_actual == destino:
return (g_actual, _reconstruir_ruta(padre, estado_actual, origen))

# Pre-calcula bearing de la arista entrante (si existe nodo previo)
bearing_in: float | None = None
if nodo_prev is not None:
lat_prev, lon_prev = grafo.coordenadas(nodo_prev)
lat_act, lon_act = grafo.coordenadas(nodo_actual)
bearing_in = _bearing_grados(lat_prev, lon_prev, lat_act, lon_act)

for arista in grafo.vecinos(nodo_actual):
velocidad_ms = arista.velocidad_efectiva_kmh * 1000.0 / 3600.0
peso = arista.longitud_m / (velocidad_ms * factor_hora * factor_sirena)

penalty = 0.0
if bearing_in is not None:
lat_act, lon_act = grafo.coordenadas(nodo_actual)
lat_vec, lon_vec = grafo.coordenadas(arista.destino)
bearing_out = _bearing_grados(lat_act, lon_act, lat_vec, lon_vec)
if _delta_bearing(bearing_in, bearing_out) > bearing_umbral_grados:
penalty = turn_penalty_s

g_tentativo = g_actual + peso + penalty
estado_vecino: tuple[NodoId, NodoId | None] = (arista.destino, nodo_actual)

if g_tentativo < g_score.get(estado_vecino, math.inf):
g_score[estado_vecino] = g_tentativo
padre[estado_vecino] = estado_actual
lat_vec, lon_vec = grafo.coordenadas(arista.destino)
h_vec = haversine_segundos(lat_vec, lon_vec, lat_destino, lon_destino)
contador += 1
heapq.heappush(heap, (g_tentativo + h_vec, contador, arista.destino, nodo_actual))

raise NoRutaDisponibleError(f"sin ruta entre {origen} y {destino}")


def _reconstruir_ruta(
padre: dict[tuple[NodoId, NodoId | None], tuple[NodoId, NodoId | None]],
estado_final: tuple[NodoId, NodoId | None],
origen: NodoId,
) -> list[NodoId]:
"""Reconstruye la ruta de nodos desde el estado final hasta el origen."""
ruta: list[NodoId] = []
estado: tuple[NodoId, NodoId | None] | None = estado_final
while estado is not None and estado[0] != origen:
ruta.append(estado[0])
estado = padre.get(estado)
ruta.append(origen)
ruta.reverse()
return ruta
101 changes: 101 additions & 0 deletions core-python/tests/integration/test_routing_vs_osrm.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
cargar_grafo_iv_region,
)
from sentinel_dispatch.domain.routing.a_estrella import a_estrella
from sentinel_dispatch.domain.routing.a_estrella_calibrado import a_estrella_calibrado

_log = logging.getLogger(__name__)

Expand All @@ -42,6 +43,15 @@
MINIMO_DENTRO: int = 75
"""Cantidad mínima de pares dentro de tolerancia exigida por CP-01a."""

TOLERANCIA_DURACION_CP01C: float = 0.15
"""Tolerancia relativa de duration tras calibración (CP-01c, ADR-0013)."""

MINIMO_DENTRO_CP01C: int = 85
"""Cantidad mínima de pares dentro de tolerancia exigida por CP-01c."""

FACTOR_CALIBRACION: float = 0.85
"""Factor a aplicar al speed cascade para acercarlo al perfil OSRM (ADR-0013)."""

FIXTURE_PATH: Path = Path(__file__).resolve().parents[1] / "fixtures" / "osrm_oracle.json"


Expand Down Expand Up @@ -186,3 +196,94 @@ def test_a_estrella_vs_osrm_paridad_distancia(
f"{TOLERANCIA_DISTANCIA}, mínimo exigido: {MINIMO_DENTRO}. "
f"Distribución observada: {_resumen_distribucion(errores_distancia)}"
)


@pytest.fixture(scope="module")
def adapter_calibrado() -> OsmnxGrafoVial:
"""Carga el grafo Coquimbo con `factor_calibracion=0.85` (ADR-0013)."""
if not GRAPHML_PATH.exists():
pytest.skip(f"GraphML ausente: {GRAPHML_PATH}. Generar con: make build-graph")
grafo = cargar_grafo_iv_region(
ruta_cache=GRAPHML_PATH,
forzar_descarga=False,
factor_calibracion=FACTOR_CALIBRACION,
)
return OsmnxGrafoVial(grafo=grafo)


@pytest.mark.xfail(
reason=(
"CP-01c no alcanzable solo con calibración+turn penalty. "
"Spike H4-cal-eval (2026-05-21) midió 27/100 dentro de ±15% "
"(mediana 0.250). Snap-to-edge (H5, ADR-0016 Ruta A) es necesario "
"para llegar a ≥85/100. Documentado en ADR-0020."
),
strict=True,
)
def test_cp01c_calibracion_y_turn_penalty(
fixture_osrm: dict[str, object],
adapter_calibrado: OsmnxGrafoVial,
) -> None:
"""CP-01c (ADR-0013): ≥85/100 pares con |Δ_duration|/t_OSRM ≤ 0.15.

Aplica las dos mejoras de calibración del ADR-0013:
- speed cascade × 0.85 (vía `cargar_grafo_iv_region(factor_calibracion=0.85)`).
- turn penalty 2.0 s por giro >30° (vía `a_estrella_calibrado`).

No invalida CP-01a: este test reporta duration usando un A* experimental
SEPARADO del A* operativo. La paridad bit-exacta con Java (RT-02) sigue
intacta.

**Estado actual (2026-05-21)**: `xfail` esperado hasta que snap-to-edge
(H5 Ruta A) reduzca los outliers atribuidos a snap (~68 % de la dispersión
según ADR-0011 §Diagnóstico). Ver ADR-0020.
"""
pares = fixture_osrm["pares"]
assert isinstance(pares, list)
assert len(pares) == 100

errores_duracion: list[float] = []
for par in pares:
assert isinstance(par, dict)
origen_coord = par["origen"]
destino_coord = par["destino"]
assert isinstance(origen_coord, dict)
assert isinstance(destino_coord, dict)
t_osrm = float(par["duration_s"])

nodo_origen = adapter_calibrado.nodo_mas_cercano(
float(origen_coord["lat"]), float(origen_coord["lon"])
)
nodo_destino = adapter_calibrado.nodo_mas_cercano(
float(destino_coord["lat"]), float(destino_coord["lon"])
)

t_propio, _ruta = a_estrella_calibrado(
adapter_calibrado,
nodo_origen,
nodo_destino,
factor_hora=1.0,
factor_sirena=1.0,
)

if t_osrm > 0.0:
errores_duracion.append(abs(t_propio - t_osrm) / t_osrm)

dentro = sum(1 for e in errores_duracion if e <= TOLERANCIA_DURACION_CP01C)

_log.info(
"CP-01c (calibración + turn penalty): %s",
_resumen_distribucion(errores_duracion),
)
_log.info(
"Veredicto CP-01c: dentro=%d/100 (mínimo %d con tol ±%.0f%%)",
dentro,
MINIMO_DENTRO_CP01C,
TOLERANCIA_DURACION_CP01C * 100,
)

assert dentro >= MINIMO_DENTRO_CP01C, (
f"CP-01c falla: solo {dentro}/100 pares con |Δ_duration|/t_OSRM ≤ "
f"{TOLERANCIA_DURACION_CP01C}, mínimo exigido: {MINIMO_DENTRO_CP01C}. "
f"Distribución observada: {_resumen_distribucion(errores_duracion)}"
)
Loading
Loading