This guide covers development setup, architecture, and contribution guidelines for BenchHub Plus.
BenchHub Plus follows a microservices architecture with the following components:
```
┌─────────────────┐     ┌─────────────────┐     ┌─────────────────┐
│     Reflex      │     │     FastAPI     │     │     Celery      │
│    Frontend     │────▶│     Backend     │────▶│     Workers     │
└─────────────────┘     └─────────────────┘     └─────────────────┘
         │                       │                       │
         │                       ▼                       ▼
         │              ┌─────────────────┐     ┌─────────────────┐
         │              │   PostgreSQL    │     │      Redis      │
         │              │    Database     │     │      Cache      │
         └──────────────┤                 │     └─────────────────┘
                        └─────────────────┘
                                 │
                        ┌─────────────────┐
                        │  HRET Toolkit   │
                        │   Integration   │
                        └─────────────────┘
```
- Frontend (Reflex): User interface, form handling, reactive state management
- Backend (FastAPI): REST API, business logic, request orchestration
- Workers (Celery): Async task processing, HRET integration, model evaluation
- Database (PostgreSQL): Data persistence, caching, task tracking
- Cache (Redis): Session storage, task queue, temporary data
- Python 3.11+
- PostgreSQL 12+
- Redis 6+
- Git
- Docker (optional but recommended)
# Clone repository
git clone https://github.com/HAE-RAE/BenchhubPlus.git
cd BenchhubPlus
# Setup development environment
./scripts/setup.sh
# Configure environment
cp .env.example .env
# Edit .env with your settings
# Start development services
./scripts/deploy.sh development

Use Docker for infrastructure (PostgreSQL, Redis) and run Python services natively:
# 1. Start infrastructure with Docker
docker compose -f docker-compose.dev.yml up -d postgres redis
# 2. Create virtual environment
python3.11 -m venv venv
source venv/bin/activate
# 3. Install dependencies
pip install -e .
# 4. Clone and install HRET (required for evaluation tasks)
git clone https://github.com/HAE-RAE/haerae-evaluation-toolkit.git
pip install -e ./haerae-evaluation-toolkit
# 5. Configure environment
cp .env.example .env
# Edit .env: set OPENAI_API_KEY, DEV_AUTH_BYPASS=true
# Ensure DATABASE_URL uses localhost:5433, REDIS_URL uses localhost:6380
# 6. Start services individually
# Terminal 1 - FastAPI backend
PYTHONPATH="." python -m uvicorn apps.backend.main:app --host 0.0.0.0 --port 8000 --reload
# Terminal 2 - Celery worker
celery -A apps.worker.celery_app worker --loglevel=info
# Terminal 3 - Reflex frontend
cd apps/reflex_frontend
DEV_AUTH_BYPASS=true API_BASE_URL=http://localhost:8000 PUBLIC_API_BASE_URL=http://localhost:8000 \
reflex run --env dev --backend-port 8002 --frontend-port 3000

Seed Data: Place `seed_data.parquet` in the `data/` directory. The backend seeds the database automatically on startup if it is empty.
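To catch missing settings before starting services, a small checker (a hypothetical helper, not part of the repository) can parse `.env` and verify the keys mentioned above are present:

```python
# Hypothetical .env sanity checker; the key list mirrors the settings
# mentioned above and may need adjusting for your setup.
REQUIRED = ["OPENAI_API_KEY", "DATABASE_URL", "REDIS_URL"]

def load_env(path: str = ".env") -> dict:
    """Parse simple KEY=VALUE lines, skipping blanks and comments."""
    env = {}
    with open(path) as fh:
        for raw in fh:
            line = raw.strip()
            if not line or line.startswith("#") or "=" not in line:
                continue
            key, _, value = line.partition("=")
            env[key.strip()] = value.strip()
    return env

def missing_keys(env: dict) -> list:
    """Return required keys that are absent or empty."""
    return [k for k in REQUIRED if not env.get(k)]

if __name__ == "__main__":
    missing = missing_keys(load_env())
    if missing:
        raise SystemExit(f"Missing settings in .env: {', '.join(missing)}")
    print("All required settings present.")
```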
```
BenchhubPlus/
├── apps/                          # Application modules
│   ├── backend/                   # FastAPI backend
│   │   ├── main.py                # FastAPI app entry point
│   │   ├── routers/               # API route handlers
│   │   ├── services/              # Business logic layer
│   │   └── repositories/          # Data access layer
│   ├── reflex_frontend/           # Reflex frontend
│   │   ├── reflex_frontend/
│   │   │   ├── reflex_frontend.py # App entry point and router
│   │   │   ├── state.py           # Centralized AppState
│   │   │   ├── components/        # Reusable UI (header, nav, footer)
│   │   │   └── pages/             # Page components (evaluation, status, leaderboard, manager)
│   │   ├── assets/                # Static assets
│   │   └── rxconfig.py            # Reflex configuration
│   ├── worker/                    # Celery workers
│   │   ├── celery_app.py          # Celery configuration
│   │   ├── tasks/                 # Async task definitions
│   │   └── hret_runner.py         # HRET integration
│   ├── core/                      # Shared core modules
│   │   ├── config.py              # Configuration management
│   │   ├── db.py                  # Database setup
│   │   ├── models.py              # SQLAlchemy models
│   │   ├── schemas.py             # Pydantic schemas
│   │   └── security.py            # Security utilities
│   └── evaluation/                # Evaluation engine
├── docs/                          # Documentation
├── scripts/                       # Deployment scripts
├── tests/                         # Test suites
├── logs/                          # Application logs
├── docker-compose.yml             # Production deployment
├── docker-compose.dev.yml         # Development deployment
└── pyproject.toml                 # Python dependencies
```
# Create feature branch
git checkout -b feature/your-feature-name
# Make changes
# ... develop your feature ...
# Run tests
./scripts/test.sh
# Commit changes
git add .
git commit -m "feat: add your feature description"
# Push and create PR
git push origin feature/your-feature-name

We use the following tools for code quality:
# Install development tools
pip install black isort flake8 mypy pytest
# Format code
black apps/
isort apps/
# Lint code
flake8 apps/
mypy apps/
# Run tests
pytest tests/

# Install pre-commit
pip install pre-commit
# Setup hooks
pre-commit install
# Run manually
pre-commit run --all-files

```
tests/
├── unit/                      # Unit tests
│   ├── test_models.py         # Database model tests
│   ├── test_services.py       # Service layer tests
│   └── test_utils.py          # Utility function tests
├── integration/               # Integration tests
│   ├── test_api.py            # API endpoint tests
│   ├── test_worker.py         # Worker task tests
│   └── test_database.py       # Database integration tests
├── e2e/                       # End-to-end tests
│   ├── test_evaluation.py     # Full evaluation flow
│   └── test_frontend.py       # Frontend interaction tests
└── fixtures/                  # Test data and fixtures
    ├── sample_data.json
    └── mock_responses.py
```
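One common way to consume the `fixtures/` directory is a shared pytest fixture in a `conftest.py`. A sketch (hypothetical helper; the real test suite may organize this differently):

```python
import json
from pathlib import Path

import pytest

def load_fixture(fixtures_dir: Path, name: str):
    """Read a JSON file from the test fixtures directory."""
    with open(fixtures_dir / name) as fh:
        return json.load(fh)

@pytest.fixture
def sample_data(request):
    # Resolve tests/fixtures/ relative to the running test module.
    return load_fixture(Path(str(request.fspath)).parent / "fixtures",
                        "sample_data.json")
```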
# All tests
pytest
# Specific test categories
pytest tests/unit/
pytest tests/integration/
pytest tests/e2e/
# With coverage
pytest --cov=apps tests/
# Specific test file
pytest tests/unit/test_models.py
# Specific test function
pytest tests/unit/test_models.py::test_leaderboard_cache_creation

# Unit test example
import pytest
from apps.core.models import LeaderboardCache
def test_leaderboard_cache_creation():
cache = LeaderboardCache(
model_name="test-model",
score=0.85,
language="English",
subject_type="Math",
task_type="QA"
)
assert cache.model_name == "test-model"
assert cache.score == 0.85
# Integration test example
import pytest
from fastapi.testclient import TestClient
from apps.backend.main import app
client = TestClient(app)
def test_health_endpoint():
response = client.get("/api/v1/health")
assert response.status_code == 200
    assert response.json()["status"] == "healthy"

# apps/backend/routers/new_feature.py
from fastapi import APIRouter, Depends
from apps.core.schemas import NewFeatureRequest, NewFeatureResponse
from apps.backend.services.new_feature_service import NewFeatureService
router = APIRouter(prefix="/new-feature", tags=["new-feature"])
@router.post("/", response_model=NewFeatureResponse)
async def create_new_feature(
request: NewFeatureRequest,
service: NewFeatureService = Depends()
):
    return await service.create_feature(request)

# apps/core/models.py
from datetime import datetime

from sqlalchemy import Column, String, DateTime, Float
from apps.core.db import Base
class NewFeatureModel(Base):
__tablename__ = "new_features"
id = Column(String, primary_key=True)
name = Column(String, nullable=False)
value = Column(Float, nullable=False)
    created_at = Column(DateTime, default=datetime.utcnow)

# apps/core/schemas.py
from pydantic import BaseModel
from datetime import datetime
class NewFeatureRequest(BaseModel):
name: str
value: float
class NewFeatureResponse(BaseModel):
id: str
name: str
value: float
created_at: datetime
class Config:
        from_attributes = True

# apps/backend/services/new_feature_service.py
from apps.core.schemas import NewFeatureRequest, NewFeatureResponse
from apps.backend.repositories.new_feature_repository import NewFeatureRepository
class NewFeatureService:
def __init__(self, repository: NewFeatureRepository):
self.repository = repository
async def create_feature(self, request: NewFeatureRequest) -> NewFeatureResponse:
feature = await self.repository.create(request)
        return NewFeatureResponse.from_orm(feature)

# apps/backend/repositories/new_feature_repository.py
import uuid

from sqlalchemy.orm import Session
from apps.core.models import NewFeatureModel
from apps.core.schemas import NewFeatureRequest
class NewFeatureRepository:
def __init__(self, db: Session):
self.db = db
async def create(self, request: NewFeatureRequest) -> NewFeatureModel:
feature = NewFeatureModel(
id=str(uuid.uuid4()),
name=request.name,
value=request.value
)
self.db.add(feature)
self.db.commit()
self.db.refresh(feature)
        return feature

# apps/reflex_frontend/reflex_frontend/new_feature.py
import httpx
import reflex as rx
API_BASE = "http://localhost:8000"
class NewFeatureState(rx.State):
name: str = ""
value: str = "0"
async def create_feature(self):
try:
payload = {"name": self.name, "value": float(self.value)}
except ValueError:
return rx.toast.error("Enter a valid number")
async with httpx.AsyncClient() as client:
response = await client.post(
f"{API_BASE}/api/v1/new-feature/",
json=payload,
)
if response.status_code == 200:
return rx.toast.success("Feature created successfully!")
return rx.toast.error("Failed to create feature")
def render_new_feature_form():
return rx.vstack(
rx.heading("New Feature"),
rx.input(
placeholder="Feature Name",
on_change=NewFeatureState.set_name,
),
rx.input(
placeholder="Feature Value",
type_="number",
on_change=NewFeatureState.set_value,
),
rx.button(
"Create Feature",
on_click=NewFeatureState.create_feature,
),
spacing="3",
    )

# apps/worker/tasks/new_task.py
from apps.worker.celery_app import celery_app
from celery import Task
@celery_app.task(bind=True)
def new_evaluation_task(self: Task, task_data: dict):
"""New evaluation task implementation."""
try:
# Update task progress
self.update_state(
state='PROGRESS',
meta={'current': 0, 'total': 100}
)
# Perform task logic
result = perform_evaluation(task_data)
# Update progress
self.update_state(
state='PROGRESS',
meta={'current': 100, 'total': 100}
)
return result
except Exception as exc:
self.update_state(
state='FAILURE',
meta={'error': str(exc)}
)
        raise

# Check task status
from apps.worker.celery_app import celery_app
result = celery_app.AsyncResult(task_id)
print(f"Status: {result.status}")
print(f"Result: {result.result}")

# Add to your code for debugging
import logging
logger = logging.getLogger(__name__)
logger.debug("Debug message")
logger.info("Info message")
logger.error("Error message")
# Use debugger
import pdb; pdb.set_trace()

# Reflex debugging - add print/log statements inside rx.State methods
# They appear in the Reflex backend console (terminal running reflex run)
import logging

import reflex as rx

logger = logging.getLogger(__name__)
class AppState(rx.State):
async def some_handler(self):
logger.debug(f"Current state: {self.language_filter}")
# Use rx.toast for quick UI feedback
        return rx.toast.info("Debug: handler triggered")

Run with verbose logging:

reflex run --env dev --loglevel debug

# Run worker with debug logging
celery -A apps.worker.celery_app worker --loglevel=debug
# Monitor tasks
celery -A apps.worker.celery_app events
# Inspect workers
celery -A apps.worker.celery_app inspect active

# Use database indexes
from sqlalchemy import Index

class LeaderboardCache(Base):
    __tablename__ = "leaderboard_cache"
# Add indexes for frequently queried fields
__table_args__ = (
Index('idx_model_score', 'model_name', 'score'),
Index('idx_language_subject', 'language', 'subject_type'),
)
# Use query optimization
def get_leaderboard_optimized(db: Session, filters: dict):
query = db.query(LeaderboardCache)
# Add filters efficiently
if filters.get('language'):
query = query.filter(LeaderboardCache.language == filters['language'])
# Use pagination
    return query.offset(filters.get('offset', 0)).limit(filters.get('limit', 100))

# Redis caching
import json
from functools import wraps

import redis

redis_client = redis.Redis(host='localhost', port=6379, db=0)
def cache_result(expiration=3600):
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
cache_key = f"{func.__name__}:{hash(str(args) + str(kwargs))}"
# Try to get from cache
cached = redis_client.get(cache_key)
if cached:
return json.loads(cached)
# Compute and cache result
result = func(*args, **kwargs)
redis_client.setex(cache_key, expiration, json.dumps(result))
return result
return wrapper
return decorator
@cache_result(expiration=1800)
def expensive_computation(data):
# Expensive operation
    return result

# Input validation
from typing import List

from pydantic import BaseModel, validator
class EvaluationRequest(BaseModel):
query: str
models: List[ModelConfig]
@validator('query')
def validate_query(cls, v):
if len(v) > 1000:
raise ValueError('Query too long')
return v.strip()
# Rate limiting
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.util import get_remote_address
limiter = Limiter(key_func=get_remote_address)
@app.post("/api/v1/leaderboard/generate")
@limiter.limit("10/minute")
async def generate_leaderboard(request: Request, ...):
# Endpoint implementation
    pass

# Encrypt sensitive data
from cryptography.fernet import Fernet
def encrypt_api_key(api_key: str, key: bytes) -> str:
f = Fernet(key)
return f.encrypt(api_key.encode()).decode()
def decrypt_api_key(encrypted_key: str, key: bytes) -> str:
f = Fernet(key)
    return f.decrypt(encrypted_key.encode()).decode()

def evaluate_model(model_config: ModelConfig, samples: List[Sample]) -> EvaluationResult:
"""
Evaluate a model on given samples.
Args:
model_config: Configuration for the model to evaluate
samples: List of evaluation samples
Returns:
EvaluationResult containing scores and metrics
Raises:
ModelAPIError: If model API call fails
ValidationError: If samples are invalid
Example:
>>> config = ModelConfig(name="gpt-4", api_key="sk-...")
>>> samples = [Sample(question="What is 2+2?", answer="4")]
>>> result = evaluate_model(config, samples)
>>> print(result.average_score)
0.95
"""
# Implementation
    pass

# FastAPI automatic documentation
@app.post(
"/api/v1/leaderboard/generate",
response_model=TaskResponse,
summary="Generate Leaderboard",
description="Create a new evaluation task based on natural language query",
responses={
200: {"description": "Task created successfully"},
400: {"description": "Invalid request data"},
422: {"description": "Validation error"}
}
)
async def generate_leaderboard(request: EvaluationRequest):
"""
Generate a new leaderboard evaluation.
This endpoint accepts a natural language query and model configurations,
then creates an asynchronous evaluation task.
"""
    pass

# Production environment variables
export DATABASE_URL="postgresql://user:pass@host:5432/db"
export CELERY_BROKER_URL="redis://host:6379/0"
export OPENAI_API_KEY="sk-..."
export DEBUG="false"
export LOG_LEVEL="info"

# Build images
docker build -f Dockerfile.backend -t benchhub-backend .
docker build -f Dockerfile.worker -t benchhub-worker .
docker build -f Dockerfile.reflex -t benchhub-frontend .
# Run with docker-compose
docker-compose up -d

# Health check endpoint
@app.get("/api/v1/health")
async def health_check():
"""System health check."""
# Check database
try:
db.execute("SELECT 1")
db_status = "connected"
except Exception:
db_status = "disconnected"
# Check Redis
try:
redis_client.ping()
redis_status = "connected"
except Exception:
redis_status = "disconnected"
# Overall status
status = "healthy" if all([
db_status == "connected",
redis_status == "connected"
]) else "unhealthy"
return {
"status": status,
"database_status": db_status,
"redis_status": redis_status,
"timestamp": datetime.utcnow()
    }

- Fork the repository
- Create a feature branch
- Make your changes
- Add tests for new functionality
- Ensure all tests pass
- Update documentation
- Submit pull request
- Code follows style guidelines
- Tests are included and passing
- Documentation is updated
- No security vulnerabilities
- Performance impact considered
- Backward compatibility maintained
Use the provided issue templates for:
- Bug reports
- Feature requests
- Documentation improvements
- Performance issues
For questions about development or to discuss architectural decisions, please open a discussion in our GitHub repository.