diff --git a/.gitignore b/.gitignore index 4c49bd7..76ffd48 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +1,3 @@ .env +dataset/*.parquet +dataset/submissions/* \ No newline at end of file diff --git a/README.md b/README.md index 54fe6b8..0113e64 100644 --- a/README.md +++ b/README.md @@ -34,3 +34,18 @@ python export.py The script will create a directory at the specified output path containing the dataset in Parquet format. If `--output_dir` is not provided, it will save to `dataset` in the current working directory. +## Tests +The deduplication scripts can be tested by running +```bash +python tests/test_dedup.py +# if you have pytest you can run +python -m pytest tests/test_dedup.py -v +``` +Rather than relying on real data, the tests build a synthetic dataset. +The test suite creates a 50-entry dataset with: +- **Exact duplicates**: First 5 entries use identical code +- **Fuzzy duplicates**: Next 5 entries use similar code with small variations +- **Multiple run modes**: `leaderboard`, `test`, `benchmark` +- **Mixed success states**: Both `True` and `False` values for `run_passed` +- **Realistic struct data**: Complex nested structures for `run_result`, `run_compilation`, `run_meta`, and `run_system_info` +- **Proper timestamps**: All timestamp fields include timezone information diff --git a/dataset/README.md b/dataset/README.md index 6084549..52550e2 100644 --- a/dataset/README.md +++ b/dataset/README.md @@ -11,7 +11,13 @@ tags: license: mit --- -If you use GPUMODE/amd-kernels-2025 in your work, please cite: +This dataset was created from the first and second AMD $100K kernel competitions and contains roughly 110K kernels for fp8-gemm, moe, mla, all2all, gemm+reducescatter, and allgather+gemm, optimized to run on MI300. Learn more at gpumode.com/v2/news + +For the full list of kernel competitions we've run and are running, check out https://github.com/gpu-mode/reference-kernels, which also contains details on the reference kernels and their input shapes and distributions. + +We are planning to add kernels optimized for NVFP4 on Blackwell next. + +If you use this dataset in your work, please cite: ```bibtex @inproceedings{ diff --git a/dedup.py b/dedup.py new file mode 100644 index 0000000..8f69f1c --- /dev/null +++ b/dedup.py @@ -0,0 +1,564 @@ +""" +Deduplication Pipeline for Code Submissions +============================================ + +This module removes duplicate code submissions using a two-stage approach: + +1. EXACT DEDUPLICATION (remove_duplicates) + - Computes SHA-256 hash of each code submission + - Groups submissions by run_mode, run_passed, and score/duration + - Within each group, keeps only unique code (by hash) + - When duplicates exist, keeps the one with better metrics (lower score or faster duration) + + 2. 
FUZZY DEDUPLICATION (fuzzy_filter) + - Uses MinHash + Locality Sensitive Hashing (LSH) to find near-duplicates + - Process: + a) Convert each code submission to a set of character n-grams (default: 5-char) + b) Create MinHash signature for each submission (compact fingerprint) + c) Use LSH to efficiently find candidate pairs with high Jaccard similarity + d) Group similar submissions into clusters + e) Keep one representative from each cluster (highest submission ID) + +Usage: + + In practice this should be part of export.py, but if + you need to run things adhoc just do: + + python dedup.py input.parquet output.parquet + +""" + +from datasets import load_dataset +import tqdm +from collections import defaultdict +import hashlib +from typing import Dict, List, Tuple, Union +from concurrent.futures import ProcessPoolExecutor, as_completed +import multiprocessing + +import datasketch +import pandas as pd +import numpy as np +import pyarrow.parquet as pq +import os + +# ============================================================================= +# DEDUPLICATION CONFIGURATION CONSTANTS +# ============================================================================= + +# Fuzzy Deduplication Parameters +FUZZY_SIMILARITY_THRESHOLD = 0.8 +""" +Jaccard similarity threshold for considering two documents as duplicates. +Range: 0.0 to 1.0 +- 0.8 = High threshold, only very similar documents are considered duplicates +- 0.7 = Medium threshold, moderately similar documents are duplicates +- 0.5 = Low threshold, loosely similar documents are duplicates +Higher values = more strict deduplication, fewer items removed +""" + +NGRAM_SIZE = 5 +""" +Size of character n-grams used for MinHash fingerprinting. +- Smaller values (3-4): More sensitive to small changes, better for short text +- Larger values (5-7): Less sensitive to minor variations, better for longer text +- Too small: May create false positives (different texts seem similar) +- Too large: May miss actual duplicates with small variations +""" + +LSH_BANDS = 16 +""" +Number of bands for Locality Sensitive Hashing (LSH). +Used to speed up similarity detection by grouping similar hashes. +- More bands = faster but less accurate similarity detection +- Fewer bands = slower but more accurate similarity detection +Must divide evenly into ROWS_PER_BAND * LSH_BANDS = total permutations +""" + +ROWS_PER_BAND = 128 +""" +Number of rows per band in LSH configuration. +Total MinHash permutations = ROWS_PER_BAND * LSH_BANDS +- More rows per band = higher precision, may miss some similar pairs +- Fewer rows per band = higher recall, may include more false positives +Default: 128 rows × 16 bands = 2048 total permutations +""" + +# Score Processing Parameters +LEADERBOARD_SCORE_PRECISION = 4 +""" +Number of decimal places to round leaderboard scores when grouping submissions. +Used to group submissions with very similar scores together. +- Higher precision (more decimal places): More granular grouping +- Lower precision (fewer decimal places): Broader grouping of similar scores +""" + +DURATION_PRECISION = 0 +""" +Number of decimal places to round execution duration (in seconds). +Used to group submissions with similar execution times. 
+- 0: Round to nearest second (1.7s → 2s) +- 1: Round to nearest 0.1s (1.73s → 1.7s) +""" + +# ============================================================================= +# CONFIGURATION SUMMARY +# ============================================================================= +""" +Current deduplication configuration: +├─ Similarity Detection: 0.8 threshold (strict) +├─ Text Fingerprinting: 5-character n-grams +├─ LSH Performance: 16 bands × 128 rows = 2048 permutations +├─ Score Grouping: 4 decimal places for leaderboard scores +└─ Duration Grouping: 0 decimal places for execution times + +To adjust deduplication sensitivity: +- Increase FUZZY_SIMILARITY_THRESHOLD (0.8→0.9) for stricter deduplication +- Decrease FUZZY_SIMILARITY_THRESHOLD (0.8→0.7) for more aggressive deduplication +- Adjust NGRAM_SIZE for different text lengths (3-4 for short, 5-7 for long) +""" + +def remove_duplicates(data_dict: Dict[str, Dict[bool, Dict[Union[float, int], List[Dict]]]]): + """ + Remove exact duplicates from the nested data structure returned by get_sorted_hf_data. + + Args: + data_dict: Nested dictionary structure from get_sorted_hf_data + + Returns: + Dictionary with same structure but duplicates removed + """ + deduplicated_dict = {} + + for run_mode, score_duration_dict in data_dict.items(): + deduplicated_dict[run_mode] = {} + + for run_success, run_success_dict in score_duration_dict.items(): + deduplicated_dict[run_mode][run_success] = {} + for score_duration, rows in run_success_dict.items(): + # Use a dictionary to track unique entries by their content hash + unique_entries = {} + + for row in rows: + content = row.get('code', "") + content_hash = hashlib.sha256(content.encode()).hexdigest() + + if content_hash not in unique_entries: + unique_entries[content_hash] = row + else: + # If duplicate found, keep the one with better metrics + existing_row = unique_entries[content_hash] + + # For leaderboard mode with successful runs, prefer lower scores / faster times + if run_mode == 'leaderboard' and row.get('run_passed') == True: + if row.get('run_score', 0) < existing_row.get('run_score', 0): + unique_entries[content_hash] = row + # For other cases, prefer shorter duration (faster execution) + else: + existing_duration = existing_row.get('run_meta', {}).get('duration', float('inf')) + current_duration = row.get('run_meta', {}).get('duration', float('inf')) + if current_duration < existing_duration: + unique_entries[content_hash] = row + + deduplicated_dict[run_mode][run_success][score_duration] = list(unique_entries.values()) + + return deduplicated_dict + + +def _create_single_minhash(args: Tuple[str, str, int, int]) -> Tuple[str, datasketch.MinHash]: + """Create a MinHash for a single document. Used for parallel processing.""" + submission_id, text, ngram_size, num_permutations = args + minhash = datasketch.MinHash(num_perm=num_permutations) + text_lower = text.lower() + text_bytes = text_lower.encode('utf8') + + # Generate n-grams directly as bytes to avoid repeated encoding + for i in range(len(text_bytes) - ngram_size + 1): + minhash.update(text_bytes[i:i + ngram_size]) + + return submission_id, minhash + + +def create_minhashes( + documents: List[Dict[str, str]], + ngram_size: int = NGRAM_SIZE, + bands: int = LSH_BANDS, + rows_per_band: int = ROWS_PER_BAND, + n_jobs: int = None, + position: int = 0, +) -> Dict[str, datasketch.MinHash]: + """ + Create MinHash signatures for a list of documents with LSH bands configuration. 
+ + Args: + documents: List of dictionaries, each containing 'submission_id' and 'code' keys + ngram_size: Size of n-grams to generate from input text (default: 5) + bands: Number of bands for LSH (default: 16) + rows_per_band: Rows per band for LSH (default: 128) + n_jobs: Number of parallel workers. Defaults to CPU count. + position: Position for nested tqdm progress bar + + Returns: + Dictionary mapping document submission_ids to their MinHash signatures + """ + num_permutations = rows_per_band * bands + + if n_jobs is None: + n_jobs = multiprocessing.cpu_count() + + # Prepare arguments for parallel processing + args_list = [ + (doc["submission_id"], doc["code"], ngram_size, num_permutations) + for doc in documents + ] + + # Use parallel processing for large datasets + if len(documents) > 100 and n_jobs > 1: + minhash_dict = {} + with ProcessPoolExecutor(max_workers=n_jobs) as executor: + futures = {executor.submit(_create_single_minhash, args): args[0] for args in args_list} + for future in tqdm.tqdm(as_completed(futures), total=len(futures), + desc="Creating minhashes", position=position, leave=False): + submission_id, minhash = future.result() + minhash_dict[submission_id] = minhash + return minhash_dict + + # Sequential processing for small datasets + minhash_dict = {} + for args in tqdm.tqdm(args_list, desc="Creating minhashes", position=position, leave=False): + submission_id, minhash = _create_single_minhash(args) + minhash_dict[submission_id] = minhash + + return minhash_dict + + +def create_similarity_matrix( + minhashes: Dict[str, datasketch.MinHash], + rows_per_band: int, + num_bands: int, + threshold: float, +) -> Dict[str, List[str]]: + """Build LSH index and query for similar documents.""" + lsh = datasketch.MinHashLSH(threshold=threshold, num_perm=num_bands * rows_per_band) + + # Batch insert for better performance + for submission_id, minhash in minhashes.items(): + lsh.insert(submission_id, minhash) + + # Query all at once + similarity_matrix = {} + for submission_id, minhash in minhashes.items(): + similar_ids = lsh.query(minhash) + # Remove self from results inline + similarity_matrix[submission_id] = [s for s in similar_ids if s != submission_id] + + return similarity_matrix + + +def filter_matrix( + similarity_matrix: Dict[str, List[str]] +) -> set: + good_submission_ids = set() + processed = set() + + for submission_id, similar_submission_ids in similarity_matrix.items(): + if submission_id in processed: + continue + + # Find all submissions in the similarity cluster + cluster = {submission_id} + cluster.update(similar_submission_ids) + + # Keep the one with the largest ID (tiebreaker) + keeper = max(cluster) + good_submission_ids.add(keeper) + + # Mark all in cluster as processed + processed.update(cluster) + + return good_submission_ids + + +def fuzzy_filter( + data_dict: Dict[str, Dict[bool, Dict[Union[float, int], List[Dict]]]], + threshold: float = FUZZY_SIMILARITY_THRESHOLD, + ngram_size: int = NGRAM_SIZE, + bands: int = LSH_BANDS, + rows_per_band: int = ROWS_PER_BAND, +) -> Dict[str, Dict[bool, Dict[Union[float, int], List[Dict]]]]: + """Apply fuzzy deduplication to the nested data structure.""" + deduped_data = defaultdict(lambda: defaultdict(lambda: defaultdict(list))) + + # Count total groups for progress bar + total_groups = sum( + len(score_duration_dict) + for run_success_dict in data_dict.values() + for score_duration_dict in run_success_dict.values() + ) + + with tqdm.tqdm(total=total_groups, desc="Fuzzy dedup groups", position=0) as pbar: + for 
run_mode, run_success_dict in data_dict.items(): + for run_success, score_duration_dict in run_success_dict.items(): + for score_duration, rows in score_duration_dict.items(): + pbar.set_postfix({"mode": run_mode, "rows": len(rows)}) + deduped_data[run_mode][run_success][score_duration] = _fuzzy_filter( + rows, threshold, ngram_size, bands, rows_per_band, position=1 + ) + pbar.update(1) + + return deduped_data + + +def _fuzzy_filter( + data_list: List[Dict], + threshold: float = FUZZY_SIMILARITY_THRESHOLD, + ngram_size: int = NGRAM_SIZE, + bands: int = LSH_BANDS, + rows_per_band: int = ROWS_PER_BAND, + position: int = 0, +) -> List[Dict]: + """ + Apply fuzzy deduplication to a list of documents. + + Args: + data_list: List of row dictionaries + threshold: Similarity threshold for LSH + ngram_size: Size of n-grams for MinHash + bands: Number of bands for LSH + rows_per_band: Rows per band for LSH + position: Position for nested tqdm progress bar + + Returns: + List with fuzzy duplicates removed + """ + if len(data_list) <= 1: + return data_list + + # Build documents list without tqdm overhead + all_documents = [ + {"submission_id": str(i), "code": row.get('code', str(row)), "original_row": row} + for i, row in enumerate(data_list) + ] + + # Apply fuzzy deduplication + minhashes = create_minhashes( + all_documents, ngram_size=ngram_size, bands=bands, rows_per_band=rows_per_band, + position=position + ) + similarity_matrix = create_similarity_matrix( + minhashes, rows_per_band=rows_per_band, num_bands=bands, threshold=threshold + ) + + good_submission_ids = filter_matrix(similarity_matrix) + + # Keep only the documents that passed the filter + return [all_documents[int(sid)]["original_row"] for sid in good_submission_ids] + + +def convert_df_to_dict(df: pd.DataFrame) -> Dict[str, Dict[bool, Dict[Union[float, int], List[Dict]]]]: + """ + Convert a pandas DataFrame to a nested dictionary structure. + + Args: + df: pandas DataFrame + + Returns: + Nested dictionary structure grouped by run_mode, run_passed, and duration + """ + # Extract duration from run_meta column (vectorized where possible) + if 'run_meta' in df.columns: + durations = df['run_meta'].apply(lambda x: x.get('duration', 0) if isinstance(x, dict) else 0) + else: + durations = pd.Series([0] * len(df)) + + # Add duration as a column for grouping + df = df.copy() + df['_duration'] = durations + + data_dict = defaultdict(lambda: defaultdict(lambda: defaultdict(list))) + + # Group by run_mode and run_passed, then iterate groups (much faster than iterrows) + for (run_mode, run_passed), group in df.groupby(['run_mode', 'run_passed'], sort=False): + # Convert group to list of dicts at once + records = group.drop(columns=['_duration']).to_dict('records') + group_durations = group['_duration'].tolist() + + for record, duration in zip(records, group_durations): + data_dict[run_mode][run_passed][duration].append(record) + + return data_dict + +def flatten_data(data_dict: Dict[str, Dict[Union[float, int], List[Dict]]]) -> List[Dict]: + """ + Flatten the nested data structure to a list of documents with metadata. 
+ + Args: + data_dict: Nested dictionary structure from get_sorted_hf_data + + Returns: + List of documents with additional metadata fields + """ + flattened = [] + for run_mode, run_success_dict in data_dict.items(): + for run_success, score_duration_dict in run_success_dict.items(): + for score_duration, rows in score_duration_dict.items(): + for row in rows: + # Add metadata directly to dict (avoid copy if possible) + if isinstance(row, dict): + row['_run_mode'] = run_mode + row['_run_success'] = run_success + row['_score_duration'] = score_duration + flattened.append(row) + else: + # Handle pandas Series + row_dict = row.to_dict() if hasattr(row, 'to_dict') else dict(row) + row_dict['_run_mode'] = run_mode + row_dict['_run_success'] = run_success + row_dict['_score_duration'] = score_duration + flattened.append(row_dict) + return flattened + +def dedup_df(df: pd.DataFrame) -> pd.DataFrame: + """ + Deduplicate a pandas DataFrame. + + Args: + df: pandas DataFrame + """ + # convert to dict + data_dict = convert_df_to_dict(df) + # deduplicate + deduplicated_data = fuzzy_filter( + data_dict, + threshold=FUZZY_SIMILARITY_THRESHOLD, + ngram_size=NGRAM_SIZE, + bands=LSH_BANDS, + rows_per_band=ROWS_PER_BAND + ) + # convert to df + flattened_data = flatten_data(deduplicated_data) + df = pd.DataFrame(flattened_data) + return df + +def create_parquet_file(data_dict: Dict[str, Dict[Union[float, int], List[Dict]]], filename: str): + """ + Create a Parquet file from the nested data structure. + + Args: + data_dict: Nested dictionary structure from get_sorted_hf_data + filename: Name of the output Parquet file + """ + # Flatten the data + flattened_data = flatten_data(data_dict) + + # Create a pandas DataFrame from the flattened data + df = pd.DataFrame(flattened_data) + # Convert the DataFrame to a Parquet file + df.to_parquet(filename, index=False) + + +def _count_items(data_dict: Dict[str, Dict[bool, Dict[Union[float, int], List[Dict]]]]) -> int: + """ + Count total number of items in the nested data structure. (useful for testing) + + Args: + data_dict: Nested dictionary structure from get_sorted_hf_data + + Returns: + Total number of items + """ + total = 0 + for run_mode in data_dict.values(): + for run_success_dict in run_mode.values(): + for rows in run_success_dict.values(): + total += len(rows) + return total + + +# Columns required for deduplication +REQUIRED_COLUMNS = ['code', 'run_mode', 'run_passed', 'run_meta', 'submission_id'] + + +def dedup_file(input_path: str, output_path: str) -> None: + """ + Deduplicate a parquet file and save the result. 
+ + Args: + input_path: Path to input parquet file + output_path: Path to output parquet file + """ + # Show file size + file_size = os.path.getsize(input_path) + print(f"Loading {input_path} ({file_size / 1e9:.2f} GB)...") + + # Use PyArrow for faster loading, only load required columns + pf = pq.ParquetFile(input_path) + available_columns = pf.schema.names + columns_to_load = [c for c in REQUIRED_COLUMNS if c in available_columns] + + print(f"Loading columns: {columns_to_load}") + table = pq.read_table(input_path, columns=columns_to_load) + df = table.to_pandas() + print(f"Loaded {len(df)} rows") + + # Decode bytes to string if needed + if 'code' in df.columns and len(df) > 0: + if isinstance(df['code'].iloc[0], bytes): + print("Decoding code column from bytes...") + df['code'] = df['code'].apply( + lambda x: x.decode('utf-8') if isinstance(x, bytes) else x + ) + + original_count = len(df) + + # Convert to nested dict structure + print("Converting to nested structure...") + data_dict = convert_df_to_dict(df) + + # Apply exact deduplication + print("Applying exact deduplication...") + exact_deduped = remove_duplicates(data_dict) + exact_count = _count_items(exact_deduped) + + + # Apply fuzzy deduplication + print("Applying fuzzy deduplication...") + fuzzy_deduped = fuzzy_filter( + exact_deduped, + threshold=FUZZY_SIMILARITY_THRESHOLD, + ngram_size=NGRAM_SIZE, + bands=LSH_BANDS, + rows_per_band=ROWS_PER_BAND + ) + + # Flatten and save + print("Flattening and saving...") + flattened = flatten_data(fuzzy_deduped) + result_df = pd.DataFrame(flattened) + result_df.to_parquet(output_path, index=False) + + final_count = len(result_df) + + print("Deduplication results summary:") + print(f"Original rows: {original_count}") + print(f"After hash-based dedup: {exact_count} rows") + print(f"Final rows: {final_count}") + print(f"Removed {original_count - final_count} duplicates ({100 * (original_count - final_count) / original_count:.1f}%)") + print(f"Saved to {output_path}") + + +def main(): + import sys + + if len(sys.argv) == 3: + # File-based deduplication + input_path = sys.argv[1] + output_path = sys.argv[2] + dedup_file(input_path, output_path) + else: + print("Usage: python dedup.py <input.parquet> <output.parquet>") + sys.exit(1) + + +if __name__ == "__main__": + main() diff --git a/export.py b/export.py index b4e3cdb..5e2687f 100644 --- a/export.py +++ b/export.py @@ -1,10 +1,13 @@ import argparse +import gc import os -import numpy as np import pandas as pd -from datasets import Dataset from dotenv import load_dotenv from sqlalchemy import create_engine, text +import pyarrow as pa +import pyarrow.parquet as pq +import glob +from dedup import dedup_df load_dotenv() @@ -24,32 +27,32 @@ DATABASE_URL = f"postgresql://{DB_USER}:{DB_PASSWORD}@{DB_HOST}:{DB_PORT}/{DB_NAME}" # The leaderboard IDs to export -LEADERBOARD_IDS = [463, 430, 399, 398] +LEADERBOARD_IDS = [463, 430, 399, 398, 563, 564, 565] -def fetch_leaderboards(engine, leaderboard_ids) -> Dataset: +def fetch_and_save_leaderboards(engine, leaderboard_ids, output_path): """ - Fetches and processes leaderboard data from the database. + Fetches leaderboard data from the database and saves it directly to parquet. This function queries the database for specific leaderboards, selecting key fields and fetching all associated GPU types for each leaderboard - using a subquery. + using a subquery. It saves the leaderboards directly to parquet. Args: engine: The SQLAlchemy engine instance for database connection. 
leaderboard_ids: A list of integer IDs for the leaderboards to fetch. Returns: - A Hugging Face `Dataset` object containing the leaderboard data. + The number of leaderboards. """ - print("Fetching leaderboards...") + print("Fetching and saving leaderboards...") query = text(""" SELECT id, name, - deadline, + deadline AT TIME ZONE 'UTC' AS deadline, task->>'lang' AS lang, - task->>'description' AS description, + description, task->'files'->>'reference.py' AS reference, ( SELECT array_agg(gpu_type) @@ -60,10 +63,45 @@ def fetch_leaderboards(engine, leaderboard_ids) -> Dataset: WHERE id = ANY(:leaderboard_ids) """) df = pd.read_sql_query(query, engine, params={'leaderboard_ids': leaderboard_ids}) - return Dataset.from_pandas(df) + df.to_parquet(output_path, index=False) + print(f"Leaderboards saved to {output_path}") + + +def anonymize_users_in_db(engine, leaderboard_ids): + """Create a temporary mapping table in the database.""" + with engine.begin() as conn: + # Create temporary table with anonymized IDs + conn.execute(text(""" + CREATE TEMP TABLE user_mapping AS + SELECT + user_id as original_user_id, + ROW_NUMBER() OVER (ORDER BY RANDOM()) as anonymized_user_id + FROM ( + SELECT DISTINCT user_id + FROM leaderboard.submission + WHERE leaderboard_id = ANY(:leaderboard_ids) + ) AS distinct_users + """), {'leaderboard_ids': leaderboard_ids}) + + +def handle_empty_structs(df): + """ + Replace empty struct/dict values with None to avoid PyArrow serialization errors. + + PyArrow cannot write empty struct types to Parquet. This function checks + columns that contain dict/struct values and replaces empty ones with None. + """ + for col in df.columns: + if df[col].dtype == 'object': + # Check if column contains dict-like objects + sample = df[col].dropna().head(1) + if len(sample) > 0 and isinstance(sample.iloc[0], dict): + # Replace empty dicts with None + df[col] = df[col].apply(lambda x: None if isinstance(x, dict) and len(x) == 0 else x) + return df -def fetch_submissions(engine, leaderboard_ids) -> Dataset: +def fetch_and_save_submissions(engine, leaderboard_ids, output_path, chunksize=8192): """ Fetches and processes submission data from the database. @@ -76,23 +114,20 @@ def fetch_submissions(engine, leaderboard_ids) -> Dataset: engine: The SQLAlchemy engine instance for database connection. leaderboard_ids: A list of integer IDs for the leaderboards whose submissions are to be fetched. - - Returns: - A Hugging Face `Dataset` object containing the submissions data. 
""" print("Fetching submissions...") - query = text(""" + query = """ SELECT s.id AS submission_id, s.leaderboard_id, - s.user_id, - s.submission_time, + um.anonymized_user_id AS user_id, + s.submission_time AT TIME ZONE 'UTC' AS submission_time, s.file_name, c.code, c.id AS code_id, r.id AS run_id, - r.start_time AS run_start_time, - r.end_time AS run_end_time, + r.start_time AT TIME ZONE 'UTC' AS run_start_time, + r.end_time AT TIME ZONE 'UTC' AS run_end_time, r.mode AS run_mode, r.score AS run_score, r.passed AS run_passed, @@ -101,12 +136,61 @@ def fetch_submissions(engine, leaderboard_ids) -> Dataset: r.meta as run_meta, r.system_info AS run_system_info FROM leaderboard.submission AS s - JOIN leaderboard.runs AS r ON s.id = r.submission_id + LEFT JOIN leaderboard.runs AS r ON s.id = r.submission_id JOIN leaderboard.code_files AS c ON s.code_id = c.id + LEFT JOIN user_mapping um ON s.user_id = um.original_user_id WHERE s.leaderboard_id = ANY(:leaderboard_ids) - """) - df = pd.read_sql_query(query, engine, params={'leaderboard_ids': leaderboard_ids}) - return Dataset.from_pandas(df) + """ + + part = 0 + + with engine.connect().execution_options(stream_results=True) as conn: + for chunk_df in pd.read_sql_query( + text(query), + conn, + params={'leaderboard_ids': leaderboard_ids}, + chunksize=chunksize + ): + # Decode hex values code column + if 'code' in chunk_df.columns: + chunk_df['code'] = chunk_df['code'].apply(decode_hex_if_needed) + + # Convert nullable integer columns to consistent types + # This prevents type mismatches when some chunks have all NULLs + nullable_int_cols = ['run_id', 'code_id', 'submission_id', 'leaderboard_id', 'user_id'] + for col in nullable_int_cols: + if col in chunk_df.columns: + chunk_df[col] = chunk_df[col].astype('Int64') + + # Handle empty structs that PyArrow can't serialize + chunk_df = handle_empty_structs(chunk_df) + + # Convert to arrow table + table = pa.Table.from_pandas(chunk_df) + + # Write chunk as separate parquet file + filename = os.path.join(output_path, f"submissions_part_{part:05d}.parquet") + pq.write_table(table, filename) + + print(f" Wrote {len(chunk_df)} submissions to part {part}") + + # Filter for and save successful submissions + if 'run_passed' in chunk_df.columns: + success_mask = chunk_df['run_passed'] == True + if success_mask.any(): + success_df = chunk_df[success_mask] + success_table = pa.Table.from_pandas(success_df) + success_filename = os.path.join(output_path, f"successful_submissions_part_{part:05d}.parquet") + pq.write_table(success_table, success_filename) + print(f" Wrote {len(success_df)} successful submissions to part {part}") + del success_df, success_table, success_mask + + del chunk_df, table + gc.collect() + + part += 1 + + print(f"Submissions saved to {part} parquet files in {output_path}") def decode_hex_if_needed(code_val: str) -> str: @@ -132,7 +216,67 @@ def decode_hex_if_needed(code_val: str) -> str: return code_val -def main(output_dir): +def consolidate_parquet_files(input_dir, pattern, output_file): + """ + Consolidates multiple parquet part files into a single parquet file. 
+ + Args: + input_dir: Directory containing the parquet part files + pattern: Glob pattern to match the part files (e.g., "submissions_part_*.parquet") + output_file: Path to the output consolidated parquet file + """ + + # Find all matching parquet files + part_files = sorted(glob.glob(os.path.join(input_dir, pattern))) + + if not part_files: + print(f" No files found matching pattern {pattern}") + return + + print(f" Consolidating {len(part_files)} {pattern} files into {output_file}...") + + # First pass: Read only schemas (not data) from all files to unify them + schemas = [] + for part_file in part_files: + parquet_file = pq.ParquetFile(part_file) + schemas.append(parquet_file.schema_arrow) + + # Unify schemas across all tables to handle struct field variations + unified_schema = pa.unify_schemas(schemas) + + # Second pass: Read each file, cast to unified schema, and write incrementally + total_rows = 0 + with pq.ParquetWriter(output_file, unified_schema) as writer: + for part_file in part_files: + # Read one file at a time + table = pq.read_table(part_file) + + # Cast to unified schema (fills missing fields with nulls) + unified_table = table.cast(unified_schema) + + # Write to output file + writer.write_table(unified_table) + + total_rows += len(unified_table) + + print(f" Done! Consolidated {len(part_files)} files ({total_rows} total rows)") + +def deduplicate_parquet_file(input_file, output_file): + """ + Deduplicates a parquet file using the deduplication pipeline in dedup.py. + """ + + # load the parquet file into a pandas dataframe + df = pd.read_parquet(input_file) + + # deduplicate the dataframe + deduplicated_df = dedup_df(df) + + # save the deduplicated dataframe to a new parquet file + deduplicated_df.to_parquet(output_file) + +def main(output_dir, skip_deduplication): """ Orchestrates the data export process. @@ -140,81 +284,75 @@ def main(output_dir): and submission data, anonymizes user IDs, and saves the results to separate Parquet files: `leaderboards.parquet`, `submissions.parquet`, and `successful_submissions.parquet`. The user ID mapping is not saved. + Temporary files are not deleted and should be manually removed if + desired. Args: output_dir (str): The local directory path to save the Parquet files. 
""" engine = create_engine(DATABASE_URL) - rng = np.random.default_rng() # Ensure the output directory exists os.makedirs(output_dir, exist_ok=True) # Fetch and save leaderboards - leaderboards_dataset = fetch_leaderboards(engine, LEADERBOARD_IDS) leaderboards_output_path = os.path.join(output_dir, "leaderboards.parquet") - leaderboards_dataset.to_parquet(leaderboards_output_path) - print(f"Leaderboards dataset successfully saved to {leaderboards_output_path}") + fetch_and_save_leaderboards(engine, LEADERBOARD_IDS, leaderboards_output_path) + + anonymize_users_in_db(engine, LEADERBOARD_IDS) # Fetch submissions - submissions_dataset = fetch_submissions(engine, LEADERBOARD_IDS) - submissions_df = submissions_dataset.to_pandas() - - # Decode hexadecimal 'code' values - if 'code' in submissions_df.columns: - print("Decoding 'code' column from hexadecimal where necessary...") - submissions_df['code'] = submissions_df['code'].apply(decode_hex_if_needed) - - # Anonymize user IDs if submissions exist - if not submissions_df.empty and 'user_id' in submissions_df.columns: - print("Anonymizing user IDs...") - unique_user_ids = submissions_df['user_id'].unique() - num_unique_users = len(unique_user_ids) - - # Create a randomly permuted mapping in memory - permuted_ids = rng.permutation(range(1, num_unique_users + 1)) - user_map_df = pd.DataFrame({ - 'original_user_id': unique_user_ids, - 'anonymized_user_id': permuted_ids - }) - - # Replace original user IDs with anonymized IDs - original_cols = list(submissions_df.columns) - user_id_index = original_cols.index('user_id') - - submissions_df = submissions_df.merge(user_map_df, left_on='user_id', right_on='original_user_id') - submissions_df = submissions_df.drop(columns=['user_id', 'original_user_id']) - submissions_df = submissions_df.rename(columns={'anonymized_user_id': 'user_id'}) - - # Restore original column order - new_order = [col for col in original_cols if col != 'user_id'] - new_order.insert(user_id_index, 'user_id') - submissions_df = submissions_df[new_order] - - # Convert back to a dataset - submissions_dataset = Dataset.from_pandas(submissions_df) - - # Save the submissions dataset (anonymized or original if empty) - submissions_output_path = os.path.join(output_dir, "submissions.parquet") - submissions_dataset.to_parquet(submissions_output_path) - print(f"Submissions dataset successfully saved to {submissions_output_path}") - - # Filter for and save successful submissions from the anonymized data - if 'run_passed' in submissions_df.columns: - print("Creating successful submissions dataset...") - successful_submissions_df = submissions_df[submissions_df['run_passed'] == True].copy() - - # Convert to dataset and save - successful_submissions_dataset = Dataset.from_pandas(successful_submissions_df) - successful_output_path = os.path.join( - output_dir, "successful_submissions.parquet" + submissions_output_path = os.path.join(output_dir, "submissions") + os.makedirs(submissions_output_path, exist_ok=True) + fetch_and_save_submissions(engine, LEADERBOARD_IDS, submissions_output_path) + + # Consolidate part files into single parquet files + consolidate_parquet_files( + submissions_output_path, + "submissions_part_*.parquet", + os.path.join(output_dir, "submissions.parquet") + ) + + consolidate_parquet_files( + submissions_output_path, + "successful_submissions_part_*.parquet", + os.path.join(output_dir, "successful_submissions.parquet") + ) + + if not skip_deduplication: + deduplicated_submissions_output_path = os.path.join(output_dir, 
"deduplicated_submissions") + deduplicated_successful_submissions_output_path = os.path.join(output_dir, "deduplicated_successful_submissions") + os.makedirs(deduplicated_submissions_output_path, exist_ok=True) + # we do this as everything combined can be too much for pandas to handle + # if things get too big I'd multiprocess this + for file in glob.glob(os.path.join(output_dir, "submissions_part_*.parquet")): + deduplicate_parquet_file(file, os.path.join(deduplicated_submissions_output_path, os.path.basename(file))) + for file in glob.glob(os.path.join(output_dir, "successful_submissions_part_*.parquet")): + deduplicate_parquet_file(file, os.path.join(deduplicated_successful_submissions_output_path, os.path.basename(file))) + consolidate_parquet_files( + deduplicated_submissions_output_path, + "submissions_part_*.parquet", + os.path.join(output_dir, "deduplicated_submissions.parquet") ) - successful_submissions_dataset.to_parquet(successful_output_path) - print( - "Successful submissions dataset successfully saved to " - f"{successful_output_path}" + consolidate_parquet_files( + deduplicated_successful_submissions_output_path, + "successful_submissions_part_*.parquet", + os.path.join(output_dir, "deduplicated_successful_submissions.parquet") ) - + original_submission_rows = pd.read_parquet(os.path.join(output_dir, "submissions.parquet")).shape[0] + deduplicated_submission_rows = pd.read_parquet(os.path.join(output_dir, "deduplicated_submissions.parquet")).shape[0] + original_successful_submission_rows = pd.read_parquet(os.path.join(output_dir, "successful_submissions.parquet")).shape[0] + deduplicated_successful_submission_rows = pd.read_parquet(os.path.join(output_dir, "deduplicated_successful_submissions.parquet")).shape[0] + + print("Deduplication results Summary:") + print(f"Original submissions rows: {original_submission_rows}") + print(f"Deduplicated submissions rows: {deduplicated_submission_rows}") + print(f"Removed {original_submission_rows - deduplicated_submission_rows} duplicates ({100 * (original_submission_rows - deduplicated_submission_rows) / original_submission_rows:.1f}%)") + print(f"Original successful submissions rows: {original_successful_submission_rows}") + print(f"Deduplicated successful submissions rows: {deduplicated_successful_submission_rows}") + print(f"Removed {original_successful_submission_rows - deduplicated_successful_submission_rows} duplicates ({100 * (original_successful_submission_rows - deduplicated_successful_submission_rows) / original_successful_submission_rows:.1f}%)") + else: + print("Skipping deduplication step") if __name__ == "__main__": parser = argparse.ArgumentParser(description="Export leaderboard data to a Hugging Face dataset.") @@ -224,5 +362,10 @@ def main(output_dir): default="dataset", help="Directory to save the Hugging Face dataset." 
) + parser.add_argument( + "--skip_deduplication", + action="store_true", + help="Skip deduplication step" + ) args = parser.parse_args() - main(args.output_dir) \ No newline at end of file + main(args.output_dir, args.skip_deduplication) \ No newline at end of file diff --git a/requirements.txt b/requirements.txt index 0698625..3b5b938 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,7 +1,5 @@ -datasets pandas psycopg2-binary SQLAlchemy pyarrow python-dotenv -numpy \ No newline at end of file diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/fixtures/__init__.py b/tests/fixtures/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/fixtures/create_fixtures.py b/tests/fixtures/create_fixtures.py new file mode 100644 index 0000000..2823b52 --- /dev/null +++ b/tests/fixtures/create_fixtures.py @@ -0,0 +1,43 @@ +"""Script to create small test fixtures from the actual parquet data.""" + +import pyarrow.parquet as pq +import pandas as pd +import os + +FIXTURE_DIR = os.path.dirname(__file__) + +def create_fixture(parquet_name: str) -> pd.DataFrame: + """Create a small fixture with 5 rows from the specified parquet file. + + Args: + parquet_name: Name of the parquet file (e.g., 'submissions' or 'successful_submissions') + + Returns: + DataFrame with the fixture data + """ + source_path = os.path.join(FIXTURE_DIR, f'../../data/{parquet_name}.parquet') + output_path = os.path.join(FIXTURE_DIR, f'{parquet_name}_fixture.parquet') + + if not os.path.exists(source_path): + print(f"Error: {source_path} does not exist") + print(f"Please place a recent {parquet_name}.parquet in {source_path} and rerun this script") + exit(1) + + columns = [ + 'submission_id', 'leaderboard_id', 'user_id', 'submission_time', + 'file_name', 'code', 'code_id', 'run_id', 'run_start_time', + 'run_end_time', 'run_mode', 'run_score', 'run_passed', + 'run_compilation', 'run_meta', 'run_system_info' + ] + + pf = pq.ParquetFile(source_path) + table = pf.read_row_group(0, columns=columns) + df = table.to_pandas().head(5) + + df.to_parquet(output_path, index=False) + print(f"Created {output_path} with {len(df)} rows") + return df + +if __name__ == '__main__': + create_fixture('submissions') + create_fixture('successful_submissions') diff --git a/tests/fixtures/submissions_fixture.parquet b/tests/fixtures/submissions_fixture.parquet new file mode 100644 index 0000000..67acda4 Binary files /dev/null and b/tests/fixtures/submissions_fixture.parquet differ diff --git a/tests/fixtures/successful_submissions_fixture.parquet b/tests/fixtures/successful_submissions_fixture.parquet new file mode 100644 index 0000000..7e4c853 Binary files /dev/null and b/tests/fixtures/successful_submissions_fixture.parquet differ diff --git a/tests/test_dedup.py b/tests/test_dedup.py new file mode 100644 index 0000000..721fd71 --- /dev/null +++ b/tests/test_dedup.py @@ -0,0 +1,490 @@ +#!/usr/bin/env python3 +""" +Unit tests for the deduplication pipeline. +Tests the end-to-end flow with fake data matching the database schema. 
+""" + +import unittest +import pandas as pd +import numpy as np +from datetime import datetime, timezone +import random +from typing import Dict, List, Any +import tempfile +import os +import sys + + +from dedup import ( + remove_duplicates, + fuzzy_filter, + convert_df_to_dict, + flatten_data, + dedup_df, + _count_items, + create_parquet_file +) + + +class TestDedupEndToEnd(unittest.TestCase): + + def setUp(self): + """Set up test fixtures with fake data matching the schema.""" + random.seed(42) # For reproducible tests + np.random.seed(42) + self.fake_data = self.create_fake_dataset(50) + self.df = pd.DataFrame(self.fake_data) + + def create_fake_dataset(self, num_entries: int) -> List[Dict[str, Any]]: + """Create a fake dataset with the required schema fields.""" + fake_data = [] + + # Sample code snippets (some duplicates for testing) + code_samples = [ + "def hello_world():\n print('Hello World')", + "import numpy as np\nx = np.array([1, 2, 3])", + "for i in range(10):\n print(i)", + "class MyClass:\n def __init__(self):\n pass", + "def fibonacci(n):\n if n <= 1:\n return n\n return fibonacci(n-1) + fibonacci(n-2)", + "import pandas as pd\ndf = pd.DataFrame({'a': [1, 2, 3]})", + "def quicksort(arr):\n if len(arr) <= 1:\n return arr", + "x = [1, 2, 3, 4, 5]\ny = [i**2 for i in x]", + "try:\n result = 10 / 0\nexcept ZeroDivisionError:\n print('Error')", + "def hello_world():\n print('Hello World')", # Exact duplicate + ] + + run_modes = ['leaderboard', 'benchmark', 'test'] + file_names = ['solution.py', 'main.py', 'algorithm.py', 'test.py'] + + for i in range(num_entries): + # Create base timestamp + base_time = datetime(2024, 1, 1, tzinfo=timezone.utc) + submission_time = base_time.replace( + day=random.randint(1, 28), + hour=random.randint(0, 23), + minute=random.randint(0, 59) + ) + + # Select code (with some duplicates) + code = random.choice(code_samples) + if i < 5: # First 5 entries use the same code for exact duplicate testing + code = code_samples[0] + elif i < 10: # Next 5 use slightly modified versions for fuzzy testing + code = code_samples[0] + f"\n# Comment {i}" + + run_mode = random.choice(run_modes) + run_passed = random.choice([True, False]) + + # Generate run score based on mode and success + if run_mode == 'leaderboard' and run_passed: + run_score = round(random.uniform(0.1, 1.0), 4) + else: + run_score = 0.0 if not run_passed else round(random.uniform(0.1, 0.8), 4) + + # Create the entry matching the database schema + entry = { + 'submission_id': i + 1000, + 'leaderboard_id': random.randint(1, 10), + 'user_id': random.randint(100, 999), + 'submission_time': submission_time, + 'file_name': random.choice(file_names), + 'code': code, + 'code_id': i + 2000, + 'run_id': i + 3000, + 'run_start_time': submission_time, + 'run_end_time': submission_time.replace( + second=random.randint(1, 59) + ), + 'run_mode': run_mode, + 'run_score': run_score, + 'run_passed': run_passed, + 'run_result': { + 'benchmark-count': random.randint(1, 10), + 'benchmark.0.best': f'benchmark_{random.randint(1, 100)}.txt', + 'benchmark.0.err': '', + 'benchmark.0.mean': round(random.uniform(0.1, 2.0), 6), + 'benchmark.0.report': f'report_{i}.json' + }, + 'run_compilation': { + 'command': 'python', + 'exit_code': 0 if run_passed else random.randint(1, 255), + 'nvcc_found': random.choice([True, False]), + 'nvcc_version': f'11.{random.randint(0, 8)}', + 'stderr': '' if run_passed else f'Error message {i}', + 'stdout': f'Output {i}', + 'success': run_passed + }, + 'run_meta': { + 'command': 'python 
solution.py', + 'duration': round(random.uniform(0.1, 10.0), 3), + 'exit_code': 0 if run_passed else random.randint(1, 255), + 'stderr': '' if run_passed else f'Runtime error {i}', + 'stdout': f'Runtime output {i}', + 'success': run_passed + }, + 'run_system_info': { + 'cpu': f'Intel Core i{random.randint(5, 9)}', + 'gpu': random.choice(['NVIDIA RTX 3080', 'NVIDIA RTX 4090', 'None']), + 'platform': random.choice(['linux', 'darwin', 'win32']), + 'torch': f'2.{random.randint(0, 3)}.{random.randint(0, 9)}' + } + } + fake_data.append(entry) + + return fake_data + + def test_dataframe_creation(self): + """Test that the fake dataset creates a valid DataFrame.""" + self.assertEqual(len(self.df), 50) + + # Check required columns exist (matching the schema in the image) + required_columns = [ + 'submission_id', 'leaderboard_id', 'user_id', 'submission_time', + 'file_name', 'code', 'code_id', 'run_id', 'run_start_time', + 'run_end_time', 'run_mode', 'run_score', 'run_passed', + 'run_result', 'run_compilation', 'run_meta', 'run_system_info' + ] + + for col in required_columns: + self.assertIn(col, self.df.columns, f"Missing required column: {col}") + + # Check data types + self.assertTrue(self.df['submission_id'].dtype in ['int64', 'int32']) + self.assertTrue(self.df['run_passed'].dtype == 'bool') + self.assertTrue(self.df['run_score'].dtype in ['float64', 'float32']) + + # Verify struct fields exist + sample_row = self.df.iloc[0] + self.assertIsInstance(sample_row['run_result'], dict) + self.assertIsInstance(sample_row['run_compilation'], dict) + self.assertIsInstance(sample_row['run_meta'], dict) + self.assertIsInstance(sample_row['run_system_info'], dict) + + def test_convert_df_to_dict(self): + """Test conversion from DataFrame to nested dictionary structure.""" + try: + data_dict = convert_df_to_dict(self.df) + + # Check structure + self.assertIsInstance(data_dict, dict) + + # Should have run_mode keys + run_modes = set(self.df['run_mode'].unique()) + self.assertEqual(set(data_dict.keys()), run_modes) + + # Check nested structure + for run_mode in data_dict: + self.assertIsInstance(data_dict[run_mode], dict) + for run_success in data_dict[run_mode]: + self.assertIsInstance(data_dict[run_mode][run_success], dict) + for score_duration in data_dict[run_mode][run_success]: + self.assertIsInstance( + data_dict[run_mode][run_success][score_duration], + list + ) + except NameError: + self.skipTest("convert_df_to_dict function not available") + + def test_exact_deduplication(self): + """Test exact duplicate removal.""" + try: + data_dict = convert_df_to_dict(self.df) + original_count = _count_items(data_dict) + + deduplicated_data = remove_duplicates(data_dict) + deduplicated_count = _count_items(deduplicated_data) + + # Should have fewer or equal items after deduplication + self.assertLessEqual(deduplicated_count, original_count) + + # Structure should be preserved + self.assertEqual(set(data_dict.keys()), set(deduplicated_data.keys())) + + except NameError as e: + self.skipTest(f"Required functions not available: {e}") + + def test_fuzzy_deduplication_small(self): + """Test fuzzy duplicate removal with small threshold for faster testing.""" + try: + data_dict = convert_df_to_dict(self.df) + original_count = _count_items(data_dict) + + # Use small parameters for faster testing + fuzzy_deduplicated_data = fuzzy_filter( + data_dict, + threshold=0.5, # Lower threshold for faster testing + ngram_size=3, # Smaller ngram size + bands=4, # Fewer bands + rows_per_band=32 # Fewer rows per band + ) + + 
fuzzy_count = _count_items(fuzzy_deduplicated_data) + + # Should have fewer or equal items after fuzzy deduplication + self.assertLessEqual(fuzzy_count, original_count) + + # Structure should be preserved + self.assertEqual(set(data_dict.keys()), set(fuzzy_deduplicated_data.keys())) + + except NameError as e: + self.skipTest(f"Required functions not available: {e}") + + def test_flatten_and_reconstruct(self): + """Test flattening and reconstruction of data.""" + try: + data_dict = convert_df_to_dict(self.df) + original_count = _count_items(data_dict) + + # Flatten + flattened_data = flatten_data(data_dict) + self.assertEqual(len(flattened_data), original_count) + + # Check metadata fields were added + if flattened_data: + sample_row = flattened_data[0] + self.assertIn('_run_mode', sample_row) + self.assertIn('_run_success', sample_row) + self.assertIn('_score_duration', sample_row) + + except NameError as e: + self.skipTest(f"Required functions not available: {e}") + + def test_dedup_df_end_to_end(self): + """Test the complete deduplication pipeline.""" + try: + original_length = len(self.df) + + # Run the complete deduplication pipeline + deduplicated_df = dedup_df(self.df) + + # Should return a DataFrame + self.assertIsInstance(deduplicated_df, pd.DataFrame) + + # Should have fewer or equal rows + self.assertLessEqual(len(deduplicated_df), original_length) + + # Should preserve required columns + required_columns = ['submission_id', 'code', 'run_mode', 'run_passed'] + for col in required_columns: + self.assertIn(col, deduplicated_df.columns) + + # Check data integrity + self.assertFalse(deduplicated_df.empty, "Deduplicated DataFrame should not be empty") + + except NameError as e: + self.skipTest(f"dedup_df function not available: {e}") + + def test_parquet_creation(self): + """Test Parquet file creation.""" + try: + data_dict = convert_df_to_dict(self.df) + + with tempfile.NamedTemporaryFile(suffix='.parquet', delete=False) as tmp_file: + try: + create_parquet_file(data_dict, tmp_file.name) + + # Check file was created + self.assertTrue(os.path.exists(tmp_file.name)) + + # Check file is not empty + self.assertGreater(os.path.getsize(tmp_file.name), 0) + + # Try to read the file back + df_from_parquet = pd.read_parquet(tmp_file.name) + self.assertIsInstance(df_from_parquet, pd.DataFrame) + self.assertGreater(len(df_from_parquet), 0) + + finally: + # Clean up + if os.path.exists(tmp_file.name): + os.unlink(tmp_file.name) + + except NameError as e: + self.skipTest(f"Required functions not available: {e}") + + def test_data_consistency_after_deduplication(self): + """Test that data remains consistent after deduplication.""" + try: + # Create dataset with known duplicates + duplicate_data = [] + + # Add the same code 3 times with different metadata + base_entry = self.fake_data[0].copy() + for i in range(3): + entry = base_entry.copy() + entry['submission_id'] = 9000 + i + entry['run_id'] = 9100 + i + duplicate_data.append(entry) + + # Add to main dataset + test_data = self.fake_data + duplicate_data + test_df = pd.DataFrame(test_data) + + original_length = len(test_df) + deduplicated_df = dedup_df(test_df) + + # Should have removed at least 2 duplicates + self.assertLess(len(deduplicated_df), original_length) + + # Check that essential fields are preserved + self.assertTrue(all(col in deduplicated_df.columns for col in + ['submission_id', 'code', 'run_mode', 'run_passed'])) + + except NameError as e: + self.skipTest(f"Required functions not available: {e}") + + def 
test_schema_compliance(self): + """Test that the fake dataset matches the expected schema from the database.""" + # Test all required fields exist and have correct types + + # Test BIGINT fields + bigint_fields = ['submission_id', 'leaderboard_id', 'user_id', 'code_id', 'run_id'] + for field in bigint_fields: + self.assertTrue(self.df[field].dtype in ['int64', 'int32'], + f"{field} should be integer type") + + # Test VARCHAR fields + varchar_fields = ['file_name', 'code', 'run_mode'] + for field in varchar_fields: + self.assertTrue(self.df[field].dtype == 'object', + f"{field} should be string/object type") + + # Test TIMESTAMP fields + timestamp_fields = ['submission_time', 'run_start_time', 'run_end_time'] + for field in timestamp_fields: + # Check that all values are datetime objects with timezone + sample_value = self.df[field].iloc[0] + self.assertIsInstance(sample_value, datetime) + self.assertIsNotNone(sample_value.tzinfo) + + # Test DOUBLE field + self.assertTrue(self.df['run_score'].dtype in ['float64', 'float32']) + + # Test BOOLEAN field + self.assertTrue(self.df['run_passed'].dtype == 'bool') + + # Test STRUCT fields + struct_fields = ['run_result', 'run_compilation', 'run_meta', 'run_system_info'] + for field in struct_fields: + # All values should be dictionaries + self.assertTrue(all(isinstance(val, dict) for val in self.df[field])) + + def test_duplicate_detection(self): + """Test that we can detect exact and near duplicates in the dataset.""" + # Count exact duplicates by code + code_counts = self.df['code'].value_counts() + exact_duplicates = code_counts[code_counts > 1] + + # Should have some exact duplicates (first 5 entries) + self.assertGreater(len(exact_duplicates), 0, "Should have exact duplicates for testing") + + # Check that fuzzy duplicates exist (entries with similar code) + similar_code_count = 0 + base_code = "def hello_world():\n print('Hello World')" + for code in self.df['code']: + if base_code in code and code != base_code: + similar_code_count += 1 + + self.assertGreater(similar_code_count, 0, "Should have fuzzy duplicates for testing") + + +class TestIntegrationWithParquetFixtures(unittest.TestCase): + """Integration tests using real parquet fixtures.""" + + FIXTURES_DIR = os.path.join(os.path.dirname(__file__), 'fixtures') + + @classmethod + def setUpClass(cls): + """Load parquet fixtures once for all tests.""" + submissions_path = os.path.join(cls.FIXTURES_DIR, 'submissions_fixture.parquet') + if os.path.exists(submissions_path): + cls.submissions_df = pd.read_parquet(submissions_path) + # Decode bytes to string if needed + if cls.submissions_df['code'].dtype == object and len(cls.submissions_df) > 0: + if isinstance(cls.submissions_df['code'].iloc[0], bytes): + cls.submissions_df['code'] = cls.submissions_df['code'].apply( + lambda x: x.decode('utf-8') if isinstance(x, bytes) else x + ) + else: + cls.submissions_df = None + + def test_exact_dedup_on_fixture(self): + """Test exact deduplication on real fixture data.""" + if self.submissions_df is None: + self.skipTest("Fixture not available") + + data_dict = convert_df_to_dict(self.submissions_df) + original_count = _count_items(data_dict) + + deduplicated = remove_duplicates(data_dict) + dedup_count = _count_items(deduplicated) + + # Should have same or fewer items + self.assertLessEqual(dedup_count, original_count) + # Structure should be preserved + self.assertEqual(set(data_dict.keys()), set(deduplicated.keys())) + + def test_fuzzy_dedup_on_fixture(self): + """Test fuzzy deduplication on real 
fixture data.""" + if self.submissions_df is None: + self.skipTest("Fixture not available") + + data_dict = convert_df_to_dict(self.submissions_df) + original_count = _count_items(data_dict) + + # Use smaller parameters for faster testing + fuzzy_deduped = fuzzy_filter( + data_dict, + threshold=0.5, + ngram_size=3, + bands=4, + rows_per_band=32 + ) + + fuzzy_count = _count_items(fuzzy_deduped) + + # Should have same or fewer items + self.assertLessEqual(fuzzy_count, original_count) + + def test_full_pipeline_on_fixture(self): + """Test the full dedup pipeline on real fixture data.""" + if self.submissions_df is None: + self.skipTest("Fixture not available") + + data_dict = convert_df_to_dict(self.submissions_df) + original_count = _count_items(data_dict) + + # Run exact dedup first + exact_deduped = remove_duplicates(data_dict) + + # Then fuzzy dedup with smaller params + fuzzy_deduped = fuzzy_filter( + exact_deduped, + threshold=0.5, + ngram_size=3, + bands=4, + rows_per_band=32 + ) + + # Flatten to DataFrame + flattened = flatten_data(fuzzy_deduped) + result_df = pd.DataFrame(flattened) + + # Verify output + self.assertIsInstance(result_df, pd.DataFrame) + self.assertLessEqual(len(result_df), original_count) + + # Should preserve key columns + if len(result_df) > 0: + self.assertIn('code', result_df.columns) + self.assertIn('run_mode', result_df.columns) + + +if __name__ == '__main__': + # Add some helpful output + print("Running deduplication pipeline tests...") + print(f"Python version: {sys.version}") + print(f"Pandas version: {pd.__version__}") + + # Run the tests + unittest.main(verbosity=2) \ No newline at end of file
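
The MinHash + LSH mechanics that `dedup.py` relies on can be hard to see through the diff noise, so here is a minimal, self-contained sketch (not part of the patch) of the same two-stage idea: exact SHA-256 dedup followed by `datasketch` MinHash/LSH near-duplicate clustering over character n-grams. The snippet data and parameter values are illustrative only and do not match the pipeline defaults.

```python
# Illustrative sketch only (not part of the patch): exact + fuzzy dedup on toy data,
# mirroring the two-stage approach in dedup.py. Toy parameters, not pipeline defaults.
import hashlib
import datasketch

snippets = [
    "def hello():\n    print('hi')",
    "def hello():\n    print('hi')",          # exact duplicate
    "def hello():\n    print('hi')  # note",  # near duplicate
    "def add(a, b):\n    return a + b",
]

# Stage 1: exact dedup via SHA-256 of the code text.
seen, unique = set(), []
for code in snippets:
    digest = hashlib.sha256(code.encode()).hexdigest()
    if digest not in seen:
        seen.add(digest)
        unique.append(code)

# Stage 2: fuzzy dedup via MinHash + LSH over character n-grams.
NUM_PERM, NGRAM, THRESHOLD = 256, 5, 0.8   # illustrative values
minhashes = {}
for idx, code in enumerate(unique):
    m = datasketch.MinHash(num_perm=NUM_PERM)
    text = code.lower().encode("utf8")
    for i in range(len(text) - NGRAM + 1):
        m.update(text[i:i + NGRAM])
    minhashes[idx] = m

lsh = datasketch.MinHashLSH(threshold=THRESHOLD, num_perm=NUM_PERM)
for idx, m in minhashes.items():
    lsh.insert(str(idx), m)

kept, dropped = [], set()
for idx, m in minhashes.items():
    if idx in dropped:
        continue
    # Every candidate returned by the LSH index (other than the item itself)
    # is treated as a near duplicate and folded into this cluster.
    for other in lsh.query(m):
        if int(other) != idx:
            dropped.add(int(other))
    kept.append(unique[idx])

print(f"{len(snippets)} snippets -> {len(kept)} after exact + fuzzy dedup")
```

The clustering-by-query loop mirrors what `create_similarity_matrix` plus `filter_matrix` do in the patch, except the real pipeline keeps the highest submission ID per cluster and runs the whole procedure inside each run_mode / run_passed / score-duration group.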