From e77a5a7f8a3d8bb3fe58008ea9390e6d7911ba6c Mon Sep 17 00:00:00 2001 From: rileykk Date: Thu, 16 Feb 2023 13:16:06 -0800 Subject: [PATCH 01/25] Setup for passing desired projection to datastore proxy --- data-access/nexustiles/dao/CassandraProxy.py | 3 ++- data-access/nexustiles/nexustiles.py | 14 ++++++++++---- 2 files changed, 12 insertions(+), 5 deletions(-) diff --git a/data-access/nexustiles/dao/CassandraProxy.py b/data-access/nexustiles/dao/CassandraProxy.py index 4ce684a8..ee5a8bbe 100644 --- a/data-access/nexustiles/dao/CassandraProxy.py +++ b/data-access/nexustiles/dao/CassandraProxy.py @@ -31,6 +31,7 @@ logger = logging.getLogger(__name__) + class NexusTileData(Model): __table_name__ = 'sea_surface_temp' tile_id = columns.UUID(primary_key=True) @@ -53,7 +54,7 @@ def get_raw_data_array(self): return from_shaped_array(the_tile_data.variable_data) - def get_lat_lon_time_data_meta(self): + def get_lat_lon_time_data_meta(self, projection='grid'): """ Retrieve data from data store and metadata from metadata store for this tile. For gridded tiles, the tile shape of the data diff --git a/data-access/nexustiles/nexustiles.py b/data-access/nexustiles/nexustiles.py index a3aa61e9..d73bb975 100644 --- a/data-access/nexustiles/nexustiles.py +++ b/data-access/nexustiles/nexustiles.py @@ -56,7 +56,7 @@ def fetch_data_for_func(*args, **kwargs): if ('fetch_data' in kwargs and kwargs['fetch_data']) or ('fetch_data' not in kwargs and default_fetch): if len(tiles) > 0: cassandra_start = datetime.now() - args[0].fetch_data_for_tiles(*tiles) + args[0].fetch_data_for_tiles(*tiles, desired_projection=args[0].desired_projection) cassandra_duration += (datetime.now() - cassandra_start).total_seconds() if 'metrics_callback' in kwargs and kwargs['metrics_callback'] is not None: @@ -79,13 +79,15 @@ class NexusTileServiceException(Exception): class NexusTileService(object): - def __init__(self, skipDatastore=False, skipMetadatastore=False, config=None): + def __init__(self, skipDatastore=False, skipMetadatastore=False, config=None, desired_projection='grid'): self._datastore = None self._metadatastore = None self._config = configparser.RawConfigParser() self._config.read(NexusTileService._get_config_files('config/datastores.ini')) + self.desired_projection = desired_projection + if config: self.override_config(config) @@ -450,7 +452,7 @@ def get_tile_count(self, ds, bounding_polygon=None, start_time=0, end_time=-1, m """ return self._metadatastore.get_tile_count(ds, bounding_polygon, start_time, end_time, metadata, **kwargs) - def fetch_data_for_tiles(self, *tiles): + def fetch_data_for_tiles(self, *tiles, **kwargs): nexus_tile_ids = set([tile.tile_id for tile in tiles]) matched_tile_data = self._datastore.fetch_nexus_tiles(*nexus_tile_ids) @@ -461,8 +463,12 @@ def fetch_data_for_tiles(self, *tiles): if len(missing_data) > 0: raise Exception("Missing data for tile_id(s) %s." % missing_data) + desired_projection = kwargs['desired_projection'] if 'desired_projection' in kwargs else 'grid' + for a_tile in tiles: - lats, lons, times, data, meta, is_multi_var = tile_data_by_id[a_tile.tile_id].get_lat_lon_time_data_meta() + lats, lons, times, data, meta, is_multi_var = tile_data_by_id[a_tile.tile_id].get_lat_lon_time_data_meta( + projection=desired_projection + ) a_tile.latitudes = lats a_tile.longitudes = lons From 4a25544f88b4fea74d7d6e024f4f20cb29f9cee0 Mon Sep 17 00:00:00 2001 From: rileykk Date: Fri, 17 Feb 2023 13:23:18 -0800 Subject: [PATCH 02/25] Setup for passing desired projection to datastore proxy --- .../webservice/algorithms/NexusCalcHandler.py | 5 +- .../nexustiles/dao/CassandraSwathProxy.py | 328 ++++++++++++++++++ data-access/nexustiles/model/nexusmodel.py | 1 + data-access/nexustiles/nexustiles.py | 15 +- 4 files changed, 346 insertions(+), 3 deletions(-) create mode 100644 data-access/nexustiles/dao/CassandraSwathProxy.py diff --git a/analysis/webservice/algorithms/NexusCalcHandler.py b/analysis/webservice/algorithms/NexusCalcHandler.py index 13edd0c8..2ba6d893 100644 --- a/analysis/webservice/algorithms/NexusCalcHandler.py +++ b/analysis/webservice/algorithms/NexusCalcHandler.py @@ -39,7 +39,10 @@ def validate(cls): def __init__(self, tile_service_factory, **kwargs): self._tile_service_factory = tile_service_factory - self._tile_service = tile_service_factory() + if 'desired_projection' in kwargs: + self._tile_service = tile_service_factory(desired_projection=kwargs['desired_projection']) + else: + self._tile_service = tile_service_factory() def _get_tile_service(self): return self._tile_service diff --git a/data-access/nexustiles/dao/CassandraSwathProxy.py b/data-access/nexustiles/dao/CassandraSwathProxy.py new file mode 100644 index 00000000..7c0581f0 --- /dev/null +++ b/data-access/nexustiles/dao/CassandraSwathProxy.py @@ -0,0 +1,328 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +import uuid +from configparser import NoOptionError + +import nexusproto.DataTile_pb2 as nexusproto +import numpy as np +from cassandra.auth import PlainTextAuthProvider +from cassandra.cqlengine import columns, connection, CQLEngineException +from cassandra.cluster import NoHostAvailable +from cassandra.cqlengine.models import Model +from cassandra.policies import TokenAwarePolicy, DCAwareRoundRobinPolicy, WhiteListRoundRobinPolicy +from multiprocessing.synchronize import Lock +from nexusproto.serialization import from_shaped_array + +INIT_LOCK = Lock(ctx=None) + +logger = logging.getLogger(__name__) + + +class NexusTileData(Model): + __table_name__ = 'sea_surface_temp' + tile_id = columns.UUID(primary_key=True) + tile_blob = columns.Blob() + + __nexus_tile = None + + def _get_nexus_tile(self): + if self.__nexus_tile is None: + self.__nexus_tile = nexusproto.TileData.FromString(self.tile_blob) + + return self.__nexus_tile + + def get_raw_data_array(self): + + nexus_tile = self._get_nexus_tile() + the_tile_type = nexus_tile.tile.WhichOneof("tile_type") + + the_tile_data = getattr(nexus_tile.tile, the_tile_type) + + return from_shaped_array(the_tile_data.variable_data) + + def get_lat_lon_time_data_meta(self, projection='grid'): + """ + Retrieve data from data store and metadata from metadata store + for this tile. For swath tiles, the tile shape of the data + will match the input shape. For example, if the input was a + 30x30 tile, all variables will also be 30x30. However, if the + tile is a gridded tile, the lat and lon arrays will be reflected + into 2 dimensions to reflect how the equivalent data would be + represented in swath format. + + Multi-variable tile will also include an extra dimension in the + data array. For example, a 30 x 30 x 30 array would be + transformed to N x 30 x 30 x 30 where N is the number of + variables in this tile. + + latitude_data, longitude_data, np.array([grid_tile.time]), grid_tile_data, meta_data, is_multi_var + + :return: latitude data + :return: longitude data + :return: time data + :return: data + :return: meta data dictionary + :return: boolean flag, True if this tile has more than one variable + """ + is_multi_var = False + + if self._get_nexus_tile().HasField('grid_tile'): + grid_tile = self._get_nexus_tile().grid_tile + + grid_tile_data = np.ma.masked_invalid(from_shaped_array(grid_tile.variable_data)) + latitude_data = np.ma.masked_invalid(from_shaped_array(grid_tile.latitude)) + longitude_data = np.ma.masked_invalid(from_shaped_array(grid_tile.longitude)) + + reflected_lon_array = np.broadcast_to(longitude_data, (len(latitude_data), len(longitude_data))) + reflected_lat_array = np.broadcast_to(latitude_data, (len(longitude_data), len(latitude_data))) + reflected_lat_array = np.transpose(reflected_lat_array) + + if len(grid_tile_data.shape) == 2: + grid_tile_data = grid_tile_data[np.newaxis, :] + + # Extract the meta data + meta_data = {} + for meta_data_obj in grid_tile.meta_data: + name = meta_data_obj.name + meta_array = np.ma.masked_invalid(from_shaped_array(meta_data_obj.meta_data)) + if len(meta_array.shape) == 2: + meta_array = meta_array[np.newaxis, :] + meta_data[name] = meta_array + + return reflected_lat_array, reflected_lon_array, np.array([grid_tile.time]), grid_tile_data, meta_data, is_multi_var + elif self._get_nexus_tile().HasField('swath_tile'): + swath_tile = self._get_nexus_tile().swath_tile + + latitude_data = np.ma.masked_invalid(from_shaped_array(swath_tile.latitude)) + longitude_data = np.ma.masked_invalid(from_shaped_array(swath_tile.longitude)) + time_data = np.ma.masked_invalid(from_shaped_array(swath_tile.time)) + + # Simplify the tile if the time dimension is the same value repeated + if np.all(time_data == np.min(time_data)): + time_data = np.array([np.min(time_data)]) + + swath_tile_data = np.ma.masked_invalid(from_shaped_array(swath_tile.variable_data)) + tile_data = swath_tile_data + + # Extract the metadata + meta_data = {} + for meta_data_obj in swath_tile.meta_data: + name = meta_data_obj.name + actual_meta_array = np.ma.masked_invalid(from_shaped_array(meta_data_obj.meta_data)) + # reshaped_meta_array = self._to_standard_index(actual_meta_array, tile_data.shape) + meta_data[name] = actual_meta_array + + return latitude_data, longitude_data, time_data, tile_data, meta_data, is_multi_var + # TODO: Do we use this? + # elif self._get_nexus_tile().HasField('time_series_tile'): + # time_series_tile = self._get_nexus_tile().time_series_tile + # + # time_series_tile_data = np.ma.masked_invalid(from_shaped_array(time_series_tile.variable_data)) + # time_data = np.ma.masked_invalid(from_shaped_array(time_series_tile.time)).reshape(-1) + # latitude_data = np.ma.masked_invalid(from_shaped_array(time_series_tile.latitude)) + # longitude_data = np.ma.masked_invalid(from_shaped_array(time_series_tile.longitude)) + # + # reshaped_array = np.ma.masked_all((len(time_data), len(latitude_data), len(longitude_data))) + # idx = np.arange(len(latitude_data)) + # reshaped_array[:, idx, idx] = time_series_tile_data + # tile_data = reshaped_array + # # Extract the meta data + # meta_data = {} + # for meta_data_obj in time_series_tile.meta_data: + # name = meta_data_obj.name + # meta_array = np.ma.masked_invalid(from_shaped_array(meta_data_obj.meta_data)) + # + # reshaped_meta_array = np.ma.masked_all((len(time_data), len(latitude_data), len(longitude_data))) + # idx = np.arange(len(latitude_data)) + # reshaped_meta_array[:, idx, idx] = meta_array + # + # meta_data[name] = reshaped_meta_array + # + # return latitude_data, longitude_data, time_data, tile_data, meta_data, is_multi_var + elif self._get_nexus_tile().HasField('swath_multi_variable_tile'): + swath_tile = self._get_nexus_tile().swath_multi_variable_tile + is_multi_var = True + + latitude_data = np.ma.masked_invalid(from_shaped_array(swath_tile.latitude)) + longitude_data = np.ma.masked_invalid(from_shaped_array(swath_tile.longitude)) + time_data = np.ma.masked_invalid(from_shaped_array(swath_tile.time)) + + # Simplify the tile if the time dimension is the same value repeated + if np.all(time_data == np.min(time_data)): + time_data = np.array([np.min(time_data)]) + + swath_tile_data = np.ma.masked_invalid(from_shaped_array(swath_tile.variable_data)) + swath_tile_data = np.moveaxis(swath_tile_data, -1, 0) + + tile_data = [] + + for variable_array in swath_tile_data: + tile_data.append(variable_array) + + # Extract the metadata + + meta_data = {} + for meta_data_obj in swath_tile.meta_data: + name = meta_data_obj.name + actual_meta_array = np.ma.masked_invalid(from_shaped_array(meta_data_obj.meta_data)) + # reshaped_meta_array = self._to_standard_index(actual_meta_array, tile_data.shape) + meta_data[name] = actual_meta_array + + return latitude_data, longitude_data, time_data, tile_data, meta_data, is_multi_var + elif self._get_nexus_tile().HasField('grid_multi_variable_tile'): + grid_multi_variable_tile = self._get_nexus_tile().grid_multi_variable_tile + is_multi_var = True + + grid_tile_data = np.ma.masked_invalid(from_shaped_array(grid_multi_variable_tile.variable_data)) + latitude_data = np.ma.masked_invalid(from_shaped_array(grid_multi_variable_tile.latitude)) + longitude_data = np.ma.masked_invalid(from_shaped_array(grid_multi_variable_tile.longitude)) + + reflected_lon_array = np.broadcast_to(longitude_data, (len(latitude_data), len(longitude_data))) + reflected_lat_array = np.broadcast_to(latitude_data, (len(longitude_data), len(latitude_data))) + reflected_lat_array = np.transpose(reflected_lat_array) + + # If there are 3 dimensions, that means the time dimension + # was squeezed. Add back in + if len(grid_tile_data.shape) == 3: + grid_tile_data = np.expand_dims(grid_tile_data, axis=1) + # If there are 4 dimensions, that means the time dimension + # is present. Move the multivar dimension. + if len(grid_tile_data.shape) == 4: + grid_tile_data = np.moveaxis(grid_tile_data, -1, 0) + + # Extract the meta data + meta_data = {} + for meta_data_obj in grid_multi_variable_tile.meta_data: + name = meta_data_obj.name + meta_array = np.ma.masked_invalid(from_shaped_array(meta_data_obj.meta_data)) + if len(meta_array.shape) == 2: + meta_array = meta_array[np.newaxis, :] + meta_data[name] = meta_array + + return reflected_lat_array, reflected_lon_array, np.array([grid_multi_variable_tile.time]), grid_tile_data, meta_data, is_multi_var + else: + raise NotImplementedError("Only supports grid_tile, swath_tile, swath_multi_variable_tile, and time_series_tile") + + @staticmethod + def _to_standard_index(data_array, desired_shape, is_multi_var=False): + """ + Transform swath data to a standard format where data runs along + diagonal of ND matrix and the non-diagonal data points are + masked + + :param data_array: The data array to be transformed + :param desired_shape: The desired shape of the resulting array + :param is_multi_var: True if this is a multi-variable tile + :type data_array: np.array + :type desired_shape: tuple + :type is_multi_var: bool + :return: Reshaped array + :rtype: np.array + """ + + if desired_shape[0] == 1: + reshaped_array = np.ma.masked_all((desired_shape[1], desired_shape[2])) + row, col = np.indices(data_array.shape) + + reshaped_array[np.diag_indices(desired_shape[1], len(reshaped_array.shape))] = data_array[ + row.flat, col.flat] + reshaped_array.mask[np.diag_indices(desired_shape[1], len(reshaped_array.shape))] = data_array.mask[ + row.flat, col.flat] + reshaped_array = reshaped_array[np.newaxis, :] + elif is_multi_var == True: + # Break the array up by variable. Translate shape from + # len(times) x len(latitudes) x len(longitudes) x num_vars, + # to + # num_vars x len(times) x len(latitudes) x len(longitudes) + reshaped_data_array = np.moveaxis(data_array, -1, 0) + reshaped_array = [] + + for variable_data_array in reshaped_data_array: + variable_reshaped_array = np.ma.masked_all(desired_shape) + row, col = np.indices(variable_data_array.shape) + + variable_reshaped_array[np.diag_indices(desired_shape[1], len(variable_reshaped_array.shape))] = variable_data_array[ + row.flat, col.flat] + variable_reshaped_array.mask[np.diag_indices(desired_shape[1], len(variable_reshaped_array.shape))] = variable_data_array.mask[ + row.flat, col.flat] + reshaped_array.append(variable_reshaped_array) + else: + reshaped_array = np.ma.masked_all(desired_shape) + row, col = np.indices(data_array.shape) + + reshaped_array[np.diag_indices(desired_shape[1], len(reshaped_array.shape))] = data_array[ + row.flat, col.flat] + reshaped_array.mask[np.diag_indices(desired_shape[1], len(reshaped_array.shape))] = data_array.mask[ + row.flat, col.flat] + + return reshaped_array + + +class CassandraSwathProxy(object): + def __init__(self, config): + self.config = config + self.__cass_url = config.get("cassandra", "host") + self.__cass_username = config.get("cassandra", "username") + self.__cass_password = config.get("cassandra", "password") + self.__cass_keyspace = config.get("cassandra", "keyspace") + self.__cass_local_DC = config.get("cassandra", "local_datacenter") + self.__cass_protocol_version = config.getint("cassandra", "protocol_version") + self.__cass_dc_policy = config.get("cassandra", "dc_policy") + + try: + self.__cass_port = config.getint("cassandra", "port") + except NoOptionError: + self.__cass_port = 9042 + + with INIT_LOCK: + try: + connection.get_cluster() + except CQLEngineException: + self.__open() + + def __open(self): + if self.__cass_dc_policy == 'DCAwareRoundRobinPolicy': + dc_policy = DCAwareRoundRobinPolicy(self.__cass_local_DC) + token_policy = TokenAwarePolicy(dc_policy) + elif self.__cass_dc_policy == 'WhiteListRoundRobinPolicy': + token_policy = WhiteListRoundRobinPolicy([self.__cass_url]) + + if self.__cass_username and self.__cass_password: + auth_provider = PlainTextAuthProvider(username=self.__cass_username, password=self.__cass_password) + else: + auth_provider = None + try: + connection.setup( + [host for host in self.__cass_url.split(',')], self.__cass_keyspace, + protocol_version=self.__cass_protocol_version, load_balancing_policy=token_policy, + port=self.__cass_port, + auth_provider=auth_provider + ) + except NoHostAvailable as e: + logger.error("Cassandra is not accessible, SDAP will not server local datasets", e) + + def fetch_nexus_tiles(self, *tile_ids): + tile_ids = [uuid.UUID(str(tile_id)) for tile_id in tile_ids if + (isinstance(tile_id, str) or isinstance(tile_id, str))] + + res = [] + for tile_id in tile_ids: + filterResults = NexusTileData.objects.filter(tile_id=tile_id) + if len(filterResults) > 0: + res.append(filterResults[0]) + + return res diff --git a/data-access/nexustiles/model/nexusmodel.py b/data-access/nexustiles/model/nexusmodel.py index f5c9df64..51793c9b 100644 --- a/data-access/nexustiles/model/nexusmodel.py +++ b/data-access/nexustiles/model/nexusmodel.py @@ -91,6 +91,7 @@ class Tile(object): data: np.array = None is_multi: bool = None meta_data: dict = None + projection: str = 'grid' def __str__(self): return str(self.get_summary()) diff --git a/data-access/nexustiles/nexustiles.py b/data-access/nexustiles/nexustiles.py index d73bb975..9efc08dc 100644 --- a/data-access/nexustiles/nexustiles.py +++ b/data-access/nexustiles/nexustiles.py @@ -17,6 +17,7 @@ import logging import sys import json +import traceback from datetime import datetime from functools import wraps, reduce @@ -27,6 +28,7 @@ from shapely.geometry import MultiPolygon, box from .dao import CassandraProxy +from .dao import CassandraSwathProxy from .dao import DynamoProxy from .dao import S3Proxy from .dao import SolrProxy @@ -40,7 +42,7 @@ level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', datefmt="%Y-%m-%dT%H:%M:%S", stream=sys.stdout) -logger = logging.getLogger("testing") +logger = logging.getLogger(__name__) def tile_data(default_fetch=True): @@ -94,7 +96,10 @@ def __init__(self, skipDatastore=False, skipMetadatastore=False, config=None, de if not skipDatastore: datastore = self._config.get("datastore", "store") if datastore == "cassandra": - self._datastore = CassandraProxy.CassandraProxy(self._config) + if desired_projection == "grid": + self._datastore = CassandraProxy.CassandraProxy(self._config) + else: + self._datastore = CassandraSwathProxy.CassandraSwathProxy(self._config) elif datastore == "s3": self._datastore = S3Proxy.S3Proxy(self._config) elif datastore == "dynamo": @@ -109,6 +114,11 @@ def __init__(self, skipDatastore=False, skipMetadatastore=False, config=None, de elif metadatastore == "elasticsearch": self._metadatastore = ElasticsearchProxy.ElasticsearchProxy(self._config) + logger.info(f'Created new NexusTileService with data store {type(self._datastore)} and metadata ' + f'store {type(self._metadatastore)}') + # logger.info('Traceback for debugging...') + # logger.info(''.join(traceback.format_stack())) + def override_config(self, config): for section in config.sections(): if self._config.has_section(section): # only override preexisting section, ignores the other @@ -476,6 +486,7 @@ def fetch_data_for_tiles(self, *tiles, **kwargs): a_tile.data = data a_tile.meta_data = meta a_tile.is_multi = is_multi_var + a_tile.projection = desired_projection del (tile_data_by_id[a_tile.tile_id]) From f730f7c4d3fd0bc703e8e1d696a0462f40a165a3 Mon Sep 17 00:00:00 2001 From: rileykk Date: Fri, 17 Feb 2023 13:42:54 -0800 Subject: [PATCH 03/25] Fixed error with masking shape with swath projected grid tiles --- data-access/nexustiles/nexustiles.py | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/data-access/nexustiles/nexustiles.py b/data-access/nexustiles/nexustiles.py index 9efc08dc..7f86675f 100644 --- a/data-access/nexustiles/nexustiles.py +++ b/data-access/nexustiles/nexustiles.py @@ -371,9 +371,14 @@ def mask_tiles_to_bbox(self, min_lat, max_lat, min_lon, max_lon, tiles): tile.longitudes = ma.masked_outside(tile.longitudes, min_lon, max_lon) # Or together the masks of the individual arrays to create the new mask - data_mask = ma.getmaskarray(tile.times)[:, np.newaxis, np.newaxis] \ - | ma.getmaskarray(tile.latitudes)[np.newaxis, :, np.newaxis] \ - | ma.getmaskarray(tile.longitudes)[np.newaxis, np.newaxis, :] + if self.desired_projection == 'grid': + data_mask = ma.getmaskarray(tile.times)[:, np.newaxis, np.newaxis] \ + | ma.getmaskarray(tile.latitudes)[np.newaxis, :, np.newaxis] \ + | ma.getmaskarray(tile.longitudes)[np.newaxis, np.newaxis, :] + else: + data_mask = ma.getmaskarray(tile.times)[:, np.newaxis, np.newaxis] \ + | ma.getmaskarray(tile.latitudes)[np.newaxis, :, :] \ + | ma.getmaskarray(tile.longitudes)[np.newaxis, :, :] # If this is multi-var, need to mask each variable separately. if tile.is_multi: From 61f59f8ad2377acbda991df5650a0c0916acb2d4 Mon Sep 17 00:00:00 2001 From: rileykk Date: Fri, 17 Feb 2023 13:45:53 -0800 Subject: [PATCH 04/25] Removed unused method --- .../nexustiles/dao/CassandraSwathProxy.py | 56 ------------------- 1 file changed, 56 deletions(-) diff --git a/data-access/nexustiles/dao/CassandraSwathProxy.py b/data-access/nexustiles/dao/CassandraSwathProxy.py index 7c0581f0..4e295b17 100644 --- a/data-access/nexustiles/dao/CassandraSwathProxy.py +++ b/data-access/nexustiles/dao/CassandraSwathProxy.py @@ -123,7 +123,6 @@ def get_lat_lon_time_data_meta(self, projection='grid'): for meta_data_obj in swath_tile.meta_data: name = meta_data_obj.name actual_meta_array = np.ma.masked_invalid(from_shaped_array(meta_data_obj.meta_data)) - # reshaped_meta_array = self._to_standard_index(actual_meta_array, tile_data.shape) meta_data[name] = actual_meta_array return latitude_data, longitude_data, time_data, tile_data, meta_data, is_multi_var @@ -179,7 +178,6 @@ def get_lat_lon_time_data_meta(self, projection='grid'): for meta_data_obj in swath_tile.meta_data: name = meta_data_obj.name actual_meta_array = np.ma.masked_invalid(from_shaped_array(meta_data_obj.meta_data)) - # reshaped_meta_array = self._to_standard_index(actual_meta_array, tile_data.shape) meta_data[name] = actual_meta_array return latitude_data, longitude_data, time_data, tile_data, meta_data, is_multi_var @@ -217,60 +215,6 @@ def get_lat_lon_time_data_meta(self, projection='grid'): else: raise NotImplementedError("Only supports grid_tile, swath_tile, swath_multi_variable_tile, and time_series_tile") - @staticmethod - def _to_standard_index(data_array, desired_shape, is_multi_var=False): - """ - Transform swath data to a standard format where data runs along - diagonal of ND matrix and the non-diagonal data points are - masked - - :param data_array: The data array to be transformed - :param desired_shape: The desired shape of the resulting array - :param is_multi_var: True if this is a multi-variable tile - :type data_array: np.array - :type desired_shape: tuple - :type is_multi_var: bool - :return: Reshaped array - :rtype: np.array - """ - - if desired_shape[0] == 1: - reshaped_array = np.ma.masked_all((desired_shape[1], desired_shape[2])) - row, col = np.indices(data_array.shape) - - reshaped_array[np.diag_indices(desired_shape[1], len(reshaped_array.shape))] = data_array[ - row.flat, col.flat] - reshaped_array.mask[np.diag_indices(desired_shape[1], len(reshaped_array.shape))] = data_array.mask[ - row.flat, col.flat] - reshaped_array = reshaped_array[np.newaxis, :] - elif is_multi_var == True: - # Break the array up by variable. Translate shape from - # len(times) x len(latitudes) x len(longitudes) x num_vars, - # to - # num_vars x len(times) x len(latitudes) x len(longitudes) - reshaped_data_array = np.moveaxis(data_array, -1, 0) - reshaped_array = [] - - for variable_data_array in reshaped_data_array: - variable_reshaped_array = np.ma.masked_all(desired_shape) - row, col = np.indices(variable_data_array.shape) - - variable_reshaped_array[np.diag_indices(desired_shape[1], len(variable_reshaped_array.shape))] = variable_data_array[ - row.flat, col.flat] - variable_reshaped_array.mask[np.diag_indices(desired_shape[1], len(variable_reshaped_array.shape))] = variable_data_array.mask[ - row.flat, col.flat] - reshaped_array.append(variable_reshaped_array) - else: - reshaped_array = np.ma.masked_all(desired_shape) - row, col = np.indices(data_array.shape) - - reshaped_array[np.diag_indices(desired_shape[1], len(reshaped_array.shape))] = data_array[ - row.flat, col.flat] - reshaped_array.mask[np.diag_indices(desired_shape[1], len(reshaped_array.shape))] = data_array.mask[ - row.flat, col.flat] - - return reshaped_array - class CassandraSwathProxy(object): def __init__(self, config): From 9b4998bd8d2746226115fd973a1960a0548e55ec Mon Sep 17 00:00:00 2001 From: rileykk Date: Fri, 17 Feb 2023 13:46:48 -0800 Subject: [PATCH 05/25] imports --- data-access/nexustiles/nexustiles.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/data-access/nexustiles/nexustiles.py b/data-access/nexustiles/nexustiles.py index 7f86675f..0a42423c 100644 --- a/data-access/nexustiles/nexustiles.py +++ b/data-access/nexustiles/nexustiles.py @@ -14,10 +14,9 @@ # limitations under the License. import configparser +import json import logging import sys -import json -import traceback from datetime import datetime from functools import wraps, reduce @@ -30,10 +29,9 @@ from .dao import CassandraProxy from .dao import CassandraSwathProxy from .dao import DynamoProxy +from .dao import ElasticsearchProxy from .dao import S3Proxy from .dao import SolrProxy -from .dao import ElasticsearchProxy - from .model.nexusmodel import Tile, BBox, TileStats, TileVariable EPOCH = timezone('UTC').localize(datetime(1970, 1, 1)) From 9574aa43034b16bdd0d2803459bb7d2d8dca0bc0 Mon Sep 17 00:00:00 2001 From: rileykk Date: Tue, 21 Feb 2023 11:04:43 -0800 Subject: [PATCH 06/25] tile masking --- data-access/nexustiles/nexustiles.py | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/data-access/nexustiles/nexustiles.py b/data-access/nexustiles/nexustiles.py index 0a42423c..65a14754 100644 --- a/data-access/nexustiles/nexustiles.py +++ b/data-access/nexustiles/nexustiles.py @@ -374,9 +374,14 @@ def mask_tiles_to_bbox(self, min_lat, max_lat, min_lon, max_lon, tiles): | ma.getmaskarray(tile.latitudes)[np.newaxis, :, np.newaxis] \ | ma.getmaskarray(tile.longitudes)[np.newaxis, np.newaxis, :] else: - data_mask = ma.getmaskarray(tile.times)[:, np.newaxis, np.newaxis] \ - | ma.getmaskarray(tile.latitudes)[np.newaxis, :, :] \ - | ma.getmaskarray(tile.longitudes)[np.newaxis, :, :] + if len(tile.times.shape) == 1: + data_mask = ma.getmaskarray(tile.times)[:, np.newaxis, np.newaxis] \ + | ma.getmaskarray(tile.latitudes)[np.newaxis, :, :] \ + | ma.getmaskarray(tile.longitudes)[np.newaxis, :, :] + else: + data_mask = ma.getmaskarray(tile.times) \ + | ma.getmaskarray(tile.latitudes) \ + | ma.getmaskarray(tile.longitudes) # If this is multi-var, need to mask each variable separately. if tile.is_multi: From bba713f9506e056782d86e46bba976338e6099d8 Mon Sep 17 00:00:00 2001 From: rileykk Date: Tue, 21 Feb 2023 11:29:15 -0800 Subject: [PATCH 07/25] Fix for swath masking --- data-access/nexustiles/dao/CassandraSwathProxy.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/data-access/nexustiles/dao/CassandraSwathProxy.py b/data-access/nexustiles/dao/CassandraSwathProxy.py index 4e295b17..40f665cb 100644 --- a/data-access/nexustiles/dao/CassandraSwathProxy.py +++ b/data-access/nexustiles/dao/CassandraSwathProxy.py @@ -112,8 +112,8 @@ def get_lat_lon_time_data_meta(self, projection='grid'): time_data = np.ma.masked_invalid(from_shaped_array(swath_tile.time)) # Simplify the tile if the time dimension is the same value repeated - if np.all(time_data == np.min(time_data)): - time_data = np.array([np.min(time_data)]) + # if np.all(time_data == np.min(time_data)): + # time_data = np.array([np.min(time_data)]) swath_tile_data = np.ma.masked_invalid(from_shaped_array(swath_tile.variable_data)) tile_data = swath_tile_data @@ -161,8 +161,8 @@ def get_lat_lon_time_data_meta(self, projection='grid'): time_data = np.ma.masked_invalid(from_shaped_array(swath_tile.time)) # Simplify the tile if the time dimension is the same value repeated - if np.all(time_data == np.min(time_data)): - time_data = np.array([np.min(time_data)]) + # if np.all(time_data == np.min(time_data)): + # time_data = np.array([np.min(time_data)]) swath_tile_data = np.ma.masked_invalid(from_shaped_array(swath_tile.variable_data)) swath_tile_data = np.moveaxis(swath_tile_data, -1, 0) From 66cf144ecbb703345b7f5fbdb4397e3f1aedb564 Mon Sep 17 00:00:00 2001 From: rileykk Date: Tue, 21 Feb 2023 12:04:47 -0800 Subject: [PATCH 08/25] kwargs for basedomshandler --- analysis/webservice/algorithms/doms/BaseDomsHandler.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/analysis/webservice/algorithms/doms/BaseDomsHandler.py b/analysis/webservice/algorithms/doms/BaseDomsHandler.py index 904732b7..76a563d2 100644 --- a/analysis/webservice/algorithms/doms/BaseDomsHandler.py +++ b/analysis/webservice/algorithms/doms/BaseDomsHandler.py @@ -48,8 +48,8 @@ class BaseDomsQueryCalcHandler(NexusCalcHandler): - def __init__(self, tile_service_factory): - NexusCalcHandler.__init__(self, tile_service_factory) + def __init__(self, tile_service_factory, **kwargs): + NexusCalcHandler.__init__(self, tile_service_factory, **kwargs) def getDataSourceByName(self, source): for s in config.ENDPOINTS: From dde792b5b484de7c2351c798fd182b598c160c4b Mon Sep 17 00:00:00 2001 From: rileykk Date: Tue, 21 Feb 2023 12:10:25 -0800 Subject: [PATCH 09/25] Moved tile data mask or-ing to function --- data-access/nexustiles/nexustiles.py | 43 ++++++++++++++-------------- 1 file changed, 21 insertions(+), 22 deletions(-) diff --git a/data-access/nexustiles/nexustiles.py b/data-access/nexustiles/nexustiles.py index 65a14754..5617ad9b 100644 --- a/data-access/nexustiles/nexustiles.py +++ b/data-access/nexustiles/nexustiles.py @@ -362,26 +362,31 @@ def get_distinct_bounding_boxes_in_polygon(self, bounding_polygon, ds, start_tim bounds = self._metadatastore.find_distinct_bounding_boxes_in_polygon(bounding_polygon, ds, start_time, end_time) return [box(*b) for b in bounds] + def _data_mask_logical_or(self, tile): + # Or together the masks of the individual arrays to create the new mask + if self.desired_projection == 'grid': + data_mask = ma.getmaskarray(tile.times)[:, np.newaxis, np.newaxis] \ + | ma.getmaskarray(tile.latitudes)[np.newaxis, :, np.newaxis] \ + | ma.getmaskarray(tile.longitudes)[np.newaxis, np.newaxis, :] + else: + if len(tile.times.shape) == 1: + data_mask = ma.getmaskarray(tile.times)[:, np.newaxis, np.newaxis] \ + | ma.getmaskarray(tile.latitudes)[np.newaxis, :, :] \ + | ma.getmaskarray(tile.longitudes)[np.newaxis, :, :] + else: + data_mask = ma.getmaskarray(tile.times) \ + | ma.getmaskarray(tile.latitudes) \ + | ma.getmaskarray(tile.longitudes) + + return data_mask + def mask_tiles_to_bbox(self, min_lat, max_lat, min_lon, max_lon, tiles): for tile in tiles: tile.latitudes = ma.masked_outside(tile.latitudes, min_lat, max_lat) tile.longitudes = ma.masked_outside(tile.longitudes, min_lon, max_lon) - # Or together the masks of the individual arrays to create the new mask - if self.desired_projection == 'grid': - data_mask = ma.getmaskarray(tile.times)[:, np.newaxis, np.newaxis] \ - | ma.getmaskarray(tile.latitudes)[np.newaxis, :, np.newaxis] \ - | ma.getmaskarray(tile.longitudes)[np.newaxis, np.newaxis, :] - else: - if len(tile.times.shape) == 1: - data_mask = ma.getmaskarray(tile.times)[:, np.newaxis, np.newaxis] \ - | ma.getmaskarray(tile.latitudes)[np.newaxis, :, :] \ - | ma.getmaskarray(tile.longitudes)[np.newaxis, :, :] - else: - data_mask = ma.getmaskarray(tile.times) \ - | ma.getmaskarray(tile.latitudes) \ - | ma.getmaskarray(tile.longitudes) + data_mask = self._data_mask_logical_or(tile) # If this is multi-var, need to mask each variable separately. if tile.is_multi: @@ -404,10 +409,7 @@ def mask_tiles_to_bbox_and_time(self, min_lat, max_lat, min_lon, max_lon, start_ tile.latitudes = ma.masked_outside(tile.latitudes, min_lat, max_lat) tile.longitudes = ma.masked_outside(tile.longitudes, min_lon, max_lon) - # Or together the masks of the individual arrays to create the new mask - data_mask = ma.getmaskarray(tile.times)[:, np.newaxis, np.newaxis] \ - | ma.getmaskarray(tile.latitudes)[np.newaxis, :, np.newaxis] \ - | ma.getmaskarray(tile.longitudes)[np.newaxis, np.newaxis, :] + data_mask = self._data_mask_logical_or(tile) tile.data = ma.masked_where(data_mask, tile.data) @@ -438,10 +440,7 @@ def mask_tiles_to_time_range(self, start_time, end_time, tiles): for tile in tiles: tile.times = ma.masked_outside(tile.times, start_time, end_time) - # Or together the masks of the individual arrays to create the new mask - data_mask = ma.getmaskarray(tile.times)[:, np.newaxis, np.newaxis] \ - | ma.getmaskarray(tile.latitudes)[np.newaxis, :, np.newaxis] \ - | ma.getmaskarray(tile.longitudes)[np.newaxis, np.newaxis, :] + data_mask = self._data_mask_logical_or(tile) # If this is multi-var, need to mask each variable separately. if tile.is_multi: From d31a054fd5e3e87ed083e2cff75e75fefbd7c663 Mon Sep 17 00:00:00 2001 From: rileykk Date: Tue, 21 Feb 2023 12:15:05 -0800 Subject: [PATCH 10/25] Point generator function --- data-access/nexustiles/model/nexusmodel.py | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/data-access/nexustiles/model/nexusmodel.py b/data-access/nexustiles/model/nexusmodel.py index 51793c9b..1bdb6c6d 100644 --- a/data-access/nexustiles/model/nexusmodel.py +++ b/data-access/nexustiles/model/nexusmodel.py @@ -129,11 +129,16 @@ def get_summary(self): def nexus_point_generator(self, include_nan=False): indices = self.get_indices(include_nan) + idx_len = len(indices[0]) + + time_slice = slice(0, 1) if idx_len == 3 else slice(None) + geo_slice = slice(-2, None) if idx_len == 3 else slice(None) + if include_nan: for index in indices: - time = self.times[index[0]] - lat = self.latitudes[index[1]] - lon = self.longitudes[index[2]] + time = self.times[index[time_slice]] + lat = self.latitudes[index[geo_slice]] + lon = self.longitudes[index[geo_slice]] if self.is_multi: data_vals = [data[index] for data in self.data] else: @@ -143,9 +148,9 @@ def nexus_point_generator(self, include_nan=False): else: for index in indices: index = tuple(index) - time = self.times[index[0]] - lat = self.latitudes[index[1]] - lon = self.longitudes[index[2]] + time = self.times[index[time_slice]] + lat = self.latitudes[index[geo_slice]] + lon = self.longitudes[index[geo_slice]] if self.is_multi: data_vals = [data[index] for data in self.data] else: From 603e2d7bd3c8cd754fa1d7858e5ce059ab02cb87 Mon Sep 17 00:00:00 2001 From: rileykk Date: Tue, 21 Feb 2023 12:33:54 -0800 Subject: [PATCH 11/25] Changelog --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index ae015645..d2ed1517 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added - Deletebyquery: Parameter to set the number of rows to fetch from Solr. Speeds up time to gather tiles to delete; especially when there is a lot of them. ### Changed +- SDAP-440: Set up framework to roll out changes to SDAP algorithms to work with swath formatted data for both tile types rather than having tiles formatted as gridded which is very memory inefficient. ### Deprecated ### Removed ### Fixed From 633101f632995458e9ac98af6893dcef8e3ae73b Mon Sep 17 00:00:00 2001 From: rileykk Date: Fri, 31 Mar 2023 13:09:21 -0700 Subject: [PATCH 12/25] Fixed geo slicing for gridded tiles Maybe... --- data-access/nexustiles/model/nexusmodel.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/data-access/nexustiles/model/nexusmodel.py b/data-access/nexustiles/model/nexusmodel.py index 693bc775..fde49d22 100644 --- a/data-access/nexustiles/model/nexusmodel.py +++ b/data-access/nexustiles/model/nexusmodel.py @@ -129,10 +129,16 @@ def get_summary(self): def nexus_point_generator(self, include_nan=False): indices = self.get_indices(include_nan) + print(indices) + idx_len = len(indices[0]) + print(self.latitudes) + print(self.times) + print(self.data) + time_slice = slice(0, 1) if idx_len == 3 else slice(None) - geo_slice = slice(-2, None) if idx_len == 3 else slice(None) + geo_slice = slice(-1, None) if idx_len == 3 else slice(None) if include_nan: for index in indices: From 0c0fcb32cbf0ebb9acdeb3ba1ce301d7bf6f1554 Mon Sep 17 00:00:00 2001 From: rileykk Date: Fri, 31 Mar 2023 13:15:56 -0700 Subject: [PATCH 13/25] Commented out debug print statements --- data-access/nexustiles/model/nexusmodel.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/data-access/nexustiles/model/nexusmodel.py b/data-access/nexustiles/model/nexusmodel.py index fde49d22..f63465a3 100644 --- a/data-access/nexustiles/model/nexusmodel.py +++ b/data-access/nexustiles/model/nexusmodel.py @@ -129,13 +129,13 @@ def get_summary(self): def nexus_point_generator(self, include_nan=False): indices = self.get_indices(include_nan) - print(indices) + # print(indices) idx_len = len(indices[0]) - print(self.latitudes) - print(self.times) - print(self.data) + # print(self.latitudes) + # print(self.times) + # print(self.data) time_slice = slice(0, 1) if idx_len == 3 else slice(None) geo_slice = slice(-1, None) if idx_len == 3 else slice(None) From 4d547bfabc7a3ff765c21cc5208f593b0b2dc0e7 Mon Sep 17 00:00:00 2001 From: rileykk Date: Fri, 31 Mar 2023 13:59:39 -0700 Subject: [PATCH 14/25] Nexus_point_generator should return empty if there are no valid indices --- data-access/nexustiles/model/nexusmodel.py | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/data-access/nexustiles/model/nexusmodel.py b/data-access/nexustiles/model/nexusmodel.py index f63465a3..5717102d 100644 --- a/data-access/nexustiles/model/nexusmodel.py +++ b/data-access/nexustiles/model/nexusmodel.py @@ -128,18 +128,14 @@ def get_summary(self): def nexus_point_generator(self, include_nan=False): indices = self.get_indices(include_nan) - - # print(indices) - idx_len = len(indices[0]) - # print(self.latitudes) - # print(self.times) - # print(self.data) - time_slice = slice(0, 1) if idx_len == 3 else slice(None) geo_slice = slice(-1, None) if idx_len == 3 else slice(None) + if len(indices) == 0 or (isinstance(indices, np.ndarray) and indices.size == 0): + return + if include_nan: for index in indices: time = self.times[index[time_slice]] From 5d5275c1f5312be234c6dfea71a5fb4a7d117582 Mon Sep 17 00:00:00 2001 From: rileykk Date: Mon, 3 Apr 2023 12:23:46 -0700 Subject: [PATCH 15/25] Fixed slicing for gridded format tile lat&lon in point generator --- data-access/nexustiles/model/nexusmodel.py | 24 +++++++++++++++++----- 1 file changed, 19 insertions(+), 5 deletions(-) diff --git a/data-access/nexustiles/model/nexusmodel.py b/data-access/nexustiles/model/nexusmodel.py index 5717102d..76ec769c 100644 --- a/data-access/nexustiles/model/nexusmodel.py +++ b/data-access/nexustiles/model/nexusmodel.py @@ -131,7 +131,13 @@ def nexus_point_generator(self, include_nan=False): idx_len = len(indices[0]) time_slice = slice(0, 1) if idx_len == 3 else slice(None) - geo_slice = slice(-1, None) if idx_len == 3 else slice(None) + + if len(self.latitudes.shape) == 1: + lat_slice = slice(1,2) + lon_slice = slice(2,3) + else: + lat_slice = slice(None) + lon_slice = slice(None) if len(indices) == 0 or (isinstance(indices, np.ndarray) and indices.size == 0): return @@ -139,8 +145,8 @@ def nexus_point_generator(self, include_nan=False): if include_nan: for index in indices: time = self.times[index[time_slice]] - lat = self.latitudes[index[geo_slice]] - lon = self.longitudes[index[geo_slice]] + lat = self.latitudes[index[lat_slice]] + lon = self.longitudes[index[lon_slice]] if self.is_multi: data_vals = [data[index] for data in self.data] else: @@ -150,9 +156,17 @@ def nexus_point_generator(self, include_nan=False): else: for index in indices: index = tuple(index) + + # print('index', index) + # print('slice', lat_slice, lon_slice, time_slice) + # print('tile_lats', self.latitudes) + # print('tile_lons', self.longitudes) + time = self.times[index[time_slice]] - lat = self.latitudes[index[geo_slice]] - lon = self.longitudes[index[geo_slice]] + lat = self.latitudes[index[lat_slice]] + lon = self.longitudes[index[lon_slice]] + # print('indexed_lats', lat) + # print('indexed_lons', lon) if self.is_multi: data_vals = [data[index] for data in self.data] else: From 18cd582d0ae76a87935aaafe23fac2e12f09951c Mon Sep 17 00:00:00 2001 From: rileykk Date: Thu, 4 May 2023 10:37:26 -0700 Subject: [PATCH 16/25] Fix for nexus_point_generator --- data-access/nexustiles/model/nexusmodel.py | 21 +++++++++------------ 1 file changed, 9 insertions(+), 12 deletions(-) diff --git a/data-access/nexustiles/model/nexusmodel.py b/data-access/nexustiles/model/nexusmodel.py index 76ec769c..ea0b27a7 100644 --- a/data-access/nexustiles/model/nexusmodel.py +++ b/data-access/nexustiles/model/nexusmodel.py @@ -130,14 +130,14 @@ def nexus_point_generator(self, include_nan=False): indices = self.get_indices(include_nan) idx_len = len(indices[0]) - time_slice = slice(0, 1) if idx_len == 3 else slice(None) - - if len(self.latitudes.shape) == 1: - lat_slice = slice(1,2) - lon_slice = slice(2,3) + if self.projection == 'grid': + lat_slice = slice(1, 2) + lon_slice = slice(2, 3) + time_slice = slice(0, 1) else: lat_slice = slice(None) lon_slice = slice(None) + time_slice = slice(None) if len(indices) == 0 or (isinstance(indices, np.ndarray) and indices.size == 0): return @@ -147,30 +147,27 @@ def nexus_point_generator(self, include_nan=False): time = self.times[index[time_slice]] lat = self.latitudes[index[lat_slice]] lon = self.longitudes[index[lon_slice]] + if self.is_multi: data_vals = [data[index] for data in self.data] else: data_vals = self.data[index] + point = NexusPoint(lat, lon, None, time, index, data_vals) yield point else: for index in indices: index = tuple(index) - # print('index', index) - # print('slice', lat_slice, lon_slice, time_slice) - # print('tile_lats', self.latitudes) - # print('tile_lons', self.longitudes) - time = self.times[index[time_slice]] lat = self.latitudes[index[lat_slice]] lon = self.longitudes[index[lon_slice]] - # print('indexed_lats', lat) - # print('indexed_lons', lon) + if self.is_multi: data_vals = [data[index] for data in self.data] else: data_vals = self.data[index] + point = NexusPoint(lat, lon, None, time, index, data_vals) yield point From a3bf4400add215b85b96a5467e69543429ce2870 Mon Sep 17 00:00:00 2001 From: rileykk Date: Thu, 4 May 2023 10:42:12 -0700 Subject: [PATCH 17/25] Updated changelog to reflect post-1.1.0 status --- CHANGELOG.md | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 3449062f..a78036e9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,16 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] ### Added +- SDAP-440: Added `CassandraSwathProxy` to process tiles in a method optimized for swath format +- SDAP-440: Set up framework to roll out changes to SDAP algorithms to work with swath formatted data for both tile types rather than having tiles formatted as gridded which is very memory inefficient. +### Changed +### Deprecated +### Removed +### Fixed +### Security + +## [1.1.0] - 2023-04-26 +### Added - Deletebyquery: Parameter to set the number of rows to fetch from Solr. Speeds up time to gather tiles to delete; especially when there is a lot of them. - Added Saildrone's `baja_2018` insitu dataset. - SDAP-454: Added new query parameter `prioritizeDistance` to matchup algorithm @@ -20,7 +30,6 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - `platforms` parameter in `/match_spark` is now a multi-select list. - Added note to `/stats` endpoint to note it is limited to satellite datasets - SDAP-450: Updated helm chart to reflect k8s 1.22 changes. Bumped RMQ dependency version & updated Bitnami dependency chart URLs. Ingress template is already up to date. -- SDAP-440: Set up framework to roll out changes to SDAP algorithms to work with swath formatted data for both tile types rather than having tiles formatted as gridded which is very memory inefficient. ### Deprecated ### Removed ### Fixed From f42e4540f44871e5ccedd5f7d5e47afe98ff3f40 Mon Sep 17 00:00:00 2001 From: rileykk Date: Thu, 4 May 2023 10:45:34 -0700 Subject: [PATCH 18/25] logging --- data-access/nexustiles/nexustiles.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/data-access/nexustiles/nexustiles.py b/data-access/nexustiles/nexustiles.py index 5617ad9b..fd8c730d 100644 --- a/data-access/nexustiles/nexustiles.py +++ b/data-access/nexustiles/nexustiles.py @@ -113,9 +113,7 @@ def __init__(self, skipDatastore=False, skipMetadatastore=False, config=None, de self._metadatastore = ElasticsearchProxy.ElasticsearchProxy(self._config) logger.info(f'Created new NexusTileService with data store {type(self._datastore)} and metadata ' - f'store {type(self._metadatastore)}') - # logger.info('Traceback for debugging...') - # logger.info(''.join(traceback.format_stack())) + f'store {type(self._metadatastore)}. Desired projection: {desired_projection}') def override_config(self, config): for section in config.sections(): From f15d580188ff7a29ee49f2854bf04e304a438dd1 Mon Sep 17 00:00:00 2001 From: rileykk Date: Thu, 4 May 2023 10:55:47 -0700 Subject: [PATCH 19/25] Restrict desired_projection to grid or swath --- data-access/nexustiles/nexustiles.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/data-access/nexustiles/nexustiles.py b/data-access/nexustiles/nexustiles.py index fd8c730d..c39537e0 100644 --- a/data-access/nexustiles/nexustiles.py +++ b/data-access/nexustiles/nexustiles.py @@ -86,6 +86,12 @@ def __init__(self, skipDatastore=False, skipMetadatastore=False, config=None, de self._config = configparser.RawConfigParser() self._config.read(NexusTileService._get_config_files('config/datastores.ini')) + if desired_projection not in ['grid', 'swath']: + raise ValueError(f'Invalid value provided for NexusTileService desired_projection: {desired_projection}') + # warnings.warn(f'Invalid value provided for NexusTileService desired_projection: {desired_projection}. ' + # f'Defaulting to \'grid\'') + # desired_projection = 'grid' + self.desired_projection = desired_projection if config: From c1c9299a913ff811468cd395de7c3371fc2eed9d Mon Sep 17 00:00:00 2001 From: rileykk Date: Tue, 9 May 2023 12:51:26 -0700 Subject: [PATCH 20/25] 440 - Fixed masking and grid handling --- data-access/nexustiles/dao/CassandraSwathProxy.py | 10 ++++++---- data-access/nexustiles/model/nexusmodel.py | 2 +- data-access/nexustiles/nexustiles.py | 5 ++++- 3 files changed, 11 insertions(+), 6 deletions(-) diff --git a/data-access/nexustiles/dao/CassandraSwathProxy.py b/data-access/nexustiles/dao/CassandraSwathProxy.py index 40f665cb..06f84f34 100644 --- a/data-access/nexustiles/dao/CassandraSwathProxy.py +++ b/data-access/nexustiles/dao/CassandraSwathProxy.py @@ -91,8 +91,10 @@ def get_lat_lon_time_data_meta(self, projection='grid'): reflected_lat_array = np.broadcast_to(latitude_data, (len(longitude_data), len(latitude_data))) reflected_lat_array = np.transpose(reflected_lat_array) - if len(grid_tile_data.shape) == 2: - grid_tile_data = grid_tile_data[np.newaxis, :] + time_array = np.broadcast_to(grid_tile.time, grid_tile_data.shape) + + # if len(grid_tile_data.shape) == 2: + # grid_tile_data = grid_tile_data[np.newaxis, :] # Extract the meta data meta_data = {} @@ -103,7 +105,7 @@ def get_lat_lon_time_data_meta(self, projection='grid'): meta_array = meta_array[np.newaxis, :] meta_data[name] = meta_array - return reflected_lat_array, reflected_lon_array, np.array([grid_tile.time]), grid_tile_data, meta_data, is_multi_var + return reflected_lat_array, reflected_lon_array, time_array, grid_tile_data, meta_data, is_multi_var elif self._get_nexus_tile().HasField('swath_tile'): swath_tile = self._get_nexus_tile().swath_tile @@ -180,7 +182,7 @@ def get_lat_lon_time_data_meta(self, projection='grid'): actual_meta_array = np.ma.masked_invalid(from_shaped_array(meta_data_obj.meta_data)) meta_data[name] = actual_meta_array - return latitude_data, longitude_data, time_data, tile_data, meta_data, is_multi_var + return latitude_data, longitude_data, time_data, np.ma.array(tile_data), meta_data, is_multi_var elif self._get_nexus_tile().HasField('grid_multi_variable_tile'): grid_multi_variable_tile = self._get_nexus_tile().grid_multi_variable_tile is_multi_var = True diff --git a/data-access/nexustiles/model/nexusmodel.py b/data-access/nexustiles/model/nexusmodel.py index ea0b27a7..1b4a5124 100644 --- a/data-access/nexustiles/model/nexusmodel.py +++ b/data-access/nexustiles/model/nexusmodel.py @@ -175,7 +175,7 @@ def get_indices(self, include_nan=False): if include_nan: return list(np.ndindex(self.data.shape)) if self.is_multi: - combined_data_inv_mask = reduce(np.logical_and, [data.mask for data in self.data]) + combined_data_inv_mask = reduce(np.logical_and, [np.ma.getmaskarray(data) for data in self.data]) return np.argwhere(np.logical_not(combined_data_inv_mask)) else: return np.transpose(np.where(np.ma.getmaskarray(self.data) == False)).tolist() diff --git a/data-access/nexustiles/nexustiles.py b/data-access/nexustiles/nexustiles.py index c39537e0..fddb703d 100644 --- a/data-access/nexustiles/nexustiles.py +++ b/data-access/nexustiles/nexustiles.py @@ -395,7 +395,10 @@ def mask_tiles_to_bbox(self, min_lat, max_lat, min_lon, max_lon, tiles): # If this is multi-var, need to mask each variable separately. if tile.is_multi: # Combine space/time mask with existing mask on data - data_mask = reduce(np.logical_or, [tile.data[0].mask, data_mask]) + # Data masks are ANDed because we want to mask out only when ALL data vars are invalid + combined_data_mask = reduce(np.logical_and, [d.mask for d in tile.data]) + # We now OR in the bounds mask because out of bounds data must be excluded regardless of validity + data_mask = np.logical_or(combined_data_mask, data_mask) num_vars = len(tile.data) multi_data_mask = np.repeat(data_mask[np.newaxis, ...], num_vars, axis=0) From 28bc593b2c007715dce9be643cb74c9da9c768cf Mon Sep 17 00:00:00 2001 From: rileykk Date: Tue, 9 May 2023 13:17:40 -0700 Subject: [PATCH 21/25] Cleanup --- data-access/nexustiles/model/nexusmodel.py | 1 - data-access/nexustiles/nexustiles.py | 3 --- 2 files changed, 4 deletions(-) diff --git a/data-access/nexustiles/model/nexusmodel.py b/data-access/nexustiles/model/nexusmodel.py index 1b4a5124..dcf4b9f9 100644 --- a/data-access/nexustiles/model/nexusmodel.py +++ b/data-access/nexustiles/model/nexusmodel.py @@ -128,7 +128,6 @@ def get_summary(self): def nexus_point_generator(self, include_nan=False): indices = self.get_indices(include_nan) - idx_len = len(indices[0]) if self.projection == 'grid': lat_slice = slice(1, 2) diff --git a/data-access/nexustiles/nexustiles.py b/data-access/nexustiles/nexustiles.py index fddb703d..500c8672 100644 --- a/data-access/nexustiles/nexustiles.py +++ b/data-access/nexustiles/nexustiles.py @@ -88,9 +88,6 @@ def __init__(self, skipDatastore=False, skipMetadatastore=False, config=None, de if desired_projection not in ['grid', 'swath']: raise ValueError(f'Invalid value provided for NexusTileService desired_projection: {desired_projection}') - # warnings.warn(f'Invalid value provided for NexusTileService desired_projection: {desired_projection}. ' - # f'Defaulting to \'grid\'') - # desired_projection = 'grid' self.desired_projection = desired_projection From 75571fa8b0f6d7b566a7e3c916f0aac8f8c6f59b Mon Sep 17 00:00:00 2001 From: rileykk Date: Tue, 9 May 2023 14:12:50 -0700 Subject: [PATCH 22/25] 440 - Ensure tiles are fetched in proper projection --- data-access/nexustiles/nexustiles.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/data-access/nexustiles/nexustiles.py b/data-access/nexustiles/nexustiles.py index 500c8672..3da8d1a7 100644 --- a/data-access/nexustiles/nexustiles.py +++ b/data-access/nexustiles/nexustiles.py @@ -484,7 +484,7 @@ def fetch_data_for_tiles(self, *tiles, **kwargs): if len(missing_data) > 0: raise Exception("Missing data for tile_id(s) %s." % missing_data) - desired_projection = kwargs['desired_projection'] if 'desired_projection' in kwargs else 'grid' + desired_projection = kwargs['desired_projection'] if 'desired_projection' in kwargs else self.desired_projection for a_tile in tiles: lats, lons, times, data, meta, is_multi_var = tile_data_by_id[a_tile.tile_id].get_lat_lon_time_data_meta( From 9862ffc869f5a6c265e0d60e8040803bbfae949b Mon Sep 17 00:00:00 2001 From: rileykk Date: Tue, 16 May 2023 11:46:39 -0700 Subject: [PATCH 23/25] Changelog --- CHANGELOG.md | 10 +--------- 1 file changed, 1 insertion(+), 9 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index eb71d539..9df55a1d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,17 +4,9 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). -## [1.1.0] - 2023-04-26 +## Unreleased ### Added - SDAP-461: Added 4 remaining Saildrone insitu datasets. -### Changed -### Deprecated -### Removed -### Fixed -### Security - -## [1.1.0] - 2023-04-26 -### Added - SDAP-440: Added `CassandraSwathProxy` to process tiles in a method optimized for swath format - SDAP-440: Set up framework to roll out changes to SDAP algorithms to work with swath formatted data for both tile types rather than having tiles formatted as gridded which is very memory inefficient. ### Changed From d1daece77875e98966b48ce1eb72be1db79021ee Mon Sep 17 00:00:00 2001 From: rileykk Date: Tue, 8 Aug 2023 14:33:13 -0700 Subject: [PATCH 24/25] 440 - GridMulti fix? --- data-access/nexustiles/dao/CassandraSwathProxy.py | 7 +++++-- data-access/nexustiles/model/nexusmodel.py | 15 ++++++++++----- data-access/nexustiles/nexustiles.py | 2 +- 3 files changed, 16 insertions(+), 8 deletions(-) diff --git a/data-access/nexustiles/dao/CassandraSwathProxy.py b/data-access/nexustiles/dao/CassandraSwathProxy.py index 06f84f34..36a108b2 100644 --- a/data-access/nexustiles/dao/CassandraSwathProxy.py +++ b/data-access/nexustiles/dao/CassandraSwathProxy.py @@ -195,10 +195,13 @@ def get_lat_lon_time_data_meta(self, projection='grid'): reflected_lat_array = np.broadcast_to(latitude_data, (len(longitude_data), len(latitude_data))) reflected_lat_array = np.transpose(reflected_lat_array) + time = np.array([grid_multi_variable_tile.time]) + reflected_time_array = np.broadcast_to(time, (len(latitude_data), len(longitude_data))) + # If there are 3 dimensions, that means the time dimension # was squeezed. Add back in if len(grid_tile_data.shape) == 3: - grid_tile_data = np.expand_dims(grid_tile_data, axis=1) + grid_tile_data = np.expand_dims(grid_tile_data, axis=0) # If there are 4 dimensions, that means the time dimension # is present. Move the multivar dimension. if len(grid_tile_data.shape) == 4: @@ -213,7 +216,7 @@ def get_lat_lon_time_data_meta(self, projection='grid'): meta_array = meta_array[np.newaxis, :] meta_data[name] = meta_array - return reflected_lat_array, reflected_lon_array, np.array([grid_multi_variable_tile.time]), grid_tile_data, meta_data, is_multi_var + return reflected_lat_array, reflected_lon_array, reflected_time_array, grid_tile_data, meta_data, is_multi_var else: raise NotImplementedError("Only supports grid_tile, swath_tile, swath_multi_variable_tile, and time_series_tile") diff --git a/data-access/nexustiles/model/nexusmodel.py b/data-access/nexustiles/model/nexusmodel.py index dcf4b9f9..b9b7056d 100644 --- a/data-access/nexustiles/model/nexusmodel.py +++ b/data-access/nexustiles/model/nexusmodel.py @@ -129,17 +129,22 @@ def get_summary(self): def nexus_point_generator(self, include_nan=False): indices = self.get_indices(include_nan) + if len(indices) == 0 or (isinstance(indices, np.ndarray) and indices.size == 0): + return if self.projection == 'grid': lat_slice = slice(1, 2) lon_slice = slice(2, 3) time_slice = slice(0, 1) else: - lat_slice = slice(None) - lon_slice = slice(None) - time_slice = slice(None) + def slice_for_var(v): + if len(v.shape) < len(indices[0]): + return slice(-len(v.shape), -1) + else: + return None - if len(indices) == 0 or (isinstance(indices, np.ndarray) and indices.size == 0): - return + lat_slice = slice_for_var(self.latitudes) + lon_slice = slice_for_var(self.longitudes) + time_slice = slice_for_var(self.times) if include_nan: for index in indices: diff --git a/data-access/nexustiles/nexustiles.py b/data-access/nexustiles/nexustiles.py index 3da8d1a7..9dd3bf80 100644 --- a/data-access/nexustiles/nexustiles.py +++ b/data-access/nexustiles/nexustiles.py @@ -393,7 +393,7 @@ def mask_tiles_to_bbox(self, min_lat, max_lat, min_lon, max_lon, tiles): if tile.is_multi: # Combine space/time mask with existing mask on data # Data masks are ANDed because we want to mask out only when ALL data vars are invalid - combined_data_mask = reduce(np.logical_and, [d.mask for d in tile.data]) + combined_data_mask = reduce(np.logical_and, [ma.getmaskarray(d) for d in tile.data]) # We now OR in the bounds mask because out of bounds data must be excluded regardless of validity data_mask = np.logical_or(combined_data_mask, data_mask) From 03efb50334393ac7d8f86de9e58bf8436d1eadbe Mon Sep 17 00:00:00 2001 From: rileykk Date: Wed, 9 Aug 2023 09:04:26 -0700 Subject: [PATCH 25/25] GridMulti and not all vars unmasked --- .../nexustiles/dao/CassandraSwathProxy.py | 5 ++++ data-access/nexustiles/model/nexusmodel.py | 26 +++++++++++-------- 2 files changed, 20 insertions(+), 11 deletions(-) diff --git a/data-access/nexustiles/dao/CassandraSwathProxy.py b/data-access/nexustiles/dao/CassandraSwathProxy.py index 36a108b2..dcfbc47e 100644 --- a/data-access/nexustiles/dao/CassandraSwathProxy.py +++ b/data-access/nexustiles/dao/CassandraSwathProxy.py @@ -195,9 +195,14 @@ def get_lat_lon_time_data_meta(self, projection='grid'): reflected_lat_array = np.broadcast_to(latitude_data, (len(longitude_data), len(latitude_data))) reflected_lat_array = np.transpose(reflected_lat_array) + reflected_lat_array = np.expand_dims(reflected_lat_array, axis=0) + reflected_lon_array = np.expand_dims(reflected_lon_array, axis=0) + time = np.array([grid_multi_variable_tile.time]) reflected_time_array = np.broadcast_to(time, (len(latitude_data), len(longitude_data))) + reflected_time_array = np.expand_dims(reflected_time_array, axis=0) + # If there are 3 dimensions, that means the time dimension # was squeezed. Add back in if len(grid_tile_data.shape) == 3: diff --git a/data-access/nexustiles/model/nexusmodel.py b/data-access/nexustiles/model/nexusmodel.py index b9b7056d..a5f67058 100644 --- a/data-access/nexustiles/model/nexusmodel.py +++ b/data-access/nexustiles/model/nexusmodel.py @@ -136,15 +136,9 @@ def nexus_point_generator(self, include_nan=False): lon_slice = slice(2, 3) time_slice = slice(0, 1) else: - def slice_for_var(v): - if len(v.shape) < len(indices[0]): - return slice(-len(v.shape), -1) - else: - return None - - lat_slice = slice_for_var(self.latitudes) - lon_slice = slice_for_var(self.longitudes) - time_slice = slice_for_var(self.times) + lat_slice = slice(None) + lon_slice = slice(None) + time_slice = slice(None) if include_nan: for index in indices: @@ -153,7 +147,12 @@ def slice_for_var(v): lon = self.longitudes[index[lon_slice]] if self.is_multi: - data_vals = [data[index] for data in self.data] + data_vals = [] + + for data in self.data: + val = data[index] + + data_vals.append(val if val is not np.ma.masked else np.nan) else: data_vals = self.data[index] @@ -168,7 +167,12 @@ def slice_for_var(v): lon = self.longitudes[index[lon_slice]] if self.is_multi: - data_vals = [data[index] for data in self.data] + data_vals = [] + + for data in self.data: + val = data[index] + + data_vals.append(val if val is not np.ma.masked else np.nan) else: data_vals = self.data[index]