diff --git a/api/v2/changefeed.go b/api/v2/changefeed.go index 954ee75082..2455913a73 100644 --- a/api/v2/changefeed.go +++ b/api/v2/changefeed.go @@ -542,7 +542,7 @@ func (h *OpenAPIV2) GetChangeFeed(c *gin.Context) { taskStatus := make([]config.CaptureTaskStatus, 0) detail := CfInfoToAPIModel(cfInfo, status, taskStatus) - c.JSON(http.StatusOK, detail) + respondWithFormat(c, http.StatusOK, detail) } func shouldShowRunningError(state config.FeedState) bool { diff --git a/api/v2/changefeed_toml_test.go b/api/v2/changefeed_toml_test.go new file mode 100644 index 0000000000..4a8539ef4a --- /dev/null +++ b/api/v2/changefeed_toml_test.go @@ -0,0 +1,197 @@ +// Copyright 2026 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package v2 + +import ( + "bytes" + "net/http/httptest" + "testing" + "time" + + "github.com/BurntSushi/toml" + "github.com/gin-gonic/gin" + "github.com/pingcap/ticdc/pkg/config" + "github.com/pingcap/ticdc/pkg/util" + "github.com/stretchr/testify/require" +) + +// TestJSONDurationTextRoundTrip verifies JSONDuration serializes to and parses +// from a human-readable string, which is what makes TOML output round-trippable. +func TestJSONDurationTextRoundTrip(t *testing.T) { + t.Parallel() + + cases := []time.Duration{ + 10 * time.Minute, + 24 * time.Hour, + 5 * time.Second, + 30*time.Minute + 15*time.Second, + } + for _, dur := range cases { + d := JSONDuration{duration: dur} + text, err := d.MarshalText() + require.NoError(t, err) + + var d2 JSONDuration + require.NoError(t, d2.UnmarshalText(text)) + require.Equal(t, d.duration, d2.duration, + "round-trip failed for %v: got text %q", dur, string(text)) + } +} + +// TestJSONDurationUnmarshalTextInvalid verifies invalid duration text is rejected. +func TestJSONDurationUnmarshalTextInvalid(t *testing.T) { + t.Parallel() + + var d JSONDuration + require.Error(t, d.UnmarshalText([]byte("not-a-duration"))) + require.Error(t, d.UnmarshalText([]byte(""))) + require.Error(t, d.UnmarshalText([]byte("1d"))) // Go doesn't support "d" unit +} + +// TestJSONDurationTOMLNotEmptyTable verifies a JSONDuration field encodes as a +// readable string rather than an empty TOML table (the failure mode that the +// MarshalText/UnmarshalText methods fix). +func TestJSONDurationTOMLNotEmptyTable(t *testing.T) { + t.Parallel() + + cfg := &ReplicaConfig{ + SyncPointInterval: &JSONDuration{duration: 10 * time.Minute}, + } + var buf bytes.Buffer + require.NoError(t, toml.NewEncoder(&buf).Encode(cfg)) + out := buf.String() + require.Contains(t, out, "sync-point-interval") + require.Contains(t, out, "10m0s") + require.NotContains(t, out, "[sync-point-interval]") // not an (empty) table +} + +// TestChangeFeedInfoTOMLRoundTripToInternal is the highest-value test: it encodes +// a ChangeFeedInfo to TOML, then decodes the config back into the internal +// config.ReplicaConfig (the exact target that `changefeed create --config` +// parses), proving the TOML output is import-compatible. +func TestChangeFeedInfoTOMLRoundTripToInternal(t *testing.T) { + t.Parallel() + + info := &ChangeFeedInfo{ + ID: "test-cf", + SinkURI: "blackhole://", + StartTs: 449999999999999999, + Config: &ReplicaConfig{ + MemoryQuota: util.AddressOf(uint64(1024)), + CaseSensitive: util.AddressOf(true), + ForceReplicate: util.AddressOf(true), + CheckGCSafePoint: util.AddressOf(false), + SyncPointInterval: &JSONDuration{duration: 10 * time.Minute}, + Integrity: &IntegrityConfig{ + IntegrityCheckLevel: util.AddressOf("correctness"), + CorruptionHandleLevel: util.AddressOf("warn"), + }, + Consistent: &ConsistentConfig{ + Level: util.AddressOf("eventual"), + MaxLogSize: util.AddressOf(int64(128)), + FlushIntervalInMs: util.AddressOf(int64(2000)), + Storage: util.AddressOf("s3://test"), + }, + }, + } + + var buf bytes.Buffer + require.NoError(t, toml.NewEncoder(&buf).Encode(info)) + out := buf.String() + + // Top-level kebab-case keys and runtime field omissions. + require.Contains(t, out, `sink-uri = "blackhole://"`) + require.Contains(t, out, "start-ts") + require.NotContains(t, out, "gid") // GID is omitted from TOML (toml:"-") + + // The [config] section must decode into the internal ReplicaConfig used by + // `changefeed create --config`. + var wrapper struct { + Config config.ReplicaConfig `toml:"config"` + } + _, err := toml.Decode(out, &wrapper) + require.NoError(t, err) + require.Equal(t, uint64(1024), util.GetOrZero(wrapper.Config.MemoryQuota)) + require.True(t, util.GetOrZero(wrapper.Config.CaseSensitive)) + require.True(t, util.GetOrZero(wrapper.Config.ForceReplicate)) + require.NotNil(t, wrapper.Config.SyncPointInterval) + require.Equal(t, 10*time.Minute, *wrapper.Config.SyncPointInterval) + require.Equal(t, "correctness", util.GetOrZero(wrapper.Config.Integrity.IntegrityCheckLevel)) + require.Equal(t, "eventual", util.GetOrZero(wrapper.Config.Consistent.Level)) +} + +// TestDefaultConfigTOMLRoundTripToInternal encodes the full default replica +// config to TOML and decodes it into the internal config.ReplicaConfig, then +// asserts that no config-section key is left undecoded. This proves every TOML +// tag added to the api/v2 config tree matches what `changefeed create --config` +// parses โ€” a guard against tag drift across the ~200 tagged fields. +func TestDefaultConfigTOMLRoundTripToInternal(t *testing.T) { + t.Parallel() + + info := &ChangeFeedInfo{ID: "cf", SinkURI: "blackhole://", Config: GetDefaultReplicaConfig()} + var buf bytes.Buffer + require.NoError(t, toml.NewEncoder(&buf).Encode(info)) + + var wrapper struct { + Config config.ReplicaConfig `toml:"config"` + } + meta, err := toml.Decode(buf.String(), &wrapper) + require.NoError(t, err) + + // Top-level runtime fields (id, sink-uri, ...) are expected to be undecoded + // against this wrapper; only config.* keys must all map to the internal type. + var cfgUndecoded []string + for _, k := range meta.Undecoded() { + if len(k) > 0 && k[0] == "config" { + cfgUndecoded = append(cfgUndecoded, k.String()) + } + } + require.Empty(t, cfgUndecoded, "config keys not decodable into internal config: %v", cfgUndecoded) +} + +// TestRespondWithFormat verifies content negotiation: the default and explicit +// JSON requests yield JSON, while Accept: application/toml yields TOML. +func TestRespondWithFormat(t *testing.T) { + t.Parallel() + + gin.SetMode(gin.TestMode) + obj := &ChangeFeedInfo{ID: "cf-1", SinkURI: "blackhole://"} + + cases := []struct { + name string + accept string + wantCType string + wantContain string + }{ + {"default json", "", "application/json", `"id":"cf-1"`}, + {"explicit json", "application/json", "application/json", `"id":"cf-1"`}, + {"toml", "application/toml", "application/toml", `id = "cf-1"`}, + {"toml with charset", "application/toml; charset=utf-8", "application/toml", `id = "cf-1"`}, + {"toml mixed case", "Application/TOML", "application/toml", `id = "cf-1"`}, + } + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + c.Request = httptest.NewRequest("GET", "/", nil) + if tc.accept != "" { + c.Request.Header.Set("Accept", tc.accept) + } + respondWithFormat(c, 200, obj) + require.Equal(t, 200, w.Code) + require.Contains(t, w.Header().Get("Content-Type"), tc.wantCType) + require.Contains(t, w.Body.String(), tc.wantContain) + }) + } +} diff --git a/api/v2/helper.go b/api/v2/helper.go index 39a1c4817d..e24ca38de7 100644 --- a/api/v2/helper.go +++ b/api/v2/helper.go @@ -14,14 +14,17 @@ package v2 import ( + "bytes" "context" "encoding/json" "fmt" "io" "net/http" "net/url" + "strings" "time" + "github.com/BurntSushi/toml" "github.com/gin-gonic/gin" "github.com/pingcap/log" "github.com/pingcap/ticdc/pkg/config" @@ -89,6 +92,36 @@ func isFromV1API(c *gin.Context) bool { return c.GetHeader("from-ticdc-api-v1") == "true" } +// mimeTOML is the media type used to request and return TOML payloads. +const mimeTOML = "application/toml" + +// wantsTOML reports whether the client requested a TOML response via the +// Accept header. A missing header, or anything other than application/toml, +// keeps the default JSON behavior. A simple substring match is sufficient +// while only JSON and TOML are supported; it tolerates parameters such as +// "application/toml; charset=utf-8". Media types are case-insensitive +// (RFC 7231 ยง3.1.1.1), so the header is lowercased before matching. +func wantsTOML(c *gin.Context) bool { + return strings.Contains(strings.ToLower(c.GetHeader("Accept")), mimeTOML) +} + +// respondWithFormat writes obj to the response, negotiating the encoding from +// the Accept header: application/toml yields a TOML body, anything else yields +// JSON (the unchanged default). It is the shared seam for content negotiation +// across v2 endpoints. +func respondWithFormat(c *gin.Context, code int, obj any) { + if wantsTOML(c) { + var buf bytes.Buffer + if err := toml.NewEncoder(&buf).Encode(obj); err != nil { + _ = c.Error(errors.Trace(err)) + return + } + c.Data(code, mimeTOML+"; charset=utf-8", buf.Bytes()) + return + } + c.JSON(code, obj) +} + func getStatus(c *gin.Context) int { if isFromV1API(c) { return http.StatusAccepted diff --git a/api/v2/model.go b/api/v2/model.go index 7aa22a3015..0575a73e23 100644 --- a/api/v2/model.go +++ b/api/v2/model.go @@ -113,10 +113,10 @@ type ChangefeedCommonInfo struct { // SyncedStatusConfig represents synced check interval config for a changefeed type SyncedStatusConfig struct { // The minimum interval between the latest synced ts and now required to reach synced state - SyncedCheckInterval *int64 `json:"synced_check_interval"` + SyncedCheckInterval *int64 `json:"synced_check_interval" toml:"synced-check-interval"` // The maximum interval between latest checkpoint ts and now or // between latest sink's checkpoint ts and puller's checkpoint ts required to reach synced state - CheckpointInterval *int64 `json:"checkpoint_interval"` + CheckpointInterval *int64 `json:"checkpoint_interval" toml:"checkpoint-interval"` } // MarshalJSON marshal changefeed common info to json @@ -190,44 +190,62 @@ func (d *JSONDuration) UnmarshalJSON(b []byte) error { } } +// MarshalText implements encoding.TextMarshaler so that TOML (and other +// text-based encoders) serialize JSONDuration as a human-readable string +// like "10m0s" or "24h0m0s", instead of raw nanoseconds. +// +// JSON serialization is unaffected โ€” MarshalJSON still produces nanoseconds. +func (d JSONDuration) MarshalText() ([]byte, error) { + return []byte(d.duration.String()), nil +} + +// UnmarshalText implements encoding.TextUnmarshaler so that TOML (and other +// text-based decoders) can parse duration strings like "5m0s" or "1h30m". +// This enables round-trip: MarshalText โ†’ UnmarshalText preserves the value. +func (d *JSONDuration) UnmarshalText(text []byte) error { + var err error + d.duration, err = time.ParseDuration(string(text)) + return err +} + // ReplicaConfig is a duplicate of config.ReplicaConfig type ReplicaConfig struct { - MemoryQuota *uint64 `json:"memory_quota,omitempty"` - EventCollectorBatchCount *int `json:"event_collector_batch_count,omitempty"` - EventCollectorBatchBytes *int `json:"event_collector_batch_bytes,omitempty"` - CaseSensitive *bool `json:"case_sensitive,omitempty"` - ForceReplicate *bool `json:"force_replicate,omitempty"` - IgnoreIneligibleTable *bool `json:"ignore_ineligible_table,omitempty"` - CheckGCSafePoint *bool `json:"check_gc_safe_point,omitempty"` - EnableSyncPoint *bool `json:"enable_sync_point,omitempty"` - EnableTableMonitor *bool `json:"enable_table_monitor,omitempty"` - BDRMode *bool `json:"bdr_mode,omitempty"` + MemoryQuota *uint64 `json:"memory_quota,omitempty" toml:"memory-quota,omitempty"` + EventCollectorBatchCount *int `json:"event_collector_batch_count,omitempty" toml:"event-collector-batch-count,omitempty"` + EventCollectorBatchBytes *int `json:"event_collector_batch_bytes,omitempty" toml:"event-collector-batch-bytes,omitempty"` + CaseSensitive *bool `json:"case_sensitive,omitempty" toml:"case-sensitive,omitempty"` + ForceReplicate *bool `json:"force_replicate,omitempty" toml:"force-replicate,omitempty"` + IgnoreIneligibleTable *bool `json:"ignore_ineligible_table,omitempty" toml:"ignore-ineligible-table,omitempty"` + CheckGCSafePoint *bool `json:"check_gc_safe_point,omitempty" toml:"check-gc-safe-point,omitempty"` + EnableSyncPoint *bool `json:"enable_sync_point,omitempty" toml:"enable-sync-point,omitempty"` + EnableTableMonitor *bool `json:"enable_table_monitor,omitempty" toml:"enable-table-monitor,omitempty"` + BDRMode *bool `json:"bdr_mode,omitempty" toml:"bdr-mode,omitempty"` // EnableActiveActive enables active-active replication mode on top of BDR. // It requires BDRMode to be true and is only supported by TiDB and storage sinks. - EnableActiveActive *bool `json:"enable_active_active,omitempty"` + EnableActiveActive *bool `json:"enable_active_active,omitempty" toml:"enable-active-active,omitempty"` // ActiveActiveProgressInterval controls how often the MySQL/TiDB sink updates the // active-active progress table in EnableActiveActive mode (for hard delete safety checks). - ActiveActiveProgressInterval *JSONDuration `json:"active_active_progress_interval,omitempty"` + ActiveActiveProgressInterval *JSONDuration `json:"active_active_progress_interval,omitempty" toml:"active-active-progress-interval,omitempty"` // ActiveActiveSyncStatsInterval controls how often the MySQL/TiDB sink queries // the TiDB session variable @@tidb_cdc_active_active_sync_stats for conflict statistics. // Set it to 0 to disable metric collection. // This option only takes effect when EnableActiveActive is true and the downstream is TiDB. - ActiveActiveSyncStatsInterval *JSONDuration `json:"active_active_sync_stats_interval,omitempty"` + ActiveActiveSyncStatsInterval *JSONDuration `json:"active_active_sync_stats_interval,omitempty" toml:"active-active-sync-stats-interval,omitempty"` - SyncPointInterval *JSONDuration `json:"sync_point_interval,omitempty"` - SyncPointRetention *JSONDuration `json:"sync_point_retention,omitempty"` + SyncPointInterval *JSONDuration `json:"sync_point_interval,omitempty" toml:"sync-point-interval,omitempty"` + SyncPointRetention *JSONDuration `json:"sync_point_retention,omitempty" toml:"sync-point-retention,omitempty"` - Filter *FilterConfig `json:"filter,omitempty"` - Mounter *MounterConfig `json:"mounter,omitempty"` - Sink *SinkConfig `json:"sink,omitempty"` - Consistent *ConsistentConfig `json:"consistent,omitempty"` - Scheduler *ChangefeedSchedulerConfig `json:"scheduler,omitempty"` - Integrity *IntegrityConfig `json:"integrity,omitempty"` - ChangefeedErrorStuckDuration *JSONDuration `json:"changefeed_error_stuck_duration,omitempty"` - SyncedStatus *SyncedStatusConfig `json:"synced_status,omitempty"` + Filter *FilterConfig `json:"filter,omitempty" toml:"filter,omitempty"` + Mounter *MounterConfig `json:"mounter,omitempty" toml:"mounter,omitempty"` + Sink *SinkConfig `json:"sink,omitempty" toml:"sink,omitempty"` + Consistent *ConsistentConfig `json:"consistent,omitempty" toml:"consistent,omitempty"` + Scheduler *ChangefeedSchedulerConfig `json:"scheduler,omitempty" toml:"scheduler,omitempty"` + Integrity *IntegrityConfig `json:"integrity,omitempty" toml:"integrity,omitempty"` + ChangefeedErrorStuckDuration *JSONDuration `json:"changefeed_error_stuck_duration,omitempty" toml:"changefeed-error-stuck-duration,omitempty"` + SyncedStatus *SyncedStatusConfig `json:"synced_status,omitempty" toml:"synced-status,omitempty"` // Deprecated: we don't use this field since v8.0.0. - SQLMode *string `json:"sql_mode,omitempty"` + SQLMode *string `json:"sql_mode,omitempty" toml:"sql-mode,omitempty"` } // ToInternalReplicaConfig coverts *v2.ReplicaConfig into *config.ReplicaConfig @@ -1069,28 +1087,28 @@ func GetDefaultReplicaConfig() *ReplicaConfig { // FilterConfig represents filter config for a changefeed // This is a duplicate of config.FilterConfig type FilterConfig struct { - Rules []string `json:"rules,omitempty"` - IgnoreTxnStartTs []uint64 `json:"ignore_txn_start_ts,omitempty"` - EventFilters []EventFilterRule `json:"event_filters,omitempty"` + Rules []string `json:"rules,omitempty" toml:"rules,omitempty"` + IgnoreTxnStartTs []uint64 `json:"ignore_txn_start_ts,omitempty" toml:"ignore-txn-start-ts,omitempty"` + EventFilters []EventFilterRule `json:"event_filters,omitempty" toml:"event-filters,omitempty"` } // MounterConfig represents mounter config for a changefeed type MounterConfig struct { - WorkerNum *int `json:"worker_num,omitempty"` + WorkerNum *int `json:"worker_num,omitempty" toml:"worker-num,omitempty"` } // EventFilterRule is used by sql event filter and expression filter type EventFilterRule struct { - Matcher []string `json:"matcher"` - IgnoreEvent []string `json:"ignore_event"` + Matcher []string `json:"matcher" toml:"matcher"` + IgnoreEvent []string `json:"ignore_event" toml:"ignore-event"` // regular expression IgnoreSQL []string `toml:"ignore_sql" json:"ignore_sql"` // sql expression - IgnoreInsertValueExpr string `json:"ignore_insert_value_expr"` - IgnoreUpdateNewValueExpr string `json:"ignore_update_new_value_expr"` - IgnoreUpdateOldValueExpr string `json:"ignore_update_old_value_expr"` - IgnoreDeleteValueExpr string `json:"ignore_delete_value_expr"` - IgnoreUpdateOnlyColumns []string `json:"ignore_update_only_columns,omitempty"` + IgnoreInsertValueExpr string `json:"ignore_insert_value_expr" toml:"ignore-insert-value-expr"` + IgnoreUpdateNewValueExpr string `json:"ignore_update_new_value_expr" toml:"ignore-update-new-value-expr"` + IgnoreUpdateOldValueExpr string `json:"ignore_update_old_value_expr" toml:"ignore-update-old-value-expr"` + IgnoreDeleteValueExpr string `json:"ignore_delete_value_expr" toml:"ignore-delete-value-expr"` + IgnoreUpdateOnlyColumns []string `json:"ignore_update_only_columns,omitempty" toml:"ignore-update-only-columns,omitempty"` } // ToInternalEventFilterRule converts EventFilterRule to *config.EventFilterRule @@ -1153,67 +1171,67 @@ type Table struct { // SinkConfig represents sink config for a changefeed // This is a duplicate of config.SinkConfig type SinkConfig struct { - Protocol *string `json:"protocol,omitempty"` - SchemaRegistry *string `json:"schema_registry,omitempty"` - CSVConfig *CSVConfig `json:"csv,omitempty"` - DispatchRules []*DispatchRule `json:"dispatchers,omitempty"` - ColumnSelectors []*ColumnSelector `json:"column_selectors,omitempty"` - TxnAtomicity *string `json:"transaction_atomicity,omitempty"` - EncoderConcurrency *int `json:"encoder_concurrency,omitempty"` - Terminator *string `json:"terminator,omitempty"` - DateSeparator *string `json:"date_separator,omitempty"` - EnablePartitionSeparator *bool `json:"enable_partition_separator,omitempty"` - FileIndexWidth *int `json:"file_index_width,omitempty"` + Protocol *string `json:"protocol,omitempty" toml:"protocol,omitempty"` + SchemaRegistry *string `json:"schema_registry,omitempty" toml:"schema-registry,omitempty"` + CSVConfig *CSVConfig `json:"csv,omitempty" toml:"csv,omitempty"` + DispatchRules []*DispatchRule `json:"dispatchers,omitempty" toml:"dispatchers,omitempty"` + ColumnSelectors []*ColumnSelector `json:"column_selectors,omitempty" toml:"column-selectors,omitempty"` + TxnAtomicity *string `json:"transaction_atomicity,omitempty" toml:"transaction-atomicity,omitempty"` + EncoderConcurrency *int `json:"encoder_concurrency,omitempty" toml:"encoder-concurrency,omitempty"` + Terminator *string `json:"terminator,omitempty" toml:"terminator,omitempty"` + DateSeparator *string `json:"date_separator,omitempty" toml:"date-separator,omitempty"` + EnablePartitionSeparator *bool `json:"enable_partition_separator,omitempty" toml:"enable-partition-separator,omitempty"` + FileIndexWidth *int `json:"file_index_width,omitempty" toml:"file-index-digit,omitempty"` // deprecated: it's become useless since v9.0.0 - EnableKafkaSinkV2 *bool `json:"enable_kafka_sink_v2,omitempty"` - OnlyOutputUpdatedColumns *bool `json:"only_output_updated_columns,omitempty"` - DeleteOnlyOutputHandleKeyColumns *bool `json:"delete_only_output_handle_key_columns"` - ContentCompatible *bool `json:"content_compatible"` - SafeMode *bool `json:"safe_mode,omitempty"` - KafkaConfig *KafkaConfig `json:"kafka_config,omitempty"` - PulsarConfig *PulsarConfig `json:"pulsar_config,omitempty"` - MySQLConfig *MySQLConfig `json:"mysql_config,omitempty"` - CloudStorageConfig *CloudStorageConfig `json:"cloud_storage_config,omitempty"` - AdvanceTimeoutInSec *uint `json:"advance_timeout,omitempty"` - SendBootstrapIntervalInSec *int64 `json:"send_bootstrap_interval_in_sec,omitempty"` - SendBootstrapInMsgCount *int32 `json:"send_bootstrap_in_msg_count,omitempty"` - SendBootstrapToAllPartition *bool `json:"send_bootstrap_to_all_partition,omitempty"` - SendAllBootstrapAtStart *bool `json:"send-all-bootstrap-at-start,omitempty"` - DebeziumDisableSchema *bool `json:"debezium_disable_schema,omitempty"` - DebeziumConfig *DebeziumConfig `json:"debezium,omitempty"` - OpenProtocolConfig *OpenProtocolConfig `json:"open,omitempty"` + EnableKafkaSinkV2 *bool `json:"enable_kafka_sink_v2,omitempty" toml:"enable-kafka-sink-v2,omitempty"` + OnlyOutputUpdatedColumns *bool `json:"only_output_updated_columns,omitempty" toml:"only-output-updated-columns,omitempty"` + DeleteOnlyOutputHandleKeyColumns *bool `json:"delete_only_output_handle_key_columns" toml:"delete-only-output-handle-key-columns"` + ContentCompatible *bool `json:"content_compatible" toml:"content-compatible"` + SafeMode *bool `json:"safe_mode,omitempty" toml:"safe-mode,omitempty"` + KafkaConfig *KafkaConfig `json:"kafka_config,omitempty" toml:"kafka-config,omitempty"` + PulsarConfig *PulsarConfig `json:"pulsar_config,omitempty" toml:"pulsar-config,omitempty"` + MySQLConfig *MySQLConfig `json:"mysql_config,omitempty" toml:"mysql-config,omitempty"` + CloudStorageConfig *CloudStorageConfig `json:"cloud_storage_config,omitempty" toml:"cloud-storage-config,omitempty"` + AdvanceTimeoutInSec *uint `json:"advance_timeout_in_sec,omitempty" toml:"advance-timeout-in-sec,omitempty"` + SendBootstrapIntervalInSec *int64 `json:"send_bootstrap_interval_in_sec,omitempty" toml:"send-bootstrap-interval-in-sec,omitempty"` + SendBootstrapInMsgCount *int32 `json:"send_bootstrap_in_msg_count,omitempty" toml:"send-bootstrap-in-msg-count,omitempty"` + SendBootstrapToAllPartition *bool `json:"send_bootstrap_to_all_partition,omitempty" toml:"send-bootstrap-to-all-partition,omitempty"` + SendAllBootstrapAtStart *bool `json:"send_all_bootstrap_at_start,omitempty" toml:"send-all-bootstrap-at-start,omitempty"` + DebeziumDisableSchema *bool `json:"debezium_disable_schema,omitempty" toml:"debezium-disable-schema,omitempty"` + DebeziumConfig *DebeziumConfig `json:"debezium,omitempty" toml:"debezium,omitempty"` + OpenProtocolConfig *OpenProtocolConfig `json:"open,omitempty" toml:"open,omitempty"` } // CSVConfig denotes the csv config // This is the same as config.CSVConfig type CSVConfig struct { - Delimiter string `json:"delimiter"` - Quote string `json:"quote"` - NullString string `json:"null"` - IncludeCommitTs bool `json:"include_commit_ts"` - BinaryEncodingMethod string `json:"binary_encoding_method"` - OutputOldValue bool `json:"output_old_value"` - OutputHandleKey bool `json:"output_handle_key"` - OutputFieldHeader bool `json:"output_field_header"` + Delimiter string `json:"delimiter" toml:"delimiter"` + Quote string `json:"quote" toml:"quote"` + NullString string `json:"null" toml:"null"` + IncludeCommitTs bool `json:"include_commit_ts" toml:"include-commit-ts"` + BinaryEncodingMethod string `json:"binary_encoding_method" toml:"binary-encoding-method"` + OutputOldValue bool `json:"output_old_value" toml:"output-old-value"` + OutputHandleKey bool `json:"output_handle_key" toml:"output-handle-key"` + OutputFieldHeader bool `json:"output_field_header" toml:"output-field-header"` } // LargeMessageHandleConfig denotes the large message handling config // This is the same as config.LargeMessageHandleConfig type LargeMessageHandleConfig struct { - LargeMessageHandleOption string `json:"large_message_handle_option"` - LargeMessageHandleCompression string `json:"large_message_handle_compression"` - ClaimCheckStorageURI string `json:"claim_check_storage_uri"` - ClaimCheckRawValue bool `json:"claim_check_raw_value"` + LargeMessageHandleOption string `json:"large_message_handle_option" toml:"large-message-handle-option"` + LargeMessageHandleCompression string `json:"large_message_handle_compression" toml:"large-message-handle-compression"` + ClaimCheckStorageURI string `json:"claim_check_storage_uri" toml:"claim-check-storage-uri"` + ClaimCheckRawValue bool `json:"claim_check_raw_value" toml:"claim-check-raw-value"` } // DispatchRule represents partition rule for a table // This is a duplicate of config.DispatchRule type DispatchRule struct { - Matcher []string `json:"matcher,omitempty"` - PartitionRule string `json:"partition,omitempty"` - IndexName string `json:"index,omitempty"` - Columns []string `json:"columns,omitempty"` - TopicRule string `json:"topic,omitempty"` + Matcher []string `json:"matcher,omitempty" toml:"matcher,omitempty"` + PartitionRule string `json:"partition,omitempty" toml:"partition,omitempty"` + IndexName string `json:"index,omitempty" toml:"index,omitempty"` + Columns []string `json:"columns,omitempty" toml:"columns,omitempty"` + TopicRule string `json:"topic,omitempty" toml:"topic,omitempty"` // TargetSchema sets the routed downstream schema name. // Leave it empty to keep the source schema name. @@ -1221,44 +1239,44 @@ type DispatchRule struct { // writes to `sales_bak`.`orders`. // You can also use placeholders. For example, `target-schema = "{schema}_bak"` // the target schema becomes `sales_bak`. - TargetSchema string `json:"target-schema,omitempty"` + TargetSchema string `json:"target-schema,omitempty" toml:"target-schema,omitempty"` // TargetTable sets the routed downstream table name. // Leave it empty to keep the source table name. // For example, if the source table is `sales`.`orders`, `target-table = "orders_bak"` // writes to `sales`.`orders_bak`. // You can also use placeholders. For example, `target-table = "{schema}_{table}"` // becomes `sales_orders`. - TargetTable string `json:"target-table,omitempty"` + TargetTable string `json:"target-table,omitempty" toml:"target-table,omitempty"` } // ColumnSelector represents a column selector for a table. // This is a duplicate of config.ColumnSelector type ColumnSelector struct { - Matcher []string `json:"matcher,omitempty"` - Columns []string `json:"columns,omitempty"` + Matcher []string `json:"matcher,omitempty" toml:"matcher,omitempty"` + Columns []string `json:"columns,omitempty" toml:"columns,omitempty"` } // ConsistentConfig represents replication consistency config for a changefeed // This is a duplicate of config.ConsistentConfig type ConsistentConfig struct { - Level *string `json:"level,omitempty"` - MaxLogSize *int64 `json:"max_log_size,omitempty"` - FlushIntervalInMs *int64 `json:"flush_interval,omitempty"` - MetaFlushIntervalInMs *int64 `json:"meta_flush_interval,omitempty"` - EncodingWorkerNum *int `json:"encoding_worker_num,omitempty"` - FlushWorkerNum *int `json:"flush_worker_num,omitempty"` - Storage *string `json:"storage,omitempty"` - UseFileBackend *bool `json:"use_file_backend,omitempty"` - Compression *string `json:"compression,omitempty"` - FlushConcurrency *int `json:"flush_concurrency,omitempty"` - MemoryUsage *ConsistentMemoryUsage `json:"memory_usage,omitempty"` - - EventCollectorBatchCount *int `json:"event_collector_batch_count,omitempty"` + Level *string `json:"level,omitempty" toml:"level,omitempty"` + MaxLogSize *int64 `json:"max_log_size,omitempty" toml:"max-log-size,omitempty"` + FlushIntervalInMs *int64 `json:"flush_interval,omitempty" toml:"flush-interval,omitempty"` + MetaFlushIntervalInMs *int64 `json:"meta_flush_interval,omitempty" toml:"meta-flush-interval,omitempty"` + EncodingWorkerNum *int `json:"encoding_worker_num,omitempty" toml:"encoding-worker-num,omitempty"` + FlushWorkerNum *int `json:"flush_worker_num,omitempty" toml:"flush-worker-num,omitempty"` + Storage *string `json:"storage,omitempty" toml:"storage,omitempty"` + UseFileBackend *bool `json:"use_file_backend,omitempty" toml:"use-file-backend,omitempty"` + Compression *string `json:"compression,omitempty" toml:"compression,omitempty"` + FlushConcurrency *int `json:"flush_concurrency,omitempty" toml:"flush-concurrency,omitempty"` + MemoryUsage *ConsistentMemoryUsage `json:"memory_usage,omitempty" toml:"memory-usage,omitempty"` + + EventCollectorBatchCount *int `json:"event_collector_batch_count,omitempty" toml:"event-collector-batch-count,omitempty"` } // ConsistentMemoryUsage represents memory usage of Consistent module. type ConsistentMemoryUsage struct { - MemoryQuotaPercentage uint64 `json:"memory_quota_percentage"` + MemoryQuotaPercentage uint64 `json:"memory_quota_percentage" toml:"memory-quota-percentage"` } // ChangefeedSchedulerConfig is per changefeed scheduler settings. @@ -1266,46 +1284,46 @@ type ConsistentMemoryUsage struct { type ChangefeedSchedulerConfig struct { // EnableTableAcrossNodes set true to split one table to multiple spans and // distribute to multiple TiCDC nodes. - EnableTableAcrossNodes *bool `json:"enable_table_across_nodes,omitempty"` + EnableTableAcrossNodes *bool `json:"enable_table_across_nodes,omitempty" toml:"enable-table-across-nodes,omitempty"` // RegionThreshold is the region count threshold of splitting a table. - RegionThreshold *int `json:"region_threshold,omitempty"` + RegionThreshold *int `json:"region_threshold,omitempty" toml:"region-threshold,omitempty"` // RegionCountPerSpan is the maximax region count for each span when first splitted by RegionCountSpliiter - RegionCountPerSpan *int `json:"region_count_per_span,omitempty"` + RegionCountPerSpan *int `json:"region_count_per_span,omitempty" toml:"region-count-per-span,omitempty"` // RegionCountRefreshInterval controls how often we refresh span region count with PD. - RegionCountRefreshInterval *time.Duration `json:"region_count_refresh_interval,omitempty"` + RegionCountRefreshInterval *time.Duration `json:"region_count_refresh_interval,omitempty" toml:"region-count-refresh-interval,omitempty"` // WriteKeyThreshold is the written keys threshold of splitting a table. - WriteKeyThreshold *int `json:"write_key_threshold,omitempty"` + WriteKeyThreshold *int `json:"write_key_threshold,omitempty" toml:"write-key-threshold,omitempty"` // SchedulingTaskCountPerNode is the upper limit for scheduling tasks each node. - SchedulingTaskCountPerNode *int `json:"scheduling_task_count_per_node,omitempty"` + SchedulingTaskCountPerNode *int `json:"scheduling_task_count_per_node,omitempty" toml:"scheduling-task-count-per-node,omitempty"` // EnableSplittableCheck controls whether to check if a table is splittable before splitting. // If true, only tables with primary key and no unique key can be split. // If false, all tables can be split without checking. // For MySQL downstream, this is always set to true for data consistency. - EnableSplittableCheck *bool `json:"enable_splittable_check,omitempty"` + EnableSplittableCheck *bool `json:"enable_splittable_check,omitempty" toml:"enable-splittable-check,omitempty"` // ForceSplit controls whether to skip the splittable table check for MySQL downstream. // If true, the splittable table check will be skipped even if the downstream is MySQL. // This is useful for advanced users who are aware of the risks of splitting unsplittable tables. // Default value is false. - ForceSplit *bool `json:"force_split,omitempty"` + ForceSplit *bool `json:"force_split,omitempty" toml:"force-split,omitempty"` // These config is used for adjust the frequency of balancing traffic. // BalanceScoreThreshold is the score threshold for balancing traffic. Larger value means less frequent balancing. - BalanceScoreThreshold *int `json:"balance_score_threshold,omitempty"` + BalanceScoreThreshold *int `json:"balance_score_threshold,omitempty" toml:"balance-score-threshold,omitempty"` // MinTrafficPercentage is the minimum traffic percentage for balancing traffic. Larger value means less frequent balancing. - MinTrafficPercentage *float64 `json:"min_traffic_percentage,omitempty"` + MinTrafficPercentage *float64 `json:"min_traffic_percentage,omitempty" toml:"min-traffic-percentage,omitempty"` // MaxTrafficPercentage is the maximum traffic percentage for balancing traffic. Less value means less frequent balancing. - MaxTrafficPercentage *float64 `json:"max_traffic_percentage,omitempty"` + MaxTrafficPercentage *float64 `json:"max_traffic_percentage,omitempty" toml:"max-traffic-percentage,omitempty"` } // IntegrityConfig is the config for integrity check // This is a duplicate of Integrity.Config type IntegrityConfig struct { - IntegrityCheckLevel *string `json:"integrity_check_level,omitempty"` - CorruptionHandleLevel *string `json:"corruption_handle_level,omitempty"` + IntegrityCheckLevel *string `json:"integrity_check_level,omitempty" toml:"integrity-check-level,omitempty"` + CorruptionHandleLevel *string `json:"corruption_handle_level,omitempty" toml:"corruption-handle-level,omitempty"` } // EtcdData contains key/value pair of etcd data type EtcdData struct { - Key string `json:"key,omitempty"` + Key string `json:"key,omitempty" toml:"key,omitempty"` Value string `json:"value,omitempty"` } @@ -1318,29 +1336,32 @@ type ResolveLockReq struct { // ChangeFeedInfo describes the detail of a ChangeFeed type ChangeFeedInfo struct { - UpstreamID uint64 `json:"upstream_id,omitempty"` - ID string `json:"id"` - Keyspace string `json:"keyspace"` - SinkURI string `json:"sink_uri,omitempty"` - CreateTime time.Time `json:"create_time"` + UpstreamID uint64 `json:"upstream_id,omitempty" toml:"upstream-id,omitempty"` + ID string `json:"id" toml:"id"` + Keyspace string `json:"keyspace" toml:"keyspace"` + SinkURI string `json:"sink_uri,omitempty" toml:"sink-uri,omitempty"` + CreateTime time.Time `json:"create_time" toml:"create-time"` // Start sync at this commit ts if `StartTs` is specify or using the CreateTime of changefeed. - StartTs uint64 `json:"start_ts,omitempty"` + StartTs uint64 `json:"start_ts,omitempty" toml:"start-ts,omitempty"` // The ChangeFeed will exits until sync to timestamp TargetTs - TargetTs uint64 `json:"target_ts,omitempty"` + TargetTs uint64 `json:"target_ts,omitempty" toml:"target-ts,omitempty"` // used for admin job notification, trigger watch event in capture - AdminJobType config.AdminJobType `json:"admin_job_type,omitempty"` - Config *ReplicaConfig `json:"config,omitempty"` - State config.FeedState `json:"state,omitempty"` - Error *config.RunningError `json:"error,omitempty"` - CreatorVersion string `json:"creator_version,omitempty"` - - ResolvedTs uint64 `json:"resolved_ts"` - CheckpointTs uint64 `json:"checkpoint_ts"` - CheckpointTime api.JSONTime `json:"checkpoint_time"` - TaskStatus []config.CaptureTaskStatus `json:"task_status,omitempty"` - - GID common.GID `json:"gid"` - MaintainerAddr string `json:"maintainer_addr,omitempty"` + AdminJobType config.AdminJobType `json:"admin_job_type,omitempty" toml:"admin-job-type,omitempty"` + Config *ReplicaConfig `json:"config,omitempty" toml:"config,omitempty"` + State config.FeedState `json:"state,omitempty" toml:"state,omitempty"` + Error *config.RunningError `json:"error,omitempty" toml:"error,omitempty"` + CreatorVersion string `json:"creator_version,omitempty" toml:"creator-version,omitempty"` + + ResolvedTs uint64 `json:"resolved_ts" toml:"resolved-ts"` + CheckpointTs uint64 `json:"checkpoint_ts" toml:"checkpoint-ts"` + CheckpointTime api.JSONTime `json:"checkpoint_time" toml:"checkpoint-time"` + TaskStatus []config.CaptureTaskStatus `json:"task_status,omitempty" toml:"task-status,omitempty"` + + // GID is omitted from TOML: its uint64 low/high can exceed TOML's signed + // int64 range (so the output would not parse back), and it is internal + // runtime metadata, not part of a changefeed's creatable config. + GID common.GID `json:"gid" toml:"-"` + MaintainerAddr string `json:"maintainer_addr,omitempty" toml:"maintainer-addr,omitempty"` } // SyncedStatus describes the detail of a changefeed's synced status @@ -1415,119 +1436,119 @@ type Capture struct { // CodecConfig represents a MQ codec configuration type CodecConfig struct { - EnableTiDBExtension *bool `json:"enable_tidb_extension,omitempty"` - MaxBatchSize *int `json:"max_batch_size,omitempty"` - AvroEnableWatermark *bool `json:"avro_enable_watermark,omitempty"` - AvroDecimalHandlingMode *string `json:"avro_decimal_handling_mode,omitempty"` - AvroBigintUnsignedHandlingMode *string `json:"avro_bigint_unsigned_handling_mode,omitempty"` - EncodingFormat *string `json:"encoding_format,omitempty"` + EnableTiDBExtension *bool `json:"enable_tidb_extension,omitempty" toml:"enable-tidb-extension,omitempty"` + MaxBatchSize *int `json:"max_batch_size,omitempty" toml:"max-batch-size,omitempty"` + AvroEnableWatermark *bool `json:"avro_enable_watermark,omitempty" toml:"avro-enable-watermark,omitempty"` + AvroDecimalHandlingMode *string `json:"avro_decimal_handling_mode,omitempty" toml:"avro-decimal-handling-mode,omitempty"` + AvroBigintUnsignedHandlingMode *string `json:"avro_bigint_unsigned_handling_mode,omitempty" toml:"avro-bigint-unsigned-handling-mode,omitempty"` + EncodingFormat *string `json:"encoding_format,omitempty" toml:"encoding-format,omitempty"` } // PulsarConfig represents a pulsar sink configuration type PulsarConfig struct { - TLSKeyFilePath *string `json:"tls-certificate-path,omitempty"` - TLSCertificateFile *string `json:"tls-private-key-path,omitempty"` - TLSTrustCertsFilePath *string `json:"tls-trust-certs-file-path,omitempty"` - PulsarProducerCacheSize *int32 `json:"pulsar-producer-cache-size,omitempty"` - PulsarVersion *string `json:"pulsar-version,omitempty"` - CompressionType *string `json:"compression-type,omitempty"` - AuthenticationToken *string `json:"authentication-token,omitempty"` - ConnectionTimeout *int `json:"connection-timeout,omitempty"` - OperationTimeout *int `json:"operation-timeout,omitempty"` - BatchingMaxMessages *uint `json:"batching-max-messages,omitempty"` - BatchingMaxPublishDelay *int `json:"batching-max-publish-delay,omitempty"` - SendTimeout *int `json:"send-timeout,omitempty"` - TokenFromFile *string `json:"token-from-file,omitempty"` - BasicUserName *string `json:"basic-user-name,omitempty"` - BasicPassword *string `json:"basic-password,omitempty"` - AuthTLSCertificatePath *string `json:"auth-tls-certificate-path,omitempty"` - AuthTLSPrivateKeyPath *string `json:"auth-tls-private-key-path,omitempty"` - OAuth2 *PulsarOAuth2 `json:"oauth2,omitempty"` - OutputRawChangeEvent *bool `json:"output-raw-change-event,omitempty"` + TLSKeyFilePath *string `json:"tls-certificate-path,omitempty" toml:"tls-certificate-path,omitempty"` + TLSCertificateFile *string `json:"tls-private-key-path,omitempty" toml:"tls-private-key-path,omitempty"` + TLSTrustCertsFilePath *string `json:"tls-trust-certs-file-path,omitempty" toml:"tls-trust-certs-file-path,omitempty"` + PulsarProducerCacheSize *int32 `json:"pulsar-producer-cache-size,omitempty" toml:"pulsar-producer-cache-size,omitempty"` + PulsarVersion *string `json:"pulsar-version,omitempty" toml:"pulsar-version,omitempty"` + CompressionType *string `json:"compression-type,omitempty" toml:"compression-type,omitempty"` + AuthenticationToken *string `json:"authentication-token,omitempty" toml:"authentication-token,omitempty"` + ConnectionTimeout *int `json:"connection-timeout,omitempty" toml:"connection-timeout,omitempty"` + OperationTimeout *int `json:"operation-timeout,omitempty" toml:"operation-timeout,omitempty"` + BatchingMaxMessages *uint `json:"batching-max-messages,omitempty" toml:"batching-max-messages,omitempty"` + BatchingMaxPublishDelay *int `json:"batching-max-publish-delay,omitempty" toml:"batching-max-publish-delay,omitempty"` + SendTimeout *int `json:"send-timeout,omitempty" toml:"send-timeout,omitempty"` + TokenFromFile *string `json:"token-from-file,omitempty" toml:"token-from-file,omitempty"` + BasicUserName *string `json:"basic-user-name,omitempty" toml:"basic-user-name,omitempty"` + BasicPassword *string `json:"basic-password,omitempty" toml:"basic-password,omitempty"` + AuthTLSCertificatePath *string `json:"auth-tls-certificate-path,omitempty" toml:"auth-tls-certificate-path,omitempty"` + AuthTLSPrivateKeyPath *string `json:"auth-tls-private-key-path,omitempty" toml:"auth-tls-private-key-path,omitempty"` + OAuth2 *PulsarOAuth2 `json:"oauth2,omitempty" toml:"oauth2,omitempty"` + OutputRawChangeEvent *bool `json:"output-raw-change-event,omitempty" toml:"output-raw-change-event,omitempty"` } // PulsarOAuth2 is the configuration for OAuth2 type PulsarOAuth2 struct { - OAuth2IssuerURL string `json:"oauth2-issuer-url,omitempty"` - OAuth2Audience string `json:"oauth2-audience,omitempty"` - OAuth2PrivateKey string `json:"oauth2-private-key,omitempty"` - OAuth2ClientID string `json:"oauth2-client-id,omitempty"` - OAuth2Scope string `json:"oauth2-scope,omitempty"` + OAuth2IssuerURL string `json:"oauth2-issuer-url,omitempty" toml:"oauth2-issuer-url,omitempty"` + OAuth2Audience string `json:"oauth2-audience,omitempty" toml:"oauth2-audience,omitempty"` + OAuth2PrivateKey string `json:"oauth2-private-key,omitempty" toml:"oauth2-private-key,omitempty"` + OAuth2ClientID string `json:"oauth2-client-id,omitempty" toml:"oauth2-client-id,omitempty"` + OAuth2Scope string `json:"oauth2-scope,omitempty" toml:"oauth2-scope,omitempty"` } // KafkaConfig represents a kafka sink configuration type KafkaConfig struct { - PartitionNum *int32 `json:"partition_num,omitempty"` - ReplicationFactor *int16 `json:"replication_factor,omitempty"` - KafkaVersion *string `json:"kafka_version,omitempty"` - MaxMessageBytes *int `json:"max_message_bytes,omitempty"` - Compression *string `json:"compression,omitempty"` - KafkaClientID *string `json:"kafka_client_id,omitempty"` - AutoCreateTopic *bool `json:"auto_create_topic,omitempty"` - DialTimeout *string `json:"dial_timeout,omitempty"` - WriteTimeout *string `json:"write_timeout,omitempty"` - ReadTimeout *string `json:"read_timeout,omitempty"` - RequiredAcks *int `json:"required_acks,omitempty"` - SASLUser *string `json:"sasl_user,omitempty"` - SASLPassword *string `json:"sasl_password,omitempty"` - SASLMechanism *string `json:"sasl_mechanism,omitempty"` - SASLGssAPIAuthType *string `json:"sasl_gssapi_auth_type,omitempty"` - SASLGssAPIKeytabPath *string `json:"sasl_gssapi_keytab_path,omitempty"` - SASLGssAPIKerberosConfigPath *string `json:"sasl_gssapi_kerberos_config_path,omitempty"` - SASLGssAPIServiceName *string `json:"sasl_gssapi_service_name,omitempty"` - SASLGssAPIUser *string `json:"sasl_gssapi_user,omitempty"` - SASLGssAPIPassword *string `json:"sasl_gssapi_password,omitempty"` - SASLGssAPIRealm *string `json:"sasl_gssapi_realm,omitempty"` - SASLGssAPIDisablePafxfast *bool `json:"sasl_gssapi_disable_pafxfast,omitempty"` - SASLOAuthClientID *string `json:"sasl_oauth_client_id,omitempty"` - SASLOAuthClientSecret *string `json:"sasl_oauth_client_secret,omitempty"` - SASLOAuthTokenURL *string `json:"sasl_oauth_token_url,omitempty"` - SASLOAuthScopes []string `json:"sasl_oauth_scopes,omitempty"` - SASLOAuthGrantType *string `json:"sasl_oauth_grant_type,omitempty"` - SASLOAuthAudience *string `json:"sasl_oauth_audience,omitempty"` - EnableTLS *bool `json:"enable_tls,omitempty"` - CA *string `json:"ca,omitempty"` - Cert *string `json:"cert,omitempty"` - Key *string `json:"key,omitempty"` - InsecureSkipVerify *bool `json:"insecure_skip_verify,omitempty"` - CodecConfig *CodecConfig `json:"codec_config,omitempty"` - LargeMessageHandle *LargeMessageHandleConfig `json:"large_message_handle,omitempty"` - GlueSchemaRegistryConfig *GlueSchemaRegistryConfig `json:"glue_schema_registry_config,omitempty"` - OutputRawChangeEvent *bool `json:"output_raw_change_event,omitempty"` + PartitionNum *int32 `json:"partition_num,omitempty" toml:"partition-num,omitempty"` + ReplicationFactor *int16 `json:"replication_factor,omitempty" toml:"replication-factor,omitempty"` + KafkaVersion *string `json:"kafka_version,omitempty" toml:"kafka-version,omitempty"` + MaxMessageBytes *int `json:"max_message_bytes,omitempty" toml:"max-message-bytes,omitempty"` + Compression *string `json:"compression,omitempty" toml:"compression,omitempty"` + KafkaClientID *string `json:"kafka_client_id,omitempty" toml:"kafka-client-id,omitempty"` + AutoCreateTopic *bool `json:"auto_create_topic,omitempty" toml:"auto-create-topic,omitempty"` + DialTimeout *string `json:"dial_timeout,omitempty" toml:"dial-timeout,omitempty"` + WriteTimeout *string `json:"write_timeout,omitempty" toml:"write-timeout,omitempty"` + ReadTimeout *string `json:"read_timeout,omitempty" toml:"read-timeout,omitempty"` + RequiredAcks *int `json:"required_acks,omitempty" toml:"required-acks,omitempty"` + SASLUser *string `json:"sasl_user,omitempty" toml:"sasl-user,omitempty"` + SASLPassword *string `json:"sasl_password,omitempty" toml:"sasl-password,omitempty"` + SASLMechanism *string `json:"sasl_mechanism,omitempty" toml:"sasl-mechanism,omitempty"` + SASLGssAPIAuthType *string `json:"sasl_gssapi_auth_type,omitempty" toml:"sasl-gssapi-auth-type,omitempty"` + SASLGssAPIKeytabPath *string `json:"sasl_gssapi_keytab_path,omitempty" toml:"sasl-gssapi-keytab-path,omitempty"` + SASLGssAPIKerberosConfigPath *string `json:"sasl_gssapi_kerberos_config_path,omitempty" toml:"sasl-gssapi-kerberos-config-path,omitempty"` + SASLGssAPIServiceName *string `json:"sasl_gssapi_service_name,omitempty" toml:"sasl-gssapi-service-name,omitempty"` + SASLGssAPIUser *string `json:"sasl_gssapi_user,omitempty" toml:"sasl-gssapi-user,omitempty"` + SASLGssAPIPassword *string `json:"sasl_gssapi_password,omitempty" toml:"sasl-gssapi-password,omitempty"` + SASLGssAPIRealm *string `json:"sasl_gssapi_realm,omitempty" toml:"sasl-gssapi-realm,omitempty"` + SASLGssAPIDisablePafxfast *bool `json:"sasl_gssapi_disable_pafxfast,omitempty" toml:"sasl-gssapi-disable-pafxfast,omitempty"` + SASLOAuthClientID *string `json:"sasl_oauth_client_id,omitempty" toml:"sasl-oauth-client-id,omitempty"` + SASLOAuthClientSecret *string `json:"sasl_oauth_client_secret,omitempty" toml:"sasl-oauth-client-secret,omitempty"` + SASLOAuthTokenURL *string `json:"sasl_oauth_token_url,omitempty" toml:"sasl-oauth-token-url,omitempty"` + SASLOAuthScopes []string `json:"sasl_oauth_scopes,omitempty" toml:"sasl-oauth-scopes,omitempty"` + SASLOAuthGrantType *string `json:"sasl_oauth_grant_type,omitempty" toml:"sasl-oauth-grant-type,omitempty"` + SASLOAuthAudience *string `json:"sasl_oauth_audience,omitempty" toml:"sasl-oauth-audience,omitempty"` + EnableTLS *bool `json:"enable_tls,omitempty" toml:"enable-tls,omitempty"` + CA *string `json:"ca,omitempty" toml:"ca,omitempty"` + Cert *string `json:"cert,omitempty" toml:"cert,omitempty"` + Key *string `json:"key,omitempty" toml:"key,omitempty"` + InsecureSkipVerify *bool `json:"insecure_skip_verify,omitempty" toml:"insecure-skip-verify,omitempty"` + CodecConfig *CodecConfig `json:"codec_config,omitempty" toml:"codec-config,omitempty"` + LargeMessageHandle *LargeMessageHandleConfig `json:"large_message_handle,omitempty" toml:"large-message-handle,omitempty"` + GlueSchemaRegistryConfig *GlueSchemaRegistryConfig `json:"glue_schema_registry_config,omitempty" toml:"glue-schema-registry-config,omitempty"` + OutputRawChangeEvent *bool `json:"output_raw_change_event,omitempty" toml:"output-raw-change-event,omitempty"` } // MySQLConfig represents a MySQL sink configuration type MySQLConfig struct { - WorkerCount *int `json:"worker_count,omitempty"` - MaxTxnRow *int `json:"max_txn_row,omitempty"` - MaxMultiUpdateRowSize *int `json:"max_multi_update_row_size,omitempty"` - MaxMultiUpdateRowCount *int `json:"max_multi_update_row_count,omitempty"` - TiDBTxnMode *string `json:"tidb_txn_mode,omitempty"` - SSLCa *string `json:"ssl_ca,omitempty"` - SSLCert *string `json:"ssl_cert,omitempty"` - SSLKey *string `json:"ssl_key,omitempty"` - TimeZone *string `json:"time_zone,omitempty"` - WriteTimeout *string `json:"write_timeout,omitempty"` - ReadTimeout *string `json:"read_timeout,omitempty"` - Timeout *string `json:"timeout,omitempty"` - EnableBatchDML *bool `json:"enable_batch_dml,omitempty"` - EnableMultiStatement *bool `json:"enable_multi_statement,omitempty"` - EnableCachePreparedStatement *bool `json:"enable_cache_prepared_statement,omitempty"` + WorkerCount *int `json:"worker_count,omitempty" toml:"worker-count,omitempty"` + MaxTxnRow *int `json:"max_txn_row,omitempty" toml:"max-txn-row,omitempty"` + MaxMultiUpdateRowSize *int `json:"max_multi_update_row_size,omitempty" toml:"max-multi-update-row-size,omitempty"` + MaxMultiUpdateRowCount *int `json:"max_multi_update_row_count,omitempty" toml:"max-multi-update-row-count,omitempty"` + TiDBTxnMode *string `json:"tidb_txn_mode,omitempty" toml:"tidb-txn-mode,omitempty"` + SSLCa *string `json:"ssl_ca,omitempty" toml:"ssl-ca,omitempty"` + SSLCert *string `json:"ssl_cert,omitempty" toml:"ssl-cert,omitempty"` + SSLKey *string `json:"ssl_key,omitempty" toml:"ssl-key,omitempty"` + TimeZone *string `json:"time_zone,omitempty" toml:"time-zone,omitempty"` + WriteTimeout *string `json:"write_timeout,omitempty" toml:"write-timeout,omitempty"` + ReadTimeout *string `json:"read_timeout,omitempty" toml:"read-timeout,omitempty"` + Timeout *string `json:"timeout,omitempty" toml:"timeout,omitempty"` + EnableBatchDML *bool `json:"enable_batch_dml,omitempty" toml:"enable-batch-dml,omitempty"` + EnableMultiStatement *bool `json:"enable_multi_statement,omitempty" toml:"enable-multi-statement,omitempty"` + EnableCachePreparedStatement *bool `json:"enable_cache_prepared_statement,omitempty" toml:"enable-cache-prepared-statement,omitempty"` } // CloudStorageConfig represents a cloud storage sink configuration type CloudStorageConfig struct { - WorkerCount *int `json:"worker_count,omitempty"` - FlushInterval *string `json:"flush_interval,omitempty"` - FileSize *int `json:"file_size,omitempty"` - SpoolDiskQuota *int64 `json:"spool_disk_quota,omitempty"` - SpoolBaseDir *string `json:"spool_base_dir,omitempty"` - OutputColumnID *bool `json:"output_column_id,omitempty"` - FileExpirationDays *int `json:"file_expiration_days,omitempty"` - FileCleanupCronSpec *string `json:"file_cleanup_cron_spec,omitempty"` - FlushConcurrency *int `json:"flush_concurrency,omitempty"` - OutputRawChangeEvent *bool `json:"output_raw_change_event,omitempty"` - UseTableIDAsPath *bool `json:"use_table_id_as_path,omitempty"` + WorkerCount *int `json:"worker_count,omitempty" toml:"worker-count,omitempty"` + FlushInterval *string `json:"flush_interval,omitempty" toml:"flush-interval,omitempty"` + FileSize *int `json:"file_size,omitempty" toml:"file-size,omitempty"` + SpoolDiskQuota *int64 `json:"spool_disk_quota,omitempty" toml:"spool-disk-quota,omitempty"` + SpoolBaseDir *string `json:"spool_base_dir,omitempty" toml:"spool-base-dir,omitempty"` + OutputColumnID *bool `json:"output_column_id,omitempty" toml:"output-column-id,omitempty"` + FileExpirationDays *int `json:"file_expiration_days,omitempty" toml:"file-expiration-days,omitempty"` + FileCleanupCronSpec *string `json:"file_cleanup_cron_spec,omitempty" toml:"file-cleanup-cron-spec,omitempty"` + FlushConcurrency *int `json:"flush_concurrency,omitempty" toml:"flush-concurrency,omitempty"` + OutputRawChangeEvent *bool `json:"output_raw_change_event,omitempty" toml:"output-raw-change-event,omitempty"` + UseTableIDAsPath *bool `json:"use_table_id_as_path,omitempty" toml:"use-table-id-as-path,omitempty"` } // ChangefeedStatus holds common information of a changefeed in cdc @@ -1542,24 +1563,24 @@ type ChangefeedStatus struct { // GlueSchemaRegistryConfig represents a glue schema registry configuration type GlueSchemaRegistryConfig struct { // Name of the schema registry - RegistryName string `json:"registry_name"` + RegistryName string `json:"registry_name" toml:"registry-name"` // Region of the schema registry - Region string `json:"region"` + Region string `json:"region" toml:"region"` // AccessKey of the schema registry - AccessKey string `json:"access_key,omitempty"` + AccessKey string `json:"access_key,omitempty" toml:"access-key,omitempty"` // SecretAccessKey of the schema registry - SecretAccessKey string `json:"secret_access_key,omitempty"` - Token string `json:"token,omitempty"` + SecretAccessKey string `json:"secret_access_key,omitempty" toml:"secret-access-key,omitempty"` + Token string `json:"token,omitempty" toml:"token,omitempty"` } // OpenProtocolConfig represents the configurations for open protocol encoding type OpenProtocolConfig struct { - OutputOldValue bool `json:"output_old_value"` + OutputOldValue bool `json:"output_old_value" toml:"output-old-value"` } // DebeziumConfig represents the configurations for debezium protocol encoding type DebeziumConfig struct { - OutputOldValue bool `json:"output_old_value"` + OutputOldValue bool `json:"output_old_value" toml:"output-old-value"` } type DispatcherCount struct { diff --git a/pkg/api/util.go b/pkg/api/util.go index 465b8a279d..2094ff8f28 100644 --- a/pkg/api/util.go +++ b/pkg/api/util.go @@ -26,7 +26,10 @@ import ( "go.uber.org/zap" ) -const timeFormat = `"2006-01-02 15:04:05.000"` +const ( + timeFormat = `"2006-01-02 15:04:05.000"` + textTimeFormat = "2006-01-02 15:04:05.000" +) // JSONTime used to wrap time into json format type JSONTime time.Time @@ -49,6 +52,22 @@ func (t *JSONTime) UnmarshalJSON(data []byte) error { return nil } +// MarshalText implements encoding.TextMarshaler so that TOML (and other text +// encoders) serialize JSONTime as a string value instead of a table/struct. +func (t JSONTime) MarshalText() ([]byte, error) { + return []byte(time.Time(t).Format(textTimeFormat)), nil +} + +// UnmarshalText implements encoding.TextUnmarshaler for round-trip support. +func (t *JSONTime) UnmarshalText(data []byte) error { + tm, err := time.Parse(textTimeFormat, string(data)) + if err != nil { + return err + } + *t = JSONTime(tm) + return nil +} + // HTTPError of cdc http api type HTTPError struct { Error string `json:"error_msg"` diff --git a/tests/integration_tests/changefeed_query_toml_api/conf/overrides.toml b/tests/integration_tests/changefeed_query_toml_api/conf/overrides.toml new file mode 100644 index 0000000000..c94807976e --- /dev/null +++ b/tests/integration_tests/changefeed_query_toml_api/conf/overrides.toml @@ -0,0 +1,11 @@ +case-sensitive = true + +[mounter] + worker-num = 8 + +[filter] + rules = ["test_db.*", "db_alpha.*", "db_bravo.*", "db_charlie.*"] + +[scheduler] + enable-table-across-nodes = true + region-threshold = 1000 diff --git a/tests/integration_tests/changefeed_query_toml_api/run.sh b/tests/integration_tests/changefeed_query_toml_api/run.sh new file mode 100755 index 0000000000..c532131969 --- /dev/null +++ b/tests/integration_tests/changefeed_query_toml_api/run.sh @@ -0,0 +1,300 @@ +#!/bin/bash + +# This case verifies the API-layer TOML support added for issue #4936: +# GET /api/v2/changefeeds/:id with `Accept: application/toml` returns the +# changefeed as TOML, while the default (or application/json) keeps JSON. +# It mirrors the behaviors covered by the PR1 pytest suite (kebab-case keys, +# config sections, human-readable durations/timestamps, JSON<->TOML field +# correspondence, password redaction) and checks that the exported TOML config +# is import-compatible by feeding it back through `changefeed create --config`. + +set -euo pipefail + +CUR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +source $CUR/../_utils/test_prepare +WORK_DIR=$OUT_DIR/$TEST_NAME +CDC_BINARY=cdc.test +SINK_TYPE=$1 + +# This test only exercises the HTTP API output format; no downstream sink needed. +if [ "$SINK_TYPE" != "mysql" ]; then + echo "[$(date)] <<<<<< skip $TEST_NAME for sink type $SINK_TYPE (API output only) >>>>>>" + exit 0 +fi + +API="http://${CDC_HOST}:${CDC_PORT}/api/v2/changefeeds" + +# assert_jq runs a jq filter and prints an explicit FAIL (instead of letting +# `set -e` abort silently with stdout redirected) so failures are debuggable. +assert_jq() { + local file="$1" filter="$2" msg="$3" + if ! jq -e "$filter" "$file" >/dev/null; then + echo "FAIL: $msg (filter: $filter)" + echo "--- response body ---" + cat "$file" + exit 1 + fi +} + +# assert_grep / refute_grep give explicit FAIL output instead of letting a bare +# `grep -q` abort the script under `set -e` with no diagnostic. +assert_grep() { + local pattern="$1" file="$2" msg="$3" + if ! grep -q "$pattern" "$file"; then + echo "FAIL: $msg (missing pattern: $pattern)" + echo "--- body ---" + cat "$file" + exit 1 + fi +} +refute_grep() { + local pattern="$1" file="$2" msg="$3" + if grep -q "$pattern" "$file"; then + echo "FAIL: $msg (unexpected pattern: $pattern)" + echo "--- body ---" + cat "$file" + exit 1 + fi +} + +query_json() { + curl -sf -X GET "$API/$1?keyspace=$KEYSPACE_NAME" -o "$2" +} +query_json_accept() { + curl -sf -X GET -H 'Accept: application/json' "$API/$1?keyspace=$KEYSPACE_NAME" -o "$2" +} +query_toml() { + curl -sf -X GET -H 'Accept: application/toml' "$API/$1?keyspace=$KEYSPACE_NAME" -o "$2" +} + +function run() { + rm -rf $WORK_DIR && mkdir -p $WORK_DIR + + start_tidb_cluster --workdir $WORK_DIR + run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY + + # Three changefeeds: default config, a non-default config, and a realistic + # one with credentials in the sink URI (for redaction checks). + cdc_cli_changefeed create --sink-uri="blackhole://" -c "cf-default" + cdc_cli_changefeed create --sink-uri="blackhole://" -c "cf-overrides" \ + --config="$CUR/conf/overrides.toml" + # blackhole sink with embedded credentials: it never dials, but the URI still + # flows through MaskSinkURI so we can assert redaction. + cdc_cli_changefeed create \ + --sink-uri="blackhole://root:topsecretpass@10.0.0.9:3306/" \ + -c "cf-realistic" --config="$CUR/conf/overrides.toml" + + check_changefeed_state "http://${UP_PD_HOST_1}:${UP_PD_PORT_1}" "cf-default" "normal" "null" "" + check_changefeed_state "http://${UP_PD_HOST_1}:${UP_PD_PORT_1}" "cf-overrides" "normal" "null" "" + check_changefeed_state "http://${UP_PD_HOST_1}:${UP_PD_PORT_1}" "cf-realistic" "normal" "null" "" + + query_json "cf-default" "$WORK_DIR/default.json" + query_toml "cf-default" "$WORK_DIR/default.toml" + query_json "cf-overrides" "$WORK_DIR/overrides.json" + query_toml "cf-overrides" "$WORK_DIR/overrides.toml" + query_toml "cf-realistic" "$WORK_DIR/realistic.toml" + query_json "cf-realistic" "$WORK_DIR/realistic.json" + + # --- Test 1: default response and Accept: application/json return JSON --- + assert_jq "$WORK_DIR/default.json" '.id == "cf-default"' "Test 1 - default response id" + assert_jq "$WORK_DIR/default.json" '.config != null' "Test 1 - default response has config" + query_json_accept "cf-default" "$WORK_DIR/default_json.json" + assert_jq "$WORK_DIR/default_json.json" '.id == "cf-default"' "Test 1 - Accept application/json" + echo "PASS: Test 1 - default and Accept application/json return JSON" + + # --- Test 2: Accept: application/toml returns a TOML body --- + # A TOML body has bare key = value lines, not a leading JSON brace. + if [ "$(head -c 1 "$WORK_DIR/default.toml")" = "{" ]; then + echo "FAIL: Test 2 - expected TOML, got JSON-looking body" + cat "$WORK_DIR/default.toml" + exit 1 + fi + assert_grep '^id = "cf-default"' "$WORK_DIR/default.toml" "Test 2 - top-level id" + assert_grep '^sink-uri' "$WORK_DIR/default.toml" "Test 2 - top-level sink-uri" + # GID is runtime metadata and must be omitted from TOML (toml:"-"). + refute_grep '^gid' "$WORK_DIR/default.toml" "Test 2 - gid omitted" + echo "PASS: Test 2 - Accept application/toml returns TOML" + + # --- Test 3: TOML uses kebab-case keys, never snake_case --- + for k in sink-uri start-ts upstream-id case-sensitive check-gc-safe-point worker-num; do + assert_grep "$k" "$WORK_DIR/default.toml" "Test 3 - kebab-case key $k present" + done + for k in sink_uri start_ts upstream_id case_sensitive check_gc_safe_point worker_num; do + refute_grep "$k" "$WORK_DIR/default.toml" "Test 3 - snake_case key $k absent" + done + echo "PASS: Test 3 - TOML uses kebab-case keys" + + # --- Test 4: config lives under [config] and nested [config.*] sections --- + for sec in '^\[config\]' '^ \[config.filter\]' '^ \[config.mounter\]' '^ \[config.sink\]'; do + assert_grep "$sec" "$WORK_DIR/default.toml" "Test 4 - section $sec" + done + echo "PASS: Test 4 - config under [config] sections" + + # --- Test 5: durations are human-readable strings, not nanoseconds/tables --- + refute_grep '^ \[changefeed-error-stuck-duration\]' "$WORK_DIR/default.toml" \ + "Test 5 - duration must not be an empty table" + python3 - "$WORK_DIR/default.toml" <<'PY' +import sys, tomllib + + +def fail(msg): + print("FAIL: Test 5 - " + msg) + sys.exit(1) + + +with open(sys.argv[1], "rb") as f: + data = tomllib.load(f) +cfg = data.get("config", {}) +for key, want in [ + ("sync-point-interval", "10m0s"), + ("sync-point-retention", "24h0m0s"), + ("changefeed-error-stuck-duration", "30m0s"), +]: + val = cfg.get(key) + if val is not None and val != want: + fail("%s: expected %r got %r" % (key, want, val)) +print("PASS: Test 5 - durations are human-readable strings") +PY + + # --- Test 6: JSONTime fields render in readable form (not raw structs) --- + # checkpoint-time is api.JSONTime, whose MarshalText emits "2006-01-02 15:04:05.000". + python3 - "$WORK_DIR/default.toml" <<'PY' +import re, sys +text = open(sys.argv[1]).read() +if not re.search(r'checkpoint-time = "\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}', text): + print("FAIL: Test 6 - checkpoint-time not in readable format") + print(text[:400]) + sys.exit(1) +print("PASS: Test 6 - JSONTime fields are readable") +PY + + # --- Test 7: overrides applied, in both JSON and TOML --- + assert_jq "$WORK_DIR/overrides.json" '.config.case_sensitive == true' "Test 7 - json case_sensitive" + assert_jq "$WORK_DIR/overrides.json" '.config.mounter.worker_num == 8' "Test 7 - json worker_num" + python3 - "$WORK_DIR/overrides.toml" <<'PY' +import sys, tomllib + + +def fail(msg): + print("FAIL: Test 7 - " + msg) + sys.exit(1) + + +with open(sys.argv[1], "rb") as f: + data = tomllib.load(f) +if data.get("id") != "cf-overrides": + fail("id mismatch: %r" % data.get("id")) +cfg = data.get("config") +if not isinstance(cfg, dict): + fail("missing [config] section") +if cfg.get("case-sensitive") is not True: + fail("case-sensitive: %r" % cfg.get("case-sensitive")) +mounter = cfg.get("mounter") +if not isinstance(mounter, dict) or mounter.get("worker-num") != 8: + fail("mounter.worker-num: %r" % (mounter,)) +flt = cfg.get("filter") +want_rules = ["test_db.*", "db_alpha.*", "db_bravo.*", "db_charlie.*"] +if not isinstance(flt, dict) or flt.get("rules") != want_rules: + fail("filter.rules: %r" % (flt,)) +sched = cfg.get("scheduler") +if not isinstance(sched, dict) or sched.get("enable-table-across-nodes") is not True \ + or sched.get("region-threshold") != 1000: + fail("scheduler: %r" % (sched,)) +print("PASS: Test 7 - overrides applied in TOML") +PY + + # --- Test 8: JSON and TOML carry the same data (kebab vs snake) --- + python3 - "$WORK_DIR/overrides.json" "$WORK_DIR/overrides.toml" <<'PY' +import json, sys, tomllib + + +def fail(msg): + print("FAIL: Test 8 - " + msg) + sys.exit(1) + + +with open(sys.argv[1]) as f: + j = json.load(f) +with open(sys.argv[2], "rb") as f: + t = tomllib.load(f) + +for jk, tk in [("id", "id"), ("keyspace", "keyspace"), ("sink_uri", "sink-uri"), + ("start_ts", "start-ts"), ("upstream_id", "upstream-id"), + ("state", "state")]: + if j.get(jk) != t.get(tk): + fail("top-level %s/%s: json=%r toml=%r" % (jk, tk, j.get(jk), t.get(tk))) + +jc, tc = j["config"], t["config"] +if jc["case_sensitive"] != tc["case-sensitive"]: + fail("config.case-sensitive: json=%r toml=%r" % (jc["case_sensitive"], tc["case-sensitive"])) +if jc["mounter"]["worker_num"] != tc["mounter"]["worker-num"]: + fail("mounter.worker-num: json=%r toml=%r" % (jc["mounter"]["worker_num"], tc["mounter"]["worker-num"])) +if jc["filter"]["rules"] != tc["filter"]["rules"]: + fail("filter.rules: json=%r toml=%r" % (jc["filter"]["rules"], tc["filter"]["rules"])) +if jc["scheduler"]["enable_table_across_nodes"] != tc["scheduler"]["enable-table-across-nodes"]: + fail("scheduler.enable-table-across-nodes mismatch") +if jc["scheduler"]["region_threshold"] != tc["scheduler"]["region-threshold"]: + fail("scheduler.region-threshold mismatch") +print("PASS: Test 8 - JSON and TOML carry the same data") +PY + + # --- Test 9: sink URI credentials are redacted in both formats --- + refute_grep 'topsecretpass' "$WORK_DIR/realistic.toml" "Test 9 - password redacted in TOML" + refute_grep 'topsecretpass' "$WORK_DIR/realistic.json" "Test 9 - password redacted in JSON" + assert_grep 'xxxxx' "$WORK_DIR/realistic.toml" "Test 9 - redaction marker in TOML" + echo "PASS: Test 9 - sink URI credentials redacted" + + # --- Test 10: exported TOML config is import-compatible --- + # Extract the [config] subset into a standalone file and create a new + # changefeed from it, proving round-trip with `changefeed create --config`. + python3 - "$WORK_DIR/overrides.toml" "$WORK_DIR/reimport.toml" <<'PY' +import sys, tomllib + + +def fail(msg): + print("FAIL: Test 10 - " + msg) + sys.exit(1) + + +with open(sys.argv[1], "rb") as f: + data = tomllib.load(f) +cfg = data.get("config") +if not isinstance(cfg, dict): + fail("missing [config] section") +mounter = cfg.get("mounter") +flt = cfg.get("filter") +if not isinstance(mounter, dict) or "worker-num" not in mounter: + fail("missing mounter.worker-num") +if not isinstance(flt, dict) or "rules" not in flt: + fail("missing filter.rules") +# Minimal hand-serialization of the subset we set, in kebab-case TOML, so we +# do not depend on a TOML writer being installed in the test environment. +lines = [ + 'case-sensitive = %s' % ("true" if cfg.get("case-sensitive") else "false"), + '[mounter]', + ' worker-num = %d' % mounter["worker-num"], + '[filter]', + ' rules = [%s]' % ", ".join('"%s"' % r for r in flt["rules"]), +] +with open(sys.argv[2], "w") as f: + f.write("\n".join(lines) + "\n") +print("wrote reimport config") +PY + cdc_cli_changefeed create --sink-uri="blackhole://" -c "cf-reimport" \ + --config="$WORK_DIR/reimport.toml" + check_changefeed_state "http://${UP_PD_HOST_1}:${UP_PD_PORT_1}" "cf-reimport" "normal" "null" "" + echo "PASS: Test 10 - exported TOML config re-imports successfully" + + # Cleanup changefeeds + cdc_cli_changefeed --changefeed-id "cf-default" remove + cdc_cli_changefeed --changefeed-id "cf-overrides" remove + cdc_cli_changefeed --changefeed-id "cf-realistic" remove + cdc_cli_changefeed --changefeed-id "cf-reimport" remove + + cleanup_process $CDC_BINARY +} + +trap 'stop_test $WORK_DIR' EXIT +run "$@" +check_logs $WORK_DIR +echo "[$(date)] <<<<<< run test case $TEST_NAME success! >>>>>>" diff --git a/tests/integration_tests/run_light_it_in_ci.sh b/tests/integration_tests/run_light_it_in_ci.sh index 1dd75a780f..c99032669c 100755 --- a/tests/integration_tests/run_light_it_in_ci.sh +++ b/tests/integration_tests/run_light_it_in_ci.sh @@ -59,7 +59,7 @@ mysql_groups=( # ds_memory_control 'row_format tiflash multi_rocks fail_over_ddl_M correctness_for_shared_column_schema partial_index' # G13 - 'cli_with_auth fail_over_ddl_N maintainer_failover_when_operator cli_missing_keyspace_error' + 'cli_with_auth fail_over_ddl_N maintainer_failover_when_operator cli_missing_keyspace_error changefeed_query_toml_api' # G14 'batch_add_table batch_update_to_no_batch fail_over_ddl_O update_changefeed_check_config pause_changefeed_with_long_time_ddl' # G15