Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 35 additions & 5 deletions pkg/sink/codec/debezium/codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package debezium

import (
"bytes"
"encoding/hex"
"fmt"
"io"
"strconv"
Expand Down Expand Up @@ -47,6 +48,7 @@ func (c *dbzCodec) writeDebeziumFieldValues(
row *chunk.Row,
tableInfo *commonType.TableInfo,
columnSelector commonEvent.Selector,
commitTs uint64,
) error {
var err error
writer.WriteObjectField(fieldName, func() {
Expand All @@ -56,7 +58,35 @@ func (c *dbzCodec) writeDebeziumFieldValues(
}
err = c.writeDebeziumFieldValue(writer, row, i, colInfo)
if err != nil {
log.Error("write Debezium field value meet error", zap.Error(err))
// Get value for logging with panic recovery and truncation
ft := &colInfo.FieldType
datum := row.GetDatum(i, ft)
valueStr := func() (s string) {
defer func() {
if r := recover(); r != nil {
s = fmt.Sprintf("<panic: %v>", r)
}
}()
// Use hex encoding for binary types for better readability
switch ft.GetType() {
case mysql.TypeTinyBlob, mysql.TypeMediumBlob, mysql.TypeLongBlob, mysql.TypeBlob, mysql.TypeVarString, mysql.TypeString:
if mysql.HasBinaryFlag(ft.GetFlag()) {
return hex.EncodeToString(datum.GetBytes())
}
}
return datum.String()
}()
const maxValueLen = 1024
if len(valueStr) > maxValueLen {
valueStr = valueStr[:maxValueLen] + "...(truncated)"
}
log.Error("failed to write Debezium field value",
zap.String("schema", tableInfo.GetSchemaName()),
zap.String("table", tableInfo.GetTableName()),
zap.String("column", colInfo.Name.O),
zap.String("value", valueStr),
zap.Uint64("commitTs", commitTs),
zap.Error(err))
break
}
}
Expand Down Expand Up @@ -928,18 +958,18 @@ func (c *dbzCodec) EncodeValue(
// after: An optional field that specifies the state of the row after the event occurred.
// Optional field that specifies the state of the row after the event occurred.
// In a delete event value, the after field is null, signifying that the row no longer exists.
err = c.writeDebeziumFieldValues(jWriter, "after", e.GetRows(), e.TableInfo, e.ColumnSelector)
err = c.writeDebeziumFieldValues(jWriter, "after", e.GetRows(), e.TableInfo, e.ColumnSelector, e.CommitTs)
} else if e.IsDelete() {
jWriter.WriteStringField("op", "d")
jWriter.WriteNullField("after")
err = c.writeDebeziumFieldValues(jWriter, "before", e.GetPreRows(), e.TableInfo, e.ColumnSelector)
err = c.writeDebeziumFieldValues(jWriter, "before", e.GetPreRows(), e.TableInfo, e.ColumnSelector, e.CommitTs)
} else if e.IsUpdate() {
jWriter.WriteStringField("op", "u")
if c.config.DebeziumOutputOldValue {
err = c.writeDebeziumFieldValues(jWriter, "before", e.GetPreRows(), e.TableInfo, e.ColumnSelector)
err = c.writeDebeziumFieldValues(jWriter, "before", e.GetPreRows(), e.TableInfo, e.ColumnSelector, e.CommitTs)
}
if err == nil {
err = c.writeDebeziumFieldValues(jWriter, "after", e.GetRows(), e.TableInfo, e.ColumnSelector)
err = c.writeDebeziumFieldValues(jWriter, "after", e.GetRows(), e.TableInfo, e.ColumnSelector, e.CommitTs)
}
}
})
Expand Down