Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 53 additions & 0 deletions pkg/backup/download.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,11 @@ func (b *Backuper) Download(backupName string, tablePattern string, partitions [
}
}()

// Prefetch metadata for incremental backup chain to populate cache
if err := b.prefetchBackupMetadataChain(ctx, backupName); err != nil {
log.Warn().Err(err).Msg("prefetchBackupMetadataChain failed, continuing with on-demand fetching")
}

remoteBackups, err := b.dst.BackupList(ctx, true, backupName)
if err != nil {
return errors.WithMessage(err, "BackupList")
Expand Down Expand Up @@ -320,6 +325,10 @@ func (b *Backuper) Download(backupName string, tablePattern string, partitions [
"object_disk_size": utils.FormatBytes(backupMetadata.ObjectDiskSize),
"version": backupVersion,
}).Msg("done")

// Clear backup list cache after download completes
storage.ClearBackupListCache()

return nil
}

Expand Down Expand Up @@ -1462,3 +1471,47 @@ func (b *Backuper) getDownloadDiskForNonExistsDisk(notExistsDiskType string, fil
}
return false, filteredDisks[leastUsedIdx].Name, filteredDisks[leastUsedIdx].FreeSpace - partSize, nil
}

// prefetchBackupMetadataChain - prefetch metadata for the entire incremental backup chain
// to populate the in-memory cache and avoid repeated S3 API calls
func (b *Backuper) prefetchBackupMetadataChain(ctx context.Context, backupName string) error {
start := time.Now()
visited := make(map[string]bool)
backupChain := []string{}

// Discover the backup chain by walking RequiredBackup links
currentBackup := backupName
for currentBackup != "" && !visited[currentBackup] {
backupChain = append(backupChain, currentBackup)
visited[currentBackup] = true

// Get metadata to find RequiredBackup
backupList, err := b.dst.BackupList(ctx, true, currentBackup)
if err != nil {
return errors.Wrapf(err, "BackupList for %s", currentBackup)
}

var found bool
for _, backup := range backupList {
if backup.BackupName == currentBackup {
currentBackup = backup.RequiredBackup
found = true
break
}
}

if !found {
break
}
}

if len(backupChain) > 1 {
log.Info().Msgf("prefetchBackupMetadataChain: discovered chain of %d backups: %v (took %s)",
len(backupChain), backupChain, utils.HumanizeDuration(time.Since(start)))
} else {
log.Debug().Msgf("prefetchBackupMetadataChain: single backup, no chain (took %s)",
utils.HumanizeDuration(time.Since(start)))
}

return nil
}
31 changes: 31 additions & 0 deletions pkg/storage/general.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ type BackupDestination struct {
}

var metadataCacheLock sync.RWMutex
var backupListCache = make(map[string][]Backup)
var backupListCacheLock sync.RWMutex

func (bd *BackupDestination) RemoveBackupRemote(ctx context.Context, backup Backup, cfg *config.Config, retrierClassifier retrier.Classifier) error {
retry := retrier.New(retrier.ExponentialBackoff(cfg.General.RetriesOnFailure, common.AddRandomJitter(cfg.General.RetriesDuration, cfg.General.RetriesJitter)), retrierClassifier)
Expand Down Expand Up @@ -218,6 +220,19 @@ func (bd *BackupDestination) saveMetadataCache(ctx context.Context, listCache ma

func (bd *BackupDestination) BackupList(ctx context.Context, parseMetadata bool, parseMetadataOnly string) ([]Backup, error) {
backupListStart := time.Now()

// Check in-memory cache first for single backup requests
if parseMetadataOnly != "" {
backupListCacheLock.RLock()
cacheKey := bd.Kind() + ":" + parseMetadataOnly
if cachedList, ok := backupListCache[cacheKey]; ok {
backupListCacheLock.RUnlock()
log.Debug().Str("backup", parseMetadataOnly).Msg("BackupList: using in-memory cache")
return cachedList, nil
}
backupListCacheLock.RUnlock()
}

defer func() {
log.Info().Dur("list_duration", time.Since(backupListStart)).Send()
}()
Expand Down Expand Up @@ -322,9 +337,25 @@ func (bd *BackupDestination) BackupList(ctx context.Context, parseMetadata bool,
return nil, errors.Wrap(err, "bd.saveMetadataCache return error")
}
}

// Save to in-memory cache for single backup requests
if parseMetadataOnly != "" && len(result) > 0 {
backupListCacheLock.Lock()
cacheKey := bd.Kind() + ":" + parseMetadataOnly
backupListCache[cacheKey] = result
backupListCacheLock.Unlock()
}

return result, nil
}

// ClearBackupListCache clears the in-memory cache of backup lists
func ClearBackupListCache() {
backupListCacheLock.Lock()
defer backupListCacheLock.Unlock()
backupListCache = make(map[string][]Backup)
}

func (bd *BackupDestination) DownloadCompressedStream(ctx context.Context, remotePath string, localPath string, maxSpeed uint64) (int64, error) {
if err := os.MkdirAll(localPath, 0750); err != nil {
return 0, errors.WithMessage(err, "DownloadCompressedStream MkdirAll")
Expand Down
Loading