Skip to content

Commit 6d3a537

Browse files
committed
Merge branch 'develop' of https://github.com/KomMonitor/processes-api into develop
2 parents a9c9a92 + 1c6d7f2 commit 6d3a537

1 file changed

Lines changed: 222 additions & 0 deletions

File tree

Lines changed: 222 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,222 @@
1+
import math
2+
import logging
3+
from typing import Tuple
4+
import openapi_client
5+
from openapi_client import ApiClient
6+
from prefect import task, flow
7+
from prefect.cache_policies import NO_CACHE
8+
from pygeoapi.process.base import *
9+
from pygeoapi.util import JobStatus
10+
from pygeoapi_prefect import schemas
11+
from pygeoapi_prefect.schemas import ProcessDescription, ProcessJobControlOption, Parameter, \
12+
AdditionalProcessIOParameters
13+
from pygeoapi_prefect.schemas import ProcessInput, ProcessIOSchema, ProcessIOType
14+
15+
try:
16+
from .. import pykmhelper
17+
except ImportError:
18+
from processor.process import pykmhelper
19+
20+
try:
21+
from ..pykmhelper import IndicatorType, IndicatorCollection, IndicatorCalculationType
22+
except ImportError:
23+
from processor.process.pykmhelper import IndicatorType, IndicatorCollection, IndicatorCalculationType
24+
25+
try:
26+
from ..base import KommonitorProcess, KommonitorProcessConfig, KommonitorResult, \
27+
KommonitorJobSummary, KOMMONITOR_DATA_MANAGEMENT_URL, generate_flow_run_name, DataManagementException
28+
except ImportError:
29+
from processor.process.base import KommonitorProcess, KommonitorProcessConfig, KommonitorResult, \
30+
KommonitorJobSummary, KOMMONITOR_DATA_MANAGEMENT_URL, generate_flow_run_name, DataManagementException
31+
32+
# this name should be set for @flow(name='<processName>') and within detailed_process_description as
33+
# additional_parameters.parameters[0].value[0].apiName
34+
# this is necessary in order to have a comparable name between prefect schedules and pygeoAPI process descriptions
35+
processName = "km_indicator_multiply_value"
36+
37+
@flow(persist_result=True, name=processName, flow_run_name=generate_flow_run_name)
38+
def process_flow(
39+
job_id: str,
40+
execution_request: schemas.ExecuteRequest
41+
) -> dict:
42+
return KommonitorProcess.execute_process_flow(KmIndicatorMultiplyValue.run, job_id, execution_request)
43+
44+
45+
class KmIndicatorMultiplyValue(KommonitorProcess):
46+
process_flow = process_flow
47+
48+
detailed_process_description = ProcessDescription(
49+
id=processName,
50+
version="0.0.1",
51+
title="Multiplikation (fester Wert)",
52+
description= "Multipliziert einen Indikator mit einem festen Wert.",
53+
example={},
54+
job_control_options=[
55+
ProcessJobControlOption.SYNC_EXECUTE,
56+
ProcessJobControlOption.ASYNC_EXECUTE,
57+
],
58+
additional_parameters=AdditionalProcessIOParameters(
59+
parameters=[
60+
Parameter(
61+
name="kommonitorUiParams",
62+
value=[{
63+
"longTitle": "Multiplikation eines Indikator mit einem festen Wert",
64+
"apiName": processName,
65+
"calculation_info": "Produkt aus Indikator und Wert",
66+
"formula": "$ \\prod_{n=1}^{m} I_{n} $",
67+
"legend": "",
68+
"dynamicFormula": "$$ prod_baseIndicators $$",
69+
"dynamicLegend": "${list_baseIndicators}",
70+
"inputBoxes": [
71+
{
72+
"id": "computation_id",
73+
"title": "Notwendiger (Basis-)Indikator",
74+
"description": "",
75+
"contents": [
76+
"computation_id"
77+
]
78+
}
79+
]
80+
}]
81+
)
82+
]
83+
),
84+
inputs=KommonitorProcess.common_inputs | {
85+
"computation_id": ProcessInput(
86+
id= "COMPUTATION_ID",
87+
title="für die Berechnung erforderlicher Basisindikator",
88+
description="Indikatoren ID des erforderlichen Basisindikators.",
89+
schema_=ProcessIOSchema(type_=ProcessIOType.STRING, required=True)
90+
),
91+
"num_value": ProcessInput(
92+
id= "NUM_VALUE",
93+
title="Multiplikationswert",
94+
description="Wert mit welchem der Basisindikator multipliziert wird.",
95+
schema_=ProcessIOSchema(type_=ProcessIOType.NUMBER, required=True)
96+
)
97+
},
98+
outputs = KommonitorProcess.common_output
99+
)
100+
101+
# run Method has to be implemented for all KomMonitor Skripts
102+
@staticmethod
103+
@task(cache_policy=NO_CACHE)
104+
def run(config: KommonitorProcessConfig,
105+
logger: logging.Logger,
106+
data_management_client: ApiClient) -> Tuple[JobStatus, KommonitorResult, KommonitorJobSummary]:
107+
108+
logger.debug("Starting execution...")
109+
110+
# Load inputs
111+
inputs = config.inputs
112+
# Extract all relevant inputs
113+
target_id = inputs["target_indicator_id"]
114+
computation_id = inputs["computation_id"]
115+
target_spatial_units = inputs["target_spatial_units"]
116+
target_time = inputs["target_time"]
117+
num_value = inputs["num_value"]
118+
119+
120+
# Init object to store computation results
121+
result = KommonitorResult()
122+
job_summary = KommonitorJobSummary()
123+
124+
try:
125+
# 3. Generate result || Main Script
126+
indicators_controller = openapi_client.IndicatorsApi(data_management_client)
127+
spatial_unit_controller = openapi_client.SpatialUnitsApi(data_management_client)
128+
129+
# create Indicator Objects and IndicatorCollection to store the informations belonging to the Indicator
130+
ti = IndicatorType(target_id, IndicatorCalculationType.TARGET_INDICATOR)
131+
132+
collection = IndicatorCollection()
133+
collection.add_indicator(IndicatorType(computation_id, IndicatorCalculationType.COMPUTATION_INDICATOR))
134+
135+
136+
# query indicator metadate to check for errors occured
137+
ti.get_indicator_by_id(indicators_controller)
138+
139+
for indicator in collection.indicators:
140+
collection.indicators[indicator].get_indicator_by_id(indicators_controller)
141+
142+
# calculate intersection dates and all dates that have to be computed according to target_time schema
143+
bool_missing_timestamp, all_times = pykmhelper.getAll_target_time_from_indicator_collection(ti, collection, target_time)
144+
145+
for spatial_unit in target_spatial_units:
146+
147+
# Init results and job summary for current spatial unit
148+
job_summary.init_spatial_unit_summary(spatial_unit)
149+
result.init_spatial_unit_result_with_indicator(spatial_unit, spatial_unit_controller, ti)
150+
151+
# query data-management-api to get all spatial unit features for the current spatial unit.
152+
# store the list containing all features-IDs as an attribute for the collection
153+
collection.fetch_all_spatial_unit_features(spatial_unit_controller, spatial_unit)
154+
155+
# catch missing timestamp error
156+
if bool_missing_timestamp:
157+
collection.check_applicable_target_dates(job_summary)
158+
159+
# catch missing spatial unit error
160+
collection.check_applicable_spatial_units(spatial_unit, job_summary)
161+
162+
# query the correct indicator for numerator and denominator
163+
for indicator in collection.indicators:
164+
collection.indicators[indicator].values = indicators_controller.get_indicator_by_spatial_unit_id_and_id_without_geometry(
165+
indicator,
166+
spatial_unit)
167+
168+
collection.fetch_indicator_feature_time_series()
169+
170+
# get the intersection of all applicable su_features and check for missing spatial unit feature error
171+
collection.find_intersection_applicable_su_features()
172+
collection.check_applicable_spatial_unit_features(job_summary)
173+
174+
logger.debug("Retrieved required indicators successfully")
175+
176+
# iterate over all features an append the indicator here happen the main calculations for the requested values
177+
indicator_values = []
178+
for feature in collection.intersection_su_features:
179+
valueMapping = []
180+
for targetTime in all_times:
181+
try:
182+
try:
183+
time_with_prefix = pykmhelper.getTargetDateWithPropertyPrefix(targetTime)
184+
185+
time_value = float(collection.indicators[computation_id].time_series[feature][time_with_prefix])
186+
value = float(num_value) * time_value
187+
except TypeError:
188+
value = None
189+
190+
except RuntimeError as r:
191+
logger.error(r)
192+
logger.error(f"There occurred an error during the processing of the indicator for spatial unit: {spatial_unit}")
193+
job_summary.add_processing_error("INDICATOR", computation_id, str(r), targetTime, feature)
194+
value = None
195+
196+
valueMapping.append({"indicatorValue": value, "timestamp": targetTime})
197+
indicator_values.append({"spatialReferenceKey": str(feature), "valueMapping": valueMapping})
198+
199+
# Job Summary and results
200+
job_summary.add_number_of_integrated_features(len(indicator_values))
201+
job_summary.add_integrated_target_dates(all_times)
202+
job_summary.add_modified_resource(KOMMONITOR_DATA_MANAGEMENT_URL, target_id, spatial_unit)
203+
job_summary.complete_spatial_unit_summary()
204+
205+
result.add_indicator_values(indicator_values)
206+
result.complete_spatial_unit_result()
207+
208+
# logger.info(result.values)
209+
# logger.info(job_summary.summary)
210+
# 4.1 Return success and result
211+
return JobStatus.successful, result, job_summary
212+
except DataManagementException as e:
213+
logger.error(f"Error while requesting Data Management API: {e}")
214+
# 4.2 Catch possible errors cleanly
215+
if e.spatial_unit and bool(job_summary):
216+
job_summary.add_data_management_api_error(e.resource_type, e.id, e.error_code, e)
217+
job_summary.complete_spatial_unit_summary()
218+
else:
219+
job_summary.init_spatial_unit_summary(target_spatial_units[0])
220+
job_summary.add_data_management_api_error(e.resource_type, e.id, e.error_code, e)
221+
job_summary.complete_spatial_unit_summary()
222+
return JobStatus.failed, None, job_summary

0 commit comments

Comments
 (0)