8 changes: 5 additions & 3 deletions poly_utils/utils.py
@@ -16,10 +16,12 @@ def get_markets(main_file: str = "markets.csv", missing_file: str = "missing_mar
"""
import polars as pl

# Schema overrides for long token IDs
# Schema overrides for long token IDs and hex condition/market ids
schema_overrides = {
"token1": pl.Utf8, # 76-digit ids → strings
"token2": pl.Utf8,
"token1": pl.Utf8, # 76-digit ids → strings
"token2": pl.Utf8,
"id": pl.Utf8, # hex market ids like 0xcb9a...
"condition_id": pl.Utf8, # hex condition ids
}

dfs = []
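A minimal sketch of how the widened overrides would be consumed (the read call and file name here are illustrative, not lifted from the PR; get_markets applies the overrides internally). Without the Utf8 overrides, polars would attempt numeric inference on the 76-digit token ids and 0x-prefixed hex ids and mangle them:

import polars as pl

schema_overrides = {
    "token1": pl.Utf8,        # 76-digit ids -> strings
    "token2": pl.Utf8,
    "id": pl.Utf8,            # hex market ids like 0xcb9a...
    "condition_id": pl.Utf8,  # hex condition ids
}
# Hypothetical direct read with the same overrides.
markets_df = pl.read_csv("markets.csv", schema_overrides=schema_overrides)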
250 changes: 174 additions & 76 deletions update_utils/process_live.py
@@ -5,42 +5,61 @@
import os
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

import io
import json
import polars as pl
from poly_utils.utils import get_markets, update_missing_tokens

import subprocess

import pandas as pd

def get_processed_df(df):
OFFSET_FILE = 'processed/source_offset.json'
BOOTSTRAP_CHUNK_ROWS = 100_000 # rows per chunk (~8 MB each) during full-file scans

def _load_offset():
if os.path.exists(OFFSET_FILE):
with open(OFFSET_FILE, 'r') as f:
return json.load(f).get('byte_offset', 0)
return 0

def _save_offset(byte_offset):
os.makedirs('processed', exist_ok=True)
with open(OFFSET_FILE, 'w') as f:
json.dump({'byte_offset': byte_offset}, f)

def _load_markets_long():
"""Load and pivot markets once; reuse across chunks."""
markets_df = get_markets()
markets_df = markets_df.rename({'id': 'market_id'})

# 1) Make markets long: (market_id, side, asset_id) where side ∈ {"token1", "token2"}
markets_long = (
markets_df
.select(["market_id", "token1", "token2"])
.melt(id_vars="market_id", value_vars=["token1", "token2"],
variable_name="side", value_name="asset_id")
variable_name="side", value_name="asset_id")
)
return markets_long
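To illustrate what the melt produces (toy ids, not real token ids): each market contributes one row per outcome token, so a trade's non-USDC asset id can be resolved to its market and side with a single left join.

import polars as pl

markets_df = pl.DataFrame({
    "market_id": ["m1", "m2"],
    "token1": ["101", "201"],
    "token2": ["102", "202"],
})
markets_long = markets_df.melt(
    id_vars="market_id", value_vars=["token1", "token2"],
    variable_name="side", value_name="asset_id",
)
# 4 rows, one per (market_id, side) pair, e.g. (m1, token1, 101) and (m1, token2, 102)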

def get_processed_df(df, markets_long=None):
if markets_long is None:
markets_long = _load_markets_long()

# 2) Identify the non-USDC asset for each trade (the one that isn't 0)
# Identify the non-USDC asset for each trade
df = df.with_columns(
pl.when(pl.col("makerAssetId") != "0")
.then(pl.col("makerAssetId"))
.otherwise(pl.col("takerAssetId"))
.alias("nonusdc_asset_id")
)

# 3) Join once on that non-USDC asset to recover the market + side ("token1" or "token2")
# Join to recover market_id + side
df = df.join(
markets_long,
left_on="nonusdc_asset_id",
right_on="asset_id",
how="left",
)

# 4) label columns and keep market_id
df = df.with_columns([
pl.when(pl.col("makerAssetId") == "0").then(pl.lit("USDC")).otherwise(pl.col("side")).alias("makerAsset"),
pl.when(pl.col("takerAssetId") == "0").then(pl.lit("USDC")).otherwise(pl.col("side")).alias("takerAsset"),
@@ -54,20 +73,12 @@ def get_processed_df(df):
(pl.col("takerAmountFilled") / 10**6).alias("takerAmountFilled"),
])

df = df.with_columns(
pl.when(pl.col("takerAsset") == "USDC")
.then(pl.lit("BUY"))
.otherwise(pl.lit("SELL"))
.alias("taker_direction")
)

df = df.with_columns([
pl.when(pl.col("takerAsset") == "USDC")
.then(pl.lit("BUY"))
.otherwise(pl.lit("SELL"))
.alias("taker_direction"),

# reverse of taker_direction
pl.when(pl.col("takerAsset") == "USDC")
.then(pl.lit("SELL"))
.otherwise(pl.lit("BUY"))
@@ -84,102 +95,189 @@ def get_processed_df(df):
.then(pl.col("takerAmountFilled"))
.otherwise(pl.col("makerAmountFilled"))
.alias("usd_amount"),

pl.when(pl.col("takerAsset") != "USDC")
.then(pl.col("takerAmountFilled"))
.otherwise(pl.col("makerAmountFilled"))
.alias("token_amount"),

pl.when(pl.col("takerAsset") == "USDC")
.then(pl.col("takerAmountFilled") / pl.col("makerAmountFilled"))
.otherwise(pl.col("makerAmountFilled") / pl.col("takerAmountFilled"))
.cast(pl.Float64)
.alias("price")
])


df = df[['timestamp', 'market_id', 'maker', 'taker', 'nonusdc_side', 'maker_direction', 'taker_direction', 'price', 'usd_amount', 'token_amount', 'transactionHash']]
return df
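To make the direction and price conventions concrete, a standalone toy sketch (fabricated amounts; this is not a call into get_processed_df, which also needs the market-join columns): a fill where the taker hands over USDC (asset id "0") is a taker BUY, amounts come off 6-decimal fixed point, and price is expressed as USDC per outcome token.

import polars as pl

trade = pl.DataFrame({
    "makerAssetId": ["101"],             # maker gives an outcome token
    "takerAssetId": ["0"],               # taker gives USDC
    "makerAmountFilled": [100_000_000],  # 100 tokens, 6-decimal fixed point
    "takerAmountFilled": [65_000_000],   # 65 USDC, 6-decimal fixed point
})
trade = trade.with_columns([
    (pl.col("makerAmountFilled") / 10**6).alias("makerAmountFilled"),
    (pl.col("takerAmountFilled") / 10**6).alias("takerAmountFilled"),
]).with_columns([
    pl.when(pl.col("takerAssetId") == "0")
    .then(pl.lit("BUY")).otherwise(pl.lit("SELL"))
    .alias("taker_direction"),
    pl.when(pl.col("takerAssetId") == "0")
    .then(pl.col("takerAmountFilled") / pl.col("makerAmountFilled"))
    .otherwise(pl.col("makerAmountFilled") / pl.col("takerAmountFilled"))
    .alias("price"),
])
# taker_direction == "BUY", price == 0.65; usd_amount and token_amount would be 65 and 100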


def _process_chunked(source_file, op_file, schema_overrides, markets_long,
skip_before_epoch=None, anchor=None):
"""
Stream source_file in BOOTSTRAP_CHUNK_ROWS chunks, optionally skipping rows
before skip_before_epoch and before the anchor row.

def process_live():
processed_file = 'processed/trades.csv'
anchor dict keys: timestamp (pd.Timestamp), transactionHash, maker, taker
Returns total rows written.
"""
reader = pl.read_csv_batched(
source_file,
schema_overrides=schema_overrides,
batch_size=BOOTSTRAP_CHUNK_ROWS,
)

print("=" * 60)
print("🔄 Processing Live Trades")
print("=" * 60)
anchor_found = (anchor is None) # if no anchor needed, treat as already found
total_written = 0
chunk_num = 0

last_processed = None
while True:
batches = reader.next_batches(1)
if not batches:
break
batch = batches[0]

if os.path.exists(processed_file):
print(f"✓ Found existing processed file: {processed_file}")
result = subprocess.run(['tail', '-n', '1', processed_file], capture_output=True, text=True)
last_line = result.stdout.strip()
splitted = last_line.split(',')
batch = batch.with_columns(
pl.from_epoch(pl.col('timestamp'), time_unit='s').alias('timestamp')
)

last_processed = {
'timestamp': pd.to_datetime(splitted[0]),
'transactionHash': splitted[-1],
'maker': splitted[2],
'taker': splitted[3],
}
# Fast-skip entire chunks that end before the resume point
if skip_before_epoch is not None:
max_ts = batch['timestamp'].max()
if max_ts is not None and max_ts < pd.Timestamp(skip_before_epoch, unit='s'):  # column is tz-naive after from_epoch
continue

print(f"📍 Resuming from: {last_processed['timestamp']}")
print(f" Last hash: {last_processed['transactionHash'][:16]}...")
else:
print("⚠ No existing processed file found - processing from beginning")
if not anchor_found:
# Find anchor row and discard everything up to and including it
batch = batch.with_row_index("_idx")
hits = batch.filter(
(pl.col('timestamp') == anchor['timestamp']) &
(pl.col('transactionHash') == anchor['transactionHash']) &
(pl.col('maker') == anchor['maker']) &
(pl.col('taker') == anchor['taker'])
)
if len(hits) > 0:
cut = hits['_idx'][-1] # last matching row (most conservative)
batch = batch.filter(pl.col('_idx') > cut)
anchor_found = True
batch = batch.drop('_idx')

print(f"\n📂 Reading: goldsky/orderFilled.csv")
if len(batch) == 0:
continue

schema_overrides = {
"takerAssetId": pl.Utf8,
"makerAssetId": pl.Utf8,
}
new_df = get_processed_df(batch, markets_long)
total_written += len(new_df)
chunk_num += 1

df = pl.scan_csv("goldsky/orderFilled.csv", schema_overrides=schema_overrides).collect(streaming=True)
df = df.with_columns(
pl.from_epoch(pl.col('timestamp'), time_unit='s').alias('timestamp')
)
file_exists = os.path.isfile(op_file)
with open(op_file, mode="ab") as f:
new_df.write_csv(f, include_header=not file_exists)

print(f"✓ Loaded {len(df):,} rows")
if chunk_num % 10 == 0:
print(f" [chunk {chunk_num}] +{len(new_df):,} rows (total written: {total_written:,})")

df = df.with_row_index()
return total_written
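The core of _process_chunked is polars' batched CSV reader; a minimal sketch of that loop in isolation (path and batch size mirror the module's constants, but the snippet is illustrative): read_csv_batched returns a reader whose next_batches(n) yields up to n DataFrames and yields nothing once the file is drained, so the full source never has to fit in memory.

import polars as pl

reader = pl.read_csv_batched(
    "goldsky/orderFilled.csv",
    schema_overrides={"takerAssetId": pl.Utf8, "makerAssetId": pl.Utf8},
    batch_size=100_000,          # ~BOOTSTRAP_CHUNK_ROWS, a few MB per chunk
)
rows = 0
while True:
    batches = reader.next_batches(1)
    if not batches:              # None or [] once the source is exhausted
        break
    rows += len(batches[0])
print(f"{rows:,} rows streamed")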

if last_processed is None:
df_process = df.drop('index')
else:
same_timestamp = df.filter(pl.col('timestamp') == last_processed['timestamp'])
same_timestamp = same_timestamp.filter(
(pl.col("transactionHash") == last_processed['transactionHash']) & (pl.col("maker") == last_processed['maker']) & (pl.col("taker") == last_processed['taker'])
)

if same_timestamp.is_empty():
print("⚠ Last processed row not found in source data; processing all rows")
df_process = df.drop('index')
else:
df_process = df.filter(pl.col('index') > same_timestamp.row(0)[0]).drop('index')
def process_live():
source_file = 'goldsky/orderFilled.csv'
op_file = 'processed/trades.csv'

print(f"⚙️ Processing {len(df_process):,} new rows...")
print("=" * 60)
print("[*] Processing Live Trades")
print("=" * 60)

new_df = get_processed_df(df_process)

if not os.path.isdir('processed'):
os.makedirs('processed')
schema_overrides = {
"takerAssetId": pl.Utf8,
"makerAssetId": pl.Utf8,
}

current_size = os.path.getsize(source_file)
last_offset = _load_offset()

os.makedirs('processed', exist_ok=True)

# ------------------------------------------------------------------
# PATH 1 — Normal incremental run (offset file exists)
# Seek directly to byte offset; only read new bytes appended since
# the last run. No full-file scan needed.
# ------------------------------------------------------------------
if last_offset > 0 and last_offset <= current_size:
print(f"[*] Incremental read: {(current_size - last_offset) / 1e6:.1f} MB new data")
with open(source_file, 'rb') as f:
header = f.readline()
if last_offset <= len(header):
new_bytes = f.read()
else:
f.seek(last_offset)
new_bytes = f.read()

if not new_bytes.strip():
print("[*] No new data since last run.")
_save_offset(current_size)
return

buf = io.BytesIO(header + new_bytes)
df = pl.read_csv(buf, schema_overrides=schema_overrides)
df = df.with_columns(
pl.from_epoch(pl.col('timestamp'), time_unit='s').alias('timestamp')
)
print(f"[+] Loaded {len(df):,} new rows")

markets_long = _load_markets_long()
new_df = get_processed_df(df, markets_long)
print(f"[~] Writing {len(new_df):,} processed rows...")

file_exists = os.path.isfile(op_file)
with open(op_file, mode="ab") as f:
new_df.write_csv(f, include_header=not file_exists)

# ------------------------------------------------------------------
# PATH 2 — Bootstrap: output file exists but no offset file.
# Chunked scan to avoid loading 37 GB into memory at once.
# ------------------------------------------------------------------
elif last_offset == 0 and os.path.exists(op_file) and os.path.getsize(op_file) > 0:
print(f"[!] No offset file — bootstrapping via chunked scan ({BOOTSTRAP_CHUNK_ROWS:,} rows/chunk)")

result = subprocess.run(['tail', '-n', '1', op_file], capture_output=True, text=True)
last_line = result.stdout.strip().split(',')
last_ts = pd.to_datetime(last_line[0])
last_epoch = int(last_ts.timestamp())
anchor = {
'timestamp': last_ts,
'transactionHash': last_line[-1],
'maker': last_line[2],
'taker': last_line[3],
}
print(f"[>] Resuming from: {last_ts}")

op_file = 'processed/trades.csv'
markets_long = _load_markets_long()
total = _process_chunked(
source_file, op_file, schema_overrides, markets_long,
skip_before_epoch=last_epoch,
anchor=anchor,
)
print(f"[*] Bootstrap complete — {total:,} new rows written")

if not os.path.isfile(op_file):
new_df.write_csv(op_file)
print(f"✓ Created new file: processed/trades.csv")
# ------------------------------------------------------------------
# PATH 3 — First-ever run: no output file at all.
# Also chunked to stay within RAM.
# ------------------------------------------------------------------
else:
print(f"✓ Appending {len(new_df):,} rows to processed/trades.csv")
with open(op_file, mode="a") as f:
new_df.write_csv(f, include_header=False)
print(f"[!] No existing output — full chunked scan ({BOOTSTRAP_CHUNK_ROWS:,} rows/chunk)")
markets_long = _load_markets_long()
total = _process_chunked(
source_file, op_file, schema_overrides, markets_long,
)
print(f"[*] Initial processing complete — {total:,} rows written")

# Save offset so next run is O(new bytes only)
_save_offset(current_size)


print("=" * 60)
print(" Processing complete!")
print("[OK] Processing complete!")
print("=" * 60)



if __name__ == "__main__":
process_live()
process_live()
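Finally, the anchor cut used during bootstrap (PATH 2) can be shown in isolation (toy frame keyed only on transactionHash; the real filter also matches timestamp, maker and taker): everything up to and including the last already-processed row is dropped, and only later rows are re-emitted.

import polars as pl

batch = pl.DataFrame({"transactionHash": ["a", "b", "c", "d"]})
anchor_hash = "b"                                # last row already in trades.csv

batch = batch.with_row_index("_idx")
hits = batch.filter(pl.col("transactionHash") == anchor_hash)
if len(hits) > 0:
    cut = hits["_idx"][-1]                       # last matching row index
    batch = batch.filter(pl.col("_idx") > cut)   # keep only rows after the anchor
batch = batch.drop("_idx")
# batch now contains rows "c" and "d" only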