Export: resume connected traversal from the starting entity.

Summary of the change carried by this patch:

  * ExportService.ExportContext gains a public field, startingEntityGuid.
    processObjectId() sets it for each starting entity just before calling
    processEntityGuid(), and ExportContext.clear() resets it to null.
  * The outer loop in processObjectId() now keeps running while either
    guidsToProcess or lineageToProcess is non-empty.
  * processEntityGuid() no longer returns early for an already-processed
    GUID when that GUID is the starting entity and the fetch type is
    CONNECTED, or INCREMENTAL with changeMarker <= 0. In that case it logs
    "Resuming export for {}" and continues.
  * RelationshipAttributesExtractor.connectedFetch() discards the recorded
    traversal direction for the starting entity, so the existing
    "direction == null" branch queues both OUTWARD and INWARD.
  * ExportServiceTest gains testExportConnectedHiveTables, which imports
    ctas_hive_tables.zip and runs three CONNECTED exports of two hive_table
    entities each, asserting which table GUIDs appear in the creation order.

Review notes on this revision of the patch:

  * connectedFetch() now null-guards context.startingEntityGuid before
    calling equals(). The field is public, nullable and reset to null by
    clear(); processEntityGuid() already compares in the null-safe order.
  * The raw List declarations in the new test code are parameterized
    (List<String>, List<AtlasObjectId>).
  * Indentation and blank lines were reconstructed from a whitespace-collapsed
    copy, so apply with whitespace tolerance. Hunk line counts are unchanged.
    The "index" blob ids are carried over from the original patch and do not
    describe the two adjusted files.

diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/ExportService.java b/repository/src/main/java/org/apache/atlas/repository/impexp/ExportService.java
index 8ccc63c1b17..95a8e487b3a 100644
--- a/repository/src/main/java/org/apache/atlas/repository/impexp/ExportService.java
+++ b/repository/src/main/java/org/apache/atlas/repository/impexp/ExportService.java
@@ -248,10 +248,11 @@ private AtlasExportResult.OperationStatus processObjectId(AtlasObjectId item, Ex
                 AtlasVertex vertex = AtlasGraphUtilsV2.findByGuid(guid);
                 String typeName = GraphHelper.getTypeName(vertex);
                 context.startingEntityType = typeName;
+                context.startingEntityGuid = guid;
                 processEntityGuid(guid, context);
             }
 
-            while (!context.guidsToProcess.isEmpty()) {
+            while (!context.guidsToProcess.isEmpty() || !context.lineageToProcess.isEmpty()) {
                 while (!context.guidsToProcess.isEmpty()) {
                     String guid = context.guidsToProcess.remove(0);
 
@@ -288,8 +289,19 @@ private List<String> getStartingEntity(AtlasObjectId item, ExportContext context
     private void processEntityGuid(String guid, ExportContext context) throws AtlasBaseException {
         LOG.debug("==> processEntityGuid({})", guid);
 
+        boolean resumeExportForStartingEntity = false;
+        if ((context.fetchType == ExportFetchType.CONNECTED
+                || (context.fetchType == ExportFetchType.INCREMENTAL && context.changeMarker <= 0))
+                && guid.equals(context.startingEntityGuid)) {
+            resumeExportForStartingEntity = true;
+        }
+
         if (context.guidsProcessed.contains(guid)) {
-            return;
+            if (resumeExportForStartingEntity) {
+                LOG.info("Resuming export for {}", guid);
+            } else {
+                return;
+            }
         }
 
         if (context.fetchType == ExportFetchType.INCREMENTAL && context.startingEntityType.equals(ATLAS_TYPE_HIVE_DB) && !context.skipLineage) {
@@ -472,6 +484,7 @@ static class ExportContext {
         boolean isSkipConnectedFetch;
         private int progressReportCount;
         public String startingEntityType;
+        public String startingEntityGuid;
 
         ExportContext(AtlasExportResult result, ZipSink sink) {
             this.result = result;
@@ -512,6 +525,7 @@ public void clear() {
             guidsProcessed.clear();
             guidDirection.clear();
             startingEntityType = null;
+            startingEntityGuid = null;
         }
 
         public void addToBeProcessed(boolean isSuperTypeProcess, String guid, TraversalDirection direction) {
diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/RelationshipAttributesExtractor.java b/repository/src/main/java/org/apache/atlas/repository/impexp/RelationshipAttributesExtractor.java
index 6f42e1b85f9..064efc80f95 100644
--- a/repository/src/main/java/org/apache/atlas/repository/impexp/RelationshipAttributesExtractor.java
+++ b/repository/src/main/java/org/apache/atlas/repository/impexp/RelationshipAttributesExtractor.java
@@ -53,6 +53,9 @@ public void connectedFetch(AtlasEntity entity, ExportContext context) {
         }
 
         ExportService.TraversalDirection direction = context.guidDirection.get(entity.getGuid());
+        if (context.startingEntityGuid != null && context.startingEntityGuid.equals(entity.getGuid())) {
+            direction = null;
+        }
         if (direction == null || direction == UNKNOWN) {
             addToBeProcessed(entity, context, OUTWARD, INWARD);
 
diff --git a/repository/src/test/java/org/apache/atlas/repository/impexp/ExportServiceTest.java b/repository/src/test/java/org/apache/atlas/repository/impexp/ExportServiceTest.java
index 5890143eff7..66b3bd5194a 100644
--- a/repository/src/test/java/org/apache/atlas/repository/impexp/ExportServiceTest.java
+++ b/repository/src/test/java/org/apache/atlas/repository/impexp/ExportServiceTest.java
@@ -23,6 +23,7 @@
 import org.apache.atlas.exception.AtlasBaseException;
 import org.apache.atlas.model.impexp.AtlasExportRequest;
 import org.apache.atlas.model.impexp.AtlasExportResult;
+import org.apache.atlas.model.impexp.AtlasImportResult;
 import org.apache.atlas.model.instance.AtlasEntity;
 import org.apache.atlas.model.instance.AtlasObjectId;
 import org.apache.atlas.model.instance.EntityMutationResponse;
@@ -37,9 +38,11 @@
 import org.apache.atlas.type.AtlasTypeRegistry;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.testng.ITestContext;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.BeforeTest;
+import org.testng.annotations.DataProvider;
 import org.testng.annotations.Guice;
 import org.testng.annotations.Test;
 
@@ -48,6 +51,7 @@
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
+import java.io.InputStream;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
@@ -76,6 +80,25 @@ public class ExportServiceTest extends AtlasTestBase {
     @Inject
     private AtlasEntityStoreV2 entityStore;
 
+    @Inject
+    private ImportService importService;
+
+    private static final String HIVE_TABLE_TYPE_NAME = "hive_table";
+
+    private static final String TABLE_1_GUID = "9f9e5088-3ace-4dd7-ae3e-41a93875e264";
+    private static final String TABLE_2_GUID = "49dc552e-835a-4b88-b752-220e03c6df36";
+    private static final String TABLE_3_GUID = "8b2f7e53-ac4b-4c6b-8159-8b6b62805d1a";
+    private static final String TABLE_4_GUID = "e5c2edd4-df48-4646-8f22-0fbdce235496";
+    private static final String TABLE_5_GUID = "1f6cb442-e7c4-4521-827a-49d567240e74";
+    private static final String TABLE_6_GUID = "edc9facc-2e76-4dbd-830d-ad7644541451";
+
+    private static final String TABLE_1_QUALIFIED_NAME = "hivedb01.hivetable01@primary";
+    private static final String TABLE_2_QUALIFIED_NAME = "hivedb01.hivetable02@primary";
+    private static final String TABLE_3_QUALIFIED_NAME = "hivedb01.hivetable03@primary";
+    private static final String TABLE_4_QUALIFIED_NAME = "hivedb01.hivetable04@primary";
+    private static final String TABLE_5_QUALIFIED_NAME = "hivedb01.hivetable05@primary";
+    private static final String TABLE_6_QUALIFIED_NAME = "hivedb01.hivetable06@primary";
+
     @BeforeTest
     public void setupTest() throws IOException, AtlasBaseException {
         RequestContext.clear();
@@ -234,6 +257,60 @@ public void verifyTypeFull() throws AtlasBaseException, IOException {
         verifyExportForFullEmployeeData(zipSource);
     }
 
+    @DataProvider(name = "ctashivetables")
+    public static Object[][] importCtasData(ITestContext context) throws IOException, AtlasBaseException {
+        return ZipFileResourceTestUtils.getZipSource("ctas_hive_tables.zip");
+    }
+
+    @Test(dataProvider = "ctashivetables")
+    public void testExportConnectedHiveTables(InputStream inputStream) throws AtlasBaseException, IOException {
+        AtlasImportResult result = ZipFileResourceTestUtils.runImportWithNoParameters(importService, inputStream);
+        assertEquals(result.getOperationStatus(), AtlasImportResult.OperationStatus.SUCCESS);
+
+        AtlasExportRequest request = getRequestForConnected(HIVE_TABLE_TYPE_NAME, TABLE_1_QUALIFIED_NAME, TABLE_2_QUALIFIED_NAME);
+        ZipSource zipSource = runExportWithParameters(request);
+        List<String> guidList = zipSource.getCreationOrder();
+
+        assertTrue(guidList.contains(TABLE_1_GUID));
+        assertTrue(guidList.contains(TABLE_2_GUID));
+        assertTrue(guidList.contains(TABLE_3_GUID));
+        assertFalse(guidList.contains(TABLE_4_GUID));
+        assertFalse(guidList.contains(TABLE_5_GUID));
+        assertFalse(guidList.contains(TABLE_6_GUID));
+
+        request = getRequestForConnected(HIVE_TABLE_TYPE_NAME, TABLE_1_QUALIFIED_NAME, TABLE_6_QUALIFIED_NAME);
+        zipSource = runExportWithParameters(request);
+        guidList = zipSource.getCreationOrder();
+
+        assertTrue(guidList.contains(TABLE_1_GUID));
+        assertTrue(guidList.contains(TABLE_2_GUID));
+        assertFalse(guidList.contains(TABLE_3_GUID));
+        assertFalse(guidList.contains(TABLE_4_GUID));
+        assertTrue(guidList.contains(TABLE_5_GUID));
+        assertTrue(guidList.contains(TABLE_6_GUID));
+
+        request = getRequestForConnected(HIVE_TABLE_TYPE_NAME, TABLE_1_QUALIFIED_NAME, TABLE_4_QUALIFIED_NAME);
+        zipSource = runExportWithParameters(request);
+        guidList = zipSource.getCreationOrder();
+
+        assertTrue(guidList.contains(TABLE_1_GUID));
+        assertTrue(guidList.contains(TABLE_2_GUID));
+        assertTrue(guidList.contains(TABLE_3_GUID));
+        assertTrue(guidList.contains(TABLE_4_GUID));
+        assertTrue(guidList.contains(TABLE_5_GUID));
+        assertFalse(guidList.contains(TABLE_6_GUID));
+    }
+
+    private AtlasExportRequest getRequestForConnected(String typeName, String qualifiedName1, String qualifiedName2) {
+        AtlasExportRequest request = new AtlasExportRequest();
+        List<AtlasObjectId> itemsToExport = new ArrayList<>();
+        itemsToExport.add(new AtlasObjectId(typeName, "qualifiedName", qualifiedName1));
+        itemsToExport.add(new AtlasObjectId(typeName, "qualifiedName", qualifiedName2));
+        request.setItemsToExport(itemsToExport);
+        setOptionsMap(request, true, AtlasExportRequest.FETCH_TYPE_CONNECTED, false, "");
+        return request;
+    }
+
     private AtlasExportRequest getRequestForFullFetch() {
         AtlasExportRequest request = new AtlasExportRequest();
 
diff --git a/repository/src/test/resources/ctas_hive_tables.zip b/repository/src/test/resources/ctas_hive_tables.zip
new file mode 100644
index 00000000000..911a147c7da
Binary files /dev/null and b/repository/src/test/resources/ctas_hive_tables.zip differ