From c9812c76adb264e0d4b72699b6208b7ee40bdf1a Mon Sep 17 00:00:00 2001 From: Valeriu Predoi Date: Mon, 19 Feb 2024 13:04:18 +0000 Subject: [PATCH 01/55] add big file test --- tests/test_compression_remote_reductionist.py | 37 +++++++++++++++++++ 1 file changed, 37 insertions(+) diff --git a/tests/test_compression_remote_reductionist.py b/tests/test_compression_remote_reductionist.py index 34e7bfaa..ba42f641 100644 --- a/tests/test_compression_remote_reductionist.py +++ b/tests/test_compression_remote_reductionist.py @@ -182,3 +182,40 @@ def test_compression_and_filters_cmip6_forced_s3_using_local_Reductionist(): result = active[0:2,4:6,7:9] assert nc_min == result assert result == 239.25946044921875 + + +def test_compression_and_filters_cmip6_forced_s3_from_local_bigger_file(): + """ + Test identical to previous + test_compression_and_filters_cmip6_forced_s3_from_local_2 + but using a bigger file, for more relevant performance measures. + + This is for a special anon=True bucket connected to via valid key.secret + Variable standard_name: tendency_of_eastward_wind_due_to_orographic_gravity_wave_drag + Variable var_name: m01s06i247_4 + dims: 30 (time) 39 (plev) 325 (lat) 432 (lon) + + Entire mother file info: + [2024-01-24 10:07:02 GMT] 2.8GiB STANDARD da193a_25_day__198807-198807.nc + """ + storage_options = { + 'key': "f2d55c6dcfc7618b2c34e00b58df3cef", + 'secret': "$/'#M{0{/4rVhp%n^(XeX$q@y#&(NM3W1->~N.Q6VP.5[@bLpi='nt]AfH)>78pT", + 'client_kwargs': {'endpoint_url': "https://uor-aces-o.s3-ext.jc.rl.ac.uk"} + } + active_storage_url = "https://192.171.169.248:8080" + bigger_file = "da193a_25_day__198807-198807.nc" + + test_file_uri = os.path.join( + S3_BUCKET, + bigger_file + ) + print("S3 Test file path:", test_file_uri) + active = Active(test_file_uri, 'm01s06i247_4', storage_type="s3", + storage_options=storage_options, + active_storage_url=active_storage_url) + + active._version = 1 + active._method = "min" + + result = active[0:2, 0:3, 4:6, 7:9] From 4afb750f037e1ca0e7ecbc2bd6c7036b7faf6089 Mon Sep 17 00:00:00 2001 From: Valeriu Predoi Date: Mon, 19 Feb 2024 13:04:29 +0000 Subject: [PATCH 02/55] run big file test --- .github/workflows/test_s3_remote_reductionist.yml | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/.github/workflows/test_s3_remote_reductionist.yml b/.github/workflows/test_s3_remote_reductionist.yml index 1edc96c3..e5ab6302 100644 --- a/.github/workflows/test_s3_remote_reductionist.yml +++ b/.github/workflows/test_s3_remote_reductionist.yml @@ -7,6 +7,7 @@ on: push: branches: - main # keep this at all times + - test_s3_bigger_file pull_request: schedule: - cron: '0 0 * * *' # nightly @@ -58,9 +59,9 @@ jobs: python -V which python pip install -e . 
- - name: Run one single test + - name: Run big file run: | - pytest tests/test_compression_remote_reductionist.py + pytest tests/test_compression_remote_reductionist.py::test_compression_and_filters_cmip6_forced_s3_from_local_bigger_file - name: Upload HTML report artifact uses: actions/upload-artifact@v3 with: From 2b0c7c4270972a4c86542d7a5cbe1e9d76c58981 Mon Sep 17 00:00:00 2001 From: Valeriu Predoi Date: Mon, 19 Feb 2024 13:34:59 +0000 Subject: [PATCH 03/55] use timeit see memory --- .github/workflows/test_s3_remote_reductionist.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/test_s3_remote_reductionist.yml b/.github/workflows/test_s3_remote_reductionist.yml index e5ab6302..bf845124 100644 --- a/.github/workflows/test_s3_remote_reductionist.yml +++ b/.github/workflows/test_s3_remote_reductionist.yml @@ -61,7 +61,7 @@ jobs: pip install -e . - name: Run big file run: | - pytest tests/test_compression_remote_reductionist.py::test_compression_and_filters_cmip6_forced_s3_from_local_bigger_file + \time pytest tests/test_compression_remote_reductionist.py::test_compression_and_filters_cmip6_forced_s3_from_local_bigger_file - name: Upload HTML report artifact uses: actions/upload-artifact@v3 with: From 935bcafb46fa87fdbdd8f0d54beb7746efbd8ca9 Mon Sep 17 00:00:00 2001 From: Valeriu Predoi Date: Mon, 19 Feb 2024 13:47:14 +0000 Subject: [PATCH 04/55] one single process Vasili --- .github/workflows/test_s3_remote_reductionist.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/test_s3_remote_reductionist.yml b/.github/workflows/test_s3_remote_reductionist.yml index bf845124..a67ace43 100644 --- a/.github/workflows/test_s3_remote_reductionist.yml +++ b/.github/workflows/test_s3_remote_reductionist.yml @@ -22,7 +22,7 @@ jobs: runs-on: "ubuntu-latest" strategy: matrix: - python-version: ["3.9", "3.10", "3.11", "3.12"] + python-version: ["3.12"] # ["3.9", "3.10", "3.11", "3.12"] fail-fast: false name: Linux Python ${{ matrix.python-version }} steps: From 40f6e6e6223f3f7895119a0e014f728d9203c41e Mon Sep 17 00:00:00 2001 From: Valeriu Predoi Date: Mon, 19 Feb 2024 14:02:06 +0000 Subject: [PATCH 05/55] back to all Pythons --- .github/workflows/test_s3_remote_reductionist.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/test_s3_remote_reductionist.yml b/.github/workflows/test_s3_remote_reductionist.yml index a67ace43..bf845124 100644 --- a/.github/workflows/test_s3_remote_reductionist.yml +++ b/.github/workflows/test_s3_remote_reductionist.yml @@ -22,7 +22,7 @@ jobs: runs-on: "ubuntu-latest" strategy: matrix: - python-version: ["3.12"] # ["3.9", "3.10", "3.11", "3.12"] + python-version: ["3.9", "3.10", "3.11", "3.12"] fail-fast: false name: Linux Python ${{ matrix.python-version }} steps: From f8f930fb918e34dcaed58ea482413a5911e0de00 Mon Sep 17 00:00:00 2001 From: Valeriu Predoi Date: Tue, 20 Feb 2024 13:26:25 +0000 Subject: [PATCH 06/55] toss some timings --- activestorage/active.py | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/activestorage/active.py b/activestorage/active.py index 1cf03eef..b447310e 100644 --- a/activestorage/active.py +++ b/activestorage/active.py @@ -18,7 +18,9 @@ from activestorage.storage import reduce_chunk from activestorage import netcdf_to_zarr as nz +import time +t1 = time.time() @contextlib.contextmanager def load_from_s3(uri, storage_options=None): """ @@ -259,16 +261,20 @@ def _via_kerchunk(self, index): """ The objective is 
to use kerchunk to read the slices ourselves. """ + tx = time.time() # FIXME: Order of calls is hardcoded' if self.zds is None: print(f"Kerchunking file {self.uri} with variable " f"{self.ncvar} for storage type {self.storage_type}") + tx1 = time.time() ds, zarray, zattrs = nz.load_netcdf_zarr_generic( self.uri, self.ncvar, self.storage_type, self.storage_options, ) + ty1 = time.time() + print("Time to load to netCDF from Zarr", ty1 - tx1) # The following is a hangove from exploration # and is needed if using the original doing it ourselves # self.zds = make_an_array_instance_active(ds) @@ -284,7 +290,9 @@ def _via_kerchunk(self, index): # FIXME: We do not get the correct byte order on the Zarr # Array's dtype when using S3, so capture it here. self._dtype = np.dtype(zarray['dtype']) - + + ty = time.time() + print("Time Via Kerchunk", ty - tx) return self._get_selection(index) def _get_selection(self, *args): @@ -366,6 +374,7 @@ def _from_storage(self, stripped_indexer, drop_axes, out_shape, out_dtype, else: session = None + t3 = time.time() # Process storage chunks using a thread pool. with concurrent.futures.ThreadPoolExecutor(max_workers=self._max_threads) as executor: futures = [] @@ -394,6 +403,8 @@ def _from_storage(self, stripped_indexer, drop_axes, out_shape, out_dtype, result, selection = result out[selection] = result + t4 = time.time() + print("Storage chunks processing", t4 - t3) if method is not None: # Apply the method (again) to aggregate the result out = method(out) @@ -498,6 +509,8 @@ def _process_chunk(self, session, fsref, chunk_coords, chunk_selection, counts, bucket = os.path.dirname(parsed_url.path) # bucketX object = os.path.basename(parsed_url.path) # fileY print("S3 anon=True Bucket and File:", bucket, object) + t2 = time.time() + print("Time before Reductionist", t2 - t1) tmp, count = reductionist.reduce_chunk(session, self.active_storage_url, self._get_endpoint_url(), From b1a1aebb4c95b69e3ee86007225e5f6b476cd8a0 Mon Sep 17 00:00:00 2001 From: Valeriu Predoi Date: Tue, 20 Feb 2024 13:26:38 +0000 Subject: [PATCH 07/55] save file and toss some timings --- activestorage/netcdf_to_zarr.py | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/activestorage/netcdf_to_zarr.py b/activestorage/netcdf_to_zarr.py index 4bd0bb82..8aa2ce13 100644 --- a/activestorage/netcdf_to_zarr.py +++ b/activestorage/netcdf_to_zarr.py @@ -9,6 +9,7 @@ from activestorage.config import * from kerchunk.hdf import SingleHdf5ToZarr +import time def gen_json(file_url, varname, outf, storage_type, storage_options): """Generate a json file that contains the kerchunk-ed data for Zarr.""" @@ -35,12 +36,17 @@ def gen_json(file_url, varname, outf, storage_type, storage_options): storage_options['default_cache_type'] = "none" fs = s3fs.S3FileSystem(**storage_options) fs2 = fsspec.filesystem('') + tk1 = time.time() with fs.open(file_url, 'rb') as s3file: h5chunks = SingleHdf5ToZarr(s3file, file_url, inline_threshold=0) + tk2 = time.time() + print("Time to set up Kerchunk", tk2 - tk1) with fs2.open(outf, 'wb') as f: content = h5chunks.translate() f.write(ujson.dumps(content).encode()) + tk3 = time.time() + print("Time to Translate and Dump Kerchunks to json file", tk3 - tk2) # not S3 else: fs = fsspec.filesystem('') @@ -98,7 +104,9 @@ def load_netcdf_zarr_generic(fileloc, varname, storage_type, storage_options, bu print(f"Storage type {storage_type}") # Write the Zarr group JSON to a temporary file. 
- with tempfile.NamedTemporaryFile() as out_json: + save_json = "test_file.json" + # with tempfile.NamedTemporaryFile() as out_json: + with open(save_json, "wb") as out_json: _, zarray, zattrs = gen_json(fileloc, varname, out_json.name, From 6030040ce9936aafa42e1477b247ba5fdc74644e Mon Sep 17 00:00:00 2001 From: Valeriu Predoi Date: Tue, 20 Feb 2024 13:27:03 +0000 Subject: [PATCH 08/55] fail a test and add v0 test --- tests/test_compression_remote_reductionist.py | 40 ++++++++++++++++++- 1 file changed, 39 insertions(+), 1 deletion(-) diff --git a/tests/test_compression_remote_reductionist.py b/tests/test_compression_remote_reductionist.py index ba42f641..699ae56d 100644 --- a/tests/test_compression_remote_reductionist.py +++ b/tests/test_compression_remote_reductionist.py @@ -184,7 +184,7 @@ def test_compression_and_filters_cmip6_forced_s3_using_local_Reductionist(): assert result == 239.25946044921875 -def test_compression_and_filters_cmip6_forced_s3_from_local_bigger_file(): +def test_compression_and_filters_cmip6_forced_s3_from_local_bigger_file_v1(): """ Test identical to previous test_compression_and_filters_cmip6_forced_s3_from_local_2 @@ -219,3 +219,41 @@ def test_compression_and_filters_cmip6_forced_s3_from_local_bigger_file(): active._method = "min" result = active[0:2, 0:3, 4:6, 7:9] + print(x) + + +def test_compression_and_filters_cmip6_forced_s3_from_local_bigger_file_v0(): + """ + Test identical to previous + test_compression_and_filters_cmip6_forced_s3_from_local_2 + but using a bigger file, for more relevant performance measures. + + This is for a special anon=True bucket connected to via valid key.secret + Variable standard_name: tendency_of_eastward_wind_due_to_orographic_gravity_wave_drag + Variable var_name: m01s06i247_4 + dims: 30 (time) 39 (plev) 325 (lat) 432 (lon) + + Entire mother file info: + [2024-01-24 10:07:02 GMT] 2.8GiB STANDARD da193a_25_day__198807-198807.nc + """ + storage_options = { + 'key': "f2d55c6dcfc7618b2c34e00b58df3cef", + 'secret': "$/'#M{0{/4rVhp%n^(XeX$q@y#&(NM3W1->~N.Q6VP.5[@bLpi='nt]AfH)>78pT", + 'client_kwargs': {'endpoint_url': "https://uor-aces-o.s3-ext.jc.rl.ac.uk"} + } + active_storage_url = "https://192.171.169.248:8080" + bigger_file = "da193a_25_day__198807-198807.nc" + + test_file_uri = os.path.join( + S3_BUCKET, + bigger_file + ) + print("S3 Test file path:", test_file_uri) + active = Active(test_file_uri, 'm01s06i247_4', storage_type="s3", + storage_options=storage_options, + active_storage_url=active_storage_url) + + active._version = 0 + d = active[0:2, 0:3, 4:6, 7:9] + min_result = np.min(d) + print(min_result) From 6423a14cd284fd71534d5ec51a33cfcc247e3eae Mon Sep 17 00:00:00 2001 From: Valeriu Predoi Date: Tue, 20 Feb 2024 13:38:57 +0000 Subject: [PATCH 09/55] GA workflow new test names --- .github/workflows/test_s3_remote_reductionist.yml | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/.github/workflows/test_s3_remote_reductionist.yml b/.github/workflows/test_s3_remote_reductionist.yml index bf845124..4fa6d807 100644 --- a/.github/workflows/test_s3_remote_reductionist.yml +++ b/.github/workflows/test_s3_remote_reductionist.yml @@ -61,7 +61,8 @@ jobs: pip install -e . 
- name: Run big file run: | - \time pytest tests/test_compression_remote_reductionist.py::test_compression_and_filters_cmip6_forced_s3_from_local_bigger_file + \time pytest tests/test_compression_remote_reductionist.py::test_compression_and_filters_cmip6_forced_s3_from_local_bigger_file_v1 + \time pytest tests/test_compression_remote_reductionist.py::test_compression_and_filters_cmip6_forced_s3_from_local_bigger_file_v0 - name: Upload HTML report artifact uses: actions/upload-artifact@v3 with: From c2d426c9ddc94c3db70043b0dfa38e31ab110407 Mon Sep 17 00:00:00 2001 From: Valeriu Predoi Date: Thu, 22 Feb 2024 15:54:55 +0000 Subject: [PATCH 10/55] turn off storage caching --- activestorage/netcdf_to_zarr.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/activestorage/netcdf_to_zarr.py b/activestorage/netcdf_to_zarr.py index 8aa2ce13..e9e8364c 100644 --- a/activestorage/netcdf_to_zarr.py +++ b/activestorage/netcdf_to_zarr.py @@ -18,8 +18,8 @@ def gen_json(file_url, varname, outf, storage_type, storage_options): fs = s3fs.S3FileSystem(key=S3_ACCESS_KEY, secret=S3_SECRET_KEY, client_kwargs={'endpoint_url': S3_URL}, - default_fill_cache=False, - default_cache_type="none" + # default_fill_cache=False, + # default_cache_type="none" ) fs2 = fsspec.filesystem('') with fs.open(file_url, 'rb') as s3file: @@ -32,8 +32,8 @@ def gen_json(file_url, varname, outf, storage_type, storage_options): # S3 passed-in configuration elif storage_type == "s3" and storage_options is not None: storage_options = storage_options.copy() - storage_options['default_fill_cache'] = False - storage_options['default_cache_type'] = "none" + # storage_options['default_fill_cache'] = False + # storage_options['default_cache_type'] = "none" fs = s3fs.S3FileSystem(**storage_options) fs2 = fsspec.filesystem('') tk1 = time.time() From 67ce586461ee25a501a4819c35ba14e9cce6a08a Mon Sep 17 00:00:00 2001 From: Valeriu Predoi Date: Fri, 23 Feb 2024 12:49:32 +0000 Subject: [PATCH 11/55] kluge to get a Group --- activestorage/active.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/activestorage/active.py b/activestorage/active.py index b447310e..88945e50 100644 --- a/activestorage/active.py +++ b/activestorage/active.py @@ -19,6 +19,7 @@ from activestorage import netcdf_to_zarr as nz import time +import zarr t1 = time.time() @contextlib.contextmanager @@ -278,6 +279,8 @@ def _via_kerchunk(self, index): # The following is a hangove from exploration # and is needed if using the original doing it ourselves # self.zds = make_an_array_instance_active(ds) + if isinstance(ds, zarr.hierarchy.Group): + ds = ds[self.ncvar] self.zds = ds # Retain attributes and other information @@ -302,6 +305,7 @@ def _get_selection(self, *args): from zarr and friends and use simple dictionaries and tuples, then we can go to the storage layer with no zarr. 
""" + print("ZDS", self.zds) compressor = self.zds._compressor filters = self.zds._filters @@ -471,7 +475,7 @@ def _process_chunk(self, session, fsref, chunk_coords, chunk_selection, counts, """ coord = '.'.join([str(c) for c in chunk_coords]) key = f"{self.ncvar}/{coord}" - rfile, offset, size = tuple(fsref[key]) + rfile, offset, size = tuple(fsref[self.ncvar + " /" + key]) # S3: pass in pre-configured storage options (credentials) if self.storage_type == "s3": From 748c266763fccbd4f54410932f76c8832644fe1a Mon Sep 17 00:00:00 2001 From: Valeriu Predoi Date: Fri, 23 Feb 2024 12:49:40 +0000 Subject: [PATCH 12/55] kluge to get a Group --- activestorage/netcdf_to_zarr.py | 19 ++++++++++++++----- 1 file changed, 14 insertions(+), 5 deletions(-) diff --git a/activestorage/netcdf_to_zarr.py b/activestorage/netcdf_to_zarr.py index e9e8364c..f2eb27af 100644 --- a/activestorage/netcdf_to_zarr.py +++ b/activestorage/netcdf_to_zarr.py @@ -10,6 +10,7 @@ from kerchunk.hdf import SingleHdf5ToZarr import time +import h5py def gen_json(file_url, varname, outf, storage_type, storage_options): """Generate a json file that contains the kerchunk-ed data for Zarr.""" @@ -32,13 +33,21 @@ def gen_json(file_url, varname, outf, storage_type, storage_options): # S3 passed-in configuration elif storage_type == "s3" and storage_options is not None: storage_options = storage_options.copy() - # storage_options['default_fill_cache'] = False + storage_options['default_fill_cache'] = False # storage_options['default_cache_type'] = "none" fs = s3fs.S3FileSystem(**storage_options) fs2 = fsspec.filesystem('') tk1 = time.time() with fs.open(file_url, 'rb') as s3file: - h5chunks = SingleHdf5ToZarr(s3file, file_url, + s3file = h5py.File(s3file, mode="w") + if isinstance(s3file[varname], h5py.Dataset): + print("Looking only at a single Dataset", s3file[varname]) + s3file.create_group(varname + " ") + s3file[varname + " "][varname] = s3file[varname] + elif isinstance(s3file[varname], h5py.Group): + print("Looking only at a single Group", s3file[varname]) + s3file = s3file[varname] + h5chunks = SingleHdf5ToZarr(s3file, file_url, var=varname, inline_threshold=0) tk2 = time.time() print("Time to set up Kerchunk", tk2 - tk1) @@ -69,8 +78,8 @@ def gen_json(file_url, varname, outf, storage_type, storage_options): content = h5chunks.translate() f.write(ujson.dumps(content).encode()) - zarray = ujson.loads(content['refs'][f"{varname}/.zarray"]) - zattrs = ujson.loads(content['refs'][f"{varname}/.zattrs"]) + zarray = ujson.loads(content['refs'][f"{varname} /{varname}/.zarray"]) + zattrs = ujson.loads(content['refs'][f"{varname} /{varname}/.zattrs"]) return outf, zarray, zattrs @@ -115,7 +124,7 @@ def load_netcdf_zarr_generic(fileloc, varname, storage_type, storage_options, bu # open this monster print(f"Attempting to open and convert {fileloc}.") - ref_ds = open_zarr_group(out_json.name, varname) + ref_ds = open_zarr_group(out_json.name, varname + " ") return ref_ds, zarray, zattrs From 16e565678a3097e1718299f8a166d4a9158aa6ee Mon Sep 17 00:00:00 2001 From: Valeriu Predoi Date: Mon, 26 Feb 2024 14:40:57 +0000 Subject: [PATCH 13/55] fix module --- activestorage/netcdf_to_zarr.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/activestorage/netcdf_to_zarr.py b/activestorage/netcdf_to_zarr.py index f2eb27af..802f026d 100644 --- a/activestorage/netcdf_to_zarr.py +++ b/activestorage/netcdf_to_zarr.py @@ -19,8 +19,8 @@ def gen_json(file_url, varname, outf, storage_type, storage_options): fs = 
s3fs.S3FileSystem(key=S3_ACCESS_KEY, secret=S3_SECRET_KEY, client_kwargs={'endpoint_url': S3_URL}, - # default_fill_cache=False, - # default_cache_type="none" + default_fill_cache=False, + default_cache_type="first" ) fs2 = fsspec.filesystem('') with fs.open(file_url, 'rb') as s3file: @@ -34,7 +34,7 @@ def gen_json(file_url, varname, outf, storage_type, storage_options): elif storage_type == "s3" and storage_options is not None: storage_options = storage_options.copy() storage_options['default_fill_cache'] = False - # storage_options['default_cache_type'] = "none" + storage_options['default_cache_type'] = "first" fs = s3fs.S3FileSystem(**storage_options) fs2 = fsspec.filesystem('') tk1 = time.time() @@ -44,10 +44,11 @@ def gen_json(file_url, varname, outf, storage_type, storage_options): print("Looking only at a single Dataset", s3file[varname]) s3file.create_group(varname + " ") s3file[varname + " "][varname] = s3file[varname] + s3file = s3file[varname + " "] elif isinstance(s3file[varname], h5py.Group): print("Looking only at a single Group", s3file[varname]) s3file = s3file[varname] - h5chunks = SingleHdf5ToZarr(s3file, file_url, var=varname, + h5chunks = SingleHdf5ToZarr(s3file, file_url, inline_threshold=0) tk2 = time.time() print("Time to set up Kerchunk", tk2 - tk1) From fb5c5d28851c3f02b04c847a1eef67ab93191b1a Mon Sep 17 00:00:00 2001 From: Valeriu Predoi Date: Mon, 26 Feb 2024 14:45:23 +0000 Subject: [PATCH 14/55] add code change suggested for Kerchunk --- activestorage/netcdf_to_zarr.py | 40 +++++++++++++++++++++++++++++++++ 1 file changed, 40 insertions(+) diff --git a/activestorage/netcdf_to_zarr.py b/activestorage/netcdf_to_zarr.py index 802f026d..621a504d 100644 --- a/activestorage/netcdf_to_zarr.py +++ b/activestorage/netcdf_to_zarr.py @@ -31,6 +31,46 @@ def gen_json(file_url, varname, outf, storage_type, storage_options): f.write(ujson.dumps(content).encode()) # S3 passed-in configuration +#### this implementation works with a minimally changed kerchunk/hdf.py ##### +############################################################################# +### def __init__( +### self, +### h5f: "BinaryIO | str", +### url: str = None, +### spec=1, +### inline_threshold=500, +### storage_options=None, +### error="warn", +### vlen_encode="embed", +### ): +### +### # Open HDF5 file in read mode... 
+### lggr.debug(f"HDF5 file: {h5f}") +### +### if isinstance(h5f, str): +### fs, path = fsspec.core.url_to_fs(h5f, **(storage_options or {})) +### self.input_file = fs.open(path, "rb") +### url = h5f +### self._h5f = h5py.File(self.input_file, mode="r") +### elif isinstance(h5f, io.IOBase): +### self.input_file = h5f +### self._h5f = h5py.File(self.input_file, mode="r") +### elif isinstance(h5f, (h5py.File, h5py.Group)): +### self._h5f = h5f +### +### self.spec = spec +### self.inline = inline_threshold +### if vlen_encode not in ["embed", "null", "leave", "encode"]: +### raise NotImplementedError +### self.vlen = vlen_encode +### +### self.store = {} +### self._zroot = zarr.group(store=self.store, overwrite=True) +### +### self._uri = url +### self.error = error +### lggr.debug(f"HDF5 file URI: {self._uri}") +############################################################################### elif storage_type == "s3" and storage_options is not None: storage_options = storage_options.copy() storage_options['default_fill_cache'] = False From 256cb6535d67203d2ca90c98e1bb3215ed43484c Mon Sep 17 00:00:00 2001 From: Valeriu Predoi Date: Mon, 26 Feb 2024 15:05:48 +0000 Subject: [PATCH 15/55] allow for cases --- activestorage/active.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/activestorage/active.py b/activestorage/active.py index 88945e50..36e54df0 100644 --- a/activestorage/active.py +++ b/activestorage/active.py @@ -475,7 +475,10 @@ def _process_chunk(self, session, fsref, chunk_coords, chunk_selection, counts, """ coord = '.'.join([str(c) for c in chunk_coords]) key = f"{self.ncvar}/{coord}" - rfile, offset, size = tuple(fsref[self.ncvar + " /" + key]) + try: + rfile, offset, size = tuple(fsref[self.ncvar + " /" + key]) + except KeyError: + rfile, offset, size = tuple(fsref[key]) # S3: pass in pre-configured storage options (credentials) if self.storage_type == "s3": From c3775cb265b2e6ed09804898f8decb64d2e5e243 Mon Sep 17 00:00:00 2001 From: Valeriu Predoi Date: Mon, 26 Feb 2024 15:05:59 +0000 Subject: [PATCH 16/55] allow for cases --- activestorage/netcdf_to_zarr.py | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/activestorage/netcdf_to_zarr.py b/activestorage/netcdf_to_zarr.py index 621a504d..7b313a52 100644 --- a/activestorage/netcdf_to_zarr.py +++ b/activestorage/netcdf_to_zarr.py @@ -119,8 +119,13 @@ def gen_json(file_url, varname, outf, storage_type, storage_options): content = h5chunks.translate() f.write(ujson.dumps(content).encode()) - zarray = ujson.loads(content['refs'][f"{varname} /{varname}/.zarray"]) - zattrs = ujson.loads(content['refs'][f"{varname} /{varname}/.zattrs"]) + # account for both Group and Dataset + try: + zarray = ujson.loads(content['refs'][f"{varname}/.zarray"]) + zattrs = ujson.loads(content['refs'][f"{varname}/.zattrs"]) + except KeyError: + zarray = ujson.loads(content['refs'][f"{varname} /{varname}/.zarray"]) + zattrs = ujson.loads(content['refs'][f"{varname} /{varname}/.zattrs"]) return outf, zarray, zattrs @@ -165,7 +170,10 @@ def load_netcdf_zarr_generic(fileloc, varname, storage_type, storage_options, bu # open this monster print(f"Attempting to open and convert {fileloc}.") - ref_ds = open_zarr_group(out_json.name, varname + " ") + try: + ref_ds = open_zarr_group(out_json.name, varname) + except AttributeError: + ref_ds = open_zarr_group(out_json.name, varname + " ") return ref_ds, zarray, zattrs From e3184a723ea6318b9bf897dd8cfa6ed652a15ef8 Mon Sep 17 00:00:00 2001 From: Valeriu Predoi Date: 
Tue, 27 Feb 2024 17:11:44 +0000 Subject: [PATCH 17/55] correct file url for kerchunk and pass storage options --- activestorage/netcdf_to_zarr.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/activestorage/netcdf_to_zarr.py b/activestorage/netcdf_to_zarr.py index 7b313a52..0f810433 100644 --- a/activestorage/netcdf_to_zarr.py +++ b/activestorage/netcdf_to_zarr.py @@ -78,6 +78,8 @@ def gen_json(file_url, varname, outf, storage_type, storage_options): fs = s3fs.S3FileSystem(**storage_options) fs2 = fsspec.filesystem('') tk1 = time.time() + print("Storage options dict", storage_options) + print("File url", file_url) with fs.open(file_url, 'rb') as s3file: s3file = h5py.File(s3file, mode="w") if isinstance(s3file[varname], h5py.Dataset): @@ -88,8 +90,12 @@ def gen_json(file_url, varname, outf, storage_type, storage_options): elif isinstance(s3file[varname], h5py.Group): print("Looking only at a single Group", s3file[varname]) s3file = s3file[varname] + if not file_url.startswith("s3://"): + file_url = "s3://" + file_url + print("File_url to Kerchunk", file_url) h5chunks = SingleHdf5ToZarr(s3file, file_url, - inline_threshold=0) + inline_threshold=0, + storage_options=storage_options) tk2 = time.time() print("Time to set up Kerchunk", tk2 - tk1) with fs2.open(outf, 'wb') as f: From 6dd3a32c3719ac605697503261c4b8577fe697ea Mon Sep 17 00:00:00 2001 From: Valeriu Predoi Date: Wed, 28 Feb 2024 17:30:12 +0000 Subject: [PATCH 18/55] add function that readds the correct compressor to the reductionist --- activestorage/netcdf_to_zarr.py | 29 ++++++++++++++++++++++++++++- 1 file changed, 28 insertions(+), 1 deletion(-) diff --git a/activestorage/netcdf_to_zarr.py b/activestorage/netcdf_to_zarr.py index 0f810433..4ea71b33 100644 --- a/activestorage/netcdf_to_zarr.py +++ b/activestorage/netcdf_to_zarr.py @@ -12,6 +12,32 @@ import time import h5py + +def _correct_compressor(content, varname): + """Correct the compressor type as it comes out of Kerchunk.""" + new_content = content.copy() + try: + new_zarray = ujson.loads(new_content['refs'][f"{varname}/.zarray"]) + group = False + except KeyError: + new_zarray = ujson.loads(new_content['refs'][f"{varname} /{varname}/.zarray"]) + group = True + + # re-add the correct compressor if it's in the "filters" list + if new_zarray["compressor"] is None: + for zfilter in new_zarray["filters"]: + if zfilter["id"] == "zlib": + new_zarray["compressor"] = zfilter + new_zarray["filters"].remove(zfilter) + + if not group: + new_content['refs'][f"{varname}/.zarray"] = ujson.dumps(new_zarray) + else: + new_content['refs'][f"{varname} /{varname}/.zarray"] = ujson.dumps(new_zarray) + + return new_content + + def gen_json(file_url, varname, outf, storage_type, storage_options): """Generate a json file that contains the kerchunk-ed data for Zarr.""" # S3 configuration presets @@ -100,6 +126,7 @@ def gen_json(file_url, varname, outf, storage_type, storage_options): print("Time to set up Kerchunk", tk2 - tk1) with fs2.open(outf, 'wb') as f: content = h5chunks.translate() + content = _correct_compressor(content, varname) f.write(ujson.dumps(content).encode()) tk3 = time.time() print("Time to Translate and Dump Kerchunks to json file", tk3 - tk2) @@ -132,7 +159,7 @@ def gen_json(file_url, varname, outf, storage_type, storage_options): except KeyError: zarray = ujson.loads(content['refs'][f"{varname} /{varname}/.zarray"]) zattrs = ujson.loads(content['refs'][f"{varname} /{varname}/.zattrs"]) - + return outf, zarray, zattrs From 
0677ca40e3df1e56bc513805bf8725e429a7a9c5 Mon Sep 17 00:00:00 2001 From: Valeriu Predoi Date: Thu, 29 Feb 2024 13:57:29 +0000 Subject: [PATCH 19/55] just print chunk selections to look at them --- activestorage/active.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/activestorage/active.py b/activestorage/active.py index 36e54df0..d54339f3 100644 --- a/activestorage/active.py +++ b/activestorage/active.py @@ -383,6 +383,11 @@ def _from_storage(self, stripped_indexer, drop_axes, out_shape, out_dtype, with concurrent.futures.ThreadPoolExecutor(max_workers=self._max_threads) as executor: futures = [] # Submit chunks for processing. + print("Stripped indexer") + for chunk_coords, chunk_selection, out_selection in stripped_indexer: + print("Chunk coords", chunk_coords) + print("Chunk selection", chunk_selection) + print("Out selection", out_selection) for chunk_coords, chunk_selection, out_selection in stripped_indexer: future = executor.submit( self._process_chunk, From f3dc818fe75bc633d53f722241f38167a3a089a6 Mon Sep 17 00:00:00 2001 From: Valeriu Predoi Date: Thu, 29 Feb 2024 15:38:08 +0000 Subject: [PATCH 20/55] finally made it work all through --- activestorage/netcdf_to_zarr.py | 82 ++++++++------------------------- 1 file changed, 19 insertions(+), 63 deletions(-) diff --git a/activestorage/netcdf_to_zarr.py b/activestorage/netcdf_to_zarr.py index 4ea71b33..ff32b7c8 100644 --- a/activestorage/netcdf_to_zarr.py +++ b/activestorage/netcdf_to_zarr.py @@ -13,8 +13,14 @@ import h5py -def _correct_compressor(content, varname): - """Correct the compressor type as it comes out of Kerchunk.""" +def _correct_compressor_and_filename(content, varname): + """ + Correct the compressor type as it comes out of Kerchunk. + Also correct file name as Kerchnk now prefixes it with "s3://" + and for special buckets like Bryan's bnl the correct file is bnl/file.nc + not s3://bnl/file.nc + """ + tc1 = time.time() new_content = content.copy() try: new_zarray = ujson.loads(new_content['refs'][f"{varname}/.zarray"]) @@ -35,6 +41,14 @@ def _correct_compressor(content, varname): else: new_content['refs'][f"{varname} /{varname}/.zarray"] = ujson.dumps(new_zarray) + # FIXME TODO this is an absolute nightmate: the type of bucket on UOR ACES + # this is a HACK and it works only with the crazy Bryan S3 bucket "bnl/file.nc" + for key in new_content['refs'].keys(): + if varname in key and isinstance(new_content['refs'][key], list) and "s3://" in new_content['refs'][key][0]: + new_content['refs'][key][0] = new_content['refs'][key][0].replace("s3://", "") + + tc2 = time.time() + print("Time to manipulate Kerchunk Zarr output", tc2 - tc1) return new_content @@ -57,46 +71,6 @@ def gen_json(file_url, varname, outf, storage_type, storage_options): f.write(ujson.dumps(content).encode()) # S3 passed-in configuration -#### this implementation works with a minimally changed kerchunk/hdf.py ##### -############################################################################# -### def __init__( -### self, -### h5f: "BinaryIO | str", -### url: str = None, -### spec=1, -### inline_threshold=500, -### storage_options=None, -### error="warn", -### vlen_encode="embed", -### ): -### -### # Open HDF5 file in read mode... 
-### lggr.debug(f"HDF5 file: {h5f}") -### -### if isinstance(h5f, str): -### fs, path = fsspec.core.url_to_fs(h5f, **(storage_options or {})) -### self.input_file = fs.open(path, "rb") -### url = h5f -### self._h5f = h5py.File(self.input_file, mode="r") -### elif isinstance(h5f, io.IOBase): -### self.input_file = h5f -### self._h5f = h5py.File(self.input_file, mode="r") -### elif isinstance(h5f, (h5py.File, h5py.Group)): -### self._h5f = h5f -### -### self.spec = spec -### self.inline = inline_threshold -### if vlen_encode not in ["embed", "null", "leave", "encode"]: -### raise NotImplementedError -### self.vlen = vlen_encode -### -### self.store = {} -### self._zroot = zarr.group(store=self.store, overwrite=True) -### -### self._uri = url -### self.error = error -### lggr.debug(f"HDF5 file URI: {self._uri}") -############################################################################### elif storage_type == "s3" and storage_options is not None: storage_options = storage_options.copy() storage_options['default_fill_cache'] = False @@ -104,8 +78,6 @@ def gen_json(file_url, varname, outf, storage_type, storage_options): fs = s3fs.S3FileSystem(**storage_options) fs2 = fsspec.filesystem('') tk1 = time.time() - print("Storage options dict", storage_options) - print("File url", file_url) with fs.open(file_url, 'rb') as s3file: s3file = h5py.File(s3file, mode="w") if isinstance(s3file[varname], h5py.Dataset): @@ -116,20 +88,19 @@ def gen_json(file_url, varname, outf, storage_type, storage_options): elif isinstance(s3file[varname], h5py.Group): print("Looking only at a single Group", s3file[varname]) s3file = s3file[varname] + # Kerchunk wants the correct file name in S3 format if not file_url.startswith("s3://"): file_url = "s3://" + file_url - print("File_url to Kerchunk", file_url) h5chunks = SingleHdf5ToZarr(s3file, file_url, inline_threshold=0, storage_options=storage_options) tk2 = time.time() - print("Time to set up Kerchunk", tk2 - tk1) with fs2.open(outf, 'wb') as f: content = h5chunks.translate() - content = _correct_compressor(content, varname) + content = _correct_compressor_and_filename(content, varname) f.write(ujson.dumps(content).encode()) tk3 = time.time() - print("Time to Translate and Dump Kerchunks to json file", tk3 - tk2) + print("Time to Kerchunk and write JSON file", tk3 - tk2) # not S3 else: fs = fsspec.filesystem('') @@ -182,7 +153,6 @@ def open_zarr_group(out_json, varname): print(f"Zarr Group does not contain variable {varname}. " f"Zarr Group info: {zarr_group.info}") raise attrerr - #print("Zarr array info:", zarr_array.info) return zarr_array @@ -209,17 +179,3 @@ def load_netcdf_zarr_generic(fileloc, varname, storage_type, storage_options, bu ref_ds = open_zarr_group(out_json.name, varname + " ") return ref_ds, zarray, zattrs - - -#d = {'version': 1, -# 'refs': { -# '.zgroup': '{"zarr_format":2}', -# '.zattrs': '{"Conventions":"CF-1.6","access-list":"grenvillelister simonwilson jeffcole","awarning":"**** THIS SUITE WILL ARCHIVE NON-DUPLEXED DATA TO MOOSE. FOR CRITICAL MODEL RUNS SWITCH TO DUPLEXED IN: postproc --> Post Processing - common settings --> Moose Archiving --> non_duplexed_set. 
Follow guidance in http:\\/\\/www-twiki\\/Main\\/MassNonDuplexPolicy","branch-date":"1950-01-01","calendar":"360_day","code-version":"UM 11.6, NEMO vn3.6","creation_time":"2022-10-28 12:28","decription":"Initialised from EN4 climatology","description":"Copy of u-ar696\\/trunk@77470","email":"r.k.schieman@reading.ac.uk","end-date":"2015-01-01","experiment-id":"historical","forcing":"AA,BC,CO2","forcing-info":"blah, blah, blah","institution":"NCAS","macro-parent-experiment-id":"historical","macro-parent-experiment-mip":"CMIP","macro-parent-variant-id":"r1i1p1f3","model-id":"HadGEM3-CG31-MM","name":"\\/work\\/n02\\/n02\\/grenvill\\/cylc-run\\/u-cn134\\/share\\/cycle\\/19500101T0000Z\\/3h_","owner":"rosalynhatcher","project":"Coupled Climate","timeStamp":"2022-Oct-28 12:20:33 GMT","title":"[CANARI] GC3.1 N216 ORCA025 UM11.6","uuid":"51e5ef20-d376-4aa6-938e-4c242886b7b1"}', -# 'lat/.zarray': '{"chunks":[324],"compressor":{"id":"zlib","level":1},"dtype":" Date: Thu, 29 Feb 2024 15:52:25 +0000 Subject: [PATCH 21/55] streamline --- activestorage/netcdf_to_zarr.py | 39 +++++++++++++++++++++------------ 1 file changed, 25 insertions(+), 14 deletions(-) diff --git a/activestorage/netcdf_to_zarr.py b/activestorage/netcdf_to_zarr.py index ff32b7c8..a2674642 100644 --- a/activestorage/netcdf_to_zarr.py +++ b/activestorage/netcdf_to_zarr.py @@ -13,7 +13,7 @@ import h5py -def _correct_compressor_and_filename(content, varname): +def _correct_compressor_and_filename(content, varname, bryan_bucket=False): """ Correct the compressor type as it comes out of Kerchunk. Also correct file name as Kerchnk now prefixes it with "s3://" @@ -43,9 +43,13 @@ def _correct_compressor_and_filename(content, varname): # FIXME TODO this is an absolute nightmate: the type of bucket on UOR ACES # this is a HACK and it works only with the crazy Bryan S3 bucket "bnl/file.nc" - for key in new_content['refs'].keys(): - if varname in key and isinstance(new_content['refs'][key], list) and "s3://" in new_content['refs'][key][0]: - new_content['refs'][key][0] = new_content['refs'][key][0].replace("s3://", "") + # the problem: filename gets written to JSON as "s3://bnl/file.nc" but Reductionist doesn't + # find it since it needs url=bnl/file.nc, with endpoint URL being extracted from the + # endpoint_url of storage_options. BAH! 
+ if bryan_bucket: + for key in new_content['refs'].keys(): + if varname in key and isinstance(new_content['refs'][key], list) and "s3://" in new_content['refs'][key][0]: + new_content['refs'][key][0] = new_content['refs'][key][0].replace("s3://", "") tc2 = time.time() print("Time to manipulate Kerchunk Zarr output", tc2 - tc1) @@ -91,13 +95,18 @@ def gen_json(file_url, varname, outf, storage_type, storage_options): # Kerchunk wants the correct file name in S3 format if not file_url.startswith("s3://"): file_url = "s3://" + file_url + bryan_bucket = False + if "bnl" in file_url: + bryan_bucket = True h5chunks = SingleHdf5ToZarr(s3file, file_url, inline_threshold=0, storage_options=storage_options) tk2 = time.time() with fs2.open(outf, 'wb') as f: content = h5chunks.translate() - content = _correct_compressor_and_filename(content, varname) + content = _correct_compressor_and_filename(content, + varname, + bryan_bucket=bryan_bucket) f.write(ujson.dumps(content).encode()) tk3 = time.time() print("Time to Kerchunk and write JSON file", tk3 - tk2) @@ -148,11 +157,16 @@ def open_zarr_group(out_json, varname): zarr_group = zarr.open_group(mapper) try: - zarr_array = getattr(zarr_group, varname) - except AttributeError as attrerr: - print(f"Zarr Group does not contain variable {varname}. " - f"Zarr Group info: {zarr_group.info}") - raise attrerr + print("Trying opening Zarr object as Group") + zarr_array = getattr(zarr_group, varname + " ") + except AttributeError as attrerr_group: + print("Trying opening Zarr object as Dataset") + try: + zarr_array = getattr(zarr_group, varname) + except AttributeError as attrerr: + print(f"Zarr Group does not contain variable {varname}. " + f"Zarr Group info: {zarr_group.info}") + raise attrerr return zarr_array @@ -173,9 +187,6 @@ def load_netcdf_zarr_generic(fileloc, varname, storage_type, storage_options, bu # open this monster print(f"Attempting to open and convert {fileloc}.") - try: - ref_ds = open_zarr_group(out_json.name, varname) - except AttributeError: - ref_ds = open_zarr_group(out_json.name, varname + " ") + ref_ds = open_zarr_group(out_json.name, varname) return ref_ds, zarray, zattrs From 0425e3caa3bf17859d8d80b3b622542f0ab28b14 Mon Sep 17 00:00:00 2001 From: Valeriu Predoi Date: Thu, 29 Feb 2024 15:57:12 +0000 Subject: [PATCH 22/55] cleanup prints --- activestorage/active.py | 15 +++------------ 1 file changed, 3 insertions(+), 12 deletions(-) diff --git a/activestorage/active.py b/activestorage/active.py index d54339f3..973d2255 100644 --- a/activestorage/active.py +++ b/activestorage/active.py @@ -295,7 +295,7 @@ def _via_kerchunk(self, index): self._dtype = np.dtype(zarray['dtype']) ty = time.time() - print("Time Via Kerchunk", ty - tx) + print("Time of _via_kerchunk()", ty - tx) return self._get_selection(index) def _get_selection(self, *args): @@ -305,7 +305,6 @@ def _get_selection(self, *args): from zarr and friends and use simple dictionaries and tuples, then we can go to the storage layer with no zarr. """ - print("ZDS", self.zds) compressor = self.zds._compressor filters = self.zds._filters @@ -378,16 +377,10 @@ def _from_storage(self, stripped_indexer, drop_axes, out_shape, out_dtype, else: session = None - t3 = time.time() # Process storage chunks using a thread pool. with concurrent.futures.ThreadPoolExecutor(max_workers=self._max_threads) as executor: futures = [] # Submit chunks for processing. 
- print("Stripped indexer") - for chunk_coords, chunk_selection, out_selection in stripped_indexer: - print("Chunk coords", chunk_coords) - print("Chunk selection", chunk_selection) - print("Out selection", out_selection) for chunk_coords, chunk_selection, out_selection in stripped_indexer: future = executor.submit( self._process_chunk, @@ -412,8 +405,6 @@ def _from_storage(self, stripped_indexer, drop_axes, out_shape, out_dtype, result, selection = result out[selection] = result - t4 = time.time() - print("Storage chunks processing", t4 - t3) if method is not None: # Apply the method (again) to aggregate the result out = method(out) @@ -516,13 +507,13 @@ def _process_chunk(self, session, fsref, chunk_coords, chunk_selection, counts, # special case for "anon=True" buckets that work only with e.g. # fs = s3fs.S3FileSystem(anon=True, client_kwargs={'endpoint_url': S3_URL}) # where file uri = bucketX/fileY.mc - print("S3 Storage options to Reductionist:", self.storage_options) + print("S3 Storage options:", self.storage_options) if self.storage_options.get("anon", None) == True: bucket = os.path.dirname(parsed_url.path) # bucketX object = os.path.basename(parsed_url.path) # fileY print("S3 anon=True Bucket and File:", bucket, object) t2 = time.time() - print("Time before Reductionist", t2 - t1) + print("Total time before going in Reductionist", t2 - t1) tmp, count = reductionist.reduce_chunk(session, self.active_storage_url, self._get_endpoint_url(), From f52693c292976c9044627c654cd6ff714c21aa30 Mon Sep 17 00:00:00 2001 From: Valeriu Predoi Date: Thu, 29 Feb 2024 17:34:37 +0000 Subject: [PATCH 23/55] handle exception --- activestorage/netcdf_to_zarr.py | 20 ++++++++++++++------ 1 file changed, 14 insertions(+), 6 deletions(-) diff --git a/activestorage/netcdf_to_zarr.py b/activestorage/netcdf_to_zarr.py index a2674642..b1fe31a4 100644 --- a/activestorage/netcdf_to_zarr.py +++ b/activestorage/netcdf_to_zarr.py @@ -70,8 +70,14 @@ def gen_json(file_url, varname, outf, storage_type, storage_options): with fs.open(file_url, 'rb') as s3file: h5chunks = SingleHdf5ToZarr(s3file, file_url, inline_threshold=0) + bryan_bucket = False + if "bnl" in file_url: + bryan_bucket = True with fs2.open(outf, 'wb') as f: content = h5chunks.translate() + content = _correct_compressor_and_filename(content, + varname, + bryan_bucket=bryan_bucket) f.write(ujson.dumps(content).encode()) # S3 passed-in configuration @@ -155,18 +161,20 @@ def open_zarr_group(out_json, varname): mapper = fs.get_mapper("") # local FS mapper #mapper.fs.reference has the kerchunk mapping, how does this propagate into the Zarr array? zarr_group = zarr.open_group(mapper) - + + not_group = False try: - print("Trying opening Zarr object as Group") zarr_array = getattr(zarr_group, varname + " ") - except AttributeError as attrerr_group: - print("Trying opening Zarr object as Dataset") + except AttributeError: + not_group = True + pass + if not_group: try: zarr_array = getattr(zarr_group, varname) - except AttributeError as attrerr: + except AttributeError: print(f"Zarr Group does not contain variable {varname}. 
" f"Zarr Group info: {zarr_group.info}") - raise attrerr + raise return zarr_array From e9a8409f845a6cc8644fc497e3c815e6d051c836 Mon Sep 17 00:00:00 2001 From: Valeriu Predoi Date: Thu, 29 Feb 2024 17:53:41 +0000 Subject: [PATCH 24/55] revert to JSON tempfile before I die of not realizing WTF was going one hahah --- activestorage/netcdf_to_zarr.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/activestorage/netcdf_to_zarr.py b/activestorage/netcdf_to_zarr.py index b1fe31a4..24269562 100644 --- a/activestorage/netcdf_to_zarr.py +++ b/activestorage/netcdf_to_zarr.py @@ -184,9 +184,7 @@ def load_netcdf_zarr_generic(fileloc, varname, storage_type, storage_options, bu print(f"Storage type {storage_type}") # Write the Zarr group JSON to a temporary file. - save_json = "test_file.json" - # with tempfile.NamedTemporaryFile() as out_json: - with open(save_json, "wb") as out_json: + with tempfile.NamedTemporaryFile() as out_json: _, zarray, zattrs = gen_json(fileloc, varname, out_json.name, From ecc203d7d632e77a41a237d5883b3964be23884c Mon Sep 17 00:00:00 2001 From: Valeriu Predoi Date: Fri, 1 Mar 2024 13:06:05 +0000 Subject: [PATCH 25/55] added comments --- activestorage/netcdf_to_zarr.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/activestorage/netcdf_to_zarr.py b/activestorage/netcdf_to_zarr.py index 24269562..9499b5f0 100644 --- a/activestorage/netcdf_to_zarr.py +++ b/activestorage/netcdf_to_zarr.py @@ -89,6 +89,9 @@ def gen_json(file_url, varname, outf, storage_type, storage_options): fs2 = fsspec.filesystem('') tk1 = time.time() with fs.open(file_url, 'rb') as s3file: + # restrict only to the Group/Dataset that the varname belongs to + # this saves 2-3x time in Kerchunk + # TODO make this available to the other routines that use Kerchunk s3file = h5py.File(s3file, mode="w") if isinstance(s3file[varname], h5py.Dataset): print("Looking only at a single Dataset", s3file[varname]) From 8f92ae778e854ec038dc27bb1dc7fe28b6222339 Mon Sep 17 00:00:00 2001 From: Valeriu Predoi Date: Fri, 1 Mar 2024 13:06:16 +0000 Subject: [PATCH 26/55] unfail test --- tests/test_compression_remote_reductionist.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/tests/test_compression_remote_reductionist.py b/tests/test_compression_remote_reductionist.py index 699ae56d..e4ec7d11 100644 --- a/tests/test_compression_remote_reductionist.py +++ b/tests/test_compression_remote_reductionist.py @@ -2,6 +2,8 @@ import numpy as np import pytest +import utils + from netCDF4 import Dataset from pathlib import Path @@ -10,8 +12,6 @@ from activestorage.dummy_data import make_compressed_ncdata from activestorage.reductionist import ReductionistError as RedErr -import utils - # Bryan's S3 machine + Bryan's reductionist STORAGE_OPTIONS_Bryan = { @@ -197,6 +197,8 @@ def test_compression_and_filters_cmip6_forced_s3_from_local_bigger_file_v1(): Entire mother file info: [2024-01-24 10:07:02 GMT] 2.8GiB STANDARD da193a_25_day__198807-198807.nc + + NOTE: we used this test as timing reference for performance testing. 
""" storage_options = { 'key': "f2d55c6dcfc7618b2c34e00b58df3cef", @@ -219,7 +221,6 @@ def test_compression_and_filters_cmip6_forced_s3_from_local_bigger_file_v1(): active._method = "min" result = active[0:2, 0:3, 4:6, 7:9] - print(x) def test_compression_and_filters_cmip6_forced_s3_from_local_bigger_file_v0(): From dbae5641daccd7ffaabfe828a5a0aff16b95171b Mon Sep 17 00:00:00 2001 From: Valeriu Predoi Date: Fri, 1 Mar 2024 13:26:30 +0000 Subject: [PATCH 27/55] add info about PytestUnraisableError --- tests/test_compression_remote_reductionist.py | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/tests/test_compression_remote_reductionist.py b/tests/test_compression_remote_reductionist.py index e4ec7d11..02499137 100644 --- a/tests/test_compression_remote_reductionist.py +++ b/tests/test_compression_remote_reductionist.py @@ -184,6 +184,18 @@ def test_compression_and_filters_cmip6_forced_s3_using_local_Reductionist(): assert result == 239.25946044921875 +# if pytest is run on a single thread, this test throws a PytestUnraisableException +# followed at the end by a SegFault (test passes fine, and writes report etc); when +# pytest runs in n>=2 threads all is fine. This is defo due to something in Kerchunk +# tests/test_compression_remote_reductionist.py::test_compression_and_filters_cmip6_forced_s3_from_local_bigger_file_v1 +# /home/valeriu/miniconda3/envs/pyactive/lib/python3.12/site-packages/_pytest/unraisableexception.py:80: PytestUnraisableExceptionWarning: Exception ignored in: 'h5py._objects.ObjectID.__dealloc__' +# +# Traceback (most recent call last): +# File "h5py/_objects.pyx", line 201, in h5py._objects.ObjectID.__dealloc__ +# File "h5py/h5fd.pyx", line 180, in h5py.h5fd.H5FD_fileobj_truncate +# AttributeError: 'S3File' object has no attribute 'truncate' +# For no I am just shutting this up, later we may have to take it up with Kerchunk +@pytest.mark.filterwarnings("ignore::pytest.PytestUnraisableExceptionWarning") def test_compression_and_filters_cmip6_forced_s3_from_local_bigger_file_v1(): """ Test identical to previous From 40f110b1a399979e93966976b97689bb9acde385 Mon Sep 17 00:00:00 2001 From: Valeriu Predoi Date: Fri, 1 Mar 2024 13:34:20 +0000 Subject: [PATCH 28/55] cleanup GA flow --- .github/workflows/test_s3_remote_reductionist.yml | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/.github/workflows/test_s3_remote_reductionist.yml b/.github/workflows/test_s3_remote_reductionist.yml index 4fa6d807..c1e71b36 100644 --- a/.github/workflows/test_s3_remote_reductionist.yml +++ b/.github/workflows/test_s3_remote_reductionist.yml @@ -7,7 +7,6 @@ on: push: branches: - main # keep this at all times - - test_s3_bigger_file pull_request: schedule: - cron: '0 0 * * *' # nightly @@ -59,10 +58,12 @@ jobs: python -V which python pip install -e . 
- - name: Run big file + - name: Run gold test run: | \time pytest tests/test_compression_remote_reductionist.py::test_compression_and_filters_cmip6_forced_s3_from_local_bigger_file_v1 - \time pytest tests/test_compression_remote_reductionist.py::test_compression_and_filters_cmip6_forced_s3_from_local_bigger_file_v0 + - name: Run remote reductionist tests + run: | + pytest tests/test_compression_remote_reductionist.py - name: Upload HTML report artifact uses: actions/upload-artifact@v3 with: From 91d3f5709502c7065dfe17b80bdd5d6d9087cbb6 Mon Sep 17 00:00:00 2001 From: Valeriu Predoi Date: Wed, 6 Mar 2024 16:42:46 +0000 Subject: [PATCH 29/55] pin kerchunk --- environment.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/environment.yml b/environment.yml index 8aa5f34f..b850ba33 100644 --- a/environment.yml +++ b/environment.yml @@ -10,7 +10,7 @@ dependencies: - fsspec - h5netcdf - h5py # needed by Kerchunk - - kerchunk + - kerchunk >=0.2.4 # issues with numcodecs in 0.2.2 and 0.2.3 - netcdf4 - numcodecs >=0.12 # github.com/valeriupredoi/PyActiveStorage/issues/162 - numpy !=1.24.3 # severe masking bug From 8cbf1f03d00de68276b5016a18beda3eb972864d Mon Sep 17 00:00:00 2001 From: Valeriu Predoi Date: Wed, 6 Mar 2024 16:42:56 +0000 Subject: [PATCH 30/55] pin kerchunk --- setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.py b/setup.py index ed771420..516811d3 100644 --- a/setup.py +++ b/setup.py @@ -22,7 +22,7 @@ 'fsspec', 'h5netcdf', 'h5py', # needed by Kerchunk - 'kerchunk', + 'kerchunk>=0.2.4', # issues with numcodecs in 0.2.2/3 'netcdf4', 'numcodecs>=0.12', # github/issues/162 'numpy!=1.24.3', # severe masking bug From fcefd231d4d016a3990e1836fe16416fcf1ebe7c Mon Sep 17 00:00:00 2001 From: Valeriu Predoi Date: Thu, 7 Mar 2024 12:28:59 +0000 Subject: [PATCH 31/55] use correct attributes compression and shuffle --- tests/test_compression.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/test_compression.py b/tests/test_compression.py index 1cba14b1..206c1568 100644 --- a/tests/test_compression.py +++ b/tests/test_compression.py @@ -17,8 +17,8 @@ def check_dataset_filters(temp_file: str, ncvar: str, compression: str, shuffle: if USE_S3: with load_from_s3(temp_file) as test_data: # NOTE: h5netcdf thinks zlib is gzip - assert test_data.variables[ncvar].compression == "gzip" - assert test_data.variables[ncvar].shuffle == shuffle + assert test_data[ncvar].compression == "gzip" + assert test_data[ncvar].shuffle == shuffle else: with Dataset(temp_file) as test_data: test_data_filters = test_data.variables[ncvar].filters() From a4aadce4c2ecd6e64a9bee60fa2218af55c89a16 Mon Sep 17 00:00:00 2001 From: Valeriu Predoi Date: Thu, 7 Mar 2024 12:56:20 +0000 Subject: [PATCH 32/55] Revert "use correct attributes compression and shuffle" This reverts commit fcefd231d4d016a3990e1836fe16416fcf1ebe7c. 
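A minimal sketch, for orientation only, of the two assertion styles that the commit above and this revert toggle between in tests/test_compression.py: the h5netcdf-backed variable obtained through load_from_s3 (where zlib is reported as "gzip" via the per-variable compression/shuffle attributes) versus the netCDF4 Dataset path (where filters() returns a dict of filter flags). load_from_s3 is the helper defined in activestorage/active.py earlier in this series; the function name and the asserted values below are illustrative only and not part of any commit.

from netCDF4 import Dataset
from activestorage.active import load_from_s3  # helper patched earlier in this series

def sketch_check_filters(temp_file: str, ncvar: str, shuffle: bool, use_s3: bool):
    """Sketch of the compression check the two commits flip between."""
    if use_s3:
        # h5netcdf path: per-variable attributes; zlib compression is reported as "gzip"
        with load_from_s3(temp_file) as test_data:
            assert test_data.variables[ncvar].compression == "gzip"
            assert test_data.variables[ncvar].shuffle == shuffle
    else:
        # netCDF4 path: filters() returns a dict of flags, e.g. {"zlib": True, "shuffle": True, ...}
        with Dataset(temp_file) as test_data:
            test_data_filters = test_data.variables[ncvar].filters()
            assert test_data_filters["zlib"]
            assert test_data_filters["shuffle"] == shuffle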
--- tests/test_compression.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/test_compression.py b/tests/test_compression.py index 206c1568..1cba14b1 100644 --- a/tests/test_compression.py +++ b/tests/test_compression.py @@ -17,8 +17,8 @@ def check_dataset_filters(temp_file: str, ncvar: str, compression: str, shuffle: if USE_S3: with load_from_s3(temp_file) as test_data: # NOTE: h5netcdf thinks zlib is gzip - assert test_data[ncvar].compression == "gzip" - assert test_data[ncvar].shuffle == shuffle + assert test_data.variables[ncvar].compression == "gzip" + assert test_data.variables[ncvar].shuffle == shuffle else: with Dataset(temp_file) as test_data: test_data_filters = test_data.variables[ncvar].filters() From 8f9e9abbb6f48a60824f58ededee03c6864758ab Mon Sep 17 00:00:00 2001 From: Valeriu Predoi Date: Thu, 7 Mar 2024 13:16:02 +0000 Subject: [PATCH 33/55] fix case filters is None --- activestorage/netcdf_to_zarr.py | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/activestorage/netcdf_to_zarr.py b/activestorage/netcdf_to_zarr.py index cc7e7325..fd6f6b97 100644 --- a/activestorage/netcdf_to_zarr.py +++ b/activestorage/netcdf_to_zarr.py @@ -20,8 +20,9 @@ def _correct_compressor_and_filename(content, varname, bryan_bucket=False): and for special buckets like Bryan's bnl the correct file is bnl/file.nc not s3://bnl/file.nc """ - tc1 = time.time() new_content = content.copy() + + # prelimniary assembly try: new_zarray = ujson.loads(new_content['refs'][f"{varname}/.zarray"]) group = False @@ -30,16 +31,16 @@ def _correct_compressor_and_filename(content, varname, bryan_bucket=False): group = True # re-add the correct compressor if it's in the "filters" list - if new_zarray["compressor"] is None: + if new_zarray["compressor"] is None and new_zarray["filters"]: for zfilter in new_zarray["filters"]: if zfilter["id"] == "zlib": new_zarray["compressor"] = zfilter new_zarray["filters"].remove(zfilter) - if not group: - new_content['refs'][f"{varname}/.zarray"] = ujson.dumps(new_zarray) - else: - new_content['refs'][f"{varname} /{varname}/.zarray"] = ujson.dumps(new_zarray) + if not group: + new_content['refs'][f"{varname}/.zarray"] = ujson.dumps(new_zarray) + else: + new_content['refs'][f"{varname} /{varname}/.zarray"] = ujson.dumps(new_zarray) # FIXME TODO this is an absolute nightmate: the type of bucket on UOR ACES # this is a HACK and it works only with the crazy Bryan S3 bucket "bnl/file.nc" @@ -51,8 +52,6 @@ def _correct_compressor_and_filename(content, varname, bryan_bucket=False): if varname in key and isinstance(new_content['refs'][key], list) and "s3://" in new_content['refs'][key][0]: new_content['refs'][key][0] = new_content['refs'][key][0].replace("s3://", "") - tc2 = time.time() - print("Time to manipulate Kerchunk Zarr output", tc2 - tc1) return new_content From ea20e1609990de116aa1c6b302ffe9b326eb2878 Mon Sep 17 00:00:00 2001 From: Valeriu Predoi Date: Thu, 7 Mar 2024 13:25:52 +0000 Subject: [PATCH 34/55] use two procs for pytest --- .github/workflows/test_s3_minio.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/test_s3_minio.yml b/.github/workflows/test_s3_minio.yml index c1399441..f033c17b 100644 --- a/.github/workflows/test_s3_minio.yml +++ b/.github/workflows/test_s3_minio.yml @@ -65,7 +65,7 @@ jobs: pip install -e . 
- name: Run tests run: | - pytest + pytest -n 2 - name: Run S3 exploratory tests run: | pytest tests/s3_exploratory/test_s3_reduction.py --html=test-reports/s3-exploratory-report.html From 151284dce598a16b767621fa8b0510365bcddb40 Mon Sep 17 00:00:00 2001 From: Valeriu Predoi Date: Thu, 7 Mar 2024 16:01:30 +0000 Subject: [PATCH 35/55] replace s3 test file because I am a pillock and I overwrote the other --- tests/test_compression_remote_reductionist.py | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/tests/test_compression_remote_reductionist.py b/tests/test_compression_remote_reductionist.py index 02499137..8a5eac3d 100644 --- a/tests/test_compression_remote_reductionist.py +++ b/tests/test_compression_remote_reductionist.py @@ -208,7 +208,9 @@ def test_compression_and_filters_cmip6_forced_s3_from_local_bigger_file_v1(): dims: 30 (time) 39 (plev) 325 (lat) 432 (lon) Entire mother file info: - [2024-01-24 10:07:02 GMT] 2.8GiB STANDARD da193a_25_day__198807-198807.nc + mc ls bryan/bnl/da193a_25_day__198808-198808.nc + [2024-01-24 10:07:03 GMT] 2.8GiB STANDARD da193a_25_day__198808-198808.nc + NOTE: we used this test as timing reference for performance testing. """ @@ -218,7 +220,7 @@ def test_compression_and_filters_cmip6_forced_s3_from_local_bigger_file_v1(): 'client_kwargs': {'endpoint_url': "https://uor-aces-o.s3-ext.jc.rl.ac.uk"} } active_storage_url = "https://192.171.169.248:8080" - bigger_file = "da193a_25_day__198807-198807.nc" + bigger_file = "da193a_25_day__198808-198808.nc" test_file_uri = os.path.join( S3_BUCKET, @@ -247,7 +249,9 @@ def test_compression_and_filters_cmip6_forced_s3_from_local_bigger_file_v0(): dims: 30 (time) 39 (plev) 325 (lat) 432 (lon) Entire mother file info: - [2024-01-24 10:07:02 GMT] 2.8GiB STANDARD da193a_25_day__198807-198807.nc + mc ls bryan/bnl/da193a_25_day__198808-198808.nc + [2024-01-24 10:07:03 GMT] 2.8GiB STANDARD da193a_25_day__198808-198808.nc + """ storage_options = { 'key': "f2d55c6dcfc7618b2c34e00b58df3cef", @@ -255,7 +259,7 @@ def test_compression_and_filters_cmip6_forced_s3_from_local_bigger_file_v0(): 'client_kwargs': {'endpoint_url': "https://uor-aces-o.s3-ext.jc.rl.ac.uk"} } active_storage_url = "https://192.171.169.248:8080" - bigger_file = "da193a_25_day__198807-198807.nc" + bigger_file = "da193a_25_day__198808-198808.nc" test_file_uri = os.path.join( S3_BUCKET, From 8c82fb8cf543a6fce380801e8d8cecbda1487a8d Mon Sep 17 00:00:00 2001 From: Valeriu Predoi Date: Thu, 7 Mar 2024 17:39:27 +0000 Subject: [PATCH 36/55] fix SegFault --- activestorage/netcdf_to_zarr.py | 21 +++++++++++---------- 1 file changed, 11 insertions(+), 10 deletions(-) diff --git a/activestorage/netcdf_to_zarr.py b/activestorage/netcdf_to_zarr.py index fd6f6b97..d3263607 100644 --- a/activestorage/netcdf_to_zarr.py +++ b/activestorage/netcdf_to_zarr.py @@ -87,19 +87,20 @@ def gen_json(file_url, varname, outf, storage_type, storage_options): fs = s3fs.S3FileSystem(**storage_options) fs2 = fsspec.filesystem('') tk1 = time.time() - with fs.open(file_url, 'rb') as s3file: + with fs.open(file_url, 'rb') as s3file_o: # restrict only to the Group/Dataset that the varname belongs to # this saves 2-3x time in Kerchunk # TODO make this available to the other routines that use Kerchunk - s3file = h5py.File(s3file, mode="w") - if isinstance(s3file[varname], h5py.Dataset): - print("Looking only at a single Dataset", s3file[varname]) - s3file.create_group(varname + " ") - s3file[varname + " "][varname] = s3file[varname] - s3file = s3file[varname + " "] - 
elif isinstance(s3file[varname], h5py.Group):
-                print("Looking only at a single Group", s3file[varname])
-                s3file = s3file[varname]
+            s3file_r = h5py.File(s3file_o, mode="r")
+            s3file_w = h5py.File(s3file_o, mode="w")
+            if isinstance(s3file_r[varname], h5py.Dataset):
+                print("Looking only at a single Dataset", s3file_r[varname])
+                s3file_w.create_group(varname + " ")
+                s3file_w[varname + " "][varname] = s3file_w[varname]
+                s3file = s3file_w[varname + " "]
+            elif isinstance(s3file_r[varname], h5py.Group):
+                print("Looking only at a single Group", s3file_r[varname])
+                s3file = s3file_r[varname]

             # Kerchunk wants the correct file name in S3 format
             if not file_url.startswith("s3://"):
                 file_url = "s3://" + file_url

From aa30b0bb04f60c81f3d4fcfe91ba04c7dafc573f Mon Sep 17 00:00:00 2001
From: Valeriu Predoi
Date: Fri, 8 Mar 2024 16:19:42 +0000
Subject: [PATCH 37/55] add extension to truncate to the other kerchunking block and lots of inline comments

---
 activestorage/netcdf_to_zarr.py | 38 ++++++++++++++++++++++++++++++---
 1 file changed, 35 insertions(+), 3 deletions(-)

diff --git a/activestorage/netcdf_to_zarr.py b/activestorage/netcdf_to_zarr.py
index d3263607..35f24843 100644
--- a/activestorage/netcdf_to_zarr.py
+++ b/activestorage/netcdf_to_zarr.py
@@ -66,12 +66,27 @@ def gen_json(file_url, varname, outf, storage_type, storage_options):
             default_cache_type="first" # best for HDF5
         )
         fs2 = fsspec.filesystem('')
-        with fs.open(file_url, 'rb') as s3file:
+        with fs.open(file_url, 'rb') as s3file_o_1:
+            # see below for reasoning behind this
+            s3file_r_1 = h5py.File(s3file_o_1, mode="r")
+            s3file_w_1 = h5py.File(s3file_o_1, mode="w")
+            if isinstance(s3file_r_1[varname], h5py.Dataset):
+                print("Looking only at a single Dataset", s3file_r_1[varname])
+                s3file_w_1.create_group(varname + " ")
+                s3file_w_1[varname + " "][varname] = s3file_w_1[varname]
+                s3file = s3file_w_1[varname + " "]
+            elif isinstance(s3file_r_1[varname], h5py.Group):
+                print("Looking only at a single Group", s3file_r_1[varname])
+                s3file = s3file_r_1[varname]
+
             h5chunks = SingleHdf5ToZarr(s3file, file_url,
                                         inline_threshold=0)
+
+            # TODO absolute crap, this needs to go
             bryan_bucket = False
             if "bnl" in file_url:
                 bryan_bucket = True
+
             with fs2.open(outf, 'wb') as f:
                 content = h5chunks.translate()
                 content = _correct_compressor_and_filename(content,
@@ -89,8 +104,21 @@ def gen_json(file_url, varname, outf, storage_type, storage_options):
         tk1 = time.time()
         with fs.open(file_url, 'rb') as s3file_o:
             # restrict only to the Group/Dataset that the varname belongs to
-            # this saves 2-3x time in Kerchunk
-            # TODO make this available to the other routines that use Kerchunk
+            # this saves 4-5x time in Kerchunk
+            # Restrict the s3file HDF5 file only to the Dataset or Group of interest.
+            # This bit extracts the Dataset or Group of interest
+            # (depending on what type of object the varname is in). It is the best we can do
+            # with the non-breaking h5py API. This is a touchy bit of said API and, depending
+            # on how things are coded, it can easily throw SegFaults.
Explanations: + # - an s3fs File object with HDF5 structure is passed in + # - h5py allows structural edits (creating/adding a Group) + # only if opening said file in WRITE mode + # - clear distinction between said File open in W mode as opposed to + # said file open in R(B) mode + # - the reason we open it in R mode is that we can only truncate it (select on key) if in R mode + # and then migrate extracted data to the file open in W mode + # - operations like copy or selection/truncating will always throw SegFaults + # if not operating with two open Files: W and R s3file_r = h5py.File(s3file_o, mode="r") s3file_w = h5py.File(s3file_o, mode="w") if isinstance(s3file_r[varname], h5py.Dataset): @@ -101,12 +129,16 @@ def gen_json(file_url, varname, outf, storage_type, storage_options): elif isinstance(s3file_r[varname], h5py.Group): print("Looking only at a single Group", s3file_r[varname]) s3file = s3file_r[varname] + # Kerchunk wants the correct file name in S3 format if not file_url.startswith("s3://"): file_url = "s3://" + file_url + + # TODO absolute crap: this needs to go bryan_bucket = False if "bnl" in file_url: bryan_bucket = True + h5chunks = SingleHdf5ToZarr(s3file, file_url, inline_threshold=0, storage_options=storage_options) From 0a6bd1d317b70a20dc23b39063882acbba84d26f Mon Sep 17 00:00:00 2001 From: Valeriu Predoi Date: Fri, 8 Mar 2024 16:33:35 +0000 Subject: [PATCH 38/55] add valid storage options for file opening in Kerchunk --- activestorage/netcdf_to_zarr.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/activestorage/netcdf_to_zarr.py b/activestorage/netcdf_to_zarr.py index 35f24843..1545bbfc 100644 --- a/activestorage/netcdf_to_zarr.py +++ b/activestorage/netcdf_to_zarr.py @@ -79,8 +79,12 @@ def gen_json(file_url, varname, outf, storage_type, storage_options): print("Looking only at a single Group", s3file_r_1[varname]) s3file = s3file_r_1[varname] + storage_options = {"key": S3_ACCESS_KEY, + "secret": S3_SECRET_KEY, + "client_kwargs": {'endpoint_url': S3_URL}} h5chunks = SingleHdf5ToZarr(s3file, file_url, - inline_threshold=0) + inline_threshold=0, + storage_options=storage_options) # TODO absolute crap, this needs to go bryan_bucket = False @@ -119,6 +123,8 @@ def gen_json(file_url, varname, outf, storage_type, storage_options): # and then migrate extracted data to the file open in W mode # - operations like copy or selection/truncating will always throw SegFaults # if not operating with two open Files: W and R + # - this block can not be extracted into a function because we need to dealloc each instance of + # s3file_o, s3file_r and s3file_w (hence the naming is different in the step above) s3file_r = h5py.File(s3file_o, mode="r") s3file_w = h5py.File(s3file_o, mode="w") if isinstance(s3file_r[varname], h5py.Dataset): From 2f75873ee70858dc180ddc177a816580fb49b427 Mon Sep 17 00:00:00 2001 From: Valeriu Predoi Date: Fri, 8 Mar 2024 16:44:30 +0000 Subject: [PATCH 39/55] reinstance variable in test --- tests/s3_exploratory/test_s3_reduction.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/tests/s3_exploratory/test_s3_reduction.py b/tests/s3_exploratory/test_s3_reduction.py index 3546a528..34cd6157 100644 --- a/tests/s3_exploratory/test_s3_reduction.py +++ b/tests/s3_exploratory/test_s3_reduction.py @@ -115,11 +115,11 @@ def test_with_valid_netCDF_file(test_data_path): print("S3 file uri", s3_testfile_uri) # run Active on s3 file - active = Active(s3_testfile_uri, "TREFHT", "s3") - active._version = 2 - active.method 
= "mean" - active.components = True - result1 = active[4:5, 1:2] + active2 = Active(s3_testfile_uri, "TREFHT", "s3") + active2._version = 2 + active2.method = "mean" + active2.components = True + result1 = active2[4:5, 1:2] print(result1) # expect {'sum': array([[[2368.3232]]], dtype=float32), 'n': array([[[8]]])} From 6a4206d61b0aff061bf444f2c12a6708c74956cc Mon Sep 17 00:00:00 2001 From: Valeriu Predoi Date: Fri, 8 Mar 2024 16:49:57 +0000 Subject: [PATCH 40/55] del pointer --- tests/s3_exploratory/test_s3_reduction.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/tests/s3_exploratory/test_s3_reduction.py b/tests/s3_exploratory/test_s3_reduction.py index 34cd6157..d8e90026 100644 --- a/tests/s3_exploratory/test_s3_reduction.py +++ b/tests/s3_exploratory/test_s3_reduction.py @@ -104,6 +104,9 @@ def test_with_valid_netCDF_file(test_data_path): active._version = 2 active.method = "mean" active.components = True + + del active + result2 = active[4:5, 1:2] print(result2) From 23cc9d53868996fa6bdb1f2652b52c50215b1461 Mon Sep 17 00:00:00 2001 From: Valeriu Predoi Date: Fri, 8 Mar 2024 16:57:45 +0000 Subject: [PATCH 41/55] separate calls to Active --- tests/s3_exploratory/test_s3_reduction.py | 29 +++++++++++++---------- 1 file changed, 16 insertions(+), 13 deletions(-) diff --git a/tests/s3_exploratory/test_s3_reduction.py b/tests/s3_exploratory/test_s3_reduction.py index d8e90026..25219fc8 100644 --- a/tests/s3_exploratory/test_s3_reduction.py +++ b/tests/s3_exploratory/test_s3_reduction.py @@ -89,6 +89,20 @@ def test_data_path(): return Path(__file__).resolve().parent / 'test_data' +def _run_active(ncfile, s3=False): + if not s3: + active = Active(ncfile, "TREFHT") + else: + active = Active(ncfile, "TREFHT", "s3") + + active._version = 2 + active.method = "mean" + active.components = True + + result = active[4:5, 1:2] + return result + + def test_with_valid_netCDF_file(test_data_path): """ Test as above but with an actual netCDF4 file. @@ -100,14 +114,7 @@ def test_with_valid_netCDF_file(test_data_path): ncfile = str(test_data_path / "cesm2_native.nc") # run POSIX (local) Active - active = Active(ncfile, "TREFHT") - active._version = 2 - active.method = "mean" - active.components = True - - del active - - result2 = active[4:5, 1:2] + result2 = _run_active(ncfile) print(result2) # put data onto S3. 
then rm from local @@ -118,11 +125,7 @@ def test_with_valid_netCDF_file(test_data_path): print("S3 file uri", s3_testfile_uri) # run Active on s3 file - active2 = Active(s3_testfile_uri, "TREFHT", "s3") - active2._version = 2 - active2.method = "mean" - active2.components = True - result1 = active2[4:5, 1:2] + result1 = _run_active(s3_testfile_uri, s3=True) print(result1) # expect {'sum': array([[[2368.3232]]], dtype=float32), 'n': array([[[8]]])} From 86a22d894afbe611f57314a9b9bb6848df4955fa Mon Sep 17 00:00:00 2001 From: Valeriu Predoi Date: Fri, 8 Mar 2024 17:03:37 +0000 Subject: [PATCH 42/55] reduce test --- tests/s3_exploratory/test_s3_reduction.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/tests/s3_exploratory/test_s3_reduction.py b/tests/s3_exploratory/test_s3_reduction.py index 25219fc8..2a2ba63f 100644 --- a/tests/s3_exploratory/test_s3_reduction.py +++ b/tests/s3_exploratory/test_s3_reduction.py @@ -114,8 +114,8 @@ def test_with_valid_netCDF_file(test_data_path): ncfile = str(test_data_path / "cesm2_native.nc") # run POSIX (local) Active - result2 = _run_active(ncfile) - print(result2) + #result2 = _run_active(ncfile) + #print(result2) # put data onto S3. then rm from local object = os.path.basename(ncfile) @@ -133,8 +133,8 @@ def test_with_valid_netCDF_file(test_data_path): assert_allclose(result1["sum"], np.array([[[2368.3232]]], dtype="float32"), rtol=1e-6) assert_array_equal(result1["n"], np.array([[[8]]])) - assert_allclose(result1["sum"], result2["sum"], rtol=1e-6) - assert_array_equal(result1["n"], result2["n"]) + #assert_allclose(result1["sum"], result2["sum"], rtol=1e-6) + #assert_array_equal(result1["n"], result2["n"]) def test_reductionist_reduce_chunk(): From 3d50db81d482cfb1537193aba89ec7de632f0356 Mon Sep 17 00:00:00 2001 From: Valeriu Predoi Date: Fri, 8 Mar 2024 17:11:01 +0000 Subject: [PATCH 43/55] reduce test --- tests/s3_exploratory/test_s3_reduction.py | 36 ----------------------- 1 file changed, 36 deletions(-) diff --git a/tests/s3_exploratory/test_s3_reduction.py b/tests/s3_exploratory/test_s3_reduction.py index 2a2ba63f..247b23c3 100644 --- a/tests/s3_exploratory/test_s3_reduction.py +++ b/tests/s3_exploratory/test_s3_reduction.py @@ -47,42 +47,6 @@ def upload_to_s3(server, username, password, bucket, object, rfile): return os.path.join(bucket, object) -def test_Active(): - """ - Shows what we expect an active example test to achieve and provides "the right answer" - Done twice: POSIX active and Reductionist; we compare results. - - identical to tests/test_harness.py::testActive() - - """ - # make dummy data - s3_testfile, local_testfile = make_tempfile() - - # put s3 dummy data onto S3. 
then rm from local
-    object = os.path.basename(s3_testfile)
-    bucket_file = upload_to_s3(S3_URL, S3_ACCESS_KEY, S3_SECRET_KEY,
-                               S3_BUCKET, object, s3_testfile)
-    os.remove(s3_testfile)
-    s3_testfile_uri = os.path.join("s3://", bucket_file)
-    print("S3 file uri", s3_testfile_uri)
-
-    # run Active on s3 file
-    active = Active(s3_testfile_uri, "data", "s3")
-    active.method = "mean"
-    result1 = active[0:2, 4:6, 7:9]
-    print(result1)
-
-    # run Active on local file
-    active = Active(local_testfile, "data")
-    active._version = 2
-    active.method = "mean"
-    active.components = True
-    result2 = active[0:2, 4:6, 7:9]
-    print(result2)
-
-    assert_array_equal(result1, result2["sum"]/result2["n"])
-
-
 @pytest.fixture
 def test_data_path():
     """Path to test data for CMOR fixes."""
     return Path(__file__).resolve().parent / 'test_data'

From 2582d9670d3016da738229306f7d61ad636a177a Mon Sep 17 00:00:00 2001
From: Valeriu Predoi
Date: Fri, 8 Mar 2024 17:17:04 +0000
Subject: [PATCH 44/55] don't run exploratory tests

---
 .github/workflows/test_s3_minio.yml | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/.github/workflows/test_s3_minio.yml b/.github/workflows/test_s3_minio.yml
index f033c17b..31076efc 100644
--- a/.github/workflows/test_s3_minio.yml
+++ b/.github/workflows/test_s3_minio.yml
@@ -66,10 +66,10 @@ jobs:
       - name: Run tests
         run: |
           pytest -n 2
-      - name: Run S3 exploratory tests
-        run: |
-          pytest tests/s3_exploratory/test_s3_reduction.py --html=test-reports/s3-exploratory-report.html
-        if: always()
+      #- name: Run S3 exploratory tests
+      #  run: |
+      #    pytest tests/s3_exploratory/test_s3_reduction.py --html=test-reports/s3-exploratory-report.html
+      #  if: always()
       - name: Install pytest-monitor
         run: pip install pytest-monitor
       - name: Run S3 performance tests

From 662a69fb3f7cefee10c6105c794dcf5ac32a0166 Mon Sep 17 00:00:00 2001
From: Valeriu Predoi
Date: Fri, 8 Mar 2024 17:24:34 +0000
Subject: [PATCH 45/55] cleanup test

---
 tests/s3_exploratory/test_s3_arrange_files.py | 4 ----
 1 file changed, 4 deletions(-)

diff --git a/tests/s3_exploratory/test_s3_arrange_files.py b/tests/s3_exploratory/test_s3_arrange_files.py
index 18b1015e..78ecd3fe 100644
--- a/tests/s3_exploratory/test_s3_arrange_files.py
+++ b/tests/s3_exploratory/test_s3_arrange_files.py
@@ -6,12 +6,8 @@
 import s3fs
 import tempfile
 
-from activestorage.active import Active
 from activestorage.dummy_data import make_vanilla_ncdata
 import activestorage.storage as st
-from activestorage.reductionist import reduce_chunk as reductionist_reduce_chunk
-from activestorage.netcdf_to_zarr import gen_json
-from kerchunk.hdf import SingleHdf5ToZarr
 
 from numpy.testing import assert_allclose, assert_array_equal
 from pathlib import Path
 from config_minio import *

From 3ee87320400cd69df0fe3a6dddee29b0c835f6d6 Mon Sep 17 00:00:00 2001
From: Valeriu Predoi
Date: Sat, 9 Mar 2024 13:06:04 +0000
Subject: [PATCH 46/55] reinstate test; we need to figure out the segfaulting, not brush it under the carpet

---
 tests/s3_exploratory/test_s3_reduction.py | 62 +++++++++++++++++------
 1 file changed, 46 insertions(+), 16 deletions(-)

diff --git a/tests/s3_exploratory/test_s3_reduction.py b/tests/s3_exploratory/test_s3_reduction.py
index 247b23c3..3546a528 100644
--- a/tests/s3_exploratory/test_s3_reduction.py
+++ b/tests/s3_exploratory/test_s3_reduction.py
@@ -47,24 +47,46 @@ def upload_to_s3(server, username, password, bucket, object, rfile):
     return os.path.join(bucket, object)
 
 
+def test_Active():
+    """
+
Shows what we expect an active example test to achieve and provides "the right answer" + Done twice: POSIX active and Reductionist; we compare results. + + identical to tests/test_harness.py::testActive() + """ + # make dummy data + s3_testfile, local_testfile = make_tempfile() -def _run_active(ncfile, s3=False): - if not s3: - active = Active(ncfile, "TREFHT") - else: - active = Active(ncfile, "TREFHT", "s3") + # put s3 dummy data onto S3. then rm from local + object = os.path.basename(s3_testfile) + bucket_file = upload_to_s3(S3_URL, S3_ACCESS_KEY, S3_SECRET_KEY, + S3_BUCKET, object, s3_testfile) + os.remove(s3_testfile) + s3_testfile_uri = os.path.join("s3://", bucket_file) + print("S3 file uri", s3_testfile_uri) + + # run Active on s3 file + active = Active(s3_testfile_uri, "data", "s3") + active.method = "mean" + result1 = active[0:2, 4:6, 7:9] + print(result1) + # run Active on local file + active = Active(local_testfile, "data") active._version = 2 active.method = "mean" active.components = True + result2 = active[0:2, 4:6, 7:9] + print(result2) + + assert_array_equal(result1, result2["sum"]/result2["n"]) - result = active[4:5, 1:2] - return result + +@pytest.fixture +def test_data_path(): + """Path to test data for CMOR fixes.""" + return Path(__file__).resolve().parent / 'test_data' def test_with_valid_netCDF_file(test_data_path): @@ -78,8 +100,12 @@ def test_with_valid_netCDF_file(test_data_path): ncfile = str(test_data_path / "cesm2_native.nc") # run POSIX (local) Active - #result2 = _run_active(ncfile) - #print(result2) + active = Active(ncfile, "TREFHT") + active._version = 2 + active.method = "mean" + active.components = True + result2 = active[4:5, 1:2] + print(result2) # put data onto S3. then rm from local object = os.path.basename(ncfile) @@ -89,7 +115,11 @@ def test_with_valid_netCDF_file(test_data_path): print("S3 file uri", s3_testfile_uri) # run Active on s3 file - result1 = _run_active(s3_testfile_uri, s3=True) + active = Active(s3_testfile_uri, "TREFHT", "s3") + active._version = 2 + active.method = "mean" + active.components = True + result1 = active[4:5, 1:2] print(result1) # expect {'sum': array([[[2368.3232]]], dtype=float32), 'n': array([[[8]]])} @@ -97,8 +127,8 @@ def test_with_valid_netCDF_file(test_data_path): assert_allclose(result1["sum"], np.array([[[2368.3232]]], dtype="float32"), rtol=1e-6) assert_array_equal(result1["n"], np.array([[[8]]])) - #assert_allclose(result1["sum"], result2["sum"], rtol=1e-6) - #assert_array_equal(result1["n"], result2["n"]) + assert_allclose(result1["sum"], result2["sum"], rtol=1e-6) + assert_array_equal(result1["n"], result2["n"]) def test_reductionist_reduce_chunk(): From 48df2cdc6f1314eee7715e570e5600b92b5fb507 Mon Sep 17 00:00:00 2001 From: Valeriu Predoi Date: Sat, 9 Mar 2024 13:09:44 +0000 Subject: [PATCH 47/55] cleanup and restore test --- tests/s3_exploratory/test_s3_arrange_files.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/tests/s3_exploratory/test_s3_arrange_files.py b/tests/s3_exploratory/test_s3_arrange_files.py index 78ecd3fe..e0358c98 100644 --- a/tests/s3_exploratory/test_s3_arrange_files.py +++ b/tests/s3_exploratory/test_s3_arrange_files.py @@ -7,9 +7,8 @@ import tempfile from activestorage.dummy_data import make_vanilla_ncdata -import activestorage.storage as st +from kerchunk.hdf import SingleHdf5ToZarr -from numpy.testing import assert_allclose, assert_array_equal from pathlib import Path from config_minio import * From 30d435f6a6297be8b448771997c5a50b72620179 Mon Sep 17 00:00:00 
2001
From: Valeriu Predoi
Date: Sat, 9 Mar 2024 13:19:24 +0000
Subject: [PATCH 48/55] reinstate full workflow

---
 .github/workflows/test_s3_minio.yml | 12 +++++++-----
 1 file changed, 7 insertions(+), 5 deletions(-)

diff --git a/.github/workflows/test_s3_minio.yml b/.github/workflows/test_s3_minio.yml
index 31076efc..3de15a77 100644
--- a/.github/workflows/test_s3_minio.yml
+++ b/.github/workflows/test_s3_minio.yml
@@ -64,12 +64,14 @@ jobs:
           which python
           pip install -e .
       - name: Run tests
+        # run tests on one process to catch possible SegFaults
+        # no SegFaults happen with n procs > 1
         run: |
-          pytest -n 2
+          pytest
       - name: Run S3 exploratory tests
         run: |
           pytest tests/s3_exploratory/test_s3_reduction.py --html=test-reports/s3-exploratory-report.html
         if: always()
       - name: Install pytest-monitor
         run: pip install pytest-monitor
       - name: Run S3 performance tests

From 110b96a7b8861b17ca1defdefff31ab5465a0e86 Mon Sep 17 00:00:00 2001
From: Valeriu Predoi
Date: Sat, 9 Mar 2024 13:29:59 +0000
Subject: [PATCH 49/55] try creating a new variable

---
 activestorage/netcdf_to_zarr.py | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/activestorage/netcdf_to_zarr.py b/activestorage/netcdf_to_zarr.py
index 1545bbfc..744cfaf9 100644
--- a/activestorage/netcdf_to_zarr.py
+++ b/activestorage/netcdf_to_zarr.py
@@ -74,15 +74,15 @@ def gen_json(file_url, varname, outf, storage_type, storage_options):
                 print("Looking only at a single Dataset", s3file_r_1[varname])
                 s3file_w_1.create_group(varname + " ")
                 s3file_w_1[varname + " "][varname] = s3file_w_1[varname]
-                s3file = s3file_w_1[varname + " "]
+                s3file_1 = s3file_w_1[varname + " "]
             elif isinstance(s3file_r_1[varname], h5py.Group):
                 print("Looking only at a single Group", s3file_r_1[varname])
-                s3file = s3file_r_1[varname]
+                s3file_1 = s3file_r_1[varname]
 
             storage_options = {"key": S3_ACCESS_KEY,
                                "secret": S3_SECRET_KEY,
                                "client_kwargs": {'endpoint_url': S3_URL}}
-            h5chunks = SingleHdf5ToZarr(s3file, file_url,
+            h5chunks = SingleHdf5ToZarr(s3file_1, file_url,
                                         inline_threshold=0,
                                         storage_options=storage_options)

From 8b9c890dd5a8846c7d91fe16436047441bb53444 Mon Sep 17 00:00:00 2001
From: Valeriu Predoi
Date: Sat, 9 Mar 2024 14:06:27 +0000
Subject: [PATCH 50/55] same var name again

---
 activestorage/netcdf_to_zarr.py | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/activestorage/netcdf_to_zarr.py b/activestorage/netcdf_to_zarr.py
index 744cfaf9..1545bbfc 100644
--- a/activestorage/netcdf_to_zarr.py
+++ b/activestorage/netcdf_to_zarr.py
@@ -74,15 +74,15 @@ def gen_json(file_url, varname, outf, storage_type, storage_options):
                 print("Looking only at a single Dataset", s3file_r_1[varname])
                 s3file_w_1.create_group(varname + " ")
                 s3file_w_1[varname + " "][varname] = s3file_w_1[varname]
-                s3file_1 = s3file_w_1[varname + " "]
+                s3file = s3file_w_1[varname + " "]
             elif isinstance(s3file_r_1[varname], h5py.Group):
                 print("Looking only at a single Group", s3file_r_1[varname])
-                s3file_1 = s3file_r_1[varname]
+                s3file = s3file_r_1[varname]
 
             storage_options = {"key": S3_ACCESS_KEY,
                                "secret": S3_SECRET_KEY,
                                "client_kwargs": {'endpoint_url': S3_URL}}
-            h5chunks = SingleHdf5ToZarr(s3file_1, file_url,
+            h5chunks = SingleHdf5ToZarr(s3file, file_url,
                                         inline_threshold=0,
                                         storage_options=storage_options)

From 0079a8bc68027b61f50a173911125d91e2a0c6fb Mon Sep 17 00:00:00 2001
From: Valeriu
Predoi Date: Sat, 9 Mar 2024 14:14:16 +0000 Subject: [PATCH 51/55] try returning after each conditional --- activestorage/netcdf_to_zarr.py | 30 +++++++++++++++++++++--------- 1 file changed, 21 insertions(+), 9 deletions(-) diff --git a/activestorage/netcdf_to_zarr.py b/activestorage/netcdf_to_zarr.py index 1545bbfc..9943c016 100644 --- a/activestorage/netcdf_to_zarr.py +++ b/activestorage/netcdf_to_zarr.py @@ -55,6 +55,19 @@ def _correct_compressor_and_filename(content, varname, bryan_bucket=False): return new_content +def _return_zcomponents(content, varname): + """Return zarr array and attributes.""" + # account for both Group and Dataset + try: + zarray = ujson.loads(content['refs'][f"{varname}/.zarray"]) + zattrs = ujson.loads(content['refs'][f"{varname}/.zattrs"]) + except KeyError: + zarray = ujson.loads(content['refs'][f"{varname} /{varname}/.zarray"]) + zattrs = ujson.loads(content['refs'][f"{varname} /{varname}/.zattrs"]) + + return zarray, zattrs + + def gen_json(file_url, varname, outf, storage_type, storage_options): """Generate a json file that contains the kerchunk-ed data for Zarr.""" # S3 configuration presets @@ -97,6 +110,8 @@ def gen_json(file_url, varname, outf, storage_type, storage_options): varname, bryan_bucket=bryan_bucket) f.write(ujson.dumps(content).encode()) + zarray, zattrs = _return_zcomponents(content, varname) + return outf, zarray, zattrs # S3 passed-in configuration elif storage_type == "s3" and storage_options is not None: @@ -157,6 +172,10 @@ def gen_json(file_url, varname, outf, storage_type, storage_options): f.write(ujson.dumps(content).encode()) tk3 = time.time() print("Time to Kerchunk and write JSON file", tk3 - tk2) + + zarray, zattrs = _return_zcomponents(content, varname) + return outf, zarray, zattrs + # not S3 else: fs = fsspec.filesystem('') @@ -179,15 +198,8 @@ def gen_json(file_url, varname, outf, storage_type, storage_options): content = h5chunks.translate() f.write(ujson.dumps(content).encode()) - # account for both Group and Dataset - try: - zarray = ujson.loads(content['refs'][f"{varname}/.zarray"]) - zattrs = ujson.loads(content['refs'][f"{varname}/.zattrs"]) - except KeyError: - zarray = ujson.loads(content['refs'][f"{varname} /{varname}/.zarray"]) - zattrs = ujson.loads(content['refs'][f"{varname} /{varname}/.zattrs"]) - - return outf, zarray, zattrs + zarray, zattrs = _return_zcomponents(content, varname) + return outf, zarray, zattrs def open_zarr_group(out_json, varname): From 96c9ba4738a61c352dd16cf0ec16ce09c2c29f66 Mon Sep 17 00:00:00 2001 From: Valeriu Predoi Date: Sat, 9 Mar 2024 14:23:20 +0000 Subject: [PATCH 52/55] comment out Dataset/group selection when preset S3 options are used --- activestorage/netcdf_to_zarr.py | 37 +++++++++++++++++---------------- 1 file changed, 19 insertions(+), 18 deletions(-) diff --git a/activestorage/netcdf_to_zarr.py b/activestorage/netcdf_to_zarr.py index 9943c016..a630e038 100644 --- a/activestorage/netcdf_to_zarr.py +++ b/activestorage/netcdf_to_zarr.py @@ -79,25 +79,26 @@ def gen_json(file_url, varname, outf, storage_type, storage_options): default_cache_type="first" # best for HDF5 ) fs2 = fsspec.filesystem('') - with fs.open(file_url, 'rb') as s3file_o_1: - # see below for reasoning behind this - s3file_r_1 = h5py.File(s3file_o_1, mode="r") - s3file_w_1 = h5py.File(s3file_o_1, mode="w") - if isinstance(s3file_r_1[varname], h5py.Dataset): - print("Looking only at a single Dataset", s3file_r_1[varname]) - s3file_w_1.create_group(varname + " ") - s3file_w_1[varname + " "][varname] = 
s3file_w_1[varname] - s3file = s3file_w_1[varname + " "] - elif isinstance(s3file_r_1[varname], h5py.Group): - print("Looking only at a single Group", s3file_r_1[varname]) - s3file = s3file_r_1[varname] - - storage_options = {"key": S3_ACCESS_KEY, - "secret": S3_SECRET_KEY, - "client_kwargs": {'endpoint_url': S3_URL}} + with fs.open(file_url, 'rb') as s3file: + # with fs.open(file_url, 'rb') as s3file_o_1: + # this block, together with the block down, is causing + # SegFaults; h5py backend very brittle: see below for reasoning behind this + # s3file_r_1 = h5py.File(s3file_o_1, mode="r") + # s3file_w_1 = h5py.File(s3file_o_1, mode="w") + # if isinstance(s3file_r_1[varname], h5py.Dataset): + # print("Looking only at a single Dataset", s3file_r_1[varname]) + # s3file_w_1.create_group(varname + " ") + # s3file_w_1[varname + " "][varname] = s3file_w_1[varname] + # s3file = s3file_w_1[varname + " "] + # elif isinstance(s3file_r_1[varname], h5py.Group): + # print("Looking only at a single Group", s3file_r_1[varname]) + # s3file = s3file_r_1[varname] + # storage_options = {"key": S3_ACCESS_KEY, + # "secret": S3_SECRET_KEY, + # "client_kwargs": {'endpoint_url': S3_URL}} h5chunks = SingleHdf5ToZarr(s3file, file_url, - inline_threshold=0, - storage_options=storage_options) + inline_threshold=0) + # storage_options=storage_options) # TODO absolute crap, this needs to go bryan_bucket = False From 03c11b34e706a9d90fa9b9c1c6ef7d85b18f7621 Mon Sep 17 00:00:00 2001 From: Valeriu Predoi Date: Sun, 10 Mar 2024 14:51:11 +0000 Subject: [PATCH 53/55] add note --- activestorage/netcdf_to_zarr.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/activestorage/netcdf_to_zarr.py b/activestorage/netcdf_to_zarr.py index a630e038..3e8c24d7 100644 --- a/activestorage/netcdf_to_zarr.py +++ b/activestorage/netcdf_to_zarr.py @@ -80,9 +80,10 @@ def gen_json(file_url, varname, outf, storage_type, storage_options): ) fs2 = fsspec.filesystem('') with fs.open(file_url, 'rb') as s3file: - # with fs.open(file_url, 'rb') as s3file_o_1: - # this block, together with the block down, is causing - # SegFaults; h5py backend very brittle: see below for reasoning behind this + # this block allows for Dataset/Group selection but is causing + # SegFaults in S3/Minio tests; h5py backend very brittle: see below for reasoning behind this + # since this case is only for the S3/Minio tests, it's OK to not have it, test files are small + # with fs.open(file_url, 'rb') as s3file_o_1: # -> best to have unique name # s3file_r_1 = h5py.File(s3file_o_1, mode="r") # s3file_w_1 = h5py.File(s3file_o_1, mode="w") # if isinstance(s3file_r_1[varname], h5py.Dataset): From d064319e2924dd1659a7363c0f1b3447cfd5b573 Mon Sep 17 00:00:00 2001 From: Valeriu Predoi Date: Mon, 18 Mar 2024 12:55:32 +0000 Subject: [PATCH 54/55] comment out heavy printing to stdout --- activestorage/reductionist.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/activestorage/reductionist.py b/activestorage/reductionist.py index 654b54ba..5073a6a1 100644 --- a/activestorage/reductionist.py +++ b/activestorage/reductionist.py @@ -53,7 +53,9 @@ def reduce_chunk(session, server, source, bucket, object, """ request_data = build_request_data(source, bucket, object, offset, size, compression, filters, missing, dtype, shape, order, chunk_selection) - print("Reductionist request data dictionary:", request_data) + # This is very useful to see what is being sent to Reductionist + # print("Reductionist request data dictionary:", request_data) + # 
print("Chunk size", request_data["size"]) api_operation = "sum" if operation == "mean" else operation or "select" url = f'{server}/v1/{api_operation}/' response = request(session, url, request_data) From 730f6ecec4a2ecbd31354495662e42d045a2ef2f Mon Sep 17 00:00:00 2001 From: Valeriu Predoi Date: Mon, 18 Mar 2024 12:57:30 +0000 Subject: [PATCH 55/55] turn off some printing to stdout --- activestorage/active.py | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/activestorage/active.py b/activestorage/active.py index 973d2255..7fd89a12 100644 --- a/activestorage/active.py +++ b/activestorage/active.py @@ -81,7 +81,7 @@ def __init__( uri, ncvar, storage_type=None, - max_threads=100, + max_threads=150, storage_options=None, active_storage_url=None ): @@ -469,6 +469,7 @@ def _process_chunk(self, session, fsref, chunk_coords, chunk_selection, counts, Note the need to use counts for some methods """ + t1c = time.time() coord = '.'.join([str(c) for c in chunk_coords]) key = f"{self.ncvar}/{coord}" try: @@ -478,7 +479,7 @@ def _process_chunk(self, session, fsref, chunk_coords, chunk_selection, counts, # S3: pass in pre-configured storage options (credentials) if self.storage_type == "s3": - print("S3 rfile is:", rfile) + # print("S3 rfile is:", rfile) parsed_url = urllib.parse.urlparse(rfile) bucket = parsed_url.netloc object = parsed_url.path @@ -490,8 +491,8 @@ def _process_chunk(self, session, fsref, chunk_coords, chunk_selection, counts, if bucket == "": bucket = os.path.dirname(object) object = os.path.basename(object) - print("S3 bucket:", bucket) - print("S3 file:", object) + # print("S3 bucket:", bucket) + # print("S3 file:", object) if self.storage_options is None: tmp, count = reductionist.reduce_chunk(session, S3_ACTIVE_STORAGE_URL, @@ -505,15 +506,13 @@ def _process_chunk(self, session, fsref, chunk_coords, chunk_selection, counts, operation=self._method) else: # special case for "anon=True" buckets that work only with e.g. - # fs = s3fs.S3FileSystem(anon=True, client_kwargs={'endpoint_url': S3_URL}) + # "fs = s3fs.S3FileSystem(anon=True, client_kwargs={'endpoint_url': S3_URL})" # where file uri = bucketX/fileY.mc - print("S3 Storage options:", self.storage_options) + # print("S3 Storage options:", self.storage_options) if self.storage_options.get("anon", None) == True: bucket = os.path.dirname(parsed_url.path) # bucketX object = os.path.basename(parsed_url.path) # fileY print("S3 anon=True Bucket and File:", bucket, object) - t2 = time.time() - print("Total time before going in Reductionist", t2 - t1) tmp, count = reductionist.reduce_chunk(session, self.active_storage_url, self._get_endpoint_url(), @@ -533,7 +532,8 @@ def _process_chunk(self, session, fsref, chunk_coords, chunk_selection, counts, missing, self.zds._dtype, self.zds._chunks, self.zds._order, chunk_selection, method=self.method) - + t2c = time.time() + print("Chunk processing time via _process_chunk()", t2c - t1c) if self.method is not None: return tmp, count else: