Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions docs/source/api/methods.rst
Original file line number Diff line number Diff line change
Expand Up @@ -67,3 +67,11 @@ Utilities
:members:
:no-private-members:
:no-special-members:

Dimensionless Numbers
^^^^^^^^^^^^^^^^^^^^^

.. autoapimodule:: dimensionless_number_calculator
:members:
:no-private-members:
:no-special-members:
Comment thread
chraibi marked this conversation as resolved.
86 changes: 86 additions & 0 deletions notebooks/user_guide.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -3352,6 +3352,92 @@
":::"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Dimensionless Numbers: Intrusion and Avoidance\n",
"\n",
"The dimensionless Intrusion number $\\mathcal{I}n$ and Avoidance number $\\mathcal{A}v$\n",
"classify crowd flow regimes based on personal space intrusions and collision anticipation\n",
"([Cordes et al., *PNAS Nexus*, 2024](https://doi.org/10.1093/pnasnexus/pgae120)).\n",
"\n",
"The **Intrusion number** quantifies encroachment on personal space:\n",
"\n",
"$$\n",
"\\mathcal{I}n_i = \\sum_{j \\in \\mathcal{N}_i}\n",
"\\left(\\frac{r_{\\text{soc}} - \\ell_{\\text{min}}}{r_{ij} - \\ell_{\\text{min}}}\\right)^2\n",
"$$\n",
"\n",
"The **Avoidance number** quantifies the imminence of collisions via the time-to-collision (TTC):\n",
"\n",
"$$\n",
"\\mathcal{A}v_i = \\max_{j \\in \\mathcal{N}'_i}\n",
"\\frac{\\tau_0}{\\tau_{ij}}\n",
"$$\n",
"\n",
"where $r_{\\text{soc}} = 0.8\\,$m is the social radius, $\\ell_{\\text{min}} = 0.2\\,$m the pedestrian diameter,\n",
"and $\\tau_0 = 3\\,$s a reference timescale."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from pedpy import IntrusionMethod, compute_avoidance, compute_intrusion\n",
"\n",
"intrusion = compute_intrusion(traj_data=traj)\n",
"intrusion.head()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"avoidance = compute_avoidance(traj_data=traj, frame_step=5, tau_0=3.0, radius=0.2)\n",
"avoidance.head()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Temporal evolution\n",
"\n",
"We can visualize how these numbers evolve over time by averaging over all pedestrians per frame:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import matplotlib.pyplot as plt\n",
"\n",
"mean_in = intrusion.groupby(\"frame\")[\"intrusion\"].mean()\n",
"mean_av = avoidance.groupby(\"frame\")[\"avoidance\"].mean()\n",
"\n",
"fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(12, 4))\n",
"\n",
"ax1.plot(mean_in.index / traj.frame_rate, mean_in.values)\n",
"ax1.set_xlabel(\"Time / s\")\n",
"ax1.set_ylabel(r\"$\\langle \\mathcal{I}n \\rangle$\")\n",
"ax1.set_title(\"Mean Intrusion Number\")\n",
"\n",
"ax2.plot(mean_av.index / traj.frame_rate, mean_av.values)\n",
"ax2.set_xlabel(\"Time / s\")\n",
"ax2.set_ylabel(r\"$\\langle \\mathcal{A}v \\rangle$\")\n",
"ax2.set_title(\"Mean Avoidance Number\")\n",
"\n",
"fig.tight_layout()\n",
"plt.show()"
]
},
{
"cell_type": "markdown",
"metadata": {},
Expand Down
14 changes: 14 additions & 0 deletions pedpy/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

from .column_identifier import (
ACC_COL,
AVOIDANCE_COL,
A_X_COL,
A_Y_COL,
COUNT_COL,
Expand All @@ -25,6 +26,7 @@
FRAME_COL,
ID_COL,
INTERSECTION_COL,
INTRUSION_COL,
LAST_FRAME_COL,
MEAN_SPEED_COL,
MID_FRAME_COL,
Expand Down Expand Up @@ -86,6 +88,11 @@
compute_passing_density,
compute_voronoi_density,
)
from .methods.dimensionless_number_calculator import (
IntrusionMethod,
compute_avoidance,
compute_intrusion,
)
from .methods.flow_calculator import (
compute_flow,
compute_line_flow,
Expand All @@ -96,6 +103,7 @@
Cutoff,
SpeedCalculation,
compute_frame_range_in_area,
compute_individual_distances,
compute_individual_voronoi_polygons,
compute_intersecting_polygons,
compute_neighbor_distance,
Expand Down Expand Up @@ -188,6 +196,7 @@
"compute_frame_range_in_area",
"compute_individual_voronoi_polygons",
"compute_intersecting_polygons",
"compute_individual_distances",
"compute_neighbors",
"compute_neighbor_distance",
"compute_time_distance_line",
Expand Down Expand Up @@ -217,6 +226,9 @@
"compute_mean_acceleration_per_frame",
"compute_voronoi_acceleration",
"correct_invalid_trajectories",
"compute_intrusion",
"IntrusionMethod",
"compute_avoidance",
"PEDPY_BLUE",
"PEDPY_GREEN",
"PEDPY_GREY",
Expand Down Expand Up @@ -271,6 +283,8 @@
"MID_POSITION_COL",
"END_POSITION_COL",
"WINDOW_SIZE_COL",
"INTRUSION_COL",
"AVOIDANCE_COL",
"__version__",
"AccelerationError",
"GeometryError",
Expand Down
3 changes: 3 additions & 0 deletions pedpy/column_identifier.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,3 +44,6 @@
SPEED_SP2_COL: Final = "s_sp-1"
FLOW_SP1_COL: Final = "j_sp+1"
FLOW_SP2_COL: Final = "j_sp-1"

INTRUSION_COL: Final = "intrusion"
AVOIDANCE_COL: Final = "avoidance"
163 changes: 163 additions & 0 deletions pedpy/methods/dimensionless_number_calculator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
"""Dimensionless numbers for pedestrian crowd classification.

Implements the Intrusion number and Avoidance number defined in
Cordes et al., PNAS Nexus 2024 (https://doi.org/10.1093/pnasnexus/pgae120).
"""

from enum import Enum

import numpy as np
import pandas as pd
import shapely

from pedpy.column_identifier import (
AVOIDANCE_COL,
FRAME_COL,
ID_COL,
INTRUSION_COL,
)
from pedpy.data.trajectory_data import TrajectoryData
from pedpy.methods.method_utils import (
SpeedCalculation,
compute_individual_distances,
)
Comment thread
chraibi marked this conversation as resolved.
from pedpy.methods.speed_calculator import compute_individual_speed


class IntrusionMethod(Enum): # pylint: disable=too-few-public-methods
"""Identifier of method to compute the intrusion."""

MAX = "max"
"""Use the max value in neighborhood as intrusion."""
SUM = "sum"
"""Use the sum of all values in neighborhood as intrusion."""


def compute_intrusion(
*,
traj_data: TrajectoryData,
r_soc: float = 0.8,
l_min: float = 0.2,
method: IntrusionMethod = IntrusionMethod.SUM,
) -> pd.DataFrame:
r"""Compute the intrusion number for each pedestrian per frame.

The intrusion variable :math:`\mathcal{I}n_i` quantifies how much
other agents encroach on pedestrian *i*'s personal space
(Cordes et al. 2024, Eq. 1):

.. math::

\mathcal{I}n_i = \sum_{j \in \mathcal{N}_i}
\left(\frac{r_\text{soc} - \ell_\text{min}}
{r_{ij} - \ell_\text{min}}\right)^{k_I},

with :math:`k_I = 2`. The neighbor set :math:`\mathcal{N}_i` contains
all agents *j* with :math:`r_{ij} \leq 3\,r_\text{soc}`.

Args:
traj_data (TrajectoryData): trajectory data to analyze
r_soc (float): social radius in m (default 0.8)
l_min (float): pedestrian diameter in m (default 0.2)
method (IntrusionMethod): aggregation over neighbors,
``SUM`` (default, as in the paper) or ``MAX``

Returns:
DataFrame with columns 'id', 'frame', and 'intrusion'.
Agents with no neighbors within the cutoff (or only at
exactly ``l_min`` distance) are absent from the result.
"""
intrusion = compute_individual_distances(traj_data=traj_data)
intrusion = intrusion.loc[(intrusion.distance <= 3 * r_soc) & (intrusion.distance > l_min)].copy()
intrusion.loc[:, INTRUSION_COL] = ((r_soc - l_min) / (intrusion.distance - l_min)) ** 2
intrusion = (
intrusion.groupby(by=[ID_COL, FRAME_COL])
.agg(
intrusion=(INTRUSION_COL, method.value),
)
.reset_index()
)

return intrusion


def compute_avoidance(
*,
traj_data: TrajectoryData,
frame_step: int,
radius: float = 0.4,
tau_0: float,
) -> pd.DataFrame:
r"""Compute the avoidance number for each pedestrian per frame.

The avoidance variable :math:`\mathcal{A}v_i` quantifies the
imminence of collisions that pedestrian *i* faces, based on
the time-to-collision (TTC) with neighbors
(Cordes et al. 2024, Eq. 2):

.. math::

\mathcal{A}v_i = \sum_{j \in \mathcal{N}'_i}
\left(\frac{\tau_0}{\tau_{ij}}\right)^{k_A},

with :math:`k_A = 1`. The neighbor set :math:`\mathcal{N}'_i` is
restricted to the agent with the shortest TTC :math:`\tau_{ij}`,
implemented by taking the ``max`` over :math:`\tau_0 / \tau_{ij}`.

Args:
traj_data (TrajectoryData): trajectory data to analyze
frame_step (int): number of frames used for velocity computation
radius (float): disc diameter :math:`\ell_\text{soc}` for TTC
computation in m (default 0.4, as in the paper)
tau_0 (float): reference timescale in s (paper uses 3.0)

Returns:
DataFrame with columns 'id', 'frame', and 'avoidance'
"""
velocity = compute_individual_speed(
traj_data=traj_data,
frame_step=frame_step,
compute_velocity=True,
speed_calculation=SpeedCalculation.BORDER_SINGLE_SIDED,
)

data = traj_data.data.merge(velocity, on=[ID_COL, FRAME_COL])
data["velocity"] = shapely.points(data.v_x, data.v_y)

matrix = data.merge(data, how="inner", on=FRAME_COL, suffixes=("", "_neighbor"))
matrix = matrix[matrix[ID_COL] != matrix[f"{ID_COL}_neighbor"]]

pos = shapely.get_coordinates(matrix.point)
pos_neighbor = shapely.get_coordinates(matrix.point_neighbor)
distance = np.linalg.norm(pos - pos_neighbor, axis=1)

delta_v = shapely.get_coordinates(matrix.velocity) - shapely.get_coordinates(matrix.velocity_neighbor)
delta_v_norm = np.linalg.norm(delta_v, axis=1)

# only compute for pairs with nonzero distance and nonzero relative velocity
computable = (distance > 0) & (delta_v_norm > 0)

ttc = np.full(matrix.shape[0], np.inf)

if np.any(computable):
d_c = distance[computable]
dv_c = delta_v[computable]
dv_norm_c = delta_v_norm[computable]

e_v = (pos[computable] - pos_neighbor[computable]) / d_c[:, np.newaxis]
v_rel_hat = dv_c / dv_norm_c[:, np.newaxis]
cos_alpha = np.sum(e_v * v_rel_hat, axis=1)

capital_a = (cos_alpha**2 - 1) * d_c**2 + radius**2
sqrt_a_safe = np.sqrt(np.maximum(capital_a, 0.0))

valid = (capital_a >= 0) & (-cos_alpha * d_c - sqrt_a_safe >= 0)

idx = np.where(computable)[0][valid]
ttc[idx] = (-cos_alpha[valid] * d_c[valid] - sqrt_a_safe[valid]) / dv_norm_c[valid]

matrix = matrix.copy()
matrix[AVOIDANCE_COL] = tau_0 / ttc

avoidance = matrix.groupby(by=[ID_COL, FRAME_COL], as_index=False).agg(avoidance=(AVOIDANCE_COL, "max"))
return avoidance
32 changes: 32 additions & 0 deletions pedpy/methods/method_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -1295,3 +1295,35 @@ def is_individual_speed_valid(
return DataValidationStatus.ENTRY_MISSING

return DataValidationStatus.DATA_CORRECT


def compute_individual_distances(*, traj_data: TrajectoryData) -> pd.DataFrame:
"""Compute pairwise distances between all pedestrians per frame.

Args:
traj_data: trajectory data

Returns:
DataFrame with columns 'id', 'frame', 'neighbor_id', and 'distance'
"""
neighbor_point_col = f"{POINT_COL}_{NEIGHBOR_ID_COL}"
points = traj_data.data[[ID_COL, FRAME_COL, POINT_COL]]
neighbor_points = points.rename(
columns={
ID_COL: NEIGHBOR_ID_COL,
POINT_COL: neighbor_point_col,
}
)
matrix = points.merge(
neighbor_points,
how="inner",
on=FRAME_COL,
)
matrix = matrix[matrix[ID_COL] != matrix[NEIGHBOR_ID_COL]]
matrix[DISTANCE_COL] = np.linalg.norm(
shapely.get_coordinates(matrix[POINT_COL]) - shapely.get_coordinates(matrix[neighbor_point_col]),
axis=1,
)

matrix = matrix.drop(columns=[POINT_COL, neighbor_point_col])
return matrix.reset_index(drop=True)
Comment on lines +1300 to +1329
Copy link

Copilot AI Apr 9, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

compute_individual_distances builds a full Cartesian product of pedestrians per frame via merge, which is O(n²) rows and can become prohibitively slow/memory-intensive for typical crowd sizes. Consider adding an optional max-distance/cutoff parameter (or a separate compute_individual_distances_within_radius) and using a spatial index (e.g., Shapely STRtree.query/query_nearest) to only materialize pairs that can actually interact, rather than all pairs.

Suggested change
def compute_individual_distances(*, traj_data: TrajectoryData) -> pd.DataFrame:
"""Compute pairwise distances between all pedestrians per frame.
Args:
traj_data: trajectory data
Returns:
DataFrame with columns 'id', 'frame', 'neighbor_id', and 'distance'
"""
neighbor_point_col = f"{POINT_COL}_{NEIGHBOR_ID_COL}"
points = traj_data.data[[ID_COL, FRAME_COL, POINT_COL]]
neighbor_points = points.rename(
columns={
ID_COL: NEIGHBOR_ID_COL,
POINT_COL: neighbor_point_col,
}
)
matrix = points.merge(
neighbor_points,
how="inner",
on=FRAME_COL,
)
matrix = matrix[matrix[ID_COL] != matrix[NEIGHBOR_ID_COL]]
matrix[DISTANCE_COL] = np.linalg.norm(
shapely.get_coordinates(matrix[POINT_COL]) - shapely.get_coordinates(matrix[neighbor_point_col]),
axis=1,
)
matrix = matrix.drop(columns=[POINT_COL, neighbor_point_col])
return matrix.reset_index(drop=True)
def compute_individual_distances(
*,
traj_data: TrajectoryData,
max_distance: Optional[float] = None,
) -> pd.DataFrame:
"""Compute pairwise distances between pedestrians per frame.
Args:
traj_data: trajectory data
max_distance: optional cutoff radius. If provided, only pairs whose
distance is smaller than or equal to this value are returned.
Returns:
DataFrame with columns 'id', 'frame', 'neighbor_id', and 'distance'
"""
if max_distance is not None and max_distance < 0:
raise PedPyValueError("max_distance needs to be greater than or equal to 0.")
records = []
points = traj_data.data[[ID_COL, FRAME_COL, POINT_COL]]
for frame, frame_points in points.groupby(FRAME_COL, sort=False):
frame_points = frame_points.reset_index(drop=True)
ids = frame_points[ID_COL].to_numpy()
geometries = frame_points[POINT_COL].to_list()
coordinates = shapely.get_coordinates(geometries)
if len(frame_points) < 2:
continue
if max_distance is None:
for src_idx, dst_idx in itertools.permutations(range(len(frame_points)), 2):
distance = float(
np.linalg.norm(coordinates[src_idx] - coordinates[dst_idx])
)
records.append(
{
ID_COL: ids[src_idx],
FRAME_COL: frame,
NEIGHBOR_ID_COL: ids[dst_idx],
DISTANCE_COL: distance,
}
)
continue
tree = shapely.STRtree(geometries)
geometry_to_index = {id(geometry): idx for idx, geometry in enumerate(geometries)}
for src_idx, geometry in enumerate(geometries):
candidate_indices = {
geometry_to_index[id(candidate)]
for candidate in tree.query(geometry.buffer(max_distance))
}
candidate_indices.discard(src_idx)
for dst_idx in candidate_indices:
distance = float(
np.linalg.norm(coordinates[src_idx] - coordinates[dst_idx])
)
if distance <= max_distance:
records.append(
{
ID_COL: ids[src_idx],
FRAME_COL: frame,
NEIGHBOR_ID_COL: ids[dst_idx],
DISTANCE_COL: distance,
}
)
return pd.DataFrame.from_records(
records,
columns=[ID_COL, FRAME_COL, NEIGHBOR_ID_COL, DISTANCE_COL],
).reset_index(drop=True)

Copilot uses AI. Check for mistakes.
Loading