
Commit e761ec0

Author: Logan Sulpizio

Reduce memory usage in CSV parsing and parquet upsert, optimizing for lower RAM rather than raw speed

1 parent 1ed5f43 · commit e761ec0

1 file changed: job/rent_files/upsert_weekly_rent_csv_to_parquet.py

Lines changed: 87 additions & 66 deletions
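The heart of the change shows up in the parsing hunks below: instead of loading the whole sheet through pd.read_csv, the parser streams rows with csv.reader and folds amounts into a small (investor, token) -> amount accumulator, so peak memory is roughly one CSV row plus the running totals. A minimal, self-contained sketch of that pattern (the file name, column layout, and sample values are illustrative, not taken from the repository):

import csv
from collections import defaultdict

# One pass over the file; only the current row and the running totals
# are held in memory, never the full table.
aggregated: dict[tuple[str, str], float] = defaultdict(float)

with open("example.csv", "r", encoding="utf-8-sig", newline="") as stream:
    reader = csv.reader(stream)
    header = next(reader)  # e.g. ["investor", "0xToken1...", "0xToken2..."]
    for row in reader:
        investor = row[0].strip().lower()
        for col_idx, token in enumerate(header[1:], start=1):
            try:
                amount = float(row[col_idx])
            except (IndexError, ValueError):
                amount = 0.0
            if amount != 0.0:
                aggregated[(investor, token.lower())] += amount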
@@ -1,7 +1,9 @@
 from __future__ import annotations
 
 import csv
+import math
 import re
+from collections import defaultdict
 from pathlib import Path
 from typing import IO, Any, Dict, List, Optional, Tuple, Union
 
@@ -82,7 +84,6 @@ def _detect_header_and_token_row(
     header_row_idx: Optional[int] = None
     investor_column_name_found: Optional[str] = None
 
-    # 1) Detect header row
     for idx, row in enumerate(preview_rows):
         normalized = [_normalize(c) for c in row]
 
@@ -100,19 +101,15 @@ def _detect_header_and_token_row(
             f"Could not detect header row containing one of: {investor_column_candidates}"
         )
 
-    # 2) Detect token row:
-    #    first try above header, then below header
     token_row_idx: Optional[int] = None
 
-    # Search above header
     start_above = max(0, header_row_idx - token_row_search_window)
     for idx in range(header_row_idx - 1, start_above - 1, -1):
         normalized = [_normalize(c) for c in preview_rows[idx]]
         if any(_is_evm_address(c) for c in normalized):
             token_row_idx = idx
             break
 
-    # Search below header if not found above
     if token_row_idx is None:
         end_below = min(len(preview_rows), header_row_idx + 1 + token_row_search_window)
         for idx in range(header_row_idx + 1, end_below):
@@ -183,20 +180,47 @@ def _normalize_long_df(df: pd.DataFrame) -> pd.DataFrame:
         return df.copy()
 
     out = df.copy()
-    out["year"] = out["year"].astype(int)
-    out["week"] = out["week"].astype(int)
+    out["year"] = out["year"].astype("int32")
+    out["week"] = out["week"].astype("int16")
     out["currency"] = out["currency"].astype(str).str.upper().str.strip()
     out["investor"] = out["investor"].astype(str).str.lower().str.strip()
     out["token"] = out["token"].astype(str).str.lower().str.strip()
-    out["amount"] = pd.to_numeric(out["amount"], errors="coerce").fillna(0.0).astype(float)
+    out["amount"] = pd.to_numeric(out["amount"], errors="coerce").fillna(0.0).astype("float64")
 
     out = out.sort_values(
-        by=["year", "week", "currency", "investor", "token"]
+        by=["year", "week", "currency", "investor", "token"],
+        kind="mergesort",
     ).reset_index(drop=True)
 
     return out
 
 
+def _safe_float(value: Any) -> float:
+    text = _normalize(value)
+    if text == "":
+        return 0.0
+
+    try:
+        number = float(text)
+    except (TypeError, ValueError):
+        return 0.0
+
+    if not math.isfinite(number):
+        return 0.0
+
+    return number
+
+
+def _read_header_columns(csv_path: Union[str, Path], header_row_idx: int) -> List[str]:
+    with open(csv_path, "r", encoding="utf-8-sig", newline="") as stream:
+        reader = csv.reader(stream, delimiter=",")
+        for idx, row in enumerate(reader):
+            if idx == header_row_idx:
+                return [_normalize(c) for c in row]
+
+    raise ValueError("Could not read detected header row from CSV.")
+
+
 def _parse_weekly_csv_to_long_df(
     csv_path: Union[str, Path],
     *,
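Two smaller savings land in _normalize_long_df: year and week are downcast to int32/int16, and the sort is pinned to kind="mergesort", a stable sort that keeps equal keys in a deterministic order across repeated upserts. The dtype effect is easy to measure; a quick check with illustrative numbers:

import pandas as pd

weeks = list(range(1, 53)) * 100_000  # 5.2M values
print(pd.Series(weeks, dtype="int64").memory_usage(deep=True))  # ~41.6 MB
print(pd.Series(weeks, dtype="int16").memory_usage(deep=True))  # ~10.4 MB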
@@ -206,13 +230,7 @@ def _parse_weekly_csv_to_long_df(
 ) -> pd.DataFrame:
     header_row_idx, token_row, investor_column_name = _detect_header_and_token_row(csv_path)
 
-    header_df = pd.read_csv(
-        csv_path,
-        skiprows=header_row_idx,
-        nrows=0,
-        engine="c",
-    )
-    columns = list(header_df.columns)
+    columns = _read_header_columns(csv_path, header_row_idx)
 
     if investor_column_name not in columns:
         raise ValueError(f"Column '{investor_column_name}' not found after header detection.")
@@ -223,67 +241,67 @@ def _parse_weekly_csv_to_long_df(
     if not token_columns:
         raise ValueError("No token columns detected.")
 
-    usecols = [investor_column_name] + token_columns
+    investor_col_idx = columns.index(investor_column_name)
+    token_col_indices = [(columns.index(col), col_to_token[col].lower()) for col in token_columns]
 
-    df = pd.read_csv(
-        csv_path,
-        skiprows=header_row_idx,
-        header=0,
-        usecols=usecols,
-        engine="c",
-    )
+    aggregated: Dict[Tuple[str, str], float] = defaultdict(float)
 
-    df[investor_column_name] = (
-        df[investor_column_name]
-        .astype(str)
-        .str.strip()
-        .str.lower()
-    )
-
-    investor_mask = df[investor_column_name].apply(_is_evm_address)
-    investors_df = df.loc[investor_mask].copy()
-
-    if investors_df.empty:
-        raise ValueError(
-            f"No valid EVM investor address found in the '{investor_column_name}' column."
-        )
-
-    token_block = investors_df[token_columns].apply(pd.to_numeric, errors="coerce").fillna(0.0)
+    with open(csv_path, "r", encoding="utf-8-sig", newline="") as stream:
+        reader = csv.reader(stream, delimiter=",")
 
-    token_labels = [col_to_token[col].lower() for col in token_columns]
-    token_block.columns = token_labels
-    token_block = token_block.T.groupby(level=0).sum().T
+        for row_idx, row in enumerate(reader):
+            if row_idx <= header_row_idx:
+                continue
 
-    long_df = (
-        token_block
-        .assign(investor=investors_df[investor_column_name].values)
-        .melt(id_vars=["investor"], var_name="token", value_name="amount")
-    )
+            investor_raw = row[investor_col_idx] if investor_col_idx < len(row) else ""
+            investor = _normalize(investor_raw).lower()
 
-    long_df["investor"] = long_df["investor"].astype(str).str.lower().str.strip()
-    long_df["token"] = long_df["token"].astype(str).str.lower().str.strip()
+            if not _is_evm_address(investor):
+                continue
 
-    long_df = long_df[long_df["amount"] != 0.0].copy()
+            for col_idx, token in token_col_indices:
+                raw_amount = row[col_idx] if col_idx < len(row) else ""
+                amount = _safe_float(raw_amount)
 
-    long_df = (
-        long_df.groupby(["investor", "token"], as_index=False)["amount"]
-        .sum()
-    )
+                if amount != 0.0:
+                    aggregated[(investor, token)] += amount
 
-    if long_df.empty:
+    if not aggregated:
         raise ValueError(
             "Parsing succeeded structurally, but no non-zero investor/token revenue row was found."
         )
 
-    if not long_df["investor"].map(_is_evm_address).all():
-        raise ValueError("At least one parsed investor is not a valid EVM address.")
+    records = []
+    for (investor, token), amount in aggregated.items():
+        if amount == 0.0:
+            continue
+
+        if not _is_evm_address(investor):
+            raise ValueError("At least one parsed investor is not a valid EVM address.")
+
+        if not _is_evm_address(token):
+            raise ValueError("At least one parsed token is not a valid EVM address.")
+
+        records.append(
+            {
+                "year": int(year),
+                "week": int(week),
+                "currency": paid_in_currency.upper().strip(),
+                "investor": investor,
+                "token": token,
+                "amount": float(amount),
+            }
+        )
 
-    if not long_df["token"].map(_is_evm_address).all():
-        raise ValueError("At least one parsed token is not a valid EVM address.")
+    if not records:
+        raise ValueError(
+            "Parsing succeeded structurally, but no non-zero investor/token revenue row was found."
+        )
 
-    long_df.insert(0, "currency", paid_in_currency.upper())
-    long_df.insert(0, "week", int(week))
-    long_df.insert(0, "year", int(year))
+    long_df = pd.DataFrame.from_records(
+        records,
+        columns=["year", "week", "currency", "investor", "token", "amount"],
+    )
 
     return _normalize_long_df(long_df)
 
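After the streaming pass, the aggregate dict is validated entry by entry and converted to a DataFrame exactly once at the end, so peak memory is the final frame plus the small accumulator. Passing an explicit columns= to DataFrame.from_records pins the column order regardless of how the record dicts were built. A quick sketch with made-up values:

import pandas as pd

records = [
    {"amount": 12.5, "investor": "0xabc", "token": "0xdef",
     "year": 2024, "week": 7, "currency": "USDC"},
]
long_df = pd.DataFrame.from_records(
    records,
    columns=["year", "week", "currency", "investor", "token", "amount"],
)
print(long_df.columns.tolist())  # order follows columns=, not the dicts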
@@ -328,10 +346,13 @@ def upsert_weekly_rent_csv_to_parquet(
         new_week_df.to_parquet(parquet_path, index=False, engine="pyarrow")
         return parquet_path
 
-    existing_df = pd.read_parquet(parquet_path, engine="pyarrow")
-    existing_df = _normalize_long_df(existing_df)
+    existing_df = pd.read_parquet(
+        parquet_path,
+        columns=["year", "week", "currency", "investor", "token", "amount"],
+        engine="pyarrow",
+    )
 
-    updated_df = pd.concat([existing_df, new_week_df], ignore_index=True)
+    updated_df = pd.concat([existing_df, new_week_df], ignore_index=True, copy=False)
     updated_df = _normalize_long_df(updated_df)
 
     updated_df.to_parquet(parquet_path, index=False, engine="pyarrow")
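On the upsert side, pd.read_parquet(..., columns=[...]) lets pyarrow read only the listed columns from disk instead of materializing every column in the file, the redundant _normalize_long_df pass over the existing frame is dropped (the concatenated result is normalized once), and copy=False asks concat to skip a defensive copy where it can (recent pandas versions may ignore this flag under copy-on-write). A toy demonstration of the column pruning, with an illustrative file name:

import pandas as pd

df = pd.DataFrame({"year": [2024], "week": [7], "amount": [1.0], "extra": ["x"]})
df.to_parquet("toy.parquet", index=False, engine="pyarrow")

subset = pd.read_parquet("toy.parquet", columns=["year", "week", "amount"], engine="pyarrow")
print(subset.columns.tolist())  # ['year', 'week', 'amount']; 'extra' is never loaded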
