diff --git a/.github/workflows/core_tests.yml b/.github/workflows/core_tests.yml
index 8e659b25c..a458a56fe 100644
--- a/.github/workflows/core_tests.yml
+++ b/.github/workflows/core_tests.yml
@@ -130,6 +130,7 @@ jobs:
       - run: uv run pytest test/test_skim_name_conflicts.py
       - run: uv run pytest test/random_seed/test_random_seed.py
+      - run: uv run pytest test/skip_failed_choices/test_skip_failed_choices.py
 
   builtin_regional_models:
     needs: foundation
diff --git a/activitysim/abm/models/location_choice.py b/activitysim/abm/models/location_choice.py
index 7f032a8ae..59b4a1c4d 100644
--- a/activitysim/abm/models/location_choice.py
+++ b/activitysim/abm/models/location_choice.py
@@ -234,6 +234,11 @@ def location_sample(
 ):
     # FIXME - MEMORY HACK - only include columns actually used in spec
     chooser_columns = model_settings.SIMULATE_CHOOSER_COLUMNS
+    # Drop this when PR #1017 is merged
+    if ("household_id" not in chooser_columns) and (
+        "household_id" in persons_merged.columns
+    ):
+        chooser_columns = chooser_columns + ["household_id"]
     choosers = persons_merged[chooser_columns]
 
     # create wrapper with keys for this lookup - in this case there is a home_zone_id in the choosers
@@ -390,6 +395,11 @@ def location_presample(
     # FIXME maybe we should add it for multi-zone (from maz_taz) if missing?
     chooser_columns = model_settings.SIMULATE_CHOOSER_COLUMNS
     chooser_columns = [HOME_TAZ if c == HOME_MAZ else c for c in chooser_columns]
+    # Drop this when PR #1017 is merged
+    if ("household_id" not in chooser_columns) and (
+        "household_id" in persons_merged.columns
+    ):
+        chooser_columns = chooser_columns + ["household_id"]
     choosers = persons_merged[chooser_columns]
 
     # create wrapper with keys for this lookup - in this case there is a HOME_TAZ in the choosers
@@ -620,6 +630,11 @@ def run_location_simulate(
     # FIXME - MEMORY HACK - only include columns actually used in spec
     chooser_columns = model_settings.SIMULATE_CHOOSER_COLUMNS
+    # Drop this when PR #1017 is merged
+    if ("household_id" not in chooser_columns) and (
+        "household_id" in persons_merged.columns
+    ):
+        chooser_columns = chooser_columns + ["household_id"]
     choosers = persons_merged[chooser_columns]
 
     alt_dest_col_name = model_settings.ALT_DEST_COL_NAME
@@ -1072,6 +1087,23 @@ def iterate_location_choice(
         else:
             choices_df = choices_df_
 
+        # drop choices that belong to the failed households: state.skipped_household_ids
+        # so that their choices are not considered in shadow price calculations
+        # first append household_id to choices_df
+        choices_df = choices_df.merge(
+            persons_merged_df[["household_id"]],
+            left_index=True,
+            right_index=True,
+            how="left",
+        )
+        if len(choices_df) > 0:
+            choices_df = choices_df[
+                ~choices_df["household_id"].isin(
+                    state.get("skipped_household_ids", set())
+                )
+            ]
+        choices_df = choices_df.drop(columns=["household_id"])
+
         spc.set_choices(
             choices=choices_df["choice"],
             segment_ids=persons_merged_df[chooser_segment_column].reindex(
diff --git a/activitysim/abm/models/school_escorting.py b/activitysim/abm/models/school_escorting.py
index 32e5058e7..183b45ee2 100644
--- a/activitysim/abm/models/school_escorting.py
+++ b/activitysim/abm/models/school_escorting.py
@@ -472,6 +472,11 @@ def school_escorting(
     # reduce memory by limiting columns if selected columns are supplied
     chooser_columns = model_settings.SIMULATE_CHOOSER_COLUMNS
     if chooser_columns is not None:
+        # Drop this when PR #1017 is merged
+        if ("household_id" not in chooser_columns) and (
+            "household_id" in choosers.columns
+        ):
+            chooser_columns = chooser_columns + ["household_id"]
         chooser_columns = chooser_columns + participant_columns
         choosers = choosers[chooser_columns]
 
diff --git a/activitysim/abm/models/trip_matrices.py b/activitysim/abm/models/trip_matrices.py
index 5552e3b00..c4372a216 100644
--- a/activitysim/abm/models/trip_matrices.py
+++ b/activitysim/abm/models/trip_matrices.py
@@ -307,6 +307,12 @@ def write_trip_matrices(
             .TAZ.tolist()
         )
 
+    # print out number of households skipped due to failed choices
+    if state.settings.skip_failed_choices:
+        logger.info(
+            f"\n!!!\nATTENTION: Skipped households with failed choices during simulation. Number of households skipped: {state.get('num_skipped_households', 0)}.\n!!!"
+        )
+
 
 def annotate_trips(
     state: workflow.State,
@@ -393,6 +399,21 @@ def write_matrices(
     if not matrix_settings:
         logger.error("Missing MATRICES setting in write_trip_matrices.yaml")
 
+    hh_weight_col = model_settings.HH_EXPANSION_WEIGHT_COL
+    if hh_weight_col:
+        if state.get("num_skipped_households", 0) > 0:
+            logger.info(
+                f"Adjusting household expansion weights in {hh_weight_col} to account for {state.get('num_skipped_households', 0)} skipped households."
+            )
+            # adjust the hh expansion weights to account for skipped households
+            adjustment_factor = state.get_dataframe("households").shape[0] / (
+                state.get_dataframe("households").shape[0]
+                + state.get("num_skipped_households", 0)
+            )
+            aggregate_trips[hh_weight_col] = (
+                aggregate_trips[hh_weight_col] * adjustment_factor
+            )
+
     for matrix in matrix_settings:
         matrix_is_tap = matrix.is_tap
 
diff --git a/activitysim/abm/models/trip_mode_choice.py b/activitysim/abm/models/trip_mode_choice.py
index a942b7af8..7e3cf89eb 100644
--- a/activitysim/abm/models/trip_mode_choice.py
+++ b/activitysim/abm/models/trip_mode_choice.py
@@ -366,7 +366,14 @@ def trip_mode_choice(
         "trip_mode_choice choices", trips_df[mode_column_name], value_counts=True
     )
 
-    assert not trips_df[mode_column_name].isnull().any()
+    # if we're skipping failed choices, the trip modes for failed simulations will be null
+    if state.settings.skip_failed_choices:
+        mask_skipped = trips_df["household_id"].isin(
+            state.get("skipped_household_ids", set())
+        )
+        assert not trips_df.loc[~mask_skipped, mode_column_name].isnull().any()
+    else:
+        assert not trips_df[mode_column_name].isnull().any()
 
     state.add_table("trips", trips_df)
 
@@ -382,6 +389,11 @@ def trip_mode_choice(
     # need to update locals_dict to access skims that are the same .shape as trips table
     locals_dict = {}
     locals_dict.update(constants)
+    if state.settings.skip_failed_choices:
+        mask_skipped = trips_merged["household_id"].isin(
+            state.get("skipped_household_ids", set())
+        )
+        trips_merged = trips_merged.loc[~mask_skipped]
     simulate.set_skim_wrapper_targets(trips_merged, skims)
     locals_dict.update(skims)
     locals_dict["timeframe"] = "trip"
diff --git a/activitysim/abm/models/util/school_escort_tours_trips.py b/activitysim/abm/models/util/school_escort_tours_trips.py
index 665844023..a6fb52acb 100644
--- a/activitysim/abm/models/util/school_escort_tours_trips.py
+++ b/activitysim/abm/models/util/school_escort_tours_trips.py
@@ -1043,6 +1043,10 @@ def force_escortee_trip_modes_to_match_chauffeur(state: workflow.State, trips):
         f"Changed {diff.sum()} trip modes of school escortees to match their chauffeur"
     )
 
+    # trip_mode can be na if the run allows skipping failed choices and the trip mode choice has failed
+    if state.settings.skip_failed_choices:
+        return trips
+
     assert (
         ~trips.trip_mode.isna()
     ).all(), f"Missing trip mode for {trips[trips.trip_mode.isna()]}"
diff --git a/activitysim/abm/models/util/tour_destination.py b/activitysim/abm/models/util/tour_destination.py
index d99803bd7..2cac47ee1 100644
--- a/activitysim/abm/models/util/tour_destination.py
+++ b/activitysim/abm/models/util/tour_destination.py
@@ -625,6 +625,11 @@ def run_destination_sample(
     # if special person id is passed
     chooser_id_column = model_settings.CHOOSER_ID_COLUMN
 
+    # Drop this when PR #1017 is merged
+    if ("household_id" not in chooser_columns) and (
+        "household_id" in persons_merged.columns
+    ):
+        chooser_columns = chooser_columns + ["household_id"]
     persons_merged = persons_merged[
         [c for c in persons_merged.columns if c in chooser_columns]
     ]
@@ -799,6 +804,11 @@ def run_destination_simulate(
     # if special person id is passed
     chooser_id_column = model_settings.CHOOSER_ID_COLUMN
 
+    # Drop this when PR #1017 is merged
+    if ("household_id" not in chooser_columns) and (
+        "household_id" in persons_merged.columns
+    ):
+        chooser_columns = chooser_columns + ["household_id"]
     persons_merged = persons_merged[
         [c for c in persons_merged.columns if c in chooser_columns]
     ]
diff --git a/activitysim/abm/models/util/tour_od.py b/activitysim/abm/models/util/tour_od.py
index 5ec9dd493..c2548cbd6 100644
--- a/activitysim/abm/models/util/tour_od.py
+++ b/activitysim/abm/models/util/tour_od.py
@@ -692,6 +692,9 @@ def run_od_sample(
     choosers = tours
     # FIXME - MEMORY HACK - only include columns actually used in spec
     chooser_columns = model_settings.SIMULATE_CHOOSER_COLUMNS
+    # Drop this when PR #1017 is merged
+    if ("household_id" not in chooser_columns) and ("household_id" in choosers.columns):
+        chooser_columns = chooser_columns + ["household_id"]
     choosers = choosers[chooser_columns]
 
     # interaction_sample requires that choosers.index.is_monotonic_increasing
@@ -951,6 +954,9 @@ def run_od_simulate(
 
     # FIXME - MEMORY HACK - only include columns actually used in spec
     chooser_columns = model_settings.SIMULATE_CHOOSER_COLUMNS
+    # Drop this when PR #1017 is merged
+    if ("household_id" not in chooser_columns) and ("household_id" in choosers.columns):
+        chooser_columns = chooser_columns + ["household_id"]
     choosers = choosers[chooser_columns]
 
     # interaction_sample requires that choosers.index.is_monotonic_increasing
diff --git a/activitysim/abm/models/util/tour_scheduling.py b/activitysim/abm/models/util/tour_scheduling.py
index 80474db59..e591c29db 100644
--- a/activitysim/abm/models/util/tour_scheduling.py
+++ b/activitysim/abm/models/util/tour_scheduling.py
@@ -40,6 +40,12 @@ def run_tour_scheduling(
         c for c in model_columns if c not in logsum_columns
     ]
 
+    # Drop this when PR #1017 is merged
+    if ("household_id" not in chooser_columns) and (
+        "household_id" in persons_merged.columns
+    ):
+        chooser_columns = chooser_columns + ["household_id"]
+
     persons_merged = expressions.filter_chooser_columns(persons_merged, chooser_columns)
 
     timetable = state.get_injectable("timetable")
diff --git a/activitysim/core/configuration/top.py b/activitysim/core/configuration/top.py
index 405560810..1229e754b 100644
--- a/activitysim/core/configuration/top.py
+++ b/activitysim/core/configuration/top.py
@@ -781,6 +781,13 @@ def _check_store_skims_in_shm(self):
     should catch many common errors early, including missing required
     configurations or specified coefficient labels without defined values.
     """
 
+    skip_failed_choices: bool = True
+    """
+    Skip households that cause errors during processing instead of failing the model run.
+
+    .. versionadded:: 1.6
+    """
+
     other_settings: dict[str, Any] = None
 
     def _get_attr(self, attr):
diff --git a/activitysim/core/interaction_sample_simulate.py b/activitysim/core/interaction_sample_simulate.py
index df1c53fa0..e34974840 100644
--- a/activitysim/core/interaction_sample_simulate.py
+++ b/activitysim/core/interaction_sample_simulate.py
@@ -351,6 +351,11 @@ def _interaction_sample_simulate(
     # that is, we want the index value of the row that is offset by rows into the
     # tranche of this choosers alternatives created by cross join of alternatives and choosers
 
+    # when skip_failed_choices is enabled, the position may be -99 for failed choices, which gets dropped eventually
+    # here we just need to clip to zero to avoid getting the wrong index in the take() below
+    if state.settings.skip_failed_choices:
+        positions = positions.clip(lower=0)
+
     # resulting pandas Int64Index has one element per chooser row and is in same order as choosers
     choices = alternatives[choice_column].take(positions + first_row_offsets)
diff --git a/activitysim/core/logit.py b/activitysim/core/logit.py
index 105e18fec..04073cb3a 100644
--- a/activitysim/core/logit.py
+++ b/activitysim/core/logit.py
@@ -30,6 +30,7 @@ def report_bad_choices(
     state: workflow.State,
     bad_row_map,
     df,
+    skip_failed_choices,
     trace_label,
     msg,
     trace_choosers=None,
@@ -87,6 +88,27 @@ def report_bad_choices(
 
         logger.warning(row_msg)
 
+    if skip_failed_choices:
+        # update counter in state
+        num_skipped_households = state.get("num_skipped_households", 0)
+        skipped_household_ids = state.get("skipped_household_ids", set())
+        for hh_id in df[trace_col].unique():
+            if hh_id is None:
+                continue
+            if hh_id not in skipped_household_ids:
+                skipped_household_ids.add(hh_id)
+                num_skipped_households += 1
+            else:
+                continue
+        state.set("num_skipped_households", num_skipped_households)
+        state.set("skipped_household_ids", skipped_household_ids)
+
+        logger.debug(
+            f"Skipping {bad_row_map.sum()} bad choices. Total skipped households so far: {num_skipped_households}. Skipped household IDs: {skipped_household_ids}"
+        )
+
+        return
+
     if raise_error:
         raise InvalidTravelError(msg_with_count)
 
@@ -136,6 +158,7 @@ def utils_to_probs(
     allow_zero_probs=False,
     trace_choosers=None,
     overflow_protection: bool = True,
+    skip_failed_choices: bool = True,
     return_logsums=False,
 ):
     """
@@ -176,6 +199,16 @@ def utils_to_probs(
         overflow_protection will have no benefit but impose a modest computational
         overhead cost.
 
+    skip_failed_choices : bool, default True
+        If True, when bad choices are detected (all-zero probabilities or infinite
+        probabilities), the entire household causing the bad choices will be skipped instead of
+        being masked by overflow protection or causing an error.
+        A counter will be incremented for each skipped household. This is useful when running large
+        simulations where occasional bad choices are encountered and should not halt the process.
+        The counter can be accessed via `state.get("num_skipped_households", 0)`.
+        The number of skipped households and their IDs will be logged at the end of the simulation.
+        When `skip_failed_choices` is True, `overflow_protection` will be set to False to avoid conflicts.
+
     Returns
     -------
     probs : pandas.DataFrame
@@ -203,6 +236,13 @@ def utils_to_probs(
         utils_arr.dtype == np.float32 and utils_arr.max() > 85
     )
 
+    if state.settings.skip_failed_choices is not None:
+        skip_failed_choices = state.settings.skip_failed_choices
+    # when skipping failed choices, we cannot use overflow protection
+    # because it would mask the underlying issue causing bad choices
+    if skip_failed_choices:
+        overflow_protection = False
+
     if overflow_protection:
         # exponentiated utils will overflow, downshift them
         shifts = utils_arr.max(1, keepdims=True)
@@ -240,6 +280,7 @@ def utils_to_probs(
             state,
             zero_probs,
             utils,
+            skip_failed_choices,
             trace_label=tracing.extend_trace_label(trace_label, "zero_prob_utils"),
             msg="all probabilities are zero",
             trace_choosers=trace_choosers,
@@ -251,6 +292,7 @@ def utils_to_probs(
             state,
             inf_utils,
             utils,
+            skip_failed_choices,
             trace_label=tracing.extend_trace_label(trace_label, "inf_exp_utils"),
             msg="infinite exponentiated utilities",
             trace_choosers=trace_choosers,
@@ -281,6 +323,7 @@ def make_choices(
     trace_label: str = None,
     trace_choosers=None,
     allow_bad_probs=False,
+    skip_failed_choices=True,
 ) -> tuple[pd.Series, pd.Series]:
     """
     Make choices for each chooser from among a set of alternatives.
@@ -316,11 +359,15 @@ def make_choices(
         np.ones(len(probs.index))
     ).abs() > BAD_PROB_THRESHOLD * np.ones(len(probs.index))
 
+    if state.settings.skip_failed_choices is not None:
+        skip_failed_choices = state.settings.skip_failed_choices
+
     if bad_probs.any() and not allow_bad_probs:
         report_bad_choices(
             state,
             bad_probs,
             probs,
+            skip_failed_choices,
             trace_label=tracing.extend_trace_label(trace_label, "bad_probs"),
             msg="probabilities do not add up to 1",
             trace_choosers=trace_choosers,
@@ -329,6 +376,8 @@ def make_choices(
 
     rands = state.get_rn_generator().random_for_df(probs)
     choices = pd.Series(choice_maker(probs.values, rands), index=probs.index)
+    # mark bad choices with -99
+    choices[bad_probs] = -99
 
     rands = pd.Series(np.asanyarray(rands).flatten(), index=probs.index)
diff --git a/activitysim/core/simulate.py b/activitysim/core/simulate.py
index 55caf050a..9dd0ac6a2 100644
--- a/activitysim/core/simulate.py
+++ b/activitysim/core/simulate.py
@@ -1325,7 +1325,9 @@ def eval_mnl(
     if custom_chooser:
         choices, rands = custom_chooser(state, probs, choosers, spec, trace_label)
     else:
-        choices, rands = logit.make_choices(state, probs, trace_label=trace_label)
+        choices, rands = logit.make_choices(
+            state, probs, trace_label=trace_label, trace_choosers=choosers
+        )
 
     del probs
     chunk_sizer.log_df(trace_label, "probs", None)
@@ -1485,11 +1487,15 @@ def eval_nl(
     BAD_PROB_THRESHOLD = 0.001
     no_choices = (base_probabilities.sum(axis=1) - 1).abs() > BAD_PROB_THRESHOLD
 
+    if state.settings.skip_failed_choices is not None:
+        skip_failed_choices = state.settings.skip_failed_choices
+
     if no_choices.any():
         logit.report_bad_choices(
             state,
             no_choices,
             base_probabilities,
+            skip_failed_choices,
             trace_label=tracing.extend_trace_label(trace_label, "bad_probs"),
             trace_choosers=choosers,
             msg="base_probabilities do not sum to one",
diff --git a/activitysim/core/test/test_logit.py b/activitysim/core/test/test_logit.py
index e249475de..d0ee07cb2 100644
--- a/activitysim/core/test/test_logit.py
+++ b/activitysim/core/test/test_logit.py
@@ -78,6 +78,7 @@ def test_utils_to_probs(utilities, test_data):
 
 def test_utils_to_probs_raises():
     state = workflow.State().default_settings()
+    state.settings.skip_failed_choices = False
     idx = pd.Index(name="household_id", data=[1])
     with pytest.raises(RuntimeError) as excinfo:
         logit.utils_to_probs(
diff --git a/activitysim/core/util.py b/activitysim/core/util.py
index 20f79c760..58bc50be4 100644
--- a/activitysim/core/util.py
+++ b/activitysim/core/util.py
@@ -683,6 +683,9 @@ def drop_unused_columns(
 
     unique_variables_in_spec |= set(additional_columns or [])
 
+    # always keep household_id
+    unique_variables_in_spec.add("household_id")
+
     if locals_d:
         unique_variables_in_spec.add(locals_d.get("orig_col_name", None))
         unique_variables_in_spec.add(locals_d.get("dest_col_name", None))
diff --git a/activitysim/core/workflow/state.py b/activitysim/core/workflow/state.py
index 9f7dcd4d6..eb1ad9f2e 100644
--- a/activitysim/core/workflow/state.py
+++ b/activitysim/core/workflow/state.py
@@ -939,6 +939,62 @@ def add_table(
         # at some later time if desired.
         self.existing_table_status[name] = True
         self.set(name, content)
+        # TODO: do not update tables if no new households were skipped
+        # right now it is updating every time, which is inefficient
+        if self.get("num_skipped_households", 0) > 0:
+            self.update_table()
+
+    def update_table(self, name: str = None):
+        """
+        Go through existing tables in the state and
+        get rid of any rows that correspond to skipped households.
+        """
+        skipped_hh_ids = self.get("skipped_household_ids", set())
+        if not skipped_hh_ids:
+            return
+
+        # get existing tables in the current state context
+        existing_tables = self.registered_tables()
+
+        for table_name in existing_tables:
+            if not self.is_table(table_name):
+                continue
+            df = self.get_dataframe(table_name, as_copy=False)
+            # get the initial length of the dataframe
+            initial_len = len(df)
+            # check if household_id is in index or columns
+            if "household_id" in df.index.names:
+                df.drop(
+                    index=df.loc[
+                        df.index.get_level_values("household_id").isin(skipped_hh_ids)
+                    ].index,
+                    inplace=True,
+                )
+            elif "household_id" in df.columns:
+                df.drop(
+                    index=df[df["household_id"].isin(skipped_hh_ids)].index,
+                    inplace=True,
+                )
+            else:
+                continue
+            # get the length of the dataframe after dropping rows
+            final_len = len(df)
+            logger.debug(
+                f"update_table: dropped {initial_len - final_len} rows from {table_name} "
+                f"corresponding to skipped households"
+            )
+            # mark this table as edited if we dropped any rows
+            if final_len < initial_len:
+                self.existing_table_status[table_name] = True
+            # terminate the run if we dropped all rows
+            # and raise an error
+            if final_len == 0:
+                raise RuntimeError(
+                    f"update_table: all rows dropped from {table_name} "
+                    f"corresponding to skipped households, terminating run"
+                )
+            # set the updated dataframe back to the state
+            self.set(table_name, df)
 
     def is_table(self, name: str):
         """
diff --git a/test/skip_failed_choices/.gitignore b/test/skip_failed_choices/.gitignore
new file mode 100644
index 000000000..67176c62d
--- /dev/null
+++ b/test/skip_failed_choices/.gitignore
@@ -0,0 +1,2 @@
+configs*/
+output/
\ No newline at end of file
diff --git a/test/skip_failed_choices/test_skip_failed_choices.py b/test/skip_failed_choices/test_skip_failed_choices.py
new file mode 100644
index 000000000..50f0581b1
--- /dev/null
+++ b/test/skip_failed_choices/test_skip_failed_choices.py
@@ -0,0 +1,121 @@
+from __future__ import annotations
+
+# ActivitySim
+# See full license in LICENSE.txt.
+import importlib.resources
+import os
+from shutil import copytree
+
+import pandas as pd
+import pytest
+import yaml
+
+from activitysim.core import workflow
+
+
+def example_path(dirname):
+    resource = os.path.join("examples", "prototype_mtc", dirname)
+    return str(importlib.resources.files("activitysim").joinpath(resource))
+
+
+def dir_test_path(dirname):
+    return os.path.join(os.path.dirname(__file__), dirname)
+
+
+data_dir = example_path("data")
+new_configs_dir = dir_test_path("configs")
+new_settings_file = os.path.join(new_configs_dir, "settings.yaml")
+# copy example configs to test/skip_failed_choices/configs if not already there
+if not os.path.exists(new_configs_dir):
+    copytree(example_path("configs"), new_configs_dir)
+
+
+def update_settings(settings_file, key, value):
+    with open(settings_file, "r") as f:
+        settings = yaml.safe_load(f)
+        f.close()
+
+    settings[key] = value
+
+    with open(settings_file, "w") as f:
+        yaml.safe_dump(settings, f)
+        f.close()
+
+
+def update_uec_csv(uec_file, expression, coef_value):
+    # read in the uec file
+    df = pd.read_csv(uec_file)
+    # append a new row, set expression and coef_value
+    df.loc[len(df), "Expression"] = expression
+    # from the 4th column onward are coefficients
+    for col in df.columns[3:]:
+        df.loc[len(df) - 1, col] = coef_value
+    df.to_csv(uec_file, index=False)
+
+
+@pytest.fixture
+def state():
+    configs_dir = new_configs_dir
+    output_dir = dir_test_path("output")
+    data_dir = example_path("data")
+
+    # turn the global setting on to skip failed choices
+    update_settings(new_settings_file, "skip_failed_choices", True)
+
+    # make some choices fail by setting extreme coefficients in the uec
+    # auto ownership
+    auto_ownership_uec_file = os.path.join(new_configs_dir, "auto_ownership.csv")
+    # forcing households in home zone 8 (recoded 7) to fail auto ownership choice
+    update_uec_csv(auto_ownership_uec_file, "@df.home_zone_id==7", -999.0)
+
+    # work location choice
+    work_location_choice_uec_file = os.path.join(
+        new_configs_dir, "workplace_location.csv"
+    )
+    # forcing workers from home zone 18 to fail work location choice
+    # as if there is a network connection problem for zone 18
+    update_uec_csv(work_location_choice_uec_file, "@df.home_zone_id==18", -999.0)
+
+    # trip mode choice
+    trip_mode_choice_uec_file = os.path.join(new_configs_dir, "trip_mode_choice.csv")
+    # forcing trips on drivealone tours to fail trip mode choice
+    update_uec_csv(trip_mode_choice_uec_file, "@df.tour_mode=='DRIVEALONEFREE'", -999.0)
+
+    state = workflow.State.make_default(
+        configs_dir=configs_dir,
+        output_dir=output_dir,
+        data_dir=data_dir,
+    )
+
+    from activitysim.abm.tables.skims import network_los_preload
+
+    state.get(network_los_preload)
+
+    state.logging.config_logger()
+    return state
+
+
+def test_skip_failed_choices(state):
+
+    # check that the setting is indeed set to True
+    assert state.settings.skip_failed_choices is True
+
+    state.run(models=state.settings.models, resume_after=None)
+
+    # check that the number of skipped households is recorded in state
+    assert state.get("num_skipped_households", 0) == 943
+
+    # check that there are no DRIVEALONEFREE tours in the final tours
+    final_tours_df = state.get_dataframe("tours")
+    assert "DRIVEALONEFREE" not in final_tours_df["tour_mode"].values
+
+    # check that there are no households in home zone 8 (recoded 7) in the final households
+    final_households_df = state.get_dataframe("households")
+    assert not any(final_households_df["home_zone_id"] == 7)
+
+    # check that there are no workers from households in home zone 18 in the final persons
+    final_persons_df = state.get_dataframe("persons")
+    assert not any(
+        (final_persons_df["home_zone_id"] == 18)
+        & (final_persons_df["is_worker"] == True)
+    )
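For reviewers trying the feature locally, a minimal sketch (not part of the patch) of enabling the new setting and reading the skip counters after a run. It mirrors the test fixture above; the directory paths are placeholders and should point at a real model setup.

    from activitysim.core import workflow

    # skip_failed_choices can also be enabled in settings.yaml (skip_failed_choices: true),
    # or flipped on the State object directly as the new test fixture does
    state = workflow.State.make_default(
        configs_dir="configs",  # placeholder paths
        output_dir="output",
        data_dir="data",
    )
    state.settings.skip_failed_choices = True

    state.run(models=state.settings.models, resume_after=None)

    # counters populated by logit.report_bad_choices() whenever choices fail
    print("households skipped:", state.get("num_skipped_households", 0))
    print("skipped household ids:", sorted(state.get("skipped_household_ids", set())))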