diff --git a/examples-proposed/022-tasks-and-ensembles/ensemble.conf b/examples-proposed/022-tasks-and-ensembles/ensemble.conf
index 19ca57b5..78755433 100644
--- a/examples-proposed/022-tasks-and-ensembles/ensemble.conf
+++ b/examples-proposed/022-tasks-and-ensembles/ensemble.conf
@@ -13,7 +13,7 @@ SIMULATION_MODE = NORMAL
     CLASS = DRIVER
     SUB_CLASS =
     NAME = EnsembleDriver
-    NPROC = 1
+    NPROC = 20
     BIN_PATH =
     INPUT_FILES =
     OUTPUT_FILES =
diff --git a/examples-proposed/022-tasks-and-ensembles/instance_component.py b/examples-proposed/022-tasks-and-ensembles/instance_component.py
index e1a5cc65..1eb8c32d 100644
--- a/examples-proposed/022-tasks-and-ensembles/instance_component.py
+++ b/examples-proposed/022-tasks-and-ensembles/instance_component.py
@@ -11,6 +11,11 @@ class InstanceComponent(Component):
     def step(self, timestamp: float = 0.0, **keywords):
+        if 'HWLOC_XMLFILE' in os.environ:
+            self.services.warning('HWLOC_XMLFILE is still set!')
+        else:
+            self.services.info('HWLOC_XMLFILE is not set')
+
         # ENSEMBLE_INSTANCE is a special IPS variable that contains the
         # string uniquely identifying this instance. Each instance will have
         # the `run_ensemble()` `name` argument prepended to a unique number
@@ -37,7 +42,7 @@ def step(self, timestamp: float = 0.0, **keywords):
                 '-o', 'stats.csv']
         cmd = str(mpi_executable) + ' ' + ' '.join(args)
         try:
-            run_id = self.services.launch_task(nproc=1,
+            run_id = self.services.launch_task(nproc=5,
                                                working_dir=working_dir,
                                                binary=cmd)
         except Exception as e:
diff --git a/examples-proposed/022-tasks-and-ensembles/template.conf b/examples-proposed/022-tasks-and-ensembles/template.conf
index 146b16e5..fabf351b 100644
--- a/examples-proposed/022-tasks-and-ensembles/template.conf
+++ b/examples-proposed/022-tasks-and-ensembles/template.conf
@@ -27,7 +27,7 @@ SIMULATION_MODE = NORMAL
     CLASS = WORKER
    SUB_CLASS =
    NAME = InstanceComponent
-    NPROC = 1
+    NPROC = 2
     BIN_PATH =
     INPUT_FILES =
     OUTPUT_FILES =
diff --git a/examples-proposed/024-aggregated-compute-ensemble/README.md b/examples-proposed/024-aggregated-compute-ensemble/README.md
new file mode 100644
index 00000000..cc68ad63
--- /dev/null
+++ b/examples-proposed/024-aggregated-compute-ensemble/README.md
@@ -0,0 +1,33 @@
+# An example ensemble simulation for aggregated computing
+
+This example demonstrates how to set up an ensemble simulation in IPS that
+performs aggregated computing across multiple ensemble instances. Each
+ensemble instance runs a component for ${COMPUTE} that reports values local
+to the instance; those values are then aggregated at the top level after the
+instances have finished.
+
+Note that there will be Dask-related errors and warnings at the end that can
+be ignored; these are due to Dask not shutting down cleanly.
+
+## Contents
+
+* `driver.py` -- top-level driver
+* `instance_component.py` -- component worker code
+* `instance_driver.py` -- component driver code
+
+* `ensemble.conf` -- top-level configuration file
+* `platform.conf` -- platform configuration file
+* `template.conf` -- ensemble instance configuration file
+
+## Instructions
+
+To run the example:
+
+```bash
+PORTAL_API_KEY=changeme ips.py --platform platform.conf --simulation ensemble.conf
+```
+
+Depending on the web portal instance you want to connect to, you will need to
+change `PORTAL_API_KEY` in the run command and `PORTAL_URL` in the
+`ensemble.conf` file.
diff --git a/examples-proposed/024-aggregated-compute-ensemble/__init__.py b/examples-proposed/024-aggregated-compute-ensemble/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/examples-proposed/024-aggregated-compute-ensemble/driver.py b/examples-proposed/024-aggregated-compute-ensemble/driver.py
new file mode 100644
index 00000000..dd0011bc
--- /dev/null
+++ b/examples-proposed/024-aggregated-compute-ensemble/driver.py
@@ -0,0 +1,58 @@
+#!/usr/bin/env python3
+"""
+Simple ensemble driver that just dispatches an IPS ensemble for an example
+compute application.
+"""
+
+from pathlib import Path
+
+from ipsframework import Component
+from ipsframework.ipsutil import params_from_csv
+
+
+class EnsembleDriver(Component):
+    """Kicks off a simple ensemble"""
+
+    def init(self, timestamp=0.0):
+        pass
+        # TODO: temporarily commented out until the example is ready;
+        # consider adding a notebook to the portal then.
+
+        # NOTEBOOK_TEMPLATE = 'notebook.ipynb'
+        # self.services.stage_input_files([NOTEBOOK_TEMPLATE])
+        # try:
+        #     self.services.initialize_jupyter_notebook(NOTEBOOK_TEMPLATE)
+        # except Exception:
+        #     print('did not add notebook to portal')
+
+    def step(self, timestamp=0.0):
+        # This CSV file contains the parameters used for the
+        # different instances.
+        variables = params_from_csv(self.config['PARAMETER_FILE'])
+
+        # This is the IPS configuration file for the instances. It looks
+        # like a regular configuration file except that it has slots for the
+        # variables (e.g., 'alpha', 'T_final', etc.). 'TEMPLATE' is
+        # specified in the config file section for this driver.
+        template = Path(self.config['TEMPLATE'])
+        self.services.info(f'Using template config file {template}')
+
+        if not template.exists():
+            raise RuntimeError(
+                f'{template} config template file does not exist')
+
+        # Now spin up and run the instances. This function will return a list
+        # with each list element corresponding to an instance. You can use
+        # this information to find the specific instance run directory for a
+        # given set of variables.
+        #
+        # The "name" parameter must be unique for each ensemble within a run,
+        # and will be used as an identifier on the Portal.
+        mapping = self.services.run_ensemble(template, variables,
+                                             run_dir=Path('.').absolute(),
+                                             name='INSTANCE_',
+                                             num_nodes=1,
+                                             cores_per_instance=1)
+
+        # Print each mapping of instance name to the variable values used.
+        for instance in mapping:
+            self.services.info(f'{instance!s}')
diff --git a/examples-proposed/024-aggregated-compute-ensemble/ensemble.conf b/examples-proposed/024-aggregated-compute-ensemble/ensemble.conf
new file mode 100644
index 00000000..b01655f0
--- /dev/null
+++ b/examples-proposed/024-aggregated-compute-ensemble/ensemble.conf
@@ -0,0 +1,33 @@
+SIM_NAME = simpleensemble
+SIM_ROOT = $PWD/ENSEMBLES
+LOG_FILE = log
+LOG_LEVEL = INFO
+SIMULATION_MODE = NORMAL
+
+INPUT_DIR = $PWD/input_dir/
+
+USE_PORTAL = True
+PORTAL_URL = https://lb.ipsportal.development.svc.spin.nersc.org
+# Do not commit the actual PORTAL_API_KEY value to version control; it is
+# best set as an environment variable.
+#PORTAL_API_KEY=changeme
+
+[PORTS]
+    NAMES = DRIVER
+    [[DRIVER]]
+        IMPLEMENTATION = ensemble_driver
+
+[ensemble_driver]
+    CLASS = DRIVER
+    SUB_CLASS =
+    NAME = EnsembleDriver
+    NPROC = 1
+    BIN_PATH =
+    INPUT_FILES =
+    OUTPUT_FILES =
+    SCRIPT = $PWD/driver.py
+    MODULE =
+    # Specifies the template configuration file used for instances
+    TEMPLATE = $PWD/template.conf
+    # Specifies the parameter values for each instance
+    PARAMETER_FILE = $PWD/values.csv
+
diff --git a/examples-proposed/024-aggregated-compute-ensemble/gen_data.py b/examples-proposed/024-aggregated-compute-ensemble/gen_data.py
new file mode 100644
index 00000000..f8bd70b3
--- /dev/null
+++ b/examples-proposed/024-aggregated-compute-ensemble/gen_data.py
@@ -0,0 +1,118 @@
+#!/usr/bin/env python3
+"""
+Used to generate synthetic data as an example.
+"""
+import argparse
+import csv
+import json
+from time import time
+from traceback import print_exc
+from typing import Any
+
+import numpy as np
+import matplotlib.pyplot as plt
+
+from ipsframework.resourceHelper import get_platform_info
+
+
+def main(instance: str,
+         alpha: float, L: float, T_final: float, Nx: int, Nt: int) -> dict[str, Any]:
+    """Generate synthetic data to emulate an actual simulation or complex
+    calculation.
+
+    As a side effect it will save a plot named `<instance>_solution.png` to
+    the current working directory.
+
+    :param instance: instance name
+    :param alpha: thermal diffusivity
+    :param L: domain length
+    :param T_final: final time
+    :param Nx: number of spatial grid points
+    :param Nt: number of time steps
+    :returns: dict with keys 'x' and 'u', where x is the spatial grid and u
+        the corresponding temperature values
+    """
+    start = time()
+
+    # Discretization
+    dx = L / (Nx - 1)
+    dt = T_final / Nt
+    r = alpha * dt / (dx ** 2)
+
+    # Check stability condition for explicit method
+    if r > 0.5:
+        print("Warning: Stability condition r <= 0.5 is not met. "
+              "Results may be inaccurate.")
+
+    # Initial condition (e.g., a sine wave)
+    x = np.linspace(0, L, Nx)
+    u = np.sin(np.pi * x)
+
+    # Boundary conditions (Dirichlet, e.g., u(0,t) = 0, u(L,t) = 0). These are
+    # already handled by the initial setup of u = 0 at the boundaries if the
+    # initial condition is 0 there. If non-zero, they would be set within the
+    # time loop.
+
+    # Time evolution
+    for n in range(Nt):
+        u_new = np.copy(u)  # Create a copy for updating
+        for i in range(1, Nx - 1):
+            u_new[i] = u[i] + r * (u[i + 1] - 2 * u[i] + u[i - 1])
+        u = u_new
+
+    # Plotting the result
+    plt.plot(x, u)
+    plt.xlabel("Position (x)")
+    plt.ylabel("Temperature (u)")
+    plt.title("Solution of 1D Heat Equation")
+    plt.grid(True)
+    plt.savefig(f"{instance}_solution.png")
+
+    # Save some per-component stats
+    stats_fname = f'{instance}_stats.csv'
+    run_env = get_platform_info()
+
+    with open(stats_fname, 'w') as f:
+        # Write run-time stats to a CSV along with the runtime parameters
+        # specific to this instance.
+        writer = csv.writer(f)
+        writer.writerow(
+            ['instance', 'hostname', 'pid', 'core',
+             'affinity',
+             'alpha', 'L', 'T_final', 'Nx', 'Nt',
+             'start', 'end'])
+
+        writer.writerow([instance, run_env['hostname'],
+                         run_env['pid'], run_env['core_id'],
+                         run_env['affinity'],
+                         alpha, L, T_final, Nx, Nt,
+                         start, time()])
+
+    return {'x': x.tolist(), 'u': u.tolist()}
+
+
+if __name__ == '__main__':
+    try:
+        parser = argparse.ArgumentParser(
+            description='Generate synthetic data to emulate an actual '
+                        'simulation or complex calculation')
+        parser.add_argument('--instance', type=str,
+                            help='instance name')
+        parser.add_argument('--alpha', type=float, default=1.0)
+        parser.add_argument('--L', type=float, default=1.0)
+        parser.add_argument('--T_final', type=float, default=1.0)
+        parser.add_argument('--Nx', type=int, default=100)
+        parser.add_argument('--Nt', type=int, default=100)
+
+        args = parser.parse_args()
+
+        data = main(args.instance,
+                    args.alpha, args.L, args.T_final, args.Nx, args.Nt)
+
+        file_name = f'{args.instance}_solution.json'
+        print(f'Writing to {file_name}')
+        with open(file_name, 'w') as f:
+            json.dump(data, f)
+
+    except Exception as e:
+        print(f'Encountered error: {e}')
+        # print() needs a file object, not a file name, so open the error
+        # log before writing to it.
+        with open('gen_data_error.txt', 'w') as err_f:
+            print(f'Encountered error: {e}', file=err_f)
+        print_exc()
diff --git a/examples-proposed/024-aggregated-compute-ensemble/instance_component.py b/examples-proposed/024-aggregated-compute-ensemble/instance_component.py
new file mode 100644
index 00000000..3e1940b7
--- /dev/null
+++ b/examples-proposed/024-aggregated-compute-ensemble/instance_component.py
@@ -0,0 +1,82 @@
+#!/usr/bin/env python3
+"""
+Component to be stepped in an instance.
+
+This should generate a PNG image, a JSON file, and a CSV file. The first
+two come from the synthetic data generated by `gen_data.py`; the CSV is
+built from provenance data captured in `gen_data.py` as well.
+""" +from pathlib import Path +from time import time +from typing import Any + +from ipsframework import Component + + +def create_cmd(instance: str, path: Path, alpha: float, L:float, T_final:float, + Nx:int, Nt:int) -> list[Any]: + """ create the command to run the external data generator + + :param instance: instance name + :param path: path to data generator script directory + :param alpha: thermal diffusivity + :param L: domain length + :param T_final: final time + :param Nx: number of spatial grid points + :param Nt: number of time steps + :returns: list of command line arguments to be executed in step() + """ + executable = f'{path!s}/gen_data.py' + cmd = ['python3', executable, '--instance', instance, + '--alpha', alpha, '--L', L, '--T_final', T_final, + '--Nx', Nx, '--Nt', Nt] + return cmd + + +class InstanceComponent(Component): + def step(self, timestamp: float = 0.0, **keywords): + start = time() + + # ENSEMBLE_INSTANCE is a special IPS variable that contains the + # string uniquely identifying this instance. Each instance will have + # the `run_ensemble()` `name` argument prepended to a unique number + # for each instance. E.g., ENSEMBLE_INSTANCE might be "MY_INSTANCE_23". + instance_id = self.services.get_config_param('ENSEMBLE_INSTANCE') + self.services.info(f'{instance_id}: Start of step of instance component.') + + # Echo the parameters we're expecting, A, B, and C + self.services.info(f'{instance_id}: instance component parameters: ' + f'alpha={self.alpha}, L={self.L}, ' + f'T_final={self.T_final}, Nx={self.Nx}, ' + f'Nt={self.Nt}') + + cmd = create_cmd(instance_id, Path(self.BIN_PATH), + self.alpha, self.L, self.T_final, self.Nx, self.Nt) + + working_dir = str(Path('.').absolute()) + self.services.info(f'{instance_id}: Launching executable ' + f'in {working_dir}') + run_id = None + try: + cmd = ' '.join(cmd) # need one big ole string for executing tasks + run_id = self.services.launch_task(nproc=1, + working_dir=working_dir, + binary=cmd) + except Exception as e: + self.services.critical(f'{instance_id}: Unable to launch ' + f'executable in {working_dir}') + + return_value = self.services.wait_task(run_id) # block until done + + self.services.info(f'{instance_id}: Completed MPI executable with ' + f'return value: {return_value}.') + + # TODO temporarily commenting this out until the actual + # example is ready to consider adding data files to the portal. This + # originally came from code Lance wrote in a previous example. 
+        # try:
+        #     self.services.add_analysis_data_files([data_fname, stats_fname], timestamp)
+        # except Exception:
+        #     print('did not add data files to portal, check logs')
+
+        self.services.info(f'{instance_id}: End of step of instance component.')
diff --git a/examples-proposed/024-aggregated-compute-ensemble/instance_driver.py b/examples-proposed/024-aggregated-compute-ensemble/instance_driver.py
new file mode 100644
index 00000000..33f1954b
--- /dev/null
+++ b/examples-proposed/024-aggregated-compute-ensemble/instance_driver.py
@@ -0,0 +1,17 @@
+#!/usr/bin/env python3
+"""
+Driver component for instances
+"""
+
+from ipsframework import Component
+
+
+class InstanceDriver(Component):
+    """
+    Instance driver component that steps the main component
+    """
+
+    def step(self, timestamp: float = 0.0, **keywords):
+        instance_component = self.services.get_port('WORKER')
+
+        self.services.call(instance_component, 'step', 0.0)
diff --git a/examples-proposed/024-aggregated-compute-ensemble/perlmutter.slurm b/examples-proposed/024-aggregated-compute-ensemble/perlmutter.slurm
new file mode 100644
index 00000000..b99cf266
--- /dev/null
+++ b/examples-proposed/024-aggregated-compute-ensemble/perlmutter.slurm
@@ -0,0 +1,27 @@
+#!/bin/bash
+#
+# Set up the IPS ensemble example run. Note that you must have a Python
+# environment on Perlmutter with IPS already installed for this example to
+# work. Here the conda environment is named "ips", but you are free to use
+# an environment with a different name; if you do, be sure to change the
+# "conda activate" line below accordingly.
+#
+#SBATCH --account=atom   # REPLACE "atom" WITH YOUR PROJECT ID
+#SBATCH --constraint=cpu
+#SBATCH --nodes=1
+#SBATCH --time=5
+#SBATCH -p debug
+
+module load PrgEnv-gnu openmpi python
+
+# Again, this assumes that there exists a conda environment named "ips".
+conda activate ips
+
+# Set the API key and portal URL
+source /global/common/software/atom/ips-portal/credentials/ips-portal-development
+
+# The 2>&1 binds stderr to stdout so that both are captured in the tee log
+# file. The `tee` command lets you see the output on the terminal while also
+# saving it to the log.
+ips.py --simulation=ensemble.conf --platform=platform.conf \
+    2>&1 | tee ${SLURM_JOBID}_ips.log
+
diff --git a/examples-proposed/024-aggregated-compute-ensemble/platform.conf b/examples-proposed/024-aggregated-compute-ensemble/platform.conf
new file mode 100644
index 00000000..7bd5585d
--- /dev/null
+++ b/examples-proposed/024-aggregated-compute-ensemble/platform.conf
@@ -0,0 +1,9 @@
+# Platform config to run this example on localhost
+MPIRUN = eval
+NODE_DETECTION = manual
+PROCS_PER_NODE = 8
+CORES_PER_NODE = 8
+SOCKETS_PER_NODE = 1
+NODE_ALLOCATION_MODE = shared
+HOST = localhost
+SCRATCH =
\ No newline at end of file
diff --git a/examples-proposed/024-aggregated-compute-ensemble/template.conf b/examples-proposed/024-aggregated-compute-ensemble/template.conf
new file mode 100644
index 00000000..1e699128
--- /dev/null
+++ b/examples-proposed/024-aggregated-compute-ensemble/template.conf
@@ -0,0 +1,41 @@
+SIM_NAME = simpleensembleinstance
+SIM_ROOT = $PWD
+LOG_FILE = log
+LOG_LEVEL = INFO
+SIMULATION_MODE = NORMAL
+
+[PORTS]
+    NAMES = DRIVER WORKER
+    [[DRIVER]]
+        IMPLEMENTATION = instance_driver
+
+    [[WORKER]]
+        IMPLEMENTATION = instance_component
+
+[instance_driver]
+    CLASS = DRIVER
+    SUB_CLASS =
+    NAME = InstanceDriver
+    NPROC = 1
+    BIN_PATH =
+    INPUT_FILES =
+    OUTPUT_FILES =
+    SCRIPT = $PWD/instance_driver.py
+    MODULE =
+
+[instance_component]
+    CLASS = WORKER
+    SUB_CLASS =
+    NAME = InstanceComponent
+    NPROC = 1
+    BIN_PATH = $PWD
+    INPUT_FILES =
+    OUTPUT_FILES =
+    SCRIPT = $PWD/instance_component.py
+    MODULE =
+    # These are the variables that will have values substituted for each instance.
+    alpha =
+    L =
+    T_final =
+    Nx =
+    Nt =
diff --git a/examples-proposed/024-aggregated-compute-ensemble/values.csv b/examples-proposed/024-aggregated-compute-ensemble/values.csv
new file mode 100644
index 00000000..82ced66b
--- /dev/null
+++ b/examples-proposed/024-aggregated-compute-ensemble/values.csv
@@ -0,0 +1,12 @@
+instance_component:alpha,instance_component:L,instance_component:T_final,instance_component:Nx,instance_component:Nt
+0.1,1.0,1.0,50,1000
+0.14,1.70,5.38,95,850
+0.32,2.56,5.74,57,600
+0.23,4.84,2.64,95,910
+0.28,4.72,2.76,78,710
+0.16,5.91,3.44,70,689
+0.11,3.89,6.76,73,640
+0.15,4.06,1.69,35,500
+0.19,9.50,9.16,2,910
+0.07,6.77,5.88,14,300
+0.26,9.36,6.56,91,710
diff --git a/examples-proposed/README.md b/examples-proposed/README.md
index 810cd23e..85e4bd67 100644
--- a/examples-proposed/README.md
+++ b/examples-proposed/README.md
@@ -19,4 +19,9 @@ Note that each example will explicitly need to be installed into its own virtual
 - `009-task-pool-sync` - showcases a simple example utilizing `dask` to simulate parallelism
 - `010-adios-example` - meant to be a hello-world example for utilizing ADIOS files with the IPS Portal API.
 - `020-simple-ensemble` - simple example of how to run an ensemble of simulations
-- `020-simple-ensemble-with-portal` - same as the `020-simple-ensemble` example, but with IPS Portal integration.
\ No newline at end of file
+- `021-ensembles-from-CSV` - ensembles with parameters read from CSV file
+- `022-tasks-and-ensembles` - ensembles that submit tasks
+- `023-simple-ensemble-with-portal` - same as the `020-simple-ensemble`
+  example, but with IPS Portal integration.
+- `024-aggregated-compute-ensemble` - ensembles that submit tasks and
+  perform analytics per instance and for aggregated instances via the portal
diff --git a/ipsframework/resourceHelper.py b/ipsframework/resourceHelper.py
index f9867065..eed408fb 100644
--- a/ipsframework/resourceHelper.py
+++ b/ipsframework/resourceHelper.py
@@ -416,9 +416,12 @@ def get_platform_info():
     properly for a given system.
 
     :returns: A dictionary containing hostname, cpu count, cpu core id for
-        current running process, and available GPU devices if set
+        current running process, and available GPU devices if set; if the
+        platform is supported it will also return CPU affinity
     """
-    result = {'hostname': platform.node(), 'cpu_count': psutil.cpu_count(), 'pid': os.getpid()}
+    result = {'hostname': platform.node(),
+              'cpu_count': psutil.cpu_count(),
+              'pid': os.getpid()}
 
     if 'CUDA_VISIBLE_DEVICES' in os.environ:
         result['cuda_visible_devices'] = os.environ['CUDA_VISIBLE_DEVICES']
@@ -429,9 +432,16 @@ def get_platform_info():
         p = psutil.Process()
         with p.oneshot():
             result['core_id'] = p.cpu_num()
+            if hasattr(p, 'cpu_affinity'):
+                # Not all platforms support `cpu_affinity`, which is why
+                # we check first.
+                result['affinity'] = p.cpu_affinity()
+            else:
+                result['affinity'] = 'Unsupported Op'
     except Exception:
         # cpu_num() only available on linux (and BSD systems), so this will
         # throw an exception on other platforms
-        pass
+        result['core_id'] = 'Unsupported Op'
+        result['affinity'] = 'Unsupported Op'
 
     return result
diff --git a/pyproject.toml b/pyproject.toml
index e0cbd6f9..32ae57af 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -45,11 +45,13 @@ dependencies = [
     "dask",
     #"dask==2022.10.0",
     #"dask==2023.12.1",  # TODO need a version compatible with 3.8
-    "distributed",
+    "distributed"
 ]
 
 [project.optional-dependencies]
-docs = ["sphinx", "sphinx_rtd_theme"]
+# sphinx for making docs locally; matplotlib because some of the
+# examples generate figures.
+docs = ["sphinx", "sphinx_rtd_theme", "matplotlib"]
 
 [project.scripts]
 "ips.py" = "ipsframework.ips:main"
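The README above describes aggregating per-instance results at the top level, but the patch itself stops at writing the per-instance files. As a rough, hypothetical sketch (not part of this change set), the `<instance>_stats.csv` and `<instance>_solution.json` files written by `gen_data.py` could be collected from the ensemble run directories along these lines; the module name `aggregate_results.py` and the choice of summary statistic are illustrative assumptions only:

```python
#!/usr/bin/env python3
"""Hypothetical post-processing sketch: collect per-instance outputs written
by gen_data.py into one summary. Illustrative only; not part of the patch."""
import csv
import json
from pathlib import Path


def aggregate(run_dir: str = '.') -> dict:
    """Walk the run directory tree and gather per-instance results."""
    summary = {'instances': [], 'max_temperature': {}}
    for stats_file in Path(run_dir).rglob('*_stats.csv'):
        with open(stats_file, newline='') as f:
            row = next(csv.DictReader(f))  # gen_data.py writes one data row
        summary['instances'].append(row)

        # The matching solution file holds the final temperature profile.
        solution_file = stats_file.with_name(
            stats_file.name.replace('_stats.csv', '_solution.json'))
        if solution_file.exists():
            with open(solution_file) as f:
                solution = json.load(f)
            summary['max_temperature'][row['instance']] = max(solution['u'])
    return summary


if __name__ == '__main__':
    print(json.dumps(aggregate(), indent=2))
```

Run from the top-level `SIM_ROOT` after the ensemble finishes (for example, `python3 aggregate_results.py`), this would print one record per instance plus a simple aggregated statistic; any real aggregation step in the example would presumably live in `driver.py` after `run_ensemble()` returns.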