Skip to content
This repository was archived by the owner on Oct 2, 2025. It is now read-only.

Commit 8e15560

Browse files
author
Andrey Sokolov
authored
Support subset restore with on-error-continue mode (#131)
Add possibility to use on-error-continue with filtering. Add the discardData method to the IRestoreReader interface. When an error happens during COPY FROM execution, discardData is called to read the rest of the table data from plugin's pipe. So the transition to the next table occurs and the offset is positioned automatically. In the case of discard error we don't stop the helper, because other helpers can restore data on other segments. Another reason - hanging on COPY happens when one helper stops before restoring a next table. Add the discardErr boolean field to RestoreReader. This field is used in discardData and copyData to avoid bufReader reading when an error has happened in discardData before. When skip file is found, call discardData to move on the next table data. Data written in writeTestTOC has now started to be used. Fix this data. Ticket: ADBDEV-7890
1 parent ea5269b commit 8e15560

2 files changed

Lines changed: 261 additions & 8 deletions

File tree

helper/helper_test.go

Lines changed: 213 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,9 @@ package helper
33
import (
44
"bufio"
55
"fmt"
6+
"io"
67
"os"
8+
"strings"
79

810
"github.com/greenplum-db/gpbackup/utils"
911
"golang.org/x/sys/unix"
@@ -21,7 +23,9 @@ var (
2123
)
2224

2325
type restoreReaderTestImpl struct {
24-
waitCount int
26+
waitCount int
27+
discardedBytes int64
28+
discardErr error
2529
}
2630

2731
func (r *restoreReaderTestImpl) waitForPlugin() error {
@@ -45,7 +49,16 @@ func (r *restoreReaderTestImpl) closeFileHandle() {
4549
}
4650

4751
func (r *restoreReaderTestImpl) getReaderType() ReaderType {
48-
return "nil"
52+
return SUBSET
53+
}
54+
55+
func (r *restoreReaderTestImpl) discardData(num int64) (int64, error) {
56+
if r.discardErr != nil {
57+
return 0, r.discardErr
58+
}
59+
60+
r.discardedBytes += num
61+
return num, nil
4962
}
5063

5164
type helperTestStep struct {
@@ -198,6 +211,47 @@ func (pt *testPluginCmd) Wait() error {
198211
func (pt *testPluginCmd) errLog() {
199212
}
200213

214+
type limitReader struct {
215+
remainder int
216+
err error
217+
}
218+
219+
func (r *limitReader) Read(p []byte) (n int, err error) {
220+
if r.remainder <= 0 {
221+
return 0, r.err
222+
}
223+
224+
if len(p) > r.remainder {
225+
p = p[0:r.remainder]
226+
}
227+
228+
n = len(p)
229+
for i := 0; i < n; i++ {
230+
p[i] = 1
231+
}
232+
r.remainder -= n
233+
return
234+
}
235+
236+
type limitWriter struct {
237+
remainder int
238+
}
239+
240+
func (w *limitWriter) Write(p []byte) (n int, err error) {
241+
if w.remainder < len(p) {
242+
n = w.remainder
243+
} else {
244+
n = len(p)
245+
}
246+
247+
if w.remainder == 0 {
248+
err = io.ErrShortWrite
249+
}
250+
251+
w.remainder -= n
252+
return
253+
}
254+
201255
var _ = Describe("helper tests", func() {
202256
var pluginConfig utils.PluginConfig
203257
var isSubset bool
@@ -243,7 +297,7 @@ var _ = Describe("helper tests", func() {
243297
pluginConfig.Options["restore_subset"] = "on"
244298
*onErrorContinue = true
245299
isSubset = getSubsetFlag(fileToRead, &pluginConfig)
246-
Expect(isSubset).To(Equal(false))
300+
Expect(isSubset).To(Equal(true))
247301
})
248302
It("when restore_subset is off, --on-error-continue is false, compression \"gz\" is used", func() {
249303
pluginConfig.Options["restore_subset"] = "off"
@@ -416,6 +470,52 @@ var _ = Describe("helper tests", func() {
416470
err := doRestoreAgentInternal(helper)
417471
Expect(err).To(BeNil())
418472
})
473+
It("discard data if skip file is discovered with single datafile", func() {
474+
*singleDataFile = true
475+
*isResizeRestore = false
476+
*tocFile = testTocFile
477+
478+
writeTestTOC(testTocFile)
479+
defer func() {
480+
_ = os.Remove(*tocFile)
481+
}()
482+
483+
oidBatch := []oidWithBatch{
484+
{1 /* The first oid from TOC */, 0},
485+
}
486+
487+
expectedScenario := []helperTestStep{
488+
{"mock_1_0", false, 1, true, "Can not open pipe for table 1, check_skip_file shall called, skip file exists"},
489+
}
490+
491+
helper := newHelperTest(oidBatch, expectedScenario)
492+
err := doRestoreAgentInternal(helper)
493+
Expect(err).ToNot(HaveOccurred())
494+
Expect(helper.restoreData.discardedBytes).To(Equal(int64(18)))
495+
})
496+
It("discard error data if skip file is discovered with single datafile", func() {
497+
*singleDataFile = true
498+
*isResizeRestore = false
499+
*tocFile = testTocFile
500+
501+
writeTestTOC(testTocFile)
502+
defer func() {
503+
_ = os.Remove(*tocFile)
504+
}()
505+
506+
oidBatch := []oidWithBatch{
507+
{1 /* The first oid from TOC */, 0},
508+
}
509+
510+
expectedScenario := []helperTestStep{
511+
{"mock_1_0", false, 1, true, "Can not open pipe for table 1, check_skip_file shall called, skip file exists"},
512+
}
513+
514+
helper := newHelperTest(oidBatch, expectedScenario)
515+
helper.restoreData.discardErr = io.EOF
516+
err := doRestoreAgentInternal(helper)
517+
Expect(err).To(Equal(io.EOF))
518+
})
419519
It("calls Wait in waitForPlugin doRestoreAgent for single data file", func() {
420520
*singleDataFile = true
421521
*isResizeRestore = false
@@ -476,6 +576,10 @@ var _ = Describe("helper tests", func() {
476576
})
477577
})
478578
Describe("RestoreReader tests", func() {
579+
AfterEach(func() {
580+
*onErrorContinue = false
581+
writer = nil
582+
})
479583
It("waitForPlugin normal completion", func() {
480584
test_cmd1 := testPluginCmd{hasProcess_: true}
481585
test_reader := new(RestoreReader)
@@ -504,20 +608,123 @@ var _ = Describe("helper tests", func() {
504608
Expect(err).To(HaveOccurred())
505609
Expect(err.Error()).To(Equal(msg))
506610
})
611+
It("CopyData, readerType is SUBSET. Normal completion", func() {
612+
writer = bufio.NewWriterSize(&limitWriter{100}, 5)
613+
614+
test_reader := RestoreReader{
615+
readerType: SUBSET,
616+
bufReader: bufio.NewReader(&limitReader{100, io.EOF}),
617+
}
618+
619+
bytesRead, err := test_reader.copyData(18)
620+
Expect(bytesRead).To(Equal(int64(18)))
621+
Expect(err).ToNot(HaveOccurred())
622+
})
623+
It("CopyData, readerType is SUBSET. Error on write", func() {
624+
*onErrorContinue = true
625+
bufSize := 5
626+
toRead := int64(18)
627+
writer = bufio.NewWriterSize(&limitWriter{7}, bufSize)
628+
629+
test_reader := RestoreReader{
630+
readerType: SUBSET,
631+
bufReader: bufio.NewReader(&limitReader{100, io.EOF}),
632+
}
633+
634+
bytesRead, err := test_reader.copyData(toRead)
635+
Expect(bytesRead).To(Equal(toRead))
636+
Expect(errors.Is(err, io.ErrShortWrite)).To(Equal(true))
637+
str := fmt.Sprintf("copied %d bytes from %d: ", bufSize*2, toRead)
638+
Expect(strings.HasPrefix(err.Error(), str)).To(Equal(true))
639+
640+
})
641+
It("CopyData, readerType is SUBSET. EOF", func() {
642+
*onErrorContinue = true
643+
writer = bufio.NewWriterSize(&limitWriter{100}, 5)
644+
645+
test_reader := RestoreReader{
646+
readerType: SUBSET,
647+
bufReader: bufio.NewReader(&limitReader{25, io.EOF}),
648+
}
649+
650+
bytesRead, err := test_reader.copyData(30)
651+
Expect(bytesRead).To(Equal(int64(25)))
652+
Expect(err).To(Equal(io.EOF))
653+
})
654+
It("CopyData, readerType is SUBSET. Error on write and EOF", func() {
655+
*onErrorContinue = true
656+
bufSize := 5
657+
toCopy := int64(30)
658+
rLmt := int64(25)
659+
writer = bufio.NewWriterSize(&limitWriter{7}, bufSize)
660+
661+
test_reader := RestoreReader{
662+
readerType: SUBSET,
663+
bufReader: bufio.NewReader(&limitReader{int(rLmt), io.EOF}),
664+
}
665+
666+
bytesRead, err := test_reader.copyData(toCopy)
667+
Expect(bytesRead).To(Equal(rLmt))
668+
Expect(errors.Is(err, io.ErrShortWrite)).To(Equal(true))
669+
Expect(errors.Is(err, io.EOF)).To(Equal(true))
670+
readBeforeErr := int64(bufSize * 2)
671+
prefix := fmt.Sprintf("discard error in copyData: discarded %d bytes from %d: ", rLmt-readBeforeErr, toCopy-readBeforeErr)
672+
Expect(strings.HasPrefix(err.Error(), prefix)).To(Equal(true))
673+
strCopied := fmt.Sprintf("copied %d bytes from %d: ", readBeforeErr, toCopy)
674+
Expect(strings.Contains(err.Error(), strCopied)).To(Equal(true))
675+
676+
bytesRead, err = test_reader.copyData(10)
677+
Expect(bytesRead).To(Equal(int64(0)))
678+
Expect(err.Error()).To(Equal("10 bytes to copy, but discard error has already occurred. Skipping read."))
679+
680+
bytesRead, err = test_reader.discardData(5)
681+
Expect(bytesRead).To(Equal(int64(0)))
682+
Expect(err.Error()).To(Equal("5 bytes to discard, but discard error has already occurred. Skipping read."))
683+
})
684+
It("CopyData, readerType is SUBSET. Error on write and on read", func() {
685+
*onErrorContinue = true
686+
bufSize := 5
687+
toCopy := int64(30)
688+
rLmt := int64(25)
689+
writer = bufio.NewWriterSize(&limitWriter{7}, bufSize)
690+
691+
test_reader := RestoreReader{
692+
readerType: SUBSET,
693+
bufReader: bufio.NewReader(&limitReader{int(rLmt), io.ErrNoProgress}),
694+
}
695+
696+
bytesRead, err := test_reader.copyData(toCopy)
697+
Expect(bytesRead).To(Equal(rLmt))
698+
Expect(errors.Is(err, io.ErrShortWrite)).To(Equal(true))
699+
Expect(errors.Is(err, io.ErrNoProgress)).To(Equal(true))
700+
readBeforeErr := int64(bufSize * 2)
701+
prefix := fmt.Sprintf("discard error in copyData: discarded %d bytes from %d: ", rLmt-readBeforeErr, toCopy-readBeforeErr)
702+
Expect(strings.HasPrefix(err.Error(), prefix)).To(Equal(true))
703+
strCopied := fmt.Sprintf("copied %d bytes from %d: ", readBeforeErr, toCopy)
704+
Expect(strings.Contains(err.Error(), strCopied)).To(Equal(true))
705+
706+
bytesRead, err = test_reader.copyData(10)
707+
Expect(bytesRead).To(Equal(int64(0)))
708+
Expect(err.Error()).To(Equal("10 bytes to copy, but discard error has already occurred. Skipping read."))
709+
710+
bytesRead, err = test_reader.discardData(5)
711+
Expect(bytesRead).To(Equal(int64(0)))
712+
Expect(err.Error()).To(Equal("5 bytes to discard, but discard error has already occurred. Skipping read."))
713+
})
507714
})
508715
})
509716

510717
func writeTestTOC(tocFile string) {
511718
// Write test TOC. We are not going to read data using it, so dataLength is a random number
512719
dataLength := 100
513720
customTOC := fmt.Sprintf(`dataentries:
514-
1:
721+
1:
515722
startbyte: 0
516723
endbyte: 18
517-
2:
724+
2:
518725
startbyte: 18
519726
endbyte: %[1]d
520-
3:
727+
3:
521728
startbyte: %[1]d
522729
endbyte: %d
523730
`, dataLength+18, dataLength+18+18)

helper/restore_helper.go

Lines changed: 48 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"bufio"
55
"bytes"
66
"compress/gzip"
7+
errorsStd "errors"
78
"fmt"
89
"io"
910
"io/ioutil"
@@ -54,6 +55,7 @@ type IRestoreReader interface {
5455
copyAllData() (int64, error)
5556
closeFileHandle()
5657
getReaderType() ReaderType
58+
discardData(num int64) (int64, error)
5759
}
5860

5961
type RestoreReader struct {
@@ -62,6 +64,7 @@ type RestoreReader struct {
6264
seekReader io.ReadSeeker
6365
pluginCmd IPluginCmd
6466
readerType ReaderType
67+
discardErr bool
6568
}
6669

6770
// Wait for plugin process that should be already finished. This should be
@@ -102,14 +105,53 @@ func (r *RestoreReader) positionReader(pos uint64, oid int) error {
102105
return nil
103106
}
104107

108+
func (r *RestoreReader) discardData(num int64) (int64, error) {
109+
if r.readerType != SUBSET {
110+
panic("discardData should be called for readerType == SUBSET only")
111+
}
112+
113+
if r.discardErr {
114+
err := fmt.Errorf("%d bytes to discard, but discard error has already occurred. Skipping read.", num)
115+
logVerbose(err.Error())
116+
return 0, err
117+
}
118+
119+
n, err := io.CopyN(io.Discard, r.bufReader, num)
120+
if err == nil {
121+
logVerbose(fmt.Sprintf("discarded %d bytes", n))
122+
} else {
123+
r.discardErr = true
124+
err = errors.Wrapf(err, "discarded %d bytes from %d", n, num)
125+
logError(err.Error())
126+
}
127+
return n, err
128+
}
129+
105130
func (r *RestoreReader) copyData(num int64) (int64, error) {
106131
var bytesRead int64
107132
var err error
108133
switch r.readerType {
109134
case SEEKABLE:
110135
bytesRead, err = io.CopyN(writer, r.seekReader, num)
111-
case NONSEEKABLE, SUBSET:
136+
case NONSEEKABLE:
137+
bytesRead, err = io.CopyN(writer, r.bufReader, num)
138+
case SUBSET:
139+
if r.discardErr {
140+
err := fmt.Errorf("%d bytes to copy, but discard error has already occurred. Skipping read.", num)
141+
logVerbose(err.Error())
142+
return 0, err
143+
}
144+
112145
bytesRead, err = io.CopyN(writer, r.bufReader, num)
146+
if err != nil && err != io.EOF && *onErrorContinue {
147+
err = errors.Wrapf(err, "copied %d bytes from %d", bytesRead, num)
148+
bytesDiscard, errDiscard := r.discardData(num - bytesRead)
149+
bytesRead += bytesDiscard
150+
if errDiscard != nil {
151+
err = errorsStd.Join(errDiscard, err)
152+
err = errors.Wrap(err, "discard error in copyData")
153+
}
154+
}
113155
}
114156
return bytesRead, err
115157
}
@@ -351,6 +393,10 @@ func doRestoreAgentInternal(restoreHelper IRestoreHelper) error {
351393
logWarn(fmt.Sprintf("Oid %d, Batch %d: Skip file discovered, skipping this relation.", tableOid, batchNum))
352394
err = nil
353395
skipOid = tableOid
396+
if *singleDataFile && readers[contentToRestore] != nil && readers[contentToRestore].getReaderType() == SUBSET {
397+
bytesToDiscard := int64(end[contentToRestore] - start[contentToRestore])
398+
_, err = readers[contentToRestore].discardData(bytesToDiscard)
399+
}
354400
/* Close up to *copyQueue files with this tableOid */
355401
for idx := 0; idx < *copyQueue; idx++ {
356402
batchToDelete := batchNum + idx
@@ -613,7 +659,7 @@ func getSubsetFlag(fileToRead string, pluginConfig *utils.PluginConfig) bool {
613659
return false
614660
}
615661
// Helper's option does not allow to use subset
616-
if !*isFiltered || *onErrorContinue {
662+
if !*isFiltered {
617663
return false
618664
}
619665
// Restore subset and compression does not allow together

0 commit comments

Comments
 (0)