From 8684ca298eb1033b32d2b5eed3cd4ad0edf57e37 Mon Sep 17 00:00:00 2001 From: "victor.quan" Date: Fri, 16 Oct 2020 15:32:03 +0800 Subject: [PATCH] =?UTF-8?q?=E5=B0=81=E8=A3=85=E7=BB=9F=E4=B8=80=E8=BE=93?= =?UTF-8?q?=E5=87=BA=E7=9A=84field=E3=80=81record=E7=9A=84DTO=EF=BC=8C?= =?UTF-8?q?=E6=96=B9=E4=BE=BF=E4=BD=BF=E7=94=A8=E6=96=B9=E7=9B=B4=E6=8E=A5?= =?UTF-8?q?=E4=BD=BF=E7=94=A8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- javaimpl/src/main/java/boot/FieldDTO.java | 44 ++++++++ javaimpl/src/main/java/boot/FormatRecord.java | 103 ++++++++++++++++++ javaimpl/src/main/java/boot/RecordDTO.java | 99 +++++++++++++++++ 3 files changed, 246 insertions(+) create mode 100644 javaimpl/src/main/java/boot/FieldDTO.java create mode 100644 javaimpl/src/main/java/boot/FormatRecord.java create mode 100644 javaimpl/src/main/java/boot/RecordDTO.java diff --git a/javaimpl/src/main/java/boot/FieldDTO.java b/javaimpl/src/main/java/boot/FieldDTO.java new file mode 100644 index 0000000..44b1531 --- /dev/null +++ b/javaimpl/src/main/java/boot/FieldDTO.java @@ -0,0 +1,44 @@ +package boot; + +/** + * 封装field相关的数据结构 + */ +public class FieldDTO { + private String fieldName; + private String before; + private String after; + + public FieldDTO(String fieldName, String before, String after) { + this.fieldName = fieldName; + this.before = before; + this.after = after; + } + + public FieldDTO() { + + } + + public String getFieldName() { + return fieldName; + } + + public void setFieldName(String fieldName) { + this.fieldName = fieldName; + } + + public String getBefore() { + return before; + } + + public void setBefore(String before) { + this.before = before; + } + + public String getAfter() { + return after; + } + + public void setAfter(String after) { + this.after = after; + } +} diff --git a/javaimpl/src/main/java/boot/FormatRecord.java b/javaimpl/src/main/java/boot/FormatRecord.java new file mode 100644 index 0000000..0fdebb7 --- /dev/null +++ b/javaimpl/src/main/java/boot/FormatRecord.java @@ -0,0 +1,103 @@ +package boot; + +import com.alibaba.dts.formats.avro.Field; +import com.alibaba.dts.formats.avro.Record; +import common.FieldEntryHolder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import recordprocessor.FieldConverter; + +import java.util.Iterator; +import java.util.List; + +import static common.Util.uncompressionObjectName; + +/** + * 格式化成 {@link boot.RecordDTO} 数据对象。 + */ +public class FormatRecord { + private static final Logger log = LoggerFactory.getLogger(MysqlRecordPrinter.class); + private static final FieldConverter FIELD_CONVERTER = FieldConverter.getConverter("mysql", null); + + + public static RecordDTO format(Record record) { + RecordDTO recordDTO = new RecordDTO(); + switch (record.getOperation()) { + case DDL: { + appendRecordGeneralInfo(record, recordDTO); + break; + } + default: { + List fields = (List) record.getFields(); + FieldEntryHolder[] fieldArray = getFieldEntryHolder(record); + appendRecordGeneralInfo(record, recordDTO); + appendFields(fields, fieldArray[0], fieldArray[1], recordDTO); + break; + } + } + return recordDTO; + } + + + private static FieldEntryHolder[] getFieldEntryHolder(Record record) { + // this is a simple impl, may exist unhandled situation + FieldEntryHolder[] fieldArray = new FieldEntryHolder[2]; + + fieldArray[0] = new FieldEntryHolder((List) record.getBeforeImages()); + fieldArray[1] = new FieldEntryHolder((List) record.getAfterImages()); + + return fieldArray; + } + + private static void appendFields(List fields, FieldEntryHolder before, FieldEntryHolder after, RecordDTO recordDTO) { + if (null != fields) { + Iterator fieldIterator = fields.iterator(); + while (fieldIterator.hasNext() && before.hasNext() && after.hasNext()) { + Field field = fieldIterator.next(); + Object toPrintBefore = before.take(); + Object toPrintAfter = after.take(); + appendField(field, toPrintBefore, toPrintAfter, recordDTO); + } + } + } + + private static void appendField(Field field, Object beforeImage, Object afterImage, RecordDTO recordDTO) { + FieldDTO fieldDTO = new FieldDTO(); + fieldDTO.setFieldName(field.getName()); + if (null != beforeImage) { + fieldDTO.setBefore(FIELD_CONVERTER.convert(field, beforeImage).toString()); + } + if (null != afterImage) { + fieldDTO.setAfter(FIELD_CONVERTER.convert(field, afterImage).toString()); + } + recordDTO.appendFieldDTO(fieldDTO); + } + + private static void appendRecordGeneralInfo(Record record, RecordDTO recordDTO) { + String dbName = null; + String tableName = null; + // here we get db and table name + String[] dbPair = uncompressionObjectName(record.getObjectName()); + if (null != dbPair) { + if (dbPair.length == 2) { + dbName = dbPair[0]; + tableName = dbPair[1]; + } else if (dbPair.length == 3) { + dbName = dbPair[0]; + tableName = dbPair[2]; + } else if (dbPair.length == 1) { + dbName = dbPair[0]; + tableName = ""; + } else { + throw new RuntimeException("invalid db and table name pair for record [" + record + "]"); + } + } + recordDTO.setRecordId(record.getId()); + recordDTO.setDbName(dbName); + recordDTO.setTableName(tableName); + recordDTO.setRecordType(record.getOperation()); + recordDTO.setRecordTimestamp(record.getSourceTimestamp()); + recordDTO.setTags(record.getTags()); + } + +} diff --git a/javaimpl/src/main/java/boot/RecordDTO.java b/javaimpl/src/main/java/boot/RecordDTO.java new file mode 100644 index 0000000..cc7d3ff --- /dev/null +++ b/javaimpl/src/main/java/boot/RecordDTO.java @@ -0,0 +1,99 @@ +package boot; + +import com.alibaba.dts.formats.avro.Operation; +import com.alibaba.dts.formats.avro.Source; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +/** + * 封装record数据结构 + */ +public class RecordDTO { + private String dbName; + private String tableName; + private Long recordId; + private com.alibaba.dts.formats.avro.Operation recordType; + private java.lang.Long recordTimestamp; + private java.util.Map tags; + private List fieldDTOList; + + public RecordDTO(String dbName, String tableName, Long recordId, Operation recordType, Long recordTimestamp, Map tags, List fieldDTOList) { + this.dbName = dbName; + this.tableName = tableName; + this.recordId = recordId; + this.recordType = recordType; + this.recordTimestamp = recordTimestamp; + this.tags = tags; + this.fieldDTOList = fieldDTOList; + } + + public RecordDTO() { + + } + + public String getDbName() { + return dbName; + } + + public void setDbName(String dbName) { + this.dbName = dbName; + } + + public String getTableName() { + return tableName; + } + + public void setTableName(String tableName) { + this.tableName = tableName; + } + + public Long getRecordId() { + return recordId; + } + + public void setRecordId(Long recordId) { + this.recordId = recordId; + } + + public Operation getRecordType() { + return recordType; + } + + public void setRecordType(Operation recordType) { + this.recordType = recordType; + } + + public Long getRecordTimestamp() { + return recordTimestamp; + } + + public void setRecordTimestamp(Long recordTimestamp) { + this.recordTimestamp = recordTimestamp; + } + + public Map getTags() { + return tags; + } + + public void setTags(Map tags) { + this.tags = tags; + } + + public List getFieldDTOList() { + return fieldDTOList; + } + + public void setFieldDTOList(List fieldDTOList) { + this.fieldDTOList = fieldDTOList; + } + + public void appendFieldDTO(FieldDTO field) { + if (this.fieldDTOList == null) { + this.fieldDTOList = new ArrayList<>(); + } + this.fieldDTOList.add(field); + } + +}