diff --git a/.flake8 b/.flake8 index ec314267e..a53b1cdb0 100644 --- a/.flake8 +++ b/.flake8 @@ -5,4 +5,4 @@ select = C,E,F,W,B ignore = E203, E501, W503, B008 copyright-check = True -max-complexity = 15 +max-complexity = 16 diff --git a/codecarbon/core/cpu.py b/codecarbon/core/cpu.py index 3ab71cfab..3e39f0441 100644 --- a/codecarbon/core/cpu.py +++ b/codecarbon/core/cpu.py @@ -817,6 +817,7 @@ def get_cpu_details(self, duration: Time) -> Dict: for rapl_file in self._rapl_files: logger.debug(rapl_file) + # Delta, if total external/cpu will break cpu_details[rapl_file.name] = rapl_file.energy_delta.kWh # We fake the name used by Power Gadget when using RAPL if "Energy" in rapl_file.name: diff --git a/codecarbon/core/measure.py b/codecarbon/core/measure.py index 6f612a5da..3eec52206 100644 --- a/codecarbon/core/measure.py +++ b/codecarbon/core/measure.py @@ -1,6 +1,7 @@ from time import perf_counter -from codecarbon.external.hardware import CPU, GPU, RAM, AppleSiliconChip +from codecarbon.external.cpu import CPU +from codecarbon.external.hardware import GPU, RAM, AppleSiliconChip from codecarbon.external.logger import logger diff --git a/codecarbon/core/resource_tracker.py b/codecarbon/core/resource_tracker.py index 120faf4ef..78e5051e8 100644 --- a/codecarbon/core/resource_tracker.py +++ b/codecarbon/core/resource_tracker.py @@ -4,7 +4,8 @@ from codecarbon.core import cpu, gpu, powermetrics from codecarbon.core.config import parse_gpu_ids from codecarbon.core.util import detect_cpu_model, is_linux_os, is_mac_os, is_windows_os -from codecarbon.external.hardware import CPU, GPU, MODE_CPU_LOAD, AppleSiliconChip +from codecarbon.external.cpu import CPU, MODE_CPU_LOAD +from codecarbon.external.hardware import GPU, AppleSiliconChip from codecarbon.external.logger import logger from codecarbon.external.ram import RAM @@ -28,6 +29,7 @@ def set_RAM_tracking(self): self.ram_tracker = "RAM power estimation model" ram = RAM( tracking_mode=self.tracker._tracking_mode, + tracking_pids=self.tracker._tracking_pids, force_ram_power=self.tracker._force_ram_power, ) self.tracker._conf["ram_total_size"] = ram.machine_memory_GB @@ -46,6 +48,7 @@ def _setup_cpu_load_mode(self, tdp, max_power): model, max_power, tracking_mode=self.tracker._tracking_mode, + tracking_pids=self.tracker._tracking_pids, ) self.cpu_tracker = MODE_CPU_LOAD self.tracker._conf["cpu_model"] = hardware_cpu.get_model() @@ -56,7 +59,12 @@ def _setup_power_gadget(self): """Set up CPU tracking using Intel Power Gadget.""" logger.info("Tracking Intel CPU via Power Gadget") self.cpu_tracker = "Power Gadget" - hardware_cpu = CPU.from_utils(self.tracker._output_dir, "intel_power_gadget") + hardware_cpu = CPU.from_utils( + self.tracker._output_dir, + "intel_power_gadget", + tracking_mode=self.tracker._tracking_mode, + tracking_pids=self.tracker._tracking_pids, + ) self.tracker._hardware.append(hardware_cpu) self.tracker._conf["cpu_model"] = hardware_cpu.get_model() return True @@ -70,6 +78,8 @@ def _setup_rapl(self): mode="intel_rapl", rapl_include_dram=self.tracker._rapl_include_dram, rapl_prefer_psys=self.tracker._rapl_prefer_psys, + tracking_mode=self.tracker._tracking_mode, + tracking_pids=self.tracker._tracking_pids, ) self.tracker._hardware.append(hardware_cpu) self.tracker._conf["cpu_model"] = hardware_cpu.get_model() @@ -81,6 +91,11 @@ def _setup_powermetrics(self): self.gpu_tracker = "PowerMetrics" self.cpu_tracker = "PowerMetrics" + if self.tracker._tracking_mode != "machine": + logger.warning( + "PowerMetrics only supports 'machine' tracking mode. Overriding tracking mode to 'machine'." + ) + hardware_cpu = AppleSiliconChip.from_utils( self.tracker._output_dir, chip_part="CPU" ) @@ -141,6 +156,7 @@ def _setup_fallback_tracking(self, tdp, max_power): model, max_power, tracking_mode=self.tracker._tracking_mode, + tracking_pids=self.tracker._tracking_pids, ) self.cpu_tracker = MODE_CPU_LOAD else: @@ -148,7 +164,12 @@ def _setup_fallback_tracking(self, tdp, max_power): "No CPU tracking mode found. Falling back on CPU constant mode." ) hardware_cpu = CPU.from_utils( - self.tracker._output_dir, "constant", model, max_power + self.tracker._output_dir, + "constant", + model, + max_power, + tracking_mode=self.tracker._tracking_mode, + tracking_pids=self.tracker._tracking_pids, ) self.cpu_tracker = "global constant" self.tracker._hardware.append(hardware_cpu) @@ -163,6 +184,7 @@ def _setup_fallback_tracking(self, tdp, max_power): model, max_power, tracking_mode=self.tracker._tracking_mode, + tracking_pids=self.tracker._tracking_pids, ) self.cpu_tracker = MODE_CPU_LOAD else: @@ -170,7 +192,12 @@ def _setup_fallback_tracking(self, tdp, max_power): "Failed to match CPU TDP constant. Falling back on a global constant." ) self.cpu_tracker = "global constant" - hardware_cpu = CPU.from_utils(self.tracker._output_dir, "constant") + hardware_cpu = CPU.from_utils( + self.tracker._output_dir, + "constant", + tracking_mode=self.tracker._tracking_mode, + tracking_pids=self.tracker._tracking_pids, + ) self.tracker._hardware.append(hardware_cpu) def set_CPU_tracking(self): diff --git a/codecarbon/emissions_tracker.py b/codecarbon/emissions_tracker.py index 0913c215b..a93d74eb0 100644 --- a/codecarbon/emissions_tracker.py +++ b/codecarbon/emissions_tracker.py @@ -22,8 +22,9 @@ from codecarbon.core.resource_tracker import ResourceTracker from codecarbon.core.units import Energy, Power, Time, Water from codecarbon.core.util import count_cpus, count_physical_cpus, suppress +from codecarbon.external.cpu import CPU from codecarbon.external.geography import CloudMetadata, GeoMetadata -from codecarbon.external.hardware import CPU, GPU, AppleSiliconChip +from codecarbon.external.hardware import GPU, AppleSiliconChip from codecarbon.external.logger import logger, set_logger_format, set_logger_level from codecarbon.external.ram import RAM from codecarbon.external.scheduler import PeriodicScheduler @@ -179,6 +180,7 @@ def __init__( str ] = _sentinel, # Deprecated, use electricitymaps_api_token tracking_mode: Optional[str] = _sentinel, + tracking_pids: Optional[List[int]] = _sentinel, log_level: Optional[Union[int, str]] = _sentinel, on_csv_write: Optional[str] = _sentinel, logger_preamble: Optional[str] = _sentinel, @@ -234,6 +236,8 @@ def __init__( power consumption due to the entire machine or to try and isolate the tracked processe's in isolation. Defaults to "machine". + :param tracking_pids: PID of the process to be tracked when using "process" mode. + Defaults to None, which means the current process. :param log_level: Global codecarbon log level. Accepts one of: {"debug", "info", "warning", "error", "critical"}. Defaults to "info". @@ -317,6 +321,30 @@ def __init__( self._set_from_conf(prometheus_url, "prometheus_url", "localhost:9091") self._set_from_conf(output_handlers, "output_handlers", []) self._set_from_conf(tracking_mode, "tracking_mode", "machine") + self._set_from_conf( + tracking_pids, "tracking_pids", [psutil.Process().pid], List[int] + ) + if self._tracking_pids is None or len(self._tracking_pids) == 0: + self._tracking_pids = [psutil.Process().pid] + tracking_pids = self._tracking_pids + + # Check if tracking pids are child of each other + pid_check = set() + for pid in self._tracking_pids: + try: + process = psutil.Process(pid) + pids_to_track = {pid} | { + child.pid for child in process.children(recursive=True) + } + except psutil.NoSuchProcess: + continue + + duplicates = pids_to_track & pid_check + for dup_pid in duplicates: + logger.warning(f"Process with pid {dup_pid} is already being tracked.") + + pid_check.update(pids_to_track) + self._set_from_conf(on_csv_write, "on_csv_write", "append") self._set_from_conf(logger_preamble, "logger_preamble", "") self._set_from_conf(force_cpu_power, "force_cpu_power", None, float) @@ -391,6 +419,12 @@ def __init__( else: logger.info(f" GPU model: {self._conf.get('gpu_model')}") + if self._tracking_mode == "process": + logger.info(" Tracking mode: process") + logger.info(" Tracked PIDs: " + str(self._tracking_pids)) + else: + logger.info(" Tracking mode: machine") + # Run `self._measure_power_and_energy` every `measure_power_secs` seconds in a # background thread self._scheduler = PeriodicScheduler( @@ -1233,6 +1267,7 @@ def track_emissions( str ] = _sentinel, # Deprecated, use electricitymaps_api_token tracking_mode: Optional[str] = _sentinel, + tracking_pids: Optional[List[int]] = _sentinel, log_level: Optional[Union[int, str]] = _sentinel, on_csv_write: Optional[str] = _sentinel, logger_preamble: Optional[str] = _sentinel, @@ -1293,6 +1328,8 @@ def track_emissions( power consumption due to the entire machine or to try and isolate the tracked processe's in isolation. Defaults to "machine". + :param tracking_pids: PID of the process to be tracked when using "process" mode. + Defaults to None, which means the current process. :param log_level: Global codecarbon log level. Accepts one of: {"debug", "info", "warning", "error", "critical"}. Defaults to "info". @@ -1367,6 +1404,7 @@ def wrapped_fn(*args, **kwargs): gpu_ids=gpu_ids, electricitymaps_api_token=_electricitymaps_token, tracking_mode=tracking_mode, + tracking_pids=tracking_pids, log_level=log_level, on_csv_write=on_csv_write, logger_preamble=logger_preamble, @@ -1406,6 +1444,7 @@ def wrapped_fn(*args, **kwargs): experiment_name=experiment_name, electricitymaps_api_token=_electricitymaps_token, tracking_mode=tracking_mode, + tracking_pids=tracking_pids, log_level=log_level, on_csv_write=on_csv_write, logger_preamble=logger_preamble, diff --git a/codecarbon/external/cpu.py b/codecarbon/external/cpu.py new file mode 100644 index 000000000..0dcb1691a --- /dev/null +++ b/codecarbon/external/cpu.py @@ -0,0 +1,305 @@ +""" +CPU Power consumption and metrics handling +""" + +import math +import re +from dataclasses import dataclass +from typing import Dict, List, Optional, Tuple + +import psutil + +from codecarbon.core.cpu import IntelPowerGadget, IntelRAPL +from codecarbon.core.units import Energy, Power, Time +from codecarbon.core.util import count_cpus, detect_cpu_model +from codecarbon.external.hardware import BaseHardware +from codecarbon.external.logger import logger + +# default W value for a CPU if no model is found in the ref csv +POWER_CONSTANT = 85 + +# ratio of TDP estimated to be consumed on average +CONSUMPTION_PERCENTAGE_CONSTANT = 0.5 + + +MODE_CPU_LOAD = "cpu_load" + + +@dataclass +class CPU(BaseHardware): + def __init__( + self, + output_dir: str, + mode: str, + model: str, + tdp: int, + tracking_pids: Optional[List[int]], + rapl_dir: str = "/sys/class/powercap/intel-rapl/subsystem", + tracking_mode: str = "machine", + rapl_include_dram: bool = False, + rapl_prefer_psys: bool = False, + ): + assert tracking_mode in ["machine", "process"] + self._power_history: List[Power] = [] + self._output_dir = output_dir + self._mode = mode + self._model = model + self._tdp = tdp + self._is_generic_tdp = False + self._tracking_mode = tracking_mode + self._tracking_pids = tracking_pids + self._cpu_count = count_cpus() + + if self._mode == "intel_power_gadget": + self._intel_interface = IntelPowerGadget(self._output_dir) + elif self._mode == "intel_rapl": + self._intel_interface = IntelRAPL( + rapl_dir=rapl_dir, + rapl_include_dram=rapl_include_dram, + rapl_prefer_psys=rapl_prefer_psys, + ) + + def __repr__(self) -> str: + if self._mode != "constant": + return f"CPU({' '.join(map(str.capitalize, self._mode.split('_')))})" + + s = f"CPU({self._model} > {self._tdp}W" + + if self._is_generic_tdp: + s += " [generic]" + + return s + ")" + + @staticmethod + def _calculate_power_from_cpu_load(tdp, cpu_load, model): + if "AMD Ryzen Threadripper" in model: + return CPU._calculate_power_from_cpu_load_treadripper(tdp, cpu_load) + else: + # Minimum power consumption is 10% of TDP + return max(tdp * (cpu_load / 100.0), tdp * 0.1) + + @staticmethod + def _calculate_power_from_cpu_load_treadripper(tdp, cpu_load): + load = cpu_load / 100.0 + + if load < 0.1: # Below 10% CPU load + return tdp * (0.05 * load * 10) + elif load <= 0.3: # 10-30% load - linear phase + return tdp * (0.05 + 1.8 * (load - 0.1)) + elif load <= 0.5: # 30-50% load - adjusted coefficients + # Increased base power and adjusted curve + base_power = 0.45 # Increased from 0.41 + power_range = 0.50 # Increased from 0.44 + factor = ((load - 0.3) / 0.2) ** 1.8 # Reduced power from 2.0 to 1.8 + return tdp * (base_power + power_range * factor) + else: # Above 50% - plateau phase + return tdp * (0.85 + 0.15 * (1 - math.exp(-(load - 0.5) * 5))) + + def get_cpu_load(self) -> float: + """ + Get CPU load percentage + :return: CPU load in percentage + """ + if self._tracking_mode == "machine": + cpu_load = psutil.cpu_percent(interval=0.5, percpu=False) + logger.debug(f"Total CPU load: {cpu_load:.1f} %") + elif self._tracking_mode == "process": + cpu_load = 0 + + for pid in self._tracking_pids: + if not psutil.pid_exists(pid): + # Log a warning and continue + logger.warning(f"Process with pid {pid} does not exist anymore.") + continue + self._process = psutil.Process(pid) + try: + cpu_load += self._process.cpu_percent(interval=0.5) + except (psutil.NoSuchProcess, psutil.AccessDenied): + # Main process terminated or access denied + continue + + children = self._process.children(recursive=True) + for child in children: + try: + # Use interval=0.0 for children to avoid blocking + child_cpu = child.cpu_percent(interval=0.0) + logger.debug(f"Child {child.pid} CPU: {child_cpu}") + cpu_load += child_cpu + except ( + psutil.NoSuchProcess, + psutil.AccessDenied, + psutil.ZombieProcess, + ): + # Child process may have terminated or we don't have access + continue + + # Normalize by CPU count + logger.info(f"Total CPU load (all processes): {cpu_load}") + cpu_load = cpu_load / self._cpu_count + else: + raise Exception(f"Unknown tracking_mode {self._tracking_mode}") + return cpu_load + + def _get_power_from_cpu_load(self): + """ + When in MODE_CPU_LOAD + """ + + cpu_load = self.get_cpu_load() + + if self._tracking_mode == "machine": + logger.debug(f"CPU load : {self._tdp=} W and {cpu_load:.1f} %") + # Cubic relationship with minimum 10% of TDP + load_factor = 0.1 + 0.9 * ((cpu_load / 100.0) ** 3) + power = self._tdp * load_factor + logger.debug( + f"CPU load {self._tdp} W and {cpu_load:.1f}% {load_factor=} => estimation of {power} W for whole machine." + ) + elif self._tracking_mode == "process": + # Normalize by CPU count + logger.info(f"Total CPU load (all processes): {cpu_load}") + power = self._tdp * cpu_load / 100 + logger.debug( + f"CPU load {self._tdp} W and {cpu_load * 100:.1f}% => estimation of {power} W for processes {self._tracking_pids} (including children)." + ) + else: + raise Exception(f"Unknown tracking_mode {self._tracking_mode}") + return Power.from_watts(power) + + def _get_power_from_cpus(self) -> Power: + """ + Get CPU power + :return: power in kW + """ + if self._mode == MODE_CPU_LOAD: + power = self._get_power_from_cpu_load() + return power + elif self._mode == "constant": + power = self._tdp * CONSUMPTION_PERCENTAGE_CONSTANT + return Power.from_watts(power) + if self._mode == "intel_rapl": + # Don't call get_cpu_details to avoid computing energy twice and losing data. + all_cpu_details: Dict = self._intel_interface.get_static_cpu_details() + else: + all_cpu_details: Dict = self._intel_interface.get_cpu_details() + + power = 0 + for metric, value in all_cpu_details.items(): + # "^Processor Power_\d+\(Watt\)$" for Intel Power Gadget + if re.match(r"^Processor Power", metric): + power += value + logger.debug(f"_get_power_from_cpus - MATCH {metric} : {value}") + else: + logger.debug(f"_get_power_from_cpus - DONT MATCH {metric} : {value}") + + # Rescale power with correct tracking mode + # Machine -> 100% + # Process -> With CPU load + cpu_load = self.get_cpu_load() + power = self._tdp * cpu_load / 100 + logger.debug( + f"Estimated CPU power for processes {self._tracking_pids} (including children): {power} W based on CPU load {cpu_load} % and TDP {self._tdp} W." + ) + + return Power.from_watts(power) + + def _get_energy_from_cpus(self, delay: Time) -> Energy: + """ + Get CPU energy deltas from RAPL files + :return: energy in kWh + """ + all_cpu_details: Dict = self._intel_interface.get_cpu_details(delay) + + delta_energy = 0 + for metric, value in all_cpu_details.items(): + if re.match(r"^Processor Energy Delta_\d", metric): + delta_energy += value + logger.debug(f"_get_energy_from_cpus - MATCH {metric} : {value}") + + # Rescale energy with correct tracking mode + # get_cpu_details should never return total energy + # Machine -> 100% + # Process -> With CPU load + cpu_load = self.get_cpu_load() + delta_energy = self._tdp * cpu_load / 100 + logger.debug( + f"Estimated CPU power for processes {self._tracking_pids} (including children): {delta_energy} W based on CPU load {cpu_load} % and TDP {self._tdp} W." + ) + + return Energy.from_energy(delta_energy) + + def total_power(self) -> Power: + self._power_history.append(self._get_power_from_cpus()) + if len(self._power_history) == 0: + logger.warning("Power history is empty, returning 0 W") + return Power.from_watts(0) + power_history_in_W = [power.W for power in self._power_history] + cpu_power = sum(power_history_in_W) / len(power_history_in_W) + self._power_history = [] + return Power.from_watts(cpu_power) + + def measure_power_and_energy(self, last_duration: float) -> Tuple[Power, Energy]: + if self._mode == "intel_rapl": + energy = self._get_energy_from_cpus(delay=Time(seconds=last_duration)) + power = self.total_power() + return power, energy + # If not intel_rapl, we call the parent method from BaseHardware + # to compute energy from power and time + return super().measure_power_and_energy(last_duration=last_duration) + + def start(self): + if self._mode in ["intel_power_gadget", "intel_rapl", "apple_powermetrics"]: + self._intel_interface.start() + if self._mode == MODE_CPU_LOAD: + # The first time this is called it will return a meaningless 0.0 value which you are supposed to ignore. + _ = self._get_power_from_cpu_load() + + def monitor_power(self): + cpu_power = self._get_power_from_cpus() + self._power_history.append(cpu_power) + + def get_model(self): + return self._model + + @classmethod + def from_utils( + cls, + output_dir: str, + mode: str, + model: Optional[str] = None, + tdp: Optional[int] = None, + tracking_mode: str = "machine", + tracking_pids: Optional[List[int]] = None, + rapl_include_dram: bool = False, + rapl_prefer_psys: bool = False, + ) -> "CPU": + if model is None: + model = detect_cpu_model() + if model is None: + logger.warning("Could not read CPU model.") + + if tdp is None: + tdp = POWER_CONSTANT + cpu = cls( + output_dir=output_dir, + mode=mode, + model=model, + tdp=tdp, + rapl_include_dram=rapl_include_dram, + rapl_prefer_psys=rapl_prefer_psys, + tracking_mode=tracking_mode, + tracking_pids=tracking_pids, + ) + cpu._is_generic_tdp = True + return cpu + + return cls( + output_dir=output_dir, + mode=mode, + model=model, + tdp=tdp, + tracking_mode=tracking_mode, + tracking_pids=tracking_pids, + rapl_include_dram=rapl_include_dram, + rapl_prefer_psys=rapl_prefer_psys, + ) diff --git a/codecarbon/external/hardware.py b/codecarbon/external/hardware.py index 30ce125ee..c58490547 100644 --- a/codecarbon/external/hardware.py +++ b/codecarbon/external/hardware.py @@ -2,37 +2,26 @@ Encapsulates external dependencies to retrieve hardware metadata """ -import math import re from abc import ABC, abstractmethod from dataclasses import dataclass from typing import Dict, Iterable, List, Optional, Tuple -import psutil - -from codecarbon.core.cpu import IntelPowerGadget, IntelRAPL from codecarbon.core.gpu import AllGPUDevices from codecarbon.core.powermetrics import ApplePowermetrics from codecarbon.core.units import Energy, Power, Time -from codecarbon.core.util import count_cpus, detect_cpu_model +from codecarbon.core.util import detect_cpu_model from codecarbon.external.logger import logger -# default W value for a CPU if no model is found in the ref csv -POWER_CONSTANT = 85 - -# ratio of TDP estimated to be consumed on average -CONSUMPTION_PERCENTAGE_CONSTANT = 0.5 - B_TO_GB = 1024 * 1024 * 1024 -MODE_CPU_LOAD = "cpu_load" - @dataclass class BaseHardware(ABC): @abstractmethod def total_power(self) -> Power: - pass + raise NotImplementedError() + # pass def description(self) -> str: return repr(self) @@ -43,9 +32,11 @@ def measure_power_and_energy(self, last_duration: float) -> Tuple[Power, Energy] hardware and convert it to energy. """ power = self.total_power() + energy = Energy.from_power_and_time( power=power, time=Time.from_seconds(last_duration) ) + return power, energy def start(self) -> None: # noqa B027 @@ -158,217 +149,6 @@ def from_utils(cls, gpu_ids: Optional[List] = None) -> "GPU": return cls(gpu_ids=new_gpu_ids) -@dataclass -class CPU(BaseHardware): - def __init__( - self, - output_dir: str, - mode: str, - model: str, - tdp: int, - rapl_dir: str = "/sys/class/powercap/intel-rapl/subsystem", - tracking_mode: str = "machine", - rapl_include_dram: bool = False, - rapl_prefer_psys: bool = False, - ): - assert tracking_mode in ["machine", "process"] - self._power_history: List[Power] = [] - self._output_dir = output_dir - self._mode = mode - self._model = model - self._tdp = tdp - self._is_generic_tdp = False - self._tracking_mode = tracking_mode - self._pid = psutil.Process().pid - self._cpu_count = count_cpus() - self._process = psutil.Process(self._pid) - - if self._mode == "intel_power_gadget": - self._intel_interface = IntelPowerGadget(self._output_dir) - elif self._mode == "intel_rapl": - self._intel_interface = IntelRAPL( - rapl_dir=rapl_dir, - rapl_include_dram=rapl_include_dram, - rapl_prefer_psys=rapl_prefer_psys, - ) - - def __repr__(self) -> str: - if self._mode != "constant": - return f"CPU({' '.join(map(str.capitalize, self._mode.split('_')))})" - - s = f"CPU({self._model} > {self._tdp}W" - - if self._is_generic_tdp: - s += " [generic]" - - return s + ")" - - @staticmethod - def _calculate_power_from_cpu_load(tdp, cpu_load, model): - if "AMD Ryzen Threadripper" in model: - return CPU._calculate_power_from_cpu_load_treadripper(tdp, cpu_load) - else: - # Minimum power consumption is 10% of TDP - return max(tdp * (cpu_load / 100.0), tdp * 0.1) - - @staticmethod - def _calculate_power_from_cpu_load_treadripper(tdp, cpu_load): - load = cpu_load / 100.0 - - if load < 0.1: # Below 10% CPU load - return tdp * (0.05 * load * 10) - elif load <= 0.3: # 10-30% load - linear phase - return tdp * (0.05 + 1.8 * (load - 0.1)) - elif load <= 0.5: # 30-50% load - adjusted coefficients - # Increased base power and adjusted curve - base_power = 0.45 # Increased from 0.41 - power_range = 0.50 # Increased from 0.44 - factor = ((load - 0.3) / 0.2) ** 1.8 # Reduced power from 2.0 to 1.8 - return tdp * (base_power + power_range * factor) - else: # Above 50% - plateau phase - return tdp * (0.85 + 0.15 * (1 - math.exp(-(load - 0.5) * 5))) - - def _get_power_from_cpu_load(self): - """ - When in MODE_CPU_LOAD - """ - if self._tracking_mode == "machine": - tdp = self._tdp - cpu_load = psutil.cpu_percent( - interval=0.5, percpu=False - ) # Convert to 0-1 range - logger.debug(f"CPU load : {self._tdp=} W and {cpu_load:.1f} %") - # Cubic relationship with minimum 10% of TDP - load_factor = 0.1 + 0.9 * ((cpu_load / 100.0) ** 3) - power = tdp * load_factor - logger.debug( - f"CPU load {self._tdp} W and {cpu_load:.1f}% {load_factor=} => estimation of {power} W for whole machine." - ) - elif self._tracking_mode == "process": - - cpu_load = self._process.cpu_percent(interval=0.5) / self._cpu_count - power = self._tdp * cpu_load / 100 - logger.debug( - f"CPU load {self._tdp} W and {cpu_load * 100:.1f}% => estimation of {power} W for process {self._pid}." - ) - else: - raise Exception(f"Unknown tracking_mode {self._tracking_mode}") - return Power.from_watts(power) - - def _get_power_from_cpus(self) -> Power: - """ - Get CPU power - :return: power in kW - """ - if self._mode == MODE_CPU_LOAD: - power = self._get_power_from_cpu_load() - return power - elif self._mode == "constant": - power = self._tdp * CONSUMPTION_PERCENTAGE_CONSTANT - return Power.from_watts(power) - if self._mode == "intel_rapl": - # Don't call get_cpu_details to avoid computing energy twice and losing data. - all_cpu_details: Dict = self._intel_interface.get_static_cpu_details() - else: - all_cpu_details: Dict = self._intel_interface.get_cpu_details() - - power = 0 - for metric, value in all_cpu_details.items(): - # "^Processor Power_\d+\(Watt\)$" for Intel Power Gadget - if re.match(r"^Processor Power", metric): - power += value - logger.debug(f"_get_power_from_cpus - MATCH {metric} : {value}") - else: - logger.debug(f"_get_power_from_cpus - DONT MATCH {metric} : {value}") - return Power.from_watts(power) - - def _get_energy_from_cpus(self, delay: Time) -> Energy: - """ - Get CPU energy deltas from RAPL files - :return: energy in kWh - """ - all_cpu_details: Dict = self._intel_interface.get_cpu_details(delay) - - energy = 0 - for metric, value in all_cpu_details.items(): - if re.match(r"^Processor Energy Delta_\d", metric): - energy += value - # logger.debug(f"_get_energy_from_cpus - MATCH {metric} : {value}") - return Energy.from_energy(energy) - - def total_power(self) -> Power: - self._power_history.append(self._get_power_from_cpus()) - if len(self._power_history) == 0: - logger.warning("Power history is empty, returning 0 W") - return Power.from_watts(0) - power_history_in_W = [power.W for power in self._power_history] - cpu_power = sum(power_history_in_W) / len(power_history_in_W) - self._power_history = [] - return Power.from_watts(cpu_power) - - def measure_power_and_energy(self, last_duration: float) -> Tuple[Power, Energy]: - if self._mode == "intel_rapl": - energy = self._get_energy_from_cpus(delay=Time(seconds=last_duration)) - power = self.total_power() - return power, energy - # If not intel_rapl, we call the parent method from BaseHardware - # to compute energy from power and time - return super().measure_power_and_energy(last_duration=last_duration) - - def start(self): - if self._mode in ["intel_power_gadget", "intel_rapl", "apple_powermetrics"]: - self._intel_interface.start() - if self._mode == MODE_CPU_LOAD: - # The first time this is called it will return a meaningless 0.0 value which you are supposed to ignore. - _ = self._get_power_from_cpu_load() - - def monitor_power(self): - cpu_power = self._get_power_from_cpus() - self._power_history.append(cpu_power) - - def get_model(self): - return self._model - - @classmethod - def from_utils( - cls, - output_dir: str, - mode: str, - model: Optional[str] = None, - tdp: Optional[int] = None, - tracking_mode: str = "machine", - rapl_include_dram: bool = False, - rapl_prefer_psys: bool = False, - ) -> "CPU": - if model is None: - model = detect_cpu_model() - if model is None: - logger.warning("Could not read CPU model.") - - if tdp is None: - tdp = POWER_CONSTANT - cpu = cls( - output_dir=output_dir, - mode=mode, - model=model, - tdp=tdp, - rapl_include_dram=rapl_include_dram, - rapl_prefer_psys=rapl_prefer_psys, - ) - cpu._is_generic_tdp = True - return cpu - - return cls( - output_dir=output_dir, - mode=mode, - model=model, - tdp=tdp, - tracking_mode=tracking_mode, - rapl_include_dram=rapl_include_dram, - rapl_prefer_psys=rapl_prefer_psys, - ) - - @dataclass class AppleSiliconChip(BaseHardware): def __init__( diff --git a/codecarbon/external/ram.py b/codecarbon/external/ram.py index 20a5c2fad..e6205183f 100644 --- a/codecarbon/external/ram.py +++ b/codecarbon/external/ram.py @@ -2,7 +2,7 @@ import re import subprocess from dataclasses import dataclass -from typing import Optional +from typing import List, Optional import psutil @@ -34,9 +34,9 @@ class RAM(BaseHardware): def __init__( self, - pid: int = psutil.Process().pid, children: bool = True, tracking_mode: str = "machine", + tracking_pids: Optional[List[int]] = None, force_ram_power: Optional[int] = None, ): """ @@ -45,19 +45,21 @@ def __init__( is True. Args: - pid (int, optional): Process id (with respect to which we'll look for - children). Defaults to psutil.Process().pid. children (int, optional): Look for children of the process when computing total RAM used. Defaults to True. tracking_mode (str, optional): Whether to track "machine" or "process" RAM. Defaults to "machine". + tracking_pids ([int], optional): Process id to track RAM usage for "process" + tracking_mode. Defaults to None. force_ram_power (int, optional): User-provided RAM power in watts. If provided, this value is used instead of estimating RAM power. Defaults to None. """ - self._pid = pid self._children = children self._tracking_mode = tracking_mode + + self._tracking_pids = tracking_pids + self._force_ram_power = force_ram_power # Check if using ARM architecture self.is_arm_cpu = self._detect_arm_cpu() @@ -192,16 +194,21 @@ def _calculate_ram_power(self, memory_gb: float) -> float: # Apply minimum power constraint return max(min_power, total_power) - def _get_children_memories(self): + def _get_children_memories(self, pid: int): """ Compute the used RAM by the process's children Returns: list(int): The list of RAM values """ - current_process = psutil.Process(self._pid) + memorie_consumption = dict() + current_process = psutil.Process(pid) + children = current_process.children(recursive=True) - return [child.memory_info().rss for child in children] + for child in children: + memorie_consumption[child.pid] = child.memory_info().rss + + return memorie_consumption def _read_slurm_scontrol(self): try: @@ -285,17 +292,35 @@ def slurm_memory_GB(self): return mem @property - def process_memory_GB(self): + def process_memory_GB(self) -> float: """ Property to compute the process's total memory usage in bytes. Returns: float: RAM usage (GB) """ - children_memories = self._get_children_memories() if self._children else [] - main_memory = psutil.Process(self._pid).memory_info().rss - memories = children_memories + [main_memory] - return sum([m for m in memories if m] + [0]) / B_TO_GB + + # Store memory usage in dict to avoid double counting + total_memory = dict() + + for pid in self._tracking_pids: + if not psutil.pid_exists(pid): + logger.warning(f"Process with pid {pid} does not exist anymore.") + continue + + # Own memory + total_memory[pid] = psutil.Process(pid).memory_info().rss + + # Children's memory + children_memories = self._get_children_memories(pid) + for child_pid, mem in children_memories.items(): + total_memory[child_pid] = mem + + # Reduce to total memory + total_memory = sum(total_memory.values()) + logger.debug(f"Process total memory usage: {total_memory / B_TO_GB:.2f} GB") + + return total_memory / B_TO_GB @property def machine_memory_GB(self): diff --git a/docs/_sources/parameters.rst.txt b/docs/_sources/parameters.rst.txt index cf7884c88..26b17f896 100644 --- a/docs/_sources/parameters.rst.txt +++ b/docs/_sources/parameters.rst.txt @@ -24,6 +24,9 @@ Input Parameters * - tracking_mode - | ``machine`` measure the power consumptions of the entire machine (default) | ``process`` try and isolate the tracked processes in isolation + * - tracking_pids + - | List of PIDs to track when using ``process`` tracking mode, + | defaults to ``None``, which tracks the current process * - gpu_ids - | Comma-separated list of GPU ids to track, defaults to ``None`` | These can either be integer indexes of GPUs on the system, or prefixes diff --git a/docs/edit/parameters.rst b/docs/edit/parameters.rst index cf7884c88..26b17f896 100644 --- a/docs/edit/parameters.rst +++ b/docs/edit/parameters.rst @@ -24,6 +24,9 @@ Input Parameters * - tracking_mode - | ``machine`` measure the power consumptions of the entire machine (default) | ``process`` try and isolate the tracked processes in isolation + * - tracking_pids + - | List of PIDs to track when using ``process`` tracking mode, + | defaults to ``None``, which tracks the current process * - gpu_ids - | Comma-separated list of GPU ids to track, defaults to ``None`` | These can either be integer indexes of GPUs on the system, or prefixes diff --git a/examples/compare_cpu_load_and_RAPL.py b/examples/compare_cpu_load_and_RAPL.py index 9798d2a92..fb74d77c9 100644 --- a/examples/compare_cpu_load_and_RAPL.py +++ b/examples/compare_cpu_load_and_RAPL.py @@ -34,7 +34,7 @@ print("WARNING : No tapo module found !!!") from codecarbon import EmissionsTracker -from codecarbon.external.hardware import CPU, MODE_CPU_LOAD +from codecarbon.external.cpu import CPU, MODE_CPU_LOAD measure_power_secs = 10 test_phase_duration = 30 diff --git a/examples/slurm_logging.py b/examples/slurm_logging.py new file mode 100755 index 000000000..6ebdcb693 --- /dev/null +++ b/examples/slurm_logging.py @@ -0,0 +1,207 @@ +#!/root/.venv/codecarbon/bin/python3 + +import argparse +import logging +import os +import subprocess as sp +import sys +import time +import traceback + +import psutil + +from codecarbon import OfflineEmissionsTracker + + +def _print_process_tree(proc, indent=0): + prefix = " " * indent + try: + name = proc.name() + pid = proc.pid + except psutil.NoSuchProcess: + return + + log_message(f"{prefix}{name} (pid {pid})\n") + + # Children + for child in proc.children(recursive=False): + _print_process_tree(child, indent + 4) + + +def print_process_tree(pid=os.getpid()): + current = psutil.Process(pid) + log_message("\n=== Parent Tree ===\n") + p = current + stack = [] + while p is not None: + stack.append(p) + p = p.parent() + + # Print ancestors from root → current + for proc in reversed(stack): + log_message(f"{proc.name()} (pid {proc.pid})\n") + log_message("\n=== Children Tree ===\n") + _print_process_tree(current) + + +def query_slurm_pids(jobid): + + try: + sp_output = sp.check_output( + ["/usr/local/bin/scontrol", "listpids", str(jobid)], stderr=sp.STDOUT + ) + log_message(f"scontrol output:\n{sp_output.decode()}") + except sp.CalledProcessError as e: + log_message(f"scontrol failed for job {jobid}\n") + log_message(f"Return code: {e.returncode}\n") + log_message(f"Output:\n{e.output.decode(errors='replace')}\n") + return [] + except Exception as e: + # Catch-all for other failures + log_message(f"Unexpected error calling scontrol: {e}\n") + return [] + + pids = [] + lines = sp_output.decode().strip().splitlines() + for line in lines[1:]: # Skip the first line + parts = line.split() + if not parts: + continue + + pid = parts[0] + # skip invalid PIDs + if pid in ("-1", "-"): + continue + + try: + pids.append(int(pid)) + except ValueError: + # In case pid is something unexpected + continue + + return pids + + +def log_message(message): + print(message) + if logfile is not None: + logfile.write(message + "\n") + logfile.flush() + + +def build_argument_parser(): + parser = argparse.ArgumentParser(description="CodeCarbon job wrapper") + group_ids = parser.add_mutually_exclusive_group(required=True) + group_ids.add_argument( + "--jobid", type=int, required=False, default=None, help="SLURM Job ID" + ) + group_ids.add_argument( + "--pids", type=int, nargs="+", required=False, default=[], help="Process ID" + ) + + parser.add_argument("--user", type=str, required=True, help="SLURM Job User") + parser.add_argument( + "--gpuids", + type=str, + required=False, + help="Comma-separated GPU IDs assigned to the job", + ) + return parser + + +################################################################### + +# Loglevel debug +logging.basicConfig(level=logging.DEBUG) + +logfile = None +try: + parser = build_argument_parser() + args = parser.parse_args() + + jobid = args.jobid + pids = args.pids + + user = args.user + if args.gpuids: + gpuids = args.gpuids.split(",") + else: + gpuids = [] + + os.environ["SLURM_JOB_ID"] = str(jobid) + os.environ["SLURM_JOB_USER"] = str(user) + os.environ["SLURM_JOB_GPUS"] = ",".join(gpuids) + + logfile = open(f"/tmp/cc_{jobid}.log", "w", buffering=1) + log_message("Python started") + log_message(f"Interpreter: {sys.executable}") + + log_message("CodeCarbon SLURM Prolog Script Started") + log_message(f"Time: {time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())}") + + log_message("Available environment variables:") + for key, value in os.environ.items(): + log_message(f"{key}: {value}") + + log_message("Wait 60 seconds to allow job processes to start") + for i in range(60): + log_message(f" Waiting... {1 * i} seconds elapsed") + time.sleep(1) # Give some time for the job to start properly + + log_message( + "Wait completed at " + time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) + ) + + if jobid is None: + log_message(f"Using provided PIDs: {pids}") + else: + log_message('Parse scontrol for process IDs with "scontrol listpids"') + pids = query_slurm_pids(jobid) + + log_message(f"Found PIDs: {pids}") + + for pid in pids: + log_message(f"Process tree for PID {pid}:") + print_process_tree(pid) + + log_message(f"Job ID: {jobid}, User: {user}, GPU IDs: {gpuids}") + + tracker = OfflineEmissionsTracker( + country_iso_code="DEU", + region="DE-NW", + measure_power_secs=10, + api_call_interval=2, + gpu_ids=f"{gpuids}", + tracking_mode="process", + tracking_pids=args.jobid, + save_to_prometheus=True, + prometheus_url="129.217.31.239:9091", + project_name=f"{user}", + experiment_name=f"{jobid}", + output_dir="/tmp/codecarbon_log/", + output_file="/tmp/codecarbon_log/emission.csv", + ) + + tracker.start() + + # Check for termination signal every second + try: + while True: + time.sleep(1) + except KeyboardInterrupt: + log_message("Received termination signal. Stopping CodeCarbon tracker...") + except Exception as e: + log_message(f"Exception in tracking loop: {e}") + raise e + finally: + tracker.stop() + log_message("CodeCarbon tracker stopped.") + +except Exception: + log_message("Exception occurred:") + log_message(traceback.format_exc()) + +finally: + if logfile is not None: + log_message("CodeCarbon SLURM Prolog Script Ended") + logfile.close() diff --git a/examples/slurm_prolog.sh b/examples/slurm_prolog.sh new file mode 100755 index 000000000..eaa24859c --- /dev/null +++ b/examples/slurm_prolog.sh @@ -0,0 +1,32 @@ +#!/bin/bash + +JOBID=$SLURM_JOB_ID +LOGFILE="/tmp/prolog_${JOBID}.log" +PIDFILE="/tmp/prolog_${JOBID}.pid" + +echo "Starting CodeCarbon for job $JOBID" >> "$LOGFILE" + +mkdir -p /tmp/codecarbon_log/ + +# Check if GPU IDs are available +if [ -z "$SLURM_JOB_GPUS" ]; then + # We cannot inherit the cgroup because slum kills the entire cgroup on end of initialization + systemd-run --unit codecarbon_$JOBID \ + /etc/slurm/pyscripts/_codecarbon.py \ + --jobid $JOBID \ + --user $SLURM_JOB_USER \ + &>> "$LOGFILE" +else + systemd-run --unit codecarbon_$JOBID \ + /etc/slurm/pyscripts/_codecarbon.py \ + --jobid $JOBID \ + --user $SLURM_JOB_USER \ + --gpuids $SLURM_JOB_GPUS \ + &>> "$LOGFILE" +fi + +# Save PID for epilog +sleep 1 +exit 0 + + diff --git a/tests/test_cpu.py b/tests/test_cpu.py index 5453ac4ef..2a270c93c 100644 --- a/tests/test_cpu.py +++ b/tests/test_cpu.py @@ -14,7 +14,7 @@ ) from codecarbon.core.units import Energy, Power, Time from codecarbon.core.util import count_physical_cpus -from codecarbon.external.hardware import CPU +from codecarbon.external.cpu import CPU from codecarbon.input import DataSource diff --git a/tests/test_cpu_load.py b/tests/test_cpu_load.py index 314f7f4d2..df29916f2 100644 --- a/tests/test_cpu_load.py +++ b/tests/test_cpu_load.py @@ -1,10 +1,12 @@ +import os import unittest from time import sleep from unittest import mock from codecarbon.core.units import Power from codecarbon.emissions_tracker import OfflineEmissionsTracker -from codecarbon.external.hardware import CPU, MODE_CPU_LOAD, AppleSiliconChip +from codecarbon.external.cpu import CPU, MODE_CPU_LOAD +from codecarbon.external.hardware import AppleSiliconChip @mock.patch("codecarbon.core.cpu.is_psutil_available", return_value=True) @@ -23,6 +25,7 @@ def test_cpu_total_power_process( "Intel(R) Core(TM) i7-7600U CPU @ 2.80GHz", 100, tracking_mode="process", + tracking_pids=[os.getpid()], ) cpu.start() sleep(0.5) @@ -30,7 +33,7 @@ def test_cpu_total_power_process( self.assertGreaterEqual(power.W, 0.0) @mock.patch( - "codecarbon.external.hardware.CPU._get_power_from_cpu_load", + "codecarbon.external.cpu.CPU._get_power_from_cpu_load", return_value=Power.from_watts(50), ) def test_cpu_total_power( @@ -41,7 +44,11 @@ def test_cpu_total_power( mocked_get_power_from_cpu_load, ): cpu = CPU.from_utils( - None, MODE_CPU_LOAD, "Intel(R) Core(TM) i7-7600U CPU @ 2.80GHz", 100 + None, + MODE_CPU_LOAD, + "Intel(R) Core(TM) i7-7600U CPU @ 2.80GHz", + 100, + tracking_mode="machine", ) cpu.start() sleep(0.5) @@ -76,7 +83,9 @@ def test_cpu_calculate_power_from_cpu_load_threadripper( ): tdp = 100 cpu_model = "AMD Ryzen Threadripper 3990X 64-Core Processor" - cpu = CPU.from_utils(None, MODE_CPU_LOAD, cpu_model, tdp) + cpu = CPU.from_utils( + None, MODE_CPU_LOAD, cpu_model, tdp, tracking_mode="machine" + ) tests_values = [ { "cpu_load": 0.0, @@ -103,7 +112,9 @@ def test_cpu_calculate_power_from_cpu_load_linear( ): tdp = 100 cpu_model = "Random Processor" - cpu = CPU.from_utils(None, MODE_CPU_LOAD, cpu_model, tdp) + cpu = CPU.from_utils( + None, MODE_CPU_LOAD, cpu_model, tdp, tracking_mode="machine" + ) tests_values = [ { "cpu_load": 0.0, diff --git a/tests/test_emissions_tracker.py b/tests/test_emissions_tracker.py index b330fc77a..b807f9535 100644 --- a/tests/test_emissions_tracker.py +++ b/tests/test_emissions_tracker.py @@ -420,7 +420,7 @@ def test_carbon_tracker_online_context_manager_TWO_GPU_PRIVATE_INFRA_CANADA( self.assertAlmostEqual(tracker.final_emissions, 6.262572537957655e-05, places=2) @mock.patch("codecarbon.external.ram.RAM.measure_power_and_energy") - @mock.patch("codecarbon.external.hardware.CPU.measure_power_and_energy") + @mock.patch("codecarbon.external.cpu.CPU.measure_power_and_energy") @mock.patch( "codecarbon.external.hardware.AppleSiliconChip.measure_power_and_energy", autospec=True, diff --git a/tests/test_pid_tracking.py b/tests/test_pid_tracking.py new file mode 100644 index 000000000..427572390 --- /dev/null +++ b/tests/test_pid_tracking.py @@ -0,0 +1,110 @@ +import os +import subprocess as sp +import tempfile +import time +import unittest + +from codecarbon.emissions_tracker import OfflineEmissionsTracker + +python_load_code = """ +import math +i = 0 +erg = 0 +while True: + i += 1 + a = math.sqrt(64*64*64*64*64) + erg += a +print(erg) +""" + + +class TestPIDTracking(unittest.TestCase): + def setUp(self) -> None: + self.project_name = "project_TestPIDTracking" + self.emissions_file = "emissions-test-TestPIDTracking" + self.emissions_path = tempfile.gettempdir() + self.emissions_file_path = os.path.join( + self.emissions_path, self.emissions_file + ) + if os.path.isfile(self.emissions_file_path): + os.remove(self.emissions_file_path) + + self.pids = [] + self.process = [] + for _ in range(4): + self.process.append(sp.Popen(["python", "-c", python_load_code])) + self.pids.append(self.process[-1].pid) + self.pids.append(os.getpid()) + + def tearDown(self) -> None: + if os.path.isfile(self.emissions_file_path): + os.remove(self.emissions_file_path) + + for proc in self.process: + if proc.poll() is None: # Check if process is still running + proc.terminate() + proc.wait() + + def print_process_tree(self, pid, level=0): + import psutil + + try: + process = psutil.Process(pid) + print(" " * (level * 2) + f"PID: {process.pid}, Name: {process.name()}") + children = process.children() + for child in children: + self.print_process_tree(child.pid, level + 1) + except psutil.NoSuchProcess: + print(" " * (level * 2) + f"PID: {pid} does not exist.") + + def test_carbon_pid_tracking_offline(self): + + # Print PID structure + main_pid = os.getpid() + self.print_process_tree(main_pid) + + # Subprocess PIDs are children, therefore both should be equal + tracker_pid = OfflineEmissionsTracker( + output_dir=self.emissions_path, + output_file=self.emissions_file + "_pid.csv", + tracking_mode="process", + tracking_pids=self.pids, + gpu_ids=[], + ) + tracker_self = OfflineEmissionsTracker( + output_dir=self.emissions_path, + output_file=self.emissions_file + "_global.csv", + tracking_mode="process", + tracking_pids=[os.getpid()], + gpu_ids=[], + ) + + tracker_pid.start() + tracker_self.start() + + time.sleep(10) + + for proc in self.process: + proc.terminate() + proc.wait() + + time.sleep(1) # Ensure all data is logged + + emissions_pid = tracker_pid.stop() + emissions_self = tracker_self.stop() + + print(f"Emissions (pid): {emissions_pid} kgCO2eq") + print(f"Emissions (self): {emissions_self} kgCO2eq") + + if not isinstance(emissions_pid, float): + print(emissions_pid) + assert isinstance(emissions_pid, float) + + self.assertNotEqual(emissions_pid, 0.0) + + # Compare emissions from both trackers, should be less than 10% difference + diff = abs(emissions_pid - emissions_self) + avg = (emissions_pid + emissions_self) / 2 + percent_diff = (diff / avg) * 100 + print(f"Percent difference: {percent_diff}%") + self.assertLessEqual(percent_diff, 10.0)