This project demonstrates an event-driven real-time data pipeline that ingests, enriches, and stores IP geolocation data using:
- FastAPI (API for real-time IP ingestion)
- Redis (Queue for decoupled processing)
- Apache Airflow (Workflow orchestration)
- PostgreSQL (Data storage)
- Docker Compose (Containerisation)
- IPStack API (IP enrichment)
Many companies need to track user IP locations for:
- Fraud detection (flagging suspicious logins)
- Personalisation (region-specific content)
- Security monitoring (detecting unusual access patterns)
This pipeline mimics real-world architectures used in finance, e-commerce, and streaming platforms.
- FastAPI endpoint to accept IPs in real time.
- Redis queue for decoupled ingestion.
- Airflow DAG to orchestrate enrichment and storage.
- Enrich IPs with city, country, latitude & longitude using IPStack API.
- Store results in PostgreSQL for analytics.
- Fully Dockerised for easy setup.
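Incoming IPs should be validated before they are queued, so malformed input never reaches the enrichment step. A minimal sketch using Python's stdlib `ipaddress` module (the helper name `is_valid_ip` is illustrative, not taken from this repo):

```python
import ipaddress


def is_valid_ip(value: str) -> bool:
    """Return True if the string is a valid IPv4 or IPv6 address."""
    try:
        ipaddress.ip_address(value)
        return True
    except ValueError:
        return False


# Only valid addresses would be pushed to the Redis queue.
incoming = ["8.8.8.8", "not-an-ip", "1.1.1.1"]
accepted = [ip for ip in incoming if is_valid_ip(ip)]
print(accepted)  # ['8.8.8.8', '1.1.1.1']
```

Rejecting bad input at the API boundary keeps the queue clean and avoids wasted IPStack calls downstream.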
geolocation_ip_app/
├── README.md
├── config/
├── dags/
│ ├── geo_dag_pipeline.py # Original DAG (random IPs)
│ └── ip_dag_app.py # NEW DAG (fetch from Redis queue)
├── data/
│ └── sample_of_logs.txt
├── docker/
│ ├── docker-compose.yml
│ └── Dockerfile.fastapi
├── docs/
│ ├── architecture_diagram.jpg
│ ├── airflow_dags_time.jpg
│ ├── airflow_output.jpg
│ ├── dags_workflow_graph.jpg
│ ├── demo_pipeline.gif
│ └── ip_geolocation_table.jpg
├── requirements.txt
├── src/
│ ├── ingestion.py # Original ingestion (random IPs)
│ ├── ingestion_app.py # NEW ingestion (fetch from Redis)
│ ├── api_service_app.py # NEW FastAPI service
│ ├── enrichment.py
│ ├── storage.py
│ └── utils.py
└── tests/
- FastAPI – Real-time ingestion API
- Redis – Queue for IP processing
- Apache Airflow – Orchestration (batch & real-time hybrid)
- PostgreSQL – Persistent storage
- Docker Compose – Containerised environment
- IPStack API – IP enrichment service
- Python 3.9+
- Docker & Docker Desktop
- Free IPStack API Key
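Enrichment calls IPStack's HTTP API (`http://api.ipstack.com/<ip>?access_key=KEY`) and keeps only the fields this pipeline stores. A hedged sketch of the parsing step, assuming a response shaped like IPStack's documented JSON (the helper `parse_geolocation` is illustrative, not the repo's actual `enrichment.py`):

```python
def parse_geolocation(payload: dict) -> dict:
    """Extract the fields stored in PostgreSQL from an IPStack-style response."""
    return {
        "ip": payload.get("ip"),
        "city": payload.get("city"),
        "country": payload.get("country_name"),
        "latitude": payload.get("latitude"),
        "longitude": payload.get("longitude"),
    }


# Example response trimmed to the keys used here (values are illustrative).
sample = {
    "ip": "8.8.8.8",
    "city": "Mountain View",
    "country_name": "United States",
    "latitude": 37.4,
    "longitude": -122.0,
}
row = parse_geolocation(sample)
print(row["country"])  # United States
```

Using `.get()` keeps the pipeline tolerant of missing fields, which IPStack's free tier can return for some addresses.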
Create `config/.env` with:

```
GEO_IP_API_KEY=your_ipstack_api_key
POSTGRES_USER=your_postgres_user
POSTGRES_PASSWORD=your_password
POSTGRES_DB=database_name
DB_HOST=postgres
DB_PORT=5432
DB_NAME=database_name
DB_USER=username
DB_PASS=your_password
```

Clone the repository:

```bash
git clone https://github.com/konomissira/geolocation_ip_app.git
cd geolocation_ip_app
```

Start the services:

```bash
cd docker
docker-compose up --build
```

Create an Airflow admin user:

```bash
docker exec -it airflow_webserver airflow users create \
    --username airflow \
    --firstname Data \
    --lastname Engineer \
    --role Admin \
    --email myemail@example.com \
    --password airflow
```

Access the Airflow UI at: http://localhost:8080
Login: airflow / airflow
FastAPI Docs: http://localhost:8000/docs
- Send IPs to FastAPI:

```bash
curl -X POST "http://localhost:8000/ingest-ip" -H "Content-Type: application/json" -d '{"ip": "8.8.8.8"}'
```

Or send multiple IPs:

```bash
curl -X POST "http://localhost:8000/ingest-ips" -H "Content-Type: application/json" -d '{"ips": ["8.8.8.8", "1.1.1.1"]}'
```

- IPs are added to the Redis queue (`ip_queue`).
- The Airflow DAG (`ip_queue_pipeline`) fetches from Redis → enriches via IPStack → stores in PostgreSQL.
- Query PostgreSQL:

```bash
docker exec -it geo_postgres psql -U kono -d geo_db
```

```sql
SELECT * FROM ip_geolocation;
```

- geo_dag_pipeline → Generates random IPs (original version)
- ip_queue_pipeline → Pulls IPs from Redis (new version)
Schedule: Every 10 minutes (configurable)
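Each scheduled DAG run drains whatever has accumulated in `ip_queue` since the last run. A sketch of that batch step with the Redis client swapped for a plain `deque` so it runs standalone (in the real DAG this would be Redis pop operations; the names here are illustrative):

```python
from collections import deque


def drain_queue(queue: deque, max_batch: int = 100) -> list:
    """Pop up to max_batch IPs from the queue, preserving arrival order."""
    batch = []
    while queue and len(batch) < max_batch:
        batch.append(queue.popleft())
    return batch


# Stand-in for the Redis list 'ip_queue'; each DAG run processes one batch.
ip_queue = deque(["8.8.8.8", "1.1.1.1", "9.9.9.9"])
batch = drain_queue(ip_queue, max_batch=2)
print(batch)           # ['8.8.8.8', '1.1.1.1']
print(list(ip_queue))  # ['9.9.9.9']
```

Capping the batch size keeps a single DAG run bounded even if the queue grows quickly between runs; leftover IPs are simply picked up by the next scheduled run.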
- Event-driven DAG triggering via Airflow API
- Integrate Kafka or AWS Kinesis for log streaming
- Add Grafana dashboards for analytics
- Implement ML model for anomaly detection
MIT License © 2025 Kono Missira

