248 changes: 248 additions & 0 deletions docs/user_guide/examples/tutorial_write_in_kernel.ipynb
@@ -0,0 +1,248 @@
{
"cells": [
{
"cell_type": "markdown",
"id": "0",
"metadata": {},
"source": [
"# 🎓 Writing output in a Kernel"
]
},
{
"cell_type": "markdown",
"id": "1",
"metadata": {},
"source": [
"The typical way to write particle data to a file is by specifying the `output_file` argument in a `pset.execute()` call. This writes to a file at a regular interval, specified in the `outputdt` argument of the `ParticleFile`. \n",
"\n",
"However, sometimes you may want more control over _when_ particle data is written. For example, you may want to write when a particle is deleted, only when it is in a certain region, or when other conditions are met. This is especially useful for connectivity studies, where the full trajectory of each particle is not needed. \n",
"\n",
"For these cases, you can call the `pfile.write()` method anywhere in a `Kernel`. You can write to the same file as the one specified in `output_file` or to a different file. \n",
"\n",
"This short tutorial shows how to use `pfile.write()` in a kernel. We use the same dataset and particle set as in the [output tutorial](../../getting_started/tutorial_output.ipynb)."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "2",
"metadata": {},
"outputs": [],
"source": [
"import numpy as np\n",
"\n",
"import parcels\n",
"import parcels.tutorial"
]
},
{
"cell_type": "markdown",
"id": "3",
"metadata": {},
"source": [
"We start by defining two `ParticleFile` objects: one for writing at regular intervals and one for writing in the kernel. "
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "4",
"metadata": {},
"outputs": [],
"source": [
"# The file that will write at regular intervals (every 2 hours in this case)\n",
"output_file_regular = parcels.ParticleFile(\n",
"    \"output_regular.parquet\",\n",
"    outputdt=np.timedelta64(2, \"h\"),\n",
")\n",
"\n",
"# The file that will write when the `write_kernel` is executed\n",
"output_file_kernel = parcels.ParticleFile(\n",
"    \"output_kernel.parquet\",\n",
"    # outputdt is not used for this file, since we write to it manually in the\n",
"    # kernel, but it must still be specified when creating the ParticleFile\n",
"    outputdt=np.timedelta64(2, \"h\"),\n",
")"
]
},
{
"cell_type": "markdown",
"id": "5",
"metadata": {},
"source": [
"Now we define a kernel that writes to the file when executed. In this example, we write all particles to the file, but you can also choose to write only a subset of the particles by using indexing (e.g., `particles[0]` for the first particle). "
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "6",
"metadata": {},
"outputs": [],
"source": [
"def write_kernel(particles, fieldset):\n",
"    output_file_kernel.write(\n",
"        particles,\n",
"        particles.time,  # each particle's own time; some particles may not have been released yet\n",
"        fieldset=fieldset,  # required here, since the ParticleSetView has no reference to the fieldset\n",
"    )"
]
},
{
"cell_type": "markdown",
"id": "7",
"metadata": {},
"source": [
"Now we run the simulation with both the advection kernel and the write kernel. Note that we specify the `output_file` argument in the `execute()` call, which will write to a file at regular intervals, but we can still use the `write_kernel` to write to a file at specific times."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "8",
"metadata": {},
"outputs": [],
"source": [
"# Load the CopernicusMarine data in the Agulhas region from the example_datasets\n",
"ds_fields = parcels.tutorial.open_dataset(\n",
"    \"CopernicusMarine_data_for_Argo_tutorial/data\"\n",
")\n",
"ds_fields.load()  # load the dataset into memory\n",
"\n",
"# Convert to an SGRID-compliant dataset and create the FieldSet\n",
"fields = {\"U\": ds_fields[\"uo\"], \"V\": ds_fields[\"vo\"]}\n",
"ds_fset = parcels.convert.copernicusmarine_to_sgrid(fields=fields)\n",
"fieldset = parcels.FieldSet.from_sgrid_conventions(ds_fset)\n",
"\n",
"pset = parcels.ParticleSet(fieldset=fieldset, lon=[31, 32], lat=[-32, -31], z=[1, 1])\n",
"\n",
"pset.execute(\n",
"    kernels=[\n",
"        parcels.kernels.AdvectionRK2,\n",
"        write_kernel,  # the kernel that writes to the file when executed\n",
"    ],\n",
"    runtime=np.timedelta64(4, \"h\"),\n",
"    dt=np.timedelta64(15, \"m\"),\n",
"    output_file=output_file_regular,  # the file that writes at regular intervals\n",
")"
]
},
{
"cell_type": "markdown",
"id": "9",
"metadata": {},
"source": [
"When we inspect the output files, we can see that the file that writes at regular intervals has 6 rows (2 particles * 3 time steps) at intervals of 2 hours, while the file that writes in the kernel has 32 rows (2 particles * 16 time steps) at intervals of 15 minutes."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "10",
"metadata": {},
"outputs": [],
"source": [
"df_loop = parcels.read_particlefile(output_file_regular.path)\n",
"assert len(df_loop) == 6 # 2 particles * 3 time steps (0h, 2h, 4h)\n",
"df_loop"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "11",
"metadata": {},
"outputs": [],
"source": [
"output_file_kernel._writer.close() # close the writer to flush the data to disk\n",
"df_kernel = parcels.read_particlefile(output_file_kernel.path)\n",
"assert (\n",
" len(df_kernel) == 32\n",
") # 2 particles * 16 time steps (every 15 minutes for 4 hours)\n",
"df_kernel"
]
},
{
"cell_type": "markdown",
"id": "12",
"metadata": {},
"source": [
"## Writing on particle deletion\n",
"\n",
"We can also write to the same file from both the `output_file` mechanism and the kernel. This is useful if, for example, a particle is deleted in the kernel and you want to write its data at the time of deletion, while still having its full trajectory available in the output file."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "13",
"metadata": {},
"outputs": [],
"source": [
"output_file_both = parcels.ParticleFile(\n",
"    \"output_both.parquet\",\n",
"    outputdt=np.timedelta64(2, \"h\"),\n",
")\n",
"\n",
"\n",
"def write_kernel_on_delete(particles, fieldset):\n",
"    ptcls = particles[particles.time == 8100]  # select particles at 2h15m (8100 s)\n",
"    ptcls.state = parcels.StatusCode.Delete\n",
"    if len(ptcls.time) > 0:\n",
"        output_file_both.write(\n",
"            ptcls,\n",
"            ptcls.time,\n",
"            fieldset=fieldset,\n",
"        )\n",
"\n",
"\n",
"pset = parcels.ParticleSet(fieldset=fieldset, lon=[31, 32], lat=[-32, -31], z=[1, 1])\n",
"\n",
"pset.execute(\n",
"    kernels=[\n",
"        parcels.kernels.AdvectionRK2,\n",
"        write_kernel_on_delete,  # writes to the file when a particle is deleted\n",
"    ],\n",
"    runtime=np.timedelta64(4, \"h\"),\n",
"    dt=np.timedelta64(15, \"m\"),\n",
"    output_file=output_file_both,  # this file also writes at regular intervals\n",
")\n",
"\n",
"df = parcels.read_particlefile(output_file_both.path)\n",
"assert len(df) == 6  # 2 particles * 3 output times (0h, 2h, 2h15m)\n",
"df"
]
},
{
"cell_type": "markdown",
"id": "14",
"metadata": {},
"source": [
"As you can see in the output above, each particle is now written three times: at 0h, at 2h, and at 2h15m, when the particle is deleted.\n",
"\n",
"Of course, if you do not pass the `output_file` argument to the `execute()` call, then only the kernel writes to the file, and there is one row per particle in the output file, at the time of deletion. Note that you then need to explicitly close the writer to flush the data to disk (with `output_file_both._writer.close()`), since `execute()` does not do this for you when no `output_file` is specified."
]
}
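,
{
"cell_type": "code",
"execution_count": null,
"id": "15",
"metadata": {},
"outputs": [],
"source": [
"# A sketch of the kernel-only pattern described above: no `output_file` is passed\n",
"# to execute(), so only the deletion kernel writes, and we close the writer\n",
"# ourselves. (The file name `output_delete_only.parquet` is just for illustration.)\n",
"output_file_delete_only = parcels.ParticleFile(\n",
"    \"output_delete_only.parquet\",\n",
"    outputdt=np.timedelta64(2, \"h\"),  # still required, but not used here\n",
")\n",
"\n",
"\n",
"def write_on_delete_only(particles, fieldset):\n",
"    ptcls = particles[particles.time == 8100]  # select particles at 2h15m (8100 s)\n",
"    ptcls.state = parcels.StatusCode.Delete\n",
"    if len(ptcls.time) > 0:\n",
"        output_file_delete_only.write(ptcls, ptcls.time, fieldset=fieldset)\n",
"\n",
"\n",
"pset = parcels.ParticleSet(fieldset=fieldset, lon=[31, 32], lat=[-32, -31], z=[1, 1])\n",
"pset.execute(\n",
"    kernels=[parcels.kernels.AdvectionRK2, write_on_delete_only],\n",
"    runtime=np.timedelta64(4, \"h\"),\n",
"    dt=np.timedelta64(15, \"m\"),\n",
")\n",
"\n",
"output_file_delete_only._writer.close()  # flush to disk; execute() won't do it here\n",
"df_delete_only = parcels.read_particlefile(output_file_delete_only.path)\n",
"df_delete_only  # expect one row per particle, at the time of deletion"
]
}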
],
"metadata": {
"kernelspec": {
"display_name": "Parcels:docs (3.14.4)",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.14.4"
}
},
"nbformat": 4,
"nbformat_minor": 5
}
14 changes: 10 additions & 4 deletions src/parcels/_core/particlefile.py
@@ -14,6 +14,7 @@
 
 import parcels
 from parcels._core.particle import ParticleClass
+from parcels._core.particlesetview import ParticleSetView
 from parcels._core.utils.time import timedelta_to_float
 from parcels._reprs import particlefile_repr
 from parcels._typing import PathLike
@@ -115,7 +116,7 @@ def outputdt(self):
     def path(self):
         return self._path
 
-    def write(self, pset: ParticleSet, time, indices=None):
+    def write(self, pset: ParticleSet | ParticleSetView, time, fieldset=None, indices=None):
         """Write all data from one time step to the zarr file,
         before the particle locations are updated.
 
@@ -125,17 +126,22 @@ def write(self, pset: ParticleSet, time, indices=None):
             ParticleSet object to write
         time :
             Time at which to write ParticleSet (same time object as fieldset)
+        fieldset :
+            FieldSet object associated with the ParticleSet (optional). By default, the fieldset associated with the ParticleSet will be used, but this can be overridden by providing a fieldset here. This is used in cases when the particleset is a ParticleSetView.
         """
         pclass = pset._ptype
-        time_interval = pset.fieldset.time_interval
+        if isinstance(pset, ParticleSetView) and fieldset is None:
+            raise ValueError("When writing a ParticleSetView, a fieldset must be provided to the write() method.")
+        if fieldset is None:
+            fieldset = pset.fieldset
         particle_data = pset._data
 
         if self._writer is None:
             assert not self.path.exists(), "If the file exists, the writer should already be set"
-            self._writer = pq.ParquetWriter(self.path, _get_schema(pclass, self.metadata, pset.fieldset.time_interval))
+            self._writer = pq.ParquetWriter(self.path, _get_schema(pclass, self.metadata, fieldset.time_interval))
 
         if isinstance(time, (np.timedelta64, np.datetime64)):
-            time = timedelta_to_float(time - time_interval.left)
+            time = timedelta_to_float(time - fieldset.time_interval.left)
         vars_to_write = _get_vars_to_write(pclass)
         if indices is None:
             indices_to_write = _to_write_particles(particle_data, time)