Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/unix-openmpi.yml
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ jobs:
- shell: bash -l {0}
name: Install dependencies
run: |
conda install -c conda-forge "numpy<2.4" "pandas<3"
conda install -c conda-forge pytorch-cpu
conda install -c pytorch numpy pandas
conda install -c conda-forge mpi4py openmpi=5.*
pip install .[test]
pip install git+https://github.com/campa-consortium/gest-api.git
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/unix.yml
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ jobs:
- shell: bash -l {0}
name: Install dependencies
run: |
conda install -c conda-forge "numpy<2.4" "pandas<3"
conda install -c conda-forge pytorch-cpu
conda install -c pytorch numpy pandas
conda install -c conda-forge mpi4py mpich
pip install .[test]
pip install git+https://github.com/campa-consortium/gest-api.git
Expand Down
101 changes: 60 additions & 41 deletions optimas/generators/ax/developer/multitask.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@

from optimas.generators.ax.base import AxGenerator
from optimas.core import (
TrialParameter,
Task,
Trial,
TrialStatus,
Expand Down Expand Up @@ -154,9 +153,6 @@ class AxMultitaskGenerator(AxGenerator):
VOCS object defining variables, objectives, constraints, and observables.
lofi_task, hifi_task : Task
The low- and high-fidelity tasks.
analyzed_parameters : list of Parameter, optional
List of parameters to analyze at each trial, but which are not
optimization objectives. By default ``None``.
use_cuda : bool, optional
Whether to allow the generator to run on a CUDA GPU. By default
``False``.
Expand All @@ -178,6 +174,8 @@ class AxMultitaskGenerator(AxGenerator):

"""

returns_id = True

def __init__(
self,
vocs: VOCS,
Expand All @@ -190,16 +188,6 @@ def __init__(
model_save_period: Optional[int] = 5,
model_history_dir: Optional[str] = "model_history",
) -> None:

# As trial parameters these get written to history array
# Ax trial_index and arm toegther locate a point
# Multiple points (Optimas trials) can share the same Ax trial_index
# vocs interface note: These are not part of vocs. They are only stored
# to allow keeping track of them from previous runs.
custom_trial_parameters = [
TrialParameter("arm_name", "ax_arm_name", dtype="U32"),
TrialParameter("ax_trial_id", "ax_trial_index", dtype=int),
]
self._check_inputs(vocs, lofi_task, hifi_task)

super().__init__(
Expand All @@ -210,7 +198,6 @@ def __init__(
save_model=save_model,
model_save_period=model_save_period,
model_history_dir=model_history_dir,
custom_trial_parameters=custom_trial_parameters,
)
self.lofi_task = lofi_task
self.hifi_task = hifi_task
Expand All @@ -226,6 +213,10 @@ def __init__(
self.gr_lofi = None
self._experiment = self._create_experiment()

# Internal mapping: _id -> (arm_name, ax_trial_id, trial_type)
self._id_mapping = {}
self._next_id = 0

def get_gen_specs(
self, sim_workers: int, run_params: Dict, sim_max: int
) -> Dict:
Expand Down Expand Up @@ -285,11 +276,22 @@ def suggest(self, num_points: Optional[int]) -> List[dict]:
if trial_param.name == "trial_type":
point[trial_param.name] = trial_type

point["ax_trial_id"] = trial_index
point["arm_name"] = arm.name
# Generate unique _id and store mapping
current_id = self._next_id
self._id_mapping[current_id] = {
"ax_trial_id": trial_index,
"arm_name": arm.name,
}
point["_id"] = current_id
self._next_id += 1
points.append(point)
return points

def _get_trial_mapping(self, gen_id: int) -> Tuple[int, str]:
"""Get mapping information for a trial gen_id."""
mapping = self._id_mapping[gen_id]
return mapping["ax_trial_id"], mapping["arm_name"]

def ingest(self, results: List[dict]) -> None:
"""Incorporate evaluated trials into experiment."""
# reconstruct Optimas trials
Expand All @@ -304,60 +306,77 @@ def ingest(self, results: List[dict]) -> None:
)
trials.append(trial)

# Apply _id mapping to all trials before processing
for trial in trials:
if trial.gen_id is not None:
if trial.gen_id not in self._id_mapping:
raise ValueError(
f"Trial has _id={trial.gen_id} which is not recognized by this generator."
)
trial.ax_trial_id, trial.arm_name = self._get_trial_mapping(
trial.gen_id
)

if self.gen_state == NOT_STARTED:
self._incorporate_external_data(trials)
else:
self._complete_evaluations(trials)

def _incorporate_external_data(self, trials: List[Trial]) -> None:
"""Incorporate external data (e.g., from history) into experiment."""
# Get trial indices.
trial_indices = []
for trial in trials:
trial_indices.append(trial.ax_trial_id)
trial_indices = np.unique(np.array(trial_indices))

# Group trials by index.
grouped_trials = {}
for index in trial_indices:
grouped_trials[index] = []
"""Incorporate external data (e.g., from history) into experiment.

Unknown/external points have no gen_id. We create new arms and add
observations directly to the experiment, then let the model use them
as if starting fresh.
"""
# Group by trial_type (default to hifi if not specified)
grouped_by_type = {}
for trial in trials:
grouped_trials[trial.ax_trial_id].append(trial)

# Add trials to experiment.
for index in trial_indices:
# Get all trials with current index.
trials_i = grouped_trials[index]
trial_type = trials_i[0].trial_type
# Create arms.
trial_type = getattr(trial, "trial_type", self.hifi_task.name)
if trial_type not in grouped_by_type:
grouped_by_type[trial_type] = []
grouped_by_type[trial_type].append(trial)

param_to_name = {}
arm_count = 0
for trial_type, trials_i in grouped_by_type.items():
arms = []
for trial in trials_i:
params = {}
for var, val in zip(
trial.varying_parameters, trial.parameter_values
):
params[var.name] = val
arms.append(Arm(parameters=params, name=trial.arm_name))
arm = Arm(parameters=params)
if arm.signature not in param_to_name:
param_to_name[arm.signature] = f"external_{arm_count}"
arm_count += 1
arms.append(
Arm(parameters=params, name=param_to_name[arm.signature])
)
# self._next_id += 1

# Create new batch trial.
gr = GeneratorRun(arms=arms, weights=[1.0] * len(arms))
ax_trial = self._experiment.new_batch_trial(
generator_run=gr, trial_type=trial_type
)
ax_trial.run()
# Incorporate observations.
for trial in trials_i:
for i, trial in enumerate(trials_i):
arm_name = ax_trial.arms[i].name
if trial.status != TrialStatus.FAILED:
objective_eval = {}
oe = trial.objective_evaluations[0]
objective_eval["f"] = (oe.value, oe.sem)
ax_trial.run_metadata[trial.arm_name] = objective_eval
ax_trial.run_metadata[arm_name] = objective_eval
else:
ax_trial.mark_arm_abandoned(trial.arm_name)
ax_trial.mark_arm_abandoned(arm_name)
# Mark batch trial as completed.
ax_trial.mark_completed()
# Keep track of high-fidelity trials.
if trial_type == self.hifi_task.name:
self.hifi_trials.append(index)
self.hifi_trials.append(ax_trial.index)

def _complete_evaluations(self, trials: List[Trial]) -> None:
"""Complete evaluated trials."""
Expand Down
95 changes: 25 additions & 70 deletions optimas/generators/ax/service/ax_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,9 @@

from ax.service.ax_client import AxClient
from ax.core.objective import MultiObjective
from ax.core.types import ComparisonOp

from optimas.core import Objective, VaryingParameter, Parameter
from optimas.core import Parameter
from gest_api.vocs import VOCS
from .base import AxServiceGenerator

Expand All @@ -16,17 +17,14 @@ class AxClientGenerator(AxServiceGenerator):
This generator allows the user to provide a custom ``AxClient``,
allowing for maximum control of the optimization.

For this generator there is no need to provide the list of
``varying_parameters`` or ``objectives``. The generator will obtain
these parameters directly from the ``AxClient``.
For this generator there is no need to provide the ``vocs``. The
generator builds a VOCS (variables, objectives, constraints, and
observables) directly from the ``AxClient``.

Parameters
----------
ax_client : AxClient
The Ax client from which the trials will be generated.
analyzed_parameters : list of Parameter, optional
List of parameters to analyze at each trial, but which are not
optimization objectives. By default ``None``.
abandon_failed_trials : bool, optional
Whether failed trials should be abandoned (i.e., not suggested again).
By default, ``True``.
Expand All @@ -52,19 +50,15 @@ class AxClientGenerator(AxServiceGenerator):

Notes
-----
If the ``AxClient`` contains ``outcome_constraints``, these will appear in
the ``optimas`` log as optimization objectives. They are still being
correctly used as constraints by the ``AxClient``, and the optimization
will work as expected. This is only an issue on ``optimas``, which fails to
properly recognize them because optimization constraints have not yet been
implemented.
Outcome constraints are passed into VOCS as constraints and are correctly
used by the ``AxClient``. The ``optimas`` log/display does not yet show
constraints separately; constraint metrics may appear as extra columns.

"""

def __init__(
self,
ax_client: AxClient,
analyzed_parameters: Optional[List[Parameter]] = None,
abandon_failed_trials: Optional[bool] = True,
gpu_id: Optional[int] = 0,
dedicated_resources: Optional[bool] = False,
Expand All @@ -74,12 +68,6 @@ def __init__(
):
# Create VOCS object from AxClient data
vocs = self._create_vocs_from_ax_client(ax_client)

# Add constraints to analyzed parameters
analyzed_parameters = self._add_constraints_to_analyzed_parameters(
analyzed_parameters, ax_client
)

use_cuda = self._use_cuda(ax_client)
self._ax_client = ax_client

Expand Down Expand Up @@ -113,65 +101,32 @@ def _create_vocs_from_ax_client(self, ax_client: AxClient) -> VOCS:
obj_type = "MINIMIZE" if ax_obj.minimize else "MAXIMIZE"
objectives[ax_obj.metric_names[0]] = obj_type

# Extract observables from outcome constraints (if any)
observables = set()
# Extract constraints from outcome constraints (if any)
constraints = {}
ax_config = ax_client.experiment.optimization_config
if ax_config.outcome_constraints:
for constraint in ax_config.outcome_constraints:
observables.add(constraint.metric.name)
name = constraint.metric.name
if constraint.op == ComparisonOp.LEQ:
constraints[name] = ["LESS_THAN", constraint.bound]
elif constraint.op == ComparisonOp.GEQ:
constraints[name] = ["GREATER_THAN", constraint.bound]

return VOCS(
variables=variables,
objectives=objectives,
observables=observables,
constraints=constraints,
)

def _get_varying_parameters(self, ax_client: AxClient):
"""Obtain the list of varying parameters from the AxClient."""
varying_parameters = []
for _, p in ax_client.experiment.search_space.parameters.items():
vp = VaryingParameter(
name=p.name,
lower_bound=p.lower,
upper_bound=p.upper,
is_fidelity=p.is_fidelity,
fidelity_target_value=p.target_value,
dtype=p.python_type,
)
varying_parameters.append(vp)
return varying_parameters

def _get_objectives(self, ax_client: AxClient):
"""Obtain the list of objectives from the AxClient."""
objectives = []
ax_objective = ax_client.experiment.optimization_config.objective
if isinstance(ax_objective, MultiObjective):
ax_objectives = ax_objective.objectives
else:
ax_objectives = [ax_objective]
for ax_obj in ax_objectives:
obj = Objective(
name=ax_obj.metric_names[0], minimize=ax_obj.minimize
)
objectives.append(obj)
return objectives

def _add_constraints_to_analyzed_parameters(
self, analyzed_parameters: List[Parameter], ax_client: AxClient
):
"""Add outcome constraints to the list of analyzed parameters.

This is currently needed because optimas does not yet have a
proper definition of constraints. The constraints will be correctly
handled and given to the AxClient, but will appear as analyzed
parameters in the optimization log.
"""
ax_config = ax_client.experiment.optimization_config
if ax_config.outcome_constraints and analyzed_parameters is None:
analyzed_parameters = []
for constraint in ax_config.outcome_constraints:
analyzed_parameters.append(Parameter(name=constraint.metric.name))
return analyzed_parameters
def _convert_vocs_constraints_to_outcome_constraints(
self,
) -> tuple[List[str], List[Parameter]]:
"""Override to skip conversion since AxClient already has constraints."""
constraint_parameters = []
if hasattr(self._vocs, "constraints") and self._vocs.constraints:
for constraint_name in self._vocs.constraints.keys():
constraint_parameters.append(Parameter(constraint_name))
return [], constraint_parameters

def _create_ax_client(self) -> AxClient:
"""Override the base function to simply return the given."""
Expand Down
2 changes: 2 additions & 0 deletions optimas/generators/ax/service/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,8 @@ class AxServiceGenerator(AxGenerator):

"""

returns_id = True

def __init__(
self,
vocs: VOCS,
Expand Down
3 changes: 2 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,9 @@ classifiers = [
]
dependencies = [
'libensemble >= 1.3.0',
'numpy < 2.4',
'jinja2',
'pandas',
'pandas < 3',
'mpi4py',
'pydantic >= 2.0',
]
Expand Down
Loading