-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathrestore.go
More file actions
359 lines (314 loc) · 12.6 KB
/
restore.go
File metadata and controls
359 lines (314 loc) · 12.6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
package stackgraph
import (
"context"
"fmt"
"sort"
"strconv"
"time"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/s3"
"github.com/spf13/cobra"
"github.com/stackvista/stackstate-backup-cli/cmd/cmdutils"
"github.com/stackvista/stackstate-backup-cli/internal/app"
"github.com/stackvista/stackstate-backup-cli/internal/clients/k8s"
s3client "github.com/stackvista/stackstate-backup-cli/internal/clients/s3"
"github.com/stackvista/stackstate-backup-cli/internal/foundation/config"
"github.com/stackvista/stackstate-backup-cli/internal/foundation/logger"
"github.com/stackvista/stackstate-backup-cli/internal/orchestration/portforward"
"github.com/stackvista/stackstate-backup-cli/internal/orchestration/restore"
"github.com/stackvista/stackstate-backup-cli/internal/orchestration/scale"
corev1 "k8s.io/api/core/v1"
)
const (
// jobNameTemplate is the fixed prefix of every restore job name; runRestore
// appends "-<timestamp>" to it to build the actual job (and PVC) name.
jobNameTemplate = "stackgraph-restore"
// configMapDefaultFileMode is the file mode applied (as DefaultMode) to the
// restore-scripts ConfigMap volume mounted into the restore job.
configMapDefaultFileMode = 0755
// purgeStackgraphDataFlag is the value passed to the restore job through the
// FORCE_DELETE environment variable.
purgeStackgraphDataFlag = "-force"
)
// Restore command flags. These package-level variables are bound to the cobra
// flags registered in restoreCmd and read by runRestore.
var (
// archiveName is the value of --archive: the specific archive to restore.
archiveName string
// useLatest is the value of --latest: restore from the most recent backup.
useLatest bool
// background is the value of --background: do not wait for the restore job to complete.
background bool
// skipConfirmation is the value of --yes/-y: skip the confirmation prompt.
skipConfirmation bool
)
// restoreCmd builds the "restore" cobra subcommand.
//
// The command requires exactly one of --archive or --latest and delegates the
// actual work to runRestore through cmdutils.Run, with Minio marked as required.
func restoreCmd(globalFlags *config.CLIGlobalFlags) *cobra.Command {
	// Handler invoked by cobra; positional arguments are ignored.
	run := func(_ *cobra.Command, _ []string) {
		cmdutils.Run(globalFlags, runRestore, cmdutils.MinioIsRequired)
	}

	cmd := &cobra.Command{
		Use:   "restore",
		Short: "Restore Stackgraph from a backup archive",
		Long:  `Restore Stackgraph data from a backup archive stored in S3/Minio. Can use --latest or --archive to specify which backup to restore.`,
		Run:   run,
	}

	// Bind the command-line flags to the package-level flag variables.
	flags := cmd.Flags()
	flags.StringVar(&archiveName, "archive", "", "Specific archive name to restore (e.g., sts-backup-20210216-0300.graph)")
	flags.BoolVar(&useLatest, "latest", false, "Restore from the most recent backup")
	flags.BoolVar(&background, "background", false, "Run restore job in background without waiting for completion")
	flags.BoolVarP(&skipConfirmation, "yes", "y", false, "Skip confirmation prompt")

	// Exactly one of --archive / --latest must be supplied.
	cmd.MarkFlagsMutuallyExclusive("archive", "latest")
	cmd.MarkFlagsOneRequired("archive", "latest")

	return cmd
}
// runRestore executes the Stackgraph restore workflow.
//
// Steps, in order:
//  1. Resolve the archive to restore (--archive value, or the latest backup
//     found via getLatestBackup when --latest is set).
//  2. Unless --yes was given, warn the user and ask for confirmation.
//  3. Scale down the deployments selected by the configured label selector
//     (under lock protection).
//  4. Ensure the Kubernetes resources needed by the restore job exist.
//  5. Create the restore job, named "<jobNameTemplate>-<timestamp>".
//  6. In background mode, print the job status and return; otherwise wait for
//     the job and clean up.
//
// It returns an error when any step fails or when the user declines the
// confirmation prompt.
func runRestore(appCtx *app.Context) error {
	// Determine which archive to restore
	backupFile := archiveName
	if useLatest {
		appCtx.Logger.Infof("Finding latest backup...")
		latest, err := getLatestBackup(appCtx.K8sClient, appCtx.Namespace, appCtx.Config, appCtx.Logger)
		if err != nil {
			return err
		}
		backupFile = latest
		appCtx.Logger.Infof("Using latest backup: %s", backupFile)
	}

	// Warn user and ask for confirmation
	if !skipConfirmation {
		appCtx.Logger.Println()
		appCtx.Logger.Warningf("WARNING: Restoring from backup will PURGE all existing Stackgraph data!")
		appCtx.Logger.Warningf("This operation cannot be undone.")
		appCtx.Logger.Println()
		appCtx.Logger.Infof("Backup to restore: %s", backupFile)
		appCtx.Logger.Infof("Namespace: %s", appCtx.Namespace)
		appCtx.Logger.Println()

		if !restore.PromptForConfirmation() {
			return fmt.Errorf("restore operation cancelled by user")
		}
	}

	// Scale down deployments before restore (with lock protection)
	appCtx.Logger.Println()
	scaleDownLabelSelector := appCtx.Config.Stackgraph.Restore.ScaleDownLabelSelector
	scaledDeployments, err := scale.ScaleDownWithLock(scale.ScaleDownWithLockParams{
		K8sClient:     appCtx.K8sClient,
		Namespace:     appCtx.Namespace,
		LabelSelector: scaleDownLabelSelector,
		Datastore:     config.DatastoreStackgraph,
		AllSelectors:  appCtx.Config.GetAllScaleDownSelectors(),
		Log:           appCtx.Logger,
	})
	if err != nil {
		return err
	}

	// backgroundJobStarted is set only after a background restore job has been
	// created successfully. Keying the deferred scale-up on this (instead of on
	// the --background flag alone) ensures that a failure BEFORE the job exists
	// still scales the deployments back up and releases the lock, rather than
	// leaving them down with no job running.
	backgroundJobStarted := false

	// Ensure deployments are scaled back up and lock released on exit (even if restore fails).
	// The only case where this is skipped is a background job that is still
	// running when this function returns.
	defer func() {
		if len(scaledDeployments) == 0 || backgroundJobStarted {
			return
		}
		appCtx.Logger.Println()
		if err := scale.ScaleUpAndReleaseLock(appCtx.K8sClient, appCtx.Namespace, scaleDownLabelSelector, appCtx.Logger); err != nil {
			appCtx.Logger.Warningf("Failed to scale up deployments: %v", err)
		}
	}()

	// Setup Kubernetes resources for restore job
	appCtx.Logger.Println()
	if err := restore.EnsureResources(appCtx.K8sClient, appCtx.Namespace, appCtx.Config, appCtx.Logger); err != nil {
		return err
	}

	// Create restore job
	appCtx.Logger.Println()
	appCtx.Logger.Infof("Creating restore job for backup: %s", backupFile)

	// The lowercase "t" in the layout is a literal separator, not a layout token.
	jobName := fmt.Sprintf("%s-%s", jobNameTemplate, time.Now().Format("20060102t150405"))

	if err = createRestoreJob(appCtx.K8sClient, appCtx.Namespace, jobName, backupFile, appCtx.Config); err != nil {
		return fmt.Errorf("failed to create restore job: %w", err)
	}

	appCtx.Logger.Successf("Restore job created: %s", jobName)

	if background {
		// The job now exists and keeps running after we return, so the
		// deferred scale-up must not fire.
		backgroundJobStarted = true
		restore.PrintRunningJobStatus(appCtx.Logger, "stackgraph", jobName, appCtx.Namespace, 0)
		return nil
	}

	return waitAndCleanupRestoreJob(appCtx.K8sClient, appCtx.Namespace, jobName, appCtx.Logger)
}
// waitAndCleanupRestoreJob prints the "waiting" message for the given
// stackgraph restore job and then delegates to restore.WaitAndCleanup,
// returning whatever error that call reports.
func waitAndCleanupRestoreJob(k8sClient *k8s.Client, namespace, jobName string, log *logger.Logger) error {
	restore.PrintWaitingMessage(log, "stackgraph", jobName, namespace)

	// NOTE(review): the trailing `true` presumably enables PVC cleanup — confirm against restore.WaitAndCleanup.
	waitErr := restore.WaitAndCleanup(k8sClient, namespace, jobName, log, true)
	return waitErr
}
// getLatestBackup returns the key of the most recently modified backup object
// in the configured Stackgraph bucket/prefix.
//
// It port-forwards to the Minio service, lists ALL objects under the prefix
// (following continuation tokens, since a single ListObjectsV2 response holds
// only one page of results), filters them with s3client.FilterBackupObjects
// and picks the entry with the newest LastModified timestamp.
//
// An error is returned when the port-forward or S3 client cannot be set up,
// when listing fails, or when no backup remains after filtering.
func getLatestBackup(k8sClient *k8s.Client, namespace string, config *config.Config, log *logger.Logger) (string, error) {
	// Setup port-forward to Minio
	serviceName := config.Minio.Service.Name
	localPort := config.Minio.Service.LocalPortForwardPort
	remotePort := config.Minio.Service.Port

	pf, err := portforward.SetupPortForward(k8sClient, namespace, serviceName, localPort, remotePort, log)
	if err != nil {
		return "", err
	}
	// Closing the stop channel tears down the port-forward when we return.
	defer close(pf.StopChan)

	// Create S3 client
	endpoint := fmt.Sprintf("http://localhost:%d", pf.LocalPort)
	s3Client, err := s3client.NewClient(endpoint, config.Minio.AccessKey, config.Minio.SecretKey)
	if err != nil {
		return "", err
	}

	// List objects in bucket
	bucket := config.Stackgraph.Bucket
	prefix := config.Stackgraph.S3Prefix
	multipartArchive := config.Stackgraph.MultipartArchive

	ctx := context.Background()
	input := &s3.ListObjectsV2Input{
		Bucket: aws.String(bucket),
		Prefix: aws.String(prefix),
	}

	result, err := s3Client.ListObjectsV2(ctx, input)
	if err != nil {
		return "", fmt.Errorf("failed to list S3 objects: %w", err)
	}
	objects := result.Contents

	// Keep fetching pages while the service hands back a continuation token;
	// otherwise the newest backup could sit on a page we never looked at.
	for aws.ToString(result.NextContinuationToken) != "" {
		input.ContinuationToken = result.NextContinuationToken
		result, err = s3Client.ListObjectsV2(ctx, input)
		if err != nil {
			return "", fmt.Errorf("failed to list S3 objects: %w", err)
		}
		objects = append(objects, result.Contents...)
	}

	// Filter objects based on whether the archive is split or not
	filteredObjects := s3client.FilterBackupObjects(objects, multipartArchive)

	if len(filteredObjects) == 0 {
		return "", fmt.Errorf("no backups found in bucket %s", bucket)
	}

	// Sort by LastModified time (most recent first)
	sort.Slice(filteredObjects, func(i, j int) bool {
		return filteredObjects[i].LastModified.After(filteredObjects[j].LastModified)
	})

	return filteredObjects[0].Key, nil
}
// buildPVCSpec assembles the k8s.PVCSpec for the restore PVC from the
// Stackgraph restore PVC configuration.
//
// Access modes default to ReadWriteOnce when none are configured. The storage
// class pointer is left nil when no storage class name is configured.
func buildPVCSpec(name string, config *config.Config, labels map[string]string) k8s.PVCSpec {
	pvcConfig := config.Stackgraph.Restore.PVC

	// Translate the configured access-mode strings into their k8s type,
	// falling back to the default when the configuration lists none.
	var modes []corev1.PersistentVolumeAccessMode
	if configured := len(pvcConfig.AccessModes); configured == 0 {
		modes = []corev1.PersistentVolumeAccessMode{corev1.ReadWriteOnce}
	} else {
		modes = make([]corev1.PersistentVolumeAccessMode, 0, configured)
		for i := range pvcConfig.AccessModes {
			modes = append(modes, corev1.PersistentVolumeAccessMode(pvcConfig.AccessModes[i]))
		}
	}

	spec := k8s.PVCSpec{
		Name:        name,
		Labels:      labels,
		StorageSize: pvcConfig.Size,
		AccessModes: modes,
	}

	// Only set a storage class when one is configured; otherwise it stays nil.
	if pvcConfig.StorageClassName != "" {
		spec.StorageClass = &pvcConfig.StorageClassName
	}

	return spec
}
// createRestoreJob creates the PVC and the Kubernetes Job used to restore
// Stackgraph from backupFile.
//
// The PVC is created first and shares its name with the job. If the job cannot
// be created, the PVC is deleted again on a best-effort basis. A failure of
// that cleanup does not replace the job-creation error, but is reported inside
// the returned error so an orphaned PVC does not go unnoticed.
func createRestoreJob(k8sClient *k8s.Client, namespace, jobName, backupFile string, config *config.Config) error {
	defaultMode := int32(configMapDefaultFileMode)

	// Merge common labels with resource-specific labels
	pvcLabels := k8s.MergeLabels(config.Kubernetes.CommonLabels, map[string]string{})
	jobLabels := k8s.MergeLabels(config.Kubernetes.CommonLabels, config.Stackgraph.Restore.Job.Labels)

	// Create PVC first
	pvcSpec := buildPVCSpec(jobName, config, pvcLabels)
	pvc, err := k8sClient.CreatePVC(namespace, pvcSpec)
	if err != nil {
		return fmt.Errorf("failed to create PVC: %w", err)
	}

	// Build job spec using configuration
	spec := k8s.JobSpec{
		Name:             jobName,
		Labels:           jobLabels,
		ImagePullSecrets: k8s.ConvertImagePullSecrets(config.Stackgraph.Restore.Job.ImagePullSecrets),
		SecurityContext:  k8s.ConvertPodSecurityContext(&config.Stackgraph.Restore.Job.SecurityContext),
		NodeSelector:     config.Stackgraph.Restore.Job.NodeSelector,
		Tolerations:      k8s.ConvertTolerations(config.Stackgraph.Restore.Job.Tolerations),
		Affinity:         k8s.ConvertAffinity(config.Stackgraph.Restore.Job.Affinity),
		Containers:       buildRestoreContainers(backupFile, config),
		InitContainers:   buildRestoreInitContainers(config),
		Volumes:          buildRestoreVolumes(jobName, config, defaultMode),
	}

	// Create job
	if _, err = k8sClient.CreateJob(namespace, spec); err != nil {
		// Cleanup PVC if job creation fails. This stays best-effort: the
		// job-creation error remains the wrapped cause either way.
		if cleanupErr := k8sClient.DeletePVC(namespace, pvc.Name); cleanupErr != nil {
			return fmt.Errorf("failed to create job: %w (cleanup of PVC %s also failed: %v)", err, pvc.Name, cleanupErr)
		}
		return fmt.Errorf("failed to create job: %w", err)
	}

	return nil
}
// buildRestoreEnvVars returns the environment variables handed to the restore
// container: the backup file to restore, the force-delete flag, the bucket and
// prefix settings, the Minio endpoint ("<service>:<port>") and the Zookeeper
// quorum.
func buildRestoreEnvVars(backupFile string, config *config.Config) []corev1.EnvVar {
	minioService := config.Minio.Service
	minioEndpoint := fmt.Sprintf("%s:%d", minioService.Name, minioService.Port)
	multipart := strconv.FormatBool(config.Stackgraph.MultipartArchive)

	env := []corev1.EnvVar{
		{Name: "BACKUP_FILE", Value: backupFile},
		{Name: "FORCE_DELETE", Value: purgeStackgraphDataFlag},
		{Name: "BACKUP_STACKGRAPH_BUCKET_NAME", Value: config.Stackgraph.Bucket},
		{Name: "BACKUP_STACKGRAPH_S3_PREFIX", Value: config.Stackgraph.S3Prefix},
		{Name: "BACKUP_STACKGRAPH_MULTIPART_ARCHIVE", Value: multipart},
		{Name: "MINIO_ENDPOINT", Value: minioEndpoint},
		{Name: "ZOOKEEPER_QUORUM", Value: config.Stackgraph.Restore.ZookeeperQuorum},
	}
	return env
}
// buildRestoreVolumeMounts returns the volume mounts of the restore container.
// Each entry pairs a volume name with the path it is mounted at; the names
// match the volumes produced by buildRestoreVolumes.
func buildRestoreVolumeMounts() []corev1.VolumeMount {
	// Table of (volume name, mount path) pairs, in mount order.
	table := []struct {
		volume string
		path   string
	}{
		{"backup-log", "/opt/docker/etc_log"},
		{"backup-restore-scripts", "/backup-restore-scripts"},
		{"minio-keys", "/aws-keys"},
		{"tmp-data", "/tmp-data"},
	}

	mounts := make([]corev1.VolumeMount, 0, len(table))
	for _, entry := range table {
		mounts = append(mounts, corev1.VolumeMount{Name: entry.volume, MountPath: entry.path})
	}
	return mounts
}
// buildRestoreInitContainers returns the init containers of the restore job:
// a single "wait" container that runs /entrypoint against the Minio service
// address ("<service>:<port>") with a -t value of 300.
func buildRestoreInitContainers(config *config.Config) []corev1.Container {
	jobConfig := config.Stackgraph.Restore.Job
	minioService := config.Minio.Service

	// Shell command line executed through `sh -c`.
	waitCommand := fmt.Sprintf("/entrypoint -c %s:%d -t 300", minioService.Name, minioService.Port)

	waitContainer := corev1.Container{
		Name:            "wait",
		Image:           jobConfig.WaitImage,
		ImagePullPolicy: corev1.PullIfNotPresent,
		Command:         []string{"sh", "-c", waitCommand},
		SecurityContext: k8s.ConvertSecurityContext(jobConfig.ContainerSecurityContext),
	}

	return []corev1.Container{waitContainer}
}
// buildRestoreVolumes returns the volumes of the restore job pod:
//   - "backup-log": the logging-config ConfigMap named in the configuration
//   - "backup-restore-scripts": the restore scripts ConfigMap, with defaultMode applied
//   - "minio-keys": the Minio keys Secret
//   - "tmp-data": the PVC whose claim name equals jobName
func buildRestoreVolumes(jobName string, config *config.Config, defaultMode int32) []corev1.Volume {
	loggingConfig := corev1.Volume{Name: "backup-log"}
	loggingConfig.ConfigMap = &corev1.ConfigMapVolumeSource{
		LocalObjectReference: corev1.LocalObjectReference{
			Name: config.Stackgraph.Restore.LoggingConfigConfigMapName,
		},
	}

	// Only the scripts volume carries an explicit default file mode.
	scripts := corev1.Volume{Name: "backup-restore-scripts"}
	scripts.ConfigMap = &corev1.ConfigMapVolumeSource{
		LocalObjectReference: corev1.LocalObjectReference{
			Name: restore.RestoreScriptsConfigMap,
		},
		DefaultMode: &defaultMode,
	}

	minioKeys := corev1.Volume{Name: "minio-keys"}
	minioKeys.Secret = &corev1.SecretVolumeSource{
		SecretName: restore.MinioKeysSecretName,
	}

	scratch := corev1.Volume{Name: "tmp-data"}
	scratch.PersistentVolumeClaim = &corev1.PersistentVolumeClaimVolumeSource{
		ClaimName: jobName,
	}

	return []corev1.Volume{loggingConfig, scripts, minioKeys, scratch}
}
// buildRestoreContainers returns the main containers of the restore job: a
// single "restore" container that runs the restore-stackgraph-backup.sh script
// with the environment from buildRestoreEnvVars and the mounts from
// buildRestoreVolumeMounts.
func buildRestoreContainers(backupFile string, config *config.Config) []corev1.Container {
	jobConfig := config.Stackgraph.Restore.Job

	restoreContainer := corev1.Container{
		Name:            "restore",
		Image:           jobConfig.Image,
		ImagePullPolicy: corev1.PullIfNotPresent,
		SecurityContext: k8s.ConvertSecurityContext(jobConfig.ContainerSecurityContext),
		Command:         []string{"/backup-restore-scripts/restore-stackgraph-backup.sh"},
		Env:             buildRestoreEnvVars(backupFile, config),
		Resources:       k8s.ConvertResources(jobConfig.Resources),
		VolumeMounts:    buildRestoreVolumeMounts(),
	}

	return []corev1.Container{restoreContainer}
}