Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1506,21 +1506,19 @@ private void mapRelationshipAttributes(AtlasEntity entity, AtlasEntityType entit

if (op.equals(CREATE)) {
for (String attrName : entityType.getRelationshipAttributes().keySet()) {
Object attrValue = entity.getRelationshipAttribute(attrName);
String relationType = AtlasEntityUtil.getRelationshipType(attrValue);
AtlasAttribute attribute = entityType.getRelationshipAttribute(attrName, relationType);

mapAttribute(attribute, attrValue, vertex, op, context);
Object attrValue = entity.getRelationshipAttribute(attrName);
if (attrValue != null) {
mapRelationshipAttributeWithMultipleTypes(entity, entityType, attrName, attrValue, vertex, op, context);
}
}
} else if (op.equals(UPDATE) || op.equals(PARTIAL_UPDATE)) {
// relationship attributes mapping
for (String attrName : entityType.getRelationshipAttributes().keySet()) {
if (entity.hasRelationshipAttribute(attrName)) {
Object attrValue = entity.getRelationshipAttribute(attrName);
String relationType = AtlasEntityUtil.getRelationshipType(attrValue);
AtlasAttribute attribute = entityType.getRelationshipAttribute(attrName, relationType);

mapAttribute(attribute, attrValue, vertex, op, context);
Object attrValue = entity.getRelationshipAttribute(attrName);
if (attrValue != null) {
mapRelationshipAttributeWithMultipleTypes(entity, entityType, attrName, attrValue, vertex, op, context);
}
}
}
}
Expand All @@ -1533,6 +1531,98 @@ private void mapRelationshipAttributes(AtlasEntity entity, AtlasEntityType entit
LOG.debug("<== mapRelationshipAttributes({}, {})", op, entity.getTypeName());
}

private void mapRelationshipAttributeWithMultipleTypes(AtlasEntity entity, AtlasEntityType entityType, String attrName, Object attrValue, AtlasVertex vertex, EntityOperation op, EntityMutationContext context) throws AtlasBaseException {
LOG.debug("==> mapRelationshipAttributeWithMultipleTypes({}, {})", attrName, entity.getTypeName());
Set<String> relationshipTypeNames = entityType.getAttributeRelationshipTypes(attrName);

if (CollectionUtils.isEmpty(relationshipTypeNames)) {
// Fallback to single relationship type processing
String relationType = AtlasEntityUtil.getRelationshipType(attrValue);
AtlasAttribute attribute = entityType.getRelationshipAttribute(attrName, relationType);
mapAttribute(attribute, attrValue, vertex, op, context);

return;
}

if (attrValue instanceof Collection) {
Collection<?> relatedObjects = (Collection<?>) attrValue;

// Group related objects by their appropriate relationship type
// e.g., hive_table elements should use hive_table_db relationship, anothertype_table elements should use anothertype_table_db
Map<String, List<Object>> elementsByRelationshipType = groupElementsByRelationshipType(
relatedObjects, attrName, relationshipTypeNames);

// Process each relationship type with its filtered elements
for (Map.Entry<String, List<Object>> entry : elementsByRelationshipType.entrySet()) {
String relationshipTypeName = entry.getKey();
List<Object> filteredElements = entry.getValue();

AtlasAttribute attribute = entityType.getRelationshipAttribute(attrName, relationshipTypeName);

if (attribute != null && CollectionUtils.isNotEmpty(filteredElements)) {
// Use the same collection type as the original (List or Set)
Object filteredValue = createCollectionOfSameType(attrValue, filteredElements);

LOG.debug("Processing relationship type {} for attribute {} with {} elements",
relationshipTypeName, attrName, filteredElements.size());

mapAttribute(attribute, filteredValue, vertex, op, context);
}
}
} else {
// Single element - prefer explicit relationship type from the value
String appropriateRelType = AtlasEntityUtil.getRelationshipType(attrValue);
AtlasAttribute attribute = entityType.getRelationshipAttribute(attrName, appropriateRelType);
mapAttribute(attribute, attrValue, vertex, op, context);
}

LOG.debug("<== mapRelationshipAttributeWithMultipleTypes({}, {})", attrName, entity.getTypeName());
}

private Map<String, List<Object>> groupElementsByRelationshipType(Collection<?> relatedObjects,
String attrName,
Set<String> relationshipTypeNames) {
Map<String, List<Object>> elementsByRelationshipType = new HashMap<>();

// Group related objects by their appropriate relationship type
for (Object element : relatedObjects) {
// Prefer the explicit relationship type encoded in the element, if any
String appropriateRelType = AtlasEntityUtil.getRelationshipType(element);

if (StringUtils.isEmpty(appropriateRelType)) {
// If only one relationship type is configured, fall back to it; otherwise, skip this element
if (relationshipTypeNames.size() == 1) {
appropriateRelType = relationshipTypeNames.iterator().next();
LOG.warn("No relationshipType found in element for attribute {}; falling back to configured type {}",
attrName, appropriateRelType);
} else {
LOG.warn("No relationshipType found in element for attribute {}; configured relationship types: {}. Skipping element.",
attrName, relationshipTypeNames);
continue;
}
} else if (!relationshipTypeNames.contains(appropriateRelType)) {
// Element's relationshipType is not configured for this attribute; ignore this element
LOG.warn("relationshipType {} from element in attribute {} is not in configured relationship types {}; element will be ignored",
appropriateRelType, attrName, relationshipTypeNames);
continue;
}

elementsByRelationshipType.computeIfAbsent(appropriateRelType, k -> new ArrayList<>()).add(element);
}

return elementsByRelationshipType;
}

private Object createCollectionOfSameType(Object originalValue, List<Object> filteredElements) {
if (originalValue instanceof List) {
return filteredElements;
} else if (originalValue instanceof Set) {
return new HashSet<>(filteredElements);
} else {
return filteredElements;
}
}

private void mapAttribute(AtlasAttribute attribute, Object attrValue, AtlasVertex vertex, EntityOperation op, EntityMutationContext context) throws AtlasBaseException {
boolean isDeletedEntity = context.isDeletedEntity(vertex);
AtlasType attrType = attribute.getAttributeType();
Expand Down
Loading