diff --git a/docs/user_guide/examples/tutorial_write_in_kernel.ipynb b/docs/user_guide/examples/tutorial_write_in_kernel.ipynb new file mode 100644 index 000000000..c1f8c7282 --- /dev/null +++ b/docs/user_guide/examples/tutorial_write_in_kernel.ipynb @@ -0,0 +1,248 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "id": "0", + "metadata": {}, + "source": [ + "# 🎓 Writing output in a Kernel" + ] + }, + { + "cell_type": "markdown", + "id": "1", + "metadata": {}, + "source": [ + "The typical way to write particle data to a file is by specifying the `output_file` argument in a `pset.execute()` call. This will write to a file on a regular interval, specified in the `outputdt` argument of the `ParticleFile`. \n", + "\n", + "However, sometimes you may want more control of _when_ to write particle data to a file. For example, you may want to write when a particle is deleted, or only when it is in a certain region or when other conditions are met. This can be especially useful if you want to do connectivity studies and don't care about having the full trajectory of each particle available. \n", + "\n", + "For these cases, you can use the `pfile.write()` method anywhere in a `Kernel`. You can write to the same file as the one specified in `output_file` or to a different file. \n", + "\n", + "This short tutorial will show you how to use `pfile.write()` in a kernel. We will use the same dataset and particle set as in the [output tutorial ](../../getting_started/tutorial_output.ipynb)." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "2", + "metadata": {}, + "outputs": [], + "source": [ + "import numpy as np\n", + "\n", + "import parcels\n", + "import parcels.tutorial" + ] + }, + { + "cell_type": "markdown", + "id": "3", + "metadata": {}, + "source": [ + "We start by defining two `ParticleFile` objects: one for writing at regular intervals and one for writing in the kernel. " + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "4", + "metadata": {}, + "outputs": [], + "source": [ + "# The file that will write at regular intervals (every 2 hours in this case)\n", + "output_file_regular = parcels.ParticleFile(\n", + " \"output_regular.parquet\",\n", + " outputdt=np.timedelta64(2, \"h\"),\n", + ")\n", + "\n", + "# The file that will write when the `write_kernel` is executed.\n", + "output_file_kernel = parcels.ParticleFile(\n", + " \"output_kernel.parquet\",\n", + " outputdt=np.timedelta64(\n", + " 2, \"h\"\n", + " ), # TODO: this argument is not relevant for this file, since we will write to it manually in the kernel, but it still needs to be specified when creating the ParticleFile object\n", + ")" + ] + }, + { + "cell_type": "markdown", + "id": "5", + "metadata": {}, + "source": [ + "Now we define a kernel that writes to the file when executed. In this example, we write all particles to the file, but you can also choose to write only a subset of the particles by using indexing (e.g., `particles[0]` for the first particle). " + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "6", + "metadata": {}, + "outputs": [], + "source": [ + "def write_kernel(particles, fieldset):\n", + " output_file_kernel.write(\n", + " particles,\n", + " particles.time, # Note that here we use the time of the particles, which may not have started yet.\n", + " fieldset=fieldset, # Note that we need to specify the fieldset here as well, since the ParticleSetView does not have a reference to the fieldset.\n", + " )" + ] + }, + { + "cell_type": "markdown", + "id": "7", + "metadata": {}, + "source": [ + "Now we run the simulation with both the advection kernel and the write kernel. Note that we specify the `output_file` argument in the `execute()` call, which will write to a file at regular intervals, but we can still use the `write_kernel` to write to a file at specific times." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "8", + "metadata": {}, + "outputs": [], + "source": [ + "# Load the CopernicusMarine data in the Agulhas region from the example_datasets\n", + "ds_fields = parcels.tutorial.open_dataset(\n", + " \"CopernicusMarine_data_for_Argo_tutorial/data\"\n", + ")\n", + "ds_fields.load() # load the dataset into memory\n", + "\n", + "# Convert to SGRID-compliant dataset and create FieldSet\n", + "fields = {\"U\": ds_fields[\"uo\"], \"V\": ds_fields[\"vo\"]}\n", + "ds_fset = parcels.convert.copernicusmarine_to_sgrid(fields=fields)\n", + "fieldset = parcels.FieldSet.from_sgrid_conventions(ds_fset)\n", + "\n", + "pset = parcels.ParticleSet(fieldset=fieldset, lon=[31, 32], lat=[-32, -31], z=[1, 1])\n", + "\n", + "pset.execute(\n", + " kernels=[\n", + " parcels.kernels.AdvectionRK2,\n", + " write_kernel, # the kernel that writes to the file when executed\n", + " ],\n", + " runtime=np.timedelta64(4, \"h\"),\n", + " dt=np.timedelta64(15, \"m\"),\n", + " output_file=output_file_regular, # the file that writes at regular intervals\n", + ")" + ] + }, + { + "cell_type": "markdown", + "id": "9", + "metadata": {}, + "source": [ + "When we inspect the output files, we can see that the file that writes at regular intervals has 6 rows (2 particles * 3 time steps) at intervals of 2 hours, while the file that writes in the kernel has 32 rows (2 particles * 16 time steps) at intervals of 15 minutes." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "10", + "metadata": {}, + "outputs": [], + "source": [ + "df_loop = parcels.read_particlefile(output_file_regular.path)\n", + "assert len(df_loop) == 6 # 2 particles * 3 time steps (0h, 2h, 4h)\n", + "df_loop" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "11", + "metadata": {}, + "outputs": [], + "source": [ + "output_file_kernel._writer.close() # close the writer to flush the data to disk\n", + "df_kernel = parcels.read_particlefile(output_file_kernel.path)\n", + "assert (\n", + " len(df_kernel) == 32\n", + ") # 2 particles * 16 time steps (every 15 minutes for 4 hours)\n", + "df_kernel" + ] + }, + { + "cell_type": "markdown", + "id": "12", + "metadata": {}, + "source": [ + "## Writing on particle deletion\n", + "\n", + "We can also write to the same file in both the `output_file` and the kernel. This could be useful if e.g. a particle is deleted in the kernel and you want to write the particle data at the time of deletion and you also want to have the full trajectory of the particle available in the output file." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "13", + "metadata": {}, + "outputs": [], + "source": [ + "output_file_both = parcels.ParticleFile(\n", + " \"output_both.parquet\",\n", + " outputdt=np.timedelta64(2, \"h\"),\n", + ")\n", + "\n", + "\n", + "def write_kernel_on_delete(particles, fieldset):\n", + " ptcls = particles[particles.time == 8100] # delete particles at 2h15m\n", + " ptcls.state = parcels.StatusCode.Delete\n", + " if len(ptcls.time) > 0:\n", + " output_file_both.write(\n", + " ptcls,\n", + " ptcls.time,\n", + " fieldset=fieldset,\n", + " )\n", + "\n", + "\n", + "pset = parcels.ParticleSet(fieldset=fieldset, lon=[31, 32], lat=[-32, -31], z=[1, 1])\n", + "\n", + "pset.execute(\n", + " kernels=[\n", + " parcels.kernels.AdvectionRK2,\n", + " write_kernel_on_delete, # the kernel that writes to the file when executed\n", + " ],\n", + " runtime=np.timedelta64(4, \"h\"),\n", + " dt=np.timedelta64(15, \"m\"),\n", + " output_file=output_file_both, # the file that writes at regular intervals\n", + ")\n", + "\n", + "df = parcels.read_particlefile(output_file_both.path)\n", + "assert len(df) == 6 # 2 particles * 3 time steps (0h, 2h, 2h15m)\n", + "df" + ] + }, + { + "cell_type": "markdown", + "id": "14", + "metadata": {}, + "source": [ + "As you can see in the output above, each particle is now written three times, at 0h, at 2h, and at 2h15m when the particle is deleted.\n", + "\n", + "Of course, if you do not add the `output_file` argument to the `execute()` call, then only the kernel will write to the file and there will be one row per particle in the output file, at the time of deletion. Note that you then need to explicitly close the writer to flush the data to disk(with `output_file_both._writer.close()`), since the `execute()` call will not do this for you if you do not specify an `output_file`." + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Parcels:docs (3.14.4)", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.14.4" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/src/parcels/_core/particlefile.py b/src/parcels/_core/particlefile.py index 0f1a2d2c5..6e11d17d5 100644 --- a/src/parcels/_core/particlefile.py +++ b/src/parcels/_core/particlefile.py @@ -15,6 +15,7 @@ import parcels from parcels._core.particle import ParticleClass +from parcels._core.particlesetview import ParticleSetView from parcels._core.utils.time import timedelta_to_float from parcels._reprs import particlefile_repr from parcels._typing import PathLike @@ -116,7 +117,7 @@ def outputdt(self): def path(self): return self._path - def write(self, pset: ParticleSet, time, indices=None): + def write(self, pset: ParticleSet | ParticleSetView, time, fieldset=None, indices=None): """Write all data from one time step to the zarr file, before the particle locations are updated. @@ -126,17 +127,22 @@ def write(self, pset: ParticleSet, time, indices=None): ParticleSet object to write time : Time at which to write ParticleSet (same time object as fieldset) + fieldset : + FieldSet object associated with the ParticleSet (optional). By default, the fieldset associated with the ParticleSet will be used, but this can be overridden by providing a fieldset here. This is used in cases when the particleset is a ParticleSetView. """ pclass = pset._ptype - time_interval = pset.fieldset.time_interval + if isinstance(pset, ParticleSetView) and fieldset is None: + raise ValueError("When writing a ParticleSetView, a fieldset must be provided to the write() method.") + if fieldset is None: + fieldset = pset.fieldset particle_data = pset._data if self._writer is None: assert not self.path.exists(), "If the file exists, the writer should already be set" - self._writer = pq.ParquetWriter(self.path, _get_schema(pclass, self.metadata, pset.fieldset.time_interval)) + self._writer = pq.ParquetWriter(self.path, _get_schema(pclass, self.metadata, fieldset.time_interval)) if isinstance(time, (np.timedelta64, np.datetime64)): - time = timedelta_to_float(time - time_interval.left) + time = timedelta_to_float(time - fieldset.time_interval.left) vars_to_write = _get_vars_to_write(pclass) if indices is None: indices_to_write = _to_write_particles(particle_data, time)