From 47872818304e684ce42af1bae25ceb3ed5df62ea Mon Sep 17 00:00:00 2001 From: capJavert Date: Tue, 20 Jan 2026 21:28:45 +0100 Subject: [PATCH 1/4] feat: anon login claim opportunities --- __tests__/cron/cleanZombieOpportunities.ts | 50 +++++++++++++++--- src/common/opportunity/parse.ts | 15 +----- src/common/opportunity/user.ts | 51 +++++++++++-------- src/common/paddle/index.ts | 6 +-- src/cron/cleanZombieOpportunities.ts | 47 ++++++++++++----- src/entity/ClaimableItem.ts | 6 ++- src/entity/opportunities/Opportunity.ts | 1 - src/entity/user/utils.ts | 2 +- .../1768937354866-ClaimableItemIdentifier.ts | 27 ++++++++++ src/schema/opportunity.ts | 33 +++++++++--- 10 files changed, 170 insertions(+), 68 deletions(-) create mode 100644 src/migration/1768937354866-ClaimableItemIdentifier.ts diff --git a/__tests__/cron/cleanZombieOpportunities.ts b/__tests__/cron/cleanZombieOpportunities.ts index 7f8c35dae2..67ba9b8509 100644 --- a/__tests__/cron/cleanZombieOpportunities.ts +++ b/__tests__/cron/cleanZombieOpportunities.ts @@ -12,8 +12,13 @@ import { randomUUID } from 'node:crypto'; import { OpportunityJob } from '../../src/entity/opportunities/OpportunityJob'; import { subDays } from 'date-fns'; import { Organization } from '../../src/entity/Organization'; -import { Opportunity } from '../../src/entity/opportunities/Opportunity'; import { DatasetLocation } from '../../src/entity/dataset/DatasetLocation'; +import { + ClaimableItem, + ClaimableItemTypes, +} from '../../src/entity/ClaimableItem'; +import { Opportunity } from '../../src/entity/opportunities/Opportunity'; +import { OpportunityState } from '@dailydotdev/schema'; let con: DataSource; @@ -33,20 +38,19 @@ beforeEach(async () => { opportunitiesFixture.map((opportunity) => { return { ...opportunity, + state: OpportunityState.DRAFT, id: randomUUID(), }; }), ); - await saveFixtures( - con, - OpportunityJob, + const opps1 = await con.getRepository(OpportunityJob).save( opportunitiesFixture.map((opportunity) => { return { ...opportunity, + state: OpportunityState.DRAFT, id: randomUUID(), organizationId: null, - flags: { anonUserId: randomUUID() }, createdAt: subDays(new Date(), 3), }; }), @@ -54,17 +58,43 @@ beforeEach(async () => { await saveFixtures( con, - OpportunityJob, + ClaimableItem, + opps1.map((opportunity) => { + return { + identifier: randomUUID(), + type: ClaimableItemTypes.Opportunity, + flags: { + opportunityId: opportunity.id, + }, + }; + }), + ); + + const opps2 = await con.getRepository(OpportunityJob).save( opportunitiesFixture.map((opportunity) => { return { ...opportunity, + state: OpportunityState.DRAFT, id: randomUUID(), organizationId: null, - flags: { anonUserId: randomUUID() }, createdAt: subDays(new Date(), 1), }; }), ); + + await saveFixtures( + con, + ClaimableItem, + opps2.map((opportunity) => { + return { + identifier: randomUUID(), + type: ClaimableItemTypes.Opportunity, + flags: { + opportunityId: opportunity.id, + }, + }; + }), + ); }); describe('cleanZombieOpportunities cron', () => { @@ -79,10 +109,14 @@ describe('cleanZombieOpportunities cron', () => { expect(opportunities.length).toEqual(15); + const claimableItems = await con.getRepository(ClaimableItem).find(); + const zombieOpportunitiesCount = opportunities.filter((opportunity) => { return ( !(opportunity as OpportunityJob).organizationId && - opportunity.flags?.anonUserId && + claimableItems.some( + (item) => item.flags.opportunityId === opportunity.id, + ) && opportunity.createdAt < subDays(new Date(), 2) ); }).length; diff --git a/src/common/opportunity/parse.ts b/src/common/opportunity/parse.ts index 6734132dbf..3956bff7aa 100644 --- a/src/common/opportunity/parse.ts +++ b/src/common/opportunity/parse.ts @@ -13,10 +13,7 @@ import { import { getBufferFromStream } from '../utils'; import { ValidationError } from 'apollo-server-errors'; import { garmScraperService } from '../scraper'; -import { - acceptedOpportunityFileTypes, - opportunityMatchBatchSize, -} from '../../types'; +import { acceptedOpportunityFileTypes } from '../../types'; import { getBrokkrClient } from '../brokkr'; import { opportunityCreateParseSchema } from '../schema/opportunities'; import { markdown, sanitizeHtml } from '../markdown'; @@ -25,7 +22,6 @@ import { OpportunityLocation } from '../../entity/opportunities/OpportunityLocat import { OpportunityKeyword } from '../../entity/OpportunityKeyword'; import { findDatasetLocation } from '../../entity/dataset/utils'; import { addOpportunityDefaultQuestionFeedback } from './question'; -import type { Opportunity } from '../../entity/opportunities/Opportunity'; import { EntityManager } from 'typeorm'; import { logger } from '../../logger'; import { randomUUID } from 'node:crypto'; @@ -298,14 +294,6 @@ export async function createOpportunityFromParsedData( const locationData = parsedOpportunity.location || []; return ctx.con.transaction(async (entityManager) => { - const flags: Opportunity['flags'] = {}; - - if (!ctx.userId) { - flags.anonUserId = ctx.trackingId; - } - - flags.batchSize = opportunityMatchBatchSize; - // eslint-disable-next-line @typescript-eslint/no-unused-vars const { location, ...opportunityData } = parsedOpportunity; @@ -336,7 +324,6 @@ export async function createOpportunityFromParsedData( ...opportunityData, state: OpportunityState.DRAFT, content, - flags, } as DeepPartial), ); diff --git a/src/common/opportunity/user.ts b/src/common/opportunity/user.ts index 7a4498b67c..b218f52214 100644 --- a/src/common/opportunity/user.ts +++ b/src/common/opportunity/user.ts @@ -1,9 +1,9 @@ -import { IsNull, type EntityManager } from 'typeorm'; +import { In, IsNull, type EntityManager } from 'typeorm'; import { OpportunityJob } from '../../entity/opportunities/OpportunityJob'; -import { updateFlagsStatement } from '../utils'; import type { Opportunity } from '../../entity/opportunities/Opportunity'; import { OpportunityUserRecruiter } from '../../entity/opportunities/user/OpportunityUserRecruiter'; import { logger } from '../../logger'; +import { ClaimableItem, ClaimableItemTypes } from '../../entity/ClaimableItem'; export const claimAnonOpportunities = async ({ anonUserId, @@ -20,25 +20,26 @@ export const claimAnonOpportunities = async ({ } const result = await con.transaction(async (entityManager) => { - const opportunityUpdateResult = await entityManager - .getRepository(OpportunityJob) - .createQueryBuilder() - .update() - .set({ - flags: updateFlagsStatement({ - anonUserId: null, - }), - }) - .where("flags->>'anonUserId' = :anonUserId", { - anonUserId, - }) - .andWhere({ - organizationId: IsNull(), // only claim opportunities not linked to an organization yet - }) - .returning(['id']) - .execute(); + const claimableItems = await entityManager + .getRepository(ClaimableItem) + .findBy({ + identifier: anonUserId, + type: ClaimableItemTypes.Opportunity, + claimedById: IsNull(), + }); - const opportunities = opportunityUpdateResult.raw as { id: string }[]; + const opportunities = await entityManager + .getRepository(OpportunityJob) + .find({ + where: { + id: In( + claimableItems + .filter((item) => item.flags.opportunityId) + .map((item) => item.flags.opportunityId), + ), + organizationId: IsNull(), // only claim opportunities not linked to an organization yet + }, + }); const opportunityUserUpsertResult = await entityManager .getRepository(OpportunityUserRecruiter) @@ -56,6 +57,16 @@ export const claimAnonOpportunities = async ({ }, ); + await entityManager.getRepository(ClaimableItem).update( + { + id: In(claimableItems.map((item) => item.id)), + }, + { + claimedById: userId, + claimedAt: new Date(), + }, + ); + return opportunityUserUpsertResult.identifiers.map((item) => { return { id: item.opportunityId, diff --git a/src/common/paddle/index.ts b/src/common/paddle/index.ts index fc1722fb0d..152de121bd 100644 --- a/src/common/paddle/index.ts +++ b/src/common/paddle/index.ts @@ -261,7 +261,7 @@ export const updateClaimableItem = async ( const existingEntries = await con.getRepository(ClaimableItem).find({ where: { - email: customer.email, + identifier: customer.email, claimedAt: IsNull(), }, }); @@ -272,7 +272,7 @@ export const updateClaimableItem = async ( await con.getRepository(ClaimableItem).insert({ type: ClaimableItemTypes.Plus, - email: customer.email, + identifier: customer.email, flags: { cycle: extractSubscriptionCycle(data.items), createdAt: data.startedAt, @@ -296,7 +296,7 @@ export const dropClaimableItem = async ( const customer = await paddleInstance.customers.get(data.customerId); await con.getRepository(ClaimableItem).delete({ - email: customer.email, + identifier: customer.email, claimedAt: IsNull(), flags: JsonContains({ subscriptionId: data.id, diff --git a/src/cron/cleanZombieOpportunities.ts b/src/cron/cleanZombieOpportunities.ts index 6581f524e9..1c7b7c9811 100644 --- a/src/cron/cleanZombieOpportunities.ts +++ b/src/cron/cleanZombieOpportunities.ts @@ -3,26 +3,47 @@ import { Cron } from './cron'; import { logger } from '../logger'; import { IsNull, LessThan } from 'typeorm'; import { Opportunity } from '../entity/opportunities/Opportunity'; +import { OpportunityState } from '@dailydotdev/schema'; +import { ClaimableItem } from '../entity/ClaimableItem'; export const cleanZombieOpportunities: Cron = { name: 'clean-zombie-opportunities', handler: async (con) => { const timeThreshold = subDays(new Date(), 2); - const query = await con - .getRepository(Opportunity) - .createQueryBuilder() - .delete() - .where({ - organizationId: IsNull(), - }) - .andWhere({ - createdAt: LessThan(timeThreshold), - }) - .andWhere(`flags->'anonUserId' IS NOT NULL`); + con.transaction(async (entityManager) => { + const query = entityManager + .getRepository(Opportunity) + .createQueryBuilder() + .delete() + .where({ + organizationId: IsNull(), + }) + .andWhere({ + createdAt: LessThan(timeThreshold), + }) + .andWhere({ + state: OpportunityState.DRAFT, + }) + .returning('id'); - const { affected } = await query.execute(); + const { affected, raw } = await query.execute(); - logger.info({ count: affected }, 'zombies opportunities cleaned! 🧟'); + const { affected: claimables } = await entityManager + .getRepository(ClaimableItem) + .createQueryBuilder() + .delete() + .where(`flags->>'opportunityId' IN (:...ids)`, { + ids: raw + .filter((item: { id: string }) => item) + .map((item: { id: string }) => item.id), + }) + .execute(); + + logger.info( + { count: affected, claimables }, + 'zombies opportunities cleaned! 🧟', + ); + }); }, }; diff --git a/src/entity/ClaimableItem.ts b/src/entity/ClaimableItem.ts index 06a6e28204..4c8a6842b7 100644 --- a/src/entity/ClaimableItem.ts +++ b/src/entity/ClaimableItem.ts @@ -11,6 +11,7 @@ import type { SubscriptionProvider, SubscriptionStatus } from '../common/plus'; export enum ClaimableItemTypes { Plus = 'plus', + Opportunity = 'opportunity', } export type ClaimableItemFlags = { @@ -19,6 +20,7 @@ export type ClaimableItemFlags = { subscriptionId: string | null; provider: SubscriptionProvider | null; status: SubscriptionStatus | null; + opportunityId?: string; }; @Entity() @@ -27,8 +29,8 @@ export class ClaimableItem { id: string; @Column({ type: 'text' }) - @Index('IDX_claimable_item_email') - email: string; + @Index('IDX_claimable_item_identifier') + identifier: string; // typically email @Column({ default: () => 'now()' }) createdAt: Date; diff --git a/src/entity/opportunities/Opportunity.ts b/src/entity/opportunities/Opportunity.ts index 8080cf043f..f0437aae3b 100644 --- a/src/entity/opportunities/Opportunity.ts +++ b/src/entity/opportunities/Opportunity.ts @@ -23,7 +23,6 @@ import type { OpportunityLocation } from './OpportunityLocation'; import type { OpportunityPreviewStatus } from '../../common/opportunity/types'; export type OpportunityFlags = Partial<{ - anonUserId: string | null; preview: { userIds: string[]; totalCount: number; diff --git a/src/entity/user/utils.ts b/src/entity/user/utils.ts index b6c06dd222..387fdb8090 100644 --- a/src/entity/user/utils.ts +++ b/src/entity/user/utils.ts @@ -388,7 +388,7 @@ export const addClaimableItemsToUser = async ( ) => { const subscription = await con .getRepository(ClaimableItem) - .findOneBy({ email: body.email, claimedById: IsNull() }); + .findOneBy({ identifier: body.email, claimedById: IsNull() }); if (subscription) { await paddleInstance.subscriptions.update( diff --git a/src/migration/1768937354866-ClaimableItemIdentifier.ts b/src/migration/1768937354866-ClaimableItemIdentifier.ts new file mode 100644 index 0000000000..d979527460 --- /dev/null +++ b/src/migration/1768937354866-ClaimableItemIdentifier.ts @@ -0,0 +1,27 @@ +import { MigrationInterface, QueryRunner } from 'typeorm'; + +export class ClaimableItemIdentifier1768937354866 implements MigrationInterface { + name = 'ClaimableItemIdentifier1768937354866'; + + public async up(queryRunner: QueryRunner): Promise { + await queryRunner.query(`DROP INDEX "public"."IDX_claimable_item_email"`); + await queryRunner.query( + `ALTER TABLE "claimable_item" RENAME COLUMN "email" TO "identifier"`, + ); + await queryRunner.query( + `CREATE INDEX "IDX_claimable_item_identifier" ON "claimable_item" ("identifier") `, + ); + } + + public async down(queryRunner: QueryRunner): Promise { + await queryRunner.query( + `DROP INDEX "public"."IDX_claimable_item_identifier"`, + ); + await queryRunner.query( + `ALTER TABLE "claimable_item" RENAME COLUMN "identifier" TO "email"`, + ); + await queryRunner.query( + `CREATE INDEX "IDX_claimable_item_email" ON "claimable_item" ("email") `, + ); + } +} diff --git a/src/schema/opportunity.ts b/src/schema/opportunity.ts index cf33709c31..cc4b0196e9 100644 --- a/src/schema/opportunity.ts +++ b/src/schema/opportunity.ts @@ -73,7 +73,7 @@ import { } from '../common/opportunity/accessControl'; import { sanitizeHtml } from '../common/markdown'; import { QuestionScreening } from '../entity/questions/QuestionScreening'; -import { In, Not, JsonContains, EntityManager, DeepPartial } from 'typeorm'; +import { In, Not, EntityManager, DeepPartial, IsNull } from 'typeorm'; import { Organization } from '../entity/Organization'; import { Source, SourceType } from '../entity/Source'; import { @@ -118,6 +118,7 @@ import { notifyOpportunityFeedbackSubmitted } from '../common/opportunity/pubsub import { triggerTypedEvent } from '../common/typedPubsub'; import { randomUUID } from 'crypto'; import { opportunityMatchBatchSize } from '../types'; +import { ClaimableItem, ClaimableItemTypes } from '../entity/ClaimableItem'; export interface GQLOpportunity extends Pick< Opportunity, @@ -1592,11 +1593,25 @@ export const resolvers: IResolvers = traceResolvers< relations: { keywords: true }, }); } else { + const claimableItem = await ctx.con + .getRepository(ClaimableItem) + .findOne({ + where: { + identifier: ctx.trackingId, + type: ClaimableItemTypes.Opportunity, + claimedById: IsNull(), + }, + }); + + if (!claimableItem?.flags?.opportunityId) { + throw new NotFoundError('No opportunity found for preview'); + } + opportunity = await ctx.con .getRepository(OpportunityJob) .findOneOrFail({ where: { - flags: JsonContains({ anonUserId: ctx.trackingId }), + id: claimableItem.flags.opportunityId, }, relations: { keywords: true }, }); @@ -2860,10 +2875,6 @@ export const resolvers: IResolvers = traceResolvers< }, }; - if (!ctx.userId) { - flags.anonUserId = ctx.trackingId; - } - const opportunity = await ctx.con.transaction(async (entityManager) => { const newOpportunity = await ctx.con.getRepository(OpportunityJob).save( ctx.con.getRepository(OpportunityJob).create({ @@ -2882,6 +2893,16 @@ export const resolvers: IResolvers = traceResolvers< userId: ctx.userId, }), ); + } else { + await entityManager.getRepository(ClaimableItem).insert( + entityManager.getRepository(ClaimableItem).create({ + identifier: ctx.trackingId, + type: ClaimableItemTypes.Opportunity, + flags: { + opportunityId: newOpportunity.id, + }, + }), + ); } return newOpportunity; From 33ae91359c0f8b90c168c8afab095cf7e6d22b7c Mon Sep 17 00:00:00 2001 From: capJavert Date: Tue, 20 Jan 2026 22:05:32 +0100 Subject: [PATCH 2/4] fix: tests --- __tests__/paddle.ts | 20 ++-- __tests__/private.ts | 27 +++-- __tests__/schema/opportunity.ts | 102 ++++++++++-------- __tests__/users.ts | 4 +- .../workers/opportunity/parseOpportunity.ts | 12 ++- 5 files changed, 101 insertions(+), 64 deletions(-) diff --git a/__tests__/paddle.ts b/__tests__/paddle.ts index 924bbfc661..6e2b0c7297 100644 --- a/__tests__/paddle.ts +++ b/__tests__/paddle.ts @@ -1222,10 +1222,10 @@ describe('plus subscription', () => { const claimableItem = await con .getRepository(ClaimableItem) - .findOneByOrFail({ email: mockCustomer.email }); + .findOneByOrFail({ identifier: mockCustomer.email }); expect(claimableItem).toBeTruthy(); - expect(claimableItem.email).toBe('test@example.com'); + expect(claimableItem.identifier).toBe('test@example.com'); expect(claimableItem.type).toBe(ClaimableItemTypes.Plus); expect(claimableItem.flags).toHaveProperty( 'cycle', @@ -1259,7 +1259,7 @@ it('should throw an error if the email already has a claimable subscription', as }); await con.getRepository(ClaimableItem).save({ - email: 'test@example.com', + identifier: 'test@example.com', type: ClaimableItemTypes.Plus, flags: { cycle: SubscriptionCycles.Yearly, @@ -1279,7 +1279,7 @@ it('should not throw an error if the email has claimed a previously claimable su .mockResolvedValue(mockCustomer as Customer); await con.getRepository(ClaimableItem).save({ - email: 'test@example.com', + identifier: 'test@example.com', type: ClaimableItemTypes.Plus, flags: { cycle: SubscriptionCycles.Yearly, @@ -1310,10 +1310,10 @@ describe('anonymous subscription', () => { const claimableItem = await con .getRepository(ClaimableItem) - .findOneByOrFail({ email: mockCustomer.email }); + .findOneByOrFail({ identifier: mockCustomer.email }); expect(claimableItem).toBeTruthy(); - expect(claimableItem.email).toBe('test@example.com'); + expect(claimableItem.identifier).toBe('test@example.com'); expect(claimableItem.type).toBe(ClaimableItemTypes.Plus); expect(claimableItem.flags).toHaveProperty( 'cycle', @@ -1346,7 +1346,7 @@ describe('anonymous subscription', () => { }); await con.getRepository(ClaimableItem).save({ - email: 'test@example.com', + identifier: 'test@example.com', type: ClaimableItemTypes.Plus, flags: { status: SubscriptionStatus.Active, @@ -1369,7 +1369,7 @@ describe('anonymous subscription', () => { .mockResolvedValue(mockCustomer as Customer); await con.getRepository(ClaimableItem).save({ - email: 'test@example.com', + identifier: 'test@example.com', type: ClaimableItemTypes.Plus, flags: { status: SubscriptionStatus.Active, @@ -1393,7 +1393,7 @@ describe('anonymous subscription', () => { const mockCustomer = { email: 'test@example.com' }; await con.getRepository(ClaimableItem).save({ - email: 'test@example.com', + identifier: 'test@example.com', type: ClaimableItemTypes.Plus, flags: { status: SubscriptionStatus.Active, @@ -1418,7 +1418,7 @@ describe('anonymous subscription', () => { const claimableItem = await con .getRepository(ClaimableItem) - .findOneBy({ email: mockCustomer.email }); + .findOneBy({ identifier: mockCustomer.email }); expect(claimableItem).toBeNull(); }); diff --git a/__tests__/private.ts b/__tests__/private.ts index 309f623e77..2a678631fa 100644 --- a/__tests__/private.ts +++ b/__tests__/private.ts @@ -31,6 +31,7 @@ import { generateShortId } from '../src/ids'; import { OpportunityUser } from '../src/entity/opportunities/user'; import { OpportunityUserType } from '../src/entity/opportunities/types'; import { QuestionFeedback } from '../src/entity/questions/QuestionFeedback'; +import { ClaimableItem, ClaimableItemTypes } from '../src/entity/ClaimableItem'; jest.mock('../src/common/geo', () => ({ ...(jest.requireActual('../src/common/geo') as Record), @@ -793,18 +794,23 @@ describe('POST /p/newUser', () => { title: 'Test', tldr: 'Test', state: OpportunityState.DRAFT, - flags: { - anonUserId, - }, }), ); + await con.getRepository(ClaimableItem).save({ + identifier: anonUserId, + type: ClaimableItemTypes.Opportunity, + flags: { + opportunityId: opportunity.id, + }, + }); + const { body } = await request(app.server) .post('/p/newUser') .set('Content-type', 'application/json') .set('authorization', `Service ${process.env.ACCESS_SECRET}`) .send({ - id: opportunity.flags.anonUserId, + id: anonUserId, name: anonUserId, image: usersFixture[0].image, username: anonUserId, @@ -816,10 +822,15 @@ describe('POST /p/newUser', () => { expect(body.status).toEqual('ok'); expect(body.userId).toEqual(anonUserId); - const updatedOpportunity = await con - .getRepository(OpportunityJob) - .findOneBy({ id: opportunity.id }); - expect(updatedOpportunity!.flags.anonUserId).toBeNull(); + const updatedClaimableItem = await con + .getRepository(ClaimableItem) + .findOneBy({ + identifier: anonUserId, + type: ClaimableItemTypes.Opportunity, + }); + expect(updatedClaimableItem).not.toBeNull(); + expect(updatedClaimableItem!.claimedAt).toBeInstanceOf(Date); + expect(updatedClaimableItem!.claimedById).toBe(anonUserId); const opportunityUser = await con.getRepository(OpportunityUser).findOneBy({ opportunityId: opportunity.id, diff --git a/__tests__/schema/opportunity.ts b/__tests__/schema/opportunity.ts index eea3400969..dbb2673dc8 100644 --- a/__tests__/schema/opportunity.ts +++ b/__tests__/schema/opportunity.ts @@ -87,6 +87,10 @@ import { SubscriptionStatus } from '../../src/common/plus'; import { OpportunityPreviewStatus } from '../../src/common/opportunity/types'; import { unsupportedOpportunityDomains } from '../../src/common/schema/opportunities'; import * as typedPubsub from '../../src/common/typedPubsub'; +import { + ClaimableItem, + ClaimableItemTypes, +} from '../../src/entity/ClaimableItem'; // Mock Slack WebClient const mockConversationsCreate = jest.fn(); @@ -6138,7 +6142,6 @@ describe('query opportunityPreview', () => { { id: opportunitiesFixture[0].id }, { flags: { - anonUserId: 'test-anon-user-123', preview: { userIds: ['1', '2'], totalCount: 2000, // mocked total count @@ -6147,6 +6150,12 @@ describe('query opportunityPreview', () => { }, ); + await con.getRepository(ClaimableItem).insert({ + identifier: 'test-anon-user-123', + type: ClaimableItemTypes.Opportunity, + flags: { opportunityId: opportunitiesFixture[0].id }, + }); + const res = await client.query(OPPORTUNITY_PREVIEW_QUERY, { variables: { first: 10 }, }); @@ -6165,7 +6174,6 @@ describe('query opportunityPreview', () => { { id: opportunitiesFixture[0].id }, { flags: { - anonUserId: 'test-anon-user-123', preview: { userIds: ['1'], totalCount: 1000, // mocked total count @@ -6174,6 +6182,12 @@ describe('query opportunityPreview', () => { }, ); + await con.getRepository(ClaimableItem).insert({ + identifier: 'test-anon-user-123', + type: ClaimableItemTypes.Opportunity, + flags: { opportunityId: opportunitiesFixture[0].id }, + }); + const res = await client.query(OPPORTUNITY_PREVIEW_QUERY, { variables: { first: 10 }, }); @@ -6228,14 +6242,11 @@ describe('query opportunityPreview', () => { }); it('should indicate async generation is in progress', async () => { - await con.getRepository(OpportunityJob).update( - { id: opportunitiesFixture[0].id }, - { - flags: { - anonUserId: 'test-anon-user-123', - }, - }, - ); + await con.getRepository(ClaimableItem).insert({ + identifier: 'test-anon-user-123', + type: ClaimableItemTypes.Opportunity, + flags: { opportunityId: opportunitiesFixture[0].id }, + }); const res = await client.query(OPPORTUNITY_PREVIEW_QUERY, { variables: { first: 10 }, @@ -6259,7 +6270,6 @@ describe('query opportunityPreview', () => { { id: opportunitiesFixture[0].id }, { flags: { - anonUserId: 'test-anon-user-123', preview: { userIds: [], totalCount: 0, @@ -6268,6 +6278,12 @@ describe('query opportunityPreview', () => { }, ); + await con.getRepository(ClaimableItem).insert({ + identifier: 'test-anon-user-123', + type: ClaimableItemTypes.Opportunity, + flags: { opportunityId: opportunitiesFixture[0].id }, + }); + const res = await client.query(OPPORTUNITY_PREVIEW_QUERY, { variables: { first: 10 }, }); @@ -6290,7 +6306,6 @@ describe('query opportunityPreview', () => { { id: opportunitiesFixture[0].id }, { flags: { - anonUserId: 'test-anon-user-123', preview: { userIds: ['1'], totalCount: 1000, // mocked total count @@ -6299,6 +6314,12 @@ describe('query opportunityPreview', () => { }, ); + await con.getRepository(ClaimableItem).insert({ + identifier: 'test-anon-user-123', + type: ClaimableItemTypes.Opportunity, + flags: { opportunityId: opportunitiesFixture[0].id }, + }); + const res = await client.query(OPPORTUNITY_PREVIEW_QUERY, { variables: { first: 10 }, }); @@ -6316,14 +6337,11 @@ describe('query opportunityPreview', () => { }); it('should send valid opportunity data to gondul', async () => { - await con.getRepository(OpportunityJob).update( - { id: opportunitiesFixture[0].id }, - { - flags: { - anonUserId: 'test-anon-user-123', - }, - }, - ); + await con.getRepository(ClaimableItem).insert({ + identifier: 'test-anon-user-123', + type: ClaimableItemTypes.Opportunity, + flags: { opportunityId: opportunitiesFixture[0].id }, + }); const opportunityPreviewSpy = jest.spyOn( gondulModule.getGondulOpportunityServiceClient().instance, @@ -6375,14 +6393,11 @@ describe('query opportunityPreview', () => { }); it('should send valid locations data for continent matched opportunity', async () => { - await con.getRepository(OpportunityJob).update( - { id: opportunitiesFixture[0].id }, - { - flags: { - anonUserId: 'test-anon-user-123', - }, - }, - ); + await con.getRepository(ClaimableItem).insert({ + identifier: 'test-anon-user-123', + type: ClaimableItemTypes.Opportunity, + flags: { opportunityId: opportunitiesFixture[0].id }, + }); const continentLocation = await con.getRepository(DatasetLocation).save( con.getRepository(DatasetLocation).create({ @@ -6424,14 +6439,11 @@ describe('query opportunityPreview', () => { }); it('should default to US location when opportunity has no locations', async () => { - await con.getRepository(OpportunityJob).update( - { id: opportunitiesFixture[0].id }, - { - flags: { - anonUserId: 'test-anon-user-123', - }, - }, - ); + await con.getRepository(ClaimableItem).insert({ + identifier: 'test-anon-user-123', + type: ClaimableItemTypes.Opportunity, + flags: { opportunityId: opportunitiesFixture[0].id }, + }); await con.getRepository(OpportunityLocation).delete({ opportunityId: opportunitiesFixture[0].id, @@ -6463,12 +6475,15 @@ describe('query opportunityPreview', () => { { id: opportunitiesFixture[0].id }, { state: OpportunityState.PARSING, - flags: { - anonUserId: 'test-anon-user-123', - }, }, ); + await con.getRepository(ClaimableItem).insert({ + identifier: 'test-anon-user-123', + type: ClaimableItemTypes.Opportunity, + flags: { opportunityId: opportunitiesFixture[0].id }, + }); + await testQueryErrorCode( client, { @@ -6485,12 +6500,15 @@ describe('query opportunityPreview', () => { { id: opportunitiesFixture[0].id }, { state: OpportunityState.ERROR, - flags: { - anonUserId: 'test-anon-user-123', - }, }, ); + await con.getRepository(ClaimableItem).insert({ + identifier: 'test-anon-user-123', + type: ClaimableItemTypes.Opportunity, + flags: { opportunityId: opportunitiesFixture[0].id }, + }); + await testQueryErrorCode( client, { diff --git a/__tests__/users.ts b/__tests__/users.ts index 6836ccd604..e20fccdd81 100644 --- a/__tests__/users.ts +++ b/__tests__/users.ts @@ -7121,7 +7121,7 @@ describe('add claimable items to user', () => { await con.getRepository(ClaimableItem).save({ id: claimableItemUuid, type: ClaimableItemTypes.Plus, - email: 'john.doe@example.com', + identifier: 'john.doe@example.com', flags, }); @@ -7233,7 +7233,7 @@ describe('add claimable items to user', () => { await con.getRepository(ClaimableItem).save({ id: claimableItemUuid, type: ClaimableItemTypes.Plus, - email: 'johnclaim@email.com', + identifier: 'johnclaim@email.com', flags, }); diff --git a/__tests__/workers/opportunity/parseOpportunity.ts b/__tests__/workers/opportunity/parseOpportunity.ts index 302d6120dd..49a42e15d5 100644 --- a/__tests__/workers/opportunity/parseOpportunity.ts +++ b/__tests__/workers/opportunity/parseOpportunity.ts @@ -36,6 +36,10 @@ import { RESUME_BUCKET_NAME } from '../../../src/config'; import { createClient } from '@connectrpc/connect'; import type { ServiceClient } from '../../../src/types'; import * as brokkrCommon from '../../../src/common/brokkr'; +import { + ClaimableItem, + ClaimableItemTypes, +} from '../../../src/entity/ClaimableItem'; const mockStorageDownload = jest.fn(); const mockStorageDelete = jest.fn(); @@ -561,6 +565,12 @@ describe('parseOpportunity worker', () => { }); it('should handle anonymous user (trackingId only)', async () => { + await con.getRepository(ClaimableItem).insert({ + identifier: 'anon1', + type: ClaimableItemTypes.Opportunity, + flags: { opportunityId: testOpportunityId }, + }); + await con.getRepository(OpportunityJob).save({ id: testOpportunityId, type: OpportunityType.JOB, @@ -570,7 +580,6 @@ describe('parseOpportunity worker', () => { content: new OpportunityContent({}), flags: { batchSize: 100, - anonUserId: 'anon1', file: { blobName: testBlobName, bucketName: RESUME_BUCKET_NAME, @@ -590,7 +599,6 @@ describe('parseOpportunity worker', () => { }); expect(opportunity!.state).toBe(OpportunityState.DRAFT); - expect(opportunity!.flags?.anonUserId).toBe('anon1'); // Verify no recruiter was assigned const recruiter = await con From 0c4c44a835a36fd88554fc706256dc2c390948aa Mon Sep 17 00:00:00 2001 From: capJavert Date: Tue, 20 Jan 2026 22:59:19 +0100 Subject: [PATCH 3/4] feat: add claim opportunities mutation --- __tests__/schema/opportunity.ts | 265 +++++++++++++++++++++++++++++++- src/schema/opportunity.ts | 29 ++++ 2 files changed, 293 insertions(+), 1 deletion(-) diff --git a/__tests__/schema/opportunity.ts b/__tests__/schema/opportunity.ts index dbb2673dc8..36cb83d764 100644 --- a/__tests__/schema/opportunity.ts +++ b/__tests__/schema/opportunity.ts @@ -1,5 +1,5 @@ import type { ZodError } from 'zod'; -import { DataSource, IsNull } from 'typeorm'; +import { DataSource, In, IsNull } from 'typeorm'; import request from 'supertest'; import { Alerts, Keyword, User } from '../../src/entity'; import { Opportunity } from '../../src/entity/opportunities/Opportunity'; @@ -6836,3 +6836,266 @@ describe('mutation reimportOpportunity', () => { expect(updatedOpportunity.state).toBe(originalOpportunity.state); // State should be preserved }); }); + +describe('mutation claimOpportunities', () => { + const MUTATION = /* GraphQL */ ` + mutation ClaimOpportunities($identifier: String!) { + claimOpportunities(identifier: $identifier) { + ids + } + } + `; + + it('should require authentication', async () => { + await testMutationErrorCode( + client, + { + mutation: MUTATION, + variables: { identifier: 'anon-123' }, + }, + 'UNAUTHENTICATED', + ); + }); + + it('should claim multiple opportunities for valid identifier', async () => { + loggedUser = '1'; + + const oppId1 = 'a50e8400-e29b-41d4-a716-446655440001'; + const oppId2 = 'a50e8400-e29b-41d4-a716-446655440002'; + + // Create opportunities without organization (claimable) + await con.getRepository(OpportunityJob).save([ + { + id: oppId1, + type: OpportunityType.JOB, + state: OpportunityState.DRAFT, + title: 'Claimable Opportunity 1', + tldr: 'Test opportunity 1', + organizationId: null, + }, + { + id: oppId2, + type: OpportunityType.JOB, + state: OpportunityState.DRAFT, + title: 'Claimable Opportunity 2', + tldr: 'Test opportunity 2', + organizationId: null, + }, + ]); + + // Create claimable items for the same anonymous identifier + await con.getRepository(ClaimableItem).save([ + { + identifier: 'anon-user-abc', + type: ClaimableItemTypes.Opportunity, + flags: { opportunityId: oppId1 }, + }, + { + identifier: 'anon-user-abc', + type: ClaimableItemTypes.Opportunity, + flags: { opportunityId: oppId2 }, + }, + ]); + + const res = await client.mutate(MUTATION, { + variables: { identifier: 'anon-user-abc' }, + }); + + expect(res.errors).toBeFalsy(); + expect(res.data.claimOpportunities.ids).toHaveLength(2); + expect(res.data.claimOpportunities.ids).toEqual( + expect.arrayContaining([oppId1, oppId2]), + ); + + // Verify OpportunityUserRecruiter created for both + const recruiters = await con + .getRepository(OpportunityUserRecruiter) + .findBy({ userId: '1', opportunityId: In([oppId1, oppId2]) }); + expect(recruiters).toHaveLength(2); + + // Verify ClaimableItems marked as claimed + const claimedItems = await con + .getRepository(ClaimableItem) + .findBy({ identifier: 'anon-user-abc' }); + expect(claimedItems.every((item) => item.claimedById === '1')).toBe(true); + expect(claimedItems.every((item) => item.claimedAt !== null)).toBe(true); + }); + + it('should return empty when no claimable items match identifier', async () => { + loggedUser = '1'; + + const res = await client.mutate(MUTATION, { + variables: { identifier: 'non-existent-identifier' }, + }); + + expect(res.errors).toBeFalsy(); + expect(res.data.claimOpportunities).toEqual({ ids: [] }); + }); + + it('should return empty when claimable items are already claimed', async () => { + loggedUser = '1'; + + const oppId = 'b50e8400-e29b-41d4-a716-446655440001'; + + await con.getRepository(OpportunityJob).save({ + id: oppId, + type: OpportunityType.JOB, + state: OpportunityState.DRAFT, + title: 'Already Claimed Opportunity', + tldr: 'Test opportunity', + organizationId: null, + }); + + await con.getRepository(ClaimableItem).save({ + identifier: 'anon-claimed', + type: ClaimableItemTypes.Opportunity, + flags: { opportunityId: oppId }, + claimedById: '2', + claimedAt: new Date(), + }); + + const res = await client.mutate(MUTATION, { + variables: { identifier: 'anon-claimed' }, + }); + + expect(res.errors).toBeFalsy(); + expect(res.data.claimOpportunities).toEqual({ ids: [] }); + }); + + it('should return empty when opportunities have organization linked', async () => { + loggedUser = '1'; + + const oppId = 'c50e8400-e29b-41d4-a716-446655440001'; + + await con.getRepository(OpportunityJob).save({ + id: oppId, + type: OpportunityType.JOB, + state: OpportunityState.DRAFT, + title: 'Opportunity With Org', + tldr: 'Test opportunity', + organizationId: organizationsFixture[0].id, + }); + + await con.getRepository(ClaimableItem).save({ + identifier: 'anon-with-org', + type: ClaimableItemTypes.Opportunity, + flags: { opportunityId: oppId }, + }); + + const res = await client.mutate(MUTATION, { + variables: { identifier: 'anon-with-org' }, + }); + + expect(res.errors).toBeFalsy(); + expect(res.data.claimOpportunities).toEqual({ ids: [] }); + }); + + it('should return empty when claimable item flags lack opportunityId', async () => { + loggedUser = '1'; + + await con.getRepository(ClaimableItem).save({ + identifier: 'anon-no-opp-id', + type: ClaimableItemTypes.Opportunity, + flags: {}, + }); + + const res = await client.mutate(MUTATION, { + variables: { identifier: 'anon-no-opp-id' }, + }); + + expect(res.errors).toBeFalsy(); + expect(res.data.claimOpportunities).toEqual({ ids: [] }); + }); + + it('should only claim opportunities without organization', async () => { + loggedUser = '1'; + + const oppId1 = 'd50e8400-e29b-41d4-a716-446655440001'; + const oppId2 = 'd50e8400-e29b-41d4-a716-446655440002'; + + await con.getRepository(OpportunityJob).save([ + { + id: oppId1, + type: OpportunityType.JOB, + state: OpportunityState.DRAFT, + title: 'Claimable', + tldr: 'Test opportunity 1', + organizationId: null, + }, + { + id: oppId2, + type: OpportunityType.JOB, + state: OpportunityState.DRAFT, + title: 'Not Claimable - has org', + tldr: 'Test opportunity 2', + organizationId: organizationsFixture[0].id, + }, + ]); + + await con.getRepository(ClaimableItem).save([ + { + identifier: 'anon-partial', + type: ClaimableItemTypes.Opportunity, + flags: { opportunityId: oppId1 }, + }, + { + identifier: 'anon-partial', + type: ClaimableItemTypes.Opportunity, + flags: { opportunityId: oppId2 }, + }, + ]); + + const res = await client.mutate(MUTATION, { + variables: { identifier: 'anon-partial' }, + }); + + expect(res.errors).toBeFalsy(); + expect(res.data.claimOpportunities.ids).toEqual([oppId1]); + + // Both claimable items should be marked as claimed + const claimedItems = await con + .getRepository(ClaimableItem) + .findBy({ identifier: 'anon-partial' }); + expect(claimedItems.every((item) => item.claimedById === '1')).toBe(true); + }); + + it('should handle existing OpportunityUserRecruiter gracefully', async () => { + loggedUser = '1'; + + const oppId = 'e50e8400-e29b-41d4-a716-446655440001'; + + await con.getRepository(OpportunityJob).save({ + id: oppId, + type: OpportunityType.JOB, + state: OpportunityState.DRAFT, + title: 'Opportunity with existing recruiter', + tldr: 'Test opportunity', + organizationId: null, + }); + + // Pre-existing recruiter relationship + await con.getRepository(OpportunityUserRecruiter).save({ + opportunityId: oppId, + userId: '1', + }); + + await con.getRepository(ClaimableItem).save({ + identifier: 'anon-existing-recruiter', + type: ClaimableItemTypes.Opportunity, + flags: { opportunityId: oppId }, + }); + + const res = await client.mutate(MUTATION, { + variables: { identifier: 'anon-existing-recruiter' }, + }); + + expect(res.errors).toBeFalsy(); + expect(res.data.claimOpportunities.ids).toEqual([oppId]); + + // Should still only have one recruiter record (upsert) + const recruiters = await con + .getRepository(OpportunityUserRecruiter) + .findBy({ opportunityId: oppId, userId: '1' }); + expect(recruiters).toHaveLength(1); + }); +}); diff --git a/src/schema/opportunity.ts b/src/schema/opportunity.ts index cc4b0196e9..3d5d8e1696 100644 --- a/src/schema/opportunity.ts +++ b/src/schema/opportunity.ts @@ -119,6 +119,7 @@ import { triggerTypedEvent } from '../common/typedPubsub'; import { randomUUID } from 'crypto'; import { opportunityMatchBatchSize } from '../types'; import { ClaimableItem, ClaimableItemTypes } from '../entity/ClaimableItem'; +import { claimAnonOpportunities } from '../common/opportunity/user'; export interface GQLOpportunity extends Pick< Opportunity, @@ -205,6 +206,10 @@ export interface GQLOpportunityStats { introduced: number; } +export type GQLOpportunitiesClaim = { + ids: string[]; +}; + export const typeDefs = /* GraphQL */ ` ${toGQLEnum(OpportunityMatchStatus, 'OpportunityMatchStatus')} ${toGQLEnum(OrganizationLinkType, 'OrganizationLinkType')} @@ -620,6 +625,10 @@ export const typeDefs = /* GraphQL */ ` introduced: Int! } + type OpportunitiesClaim { + ids: [String!]! + } + extend type Query { """ Get the public information about a Opportunity listing @@ -1032,6 +1041,11 @@ export const typeDefs = /* GraphQL */ ` payload: AddOpportunitySeatsInput! ): EmptyResponse @auth + + """ + Claim opportunities associated with an anonymous identifier + """ + claimOpportunities(identifier: String!): OpportunitiesClaim @auth } `; @@ -3020,6 +3034,21 @@ export const resolvers: IResolvers = traceResolvers< throw error; } }, + claimOpportunities: async ( + _, + { identifier }: { identifier: string }, + ctx: AuthContext, + ): Promise => { + const opportunities = await claimAnonOpportunities({ + anonUserId: identifier, + userId: ctx.userId, + con: ctx.con.manager, + }); + + return { + ids: opportunities.map((item) => item.id), + }; + }, }, OpportunityMatch: { engagementProfile: async ( From 0b0f173847f3dd0756c173eb8ff708ec4a2bd248 Mon Sep 17 00:00:00 2001 From: capJavert Date: Thu, 22 Jan 2026 12:25:55 +0100 Subject: [PATCH 4/4] feat: feedback --- src/common/opportunity/user.ts | 14 +++++--- src/cron/cleanZombieOpportunities.ts | 34 ++++++++++++------- .../1768937354866-ClaimableItemIdentifier.ts | 10 +++--- 3 files changed, 37 insertions(+), 21 deletions(-) diff --git a/src/common/opportunity/user.ts b/src/common/opportunity/user.ts index b218f52214..2e4d43f455 100644 --- a/src/common/opportunity/user.ts +++ b/src/common/opportunity/user.ts @@ -28,15 +28,19 @@ export const claimAnonOpportunities = async ({ claimedById: IsNull(), }); + const opportunityIds = claimableItems + .filter((item) => item.flags.opportunityId) + .map((item) => item.flags.opportunityId); + + if (!opportunityIds.length) { + return []; + } + const opportunities = await entityManager .getRepository(OpportunityJob) .find({ where: { - id: In( - claimableItems - .filter((item) => item.flags.opportunityId) - .map((item) => item.flags.opportunityId), - ), + id: In(opportunityIds), organizationId: IsNull(), // only claim opportunities not linked to an organization yet }, }); diff --git a/src/cron/cleanZombieOpportunities.ts b/src/cron/cleanZombieOpportunities.ts index 1c7b7c9811..055e776a4c 100644 --- a/src/cron/cleanZombieOpportunities.ts +++ b/src/cron/cleanZombieOpportunities.ts @@ -11,7 +11,7 @@ export const cleanZombieOpportunities: Cron = { handler: async (con) => { const timeThreshold = subDays(new Date(), 2); - con.transaction(async (entityManager) => { + await con.transaction(async (entityManager) => { const query = entityManager .getRepository(Opportunity) .createQueryBuilder() @@ -29,19 +29,29 @@ export const cleanZombieOpportunities: Cron = { const { affected, raw } = await query.execute(); - const { affected: claimables } = await entityManager - .getRepository(ClaimableItem) - .createQueryBuilder() - .delete() - .where(`flags->>'opportunityId' IN (:...ids)`, { - ids: raw - .filter((item: { id: string }) => item) - .map((item: { id: string }) => item.id), - }) - .execute(); + const ids = raw + .filter((item: { id: string }) => item) + .map((item: { id: string }) => item.id); + + let claimables = 0; + + if (ids.length) { + const { affected } = await entityManager + .getRepository(ClaimableItem) + .createQueryBuilder() + .delete() + .where(`flags->>'opportunityId' IN (:...ids)`, { + ids, + }) + .execute(); + + if (affected) { + claimables += affected; + } + } logger.info( - { count: affected, claimables }, + { count: affected || 0, claimables }, 'zombies opportunities cleaned! 🧟', ); }); diff --git a/src/migration/1768937354866-ClaimableItemIdentifier.ts b/src/migration/1768937354866-ClaimableItemIdentifier.ts index d979527460..e60966b197 100644 --- a/src/migration/1768937354866-ClaimableItemIdentifier.ts +++ b/src/migration/1768937354866-ClaimableItemIdentifier.ts @@ -4,24 +4,26 @@ export class ClaimableItemIdentifier1768937354866 implements MigrationInterface name = 'ClaimableItemIdentifier1768937354866'; public async up(queryRunner: QueryRunner): Promise { - await queryRunner.query(`DROP INDEX "public"."IDX_claimable_item_email"`); + await queryRunner.query( + `DROP INDEX IF EXISTS "public"."IDX_claimable_item_email"`, + ); await queryRunner.query( `ALTER TABLE "claimable_item" RENAME COLUMN "email" TO "identifier"`, ); await queryRunner.query( - `CREATE INDEX "IDX_claimable_item_identifier" ON "claimable_item" ("identifier") `, + `CREATE INDEX IF NOT EXISTS "IDX_claimable_item_identifier" ON "claimable_item" ("identifier") `, ); } public async down(queryRunner: QueryRunner): Promise { await queryRunner.query( - `DROP INDEX "public"."IDX_claimable_item_identifier"`, + `DROP INDEX IF EXISTS "public"."IDX_claimable_item_identifier"`, ); await queryRunner.query( `ALTER TABLE "claimable_item" RENAME COLUMN "identifier" TO "email"`, ); await queryRunner.query( - `CREATE INDEX "IDX_claimable_item_email" ON "claimable_item" ("email") `, + `CREATE INDEX IF NOT EXISTS "IDX_claimable_item_email" ON "claimable_item" ("email") `, ); } }