Skip to content

Commit 8b2385e

Browse files
authored
chore(jobsdb): extend no results cache to support partition ids (rudderlabs#6529)
🔒 Scanned for secrets using gitleaks 8.28.0

# Description

Refactoring `NoResultsCache` to support partition id caching.

Even though we are introducing support for caching partition ids, currently:

- No queries are leveraging partition id filtering, i.e. all queries are using the wildcard (`*`) branch.
- No full dataset branch invalidation can be triggered due to partition ids, since:
  - while updating job statuses we are not capturing the actual partition id of the job that we are updating, but using `none` instead.
  - while storing new jobs, if partition id is missing we are falling back to `none`.

## Linear Ticket

resolves PIPE-2602

## Security

- [x] The code changed/added as part of this pull request won't create any security issues with how the software is being used.
1 parent 167cafa commit 8b2385e

4 files changed

Lines changed: 742 additions & 537 deletions

File tree

jobsdb/internal/cache/cache.go

Lines changed: 139 additions & 81 deletions
Original file line numberDiff line numberDiff line change
@@ -57,50 +57,58 @@ type NoResultsCache[T ParameterFilter] struct {
5757
cacheTree cacheTree // a hierarchical tree of cache entries
5858
}
5959

60-
// Get returns true if the cache contains a valid entry for the provided dataset, workspace, customVals, states and parameters filters.
61-
func (c *NoResultsCache[T]) Get(dataset, workspace string, customVals, states []string, parameters []T) bool {
60+
// Get returns true if the cache contains a valid entry for the provided dataset, partitions, workspace, customVals, states and parameters filters.
61+
func (c *NoResultsCache[T]) Get(dataset string, partitions []string, workspace string, customVals, states []string, parameters []T) bool {
6262
if c.skipCache(states, parameters) {
6363
return false
6464
}
65-
workspace, states, customVals, params := filtersToCacheKeys(workspace, states, customVals, parameters)
65+
partitionKeys, workspace, states, customVals, params := filtersToCacheKeys(partitions, workspace, states, customVals, parameters)
6666

6767
c.cacheTreeMu.RLock()
6868
defer c.cacheTreeMu.RUnlock()
6969

7070
if _, ok := c.cacheTree[dataset]; !ok {
7171
return false
7272
}
73-
if _, ok := c.cacheTree[dataset][workspace]; !ok {
74-
return false
75-
}
76-
for _, customVal := range customVals {
77-
if _, ok := c.cacheTree[dataset][workspace][customVal]; !ok {
73+
74+
for _, partition := range partitionKeys {
75+
if _, ok := c.cacheTree[dataset][partition]; !ok {
7876
return false
7977
}
80-
for _, state := range states {
81-
if _, ok := c.cacheTree[dataset][workspace][customVal][state]; !ok {
78+
if _, ok := c.cacheTree[dataset][partition][workspace]; !ok {
79+
return false
80+
}
81+
for _, customVal := range customVals {
82+
if _, ok := c.cacheTree[dataset][partition][workspace][customVal]; !ok {
8283
return false
8384
}
84-
for _, param := range params {
85-
if mark, ok := c.cacheTree[dataset][workspace][customVal][state][param]; !ok || !mark.noJobs || time.Now().After(mark.t.Add(c.ttl())) {
85+
for _, state := range states {
86+
if _, ok := c.cacheTree[dataset][partition][workspace][customVal][state]; !ok {
8687
return false
8788
}
89+
for _, param := range params {
90+
if mark, ok := c.cacheTree[dataset][partition][workspace][customVal][state][param]; !ok || !mark.noJobs || time.Now().After(mark.t.Add(c.ttl())) {
91+
return false
92+
}
93+
}
8894
}
8995
}
9096
}
97+
9198
return true
9299
}
93100

94-
// Invalidate invalidates all cache entries for the provided dataset, workspace, customVals, states and parameters.
95-
func (c *NoResultsCache[T]) Invalidate(dataset, workspace string, customVals, states []string, parameters []T) {
101+
// Invalidate invalidates all cache entries for the provided dataset, partitions, workspace, customVals, states and parameters.
102+
func (c *NoResultsCache[T]) Invalidate(dataset string, partitions []string, workspace string, customVals, states []string, parameters []T) {
96103
c.cacheTreeMu.Lock()
97104
defer c.cacheTreeMu.Unlock()
98-
workspaces, states, customVals, params := c.filtersToInvalidationKeys(workspace, states, customVals, parameters)
105+
partitions, workspaces, states, customVals, params := c.filtersToInvalidationKeys(partitions, workspace, states, customVals, parameters)
99106

100-
if len(workspaces) == 0 { // if no workspace is provided, invalidate all by deleting the workspace's parent node
107+
if len(partitions) == 0 { // if no partitions are provided, invalidate all by deleting the partitions' parent node
101108
if c.warnOnBranchInvalidation.Load() && (len(customVals) > 0 || len(states) > 0 || len(parameters) > 0) {
102109
c.logger.Warnn("Invalidating entire dataset",
103110
logger.NewStringField("dataset", dataset),
111+
logger.NewStringField("partitions", strings.Join(partitions, ",")),
104112
logger.NewStringField("workspace", workspace),
105113
logger.NewStringField("customVals", strings.Join(customVals, ",")),
106114
logger.NewStringField("states", strings.Join(states, ",")),
@@ -113,68 +121,90 @@ func (c *NoResultsCache[T]) Invalidate(dataset, workspace string, customVals, st
113121
if _, ok := c.cacheTree[dataset]; !ok {
114122
return
115123
}
116-
for _, workspace := range workspaces {
117-
if len(customVals) == 0 { // if no custom value is provided, invalidate all by deleting the customVal's parent node
118-
if c.warnOnBranchInvalidation.Load() {
119-
c.logger.Warnn("Invalidating entire workspace branch",
124+
for _, partition := range partitions {
125+
if len(workspaces) == 0 { // if no workspace is provided, invalidate all by deleting the workspace's parent node
126+
if c.warnOnBranchInvalidation.Load() && (len(customVals) > 0 || len(states) > 0 || len(parameters) > 0) {
127+
c.logger.Warnn("Invalidating entire partition",
120128
logger.NewStringField("dataset", dataset),
129+
logger.NewStringField("partition", partition),
121130
logger.NewStringField("workspace", workspace),
122131
logger.NewStringField("customVals", strings.Join(customVals, ",")),
123132
logger.NewStringField("states", strings.Join(states, ",")),
124133
logger.NewStringField("parameters", strings.Join(lo.Map(parameters, func(pf T, _ int) string { return pf.GetName() + ":" + pf.GetValue() }), ",")),
125134
)
126135
}
127-
delete(c.cacheTree[dataset], workspace)
136+
delete(c.cacheTree[dataset], partition)
128137
continue
129138
}
130-
if _, ok := c.cacheTree[dataset][workspace]; !ok {
139+
if _, ok := c.cacheTree[dataset][partition]; !ok {
131140
continue
132141
}
133-
for _, customVal := range customVals {
134-
if len(states) == 0 { // if no state is provided, invalidate all by deleting the state's parent node
142+
for _, workspace := range workspaces {
143+
if len(customVals) == 0 { // if no custom value is provided, invalidate all by deleting the customVal's parent node
135144
if c.warnOnBranchInvalidation.Load() {
136-
c.logger.Warnn("Invalidating entire customVal branch",
145+
c.logger.Warnn("Invalidating entire workspace branch",
137146
logger.NewStringField("dataset", dataset),
147+
logger.NewStringField("partition", partition),
138148
logger.NewStringField("workspace", workspace),
139-
logger.NewStringField("customVal", customVal),
149+
logger.NewStringField("customVals", strings.Join(customVals, ",")),
140150
logger.NewStringField("states", strings.Join(states, ",")),
141151
logger.NewStringField("parameters", strings.Join(lo.Map(parameters, func(pf T, _ int) string { return pf.GetName() + ":" + pf.GetValue() }), ",")),
142152
)
143153
}
144-
delete(c.cacheTree[dataset][workspace], customVal)
154+
delete(c.cacheTree[dataset][partition], workspace)
145155
continue
146156
}
147-
if _, ok := c.cacheTree[dataset][workspace][customVal]; !ok {
157+
if _, ok := c.cacheTree[dataset][partition][workspace]; !ok {
148158
continue
149159
}
150-
for _, state := range states {
151-
if len(params) == 0 { // if no parameter is provided, invalidate all by deleting the param's parent node
160+
for _, customVal := range customVals {
161+
if len(states) == 0 { // if no state is provided, invalidate all by deleting the state's parent node
152162
if c.warnOnBranchInvalidation.Load() {
153-
c.logger.Warnn("Invalidating entire state branch",
163+
c.logger.Warnn("Invalidating entire customVal branch",
154164
logger.NewStringField("dataset", dataset),
165+
logger.NewStringField("partition", partition),
155166
logger.NewStringField("workspace", workspace),
156167
logger.NewStringField("customVal", customVal),
157-
logger.NewStringField("state", state),
168+
logger.NewStringField("states", strings.Join(states, ",")),
158169
logger.NewStringField("parameters", strings.Join(lo.Map(parameters, func(pf T, _ int) string { return pf.GetName() + ":" + pf.GetValue() }), ",")),
159170
)
160171
}
161-
delete(c.cacheTree[dataset][workspace][customVal], state)
172+
delete(c.cacheTree[dataset][partition][workspace], customVal)
162173
continue
163174
}
164-
if _, ok := c.cacheTree[dataset][workspace][customVal][state]; !ok {
175+
if _, ok := c.cacheTree[dataset][partition][workspace][customVal]; !ok {
165176
continue
166177
}
167-
for _, param := range params {
168-
if c.warnOnBranchInvalidation.Load() { // if logging is enabled, log the invalidation of the leaf node at debug level, since this is the most granular level
169-
c.logger.Debugn("Invalidating leaf",
170-
logger.NewStringField("dataset", dataset),
171-
logger.NewStringField("workspace", workspace),
172-
logger.NewStringField("customVal", customVal),
173-
logger.NewStringField("state", state),
174-
logger.NewStringField("parameter", param),
175-
)
178+
for _, state := range states {
179+
if len(params) == 0 { // if no parameter is provided, invalidate all by deleting the param's parent node
180+
if c.warnOnBranchInvalidation.Load() {
181+
c.logger.Warnn("Invalidating entire state branch",
182+
logger.NewStringField("dataset", dataset),
183+
logger.NewStringField("partition", partition),
184+
logger.NewStringField("workspace", workspace),
185+
logger.NewStringField("customVal", customVal),
186+
logger.NewStringField("state", state),
187+
logger.NewStringField("parameters", strings.Join(lo.Map(parameters, func(pf T, _ int) string { return pf.GetName() + ":" + pf.GetValue() }), ",")),
188+
)
189+
}
190+
delete(c.cacheTree[dataset][partition][workspace][customVal], state)
191+
continue
192+
}
193+
if _, ok := c.cacheTree[dataset][partition][workspace][customVal][state]; !ok {
194+
continue
195+
}
196+
for _, param := range params {
197+
if c.warnOnBranchInvalidation.Load() { // if logging is enabled, log the invalidation of the leaf node at debug level, since this is the most granular level
198+
c.logger.Debugn("Invalidating leaf",
199+
logger.NewStringField("dataset", dataset),
200+
logger.NewStringField("workspace", workspace),
201+
logger.NewStringField("customVal", customVal),
202+
logger.NewStringField("state", state),
203+
logger.NewStringField("parameter", param),
204+
)
205+
}
206+
delete(c.cacheTree[dataset][partition][workspace][customVal][state], param)
176207
}
177-
delete(c.cacheTree[dataset][workspace][customVal][state], param)
178208
}
179209
}
180210
}
@@ -183,15 +213,16 @@ func (c *NoResultsCache[T]) Invalidate(dataset, workspace string, customVals, st
183213

184214
// InvalidateDataset invalidates all cache entries for a given dataset.
185215
func (c *NoResultsCache[T]) InvalidateDataset(dataset string) {
186-
c.Invalidate(dataset, "", nil, nil, nil)
216+
c.Invalidate(dataset, nil, "", nil, nil, nil)
187217
}
188218

189219
// StartNoResultTx prepares the cache for accepting new no result entries.
190220
// The cache uses a special marker to prevent synchronisation issues between competing calls of Invalidate & SetNoResult.
191-
func (c *NoResultsCache[T]) StartNoResultTx(dataset, workspace string, customVals, states []string, parameters []T) (tx *NoResultTx[T]) {
221+
func (c *NoResultsCache[T]) StartNoResultTx(dataset string, partitions []string, workspace string, customVals, states []string, parameters []T) (tx *NoResultTx[T]) {
192222
tx = &NoResultTx[T]{
193223
id: uuid.New().String(),
194224
dataset: dataset,
225+
partitions: partitions,
195226
workspace: workspace,
196227
customVals: customVals,
197228
states: states,
@@ -201,39 +232,47 @@ func (c *NoResultsCache[T]) StartNoResultTx(dataset, workspace string, customVal
201232
if c.skipCache(states, parameters) {
202233
return tx
203234
}
204-
workspace, states, customVals, params := filtersToCacheKeys(workspace, states, customVals, parameters)
235+
partitions, workspace, states, customVals, params := filtersToCacheKeys(partitions, workspace, states, customVals, parameters)
205236

206237
c.cacheTreeMu.Lock()
207238
defer c.cacheTreeMu.Unlock()
208239

209240
if _, ok := c.cacheTree[dataset]; !ok {
210-
c.cacheTree[dataset] = map[string]map[string]map[string]map[string]cacheEntry{}
241+
c.cacheTree[dataset] = map[string]map[string]map[string]map[string]map[string]cacheEntry{}
211242
}
212-
if _, ok := c.cacheTree[dataset][workspace]; !ok {
213-
c.cacheTree[dataset][workspace] = map[string]map[string]map[string]cacheEntry{}
214-
}
215-
for _, customVal := range customVals {
216-
if _, ok := c.cacheTree[dataset][workspace][customVal]; !ok {
217-
c.cacheTree[dataset][workspace][customVal] = map[string]map[string]cacheEntry{}
243+
for _, partition := range partitions {
244+
if _, ok := c.cacheTree[dataset][partition]; !ok {
245+
c.cacheTree[dataset][partition] = map[string]map[string]map[string]map[string]cacheEntry{}
246+
}
247+
if _, ok := c.cacheTree[dataset][partition][workspace]; !ok {
248+
c.cacheTree[dataset][partition][workspace] = map[string]map[string]map[string]cacheEntry{}
218249
}
219-
for _, state := range states {
220-
if _, ok := c.cacheTree[dataset][workspace][customVal][state]; !ok {
221-
c.cacheTree[dataset][workspace][customVal][state] = map[string]cacheEntry{}
250+
for _, customVal := range customVals {
251+
if _, ok := c.cacheTree[dataset][partition][workspace][customVal]; !ok {
252+
c.cacheTree[dataset][partition][workspace][customVal] = map[string]map[string]cacheEntry{}
222253
}
223-
for _, param := range params {
224-
e := c.cacheTree[dataset][workspace][customVal][state][param]
225-
e.AddToken(tx.id)
226-
c.cacheTree[dataset][workspace][customVal][state][param] = e
254+
for _, state := range states {
255+
if _, ok := c.cacheTree[dataset][partition][workspace][customVal][state]; !ok {
256+
c.cacheTree[dataset][partition][workspace][customVal][state] = map[string]cacheEntry{}
257+
}
258+
for _, param := range params {
259+
e := c.cacheTree[dataset][partition][workspace][customVal][state][param]
260+
e.AddToken(tx.id)
261+
c.cacheTree[dataset][partition][workspace][customVal][state][param] = e
262+
}
227263
}
228264
}
229265
}
266+
230267
return tx
231268
}
232269

233270
// NoResultTx is a transaction for the NoResultsCache.
234271
type NoResultTx[T ParameterFilter] struct {
235272
id string
236-
dataset, workspace string
273+
dataset string
274+
partitions []string
275+
workspace string
237276
customVals, states []string
238277
parameters []T
239278
c *NoResultsCache[T]
@@ -245,38 +284,47 @@ func (tx *NoResultTx[T]) Commit() bool {
245284
if tx.c.skipCache(tx.states, tx.parameters) {
246285
return false
247286
}
248-
workspace, states, customVals, params := filtersToCacheKeys(tx.workspace, tx.states, tx.customVals, tx.parameters)
287+
partitions, workspace, states, customVals, params := filtersToCacheKeys(tx.partitions, tx.workspace, tx.states, tx.customVals, tx.parameters)
249288

250289
tx.c.cacheTreeMu.Lock()
251290
defer tx.c.cacheTreeMu.Unlock()
252291

253292
if _, ok := tx.c.cacheTree[tx.dataset]; !ok {
254293
return false
255294
}
256-
if _, ok := tx.c.cacheTree[tx.dataset][workspace]; !ok {
257-
return false
258-
}
295+
259296
var missed bool
260-
for _, customVal := range customVals {
261-
if _, ok := tx.c.cacheTree[tx.dataset][workspace][customVal]; !ok {
297+
for _, partition := range partitions {
298+
if _, ok := tx.c.cacheTree[tx.dataset][partition]; !ok {
262299
missed = true
263300
continue
264301
}
265-
for _, state := range states {
266-
if _, ok := tx.c.cacheTree[tx.dataset][workspace][customVal][state]; !ok {
302+
if _, ok := tx.c.cacheTree[tx.dataset][partition][workspace]; !ok {
303+
missed = true
304+
continue
305+
}
306+
for _, customVal := range customVals {
307+
if _, ok := tx.c.cacheTree[tx.dataset][partition][workspace][customVal]; !ok {
267308
missed = true
268309
continue
269310
}
270-
for _, param := range params {
271-
e := tx.c.cacheTree[tx.dataset][workspace][customVal][state][param]
272-
if e.SetNoJobs(tx.id) {
273-
tx.c.cacheTree[tx.dataset][workspace][customVal][state][param] = e
274-
} else {
311+
for _, state := range states {
312+
if _, ok := tx.c.cacheTree[tx.dataset][partition][workspace][customVal][state]; !ok {
275313
missed = true
314+
continue
315+
}
316+
for _, param := range params {
317+
e := tx.c.cacheTree[tx.dataset][partition][workspace][customVal][state][param]
318+
if e.SetNoJobs(tx.id) {
319+
tx.c.cacheTree[tx.dataset][partition][workspace][customVal][state][param] = e
320+
} else {
321+
missed = true
322+
}
276323
}
277324
}
278325
}
279326
}
327+
280328
return !missed
281329
}
282330

@@ -295,9 +343,13 @@ func (c *NoResultsCache[T]) skipCache(states []string, parameters []T) bool {
295343
return false
296344
}
297345

298-
// filtersToCacheKeys returns the cache keys for the provided workspace, states, customVals and parameters filters.
346+
// filtersToCacheKeys returns the cache keys for the provided partition, workspace, states, customVals and parameters filters.
299347
// Wildcards are used if empty parameters are provided.
300-
func filtersToCacheKeys[T ParameterFilter](workspaceFilter string, statesFilter, customValsFilter []string, parametersFilter []T) (workspaceKey string, stateKeys, customValKeys, paramKeys []string) {
348+
func filtersToCacheKeys[T ParameterFilter](partitionFilter []string, workspaceFilter string, statesFilter, customValsFilter []string, parametersFilter []T) (partitionKeys []string, workspaceKey string, stateKeys, customValKeys, paramKeys []string) {
349+
partitionKeys = partitionFilter
350+
if len(partitionKeys) == 0 { // if no partition is provided, we use the wildcard
351+
partitionKeys = []string{wildcard}
352+
}
301353
workspaceKey = workspaceFilter
302354
if workspaceKey == "" { // if no workspace is provided, we use the wildcard
303355
workspaceKey = wildcard
@@ -314,12 +366,17 @@ func filtersToCacheKeys[T ParameterFilter](workspaceFilter string, statesFilter,
314366
if len(paramKeys) == 0 { // if no parameter is provided, we use the wildcard
315367
paramKeys = []string{wildcard}
316368
}
317-
return workspaceKey, stateKeys, customValKeys, paramKeys
369+
return partitionKeys, workspaceKey, stateKeys, customValKeys, paramKeys
318370
}
319371

320372
// filtersToInvalidationKeys returns the cache keys that need to be invalidated for the provided partitions, workspace, states, customVals and parameters filters.
321373
// Wildcard keys are also returned if needed. An empty slice is returned if all keys need to be invalidated at that level.
322-
func (c *NoResultsCache[T]) filtersToInvalidationKeys(workspaceFilter string, statesFilter, customValsFilter []string, parametersFilter []T) (workspaceKeys, stateKeys, customValKeys, paramKeys []string) {
374+
func (c *NoResultsCache[T]) filtersToInvalidationKeys(partitionFilter []string, workspaceFilter string, statesFilter, customValsFilter []string, parametersFilter []T) (partitionKeys, workspaceKeys, stateKeys, customValKeys, paramKeys []string) {
375+
if len(partitionFilter) > 0 { // include partitions along with the wildcard
376+
partitionKeys = make([]string, len(partitionFilter)+1)
377+
copy(partitionKeys, partitionFilter)
378+
partitionKeys[len(partitionFilter)] = wildcard
379+
}
323380
if workspaceFilter != "" {
324381
workspaceKeys = []string{workspaceFilter, wildcard}
325382
}
@@ -336,7 +393,7 @@ func (c *NoResultsCache[T]) filtersToInvalidationKeys(workspaceFilter string, st
336393
if len(paramKeys) > 0 { // include params along with the wildcard
337394
paramKeys = append(paramKeys, wildcard)
338395
}
339-
return workspaceKeys, stateKeys, customValKeys, paramKeys
396+
return partitionKeys, workspaceKeys, stateKeys, customValKeys, paramKeys
340397
}
341398

342399
// String returns a string representation of the cache's tree contents.
@@ -351,11 +408,12 @@ func (c *NoResultsCache[T]) String() string {
351408

352409
type (
353410
datasetKey = string
411+
partitionKey = string
354412
workspaceKey = string
355413
customValKey = string
356414
stateKey = string
357415
paramKey = string
358-
cacheTree map[datasetKey]map[workspaceKey]map[customValKey]map[stateKey]map[paramKey]cacheEntry
416+
cacheTree map[datasetKey]map[partitionKey]map[workspaceKey]map[customValKey]map[stateKey]map[paramKey]cacheEntry
359417
)
360418

361419
type cacheEntry struct {

0 commit comments

Comments
 (0)