Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions backend/src/database/repositories/memberRepository.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ import {
includeMemberToSegments,
} from '@crowd/data-access-layer/src/members/segments'
import { IDbMemberData } from '@crowd/data-access-layer/src/members/types'
import { optionsQx } from '@crowd/data-access-layer/src/queryExecutor'
import { optionsBgQx, optionsQx } from '@crowd/data-access-layer/src/queryExecutor'
import {
fetchManySegments,
getSegmentMergeSuggestionCounts,
Expand Down Expand Up @@ -1259,7 +1259,7 @@ class MemberRepository {
let memberResponse = null

const qx = optionsQx(options)
const bgQx = optionsQx({ ...options, transaction: null })
const bgQx = optionsBgQx(options)

memberResponse = await queryMembersAdvanced(qx, bgQx, options.redis, {
filter: { id: { eq: id } },
Expand Down
6 changes: 3 additions & 3 deletions backend/src/services/memberService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import {
insertMemberSegmentAggregates,
queryMembersAdvanced,
} from '@crowd/data-access-layer/src/members'
import { QueryExecutor, optionsQx } from '@crowd/data-access-layer/src/queryExecutor'
import { QueryExecutor, optionsBgQx, optionsQx } from '@crowd/data-access-layer/src/queryExecutor'
import {
decrementMemberMergeSuggestionCounts,
fetchManySegments,
Expand Down Expand Up @@ -944,7 +944,7 @@ export default class MemberService extends LoggerBase {

async findAllAutocomplete(data) {
const qx = optionsQx(this.options)
const bgQx = optionsQx({ ...this.options, transaction: null })
const bgQx = optionsBgQx(this.options)

return queryMembersAdvanced(qx, bgQx, this.options.redis, {
filter: data.filter,
Expand All @@ -970,7 +970,7 @@ export default class MemberService extends LoggerBase {
}

const qx = optionsQx(this.options)
const bgQx = optionsQx({ ...this.options, transaction: null })
const bgQx = optionsBgQx(this.options)

return queryMembersAdvanced(qx, bgQx, this.options.redis, {
...data,
Expand Down
176 changes: 130 additions & 46 deletions services/libs/data-access-layer/src/members/base.ts
Original file line number Diff line number Diff line change
Expand Up @@ -195,28 +195,43 @@ export async function queryMembersAdvanced(
// Initialize cache
const cache = new MemberQueryCache(redis)

// Build cache key
// Normalize search once: trim whitespace, lowercase (buildSearchCTE lowercases anyway),
// convert empty string to null so "" and null hash identically.
const normalizedSearch = search?.trim().toLowerCase() || null

// Full result key — includes pagination and projection so page 1 and page 2 are separate entries.
const cacheKey = cache.buildCacheKey({
countOnly,
fields,
filter,
include,
includeAllAttributes,
limit,
offset,
orderBy,
search,
search: normalizedSearch,
segmentId,
})

// Count key — excludes pagination/projection and include flags since the count query
// only depends on filter, search, and segmentId (buildCountQuery hardcodes includeMemberOrgs=false).
const countCacheKey = cache.buildCountCacheKey({
filter,
search: normalizedSearch,
segmentId,
Comment on lines 208 to 220
})

// Try to get from cache first
const cachedResult = countOnly ? null : await cache.get(cacheKey)
const cachedCount = countOnly ? null : await cache.getCount(cacheKey)
const cachedCount = countOnly ? await cache.getCount(countCacheKey) : null

if (cachedResult) {
log.info(
{ cacheKey, segmentId, search: normalizedSearch, limit, offset, orderBy },
'Members advanced query cache hit — returning cached result, scheduling background refresh',
)
refreshCacheInBackground(bgQx, redis, cacheKey, {
filter,
search,
search: normalizedSearch,
limit,
offset,
orderBy,
Expand All @@ -227,22 +242,18 @@ export async function queryMembersAdvanced(
includeAllAttributes,
attributeSettings,
})

log.info(`Members advanced query cache hit: ${cacheKey}`)
return cachedResult
}

if (countOnly && cachedCount !== null) {
refreshCountCacheInBackground(bgQx, redis, cacheKey, {
filter,
search,
segmentId,
include,
includeAllAttributes,
attributeSettings,
})

log.debug(`Members advanced count query cache hit: ${cacheKey}`)
log.info(
{ countCacheKey, segmentId, search: normalizedSearch },
'Members advanced count cache hit — returning cached count',
)
Comment on lines +249 to +252
// No background refresh for count hits: the count is a single integer with a 6h TTL,
// refreshing it on every hit would fire a COUNT(*) query per request, defeating the cache.
// The count is kept fresh by: (1) full-result refreshes that also write countCacheKey,
// (2) natural TTL expiry, (3) explicit cache invalidation on member updates.
return {
rows: [],
count: cachedCount,
Expand All @@ -251,19 +262,65 @@ export async function queryMembersAdvanced(
}
}

return await executeQuery(qx, redis, cacheKey, {
filter,
search,
limit,
offset,
orderBy,
segmentId,
countOnly,
fields,
include,
includeAllAttributes,
attributeSettings,
})
log.info(
{
cacheKey,
countCacheKey,
segmentId,
search: normalizedSearch,
limit,
offset,
orderBy,
countOnly,
},
'Members advanced query cache miss — executing query synchronously',
)

try {
return await executeQuery(qx, redis, cacheKey, {
filter,
search: normalizedSearch,
limit,
offset,
orderBy,
segmentId,
countOnly,
fields,
include,
includeAllAttributes,
attributeSettings,
})
} catch (error) {
log.warn(
{ cacheKey, countCacheKey, segmentId, search: normalizedSearch, countOnly, err: error },
'Members advanced query failed on cache miss — scheduling background refresh for next retry',
)
if (countOnly) {
refreshCountCacheInBackground(bgQx, redis, countCacheKey, {
filter,
search: normalizedSearch,
segmentId,
include,
includeAllAttributes,
attributeSettings,
})
} else {
refreshCacheInBackground(bgQx, redis, cacheKey, {
filter,
search: normalizedSearch,
limit,
offset,
orderBy,
segmentId,
countOnly: false,
fields,
include,
includeAllAttributes,
attributeSettings,
})
}
throw error
}
}

export async function executeQuery(
Expand Down Expand Up @@ -299,6 +356,11 @@ export async function executeQuery(
}: IQueryMembersAdvancedParams,
): Promise<PageData<IDbMemberData>> {
const cache = new MemberQueryCache(redis)
const countCacheKey = cache.buildCountCacheKey({
filter,
search: search ?? null,
segmentId,
})
const withAggregates = !!segmentId
const searchConfig = buildSearchCTE(search)

Expand Down Expand Up @@ -343,15 +405,16 @@ export async function executeQuery(
withAggregates,
searchConfig,
filterString,
includeMemberOrgs: include.memberOrganizations,
// Count never needs org data in SELECT — filterHasMo inside buildCountQuery already
// handles the case where the filter itself references mo.* columns.
includeMemberOrgs: false,
})

if (countOnly) {
const result = await qx.selectOne(countQuery, params)
const count = parseInt(result.count, 10)

// Cache the count
await cache.setCount(cacheKey, count, 21600) // 6 hours TTL
await cache.setCount(countCacheKey, count, 21600)
Comment thread
cursor[bot] marked this conversation as resolved.

return {
rows: [],
Expand Down Expand Up @@ -393,16 +456,24 @@ export async function executeQuery(
offset,
})

// Skip the count sub-query if the count is already cached — saves a full table scan
// on every page navigation after the first (page 2, 3, ... all share the same countCacheKey).
const cachedCount = await cache.getCount(countCacheKey)
const [rows, countResult] = await Promise.all([
qx.select(mainQuery, params),
qx.selectOne(countQuery, params),
cachedCount === null ? qx.selectOne(countQuery, params) : Promise.resolve(null),
])

const count = parseInt(countResult.count, 10)
const count = cachedCount !== null ? cachedCount : parseInt(countResult?.count ?? '0', 10)
const memberIds = rows.map((org) => org.id)

if (memberIds.length === 0) {
return { rows: [], count, limit, offset }
const emptyResult = { rows: [], count, limit, offset }
await Promise.all([
cache.set(cacheKey, emptyResult, 21600),
cachedCount === null ? cache.setCount(countCacheKey, count, 21600) : Promise.resolve(),
])
return emptyResult
}

const [memberOrganizations, identities, memberSegments, maintainerRoles] = await Promise.all([
Expand Down Expand Up @@ -537,7 +608,10 @@ export async function executeQuery(

const result = { rows, count, limit, offset }

await cache.set(cacheKey, result, 21600) // 6 hours TTL
await Promise.all([
cache.set(cacheKey, result, 21600),
cache.setCount(countCacheKey, count, 21600),
])

return result
}
Expand All @@ -547,26 +621,36 @@ async function refreshCacheInBackground(
redis: RedisClient,
cacheKey: string,
params: IQueryMembersAdvancedParams,
countOnly = false,
): Promise<void> {
const label = countOnly ? 'count cache' : 'query cache'
const cache = new MemberQueryCache(redis)
const acquired = await cache.tryAcquireRefreshLock(cacheKey)
if (!acquired) {
log.debug(
{ cacheKey },
`Members advanced ${label} refresh already in progress — skipping duplicate`,
)
return
}
try {
await executeQuery(qx, redis, cacheKey, params)
log.info({ cacheKey }, `Members advanced ${label} background refresh started`)
await executeQuery(qx, redis, cacheKey, countOnly ? { ...params, countOnly: true } : params)
log.info({ cacheKey }, `Members advanced ${label} background refresh completed`)
} catch (error) {
log.warn('Background cache refresh failed:', error)
log.warn({ cacheKey, err: error }, `Members advanced ${label} background refresh failed`)
} finally {
await cache.releaseRefreshLock(cacheKey)
}
}

async function refreshCountCacheInBackground(
function refreshCountCacheInBackground(
qx: QueryExecutor,
redis: RedisClient,
cacheKey: string,
params: IQueryMembersAdvancedParams,
): Promise<void> {
try {
log.info(`Refreshing members advanced count cache in background: ${cacheKey}`)
await executeQuery(qx, redis, cacheKey, { ...params, countOnly: true })
} catch (error) {
log.warn('Background count cache refresh failed:', error)
}
return refreshCacheInBackground(qx, redis, cacheKey, params, true)
}

export async function queryMembers<T extends MemberField>(
Expand Down
12 changes: 10 additions & 2 deletions services/libs/data-access-layer/src/members/queryBuilder.ts
Original file line number Diff line number Diff line change
Expand Up @@ -422,12 +422,14 @@ export const buildQuery = ({
Boolean,
)

const needsMemberEnrichments = filterHasMe || fields.includes('me.')

const joins = [
withAggregates
? `INNER JOIN "memberSegmentsAgg" msa ON msa."memberId" = m.id AND msa."segmentId" = $(segmentId)`
: '',
needsMemberOrgs ? `LEFT JOIN member_orgs mo ON mo."memberId" = m.id` : '',
`LEFT JOIN "memberEnrichments" me ON me."memberId" = m.id`,
needsMemberEnrichments ? `LEFT JOIN "memberEnrichments" me ON me."memberId" = m.id` : '',
searchConfig.join,
].filter(Boolean)

Expand Down Expand Up @@ -464,9 +466,15 @@ export const buildCountQuery = ({
searchConfig.join,
].filter(Boolean)

// COUNT(*) is safe only when every join is guaranteed one-to-one per member:
// - memberSegmentsAgg is unique on (memberId, segmentId) with segmentId fixed → safe
// - member_orgs CTE uses ARRAY_AGG GROUP BY memberId → one row per member → safe
// - memberEnrichments may have multiple rows per member → COUNT(*) would overcount
const countExpr = withAggregates && !filterHasMe ? 'COUNT(*)' : 'COUNT(DISTINCT m.id)'

return `
${ctes.length > 0 ? `WITH ${ctes.join(',\n')}` : ''}
SELECT COUNT(DISTINCT m.id) AS count
SELECT ${countExpr} AS count
Comment thread
cursor[bot] marked this conversation as resolved.
FROM members m
${joins.join('\n')}
WHERE (${filterString})
Expand Down
Loading
Loading