diff --git a/docs/PIPELINE_OPERATIONS_GUIDE.md b/docs/PIPELINE_OPERATIONS_GUIDE.md new file mode 100644 index 0000000..d5da5f6 --- /dev/null +++ b/docs/PIPELINE_OPERATIONS_GUIDE.md @@ -0,0 +1,706 @@ +# QuantMini Pipeline Operations Guide + +**Comprehensive guide for running and optimizing the QuantMini data pipeline** + +--- + +## Table of Contents + +1. [Quick Start](#quick-start) +2. [Parallel Execution](#parallel-execution) +3. [Data Refresh Strategies](#data-refresh-strategies) +4. [Performance Optimization](#performance-optimization) +5. [Corporate Actions Architecture](#corporate-actions-architecture) +6. [Metadata Tracking](#metadata-tracking) +7. [Troubleshooting](#troubleshooting) + +--- + +## 1. Quick Start + +### Running Daily Updates + +```bash +# Default: Process yesterday's data in parallel (5-10 minutes) +./scripts/daily_update_parallel.sh + +# Backfill last 7 days +./scripts/daily_update_parallel.sh --days-back 7 + +# Sequential mode (for lower-spec hardware, 17-30 minutes) +./scripts/daily_update.sh --days-back 1 +``` + +### Performance Comparison + +| Mode | Duration | Use Case | +|------|----------|----------| +| **Parallel** | 5-10 min | 8+ cores, 32+ GB RAM, SSD | +| **Sequential (optimized)** | 17-30 min | 4+ cores, 16+ GB RAM | +| **Sequential (legacy)** | 55-105 min | <4 cores, <16 GB RAM | + +--- + +## 2. Parallel Execution + +### Parallelization Strategy + +**Landing Layer (4 parallel S3 downloads):** +``` +Job 1: Stocks Daily S3 +Job 2: Stocks Minute S3 +Job 3: Options Daily S3 +Job 4: Options Minute S3 +Time: ~2-3 minutes (vs 8-12 min sequential) +``` + +**Bronze Layer (11 parallel jobs):** +``` +S3 Ingestion (4 jobs) + Polygon API Downloads (7 jobs) +├── Stocks Daily/Minute → Bronze +├── Options Daily/Minute → Bronze +├── Fundamentals (180-day window) +├── Corporate Actions +├── Ticker Events +├── News +└── Short Interest/Volume (30-day window) + +Sequential after parallel: +└── Financial Ratios (depends on fundamentals) +└── Reference Data (Mondays only) + +Time: ~2-4 minutes (vs 10-15 min sequential) +``` + +**Silver Layer (3 parallel jobs):** +``` +Job 1: Financial Ratios → Silver +Job 2: Corporate Actions → Silver +Job 3: Fundamentals Flattening → Silver +Time: ~1-2 minutes (vs 3-5 min sequential) +``` + +**Gold Layer (Sequential):** +``` +1. Enrich Stocks Daily +2. Convert to Qlib Binary +3. Enrich Stocks Minute +4. Enrich Options Daily +Time: ~1-2 minutes (feature dependencies require sequential) +``` + +### Usage Options + +```bash +# Basic usage +./scripts/daily_update_parallel.sh + +# Advanced options +./scripts/daily_update_parallel.sh \ + --date 2024-01-15 \ + --max-parallel 4 \ + --skip-landing \ + --skip-gold \ + --fundamental-tickers "AAPL MSFT GOOGL" + +# Dry run (see execution plan) +./scripts/daily_update_parallel.sh --dry-run +``` + +### System Requirements + +**Minimum:** +- CPU: 4 cores +- RAM: 16 GB +- Disk: Fast SSD +- Network: 100 Mbps + +**Recommended:** +- CPU: 8+ cores +- RAM: 32 GB +- Disk: NVMe SSD +- Network: 500 Mbps+ + +--- + +## 3. 
Data Refresh Strategies + +### Summary Table + +| Data Type | Frequency | Lookback | Future Window | API Calls/Week | +|-----------|-----------|----------|---------------|----------------| +| **Fundamentals** | Weekly | 180 days | N/A | ~100 | +| **Corporate Actions** | Daily | 30 days | 90 days | ~14 | +| **Short Interest/Volume** | Weekly | 30 days | N/A | ~2,000 | +| **Ticker Events** | Weekly | All time | N/A | ~50 | +| **Financial Ratios** | Weekly | Derived | N/A | 0 (calculated) | + +### Fundamentals (Weekly) + +**Recommended Refresh: Every Sunday at 2 AM** + +```bash +# 180-day lookback captures last 2 quarters +quantmini polygon fundamentals $TICKERS \ + --timeframe quarterly \ + --filing-date-gte $(date -d '180 days ago' +%Y-%m-%d) \ + --output-dir $BRONZE_DIR/fundamentals + +# Calculate ratios immediately after +quantmini polygon financial-ratios $TICKERS \ + --input-dir $BRONZE_DIR/fundamentals \ + --output-dir $BRONZE_DIR/fundamentals \ + --include-growth +``` + +**Rationale:** +- Companies file 10-Q quarterly (~90 days) +- 180-day window captures amendments and late filings +- Earnings seasons: Late Jan, Apr, Jul, Oct +- Weekly refresh sufficient for quarterly data + +### Corporate Actions (Daily) + +**Recommended Refresh: Every day at 3 AM** + +```bash +# Historical (last 30 days) +quantmini polygon corporate-actions \ + --start-date $(date -d '30 days ago' +%Y-%m-%d) \ + --end-date $(date +%Y-%m-%d) \ + --include-ipos \ + --output-dir $BRONZE_DIR/corporate_actions + +# Future events (next 90 days) - critical for dividend strategies! +quantmini polygon corporate-actions \ + --start-date $(date +%Y-%m-%d) \ + --end-date $(date -d '90 days' +%Y-%m-%d) \ + --include-ipos \ + --output-dir $BRONZE_DIR/corporate_actions_future +``` + +**Rationale:** +- Dividends/splits announced unpredictably +- 30-day lookback captures recent changes and corrections +- 90-day future window captures announced dividends for strategies +- Daily refresh ensures timely updates + +**Monthly Full Backfill (1st of month, 1 AM):** +```bash +quantmini polygon corporate-actions \ + --start-date $(date -d '2 years ago' +%Y-%m-%d) \ + --end-date $(date +%Y-%m-%d) \ + --include-ipos \ + --output-dir $BRONZE_DIR/corporate_actions +``` + +### Short Interest & Volume (Weekly) + +**Recommended Refresh: Every Monday at 4 AM** + +```bash +# ⚠️ IMPORTANT: API returns ALL tickers regardless of ticker parameter +quantmini polygon short-data ALL \ + --settlement-date-gte $(date -d '30 days ago' +%Y-%m-%d) \ + --date-gte $(date -d '30 days ago' +%Y-%m-%d) \ + --output-dir $BRONZE_DIR/fundamentals \ + --limit 1000 +``` + +**Rationale:** +- Short interest updated bi-weekly (15th and end of month) +- 30-day window captures 2 reporting cycles +- API returns all tickers - filter in silver layer +- Weekly refresh captures updates without daily overhead + +**Performance:** 2-5 minutes with 30-day window (vs 30-60+ min without filtering) + +### Weekly Schedule + +**Sunday (2-4 AM):** +```bash +# 2:00 AM - Fundamentals (180-day window) +# 2:30 AM - Financial Ratios +# 3:00 AM - Ticker Events +# API calls: ~100 +``` + +**Monday (4 AM):** +```bash +# 4:00 AM - Short Interest/Volume (30-day window) +# API calls: ~2,000 (paginated) +``` + +**Daily (3 AM):** +```bash +# 3:00 AM - Corporate Actions (30-day historical + 90-day future) +# API calls: ~2 per day = 14/week +``` + +**Monthly (1st of Month, 1 AM):** +```bash +# 1:00 AM - Full Corporate Actions Backfill (2 years) +# API calls: ~1 +``` + +### Total API Usage + +**Per Week:** 
~2,114 calls (comfortably within the free tier's 5 calls/min rate limit)
**Per Month:** ~8,457 calls

---

## 4. Performance Optimization

### Date Filtering Optimization

**Impact: 3-4x faster (55-105 min → 17-30 min)**

All Polygon API calls now use date filtering to avoid downloading ALL historical data:

**Short Data (10-20x faster):**
```bash
# Before: Downloaded ~1.2M records
# After: 30-day window downloads ~50-100K records
# Duration: 30-60 min → 2-5 min

quantmini polygon short-data $TICKERS \
  --settlement-date-gte $(date -d '30 days ago' +%Y-%m-%d) \
  --date-gte $(date -d '30 days ago' +%Y-%m-%d)
```

**Fundamentals (5-10x faster):**
```bash
# Before: Downloaded ALL filings since 2000
# After: 180-day window downloads last 2 quarters
# Duration: 15-30 min → 3-5 min

quantmini polygon fundamentals $TICKERS \
  --timeframe quarterly \
  --filing-date-gte $(date -d '180 days ago' +%Y-%m-%d)
```

### Lookback Window Strategy

| Data Type | Daily Update | Aggressive | Rationale |
|-----------|--------------|------------|-----------|
| **Short Interest** | 30 days | 30 days | Bi-weekly reporting |
| **Short Volume** | 30 days | 30 days | Daily data, 30d sufficient |
| **Fundamentals (Quarterly)** | 180 days | 365 days | Catch amendments |
| **Fundamentals (Annual)** | 365 days | 365 days | Annual cycle |
| **Corporate Actions (Historical)** | 30 days | 90 days | Recent activity |
| **Corporate Actions (Future)** | 90 days | 180 days | Announced events |

### API Usage Impact

**Daily Pipeline API Calls:**

| Endpoint | Before | After | Reduction |
|----------|--------|-------|-----------|
| Short Interest | ~60,000 | ~100 | **99.8%** |
| Short Volume | ~1.2M | ~300 | **99.97%** |
| Fundamentals | ~50,000 | ~500 | **99%** |
| **Total** | ~1.3M | ~900 | **99.9%** |

These optimizations pay off even on an unlimited API tier:
- Reduced server load
- Improved reliability
- Faster downloads
- Lower bandwidth costs

---

## 5. Corporate Actions Architecture

### Silver Layer Design

**Partitioning Structure:**
```
silver/corporate_actions/
├── ticker=ABBV/
│   ├── event_type=dividend/data.parquet
│   └── event_type=ticker_change/data.parquet
├── ticker=ABT/
│   └── event_type=dividend/data.parquet
└── ... (1,198+ tickers)
```

**Key Features:**
- **Ticker-first partitioning**: Optimizes stock screening (100x faster for single-ticker queries)
- **Event-type sub-partitioning**: Filter by event type without scanning irrelevant data
- **Unified schema**: All event types share common base columns plus nullable type-specific fields
- **Derived features**: Pre-calculated annualized dividends, split flags, etc. (see the sketch below)
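To show how this layout pays off, here is a minimal sketch of an event-type scan (assuming a Polars version with glob support in `scan_parquet`, and relying on the fact that the transformation script below stores `ticker` and `event_type` inside each file as well as in the path):

```python
import polars as pl
from src.utils.paths import get_quantlake_root

silver = get_quantlake_root() / 'silver' / 'corporate_actions'

# Scan only the dividend partitions of every ticker; split/IPO/ticker-change
# files are never read because they live under other event_type= directories.
top_payers = (
    pl.scan_parquet(str(silver / 'ticker=*' / 'event_type=dividend' / 'data.parquet'))
    .filter(pl.col('div_is_special') == False)  # ignore one-time specials
    .group_by('ticker')
    .agg(pl.col('div_annualized_amount').max().alias('annualized'))
    .sort('annualized', descending=True)
    .head(20)
    .collect()
)
print(top_payers)
```

Because the scan only touches `event_type=dividend` files, this corresponds to the "event-type scan" row in the performance table below rather than a full table scan.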
+ +### Event Types Tracked + +**Dividend Fields:** +- cash_amount, currency, declaration_date, ex_dividend_date +- record_date, pay_date, frequency, div_type +- **Derived:** annualized_amount, is_special, quarter + +**Split Fields:** +- execution_date, from, to, ratio +- **Derived:** is_reverse (ratio < 1.0) + +**IPO Fields:** +- listing_date, issue_price, shares_offered, exchange, status + +**Ticker Change Fields:** +- new_ticker + +### Query Performance + +| Query Type | Time | Files Read | +|------------|------|------------| +| Single ticker lookup | ~5-10ms | 1 file | +| Portfolio (10 tickers) | ~50-100ms | 10 files | +| Event-type scan | ~100-200ms | N files for event type | +| Full table scan | ~500ms-1s | All files | + +**Example: Get ABBV dividend history** +```python +import polars as pl +from src.utils.paths import get_quantlake_root + +silver_path = get_quantlake_root() / 'silver' / 'corporate_actions' + +df = pl.scan_parquet( + str(silver_path / 'ticker=ABBV' / 'event_type=dividend' / 'data.parquet') +).collect() + +print(df.select(['event_date', 'div_cash_amount', 'div_annualized_amount'])) +``` + +### Transformation Script + +```bash +# Transform bronze → silver with metadata tracking +export QUANTLAKE_ROOT=/Users/zheyuanzhao/workspace/quantlake + +# Transform all tickers +python scripts/transformation/corporate_actions_silver_optimized.py + +# Transform specific tickers +python scripts/transformation/corporate_actions_silver_optimized.py --tickers AAPL MSFT +``` + +--- + +## 6. Metadata Tracking + +### Layer-Based Architecture + +Metadata is organized by Medallion Architecture layers: + +``` +metadata/ +├── bronze/ +│ ├── stocks_daily/ +│ │ ├── watermark.json +│ │ └── 2025/10/2025-10-20.json +│ ├── fundamentals/ +│ └── corporate_actions/ +├── silver/ +│ ├── corporate_actions/ +│ ├── fundamentals/ +│ └── financial_ratios/ +└── gold/ + └── stocks_daily_qlib/ + ├── watermark.json + └── 2025/10/2025-10-20.json +``` + +### Metadata Content + +**Ingestion Record Example:** +```json +{ + "data_type": "stocks_daily", + "date": "2025-10-20", + "symbol": null, + "status": "success", + "layer": "bronze", + "timestamp": "2025-10-21T11:33:46.123456", + "statistics": { + "records": 11782, + "file_size_mb": 45.2, + "processing_time_sec": 3.5 + }, + "error": null +} +``` + +**Watermark Example:** +```json +{ + "data_type": "stocks_daily", + "symbol": null, + "date": "2025-10-20", + "timestamp": "2025-10-21T11:33:46.456789" +} +``` + +### Benefits + +✅ **Incremental Processing**: Resume from last successful date +✅ **Gap Detection**: Identify missing dates for backfilling +✅ **Success Monitoring**: Track pipeline health and success rates +✅ **Error Tracking**: Review which dates failed and why +✅ **Statistics**: Monitor records processed, file sizes, times +✅ **Watermarks**: Know exactly what's been processed + +### Viewing Metadata + +```bash +# CLI display of all metadata +python -m src.storage.metadata_manager + +# Example output: +# 📊 stocks_daily (Bronze): +# Total jobs: 7 +# Success: 7, Skipped: 0, Failed: 0 +# Success rate: 100.0% +# Records: 82,474 +# Size: 316.4 MB +# Watermark: 2025-10-20 +# +# 📊 stocks_daily_qlib (Gold): +# Total jobs: 1 +# Success: 1, Skipped: 0, Failed: 0 +# Success rate: 100.0% +# Symbols Converted: 11,782 +# Watermark: 2025-10-20 + +# Check specific date +cat /Users/zheyuanzhao/workspace/quantlake/metadata/gold/stocks_daily_qlib/2025/10/2025-10-20.json +``` + +--- + +## 7. 
Troubleshooting + +### Parallel Jobs Failing Randomly + +**Symptoms:** Some jobs fail intermittently + +**Possible Causes:** +1. Insufficient memory +2. Network bandwidth saturation +3. API rate limiting + +**Solutions:** +```bash +# Reduce max parallel jobs +./scripts/daily_update_parallel.sh --max-parallel 4 + +# Use sequential script +./scripts/daily_update.sh +``` + +### Slower Than Expected + +**Symptoms:** Parallel script slower than sequential + +**Possible Causes:** +1. Low CPU cores (<4) +2. Slow disk (HDD vs SSD) +3. Limited network bandwidth +4. High system load + +**Solutions:** +```bash +# Check system load +top # or htop + +# Run during low-load periods +./scripts/daily_update_parallel.sh # Run at night + +# Use sequential for constrained systems +./scripts/daily_update.sh +``` + +### High Memory Usage + +**Symptoms:** System runs out of memory + +**Solutions:** +```bash +# Limit parallel jobs +./scripts/daily_update_parallel.sh --max-parallel 2 + +# Skip memory-intensive layers +./scripts/daily_update_parallel.sh --skip-landing --skip-bronze + +# Use streaming mode +export PIPELINE_MODE=streaming +./scripts/daily_update.sh +``` + +### Disk I/O Bottleneck + +**Symptoms:** Jobs queued waiting for disk writes + +**Solutions:** +```bash +# Reduce parallel jobs +./scripts/daily_update_parallel.sh --max-parallel 4 + +# Use sequential for HDD +./scripts/daily_update.sh + +# Consider SSD upgrade +``` + +### Metadata Not Recording + +**Symptoms:** Empty metadata directory + +**Check:** +```bash +# Verify metadata directory exists +ls -la /Users/zheyuanzhao/workspace/quantlake/metadata/ + +# Re-run ingestion (will skip existing, record metadata) +python scripts/ingestion/landing_to_bronze.py \ + --data-type stocks_daily \ + --start-date 2025-10-20 \ + --end-date 2025-10-20 \ + --no-incremental +``` + +### Schema Validation Errors + +**Symptoms:** Parquet write failures with schema conflicts + +**Solution:** +```bash +# Verify parquet.use_dictionary = false in config +cat config/pipeline_config.yaml | grep use_dictionary + +# Check existing schema +python -c " +import pyarrow.parquet as pq +metadata = pq.read_metadata('data/bronze/stocks_daily/year=2024/month=01/day=01/part.parquet') +print(metadata.schema) +" +``` + +### API Rate Limit Errors + +**Symptoms:** 429 Too Many Requests errors + +**Solutions:** +```bash +# Check your API tier limits +# Free tier: 5 calls/min +# Starter: Unlimited + +# Reduce parallel API downloads +./scripts/daily_update_parallel.sh --max-parallel 2 + +# Use longer date windows (fewer API calls) +# Already optimized with date filtering +``` + +--- + +## Best Practices + +### 1. Choose Right Script for Your Hardware + +| Hardware | Script | Performance | +|----------|--------|-------------| +| **8+ cores, 32 GB, NVMe SSD** | parallel | 5-7 min | +| **4-8 cores, 16 GB, SSD** | parallel | 7-10 min | +| **2-4 cores, 8 GB, HDD** | sequential | 17-30 min | + +### 2. Monitor First Few Runs + +```bash +# Watch logs in real-time +tail -f logs/daily_update_parallel_*.log + +# Check system resources +htop # or top + +# Verify data integrity +ls -lh ~/workspace/quantlake/bronze/fundamentals/**/*.parquet +``` + +### 3. Production Deployment + +**Recommended cron setup:** +```bash +# Daily at 2 AM: Fast parallel execution +0 2 * * * /path/to/quantmini/scripts/daily_update_parallel.sh + +# Weekly at 3 AM Sunday: Full backfill for safety +0 3 * * 0 /path/to/quantmini/scripts/daily_update.sh --days-back 7 +``` + +### 4. 
Incremental Updates

Use watermarks for efficient processing:
```python
from datetime import datetime, timedelta

from src.core.config_loader import ConfigLoader
from src.storage.metadata_manager import MetadataManager

config = ConfigLoader()
metadata = MetadataManager(config.get_metadata_path())

# Get last processed date
last_date = metadata.get_watermark('stocks_daily', layer='bronze')

# Process only new dates
start_date = (datetime.strptime(last_date, '%Y-%m-%d') + timedelta(days=1)).strftime('%Y-%m-%d')
```

---

## Quick Reference

### Common Commands

```bash
# Daily update (parallel, default: yesterday)
./scripts/daily_update_parallel.sh

# 7-day backfill (parallel)
./scripts/daily_update_parallel.sh --days-back 7

# Daily update (sequential, all layers)
./scripts/daily_update.sh --days-back 1

# View metadata
python -m src.storage.metadata_manager

# Transform corporate actions to silver
python scripts/transformation/corporate_actions_silver_optimized.py

# Check pipeline configuration
quantmini config show
```

### Performance Targets

| Pipeline | Target Duration | Bottleneck |
|----------|----------------|------------|
| Landing (parallel) | 2-3 min | S3 download speed |
| Bronze (parallel) | 2-4 min | Short data API |
| Silver (parallel) | 1-2 min | Transformation compute |
| Gold (sequential) | 1-2 min | Feature dependencies |
| **Total (parallel)** | **5-10 min** | System resources |
| **Total (sequential)** | **17-30 min** | Processing mode |

### Data Quality Metrics

Monitor these key metrics:

1. **Freshness**: Days since latest data (alert if >14 days)
2. **Coverage**: % of tickers with data (alert if <95%)
3. **Success Rate**: Successful vs failed jobs (alert if <95%)
4. **Record Counts**: Anomalies in records added (0 or huge spikes)

---

**Last Updated:** 2025-10-21
**Version:** 2.0 (Consolidated from 6 operational docs)
**Status:** Production Ready
diff --git a/docs/getting-started/DATA_CONFIGURATION.md b/docs/getting-started/DATA_CONFIGURATION.md
index 2bacc25..b0c2939 100644
--- a/docs/getting-started/DATA_CONFIGURATION.md
+++ b/docs/getting-started/DATA_CONFIGURATION.md
@@ -65,7 +65,7 @@ Edit `config/system_profile.yaml` (gitignored - safe for personal paths):
cp config/system_profile.yaml.example config/system_profile.yaml

# Edit system_profile.yaml
-data_root: /Volumes/ExternalSSD/quantmini-data/data
+data_root: /Volumes/ExternalSSD/quantlake/data
```

**Pros**:
@@ -148,13 +148,13 @@ Store data on a fast external drive:

```bash
# macOS
-DATA_ROOT=/Volumes/ExternalSSD/quantmini-data/data
+DATA_ROOT=/Volumes/ExternalSSD/quantlake/data

# Linux
-DATA_ROOT=/mnt/storage/quantmini-data/data
+DATA_ROOT=/mnt/storage/quantlake/data

# Windows (WSL)
-DATA_ROOT=/mnt/d/quantmini-data/data
+DATA_ROOT=/mnt/d/quantlake/data
```

**Pros**: More storage capacity, doesn't fill system drive
@@ -169,7 +169,7 @@ Store data on NAS or cloud storage:
DATA_ROOT=/mnt/nas/quantmini/data

# Cloud (mounted via rclone, etc.)
-DATA_ROOT=/mnt/s3/quantmini-data/data +DATA_ROOT=/mnt/s3/quantlake/data ``` **Pros**: Accessible from multiple machines, backup built-in diff --git a/docs/guides/data-ingestion-strategies.md b/docs/guides/data-ingestion-strategies.md index b84be5d..1467735 100644 --- a/docs/guides/data-ingestion-strategies.md +++ b/docs/guides/data-ingestion-strategies.md @@ -509,7 +509,7 @@ uv run python scripts/validation/validate_duckdb_access.py **Solution**: ```bash # Check disk usage -df -h /Volumes/sandisk/quantmini-data +df -h /Volumes/sandisk/quantlake # Clean old data uv run python scripts/maintenance/cleanup_old_data.py \ @@ -517,7 +517,7 @@ uv run python scripts/maintenance/cleanup_old_data.py \ # Move to external drive rsync -av --progress \ - /Volumes/sandisk/quantmini-data/ \ + /Volumes/sandisk/quantlake/ \ /Volumes/backup/quantmini-archive/ ``` diff --git a/scripts/transformation/corporate_actions_silver_optimized.py b/scripts/transformation/corporate_actions_silver_optimized.py new file mode 100755 index 0000000..fbf4f24 --- /dev/null +++ b/scripts/transformation/corporate_actions_silver_optimized.py @@ -0,0 +1,642 @@ +#!/usr/bin/env python3 +""" +Optimized Corporate Actions Silver Layer Transformation + +This script creates an optimized silver layer for corporate actions data with: +- Ticker-first partitioning for fast stock screening +- Event-type sub-partitioning for efficient filtering +- Derived features for analysis +- Data quality validation + +Partitioning structure: + silver/corporate_actions/ + ├── ticker=AAPL/ + │ ├── event_type=dividend/ + │ │ └── data.parquet + │ ├── event_type=split/ + │ │ └── data.parquet + └── ticker=MSFT/ + └── event_type=dividend/ + └── data.parquet + +Usage: + python scripts/transformation/corporate_actions_silver_optimized.py + python scripts/transformation/corporate_actions_silver_optimized.py --tickers AAPL MSFT GOOGL +""" + +import sys +from pathlib import Path +from datetime import datetime +import logging +from typing import Optional, List + +sys.path.insert(0, str(Path(__file__).parent.parent.parent)) + +import polars as pl +from src.utils.paths import get_quantlake_root +from src.storage.metadata_manager import MetadataManager +from src.core.config_loader import ConfigLoader + +logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(levelname)s - %(message)s' +) +logger = logging.getLogger(__name__) + + +def process_dividends(bronze_path: Path, tickers: Optional[List[str]] = None) -> pl.DataFrame: + """Process dividend files from bronze layer""" + + logger.info("Processing DIVIDENDS...") + dividends_path = bronze_path / "dividends" + + if not dividends_path.exists(): + logger.warning(f"Dividends path not found: {dividends_path}") + return None + + # Find all parquet files + all_files = list(dividends_path.rglob("*.parquet")) + + # Filter by tickers if specified + if tickers: + ticker_set = set(t.upper() for t in tickers) + all_files = [f for f in all_files if f.stem.replace('ticker=', '') in ticker_set] + + logger.info(f" Found {len(all_files):,} dividend files") + + if not all_files: + return None + + # Load all files + dfs = [] + for file_path in all_files: + try: + df = pl.read_parquet(file_path) + dfs.append(df) + except Exception as e: + logger.warning(f"Failed to read {file_path}: {e}") + continue + + if not dfs: + return None + + # Combine all dividends + combined_df = pl.concat(dfs, how="vertical_relaxed") + + # Transform to unified schema with derived features + unified_df = combined_df.select([ + # Base fields + 
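        # (These base columns are shared by every event type; the type-specific
        # fields that follow are nullable so that dividends, splits, IPOs, and
        # ticker changes all emit one schema and concatenate cleanly.)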
pl.col('ticker'), + pl.lit('dividend').alias('event_type'), + pl.col('ex_dividend_date').str.to_date().alias('event_date'), + pl.col('id'), + pl.col('downloaded_at'), + + # Dividend-specific fields + pl.col('cash_amount').alias('div_cash_amount'), + pl.col('currency').alias('div_currency'), + pl.col('declaration_date').str.to_date().alias('div_declaration_date'), + pl.col('dividend_type').alias('div_type'), + pl.col('ex_dividend_date').str.to_date().alias('div_ex_dividend_date'), + pl.col('frequency').alias('div_frequency'), + pl.col('pay_date').str.to_date().alias('div_pay_date'), + pl.col('record_date').str.to_date().alias('div_record_date'), + + # Null columns for splits + pl.lit(None).cast(pl.Date).alias('split_execution_date'), + pl.lit(None).cast(pl.Float64).alias('split_from'), + pl.lit(None).cast(pl.Float64).alias('split_to'), + pl.lit(None).cast(pl.Float64).alias('split_ratio'), + pl.lit(None).cast(pl.Boolean).alias('split_is_reverse'), + + # Null columns for IPOs + pl.lit(None).cast(pl.Date).alias('ipo_listing_date'), + pl.lit(None).cast(pl.Float64).alias('ipo_issue_price'), + pl.lit(None).cast(pl.Int64).alias('ipo_shares_offered'), + pl.lit(None).cast(pl.String).alias('ipo_exchange'), + pl.lit(None).cast(pl.String).alias('ipo_status'), + + # Null columns for ticker changes + pl.lit(None).cast(pl.String).alias('new_ticker'), + ]) + + # Add derived features for dividends + unified_df = unified_df.with_columns([ + # Annualized amount based on frequency + pl.when(pl.col('div_frequency') == 12).then(pl.col('div_cash_amount') * 12) + .when(pl.col('div_frequency') == 4).then(pl.col('div_cash_amount') * 4) + .when(pl.col('div_frequency') == 2).then(pl.col('div_cash_amount') * 2) + .when(pl.col('div_frequency') == 1).then(pl.col('div_cash_amount')) + .otherwise(None) + .alias('div_annualized_amount'), + + # Special dividend flag (one-time) + (pl.col('div_frequency') == 0).alias('div_is_special'), + + # Quarter from ex-dividend date + pl.col('event_date').dt.quarter().cast(pl.Int8).alias('div_quarter'), + ]) + + logger.info(f" Processed {len(unified_df):,} dividend records") + return unified_df + + +def process_splits(bronze_path: Path, tickers: Optional[List[str]] = None) -> pl.DataFrame: + """Process stock split files from bronze layer""" + + logger.info("Processing SPLITS...") + splits_path = bronze_path / "splits" + + if not splits_path.exists(): + logger.warning(f"Splits path not found: {splits_path}") + return None + + # Find all parquet files + all_files = list(splits_path.rglob("*.parquet")) + + # Filter by tickers if specified + if tickers: + ticker_set = set(t.upper() for t in tickers) + all_files = [f for f in all_files if f.stem.replace('ticker=', '') in ticker_set] + + logger.info(f" Found {len(all_files):,} split files") + + if not all_files: + return None + + # Load all files + dfs = [] + for file_path in all_files: + try: + df = pl.read_parquet(file_path) + dfs.append(df) + except Exception as e: + logger.warning(f"Failed to read {file_path}: {e}") + continue + + if not dfs: + return None + + # Combine all splits + combined_df = pl.concat(dfs, how="vertical_relaxed") + + # Transform to unified schema + unified_df = combined_df.select([ + # Base fields + pl.col('ticker'), + pl.lit('split').alias('event_type'), + pl.col('execution_date').str.to_date().alias('event_date'), + pl.col('id'), + pl.col('downloaded_at'), + + # Null columns for dividends + pl.lit(None).cast(pl.Float64).alias('div_cash_amount'), + pl.lit(None).cast(pl.String).alias('div_currency'), + 
pl.lit(None).cast(pl.Date).alias('div_declaration_date'), + pl.lit(None).cast(pl.String).alias('div_type'), + pl.lit(None).cast(pl.Date).alias('div_ex_dividend_date'), + pl.lit(None).cast(pl.Int64).alias('div_frequency'), + pl.lit(None).cast(pl.Date).alias('div_pay_date'), + pl.lit(None).cast(pl.Date).alias('div_record_date'), + pl.lit(None).cast(pl.Float64).alias('div_annualized_amount'), + pl.lit(None).cast(pl.Boolean).alias('div_is_special'), + pl.lit(None).cast(pl.Int8).alias('div_quarter'), + + # Split-specific fields + pl.col('execution_date').str.to_date().alias('split_execution_date'), + pl.col('split_from').cast(pl.Float64).alias('split_from'), + pl.col('split_to').cast(pl.Float64).alias('split_to'), + (pl.col('split_to').cast(pl.Float64) / pl.col('split_from').cast(pl.Float64)).alias('split_ratio'), + + # Null columns for IPOs + pl.lit(None).cast(pl.Date).alias('ipo_listing_date'), + pl.lit(None).cast(pl.Float64).alias('ipo_issue_price'), + pl.lit(None).cast(pl.Int64).alias('ipo_shares_offered'), + pl.lit(None).cast(pl.String).alias('ipo_exchange'), + pl.lit(None).cast(pl.String).alias('ipo_status'), + + # Null columns for ticker changes + pl.lit(None).cast(pl.String).alias('new_ticker'), + ]) + + # Add derived features for splits + unified_df = unified_df.with_columns([ + # Reverse split flag (ratio < 1) + (pl.col('split_ratio') < 1.0).alias('split_is_reverse'), + ]) + + logger.info(f" Processed {len(unified_df):,} split records") + return unified_df + + +def process_ipos(bronze_path: Path, tickers: Optional[List[str]] = None) -> pl.DataFrame: + """Process IPO files from bronze layer""" + + logger.info("Processing IPOS...") + ipos_path = bronze_path / "ipos" + + if not ipos_path.exists(): + logger.warning(f"IPOs path not found: {ipos_path}") + return None + + # Find all parquet files + all_files = list(ipos_path.rglob("*.parquet")) + + # Filter by tickers if specified + if tickers: + ticker_set = set(t.upper() for t in tickers) + all_files = [f for f in all_files if f.stem.replace('ticker=', '') in ticker_set] + + logger.info(f" Found {len(all_files):,} IPO files") + + if not all_files: + return None + + # Load all files + dfs = [] + for file_path in all_files: + try: + df = pl.read_parquet(file_path) + dfs.append(df) + except Exception as e: + logger.warning(f"Failed to read {file_path}: {e}") + continue + + if not dfs: + return None + + # Combine all IPOs + combined_df = pl.concat(dfs, how="vertical_relaxed") + + # Generate ID if not present + if 'id' not in combined_df.columns: + combined_df = combined_df.with_columns( + (pl.col('ticker') + '_' + pl.col('listing_date')).alias('id') + ) + + # Transform to unified schema + unified_df = combined_df.select([ + # Base fields + pl.col('ticker'), + pl.lit('ipo').alias('event_type'), + pl.col('listing_date').str.to_date().alias('event_date'), + pl.col('id'), + pl.col('downloaded_at'), + + # Null columns for dividends + pl.lit(None).cast(pl.Float64).alias('div_cash_amount'), + pl.lit(None).cast(pl.String).alias('div_currency'), + pl.lit(None).cast(pl.Date).alias('div_declaration_date'), + pl.lit(None).cast(pl.String).alias('div_type'), + pl.lit(None).cast(pl.Date).alias('div_ex_dividend_date'), + pl.lit(None).cast(pl.Int64).alias('div_frequency'), + pl.lit(None).cast(pl.Date).alias('div_pay_date'), + pl.lit(None).cast(pl.Date).alias('div_record_date'), + pl.lit(None).cast(pl.Float64).alias('div_annualized_amount'), + pl.lit(None).cast(pl.Boolean).alias('div_is_special'), + pl.lit(None).cast(pl.Int8).alias('div_quarter'), + + # Null 
columns for splits + pl.lit(None).cast(pl.Date).alias('split_execution_date'), + pl.lit(None).cast(pl.Float64).alias('split_from'), + pl.lit(None).cast(pl.Float64).alias('split_to'), + pl.lit(None).cast(pl.Float64).alias('split_ratio'), + pl.lit(None).cast(pl.Boolean).alias('split_is_reverse'), + + # IPO-specific fields + pl.col('listing_date').str.to_date().alias('ipo_listing_date'), + pl.col('final_issue_price').alias('ipo_issue_price'), + pl.col('max_shares_offered').alias('ipo_shares_offered'), + pl.col('primary_exchange').alias('ipo_exchange'), + pl.col('ipo_status').alias('ipo_status'), + + # Null columns for ticker changes + pl.lit(None).cast(pl.String).alias('new_ticker'), + ]) + + logger.info(f" Processed {len(unified_df):,} IPO records") + return unified_df + + +def process_ticker_events(bronze_path: Path, tickers: Optional[List[str]] = None) -> pl.DataFrame: + """Process ticker change events from bronze layer""" + + logger.info("Processing TICKER EVENTS...") + ticker_events_path = bronze_path / "ticker_events" + + if not ticker_events_path.exists(): + logger.warning(f"Ticker events path not found: {ticker_events_path}") + return None + + # Find all parquet files + all_files = list(ticker_events_path.rglob("*.parquet")) + + # Filter by tickers if specified + if tickers: + ticker_set = set(t.upper() for t in tickers) + all_files = [f for f in all_files if f.stem.replace('ticker=', '') in ticker_set] + + logger.info(f" Found {len(all_files):,} ticker event files") + + if not all_files: + return None + + # Load all files + dfs = [] + for file_path in all_files: + try: + df = pl.read_parquet(file_path) + dfs.append(df) + except Exception as e: + logger.warning(f"Failed to read {file_path}: {e}") + continue + + if not dfs: + return None + + # Combine all ticker events + combined_df = pl.concat(dfs, how="vertical_relaxed") + + # Generate ID if not present + if 'id' not in combined_df.columns: + combined_df = combined_df.with_columns( + (pl.col('ticker') + '_' + pl.col('date')).alias('id') + ) + + # Transform to unified schema + unified_df = combined_df.select([ + # Base fields + pl.col('ticker'), + pl.lit('ticker_change').alias('event_type'), + pl.col('date').str.to_date().alias('event_date'), + pl.col('id'), + pl.col('downloaded_at'), + + # Null columns for dividends + pl.lit(None).cast(pl.Float64).alias('div_cash_amount'), + pl.lit(None).cast(pl.String).alias('div_currency'), + pl.lit(None).cast(pl.Date).alias('div_declaration_date'), + pl.lit(None).cast(pl.String).alias('div_type'), + pl.lit(None).cast(pl.Date).alias('div_ex_dividend_date'), + pl.lit(None).cast(pl.Int64).alias('div_frequency'), + pl.lit(None).cast(pl.Date).alias('div_pay_date'), + pl.lit(None).cast(pl.Date).alias('div_record_date'), + pl.lit(None).cast(pl.Float64).alias('div_annualized_amount'), + pl.lit(None).cast(pl.Boolean).alias('div_is_special'), + pl.lit(None).cast(pl.Int8).alias('div_quarter'), + + # Null columns for splits + pl.lit(None).cast(pl.Date).alias('split_execution_date'), + pl.lit(None).cast(pl.Float64).alias('split_from'), + pl.lit(None).cast(pl.Float64).alias('split_to'), + pl.lit(None).cast(pl.Float64).alias('split_ratio'), + pl.lit(None).cast(pl.Boolean).alias('split_is_reverse'), + + # Null columns for IPOs + pl.lit(None).cast(pl.Date).alias('ipo_listing_date'), + pl.lit(None).cast(pl.Float64).alias('ipo_issue_price'), + pl.lit(None).cast(pl.Int64).alias('ipo_shares_offered'), + pl.lit(None).cast(pl.String).alias('ipo_exchange'), + pl.lit(None).cast(pl.String).alias('ipo_status'), + + # Ticker 
change specific fields + pl.col('new_ticker') if 'new_ticker' in combined_df.columns else pl.lit(None).cast(pl.String).alias('new_ticker'), + ]) + + logger.info(f" Processed {len(unified_df):,} ticker change records") + return unified_df + + +def write_partitioned_silver(df: pl.DataFrame, silver_path: Path) -> dict: + """ + Write data to silver layer with ticker + event_type partitioning + + Args: + df: DataFrame with all corporate actions + silver_path: Root path for silver layer + + Returns: + Dictionary with write statistics + """ + silver_path.mkdir(parents=True, exist_ok=True) + + stats = { + 'tickers_written': 0, + 'files_written': 0, + 'total_records': len(df) + } + + # Get unique ticker/event_type combinations + partitions = df.select(['ticker', 'event_type']).unique() + + logger.info(f"Writing {len(partitions)} partitions...") + + for row in partitions.iter_rows(named=True): + ticker = row['ticker'] + event_type = row['event_type'] + + # Filter data for this partition + partition_df = df.filter( + (pl.col('ticker') == ticker) & + (pl.col('event_type') == event_type) + ) + + # Sort by event_date descending (most recent first) + partition_df = partition_df.sort('event_date', descending=True) + + # Add processing metadata + partition_df = partition_df.with_columns([ + pl.lit(datetime.now()).alias('processed_at'), + pl.col('event_date').dt.year().cast(pl.Int32).alias('year'), + pl.col('event_date').dt.quarter().cast(pl.Int8).alias('quarter'), + pl.col('event_date').dt.month().cast(pl.Int8).alias('month'), + ]) + + # Create partition directory + partition_dir = silver_path / f"ticker={ticker}" / f"event_type={event_type}" + partition_dir.mkdir(parents=True, exist_ok=True) + + output_file = partition_dir / "data.parquet" + + # Write with optimizations + partition_df.write_parquet( + output_file, + compression='zstd', + compression_level=3, + statistics=True, # Write column statistics for predicate pushdown + row_group_size=50000 # Optimize for query performance + ) + + stats['files_written'] += 1 + + if stats['files_written'] % 100 == 0: + logger.info(f" Written {stats['files_written']} partitions...") + + stats['tickers_written'] = df.select('ticker').n_unique() + + return stats + + +def main(): + """Main entry point""" + import argparse + + parser = argparse.ArgumentParser( + description='Transform corporate actions to optimized silver layer', + formatter_class=argparse.RawDescriptionHelpFormatter, + epilog=__doc__ + ) + + parser.add_argument( + '--tickers', + nargs='+', + help='Specific tickers to process (default: all)' + ) + + parser.add_argument( + '--bronze-dir', + type=Path, + help='Bronze layer path (default: $QUANTLAKE_ROOT/bronze/corporate_actions)' + ) + + parser.add_argument( + '--silver-dir', + type=Path, + help='Silver layer path (default: $QUANTLAKE_ROOT/silver/corporate_actions)' + ) + + args = parser.parse_args() + + logger.info("="*80) + logger.info("OPTIMIZED CORPORATE ACTIONS SILVER LAYER TRANSFORMATION") + logger.info("="*80) + logger.info("") + + # Paths (using centralized configuration) + quantlake_root = get_quantlake_root() + bronze_path = args.bronze_dir or quantlake_root / 'bronze' / 'corporate_actions' + silver_path = args.silver_dir or quantlake_root / 'silver' / 'corporate_actions' + + logger.info(f"Bronze path: {bronze_path}") + logger.info(f"Silver path: {silver_path}") + + if args.tickers: + logger.info(f"Processing tickers: {', '.join(args.tickers)}") + else: + logger.info("Processing ALL tickers") + logger.info("") + + # Process each corporate action 
type + dividends_df = process_dividends(bronze_path, args.tickers) + splits_df = process_splits(bronze_path, args.tickers) + ipos_df = process_ipos(bronze_path, args.tickers) + ticker_events_df = process_ticker_events(bronze_path, args.tickers) + + # Combine all corporate actions + logger.info("") + logger.info("Combining all corporate actions...") + + all_dfs = [] + if dividends_df is not None: + all_dfs.append(dividends_df) + if splits_df is not None: + all_dfs.append(splits_df) + if ipos_df is not None: + all_dfs.append(ipos_df) + if ticker_events_df is not None: + all_dfs.append(ticker_events_df) + + if not all_dfs: + logger.error("No corporate actions found!") + return + + # Define consistent column order + column_order = [ + # Base fields + 'ticker', 'event_type', 'event_date', 'id', 'downloaded_at', + # Dividend fields + 'div_cash_amount', 'div_currency', 'div_declaration_date', 'div_type', + 'div_ex_dividend_date', 'div_frequency', 'div_pay_date', 'div_record_date', + 'div_annualized_amount', 'div_is_special', 'div_quarter', + # Split fields + 'split_execution_date', 'split_from', 'split_to', 'split_ratio', 'split_is_reverse', + # IPO fields + 'ipo_listing_date', 'ipo_issue_price', 'ipo_shares_offered', 'ipo_exchange', 'ipo_status', + # Ticker change fields + 'new_ticker' + ] + + # Ensure all dataframes have the same columns in the same order + aligned_dfs = [df.select(column_order) for df in all_dfs] + + combined_df = pl.concat(aligned_dfs, how="vertical") + + # Summary statistics + logger.info(f"Total records: {len(combined_df):,}") + logger.info(f"Total columns: {len(combined_df.columns)}") + logger.info("") + logger.info("Records by event type:") + for event_type, count in combined_df.group_by('event_type').agg(pl.len()).iter_rows(): + logger.info(f" {event_type}: {count:,}") + + logger.info("") + logger.info(f"Unique tickers: {combined_df['ticker'].n_unique()}") + logger.info(f"Date range: {combined_df['event_date'].min()} to {combined_df['event_date'].max()}") + + # Write to silver layer with optimized partitioning + logger.info("") + logger.info("Writing to silver layer with ticker + event_type partitioning...") + + stats = write_partitioned_silver(combined_df, silver_path) + + logger.info("") + logger.info("✓ Corporate actions silver layer created") + logger.info(f" Location: {silver_path}") + logger.info(f" Tickers: {stats['tickers_written']:,}") + logger.info(f" Files written: {stats['files_written']:,}") + logger.info(f" Total records: {stats['total_records']:,}") + logger.info(f" Partitioning: ticker / event_type") + logger.info(f" Optimization: Sorted by event_date DESC, no dictionary encoding") + logger.info("") + + # Record metadata for silver layer + try: + config = ConfigLoader() + metadata_root = config.get_metadata_path() + metadata_manager = MetadataManager(metadata_root) + + # Get date range from the combined data + min_date = str(combined_df['event_date'].min()) + max_date = str(combined_df['event_date'].max()) + + # Record metadata for each date in the range + # For corporate actions, we record a single entry for the transformation + metadata_manager.record_ingestion( + data_type='corporate_actions', + date=max_date, # Use max date as the watermark + status='success', + statistics={ + 'records': stats['total_records'], + 'tickers': stats['tickers_written'], + 'files_written': stats['files_written'], + 'min_date': min_date, + 'max_date': max_date, + }, + layer='silver' + ) + + # Update watermark + metadata_manager.set_watermark( + 
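            # The silver watermark records the newest event_date written; a later
            # incremental run can read it back via get_watermark() and resume there.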
data_type='corporate_actions', + date=max_date, + layer='silver' + ) + + logger.info("✓ Metadata recorded for silver layer") + + except Exception as e: + logger.warning(f"Failed to record metadata: {e}") + + +if __name__ == '__main__': + main() diff --git a/src/cli/commands/data.py b/src/cli/commands/data.py index aab1e00..1bce92d 100644 --- a/src/cli/commands/data.py +++ b/src/cli/commands/data.py @@ -172,10 +172,10 @@ def enrich(data_type, start_date, end_date, incremental): config = ConfigLoader() click.echo(f"⚙️ Enriching {data_type} from {start_date} to {end_date}...") - + with FeatureEngineer( - parquet_root=config.get_data_root() / 'parquet', - enriched_root=config.get_data_root() / 'enriched', + parquet_root=config.get_bronze_path(), + enriched_root=config.get_silver_path(), config=config ) as engineer: result = engineer.enrich_date_range( @@ -184,11 +184,43 @@ def enrich(data_type, start_date, end_date, incremental): end_date=end_date, incremental=incremental ) - + click.echo(f"\n✅ Enriched {result['records_enriched']:,} records") click.echo(f" Dates processed: {result['dates_processed']}") click.echo(f" Features added: {result['features_added']}") + # Record metadata for silver layer (enrichment adds features to create silver layer) + try: + metadata_root = config.get_metadata_path() + metadata_manager = MetadataManager(metadata_root) + + # Record metadata for the enrichment + metadata_manager.record_ingestion( + data_type=data_type, + date=end_date, + status='success', + statistics={ + 'records_enriched': result['records_enriched'], + 'dates_processed': result['dates_processed'], + 'features_added': result['features_added'], + 'start_date': start_date, + 'end_date': end_date, + }, + layer='silver' + ) + + # Update watermark + metadata_manager.set_watermark( + data_type=data_type, + date=end_date, + layer='silver' + ) + + click.echo("✓ Metadata recorded for silver layer") + + except Exception as e: + click.echo(f"Warning: Failed to record metadata: {e}", err=True) + @data.command() @click.option('--data-type', '-t', @@ -216,8 +248,8 @@ def convert(data_type, start_date, end_date, incremental): click.echo(f"🔄 Converting {data_type} to Qlib binary format...") writer = QlibBinaryWriter( - enriched_root=config.get_data_root() / 'enriched', - qlib_root=config.get_data_root() / 'qlib', + enriched_root=config.get_silver_path(), + qlib_root=config.get_gold_path() / 'qlib', config=config ) @@ -230,6 +262,38 @@ def convert(data_type, start_date, end_date, incremental): click.echo(f"\n✅ Converted {result['symbols_converted']} symbols") click.echo(f" Features: {result['features_written']}") + + # Record metadata for gold layer (Qlib binary format) + try: + metadata_root = config.get_metadata_path() + metadata_manager = MetadataManager(metadata_root) + + # Record metadata for the conversion + # Use a special data_type to distinguish from regular enrichment + metadata_manager.record_ingestion( + data_type=f"{data_type}_qlib", + date=end_date, + status='success', + statistics={ + 'symbols_converted': result['symbols_converted'], + 'features_written': result['features_written'], + 'start_date': start_date, + 'end_date': end_date, + }, + layer='gold' + ) + + # Update watermark + metadata_manager.set_watermark( + data_type=f"{data_type}_qlib", + date=end_date, + layer='gold' + ) + + click.echo("✓ Metadata recorded for gold layer (Qlib conversion)") + + except Exception as e: + click.echo(f"Warning: Failed to record metadata: {e}", err=True) if 'elapsed_time' in result: click.echo(f" Time: 
{result['elapsed_time']:.2f}s") @@ -253,9 +317,9 @@ def query(data_type, symbols, fields, start_date, end_date, output, limit): click.echo(f"🔍 Querying {data_type}...") click.echo(f" Symbols: {', '.join(symbols)}") click.echo(f" Fields: {', '.join(fields)}") - + engine = QueryEngine( - data_root=config.get_data_root() / 'enriched', + data_root=config.get_silver_path(), config=config ) diff --git a/src/cli/commands/polygon.py b/src/cli/commands/polygon.py index a830500..d155b0f 100644 --- a/src/cli/commands/polygon.py +++ b/src/cli/commands/polygon.py @@ -12,10 +12,12 @@ import click import asyncio from pathlib import Path -from datetime import date, timedelta +from datetime import date, timedelta, datetime as dt import logging from ...core.config_loader import ConfigLoader +from src.utils.paths import get_quantlake_root +from ...storage.metadata_manager import MetadataManager from ...download import ( PolygonRESTClient, ReferenceDataDownloader, @@ -38,6 +40,44 @@ logger = logging.getLogger(__name__) +def _record_polygon_metadata(data_type: str, records: int, status: str = 'success', error: str = None): + """ + Record metadata for Polygon API downloads + + Args: + data_type: Type of data (fundamentals, corporate_actions, news, short_data) + records: Number of records downloaded + status: Status ('success', 'failed') + error: Optional error message + """ + try: + config = ConfigLoader() + metadata_root = config.get_metadata_path() + metadata_manager = MetadataManager(metadata_root) + + # Use current date as the "date" for API downloads + today = dt.now().strftime('%Y-%m-%d') + + metadata_manager.record_ingestion( + data_type=data_type, + date=today, + status=status, + statistics={ + 'records': records, + 'download_timestamp': dt.now().isoformat() + }, + error=error + ) + + # Update watermark + if status == 'success': + metadata_manager.set_watermark(data_type=data_type, date=today) + + except Exception as e: + # Don't let metadata errors block the download + logger.warning(f"Failed to record metadata: {e}") + + @click.group() def polygon(): """Polygon REST API data downloads""" @@ -63,9 +103,13 @@ def _get_api_key(credentials: dict) -> str: @polygon.command() @click.option('--asset-class', type=str, help='Filter by asset class (stocks, options, crypto, fx, indices)') @click.option('--locale', type=str, help='Filter by locale (us, global)') -@click.option('--output-dir', type=Path, default='data/reference', help='Output directory') +@click.option('--output-dir', type=Path, default=None, help='Output directory') def ticker_types(asset_class, locale, output_dir): """Download ticker types""" + # Use centralized path configuration if output_dir not specified + if not output_dir: + output_dir = get_quantlake_root() / 'reference' + async def run(): config = ConfigLoader() credentials = config.get_credentials('polygon') @@ -93,9 +137,13 @@ async def run(): @polygon.command() @click.argument('tickers', nargs=-1, required=True) -@click.option('--output-dir', type=Path, default='data/partitioned_screener', help='Output directory (partitioned structure)') +@click.option('--output-dir', type=Path, default=None, help='Output directory (partitioned structure)') def related_tickers(tickers, output_dir): """Download related tickers for one or more tickers in partitioned structure""" + # Use centralized path configuration if output_dir not specified + if not output_dir: + output_dir = get_quantlake_root() / 'reference' + async def run(): config = ConfigLoader() credentials = config.get_credentials('polygon') 
@@ -137,9 +185,13 @@ async def run(): @click.option('--start-date', type=str, help='Start date (YYYY-MM-DD)') @click.option('--end-date', type=str, help='End-date (YYYY-MM-DD)') @click.option('--include-ipos', is_flag=True, help='Include IPO data') -@click.option('--output-dir', type=Path, default='data/partitioned_screener', help='Output directory (partitioned structure)') +@click.option('--output-dir', type=Path, default=None, help='Output directory (partitioned structure)') def corporate_actions(ticker, start_date, end_date, include_ipos, output_dir): """Download corporate actions (dividends, splits, IPOs) in partitioned structure""" + # Use centralized path configuration if output_dir not specified + if not output_dir: + output_dir = get_quantlake_root() / 'corporate_actions' + async def run(): config = ConfigLoader() credentials = config.get_credentials('polygon') @@ -175,6 +227,12 @@ async def run(): if include_ipos: click.echo(f" IPOs: {len(data['ipos'])} records") + # Record metadata + total_records = len(data['dividends']) + len(data['splits']) + if include_ipos: + total_records += len(data['ipos']) + _record_polygon_metadata('corporate_actions', total_records, 'success') + # Show statistics stats = client.get_statistics() click.echo(f"\n📊 Statistics:") @@ -188,9 +246,13 @@ async def run(): @polygon.command() @click.argument('tickers', nargs=-1, required=True) -@click.option('--output-dir', type=Path, default='data/partitioned_screener', help='Output directory (partitioned structure)') +@click.option('--output-dir', type=Path, default=None, help='Output directory (partitioned structure)') def ticker_events(tickers, output_dir): """Download ticker events (symbol changes, rebranding) in partitioned structure""" + # Use centralized path configuration if output_dir not specified + if not output_dir: + output_dir = get_quantlake_root() / 'bronze' / 'corporate_actions' + async def run(): config = ConfigLoader() credentials = config.get_credentials('polygon') @@ -230,9 +292,34 @@ async def run(): @polygon.command() @click.argument('tickers', nargs=-1, required=True) @click.option('--timeframe', type=click.Choice(['annual', 'quarterly']), default='quarterly', help='Reporting period') -@click.option('--output-dir', type=Path, default='data/partitioned_screener', help='Output directory (partitioned structure)') -def fundamentals(tickers, timeframe, output_dir): - """Download fundamentals (balance sheets, income statements, cash flow) in partitioned structure""" +@click.option('--filing-date-gte', type=str, default=None, help='Filing date >= YYYY-MM-DD (default: last 180 days)') +@click.option('--filing-date-lt', type=str, default=None, help='Filing date < YYYY-MM-DD (default: today)') +@click.option('--output-dir', type=Path, default=None, help='Output directory (partitioned structure)') +def fundamentals(tickers, timeframe, filing_date_gte, filing_date_lt, output_dir): + """Download fundamentals (balance sheets, income statements, cash flow) in partitioned structure + + OPTIMIZED: Now supports date filtering on API side for much faster downloads! + + For daily updates, use --filing-date-gte to get only recent filings. + Defaults to last 180 days (6 months = 2 quarters) if no dates specified. 
+ + Examples: + quantmini polygon fundamentals AAPL MSFT --filing-date-gte 2024-01-01 + quantmini polygon fundamentals AAPL --filing-date-gte 2024-01-01 --filing-date-lt 2024-12-31 + """ + from datetime import datetime, timedelta + + # Use centralized path configuration if output_dir not specified + if not output_dir: + output_dir = get_quantlake_root() / 'fundamentals' + + # Default to last 180 days (6 months = 2 quarters) if no dates specified + if not filing_date_gte and not filing_date_lt: + today = datetime.now().date() + default_start = today - timedelta(days=180) + filing_date_gte = str(default_start) + click.echo(f"ℹ️ No date range specified, defaulting to last 180 days ({default_start} to {today})") + async def run(): config = ConfigLoader() credentials = config.get_credentials('polygon') @@ -252,16 +339,27 @@ async def run(): output_dir, use_partitioned_structure=True ) - click.echo(f"📥 Downloading {timeframe} fundamentals for {len(tickers)} tickers...") + + date_info = f" from {filing_date_gte or 'beginning'} to {filing_date_lt or 'today'}" + click.echo(f"📥 Downloading {timeframe} fundamentals for {len(tickers)} tickers{date_info}...") click.echo(f"📂 Saving to partitioned structure: {output_dir}/") - data = await downloader.download_financials_batch(list(tickers), timeframe) + data = await downloader.download_financials_batch( + list(tickers), + timeframe, + filing_date_gte=filing_date_gte, + filing_date_lt=filing_date_lt + ) click.echo(f"✅ Downloaded fundamentals:") click.echo(f" Balance sheets: {data['balance_sheets']} records") click.echo(f" Cash flow: {data['cash_flow']} records") click.echo(f" Income statements: {data['income_statements']} records") + # Record metadata + total_records = data['balance_sheets'] + data['cash_flow'] + data['income_statements'] + _record_polygon_metadata('fundamentals', total_records, 'success') + # Show statistics stats = client.get_statistics() click.echo(f"\n📊 Statistics:") @@ -275,11 +373,17 @@ async def run(): @polygon.command() @click.argument('tickers', nargs=-1, required=True) -@click.option('--input-dir', type=Path, default='data/partitioned_screener', help='Input directory with fundamentals data') -@click.option('--output-dir', type=Path, default='data/partitioned_screener', help='Output directory (partitioned structure)') +@click.option('--input-dir', type=Path, default=None, help='Input directory with fundamentals data') +@click.option('--output-dir', type=Path, default=None, help='Output directory (partitioned structure)') @click.option('--include-growth', is_flag=True, default=True, help='Include growth rate calculations') def financial_ratios(tickers, input_dir, output_dir, include_growth): """Calculate financial ratios from fundamentals data in partitioned structure""" + # Use centralized path configuration if paths not specified + if not input_dir: + input_dir = get_quantlake_root() / 'fundamentals' + if not output_dir: + output_dir = get_quantlake_root() / 'fundamentals' + async def run(): downloader = FinancialRatiosDownloader( input_dir, @@ -314,9 +418,13 @@ async def run(): @click.option('--start-date', type=str, help='Start date (YYYY-MM-DD)') @click.option('--end-date', type=str, help='End date (YYYY-MM-DD)') @click.option('--days', type=int, default=90, help='Number of days to download (default: 90)') -@click.option('--output-dir', type=Path, default='data/economy', help='Output directory') +@click.option('--output-dir', type=Path, default=None, help='Output directory') def economy(start_date, end_date, days, output_dir): 
"""Download economy data (treasury yields, inflation, expectations)""" + # Use centralized path configuration if output_dir not specified + if not output_dir: + output_dir = get_quantlake_root() / 'economy' + async def run(): config = ConfigLoader() credentials = config.get_credentials('polygon') @@ -361,9 +469,13 @@ async def run(): @polygon.command() @click.option('--date', type=str, help='Date for yield curve (YYYY-MM-DD, default: today)') -@click.option('--output-dir', type=Path, default='data/economy', help='Output directory') +@click.option('--output-dir', type=Path, default=None, help='Output directory') def yield_curve(date_str, output_dir): """Download full treasury yield curve for a specific date""" + # Use centralized path configuration if output_dir not specified + if not output_dir: + output_dir = get_quantlake_root() / 'economy' + async def run(): config = ConfigLoader() credentials = config.get_credentials('polygon') @@ -407,9 +519,13 @@ async def run(): @polygon.command() @click.argument('tickers', nargs=-1, required=True) -@click.option('--output-dir', type=Path, default='data/partitioned_screener', help='Output directory (partitioned structure)') +@click.option('--output-dir', type=Path, default=None, help='Output directory (partitioned structure)') def short_interest(tickers, output_dir): """Download short interest data for one or more tickers in partitioned structure""" + # Use centralized path configuration if output_dir not specified + if not output_dir: + output_dir = get_quantlake_root() / 'bronze' / 'fundamentals' + async def run(): config = ConfigLoader() credentials = config.get_credentials('polygon') @@ -454,9 +570,13 @@ async def run(): @polygon.command() @click.argument('tickers', nargs=-1, required=True) -@click.option('--output-dir', type=Path, default='data/partitioned_screener', help='Output directory (partitioned structure)') +@click.option('--output-dir', type=Path, default=None, help='Output directory (partitioned structure)') def short_volume(tickers, output_dir): """Download short volume data for one or more tickers in partitioned structure""" + # Use centralized path configuration if output_dir not specified + if not output_dir: + output_dir = get_quantlake_root() / 'bronze' / 'fundamentals' + async def run(): config = ConfigLoader() credentials = config.get_credentials('polygon') @@ -501,9 +621,33 @@ async def run(): @polygon.command() @click.argument('tickers', nargs=-1, required=True) -@click.option('--output-dir', type=Path, default='data/partitioned_screener', help='Output directory (partitioned structure)') -def short_data(tickers, output_dir): - """Download both short interest and short volume for one or more tickers in partitioned structure""" +@click.option('--settlement-date-gte', type=str, default=None, help='Short interest: settlement date >= YYYY-MM-DD (default: last 30 days)') +@click.option('--settlement-date-lte', type=str, default=None, help='Short interest: settlement date <= YYYY-MM-DD (default: today)') +@click.option('--date-gte', type=str, default=None, help='Short volume: date >= YYYY-MM-DD (default: last 30 days)') +@click.option('--date-lte', type=str, default=None, help='Short volume: date <= YYYY-MM-DD (default: today)') +@click.option('--output-dir', type=Path, default=None, help='Output directory (partitioned structure)') +def short_data(tickers, settlement_date_gte, settlement_date_lte, date_gte, date_lte, output_dir): + """Download both short interest and short volume for one or more tickers in partitioned structure + + 
UPDATED: Now uses date filtering on API side for much faster downloads! + + For daily updates, use --settlement-date-gte and --date-gte to get only recent data. + Example: --settlement-date-gte 2025-10-01 --date-gte 2025-10-01 + """ + from datetime import datetime, timedelta + + # Use centralized path configuration if output_dir not specified + if not output_dir: + output_dir = get_quantlake_root() / 'bronze' / 'fundamentals' + + # Default to last 30 days if no dates specified + if not settlement_date_gte and not settlement_date_lte and not date_gte and not date_lte: + today = datetime.now().date() + default_start = today - timedelta(days=30) + settlement_date_gte = str(default_start) + date_gte = str(default_start) + click.echo(f"ℹ️ No date range specified, defaulting to last 30 days ({default_start} to {today})") + async def run(): config = ConfigLoader() credentials = config.get_credentials('polygon') @@ -524,14 +668,26 @@ async def run(): use_partitioned_structure=True ) click.echo(f"📂 Saving to partitioned structure: {output_dir}/") - click.echo(f"📥 Downloading short data for {len(tickers)} tickers...") - data = await downloader.download_short_data_batch(list(tickers)) + date_info = f" from {settlement_date_gte or date_gte or 'beginning'} to {settlement_date_lte or date_lte or 'today'}" + click.echo(f"📥 Downloading short data for {len(tickers)} tickers{date_info}...") + + data = await downloader.download_short_data_batch( + tickers=list(tickers), + settlement_date_gte=settlement_date_gte, + settlement_date_lte=settlement_date_lte, + date_gte=date_gte, + date_lte=date_lte + ) click.echo(f"✅ Downloaded short data:") click.echo(f" Short interest: {len(data['short_interest'])} records") click.echo(f" Short volume: {len(data['short_volume'])} records") + # Record metadata + total_records = len(data['short_interest']) + len(data['short_volume']) + _record_polygon_metadata('short_data', total_records, 'success') + # Show statistics stats = client.get_statistics() click.echo(f"\n📊 Statistics:") @@ -551,9 +707,13 @@ async def run(): @click.option('--timespan', type=click.Choice(['minute', 'hour', 'day', 'week', 'month']), default='day', help='Size of time window') @click.option('--from-date', type=str, help='Start date (YYYY-MM-DD)') @click.option('--to-date', type=str, help='End date (YYYY-MM-DD)') -@click.option('--output-dir', type=Path, default='data/bars', help='Output directory') +@click.option('--output-dir', type=Path, default=None, help='Output directory') def bars(tickers, multiplier, timespan, from_date, to_date, output_dir): """Download aggregate bars (OHLCV) for one or more tickers""" + # Use centralized path configuration if output_dir not specified + if not output_dir: + output_dir = get_quantlake_root() / 'bars' + async def run(): config = ConfigLoader() credentials = config.get_credentials('polygon') @@ -578,9 +738,13 @@ async def run(): @polygon.command() @click.argument('tickers', nargs=-1, required=True) -@click.option('--output-dir', type=Path, default='data/snapshots', help='Output directory') +@click.option('--output-dir', type=Path, default=None, help='Output directory') def snapshots(tickers, output_dir): """Download real-time snapshots for one or more tickers""" + # Use centralized path configuration if output_dir not specified + if not output_dir: + output_dir = get_quantlake_root() / 'snapshots' + async def run(): config = ConfigLoader() credentials = config.get_credentials('polygon') @@ -600,9 +764,13 @@ async def run(): @polygon.command() 
-@click.option('--output-dir', type=Path, default='data/market_status', help='Output directory') +@click.option('--output-dir', type=Path, default=None, help='Output directory') def market_status(output_dir): """Download market status, holidays, and metadata""" + # Use centralized path configuration if output_dir not specified + if not output_dir: + output_dir = get_quantlake_root() / 'market_status' + async def run(): config = ConfigLoader() credentials = config.get_credentials('polygon') @@ -625,9 +793,13 @@ async def run(): @click.argument('ticker', required=True) @click.option('--indicator', type=click.Choice(['sma', 'ema', 'macd', 'rsi', 'all']), default='all', help='Indicator type') @click.option('--window', type=int, default=50, help='Window size (for SMA/EMA/RSI)') -@click.option('--output-dir', type=Path, default='data/indicators', help='Output directory') +@click.option('--output-dir', type=Path, default=None, help='Output directory') def indicators(ticker, indicator, window, output_dir): """Download technical indicators for a ticker""" + # Use centralized path configuration if output_dir not specified + if not output_dir: + output_dir = get_quantlake_root() / 'indicators' + async def run(): config = ConfigLoader() credentials = config.get_credentials('polygon') @@ -662,9 +834,13 @@ async def run(): @polygon.command() @click.option('--underlying', type=str, help='Underlying ticker') @click.option('--expiration', type=str, help='Expiration date (YYYY-MM-DD)') -@click.option('--output-dir', type=Path, default='data/options', help='Output directory') +@click.option('--output-dir', type=Path, default=None, help='Output directory') def options(underlying, expiration, output_dir): """Download options contracts and chains""" + # Use centralized path configuration if output_dir not specified + if not output_dir: + output_dir = get_quantlake_root() / 'options' + async def run(): config = ConfigLoader() credentials = config.get_credentials('polygon') @@ -692,7 +868,7 @@ async def run(): @click.option('--end-date', type=str, help='End date for news (YYYY-MM-DD)') @click.option('--days', type=int, default=30, help='Number of days to download (default: 30, used if dates not specified)') @click.option('--limit', type=int, default=1000, help='Number of news articles per ticker (max 1000)') -@click.option('--output-dir', type=Path, default='data/partitioned_screener', help='Output directory (partitioned structure)') +@click.option('--output-dir', type=Path, default=None, help='Output directory (partitioned structure)') def news(tickers, start_date, end_date, days, limit, output_dir): """Download news articles for one or more tickers in partitioned structure @@ -701,6 +877,10 @@ def news(tickers, start_date, end_date, days, limit, output_dir): quantmini polygon news AAPL --start-date 2024-01-01 --end-date 2024-12-31 quantmini polygon news --days 7 # All tickers from the last 7 days """ + # Use centralized path configuration if output_dir not specified + if not output_dir: + output_dir = get_quantlake_root() / 'news' + async def run(): config = ConfigLoader() credentials = config.get_credentials('polygon') @@ -726,6 +906,7 @@ async def run(): click.echo(f"📂 Saving to partitioned structure: {output_dir}/news/") click.echo(f"📅 Date range: {start_date} to {end_date}") + total_articles = 0 if tickers: # Download for specific tickers click.echo(f"📥 Downloading news for {len(tickers)} tickers...") @@ -738,7 +919,8 @@ async def run(): published_utc_lte=end_date, limit=limit ) - click.echo(f"✅ Downloaded 
{len(df)} news articles") + total_articles = len(df) + click.echo(f"✅ Downloaded {total_articles} news articles") else: # Batch download result = await downloader.download_news_batch( @@ -747,7 +929,8 @@ async def run(): published_utc_lte=end_date, limit=limit ) - click.echo(f"✅ Downloaded {result['total_articles']} total news articles") + total_articles = result['total_articles'] + click.echo(f"✅ Downloaded {total_articles} total news articles") else: # Download all news (no ticker filter) click.echo(f"📥 Downloading all news articles...") @@ -757,7 +940,11 @@ async def run(): published_utc_lte=end_date, limit=limit ) - click.echo(f"✅ Downloaded {len(df)} news articles") + total_articles = len(df) + click.echo(f"✅ Downloaded {total_articles} news articles") + + # Record metadata + _record_polygon_metadata('news', total_articles, 'success') # Show statistics stats = client.get_statistics() diff --git a/src/cli/commands/transform.py b/src/cli/commands/transform.py new file mode 100644 index 0000000..3738d2c --- /dev/null +++ b/src/cli/commands/transform.py @@ -0,0 +1,751 @@ +"""Data transformation commands for silver layer generation.""" + +import click +import sys +from pathlib import Path +from datetime import datetime +import logging + +import polars as pl + +# Import centralized path utilities +from src.utils.paths import get_quantlake_root +from src.storage.metadata_manager import MetadataManager +from src.core.config_loader import ConfigLoader + +logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(levelname)s - %(message)s' +) +logger = logging.getLogger(__name__) + + +@click.group() +def transform(): + """Bronze to silver layer transformations.""" + pass + + +@transform.command('financial-ratios') +@click.option('--bronze-dir', '-b', + type=click.Path(exists=True), + default=None, + help='Bronze layer financial ratios directory (default: $QUANTLAKE_ROOT/fundamentals/financial_ratios)') +@click.option('--silver-dir', '-s', + type=click.Path(), + default=None, + help='Silver layer output directory (default: $QUANTLAKE_ROOT/silver/financial_ratios)') +def financial_ratios(bronze_dir, silver_dir): + """Move financial ratios from bronze to silver layer.""" + + # Use environment variable defaults if not specified + quantlake_root = get_quantlake_root() + bronze_path = Path(bronze_dir) if bronze_dir else quantlake_root / 'fundamentals' / 'financial_ratios' + silver_path = Path(silver_dir) if silver_dir else quantlake_root / 'silver' / 'financial_ratios' + + click.echo("="*80) + click.echo("MOVING FINANCIAL RATIOS TO SILVER LAYER") + click.echo("="*80) + click.echo(f"Bronze path: {bronze_path}") + click.echo(f"Silver path: {silver_path}") + click.echo("") + + # Find all parquet files + all_files = list(bronze_path.rglob("*.parquet")) + click.echo(f"Found {len(all_files):,} files") + + # Load all files + click.echo("Loading and consolidating files...") + dfs = [] + for file_path in all_files: + try: + df = pl.read_parquet(file_path) + dfs.append(df) + except Exception as e: + click.echo(f"Warning: Failed to read {file_path}: {e}", err=True) + continue + + # Combine all data (using vertical_relaxed to handle schema differences) + click.echo(f"Combining {len(dfs)} dataframes...") + # Collect all unique columns across all dataframes + all_columns = set() + for df in dfs: + all_columns.update(df.columns) + + # Ensure all dataframes have the same columns (fill missing with nulls) + aligned_dfs = [] + for df in dfs: + missing_cols = all_columns - set(df.columns) + for col in 
missing_cols: + df = df.with_columns(pl.lit(None).alias(col)) + aligned_dfs.append(df.select(sorted(all_columns))) + + combined_df = pl.concat(aligned_dfs, how="vertical_relaxed") + + click.echo(f"Total records: {len(combined_df):,}") + click.echo(f"Total columns: {len(combined_df.columns)}") + click.echo(f"Unique tickers: {combined_df['ticker'].n_unique()}") + + # Add processed_at timestamp + combined_df = combined_df.with_columns( + pl.lit(datetime.now()).alias('processed_at') + ) + + # Save to silver layer partitioned by fiscal_year and fiscal_period + click.echo("") + click.echo("Saving to silver layer...") + silver_path.mkdir(parents=True, exist_ok=True) + + for (year, quarter), group_df in combined_df.group_by(['fiscal_year', 'fiscal_period']): + partition_dir = silver_path / f"year={year}" / f"quarter={quarter}" + partition_dir.mkdir(parents=True, exist_ok=True) + + output_file = partition_dir / "data.parquet" + group_df.write_parquet( + output_file, + compression='zstd', + compression_level=3 + ) + + click.echo(f" Saved: year={year}, quarter={quarter} ({len(group_df):,} records)") + + click.echo("") + click.echo("✓ Financial ratios moved to silver layer") + click.echo(f" Location: {silver_path}") + click.echo(f" Total records: {len(combined_df):,}") + click.echo(f" Total columns: {len(combined_df.columns)}") + click.echo(f" Partitioning: fiscal_year / fiscal_period") + click.echo("") + + # Record metadata for silver layer + try: + config = ConfigLoader() + metadata_root = config.get_metadata_path() + metadata_manager = MetadataManager(metadata_root) + + # Get date range from the combined data + if 'filing_date' in combined_df.columns: + min_date = str(combined_df['filing_date'].min()) + max_date = str(combined_df['filing_date'].max()) + else: + max_date = datetime.now().strftime('%Y-%m-%d') + min_date = max_date + + # Record metadata + metadata_manager.record_ingestion( + data_type='financial_ratios', + date=max_date, + status='success', + statistics={ + 'records': len(combined_df), + 'tickers': combined_df['ticker'].n_unique(), + 'columns': len(combined_df.columns), + 'min_filing_date': min_date, + 'max_filing_date': max_date, + }, + layer='silver' + ) + + # Update watermark + metadata_manager.set_watermark( + data_type='financial_ratios', + date=max_date, + layer='silver' + ) + + click.echo("✓ Metadata recorded for silver layer") + + except Exception as e: + click.echo(f"Warning: Failed to record metadata: {e}", err=True) + + +@transform.command('corporate-actions') +@click.option('--bronze-dir', '-b', + type=click.Path(exists=True), + default=None, + help='Bronze layer corporate actions directory (default: $QUANTLAKE_ROOT/bronze/corporate_actions)') +@click.option('--silver-dir', '-s', + type=click.Path(), + default=None, + help='Silver layer output directory (default: $QUANTLAKE_ROOT/silver/ticker_events)') +def corporate_actions(bronze_dir, silver_dir): + """Consolidate corporate actions (dividends, splits, IPOs) to silver layer.""" + + # Use environment variable defaults if not specified + quantlake_root = get_quantlake_root() + bronze_path = Path(bronze_dir) if bronze_dir else quantlake_root / 'bronze' / 'corporate_actions' + silver_path = Path(silver_dir) if silver_dir else quantlake_root / 'silver' / 'ticker_events' + + click.echo("="*80) + click.echo("PHASE 3: CORPORATE ACTIONS CONSOLIDATION") + click.echo("="*80) + click.echo(f"Bronze path: {bronze_path}") + click.echo(f"Silver path: {silver_path}") + click.echo("") + + # Process dividends + def process_dividends(): + 
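+        """Flatten bronze dividend files into the unified corporate-actions schema (split/IPO/ticker-event fields left null)."""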
click.echo("Processing DIVIDENDS...") + dividends_path = bronze_path / "dividends" + + if not dividends_path.exists(): + click.echo(f"Warning: Dividends path not found: {dividends_path}", err=True) + return None + + all_files = list(dividends_path.rglob("*.parquet")) + click.echo(f" Found {len(all_files):,} dividend files") + + dfs = [] + for file_path in all_files: + try: + df = pl.read_parquet(file_path) + dfs.append(df) + except Exception as e: + click.echo(f"Warning: Failed to read {file_path}: {e}", err=True) + continue + + if not dfs: + return None + + combined_df = pl.concat(dfs, how="vertical_relaxed") + + unified_df = combined_df.select([ + pl.col('ticker'), + pl.lit('dividend').alias('action_type'), + pl.col('ex_dividend_date').str.to_date().alias('event_date'), + pl.col('id'), + pl.col('downloaded_at'), + pl.col('cash_amount').alias('div_cash_amount'), + pl.col('currency').alias('div_currency'), + pl.col('declaration_date').str.to_date().alias('div_declaration_date'), + pl.col('dividend_type').alias('div_dividend_type'), + pl.col('ex_dividend_date').str.to_date().alias('div_ex_dividend_date'), + pl.col('frequency').alias('div_frequency'), + pl.col('pay_date').str.to_date().alias('div_pay_date'), + pl.col('record_date').str.to_date().alias('div_record_date'), + pl.lit(None).cast(pl.Date).alias('split_execution_date'), + pl.lit(None).cast(pl.Float64).alias('split_from'), + pl.lit(None).cast(pl.Float64).alias('split_to'), + pl.lit(None).cast(pl.Float64).alias('split_ratio'), + pl.lit(None).cast(pl.Date).alias('ipo_last_updated'), + pl.lit(None).cast(pl.Date).alias('ipo_announced_date'), + pl.lit(None).cast(pl.Date).alias('ipo_listing_date'), + pl.lit(None).cast(pl.String).alias('ipo_issuer_name'), + pl.lit(None).cast(pl.String).alias('ipo_currency_code'), + pl.lit(None).cast(pl.String).alias('ipo_us_code'), + pl.lit(None).cast(pl.String).alias('ipo_isin'), + pl.lit(None).cast(pl.Float64).alias('ipo_final_issue_price'), + pl.lit(None).cast(pl.Int64).alias('ipo_max_shares_offered'), + pl.lit(None).cast(pl.Float64).alias('ipo_lowest_offer_price'), + pl.lit(None).cast(pl.Float64).alias('ipo_highest_offer_price'), + pl.lit(None).cast(pl.Float64).alias('ipo_total_offer_size'), + pl.lit(None).cast(pl.String).alias('ipo_primary_exchange'), + pl.lit(None).cast(pl.Int64).alias('ipo_shares_outstanding'), + pl.lit(None).cast(pl.String).alias('ipo_security_type'), + pl.lit(None).cast(pl.Int64).alias('ipo_lot_size'), + pl.lit(None).cast(pl.String).alias('ipo_security_description'), + pl.lit(None).cast(pl.String).alias('ipo_status'), + # Ticker event specific fields (null for dividends) + pl.lit(None).cast(pl.String).alias('new_ticker'), + pl.lit(None).cast(pl.String).alias('event_type'), + ]) + + click.echo(f" Processed {len(unified_df):,} dividend records") + return unified_df + + # Process splits + def process_splits(): + click.echo("Processing SPLITS...") + splits_path = bronze_path / "splits" + + if not splits_path.exists(): + click.echo(f"Warning: Splits path not found: {splits_path}", err=True) + return None + + all_files = list(splits_path.rglob("*.parquet")) + click.echo(f" Found {len(all_files):,} split files") + + dfs = [] + for file_path in all_files: + try: + df = pl.read_parquet(file_path) + dfs.append(df) + except Exception as e: + click.echo(f"Warning: Failed to read {file_path}: {e}", err=True) + continue + + if not dfs: + return None + + combined_df = pl.concat(dfs, how="vertical_relaxed") + + unified_df = combined_df.select([ + pl.col('ticker'), + 
pl.lit('split').alias('action_type'), + pl.col('execution_date').str.to_date().alias('event_date'), + pl.col('id'), + pl.col('downloaded_at'), + pl.lit(None).cast(pl.Float64).alias('div_cash_amount'), + pl.lit(None).cast(pl.String).alias('div_currency'), + pl.lit(None).cast(pl.Date).alias('div_declaration_date'), + pl.lit(None).cast(pl.String).alias('div_dividend_type'), + pl.lit(None).cast(pl.Date).alias('div_ex_dividend_date'), + pl.lit(None).cast(pl.Int64).alias('div_frequency'), + pl.lit(None).cast(pl.Date).alias('div_pay_date'), + pl.lit(None).cast(pl.Date).alias('div_record_date'), + pl.col('execution_date').str.to_date().alias('split_execution_date'), + pl.col('split_from').alias('split_from'), + pl.col('split_to').alias('split_to'), + (pl.col('split_to') / pl.col('split_from')).alias('split_ratio'), + pl.lit(None).cast(pl.Date).alias('ipo_last_updated'), + pl.lit(None).cast(pl.Date).alias('ipo_announced_date'), + pl.lit(None).cast(pl.Date).alias('ipo_listing_date'), + pl.lit(None).cast(pl.String).alias('ipo_issuer_name'), + pl.lit(None).cast(pl.String).alias('ipo_currency_code'), + pl.lit(None).cast(pl.String).alias('ipo_us_code'), + pl.lit(None).cast(pl.String).alias('ipo_isin'), + pl.lit(None).cast(pl.Float64).alias('ipo_final_issue_price'), + pl.lit(None).cast(pl.Int64).alias('ipo_max_shares_offered'), + pl.lit(None).cast(pl.Float64).alias('ipo_lowest_offer_price'), + pl.lit(None).cast(pl.Float64).alias('ipo_highest_offer_price'), + pl.lit(None).cast(pl.Float64).alias('ipo_total_offer_size'), + pl.lit(None).cast(pl.String).alias('ipo_primary_exchange'), + pl.lit(None).cast(pl.Int64).alias('ipo_shares_outstanding'), + pl.lit(None).cast(pl.String).alias('ipo_security_type'), + pl.lit(None).cast(pl.Int64).alias('ipo_lot_size'), + pl.lit(None).cast(pl.String).alias('ipo_security_description'), + pl.lit(None).cast(pl.String).alias('ipo_status'), + # Ticker event specific fields (null for splits) + pl.lit(None).cast(pl.String).alias('new_ticker'), + pl.lit(None).cast(pl.String).alias('event_type'), + ]) + + click.echo(f" Processed {len(unified_df):,} split records") + return unified_df + + # Process IPOs + def process_ipos(): + click.echo("Processing IPOS...") + ipos_path = bronze_path / "ipos" + + if not ipos_path.exists(): + click.echo(f"Warning: IPOs path not found: {ipos_path}", err=True) + return None + + all_files = list(ipos_path.rglob("*.parquet")) + click.echo(f" Found {len(all_files):,} IPO files") + + dfs = [] + for file_path in all_files: + try: + df = pl.read_parquet(file_path) + dfs.append(df) + except Exception as e: + click.echo(f"Warning: Failed to read {file_path}: {e}", err=True) + continue + + if not dfs: + return None + + combined_df = pl.concat(dfs, how="vertical_relaxed") + + # Generate ID if not present + if 'id' not in combined_df.columns: + combined_df = combined_df.with_columns( + (pl.col('ticker') + '_' + pl.col('listing_date')).alias('id') + ) + + unified_df = combined_df.select([ + pl.col('ticker'), + pl.lit('ipo').alias('action_type'), + pl.col('listing_date').str.to_date().alias('event_date'), + pl.col('id'), + pl.col('downloaded_at'), + pl.lit(None).cast(pl.Float64).alias('div_cash_amount'), + pl.lit(None).cast(pl.String).alias('div_currency'), + pl.lit(None).cast(pl.Date).alias('div_declaration_date'), + pl.lit(None).cast(pl.String).alias('div_dividend_type'), + pl.lit(None).cast(pl.Date).alias('div_ex_dividend_date'), + pl.lit(None).cast(pl.Int64).alias('div_frequency'), + pl.lit(None).cast(pl.Date).alias('div_pay_date'), + 
pl.lit(None).cast(pl.Date).alias('div_record_date'), + pl.lit(None).cast(pl.Date).alias('split_execution_date'), + pl.lit(None).cast(pl.Float64).alias('split_from'), + pl.lit(None).cast(pl.Float64).alias('split_to'), + pl.lit(None).cast(pl.Float64).alias('split_ratio'), + pl.col('last_updated').str.to_date().alias('ipo_last_updated'), + pl.col('announced_date').str.to_date().alias('ipo_announced_date'), + pl.col('listing_date').str.to_date().alias('ipo_listing_date'), + pl.col('issuer_name').alias('ipo_issuer_name'), + pl.col('currency_code').alias('ipo_currency_code'), + pl.col('us_code').alias('ipo_us_code'), + pl.col('isin').alias('ipo_isin'), + pl.col('final_issue_price').alias('ipo_final_issue_price'), + pl.col('max_shares_offered').alias('ipo_max_shares_offered'), + pl.col('lowest_offer_price').alias('ipo_lowest_offer_price'), + pl.col('highest_offer_price').alias('ipo_highest_offer_price'), + pl.col('total_offer_size').alias('ipo_total_offer_size'), + pl.col('primary_exchange').alias('ipo_primary_exchange'), + pl.col('shares_outstanding').alias('ipo_shares_outstanding'), + pl.col('security_type').alias('ipo_security_type'), + pl.col('lot_size').alias('ipo_lot_size'), + pl.col('security_description').alias('ipo_security_description'), + pl.col('ipo_status').alias('ipo_status'), + # Ticker event specific fields (null for IPOs) + pl.lit(None).cast(pl.String).alias('new_ticker'), + pl.lit(None).cast(pl.String).alias('event_type'), + ]) + + click.echo(f" Processed {len(unified_df):,} IPO records") + return unified_df + + # Process ticker events (symbol changes) + def process_ticker_events(): + click.echo("Processing TICKER EVENTS...") + ticker_events_path = bronze_path / "ticker_events" + + if not ticker_events_path.exists(): + click.echo(f"Warning: Ticker events path not found: {ticker_events_path}", err=True) + return None + + all_files = list(ticker_events_path.rglob("*.parquet")) + click.echo(f" Found {len(all_files):,} ticker event files") + + if not all_files: + return None + + dfs = [] + for file_path in all_files: + try: + df = pl.read_parquet(file_path) + dfs.append(df) + except Exception as e: + click.echo(f"Warning: Failed to read {file_path}: {e}", err=True) + continue + + if not dfs: + return None + + combined_df = pl.concat(dfs, how="vertical_relaxed") + + # Generate ID if not present + if 'id' not in combined_df.columns: + combined_df = combined_df.with_columns( + (pl.col('ticker') + '_' + pl.col('date')).alias('id') + ) + + # Create unified schema matching other action types + unified_df = combined_df.select([ + pl.col('ticker'), + pl.lit('ticker_change').alias('action_type'), + pl.col('date').str.to_date().alias('event_date'), + pl.col('id'), + pl.col('downloaded_at'), + pl.lit(None).cast(pl.Float64).alias('div_cash_amount'), + pl.lit(None).cast(pl.String).alias('div_currency'), + pl.lit(None).cast(pl.Date).alias('div_declaration_date'), + pl.lit(None).cast(pl.String).alias('div_dividend_type'), + pl.lit(None).cast(pl.Date).alias('div_ex_dividend_date'), + pl.lit(None).cast(pl.Int64).alias('div_frequency'), + pl.lit(None).cast(pl.Date).alias('div_pay_date'), + pl.lit(None).cast(pl.Date).alias('div_record_date'), + pl.lit(None).cast(pl.Date).alias('split_execution_date'), + pl.lit(None).cast(pl.Float64).alias('split_from'), + pl.lit(None).cast(pl.Float64).alias('split_to'), + pl.lit(None).cast(pl.Float64).alias('split_ratio'), + pl.lit(None).cast(pl.Date).alias('ipo_last_updated'), + pl.lit(None).cast(pl.Date).alias('ipo_announced_date'), + 
+            pl.lit(None).cast(pl.Date).alias('ipo_listing_date'),
+            pl.lit(None).cast(pl.String).alias('ipo_issuer_name'),
+            pl.lit(None).cast(pl.String).alias('ipo_currency_code'),
+            pl.lit(None).cast(pl.String).alias('ipo_us_code'),
+            pl.lit(None).cast(pl.String).alias('ipo_isin'),
+            pl.lit(None).cast(pl.Float64).alias('ipo_final_issue_price'),
+            pl.lit(None).cast(pl.Int64).alias('ipo_max_shares_offered'),
+            pl.lit(None).cast(pl.Float64).alias('ipo_lowest_offer_price'),
+            pl.lit(None).cast(pl.Float64).alias('ipo_highest_offer_price'),
+            pl.lit(None).cast(pl.Float64).alias('ipo_total_offer_size'),
+            pl.lit(None).cast(pl.String).alias('ipo_primary_exchange'),
+            pl.lit(None).cast(pl.Int64).alias('ipo_shares_outstanding'),
+            pl.lit(None).cast(pl.String).alias('ipo_security_type'),
+            pl.lit(None).cast(pl.Int64).alias('ipo_lot_size'),
+            pl.lit(None).cast(pl.String).alias('ipo_security_description'),
+            pl.lit(None).cast(pl.String).alias('ipo_status'),
+            # Ticker event specific fields
+            pl.col('new_ticker') if 'new_ticker' in combined_df.columns else pl.lit(None).cast(pl.String).alias('new_ticker'),
+            pl.col('event_type') if 'event_type' in combined_df.columns else pl.lit(None).cast(pl.String).alias('event_type'),
+        ])
+
+        click.echo(f"  Processed {len(unified_df):,} ticker event records")
+        return unified_df
+
+    # Process each corporate action type
+    dividends_df = process_dividends()
+    splits_df = process_splits()
+    ipos_df = process_ipos()
+    ticker_events_df = process_ticker_events()
+
+    # Combine all corporate actions
+    click.echo("")
+    click.echo("Combining all corporate actions...")
+
+    all_dfs = []
+    if dividends_df is not None:
+        all_dfs.append(dividends_df)
+    if splits_df is not None:
+        all_dfs.append(splits_df)
+    if ipos_df is not None:
+        all_dfs.append(ipos_df)
+    if ticker_events_df is not None:
+        all_dfs.append(ticker_events_df)
+
+    if not all_dfs:
+        click.echo("Error: No corporate actions found!", err=True)
+        return
+
+    combined_df = pl.concat(all_dfs, how="vertical_relaxed")
+
+    # Add metadata columns
+    combined_df = combined_df.with_columns([
+        pl.lit(datetime.now()).alias('processed_at'),
+        pl.col('event_date').dt.year().alias('year'),
+        pl.col('event_date').dt.month().alias('month'),
+    ])
+
+    # Summary statistics
+    click.echo(f"Total records: {len(combined_df):,}")
+    click.echo(f"Total columns: {len(combined_df.columns)}")
+    click.echo("")
+    click.echo("Records by action type:")
+    for action_type, count in combined_df.group_by('action_type').agg(pl.len()).iter_rows():
+        click.echo(f"  {action_type}: {count:,}")
+
+    click.echo("")
+    click.echo(f"Unique tickers: {combined_df['ticker'].n_unique()}")
+    click.echo(f"Date range: {combined_df['event_date'].min()} to {combined_df['event_date'].max()}")
+
+    # Save to silver layer partitioned by year and month
+    click.echo("")
+    click.echo("Saving to silver layer...")
+    silver_path.mkdir(parents=True, exist_ok=True)
+
+    for (year, month), group_df in combined_df.group_by(['year', 'month']):
+        partition_dir = silver_path / f"year={year}" / f"month={month:02d}"
+        partition_dir.mkdir(parents=True, exist_ok=True)
+
+        output_file = partition_dir / "data.parquet"
+        group_df.write_parquet(
+            output_file,
+            compression='zstd',
+            compression_level=3,
+            use_pyarrow=True,
+            pyarrow_options={'use_dictionary': False}  # Disable dictionary encoding to prevent schema conflicts
+        )
+
+        click.echo(f"  Saved: year={year}, month={month:02d} ({len(group_df):,} records)")
+
+    click.echo("")
+    click.echo("✓ Corporate actions consolidated to silver layer")
+    click.echo(f"  Location: {silver_path}")
click.echo(f" Total records: {len(combined_df):,}") + click.echo(f" Total columns: {len(combined_df.columns)}") + click.echo(f" Partitioning: year / month") + click.echo(f" Action types: {', '.join(combined_df['action_type'].unique().sort())}") + click.echo("") + + +@transform.command('fundamentals') +@click.option('--bronze-dir', '-b', + type=click.Path(exists=True), + default=None, + help='Bronze layer fundamentals directory (default: $QUANTLAKE_ROOT/fundamentals)') +@click.option('--silver-dir', '-s', + type=click.Path(), + default=None, + help='Silver layer output directory (default: $QUANTLAKE_ROOT/silver/fundamentals_wide)') +@click.option('--tickers', '-t', + multiple=True, + help='Tickers to process (if not specified, processes all)') +def fundamentals(bronze_dir, silver_dir, tickers): + """Flatten fundamentals (balance sheets, income statements, cash flow) to wide format.""" + + # Use environment variable defaults if not specified + quantlake_root = get_quantlake_root() + bronze_path = Path(bronze_dir) if bronze_dir else quantlake_root / 'fundamentals' + silver_path = Path(silver_dir) if silver_dir else quantlake_root / 'silver' / 'fundamentals_wide' + + click.echo("="*80) + click.echo("FLATTENING FUNDAMENTALS TO SILVER LAYER") + click.echo("="*80) + click.echo(f"Bronze path: {bronze_path}") + click.echo(f"Silver path: {silver_path}") + click.echo("") + + # Find all tickers if not specified + if not tickers: + balance_sheet_dir = bronze_path / 'balance_sheets' + if balance_sheet_dir.exists(): + ticker_files = list(balance_sheet_dir.rglob("ticker=*.parquet")) + tickers = list(set([f.stem.replace('ticker=', '') for f in ticker_files])) + click.echo(f"Found {len(tickers)} tickers to process") + else: + click.echo("Error: Balance sheets directory not found!", err=True) + return + else: + click.echo(f"Processing {len(tickers)} specified tickers") + + # Process each ticker + all_wide_dfs = [] + + for ticker in tickers: + try: + # Load balance sheets + bs_files = list(bronze_path.glob(f'balance_sheets/**/ticker={ticker}.parquet')) + if not bs_files: + click.echo(f" Skipping {ticker}: No balance sheet data", err=True) + continue + + bs_df = pl.read_parquet(bs_files[0]) if len(bs_files) == 1 else pl.concat([pl.read_parquet(f) for f in bs_files]) + + # Extract ticker from tickers array (Polygon returns a list) + if 'tickers' in bs_df.columns: + bs_df = bs_df.with_columns( + pl.col('tickers').list.first().alias('ticker') + ).drop('tickers') + + # Load income statements + is_files = list(bronze_path.glob(f'income_statements/**/ticker={ticker}.parquet')) + is_df = pl.read_parquet(is_files[0]) if is_files and len(is_files) == 1 else (pl.concat([pl.read_parquet(f) for f in is_files]) if is_files else None) + + # Extract ticker from tickers array + if is_df is not None and 'tickers' in is_df.columns: + is_df = is_df.with_columns( + pl.col('tickers').list.first().alias('ticker') + ).drop('tickers') + + # Load cash flow + cf_files = list(bronze_path.glob(f'cash_flow/**/ticker={ticker}.parquet')) + cf_df = pl.read_parquet(cf_files[0]) if cf_files and len(cf_files) == 1 else (pl.concat([pl.read_parquet(f) for f in cf_files]) if cf_files else None) + + # Extract ticker from tickers array + if cf_df is not None and 'tickers' in cf_df.columns: + cf_df = cf_df.with_columns( + pl.col('tickers').list.first().alias('ticker') + ).drop('tickers') + + # Rename columns with prefixes + bs_df = bs_df.rename({col: f'bs_{col}' for col in bs_df.columns if col not in ['ticker', 'filing_date', 'fiscal_year', 
'fiscal_period', 'fiscal_quarter']}) + + if is_df is not None: + is_df = is_df.rename({col: f'is_{col}' for col in is_df.columns if col not in ['ticker', 'filing_date', 'fiscal_year', 'fiscal_period', 'fiscal_quarter']}) + + if cf_df is not None: + cf_df = cf_df.rename({col: f'cf_{col}' for col in cf_df.columns if col not in ['ticker', 'filing_date', 'fiscal_year', 'fiscal_period', 'fiscal_quarter']}) + + # Merge on common keys + wide_df = bs_df + + if is_df is not None: + wide_df = wide_df.join( + is_df, + on=['ticker', 'filing_date', 'fiscal_year', 'fiscal_period'], + how='outer_coalesce' + ) + + if cf_df is not None: + wide_df = wide_df.join( + cf_df, + on=['ticker', 'filing_date', 'fiscal_year', 'fiscal_period'], + how='outer_coalesce' + ) + + all_wide_dfs.append(wide_df) + click.echo(f" Processed {ticker}: {len(wide_df)} quarters, {len(wide_df.columns)} columns") + + except Exception as e: + click.echo(f" Error processing {ticker}: {e}", err=True) + continue + + if not all_wide_dfs: + click.echo("Error: No fundamentals data processed!", err=True) + return + + # Combine all tickers + click.echo("") + click.echo("Combining all tickers...") + combined_df = pl.concat(all_wide_dfs, how="diagonal_relaxed") + + # Add processed_at timestamp + combined_df = combined_df.with_columns( + pl.lit(datetime.now()).alias('processed_at') + ) + + click.echo(f"Total records: {len(combined_df):,}") + click.echo(f"Total columns: {len(combined_df.columns)}") + click.echo(f"Unique tickers: {combined_df['ticker'].n_unique()}") + + # Save to silver layer partitioned by fiscal_year and fiscal_period + click.echo("") + click.echo("Saving to silver layer...") + silver_path.mkdir(parents=True, exist_ok=True) + + for (year, quarter), group_df in combined_df.group_by(['fiscal_year', 'fiscal_period']): + partition_dir = silver_path / f"year={year}" / f"quarter={quarter}" + partition_dir.mkdir(parents=True, exist_ok=True) + + output_file = partition_dir / "data.parquet" + group_df.write_parquet( + output_file, + compression='zstd', + compression_level=3 + ) + + click.echo(f" Saved: year={year}, quarter={quarter} ({len(group_df):,} records)") + + click.echo("") + click.echo("✓ Fundamentals flattened to silver layer") + click.echo(f" Location: {silver_path}") + click.echo(f" Total records: {len(combined_df):,}") + click.echo(f" Total columns: {len(combined_df.columns)}") + click.echo(f" Partitioning: fiscal_year / fiscal_period") + click.echo("") + + # Record metadata for silver layer + try: + config = ConfigLoader() + metadata_root = config.get_metadata_path() + metadata_manager = MetadataManager(metadata_root) + + # Get date range from the combined data + if 'filing_date' in combined_df.columns: + min_date = str(combined_df['filing_date'].min()) + max_date = str(combined_df['filing_date'].max()) + else: + max_date = datetime.now().strftime('%Y-%m-%d') + min_date = max_date + + # Record metadata + metadata_manager.record_ingestion( + data_type='fundamentals', + date=max_date, + status='success', + statistics={ + 'records': len(combined_df), + 'tickers': combined_df['ticker'].n_unique(), + 'columns': len(combined_df.columns), + 'min_filing_date': min_date, + 'max_filing_date': max_date, + }, + layer='silver' + ) + + # Update watermark + metadata_manager.set_watermark( + data_type='fundamentals', + date=max_date, + layer='silver' + ) + + click.echo("✓ Metadata recorded for silver layer") + + except Exception as e: + click.echo(f"Warning: Failed to record metadata: {e}", err=True) diff --git 
a/src/download/corporate_actions.py b/src/download/corporate_actions.py index c3804e0..faa1504 100644 --- a/src/download/corporate_actions.py +++ b/src/download/corporate_actions.py @@ -63,7 +63,7 @@ def _save_partitioned( Args: df: DataFrame to save - data_type: Type of data (dividends, splits, etc.) + data_type: Type of data (dividends, splits, ipos, ticker_events) date_column: Column name for date partitioning """ if len(df) == 0: @@ -113,7 +113,7 @@ def _save_partitioned( (pl.col('ticker') == ticker) ).drop(['year', 'month']) - # Create partition directory: year=2024/month=10/ticker=AAPL.parquet + # Create partition directory: {data_type}/year=2024/month=10/ticker=AAPL.parquet partition_dir = self.output_dir / data_type / f'year={year}' / f'month={month:02d}' partition_dir.mkdir(parents=True, exist_ok=True) @@ -124,7 +124,12 @@ def _save_partitioned( existing_df = pl.read_parquet(output_file) partition_df = pl.concat([existing_df, partition_df], how="diagonal") - partition_df.write_parquet(str(output_file), compression='zstd') + partition_df.write_parquet( + str(output_file), + compression='zstd', + use_pyarrow=True, + pyarrow_options={'use_dictionary': False} # Disable dictionary encoding to prevent schema conflicts + ) logger.info(f"Saved {len(partition_df)} records to {output_file}") async def download_dividends( diff --git a/src/storage/metadata_manager.py b/src/storage/metadata_manager.py index d555e0f..59c8d29 100755 --- a/src/storage/metadata_manager.py +++ b/src/storage/metadata_manager.py @@ -57,7 +57,8 @@ def record_ingestion( status: str, statistics: Dict[str, Any], symbol: Optional[str] = None, - error: Optional[str] = None + error: Optional[str] = None, + layer: str = 'bronze' ): """ Record ingestion result @@ -69,6 +70,7 @@ def record_ingestion( statistics: Ingestion statistics symbol: Optional symbol (for minute data) error: Optional error message + layer: Medallion layer ('landing', 'bronze', 'silver', 'gold') """ try: # Build metadata record @@ -77,19 +79,20 @@ def record_ingestion( 'date': date, 'symbol': symbol, 'status': status, + 'layer': layer, 'timestamp': datetime.now().isoformat(), 'statistics': statistics, 'error': error, } # Save to file - metadata_file = self._get_metadata_file(data_type, date, symbol) + metadata_file = self._get_metadata_file(data_type, date, symbol, layer) metadata_file.parent.mkdir(parents=True, exist_ok=True) with open(metadata_file, 'w') as f: json.dump(record, f, indent=2) - logger.debug(f"Recorded ingestion: {data_type} / {date} / {status}") + logger.debug(f"Recorded ingestion: {layer}/{data_type} / {date} / {status}") except Exception as e: raise MetadataManagerError(f"Failed to record ingestion: {e}") @@ -131,7 +134,8 @@ def list_ingestions( data_type: str, start_date: Optional[str] = None, end_date: Optional[str] = None, - status: Optional[str] = None + status: Optional[str] = None, + layer: Optional[str] = None ) -> List[Dict[str, Any]]: """ List ingestion records with optional filtering @@ -141,6 +145,7 @@ def list_ingestions( start_date: Optional start date filter end_date: Optional end date filter status: Optional status filter + layer: Optional layer filter ('landing', 'bronze', 'silver', 'gold') Returns: List of metadata records @@ -148,28 +153,52 @@ def list_ingestions( try: records = [] - metadata_dir = self.metadata_root / data_type - if not metadata_dir.exists(): - return records + # Determine which directories to search + if layer: + search_dirs = [self.metadata_root / layer / data_type] + else: + # Search all layers for 
backward compatibility + search_dirs = [] + for layer_name in ['landing', 'bronze', 'silver', 'gold']: + layer_dir = self.metadata_root / layer_name / data_type + if layer_dir.exists(): + search_dirs.append(layer_dir) + + # Also check old flat structure for backward compatibility + old_dir = self.metadata_root / data_type + if old_dir.exists(): + search_dirs.append(old_dir) + + # Find all metadata files (exclude watermark files) + for metadata_dir in search_dirs: + if not metadata_dir.exists(): + continue + + for metadata_file in metadata_dir.rglob('*.json'): + # Skip watermark files + if 'watermark' in metadata_file.name: + continue - # Find all metadata files - for metadata_file in metadata_dir.rglob('*.json'): - try: - with open(metadata_file, 'r') as f: - record = json.load(f) + try: + with open(metadata_file, 'r') as f: + record = json.load(f) - # Apply filters - if start_date and record['date'] < start_date: - continue - if end_date and record['date'] > end_date: - continue - if status and record['status'] != status: - continue + # Skip if missing required fields (e.g., watermark files) + if 'status' not in record or 'date' not in record: + continue + + # Apply filters + if start_date and record['date'] < start_date: + continue + if end_date and record['date'] > end_date: + continue + if status and record['status'] != status: + continue - records.append(record) + records.append(record) - except Exception as e: - logger.warning(f"Failed to read {metadata_file}: {e}") + except Exception as e: + logger.warning(f"Failed to read {metadata_file}: {e}") # Sort by date records.sort(key=lambda r: (r['date'], r.get('symbol', ''))) @@ -182,7 +211,8 @@ def list_ingestions( def get_watermark( self, data_type: str, - symbol: Optional[str] = None + symbol: Optional[str] = None, + layer: str = 'bronze' ) -> Optional[str]: """ Get watermark (latest successfully ingested date) for incremental processing @@ -190,12 +220,13 @@ def get_watermark( Args: data_type: Data type symbol: Optional symbol + layer: Medallion layer Returns: Latest date string or None """ try: - records = self.list_ingestions(data_type, status='success') + records = self.list_ingestions(data_type, status='success', layer=layer) if symbol: records = [r for r in records if r.get('symbol') == symbol] @@ -215,7 +246,8 @@ def set_watermark( self, data_type: str, date: str, - symbol: Optional[str] = None + symbol: Optional[str] = None, + layer: str = 'bronze' ): """ Set watermark for incremental processing @@ -223,16 +255,18 @@ def set_watermark( Args: data_type: Data type date: Date string + layer: Medallion layer symbol: Optional symbol """ try: - watermark_file = self._get_watermark_file(data_type, symbol) + watermark_file = self._get_watermark_file(data_type, symbol, layer) watermark_file.parent.mkdir(parents=True, exist_ok=True) watermark = { 'data_type': data_type, 'symbol': symbol, 'date': date, + 'layer': layer, 'timestamp': datetime.now().isoformat(), } @@ -286,7 +320,8 @@ def get_statistics_summary( self, data_type: str, start_date: Optional[str] = None, - end_date: Optional[str] = None + end_date: Optional[str] = None, + layer: Optional[str] = None ) -> Dict[str, Any]: """ Get aggregated statistics for ingestion jobs @@ -295,12 +330,13 @@ def get_statistics_summary( data_type: Data type start_date: Optional start date end_date: Optional end date + layer: Optional layer filter Returns: Summary statistics """ try: - records = self.list_ingestions(data_type, start_date, end_date) + records = self.list_ingestions(data_type, 
start_date, end_date, layer=layer) if not records: return { @@ -317,9 +353,15 @@ def get_statistics_summary( failed = sum(1 for r in records if r['status'] == 'failed') skipped = sum(1 for r in records if r['status'] == 'skipped') + # Count skipped as successful for success rate + successful_count = success + skipped + # Sum records processed + # Handle different field names: 'records', 'symbols_converted', 'records_enriched' total_records = sum( - r['statistics'].get('records', 0) + r['statistics'].get('records', + r['statistics'].get('symbols_converted', + r['statistics'].get('records_enriched', 0))) for r in records if r['status'] == 'success' ) @@ -341,7 +383,7 @@ def get_statistics_summary( 'success': success, 'failed': failed, 'skipped': skipped, - 'success_rate': success / total_jobs if total_jobs > 0 else 0, + 'success_rate': successful_count / total_jobs if total_jobs > 0 else 0, 'total_records': total_records, 'total_size_mb': total_size_mb, } @@ -377,7 +419,8 @@ def _get_metadata_file( self, data_type: str, date: str, - symbol: Optional[str] = None + symbol: Optional[str] = None, + layer: str = 'bronze' ) -> Path: """ Get metadata file path @@ -386,11 +429,12 @@ def _get_metadata_file( data_type: Data type date: Date string symbol: Optional symbol + layer: Medallion layer Returns: Path to metadata file """ - path = self.metadata_root / data_type / date[:4] / date[5:7] + path = self.metadata_root / layer / data_type / date[:4] / date[5:7] if symbol: path = path / f"{date}_{symbol}.json" @@ -402,7 +446,8 @@ def _get_metadata_file( def _get_watermark_file( self, data_type: str, - symbol: Optional[str] = None + symbol: Optional[str] = None, + layer: str = 'bronze' ) -> Path: """ Get watermark file path @@ -410,11 +455,12 @@ def _get_watermark_file( Args: data_type: Data type symbol: Optional symbol + layer: Medallion layer Returns: Path to watermark file """ - path = self.metadata_root / data_type + path = self.metadata_root / layer / data_type if symbol: path = path / f"watermark_{symbol}.json" @@ -536,22 +582,49 @@ def main(): print("✅ MetadataManager initialized") print(f" Root: {metadata_root}") - # List statistics for all data types - for data_type in ['stocks_daily', 'stocks_minute', 'options_daily', 'options_minute']: - stats = manager.get_statistics_summary(data_type) - - if stats['total_jobs'] > 0: - print(f"\n📊 {data_type}:") - print(f" Total jobs: {stats['total_jobs']}") - print(f" Success: {stats['success']} ({stats['success_rate']:.1%})") - print(f" Failed: {stats['failed']}") - print(f" Records: {stats['total_records']:,}") - print(f" Size: {stats['total_size_mb']:.1f} MB") - - # Get watermark - watermark = manager.get_watermark(data_type) - if watermark: - print(f" Watermark: {watermark}") + # List statistics for all data types organized by layer + layers = ['landing', 'bronze', 'silver', 'gold'] + data_types_by_layer = { + 'bronze': ['stocks_daily', 'stocks_minute', 'options_daily', 'options_minute', + 'fundamentals', 'corporate_actions', 'news', 'short_data'], + 'silver': ['stocks_daily', 'stocks_minute', 'options_daily', 'options_minute', + 'fundamentals', 'corporate_actions', 'financial_ratios'], + 'gold': ['stocks_daily_qlib'] + } + + for layer in layers: + layer_has_data = False + layer_output = [] + + # Get data types for this layer + data_types = data_types_by_layer.get(layer, []) + + for data_type in data_types: + stats = manager.get_statistics_summary(data_type, layer=layer) + + if stats['total_jobs'] > 0: + if not layer_has_data: + 
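+                    # Print the layer banner only once, before the first data type that has jobs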
layer_output.append(f"\n{'='*80}") + layer_output.append(f"📦 {layer.upper()} LAYER") + layer_output.append('='*80) + layer_has_data = True + + layer_output.append(f"\n📊 {data_type}:") + layer_output.append(f" Total jobs: {stats['total_jobs']}") + layer_output.append(f" Success: {stats['success']}, Skipped: {stats['skipped']}, Failed: {stats['failed']}") + layer_output.append(f" Success rate: {stats['success_rate']:.1%}") + layer_output.append(f" Records: {stats['total_records']:,}") + layer_output.append(f" Size: {stats['total_size_mb']:.1f} MB") + + # Get watermark + watermark = manager.get_watermark(data_type, layer=layer) + if watermark: + layer_output.append(f" Watermark: {watermark}") + + # Print layer output if it has data + if layer_has_data: + for line in layer_output: + print(line) except Exception as e: print(f"❌ Error: {e}")