Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 44 additions & 0 deletions javaimpl/src/main/java/boot/FieldDTO.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package boot;

/**
* 封装field相关的数据结构
*/
public class FieldDTO {
private String fieldName;
private String before;
private String after;

public FieldDTO(String fieldName, String before, String after) {
this.fieldName = fieldName;
this.before = before;
this.after = after;
}

public FieldDTO() {

}

public String getFieldName() {
return fieldName;
}

public void setFieldName(String fieldName) {
this.fieldName = fieldName;
}

public String getBefore() {
return before;
}

public void setBefore(String before) {
this.before = before;
}

public String getAfter() {
return after;
}

public void setAfter(String after) {
this.after = after;
}
}
103 changes: 103 additions & 0 deletions javaimpl/src/main/java/boot/FormatRecord.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
package boot;

import com.alibaba.dts.formats.avro.Field;
import com.alibaba.dts.formats.avro.Record;
import common.FieldEntryHolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import recordprocessor.FieldConverter;

import java.util.Iterator;
import java.util.List;

import static common.Util.uncompressionObjectName;

/**
* 格式化成 {@link boot.RecordDTO} 数据对象。
*/
public class FormatRecord {
private static final Logger log = LoggerFactory.getLogger(MysqlRecordPrinter.class);
private static final FieldConverter FIELD_CONVERTER = FieldConverter.getConverter("mysql", null);


public static RecordDTO format(Record record) {
RecordDTO recordDTO = new RecordDTO();
switch (record.getOperation()) {
case DDL: {
appendRecordGeneralInfo(record, recordDTO);
break;
}
default: {
List<Field> fields = (List<Field>) record.getFields();
FieldEntryHolder[] fieldArray = getFieldEntryHolder(record);
appendRecordGeneralInfo(record, recordDTO);
appendFields(fields, fieldArray[0], fieldArray[1], recordDTO);
break;
}
}
return recordDTO;
}


private static FieldEntryHolder[] getFieldEntryHolder(Record record) {
// this is a simple impl, may exist unhandled situation
FieldEntryHolder[] fieldArray = new FieldEntryHolder[2];

fieldArray[0] = new FieldEntryHolder((List<Object>) record.getBeforeImages());
fieldArray[1] = new FieldEntryHolder((List<Object>) record.getAfterImages());

return fieldArray;
}

private static void appendFields(List<Field> fields, FieldEntryHolder before, FieldEntryHolder after, RecordDTO recordDTO) {
if (null != fields) {
Iterator<Field> fieldIterator = fields.iterator();
while (fieldIterator.hasNext() && before.hasNext() && after.hasNext()) {
Field field = fieldIterator.next();
Object toPrintBefore = before.take();
Object toPrintAfter = after.take();
appendField(field, toPrintBefore, toPrintAfter, recordDTO);
}
}
}

private static void appendField(Field field, Object beforeImage, Object afterImage, RecordDTO recordDTO) {
FieldDTO fieldDTO = new FieldDTO();
fieldDTO.setFieldName(field.getName());
if (null != beforeImage) {
fieldDTO.setBefore(FIELD_CONVERTER.convert(field, beforeImage).toString());
}
if (null != afterImage) {
fieldDTO.setAfter(FIELD_CONVERTER.convert(field, afterImage).toString());
}
recordDTO.appendFieldDTO(fieldDTO);
}

private static void appendRecordGeneralInfo(Record record, RecordDTO recordDTO) {
String dbName = null;
String tableName = null;
// here we get db and table name
String[] dbPair = uncompressionObjectName(record.getObjectName());
if (null != dbPair) {
if (dbPair.length == 2) {
dbName = dbPair[0];
tableName = dbPair[1];
} else if (dbPair.length == 3) {
dbName = dbPair[0];
tableName = dbPair[2];
} else if (dbPair.length == 1) {
dbName = dbPair[0];
tableName = "";
} else {
throw new RuntimeException("invalid db and table name pair for record [" + record + "]");
}
}
recordDTO.setRecordId(record.getId());
recordDTO.setDbName(dbName);
recordDTO.setTableName(tableName);
recordDTO.setRecordType(record.getOperation());
recordDTO.setRecordTimestamp(record.getSourceTimestamp());
recordDTO.setTags(record.getTags());
}

}
99 changes: 99 additions & 0 deletions javaimpl/src/main/java/boot/RecordDTO.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
package boot;

import com.alibaba.dts.formats.avro.Operation;
import com.alibaba.dts.formats.avro.Source;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/**
* 封装record数据结构
*/
public class RecordDTO {
private String dbName;
private String tableName;
private Long recordId;
private com.alibaba.dts.formats.avro.Operation recordType;
private java.lang.Long recordTimestamp;
private java.util.Map<java.lang.String, java.lang.String> tags;
private List<FieldDTO> fieldDTOList;

public RecordDTO(String dbName, String tableName, Long recordId, Operation recordType, Long recordTimestamp, Map<String, String> tags, List<FieldDTO> fieldDTOList) {
this.dbName = dbName;
this.tableName = tableName;
this.recordId = recordId;
this.recordType = recordType;
this.recordTimestamp = recordTimestamp;
this.tags = tags;
this.fieldDTOList = fieldDTOList;
}

public RecordDTO() {

}

public String getDbName() {
return dbName;
}

public void setDbName(String dbName) {
this.dbName = dbName;
}

public String getTableName() {
return tableName;
}

public void setTableName(String tableName) {
this.tableName = tableName;
}

public Long getRecordId() {
return recordId;
}

public void setRecordId(Long recordId) {
this.recordId = recordId;
}

public Operation getRecordType() {
return recordType;
}

public void setRecordType(Operation recordType) {
this.recordType = recordType;
}

public Long getRecordTimestamp() {
return recordTimestamp;
}

public void setRecordTimestamp(Long recordTimestamp) {
this.recordTimestamp = recordTimestamp;
}

public Map<String, String> getTags() {
return tags;
}

public void setTags(Map<String, String> tags) {
this.tags = tags;
}

public List<FieldDTO> getFieldDTOList() {
return fieldDTOList;
}

public void setFieldDTOList(List<FieldDTO> fieldDTOList) {
this.fieldDTOList = fieldDTOList;
}

public void appendFieldDTO(FieldDTO field) {
if (this.fieldDTOList == null) {
this.fieldDTOList = new ArrayList<>();
}
this.fieldDTOList.add(field);
}

}