From e9c1699bfd7f3f1b999a4c9ebeb1f442218aa120 Mon Sep 17 00:00:00 2001 From: Ido Shamun <1993245+idoshamun@users.noreply.github.com> Date: Wed, 18 Mar 2026 18:14:58 +0200 Subject: [PATCH 1/5] Add channel highlights generation flow --- .infra/crons.ts | 4 + __tests__/cron/channelHighlights.ts | 85 ++ __tests__/workers/generateChannelHighlight.ts | 487 ++++++++++ src/common/channelHighlight/definitions.ts | 35 + src/common/channelHighlight/evaluate.ts | 127 +++ src/common/channelHighlight/generate.ts | 883 ++++++++++++++++++ src/common/channelHighlight/publish.ts | 32 + src/common/channelHighlight/schema.ts | 39 + src/common/typedPubsub.ts | 4 + src/cron/channelHighlights.ts | 27 + src/cron/index.ts | 2 + src/entity/ChannelHighlightDefinition.ts | 32 + src/entity/ChannelHighlightRun.ts | 50 + src/entity/ChannelHighlightState.ts | 20 + src/entity/index.ts | 3 + .../1773000000000-ChannelHighlights.ts | 84 ++ src/workers/generateChannelHighlight.ts | 96 ++ src/workers/index.ts | 2 + 18 files changed, 2012 insertions(+) create mode 100644 __tests__/cron/channelHighlights.ts create mode 100644 __tests__/workers/generateChannelHighlight.ts create mode 100644 src/common/channelHighlight/definitions.ts create mode 100644 src/common/channelHighlight/evaluate.ts create mode 100644 src/common/channelHighlight/generate.ts create mode 100644 src/common/channelHighlight/publish.ts create mode 100644 src/common/channelHighlight/schema.ts create mode 100644 src/cron/channelHighlights.ts create mode 100644 src/entity/ChannelHighlightDefinition.ts create mode 100644 src/entity/ChannelHighlightRun.ts create mode 100644 src/entity/ChannelHighlightState.ts create mode 100644 src/migration/1773000000000-ChannelHighlights.ts create mode 100644 src/workers/generateChannelHighlight.ts diff --git a/.infra/crons.ts b/.infra/crons.ts index 57e652f97d..5d4ade5d21 100644 --- a/.infra/crons.ts +++ b/.infra/crons.ts @@ -174,6 +174,10 @@ export const crons: Cron[] = [ name: 'channel-digests', schedule: '17 4 * * *', }, + { + name: 'channel-highlights', + schedule: '12 * * * *', + }, { name: 'clean-expired-better-auth-sessions', schedule: '0 3 * * *', diff --git a/__tests__/cron/channelHighlights.ts b/__tests__/cron/channelHighlights.ts new file mode 100644 index 0000000000..6ab1d72d3d --- /dev/null +++ b/__tests__/cron/channelHighlights.ts @@ -0,0 +1,85 @@ +import type { DataSource } from 'typeorm'; +import createOrGetConnection from '../../src/db'; +import { ChannelHighlightDefinition } from '../../src/entity/ChannelHighlightDefinition'; +import * as typedPubsub from '../../src/common/typedPubsub'; +import * as channelHighlightsModule from '../../src/cron/channelHighlights'; +import { crons } from '../../src/cron/index'; + +let con: DataSource; + +beforeAll(async () => { + con = await createOrGetConnection(); +}); + +describe('channelHighlights cron', () => { + afterEach(async () => { + jest.restoreAllMocks(); + await con.getRepository(ChannelHighlightDefinition).clear(); + }); + + it('should be registered', () => { + const registeredCron = crons.find( + (item) => item.name === channelHighlightsModule.default.name, + ); + + expect(registeredCron).toBeDefined(); + }); + + it('should enqueue enabled highlight definitions', async () => { + jest + .spyOn(channelHighlightsModule, 'getChannelHighlightsNow') + .mockReturnValue(new Date('2026-03-02T10:00:00.000Z')); + const triggerTypedEventSpy = jest + .spyOn(typedPubsub, 'triggerTypedEvent') + .mockResolvedValue(); + + await con.getRepository(ChannelHighlightDefinition).save([ + { + channel: 'backend', + enabled: true, + mode: 'shadow', + candidateHorizonHours: 72, + maxItems: 10, + }, + { + channel: 'vibes', + enabled: true, + mode: 'shadow', + candidateHorizonHours: 72, + maxItems: 10, + }, + { + channel: 'disabled', + enabled: false, + mode: 'shadow', + candidateHorizonHours: 72, + maxItems: 10, + }, + ]); + + await channelHighlightsModule.default.handler( + con, + {} as never, + {} as never, + ); + + expect(triggerTypedEventSpy.mock.calls).toEqual([ + [ + {}, + 'api.v1.generate-channel-highlight', + { + channel: 'backend', + scheduledAt: '2026-03-02T10:00:00.000Z', + }, + ], + [ + {}, + 'api.v1.generate-channel-highlight', + { + channel: 'vibes', + scheduledAt: '2026-03-02T10:00:00.000Z', + }, + ], + ]); + }); +}); diff --git a/__tests__/workers/generateChannelHighlight.ts b/__tests__/workers/generateChannelHighlight.ts new file mode 100644 index 0000000000..d2c8dbfb0e --- /dev/null +++ b/__tests__/workers/generateChannelHighlight.ts @@ -0,0 +1,487 @@ +import type { DataSource } from 'typeorm'; +import createOrGetConnection from '../../src/db'; +import { ChannelHighlightDefinition } from '../../src/entity/ChannelHighlightDefinition'; +import { ChannelHighlightRun } from '../../src/entity/ChannelHighlightRun'; +import { ChannelHighlightState } from '../../src/entity/ChannelHighlightState'; +import { PostHighlight } from '../../src/entity/PostHighlight'; +import { CollectionPost, ArticlePost, Source } from '../../src/entity'; +import { + PostRelation, + PostRelationType, +} from '../../src/entity/posts/PostRelation'; +import { PostType } from '../../src/entity/posts/Post'; +import worker from '../../src/workers/generateChannelHighlight'; +import { typedWorkers } from '../../src/workers/index'; +import { deleteKeysByPattern } from '../../src/redis'; +import * as evaluator from '../../src/common/channelHighlight/evaluate'; +import { expectSuccessfulTypedBackground } from '../helpers'; +import { createSource } from '../fixture/source'; + +let con: DataSource; + +beforeAll(async () => { + con = await createOrGetConnection(); +}); + +const saveArticle = async ({ + id, + title, + createdAt, + statsUpdatedAt = createdAt, + metadataChangedAt = createdAt, + channel = 'vibes', + sourceId = 'content-source', +}: { + id: string; + title: string; + createdAt: Date; + statsUpdatedAt?: Date; + metadataChangedAt?: Date; + channel?: string; + sourceId?: string; +}) => + con.getRepository(ArticlePost).save({ + id, + shortId: id, + title, + url: `https://example.com/${id}`, + canonicalUrl: `https://example.com/${id}`, + score: 0, + sourceId, + visible: true, + deleted: false, + banned: false, + showOnFeed: true, + createdAt, + metadataChangedAt, + statsUpdatedAt, + type: PostType.Article, + contentMeta: { + channels: [channel], + }, + }); + +const saveCollection = async ({ + id, + title, + createdAt, + channel = 'vibes', + sourceId = 'content-source', +}: { + id: string; + title: string; + createdAt: Date; + channel?: string; + sourceId?: string; +}) => + con.getRepository(CollectionPost).save({ + id, + shortId: id, + title, + url: `https://example.com/${id}`, + canonicalUrl: `https://example.com/${id}`, + score: 0, + sourceId, + visible: true, + deleted: false, + banned: false, + showOnFeed: true, + createdAt, + metadataChangedAt: createdAt, + statsUpdatedAt: createdAt, + type: PostType.Collection, + contentMeta: { + channels: [channel], + }, + }); + +describe('generateChannelHighlight worker', () => { + beforeEach(async () => { + await con + .getRepository(Source) + .save([ + createSource( + 'content-source', + 'Content', + 'https://daily.dev/content.png', + ), + createSource( + 'secondary-source', + 'Secondary', + 'https://daily.dev/secondary.png', + ), + ]); + }); + + afterEach(async () => { + jest.restoreAllMocks(); + await deleteKeysByPattern('channel-highlight:*'); + await con.getRepository(ChannelHighlightRun).clear(); + await con.getRepository(ChannelHighlightState).clear(); + await con.getRepository(ChannelHighlightDefinition).clear(); + await con.getRepository(PostHighlight).clear(); + await con.getRepository(PostRelation).clear(); + await con + .createQueryBuilder() + .delete() + .from('post') + .where('"sourceId" IN (:...sourceIds)', { + sourceIds: ['content-source', 'secondary-source'], + }) + .execute(); + await con + .getRepository(Source) + .delete(['content-source', 'secondary-source']); + }); + + it('should be registered', () => { + const registeredWorker = typedWorkers.find( + (item) => item.subscription === worker.subscription, + ); + + expect(registeredWorker).toBeDefined(); + }); + + it('should keep live highlights unchanged in shadow mode and store a comparison run', async () => { + const now = new Date('2026-03-03T10:00:00.000Z'); + await con.getRepository(ChannelHighlightDefinition).save({ + channel: 'vibes', + enabled: true, + mode: 'shadow', + candidateHorizonHours: 72, + maxItems: 3, + }); + await saveArticle({ + id: 'live-1', + title: 'Live story', + createdAt: new Date('2026-03-03T08:00:00.000Z'), + }); + await saveCollection({ + id: 'collection-1', + title: 'Fresh collection', + createdAt: new Date('2026-03-03T09:30:00.000Z'), + }); + await saveArticle({ + id: 'child-1', + title: 'Fresh child story', + createdAt: new Date('2026-03-03T09:20:00.000Z'), + }); + await con.getRepository(PostRelation).save({ + postId: 'collection-1', + relatedPostId: 'child-1', + type: PostRelationType.Collection, + }); + await con.getRepository(PostHighlight).save({ + channel: 'vibes', + postId: 'live-1', + rank: 1, + headline: 'Live headline', + }); + + const evaluatorSpy = jest + .spyOn(evaluator, 'evaluateChannelHighlights') + .mockImplementation(async ({ candidates }) => ({ + items: candidates.slice(0, 1).map((candidate, index) => ({ + storyKey: candidate.storyKey, + postId: candidate.canonicalPostId, + headline: `${candidate.title} headline`, + significanceScore: 0.9, + significanceLabel: 'breaking', + rank: index + 1, + reason: 'test', + })), + })); + + await expectSuccessfulTypedBackground<'api.v1.generate-channel-highlight'>( + worker, + { + channel: 'vibes', + scheduledAt: now.toISOString(), + }, + ); + + expect(evaluatorSpy).toHaveBeenCalledTimes(1); + expect(evaluatorSpy.mock.calls[0][0].candidates[0]).toMatchObject({ + storyKey: 'collection:collection-1', + canonicalPostId: 'collection-1', + memberPostIds: ['child-1', 'collection-1'], + }); + + const liveHighlights = await con.getRepository(PostHighlight).find({ + where: { channel: 'vibes' }, + order: { rank: 'ASC' }, + }); + expect(liveHighlights).toHaveLength(1); + expect(liveHighlights[0]).toMatchObject({ + postId: 'live-1', + headline: 'Live headline', + }); + + const run = await con.getRepository(ChannelHighlightRun).findOneByOrFail({ + channel: 'vibes', + }); + expect(run.status).toBe('completed'); + expect(run.comparison).toMatchObject({ + wouldPublish: true, + published: false, + baselineCount: 1, + internalCount: 1, + }); + + const state = await con + .getRepository(ChannelHighlightState) + .findOneByOrFail({ + channel: 'vibes', + }); + expect(state.candidatePool).toEqual({ + stories: expect.arrayContaining([ + expect.objectContaining({ + storyKey: 'collection:collection-1', + canonicalPostId: 'collection-1', + }), + ]), + }); + }); + + it('should publish evaluated highlights in publish mode', async () => { + const now = new Date('2026-03-03T11:00:00.000Z'); + await con.getRepository(ChannelHighlightDefinition).save({ + channel: 'vibes', + enabled: true, + mode: 'publish', + candidateHorizonHours: 72, + maxItems: 3, + }); + await saveArticle({ + id: 'publish-1', + title: 'Publish me', + createdAt: new Date('2026-03-03T10:30:00.000Z'), + }); + + jest.spyOn(evaluator, 'evaluateChannelHighlights').mockResolvedValue({ + items: [ + { + storyKey: 'url:https://example.com/publish-1', + postId: 'publish-1', + headline: 'Publish headline', + significanceScore: 0.95, + significanceLabel: 'breaking', + rank: 1, + reason: 'test', + }, + ], + }); + + await expectSuccessfulTypedBackground<'api.v1.generate-channel-highlight'>( + worker, + { + channel: 'vibes', + scheduledAt: now.toISOString(), + }, + ); + + const liveHighlights = await con.getRepository(PostHighlight).find({ + where: { channel: 'vibes' }, + order: { rank: 'ASC' }, + }); + expect(liveHighlights).toHaveLength(1); + expect(liveHighlights[0]).toMatchObject({ + postId: 'publish-1', + headline: 'Publish headline', + rank: 1, + }); + }); + + it('should publish a collection upgrade for the same story', async () => { + const now = new Date('2026-03-03T11:30:00.000Z'); + await con.getRepository(ChannelHighlightDefinition).save({ + channel: 'vibes', + enabled: true, + mode: 'publish', + candidateHorizonHours: 72, + maxItems: 3, + }); + await saveCollection({ + id: 'col-upgrade', + title: 'Collection upgrade', + createdAt: new Date('2026-03-03T11:10:00.000Z'), + }); + await saveArticle({ + id: 'child-upgr', + title: 'Child upgrade', + createdAt: new Date('2026-03-03T11:05:00.000Z'), + }); + await con.getRepository(PostRelation).save({ + postId: 'col-upgrade', + relatedPostId: 'child-upgr', + type: PostRelationType.Collection, + }); + await con.getRepository(PostHighlight).save({ + channel: 'vibes', + postId: 'child-upgr', + rank: 1, + headline: 'Same story headline', + }); + + jest.spyOn(evaluator, 'evaluateChannelHighlights').mockResolvedValue({ + items: [ + { + storyKey: 'collection:col-upgrade', + postId: 'col-upgrade', + headline: 'Same story headline', + significanceScore: 0.96, + significanceLabel: 'breaking', + rank: 1, + reason: 'test', + }, + ], + }); + + await expectSuccessfulTypedBackground<'api.v1.generate-channel-highlight'>( + worker, + { + channel: 'vibes', + scheduledAt: now.toISOString(), + }, + ); + + const liveHighlights = await con.getRepository(PostHighlight).find({ + where: { channel: 'vibes' }, + order: { rank: 'ASC' }, + }); + expect(liveHighlights).toEqual([ + expect.objectContaining({ + postId: 'col-upgrade', + headline: 'Same story headline', + rank: 1, + }), + ]); + }); + + it('should re-evaluate a cached story when a relation changed after the last evaluation', async () => { + const now = new Date('2026-03-03T12:30:00.000Z'); + await con.getRepository(ChannelHighlightDefinition).save({ + channel: 'vibes', + enabled: true, + mode: 'shadow', + candidateHorizonHours: 72, + maxItems: 3, + }); + await saveCollection({ + id: 'col-cached', + title: 'Cached collection', + createdAt: new Date('2026-03-03T11:00:00.000Z'), + }); + await saveArticle({ + id: 'child-cach', + title: 'Cached child', + createdAt: new Date('2026-03-03T10:55:00.000Z'), + }); + await con.getRepository(ChannelHighlightState).save({ + channel: 'vibes', + lastFetchedAt: new Date('2026-03-03T12:00:00.000Z'), + lastPublishedAt: null, + candidatePool: { + stories: [ + { + storyKey: 'collection:col-cached', + canonicalPostId: 'col-cached', + collectionId: 'col-cached', + memberPostIds: ['child-cach', 'col-cached'], + firstSeenAt: '2026-03-03T10:55:00.000Z', + lastSeenAt: '2026-03-03T12:00:00.000Z', + lastLlmEvaluatedAt: '2026-03-03T12:05:00.000Z', + lastSignificanceScore: 0.82, + lastSignificanceLabel: 'breaking', + lastHeadline: 'Cached headline', + status: 'active', + }, + ], + }, + }); + await con.getRepository(PostRelation).save({ + postId: 'col-cached', + relatedPostId: 'child-cach', + type: PostRelationType.Collection, + createdAt: new Date('2026-03-03T12:10:00.000Z'), + }); + + const evaluatorSpy = jest + .spyOn(evaluator, 'evaluateChannelHighlights') + .mockResolvedValue({ + items: [ + { + storyKey: 'collection:col-cached', + postId: 'col-cached', + headline: 'Fresh headline', + significanceScore: 0.91, + significanceLabel: 'breaking', + rank: 1, + reason: 'test', + }, + ], + }); + + await expectSuccessfulTypedBackground<'api.v1.generate-channel-highlight'>( + worker, + { + channel: 'vibes', + scheduledAt: now.toISOString(), + }, + ); + + expect(evaluatorSpy).toHaveBeenCalledTimes(1); + }); + + it('should exclude posts older than the configured horizon even when they were recently updated', async () => { + const now = new Date('2026-03-03T12:00:00.000Z'); + await con.getRepository(ChannelHighlightDefinition).save({ + channel: 'vibes', + enabled: true, + mode: 'shadow', + candidateHorizonHours: 72, + maxItems: 3, + }); + await saveArticle({ + id: 'old-1', + title: 'Old but active', + createdAt: new Date('2026-02-20T12:00:00.000Z'), + statsUpdatedAt: new Date('2026-03-03T11:55:00.000Z'), + metadataChangedAt: new Date('2026-03-03T11:55:00.000Z'), + }); + await saveArticle({ + id: 'fresh-1', + title: 'Fresh candidate', + createdAt: new Date('2026-03-03T11:30:00.000Z'), + }); + + const evaluatorSpy = jest + .spyOn(evaluator, 'evaluateChannelHighlights') + .mockImplementation(async ({ candidates }) => ({ + items: candidates.map((candidate, index) => ({ + storyKey: candidate.storyKey, + postId: candidate.canonicalPostId, + headline: candidate.title, + significanceScore: 0.7, + significanceLabel: 'notable', + rank: index + 1, + reason: 'test', + })), + })); + + await expectSuccessfulTypedBackground<'api.v1.generate-channel-highlight'>( + worker, + { + channel: 'vibes', + scheduledAt: now.toISOString(), + }, + ); + + expect(evaluatorSpy).toHaveBeenCalledTimes(1); + expect(evaluatorSpy.mock.calls[0][0].candidates).toEqual([ + expect.objectContaining({ + canonicalPostId: 'fresh-1', + }), + ]); + }); +}); diff --git a/src/common/channelHighlight/definitions.ts b/src/common/channelHighlight/definitions.ts new file mode 100644 index 0000000000..657500e133 --- /dev/null +++ b/src/common/channelHighlight/definitions.ts @@ -0,0 +1,35 @@ +import type { DataSource } from 'typeorm'; +import { ChannelHighlightDefinition } from '../../entity/ChannelHighlightDefinition'; +import { queryReadReplica } from '../queryReadReplica'; + +export const getChannelHighlightDefinitions = async ({ + con, +}: { + con: DataSource; +}): Promise => + queryReadReplica(con, ({ queryRunner }) => + queryRunner.manager.getRepository(ChannelHighlightDefinition).find({ + where: { + enabled: true, + }, + order: { + channel: 'ASC', + }, + }), + ); + +export const getChannelHighlightDefinitionByChannel = async ({ + con, + channel, +}: { + con: DataSource; + channel: string; +}): Promise => + queryReadReplica(con, ({ queryRunner }) => + queryRunner.manager.getRepository(ChannelHighlightDefinition).findOne({ + where: { + channel, + enabled: true, + }, + }), + ); diff --git a/src/common/channelHighlight/evaluate.ts b/src/common/channelHighlight/evaluate.ts new file mode 100644 index 0000000000..ab0f347cc1 --- /dev/null +++ b/src/common/channelHighlight/evaluate.ts @@ -0,0 +1,127 @@ +import type { PostType } from '../../entity/posts/Post'; + +export type HighlightQualitySummary = { + clickbaitProbability: number | null; + specificity: string | null; + intent: string | null; + substanceDepth: string | null; + titleContentAlignment: string | null; + selfPromotionScore: number | null; +}; + +export type HighlightStoryCandidate = { + storyKey: string; + canonicalPostId: string; + collectionId: string | null; + memberPostIds: string[]; + title: string; + summary: string; + type: PostType; + sourceId: string; + createdAt: string; + lastActivityAt: string; + upvotes: number; + comments: number; + views: number; + contentCuration: string[]; + quality: HighlightQualitySummary; + preliminaryScore: number; +}; + +export type EvaluateChannelHighlightsRequest = { + channel: string; + maxItems: number; + currentHighlights: { + postId: string; + rank: number; + headline: string; + storyKey: string; + }[]; + candidates: HighlightStoryCandidate[]; +}; + +export type EvaluatedHighlightItem = { + storyKey: string; + postId: string; + headline: string; + significanceScore: number; + significanceLabel: string; + rank: number; + reason: string; +}; + +export type EvaluateChannelHighlightsResponse = { + items: EvaluatedHighlightItem[]; +}; + +const clampScore = (score: number): number => + Math.max(0, Math.min(1, Number(score.toFixed(3)))); + +const toHeadline = (title: string): string => title.trim().slice(0, 200); + +const toSignificance = ( + candidate: HighlightStoryCandidate, +): Pick< + EvaluatedHighlightItem, + 'significanceLabel' | 'significanceScore' | 'reason' +> => { + const curation = new Set(candidate.contentCuration); + const isBreaking = + curation.has('news') || + curation.has('release') || + curation.has('leak') || + curation.has('milestone') || + curation.has('drama'); + const strongEngagement = + candidate.upvotes + candidate.comments * 2 + candidate.views / 250; + const score = clampScore( + candidate.preliminaryScore / 100 + (isBreaking ? 0.25 : 0), + ); + + if (score >= 0.8 || (isBreaking && strongEngagement >= 20)) { + return { + significanceLabel: 'breaking', + significanceScore: score, + reason: 'Mock evaluator marked the story as breaking', + }; + } + + if (score >= 0.55) { + return { + significanceLabel: 'notable', + significanceScore: score, + reason: 'Mock evaluator marked the story as notable', + }; + } + + return { + significanceLabel: 'routine', + significanceScore: score, + reason: 'Mock evaluator marked the story as routine', + }; +}; + +// This is an API-side placeholder until the Bragi contract is finalized. +export const evaluateChannelHighlights = async ({ + maxItems, + candidates, +}: EvaluateChannelHighlightsRequest): Promise => { + const items = candidates + .sort((left, right) => right.preliminaryScore - left.preliminaryScore) + .slice(0, maxItems) + .map((candidate, index) => { + const significance = toSignificance(candidate); + return { + storyKey: candidate.storyKey, + postId: candidate.canonicalPostId, + headline: toHeadline(candidate.title), + rank: index + 1, + ...significance, + }; + }) + .filter((item) => item.significanceLabel !== 'routine' || item.rank === 1); + + return { + items, + }; +}; diff --git a/src/common/channelHighlight/generate.ts b/src/common/channelHighlight/generate.ts new file mode 100644 index 0000000000..9daedd029e --- /dev/null +++ b/src/common/channelHighlight/generate.ts @@ -0,0 +1,883 @@ +import { Brackets, In, MoreThanOrEqual, type DataSource } from 'typeorm'; +import { logger as baseLogger } from '../../logger'; +import { ONE_DAY_IN_SECONDS, ONE_HOUR_IN_SECONDS } from '../constants'; +import { queryReadReplica } from '../queryReadReplica'; +import { ChannelHighlightDefinition } from '../../entity/ChannelHighlightDefinition'; +import { ChannelHighlightRun } from '../../entity/ChannelHighlightRun'; +import { ChannelHighlightState } from '../../entity/ChannelHighlightState'; +import { PostHighlight } from '../../entity/PostHighlight'; +import { Post, PostContentQuality } from '../../entity/posts/Post'; +import { + PostRelation, + PostRelationType, +} from '../../entity/posts/PostRelation'; +import { + channelHighlightCandidatePoolSchema, + emptyChannelHighlightCandidatePool, + type ChannelHighlightCandidatePool, + type StoredHighlightStory, +} from './schema'; +import { + evaluateChannelHighlights, + type EvaluatedHighlightItem, + type HighlightQualitySummary, + type HighlightStoryCandidate, +} from './evaluate'; +import { replaceHighlightsForChannel } from './publish'; + +const HIGHLIGHT_FETCH_OVERLAP_SECONDS = 10 * 60; +const MAX_CANDIDATE_POOL_STORIES = 50; +const SHORTLIST_MULTIPLIER = 3; + +type HighlightPost = Pick< + Post, + | 'id' + | 'type' + | 'title' + | 'summary' + | 'createdAt' + | 'metadataChangedAt' + | 'statsUpdatedAt' + | 'upvotes' + | 'comments' + | 'views' + | 'sourceId' + | 'contentCuration' + | 'contentQuality' + | 'visible' + | 'deleted' + | 'banned' + | 'showOnFeed' + | 'contentMeta' +> & { + url: string | null; + canonicalUrl: string | null; +}; + +type HighlightStory = { + storyKey: string; + canonicalPost: HighlightPost; + memberPosts: HighlightPost[]; + collectionId: string | null; + firstSeenAt: Date; + lastSeenAt: Date; + preliminaryScore: number; + cached?: StoredHighlightStory | null; +}; + +type HighlightBaselineItem = { + postId: string; + rank: number; + headline: string; + storyKey: string; +}; + +type GenerateChannelHighlightResult = { + run: ChannelHighlightRun; + published: boolean; +}; + +const getHorizonStart = ({ + now, + definition, +}: { + now: Date; + definition: Pick; +}): Date => + new Date( + now.getTime() - + definition.candidateHorizonHours * ONE_HOUR_IN_SECONDS * 1000, + ); + +const getFetchStart = ({ + now, + definition, + state, +}: { + now: Date; + definition: Pick; + state: Pick | null; +}): Date => { + const horizonStart = getHorizonStart({ + now, + definition, + }); + + if (!state?.lastFetchedAt) { + return horizonStart; + } + + const overlapStart = new Date( + state.lastFetchedAt.getTime() - HIGHLIGHT_FETCH_OVERLAP_SECONDS * 1000, + ); + + return overlapStart > horizonStart ? overlapStart : horizonStart; +}; + +const parseCandidatePool = ( + value: ChannelHighlightState['candidatePool'] | null | undefined, +): ChannelHighlightCandidatePool => { + const parsed = channelHighlightCandidatePoolSchema.safeParse(value); + return parsed.success ? parsed.data : emptyChannelHighlightCandidatePool(); +}; + +const toLastActivityAt = (post: HighlightPost): Date => { + const candidates = [ + post.createdAt?.getTime() || 0, + post.metadataChangedAt?.getTime() || 0, + post.statsUpdatedAt?.getTime() || 0, + ]; + return new Date(Math.max(...candidates)); +}; + +const toQualitySummary = ( + quality: PostContentQuality, +): HighlightQualitySummary => ({ + clickbaitProbability: + typeof quality?.is_clickbait_probability === 'number' + ? quality.is_clickbait_probability + : null, + specificity: quality?.specificity || null, + intent: quality?.intent || null, + substanceDepth: quality?.substance_depth || null, + titleContentAlignment: quality?.title_content_alignment || null, + selfPromotionScore: + typeof quality?.self_promotion_score === 'number' + ? quality.self_promotion_score + : null, +}); + +const getStoryKey = ({ + canonicalPost, + collectionId, +}: { + canonicalPost: HighlightPost; + collectionId: string | null; +}): string => { + if (collectionId) { + return `collection:${collectionId}`; + } + + const canonicalUrl = canonicalPost.canonicalUrl || canonicalPost.url; + if (canonicalUrl) { + return `url:${canonicalUrl.toLowerCase().trim()}`; + } + + return `post:${canonicalPost.id}`; +}; + +const toPreliminaryScore = ({ + story, + horizonStart, + referenceTime, +}: { + story: Omit; + horizonStart: Date; + referenceTime: Date; +}): number => { + const ageSeconds = Math.max( + 1, + (referenceTime.getTime() - story.canonicalPost.createdAt.getTime()) / 1000, + ); + const horizonSeconds = Math.max( + ONE_DAY_IN_SECONDS, + (referenceTime.getTime() - horizonStart.getTime()) / 1000, + ); + const recency = Math.max(0.15, 1 - ageSeconds / horizonSeconds); + const engagement = + story.canonicalPost.upvotes + + story.canonicalPost.comments * 2 + + story.canonicalPost.views / 200; + const collectionBoost = story.collectionId ? 8 : 0; + const curationBoost = story.canonicalPost.contentCuration.some((value) => + ['news', 'release', 'milestone', 'leak', 'drama'].includes(value), + ) + ? 5 + : 0; + const quality = toQualitySummary(story.canonicalPost.contentQuality || {}); + const penalty = + (quality.clickbaitProbability || 0) * 5 + + (quality.selfPromotionScore || 0) * 3; + + return Number( + (engagement * recency + collectionBoost + curationBoost - penalty).toFixed( + 3, + ), + ); +}; + +const selectCanonicalPost = (posts: HighlightPost[]): HighlightPost => + posts.slice().sort((left, right) => { + const leftScore = left.upvotes + left.comments * 2 + left.views / 200; + const rightScore = right.upvotes + right.comments * 2 + right.views / 200; + if (leftScore !== rightScore) { + return rightScore - leftScore; + } + + return right.createdAt.getTime() - left.createdAt.getTime(); + })[0]; + +const fetchDefinitionState = async ({ + con, + channel, +}: { + con: DataSource; + channel: string; +}): Promise => + queryReadReplica(con, ({ queryRunner }) => + queryRunner.manager.getRepository(ChannelHighlightState).findOne({ + where: { + channel, + }, + }), + ); + +const fetchCurrentHighlights = async ({ + con, + channel, +}: { + con: DataSource; + channel: string; +}): Promise => + queryReadReplica(con, ({ queryRunner }) => + queryRunner.manager.getRepository(PostHighlight).find({ + where: { + channel, + }, + order: { + rank: 'ASC', + }, + }), + ); + +const fetchPostsByIds = async ({ + con, + ids, + horizonStart, +}: { + con: DataSource; + ids: string[]; + horizonStart: Date; +}): Promise => { + if (!ids.length) { + return []; + } + + return queryReadReplica(con, ({ queryRunner }) => + queryRunner.manager.getRepository(Post).find({ + where: { + id: In(ids), + createdAt: MoreThanOrEqual(horizonStart), + visible: true, + deleted: false, + banned: false, + showOnFeed: true, + }, + }), + ) as unknown as Promise; +}; + +const fetchIncrementalPosts = async ({ + con, + channel, + fetchStart, + horizonStart, +}: { + con: DataSource; + channel: string; + fetchStart: Date; + horizonStart: Date; +}): Promise => + queryReadReplica(con, ({ queryRunner }) => + queryRunner.manager + .getRepository(Post) + .createQueryBuilder('post') + .where('post.createdAt >= :horizonStart', { horizonStart }) + .andWhere('post.visible = true') + .andWhere('post.deleted = false') + .andWhere('post.banned = false') + .andWhere('post.showOnFeed = true') + .andWhere(`(post."contentMeta"->'channels') ? :channel`, { channel }) + .andWhere( + new Brackets((builder) => { + builder + .where('post.createdAt >= :fetchStart', { fetchStart }) + .orWhere('post.metadataChangedAt >= :fetchStart', { fetchStart }) + .orWhere('post.statsUpdatedAt >= :fetchStart', { fetchStart }); + }), + ) + .getMany(), + ) as unknown as Promise; + +const fetchRelations = async ({ + con, + postIds, +}: { + con: DataSource; + postIds: string[]; +}): Promise => { + if (!postIds.length) { + return []; + } + + return queryReadReplica(con, ({ queryRunner }) => + queryRunner.manager + .getRepository(PostRelation) + .createQueryBuilder('relation') + .where('relation.type = :type', { + type: PostRelationType.Collection, + }) + .andWhere( + new Brackets((builder) => { + builder + .where('relation.relatedPostId IN (:...postIds)', { postIds }) + .orWhere('relation.postId IN (:...postIds)', { postIds }); + }), + ) + .getMany(), + ); +}; + +const mergePosts = (groups: HighlightPost[][]): HighlightPost[] => { + const byId = new Map(); + for (const posts of groups) { + for (const post of posts) { + byId.set(post.id, post); + } + } + + return [...byId.values()]; +}; + +const buildStories = ({ + posts, + relations, + previousPool, + horizonStart, + referenceTime, +}: { + posts: HighlightPost[]; + relations: PostRelation[]; + previousPool: ChannelHighlightCandidatePool; + horizonStart: Date; + referenceTime: Date; +}): HighlightStory[] => { + const postsById = new Map(posts.map((post) => [post.id, post])); + const previousStories = new Map( + previousPool.stories.map((story) => [story.storyKey, story]), + ); + const collectionByChildId = new Map(); + const collectionIds = new Set(); + const relationActivityByCollectionId = new Map(); + + for (const relation of relations) { + if ( + !postsById.has(relation.postId) || + !postsById.has(relation.relatedPostId) + ) { + continue; + } + + collectionIds.add(relation.postId); + collectionByChildId.set(relation.relatedPostId, relation.postId); + const currentActivity = relationActivityByCollectionId.get(relation.postId); + if (!currentActivity || relation.createdAt > currentActivity) { + relationActivityByCollectionId.set(relation.postId, relation.createdAt); + } + } + + const groupedStories = new Map(); + const storyCollections = new Map(); + + for (const post of posts) { + const collectionId = + collectionByChildId.get(post.id) || + (collectionIds.has(post.id) ? post.id : null); + const canonicalPost = + collectionId && postsById.has(collectionId) + ? postsById.get(collectionId)! + : post; + const storyKey = getStoryKey({ + canonicalPost, + collectionId, + }); + + if (!groupedStories.has(storyKey)) { + groupedStories.set(storyKey, []); + storyCollections.set(storyKey, collectionId); + } + + groupedStories.get(storyKey)!.push(post); + } + + return [...groupedStories.entries()] + .map(([storyKey, memberPosts]) => { + const collectionId = storyCollections.get(storyKey) || null; + const canonicalPost = + collectionId && postsById.has(collectionId) + ? postsById.get(collectionId)! + : selectCanonicalPost(memberPosts); + const firstSeenAt = memberPosts.reduce( + (current, post) => + post.createdAt < current ? post.createdAt : current, + canonicalPost.createdAt, + ); + const lastSeenAt = memberPosts.reduce((current, post) => { + const lastActivityAt = toLastActivityAt(post); + return lastActivityAt > current ? lastActivityAt : current; + }, toLastActivityAt(canonicalPost)); + const relationActivityAt = collectionId + ? relationActivityByCollectionId.get(collectionId) + : null; + const baseStory = { + storyKey, + canonicalPost, + memberPosts, + collectionId, + firstSeenAt, + lastSeenAt: + relationActivityAt && relationActivityAt > lastSeenAt + ? relationActivityAt + : lastSeenAt, + cached: previousStories.get(storyKey) || null, + }; + + return { + ...baseStory, + preliminaryScore: toPreliminaryScore({ + story: baseStory, + horizonStart, + referenceTime, + }), + }; + }) + .filter((story) => story.lastSeenAt >= horizonStart) + .sort((left, right) => right.preliminaryScore - left.preliminaryScore); +}; + +const toStoryCandidate = (story: HighlightStory): HighlightStoryCandidate => ({ + storyKey: story.storyKey, + canonicalPostId: story.canonicalPost.id, + collectionId: story.collectionId, + memberPostIds: story.memberPosts.map((post) => post.id).sort(), + title: story.canonicalPost.title || '', + summary: story.canonicalPost.summary || '', + type: story.canonicalPost.type, + sourceId: story.canonicalPost.sourceId, + createdAt: story.canonicalPost.createdAt.toISOString(), + lastActivityAt: story.lastSeenAt.toISOString(), + upvotes: story.canonicalPost.upvotes, + comments: story.canonicalPost.comments, + views: story.canonicalPost.views, + contentCuration: story.canonicalPost.contentCuration || [], + quality: toQualitySummary(story.canonicalPost.contentQuality || {}), + preliminaryScore: story.preliminaryScore, +}); + +const toBaselineStoryKey = ({ + postId, + storiesByPostId, +}: { + postId: string; + storiesByPostId: Map; +}): string => storiesByPostId.get(postId)?.storyKey || `post:${postId}`; + +const buildBaselineSnapshot = ({ + highlights, + storiesByPostId, +}: { + highlights: PostHighlight[]; + storiesByPostId: Map; +}): HighlightBaselineItem[] => + highlights.map((highlight) => ({ + postId: highlight.postId, + rank: highlight.rank, + headline: highlight.headline, + storyKey: toBaselineStoryKey({ + postId: highlight.postId, + storiesByPostId, + }), + })); + +const arePostIdsEqual = (left: string[], right: string[]): boolean => + left.length === right.length && + left.every((postId, index) => postId === right[index]); + +const hasCachedStoryChanged = (story: HighlightStory): boolean => { + if (!story.cached) { + return true; + } + + const cachedMemberPostIds = [...story.cached.memberPostIds].sort(); + const storyMemberPostIds = story.memberPosts.map((post) => post.id).sort(); + + return ( + story.cached.canonicalPostId !== story.canonicalPost.id || + !arePostIdsEqual(cachedMemberPostIds, storyMemberPostIds) + ); +}; + +const shouldReuseEvaluations = ({ + shortlist, +}: { + shortlist: HighlightStory[]; +}): boolean => + shortlist.length > 0 && + shortlist.every((story) => { + if ( + !story.cached?.lastHeadline || + story.cached.lastSignificanceScore == null + ) { + return false; + } + + if (!story.cached.lastLlmEvaluatedAt) { + return false; + } + + if (hasCachedStoryChanged(story)) { + return false; + } + + return ( + new Date(story.cached.lastLlmEvaluatedAt).getTime() >= + story.lastSeenAt.getTime() + ); + }); + +const reuseEvaluations = ({ + shortlist, + maxItems, +}: { + shortlist: HighlightStory[]; + maxItems: number; +}): EvaluatedHighlightItem[] => + shortlist.slice(0, maxItems).map((story, index) => ({ + storyKey: story.storyKey, + postId: story.canonicalPost.id, + headline: story.cached?.lastHeadline || story.canonicalPost.title || '', + significanceScore: story.cached?.lastSignificanceScore || 0, + significanceLabel: story.cached?.lastSignificanceLabel || 'routine', + rank: index + 1, + reason: 'Reused cached evaluation', + })); + +const toPoolStory = ({ + story, + evaluation, + evaluatedAt, +}: { + story: HighlightStory; + evaluation?: EvaluatedHighlightItem; + evaluatedAt: Date; +}): StoredHighlightStory => ({ + storyKey: story.storyKey, + canonicalPostId: story.canonicalPost.id, + collectionId: story.collectionId, + memberPostIds: story.memberPosts.map((post) => post.id).sort(), + firstSeenAt: story.cached?.firstSeenAt || story.firstSeenAt.toISOString(), + lastSeenAt: story.lastSeenAt.toISOString(), + lastLlmEvaluatedAt: evaluation + ? evaluatedAt.toISOString() + : story.cached?.lastLlmEvaluatedAt || null, + lastSignificanceScore: + evaluation?.significanceScore ?? + story.cached?.lastSignificanceScore ?? + null, + lastSignificanceLabel: + evaluation?.significanceLabel ?? + story.cached?.lastSignificanceLabel ?? + null, + lastHeadline: evaluation?.headline ?? story.cached?.lastHeadline ?? null, + status: story.cached?.status || 'active', +}); + +const shouldPublish = ({ + baseline, + internal, +}: { + baseline: HighlightBaselineItem[]; + internal: EvaluatedHighlightItem[]; +}): boolean => { + if (!baseline.length) { + return internal.length > 0; + } + + if (!internal.length) { + return false; + } + + const toItemSignature = (item: { + storyKey: string; + rank: number; + postId: string; + }): string => `${item.storyKey}:${item.rank}:${item.postId}`; + + const baselineSignature = baseline.map(toItemSignature).join('|'); + const internalSignature = internal.map(toItemSignature).join('|'); + + if (baselineSignature !== internalSignature) { + return true; + } + + const baselineHeadlines = baseline.map((item) => item.headline).join('|'); + const internalHeadlines = internal.map((item) => item.headline).join('|'); + return baselineHeadlines !== internalHeadlines; +}; + +const compareSnapshots = ({ + baseline, + internal, +}: { + baseline: HighlightBaselineItem[]; + internal: EvaluatedHighlightItem[]; +}) => { + const baselineByStory = new Map( + baseline.map((item) => [item.storyKey, item]), + ); + const internalByStory = new Map( + internal.map((item) => [item.storyKey, item]), + ); + const overlap = [...internalByStory.keys()].filter((storyKey) => + baselineByStory.has(storyKey), + ); + + return { + baselineCount: baseline.length, + internalCount: internal.length, + overlapCount: overlap.length, + addedStoryKeys: [...internalByStory.keys()].filter( + (storyKey) => !baselineByStory.has(storyKey), + ), + removedStoryKeys: [...baselineByStory.keys()].filter( + (storyKey) => !internalByStory.has(storyKey), + ), + churnCount: + [...internalByStory.keys()].filter( + (storyKey) => !baselineByStory.has(storyKey), + ).length + + [...baselineByStory.keys()].filter( + (storyKey) => !internalByStory.has(storyKey), + ).length, + }; +}; + +export const generateChannelHighlight = async ({ + con, + definition, + now = new Date(), +}: { + con: DataSource; + definition: ChannelHighlightDefinition; + now?: Date; +}): Promise => { + const state = await fetchDefinitionState({ + con, + channel: definition.channel, + }); + const previousPool = parseCandidatePool(state?.candidatePool); + const currentHighlights = await fetchCurrentHighlights({ + con, + channel: definition.channel, + }); + const horizonStart = getHorizonStart({ + now, + definition, + }); + const fetchStart = getFetchStart({ + now, + definition, + state, + }); + const previousPoolPostIds = previousPool.stories.flatMap( + (story) => story.memberPostIds, + ); + const currentHighlightPostIds = currentHighlights.map((item) => item.postId); + const retainedPosts = await fetchPostsByIds({ + con, + ids: [...new Set([...previousPoolPostIds, ...currentHighlightPostIds])], + horizonStart, + }); + const incrementalPosts = await fetchIncrementalPosts({ + con, + channel: definition.channel, + fetchStart, + horizonStart, + }); + const basePosts = mergePosts([retainedPosts, incrementalPosts]); + const baseRelations = await fetchRelations({ + con, + postIds: basePosts.map((post) => post.id), + }); + const relatedPostIds = [ + ...new Set( + baseRelations.flatMap((relation) => [ + relation.postId, + relation.relatedPostId, + ]), + ), + ]; + const relationPosts = await fetchPostsByIds({ + con, + ids: relatedPostIds, + horizonStart, + }); + const posts = mergePosts([basePosts, relationPosts]); + const stories = buildStories({ + posts, + relations: baseRelations, + previousPool, + horizonStart, + referenceTime: now, + }); + const storiesByPostId = new Map(); + for (const story of stories) { + storiesByPostId.set(story.canonicalPost.id, story); + for (const memberPost of story.memberPosts) { + storiesByPostId.set(memberPost.id, story); + } + } + + const shortlist = stories.slice( + 0, + definition.maxItems * SHORTLIST_MULTIPLIER, + ); + const baselineSnapshot = buildBaselineSnapshot({ + highlights: currentHighlights, + storiesByPostId, + }); + const reusedEvaluations = shouldReuseEvaluations({ + shortlist, + }); + + const evaluatedItems = reusedEvaluations + ? reuseEvaluations({ + shortlist, + maxItems: definition.maxItems, + }) + : ( + await evaluateChannelHighlights({ + channel: definition.channel, + maxItems: definition.maxItems, + currentHighlights: baselineSnapshot, + candidates: shortlist.map(toStoryCandidate), + }) + ).items; + + const candidatePool = { + stories: stories.slice(0, MAX_CANDIDATE_POOL_STORIES).map((story) => + toPoolStory({ + story, + evaluation: evaluatedItems.find( + (item) => item.storyKey === story.storyKey, + ), + evaluatedAt: now, + }), + ), + }; + const comparison = compareSnapshots({ + baseline: baselineSnapshot, + internal: evaluatedItems, + }); + const wouldPublish = shouldPublish({ + baseline: baselineSnapshot, + internal: evaluatedItems, + }); + const publish = definition.mode === 'publish' && wouldPublish; + const metrics = { + fetchedPosts: incrementalPosts.length, + retainedPosts: retainedPosts.length, + totalStories: stories.length, + shortlistStories: shortlist.length, + evaluatedStories: reusedEvaluations ? 0 : shortlist.length, + reusedEvaluation: reusedEvaluations, + }; + + const runRepo = con.getRepository(ChannelHighlightRun); + let run = runRepo.create({ + channel: definition.channel, + scheduledAt: now, + status: 'processing', + baselineSnapshot, + inputSummary: { + fetchStart: fetchStart.toISOString(), + horizonStart: horizonStart.toISOString(), + shortlist: shortlist.map((story) => ({ + storyKey: story.storyKey, + canonicalPostId: story.canonicalPost.id, + preliminaryScore: story.preliminaryScore, + })), + }, + internalSnapshot: [], + comparison: {}, + metrics, + }); + run = await runRepo.save(run); + + try { + await con.transaction(async (manager) => { + await manager.getRepository(ChannelHighlightState).save({ + channel: definition.channel, + lastFetchedAt: now, + lastPublishedAt: publish ? now : state?.lastPublishedAt || null, + candidatePool, + }); + + if (publish) { + await replaceHighlightsForChannel({ + manager, + channel: definition.channel, + items: evaluatedItems.map((item) => ({ + postId: item.postId, + rank: item.rank, + headline: item.headline, + })), + }); + } + + await manager.getRepository(ChannelHighlightRun).update( + { + id: run.id, + }, + { + status: 'completed', + completedAt: new Date(), + internalSnapshot: evaluatedItems, + comparison: { + ...comparison, + wouldPublish, + published: publish, + }, + metrics, + }, + ); + }); + + const completedRun = await runRepo.findOneByOrFail({ + id: run.id, + }); + + return { + run: completedRun, + published: publish, + }; + } catch (err) { + baseLogger.error( + { err, channel: definition.channel }, + 'Failed channel highlight run', + ); + await runRepo.update( + { + id: run.id, + }, + { + status: 'failed', + completedAt: new Date(), + error: { + message: err instanceof Error ? err.message : 'Unknown error', + }, + }, + ); + throw err; + } +}; diff --git a/src/common/channelHighlight/publish.ts b/src/common/channelHighlight/publish.ts new file mode 100644 index 0000000000..1b452fde9f --- /dev/null +++ b/src/common/channelHighlight/publish.ts @@ -0,0 +1,32 @@ +import type { EntityManager } from 'typeorm'; +import { PostHighlight } from '../../entity/PostHighlight'; + +export type PublishHighlightItem = { + postId: string; + rank: number; + headline: string; +}; + +export const replaceHighlightsForChannel = async ({ + manager, + channel, + items, +}: { + manager: EntityManager; + channel: string; + items: PublishHighlightItem[]; +}): Promise => { + const repo = manager.getRepository(PostHighlight); + await repo.delete({ channel }); + + if (!items.length) { + return; + } + + await repo.insert( + items.map((item) => ({ + ...item, + channel, + })), + ); +}; diff --git a/src/common/channelHighlight/schema.ts b/src/common/channelHighlight/schema.ts new file mode 100644 index 0000000000..3e0ac4f656 --- /dev/null +++ b/src/common/channelHighlight/schema.ts @@ -0,0 +1,39 @@ +import { z } from 'zod'; + +export const channelHighlightModes = ['shadow', 'publish'] as const; +export type ChannelHighlightMode = (typeof channelHighlightModes)[number]; + +export const channelHighlightStatuses = [ + 'active', + 'published', + 'dropped', +] as const; +export type ChannelHighlightStatus = (typeof channelHighlightStatuses)[number]; + +export const storedHighlightStorySchema = z.object({ + storyKey: z.string().min(1), + canonicalPostId: z.string().min(1), + collectionId: z.string().min(1).nullable(), + memberPostIds: z.array(z.string().min(1)).min(1), + firstSeenAt: z.string().datetime(), + lastSeenAt: z.string().datetime(), + lastLlmEvaluatedAt: z.string().datetime().nullable().optional(), + lastSignificanceScore: z.number().min(0).max(1).nullable().optional(), + lastSignificanceLabel: z.string().min(1).nullable().optional(), + lastHeadline: z.string().min(1).max(200).nullable().optional(), + status: z.enum(channelHighlightStatuses).default('active'), +}); + +export const channelHighlightCandidatePoolSchema = z.object({ + stories: z.array(storedHighlightStorySchema).default([]), +}); + +export type StoredHighlightStory = z.infer; +export type ChannelHighlightCandidatePool = z.infer< + typeof channelHighlightCandidatePoolSchema +>; + +export const emptyChannelHighlightCandidatePool = + (): ChannelHighlightCandidatePool => ({ + stories: [], + }); diff --git a/src/common/typedPubsub.ts b/src/common/typedPubsub.ts index 582c7dd25f..6ac759f014 100644 --- a/src/common/typedPubsub.ts +++ b/src/common/typedPubsub.ts @@ -297,6 +297,10 @@ export type PubSubSchema = { digestKey: string; scheduledAt: string; }; + 'api.v1.generate-channel-highlight': { + channel: string; + scheduledAt: string; + }; }; export async function triggerTypedEvent( diff --git a/src/cron/channelHighlights.ts b/src/cron/channelHighlights.ts new file mode 100644 index 0000000000..e40732f85a --- /dev/null +++ b/src/cron/channelHighlights.ts @@ -0,0 +1,27 @@ +import { getChannelHighlightDefinitions } from '../common/channelHighlight/definitions'; +import { triggerTypedEvent } from '../common/typedPubsub'; +import { Cron } from './cron'; + +export const getChannelHighlightsNow = (): Date => new Date(); + +const cron: Cron = { + name: 'channel-highlights', + handler: async (con, logger) => { + const now = getChannelHighlightsNow(); + const scheduledAt = now.toISOString(); + const definitions = await getChannelHighlightDefinitions({ + con, + }); + + await Promise.all( + definitions.map(({ channel }) => + triggerTypedEvent(logger, 'api.v1.generate-channel-highlight', { + channel, + scheduledAt, + }), + ), + ); + }, +}; + +export default cron; diff --git a/src/cron/index.ts b/src/cron/index.ts index 23dff9331c..47897dd22c 100644 --- a/src/cron/index.ts +++ b/src/cron/index.ts @@ -33,6 +33,7 @@ import rotateDailyQuests from './rotateDailyQuests'; import rotateWeeklyQuests from './rotateWeeklyQuests'; import backfillGearCategory from './backfillGearCategory'; import channelDigests from './channelDigests'; +import channelHighlights from './channelHighlights'; import { cleanExpiredBetterAuthSessions } from './cleanExpiredBetterAuthSessions'; export const crons: Cron[] = [ @@ -70,5 +71,6 @@ export const crons: Cron[] = [ rotateWeeklyQuests, backfillGearCategory, channelDigests, + channelHighlights, cleanExpiredBetterAuthSessions, ]; diff --git a/src/entity/ChannelHighlightDefinition.ts b/src/entity/ChannelHighlightDefinition.ts new file mode 100644 index 0000000000..5b7e68ca45 --- /dev/null +++ b/src/entity/ChannelHighlightDefinition.ts @@ -0,0 +1,32 @@ +import { + Column, + CreateDateColumn, + Entity, + PrimaryColumn, + UpdateDateColumn, +} from 'typeorm'; +import type { ChannelHighlightMode } from '../common/channelHighlight/schema'; + +@Entity() +export class ChannelHighlightDefinition { + @PrimaryColumn({ type: 'text' }) + channel: string; + + @Column({ type: 'boolean', default: false }) + enabled: boolean; + + @Column({ type: 'text', default: 'shadow' }) + mode: ChannelHighlightMode; + + @Column({ type: 'smallint', default: 72 }) + candidateHorizonHours: number; + + @Column({ type: 'smallint', default: 10 }) + maxItems: number; + + @CreateDateColumn() + createdAt: Date; + + @UpdateDateColumn() + updatedAt: Date; +} diff --git a/src/entity/ChannelHighlightRun.ts b/src/entity/ChannelHighlightRun.ts new file mode 100644 index 0000000000..baf8bf7920 --- /dev/null +++ b/src/entity/ChannelHighlightRun.ts @@ -0,0 +1,50 @@ +import { + Column, + CreateDateColumn, + Entity, + Index, + PrimaryGeneratedColumn, +} from 'typeorm'; + +@Entity() +@Index('IDX_channel_highlight_run_channel_scheduledAt', [ + 'channel', + 'scheduledAt', +]) +export class ChannelHighlightRun { + @PrimaryGeneratedColumn('uuid') + id: string; + + @Column({ type: 'text' }) + channel: string; + + @Column({ type: 'timestamp' }) + scheduledAt: Date; + + @CreateDateColumn() + startedAt: Date; + + @Column({ type: 'timestamp', nullable: true }) + completedAt: Date | null; + + @Column({ type: 'text' }) + status: string; + + @Column({ type: 'jsonb', default: () => `'[]'::jsonb` }) + baselineSnapshot: object[]; + + @Column({ type: 'jsonb', default: () => `'{}'::jsonb` }) + inputSummary: object; + + @Column({ type: 'jsonb', default: () => `'[]'::jsonb` }) + internalSnapshot: object[]; + + @Column({ type: 'jsonb', default: () => `'{}'::jsonb` }) + comparison: object; + + @Column({ type: 'jsonb', default: () => `'{}'::jsonb` }) + metrics: object; + + @Column({ type: 'jsonb', nullable: true }) + error: object | null; +} diff --git a/src/entity/ChannelHighlightState.ts b/src/entity/ChannelHighlightState.ts new file mode 100644 index 0000000000..a2ce2dd29d --- /dev/null +++ b/src/entity/ChannelHighlightState.ts @@ -0,0 +1,20 @@ +import { Column, Entity, PrimaryColumn, UpdateDateColumn } from 'typeorm'; +import type { ChannelHighlightCandidatePool } from '../common/channelHighlight/schema'; + +@Entity() +export class ChannelHighlightState { + @PrimaryColumn({ type: 'text' }) + channel: string; + + @Column({ type: 'timestamp', nullable: true }) + lastFetchedAt: Date | null; + + @Column({ type: 'timestamp', nullable: true }) + lastPublishedAt: Date | null; + + @Column({ type: 'jsonb', default: () => `'{"stories":[]}'::jsonb` }) + candidatePool: ChannelHighlightCandidatePool; + + @UpdateDateColumn() + updatedAt: Date; +} diff --git a/src/entity/index.ts b/src/entity/index.ts index ed4554f7ee..5ebee308ea 100644 --- a/src/entity/index.ts +++ b/src/entity/index.ts @@ -48,3 +48,6 @@ export * from './Quest'; export * from './QuestReward'; export * from './QuestRotation'; export * from './PostHighlight'; +export * from './ChannelHighlightDefinition'; +export * from './ChannelHighlightState'; +export * from './ChannelHighlightRun'; diff --git a/src/migration/1773000000000-ChannelHighlights.ts b/src/migration/1773000000000-ChannelHighlights.ts new file mode 100644 index 0000000000..8b82c12f42 --- /dev/null +++ b/src/migration/1773000000000-ChannelHighlights.ts @@ -0,0 +1,84 @@ +import type { MigrationInterface, QueryRunner } from 'typeorm'; + +export class ChannelHighlights1773000000000 implements MigrationInterface { + name = 'ChannelHighlights1773000000000'; + + public async up(queryRunner: QueryRunner): Promise { + await queryRunner.query(/* sql */ ` + CREATE TABLE "channel_highlight_definition" ( + "channel" text NOT NULL, + "enabled" boolean NOT NULL DEFAULT false, + "mode" text NOT NULL DEFAULT 'shadow', + "candidateHorizonHours" smallint NOT NULL DEFAULT 72, + "maxItems" smallint NOT NULL DEFAULT 10, + "createdAt" TIMESTAMP NOT NULL DEFAULT now(), + "updatedAt" TIMESTAMP NOT NULL DEFAULT now(), + CONSTRAINT "PK_channel_highlight_definition_channel" + PRIMARY KEY ("channel") + ) + `); + + await queryRunner.query(/* sql */ ` + CREATE TABLE "channel_highlight_state" ( + "channel" text NOT NULL, + "lastFetchedAt" TIMESTAMP, + "lastPublishedAt" TIMESTAMP, + "candidatePool" jsonb NOT NULL DEFAULT '{"stories":[]}'::jsonb, + "updatedAt" TIMESTAMP NOT NULL DEFAULT now(), + CONSTRAINT "PK_channel_highlight_state_channel" + PRIMARY KEY ("channel") + ) + `); + + await queryRunner.query(/* sql */ ` + CREATE TABLE "channel_highlight_run" ( + "id" uuid NOT NULL DEFAULT uuid_generate_v4(), + "channel" text NOT NULL, + "scheduledAt" TIMESTAMP NOT NULL, + "startedAt" TIMESTAMP NOT NULL DEFAULT now(), + "completedAt" TIMESTAMP, + "status" text NOT NULL, + "baselineSnapshot" jsonb NOT NULL DEFAULT '[]'::jsonb, + "inputSummary" jsonb NOT NULL DEFAULT '{}'::jsonb, + "internalSnapshot" jsonb NOT NULL DEFAULT '[]'::jsonb, + "comparison" jsonb NOT NULL DEFAULT '{}'::jsonb, + "metrics" jsonb NOT NULL DEFAULT '{}'::jsonb, + "error" jsonb, + CONSTRAINT "PK_channel_highlight_run_id" + PRIMARY KEY ("id") + ) + `); + + await queryRunner.query(/* sql */ ` + CREATE INDEX "IDX_channel_highlight_run_channel_scheduledAt" + ON "channel_highlight_run" ("channel", "scheduledAt") + `); + + await queryRunner.query(/* sql */ ` + INSERT INTO "channel_highlight_definition" ( + "channel", + "enabled", + "mode", + "candidateHorizonHours", + "maxItems" + ) + VALUES ('vibes', false, 'shadow', 72, 10) + ON CONFLICT ("channel") DO NOTHING + `); + } + + public async down(queryRunner: QueryRunner): Promise { + await queryRunner.query(/* sql */ ` + DROP INDEX IF EXISTS "IDX_channel_highlight_run_channel_scheduledAt" + `); + await queryRunner.query(/* sql */ ` + DROP TABLE IF EXISTS "channel_highlight_run" + `); + await queryRunner.query(/* sql */ ` + DROP TABLE IF EXISTS "channel_highlight_state" + `); + await queryRunner.query(/* sql */ ` + DROP TABLE IF EXISTS "channel_highlight_definition" + `); + } +} diff --git a/src/workers/generateChannelHighlight.ts b/src/workers/generateChannelHighlight.ts new file mode 100644 index 0000000000..d8ff9b8215 --- /dev/null +++ b/src/workers/generateChannelHighlight.ts @@ -0,0 +1,96 @@ +import { ONE_DAY_IN_SECONDS, ONE_MINUTE_IN_SECONDS } from '../common/constants'; +import { getChannelHighlightDefinitionByChannel } from '../common/channelHighlight/definitions'; +import { generateChannelHighlight } from '../common/channelHighlight/generate'; +import { + checkRedisObjectExists, + deleteRedisKey, + setRedisObjectIfNotExistsWithExpiry, + setRedisObjectWithExpiry, +} from '../redis'; +import { TypedWorker } from './worker'; + +const CHANNEL_HIGHLIGHT_LOCK_TTL_SECONDS = 10 * ONE_MINUTE_IN_SECONDS; +const CHANNEL_HIGHLIGHT_DONE_TTL_SECONDS = 2 * ONE_DAY_IN_SECONDS; + +const getChannelHighlightDoneKey = ({ + channel, + scheduledAt, +}: { + channel: string; + scheduledAt: string; +}): string => `channel-highlight:done:${channel}:${scheduledAt}`; + +const getChannelHighlightLockKey = ({ + channel, + scheduledAt, +}: { + channel: string; + scheduledAt: string; +}): string => `channel-highlight:lock:${channel}:${scheduledAt}`; + +const worker: TypedWorker<'api.v1.generate-channel-highlight'> = { + subscription: 'api.generate-channel-highlight', + handler: async (message, con, logger): Promise => { + const { channel, scheduledAt } = message.data; + const logDetails = { channel, scheduledAt, messageId: message.messageId }; + const definition = await getChannelHighlightDefinitionByChannel({ + con, + channel, + }); + + if (!definition) { + logger.error(logDetails, 'Channel highlight definition not found'); + return; + } + + const now = new Date(scheduledAt); + if (Number.isNaN(now.getTime())) { + logger.error(logDetails, 'Channel highlight scheduledAt is invalid'); + return; + } + + const doneKey = getChannelHighlightDoneKey({ + channel, + scheduledAt, + }); + if (await checkRedisObjectExists(doneKey)) { + return; + } + + const lockKey = getChannelHighlightLockKey({ + channel, + scheduledAt, + }); + const lockAcquired = await setRedisObjectIfNotExistsWithExpiry( + lockKey, + message.messageId || channel, + CHANNEL_HIGHLIGHT_LOCK_TTL_SECONDS, + ); + if (!lockAcquired) { + return; + } + + try { + await generateChannelHighlight({ + con, + definition, + now, + }); + await setRedisObjectWithExpiry( + doneKey, + '1', + CHANNEL_HIGHLIGHT_DONE_TTL_SECONDS, + ); + } catch (err) { + logger.error( + { ...logDetails, err }, + 'Failed to generate channel highlight', + ); + throw err; + } finally { + await deleteRedisKey(lockKey); + } + }, +}; + +export default worker; diff --git a/src/workers/index.ts b/src/workers/index.ts index dd986c8d54..cb2c86f9f7 100644 --- a/src/workers/index.ts +++ b/src/workers/index.ts @@ -83,6 +83,7 @@ import feedbackUpdatedSlack from './feedbackUpdatedSlack'; import gearClassify from './gearClassify'; import agenticDigestTweet from './agenticDigestTweet'; import generateChannelDigest from './generateChannelDigest'; +import generateChannelHighlight from './generateChannelHighlight'; import { jobExecuteWorker } from './job/jobExecute'; import workerJobDeadLetterLog from './workerJobDeadLetterLog'; @@ -171,6 +172,7 @@ export const typedWorkers: BaseTypedWorker[] = [ gearClassify, agenticDigestTweet, generateChannelDigest, + generateChannelHighlight, ]; export const personalizedDigestWorkers: Worker[] = [ From c1488f98ab1fb61d5a94ca84b6f29371f4027cda Mon Sep 17 00:00:00 2001 From: Ido Shamun <1993245+idoshamun@users.noreply.github.com> Date: Thu, 19 Mar 2026 12:01:16 +0200 Subject: [PATCH 2/5] Refactor channel highlight generation flow --- __tests__/cron/channelHighlights.ts | 28 +- src/common/channelHighlight/decisions.ts | 169 +++++ src/common/channelHighlight/definitions.ts | 33 +- src/common/channelHighlight/generate.ts | 821 ++++----------------- src/common/channelHighlight/queries.ts | 184 +++++ src/common/channelHighlight/stories.ts | 286 +++++++ src/common/channelHighlight/types.ts | 52 ++ src/cron/channelHighlights.ts | 5 +- src/workers/generateChannelDigest.ts | 57 +- src/workers/generateChannelHighlight.ts | 57 +- src/workers/withRedisDoneLock.ts | 43 ++ 11 files changed, 929 insertions(+), 806 deletions(-) create mode 100644 src/common/channelHighlight/decisions.ts create mode 100644 src/common/channelHighlight/queries.ts create mode 100644 src/common/channelHighlight/stories.ts create mode 100644 src/common/channelHighlight/types.ts create mode 100644 src/workers/withRedisDoneLock.ts diff --git a/__tests__/cron/channelHighlights.ts b/__tests__/cron/channelHighlights.ts index 6ab1d72d3d..55e86c0118 100644 --- a/__tests__/cron/channelHighlights.ts +++ b/__tests__/cron/channelHighlights.ts @@ -2,7 +2,7 @@ import type { DataSource } from 'typeorm'; import createOrGetConnection from '../../src/db'; import { ChannelHighlightDefinition } from '../../src/entity/ChannelHighlightDefinition'; import * as typedPubsub from '../../src/common/typedPubsub'; -import * as channelHighlightsModule from '../../src/cron/channelHighlights'; +import channelHighlights from '../../src/cron/channelHighlights'; import { crons } from '../../src/cron/index'; let con: DataSource; @@ -19,16 +19,13 @@ describe('channelHighlights cron', () => { it('should be registered', () => { const registeredCron = crons.find( - (item) => item.name === channelHighlightsModule.default.name, + (item) => item.name === channelHighlights.name, ); expect(registeredCron).toBeDefined(); }); it('should enqueue enabled highlight definitions', async () => { - jest - .spyOn(channelHighlightsModule, 'getChannelHighlightsNow') - .mockReturnValue(new Date('2026-03-02T10:00:00.000Z')); const triggerTypedEventSpy = jest .spyOn(typedPubsub, 'triggerTypedEvent') .mockResolvedValue(); @@ -57,11 +54,9 @@ describe('channelHighlights cron', () => { }, ]); - await channelHighlightsModule.default.handler( - con, - {} as never, - {} as never, - ); + const startedAt = Date.now(); + await channelHighlights.handler(con, {} as never, {} as never); + const completedAt = Date.now(); expect(triggerTypedEventSpy.mock.calls).toEqual([ [ @@ -69,7 +64,7 @@ describe('channelHighlights cron', () => { 'api.v1.generate-channel-highlight', { channel: 'backend', - scheduledAt: '2026-03-02T10:00:00.000Z', + scheduledAt: expect.any(String), }, ], [ @@ -77,9 +72,18 @@ describe('channelHighlights cron', () => { 'api.v1.generate-channel-highlight', { channel: 'vibes', - scheduledAt: '2026-03-02T10:00:00.000Z', + scheduledAt: expect.any(String), }, ], ]); + + const scheduledAt = Date.parse( + triggerTypedEventSpy.mock.calls[0][2].scheduledAt, + ); + expect(scheduledAt).toBeGreaterThanOrEqual(startedAt); + expect(scheduledAt).toBeLessThanOrEqual(completedAt); + expect(triggerTypedEventSpy.mock.calls[0][2].scheduledAt).toBe( + triggerTypedEventSpy.mock.calls[1][2].scheduledAt, + ); }); }); diff --git a/src/common/channelHighlight/decisions.ts b/src/common/channelHighlight/decisions.ts new file mode 100644 index 0000000000..ce118ed983 --- /dev/null +++ b/src/common/channelHighlight/decisions.ts @@ -0,0 +1,169 @@ +import type { EvaluatedHighlightItem } from './evaluate'; +import type { StoredHighlightStory } from './schema'; +import type { HighlightBaselineItem, HighlightStory } from './types'; + +const arePostIdsEqual = (left: string[], right: string[]): boolean => + left.length === right.length && + left.every((postId, index) => postId === right[index]); + +const hasCachedStoryChanged = (story: HighlightStory): boolean => { + if (!story.cached) { + return true; + } + + const cachedMemberPostIds = [...story.cached.memberPostIds].sort(); + const storyMemberPostIds = story.memberPosts.map((post) => post.id).sort(); + + return ( + story.cached.canonicalPostId !== story.canonicalPost.id || + !arePostIdsEqual(cachedMemberPostIds, storyMemberPostIds) + ); +}; + +// Reuse the prior LLM decision only when the underlying story is still the same +// story, the canonical post did not change, and no newer post/relation activity +// happened since that decision. +export const shouldReuseEvaluations = ({ + shortlist, +}: { + shortlist: HighlightStory[]; +}): boolean => + shortlist.length > 0 && + shortlist.every((story) => { + if ( + !story.cached?.lastHeadline || + story.cached.lastSignificanceScore == null || + !story.cached.lastLlmEvaluatedAt + ) { + return false; + } + + if (hasCachedStoryChanged(story)) { + return false; + } + + return ( + new Date(story.cached.lastLlmEvaluatedAt).getTime() >= + story.lastSeenAt.getTime() + ); + }); + +export const reuseEvaluations = ({ + shortlist, + maxItems, +}: { + shortlist: HighlightStory[]; + maxItems: number; +}): EvaluatedHighlightItem[] => + shortlist.slice(0, maxItems).map((story, index) => ({ + storyKey: story.storyKey, + postId: story.canonicalPost.id, + headline: story.cached?.lastHeadline || story.canonicalPost.title || '', + significanceScore: story.cached?.lastSignificanceScore || 0, + significanceLabel: story.cached?.lastSignificanceLabel || 'routine', + rank: index + 1, + reason: 'Reused cached evaluation', + })); + +export const toPoolStory = ({ + story, + evaluation, + evaluatedAt, +}: { + story: HighlightStory; + evaluation?: EvaluatedHighlightItem; + evaluatedAt: Date; +}): StoredHighlightStory => ({ + storyKey: story.storyKey, + canonicalPostId: story.canonicalPost.id, + collectionId: story.collectionId, + memberPostIds: story.memberPosts.map((post) => post.id).sort(), + firstSeenAt: story.cached?.firstSeenAt || story.firstSeenAt.toISOString(), + lastSeenAt: story.lastSeenAt.toISOString(), + lastLlmEvaluatedAt: evaluation + ? evaluatedAt.toISOString() + : story.cached?.lastLlmEvaluatedAt || null, + lastSignificanceScore: + evaluation?.significanceScore ?? + story.cached?.lastSignificanceScore ?? + null, + lastSignificanceLabel: + evaluation?.significanceLabel ?? + story.cached?.lastSignificanceLabel ?? + null, + lastHeadline: evaluation?.headline ?? story.cached?.lastHeadline ?? null, + status: story.cached?.status || 'active', +}); + +const toItemSignature = (item: { + storyKey: string; + rank: number; + postId: string; +}): string => `${item.storyKey}:${item.rank}:${item.postId}`; + +// Publishing depends on the editorial surface changing in a user-visible way: +// a different story, a different canonical post for the same story, or a +// different headline for the same ranked set. +export const shouldPublish = ({ + baseline, + internal, +}: { + baseline: HighlightBaselineItem[]; + internal: EvaluatedHighlightItem[]; +}): boolean => { + if (!baseline.length) { + return internal.length > 0; + } + + if (!internal.length) { + return false; + } + + const baselineSignature = baseline.map(toItemSignature).join('|'); + const internalSignature = internal.map(toItemSignature).join('|'); + + if (baselineSignature !== internalSignature) { + return true; + } + + const baselineHeadlines = baseline.map((item) => item.headline).join('|'); + const internalHeadlines = internal.map((item) => item.headline).join('|'); + return baselineHeadlines !== internalHeadlines; +}; + +export const compareSnapshots = ({ + baseline, + internal, +}: { + baseline: HighlightBaselineItem[]; + internal: EvaluatedHighlightItem[]; +}) => { + const baselineByStory = new Map( + baseline.map((item) => [item.storyKey, item]), + ); + const internalByStory = new Map( + internal.map((item) => [item.storyKey, item]), + ); + const overlap = [...internalByStory.keys()].filter((storyKey) => + baselineByStory.has(storyKey), + ); + + return { + baselineCount: baseline.length, + internalCount: internal.length, + overlapCount: overlap.length, + addedStoryKeys: [...internalByStory.keys()].filter( + (storyKey) => !baselineByStory.has(storyKey), + ), + removedStoryKeys: [...baselineByStory.keys()].filter( + (storyKey) => !internalByStory.has(storyKey), + ), + churnCount: + [...internalByStory.keys()].filter( + (storyKey) => !baselineByStory.has(storyKey), + ).length + + [...baselineByStory.keys()].filter( + (storyKey) => !internalByStory.has(storyKey), + ).length, + }; +}; diff --git a/src/common/channelHighlight/definitions.ts b/src/common/channelHighlight/definitions.ts index 657500e133..0fbc36f0ca 100644 --- a/src/common/channelHighlight/definitions.ts +++ b/src/common/channelHighlight/definitions.ts @@ -1,22 +1,19 @@ import type { DataSource } from 'typeorm'; import { ChannelHighlightDefinition } from '../../entity/ChannelHighlightDefinition'; -import { queryReadReplica } from '../queryReadReplica'; export const getChannelHighlightDefinitions = async ({ con, }: { con: DataSource; }): Promise => - queryReadReplica(con, ({ queryRunner }) => - queryRunner.manager.getRepository(ChannelHighlightDefinition).find({ - where: { - enabled: true, - }, - order: { - channel: 'ASC', - }, - }), - ); + con.getRepository(ChannelHighlightDefinition).find({ + where: { + enabled: true, + }, + order: { + channel: 'ASC', + }, + }); export const getChannelHighlightDefinitionByChannel = async ({ con, @@ -25,11 +22,9 @@ export const getChannelHighlightDefinitionByChannel = async ({ con: DataSource; channel: string; }): Promise => - queryReadReplica(con, ({ queryRunner }) => - queryRunner.manager.getRepository(ChannelHighlightDefinition).findOne({ - where: { - channel, - enabled: true, - }, - }), - ); + con.getRepository(ChannelHighlightDefinition).findOne({ + where: { + channel, + enabled: true, + }, + }); diff --git a/src/common/channelHighlight/generate.ts b/src/common/channelHighlight/generate.ts index 9daedd029e..9834e662f2 100644 --- a/src/common/channelHighlight/generate.ts +++ b/src/common/channelHighlight/generate.ts @@ -1,667 +1,120 @@ -import { Brackets, In, MoreThanOrEqual, type DataSource } from 'typeorm'; +import type { DataSource } from 'typeorm'; import { logger as baseLogger } from '../../logger'; -import { ONE_DAY_IN_SECONDS, ONE_HOUR_IN_SECONDS } from '../constants'; -import { queryReadReplica } from '../queryReadReplica'; -import { ChannelHighlightDefinition } from '../../entity/ChannelHighlightDefinition'; import { ChannelHighlightRun } from '../../entity/ChannelHighlightRun'; import { ChannelHighlightState } from '../../entity/ChannelHighlightState'; -import { PostHighlight } from '../../entity/PostHighlight'; -import { Post, PostContentQuality } from '../../entity/posts/Post'; -import { - PostRelation, - PostRelationType, -} from '../../entity/posts/PostRelation'; -import { - channelHighlightCandidatePoolSchema, - emptyChannelHighlightCandidatePool, - type ChannelHighlightCandidatePool, - type StoredHighlightStory, -} from './schema'; +import { replaceHighlightsForChannel } from './publish'; import { evaluateChannelHighlights, type EvaluatedHighlightItem, - type HighlightQualitySummary, - type HighlightStoryCandidate, } from './evaluate'; -import { replaceHighlightsForChannel } from './publish'; +import { + fetchCurrentHighlights, + fetchDefinitionState, + fetchIncrementalPosts, + fetchPostsByIds, + fetchRelations, + getFetchStart, + getHorizonStart, + mergePosts, + parseCandidatePool, +} from './queries'; +import { + buildBaselineSnapshot, + buildStories, + toStoryCandidate, +} from './stories'; +import { + compareSnapshots, + reuseEvaluations, + shouldPublish, + shouldReuseEvaluations, + toPoolStory, +} from './decisions'; +import type { ChannelHighlightDefinition } from '../../entity/ChannelHighlightDefinition'; +import type { GenerateChannelHighlightResult, HighlightStory } from './types'; -const HIGHLIGHT_FETCH_OVERLAP_SECONDS = 10 * 60; const MAX_CANDIDATE_POOL_STORIES = 50; const SHORTLIST_MULTIPLIER = 3; -type HighlightPost = Pick< - Post, - | 'id' - | 'type' - | 'title' - | 'summary' - | 'createdAt' - | 'metadataChangedAt' - | 'statsUpdatedAt' - | 'upvotes' - | 'comments' - | 'views' - | 'sourceId' - | 'contentCuration' - | 'contentQuality' - | 'visible' - | 'deleted' - | 'banned' - | 'showOnFeed' - | 'contentMeta' -> & { - url: string | null; - canonicalUrl: string | null; -}; - -type HighlightStory = { - storyKey: string; - canonicalPost: HighlightPost; - memberPosts: HighlightPost[]; - collectionId: string | null; - firstSeenAt: Date; - lastSeenAt: Date; - preliminaryScore: number; - cached?: StoredHighlightStory | null; -}; - -type HighlightBaselineItem = { - postId: string; - rank: number; - headline: string; - storyKey: string; -}; - -type GenerateChannelHighlightResult = { - run: ChannelHighlightRun; - published: boolean; -}; - -const getHorizonStart = ({ - now, - definition, -}: { - now: Date; - definition: Pick; -}): Date => - new Date( - now.getTime() - - definition.candidateHorizonHours * ONE_HOUR_IN_SECONDS * 1000, - ); - -const getFetchStart = ({ - now, - definition, - state, -}: { - now: Date; - definition: Pick; - state: Pick | null; -}): Date => { - const horizonStart = getHorizonStart({ - now, - definition, - }); - - if (!state?.lastFetchedAt) { - return horizonStart; - } - - const overlapStart = new Date( - state.lastFetchedAt.getTime() - HIGHLIGHT_FETCH_OVERLAP_SECONDS * 1000, - ); - - return overlapStart > horizonStart ? overlapStart : horizonStart; -}; - -const parseCandidatePool = ( - value: ChannelHighlightState['candidatePool'] | null | undefined, -): ChannelHighlightCandidatePool => { - const parsed = channelHighlightCandidatePoolSchema.safeParse(value); - return parsed.success ? parsed.data : emptyChannelHighlightCandidatePool(); -}; - -const toLastActivityAt = (post: HighlightPost): Date => { - const candidates = [ - post.createdAt?.getTime() || 0, - post.metadataChangedAt?.getTime() || 0, - post.statsUpdatedAt?.getTime() || 0, - ]; - return new Date(Math.max(...candidates)); -}; - -const toQualitySummary = ( - quality: PostContentQuality, -): HighlightQualitySummary => ({ - clickbaitProbability: - typeof quality?.is_clickbait_probability === 'number' - ? quality.is_clickbait_probability - : null, - specificity: quality?.specificity || null, - intent: quality?.intent || null, - substanceDepth: quality?.substance_depth || null, - titleContentAlignment: quality?.title_content_alignment || null, - selfPromotionScore: - typeof quality?.self_promotion_score === 'number' - ? quality.self_promotion_score - : null, -}); - -const getStoryKey = ({ - canonicalPost, - collectionId, -}: { - canonicalPost: HighlightPost; - collectionId: string | null; -}): string => { - if (collectionId) { - return `collection:${collectionId}`; - } - - const canonicalUrl = canonicalPost.canonicalUrl || canonicalPost.url; - if (canonicalUrl) { - return `url:${canonicalUrl.toLowerCase().trim()}`; - } - - return `post:${canonicalPost.id}`; -}; - -const toPreliminaryScore = ({ - story, - horizonStart, - referenceTime, -}: { - story: Omit; - horizonStart: Date; - referenceTime: Date; -}): number => { - const ageSeconds = Math.max( - 1, - (referenceTime.getTime() - story.canonicalPost.createdAt.getTime()) / 1000, - ); - const horizonSeconds = Math.max( - ONE_DAY_IN_SECONDS, - (referenceTime.getTime() - horizonStart.getTime()) / 1000, - ); - const recency = Math.max(0.15, 1 - ageSeconds / horizonSeconds); - const engagement = - story.canonicalPost.upvotes + - story.canonicalPost.comments * 2 + - story.canonicalPost.views / 200; - const collectionBoost = story.collectionId ? 8 : 0; - const curationBoost = story.canonicalPost.contentCuration.some((value) => - ['news', 'release', 'milestone', 'leak', 'drama'].includes(value), - ) - ? 5 - : 0; - const quality = toQualitySummary(story.canonicalPost.contentQuality || {}); - const penalty = - (quality.clickbaitProbability || 0) * 5 + - (quality.selfPromotionScore || 0) * 3; - - return Number( - (engagement * recency + collectionBoost + curationBoost - penalty).toFixed( - 3, - ), - ); -}; +const buildStoriesByPostId = ( + stories: HighlightStory[], +): Map => { + const storiesByPostId = new Map(); -const selectCanonicalPost = (posts: HighlightPost[]): HighlightPost => - posts.slice().sort((left, right) => { - const leftScore = left.upvotes + left.comments * 2 + left.views / 200; - const rightScore = right.upvotes + right.comments * 2 + right.views / 200; - if (leftScore !== rightScore) { - return rightScore - leftScore; + for (const story of stories) { + storiesByPostId.set(story.canonicalPost.id, story); + for (const memberPost of story.memberPosts) { + storiesByPostId.set(memberPost.id, story); } - - return right.createdAt.getTime() - left.createdAt.getTime(); - })[0]; - -const fetchDefinitionState = async ({ - con, - channel, -}: { - con: DataSource; - channel: string; -}): Promise => - queryReadReplica(con, ({ queryRunner }) => - queryRunner.manager.getRepository(ChannelHighlightState).findOne({ - where: { - channel, - }, - }), - ); - -const fetchCurrentHighlights = async ({ - con, - channel, -}: { - con: DataSource; - channel: string; -}): Promise => - queryReadReplica(con, ({ queryRunner }) => - queryRunner.manager.getRepository(PostHighlight).find({ - where: { - channel, - }, - order: { - rank: 'ASC', - }, - }), - ); - -const fetchPostsByIds = async ({ - con, - ids, - horizonStart, -}: { - con: DataSource; - ids: string[]; - horizonStart: Date; -}): Promise => { - if (!ids.length) { - return []; } - return queryReadReplica(con, ({ queryRunner }) => - queryRunner.manager.getRepository(Post).find({ - where: { - id: In(ids), - createdAt: MoreThanOrEqual(horizonStart), - visible: true, - deleted: false, - banned: false, - showOnFeed: true, - }, - }), - ) as unknown as Promise; + return storiesByPostId; }; -const fetchIncrementalPosts = async ({ - con, - channel, +const buildInputSummary = ({ fetchStart, horizonStart, + shortlist, }: { - con: DataSource; - channel: string; fetchStart: Date; horizonStart: Date; -}): Promise => - queryReadReplica(con, ({ queryRunner }) => - queryRunner.manager - .getRepository(Post) - .createQueryBuilder('post') - .where('post.createdAt >= :horizonStart', { horizonStart }) - .andWhere('post.visible = true') - .andWhere('post.deleted = false') - .andWhere('post.banned = false') - .andWhere('post.showOnFeed = true') - .andWhere(`(post."contentMeta"->'channels') ? :channel`, { channel }) - .andWhere( - new Brackets((builder) => { - builder - .where('post.createdAt >= :fetchStart', { fetchStart }) - .orWhere('post.metadataChangedAt >= :fetchStart', { fetchStart }) - .orWhere('post.statsUpdatedAt >= :fetchStart', { fetchStart }); - }), - ) - .getMany(), - ) as unknown as Promise; - -const fetchRelations = async ({ - con, - postIds, -}: { - con: DataSource; - postIds: string[]; -}): Promise => { - if (!postIds.length) { - return []; - } - - return queryReadReplica(con, ({ queryRunner }) => - queryRunner.manager - .getRepository(PostRelation) - .createQueryBuilder('relation') - .where('relation.type = :type', { - type: PostRelationType.Collection, - }) - .andWhere( - new Brackets((builder) => { - builder - .where('relation.relatedPostId IN (:...postIds)', { postIds }) - .orWhere('relation.postId IN (:...postIds)', { postIds }); - }), - ) - .getMany(), - ); -}; - -const mergePosts = (groups: HighlightPost[][]): HighlightPost[] => { - const byId = new Map(); - for (const posts of groups) { - for (const post of posts) { - byId.set(post.id, post); - } - } - - return [...byId.values()]; -}; - -const buildStories = ({ - posts, - relations, - previousPool, - horizonStart, - referenceTime, -}: { - posts: HighlightPost[]; - relations: PostRelation[]; - previousPool: ChannelHighlightCandidatePool; - horizonStart: Date; - referenceTime: Date; -}): HighlightStory[] => { - const postsById = new Map(posts.map((post) => [post.id, post])); - const previousStories = new Map( - previousPool.stories.map((story) => [story.storyKey, story]), - ); - const collectionByChildId = new Map(); - const collectionIds = new Set(); - const relationActivityByCollectionId = new Map(); - - for (const relation of relations) { - if ( - !postsById.has(relation.postId) || - !postsById.has(relation.relatedPostId) - ) { - continue; - } - - collectionIds.add(relation.postId); - collectionByChildId.set(relation.relatedPostId, relation.postId); - const currentActivity = relationActivityByCollectionId.get(relation.postId); - if (!currentActivity || relation.createdAt > currentActivity) { - relationActivityByCollectionId.set(relation.postId, relation.createdAt); - } - } - - const groupedStories = new Map(); - const storyCollections = new Map(); - - for (const post of posts) { - const collectionId = - collectionByChildId.get(post.id) || - (collectionIds.has(post.id) ? post.id : null); - const canonicalPost = - collectionId && postsById.has(collectionId) - ? postsById.get(collectionId)! - : post; - const storyKey = getStoryKey({ - canonicalPost, - collectionId, - }); - - if (!groupedStories.has(storyKey)) { - groupedStories.set(storyKey, []); - storyCollections.set(storyKey, collectionId); - } - - groupedStories.get(storyKey)!.push(post); - } - - return [...groupedStories.entries()] - .map(([storyKey, memberPosts]) => { - const collectionId = storyCollections.get(storyKey) || null; - const canonicalPost = - collectionId && postsById.has(collectionId) - ? postsById.get(collectionId)! - : selectCanonicalPost(memberPosts); - const firstSeenAt = memberPosts.reduce( - (current, post) => - post.createdAt < current ? post.createdAt : current, - canonicalPost.createdAt, - ); - const lastSeenAt = memberPosts.reduce((current, post) => { - const lastActivityAt = toLastActivityAt(post); - return lastActivityAt > current ? lastActivityAt : current; - }, toLastActivityAt(canonicalPost)); - const relationActivityAt = collectionId - ? relationActivityByCollectionId.get(collectionId) - : null; - const baseStory = { - storyKey, - canonicalPost, - memberPosts, - collectionId, - firstSeenAt, - lastSeenAt: - relationActivityAt && relationActivityAt > lastSeenAt - ? relationActivityAt - : lastSeenAt, - cached: previousStories.get(storyKey) || null, - }; - - return { - ...baseStory, - preliminaryScore: toPreliminaryScore({ - story: baseStory, - horizonStart, - referenceTime, - }), - }; - }) - .filter((story) => story.lastSeenAt >= horizonStart) - .sort((left, right) => right.preliminaryScore - left.preliminaryScore); -}; - -const toStoryCandidate = (story: HighlightStory): HighlightStoryCandidate => ({ - storyKey: story.storyKey, - canonicalPostId: story.canonicalPost.id, - collectionId: story.collectionId, - memberPostIds: story.memberPosts.map((post) => post.id).sort(), - title: story.canonicalPost.title || '', - summary: story.canonicalPost.summary || '', - type: story.canonicalPost.type, - sourceId: story.canonicalPost.sourceId, - createdAt: story.canonicalPost.createdAt.toISOString(), - lastActivityAt: story.lastSeenAt.toISOString(), - upvotes: story.canonicalPost.upvotes, - comments: story.canonicalPost.comments, - views: story.canonicalPost.views, - contentCuration: story.canonicalPost.contentCuration || [], - quality: toQualitySummary(story.canonicalPost.contentQuality || {}), - preliminaryScore: story.preliminaryScore, -}); - -const toBaselineStoryKey = ({ - postId, - storiesByPostId, -}: { - postId: string; - storiesByPostId: Map; -}): string => storiesByPostId.get(postId)?.storyKey || `post:${postId}`; - -const buildBaselineSnapshot = ({ - highlights, - storiesByPostId, -}: { - highlights: PostHighlight[]; - storiesByPostId: Map; -}): HighlightBaselineItem[] => - highlights.map((highlight) => ({ - postId: highlight.postId, - rank: highlight.rank, - headline: highlight.headline, - storyKey: toBaselineStoryKey({ - postId: highlight.postId, - storiesByPostId, - }), - })); - -const arePostIdsEqual = (left: string[], right: string[]): boolean => - left.length === right.length && - left.every((postId, index) => postId === right[index]); - -const hasCachedStoryChanged = (story: HighlightStory): boolean => { - if (!story.cached) { - return true; - } - - const cachedMemberPostIds = [...story.cached.memberPostIds].sort(); - const storyMemberPostIds = story.memberPosts.map((post) => post.id).sort(); - - return ( - story.cached.canonicalPostId !== story.canonicalPost.id || - !arePostIdsEqual(cachedMemberPostIds, storyMemberPostIds) - ); -}; - -const shouldReuseEvaluations = ({ - shortlist, -}: { shortlist: HighlightStory[]; -}): boolean => - shortlist.length > 0 && - shortlist.every((story) => { - if ( - !story.cached?.lastHeadline || - story.cached.lastSignificanceScore == null - ) { - return false; - } - - if (!story.cached.lastLlmEvaluatedAt) { - return false; - } - - if (hasCachedStoryChanged(story)) { - return false; - } - - return ( - new Date(story.cached.lastLlmEvaluatedAt).getTime() >= - story.lastSeenAt.getTime() - ); - }); - -const reuseEvaluations = ({ - shortlist, - maxItems, -}: { - shortlist: HighlightStory[]; - maxItems: number; -}): EvaluatedHighlightItem[] => - shortlist.slice(0, maxItems).map((story, index) => ({ +}) => ({ + fetchStart: fetchStart.toISOString(), + horizonStart: horizonStart.toISOString(), + shortlist: shortlist.map((story) => ({ storyKey: story.storyKey, - postId: story.canonicalPost.id, - headline: story.cached?.lastHeadline || story.canonicalPost.title || '', - significanceScore: story.cached?.lastSignificanceScore || 0, - significanceLabel: story.cached?.lastSignificanceLabel || 'routine', - rank: index + 1, - reason: 'Reused cached evaluation', - })); - -const toPoolStory = ({ - story, - evaluation, - evaluatedAt, -}: { - story: HighlightStory; - evaluation?: EvaluatedHighlightItem; - evaluatedAt: Date; -}): StoredHighlightStory => ({ - storyKey: story.storyKey, - canonicalPostId: story.canonicalPost.id, - collectionId: story.collectionId, - memberPostIds: story.memberPosts.map((post) => post.id).sort(), - firstSeenAt: story.cached?.firstSeenAt || story.firstSeenAt.toISOString(), - lastSeenAt: story.lastSeenAt.toISOString(), - lastLlmEvaluatedAt: evaluation - ? evaluatedAt.toISOString() - : story.cached?.lastLlmEvaluatedAt || null, - lastSignificanceScore: - evaluation?.significanceScore ?? - story.cached?.lastSignificanceScore ?? - null, - lastSignificanceLabel: - evaluation?.significanceLabel ?? - story.cached?.lastSignificanceLabel ?? - null, - lastHeadline: evaluation?.headline ?? story.cached?.lastHeadline ?? null, - status: story.cached?.status || 'active', + canonicalPostId: story.canonicalPost.id, + preliminaryScore: story.preliminaryScore, + })), }); -const shouldPublish = ({ - baseline, - internal, +const buildMetrics = ({ + incrementalPosts, + retainedPosts, + stories, + shortlist, + reusedEvaluation, }: { - baseline: HighlightBaselineItem[]; - internal: EvaluatedHighlightItem[]; -}): boolean => { - if (!baseline.length) { - return internal.length > 0; - } - - if (!internal.length) { - return false; - } - - const toItemSignature = (item: { - storyKey: string; - rank: number; - postId: string; - }): string => `${item.storyKey}:${item.rank}:${item.postId}`; - - const baselineSignature = baseline.map(toItemSignature).join('|'); - const internalSignature = internal.map(toItemSignature).join('|'); - - if (baselineSignature !== internalSignature) { - return true; - } - - const baselineHeadlines = baseline.map((item) => item.headline).join('|'); - const internalHeadlines = internal.map((item) => item.headline).join('|'); - return baselineHeadlines !== internalHeadlines; -}; + incrementalPosts: { length: number }; + retainedPosts: { length: number }; + stories: { length: number }; + shortlist: { length: number }; + reusedEvaluation: boolean; +}) => ({ + fetchedPosts: incrementalPosts.length, + retainedPosts: retainedPosts.length, + totalStories: stories.length, + shortlistStories: shortlist.length, + evaluatedStories: reusedEvaluation ? 0 : shortlist.length, + reusedEvaluation, +}); -const compareSnapshots = ({ - baseline, - internal, +const buildCandidatePool = ({ + stories, + evaluatedItems, + now, }: { - baseline: HighlightBaselineItem[]; - internal: EvaluatedHighlightItem[]; -}) => { - const baselineByStory = new Map( - baseline.map((item) => [item.storyKey, item]), - ); - const internalByStory = new Map( - internal.map((item) => [item.storyKey, item]), - ); - const overlap = [...internalByStory.keys()].filter((storyKey) => - baselineByStory.has(storyKey), - ); - - return { - baselineCount: baseline.length, - internalCount: internal.length, - overlapCount: overlap.length, - addedStoryKeys: [...internalByStory.keys()].filter( - (storyKey) => !baselineByStory.has(storyKey), - ), - removedStoryKeys: [...baselineByStory.keys()].filter( - (storyKey) => !internalByStory.has(storyKey), - ), - churnCount: - [...internalByStory.keys()].filter( - (storyKey) => !baselineByStory.has(storyKey), - ).length + - [...baselineByStory.keys()].filter( - (storyKey) => !internalByStory.has(storyKey), - ).length, - }; -}; + stories: HighlightStory[]; + evaluatedItems: EvaluatedHighlightItem[]; + now: Date; +}) => ({ + stories: stories.slice(0, MAX_CANDIDATE_POOL_STORIES).map((story) => + toPoolStory({ + story, + evaluation: evaluatedItems.find( + (item) => item.storyKey === story.storyKey, + ), + evaluatedAt: now, + }), + ), +}); +// High-level flow: +// 1. Fetch fresh and retained posts inside the configured horizon. +// 2. Collapse them into story candidates, preferring collections. +// 3. Reuse cached editorial decisions only when the story is truly unchanged. +// 4. Compare the internal result with the live highlights, then persist the run. export const generateChannelHighlight = async ({ con, definition, @@ -689,13 +142,14 @@ export const generateChannelHighlight = async ({ definition, state, }); - const previousPoolPostIds = previousPool.stories.flatMap( - (story) => story.memberPostIds, - ); - const currentHighlightPostIds = currentHighlights.map((item) => item.postId); + + const retainedPostIds = new Set([ + ...previousPool.stories.flatMap((story) => story.memberPostIds), + ...currentHighlights.map((item) => item.postId), + ]); const retainedPosts = await fetchPostsByIds({ con, - ids: [...new Set([...previousPoolPostIds, ...currentHighlightPostIds])], + ids: [...retainedPostIds], horizonStart, }); const incrementalPosts = await fetchIncrementalPosts({ @@ -709,35 +163,26 @@ export const generateChannelHighlight = async ({ con, postIds: basePosts.map((post) => post.id), }); - const relatedPostIds = [ - ...new Set( - baseRelations.flatMap((relation) => [ - relation.postId, - relation.relatedPostId, - ]), - ), - ]; const relationPosts = await fetchPostsByIds({ con, - ids: relatedPostIds, + ids: [ + ...new Set( + baseRelations.flatMap((relation) => [ + relation.postId, + relation.relatedPostId, + ]), + ), + ], horizonStart, }); - const posts = mergePosts([basePosts, relationPosts]); const stories = buildStories({ - posts, + posts: mergePosts([basePosts, relationPosts]), relations: baseRelations, previousPool, horizonStart, referenceTime: now, }); - const storiesByPostId = new Map(); - for (const story of stories) { - storiesByPostId.set(story.canonicalPost.id, story); - for (const memberPost of story.memberPosts) { - storiesByPostId.set(memberPost.id, story); - } - } - + const storiesByPostId = buildStoriesByPostId(stories); const shortlist = stories.slice( 0, definition.maxItems * SHORTLIST_MULTIPLIER, @@ -746,11 +191,11 @@ export const generateChannelHighlight = async ({ highlights: currentHighlights, storiesByPostId, }); - const reusedEvaluations = shouldReuseEvaluations({ + + const reusedEvaluation = shouldReuseEvaluations({ shortlist, }); - - const evaluatedItems = reusedEvaluations + const evaluatedItems = reusedEvaluation ? reuseEvaluations({ shortlist, maxItems: definition.maxItems, @@ -764,17 +209,6 @@ export const generateChannelHighlight = async ({ }) ).items; - const candidatePool = { - stories: stories.slice(0, MAX_CANDIDATE_POOL_STORIES).map((story) => - toPoolStory({ - story, - evaluation: evaluatedItems.find( - (item) => item.storyKey === story.storyKey, - ), - evaluatedAt: now, - }), - ), - }; const comparison = compareSnapshots({ baseline: baselineSnapshot, internal: evaluatedItems, @@ -784,14 +218,13 @@ export const generateChannelHighlight = async ({ internal: evaluatedItems, }); const publish = definition.mode === 'publish' && wouldPublish; - const metrics = { - fetchedPosts: incrementalPosts.length, - retainedPosts: retainedPosts.length, - totalStories: stories.length, - shortlistStories: shortlist.length, - evaluatedStories: reusedEvaluations ? 0 : shortlist.length, - reusedEvaluation: reusedEvaluations, - }; + const metrics = buildMetrics({ + incrementalPosts, + retainedPosts, + stories, + shortlist, + reusedEvaluation, + }); const runRepo = con.getRepository(ChannelHighlightRun); let run = runRepo.create({ @@ -799,15 +232,11 @@ export const generateChannelHighlight = async ({ scheduledAt: now, status: 'processing', baselineSnapshot, - inputSummary: { - fetchStart: fetchStart.toISOString(), - horizonStart: horizonStart.toISOString(), - shortlist: shortlist.map((story) => ({ - storyKey: story.storyKey, - canonicalPostId: story.canonicalPost.id, - preliminaryScore: story.preliminaryScore, - })), - }, + inputSummary: buildInputSummary({ + fetchStart, + horizonStart, + shortlist, + }), internalSnapshot: [], comparison: {}, metrics, @@ -820,7 +249,11 @@ export const generateChannelHighlight = async ({ channel: definition.channel, lastFetchedAt: now, lastPublishedAt: publish ? now : state?.lastPublishedAt || null, - candidatePool, + candidatePool: buildCandidatePool({ + stories, + evaluatedItems, + now, + }), }); if (publish) { @@ -853,12 +286,10 @@ export const generateChannelHighlight = async ({ ); }); - const completedRun = await runRepo.findOneByOrFail({ - id: run.id, - }); - return { - run: completedRun, + run: await runRepo.findOneByOrFail({ + id: run.id, + }), published: publish, }; } catch (err) { diff --git a/src/common/channelHighlight/queries.ts b/src/common/channelHighlight/queries.ts new file mode 100644 index 0000000000..65fd442483 --- /dev/null +++ b/src/common/channelHighlight/queries.ts @@ -0,0 +1,184 @@ +import { Brackets, In, MoreThanOrEqual, type DataSource } from 'typeorm'; +import { ONE_HOUR_IN_SECONDS } from '../constants'; +import { ChannelHighlightState } from '../../entity/ChannelHighlightState'; +import { PostHighlight } from '../../entity/PostHighlight'; +import { Post } from '../../entity/posts/Post'; +import { + PostRelation, + PostRelationType, +} from '../../entity/posts/PostRelation'; +import { + channelHighlightCandidatePoolSchema, + emptyChannelHighlightCandidatePool, + type ChannelHighlightCandidatePool, +} from './schema'; +import type { ChannelHighlightDefinition } from '../../entity/ChannelHighlightDefinition'; +import type { HighlightPost } from './types'; + +const HIGHLIGHT_FETCH_OVERLAP_SECONDS = 10 * 60; + +export const getHorizonStart = ({ + now, + definition, +}: { + now: Date; + definition: Pick; +}): Date => + new Date( + now.getTime() - + definition.candidateHorizonHours * ONE_HOUR_IN_SECONDS * 1000, + ); + +export const getFetchStart = ({ + now, + definition, + state, +}: { + now: Date; + definition: Pick; + state: Pick | null; +}): Date => { + const horizonStart = getHorizonStart({ + now, + definition, + }); + + if (!state?.lastFetchedAt) { + return horizonStart; + } + + const overlapStart = new Date( + state.lastFetchedAt.getTime() - HIGHLIGHT_FETCH_OVERLAP_SECONDS * 1000, + ); + + return overlapStart > horizonStart ? overlapStart : horizonStart; +}; + +export const parseCandidatePool = ( + value: ChannelHighlightState['candidatePool'] | null | undefined, +): ChannelHighlightCandidatePool => { + const parsed = channelHighlightCandidatePoolSchema.safeParse(value); + return parsed.success ? parsed.data : emptyChannelHighlightCandidatePool(); +}; + +export const fetchDefinitionState = async ({ + con, + channel, +}: { + con: DataSource; + channel: string; +}): Promise => + con.getRepository(ChannelHighlightState).findOne({ + where: { + channel, + }, + }); + +export const fetchCurrentHighlights = async ({ + con, + channel, +}: { + con: DataSource; + channel: string; +}): Promise => + con.getRepository(PostHighlight).find({ + where: { + channel, + }, + order: { + rank: 'ASC', + }, + }); + +export const fetchPostsByIds = async ({ + con, + ids, + horizonStart, +}: { + con: DataSource; + ids: string[]; + horizonStart: Date; +}): Promise => { + if (!ids.length) { + return []; + } + + return con.getRepository(Post).find({ + where: { + id: In(ids), + createdAt: MoreThanOrEqual(horizonStart), + visible: true, + deleted: false, + banned: false, + showOnFeed: true, + }, + }) as unknown as Promise; +}; + +export const fetchIncrementalPosts = async ({ + con, + channel, + fetchStart, + horizonStart, +}: { + con: DataSource; + channel: string; + fetchStart: Date; + horizonStart: Date; +}): Promise => + con + .getRepository(Post) + .createQueryBuilder('post') + .where('post.createdAt >= :horizonStart', { horizonStart }) + .andWhere('post.visible = true') + .andWhere('post.deleted = false') + .andWhere('post.banned = false') + .andWhere('post.showOnFeed = true') + .andWhere(`(post."contentMeta"->'channels') ? :channel`, { channel }) + .andWhere( + new Brackets((builder) => { + builder + .where('post.createdAt >= :fetchStart', { fetchStart }) + .orWhere('post.metadataChangedAt >= :fetchStart', { fetchStart }) + .orWhere('post.statsUpdatedAt >= :fetchStart', { fetchStart }); + }), + ) + .getMany() as unknown as Promise; + +export const fetchRelations = async ({ + con, + postIds, +}: { + con: DataSource; + postIds: string[]; +}): Promise => { + if (!postIds.length) { + return []; + } + + return con + .getRepository(PostRelation) + .createQueryBuilder('relation') + .where('relation.type = :type', { + type: PostRelationType.Collection, + }) + .andWhere( + new Brackets((builder) => { + builder + .where('relation.relatedPostId IN (:...postIds)', { postIds }) + .orWhere('relation.postId IN (:...postIds)', { postIds }); + }), + ) + .getMany(); +}; + +export const mergePosts = (groups: HighlightPost[][]): HighlightPost[] => { + const byId = new Map(); + for (const posts of groups) { + for (const post of posts) { + byId.set(post.id, post); + } + } + + return [...byId.values()]; +}; diff --git a/src/common/channelHighlight/stories.ts b/src/common/channelHighlight/stories.ts new file mode 100644 index 0000000000..b927a0d98f --- /dev/null +++ b/src/common/channelHighlight/stories.ts @@ -0,0 +1,286 @@ +import { PostType, type PostContentQuality } from '../../entity/posts/Post'; +import { PostRelation } from '../../entity/posts/PostRelation'; +import { ONE_DAY_IN_SECONDS } from '../constants'; +import type { ChannelHighlightCandidatePool } from './schema'; +import type { + HighlightBaselineItem, + HighlightPost, + HighlightStory, +} from './types'; +import type { + HighlightQualitySummary, + HighlightStoryCandidate, +} from './evaluate'; + +const TWITTER_STATUS_URL_REGEX = + /(?:https?:\/\/)?(?:www\.)?(?:twitter\.com|x\.com)\/[^/?#]+\/status\/(\d+)/i; + +const toLastActivityAt = (post: HighlightPost): Date => { + const candidates = [ + post.createdAt?.getTime() || 0, + post.metadataChangedAt?.getTime() || 0, + post.statsUpdatedAt?.getTime() || 0, + ]; + return new Date(Math.max(...candidates)); +}; + +export const toQualitySummary = ( + quality: PostContentQuality, +): HighlightQualitySummary => ({ + clickbaitProbability: + typeof quality?.is_clickbait_probability === 'number' + ? quality.is_clickbait_probability + : null, + specificity: quality?.specificity || null, + intent: quality?.intent || null, + substanceDepth: quality?.substance_depth || null, + titleContentAlignment: quality?.title_content_alignment || null, + selfPromotionScore: + typeof quality?.self_promotion_score === 'number' + ? quality.self_promotion_score + : null, +}); + +const getTwitterStoryKey = (post: HighlightPost): string | null => { + if (post.type !== PostType.SocialTwitter) { + return null; + } + + if (post.sharedPostId) { + return `twitter:${post.sharedPostId}`; + } + + const match = (post.canonicalUrl || post.url || '').match( + TWITTER_STATUS_URL_REGEX, + ); + return match?.[1] ? `twitter:${match[1]}` : null; +}; + +export const getStoryKey = ({ + canonicalPost, + collectionId, +}: { + canonicalPost: HighlightPost; + collectionId: string | null; +}): string => { + if (collectionId) { + return `collection:${collectionId}`; + } + + const twitterStoryKey = getTwitterStoryKey(canonicalPost); + if (twitterStoryKey) { + return twitterStoryKey; + } + + const canonicalUrl = canonicalPost.canonicalUrl || canonicalPost.url; + if (canonicalUrl) { + return `url:${canonicalUrl.toLowerCase().trim()}`; + } + + return `post:${canonicalPost.id}`; +}; + +const toPreliminaryScore = ({ + story, + horizonStart, + referenceTime, +}: { + story: Omit; + horizonStart: Date; + referenceTime: Date; +}): number => { + const ageSeconds = Math.max( + 1, + (referenceTime.getTime() - story.canonicalPost.createdAt.getTime()) / 1000, + ); + const horizonSeconds = Math.max( + ONE_DAY_IN_SECONDS, + (referenceTime.getTime() - horizonStart.getTime()) / 1000, + ); + const recency = Math.max(0.15, 1 - ageSeconds / horizonSeconds); + const engagement = + story.canonicalPost.upvotes + + story.canonicalPost.comments * 2 + + story.canonicalPost.views / 200; + const collectionBoost = story.collectionId ? 8 : 0; + const curationBoost = story.canonicalPost.contentCuration.some((value) => + ['news', 'release', 'milestone', 'leak', 'drama'].includes(value), + ) + ? 5 + : 0; + const quality = toQualitySummary(story.canonicalPost.contentQuality || {}); + const penalty = + (quality.clickbaitProbability || 0) * 5 + + (quality.selfPromotionScore || 0) * 3; + + return Number( + (engagement * recency + collectionBoost + curationBoost - penalty).toFixed( + 3, + ), + ); +}; + +const selectCanonicalPost = (posts: HighlightPost[]): HighlightPost => + posts.slice().sort((left, right) => { + const leftScore = left.upvotes + left.comments * 2 + left.views / 200; + const rightScore = right.upvotes + right.comments * 2 + right.views / 200; + if (leftScore !== rightScore) { + return rightScore - leftScore; + } + + return right.createdAt.getTime() - left.createdAt.getTime(); + })[0]; + +// Highlights reason about stories, not raw posts. Collections are the primary +// story boundary, while social/twitter posts fall back to a stable tweet key. +export const buildStories = ({ + posts, + relations, + previousPool, + horizonStart, + referenceTime, +}: { + posts: HighlightPost[]; + relations: PostRelation[]; + previousPool: ChannelHighlightCandidatePool; + horizonStart: Date; + referenceTime: Date; +}): HighlightStory[] => { + const postsById = new Map(posts.map((post) => [post.id, post])); + const previousStories = new Map( + previousPool.stories.map((story) => [story.storyKey, story]), + ); + const collectionByChildId = new Map(); + const collectionIds = new Set(); + const relationActivityByCollectionId = new Map(); + + for (const relation of relations) { + if ( + !postsById.has(relation.postId) || + !postsById.has(relation.relatedPostId) + ) { + continue; + } + + collectionIds.add(relation.postId); + collectionByChildId.set(relation.relatedPostId, relation.postId); + const currentActivity = relationActivityByCollectionId.get(relation.postId); + if (!currentActivity || relation.createdAt > currentActivity) { + relationActivityByCollectionId.set(relation.postId, relation.createdAt); + } + } + + const groupedStories = new Map(); + const storyCollections = new Map(); + + for (const post of posts) { + const collectionId = + collectionByChildId.get(post.id) || + (collectionIds.has(post.id) ? post.id : null); + const canonicalPost = + collectionId && postsById.has(collectionId) + ? postsById.get(collectionId)! + : post; + const storyKey = getStoryKey({ + canonicalPost, + collectionId, + }); + + if (!groupedStories.has(storyKey)) { + groupedStories.set(storyKey, []); + storyCollections.set(storyKey, collectionId); + } + + groupedStories.get(storyKey)!.push(post); + } + + return [...groupedStories.entries()] + .map(([storyKey, memberPosts]) => { + const collectionId = storyCollections.get(storyKey) || null; + const canonicalPost = + collectionId && postsById.has(collectionId) + ? postsById.get(collectionId)! + : selectCanonicalPost(memberPosts); + const firstSeenAt = memberPosts.reduce( + (current, post) => + post.createdAt < current ? post.createdAt : current, + canonicalPost.createdAt, + ); + const postActivityAt = memberPosts.reduce((current, post) => { + const lastActivityAt = toLastActivityAt(post); + return lastActivityAt > current ? lastActivityAt : current; + }, toLastActivityAt(canonicalPost)); + const relationActivityAt = collectionId + ? relationActivityByCollectionId.get(collectionId) + : null; + const baseStory = { + storyKey, + canonicalPost, + memberPosts, + collectionId, + firstSeenAt, + lastSeenAt: + relationActivityAt && relationActivityAt > postActivityAt + ? relationActivityAt + : postActivityAt, + cached: previousStories.get(storyKey) || null, + }; + + return { + ...baseStory, + preliminaryScore: toPreliminaryScore({ + story: baseStory, + horizonStart, + referenceTime, + }), + }; + }) + .filter((story) => story.lastSeenAt >= horizonStart) + .sort((left, right) => right.preliminaryScore - left.preliminaryScore); +}; + +export const toStoryCandidate = ( + story: HighlightStory, +): HighlightStoryCandidate => ({ + storyKey: story.storyKey, + canonicalPostId: story.canonicalPost.id, + collectionId: story.collectionId, + memberPostIds: story.memberPosts.map((post) => post.id).sort(), + title: story.canonicalPost.title || '', + summary: story.canonicalPost.summary || '', + type: story.canonicalPost.type, + sourceId: story.canonicalPost.sourceId, + createdAt: story.canonicalPost.createdAt.toISOString(), + lastActivityAt: story.lastSeenAt.toISOString(), + upvotes: story.canonicalPost.upvotes, + comments: story.canonicalPost.comments, + views: story.canonicalPost.views, + contentCuration: story.canonicalPost.contentCuration || [], + quality: toQualitySummary(story.canonicalPost.contentQuality || {}), + preliminaryScore: story.preliminaryScore, +}); + +const toBaselineStoryKey = ({ + postId, + storiesByPostId, +}: { + postId: string; + storiesByPostId: Map; +}): string => storiesByPostId.get(postId)?.storyKey || `post:${postId}`; + +export const buildBaselineSnapshot = ({ + highlights, + storiesByPostId, +}: { + highlights: { postId: string; rank: number; headline: string }[]; + storiesByPostId: Map; +}): HighlightBaselineItem[] => + highlights.map((highlight) => ({ + postId: highlight.postId, + rank: highlight.rank, + headline: highlight.headline, + storyKey: toBaselineStoryKey({ + postId: highlight.postId, + storiesByPostId, + }), + })); diff --git a/src/common/channelHighlight/types.ts b/src/common/channelHighlight/types.ts new file mode 100644 index 0000000000..012f1fa5b1 --- /dev/null +++ b/src/common/channelHighlight/types.ts @@ -0,0 +1,52 @@ +import type { Post } from '../../entity/posts/Post'; +import type { ChannelHighlightRun } from '../../entity/ChannelHighlightRun'; +import type { StoredHighlightStory } from './schema'; + +export type HighlightPost = Pick< + Post, + | 'id' + | 'type' + | 'title' + | 'summary' + | 'createdAt' + | 'metadataChangedAt' + | 'statsUpdatedAt' + | 'upvotes' + | 'comments' + | 'views' + | 'sourceId' + | 'contentCuration' + | 'contentQuality' + | 'visible' + | 'deleted' + | 'banned' + | 'showOnFeed' + | 'contentMeta' +> & { + url: string | null; + canonicalUrl: string | null; + sharedPostId?: string | null; +}; + +export type HighlightStory = { + storyKey: string; + canonicalPost: HighlightPost; + memberPosts: HighlightPost[]; + collectionId: string | null; + firstSeenAt: Date; + lastSeenAt: Date; + preliminaryScore: number; + cached?: StoredHighlightStory | null; +}; + +export type HighlightBaselineItem = { + postId: string; + rank: number; + headline: string; + storyKey: string; +}; + +export type GenerateChannelHighlightResult = { + run: ChannelHighlightRun; + published: boolean; +}; diff --git a/src/cron/channelHighlights.ts b/src/cron/channelHighlights.ts index e40732f85a..5e67545b38 100644 --- a/src/cron/channelHighlights.ts +++ b/src/cron/channelHighlights.ts @@ -2,13 +2,10 @@ import { getChannelHighlightDefinitions } from '../common/channelHighlight/defin import { triggerTypedEvent } from '../common/typedPubsub'; import { Cron } from './cron'; -export const getChannelHighlightsNow = (): Date => new Date(); - const cron: Cron = { name: 'channel-highlights', handler: async (con, logger) => { - const now = getChannelHighlightsNow(); - const scheduledAt = now.toISOString(); + const scheduledAt = new Date().toISOString(); const definitions = await getChannelHighlightDefinitions({ con, }); diff --git a/src/workers/generateChannelDigest.ts b/src/workers/generateChannelDigest.ts index 1d70994d4b..889a5ad1a9 100644 --- a/src/workers/generateChannelDigest.ts +++ b/src/workers/generateChannelDigest.ts @@ -5,13 +5,8 @@ import { ONE_MINUTE_IN_SECONDS, ONE_WEEK_IN_SECONDS, } from '../common/constants'; -import { - checkRedisObjectExists, - deleteRedisKey, - setRedisObjectIfNotExistsWithExpiry, - setRedisObjectWithExpiry, -} from '../redis'; import { TypedWorker } from './worker'; +import { withRedisDoneLock } from './withRedisDoneLock'; const CHANNEL_DIGEST_LOCK_TTL_SECONDS = 10 * ONE_MINUTE_IN_SECONDS; @@ -62,43 +57,29 @@ const worker: TypedWorker<'api.v1.generate-channel-digest'> = { return; } - const doneKey = getChannelDigestDoneKey({ - digestKey, - scheduledAt, - }); - if (await checkRedisObjectExists(doneKey)) { - return; - } - - const lockKey = getChannelDigestLockKey({ - digestKey, - scheduledAt, - }); - const lockAcquired = await setRedisObjectIfNotExistsWithExpiry( - lockKey, - message.messageId || digestKey, - CHANNEL_DIGEST_LOCK_TTL_SECONDS, - ); - if (!lockAcquired) { - return; - } - try { - await generateChannelDigest({ - con, - definition, - now, + await withRedisDoneLock({ + doneKey: getChannelDigestDoneKey({ + digestKey, + scheduledAt, + }), + lockKey: getChannelDigestLockKey({ + digestKey, + scheduledAt, + }), + lockValue: message.messageId || digestKey, + lockTtlSeconds: CHANNEL_DIGEST_LOCK_TTL_SECONDS, + doneTtlSeconds: getChannelDigestDoneTtl(definition.frequency), + execute: () => + generateChannelDigest({ + con, + definition, + now, + }).then(() => undefined), }); - await setRedisObjectWithExpiry( - doneKey, - '1', - getChannelDigestDoneTtl(definition.frequency), - ); } catch (err) { logger.error({ ...logDetails, err }, 'Failed to generate channel digest'); throw err; - } finally { - await deleteRedisKey(lockKey); } }, }; diff --git a/src/workers/generateChannelHighlight.ts b/src/workers/generateChannelHighlight.ts index d8ff9b8215..93ea81b8a4 100644 --- a/src/workers/generateChannelHighlight.ts +++ b/src/workers/generateChannelHighlight.ts @@ -1,13 +1,8 @@ import { ONE_DAY_IN_SECONDS, ONE_MINUTE_IN_SECONDS } from '../common/constants'; import { getChannelHighlightDefinitionByChannel } from '../common/channelHighlight/definitions'; import { generateChannelHighlight } from '../common/channelHighlight/generate'; -import { - checkRedisObjectExists, - deleteRedisKey, - setRedisObjectIfNotExistsWithExpiry, - setRedisObjectWithExpiry, -} from '../redis'; import { TypedWorker } from './worker'; +import { withRedisDoneLock } from './withRedisDoneLock'; const CHANNEL_HIGHLIGHT_LOCK_TTL_SECONDS = 10 * ONE_MINUTE_IN_SECONDS; const CHANNEL_HIGHLIGHT_DONE_TTL_SECONDS = 2 * ONE_DAY_IN_SECONDS; @@ -49,46 +44,32 @@ const worker: TypedWorker<'api.v1.generate-channel-highlight'> = { return; } - const doneKey = getChannelHighlightDoneKey({ - channel, - scheduledAt, - }); - if (await checkRedisObjectExists(doneKey)) { - return; - } - - const lockKey = getChannelHighlightLockKey({ - channel, - scheduledAt, - }); - const lockAcquired = await setRedisObjectIfNotExistsWithExpiry( - lockKey, - message.messageId || channel, - CHANNEL_HIGHLIGHT_LOCK_TTL_SECONDS, - ); - if (!lockAcquired) { - return; - } - try { - await generateChannelHighlight({ - con, - definition, - now, + await withRedisDoneLock({ + doneKey: getChannelHighlightDoneKey({ + channel, + scheduledAt, + }), + lockKey: getChannelHighlightLockKey({ + channel, + scheduledAt, + }), + lockValue: message.messageId || channel, + lockTtlSeconds: CHANNEL_HIGHLIGHT_LOCK_TTL_SECONDS, + doneTtlSeconds: CHANNEL_HIGHLIGHT_DONE_TTL_SECONDS, + execute: () => + generateChannelHighlight({ + con, + definition, + now, + }).then(() => undefined), }); - await setRedisObjectWithExpiry( - doneKey, - '1', - CHANNEL_HIGHLIGHT_DONE_TTL_SECONDS, - ); } catch (err) { logger.error( { ...logDetails, err }, 'Failed to generate channel highlight', ); throw err; - } finally { - await deleteRedisKey(lockKey); } }, }; diff --git a/src/workers/withRedisDoneLock.ts b/src/workers/withRedisDoneLock.ts new file mode 100644 index 0000000000..95f82931e1 --- /dev/null +++ b/src/workers/withRedisDoneLock.ts @@ -0,0 +1,43 @@ +import { + checkRedisObjectExists, + deleteRedisKey, + setRedisObjectIfNotExistsWithExpiry, + setRedisObjectWithExpiry, +} from '../redis'; + +export const withRedisDoneLock = async ({ + doneKey, + lockKey, + lockValue, + lockTtlSeconds, + doneTtlSeconds, + execute, +}: { + doneKey: string; + lockKey: string; + lockValue: string; + lockTtlSeconds: number; + doneTtlSeconds: number; + execute: () => Promise; +}): Promise => { + if (await checkRedisObjectExists(doneKey)) { + return false; + } + + const lockAcquired = await setRedisObjectIfNotExistsWithExpiry( + lockKey, + lockValue, + lockTtlSeconds, + ); + if (!lockAcquired) { + return false; + } + + try { + await execute(); + await setRedisObjectWithExpiry(doneKey, '1', doneTtlSeconds); + return true; + } finally { + await deleteRedisKey(lockKey); + } +}; From b8a71c07aa3e4c0dc8981921312d8b9925858d84 Mon Sep 17 00:00:00 2001 From: Ido Shamun <1993245+idoshamun@users.noreply.github.com> Date: Thu, 19 Mar 2026 15:02:28 +0200 Subject: [PATCH 3/5] Improve twitter highlight story keys --- __tests__/workers/generateChannelHighlight.ts | 105 +++++++++++++++++- src/common/channelHighlight/stories.ts | 67 +++++++++-- 2 files changed, 164 insertions(+), 8 deletions(-) diff --git a/__tests__/workers/generateChannelHighlight.ts b/__tests__/workers/generateChannelHighlight.ts index d2c8dbfb0e..4c4290789f 100644 --- a/__tests__/workers/generateChannelHighlight.ts +++ b/__tests__/workers/generateChannelHighlight.ts @@ -4,7 +4,12 @@ import { ChannelHighlightDefinition } from '../../src/entity/ChannelHighlightDef import { ChannelHighlightRun } from '../../src/entity/ChannelHighlightRun'; import { ChannelHighlightState } from '../../src/entity/ChannelHighlightState'; import { PostHighlight } from '../../src/entity/PostHighlight'; -import { CollectionPost, ArticlePost, Source } from '../../src/entity'; +import { + CollectionPost, + ArticlePost, + SocialTwitterPost, + Source, +} from '../../src/entity'; import { PostRelation, PostRelationType, @@ -95,6 +100,48 @@ const saveCollection = async ({ }, }); +const saveTwitterPost = async ({ + id, + title, + url, + createdAt, + channel = 'vibes', + sourceId = 'content-source', + sharedPostId, + contentMeta, +}: { + id: string; + title: string; + url: string; + createdAt: Date; + channel?: string; + sourceId?: string; + sharedPostId?: string; + contentMeta?: Record; +}) => + con.getRepository(SocialTwitterPost).save({ + id, + shortId: id, + title, + url, + canonicalUrl: url, + score: 0, + sourceId, + visible: true, + deleted: false, + banned: false, + showOnFeed: true, + createdAt, + metadataChangedAt: createdAt, + statsUpdatedAt: createdAt, + type: PostType.SocialTwitter, + sharedPostId, + contentMeta: { + channels: [channel], + ...(contentMeta || {}), + }, + }); + describe('generateChannelHighlight worker', () => { beforeEach(async () => { await con @@ -484,4 +531,60 @@ describe('generateChannelHighlight worker', () => { }), ]); }); + + it('should prefer referenced tweet identity for quote tweets', async () => { + const now = new Date('2026-03-03T13:00:00.000Z'); + await con.getRepository(ChannelHighlightDefinition).save({ + channel: 'vibes', + enabled: true, + mode: 'shadow', + candidateHorizonHours: 72, + maxItems: 3, + }); + await saveTwitterPost({ + id: 'tweet-quote-1', + title: 'Quote tweet', + url: 'https://x.com/quoter/status/1234567890123456789', + createdAt: new Date('2026-03-03T12:45:00.000Z'), + contentMeta: { + social_twitter: { + tweet_id: '1234567890123456789', + reference: { + tweet_id: '9876543210987654321', + url: 'https://x.com/original/status/9876543210987654321', + }, + }, + }, + }); + + const evaluatorSpy = jest + .spyOn(evaluator, 'evaluateChannelHighlights') + .mockImplementation(async ({ candidates }) => ({ + items: candidates.map((candidate, index) => ({ + storyKey: candidate.storyKey, + postId: candidate.canonicalPostId, + headline: candidate.title, + significanceScore: 0.8, + significanceLabel: 'major', + rank: index + 1, + reason: 'test', + })), + })); + + await expectSuccessfulTypedBackground<'api.v1.generate-channel-highlight'>( + worker, + { + channel: 'vibes', + scheduledAt: now.toISOString(), + }, + ); + + expect(evaluatorSpy).toHaveBeenCalledTimes(1); + expect(evaluatorSpy.mock.calls[0][0].candidates).toEqual([ + expect.objectContaining({ + canonicalPostId: 'tweet-quote-1', + storyKey: 'twitter:9876543210987654321', + }), + ]); + }); }); diff --git a/src/common/channelHighlight/stories.ts b/src/common/channelHighlight/stories.ts index b927a0d98f..1cd0b20247 100644 --- a/src/common/channelHighlight/stories.ts +++ b/src/common/channelHighlight/stories.ts @@ -1,3 +1,4 @@ +import { extractTwitterStatusId } from '../twitterSocial'; import { PostType, type PostContentQuality } from '../../entity/posts/Post'; import { PostRelation } from '../../entity/posts/PostRelation'; import { ONE_DAY_IN_SECONDS } from '../constants'; @@ -12,9 +13,6 @@ import type { HighlightStoryCandidate, } from './evaluate'; -const TWITTER_STATUS_URL_REGEX = - /(?:https?:\/\/)?(?:www\.)?(?:twitter\.com|x\.com)\/[^/?#]+\/status\/(\d+)/i; - const toLastActivityAt = (post: HighlightPost): Date => { const candidates = [ post.createdAt?.getTime() || 0, @@ -41,19 +39,74 @@ export const toQualitySummary = ( : null, }); +const getTwitterSocialMeta = ( + post: HighlightPost, +): Record | null => { + const socialTwitter = (post.contentMeta as Record | null) + ?.social_twitter; + + if (!socialTwitter || typeof socialTwitter !== 'object') { + return null; + } + + return socialTwitter as Record; +}; + +const getTwitterReferenceStatusId = ( + post: HighlightPost, +): string | undefined => { + const reference = getTwitterSocialMeta(post)?.reference; + if (!reference || typeof reference !== 'object') { + return undefined; + } + + const referenceRecord = reference as Record; + const tweetId = + typeof referenceRecord.tweet_id === 'string' + ? referenceRecord.tweet_id.trim() + : undefined; + + if (tweetId) { + return tweetId; + } + + const referenceUrl = + typeof referenceRecord.url === 'string' ? referenceRecord.url : undefined; + return extractTwitterStatusId(referenceUrl); +}; + +const getTwitterRootStatusId = (post: HighlightPost): string | undefined => { + const socialMeta = getTwitterSocialMeta(post); + const tweetId = + typeof socialMeta?.tweet_id === 'string' ? socialMeta.tweet_id.trim() : ''; + + if (tweetId) { + return tweetId; + } + + return extractTwitterStatusId(post.canonicalUrl || post.url); +}; + const getTwitterStoryKey = (post: HighlightPost): string | null => { if (post.type !== PostType.SocialTwitter) { return null; } + const referenceStatusId = getTwitterReferenceStatusId(post); + if (referenceStatusId) { + return `twitter:${referenceStatusId}`; + } + + const rootStatusId = getTwitterRootStatusId(post); + if (rootStatusId) { + return `twitter:${rootStatusId}`; + } + if (post.sharedPostId) { return `twitter:${post.sharedPostId}`; } - const match = (post.canonicalUrl || post.url || '').match( - TWITTER_STATUS_URL_REGEX, - ); - return match?.[1] ? `twitter:${match[1]}` : null; + return null; }; export const getStoryKey = ({ From eeef736c7394031b1268951767e1602b5901dabb Mon Sep 17 00:00:00 2001 From: Ido Shamun <1993245+idoshamun@users.noreply.github.com> Date: Thu, 19 Mar 2026 15:13:32 +0200 Subject: [PATCH 4/5] Refine channel highlight review follow-ups --- __tests__/cron/channelHighlights.ts | 7 +- __tests__/workers/generateChannelHighlight.ts | 6 - src/common/channelHighlight/decisions.ts | 11 +- src/common/channelHighlight/definitions.ts | 5 +- src/common/channelHighlight/generate.ts | 245 ++++++++++-------- src/common/channelHighlight/schema.ts | 2 +- src/entity/ChannelHighlightDefinition.ts | 5 +- .../1773000000000-ChannelHighlights.ts | 6 +- 8 files changed, 145 insertions(+), 142 deletions(-) diff --git a/__tests__/cron/channelHighlights.ts b/__tests__/cron/channelHighlights.ts index 55e86c0118..7282145d71 100644 --- a/__tests__/cron/channelHighlights.ts +++ b/__tests__/cron/channelHighlights.ts @@ -25,7 +25,7 @@ describe('channelHighlights cron', () => { expect(registeredCron).toBeDefined(); }); - it('should enqueue enabled highlight definitions', async () => { + it('should enqueue active highlight definitions', async () => { const triggerTypedEventSpy = jest .spyOn(typedPubsub, 'triggerTypedEvent') .mockResolvedValue(); @@ -33,22 +33,19 @@ describe('channelHighlights cron', () => { await con.getRepository(ChannelHighlightDefinition).save([ { channel: 'backend', - enabled: true, mode: 'shadow', candidateHorizonHours: 72, maxItems: 10, }, { channel: 'vibes', - enabled: true, mode: 'shadow', candidateHorizonHours: 72, maxItems: 10, }, { channel: 'disabled', - enabled: false, - mode: 'shadow', + mode: 'disabled', candidateHorizonHours: 72, maxItems: 10, }, diff --git a/__tests__/workers/generateChannelHighlight.ts b/__tests__/workers/generateChannelHighlight.ts index 4c4290789f..e9d0bfb453 100644 --- a/__tests__/workers/generateChannelHighlight.ts +++ b/__tests__/workers/generateChannelHighlight.ts @@ -193,7 +193,6 @@ describe('generateChannelHighlight worker', () => { const now = new Date('2026-03-03T10:00:00.000Z'); await con.getRepository(ChannelHighlightDefinition).save({ channel: 'vibes', - enabled: true, mode: 'shadow', candidateHorizonHours: 72, maxItems: 3, @@ -294,7 +293,6 @@ describe('generateChannelHighlight worker', () => { const now = new Date('2026-03-03T11:00:00.000Z'); await con.getRepository(ChannelHighlightDefinition).save({ channel: 'vibes', - enabled: true, mode: 'publish', candidateHorizonHours: 72, maxItems: 3, @@ -343,7 +341,6 @@ describe('generateChannelHighlight worker', () => { const now = new Date('2026-03-03T11:30:00.000Z'); await con.getRepository(ChannelHighlightDefinition).save({ channel: 'vibes', - enabled: true, mode: 'publish', candidateHorizonHours: 72, maxItems: 3, @@ -409,7 +406,6 @@ describe('generateChannelHighlight worker', () => { const now = new Date('2026-03-03T12:30:00.000Z'); await con.getRepository(ChannelHighlightDefinition).save({ channel: 'vibes', - enabled: true, mode: 'shadow', candidateHorizonHours: 72, maxItems: 3, @@ -484,7 +480,6 @@ describe('generateChannelHighlight worker', () => { const now = new Date('2026-03-03T12:00:00.000Z'); await con.getRepository(ChannelHighlightDefinition).save({ channel: 'vibes', - enabled: true, mode: 'shadow', candidateHorizonHours: 72, maxItems: 3, @@ -536,7 +531,6 @@ describe('generateChannelHighlight worker', () => { const now = new Date('2026-03-03T13:00:00.000Z'); await con.getRepository(ChannelHighlightDefinition).save({ channel: 'vibes', - enabled: true, mode: 'shadow', candidateHorizonHours: 72, maxItems: 3, diff --git a/src/common/channelHighlight/decisions.ts b/src/common/channelHighlight/decisions.ts index ce118ed983..af71b11f74 100644 --- a/src/common/channelHighlight/decisions.ts +++ b/src/common/channelHighlight/decisions.ts @@ -2,21 +2,18 @@ import type { EvaluatedHighlightItem } from './evaluate'; import type { StoredHighlightStory } from './schema'; import type { HighlightBaselineItem, HighlightStory } from './types'; -const arePostIdsEqual = (left: string[], right: string[]): boolean => - left.length === right.length && - left.every((postId, index) => postId === right[index]); +const toSortedPostIdsKey = (postIds: string[]): string => + [...postIds].sort().join('|'); const hasCachedStoryChanged = (story: HighlightStory): boolean => { if (!story.cached) { return true; } - const cachedMemberPostIds = [...story.cached.memberPostIds].sort(); - const storyMemberPostIds = story.memberPosts.map((post) => post.id).sort(); - return ( story.cached.canonicalPostId !== story.canonicalPost.id || - !arePostIdsEqual(cachedMemberPostIds, storyMemberPostIds) + toSortedPostIdsKey(story.cached.memberPostIds) !== + toSortedPostIdsKey(story.memberPosts.map((post) => post.id)) ); }; diff --git a/src/common/channelHighlight/definitions.ts b/src/common/channelHighlight/definitions.ts index 0fbc36f0ca..3ad6c01daf 100644 --- a/src/common/channelHighlight/definitions.ts +++ b/src/common/channelHighlight/definitions.ts @@ -1,4 +1,5 @@ import type { DataSource } from 'typeorm'; +import { Not } from 'typeorm'; import { ChannelHighlightDefinition } from '../../entity/ChannelHighlightDefinition'; export const getChannelHighlightDefinitions = async ({ @@ -8,7 +9,7 @@ export const getChannelHighlightDefinitions = async ({ }): Promise => con.getRepository(ChannelHighlightDefinition).find({ where: { - enabled: true, + mode: Not('disabled'), }, order: { channel: 'ASC', @@ -25,6 +26,6 @@ export const getChannelHighlightDefinitionByChannel = async ({ con.getRepository(ChannelHighlightDefinition).findOne({ where: { channel, - enabled: true, + mode: Not('disabled'), }, }); diff --git a/src/common/channelHighlight/generate.ts b/src/common/channelHighlight/generate.ts index 9834e662f2..4dd7e0310b 100644 --- a/src/common/channelHighlight/generate.ts +++ b/src/common/channelHighlight/generate.ts @@ -124,126 +124,145 @@ export const generateChannelHighlight = async ({ definition: ChannelHighlightDefinition; now?: Date; }): Promise => { - const state = await fetchDefinitionState({ - con, - channel: definition.channel, - }); - const previousPool = parseCandidatePool(state?.candidatePool); - const currentHighlights = await fetchCurrentHighlights({ - con, - channel: definition.channel, - }); - const horizonStart = getHorizonStart({ - now, - definition, - }); - const fetchStart = getFetchStart({ - now, - definition, - state, - }); - - const retainedPostIds = new Set([ - ...previousPool.stories.flatMap((story) => story.memberPostIds), - ...currentHighlights.map((item) => item.postId), - ]); - const retainedPosts = await fetchPostsByIds({ - con, - ids: [...retainedPostIds], - horizonStart, - }); - const incrementalPosts = await fetchIncrementalPosts({ - con, - channel: definition.channel, - fetchStart, - horizonStart, - }); - const basePosts = mergePosts([retainedPosts, incrementalPosts]); - const baseRelations = await fetchRelations({ - con, - postIds: basePosts.map((post) => post.id), - }); - const relationPosts = await fetchPostsByIds({ - con, - ids: [ - ...new Set( - baseRelations.flatMap((relation) => [ - relation.postId, - relation.relatedPostId, - ]), - ), - ], - horizonStart, - }); - const stories = buildStories({ - posts: mergePosts([basePosts, relationPosts]), - relations: baseRelations, - previousPool, - horizonStart, - referenceTime: now, - }); - const storiesByPostId = buildStoriesByPostId(stories); - const shortlist = stories.slice( - 0, - definition.maxItems * SHORTLIST_MULTIPLIER, + const runRepo = con.getRepository(ChannelHighlightRun); + const run = await runRepo.save( + runRepo.create({ + channel: definition.channel, + scheduledAt: now, + status: 'processing', + baselineSnapshot: [], + inputSummary: {}, + internalSnapshot: [], + comparison: {}, + metrics: {}, + }), ); - const baselineSnapshot = buildBaselineSnapshot({ - highlights: currentHighlights, - storiesByPostId, - }); - const reusedEvaluation = shouldReuseEvaluations({ - shortlist, - }); - const evaluatedItems = reusedEvaluation - ? reuseEvaluations({ - shortlist, - maxItems: definition.maxItems, - }) - : ( - await evaluateChannelHighlights({ - channel: definition.channel, + try { + const [state, currentHighlights] = await Promise.all([ + fetchDefinitionState({ + con, + channel: definition.channel, + }), + fetchCurrentHighlights({ + con, + channel: definition.channel, + }), + ]); + const previousPool = parseCandidatePool(state?.candidatePool); + const horizonStart = getHorizonStart({ + now, + definition, + }); + const fetchStart = getFetchStart({ + now, + definition, + state, + }); + + const retainedPostIds = new Set([ + ...previousPool.stories.flatMap((story) => story.memberPostIds), + ...currentHighlights.map((item) => item.postId), + ]); + const [retainedPosts, incrementalPosts] = await Promise.all([ + fetchPostsByIds({ + con, + ids: [...retainedPostIds], + horizonStart, + }), + fetchIncrementalPosts({ + con, + channel: definition.channel, + fetchStart, + horizonStart, + }), + ]); + const basePosts = mergePosts([retainedPosts, incrementalPosts]); + const baseRelations = await fetchRelations({ + con, + postIds: basePosts.map((post) => post.id), + }); + const relationPosts = await fetchPostsByIds({ + con, + ids: [ + ...new Set( + baseRelations.flatMap((relation) => [ + relation.postId, + relation.relatedPostId, + ]), + ), + ], + horizonStart, + }); + const stories = buildStories({ + posts: mergePosts([basePosts, relationPosts]), + relations: baseRelations, + previousPool, + horizonStart, + referenceTime: now, + }); + const storiesByPostId = buildStoriesByPostId(stories); + // We only send a bounded candidate set into the evaluator. The shortlist is + // intentionally larger than `maxItems` so the editorial step can still + // rank, dedupe, and discard borderline stories without seeing the entire + // pool. + const shortlist = stories.slice( + 0, + definition.maxItems * SHORTLIST_MULTIPLIER, + ); + const baselineSnapshot = buildBaselineSnapshot({ + highlights: currentHighlights, + storiesByPostId, + }); + + const reusedEvaluation = shouldReuseEvaluations({ + shortlist, + }); + const evaluatedItems = reusedEvaluation + ? reuseEvaluations({ + shortlist, maxItems: definition.maxItems, - currentHighlights: baselineSnapshot, - candidates: shortlist.map(toStoryCandidate), }) - ).items; - - const comparison = compareSnapshots({ - baseline: baselineSnapshot, - internal: evaluatedItems, - }); - const wouldPublish = shouldPublish({ - baseline: baselineSnapshot, - internal: evaluatedItems, - }); - const publish = definition.mode === 'publish' && wouldPublish; - const metrics = buildMetrics({ - incrementalPosts, - retainedPosts, - stories, - shortlist, - reusedEvaluation, - }); + : ( + await evaluateChannelHighlights({ + channel: definition.channel, + maxItems: definition.maxItems, + currentHighlights: baselineSnapshot, + candidates: shortlist.map(toStoryCandidate), + }) + ).items; - const runRepo = con.getRepository(ChannelHighlightRun); - let run = runRepo.create({ - channel: definition.channel, - scheduledAt: now, - status: 'processing', - baselineSnapshot, - inputSummary: buildInputSummary({ - fetchStart, - horizonStart, + const comparison = compareSnapshots({ + baseline: baselineSnapshot, + internal: evaluatedItems, + }); + const wouldPublish = shouldPublish({ + baseline: baselineSnapshot, + internal: evaluatedItems, + }); + const publish = definition.mode === 'publish' && wouldPublish; + const metrics = buildMetrics({ + incrementalPosts, + retainedPosts, + stories, shortlist, - }), - internalSnapshot: [], - comparison: {}, - metrics, - }); - run = await runRepo.save(run); + reusedEvaluation, + }); + await runRepo.update( + { + id: run.id, + }, + { + baselineSnapshot, + inputSummary: buildInputSummary({ + fetchStart, + horizonStart, + shortlist, + }), + metrics, + }, + ); - try { await con.transaction(async (manager) => { await manager.getRepository(ChannelHighlightState).save({ channel: definition.channel, diff --git a/src/common/channelHighlight/schema.ts b/src/common/channelHighlight/schema.ts index 3e0ac4f656..4d97a8c3f1 100644 --- a/src/common/channelHighlight/schema.ts +++ b/src/common/channelHighlight/schema.ts @@ -1,6 +1,6 @@ import { z } from 'zod'; -export const channelHighlightModes = ['shadow', 'publish'] as const; +export const channelHighlightModes = ['disabled', 'shadow', 'publish'] as const; export type ChannelHighlightMode = (typeof channelHighlightModes)[number]; export const channelHighlightStatuses = [ diff --git a/src/entity/ChannelHighlightDefinition.ts b/src/entity/ChannelHighlightDefinition.ts index 5b7e68ca45..31a19e30de 100644 --- a/src/entity/ChannelHighlightDefinition.ts +++ b/src/entity/ChannelHighlightDefinition.ts @@ -12,10 +12,7 @@ export class ChannelHighlightDefinition { @PrimaryColumn({ type: 'text' }) channel: string; - @Column({ type: 'boolean', default: false }) - enabled: boolean; - - @Column({ type: 'text', default: 'shadow' }) + @Column({ type: 'text', default: 'disabled' }) mode: ChannelHighlightMode; @Column({ type: 'smallint', default: 72 }) diff --git a/src/migration/1773000000000-ChannelHighlights.ts b/src/migration/1773000000000-ChannelHighlights.ts index 8b82c12f42..0e92339650 100644 --- a/src/migration/1773000000000-ChannelHighlights.ts +++ b/src/migration/1773000000000-ChannelHighlights.ts @@ -7,8 +7,7 @@ export class ChannelHighlights1773000000000 implements MigrationInterface { await queryRunner.query(/* sql */ ` CREATE TABLE "channel_highlight_definition" ( "channel" text NOT NULL, - "enabled" boolean NOT NULL DEFAULT false, - "mode" text NOT NULL DEFAULT 'shadow', + "mode" text NOT NULL DEFAULT 'disabled', "candidateHorizonHours" smallint NOT NULL DEFAULT 72, "maxItems" smallint NOT NULL DEFAULT 10, "createdAt" TIMESTAMP NOT NULL DEFAULT now(), @@ -57,12 +56,11 @@ export class ChannelHighlights1773000000000 implements MigrationInterface { await queryRunner.query(/* sql */ ` INSERT INTO "channel_highlight_definition" ( "channel", - "enabled", "mode", "candidateHorizonHours", "maxItems" ) - VALUES ('vibes', false, 'shadow', 72, 10) + VALUES ('vibes', 'disabled', 72, 10) ON CONFLICT ("channel") DO NOTHING `); } From eb6d8f53bbf371d091a1c97917f186af986a2549 Mon Sep 17 00:00:00 2001 From: Ido Shamun <1993245+idoshamun@users.noreply.github.com> Date: Thu, 19 Mar 2026 16:06:30 +0200 Subject: [PATCH 5/5] Add channel highlight worker subscription --- .infra/common.ts | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/.infra/common.ts b/.infra/common.ts index 4f3ffe71f2..10b9465e2e 100644 --- a/.infra/common.ts +++ b/.infra/common.ts @@ -506,6 +506,10 @@ export const workers: Worker[] = [ topic: 'api.v1.generate-channel-digest', subscription: 'api.generate-channel-digest', }, + { + topic: 'api.v1.generate-channel-highlight', + subscription: 'api.generate-channel-highlight', + }, ]; export const personalizedDigestWorkers: Worker[] = [