diff --git a/pyproject.toml b/pyproject.toml index 50c538da..154f52ac 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -17,7 +17,7 @@ dependencies = [ "code-meters >= 0.0.3", "conflator >= 0.1.7", "earthkit-data >= 0.19.0", - "earthkit-meteo >= 0.6.1", + "earthkit-meteo >= 1.0.0rc0", "earthkit-time >= 0.1.3", "earthkit-utils >= 0.2.1", "eccodes >= 1.6.1", @@ -69,6 +69,8 @@ pproc-monthly-stats = "pproc.monthly_stats:main" pproc-config="pproc.config_generation:main" pproc-ecpoint = "pproc.ecpoint:main" pproc-flight-levels = "pproc.flight_levels:main" +pproc-dewpoint-ml = "pproc.dewpoint_ml:main" +pproc-dewpoint-pl = "pproc.dewpoint_pl:main" [tool.pytest.ini_options] minversion = "6.0" diff --git a/src/pproc/config/io.py b/src/pproc/config/io.py index c44dbb0a..ae52e977 100644 --- a/src/pproc/config/io.py +++ b/src/pproc/config/io.py @@ -300,3 +300,8 @@ def create_output_model( ) FlightLevelsInputModel = create_input_model("FlightLevels", ["fc", "lnsp"]) FlightLevelsOutputModel = create_output_model("FlightLevels", ["levels"]) + + +DewpointPLInputModel = create_input_model("DewpointPL", ["pl"]) +DewpointMLInputModel = create_input_model("DewpointML", ["ml", "lnsp"]) +DewpointOutputModel = create_output_model("Dewpoint", ["dpt"]) diff --git a/src/pproc/config/types.py b/src/pproc/config/types.py index ce454d03..0201ea95 100644 --- a/src/pproc/config/types.py +++ b/src/pproc/config/types.py @@ -1712,3 +1712,19 @@ def _format_out(self, param: ParamConfig, req) -> dict: req["levtype"] = "fl" req["levelist"] = self.target_flight_levels return req + + +class DewpointPLConfig(AccumConfig): + parallelisation: int = 1 + inputs: io.DewpointPLInputModel + outputs: io.DewpointOutputModel = io.DewpointOutputModel() + parameters: list[ParamConfig] + + +class DewpointMLConfig(AccumConfig): + parallelisation: int = 1 + inputs: io.DewpointMLInputModel + outputs: io.DewpointOutputModel = io.DewpointOutputModel() + parameters: list[ParamConfig] + model: str + n_levels: int diff --git a/src/pproc/dewpoint_ml.py b/src/pproc/dewpoint_ml.py new file mode 100644 index 00000000..776da7f4 --- /dev/null +++ b/src/pproc/dewpoint_ml.py @@ -0,0 +1,127 @@ +# (C) Copyright 2026- ECMWF and individual contributors. + +# This software is licensed under the terms of the Apache Licence Version 2.0 +# which can be obtained at http://www.apache.org/licenses/LICENSE-2.0. +# In applying this licence, ECMWF does not waive the privileges and immunities +# granted to it by virtue of its status as an intergovernmental organisation nor +# does it submit to any jurisdiction. + +"""Dew point temperature on model levels""" + +import functools +import logging +import signal +import sys + +from conflator import Conflator +from earthkit.meteo import thermo, vertical +from earthkit.data import FieldList, SimpleFieldList +from meters import ResourceMeter +import numpy as np + +from pproc.common.accumulation_manager import AccumulationManager +from pproc.common.io import write_grib +from pproc.common.parallel import parallel_processing, sigterm_handler +from pproc.common.param_requester import ParamRequester +from pproc.config.types import ParamConfig, DewpointMLConfig +from pproc.common.utils import dict_product + + +logger = logging.getLogger(__name__) + +# GRIB parameter: dew point temperature +DEWPOINT_PARAM_ID = 3017 + + +def dewpoint_iteration( + config: DewpointMLConfig, + pconfig: ParamConfig, + dims: dict, +): + ids = ", ".join(f"{k}={v}" for k, v in dims.items()) + + fields = SimpleFieldList() + for src_name in config.inputs.names: + src_param = getattr(pconfig, src_name, pconfig) + total = pconfig.compute_totalfields(config.inputs, src_name) + requester = ParamRequester(src_param, config.inputs, total, src_name) + with ResourceMeter(f"Retrieve {src_name} {ids}"): + metadata, data = requester.retrieve_data(**dims) + fields += FieldList.from_array(data, [x.to_ekmetadata() for x in metadata]) + + with ResourceMeter(f"Compute dewpoint temperature {ids}"): + A, B = vertical.hybrid_level_parameters(config.n_levels, model=config.model) + + # Surface pressure [Pa] + lnsp = fields.sel(param="lnsp").order_by(number="ascending") + sp_arr = np.exp(lnsp.values) + + # Specific humidity [kg/kg] + q = fields.sel(param="q").order_by(level="ascending", number="ascending") + + # Pressure [Pa] + levels = list(set(q.metadata("vertical.level"))) + levels.sort() + p_arr = vertical.pressure_on_hybrid_levels( + A, B, sp_arr, levels=levels, output="full", vertical_dim=0 + ) + + # Reshape q to match p, where level dim is separate from other dims. + # The ordering of levels and the matching (level-first) ordering of q + # ensures that the levels dim is separated correctly after reshaping. + q_arr = q.values.reshape(p_arr.shape) + + # Dewpoint temperature [K] + dpt_arr = thermo.array.dewpoint_from_specific_humidity(q_arr, p_arr) + + out_dpt = config.outputs.dpt + templates = q.sel(level=levels[0]) + for i, level in enumerate(levels): + for j, values in enumerate(dpt_arr[i]): + write_grib( + out_dpt.target, + templates[j].metadata()._handle, + values.astype(np.float32), + { + **out_dpt.metadata, + **pconfig.metadata, + "paramId": DEWPOINT_PARAM_ID, + "level": level + }, + ) + + config.outputs.dpt.target.flush() + config.recovery.add_checkpoint(param=pconfig.name, **dims) + + +def main(): + sys.stdout.reconfigure(line_buffering=True) + signal.signal(signal.SIGTERM, sigterm_handler) + + cfg = Conflator(app_name="pproc-dewpoint-ml", model=DewpointMLConfig).load() + cfg.initialise() + cfg.print() + + plan = [] + for param in cfg.parameters: + accum_manager = AccumulationManager.create( + param.accumulations, + ) + for dims in dict_product(accum_manager.dims): + if cfg.recovery.existing_checkpoint(param=param.name, **dims): + print(f"Recovery: skipping dims: {param.name} {dims}") + continue + plan.append((param, dims)) + + iteration = functools.partial(dewpoint_iteration, cfg) + parallel_processing( + iteration, + plan, + cfg.parallelisation, + ) + + cfg.clean() + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/src/pproc/dewpoint_pl.py b/src/pproc/dewpoint_pl.py new file mode 100644 index 00000000..649f60e3 --- /dev/null +++ b/src/pproc/dewpoint_pl.py @@ -0,0 +1,111 @@ +# (C) Copyright 2026- ECMWF and individual contributors. + +# This software is licensed under the terms of the Apache Licence Version 2.0 +# which can be obtained at http://www.apache.org/licenses/LICENSE-2.0. +# In applying this licence, ECMWF does not waive the privileges and immunities +# granted to it by virtue of its status as an intergovernmental organisation nor +# does it submit to any jurisdiction. + +"""Dew point temperature on pressure levels""" + +import functools +import logging +import signal +import sys + +from conflator import Conflator +from earthkit.meteo import thermo +from earthkit.data import FieldList, SimpleFieldList +from meters import ResourceMeter +import numpy as np + +from pproc.common.accumulation_manager import AccumulationManager +from pproc.common.io import write_grib +from pproc.common.parallel import parallel_processing, sigterm_handler +from pproc.common.param_requester import ParamRequester +from pproc.config.types import ParamConfig, DewpointPLConfig +from pproc.common.utils import dict_product + + +logger = logging.getLogger(__name__) + +# GRIB parameter: dew point temperature +DEWPOINT_PARAM_ID = 3017 + + +def dewpoint_iteration( + config: DewpointPLConfig, + pconfig: ParamConfig, + dims: dict, +): + ids = ", ".join(f"{k}={v}" for k, v in dims.items()) + + fields = SimpleFieldList() + for src_name in config.inputs.names: + src_param = getattr(pconfig, src_name, pconfig) + total = pconfig.compute_totalfields(config.inputs, src_name) + requester = ParamRequester(src_param, config.inputs, total, src_name) + with ResourceMeter(f"Retrieve {src_name} {ids}"): + metadata, data = requester.retrieve_data(**dims) + fields += FieldList.from_array(data, [x.to_ekmetadata() for x in metadata]) + + with ResourceMeter(f"Compute dewpoint temperature {ids}"): + # Specific humidity [kg/kg] + q = fields.sel(param="q") + q_arr = q.values + + # Pressure [Pa] + p = q.metadata("vertical.level") # [hPa] + p_arr = 100. * np.asarray(p)[:,None] # align for broadcasting + + # Dewpoint temperature [K] + dpt_arr = thermo.array.dewpoint_from_specific_humidity(q_arr, p_arr) + + for i, values in enumerate(dpt_arr): + out_dpt = config.outputs.dpt + write_grib( + out_dpt.target, + q[i].metadata()._handle, + values.astype(np.float32), + { + **out_dpt.metadata, + **pconfig.metadata, + "paramId": DEWPOINT_PARAM_ID, + }, + ) + + config.outputs.dpt.target.flush() + config.recovery.add_checkpoint(param=pconfig.name, **dims) + + +def main(): + sys.stdout.reconfigure(line_buffering=True) + signal.signal(signal.SIGTERM, sigterm_handler) + + cfg = Conflator(app_name="pproc-dewpoint-pl", model=DewpointPLConfig).load() + cfg.initialise() + cfg.print() + + plan = [] + for param in cfg.parameters: + accum_manager = AccumulationManager.create( + param.accumulations, + ) + for dims in dict_product(accum_manager.dims): + if cfg.recovery.existing_checkpoint(param=param.name, **dims): + print(f"Recovery: skipping dims: {param.name} {dims}") + continue + plan.append((param, dims)) + + iteration = functools.partial(dewpoint_iteration, cfg) + parallel_processing( + iteration, + plan, + cfg.parallelisation, + ) + + cfg.clean() + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/tests/templates/dewpoint-ml.yaml b/tests/templates/dewpoint-ml.yaml new file mode 100644 index 00000000..af219e7a --- /dev/null +++ b/tests/templates/dewpoint-ml.yaml @@ -0,0 +1,50 @@ +parallelisation: 1 + +model: ifs +n_levels: 137 + +parameters: + q: + inputs: + ml: + request: + param: 133 + grid: O1280 + lnsp: + request: + param: 152 + grid: O1280 + +inputs: + ml: + request: + class: od + date: 20260528 + expver: '0001' + levelist: [1, 2] + levtype: ml + step: [0, 96] + stream: oper + time: 00 + type: fc + source: + type: fdb + lnsp: + request: + class: od + date: 20260528 + expver: '0001' + levelist: [1] + levtype: ml + step: [0, 96] + stream: oper + time: 00 + type: fc + source: + type: fdb + +outputs: + dpt: + units: K + target: + type: fdb diff --git a/tests/templates/dewpoint-pl.yaml b/tests/templates/dewpoint-pl.yaml new file mode 100644 index 00000000..83212ae9 --- /dev/null +++ b/tests/templates/dewpoint-pl.yaml @@ -0,0 +1,31 @@ +parallelisation: 1 + +parameters: + q: + inputs: + pl: + request: + param: "133.128" + grid: O1280 + +inputs: + pl: + request: + class: od + date: 20260528 + expver: 1 + levelist: [500, 850] + levtype: pl + number: [1, 2] + step: [0, 24] + stream: enfo + time: 00 + type: pf + source: + type: fdb + +outputs: + dpt: + units: K + target: + type: fdb