-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathconvert_csv_to_parquet.py
More file actions
76 lines (68 loc) · 2.82 KB
/
convert_csv_to_parquet.py
File metadata and controls
76 lines (68 loc) · 2.82 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
import duckdb
import json
import argparse
import os
import math
# python3 convert_csv_to_parquet.py --csv_file /dev/shm/ss13husallm.csv --schema_file /xdbc-client/tests/schemas/ss13husallm.json --output_dir /dev/shm --max_size_mb 64
# Command-line interface: all four options are mandatory.
parser = argparse.ArgumentParser(description="Split CSV into Parquet files of fixed size using DuckDB.")
_CLI_OPTIONS = [
    ("--csv_file", str, "Path to the input CSV file."),
    ("--schema_file", str, "Path to the JSON schema file."),
    ("--output_dir", str, "Directory to store the Parquet splits."),
    ("--max_size_mb", int, "Maximum size of each Parquet file in MB."),
]
for flag, flag_type, flag_help in _CLI_OPTIONS:
    parser.add_argument(flag, type=flag_type, required=True, help=flag_help)
args = parser.parse_args()
# The schema file is a JSON array of column descriptors ({name, type, size}).
with open(args.schema_file, 'r') as schema_fp:
    schema = json.load(schema_fp)

# Derive the dataset name from the CSV filename (extension stripped) and
# create a dedicated sub-directory for its Parquet splits.
csv_name = os.path.basename(args.csv_file)
base_name, _ = os.path.splitext(csv_name)
output_dir = os.path.join(args.output_dir, base_name)
os.makedirs(output_dir, exist_ok=True)
# Translate the JSON schema into DuckDB column definitions.
# Fixes: the unused enumerate() index is gone, and an unrecognized type now
# fails fast with a clear error — previously it was silently skipped, leaving
# the table with fewer columns than the CSV and making the later COPY fail
# with a confusing column-count mismatch.
columns = []
column_definitions = []
for col in schema:
    col_name = col['name']
    col_type = col['type']
    if col_type == 'INT':
        sql_type = "INTEGER"
    elif col_type == 'DOUBLE':
        sql_type = "DOUBLE"
    elif col_type == 'CHAR':
        sql_type = f"CHAR({col['size']})"
    elif col_type == 'STRING':
        sql_type = f"VARCHAR({col['size']})"
    else:
        raise ValueError(f"Unsupported column type {col_type!r} for column {col_name!r}")
    columns.append(col_name)
    column_definitions.append(f"{col_name} {sql_type}")
column_definitions_sql = ", ".join(column_definitions)
# TPC-H lineitem dumps use '|' as the field separator; every other dataset
# handled by this script is plain comma-separated.
delim = ","
if base_name == "lineitem_sf10":
    delim = "|"
# Load CSV into DuckDB in-memory table with schema.
# NOTE(review): the file path, delimiter, and column definitions are
# interpolated directly into the SQL text. That is acceptable for a trusted
# local utility, but a path containing a single quote would break the
# statement — confirm inputs are trusted before reusing this elsewhere.
conn = duckdb.connect()
conn.execute(f"""
CREATE TABLE temp_table ({column_definitions_sql});
""")
conn.execute(f"""
COPY temp_table FROM '{args.csv_file}' (DELIMITER '{delim}', HEADER FALSE);
""")
# Estimate how many rows fit in one split of at most max_size_mb.
# bytes_per_row is a rough per-row upper bound taken from the schema's
# declared sizes (assumes every column entry — including INT/DOUBLE —
# carries a 'size' field; TODO confirm against the schema files in use).
row_count = conn.execute("SELECT COUNT(*) FROM temp_table").fetchone()[0]
bytes_per_row = sum(col['size'] for col in schema)
# Clamp to at least 1 row per split: very wide rows combined with a small
# max_size_mb previously floored this to 0, and range() with a zero step
# raises ValueError in the split loop below.
rows_per_split = max(1, math.floor((args.max_size_mb * 1024 * 1024) / bytes_per_row))
# Write the table out in fixed-size chunks, one Parquet file per chunk.
# Fixes: the dead local `end` (computed but never used) is removed, and the
# manual part_number counter is replaced with enumerate().
# NOTE(review): LIMIT/OFFSET without an ORDER BY does not formally guarantee
# a stable row order across queries; DuckDB scans the in-memory table in
# insertion order in practice, but add an ORDER BY if strict ordering must
# be guaranteed.
for part_number, start in enumerate(range(0, row_count, rows_per_split)):
    parquet_file = os.path.join(output_dir, f"{base_name}_part{part_number}.parquet")
    query = f"""
COPY (
SELECT * FROM temp_table
LIMIT {rows_per_split} OFFSET {start}
) TO '{parquet_file}' (FORMAT PARQUET);
"""
    conn.execute(query)
    print(f"Written: {parquet_file}")
print("Splitting completed.")