This project is a high-performance, asynchronous ETL pipeline designed to ingest raw job data from XML feeds, enrich it using Google's Gemini AI, and synchronize the results with an external API (Xano). The system is built on a modern Python stack featuring FastAPI and asyncpg, and is fully containerized with Docker for easy deployment and scalability.
The pipeline's core function is to transform raw job listings into a structured, valuable dataset by classifying them into industry verticals, generating standardized metadata, and scoring their relevance. It uses a local PostgreSQL database as a "source of truth" to perform robust, sequential synchronization with external services.
The pipeline operates in a series of sequential stages, ensuring data integrity and accuracy at each step.
```text
[START]
│
▼
┌─────────────────────────────┐ ┌───────────────────┐
│ 1. Ingest & Pre-filter │ │ Store Closed jobs │
│ (Stream XML, Remove CLOSED) │─────►│ (Rejected) │
└────────────┬────────────────┘ └───────────────────┘
│
▼
┌───────────────────────────┐
│ 2. Deduplicate & Aggregate│
│ (Hash, Multi-Location) │
└────────────┬──────────────┘
│
▼
┌───────────────────────────┐ ┌───────────────────┐
│ 3. AI Enrichment (Stage 1)│ │ Confidence< 0.5 │
│ (Industry, Confidence) │─────►│ (archived_jobs) │
└────────────┬──────────────┘ └───────────────────┘
│
▼
┌───────────────────────────┐
│ 4. AI Enrichment (Stage 2)│
│ (Job Function) │
└────────────┬──────────────┘
│
▼
┌──────────────────────────────────────────────────┐
│ 5. Categorize Jobs (Based on AI Results) │
└────────────────┬────────────────┬────────────────┘
│ │
┌─────────────────────▼───────────┐ │
│ AI Process FAILED │ │
│ (e.g., API error, timeout) │ │
└─────────────────────────────────┘ │
│ │
│ ▼
│ ┌──────────────────────────────┐
│ │ AI Process SUCCEEDED │
│ │ (Confidence Score is valid) │
│ └───────────┬──────────────────┘
│ │
│ ▼
│ ┌─────────────────────────────┐
│ │ 6. Route by Confidence Score│
│ └──────┬──────────┬───────────┘
│ │ │
│ ┌─────────▼──────┐ ┌─▼───────────────┐
│ │ Confidence<0.86│ │ Confidence>0.86 │
│ │ (Manual) │ │ (Approved) │
│ └─────────┬──────┘ └─┬───────────────┘
│ │ │
▼ ▼ ▼
┌──────────────────────────────────────────────────────────────┐
│ 7. Save ALL Categories to Supabase (Source of Truth) │
│ (open_jobs, manual_review_jobs, rejected_jobs) │
└───────────────────────────┬──────────────────────────────────┘
│
▼
┌──────────────────────────────────────────────────────────────┐
│ 8. Full Synchronization with Xano (for Approved Jobs Only) │
│ a. Get current jobs from Xano │
│ b. Delete jobs from Xano that are not in Supabase │
│ c. Add jobs to Xano that are in Supabase but not Xano │
└───────────────────────────┬──────────────────────────────────┘
│
▼
[END]
```
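The thresholds in stages 3 and 6 translate into a simple routing rule. The sketch below is illustrative only: the function and category names are assumptions, the 0.5 and 0.86 cut-offs are taken from the diagram, and the handling of scores exactly at a threshold may differ in the real pipeline.

```python
# Illustrative routing logic for stages 3 and 6. The function and category names
# are assumptions; the 0.5 and 0.86 cut-offs come from the diagram above, and the
# treatment of scores exactly at a threshold may differ in the real pipeline.

def route_job(confidence: float | None) -> str:
    """Return the destination category for an enriched job."""
    if confidence is None:
        # AI process failed (API error, timeout, malformed response).
        return "rejected_jobs"
    if confidence < 0.5:
        # Stage 3: industry classification too uncertain to keep.
        return "archived_jobs"
    if confidence < 0.86:
        # Stage 6: plausible but not certain, so send to manual review.
        return "manual_review_jobs"
    # Stage 6: high confidence, approved for synchronization with Xano.
    return "open_jobs"
```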
- Asynchronous from the Ground Up: Built with `asyncio`, FastAPI, and `asyncpg` for high-performance, non-blocking I/O.
- Streaming XML Parser: Efficiently processes large XML feeds from URLs without loading the entire file into memory.
- Intelligent AI Enrichment:
- A multi-stage process using Google's Gemini model to generate standardized titles, descriptions, skills, and industry classifications.
- Correct Context Handling: Groups jobs by industry before classifying functions to ensure the AI always has the correct set of options.
- Robust Error Handling: Automatically retries failed API calls (for both AI and external services) with exponential backoff and gracefully handles malformed data.
- Dual Execution Modes: Can be run as a command-line script for single feeds or as a persistent FastAPI server for API-driven execution.
- Full External Synchronization: Implements a sequential "delete-then-add" workflow to ensure an external service (Xano) is a perfect mirror of the Supabase database (see the sketch after this list).
- Containerized Deployment: Includes a `Dockerfile` and `docker-compose.yml` for easy, consistent deployment in any environment.
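The "delete-then-add" synchronization mentioned above amounts to diffing the approved jobs in Supabase against what Xano currently holds, deleting first and adding second. The sketch below is a minimal illustration of that idea; the helper coroutines are hypothetical stand-ins for the project's real Xano and Supabase clients.

```python
# Minimal sketch of the "delete-then-add" mirror sync. The helper coroutines are
# hypothetical stand-ins for the project's real Xano and Supabase clients; only
# the set-difference logic is the point of this example.

async def fetch_xano_jobs() -> dict[str, dict]:
    """Hypothetical: return {external_id: record} for jobs currently in Xano."""
    return {}

async def fetch_approved_jobs() -> dict[str, dict]:
    """Hypothetical: return {external_id: record} for approved jobs in Supabase."""
    return {}

async def delete_xano_job(job_id: str) -> None:
    """Hypothetical: remove a single job from Xano."""

async def add_xano_job(record: dict) -> None:
    """Hypothetical: create a single job in Xano."""

async def sync_with_xano() -> None:
    xano_jobs = await fetch_xano_jobs()
    approved = await fetch_approved_jobs()

    # Delete first: anything Xano holds that is no longer approved in Supabase.
    for job_id in set(xano_jobs) - set(approved):
        await delete_xano_job(job_id)

    # Then add: anything approved in Supabase that Xano does not have yet.
    for job_id in set(approved) - set(xano_jobs):
        await add_xano_job(approved[job_id])
```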
A key feature of the pipeline is its ability to identify and merge job postings that are for the same role but are listed in different locations. This prevents duplicates and creates a cleaner dataset. The process works in two passes:
This initial pass groups jobs based on their original, raw data.
- Key Generation: A unique key is created for each job using its `title`, `company_name`, and `employment_type`.
- Grouping: All jobs with the exact same key are placed into a group.
- Similarity Analysis: Within each group, the job descriptions are compared using fuzzy string matching to determine if they are for the same role.
- Merging: Jobs that are determined to be duplicates are merged into a single record. The locations from all the duplicate jobs are combined into one list, and the `is_multi_location` flag is set to `true`.
After the first stage of AI enrichment, a second, more accurate aggregation pass is performed. This pass uses the standardized `ai_title` generated by the Gemini model, which is cleaner and more consistent than the original title, leading to better matching.
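A minimal sketch of the first pass is shown below, assuming each raw job record carries `title`, `company_name`, `employment_type`, `description`, and a single `location` field. The 0.9 similarity threshold and the use of the standard library's `difflib` are illustrative; the project may use a different fuzzy-matching library.

```python
from collections import defaultdict
from difflib import SequenceMatcher

def aggregate_jobs(jobs: list[dict], threshold: float = 0.9) -> list[dict]:
    """First-pass aggregation: group by raw fields, then fuzzy-merge duplicates."""
    # 1. Key generation and grouping on the raw title, company, and employment type.
    groups: dict[tuple, list[dict]] = defaultdict(list)
    for job in jobs:
        groups[(job["title"], job["company_name"], job["employment_type"])].append(job)

    merged: list[dict] = []
    for group in groups.values():
        representatives: list[dict] = []
        for job in group:
            # 2. Similarity analysis: fuzzy-compare descriptions within the group.
            match = next(
                (rep for rep in representatives
                 if SequenceMatcher(None, rep["description"], job["description"]).ratio() >= threshold),
                None,
            )
            if match is not None:
                # 3. Merging: combine locations and flag the record.
                match["locations"].append(job["location"])
                match["is_multi_location"] = True
            else:
                representatives.append(
                    {**job, "locations": [job["location"]], "is_multi_location": False}
                )
        merged.extend(representatives)
    return merged
```

The second pass would repeat the same grouping with the AI-generated `ai_title` substituted for the raw `title`.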
- Python 3.11+
- Docker & Docker Compose
- Access to a PostgreSQL database (e.g., Supabase, Neon, or a local instance)
- Clone the Repository: run `git clone <your-repository-url>`, then `cd <your-repository-name>`.
- Create a Virtual Environment: run `python -m venv .venv`, then activate it with `source .venv/bin/activate` (on Windows, use `.venv\Scripts\activate`).
- Install Dependencies: `pip install -r requirements.txt`
- Configure Environment Variables:
  - Copy the sample `.env.sample` file to a new file named `.env`: `cp .env.sample .env`
  - Open the `.env` file and fill in your actual credentials for the database, Google Cloud, and Xano.
This method is ideal for testing, debugging, or manually processing a specific feed.
- Usage: `python process_feed.py --feed-id <ID_OF_THE_FEED>`
- Example: `python process_feed.py --feed-id 21`
This method runs the application as a high-performance web server using FastAPI and is the recommended approach for production.
- Start the Server: `uvicorn app:app --host 0.0.0.0 --port 8080`
- Trigger a Feed: Send a `POST` request to the `/trigger-feed/{feed_id}` endpoint (a minimal endpoint sketch follows this list).
  - Example using `curl`: `curl -X POST http://localhost:8080/trigger-feed/21`
  - Response: The server will immediately respond with a `202 Accepted` status, confirming that the task has started in the background.
- Check Execution Status: Send a `GET` request to the `/execution-status/{feed_id}` endpoint to see the report from the last run.
  - Example using `curl`: `curl http://localhost:8080/execution-status/21`
- API Documentation: While the server is running, you can access interactive API documentation (Swagger UI) by navigating to `http://localhost:8080/docs` in your browser.
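The trigger endpoint accepts the request, schedules the pipeline, and returns immediately. The sketch below shows one common way to get that behaviour with FastAPI's `BackgroundTasks`; it illustrates the pattern rather than the project's actual `app.py`, and `run_pipeline` is a hypothetical stand-in for the real pipeline entry point.

```python
from fastapi import BackgroundTasks, FastAPI

app = FastAPI()

async def run_pipeline(feed_id: int) -> None:
    """Hypothetical stand-in for the real feed-processing coroutine."""
    ...

@app.post("/trigger-feed/{feed_id}", status_code=202)
async def trigger_feed(feed_id: int, background_tasks: BackgroundTasks) -> dict:
    # Schedule the pipeline and return 202 Accepted without waiting for it to finish.
    background_tasks.add_task(run_pipeline, feed_id)
    return {"status": "accepted", "feed_id": feed_id}
```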
- Ensure your `.env` file is complete.
- Build and Run the Container: `docker-compose up --build`

The application will be accessible at `http://localhost:8080`.
- Execution Reports: The `execution_reports` table in the database stores detailed statistics for each pipeline run.
- Feed Statistics: The `feeds` table is updated after every run with a summary, including `last_run_status`, `avg_processing_time`, and `consecutive_failure_count`.
- Log Levels: The application provides detailed logs at the INFO, WARNING, and ERROR levels to track progress and diagnose issues.
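Because these statistics live in ordinary PostgreSQL tables, they can be inspected with any client. The snippet below is a hedged illustration using `asyncpg`: the `feeds` table name and `id` column are inferred, the other column names are the ones listed above, and `DATABASE_URL` is assumed to be defined in your `.env`.

```python
import asyncio
import os

import asyncpg  # the project's async PostgreSQL driver

async def show_failing_feeds() -> None:
    # DATABASE_URL is an assumed variable name; use whatever your .env defines.
    conn = await asyncpg.connect(os.environ["DATABASE_URL"])
    try:
        rows = await conn.fetch(
            """
            SELECT id, last_run_status, avg_processing_time, consecutive_failure_count
            FROM feeds
            WHERE consecutive_failure_count > 0
            ORDER BY consecutive_failure_count DESC
            """
        )
        for row in rows:
            print(dict(row))
    finally:
        await conn.close()

if __name__ == "__main__":
    asyncio.run(show_failing_feeds())
```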