diff --git a/CHANGELOG.md b/CHANGELOG.md index d2701e09..d28f78f7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - SDAP-473: Added support for matchup job prioritization - SDAP-483: Added `.asf.yaml` to configure Jira auto-linking. - SDAP-487: Added script to migrate existing `doms.doms_data` data to new schema. +- SDAP-401: Added JSON output formatting for use with COVERAGE to `/match_spark` ### Changed - SDAP-453: Updated results storage and retrieval to support output JSON from `/cdmsresults` that matches output from `/match_spark`. - **NOTE:** Deploying these changes to an existing SDAP deployment will require modifying the Cassandra database with stored results. There is a script to do so at `/tools/update-doms-data-schema/update.py` diff --git a/analysis/webservice/algorithms/doms/BaseDomsHandler.py b/analysis/webservice/algorithms/doms/BaseDomsHandler.py index 84c91633..02e5e178 100644 --- a/analysis/webservice/algorithms/doms/BaseDomsHandler.py +++ b/analysis/webservice/algorithms/doms/BaseDomsHandler.py @@ -113,6 +113,9 @@ def toCSV(self): def toNetCDF(self): return DomsNetCDFFormatter.create(self.__executionId, self.results(), self.__args, self.__details) + def toCAML(self, request): + return DomsCAMLFormatter.create(self.__executionId, self.results(), self.__args, self.__details, request) + def filename(self): return f'CDMS_{self.__executionId}' @@ -600,3 +603,453 @@ def __enrichDepth(var, var_min, var_max): var.axis = "Z" var.positive = "Down" + +class DomsCAMLFormatter: + @staticmethod + def create(executionId, results, params, details, request): + import copy + + def empty_if_none(string): + if string is not None: + return string + else: + return "" + + def datetime_to_iso(dt): + return dt.isoformat() + "+0000" + + def get_match_by_variable_name(matches, variable_name): + m = [m for m in matches if m['cf_variable_name'] == variable_name] + return m[0] + + def round_down_day(dt): + return datetime(*dt.timetuple()[:3]) + + query = {} + result = {} + caml_params = params['caml_params'] + + query['apiRequest'] = f"{request.protocol}://{request.host}{request.uri}" + query['analysisName'] = 'colocation_trajectory' + query['primaryName'] = params['primary'] + query['secondaryName'] = params['matchup'] + + b = params['bbox'].split(',') + + query['bounds'] = { + "lon_min": float(b[0]), + "lon_max": float(b[2]), + "lat_min": float(b[1]), + "lat_max": float(b[3]), + "time_start": datetime_to_iso(params['startTime']), + "time_end": datetime_to_iso(params['endTime']) + } + + variables = [] + charts = [] + + if len(results) > 0: + try: + variables.append({ + "object": "primary", + "name": caml_params['primary'], + "units": empty_if_none(get_match_by_variable_name(results[0]['primary'], caml_params['primary'])["variable_unit"]) + }) + except KeyError: + variables.append({ + "object": "primary", + "name": caml_params['primary'], + "units": "" + }) + + results.sort(key=lambda e: e['time']) + + try: + variables.append({ + "object": "secondary", + "name": caml_params['secondary'], + "units": empty_if_none(get_match_by_variable_name(results[0]['matches'][0]['secondary'], caml_params['secondary'])["variable_unit"]) + }) + except KeyError: + variables.append({ + "object": "secondary", + "name": caml_params['secondary'], + "units": "" + }) + + result['variable'] = variables + + data = [] + + if caml_params['charts']['time_series']: + data = [[], []] + + for r in results: + secondary = None + secondary_match = None + + for s in r['matches']: + try: + if caml_params['format'] == 'Matchup': + secondary_match = get_match_by_variable_name(s['secondary'], caml_params['secondary']) + secondary = s + break + else: + if caml_params['secondary'] in s: + secondary = s + secondary_match = {'variable_value': s[caml_params['secondary']]} + break + except: + pass + + if secondary is None: + continue + + if caml_params['format'] == 'Matchup': + data[0].append([datetime_to_iso(r['time']), get_match_by_variable_name(r['primary'], caml_params['primary'])['variable_value']]) + else: + data[0].append([datetime_to_iso(r['time']), r[caml_params['primary']]]) + data[1].append([datetime_to_iso(secondary['time']), secondary_match['variable_value']]) + + data[0].sort(key=lambda e: (e[0], e[1])) + data[1].sort(key=lambda e: (e[0], e[1])) + + charts.append({ + "object": ["primary"], + "type": "xy_line_point", + "title": "Time Series", + "xAxis_label": 'Time', + "yAxis_label": f"{variables[0]['name']} ({variables[0]['units']})", + "xAxis_type": "datetime", + "yAxis_type": "number", + "xySeries_data": copy.deepcopy(data[0]), + "xySeries_labels": [query["primaryName"]] + }) + + charts.append({ + "object": ["secondary"], + "type": "xy_line_point", + "title": "Time Series", + "xAxis_label": 'Time', + "yAxis_label": f"{variables[1]['name']} ({variables[1]['units']})", + "xAxis_type": "datetime", + "yAxis_type": "number", + "xySeries_data": copy.deepcopy(data[1]), + "xySeries_labels": [query["secondaryName"]] + }) + + data.clear() + + if caml_params['charts']['scatter']: + for r in results: + secondary_match = None + + for s in r['matches']: + try: + if caml_params['format'] == 'Matchup': + secondary_match = get_match_by_variable_name(s['secondary'], caml_params['secondary']) + break + else: + if caml_params['secondary'] in s: + secondary_match = {'variable_value': s[caml_params['secondary']]} + break + except: + pass + + if secondary_match is None: + continue + + data.append( + [ + get_match_by_variable_name(r['primary'], caml_params['primary'])['variable_value'] + if caml_params['format'] == 'Matchup' else r[caml_params['primary']], + secondary_match['variable_value'] + ] + ) + + data.sort(key=lambda e: (e[0], e[1])) + + charts.append({ + "object": ["primary", "secondary"], + "type": "xy_scatter_point", + "title": "Scatter Plot", + "xAxis_label": f"{query['primaryName']} ({variables[0]['units']})", + "yAxis_label": f"{query['secondaryName']} ({variables[1]['units']})", + "xAxis_type": "number", + "yAxis_type": "number", + "xySeries_data": copy.deepcopy(data), + "xySeries_labels": f"{variables[0]['name']} x {variables[1]['name']}" + }) + + data.clear() + + if caml_params['charts']['histogram_primary'] or caml_params['charts']['histogram_secondary']: + primary_histdata = {} + secondary_histdata = {} + + for r in results: + secondary = None + secondary_match = None + + for s in r['matches']: + try: + if caml_params['format'] == 'Matchup': + secondary_match = get_match_by_variable_name(s['secondary'], caml_params['secondary']) + secondary = s + break + else: + if caml_params['secondary'] in s: + secondary = s + secondary_match = {'variable_value': s[caml_params['secondary']]} + break + except: + pass + + if secondary is None: + continue + + pts = 'fixed' + sts = 'fixed' + + if pts not in primary_histdata: + primary_histdata[pts] = { + 'data': [], + 'hist': None + } + + if sts not in secondary_histdata: + secondary_histdata[sts] = { + 'data': [], + 'hist': None + } + + if caml_params['format'] == 'Matchup': + primary_histdata[pts]['data'].append(get_match_by_variable_name(r['primary'], caml_params['primary'])['variable_value']) + else: + primary_histdata[pts]['data'].append(r[caml_params['primary']]) + secondary_histdata[sts]['data'].append(secondary_match['variable_value']) + + if "histogram_bins" in caml_params: + bins = caml_params['histogram_bins'] + else: + bins = [-5, 0, 5, 10, 15, 20, 25, 30, 35, 40, 45, 50, 55] + + for d in primary_histdata: + hist, _ = np.histogram(primary_histdata[d]['data'], bins=bins, density=False) + h = [] + + for i in range(len(hist)): + h.append([bins[i], int(hist[i])]) + + primary_histdata[d]['hist'] = copy.deepcopy(h) + + for d in secondary_histdata: + hist, _ = np.histogram(secondary_histdata[d]['data'], bins=bins, density=False) + h = [] + + for i in range(len(hist)): + h.append([bins[i], int(hist[i])]) + + secondary_histdata[d]['hist'] = copy.deepcopy(h) + + if caml_params['charts']['histogram_primary']: + for d in primary_histdata: + for bin in primary_histdata[d]['hist']: + data.append(bin) + + charts.append({ + "object": ["primary"], + "type": "histogram", + "title": "Frequency Distribution", + "xAxis_label": f"{variables[0]['name']} ({variables[0]['units']})", + "yAxis_label": "frequency (count)", + "xAxis_type": "number", + "yAxis_type": "number", + "xySeries_data": copy.deepcopy(data), + "xySeries_labels": [query['primaryName']] + }) + + data.clear() + + if caml_params['charts']['histogram_secondary']: + for d in secondary_histdata: + for bin in secondary_histdata[d]['hist']: + data.append(bin) + + charts.append({ + "object": ["secondary"], + "type": "histogram", + "title": "Frequency Distribution", + "xAxis_label": f"{variables[1]['name']} ({variables[1]['units']})", + "yAxis_label": "frequency (count)", + "xAxis_type": "number", + "yAxis_type": "number", + "xySeries_data": copy.deepcopy(data), + "xySeries_labels": [query['secondaryName']] + }) + + data.clear() + + if caml_params['charts']['histogram_primary_timeseries'] or \ + caml_params['charts']['histogram_secondary_timeseries']: + primary_histdata = {} + secondary_histdata = {} + + for r in results: + secondary = None + secondary_match = None + + for s in r['matches']: + try: + if caml_params['format'] == 'Matchup': + secondary_match = get_match_by_variable_name(s['secondary'], caml_params['secondary']) + secondary = s + break + else: + if caml_params['secondary'] in s: + secondary = s + secondary_match = {'variable_value': s[caml_params['secondary']]} + break + except: + pass + + if secondary is None: + continue + + pts = datetime_to_iso(round_down_day(r['time'])) + sts = datetime_to_iso(round_down_day(secondary['time'])) + + if pts not in primary_histdata: + primary_histdata[pts] = { + 'data': [], + 'hist': None + } + + if sts not in secondary_histdata: + secondary_histdata[sts] = { + 'data': [], + 'hist': None + } + + if caml_params['format'] == 'Matchup': + primary_histdata[pts]['data'].append(get_match_by_variable_name(r['primary'], caml_params['primary'])['variable_value']) + else: + primary_histdata[pts]['data'].append(r[caml_params['primary']]) + secondary_histdata[sts]['data'].append(secondary_match['variable_value']) + + if "histogram_bins" in caml_params: + bins = caml_params['histogram_bins'] + else: + bins = [-5, 0, 5, 10, 15, 20, 25, 30, 35, 40, 45, 50, 55] + + for d in primary_histdata: + hist, _ = np.histogram(primary_histdata[d]['data'], bins=bins, density=False) + h = [] + + for i in range(len(hist)): + h.append([bins[i], int(hist[i])]) + + primary_histdata[d]['hist'] = copy.deepcopy(h) + + for d in secondary_histdata: + hist, _ = np.histogram(secondary_histdata[d]['data'], bins=bins, density=False) + h = [] + + for i in range(len(hist)): + h.append([bins[i], int(hist[i])]) + + secondary_histdata[d]['hist'] = copy.deepcopy(h) + + if caml_params['charts']['histogram_primary_timeseries']: + for d in primary_histdata: + d_hist = [d] + + for bin in primary_histdata[d]['hist']: + d_hist.append(bin) + + data.append([d_hist]) + + charts.append({ + "object": ["primary"], + "type": "histogram_timeseries", + "title": "Frequency Distribution over Time", + "xAxis_label": f"{variables[0]['name']} ({variables[0]['units']})", + "yAxis_label": "frequency (count)", + "xAxis_type": "number", + "yAxis_type": "number", + "xySeries_data": copy.deepcopy(data), + "xySeries_labels": [query['primaryName']] + }) + + data.clear() + + if caml_params['charts']['histogram_secondary_timeseries']: + for d in secondary_histdata: + d_hist = [d] + + for bin in secondary_histdata[d]['hist']: + d_hist.append(bin) + + data.append([d_hist]) + + charts.append({ + "object": ["secondary"], + "type": "histogram_timeseries", + "title": "Frequency Distribution over Time", + "xAxis_label": f"{variables[1]['name']} ({variables[1]['units']})", + "yAxis_label": "frequency (count)", + "xAxis_type": "number", + "yAxis_type": "number", + "xySeries_data": copy.deepcopy(data), + "xySeries_labels": [query['secondaryName']] + }) + + data.clear() + + if caml_params['charts']['trajectory']: + for r in results: + secondary = None + secondary_match = None + + for s in r['matches']: + try: + if caml_params['format'] == 'Matchup': + secondary_match = get_match_by_variable_name(s['secondary'], caml_params['secondary']) + secondary = s + break + else: + if caml_params['secondary'] in s: + secondary = s + secondary_match = {'variable_value': s[caml_params['secondary']]} + break + except: + pass + + if secondary_match is None: + continue + + primary = get_match_by_variable_name(r['primary'], caml_params['primary']) if caml_params['format'] == 'Matchup' else {'variable_value': r[caml_params['primary']]} + + data.append([ + datetime_to_iso(secondary['time']), + [float(secondary['lat']), float(secondary['lon'])], + primary['variable_value'] - secondary_match['variable_value'] + ]) + + data.sort(key=lambda e: (e[0], e[1][0], e[1][1], e[2])) + + charts.append({ + "object": ["secondary"], + "type": "trajectory", + "title": "Along track colocation differences", + "colorbar_label": f"{variables[1]['name']} ({variables[1]['units']})", + "xySeries_data": copy.deepcopy(data), + }) + + result['chart'] = charts + + return json.dumps( + {'executionId': executionId, 'query': query, 'result': result, 'params': params}, + indent=4, + cls=DomsEncoder + ) diff --git a/analysis/webservice/algorithms/doms/ResultsRetrieval.py b/analysis/webservice/algorithms/doms/ResultsRetrieval.py index f03c1caa..aa175435 100644 --- a/analysis/webservice/algorithms/doms/ResultsRetrieval.py +++ b/analysis/webservice/algorithms/doms/ResultsRetrieval.py @@ -19,6 +19,21 @@ from . import ResultsStorage from webservice.NexusHandler import nexus_handler from webservice.webmodel import NexusProcessingException +from webservice.algorithms.doms.insitu import query_insitu_schema + + +class Schema: + def __init__(self): + self.schema = None + + def get(self): + if self.schema is None: + self.schema = query_insitu_schema() + + return self.schema + + +insitu_schema = Schema() @nexus_handler @@ -34,10 +49,106 @@ def __init__(self, tile_service_factory, config=None): self.config = config def calc(self, computeOptions, **args): + from webservice.algorithms_spark.Matchup import get_insitu_params + execution_id = computeOptions.get_argument("id", None) page_num = computeOptions.get_int_arg('pageNum', default=1) page_size = computeOptions.get_int_arg('pageSize', default=1000) + caml_params = {} + + output_type = computeOptions.get_argument("output", default='JSON') + + if output_type == 'CAML': + primary = computeOptions.get_argument("camlPrimary") + if primary is None: + raise NexusProcessingException( + reason="Primary dataset argument is required when outputting in CAML format", code=400) + + secondary = computeOptions.get_argument("camlSecondary") + if secondary is None: + raise NexusProcessingException( + reason="Secondary dataset argument is required when outputting in CAML format", code=400) + + insitu_params = get_insitu_params(insitu_schema.get()) + + if secondary not in insitu_params: + raise NexusProcessingException( + reason=f"Parameter {secondary} not supported. Must be one of {insitu_params}", code=400) + + CHART_TYPES = [ + 'time_series', + 'scatter', + 'histogram_primary', + 'histogram_secondary', + 'histogram_primary_timeseries', + 'histogram_secondary_timeseries', + 'trajectory' + ] + + types_arg = computeOptions.get_argument("camlChartTypes") + + if types_arg is None: + types = { + 'time_series': False, + 'scatter': True, + 'histogram_primary': True, + 'histogram_secondary': True, + 'histogram_primary_timeseries': True, + 'histogram_secondary_timeseries': True, + 'trajectory': True + } + else: + types_arg = types_arg.split(',') + + types = { + 'time_series': False, + 'scatter': False, + 'histogram_primary': False, + 'histogram_secondary': False, + 'histogram_primary_timeseries': False, + 'histogram_secondary_timeseries': False, + 'trajectory': False + } + + for t in types_arg: + if t not in CHART_TYPES: + raise NexusProcessingException( + reason=f"Invalid chart type argument: {t}", + code=400 + ) + + types[t] = True + + caml_params['primary'] = primary + caml_params['secondary'] = secondary + caml_params['charts'] = types + caml_params['format'] = 'Results' + + hist_bins = computeOptions.get_argument("camlHistBins") + + if hist_bins and (types['histogram_primary'] or types['histogram_secondary'] or + types['histogram_primary_timeseries'] or types['histogram_secondary_timeseries']): + hist_bins = hist_bins.split(',') + + bins = [] + + for b in hist_bins: + try: + v = int(b) + if v in bins: + raise NexusProcessingException(reason="duplicate bin in parameter", code=400) + bins.append(v) + except: + raise NexusProcessingException("non numeric argument provided for bins", code=400) + + if len(bins) == 0: + raise NexusProcessingException(reason='No bins given in argument', code=400) + + bins.sort() + + caml_params['histogram_bins'] = bins + try: execution_id = uuid.UUID(execution_id) except: @@ -48,5 +159,9 @@ def calc(self, computeOptions, **args): with ResultsStorage.ResultsRetrieval(self.config) as storage: params, stats, data = storage.retrieveResults(execution_id, trim_data=simple_results, page_num=page_num, page_size=page_size) + if output_type == 'CAML': + params['caml_params'] = caml_params + params['matchup'] = params['matchup'][0] + return BaseDomsHandler.DomsQueryResults(results=data, args=params, details=stats, bounds=None, count=len(data), computeOptions=None, executionId=execution_id, page_num=page_num, page_size=page_size) diff --git a/analysis/webservice/algorithms_spark/Matchup.py b/analysis/webservice/algorithms_spark/Matchup.py index a55f61d1..fb00ddd7 100644 --- a/analysis/webservice/algorithms_spark/Matchup.py +++ b/analysis/webservice/algorithms_spark/Matchup.py @@ -231,10 +231,103 @@ def parse_arguments(self, request): prioritize_distance = request.get_boolean_arg("prioritizeDistance", default=True) + output_type = request.get_argument("output", default='JSON') + + caml_params = {} + + if output_type == 'CAML': + primary = request.get_argument("camlPrimary") + if primary is None: + raise NexusProcessingException(reason="Primary dataset argument is required when outputting in CAML format", code=400) + + secondary = request.get_argument("camlSecondary") + if secondary is None: + raise NexusProcessingException(reason="Secondary dataset argument is required when outputting in CAML format", code=400) + + if secondary not in insitu_params: + raise NexusProcessingException( + reason=f"Parameter {secondary} not supported. Must be one of {insitu_params}", code=400) + + parameter_s = secondary # Override parameter as it makes no sense for it to differ + + CHART_TYPES = [ + 'time_series', + 'scatter', + 'histogram_primary', + 'histogram_secondary', + 'histogram_primary_timeseries', + 'histogram_secondary_timeseries', + 'trajectory' + ] + + types_arg = request.get_argument("camlChartTypes") + + if types_arg is None: + types = { + 'time_series': False, + 'scatter': True, + 'histogram_primary': True, + 'histogram_secondary': True, + 'histogram_primary_timeseries': True, + 'histogram_secondary_timeseries': True, + 'trajectory': True + } + else: + types_arg = types_arg.split(',') + + types = { + 'time_series': False, + 'scatter': False, + 'histogram_primary': False, + 'histogram_secondary': False, + 'histogram_primary_timeseries': False, + 'histogram_secondary_timeseries': False, + 'trajectory': False + } + + for t in types_arg: + if t not in CHART_TYPES: + raise NexusProcessingException( + reason=f"Invalid chart type argument: {t}", + code=400 + ) + + types[t] = True + + caml_params['primary'] = primary + caml_params['secondary'] = secondary + caml_params['charts'] = types + caml_params['format'] = 'Matchup' + + hist_bins = request.get_argument("camlHistBins") + + if hist_bins and (types['histogram_primary'] or types['histogram_secondary'] or + types['histogram_primary_timeseries'] or types['histogram_secondary_timeseries']): + hist_bins = hist_bins.split(',') + + bins = [] + + for b in hist_bins: + try: + v = int(b) + if v in bins: + raise NexusProcessingException(reason="duplicate bin in parameter", code=400) + bins.append(v) + except: + raise NexusProcessingException("non numeric argument provided for bins", code=400) + + if len(bins) == 0: + raise NexusProcessingException(reason='No bins given in argument', code=400) + + bins.sort() + + caml_params['histogram_bins'] = bins + return bounding_polygon, primary_ds_name, secondary_ds_names, parameter_s, \ start_time, start_seconds_from_epoch, end_time, end_seconds_from_epoch, \ depth_min, depth_max, time_tolerance, radius_tolerance, \ - platforms, match_once, result_size_limit, prioritize_distance + platforms, match_once, result_size_limit, prioritize_distance, \ + output_type, caml_params def get_job_pool(self, tile_ids): if len(tile_ids) > LARGE_JOB_THRESHOLD: @@ -310,7 +403,8 @@ def calc(self, request, tornado_io_loop, **args): bounding_polygon, primary_ds_name, secondary_ds_names, parameter_s, \ start_time, start_seconds_from_epoch, end_time, end_seconds_from_epoch, \ depth_min, depth_max, time_tolerance, radius_tolerance, \ - platforms, match_once, result_size_limit, prioritize_distance = self.parse_arguments(request) + platforms, match_once, result_size_limit, prioritize_distance, output_type, \ + caml_params = self.parse_arguments(request) args = { "primary": primary_ds_name, @@ -324,6 +418,9 @@ def calc(self, request, tornado_io_loop, **args): "parameter": parameter_s } + if output_type == 'CAML': + args['caml_params'] = caml_params + if depth_min is not None: args["depthMin"] = float(depth_min) diff --git a/analysis/webservice/nexus_tornado/request/renderers/NexusCAMLRenderer.py b/analysis/webservice/nexus_tornado/request/renderers/NexusCAMLRenderer.py new file mode 100644 index 00000000..467cdf87 --- /dev/null +++ b/analysis/webservice/nexus_tornado/request/renderers/NexusCAMLRenderer.py @@ -0,0 +1,19 @@ +import sys +import traceback +import json + + +class NexusCAMLRenderer(object): + def __init__(self, nexus_request): + self.request = nexus_request + + def render(self, tornado_handler, result): + tornado_handler.set_header("Content-Type", "application/json") + try: + result_str = result.toCAML(tornado_handler.request) + tornado_handler.write(result_str) + tornado_handler.finish() + except AttributeError: + traceback.print_exc(file=sys.stdout) + tornado_handler.write(json.dumps(result, indent=4)) + tornado_handler.finish() diff --git a/analysis/webservice/nexus_tornado/request/renderers/NexusRendererFactory.py b/analysis/webservice/nexus_tornado/request/renderers/NexusRendererFactory.py index e0dabe22..8dcdbe04 100644 --- a/analysis/webservice/nexus_tornado/request/renderers/NexusRendererFactory.py +++ b/analysis/webservice/nexus_tornado/request/renderers/NexusRendererFactory.py @@ -14,7 +14,7 @@ # limitations under the License. class NexusRendererFactory(object): - content_types = ["CSV", "JSON", "XML", "PNG", "NETCDF", "ZIP"] + content_types = ["CSV", "JSON", "XML", "PNG", "NETCDF", "ZIP", "CAML"] module = __import__(__name__) @classmethod diff --git a/analysis/webservice/nexus_tornado/request/renderers/__init__.py b/analysis/webservice/nexus_tornado/request/renderers/__init__.py index 4c1d31e7..2689218b 100644 --- a/analysis/webservice/nexus_tornado/request/renderers/__init__.py +++ b/analysis/webservice/nexus_tornado/request/renderers/__init__.py @@ -18,4 +18,5 @@ from .NexusCSVRenderer import NexusCSVRenderer from .NexusNETCDFRenderer import NexusNETCDFRenderer from .NexusPNGRenderer import NexusPNGRenderer -from .NexusZIPRenderer import NexusZIPRenderer \ No newline at end of file +from .NexusZIPRenderer import NexusZIPRenderer +from .NexusCAMLRenderer import NexusCAMLRenderer \ No newline at end of file