Merged
97 changes: 97 additions & 0 deletions eessi/testsuite/check_process_binding.py
@@ -0,0 +1,97 @@
#!/usr/bin/env python3
"""
Check process binding from standard input, in a format similar to the example below, which was obtained as follows:

$ mpirun -np 3 --map-by slot:PE=2 bash -c \
'echo "$HOSTNAME $(hwloc-calc -p --hierarchical package.numanode.core.pu $(hwloc-bind --get))"'
host1 Package:0.NUMANode:3.Core:51.PU:15 Package:0.NUMANode:4.Core:8.PU:16
host1 Package:0.NUMANode:1.Core:17.PU:5 Package:0.NUMANode:3.Core:48.PU:12
host2 Package:0.NUMANode:3.Core:49.PU:13 Package:0.NUMANode:3.Core:50.PU:14

Alternatively, if numanode is not supported (hwloc < 2.9.0):

$ mpirun -np 3 --map-by slot:PE=2 bash -c \
'echo "$HOSTNAME $(hwloc-calc -p --hierarchical package.core.pu $(hwloc-bind --get))"'
host1 Package:0.Core:51.PU:15 Package:0.Core:8.PU:16
host1 Package:0.Core:17.PU:5 Package:0.Core:48.PU:12
host2 Package:0.Core:49.PU:13 Package:0.Core:50.PU:14
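
Typical use is to pipe such output into this script; a hypothetical invocation matching the
example above (2 nodes, 3 processes, 2 CPUs per process):

$ mpirun -np 3 --map-by slot:PE=2 <binding command> | ./check_process_binding.py \
    --nodes 2 --procs 3 --cpus-per-proc 2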
"""

import argparse
from collections import Counter, defaultdict
import sys


def main():
    parser = argparse.ArgumentParser(description="Check process binding.")
    parser.add_argument("--nodes", type=int, required=True, help="Expected number of nodes")
    parser.add_argument("--procs", type=int, required=True, help="Expected number of processes")
    parser.add_argument("--cpus-per-proc", type=int, required=True, help="Expected number of CPUs per process")
    args = parser.parse_args()

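    # each non-empty line of stdin is a hostname followed by one hierarchical binding entry per bound PU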
    procs = [p for p in (line.split() for line in sys.stdin) if p]
    nodes = {x[0] for x in procs}
    cpus_per_task = [x[1:] for x in procs]

    num_procs = len(procs)
    if num_procs != args.procs:
        print(f"PROCESS BINDING ERROR: wrong number of processes: expected {args.procs}, found {num_procs}",
              file=sys.stderr)

    num_nodes = len(nodes)
    if num_nodes != args.nodes:
        print(f"PROCESS BINDING ERROR: wrong number of nodes: expected {args.nodes}, found {num_nodes}",
              file=sys.stderr)

    error_cpus = []
    warning_packages = []
    warning_numanodes = []
    warning_ht = []

    for cpus in cpus_per_task:
        num_cpus = len(cpus)
        if num_cpus != args.cpus_per_proc:
            error_cpus.append(num_cpus)

        packages = set()
        numanodes = set()
        cores_occupation = defaultdict(int)

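        # parse e.g. "Package:0.NUMANode:3.Core:51.PU:15" into {'Package': '0', 'NUMANode': '3', ...};
        # the NUMANode level may be absent with hwloc < 2.9.0 (see module docstring)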
        for cpu in cpus:
            cpu_parts = dict(item.split(':') for item in cpu.split('.'))
            packages.add(cpu_parts['Package'])
            if cpu_parts.get('NUMANode'):
                numanodes.add(cpu_parts['NUMANode'])
            cores_occupation[(cpu_parts['Package'], cpu_parts['Core'])] += 1

        num_packages = len(packages)
        if num_packages > 1:
            warning_packages.append(num_packages)

        num_numanodes = len(numanodes)
        if num_numanodes > 1:
            warning_numanodes.append(num_numanodes)

        for occupation in cores_occupation.values():
            if occupation > 1:
                warning_ht.append(occupation)

    if error_cpus:
        print(f"PROCESS BINDING ERROR: wrong number of cpus per process: expected {args.cpus_per_proc},"
              f" found {Counter(error_cpus)}", file=sys.stderr)

    if warning_packages:
        print(f"PROCESS BINDING WARNING: processes spanning multiple packages: {Counter(warning_packages)}",
              file=sys.stderr)

    if warning_numanodes:
        print(f"PROCESS BINDING WARNING: processes spanning multiple numanodes: {Counter(warning_numanodes)}",
              file=sys.stderr)

    if warning_ht:
        print("PROCESS BINDING WARNING: processes with cores shared by processing units, indicating hyperthreading:"
              f" {Counter(warning_ht)}", file=sys.stderr)


if __name__ == "__main__":
    main()
38 changes: 37 additions & 1 deletion eessi/testsuite/eessi_mixin.py
@@ -8,7 +8,7 @@
from reframe.utility.sanity import make_performance_function
import reframe.utility.sanity as sn

from eessi.testsuite import hooks
from eessi.testsuite import check_process_binding, hooks
from eessi.testsuite.constants import DEVICE_TYPES, SCALES, COMPUTE_UNITS, TAGS
from eessi.testsuite.utils import log
from eessi.testsuite import __version__ as testsuite_version
@@ -71,6 +71,9 @@ class EESSI_Mixin(RegressionMixin):
    # Make sure the version of the EESSI test suite gets logged in the ReFrame report
    eessi_testsuite_version = variable(str, value=testsuite_version)

    # Check process binding in a prerun cmd
    check_process_binding = variable(bool, value=True)
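    # can be disabled per test class, or on the command line (e.g. with ReFrame's -S check_process_binding=false)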

    # Note that the error for an empty parameter is a bit unclear for ReFrame 4.6.2, but that will hopefully improve
    # see https://github.com/reframe-hpc/reframe/issues/3254
    # If that improves: uncomment the following to force the user to set module_name
@@ -258,6 +261,27 @@ def EESSI_mixin_set_user_executable_opts(self):
                 'specified on cmd line {[self.user_executable_opts]}')
        self.executable_opts = [self.user_executable_opts]

    @run_before('run', always_last=True)
    def EESSI_check_proc_binding(self):
        """Check process binding in a pre-run cmd. The result is written to the job error file."""
        if not self.check_process_binding:
            return
        check_binding_script = check_process_binding.__file__
        get_binding = os.path.join(os.path.dirname(check_binding_script), 'get_process_binding.sh')
        check_binding = ' '.join([
            f'{check_binding_script}',
            f'--cpus-per-proc {self.num_cpus_per_task}',
            f'--procs {self.num_tasks}',
            f'--nodes {self.num_tasks // self.num_tasks_per_node}',
        ])
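        # `tee /dev/stderr` duplicates the raw binding lines to stderr, so they end up in the job
        # error file alongside any PROCESS BINDING messages emitted by the check script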
        self.prerun_cmds.extend([
            "if command -v hwloc-calc >/dev/null; then",
            f"{self.job.launcher.run_command(self.job)} {get_binding} | tee /dev/stderr | {check_binding}",
            "else",
            "echo 'PROCESS BINDING WARNING: hwloc not available, skipping process binding check' >/dev/stderr",
            "fi",
        ])

    @run_after('run')
    def EESSI_mixin_extract_runtime_info_from_log(self):
        """Extracts the printed runtime info from the job log and logs it as ReFrame variables"""
@@ -279,3 +303,15 @@ def EESSI_mixin_extract_runtime_info_from_log(self):
                                      'modpath', str)
        if module_path:
            self.full_modulepath = f'{module_path}'

    @run_after('run')
    def EESSI_mixin_extract_errors_warnings(self):
        """Extract the printed errors and warnings from the job error file and log them"""
        if self.is_dry_run() or self.check_process_binding is False:
            return

        messages = sn.extractall(r'PROCESS BINDING ERROR: .*', f'{self.stagedir}/{self.stderr}')
        messages += sn.extractall(r'PROCESS BINDING WARNING: .*', f'{self.stagedir}/{self.stderr}')
        if messages:
            for msg in messages:
                getlogger().warning(msg)
12 changes: 12 additions & 0 deletions eessi/testsuite/get_process_binding.sh
@@ -0,0 +1,12 @@
#!/bin/bash
# get process binding with hwloc
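# emits one line: "<hostname> <one hierarchical binding entry per PU>", the format expected by check_process_binding.py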

hwlocbind=$(hwloc-bind --get)
binding=$(hwloc-calc -p -H package.numanode.core.pu "$hwlocbind" 2>/dev/null)

if [[ -z $binding ]]; then
    # fall back without the numanode level, which is not supported before hwloc v2.9.0
    binding=$(hwloc-calc -p -H package.core.pu "$hwlocbind")
fi

echo "$HOSTNAME $binding"
16 changes: 16 additions & 0 deletions eessi/testsuite/hooks.py
@@ -17,6 +17,20 @@
_buildenv_modules = []


def _set_job_resources(test: rfm.RegressionTest):
    """Set job resources"""
    # This is needed to get the correct launcher run_command with `self.job.launcher.run_command(self.job)`.
    # ReFrame already sets job resources during the run step,
    # but some hooks use the launcher run_command before the run step.
    # See the `run` method of the `RegressionTest` class in reframe/core/pipeline.py
    test.job.num_tasks = test.num_tasks
    test.job.num_tasks_per_node = test.num_tasks_per_node
    test.job.num_tasks_per_core = test.num_tasks_per_core
    test.job.num_tasks_per_socket = test.num_tasks_per_socket
    test.job.num_cpus_per_task = test.num_cpus_per_task
    test.job.use_smt = test.use_multithreading


def _assign_default_num_cpus_per_node(test: rfm.RegressionTest):
"""
Check if the default number of cpus per node is already defined in the test
@@ -150,6 +164,8 @@ def assign_tasks_per_compute_unit(test: rfm.RegressionTest, compute_unit: str, n
    test.env_vars['SRUN_CPUS_PER_TASK'] = test.num_cpus_per_task
    log(f'Set environment variable SRUN_CPUS_PER_TASK to {test.env_vars["SRUN_CPUS_PER_TASK"]}')

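    # propagate the task geometry to the job object, so launcher.run_command() also works in hooks
    # that run before the run step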
    _set_job_resources(test)


def _assign_num_tasks_per_node(test: rfm.RegressionTest, num_per: int = 1):
"""
24 changes: 7 additions & 17 deletions eessi/testsuite/tests/apps/openfoam/openfoam.py
@@ -106,21 +106,6 @@ def set_compute_unit(self):
    def required_mem_per_node(self):
        return self.num_tasks_per_node * 1700

    @run_after('setup')
    def check_launcher_options(self):
        # We had to get the launcher command and prepend this to the prerun steps (func prepare_environment) because:
        # 1. A typical OpenFOAM job would contain multiple mpirun steps working on the same stage directory.
        # 2. We had trouble using ReFrame fixtures to separate these over multiple jobs, because we couldn't get it to
        #    work together with the mixin class.
        if (self.job.launcher.command(self.job)[0] == 'mpirun'):
            self.launcher_command = self.job.launcher.command(self.job)
            self.launcher_command[-1] = str(self.num_tasks_per_node * self.num_nodes)
        elif (self.job.launcher.command(self.job)[0] == 'srun'):
            self.launcher_command = self.job.launcher.command(self.job)
        else:
            self.skip(msg="The chosen launcher for this test is different from mpirun or srun, which means that the"
                          " test will definitely fail, therefore skipping this test.")

    @run_after('setup')
    def check_minimum_cores(self):
        # The 64M test case requires at least 512 cores to run within a reasonable time.
@@ -130,15 +115,20 @@ def check_minimum_cores(self):

    @run_before('run')
    def prepare_environment(self):
        # redistributePar and renumberMesh are added to the prerun cmds because:
        # 1. A typical OpenFOAM job would run them as mpirun steps working on the same stage directory.
        # 2. We had trouble using ReFrame fixtures to separate these over multiple jobs, because we couldn't get it to
        #    work together with the mixin class.

        # fullpath = os.path.join(self.ldc_64M.stagedir, 'fixedTol')
        self.prerun_cmds = [
            'cd ./cavity3D/64M/fixedTol',
            'source $FOAM_BASH',
            f"foamDictionary -entry numberOfSubdomains -set {self.num_tasks_per_node * self.num_nodes} "
            "system/decomposeParDict",
            'blockMesh 2>&1 | tee log.blockMesh',
            f"{self.job.launcher.run_command(self.job)} redistributePar -decompose -parallel 2>&1 | tee log.decompose",
            f"{self.job.launcher.run_command(self.job)} renumberMesh -parallel -overwrite 2>&1 | tee log.renumberMesh"]

    @deferrable
    def check_files(self):