Add a tool to check if row groups .min / .max are strictly increasing within a parquet file (#40)
New file (`@@ -0,0 +1,37 @@`): GitHub Actions workflow for the unit tests.

```yaml
name: Python Unit Tests

on:
  push:
    branches: [ main ]
  pull_request:
    branches: [ main ]
  workflow_dispatch:

jobs:
  test:
    runs-on: ubuntu-latest
    strategy:
      matrix:
        python-version: ['3.10', '3.11', '3.12', '3.13']
      fail-fast: false

    steps:
      - uses: actions/checkout@v4

      - name: Set up Python ${{ matrix.python-version }}
        uses: actions/setup-python@v5
        with:
          python-version: ${{ matrix.python-version }}

      - name: Install uv
        run: |
          curl -LsSf https://astral.sh/uv/install.sh | sh
          echo "$HOME/.cargo/bin" >> $GITHUB_PATH

      - name: Install dependencies
        run: |
          uv sync

      - name: Run tests
        run: |
          uv run -m pytest src -v
```
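The CI run can be reproduced locally with the same commands the workflow executes (assuming `uv` is already installed):

```sh
uv sync                    # install dependencies from pyproject.toml
uv run -m pytest src -v    # run the unit tests
```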
New file (`@@ -0,0 +1,11 @@`): project configuration (`pyproject.toml`).

```toml
[project]
name = "cc-index-table"
version = "0.1.0"
description = "Tools for working with Common Crawl index tables."
requires-python = ">=3.12"
dependencies = [
    "boto3>=1.40.61",
    "pyarrow>=22.0.0",
    "pytest>=8.4.2",
    "tqdm>=4.67.1",
]
```
New file (`@@ -0,0 +1,96 @@`): the checker script, imported by the tests as `util.are_part_min_max_increasing`.

```python
from collections import defaultdict

import pyarrow.parquet as pq
import argparse

from urllib.parse import urlparse
import boto3
import gzip
from tqdm.auto import tqdm


class NoStatisticsException(Exception):
    pass


def are_parquet_file_row_groups_min_max_ordered(pf: pq.ParquetFile, column_name: str) -> bool:
    sort_column_index = next(i for i, name in enumerate(pf.schema.names)
                             if name == column_name)

    # keep track of min/max in this ParquetFile
    whole_min = None
    whole_max = None
    prev_max = None
    for row_group_index in range(pf.num_row_groups):
        row_group = pf.metadata.row_group(row_group_index)
        column = row_group.column(sort_column_index)
        if column.statistics.min is None or column.statistics.max is None:
            print(f"row group {row_group_index} has null min/max statistics on {column_name}, skipping")
            continue
        if prev_max is not None and prev_max > column.statistics.min:
```
Review thread on the missing-statistics handling above:

**Contributor:** Must be prepared that no min/max statistics are available in a row group, because of overlong URLs / SURT keys. Cf. PARQUET-1685.

**Contributor (author):** OK, I can reproduce. What's the expected behaviour if `column.statistics.min` is `None`?

**Contributor (author):** I added logic to skip the row.

**Contributor:** 👍 If there are no statistics it should not fail. Skipping and reporting the row is fine.
```python
            print(f"row group {row_group_index} min is not strictly increasing w.r.t previous row group max on {column_name}: '{column.statistics.min}' <= '{prev_max}' ; stopping")
            return False
        whole_min = column.statistics.min if whole_min is None else min(column.statistics.min, whole_min)
        whole_max = column.statistics.max if whole_max is None else max(column.statistics.max, whole_max)
        prev_max = column.statistics.max
    return True


def are_all_parts_min_max_ordered(file_or_s3_url_list: list[str], sort_column_name: str) -> bool:
    is_sorted = True
    status = defaultdict(int)
    with tqdm(file_or_s3_url_list) as pbar:
        for file_or_url in pbar:
            pf = pq.ParquetFile(file_or_url)
            this_is_sorted = are_parquet_file_row_groups_min_max_ordered(pf, column_name=sort_column_name)
            if not this_is_sorted:
                print(
                    f"Row groups are *internally* not ordered by min/max in file {file_or_url}"
                )
                is_sorted = False
                status['internally_unsorted'] += 1

            pbar.set_postfix(status)
    return is_sorted


def is_gzip(content: bytes) -> bool:
    return content[:2] == b'\x1f\x8b'


def read_file_list(path_or_url: str, prefix: str) -> list[str]:
    parsed = urlparse(path_or_url)
    if parsed.scheme == "s3":
        s3 = boto3.client("s3")
        bucket = parsed.netloc
        key = parsed.path.lstrip("/")
        obj = s3.get_object(Bucket=bucket, Key=key)
        content = obj["Body"].read()
    else:
        with open(path_or_url, "rb") as f:
            content = f.read()

    if is_gzip(content):
        content = gzip.decompress(content)
    lines = content.decode("utf-8").split("\n")
    return [prefix + line.strip() for line in lines if len(line.strip()) > 0]


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Check if row groups within parquet files have strictly increasing non-overlapping min/max ranges. Exit code is 0 if sorted, 1 if not sorted.")
    parser.add_argument("files_or_s3_urls_file", type=str, help="path or s3:// URI to a text file containing a list of paths to check; used in combination with --prefix to recover individual file paths.")
    parser.add_argument("--is_single_parquet_file", action=argparse.BooleanOptionalAction, default=False, help="If passed, treat the input as a single parquet file")
    parser.add_argument("--prefix", type=str, default="s3://commoncrawl/", help="Prefix to prepend to entries read from the file (default: 's3://commoncrawl/')")
    parser.add_argument("--column", type=str, default="url_surtkey", help="Column name to check against (default: 'url_surtkey')")

    args = parser.parse_args()

    if args.is_single_parquet_file:
        files = [args.files_or_s3_urls_file]
    else:
        files = read_file_list(args.files_or_s3_urls_file, prefix=args.prefix)
    is_sorted = are_all_parts_min_max_ordered(files, sort_column_name=args.column)
    if is_sorted:
        print("✅ Files are sorted")
        exit(0)
    else:
        print("❌ Files are NOT sorted")
        exit(1)
```
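For illustration, a hedged usage sketch: the script path below is inferred from the test import (`util.are_part_min_max_increasing`) and the repository's `src` layout, and the input file names are hypothetical.

```sh
# Check all parquet parts listed in a (possibly gzipped) manifest; each line is
# prepended with --prefix (default 's3://commoncrawl/') to form the full URL.
uv run python src/util/are_part_min_max_increasing.py paths.txt.gz --column url_surtkey

# Check a single local parquet file instead of a manifest.
uv run python src/util/are_part_min_max_increasing.py part-00000.parquet --is_single_parquet_file
echo $?   # 0 if sorted, 1 if not
```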
New file (`@@ -0,0 +1,114 @@`): unit tests for the checker.

```python
import random
from unittest.mock import MagicMock, patch

from util.are_part_min_max_increasing import are_parquet_file_row_groups_min_max_ordered, are_all_parts_min_max_ordered


def _create_mock_parquet_file(column_name: str, row_groups_stats: list[tuple[str, str]]):
    mock_pf = MagicMock()
    mock_pf.schema.names = [column_name]
    mock_pf.num_row_groups = len(row_groups_stats)

    mock_row_groups = []
    for min_val, max_val in row_groups_stats:
        mock_row_group = MagicMock()
        mock_column = MagicMock()
        mock_column.statistics.min = min_val
        mock_column.statistics.max = max_val
        mock_row_group.column.return_value = mock_column
        mock_row_groups.append(mock_row_group)

    mock_pf.metadata.row_group.side_effect = lambda i: mock_row_groups[i]
    return mock_pf


def test_single_row_group_sorted():
    mock_pf = _create_mock_parquet_file('url_surtkey', [('a', 'b')])
    is_sorted = are_parquet_file_row_groups_min_max_ordered(mock_pf, column_name='url_surtkey')
    assert is_sorted


def test_row_groups_sorted():
    all_row_groups_stats = [('a', 'b'), ('c', 'd'), ('e', 'f'), ('g', 'h')]
    for n in range(1, len(all_row_groups_stats)):
        row_groups_stats = all_row_groups_stats[:n]
        mock_pf = _create_mock_parquet_file('url_surtkey', row_groups_stats)
        is_sorted = are_parquet_file_row_groups_min_max_ordered(mock_pf, column_name='url_surtkey')
        assert is_sorted


def test_row_groups_unsorted():
    all_row_groups_stats = [('a', 'b'), ('c', 'd'), ('e', 'f'), ('g', 'h')]
    count = 0
    while count < 100:
        for n in range(2, len(all_row_groups_stats)):
            row_groups_stats = all_row_groups_stats[:n].copy()
            random.shuffle(row_groups_stats)
            if row_groups_stats == all_row_groups_stats[:n]:
                # shuffle resulted in same order, try again
                continue

            mock_pf = _create_mock_parquet_file('url_surtkey', row_groups_stats)
            is_sorted = are_parquet_file_row_groups_min_max_ordered(mock_pf, column_name='url_surtkey')
            assert not is_sorted

        count += 1


def test_row_groups_null_stats_inorder():
    row_groups = [('a', 'b'), (None, None), ('c', 'd')]
    mock_pf = _create_mock_parquet_file('url_surtkey', row_groups)
    is_sorted = are_parquet_file_row_groups_min_max_ordered(mock_pf, column_name='url_surtkey')
    assert is_sorted


def test_row_groups_null_stats_out_of_order():
    row_groups = [('c', 'd'), (None, None), ('a', 'b')]
    mock_pf = _create_mock_parquet_file('url_surtkey', row_groups)
    is_sorted = are_parquet_file_row_groups_min_max_ordered(mock_pf, column_name='url_surtkey')
    assert not is_sorted


def test_row_groups_overlapping_min_max():
    row_groups = [('a', 'b'), ('b', 'd'), ('e', 'f'), ('g', 'h')]
    mock_pf = _create_mock_parquet_file('url_surtkey', row_groups)
    is_sorted = are_parquet_file_row_groups_min_max_ordered(mock_pf, column_name='url_surtkey')
    assert is_sorted


def test_row_groups_overlapping():
    row_groups = [('a', 'c'), ('b', 'd')]
    mock_pf = _create_mock_parquet_file('url_surtkey', row_groups)
    is_sorted = are_parquet_file_row_groups_min_max_ordered(mock_pf, column_name='url_surtkey')
    assert not is_sorted


def test_ordered_files_sorted():
    files_config = {
        '/data/a': [('aaa', 'bbb'), ('bbc', 'ccc')],
        '/data/b': [('ccd', 'ddd'), ('dde', 'eee')],
        '/data/c': [('eef', 'fff'), ('ffg', 'ggg')],
    }

    def mock_parquet_file(path):
        return _create_mock_parquet_file('url_surtkey', files_config[path])

    with patch('pyarrow.parquet.ParquetFile', side_effect=mock_parquet_file):
        result = are_all_parts_min_max_ordered(['/data/a', '/data/b', '/data/c'], 'url_surtkey')
    assert result


def test_ordered_files_unsorted():
    files_config = {
        '/data/a': [('aaa', 'bbb'), ('bbc', 'ccc')],
        '/data/b': [('ccd', 'ddd'), ('dde', 'eee')],
        '/data/c': [('eef', 'fff'), ('ffg', 'ggg')],
    }

    def mock_parquet_file(path):
        return _create_mock_parquet_file('url_surtkey', files_config[path])

    with patch('pyarrow.parquet.ParquetFile', side_effect=mock_parquet_file):
        result = are_all_parts_min_max_ordered(['/data/a', '/data/c', '/data/b'], 'url_surtkey')
    assert result  # we don't care about the order of files
```
Review thread on the strictness of the comparison:

**Contributor:** After thinking about this strict condition: values in `url_surtkey` are not unique, and it may happen (although the probability is low) that two rows with the same SURT key end up in two row groups. Then the tool reports an error, although the column might be perfectly sorted.

**Contributor (author):** I think this is fine as-is? In the case where `prev_max == column.statistics.min`, this condition fails and no error is reported.

**Contributor (author):** Added a unit test to validate that this case is indeed supported.
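To make the boundary case concrete, here is a minimal sketch (not part of the PR; the import path is assumed from the test file) that writes a sorted parquet file in which a duplicate key straddles a row-group boundary and confirms the checker accepts it:

```python
import pyarrow as pa
import pyarrow.parquet as pq

from util.are_part_min_max_increasing import are_parquet_file_row_groups_min_max_ordered

# A sorted column whose duplicate value "b" straddles the row-group boundary:
# row_group_size=2 yields two groups, ["a", "b"] and ["b", "d"], so the first
# group's max equals the second group's min.
table = pa.table({"url_surtkey": ["a", "b", "b", "d"]})
pq.write_table(table, "boundary.parquet", row_group_size=2)

pf = pq.ParquetFile("boundary.parquet")
# prev_max > min is False when the values are equal, so no error is reported
assert are_parquet_file_row_groups_min_max_ordered(pf, column_name="url_surtkey")
```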