Skip to content
3 changes: 3 additions & 0 deletions examples/quickstart-fr.py.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,3 +38,6 @@
# You can also plot the flows, with labels for the cities that are bigger than their neighbours
labels = pop_trips.get_prominent_cities()
pop_trips.plot_od_flows(labels=labels)

# You can get a report of the parameters used in the model (currently only a subset of them; full coverage is planned)
report = pop_trips.parameters_dataframe()
2 changes: 1 addition & 1 deletion mobility/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
from mobility.transport_modes.public_transport import PublicTransportRoutingParameters

from .generalized_cost_parameters import GeneralizedCostParameters
from .transport_modes.carpool.detailed.detailed_carpool_generalized_cost_parameters import DetailedCarpoolGeneralizedCostParameters
from .transport_modes.carpool.detailed.detailed_carpool_generalized_cost import DetailedCarpoolGeneralizedCostParameters

from .cost_of_time_parameters import CostOfTimeParameters

Expand Down
293 changes: 292 additions & 1 deletion mobility/asset.py
Comment thread
FlxPo marked this conversation as resolved.
Original file line number Diff line number Diff line change
@@ -1,14 +1,18 @@
import json
import hashlib
import pathlib
from typing import Any, TypeVar

import geopandas as gpd
import pandas as pd

from abc import ABC, abstractmethod
from dataclasses import is_dataclass, fields
from pandas.util import hash_pandas_object
from pydantic import BaseModel

P = TypeVar("P", bound=BaseModel)

class Asset(ABC):
"""
Abstract base class representing an Asset, with functionality for cache validation
Expand Down Expand Up @@ -89,7 +93,294 @@ def serialize(value):
serialized_inputs = json.dumps(hashable_inputs, sort_keys=True).encode('utf-8')

return hashlib.md5(serialized_inputs).hexdigest()

@staticmethod
def prepare_parameters(
parameters: P | None,
parameters_cls: type[P],
explicit_args: dict[str, Any],
required_fields: list[str] | None = None,
owner_name: str = "Asset",
) -> P:
"""Normalize constructor inputs into a pydantic parameters object.

Args:
parameters: Pre-built parameters instance, if provided by the caller.
parameters_cls: Pydantic model class used to build parameters when
``parameters`` is not provided.
explicit_args: Explicit constructor arguments that can be mapped to
``parameters_cls`` fields.
required_fields: Names of explicit arguments that must be provided
when ``parameters`` is not passed.
owner_name: Human-readable owner name used in error messages.

Returns:
A normalized pydantic parameters instance.

Raises:
ValueError: If required explicit arguments are missing when
``parameters`` is not provided.
"""
explicit_provided = {k: v for k, v in explicit_args.items() if v is not None}

if parameters is not None:
if not explicit_provided:
return parameters

merged = {**parameters.model_dump(mode="python"), **explicit_provided}
validated = parameters_cls.model_validate(merged)

# Preserve provenance: explicit kwargs and fields explicitly set on
# the provided parameters model are marked as explicit.
provided_fields = set(parameters.model_fields_set) | set(explicit_provided.keys())

return parameters_cls.model_construct(
_fields_set=provided_fields,
**validated.model_dump(mode="python"),
)

required_fields = required_fields or []
missing = [field for field in required_fields if explicit_args.get(field) is None]
if missing:
fields_str = ", ".join(f"`{field}`" for field in missing)
raise ValueError(
f"{owner_name}: missing required explicit argument(s) {fields_str} when `parameters` is not provided."
)

return parameters_cls(**explicit_provided)

def list_parameters(self, recursive: bool = True) -> list[dict[str, Any]]:
    """Collect parameter rows from pydantic models found in asset inputs.

    Args:
        recursive: If ``True``, traverse the full upstream asset DAG.
            Otherwise, only inspect direct inputs of ``self``.

    Returns:
        A flat list of dictionaries, one per model field, including field
        values, defaults, metadata, constraints, and path information.
    """

    rows: list[dict[str, Any]] = []
    visited_assets: set[int] = set()
    # Assets can be reachable through several input paths; remember them all
    # so each row can report every path it is visible under.
    asset_paths_by_id: dict[int, set[str]] = {}

    def extract_constraints(field_schema: dict[str, Any]) -> dict[str, Any]:
        # JSON-schema keywords worth surfacing as human-readable constraints.
        keys = [
            "minimum",
            "maximum",
            "exclusiveMinimum",
            "exclusiveMaximum",
            "multipleOf",
            "enum",
            "pattern",
            "minLength",
            "maxLength",
            "minItems",
            "maxItems",
            "uniqueItems",
            "const",
            "format",
        ]
        return {k: field_schema[k] for k in keys if k in field_schema}

    def field_unit(field_info: Any) -> Any:
        # Pydantic allows json_schema_extra to be a dict, a callable, or
        # None; only a dict can carry a readable "unit" entry. Guarding here
        # avoids an AttributeError on callable extras.
        extra = field_info.json_schema_extra
        if isinstance(extra, dict):
            return extra.get("unit")
        return None

    def add_model_rows(model: BaseModel, asset: "Asset", asset_path: str, model_path: str) -> None:
        # Emit one row per field of the pydantic model.
        schema = model.model_json_schema()
        properties = schema.get("properties", {})
        model_name = model.__class__.__name__
        asset_type = asset.__class__.__name__

        for field_name, field_info in model.__class__.model_fields.items():
            field_schema = properties.get(field_name, {})

            # Required fields have no default (PydanticUndefined); report None.
            if field_info.is_required():
                default_value = None
            else:
                default_value = field_info.default

            # model_fields_set tracks fields explicitly provided by the caller.
            if field_name in model.model_fields_set:
                value_source = "explicit"
            elif field_info.is_required():
                value_source = "required"
            else:
                value_source = "default"

            rows.append(
                {
                    "_asset_id": id(asset),
                    "asset_path": asset_path,
                    "asset_type": asset_type,
                    "model_name": model_name,
                    "field_name": field_name,
                    "field_path": f"{model_path}.{field_name}",
                    "value": getattr(model, field_name),
                    "default": default_value,
                    "value_source": value_source,
                    "title": field_info.title,
                    "description": field_info.description,
                    "unit": field_unit(field_info),
                    "constraints": extract_constraints(field_schema),
                }
            )

    def scan_value(value: Any, owner_asset: "Asset", owner_asset_path: str, value_path: str) -> None:
        # Recursively search containers for pydantic models and nested assets.
        if isinstance(value, BaseModel):
            add_model_rows(value, owner_asset, owner_asset_path, value_path)
            return

        if isinstance(value, Asset):
            if recursive:
                walk_asset(value, value_path)
            return

        # Tuples are treated like lists so parameters stored in either
        # sequence type are discovered.
        if isinstance(value, (list, tuple)):
            for i, item in enumerate(value):
                scan_value(item, owner_asset, owner_asset_path, f"{value_path}[{i}]")
            return

        if isinstance(value, dict):
            for k, item in value.items():
                scan_value(item, owner_asset, owner_asset_path, f"{value_path}.{k}")
            return

    def walk_asset(asset: "Asset", asset_path: str) -> None:
        asset_id = id(asset)
        asset_paths_by_id.setdefault(asset_id, set()).add(asset_path)

        # Each asset's inputs are scanned once, even when reachable via
        # multiple paths through the DAG.
        if asset_id in visited_assets:
            return
        visited_assets.add(asset_id)

        for input_name, input_value in asset.inputs.items():
            scan_value(input_value, asset, asset_path, f"{asset_path}.{input_name}")

    walk_asset(self, self.__class__.__name__)

    # Post-process: attach every known path of the owning asset to each row,
    # rewriting the field path for each alternate asset path.
    for row in rows:
        paths = sorted(asset_paths_by_id.get(row["_asset_id"], {row["asset_path"]}))
        row["asset_paths"] = paths

        prefix = row["asset_path"] + "."
        if row["field_path"].startswith(prefix):
            suffix = row["field_path"][len(prefix):]
            row["field_paths"] = [f"{path}.{suffix}" for path in paths]
        else:
            row["field_paths"] = [row["field_path"]]

        del row["_asset_id"]

    return rows

def parameters_markdown(self, recursive: bool = True) -> str:
    """Render parameters as a human-readable Markdown report.

    Args:
        recursive: If ``True``, include parameters from upstream assets in
            the report.

    Returns:
        Markdown-formatted report text.
    """

    rows = self.list_parameters(recursive=recursive)
    if not rows:
        return "# Parameters Report\n\nNo pydantic parameters found."

    # Group rows per asset path up front; the summary and the per-asset
    # sections both iterate over this grouping.
    grouped: dict[str, list[dict[str, Any]]] = {}
    for row in rows:
        grouped.setdefault(row["asset_path"], []).append(row)

    asset_names = sorted(grouped)
    model_count = len({row["model_name"] for row in rows})

    out: list[str] = [
        "# Parameters Report",
        "",
        "## Summary",
        "",
        f"- Root asset: `{self.__class__.__name__}`",
        f"- Recursive scan: `{recursive}`",
        f"- Assets with parameters: `{len(asset_names)}`",
        f"- Parameter models found: `{model_count}`",
        f"- Total parameter fields: `{len(rows)}`",
        "",
    ]

    for asset_name in asset_names:
        asset_rows = grouped[asset_name]
        out += [
            f"## Asset `{asset_name}`",
            "",
            f"Asset type: `{asset_rows[0]['asset_type']}`",
            "",
        ]

        for model_name in sorted({r["model_name"] for r in asset_rows}):
            out += [f"### Model `{model_name}`", ""]

            selected = sorted(
                (r for r in asset_rows if r["model_name"] == model_name),
                key=lambda r: r["field_name"],
            )

            for r in selected:
                out.append(f"#### `{r['field_name']}`")
                out.append(f"- Path: `{r['field_path']}`")
                # Alternate paths are only worth printing when there is
                # actually more than one.
                if len(r["asset_paths"]) > 1:
                    out.append(f"- Alternate asset paths: `{r['asset_paths']}`")
                if len(r["field_paths"]) > 1:
                    out.append(f"- Alternate field paths: `{r['field_paths']}`")
                out.append(f"- Value: `{r['value']}`")
                out.append(f"- Default: `{r['default']}`")
                out.append(f"- Value source: `{r['value_source']}`")
                if r["unit"]:
                    out.append(f"- Unit: `{r['unit']}`")
                if r["constraints"]:
                    out.append(f"- Constraints: `{r['constraints']}`")
                if r["title"]:
                    out.append(f"- Title: {r['title']}")
                if r["description"]:
                    out.append(f"- Description: {r['description']}")
                out.append("")

    out.append("---")
    out.append(f"Generated from `{self.__class__.__name__}.parameters_markdown()`")

    return "\n".join(out)

def parameters_dataframe(self, recursive: bool = True) -> pd.DataFrame:
    """Return parameter rows as a pandas DataFrame.

    Args:
        recursive: If ``True``, include parameters from upstream assets.

    Returns:
        DataFrame containing one row per parameter field.
    """

    column_order = [
        "asset_path",
        "asset_type",
        "model_name",
        "field_name",
        "field_path",
        "asset_paths",
        "field_paths",
        "value",
        "default",
        "value_source",
        "title",
        "description",
        "unit",
        "constraints",
    ]

    frame = pd.DataFrame(self.list_parameters(recursive=recursive))

    # An empty scan still yields a frame carrying the expected columns so
    # downstream code can rely on the schema.
    if frame.empty:
        return pd.DataFrame(columns=column_order)
    return frame.reindex(columns=column_order)





2 changes: 1 addition & 1 deletion mobility/choice_models/destination_sequence_sampler.py
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ def get_destination_probability(self, utilities, motives, dest_prob_cutoff):
costs_bin = utilities[0]
cost_bin_to_dest = utilities[1]

motives_lambda = {motive.name: motive.radiation_lambda for motive in motives}
motives_lambda = {motive.name: motive.inputs["parameters"].radiation_lambda for motive in motives}

prob = (

Expand Down
10 changes: 5 additions & 5 deletions mobility/choice_models/evaluation/car_traffic_evaluation.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,15 @@ def get(
weekday: bool = True
):

car_mode = [m for m in self.results.modes if m.name == "car"]
car_mode = [m for m in self.results.modes if m.inputs["parameters"].name == "car"]

if len(car_mode) == 0:
raise ValueError("No car mode in the model.")

car_mode = car_mode[0]

freeflow_graph = self.build_graph_lines_dataframe(car_mode.travel_costs.modified_path_graph)
congested_graph = self.build_graph_lines_dataframe(car_mode.travel_costs.congested_path_graph)
freeflow_graph = self.build_graph_lines_dataframe(car_mode.inputs["travel_costs"].modified_path_graph)
congested_graph = self.build_graph_lines_dataframe(car_mode.inputs["travel_costs"].congested_path_graph)

comparison = (
freeflow_graph
Expand All @@ -49,7 +49,7 @@ def get(
crs="EPSG:3035"
)

fp = car_mode.travel_costs.modified_path_graph.cache_path.parent / "congestion.gpkg"
fp = car_mode.inputs["travel_costs"].modified_path_graph.cache_path.parent / "congestion.gpkg"

gdf.to_file(
fp,
Expand Down Expand Up @@ -111,4 +111,4 @@ def build_graph_lines_dataframe(self, path_graph):

)

return graph_lines
return graph_lines
Loading
Loading