From 260de8538b147aa423e3959574c55014ccd2f346 Mon Sep 17 00:00:00 2001 From: takaidohigasi Date: Thu, 8 Jan 2026 22:53:01 +0900 Subject: [PATCH] codec(ticdc): improve error logging for Debezium encoding failures MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add detailed error logging when Debezium field value encoding fails. The error log now includes: - Schema name - Table name - Column name - Value (truncated to 1024 chars if too large, hex-encoded for binary types) - Commit timestamp This helps diagnose encoding issues like enum columns with invalid string values (e.g., issue #12474 in tiflow). Additionally, added panic recovery and hex encoding for binary types to ensure safe and readable error logging. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 --- pkg/sink/codec/debezium/codec.go | 40 ++++++++++++++++++++++++++++---- 1 file changed, 35 insertions(+), 5 deletions(-) diff --git a/pkg/sink/codec/debezium/codec.go b/pkg/sink/codec/debezium/codec.go index ebf39f1214..889a4fae31 100644 --- a/pkg/sink/codec/debezium/codec.go +++ b/pkg/sink/codec/debezium/codec.go @@ -15,6 +15,7 @@ package debezium import ( "bytes" + "encoding/hex" "fmt" "io" "strconv" @@ -47,6 +48,7 @@ func (c *dbzCodec) writeDebeziumFieldValues( row *chunk.Row, tableInfo *commonType.TableInfo, columnSelector commonEvent.Selector, + commitTs uint64, ) error { var err error writer.WriteObjectField(fieldName, func() { @@ -56,7 +58,35 @@ func (c *dbzCodec) writeDebeziumFieldValues( } err = c.writeDebeziumFieldValue(writer, row, i, colInfo) if err != nil { - log.Error("write Debezium field value meet error", zap.Error(err)) + // Get value for logging with panic recovery and truncation + ft := &colInfo.FieldType + datum := row.GetDatum(i, ft) + valueStr := func() (s string) { + defer func() { + if r := recover(); r != nil { + s = fmt.Sprintf("", r) + } + }() + // Use hex encoding for binary types for better readability + switch ft.GetType() { + case mysql.TypeTinyBlob, mysql.TypeMediumBlob, mysql.TypeLongBlob, mysql.TypeBlob, mysql.TypeVarString, mysql.TypeString: + if mysql.HasBinaryFlag(ft.GetFlag()) { + return hex.EncodeToString(datum.GetBytes()) + } + } + return datum.String() + }() + const maxValueLen = 1024 + if len(valueStr) > maxValueLen { + valueStr = valueStr[:maxValueLen] + "...(truncated)" + } + log.Error("failed to write Debezium field value", + zap.String("schema", tableInfo.GetSchemaName()), + zap.String("table", tableInfo.GetTableName()), + zap.String("column", colInfo.Name.O), + zap.String("value", valueStr), + zap.Uint64("commitTs", commitTs), + zap.Error(err)) break } } @@ -928,18 +958,18 @@ func (c *dbzCodec) EncodeValue( // after: An optional field that specifies the state of the row after the event occurred. // Optional field that specifies the state of the row after the event occurred. // In a delete event value, the after field is null, signifying that the row no longer exists. - err = c.writeDebeziumFieldValues(jWriter, "after", e.GetRows(), e.TableInfo, e.ColumnSelector) + err = c.writeDebeziumFieldValues(jWriter, "after", e.GetRows(), e.TableInfo, e.ColumnSelector, e.CommitTs) } else if e.IsDelete() { jWriter.WriteStringField("op", "d") jWriter.WriteNullField("after") - err = c.writeDebeziumFieldValues(jWriter, "before", e.GetPreRows(), e.TableInfo, e.ColumnSelector) + err = c.writeDebeziumFieldValues(jWriter, "before", e.GetPreRows(), e.TableInfo, e.ColumnSelector, e.CommitTs) } else if e.IsUpdate() { jWriter.WriteStringField("op", "u") if c.config.DebeziumOutputOldValue { - err = c.writeDebeziumFieldValues(jWriter, "before", e.GetPreRows(), e.TableInfo, e.ColumnSelector) + err = c.writeDebeziumFieldValues(jWriter, "before", e.GetPreRows(), e.TableInfo, e.ColumnSelector, e.CommitTs) } if err == nil { - err = c.writeDebeziumFieldValues(jWriter, "after", e.GetRows(), e.TableInfo, e.ColumnSelector) + err = c.writeDebeziumFieldValues(jWriter, "after", e.GetRows(), e.TableInfo, e.ColumnSelector, e.CommitTs) } } })