diff --git a/.infra/common.ts b/.infra/common.ts index 4f3ffe71f2..10b9465e2e 100644 --- a/.infra/common.ts +++ b/.infra/common.ts @@ -506,6 +506,10 @@ export const workers: Worker[] = [ topic: 'api.v1.generate-channel-digest', subscription: 'api.generate-channel-digest', }, + { + topic: 'api.v1.generate-channel-highlight', + subscription: 'api.generate-channel-highlight', + }, ]; export const personalizedDigestWorkers: Worker[] = [ diff --git a/.infra/crons.ts b/.infra/crons.ts index 57e652f97d..5d4ade5d21 100644 --- a/.infra/crons.ts +++ b/.infra/crons.ts @@ -174,6 +174,10 @@ export const crons: Cron[] = [ name: 'channel-digests', schedule: '17 4 * * *', }, + { + name: 'channel-highlights', + schedule: '12 * * * *', + }, { name: 'clean-expired-better-auth-sessions', schedule: '0 3 * * *', diff --git a/__tests__/cron/channelHighlights.ts b/__tests__/cron/channelHighlights.ts new file mode 100644 index 0000000000..7282145d71 --- /dev/null +++ b/__tests__/cron/channelHighlights.ts @@ -0,0 +1,86 @@ +import type { DataSource } from 'typeorm'; +import createOrGetConnection from '../../src/db'; +import { ChannelHighlightDefinition } from '../../src/entity/ChannelHighlightDefinition'; +import * as typedPubsub from '../../src/common/typedPubsub'; +import channelHighlights from '../../src/cron/channelHighlights'; +import { crons } from '../../src/cron/index'; + +let con: DataSource; + +beforeAll(async () => { + con = await createOrGetConnection(); +}); + +describe('channelHighlights cron', () => { + afterEach(async () => { + jest.restoreAllMocks(); + await con.getRepository(ChannelHighlightDefinition).clear(); + }); + + it('should be registered', () => { + const registeredCron = crons.find( + (item) => item.name === channelHighlights.name, + ); + + expect(registeredCron).toBeDefined(); + }); + + it('should enqueue active highlight definitions', async () => { + const triggerTypedEventSpy = jest + .spyOn(typedPubsub, 'triggerTypedEvent') + .mockResolvedValue(); + + await 
con.getRepository(ChannelHighlightDefinition).save([ + { + channel: 'backend', + mode: 'shadow', + candidateHorizonHours: 72, + maxItems: 10, + }, + { + channel: 'vibes', + mode: 'shadow', + candidateHorizonHours: 72, + maxItems: 10, + }, + { + channel: 'disabled', + mode: 'disabled', + candidateHorizonHours: 72, + maxItems: 10, + }, + ]); + + const startedAt = Date.now(); + await channelHighlights.handler(con, {} as never, {} as never); + const completedAt = Date.now(); + + expect(triggerTypedEventSpy.mock.calls).toEqual([ + [ + {}, + 'api.v1.generate-channel-highlight', + { + channel: 'backend', + scheduledAt: expect.any(String), + }, + ], + [ + {}, + 'api.v1.generate-channel-highlight', + { + channel: 'vibes', + scheduledAt: expect.any(String), + }, + ], + ]); + + const scheduledAt = Date.parse( + triggerTypedEventSpy.mock.calls[0][2].scheduledAt, + ); + expect(scheduledAt).toBeGreaterThanOrEqual(startedAt); + expect(scheduledAt).toBeLessThanOrEqual(completedAt); + expect(triggerTypedEventSpy.mock.calls[0][2].scheduledAt).toBe( + triggerTypedEventSpy.mock.calls[1][2].scheduledAt, + ); + }); +}); diff --git a/__tests__/workers/generateChannelHighlight.ts b/__tests__/workers/generateChannelHighlight.ts new file mode 100644 index 0000000000..e9d0bfb453 --- /dev/null +++ b/__tests__/workers/generateChannelHighlight.ts @@ -0,0 +1,584 @@ +import type { DataSource } from 'typeorm'; +import createOrGetConnection from '../../src/db'; +import { ChannelHighlightDefinition } from '../../src/entity/ChannelHighlightDefinition'; +import { ChannelHighlightRun } from '../../src/entity/ChannelHighlightRun'; +import { ChannelHighlightState } from '../../src/entity/ChannelHighlightState'; +import { PostHighlight } from '../../src/entity/PostHighlight'; +import { + CollectionPost, + ArticlePost, + SocialTwitterPost, + Source, +} from '../../src/entity'; +import { + PostRelation, + PostRelationType, +} from '../../src/entity/posts/PostRelation'; +import { PostType } from 
'../../src/entity/posts/Post'; +import worker from '../../src/workers/generateChannelHighlight'; +import { typedWorkers } from '../../src/workers/index'; +import { deleteKeysByPattern } from '../../src/redis'; +import * as evaluator from '../../src/common/channelHighlight/evaluate'; +import { expectSuccessfulTypedBackground } from '../helpers'; +import { createSource } from '../fixture/source'; + +let con: DataSource; + +beforeAll(async () => { + con = await createOrGetConnection(); +}); + +const saveArticle = async ({ + id, + title, + createdAt, + statsUpdatedAt = createdAt, + metadataChangedAt = createdAt, + channel = 'vibes', + sourceId = 'content-source', +}: { + id: string; + title: string; + createdAt: Date; + statsUpdatedAt?: Date; + metadataChangedAt?: Date; + channel?: string; + sourceId?: string; +}) => + con.getRepository(ArticlePost).save({ + id, + shortId: id, + title, + url: `https://example.com/${id}`, + canonicalUrl: `https://example.com/${id}`, + score: 0, + sourceId, + visible: true, + deleted: false, + banned: false, + showOnFeed: true, + createdAt, + metadataChangedAt, + statsUpdatedAt, + type: PostType.Article, + contentMeta: { + channels: [channel], + }, + }); + +const saveCollection = async ({ + id, + title, + createdAt, + channel = 'vibes', + sourceId = 'content-source', +}: { + id: string; + title: string; + createdAt: Date; + channel?: string; + sourceId?: string; +}) => + con.getRepository(CollectionPost).save({ + id, + shortId: id, + title, + url: `https://example.com/${id}`, + canonicalUrl: `https://example.com/${id}`, + score: 0, + sourceId, + visible: true, + deleted: false, + banned: false, + showOnFeed: true, + createdAt, + metadataChangedAt: createdAt, + statsUpdatedAt: createdAt, + type: PostType.Collection, + contentMeta: { + channels: [channel], + }, + }); + +const saveTwitterPost = async ({ + id, + title, + url, + createdAt, + channel = 'vibes', + sourceId = 'content-source', + sharedPostId, + contentMeta, +}: { + id: string; + 
title: string; + url: string; + createdAt: Date; + channel?: string; + sourceId?: string; + sharedPostId?: string; + contentMeta?: Record; +}) => + con.getRepository(SocialTwitterPost).save({ + id, + shortId: id, + title, + url, + canonicalUrl: url, + score: 0, + sourceId, + visible: true, + deleted: false, + banned: false, + showOnFeed: true, + createdAt, + metadataChangedAt: createdAt, + statsUpdatedAt: createdAt, + type: PostType.SocialTwitter, + sharedPostId, + contentMeta: { + channels: [channel], + ...(contentMeta || {}), + }, + }); + +describe('generateChannelHighlight worker', () => { + beforeEach(async () => { + await con + .getRepository(Source) + .save([ + createSource( + 'content-source', + 'Content', + 'https://daily.dev/content.png', + ), + createSource( + 'secondary-source', + 'Secondary', + 'https://daily.dev/secondary.png', + ), + ]); + }); + + afterEach(async () => { + jest.restoreAllMocks(); + await deleteKeysByPattern('channel-highlight:*'); + await con.getRepository(ChannelHighlightRun).clear(); + await con.getRepository(ChannelHighlightState).clear(); + await con.getRepository(ChannelHighlightDefinition).clear(); + await con.getRepository(PostHighlight).clear(); + await con.getRepository(PostRelation).clear(); + await con + .createQueryBuilder() + .delete() + .from('post') + .where('"sourceId" IN (:...sourceIds)', { + sourceIds: ['content-source', 'secondary-source'], + }) + .execute(); + await con + .getRepository(Source) + .delete(['content-source', 'secondary-source']); + }); + + it('should be registered', () => { + const registeredWorker = typedWorkers.find( + (item) => item.subscription === worker.subscription, + ); + + expect(registeredWorker).toBeDefined(); + }); + + it('should keep live highlights unchanged in shadow mode and store a comparison run', async () => { + const now = new Date('2026-03-03T10:00:00.000Z'); + await con.getRepository(ChannelHighlightDefinition).save({ + channel: 'vibes', + mode: 'shadow', + 
candidateHorizonHours: 72, + maxItems: 3, + }); + await saveArticle({ + id: 'live-1', + title: 'Live story', + createdAt: new Date('2026-03-03T08:00:00.000Z'), + }); + await saveCollection({ + id: 'collection-1', + title: 'Fresh collection', + createdAt: new Date('2026-03-03T09:30:00.000Z'), + }); + await saveArticle({ + id: 'child-1', + title: 'Fresh child story', + createdAt: new Date('2026-03-03T09:20:00.000Z'), + }); + await con.getRepository(PostRelation).save({ + postId: 'collection-1', + relatedPostId: 'child-1', + type: PostRelationType.Collection, + }); + await con.getRepository(PostHighlight).save({ + channel: 'vibes', + postId: 'live-1', + rank: 1, + headline: 'Live headline', + }); + + const evaluatorSpy = jest + .spyOn(evaluator, 'evaluateChannelHighlights') + .mockImplementation(async ({ candidates }) => ({ + items: candidates.slice(0, 1).map((candidate, index) => ({ + storyKey: candidate.storyKey, + postId: candidate.canonicalPostId, + headline: `${candidate.title} headline`, + significanceScore: 0.9, + significanceLabel: 'breaking', + rank: index + 1, + reason: 'test', + })), + })); + + await expectSuccessfulTypedBackground<'api.v1.generate-channel-highlight'>( + worker, + { + channel: 'vibes', + scheduledAt: now.toISOString(), + }, + ); + + expect(evaluatorSpy).toHaveBeenCalledTimes(1); + expect(evaluatorSpy.mock.calls[0][0].candidates[0]).toMatchObject({ + storyKey: 'collection:collection-1', + canonicalPostId: 'collection-1', + memberPostIds: ['child-1', 'collection-1'], + }); + + const liveHighlights = await con.getRepository(PostHighlight).find({ + where: { channel: 'vibes' }, + order: { rank: 'ASC' }, + }); + expect(liveHighlights).toHaveLength(1); + expect(liveHighlights[0]).toMatchObject({ + postId: 'live-1', + headline: 'Live headline', + }); + + const run = await con.getRepository(ChannelHighlightRun).findOneByOrFail({ + channel: 'vibes', + }); + expect(run.status).toBe('completed'); + expect(run.comparison).toMatchObject({ + wouldPublish: 
true, + published: false, + baselineCount: 1, + internalCount: 1, + }); + + const state = await con + .getRepository(ChannelHighlightState) + .findOneByOrFail({ + channel: 'vibes', + }); + expect(state.candidatePool).toEqual({ + stories: expect.arrayContaining([ + expect.objectContaining({ + storyKey: 'collection:collection-1', + canonicalPostId: 'collection-1', + }), + ]), + }); + }); + + it('should publish evaluated highlights in publish mode', async () => { + const now = new Date('2026-03-03T11:00:00.000Z'); + await con.getRepository(ChannelHighlightDefinition).save({ + channel: 'vibes', + mode: 'publish', + candidateHorizonHours: 72, + maxItems: 3, + }); + await saveArticle({ + id: 'publish-1', + title: 'Publish me', + createdAt: new Date('2026-03-03T10:30:00.000Z'), + }); + + jest.spyOn(evaluator, 'evaluateChannelHighlights').mockResolvedValue({ + items: [ + { + storyKey: 'url:https://example.com/publish-1', + postId: 'publish-1', + headline: 'Publish headline', + significanceScore: 0.95, + significanceLabel: 'breaking', + rank: 1, + reason: 'test', + }, + ], + }); + + await expectSuccessfulTypedBackground<'api.v1.generate-channel-highlight'>( + worker, + { + channel: 'vibes', + scheduledAt: now.toISOString(), + }, + ); + + const liveHighlights = await con.getRepository(PostHighlight).find({ + where: { channel: 'vibes' }, + order: { rank: 'ASC' }, + }); + expect(liveHighlights).toHaveLength(1); + expect(liveHighlights[0]).toMatchObject({ + postId: 'publish-1', + headline: 'Publish headline', + rank: 1, + }); + }); + + it('should publish a collection upgrade for the same story', async () => { + const now = new Date('2026-03-03T11:30:00.000Z'); + await con.getRepository(ChannelHighlightDefinition).save({ + channel: 'vibes', + mode: 'publish', + candidateHorizonHours: 72, + maxItems: 3, + }); + await saveCollection({ + id: 'col-upgrade', + title: 'Collection upgrade', + createdAt: new Date('2026-03-03T11:10:00.000Z'), + }); + await saveArticle({ + id: 
'child-upgr', + title: 'Child upgrade', + createdAt: new Date('2026-03-03T11:05:00.000Z'), + }); + await con.getRepository(PostRelation).save({ + postId: 'col-upgrade', + relatedPostId: 'child-upgr', + type: PostRelationType.Collection, + }); + await con.getRepository(PostHighlight).save({ + channel: 'vibes', + postId: 'child-upgr', + rank: 1, + headline: 'Same story headline', + }); + + jest.spyOn(evaluator, 'evaluateChannelHighlights').mockResolvedValue({ + items: [ + { + storyKey: 'collection:col-upgrade', + postId: 'col-upgrade', + headline: 'Same story headline', + significanceScore: 0.96, + significanceLabel: 'breaking', + rank: 1, + reason: 'test', + }, + ], + }); + + await expectSuccessfulTypedBackground<'api.v1.generate-channel-highlight'>( + worker, + { + channel: 'vibes', + scheduledAt: now.toISOString(), + }, + ); + + const liveHighlights = await con.getRepository(PostHighlight).find({ + where: { channel: 'vibes' }, + order: { rank: 'ASC' }, + }); + expect(liveHighlights).toEqual([ + expect.objectContaining({ + postId: 'col-upgrade', + headline: 'Same story headline', + rank: 1, + }), + ]); + }); + + it('should re-evaluate a cached story when a relation changed after the last evaluation', async () => { + const now = new Date('2026-03-03T12:30:00.000Z'); + await con.getRepository(ChannelHighlightDefinition).save({ + channel: 'vibes', + mode: 'shadow', + candidateHorizonHours: 72, + maxItems: 3, + }); + await saveCollection({ + id: 'col-cached', + title: 'Cached collection', + createdAt: new Date('2026-03-03T11:00:00.000Z'), + }); + await saveArticle({ + id: 'child-cach', + title: 'Cached child', + createdAt: new Date('2026-03-03T10:55:00.000Z'), + }); + await con.getRepository(ChannelHighlightState).save({ + channel: 'vibes', + lastFetchedAt: new Date('2026-03-03T12:00:00.000Z'), + lastPublishedAt: null, + candidatePool: { + stories: [ + { + storyKey: 'collection:col-cached', + canonicalPostId: 'col-cached', + collectionId: 'col-cached', + memberPostIds: 
['child-cach', 'col-cached'], + firstSeenAt: '2026-03-03T10:55:00.000Z', + lastSeenAt: '2026-03-03T12:00:00.000Z', + lastLlmEvaluatedAt: '2026-03-03T12:05:00.000Z', + lastSignificanceScore: 0.82, + lastSignificanceLabel: 'breaking', + lastHeadline: 'Cached headline', + status: 'active', + }, + ], + }, + }); + await con.getRepository(PostRelation).save({ + postId: 'col-cached', + relatedPostId: 'child-cach', + type: PostRelationType.Collection, + createdAt: new Date('2026-03-03T12:10:00.000Z'), + }); + + const evaluatorSpy = jest + .spyOn(evaluator, 'evaluateChannelHighlights') + .mockResolvedValue({ + items: [ + { + storyKey: 'collection:col-cached', + postId: 'col-cached', + headline: 'Fresh headline', + significanceScore: 0.91, + significanceLabel: 'breaking', + rank: 1, + reason: 'test', + }, + ], + }); + + await expectSuccessfulTypedBackground<'api.v1.generate-channel-highlight'>( + worker, + { + channel: 'vibes', + scheduledAt: now.toISOString(), + }, + ); + + expect(evaluatorSpy).toHaveBeenCalledTimes(1); + }); + + it('should exclude posts older than the configured horizon even when they were recently updated', async () => { + const now = new Date('2026-03-03T12:00:00.000Z'); + await con.getRepository(ChannelHighlightDefinition).save({ + channel: 'vibes', + mode: 'shadow', + candidateHorizonHours: 72, + maxItems: 3, + }); + await saveArticle({ + id: 'old-1', + title: 'Old but active', + createdAt: new Date('2026-02-20T12:00:00.000Z'), + statsUpdatedAt: new Date('2026-03-03T11:55:00.000Z'), + metadataChangedAt: new Date('2026-03-03T11:55:00.000Z'), + }); + await saveArticle({ + id: 'fresh-1', + title: 'Fresh candidate', + createdAt: new Date('2026-03-03T11:30:00.000Z'), + }); + + const evaluatorSpy = jest + .spyOn(evaluator, 'evaluateChannelHighlights') + .mockImplementation(async ({ candidates }) => ({ + items: candidates.map((candidate, index) => ({ + storyKey: candidate.storyKey, + postId: candidate.canonicalPostId, + headline: candidate.title, + 
significanceScore: 0.7, + significanceLabel: 'notable', + rank: index + 1, + reason: 'test', + })), + })); + + await expectSuccessfulTypedBackground<'api.v1.generate-channel-highlight'>( + worker, + { + channel: 'vibes', + scheduledAt: now.toISOString(), + }, + ); + + expect(evaluatorSpy).toHaveBeenCalledTimes(1); + expect(evaluatorSpy.mock.calls[0][0].candidates).toEqual([ + expect.objectContaining({ + canonicalPostId: 'fresh-1', + }), + ]); + }); + + it('should prefer referenced tweet identity for quote tweets', async () => { + const now = new Date('2026-03-03T13:00:00.000Z'); + await con.getRepository(ChannelHighlightDefinition).save({ + channel: 'vibes', + mode: 'shadow', + candidateHorizonHours: 72, + maxItems: 3, + }); + await saveTwitterPost({ + id: 'tweet-quote-1', + title: 'Quote tweet', + url: 'https://x.com/quoter/status/1234567890123456789', + createdAt: new Date('2026-03-03T12:45:00.000Z'), + contentMeta: { + social_twitter: { + tweet_id: '1234567890123456789', + reference: { + tweet_id: '9876543210987654321', + url: 'https://x.com/original/status/9876543210987654321', + }, + }, + }, + }); + + const evaluatorSpy = jest + .spyOn(evaluator, 'evaluateChannelHighlights') + .mockImplementation(async ({ candidates }) => ({ + items: candidates.map((candidate, index) => ({ + storyKey: candidate.storyKey, + postId: candidate.canonicalPostId, + headline: candidate.title, + significanceScore: 0.8, + significanceLabel: 'major', + rank: index + 1, + reason: 'test', + })), + })); + + await expectSuccessfulTypedBackground<'api.v1.generate-channel-highlight'>( + worker, + { + channel: 'vibes', + scheduledAt: now.toISOString(), + }, + ); + + expect(evaluatorSpy).toHaveBeenCalledTimes(1); + expect(evaluatorSpy.mock.calls[0][0].candidates).toEqual([ + expect.objectContaining({ + canonicalPostId: 'tweet-quote-1', + storyKey: 'twitter:9876543210987654321', + }), + ]); + }); +}); diff --git a/src/common/channelHighlight/decisions.ts 
b/src/common/channelHighlight/decisions.ts new file mode 100644 index 0000000000..af71b11f74 --- /dev/null +++ b/src/common/channelHighlight/decisions.ts @@ -0,0 +1,166 @@ +import type { EvaluatedHighlightItem } from './evaluate'; +import type { StoredHighlightStory } from './schema'; +import type { HighlightBaselineItem, HighlightStory } from './types'; + +const toSortedPostIdsKey = (postIds: string[]): string => + [...postIds].sort().join('|'); + +const hasCachedStoryChanged = (story: HighlightStory): boolean => { + if (!story.cached) { + return true; + } + + return ( + story.cached.canonicalPostId !== story.canonicalPost.id || + toSortedPostIdsKey(story.cached.memberPostIds) !== + toSortedPostIdsKey(story.memberPosts.map((post) => post.id)) + ); +}; + +// Reuse the prior LLM decision only when the underlying story is still the same +// story, the canonical post did not change, and no newer post/relation activity +// happened since that decision. +export const shouldReuseEvaluations = ({ + shortlist, +}: { + shortlist: HighlightStory[]; +}): boolean => + shortlist.length > 0 && + shortlist.every((story) => { + if ( + !story.cached?.lastHeadline || + story.cached.lastSignificanceScore == null || + !story.cached.lastLlmEvaluatedAt + ) { + return false; + } + + if (hasCachedStoryChanged(story)) { + return false; + } + + return ( + new Date(story.cached.lastLlmEvaluatedAt).getTime() >= + story.lastSeenAt.getTime() + ); + }); + +export const reuseEvaluations = ({ + shortlist, + maxItems, +}: { + shortlist: HighlightStory[]; + maxItems: number; +}): EvaluatedHighlightItem[] => + shortlist.slice(0, maxItems).map((story, index) => ({ + storyKey: story.storyKey, + postId: story.canonicalPost.id, + headline: story.cached?.lastHeadline || story.canonicalPost.title || '', + significanceScore: story.cached?.lastSignificanceScore || 0, + significanceLabel: story.cached?.lastSignificanceLabel || 'routine', + rank: index + 1, + reason: 'Reused cached evaluation', + })); + 
+export const toPoolStory = ({ + story, + evaluation, + evaluatedAt, +}: { + story: HighlightStory; + evaluation?: EvaluatedHighlightItem; + evaluatedAt: Date; +}): StoredHighlightStory => ({ + storyKey: story.storyKey, + canonicalPostId: story.canonicalPost.id, + collectionId: story.collectionId, + memberPostIds: story.memberPosts.map((post) => post.id).sort(), + firstSeenAt: story.cached?.firstSeenAt || story.firstSeenAt.toISOString(), + lastSeenAt: story.lastSeenAt.toISOString(), + lastLlmEvaluatedAt: evaluation + ? evaluatedAt.toISOString() + : story.cached?.lastLlmEvaluatedAt || null, + lastSignificanceScore: + evaluation?.significanceScore ?? + story.cached?.lastSignificanceScore ?? + null, + lastSignificanceLabel: + evaluation?.significanceLabel ?? + story.cached?.lastSignificanceLabel ?? + null, + lastHeadline: evaluation?.headline ?? story.cached?.lastHeadline ?? null, + status: story.cached?.status || 'active', +}); + +const toItemSignature = (item: { + storyKey: string; + rank: number; + postId: string; +}): string => `${item.storyKey}:${item.rank}:${item.postId}`; + +// Publishing depends on the editorial surface changing in a user-visible way: +// a different story, a different canonical post for the same story, or a +// different headline for the same ranked set. 
+export const shouldPublish = ({ + baseline, + internal, +}: { + baseline: HighlightBaselineItem[]; + internal: EvaluatedHighlightItem[]; +}): boolean => { + if (!baseline.length) { + return internal.length > 0; + } + + if (!internal.length) { + return false; + } + + const baselineSignature = baseline.map(toItemSignature).join('|'); + const internalSignature = internal.map(toItemSignature).join('|'); + + if (baselineSignature !== internalSignature) { + return true; + } + + const baselineHeadlines = baseline.map((item) => item.headline).join('|'); + const internalHeadlines = internal.map((item) => item.headline).join('|'); + return baselineHeadlines !== internalHeadlines; +}; + +export const compareSnapshots = ({ + baseline, + internal, +}: { + baseline: HighlightBaselineItem[]; + internal: EvaluatedHighlightItem[]; +}) => { + const baselineByStory = new Map( + baseline.map((item) => [item.storyKey, item]), + ); + const internalByStory = new Map( + internal.map((item) => [item.storyKey, item]), + ); + const overlap = [...internalByStory.keys()].filter((storyKey) => + baselineByStory.has(storyKey), + ); + + return { + baselineCount: baseline.length, + internalCount: internal.length, + overlapCount: overlap.length, + addedStoryKeys: [...internalByStory.keys()].filter( + (storyKey) => !baselineByStory.has(storyKey), + ), + removedStoryKeys: [...baselineByStory.keys()].filter( + (storyKey) => !internalByStory.has(storyKey), + ), + churnCount: + [...internalByStory.keys()].filter( + (storyKey) => !baselineByStory.has(storyKey), + ).length + + [...baselineByStory.keys()].filter( + (storyKey) => !internalByStory.has(storyKey), + ).length, + }; +}; diff --git a/src/common/channelHighlight/definitions.ts b/src/common/channelHighlight/definitions.ts new file mode 100644 index 0000000000..3ad6c01daf --- /dev/null +++ b/src/common/channelHighlight/definitions.ts @@ -0,0 +1,31 @@ +import type { DataSource } from 'typeorm'; +import { Not } from 'typeorm'; +import { 
ChannelHighlightDefinition } from '../../entity/ChannelHighlightDefinition'; + +export const getChannelHighlightDefinitions = async ({ + con, +}: { + con: DataSource; +}): Promise => + con.getRepository(ChannelHighlightDefinition).find({ + where: { + mode: Not('disabled'), + }, + order: { + channel: 'ASC', + }, + }); + +export const getChannelHighlightDefinitionByChannel = async ({ + con, + channel, +}: { + con: DataSource; + channel: string; +}): Promise => + con.getRepository(ChannelHighlightDefinition).findOne({ + where: { + channel, + mode: Not('disabled'), + }, + }); diff --git a/src/common/channelHighlight/evaluate.ts b/src/common/channelHighlight/evaluate.ts new file mode 100644 index 0000000000..ab0f347cc1 --- /dev/null +++ b/src/common/channelHighlight/evaluate.ts @@ -0,0 +1,127 @@ +import type { PostType } from '../../entity/posts/Post'; + +export type HighlightQualitySummary = { + clickbaitProbability: number | null; + specificity: string | null; + intent: string | null; + substanceDepth: string | null; + titleContentAlignment: string | null; + selfPromotionScore: number | null; +}; + +export type HighlightStoryCandidate = { + storyKey: string; + canonicalPostId: string; + collectionId: string | null; + memberPostIds: string[]; + title: string; + summary: string; + type: PostType; + sourceId: string; + createdAt: string; + lastActivityAt: string; + upvotes: number; + comments: number; + views: number; + contentCuration: string[]; + quality: HighlightQualitySummary; + preliminaryScore: number; +}; + +export type EvaluateChannelHighlightsRequest = { + channel: string; + maxItems: number; + currentHighlights: { + postId: string; + rank: number; + headline: string; + storyKey: string; + }[]; + candidates: HighlightStoryCandidate[]; +}; + +export type EvaluatedHighlightItem = { + storyKey: string; + postId: string; + headline: string; + significanceScore: number; + significanceLabel: string; + rank: number; + reason: string; +}; + +export type 
EvaluateChannelHighlightsResponse = { + items: EvaluatedHighlightItem[]; +}; + +const clampScore = (score: number): number => + Math.max(0, Math.min(1, Number(score.toFixed(3)))); + +const toHeadline = (title: string): string => title.trim().slice(0, 200); + +const toSignificance = ( + candidate: HighlightStoryCandidate, +): Pick< + EvaluatedHighlightItem, + 'significanceLabel' | 'significanceScore' | 'reason' +> => { + const curation = new Set(candidate.contentCuration); + const isBreaking = + curation.has('news') || + curation.has('release') || + curation.has('leak') || + curation.has('milestone') || + curation.has('drama'); + const strongEngagement = + candidate.upvotes + candidate.comments * 2 + candidate.views / 250; + const score = clampScore( + candidate.preliminaryScore / 100 + (isBreaking ? 0.25 : 0), + ); + + if (score >= 0.8 || (isBreaking && strongEngagement >= 20)) { + return { + significanceLabel: 'breaking', + significanceScore: score, + reason: 'Mock evaluator marked the story as breaking', + }; + } + + if (score >= 0.55) { + return { + significanceLabel: 'notable', + significanceScore: score, + reason: 'Mock evaluator marked the story as notable', + }; + } + + return { + significanceLabel: 'routine', + significanceScore: score, + reason: 'Mock evaluator marked the story as routine', + }; +}; + +// This is an API-side placeholder until the Bragi contract is finalized. 
+export const evaluateChannelHighlights = async ({ + maxItems, + candidates, +}: EvaluateChannelHighlightsRequest): Promise => { + const items = candidates + .sort((left, right) => right.preliminaryScore - left.preliminaryScore) + .slice(0, maxItems) + .map((candidate, index) => { + const significance = toSignificance(candidate); + return { + storyKey: candidate.storyKey, + postId: candidate.canonicalPostId, + headline: toHeadline(candidate.title), + rank: index + 1, + ...significance, + }; + }) + .filter((item) => item.significanceLabel !== 'routine' || item.rank === 1); + + return { + items, + }; +}; diff --git a/src/common/channelHighlight/generate.ts b/src/common/channelHighlight/generate.ts new file mode 100644 index 0000000000..4dd7e0310b --- /dev/null +++ b/src/common/channelHighlight/generate.ts @@ -0,0 +1,333 @@ +import type { DataSource } from 'typeorm'; +import { logger as baseLogger } from '../../logger'; +import { ChannelHighlightRun } from '../../entity/ChannelHighlightRun'; +import { ChannelHighlightState } from '../../entity/ChannelHighlightState'; +import { replaceHighlightsForChannel } from './publish'; +import { + evaluateChannelHighlights, + type EvaluatedHighlightItem, +} from './evaluate'; +import { + fetchCurrentHighlights, + fetchDefinitionState, + fetchIncrementalPosts, + fetchPostsByIds, + fetchRelations, + getFetchStart, + getHorizonStart, + mergePosts, + parseCandidatePool, +} from './queries'; +import { + buildBaselineSnapshot, + buildStories, + toStoryCandidate, +} from './stories'; +import { + compareSnapshots, + reuseEvaluations, + shouldPublish, + shouldReuseEvaluations, + toPoolStory, +} from './decisions'; +import type { ChannelHighlightDefinition } from '../../entity/ChannelHighlightDefinition'; +import type { GenerateChannelHighlightResult, HighlightStory } from './types'; + +const MAX_CANDIDATE_POOL_STORIES = 50; +const SHORTLIST_MULTIPLIER = 3; + +const buildStoriesByPostId = ( + stories: HighlightStory[], +): Map => { + const 
storiesByPostId = new Map(); + + for (const story of stories) { + storiesByPostId.set(story.canonicalPost.id, story); + for (const memberPost of story.memberPosts) { + storiesByPostId.set(memberPost.id, story); + } + } + + return storiesByPostId; +}; + +const buildInputSummary = ({ + fetchStart, + horizonStart, + shortlist, +}: { + fetchStart: Date; + horizonStart: Date; + shortlist: HighlightStory[]; +}) => ({ + fetchStart: fetchStart.toISOString(), + horizonStart: horizonStart.toISOString(), + shortlist: shortlist.map((story) => ({ + storyKey: story.storyKey, + canonicalPostId: story.canonicalPost.id, + preliminaryScore: story.preliminaryScore, + })), +}); + +const buildMetrics = ({ + incrementalPosts, + retainedPosts, + stories, + shortlist, + reusedEvaluation, +}: { + incrementalPosts: { length: number }; + retainedPosts: { length: number }; + stories: { length: number }; + shortlist: { length: number }; + reusedEvaluation: boolean; +}) => ({ + fetchedPosts: incrementalPosts.length, + retainedPosts: retainedPosts.length, + totalStories: stories.length, + shortlistStories: shortlist.length, + evaluatedStories: reusedEvaluation ? 0 : shortlist.length, + reusedEvaluation, +}); + +const buildCandidatePool = ({ + stories, + evaluatedItems, + now, +}: { + stories: HighlightStory[]; + evaluatedItems: EvaluatedHighlightItem[]; + now: Date; +}) => ({ + stories: stories.slice(0, MAX_CANDIDATE_POOL_STORIES).map((story) => + toPoolStory({ + story, + evaluation: evaluatedItems.find( + (item) => item.storyKey === story.storyKey, + ), + evaluatedAt: now, + }), + ), +}); + +// High-level flow: +// 1. Fetch fresh and retained posts inside the configured horizon. +// 2. Collapse them into story candidates, preferring collections. +// 3. Reuse cached editorial decisions only when the story is truly unchanged. +// 4. Compare the internal result with the live highlights, then persist the run. 
/**
 * Executes one channel-highlight run end to end: creates a run row, gathers
 * candidate posts, builds and evaluates stories, compares against the live
 * highlights, and persists state + results. Publishes only when the
 * definition is in `publish` mode AND the comparison says it should.
 *
 * NOTE(review): several generic type parameters appear to have been lost in
 * transit (e.g. `Promise` here was presumably
 * `Promise<GenerateChannelHighlightResult>`); restore from upstream.
 */
export const generateChannelHighlight = async ({
  con,
  definition,
  now = new Date(),
}: {
  con: DataSource;
  definition: ChannelHighlightDefinition;
  now?: Date;
}): Promise => {
  const runRepo = con.getRepository(ChannelHighlightRun);
  // Create the run row up front so a crash still leaves a 'processing' trace.
  const run = await runRepo.save(
    runRepo.create({
      channel: definition.channel,
      scheduledAt: now,
      status: 'processing',
      baselineSnapshot: [],
      inputSummary: {},
      internalSnapshot: [],
      comparison: {},
      metrics: {},
    }),
  );

  try {
    const [state, currentHighlights] = await Promise.all([
      fetchDefinitionState({
        con,
        channel: definition.channel,
      }),
      fetchCurrentHighlights({
        con,
        channel: definition.channel,
      }),
    ]);
    const previousPool = parseCandidatePool(state?.candidatePool);
    const horizonStart = getHorizonStart({
      now,
      definition,
    });
    const fetchStart = getFetchStart({
      now,
      definition,
      state,
    });

    // Posts we must keep considering even if they predate the incremental
    // fetch window: previous pool members and currently-live highlights.
    const retainedPostIds = new Set([
      ...previousPool.stories.flatMap((story) => story.memberPostIds),
      ...currentHighlights.map((item) => item.postId),
    ]);
    const [retainedPosts, incrementalPosts] = await Promise.all([
      fetchPostsByIds({
        con,
        ids: [...retainedPostIds],
        horizonStart,
      }),
      fetchIncrementalPosts({
        con,
        channel: definition.channel,
        fetchStart,
        horizonStart,
      }),
    ]);
    const basePosts = mergePosts([retainedPosts, incrementalPosts]);
    const baseRelations = await fetchRelations({
      con,
      postIds: basePosts.map((post) => post.id),
    });
    // Pull in the other side of each collection relation so stories can be
    // grouped around their collection even when only one member was fetched.
    const relationPosts = await fetchPostsByIds({
      con,
      ids: [
        ...new Set(
          baseRelations.flatMap((relation) => [
            relation.postId,
            relation.relatedPostId,
          ]),
        ),
      ],
      horizonStart,
    });
    const stories = buildStories({
      posts: mergePosts([basePosts, relationPosts]),
      relations: baseRelations,
      previousPool,
      horizonStart,
      referenceTime: now,
    });
    const storiesByPostId = buildStoriesByPostId(stories);
    // We only send a bounded candidate set into the evaluator. The shortlist is
    // intentionally larger than `maxItems` so the editorial step can still
    // rank, dedupe, and discard borderline stories without seeing the entire
    // pool.
    const shortlist = stories.slice(
      0,
      definition.maxItems * SHORTLIST_MULTIPLIER,
    );
    const baselineSnapshot = buildBaselineSnapshot({
      highlights: currentHighlights,
      storiesByPostId,
    });

    const reusedEvaluation = shouldReuseEvaluations({
      shortlist,
    });
    const evaluatedItems = reusedEvaluation
      ? reuseEvaluations({
          shortlist,
          maxItems: definition.maxItems,
        })
      : (
          await evaluateChannelHighlights({
            channel: definition.channel,
            maxItems: definition.maxItems,
            currentHighlights: baselineSnapshot,
            candidates: shortlist.map(toStoryCandidate),
          })
        ).items;

    const comparison = compareSnapshots({
      baseline: baselineSnapshot,
      internal: evaluatedItems,
    });
    const wouldPublish = shouldPublish({
      baseline: baselineSnapshot,
      internal: evaluatedItems,
    });
    // Actual publication requires both the definition opting in and the
    // comparison deciding a change is warranted (shadow mode never publishes).
    const publish = definition.mode === 'publish' && wouldPublish;
    const metrics = buildMetrics({
      incrementalPosts,
      retainedPosts,
      stories,
      shortlist,
      reusedEvaluation,
    });
    // Partial progress write outside the transaction: keeps inputs/metrics
    // even if the final transactional write below fails.
    await runRepo.update(
      {
        id: run.id,
      },
      {
        baselineSnapshot,
        inputSummary: buildInputSummary({
          fetchStart,
          horizonStart,
          shortlist,
        }),
        metrics,
      },
    );

    // State, (optional) published highlights, and run completion are committed
    // atomically so the candidate pool can never get ahead of the run record.
    await con.transaction(async (manager) => {
      await manager.getRepository(ChannelHighlightState).save({
        channel: definition.channel,
        lastFetchedAt: now,
        // Only advance lastPublishedAt when we actually publish this run.
        lastPublishedAt: publish ? now : state?.lastPublishedAt || null,
        candidatePool: buildCandidatePool({
          stories,
          evaluatedItems,
          now,
        }),
      });

      if (publish) {
        await replaceHighlightsForChannel({
          manager,
          channel: definition.channel,
          items: evaluatedItems.map((item) => ({
            postId: item.postId,
            rank: item.rank,
            headline: item.headline,
          })),
        });
      }

      await manager.getRepository(ChannelHighlightRun).update(
        {
          id: run.id,
        },
        {
          status: 'completed',
          completedAt: new Date(),
          internalSnapshot: evaluatedItems,
          comparison: {
            ...comparison,
            wouldPublish,
            published: publish,
          },
          metrics,
        },
      );
    });

    return {
      run: await runRepo.findOneByOrFail({
        id: run.id,
      }),
      published: publish,
    };
  } catch (err) {
    baseLogger.error(
      { err, channel: definition.channel },
      'Failed channel highlight run',
    );
    // Failure status is written outside any transaction so it survives
    // whatever rollback caused the error.
    await runRepo.update(
      {
        id: run.id,
      },
      {
        status: 'failed',
        completedAt: new Date(),
        error: {
          message: err instanceof Error ? err.message : 'Unknown error',
        },
      },
    );
    throw err;
  }
};

// --- src/common/channelHighlight/publish.ts (new file) ---
import type { EntityManager } from 'typeorm';
import { PostHighlight } from '../../entity/PostHighlight';

export type PublishHighlightItem = {
  postId: string;
  rank: number;
  headline: string;
};

/**
 * Atomically swaps the live highlights for a channel: delete-then-insert
 * inside the caller-supplied transaction manager. An empty `items` list
 * clears the channel.
 */
export const replaceHighlightsForChannel = async ({
  manager,
  channel,
  items,
}: {
  manager: EntityManager;
  channel: string;
  items: PublishHighlightItem[];
}): Promise => {
  const repo = manager.getRepository(PostHighlight);
  await repo.delete({ channel });

  if (!items.length) {
    return;
  }

  await repo.insert(
    items.map((item) => ({
      ...item,
      channel,
    })),
  );
};
// --- src/common/channelHighlight/queries.ts (new file) ---
// NOTE(review): single-line generic parameters were stripped in transit
// (e.g. `Pick;`, `Promise;`); restore from upstream.
import { Brackets, In, MoreThanOrEqual, type DataSource } from 'typeorm';
import { ONE_HOUR_IN_SECONDS } from '../constants';
import { ChannelHighlightState } from '../../entity/ChannelHighlightState';
import { PostHighlight } from '../../entity/PostHighlight';
import { Post } from '../../entity/posts/Post';
import {
  PostRelation,
  PostRelationType,
} from '../../entity/posts/PostRelation';
import {
  channelHighlightCandidatePoolSchema,
  emptyChannelHighlightCandidatePool,
  type ChannelHighlightCandidatePool,
} from './schema';
import type { ChannelHighlightDefinition } from '../../entity/ChannelHighlightDefinition';
import type { HighlightPost } from './types';

// Overlap window re-fetched on every incremental run so posts updated while
// the previous run was in flight are not missed.
const HIGHLIGHT_FETCH_OVERLAP_SECONDS = 10 * 60;

// Oldest createdAt a candidate post may have, derived from the definition.
export const getHorizonStart = ({
  now,
  definition,
}: {
  now: Date;
  definition: Pick;
}): Date =>
  new Date(
    now.getTime() -
      definition.candidateHorizonHours * ONE_HOUR_IN_SECONDS * 1000,
  );

// Start of the incremental fetch: last fetch minus a safety overlap, clamped
// to the horizon. First run (no state) scans the whole horizon.
export const getFetchStart = ({
  now,
  definition,
  state,
}: {
  now: Date;
  definition: Pick;
  state: Pick | null;
}): Date => {
  const horizonStart = getHorizonStart({
    now,
    definition,
  });

  if (!state?.lastFetchedAt) {
    return horizonStart;
  }

  const overlapStart = new Date(
    state.lastFetchedAt.getTime() - HIGHLIGHT_FETCH_OVERLAP_SECONDS * 1000,
  );

  return overlapStart > horizonStart ? overlapStart : horizonStart;
};

// Defensive parse of the persisted JSONB pool; any schema drift degrades to
// an empty pool instead of failing the run.
export const parseCandidatePool = (
  value: ChannelHighlightState['candidatePool'] | null | undefined,
): ChannelHighlightCandidatePool => {
  const parsed = channelHighlightCandidatePoolSchema.safeParse(value);
  return parsed.success ? parsed.data : emptyChannelHighlightCandidatePool();
};

export const fetchDefinitionState = async ({
  con,
  channel,
}: {
  con: DataSource;
  channel: string;
}): Promise =>
  con.getRepository(ChannelHighlightState).findOne({
    where: {
      channel,
    },
  });

// Live highlights ordered by rank — the "baseline" the run compares against.
export const fetchCurrentHighlights = async ({
  con,
  channel,
}: {
  con: DataSource;
  channel: string;
}): Promise =>
  con.getRepository(PostHighlight).find({
    where: {
      channel,
    },
    order: {
      rank: 'ASC',
    },
  });

// Id-based fetch that still enforces the horizon and visibility flags, so a
// retained post that aged out or was moderated silently drops away.
export const fetchPostsByIds = async ({
  con,
  ids,
  horizonStart,
}: {
  con: DataSource;
  ids: string[];
  horizonStart: Date;
}): Promise => {
  if (!ids.length) {
    return [];
  }

  return con.getRepository(Post).find({
    where: {
      id: In(ids),
      createdAt: MoreThanOrEqual(horizonStart),
      visible: true,
      deleted: false,
      banned: false,
      showOnFeed: true,
    },
  }) as unknown as Promise;
};

// Posts in-channel, inside the horizon, that were created/updated since
// fetchStart. The `?` below is the Postgres jsonb key-exists operator, not a
// bound parameter.
export const fetchIncrementalPosts = async ({
  con,
  channel,
  fetchStart,
  horizonStart,
}: {
  con: DataSource;
  channel: string;
  fetchStart: Date;
  horizonStart: Date;
}): Promise =>
  con
    .getRepository(Post)
    .createQueryBuilder('post')
    .where('post.createdAt >= :horizonStart', { horizonStart })
    .andWhere('post.visible = true')
    .andWhere('post.deleted = false')
    .andWhere('post.banned = false')
    .andWhere('post.showOnFeed = true')
    .andWhere(`(post."contentMeta"->'channels') ? :channel`, { channel })
    .andWhere(
      new Brackets((builder) => {
        builder
          .where('post.createdAt >= :fetchStart', { fetchStart })
          .orWhere('post.metadataChangedAt >= :fetchStart', { fetchStart })
          .orWhere('post.statsUpdatedAt >= :fetchStart', { fetchStart });
      }),
    )
    .getMany() as unknown as Promise;

// Collection relations touching any of the given posts, in either direction.
export const fetchRelations = async ({
  con,
  postIds,
}: {
  con: DataSource;
  postIds: string[];
}): Promise => {
  if (!postIds.length) {
    return [];
  }

  return con
    .getRepository(PostRelation)
    .createQueryBuilder('relation')
    .where('relation.type = :type', {
      type: PostRelationType.Collection,
    })
    .andWhere(
      new Brackets((builder) => {
        builder
          .where('relation.relatedPostId IN (:...postIds)', { postIds })
          .orWhere('relation.postId IN (:...postIds)', { postIds });
      }),
    )
    .getMany();
};

// De-duplicates by id; later groups win, so pass fresher data last.
export const mergePosts = (groups: HighlightPost[][]): HighlightPost[] => {
  const byId = new Map();
  for (const posts of groups) {
    for (const post of posts) {
      byId.set(post.id, post);
    }
  }

  return [...byId.values()];
};

// --- src/common/channelHighlight/schema.ts (new file) ---
import { z } from 'zod';

export const channelHighlightModes = ['disabled', 'shadow', 'publish'] as const;
export type ChannelHighlightMode = (typeof channelHighlightModes)[number];

export const channelHighlightStatuses = [
  'active',
  'published',
  'dropped',
] as const;
export type ChannelHighlightStatus = (typeof channelHighlightStatuses)[number];

// Shape of one cached story inside the persisted candidate pool.
export const storedHighlightStorySchema = z.object({
  storyKey: z.string().min(1),
  canonicalPostId: z.string().min(1),
  collectionId: z.string().min(1).nullable(),
  memberPostIds: z.array(z.string().min(1)).min(1),
  firstSeenAt: z.string().datetime(),
  lastSeenAt: z.string().datetime(),
  lastLlmEvaluatedAt:
z.string().datetime().nullable().optional(),
  lastSignificanceScore: z.number().min(0).max(1).nullable().optional(),
  lastSignificanceLabel: z.string().min(1).nullable().optional(),
  lastHeadline: z.string().min(1).max(200).nullable().optional(),
  status: z.enum(channelHighlightStatuses).default('active'),
});

export const channelHighlightCandidatePoolSchema = z.object({
  stories: z.array(storedHighlightStorySchema).default([]),
});

// NOTE(review): the generic parameter of this z.infer was stripped in
// transit (presumably `z.infer<typeof storedHighlightStorySchema>`).
export type StoredHighlightStory = z.infer;
export type ChannelHighlightCandidatePool = z.infer<
  typeof channelHighlightCandidatePoolSchema
>;

export const emptyChannelHighlightCandidatePool =
  (): ChannelHighlightCandidatePool => ({
    stories: [],
  });

// --- src/common/channelHighlight/stories.ts (new file) ---
import { extractTwitterStatusId } from '../twitterSocial';
import { PostType, type PostContentQuality } from '../../entity/posts/Post';
import { PostRelation } from '../../entity/posts/PostRelation';
import { ONE_DAY_IN_SECONDS } from '../constants';
import type { ChannelHighlightCandidatePool } from './schema';
import type {
  HighlightBaselineItem,
  HighlightPost,
  HighlightStory,
} from './types';
import type {
  HighlightQualitySummary,
  HighlightStoryCandidate,
} from './evaluate';

// Latest of creation, metadata change, and stats update for one post.
const toLastActivityAt = (post: HighlightPost): Date => {
  const candidates = [
    post.createdAt?.getTime() || 0,
    post.metadataChangedAt?.getTime() || 0,
    post.statsUpdatedAt?.getTime() || 0,
  ];
  return new Date(Math.max(...candidates));
};

// Normalizes the raw content-quality blob into the summary the evaluator and
// preliminary scorer consume; missing/non-numeric fields become null.
export const toQualitySummary = (
  quality: PostContentQuality,
): HighlightQualitySummary => ({
  clickbaitProbability:
    typeof quality?.is_clickbait_probability === 'number'
      ? quality.is_clickbait_probability
      : null,
  specificity: quality?.specificity || null,
  intent: quality?.intent || null,
  substanceDepth: quality?.substance_depth || null,
  titleContentAlignment: quality?.title_content_alignment || null,
  selfPromotionScore:
    typeof quality?.self_promotion_score === 'number'
      ? quality.self_promotion_score
      : null,
});

// NOTE(review): `Record` types below lost their generic parameters in
// transit (presumably `Record<string, unknown>`).
const getTwitterSocialMeta = (
  post: HighlightPost,
): Record | null => {
  const socialTwitter = (post.contentMeta as Record | null)
    ?.social_twitter;

  if (!socialTwitter || typeof socialTwitter !== 'object') {
    return null;
  }

  return socialTwitter as Record;
};

// Status id of the tweet this post references (quote/reply), from the id
// field when present, falling back to parsing the reference URL.
const getTwitterReferenceStatusId = (
  post: HighlightPost,
): string | undefined => {
  const reference = getTwitterSocialMeta(post)?.reference;
  if (!reference || typeof reference !== 'object') {
    return undefined;
  }

  const referenceRecord = reference as Record;
  const tweetId =
    typeof referenceRecord.tweet_id === 'string'
      ? referenceRecord.tweet_id.trim()
      : undefined;

  if (tweetId) {
    return tweetId;
  }

  const referenceUrl =
    typeof referenceRecord.url === 'string' ? referenceRecord.url : undefined;
  return extractTwitterStatusId(referenceUrl);
};

// Status id of the tweet itself, from meta or from the post's own URL.
const getTwitterRootStatusId = (post: HighlightPost): string | undefined => {
  const socialMeta = getTwitterSocialMeta(post);
  const tweetId =
    typeof socialMeta?.tweet_id === 'string' ? socialMeta.tweet_id.trim() : '';

  if (tweetId) {
    return tweetId;
  }

  return extractTwitterStatusId(post.canonicalUrl || post.url);
};

// Stable story key for twitter posts: referenced tweet first (groups quotes
// and replies under the thing they discuss), then the root tweet, then the
// shared post id. Null for non-twitter posts.
const getTwitterStoryKey = (post: HighlightPost): string | null => {
  if (post.type !== PostType.SocialTwitter) {
    return null;
  }

  const referenceStatusId = getTwitterReferenceStatusId(post);
  if (referenceStatusId) {
    return `twitter:${referenceStatusId}`;
  }

  const rootStatusId = getTwitterRootStatusId(post);
  if (rootStatusId) {
    return `twitter:${rootStatusId}`;
  }

  if (post.sharedPostId) {
    return `twitter:${post.sharedPostId}`;
  }

  return null;
};

// Key precedence: collection > twitter identity > normalized URL > post id.
export const getStoryKey = ({
  canonicalPost,
  collectionId,
}: {
  canonicalPost: HighlightPost;
  collectionId: string | null;
}): string => {
  if (collectionId) {
    return `collection:${collectionId}`;
  }

  const twitterStoryKey = getTwitterStoryKey(canonicalPost);
  if (twitterStoryKey) {
    return `twitter:${twitterStoryKey}`.startsWith('twitter:twitter:')
      ? twitterStoryKey
      : twitterStoryKey;
  }

  const canonicalUrl = canonicalPost.canonicalUrl || canonicalPost.url;
  if (canonicalUrl) {
    return `url:${canonicalUrl.toLowerCase().trim()}`;
  }

  return `post:${canonicalPost.id}`;
};

// Heuristic pre-ranking only — the evaluator makes the editorial decision.
// engagement * recency, boosted for collections and newsworthy curation tags,
// penalized for clickbait/self-promotion signals.
const toPreliminaryScore = ({
  story,
  horizonStart,
  referenceTime,
}: {
  story: Omit;
  horizonStart: Date;
  referenceTime: Date;
}): number => {
  const ageSeconds = Math.max(
    1,
    (referenceTime.getTime() - story.canonicalPost.createdAt.getTime()) / 1000,
  );
  const horizonSeconds = Math.max(
    ONE_DAY_IN_SECONDS,
    (referenceTime.getTime() - horizonStart.getTime()) / 1000,
  );
  // Floor of 0.15 keeps old-but-active stories from zeroing out entirely.
  const recency = Math.max(0.15, 1 - ageSeconds / horizonSeconds);
  const engagement =
    story.canonicalPost.upvotes +
    story.canonicalPost.comments * 2 +
    story.canonicalPost.views / 200;
  const collectionBoost = story.collectionId ? 8 : 0;
  const curationBoost = story.canonicalPost.contentCuration.some((value) =>
    ['news', 'release', 'milestone', 'leak', 'drama'].includes(value),
  )
    ? 5
    : 0;
  const quality = toQualitySummary(story.canonicalPost.contentQuality || {});
  const penalty =
    (quality.clickbaitProbability || 0) * 5 +
    (quality.selfPromotionScore || 0) * 3;

  return Number(
    (engagement * recency + collectionBoost + curationBoost - penalty).toFixed(
      3,
    ),
  );
};

// Most-engaged member wins; ties broken by recency.
const selectCanonicalPost = (posts: HighlightPost[]): HighlightPost =>
  posts.slice().sort((left, right) => {
    const leftScore = left.upvotes + left.comments * 2 + left.views / 200;
    const rightScore = right.upvotes + right.comments * 2 + right.views / 200;
    if (leftScore !== rightScore) {
      return rightScore - leftScore;
    }

    return right.createdAt.getTime() - left.createdAt.getTime();
  })[0];

// Highlights reason about stories, not raw posts. Collections are the primary
// story boundary, while social/twitter posts fall back to a stable tweet key.
export const buildStories = ({
  posts,
  relations,
  previousPool,
  horizonStart,
  referenceTime,
}: {
  posts: HighlightPost[];
  relations: PostRelation[];
  previousPool: ChannelHighlightCandidatePool;
  horizonStart: Date;
  referenceTime: Date;
}): HighlightStory[] => {
  const postsById = new Map(posts.map((post) => [post.id, post]));
  const previousStories = new Map(
    previousPool.stories.map((story) => [story.storyKey, story]),
  );
  const collectionByChildId = new Map();
  const collectionIds = new Set();
  const relationActivityByCollectionId = new Map();

  // Index collection membership; relations whose endpoints fell outside the
  // fetched post set are ignored.
  for (const relation of relations) {
    if (
      !postsById.has(relation.postId) ||
      !postsById.has(relation.relatedPostId)
    ) {
      continue;
    }

    collectionIds.add(relation.postId);
    collectionByChildId.set(relation.relatedPostId, relation.postId);
    const currentActivity = relationActivityByCollectionId.get(relation.postId);
    if (!currentActivity || relation.createdAt > currentActivity) {
      relationActivityByCollectionId.set(relation.postId, relation.createdAt);
    }
  }

  const groupedStories = new Map();
  const storyCollections = new Map();

  for (const post of posts) {
    const collectionId =
      collectionByChildId.get(post.id) ||
      (collectionIds.has(post.id) ? post.id : null);
    const canonicalPost =
      collectionId && postsById.has(collectionId)
        ? postsById.get(collectionId)!
        : post;
    const storyKey = getStoryKey({
      canonicalPost,
      collectionId,
    });

    if (!groupedStories.has(storyKey)) {
      groupedStories.set(storyKey, []);
      storyCollections.set(storyKey, collectionId);
    }

    groupedStories.get(storyKey)!.push(post);
  }

  return [...groupedStories.entries()]
    .map(([storyKey, memberPosts]) => {
      const collectionId = storyCollections.get(storyKey) || null;
      const canonicalPost =
        collectionId && postsById.has(collectionId)
          ? postsById.get(collectionId)!
          : selectCanonicalPost(memberPosts);
      const firstSeenAt = memberPosts.reduce(
        (current, post) =>
          post.createdAt < current ? post.createdAt : current,
        canonicalPost.createdAt,
      );
      const postActivityAt = memberPosts.reduce((current, post) => {
        const lastActivityAt = toLastActivityAt(post);
        return lastActivityAt > current ? lastActivityAt : current;
      }, toLastActivityAt(canonicalPost));
      // A new member joining a collection counts as story activity too.
      const relationActivityAt = collectionId
        ? relationActivityByCollectionId.get(collectionId)
        : null;
      const baseStory = {
        storyKey,
        canonicalPost,
        memberPosts,
        collectionId,
        firstSeenAt,
        lastSeenAt:
          relationActivityAt && relationActivityAt > postActivityAt
            ? relationActivityAt
            : postActivityAt,
        cached: previousStories.get(storyKey) || null,
      };

      return {
        ...baseStory,
        preliminaryScore: toPreliminaryScore({
          story: baseStory,
          horizonStart,
          referenceTime,
        }),
      };
    })
    .filter((story) => story.lastSeenAt >= horizonStart)
    .sort((left, right) => right.preliminaryScore - left.preliminaryScore);
};

// Flattens one story into the payload sent to the evaluator.
export const toStoryCandidate = (
  story: HighlightStory,
): HighlightStoryCandidate => ({
  storyKey: story.storyKey,
  canonicalPostId: story.canonicalPost.id,
  collectionId: story.collectionId,
  memberPostIds: story.memberPosts.map((post) => post.id).sort(),
  title: story.canonicalPost.title || '',
  summary: story.canonicalPost.summary || '',
  type: story.canonicalPost.type,
  sourceId: story.canonicalPost.sourceId,
  createdAt: story.canonicalPost.createdAt.toISOString(),
  lastActivityAt: story.lastSeenAt.toISOString(),
  upvotes: story.canonicalPost.upvotes,
  comments: story.canonicalPost.comments,
  views: story.canonicalPost.views,
  contentCuration: story.canonicalPost.contentCuration || [],
  quality: toQualitySummary(story.canonicalPost.contentQuality || {}),
  preliminaryScore: story.preliminaryScore,
});

// A live highlight whose post no longer maps to a story keeps a synthetic key.
const toBaselineStoryKey = ({
  postId,
  storiesByPostId,
}: {
  postId: string;
  storiesByPostId: Map;
}): string => storiesByPostId.get(postId)?.storyKey || `post:${postId}`;

export const buildBaselineSnapshot = ({
  highlights,
  storiesByPostId,
}: {
  highlights: { postId: string; rank: number; headline: string }[];
  storiesByPostId: Map;
}): HighlightBaselineItem[] =>
  highlights.map((highlight) => ({
    postId: highlight.postId,
    rank: highlight.rank,
    headline: highlight.headline,
    storyKey: toBaselineStoryKey({
      postId: highlight.postId,
      storiesByPostId,
    }),
  }));
// --- src/common/channelHighlight/types.ts (new file) ---
import type { Post } from '../../entity/posts/Post';
import type { ChannelHighlightRun } from '../../entity/ChannelHighlightRun';
import type { StoredHighlightStory } from './schema';

// Slim projection of Post used throughout the highlight pipeline.
export type HighlightPost = Pick<
  Post,
  | 'id'
  | 'type'
  | 'title'
  | 'summary'
  | 'createdAt'
  | 'metadataChangedAt'
  | 'statsUpdatedAt'
  | 'upvotes'
  | 'comments'
  | 'views'
  | 'sourceId'
  | 'contentCuration'
  | 'contentQuality'
  | 'visible'
  | 'deleted'
  | 'banned'
  | 'showOnFeed'
  | 'contentMeta'
> & {
  url: string | null;
  canonicalUrl: string | null;
  sharedPostId?: string | null;
};

// In-memory story candidate; `cached` carries the previous run's stored entry
// (if any) so evaluations can be reused.
export type HighlightStory = {
  storyKey: string;
  canonicalPost: HighlightPost;
  memberPosts: HighlightPost[];
  collectionId: string | null;
  firstSeenAt: Date;
  lastSeenAt: Date;
  preliminaryScore: number;
  cached?: StoredHighlightStory | null;
};

export type HighlightBaselineItem = {
  postId: string;
  rank: number;
  headline: string;
  storyKey: string;
};

export type GenerateChannelHighlightResult = {
  run: ChannelHighlightRun;
  published: boolean;
};

// --- src/common/typedPubsub.ts (patched: register the new topic payload) ---
    digestKey: string;
    scheduledAt: string;
  };
  'api.v1.generate-channel-highlight': {
    channel: string;
    scheduledAt: string;
  };
};

export async function triggerTypedEvent(

// --- src/cron/channelHighlights.ts (new file) ---
import { getChannelHighlightDefinitions } from '../common/channelHighlight/definitions';
import { triggerTypedEvent } from '../common/typedPubsub';
import { Cron } from './cron';

// Hourly fan-out cron: one pubsub message per active highlight definition.
// A single scheduledAt is shared by all messages so workers in the same tick
// dedupe against the same redis done/lock keys.
const cron: Cron = {
  name: 'channel-highlights',
  handler: async (con, logger) => {
    const scheduledAt = new Date().toISOString();
    const definitions = await getChannelHighlightDefinitions({
      con,
    });

    await Promise.all(
      definitions.map(({ channel }) =>
        triggerTypedEvent(logger, 'api.v1.generate-channel-highlight', {
          channel,
          scheduledAt,
        }),
      ),
    );
  },
};

export default cron;

// --- src/cron/index.ts (patched: register the cron) ---
import rotateDailyQuests from './rotateDailyQuests';
import rotateWeeklyQuests from './rotateWeeklyQuests';
import backfillGearCategory from './backfillGearCategory';
import channelDigests from './channelDigests';
import channelHighlights from './channelHighlights';
import { cleanExpiredBetterAuthSessions } from './cleanExpiredBetterAuthSessions';

export const crons: Cron[] = [
  rotateWeeklyQuests,
  backfillGearCategory,
  channelDigests,
  channelHighlights,
  cleanExpiredBetterAuthSessions,
];

// --- src/entity/ChannelHighlightDefinition.ts (new file) ---
import {
  Column,
  CreateDateColumn,
  Entity,
  PrimaryColumn,
  UpdateDateColumn,
} from 'typeorm';
import type { ChannelHighlightMode } from '../common/channelHighlight/schema';

// Per-channel configuration row; presence of a row (with a non-disabled mode)
// opts the channel into highlight runs.
@Entity()
export class ChannelHighlightDefinition {
  @PrimaryColumn({ type: 'text' })
  channel: string;

  // 'disabled' | 'shadow' | 'publish' — shadow runs never write highlights.
  @Column({ type: 'text', default: 'disabled' })
  mode: ChannelHighlightMode;

  // How far back (hours) candidate posts are considered.
  @Column({ type: 'smallint', default: 72 })
  candidateHorizonHours: number;

  @Column({ type: 'smallint', default: 10 })
  maxItems: number;

  @CreateDateColumn()
  createdAt: Date;

  @UpdateDateColumn()
  updatedAt: Date;
}
// --- src/entity/ChannelHighlightRun.ts (new file) ---
import {
  Column,
  CreateDateColumn,
  Entity,
  Index,
  PrimaryGeneratedColumn,
} from 'typeorm';

// Audit record of one highlight run: inputs, internal result, comparison
// against the live highlights, and outcome. One row per (channel, tick).
@Entity()
@Index('IDX_channel_highlight_run_channel_scheduledAt', [
  'channel',
  'scheduledAt',
])
export class ChannelHighlightRun {
  @PrimaryGeneratedColumn('uuid')
  id: string;

  @Column({ type: 'text' })
  channel: string;

  // Tick time carried in the pubsub message (shared across channels).
  @Column({ type: 'timestamp' })
  scheduledAt: Date;

  @CreateDateColumn()
  startedAt: Date;

  @Column({ type: 'timestamp', nullable: true })
  completedAt: Date | null;

  // 'processing' | 'completed' | 'failed'.
  @Column({ type: 'text' })
  status: string;

  // Live highlights at run start, annotated with story keys.
  @Column({ type: 'jsonb', default: () => `'[]'::jsonb` })
  baselineSnapshot: object[];

  @Column({ type: 'jsonb', default: () => `'{}'::jsonb` })
  inputSummary: object;

  // What the run would publish (or did publish).
  @Column({ type: 'jsonb', default: () => `'[]'::jsonb` })
  internalSnapshot: object[];

  @Column({ type: 'jsonb', default: () => `'{}'::jsonb` })
  comparison: object;

  @Column({ type: 'jsonb', default: () => `'{}'::jsonb` })
  metrics: object;

  @Column({ type: 'jsonb', nullable: true })
  error: object | null;
}

// --- src/entity/ChannelHighlightState.ts (new file) ---
import { Column, Entity, PrimaryColumn, UpdateDateColumn } from 'typeorm';
import type { ChannelHighlightCandidatePool } from '../common/channelHighlight/schema';

// Rolling per-channel state between runs: incremental-fetch watermark and the
// cached candidate pool (validated by zod on read).
@Entity()
export class ChannelHighlightState {
  @PrimaryColumn({ type: 'text' })
  channel: string;

  @Column({ type: 'timestamp', nullable: true })
  lastFetchedAt: Date | null;

  @Column({ type: 'timestamp', nullable: true })
  lastPublishedAt: Date | null;

  @Column({ type: 'jsonb', default: () => `'{"stories":[]}'::jsonb` })
  candidatePool: ChannelHighlightCandidatePool;

  @UpdateDateColumn()
  updatedAt: Date;
}
// --- src/entity/index.ts (patched: export the new entities) ---
export * from './Quest';
export * from './QuestReward';
export * from './QuestRotation';
export * from './PostHighlight';
export * from './ChannelHighlightDefinition';
export * from './ChannelHighlightState';
export * from './ChannelHighlightRun';

// --- src/migration/1773000000000-ChannelHighlights.ts (new file) ---
import type { MigrationInterface, QueryRunner } from 'typeorm';

// Creates the three channel-highlight tables, the run lookup index, and seeds
// one disabled definition ('vibes') so the feature can be toggled in place.
export class ChannelHighlights1773000000000 implements MigrationInterface {
  name = 'ChannelHighlights1773000000000';

  public async up(queryRunner: QueryRunner): Promise {
    await queryRunner.query(/* sql */ `
      CREATE TABLE "channel_highlight_definition" (
        "channel" text NOT NULL,
        "mode" text NOT NULL DEFAULT 'disabled',
        "candidateHorizonHours" smallint NOT NULL DEFAULT 72,
        "maxItems" smallint NOT NULL DEFAULT 10,
        "createdAt" TIMESTAMP NOT NULL DEFAULT now(),
        "updatedAt" TIMESTAMP NOT NULL DEFAULT now(),
        CONSTRAINT "PK_channel_highlight_definition_channel"
          PRIMARY KEY ("channel")
      )
    `);

    await queryRunner.query(/* sql */ `
      CREATE TABLE "channel_highlight_state" (
        "channel" text NOT NULL,
        "lastFetchedAt" TIMESTAMP,
        "lastPublishedAt" TIMESTAMP,
        "candidatePool" jsonb NOT NULL DEFAULT '{"stories":[]}'::jsonb,
        "updatedAt" TIMESTAMP NOT NULL DEFAULT now(),
        CONSTRAINT "PK_channel_highlight_state_channel"
          PRIMARY KEY ("channel")
      )
    `);

    await queryRunner.query(/* sql */ `
      CREATE TABLE "channel_highlight_run" (
        "id" uuid NOT NULL DEFAULT uuid_generate_v4(),
        "channel" text NOT NULL,
        "scheduledAt" TIMESTAMP NOT NULL,
        "startedAt" TIMESTAMP NOT NULL DEFAULT now(),
        "completedAt" TIMESTAMP,
        "status" text NOT NULL,
        "baselineSnapshot" jsonb NOT NULL DEFAULT '[]'::jsonb,
        "inputSummary" jsonb NOT NULL DEFAULT '{}'::jsonb,
        "internalSnapshot" jsonb NOT NULL DEFAULT '[]'::jsonb,
        "comparison" jsonb NOT NULL DEFAULT '{}'::jsonb,
        "metrics" jsonb NOT NULL DEFAULT '{}'::jsonb,
        "error" jsonb,
        CONSTRAINT "PK_channel_highlight_run_id"
          PRIMARY KEY ("id")
      )
    `);

    await queryRunner.query(/* sql */ `
      CREATE INDEX "IDX_channel_highlight_run_channel_scheduledAt"
        ON "channel_highlight_run" ("channel", "scheduledAt")
    `);

    await queryRunner.query(/* sql */ `
      INSERT INTO "channel_highlight_definition" (
        "channel",
        "mode",
        "candidateHorizonHours",
        "maxItems"
      )
      VALUES ('vibes', 'disabled', 72, 10)
      ON CONFLICT ("channel") DO NOTHING
    `);
  }

  public async down(queryRunner: QueryRunner): Promise {
    await queryRunner.query(/* sql */ `
      DROP INDEX IF EXISTS "IDX_channel_highlight_run_channel_scheduledAt"
    `);
    await queryRunner.query(/* sql */ `
      DROP TABLE IF EXISTS "channel_highlight_run"
    `);
    await queryRunner.query(/* sql */ `
      DROP TABLE IF EXISTS "channel_highlight_state"
    `);
    await queryRunner.query(/* sql */ `
      DROP TABLE IF EXISTS "channel_highlight_definition"
    `);
  }
}

// --- src/workers/generateChannelDigest.ts (patched: inline redis done/lock
// bookkeeping replaced by the shared withRedisDoneLock helper; the deleted
// doneKey/lockKey handling continues into the next hunk) ---
import {
  ONE_MINUTE_IN_SECONDS,
  ONE_WEEK_IN_SECONDS,
} from '../common/constants';
import { TypedWorker } from './worker';
import { withRedisDoneLock } from './withRedisDoneLock';

const CHANNEL_DIGEST_LOCK_TTL_SECONDS = 10 * ONE_MINUTE_IN_SECONDS;
// (post-patch state of the generateChannelDigest handler body: the manual
// done-check / lock-acquire / done-set / lock-release sequence was deleted in
// favor of one withRedisDoneLock call)
    try {
      await withRedisDoneLock({
        doneKey: getChannelDigestDoneKey({
          digestKey,
          scheduledAt,
        }),
        lockKey: getChannelDigestLockKey({
          digestKey,
          scheduledAt,
        }),
        lockValue: message.messageId || digestKey,
        lockTtlSeconds: CHANNEL_DIGEST_LOCK_TTL_SECONDS,
        doneTtlSeconds: getChannelDigestDoneTtl(definition.frequency),
        // Helper expects a void-returning promise; drop the digest result.
        execute: () =>
          generateChannelDigest({
            con,
            definition,
            now,
          }).then(() => undefined),
      });
    } catch (err) {
      logger.error({ ...logDetails, err }, 'Failed to generate channel digest');
      throw err;
    }
  },
};

// --- src/workers/generateChannelHighlight.ts (new file) ---
import { ONE_DAY_IN_SECONDS, ONE_MINUTE_IN_SECONDS } from '../common/constants';
import { getChannelHighlightDefinitionByChannel } from '../common/channelHighlight/definitions';
import { generateChannelHighlight } from '../common/channelHighlight/generate';
import { TypedWorker } from './worker';
import { withRedisDoneLock } from './withRedisDoneLock';

const CHANNEL_HIGHLIGHT_LOCK_TTL_SECONDS = 10 * ONE_MINUTE_IN_SECONDS;
// Done marker outlives any plausible redelivery window (2 days).
const CHANNEL_HIGHLIGHT_DONE_TTL_SECONDS = 2 * ONE_DAY_IN_SECONDS;

// Keys are scoped per (channel, tick) so each hourly tick runs exactly once.
const getChannelHighlightDoneKey = ({
  channel,
  scheduledAt,
}: {
  channel: string;
  scheduledAt: string;
}): string => `channel-highlight:done:${channel}:${scheduledAt}`;

const getChannelHighlightLockKey = ({
  channel,
  scheduledAt,
}: {
  channel: string;
  scheduledAt: string;
}): string => `channel-highlight:lock:${channel}:${scheduledAt}`;

// Pubsub worker: validates the message, then runs the highlight generation
// under a redis done/lock guard so redeliveries and concurrent consumers
// cannot duplicate a run.
const worker: TypedWorker<'api.v1.generate-channel-highlight'> = {
  subscription: 'api.generate-channel-highlight',
  handler: async (message, con, logger): Promise => {
    const { channel, scheduledAt } = message.data;
    const logDetails = { channel, scheduledAt, messageId: message.messageId };
    const definition = await getChannelHighlightDefinitionByChannel({
      con,
      channel,
    });

    if (!definition) {
      logger.error(logDetails, 'Channel highlight definition not found');
      return;
    }

    // The cron's tick time is the run's reference "now".
    const now = new Date(scheduledAt);
    if (Number.isNaN(now.getTime())) {
      logger.error(logDetails, 'Channel highlight scheduledAt is invalid');
      return;
    }

    try {
      await withRedisDoneLock({
        doneKey: getChannelHighlightDoneKey({
          channel,
          scheduledAt,
        }),
        lockKey: getChannelHighlightLockKey({
          channel,
          scheduledAt,
        }),
        lockValue: message.messageId || channel,
        lockTtlSeconds: CHANNEL_HIGHLIGHT_LOCK_TTL_SECONDS,
        doneTtlSeconds: CHANNEL_HIGHLIGHT_DONE_TTL_SECONDS,
        execute: () =>
          generateChannelHighlight({
            con,
            definition,
            now,
          }).then(() => undefined),
      });
    } catch (err) {
      logger.error(
        { ...logDetails, err },
        'Failed to generate channel highlight',
      );
      throw err;
    }
  },
};

export default worker;

// --- src/workers/index.ts (patched: register the new worker) ---
import feedbackUpdatedSlack from './feedbackUpdatedSlack';
import gearClassify from './gearClassify';
import agenticDigestTweet from './agenticDigestTweet';
import generateChannelDigest from './generateChannelDigest';
import generateChannelHighlight from './generateChannelHighlight';
import { jobExecuteWorker } from './job/jobExecute';
import workerJobDeadLetterLog from './workerJobDeadLetterLog'; @@ -171,6 +172,7 @@ export const typedWorkers: BaseTypedWorker[] = [ gearClassify, agenticDigestTweet, generateChannelDigest, + generateChannelHighlight, ]; export const personalizedDigestWorkers: Worker[] = [ diff --git a/src/workers/withRedisDoneLock.ts b/src/workers/withRedisDoneLock.ts new file mode 100644 index 0000000000..95f82931e1 --- /dev/null +++ b/src/workers/withRedisDoneLock.ts @@ -0,0 +1,43 @@ +import { + checkRedisObjectExists, + deleteRedisKey, + setRedisObjectIfNotExistsWithExpiry, + setRedisObjectWithExpiry, +} from '../redis'; + +export const withRedisDoneLock = async ({ + doneKey, + lockKey, + lockValue, + lockTtlSeconds, + doneTtlSeconds, + execute, +}: { + doneKey: string; + lockKey: string; + lockValue: string; + lockTtlSeconds: number; + doneTtlSeconds: number; + execute: () => Promise; +}): Promise => { + if (await checkRedisObjectExists(doneKey)) { + return false; + } + + const lockAcquired = await setRedisObjectIfNotExistsWithExpiry( + lockKey, + lockValue, + lockTtlSeconds, + ); + if (!lockAcquired) { + return false; + } + + try { + await execute(); + await setRedisObjectWithExpiry(doneKey, '1', doneTtlSeconds); + return true; + } finally { + await deleteRedisKey(lockKey); + } +};