
Workflows

Python-based ETL pipeline using Render Workflows SDK for distributed task execution.

Overview

The workflow orchestrates parallel data collection from GitHub's API, processes repositories through a 3-layer data pipeline, and stores results in PostgreSQL for analytics.

Architecture

main_analysis_task (orchestrator)
├── fetch_language_repos (Python)
│   └── analyze_repo_batch
├── fetch_language_repos (TypeScript)
│   └── analyze_repo_batch
├── fetch_language_repos (Go)
│   └── analyze_repo_batch
├── fetch_render_repos
│   └── analyze_repo_batch
└── aggregate_results (ETL: Staging → Analytics)
    └── cleanup_old_data

Task breakdown

main_analysis_task

Orchestrator - Spawns 4 parallel tasks (3 languages + Render), then aggregates results.

Execution modes:

  • Production: Full pipeline (all languages + Render)
  • Dev mode: Python only (DEV_MODE=true + DEV_REPO_LIMIT=5)

Returns:

{
    'repos_processed': int,
    'execution_time': float,
    'cleanup_stats': dict,
    'trace_id': str,
    'success': bool
}
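
A hedged sketch of this fan-out/fan-in shape, assuming the render-sdk @app.task decorator from requirements.txt and that decorated tasks can be awaited like coroutines; names and signatures here are illustrative, not the actual tasks/main_task.py:

import asyncio
import os
import time

from app import app  # shared Workflows app instance
from tasks.language_tasks import fetch_language_repos, fetch_render_repos

@app.task
async def main_analysis_task() -> dict:
    started = time.monotonic()

    if os.getenv('DEV_MODE') == 'true':
        # Dev mode: Python only, honoring DEV_REPO_LIMIT
        results = [await fetch_language_repos('Python')]
    else:
        # Fan out: three language fetches plus Render discovery, run concurrently
        results = await asyncio.gather(
            fetch_language_repos('Python'),
            fetch_language_repos('TypeScript'),
            fetch_language_repos('Go'),
            fetch_render_repos(),
        )

    # Fan in: aggregate_results would run the staging → analytics ETL here
    return {
        'repos_processed': sum(len(r['repos']) for r in results),
        'execution_time': time.monotonic() - started,
        'success': True,
    }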

fetch_language_repos(language: str)

Parallel task - Fetches trending repos for Python, TypeScript, or Go.

Process:

  1. Search GitHub API (repos updated in last 30 days, created in last 180 days)
  2. Take top 25 repos (sorted by stars)
  3. Fetch READMEs in parallel
  4. Store in raw layer
  5. Spawn analyze_repo_batch subtask
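
Step 1 maps onto GitHub's repository search endpoint; a minimal sketch of the query construction with aiohttp (date windows come from the list above; auth headers and retries are omitted):

from datetime import datetime, timedelta, timezone

import aiohttp

async def search_trending(session: aiohttp.ClientSession, language: str) -> list:
    now = datetime.now(timezone.utc)
    created_after = (now - timedelta(days=180)).date().isoformat()
    pushed_after = (now - timedelta(days=30)).date().isoformat()

    # Recently created, recently updated, top 25 by stars
    query = f'language:{language} created:>{created_after} pushed:>{pushed_after}'
    params = {'q': query, 'sort': 'stars', 'order': 'desc', 'per_page': '25'}
    async with session.get('https://api.github.com/search/repositories', params=params) as resp:
        resp.raise_for_status()
        return (await resp.json())['items']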

Returns:

{
    'repos': List[Dict],  # Enriched repo data
    'subtasks': List[Dict],  # Timing metadata
    'started_at': str,  # ISO timestamp
    'completed_at': str
}

fetch_render_repos()

Parallel task - Discovers Render projects via code search.

Detection strategy:

  • Code search for render.yaml in repository root only
  • Repos created within last 18 months
  • Target: 25 projects

Process:

  1. Search GitHub for filename:render.yaml path:/ language:yaml
  2. Fetch READMEs in parallel
  3. Store in raw layer (assigned language='render')
  4. Spawn analyze_repo_batch subtask
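
The query in step 1 corresponds to GitHub's code search endpoint; a sketch (code search requires an authenticated session, and the 18-month creation window is presumably applied as a post-filter, since code search cannot filter on repository age):

import aiohttp

async def search_render_repos(session: aiohttp.ClientSession) -> list:
    # Find render.yaml blueprints in the repository root
    params = {'q': 'filename:render.yaml path:/ language:yaml', 'per_page': '25'}
    async with session.get('https://api.github.com/search/code', params=params) as resp:
        resp.raise_for_status()
        hits = (await resp.json())['items']
    # Each hit carries its owning repository; dedupe in case a repo matches twice
    by_name = {hit['repository']['full_name']: hit['repository'] for hit in hits}
    return list(by_name.values())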

analyze_repo_batch(repos: List[Dict], readme_contents: Dict)

Subtask - Analyzes repos in batches of 10.

Per repository:

  1. Validate required fields (full_name, language, created_at, updated_at)
  2. Build enriched data structure
  3. Parse ISO datetime strings to timezone-aware objects
  4. Store in staging layer (stg_repos_validated)

Error handling: Continues on individual repo failures (logged but not fatal)
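
A sketch of steps 1 and 3, using the required-fields list above (enrichment and the staging write are elided):

from datetime import datetime
from typing import Optional

REQUIRED_FIELDS = ('full_name', 'language', 'created_at', 'updated_at')

def validate_repo(repo: dict) -> Optional[dict]:
    # Step 1: skip repos missing any required field (logged, not fatal)
    if any(not repo.get(field) for field in REQUIRED_FIELDS):
        return None
    # Step 3: GitHub returns ISO-8601 strings like '2026-02-02T12:00:00Z';
    # normalizing the trailing 'Z' makes fromisoformat yield timezone-aware datetimes
    return {
        **repo,
        'created_at': datetime.fromisoformat(repo['created_at'].replace('Z', '+00:00')),
        'updated_at': datetime.fromisoformat(repo['updated_at'].replace('Z', '+00:00')),
    }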

aggregate_results(all_results, db_pool, execution_start, trace)

ETL pipeline - Extracts from staging, calculates scores, loads to analytics.

Process:

  1. Extract top 50 repos per language (balanced)
  2. Extract ALL Render repos (language='render')
  3. Calculate momentum scores (70% recency + 30% normalized stars)
  4. Load to analytics layer:
    • dim_repositories (upsert with SCD Type 2)
    • fact_repo_snapshots (daily snapshot with momentum score)
  5. Run data retention cleanup

Scoring formula:

recency_score = exponential_decay(repo_age_days)  # 1.0 for ≤14 days, decay to 0.01
normalized_stars = stars / max_stars_in_category
momentum_score = (recency_score * 0.7) + (normalized_stars * 0.3)
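
One concrete reading of this formula as Python; the 14-day grace period and 0.01 floor come from the comment above, while the decay rate is an assumption for illustration:

import math

def momentum_score(repo_age_days: float, stars: int, max_stars: int) -> float:
    # Full recency credit for the first 14 days, then exponential decay to a 0.01 floor
    if repo_age_days <= 14:
        recency = 1.0
    else:
        recency = max(0.01, math.exp(-0.05 * (repo_age_days - 14)))  # decay rate assumed
    normalized_stars = stars / max_stars if max_stars else 0.0
    return recency * 0.7 + normalized_stars * 0.3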

cleanup_old_data(db_pool)

Maintenance task - Applies tiered data retention policy.

Retention windows:

  • Raw layer: 7 days
  • Staging layer: 7 days
  • Analytics layer: 30 days

Error handling: Failures are logged but don't break the workflow
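
A sketch of how the tiered deletes could look with asyncpg; the table and timestamp column names here are assumptions based on the layer names, not the actual schema:

import asyncpg

# (table, timestamp column, retention days); column names assumed for illustration
RETENTION = [
    ('raw_github_repos', 'fetched_at', 7),
    ('stg_repos_validated', 'validated_at', 7),
    ('fact_repo_snapshots', 'snapshot_date', 30),
]

async def cleanup_old_data(db_pool: asyncpg.Pool) -> dict:
    stats = {}
    async with db_pool.acquire() as conn:
        for table, column, days in RETENTION:
            try:
                # asyncpg returns a status string such as 'DELETE 42'
                stats[table] = await conn.execute(
                    f"DELETE FROM {table} WHERE {column} < NOW() - INTERVAL '{days} days'"
                )
            except Exception as exc:
                stats[table] = f'error: {exc}'  # logged, never fatal to the workflow
    return stats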

Files

  • workflow.py - Main workflow entry point; holds the app instance and calls app.start()
  • app.py - Shared Workflows app instance (imported by all task modules)
  • github_api.py - Async GitHub API client (search, fetch)
  • connections.py - Shared resource management (DB pool, HTTP session)
  • auth_setup.py - Interactive GitHub auth token generator
  • tasks/main_task.py - Main orchestrator task (spawns parallel subtasks, aggregates)
  • tasks/language_tasks.py - fetch_language_repos and fetch_render_repos tasks
  • tasks/batch_analysis.py - analyze_repo_batch subtask (validates, enriches, stores to staging)
  • etl/extract.py - Raw layer ingestion (store_raw_repos) and staging extraction
  • etl/cleanup.py - Staging layer storage (store_in_staging) and data retention cleanup
  • etl/load.py - Analytics layer loading (fact/dim inserts, momentum scoring)
  • etl/aggregate.py - ETL orchestration (staging → analytics pipeline)
  • lib/encryption.py - Fernet encryption/decryption for GitHub tokens
  • lib/oauth_manager.py - GitHub OAuth token lifecycle management
  • utils/ - Shared helpers (chunking, tracing, connection init)
  • requirements.txt - Python dependencies

Environment variables

# Required
DATABASE_URL=postgresql://...

# GitHub Authentication (see Authentication Setup below)
GITHUB_TOKEN_ENCRYPTION_KEY=<generated-key>  # Auto-generated by auth_setup.py

# GitHub OAuth (managed automatically, optional for initial seed)
GITHUB_CLIENT_ID=<your-oauth-app-client-id>
GITHUB_CLIENT_SECRET=<your-oauth-app-client-secret>
GITHUB_ACCESS_TOKEN=ghu_...     # Optional: Only for initial DB seed
GITHUB_REFRESH_TOKEN=ghr_...    # Optional: Only for initial DB seed

# OR: Personal Access Token (PAT) - simpler alternative
GITHUB_ACCESS_TOKEN=ghp_...     # No expiration, no refresh needed

# Optional (development)
DEV_MODE=true           # Runs Python task only
DEV_REPO_LIMIT=5        # Processes 5 repos instead of 25

# Optional (local dev)
RENDER_USE_LOCAL_DEV=true
RENDER_LOCAL_DEV_URL=http://localhost:8120

Running locally

Option 1: Quick start (recommended)

python ../bin/local_dev.py

Option 2: Manual

# Terminal 1: Start task server
cd workflows
pip install -r requirements.txt
python workflow.py  # Listens on port 8120

# Terminal 2: Trigger workflow
cd trigger
python trigger.py

Running on Render

Workflows are triggered via:

  1. Cron job: Daily at 14:00 UTC (6 AM PST)
  2. Manual: Dashboard → Workflows → trender-wf → Run Task
  3. API: python trigger/trigger.py

Debugging

Check workflow status:

# Via logs
render logs --service trender-wf

# Via database
psql $DATABASE_URL -c "SELECT * FROM fact_workflow_runs ORDER BY started_at DESC LIMIT 1;"

Inspect trace tree:

# Query the task_tree JSONB column
SELECT run_id, status, task_tree FROM fact_workflow_runs
WHERE run_id = 'wfr_...';

Common issues:

Issue Solution
Auth errors / 401 Run python auth_setup.py to initialize credentials in the database
Connection refused Check DATABASE_URL is correct
Rate limit errors Verify OAuth token is valid; token auto-refreshes daily via refresh_auth.py
No repos returned GitHub API may be rate limited or down

Data flow

GitHub API
    ↓
raw_github_repos (JSONB)
    ↓
stg_repos_validated (cleaned)
    ↓
dim_repositories + fact_repo_snapshots (dimensional model)
    ↓
Dashboard views

Performance

  • Execution time: ~90 seconds for ~100 repos
  • Parallelism: 4 concurrent tasks (25 repos each)
  • Batching: Repos analyzed in batches of 10 (see the helper sketch below)
  • README fetching: Parallel requests to minimize API latency
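
The batching above is the kind of helper utils/ provides; a plausible shape, not the actual utility:

from typing import Iterator, List

def chunked(items: List[dict], size: int = 10) -> Iterator[List[dict]]:
    """Yield consecutive batches of at most `size` items."""
    for i in range(0, len(items), size):
        yield items[i:i + size]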

Testing

# Fast iteration: Dev mode (Python only, 5 repos)
export DEV_MODE=true
export DEV_REPO_LIMIT=5
python workflow.py

# Full pipeline locally
unset DEV_MODE
python ../bin/local_dev.py

Workflow tracing

The WorkflowTrace class tracks execution with hierarchical task timing:

{
    'name': 'main_analysis_task',
    'started_at': '2026-02-02T12:00:00Z',
    'completed_at': '2026-02-02T12:00:15Z',
    'status': 'completed',
    'children': [
        {'name': 'fetch_language_repos', 'language': 'Python', ...},
        {'name': 'fetch_language_repos', 'language': 'TypeScript', ...},
        ...
    ]
}

Stored in fact_workflow_runs for observability.
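
A minimal sketch of the shape such a class might take; the real WorkflowTrace lives under utils/, and the method names here are assumptions:

from datetime import datetime, timezone

class WorkflowTrace:
    def __init__(self, name: str, **meta):
        self.node = {
            'name': name,
            'started_at': datetime.now(timezone.utc).isoformat(),
            'status': 'running',
            'children': [],
            **meta,
        }

    def child(self, name: str, **meta) -> 'WorkflowTrace':
        # Nest a subtask trace under this node (e.g. one per fetch_language_repos call)
        sub = WorkflowTrace(name, **meta)
        self.node['children'].append(sub.node)
        return sub

    def complete(self, status: str = 'completed') -> None:
        self.node['completed_at'] = datetime.now(timezone.utc).isoformat()
        self.node['status'] = status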

Authentication Setup

The workflow requires GitHub API authentication. Run the setup script:

cd workflows
python auth_setup.py

The script will:

  1. Auto-generate an encryption key (GITHUB_TOKEN_ENCRYPTION_KEY) for secure token storage
  2. Walk you through OAuth or PAT setup
  3. Save credentials to database (OAuth only) for automatic refresh

Option A: Personal Access Token (PAT) - Recommended

  • ✅ Simple setup (one token)
  • ✅ No expiration (or you control it)
  • ✅ Same API rate limits (5,000 requests/hour)
  • Generate at: https://github.com/settings/tokens/new
  • Required scopes: repo, read:org

Option B: OAuth App - Automatic Token Refresh

  • Tokens auto-refresh every 8 hours (zero manual intervention)
  • Encrypted storage in PostgreSQL (Fernet encryption; see the sketch after this list)
  • Self-sustaining chain - refresh tokens renew every 6 months
  • 🔒 Requires one-time setup: OAuth app + encryption key
  • Use case: Production deployments, long-running workflows
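
The encryption in lib/encryption.py uses the cryptography package's Fernet recipe; the essentials look like this (the token value is illustrative):

from cryptography.fernet import Fernet

# One-time: auth_setup.py generates this key and saves it as GITHUB_TOKEN_ENCRYPTION_KEY
key = Fernet.generate_key()

f = Fernet(key)
ciphertext = f.encrypt(b'ghu_example_token')  # bytes, safe to store in PostgreSQL
plaintext = f.decrypt(ciphertext).decode()    # recovered at run time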

How OAuth auto-refresh works:

  1. First run: Loads tokens from DB (or seeds from env vars if DB empty)
  2. Every run: Checks expiry → refreshes if needed → persists to DB
  3. Mid-run 401 errors: Automatically refreshes and retries
  4. You never touch tokens again, as long as the workflow runs at least once every 6 months
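
The refresh in step 2 goes through GitHub's standard OAuth token endpoint; a hedged sketch (encryption and DB persistence omitted):

import aiohttp

async def refresh_github_token(session: aiohttp.ClientSession, client_id: str,
                               client_secret: str, refresh_token: str) -> dict:
    # GitHub rotates both tokens on refresh, so the new pair must be persisted
    async with session.post(
        'https://github.com/login/oauth/access_token',
        headers={'Accept': 'application/json'},
        data={
            'grant_type': 'refresh_token',
            'refresh_token': refresh_token,
            'client_id': client_id,
            'client_secret': client_secret,
        },
    ) as resp:
        resp.raise_for_status()
        return await resp.json()  # access_token, refresh_token, expires_in, ...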

Required for Render (one-time):

  • GITHUB_TOKEN_ENCRYPTION_KEY (auto-generated by auth_setup.py)
  • GITHUB_CLIENT_ID and GITHUB_CLIENT_SECRET (from OAuth app)
  • DATABASE_URL (already configured)

Optional for first run only:

  • GITHUB_ACCESS_TOKEN and GITHUB_REFRESH_TOKEN (seeds DB, then auto-managed)

Dependencies

See requirements.txt:

  • render-sdk: Workflows SDK with @app.task decorators
  • asyncpg: Async PostgreSQL driver
  • aiohttp: Async HTTP client
  • python-dotenv: Environment variable management

Contributing

  1. Make changes to workflow logic
  2. Test locally: python ../bin/local_dev.py
  3. Deploy: Push to GitHub (auto-deploy enabled)
  4. Monitor: Dashboard → Workflows → Logs