diff --git a/.infra/crons.ts b/.infra/crons.ts index 5d61530867..ad7ee9f823 100644 --- a/.infra/crons.ts +++ b/.infra/crons.ts @@ -130,6 +130,14 @@ export const crons: Cron[] = [ name: 'post-analytics-history-day-clickhouse', schedule: '3-59/5 * * * *', }, + { + name: 'user-profile-analytics-clickhouse', + schedule: '7 */1 * * *', + }, + { + name: 'user-profile-analytics-history-clickhouse', + schedule: '15 */1 * * *', + }, { name: 'clean-zombie-opportunities', schedule: '30 6 * * *', diff --git a/__tests__/cron/userProfileAnalyticsClickhouse.ts b/__tests__/cron/userProfileAnalyticsClickhouse.ts new file mode 100644 index 0000000000..20ee488820 --- /dev/null +++ b/__tests__/cron/userProfileAnalyticsClickhouse.ts @@ -0,0 +1,220 @@ +import { crons } from '../../src/cron/index'; +import { userProfileAnalyticsClickhouseCron as cron } from '../../src/cron/userProfileAnalyticsClickhouse'; +import { + expectSuccessfulCron, + mockClickhouseClientOnce, + mockClickhouseQueryJSONOnce, + saveFixtures, +} from '../helpers'; +import { userProfileAnalyticsFixture } from '../fixture/userProfileAnalytics'; +import { usersFixture, plusUsersFixture } from '../fixture/user'; +import createOrGetConnection from '../../src/db'; +import type { DataSource } from 'typeorm'; +import { UserProfileAnalytics } from '../../src/entity/user/UserProfileAnalytics'; +import { User } from '../../src/entity/user/User'; +import { deleteRedisKey, getRedisHash } from '../../src/redis'; +import { generateStorageKey, StorageTopic } from '../../src/config'; +import { format, startOfToday } from 'date-fns'; + +let con: DataSource; + +beforeAll(async () => { + con = await createOrGetConnection(); +}); + +const cronConfigRedisKey = generateStorageKey( + StorageTopic.Cron, + cron.name, + 'config', +); + +beforeEach(async () => { + jest.clearAllMocks(); + await deleteRedisKey(cronConfigRedisKey); + await saveFixtures(con, User, [...usersFixture, ...plusUsersFixture]); +}); + +const userIds = ['1', '2', '3', '4', '5']; + +describe('userProfileAnalyticsClickhouse cron', () => { + it('should be registered', () => { + const registeredWorker = crons.find((item) => item.name === cron.name); + + expect(registeredWorker).toBeDefined(); + }); + + it('should sync user profile analytics data', async () => { + const clickhouseClientMock = mockClickhouseClientOnce(); + + const queryJSONSpy = mockClickhouseQueryJSONOnce( + clickhouseClientMock, + userProfileAnalyticsFixture.map((item, index) => ({ + ...item, + updatedAt: new Date(Date.now() + index * 1000).toISOString(), + id: userIds[index], + })), + ); + + const now = new Date(); + + await expectSuccessfulCron(cron); + + expect(queryJSONSpy).toHaveBeenCalledTimes(1); + expect(queryJSONSpy).toHaveBeenCalledWith({ + query: expect.stringContaining('SELECT'), + format: 'JSONEachRow', + query_params: { + lastRunAt: format(startOfToday(), 'yyyy-MM-dd HH:mm:ss'), + }, + }); + + const userProfileAnalytics = await con + .getRepository(UserProfileAnalytics) + .find({ + order: { + updatedAt: 'ASC', + }, + }); + + expect(userProfileAnalytics.length).toBe( + userProfileAnalyticsFixture.length, + ); + + userProfileAnalytics.forEach((item, index) => { + expect(item).toEqual({ + id: userIds[index], + updatedAt: expect.any(Date), + createdAt: expect.any(Date), + uniqueVisitors: userProfileAnalyticsFixture[index].uniqueVisitors, + } as UserProfileAnalytics); + }); + + const cronConfig = await getRedisHash(cronConfigRedisKey); + + expect(cronConfig).toBeDefined(); + expect(cronConfig.lastRunAt).toBeDefined(); + expect(new Date(cronConfig.lastRunAt).getTime()).toBeGreaterThan( + now.getTime(), + ); + }); + + it('should use lastRunAt from previous run', async () => { + let clickhouseClientMock = mockClickhouseClientOnce(); + + mockClickhouseQueryJSONOnce( + clickhouseClientMock, + userProfileAnalyticsFixture.map((item, index) => ({ + ...item, + updatedAt: new Date(Date.now() + index * 1000).toISOString(), + id: userIds[index], + })), + ); + + await expectSuccessfulCron(cron); + + const userProfileAnalytics = await con + .getRepository(UserProfileAnalytics) + .find({ + order: { + updatedAt: 'ASC', + }, + }); + + expect(userProfileAnalytics.length).toBe( + userProfileAnalyticsFixture.length, + ); + + userProfileAnalytics.forEach((item, index) => { + expect(item).toEqual({ + id: userIds[index], + updatedAt: expect.any(Date), + createdAt: expect.any(Date), + uniqueVisitors: userProfileAnalyticsFixture[index].uniqueVisitors, + } as UserProfileAnalytics); + }); + + const lastCronConfig = await getRedisHash(cronConfigRedisKey); + + clickhouseClientMock = mockClickhouseClientOnce(); + + const queryJSONSpy = mockClickhouseQueryJSONOnce( + clickhouseClientMock, + userProfileAnalyticsFixture.map((item, index) => ({ + ...item, + updatedAt: new Date(Date.now() + index * 1000).toISOString(), + id: userIds[index], + })), + ); + + await expectSuccessfulCron(cron); + + expect(queryJSONSpy).toHaveBeenCalledTimes(1); + expect(queryJSONSpy).toHaveBeenCalledWith({ + query: expect.stringContaining('SELECT'), + format: 'JSONEachRow', + query_params: { + lastRunAt: format( + new Date(lastCronConfig.lastRunAt), + 'yyyy-MM-dd HH:mm:ss', + ), + }, + }); + + const cronConfig = await getRedisHash(cronConfigRedisKey); + + expect(cronConfig).toBeDefined(); + expect(cronConfig.lastRunAt).toBeDefined(); + expect(new Date(cronConfig.lastRunAt).getTime()).toBeGreaterThan( + new Date(lastCronConfig.lastRunAt).getTime(), + ); + }); + + it('should upsert user profile analytics data on repeated runs', async () => { + let clickhouseClientMock = mockClickhouseClientOnce(); + + mockClickhouseQueryJSONOnce( + clickhouseClientMock, + userProfileAnalyticsFixture.map((item, index) => ({ + ...item, + updatedAt: new Date(Date.now() + index * 1000).toISOString(), + id: userIds[index], + })), + ); + + await expectSuccessfulCron(cron); + + clickhouseClientMock = mockClickhouseClientOnce(); + + mockClickhouseQueryJSONOnce( + clickhouseClientMock, + userProfileAnalyticsFixture.map((item, index) => ({ + ...item, + updatedAt: new Date(Date.now() + index * 1000).toISOString(), + id: userIds[index], + uniqueVisitors: item.uniqueVisitors + 100, + })), + ); + + await expectSuccessfulCron(cron); + + const userProfileAnalytics = await con + .getRepository(UserProfileAnalytics) + .find({ + select: ['id', 'uniqueVisitors'], + order: { + updatedAt: 'ASC', + }, + }); + + expect(userProfileAnalytics.length).toBe( + userProfileAnalyticsFixture.length, + ); + + userProfileAnalytics.forEach((item, index) => { + expect(item).toEqual({ + id: userIds[index], + uniqueVisitors: userProfileAnalyticsFixture[index].uniqueVisitors + 100, + } as UserProfileAnalytics); + }); + }); +}); diff --git a/__tests__/cron/userProfileAnalyticsHistoryClickhouse.ts b/__tests__/cron/userProfileAnalyticsHistoryClickhouse.ts new file mode 100644 index 0000000000..f18f610120 --- /dev/null +++ b/__tests__/cron/userProfileAnalyticsHistoryClickhouse.ts @@ -0,0 +1,232 @@ +import { crons } from '../../src/cron/index'; +import { userProfileAnalyticsHistoryClickhouseCron as cron } from '../../src/cron/userProfileAnalyticsHistoryClickhouse'; +import { + expectSuccessfulCron, + mockClickhouseClientOnce, + mockClickhouseQueryJSONOnce, + saveFixtures, +} from '../helpers'; +import { userProfileAnalyticsFixture } from '../fixture/userProfileAnalytics'; +import { usersFixture, plusUsersFixture } from '../fixture/user'; +import createOrGetConnection from '../../src/db'; +import type { DataSource } from 'typeorm'; +import { UserProfileAnalyticsHistory } from '../../src/entity/user/UserProfileAnalyticsHistory'; +import { User } from '../../src/entity/user/User'; +import { deleteRedisKey, getRedisHash } from '../../src/redis'; +import { generateStorageKey, StorageTopic } from '../../src/config'; +import { format, startOfToday } from 'date-fns'; + +let con: DataSource; + +beforeAll(async () => { + con = await createOrGetConnection(); +}); + +const cronConfigRedisKey = generateStorageKey( + StorageTopic.Cron, + cron.name, + 'config', +); + +beforeEach(async () => { + jest.clearAllMocks(); + await deleteRedisKey(cronConfigRedisKey); + await saveFixtures(con, User, [...usersFixture, ...plusUsersFixture]); +}); + +const userIds = ['1', '2', '3', '4', '5']; + +describe('userProfileAnalyticsHistoryClickhouse cron', () => { + it('should be registered', () => { + const registeredWorker = crons.find((item) => item.name === cron.name); + + expect(registeredWorker).toBeDefined(); + }); + + it('should sync user profile analytics history data', async () => { + const clickhouseClientMock = mockClickhouseClientOnce(); + const date = format(new Date(), 'yyyy-MM-dd'); + + const queryJSONSpy = mockClickhouseQueryJSONOnce( + clickhouseClientMock, + userProfileAnalyticsFixture.map((item, index) => ({ + updatedAt: new Date(Date.now() + index * 1000).toISOString(), + id: userIds[index], + date, + uniqueVisitors: item.uniqueVisitors, + })), + ); + + const now = new Date(); + + await expectSuccessfulCron(cron); + + expect(queryJSONSpy).toHaveBeenCalledTimes(1); + expect(queryJSONSpy).toHaveBeenCalledWith({ + query: expect.stringContaining('SELECT'), + format: 'JSONEachRow', + query_params: { + date, + lastRunAt: format(startOfToday(), 'yyyy-MM-dd HH:mm:ss'), + }, + }); + + const userProfileAnalyticsHistory = await con + .getRepository(UserProfileAnalyticsHistory) + .find({ + order: { + updatedAt: 'ASC', + }, + }); + + expect(userProfileAnalyticsHistory.length).toBe( + userProfileAnalyticsFixture.length, + ); + + userProfileAnalyticsHistory.forEach((item, index) => { + expect(item).toEqual({ + id: userIds[index], + updatedAt: expect.any(Date), + createdAt: expect.any(Date), + uniqueVisitors: userProfileAnalyticsFixture[index].uniqueVisitors, + date, + } as UserProfileAnalyticsHistory); + }); + + const cronConfig = await getRedisHash(cronConfigRedisKey); + + expect(cronConfig).toBeDefined(); + expect(cronConfig.lastRunAt).toBeDefined(); + expect(new Date(cronConfig.lastRunAt).getTime()).toBeGreaterThan( + now.getTime(), + ); + }); + + it('should use lastRunAt from previous run', async () => { + let clickhouseClientMock = mockClickhouseClientOnce(); + const date = format(new Date(), 'yyyy-MM-dd'); + + mockClickhouseQueryJSONOnce( + clickhouseClientMock, + userProfileAnalyticsFixture.map((item, index) => ({ + updatedAt: new Date(Date.now() + index * 1000).toISOString(), + id: userIds[index], + date, + uniqueVisitors: item.uniqueVisitors, + })), + ); + + await expectSuccessfulCron(cron); + + const userProfileAnalyticsHistory = await con + .getRepository(UserProfileAnalyticsHistory) + .find({ + order: { + updatedAt: 'ASC', + }, + }); + + expect(userProfileAnalyticsHistory.length).toBe( + userProfileAnalyticsFixture.length, + ); + + userProfileAnalyticsHistory.forEach((item, index) => { + expect(item).toEqual({ + id: userIds[index], + updatedAt: expect.any(Date), + createdAt: expect.any(Date), + uniqueVisitors: userProfileAnalyticsFixture[index].uniqueVisitors, + date, + } as UserProfileAnalyticsHistory); + }); + + const lastCronConfig = await getRedisHash(cronConfigRedisKey); + + clickhouseClientMock = mockClickhouseClientOnce(); + + const queryJSONSpy = mockClickhouseQueryJSONOnce( + clickhouseClientMock, + userProfileAnalyticsFixture.map((item, index) => ({ + updatedAt: new Date(Date.now() + index * 1000).toISOString(), + id: userIds[index], + date, + uniqueVisitors: item.uniqueVisitors, + })), + ); + + await expectSuccessfulCron(cron); + + expect(queryJSONSpy).toHaveBeenCalledTimes(1); + expect(queryJSONSpy).toHaveBeenCalledWith({ + query: expect.stringContaining('SELECT'), + format: 'JSONEachRow', + query_params: { + date, + lastRunAt: format( + new Date(lastCronConfig.lastRunAt), + 'yyyy-MM-dd HH:mm:ss', + ), + }, + }); + + const cronConfig = await getRedisHash(cronConfigRedisKey); + + expect(cronConfig).toBeDefined(); + expect(cronConfig.lastRunAt).toBeDefined(); + expect(new Date(cronConfig.lastRunAt).getTime()).toBeGreaterThan( + new Date(lastCronConfig.lastRunAt).getTime(), + ); + }); + + it('should upsert user profile analytics history data on repeated runs', async () => { + let clickhouseClientMock = mockClickhouseClientOnce(); + const date = format(new Date(), 'yyyy-MM-dd'); + + mockClickhouseQueryJSONOnce( + clickhouseClientMock, + userProfileAnalyticsFixture.map((item, index) => ({ + updatedAt: new Date(Date.now() + index * 1000).toISOString(), + id: userIds[index], + date, + uniqueVisitors: item.uniqueVisitors, + })), + ); + + await expectSuccessfulCron(cron); + + clickhouseClientMock = mockClickhouseClientOnce(); + + mockClickhouseQueryJSONOnce( + clickhouseClientMock, + userProfileAnalyticsFixture.map((item, index) => ({ + updatedAt: new Date(Date.now() + index * 1000).toISOString(), + id: userIds[index], + date, + uniqueVisitors: item.uniqueVisitors + 50, + })), + ); + + await expectSuccessfulCron(cron); + + const userProfileAnalyticsHistory = await con + .getRepository(UserProfileAnalyticsHistory) + .find({ + select: ['id', 'uniqueVisitors', 'date'], + order: { + updatedAt: 'ASC', + }, + }); + + expect(userProfileAnalyticsHistory.length).toBe( + userProfileAnalyticsFixture.length, + ); + + userProfileAnalyticsHistory.forEach((item, index) => { + expect(item).toEqual({ + id: userIds[index], + date, + uniqueVisitors: userProfileAnalyticsFixture[index].uniqueVisitors + 50, + } as UserProfileAnalyticsHistory); + }); + }); +}); diff --git a/__tests__/fixture/userProfileAnalytics.ts b/__tests__/fixture/userProfileAnalytics.ts new file mode 100644 index 0000000000..f26b9143d9 --- /dev/null +++ b/__tests__/fixture/userProfileAnalytics.ts @@ -0,0 +1,9 @@ +// fixture is without id and dates, you can inject them yourself in the test + +export const userProfileAnalyticsFixture = [ + { uniqueVisitors: 150 }, + { uniqueVisitors: 320 }, + { uniqueVisitors: 890 }, + { uniqueVisitors: 45 }, + { uniqueVisitors: 1250 }, +]; diff --git a/__tests__/users.ts b/__tests__/users.ts index e20fccdd81..ec3e05ccf2 100644 --- a/__tests__/users.ts +++ b/__tests__/users.ts @@ -57,6 +57,8 @@ import { UserTopReader, View, } from '../src/entity'; +import { UserProfileAnalytics } from '../src/entity/user/UserProfileAnalytics'; +import { UserProfileAnalyticsHistory } from '../src/entity/user/UserProfileAnalyticsHistory'; import { sourcesFixture } from './fixture/source'; import { CioTransactionalMessageTemplateId, @@ -178,6 +180,7 @@ let state: GraphQLTestingState; let client: GraphQLTestClient; let loggedUser: string = null; let isPlus: boolean; +let isTeamMember = false; const userTimezone = 'Pacific/Midway'; jest.mock('../src/common/paddle/index.ts', () => ({ @@ -216,7 +219,7 @@ beforeAll(async () => { loggedUser, undefined, undefined, - undefined, + isTeamMember, isPlus, 'US', ), @@ -230,6 +233,7 @@ const now = new Date(); beforeEach(async () => { loggedUser = null; isPlus = false; + isTeamMember = false; nock.cleanAll(); jest.clearAllMocks(); @@ -7656,3 +7660,186 @@ describe('mutation clearResume', () => { ).toEqual(0); }); }); + +describe('query userProfileAnalytics', () => { + const QUERY = ` + query UserProfileAnalytics($userId: ID!) { + userProfileAnalytics(userId: $userId) { + id + uniqueVisitors + updatedAt + } + } + `; + + it('should not allow unauthenticated users', () => + testQueryErrorCode( + client, + { query: QUERY, variables: { userId: '1' } }, + 'UNAUTHENTICATED', + )); + + it('should return null when viewing another user analytics', async () => { + loggedUser = '2'; + + await con.getRepository(UserProfileAnalytics).save({ + id: '1', + uniqueVisitors: 150, + }); + + const res = await client.query(QUERY, { variables: { userId: '1' } }); + expect(res.errors).toBeFalsy(); + expect(res.data.userProfileAnalytics).toBeNull(); + }); + + it('should return analytics for own profile', async () => { + loggedUser = '1'; + + const analytics = await con.getRepository(UserProfileAnalytics).save({ + id: '1', + uniqueVisitors: 150, + }); + + const res = await client.query(QUERY, { variables: { userId: '1' } }); + expect(res.errors).toBeFalsy(); + expect(res.data.userProfileAnalytics).toMatchObject({ + id: '1', + uniqueVisitors: 150, + updatedAt: analytics.updatedAt.toISOString(), + }); + }); + + it('should allow team member to view any user analytics', async () => { + loggedUser = '2'; + isTeamMember = true; + + const analytics = await con.getRepository(UserProfileAnalytics).save({ + id: '1', + uniqueVisitors: 150, + }); + + const res = await client.query(QUERY, { variables: { userId: '1' } }); + expect(res.errors).toBeFalsy(); + expect(res.data.userProfileAnalytics).toMatchObject({ + id: '1', + uniqueVisitors: 150, + updatedAt: analytics.updatedAt.toISOString(), + }); + }); + + it('should return not found error when no analytics record exists', () => { + loggedUser = '1'; + + return testQueryErrorCode( + client, + { query: QUERY, variables: { userId: '1' } }, + 'NOT_FOUND', + ); + }); +}); + +describe('query userProfileAnalyticsHistory', () => { + const QUERY = ` + query UserProfileAnalyticsHistory($userId: ID!, $first: Int, $after: String) { + userProfileAnalyticsHistory(userId: $userId, first: $first, after: $after) { + pageInfo { + hasNextPage + endCursor + } + edges { + node { + id + date + uniqueVisitors + updatedAt + } + } + } + } + `; + + it('should not allow unauthenticated users', () => + testQueryErrorCode( + client, + { query: QUERY, variables: { userId: '1' } }, + 'UNAUTHENTICATED', + )); + + it('should return null when viewing another user history', async () => { + loggedUser = '2'; + + await con + .getRepository(UserProfileAnalyticsHistory) + .save([{ id: '1', date: '2026-01-15', uniqueVisitors: 10 }]); + + const res = await client.query(QUERY, { variables: { userId: '1' } }); + expect(res.errors).toBeFalsy(); + expect(res.data.userProfileAnalyticsHistory).toBeNull(); + }); + + it('should return history for own profile', async () => { + loggedUser = '1'; + + await con.getRepository(UserProfileAnalyticsHistory).save([ + { id: '1', date: '2026-01-15', uniqueVisitors: 10 }, + { id: '1', date: '2026-01-14', uniqueVisitors: 25 }, + { id: '1', date: '2026-01-13', uniqueVisitors: 15 }, + ]); + + const res = await client.query(QUERY, { variables: { userId: '1' } }); + expect(res.errors).toBeFalsy(); + expect(res.data.userProfileAnalyticsHistory.edges).toHaveLength(3); + expect(res.data.userProfileAnalyticsHistory.edges[0].node).toMatchObject({ + id: '1', + date: '2026-01-15T00:00:00.000Z', + uniqueVisitors: 10, + }); + }); + + it('should allow team member to view any user history', async () => { + loggedUser = '2'; + isTeamMember = true; + + await con + .getRepository(UserProfileAnalyticsHistory) + .save([{ id: '1', date: '2026-01-15', uniqueVisitors: 10 }]); + + const res = await client.query(QUERY, { variables: { userId: '1' } }); + expect(res.errors).toBeFalsy(); + expect(res.data.userProfileAnalyticsHistory.edges).toHaveLength(1); + expect(res.data.userProfileAnalyticsHistory.edges[0].node).toMatchObject({ + id: '1', + date: '2026-01-15T00:00:00.000Z', + uniqueVisitors: 10, + }); + }); + + it('should paginate with first parameter', async () => { + loggedUser = '1'; + + await con.getRepository(UserProfileAnalyticsHistory).save([ + { id: '1', date: '2026-01-15', uniqueVisitors: 10 }, + { id: '1', date: '2026-01-14', uniqueVisitors: 25 }, + { id: '1', date: '2026-01-13', uniqueVisitors: 15 }, + { id: '1', date: '2026-01-12', uniqueVisitors: 20 }, + { id: '1', date: '2026-01-11', uniqueVisitors: 30 }, + ]); + + const res = await client.query(QUERY, { + variables: { userId: '1', first: 2 }, + }); + expect(res.errors).toBeFalsy(); + expect(res.data.userProfileAnalyticsHistory.edges).toHaveLength(2); + expect(res.data.userProfileAnalyticsHistory.pageInfo.hasNextPage).toBe( + true, + ); + }); + + it('should return empty edges when no history exists', async () => { + loggedUser = '1'; + + const res = await client.query(QUERY, { variables: { userId: '1' } }); + expect(res.errors).toBeFalsy(); + expect(res.data.userProfileAnalyticsHistory.edges).toHaveLength(0); + }); +}); diff --git a/clickhouse/migrations/20260115100000_user_profile_analytics.down.sql b/clickhouse/migrations/20260115100000_user_profile_analytics.down.sql new file mode 100644 index 0000000000..d37b038d63 --- /dev/null +++ b/clickhouse/migrations/20260115100000_user_profile_analytics.down.sql @@ -0,0 +1,5 @@ +DROP TABLE IF EXISTS api.user_profile_analytics_history_mv; +DROP TABLE IF EXISTS api.user_profile_analytics_mv; + +DROP TABLE IF EXISTS api.user_profile_analytics_history; +DROP TABLE IF EXISTS api.user_profile_analytics; diff --git a/clickhouse/migrations/20260115100000_user_profile_analytics.up.sql b/clickhouse/migrations/20260115100000_user_profile_analytics.up.sql new file mode 100644 index 0000000000..21a52b9cb7 --- /dev/null +++ b/clickhouse/migrations/20260115100000_user_profile_analytics.up.sql @@ -0,0 +1,51 @@ +CREATE TABLE IF NOT EXISTS api.user_profile_analytics +( + profile_id String, + created_at SimpleAggregateFunction(max, DateTime64(3)), + unique_visitors AggregateFunction(uniq, String) +) +ENGINE = AggregatingMergeTree +PARTITION BY toYYYYMM(created_at) +ORDER BY profile_id; + +CREATE TABLE IF NOT EXISTS api.user_profile_analytics_history +( + profile_id String, + date Date, + created_at SimpleAggregateFunction(max, DateTime64(3)), + unique_visitors AggregateFunction(uniq, String) +) +ENGINE = AggregatingMergeTree +PARTITION BY toYYYYMM(created_at) +ORDER BY (date, profile_id); + +-- MV for main aggregation (all-time totals) +CREATE MATERIALIZED VIEW IF NOT EXISTS api.user_profile_analytics_mv +TO api.user_profile_analytics +AS +SELECT + target_id AS profile_id, + uniqStateIf(user_id, event_name = 'profile view') AS unique_visitors, + max(server_timestamp) AS created_at +FROM events.raw_events +WHERE event_name IN ('profile view') +AND target_id IS NOT NULL +AND server_timestamp > '2026-01-23 11:04:00' +GROUP BY target_id +SETTINGS materialized_views_ignore_errors = 1; + +-- MV for daily history +CREATE MATERIALIZED VIEW IF NOT EXISTS api.user_profile_analytics_history_mv +TO api.user_profile_analytics_history +AS +SELECT + target_id AS profile_id, + toDate(server_timestamp) AS date, + uniqStateIf(user_id, event_name = 'profile view') AS unique_visitors, + max(server_timestamp) AS created_at +FROM events.raw_events +WHERE event_name IN ('profile view') +AND target_id IS NOT NULL +AND server_timestamp > '2026-01-23 11:04:00' +GROUP BY date, target_id +SETTINGS materialized_views_ignore_errors = 1; diff --git a/src/common/schema/userProfileAnalytics.ts b/src/common/schema/userProfileAnalytics.ts new file mode 100644 index 0000000000..1325996d8a --- /dev/null +++ b/src/common/schema/userProfileAnalytics.ts @@ -0,0 +1,13 @@ +import { format } from 'date-fns'; +import { z } from 'zod'; + +export const userProfileAnalyticsClickhouseSchema = z.strictObject({ + id: z.string(), + updatedAt: z.coerce.date(), + uniqueVisitors: z.coerce.number().nonnegative(), +}); + +export const userProfileAnalyticsHistoryClickhouseSchema = + userProfileAnalyticsClickhouseSchema.extend({ + date: z.coerce.date().transform((date) => format(date, 'yyyy-MM-dd')), + }); diff --git a/src/common/user.ts b/src/common/user.ts index 79cef477cd..69b8075d0d 100644 --- a/src/common/user.ts +++ b/src/common/user.ts @@ -269,3 +269,23 @@ export const checkCoresAccess = async ({ return checkUserCoresAccess({ user, requiredRole }); }; + +export const hasUserProfileAnalyticsPermissions = ({ + ctx, + userId, +}: { + ctx: AuthContext; + userId: string; +}): boolean => { + const { userId: requesterId, isTeamMember } = ctx; + + if (isTeamMember) { + return true; + } + + if (!requesterId) { + return false; + } + + return requesterId === userId; +}; diff --git a/src/cron/index.ts b/src/cron/index.ts index 65aa6d764d..60e0984c1b 100644 --- a/src/cron/index.ts +++ b/src/cron/index.ts @@ -21,6 +21,8 @@ import cleanGiftedPlus from './cleanGiftedPlus'; import { cleanStaleUserTransactions } from './cleanStaleUserTransactions'; import { postAnalyticsClickhouseCron } from './postAnalyticsClickhouse'; import { postAnalyticsHistoryDayClickhouseCron } from './postAnalyticsHistoryDayClickhouse'; +import { userProfileAnalyticsClickhouseCron } from './userProfileAnalyticsClickhouse'; +import { userProfileAnalyticsHistoryClickhouseCron } from './userProfileAnalyticsHistoryClickhouse'; import { cleanZombieOpportunities } from './cleanZombieOpportunities'; import { userProfileUpdatedSync } from './userProfileUpdatedSync'; import expireSuperAgentTrial from './expireSuperAgentTrial'; @@ -48,6 +50,8 @@ export const crons: Cron[] = [ cleanStaleUserTransactions, postAnalyticsClickhouseCron, postAnalyticsHistoryDayClickhouseCron, + userProfileAnalyticsClickhouseCron, + userProfileAnalyticsHistoryClickhouseCron, cleanZombieOpportunities, userProfileUpdatedSync, expireSuperAgentTrial, diff --git a/src/cron/userProfileAnalyticsClickhouse.ts b/src/cron/userProfileAnalyticsClickhouse.ts new file mode 100644 index 0000000000..795c0b4d63 --- /dev/null +++ b/src/cron/userProfileAnalyticsClickhouse.ts @@ -0,0 +1,118 @@ +import { format, startOfToday } from 'date-fns'; +import { z } from 'zod'; +import { Cron } from './cron'; +import { getClickHouseClient } from '../common/clickhouse'; +import { userProfileAnalyticsClickhouseSchema } from '../common/schema/userProfileAnalytics'; +import { UserProfileAnalytics } from '../entity/user/UserProfileAnalytics'; +import { getRedisHash, setRedisHash } from '../redis'; +import { generateStorageKey, StorageTopic } from '../config'; + +type UserProfileAnalyticsClickhouseCronConfig = Partial<{ + lastRunAt: string; +}>; + +export const userProfileAnalyticsClickhouseCron: Cron = { + name: 'user-profile-analytics-clickhouse', + handler: async (con, logger) => { + const redisStorageKey = generateStorageKey( + StorageTopic.Cron, + userProfileAnalyticsClickhouseCron.name, + 'config', + ); + + const cronConfig: Partial = + await getRedisHash(redisStorageKey); + + const lastRunAt = cronConfig.lastRunAt + ? new Date(cronConfig.lastRunAt) + : startOfToday(); + + if (Number.isNaN(lastRunAt.getTime())) { + throw new Error('Invalid last run time'); + } + + const clickhouseClient = getClickHouseClient(); + + const queryParams = { + lastRunAt: format(lastRunAt, 'yyyy-MM-dd HH:mm:ss'), + }; + + const response = await clickhouseClient.query({ + query: /* sql */ ` + SELECT + profile_id AS id, + max(created_at) AS "updatedAt", + uniqMerge(unique_visitors) AS "uniqueVisitors" + FROM api.user_profile_analytics + FINAL + WHERE user_id IN ( + SELECT DISTINCT user_id + FROM api.user_profile_analytics + WHERE created_at > {lastRunAt: DateTime} + ) + GROUP BY id + HAVING "updatedAt" > {lastRunAt: DateTime} + ORDER BY "updatedAt" DESC; + `, + format: 'JSONEachRow', + query_params: queryParams, + }); + + const result = z + .array(userProfileAnalyticsClickhouseSchema) + .safeParse(await response.json()); + + if (!result.success) { + logger.error( + { schemaError: result.error.issues[0] }, + 'Invalid user profile analytics data', + ); + throw new Error('Invalid user profile analytics data'); + } + + const { data } = result; + + const chunks: UserProfileAnalytics[][] = []; + const chunkSize = 500; + + data.forEach((item) => { + if ( + chunks.length === 0 || + chunks[chunks.length - 1].length === chunkSize + ) { + chunks.push([]); + } + chunks[chunks.length - 1].push(item as UserProfileAnalytics); + }); + + const currentRunAt = new Date(); + + await con.transaction(async (entityManager) => { + for (const chunk of chunks) { + if (chunk.length === 0) { + continue; + } + + await entityManager + .createQueryBuilder() + .insert() + .into(UserProfileAnalytics) + .values(chunk) + .orUpdate(Object.keys(chunk[0]), ['id']) + .execute(); + } + }); + + await setRedisHash( + redisStorageKey, + { + lastRunAt: currentRunAt.toISOString(), + }, + ); + + logger.info( + { rows: data.length, queryParams }, + 'synced user profile analytics data', + ); + }, +}; diff --git a/src/cron/userProfileAnalyticsHistoryClickhouse.ts b/src/cron/userProfileAnalyticsHistoryClickhouse.ts new file mode 100644 index 0000000000..9e2971847e --- /dev/null +++ b/src/cron/userProfileAnalyticsHistoryClickhouse.ts @@ -0,0 +1,115 @@ +import { format, startOfToday } from 'date-fns'; +import { z } from 'zod'; +import { Cron } from './cron'; +import { getClickHouseClient } from '../common/clickhouse'; +import { userProfileAnalyticsHistoryClickhouseSchema } from '../common/schema/userProfileAnalytics'; +import { UserProfileAnalyticsHistory } from '../entity/user/UserProfileAnalyticsHistory'; +import { getRedisHash, setRedisHash } from '../redis'; +import { generateStorageKey, StorageTopic } from '../config'; + +type UserProfileAnalyticsHistoryClickhouseCronConfig = Partial<{ + lastRunAt: string; +}>; + +export const userProfileAnalyticsHistoryClickhouseCron: Cron = { + name: 'user-profile-analytics-history-clickhouse', + handler: async (con, logger) => { + const redisStorageKey = generateStorageKey( + StorageTopic.Cron, + userProfileAnalyticsHistoryClickhouseCron.name, + 'config', + ); + + const cronConfig: Partial = + await getRedisHash(redisStorageKey); + + const lastRunAt = cronConfig.lastRunAt + ? new Date(cronConfig.lastRunAt) + : startOfToday(); + + if (Number.isNaN(lastRunAt.getTime())) { + throw new Error('Invalid last run time'); + } + + const clickhouseClient = getClickHouseClient(); + + const queryParams = { + lastRunAt: format(lastRunAt, 'yyyy-MM-dd HH:mm:ss'), + date: format(new Date(), 'yyyy-MM-dd'), + }; + + const response = await clickhouseClient.query({ + query: /* sql */ ` + SELECT + profile_id AS id, + date, + max(created_at) AS "updatedAt", + uniqMerge(unique_visitors) AS "uniqueVisitors" + FROM api.user_profile_analytics_history + FINAL + WHERE date = {date: Date} + GROUP BY date, id + HAVING "updatedAt" > {lastRunAt: DateTime} + `, + format: 'JSONEachRow', + query_params: queryParams, + }); + + const result = z + .array(userProfileAnalyticsHistoryClickhouseSchema) + .safeParse(await response.json()); + + if (!result.success) { + logger.error( + { schemaError: result.error.issues[0] }, + 'Invalid user profile analytics history data', + ); + throw new Error('Invalid user profile analytics history data'); + } + + const { data } = result; + + const chunks: UserProfileAnalyticsHistory[][] = []; + const chunkSize = 500; + + data.forEach((item) => { + if ( + chunks.length === 0 || + chunks[chunks.length - 1].length === chunkSize + ) { + chunks.push([]); + } + chunks[chunks.length - 1].push(item as UserProfileAnalyticsHistory); + }); + + const currentRunAt = new Date(); + + await con.transaction(async (entityManager) => { + for (const chunk of chunks) { + if (chunk.length === 0) { + continue; + } + + await entityManager + .createQueryBuilder() + .insert() + .into(UserProfileAnalyticsHistory) + .values(chunk) + .orUpdate(Object.keys(chunk[0]), ['id', 'date']) + .execute(); + } + }); + + await setRedisHash( + redisStorageKey, + { + lastRunAt: currentRunAt.toISOString(), + }, + ); + + logger.info( + { rows: data.length, queryParams }, + 'synced user profile analytics history data', + ); + }, +}; diff --git a/src/entity/user/UserProfileAnalytics.ts b/src/entity/user/UserProfileAnalytics.ts new file mode 100644 index 0000000000..02cbe6da12 --- /dev/null +++ b/src/entity/user/UserProfileAnalytics.ts @@ -0,0 +1,32 @@ +import { + Column, + CreateDateColumn, + Entity, + JoinColumn, + OneToOne, + PrimaryColumn, + UpdateDateColumn, +} from 'typeorm'; +import type { User } from './User'; + +@Entity() +export class UserProfileAnalytics { + @PrimaryColumn({ type: 'text' }) + id: string; + + @CreateDateColumn() + createdAt: Date; + + @UpdateDateColumn() + updatedAt: Date; + + @Column({ default: 0 }) + uniqueVisitors: number; + + @OneToOne('User', { + lazy: true, + onDelete: 'CASCADE', + }) + @JoinColumn({ name: 'id' }) + user: Promise; +} diff --git a/src/entity/user/UserProfileAnalyticsHistory.ts b/src/entity/user/UserProfileAnalyticsHistory.ts new file mode 100644 index 0000000000..ab25d0194e --- /dev/null +++ b/src/entity/user/UserProfileAnalyticsHistory.ts @@ -0,0 +1,35 @@ +import { + Column, + CreateDateColumn, + Entity, + JoinColumn, + ManyToOne, + PrimaryColumn, + UpdateDateColumn, +} from 'typeorm'; +import type { User } from './User'; + +@Entity() +export class UserProfileAnalyticsHistory { + @PrimaryColumn({ type: 'text' }) + id: string; + + @PrimaryColumn({ type: 'text' }) + date: string; + + @CreateDateColumn() + createdAt: Date; + + @UpdateDateColumn() + updatedAt: Date; + + @Column({ default: 0 }) + uniqueVisitors: number; + + @ManyToOne('User', { + lazy: true, + onDelete: 'CASCADE', + }) + @JoinColumn({ name: 'id' }) + user: Promise; +} diff --git a/src/entity/user/index.ts b/src/entity/user/index.ts index 2949ebe2d6..b21845f27b 100644 --- a/src/entity/user/index.ts +++ b/src/entity/user/index.ts @@ -7,6 +7,8 @@ export * from './UserPersonalizedDigest'; export * from './UserMarketingCta'; export * from './UserStats'; export * from './UserTopReader'; +export * from './UserProfileAnalytics'; +export * from './UserProfileAnalyticsHistory'; export * from './UserStack'; export * from './HotTake'; export * from './UserHotTake'; diff --git a/src/graphorm/index.ts b/src/graphorm/index.ts index 6aff122c4e..d0d1fb51b2 100644 --- a/src/graphorm/index.ts +++ b/src/graphorm/index.ts @@ -1659,6 +1659,25 @@ const obj = new GraphORM({ }, }, }, + UserProfileAnalytics: { + requiredColumns: ['id', 'updatedAt'], + fields: { + updatedAt: { + transform: transformDate, + }, + }, + }, + UserProfileAnalyticsHistory: { + requiredColumns: ['id', 'date', 'updatedAt'], + fields: { + date: { + transform: transformDate, + }, + updatedAt: { + transform: transformDate, + }, + }, + }, Opportunity: { requiredColumns: ['id', 'createdAt'], fields: { diff --git a/src/migration/1768505964487-UserProfileAnalytics.ts b/src/migration/1768505964487-UserProfileAnalytics.ts new file mode 100644 index 0000000000..a2b1df86dc --- /dev/null +++ b/src/migration/1768505964487-UserProfileAnalytics.ts @@ -0,0 +1,53 @@ +import { MigrationInterface, QueryRunner } from 'typeorm'; + +export class UserProfileAnalytics1768505964487 implements MigrationInterface { + name = 'UserProfileAnalytics1768506000000'; + + public async up(queryRunner: QueryRunner): Promise { + await queryRunner.query( + `CREATE TABLE "user_profile_analytics" ( + "id" text NOT NULL, + "createdAt" TIMESTAMP NOT NULL DEFAULT now(), + "updatedAt" TIMESTAMP NOT NULL DEFAULT now(), + "uniqueVisitors" integer NOT NULL DEFAULT '0', + CONSTRAINT "PK_user_profile_analytics" PRIMARY KEY ("id") + )`, + ); + + await queryRunner.query( + `CREATE TABLE "user_profile_analytics_history" ( + "id" text NOT NULL, + "date" text NOT NULL, + "createdAt" TIMESTAMP NOT NULL DEFAULT now(), + "updatedAt" TIMESTAMP NOT NULL DEFAULT now(), + "uniqueVisitors" integer NOT NULL DEFAULT '0', + CONSTRAINT "PK_user_profile_analytics_history" PRIMARY KEY ("id", "date") + )`, + ); + + await queryRunner.query( + `ALTER TABLE "user_profile_analytics" + ADD CONSTRAINT "FK_user_profile_analytics_user" + FOREIGN KEY ("id") REFERENCES "user"("id") + ON DELETE CASCADE ON UPDATE NO ACTION`, + ); + + await queryRunner.query( + `ALTER TABLE "user_profile_analytics_history" + ADD CONSTRAINT "FK_user_profile_analytics_history_user" + FOREIGN KEY ("id") REFERENCES "user"("id") + ON DELETE CASCADE ON UPDATE NO ACTION`, + ); + } + + public async down(queryRunner: QueryRunner): Promise { + await queryRunner.query( + `ALTER TABLE "user_profile_analytics_history" DROP CONSTRAINT "FK_user_profile_analytics_history_user"`, + ); + await queryRunner.query( + `ALTER TABLE "user_profile_analytics" DROP CONSTRAINT "FK_user_profile_analytics_user"`, + ); + await queryRunner.query(`DROP TABLE "user_profile_analytics_history"`); + await queryRunner.query(`DROP TABLE "user_profile_analytics"`); + } +} diff --git a/src/schema/users.ts b/src/schema/users.ts index 3b44658ab6..49432d9a8d 100644 --- a/src/schema/users.ts +++ b/src/schema/users.ts @@ -105,6 +105,7 @@ import { checkUserCoresAccess, deleteUser, getUserCoresRole, + hasUserProfileAnalyticsPermissions, } from '../common/user'; import { randomInt, randomUUID } from 'crypto'; import { ArrayContains, DataSource, In, IsNull, QueryRunner } from 'typeorm'; @@ -315,6 +316,16 @@ export interface GQLUserPersonalizedDigest { flags: UserPersonalizedDigestFlagsPublic; } +export interface GQLUserProfileAnalytics { + id: string; + uniqueVisitors: number; + updatedAt: Date; +} + +export interface GQLUserProfileAnalyticsHistory extends GQLUserProfileAnalytics { + date: string; +} + export interface SendReportArgs { type: ReportEntity; id: string; @@ -1080,6 +1091,56 @@ export const typeDefs = /* GraphQL */ ` coresRole: Int! } + """ + User profile analytics with visitor data + """ + type UserProfileAnalytics { + """ + User ID + """ + id: ID! + """ + Total unique visitors to the profile + """ + uniqueVisitors: Int! + """ + Last update timestamp + """ + updatedAt: DateTime! + } + + """ + Daily user profile analytics history entry + """ + type UserProfileAnalyticsHistory { + """ + User ID + """ + id: ID! + """ + Date of the analytics (YYYY-MM-DD) + """ + date: DateTime! + """ + Unique visitors on this day + """ + uniqueVisitors: Int! + """ + Last update timestamp + """ + updatedAt: DateTime! + } + + type UserProfileAnalyticsHistoryEdge { + node: UserProfileAnalyticsHistory! + cursor: String! + } + + type UserProfileAnalyticsHistoryConnection { + pageInfo: PageInfo! + edges: [UserProfileAnalyticsHistoryEdge!]! + } + extend type Query { """ Get user based on logged in session @@ -1271,6 +1332,21 @@ export const typeDefs = /* GraphQL */ ` Get current user's notification preferences """ notificationSettings: JSON @auth + + """ + Get user profile analytics for the specified user + """ + userProfileAnalytics(userId: ID!): UserProfileAnalytics @auth + + """ + Get user profile analytics history (daily breakdown) + """ + userProfileAnalyticsHistory( + userId: ID! + after: String + before: String + first: Int + ): UserProfileAnalyticsHistoryConnection @auth } ${toGQLEnum(UploadPreset, 'UploadPreset')} @@ -2584,6 +2660,58 @@ export const resolvers: IResolvers = traceResolvers< ...user?.notificationFlags, }; }, + userProfileAnalytics: async ( + _, + args: { userId: string }, + ctx: AuthContext, + info: GraphQLResolveInfo, + ): Promise => { + if (!hasUserProfileAnalyticsPermissions({ ctx, userId: args.userId })) { + return null; + } + + return graphorm.queryOneOrFail( + ctx, + info, + (builder) => ({ + ...builder, + queryBuilder: builder.queryBuilder.andWhere( + `"${builder.alias}".id = :userId`, + { userId: args.userId }, + ), + }), + undefined, + true, + ); + }, + userProfileAnalyticsHistory: async ( + _, + args: ConnectionArguments & { userId: string }, + ctx: AuthContext, + info: GraphQLResolveInfo, + ): Promise | null> => { + if (!hasUserProfileAnalyticsPermissions({ ctx, userId: args.userId })) { + return null; + } + + return queryPaginatedByDate( + ctx, + info, + args, + { key: 'updatedAt' }, + { + queryBuilder: (builder) => { + builder.queryBuilder = builder.queryBuilder.andWhere( + `${builder.alias}.id = :userId`, + { userId: args.userId }, + ); + return builder; + }, + orderByKey: 'DESC', + readReplica: true, + }, + ); + }, }, Mutation: { clearImage: async (