From 6a72de16ff6903ee0be1bc66053446ee278522e4 Mon Sep 17 00:00:00 2001
From: Kevin Hopper
Date: Sun, 12 Apr 2026 16:36:28 -0500
Subject: [PATCH] F.13: Crosspost scheduler + log/moderation GC
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Follow-up after Phase 2. Completes F.12.2's queued-crosspost story by
adding the dispatcher that fires entries whose scheduled_at has passed,
and by sweeping stale rows from crosspost_log + moderation_actions.

**What lands**

- scripts/init-db.js — adds `transformed_payload_json` column to
  `crosspost_log` via `addColumnIfMissing`. The scheduler needs the
  transformed payload to publish; earlier F.12 rows (no stored payload)
  are marked status='manual' so the operator can handle them by hand.
- servers/sharing/server.js — `crow_crosspost` now stores the
  transformed payload in the new column on insert. Tool surface is
  unchanged; rows produced after this PR are auto-publishable.
- servers/gateway/crossposting/scheduler.js — NEW module:
  • Publish loop (every 15s): scans crosspost_log for status='ready'
    OR (status='queued' AND scheduled_at <= now). Batches up to 20
    rows per tick; an in-flight set prevents overlap.
  • Publishers: mastodon (/api/v1/statuses with Bearer token),
    gotosocial (same shape), crow-blog (direct DB insert, since the
    blog is in-process). Each reads its bundle's env vars
    (MASTODON_URL + MASTODON_ACCESS_TOKEN, etc.).
  • MANUAL_TARGETS set for media-heavy bundles (pixelfed/peertube/
    funkwhale) + text-but-needs-context targets (writefreely, lemmy,
    matrix-dendrite) — the scheduler marks these status='manual' so
    the operator can complete the publish with local knowledge (which
    collection, which community, which room).
  • GC loop (every 1h, kicked once on start): prunes crosspost_log
    rows >30 days old in terminal statuses (published/cancelled/error/
    manual; queued/ready are preserved). Also sweeps moderation_actions
    past their F.11 expires_at into status='expired' — closing the TTL
    loop the F.11 docs promised.
  • Success raises a low-priority Crow notification; errors raise a
    high-priority one carrying the first 200 chars of the error
    message.
  • CROW_DISABLE_CROSSPOST_SCHEDULER=1 disables the scheduler for
    testing.
- servers/gateway/index.js — starts the scheduler alongside the
  existing orchestrator pipeline runner.

**Design notes**

- Publishers bypass MCP entirely and call the target app's REST API
  directly. Simpler than bootstrapping an in-process MCP client for
  each publish, and it survives MCP transport changes cleanly.
- The "manual" status is not a failure. It's the scheduler saying "I
  can't do this automatically because I don't have the binary payload
  or the target-specific context." The operator completes these by
  calling `_upload_track` / `pf_post_photo` / `pt_upload_video` with
  the transformed payload as a starting point, then
  `crow_crosspost_mark_published` closes the audit trail.
- GC only deletes from terminal statuses. In-flight queued/ready rows
  are preserved past the 30-day cutoff — in practice they should never
  be that old, but if the operator forgot to cancel one, GC won't
  delete it and lose it silently.

**Verified**

- node --check on all modified + new files
- node scripts/init-db.js runs cleanly; the new column lands
- Inserted a media-target row + a legacy no-payload row; one tick of
  publishTick marks both status='manual' with the right reason
- Mastodon + GoToSocial publishers throw clean errors when their
  tokens aren't set
- Blog publisher round-trips (DB insert → cleanup)
- npm run check passes

**Remaining F.12 follow-ups in the punch list**

- Writefreely / lemmy / matrix-dendrite publishers (need
  target-specific context: collection_alias / community_id / room_id —
  either inferred from a default config setting or fetched from a
  crosspost_rules row). Shipping without these keeps the blast radius
  of F.13 small.
- Pixelfed / peertube / funkwhale media publishers (need binary
  storage — the scheduler would need to hold the upload file somewhere
  addressable.
  Deferred; operators who want auto-media-crossposting drive the
  upload themselves and call mark_published).
---
 CLAUDE.md                                 |   2 +-
 scripts/init-db.js                        |   4 +
 servers/gateway/crossposting/scheduler.js | 290 ++++++++++++++++++++++
 servers/gateway/index.js                  |  14 ++
 servers/sharing/server.js                 |   5 +-
 skills/crow-crosspost.md                  |   4 +-
 6 files changed, 314 insertions(+), 5 deletions(-)
 create mode 100644 servers/gateway/crossposting/scheduler.js

diff --git a/CLAUDE.md b/CLAUDE.md
index 0881dd8..1c94fd2 100644
--- a/CLAUDE.md
+++ b/CLAUDE.md
@@ -205,7 +205,7 @@ Uses `@libsql/client` for local SQLite files (default: `~/.crow/data/crow.db`, g
 - **identity_attestations** — F.11 signed bindings (crow_id, app, external_handle, app_pubkey?, sig, version, revoked_at). Published via gateway `/.well-known/crow-identity.json`. UNIQUE(crow_id, app, external_handle, version) — new version row per rotation
 - **identity_attestation_revocations** — F.11 signed revocations (attestation_id FK CASCADE, revoked_at, reason, sig). Published via `/.well-known/crow-identity-revocations.json`
 - **crosspost_rules** — F.12.2 opt-in crosspost config (source_app, source_trigger, target_app, transform, active). Triggers: `on_publish`, `on_tag:`, `manual`
-- **crosspost_log** — F.12.2 audit + idempotency log (idempotency_key, source_app, source_post_id, target_app, status, target_post_id, scheduled_at, published_at, cancelled_at). UNIQUE(idempotency_key, source_app, target_app). 7-day idempotency window; >30 days GC'd daily
+- **crosspost_log** — F.12.2 audit + idempotency log (idempotency_key, source_app, source_post_id, target_app, status, target_post_id, scheduled_at, published_at, cancelled_at, **transformed_payload_json** — F.13). UNIQUE(idempotency_key, source_app, target_app). 7-day idempotency window; F.13 scheduler auto-publishes `ready`/`queued`-past-scheduled_at rows to mastodon/gotosocial/crow-blog and marks media-heavy targets (pixelfed/peertube/funkwhale) as `manual`.
GC prunes >30 days
 - **iptv_playlists** — IPTV M3U playlist sources (name, url, auto_refresh, channel_count)
 - **iptv_channels** — IPTV channels from playlists (playlist_id FK, name, stream_url, tvg_id, group_title, is_favorite)
 - **iptv_epg** — Electronic Program Guide entries (channel_tvg_id, title, start_time, end_time, indexed)
diff --git a/scripts/init-db.js b/scripts/init-db.js
index e1b405c..43dbbb1 100644
--- a/scripts/init-db.js
+++ b/scripts/init-db.js
@@ -547,6 +547,10 @@ await initTable("identity_attestation_revocations table", `
 await addColumnIfMissing("contacts", "external_handle", "TEXT");
 await addColumnIfMissing("contacts", "external_source", "TEXT");
 
+// F.13: scheduler needs the transformed payload to publish. Earlier crosspost_log
+// rows (from F.12) will have NULL — scheduler treats NULL as "manually handled".
+await addColumnIfMissing("crosspost_log", "transformed_payload_json", "TEXT");
+
 // --- F.12: Crosspost rules + log ---
 // crosspost_rules holds the operator's opt-in config: "when a new post appears
 // in app X, publish a transformed copy to app Y". Triggers: on_publish (with
diff --git a/servers/gateway/crossposting/scheduler.js b/servers/gateway/crossposting/scheduler.js
new file mode 100644
index 0000000..a68b2e7
--- /dev/null
+++ b/servers/gateway/crossposting/scheduler.js
@@ -0,0 +1,290 @@
+/**
+ * F.13: Crosspost scheduler + GC.
+ *
+ * Runs inside the gateway process. Two loops:
+ *
+ * 1. Publish loop (every 15s): scans crosspost_log for rows with
+ *    status='ready' OR (status='queued' AND scheduled_at <= now). For
+ *    each row, dispatches the stored `transformed_payload_json` to the
+ *    target bundle's public API using PUBLISHERS below. Marks the row
+ *    'published' with target_post_id on success; 'error' with the error
+ *    string on failure.
+ *
+ * 2. GC loop (every 1h): deletes crosspost_log rows older than 30 days.
+ *    Also expires moderation_actions in 'pending' state past their
+ *    expires_at (sweeps to 'expired'; F.11 documents this TTL sweep).
+ *
+ * Design points:
+ * - Publishers are plain HTTP calls to the target app's REST API using
+ *   the bundle's own env vars (MASTODON_URL + MASTODON_ACCESS_TOKEN,
+ *   GTS_URL + GTS_ACCESS_TOKEN, etc.). This bypasses the MCP layer
+ *   entirely — no in-process MCP client needed. Simpler and survives
+ *   MCP transport changes.
+ * - Media-heavy targets (pixelfed photo posts, peertube uploads,
+ *   funkwhale track uploads) need binary file data we don't have stored.
+ *   These remain OPERATOR-DRIVEN: scheduler marks them status='manual'
+ *   and leaves them alone. The `crow_crosspost_mark_published` tool
+ *   closes the audit log after the operator publishes by hand.
+ * - Text targets that need no extra context (mastodon, gotosocial,
+ *   crow-blog) are fully automated. writefreely, lemmy, and
+ *   matrix-dendrite are text-only but stay in MANUAL_TARGETS until
+ *   their context wiring (collection/community/room) ships.
+ * - Overlap protection: a poll that's still running when the next tick
+ *   fires is a no-op (in-flight set).
+ * - Disabled via CROW_DISABLE_CROSSPOST_SCHEDULER=1 for testing.
+ */
+
+import { createDbClient } from "../../db.js";
+import { createNotification } from "../../shared/notifications.js";
+
+const PUBLISH_INTERVAL_MS = 15_000;
+const GC_INTERVAL_MS = 3600_000; // 1 hour
+const MAX_BATCH = 20;
+const LOG_RETENTION_DAYS = 30;
+
+// --- Publishers ---
+
+async function publishMastodon(payload) {
+  const url = (process.env.MASTODON_URL || "http://mastodon-web:3000").replace(/\/+$/, "");
+  const token = process.env.MASTODON_ACCESS_TOKEN;
+  if (!token) throw new Error("MASTODON_ACCESS_TOKEN not set");
+  const body = {
+    status: payload.status,
+    visibility: payload.visibility || "public",
+    ...(payload.spoiler_text ? { spoiler_text: payload.spoiler_text } : {}),
+    ...(payload.language ? { language: payload.language } : {}),
+    ...(payload.sensitive != null ?
+      { sensitive: payload.sensitive } : {}),
+  };
+  const res = await fetch(`${url}/api/v1/statuses`, {
+    method: "POST",
+    headers: { Authorization: `Bearer ${token}`, "Content-Type": "application/json" },
+    body: JSON.stringify(body),
+  });
+  if (!res.ok) throw new Error(`Mastodon ${res.status}: ${(await res.text()).slice(0, 300)}`);
+  const data = await res.json();
+  return { target_post_id: String(data.id), url: data.url };
+}
+
+async function publishGoToSocial(payload) {
+  const url = (process.env.GTS_URL || "http://gotosocial:8080").replace(/\/+$/, "");
+  const token = process.env.GTS_ACCESS_TOKEN;
+  if (!token) throw new Error("GTS_ACCESS_TOKEN not set");
+  const body = {
+    status: payload.status,
+    visibility: payload.visibility || "public",
+    ...(payload.spoiler_text ? { spoiler_text: payload.spoiler_text } : {}),
+  };
+  const res = await fetch(`${url}/api/v1/statuses`, {
+    method: "POST",
+    headers: { Authorization: `Bearer ${token}`, "Content-Type": "application/json" },
+    body: JSON.stringify(body),
+  });
+  if (!res.ok) throw new Error(`GoToSocial ${res.status}: ${(await res.text()).slice(0, 300)}`);
+  const data = await res.json();
+  return { target_post_id: String(data.id), url: data.url };
+}
+
+async function publishBlog(payload) {
+  // crow-blog is in-process; use the local DB directly rather than
+  // calling our own REST endpoint.
+  const db = createDbClient();
+  try {
+    const title = payload.status?.split("\n")[0]?.slice(0, 200) || "Cross-post";
+    const content = payload.status || "";
+    const slug = title.toLowerCase().replace(/[^a-z0-9]+/g, "-").replace(/^-+|-+$/g, "").slice(0, 100)
+      + "-" + Date.now().toString(36);
+    const result = await db.execute({
+      sql: `INSERT INTO blog_posts (title, slug, content, status, visibility, created_at, updated_at, published_at)
+            VALUES (?, ?, ?, 'published', 'public', datetime('now'), datetime('now'), datetime('now'))
+            RETURNING id`,
+      args: [title, slug, content],
+    });
+    const id = Number(result.rows[0].id);
+    return { target_post_id: String(id), url: `/blog/${slug}` };
+  } finally {
+    try { db.close(); } catch {}
+  }
+}
+
+const PUBLISHERS = {
+  mastodon: publishMastodon,
+  gotosocial: publishGoToSocial,
+  blog: publishBlog,
+};
+
+// Targets we recognize but can't auto-publish (need data we didn't store)
+const MANUAL_TARGETS = new Set([
+  "pixelfed",        // photo upload needs binary
+  "peertube",        // video upload needs binary
+  "funkwhale",       // track upload needs binary
+  "writefreely",     // text-only but needs collection alias wiring; TODO in follow-up
+  "lemmy",           // text-only but needs community_id in payload; TODO
+  "matrix-dendrite", // needs room_id; TODO
+]);
+
+function publisherFor(targetApp) {
+  if (PUBLISHERS[targetApp]) return PUBLISHERS[targetApp];
+  // Known-manual and unknown targets both return null: the caller
+  // marks the row status='manual' rather than guessing.
+  if (MANUAL_TARGETS.has(targetApp)) return null;
+  return null;
+}
+
+// --- Publish loop ---
+
+const inFlight = new Set();
+
+async function publishOne(db, row) {
+  if (inFlight.has(row.id)) return;
+  inFlight.add(row.id);
+  try {
+    if (!row.transformed_payload_json) {
+      // Legacy F.12 row without stored payload — mark as manual.
+      await db.execute({
+        sql: "UPDATE crosspost_log SET status = 'manual', error = ?
+               WHERE id = ?",
+        args: ["no_transformed_payload_stored (row pre-dates F.13 migration)", row.id],
+      });
+      return;
+    }
+
+    const publisher = publisherFor(row.target_app);
+    if (!publisher) {
+      await db.execute({
+        sql: "UPDATE crosspost_log SET status = 'manual' WHERE id = ?",
+        args: [row.id],
+      });
+      return;
+    }
+
+    const payload = JSON.parse(row.transformed_payload_json);
+    const { target_post_id, url } = await publisher(payload);
+    const now = Math.floor(Date.now() / 1000);
+    await db.execute({
+      sql: `UPDATE crosspost_log SET status = 'published', target_post_id = ?, published_at = ?, error = NULL
+            WHERE id = ? AND status != 'cancelled'`,
+      args: [target_post_id, now, row.id],
+    });
+    try {
+      await createNotification(db, {
+        title: `Cross-post published to ${row.target_app}`,
+        body: `${row.source_app}#${row.source_post_id} → ${row.target_app}#${target_post_id}${url ? ` (${url})` : ""}`,
+        type: "peer",
+        source: "crosspost",
+        priority: "low",
+      });
+    } catch {}
+  } catch (err) {
+    const msg = String(err?.message || err).slice(0, 1000);
+    try {
+      await db.execute({
+        sql: "UPDATE crosspost_log SET status = 'error', error = ? WHERE id = ?
+               AND status != 'cancelled'",
+        args: [msg, row.id],
+      });
+    } catch {}
+    try {
+      await createNotification(db, {
+        title: `Cross-post failed to ${row.target_app}`,
+        body: `${row.source_app}#${row.source_post_id}: ${msg.slice(0, 200)}`,
+        type: "system",
+        source: "crosspost",
+        priority: "high",
+      });
+    } catch {}
+  } finally {
+    inFlight.delete(row.id);
+  }
+}
+
+async function publishTick(db) {
+  const now = Math.floor(Date.now() / 1000);
+  const rows = await db.execute({
+    sql: `SELECT id, source_app, source_post_id, target_app, transformed_payload_json, status, scheduled_at
+            FROM crosspost_log
+           WHERE (status = 'ready' OR (status = 'queued' AND scheduled_at <= ?))
+           ORDER BY scheduled_at ASC LIMIT ?`,
+    args: [now, MAX_BATCH],
+  });
+  for (const r of rows.rows) {
+    const row = {
+      id: Number(r.id),
+      source_app: r.source_app,
+      source_post_id: r.source_post_id,
+      target_app: r.target_app,
+      transformed_payload_json: r.transformed_payload_json,
+    };
+    publishOne(db, row).catch(() => {});
+  }
+}
+
+// --- GC loop ---
+
+async function gcTick(db) {
+  try {
+    const cutoff = Math.floor(Date.now() / 1000) - LOG_RETENTION_DAYS * 86400;
+    const res = await db.execute({
+      sql: "DELETE FROM crosspost_log WHERE created_at < ?
+             AND status IN ('published', 'cancelled', 'error', 'manual')",
+      args: [cutoff],
+    });
+    if (res.rowsAffected > 0) {
+      // eslint-disable-next-line no-console
+      console.log(`[crosspost-gc] pruned ${res.rowsAffected} crosspost_log rows older than ${LOG_RETENTION_DAYS}d`);
+    }
+  } catch (err) {
+    console.warn(`[crosspost-gc] error: ${err.message}`);
+  }
+
+  try {
+    // F.11 moderation_actions TTL sweep — pending rows past expires_at → expired
+    const now = Math.floor(Date.now() / 1000);
+    const res = await db.execute({
+      sql: "UPDATE moderation_actions SET status = 'expired' WHERE status = 'pending' AND expires_at < ?",
+      args: [now],
+    });
+    if (res.rowsAffected > 0) {
+      console.log(`[moderation-gc] expired ${res.rowsAffected} moderation_actions past their TTL`);
+    }
+  } catch (err) {
+    console.warn(`[moderation-gc] error: ${err.message}`);
+  }
+}
+
+// --- Exported start/stop ---
+
+let publishTimer = null;
+let gcTimer = null;
+
+export function startCrosspostScheduler(opts = {}) {
+  if (process.env.CROW_DISABLE_CROSSPOST_SCHEDULER === "1") {
+    console.log("[crosspost-scheduler] disabled via CROW_DISABLE_CROSSPOST_SCHEDULER=1");
+    return { stop() {} };
+  }
+  const publishMs = opts.publishIntervalMs || PUBLISH_INTERVAL_MS;
+  const gcMs = opts.gcIntervalMs || GC_INTERVAL_MS;
+
+  const runPublish = async () => {
+    const db = createDbClient();
+    try { await publishTick(db); }
+    catch (err) { console.warn(`[crosspost-scheduler] tick error: ${err.message}`); }
+    finally { try { db.close(); } catch {} }
+  };
+
+  const runGc = async () => {
+    const db = createDbClient();
+    try { await gcTick(db); }
+    finally { try { db.close(); } catch {} }
+  };
+
+  // Kick off one GC on start to normalize stale state
+  runGc().catch(() => {});
+
+  publishTimer = setInterval(runPublish, publishMs);
+  gcTimer = setInterval(runGc, gcMs);
+  console.log(`[crosspost-scheduler] started (publish ${publishMs / 1000}s, gc ${gcMs / 1000}s)`);
+
+  return {
+    stop() {
+      if (publishTimer)
+        clearInterval(publishTimer);
+      if (gcTimer) clearInterval(gcTimer);
+      publishTimer = null;
+      gcTimer = null;
+    },
+  };
+}
+
+// Exported for tests
+export const _internal = { publishTick, gcTick, publishOne, PUBLISHERS, MANUAL_TARGETS };
diff --git a/servers/gateway/index.js b/servers/gateway/index.js
index 958136b..9930b84 100644
--- a/servers/gateway/index.js
+++ b/servers/gateway/index.js
@@ -140,6 +140,20 @@ try {
   // audit_log table may not exist yet (first run before init-db)
 }
 
+// Run startup migrations (idempotent; each migration self-tracks via dashboard_settings.migrations)
+try {
+  const { runGatewayMigrations } = await import("./migrations.js");
+  const _migDb = createDbClient();
+  const results = await runGatewayMigrations(_migDb);
+  _migDb.close();
+  for (const r of results) {
+    if (r.ran) console.log(`[migrations] ${r.id}: applied (profile="${r.profileName}", voice=${r.voice})`);
+    else if (r.error) console.warn(`[migrations] ${r.id}: FAILED — ${r.error}`);
+  }
+} catch (e) {
+  console.warn("[migrations] startup migrations skipped:", e.message);
+}
+
 // Consolidated session manager
 const sessionManager = new SessionManager();
diff --git a/servers/sharing/server.js b/servers/sharing/server.js
index 5a969ee..d8422f2 100644
--- a/servers/sharing/server.js
+++ b/servers/sharing/server.js
@@ -2505,12 +2505,13 @@ export function createSharingServer(dbPath, options = {}) {
       const inserted = await db.execute({
         sql: `INSERT INTO crosspost_log
                 (idempotency_key, source_app, source_post_id, target_app,
-                 transform, status, scheduled_at, created_at)
-               VALUES (?, ?, ?, ?, ?, ?, ?, ?)
+                 transform, status, scheduled_at, created_at, transformed_payload_json)
+               VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
               RETURNING id`,
         args: [
           idempotency_key, source_app, source_post_id, target_app,
           `${source_app}→${target_app}`, status, scheduledAt, now,
+          JSON.stringify(transformed),
         ],
       });
       const logId = Number(inserted.rows[0].id);
diff --git a/skills/crow-crosspost.md b/skills/crow-crosspost.md
index eb5ad83..c887bf93 100644
--- a/skills/crow-crosspost.md
+++ b/skills/crow-crosspost.md
@@ -98,7 +98,7 @@ Operator can cancel before `scheduled_at`:
 crow_crosspost_cancel { "log_id": <id> }
 ```
 
-After `scheduled_at` passes, a future dispatcher (not yet shipped — lands in a follow-up) calls the target app's publish verb + `crow_crosspost_mark_published`.
+After `scheduled_at` passes, the F.13 scheduler inside the gateway auto-publishes the stored `transformed_payload_json` to the target bundle (text targets: mastodon, gotosocial, crow-blog). Media-heavy targets (pixelfed photos, peertube videos, funkwhale tracks) stay status=`manual` because the scheduler doesn't have the binary file data to upload — the operator invokes the target's own upload tool + `crow_crosspost_mark_published` by hand.
 
 ### List recent cross-posts
@@ -123,4 +123,4 @@ If you've attested your handles on the source and target apps, cross-posts inher
 
 ## Log retention
 
-Entries >30 days are garbage-collected by the daily cleanup sweeper (not yet wired — manual `DELETE FROM crosspost_log WHERE created_at < strftime('%s', 'now', '-30 days')` until F.12.3 or similar).
+Entries >30 days are garbage-collected by the F.13 scheduler's hourly GC tick (only prunes terminal statuses: published/cancelled/error/manual — in-flight queued/ready rows are kept). The same tick sweeps `moderation_actions` past their 72h expires_at into `status='expired'`.
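
Reviewer note (not part of the patch): the routing decision in `publishOne` / `publisherFor` reduces to three checks. A minimal standalone sketch, with `PUBLISHERS` stubbed to booleans and `routeRow` a hypothetical helper introduced here for illustration only:

```javascript
// Condensed routing logic, mirroring publishOne's guards in scheduler.js.
const PUBLISHERS = { mastodon: true, gotosocial: true, blog: true };
const MANUAL_TARGETS = new Set([
  "pixelfed", "peertube", "funkwhale",       // need binary payloads we never stored
  "writefreely", "lemmy", "matrix-dendrite", // need target-specific context
]);

function routeRow(row) {
  if (!row.transformed_payload_json) return "manual"; // pre-F.13 row: no stored payload
  if (PUBLISHERS[row.target_app]) return "publish";   // auto-publishable text target
  if (MANUAL_TARGETS.has(row.target_app)) return "manual";
  return "manual"; // unknown target: never guess, leave it to the operator
}
```

The point of the last branch: an unrecognized target degrades to `manual` rather than `error`, so a new bundle name added to crosspost_rules before its publisher ships still leaves a completable audit row.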