-
Notifications
You must be signed in to change notification settings - Fork 179
Expand file tree
/
Copy pathGenerationCacheHelper.java
More file actions
146 lines (116 loc) · 6.87 KB
/
GenerationCacheHelper.java
File metadata and controls
146 lines (116 loc) · 6.87 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
package org.ohdsi.webapi.generationcache;
import org.ohdsi.sql.SqlRender;
import org.ohdsi.webapi.cohortdefinition.CohortDefinition;
import org.ohdsi.webapi.cohortdefinition.CohortGenerationRequest;
import org.ohdsi.webapi.cohortdefinition.CohortGenerationRequestBuilder;
import org.ohdsi.webapi.cohortdefinition.CohortGenerationUtils;
import org.ohdsi.webapi.source.Source;
import org.ohdsi.webapi.util.CancelableJdbcTemplate;
import org.ohdsi.webapi.util.SourceUtils;
import org.ohdsi.webapi.util.StatementCancel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiConsumer;
import org.springframework.transaction.support.TransactionTemplate;
import static org.ohdsi.webapi.Constants.Params.RESULTS_DATABASE_SCHEMA;
/**
 * Helper that computes cohort generation results once per
 * (generation type, design hash, source id) combination and reuses the cached
 * results on subsequent requests.
 *
 * <p>Concurrent calls for the same combination are serialized inside this JVM
 * by synchronizing on a per-combination monitor object.
 */
@Component
public class GenerationCacheHelper {

    private static final Logger log = LoggerFactory.getLogger(GenerationCacheHelper.class);
    private static final String CACHE_USED = "Using cached generation results for %s with id=%s and source=%s";

    // One monitor object per cacheable resource; used purely as a lock target.
    // NOTE(review): entries are never removed in this class, so the map grows with
    // every distinct (type, design hash, source id) seen - confirm this is acceptable.
    private static final ConcurrentHashMap<CacheableResource, Object> monitors = new ConcurrentHashMap<>();

    private final TransactionTemplate transactionTemplateRequiresNew;
    private final GenerationCacheService generationCacheService;

    /**
     * @param generationCacheService         service used to look up, store and remove cache entries
     * @param transactionTemplateRequiresNew template used to run the cache lookup/computation;
     *                                       presumably configured to start a new transaction
     *                                       (as the name suggests) - verify against the bean definition
     */
    public GenerationCacheHelper(GenerationCacheService generationCacheService, TransactionTemplate transactionTemplateRequiresNew) {
        this.generationCacheService = generationCacheService;
        this.transactionTemplateRequiresNew = transactionTemplateRequiresNew;
    }

    /**
     * Computes the design hash of a cohort expression.
     *
     * @param expression cohort expression to hash
     * @return the hash returned by the cache service for the {@code COHORT} generation type
     */
    public Integer computeHash(String expression) {
        return generationCacheService.getDesignHash(CacheableGenerationType.COHORT, expression);
    }

    /**
     * Returns the cached generation results for the given cohort definition and source,
     * generating and caching them first when no cache entry exists or when the request
     * builder has the "retain cohort covariates" option switched on.
     *
     * @param cohortDefinition cohort definition whose expression is hashed and generated
     * @param source           source the cohort is generated against
     * @param requestBuilder   builder used to assemble the generation request
     * @param sqlExecutor      callback that receives the design hash and the generated SQL
     *                         statements and is expected to execute them
     * @return the design hash of the cache entry together with the rendered SQL for reading the results
     */
    public CacheResult computeCacheIfAbsent(CohortDefinition cohortDefinition, Source source, CohortGenerationRequestBuilder requestBuilder, BiConsumer<Integer, String[]> sqlExecutor) {
        CacheableGenerationType type = CacheableGenerationType.COHORT;
        Integer designHash = computeHash(cohortDefinition.getDetails().getExpression());
        log.info("Computes cache if absent for type = {}, design = {}, source id = {}", type, designHash.toString(), source.getSourceId());

        Object monitor = monitors.computeIfAbsent(new CacheableResource(type, designHash, source.getSourceId()), cr -> new Object());
        synchronized (monitor) {
            // we execute the synchronized block in a separate transaction to make the cache changes visible immediately to all other threads
            return transactionTemplateRequiresNew.execute(status -> {
                log.info("Retrieves or invalidates cache for cohort id = {}", cohortDefinition.getId());
                GenerationCache cache = generationCacheService.getCacheOrEraseInvalid(type, designHash, source.getSourceId());
                if (cache == null || requestBuilder.hasRetainCohortCovariates()) {
                    String messagePrefix = (cache == null ? "Cache is absent" : "Cache will not be used because the retain cohort covariates option is switched on");
                    log.info("{} for cohort id = {}. Calculating with design hash = {}", messagePrefix, cohortDefinition.getId(), designHash);
                    // Ensure that there are no records in results schema with which we could mess up
                    generationCacheService.removeCache(type, source, designHash);
                    // This call is made only for its side effect; the result is intentionally ignored.
                    // It forces a still-present cache entry to be really deleted: without it a constraint
                    // violation occurs when a cache entry existed and the retain covariates option is on.
                    generationCacheService.getCacheOrEraseInvalid(type, designHash, source.getSourceId());
                    CohortGenerationRequest cohortGenerationRequest = requestBuilder
                            .withExpression(cohortDefinition.getDetails().getExpressionObject())
                            .withSource(source)
                            .withTargetId(designHash)
                            .withCohortId(cohortDefinition.getId())
                            .buildWithRetainCohortCovariates();
                    String[] sqls = CohortGenerationUtils.buildGenerationSql(cohortGenerationRequest);
                    sqlExecutor.accept(designHash, sqls);
                    cache = generationCacheService.cacheResults(type, designHash, source.getSourceId());
                } else {
                    log.info(String.format(CACHE_USED, type, cohortDefinition.getId(), source.getSourceKey()));
                }
                // Substitute the results schema placeholder with the qualifier of the given source
                String sql = SqlRender.renderSql(
                        generationCacheService.getResultsSql(cache),
                        new String[]{RESULTS_DATABASE_SCHEMA},
                        new String[]{SourceUtils.getResultsQualifier(source)}
                );
                log.info("Finished computation cache if absent for cohort id = {}", cohortDefinition.getId());
                return new CacheResult(cache.getDesignHash(), sql);
            });
        }
    }

    /**
     * Executes the given SQL statements as a cancelable batch and fails if the
     * statement was cancelled, so that callers do not go on to create a cache entry.
     *
     * @param cancelableJdbcTemplate template used to run the batch
     * @param stmtCancel             cancellation handle passed to the batch and checked afterwards
     * @param sqls                   SQL statements to execute
     * @throws RuntimeException if {@code stmtCancel} reports cancellation after the batch call returns
     */
    public void runCancelableCohortGeneration(CancelableJdbcTemplate cancelableJdbcTemplate, StatementCancel stmtCancel, String[] sqls) {
        cancelableJdbcTemplate.batchUpdate(stmtCancel, sqls);
        // Ensure that no cache created if generation has been cancelled
        if (stmtCancel.isCanceled()) {
            throw new RuntimeException("Cohort generation has been cancelled");
        }
    }

    /**
     * Immutable result of {@link #computeCacheIfAbsent}: the design hash of the cache
     * entry and the rendered SQL for reading the cached results.
     */
    // NOTE(review): this is a non-static inner class although it does not use the
    // enclosing instance; left as is to keep the public shape unchanged.
    public class CacheResult {

        private final Integer identifier;
        private final String sql;

        public CacheResult(Integer identifier, String sql) {
            this.identifier = identifier;
            this.sql = sql;
        }

        /** @return the design hash of the cache entry the result was built from */
        public Integer getIdentifier() {
            return identifier;
        }

        /** @return the rendered results SQL */
        public String getSql() {
            return sql;
        }
    }

    /**
     * Immutable key identifying a cacheable resource by generation type, design hash
     * and source id. Used as the key of the monitors map, so equality and hash code
     * are defined over all three components.
     */
    private static class CacheableResource {

        private final CacheableGenerationType type;
        private final Integer designHash;
        private final Integer sourceId;

        public CacheableResource(CacheableGenerationType type, Integer designHash, Integer sourceId) {
            this.type = type;
            this.designHash = designHash;
            this.sourceId = sourceId;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (o == null || getClass() != o.getClass()) return false;
            CacheableResource that = (CacheableResource) o;
            return type == that.type &&
                    Objects.equals(designHash, that.designHash) &&
                    Objects.equals(sourceId, that.sourceId);
        }

        @Override
        public int hashCode() {
            return Objects.hash(type, designHash, sourceId);
        }
    }
}