-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathtask_server.py
More file actions
751 lines (670 loc) · 29.1 KB
/
task_server.py
File metadata and controls
751 lines (670 loc) · 29.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
#!/usr/bin/env python3
"""Thin MCP server exposing only task management tools.
Shares the same SQLite database as the main sqlite-kb server.
Exists because Claude Code 2.x has a tool-count limit per MCP server
(~9 tools visible out of 50), so task tools are split into a separate server.
"""
from __future__ import annotations
import json
import uuid
from typing import Any
from fastmcp_compat import FastMCP
from db_utils import (
get_conn as _get_conn,
TaskDAO,
TASK_ACTIVE_EXCLUSIONS as _TASK_ACTIVE_EXCLUSIONS,
TASK_PRIORITIES as _TASK_PRIORITIES,
validate_task_fields as _validate_task_fields,
build_priority_order_sql,
now_iso as _now,
setup_logger,
apply_task_mutation as _apply_task_mutation,
create_task_with_ledger as _create_task_with_ledger,
)
from retrieval_contract import (
RETRIEVAL_CONTRACT_VERSION,
classify_lookup_confidence,
is_visible_lookup_match,
order_surface_hits,
score_lookup_surface,
)
# Pre-built SQL for active-task exclusion
# Comma-separated "?" placeholders, one per excluded status, reused by every
# "status NOT IN (...)" clause in the tools below.
_EXCL_PH = ",".join("?" for _ in _TASK_ACTIVE_EXCLUSIONS)

# ── Logging (file-only, NEVER stdout — breaks MCP stdio) ────────────────
logger = setup_logger("sqlite-tasks", "task_server.log")

# ── Unified search engine (shared across query_tasks calls) ──────────────
from task_search import TaskSearchEngine

# Single shared instance: query_tasks re-ranks FTS candidates through it.
_search_engine = TaskSearchEngine()
def _vec_sync_task_safe(conn, task_id: str) -> None:
    """Best-effort embedding sync for one task.

    Any failure (missing vec_search module, sync error) is logged at debug
    level and swallowed so callers degrade gracefully instead of crashing.
    """
    try:
        from vec_search import vec_sync_task as _sync

        _sync(conn, task_id)
    except Exception as exc:
        logger.debug("vec_sync_task(%s) skipped: %s", task_id, exc)
# ── FastMCP app ──────────────────────────────────────────────────────────
# The instructions string is surfaced to the MCP client and steers tool
# selection (find_by_title for phrase lookup; description as primary body).
mcp = FastMCP(
    "sqlite-tasks",
    instructions=(
        "Task management tools for SQLite-backed persistent memory. "
        "Create, update, query, and digest tasks. "
        "Use find_by_title when only a remembered phrase is known: it searches tasks, notes, "
        "and entities across title/name, description, notes, observations, and project "
        "regardless of status, section, or project filters, using retrieval contract "
        f"{RETRIEVAL_CONTRACT_VERSION} with confidence gating. "
        "Use description as the default primary body for task/note content; "
        "use notes only for auxiliary/internal metadata. Shares DB with sqlite-kb."
    ),
)
# ═══════════════════════════════════════════════════════════════════════════
# Tool 1: create_task_or_note
# ═══════════════════════════════════════════════════════════════════════════
@mcp.tool()
def create_task_or_note(
    title: str,
    type: str = "task",
    description: str = "",
    section: str = "inbox",
    priority: str = "medium",
    due_date: str = "",
    project: str = "",
    parent_id: str = "",
    notes: str = "",
    recurring: str = "",
    reminder_at: str = "",
) -> str:
    """Create a new task or note and return its UUID.

    The primary long-form content belongs in ``description``; ``notes``
    is reserved for auxiliary, internal, or machine-readable metadata.

    Args:
        title: Task title (required).
        type: task | note.
        description: Primary task/note body and main long-form content.
        section: inbox | today | next | someday | waiting.
        priority: low | medium | high | critical.
        due_date: YYYY-MM-DD format or empty to skip.
        project: Project tag for grouping.
        parent_id: UUID of parent task (for subtasks).
        notes: Secondary/internal notes or machine-readable metadata.
        recurring: JSON config for recurrence (e.g. '{"every":"week","day":"monday"}').
        reminder_at: ISO datetime for reminder (e.g. '2026-03-15T14:00:00').
    """
    # Empty strings mean "not provided" — store NULL instead.
    description, due_date, project, parent_id, notes, recurring, reminder_at = (
        value or None
        for value in (
            description,
            due_date,
            project,
            parent_id,
            notes,
            recurring,
            reminder_at,
        )
    )
    # Reject invalid enumerations / date formats before touching the DB.
    if err := _validate_task_fields(
        section=section,
        priority=priority,
        type=type,
        due_date=due_date,
        recurring=recurring,
        reminder_at=reminder_at,
    ):
        return json.dumps({"error": err})
    task_id = str(uuid.uuid4())
    created_at = _now()
    with _get_conn() as conn:
        # Subtasks must reference an existing parent row.
        if parent_id and not TaskDAO.exists(conn, parent_id):
            return json.dumps({"error": f"Parent task {parent_id} not found"})
        _create_task_with_ledger(
            conn,
            task_id,
            title,
            created_at,
            description=description,
            status="not_started",
            priority=priority,
            section=section,
            due_date=due_date,
            project=project,
            parent_id=parent_id,
            notes=notes,
            recurring=recurring,
            reminder_at=reminder_at,
            type=type,
            tool_name="sqlite-tasks.create_task_or_note",
        )
        # Best-effort embedding sync; failures are swallowed.
        _vec_sync_task_safe(conn, task_id)
    logger.info("create_task_or_note: %s (%s)", title, task_id)
    return json.dumps(
        {"task_id": task_id, "title": title, "type": type, "status": "not_started"}
    )
# ═══════════════════════════════════════════════════════════════════════════
# Tool 2: update_task
# ═══════════════════════════════════════════════════════════════════════════
@mcp.tool()
def update_task(
    task_id: str,
    title: str = "",
    description: str = "",
    status: str = "",
    priority: str = "",
    section: str = "",
    due_date: str = "",
    project: str = "",
    parent_id: str = "",
    notes: str = "",
    recurring: str = "",
    reminder_at: str = "",
    type: str = "",
) -> str:
    """Update a task's fields. Only non-empty fields are changed.

    Pass special value "CLEAR" to set a field to NULL.
    ``description`` is the primary body field for task/note text.
    ``notes`` is reserved for secondary/internal metadata.

    Args:
        task_id: UUID of the task to update (required).
        title: New title.
        description: New main task/note body.
        status: not_started | in_progress | done | archived | cancelled.
        priority: low | medium | high | critical.
        section: inbox | today | next | someday | waiting.
        due_date: YYYY-MM-DD or "CLEAR" to remove.
        project: Project tag or "CLEAR" to remove.
        parent_id: Parent UUID or "CLEAR" to remove.
        notes: New auxiliary/internal notes or "CLEAR" to remove.
        recurring: JSON config or "CLEAR" to remove.
        reminder_at: ISO datetime or "CLEAR" to remove.
        type: task | note.
    """
    fields = {
        "title": title,
        "description": description,
        "status": status,
        "priority": priority,
        "section": section,
        "due_date": due_date,
        "project": project,
        "parent_id": parent_id,
        "notes": notes,
        "recurring": recurring,
        "reminder_at": reminder_at,
        "type": type,
    }
    # "CLEAR" -> NULL; any other non-empty string -> new value; "" -> skip.
    updates: dict[str, Any] = {}
    for k, v in fields.items():
        if v == "CLEAR":
            updates[k] = None
        elif v:  # non-empty string = update
            updates[k] = v
    if not updates:
        return json.dumps({"error": "No fields to update. Pass non-empty values."})
    # Validate only enumerated/structured fields actually being set;
    # None values come from "CLEAR" and need no validation.
    val_fields = {
        k: v
        for k, v in updates.items()
        if k
        in (
            "status",
            "section",
            "priority",
            "type",
            "due_date",
            "recurring",
            "reminder_at",
        )
        and v is not None
    }
    if err := _validate_task_fields(**val_fields):
        return json.dumps({"error": err})
    # Keep the timestamp out of `updates`: _apply_task_mutation receives it
    # separately, so there is no need to add and then strip it again.
    timestamp = _now()
    with _get_conn() as conn:
        result = _apply_task_mutation(
            conn,
            task_id,
            updates,
            timestamp=timestamp,
            tool_name="sqlite-tasks.update_task",
        )
        if result.get("updated", 0) == 0 and result.get("missing"):
            return json.dumps({"error": f"Task {task_id} not found"})
        # Re-embed only when searchable content actually changed.
        if {"title", "description", "notes"} & set(result.get("changed_fields", ())):
            _vec_sync_task_safe(conn, task_id)
    # "updated_at" is appended for parity with the established response shape.
    changed = list(updates) + ["updated_at"]
    logger.info("update_task: %s updated %s", task_id, changed)
    return json.dumps({"updated": task_id, "fields": changed})
# ═══════════════════════════════════════════════════════════════════════════
# Tool 3: query_tasks
# ═══════════════════════════════════════════════════════════════════════════
@mcp.tool()
def query_tasks(
    section: str = "",
    status: str = "",
    priority: str = "",
    project: str = "",
    parent_id: str = "",
    type: str = "",
    overdue_only: bool = False,
    search: str = "",
    summary_only: bool = False,
    offset: int = 0,
    limit: int = 50,
) -> str:
    """Query tasks with optional filters. Returns markdown table.

    Filters are combined with AND. Leave empty to skip a filter.
    overdue_only=True shows only tasks past due_date.
    search: full-text search across title, description, notes.
    summary_only=True omits description/notes (faster).
    """
    # Accumulate parameterized WHERE fragments for each non-empty filter.
    conditions: list[str] = []
    params: list[Any] = []
    if section:
        conditions.append("t.section = ?")
        params.append(section)
    if status:
        conditions.append("t.status = ?")
        params.append(status)
    if priority:
        conditions.append("t.priority = ?")
        params.append(priority)
    if project:
        conditions.append("t.project = ?")
        params.append(project)
    if parent_id:
        conditions.append("t.parent_id = ?")
        params.append(parent_id)
    if type:
        conditions.append("t.type = ?")
        params.append(type)
    if overdue_only:
        # Past due AND still in an active status (exclusion list is shared
        # module-wide via _EXCL_PH / _TASK_ACTIVE_EXCLUSIONS).
        conditions.append("t.due_date < date('now')")
        conditions.append(f"t.status NOT IN ({_EXCL_PH})")
        params.extend(_TASK_ACTIVE_EXCLUSIONS)
    if summary_only:
        # Drops description; notes stay selected (the markdown table shows them).
        cols = "t.id, t.title, t.status, t.priority, t.section, t.due_date, t.project, t.parent_id, t.notes, t.created_at, t.updated_at"
    else:
        cols = "t.id, t.title, t.description, t.notes, t.status, t.priority, t.section, t.due_date, t.project, t.parent_id, t.created_at, t.updated_at"
    from_clause = "tasks t"
    # Priority rank first, then earliest due date (NULLs last), then creation.
    order_clause = (
        f"{build_priority_order_sql('t.')}, t.due_date ASC NULLS LAST, t.created_at ASC"
    )
    where = " AND ".join(conditions) if conditions else "1=1"
    with _get_conn() as conn:
        if search:
            # FTS5 pre-filter: narrow rows before search engine re-ranks
            fts_tokens = search.split()
            fts_where = where
            fts_params = list(params)
            if fts_tokens:
                # Prefix match (token*) so pre-filter is broader than
                # the search engine's fuzzy/substring matching
                escaped = []
                for t in fts_tokens:
                    # Strip characters FTS5 would treat as query syntax.
                    clean = "".join(c for c in t if c.isalnum() or c == "_")
                    if clean:
                        escaped.append(clean + "*")
                fts_match = " OR ".join(escaped) if escaped else None
                if fts_match:
                    try:
                        # Verify FTS5 query is valid before using it
                        conn.execute(
                            "SELECT 1 FROM tasks_fts WHERE tasks_fts MATCH ? LIMIT 1",
                            (fts_match,),
                        )
                        fts_where = (
                            f"{where} AND t.rowid IN "
                            f"(SELECT rowid FROM tasks_fts WHERE tasks_fts MATCH ?)"
                        )
                        fts_params.append(fts_match)
                    except Exception:
                        pass  # FTS5 failed — fall back to unfiltered scan
            # No SQL LIMIT here: the engine re-scores all candidates, so
            # pagination must happen after ranking, not before.
            sql = (
                f"SELECT {cols} FROM {from_clause} WHERE {fts_where} "
                f"ORDER BY {order_clause}"
            )
            all_rows = conn.execute(sql, fts_params).fetchall()
            results = _search_engine.search(
                search, [dict(r) for r in all_rows], conn=conn
            )
            total = len(results)
            rows = results[offset : offset + limit]
        else:
            # No search term: page directly in SQL, count separately.
            sql = (
                f"SELECT {cols} FROM {from_clause} WHERE {where} "
                f"ORDER BY {order_clause} "
                f"LIMIT ? OFFSET ?"
            )
            rows = conn.execute(sql, params + [limit, offset]).fetchall()
            count_sql = f"SELECT COUNT(*) FROM {from_clause} WHERE {where}"
            total = conn.execute(count_sql, params).fetchone()[0]
    # Normalize sqlite Row objects to plain dicts for JSON serialization
    # (the search path already yields dicts).
    rows = [dict(r) if not isinstance(r, dict) else r for r in rows] if rows else []
    if not rows:
        return json.dumps(
            {"tasks": [], "count": 0, "total": total, "message": "No tasks match"}
        )
    lines = [
        "| # | Title | Status | Priority | Section | Due | Project | Created | Notes |",
        "|---|-------|--------|----------|---------|-----|---------|---------|-------|",
    ]
    for i, r in enumerate(rows, 1):
        due = r["due_date"] or "—"
        proj = r["project"] or "—"
        created = (r.get("created_at") or "—")[:16]  # trim to YYYY-MM-DDTHH:MM
        notes = (r["notes"] or "—")[:80]  # truncate long notes for the table
        lines.append(
            f"| {i + offset} | {r['title']} | {r['status']} | {r['priority']} "
            f"| {r['section']} | {due} | {proj} | {created} | {notes} |"
        )
    result = {
        "tasks": rows,
        "count": len(rows),
        "total": total,
        "offset": offset,
        "limit": limit,
        "markdown": "\n".join(lines),
    }
    if total > offset + limit:
        result["has_more"] = True
        result["next_offset"] = offset + limit
    return json.dumps(result)
# ═══════════════════════════════════════════════════════════════════════════
# Tool 3b: find_by_title
# ═══════════════════════════════════════════════════════════════════════════
@mcp.tool()
def find_by_title(title_fragment: str, limit: int = 20) -> str:
    """Find tasks, notes, or entities by partial title or remembered phrase.

    This is the cross-surface lookup tool when the caller only remembers a
    phrase, not the storage surface. It searches across task titles,
    descriptions, notes, projects, entity names, and entity observations.
    It ignores task status, section, type, and project filters.

    Args:
        title_fragment: Any distinctive substring or remembered phrase.
        limit: Maximum number of matches to return.
    """
    query = (title_fragment or "").strip()
    if not query:
        return json.dumps(
            {"matches": [], "count": 0, "message": "Empty title fragment"}
        )
    # Clamp to a sane window regardless of caller input.
    limit = max(1, min(int(limit), 100))
    matches: list[dict[str, Any]] = []
    with _get_conn() as conn:
        # ── Tasks & notes: score every row across its text surfaces ──
        task_rows = conn.execute(
            "SELECT id, title, description, notes, type, status, section, priority, due_date, project, updated_at, created_at "
            "FROM tasks"
        ).fetchall()
        for row in task_rows:
            title = row["title"] or ""
            # Per-surface scores; zero-score surfaces are dropped entirely.
            surface_scores = {
                surface: score
                for surface, score in (
                    ("title", score_lookup_surface("title", query, title)),
                    (
                        "description",
                        score_lookup_surface("description", query, row["description"]),
                    ),
                    ("notes", score_lookup_surface("notes", query, row["notes"])),
                    ("project", score_lookup_surface("project", query, row["project"])),
                )
                if score > 0
            }
            if not surface_scores:
                continue
            # Contract-defined surface ordering; the best hit drives `score`.
            ordered_hits = order_surface_hits(surface_scores)
            matched_in = [surface for surface, _ in ordered_hits]
            score = float(ordered_hits[0][1])
            confidence = classify_lookup_confidence(surface_scores)
            kind = "note" if row["type"] == "note" else "task"
            matches.append(
                {
                    "kind": kind,
                    "id": row["id"],
                    "title": title,
                    "type": row["type"],
                    "status": row["status"],
                    "section": row["section"],
                    "priority": row["priority"],
                    "due_date": row["due_date"],
                    "project": row["project"],
                    "updated_at": row["updated_at"],
                    "created_at": row["created_at"],
                    "score": score,
                    "matched_in": matched_in,
                    "surface_scores": surface_scores,
                    "primary_surface": matched_in[0],
                    "confidence": confidence,
                    "ranking_contract_version": RETRIEVAL_CONTRACT_VERSION,
                }
            )
        # ── Entities: collect observations per entity, then score ──
        obs_by_entity: dict[int, list[str]] = {}
        for row in conn.execute(
            "SELECT entity_id, content FROM observations ORDER BY entity_id, id"
        ):
            obs_by_entity.setdefault(row["entity_id"], []).append(row["content"])
        entity_rows = conn.execute(
            "SELECT id, name, entity_type, project, updated_at, created_at FROM entities"
        ).fetchall()
        for row in entity_rows:
            name = row["name"] or ""
            # All of an entity's observations are scored as one joined text.
            obs_text = "\n".join(obs_by_entity.get(row["id"], []))
            surface_scores = {
                surface: score
                for surface, score in (
                    ("name", score_lookup_surface("name", query, name)),
                    (
                        "observations",
                        score_lookup_surface("observations", query, obs_text),
                    ),
                    ("project", score_lookup_surface("project", query, row["project"])),
                    (
                        "entity_type",
                        score_lookup_surface("entity_type", query, row["entity_type"]),
                    ),
                )
                if score > 0
            }
            if not surface_scores:
                continue
            ordered_hits = order_surface_hits(surface_scores)
            matched_in = [surface for surface, _ in ordered_hits]
            score = float(ordered_hits[0][1])
            confidence = classify_lookup_confidence(surface_scores)
            matches.append(
                {
                    "kind": "entity",
                    "id": row["id"],
                    "title": name,
                    "entityType": row["entity_type"],
                    "project": row["project"],
                    "updated_at": row["updated_at"],
                    "created_at": row["created_at"],
                    "score": score,
                    "matched_in": matched_in,
                    "surface_scores": surface_scores,
                    "primary_surface": matched_in[0],
                    "confidence": confidence,
                    "ranking_contract_version": RETRIEVAL_CONTRACT_VERSION,
                }
            )
    # Rank: score desc; on ties tasks/notes beat entities, then recency.
    matches.sort(
        key=lambda item: (
            float(item.get("score") or 0.0),
            1 if item.get("kind") in {"task", "note"} else 0,
            item.get("updated_at") or "",
            item.get("created_at") or "",
        ),
        reverse=True,
    )
    # Confidence gate: hide low-confidence rows only when at least one
    # confident match exists; otherwise show everything rather than nothing.
    confident_matches = [
        item
        for item in matches
        if is_visible_lookup_match(item.get("surface_scores") or {})
    ]
    hidden_low_confidence = (
        len(matches) - len(confident_matches) if confident_matches else 0
    )
    visible_matches = confident_matches if confident_matches else matches
    matches = visible_matches[:limit]
    lines = [
        "| # | Kind | Title | Status/Type | Section | Project | Match | Confidence |",
        "|---|------|-------|-------------|---------|---------|-------|------------|",
    ]
    for idx, item in enumerate(matches, 1):
        if item["kind"] == "entity":
            # Entities have no status/section; show the entity type instead.
            status_or_type = item.get("entityType") or "entity"
            section = "—"
        else:
            status_or_type = (
                f"{item.get('status') or '—'} / {item.get('type') or 'task'}"
            )
            section = item.get("section") or "—"
        lines.append(
            f"| {idx} | {item['kind']} | {item['title']} | {status_or_type} | "
            f"{section} | {item.get('project') or '—'} | {', '.join(item.get('matched_in') or []) or '—'} | "
            f"{item.get('confidence') or 'low'} |"
        )
    return json.dumps(
        {
            "matches": matches,
            "count": len(matches),
            "query": query,
            "hidden_low_confidence_count": hidden_low_confidence,
            "ranking_contract_version": RETRIEVAL_CONTRACT_VERSION,
            "markdown": "\n".join(lines) if matches else "",
            "message": None if matches else "No title matches",
        }
    )
# ═══════════════════════════════════════════════════════════════════════════
# Tool 4: task_digest
# ═══════════════════════════════════════════════════════════════════════════
@mcp.tool()
def task_digest(
    include_overdue: bool = True,
    limit: int = 20,
) -> str:
    """Generate a formatted task digest for session start.

    Shows pending/in-progress tasks grouped by section,
    plus overdue tasks highlighted separately.
    """
    # Sections surfaced in the digest, in display order.
    target_sections = ["today", "inbox", "next"]
    with _get_conn() as conn:
        ph = ",".join("?" * len(target_sections))
        # Active (not started / in progress) tasks in the target sections,
        # ordered by section rank, then by priority.
        active = conn.execute(
            f"SELECT id, title, description, notes, status, priority, section, due_date, project "
            f"FROM tasks "
            f"WHERE section IN ({ph}) AND status IN ('not_started', 'in_progress') AND type = 'task' "
            f"ORDER BY CASE section WHEN 'today' THEN 0 WHEN 'inbox' THEN 1 "
            f"WHEN 'next' THEN 2 WHEN 'waiting' THEN 3 WHEN 'someday' THEN 4 END, "
            f"{build_priority_order_sql()} LIMIT ?",
            target_sections + [limit],
        ).fetchall()
        overdue = []
        if include_overdue:
            # Past-due tasks from any section, oldest first, hard-capped at 10.
            overdue = conn.execute(
                "SELECT id, title, description, notes, status, priority, section, due_date, project "
                "FROM tasks "
                f"WHERE due_date < date('now') AND status NOT IN ({_EXCL_PH}) AND type = 'task' "
                "ORDER BY due_date ASC LIMIT 10",
                list(_TASK_ACTIVE_EXCLUSIONS),
            ).fetchall()
        # Per-status counts across all non-archived/cancelled tasks.
        counts = conn.execute(
            "SELECT status, COUNT(*) as cnt FROM tasks "
            "WHERE status NOT IN ('archived', 'cancelled') GROUP BY status"
        ).fetchall()
    lines = ["## Task Digest"]
    if counts:
        stats = {r["status"]: r["cnt"] for r in counts}
        total = sum(stats.values())
        lines.append(
            f"**Total active:** {total} | "
            f"Not started: {stats.get('not_started', 0)} | "
            f"In progress: {stats.get('in_progress', 0)} | "
            f"Done: {stats.get('done', 0)}"
        )
        lines.append("")
    if overdue:
        lines.append(f"### OVERDUE ({len(overdue)})")
        for t in overdue:
            # Inline hint: first 60 chars of notes, if any.
            note_hint = f" | {t['notes'][:60]}..." if t["notes"] else ""
            lines.append(
                f"- [{t['priority'].upper()}] {t['title']} (due: {t['due_date']}){note_hint}"
            )
        lines.append("")
    # Group the active rows by section, then render in target_sections order.
    by_section: dict[str, list] = {}
    for t in active:
        by_section.setdefault(t["section"], []).append(t)
    for sec in target_sections:
        tasks = by_section.get(sec, [])
        if tasks:
            lines.append(f"### {sec.upper()} ({len(tasks)})")
            for t in tasks:
                due = f" [due: {t['due_date']}]" if t["due_date"] else ""
                # Only flag non-default priorities to keep lines compact.
                prio = (
                    f"[{t['priority'].upper()}] " if t["priority"] != "medium" else ""
                )
                note_hint = f" | {t['notes'][:60]}..." if t["notes"] else ""
                lines.append(f"- {prio}{t['title']}{due}{note_hint}")
            lines.append("")
    return json.dumps(
        {
            "digest": "\n".join(lines),
            "active_count": len(active),
            "overdue_count": len(overdue),
        }
    )
# ═══════════════════════════════════════════════════════════════════════════
# Tool 5: archive_done_tasks
# ═══════════════════════════════════════════════════════════════════════════
@mcp.tool()
def archive_done_tasks(older_than_days: int = 7) -> str:
    """Move stale completed tasks to status 'archived'.

    A task qualifies when its status is 'done' and its updated_at is older
    than *older_than_days* days.
    """
    if older_than_days < 0:
        return json.dumps({"error": "older_than_days must be non-negative"})
    with _get_conn() as conn:
        archived_ids = TaskDAO.archive_done(conn, older_than_days)
    archived_count = len(archived_ids)
    logger.info(
        "archive_done_tasks: %d archived (older than %d days)",
        archived_count,
        older_than_days,
    )
    return json.dumps(
        {"archived": archived_count, "threshold_days": older_than_days}
    )
# ═══════════════════════════════════════════════════════════════════════════
# Tool 6: bump_overdue_priority
# ═══════════════════════════════════════════════════════════════════════════
@mcp.tool()
def bump_overdue_priority(target_priority: str = "high") -> str:
    """Raise overdue, still-active tasks to *target_priority*.

    Tasks already at or above the target priority are left untouched.
    """
    if target_priority not in _TASK_PRIORITIES:
        return json.dumps({"error": f"Invalid priority: {target_priority}"})
    # _TASK_PRIORITIES is ordered lowest -> highest, so everything before
    # the target in that sequence is eligible for a bump.
    ordered = list(_TASK_PRIORITIES)
    lower_priorities = ordered[: ordered.index(target_priority)]
    if not lower_priorities:
        return json.dumps({"bumped": 0, "message": "No lower priorities to bump"})
    placeholders = ",".join("?" * len(lower_priorities))
    stamp = _now()
    bumped = 0
    with _get_conn() as conn:
        candidates = conn.execute(
            f"SELECT id FROM tasks "
            f"WHERE due_date < date('now') AND status NOT IN ({_EXCL_PH}) "
            f"AND priority IN ({placeholders})",
            list(_TASK_ACTIVE_EXCLUSIONS) + lower_priorities,
        ).fetchall()
        # Route each bump through the mutation helper so the change ledger
        # records it, counting only rows actually updated.
        for candidate in candidates:
            outcome = _apply_task_mutation(
                conn,
                candidate["id"],
                {"priority": target_priority},
                timestamp=stamp,
                tool_name="sqlite-tasks.bump_overdue_priority",
            )
            if outcome.get("updated", 0):
                bumped += 1
    logger.info("bump_overdue_priority: %d bumped to %s", bumped, target_priority)
    return json.dumps({"bumped": bumped, "target_priority": target_priority})
# ── Entry point ──────────────────────────────────────────────────────────
if __name__ == "__main__":
    # stdio transport: stdout carries the MCP protocol, which is why all
    # logging above is routed to a file instead.
    mcp.run(transport="stdio")