Skip to content

Commit 12f5893

Browse files
SofiaSazonovaSofia Sazonovapetrkalos
authored
Asyncronous notification for mf enforcement rules (#1804)
bugfixes ### Feature or Bugfix <!-- please choose --> - When MF enforcement rule is created, entity owners are notified asynchronously. MF enforcement rule is created and return immidiately - fix for delete_mf_enforcement_rule - small typos ### Detail - <feature1 or bug1> - <feature2 or bug2> ### Relates - <URL or Ticket> ### Security Please answer the questions below briefly where applicable, or write `N/A`. Based on [OWASP 10](https://owasp.org/Top10/en/). - Does this PR introduce or modify any input fields or queries - this includes fetching data from storage outside the application (e.g. a database, an S3 bucket)? - Is the input sanitized? - What precautions are you taking before deserializing the data you consume? - Is injection prevented by parametrizing queries? - Have you ensured no `eval` or similar functions are used? - Does this PR introduce any functionality or component that requires authorization? - How have you ensured it respects the existing AuthN/AuthZ mechanisms? - Are you logging failed auth attempts? - Are you using or adding any cryptographic features? - Do you use a standard proven implementations? - Are the used keys controlled by the customer? Where are they stored? - Are you introducing any new policies/roles/users? - Have you used the least-privilege principle? How? By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license. --------- Co-authored-by: Sofia Sazonova <sazonova@amazon.co.uk> Co-authored-by: Petros Kalos <kalosp@amazon.com>
1 parent 34799e8 commit 12f5893

5 files changed

Lines changed: 180 additions & 134 deletions

File tree

backend/dataall/modules/metadata_forms/__init__.py

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
1-
from typing import List, Type
2-
1+
from typing import List, Type, Set
2+
import logging
33
from dataall.base.loader import ModuleInterface, ImportMode
44

5+
log = logging.getLogger(__name__)
6+
57

68
class MetadataFormsApiModuleInterface(ModuleInterface):
79
"""Implements ModuleInterface for Metadata Forms GraphQl lambda"""
@@ -19,3 +21,19 @@ def depends_on() -> List[Type['ModuleInterface']]:
1921
from dataall.modules.datasets_base import DatasetBaseApiModuleInterface
2022

2123
return [DatasetBaseApiModuleInterface]
24+
25+
26+
class MetadataFormAsyncHandlersModuleInterface(ModuleInterface):
27+
"""Implements ModuleInterface for metadataform async lambda"""
28+
29+
@staticmethod
30+
def is_supported(modes: Set[ImportMode]):
31+
return ImportMode.HANDLERS in modes
32+
33+
def __init__(self):
34+
import dataall.modules.metadata_forms.handlers
35+
import dataall.modules.metadata_forms.db.metadata_form_models
36+
import dataall.modules.metadata_forms.db.metadata_form_repository
37+
import dataall.modules.metadata_forms.services.metadata_form_enforcement_service
38+
39+
log.info('Metadata Form handlers have been imported')

backend/dataall/modules/metadata_forms/db/metadata_form_repository.py

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -391,7 +391,5 @@ def update_version_in_rules(session, uri, version):
391391

392392
@staticmethod
393393
def delete_rule(session, rule_uri):
394-
session.query(MetadataFormEnforcementRule).filter(
395-
MetadataFormEnforcementRule.metadataFormUri == rule_uri
396-
).delete()
394+
session.query(MetadataFormEnforcementRule).filter(MetadataFormEnforcementRule.uri == rule_uri).delete()
397395
session.commit()
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
from dataall.modules.metadata_forms.handlers import metadata_form_handler
2+
3+
__all__ = ['metadata_form_handler']
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
from dataall.core.tasks.service_handlers import Worker
2+
from dataall.core.tasks.db.task_models import Task
3+
from dataall.modules.metadata_forms.services.metadata_form_enforcement_service import MetadataFormEnforcementService
4+
5+
6+
class EcsMetadataFormHandler:
7+
@staticmethod
8+
@Worker.handler(path='metadata_form.enforcement.notify')
9+
def notify_owners_of_enforcement(engine, task: Task):
10+
with engine.scoped_session() as session:
11+
MetadataFormEnforcementService.notify_owners_of_enforcement(
12+
session=session, rule_uri=task.targetUri, mf_name=task.payload.get('mf_name', 'Unknown name')
13+
)

backend/dataall/modules/metadata_forms/services/metadata_form_enforcement_service.py

Lines changed: 143 additions & 129 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
from typing import List
22
from dataall.base.context import get_context
3-
from dataall.base.db import exceptions
3+
from dataall.base.db import exceptions, Engine
44
from dataall.base.db.paginator import paginate_list
5+
from dataall.core.tasks.db.task_models import Task
6+
from dataall.core.tasks.service_handlers import Worker
57
from dataall.core.environment.db.environment_repositories import EnvironmentRepository
68
from dataall.core.environment.db.environment_models import Environment
79
from dataall.core.organizations.db.organization_models import Organization
@@ -68,6 +70,20 @@ class MetadataFormEnforcementService:
6870
def _get_entity_uri(session, data):
6971
return data.get('homeEntity')
7072

73+
@classmethod
74+
def notify_owners_of_enforcement(cls, session, rule_uri: str, mf_name: str) -> bool:
75+
affected_entities = MetadataFormEnforcementService._get_affected_entities(session=session, uri=rule_uri)
76+
for entity in affected_entities:
77+
if entity['owner']:
78+
NotificationRepository.create_notification(
79+
session,
80+
recipient=entity['owner'],
81+
target_uri=f'{entity["uri"]}|{entity["type"]}',
82+
message=f'Usage of metadata form "{mf_name}" was enforced for {entity["uri"]} {entity["type"]}',
83+
notification_type='METADATA_FORM_ENFORCED',
84+
)
85+
return True
86+
7187
@staticmethod
7288
@TenantPolicyService.has_tenant_permission(MANAGE_METADATA_FORMS)
7389
@MetadataFormAccessService.can_perform(ENFORCE_METADATA_FORM)
@@ -79,154 +95,148 @@ def create_mf_enforcement_rule(uri, data):
7995
version = MetadataFormRepository.get_metadata_form_version_number_latest(session, uri)
8096
rule = MetadataFormRepository.create_mf_enforcement_rule(session, uri, data, version)
8197

82-
affected_entities = MetadataFormEnforcementService.get_affected_entities(rule.uri, rule=rule)
83-
for entity in affected_entities:
84-
if entity['owner']:
85-
NotificationRepository.create_notification(
86-
session,
87-
recipient=entity['owner'],
88-
target_uri=f'{entity["uri"]}|{entity["type"]}',
89-
message=f'Usage of metadata form "{mf.name}" was enforced for {entity["uri"]} {entity["type"]}',
90-
notification_type='METADATA_FORM_ENFORCED',
91-
)
98+
task = Task(
99+
targetUri=rule.uri,
100+
action='metadata_form.enforcement.notify',
101+
payload={'mf_name': mf.name},
102+
)
103+
session.add(task)
104+
session.commit()
105+
106+
Worker.queue(engine=get_context().db_engine, task_ids=[task.taskUri])
92107

93108
return rule
94109

95110
@staticmethod
96-
def get_affected_organizations(uri, rule=None) -> List[Organization]:
97-
with get_context().db_engine.scoped_session() as session:
98-
if not rule:
99-
rule = MetadataFormRepository.get_mf_enforcement_rule_by_uri(session, uri)
100-
if rule.level == MetadataFormEnforcementScope.Global.value:
101-
return OrganizationRepository.query_all_active_organizations(session)
102-
if rule.level == MetadataFormEnforcementScope.Organization.value:
103-
return [OrganizationRepository.get_organization_by_uri(session, rule.homeEntity)]
104-
return []
111+
def _get_affected_organizations(session, uri, rule=None) -> List[Organization]:
112+
if not rule:
113+
rule = MetadataFormRepository.get_mf_enforcement_rule_by_uri(session, uri)
114+
if rule.level == MetadataFormEnforcementScope.Global.value:
115+
return OrganizationRepository.query_all_active_organizations(session)
116+
if rule.level == MetadataFormEnforcementScope.Organization.value:
117+
return [OrganizationRepository.get_organization_by_uri(session, rule.homeEntity)]
118+
return []
105119

106120
@staticmethod
107-
def get_affected_environments(uri, rule=None) -> List[Environment]:
108-
with get_context().db_engine.scoped_session() as session:
109-
if not rule:
110-
rule = MetadataFormRepository.get_mf_enforcement_rule_by_uri(session, uri)
111-
if rule.level == MetadataFormEnforcementScope.Global.value:
112-
return EnvironmentRepository.query_all_active_environments(session)
113-
if rule.level == MetadataFormEnforcementScope.Organization.value:
114-
return OrganizationRepository.query_organization_environments(
115-
session, uri=rule.homeEntity, filter=None
116-
).all()
117-
if rule.level == MetadataFormEnforcementScope.Environment.value:
118-
return [EnvironmentRepository.get_environment_by_uri(session, rule.homeEntity)]
119-
return []
121+
def _get_affected_environments(session, uri, rule=None) -> List[Environment]:
122+
if not rule:
123+
rule = MetadataFormRepository.get_mf_enforcement_rule_by_uri(session, uri)
124+
if rule.level == MetadataFormEnforcementScope.Global.value:
125+
return EnvironmentRepository.query_all_active_environments(session)
126+
if rule.level == MetadataFormEnforcementScope.Organization.value:
127+
return OrganizationRepository.query_organization_environments(
128+
session, uri=rule.homeEntity, filter=None
129+
).all()
130+
if rule.level == MetadataFormEnforcementScope.Environment.value:
131+
return [EnvironmentRepository.get_environment_by_uri(session, rule.homeEntity)]
132+
return []
120133

121134
@staticmethod
122-
def get_affected_datasets(uri, rule=None) -> List[DatasetBase]:
123-
with get_context().db_engine.scoped_session() as session:
124-
if not rule:
125-
rule = MetadataFormRepository.get_mf_enforcement_rule_by_uri(session, uri)
126-
if rule.level == MetadataFormEnforcementScope.Global.value:
127-
return DatasetListRepository.query_datasets(session).all()
128-
if rule.level == MetadataFormEnforcementScope.Organization.value:
129-
return DatasetListRepository.query_datasets(session, organizationUri=rule.homeEntity).all()
130-
if rule.level == MetadataFormEnforcementScope.Environment.value:
131-
return DatasetListRepository.query_datasets(session, environmentUri=rule.homeEntity).all()
132-
if rule.level == MetadataFormEnforcementScope.Dataset.value:
133-
return [DatasetBaseRepository.get_dataset_by_uri(session, rule.homeEntity)]
134-
return []
135+
def _get_affected_datasets(session, uri, rule=None) -> List[DatasetBase]:
136+
if not rule:
137+
rule = MetadataFormRepository.get_mf_enforcement_rule_by_uri(session, uri)
138+
if rule.level == MetadataFormEnforcementScope.Global.value:
139+
return DatasetListRepository.query_datasets(session).all()
140+
if rule.level == MetadataFormEnforcementScope.Organization.value:
141+
return DatasetListRepository.query_datasets(session, organizationUri=rule.homeEntity).all()
142+
if rule.level == MetadataFormEnforcementScope.Environment.value:
143+
return DatasetListRepository.query_datasets(session, environmentUri=rule.homeEntity).all()
144+
if rule.level == MetadataFormEnforcementScope.Dataset.value:
145+
return [DatasetBaseRepository.get_dataset_by_uri(session, rule.homeEntity)]
146+
return []
135147

136148
@staticmethod
137-
def get_attachement_for_rule(rule, entityUri) -> AttachedMetadataForm:
138-
with get_context().db_engine.scoped_session() as session:
139-
return MetadataFormRepository.query_all_attached_metadata_forms_for_entity(
140-
session,
141-
entityUri=entityUri,
142-
metadataFormUri=rule.metadataFormUri,
143-
version=rule.version,
144-
).first()
149+
def _get_attachement_for_rule(session, rule, entityUri) -> AttachedMetadataForm:
150+
return MetadataFormRepository.query_all_attached_metadata_forms_for_entity(
151+
session,
152+
entityUri=entityUri,
153+
metadataFormUri=rule.metadataFormUri,
154+
version=rule.version,
155+
).first()
145156

146157
@staticmethod
147-
def form_affected_entity_object(type, entity: MetadataFormEntity, rule):
158+
def _form_affected_entity_object(session, type, entity: MetadataFormEntity, rule):
148159
return {
149160
'type': type,
150161
'name': entity.entity_name(),
151162
'uri': entity.uri(),
152163
'owner': entity.owner_name(),
153-
'attached': MetadataFormEnforcementService.get_attachement_for_rule(rule, entity.uri()),
164+
'attached': MetadataFormEnforcementService._get_attachement_for_rule(session, rule, entity.uri()),
154165
}
155166

156167
@staticmethod
157-
def get_affected_entities(uri, rule=None):
168+
def _get_affected_entities(session, uri, rule=None):
158169
affected_entities = []
159-
with get_context().db_engine.scoped_session() as session:
160-
if not rule:
161-
rule = MetadataFormRepository.get_mf_enforcement_rule_by_uri(session, uri)
170+
if not rule:
171+
rule = MetadataFormRepository.get_mf_enforcement_rule_by_uri(session, uri)
172+
173+
orgs = MetadataFormEnforcementService._get_affected_organizations(session, uri, rule)
174+
if MetadataFormEntityTypes.Organization.value in rule.entityTypes:
175+
affected_entities.extend(
176+
[
177+
MetadataFormEnforcementService._form_affected_entity_object(
178+
session, MetadataFormEntityTypes.Organization.value, o, rule
179+
)
180+
for o in orgs
181+
]
182+
)
162183

163-
orgs = MetadataFormEnforcementService.get_affected_organizations(uri, rule)
164-
if MetadataFormEntityTypes.Organization.value in rule.entityTypes:
165-
affected_entities.extend(
166-
[
167-
MetadataFormEnforcementService.form_affected_entity_object(
168-
MetadataFormEntityTypes.Organization.value, o, rule
169-
)
170-
for o in orgs
171-
]
172-
)
184+
envs = MetadataFormEnforcementService._get_affected_environments(session, uri, rule)
185+
if MetadataFormEntityTypes.Environment.value in rule.entityTypes:
186+
affected_entities.extend(
187+
[
188+
MetadataFormEnforcementService._form_affected_entity_object(
189+
session, MetadataFormEntityTypes.Environment.value, e, rule
190+
)
191+
for e in envs
192+
]
193+
)
173194

174-
envs = MetadataFormEnforcementService.get_affected_environments(uri, rule)
175-
if MetadataFormEntityTypes.Environment.value in rule.entityTypes:
176-
affected_entities.extend(
177-
[
178-
MetadataFormEnforcementService.form_affected_entity_object(
179-
MetadataFormEntityTypes.Environment.value, e, rule
180-
)
181-
for e in envs
182-
]
183-
)
195+
datasets = []
196+
if MetadataFormEntityManager.is_registered(
197+
MetadataFormEntityTypes.S3Dataset.value
198+
) or MetadataFormEntityManager.is_registered(MetadataFormEntityTypes.RDDataset.value):
199+
datasets = MetadataFormEnforcementService._get_affected_datasets(session, uri, rule)
200+
affected_entities.extend(
201+
[
202+
MetadataFormEnforcementService._form_affected_entity_object(
203+
session, ds.datasetType.value + '-Dataset', ds, rule
204+
)
205+
for ds in datasets
206+
if ds.datasetType.value + '-Dataset' in rule.entityTypes
207+
and MetadataFormEntityManager.is_registered(ds.datasetType.value + '-Dataset')
208+
]
209+
)
184210

185-
datasets = []
186-
if MetadataFormEntityManager.is_registered(
187-
MetadataFormEntityTypes.S3Dataset.value
188-
) or MetadataFormEntityManager.is_registered(MetadataFormEntityTypes.RDDataset.value):
189-
datasets = MetadataFormEnforcementService.get_affected_datasets(uri, rule)
190-
affected_entities.extend(
191-
[
192-
MetadataFormEnforcementService.form_affected_entity_object(
193-
ds.datasetType.value + '-Dataset', ds, rule
194-
)
195-
for ds in datasets
196-
if ds.datasetType.value + '-Dataset' in rule.entityTypes
197-
and MetadataFormEntityManager.is_registered(ds.datasetType.value + '-Dataset')
198-
]
199-
)
211+
entity_types = set(rule.entityTypes[:]) - {
212+
MetadataFormEntityTypes.Organization.value,
213+
MetadataFormEntityTypes.Environment.value,
214+
MetadataFormEntityTypes.RDDataset.value,
215+
MetadataFormEntityTypes.S3Dataset.value,
216+
}
200217

201-
entity_types = set(rule.entityTypes[:]) - {
202-
MetadataFormEntityTypes.Organization.value,
203-
MetadataFormEntityTypes.Environment.value,
204-
MetadataFormEntityTypes.RDDataset.value,
205-
MetadataFormEntityTypes.S3Dataset.value,
206-
}
207-
208-
for entity_type in entity_types:
209-
entity_class = MetadataFormEntityManager.get_resource(entity_type)
210-
level = ENTITY_SCOPE_BY_TYPE[entity_type]
211-
all_entities = session.query(entity_class)
212-
if level == MetadataFormEnforcementScope.Organization:
213-
all_entities = all_entities.filter(
214-
entity_class.organizationUri.in_([org.organizationUri for org in orgs])
215-
)
216-
if level == MetadataFormEnforcementScope.Environment:
217-
all_entities = all_entities.filter(
218-
entity_class.environmentUri.in_([env.environmentUri for env in envs])
219-
)
220-
if level == MetadataFormEnforcementScope.Dataset:
221-
all_entities = all_entities.filter(entity_class.datasetUri.in_([ds.datasetUri for ds in datasets]))
222-
all_entities = all_entities.all()
223-
affected_entities.extend(
224-
[
225-
MetadataFormEnforcementService.form_affected_entity_object(entity_type, e, rule)
226-
for e in all_entities
227-
]
218+
for entity_type in entity_types:
219+
entity_class = MetadataFormEntityManager.get_resource(entity_type)
220+
level = ENTITY_SCOPE_BY_TYPE[entity_type]
221+
all_entities = session.query(entity_class)
222+
if level == MetadataFormEnforcementScope.Organization:
223+
all_entities = all_entities.filter(
224+
entity_class.organizationUri.in_([org.organizationUri for org in orgs])
225+
)
226+
if level == MetadataFormEnforcementScope.Environment:
227+
all_entities = all_entities.filter(
228+
entity_class.environmentUri.in_([env.environmentUri for env in envs])
228229
)
229-
return affected_entities
230+
if level == MetadataFormEnforcementScope.Dataset:
231+
all_entities = all_entities.filter(entity_class.datasetUri.in_([ds.datasetUri for ds in datasets]))
232+
all_entities = all_entities.all()
233+
affected_entities.extend(
234+
[
235+
MetadataFormEnforcementService._form_affected_entity_object(session, entity_type, e, rule)
236+
for e in all_entities
237+
]
238+
)
239+
return affected_entities
230240

231241
@staticmethod
232242
def list_mf_enforcement_rules(uri):
@@ -236,11 +246,13 @@ def list_mf_enforcement_rules(uri):
236246
@staticmethod
237247
def paginate_mf_affected_entities(uri, data=None):
238248
data = data or {}
239-
return paginate_list(
240-
items=MetadataFormEnforcementService.get_affected_entities(uri),
241-
page=data.get('page', 1),
242-
page_size=data.get('pageSize', 10),
243-
).to_dict()
249+
250+
with get_context().db_engine.scoped_session() as session:
251+
return paginate_list(
252+
items=MetadataFormEnforcementService._get_affected_entities(session=session, uri=uri),
253+
page=data.get('page', 1),
254+
page_size=data.get('pageSize', 10),
255+
).to_dict()
244256

245257
@staticmethod
246258
def resolve_home_entity(uri, rule=None):
@@ -283,6 +295,8 @@ def get_rules_that_affect_entity(entity_type, entity_uri):
283295
entity_scope = ENTITY_SCOPE_BY_TYPE[entity_type]
284296
with get_context().db_engine.scoped_session() as session:
285297
entity = session.query(entity_class).get(entity_uri)
298+
if not entity:
299+
return []
286300
parent_dataset_uri, parent_env_uri, parent_org_uri = None, None, None
287301

288302
if entity_scope == MetadataFormEnforcementScope.Dataset:
@@ -339,7 +353,7 @@ def get_rules_that_affect_entity(entity_type, entity_uri):
339353
)
340354

341355
for r in all_rules:
342-
attached = MetadataFormEnforcementService.get_attachement_for_rule(r, entity_uri)
356+
attached = MetadataFormEnforcementService._get_attachement_for_rule(session, r, entity_uri)
343357
r.attached = attached.uri if attached else None
344358
r.metadataFormName = MetadataFormRepository.get_metadata_form(session, r.metadataFormUri).name
345359

0 commit comments

Comments
 (0)