Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ rtcoef*

advance_time
closest_member_tool
cmems_ssh_to_obs
compare_states
compute_error
create_fixed_network_seq
Expand Down
58 changes: 43 additions & 15 deletions assimilation_code/modules/utilities/read_csv_mod.f90
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@
! Utility routines for reading simple ASCII, CSV-like tabular data.
!
! This module provides a lightweight interface to work with
! non-NetCDF data products. It supports files with a single
! header row and delimited fields (comma, semicolon, or whitespace).
! non-NetCDF data products. It supports files with multiple
! header rows and delimited fields (comma, semicolon, or whitespace).
!
! Some features:
! - Cached CSV file handle with header, delimiter, and row & column count.
Expand Down Expand Up @@ -61,7 +61,8 @@ module read_csv_mod
private
character(len=256) :: filename = ''
integer :: nrows = 0
integer :: ncols = 0
integer :: ncols = 0
integer :: skip = 0 ! Optional lines to skip before the header
integer :: iunit = -1
character :: delim = ','
character(len=512) :: fields(MAX_NUM_FIELDS)
Expand Down Expand Up @@ -332,7 +333,7 @@ subroutine csv_skip_header(cf, context)
type(csv_file_type), intent(inout) :: cf
character(len=*), intent(in), optional :: context

integer :: io
integer :: i, io
character(len=MAX_FIELDS_LEN) :: line

character(len=*), parameter :: routine = 'csv_skip_header'
Expand All @@ -346,12 +347,15 @@ subroutine csv_skip_header(cf, context)
return
endif

read(cf%iunit, '(A)', iostat=io) line
if (io /= 0) then
write(string1,'(A,I0)') 'READ(header) failed, io=', io
call error_handler(E_ERR, routine, string1, context)
return
endif
! Start skipping (include any other unnecessary lines)
do i = 1, cf%skip+1
read(cf%iunit, '(A)', iostat=io) line
if (io /= 0) then
write(string1,'(A,I0)') 'READ(header) failed, io=', io
call error_handler(E_ERR, routine, string1, context)
return
endif
enddo

end subroutine csv_skip_header

Expand All @@ -372,37 +376,60 @@ end function csv_get_nrows
! Open a CSV handle: cache header/dims.
! By doing so, we won't need to open the file
! every time to read header or get dimensions.
subroutine csv_open(fname, cf, forced_delim, context)
subroutine csv_open(fname, cf, skip_lines, forced_delim, context)

character(len=*), intent(in) :: fname
type(csv_file_type), intent(out) :: cf
integer, intent(in), optional :: skip_lines
character(len=*), intent(in), optional :: forced_delim
character(len=*), intent(in), optional :: context

character(len=*), parameter :: routine = 'csv_open'

integer :: io
integer :: i, io
character(len=MAX_FIELDS_LEN) :: line

! Reset
cf%filename = fname
cf%nrows = 0
cf%ncols = 0
cf%skip = 0
cf%iunit = -1
cf%delim = ','
cf%fields = ''
cf%is_open = .false.

! Number of rows (excluding header)
! Number of rows (excluding 1 header line)
cf%nrows = csv_get_nrows_from_file(fname, context)

! Read header and delimiter
! Open the file
cf%iunit = open_file(fname, action='read', form='formatted')

! Check if any lines need to be skipped on top of the header
if (present(skip_lines)) then
cf%skip = skip_lines

do i = 1, cf%skip
read(cf%iunit, '(A)', iostat=io) line

if (io /= 0) then
call csv_close(cf)

write(string1, '(A, I0)') 'Got bad read code from input file, io = ', io
call error_handler(E_ERR, routine, string1, context)
return
endif
enddo
endif

! Update data rows
cf%nrows = cf%nrows - cf%skip

! The header
read(cf%iunit, '(A)', iostat=io) line
if (io /= 0) then
call csv_close(cf)

write(string1, '(A, I0)') 'Got bad read code from input file, io = ', io
call error_handler(E_ERR, routine, string1, context)
return
Expand All @@ -429,6 +456,7 @@ subroutine csv_close(cf)
cf%filename = ''
cf%nrows = 0
cf%ncols = 0
cf%skip = 0
cf%iunit = -1
cf%delim = ','
cf%fields = ''
Expand Down
5 changes: 4 additions & 1 deletion assimilation_code/modules/utilities/read_csv_mod.rst
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,10 @@ What this module does
- values that cannot be converted to integer are returned as ``MISSING_I``
- values that cannot be converted to real are returned as ``MISSING_R8``

- If a file has extra lines on top (typically including links and data source
information) before the actual header, these can be skipped when opening the
file with ``csv_open`` using the optional argument ``skip_lines``

- Treats backslash (``\``) as an escape character, preventing interpretation of the following character during parsing.

What this module does not do
Expand All @@ -150,7 +154,6 @@ What this module does not do

- It does not support:

- multiple header lines
- comment lines or metadata blocks
- embedded newlines inside quoted fields

Expand Down
1 change: 1 addition & 0 deletions guide/available-observation-converters.rst
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ Each directory has at least one converter:
- ``BATS``: :ref:`bats`
- ``CHAMP``: :ref:`champ`
- ``cice``: :ref:`cice_to_obs`
- ``cmems_ssh_l3``: :ref:`cmems_ssh_to_obs`
- ``CNOFS``: See ``DART/observations/obs_converters/CNOFS``
- ``CONAGUA``: :ref:`conagua`
- ``COSMOS``: :ref:`cosmos`
Expand Down
1 change: 1 addition & 0 deletions index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,7 @@ References
observations/obs_converters/CHAMP/README
observations/obs_converters/BATS/readme
observations/obs_converters/cice/cice_to_obs
observations/obs_converters/cmems_ssh_l3/readme
observations/obs_converters/CONAGUA/README
observations/obs_converters/COSMOS/COSMOS_to_obs
observations/obs_converters/COSMOS/COSMOS_development
Expand Down
212 changes: 212 additions & 0 deletions models/ROMS_rutgers/preprocess_ocean_obs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,212 @@
"""
preprocess_ocean_obs.py -- Filter an obs_seq file by bathymetric depth and
optionally replace observation error with a depth-
dependent model.

When --obs-type is given, only that type is depth-filtered and error-updated;
all other types are kept unchanged. Without --obs-type, all obs are processed.

Usage examples
--------------
# All obs, depth filter + depth-dependent error (defaults):
python preprocess_ocean_obs.py obs_seq.all obs_seq.all_trim \
--roms-file roms_restart.nc

# All obs, depth filter only (keep original error variance):
python preprocess_ocean_obs.py obs_seq.all obs_seq.all_trim \
--roms-file roms_restart.nc --no-depth-error

# One obs type, depth filter + error update, custom depth cut-off
# (other types in the file are kept unchanged):
python preprocess_ocean_obs.py obs_seq.all obs_seq.deep \
--roms-file roms_restart.nc \
--obs-type SATELLITE_SSH \
--min-depth 500

# All obs, error update only (no depth filtering):
python preprocess_ocean_obs.py obs_seq.all obs_seq.err_updated \
--roms-file roms_restart.nc --no-depth-filter

"""

# Module imports
from scipy.spatial import cKDTree

import argparse
import sys
import pandas as pd
import numpy as np
import xarray as xr
import pydartdiags.obs_sequence.obs_sequence as obsq


# CLI
def parse_args():
p = argparse.ArgumentParser(
description="Trim an obs_seq file by ROMS bathymetric depth.",
formatter_class=argparse.ArgumentDefaultsHelpFormatter)

# Required positional arguments
p.add_argument("obs_in", help="Input obs_seq file")
p.add_argument("obs_out", help="Output obs_seq file")
p.add_argument("--roms-file", required=True,
help="ROMS NetCDF file with h, mask_rho, lon_rho, lat_rho")
p.add_argument("--obs-type", default=None,
help="Process only this obs type; all others pass through unchanged")

# Depth filter
filt = p.add_argument_group("depth filter")
filt.add_argument("--no-depth-filter", dest="use_depth_filter",
action="store_false", default=True,
help="Keep obs at all depths")
filt.add_argument("--min-depth", type=float, default=200.0,
help="Remove obs shallower than this [m]")

# Depth-dependent error
err = p.add_argument_group("depth-dependent error")
err.add_argument("--no-depth-error", dest="use_depth_error",
action="store_false", default=True,
help="Keep original obs error variance")
err.add_argument("--sigma-min", type=float, default=0.04,
help="Deep-ocean obs error std-dev [obs units]")
err.add_argument("--sigma-max", type=float, default=0.08,
help="Shallow-water obs error std-dev [obs units]")
err.add_argument("--h0", type=float, default=500.0,
help="Depth scale for error transition [m]")

args = p.parse_args()

if not args.use_depth_filter and not args.use_depth_error:
p.error("--no-depth-filter and --no-depth-error together leave nothing to do.")

return args


# Depth lookup: vectorised via KDTree
def lookup_depths(lon_obs, lat_obs, glon, glat, bath, mask):
"""
Return the bathymetric depth at the nearest wet grid point for every
observation. This is an approximate depth estimate because observations
are not necessarily collocated with the model grid.
"""
wet = mask > 0.5 # 0: land, 1: wet
tree = cKDTree(np.column_stack([glon[wet], glat[wet]]))
_, idx = tree.query(np.column_stack([lon_obs, lat_obs]))
return bath[wet][idx]


# Depth-dependent error model
def depth_dependent_error_std(depth, sigma_min, sigma_max, h0):
"""Exponential depth-to-error mapping; returns std-dev (not variance)."""
return sigma_min + (sigma_max - sigma_min) * np.exp(-depth / h0)


# Main
def main():
args = parse_args()

# Read the input obs_seq and ROMS grid
print(f"\nReading obs_seq: {args.obs_in}")
obs_seq = obsq.ObsSequence(args.obs_in)

print(f"Reading ROMS grid: {args.roms_file}")
with xr.open_dataset(args.roms_file) as roms:
bath = roms["h"].values
mask = roms["mask_rho"].values
glon = np.mod(roms["lon_rho"].values, 360.0) # DART-style
glat = roms["lat_rho"].values

# Which observation type(s) to process?
df = obs_seq.df.copy()

# First, find out what obs are available in file:
available = sorted(df["type"].unique())
print(f"\nObs types in file: {available}")

if args.obs_type:
# User selected obs type
if args.obs_type not in available:
print(f"ERROR: '{args.obs_type}' not found in file.\n"
f"Available: {available}")
sys.exit(1)
obs_df = df[df["type"] == args.obs_type].copy()
obs_rest = df[df["type"] != args.obs_type].copy()
label = args.obs_type
else:
obs_df = df.copy()
obs_rest = None
label = "all"

Nobs_in = len(obs_df)
print(f"\nInput obs ({label}): {Nobs_in}")

if Nobs_in == 0: # empty?
print("No observations to process — exiting.")
sys.exit(0)

# Figure out the options for depth filtering and
# the error update.
if args.use_depth_filter or args.use_depth_error:
# Depth lookup needed for filter and/or error update

print("Looking up bathymetric depths ...")
lon_obs = obs_df["longitude"].values
lat_obs = obs_df["latitude"].values
depth = lookup_depths(lon_obs, lat_obs, glon, glat, bath, mask)
else:
depth = None

if args.use_depth_filter and depth is not None:
keep = np.isfinite(depth) & (depth > args.min_depth)
obs_kept = obs_df.loc[keep].copy()
depth_kept = depth[keep]
else:
obs_kept = obs_df.copy()
depth_kept = depth

if args.use_depth_error and depth_kept is not None:
sigma = depth_dependent_error_std(
depth_kept, args.sigma_min, args.sigma_max, args.h0)
obs_kept["obs_err_var"] = sigma ** 2

# Merge back and re-number:
if obs_rest is not None:
obs_final = pd.concat([obs_rest, obs_kept], ignore_index=True)
obs_final = obs_final.sort_values(["days", "seconds", "obs_num"]). \
reset_index(drop=True)
else:
obs_final = obs_kept.reset_index(drop=True)

obs_final["obs_num"] = np.arange(1, len(obs_final) + 1)

# Write the output obs_seq
obs_seq.df = obs_final

obs_seq.update_attributes_from_df()
obs_seq._update_linked_list(obs_final) # private, no public API for this?
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think obs_seq.update_attributes_from_df() calls _update_linked_list so you
don't need to call _update_linked_list directly. I will take a closer look running your example to see if there is anything funky going on.

Suggested change
obs_seq._update_linked_list(obs_final) # private, no public API for this?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good. I wasn't sure about it too.

obs_seq.write_obs_seq(args.obs_out)

# Print summary
print(f"\nSummary ({label})")

if args.use_depth_filter:
print(f" Kept: {len(obs_kept)} (depth > {args.min_depth:.0f} m)")
print(f" Removed: {Nobs_in - len(obs_kept)}")
if len(obs_kept):
print(f" Depth range kept: {depth_kept.min():.1f} – {depth_kept.max():.1f} m")
else:
print(f" Depth filter: off | all {Nobs_in} obs kept")

if args.use_depth_error:
print(f" Error model: sigma = {args.sigma_min} + "
f"({args.sigma_max} - {args.sigma_min}) * exp(-h / {args.h0})")
else:
print(f" Error model: Original obs error variance kept")

if obs_rest is not None:
print(f"\n Other obs types kept unchanged: {len(obs_rest)}")

print(f"\n Wrote: {args.obs_out}")

if __name__ == "__main__":
main()
Loading
Loading