
Commit 940a084

damian0815 and Damian Stewart authored
feat: util/are_part_min_max_increasing.py tool
Add a tool to check whether row-group `.min`/`.max` statistics for a particular column (e.g. `url_surtkey`) are increasing within each parquet file passed as input. Note that this tool only checks that, within a single `.parquet` file, each row group's `.min` is greater than or equal to the previous row group's `.max`. For tables that consist of multiple parquet files, the condition may not hold across file boundaries.

Signed-off-by: Damian Stewart <ot@damianstewart.com>
Co-authored-by: Damian Stewart <ot@damianstewart.com>
1 parent eac5b01 commit 940a084
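
To make the checked condition concrete, here is a minimal illustrative sketch (the SURT-style values are made up): a file passes only if each row group's min is at least the previous row group's max.

# Hypothetical (min, max) statistics per row group for the sort column:
stats = [("com,a)/", "com,f)/"), ("com,f)/", "com,m)/"), ("com,k)/", "com,z)/")]

# condition: every row group's min >= the previous row group's max
ok = all(prev_max <= cur_min
         for (_, prev_max), (cur_min, _) in zip(stats, stats[1:]))
print(ok)  # False: the third group's min "com,k)/" is below the second group's max "com,m)/"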

File tree

7 files changed, +264 -0 lines changed


.github/workflows/python_test.yml

Lines changed: 37 additions & 0 deletions
@@ -0,0 +1,37 @@
name: Python Unit Tests

on:
  push:
    branches: [ main ]
  pull_request:
    branches: [ main ]
  workflow_dispatch:

jobs:
  test:
    runs-on: ubuntu-latest
    strategy:
      matrix:
        python-version: ['3.10', '3.11', '3.12', '3.13']
      fail-fast: false

    steps:
      - uses: actions/checkout@v4

      - name: Set up Python ${{ matrix.python-version }}
        uses: actions/setup-python@v5
        with:
          python-version: ${{ matrix.python-version }}

      - name: Install uv
        run: |
          curl -LsSf https://astral.sh/uv/install.sh | sh
          echo "$HOME/.cargo/bin" >> $GITHUB_PATH

      - name: Install dependencies
        run: |
          uv sync

      - name: Run tests
        run: |
          uv run -m pytest src -v

README.md

Lines changed: 6 additions & 0 deletions
@@ -314,3 +314,9 @@ It's also possible to pass the result of SQL query as a CSV file, e.g., an Athen
...
```

## Part Row Group Test

This repository also includes a tool to check whether the row groups in a given table part parquet file have increasing min/max values; see `util/are_part_min_max_increasing.py`.

Note that this tool only checks that, within a single `.parquet` file, each row group's `.min` is greater than or equal to the previous row group's `.max`. For tables that consist of multiple parquet files, the condition may not hold across file boundaries.
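
For orientation, a minimal programmatic sketch of the same check (the manifest filename below is hypothetical; the tool's CLI roughly wraps these two calls):

from util.are_part_min_max_increasing import read_file_list, are_all_parts_min_max_ordered

# hypothetical manifest: one parquet path per line, each joined with the prefix
files = read_file_list("part-list.txt", prefix="s3://commoncrawl/")
ok = are_all_parts_min_max_ordered(files, sort_column_name="url_surtkey")
print("sorted" if ok else "NOT sorted")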

pyproject.toml

Lines changed: 11 additions & 0 deletions
@@ -0,0 +1,11 @@
[project]
name = "cc-index-table"
version = "0.1.0"
description = "Tools for working with Common Crawl index tables."
requires-python = ">=3.12"
dependencies = [
    "boto3>=1.40.61",
    "pyarrow>=22.0.0",
    "pytest>=8.4.2",
    "tqdm>=4.67.1",
]

src/util/__init__.py

Whitespace-only changes.
src/util/are_part_min_max_increasing.py

Lines changed: 96 additions & 0 deletions
@@ -0,0 +1,96 @@
from collections import defaultdict

import pyarrow.parquet as pq
import argparse

from urllib.parse import urlparse
import boto3
import gzip
from tqdm.auto import tqdm


class NoStatisticsException(Exception):
    pass


def are_parquet_file_row_groups_min_max_ordered(pf: pq.ParquetFile, column_name: str) -> bool:
    """Return True if each row group's min on `column_name` is >= the previous row group's max."""
    sort_column_index = next(i for i, name in enumerate(pf.schema.names)
                             if name == column_name)

    # keep track of min/max in this ParquetFile
    whole_min = None
    whole_max = None
    prev_max = None
    for row_group_index in range(pf.num_row_groups):
        row_group = pf.metadata.row_group(row_group_index)
        column = row_group.column(sort_column_index)
        if column.statistics.min is None or column.statistics.max is None:
            print(f"row group {row_group_index} has null min/max statistics on {column_name}, skipping")
            continue
        if prev_max is not None and prev_max > column.statistics.min:
            print(f"row group {row_group_index} min is less than the previous row group's max on {column_name}: '{column.statistics.min}' < '{prev_max}'; stopping")
            return False
        whole_min = column.statistics.min if whole_min is None else min(column.statistics.min, whole_min)
        whole_max = column.statistics.max if whole_max is None else max(column.statistics.max, whole_max)
        prev_max = column.statistics.max
    return True


def are_all_parts_min_max_ordered(file_or_s3_url_list: list[str], sort_column_name: str) -> bool:
    """Check every file in the list; return False if any file's row groups are internally unordered."""
    is_sorted = True
    status = defaultdict(int)
    with tqdm(file_or_s3_url_list) as pbar:
        for file_or_url in pbar:
            pf = pq.ParquetFile(file_or_url)
            this_is_sorted = are_parquet_file_row_groups_min_max_ordered(pf, column_name=sort_column_name)
            if not this_is_sorted:
                print(
                    f"Row groups are *internally* not ordered by min/max in file {file_or_url}"
                )
                is_sorted = False
                status['internally_unsorted'] += 1

            pbar.set_postfix(status)
    return is_sorted


def is_gzip(content: bytes) -> bool:
    # gzip magic bytes
    return content[:2] == b'\x1f\x8b'


def read_file_list(path_or_url: str, prefix: str) -> list[str]:
    """Read a (possibly gzipped) list of paths from a local file or s3:// URL, prepending `prefix` to each entry."""
    parsed = urlparse(path_or_url)
    if parsed.scheme == "s3":
        s3 = boto3.client("s3")
        bucket = parsed.netloc
        key = parsed.path.lstrip("/")
        obj = s3.get_object(Bucket=bucket, Key=key)
        content = obj["Body"].read()
    else:
        with open(path_or_url, "rb") as f:
            content = f.read()

    if is_gzip(content):
        content = gzip.decompress(content)
    lines = content.decode("utf-8").split("\n")
    return [prefix + line.strip() for line in lines if len(line.strip()) > 0]


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Check that, within each parquet file, every row group's min on the given column is greater than or equal to the previous row group's max. Exit code is 0 if sorted, 1 if not sorted.")
    parser.add_argument("files_or_s3_urls_file", type=str, help="Path or s3:// URI to a text file containing a list of paths to check; used in combination with --prefix to recover individual file paths.")
    parser.add_argument("--is_single_parquet_file", action=argparse.BooleanOptionalAction, default=False, help="If passed, treat the input as a single parquet file")
    parser.add_argument("--prefix", type=str, default="s3://commoncrawl/", help="Prefix to prepend to entries read from the file (default: 's3://commoncrawl/')")
    parser.add_argument("--column", type=str, default="url_surtkey", help="Column name to check against (default: 'url_surtkey')")

    args = parser.parse_args()

    if args.is_single_parquet_file:
        files = [args.files_or_s3_urls_file]
    else:
        files = read_file_list(args.files_or_s3_urls_file, prefix=args.prefix)
    is_sorted = are_all_parts_min_max_ordered(files, sort_column_name=args.column)
    if is_sorted:
        print("✅ Files are sorted")
        exit(0)
    else:
        print("❌ Files are NOT sorted")
        exit(1)
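
As a quick single-file usage sketch (the part filename is hypothetical), the check can also be called directly, which is roughly what --is_single_parquet_file does:

import pyarrow.parquet as pq
from util.are_part_min_max_increasing import are_parquet_file_row_groups_min_max_ordered

# hypothetical local part file; any source accepted by pq.ParquetFile works
pf = pq.ParquetFile("part-00000.parquet")
print(are_parquet_file_row_groups_min_max_ordered(pf, column_name="url_surtkey"))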

src/util/test/__init__.py

Whitespace-only changes.
src/util/test/test_are_part_min_max_increasing.py

Lines changed: 114 additions & 0 deletions
@@ -0,0 +1,114 @@
import random
from unittest.mock import MagicMock, patch

from util.are_part_min_max_increasing import are_parquet_file_row_groups_min_max_ordered, are_all_parts_min_max_ordered


def _create_mock_parquet_file(column_name: str, row_groups_stats: list[tuple[str, str]]):
    mock_pf = MagicMock()
    mock_pf.schema.names = [column_name]
    mock_pf.num_row_groups = len(row_groups_stats)

    mock_row_groups = []
    for min_val, max_val in row_groups_stats:
        mock_row_group = MagicMock()
        mock_column = MagicMock()
        mock_column.statistics.min = min_val
        mock_column.statistics.max = max_val
        mock_row_group.column.return_value = mock_column
        mock_row_groups.append(mock_row_group)

    mock_pf.metadata.row_group.side_effect = lambda i: mock_row_groups[i]
    return mock_pf


def test_single_row_group_sorted():
    mock_pf = _create_mock_parquet_file('url_surtkey', [('a', 'b')])
    is_sorted = are_parquet_file_row_groups_min_max_ordered(mock_pf, column_name='url_surtkey')
    assert is_sorted


def test_row_groups_sorted():
    all_row_groups_stats = [('a', 'b'), ('c', 'd'), ('e', 'f'), ('g', 'h')]
    for n in range(1, len(all_row_groups_stats)):
        row_groups_stats = all_row_groups_stats[:n]
        mock_pf = _create_mock_parquet_file('url_surtkey', row_groups_stats)
        is_sorted = are_parquet_file_row_groups_min_max_ordered(mock_pf, column_name='url_surtkey')
        assert is_sorted


def test_row_groups_unsorted():
    all_row_groups_stats = [('a', 'b'), ('c', 'd'), ('e', 'f'), ('g', 'h')]
    count = 0
    while count < 100:
        for n in range(2, len(all_row_groups_stats)):
            row_groups_stats = all_row_groups_stats[:n].copy()
            random.shuffle(row_groups_stats)
            if row_groups_stats == all_row_groups_stats[:n]:
                # shuffle resulted in same order, try again
                continue

            mock_pf = _create_mock_parquet_file('url_surtkey', row_groups_stats)
            is_sorted = are_parquet_file_row_groups_min_max_ordered(mock_pf, column_name='url_surtkey')
            assert not is_sorted

        count += 1


def test_row_groups_null_stats_inorder():
    row_groups = [('a', 'b'), (None, None), ('c', 'd')]
    mock_pf = _create_mock_parquet_file('url_surtkey', row_groups)
    is_sorted = are_parquet_file_row_groups_min_max_ordered(mock_pf, column_name='url_surtkey')
    assert is_sorted


def test_row_groups_null_stats_out_of_order():
    row_groups = [('c', 'd'), (None, None), ('a', 'b')]
    mock_pf = _create_mock_parquet_file('url_surtkey', row_groups)
    is_sorted = are_parquet_file_row_groups_min_max_ordered(mock_pf, column_name='url_surtkey')
    assert not is_sorted


def test_row_groups_overlapping_min_max():
    row_groups = [('a', 'b'), ('b', 'd'), ('e', 'f'), ('g', 'h')]
    mock_pf = _create_mock_parquet_file('url_surtkey', row_groups)
    is_sorted = are_parquet_file_row_groups_min_max_ordered(mock_pf, column_name='url_surtkey')
    assert is_sorted


def test_row_groups_overlapping():
    row_groups = [('a', 'c'), ('b', 'd')]
    mock_pf = _create_mock_parquet_file('url_surtkey', row_groups)
    is_sorted = are_parquet_file_row_groups_min_max_ordered(mock_pf, column_name='url_surtkey')
    assert not is_sorted


def test_ordered_files_sorted():
    files_config = {
        '/data/a': [('aaa', 'bbb'), ('bbc', 'ccc')],
        '/data/b': [('ccd', 'ddd'), ('dde', 'eee')],
        '/data/c': [('eef', 'fff'), ('ffg', 'ggg')],
    }

    def mock_parquet_file(path):
        return _create_mock_parquet_file('url_surtkey', files_config[path])

    with patch('pyarrow.parquet.ParquetFile', side_effect=mock_parquet_file):
        result = are_all_parts_min_max_ordered(['/data/a', '/data/b', '/data/c'], 'url_surtkey')
        assert result


def test_ordered_files_unsorted():
    files_config = {
        '/data/a': [('aaa', 'bbb'), ('bbc', 'ccc')],
        '/data/b': [('ccd', 'ddd'), ('dde', 'eee')],
        '/data/c': [('eef', 'fff'), ('ffg', 'ggg')],
    }

    def mock_parquet_file(path):
        return _create_mock_parquet_file('url_surtkey', files_config[path])

    with patch('pyarrow.parquet.ParquetFile', side_effect=mock_parquet_file):
        result = are_all_parts_min_max_ordered(['/data/a', '/data/c', '/data/b'], 'url_surtkey')
        assert result  # we don't care about the order of files
