From 9721e47232ace4a2dc349da14e0c54db46a44ea6 Mon Sep 17 00:00:00 2001
From: sfaynet
Date: Fri, 17 Apr 2026 17:30:46 +0300
Subject: [PATCH] fix_list_duration_s3_api_walk

---
 pkg/backup/download.go | 54 ++++++++++++++++++++++++++++++++++++++++++
 pkg/storage/general.go | 33 +++++++++++++++++++++++++
 2 files changed, 87 insertions(+)

diff --git a/pkg/backup/download.go b/pkg/backup/download.go
index e481ad01..f69c8da3 100644
--- a/pkg/backup/download.go
+++ b/pkg/backup/download.go
@@ -110,6 +110,16 @@ func (b *Backuper) Download(backupName string, tablePattern string, partitions [
 		}
 	}()
 
+	// Drop the in-memory backup list cache when Download returns — on success
+	// and on every error path alike — so later commands never observe stale
+	// remote state. A success-only clear would leak entries on early returns.
+	defer storage.ClearBackupListCache()
+
+	// Prefetch metadata for incremental backup chain to populate cache
+	if err := b.prefetchBackupMetadataChain(ctx, backupName); err != nil {
+		log.Warn().Err(err).Msg("prefetchBackupMetadataChain failed, continuing with on-demand fetching")
+	}
+
 	remoteBackups, err := b.dst.BackupList(ctx, true, backupName)
 	if err != nil {
 		return errors.WithMessage(err, "BackupList")
@@ -1462,3 +1472,47 @@ func (b *Backuper) getDownloadDiskForNonExistsDisk(notExistsDiskType string, fil
 	}
 	return false, filteredDisks[leastUsedIdx].Name, filteredDisks[leastUsedIdx].FreeSpace - partSize, nil
 }
+
+// prefetchBackupMetadataChain - prefetch metadata for the entire incremental backup chain
+// to populate the in-memory cache and avoid repeated S3 API calls
+func (b *Backuper) prefetchBackupMetadataChain(ctx context.Context, backupName string) error {
+	start := time.Now()
+	visited := make(map[string]bool)
+	backupChain := []string{}
+
+	// Discover the backup chain by walking RequiredBackup links
+	currentBackup := backupName
+	for currentBackup != "" && !visited[currentBackup] {
+		backupChain = append(backupChain, currentBackup)
+		visited[currentBackup] = true
+
+		// Get metadata to find RequiredBackup
+		backupList, err := b.dst.BackupList(ctx, true, currentBackup)
+		if err != nil {
+			return errors.Wrapf(err, "BackupList for %s", currentBackup)
+		}
+
+		var found bool
+		for _, backup := range backupList {
+			if backup.BackupName == currentBackup {
+				currentBackup = backup.RequiredBackup
+				found = true
+				break
+			}
+		}
+
+		if !found {
+			break
+		}
+	}
+
+	if len(backupChain) > 1 {
+		log.Info().Msgf("prefetchBackupMetadataChain: discovered chain of %d backups: %v (took %s)",
+			len(backupChain), backupChain, utils.HumanizeDuration(time.Since(start)))
+	} else {
+		log.Debug().Msgf("prefetchBackupMetadataChain: single backup, no chain (took %s)",
+			utils.HumanizeDuration(time.Since(start)))
+	}
+
+	return nil
+}
diff --git a/pkg/storage/general.go b/pkg/storage/general.go
index 5ee16dc1..8e35a50b 100644
--- a/pkg/storage/general.go
+++ b/pkg/storage/general.go
@@ -53,6 +53,8 @@ type BackupDestination struct {
 }
 
 var metadataCacheLock sync.RWMutex
+var backupListCache = make(map[string][]Backup)
+var backupListCacheLock sync.RWMutex
 
 func (bd *BackupDestination) RemoveBackupRemote(ctx context.Context, backup Backup, cfg *config.Config, retrierClassifier retrier.Classifier) error {
 	retry := retrier.New(retrier.ExponentialBackoff(cfg.General.RetriesOnFailure, common.AddRandomJitter(cfg.General.RetriesDuration, cfg.General.RetriesJitter)), retrierClassifier)
@@ -218,6 +220,20 @@ func (bd *BackupDestination) saveMetadataCache(ctx context.Context, listCache ma
 func (bd *BackupDestination) BackupList(ctx context.Context, parseMetadata bool, parseMetadataOnly string) ([]Backup, error) {
 	backupListStart := time.Now()
+
+	// Serve repeated single-backup lookups from the in-memory cache.
+	// Only parseMetadata==true results are cached, so unparsed callers bypass it.
+	if parseMetadata && parseMetadataOnly != "" {
+		backupListCacheLock.RLock()
+		cacheKey := bd.Kind() + ":" + parseMetadataOnly
+		if cachedList, ok := backupListCache[cacheKey]; ok {
+			backupListCacheLock.RUnlock()
+			log.Debug().Str("backup", parseMetadataOnly).Msg("BackupList: using in-memory cache")
+			return cachedList, nil
+		}
+		backupListCacheLock.RUnlock()
+	}
+
 	defer func() {
 		log.Info().Dur("list_duration", time.Since(backupListStart)).Send()
 	}()
@@ -322,9 +338,26 @@ func (bd *BackupDestination) BackupList(ctx context.Context, parseMetadata bool,
 			return nil, errors.Wrap(err, "bd.saveMetadataCache return error")
 		}
 	}
+
+	// Cache fully-parsed single-backup results; never cache unparsed ones,
+	// otherwise a parseMetadata==false call would poison later parsed reads.
+	if parseMetadata && parseMetadataOnly != "" && len(result) > 0 {
+		backupListCacheLock.Lock()
+		cacheKey := bd.Kind() + ":" + parseMetadataOnly
+		backupListCache[cacheKey] = result
+		backupListCacheLock.Unlock()
+	}
+
 	return result, nil
 }
 
+// ClearBackupListCache clears the in-memory cache of backup lists.
+func ClearBackupListCache() {
+	backupListCacheLock.Lock()
+	defer backupListCacheLock.Unlock()
+	backupListCache = make(map[string][]Backup)
+}
+
 func (bd *BackupDestination) DownloadCompressedStream(ctx context.Context, remotePath string, localPath string, maxSpeed uint64) (int64, error) {
 	if err := os.MkdirAll(localPath, 0750); err != nil {
 		return 0, errors.WithMessage(err, "DownloadCompressedStream MkdirAll")