From 9f739d59f0a9e1033168834548cb3cca81edb707 Mon Sep 17 00:00:00 2001
From: Developer
Date: Wed, 22 Apr 2026 12:18:33 +0100
Subject: [PATCH] Fix hung scraper by adding HTTP timeout; bundle related
 improvements
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

- Add timeout=30 to RequestsHTTPTransport in update_goldsky.py (both the
  initial creation and the retry recreation). Without a timeout, a stale TCP
  connection blocks the scraper indefinitely — observed as 19+ hours of zero
  CPU and no cursor advancement.
- Add process lock file (_acquire_lock/_release_lock) to prevent duplicate
  scraper instances.
- Buffer CSV writes (50-batch buffer) to reduce I/O overhead against the
  large orderFilled.csv.
- Fix sticky-cursor resume from CSV: return last_timestamp - 1 together with
  a sticky cursor at last_timestamp, avoiding false end-of-data on the
  boundary second.
- process_live.py: add byte-offset tracking (OFFSET_FILE) so incremental runs
  only process new rows rather than re-scanning the full CSV.
- poly_utils/utils.py: extend schema_overrides with id and condition_id (hex
  strings) to prevent silent type coercion.
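
Verification sketch (illustrative only, not part of the change). The snippet
below points the same transport configuration at a non-routable address to
simulate a stale connection; the address, the short 5-second timeout, and the
probe query are arbitrary stand-ins for this check, not values used by the
scraper:

    import time
    from gql import gql, Client
    from gql.transport.requests import RequestsHTTPTransport

    # Same transport setup as update_goldsky.py, but aimed at an address that
    # never answers, so the request can only end via the timeout.
    transport = RequestsHTTPTransport(
        url="http://10.255.255.1/graphql",  # non-routable address (test stand-in)
        verify=True,
        retries=0,
        timeout=5,  # shorter than the production value of 30 to keep the check quick
    )
    client = Client(transport=transport)

    start = time.monotonic()
    try:
        client.execute(gql("query { __typename }"))
    except Exception as exc:
        # Without a timeout, this call could block indefinitely on a dead socket.
        print(f"failed after {time.monotonic() - start:.1f}s: {type(exc).__name__}")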

Co-Authored-By: Claude Sonnet 4.6
---
 poly_utils/utils.py            |   8 +-
 update_utils/process_live.py   | 256 +++++++++++++++++++++++----------
 update_utils/update_goldsky.py | 129 +++++++++++++----
 3 files changed, 287 insertions(+), 106 deletions(-)

diff --git a/poly_utils/utils.py b/poly_utils/utils.py
index 021dbad..b22823f 100644
--- a/poly_utils/utils.py
+++ b/poly_utils/utils.py
@@ -16,10 +16,12 @@ def get_markets(main_file: str = "markets.csv", missing_file: str = "missing_mar
     """
     import polars as pl
 
-    # Schema overrides for long token IDs
+    # Schema overrides for long token IDs and hex condition/market ids
     schema_overrides = {
-        "token1": pl.Utf8,  # 76-digit ids → strings
-        "token2": pl.Utf8,
+        "token1": pl.Utf8,          # 76-digit ids → strings
+        "token2": pl.Utf8,
+        "id": pl.Utf8,              # hex market ids like 0xcb9a...
+        "condition_id": pl.Utf8,    # hex condition ids
     }
 
     dfs = []
diff --git a/update_utils/process_live.py b/update_utils/process_live.py
index d8b76be..7c0f38e 100644
--- a/update_utils/process_live.py
+++ b/update_utils/process_live.py
@@ -5,6 +5,8 @@
 import os
 sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
 
+import io
+import json
 import polars as pl
 from poly_utils.utils import get_markets, update_missing_tokens
 
@@ -12,19 +14,37 @@
 import pandas as pd
 
 
-def get_processed_df(df):
+OFFSET_FILE = 'processed/source_offset.json'
+BOOTSTRAP_CHUNK_ROWS = 100_000  # rows per chunk (~8 MB each) during full-file scans
+
+def _load_offset():
+    if os.path.exists(OFFSET_FILE):
+        with open(OFFSET_FILE, 'r') as f:
+            return json.load(f).get('byte_offset', 0)
+    return 0
+
+def _save_offset(byte_offset):
+    os.makedirs('processed', exist_ok=True)
+    with open(OFFSET_FILE, 'w') as f:
+        json.dump({'byte_offset': byte_offset}, f)
+
+def _load_markets_long():
+    """Load and pivot markets once; reuse across chunks."""
     markets_df = get_markets()
     markets_df = markets_df.rename({'id': 'market_id'})
-
-    # 1) Make markets long: (market_id, side, asset_id) where side ∈ {"token1", "token2"}
     markets_long = (
         markets_df
         .select(["market_id", "token1", "token2"])
         .melt(id_vars="market_id", value_vars=["token1", "token2"],
-          variable_name="side", value_name="asset_id")
+              variable_name="side", value_name="asset_id")
     )
+    return markets_long
+
+def get_processed_df(df, markets_long=None):
+    if markets_long is None:
+        markets_long = _load_markets_long()
 
-    # 2) Identify the non-USDC asset for each trade (the one that isn't 0)
+    # Identify the non-USDC asset for each trade
     df = df.with_columns(
         pl.when(pl.col("makerAssetId") != "0")
         .then(pl.col("makerAssetId"))
@@ -32,7 +52,7 @@ def get_processed_df(df):
         .alias("nonusdc_asset_id")
     )
 
-    # 3) Join once on that non-USDC asset to recover the market + side ("token1" or "token2")
+    # Join to recover market_id + side
     df = df.join(
         markets_long,
         left_on="nonusdc_asset_id",
@@ -40,7 +60,6 @@ def get_processed_df(df):
         how="left",
     )
 
-    # 4) label columns and keep market_id
     df = df.with_columns([
         pl.when(pl.col("makerAssetId") == "0").then(pl.lit("USDC")).otherwise(pl.col("side")).alias("makerAsset"),
         pl.when(pl.col("takerAssetId") == "0").then(pl.lit("USDC")).otherwise(pl.col("side")).alias("takerAsset"),
@@ -54,20 +73,12 @@ def get_processed_df(df):
         (pl.col("takerAmountFilled") / 10**6).alias("takerAmountFilled"),
     ])
 
-    df = df.with_columns(
-        pl.when(pl.col("takerAsset") == "USDC")
-        .then(pl.lit("BUY"))
-        .otherwise(pl.lit("SELL"))
-        .alias("taker_direction")
-    )
-
     df = df.with_columns([
         pl.when(pl.col("takerAsset") == "USDC")
         .then(pl.lit("BUY"))
        .otherwise(pl.lit("SELL"))
         .alias("taker_direction"),
 
-        # reverse of taker_direction
         pl.when(pl.col("takerAsset") == "USDC")
         .then(pl.lit("SELL"))
         .otherwise(pl.lit("BUY"))
@@ -84,10 +95,12 @@ def get_processed_df(df):
         .then(pl.col("takerAmountFilled"))
         .otherwise(pl.col("makerAmountFilled"))
         .alias("usd_amount"),
+
         pl.when(pl.col("takerAsset") != "USDC")
         .then(pl.col("takerAmountFilled"))
         .otherwise(pl.col("makerAmountFilled"))
         .alias("token_amount"),
+
         pl.when(pl.col("takerAsset") == "USDC")
         .then(pl.col("takerAmountFilled") / pl.col("makerAmountFilled"))
         .otherwise(pl.col("makerAmountFilled") / pl.col("takerAmountFilled"))
@@ -95,83 +108,176 @@ def get_processed_df(df):
         .alias("price")
     ])
 
-    df = df[['timestamp', 'market_id', 'maker', 'taker', 'nonusdc_side', 'maker_direction', 'taker_direction', 'price', 'usd_amount', 'token_amount', 'transactionHash']]
     return df
 
+def _process_chunked(source_file, op_file, schema_overrides, markets_long,
+                     skip_before_epoch=None, anchor=None):
+    """
+    Stream source_file in BOOTSTRAP_CHUNK_ROWS chunks, optionally skipping rows
+    before skip_before_epoch and before the anchor row.
+
+    anchor dict keys: timestamp (pd.Timestamp), transactionHash, maker, taker
+    Returns total rows written.
+    """
+    reader = pl.read_csv_batched(
+        source_file,
+        schema_overrides=schema_overrides,
+        batch_size=BOOTSTRAP_CHUNK_ROWS,
+    )
+
+    anchor_found = (anchor is None)  # if no anchor needed, treat as already found
+    total_written = 0
+    chunk_num = 0
+
+    while True:
+        batches = reader.next_batches(1)
+        if not batches:
+            break
+        batch = batches[0]
+
+        batch = batch.with_columns(
+            pl.from_epoch(pl.col('timestamp'), time_unit='s').alias('timestamp')
+        )
+
+        # Fast-skip entire chunks that end before the resume point (timestamps are tz-naive here)
+        if skip_before_epoch is not None:
+            max_ts = batch['timestamp'].max()
+            if max_ts is not None and max_ts < pd.Timestamp(skip_before_epoch, unit='s'):
+                continue
+
+        if not anchor_found:
+            # Find anchor row and discard everything up to and including it
+            batch = batch.with_row_index("_idx")
+            hits = batch.filter(
+                (pl.col('timestamp') == anchor['timestamp']) &
+                (pl.col('transactionHash') == anchor['transactionHash']) &
+                (pl.col('maker') == anchor['maker']) &
+                (pl.col('taker') == anchor['taker'])
+            )
+            if len(hits) > 0:
+                cut = hits['_idx'][-1]  # last matching row (most conservative)
+                batch = batch.filter(pl.col('_idx') > cut)
+                anchor_found = True
+            batch = batch.drop('_idx')
+
+        if len(batch) == 0:
+            continue
+
+        new_df = get_processed_df(batch, markets_long)
+        total_written += len(new_df)
+        chunk_num += 1
+
+        file_exists = os.path.isfile(op_file)
+        with open(op_file, mode="ab") as f:
+            new_df.write_csv(f, include_header=not file_exists)
+
+        if chunk_num % 10 == 0:
+            print(f"    [chunk {chunk_num}] +{len(new_df):,} rows (total written: {total_written:,})")
+
+    return total_written
+
+
 def process_live():
-    processed_file = 'processed/trades.csv'
+    source_file = 'goldsky/orderFilled.csv'
+    op_file = 'processed/trades.csv'
 
     print("=" * 60)
-    print("šŸ”„ Processing Live Trades")
+    print("[*] Processing Live Trades")
     print("=" * 60)
 
-    last_processed = {}
-
-    if os.path.exists(processed_file):
-        print(f"āœ“ Found existing processed file: {processed_file}")
-        result = subprocess.run(['tail', '-n', '1', processed_file], capture_output=True, text=True)
-        last_line = result.stdout.strip()
-        splitted = last_line.split(',')
-
-        last_processed['timestamp'] = pd.to_datetime(splitted[0])
-        last_processed['transactionHash'] = splitted[-1]
-        last_processed['maker'] = splitted[2]
-        last_processed['taker'] = splitted[3]
-
-        print(f"šŸ“ Resuming from: {last_processed['timestamp']}")
-        print(f"   Last hash: {last_processed['transactionHash'][:16]}...")
-    else:
-        print("⚠ No existing processed file found - processing from beginning")
-
-    print(f"\nšŸ“‚ Reading: goldsky/orderFilled.csv")
-
     schema_overrides = {
         "takerAssetId": pl.Utf8,
         "makerAssetId": pl.Utf8,
     }
 
-    df = pl.scan_csv("goldsky/orderFilled.csv", schema_overrides=schema_overrides).collect(streaming=True)
-    df = df.with_columns(
-        pl.from_epoch(pl.col('timestamp'), time_unit='s').alias('timestamp')
-    )
-
-    print(f"āœ“ Loaded {len(df):,} rows")
-
-    df = df.with_row_index()
-
-    same_timestamp = df.filter(pl.col('timestamp') == last_processed['timestamp'])
-    same_timestamp = same_timestamp.filter(
-        (pl.col("transactionHash") == last_processed['transactionHash']) & (pl.col("maker") == last_processed['maker']) & (pl.col("taker") == last_processed['taker'])
-    )
-
-    df_process = df.filter(pl.col('index') > same_timestamp.row(0)[0])
-    df_process = df_process.drop('index')
-
-    print(f"āš™ļø  Processing {len(df_process):,} new rows...")
-
-    new_df = get_processed_df(df_process)
-
-    if not os.path.isdir('processed'):
-        os.makedirs('processed')
-
-
-    op_file = 'processed/trades.csv'
-
-    if not os.path.isfile(op_file):
-        new_df.write_csv(op_file)
-        print(f"āœ“ Created new file: processed/trades.csv")
+    current_size = os.path.getsize(source_file)
+    last_offset = _load_offset()
+
+    os.makedirs('processed', exist_ok=True)
+
+    # ------------------------------------------------------------------
+    # PATH 1 — Normal incremental run (offset file exists)
+    #   Seek directly to byte offset; only read new bytes appended since
+    #   the last run. No full-file scan needed.
+    # ------------------------------------------------------------------
+    if last_offset > 0 and last_offset <= current_size:
+        print(f"[*] Incremental read: {(current_size - last_offset) / 1e6:.1f} MB new data")
+        with open(source_file, 'rb') as f:
+            header = f.readline()
+            if last_offset <= len(header):
+                new_bytes = f.read()
+            else:
+                f.seek(last_offset)
+                new_bytes = f.read()
+
+        if not new_bytes.strip():
+            print("[*] No new data since last run.")
+            _save_offset(current_size)
+            return
+
+        buf = io.BytesIO(header + new_bytes)
+        df = pl.read_csv(buf, schema_overrides=schema_overrides)
+        df = df.with_columns(
+            pl.from_epoch(pl.col('timestamp'), time_unit='s').alias('timestamp')
+        )
+        print(f"[+] Loaded {len(df):,} new rows")
+
+        markets_long = _load_markets_long()
+        new_df = get_processed_df(df, markets_long)
+        print(f"[~] Writing {len(new_df):,} processed rows...")
+
+        file_exists = os.path.isfile(op_file)
+        with open(op_file, mode="ab") as f:
+            new_df.write_csv(f, include_header=not file_exists)
+
+    # ------------------------------------------------------------------
+    # PATH 2 — Bootstrap: output file exists but no offset file.
+    #   Chunked scan to avoid loading 37 GB into memory at once.
+    # ------------------------------------------------------------------
+    elif last_offset == 0 and os.path.exists(op_file) and os.path.getsize(op_file) > 0:
+        print(f"[!] No offset file — bootstrapping via chunked scan ({BOOTSTRAP_CHUNK_ROWS:,} rows/chunk)")
+
+        result = subprocess.run(['tail', '-n', '1', op_file], capture_output=True, text=True)
+        last_line = result.stdout.strip().split(',')
+        last_ts = pd.to_datetime(last_line[0])
+        last_epoch = int(last_ts.timestamp())
+        anchor = {
+            'timestamp': last_ts,
+            'transactionHash': last_line[-1],
+            'maker': last_line[2],
+            'taker': last_line[3],
+        }
+        print(f"[>] Resuming from: {last_ts}")
+
+        markets_long = _load_markets_long()
+        total = _process_chunked(
+            source_file, op_file, schema_overrides, markets_long,
+            skip_before_epoch=last_epoch,
+            anchor=anchor,
+        )
+        print(f"[*] Bootstrap complete — {total:,} new rows written")
+
+    # ------------------------------------------------------------------
+    # PATH 3 — First-ever run: no output file at all.
+    #   Also chunked to stay within RAM.
+    # ------------------------------------------------------------------
     else:
-        print(f"āœ“ Appending {len(new_df):,} rows to processed/trades.csv")
-        with open(op_file, mode="a") as f:
-            new_df.write_csv(f, include_header=False)
+        print(f"[!] No existing output — full chunked scan ({BOOTSTRAP_CHUNK_ROWS:,} rows/chunk)")
+        markets_long = _load_markets_long()
+        total = _process_chunked(
+            source_file, op_file, schema_overrides, markets_long,
+        )
+        print(f"[*] Initial processing complete — {total:,} rows written")
+
+    # Save offset so next run is O(new bytes only)
+    _save_offset(current_size)
 
-    print("=" * 60)
-    print("āœ… Processing complete!")
+    print("[OK] Processing complete!")
     print("=" * 60)
-    
+
+
 if __name__ == "__main__":
-    process_live()
\ No newline at end of file
+    process_live()
diff --git a/update_utils/update_goldsky.py b/update_utils/update_goldsky.py
index 5fdb197..99152f8 100644
--- a/update_utils/update_goldsky.py
+++ b/update_utils/update_goldsky.py
@@ -19,6 +19,46 @@ os.mkdir('goldsky')
 
 CURSOR_FILE = 'goldsky/cursor_state.json'
+LOCK_FILE = 'goldsky/scraper.lock'
+
+def _acquire_lock():
+    if os.path.isfile(LOCK_FILE):
+        try:
+            with open(LOCK_FILE, 'r') as f:
+                pid = int(f.read().strip())
+            try:
+                os.kill(pid, 0)
+                return False  # Process still running
+            except OSError:
+                pass  # Stale lock — process is gone
+        except Exception:
+            pass
+    with open(LOCK_FILE, 'w') as f:
+        f.write(str(os.getpid()))
+    return True
+
+def _release_lock():
+    try:
+        if os.path.isfile(LOCK_FILE):
+            os.remove(LOCK_FILE)
+    except Exception:
+        pass
+
+def _write_buffer(buffer, output_file):
+    """Write buffered dataframes to CSV in one operation."""
+    if not buffer:
+        return
+    combined_df = pd.concat(buffer, ignore_index=True)
+    # Drop exact duplicate rows only (the event id is not kept here, and one transaction can contain several fills)
+    combined_df = combined_df.drop_duplicates()
+    # Only keep the columns we want to save
+    cols = [c for c in COLUMNS_TO_SAVE if c in combined_df.columns]
+    combined_df = combined_df[cols]
+    if os.path.isfile(output_file):
+        combined_df.to_csv(output_file, index=None, mode='a', header=None)
+    else:
+        combined_df.to_csv(output_file, index=None)
+    print(f"    -> Wrote {len(combined_df)} records to disk")
 
 
 def save_cursor(timestamp, last_id, sticky_timestamp=None):
     """Save cursor state to file for efficient resume."""
@@ -72,23 +112,33 @@ def get_latest_cursor():
 
             if 'timestamp' in headers:
                 timestamp_index = headers.index('timestamp')
+                # The unique Goldsky event id is not among the columns saved to the CSV,
+                # so we cannot resume by id directly.
+                # The CSV columns are: timestamp,maker,makerAssetId,makerAmountFilled,taker,takerAssetId,takerAmountFilled,transactionHash
+                # Instead, use a sticky cursor at last_timestamp so we page past all
+                # already-stored events at that second before advancing forward.
+                # We set last_timestamp (non-sticky) to last_timestamp - 1 so the sticky
+                # clause is: timestamp=last_timestamp, id_gt="" which returns all ids > ""
+                # (i.e. all events at that second), then the loop exhausts them and advances.
                 values = last_line.split(',')
                 if len(values) > timestamp_index:
                     last_timestamp = int(values[timestamp_index])
                     readable_time = datetime.fromtimestamp(last_timestamp, tz=timezone.utc).strftime('%Y-%m-%d %H:%M:%S UTC')
                     print(f'Resuming from CSV (no cursor file): timestamp {last_timestamp} ({readable_time})')
-                    # Go back 1 second to ensure no data loss (may create some duplicates)
-                    return last_timestamp - 1, None, None
+                    # Use sticky cursor at last_timestamp with id_gt="" to exhaust that
+                    # second completely, then naturally advance — avoids false end-of-data
+                    # that the old "timestamp - 1" approach caused.
+                    return last_timestamp - 1, None, last_timestamp
     except Exception as e:
         print(f"Error reading latest file with tail: {e}")
         # Fallback to pandas
         try:
             df = pd.read_csv(cache_file)
             if len(df) > 0 and 'timestamp' in df.columns:
-                last_timestamp = df.iloc[-1]['timestamp']
-                readable_time = datetime.fromtimestamp(int(last_timestamp), tz=timezone.utc).strftime('%Y-%m-%d %H:%M:%S UTC')
+                last_timestamp = int(df.iloc[-1]['timestamp'])
+                readable_time = datetime.fromtimestamp(last_timestamp, tz=timezone.utc).strftime('%Y-%m-%d %H:%M:%S UTC')
                 print(f'Resuming from CSV (no cursor file): timestamp {last_timestamp} ({readable_time})')
-                return int(last_timestamp) - 1, None, None
+                return last_timestamp - 1, None, last_timestamp
         except Exception as e2:
             print(f"Error reading with pandas: {e2}")
 
@@ -100,17 +150,24 @@ def scrape(at_once=1000):
     QUERY_URL = "https://api.goldsky.com/api/public/project_cl6mb8i9h0003e201j6li0diw/subgraphs/orderbook-subgraph/0.0.1/gn"
     print(f"Query URL: {QUERY_URL}")
     print(f"Runtime timestamp: {RUNTIME_TIMESTAMP}")
-    
+
    # Get starting cursor from latest file (includes sticky state for perfect resume)
     last_timestamp, last_id, sticky_timestamp = get_latest_cursor()
 
     count = 0
     total_records = 0
+    write_buffer = []  # Buffer batches in memory
+    buffer_size = 50   # Write every N batches to reduce I/O
 
     print(f"\nStarting scrape for orderFilledEvents")
-    
+
     output_file = 'goldsky/orderFilled.csv'
     print(f"Output file: {output_file}")
     print(f"Saving columns: {COLUMNS_TO_SAVE}")
+    print(f"Batch size: {at_once}, Write buffer: {buffer_size} batches")
+
+    # Create client once and reuse across batches
+    transport = RequestsHTTPTransport(url=QUERY_URL, verify=True, retries=3, timeout=30)
+    client = Client(transport=transport)
 
     while True:
         # Build the where clause based on cursor state
@@ -120,7 +177,7 @@ def scrape(at_once=1000):
         else:
             # Normal mode: advance by timestamp
             where_clause = f'timestamp_gt: "{last_timestamp}"'
-        
+
         q_string = '''query MyQuery {
             orderFilledEvents(orderBy: timestamp, orderDirection: asc
                 first: ''' + str(at_once) + '''
@@ -141,17 +198,18 @@ def scrape(at_once=1000):
         '''
 
         query = gql(q_string)
-        transport = RequestsHTTPTransport(url=QUERY_URL, verify=True, retries=3)
-        client = Client(transport=transport)
-        
+
         try:
            res = client.execute(query)
         except Exception as e:
            print(f"Query error: {e}")
            print("Retrying in 5 seconds...")
            time.sleep(5)
+           # Recreate client on error to reset connection
+           transport = RequestsHTTPTransport(url=QUERY_URL, verify=True, retries=3, timeout=30)
+           client = Client(transport=transport)
            continue
-        
+
         if not res['orderFilledEvents'] or len(res['orderFilledEvents']) == 0:
            if sticky_timestamp is not None:
                # Exhausted events at sticky timestamp, advance to next timestamp
@@ -159,20 +217,23 @@ def scrape(at_once=1000):
                sticky_timestamp = None
                last_id = None
                continue
+           # Write any remaining buffered data before exiting
+           if write_buffer:
+               _write_buffer(write_buffer, output_file)
            print(f"No more data for orderFilledEvents")
            break
 
        df = pd.DataFrame([flatten(x) for x in res['orderFilledEvents']]).reset_index(drop=True)
-        
+
        # Sort by timestamp and id for consistent ordering
        df = df.sort_values(['timestamp', 'id'], ascending=True).reset_index(drop=True)
-        
+
        batch_last_timestamp = int(df.iloc[-1]['timestamp'])
        batch_last_id = df.iloc[-1]['id']
        batch_first_timestamp = int(df.iloc[0]['timestamp'])
-        
+
        readable_time = datetime.fromtimestamp(batch_last_timestamp, tz=timezone.utc).strftime('%Y-%m-%d %H:%M:%S UTC')
-        
+
        # Determine if we need sticky cursor for next iteration
        if len(df) >= at_once:
            # Batch is full - check if all events are at the same timestamp
@@ -195,26 +256,33 @@ def scrape(at_once=1000):
                sticky_timestamp = None
                last_id = None
                print(f"Batch {count + 1}: Timestamp {batch_last_timestamp} ({readable_time}), Records: {len(df)} [STICKY COMPLETE]")
+               count += 1
+               total_records += len(df)
+               df = df[COLUMNS_TO_SAVE].copy()
+               write_buffer.append(df)
+               # Write buffer if it reaches threshold
+               if len(write_buffer) >= buffer_size:
+                   _write_buffer(write_buffer, output_file)
+                   write_buffer = []
+               save_cursor(last_timestamp, last_id, sticky_timestamp)
+               continue  # skip break check — more data may exist at higher timestamps
        else:
            # Normal advancement
            last_timestamp = batch_last_timestamp
            print(f"Batch {count + 1}: Last timestamp {batch_last_timestamp} ({readable_time}), Records: {len(df)}")
-        
+
        count += 1
        total_records += len(df)
-       # Remove duplicates (by id to be safe)
-       df = df.drop_duplicates(subset=['id'])
-
        # Filter to only the columns we want to save
        df_to_save = df[COLUMNS_TO_SAVE].copy()
+       write_buffer.append(df_to_save)
+
+       # Write buffer if it reaches threshold
+       if len(write_buffer) >= buffer_size:
+           _write_buffer(write_buffer, output_file)
+           write_buffer = []
-       # Save to file
-       if os.path.isfile(output_file):
-           df_to_save.to_csv(output_file, index=None, mode='a', header=None)
-       else:
-           df_to_save.to_csv(output_file, index=None)
-
        # Save cursor state for efficient resume (no duplicates on restart)
        save_cursor(last_timestamp, last_id, sticky_timestamp)
 
@@ -224,13 +292,16 @@ def scrape(at_once=1000):
    # Clear cursor file on successful completion
    if os.path.isfile(CURSOR_FILE):
        os.remove(CURSOR_FILE)
-    
+
    print(f"Finished scraping orderFilledEvents")
    print(f"Total new records: {total_records}")
    print(f"Output file: {output_file}")
 
 def update_goldsky():
    """Run scraping for orderFilledEvents"""
+    if not _acquire_lock():
+        print(f"Another scraper instance is already running (PID in {LOCK_FILE}). Exiting.")
+        return
    print(f"\n{'='*50}")
    print(f"Starting to scrape orderFilledEvents")
    print(f"Runtime: {RUNTIME_TIMESTAMP}")
@@ -239,4 +310,6 @@ def update_goldsky():
        scrape()
        print(f"Successfully completed orderFilledEvents")
    except Exception as e:
-        print(f"Error scraping orderFilledEvents: {str(e)}")
\ No newline at end of file
+        print(f"Error scraping orderFilledEvents: {str(e)}")
+    finally:
+        _release_lock()
\ No newline at end of file