Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 53 additions & 4 deletions drivers/mongodb/internal/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func (m *Mongo) changeStreamSync(stream protocol.Stream, pool *protocol.WriterPo
prevResumeToken := m.State.GetCursor(stream.Self(), cdcCursorField)
chunks := m.State.GetChunks(stream.Self())

if prevResumeToken == nil || chunks == nil || chunks.Len() != 0 {
if (prevResumeToken == nil || chunks == nil || chunks.Len() != 0) && (m.config.StategenTimestamp == "") {
// get current resume token and do full load for stream
resumeToken, err := m.getCurrentResumeToken(cdcCtx, collection, pipeline)
if err != nil {
Expand All @@ -75,9 +75,58 @@ func (m *Mongo) changeStreamSync(stream protocol.Stream, pool *protocol.WriterPo
logger.Infof("backfill done for stream[%s]", stream.ID())
}

changeStreamOpts = changeStreamOpts.SetResumeAfter(map[string]any{cdcCursorField: prevResumeToken})
// resume cdc sync from prev resume token
logger.Infof("Starting CDC sync for stream[%s] with resume token[%s]", stream.ID(), prevResumeToken)
if m.config.StategenTimestamp != "" {
// Use timestamp-based starting point
t, err := time.Parse(time.RFC3339, m.config.StategenTimestamp)
if err != nil {
return fmt.Errorf("failed to parse StategenTimestamp (expected RFC3339 format like '2023-12-19T10:30:00Z'): %s", err)
}

ts := primitive.Timestamp{
T: uint32(t.Unix()),
I: 0,
}

initialCursor, err := collection.Watch(cdcCtx, pipeline, options.ChangeStream().SetStartAtOperationTime(&ts))
if err != nil {
return fmt.Errorf("failed to get initial cursor: %s", err)
}
defer initialCursor.Close(cdcCtx)

// Initialize empty chunks to skip backfill
m.State.SetChunks(stream.Self(), types.NewSet[types.Chunk]())

// if no documents found at timestamp, get latest operation token
if !initialCursor.TryNext(cdcCtx) {
logger.Infof("No documents found at timestamp %v for stream [%s], getting latest operation token",
ts, stream.ID())

// Get a cursor to iterate through all operations
lastToken, err := m.getCurrentResumeToken(cdcCtx, collection, pipeline)
if err != nil {
return fmt.Errorf("failed to get latest operation cursor: %s", err)
}

lastTokenparsed := ""
if lastToken != nil {
lastTokenparsed = (*lastToken).Lookup(cdcCursorField).StringValue()
} else {
logger.Warnf("No last token found for stream [%s]", stream.ID())
}

m.State.SetCursor(stream.Self(), cdcCursorField, lastTokenparsed)
return nil
}

changeStreamOpts = changeStreamOpts.SetStartAtOperationTime(&ts)
logger.Infof("Starting CDC for stream [%s] from timestamp %s",
stream.ID(), m.config.StategenTimestamp)
} else {
// Use resume token-based starting point
changeStreamOpts = changeStreamOpts.SetResumeAfter(map[string]any{cdcCursorField: prevResumeToken})
logger.Infof("Starting CDC for stream [%s] with resume token [%s]",
stream.ID(), prevResumeToken)
}

cursor, err := collection.Watch(cdcCtx, pipeline, changeStreamOpts)
if err != nil {
Expand Down
1 change: 1 addition & 0 deletions drivers/mongodb/internal/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ type Config struct {
DefaultMode types.SyncMode `json:"default_mode"`
RetryCount int `json:"backoff_retry_count"`
PartitionStrategy string `json:"partition_strategy"`
StategenTimestamp string `json:"stategen_timestamp"`
}

func (c *Config) URI() string {
Expand Down
Loading