docs/background-jobs.md (138 additions, 0 deletions)
@@ -0,0 +1,138 @@
# Background Job Manager

FinMind includes a resilient background job manager that automatically processes
scheduled tasks (like reminders) with built-in retry logic, monitoring, and
crash recovery.

## Architecture

The job manager wraps APScheduler with:

- **Automatic retry** with configurable exponential backoff
- **Dead-letter tracking** for jobs that exhaust all retries
- **Per-job execution history** with timing and error details
- **Prometheus metrics** for observability
- **Redis-backed persistence** for crash recovery
- **Admin API endpoints** for monitoring and management

## How It Works

### Job Lifecycle

```
PENDING → RUNNING → SUCCESS
              ↘ RETRYING → RUNNING → SUCCESS
                              ↘ RETRYING → ... → FAILED (dead-lettered)
```

When a job fails:
1. The error is logged with full traceback
2. Retry delay is calculated using exponential backoff
3. A one-shot retry is scheduled after the delay
4. If max retries are exhausted, the job is **dead-lettered** (skipped on future triggers)
5. An admin can reset dead-lettered jobs via the API

### Crash Recovery

Job state is persisted to Redis after every execution. On restart:
- Previous state is restored from Redis
- Jobs that were `RUNNING` when the process crashed are marked as `RETRYING`
- The scheduler resumes with correct attempt counts
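A rough sketch of what that restore step could look like (hypothetical: the Redis key, state shape, and function name are invented for illustration; the real logic lives inside the job manager):

```python
import json

def restore_job_states(redis_conn, key: str = "finmind:job_states") -> dict:
    """Load persisted job state from Redis and repair crash leftovers."""
    raw = redis_conn.get(key)
    states = json.loads(raw) if raw else {}
    for state in states.values():
        if state.get("status") == "running":
            # The process died mid-run: treat it as a retryable failure
            # so the scheduler resumes with the attempt count intact.
            state["status"] = "retrying"
    return states
```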

## Configuration

### Retry Policy

Each job can have its own retry policy:

```python
from app.services.job_manager import job_manager, RetryPolicy

job_manager.add_job(
    my_function,
    job_id="my_job",
    trigger="interval",
    retry_policy=RetryPolicy(
        max_retries=5,            # attempts before dead-letter
        base_delay_seconds=10.0,  # initial retry delay
        max_delay_seconds=600.0,  # cap on retry delay
        backoff_factor=2.0,       # exponential multiplier
    ),
    minutes=5,  # run every 5 minutes
)
```

### Default Retry Policy

If no policy is specified, jobs use:
- 3 retries
- 5s initial delay, 2x backoff, 300s max
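Assuming the conventional formula `delay = min(base_delay * backoff_factor ** attempt, max_delay)` (an assumption; check `RetryPolicy` for the exact rule), the default policy produces this schedule:

```python
def retry_delay(attempt: int, base: float = 5.0,
                factor: float = 2.0, cap: float = 300.0) -> float:
    """Delay in seconds before retry number `attempt` (0-indexed)."""
    return min(base * factor ** attempt, cap)

# Default policy (3 retries): 5s, then 10s, then 20s
print([retry_delay(n) for n in range(3)])  # → [5.0, 10.0, 20.0]
```

The cap matters for long-lived failures: with the defaults, the delay would hit the 300s ceiling from the seventh attempt onward.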

## API Endpoints

### `GET /jobs/health` (unauthenticated)

Health check for monitoring systems. Returns 200 if all jobs are healthy, or
503 if any job has failed or been missed.

```json
{
  "healthy": true,
  "jobs": {
    "process_due_reminders": {
      "healthy": true,
      "status": "success",
      "total_runs": 42,
      "total_failures": 0
    }
  }
}
```

### `GET /jobs/status` (admin only)

Detailed status including execution history.

### `GET /jobs/dead-letters` (admin only)

List jobs that exhausted all retries.

### `POST /jobs/<job_id>/reset` (admin only)

Reset a dead-lettered job so it retries on the next trigger.

## Prometheus Metrics

| Metric | Type | Labels | Description |
|--------|------|--------|-------------|
| `finmind_job_executions_total` | Counter | `job_id`, `status` | Total executions |
| `finmind_job_retries_total` | Counter | `job_id` | Retry attempts |
| `finmind_job_dead_letters_total` | Counter | `job_id` | Dead-lettered jobs |
| `finmind_job_duration_seconds` | Histogram | `job_id` | Execution duration |
| `finmind_jobs_active` | Gauge | — | Currently running jobs |

## Adding New Jobs

```python
# In app/__init__.py or a dedicated jobs module

def my_periodic_task():
    """Runs inside Flask app context automatically."""
    # Your business logic here
    pass

job_manager.add_job(
    my_periodic_task,
    job_id="my_periodic_task",
    trigger="cron",
    hour=9,
    minute=0,
)
```

The job manager handles:
- Running the function inside the Flask app context
- Catching and logging exceptions
- Retrying on failure
- Recording metrics and history
- Persisting state to Redis
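The wrapper that list describes might look roughly like this (a sketch, not the actual implementation: `wrap_job` and the callback signatures are invented, and retry scheduling and Redis persistence are omitted):

```python
import time
from functools import wraps

def wrap_job(app, func, on_success, on_failure):
    """Run `func` inside the app context, timing it and reporting the outcome."""
    @wraps(func)
    def runner():
        start = time.monotonic()
        try:
            with app.app_context():
                func()
        except Exception as exc:
            on_failure(func.__name__, exc, time.monotonic() - start)
            raise  # re-raise so the retry machinery sees the failure
        on_success(func.__name__, time.monotonic() - start)
    return runner
```

Re-raising after recording the failure is the key design point: metrics and history are captured without swallowing the exception the retry logic depends on.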
docs/weekly-digest.md (106 additions, 0 deletions)
@@ -0,0 +1,106 @@
# Weekly Financial Digest

FinMind generates weekly financial summaries highlighting spending trends,
category breakdowns, anomalies, and actionable insights.

## Features

- **Spending summary**: Total expenses, income, net flow, transaction count
- **Week-over-week comparison**: Percentage change vs previous week
- **Category breakdown**: Top spending categories with percentages
- **Daily spending chart data**: Per-day totals for visualization
- **Anomaly detection**: Flags spending spikes, drops, and large transactions
- **Upcoming bills**: Bills due in the next 7 days
- **AI-enhanced narrative** (optional): Gemini-powered summary and tips
- **Heuristic fallback**: Smart tips generated without AI when Gemini is unavailable
- **Email rendering**: Plain-text email template for digest delivery
- **Automated delivery**: Weekly cron job sends digests every Monday at 9:00 AM UTC

## API Endpoints

### `GET /digest/weekly`

Generate a weekly digest for the authenticated user.

**Query Parameters:**
| Parameter | Type | Default | Description |
|-----------|------|---------|-------------|
| `week_of` | ISO date | Previous week | Any date within the target week |
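The sample responses run Monday through Sunday, so a client could resolve `week_of` to its week bounds like this (the Monday-start convention is an assumption inferred from the example dates):

```python
from datetime import date, timedelta

def week_bounds(day: date) -> tuple[date, date]:
    """Monday..Sunday week containing `day`."""
    start = day - timedelta(days=day.weekday())  # weekday(): Monday == 0
    return start, start + timedelta(days=6)

# The week of 2026-03-12 runs Mon 2026-03-09 .. Sun 2026-03-15
print(week_bounds(date(2026, 3, 12)))
```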

**Headers (optional):**
| Header | Description |
|--------|-------------|
| `X-Gemini-Api-Key` | Gemini API key for AI narrative |
| `X-Insight-Persona` | Custom AI persona |

**Response:**
```json
{
  "week_start": "2026-03-09",
  "week_end": "2026-03-15",
  "total_expenses": 1500.00,
  "total_income": 2000.00,
  "net_flow": 500.00,
  "previous_week_expenses": 1200.00,
  "week_over_week_change_pct": 25.0,
  "transaction_count": 15,
  "daily_spending": [
    {"date": "2026-03-09", "total": 200.00},
    {"date": "2026-03-10", "total": 350.00},
    ...
  ],
  "category_breakdown": [
    {"category": "Food", "total": 800.00, "count": 8, "pct": 53.3},
    {"category": "Transport", "total": 700.00, "count": 7, "pct": 46.7}
  ],
  "anomalies": ["Spending jumped 25% vs last week"],
  "upcoming_bills": [
    {"id": 1, "name": "Rent", "amount": 15000.00, "due_date": "2026-03-20", "autopay": true}
  ],
  "tips": ["Your biggest category was Food..."],
  "method": "heuristic"
}
```

### `GET /digest/weekly/email-preview`

Same as `/digest/weekly` but also returns a formatted plain-text email body.

```json
{
  "email_body": "📊 FinMind Weekly Digest...",
  "digest": { ... }
}
```

## Anomaly Detection

The system flags:
- **Spending spike**: >30% increase week-over-week
- **Spending drop**: >20% decrease (positive reinforcement)
- **Large transaction**: Single transaction >40% of weekly total
- **No data**: Zero expenses recorded (potential tracking gap)
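Those thresholds as a code sketch (illustrative only; the function name and signature are invented, and the real checks live in the digest service):

```python
def detect_anomalies(this_week: float, last_week: float,
                     largest_txn: float) -> list[str]:
    """Flag the spending patterns described above."""
    if this_week == 0:
        return ["No expenses recorded this week (possible tracking gap)"]
    flags = []
    if last_week > 0:
        change = (this_week - last_week) / last_week * 100
        if change > 30:
            flags.append(f"Spending jumped {change:.0f}% vs last week")
        elif change < -20:
            flags.append(f"Spending dropped {-change:.0f}% vs last week")
    if largest_txn > 0.4 * this_week:
        flags.append("A single transaction was over 40% of this week's spending")
    return flags
```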

## Automated Delivery

Digests are sent automatically every **Monday at 9:00 AM UTC** via the
background job manager. The job:

1. Iterates all users
2. Generates each user's digest for the previous week
3. Renders to email format
4. Sends via the configured SMTP provider
5. Retries up to 3 times on failure (60s → 600s backoff)

Monitor via `GET /jobs/health` or `GET /jobs/status` (admin).

## AI Enhancement

When a Gemini API key is available (per-user header or server config),
the digest includes:

- `narrative`: 2-3 sentence AI-written summary
- `ai_tips`: 3 actionable AI-generated tips
- `mood`: Overall financial health (great/good/okay/concerning/critical)

Falls back to heuristic tips transparently if Gemini is unavailable.
packages/backend/app/__init__.py (83 additions, 2 deletions)
@@ -1,18 +1,20 @@
from flask import Flask, jsonify
from .config import Settings
from .extensions import db, jwt
from .extensions import db, jwt, redis_client
from .routes import register_routes
from .observability import (
    Observability,
    configure_logging,
    finalize_request,
    init_request_context,
)
from .services.job_manager import job_manager, RetryPolicy
from flask_cors import CORS
import atexit
import click
import os
import logging
from datetime import timedelta
from datetime import datetime, timedelta, timezone


def create_app(settings: Settings | None = None) -> Flask:
@@ -56,6 +58,85 @@ def create_app(settings: Settings | None = None) -> Flask:
    with app.app_context():
        _ensure_schema_compatibility(app)

    # Background job manager with retry & monitoring
    obs = app.extensions["observability"]
    job_manager.init_app(app, redis_client=redis_client, registry=obs.registry)

    def _process_due_reminders():
        """Background job: send all due reminders."""
        from .models import Reminder
        from .services.reminders import send_reminder

        # Small look-ahead so reminders due within the next poll interval
        # are picked up on this run rather than the next one
        now = datetime.now(timezone.utc) + timedelta(minutes=1)
        items = (
            db.session.query(Reminder)
            .filter(Reminder.sent.is_(False), Reminder.send_at <= now)
            .all()
        )
        sent_count = 0
        failed_ids: list[int] = []
        for r in items:
            if send_reminder(r):
                r.sent = True
                sent_count += 1
            else:
                failed_ids.append(r.id)
        if items:
            # Commit successes first so they are not re-sent on retry
            db.session.commit()
        logger.info("Processed %d/%d due reminders", sent_count, len(items))
        if failed_ids:
            # Let the job manager's retry handle transient failures
            raise RuntimeError(f"Failed to send reminders: {failed_ids}")

    job_manager.add_job(
        _process_due_reminders,
        job_id="process_due_reminders",
        trigger="interval",
        retry_policy=RetryPolicy(
            max_retries=5,
            base_delay_seconds=10.0,
            max_delay_seconds=600.0,
            backoff_factor=2.0,
        ),
        minutes=1,
    )

    def _send_weekly_digests():
        """Background job: generate and email weekly digests to all users."""
        from .models import User
        from .services.digest import generate_weekly_digest, generate_digest_email_body
        from .services.reminders import send_email

        users = db.session.query(User).all()
        sent = 0
        for user in users:
            try:
                digest = generate_weekly_digest(user.id)
                body = generate_digest_email_body(digest)
                if send_email(user.email, "Your FinMind Weekly Digest", body):
                    sent += 1
            except Exception:
                logger.warning("Digest failed for user %s", user.id, exc_info=True)
        logger.info("Weekly digests sent: %d/%d users", sent, len(users))

    job_manager.add_job(
        _send_weekly_digests,
        job_id="send_weekly_digests",
        trigger="cron",
        retry_policy=RetryPolicy(
            max_retries=3,
            base_delay_seconds=60.0,
            max_delay_seconds=600.0,
        ),
        day_of_week="mon",
        hour=9,
        minute=0,
    )

    # Only start scheduler in the main process (not in reloader child)
    if not app.debug or os.environ.get("WERKZEUG_RUN_MAIN") == "true":
        job_manager.start()
        atexit.register(lambda: job_manager.shutdown(wait=False))

    @app.before_request
    def _before_request():
        init_request_context()
packages/backend/app/routes/__init__.py (4 additions, 0 deletions)

@@ -7,6 +7,8 @@
from .categories import bp as categories_bp
from .docs import bp as docs_bp
from .dashboard import bp as dashboard_bp
from .jobs import bp as jobs_bp
from .digest import bp as digest_bp


def register_routes(app: Flask):
@@ -18,3 +20,5 @@ def register_routes(app: Flask):
    app.register_blueprint(categories_bp, url_prefix="/categories")
    app.register_blueprint(docs_bp, url_prefix="/docs")
    app.register_blueprint(dashboard_bp, url_prefix="/dashboard")
    app.register_blueprint(jobs_bp, url_prefix="/jobs")
    app.register_blueprint(digest_bp, url_prefix="/digest")