Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.atlas.glossary.GlossaryService;
import org.apache.atlas.model.impexp.AtlasExportRequest;
import org.apache.atlas.model.impexp.AtlasExportResult;
import org.apache.atlas.model.instance.AtlasClassification;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
import org.apache.atlas.model.instance.AtlasObjectId;
Expand All @@ -34,6 +35,8 @@
import org.apache.atlas.model.typedef.AtlasStructDef;
import org.apache.atlas.model.typedef.AtlasTypesDef;
import org.apache.atlas.repository.graph.GraphHelper;
import org.apache.atlas.repository.graphdb.AtlasEdge;
import org.apache.atlas.repository.graphdb.AtlasEdgeDirection;
import org.apache.atlas.repository.graphdb.AtlasGraph;
import org.apache.atlas.repository.graphdb.AtlasVertex;
import org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2;
Expand All @@ -43,24 +46,36 @@
import org.apache.atlas.util.AtlasGremlinQueryProvider;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

import javax.inject.Inject;

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;

import static org.apache.atlas.model.impexp.AtlasExportRequest.FETCH_TYPE_CONNECTED;
import static org.apache.atlas.model.impexp.AtlasExportRequest.FETCH_TYPE_FULL;
import static org.apache.atlas.model.impexp.AtlasExportRequest.FETCH_TYPE_INCREMENTAL;
import static org.apache.atlas.repository.Constants.CLASSIFICATION_EDGE_IS_PROPAGATED_PROPERTY_KEY;
import static org.apache.atlas.repository.Constants.CLASSIFICATION_EDGE_NAME_PROPERTY_KEY;
import static org.apache.atlas.repository.Constants.CLASSIFICATION_ENTITY_GUID;
import static org.apache.atlas.repository.Constants.CLASSIFICATION_LABEL;
import static org.apache.atlas.repository.Constants.GUID_PROPERTY_KEY;
import static org.apache.atlas.repository.Constants.MODIFICATION_TIMESTAMP_PROPERTY_KEY;
import static org.apache.atlas.repository.graph.GraphHelper.getGuid;
import static org.apache.atlas.repository.graph.GraphHelper.getTypeName;
import static org.apache.atlas.type.AtlasStructType.AtlasAttribute.AtlasRelationshipEdgeDirection.IN;
import static org.apache.atlas.type.AtlasStructType.AtlasAttribute.AtlasRelationshipEdgeDirection.OUT;

@Component
public class ExportService {
Expand All @@ -75,6 +90,8 @@ public class ExportService {
private final AuditsWriter auditsWriter;
private ExportTypeProcessor exportTypeProcessor;
private static final String ATLAS_TYPE_HIVE_DB = "hive_db";
public static final String PROCESS_INPUTS = "__Process.inputs";
public static final String PROCESS_OUTPUTS = "__Process.outputs";

@Inject
public ExportService(final AtlasTypeRegistry typeRegistry, AtlasGraph graph, AuditsWriter auditsWriter, HdfsPathEntityCreator hdfsPathEntityCreator, GlossaryService glossaryService) {
Expand All @@ -91,13 +108,14 @@ public AtlasExportResult run(ZipSink exportSink, AtlasExportRequest request, Str
long startTime = System.currentTimeMillis();
AtlasExportResult result = new AtlasExportResult(request, userName, requestingIP, hostName, startTime, getCurrentChangeMarker());
ExportContext context = new ExportContext(result, exportSink);
RelationshipAttributesExtractor relationshipAttributesExtractor = new RelationshipAttributesExtractor(typeRegistry);

exportTypeProcessor = new ExportTypeProcessor(typeRegistry, glossaryService);

try {
LOG.info("==> export(user={}, from={})", userName, requestingIP);

AtlasExportResult.OperationStatus[] statuses = processItems(request, context);
AtlasExportResult.OperationStatus[] statuses = processItems(request, context, relationshipAttributesExtractor);

processTypesDef(context);

Expand Down Expand Up @@ -219,20 +237,20 @@ private void processTypesDef(ExportContext context) {
}
}

private AtlasExportResult.OperationStatus[] processItems(AtlasExportRequest request, ExportContext context) {
AtlasExportResult.OperationStatus[] statuses = new AtlasExportResult.OperationStatus[request.getItemsToExport().size()];
List<AtlasObjectId> itemsToExport = request.getItemsToExport();
private AtlasExportResult.OperationStatus[] processItems(AtlasExportRequest request, ExportContext context, RelationshipAttributesExtractor relationshipAttributesExtractor) {
AtlasExportResult.OperationStatus[] statuses = new AtlasExportResult.OperationStatus[request.getItemsToExport().size()];
List<AtlasObjectId> itemsToExport = request.getItemsToExport();

for (int i = 0; i < itemsToExport.size(); i++) {
AtlasObjectId item = itemsToExport.get(i);

statuses[i] = processObjectId(item, context);
statuses[i] = processObjectId(item, context, relationshipAttributesExtractor);
}

return statuses;
}

private AtlasExportResult.OperationStatus processObjectId(AtlasObjectId item, ExportContext context) {
private AtlasExportResult.OperationStatus processObjectId(AtlasObjectId item, ExportContext context, RelationshipAttributesExtractor relationshipAttributesExtractor) {
LOG.debug("==> processObjectId({})", item);

try {
Expand Down Expand Up @@ -266,6 +284,11 @@ private AtlasExportResult.OperationStatus processObjectId(AtlasObjectId item, Ex

context.isSkipConnectedFetch = false;
}
if (context.fetchType != ExportFetchType.FULL && !context.skipLineage) {
for (String guid : entityGuids) {
addEntityGuids(guid, context, relationshipAttributesExtractor);
}
}
} catch (AtlasBaseException excp) {
LOG.error("Fetching entity failed for: {}", item, excp);

Expand Down Expand Up @@ -413,6 +436,91 @@ private void addEntity(AtlasEntityWithExtInfo entityWithExtInfo, ExportContext c
context.reportProgress();
}

/**
 * Starting from the entity identified by {@code guid}, performs a breadth-first walk over
 * lineage edges and adds to the export sink every adjacent entity that shares one of the
 * starting entity's classifications — either via tag propagation or because the same
 * classification was directly applied on the far side of the edge.
 *
 * Classifications whose source entity GUID is already in {@code context.guidsProcessed}
 * are excluded up front, so already-exported propagation sources are not re-walked.
 *
 * @param guid                            GUID of the entity to start the traversal from
 * @param context                         export context; {@code context.newAddedGuids} is used as the BFS work queue
 * @param relationshipAttributesExtractor used to decide whether an entity type is a lineage (process) type
 * @throws AtlasBaseException if vertex lookup or sink writing fails
 */
public void addEntityGuids(String guid, ExportContext context, RelationshipAttributesExtractor relationshipAttributesExtractor) throws AtlasBaseException {
    List<AtlasClassification> processedClassifications = new ArrayList<>();

    // Collect the classifications whose originating entity was already exported;
    // those are filtered out of every vertex visited below.
    AtlasVertex initialEntityVertex = entityGraphRetriever.getEntityVertex(guid);
    for (AtlasClassification currentClassification : entityGraphRetriever.getAllClassifications(initialEntityVertex)) {
        if (context.guidsProcessed.contains(currentClassification.getEntityGuid())) {
            processedClassifications.add(currentClassification);
        }
    }

    // Seed the BFS queue with the starting entity.
    context.newAddedGuids.add(guid);

    while (!context.newAddedGuids.isEmpty()) {
        String currentGuid = context.newAddedGuids.poll();

        AtlasVertex entityVertex   = entityGraphRetriever.getEntityVertex(currentGuid);
        String      entityTypeName = getTypeName(entityVertex);

        List<AtlasClassification> classifications = entityGraphRetriever.getAllClassifications(entityVertex);

        if (CollectionUtils.isNotEmpty(processedClassifications)) {
            classifications.removeAll(processedClassifications);
        }

        if (CollectionUtils.isEmpty(classifications)) {
            continue; // nothing to propagate from this vertex
        }

        // Loop-invariant: depends only on the entity type, not on any individual
        // classification, so it is computed once per vertex (hoisted out of the loop).
        boolean isProcess = relationshipAttributesExtractor.isLineageType(entityTypeName);

        for (AtlasClassification classification : classifications) {
            String classificationName = classification.getTypeName();

            // Process entities are walked via their "__Process.inputs" OUT edges;
            // non-process (dataset) entities via incoming "__Process.outputs" edges.
            Iterator<AtlasEdge> entityEdges = isProcess
                    ? GraphHelper.getEdgesForLabel(entityVertex, PROCESS_INPUTS, OUT)
                    : GraphHelper.getEdgesForLabel(entityVertex, PROCESS_OUTPUTS, IN);

            while (entityEdges.hasNext()) {
                AtlasEdge   propagationEdge = entityEdges.next();
                AtlasVertex outVertex       = propagationEdge.getOutVertex();
                AtlasVertex inVertex        = propagationEdge.getInVertex();

                // The far end of the edge: whichever side is not the current entity.
                AtlasVertex adjacentVertex = StringUtils.equals(outVertex.getIdForDisplay(), entityVertex.getIdForDisplay()) ? inVertex : outVertex;
                String      adjacentGuid   = getGuid(adjacentVertex);
                boolean     isPropagated   = false;

                // First look for a propagated copy of this classification on the edge's endpoint.
                Iterator<AtlasVertex> propagateClassificationVertices = getClassificationVertices(inVertex, outVertex, isProcess, true, classificationName);
                while (propagateClassificationVertices.hasNext()) {
                    AtlasVertex classificationVertex        = propagateClassificationVertices.next();
                    String      fetchedClassificationGuid   = classificationVertex.getProperty(CLASSIFICATION_ENTITY_GUID, String.class);

                    if (StringUtils.equals(classification.getEntityGuid(), fetchedClassificationGuid)) {
                        addAdjacentVertices(context, adjacentGuid);
                        isPropagated = true;
                    }
                }

                // Fall back to a directly-applied classification with the same source entity.
                if (!isPropagated) {
                    Iterator<AtlasVertex> appliedClassificationVertices = getClassificationVertices(inVertex, outVertex, isProcess, false, classificationName);

                    while (appliedClassificationVertices.hasNext()) {
                        AtlasVertex classificationVertex      = appliedClassificationVertices.next();
                        String      fetchedClassificationGuid = classificationVertex.getProperty(CLASSIFICATION_ENTITY_GUID, String.class);

                        if (StringUtils.equals(classification.getEntityGuid(), fetchedClassificationGuid)) {
                            addAdjacentVertices(context, adjacentGuid);
                            break; // one match is enough; avoid duplicate sink writes
                        }
                    }
                }
            }
        }
    }
}

/**
 * Returns the classification vertices attached (via the classification edge label) to one
 * endpoint of a lineage edge: the in-vertex when the current entity is a process, the
 * out-vertex otherwise. Filters by classification name and the propagated flag.
 */
private Iterator<AtlasVertex> getClassificationVertices(AtlasVertex inVertex, AtlasVertex outVertex,
                                                        boolean isProcess, boolean isPropagated, String name) {
    final AtlasVertex queryVertex;

    if (isProcess) {
        queryVertex = inVertex;
    } else {
        queryVertex = outVertex;
    }

    return queryVertex.query()
                      .direction(AtlasEdgeDirection.OUT)
                      .label(CLASSIFICATION_LABEL)
                      .has(CLASSIFICATION_EDGE_IS_PROPAGATED_PROPERTY_KEY, isPropagated)
                      .has(CLASSIFICATION_EDGE_NAME_PROPERTY_KEY, name)
                      .vertices().iterator();
}

/**
 * Queues {@code adjacentGuid} for further traversal (if not already queued) and writes its
 * entity to the export sink (if not already written).
 */
private void addAdjacentVertices(ExportContext context, String adjacentGuid) throws AtlasBaseException {
    // Enqueue for the BFS walk exactly once.
    if (!context.newAddedGuids.contains(adjacentGuid)) {
        context.newAddedGuids.add(adjacentGuid);
    }

    // Already emitted to the sink — nothing more to do.
    if (context.sink.guids.contains(adjacentGuid)) {
        return;
    }

    context.addToSink(entityGraphRetriever.toAtlasEntityWithExtInfo(adjacentGuid));
}

public enum TraversalDirection {
UNKNOWN,
INWARD,
Expand Down Expand Up @@ -450,6 +558,7 @@ static class ExportContext {
final UniqueList<String> entityCreationOrder = new UniqueList<>();
final Set<String> guidsProcessed = new HashSet<>();
final UniqueList<String> guidsToProcess = new UniqueList<>();
final Queue<String> newAddedGuids = new ArrayDeque<>();
final UniqueList<String> lineageToProcess = new UniqueList<>();
final Set<String> lineageProcessed = new HashSet<>();
final Map<String, TraversalDirection> guidDirection = new HashMap<>();
Expand Down Expand Up @@ -511,6 +620,7 @@ public void clear() {
guidsToProcess.clear();
guidsProcessed.clear();
guidDirection.clear();
newAddedGuids.clear();
startingEntityType = null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ private TraversalDirection getRelationshipEdgeDirection(AtlasRelatedObjectId rel
return isOutEdge ? OUTWARD : INWARD;
}

private boolean isLineageType(String typeName) {
public boolean isLineageType(String typeName) {
AtlasEntityDef entityDef = typeRegistry.getEntityDefByName(typeName);

return entityDef.getSuperTypes().contains(AtlasBaseTypeDef.ATLAS_TYPE_PROCESS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.atlas.TestUtilsV2;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.impexp.AtlasExportRequest;
import org.apache.atlas.model.instance.AtlasClassification;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasObjectId;
import org.apache.atlas.repository.AtlasTestBase;
Expand All @@ -49,6 +50,7 @@
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -78,6 +80,24 @@ public class ExportIncrementalTest extends AtlasTestBase {
private static final String EXPORT_REQUEST_INCREMENTAL = "export-incremental";
private static final String EXPORT_REQUEST_CONNECTED = "export-connected";

private static final String FIRSTPARENT = "589a233a-f00e-4928-8efd-e7e72e30d370";
private static final String HIVEDB = "12eb7a9b-3b4d-48c9-902c-1fa2401823f7";
private static final String CTASLEVEL13 = "6f1c413c-1b35-421a-aabd-d5f94873ddf0";
private static final String CTASLEVEL12 = "58a68a94-67bb-488d-b111-3dfdd3a220eb";
private static final String CTASLEVEL11 = "c6657df3-3bea-44cc-a356-a81c9e72f9c7";
private static final String SECONDPARENT = "0ce3573b-c535-4bf9-970e-4d37f01806ef";
private static final String CTLASLEVEL11_1 = "80a3ead2-6ad7-4881-bd85-5e8b4fdb01c5";
private static final String HDFS_PATH = "d9c50322-b130-405e-b560-2b15bcdddb97";
private static final String SECONDPARENT_PROCESS = "f611662a-4ea6-4707-b7e9-02848fb28529";
private static final String CTASLEVEL13_PROCESS = "da34b191-5ab9-4934-94c6-5a97d3e59608";
private static final String CTASLEVEL12_PROCESS = "33fc0f3c-3522-4aaa-83c7-258752abe824";
private static final String CTASLEVEL11_1_PROCESS = "1339782e-fde7-402b-8271-2f91a65396e9";
private static final String CTASLEVEL11_PROCESS = "64cde929-195a-4c90-a921-b8c4d79ddfcf";

// Resolved after import
private static final String CTASLEVEL11_1_TABLE_QUALIFIED_NAME = "default.ctaslevel11_1@cm";
private static final String CTASLEVEL13_TABLE_QUALIFIED_NAME = "default.ctaslevel13@cm";

@Inject
AtlasTypeRegistry typeRegistry;

Expand All @@ -96,13 +116,19 @@ public class ExportIncrementalTest extends AtlasTestBase {
private AtlasClassificationType classificationTypeT1;
private AtlasClassificationType classificationTypeT2;
private AtlasClassificationType classificationTypeT3;

private long nextTimestamp;

// Supplies the hive_db lineage zip as the data source for tests using dataProvider "hiveDb".
@DataProvider(name = "hiveDb")
public static Object[][] getData(ITestContext context) throws IOException, AtlasBaseException {
return getZipSource("hive_db_lineage.zip");
}

// Supplies the classification-lineage zip as the data source for tests using
// dataProvider "classificationLineage".
@DataProvider(name = "classificationLineage")
public static Object[][] getClassificationData(ITestContext context) throws IOException, AtlasBaseException {
return getZipSource("classificationLineage.zip");
}

@BeforeClass
public void setup() throws IOException, AtlasBaseException {
basicSetup(typeDefStore, typeRegistry);
Expand Down Expand Up @@ -226,6 +252,11 @@ public void importHiveDb(InputStream stream) throws AtlasBaseException, IOExcept
runImportWithNoParameters(importService, stream);
}

// Imports the classification-lineage data set so the dependent export tests have entities to work on.
// NOTE(review): the method name "classificationineageDb" looks like a typo for
// "classificationLineageDb"; it is also the dependsOnMethods key of
// exportTableInrementalConnectedClassificationLineage, so both must be renamed together.
@Test(dataProvider = "classificationLineage")
public void classificationineageDb(InputStream stream) throws AtlasBaseException, IOException {
runImportWithNoParameters(importService, stream);
}

@Test(dependsOnMethods = "importHiveDb")
public void exportTableInrementalConnected() throws AtlasBaseException, IOException {
InputStream source = runExportWithParameters(exportService, getExportRequestForHiveTable(QUALIFIED_NAME_TABLE_LINEAGE, EXPORT_INCREMENTAL, 0, true));
Expand All @@ -247,6 +278,25 @@ public void exportTableInrementalConnected() throws AtlasBaseException, IOExcept
verifyExpectedEntities(getFileNames(getZipSourceCopy(source)), GUID_TABLE_CTAS_2);
}

// Verifies incremental export over classification lineage: a first full-history export
// (changeMarker 0) must include the upstream classified entity (FIRSTPARENT), while a second
// export from the next change marker — after the classification is deleted — must not.
// NOTE(review): method name "Inremental" looks like a typo for "Incremental"; renaming is a
// test-name-only change and safe if done consistently.
@Test(dependsOnMethods = "classificationineageDb")
public void exportTableInrementalConnectedClassificationLineage() throws AtlasBaseException, IOException {
// Export everything reachable from the ctaslevel11_1 table since the beginning of time.
InputStream source = runExportWithParameters(exportService, getExportRequestForHiveTable(CTASLEVEL11_1_TABLE_QUALIFIED_NAME, EXPORT_INCREMENTAL, 0, false));
ZipSource sourceCopy = getZipSourceCopy(source);
// Ensure FIRSTPARENT carries the "firstclassi" tag for the remainder of the test.
// NOTE(review): this runs after the export above — presumably the imported zip already
// contains the classification and this only re-adds it if missing; confirm against the fixture.
if (entityStore.getClassification(FIRSTPARENT, "firstclassi") == null) {
entityStore.addClassification(Arrays.asList(FIRSTPARENT), new AtlasClassification("firstclassi", null));
}

// Full-history export: lineage-connected entities including the classified FIRSTPARENT.
verifyExpectedEntities(getFileNames(sourceCopy), HDFS_PATH, HIVEDB, CTLASLEVEL11_1, CTASLEVEL11_1_PROCESS, CTASLEVEL11_PROCESS, CTASLEVEL11, SECONDPARENT_PROCESS,
SECONDPARENT, FIRSTPARENT);

// Advance the change marker so the next export only picks up subsequent changes.
nextTimestamp = updateTimesampForNextIncrementalExport(sourceCopy);

entityStore.deleteClassification(FIRSTPARENT, "firstclassi", FIRSTPARENT);

// Incremental export after the tag removal: FIRSTPARENT must no longer be included.
source = runExportWithParameters(exportService, getExportRequestForHiveTable(CTASLEVEL11_1_TABLE_QUALIFIED_NAME, EXPORT_INCREMENTAL, nextTimestamp, false));
verifyExpectedEntities(getFileNames(getZipSourceCopy(source)), CTLASLEVEL11_1, CTASLEVEL11_1_PROCESS, CTASLEVEL11_PROCESS, CTASLEVEL11, SECONDPARENT);
}

@Test(dependsOnMethods = "importHiveDb")
public void exportTableIncrementalForParentEntity() throws AtlasBaseException, IOException {
InputStream source = runExportWithParameters(exportService, getExportRequestForHiveTable(QUALIFIED_NAME_TABLE_2, EXPORT_INCREMENTAL, 0, false));
Expand Down
Binary file not shown.
Loading