diff --git a/src/rxn_ca/cli.py b/src/rxn_ca/cli.py index 3947b38..fbb25eb 100644 --- a/src/rxn_ca/cli.py +++ b/src/rxn_ca/cli.py @@ -293,15 +293,43 @@ def suggest_precursors(): action='store_true', help='Output as JSON instead of human-readable format', ) + parser.add_argument( + '--anions', + type=str, + default=None, + help='Comma-separated anion formulas for precursors (default: O,CO3,OH,NO3)', + ) + parser.add_argument( + '--metathesis', + type=str, + default=None, + help='Comma-separated metathesis anion formulas (e.g., Cl or Cl,Br)', + ) + parser.add_argument( + '--counter-cations', + type=str, + default=None, + help='Comma-separated counter-cations for metathesis (e.g., Na,K)', + ) args = parser.parse_args() target = args.target print(f"Finding precursor combinations for: {target}", file=sys.stderr) + # Parse anion/cation arguments + anions = args.anions.split(',') if args.anions else None + metathesis_anions = args.metathesis.split(',') if args.metathesis else None + counter_cations = args.counter_cations.split(',') if args.counter_cations else None + # Expand elements to include precursor anions (C, N, H, etc.) print("Expanding element set for precursor phases...", file=sys.stderr) - elements = get_expanded_elements(target) + elements = get_expanded_elements( + target, + anions=anions, + metathesis_anions=metathesis_anions, + counter_cations=counter_cations, + ) print(f" Elements: {', '.join(sorted(elements))}", file=sys.stderr) # Fetch entries from Materials Project diff --git a/src/rxn_ca/optimization/objective.py b/src/rxn_ca/optimization/objective.py index 305a0e4..fd2efb6 100644 --- a/src/rxn_ca/optimization/objective.py +++ b/src/rxn_ca/optimization/objective.py @@ -49,7 +49,7 @@ class ObjectiveConfig: num_realizations: int = 3 cache_results: bool = True live_compress: bool = True - compress_freq: int = 50 + compress_freq: int = 500 class ObjectiveFunction: diff --git a/src/rxn_ca/optimization/precursor_selection.py b/src/rxn_ca/optimization/precursor_selection.py index e474230..bba72ea 100644 --- a/src/rxn_ca/optimization/precursor_selection.py +++ b/src/rxn_ca/optimization/precursor_selection.py @@ -65,11 +65,11 @@ def __post_init__(self): AnionType("phosphate", "PO4", -3, frozenset({"P", "O"})), ] -# Default anion types for typical solid-state synthesis -DEFAULT_PRECURSOR_ANIONS: List[str] = ["oxide", "carbonate", "hydroxide", "nitrate"] +# Default anion types for typical solid-state synthesis (by formula) +DEFAULT_PRECURSOR_ANIONS: List[str] = ["O", "CO3", "OH", "NO3"] -# Anion types useful for metathesis reactions -METATHESIS_ANIONS: List[str] = ["chloride", "bromide", "nitrate", "sulfate", "acetate"] +# Anion types useful for metathesis reactions (by formula) +METATHESIS_ANIONS: List[str] = ["Cl", "Br", "NO3", "SO4", "C2H3O2"] # Counter-cations for metathesis reactions (provide leaving groups) # Maps cation symbol to its oxidation state @@ -81,6 +81,9 @@ def __post_init__(self): "Cs": 1, } +# Build formula -> AnionType lookup (populated after COMMON_ANION_TYPES is defined) +_ANION_BY_FORMULA: Dict[str, "AnionType"] = {} + # ============================================================================= # Precursor formula generation @@ -164,22 +167,49 @@ def generate_precursor_formula( return cation_str + anion_str -def get_anion_by_name(name: str) -> AnionType: - """Get an AnionType by its name. +def _build_anion_lookup() -> None: + """Build the formula -> AnionType lookup dict.""" + for anion in COMMON_ANION_TYPES: + _ANION_BY_FORMULA[anion.formula] = anion + + +def get_anion(identifier: str) -> AnionType: + """Get an AnionType by its formula or name. Args: - name: Anion name (e.g., "oxide", "carbonate") + identifier: Anion formula (e.g., "CO3", "Cl") or name (e.g., "carbonate") Returns: Matching AnionType Raises: - ValueError: If anion name not found + ValueError: If anion not found + + Examples: + >>> get_anion("CO3") + AnionType(name='carbonate', formula='CO3', ...) + >>> get_anion("Cl") + AnionType(name='chloride', formula='Cl', ...) + >>> get_anion("carbonate") # name also works for backwards compat + AnionType(name='carbonate', formula='CO3', ...) """ + # Ensure lookup is populated + if not _ANION_BY_FORMULA: + _build_anion_lookup() + + # Try formula first + if identifier in _ANION_BY_FORMULA: + return _ANION_BY_FORMULA[identifier] + + # Fall back to name lookup for backwards compatibility for anion in COMMON_ANION_TYPES: - if anion.name == name: + if anion.name == identifier: return anion - raise ValueError(f"Unknown anion type: {name}") + + raise ValueError( + f"Unknown anion: '{identifier}'. " + f"Valid formulas: {list(_ANION_BY_FORMULA.keys())}" + ) def generate_practical_precursors( @@ -214,7 +244,7 @@ def generate_practical_precursors( if not oxidation_states: return [] - anions = [get_anion_by_name(name) for name in anion_types] + anions = [get_anion(a) for a in anion_types] precursors = [] seen = set() @@ -253,7 +283,7 @@ def generate_metathesis_sources( if counter_cations is None: counter_cations = ["Na", "K"] - anion = get_anion_by_name(target_anion) + anion = get_anion(target_anion) sources = [] for cation in counter_cations: @@ -266,21 +296,22 @@ def generate_metathesis_sources( return sources -def get_elements_from_anion_types(anion_types: Optional[List[str]] = None) -> Set[str]: - """Get all elements introduced by a set of anion types. +def get_elements_from_anions(anions: Optional[List[str]] = None) -> Set[str]: + """Get all elements introduced by a set of anions. Args: - anion_types: List of anion type names. Defaults to DEFAULT_PRECURSOR_ANIONS. + anions: List of anion formulas (e.g., ["CO3", "Cl"]). + Defaults to DEFAULT_PRECURSOR_ANIONS. Returns: Set of element symbols """ - if anion_types is None: - anion_types = DEFAULT_PRECURSOR_ANIONS + if anions is None: + anions = DEFAULT_PRECURSOR_ANIONS elements: Set[str] = set() - for name in anion_types: - anion = get_anion_by_name(name) + for identifier in anions: + anion = get_anion(identifier) elements.update(anion.elements) return elements @@ -288,20 +319,25 @@ def get_elements_from_anion_types(anion_types: Optional[List[str]] = None) -> Se def get_expanded_elements( target_phase: str, - anion_types: Optional[List[str]] = None, - include_metathesis: bool = True, + anions: Optional[List[str]] = None, + metathesis_anions: Optional[List[str]] = None, + counter_cations: Optional[List[str]] = None, ) -> Set[str]: """Get the full set of elements needed for precursor selection. This expands beyond the target phase elements to include elements from - common precursor anions (e.g., C from carbonates, N from nitrates). + precursor anions (e.g., C from CO3, N from NO3) and optionally metathesis + reagents. Use this to determine what elements to pass to get_entries(). Args: target_phase: Target product formula (e.g., "BaTiO3") - anion_types: Anion types to consider. Defaults to DEFAULT_PRECURSOR_ANIONS. - include_metathesis: If True, also include elements from metathesis anions. + anions: Base anion formulas for precursors. Defaults to ["O", "CO3", "OH", "NO3"]. + metathesis_anions: Additional anions for metathesis (e.g., ["Cl"]). + Defaults to None (no metathesis anions). + counter_cations: Counter-cations for metathesis (e.g., ["Na", "K"]). + Defaults to None (no counter-cations). Returns: Set of element symbols needed for get_entries() @@ -309,30 +345,41 @@ def get_expanded_elements( Examples: >>> get_expanded_elements("BaTiO3") {'Ba', 'Ti', 'O', 'C', 'N', 'H'} - >>> get_expanded_elements("BaTiO3", anion_types=["oxide"]) + >>> get_expanded_elements("BaTiO3", anions=["O"]) {'Ba', 'Ti', 'O'} + >>> get_expanded_elements("BaTiO3", metathesis_anions=["Cl"], counter_cations=["Na"]) + {'Ba', 'Ti', 'O', 'C', 'N', 'H', 'Cl', 'Na'} """ # Start with target phase elements target_comp = Composition(target_phase) elements = {str(el) for el in target_comp.elements} - # Add elements from precursor anions - if anion_types is None: - anion_types = list(DEFAULT_PRECURSOR_ANIONS) - - if include_metathesis: - # Add metathesis anion elements too - anion_types = list(set(anion_types + METATHESIS_ANIONS)) - - elements.update(get_elements_from_anion_types(anion_types)) - - # Add counter-cation elements if including metathesis - if include_metathesis: - elements.update(METATHESIS_COUNTER_CATIONS.keys()) - # Remove NH4 as it's not a real element, add N and H instead - elements.discard("NH4") - elements.add("N") - elements.add("H") + # Add elements from base precursor anions + if anions is None: + anions = list(DEFAULT_PRECURSOR_ANIONS) + + all_anions = list(anions) + + # Add metathesis anions if specified + if metathesis_anions: + all_anions = list(set(all_anions + metathesis_anions)) + + elements.update(get_elements_from_anions(all_anions)) + + # Add counter-cation elements if specified + if counter_cations: + for cation in counter_cations: + if cation == "NH4": + # NH4 is not a real element, add N and H instead + elements.add("N") + elements.add("H") + elif cation not in METATHESIS_COUNTER_CATIONS: + raise ValueError( + f"Unknown counter-cation: '{cation}'. " + f"Valid options: {list(METATHESIS_COUNTER_CATIONS.keys())}" + ) + else: + elements.add(cation) return elements diff --git a/src/rxn_ca/optimization/search_space.py b/src/rxn_ca/optimization/search_space.py index c109d20..d6ad2b0 100644 --- a/src/rxn_ca/optimization/search_space.py +++ b/src/rxn_ca/optimization/search_space.py @@ -124,19 +124,26 @@ def add_precursor_ratio( name: str, low: float, high: float, + step: float = 0.05, ) -> "SearchSpace": - """Add a precursor ratio parameter. + """Add a precursor ratio parameter (discrete with given step size). + + Discrete rather than continuous: BayBE enumerates all candidate values + and selects via Thompson Sampling, avoiding the L-BFGS-B boundary-hang + that occurs when a continuous parameter's optimum sits at its lower bound + (e.g. the stoichiometric ratio is also the minimum of the search range). Args: name: Parameter name (should match a precursor slot name + "_ratio") - low: Minimum ratio (typically 0-1) - high: Maximum ratio (typically 0-1) + low: Minimum ratio value + high: Maximum ratio value + step: Grid spacing between ratio values (default 0.05) Returns: self for method chaining """ ratio_name = f"{name}_ratio" if not name.endswith("_ratio") else name - param = ContinuousParameter(name=ratio_name, low=low, high=high) + param = DiscreteParameter(name=ratio_name, low=low, high=high, step=step) return self._add_parameter(param) def add_continuous( @@ -295,6 +302,41 @@ def parameter_names(self) -> List[str]: """Get list of all parameter names.""" return [p.name for p in self.parameters] + def as_dict(self) -> dict: + """Serialize to a JSON-safe dict for passing between jobflow jobs.""" + params = [] + for p in self.parameters: + d: Dict[str, Any] = {"type": p.param_type.value, "name": p.name} + if isinstance(p, DiscreteParameter): + d["low"] = p.low + d["high"] = p.high + d["step"] = p.step + elif isinstance(p, ContinuousParameter): + d["low"] = p.low + d["high"] = p.high + elif isinstance(p, PrecursorSlotParameter): + d["candidates"] = p.candidates + elif isinstance(p, CategoricalParameter): + d["choices"] = p.choices + params.append(d) + return {"parameters": params} + + @classmethod + def from_dict(cls, d: dict) -> "SearchSpace": + """Reconstruct a SearchSpace from a serialized dict.""" + space = cls() + for p in d["parameters"]: + ptype = p["type"] + if ptype == "discrete": + space.add_discrete(p["name"], p["low"], p["high"], p["step"]) + elif ptype == "continuous": + space.add_continuous(p["name"], p["low"], p["high"]) + elif ptype == "precursor_slot": + space.add_precursor_slot(p["name"], p["candidates"]) + elif ptype == "categorical": + space.add_categorical(p["name"], p["choices"]) + return space + def __repr__(self) -> str: param_strs = [] for p in self.parameters: diff --git a/src/rxn_ca/utilities/get_scored_rxns.py b/src/rxn_ca/utilities/get_scored_rxns.py index d305c84..f0e4e22 100644 --- a/src/rxn_ca/utilities/get_scored_rxns.py +++ b/src/rxn_ca/utilities/get_scored_rxns.py @@ -1,3 +1,7 @@ +import os +import multiprocessing as mp +from typing import List + from rxn_network.reactions.reaction_set import ReactionSet from ..core import HeatingSchedule @@ -6,11 +10,14 @@ from ..reactions import ReactionLibrary, ScoredReaction, ScoredReactionSet, score_rxns from ..reactions.scorers import BasicScore, TammanScore -from typing import List +_scoring_globals = {} -import multiprocessing as mp -_scoring_globals = {} +def _pool_initializer(data: dict): + """Populate worker-process globals (called by forkserver/spawn pool workers).""" + global _scoring_globals + _scoring_globals.update(data) + def fn(temp): score_class = _scoring_globals.get('score_class') diff --git a/src/rxn_ca/workflow/__init__.py b/src/rxn_ca/workflow/__init__.py index ea6fe1e..a41e6ab 100644 --- a/src/rxn_ca/workflow/__init__.py +++ b/src/rxn_ca/workflow/__init__.py @@ -8,13 +8,16 @@ """ from .schemas import SimulationOutput, ReactionLibraryData -from .jobs import setup_reaction_library, run_simulation -from .flows import create_simulation_flow +from .jobs import setup_reaction_library, run_simulation, init_bo_campaign, bo_trial_step +from .flows import create_simulation_flow, BOFlowMaker __all__ = [ "SimulationOutput", "ReactionLibraryData", "setup_reaction_library", "run_simulation", + "init_bo_campaign", + "bo_trial_step", "create_simulation_flow", + "BOFlowMaker", ] diff --git a/src/rxn_ca/workflow/flows/__init__.py b/src/rxn_ca/workflow/flows/__init__.py new file mode 100644 index 0000000..b6f1069 --- /dev/null +++ b/src/rxn_ca/workflow/flows/__init__.py @@ -0,0 +1,8 @@ +from .core import create_simulation_flow, create_multi_simulation_flow +from .bayesian import BOFlowMaker + +__all__ = [ + "create_simulation_flow", + "create_multi_simulation_flow", + "BOFlowMaker", +] diff --git a/src/rxn_ca/workflow/flows/bayesian.py b/src/rxn_ca/workflow/flows/bayesian.py new file mode 100644 index 0000000..484f542 --- /dev/null +++ b/src/rxn_ca/workflow/flows/bayesian.py @@ -0,0 +1,316 @@ +"""Bayesian Optimization flow Maker for ReactCA synthesis.""" + +from __future__ import annotations + +import numpy as np +from dataclasses import dataclass, field +from pathlib import Path +from typing import Dict, List, Optional + +from jobflow import Flow, Maker + +from rxn_ca.optimization import SearchSpace +from ..jobs.core import setup_reaction_library +from ..jobs.bayesian import init_bo_campaign, bo_trial_step + + +@dataclass +class BOFlowMaker(Maker): + """Maker for Bayesian Optimization flows over ReactCA simulations. + + Stores optimization configuration as dataclass fields so the same + Maker instance can be reused for multiple chemical systems without + re-specifying all parameters. Can be serialized with monty and + composed into larger workflow Makers. + + Wires together three job stages: + setup_reaction_library → init_bo_campaign → bo_trial_step[0] + ↓ Response.addition + bo_trial_step[1] → ... + + Attributes: + name: Name prefix for generated Flows. Full flow name is + "{name}_{target_phase}_{chemical_system}". + n_initial: Number of random exploration trials before BO guidance. + n_iterations: Number of BO-guided trials after the initial phase. + scorer_type: How to score each simulation result. + "final" — yield of target phase at the end of the simulation. + "maximum" — peak yield of target phase during the simulation. + simulation_size: CA grid size (NxN cells). + num_realizations: Number of independent simulation runs per trial. + Results are averaged for the BO score. + live_compress: Store full CA state snapshots at each compress_freq + step instead of diffs. Strongly recommended. + compress_freq: Interval (in simulation steps) between snapshots + when live_compress is True. + metastability_cutoff: Energy above the convex hull (eV/atom) below + which phases are included. + exclude_theoretical: If True, exclude phases without experimental + observations in the Materials Project database. + + Example — reuse the same maker for different systems: + maker = BOFlowMaker(n_initial=5, n_iterations=15, simulation_size=10) + flow_li = maker.make("Li-Si-O-C", "Li4SiO4", search_space_li, output_dir_li) + flow_ba = maker.make("Ba-Ti-O", "BaTiO3", search_space_ba, output_dir_ba) + + Example — compose into a larger Maker: + @dataclass + class MySweepMaker(Maker): + name: str = "sweep" + bo_maker: BOFlowMaker = field(default_factory=BOFlowMaker) + + def make(self, systems: list, ...) -> Flow: + flows = [self.bo_maker.make(sys, ...) for sys in systems] + return Flow(flows, name=self.name) + """ + + name: str = "bo_flow" + n_initial: int = 5 + n_iterations: int = 15 + scorer_type: str = "final" + simulation_size: int = 10 + num_realizations: int = 3 + live_compress: bool = True + compress_freq: int = 500 + metastability_cutoff: float = 0.03 + exclude_theoretical: bool = True + + def make( + self, + chemical_system: str, + target_phase: str, + search_space: SearchSpace, + output_dir: str, + fixed_precursors: Optional[Dict[str, float]] = None, + ensure_phases: Optional[List[str]] = None, + fw_category: Optional[str] = None, + metadata: Optional[Dict] = None, + **library_kwargs, + ) -> Flow: + """Build a Bayesian Optimization Flow for the given chemical system. + + Args: + chemical_system: Element system string, e.g. "Li-Si-O-C". + target_phase: Formula of the target product, e.g. "Li4SiO4". + search_space: Configured SearchSpace. Must contain a 'hold_temp' + parameter; precursor slots (if any) are derived automatically. + output_dir: Shared filesystem path written to by every trial job + (history.csv, best_result.json, simulations/). Must be + accessible from all HPC worker nodes. + fixed_precursors: Formula → molar amount map. When provided, + precursor selection is not optimized (only the thermal profile + is). Mutually exclusive with precursor slots in search_space. + ensure_phases: Phases that must be present in the reaction library. + Defaults to target_phase + all precursor candidates. + fw_category: FireWorks _category tag. When provided, every trial + job (including dynamically-added ones) carries this tag so + workers launched with `rlaunch --category ` only run + Fireworks belonging to this workflow. Generated automatically + by optimize_synthesis_general.py and written to + {output_dir}/fw_category.txt. + **library_kwargs: Forwarded to setup_reaction_library + (e.g. thermo_types=["R2SCAN"]). + + Returns: + Flow containing setup, init, and first trial jobs. Subsequent trial + jobs are added dynamically at runtime via Response(addition=...). + + Raises: + ValueError: If search_space has no 'hold_temp' parameter. + ValueError: If both fixed_precursors and precursor slots are given. + """ + flow_name = f"{self.name}_{target_phase}_{chemical_system}" + output_dir = str(Path(output_dir).expanduser().resolve()) + total_iterations = self.n_initial + self.n_iterations + + # --- Derive temperatures from search space hold_temp parameter --- + hold_temp_param = search_space.get_parameter("hold_temp") + if hold_temp_param is None: + raise ValueError( + "search_space must contain a 'hold_temp' parameter. " + "Add one with search_space.add_temperature_range(...)." + ) + temperatures = sorted( + set(int(t) for t in np.arange(300, hold_temp_param.high + 1, 100)) + ) + + # --- Derive precursor slot names from search space --- + precursor_slot_names = [p.name for p in search_space.precursor_parameters] + + if fixed_precursors is not None and precursor_slot_names: + raise ValueError( + "Provide either fixed_precursors or precursor slots in search_space, not both." + ) + + # --- Build ensure_phases if not explicitly provided --- + if ensure_phases is None: + ensure_phases = [target_phase] + if fixed_precursors: + ensure_phases.extend(fixed_precursors.keys()) + else: + for param in search_space.precursor_parameters: + ensure_phases.extend(param.candidates) + ensure_phases = list(dict.fromkeys(ensure_phases)) + + # --- Objective config passed to every trial job --- + objective_config = { + "target_phase": target_phase, + "scorer_type": self.scorer_type, + "simulation_size": self.simulation_size, + "num_realizations": self.num_realizations, + "live_compress": self.live_compress, + "compress_freq": self.compress_freq, + "target_name": "yield", + } + + # --- Job 1: Build reaction library (runs once, shared across all trials) --- + setup_kwargs = dict(library_kwargs) + library_dir = setup_kwargs.pop("library_dir", None) + include_library_dict = setup_kwargs.pop("include_library_dict", False) + entry_kwargs = setup_kwargs.pop("entry_kwargs", {}) + + setup_job = setup_reaction_library( + chemical_system=chemical_system, + temperatures=temperatures, + ensure_phases=ensure_phases, + metastability_cutoff=self.metastability_cutoff, + exclude_theoretical=self.exclude_theoretical, + save_to_file=True, + library_dir=library_dir, + include_library_dict=include_library_dict, + entry_kwargs=entry_kwargs, + **setup_kwargs, + ) + setup_job.name = f"setup_{chemical_system}" + + # --- Job 2: Initialize BayBE Campaign --- + init_job = init_bo_campaign( + search_space_config=search_space.as_dict(), + n_initial=self.n_initial, + n_iterations=self.n_iterations, + ) + init_job.name = "init_bo_campaign" + + # --- Merge user metadata with auto-derived fields --- + _metadata = { + "target_phase": target_phase, + "chemical_system": chemical_system, + **(metadata or {}), + } + + # --- Job 3: First BO trial --- + # campaign_json and reaction_library_data are job output references — + # jobflow resolves them at runtime after the upstream jobs complete. + first_trial = bo_trial_step( + iteration=0, + total_iterations=total_iterations, + campaign_json=init_job.output["campaign.json"], + reaction_library_data=setup_job.output, + precursor_slot_names=precursor_slot_names, + fixed_precursors=fixed_precursors, + objective_config=objective_config, + output_dir=output_dir, + fw_category=fw_category, + metadata=_metadata, + ) + if fw_category: + first_trial.update_config({"manager_config": {"_category": fw_category}}) + first_trial.name = "bo_trial_step_000" + + flow = Flow( + [setup_job, init_job, first_trial], + name=flow_name, + ) + flow.update_metadata(_metadata) + return flow + + def make_campaign( + self, + chemical_system: str, + target_phase: str, + search_space: SearchSpace, + output_dir: str, + reaction_library_data, + fixed_precursors: Optional[Dict[str, float]] = None, + fw_category: Optional[str] = None, + metadata: Optional[Dict] = None, + ) -> Flow: + """Build a BO campaign Flow using a pre-built reaction library. + + Like make(), but accepts an existing reaction_library_data (or a + jobflow OutputReference to one) instead of creating a + setup_reaction_library job internally. Use this when sharing one + reaction library across multiple campaigns for the same chemical system. + + Args: + chemical_system: Element system string, e.g. "Na-Mo-Cd-N-O". + target_phase: Formula of the target product. + search_space: Configured SearchSpace. + output_dir: Shared filesystem path for trial outputs + (history.csv, best_result.json, simulations/). + reaction_library_data: A ReactionLibraryData object or a jobflow + OutputReference (e.g. setup_job.output) that resolves to one. + fixed_precursors: Formula → molar amount map. When provided, + precursor selection is not optimized. + fw_category: FireWorks _category tag. When provided, every job in + this campaign carries the tag so workers with a matching + fworker.yaml only run this campaign's Fireworks. + + Returns: + Flow containing init and first trial jobs only (no setup job). + """ + flow_name = f"{self.name}_{target_phase}_{chemical_system}" + output_dir = str(Path(output_dir).expanduser().resolve()) + total_iterations = self.n_initial + self.n_iterations + + precursor_slot_names = [p.name for p in search_space.precursor_parameters] + + if fixed_precursors is not None and precursor_slot_names: + raise ValueError( + "Provide either fixed_precursors or precursor slots in search_space, not both." + ) + + objective_config = { + "target_phase": target_phase, + "scorer_type": self.scorer_type, + "simulation_size": self.simulation_size, + "num_realizations": self.num_realizations, + "live_compress": self.live_compress, + "compress_freq": self.compress_freq, + "target_name": "yield", + } + + init_job = init_bo_campaign( + search_space_config=search_space.as_dict(), + n_initial=self.n_initial, + n_iterations=self.n_iterations, + ) + init_job.name = "init_bo_campaign" + if fw_category: + init_job.update_config({"manager_config": {"_category": fw_category}}) + + _metadata = { + "target_phase": target_phase, + "chemical_system": chemical_system, + **(metadata or {}), + } + + first_trial = bo_trial_step( + iteration=0, + total_iterations=total_iterations, + campaign_json=init_job.output["campaign.json"], + reaction_library_data=reaction_library_data, + precursor_slot_names=precursor_slot_names, + fixed_precursors=fixed_precursors, + objective_config=objective_config, + output_dir=output_dir, + fw_category=fw_category, + metadata=_metadata, + ) + if fw_category: + first_trial.update_config({"manager_config": {"_category": fw_category}}) + first_trial.name = "bo_trial_step_000" + + flow = Flow([init_job, first_trial], name=flow_name) + flow.update_metadata(_metadata) + return flow diff --git a/src/rxn_ca/workflow/flows.py b/src/rxn_ca/workflow/flows/core.py similarity index 92% rename from src/rxn_ca/workflow/flows.py rename to src/rxn_ca/workflow/flows/core.py index 04af1f3..54f2f29 100644 --- a/src/rxn_ca/workflow/flows.py +++ b/src/rxn_ca/workflow/flows/core.py @@ -1,11 +1,12 @@ -"""Pre-built flows for common rxn-ca simulation workflows.""" +"""Core flows for rxn-ca simulations.""" + +from __future__ import annotations from typing import Any, Dict, List, Optional, Union from jobflow import Flow -from .jobs import setup_reaction_library, run_simulation -from .schemas import SimulationOutput +from ..jobs.core import setup_reaction_library, run_simulation def create_simulation_flow( @@ -40,7 +41,7 @@ def create_simulation_flow( Returns: Flow with setup and simulation jobs, outputting SimulationOutput - + Example: >>> from rxn_ca.core.recipe import ReactionRecipe >>> from rxn_ca.core.heating import HeatingSchedule, HeatingStep @@ -62,17 +63,14 @@ def create_simulation_flow( ... ensure_phases=["BaTiO3", "BaCO3", "TiO2"], ... ) """ - # Ensure we have a ReactionRecipe object from rxn_ca.core.recipe import ReactionRecipe if not isinstance(recipe, ReactionRecipe): recipe = ReactionRecipe.from_dict(recipe) - # Get temperatures from recipe if not provided if temperatures is None: temperatures = recipe.heating_schedule.all_temps - # Create setup job setup_job = setup_reaction_library( chemical_system=chemical_system, temperatures=temperatures, @@ -83,7 +81,6 @@ def create_simulation_flow( ) setup_job.name = f"setup_{chemical_system}" - # Create simulation job sim_job = run_simulation( recipe=recipe, reaction_library_data=setup_job.output, @@ -130,7 +127,6 @@ def create_multi_simulation_flow( """ from rxn_ca.core.recipe import ReactionRecipe - # Ensure all recipes are ReactionRecipe objects and collect temperatures recipe_objects = [] all_temps = set() @@ -140,11 +136,9 @@ def create_multi_simulation_flow( recipe_objects.append(r) all_temps.update(r.heating_schedule.all_temps) - # Use provided temperatures or collected ones if temperatures is None: temperatures = sorted(all_temps) - # Create setup job (shared across all simulations) setup_job = setup_reaction_library( chemical_system=chemical_system, temperatures=temperatures, @@ -158,7 +152,6 @@ def create_multi_simulation_flow( jobs = [setup_job] outputs = [] - # Create simulation jobs for i, recipe in enumerate(recipe_objects): metadata = metadata_list[i] if metadata_list else None diff --git a/src/rxn_ca/workflow/jobs/__init__.py b/src/rxn_ca/workflow/jobs/__init__.py new file mode 100644 index 0000000..6156f8d --- /dev/null +++ b/src/rxn_ca/workflow/jobs/__init__.py @@ -0,0 +1,9 @@ +from .core import setup_reaction_library, run_simulation +from .bayesian import init_bo_campaign, bo_trial_step + +__all__ = [ + "setup_reaction_library", + "run_simulation", + "init_bo_campaign", + "bo_trial_step", +] diff --git a/src/rxn_ca/workflow/jobs/bayesian.py b/src/rxn_ca/workflow/jobs/bayesian.py new file mode 100644 index 0000000..281a330 --- /dev/null +++ b/src/rxn_ca/workflow/jobs/bayesian.py @@ -0,0 +1,418 @@ +"""Bayesian Optimization jobs for rxn-ca simulations.""" + +from __future__ import annotations + +import csv +import json +import signal +from datetime import datetime +from pathlib import Path +from typing import Any, Dict, List, Optional + +import pandas as pd +from jobflow import Response, job + +_RECOMMEND_TIMEOUT_SECONDS = 300 # fall back to random after 5 min + + +class _RecommendTimeout(Exception): + pass + + +def _alarm_handler(signum, frame): + raise _RecommendTimeout() + +from ..schemas import ReactionLibraryData +import os + +# --------------------------------------------------------------------------- +# Helpers (not @job — called inside bo_trial_step) +# --------------------------------------------------------------------------- + + +def build_recipe_from_params( + params: Dict[str, Any], + precursor_slot_names: List[str], + fixed_precursors: Optional[Dict[str, float]], + simulation_size: int, + num_realizations: int, +) -> "OptimizableRecipe": + """Convert a BayBE parameter recommendation into an OptimizableRecipe. + + Args: + params: Parameter dict recommended by BayBE (may contain numpy scalars). + precursor_slot_names: Names of precursor slot parameters in the search + space (e.g. ["li_source", "si_source"]). Used to extract the + selected precursor formula for each slot from params. + fixed_precursors: If provided, use these formula→amount pairs directly + (precursor selection is not optimized). + simulation_size: CA grid size (NxN). + num_realizations: Number of simulation realizations to average. + + Returns: + Configured OptimizableRecipe ready for to_recipe(). + """ + from rxn_ca.optimization import OptimizableRecipe + + # Normalize numpy scalar types that BayBE returns in its DataFrame + clean_params = { + k: (v.item() if hasattr(v, "item") else v) + for k, v in params.items() + } + + if fixed_precursors is not None: + # Allow BayBE to optimize individual precursor amounts via + # '{formula}_ratio' parameters in the search space. Formulas without + # a ratio parameter keep their base amount from fixed_precursors. + scaled = { + formula: float(clean_params.get(f"{formula}_ratio", base_amount)) + for formula, base_amount in fixed_precursors.items() + } + return OptimizableRecipe( + precursors=scaled, + hold_temp=int(clean_params["hold_temp"]), + hold_time=int(clean_params["hold_time"]), + ramp_step_time=int(clean_params.get("ramp_step_time", 1)), + simulation_size=simulation_size, + num_simulations=num_realizations, + ) + + precursor_slots = {name: clean_params[name] for name in precursor_slot_names} + + return OptimizableRecipe.from_params( + params=clean_params, + precursor_slots=precursor_slots, + simulation_size=simulation_size, + num_simulations=num_realizations, + ) + + +def _score_result(result_doc: Any, target_phase: str, scorer_type: str) -> float: + """Score a simulation result document. + + Args: + result_doc: RxnCAResultDoc from the simulation. + target_phase: Chemical formula of the target product. + scorer_type: "final" (yield at end) or "maximum" (peak yield). + + Returns: + Float score in [0, 1], or 0.0 if scoring fails. + """ + from rxn_ca.optimization import AnalyzedResult, FinalProductScorer, MaximumProductScorer, get_result_analysis + + try: + traces = get_result_analysis(result_doc) + analyzed = AnalyzedResult(traces) + scorer_cls = MaximumProductScorer if scorer_type == "maximum" else FinalProductScorer + return scorer_cls(target_phase).score(analyzed) + except Exception as e: + print(f"Warning: scoring failed ({type(e).__name__}: {e})") + return 0.0 + + +# --------------------------------------------------------------------------- +# Jobs +# --------------------------------------------------------------------------- + +@job +def init_bo_campaign( + search_space_config: dict, + n_initial: int, + n_iterations: int, + target_name: str = "yield", + # precursor_smiles: Optional[Dict[str, Dict[str, str]]] = None, +) -> dict: + """Initialize a BayBE Campaign from a serialized SearchSpace. + + Builds the BayBE Campaign (search space, recommender, target) and returns + it as a JSON string so it can be passed between stateless HPC jobs without + losing the GP posterior. + + Args: + search_space_config: Serialized SearchSpace from SearchSpace.as_dict(). + n_initial: Number of random exploration trials before BO guidance. + n_iterations: Number of BO-guided trials after the initial phase. + target_name: Name of the optimization target (default "yield"). + + Returns: + {"campaign_json": } + """ + from rxn_ca.optimization import BayesianOptimizer, SearchSpace + from rxn_ca.optimization.objective import MockObjectiveFunction, ObjectiveConfig, ScorerType + + search_space = SearchSpace.from_dict(search_space_config) + + stub_objective = MockObjectiveFunction( + config=ObjectiveConfig( + target_phase="__stub__", + scorer_type=ScorerType.FINAL, + ), + score_fn=lambda p: 0.0, + ) + + optimizer = BayesianOptimizer( + search_space=search_space, + objective=stub_objective, + n_initial=n_initial, + n_iterations=n_iterations, + target_name=target_name, + # precursor_smiles=precursor_smiles or {}, + ) + + print(f"Initialized BayBE Campaign: {search_space}") + print(f" n_initial={n_initial}, n_iterations={n_iterations}, target='{target_name}'") + with open("campaign.json", "w") as f: + json.dump(optimizer._campaign.to_json(), f) + return {"campaign.json": os.getcwd() + "/campaign.json"} + + +@job +def bo_trial_step( + iteration: int, + total_iterations: int, + campaign_json: str, + reaction_library_data: ReactionLibraryData, + precursor_slot_names: List[str], + fixed_precursors: Optional[Dict[str, float]], + objective_config: dict, + output_dir: str, + fw_category: Optional[str] = None, + metadata: Optional[Dict] = None, +) -> Response: + """Run one Bayesian optimization trial and chain the next. + + Each call: + 1. Restores the BayBE Campaign from JSON (GP posterior preserved). + 2. Calls campaign.recommend() for the next parameter configuration. + 3. Builds an OptimizableRecipe from those parameters. + 4. Runs the CA simulation using the pre-built reaction library. + 5. Scores the result and adds it to the Campaign. + 6. Saves per-trial JSON and appends to history.csv. + 7. Returns Response(addition=) or a summary on the final trial. + + Args: + iteration: Current 0-based trial index. + total_iterations: Total trials (n_initial + n_iterations). + campaign_json: Serialized BayBE Campaign from init_bo_campaign or the + previous trial. Contains the full GP posterior. + reaction_library_data: Output from setup_reaction_library. Passed + through every trial — MP enumeration is done once. + precursor_slot_names: Names of PrecursorSlotParameters in the search + space (e.g. ["li_source", "si_source"]). + fixed_precursors: Formula → amount map when precursors are not + optimized. Set to None when using precursor slots. + objective_config: Dict with keys: + target_phase, scorer_type ("final"|"maximum"), + simulation_size, num_realizations, + live_compress (bool), compress_freq (int). + output_dir: Shared filesystem path for per-trial JSON, history.csv, + and best_result.json. Must be accessible from all worker nodes. + fw_category: FireWorks _category tag for this workflow. When set, + propagated to each dynamically-added next trial so workers + filtered with `rlaunch --category ` only run this workflow. + + Returns: + Response(addition=) if more trials remain, + otherwise a summary dict with best score and parameters. + """ + from baybe import Campaign + from rxn_ca.phases import SolidPhaseSet + from rxn_ca.reactions import ReactionLibrary + from rxn_ca.utilities.parallel_sim import run_sim_parallel + from rxn_ca.utilities.single_sim import run_single_sim + + target_phase = objective_config["target_phase"] + scorer_type = objective_config.get("scorer_type", "final") + simulation_size = objective_config["simulation_size"] + num_realizations = objective_config["num_realizations"] + live_compress = objective_config.get("live_compress", True) + compress_freq = objective_config.get("compress_freq", 500) + target_name = objective_config.get("target_name", "yield") + + output_path = Path(output_dir) + sim_dir = output_path / "simulations" + sim_dir.mkdir(parents=True, exist_ok=True) + + print(f"\n=== BO Trial {iteration + 1}/{total_iterations} ===") + + # --- Step 1: Restore BayBE Campaign (GP posterior preserved) --- + with open(campaign_json, "r") as f: + campaign_jsonfile = json.load(f) + campaign = Campaign.from_json(campaign_jsonfile) + + # --- Step 2: Get next recommendation --- + # Guard against the acquisition-function optimiser hanging at a continuous + # parameter boundary (L-BFGS-B can spin forever when the GP posterior peak + # sits exactly on the lower/upper bound). After _RECOMMEND_TIMEOUT_SECONDS + # we fall back to RandomRecommender so the trial can still run. + signal.signal(signal.SIGALRM, _alarm_handler) + signal.alarm(_RECOMMEND_TIMEOUT_SECONDS) + try: + recommendation = campaign.recommend(batch_size=1) + signal.alarm(0) + except _RecommendTimeout: + signal.alarm(0) + print( + f"Warning: campaign.recommend() timed out after " + f"{_RECOMMEND_TIMEOUT_SECONDS}s. " + "Falling back to random recommendation. " + "Consider re-registering the workflow with discrete ratio parameters " + "to prevent this (see search_space.add_precursor_ratio)." + ) + from baybe.recommenders.pure.nonpredictive.sampling import RandomRecommender + recommendation = RandomRecommender().recommend( + batch_size=1, + searchspace=campaign.searchspace, + objective=campaign.objective, + measurements=campaign.measurements, + ) + params = { + col: ( + recommendation.iloc[0][col].item() + if hasattr(recommendation.iloc[0][col], "item") + else recommendation.iloc[0][col] + ) + for col in recommendation.columns + } + print(f"Recommended params: {params}") + + # --- Step 3: Build recipe --- + opt_recipe = build_recipe_from_params( + params=params, + precursor_slot_names=precursor_slot_names, + fixed_precursors=fixed_precursors, + simulation_size=simulation_size, + num_realizations=num_realizations, + ) + recipe = opt_recipe.to_recipe() + print(f"Recipe: {opt_recipe}") + + # --- Step 4: Load reaction library --- + if reaction_library_data.reaction_library_path: + rxn_lib = ReactionLibrary.from_file(reaction_library_data.reaction_library_path) + phase_set = rxn_lib.phases + elif reaction_library_data.reaction_library_dict: + phase_set = SolidPhaseSet.from_dict(reaction_library_data.phase_set_dict) + rxn_lib = ReactionLibrary.from_dict(reaction_library_data.reaction_library_dict) + else: + raise ValueError( + "reaction_library_data must include reaction_library_path or reaction_library_dict" + ) + + # --- Steps 5&6: Run simulation and score --- + # If trial_path already exists this Firework is being re-run after a + # walltime kill that occurred after the simulation completed (e.g. during + # campaign save or Response chaining). Skip the expensive simulation and + # reuse the cached score so the GP posterior stays consistent. + trial_path = sim_dir / f"trial_{iteration:03d}.json" + if trial_path.exists(): + cached = json.loads(trial_path.read_text()) + score = cached["score"] + result_doc = None + print(f"Re-run detected: reusing cached score {score:.4f} from {trial_path.name}") + else: + print("Running simulation...") + if num_realizations > 1: + result_doc = run_sim_parallel( + recipe, + reaction_lib=rxn_lib, + phase_set=phase_set, + live_compress=live_compress, + compress_freq=compress_freq, + ) + else: + result_doc = run_single_sim( + recipe, + reaction_lib=rxn_lib, + phase_set=phase_set, + live_compress=live_compress, + compress_freq=compress_freq, + ) + score = _score_result(result_doc, target_phase, scorer_type) + print(f"Score ({target_phase}, {scorer_type}): {score:.4f}") + + # --- Step 7: Tell Campaign --- + campaign.add_measurements(pd.DataFrame([{**params, target_name: score}])) + + # --- Step 8: Save per-trial outputs --- + if not trial_path.exists(): + trial_path.write_text(json.dumps({ + "iteration": iteration, + "params": params, + "score": score, + "timestamp": datetime.utcnow().isoformat(), + }, indent=2)) + + history_path = output_path / "history.csv" + already_logged = history_path.exists() and any( + int(row["iteration"]) == iteration + for row in csv.DictReader(history_path.open()) + ) + if not already_logged: + write_header = not history_path.exists() + fieldnames = ["iteration", "score", *sorted(params.keys())] + with history_path.open("a", newline="") as f: + writer = csv.DictWriter(f, fieldnames=fieldnames) + if write_header: + writer.writeheader() + writer.writerow({"iteration": iteration, "score": score, **params}) + + if result_doc is not None and hasattr(result_doc, "to_file"): + try: + result_doc.to_file(str(sim_dir / f"trial_{iteration:03d}_full.json")) + except Exception as e: + print(f"Warning: could not save full result doc: {e}") + + # --- Step 9: Chain next trial or finalize --- + # Write the updated campaign to a shared-filesystem file so the next + # trial job (running on a different worker) can read it by path. + # Passing campaign.to_json() directly would give a raw JSON string, but + # bo_trial_step expects a file path (open(campaign_json, "r")). + campaign_path = str(output_path / f"campaign_iter_{iteration:03d}.json") + with open(campaign_path, "w") as f: + json.dump(campaign.to_json(), f) + + if iteration + 1 < total_iterations: + next_job = bo_trial_step( + iteration=iteration + 1, + total_iterations=total_iterations, + campaign_json=campaign_path, + reaction_library_data=reaction_library_data, + precursor_slot_names=precursor_slot_names, + fixed_precursors=fixed_precursors, + objective_config=objective_config, + output_dir=output_dir, + fw_category=fw_category, + metadata=metadata, + ) + if fw_category: + next_job.update_config({"manager_config": {"_category": fw_category}}) + next_job.name = f"bo_trial_step_{iteration + 1:03d}" + if metadata: + next_job.update_metadata(metadata) + return Response(addition=next_job) + + # Final iteration: collect all trial results and write summary + history_rows: List[dict] = [] + for i in range(total_iterations): + trial_file = sim_dir / f"trial_{i:03d}.json" + if trial_file.exists(): + history_rows.append(json.loads(trial_file.read_text())) + + if history_rows: + best = max(history_rows, key=lambda r: r["score"]) + summary = { + "target_phase": target_phase, + "fixed_precursors": fixed_precursors, + "best_score": best["score"], + "best_params": best["params"], + "best_iteration": best["iteration"], + "total_evaluations": total_iterations, + } + (output_path / "best_result.json").write_text(json.dumps(summary, indent=2)) + print("\nOptimization complete.") + print(f"Best score: {best['score']:.4f} at iteration {best['iteration']}") + print(f"Best params: {best['params']}") + return summary + + return {"status": "complete", "iterations": total_iterations} diff --git a/src/rxn_ca/workflow/jobs.py b/src/rxn_ca/workflow/jobs/core.py similarity index 80% rename from src/rxn_ca/workflow/jobs.py rename to src/rxn_ca/workflow/jobs/core.py index fb1fc02..e7b6796 100644 --- a/src/rxn_ca/workflow/jobs.py +++ b/src/rxn_ca/workflow/jobs/core.py @@ -11,7 +11,7 @@ from jobflow import job from monty.json import MontyEncoder, MontyDecoder -from .schemas import ReactionLibraryData, SimulationOutput +from ..schemas import ReactionLibraryData, SimulationOutput def _build_reaction_library( @@ -20,6 +20,7 @@ def _build_reaction_library( ensure_phases: List[str] = None, metastability_cutoff: float = 0.1, exclude_theoretical: bool = True, + **entry_kwargs, ) -> tuple: """Build phase set and reaction library for a chemical system. @@ -46,6 +47,7 @@ def _build_reaction_library( metastability_cutoff=metastability_cutoff, ensure_phases=ensure_phases or [], exclude_theoretical_phases=exclude_theoretical, + thermo_types=entry_kwargs.get("thermo_types", ["GGA_GGA+U"]) ) print(f"Got {len(entries)} entries for {chemical_system}") if ensure_phases: @@ -59,13 +61,17 @@ def _build_reaction_library( rxn_set = run_enumerators(enumerators, entries) print(f"Enumerated {len(rxn_set)} reactions") + # rxn_network's G_ELEMS uses integer string keys ('300', '400', ...), + # so temperatures must be ints — float 300.0 becomes '300.0' and KeyErrors. + int_temps = [int(t) for t in temperatures] + # Compute reactions at all temperatures - temp_rxn_mapping = rxn_set.compute_at_temperatures(temperatures) + temp_rxn_mapping = rxn_set.compute_at_temperatures(int_temps) # Score reactions reaction_lib = get_scored_rxns( rxn_set, - temps=temperatures, + temps=int_temps, phase_set=phase_set, rxns_at_temps=temp_rxn_mapping, parallel=True, @@ -79,9 +85,13 @@ def setup_reaction_library( chemical_system: str, temperatures: List[float], ensure_phases: List[str] = None, - metastability_cutoff: float = 0.1, + metastability_cutoff: float = 0.03, exclude_theoretical: bool = True, save_to_file: bool = True, + library_dir: Optional[str] = None, + include_library_dict: bool = True, + entry_kwargs: Optional[Dict[str, Any]] = None, + **library_kwargs, ) -> ReactionLibraryData: """Set up phase set and reaction library for a chemical system. @@ -99,30 +109,42 @@ def setup_reaction_library( metastability_cutoff: Energy above hull cutoff for phases exclude_theoretical: Whether to exclude theoretical phases save_to_file: If True, save reaction library to a JSON file + library_dir: Optional output directory for saved library JSON. + Defaults to current working directory when not provided. + include_library_dict: Whether to include the full serialized + reaction_library_dict in the returned payload. + entry_kwargs: Optional kwargs forwarded to entry/reaction enumeration. Returns: ReactionLibraryData with phase set, reaction library, and metadata """ + build_kwargs: Dict[str, Any] = {} + build_kwargs.update(entry_kwargs or {}) + build_kwargs.update(library_kwargs) + phase_set, reaction_lib = _build_reaction_library( chemical_system, temperatures, ensure_phases, metastability_cutoff, exclude_theoretical, + **build_kwargs, ) # Optionally save reaction library to file reaction_library_path = None if save_to_file: + output_dir = Path(library_dir).expanduser() if library_dir else Path.cwd() + output_dir.mkdir(parents=True, exist_ok=True) filename = f"reaction_library_{chemical_system.replace('-', '_')}.json" - reaction_library_path = str(Path.cwd() / filename) + reaction_library_path = str((output_dir / filename).resolve()) with open(reaction_library_path, "w") as f: json.dump(reaction_lib.as_dict(), f, cls=MontyEncoder) print(f"Saved reaction library to {reaction_library_path}") return ReactionLibraryData( phase_set_dict=phase_set.as_dict(), - reaction_library_dict=reaction_lib.as_dict(), + reaction_library_dict=reaction_lib.as_dict() if include_library_dict else {}, chemical_system=chemical_system, temperatures=temperatures, phases_available=list(phase_set.phases), @@ -136,11 +158,11 @@ def run_simulation( reaction_library_data: ReactionLibraryData = None, chemical_system: str = None, ensure_phases: List[str] = None, - metastability_cutoff: float = 0.1, + metastability_cutoff: float = 0.03, save_to_file: bool = True, metadata: Dict[str, Any] = None, live_compress: bool = True, - compress_freq: int = 100, + compress_freq: int = 500, ) -> SimulationOutput: """Run an rxn-ca simulation. @@ -174,8 +196,16 @@ def run_simulation( # Set up phase set and reaction library if reaction_library_data is not None: - phase_set = SolidPhaseSet.from_dict(reaction_library_data.phase_set_dict) - reaction_lib = ReactionLibrary.from_dict(reaction_library_data.reaction_library_dict) + if reaction_library_data.reaction_library_path: + reaction_lib = ReactionLibrary.from_file(reaction_library_data.reaction_library_path) + phase_set = reaction_lib.phases + elif reaction_library_data.reaction_library_dict: + phase_set = SolidPhaseSet.from_dict(reaction_library_data.phase_set_dict) + reaction_lib = ReactionLibrary.from_dict(reaction_library_data.reaction_library_dict) + else: + raise ValueError( + "reaction_library_data must include reaction_library_path or reaction_library_dict" + ) chem_sys = reaction_library_data.chemical_system reaction_library_path = reaction_library_data.reaction_library_path else: