Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.xtable.service;

import static org.apache.xtable.conversion.ConversionUtils.convertToSourceTable;
import static org.apache.xtable.hudi.HudiSourceConfig.PARTITION_FIELD_SPEC_CONFIG;
import static org.apache.xtable.model.storage.TableFormat.DELTA;
import static org.apache.xtable.model.storage.TableFormat.HUDI;
import static org.apache.xtable.model.storage.TableFormat.ICEBERG;
Expand All @@ -27,6 +28,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import lombok.extern.log4j.Log4j2;

Expand Down Expand Up @@ -186,20 +188,34 @@ public ConversionService(
* @return a ConvertTableResponse containing details of the converted target tables
*/
public ConvertTableResponse convertTable(ConvertTableRequest convertTableRequest) {

Properties sourceProperties = new Properties();
if (convertTableRequest.getConfigurations() != null) {
String partitionSpec =
convertTableRequest.getConfigurations().getOrDefault("partition-spec", null);
if (partitionSpec != null) {
sourceProperties.put(PARTITION_FIELD_SPEC_CONFIG, partitionSpec);
}
}

SourceTable sourceTable =
SourceTable.builder()
.name(convertTableRequest.getSourceTableName())
.basePath(convertTableRequest.getSourceTablePath())
.dataPath(convertTableRequest.getSourceDataPath())
.formatName(convertTableRequest.getSourceFormat())
.additionalProperties(sourceProperties)
.build();

List<TargetTable> targetTables = new ArrayList<>();
for (String targetFormat : convertTableRequest.getTargetFormats()) {
TargetTable targetTable =
TargetTable.builder()
.name(convertTableRequest.getSourceTableName())
.basePath(convertTableRequest.getSourceTablePath())
// set the metadata path to the data path as the default (required by Hudi)
.basePath(convertTableRequest.getSourceDataPath())
.formatName(targetFormat)
.additionalProperties(sourceProperties)
.build();
targetTables.add(targetTable);
}
Expand All @@ -220,7 +236,7 @@ public ConvertTableResponse convertTable(ConvertTableRequest convertTableRequest
String schemaString = extractSchemaString(targetTable, internalTable);
convertedTables.add(
ConvertedTable.builder()
.targetFormat(internalTable.getName())
.targetFormat(internalTable.getTableFormat())
.targetSchema(schemaString)
.targetMetadataPath(internalTable.getLatestMetdataPath())
.build());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ public class ConvertTableRequest {
@JsonProperty("source-table-path")
private String sourceTablePath;

@JsonProperty("source-data-path")
private String sourceDataPath;

@JsonProperty("target-formats")
private List<String> targetFormats;

Expand All @@ -52,12 +55,14 @@ public ConvertTableRequest(
@JsonProperty("source-format") String sourceFormat,
@JsonProperty("source-table-name") String sourceTableName,
@JsonProperty("source-table-path") String sourceTablePath,
@JsonProperty("source-data-path") String sourceDataPath,
@JsonProperty("target-format") List<String> targetFormat,
@JsonProperty("configurations") Map<String, String> configurations) {

this.sourceFormat = sourceFormat;
this.sourceTableName = sourceTableName;
this.sourceTablePath = sourceTablePath;
this.sourceDataPath = sourceDataPath;
this.targetFormats = targetFormat;
this.configurations = configurations;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
class TestConversionService {
private static final String SOURCE_NAME = "users";
private static final String SOURCE_PATH = "s3://bucket/tables/users";
private static final String SOURCE_DATA_PATH = "s3://bucket/tables/users/data";
private static final String HUDI_META_PATH = "s3://bucket/tables/users/.hoodie";
private static final String ICEBERG_META_PATH =
"s3://bucket/tables/users/metadata/v1.metadata.json";
Expand Down Expand Up @@ -111,6 +112,7 @@ void convertToTargetHudi() {
.sourceFormat(TableFormat.DELTA)
.sourceTableName(SOURCE_NAME)
.sourceTablePath(SOURCE_PATH)
.sourceDataPath(SOURCE_DATA_PATH)
.targetFormats(Collections.singletonList(TableFormat.HUDI))
.build();

Expand All @@ -120,7 +122,7 @@ void convertToTargetHudi() {
when(provider.getConversionSourceInstance(any())).thenReturn(conversionSrc);
when(conversionSrc.getCurrentTable()).thenReturn(internalTbl);

when(internalTbl.getName()).thenReturn(TableFormat.HUDI);
when(internalTbl.getTableFormat()).thenReturn(TableFormat.HUDI);
when(internalTbl.getLatestMetdataPath()).thenReturn(HUDI_META_PATH);
when(internalTbl.getReadSchema()).thenReturn(internalSchema);

Expand All @@ -146,6 +148,7 @@ void convertToTargetIceberg() {
.sourceFormat(TableFormat.DELTA)
.sourceTableName(SOURCE_NAME)
.sourceTablePath(SOURCE_PATH)
.sourceDataPath(SOURCE_DATA_PATH)
.targetFormats(Collections.singletonList(TableFormat.ICEBERG))
.build();

Expand All @@ -157,7 +160,7 @@ void convertToTargetIceberg() {
when(provider.getConversionSourceInstance(any())).thenReturn(conversionSrc);
when(conversionSrc.getCurrentTable()).thenReturn(internalTbl);

when(internalTbl.getName()).thenReturn(TableFormat.ICEBERG);
when(internalTbl.getTableFormat()).thenReturn(TableFormat.ICEBERG);
when(internalTbl.getLatestMetdataPath()).thenReturn(ICEBERG_META_PATH);
when(internalTbl.getReadSchema()).thenReturn(internalSchema);

Expand Down Expand Up @@ -185,6 +188,7 @@ void convertToTargetDelta() {
.sourceFormat(TableFormat.ICEBERG)
.sourceTableName(SOURCE_NAME)
.sourceTablePath(SOURCE_PATH)
.sourceDataPath(SOURCE_DATA_PATH)
.targetFormats(Collections.singletonList(TableFormat.DELTA))
.build();

Expand All @@ -194,7 +198,7 @@ void convertToTargetDelta() {
when(provider.getConversionSourceInstance(any())).thenReturn(conversionSrc);
when(conversionSrc.getCurrentTable()).thenReturn(internalTbl);

when(internalTbl.getName()).thenReturn(TableFormat.DELTA);
when(internalTbl.getTableFormat()).thenReturn(TableFormat.DELTA);
when(internalTbl.getLatestMetdataPath()).thenReturn(DELTA_META_PATH);
when(internalTbl.getReadSchema()).thenReturn(internalSchema);

Expand Down