Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ dependencies = [
"code-meters >= 0.0.3",
"conflator >= 0.1.7",
"earthkit-data >= 0.19.0",
"earthkit-meteo >= 0.6.1",
"earthkit-meteo >= 1.0.0rc0",
"earthkit-time >= 0.1.3",
"earthkit-utils >= 0.2.1",
"eccodes >= 1.6.1",
Expand Down Expand Up @@ -69,6 +69,8 @@ pproc-monthly-stats = "pproc.monthly_stats:main"
pproc-config="pproc.config_generation:main"
pproc-ecpoint = "pproc.ecpoint:main"
pproc-flight-levels = "pproc.flight_levels:main"
pproc-dewpoint-ml = "pproc.dewpoint_ml:main"
pproc-dewpoint-pl = "pproc.dewpoint_pl:main"

[tool.pytest.ini_options]
minversion = "6.0"
Expand Down
5 changes: 5 additions & 0 deletions src/pproc/config/io.py
Original file line number Diff line number Diff line change
Expand Up @@ -300,3 +300,8 @@ def create_output_model(
)
FlightLevelsInputModel = create_input_model("FlightLevels", ["fc", "lnsp"])
FlightLevelsOutputModel = create_output_model("FlightLevels", ["levels"])


DewpointPLInputModel = create_input_model("DewpointPL", ["pl"])
DewpointMLInputModel = create_input_model("DewpointML", ["ml", "lnsp"])
DewpointOutputModel = create_output_model("Dewpoint", ["dpt"])
16 changes: 16 additions & 0 deletions src/pproc/config/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -1712,3 +1712,19 @@ def _format_out(self, param: ParamConfig, req) -> dict:
req["levtype"] = "fl"
req["levelist"] = self.target_flight_levels
return req


class DewpointPLConfig(AccumConfig):
parallelisation: int = 1
inputs: io.DewpointPLInputModel
outputs: io.DewpointOutputModel = io.DewpointOutputModel()
parameters: list[ParamConfig]


class DewpointMLConfig(AccumConfig):
parallelisation: int = 1
inputs: io.DewpointMLInputModel
outputs: io.DewpointOutputModel = io.DewpointOutputModel()
parameters: list[ParamConfig]
model: str
n_levels: int
127 changes: 127 additions & 0 deletions src/pproc/dewpoint_ml.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
# (C) Copyright 2026- ECMWF and individual contributors.

# This software is licensed under the terms of the Apache Licence Version 2.0
# which can be obtained at http://www.apache.org/licenses/LICENSE-2.0.
# In applying this licence, ECMWF does not waive the privileges and immunities
# granted to it by virtue of its status as an intergovernmental organisation nor
# does it submit to any jurisdiction.

"""Dew point temperature on model levels"""

import functools
import logging
import signal
import sys

from conflator import Conflator
from earthkit.meteo import thermo, vertical
from earthkit.data import FieldList, SimpleFieldList
from meters import ResourceMeter
import numpy as np

from pproc.common.accumulation_manager import AccumulationManager
from pproc.common.io import write_grib
from pproc.common.parallel import parallel_processing, sigterm_handler
from pproc.common.param_requester import ParamRequester
from pproc.config.types import ParamConfig, DewpointMLConfig
from pproc.common.utils import dict_product


logger = logging.getLogger(__name__)

# GRIB parameter: dew point temperature
DEWPOINT_PARAM_ID = 3017


def dewpoint_iteration(
config: DewpointMLConfig,
pconfig: ParamConfig,
dims: dict,
):
ids = ", ".join(f"{k}={v}" for k, v in dims.items())

fields = SimpleFieldList()
for src_name in config.inputs.names:
src_param = getattr(pconfig, src_name, pconfig)
total = pconfig.compute_totalfields(config.inputs, src_name)
requester = ParamRequester(src_param, config.inputs, total, src_name)
with ResourceMeter(f"Retrieve {src_name} {ids}"):
metadata, data = requester.retrieve_data(**dims)
fields += FieldList.from_array(data, [x.to_ekmetadata() for x in metadata])

with ResourceMeter(f"Compute dewpoint temperature {ids}"):
A, B = vertical.hybrid_level_parameters(config.n_levels, model=config.model)

# Surface pressure [Pa]
lnsp = fields.sel(param="lnsp").order_by(number="ascending")
sp_arr = np.exp(lnsp.values)

# Specific humidity [kg/kg]
q = fields.sel(param="q").order_by(level="ascending", number="ascending")

# Pressure [Pa]
levels = list(set(q.metadata("vertical.level")))
levels.sort()
p_arr = vertical.pressure_on_hybrid_levels(
A, B, sp_arr, levels=levels, output="full", vertical_dim=0
)

# Reshape q to match p, where level dim is separate from other dims.
# The ordering of levels and the matching (level-first) ordering of q
# ensures that the levels dim is separated correctly after reshaping.
q_arr = q.values.reshape(p_arr.shape)

# Dewpoint temperature [K]
dpt_arr = thermo.array.dewpoint_from_specific_humidity(q_arr, p_arr)

out_dpt = config.outputs.dpt
templates = q.sel(level=levels[0])
for i, level in enumerate(levels):
for j, values in enumerate(dpt_arr[i]):
write_grib(
out_dpt.target,
templates[j].metadata()._handle,
values.astype(np.float32),
{
**out_dpt.metadata,
**pconfig.metadata,
"paramId": DEWPOINT_PARAM_ID,
"level": level
},
)

config.outputs.dpt.target.flush()
config.recovery.add_checkpoint(param=pconfig.name, **dims)


def main():
sys.stdout.reconfigure(line_buffering=True)
signal.signal(signal.SIGTERM, sigterm_handler)

cfg = Conflator(app_name="pproc-dewpoint-ml", model=DewpointMLConfig).load()
cfg.initialise()
cfg.print()

plan = []
for param in cfg.parameters:
accum_manager = AccumulationManager.create(
param.accumulations,
)
for dims in dict_product(accum_manager.dims):
if cfg.recovery.existing_checkpoint(param=param.name, **dims):
print(f"Recovery: skipping dims: {param.name} {dims}")
continue
plan.append((param, dims))

iteration = functools.partial(dewpoint_iteration, cfg)
parallel_processing(
iteration,
plan,
cfg.parallelisation,
)

cfg.clean()


if __name__ == "__main__":
sys.exit(main())
111 changes: 111 additions & 0 deletions src/pproc/dewpoint_pl.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
# (C) Copyright 2026- ECMWF and individual contributors.

# This software is licensed under the terms of the Apache Licence Version 2.0
# which can be obtained at http://www.apache.org/licenses/LICENSE-2.0.
# In applying this licence, ECMWF does not waive the privileges and immunities
# granted to it by virtue of its status as an intergovernmental organisation nor
# does it submit to any jurisdiction.

"""Dew point temperature on pressure levels"""

import functools
import logging
import signal
import sys

from conflator import Conflator
from earthkit.meteo import thermo
from earthkit.data import FieldList, SimpleFieldList
from meters import ResourceMeter
import numpy as np

from pproc.common.accumulation_manager import AccumulationManager
from pproc.common.io import write_grib
from pproc.common.parallel import parallel_processing, sigterm_handler
from pproc.common.param_requester import ParamRequester
from pproc.config.types import ParamConfig, DewpointPLConfig
from pproc.common.utils import dict_product


logger = logging.getLogger(__name__)

# GRIB parameter: dew point temperature
DEWPOINT_PARAM_ID = 3017


def dewpoint_iteration(
config: DewpointPLConfig,
pconfig: ParamConfig,
dims: dict,
):
ids = ", ".join(f"{k}={v}" for k, v in dims.items())

fields = SimpleFieldList()
for src_name in config.inputs.names:
src_param = getattr(pconfig, src_name, pconfig)
total = pconfig.compute_totalfields(config.inputs, src_name)
requester = ParamRequester(src_param, config.inputs, total, src_name)
with ResourceMeter(f"Retrieve {src_name} {ids}"):
metadata, data = requester.retrieve_data(**dims)
fields += FieldList.from_array(data, [x.to_ekmetadata() for x in metadata])

with ResourceMeter(f"Compute dewpoint temperature {ids}"):
# Specific humidity [kg/kg]
q = fields.sel(param="q")
q_arr = q.values

# Pressure [Pa]
p = q.metadata("vertical.level") # [hPa]
p_arr = 100. * np.asarray(p)[:,None] # align for broadcasting

# Dewpoint temperature [K]
dpt_arr = thermo.array.dewpoint_from_specific_humidity(q_arr, p_arr)

for i, values in enumerate(dpt_arr):
out_dpt = config.outputs.dpt
write_grib(
out_dpt.target,
q[i].metadata()._handle,
values.astype(np.float32),
{
**out_dpt.metadata,
**pconfig.metadata,
"paramId": DEWPOINT_PARAM_ID,
},
)

config.outputs.dpt.target.flush()
config.recovery.add_checkpoint(param=pconfig.name, **dims)


def main():
sys.stdout.reconfigure(line_buffering=True)
signal.signal(signal.SIGTERM, sigterm_handler)

cfg = Conflator(app_name="pproc-dewpoint-pl", model=DewpointPLConfig).load()
cfg.initialise()
cfg.print()

plan = []
for param in cfg.parameters:
accum_manager = AccumulationManager.create(
param.accumulations,
)
for dims in dict_product(accum_manager.dims):
if cfg.recovery.existing_checkpoint(param=param.name, **dims):
print(f"Recovery: skipping dims: {param.name} {dims}")
continue
plan.append((param, dims))

iteration = functools.partial(dewpoint_iteration, cfg)
parallel_processing(
iteration,
plan,
cfg.parallelisation,
)

cfg.clean()


if __name__ == "__main__":
sys.exit(main())
50 changes: 50 additions & 0 deletions tests/templates/dewpoint-ml.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
parallelisation: 1

model: ifs
n_levels: 137

parameters:
q:
inputs:
ml:
request:
param: 133
grid: O1280
lnsp:
request:
param: 152
grid: O1280

inputs:
ml:
request:
class: od
date: 20260528
expver: '0001'
levelist: [1, 2]
levtype: ml
step: [0, 96]
stream: oper
time: 00
type: fc
source:
type: fdb
lnsp:
request:
class: od
date: 20260528
expver: '0001'
levelist: [1]
levtype: ml
step: [0, 96]
stream: oper
time: 00
type: fc
source:
type: fdb

outputs:
dpt:
units: K
target:
type: fdb
31 changes: 31 additions & 0 deletions tests/templates/dewpoint-pl.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
parallelisation: 1

parameters:
q:
inputs:
pl:
request:
param: "133.128"
grid: O1280

inputs:
pl:
request:
class: od
date: 20260528
expver: 1
levelist: [500, 850]
levtype: pl
number: [1, 2]
step: [0, 24]
stream: enfo
time: 00
type: pf
source:
type: fdb

outputs:
dpt:
units: K
target:
type: fdb