
Commit d277fca

feat(create): add new createDiff mode
1 parent: ae65354 · commit: d277fca

10 files changed: 860 additions & 52 deletions


README.md

Lines changed: 4 additions & 1 deletion
```diff
@@ -65,7 +65,9 @@ The job query format is as follows :
     // or "ratio", this will activate a detailed count of final matched patients per criteria
     "details": "<details>",
     // optional sampling ratio value between 0.0 and 1.0 to limit the number of patients of the cohort to create (it can be used to sample an existing cohort)
-    "sampling": "<sampling>"
+    "sampling": "<sampling>",
+    // optional cohort id to use as a base for the "createDiff" mode
+    "baseCohortId": "<base cohort id>"
   },
   "callbackUrl": "<callback url>" // optional callback url to retrieve the result
 }
@@ -75,6 +77,7 @@ The job query format is as follows :
 with `mode` being one of the following values:
 - `count` : Return the number of patients that match the criteria of the `cohortDefinitionSyntax`
 - `create`: Create a cohort of patients that match the criteria of the `cohortDefinitionSyntax`
+- `create_diff`: Create a change list from a base cohort of patients (defined in `modeOptions`) and the new/deleted ones that match the criteria of the `cohortDefinitionSyntax`
 
 and `cohortDefinitionSyntax` being a JSON string that represents the criteria described in the [query format section](#query-format).
```
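
Putting the new field together with the documented ones, a `create_diff` job request would look roughly like this. This is a sketch: the `modeOptions` key and top-level layout are inferred from the README prose and the `SparkJobParameter` fields, and all values are placeholders.

```json
{
  "mode": "create_diff",
  "cohortDefinitionSyntax": "<JSON string of the query criteria>",
  "modeOptions": {
    "baseCohortId": "<id of the existing cohort to diff against>"
  },
  "callbackUrl": "<callback url>"
}
```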

src/main/scala/fr/aphp/id/eds/requester/CreateQuery.scala

Lines changed: 63 additions & 4 deletions
```diff
@@ -8,12 +8,18 @@ import fr.aphp.id.eds.requester.tools.JobUtils.addEmptyGroup
 import fr.aphp.id.eds.requester.tools.{JobUtils, JobUtilsService, StageDetails}
 import org.apache.log4j.Logger
 import org.apache.spark.sql.{SparkSession, functions => F}
+import org.hl7.fhir.r4.model.ListResource.ListMode
 
 object CreateOptions extends Enumeration {
   type CreateOptions = String
   val sampling = "sampling"
 }
 
+object CreateDiffOptions extends Enumeration {
+  type CreateDiffOptions = String
+  val baseCohortId = "baseCohortId"
+}
+
 case class CreateQuery(queryBuilder: QueryBuilder = new DefaultQueryBuilder(),
                        jobUtilsService: JobUtilsService = JobUtils)
     extends JobBase {
@@ -36,10 +42,13 @@ case class CreateQuery(queryBuilder: QueryBuilder = new DefaultQueryBuilder(),
       runtime: JobEnv,
       data: SparkJobParameter
   ): JobBaseResult = {
-    implicit val (request, criterionTagsMap, omopTools, resourceResolver, cacheEnabled) =
+    implicit val (request, criterionTagsMap, omopTools, resourceResolver, cacheEnabled) = {
       jobUtilsService.initSparkJobRequest(logger, spark, runtime, data)
+    }
+    implicit val sparkSession: SparkSession = spark
 
     validateRequestOrThrow(request)
+    validateModeOptionsOrThrow(data)
 
     // Init values here because we are in an object (i.e a singleton) and not a class
     var status: String = ""
@@ -72,12 +81,13 @@ case class CreateQuery(queryBuilder: QueryBuilder = new DefaultQueryBuilder(),
         .filter(c => cohort.columns.contains(c))
         .map(c => F.col(c)): _*)
       .dropDuplicates()
+      .withColumn("deleted", F.lit(false))
 
     if (data.modeOptions.contains(CreateOptions.sampling)) {
       val sampling = data.modeOptions(CreateOptions.sampling).toDouble
       // https://stackoverflow.com/questions/37416825/dataframe-sample-in-apache-spark-scala#comment62349780_37418684
       // to be sure to have the right number of rows
-      cohort = cohort.sample(sampling+0.1).limit((sampling * cohort.count()).round.toInt)
+      cohort = cohort.sample(sampling + 0.1).limit((sampling * cohort.count()).round.toInt)
     }
     cohort.cache()
     count = cohort.count()
@@ -93,8 +103,17 @@ case class CreateQuery(queryBuilder: QueryBuilder = new DefaultQueryBuilder(),
           data.cohortDefinitionSyntax,
           data.ownerEntityId,
           request.resourceType,
+          if (data.mode == JobType.createDiff && data.modeOptions.contains(
+                CreateDiffOptions.baseCohortId))
+            Some(data.modeOptions(CreateDiffOptions.baseCohortId).toLong)
+          else None,
+          if (data.mode == JobType.createDiff) {
+            ListMode.CHANGES
+          } else {
+            ListMode.SNAPSHOT
+          },
           count
-        ))
+        ))
       .getOrElse(-1L)
     }
     // get a new cohortId
@@ -107,9 +126,31 @@ case class CreateQuery(queryBuilder: QueryBuilder = new DefaultQueryBuilder(),
 
     // upload into pg and solr
     if (omopTools.isDefined) {
+      val cohortToUpload =
+        if (data.mode == JobType.createDiff && data.modeOptions.contains(
+              CreateDiffOptions.baseCohortId)) {
+          val baseCohortItems =
+            omopTools.get.readCohortEntries(data.modeOptions(CreateDiffOptions.baseCohortId).toLong)
+          baseCohortItems
+            .join(cohort,
+                  baseCohortItems("_itemreferenceid") === cohort(ResultColumn.SUBJECT),
+                  "full_outer")
+            .filter(
+              baseCohortItems("_itemreferenceid").isNull || F
+                .col(ResultColumn.SUBJECT)
+                .isNull)
+            .select(
+              F.coalesce(baseCohortItems("_itemreferenceid"), cohort(ResultColumn.SUBJECT))
+                .as(ResultColumn.SUBJECT),
+              F.when(cohort(ResultColumn.SUBJECT).isNull, true).otherwise(false).as("deleted")
+            )
+        } else {
+          cohort
+        }
+
       omopTools.get.updateCohort(
         cohortDefinitionId,
-        cohort,
+        cohortToUpload,
         completeRequest.sourcePopulation,
         count,
         cohortSizeBiggerThanLimit,
@@ -120,6 +161,24 @@ case class CreateQuery(queryBuilder: QueryBuilder = new DefaultQueryBuilder(),
     getCreationResult(cohortDefinitionId, count, status)
   }
 
+  private def validateModeOptionsOrThrow(data: SparkJobParameter): Unit = {
+    val modeOptions = data.modeOptions
+    if (data.mode == JobType.createDiff) {
+      if (modeOptions.contains(CreateOptions.sampling)) {
+        throw new RuntimeException("Can't use sampling with createDiff mode")
+      }
+      if (!modeOptions.contains(CreateDiffOptions.baseCohortId)) {
+        throw new RuntimeException("baseCohortId is required for createDiff mode")
+      }
+    }
+    if (modeOptions.contains(CreateOptions.sampling)) {
+      val sampling = modeOptions(CreateOptions.sampling).toDouble
+      if (sampling <= 0 || sampling > 1) {
+        throw new RuntimeException("Sampling value should be between 0 and 1")
+      }
+    }
+  }
+
   private def getCreationResult(cohortDefinitionId: Long,
                                 count: Long,
                                 status: String): JobBaseResult = {
```
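
The heart of the new upload path is the `full_outer` join above: patients present only in the base cohort come out flagged `deleted = true`, patients present only in the fresh query result come out as additions, and patients present in both are filtered away, leaving a pure change list. A minimal, self-contained sketch of that computation, with toy data and a local SparkSession as assumptions (column names mirror the commit):

```scala
import org.apache.spark.sql.{SparkSession, functions => F}

object CreateDiffJoinSketch extends App {
  val spark = SparkSession.builder().master("local[*]").appName("create-diff-sketch").getOrCreate()
  import spark.implicits._

  val base  = Seq(1L, 2L, 3L).toDF("_itemreferenceid") // patients already in the base cohort
  val fresh = Seq(2L, 3L, 4L).toDF("subject")          // patients matching the query now

  // Keep only rows where one side is missing: those are the actual changes.
  val changes = base
    .join(fresh, base("_itemreferenceid") === fresh("subject"), "full_outer")
    .filter(base("_itemreferenceid").isNull || fresh("subject").isNull)
    .select(
      F.coalesce(base("_itemreferenceid"), fresh("subject")).as("subject"),
      F.when(fresh("subject").isNull, true).otherwise(false).as("deleted")
    )

  changes.show() // (1, true): dropped from the base; (4, false): newly matched
  spark.stop()
}
```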

src/main/scala/fr/aphp/id/eds/requester/cohort/CohortCreation.scala

Lines changed: 6 additions & 0 deletions
```diff
@@ -7,6 +7,8 @@ import fr.aphp.id.eds.requester.query.model.SourcePopulation
 import fr.aphp.id.eds.requester.query.resolver.rest.DefaultRestFhirClient
 import fr.aphp.id.eds.requester.{AppConfig, FhirServerConfig, PGConfig}
 import org.apache.spark.sql.{DataFrame, SparkSession}
+import org.hl7.fhir.r4.model.ListResource.ListMode
+
 
 trait CohortCreation {
 
@@ -24,6 +26,8 @@ trait CohortCreation {
                    cohortDefinitionSyntax: String,
                    ownerEntityId: String,
                    resourceType: String,
+                   baseCohortId: Option[Long],
+                   mode: ListMode,
                    size: Long): Long
 
   def updateCohort(cohortId: Long,
@@ -33,6 +37,8 @@ trait CohortCreation {
                    delayCohortCreation: Boolean,
                    resourceType: String): Unit
 
+  def readCohortEntries(cohortId: Long)(implicit spark: SparkSession): DataFrame
+
 }
 
 object CohortCreation {
```

src/main/scala/fr/aphp/id/eds/requester/cohort/fhir/FhirCohortCreation.scala

Lines changed: 55 additions & 6 deletions
```diff
@@ -1,13 +1,15 @@
 package fr.aphp.id.eds.requester.cohort.fhir
 
+import ca.uhn.fhir.rest.api.{SortOrderEnum, SortSpec}
 import fr.aphp.id.eds.requester.ResultColumn
 import fr.aphp.id.eds.requester.cohort.CohortCreation
 import fr.aphp.id.eds.requester.query.model.SourcePopulation
 import fr.aphp.id.eds.requester.query.resolver.rest.RestFhirClient
-import org.apache.spark.sql.{DataFrame, Row}
-import org.hl7.fhir.r4.model.{ListResource, Reference}
+import org.apache.spark.sql.{DataFrame, Row, SparkSession}
+import org.hl7.fhir.r4.model.ListResource.ListMode
+import org.hl7.fhir.r4.model.{Bundle, Identifier, ListResource, Reference}
 
-import scala.jdk.CollectionConverters.seqAsJavaListConverter
+import scala.jdk.CollectionConverters.{asScalaBufferConverter, seqAsJavaListConverter}
 
 class FhirCohortCreation(restFhirClient: RestFhirClient) extends CohortCreation {
 
@@ -25,10 +27,19 @@ class FhirCohortCreation(restFhirClient: RestFhirClient) extends CohortCreation
                             cohortDefinitionSyntax: String,
                             ownerEntityId: String,
                             resourceType: String,
+                            baseCohortId: Option[Long],
+                            mode: ListMode,
                             size: Long): Long = {
     val list = new ListResource()
     list.setTitle(cohortDefinitionName)
-    restFhirClient.getClient.create().resource(list).execute().getId.getIdPartAsLong
+    list.setMode(mode)
+    if (baseCohortId.isDefined) {
+      list.setIdentifier(
+        List(new Identifier().setValue(baseCohortId.get.toString)).asJava
+      )
+    }
+    val fhirCreatedResource = restFhirClient.getClient.create().resource(list).execute()
+    fhirCreatedResource.getResource.getIdElement.getIdPartAsLong
   }
 
   override def updateCohort(cohortId: Long,
@@ -47,12 +58,50 @@ class FhirCohortCreation(restFhirClient: RestFhirClient) extends CohortCreation
     restFhirClient.getClient.update().resource(list).execute()
   }
 
+  override def readCohortEntries(cohortId: Long)(implicit spark: SparkSession): DataFrame = {
+    val baseList = restFhirClient.getClient
+      .read()
+      .resource(classOf[ListResource])
+      .withId(cohortId.toString)
+      .execute()
+      .getEntry
+
+    val diffListsResults: Bundle = restFhirClient.getClient
+      .search()
+      .forResource(classOf[ListResource])
+      .where(ListResource.IDENTIFIER.exactly().code(cohortId.toString))
+      .sort(new SortSpec("date", SortOrderEnum.ASC))
+      .execute()
+    val diffLists = diffListsResults.getEntry.asScala
+      .map(_.getResource.asInstanceOf[ListResource])
+      .filter(l => l.hasMode && l.getMode.equals(ListMode.CHANGES))
+
+    val diffEntries = diffLists.flatMap(_.getEntry.asScala)
+    val result = diffEntries.foldLeft(baseList.asScala.map(_.getItem.getReference).toSet) {
+      (acc, entry) =>
+        val itemId = entry.getItem.getReference
+        val deleted = entry.getDeleted
+        if (deleted) {
+          acc - itemId
+        } else {
+          acc + itemId
+        }
+    }
+
+    import spark.implicits._
+
+    result.toSeq.map(id => id.split("/").last.toLong).toDF("_itemreferenceid")
+  }
+
   private def createEntry(row: Row): ListResource.ListEntryComponent = {
     val patient = new Reference()
-    val patientId = row.getAs[String](ResultColumn.SUBJECT)
+    val patientId = row.getAs[Long](ResultColumn.SUBJECT)
+    val deleted = row.getAs[Boolean]("deleted")
     patient.setReference("Patient/" + patientId)
-    patient.setId(patientId)
+    patient.setId(patientId.toString)
     val entry = new ListResource.ListEntryComponent()
     entry.setItem(patient)
+    entry.setDeleted(deleted)
+    entry
   }
 }
```
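
`readCohortEntries` rebuilds the current membership of a cohort by replaying every `CHANGES` list (found via the identifier that `createCohort` sets to the base cohort id) over the base list's entries, in date order. Stripped of the FHIR client plumbing, the replay is a plain fold over `(reference, deleted)` pairs; a standalone sketch with invented data:

```scala
// Replay semantics of readCohortEntries without the FHIR client (a sketch, toy data).
object FhirReplaySketch extends App {
  // References held by the base (snapshot) list.
  val base = Set("Patient/1", "Patient/2", "Patient/3")

  // Entries of the CHANGES lists, flattened in list-date order (oldest first).
  val diffEntries = Seq(
    ("Patient/2", true),  // first diff: patient 2 removed
    ("Patient/4", false), // second diff: patient 4 added
    ("Patient/2", false)  // third diff: patient 2 re-added, so the last change wins
  )

  val current = diffEntries.foldLeft(base) {
    case (acc, (ref, deleted)) => if (deleted) acc - ref else acc + ref
  }

  // As in the commit, references are reduced to numeric ids for the
  // "_itemreferenceid" DataFrame column.
  println(current.map(_.split("/").last.toLong)) // members 1, 2, 3, 4
}
```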

src/main/scala/fr/aphp/id/eds/requester/cohort/pg/PGCohortCreation.scala

Lines changed: 47 additions & 3 deletions
```diff
@@ -8,7 +8,8 @@ import fr.aphp.id.eds.requester.query.model.SourcePopulation
 import fr.aphp.id.eds.requester.tools.SolrTools
 import fr.aphp.id.eds.requester.{AppConfig, ResultColumn}
 import org.apache.spark.sql.functions._
-import org.apache.spark.sql.{DataFrame, functions => F}
+import org.apache.spark.sql.{DataFrame, SparkSession, functions => F}
+import org.hl7.fhir.r4.model.ListResource.ListMode
 
 /**
   * @param pg pgTool obj
@@ -24,12 +25,19 @@ class PGCohortCreation(pg: PGTool) extends CohortCreation with LazyLogging {
                             cohortDefinitionSyntax: String,
                             ownerEntityId: String,
                             resourceType: String,
+                            baseCohortId: Option[Long],
+                            mode: ListMode,
                             size: Long): Long = {
+    val (indentifier_col, identifier_val) = if (baseCohortId.isDefined) {
+      (" identifier,", s" ${baseCohortId.get.toString},")
+    } else {
+      ("", "")
+    }
     val stmt =
       s"""
         |insert into ${cohort_table_rw}
-        |(hash, title, ${note_text_column_name}, _sourcereferenceid, source__reference, _provider, source__type, mode, status, subject__type, date, _size)
-        |values (-1, ?, ?, ?, ?, '$cohort_provider_name', 'Practitioner', 'snapshot', '${CohortStatus.RUNNING}', ?, now(), ?)
+        |(hash, title, ${note_text_column_name},${indentifier_col} _sourcereferenceid, source__reference, _provider, source__type, mode, status, subject__type, date, _size)
+        |values (-1, ?, ?,${identifier_val} ?, ?, '$cohort_provider_name', 'Practitioner', '${mode.toCode}', '${CohortStatus.RUNNING}', ?, now(), ?)
         |returning id
         |""".stripMargin
     val result = pg
@@ -70,6 +78,7 @@ class PGCohortCreation(pg: PGTool) extends CohortCreation with LazyLogging {
       .select(F.col("_itemreferenceid"),
               F.col("item__reference"),
              F.col("_provider"),
+             F.col("deleted"),
              F.col("_listid"))
 
     uploadCohortTableToPG(dataframe)
@@ -87,6 +96,40 @@ class PGCohortCreation(pg: PGTool) extends CohortCreation with LazyLogging {
     }
   }
 
+  override def readCohortEntries(cohortId: Long)(implicit spark: SparkSession): DataFrame = {
+    val stmt =
+      s"""
+        |select _itemreferenceid
+        |from ${cohort_item_table_rw}
+        |where _listid = $cohortId
+        |""".stripMargin
+    val baseCohort = pg.sqlExecWithResult(stmt)
+    val diffs = readCohortDiffEntries(cohortId)
+    val addedDiffs = diffs.filter(col("deleted").isNull || col("deleted") === false)
+    val deletedDiffs = diffs.filter(col("deleted") === true)
+
+    val result = baseCohort
+      .union(addedDiffs.select("_itemreferenceid"))
+      .except(deletedDiffs.select("_itemreferenceid"))
+
+    result
+  }
+
+  private def readCohortDiffEntries(cohortId: Long): DataFrame = {
+    val stmt =
+      s"""
+        |select date,_itemreferenceid,deleted
+        |from ${cohort_item_table_rw}
+        |join ${cohort_table_rw} on ${cohort_table_rw}.id = ${cohort_item_table_rw}._listid
+        |where ${cohort_table_rw}.identifier___official__value = '$cohortId'
+        |""".stripMargin
+    pg.sqlExecWithResult(stmt)
+      .select(col("date"), col("_itemreferenceid"), col("deleted"))
+      .orderBy(col("date").asc)
+      .groupBy(col("_itemreferenceid"))
+      .agg(last(col("deleted")).as("deleted"))
+  }
+
   private def uploadRelationship(cohortDefinitionId: Long,
                                  sourcePopulation: SourcePopulation): Unit = {
     if (sourcePopulation.cohortList.isDefined) {
@@ -168,6 +211,7 @@ class PGCohortCreation(pg: PGTool) extends CohortCreation with LazyLogging {
         "_listid",
         "item__reference",
         "_provider",
+        "deleted",
         "_itemreferenceid"
       ).toSet == df.columns.toSet,
       "cohort dataframe shall have _listid, _provider, _provider and item__reference"
```
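The PG implementation reaches the same membership with DataFrame set algebra: the base cohort's items, plus diff items whose most recent flag is not deleted, minus those whose most recent flag is deleted (`readCohortDiffEntries` collapses multiple rows per patient to the latest flag first). A compact sketch of that final step, assuming a local SparkSession and toy data in place of the Postgres reads:

```scala
import org.apache.spark.sql.{SparkSession, functions => F}

object PgReplaySketch extends App {
  val spark = SparkSession.builder().master("local[*]").appName("pg-replay-sketch").getOrCreate()
  import spark.implicits._

  // Items of the base cohort, as read from the cohort item table.
  val baseCohort = Seq(1L, 2L, 3L).toDF("_itemreferenceid")

  // Final per-patient diff state, as produced by readCohortDiffEntries.
  val diffs = Seq((2L, true), (4L, false)).toDF("_itemreferenceid", "deleted")

  val addedDiffs   = diffs.filter(F.col("deleted").isNull || F.col("deleted") === false)
  val deletedDiffs = diffs.filter(F.col("deleted") === true)

  val current = baseCohort
    .union(addedDiffs.select("_itemreferenceid"))
    .except(deletedDiffs.select("_itemreferenceid"))

  current.show() // 1, 3, 4
  spark.stop()
}
```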

src/main/scala/fr/aphp/id/eds/requester/jobs/SparkJobParameter.scala

Lines changed: 1 addition & 0 deletions
```diff
@@ -29,6 +29,7 @@ object JobType extends Enumeration {
   val countAll = "count_all"
   val countWithDetails = "count_with_details"
   val create = "create"
+  val createDiff = "create_diff"
   val purgeCache = "purge_cache"
 }
```

src/main/scala/fr/aphp/id/eds/requester/server/JobController.scala

Lines changed: 1 addition & 0 deletions
```diff
@@ -78,6 +78,7 @@ class JobController(implicit val swagger: Swagger)
       case JobType.countAll => jobManager.execJob(JobsConfig.countJob, jobData)
       case JobType.countWithDetails => jobManager.execJob(JobsConfig.countJob, jobData)
       case JobType.create => jobManager.execJob(JobsConfig.createJob, jobData)
+      case JobType.createDiff => jobManager.execJob(JobsConfig.createJob, jobData)
     }
   }
```
