From 84a5ef5a1fe955c889b4349945cb72df814c375a Mon Sep 17 00:00:00 2001 From: Shivaji Kharse Date: Mon, 16 Mar 2026 15:33:05 +0530 Subject: [PATCH 1/2] resolve rate-limit failures when downloading datasets --- .../ci-dgraph-integration2-tests.yml | 16 +++++ .github/workflows/ci-dgraph-ldbc-tests.yml | 15 ++++- .github/workflows/ci-dgraph-load-tests.yml | 15 ++++- dgraphtest/load.go | 30 ++++++--- t/t.go | 64 +++++++++++++------ 5 files changed, 111 insertions(+), 29 deletions(-) diff --git a/.github/workflows/ci-dgraph-integration2-tests.yml b/.github/workflows/ci-dgraph-integration2-tests.yml index 403a08465b4..7c8fadd24f4 100644 --- a/.github/workflows/ci-dgraph-integration2-tests.yml +++ b/.github/workflows/ci-dgraph-integration2-tests.yml @@ -29,6 +29,15 @@ jobs: with: fetch-depth: 0 + - name: Restore benchmark dataset cache + uses: actions/cache/restore@v4 + with: + path: dgraphtest/datafiles + key: dataset-dgraphtest-v1 + + - name: Ensure datafiles directory + run: mkdir -p dgraphtest/datafiles + - name: Set up Go uses: actions/setup-go@v6 with: @@ -55,3 +64,10 @@ jobs: go clean -testcache # sleep sleep 5 + + - name: Save benchmark dataset cache + if: success() + uses: actions/cache/save@v4 + with: + path: dgraphtest/datafiles + key: dataset-dgraphtest-v1 diff --git a/.github/workflows/ci-dgraph-ldbc-tests.yml b/.github/workflows/ci-dgraph-ldbc-tests.yml index 045ad3d42e9..4ba863dedb0 100644 --- a/.github/workflows/ci-dgraph-ldbc-tests.yml +++ b/.github/workflows/ci-dgraph-ldbc-tests.yml @@ -28,6 +28,12 @@ jobs: - name: Checkout Dgraph uses: actions/checkout@v5 + - name: Restore LDBC dataset cache + uses: actions/cache/restore@v4 + with: + path: ${{ github.workspace }}/test-data + key: dataset-ldbc-v1 + - name: Set up Go uses: actions/setup-go@v6 with: @@ -61,6 +67,13 @@ jobs: # move the binary cp dgraph/dgraph ~/go/bin/dgraph # run the ldbc tests - cd t; ./t --suite=ldbc + cd t; ./t --suite=ldbc --tmp=${{ github.workspace }}/test-data # clean up docker 
containers after test execution ./t -r + + - name: Save LDBC dataset cache + if: success() + uses: actions/cache/save@v4 + with: + path: ${{ github.workspace }}/test-data + key: dataset-ldbc-v1 diff --git a/.github/workflows/ci-dgraph-load-tests.yml b/.github/workflows/ci-dgraph-load-tests.yml index a1967b59d53..dd25be53a63 100644 --- a/.github/workflows/ci-dgraph-load-tests.yml +++ b/.github/workflows/ci-dgraph-load-tests.yml @@ -27,6 +27,12 @@ jobs: steps: - uses: actions/checkout@v5 + - name: Restore load test dataset cache + uses: actions/cache/restore@v4 + with: + path: ${{ github.workspace }}/test-data + key: dataset-load-v1 + - name: Set up Go uses: actions/setup-go@v6 with: @@ -60,8 +66,15 @@ jobs: # move the binary cp dgraph/dgraph ~/go/bin/dgraph # run the load tests - cd t; ./t --suite=load + cd t; ./t --suite=load --tmp=${{ github.workspace }}/test-data # clean up docker containers after test execution ./t -r # sleep sleep 5 + + - name: Save load test dataset cache + if: success() + uses: actions/cache/save@v4 + with: + path: ${{ github.workspace }}/test-data + key: dataset-load-v1 diff --git a/dgraphtest/load.go b/dgraphtest/load.go index 116c04a6b5c..d7bf35255e9 100644 --- a/dgraphtest/load.go +++ b/dgraphtest/load.go @@ -20,6 +20,7 @@ import ( "runtime" "strconv" "strings" + "time" "github.com/pkg/errors" @@ -41,10 +42,10 @@ func (c *LocalCluster) HostDgraphBinaryPath() string { } var datafiles = map[string]string{ - "1million.schema": "https://github.com/dgraph-io/dgraph-benchmarks/blob/main/data/1million.schema?raw=true", - "1million.rdf.gz": "https://github.com/dgraph-io/dgraph-benchmarks/blob/main/data/1million.rdf.gz?raw=true", - "21million.schema": "https://github.com/dgraph-io/dgraph-benchmarks/blob/main/data/21million.schema?raw=true", - "21million.rdf.gz": "https://github.com/dgraph-io/dgraph-benchmarks/blob/main/data/21million.rdf.gz?raw=true", + "1million.schema": 
"https://raw.githubusercontent.com/dgraph-io/dgraph-benchmarks/refs/heads/main/data/1million.schema", + "1million.rdf.gz": "https://media.githubusercontent.com/media/dgraph-io/dgraph-benchmarks/refs/heads/main/data/1million.rdf.gz", + "21million.schema": "https://raw.githubusercontent.com/dgraph-io/dgraph-benchmarks/refs/heads/main/data/21million.schema", + "21million.rdf.gz": "https://media.githubusercontent.com/media/dgraph-io/dgraph-benchmarks/refs/heads/main/data/21million.rdf.gz", } type DatasetType int @@ -604,11 +605,22 @@ func (d *Dataset) ensureFile(filename string) string { } func downloadFile(fname, url string) error { - cmd := exec.Command("wget", "-O", fname, url) - cmd.Dir = datasetFilesPath - - if _, err := cmd.CombinedOutput(); err != nil { - return fmt.Errorf("error downloading file %s: %w", fname, err) + const maxRetries = 3 + fpath := filepath.Join(datasetFilesPath, fname) + for attempt := 1; attempt <= maxRetries; attempt++ { + cmd := exec.Command("wget", "--tries=3", "--waitretry=5", "--retry-connrefused", "-O", fname, url) + cmd.Dir = datasetFilesPath + + if out, err := cmd.CombinedOutput(); err != nil { + log.Printf("attempt %d/%d failed to download %s: %v\n%s", attempt, maxRetries, fname, err, string(out)) + if attempt < maxRetries { + time.Sleep(time.Duration(attempt*5) * time.Second) + continue + } + _ = os.Remove(fpath) + return fmt.Errorf("error downloading file %s after %d attempts: %w", fname, maxRetries, err) + } + return nil } return nil } diff --git a/t/t.go b/t/t.go index 168b6d364c7..cad64a1e1eb 100644 --- a/t/t.go +++ b/t/t.go @@ -1159,7 +1159,27 @@ var rdfFileNames = [...]string{ "workAt_0.rdf"} var ldbcDataFiles = map[string]string{ - "ldbcTypes.schema": "https://github.com/dgraph-io/dgraph-benchmarks/blob/main/ldbc/sf0.3/ldbcTypes.schema?raw=true", + "ldbcTypes.schema": "https://media.githubusercontent.com/media/dgraph-io/dgraph-benchmarks/refs/heads/main/ldbc/sf0.3/ldbcTypes.schema", +} + +func wgetWithRetry(fname, url, dir 
string) error { + const maxRetries = 3 + fpath := filepath.Join(dir, fname) + for attempt := 1; attempt <= maxRetries; attempt++ { + cmd := exec.Command("wget", "--tries=3", "--waitretry=5", "--retry-connrefused", "-O", fname, url) + cmd.Dir = dir + if out, err := cmd.CombinedOutput(); err != nil { + fmt.Printf("attempt %d/%d failed to download %s: %v\n%s\n", attempt, maxRetries, fname, err, string(out)) + if attempt < maxRetries { + time.Sleep(time.Duration(attempt*5) * time.Second) + continue + } + _ = os.Remove(fpath) + return fmt.Errorf("failed to download %s after %d attempts: %w", fname, maxRetries, err) + } + return nil + } + return nil } func downloadDataFiles() { @@ -1168,12 +1188,13 @@ func downloadDataFiles() { return } for fname, link := range datafiles { - cmd := exec.Command("wget", "-O", fname, link) - cmd.Dir = *tmp - - if out, err := cmd.CombinedOutput(); err != nil { - fmt.Printf("Error %v\n", err) - panic(fmt.Sprintf("error downloading a file: %s", string(out))) + fpath := filepath.Join(*tmp, fname) + if fi, err := os.Stat(fpath); err == nil && fi.Size() > 0 { + fmt.Printf("Skipping %s (already exists)\n", fname) + continue + } + if err := wgetWithRetry(fname, link, *tmp); err != nil { + panic(fmt.Sprintf("error downloading %s: %v", fname, err)) } } } @@ -1189,20 +1210,26 @@ func downloadLDBCFiles(dir string) { } start := time.Now() + sem := make(chan struct{}, 5) var wg sync.WaitGroup for fname, link := range ldbcDataFiles { + fpath := filepath.Join(dir, fname) + if fi, err := os.Stat(fpath); err == nil && fi.Size() > 0 { + fmt.Printf("Skipping %s (already exists)\n", fname) + continue + } wg.Add(1) - go func(fname, link string, wg *sync.WaitGroup) { + go func(fname, link string) { defer wg.Done() - start := time.Now() - cmd := exec.Command("wget", "-O", fname, link) - cmd.Dir = dir - if out, err := cmd.CombinedOutput(); err != nil { - fmt.Printf("Error %v\n", err) - panic(fmt.Sprintf("error downloading a file: %s", string(out))) + sem <- 
struct{}{} + defer func() { <-sem }() + + dlStart := time.Now() + if err := wgetWithRetry(fname, link, dir); err != nil { + panic(fmt.Sprintf("error downloading %s: %v", fname, err)) } - fmt.Printf("Downloaded %s to %s in %s \n", fname, dir, time.Since(start)) - }(fname, link, &wg) + fmt.Printf("Downloaded %s to %s in %s \n", fname, dir, time.Since(dlStart)) + }(fname, link) } wg.Wait() fmt.Printf("Downloaded %d files in %s \n", len(ldbcDataFiles), time.Since(start)) @@ -1387,7 +1414,9 @@ func run() error { needsData := testSuiteContainsAny("load", "ldbc", "all") if needsData && *tmp == "" { *tmp = filepath.Join(os.TempDir(), "dgraph-test-data") - x.Check(testutil.MakeDirEmpty([]string{*tmp})) + } + if needsData { + x.Check(os.MkdirAll(*tmp, 0755)) } if testSuiteContainsAny("load", "all") { downloadDataFiles() @@ -1449,7 +1478,6 @@ func main() { procId = rand.Intn(1000) err := run() - _ = os.RemoveAll(*tmp) if err != nil { os.Exit(1) } From 8175eccfcbc062e5a5984ab699bf61aef6d3c2fe Mon Sep 17 00:00:00 2001 From: Shivaji Kharse Date: Fri, 20 Mar 2026 15:16:10 +0530 Subject: [PATCH 2/2] resolve review comments --- .../ci-dgraph-integration2-tests.yml | 4 +- .github/workflows/ci-dgraph-ldbc-tests.yml | 6 +- .github/workflows/ci-dgraph-load-tests.yml | 6 +- dgraphtest/load.go | 50 ++-- t/t.go | 135 +++++---- testutil/benchmark-data-version | 1 + testutil/download.go | 260 ++++++++++++++++++ 7 files changed, 362 insertions(+), 100 deletions(-) create mode 100644 testutil/benchmark-data-version create mode 100644 testutil/download.go diff --git a/.github/workflows/ci-dgraph-integration2-tests.yml b/.github/workflows/ci-dgraph-integration2-tests.yml index 7c8fadd24f4..86fe6cca541 100644 --- a/.github/workflows/ci-dgraph-integration2-tests.yml +++ b/.github/workflows/ci-dgraph-integration2-tests.yml @@ -33,7 +33,7 @@ jobs: uses: actions/cache/restore@v4 with: path: dgraphtest/datafiles - key: dataset-dgraphtest-v1 + key: dataset-dgraphtest-${{ 
hashFiles('testutil/benchmark-data-version') }} - name: Ensure datafiles directory run: mkdir -p dgraphtest/datafiles @@ -70,4 +70,4 @@ jobs: uses: actions/cache/save@v4 with: path: dgraphtest/datafiles - key: dataset-dgraphtest-v1 + key: dataset-dgraphtest-${{ hashFiles('testutil/benchmark-data-version') }} diff --git a/.github/workflows/ci-dgraph-ldbc-tests.yml b/.github/workflows/ci-dgraph-ldbc-tests.yml index 4ba863dedb0..bcb733d8dc1 100644 --- a/.github/workflows/ci-dgraph-ldbc-tests.yml +++ b/.github/workflows/ci-dgraph-ldbc-tests.yml @@ -32,7 +32,7 @@ jobs: uses: actions/cache/restore@v4 with: path: ${{ github.workspace }}/test-data - key: dataset-ldbc-v1 + key: dataset-ldbc-${{ hashFiles('testutil/benchmark-data-version') }} - name: Set up Go uses: actions/setup-go@v6 @@ -67,7 +67,7 @@ jobs: # move the binary cp dgraph/dgraph ~/go/bin/dgraph # run the ldbc tests - cd t; ./t --suite=ldbc --tmp=${{ github.workspace }}/test-data + cd t; ./t --suite=ldbc --tmp=${{ github.workspace }}/test-data --keep-data # clean up docker containers after test execution ./t -r @@ -76,4 +76,4 @@ jobs: uses: actions/cache/save@v4 with: path: ${{ github.workspace }}/test-data - key: dataset-ldbc-v1 + key: dataset-ldbc-${{ hashFiles('testutil/benchmark-data-version') }} diff --git a/.github/workflows/ci-dgraph-load-tests.yml b/.github/workflows/ci-dgraph-load-tests.yml index dd25be53a63..b6f8d17d4f3 100644 --- a/.github/workflows/ci-dgraph-load-tests.yml +++ b/.github/workflows/ci-dgraph-load-tests.yml @@ -31,7 +31,7 @@ jobs: uses: actions/cache/restore@v4 with: path: ${{ github.workspace }}/test-data - key: dataset-load-v1 + key: dataset-load-${{ hashFiles('testutil/benchmark-data-version') }} - name: Set up Go uses: actions/setup-go@v6 @@ -66,7 +66,7 @@ jobs: # move the binary cp dgraph/dgraph ~/go/bin/dgraph # run the load tests - cd t; ./t --suite=load --tmp=${{ github.workspace }}/test-data + cd t; ./t --suite=load --tmp=${{ github.workspace }}/test-data --keep-data # clean 
up docker containers after test execution ./t -r # sleep @@ -77,4 +77,4 @@ jobs: uses: actions/cache/save@v4 with: path: ${{ github.workspace }}/test-data - key: dataset-load-v1 + key: dataset-load-${{ hashFiles('testutil/benchmark-data-version') }} diff --git a/dgraphtest/load.go b/dgraphtest/load.go index d7bf35255e9..278b604a1db 100644 --- a/dgraphtest/load.go +++ b/dgraphtest/load.go @@ -20,13 +20,13 @@ import ( "runtime" "strconv" "strings" - "time" "github.com/pkg/errors" "github.com/dgraph-io/dgo/v250/protos/api" "github.com/dgraph-io/dgraph/v25/dgraphapi" "github.com/dgraph-io/dgraph/v25/enc" + "github.com/dgraph-io/dgraph/v25/testutil" "github.com/dgraph-io/dgraph/v25/x" ) @@ -41,11 +41,13 @@ func (c *LocalCluster) HostDgraphBinaryPath() string { return filepath.Join(c.tempBinDir, "dgraph_host") } -var datafiles = map[string]string{ - "1million.schema": "https://raw.githubusercontent.com/dgraph-io/dgraph-benchmarks/refs/heads/main/data/1million.schema", - "1million.rdf.gz": "https://media.githubusercontent.com/media/dgraph-io/dgraph-benchmarks/refs/heads/main/data/1million.rdf.gz", - "21million.schema": "https://raw.githubusercontent.com/dgraph-io/dgraph-benchmarks/refs/heads/main/data/21million.schema", - "21million.rdf.gz": "https://media.githubusercontent.com/media/dgraph-io/dgraph-benchmarks/refs/heads/main/data/21million.rdf.gz", +// datafilePaths maps filenames to their paths inside the dgraph-benchmarks repo. +// URLs are constructed at runtime by combining these with the configured ref. 
+var datafilePaths = map[string]string{ + "1million.schema": "data/1million.schema", + "1million.rdf.gz": "data/1million.rdf.gz", + "21million.schema": "data/21million.schema", + "21million.rdf.gz": "data/21million.rdf.gz", } type DatasetType int @@ -592,35 +594,21 @@ func (d *Dataset) GqlSchemaPath() string { func (d *Dataset) ensureFile(filename string) string { fullPath := filepath.Join(datasetFilesPath, filename) - if exists, _ := fileExists(fullPath); !exists { - url, ok := datafiles[filename] + if !testutil.FileExistsAndValid(fullPath) { + repoPath, ok := datafilePaths[filename] if !ok { - panic(fmt.Sprintf("dataset file %s not found in datafiles map", filename)) + panic(fmt.Sprintf("dataset file %s not found in datafilePaths map", filename)) } - if err := downloadFile(filename, url); err != nil { + ref := testutil.BenchmarkDataRef("") + var err error + if strings.HasSuffix(filename, ".rdf.gz") { + err = testutil.DownloadLFSFile(filename, ref, repoPath, datasetFilesPath) + } else { + err = testutil.DownloadFile(filename, testutil.BenchmarkRawURL(ref, repoPath), datasetFilesPath) + } + if err != nil { panic(fmt.Sprintf("failed to download %s: %v", filename, err)) } } return fullPath } - -func downloadFile(fname, url string) error { - const maxRetries = 3 - fpath := filepath.Join(datasetFilesPath, fname) - for attempt := 1; attempt <= maxRetries; attempt++ { - cmd := exec.Command("wget", "--tries=3", "--waitretry=5", "--retry-connrefused", "-O", fname, url) - cmd.Dir = datasetFilesPath - - if out, err := cmd.CombinedOutput(); err != nil { - log.Printf("attempt %d/%d failed to download %s: %v\n%s", attempt, maxRetries, fname, err, string(out)) - if attempt < maxRetries { - time.Sleep(time.Duration(attempt*5) * time.Second) - continue - } - _ = os.Remove(fpath) - return fmt.Errorf("error downloading file %s after %d attempts: %w", fname, maxRetries, err) - } - return nil - } - return nil -} diff --git a/t/t.go b/t/t.go index cad64a1e1eb..adc9ce6bef4 100644 --- 
a/t/t.go +++ b/t/t.go @@ -33,6 +33,7 @@ import ( "github.com/docker/docker/client" "github.com/golang/glog" "github.com/spf13/pflag" + "golang.org/x/sync/errgroup" "golang.org/x/tools/go/packages" "github.com/dgraph-io/dgraph/v25/testutil" @@ -93,7 +94,13 @@ var ( "unit = true unit tests only (no Docker, no integration tag). "+ "integration = everything except ldbc, load, and systest-heavy (with Docker). "+ "systest = systest-baseline + systest-heavy.") - tmp = pflag.String("tmp", "", "Temporary directory used to download data.") + tmp = pflag.String("tmp", "", "Temporary directory used to download data.") + keepData = pflag.Bool("keep-data", false, + "If true, do not remove the data directory after tests complete. "+ + "Useful in CI where data is cached between runs.") + dataRef = pflag.String("data-ref", "", + "Git ref (branch, tag, or SHA) for dgraph-benchmarks data. "+ + "Overrides DGRAPH_TEST_DATA_REF env var and benchmark-data-version file.") downloadResources = pflag.BoolP("download", "d", true, "Flag to specify whether to download resources or not") race = pflag.Bool("race", false, "Set true to build with race") @@ -1121,18 +1128,17 @@ func isHeavyPackage(pkg string) bool { return false } -var datafiles = map[string]string{ - "1million-noindex.schema": "https://raw.githubusercontent.com/dgraph-io/dgraph-benchmarks/refs/heads/main/data/1million-noindex.schema", - "1million.schema": "https://raw.githubusercontent.com/dgraph-io/dgraph-benchmarks/refs/heads/main/data/1million.schema", - "1million.rdf.gz": "https://media.githubusercontent.com/media/dgraph-io/dgraph-benchmarks/refs/heads/main/data/1million.rdf.gz", - "21million.schema": "https://raw.githubusercontent.com/dgraph-io/dgraph-benchmarks/refs/heads/main/data/21million.schema", - "21million.rdf.gz": "https://media.githubusercontent.com/media/dgraph-io/dgraph-benchmarks/refs/heads/main/data/21million.rdf.gz", +// datafilePaths maps filenames to their paths inside the dgraph-benchmarks repo. 
+// URLs are constructed at runtime from BenchmarkDataRef(). +var datafilePaths = map[string]string{ + "1million-noindex.schema": "data/1million-noindex.schema", + "1million.schema": "data/1million.schema", + "1million.rdf.gz": "data/1million.rdf.gz", + "21million.schema": "data/21million.schema", + "21million.rdf.gz": "data/21million.rdf.gz", } -var baseUrl = "https://media.githubusercontent.com/media/dgraph-io/dgraph-benchmarks/refs/heads/main/ldbc/sf0.3/ldbc_rdf_0.3/" -var suffix = "?raw=true" - -var rdfFileNames = [...]string{ +var ldbcRdfFileNames = [...]string{ "Deltas.rdf", "comment_0.rdf", "containerOf_0.rdf", @@ -1156,83 +1162,81 @@ var rdfFileNames = [...]string{ "studyAt_0.rdf", "tag_0.rdf", "tagclass_0.rdf", - "workAt_0.rdf"} - -var ldbcDataFiles = map[string]string{ - "ldbcTypes.schema": "https://media.githubusercontent.com/media/dgraph-io/dgraph-benchmarks/refs/heads/main/ldbc/sf0.3/ldbcTypes.schema", + "workAt_0.rdf", } -func wgetWithRetry(fname, url, dir string) error { - const maxRetries = 3 - fpath := filepath.Join(dir, fname) - for attempt := 1; attempt <= maxRetries; attempt++ { - cmd := exec.Command("wget", "--tries=3", "--waitretry=5", "--retry-connrefused", "-O", fname, url) - cmd.Dir = dir - if out, err := cmd.CombinedOutput(); err != nil { - fmt.Printf("attempt %d/%d failed to download %s: %v\n%s\n", attempt, maxRetries, fname, err, string(out)) - if attempt < maxRetries { - time.Sleep(time.Duration(attempt*5) * time.Second) - continue - } - _ = os.Remove(fpath) - return fmt.Errorf("failed to download %s after %d attempts: %w", fname, maxRetries, err) - } - return nil - } - return nil +// ldbcFilePaths maps filenames to their paths inside the dgraph-benchmarks repo. 
+var ldbcFilePaths = map[string]string{ + "ldbcTypes.schema": "ldbc/sf0.3/ldbcTypes.schema", } -func downloadDataFiles() { +func downloadDataFiles() error { if !*downloadResources { fmt.Print("Skipping downloading of resources\n") - return + return nil } - for fname, link := range datafiles { + ref := testutil.BenchmarkDataRef(*dataRef) + fmt.Printf("Using benchmark data ref: %s\n", ref) + for fname, repoPath := range datafilePaths { fpath := filepath.Join(*tmp, fname) - if fi, err := os.Stat(fpath); err == nil && fi.Size() > 0 { + if testutil.FileExistsAndValid(fpath) { fmt.Printf("Skipping %s (already exists)\n", fname) continue } - if err := wgetWithRetry(fname, link, *tmp); err != nil { - panic(fmt.Sprintf("error downloading %s: %v", fname, err)) + var err error + if strings.HasSuffix(fname, ".rdf.gz") { + err = testutil.DownloadLFSFile(fname, ref, repoPath, *tmp) + } else { + err = testutil.DownloadFile(fname, testutil.BenchmarkRawURL(ref, repoPath), *tmp) + } + if err != nil { + return fmt.Errorf("error downloading %s: %v", fname, err) } } + return nil } -func downloadLDBCFiles(dir string) { +func downloadLDBCFiles(dir string) error { if !*downloadResources { fmt.Print("Skipping downloading of resources\n") - return + return nil } - for _, name := range rdfFileNames { - ldbcDataFiles[name] = baseUrl + name + suffix + ref := testutil.BenchmarkDataRef(*dataRef) + fmt.Printf("Using benchmark data ref: %s\n", ref) + + // All LDBC files (schema + RDF) are LFS-tracked. 
+	allFiles := make(map[string]string)
+	for fname, repoPath := range ldbcFilePaths {
+		allFiles[fname] = repoPath
+	}
+	for _, name := range ldbcRdfFileNames {
+		allFiles[name] = "ldbc/sf0.3/ldbc_rdf_0.3/" + name
 	}
 
 	start := time.Now()
-	sem := make(chan struct{}, 5)
-	var wg sync.WaitGroup
-	for fname, link := range ldbcDataFiles {
+	g, _ := errgroup.WithContext(context.Background())
+	g.SetLimit(5)
+	for fname, repoPath := range allFiles {
 		fpath := filepath.Join(dir, fname)
-		if fi, err := os.Stat(fpath); err == nil && fi.Size() > 0 {
+		if testutil.FileExistsAndValid(fpath) {
 			fmt.Printf("Skipping %s (already exists)\n", fname)
 			continue
 		}
-		wg.Add(1)
-		go func(fname, link string) {
-			defer wg.Done()
-			sem <- struct{}{}
-			defer func() { <-sem }()
-
+		g.Go(func() error {
 			dlStart := time.Now()
-			if err := wgetWithRetry(fname, link, dir); err != nil {
-				panic(fmt.Sprintf("error downloading %s: %v", fname, err))
+			if err := testutil.DownloadLFSFile(fname, ref, repoPath, dir); err != nil {
+				return fmt.Errorf("error downloading %s: %v", fname, err)
 			}
-			fmt.Printf("Downloaded %s to %s in %s \n", fname, dir, time.Since(dlStart))
-		}(fname, link)
+			fmt.Printf("Downloaded %s to %s in %s\n", fname, dir, time.Since(dlStart))
+			return nil
+		})
 	}
-	wg.Wait()
-	fmt.Printf("Downloaded %d files in %s \n", len(ldbcDataFiles), time.Since(start))
+	if err := g.Wait(); err != nil {
+		return err
+	}
+	fmt.Printf("Downloaded %d files in %s\n", len(allFiles), time.Since(start))
+	return nil
 }
 
 func createTestCoverageFile(path string) error {
@@ -1419,7 +1423,10 @@ func run() error {
 		x.Check(os.MkdirAll(*tmp, 0755))
 	}
 	if testSuiteContainsAny("load", "all") {
-		downloadDataFiles()
+		if err := downloadDataFiles(); err != nil {
+			fmt.Printf("Failed to download data files: %v\n", err)
+			return err
+		}
 	}
 	if testSuiteContainsAny("ldbc", "all") {
 		// LDBC files go into a subdirectory because the LDBC test bulk-loads
 		// the 1million dataset first, and re-using the same p directory
 		// with LDBC data causes schema mismatches.
 		ldbcDir := filepath.Join(*tmp, "ldbc")
 		x.Check(os.MkdirAll(ldbcDir, 0755))
-		downloadLDBCFiles(ldbcDir)
+		if err := downloadLDBCFiles(ldbcDir); err != nil {
+			fmt.Printf("Failed to download LDBC files: %v\n", err)
+			return err
+		}
 	}
 	for i, task := range valid {
 		select {
@@ -1478,6 +1488,9 @@
 	procId = rand.Intn(1000)
 
 	err := run()
+	if !*keepData && *tmp != "" {
+		_ = os.RemoveAll(*tmp)
+	}
 	if err != nil {
 		os.Exit(1)
 	}
diff --git a/testutil/benchmark-data-version b/testutil/benchmark-data-version
new file mode 100644
index 00000000000..ba2906d0666
--- /dev/null
+++ b/testutil/benchmark-data-version
@@ -0,0 +1 @@
+main
diff --git a/testutil/download.go b/testutil/download.go
new file mode 100644
index 00000000000..ee4ffac217d
--- /dev/null
+++ b/testutil/download.go
@@ -0,0 +1,260 @@
+/*
+ * SPDX-FileCopyrightText: © 2017-2025 Istari Digital, Inc.
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package testutil
+
+import (
+	"bytes"
+	"crypto/sha256"
+	"encoding/hex"
+	"fmt"
+	"io"
+	"log"
+	"net/http"
+	"os"
+	"os/exec"
+	"path/filepath"
+	"runtime"
+	"strconv"
+	"strings"
+	"time"
+)
+
+const (
+	defaultMaxRetries = 3
+
+	benchmarkRepo = "dgraph-io/dgraph-benchmarks"
+
+	// Environment variable to override the benchmark data ref at runtime.
+	envDataRef = "DGRAPH_TEST_DATA_REF"
+
+	// benchmarkDataVersionFile is the filename (co-located with this source file
+	// in testutil/) that pins the git ref for benchmark data downloads.
+	// Changing its contents invalidates CI caches (workflows key on
+	// hashFiles('testutil/benchmark-data-version')).
+	//
+	// To override at runtime without editing the file:
+	//   - Set the DGRAPH_TEST_DATA_REF environment variable, or
+	//   - Pass --data-ref= to the t test runner.
+	benchmarkDataVersionFile = "benchmark-data-version"
+)
+
+var (
+	// lfsPointerPrefix is the first line of every Git LFS pointer file.
+	lfsPointerPrefix = []byte("version https://git-lfs.github.com")
+
+	// gzipMagic is the two-byte header for gzip files.
+	gzipMagic = []byte{0x1f, 0x8b}
+)
+
+// testutilDir returns the directory containing this source file (testutil/).
+func testutilDir() string {
+	_, thisFile, _, _ := runtime.Caller(0)
+	return filepath.Dir(thisFile)
+}
+
+// BenchmarkDataRef returns the git ref (branch, tag, or SHA) to use when
+// downloading benchmark data from dgraph-benchmarks. Resolution order:
+//  1. refOverride argument (non-empty string, e.g. from a --data-ref CLI flag)
+//  2. DGRAPH_TEST_DATA_REF environment variable
+//  3. Contents of testutil/benchmark-data-version file
+//  4. Falls back to "main"
+func BenchmarkDataRef(refOverride string) string {
+	if refOverride != "" {
+		return refOverride
+	}
+	if v := os.Getenv(envDataRef); v != "" {
+		return v
+	}
+	versionFile := filepath.Join(testutilDir(), benchmarkDataVersionFile)
+	if data, err := os.ReadFile(versionFile); err == nil {
+		if ref := strings.TrimSpace(string(data)); ref != "" {
+			return ref
+		}
+	}
+	return "main"
+}
+
+// BenchmarkRawURL returns a raw.githubusercontent.com URL for a non-LFS file
+// in the dgraph-benchmarks repo at the given ref.
+func BenchmarkRawURL(ref, path string) string {
+	return fmt.Sprintf("https://raw.githubusercontent.com/%s/%s/%s", benchmarkRepo, ref, path)
+}
+
+// BenchmarkLFSURL returns a media.githubusercontent.com URL for an LFS-tracked
+// file in the dgraph-benchmarks repo at the given ref.
+func BenchmarkLFSURL(ref, path string) string {
+	return fmt.Sprintf("https://media.githubusercontent.com/media/%s/%s/%s", benchmarkRepo, ref, path)
+}
+
+// DownloadFile downloads a file from url into dir/fname using wget with retry
+// logic (3 Go-level attempts, exponential backoff). wget itself uses --tries=1
+// so there is a single retry layer. On final failure, any partial file is
+// removed to prevent corrupt data from persisting in caches.
+// +// After a successful download the file is validated with ValidateFile; if +// validation fails the file is removed and an error is returned. +func DownloadFile(fname, url, dir string) error { + fpath := filepath.Join(dir, fname) + for attempt := 1; attempt <= defaultMaxRetries; attempt++ { + cmd := exec.Command("wget", "--tries=1", "--retry-connrefused", "-O", fname, url) + cmd.Dir = dir + if out, err := cmd.CombinedOutput(); err != nil { + log.Printf("attempt %d/%d failed to download %s: %v\n%s", + attempt, defaultMaxRetries, fname, err, string(out)) + if attempt < defaultMaxRetries { + time.Sleep(time.Duration(attempt*5) * time.Second) + continue + } + _ = os.Remove(fpath) + return fmt.Errorf("failed to download %s after %d attempts: %w", fname, defaultMaxRetries, err) + } + if err := ValidateFile(fpath); err != nil { + _ = os.Remove(fpath) + return fmt.Errorf("downloaded file %s is invalid: %w", fname, err) + } + return nil + } + return nil +} + +// lfsPointerInfo holds the expected SHA256 and size parsed from an LFS pointer. +type lfsPointerInfo struct { + SHA256 string + Size int64 +} + +// fetchLFSPointer fetches the raw LFS pointer file for the given repo path +// and ref, then parses out the SHA256 hash and file size. 
+func fetchLFSPointer(ref, repoPath string) (*lfsPointerInfo, error) { + pointerURL := BenchmarkRawURL(ref, repoPath) + resp, err := http.Get(pointerURL) + if err != nil { + return nil, fmt.Errorf("fetching LFS pointer: %w", err) + } + defer resp.Body.Close() + if resp.StatusCode != http.StatusOK { + return nil, fmt.Errorf("LFS pointer returned status %d", resp.StatusCode) + } + + body, err := io.ReadAll(resp.Body) + if err != nil { + return nil, fmt.Errorf("reading LFS pointer body: %w", err) + } + + info := &lfsPointerInfo{} + for _, line := range strings.Split(string(body), "\n") { + line = strings.TrimSpace(line) + if strings.HasPrefix(line, "oid sha256:") { + info.SHA256 = strings.TrimPrefix(line, "oid sha256:") + } + if strings.HasPrefix(line, "size ") { + info.Size, err = strconv.ParseInt(strings.TrimPrefix(line, "size "), 10, 64) + if err != nil { + return nil, fmt.Errorf("parsing LFS pointer size: %w", err) + } + } + } + if info.SHA256 == "" || info.Size == 0 { + return nil, fmt.Errorf("LFS pointer missing oid or size fields") + } + return info, nil +} + +// verifyFileChecksum computes the SHA256 of the file at fpath and compares it +// against the expected hash. Also verifies the file size matches. +func verifyFileChecksum(fpath string, expected *lfsPointerInfo) error { + fi, err := os.Stat(fpath) + if err != nil { + return err + } + if fi.Size() != expected.Size { + return fmt.Errorf("size mismatch: local %d != expected %d", fi.Size(), expected.Size) + } + + f, err := os.Open(fpath) + if err != nil { + return err + } + defer f.Close() + + h := sha256.New() + if _, err := io.Copy(h, f); err != nil { + return fmt.Errorf("computing SHA256: %w", err) + } + actual := hex.EncodeToString(h.Sum(nil)) + if actual != expected.SHA256 { + return fmt.Errorf("SHA256 mismatch: local %s != expected %s", actual, expected.SHA256) + } + return nil +} + +// DownloadLFSFile downloads an LFS-tracked file and verifies its integrity +// against the LFS pointer (SHA256 + size). 
It first fetches the tiny pointer +// from raw.githubusercontent.com to get the expected hash and size, then +// downloads the actual content from media.githubusercontent.com and verifies. +// +// This provides full integrity verification: truncation, corruption, and +// wrong-file detection are all caught. +func DownloadLFSFile(fname, ref, repoPath, dir string) error { + pointer, err := fetchLFSPointer(ref, repoPath) + if err != nil { + log.Printf("warning: could not fetch LFS pointer for %s, falling back to basic download: %v", fname, err) + return DownloadFile(fname, BenchmarkLFSURL(ref, repoPath), dir) + } + + url := BenchmarkLFSURL(ref, repoPath) + if err := DownloadFile(fname, url, dir); err != nil { + return err + } + + fpath := filepath.Join(dir, fname) + if err := verifyFileChecksum(fpath, pointer); err != nil { + _ = os.Remove(fpath) + return fmt.Errorf("integrity check failed for %s: %w", fname, err) + } + log.Printf("verified %s: SHA256=%s size=%d", fname, pointer.SHA256, pointer.Size) + return nil +} + +// FileExistsAndValid returns true if the file at fpath exists, is a regular +// file, has size > 0, and passes ValidateFile content checks. +func FileExistsAndValid(fpath string) bool { + fi, err := os.Stat(fpath) + if err != nil || fi.IsDir() || fi.Size() == 0 { + return false + } + return ValidateFile(fpath) == nil +} + +// ValidateFile performs content-level integrity checks on a downloaded file: +// - Rejects Git LFS pointer files (text stubs left when LFS content wasn't fetched). +// - Verifies .gz files start with the gzip magic bytes (0x1f 0x8b). 
+func ValidateFile(fpath string) error { + f, err := os.Open(fpath) + if err != nil { + return err + } + defer f.Close() + + header := make([]byte, 64) + n, err := f.Read(header) + if err != nil { + return fmt.Errorf("reading file header: %w", err) + } + header = header[:n] + + if bytes.HasPrefix(header, lfsPointerPrefix) { + return fmt.Errorf("file is a Git LFS pointer, not actual content") + } + + if strings.HasSuffix(fpath, ".gz") { + if n < 2 || !bytes.HasPrefix(header, gzipMagic) { + return fmt.Errorf("file does not have valid gzip header") + } + } + + return nil +}