Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -248,10 +248,11 @@ private AtlasExportResult.OperationStatus processObjectId(AtlasObjectId item, Ex
AtlasVertex vertex = AtlasGraphUtilsV2.findByGuid(guid);
String typeName = GraphHelper.getTypeName(vertex);
context.startingEntityType = typeName;
context.startingEntityGuid = guid;
processEntityGuid(guid, context);
}

while (!context.guidsToProcess.isEmpty()) {
while (!context.guidsToProcess.isEmpty() || !context.lineageToProcess.isEmpty()) {
while (!context.guidsToProcess.isEmpty()) {
String guid = context.guidsToProcess.remove(0);

Expand Down Expand Up @@ -288,8 +289,19 @@ private List<String> getStartingEntity(AtlasObjectId item, ExportContext context
private void processEntityGuid(String guid, ExportContext context) throws AtlasBaseException {
LOG.debug("==> processEntityGuid({})", guid);

boolean resumeExportForStartingEntity = false;
if ((context.fetchType == ExportFetchType.CONNECTED
|| (context.fetchType == ExportFetchType.INCREMENTAL && context.changeMarker <= 0))
&& guid.equals(context.startingEntityGuid)) {
resumeExportForStartingEntity = true;
}

if (context.guidsProcessed.contains(guid)) {
return;
if (resumeExportForStartingEntity) {
LOG.info("Resuming export for {}", guid);
} else {
return;
}
}

if (context.fetchType == ExportFetchType.INCREMENTAL && context.startingEntityType.equals(ATLAS_TYPE_HIVE_DB) && !context.skipLineage) {
Expand Down Expand Up @@ -472,6 +484,7 @@ static class ExportContext {
boolean isSkipConnectedFetch;
private int progressReportCount;
public String startingEntityType;
public String startingEntityGuid;

ExportContext(AtlasExportResult result, ZipSink sink) {
this.result = result;
Expand Down Expand Up @@ -512,6 +525,7 @@ public void clear() {
guidsProcessed.clear();
guidDirection.clear();
startingEntityType = null;
startingEntityGuid = null;
}

public void addToBeProcessed(boolean isSuperTypeProcess, String guid, TraversalDirection direction) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@ public void connectedFetch(AtlasEntity entity, ExportContext context) {
}

ExportService.TraversalDirection direction = context.guidDirection.get(entity.getGuid());
if (context.startingEntityGuid.equals(entity.getGuid())) {
direction = null;
}

if (direction == null || direction == UNKNOWN) {
addToBeProcessed(entity, context, OUTWARD, INWARD);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.impexp.AtlasExportRequest;
import org.apache.atlas.model.impexp.AtlasExportResult;
import org.apache.atlas.model.impexp.AtlasImportResult;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasObjectId;
import org.apache.atlas.model.instance.EntityMutationResponse;
Expand All @@ -37,9 +38,11 @@
import org.apache.atlas.type.AtlasTypeRegistry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.ITestContext;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeTest;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Guice;
import org.testng.annotations.Test;

Expand All @@ -48,6 +51,7 @@
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
Expand Down Expand Up @@ -76,6 +80,25 @@ public class ExportServiceTest extends AtlasTestBase {
@Inject
private AtlasEntityStoreV2 entityStore;

@Inject
private ImportService importService;

private static final String HIVE_TABLE_TYPE_NAME = "hive_table";

private static final String TABLE_1_GUID = "9f9e5088-3ace-4dd7-ae3e-41a93875e264";
private static final String TABLE_2_GUID = "49dc552e-835a-4b88-b752-220e03c6df36";
private static final String TABLE_3_GUID = "8b2f7e53-ac4b-4c6b-8159-8b6b62805d1a";
private static final String TABLE_4_GUID = "e5c2edd4-df48-4646-8f22-0fbdce235496";
private static final String TABLE_5_GUID = "1f6cb442-e7c4-4521-827a-49d567240e74";
private static final String TABLE_6_GUID = "edc9facc-2e76-4dbd-830d-ad7644541451";

private static final String TABLE_1_QUALIFIED_NAME = "hivedb01.hivetable01@primary";
private static final String TABLE_2_QUALIFIED_NAME = "hivedb01.hivetable02@primary";
private static final String TABLE_3_QUALIFIED_NAME = "hivedb01.hivetable03@primary";
private static final String TABLE_4_QUALIFIED_NAME = "hivedb01.hivetable04@primary";
private static final String TABLE_5_QUALIFIED_NAME = "hivedb01.hivetable05@primary";
private static final String TABLE_6_QUALIFIED_NAME = "hivedb01.hivetable06@primary";

@BeforeTest
public void setupTest() throws IOException, AtlasBaseException {
RequestContext.clear();
Expand Down Expand Up @@ -234,6 +257,60 @@ public void verifyTypeFull() throws AtlasBaseException, IOException {
verifyExportForFullEmployeeData(zipSource);
}

/**
 * TestNG data provider: supplies the {@code ctas_hive_tables.zip} test resource as
 * zip-source data for {@code testExportConnectedHiveTables}.
 * Presumably the archive contains the six CTAS-related hive tables referenced by the
 * TABLE_*_GUID constants — TODO confirm against the fixture.
 *
 * @param context TestNG context (required by the data-provider signature; unused here)
 */
@DataProvider(name = "ctashivetables")
public static Object[][] importCtasData(ITestContext context) throws IOException, AtlasBaseException {
return ZipFileResourceTestUtils.getZipSource("ctas_hive_tables.zip");
}

/**
 * Verifies CONNECTED-fetch export over lineage-related hive tables: after importing the
 * fixture, exporting two tables must include exactly the tables on the lineage path
 * between them and exclude the rest.
 *
 * @param inputStream zip archive of the fixture entities, supplied by the data provider
 */
@Test(dataProvider = "ctashivetables")
public void testExportConnectedHiveTables(InputStream inputStream) throws AtlasBaseException, IOException {
    // Seed the graph with the fixture entities before exercising export.
    AtlasImportResult result = ZipFileResourceTestUtils.runImportWithNoParameters(importService, inputStream);
    assertEquals(result.getOperationStatus(), AtlasImportResult.OperationStatus.SUCCESS);

    // table1 + table2: path pulls in table3 only.
    runConnectedExportAndVerify(TABLE_1_QUALIFIED_NAME, TABLE_2_QUALIFIED_NAME,
            Arrays.asList(TABLE_1_GUID, TABLE_2_GUID, TABLE_3_GUID),
            Arrays.asList(TABLE_4_GUID, TABLE_5_GUID, TABLE_6_GUID));

    // table1 + table6: path pulls in table2 and table5, not table3/table4.
    runConnectedExportAndVerify(TABLE_1_QUALIFIED_NAME, TABLE_6_QUALIFIED_NAME,
            Arrays.asList(TABLE_1_GUID, TABLE_2_GUID, TABLE_5_GUID, TABLE_6_GUID),
            Arrays.asList(TABLE_3_GUID, TABLE_4_GUID));

    // table1 + table4: path pulls in table2, table3 and table5, not table6.
    runConnectedExportAndVerify(TABLE_1_QUALIFIED_NAME, TABLE_4_QUALIFIED_NAME,
            Arrays.asList(TABLE_1_GUID, TABLE_2_GUID, TABLE_3_GUID, TABLE_4_GUID, TABLE_5_GUID),
            Arrays.asList(TABLE_6_GUID));
}

/**
 * Runs a CONNECTED export for the two given hive tables and asserts which entity GUIDs
 * are (and are not) present in the resulting creation order.
 *
 * @param qualifiedName1  qualifiedName of the first starting entity
 * @param qualifiedName2  qualifiedName of the second starting entity
 * @param expectedGuids   GUIDs that must appear in the exported creation order
 * @param unexpectedGuids GUIDs that must NOT appear in the exported creation order
 */
private void runConnectedExportAndVerify(String qualifiedName1, String qualifiedName2,
        List<String> expectedGuids, List<String> unexpectedGuids) throws AtlasBaseException, IOException {
    AtlasExportRequest request = getRequestForConnected(HIVE_TABLE_TYPE_NAME, qualifiedName1, qualifiedName2);
    ZipSource zipSource = runExportWithParameters(request);
    List<String> guidList = zipSource.getCreationOrder();

    for (String guid : expectedGuids) {
        assertTrue(guidList.contains(guid), "expected guid missing from export: " + guid);
    }
    for (String guid : unexpectedGuids) {
        assertFalse(guidList.contains(guid), "unexpected guid present in export: " + guid);
    }
}

/**
 * Builds an export request covering the two entities of the given type identified by
 * their qualifiedName attributes, configured for a CONNECTED fetch via
 * {@code setOptionsMap}.
 *
 * @param typeName       entity type of both items to export
 * @param qualifiedName1 qualifiedName of the first item
 * @param qualifiedName2 qualifiedName of the second item
 * @return the populated {@link AtlasExportRequest}
 */
private AtlasExportRequest getRequestForConnected(String typeName, String qualifiedName1, String qualifiedName2) {
    AtlasExportRequest request = new AtlasExportRequest();

    // Mutable copy, matching the ArrayList the original callers received.
    request.setItemsToExport(new ArrayList<>(Arrays.asList(
            new AtlasObjectId(typeName, "qualifiedName", qualifiedName1),
            new AtlasObjectId(typeName, "qualifiedName", qualifiedName2))));

    setOptionsMap(request, true, AtlasExportRequest.FETCH_TYPE_CONNECTED, false, "");
    return request;
}

private AtlasExportRequest getRequestForFullFetch() {
AtlasExportRequest request = new AtlasExportRequest();

Expand Down
Binary file not shown.
Loading