Skip to content

Commit 8a7d7e8

Browse files
Add some tests for owner-config and enhance this feat
1 parent 9225e48 commit 8a7d7e8

File tree

18 files changed

+2328
-99
lines changed

18 files changed

+2328
-99
lines changed

ingestion/src/metadata/ingestion/source/database/database_service.py

Lines changed: 2 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -633,18 +633,15 @@ def get_schema_owner_ref(self, schema_name: str) -> Optional[EntityReferenceList
633633
EntityReferenceList with owner or None
634634
"""
635635
try:
636-
# Get parent (database) owner for inheritance
637636
parent_owner = None
638637
database_entity = getattr(self.context.get(), "database_entity", None)
639638
if database_entity:
640639
db_owners = database_entity.owners
641640
if db_owners and db_owners.root:
642641
parent_owner = db_owners.root[0].name
643642

644-
# Build FQN for more precise matching
645643
schema_fqn = f"{self.context.get().database}.{schema_name}"
646644

647-
# Priority 1: Use ownerConfig if configured
648645
if (
649646
hasattr(self.source_config, "ownerConfig")
650647
and self.source_config.ownerConfig
@@ -653,24 +650,12 @@ def get_schema_owner_ref(self, schema_name: str) -> Optional[EntityReferenceList
653650
metadata=self.metadata,
654651
owner_config=self.source_config.ownerConfig,
655652
entity_type="databaseSchema",
656-
entity_name=schema_fqn, # Use FQN for matching
653+
entity_name=schema_fqn,
657654
parent_owner=parent_owner,
658655
)
659656
if owner_ref:
660657
return owner_ref
661658

662-
# Also try simple name if FQN didn't match
663-
if schema_fqn != schema_name:
664-
owner_ref = get_owner_from_config(
665-
metadata=self.metadata,
666-
owner_config=self.source_config.ownerConfig,
667-
entity_type="databaseSchema",
668-
entity_name=schema_name,
669-
parent_owner=parent_owner,
670-
)
671-
if owner_ref:
672-
return owner_ref
673-
674659
except Exception as exc:
675660
logger.debug(traceback.format_exc())
676661
logger.warning(f"Error processing owner for schema {schema_name}: {exc}")
@@ -693,7 +678,6 @@ def get_owner_ref(self, table_name: str) -> Optional[EntityReferenceList]:
693678
EntityReferenceList with owner or None
694679
"""
695680
try:
696-
# Get parent (schema) owner for inheritance
697681
parent_owner = None
698682
database_schema_entity = getattr(
699683
self.context.get(), "database_schema_entity", None
@@ -703,48 +687,22 @@ def get_owner_ref(self, table_name: str) -> Optional[EntityReferenceList]:
703687
if schema_owners and schema_owners.root:
704688
parent_owner = schema_owners.root[0].name
705689

706-
# Build FQN for more precise matching
707690
table_fqn = f"{self.context.get().database}.{self.context.get().database_schema}.{table_name}"
708691

709-
# Priority 1: Use ownerConfig if configured
710692
if (
711693
hasattr(self.source_config, "ownerConfig")
712694
and self.source_config.ownerConfig
713695
):
714-
logger.debug(
715-
f"Trying ownerConfig for table '{table_name}', FQN: '{table_fqn}'"
716-
)
717-
logger.debug(f"Owner config: {self.source_config.ownerConfig}")
718696
owner_ref = get_owner_from_config(
719697
metadata=self.metadata,
720698
owner_config=self.source_config.ownerConfig,
721699
entity_type="table",
722-
entity_name=table_fqn, # Use FQN for matching
700+
entity_name=table_fqn,
723701
parent_owner=parent_owner,
724702
)
725703
if owner_ref:
726-
logger.debug(f"Found owner from FQN match: {owner_ref}")
727704
return owner_ref
728705

729-
# Also try simple name if FQN didn't match
730-
if table_fqn != table_name:
731-
logger.debug(
732-
f"FQN match failed, trying simple name: '{table_name}'"
733-
)
734-
owner_ref = get_owner_from_config(
735-
metadata=self.metadata,
736-
owner_config=self.source_config.ownerConfig,
737-
entity_type="table",
738-
entity_name=table_name,
739-
parent_owner=parent_owner,
740-
)
741-
if owner_ref:
742-
logger.debug(f"Found owner from simple name match: {owner_ref}")
743-
return owner_ref
744-
745-
logger.debug(f"No owner found for table '{table_name}'")
746-
747-
# Priority 2: Extract owner from source system (if includeOwners enabled)
748706
if self.source_config.includeOwners and hasattr(
749707
self.inspector, "get_table_owner"
750708
):

ingestion/src/metadata/utils/owner_utils.py

Lines changed: 89 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
"""
88

99
import traceback
10-
from typing import Dict, Optional, Union
10+
from typing import Dict, List, Optional, Union
1111

1212
from metadata.generated.schema.type.entityReferenceList import EntityReferenceList
1313
from metadata.ingestion.ometa.ometa_api import OpenMetadata
@@ -24,9 +24,9 @@ class OwnerResolver:
2424
{
2525
"default": "fallback-owner", # Default owner for all entities
2626
"service": "service-owner", # Optional
27-
"database": "db-owner" | {"db1": "owner1", "db2": "owner2"},
28-
"schema": "schema-owner" | {"schema1": "owner1"},
29-
"table": "table-owner" | {"table1": "owner1"},
27+
"database": "db-owner" | {"db1": "owner1", "db2": "owner2"} | {"db3": ["owner1", "owner2"]},
28+
"databaseSchema": "schema-owner" | {"schema1": "owner1"} | {"schema2": ["owner1", "owner2"]},
29+
"table": "table-owner" | {"table1": "owner1"} | {"table2": ["owner1", "owner2"]},
3030
"enableInheritance": true # Default true
3131
}
3232
@@ -59,7 +59,7 @@ def resolve_owner(
5959
Resolve owner for an entity based on configuration
6060
6161
Args:
62-
entity_type: Type of entity ("database", "schema", "table")
62+
entity_type: Type of entity ("database", "databaseSchema", "table")
6363
entity_name: Name or FQN of the entity
6464
parent_owner: Owner inherited from parent entity
6565
@@ -82,30 +82,30 @@ def resolve_owner(
8282
if level_config:
8383
# If it's a dict, try exact matching
8484
if isinstance(level_config, dict):
85-
# First try full name matching
85+
# First try full name matching (FQN)
8686
if entity_name in level_config:
87-
owner_name = level_config[entity_name]
88-
owner_ref = self._get_owner_ref(owner_name)
87+
owner_names = level_config[entity_name]
88+
owner_ref = self._get_owner_refs(owner_names)
8989
if owner_ref:
9090
logger.debug(
91-
f"Using specific {entity_type} owner for '{entity_name}': {owner_name}"
91+
f"Matched owner for '{entity_name}' using FQN: {owner_names}"
9292
)
9393
return owner_ref
9494

95-
# Try matching with only the last part of the name (e.g., "sales_db.public.orders" matches "orders")
95+
# Fallback to simple name matching
9696
simple_name = entity_name.split(".")[-1]
9797
if simple_name != entity_name and simple_name in level_config:
98-
owner_name = level_config[simple_name]
99-
owner_ref = self._get_owner_ref(owner_name)
98+
owner_names = level_config[simple_name]
99+
owner_ref = self._get_owner_refs(owner_names)
100100
if owner_ref:
101-
logger.debug(
102-
f"Using specific {entity_type} owner for '{simple_name}': {owner_name}"
101+
logger.info(
102+
f"FQN match failed for '{entity_name}', matched using simple name '{simple_name}': {owner_names}"
103103
)
104104
return owner_ref
105105

106106
# If it's a string, use it directly
107107
elif isinstance(level_config, str):
108-
owner_ref = self._get_owner_ref(level_config)
108+
owner_ref = self._get_owner_refs(level_config)
109109
if owner_ref:
110110
logger.debug(
111111
f"Using {entity_type} level owner for '{entity_name}': {level_config}"
@@ -114,7 +114,7 @@ def resolve_owner(
114114

115115
# 2. If inheritance is enabled, use parent owner
116116
if self.enable_inheritance and parent_owner:
117-
owner_ref = self._get_owner_ref(parent_owner)
117+
owner_ref = self._get_owner_refs(parent_owner)
118118
if owner_ref:
119119
logger.debug(
120120
f"Using inherited owner for '{entity_name}': {parent_owner}"
@@ -124,7 +124,7 @@ def resolve_owner(
124124
# 3. Use default owner
125125
default_owner = self.config.get("default")
126126
if default_owner:
127-
owner_ref = self._get_owner_ref(default_owner)
127+
owner_ref = self._get_owner_refs(default_owner)
128128
if owner_ref:
129129
logger.debug(
130130
f"Using default owner for '{entity_name}': {default_owner}"
@@ -139,41 +139,91 @@ def resolve_owner(
139139

140140
return None
141141

142-
def _get_owner_ref(self, owner_name: str) -> Optional[EntityReferenceList]:
142+
def _get_owner_refs(
143+
self, owner_names: Union[str, List[str]]
144+
) -> Optional[EntityReferenceList]:
143145
"""
144-
Get owner reference from OpenMetadata
146+
Get owner references from OpenMetadata (supports single or multiple owners)
147+
148+
Business Rules:
149+
- Multiple users are allowed
150+
- Only ONE team is allowed
151+
- Users and teams are mutually exclusive
145152
146153
Args:
147-
owner_name: User or team name/email
154+
owner_names: Single owner name or list of owner names (user/team name or email)
148155
149156
Returns:
150-
EntityReferenceList or None if not found
157+
EntityReferenceList with all found owners, or None if none found
151158
"""
152-
try:
153-
if not owner_name:
154-
return None
159+
if isinstance(owner_names, str):
160+
owner_names = [owner_names]
155161

156-
# Try to get owner by name (handles both users and teams)
157-
owner_ref = self.metadata.get_reference_by_name(
158-
name=owner_name, is_owner=True
159-
)
162+
if not owner_names:
163+
return None
164+
165+
all_owners = []
166+
owner_types = set() # Track types: 'user' or 'team'
160167

161-
if owner_ref:
162-
return owner_ref
168+
for owner_name in owner_names:
169+
try:
170+
if not owner_name:
171+
continue
172+
173+
owner_ref = self.metadata.get_reference_by_name(
174+
name=owner_name, is_owner=True
175+
)
163176

164-
# Try by email if name lookup failed and it looks like an email
165-
if "@" in owner_name:
166-
owner_ref = self.metadata.get_reference_by_email(owner_name)
167177
if owner_ref:
168-
return owner_ref
178+
if owner_ref.root:
179+
owner_entity = owner_ref.root[0]
180+
all_owners.append(owner_entity)
181+
owner_types.add(owner_entity.type)
182+
logger.debug(
183+
f"Found owner: {owner_name} (type: {owner_entity.type})"
184+
)
185+
continue
169186

170-
logger.warning(f"Could not find owner: {owner_name}")
187+
if "@" in owner_name:
188+
owner_ref = self.metadata.get_reference_by_email(owner_name)
189+
if owner_ref:
190+
if owner_ref.root:
191+
owner_entity = owner_ref.root[0]
192+
all_owners.append(owner_entity)
193+
owner_types.add(owner_entity.type)
194+
logger.debug(
195+
f"Found owner by email: {owner_name} (type: {owner_entity.type})"
196+
)
197+
continue
171198

172-
except Exception as exc:
173-
logger.debug(f"Error getting owner reference for '{owner_name}': {exc}")
174-
logger.debug(traceback.format_exc())
199+
logger.warning(f"Could not find owner: {owner_name}")
175200

176-
return None
201+
except Exception as exc:
202+
logger.warning(
203+
f"Error getting owner reference for '{owner_name}': {exc}"
204+
)
205+
logger.debug(traceback.format_exc())
206+
207+
if not all_owners:
208+
return None
209+
210+
# VALIDATION 1: Cannot mix users and teams
211+
if len(owner_types) > 1:
212+
logger.warning(
213+
f"VALIDATION ERROR: Cannot mix users and teams in owner list. "
214+
f"Found types: {owner_types}. Skipping this owner configuration."
215+
)
216+
return None
217+
218+
# VALIDATION 2: Only one team allowed
219+
if "team" in owner_types and len(all_owners) > 1:
220+
logger.warning(
221+
f"VALIDATION ERROR: Only ONE team allowed as owner, but got {len(all_owners)} teams. "
222+
f"Using only the first team: {all_owners[0].name}"
223+
)
224+
return EntityReferenceList(root=[all_owners[0]])
225+
226+
return EntityReferenceList(root=all_owners)
177227

178228

179229
def get_owner_from_config(

0 commit comments

Comments
 (0)