|
| 1 | +import { AttachmentStatus, PrismaClient } from "@sourcebot/db"; |
| 2 | +import { createLogger, env, getStorageBackend } from "@sourcebot/shared"; |
| 3 | +import { setIntervalAsync } from "./utils.js"; |
| 4 | + |
| 5 | +const BATCH_SIZE = 1_000; |
| 6 | +const ONE_HOUR_MS = 60 * 60 * 1000; |
| 7 | + |
| 8 | +const logger = createLogger('attachment-pruner'); |
| 9 | + |
| 10 | +/** |
| 11 | + * Periodically reclaims orphaned attachment blobs older than the configured TTL, |
| 12 | + * along with their stored bytes, using the `DELETING` tombstone protocol: an |
| 13 | + * orphan is first atomically flipped to `DELETING`, then its bytes are deleted, |
| 14 | + * and only then is the row removed. Because the row (the only durable handle to |
| 15 | + * the bytes) outlives the byte delete, a failed byte delete is always retryable. |
| 16 | + * |
| 17 | + * Each tick condemns two classes of orphan to `DELETING`, then reclaims all |
| 18 | + * tombstones: |
| 19 | + * |
| 20 | + * 1. PENDING (uploaded-but-never-linked): produced when a user selects a file |
| 21 | + * in the chat box but never sends the message. |
| 22 | + * 2. COMMITTED with zero links: normally a committed blob is reclaimed inline |
| 23 | + * by the chat-delete sweep in the web app, but if that sweep is interrupted |
| 24 | + * (process crash / DB error / failed byte delete) the blob is left tombstoned |
| 25 | + * or unlinked. This is the backstop for that case. |
| 26 | + * |
| 27 | + * @note Byte deletion goes through the shared `StorageBackend`, so the web app |
| 28 | + * and this worker share one on-disk layout. |
| 29 | + */ |
| 30 | +export class AttachmentPruner { |
| 31 | + private interval?: NodeJS.Timeout; |
| 32 | + private readonly storage = getStorageBackend(); |
| 33 | + |
| 34 | + constructor(private db: PrismaClient) {} |
| 35 | + |
| 36 | + startScheduler() { |
| 37 | + const ttlHours = env.SOURCEBOT_CHAT_ATTACHMENT_ORPHAN_TTL_HOURS; |
| 38 | + if (ttlHours <= 0) { |
| 39 | + logger.info('SOURCEBOT_CHAT_ATTACHMENT_ORPHAN_TTL_HOURS is 0, attachment orphan pruning is disabled.'); |
| 40 | + return; |
| 41 | + } |
| 42 | + |
| 43 | + logger.debug(`Attachment pruner started. Reclaiming orphaned attachments older than ${ttlHours} hours.`); |
| 44 | + |
| 45 | + // Run immediately on startup, then every hour. The startup call isn't |
| 46 | + // awaited, so log any failure here: this worker exits on |
| 47 | + // unhandledRejection, and the recurring schedule will retry. |
| 48 | + this.pruneOrphanedAttachments().catch((error) => { |
| 49 | + logger.warn(`Initial attachment prune failed: ${error}`); |
| 50 | + }); |
| 51 | + this.interval = setIntervalAsync(() => this.pruneOrphanedAttachments(), ONE_HOUR_MS); |
| 52 | + } |
| 53 | + |
| 54 | + async dispose() { |
| 55 | + if (this.interval) { |
| 56 | + clearInterval(this.interval); |
| 57 | + this.interval = undefined; |
| 58 | + } |
| 59 | + } |
| 60 | + |
| 61 | + private async pruneOrphanedAttachments() { |
| 62 | + const cutoff = new Date(Date.now() - env.SOURCEBOT_CHAT_ATTACHMENT_ORPHAN_TTL_HOURS * ONE_HOUR_MS); |
| 63 | + |
| 64 | + // Condemn orphans by flipping them to the DELETING tombstone. Each claim |
| 65 | + // is atomic, so a PENDING blob committed by a concurrent send (its commit |
| 66 | + // matches only PENDING rows) or a zero-link blob re-linked by a concurrent |
| 67 | + // duplicate-chat loses the claim and is left intact. |
| 68 | + // |
| 69 | + // PENDING orphans: uploaded but the message was never sent. |
| 70 | + const pendingClaimed = await this.db.attachment.updateMany({ |
| 71 | + where: { |
| 72 | + status: AttachmentStatus.PENDING, |
| 73 | + createdAt: { lt: cutoff }, |
| 74 | + }, |
| 75 | + data: { status: AttachmentStatus.DELETING }, |
| 76 | + }); |
| 77 | + |
| 78 | + // COMMITTED orphans: blobs left with zero links by an interrupted |
| 79 | + // chat-delete sweep in the web app. |
| 80 | + const committedClaimed = await this.db.attachment.updateMany({ |
| 81 | + where: { |
| 82 | + status: AttachmentStatus.COMMITTED, |
| 83 | + createdAt: { lt: cutoff }, |
| 84 | + chats: { none: {} }, |
| 85 | + }, |
| 86 | + data: { status: AttachmentStatus.DELETING }, |
| 87 | + }); |
| 88 | + |
| 89 | + // Reclaim every tombstone: delete bytes, then the row. This also picks up |
| 90 | + // tombstones left behind by the web app's inline reclaim (or a crashed |
| 91 | + // earlier tick) whose byte delete failed. |
| 92 | + const reclaimed = await this.reclaimTombstonedAttachments(); |
| 93 | + |
| 94 | + if (pendingClaimed.count > 0 || committedClaimed.count > 0 || reclaimed > 0) { |
| 95 | + logger.debug( |
| 96 | + `Attachment prune: condemned ${pendingClaimed.count} PENDING + ` + |
| 97 | + `${committedClaimed.count} COMMITTED orphan(s), reclaimed ${reclaimed} tombstone(s).`, |
| 98 | + ); |
| 99 | + } |
| 100 | + } |
| 101 | + |
| 102 | + /** |
| 103 | + * Deletes the bytes for every `DELETING` tombstone, then removes the row. |
| 104 | + * The row (the only durable handle to the bytes) is removed only after its |
| 105 | + * bytes are confirmed gone, so a failed byte delete leaves the tombstone in |
| 106 | + * place to be retried on the next tick — bytes can never be orphaned by a |
| 107 | + * transient storage error. Rows whose byte delete fails this run are |
| 108 | + * excluded from subsequent batches so a persistent failure can't spin the |
| 109 | + * loop. |
| 110 | + * |
| 111 | + * @returns the number of tombstones fully reclaimed (bytes + row). |
| 112 | + */ |
| 113 | + private async reclaimTombstonedAttachments(): Promise<number> { |
| 114 | + let totalReclaimed = 0; |
| 115 | + const failedIds: string[] = []; |
| 116 | + |
| 117 | + while (true) { |
| 118 | + const batch = await this.db.attachment.findMany({ |
| 119 | + where: { status: AttachmentStatus.DELETING, id: { notIn: failedIds } }, |
| 120 | + select: { id: true, storageKey: true }, |
| 121 | + take: BATCH_SIZE, |
| 122 | + }); |
| 123 | + |
| 124 | + if (batch.length === 0) { |
| 125 | + break; |
| 126 | + } |
| 127 | + |
| 128 | + const settled = await Promise.allSettled( |
| 129 | + batch.map((attachment) => this.storage.delete(attachment.storageKey))); |
| 130 | + |
| 131 | + const reclaimedIds: string[] = []; |
| 132 | + batch.forEach((attachment, index) => { |
| 133 | + const outcome = settled[index]; |
| 134 | + if (outcome.status === 'fulfilled') { |
| 135 | + reclaimedIds.push(attachment.id); |
| 136 | + } else { |
| 137 | + logger.warn(`Failed to delete bytes for tombstoned attachment ${attachment.id}, will retry next tick: ${outcome.reason}`); |
| 138 | + failedIds.push(attachment.id); |
| 139 | + } |
| 140 | + }); |
| 141 | + |
| 142 | + if (reclaimedIds.length > 0) { |
| 143 | + const result = await this.db.attachment.deleteMany({ |
| 144 | + where: { id: { in: reclaimedIds }, status: AttachmentStatus.DELETING }, |
| 145 | + }); |
| 146 | + totalReclaimed += result.count; |
| 147 | + } |
| 148 | + |
| 149 | + if (batch.length < BATCH_SIZE) { |
| 150 | + break; |
| 151 | + } |
| 152 | + } |
| 153 | + |
| 154 | + return totalReclaimed; |
| 155 | + } |
| 156 | +} |
0 commit comments