diff --git a/backend/src/database/repositories/memberRepository.ts b/backend/src/database/repositories/memberRepository.ts index 2a5e677185..e79fe35495 100644 --- a/backend/src/database/repositories/memberRepository.ts +++ b/backend/src/database/repositories/memberRepository.ts @@ -50,7 +50,7 @@ import { includeMemberToSegments, } from '@crowd/data-access-layer/src/members/segments' import { IDbMemberData } from '@crowd/data-access-layer/src/members/types' -import { optionsQx } from '@crowd/data-access-layer/src/queryExecutor' +import { optionsBgQx, optionsQx } from '@crowd/data-access-layer/src/queryExecutor' import { fetchManySegments, getSegmentMergeSuggestionCounts, @@ -1259,7 +1259,7 @@ class MemberRepository { let memberResponse = null const qx = optionsQx(options) - const bgQx = optionsQx({ ...options, transaction: null }) + const bgQx = optionsBgQx(options) memberResponse = await queryMembersAdvanced(qx, bgQx, options.redis, { filter: { id: { eq: id } }, diff --git a/backend/src/services/memberService.ts b/backend/src/services/memberService.ts index f069ed6b63..6e8f82bcee 100644 --- a/backend/src/services/memberService.ts +++ b/backend/src/services/memberService.ts @@ -21,7 +21,7 @@ import { insertMemberSegmentAggregates, queryMembersAdvanced, } from '@crowd/data-access-layer/src/members' -import { QueryExecutor, optionsQx } from '@crowd/data-access-layer/src/queryExecutor' +import { QueryExecutor, optionsBgQx, optionsQx } from '@crowd/data-access-layer/src/queryExecutor' import { decrementMemberMergeSuggestionCounts, fetchManySegments, @@ -944,7 +944,7 @@ export default class MemberService extends LoggerBase { async findAllAutocomplete(data) { const qx = optionsQx(this.options) - const bgQx = optionsQx({ ...this.options, transaction: null }) + const bgQx = optionsBgQx(this.options) return queryMembersAdvanced(qx, bgQx, this.options.redis, { filter: data.filter, @@ -970,7 +970,7 @@ export default class MemberService extends LoggerBase { } const qx = optionsQx(this.options) - const bgQx = optionsQx({ ...this.options, transaction: null }) + const bgQx = optionsBgQx(this.options) return queryMembersAdvanced(qx, bgQx, this.options.redis, { ...data, diff --git a/services/libs/data-access-layer/src/members/base.ts b/services/libs/data-access-layer/src/members/base.ts index 582a0d7812..767db22dcc 100644 --- a/services/libs/data-access-layer/src/members/base.ts +++ b/services/libs/data-access-layer/src/members/base.ts @@ -195,9 +195,12 @@ export async function queryMembersAdvanced( // Initialize cache const cache = new MemberQueryCache(redis) - // Build cache key + // Normalize search once: trim whitespace, lowercase (buildSearchCTE lowercases anyway), + // convert empty string to null so "" and null hash identically. + const normalizedSearch = search?.trim().toLowerCase() || null + + // Full result key — includes pagination and projection so page 1 and page 2 are separate entries. const cacheKey = cache.buildCacheKey({ - countOnly, fields, filter, include, @@ -205,18 +208,30 @@ export async function queryMembersAdvanced( limit, offset, orderBy, - search, + search: normalizedSearch, + segmentId, + }) + + // Count key — excludes pagination/projection and include flags since the count query + // only depends on filter, search, and segmentId (buildCountQuery hardcodes includeMemberOrgs=false). + const countCacheKey = cache.buildCountCacheKey({ + filter, + search: normalizedSearch, segmentId, }) // Try to get from cache first const cachedResult = countOnly ? null : await cache.get(cacheKey) - const cachedCount = countOnly ? null : await cache.getCount(cacheKey) + const cachedCount = countOnly ? await cache.getCount(countCacheKey) : null if (cachedResult) { + log.info( + { cacheKey, segmentId, search: normalizedSearch, limit, offset, orderBy }, + 'Members advanced query cache hit — returning cached result, scheduling background refresh', + ) refreshCacheInBackground(bgQx, redis, cacheKey, { filter, - search, + search: normalizedSearch, limit, offset, orderBy, @@ -227,22 +242,18 @@ export async function queryMembersAdvanced( includeAllAttributes, attributeSettings, }) - - log.info(`Members advanced query cache hit: ${cacheKey}`) return cachedResult } if (countOnly && cachedCount !== null) { - refreshCountCacheInBackground(bgQx, redis, cacheKey, { - filter, - search, - segmentId, - include, - includeAllAttributes, - attributeSettings, - }) - - log.debug(`Members advanced count query cache hit: ${cacheKey}`) + log.info( + { countCacheKey, segmentId, search: normalizedSearch }, + 'Members advanced count cache hit — returning cached count', + ) + // No background refresh for count hits: the count is a single integer with a 6h TTL, + // refreshing it on every hit would fire a COUNT(*) query per request, defeating the cache. + // The count is kept fresh by: (1) full-result refreshes that also write countCacheKey, + // (2) natural TTL expiry, (3) explicit cache invalidation on member updates. return { rows: [], count: cachedCount, @@ -251,19 +262,65 @@ export async function queryMembersAdvanced( } } - return await executeQuery(qx, redis, cacheKey, { - filter, - search, - limit, - offset, - orderBy, - segmentId, - countOnly, - fields, - include, - includeAllAttributes, - attributeSettings, - }) + log.info( + { + cacheKey, + countCacheKey, + segmentId, + search: normalizedSearch, + limit, + offset, + orderBy, + countOnly, + }, + 'Members advanced query cache miss — executing query synchronously', + ) + + try { + return await executeQuery(qx, redis, cacheKey, { + filter, + search: normalizedSearch, + limit, + offset, + orderBy, + segmentId, + countOnly, + fields, + include, + includeAllAttributes, + attributeSettings, + }) + } catch (error) { + log.warn( + { cacheKey, countCacheKey, segmentId, search: normalizedSearch, countOnly, err: error }, + 'Members advanced query failed on cache miss — scheduling background refresh for next retry', + ) + if (countOnly) { + refreshCountCacheInBackground(bgQx, redis, countCacheKey, { + filter, + search: normalizedSearch, + segmentId, + include, + includeAllAttributes, + attributeSettings, + }) + } else { + refreshCacheInBackground(bgQx, redis, cacheKey, { + filter, + search: normalizedSearch, + limit, + offset, + orderBy, + segmentId, + countOnly: false, + fields, + include, + includeAllAttributes, + attributeSettings, + }) + } + throw error + } } export async function executeQuery( @@ -299,6 +356,11 @@ export async function executeQuery( }: IQueryMembersAdvancedParams, ): Promise> { const cache = new MemberQueryCache(redis) + const countCacheKey = cache.buildCountCacheKey({ + filter, + search: search ?? null, + segmentId, + }) const withAggregates = !!segmentId const searchConfig = buildSearchCTE(search) @@ -343,15 +405,16 @@ export async function executeQuery( withAggregates, searchConfig, filterString, - includeMemberOrgs: include.memberOrganizations, + // Count never needs org data in SELECT — filterHasMo inside buildCountQuery already + // handles the case where the filter itself references mo.* columns. + includeMemberOrgs: false, }) if (countOnly) { const result = await qx.selectOne(countQuery, params) const count = parseInt(result.count, 10) - // Cache the count - await cache.setCount(cacheKey, count, 21600) // 6 hours TTL + await cache.setCount(countCacheKey, count, 21600) return { rows: [], @@ -393,16 +456,24 @@ export async function executeQuery( offset, }) + // Skip the count sub-query if the count is already cached — saves a full table scan + // on every page navigation after the first (page 2, 3, ... all share the same countCacheKey). + const cachedCount = await cache.getCount(countCacheKey) const [rows, countResult] = await Promise.all([ qx.select(mainQuery, params), - qx.selectOne(countQuery, params), + cachedCount === null ? qx.selectOne(countQuery, params) : Promise.resolve(null), ]) - const count = parseInt(countResult.count, 10) + const count = cachedCount !== null ? cachedCount : parseInt(countResult?.count ?? '0', 10) const memberIds = rows.map((org) => org.id) if (memberIds.length === 0) { - return { rows: [], count, limit, offset } + const emptyResult = { rows: [], count, limit, offset } + await Promise.all([ + cache.set(cacheKey, emptyResult, 21600), + cachedCount === null ? cache.setCount(countCacheKey, count, 21600) : Promise.resolve(), + ]) + return emptyResult } const [memberOrganizations, identities, memberSegments, maintainerRoles] = await Promise.all([ @@ -537,7 +608,10 @@ export async function executeQuery( const result = { rows, count, limit, offset } - await cache.set(cacheKey, result, 21600) // 6 hours TTL + await Promise.all([ + cache.set(cacheKey, result, 21600), + cache.setCount(countCacheKey, count, 21600), + ]) return result } @@ -547,26 +621,36 @@ async function refreshCacheInBackground( redis: RedisClient, cacheKey: string, params: IQueryMembersAdvancedParams, + countOnly = false, ): Promise { + const label = countOnly ? 'count cache' : 'query cache' + const cache = new MemberQueryCache(redis) + const acquired = await cache.tryAcquireRefreshLock(cacheKey) + if (!acquired) { + log.debug( + { cacheKey }, + `Members advanced ${label} refresh already in progress — skipping duplicate`, + ) + return + } try { - await executeQuery(qx, redis, cacheKey, params) + log.info({ cacheKey }, `Members advanced ${label} background refresh started`) + await executeQuery(qx, redis, cacheKey, countOnly ? { ...params, countOnly: true } : params) + log.info({ cacheKey }, `Members advanced ${label} background refresh completed`) } catch (error) { - log.warn('Background cache refresh failed:', error) + log.warn({ cacheKey, err: error }, `Members advanced ${label} background refresh failed`) + } finally { + await cache.releaseRefreshLock(cacheKey) } } -async function refreshCountCacheInBackground( +function refreshCountCacheInBackground( qx: QueryExecutor, redis: RedisClient, cacheKey: string, params: IQueryMembersAdvancedParams, ): Promise { - try { - log.info(`Refreshing members advanced count cache in background: ${cacheKey}`) - await executeQuery(qx, redis, cacheKey, { ...params, countOnly: true }) - } catch (error) { - log.warn('Background count cache refresh failed:', error) - } + return refreshCacheInBackground(qx, redis, cacheKey, params, true) } export async function queryMembers( diff --git a/services/libs/data-access-layer/src/members/queryBuilder.ts b/services/libs/data-access-layer/src/members/queryBuilder.ts index 1f31b3e4a2..0eb2f044a3 100644 --- a/services/libs/data-access-layer/src/members/queryBuilder.ts +++ b/services/libs/data-access-layer/src/members/queryBuilder.ts @@ -422,12 +422,14 @@ export const buildQuery = ({ Boolean, ) + const needsMemberEnrichments = filterHasMe || fields.includes('me.') + const joins = [ withAggregates ? `INNER JOIN "memberSegmentsAgg" msa ON msa."memberId" = m.id AND msa."segmentId" = $(segmentId)` : '', needsMemberOrgs ? `LEFT JOIN member_orgs mo ON mo."memberId" = m.id` : '', - `LEFT JOIN "memberEnrichments" me ON me."memberId" = m.id`, + needsMemberEnrichments ? `LEFT JOIN "memberEnrichments" me ON me."memberId" = m.id` : '', searchConfig.join, ].filter(Boolean) @@ -464,9 +466,15 @@ export const buildCountQuery = ({ searchConfig.join, ].filter(Boolean) + // COUNT(*) is safe only when every join is guaranteed one-to-one per member: + // - memberSegmentsAgg is unique on (memberId, segmentId) with segmentId fixed → safe + // - member_orgs CTE uses ARRAY_AGG GROUP BY memberId → one row per member → safe + // - memberEnrichments may have multiple rows per member → COUNT(*) would overcount + const countExpr = withAggregates && !filterHasMe ? 'COUNT(*)' : 'COUNT(DISTINCT m.id)' + return ` ${ctes.length > 0 ? `WITH ${ctes.join(',\n')}` : ''} - SELECT COUNT(DISTINCT m.id) AS count + SELECT ${countExpr} AS count FROM members m ${joins.join('\n')} WHERE (${filterString}) diff --git a/services/libs/data-access-layer/src/members/queryCache.ts b/services/libs/data-access-layer/src/members/queryCache.ts index ba9ceda98d..6eeaff8673 100644 --- a/services/libs/data-access-layer/src/members/queryCache.ts +++ b/services/libs/data-access-layer/src/members/queryCache.ts @@ -1,4 +1,4 @@ -import { createHash } from 'crypto' +import { createHash, randomBytes } from 'crypto' import { getServiceLogger } from '@crowd/logging' import { RedisCache, RedisClient } from '@crowd/redis' @@ -19,27 +19,48 @@ const log = getServiceLogger() export class MemberQueryCache { private cache: RedisCache private countCache: RedisCache + private lockCache: RedisCache constructor(redis: RedisClient) { this.cache = new RedisCache('members-advanced', redis, log) this.countCache = new RedisCache('members-count', redis, log) + this.lockCache = new RedisCache('members-refresh-lock', redis, log) + } + + // Returns true if lock was acquired (no other refresh in progress for this key). + // Uses a cryptographically random token to distinguish "we set it" from "already existed". + // TTL ensures the lock auto-expires if the refresh crashes without releasing it. + async tryAcquireRefreshLock(cacheKey: string, ttlSeconds = 90): Promise { + try { + const token = randomBytes(16).toString('hex') + const stored = await this.lockCache.setIfNotExistsOrGet(cacheKey, token, ttlSeconds) + return stored === token + } catch { + return true // fail open: if Redis is down, let the refresh proceed + } + } + + async releaseRefreshLock(cacheKey: string): Promise { + try { + await this.lockCache.delete(cacheKey) + } catch { + // best effort + } } buildCacheKey(params: { - countOnly?: boolean fields?: string[] filter?: Record include?: IncludeOptions includeAllAttributes?: boolean - limit: number - offset: number + limit?: number + offset?: number orderBy?: string search?: string segmentId?: string }): string { const cleanParams = Object.fromEntries( Object.entries({ - countOnly: params.countOnly, fields: params.fields?.sort(), filter: params.filter, include: params.include, @@ -63,6 +84,18 @@ export class MemberQueryCache { return `members_advanced:${hash}` } + buildCountCacheKey(params: { + filter?: Record + search?: string + segmentId?: string + }): string { + return this.buildCacheKey({ + filter: params.filter, + search: params.search, + segmentId: params.segmentId, + }) + } + async get(cacheKey: string): Promise | null> { try { const cachedResult = await this.cache.get(cacheKey) @@ -85,22 +118,34 @@ export class MemberQueryCache { } async getCount(cacheKey: string): Promise { - const cachedCount = await this.countCache.get(cacheKey) - return cachedCount ? parseInt(cachedCount, 10) : null + try { + const cachedCount = await this.countCache.get(cacheKey) + if (!cachedCount) return null + const parsed = parseInt(cachedCount, 10) + return isNaN(parsed) ? null : parsed + } catch (error) { + log.warn('Error retrieving count from cache', { error }) + return null + } } async setCount(cacheKey: string, count: number, ttlSeconds: number): Promise { - await this.countCache.set(cacheKey, count.toString(), ttlSeconds) + try { + await this.countCache.set(cacheKey, count.toString(), ttlSeconds) + } catch (error) { + log.warn('Error saving count to cache', { error }) + } } async invalidateAll(): Promise { try { - const [resultsDeleted, countsDeleted] = await Promise.all([ + const [resultsDeleted, countsDeleted, locksDeleted] = await Promise.all([ this.cache.deleteAll(), this.countCache.deleteAll(), + this.lockCache.deleteAll(), ]) log.info( - `Invalidated member query cache: ${resultsDeleted} result entries, ${countsDeleted} count entries`, + `Invalidated member query cache: ${resultsDeleted} result entries, ${countsDeleted} count entries, ${locksDeleted} locks`, ) } catch (error) { log.warn('Error invalidating member query cache', { error }) @@ -109,9 +154,13 @@ export class MemberQueryCache { async invalidateByPattern(pattern: string): Promise { try { - const keysDeleted = await this.cache.deleteByKeyPattern(pattern) + const [resultsDeleted, countsDeleted, locksDeleted] = await Promise.all([ + this.cache.deleteByKeyPattern(pattern), + this.countCache.deleteByKeyPattern(pattern), + this.lockCache.deleteByKeyPattern(pattern), + ]) log.info( - `Invalidated member query cache by pattern: ${keysDeleted} entries deleted for pattern ${pattern}`, + `Invalidated member query cache by pattern: ${resultsDeleted} result entries, ${countsDeleted} count entries, ${locksDeleted} locks deleted for pattern ${pattern}`, ) } catch (error) { log.warn('Error invalidating member query cache by pattern', { error, pattern }) diff --git a/services/libs/data-access-layer/src/queryExecutor.ts b/services/libs/data-access-layer/src/queryExecutor.ts index 298b997507..10650e27a7 100644 --- a/services/libs/data-access-layer/src/queryExecutor.ts +++ b/services/libs/data-access-layer/src/queryExecutor.ts @@ -20,10 +20,16 @@ export function formatQuery(query: string, params?: object): string { } export class SequelizeQueryExecutor implements QueryExecutor { - constructor(private readonly sequelize: Sequelize) {} + constructor( + private readonly sequelize: Sequelize, + private readonly noTransaction = false, + ) {} protected prepareOptions(options: any): any { - return options + // When noTransaction=true, explicitly opt out of any CLS or implicit + // transaction binding — used for background/fire-and-forget work that + // must not inherit a parent request's transaction. + return this.noTransaction ? { ...options, transaction: null } : options } select(query: string, params?: object): Promise { @@ -166,3 +172,12 @@ export function optionsQx(options: any): QueryExecutor { return new SequelizeQueryExecutor(seq) } + +/** + * Creates a QueryExecutor for fire-and-forget background work. + * Always runs outside any transaction — safe to use after the caller's + * request transaction has been committed. + */ +export function optionsBgQx(options: any): QueryExecutor { + return new SequelizeQueryExecutor(options.database.sequelize, true) +}