A sophisticated AI-powered fact-checking system that verifies medical and scientific claims against authoritative sources using a 7-phase Corrective Retrieval Pipeline combined with hybrid ranking algorithms and knowledge graph integration.
Luxia Worker implements an advanced information retrieval and verification system designed for biomedical fact-checking (extensible to other domains). The system:
- Searches trusted medical/scientific domains (WHO, CDC, NIH, PubMed, etc.)
- Extracts facts, entities, and relationships using LLM-powered NLP
- Stores findings in dual storage systems (Pinecone vector DB + Neo4j knowledge graph)
- Ranks evidence using 5-signal hybrid scoring (recency, credibility, semantic similarity, entity match, KG score)
- Reinforces low-confidence results through iterative search loops with failed-entity targeting
- Quick Start
- Architecture
- Installation
- Configuration
- API Endpoints
- Pipeline Phases
- Development
- Testing
- Deployment
- Troubleshooting
# Clone and setup
git clone https://github.com/Luxia-AI/worker.git
cd worker
# Create virtual environment
python -m venv .venv
source .venv/Scripts/activate # Windows
# or
source .venv/bin/activate # macOS/Linux
# Install dependencies
pip install -r requirements.txt
pip install -r requirements-dev.txt
# Configure environment
cp .env.example .env
# Edit .env with your API keys
# Run quality checks
./run.sh all
# Start development server
python main.py
# or
uvicorn app.main:app --reload --port 9000
# Build and run with Docker Compose
docker-compose up --build
# View logs
docker-compose logs -f worker
# Stop services
docker-compose down
Input (claim)
    ↓
[1] SEARCH PHASE
    • Query reformulation (LLM)
    • Trusted domain filtering
    • Google CSE search
        ↓
[2] SCRAPING PHASE
    • HTML → Text extraction (Trafilatura)
    • Content deduplication
        ↓
[3] EXTRACTION PHASE
    • Fact extraction (LLM)
    • Entity extraction (LLM)
    • Relation extraction (LLM)
        ↓
[4] INGESTION PHASE
    • VDB ingestion (Pinecone)
    • KG ingestion (Neo4j)
        ↓
[5] RETRIEVAL PHASE
    • Semantic search (VDB)
    • Structural queries (KG)
        ↓
[6] RANKING PHASE
    • 5-signal hybrid scoring:
      1. Credibility (domain authority)
      2. Recency (exponential decay)
      3. Semantic similarity (embedding)
      4. Entity match (extracted entities)
      5. KG structural score
        ↓
[7] REINFORCEMENT PHASE
    • If confidence < THRESHOLD:
      → Collect failed entities
      → Re-search with entity queries
      → Loop up to MAX_ROUNDS
        ↓
Output (ranked evidence with confidence)
Claim Input
    ↓
TrustedSearch (LLM query reformulation)
    ↓
Google CSE → Search URLs
    ↓
Trafilatura → Scrape content
    ↓
3x LLM Extraction (Facts, Entities, Relations)
    ↓
┌─────────────────────┐
│   Parallel Ingest   │
├──────────┬──────────┤
│ Pinecone │  Neo4j   │
│  (VDB)   │   (KG)   │
└──────────┴──────────┘
    ↓
┌─────────────────────┐
│ Parallel Retrieval  │
├──────────┬──────────┤
│ Semantic │Structural│
│  Search  │ Queries  │
└──────────┴──────────┘
    ↓
Hybrid Ranking (5 signals)
    ↓
Confidence >= Threshold?
├─ YES → Return top-k evidence
└─ NO  → Reinforcement Loop
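The flow above can be sketched as an async orchestrator that chains the phases. Everything below is a stub: the phase names and signatures are illustrative stand-ins for the modules under `app/services/corrective/pipeline/`, not the project's actual API.

```python
import asyncio

# Trivial stubs standing in for the real phase modules (hypothetical signatures):
async def search_phase(claim: str) -> list[str]:
    return ["https://www.nih.gov/example"]                   # [1] Search

async def scrape_pages(urls: list[str]) -> list[str]:
    return ["page text"]                                     # [2] Scraping

async def extract_all(pages: list[str]):
    return [{"statement": "stub fact"}], ["entity"], []      # [3] Extraction

async def ingest(facts, triples) -> None:
    pass                                                     # [4] Ingestion

async def retrieve(claim: str) -> list[dict]:
    return [{"statement": "stub fact", "final_score": 0.8}]  # [5] Retrieval

def rank(candidates: list[dict]) -> list[dict]:
    # [6] Ranking: sort by final_score, highest first
    return sorted(candidates, key=lambda c: c["final_score"], reverse=True)

async def run_pipeline(claim: str) -> list[dict]:
    urls = await search_phase(claim)
    pages = await scrape_pages(urls)
    facts, entities, triples = await extract_all(pages)
    await ingest(facts, triples)
    candidates = await retrieve(claim)
    # [7] Reinforcement would loop back here when confidence is low
    return rank(candidates)

results = asyncio.run(run_pipeline("Does vitamin C prevent colds?"))
```

The real orchestrator (`CorrectivePipeline`) adds the reinforcement loop and logging around this skeleton.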
app/
├── main.py                       # FastAPI app entry point
├── constants/
│   ├── config.py                 # All configuration constants
│   └── llm_prompts.py            # LLM prompt templates
├── core/
│   ├── config.py                 # Pydantic settings loader
│   ├── logger.py                 # Structured logging
│   ├── rate_limit.py             # Rate limiter decorator
│   └── utils.py                  # Helper utilities
├── routers/
│   ├── pinecone.py               # /worker/search endpoint
│   └── admin.py                  # /admin/logs endpoint
└── services/
    ├── corrective/               # Core pipeline
    │   ├── pipeline/
    │   │   ├── __init__.py       # CorrectivePipeline orchestrator
    │   │   ├── search_phase.py
    │   │   ├── extraction_phase.py
    │   │   ├── ingestion_phase.py
    │   │   ├── retrieval_phase.py
    │   │   ├── ranking_phase.py
    │   │   └── reinforcement_phase.py
    │   ├── trusted_search.py     # Trusted domain search + LLM reformulation
    │   ├── scraper.py            # Trafilatura wrapper
    │   ├── fact_extractor.py     # LLM fact extraction
    │   ├── entity_extractor.py   # LLM entity extraction
    │   └── relation_extractor.py # LLM relation extraction
    ├── embedding/
    │   └── model.py              # Embedding model management
    ├── vdb/
    │   ├── pinecone_client.py    # Pinecone API wrapper
    │   ├── vdb_ingest.py         # Vector DB ingestion
    │   └── vdb_retrieval.py      # Semantic search
    ├── kg/
    │   ├── neo4j_client.py       # Neo4j API wrapper
    │   ├── kg_ingest.py          # Knowledge graph ingestion
    │   ├── kg_retrieval.py       # Structural queries
    │   └── schema_init.py        # KG schema creation
    ├── ranking/
    │   ├── hybrid_ranker.py      # 5-signal ranking
    │   └── trust_ranker.py       # Domain trust scoring
    ├── llms/
    │   └── groq_service.py       # LLM integration (Groq/OpenAI)
    ├── logging/
    │   ├── log_manager.py        # Redis + SQLite logging
    │   ├── log_handler.py        # Logging integration
    │   ├── log_store.py          # SQLite persistence
    │   └── redis_broadcaster.py  # Redis pub/sub
    └── common/
        ├── url_helpers.py        # URL utilities
        ├── text_cleaner.py       # Text normalization
        ├── dedup.py              # Deduplication
        └── list_ops.py           # List operations
- Python: 3.13+
- External Services:
- Pinecone (vector database)
- Neo4j (knowledge graph)
- Redis (logging & caching)
- Google Custom Search Engine (CSE)
- Groq or OpenAI (LLM API)
1. Clone Repository
   git clone https://github.com/Luxia-AI/worker.git
   cd worker
2. Create Virtual Environment
   python -m venv .venv
   source .venv/Scripts/activate  # Windows
3. Install Dependencies
   pip install -r requirements.txt
   pip install -r requirements-dev.txt  # For development
4. Configure Environment (see Configuration)
Create a .env file in the project root:
# FastAPI
LOG_LEVEL=INFO
PORT=9000
# LLM Configuration
LLM_MODEL_NAME=grok-2-1212
LLM_TEMPERATURE=0.7
GROQ_API_KEY=your_groq_api_key_here
OPENAI_API_KEY=your_openai_api_key_here # Fallback
# Embedding Model
EMBEDDING_MODEL_NAME_PROD=sentence-transformers/all-MiniLM-L6-v2
EMBEDDING_MODEL_NAME_TEST=sentence-transformers/all-MiniLM-L6-v2
# Pinecone (Vector Database)
PINECONE_API_KEY=your_pinecone_api_key
PINECONE_INDEX_NAME=worker-index
PINECONE_ENVIRONMENT=us-east-1
# Neo4j (Knowledge Graph)
NEO4J_URI=bolt://localhost:7687
NEO4J_USER=neo4j
NEO4J_PASSWORD=your_neo4j_password
# Google Custom Search
GOOGLE_API_KEY=your_google_api_key
GOOGLE_CSE_ID=your_cse_id
# Redis (Logging & Caching)
REDIS_URL=redis://localhost:6379
LOG_DB_PATH=./logs.db
# Database
DATABASE_URL=sqlite:///./logs.db
# Features
RATE_LIMIT_ENABLED=true
RATE_LIMIT_CALLS=5
RATE_LIMIT_PERIOD=1 # seconds
All tuneable parameters are in app/constants/config.py:
# Pipeline configuration
PIPELINE_MAX_ROUNDS = 3 # Max reinforcement loops
PIPELINE_CONF_THRESHOLD = 0.70 # Confidence threshold for reinforcement
PIPELINE_MIN_NEW_URLS = 2 # Min new URLs per reinforcement round
# Ranking weights (5-signal hybrid scoring)
RANKING_WEIGHTS = {
'credibility': 0.25,
'recency': 0.25,
'semantic_similarity': 0.25,
'entity_match': 0.15,
'kg_score': 0.10
}
# Trusted domains
TRUSTED_DOMAINS_AUTHORITY = {'who.int', 'cdc.gov', 'nih.gov', ...}
TRUSTED_DOMAINS_EDU_GOV = {'*.edu', '*.gov'}
Endpoint: GET /worker/search
Query Parameters:
query (string, required): Medical claim or question to verify
Response:
{
"query": "Does vitamin C prevent colds?",
"results": [
{
"statement": "Vitamin C does not prevent common cold infections...",
"confidence": 0.85,
"source_url": "https://example.com/article",
"source": "NIH",
"published_at": "2023-06-15",
"entities": ["vitamin C", "common cold"],
"evidence_score": 0.87
}
]
}
Example:
curl "http://localhost:9000/worker/search?query=WHO+guidelines+for+COVID-19+vaccination"
Endpoint: GET /admin/logs
Query Parameters:
skip (int): Number of logs to skip (pagination)
limit (int): Number of logs to return
request_id (string, optional): Filter by request ID
level (string, optional): Filter by log level (DEBUG, INFO, WARNING, ERROR)
module (string, optional): Filter by module name
Response:
{
"logs": [
{
"id": "550e8400-e29b-41d4-a716-446655440000",
"timestamp": "2025-11-22T10:30:00Z",
"level": "INFO",
"message": "[SearchPhase:uuid] Found 5 trusted sources",
"module": "search_phase",
"request_id": "req-123"
}
],
"total": 1250,
"page": 1,
"per_page": 10
}
Endpoint: GET /admin/logs/stats
Query Parameters:
request_id (string, optional): Filter by request ID
level (string, optional): Filter by log level
module (string, optional): Filter by module
Response:
{
"total": 1250,
"by_level": {
"DEBUG": 50,
"INFO": 1000,
"WARNING": 150,
"ERROR": 50
},
"by_module": {
"search_phase": 300,
"extraction_phase": 280,
"ranking_phase": 220,
...
}
}
File: app/services/corrective/pipeline/search_phase.py
Process:
- Reformulate input query using LLM (improves search results)
- Filter Google CSE results to trusted domains only
- Return top-N URLs from authoritative sources
Key Function: do_search(claim: str) -> List[str]
Outputs:
search_urls: List of URLs from trusted domains
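The trusted-domain filtering step can be sketched as a plain URL check. The domain set below is an abbreviated stand-in for `TRUSTED_DOMAINS_AUTHORITY`; the function name is illustrative, not the project's actual helper.

```python
from urllib.parse import urlparse

# Abbreviated stand-in for TRUSTED_DOMAINS_AUTHORITY
TRUSTED = {"who.int", "cdc.gov", "nih.gov"}

def is_trusted(url: str) -> bool:
    host = urlparse(url).netloc.lower()
    # Accept the domain itself and any subdomain (e.g. www.cdc.gov, pubmed.nih.gov)
    return any(host == d or host.endswith("." + d) for d in TRUSTED)

urls = [
    "https://www.who.int/news/item/covid-guidance",
    "https://pubmed.nih.gov/12345/",
    "https://randomblog.example.com/vitamin-c",
]
trusted_urls = [u for u in urls if is_trusted(u)]
# keeps only the WHO and NIH links
```

Matching on the registered domain suffix (rather than substring search) avoids false positives like `who.int.evil.example`.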
File: app/services/corrective/pipeline/extraction_phase.py:scrape_pages()
Process:
- Fetch HTML from URLs using HTTP client
- Convert HTML to plain text using Trafilatura
- Clean and normalize text
- Deduplicate content
Key Function: scrape_pages(search_urls: List[str]) -> List[str]
Outputs:
scraped_content: List of plain-text webpage content
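The deduplication step can be sketched by hashing a normalized copy of each page, so trivial whitespace or case differences don't defeat the check. This is an illustrative sketch, not the implementation in `app/services/common/dedup.py`.

```python
import hashlib

def dedup_content(pages: list[str]) -> list[str]:
    """Keep the first occurrence of each page, comparing normalized text."""
    seen: set[str] = set()
    unique: list[str] = []
    for page in pages:
        # Collapse whitespace and lowercase before hashing
        normalized = " ".join(page.lower().split())
        digest = hashlib.sha256(normalized.encode()).hexdigest()
        if digest not in seen:
            seen.add(digest)
            unique.append(page)
    return unique

pages = ["Vitamin C and colds.", "vitamin  c and colds.", "Zinc and immunity."]
unique_pages = dedup_content(pages)  # the first two pages collapse into one
```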
File: app/services/corrective/pipeline/extraction_phase.py:extract_all()
Process:
- Fact Extraction: LLM extracts claims/statements from content
- Entity Extraction: LLM identifies medical/scientific entities
- Relation Extraction: LLM identifies relationships between entities
Key Functions:
extract_all(content: List[str]) -> Tuple[List[Dict], List[str], List[Dict]]
Outputs:
facts: List of fact dicts with statement, confidence, source, published_at
entities: List of extracted entities
triples: List of relationship triples (subject-relation-object)
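Each extraction call returns LLM text that must be parsed into these structures. The sketch below assumes the LLM is prompted to reply in JSON; the raw reply and the parser name are illustrative, and only the field names follow the schema above.

```python
import json

# Illustrative stand-in for an LLM reply (real replies come from the prompts
# in app/constants/llm_prompts.py)
raw_reply = """
{"facts": [{"statement": "Vitamin C does not prevent colds.",
            "confidence": 0.85, "source": "NIH", "published_at": "2023-06-15"}],
 "entities": ["vitamin C", "common cold"],
 "triples": [{"subject": "vitamin C", "relation": "does_not_prevent",
              "object": "common cold"}]}
"""

def parse_extraction(reply: str) -> tuple[list[dict], list[str], list[dict]]:
    """Parse one JSON reply into (facts, entities, triples), tolerating missing keys."""
    data = json.loads(reply)
    return data.get("facts", []), data.get("entities", []), data.get("triples", [])

facts, entities, triples = parse_extraction(raw_reply)
```

In practice the parser also needs to handle malformed JSON (e.g. retry or skip the document), which this sketch omits.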
File: app/services/corrective/pipeline/ingestion_phase.py
Process:
- Embed facts using sentence transformer
- Store embeddings in Pinecone (VDB)
- Store facts as nodes/relationships in Neo4j (KG)
Key Function: ingest_facts_and_triples(facts, triples) -> Tuple[int, int]
Outputs:
- Facts stored in Pinecone
- Relationships stored in Neo4j
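The VDB half of ingestion can be sketched as shaping each fact into an id/values/metadata record, the general layout Pinecone's upsert accepts. `embed()` here is a deterministic placeholder for the sentence-transformer call, and the record builder is illustrative, not the project's `vdb_ingest` API.

```python
import hashlib

def embed(text: str) -> list[float]:
    # Placeholder: a real implementation calls all-MiniLM-L6-v2 and returns
    # a 384-dim vector; this stub derives 8 floats from a hash instead.
    return [float(b) / 255.0 for b in hashlib.sha256(text.encode()).digest()[:8]]

def to_records(facts: list[dict]) -> list[dict]:
    """Shape fact dicts into vector records: stable id, embedding, metadata."""
    records = []
    for fact in facts:
        statement = fact["statement"]
        records.append({
            "id": hashlib.sha256(statement.encode()).hexdigest()[:16],
            "values": embed(statement),
            "metadata": dict(fact),  # keep statement/source/published_at for retrieval
        })
    return records

records = to_records([{"statement": "Vitamin C does not prevent colds.",
                       "source": "NIH", "published_at": "2023-06-15"}])
```

Deriving the id from the statement hash makes re-ingestion idempotent: the same fact upserted twice overwrites one record instead of duplicating it.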
File: app/services/corrective/pipeline/retrieval_phase.py
Process:
- Semantic Search: Query Pinecone for similar facts
- Structural Search: Query Neo4j for related entities/relationships
- Combine and deduplicate candidates
Key Function: retrieve_candidates(claim: str) -> List[Dict]
Outputs:
candidates: List of retrieved fact/relationship candidates
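The combine-and-deduplicate step can be sketched as a merge keyed on normalized statement text, so the same fact surfaced by both stores is kept once. Function and field names are illustrative.

```python
def combine_candidates(vdb_hits: list[dict], kg_hits: list[dict]) -> list[dict]:
    """Merge both result lists, keeping the first occurrence of each statement."""
    combined: dict[str, dict] = {}
    for cand in vdb_hits + kg_hits:
        key = " ".join(cand["statement"].lower().split())
        combined.setdefault(key, cand)  # first occurrence wins (VDB before KG)
    return list(combined.values())

vdb = [{"statement": "Vitamin C does not prevent colds.", "origin": "vdb"}]
kg = [{"statement": "vitamin c does not prevent colds.", "origin": "kg"},
      {"statement": "Zinc may shorten colds.", "origin": "kg"}]
candidates = combine_candidates(vdb, kg)  # duplicate fact collapses to one entry
```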
File: app/services/corrective/pipeline/ranking_phase.py
Process: Compute 5-signal hybrid score:
1. Credibility (0.25 weight)
   - Domain authority mapping (WHO > CDC > Academic > News)
   - Normalized to [0, 1]
2. Recency (0.25 weight)
   - Exponential decay: exp(-age_days / HALF_LIFE)
   - Recent sources weighted higher
3. Semantic Similarity (0.25 weight)
   - Cosine similarity between claim and fact embeddings
   - VDB embedding match score
4. Entity Match (0.15 weight)
   - % of extracted entities found in candidate
   - Bonus for exact entity matches
5. KG Structural Score (0.10 weight)
   - Confidence of relationships in knowledge graph
   - Path strength in entity networks
Final Score: Weighted sum of normalized signals
Key Function: rank_candidates(candidates: List[Dict]) -> List[Dict]
Outputs:
ranked_candidates: Sorted by final_score (descending)
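The weighted sum can be sketched directly from the signal definitions above. Only the weights come from this README; `HALF_LIFE_DAYS` and the per-signal helper functions are illustrative assumptions, not the code in `hybrid_ranker.py`.

```python
import math

# Weights as listed in app/constants/config.py
WEIGHTS = {"credibility": 0.25, "recency": 0.25, "semantic_similarity": 0.25,
           "entity_match": 0.15, "kg_score": 0.10}
HALF_LIFE_DAYS = 365.0  # assumption: the README does not specify HALF_LIFE

def recency_score(age_days: float) -> float:
    """Exponential decay, as described for signal 2: newer sources score higher."""
    return math.exp(-age_days / HALF_LIFE_DAYS)

def entity_match_score(claim_entities: list[str], candidate_text: str) -> float:
    """Fraction of claim entities found in the candidate text (signal 4)."""
    if not claim_entities:
        return 0.0
    text = candidate_text.lower()
    hits = sum(1 for e in claim_entities if e.lower() in text)
    return hits / len(claim_entities)

def final_score(signals: dict[str, float]) -> float:
    """Weighted sum of normalized signals; stays in [0, 1] since weights sum to 1."""
    return sum(WEIGHTS[name] * signals[name] for name in WEIGHTS)

signals = {
    "credibility": 0.9,
    "recency": recency_score(182.0),
    "semantic_similarity": 0.8,
    "entity_match": entity_match_score(
        ["vitamin C", "common cold"],
        "Vitamin C does not prevent the common cold."),
    "kg_score": 0.5,
}
score = final_score(signals)
```

Because each signal is normalized to [0, 1] and the five weights sum to 1.0, the final score is itself a value in [0, 1] directly comparable to `PIPELINE_CONF_THRESHOLD`.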
File: app/services/corrective/pipeline/reinforcement_phase.py
Process:
- Check whether the top-ranked candidate's final_score is below CONF_THRESHOLD
- If yes and round < MAX_ROUNDS:
- Collect entities from low-scoring candidates
- Re-search with entity-focused queries (Phase 1)
- Repeat phases 2-6
- Return final ranked results
Key Function: reinforcement_loop(candidate_results, round) -> List[Dict]
Logic:
while round < MAX_ROUNDS and max_confidence < THRESHOLD:
failed_entities = collect_low_confidence_entities(candidates)
new_urls = search_with_entities(failed_entities)
if len(new_urls) < MIN_NEW_URLS:
break # Not enough new evidence
candidates = pipeline_phases_2_to_6(new_urls)
round += 1
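The logic above can be made runnable with the pipeline phases stubbed out. The constants mirror `app/constants/config.py`; `search_fn` and `rerun_fn` are stand-ins for Phase 1 and Phases 2-6, and the function name is illustrative.

```python
# Constants as listed in app/constants/config.py
MAX_ROUNDS = 3
CONF_THRESHOLD = 0.70
MIN_NEW_URLS = 2

def reinforce(candidates, search_fn, rerun_fn):
    """Loop until confident, out of rounds, or starved of new evidence."""
    round_no = 0
    while round_no < MAX_ROUNDS and max(c["final_score"] for c in candidates) < CONF_THRESHOLD:
        # Collect entities from candidates that scored below the threshold
        failed = [e for c in candidates if c["final_score"] < CONF_THRESHOLD
                  for e in c.get("entities", [])]
        new_urls = search_fn(failed)
        if len(new_urls) < MIN_NEW_URLS:
            break  # not enough new evidence to justify another round
        candidates = rerun_fn(new_urls)
        round_no += 1
    return candidates

# Toy run: the re-run stub produces a confident candidate, so one round suffices.
weak = [{"final_score": 0.4, "entities": ["vitamin C"]}]
strong = [{"final_score": 0.9, "entities": ["vitamin C"]}]
result = reinforce(weak, lambda ents: ["url-1", "url-2"], lambda urls: strong)
```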
tests/
├── test_pipeline_full.py              # E2E tests (mocked external services)
├── test_pipeline_integration.py       # Integration tests
├── test_pipeline_with_real_storage.py # Real storage tests
├── test_pipeline_actual.py            # Actual pipeline tests
├── test_entity_extractor.py           # Service unit tests
├── test_fact_extracting.py
├── test_relation_extractor.py
├── test_scraper.py
├── test_trusted_search.py
├── test_hybrid_rank.py
├── test_trust_ranker.py
├── test_neo4j_client.py
├── test_pinecone_client.py
├── test_vdb_ingest.py
├── test_vdb_retrieval.py
├── test_kg_ingest.py
├── test_logging_system.py
└── conftest.py                        # Pytest configuration with fixtures
# Run all tests
pytest
# Run specific test file
pytest tests/test_pipeline_full.py
# Run tests matching pattern
pytest tests/ -k "extraction"
# Run with coverage
pytest --cov=app tests/
# Run only local tests (skip external service tests)
pytest -m "not redis_required and not e2e"
# Run verbose output
pytest -v
# Run in parallel (faster)
pytest -n auto
@pytest.mark.integration # Requires external services
@pytest.mark.slow # Long-running tests
@pytest.mark.redis_required # Requires Redis (auto-skipped in CI)
@pytest.mark.e2e # End-to-end tests
# .github/workflows/ci.yml
- name: Run Tests
run: pytest -m "not redis_required and not e2e" -q
Tests with Redis/E2E markers are auto-skipped in the CI environment.
Constants (never hardcode):
# app/constants/config.py
PIPELINE_MAX_ROUNDS = 3
PIPELINE_CONF_THRESHOLD = 0.70
RANKING_WEIGHTS = {...}
TRUSTED_DOMAINS_AUTHORITY = {...}
Logging (structured with round_id):
from app.core.logger import get_logger
logger = get_logger(__name__)
logger.info(f"[PhaseX:{round_id}] Message", extra={"round_id": round_id})
Async (async-first design):
async def extract_facts(content: List[str]) -> List[Dict]:
tasks = [extractor.extract(c) for c in content]
return await asyncio.gather(*tasks)
# Run all checks
./run.sh all
# Run specific check
./run.sh "black mypy ruff"
# Available checks: pytest, ruff, black, isort, flake8, bandit, mypy
- Black: Line length = 120
- isort: Black profile
- Type hints: mypy (lenient, some ignores for framework code)
- Linting: Ruff, Flake8
- Security: Bandit
Build:
docker build -t luxia-worker:latest .
Run:
docker run -p 9000:9000 \
-e PINECONE_API_KEY=xxx \
-e NEO4J_URI=bolt://neo4j:7687 \
-e GROQ_API_KEY=xxx \
-e GOOGLE_API_KEY=xxx \
luxia-worker:latest
Docker Compose (recommended):
docker-compose up --build
# .env.local
REDIS_URL: redis://localhost:6379
NEO4J_URI: bolt://localhost:7687
# .env.docker (docker-compose.yml sets these)
REDIS_URL: redis://redis:6379
NEO4J_URI: bolt://neo4j:7687
# .env.prod (K8s secrets, etc.)
# All secrets from environment/vault
# Check API health
curl http://localhost:9000/worker/search?query=test
# Check logs
curl http://localhost:9000/admin/logs?limit=10
# Check Docker container
docker ps
docker logs worker
Redis Connection Failed
Error: Connection refused (redis://localhost:6379)
- Ensure Redis is running:
docker run -p 6379:6379 redis
- Check REDIS_URL in .env
Pinecone Not Found
Error: Index not found: worker-index
- Create index in Pinecone dashboard
- Verify PINECONE_INDEX_NAME and PINECONE_API_KEY
Neo4j Connection Issues
Error: Could not connect to bolt://localhost:7687
- Ensure Neo4j is running:
docker run -p 7687:7687 neo4j
- Check credentials: NEO4J_USER, NEO4J_PASSWORD
LLM API Errors
Error: Groq API rate limit exceeded
- Check GROQ_API_KEY
- Reduce rate limits or use fallback OpenAI
- Check OPENAI_API_KEY
Tests Failing with "Redis not available"
# Expected in CI (tests auto-skip)
# For local testing, ensure Redis is running or skip:
pytest -m "not redis_required"
Enable detailed logging:
# .env
LOG_LEVEL=DEBUG
View logs:
curl "http://localhost:9000/admin/logs?level=DEBUG&limit=50"
- Batch Operations: Process multiple claims in parallel
- Caching: Results cached in Redis (configurable TTL)
- Rate Limiting: Respect external API limits (configured in config.py)
- Embedding Model: Use lightweight model for production (all-MiniLM-L6-v2)
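The caching point can be illustrated with a minimal in-process TTL cache. The real system caches results in Redis; this sketch only mirrors the expiry logic, and the class name and TTL value are illustrative.

```python
import time

class TTLCache:
    """Minimal time-to-live cache: entries expire ttl_seconds after being set."""

    def __init__(self, ttl_seconds: float) -> None:
        self.ttl = ttl_seconds
        self._store: dict[str, tuple[float, object]] = {}

    def set(self, key: str, value: object) -> None:
        self._store[key] = (time.monotonic() + self.ttl, value)

    def get(self, key: str):
        entry = self._store.get(key)
        if entry is None:
            return None
        expires_at, value = entry
        if time.monotonic() > expires_at:
            del self._store[key]  # lazily evict stale entries on read
            return None
        return value

cache = TTLCache(ttl_seconds=300)
cache.set("claim:vitamin-c", {"confidence": 0.85})
cached = cache.get("claim:vitamin-c")
```

With Redis the same shape is achieved via `SET key value EX 300`; a cache hit skips the 20-30 second pipeline entirely for repeated claims.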
- Search Phase: ~2-3 seconds per claim
- Scraping: ~5-10 seconds (5-20 URLs)
- Extraction: ~8-12 seconds (3x LLM calls)
- Retrieval: ~1-2 seconds (VDB + KG queries)
- Ranking: ~0.5 seconds (hybrid scoring)
- Total (single round): ~20-30 seconds
- With Reinforcement: 20-90 seconds (up to 3 rounds)
- Unit Tests: 81+ passing
- Integration Tests: 12+ tests
- E2E Tests: 5+ real claim scenarios
- Code Quality: 100% (Black, isort, mypy, ruff, flake8, bandit)
- Create feature branch:
git checkout -b feature/your-feature
- Make changes following code style
- Run tests:
./run.sh all
- Commit:
git commit -m "feat: description"
- Push and create PR
See LICENSE file in repository.
- Issues: GitHub Issues
- Questions: GitHub Discussions
Version: 1.0.0
Last Updated: November 22, 2025
Status: Production Ready ✅