Skip to content

Commit 7367dfd

Browse files
author
anton.voskresensky
committed
add parallel snapshots
1 parent d0955a4 commit 7367dfd

1 file changed

Lines changed: 54 additions & 11 deletions

File tree

pkg/utils/snapshots.go

Lines changed: 54 additions & 11 deletions
Original file line number | Diff line number | Diff line change
@@ -156,6 +156,14 @@ func GetActiveSnapshotCount(client *opensearch.Client) (int, error) {
156156
return len(status.Snapshots), nil
157157
}
158158

159+
// GetActiveSnapshots returns the Snapshots slice of the status reported by
// client.GetSnapshotStatus. If that call fails, the error is returned
// unchanged together with a nil slice.
// NOTE(review): presumably the status endpoint lists only in-progress
// snapshots, which is what "active" refers to — confirm against the client.
func GetActiveSnapshots(client *opensearch.Client) ([]opensearch.SnapshotInfo, error) {
160+
status, err := client.GetSnapshotStatus()
161+
if err != nil {
162+
return nil, err
163+
}
164+
return status.Snapshots, nil
165+
}
166+
159167
type SnapshotTask struct {
160168
SnapshotName string
161169
IndicesStr string
@@ -181,21 +189,36 @@ func CreateSnapshotsInParallel(client *opensearch.Client, tasks []SnapshotTask,
181189
return sortedTasks[i].Size < sortedTasks[j].Size
182190
})
183191

192+
order := "descending (largest first)"
193+
if !sortDescending {
194+
order = "ascending (smallest first)"
195+
}
196+
logger.Info(fmt.Sprintf("Starting parallel snapshot creation tasksCount=%d maxConcurrent=%d sortOrder=%s", len(sortedTasks), maxConcurrent, order))
197+
if len(sortedTasks) > 0 {
198+
taskNames := make([]string, 0, len(sortedTasks))
199+
for _, task := range sortedTasks {
200+
taskNames = append(taskNames, task.SnapshotName)
201+
}
202+
logger.Info(fmt.Sprintf("Snapshot tasks in order: %s", strings.Join(taskNames, ", ")))
203+
}
204+
184205
taskChan := make(chan SnapshotTask, len(sortedTasks))
185206

186207
for _, task := range sortedTasks {
187208
taskChan <- task
188209
}
189210
close(taskChan)
190211

212+
logger.Info(fmt.Sprintf("Creating %d worker goroutines for snapshot creation", maxConcurrent))
191213
for i := 0; i < maxConcurrent; i++ {
214+
workerID := i + 1
192215
wg.Add(1)
193-
go func() {
216+
go func(id int) {
194217
defer wg.Done()
195218

196219
for task := range taskChan {
197-
logger.Info(fmt.Sprintf("Creating snapshot snapshot=%s repo=%s", task.SnapshotName, task.Repo))
198-
logger.Info(fmt.Sprintf("Snapshot indices %s", task.IndicesStr))
220+
logger.Info(fmt.Sprintf("Worker %d: Starting snapshot creation snapshot=%s repo=%s", id, task.SnapshotName, task.Repo))
221+
logger.Info(fmt.Sprintf("Worker %d: Snapshot indices %s", id, task.IndicesStr))
199222

200223
err := CreateSnapshotWithRetry(client, task.SnapshotName, task.IndicesStr, task.Repo, task.Namespace, task.DateStr, madisonClient, logger, task.PollInterval, maxConcurrent)
201224

@@ -205,35 +228,54 @@ func CreateSnapshotsInParallel(client *opensearch.Client, tasks []SnapshotTask,
205228
snapshotName = fmt.Sprintf("%s (repo=%s)", task.SnapshotName, task.Repo)
206229
}
207230
if err != nil {
208-
logger.Error(fmt.Sprintf("Failed to create snapshot after retries snapshot=%s error=%v", task.SnapshotName, err))
231+
logger.Error(fmt.Sprintf("Worker %d: Failed to create snapshot after retries snapshot=%s error=%v", id, task.SnapshotName, err))
209232
failed = append(failed, snapshotName)
210233
} else {
234+
logger.Info(fmt.Sprintf("Worker %d: Successfully created snapshot snapshot=%s", id, task.SnapshotName))
211235
successful = append(successful, snapshotName)
212236
}
213237
mu.Unlock()
214238
}
215-
}()
239+
logger.Info(fmt.Sprintf("Worker %d: Finished processing all assigned tasks", id))
240+
}(workerID)
216241
}
217242

218243
wg.Wait()
219244
return successful, failed
220245
}
221246

222-
func WaitForSnapshotSlot(client *opensearch.Client, logger *logging.Logger, maxConcurrent int, waitInterval time.Duration) error {
247+
// WaitForSnapshotSlot blocks until the number of snapshots returned by
// GetActiveSnapshots is below maxConcurrent, re-checking every waitInterval.
//
// waitingForSnapshot is used only to enrich the log messages; pass "" to
// omit it from them.
//
// A failed status lookup is logged as a warning and retried after
// waitInterval, so as written the function only ever returns nil and places
// no upper bound on how long it waits.
// NOTE(review): the check is an observation, not a reservation — nothing
// visible here stops several callers from seeing the same free slot at once;
// confirm whether callers rely on maxConcurrent as a hard limit.
func WaitForSnapshotSlot(client *opensearch.Client, logger *logging.Logger, maxConcurrent int, waitInterval time.Duration, waitingForSnapshot string) error {
223248
for {
224-
activeCount, err := GetActiveSnapshotCount(client)
249+
activeSnapshots, err := GetActiveSnapshots(client)
225250
if err != nil {
226-
logger.Warn(fmt.Sprintf("Failed to get active snapshot count, retrying error=%v", err))
251+
// The lookup error is not propagated: log it, wait one interval and poll again.
logger.Warn(fmt.Sprintf("Failed to get active snapshot status, retrying error=%v", err))
227252
time.Sleep(waitInterval)
228253
continue
229254
}
230255

256+
activeCount := len(activeSnapshots)
231257
if activeCount < maxConcurrent {
232-
logger.Info(fmt.Sprintf("Snapshot slot available active=%d max=%d", activeCount, maxConcurrent))
258+
if waitingForSnapshot != "" {
259+
logger.Info(fmt.Sprintf("Snapshot slot available for snapshot=%s active=%d max=%d", waitingForSnapshot, activeCount, maxConcurrent))
260+
} else {
261+
logger.Info(fmt.Sprintf("Snapshot slot available active=%d max=%d", activeCount, maxConcurrent))
262+
}
233263
return nil
234264
}
235265

236-
logger.Info(fmt.Sprintf("Waiting for snapshot slot active=%d max=%d", activeCount, maxConcurrent))
266+
// Build "repository/snapshot" labels for the log line. Entries with an empty
// Snapshot name are left out, so this list can be shorter than activeCount.
activeNames := make([]string, 0, len(activeSnapshots))
267+
for _, s := range activeSnapshots {
268+
if s.Repository != "" && s.Snapshot != "" {
269+
activeNames = append(activeNames, fmt.Sprintf("%s/%s", s.Repository, s.Snapshot))
270+
} else if s.Snapshot != "" {
271+
activeNames = append(activeNames, s.Snapshot)
272+
}
273+
}
274+
if waitingForSnapshot != "" {
275+
logger.Info(fmt.Sprintf("Waiting for snapshot slot snapshot=%s active=%d max=%d activeSnapshots=[%s] waitInterval=%v", waitingForSnapshot, activeCount, maxConcurrent, strings.Join(activeNames, ", "), waitInterval))
276+
} else {
277+
logger.Info(fmt.Sprintf("Waiting for snapshot slot active=%d max=%d activeSnapshots=[%s] waitInterval=%v", activeCount, maxConcurrent, strings.Join(activeNames, ", "), waitInterval))
278+
}
237279
time.Sleep(waitInterval)
238280
}
239281
}
@@ -297,7 +339,8 @@ retryLoop:
297339
logger.Info(fmt.Sprintf("Creating snapshot attempt snapshot=%s attempt=%d maxRetries=%d", snapshotName, attempt, maxRetries))
298340

299341
if maxConcurrent > 0 {
300-
err := WaitForSnapshotSlot(client, logger, maxConcurrent, pollInterval)
342+
logger.Info(fmt.Sprintf("Waiting for snapshot slot before creating snapshot=%s attempt=%d", snapshotName, attempt))
343+
err := WaitForSnapshotSlot(client, logger, maxConcurrent, pollInterval, snapshotName)
301344
if err != nil {
302345
logger.Error(fmt.Sprintf("Failed to wait for snapshot slot snapshot=%s error=%v", snapshotName, err))
303346
return err

0 commit comments

Comments
 (0)