A small end-to-end ETL-style data pipeline demonstrating real-world data engineering:
- Extract stock data from the Alpha Vantage API
- Transform with Pandas (cleaning, validation, typing)
- Load locally into CSV + Parquet
- Optionally upload to AWS S3
- Process with PySpark (local Databricks-style ETL)
- Run tests via GitHub Actions CI
This project demonstrates how Python, CI, AWS, and Spark can work together in a small, clear, end-to-end data pipeline.
git clone https://github.com/Annette3125/api-collector.git
cd api-collector
python -m venv venv
source venv/bin/activate
pip install -r requirements.txt
cp .env.sample .env
python -m api_collector.get_data

Expected: creates data/new/stock_data_latest.csv and timestamped CSV/Parquet snapshots.
Pipeline: Alpha Vantage → get_data.py → CSV/Parquet → (optional) S3 → (optional) Spark summary → tests/CI.
Output contract: CSV columns are always: date, open, high, low, close, volume, symbol.
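The column contract above can be checked with a small pandas snippet (a sketch; `check_contract` is a hypothetical helper, not part of the package):

```python
import pandas as pd

# The documented output contract.
EXPECTED_COLUMNS = ["date", "open", "high", "low", "close", "volume", "symbol"]

def check_contract(csv_path) -> pd.DataFrame:
    """Load a pipeline CSV and verify it matches the output contract."""
    df = pd.read_csv(csv_path, parse_dates=["date"])
    missing = set(EXPECTED_COLUMNS) - set(df.columns)
    if missing:
        raise ValueError(f"CSV is missing contract columns: {sorted(missing)}")
    # Reorder to the canonical column order.
    return df[EXPECTED_COLUMNS]
```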
- Pulls TIME_SERIES_DAILY stock data via Alpha Vantage API
- Supports multiple symbols (configurable in .env)
- Handles rate limits and HTTP errors
- Renames and normalizes columns
- Converts datatypes (numeric, datetime)
- Drops invalid rows (negative or missing prices)
- Sorts by date, removes duplicates
- Saves fresh snapshot as:
- data/new/stock_data_latest.csv
- timestamped history files
- Parquet outputs for further analytics
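The transform steps listed above amount to a few lines of pandas. A minimal sketch (column names follow the output contract; the actual logic in get_data.py may differ):

```python
import pandas as pd

PRICE_COLS = ["open", "high", "low", "close"]

def clean_stock_frame(df: pd.DataFrame, symbol: str) -> pd.DataFrame:
    """Normalize a raw Alpha Vantage daily frame into the output contract."""
    # Alpha Vantage keys look like "1. open"; strip the numeric prefixes.
    df = df.rename(columns=lambda c: c.split(". ")[-1].strip().lower())
    df["date"] = pd.to_datetime(df["date"], errors="coerce")
    for col in PRICE_COLS + ["volume"]:
        df[col] = pd.to_numeric(df[col], errors="coerce")
    # Drop rows with missing dates/prices or non-positive prices.
    df = df.dropna(subset=["date"] + PRICE_COLS)
    df = df[(df[PRICE_COLS] > 0).all(axis=1)]
    # Sort by date and remove duplicate days.
    df = df.sort_values("date").drop_duplicates(subset=["date"]).reset_index(drop=True)
    df["symbol"] = symbol
    return df[["date"] + PRICE_COLS + ["volume", "symbol"]]
```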
Example CSV schema:
date,open,high,low,close,volume,symbol

Requirements
- Python 3.11+
- Git
Environment variables
Create .env from sample:
cp .env.sample .env

ALPHA_VANTAGE_API_KEY=your-api-key
SYMBOLS=AAPL,GOOGL,MSFT
DATA_DIR=data/new
RATE_LIMIT_SLEEP=15

Get a free API key from Alpha Vantage: https://www.alphavantage.co/support/#api-key
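Under the hood, the extract step roughly amounts to the following (a sketch using only the standard library; the endpoint and JSON keys follow the Alpha Vantage docs, but the function names here are illustrative, not the package's actual API):

```python
import json
import time
import urllib.parse
import urllib.request

BASE_URL = "https://www.alphavantage.co/query"

def build_daily_url(symbol: str, api_key: str) -> str:
    """Build the TIME_SERIES_DAILY request URL for one symbol."""
    params = {"function": "TIME_SERIES_DAILY", "symbol": symbol, "apikey": api_key}
    return f"{BASE_URL}?{urllib.parse.urlencode(params)}"

def parse_daily(payload: dict, symbol: str) -> list[dict]:
    """Flatten the 'Time Series (Daily)' mapping into one record per day."""
    series = payload.get("Time Series (Daily)", {})
    return [
        # Keys arrive as "1. open", "2. high", ...; keep only the field name.
        {"date": day, "symbol": symbol, **{k.split(". ")[1]: v for k, v in values.items()}}
        for day, values in series.items()
    ]

def fetch_daily(symbol: str, api_key: str, rate_limit_sleep: float = 15) -> list[dict]:
    """Fetch one symbol, then sleep to respect the free-tier rate limit."""
    with urllib.request.urlopen(build_daily_url(symbol, api_key)) as resp:
        payload = json.load(resp)
    time.sleep(rate_limit_sleep)  # mirrors RATE_LIMIT_SLEEP from .env
    return parse_daily(payload, symbol)
```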
Extract + Transform + Load
python -m api_collector.get_data

Sample output files:
- data/new/stock_data_latest.csv
- data/new/stock_data_<timestamp>.csv
- data/new/stock_data_<timestamp>.parquet
Timestamp format: YYYYMMDD_HHMMSS (UTC).
Daily scheduler (optional)
python -m api_collector.scheduler

S3 upload (optional)
To use S3 upload, configure AWS credentials locally (or use an IAM role) and set S3_BUCKET_NAME in .env.
- Configure AWS credentials locally:
aws configure
- Set S3 bucket in .env:
S3_BUCKET_NAME=your-bucket-name
S3_KEY=raw/stock_data_latest.csv
- Upload latest CSV:
python -m api_collector.upload_to_s3

Example output:
s3://<your-bucket-name>/raw/stock_data_latest.csv

Spark ETL (optional)
macOS prerequisites:
export JAVA_HOME="$(/usr/libexec/java_home -v 17)"
export SPARK_LOCAL_IP=127.0.0.1

Run Spark ETL:
python -m api_collector.databricks_etl
or:
./scripts/run_spark.sh

Output: data/processed/stock_summary.parquet
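The exact aggregation in databricks_etl isn't shown here; a plausible per-symbol summary, sketched in pandas for illustration (the real job runs the equivalent on Spark):

```python
import pandas as pd

def summarize(df: pd.DataFrame) -> pd.DataFrame:
    """Per-symbol summary (hypothetical: the real Spark job may aggregate differently)."""
    return (
        df.groupby("symbol", as_index=False)
          .agg(first_date=("date", "min"),
               last_date=("date", "max"),
               avg_close=("close", "mean"),
               total_volume=("volume", "sum"))
    )
```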
Tests
pytest -q

CI runs automatically on every GitHub push.
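A minimal test in the same spirit as the suite (a hypothetical example; the real tests live in tests/):

```python
import tempfile
from pathlib import Path
import pandas as pd

EXPECTED_COLUMNS = ["date", "open", "high", "low", "close", "volume", "symbol"]

def test_output_contract():
    """Any CSV the pipeline writes must match the documented column contract."""
    with tempfile.TemporaryDirectory() as tmp:
        path = Path(tmp) / "stock_data_latest.csv"
        pd.DataFrame([{c: 1 for c in EXPECTED_COLUMNS}]).to_csv(path, index=False)
        df = pd.read_csv(path)
        assert list(df.columns) == EXPECTED_COLUMNS
```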
Future work
- Upload processed Parquet to S3
- Orchestrator (Airflow/Prefect)
- Streamlit dashboard
- Small FastAPI service for querying results
