diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/ExportService.java b/repository/src/main/java/org/apache/atlas/repository/impexp/ExportService.java index 8ccc63c1b17..03d5e7e57ce 100644 --- a/repository/src/main/java/org/apache/atlas/repository/impexp/ExportService.java +++ b/repository/src/main/java/org/apache/atlas/repository/impexp/ExportService.java @@ -23,6 +23,7 @@ import org.apache.atlas.glossary.GlossaryService; import org.apache.atlas.model.impexp.AtlasExportRequest; import org.apache.atlas.model.impexp.AtlasExportResult; +import org.apache.atlas.model.instance.AtlasClassification; import org.apache.atlas.model.instance.AtlasEntity; import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo; import org.apache.atlas.model.instance.AtlasObjectId; @@ -34,6 +35,8 @@ import org.apache.atlas.model.typedef.AtlasStructDef; import org.apache.atlas.model.typedef.AtlasTypesDef; import org.apache.atlas.repository.graph.GraphHelper; +import org.apache.atlas.repository.graphdb.AtlasEdge; +import org.apache.atlas.repository.graphdb.AtlasEdgeDirection; import org.apache.atlas.repository.graphdb.AtlasGraph; import org.apache.atlas.repository.graphdb.AtlasVertex; import org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2; @@ -43,24 +46,36 @@ import org.apache.atlas.util.AtlasGremlinQueryProvider; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.collections.MapUtils; +import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Component; import javax.inject.Inject; +import java.util.ArrayDeque; import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; +import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Queue; import java.util.Set; import static org.apache.atlas.model.impexp.AtlasExportRequest.FETCH_TYPE_CONNECTED; import static org.apache.atlas.model.impexp.AtlasExportRequest.FETCH_TYPE_FULL; import static org.apache.atlas.model.impexp.AtlasExportRequest.FETCH_TYPE_INCREMENTAL; +import static org.apache.atlas.repository.Constants.CLASSIFICATION_EDGE_IS_PROPAGATED_PROPERTY_KEY; +import static org.apache.atlas.repository.Constants.CLASSIFICATION_EDGE_NAME_PROPERTY_KEY; +import static org.apache.atlas.repository.Constants.CLASSIFICATION_ENTITY_GUID; +import static org.apache.atlas.repository.Constants.CLASSIFICATION_LABEL; import static org.apache.atlas.repository.Constants.GUID_PROPERTY_KEY; import static org.apache.atlas.repository.Constants.MODIFICATION_TIMESTAMP_PROPERTY_KEY; +import static org.apache.atlas.repository.graph.GraphHelper.getGuid; +import static org.apache.atlas.repository.graph.GraphHelper.getTypeName; +import static org.apache.atlas.type.AtlasStructType.AtlasAttribute.AtlasRelationshipEdgeDirection.IN; +import static org.apache.atlas.type.AtlasStructType.AtlasAttribute.AtlasRelationshipEdgeDirection.OUT; @Component public class ExportService { @@ -75,6 +90,8 @@ public class ExportService { private final AuditsWriter auditsWriter; private ExportTypeProcessor exportTypeProcessor; private static final String ATLAS_TYPE_HIVE_DB = "hive_db"; + public static final String PROCESS_INPUTS = "__Process.inputs"; + public static final String PROCESS_OUTPUTS = "__Process.outputs"; @Inject public ExportService(final AtlasTypeRegistry typeRegistry, AtlasGraph graph, AuditsWriter auditsWriter, HdfsPathEntityCreator hdfsPathEntityCreator, GlossaryService glossaryService) { @@ -91,13 +108,14 @@ public AtlasExportResult run(ZipSink exportSink, AtlasExportRequest request, Str long startTime = System.currentTimeMillis(); AtlasExportResult result = new AtlasExportResult(request, userName, requestingIP, hostName, startTime, getCurrentChangeMarker()); ExportContext context = new ExportContext(result, exportSink); + RelationshipAttributesExtractor relationshipAttributesExtractor = new RelationshipAttributesExtractor(typeRegistry); exportTypeProcessor = new ExportTypeProcessor(typeRegistry, glossaryService); try { LOG.info("==> export(user={}, from={})", userName, requestingIP); - AtlasExportResult.OperationStatus[] statuses = processItems(request, context); + AtlasExportResult.OperationStatus[] statuses = processItems(request, context, relationshipAttributesExtractor); processTypesDef(context); @@ -219,20 +237,20 @@ private void processTypesDef(ExportContext context) { } } - private AtlasExportResult.OperationStatus[] processItems(AtlasExportRequest request, ExportContext context) { - AtlasExportResult.OperationStatus[] statuses = new AtlasExportResult.OperationStatus[request.getItemsToExport().size()]; - List itemsToExport = request.getItemsToExport(); + private AtlasExportResult.OperationStatus[] processItems(AtlasExportRequest request, ExportContext context, RelationshipAttributesExtractor relationshipAttributesExtractor) { + AtlasExportResult.OperationStatus[] statuses = new AtlasExportResult.OperationStatus[request.getItemsToExport().size()]; + List itemsToExport = request.getItemsToExport(); for (int i = 0; i < itemsToExport.size(); i++) { AtlasObjectId item = itemsToExport.get(i); - statuses[i] = processObjectId(item, context); + statuses[i] = processObjectId(item, context, relationshipAttributesExtractor); } return statuses; } - private AtlasExportResult.OperationStatus processObjectId(AtlasObjectId item, ExportContext context) { + private AtlasExportResult.OperationStatus processObjectId(AtlasObjectId item, ExportContext context, RelationshipAttributesExtractor relationshipAttributesExtractor) { LOG.debug("==> processObjectId({})", item); try { @@ -266,6 +284,11 @@ private AtlasExportResult.OperationStatus processObjectId(AtlasObjectId item, Ex context.isSkipConnectedFetch = false; } + if (context.fetchType != ExportFetchType.FULL && !context.skipLineage) { + for (String guid : entityGuids) { + addEntityGuids(guid, context, relationshipAttributesExtractor); + } + } } catch (AtlasBaseException excp) { LOG.error("Fetching entity failed for: {}", item, excp); @@ -413,6 +436,91 @@ private void addEntity(AtlasEntityWithExtInfo entityWithExtInfo, ExportContext c context.reportProgress(); } + public void addEntityGuids(String guid, ExportContext context, RelationshipAttributesExtractor relationshipAttributesExtractor) throws AtlasBaseException { + AtlasVertex adjacentVertex; + Iterator entityEdges; + Iterator propagateClassificationVertices; + Iterator appliedClassificationVertices; + String fetchedClassificationGuid; + List processedClassifications = new ArrayList<>(); + + AtlasVertex initialEntityVertex = entityGraphRetriever.getEntityVertex(guid); + for (AtlasClassification currentClassification : entityGraphRetriever.getAllClassifications(initialEntityVertex)) { + if (context.guidsProcessed.contains(currentClassification.getEntityGuid())) { + processedClassifications.add(currentClassification); + } + } + context.newAddedGuids.add(guid); + while (!context.newAddedGuids.isEmpty()) { + String currentGuid = context.newAddedGuids.poll(); + + AtlasVertex entityVertex = entityGraphRetriever.getEntityVertex(currentGuid); + String entityTypeName = getTypeName(entityVertex); + List classifications = entityGraphRetriever.getAllClassifications(entityVertex); + if (CollectionUtils.isNotEmpty(processedClassifications)) { + classifications.removeAll(processedClassifications); + } + if (CollectionUtils.isNotEmpty(classifications)) { + for (AtlasClassification classification : classifications) { + String classificationName = classification.getTypeName(); + boolean isProcess = relationshipAttributesExtractor.isLineageType(entityTypeName); + entityEdges = isProcess + ? GraphHelper.getEdgesForLabel(entityVertex, PROCESS_INPUTS, OUT) + : GraphHelper.getEdgesForLabel(entityVertex, PROCESS_OUTPUTS, IN); + while (entityEdges.hasNext()) { + AtlasEdge propagationEdge = entityEdges.next(); + AtlasVertex outVertex = propagationEdge.getOutVertex(); + AtlasVertex inVertex = propagationEdge.getInVertex(); + adjacentVertex = StringUtils.equals(outVertex.getIdForDisplay(), entityVertex.getIdForDisplay()) ? inVertex : outVertex; + String adjacentGuid = getGuid(adjacentVertex); + boolean isPropagated = false; + propagateClassificationVertices = getClassificationVertices(inVertex, outVertex, isProcess, true, classificationName); + while (propagateClassificationVertices.hasNext()) { + AtlasVertex classificationVertex = propagateClassificationVertices.next(); + fetchedClassificationGuid = classificationVertex.getProperty(CLASSIFICATION_ENTITY_GUID, String.class); + if (StringUtils.equals(classification.getEntityGuid(), fetchedClassificationGuid)) { + addAdjacentVertices(context, adjacentGuid); + isPropagated = true; + } + } + if (!isPropagated) { + appliedClassificationVertices = getClassificationVertices(inVertex, outVertex, isProcess, false, classificationName); + + while (appliedClassificationVertices.hasNext()) { + AtlasVertex classificationVertex = appliedClassificationVertices.next(); + fetchedClassificationGuid = classificationVertex.getProperty(CLASSIFICATION_ENTITY_GUID, String.class); + if (StringUtils.equals(classification.getEntityGuid(), fetchedClassificationGuid)) { + addAdjacentVertices(context, adjacentGuid); + break; + } + } + } + } + } + } + } + } + + private Iterator getClassificationVertices(AtlasVertex inVertex, AtlasVertex outVertex, + boolean isProcess, boolean isPropagated, String name) { + AtlasVertex base = isProcess ? inVertex : outVertex; + return base.query() + .direction(AtlasEdgeDirection.OUT) + .label(CLASSIFICATION_LABEL) + .has(CLASSIFICATION_EDGE_IS_PROPAGATED_PROPERTY_KEY, isPropagated) + .has(CLASSIFICATION_EDGE_NAME_PROPERTY_KEY, name) + .vertices().iterator(); + } + + private void addAdjacentVertices(ExportContext context, String adjacentGuid) throws AtlasBaseException { + if (!context.newAddedGuids.contains(adjacentGuid)) { + context.newAddedGuids.add(adjacentGuid); + } + if (!context.sink.guids.contains(adjacentGuid)) { + context.addToSink(entityGraphRetriever.toAtlasEntityWithExtInfo(adjacentGuid)); + } + } + public enum TraversalDirection { UNKNOWN, INWARD, @@ -450,6 +558,7 @@ static class ExportContext { final UniqueList entityCreationOrder = new UniqueList<>(); final Set guidsProcessed = new HashSet<>(); final UniqueList guidsToProcess = new UniqueList<>(); + final Queue newAddedGuids = new ArrayDeque<>(); final UniqueList lineageToProcess = new UniqueList<>(); final Set lineageProcessed = new HashSet<>(); final Map guidDirection = new HashMap<>(); @@ -511,6 +620,7 @@ public void clear() { guidsToProcess.clear(); guidsProcessed.clear(); guidDirection.clear(); + newAddedGuids.clear(); startingEntityType = null; } diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/RelationshipAttributesExtractor.java b/repository/src/main/java/org/apache/atlas/repository/impexp/RelationshipAttributesExtractor.java index 6f42e1b85f9..857fb664eea 100644 --- a/repository/src/main/java/org/apache/atlas/repository/impexp/RelationshipAttributesExtractor.java +++ b/repository/src/main/java/org/apache/atlas/repository/impexp/RelationshipAttributesExtractor.java @@ -131,7 +131,7 @@ private TraversalDirection getRelationshipEdgeDirection(AtlasRelatedObjectId rel return isOutEdge ? OUTWARD : INWARD; } - private boolean isLineageType(String typeName) { + public boolean isLineageType(String typeName) { AtlasEntityDef entityDef = typeRegistry.getEntityDefByName(typeName); return entityDef.getSuperTypes().contains(AtlasBaseTypeDef.ATLAS_TYPE_PROCESS); diff --git a/repository/src/test/java/org/apache/atlas/repository/impexp/ExportIncrementalTest.java b/repository/src/test/java/org/apache/atlas/repository/impexp/ExportIncrementalTest.java index 3283e024174..fa166d38ac1 100644 --- a/repository/src/test/java/org/apache/atlas/repository/impexp/ExportIncrementalTest.java +++ b/repository/src/test/java/org/apache/atlas/repository/impexp/ExportIncrementalTest.java @@ -24,6 +24,7 @@ import org.apache.atlas.TestUtilsV2; import org.apache.atlas.exception.AtlasBaseException; import org.apache.atlas.model.impexp.AtlasExportRequest; +import org.apache.atlas.model.instance.AtlasClassification; import org.apache.atlas.model.instance.AtlasEntity; import org.apache.atlas.model.instance.AtlasObjectId; import org.apache.atlas.repository.AtlasTestBase; @@ -49,6 +50,7 @@ import java.io.IOException; import java.io.InputStream; import java.util.ArrayList; +import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -78,6 +80,24 @@ public class ExportIncrementalTest extends AtlasTestBase { private static final String EXPORT_REQUEST_INCREMENTAL = "export-incremental"; private static final String EXPORT_REQUEST_CONNECTED = "export-connected"; + private static final String FIRSTPARENT = "589a233a-f00e-4928-8efd-e7e72e30d370"; + private static final String HIVEDB = "12eb7a9b-3b4d-48c9-902c-1fa2401823f7"; + private static final String CTASLEVEL13 = "6f1c413c-1b35-421a-aabd-d5f94873ddf0"; + private static final String CTASLEVEL12 = "58a68a94-67bb-488d-b111-3dfdd3a220eb"; + private static final String CTASLEVEL11 = "c6657df3-3bea-44cc-a356-a81c9e72f9c7"; + private static final String SECONDPARENT = "0ce3573b-c535-4bf9-970e-4d37f01806ef"; + private static final String CTLASLEVEL11_1 = "80a3ead2-6ad7-4881-bd85-5e8b4fdb01c5"; + private static final String HDFS_PATH = "d9c50322-b130-405e-b560-2b15bcdddb97"; + private static final String SECONDPARENT_PROCESS = "f611662a-4ea6-4707-b7e9-02848fb28529"; + private static final String CTASLEVEL13_PROCESS = "da34b191-5ab9-4934-94c6-5a97d3e59608"; + private static final String CTASLEVEL12_PROCESS = "33fc0f3c-3522-4aaa-83c7-258752abe824"; + private static final String CTASLEVEL11_1_PROCESS = "1339782e-fde7-402b-8271-2f91a65396e9"; + private static final String CTASLEVEL11_PROCESS = "64cde929-195a-4c90-a921-b8c4d79ddfcf"; + + // Resolved after import + private static final String CTASLEVEL11_1_TABLE_QUALIFIED_NAME = "default.ctaslevel11_1@cm"; + private static final String CTASLEVEL13_TABLE_QUALIFIED_NAME = "default.ctaslevel13@cm"; + @Inject AtlasTypeRegistry typeRegistry; @@ -96,6 +116,7 @@ public class ExportIncrementalTest extends AtlasTestBase { private AtlasClassificationType classificationTypeT1; private AtlasClassificationType classificationTypeT2; private AtlasClassificationType classificationTypeT3; + private long nextTimestamp; @DataProvider(name = "hiveDb") @@ -103,6 +124,11 @@ public static Object[][] getData(ITestContext context) throws IOException, Atlas return getZipSource("hive_db_lineage.zip"); } + @DataProvider(name = "classificationLineage") + public static Object[][] getClassificationData(ITestContext context) throws IOException, AtlasBaseException { + return getZipSource("classificationLineage.zip"); + } + @BeforeClass public void setup() throws IOException, AtlasBaseException { basicSetup(typeDefStore, typeRegistry); @@ -226,6 +252,11 @@ public void importHiveDb(InputStream stream) throws AtlasBaseException, IOExcept runImportWithNoParameters(importService, stream); } + @Test(dataProvider = "classificationLineage") + public void classificationineageDb(InputStream stream) throws AtlasBaseException, IOException { + runImportWithNoParameters(importService, stream); + } + @Test(dependsOnMethods = "importHiveDb") public void exportTableInrementalConnected() throws AtlasBaseException, IOException { InputStream source = runExportWithParameters(exportService, getExportRequestForHiveTable(QUALIFIED_NAME_TABLE_LINEAGE, EXPORT_INCREMENTAL, 0, true)); @@ -247,6 +278,25 @@ public void exportTableInrementalConnected() throws AtlasBaseException, IOExcept verifyExpectedEntities(getFileNames(getZipSourceCopy(source)), GUID_TABLE_CTAS_2); } + @Test(dependsOnMethods = "classificationineageDb") + public void exportTableInrementalConnectedClassificationLineage() throws AtlasBaseException, IOException { + InputStream source = runExportWithParameters(exportService, getExportRequestForHiveTable(CTASLEVEL11_1_TABLE_QUALIFIED_NAME, EXPORT_INCREMENTAL, 0, false)); + ZipSource sourceCopy = getZipSourceCopy(source); + if (entityStore.getClassification(FIRSTPARENT, "firstclassi") == null) { + entityStore.addClassification(Arrays.asList(FIRSTPARENT), new AtlasClassification("firstclassi", null)); + } + + verifyExpectedEntities(getFileNames(sourceCopy), HDFS_PATH, HIVEDB, CTLASLEVEL11_1, CTASLEVEL11_1_PROCESS, CTASLEVEL11_PROCESS, CTASLEVEL11, SECONDPARENT_PROCESS, + SECONDPARENT, FIRSTPARENT); + + nextTimestamp = updateTimesampForNextIncrementalExport(sourceCopy); + + entityStore.deleteClassification(FIRSTPARENT, "firstclassi", FIRSTPARENT); + + source = runExportWithParameters(exportService, getExportRequestForHiveTable(CTASLEVEL11_1_TABLE_QUALIFIED_NAME, EXPORT_INCREMENTAL, nextTimestamp, false)); + verifyExpectedEntities(getFileNames(getZipSourceCopy(source)), CTLASLEVEL11_1, CTASLEVEL11_1_PROCESS, CTASLEVEL11_PROCESS, CTASLEVEL11, SECONDPARENT); + } + @Test(dependsOnMethods = "importHiveDb") public void exportTableIncrementalForParentEntity() throws AtlasBaseException, IOException { InputStream source = runExportWithParameters(exportService, getExportRequestForHiveTable(QUALIFIED_NAME_TABLE_2, EXPORT_INCREMENTAL, 0, false)); diff --git a/repository/src/test/resources/classificationLineage.zip b/repository/src/test/resources/classificationLineage.zip new file mode 100644 index 00000000000..db10020ebec Binary files /dev/null and b/repository/src/test/resources/classificationLineage.zip differ