138 changes: 138 additions & 0 deletions docs/background-jobs.md
@@ -0,0 +1,138 @@
# Background Job Manager

FinMind includes a resilient background job manager that automatically processes
scheduled tasks (like reminders) with built-in retry logic, monitoring, and
crash recovery.

## Architecture

The job manager wraps APScheduler with:

- **Automatic retry** with configurable exponential backoff
- **Dead-letter tracking** for jobs that exhaust all retries
- **Per-job execution history** with timing and error details
- **Prometheus metrics** for observability
- **Redis-backed persistence** for crash recovery
- **Admin API endpoints** for monitoring and management

## How It Works

### Job Lifecycle

```
PENDING → RUNNING → SUCCESS
        ↘ RETRYING → RUNNING → SUCCESS
                   ↘ RETRYING → ... → FAILED (dead-lettered)
```

When a job fails:
1. The error is logged with full traceback
2. Retry delay is calculated using exponential backoff
3. A one-shot retry is scheduled after the delay
4. If max retries are exhausted, the job is **dead-lettered** (skipped on future triggers)
5. An admin can reset dead-lettered jobs via the API
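The delay in step 2 can be sketched as a small function (an illustration, not the library's code; parameter defaults mirror the `RetryPolicy` values used in the Configuration example below):

```python
def backoff_delay(attempt: int, base: float = 10.0,
                  factor: float = 2.0, cap: float = 600.0) -> float:
    """Delay before retry number `attempt` (1-based): base * factor**(attempt - 1), capped."""
    return min(base * factor ** (attempt - 1), cap)

# With these values: retry 1 waits 10s, retry 2 waits 20s, retry 3 waits 40s;
# from retry 7 onward the 600s cap applies (10 * 2**6 = 640 > 600).
delays = [backoff_delay(n) for n in (1, 2, 3, 7)]
```

The cap keeps a long outage from pushing retries arbitrarily far into the future.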

### Crash Recovery

Job state is persisted to Redis after every execution. On restart:
- Previous state is restored from Redis
- Jobs that were `RUNNING` when the process crashed are marked as `RETRYING`
- The scheduler resumes with correct attempt counts
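The recovery step can be sketched as follows. The Redis key layout (`jobs:state:<job_id>` holding a JSON blob) is our guess for illustration; the real manager may store state differently:

```python
import json


def mark_interrupted_jobs(redis_client, job_ids):
    """On startup, flip jobs that died mid-run from RUNNING to RETRYING.

    Hypothetical key layout: ``jobs:state:<job_id>`` holds a JSON state blob.
    """
    for job_id in job_ids:
        raw = redis_client.get(f"jobs:state:{job_id}")
        if raw is None:
            continue  # job never ran before the crash; nothing to restore
        state = json.loads(raw)
        if state.get("status") == "running":
            # The process died mid-execution; schedule a retry instead of
            # leaving the job stuck in a phantom RUNNING state.
            state["status"] = "retrying"
            redis_client.set(f"jobs:state:{job_id}", json.dumps(state))
```

Because state is written after every execution, attempt counts survive the restart along with the status.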

## Configuration

### Retry Policy

Each job can have its own retry policy:

```python
from app.services.job_manager import job_manager, RetryPolicy

job_manager.add_job(
    my_function,
    job_id="my_job",
    trigger="interval",
    retry_policy=RetryPolicy(
        max_retries=5,            # attempts before dead-letter
        base_delay_seconds=10.0,  # initial retry delay
        max_delay_seconds=600.0,  # cap on retry delay
        backoff_factor=2.0,       # exponential multiplier
    ),
    minutes=5,  # run every 5 minutes
)
```

### Default Retry Policy

If no policy is specified, jobs use:
- 3 retries
- 5s initial delay, 2x backoff, 300s max
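Under those defaults, the delays work out as below (a quick check, not library code):

```python
# Default policy: 5s base delay, 2x backoff, 300s cap, 3 retries.
base, factor, cap, max_retries = 5.0, 2.0, 300.0, 3

# Delay before retry n+1 is base * factor**n, capped at `cap`.
delays = [min(base * factor ** n, cap) for n in range(max_retries)]
print(delays)  # [5.0, 10.0, 20.0]
```

With only three retries the 300s cap is never reached; it matters for jobs configured with more attempts.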

## API Endpoints

### `GET /jobs/health` (unauthenticated)

Health check for monitoring systems. Returns 200 if all jobs are healthy,
or 503 if any job has failed or been missed.

```json
{
  "healthy": true,
  "jobs": {
    "process_due_reminders": {
      "healthy": true,
      "status": "success",
      "total_runs": 42,
      "total_failures": 0
    }
  }
}
```
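A monitoring probe can interpret this payload directly; a minimal sketch (the helper name is ours, not part of the API):

```python
def is_healthy(payload: dict) -> bool:
    """True only when the top-level flag and every per-job flag agree."""
    jobs = payload.get("jobs", {})
    return bool(payload.get("healthy")) and all(
        info.get("healthy") for info in jobs.values()
    )
```

In practice the HTTP status code alone (200 vs 503) is enough for most probes; parsing the body is only needed when you want per-job detail.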

### `GET /jobs/status` (admin only)

Detailed status including execution history.

### `GET /jobs/dead-letters` (admin only)

List jobs that exhausted all retries.

### `POST /jobs/<job_id>/reset` (admin only)

Reset a dead-lettered job so it retries on the next trigger.
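From a script, the reset call can be built with the standard library; a sketch assuming a local server and an admin JWT (the helper name is ours):

```python
from urllib.request import Request


def build_reset_request(base_url: str, job_id: str, token: str) -> Request:
    """Build the admin POST for /jobs/<job_id>/reset (token is an admin JWT)."""
    return Request(
        f"{base_url}/jobs/{job_id}/reset",
        method="POST",
        headers={"Authorization": f"Bearer {token}"},
    )

# urllib.request.urlopen(build_reset_request(...)) then answers 200 on
# success or 404 for an unknown job_id, per the endpoint contract above.
```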

## Prometheus Metrics

| Metric | Type | Labels | Description |
|--------|------|--------|-------------|
| `finmind_job_executions_total` | Counter | `job_id`, `status` | Total executions |
| `finmind_job_retries_total` | Counter | `job_id` | Retry attempts |
| `finmind_job_dead_letters_total` | Counter | `job_id` | Dead-lettered jobs |
| `finmind_job_duration_seconds` | Histogram | `job_id` | Execution duration |
| `finmind_jobs_active` | Gauge | — | Currently running jobs |

## Adding New Jobs

```python
# In app/__init__.py or a dedicated jobs module

def my_periodic_task():
    """Runs inside Flask app context automatically."""
    # Your business logic here
    pass

job_manager.add_job(
    my_periodic_task,
    job_id="my_periodic_task",
    trigger="cron",
    hour=9,
    minute=0,
)
```

The job manager handles:
- Running the function inside the Flask app context
- Catching and logging exceptions
- Retrying on failure
- Recording metrics and history
- Persisting state to Redis
53 changes: 51 additions & 2 deletions packages/backend/app/__init__.py
@@ -1,18 +1,20 @@
from flask import Flask, jsonify
from .config import Settings
from .extensions import db, jwt
from .extensions import db, jwt, redis_client
from .routes import register_routes
from .observability import (
    Observability,
    configure_logging,
    finalize_request,
    init_request_context,
)
from .services.job_manager import job_manager, RetryPolicy
from flask_cors import CORS
import atexit
import click
import os
import logging
from datetime import timedelta
from datetime import datetime, timedelta, timezone


def create_app(settings: Settings | None = None) -> Flask:
@@ -56,6 +58,53 @@ def create_app(settings: Settings | None = None) -> Flask:
    with app.app_context():
        _ensure_schema_compatibility(app)

    # Background job manager with retry & monitoring
    obs = app.extensions["observability"]
    job_manager.init_app(app, redis_client=redis_client, registry=obs.registry)

    def _process_due_reminders():
        """Background job: send all due reminders."""
        from .models import Reminder
        from .services.reminders import send_reminder

        # Look ahead one minute so reminders coming due within the next
        # poll window are not delayed by a full interval.
        now = datetime.now(timezone.utc) + timedelta(minutes=1)
        items = (
            db.session.query(Reminder)
            .filter(Reminder.sent.is_(False), Reminder.send_at <= now)
            .all()
        )
        sent_count = 0
        failed_ids = []
        for r in items:
            if send_reminder(r):
                r.sent = True
                sent_count += 1
            else:
                failed_ids.append(r.id)
        if items:
            # Commit successful sends before raising so they are not re-sent.
            db.session.commit()
            logger.info("Processed %d/%d due reminders", sent_count, len(items))
        if failed_ids:
            # Let the job manager's retry policy handle transient failures.
            raise RuntimeError(f"Failed to send reminders: {failed_ids}")

    job_manager.add_job(
        _process_due_reminders,
        job_id="process_due_reminders",
        trigger="interval",
        retry_policy=RetryPolicy(
            max_retries=5,
            base_delay_seconds=10.0,
            max_delay_seconds=600.0,
            backoff_factor=2.0,
        ),
        minutes=1,
    )

    # Only start the scheduler in the main process (not in the reloader child)
    if not app.debug or os.environ.get("WERKZEUG_RUN_MAIN") == "true":
        job_manager.start()
        atexit.register(lambda: job_manager.shutdown(wait=False))

    @app.before_request
    def _before_request():
        init_request_context()
2 changes: 2 additions & 0 deletions packages/backend/app/routes/__init__.py
@@ -7,6 +7,7 @@
from .categories import bp as categories_bp
from .docs import bp as docs_bp
from .dashboard import bp as dashboard_bp
from .jobs import bp as jobs_bp


def register_routes(app: Flask):
@@ -18,3 +19,4 @@ def register_routes(app: Flask):
    app.register_blueprint(categories_bp, url_prefix="/categories")
    app.register_blueprint(docs_bp, url_prefix="/docs")
    app.register_blueprint(dashboard_bp, url_prefix="/dashboard")
    app.register_blueprint(jobs_bp, url_prefix="/jobs")
89 changes: 89 additions & 0 deletions packages/backend/app/routes/jobs.py
@@ -0,0 +1,89 @@
"""API routes for background job monitoring and management.

Provides endpoints for operators and admins to:
- View job status, history, and health
- Inspect dead-lettered jobs
- Reset failed jobs for retry
"""

from flask import Blueprint, jsonify
from flask_jwt_extended import jwt_required, get_jwt_identity
from ..extensions import db
from ..models import User, Role

bp = Blueprint("jobs", __name__)


def _require_admin():
    uid = int(get_jwt_identity())
    user = db.session.get(User, uid)
    if not user or user.role != Role.ADMIN.value:
        return None, (jsonify(error="admin access required"), 403)
    return user, None


@bp.get("/status")
@jwt_required()
def job_status():
    """Get status of all managed background jobs."""
    user, err = _require_admin()
    if err:
        return err

    from ..services.job_manager import job_manager

    status = job_manager.get_status()
    return jsonify(jobs=status), 200


@bp.get("/dead-letters")
@jwt_required()
def dead_letters():
    """List jobs that exhausted all retries."""
    user, err = _require_admin()
    if err:
        return err

    from ..services.job_manager import job_manager

    dead = job_manager.get_dead_letters()
    return jsonify(dead_letters=dead), 200


@bp.post("/<job_id>/reset")
@jwt_required()
def reset_job(job_id: str):
    """Reset a dead-lettered job so it retries on next trigger."""
    user, err = _require_admin()
    if err:
        return err

    from ..services.job_manager import job_manager

    if job_manager.reset_job(job_id):
        return jsonify(message=f"Job {job_id} reset successfully"), 200
    return jsonify(error=f"Job {job_id} not found"), 404


@bp.get("/health")
def job_health():
    """Unauthenticated health check for monitoring systems.

    Returns a summary without sensitive details.
    """
    from ..services.job_manager import job_manager

    status = job_manager.get_status()
    summary = {}
    all_healthy = True
    for jid, info in status.items():
        healthy = info["status"] not in ("failed", "missed")
        summary[jid] = {
            "healthy": healthy,
            "status": info["status"],
            "total_runs": info["total_runs"],
            "total_failures": info["total_failures"],
        }
        if not healthy:
            all_healthy = False

    return jsonify(healthy=all_healthy, jobs=summary), (200 if all_healthy else 503)