diff --git a/activitysim/abm/models/disaggregate_accessibility.py b/activitysim/abm/models/disaggregate_accessibility.py index aa2703e19c..e5a66916bd 100644 --- a/activitysim/abm/models/disaggregate_accessibility.py +++ b/activitysim/abm/models/disaggregate_accessibility.py @@ -15,7 +15,11 @@ from activitysim.abm.models.util import tour_destination from activitysim.abm.tables import shadow_pricing from activitysim.core import estimation, los, tracing, util, workflow -from activitysim.core.configuration.base import PreprocessorSettings, PydanticReadable +from activitysim.core.configuration.base import ( + ComputeSettings, + PreprocessorSettings, + PydanticReadable, +) from activitysim.core.configuration.logit import TourLocationComponentSettings from activitysim.core.expressions import assign_columns @@ -184,6 +188,8 @@ class DisaggregateAccessibilitySettings(PydanticReadable, extra="forbid"): If not supplied or None, will default to the chunk size in the location choice model settings. """ + compute_settings: ComputeSettings | None = None + def read_disaggregate_accessibility_yaml( state: workflow.State, file_name @@ -783,6 +789,11 @@ def get_disaggregate_logsums( if disagg_model_settings.explicit_chunk is not None: model_settings.explicit_chunk = disagg_model_settings.explicit_chunk + # Can set compute settings for disaggregate accessibility + # Otherwise this will be set to whatever is in the location model settings + if disagg_model_settings.compute_settings is not None: + model_settings.compute_settings = disagg_model_settings.compute_settings + # Include the suffix tags to pass onto downstream logsum models (e.g., tour mode choice) if model_settings.LOGSUM_SETTINGS: suffixes = util.concat_suffix_dict(disagg_model_settings.suffixes) diff --git a/activitysim/abm/models/joint_tour_participation.py b/activitysim/abm/models/joint_tour_participation.py index 778ae0f449..1c8f3af9d0 100644 --- a/activitysim/abm/models/joint_tour_participation.py +++ 
b/activitysim/abm/models/joint_tour_participation.py @@ -20,8 +20,8 @@ ) from activitysim.core.configuration.base import ComputeSettings, PreprocessorSettings from activitysim.core.configuration.logit import LogitComponentSettings -from activitysim.core.util import assign_in_place, reindex from activitysim.core.exceptions import InvalidTravelError +from activitysim.core.util import assign_in_place, reindex logger = logging.getLogger(__name__) @@ -127,7 +127,7 @@ def get_tour_satisfaction(candidates, participate): def participants_chooser( state: workflow.State, - probs: pd.DataFrame, + probs_or_utils: pd.DataFrame, choosers: pd.DataFrame, spec: pd.DataFrame, trace_label: str, @@ -147,9 +147,10 @@ def participants_chooser( Parameters ---------- - probs : pandas.DataFrame + probs_or_utils : pandas.DataFrame Rows for choosers and columns for the alternatives from which they - are choosing. Values are expected to be valid probabilities across + are choosing. If running with explicit_error_terms, these are utilities. + Otherwise, values are expected to be valid probabilities across each row, e.g. they should sum to 1. choosers : pandas.dataframe simple_simulate choosers df @@ -166,7 +167,7 @@ def participants_chooser( """ - assert probs.index.equals(choosers.index) + assert probs_or_utils.index.equals(choosers.index) # choice is boolean (participate or not) model_settings = JointTourParticipationSettings.read_settings_file( @@ -202,7 +203,7 @@ def participants_chooser( "%s max iterations exceeded (%s).", trace_label, MAX_ITERATIONS ) diagnostic_cols = ["tour_id", "household_id", "composition", "adult"] - unsatisfied_candidates = candidates[diagnostic_cols].join(probs) + unsatisfied_candidates = candidates[diagnostic_cols].join(probs_or_utils) state.tracing.write_csv( unsatisfied_candidates, file_name="%s.UNSATISFIED" % trace_label, @@ -215,9 +216,31 @@ def participants_chooser( f"Forcing joint tour participation for {num_tours_remaining} tours." 
) # anybody with probability > 0 is forced to join the joint tour - probs[choice_col] = np.where(probs[choice_col] > 0, 1, 0) - non_choice_col = [col for col in probs.columns if col != choice_col][0] - probs[non_choice_col] = 1 - probs[choice_col] + if state.settings.use_explicit_error_terms: + # need "is valid choice" such that we certainly choose those with non-zero values, + # and do not choose others. Let's use 3.0 as large value here. + probs_or_utils[choice_col] = np.where( + probs_or_utils[choice_col] > logit.UTIL_MIN, + 3.0, + logit.UTIL_UNAVAILABLE, + ) + non_choice_col = [ + col for col in probs_or_utils.columns if col != choice_col + ][0] + probs_or_utils[non_choice_col] = np.where( + probs_or_utils[choice_col] <= logit.UTIL_MIN, + 3.0, + logit.UTIL_UNAVAILABLE, + ) + else: + probs_or_utils[choice_col] = np.where( + probs_or_utils[choice_col] > 0, 1, 0 + ) + non_choice_col = [ + col for col in probs_or_utils.columns if col != choice_col + ][0] + probs_or_utils[non_choice_col] = 1 - probs_or_utils[choice_col] + if iter > MAX_ITERATIONS + 1: raise InvalidTravelError( f"{num_tours_remaining} tours could not be satisfied even with forcing participation" @@ -227,8 +250,13 @@ def participants_chooser( f"{num_tours_remaining} tours could not be satisfied after {iter} iterations" ) - choices, rands = logit.make_choices( - state, probs, trace_label=trace_label, trace_choosers=choosers + choice_function = ( + logit.make_choices_utility_based + if state.settings.use_explicit_error_terms + else logit.make_choices + ) + choices, rands = choice_function( + state, probs_or_utils, trace_label=trace_label, trace_choosers=choosers ) participate = choices == PARTICIPATE_CHOICE @@ -252,7 +280,7 @@ def participants_chooser( rands_list.append(rands[satisfied]) # remove candidates of satisfied tours - probs = probs[~satisfied] + probs_or_utils = probs_or_utils[~satisfied] candidates = candidates[~satisfied] logger.debug( @@ -401,6 +429,16 @@ def joint_tour_participation( if i 
not in model_settings.compute_settings.protect_columns: model_settings.compute_settings.protect_columns.append(i) + # TODO EET: this is related to the difference in nested logit and logit choice as per comment in + # make_choices_utility_based. As soon as alt_order_array is removed from arguments to + # make_choices_explicit_error_term_nl this guard can be removed + if state.settings.use_explicit_error_terms: + assert ( + nest_spec is None + ), "Nested logit model custom chooser for EET requires name_mapping, currently not implemented in jtp" + + custom_chooser = participants_chooser + choices = simulate.simple_simulate_by_chunk_id( state, choosers=candidates, @@ -409,7 +447,7 @@ def joint_tour_participation( locals_d=constants, trace_label=trace_label, trace_choice_name="participation", - custom_chooser=participants_chooser, + custom_chooser=custom_chooser, estimator=estimator, compute_settings=model_settings.compute_settings, ) diff --git a/activitysim/abm/models/location_choice.py b/activitysim/abm/models/location_choice.py index a8b8fd8a18..360ebc6b1f 100644 --- a/activitysim/abm/models/location_choice.py +++ b/activitysim/abm/models/location_choice.py @@ -17,6 +17,7 @@ ) from activitysim.core.interaction_sample import interaction_sample from activitysim.core.interaction_sample_simulate import interaction_sample_simulate +from activitysim.core.logit import AltsContext from activitysim.core.util import reindex from activitysim.core.exceptions import DuplicateWorkflowTableError @@ -603,6 +604,7 @@ def run_location_simulate( chunk_tag, trace_label, skip_choice=False, + alts_context: AltsContext | None = None, ): """ run location model on location_sample annotated with mode_choice logsum @@ -713,6 +715,7 @@ def run_location_simulate( compute_settings=model_settings.compute_settings.subcomponent_settings( "simulate" ), + alts_context=alts_context, ) if not want_logsums: @@ -789,6 +792,11 @@ def run_location_choice( if choosers.shape[0] == 0: 
logger.info(f"{trace_label} skipping segment {segment_name}: no choosers") continue + # using land use rather than size terms in case something goes 0 base -> nonzero project, double + # check if that would be in dest_size_terms as a zero + alts_context = AltsContext.from_series(dest_size_terms.index) # index zone_id, not ALT_DEST_COL_NAME + # assumes that dest_size_terms will always contain zeros for non-attractive zones, i.e. it will have the + # same length as land_use # - location_sample location_sample_df = run_location_sample( @@ -842,6 +850,7 @@ def run_location_choice( trace_label, "simulate.%s" % segment_name ), skip_choice=skip_choice, + alts_context=alts_context, ) if estimator: @@ -1020,6 +1029,10 @@ def iterate_location_choice( ) = None # initialize to None, will be populated in first iteration for iteration in range(1, max_iterations + 1): + # Force reset the setting at the start of each shadow price iteration for consistency + logger.info("Resetting random number seeds for iteration {}".format(iteration)) + state.get_rn_generator().end_step(trace_label) + state.get_rn_generator().begin_step(trace_label) persons_merged_df_ = persons_merged_df.copy() if spc.use_shadow_pricing and iteration > 1: diff --git a/activitysim/abm/models/parking_location_choice.py b/activitysim/abm/models/parking_location_choice.py index 32f3aabee2..5df4f3f687 100644 --- a/activitysim/abm/models/parking_location_choice.py +++ b/activitysim/abm/models/parking_location_choice.py @@ -21,6 +21,7 @@ from activitysim.core.configuration.base import PreprocessorSettings from activitysim.core.configuration.logit import LogitComponentSettings from activitysim.core.interaction_sample_simulate import interaction_sample_simulate +from activitysim.core.logit import AltsContext from activitysim.core.tracing import print_elapsed_time from activitysim.core.util import assign_in_place, drop_unused_columns from activitysim.core.exceptions import DuplicateWorkflowTableError @@ -112,6 +113,7 @@ 
def parking_destination_simulate( chunk_size, trace_hh_id, trace_label, + alts_context: AltsContext | None = None, ): """ Chose destination from destination_sample (with od_logsum and dp_logsum columns added) @@ -150,6 +152,7 @@ def parking_destination_simulate( trace_label=trace_label, trace_choice_name="parking_loc", explicit_chunk_size=model_settings.explicit_chunk, + alts_context=alts_context, ) # drop any failed zero_prob destinations @@ -211,6 +214,9 @@ def choose_parking_location( ) destination_sample.index = np.repeat(trips.index.values, len(alternatives)) destination_sample.index.name = trips.index.name + # using destination_sample would also be right because destination_sample isn't a sample here, + # but that could change + alts_context = AltsContext.from_series(alternatives[alt_dest_col_name]) destinations = parking_destination_simulate( state, @@ -223,6 +229,7 @@ def choose_parking_location( chunk_size=chunk_size, trace_hh_id=trace_hh_id, trace_label=trace_label, + alts_context=alts_context ) if want_sample_table: diff --git a/activitysim/abm/models/trip_departure_choice.py b/activitysim/abm/models/trip_departure_choice.py index 7b34f8e742..a0bf95e5a3 100644 --- a/activitysim/abm/models/trip_departure_choice.py +++ b/activitysim/abm/models/trip_departure_choice.py @@ -24,10 +24,10 @@ PreprocessorSettings, PydanticCompute, ) +from activitysim.core.exceptions import SegmentedSpecificationError from activitysim.core.skim_dataset import SkimDataset from activitysim.core.skim_dictionary import SkimDict from activitysim.core.util import reindex -from activitysim.core.exceptions import SegmentedSpecificationError logger = logging.getLogger(__name__) @@ -351,37 +351,51 @@ def choose_tour_leg_pattern( column_labels=["alternative", "utility"], ) - # convert to probabilities (utilities exponentiated and normalized to probs) - # probs is same shape as utilities, one row per chooser and one column for alternative - probs = logit.utils_to_probs( - state, 
utilities_df, trace_label=trace_label, trace_choosers=trip_segment - ) + if state.settings.use_explicit_error_terms: + utilities_df = logit.validate_utils( + state, utilities_df, trace_label=trace_label, trace_choosers=trip_segment + ) + # make choices + # positions is series with the chosen alternative represented as a column index in probs + # which is an integer between zero and num alternatives in the alternative sample + positions, rands = logit.make_choices_utility_based( + state, utilities_df, trace_label=trace_label, trace_choosers=trip_segment + ) + + del utilities_df + chunk_sizer.log_df(trace_label, "utilities_df", None) + else: + # convert to probabilities (utilities exponentiated and normalized to probs) + # probs is same shape as utilities, one row per chooser and one column for alternative + probs = logit.utils_to_probs( + state, utilities_df, trace_label=trace_label, trace_choosers=trip_segment + ) - chunk_sizer.log_df(trace_label, "probs", probs) + chunk_sizer.log_df(trace_label, "probs", probs) - del utilities_df - chunk_sizer.log_df(trace_label, "utilities_df", None) + del utilities_df + chunk_sizer.log_df(trace_label, "utilities_df", None) - if have_trace_targets: - state.tracing.trace_df( - probs, - tracing.extend_trace_label(trace_label, "probs"), - column_labels=["alternative", "probability"], + if have_trace_targets: + state.tracing.trace_df( + probs, + tracing.extend_trace_label(trace_label, "probs"), + column_labels=["alternative", "probability"], + ) + + # make choices + # positions is series with the chosen alternative represented as a column index in probs + # which is an integer between zero and num alternatives in the alternative sample + positions, rands = logit.make_choices( + state, probs, trace_label=trace_label, trace_choosers=trip_segment ) - # make choices - # positions is series with the chosen alternative represented as a column index in probs - # which is an integer between zero and num alternatives in the alternative sample 
- positions, rands = logit.make_choices( - state, probs, trace_label=trace_label, trace_choosers=trip_segment - ) + del probs + chunk_sizer.log_df(trace_label, "probs", None) chunk_sizer.log_df(trace_label, "positions", positions) chunk_sizer.log_df(trace_label, "rands", rands) - del probs - chunk_sizer.log_df(trace_label, "probs", None) - # shouldn't have chosen any of the dummy pad utilities assert positions.max() < max_sample_count diff --git a/activitysim/abm/models/trip_destination.py b/activitysim/abm/models/trip_destination.py index 853cfc35e9..3ad35470cd 100644 --- a/activitysim/abm/models/trip_destination.py +++ b/activitysim/abm/models/trip_destination.py @@ -32,6 +32,7 @@ from activitysim.core.configuration.logit import LocationComponentSettings from activitysim.core.interaction_sample import interaction_sample from activitysim.core.interaction_sample_simulate import interaction_sample_simulate +from activitysim.core.logit import AltsContext from activitysim.core.skim_dictionary import DataFrameMatrix from activitysim.core.tracing import print_elapsed_time from activitysim.core.util import assign_in_place, reindex @@ -950,6 +951,7 @@ def trip_destination_simulate( skim_hotel, estimator, trace_label, + alts_context: AltsContext | None = None, ): """ Chose destination from destination_sample (with od_logsum and dp_logsum columns added) @@ -1036,6 +1038,7 @@ def trip_destination_simulate( trace_choice_name="trip_dest", estimator=estimator, explicit_chunk_size=model_settings.explicit_chunk, + alts_context=alts_context, ) if not want_logsums: @@ -1126,7 +1129,10 @@ def choose_trip_destination( destination_sample["dp_logsum"] = 0.0 t0 = print_elapsed_time("%s.compute_logsums" % trace_label, t0, debug=True) - + alt_dest_col_name = model_settings.ALT_DEST_COL_NAME + alts = alternatives.index + assert alts.name == alt_dest_col_name + alts_context = AltsContext.from_series(alts) destinations = trip_destination_simulate( state, primary_purpose=primary_purpose, @@ 
-1138,6 +1144,7 @@ def choose_trip_destination( skim_hotel=skim_hotel, estimator=estimator, trace_label=trace_label, + alts_context=alts_context, ) dropped_trips = ~trips.index.isin(destinations.index) diff --git a/activitysim/abm/models/trip_scheduling_choice.py b/activitysim/abm/models/trip_scheduling_choice.py index 094b00843c..254155c333 100644 --- a/activitysim/abm/models/trip_scheduling_choice.py +++ b/activitysim/abm/models/trip_scheduling_choice.py @@ -19,6 +19,7 @@ PydanticReadable, ) from activitysim.core.interaction_sample_simulate import _interaction_sample_simulate +from activitysim.core.logit import AltsContext from activitysim.core.skim_dataset import SkimDataset from activitysim.core.skim_dictionary import SkimDict @@ -275,7 +276,7 @@ def run_trip_scheduling_choice( choosers, chunk_trace_label, chunk_sizer, - ) in chunk.adaptive_chunked_choosers(state, indirect_tours, trace_label): + ) in chunk.adaptive_chunked_choosers(state, indirect_tours, trace_label, explicit_chunk_size=model_settings.explicit_chunk): # Sort the choosers and get the schedule alternatives choosers = choosers.sort_index() schedules = generate_schedule_alternatives(choosers).sort_index() @@ -302,6 +303,7 @@ def run_trip_scheduling_choice( estimator=None, chunk_sizer=chunk_sizer, compute_settings=model_settings.compute_settings, + alts_context= AltsContext(schedules[SCHEDULE_ID].min(), schedules[SCHEDULE_ID].max()), ) assert len(choices.index) == len(choosers.index) @@ -347,6 +349,13 @@ class TripSchedulingChoiceSettings(PydanticReadable, extra="forbid"): compute_settings: ComputeSettings = ComputeSettings() """Compute settings for this component.""" + explicit_chunk: float = 0 + """ + If > 0, use this chunk size instead of adaptive chunking. + If less than 1, use this fraction of the total number of rows. 
+ """ + + @workflow.step def trip_scheduling_choice( diff --git a/activitysim/abm/models/util/cdap.py b/activitysim/abm/models/util/cdap.py index 21f42de827..48ec7a31a1 100644 --- a/activitysim/abm/models/util/cdap.py +++ b/activitysim/abm/models/util/cdap.py @@ -999,11 +999,18 @@ def household_activity_choices( # add joint util to util utils = utils.add(joint_tour_utils) - probs = logit.utils_to_probs(state, utils, trace_label=trace_label) + if state.settings.use_explicit_error_terms: + utils = logit.validate_utils(state, utils, trace_label=trace_label) - # select an activity pattern alternative for each household based on probability - # result is a series indexed on _hh_index_ with the (0 based) index of the column from probs - idx_choices, rands = logit.make_choices(state, probs, trace_label=trace_label) + idx_choices, rands = logit.make_choices_utility_based( + state, utils, trace_label=trace_label + ) + else: + probs = logit.utils_to_probs(state, utils, trace_label=trace_label) + + # select an activity pattern alternative for each household based on probability + # result is a series indexed on _hh_index_ with the (0 based) index of the column from probs + idx_choices, rands = logit.make_choices(state, probs, trace_label=trace_label) # convert choice expressed as index into alternative name from util column label choices = pd.Series(utils.columns[idx_choices].values, index=utils.index) @@ -1021,16 +1028,20 @@ def household_activity_choices( "%s.hhsize%d_utils" % (trace_label, hhsize), column_labels=["expression", "household"], ) - state.tracing.trace_df( - probs, - "%s.hhsize%d_probs" % (trace_label, hhsize), - column_labels=["expression", "household"], - ) + + if not state.settings.use_explicit_error_terms: + state.tracing.trace_df( + probs, + "%s.hhsize%d_probs" % (trace_label, hhsize), + column_labels=["expression", "household"], + ) + state.tracing.trace_df( choices, "%s.hhsize%d_activity_choices" % (trace_label, hhsize), column_labels=["expression", 
"household"], ) + state.tracing.trace_df( rands, "%s.hhsize%d_rands" % (trace_label, hhsize), columns=[None, "rand"] ) diff --git a/activitysim/abm/tables/shadow_pricing.py b/activitysim/abm/tables/shadow_pricing.py index f993df445c..091aaf7504 100644 --- a/activitysim/abm/tables/shadow_pricing.py +++ b/activitysim/abm/tables/shadow_pricing.py @@ -171,6 +171,7 @@ def __init__( self.num_processes = num_processes self.use_shadow_pricing = bool(state.settings.use_shadow_pricing) + logger.info("Use shadow pricing: %s", self.use_shadow_pricing) self.saved_shadow_price_file_path = ( None # set by read_saved_shadow_prices if loaded ) diff --git a/activitysim/core/chunk.py b/activitysim/core/chunk.py index f0074683d8..019f6abaef 100644 --- a/activitysim/core/chunk.py +++ b/activitysim/core/chunk.py @@ -1244,6 +1244,7 @@ def adaptive_chunked_choosers( chunk_size = math.ceil(num_choosers * explicit_chunk_size) else: chunk_size = math.ceil(explicit_chunk_size / num_processes) + logger.info(f"Running {trace_label} with {chunk_size} choosers") elif chunk_size is None: chunk_size = state.settings.chunk_size diff --git a/activitysim/core/configuration/base.py b/activitysim/core/configuration/base.py index 6936e6d1ad..aad266ff12 100644 --- a/activitysim/core/configuration/base.py +++ b/activitysim/core/configuration/base.py @@ -135,6 +135,10 @@ class ComputeSettings(PydanticBase): Sharrow settings for a component. """ + # Make this more general compute settings and use for explicit error term overrides + # Default None work for sub-components defined in getter below (eet_subcomponent) + use_explicit_error_terms: None | bool | dict[str, bool] = None + sharrow_skip: bool | dict[str, bool] = False """Skip sharrow when evaluating this component. 
@@ -218,6 +222,13 @@ def should_skip(self, subcomponent: str) -> bool: else: return bool(self.sharrow_skip) + def eet_subcomponent(self, subcomponent: str) -> bool: + """Check for EET overrides for a particular subcomponent.""" + if isinstance(self.use_explicit_error_terms, dict): + return self.use_explicit_error_terms.get(subcomponent, None) + else: + return self.use_explicit_error_terms + @contextmanager def pandas_option_context(self): """Context manager to set pandas options for compute settings.""" @@ -266,6 +277,7 @@ def subcomponent_settings(self, subcomponent: str) -> ComputeSettings: use_numba=self.use_numba, drop_unused_columns=self.drop_unused_columns, protect_columns=self.protect_columns, + use_explicit_error_terms=self.eet_subcomponent(subcomponent), ) diff --git a/activitysim/core/configuration/top.py b/activitysim/core/configuration/top.py index 4055608100..bded7123d9 100644 --- a/activitysim/core/configuration/top.py +++ b/activitysim/core/configuration/top.py @@ -701,6 +701,7 @@ def _check_store_skims_in_shm(self): "memory_profile", "instrument", "sharrow", + "use_explicit_error_terms", ) """ Setting to log on startup. @@ -778,7 +779,18 @@ def _check_store_skims_in_shm(self): """ run checks to validate that YAML settings files are loadable and spec and coefficent csv can be resolved. - should catch many common errors early, including missing required configurations or specified coefficient labels without defined values. + should catch many common errors early, including missing required configurations or specified coefficient labels without defined values. + """ + + use_explicit_error_terms: bool = False + """ + Make choice from random utility model by drawing from distribution of unobserved + part of utility and taking the maximum of total utility. + + Defaults to standard Monte Carlo method, i.e., calculating probabilities and then + drawing a single uniform random number to draw from cumulative probability. + + .. 
versionadded:: 1.x """ other_settings: dict[str, Any] = None diff --git a/activitysim/core/interaction_sample.py b/activitysim/core/interaction_sample.py index 52116638ba..e66840778a 100644 --- a/activitysim/core/interaction_sample.py +++ b/activitysim/core/interaction_sample.py @@ -3,10 +3,10 @@ from __future__ import annotations import logging +import typing import numpy as np import pandas as pd - from activitysim.core import ( chunk, estimation, @@ -17,16 +17,143 @@ util, workflow, ) +from activitysim.core.chunk import ChunkSizer from activitysim.core.configuration.base import ComputeSettings +from activitysim.core.exceptions import SegmentedSpecificationError from activitysim.core.skim_dataset import DatasetWrapper from activitysim.core.skim_dictionary import SkimWrapper -from activitysim.core.exceptions import SegmentedSpecificationError +if typing.TYPE_CHECKING: + from activitysim.core.random import Random logger = logging.getLogger(__name__) DUMP = False +def _poisson_sample_alternatives_inner( + alternative_count: int, + probs: pd.DataFrame, + poisson_inclusion_probs: pd.DataFrame, + rng: Random, + trace_label: str | None, + chunk_sizer:ChunkSizer, +) -> pd.DataFrame: + rands = rng.random_for_df(probs, n=alternative_count) + chunk_sizer.log_df(trace_label, "rands", rands) + sampled_mask = rands < poisson_inclusion_probs + sampled_results = poisson_inclusion_probs.where(sampled_mask) + return sampled_results + + +def make_sample_choices_utility_based( + state: workflow.State, + choosers, + utilities, + alternatives, + sample_size, + alternative_count, + alt_col_name, + allow_zero_probs, + trace_label:str, + chunk_sizer:ChunkSizer, +): + assert isinstance(utilities, pd.DataFrame) + assert utilities.shape == (len(choosers), alternative_count) + + assert isinstance(alternatives, pd.DataFrame) + assert len(alternatives) == alternative_count + + if allow_zero_probs: + zero_probs = ( + utilities.sum(axis=1) <= utilities.shape[1] * logit.UTIL_UNAVAILABLE + ) + if 
zero_probs.all(): + utils = pd.DataFrame( + columns=[alt_col_name, "rand", "prob", choosers.index.name] + ) + probs = pd.DataFrame(0.0, index=utils.index, columns=utils.columns) + return utils, probs + if zero_probs.any(): + # remove from sample + utilities = utilities[~zero_probs] + choosers = choosers[~zero_probs] + + utils_array = utilities.to_numpy() + chunk_sizer.log_df(trace_label, "utils_array", utils_array) + + probs = logit.utils_to_probs( + state, + utilities, + allow_zero_probs=allow_zero_probs, + trace_label=trace_label, + overflow_protection=not allow_zero_probs, + trace_choosers=choosers, + ) + inclusion_probs, sampled_alternatives = _poisson_sample_alternatives(alternative_count, chunk_sizer, probs, + sample_size, state, trace_label) + + # Stack removes the NaNs (the ones that weren't sampled) + # and gives us a multi-index of (person_id, alt_id) + choices_df = ( + sampled_alternatives.rename_axis("alt_idx", axis=1) + .stack() + .reset_index(name="prob") + .assign(**{alt_col_name: lambda df: alternatives.index.values[df["alt_idx"]]}) + .drop(columns=["alt_idx"]) + ) + + # Here we return the inclusion probabilities i.e. the true probability of being sampled and (ab)use the fact + # that pick_count=1 by definition and ln(1)=0 and recover the standard sample correction term. + # In non-Poisson sampling, we would return the probs of sampling an alternative once + # and the sampling correction factor np.log(df.pick_count/df.prob) is applied to the simulate utilities. + # TODO is it safe change the meaning of df.prob, given it's referenced in expression csvs? + # (but the alternative is to update all the expression CSV for sampling?) 
+ return choices_df, inclusion_probs + + +def _poisson_sample_alternatives(alternative_count, chunk_sizer: ChunkSizer, probs: pd.DataFrame, sample_size, + state: workflow.State, trace_label: str) -> tuple[pd.DataFrame, pd.DataFrame]: + # compute the inclusion probability as the reciprocal of alt never being drawn + # -- these are common, so compute once upfront + exclusion_probs = (1 - probs) ** sample_size + inclusion_probs = 1 - exclusion_probs + + n = 0 + probs_subset = probs + inclusion_probs_subset = inclusion_probs + sampled_alternatives = pd.DataFrame(0.0, index=inclusion_probs.index, columns=inclusion_probs.columns) + while True: + sampled_results_subset = _poisson_sample_alternatives_inner( + alternative_count, probs_subset, inclusion_probs_subset, state.get_rn_generator(), trace_label, chunk_sizer + ) + no_alts_sampled_mask = sampled_results_subset.isna().all(axis=1) + alts_with_sampled_alternatives = sampled_results_subset[~no_alts_sampled_mask] + sampled_alternatives.loc[alts_with_sampled_alternatives.index, :] = alts_with_sampled_alternatives + if no_alts_sampled_mask.any(): + # TODO if this happens in base but the project case is such that something is picked, random numbers won't + # be consistent - we're asserting that this is very rare models where the sample size is not too small + logger.info(f"Poisson sampling of alternatives failed with {n=}, retrying") + # TODO put this behind a debug guard, because it will be slow + logger.info( + f"Sampled size was {sample_size}, poisson method mean expected sample size was {inclusion_probs.sum(axis=1).mean():.1f}, actual sampled mean was {(sampled_alternatives > 0).sum(axis=1).mean():.1f} and highest zero selection prob was {(exclusion_probs).product(axis=1).max():.2g}") + probs_subset = probs[no_alts_sampled_mask] + inclusion_probs_subset = inclusion_probs[no_alts_sampled_mask] + + else: # All alternatives are fine + break + + n += 1 + if n == 10: + choosers_no_alts_sampled = 
sampled_results_subset[no_alts_sampled_mask] + msg = (f"Poisson choice set sampling failed after 10 attempts for these cases:\n" + f"{choosers_no_alts_sampled}\n{probs_subset}") + raise ValueError(msg) + + chunk_sizer.log_df(trace_label, "sampled_alternatives", sampled_alternatives) + + return inclusion_probs, sampled_alternatives + + def make_sample_choices( state: workflow.State, choosers, @@ -135,7 +262,7 @@ def _interaction_sample( locals_d=None, trace_label=None, zone_layer=None, - chunk_sizer=None, + chunk_sizer: ChunkSizer|None=None, compute_settings: ComputeSettings | None = None, ): """ @@ -191,15 +318,18 @@ def _interaction_sample( choices_df : pandas.DataFrame A DataFrame where index should match the index of the choosers DataFrame - and columns alt_col_name, prob, rand, pick_count + and columns alt_col_name, prob, pick_count + alt_col_name: int + the identifier of the alternatives prob: float the probability of the chosen alternative - rand: float - the rand that did the choosing pick_count : int number of duplicate picks for chooser, alt """ + assert chunk_sizer is not None, "chunk_sizer cannot be None but old nullable signature is preserved" + # TODO it's probably safe to reorder these arguments to make chunk_sizer mandatory since + # _interaction_sample is private? 
have_trace_targets = state.tracing.has_trace_targets(choosers) trace_ids = None @@ -443,54 +573,55 @@ def _interaction_sample( state.tracing.dump_df(DUMP, utilities, trace_label, "utilities") - # convert to probabilities (utilities exponentiated and normalized to probs) - # probs is same shape as utilities, one row per chooser and one column for alternative - probs = logit.utils_to_probs( - state, - utilities, - allow_zero_probs=allow_zero_probs, - trace_label=trace_label, - trace_choosers=choosers, - overflow_protection=not allow_zero_probs, - ) - chunk_sizer.log_df(trace_label, "probs", probs) - - del utilities - chunk_sizer.log_df(trace_label, "utilities", None) - - if have_trace_targets: - state.tracing.trace_df( - probs, - tracing.extend_trace_label(trace_label, "probs"), - column_labels=["alternative", "probability"], + if compute_settings.use_explicit_error_terms is not None: + use_eet = compute_settings.use_explicit_error_terms + logger.info( + f"Interaction sample model-specific EET overrides for {trace_label}: eet = {use_eet}" ) + else: + use_eet = state.settings.use_explicit_error_terms if sample_size == 0: - # FIXME return full alternative set rather than sample - logger.info( - "Estimation mode for %s using unsampled alternatives" % (trace_label,) - ) + # Return full alternative set rather than sample + logger.info("Using unsampled alternatives for %s" % (trace_label,)) - index_name = probs.index.name + index_name = utilities.index.name choices_df = ( - pd.melt(probs.reset_index(), id_vars=[index_name]) + pd.melt( + utilities.reset_index(), + id_vars=[index_name], + value_name="prob", + var_name=alt_col_name, + ) .sort_values(by=index_name, kind="mergesort") .set_index(index_name) - .rename(columns={"value": "prob"}) - .drop(columns="variable") + .assign(prob=1) + .assign(pick_count=1) ) + chunk_sizer.log_df(trace_label, "choices_df", choices_df) - choices_df["pick_count"] = 1 - choices_df.insert( - 0, alt_col_name, np.tile(alternatives.index.values, 
len(choosers.index)) - ) + # utilities are numbered 0..n-1 so we need to map back to alt ids + alternative_map = pd.Series(alternatives.index).to_dict() + choices_df[alt_col_name] = choices_df[alt_col_name].map(alternative_map) + + del utilities + chunk_sizer.log_df(trace_label, "utilities", None) return choices_df - else: - choices_df = make_sample_choices( + + if use_eet: + utilities = logit.validate_utils( + state, + utilities, + allow_zero_probs=allow_zero_probs, + trace_label=trace_label, + trace_choosers=choosers, + ) + + choices_df, probs = make_sample_choices_utility_based( state, choosers, - probs, + utilities, alternatives, sample_size, alternative_count, @@ -500,45 +631,77 @@ def _interaction_sample( chunk_sizer=chunk_sizer, ) - chunk_sizer.log_df(trace_label, "choices_df", choices_df) + del utilities + chunk_sizer.log_df(trace_label, "utilities", None) - if estimation.manager.enabled and sample_size > 0: - # we need to ensure chosen alternative is included in the sample - survey_choices = estimation.manager.get_survey_destination_choices( - state, choosers, trace_label - ) - if survey_choices is not None: - assert ( - survey_choices.index == choosers.index - ).all(), "survey_choices and choosers must have the same index" - survey_choices.name = alt_col_name - survey_choices = survey_choices.dropna().astype( - choices_df[alt_col_name].dtype + if estimation.manager.enabled and sample_size > 0: + + choices_df = _ensure_chosen_alts_in_sample( + alt_col_name, + alternatives, + choices_df, + choosers, + probs, + state, + trace_label, ) - # merge all survey choices onto choices_df - probs_df = probs.reset_index().melt( - id_vars=[choosers.index.name], - var_name=alt_col_name, - value_name="prob", + del probs + chunk_sizer.log_df(trace_label, "probs", None) + + else: + # convert to probabilities (utilities exponentiated and normalized to probs) + # probs is same shape as utilities, one row per chooser and one column for alternative + probs = 
logit.utils_to_probs( + state, + utilities, + allow_zero_probs=allow_zero_probs, + trace_label=trace_label, + trace_choosers=choosers, + overflow_protection=not allow_zero_probs, + ) + chunk_sizer.log_df(trace_label, "probs", probs) + + del utilities + chunk_sizer.log_df(trace_label, "utilities", None) + + if have_trace_targets: + state.tracing.trace_df( + probs, + tracing.extend_trace_label(trace_label, "probs"), + column_labels=["alternative", "probability"], ) - # probs are numbered 0..n-1 so we need to map back to alt ids - zone_map = pd.Series(alternatives.index).to_dict() - probs_df[alt_col_name] = probs_df[alt_col_name].map(zone_map) - - survey_choices = pd.merge( - survey_choices, - probs_df, - on=[choosers.index.name, alt_col_name], - how="left", + + choices_df = make_sample_choices( + state, + choosers, + probs, + alternatives, + sample_size, + alternative_count, + alt_col_name, + allow_zero_probs=allow_zero_probs, + trace_label=trace_label, + chunk_sizer=chunk_sizer, + ) + + chunk_sizer.log_df(trace_label, "choices_df", choices_df) + + if estimation.manager.enabled and sample_size > 0: + choices_df = _ensure_chosen_alts_in_sample( + alt_col_name, + alternatives, + choices_df, + choosers, + probs, + state, + trace_label, ) - survey_choices["rand"] = 0 - survey_choices["prob"].fillna(0, inplace=True) - choices_df = pd.concat([choices_df, survey_choices], ignore_index=True) - choices_df.sort_values(by=[choosers.index.name], inplace=True) - del probs - chunk_sizer.log_df(trace_label, "probs", None) + del probs + chunk_sizer.log_df(trace_label, "probs", None) + + chunk_sizer.log_df(trace_label, "choices_df", choices_df) # pick_count and pick_dup # pick_count is number of duplicate picks @@ -570,8 +733,10 @@ def _interaction_sample( column_labels=["sample_alt", "alternative"], ) - # don't need this after tracing - del choices_df["rand"] + if not state.settings.use_explicit_error_terms: + # don't need this after tracing + del choices_df["rand"] + 
chunk_sizer.log_df(trace_label, "choices_df", choices_df) # - NARROW @@ -582,6 +747,49 @@ def _interaction_sample( return choices_df +def _ensure_chosen_alts_in_sample( + alt_col_name, + alternatives: pd.DataFrame, + choices_df: pd.DataFrame, + choosers: pd.DataFrame, + probs: pd.DataFrame, + state: workflow.State, + trace_label:str, +) -> pd.DataFrame: + # we need to ensure chosen alternative is included in the sample + survey_choices = estimation.manager.get_survey_destination_choices( + state, choosers, trace_label + ) + if survey_choices is not None: + assert ( + survey_choices.index == choosers.index + ).all(), "survey_choices and choosers must have the same index" + survey_choices.name = alt_col_name + survey_choices = survey_choices.dropna().astype(choices_df[alt_col_name].dtype) + + # merge all survey choices onto choices_df + probs_df = probs.reset_index().melt( + id_vars=[choosers.index.name], + var_name=alt_col_name, + value_name="prob", + ) + # probs are numbered 0..n-1 so we need to map back to alt ids + zone_map = pd.Series(alternatives.index).to_dict() + probs_df[alt_col_name] = probs_df[alt_col_name].map(zone_map) + + survey_choices = pd.merge( + survey_choices, + probs_df, + on=[choosers.index.name, alt_col_name], + how="left", + ) + survey_choices["rand"] = 0 + survey_choices["prob"].fillna(0, inplace=True) + choices_df = pd.concat([choices_df, survey_choices], ignore_index=True) + choices_df.sort_values(by=[choosers.index.name], inplace=True) + return choices_df + + def interaction_sample( state: workflow.State, choosers: pd.DataFrame, @@ -674,7 +882,14 @@ def interaction_sample( assert choosers.index.is_monotonic_increasing # FIXME - legacy logic - not sure this is needed or even correct? 
- sample_size = min(sample_size, len(alternatives.index)) + if not state.settings.use_explicit_error_terms: + sample_size = min(sample_size, len(alternatives.index)) + # with poisson sampling, definitely don't want to reduce sample size - it's not a sample size but a number + # of theoretical draws. Another options would be to disable sampling if # alts < sample size to ensure + # all are included (but this wouldn't behave well if there were land use changes in the project case which + # switched regimes) + + logger.info(f" --- interaction_sample sample size = {sample_size}") result_list = [] for ( diff --git a/activitysim/core/interaction_sample_simulate.py b/activitysim/core/interaction_sample_simulate.py index df1c53fa03..2dbd52e871 100644 --- a/activitysim/core/interaction_sample_simulate.py +++ b/activitysim/core/interaction_sample_simulate.py @@ -9,8 +9,9 @@ from activitysim.core import chunk, interaction_simulate, logit, tracing, util, workflow from activitysim.core.configuration.base import ComputeSettings -from activitysim.core.simulate import set_skim_wrapper_targets from activitysim.core.exceptions import SegmentedSpecificationError +from activitysim.core.logit import AltsContext +from activitysim.core.simulate import set_skim_wrapper_targets logger = logging.getLogger(__name__) @@ -34,6 +35,7 @@ def _interaction_sample_simulate( *, chunk_sizer: chunk.ChunkSizer, compute_settings: ComputeSettings | None = None, + alts_context: AltsContext | None = None, ): """ Run a MNL simulation in the situation in which alternatives must @@ -220,9 +222,6 @@ def _interaction_sample_simulate( ) chunk_sizer.log_df(trace_label, "interaction_utilities", interaction_utilities) - del interaction_df - chunk_sizer.log_df(trace_label, "interaction_df", None) - if have_trace_targets: state.tracing.trace_interaction_eval_results( trace_eval_results, @@ -258,26 +257,37 @@ def _interaction_sample_simulate( # (we want to insert dummy utilities at the END of the list of alternative 
utilities) # inserts is a list of the indices at which we want to do the insertions inserts = np.repeat(last_row_offsets, max_sample_count - sample_counts) - + del last_row_offsets del sample_counts chunk_sizer.log_df(trace_label, "sample_counts", None) # insert the zero-prob utilities to pad each alternative set to same size padded_utilities = np.insert(interaction_utilities.utility.values, inserts, -999) + padded_alt_nrs = np.insert(interaction_df[choice_column], inserts, -999) chunk_sizer.log_df(trace_label, "padded_utilities", padded_utilities) del inserts - + del interaction_df del interaction_utilities + # TODO EET: chunk_sizer logging could be more precise + chunk_sizer.log_df(trace_label, "interaction_df", None) chunk_sizer.log_df(trace_label, "interaction_utilities", None) # reshape to array with one row per chooser, one column per alternative padded_utilities = padded_utilities.reshape(-1, max_sample_count) + padded_alt_nrs = padded_alt_nrs.reshape(-1, max_sample_count) # convert to a dataframe with one row per chooser and one column per alternative utilities_df = pd.DataFrame(padded_utilities, index=choosers.index) + # alt_nrs_df has columns for each alt in the choice set, with values indicating which alt_id + # they correspond to (as opposed to the 0-n index implied by the column number). 
+ if alts_context is not None: + alt_nrs_df = pd.DataFrame(padded_alt_nrs, index=choosers.index) + else: + alt_nrs_df = None # if we don't provide the number of dense alternatives, assume that we'll use the old approach chunk_sizer.log_df(trace_label, "utilities_df", utilities_df) del padded_utilities + del padded_alt_nrs chunk_sizer.log_df(trace_label, "padded_utilities", None) if have_trace_targets: @@ -287,50 +297,89 @@ def _interaction_sample_simulate( column_labels=["alternative", "utility"], ) - # convert to probabilities (utilities exponentiated and normalized to probs) - # probs is same shape as utilities, one row per chooser and one column for alternative - if want_logsums: - probs, logsums = logit.utils_to_probs( + if state.settings.use_explicit_error_terms: + if want_logsums: + logsums = logit.utils_to_logsums( + utilities_df, allow_zero_probs=allow_zero_probs + ) + chunk_sizer.log_df(trace_label, "logsums", logsums) + + if skip_choice: + return choosers.join(logsums.to_frame("logsums")) + + utilities_df = logit.validate_utils( state, utilities_df, allow_zero_probs=allow_zero_probs, trace_label=trace_label, trace_choosers=choosers, - overflow_protection=not allow_zero_probs, - return_logsums=True, ) - chunk_sizer.log_df(trace_label, "logsums", logsums) - else: - probs = logit.utils_to_probs( - state, - utilities_df, - allow_zero_probs=allow_zero_probs, - trace_label=trace_label, - trace_choosers=choosers, - overflow_protection=not allow_zero_probs, + + if allow_zero_probs: + zero_probs = ( + utilities_df.sum(axis=1) + <= utilities_df.shape[1] * logit.UTIL_UNAVAILABLE + ) + if zero_probs.any(): + # copied from proabability below, fix when that gets fixed + # FIXME this is kind of gnarly, but we force choice of first alt + utilities_df.loc[ + zero_probs, 0 + ] = 3.0 # arbitrary value much larger than UTIL_UNAVAILABLE + + # positions is series with the chosen alternative represented as a column index in utilities_df + # which is an integer between zero and 
num alternatives in the alternative sample + positions, rands = logit.make_choices_utility_based( + state, utilities_df, trace_label=trace_label, trace_choosers=choosers, alts_context=alts_context, + alt_nrs_df=alt_nrs_df ) - chunk_sizer.log_df(trace_label, "probs", probs) - del utilities_df - chunk_sizer.log_df(trace_label, "utilities_df", None) + del utilities_df + chunk_sizer.log_df(trace_label, "utilities_df", None) + else: + # convert to probabilities (utilities exponentiated and normalized to probs) + # probs is same shape as utilities, one row per chooser and one column for alternative + if want_logsums: + probs, logsums = logit.utils_to_probs( + state, + utilities_df, + allow_zero_probs=allow_zero_probs, + trace_label=trace_label, + trace_choosers=choosers, + overflow_protection=not allow_zero_probs, + return_logsums=True, + ) + chunk_sizer.log_df(trace_label, "logsums", logsums) + else: + probs = logit.utils_to_probs( + state, + utilities_df, + allow_zero_probs=allow_zero_probs, + trace_label=trace_label, + trace_choosers=choosers, + overflow_protection=not allow_zero_probs, + ) + chunk_sizer.log_df(trace_label, "probs", probs) + + del utilities_df + chunk_sizer.log_df(trace_label, "utilities_df", None) - if have_trace_targets: - state.tracing.trace_df( - probs, - tracing.extend_trace_label(trace_label, "probs"), - column_labels=["alternative", "probability"], - ) + if have_trace_targets: + state.tracing.trace_df( + probs, + tracing.extend_trace_label(trace_label, "probs"), + column_labels=["alternative", "probability"], + ) - if allow_zero_probs: - zero_probs = probs.sum(axis=1) == 0 - if zero_probs.any(): - # FIXME this is kind of gnarly, but we force choice of first alt - probs.loc[zero_probs, 0] = 1.0 + if allow_zero_probs: + zero_probs = probs.sum(axis=1) == 0 + if zero_probs.any(): + # FIXME this is kind of gnarly, but we force choice of first alt + probs.loc[zero_probs, 0] = 1.0 - if skip_choice: - return choosers.join(logsums.to_frame("logsums")) + 
if skip_choice: + return choosers.join(logsums.to_frame("logsums")) - else: # make choices # positions is series with the chosen alternative represented as a column index in probs # which is an integer between zero and num alternatives in the alternative sample @@ -338,59 +387,59 @@ def _interaction_sample_simulate( state, probs, trace_label=trace_label, trace_choosers=choosers ) - chunk_sizer.log_df(trace_label, "positions", positions) - chunk_sizer.log_df(trace_label, "rands", rands) - del probs chunk_sizer.log_df(trace_label, "probs", None) - # shouldn't have chosen any of the dummy pad utilities - assert positions.max() < max_sample_count + chunk_sizer.log_df(trace_label, "positions", positions) + chunk_sizer.log_df(trace_label, "rands", rands) - # need to get from an integer offset into the alternative sample to the alternative index - # that is, we want the index value of the row that is offset by rows into the - # tranche of this choosers alternatives created by cross join of alternatives and choosers + # shouldn't have chosen any of the dummy pad utilities + assert positions.max() < max_sample_count - # resulting pandas Int64Index has one element per chooser row and is in same order as choosers - choices = alternatives[choice_column].take(positions + first_row_offsets) + # need to get from an integer offset into the alternative sample to the alternative index + # that is, we want the index value of the row that is offset by rows into the + # tranche of this choosers alternatives created by cross join of alternatives and choosers - # create a series with index from choosers and the index of the chosen alternative - choices = pd.Series(choices, index=choosers.index) + # resulting pandas Int64Index has one element per chooser row and is in same order as choosers + choices = alternatives[choice_column].take(positions + first_row_offsets) - chunk_sizer.log_df(trace_label, "choices", choices) + # create a series with index from choosers and the index of the 
chosen alternative + choices = pd.Series(choices, index=choosers.index) - if allow_zero_probs and zero_probs.any() and zero_prob_choice_val is not None: - # FIXME this is kind of gnarly, patch choice for zero_probs - choices.loc[zero_probs] = zero_prob_choice_val + chunk_sizer.log_df(trace_label, "choices", choices) - if have_trace_targets: - state.tracing.trace_df( - choices, - tracing.extend_trace_label(trace_label, "choices"), - columns=[None, trace_choice_name], - ) + if allow_zero_probs and zero_probs.any() and zero_prob_choice_val is not None: + # FIXME this is kind of gnarly, patch choice for zero_probs + choices.loc[zero_probs] = zero_prob_choice_val + + if have_trace_targets: + state.tracing.trace_df( + choices, + tracing.extend_trace_label(trace_label, "choices"), + columns=[None, trace_choice_name], + ) + state.tracing.trace_df( + rands, + tracing.extend_trace_label(trace_label, "rands"), + columns=[None, "rand"], + ) + if want_logsums: state.tracing.trace_df( - rands, - tracing.extend_trace_label(trace_label, "rands"), - columns=[None, "rand"], + logsums, + tracing.extend_trace_label(trace_label, "logsum"), + columns=[None, "logsum"], ) - if want_logsums: - state.tracing.trace_df( - logsums, - tracing.extend_trace_label(trace_label, "logsum"), - columns=[None, "logsum"], - ) - if want_logsums: - choices = choices.to_frame("choice") - choices["logsum"] = logsums + if want_logsums: + choices = choices.to_frame("choice") + choices["logsum"] = logsums - chunk_sizer.log_df(trace_label, "choices", choices) + chunk_sizer.log_df(trace_label, "choices", choices) - # handing this off to our caller - chunk_sizer.log_df(trace_label, "choices", None) + # handing this off to our caller + chunk_sizer.log_df(trace_label, "choices", None) - return choices + return choices def interaction_sample_simulate( @@ -413,6 +462,7 @@ def interaction_sample_simulate( skip_choice=False, explicit_chunk_size=0, *, + alts_context: AltsContext | None = None, compute_settings: 
ComputeSettings | None = None, ): """ @@ -458,6 +508,12 @@ def interaction_sample_simulate( explicit_chunk_size : float, optional If > 0, specifies the chunk size to use when chunking the interaction simulation. If < 1, specifies the fraction of the total number of choosers. + alts_context: int, optional + The number of alternatives available in the choice set in the absense of sampling. + This is used with EET simulation to ensure consistent random numbers across the whole alternative set + ( as the sampled set may change between base and project). When not provided, + the fallback approach is used which may result in frozen error terms being applied to the wrong alternatives + if the choice set changes. Returns ------- @@ -513,6 +569,7 @@ def interaction_sample_simulate( skip_choice, chunk_sizer=chunk_sizer, compute_settings=compute_settings, + alts_context=alts_context, ) result_list.append(choices) diff --git a/activitysim/core/interaction_simulate.py b/activitysim/core/interaction_simulate.py index 0d84241395..ef6bbb1217 100644 --- a/activitysim/core/interaction_simulate.py +++ b/activitysim/core/interaction_simulate.py @@ -15,8 +15,8 @@ from activitysim.core import chunk, logit, simulate, timing, tracing, util, workflow from activitysim.core.configuration.base import ComputeSettings -from activitysim.core.fast_eval import fast_eval from activitysim.core.exceptions import SegmentedSpecificationError +from activitysim.core.fast_eval import fast_eval logger = logging.getLogger(__name__) @@ -904,29 +904,42 @@ def _interaction_simulate( state.tracing.dump_df(DUMP, utilities, trace_label, "utilities") - # convert to probabilities (utilities exponentiated and normalized to probs) - # probs is same shape as utilities, one row per chooser and one column for alternative - probs = logit.utils_to_probs( - state, utilities, trace_label=trace_label, trace_choosers=choosers - ) - chunk_sizer.log_df(trace_label, "probs", probs) + if state.settings.use_explicit_error_terms: + 
utilities = logit.validate_utils( + state, utilities, trace_label=trace_label, trace_choosers=choosers + ) + positions, rands = logit.make_choices_utility_based( + state, utilities, trace_label=trace_label, trace_choosers=choosers + ) - del utilities - chunk_sizer.log_df(trace_label, "utilities", None) + del utilities + chunk_sizer.log_df(trace_label, "utilities", None) - if have_trace_targets: - state.tracing.trace_df( - probs, - tracing.extend_trace_label(trace_label, "probs"), - column_labels=["alternative", "probability"], + else: + # convert to probabilities (utilities exponentiated and normalized to probs) + # probs is same shape as utilities, one row per chooser and one column for alternative + probs = logit.utils_to_probs( + state, utilities, trace_label=trace_label, trace_choosers=choosers + ) + chunk_sizer.log_df(trace_label, "probs", probs) + + del utilities + chunk_sizer.log_df(trace_label, "utilities", None) + + if have_trace_targets: + state.tracing.trace_df( + probs, + tracing.extend_trace_label(trace_label, "probs"), + column_labels=["alternative", "probability"], + ) + + # make choices + # positions is series with the chosen alternative represented as a column index in probs + # which is an integer between zero and num alternatives in the alternative sample + positions, rands = logit.make_choices( + state, probs, trace_label=trace_label, trace_choosers=choosers ) - # make choices - # positions is series with the chosen alternative represented as a column index in probs - # which is an integer between zero and num alternatives in the alternative sample - positions, rands = logit.make_choices( - state, probs, trace_label=trace_label, trace_choosers=choosers - ) chunk_sizer.log_df(trace_label, "positions", positions) chunk_sizer.log_df(trace_label, "rands", rands) diff --git a/activitysim/core/logit.py b/activitysim/core/logit.py index 105e18fecc..8ab8ee1cf3 100644 --- a/activitysim/core/logit.py +++ b/activitysim/core/logit.py @@ -4,6 +4,8 @@ import 
logging import warnings +from dataclasses import dataclass +from typing import Union import numpy as np import pandas as pd @@ -22,6 +24,11 @@ EXP_UTIL_MIN = 1e-300 EXP_UTIL_MAX = np.inf +# TODO-EET: Figure out what type we want UTIL_MIN to be, currently np.float64 +UTIL_MIN = np.log(EXP_UTIL_MIN, dtype=np.float64) +UTIL_UNAVAILABLE = 1000.0 * (UTIL_MIN - 1.0) + + PROB_MIN = 0.0 PROB_MAX = 1.0 @@ -128,6 +135,70 @@ def utils_to_logsums(utils, exponentiated=False, allow_zero_probs=False): return logsums +def validate_utils( + state: workflow.State, + utils, + trace_label=None, + allow_zero_probs=False, + trace_choosers=None, +): + """ + Validate utilities to ensure non-available choices are treated the same in EET and MC. + For EET decisions, no conversion to probabilities is required because choices + are made on the basis of comparing utilities (only differences matter). + However, large negative utility values are used in practice to make choices + unavailable based on probability calculations, which boils down to evaluating + exp(utility). We here use this to define a minimum utility that corresponds + to an unavailable choice. + + Parameters + ---------- + utils : pandas.DataFrame + Rows should be choosers and columns should be alternatives. + + trace_label : str, optional + label for tracing bad utility or probability values + + allow_zero_probs : bool + if True value rows in which all utility alts are UTIL_MIN will be set to + UTIL_UNAVAILABLE. 
+ + trace_choosers : pandas.dataframe + the choosers df (for interaction_simulate) to facilitate the reporting of hh_id + by report_bad_choices because it can't deduce hh_id from the interaction_dataset + which is indexed on index values from alternatives df + + Returns + ------- + utils : pandas.DataFrame + utils with values that would lead to zero probability replaced by UTIL_UNAVAILABLE + + """ + trace_label = tracing.extend_trace_label(trace_label, "validate_utils") + + utils_arr = utils.values + + np.putmask(utils_arr, utils_arr <= UTIL_MIN, UTIL_UNAVAILABLE) + + arr_sum = utils_arr.sum(axis=1) + + if not allow_zero_probs: + zero_probs = arr_sum <= utils_arr.shape[1] * UTIL_UNAVAILABLE + if zero_probs.any(): + report_bad_choices( + state, + zero_probs, + utils, + trace_label=tracing.extend_trace_label(trace_label, "zero_prob_utils"), + msg="all probabilities are zero", + trace_choosers=trace_choosers, + ) + + utils = pd.DataFrame(utils_arr, columns=utils.columns, index=utils.index) + + return utils + + def utils_to_probs( state: workflow.State, utils, @@ -274,6 +345,171 @@ def utils_to_probs( return probs, logsums return probs +FREEZE_RANDOM_NUMBERS_FOR_DENSE_ALTERNATIVE_SET = True + +@dataclass +class AltsContext: + """Representation of the alternatives without carrying around that full array.""" + min_alt_id: int + max_alt_id: int + + def __post_init__(self): + # e.g. 
for zero based zones max_alt_id = n_alts - 1 + # but for 1 based zones, we don't need to add extra padding + self.n_rands_to_sample = max(self.max_alt_id, self.n_alts_to_cover_max_id) + + @classmethod + def from_series(cls, ser:Union[pd.Series,pd.Index])->"AltsContext": + min_alt_id = ser.min() + max_alt_id = ser.max() + return cls(min_alt_id, max_alt_id) + + @classmethod + def from_num_alts(cls, num_alts:int, zero_based:bool=True)->"AltsContext": + if zero_based: + offset = -1 + else: + offset =0 + return cls(min_alt_id=1+offset, max_alt_id=num_alts+offset ) + + + @property + def n_alts_to_cover_max_id(self) -> int: + """If zones were non-consecutive, this could be a big over-estimate.""" + return self.max_alt_id+1 + + +# TODO-EET: add doc string, tracing +def add_ev1_random(state: workflow.State, df: pd.DataFrame, alt_info: AltsContext | None = None, + alt_nrs_df: pd.DataFrame | None = None, ): + + nest_utils_for_choice = df.copy() + assert (alt_info is None) == ( + alt_nrs_df is None), "n_zones and alt_nrs_df must both be provided or omitted together" + + if alt_nrs_df is not None and FREEZE_RANDOM_NUMBERS_FOR_DENSE_ALTERNATIVE_SET: + assert alt_info is not None # narrowing for mypy + + idx_array = alt_nrs_df.values + mask = idx_array == -999 + safe_idx = np.where(mask, 1, idx_array) # replace -999 with a temp value inbounds + # generate random number for all alts - this is wasteful, but ensures that the same zone + # gets the same random number if the sampled choice set changes between base and project + # (alternatively, one could seed a channel for (persons x zones) and use the zone seed to ensure consistency. + # Trade off is needing to seed (persons x zones) rows and multiindex channels to + # avoid extra random numbers generated here. 
Quick benchmark suggests seeding per row is likely slower + rands_dense = state.get_rn_generator().gumbel_for_df(nest_utils_for_choice, n=alt_info.n_alts_to_cover_max_id) + # generate n=alt_info.max_alt_id+1 rather than n_alts so that indexing works + # (this is drawing a random number for a redundant zeroth zone in 1 based zoning systems) + # TODO deal with non 0->n-1 indexed land use more efficiently? ideally do where alt_nrs_df is constructed, + # not on the fly here. Potentially via state.get_injectable('network_los').get_skim_dict('taz').zone_ids + rands = np.take_along_axis(rands_dense, safe_idx, axis=1) + rands[mask] = 0 # zero out the masked zones so they don't have the util adjustment of alt 0 + else: + # old behaviour, to remove + rands = state.get_rn_generator().gumbel_for_df(nest_utils_for_choice, n=nest_utils_for_choice.shape[1]) + + nest_utils_for_choice += rands + return nest_utils_for_choice + + +def choose_from_tree( + nest_utils, all_alternatives, logit_nest_groups, nest_alternatives_by_name +): + for level, nest_names in logit_nest_groups.items(): + if level == 1: + next_level_alts = nest_alternatives_by_name[nest_names[0]] + continue + choice_this_level = nest_utils[nest_utils.index.isin(next_level_alts)].idxmax() + if choice_this_level in all_alternatives: + return choice_this_level + next_level_alts = nest_alternatives_by_name[choice_this_level] + raise ValueError("This should never happen - no alternative found") + + +# TODO-EET: add doc string, tracing +def make_choices_explicit_error_term_nl( + state, nested_utilities, alt_order_array, nest_spec, trace_label +): + """walk down the nesting tree and make choice at each level, which is the root of the next level choice.""" + nest_utils_for_choice = add_ev1_random(state, nested_utilities) + + all_alternatives = set(nest.name for nest in each_nest(nest_spec, type="leaf")) + logit_nest_groups = group_nest_names_by_level(nest_spec) + nest_alternatives_by_name = {n.name: n.alternatives for n in 
each_nest(nest_spec)} + + # Apply is slow. It could *maybe* be sped up by using the fact that the nesting structure is the same for all rows: + # Add ev1(0,1) to all entries (as is currently being done). Then, at each level, pick the maximum of the available + # composite alternatives and set the corresponding entry to 1 for each row, set all other alternatives at this level + # to zero. Once the tree is walked (all alternatives have been processed), take the product of the alternatives in + # each leaf's alternative list. Then pick the only alternative with entry 1, all others must be 0. + choices = nest_utils_for_choice.apply( + lambda x: choose_from_tree( + x, all_alternatives, logit_nest_groups, nest_alternatives_by_name + ), + axis=1, + ) + # TODO-EET: reporting like for zero probs + assert not choices.isnull().any(), f"No choice for {trace_label}" + choices = pd.Series(choices, index=nest_utils_for_choice.index) + + # In order for choice indexing to be consistent with MNL and cumsum MC choices, we need to index in the order + # alternatives were originally created before adding nest nodes that are not elemental alternatives + choices = choices.map({v: k for k, v in enumerate(alt_order_array)}) + + return choices + + +# TODO-EET: add doc string, tracing +def make_choices_explicit_error_term_mnl(state, utilities, trace_label, + alts_context: AltsContext | None = None, + alt_nrs_df: pd.DataFrame | None = None, + ): + utilities_incl_unobs = add_ev1_random(state, utilities, alts_context, alt_nrs_df) + choices = np.argmax(utilities_incl_unobs.to_numpy(), axis=1) + # TODO-EET: reporting like for zero probs + assert not np.isnan(choices).any(), f"No choice for {trace_label}" + choices = pd.Series(choices, index=utilities_incl_unobs.index) + return choices + + +def make_choices_explicit_error_term( + state, utilities, alt_order_array, nest_spec=None, trace_label=None, + alts_context: AltsContext | None = None, + alt_nrs_df: pd.DataFrame | None = None, +): + 
trace_label = tracing.extend_trace_label(trace_label, "make_choices_eet") + if nest_spec is None: + choices = make_choices_explicit_error_term_mnl(state, utilities, trace_label, alts_context, alt_nrs_df) + else: + choices = make_choices_explicit_error_term_nl( + state, utilities, alt_order_array, nest_spec, trace_label + ) + return choices + + +def make_choices_utility_based( + state: workflow.State, + utilities: pd.DataFrame, + name_mapping=None, + nest_spec=None, + trace_label: str | None = None, + trace_choosers=None, + allow_bad_probs=False, + alts_context: AltsContext | None = None, + alt_nrs_df: pd.DataFrame | None = None, +) -> tuple[pd.Series, pd.Series]: + trace_label = tracing.extend_trace_label(trace_label, "make_choices_utility_based") + + # TODO-EET: index of choices for nested utilities is different than unnested - this needs to be consistent for + # turning indexes into alternative names to keep code changes to minimum for now + choices = make_choices_explicit_error_term( + state, utilities, name_mapping, nest_spec, trace_label, alts_context, alt_nrs_df + ) + # TODO-EET: rands - log all zeros for now + rands = pd.Series(np.zeros_like(utilities.index.values), index=utilities.index) + return choices, rands + def make_choices( state: workflow.State, @@ -284,28 +520,23 @@ def make_choices( ) -> tuple[pd.Series, pd.Series]: """ Make choices for each chooser from among a set of alternatives. - Parameters ---------- probs : pandas.DataFrame Rows for choosers and columns for the alternatives from which they are choosing. Values are expected to be valid probabilities across each row, e.g. they should sum to 1. 
- trace_choosers : pandas.dataframe the choosers df (for interaction_simulate) to facilitate the reporting of hh_id by report_bad_choices because it can't deduce hh_id from the interaction_dataset which is indexed on index values from alternatives df - Returns ------- choices : pandas.Series Maps chooser IDs (from `probs` index) to a choice, where the choice is an index into the columns of `probs`. - rands : pandas.Series The random numbers used to make the choices (for debugging, tracing) - """ trace_label = tracing.extend_trace_label(trace_label, "make_choices") @@ -546,6 +777,7 @@ def _each_nest(spec: LogitNestSpec, parent_nest, post_order): nest.level = parent_nest.level + 1 nest.product_of_coefficients = parent_nest.product_of_coefficients nest.ancestors = parent_nest.ancestors + [name] + nest.coefficient = parent_nest.coefficient yield spec, nest @@ -602,3 +834,12 @@ def count_each_nest(spec, count): return 1 return count_each_nest(nest_spec, 0) if nest_spec is not None else 0 + + +def group_nest_names_by_level(nest_spec): + # group nests by level, returns {level: [nest.name at that level]} + depth = np.max([x.level for x in each_nest(nest_spec)]) + nest_levels = {x: [] for x in range(1, depth + 1)} + for n in each_nest(nest_spec): + nest_levels[n.level].append(n.name) + return nest_levels diff --git a/activitysim/core/pathbuilder.py b/activitysim/core/pathbuilder.py index 31393ceeaf..da241addcd 100644 --- a/activitysim/core/pathbuilder.py +++ b/activitysim/core/pathbuilder.py @@ -991,16 +991,16 @@ def build_virtual_path( utilities_df.index = orig.index with memo("#TVPB build_virtual_path make_choices"): - probs = logit.utils_to_probs( - self.network_los.state, - utilities_df, - allow_zero_probs=True, - trace_label=trace_label, - overflow_protection=False, - ) - chunk_sizer.log_df(trace_label, "probs", probs) - if trace: + probs = logit.utils_to_probs( + self.network_los.state, + utilities_df, + allow_zero_probs=True, + trace_label=trace_label, + 
overflow_protection=False, + ) + chunk_sizer.log_df(trace_label, "probs", probs) + choices = override_choices utilities_df["choices"] = choices @@ -1008,21 +1008,44 @@ def build_virtual_path( probs["choices"] = choices self.trace_df(probs, trace_label, "probs") + del probs + chunk_sizer.log_df(trace_label, "probs", None) else: - choices, rands = logit.make_choices( - self.network_los.state, - probs, - allow_bad_probs=True, - trace_label=trace_label, - ) + if self.network_los.state.settings.use_explicit_error_terms: + utilities_df = logit.validate_utils( + self.network_los.state, + utilities_df, + allow_zero_probs=True, + trace_label=trace_label, + ) + choices, rands = logit.make_choices_utility_based( + self.network_los.state, + utilities_df, + trace_label=trace_label, + ) + else: + probs = logit.utils_to_probs( + self.network_los.state, + utilities_df, + allow_zero_probs=True, + trace_label=trace_label, + overflow_protection=False, + ) + chunk_sizer.log_df(trace_label, "probs", probs) + + choices, rands = logit.make_choices( + self.network_los.state, + probs, + allow_bad_probs=True, + trace_label=trace_label, + ) + del probs + chunk_sizer.log_df(trace_label, "probs", None) chunk_sizer.log_df(trace_label, "rands", rands) del rands chunk_sizer.log_df(trace_label, "rands", None) - del probs - chunk_sizer.log_df(trace_label, "probs", None) - # we need to get path_set, btap, atap from path_df row with same seq and path_num # drop seq join column, but keep path_num of choice to override_choices when tracing columns_to_cache = ["btap", "atap", "path_set", "path_num"] diff --git a/activitysim/core/random.py b/activitysim/core/random.py index 37b1976403..79e6f7e7a4 100644 --- a/activitysim/core/random.py +++ b/activitysim/core/random.py @@ -9,8 +9,8 @@ import numpy as np import pandas as pd -from activitysim.core.util import reindex from activitysim.core.exceptions import DuplicateLoadableObjectError, TableIndexError +from activitysim.core.util import reindex from .tracing 
import print_elapsed_time @@ -194,6 +194,8 @@ def _generators_for_df(self, df): df_row_states = self.row_states.loc[df.index] + # https://numpy.org/doc/stable/reference/random/generator.html + # np.random.default_rng() prng = np.random.RandomState() for row in df_row_states.itertuples(): prng.seed(row.row_seed) @@ -245,6 +247,50 @@ def random_for_df(self, df, step_name, n=1): self.row_states.loc[df.index, "offset"] += n return rands + def gumbel_for_df(self, df, step_name, n=1): + """ + Return n floating point gumbel-distributed numbers for each row in df + using the appropriate random channel for each row. + + Subsequent calls (in the same step) will return the next rand for each df row + + The resulting array will be the same length (and order) as df + This method is designed to support alternative selection from a probability array + + The columns in df are ignored; the index name and values are used to determine + which random number sequence to use. + + If "true pseudo random" behavior is desired (i.e. NOT repeatable) the set_base_seed + method (q.v.) may be used to globally reseed all random streams. 
+ + Parameters + ---------- + df : pandas.DataFrame + df with index name and values corresponding to a registered channel + + n : int + number of rands desired per df row + + Returns + ------- + rands : 2-D ndarray + array the same length as df, with n gumbel-distributed floats for each df row + """ + + assert self.step_name + assert self.step_name == step_name + + # - reminder: prng must be called when yielded as generated sequence, not serialized + generators = self._generators_for_df(df) + + # rands = np.asanyarray([prng.gumbel(size=n) for prng in generators]) + # this is about 20% faster for large arrays, like for destination choice + rands = np.asanyarray([-np.log(-np.log(prng.rand(n))) for prng in generators]) + + # update offset for rows we handled + self.row_states.loc[df.index, "offset"] += n + return rands + def normal_for_df(self, df, step_name, mu, sigma, lognormal=False, size=None): """ Return a floating point random number in normal (or lognormal) distribution @@ -617,6 +663,42 @@ def random_for_df(self, df, n=1): rands = channel.random_for_df(df, self.step_name, n) return rands + def gumbel_for_df(self, df, n=1): + """ + Return a single floating point gumbel for each row in df + using the appropriate random channel for each row. + + Subsequent calls (in the same step) will return the next rand for each df row + + The resulting array will be the same length (and order) as df + This method is designed to support alternative selection from a probability array + + The columns in df are ignored; the index name and values are used to determine + which random number sequence to use. + + We assume that we can identify the channel to use based on the name of df.index + This channel should have already been registered by a call to add_channel (q.v.) + + If "true pseudo random" behavior is desired (i.e. NOT repeatable) the set_base_seed + method (q.v.) may be used to globally reseed all random streams. 
+ + Parameters + ---------- + df : pandas.DataFrame + df with index name and values corresponding to a registered channel + + n : int + number of rands desired (default 1) + + Returns + ------- + choices : 1-D ndarray the same length as df + a single gumbel-distributed float for each row in df + """ + channel = self.get_channel_for_df(df) + rands = channel.gumbel_for_df(df, self.step_name, n) + return rands + def normal_for_df(self, df, mu=0, sigma=1, broadcast=False, size=None): """ Return a single floating point normal random number in range (-inf, inf) for each row in df diff --git a/activitysim/core/simulate.py b/activitysim/core/simulate.py index c38e16a850..bf9c23f5d7 100644 --- a/activitysim/core/simulate.py +++ b/activitysim/core/simulate.py @@ -4,6 +4,7 @@ import logging import time +import typing import warnings from collections import OrderedDict from collections.abc import Callable @@ -32,7 +33,8 @@ LogitNestSpec, TemplatedLogitComponentSettings, ) -from activitysim.core.estimation import Estimator + +from activitysim.core.exceptions import ModelConfigurationError from activitysim.core.fast_eval import fast_eval from activitysim.core.simulate_consts import ( ALT_LOSER_UTIL, @@ -40,7 +42,9 @@ SPEC_EXPRESSION_NAME, SPEC_LABEL_NAME, ) -from activitysim.core.exceptions import ModelConfigurationError +if typing.TYPE_CHECKING: + from activitysim.core.estimation import Estimator + logger = logging.getLogger(__name__) @@ -1093,6 +1097,46 @@ def set_skim_wrapper_targets(df, skims, allow_partial_success: bool = True): # ) +def compute_nested_utilities(raw_utilities, nest_spec): + """ + compute nest utilities based on nesting coefficients + + For nest nodes this is the logsum of alternatives adjusted by nesting coefficient + + leaf <- raw_utility / nest_coefficient + nest <- ln(sum of exponentiated raw_utility of leaves) * nest_coefficient + + Parameters + ---------- + raw_utilities : pandas.DataFrame + dataframe with the raw alternative utilities of all leaves + 
(what in non-nested logit would be the utilities of all the alternatives) + nest_spec : dict + Nest tree dict from the model spec yaml file + + Returns + ------- + nested_utilities : pandas.DataFrame + Will have the index of `raw_utilities` and columns for leaf and node utilities + """ + nested_utilities = pd.DataFrame(index=raw_utilities.index) + + for nest in logit.each_nest(nest_spec, post_order=True): + name = nest.name + if nest.is_leaf: + nested_utilities[name] = ( + raw_utilities[name].astype(float) / nest.product_of_coefficients + ) + else: + # the alternative nested_utilities will already have been computed due to post_order + with np.errstate(divide="ignore"): + nested_utilities[name] = nest.coefficient * np.log( + np.exp(nested_utilities[nest.alternatives]).sum(axis=1) + ) + + return nested_utilities + + def compute_nested_exp_utilities(raw_utilities, nest_spec): """ compute exponentiated nest utilities based on nesting coefficients @@ -1224,7 +1268,7 @@ def eval_mnl( choosers, spec, locals_d, - custom_chooser: CustomChooser_T, + custom_chooser: CustomChooser_T | None, estimator, log_alt_losers=False, want_logsums=False, @@ -1308,29 +1352,47 @@ def eval_mnl( column_labels=["alternative", "utility"], ) - probs = logit.utils_to_probs( - state, utilities, trace_label=trace_label, trace_choosers=choosers - ) - chunk_sizer.log_df(trace_label, "probs", probs) + if state.settings.use_explicit_error_terms: + utilities = logit.validate_utils( + state, utilities, trace_label=trace_label, trace_choosers=choosers + ) - del utilities - chunk_sizer.log_df(trace_label, "utilities", None) + if custom_chooser: + choices, rands = custom_chooser( + state, utilities, choosers, spec, trace_label + ) + else: + choices, rands = logit.make_choices_utility_based( + state, utilities, trace_label=trace_label + ) - if have_trace_targets: - # report these now in case make_choices throws error on bad_choices - state.tracing.trace_df( - probs, - "%s.probs" % trace_label, - 
column_labels=["alternative", "probability"], - ) + del utilities + chunk_sizer.log_df(trace_label, "utilities", None) - if custom_chooser: - choices, rands = custom_chooser(state, probs, choosers, spec, trace_label) else: - choices, rands = logit.make_choices(state, probs, trace_label=trace_label) + probs = logit.utils_to_probs( + state, utilities, trace_label=trace_label, trace_choosers=choosers + ) + chunk_sizer.log_df(trace_label, "probs", probs) - del probs - chunk_sizer.log_df(trace_label, "probs", None) + del utilities + chunk_sizer.log_df(trace_label, "utilities", None) + + if have_trace_targets: + # report these now in case make_choices throws error on bad_choices + state.tracing.trace_df( + probs, + "%s.probs" % trace_label, + column_labels=["alternative", "probability"], + ) + + if custom_chooser: + choices, rands = custom_chooser(state, probs, choosers, spec, trace_label) + else: + choices, rands = logit.make_choices(state, probs, trace_label=trace_label) + + del probs + chunk_sizer.log_df(trace_label, "probs", None) if have_trace_targets: state.tracing.trace_df( @@ -1431,87 +1493,140 @@ def eval_nl( column_labels=["alternative", "utility"], ) - # exponentiated utilities of leaves and nests - nested_exp_utilities = compute_nested_exp_utilities(raw_utilities, nest_spec) - chunk_sizer.log_df(trace_label, "nested_exp_utilities", nested_exp_utilities) + if state.settings.use_explicit_error_terms: + # TODO-EET: Nested utility zero choice probability + raw_utilities = logit.validate_utils( + state, raw_utilities, allow_zero_probs=True, trace_label=trace_label + ) - del raw_utilities - chunk_sizer.log_df(trace_label, "raw_utilities", None) + # utilities of leaves and nests + nested_utilities = compute_nested_utilities(raw_utilities, nest_spec) + chunk_sizer.log_df(trace_label, "nested_utilities", nested_utilities) - if have_trace_targets: - state.tracing.trace_df( - nested_exp_utilities, - "%s.nested_exp_utilities" % trace_label, - 
column_labels=["alternative", "utility"], - ) + ) + # TODO-EET: use nested_utilities directly to compute logsums? + if want_logsums: + # logsum of nest root + # exponentiated utilities of leaves and nests + nested_exp_utilities = compute_nested_exp_utilities( + raw_utilities, nest_spec + ) + chunk_sizer.log_df( + trace_label, "nested_exp_utilities", nested_exp_utilities + ) + logsums = pd.Series(np.log(nested_exp_utilities.root), index=choosers.index) + chunk_sizer.log_df(trace_label, "logsums", logsums) + + # TODO-EET: index of choices for nested utilities is different than unnested - this needs to be consistent for + # turning indexes into alternative names to keep code changes to minimum for now + name_mapping = raw_utilities.columns.values + + del raw_utilities + chunk_sizer.log_df(trace_label, "raw_utilities", None) + + if custom_chooser: + choices, rands = custom_chooser( + state, + utilities=nested_utilities, + name_mapping=name_mapping, + choosers=choosers, + spec=spec, + nest_spec=nest_spec, + trace_label=trace_label, + ) + else: + choices, rands = logit.make_choices_utility_based( + state, + nested_utilities, + name_mapping=name_mapping, + nest_spec=nest_spec, + trace_label=trace_label, + ) - # probabilities of alternatives relative to siblings sharing the same nest - nested_probabilities = compute_nested_probabilities( - state, nested_exp_utilities, nest_spec, trace_label=trace_label - ) - chunk_sizer.log_df(trace_label, "nested_probabilities", nested_probabilities) + del nested_utilities + chunk_sizer.log_df(trace_label, "nested_utilities", None) - if want_logsums: - # logsum of nest root - logsums = pd.Series(np.log(nested_exp_utilities.root), index=choosers.index) - chunk_sizer.log_df(trace_label, "logsums", logsums) + else: + # exponentiated utilities of leaves and nests + nested_exp_utilities = compute_nested_exp_utilities(raw_utilities, nest_spec) + chunk_sizer.log_df(trace_label, "nested_exp_utilities", nested_exp_utilities) - del nested_exp_utilities 
- chunk_sizer.log_df(trace_label, "nested_exp_utilities", None) + del raw_utilities + chunk_sizer.log_df(trace_label, "raw_utilities", None) - if have_trace_targets: - state.tracing.trace_df( - nested_probabilities, - "%s.nested_probabilities" % trace_label, - column_labels=["alternative", "probability"], + if have_trace_targets: + state.tracing.trace_df( + nested_exp_utilities, + "%s.nested_exp_utilities" % trace_label, + column_labels=["alternative", "utility"], + ) + + # probabilities of alternatives relative to siblings sharing the same nest + nested_probabilities = compute_nested_probabilities( + state, nested_exp_utilities, nest_spec, trace_label=trace_label ) + chunk_sizer.log_df(trace_label, "nested_probabilities", nested_probabilities) - # global (flattened) leaf probabilities based on relative nest coefficients (in spec order) - base_probabilities = compute_base_probabilities( - nested_probabilities, nest_spec, spec - ) - chunk_sizer.log_df(trace_label, "base_probabilities", base_probabilities) + if want_logsums: + # logsum of nest root + logsums = pd.Series(np.log(nested_exp_utilities.root), index=choosers.index) + chunk_sizer.log_df(trace_label, "logsums", logsums) - del nested_probabilities - chunk_sizer.log_df(trace_label, "nested_probabilities", None) + del nested_exp_utilities + chunk_sizer.log_df(trace_label, "nested_exp_utilities", None) - if have_trace_targets: - state.tracing.trace_df( - base_probabilities, - "%s.base_probabilities" % trace_label, - column_labels=["alternative", "probability"], + if have_trace_targets: + state.tracing.trace_df( + nested_probabilities, + "%s.nested_probabilities" % trace_label, + column_labels=["alternative", "probability"], + ) + + # global (flattened) leaf probabilities based on relative nest coefficients (in spec order) + base_probabilities = compute_base_probabilities( + nested_probabilities, nest_spec, spec ) + chunk_sizer.log_df(trace_label, "base_probabilities", base_probabilities) - # note 
base_probabilities could all be zero since we allowed all probs for nests to be zero - # check here to print a clear message but make_choices will raise error if probs don't sum to 1 - BAD_PROB_THRESHOLD = 0.001 - no_choices = (base_probabilities.sum(axis=1) - 1).abs() > BAD_PROB_THRESHOLD + del nested_probabilities + chunk_sizer.log_df(trace_label, "nested_probabilities", None) - if no_choices.any(): - logit.report_bad_choices( - state, - no_choices, - base_probabilities, - trace_label=tracing.extend_trace_label(trace_label, "bad_probs"), - trace_choosers=choosers, - msg="base_probabilities do not sum to one", - ) + if have_trace_targets: + state.tracing.trace_df( + base_probabilities, + "%s.base_probabilities" % trace_label, + column_labels=["alternative", "probability"], + ) - if custom_chooser: - choices, rands = custom_chooser( - state, - base_probabilities, - choosers, - spec, - trace_label, - ) - else: - choices, rands = logit.make_choices( - state, base_probabilities, trace_label=trace_label - ) + # note base_probabilities could all be zero since we allowed all probs for nests to be zero + # check here to print a clear message but make_choices will raise error if probs don't sum to 1 + BAD_PROB_THRESHOLD = 0.001 + no_choices = (base_probabilities.sum(axis=1) - 1).abs() > BAD_PROB_THRESHOLD + + if no_choices.any(): + logit.report_bad_choices( + state, + no_choices, + base_probabilities, + trace_label=tracing.extend_trace_label(trace_label, "bad_probs"), + trace_choosers=choosers, + msg="base_probabilities do not sum to one", + ) + + if custom_chooser: + choices, rands = custom_chooser( + state, + base_probabilities, + choosers, + spec, + trace_label, + ) + else: + choices, rands = logit.make_choices( + state, base_probabilities, trace_label=trace_label + ) - del base_probabilities - chunk_sizer.log_df(trace_label, "base_probabilities", None) + del base_probabilities + chunk_sizer.log_df(trace_label, "base_probabilities", None) if have_trace_targets: 
state.tracing.trace_df( diff --git a/activitysim/core/test/test_logit.py b/activitysim/core/test/test_logit.py index e249475de2..30bae8a3b9 100644 --- a/activitysim/core/test/test_logit.py +++ b/activitysim/core/test/test_logit.py @@ -3,13 +3,16 @@ from __future__ import annotations import os.path +import re import numpy as np import pandas as pd import pandas.testing as pdt import pytest -from activitysim.core import logit, workflow + +from activitysim.core import logit, workflow, random +from activitysim.core.logit import add_ev1_random, AltsContext from activitysim.core.simulate import eval_variables @@ -70,6 +73,9 @@ def utilities(choosers, spec, test_data): ) +# TODO-EET: Add tests here! + + def test_utils_to_probs(utilities, test_data): state = workflow.State().default_settings() probs = logit.utils_to_probs(state, utilities, trace_label=None) @@ -127,6 +133,65 @@ def test_make_choices_only_one(): choices, pd.Series([0, 1], index=["x", "y"]), check_dtype=False ) +def reset_step(state, name='test_step'): + state.get_rn_generator().end_step(name) + state.get_rn_generator().begin_step(name) + +def test_make_choices_utility_based_sampled_alts(): + """Test the situation of making choices from a sampled choice set""" + # TODO should these tests go in test_random? 
+ state = workflow.State().default_settings() + # Make explicit that there's two indexing schemes - the raw alts, and the 0 based internals + utils_project_raw = pd.DataFrame({"a":10.582999, "b":10.680792, "c":10.710443}, index=pd.Index([0], name='person_id')) + # zero based indexes + utils_project = utils_project_raw.rename(columns={"a":0, "b":1, "c":2}) + utils_base = utils_project_raw[["a", "c"]].rename(columns={"a":0, "c":1}) + + assert utils_project.index.name == "person_id" + state.get_rn_generator().add_channel("persons", utils_project) + state.get_rn_generator().begin_step("test_step") + # mock base case, where alt 1 is omitted (it was improved in the project) + # this situation is quite common with poisson sampling with a variable choice set size, + # but it can also happen in with-replacement EET sampling e.g. if alt 2 had a pick_count of 2 in the base case. + # In principle, it can also be problematic for non-sampled choices where there is a base project difference in the + # availability of alternatives, e.g. a new mode was introduced in the project case + + utils_project_with_rands = add_ev1_random(state, utils_project) + rands_project = utils_project_with_rands - utils_project + reset_step(state) + utils_base_with_rands = add_ev1_random(state, utils_base) + rands_base = utils_base_with_rands - utils_base + rands_base_labeled = rands_base.rename(columns={0:"a", 1:"c"}) + rands_project_labeled = rands_project.rename(columns={0:"a", 1:"b", 2:"c"}) + with pytest.raises(AssertionError, match=re.escape('(column name="c") are different')): + # TODO this should pass + pdt.assert_frame_equal(rands_base_labeled, rands_project_labeled.loc[:, rands_base_labeled.columns]) + # document incorrect invariant - first two columns have the same random numbers: + pdt.assert_frame_equal(rands_base, rands_project.iloc[:, :2]) + + # revised approach + reset_step(state) + alt_nrs_df = pd.DataFrame({0:0, 1:1, 2:2}, index=utils_project_raw.index) + alt_info = 
AltsContext.from_num_alts(3, zero_based=True) + utils_project_with_rands = add_ev1_random(state, utils_project, alt_info=alt_info, alt_nrs_df=alt_nrs_df) + rands_project = utils_project_with_rands - utils_project + reset_step(state) + + # alt "b" is missing from the sampled choice set, alt_nrs_df is set to reflect that + alt_nrs_df = pd.DataFrame({0: 0, 1: 2}, index=utils_project_raw.index) + utils_base_with_rands = add_ev1_random(state, utils_base, alt_info=alt_info, alt_nrs_df=alt_nrs_df) + rands_base = utils_base_with_rands - utils_base + rands_base_labeled = rands_base.rename(columns={0: "a", 1: "c"}) + rands_project_labeled = rands_project.rename(columns={0: "a", 1: "b", 2: "c"}) + + # Corrected invariant holds true + pdt.assert_frame_equal(rands_base_labeled, rands_project_labeled.loc[:, rands_base_labeled.columns]) + + + + + + def test_make_choices_real_probs(utilities): state = workflow.State().default_settings()