diff --git a/pkg/sink/codec/debezium/codec.go b/pkg/sink/codec/debezium/codec.go index ebf39f121..889a4fae3 100644 --- a/pkg/sink/codec/debezium/codec.go +++ b/pkg/sink/codec/debezium/codec.go @@ -15,6 +15,7 @@ package debezium import ( "bytes" + "encoding/hex" "fmt" "io" "strconv" @@ -47,6 +48,7 @@ func (c *dbzCodec) writeDebeziumFieldValues( row *chunk.Row, tableInfo *commonType.TableInfo, columnSelector commonEvent.Selector, + commitTs uint64, ) error { var err error writer.WriteObjectField(fieldName, func() { @@ -56,7 +58,35 @@ func (c *dbzCodec) writeDebeziumFieldValues( } err = c.writeDebeziumFieldValue(writer, row, i, colInfo) if err != nil { - log.Error("write Debezium field value meet error", zap.Error(err)) + // Get value for logging with panic recovery and truncation + ft := &colInfo.FieldType + datum := row.GetDatum(i, ft) + valueStr := func() (s string) { + defer func() { + if r := recover(); r != nil { + s = fmt.Sprintf("", r) + } + }() + // Use hex encoding for binary types for better readability + switch ft.GetType() { + case mysql.TypeTinyBlob, mysql.TypeMediumBlob, mysql.TypeLongBlob, mysql.TypeBlob, mysql.TypeVarString, mysql.TypeString: + if mysql.HasBinaryFlag(ft.GetFlag()) { + return hex.EncodeToString(datum.GetBytes()) + } + } + return datum.String() + }() + const maxValueLen = 1024 + if len(valueStr) > maxValueLen { + valueStr = valueStr[:maxValueLen] + "...(truncated)" + } + log.Error("failed to write Debezium field value", + zap.String("schema", tableInfo.GetSchemaName()), + zap.String("table", tableInfo.GetTableName()), + zap.String("column", colInfo.Name.O), + zap.String("value", valueStr), + zap.Uint64("commitTs", commitTs), + zap.Error(err)) break } } @@ -928,18 +958,18 @@ func (c *dbzCodec) EncodeValue( // after: An optional field that specifies the state of the row after the event occurred. // Optional field that specifies the state of the row after the event occurred. // In a delete event value, the after field is null, signifying that the row no longer exists. - err = c.writeDebeziumFieldValues(jWriter, "after", e.GetRows(), e.TableInfo, e.ColumnSelector) + err = c.writeDebeziumFieldValues(jWriter, "after", e.GetRows(), e.TableInfo, e.ColumnSelector, e.CommitTs) } else if e.IsDelete() { jWriter.WriteStringField("op", "d") jWriter.WriteNullField("after") - err = c.writeDebeziumFieldValues(jWriter, "before", e.GetPreRows(), e.TableInfo, e.ColumnSelector) + err = c.writeDebeziumFieldValues(jWriter, "before", e.GetPreRows(), e.TableInfo, e.ColumnSelector, e.CommitTs) } else if e.IsUpdate() { jWriter.WriteStringField("op", "u") if c.config.DebeziumOutputOldValue { - err = c.writeDebeziumFieldValues(jWriter, "before", e.GetPreRows(), e.TableInfo, e.ColumnSelector) + err = c.writeDebeziumFieldValues(jWriter, "before", e.GetPreRows(), e.TableInfo, e.ColumnSelector, e.CommitTs) } if err == nil { - err = c.writeDebeziumFieldValues(jWriter, "after", e.GetRows(), e.TableInfo, e.ColumnSelector) + err = c.writeDebeziumFieldValues(jWriter, "after", e.GetRows(), e.TableInfo, e.ColumnSelector, e.CommitTs) } } })