Skip to content

Commit 00ec00d

Browse files
committed
feat: streaming validation for MySQL and MariaDB backups
- Remove downloadBackupToTempFile function from MySQL/MariaDB validation - Use direct streaming from storage instead of downloading to temp file - Reduces disk usage from full backup size to ~0 MB during validation - Reduces network traffic from full backup size to ~64 KB - Validation now completes in seconds instead of minutes/hours for large backups
1 parent 2fc7bd9 commit 00ec00d

9 files changed

Lines changed: 55 additions & 196 deletions

File tree

backend/internal/features/backups/backups/usecases/mariadb/validate_backup_uc.go

Lines changed: 24 additions & 98 deletions
Original file line numberDiff line numberDiff line change
@@ -6,24 +6,18 @@ import (
66
"fmt"
77
"io"
88
"log/slog"
9-
"os"
10-
"path/filepath"
119
"strings"
1210
"time"
1311

1412
"github.com/klauspost/compress/zstd"
1513

16-
"postgresus-backend/internal/config"
1714
backup_encryption "postgresus-backend/internal/features/backups/backups/encryption"
1815
backups_config "postgresus-backend/internal/features/backups/config"
1916
usecases_common "postgresus-backend/internal/features/backups/backups/usecases/common"
2017
"postgresus-backend/internal/features/databases"
2118
encryption_secrets "postgresus-backend/internal/features/encryption/secrets"
2219
"postgresus-backend/internal/features/storages"
2320
util_encryption "postgresus-backend/internal/util/encryption"
24-
files_utils "postgresus-backend/internal/util/files"
25-
26-
"github.com/google/uuid"
2721
)
2822

2923
type ValidateMariadbBackupUsecase struct {
@@ -46,7 +40,7 @@ func (uc *ValidateMariadbBackupUsecase) Execute(
4640
storage *storages.Storage,
4741
) (*ValidationResult, error) {
4842
uc.logger.Info(
49-
"Validating MariaDB backup integrity",
43+
"Validating MariaDB backup integrity (streaming)",
5044
"backupId", backup.ID,
5145
"databaseId", database.ID,
5246
)
@@ -59,27 +53,35 @@ func (uc *ValidateMariadbBackupUsecase) Execute(
5953
}, nil
6054
}
6155

62-
// Download backup to temporary file
63-
tempFile, cleanupFunc, err := uc.downloadBackupToTempFile(ctx, backup, storage)
56+
// Get backup data from storage (streaming, no temp file)
57+
rawReader, err := storage.GetFile(uc.fieldEncryptor, backup.ID)
6458
if err != nil {
6559
return &ValidationResult{
6660
IsValid: false,
67-
Error: stringPtr(fmt.Sprintf("failed to download backup: %v", err)),
61+
Error: stringPtr(fmt.Sprintf("failed to get backup file from storage: %v", err)),
6862
}, nil
6963
}
70-
defer cleanupFunc()
64+
defer func() {
65+
if err := rawReader.Close(); err != nil {
66+
uc.logger.Error("Failed to close backup reader", "error", err)
67+
}
68+
}()
7169

72-
// Open and decompress zstd file
73-
file, err := os.Open(tempFile)
74-
if err != nil {
75-
return &ValidationResult{
76-
IsValid: false,
77-
Error: stringPtr(fmt.Sprintf("failed to open backup file: %v", err)),
78-
}, nil
70+
// Setup decryption if needed
71+
var backupReader io.Reader = rawReader
72+
if backup.Encryption == backups_config.BackupEncryptionEncrypted {
73+
decryptReader, err := uc.setupDecryption(rawReader, backup)
74+
if err != nil {
75+
return &ValidationResult{
76+
IsValid: false,
77+
Error: stringPtr(fmt.Sprintf("failed to setup decryption: %v", err)),
78+
}, nil
79+
}
80+
backupReader = decryptReader
7981
}
80-
defer file.Close()
8182

82-
zstdReader, err := zstd.NewReader(file)
83+
// Decompress zstd stream
84+
zstdReader, err := zstd.NewReader(backupReader)
8385
if err != nil {
8486
return &ValidationResult{
8587
IsValid: false,
@@ -89,7 +91,7 @@ func (uc *ValidateMariadbBackupUsecase) Execute(
8991
}
9092
defer zstdReader.Close()
9193

92-
// Read first 64KB for syntax validation
94+
// Read first 64KB for syntax validation (streaming!)
9395
buffer := make([]byte, 64*1024)
9496
n, err := zstdReader.Read(buffer)
9597
if err != nil && err != io.EOF {
@@ -122,7 +124,7 @@ func (uc *ValidateMariadbBackupUsecase) Execute(
122124
}
123125

124126
details := fmt.Sprintf(
125-
"Backup file is valid. Contains CREATE TABLE: %v, INSERT: %v",
127+
"Backup file is valid (streaming validation). Contains CREATE TABLE: %v, INSERT: %v",
126128
hasCreateTable,
127129
hasInsert,
128130
)
@@ -134,81 +136,6 @@ func (uc *ValidateMariadbBackupUsecase) Execute(
134136
}, nil
135137
}
136138

137-
// downloadBackupToTempFile downloads backup data from storage to a temporary file
138-
func (uc *ValidateMariadbBackupUsecase) downloadBackupToTempFile(
139-
ctx context.Context,
140-
backup *usecases_common.BackupInfo,
141-
storage *storages.Storage,
142-
) (string, func(), error) {
143-
err := files_utils.EnsureDirectories([]string{
144-
config.GetEnv().TempFolder,
145-
})
146-
if err != nil {
147-
return "", nil, fmt.Errorf("failed to ensure directories: %w", err)
148-
}
149-
150-
tempDir, err := os.MkdirTemp(config.GetEnv().TempFolder, "validate_"+uuid.New().String())
151-
if err != nil {
152-
return "", nil, fmt.Errorf("failed to create temporary directory: %w", err)
153-
}
154-
155-
cleanupFunc := func() {
156-
_ = os.RemoveAll(tempDir)
157-
}
158-
159-
tempBackupFile := filepath.Join(tempDir, "backup.sql.zst")
160-
161-
uc.logger.Info(
162-
"Downloading backup file from storage to temporary file",
163-
"backupId", backup.ID,
164-
"tempFile", tempBackupFile,
165-
"encrypted", backup.Encryption == backups_config.BackupEncryptionEncrypted,
166-
)
167-
168-
fieldEncryptor := util_encryption.GetFieldEncryptor()
169-
rawReader, err := storage.GetFile(fieldEncryptor, backup.ID)
170-
if err != nil {
171-
cleanupFunc()
172-
return "", nil, fmt.Errorf("failed to get backup file from storage: %w", err)
173-
}
174-
defer func() {
175-
if err := rawReader.Close(); err != nil {
176-
uc.logger.Error("Failed to close backup reader", "error", err)
177-
}
178-
}()
179-
180-
// Create a reader that handles decryption if needed
181-
var backupReader io.Reader = rawReader
182-
if backup.Encryption == backups_config.BackupEncryptionEncrypted {
183-
decryptReader, err := uc.setupDecryption(rawReader, backup)
184-
if err != nil {
185-
cleanupFunc()
186-
return "", nil, fmt.Errorf("failed to setup decryption: %w", err)
187-
}
188-
backupReader = decryptReader
189-
}
190-
191-
tempFile, err := os.Create(tempBackupFile)
192-
if err != nil {
193-
cleanupFunc()
194-
return "", nil, fmt.Errorf("failed to create temporary backup file: %w", err)
195-
}
196-
defer func() {
197-
if err := tempFile.Close(); err != nil {
198-
uc.logger.Error("Failed to close temporary file", "error", err)
199-
}
200-
}()
201-
202-
_, err = io.Copy(tempFile, backupReader)
203-
if err != nil {
204-
cleanupFunc()
205-
return "", nil, fmt.Errorf("failed to write backup to temporary file: %w", err)
206-
}
207-
208-
uc.logger.Info("Backup file written to temporary location", "tempFile", tempBackupFile)
209-
return tempBackupFile, cleanupFunc, nil
210-
}
211-
212139
func (uc *ValidateMariadbBackupUsecase) setupDecryption(
213140
reader io.Reader,
214141
backup *usecases_common.BackupInfo,
@@ -250,4 +177,3 @@ func (uc *ValidateMariadbBackupUsecase) setupDecryption(
250177
func stringPtr(s string) *string {
251178
return &s
252179
}
253-

backend/internal/features/backups/backups/usecases/mysql/validate_backup_uc.go

Lines changed: 24 additions & 98 deletions
Original file line numberDiff line numberDiff line change
@@ -6,24 +6,18 @@ import (
66
"fmt"
77
"io"
88
"log/slog"
9-
"os"
10-
"path/filepath"
119
"strings"
1210
"time"
1311

1412
"github.com/klauspost/compress/zstd"
1513

16-
"postgresus-backend/internal/config"
1714
backup_encryption "postgresus-backend/internal/features/backups/backups/encryption"
1815
backups_config "postgresus-backend/internal/features/backups/config"
1916
usecases_common "postgresus-backend/internal/features/backups/backups/usecases/common"
2017
"postgresus-backend/internal/features/databases"
2118
encryption_secrets "postgresus-backend/internal/features/encryption/secrets"
2219
"postgresus-backend/internal/features/storages"
2320
util_encryption "postgresus-backend/internal/util/encryption"
24-
files_utils "postgresus-backend/internal/util/files"
25-
26-
"github.com/google/uuid"
2721
)
2822

2923
type ValidateMysqlBackupUsecase struct {
@@ -46,7 +40,7 @@ func (uc *ValidateMysqlBackupUsecase) Execute(
4640
storage *storages.Storage,
4741
) (*ValidationResult, error) {
4842
uc.logger.Info(
49-
"Validating MySQL backup integrity",
43+
"Validating MySQL backup integrity (streaming)",
5044
"backupId", backup.ID,
5145
"databaseId", database.ID,
5246
)
@@ -59,27 +53,35 @@ func (uc *ValidateMysqlBackupUsecase) Execute(
5953
}, nil
6054
}
6155

62-
// Download backup to temporary file
63-
tempFile, cleanupFunc, err := uc.downloadBackupToTempFile(ctx, backup, storage)
56+
// Get backup data from storage (streaming, no temp file)
57+
rawReader, err := storage.GetFile(uc.fieldEncryptor, backup.ID)
6458
if err != nil {
6559
return &ValidationResult{
6660
IsValid: false,
67-
Error: stringPtr(fmt.Sprintf("failed to download backup: %v", err)),
61+
Error: stringPtr(fmt.Sprintf("failed to get backup file from storage: %v", err)),
6862
}, nil
6963
}
70-
defer cleanupFunc()
64+
defer func() {
65+
if err := rawReader.Close(); err != nil {
66+
uc.logger.Error("Failed to close backup reader", "error", err)
67+
}
68+
}()
7169

72-
// Open and decompress zstd file
73-
file, err := os.Open(tempFile)
74-
if err != nil {
75-
return &ValidationResult{
76-
IsValid: false,
77-
Error: stringPtr(fmt.Sprintf("failed to open backup file: %v", err)),
78-
}, nil
70+
// Setup decryption if needed
71+
var backupReader io.Reader = rawReader
72+
if backup.Encryption == backups_config.BackupEncryptionEncrypted {
73+
decryptReader, err := uc.setupDecryption(rawReader, backup)
74+
if err != nil {
75+
return &ValidationResult{
76+
IsValid: false,
77+
Error: stringPtr(fmt.Sprintf("failed to setup decryption: %v", err)),
78+
}, nil
79+
}
80+
backupReader = decryptReader
7981
}
80-
defer file.Close()
8182

82-
zstdReader, err := zstd.NewReader(file)
83+
// Decompress zstd stream
84+
zstdReader, err := zstd.NewReader(backupReader)
8385
if err != nil {
8486
return &ValidationResult{
8587
IsValid: false,
@@ -89,7 +91,7 @@ func (uc *ValidateMysqlBackupUsecase) Execute(
8991
}
9092
defer zstdReader.Close()
9193

92-
// Read first 64KB for syntax validation
94+
// Read first 64KB for syntax validation (streaming!)
9395
buffer := make([]byte, 64*1024)
9496
n, err := zstdReader.Read(buffer)
9597
if err != nil && err != io.EOF {
@@ -122,7 +124,7 @@ func (uc *ValidateMysqlBackupUsecase) Execute(
122124
}
123125

124126
details := fmt.Sprintf(
125-
"Backup file is valid. Contains CREATE TABLE: %v, INSERT: %v",
127+
"Backup file is valid (streaming validation). Contains CREATE TABLE: %v, INSERT: %v",
126128
hasCreateTable,
127129
hasInsert,
128130
)
@@ -134,81 +136,6 @@ func (uc *ValidateMysqlBackupUsecase) Execute(
134136
}, nil
135137
}
136138

137-
// downloadBackupToTempFile downloads backup data from storage to a temporary file
138-
func (uc *ValidateMysqlBackupUsecase) downloadBackupToTempFile(
139-
ctx context.Context,
140-
backup *usecases_common.BackupInfo,
141-
storage *storages.Storage,
142-
) (string, func(), error) {
143-
err := files_utils.EnsureDirectories([]string{
144-
config.GetEnv().TempFolder,
145-
})
146-
if err != nil {
147-
return "", nil, fmt.Errorf("failed to ensure directories: %w", err)
148-
}
149-
150-
tempDir, err := os.MkdirTemp(config.GetEnv().TempFolder, "validate_"+uuid.New().String())
151-
if err != nil {
152-
return "", nil, fmt.Errorf("failed to create temporary directory: %w", err)
153-
}
154-
155-
cleanupFunc := func() {
156-
_ = os.RemoveAll(tempDir)
157-
}
158-
159-
tempBackupFile := filepath.Join(tempDir, "backup.sql.zst")
160-
161-
uc.logger.Info(
162-
"Downloading backup file from storage to temporary file",
163-
"backupId", backup.ID,
164-
"tempFile", tempBackupFile,
165-
"encrypted", backup.Encryption == backups_config.BackupEncryptionEncrypted,
166-
)
167-
168-
fieldEncryptor := util_encryption.GetFieldEncryptor()
169-
rawReader, err := storage.GetFile(fieldEncryptor, backup.ID)
170-
if err != nil {
171-
cleanupFunc()
172-
return "", nil, fmt.Errorf("failed to get backup file from storage: %w", err)
173-
}
174-
defer func() {
175-
if err := rawReader.Close(); err != nil {
176-
uc.logger.Error("Failed to close backup reader", "error", err)
177-
}
178-
}()
179-
180-
// Create a reader that handles decryption if needed
181-
var backupReader io.Reader = rawReader
182-
if backup.Encryption == backups_config.BackupEncryptionEncrypted {
183-
decryptReader, err := uc.setupDecryption(rawReader, backup)
184-
if err != nil {
185-
cleanupFunc()
186-
return "", nil, fmt.Errorf("failed to setup decryption: %w", err)
187-
}
188-
backupReader = decryptReader
189-
}
190-
191-
tempFile, err := os.Create(tempBackupFile)
192-
if err != nil {
193-
cleanupFunc()
194-
return "", nil, fmt.Errorf("failed to create temporary backup file: %w", err)
195-
}
196-
defer func() {
197-
if err := tempFile.Close(); err != nil {
198-
uc.logger.Error("Failed to close temporary file", "error", err)
199-
}
200-
}()
201-
202-
_, err = io.Copy(tempFile, backupReader)
203-
if err != nil {
204-
cleanupFunc()
205-
return "", nil, fmt.Errorf("failed to write backup to temporary file: %w", err)
206-
}
207-
208-
uc.logger.Info("Backup file written to temporary location", "tempFile", tempBackupFile)
209-
return tempBackupFile, cleanupFunc, nil
210-
}
211-
212139
func (uc *ValidateMysqlBackupUsecase) setupDecryption(
213140
reader io.Reader,
214141
backup *usecases_common.BackupInfo,
@@ -250,4 +177,3 @@ func (uc *ValidateMysqlBackupUsecase) setupDecryption(
250177
func stringPtr(s string) *string {
251178
return &s
252179
}
253-

backend/internal/features/databases/cleanup_service.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,3 +48,4 @@ func (s *CleanupService) runCleanup() {
4848

4949

5050

51+

backend/internal/features/databases/controller_server_helper.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,3 +113,4 @@ func (c *DatabaseController) createOrGetServer(
113113

114114

115115

116+

backend/internal/features/databases/databases/mariadb/discovery.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,3 +119,4 @@ func buildDiscoveryDSN(req DiscoveryRequest) string {
119119

120120

121121

122+

backend/internal/features/databases/databases/mongodb/discovery.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -123,3 +123,4 @@ func buildDiscoveryURI(req DiscoveryRequest) string {
123123

124124

125125

126+

0 commit comments

Comments
 (0)