39 changes: 28 additions & 11 deletions downstreamadapter/sink/cloudstorage/defragmenter.go
@@ -16,12 +16,15 @@ package cloudstorage
import (
"context"

"github.com/pingcap/log"
commonType "github.com/pingcap/ticdc/pkg/common"
commonEvent "github.com/pingcap/ticdc/pkg/common/event"
"github.com/pingcap/ticdc/pkg/errors"
"github.com/pingcap/ticdc/pkg/hash"
"github.com/pingcap/ticdc/pkg/sink/cloudstorage"
"github.com/pingcap/ticdc/pkg/sink/codec/common"
"github.com/pingcap/ticdc/utils/chann"
"go.uber.org/zap"
)

// eventFragment is used to attach a sequence number to TxnCallbackableEvent.
@@ -39,7 +42,17 @@ type eventFragment struct {
encodedMsgs []*common.Message
}

func newEventFragment(seq uint64, version cloudstorage.VersionedTableName, event *commonEvent.DMLEvent) eventFragment {
func newEventFragment(seq uint64, event *commonEvent.DMLEvent) eventFragment {
version := cloudstorage.VersionedTableName{
TableNameWithPhysicTableID: commonType.TableName{
Schema: event.TableInfo.GetSchemaName(),
Table: event.TableInfo.GetTableName(),
TableID: event.PhysicalTableID,
IsPartition: event.TableInfo.IsPartitionTable(),
},
TableInfoVersion: event.TableInfoVersion,
DispatcherID: event.GetDispatcherID(),
}
return eventFragment{
seqNumber: seq,
versionedTable: version,
@@ -51,10 +64,12 @@ func newEventFragment(seq uint64, version cloudstorage.VersionedTableName, event
// out of order.
type defragmenter struct {
lastDispatchedSeq uint64
future map[uint64]eventFragment
inputCh <-chan eventFragment
outputChs []*chann.DrainableChann[eventFragment]
hasher *hash.PositionInertia

// future records out-of-order events until they can be dispatched in sequence
future map[uint64]eventFragment
inputCh <-chan eventFragment
outputChs []*chann.DrainableChann[eventFragment]
hasher *hash.PositionInertia
}

func newDefragmenter(
@@ -75,7 +90,7 @@ func (d *defragmenter) Run(ctx context.Context) error {
select {
case <-ctx.Done():
d.future = nil
return errors.Trace(ctx.Err())
return context.Cause(ctx)
case frag, ok := <-d.inputCh:
if !ok {
return nil
Expand All @@ -87,7 +102,9 @@ func (d *defragmenter) Run(ctx context.Context) error {
} else if frag.seqNumber > next {
d.future[frag.seqNumber] = frag
} else {
return nil
log.Error("fragment out of order, this should not happen.",
zap.Uint64("expected", next), zap.Uint64("seq", frag.seqNumber))
return errors.ErrStorageSinkSendFailed.FastGen("out of order fragment, expect %d, but got %d", next, frag.seqNumber)
}
}
}
@@ -107,12 +124,12 @@ func (d *defragmenter) writeMsgsConsecutive(
default:
}
next := d.lastDispatchedSeq + 1
if frag, ok := d.future[next]; ok {
delete(d.future, next)
d.dispatchFragToDMLWorker(frag)
} else {
frag, ok := d.future[next]
if !ok {
return
}
delete(d.future, next)
d.dispatchFragToDMLWorker(frag)
}
}

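Note on the hunks above: newEventFragment now derives the VersionedTableName from the DMLEvent itself, and Run reports a stale sequence number as ErrStorageSinkSendFailed instead of silently returning nil. The reordering invariant the defragmenter maintains can be shown standalone; the sketch below is a minimal illustration, not the production type — the reorder helper and plain uint64 payloads are assumptions for brevity.

package main

import "fmt"

// reorder releases sequence numbers strictly in order (1, 2, 3, ...),
// buffering early arrivals in future the same way the defragmenter does.
func reorder(in <-chan uint64, out chan<- uint64) error {
	var lastDispatched uint64
	future := make(map[uint64]uint64)
	for seq := range in {
		next := lastDispatched + 1
		switch {
		case seq == next:
			out <- seq
			lastDispatched = seq
			// Drain buffered arrivals that have become consecutive.
			for {
				next = lastDispatched + 1
				if _, ok := future[next]; !ok {
					break
				}
				delete(future, next)
				out <- next
				lastDispatched = next
			}
		case seq > next:
			future[seq] = seq // early arrival: hold until its turn comes
		default:
			// seq <= lastDispatchedSeq: the case the PR now surfaces as an error.
			return fmt.Errorf("out of order fragment, expect %d, but got %d", next, seq)
		}
	}
	return nil
}

func main() {
	in := make(chan uint64, 8)
	out := make(chan uint64, 8)
	for _, s := range []uint64{2, 1, 4, 3} {
		in <- s
	}
	close(in)
	if err := reorder(in, out); err != nil {
		fmt.Println(err)
	}
	close(out)
	for s := range out {
		fmt.Print(s, " ") // 1 2 3 4
	}
}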
24 changes: 4 additions & 20 deletions downstreamadapter/sink/cloudstorage/dml_writers.go
@@ -90,34 +90,18 @@ func (d *dmlWriters) Run(ctx context.Context) error {
return d.defragmenter.Run(ctx)
})

for i := 0; i < len(d.writers); i++ {
for _, w := range d.writers {
eg.Go(func() error {
// UnlimitedChannel blocks when there is no event, so it cannot directly observe ctx.Done()
// Thus, we need to close the channel when the context is done
defer d.encodeGroup.inputCh.Close()
return d.writers[i].Run(ctx)
return w.Run(ctx)
})
}
return eg.Wait()
}

func (d *dmlWriters) AddDMLEvent(event *commonEvent.DMLEvent) {
tbl := cloudstorage.VersionedTableName{
TableNameWithPhysicTableID: commonType.TableName{
Schema: event.TableInfo.GetSchemaName(),
Table: event.TableInfo.GetTableName(),
TableID: event.PhysicalTableID,
IsPartition: event.TableInfo.IsPartitionTable(),
},
TableInfoVersion: event.TableInfoVersion,
DispatcherID: event.GetDispatcherID(),
}
seq := atomic.AddUint64(&d.lastSeqNum, 1)
_ = d.statistics.RecordBatchExecution(func() (int, int64, error) {
// emit a TxnCallbackableEvent coupled with a sequence number starting from one.
d.msgCh.Push(newEventFragment(seq, tbl, event))
return int(event.Len()), event.GetSize(), nil
})
// emit a TxnCallbackableEvent coupled with a sequence number starting from one.
d.msgCh.Push(newEventFragment(seq, event))
}

func (d *dmlWriters) close() {
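Two refactors above are worth a note: AddDMLEvent no longer assembles the VersionedTableName (newEventFragment does), and the writer loop ranges over the slice instead of indexing it. With Go 1.22+ per-iteration loop variables, each eg.Go closure captures its own w, so the d.writers[i] indexing is no longer needed to dodge the old loop-capture pitfall. A minimal sketch of the pattern, assuming Go 1.22+ and golang.org/x/sync/errgroup; the worker type is a stand-in:

package main

import (
	"context"
	"fmt"

	"golang.org/x/sync/errgroup"
)

// worker stands in for the sink's dml writer type.
type worker struct{ id int }

func (w *worker) Run(ctx context.Context) error {
	fmt.Println("worker", w.id, "running")
	return nil
}

func main() {
	writers := []*worker{{id: 1}, {id: 2}, {id: 3}}
	eg, ctx := errgroup.WithContext(context.Background())
	for _, w := range writers {
		// w is a fresh variable per iteration (Go 1.22+), so each
		// goroutine sees its own writer, not the last element.
		eg.Go(func() error {
			return w.Run(ctx)
		})
	}
	if err := eg.Wait(); err != nil {
		fmt.Println("error:", err)
	}
}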
5 changes: 2 additions & 3 deletions downstreamadapter/sink/cloudstorage/encoding_group.go
@@ -17,7 +17,6 @@ import (
"context"

commonType "github.com/pingcap/ticdc/pkg/common"
"github.com/pingcap/ticdc/pkg/errors"
"github.com/pingcap/ticdc/pkg/sink/codec"
"github.com/pingcap/ticdc/pkg/sink/codec/common"
"github.com/pingcap/ticdc/utils/chann"
@@ -79,7 +78,7 @@ func (eg *encodingGroup) runEncoder(ctx context.Context) error {
for {
select {
case <-ctx.Done():
return errors.Trace(ctx.Err())
return context.Cause(ctx)
default:
frag, ok := eg.inputCh.Get()
if !ok || eg.closed.Load() {
@@ -93,7 +92,7 @@

select {
case <-ctx.Done():
return errors.Trace(ctx.Err())
return context.Cause(ctx)
case eg.outputCh <- frag:
}
}
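The errors.Trace(ctx.Err()) → context.Cause(ctx) swap (also applied in defragmenter.go and sink.go) returns the concrete error recorded by whoever cancelled the context, falling back to ctx.Err() when no cause was set. A minimal sketch of the difference, assuming the caller cancels via context.WithCancelCause; the error text is illustrative only:

package main

import (
	"context"
	"errors"
	"fmt"
)

func main() {
	cause := errors.New("encoder stopped: upstream failure")
	ctx, cancel := context.WithCancelCause(context.Background())
	cancel(cause) // record the root cause at cancellation time

	fmt.Println(ctx.Err())          // context canceled
	fmt.Println(context.Cause(ctx)) // encoder stopped: upstream failure
}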
75 changes: 37 additions & 38 deletions downstreamadapter/sink/cloudstorage/sink.go
@@ -53,9 +53,8 @@ type sink struct {

dmlWriters *dmlWriters

checkpointChan chan uint64
lastCheckpointTs atomic.Uint64
lastSendCheckpointTsTime time.Time
checkpointChan chan uint64
lastCheckpointTs atomic.Uint64

tableSchemaStore *commonEvent.TableSchemaStore
cron *cron.Cron
@@ -105,33 +104,32 @@ func New(
putil.GetOrZero(sinkConfig.Protocol),
)
if err != nil {
return nil, errors.Trace(err)
return nil, err
}
// get cloud storage file extension according to the specific protocol.
ext := helper.GetFileExtension(protocol)
// the last param maxMsgBytes is mainly to limit the size of a single message for
// batch protocols in mq scenario. In cloud storage sink, we just set it to max int.
encoderConfig, err := helper.GetEncoderConfig(changefeedID, sinkURI, protocol, sinkConfig, math.MaxInt)
if err != nil {
return nil, errors.Trace(err)
return nil, err
}
storage, err := putil.GetExternalStorageWithDefaultTimeout(ctx, sinkURI.String())
if err != nil {
return nil, err
return nil, errors.WrapError(errors.ErrFailToCreateExternalStorage, err)
}
statistics := metrics.NewStatistics(changefeedID, "cloudstorage")
return &sink{
changefeedID: changefeedID,
sinkURI: sinkURI,
cfg: cfg,
cleanupJobs: cleanupJobs,
storage: storage,
dmlWriters: newDMLWriters(changefeedID, storage, cfg, encoderConfig, ext, statistics),
checkpointChan: make(chan uint64, 16),
lastSendCheckpointTsTime: time.Now(),
outputRawChangeEvent: sinkConfig.CloudStorageConfig.GetOutputRawChangeEvent(),
statistics: statistics,
isNormal: atomic.NewBool(true),
changefeedID: changefeedID,
sinkURI: sinkURI,
cfg: cfg,
cleanupJobs: cleanupJobs,
storage: storage,
dmlWriters: newDMLWriters(changefeedID, storage, cfg, encoderConfig, ext, statistics),
checkpointChan: make(chan uint64, 16),
outputRawChangeEvent: sinkConfig.CloudStorageConfig.GetOutputRawChangeEvent(),
statistics: statistics,
isNormal: atomic.NewBool(true),

ctx: ctx,
}, nil
@@ -171,20 +169,20 @@ func (s *sink) AddDMLEvent(event *commonEvent.DMLEvent) {
}

func (s *sink) WriteBlockEvent(event commonEvent.BlockEvent) error {
var err error
switch e := event.(type) {
case *commonEvent.DDLEvent:
err = s.writeDDLEvent(e)
err := s.writeDDLEvent(e)
if err != nil {
s.isNormal.Store(false)
return err
}
default:
log.Error("cloudstorage sink doesn't support this type of block event",
// The cloud storage sink only handles DDL events; if another kind of block event
// is sent here by mistake, it can be skipped silently without affecting correctness.
log.Error("cloudstorage sink doesn't support this type of block event, skip it",
zap.String("namespace", s.changefeedID.Keyspace()),
zap.String("changefeed", s.changefeedID.Name()),
zap.String("eventType", commonEvent.TypeToString(event.GetType())))
return errors.ErrInvalidEventType.GenWithStackByArgs(commonEvent.TypeToString(event.GetType()))
}
if err != nil {
s.isNormal.Store(false)
return err
}
event.PostFlush()
return nil
@@ -198,35 +196,35 @@ func (s *sink) writeDDLEvent(event *commonEvent.DDLEvent) error {
def.FromTableInfo(event.ExtraSchemaName, event.ExtraTableName, event.TableInfo, event.FinishedTs, s.cfg.OutputColumnID)
def.Query = event.Query
def.Type = event.Type
if err := s.writeFile(event, def); err != nil {
if err := s.writeDDLEvent2File(event, def); err != nil {
return err
}
var sourceTableDef cloudstorage.TableDefinition
sourceTableDef.FromTableInfo(event.SchemaName, event.TableName, event.MultipleTableInfos[1], event.FinishedTs, s.cfg.OutputColumnID)
if err := s.writeFile(event, sourceTableDef); err != nil {
if err := s.writeDDLEvent2File(event, sourceTableDef); err != nil {
return err
}
} else {
for _, e := range event.GetEvents() {
var def cloudstorage.TableDefinition
def.FromDDLEvent(e, s.cfg.OutputColumnID)
if err := s.writeFile(e, def); err != nil {
if err := s.writeDDLEvent2File(e, def); err != nil {
return err
}
}
}
return nil
}

func (s *sink) writeFile(v *commonEvent.DDLEvent, def cloudstorage.TableDefinition) error {
func (s *sink) writeDDLEvent2File(v *commonEvent.DDLEvent, def cloudstorage.TableDefinition) error {
encodedDef, err := def.MarshalWithQuery()
if err != nil {
return errors.Trace(err)
return err
}

path, err := def.GenerateSchemaFilePath()
if err != nil {
return errors.Trace(err)
return err
}
log.Debug("write ddl event to external storage",
zap.String("path", path), zap.Any("ddl", v))
@@ -244,7 +242,7 @@ func (s *sink) AddCheckpointTs(ts uint64) {
case s.checkpointChan <- ts:
case <-s.ctx.Done():
return
// We can just drop the checkpoint ts if the channel is full to avoid blocking since the checkpointTs will come indefinitely
// We can just drop the checkpoint ts if the channel is full to avoid blocking since the checkpointTs will come indefinitely
default:
}
}
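The select above is the producer half of the checkpoint path: the send never blocks, and a full 16-slot buffer simply drops the ts, which is safe because checkpoint timestamps keep arriving and only the newest matters. A standalone sketch of the pattern (the tiny buffer is deliberate, to force drops in the demo):

package main

import "fmt"

// push mirrors AddCheckpointTs: enqueue if there is room, drop otherwise.
func push(ch chan uint64, ts uint64) bool {
	select {
	case ch <- ts:
		return true
	default:
		return false // buffer full: drop; a newer ts follows shortly
	}
}

func main() {
	ch := make(chan uint64, 2)
	for ts := uint64(1); ts <= 4; ts++ {
		fmt.Println(ts, "queued:", push(ch, ts))
	}
}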
@@ -257,14 +255,15 @@ func (s *sink) sendCheckpointTs(ctx context.Context) error {
metrics.CheckpointTsMessageCount.DeleteLabelValues(s.changefeedID.Keyspace(), s.changefeedID.Name())
}()

lastSendCheckpointTsTime := time.Now()
var (
checkpoint uint64
ok bool
)
for {
select {
case <-ctx.Done():
return errors.Trace(ctx.Err())
return context.Cause(ctx)
case checkpoint, ok = <-s.checkpointChan:
if !ok {
log.Warn("cloud storage sink checkpoint channel closed",
@@ -274,8 +273,8 @@
}
}

if time.Since(s.lastSendCheckpointTsTime) < 2*time.Second {
log.Warn("skip write checkpoint ts to external storage",
if time.Since(lastSendCheckpointTsTime) < 2*time.Second {
log.Debug("skip write checkpoint ts to external storage",
zap.Any("changefeedID", s.changefeedID),
zap.Uint64("checkpoint", checkpoint))
continue
@@ -293,14 +292,14 @@
}
err = s.storage.WriteFile(ctx, "metadata", message)
if err != nil {
log.Error("CloudStorageSink storage write file failed",
log.Error("CloudStorageSink storage update checkpoint to metadata file failed",
zap.String("keyspace", s.changefeedID.Keyspace()),
zap.String("changefeed", s.changefeedID.Name()),
zap.Duration("duration", time.Since(start)),
zap.Error(err))
return errors.Trace(err)
return errors.WrapError(errors.ErrStorageSinkSendFailed, err)
}
s.lastSendCheckpointTsTime = time.Now()
lastSendCheckpointTsTime = time.Now()
s.lastCheckpointTs.Store(checkpoint)

checkpointTsMessageCount.Inc()
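On the consumer side, lastSendCheckpointTsTime becomes a local of sendCheckpointTs, since only that goroutine reads or writes it, and the skip log drops from Warn to Debug because skipping is the expected steady state. A sketch of the 2-second throttle; the zero-valued lastSend is a demo tweak so the first ts flushes immediately, whereas the real code starts from time.Now():

package main

import (
	"fmt"
	"time"
)

func main() {
	checkpointCh := make(chan uint64, 16)
	for ts := uint64(1); ts <= 5; ts++ {
		checkpointCh <- ts
	}
	close(checkpointCh)

	// A local suffices: no other goroutine touches lastSend, so the
	// struct field (and any synchronization around it) can go away.
	lastSend := time.Time{}
	for ts := range checkpointCh {
		if time.Since(lastSend) < 2*time.Second {
			continue // throttled: a later checkpoint will be written instead
		}
		fmt.Println("flush checkpoint", ts) // stands in for the metadata write
		lastSend = time.Now()
	}
}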